|
| 1 | +import logging |
| 2 | +import select |
| 3 | +import socket |
| 4 | +import struct |
| 5 | + |
| 6 | +from typing import List, Optional, Tuple, Union |
| 7 | + |
| 8 | +log = logging.getLogger(__name__) |
| 9 | + |
| 10 | +import can |
| 11 | +from can import BusABC |
| 12 | +from can.typechecking import AutoDetectedConfig |
| 13 | + |
| 14 | +from .utils import pack_message, unpack_message, check_msgpack_installed |
| 15 | + |
| 16 | + |
| 17 | +# see socket.getaddrinfo() |
| 18 | +IPv4_ADDRESS_INFO = Tuple[str, int] # address, port |
| 19 | +IPv6_ADDRESS_INFO = Tuple[str, int, int, int] # address, port, flowinfo, scope_id |
| 20 | +IP_ADDRESS_INFO = Union[IPv4_ADDRESS_INFO, IPv6_ADDRESS_INFO] |
| 21 | + |
| 22 | +# Additional constants for the interaction with Unix kernels |
| 23 | +SO_TIMESTAMPNS = 35 |
| 24 | + |
| 25 | + |
| 26 | +class UdpMulticastBus(BusABC): |
| 27 | + """A virtual interface for CAN communications between multiple processes using UDP over Multicast IP. |
| 28 | +
|
| 29 | + It supports IPv4 and IPv6, specified via the channel (which really is just a multicast IP address as a |
| 30 | + string). You can also specify the port and the IPv6 *hop limit*/the IPv4 *time to live* (TTL). |
| 31 | +
|
| 32 | + This bus does not support filtering based on message IDs on the kernel level but instead provides it in |
| 33 | + user space (in Python) as a fallback. |
| 34 | +
|
| 35 | + Both default addresses should allow for multi-host CAN networks in a normal local area network (LAN) where |
| 36 | + multicast is enabled. |
| 37 | +
|
| 38 | + .. note:: |
| 39 | + The auto-detection of available interfaces (see) is implemented using heuristic that checks if the |
| 40 | + required socket operations are available. It then returns two configurations, one based on |
| 41 | + the :attr:`~UdpMulticastBus.DEFAULT_GROUP_IPv6` address and another one based on |
| 42 | + the :attr:`~UdpMulticastBus.DEFAULT_GROUP_IPv4` address. |
| 43 | +
|
| 44 | + .. warning:: |
| 45 | + The parameter `receive_own_messages` is currently unsupported and setting it to `True` will raise an |
| 46 | + exception. |
| 47 | +
|
| 48 | + .. warning:: |
| 49 | + This interface does not make guarantees on reliable delivery and message ordering, and also does not |
| 50 | + implement rate limiting or ID arbitration/prioritization under high loads. Please refer to the section |
| 51 | + :ref:`other_virtual_interfaces` for more information on this and a comparison to alternatives. |
| 52 | +
|
| 53 | + :param channel: A multicast IPv4 address (in `224.0.0.0/4`) or an IPv6 address (in `ff00::/8`). |
| 54 | + This defines which version of IP is used. See |
| 55 | + `Wikipedia ("Multicast address") <https://en.wikipedia.org/wiki/Multicast_address>`__ |
| 56 | + for more details on the addressing schemes. |
| 57 | + Defaults to :attr:`~UdpMulticastBus.DEFAULT_GROUP_IPv6`. |
| 58 | + :param port: The IP port to read from and write to. |
| 59 | + :param hop_limit: The hop limit in IPv6 or in IPv4 the time to live (TTL). |
| 60 | + :param receive_own_messages: If transmitted messages should also be received by this bus. |
| 61 | + CURRENTLY UNSUPPORTED. |
| 62 | + :param fd: |
| 63 | + If CAN-FD frames should be supported. If set to false, an error will be raised upon sending such a |
| 64 | + frame and such received frames will be ignored. |
| 65 | + :param can_filters: See :meth:`~can.BusABC.set_filters`. |
| 66 | +
|
| 67 | + :raises RuntimeError: If the *msgpack*-dependency is not available. It should be installed on all |
| 68 | + non Windows platforms via the `setup.py` requirements. |
| 69 | + :raises NotImplementedError: If the `receive_own_messages` is passed as `True`. |
| 70 | + """ |
| 71 | + |
| 72 | + #: An arbitrary IPv6 multicast address with "site-local" scope, i.e. only to be routed within the local |
| 73 | + #: physical network and not beyond it. It should allow for multi-host CAN networks in a normal IPv6 LAN. |
| 74 | + #: This is the default channel and should work with most modern routers if multicast is allowed. |
| 75 | + DEFAULT_GROUP_IPv6 = "ff15:7079:7468:6f6e:6465:6d6f:6d63:6173" |
| 76 | + |
| 77 | + #: An arbitrary IPv4 multicast address with "administrative" scope, i.e. only to be routed within |
| 78 | + #: administrative organizational boundaries and not beyond it. |
| 79 | + #: It should allow for multi-host CAN networks in a normal IPv4 LAN. |
| 80 | + #: This is provided as a default fallback channel if IPv6 is (still) not supported. |
| 81 | + DEFAULT_GROUP_IPv4 = "239.74.163.2" |
| 82 | + |
| 83 | + def __init__( |
| 84 | + self, |
| 85 | + channel: str = DEFAULT_GROUP_IPv6, |
| 86 | + port: int = 43113, |
| 87 | + hop_limit: int = 1, |
| 88 | + receive_own_messages: bool = False, |
| 89 | + fd: bool = True, |
| 90 | + **kwargs, |
| 91 | + ) -> None: |
| 92 | + check_msgpack_installed() |
| 93 | + |
| 94 | + if receive_own_messages: |
| 95 | + raise NotImplementedError("receiving own messages is not yet implemented") |
| 96 | + |
| 97 | + super().__init__(channel, **kwargs) |
| 98 | + |
| 99 | + self.is_fd = fd |
| 100 | + self._multicast = GeneralPurposeUdpMulticastBus(channel, port, hop_limit) |
| 101 | + |
| 102 | + def _recv_internal(self, timeout: Optional[float]): |
| 103 | + result = self._multicast.recv(timeout) |
| 104 | + if not result: |
| 105 | + return None, False |
| 106 | + |
| 107 | + data, _, timestamp = result |
| 108 | + can_message = unpack_message(data, replace={"timestamp": timestamp}) |
| 109 | + |
| 110 | + if not self.is_fd and can_message.is_fd: |
| 111 | + return None, False |
| 112 | + |
| 113 | + return can_message, False |
| 114 | + |
| 115 | + def send(self, message: can.Message, timeout: Optional[float] = None) -> None: |
| 116 | + if not self.is_fd and message.is_fd: |
| 117 | + raise RuntimeError("cannot send FD message over bus with CAN FD disabled") |
| 118 | + |
| 119 | + data = pack_message(message) |
| 120 | + self._multicast.send(data, timeout) |
| 121 | + |
| 122 | + def fileno(self) -> int: |
| 123 | + """Provides the internally used file descriptor of the socket or `-1` if not available.""" |
| 124 | + return self._multicast.fileno() |
| 125 | + |
| 126 | + def shutdown(self) -> None: |
| 127 | + """Close all sockets and free up any resources. |
| 128 | +
|
| 129 | + Never throws errors and only logs them. |
| 130 | + """ |
| 131 | + self._multicast.shutdown() |
| 132 | + |
| 133 | + @staticmethod |
| 134 | + def _detect_available_configs() -> List[AutoDetectedConfig]: |
| 135 | + if hasattr(socket, "CMSG_SPACE"): |
| 136 | + return [ |
| 137 | + { |
| 138 | + "interface": "udp_multicast", |
| 139 | + "channel": UdpMulticastBus.DEFAULT_GROUP_IPv6, |
| 140 | + }, |
| 141 | + { |
| 142 | + "interface": "udp_multicast", |
| 143 | + "channel": UdpMulticastBus.DEFAULT_GROUP_IPv4, |
| 144 | + }, |
| 145 | + ] |
| 146 | + |
| 147 | + # else, this interface cannot be used |
| 148 | + return [] |
| 149 | + |
| 150 | + |
| 151 | +class GeneralPurposeUdpMulticastBus: |
| 152 | + """A general purpose send and receive handler for multicast over IP/UDP.""" |
| 153 | + |
| 154 | + def __init__( |
| 155 | + self, group: str, port: int, hop_limit: int, max_buffer: int = 4096 |
| 156 | + ) -> None: |
| 157 | + self.group = group |
| 158 | + self.port = port |
| 159 | + self.hop_limit = hop_limit |
| 160 | + self.max_buffer = max_buffer |
| 161 | + |
| 162 | + # Look up multicast group address in name server and find out IP version of the first suitable target |
| 163 | + # and then get the address family of it (socket.AF_INET or socket.AF_INET6) |
| 164 | + connection_candidates = socket.getaddrinfo( # type: ignore |
| 165 | + group, self.port, type=socket.SOCK_DGRAM |
| 166 | + ) |
| 167 | + sock = None |
| 168 | + for connection_candidate in connection_candidates: |
| 169 | + address_family: socket.AddressFamily = connection_candidate[0] |
| 170 | + self.ip_version = 4 if address_family == socket.AF_INET else 6 |
| 171 | + try: |
| 172 | + sock = self._create_socket(address_family) |
| 173 | + except OSError as error: |
| 174 | + log.info( |
| 175 | + f"could not connect to the multicast IP network of candidate %s; reason: {error}", |
| 176 | + connection_candidates, |
| 177 | + ) |
| 178 | + if sock is not None: |
| 179 | + self._socket = sock |
| 180 | + else: |
| 181 | + raise RuntimeError("could not connect to a multicast IP network") |
| 182 | + |
| 183 | + # used in recv() |
| 184 | + self.received_timestamp_struct = "@II" |
| 185 | + ancillary_data_size = struct.calcsize(self.received_timestamp_struct) |
| 186 | + self.received_ancillary_buffer_size = socket.CMSG_SPACE(ancillary_data_size) |
| 187 | + |
| 188 | + # used by send() |
| 189 | + self._send_destination = (self.group, self.port) |
| 190 | + self._last_send_timeout: Optional[float] = None |
| 191 | + |
| 192 | + def _create_socket(self, address_family: socket.AddressFamily) -> socket.socket: |
| 193 | + """Creates a new socket. This might fail and raise an exception! |
| 194 | +
|
| 195 | + :param address_family: whether this is of type `socket.AF_INET` or `socket.AF_INET6` |
| 196 | + :raises OSError: if the socket could not be opened or configured correctly; in this case, it is |
| 197 | + guaranteed to be closed/cleaned up |
| 198 | + """ |
| 199 | + # create the UDP socket |
| 200 | + # this might already fail but then there is nothing to clean up |
| 201 | + sock = socket.socket(address_family, socket.SOCK_DGRAM) |
| 202 | + |
| 203 | + # configure the socket |
| 204 | + try: |
| 205 | + |
| 206 | + # set hop limit / TTL |
| 207 | + ttl_as_binary = struct.pack("@I", self.hop_limit) |
| 208 | + if self.ip_version == 4: |
| 209 | + sock.setsockopt( |
| 210 | + socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl_as_binary |
| 211 | + ) |
| 212 | + else: |
| 213 | + sock.setsockopt( |
| 214 | + socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl_as_binary |
| 215 | + ) |
| 216 | + |
| 217 | + # Allow multiple programs to access that address + port |
| 218 | + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| 219 | + |
| 220 | + # set how to receive timestamps |
| 221 | + sock.setsockopt(socket.SOL_SOCKET, SO_TIMESTAMPNS, 1) |
| 222 | + |
| 223 | + # Bind it to the port (on any interface) |
| 224 | + sock.bind(("", self.port)) |
| 225 | + |
| 226 | + # Join the multicast group |
| 227 | + group_as_binary = socket.inet_pton(address_family, self.group) |
| 228 | + if self.ip_version == 4: |
| 229 | + request = group_as_binary + struct.pack("@I", socket.INADDR_ANY) |
| 230 | + sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, request) |
| 231 | + else: |
| 232 | + request = group_as_binary + struct.pack("@I", 0) |
| 233 | + sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, request) |
| 234 | + |
| 235 | + return sock |
| 236 | + |
| 237 | + except OSError as error: |
| 238 | + # clean up the incompletely configured but opened socket |
| 239 | + try: |
| 240 | + sock.close() |
| 241 | + except OSError as close_error: |
| 242 | + # ignore but log any failures in here |
| 243 | + log.warning("Could not close partly configured socket: %s", close_error) |
| 244 | + |
| 245 | + # still raise the error |
| 246 | + raise error |
| 247 | + |
| 248 | + def send(self, data: bytes, timeout: Optional[float] = None) -> None: |
| 249 | + """Send data to all group members. This call blocks. |
| 250 | +
|
| 251 | + :param timeout: the timeout in seconds after which an Exception is raised is sending has failed |
| 252 | + :param data: the data to be sent |
| 253 | + :raises OSError: if an error occurred while writing to the underlying socket |
| 254 | + :raises socket.timeout: if the timeout ran out before sending was completed (this is a subclass of |
| 255 | + *OSError*) |
| 256 | + """ |
| 257 | + if timeout != self._last_send_timeout: |
| 258 | + self._last_send_timeout = timeout |
| 259 | + # this applies to all blocking calls on the socket, but sending is the only one that is blocking |
| 260 | + self._socket.settimeout(timeout) |
| 261 | + |
| 262 | + bytes_sent = self._socket.sendto(data, self._send_destination) |
| 263 | + if bytes_sent < len(data): |
| 264 | + raise socket.timeout() |
| 265 | + |
| 266 | + def recv( |
| 267 | + self, timeout: Optional[float] = None |
| 268 | + ) -> Optional[Tuple[bytes, IP_ADDRESS_INFO, float]]: |
| 269 | + """ |
| 270 | + Receive up to **max_buffer** bytes. |
| 271 | +
|
| 272 | + :param timeout: the timeout in seconds after which `None` is returned if no data arrived |
| 273 | + :returns: `None` on timeout, or a 3-tuple comprised of: |
| 274 | + - received data, |
| 275 | + - the sender of the data, and |
| 276 | + - a timestamp in seconds |
| 277 | + """ |
| 278 | + # get all sockets that are ready (can be a list with a single value |
| 279 | + # being self.socket or an empty list if self.socket is not ready) |
| 280 | + try: |
| 281 | + # get all sockets that are ready (can be a list with a single value |
| 282 | + # being self.socket or an empty list if self.socket is not ready) |
| 283 | + ready_receive_sockets, _, _ = select.select([self._socket], [], [], timeout) |
| 284 | + except socket.error as exc: |
| 285 | + # something bad (not a timeout) happened (e.g. the interface went down) |
| 286 | + raise can.CanError(f"Failed to wait for IP/UDP socket: {exc}") |
| 287 | + |
| 288 | + if ready_receive_sockets: # not empty |
| 289 | + # fetch data & source address |
| 290 | + ( |
| 291 | + raw_message_data, |
| 292 | + ancillary_data, |
| 293 | + _, # flags |
| 294 | + sender_address, |
| 295 | + ) = self._socket.recvmsg( |
| 296 | + self.max_buffer, self.received_ancillary_buffer_size |
| 297 | + ) |
| 298 | + |
| 299 | + # fetch timestamp; this is configured in in _create_socket() |
| 300 | + assert len(ancillary_data) == 1, "only requested a single extra field" |
| 301 | + cmsg_level, cmsg_type, cmsg_data = ancillary_data[0] |
| 302 | + assert ( |
| 303 | + cmsg_level == socket.SOL_SOCKET and cmsg_type == SO_TIMESTAMPNS |
| 304 | + ), "received control message type that was not requested" |
| 305 | + # see https://man7.org/linux/man-pages/man3/timespec.3.html -> struct timespec for details |
| 306 | + seconds, nanoseconds = struct.unpack( |
| 307 | + self.received_timestamp_struct, cmsg_data |
| 308 | + ) |
| 309 | + timestamp = seconds + nanoseconds * 1.0e-9 |
| 310 | + |
| 311 | + return raw_message_data, sender_address, timestamp |
| 312 | + |
| 313 | + # socket wasn't readable or timeout occurred |
| 314 | + return None |
| 315 | + |
| 316 | + def fileno(self) -> int: |
| 317 | + """Provides the internally used file descriptor of the socket or `-1` if not available.""" |
| 318 | + return self._socket.fileno() |
| 319 | + |
| 320 | + def shutdown(self) -> None: |
| 321 | + """Close all sockets and free up any resources. |
| 322 | +
|
| 323 | + Never throws errors and only logs them. |
| 324 | + """ |
| 325 | + try: |
| 326 | + self._socket.close() |
| 327 | + except OSError as exception: |
| 328 | + log.error("could not close IP socket: %s", exception) |
0 commit comments