Skip to content

Commit 7ddea9e

Browse files
authored
Add udp multicast bus (hardbyte#644)
Adds a virtual bus interface udp_multicast that allows multiple processes - potentially on different hosts - to exchange CAN data. This is achieved by serializing the CAN frames using MessagePack, wrapping them in IPv6/4 packets and sending them to a multicast group. The TTL of the IP packets is set to one by default, disallowing the packet to escape the localhost. It is configurable however, and would allow the communicating processes to run on different machines as well.
1 parent ad7a908 commit 7ddea9e

16 files changed

Lines changed: 692 additions & 62 deletions

.travis.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ jobs:
117117
can/broadcastmanager.py
118118
can/bus.py
119119
can/interface.py
120+
can/interfaces/udp_multicast/**.py
120121
can/interfaces/slcan.py
121122
can/interfaces/socketcan/**.py
122123
can/interfaces/virtual.py

can/interfaces/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
"nican": ("can.interfaces.nican", "NicanBus"),
1818
"iscan": ("can.interfaces.iscan", "IscanBus"),
1919
"virtual": ("can.interfaces.virtual", "VirtualBus"),
20+
"udp_multicast": ("can.interfaces.udp_multicast", "UdpMulticastBus"),
2021
"neovi": ("can.interfaces.ics_neovi", "NeoViBus"),
2122
"vector": ("can.interfaces.vector", "VectorBus"),
2223
"slcan": ("can.interfaces.slcan", "slcanBus"),
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
"""A module to allow CAN over UDP on IPv4/IPv6 multicast."""
2+
3+
from .bus import UdpMulticastBus
Lines changed: 328 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,328 @@
1+
import logging
2+
import select
3+
import socket
4+
import struct
5+
6+
from typing import List, Optional, Tuple, Union
7+
8+
log = logging.getLogger(__name__)
9+
10+
import can
11+
from can import BusABC
12+
from can.typechecking import AutoDetectedConfig
13+
14+
from .utils import pack_message, unpack_message, check_msgpack_installed
15+
16+
17+
# see socket.getaddrinfo()
18+
IPv4_ADDRESS_INFO = Tuple[str, int] # address, port
19+
IPv6_ADDRESS_INFO = Tuple[str, int, int, int] # address, port, flowinfo, scope_id
20+
IP_ADDRESS_INFO = Union[IPv4_ADDRESS_INFO, IPv6_ADDRESS_INFO]
21+
22+
# Additional constants for the interaction with Unix kernels
23+
SO_TIMESTAMPNS = 35
24+
25+
26+
class UdpMulticastBus(BusABC):
27+
"""A virtual interface for CAN communications between multiple processes using UDP over Multicast IP.
28+
29+
It supports IPv4 and IPv6, specified via the channel (which really is just a multicast IP address as a
30+
string). You can also specify the port and the IPv6 *hop limit*/the IPv4 *time to live* (TTL).
31+
32+
This bus does not support filtering based on message IDs on the kernel level but instead provides it in
33+
user space (in Python) as a fallback.
34+
35+
Both default addresses should allow for multi-host CAN networks in a normal local area network (LAN) where
36+
multicast is enabled.
37+
38+
.. note::
39+
The auto-detection of available interfaces (see) is implemented using heuristic that checks if the
40+
required socket operations are available. It then returns two configurations, one based on
41+
the :attr:`~UdpMulticastBus.DEFAULT_GROUP_IPv6` address and another one based on
42+
the :attr:`~UdpMulticastBus.DEFAULT_GROUP_IPv4` address.
43+
44+
.. warning::
45+
The parameter `receive_own_messages` is currently unsupported and setting it to `True` will raise an
46+
exception.
47+
48+
.. warning::
49+
This interface does not make guarantees on reliable delivery and message ordering, and also does not
50+
implement rate limiting or ID arbitration/prioritization under high loads. Please refer to the section
51+
:ref:`other_virtual_interfaces` for more information on this and a comparison to alternatives.
52+
53+
:param channel: A multicast IPv4 address (in `224.0.0.0/4`) or an IPv6 address (in `ff00::/8`).
54+
This defines which version of IP is used. See
55+
`Wikipedia ("Multicast address") <https://en.wikipedia.org/wiki/Multicast_address>`__
56+
for more details on the addressing schemes.
57+
Defaults to :attr:`~UdpMulticastBus.DEFAULT_GROUP_IPv6`.
58+
:param port: The IP port to read from and write to.
59+
:param hop_limit: The hop limit in IPv6 or in IPv4 the time to live (TTL).
60+
:param receive_own_messages: If transmitted messages should also be received by this bus.
61+
CURRENTLY UNSUPPORTED.
62+
:param fd:
63+
If CAN-FD frames should be supported. If set to false, an error will be raised upon sending such a
64+
frame and such received frames will be ignored.
65+
:param can_filters: See :meth:`~can.BusABC.set_filters`.
66+
67+
:raises RuntimeError: If the *msgpack*-dependency is not available. It should be installed on all
68+
non Windows platforms via the `setup.py` requirements.
69+
:raises NotImplementedError: If the `receive_own_messages` is passed as `True`.
70+
"""
71+
72+
#: An arbitrary IPv6 multicast address with "site-local" scope, i.e. only to be routed within the local
73+
#: physical network and not beyond it. It should allow for multi-host CAN networks in a normal IPv6 LAN.
74+
#: This is the default channel and should work with most modern routers if multicast is allowed.
75+
DEFAULT_GROUP_IPv6 = "ff15:7079:7468:6f6e:6465:6d6f:6d63:6173"
76+
77+
#: An arbitrary IPv4 multicast address with "administrative" scope, i.e. only to be routed within
78+
#: administrative organizational boundaries and not beyond it.
79+
#: It should allow for multi-host CAN networks in a normal IPv4 LAN.
80+
#: This is provided as a default fallback channel if IPv6 is (still) not supported.
81+
DEFAULT_GROUP_IPv4 = "239.74.163.2"
82+
83+
def __init__(
84+
self,
85+
channel: str = DEFAULT_GROUP_IPv6,
86+
port: int = 43113,
87+
hop_limit: int = 1,
88+
receive_own_messages: bool = False,
89+
fd: bool = True,
90+
**kwargs,
91+
) -> None:
92+
check_msgpack_installed()
93+
94+
if receive_own_messages:
95+
raise NotImplementedError("receiving own messages is not yet implemented")
96+
97+
super().__init__(channel, **kwargs)
98+
99+
self.is_fd = fd
100+
self._multicast = GeneralPurposeUdpMulticastBus(channel, port, hop_limit)
101+
102+
def _recv_internal(self, timeout: Optional[float]):
103+
result = self._multicast.recv(timeout)
104+
if not result:
105+
return None, False
106+
107+
data, _, timestamp = result
108+
can_message = unpack_message(data, replace={"timestamp": timestamp})
109+
110+
if not self.is_fd and can_message.is_fd:
111+
return None, False
112+
113+
return can_message, False
114+
115+
def send(self, message: can.Message, timeout: Optional[float] = None) -> None:
116+
if not self.is_fd and message.is_fd:
117+
raise RuntimeError("cannot send FD message over bus with CAN FD disabled")
118+
119+
data = pack_message(message)
120+
self._multicast.send(data, timeout)
121+
122+
def fileno(self) -> int:
123+
"""Provides the internally used file descriptor of the socket or `-1` if not available."""
124+
return self._multicast.fileno()
125+
126+
def shutdown(self) -> None:
127+
"""Close all sockets and free up any resources.
128+
129+
Never throws errors and only logs them.
130+
"""
131+
self._multicast.shutdown()
132+
133+
@staticmethod
134+
def _detect_available_configs() -> List[AutoDetectedConfig]:
135+
if hasattr(socket, "CMSG_SPACE"):
136+
return [
137+
{
138+
"interface": "udp_multicast",
139+
"channel": UdpMulticastBus.DEFAULT_GROUP_IPv6,
140+
},
141+
{
142+
"interface": "udp_multicast",
143+
"channel": UdpMulticastBus.DEFAULT_GROUP_IPv4,
144+
},
145+
]
146+
147+
# else, this interface cannot be used
148+
return []
149+
150+
151+
class GeneralPurposeUdpMulticastBus:
152+
"""A general purpose send and receive handler for multicast over IP/UDP."""
153+
154+
def __init__(
155+
self, group: str, port: int, hop_limit: int, max_buffer: int = 4096
156+
) -> None:
157+
self.group = group
158+
self.port = port
159+
self.hop_limit = hop_limit
160+
self.max_buffer = max_buffer
161+
162+
# Look up multicast group address in name server and find out IP version of the first suitable target
163+
# and then get the address family of it (socket.AF_INET or socket.AF_INET6)
164+
connection_candidates = socket.getaddrinfo( # type: ignore
165+
group, self.port, type=socket.SOCK_DGRAM
166+
)
167+
sock = None
168+
for connection_candidate in connection_candidates:
169+
address_family: socket.AddressFamily = connection_candidate[0]
170+
self.ip_version = 4 if address_family == socket.AF_INET else 6
171+
try:
172+
sock = self._create_socket(address_family)
173+
except OSError as error:
174+
log.info(
175+
f"could not connect to the multicast IP network of candidate %s; reason: {error}",
176+
connection_candidates,
177+
)
178+
if sock is not None:
179+
self._socket = sock
180+
else:
181+
raise RuntimeError("could not connect to a multicast IP network")
182+
183+
# used in recv()
184+
self.received_timestamp_struct = "@II"
185+
ancillary_data_size = struct.calcsize(self.received_timestamp_struct)
186+
self.received_ancillary_buffer_size = socket.CMSG_SPACE(ancillary_data_size)
187+
188+
# used by send()
189+
self._send_destination = (self.group, self.port)
190+
self._last_send_timeout: Optional[float] = None
191+
192+
def _create_socket(self, address_family: socket.AddressFamily) -> socket.socket:
193+
"""Creates a new socket. This might fail and raise an exception!
194+
195+
:param address_family: whether this is of type `socket.AF_INET` or `socket.AF_INET6`
196+
:raises OSError: if the socket could not be opened or configured correctly; in this case, it is
197+
guaranteed to be closed/cleaned up
198+
"""
199+
# create the UDP socket
200+
# this might already fail but then there is nothing to clean up
201+
sock = socket.socket(address_family, socket.SOCK_DGRAM)
202+
203+
# configure the socket
204+
try:
205+
206+
# set hop limit / TTL
207+
ttl_as_binary = struct.pack("@I", self.hop_limit)
208+
if self.ip_version == 4:
209+
sock.setsockopt(
210+
socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl_as_binary
211+
)
212+
else:
213+
sock.setsockopt(
214+
socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl_as_binary
215+
)
216+
217+
# Allow multiple programs to access that address + port
218+
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
219+
220+
# set how to receive timestamps
221+
sock.setsockopt(socket.SOL_SOCKET, SO_TIMESTAMPNS, 1)
222+
223+
# Bind it to the port (on any interface)
224+
sock.bind(("", self.port))
225+
226+
# Join the multicast group
227+
group_as_binary = socket.inet_pton(address_family, self.group)
228+
if self.ip_version == 4:
229+
request = group_as_binary + struct.pack("@I", socket.INADDR_ANY)
230+
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, request)
231+
else:
232+
request = group_as_binary + struct.pack("@I", 0)
233+
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, request)
234+
235+
return sock
236+
237+
except OSError as error:
238+
# clean up the incompletely configured but opened socket
239+
try:
240+
sock.close()
241+
except OSError as close_error:
242+
# ignore but log any failures in here
243+
log.warning("Could not close partly configured socket: %s", close_error)
244+
245+
# still raise the error
246+
raise error
247+
248+
def send(self, data: bytes, timeout: Optional[float] = None) -> None:
249+
"""Send data to all group members. This call blocks.
250+
251+
:param timeout: the timeout in seconds after which an Exception is raised is sending has failed
252+
:param data: the data to be sent
253+
:raises OSError: if an error occurred while writing to the underlying socket
254+
:raises socket.timeout: if the timeout ran out before sending was completed (this is a subclass of
255+
*OSError*)
256+
"""
257+
if timeout != self._last_send_timeout:
258+
self._last_send_timeout = timeout
259+
# this applies to all blocking calls on the socket, but sending is the only one that is blocking
260+
self._socket.settimeout(timeout)
261+
262+
bytes_sent = self._socket.sendto(data, self._send_destination)
263+
if bytes_sent < len(data):
264+
raise socket.timeout()
265+
266+
def recv(
267+
self, timeout: Optional[float] = None
268+
) -> Optional[Tuple[bytes, IP_ADDRESS_INFO, float]]:
269+
"""
270+
Receive up to **max_buffer** bytes.
271+
272+
:param timeout: the timeout in seconds after which `None` is returned if no data arrived
273+
:returns: `None` on timeout, or a 3-tuple comprised of:
274+
- received data,
275+
- the sender of the data, and
276+
- a timestamp in seconds
277+
"""
278+
# get all sockets that are ready (can be a list with a single value
279+
# being self.socket or an empty list if self.socket is not ready)
280+
try:
281+
# get all sockets that are ready (can be a list with a single value
282+
# being self.socket or an empty list if self.socket is not ready)
283+
ready_receive_sockets, _, _ = select.select([self._socket], [], [], timeout)
284+
except socket.error as exc:
285+
# something bad (not a timeout) happened (e.g. the interface went down)
286+
raise can.CanError(f"Failed to wait for IP/UDP socket: {exc}")
287+
288+
if ready_receive_sockets: # not empty
289+
# fetch data & source address
290+
(
291+
raw_message_data,
292+
ancillary_data,
293+
_, # flags
294+
sender_address,
295+
) = self._socket.recvmsg(
296+
self.max_buffer, self.received_ancillary_buffer_size
297+
)
298+
299+
# fetch timestamp; this is configured in in _create_socket()
300+
assert len(ancillary_data) == 1, "only requested a single extra field"
301+
cmsg_level, cmsg_type, cmsg_data = ancillary_data[0]
302+
assert (
303+
cmsg_level == socket.SOL_SOCKET and cmsg_type == SO_TIMESTAMPNS
304+
), "received control message type that was not requested"
305+
# see https://man7.org/linux/man-pages/man3/timespec.3.html -> struct timespec for details
306+
seconds, nanoseconds = struct.unpack(
307+
self.received_timestamp_struct, cmsg_data
308+
)
309+
timestamp = seconds + nanoseconds * 1.0e-9
310+
311+
return raw_message_data, sender_address, timestamp
312+
313+
# socket wasn't readable or timeout occurred
314+
return None
315+
316+
def fileno(self) -> int:
317+
"""Provides the internally used file descriptor of the socket or `-1` if not available."""
318+
return self._socket.fileno()
319+
320+
def shutdown(self) -> None:
321+
"""Close all sockets and free up any resources.
322+
323+
Never throws errors and only logs them.
324+
"""
325+
try:
326+
self._socket.close()
327+
except OSError as exception:
328+
log.error("could not close IP socket: %s", exception)

0 commit comments

Comments
 (0)