Skip to content

Commit 8d92a05

Browse files
authored
Merge pull request #412 from hardbyte/feature-socketcan-bcm-owned-by-bus
Feature socketcan's BCM owned by Bus, better documentation, ...
2 parents 24c50df + d91da63 commit 8d92a05

6 files changed

Lines changed: 161 additions & 24 deletions

File tree

.codecov.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
# Validate with curl --data-binary @.codecov.yml https://codecov.io/validate
12
codecov:
23
archive:
34
uploads: no

can/bus.py

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ def __init__(self, channel, can_filters=None, **config):
4949
:param dict config:
5050
Any backend dependent configurations are passed in this dictionary
5151
"""
52+
self._periodic_tasks = []
5253
self.set_filters(can_filters)
5354

5455
def __str__(self):
@@ -159,18 +160,30 @@ def send(self, msg, timeout=None):
159160
"""
160161
raise NotImplementedError("Trying to write to a readonly bus?")
161162

162-
def send_periodic(self, msg, period, duration=None):
163+
def send_periodic(self, msg, period, duration=None, store_task=True):
163164
"""Start sending a message at a given period on this bus.
164165
166+
The task will be active until one of the following conditions are met:
167+
168+
- the (optional) duration expires
169+
- the Bus instance goes out of scope
170+
- the Bus instance is shutdown
171+
- :meth:`Bus.stop_all_periodic_tasks()` is called
172+
- the task's :meth:`Task.stop()` method is called.
173+
165174
:param can.Message msg:
166175
Message to transmit
167176
:param float period:
168177
Period in seconds between each message
169178
:param float duration:
170179
The duration to keep sending this message at given rate. If
171180
no duration is provided, the task will continue indefinitely.
172-
173-
:return: A started task instance
181+
:param bool store_task:
182+
If True (the default) the task will be attached to this Bus instance.
183+
Disable to instead manage tasks manually.
184+
:return:
185+
A started task instance. Note the task can be stopped (and depending on
186+
the backend modified) by calling the :meth:`stop` method.
174187
:rtype: can.broadcastmanager.CyclicSendTaskABC
175188
176189
.. note::
@@ -180,12 +193,39 @@ def send_periodic(self, msg, period, duration=None):
180193
general the message will be sent at the given rate until at
181194
least **duration** seconds.
182195
196+
.. note::
197+
198+
For extremely long running Bus instances with many short lived tasks the default
199+
api with ``store_task==True`` may not be appropriate as the stopped tasks are
200+
still taking up memory as they are associated with the Bus instance.
183201
"""
184202
if not hasattr(self, "_lock_send_periodic"):
185203
# Create a send lock for this bus
186204
self._lock_send_periodic = threading.Lock()
187-
return ThreadBasedCyclicSendTask(
188-
self, self._lock_send_periodic, msg, period, duration)
205+
task = ThreadBasedCyclicSendTask(self, self._lock_send_periodic, msg, period, duration)
206+
# we wrap the task's stop method to also remove it from the Bus's list of tasks
207+
original_stop_method = task.stop
208+
209+
def wrapped_stop_method(remove_task=True):
210+
if remove_task:
211+
try:
212+
self._periodic_tasks.remove(task)
213+
except ValueError:
214+
pass
215+
original_stop_method()
216+
task.stop = wrapped_stop_method
217+
if store_task:
218+
self._periodic_tasks.append(task)
219+
return task
220+
221+
def stop_all_periodic_tasks(self, remove_tasks=True):
222+
"""Stop sending any messages that were started using bus.send_periodic
223+
224+
:param bool remove_tasks:
225+
Stop tracking the stopped tasks.
226+
"""
227+
for task in self._periodic_tasks:
228+
task.stop(remove_task=remove_tasks)
189229

190230
def __iter__(self):
191231
"""Allow iteration on messages as they are received.

can/interfaces/socketcan/socketcan.py

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ def send_bcm(bcm_socket, data):
214214
else:
215215
raise e
216216

217+
217218
def _add_flags_to_can_id(message):
218219
can_id = message.arbitration_id
219220
if message.is_extended_id:
@@ -240,21 +241,21 @@ class CyclicSendTask(LimitedDurationCyclicSendTaskABC,
240241
241242
"""
242243

243-
def __init__(self, channel, message, period, duration=None):
244+
def __init__(self, bcm_socket, message, period, duration=None):
244245
"""
245-
:param str channel: The name of the CAN channel to connect to.
246+
:param bcm_socket: An open bcm socket on the desired CAN channel.
246247
:param can.Message message: The message to be sent periodically.
247248
:param float period: The rate in seconds at which to send the message.
248249
:param float duration: Approximate duration in seconds to send the message.
249250
"""
250251
super(CyclicSendTask, self).__init__(message, period, duration)
251-
self.channel = channel
252+
self.bcm_socket = bcm_socket
252253
self.duration = duration
253254
self._tx_setup(message)
254255
self.message = message
255256

