forked from hardbyte/python-can
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnotifier_test.py
More file actions
58 lines (49 loc) · 1.74 KB
/
notifier_test.py
File metadata and controls
58 lines (49 loc) · 1.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#!/usr/bin/env python
# coding: utf-8
import unittest
import time
import asyncio
import can
class NotifierTest(unittest.TestCase):
    """Check that ``can.Notifier`` hands bus traffic to a ``can.BufferedReader``."""

    def test_single_bus(self):
        """A message sent on one virtual bus can be fetched from the reader."""
        bus = can.Bus("test", bustype="virtual", receive_own_messages=True)
        # Cleanups run in LIFO order and also fire when an assertion fails,
        # so the notifier is always stopped before its bus is shut down.
        self.addCleanup(bus.shutdown)
        reader = can.BufferedReader()
        notifier = can.Notifier(bus, [reader], 0.1)
        self.addCleanup(notifier.stop)

        bus.send(can.Message())
        self.assertIsNotNone(reader.get_message(1))

    def test_multiple_bus(self):
        """One notifier watching two buses reports each message's channel."""
        bus1 = can.Bus(0, bustype="virtual", receive_own_messages=True)
        self.addCleanup(bus1.shutdown)
        bus2 = can.Bus(1, bustype="virtual", receive_own_messages=True)
        self.addCleanup(bus2.shutdown)
        reader = can.BufferedReader()
        notifier = can.Notifier([bus1, bus2], [reader], 0.1)
        # Registered last so it runs first: stop the notifier, then the buses.
        self.addCleanup(notifier.stop)

        msg = can.Message()
        bus1.send(msg)
        # Presumably spaces the two sends out so the reader receives the
        # bus1 message before the bus2 message; the assertions below rely
        # on that order.
        time.sleep(0.1)
        bus2.send(msg)

        for expected_channel in (0, 1):
            recv_msg = reader.get_message(1)
            self.assertIsNotNone(recv_msg)
            self.assertEqual(recv_msg.channel, expected_channel)
class AsyncNotifierTest(unittest.TestCase):
    """Check that ``can.Notifier`` feeds a ``can.AsyncBufferedReader`` via asyncio."""

    def test_asyncio_notifier(self):
        """A message sent on the bus can be awaited from the async reader."""
        # Use a dedicated loop instead of asyncio.get_event_loop(), which is
        # deprecated when no loop is running. Installing it as the current
        # loop keeps objects that look up the default loop bound to this one.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # Cleanups run in LIFO order, also on failure: notifier.stop,
        # bus.shutdown, loop.close, and finally unsetting the current loop.
        self.addCleanup(asyncio.set_event_loop, None)
        self.addCleanup(loop.close)

        bus = can.Bus("test", bustype="virtual", receive_own_messages=True)
        self.addCleanup(bus.shutdown)
        reader = can.AsyncBufferedReader()
        notifier = can.Notifier(bus, [reader], 0.1, loop=loop)
        self.addCleanup(notifier.stop)

        bus.send(can.Message())
        # Bound the wait to one second so a lost message fails the test
        # with a timeout instead of blocking forever.
        pending = asyncio.wait_for(reader.get_message(), 1.0)
        recv_msg = loop.run_until_complete(pending)
        self.assertIsNotNone(recv_msg)
# Run the test cases in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()