Skip to content

Commit 1d7f088

Browse files
Update remote interface with periodic support
1 parent 90d44d0 commit 1d7f088

5 files changed

Lines changed: 87 additions & 106 deletions

File tree

can/interfaces/remote/client.py

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -89,67 +89,67 @@ def recv(self, timeout=None):
8989
return event.msg
9090
elif isinstance(event, events.RemoteException):
9191
raise event.exc
92+
elif isinstance(event, events.ConnectionClosed):
93+
raise CanRemoteError("Server closed connection unexpectedly")
9294
return None
9395

94-
def send(self, msg):
96+
def send(self, msg, timeout=None):
9597
"""Transmit a message to CAN bus.
9698
9799
:param can.Message msg: A Message object.
98100
:raises can.interfaces.remote.CanRemoteError:
99101
On failed transmission to socket.
100102
"""
101-
self.conn.send_event(events.CanMessage(msg))
103+
self.send_event(events.CanMessage(msg))
104+
105+
def send_event(self, event):
106+
self.conn.send_event(event)
102107
try:
103108
self.socket.sendall(self.conn.next_data())
104109
except OSError as e:
105110
raise CanRemoteError(str(e))
106111

112+
def send_periodic(self, message, period, duration=None):
113+
return CyclicSendTask(self, message, period, duration)
114+
107115
def shutdown(self):
108116
"""Close socket connection."""
109117
# Give threads a chance to finish up
118+
logger.debug('Closing connection to server')
110119
self.socket.shutdown(socket.SHUT_WR)
111120
while not isinstance(self._next_event(1), events.ConnectionClosed):
112121
pass
113122
self.socket.close()
114123
logger.debug('Network connection closed')
115124

116125

117-
class CyclicSendTask(can.broadcastmanager.CyclicSendTaskABC):
126+
class CyclicSendTask(can.broadcastmanager.LimitedDurationCyclicSendTaskABC,
127+
can.broadcastmanager.RestartableCyclicTaskABC,
128+
can.broadcastmanager.ModifiableCyclicTaskABC):
118129

119-
def __init__(self, channel, message, period):
130+
def __init__(self, bus, message, period, duration=None):
120131
"""
121-
:param channel: The name of the CAN channel to connect to.
132+
:param bus: The remote connection to use.
122133
:param message: The message to be sent periodically.
123134
:param period: The rate in seconds at which to send the message.
124135
"""
125-
super(CyclicSendTask, self).__init__(channel, message, period)
126-
self.message = message
127-
self.period = period
128-
self.socket = create_connection(channel)
129-
self.conn = connection.Connection()
136+
self.bus = bus
137+
super(CyclicSendTask, self).__init__(message, period, duration)
130138
self.start()
131139

132-
def __del__(self):
133-
self.stop()
134-
self.socket.close()
135-
136140
def start(self):
137-
self._send_event(
138-
events.PeriodicMessageStart(self.message, self.period))
141+
self.bus.send_event(
142+
events.PeriodicMessageStart(self.message, self.period, self.duration))
139143