256257
def _tx_setup(self, message):
257-
self.bcm_socket = create_bcm_socket(self.channel)
258+
258259
# Create a low level packed frame to pass to the kernel
259260
self.can_id_with_flags = _add_flags_to_can_id(message)
260261
self.flags = CAN_FD_FRAME if message.is_fd else 0
@@ -283,7 +284,6 @@ def stop(self):
283284

284285
stopframe = build_bcm_tx_delete_header(self.can_id_with_flags, self.flags)
285286
send_bcm(self.bcm_socket, stopframe)
286-
self.bcm_socket.close()
287287

288288
def modify_data(self, message):
289289
"""Update the contents of this periodically sent message.
@@ -460,8 +460,9 @@ def __init__(self, channel="", receive_own_messages=False, fd=False, **kwargs):
460460
self.socket = create_socket()
461461
self.channel = channel
462462
self.channel_info = "socketcan channel '%s'" % channel
463+
self._bcm_sockets = {}
463464

464-
# set the receive_own_messages paramater
465+
# set the receive_own_messages parameter
465466
try:
466467
self.socket.setsockopt(SOL_CAN_RAW,
467468
CAN_RAW_RECV_OWN_MSGS,
@@ -481,12 +482,17 @@ def __init__(self, channel="", receive_own_messages=False, fd=False, **kwargs):
481482
0x1FFFFFFF)
482483

483484
bind_socket(self.socket, channel)
484-
485485
kwargs.update({'receive_own_messages': receive_own_messages, 'fd': fd})
486486
super(SocketcanBus, self).__init__(channel=channel, **kwargs)
487487

488488
def shutdown(self):
489-
"""Closes the socket."""
489+
"""Stops all active periodic tasks and closes the socket."""
490+
self.stop_all_periodic_tasks()
491+
for channel in self._bcm_sockets:
492+
log.debug("Closing bcm socket for channel {}".format(channel))
493+
bcm_socket = self._bcm_sockets[channel]
494+
bcm_socket.close()
495+
log.debug("Closing raw can socket")
490496
self.socket.close()
491497

492498
def _recv_internal(self, timeout):
@@ -578,7 +584,9 @@ def send_periodic(self, msg, period, duration=None):
578584
The duration to keep sending this message at given rate. If
579585
no duration is provided, the task will continue indefinitely.
580586
581-
:return: A started task instance
587+
:return:
588+
A started task instance. This can be used to modify the data,
589+
pause/resume the transmission and to stop the transmission.
582590
:rtype: can.interfaces.socketcan.CyclicSendTask
583591
584592
.. note::
@@ -589,7 +597,15 @@ def send_periodic(self, msg, period, duration=None):
589597
least *duration* seconds.
590598
591599
"""
592-
return CyclicSendTask(msg.channel or self.channel, msg, period, duration)
600+
bcm_socket = self._get_bcm_socket(msg.channel or self.channel)
601+
task = CyclicSendTask(bcm_socket, msg, period, duration)
602+
self._periodic_tasks.append(task)
603+
return task
604+
605+
def _get_bcm_socket(self, channel):
606+
if channel not in self._bcm_sockets:
607+
self._bcm_sockets[channel] = create_bcm_socket(self.channel)
608+
return self._bcm_sockets[channel]
593609

594610
def _apply_filters(self, filters):
595611
try:

doc/bcm.rst

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1+
.. _bcm:
2+
13
Broadcast Manager
24
=================
35

46
.. module:: can.broadcastmanager
57

6-
The broadcast manager isn't yet supported by all interfaces.
7-
Currently SocketCAN and IXXAT are supported at least partially.
8-
It allows the user to setup periodic message jobs.
9-
10-
If periodic transmission is not supported natively, a software thread
8+
The broadcast manager allows the user to setup periodic message jobs.
9+
For example sending a particular message at a given period. The broadcast
10+
manager supported natively by several interfaces and a software thread
1111
based scheduler is used as a fallback.
1212

1313
This example shows the socketcan backend using the broadcast manager:
@@ -23,6 +23,10 @@ Message Sending Tasks
2323
The class based api for the broadcast manager uses a series of
2424
`mixin classes <https://www.ianlewis.org/en/mixins-and-python>`_.
2525
All mixins inherit from :class:`~can.broadcastmanager.CyclicSendTaskABC`
26+
which inherits from :class:`~can.broadcastmanager.CyclicTask`.
27+
28+
.. autoclass:: can.broadcastmanager.CyclicTask
29+
:members:
2630

