1313logger = logging .getLogger (__name__ )
1414
1515
16- class RemoteServer (socketserver .ThreadingMixIn , socketserver . TCPServer ):
16+ class RemoteServer (socketserver .ThreadingTCPServer ):
1717 """Server for CAN communication."""
1818
1919 def __init__ (self , host = '0.0.0.0' , port = None , ** config ):
@@ -41,69 +41,57 @@ def __init__(self, host='0.0.0.0', port=None, **config):
4141class ClientBusConnection (socketserver .BaseRequestHandler ):
4242 """A client connection on the server."""
4343
44- def handle (self ):
45- #: Socket connection to client
46- self .socket = self . request
44+ def setup (self ):
45+ self . config = dict ( self . server . config )
46+ self .bus = None
4747 self .conn = can .interfaces .remote .connection .Connection ()
4848 # Threads will finish up when this is set
4949 self .stop_event = threading .Event ()
50+ self .send_thread = threading .Thread (target = self ._send_to_client ,
51+ name = 'Send to client' )
52+ self .send_thread .daemon = True
53+ self .send_tasks = {}
54+ # Register with the server
55+ self .server .clients .append (self )
5056
51- event = self ._next_event ()
52- if isinstance (event , events .BusRequest ):
53- self ._start_bus (event )
54- elif isinstance (event , events .PeriodicMessageStart ):
55- self ._start_periodic_transmit (event )
56- else :
57+ def handle (self ):
58+ bus_event = self ._next_event ()
59+ if not isinstance (bus_event , events .BusRequest ):
5760 raise RemoteServerError ('Handshake error' )
5861
59- def _start_bus (self , bus_event ):
60- config = dict (self .server .config )
61- self .config = config
62-
6362 if bus_event .version != can .interfaces .remote .PROTOCOL_VERSION :
6463 raise RemoteServerError ('Protocol version mismatch (%d != %d)' % (
6564 bus_event .version , can .interfaces .remote .PROTOCOL_VERSION ))
6665
67- config .setdefault ("bitrate" , bus_event .bitrate )
66+ self . config .setdefault ("bitrate" , bus_event .bitrate )
6867
6968 filter_event = self ._next_event ()
7069 if not isinstance (filter_event , events .FilterConfig ):
7170 raise RemoteServerError ('Handshake error' )
72- config ["can_filters" ] = filter_event .can_filters
71+ self . config ["can_filters" ] = filter_event .can_filters
7372
7473 try :
75- self .bus = can .interface .Bus (** config )
74+ self .bus = can .interface .Bus (** self . config )
7675 except Exception as e :
7776 self .conn .send_event (events .RemoteException (e ))
7877 raise
7978 else :
8079 logger .info ("Connected to bus '%s'" , self .bus .channel_info )
8180 self .conn .send_event (events .BusResponse (self .bus .channel_info ))
82- # Register with the server
83- self .server .clients .append (self )
8481 finally :
85- self .socket .sendall (self .conn .next_data ())
82+ self .request .sendall (self .conn .next_data ())
8683
87- self .send_thread = threading .Thread (target = self ._send_to_client ,
88- name = 'Send to client' )
89- self .send_thread .daemon = True
9084 self .send_thread .start ()
9185 self ._receive_from_client ()
9286
93- def _start_periodic_transmit (self , start_event ):
94- #: Cyclic send task
95- self .task = can .interface .CyclicSendTask (self .server .config ['channel' ],
96- start_event .msg ,
97- start_event .period )
98-
9987 def _next_event (self ):
10088 """Block until a new event has been received.
10189
10290 :return: Next event in queue
10391 """
10492 event = self .conn .next_event ()
10593 while event is None :
106- self .conn .receive_data (self .socket .recv (256 ))
94+ self .conn .receive_data (self .request .recv (256 ))
10795 event = self .conn .next_event ()
10896 return event
10997
@@ -129,20 +117,24 @@ def _receive_from_client(self):
129117 elif isinstance (event , events .ConnectionClosed ):
130118 break
131119 elif isinstance (event , events .PeriodicMessageStart ):
132- self .task .start ()
120+ if event .msg .arbitration_id in self .send_tasks :
121+ # Modify already existing task
122+ self .send_tasks [event .msg .arbitration_id ].modify_data (event .msg )
123+ else :
124+ # Create new task
125+ task = self .bus .send_periodic (event .msg ,
126+ event .period ,
127+ event .duration )
128+ self .send_tasks [event .msg .arbitration_id ] = task
133129 elif isinstance (event , events .PeriodicMessageStop ):
134- self .task .stop ()
135- elif isinstance (event , events .PeriodicMessageUpdate ):
136- self .task .modify_data (event .msg )
130+ self .send_tasks [event .arbitration_id ].stop ()
137131
138- logger .info ('Closing connection to %s' , self .socket .getpeername ())
132+ def finish (self ):
133+ logger .info ('Closing connection to %s' , self .request .getpeername ())
139134 # Remove itself from the server's list of clients
140135 self .server .clients .remove (self )
141136 self .stop_event .set ()
142- self .send_thread .join (1.0 )
143- self .socket .shutdown (socket .SHUT_WR )
144- self .socket .close ()
145- self .socket = None
137+ self .send_thread .join (3 )
146138
147139 def _send_to_client (self ):
148140 """Continuously read CAN messages and send to client."""
@@ -151,7 +143,7 @@ def _send_to_client(self):
151143 event = self ._next_event_from_bus (0.5 )
152144
153145 # Wait for client to be ready for new messages (max 2 seconds)
154- client_ready = len (select .select ([], [self .socket ], [], 2 )[1 ]) > 0
146+ client_ready = len (select .select ([], [self .request ], [], 2 )[1 ]) > 0
155147
156148 # Read all CAN events to buffer
157149 while event is not None :
@@ -165,7 +157,7 @@ def _send_to_client(self):
165157
166158 # Send all data at once if there is any
167159 if self .conn .data_ready () and client_ready :
168- self .socket .sendall (self .conn .next_data ())
160+ self .request .sendall (self .conn .next_data ())
169161
170162 logger .info ('Disconnecting from CAN bus' )
171163 self .bus .shutdown ()
0 commit comments