88import threading
99import logging
1010import time
11+ try :
12+ import asyncio
13+ except ImportError :
14+ asyncio = None
1115
1216logger = logging .getLogger ('can.Notifier' )
1317
1418
1519class Notifier (object ):
1620
17- def __init__ (self , bus , listeners , timeout = 1.0 ):
21+ def __init__ (self , bus , listeners , timeout = 1.0 , loop = None ):
1822 """Manages the distribution of **Messages** from a given bus/buses to a
1923 list of listeners.
2024
2125 :param can.BusABC bus: A :ref:`bus` or a list of buses to listen to.
2226 :param list listeners: An iterable of :class:`~can.Listener`
2327 :param float timeout: An optional maximum number of seconds to wait for any message.
28+ :param asyncio.AbstractEventLoop loop:
29+ An :mod:`asyncio` event loop to schedule listeners in.
2430 """
2531 self .listeners = listeners
2632 self .bus = bus
2733 self .timeout = timeout
34+ self ._loop = loop
2835
2936 #: Exception raised in thread
3037 self .exception = None
@@ -35,11 +42,24 @@ def __init__(self, bus, listeners, timeout=1.0):
3542 self ._readers = []
3643 buses = self .bus if isinstance (self .bus , list ) else [self .bus ]
3744 for bus in buses :
45+ self .add_bus (bus )
46+
47+ def add_bus (self , bus ):
48+ """Add a bus for notification.
49+
50+ :param can.BusABC bus:
51+ CAN bus instance.
52+ """
53+ if self ._loop is not None and hasattr (bus , 'fileno' ) and bus .fileno () >= 0 :
54+ # Use file descriptor to watch for messages
55+ reader = bus .fileno ()
56+ self ._loop .add_reader (reader , self ._on_message_available , bus )
57+ else :
3858 reader = threading .Thread (target = self ._rx_thread , args = (bus ,),
39- name = 'can.notifier for bus "{}"' .format (bus .channel_info ))
59+ name = 'can.notifier for bus "{}"' .format (bus .channel_info ))
4060 reader .daemon = True
4161 reader .start ()
42- self ._readers .append (reader )
62+ self ._readers .append (reader )
4363
4464 def stop (self , timeout = 5 ):
4565 """Stop notifying Listeners when new :class:`~can.Message` objects arrive
@@ -52,25 +72,54 @@ def stop(self, timeout=5):
5272 self ._running = False
5373 end_time = time .time () + timeout
5474 for reader in self ._readers :
55- now = time .time ()
56- if now < end_time :
57- reader .join (end_time - now )
75+ if isinstance (reader , threading .Thread ):
76+ now = time .time ()
77+ if now < end_time :
78+ reader .join (end_time - now )
79+ else :
80+ # reader is a file descriptor
81+ self ._loop .remove_reader (reader )
5882 for listener in self .listeners :
59- listener .stop ()
83+ if hasattr (listener , 'stop' ):
84+ listener .stop ()
6085
6186 def _rx_thread (self , bus ):
6287 msg = None
6388 try :
6489 while self ._running :
6590 if msg is not None :
6691 with self ._lock :
67- for callback in self .listeners :
68- callback (msg )
92+ if self ._loop is not None :
93+ self ._loop .call_soon_threadsafe (
94+ self ._on_message_received , msg )
95+ else :
96+ self ._on_message_received (msg )
6997 msg = bus .recv (self .timeout )
7098 except Exception as exc :
7199 self .exception = exc
100+ if self ._loop is not None :
101+ self ._loop .call_soon_threadsafe (self ._on_error , exc )
102+ else :
103+ self ._on_error (exc )
72104 raise
73105
106+ def _on_message_available (self , bus ):
107+ msg = bus .recv (0 )
108+ if msg is not None :
109+ self ._on_message_received (msg )
110+
111+ def _on_message_received (self , msg ):
112+ for callback in self .listeners :
113+ res = callback (msg )
114+ if self ._loop is not None and asyncio .iscoroutine (res ):
115+ # Schedule coroutine
116+ self ._loop .create_task (res )
117+
118+ def _on_error (self , exc ):
119+ for listener in self .listeners :
120+ if hasattr (listener , 'on_error' ):
121+ listener .on_error (exc )
122+
74123 def add_listener (self , listener ):
75124 """Add new Listener to the notification list.
76125 If it is already present, it will be called two times
0 commit comments