2731
.. autoclass:: can.broadcastmanager.CyclicSendTaskABC
2832
:members:

doc/bus.rst

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@ class, for example::
1111
vector_bus = can.Bus(interface='vector', ...)
1212

1313
That bus is then able to handle the interface specific software/hardware interactions
14-
and implements the :class:`~can.BusABC` API. It itself is an instance of ``VectorBus``,
15-
but these specififc buses should not be instantiated directly.
14+
and implements the :class:`~can.BusABC` API.
1615

1716
A thread safe bus wrapper is also available, see `Thread safe bus`_.
1817

@@ -35,8 +34,9 @@ API
3534
Transmitting
3635
''''''''''''
3736

38-
Writing to the bus is done by calling the :meth:`~can.BusABC.send` method and
39-
passing a :class:`~can.Message` instance.
37+
Writing individual messages to the bus is done by calling the :meth:`~can.BusABC.send` method
38+
and passing a :class:`~can.Message` instance. Periodic sending is controlled by the
39+
:ref:`broadcast manager <bcm>`.
4040

4141

4242
Receiving

test/simplecyclic_test.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from .config import *
1616

17+
1718
class SimpleCyclicSendTaskTest(unittest.TestCase):
1819

1920
@unittest.skipIf(IS_CI, "the timing sensitive behaviour cannot be reproduced reliably on a CI server")
@@ -35,5 +36,80 @@ def test_cycle_time(self):
3536
bus1.shutdown()
3637
bus2.shutdown()
3738

39+
40+
def test_removing_bus_tasks(self):
41+
bus = can.interface.Bus(bustype='virtual')
42+
tasks = []
43+
for task_i in range(10):
44+
msg = can.Message(extended_id=False, arbitration_id=0x123, data=[0, 1, 2, 3, 4, 5, 6, 7])
45+
msg.arbitration_id = task_i
46+
task = bus.send_periodic(msg, 0.1, 1)
47+
tasks.append(task)
48+
self.assertIsInstance(task, can.broadcastmanager.CyclicSendTaskABC)
49+
50+
assert len(bus._periodic_tasks) == 10
51+
52+
for task in tasks:
53+
# Note calling task.stop will remove the task from the Bus's internal task management list
54+
task.stop()
55+
56+
assert len(bus._periodic_tasks) == 0
57+
bus.shutdown()
58+
59+
def test_managed_tasks(self):
60+
bus = can.interface.Bus(bustype='virtual', receive_own_messages=True)
61+
tasks = []
62+
for task_i in range(3):
63+
msg = can.Message(extended_id=False, arbitration_id=0x123, data=[0, 1, 2, 3, 4, 5, 6, 7])
64+
msg.arbitration_id = task_i
65+
task = bus.send_periodic(msg, 0.1, 10, store_task=False)
66+
tasks.append(task)
67+
self.assertIsInstance(task, can.broadcastmanager.CyclicSendTaskABC)
68+
69+
assert len(bus._periodic_tasks) == 0
70+
71+
# Self managed tasks should still be sending messages
72+
for _ in range(50):
73+
received_msg = bus.recv(timeout=5.0)
74+
assert received_msg is not None
75+
assert received_msg.arbitration_id in {0, 1, 2}
76+
77+
for task in tasks:
78+
task.stop()
79+
80+
for task in tasks:
81+
assert task.thread.join(5.0) is None, "Task didn't stop before timeout"
82+
83+
bus.shutdown()
84+
85+
def test_stopping_perodic_tasks(self):
86+
bus = can.interface.Bus(bustype='virtual')
87+
tasks = []
88+
for task_i in range(10):
89+
msg = can.Message(extended_id=False, arbitration_id=0x123, data=[0, 1, 2, 3, 4, 5, 6, 7])
90+
msg.arbitration_id = task_i
91+
task = bus.send_periodic(msg, 0.1, 1)
92+
tasks.append(task)
93+
94+
assert len(bus._periodic_tasks) == 10
95+
# stop half the tasks using the task object
96+
for task in tasks[::2]:
97+
task.stop()
98+
99+
assert len(bus._periodic_tasks) == 5
100+
101+
# stop the other half using the bus api
102+
bus.stop_all_periodic_tasks(remove_tasks=False)
103+
104+
for task in tasks:
105+
assert task.thread.join(5.0) is None, "Task didn't stop before timeout"
106+
107+
# Tasks stopped via `stop_all_periodic_tasks` with remove_tasks=False should
108+
# still be associated with the bus (e.g. for restarting)
109+
assert len(bus._periodic_tasks) == 5
110+
111+
bus.shutdown()
112+
113+
38114
if __name__ == '__main__':
39115
unittest.main()

0 commit comments

Comments
 (0)