forked from hardbyte/python-can
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbroadcastmanager.py
More file actions
139 lines (106 loc) · 4.16 KB
/
broadcastmanager.py
File metadata and controls
139 lines (106 loc) · 4.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#!/usr/bin/env python
# coding: utf-8
"""
Exposes several methods for transmitting cyclic messages.
"""
import can
import abc
import logging
import sched
import threading
import time
# Module-level logger shared by the broadcast manager code, named 'can.bcm'.
log = logging.getLogger('can.bcm')
# This statement runs at import time, as part of executing the module body.
log.debug("Loading base broadcast manager functionality")
class CyclicTask(object):
    """
    Root type shared by every cyclic (periodic) task.

    :meth:`stop` is marked with :func:`abc.abstractmethod`, but this class
    derives from plain ``object`` and does not use ``abc.ABCMeta`` as its
    metaclass, so the marker is informational and is not enforced when
    instances are created.
    """

    @abc.abstractmethod
    def stop(self):
        """Cancel this periodic task.

        Concrete task types are expected to override this method; the
        implementation here does nothing.
        """
class CyclicSendTaskABC(CyclicTask):
    """
    Base for tasks that transmit a single message at a fixed period.
    """

    def __init__(self, message, period):
        """
        Record the message and its send period on the instance.

        The message's ``arbitration_id`` is also copied to ``self.can_id``.

        :param message: The :class:`can.Message` to be sent periodically.
        :param float period: The rate in seconds at which to send the message.
        """
        # Attributes are set before delegating up the MRO, as in the
        # original implementation.
        self.message = message
        self.can_id = message.arbitration_id
        self.period = period

        super(CyclicSendTaskABC, self).__init__()
class LimitedDurationCyclicSendTaskABC(CyclicSendTaskABC):

    def __init__(self, message, period, duration):
        """Message send task with a defined duration and period.

        ``message`` and ``period`` are forwarded to the next ``__init__`` in
        the MRO; ``duration`` is stored on the instance afterwards.

        :param message: The :class:`can.Message` to be sent periodically.
        :param float period: The rate in seconds at which to send the message.
        :param float duration:
            The duration to keep sending this message at given rate.
        """
        parent = super(LimitedDurationCyclicSendTaskABC, self)
        parent.__init__(message, period)
        self.duration = duration
class RestartableCyclicTaskABC(CyclicSendTaskABC):
    """Adds support for restarting a stopped cyclic task"""

    @abc.abstractmethod
    def start(self):
        """Restart a stopped periodic task.

        Meant to be overridden by concrete tasks; the implementation here
        does nothing.
        """
class ModifiableCyclicTaskABC(CyclicSendTaskABC):
    """Adds support for modifying a periodic message"""

    def modify_data(self, message):
        """Update the contents of this periodically sent message without altering
        the timing.

        The stored message object is replaced by ``message``; no other
        attribute of the task (including ``can_id`` and ``period``) is
        touched.

        :param message: The :class:`~can.Message` with new :attr:`Message.data`.
        """
        setattr(self, "message", message)
class MultiRateCyclicSendTaskABC(CyclicSendTaskABC):
    """Exposes more of the full power of the TX_SETUP opcode.

    Transmits a message `count` times at `initial_period` then
    continues to transmit message at `subsequent_period`.
    """

    def __init__(self, channel, message, count, initial_period, subsequent_period):
        """
        :param channel: The channel the message is transmitted on.
        :param message: The :class:`can.Message` to be sent periodically.
        :param int count: Number of transmissions made at `initial_period`.
        :param float initial_period:
            The rate in seconds used for the first `count` transmissions.
        :param float subsequent_period:
            The rate in seconds used after the first `count` transmissions.
            This value is stored as the task's ``period``.
        """
        # CyclicSendTaskABC.__init__ accepts only (message, period); passing
        # `channel` as an extra positional argument raised TypeError.
        super(MultiRateCyclicSendTaskABC, self).__init__(message, subsequent_period)
        # Keep the remaining arguments so concrete subclasses can use them
        # instead of having them silently discarded.
        self.channel = channel
        self.count = count
        self.initial_period = initial_period
class ThreadBasedCyclicSendTask(ModifiableCyclicTaskABC,
                                LimitedDurationCyclicSendTaskABC,
                                RestartableCyclicTaskABC):
    """Fallback cyclic send task using thread."""

    def __init__(self, bus, lock, message, period, duration=None):
        """
        Create the task and immediately start sending.

        :param bus: Object whose ``send(message)`` method is called each cycle.
        :param lock:
            Context manager held around every ``bus.send`` call so that the
            bus is not used from several threads at once.
        :param message: The :class:`can.Message` to be sent periodically.
        :param float period: The rate in seconds at which to send the message.
        :param float duration:
            How long to keep sending, in seconds. A falsy value (``None`` or
            ``0``) means no time limit.
        """
        super(ThreadBasedCyclicSendTask, self).__init__(message, period, duration)
        self.bus = bus
        self.lock = lock
        self.stopped = True
        self.thread = None
        # The deadline is computed by start() whenever a worker thread is
        # launched, so that a restarted task gets a fresh duration window.
        self.end_time = None
        self.start()

    def stop(self):
        """Ask the worker thread to finish; it exits at its next loop check."""
        self.stopped = True

    def start(self):
        """Start, or restart, the periodic transmission.

        A new daemon thread is created only when no worker thread is
        currently alive. In that case the duration deadline is recomputed
        from the current time, so a task that previously ran out its
        duration sends for a full duration again.
        """
        self.stopped = False
        if self.thread is None or not self.thread.is_alive():
            name = "Cyclic send task for 0x%X" % (self.message.arbitration_id)
            self.thread = threading.Thread(target=self._run, name=name)
            self.thread.daemon = True
            # Previously the deadline was fixed once in __init__, which made a
            # restart after expiry send a single message and exit at once.
            self.end_time = time.time() + self.duration if self.duration else None
            self.thread.start()

    def _run(self):
        """Worker loop: send the message every ``period`` seconds.

        The loop ends when :meth:`stop` has been called, when ``bus.send``
        raises (the exception is logged), or when the duration deadline has
        passed.
        """
        while not self.stopped:
            # Prevent calling bus.send from multiple threads
            with self.lock:
                started = time.time()
                try:
                    self.bus.send(self.message)
                except Exception as exc:
                    log.exception(exc)
                    break
            if self.end_time is not None and time.time() >= self.end_time:
                break
            # Compensate for the time it takes to send the message
            delay = self.period - (time.time() - started)
            time.sleep(max(0.0, delay))
def send_periodic(bus, message, period):
    """
    Send a message every `period` seconds on the given channel.

    :param bus: Passed through as the first argument of the task constructor.
    :param message: The message to transmit.
    :param period: Interval between transmissions, in seconds.
    :return: The object created by ``can.interface.CyclicSendTask``.
    """
    task_factory = can.interface.CyclicSendTask
    periodic_task = task_factory(bus, message, period)
    return periodic_task