140144
def stop(self):
141-
self._send_event(
145+
self.bus.send_event(
142146
events.PeriodicMessageStop(self.message.arbitration_id))
143147

144148
def modify_data(self, message):
145149
assert message.arbitration_id == self.message.arbitration_id
146-
self._send_event(
147-
events.PeriodicMessageUpdate(
148-
self.message.arbitration_id, message.data))
149-
150-
def _send_event(self, event):
151-
self.conn.send_event(event)
152-
self.socket.sendall(self.conn.next_data())
150+
self.message = message
151+
event = events.PeriodicMessageStart(self.message, self.period)
152+
self.bus.send_event(event)
153153

154154

155155
class CanRemoteError(can.CanError):

can/interfaces/remote/connection.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,5 @@ class ProtocolError(Exception):
121121
event_types.register(events.RemoteException)
122122
event_types.register(events.PeriodicMessageStart)
123123
event_types.register(events.PeriodicMessageStop)
124-
event_types.register(events.PeriodicMessageUpdate)
125124
event_types.register(events.FilterConfig)
126125
event_types.register(events.ConnectionClosed)

can/interfaces/remote/events.py

Lines changed: 13 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,9 @@ class PeriodicMessageStart(BaseEvent):
273273
+--------+-------+--------------------------------------------------------+
274274
| Byte | Type | Contents |
275275
+========+=======+========================================================+
276-
| 0 - 7 | F64 | Period (s) |
276+
| 0 - 3 | U32 | Period (ms) |
277+
+--------+-------+--------------------------------------------------------+
278+
| 4 - 7 | U32 | Duration (ms) |
277279
+--------+-------+--------------------------------------------------------+
278280
| 8 - 11 | U32 | Arbitration ID |
279281
+--------+-------+--------------------------------------------------------+
@@ -288,9 +290,9 @@ class PeriodicMessageStart(BaseEvent):
288290
#: Event ID
289291
EVENT_ID = 7
290292

291-
_STRUCT = struct.Struct('>dlBB8s')
293+
_STRUCT = struct.Struct('>lllBB8s')
292294

293-
def __init__(self, msg, period):
295+
def __init__(self, msg, period, duration=None):
294296
"""
295297
:param can.Message msg:
296298
A Message object.
@@ -300,9 +302,12 @@ def __init__(self, msg, period):
300302
#: A :class:`can.Message` instance.
301303
self.msg = msg
302304
self.period = period
305+
self.duration = duration
303306

304307
def encode(self):
305-
buf = self._STRUCT.pack(self.period,
308+
duration = int(self.duration * 1000) if self.duration is not None else 0
309+
buf = self._STRUCT.pack(int(self.period * 1000),
310+
duration,
306311
self.msg.arbitration_id,
307312
self.msg.dlc,
308313
self.msg.id_type,
@@ -312,15 +317,17 @@ def encode(self):
312317
@classmethod
313318
def from_buffer(cls, buf):
314319
try:
315-
period, arb_id, dlc, extended, data = cls._STRUCT.unpack_from(buf)
320+
(period, duration, arb_id, dlc, extended,
321+
data) = cls._STRUCT.unpack_from(buf)
316322
except struct.error:
317323
raise NeedMoreDataError()
318324

319325
msg = can.Message(arbitration_id=arb_id,
320326
extended_id=extended,
321327
dlc=dlc,
322328
data=data[:dlc])
323-
return cls(msg, period)
329+
duration = duration / 1000.0 if duration > 0 else None
330+
return cls(msg, period / 1000.0, duration)
324331

325332
def __len__(self):
326333
return self._STRUCT.size
@@ -364,31 +371,6 @@ def __len__(self):
364371
return self._STRUCT.size
365372

366373

367-
class PeriodicMessageUpdate(CanMessage):
368-
"""Stop periodic transmission of a message.
369-
370-
+--------+-------+--------------------------------------------------------+
371-
| Byte | Type | Contents |
372-
+========+=======+========================================================+
373-
| 0 - 7 | | Reserved |
374-
+--------+-------+--------------------------------------------------------+
375-
| 8 - 11 | U32 | Arbitration ID |
376-
+--------+-------+--------------------------------------------------------+
377-
| 12 | U8 | DLC |
378-
+--------+-------+--------------------------------------------------------+
379-
| 13 | U8 | Flags: |
380-
| | | - Bit 0: Extended ID |
381-
| | | - Bit 1: Remote frame |
382-
| | | - Bit 2: Error frame |
383-
+--------+-------+--------------------------------------------------------+
384-
| 14 - 21| U8 | Data padded to an 8 byte array |
385-
+--------+-------+--------------------------------------------------------+
386-
"""
387-
388-
#: Event ID
389-
EVENT_ID = 9
390-
391-
392374
class FilterConfig(BaseEvent):
393375
"""CAN filter configuration.
394376

can/interfaces/remote/server.py

Lines changed: 33 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
logger = logging.getLogger(__name__)
1414

1515

16-
class RemoteServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
16+
class RemoteServer(socketserver.ThreadingTCPServer):
1717
"""Server for CAN communication."""
1818

1919
def __init__(self, host='0.0.0.0', port=None, **config):
@@ -41,69 +41,57 @@ def __init__(self, host='0.0.0.0', port=None, **config):
4141
class ClientBusConnection(socketserver.BaseRequestHandler):
4242
"""A client connection on the server."""
4343

44-
def handle(self):
45-
#: Socket connection to client
46-
self.socket = self.request
44+
def setup(self):
45+
self.config = dict(self.server.config)
46+
self.bus = None
4747
self.conn = can.interfaces.remote.connection.Connection()
4848
# Threads will finish up when this is set
4949
self.stop_event = threading.Event()
50+
self.send_thread = threading.Thread(target=self._send_to_client,
51+
name='Send to client')
52+
self.send_thread.daemon = True
53+
self.send_tasks = {}
54+
# Register with the server
55+
self.server.clients.append(self)
5056

51-
event = self._next_event()
52-
if isinstance(event, events.BusRequest):
53-
self._start_bus(event)
54-
elif isinstance(event, events.PeriodicMessageStart):
55-
self._start_periodic_transmit(event)
56-
else:
57+
def handle(self):
58+
bus_event = self._next_event()
59+
if not isinstance(bus_event, events.BusRequest):
5760
raise RemoteServerError('Handshake error')
5861

59-
def _start_bus(self, bus_event):
60-
config = dict(self.server.config)
61-
self.config = config
62-
6362
if bus_event.version != can.interfaces.remote.PROTOCOL_VERSION:
6463
raise RemoteServerError('Protocol version mismatch (%d != %d)' % (
6564
bus_event.version, can.interfaces.remote.PROTOCOL_VERSION))
6665

67-
config.setdefault("bitrate", bus_event.bitrate)
66+
self.config.setdefault("bitrate", bus_event.bitrate)
6867

6968
filter_event = self._next_event()
7069
if not isinstance(filter_event, events.FilterConfig):
7170
raise RemoteServerError('Handshake error')
72-
config["can_filters"] = filter_event.can_filters
71+
self.config["can_filters"] = filter_event.can_filters
7372

7473
try:
75-
self.bus = can.interface.Bus(**config)
74+
self.bus = can.interface.Bus(**self.config)
7675
except Exception as e:
7776
self.conn.send_event(events.RemoteException(e))
7877
raise
7978
else:
8079
logger.info("Connected to bus '%s'", self.bus.channel_info)
8180
self.conn.send_event(events.BusResponse(self.bus.channel_info))
82-
# Register with the server
83-
self.server.clients.append(self)
8481
finally:
85-
self.socket.sendall(self.conn.next_data())
82+
self.request.sendall(self.conn.next_data())
8683

87-
self.send_thread = threading.Thread(target=self._send_to_client,
88-
name='Send to client')
89-
self.send_thread.daemon = True
9084
self.send_thread.start()
9185
self._receive_from_client()
9286

93-
def _start_periodic_transmit(self, start_event):
94-
#: Cyclic send task
95-
self.task = can.interface.CyclicSendTask(self.server.config['channel'],
96-
start_event.msg,
97-
start_event.period)
98-
9987
def _next_event(self):
10088
"""Block until a new event has been received.
10189
10290
:return: Next event in queue
10391
"""
10492
event = self.conn.next_event()
10593
while event is None:
106-
self.conn.receive_data(self.socket.recv(256))
94+
self.conn.receive_data(self.request.recv(256))
10795
event = self.conn.next_event()
10896
return event
10997

@@ -129,20 +117,24 @@ def _receive_from_client(self):
129117
elif isinstance(event, events.ConnectionClosed):
130118
break
131119
elif isinstance(event, events.PeriodicMessageStart):
132-
self.task.start()
120+
if event.msg.arbitration_id in self.send_tasks:
121+
# Modify already existing task
122+
self.send_tasks[event.msg.arbitration_id].modify_data(event.msg)
123+
else:
124+
# Create new task
125+
task = self.bus.send_periodic(event.msg,
126+
event.period,
127+
event.duration)
128+
self.send_tasks[event.msg.arbitration_id] = task
133129
elif isinstance(event, events.PeriodicMessageStop):
134-
self.task.stop()
135-
elif isinstance(event, events.PeriodicMessageUpdate):
136-
self.task.modify_data(event.msg)
130+
self.send_tasks[event.arbitration_id].stop()
137131

138-
logger.info('Closing connection to %s', self.socket.getpeername())
132+
def finish(self):
133+
logger.info('Closing connection to %s', self.request.getpeername())
139134
# Remove itself from the server's list of clients
140135
self.server.clients.remove(self)
141136
self.stop_event.set()
142-
self.send_thread.join(1.0)
143-
self.socket.shutdown(socket.SHUT_WR)
144-
self.socket.close()
145-
self.socket = None
137+
self.send_thread.join(3)
146138

147139
def _send_to_client(self):
148140
"""Continuously read CAN messages and send to client."""
@@ -151,7 +143,7 @@ def _send_to_client(self):
151143
event = self._next_event_from_bus(0.5)
152144

153145
# Wait for client to be ready for new messages (max 2 seconds)
154-
client_ready = len(select.select([], [self.socket], [], 2)[1]) > 0
146+
client_ready = len(select.select([], [self.request], [], 2)[1]) > 0
155147

156148
# Read all CAN events to buffer
157149
while event is not None:
@@ -165,7 +157,7 @@ def _send_to_client(self):
165157

166158
# Send all data at once if there is any
167159
if self.conn.data_ready() and client_ready:
168-
self.socket.sendall(self.conn.next_data())
160+
self.request.sendall(self.conn.next_data())
169161

170162
logger.info('Disconnecting from CAN bus')
171163
self.bus.shutdown()

test/remote_test.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,12 @@ def test_periodic_start(self):
8282
msg = can.Message(0x123,
8383
extended_id=False,
8484
data=[1, 2, 3, 4, 5, 6, 7, 8])
85-
event1 = events.PeriodicMessageStart(msg, 0.01)
85+
event1 = events.PeriodicMessageStart(msg, 0.01, 10)
8686
buf = event1.encode()
8787
event2 = events.PeriodicMessageStart.from_buffer(buf)
8888
self.assertEqual(event1, event2)
8989
self.assertAlmostEqual(event1.period, event2.period)
90+
self.assertAlmostEqual(event1.duration, event2.duration)
9091
self.assertEqual(len(event2), len(buf))
9192

9293

@@ -177,8 +178,8 @@ def setUp(self):
177178
time.sleep(0.1)
178179

179180
def tearDown(self):
180-
self.remote_bus.shutdown()
181181
self.real_bus.shutdown()
182+
self.remote_bus.shutdown()
182183

183184
def test_initialization(self):
184185
self.assertEqual(self.remote_bus.channel_info,
@@ -231,13 +232,20 @@ def test_recv_failure(self):
231232
with self.assertRaisesRegexp(can.CanError, 'This is some error'):
232233
self.remote_bus.recv(5)
233234

234-
def _test_cyclic(self):
235-
can.rc['interface'] = 'remote'
236-
can.rc['channel'] = '127.0.0.1:54700'
237-
msg = can.Message(arbitration_id=0xabcdef,
238-
data=[1, 2, 3, 4, 5, 6, 7, 8])
239-
task = can.interfaces.remote.CyclicSendTask('127.0.0.1:54700', msg, 0.1)
240-
time.sleep(3)
235+
def test_cyclic(self):
236+
test_msg = can.Message(arbitration_id=0xabcdef,
237+
data=[1, 2, 3, 4, 5, 6, 7, 8])
238+
task = self.remote_bus.send_periodic(test_msg, 0.01)
239+
time.sleep(2)
240+
task.stop()
241+
msgs = []
242+
msg = self.real_bus.recv(0)
243+
while msg is not None:
244+
msg = self.real_bus.recv(0)
245+
if msg is not None:
246+
msgs.append(msg)
247+
self.assertTrue(150 < len(msgs) < 220)
248+
self.assertEqual(msgs[0], test_msg)
241249

242250

243251
if __name__ == '__main__':

0 commit comments

Comments
 (0)