# back2back_test.py
# Forked from hardbyte/python-can.
import unittest
import time
import can
# Bit rate handed to can.interface.Bus() for both endpoints.
BITRATE = 500 * 1000
# Timeout passed to Bus.recv() while waiting for an expected frame.
TIMEOUT = 0.1

# The two endpoints under test, each described as (bustype, channel).
INTERFACE_1, CHANNEL_1 = 'virtual', 0
INTERFACE_2, CHANNEL_2 = 'virtual', 0
class Back2BackTestCase(unittest.TestCase):
    """
    Use two interfaces connected to the same CAN bus and test them against
    each other.

    Every test opens a fresh pair of buses (``bus1`` / ``bus2``) configured
    from the module-level ``INTERFACE_*``, ``CHANNEL_*`` and ``BITRATE``
    constants, and both are shut down again once the test has finished.
    """

    # NOTE(review): `bustype=`, `extended_id=` and `.id_type` are kept exactly
    # as the original spelled them; confirm they match the python-can version
    # this fork targets.

    def setUp(self):
        """Open both buses and register their shutdown as cleanups."""
        self.bus1 = can.interface.Bus(channel=CHANNEL_1,
                                      bustype=INTERFACE_1,
                                      bitrate=BITRATE)
        # Registered right away: cleanups still run when setUp() itself fails
        # (tearDown() would not), so bus1 is released even if opening bus2
        # raises below.
        self.addCleanup(self.bus1.shutdown)
        self.bus2 = can.interface.Bus(channel=CHANNEL_2,
                                      bustype=INTERFACE_2,
                                      bitrate=BITRATE)
        # Cleanups run in reverse registration order (bus2, then bus1), and
        # each one runs even if another cleanup raised.
        self.addCleanup(self.bus2.shutdown)

    def _check_received_message(self, recv_msg, sent_msg, interface=None):
        """Assert that ``recv_msg`` matches ``sent_msg`` field by field.

        :param recv_msg: message returned by the receiving bus, or ``None``
            if nothing was received.
        :param sent_msg: the message that was sent.
        :param interface: name of the receiving interface, used only in the
            failure message. Defaults to ``INTERFACE_2``.
        """
        if interface is None:
            interface = INTERFACE_2
        self.assertIsNotNone(recv_msg,
                             "No message was received on %s" % interface)
        self.assertEqual(recv_msg.arbitration_id, sent_msg.arbitration_id)
        self.assertEqual(recv_msg.id_type, sent_msg.id_type)
        self.assertEqual(recv_msg.is_remote_frame, sent_msg.is_remote_frame)
        self.assertEqual(recv_msg.is_error_frame, sent_msg.is_error_frame)
        self.assertEqual(recv_msg.dlc, sent_msg.dlc)
        # The payload is only compared for non-remote frames.
        if not sent_msg.is_remote_frame:
            self.assertSequenceEqual(recv_msg.data, sent_msg.data)

    def _send_and_receive(self, msg):
        """Send ``msg`` in both directions and verify each reception.

        Note that ``msg.arbitration_id`` is incremented in place before the
        second transmission, so the caller's message object is modified.

        :param msg: the message to transmit.
        """
        # Send with bus 1, receive with bus 2
        self.bus1.send(msg)
        recv_msg = self.bus2.recv(TIMEOUT)
        self._check_received_message(recv_msg, msg, INTERFACE_2)
        # Some buses may receive their own messages. Remove it from the queue
        self.bus1.recv(0)

        # Send with bus 2, receive with bus 1
        # Add 1 to arbitration ID to make it a different message
        msg.arbitration_id += 1
        self.bus2.send(msg)
        recv_msg = self.bus1.recv(TIMEOUT)
        self._check_received_message(recv_msg, msg, INTERFACE_1)

    def test_no_message(self):
        """recv() returns None when nothing has been sent."""
        self.assertIsNone(self.bus1.recv(TIMEOUT))

    def test_timestamp(self):
        """Two frames sent one second apart carry timestamps ~1 s apart."""
        self.bus2.send(can.Message())
        recv_msg1 = self.bus1.recv(TIMEOUT)
        # Fail with a readable assertion instead of an AttributeError on
        # `None.timestamp` if a frame never arrives.
        self.assertIsNotNone(recv_msg1,
                             "No message was received on %s" % INTERFACE_1)
        time.sleep(1)
        self.bus2.send(can.Message())
        recv_msg2 = self.bus1.recv(TIMEOUT)
        self.assertIsNotNone(recv_msg2,
                             "No message was received on %s" % INTERFACE_1)
        delta_time = recv_msg2.timestamp - recv_msg1.timestamp
        self.assertTrue(0.95 < delta_time < 1.05,
                        "Expected a timestamp difference between 0.95 and "
                        "1.05 seconds, got %r" % delta_time)

    def test_standard_message(self):
        """Round-trip a frame created with extended_id=False and 8 data bytes."""
        msg = can.Message(extended_id=False,
                          arbitration_id=0x100,
                          data=[1, 2, 3, 4, 5, 6, 7, 8])
        self._send_and_receive(msg)

    def test_extended_message(self):
        """Round-trip a frame created with extended_id=True and 8 data bytes."""
        msg = can.Message(extended_id=True,
                          arbitration_id=0x123456,
                          data=[10, 11, 12, 13, 14, 15, 16, 17])
        self._send_and_receive(msg)

    def test_remote_message(self):
        """Round-trip a remote frame with dlc=4."""
        msg = can.Message(extended_id=False,
                          arbitration_id=0x200,
                          is_remote_frame=True,
                          dlc=4)
        self._send_and_receive(msg)

    def test_dlc_less_than_eight(self):
        """Round-trip a frame carrying only three data bytes."""
        msg = can.Message(extended_id=False,
                          arbitration_id=0x300,
                          data=[4, 5, 6])
        self._send_and_receive(msg)
# Run the test suite when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()