diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 000000000..32b4d8d2d Binary files /dev/null and b/.DS_Store differ diff --git a/can/.DS_Store b/can/.DS_Store new file mode 100644 index 000000000..3b808cc85 Binary files /dev/null and b/can/.DS_Store differ diff --git a/can/interfaces/.DS_Store b/can/interfaces/.DS_Store new file mode 100644 index 000000000..63b599ffe Binary files /dev/null and b/can/interfaces/.DS_Store differ diff --git a/can/interfaces/__init__.py b/can/interfaces/__init__.py index 755e8675c..dff255d1e 100644 --- a/can/interfaces/__init__.py +++ b/can/interfaces/__init__.py @@ -12,6 +12,7 @@ "serial": ("can.interfaces.serial.serial_can", "SerialBus"), "pcan": ("can.interfaces.pcan", "PcanBus"), "usb2can": ("can.interfaces.usb2can", "Usb2canBus"), + "usb2can_libusb": ("can.interfaces.usb2can_libusb", "Usb2CanLibUsbBus"), "ixxat": ("can.interfaces.ixxat", "IXXATBus"), "nican": ("can.interfaces.nican", "NicanBus"), "iscan": ("can.interfaces.iscan", "IscanBus"), diff --git a/can/interfaces/usb2can/usb2canInterface.py b/can/interfaces/usb2can/usb2canInterface.py index e51d485cd..663baba08 100644 --- a/can/interfaces/usb2can/usb2canInterface.py +++ b/can/interfaces/usb2can/usb2canInterface.py @@ -3,8 +3,8 @@ """ import logging -from ctypes import byref -from typing import Optional +import ctypes +from typing import Optional, cast from can import BusABC, Message, CanInitializationError, CanOperationError from .usb2canabstractionlayer import Usb2CanAbstractionLayer, CanalMsg, CanalError @@ -14,6 +14,7 @@ IS_ID_TYPE, ) from .serial_selector import find_serial_devices +from can.typechecking import CanFilters, CanFilterExtended # Set up logging log = logging.getLogger("can.usb2can") @@ -87,6 +88,9 @@ class Usb2canBus(BusABC): If both `serial` and `channel` are set, `serial` will be used and channel will be ignored. + :param can_filters (optional): + See :meth:`can.BusABC.set_filters`. + """ def __init__( @@ -96,9 +100,10 @@ def __init__( flags=0x00000008, *_, bitrate=500000, + can_filters: Optional[CanFilters] = None, **kwargs, ): - + self._is_filtered = False self.can = Usb2CanAbstractionLayer(dll) # get the serial number of the device @@ -119,15 +124,15 @@ def __init__( connector = f"{device_id}; {baudrate}" self.handle = self.can.open(connector, flags) - super().__init__(channel=channel, **kwargs) + super().__init__(channel=channel, can_filters=can_filters, **kwargs) def send(self, msg, timeout=None): tx = message_convert_tx(msg) if timeout: - status = self.can.blocking_send(self.handle, byref(tx), int(timeout * 1000)) + status = self.can.blocking_send(self.handle, ctypes.byref(tx), int(timeout * 1000)) else: - status = self.can.send(self.handle, byref(tx)) + status = self.can.send(self.handle, ctypes.byref(tx)) if status != CanalError.SUCCESS: raise CanOperationError("could not send message", error_code=status) @@ -137,11 +142,11 @@ def _recv_internal(self, timeout): messagerx = CanalMsg() if timeout == 0: - status = self.can.receive(self.handle, byref(messagerx)) + status = self.can.receive(self.handle, ctypes.byref(messagerx)) else: time = 0 if timeout is None else int(timeout * 1000) - status = self.can.blocking_receive(self.handle, byref(messagerx), time) + status = self.can.blocking_receive(self.handle, ctypes.byref(messagerx), time) if status == CanalError.SUCCESS: rx = message_convert_rx(messagerx) @@ -154,7 +159,31 @@ def _recv_internal(self, timeout): else: raise CanOperationError("could not receive message", error_code=status) - return rx, False + return rx, self._is_filtered + + def _apply_filters(self, can_filters: Optional[CanFilters]) -> None: + CAN_EXTENDED_FLAG = 0x80000000 + if can_filters is None: + # Pass all messages + # can_filters = [{"can_id": 0, "can_mask": 0}] + self._is_filtered = False + return + filter_data = [] + for can_filter in can_filters: + can_id = can_filter["can_id"] + can_mask = can_filter["can_mask"] + if "extended" in can_filter: + can_filter = cast(CanFilterExtended, can_filter) + # Match on either 11-bit OR 29-bit messages instead of both + can_mask |= CAN_EXTENDED_FLAG + if can_filter["extended"]: + can_id |= CAN_EXTENDED_FLAG + filter_data.append(can_id) + filter_data.append(can_mask) + filter_len =len(filter_data) + + self.can.set_filters(self.handle,ctypes.c_int(filter_len), (ctypes.c_ulong * filter_len)(*filter_data)) + self._is_filtered = True def shutdown(self): """ diff --git a/can/interfaces/usb2can/usb2canabstractionlayer.py b/can/interfaces/usb2can/usb2canabstractionlayer.py index 8a3ae34ca..edd1272aa 100644 --- a/can/interfaces/usb2can/usb2canabstractionlayer.py +++ b/can/interfaces/usb2can/usb2canabstractionlayer.py @@ -197,3 +197,8 @@ def get_library_version(self): def get_vendor_string(self): with error_check("Failed to get vendor string"): return self.__m_dllBasic.CanalGetVendorString() + + def set_filters(self, handle, len, filters_arr): + with error_check("Failed to set filters"): + return CanalError(self.__m_dllBasic.CanalSetFilters(handle, len, filters_arr)) + diff --git a/can/interfaces/usb2can_libusb/__init__.py b/can/interfaces/usb2can_libusb/__init__.py new file mode 100644 index 000000000..53e84b325 --- /dev/null +++ b/can/interfaces/usb2can_libusb/__init__.py @@ -0,0 +1,4 @@ +""" +""" + +from .usb2can_libusb_bus import Usb2CanLibUsbBus diff --git a/can/interfaces/usb2can_libusb/can_8dev_usb_device.py b/can/interfaces/usb2can_libusb/can_8dev_usb_device.py new file mode 100644 index 000000000..c01121daf --- /dev/null +++ b/can/interfaces/usb2can_libusb/can_8dev_usb_device.py @@ -0,0 +1,182 @@ +import logging +import queue +from threading import Thread +from time import sleep +from importlib import reload + +from .can_8dev_usb_utils import * + +logger = logging.getLogger(__name__) + +try: + import usb.core + import usb.util +except ImportError: + logger.warning( + "The PyUSB module is not installed. Install it using `python3 -m pip install pyusb`" + ) + + +class Can8DevUSBDevice: + cmd_rx_ep: usb.core.Endpoint + cmd_tx_ep: usb.core.Endpoint + data_rx_ep: usb.core.Endpoint + data_tx_ep: usb.core.Endpoint + serial_number: str + _close: bool + _rx_queue: queue.Queue + _recv_thread: Thread + + def __init__(self, serial_number=None): + if serial_number is not None: + dev = usb.core.find( + idVendor=USB_8DEV_VENDOR_ID, + idProduct=USB_8DEV_PRODUCT_ID, + serial_number=serial_number, + ) + else: + dev = usb.core.find( + idVendor=USB_8DEV_VENDOR_ID, idProduct=USB_8DEV_PRODUCT_ID + ) + + if dev is None: + raise ValueError( + "8Devices CAN interface not found! Serial number provided: %s" + % serial_number + ) + + self.serial_number = dev.serial_number + + for i in range(3): + try: + dev.reset() + #self.send_command(Can8DevCommandFrame(Can8DevCommand.USB_8DEV_RESET)) + print("Pavyko reset.") + break + except Exception as x: + print(f"nepavyko reset- {i} {x}") + reload(usb.core) + reload(usb.util) + reload(usb) + if serial_number is not None: + dev = usb.core.find( + idVendor=USB_8DEV_VENDOR_ID, + idProduct=USB_8DEV_PRODUCT_ID, + serial_number=serial_number, + ) + print("Serial_number found - ",serial_number) + else: + dev = usb.core.find( + idVendor=USB_8DEV_VENDOR_ID, idProduct=USB_8DEV_PRODUCT_ID + ) + print("no serial number------------------------") + #print(dev) + #sleep(7) + + + # set the active configuration. With no arguments, the first + # configuration will be the active one + dev.set_configuration() + + # get an endpoint instance + cfg = dev.get_active_configuration() + intf = cfg[(0, 0)] + + self.cmd_rx_ep: usb.core.Endpoint = usb.util.find_descriptor( + intf, bEndpointAddress=USB_8DEV_ENDP_CMD_RX + ) + self.cmd_tx_ep: usb.core.Endpoint = usb.util.find_descriptor( + intf, bEndpointAddress=USB_8DEV_ENDP_CMD_TX + ) + self.data_rx_ep: usb.core.Endpoint = usb.util.find_descriptor( + intf, bEndpointAddress=USB_8DEV_ENDP_DATA_RX + ) + self.data_tx_ep: usb.core.Endpoint = usb.util.find_descriptor( + intf, bEndpointAddress=USB_8DEV_ENDP_DATA_TX + ) + + if ( + self.cmd_rx_ep is None + or self.cmd_tx_ep is None + or self.data_rx_ep is None + or self.data_tx_ep is None + ): + raise ValueError("Could not configure 8Devices CAN endpoints!") + + self._rx_queue = queue.Queue(MAX_8DEV_RECV_QUEUE) + + def _recv_thread_loop(self): + while True: + byte_buffer = bytes() + try: + # We must read the full possible buffer size each iteration or we risk a buffer overrun exception losing data. + byte_buffer = self.data_rx_ep.read(512, 0).tobytes() + except Exception: + pass + for i in range(0, len(byte_buffer), 21): + # We could have read multiple frames in a single bulk xfer + self._rx_queue.put(Can8DevRxFrame(byte_buffer[i : i + 21])) + if self._close: + return + + def _start_recv_thread(self): + self._close = False + self._recv_thread = Thread(target=self._recv_thread_loop, daemon=True) + self._recv_thread.start() + + def _stop_recv_thread(self): + self._close = True + + def send_command(self, cmd: Can8DevCommandFrame): + self.cmd_tx_ep.write(cmd.to_bytes()) + return Can8DevCommandFrame.from_bytes(self.cmd_rx_ep.read(16)) + + def open( + self, + phase_seg1: int, + phase_seg2: int, + sjw: int, + brp: int, + loopback: bool = False, + listenonly: bool = False, + oneshot: bool = False, + ): + self.send_command(Can8DevCommandFrame(Can8DevCommand.USB_8DEV_RESET)) + open_command = can_8dev_open_frame( + phase_seg1, phase_seg2, sjw, brp, loopback, listenonly, oneshot + ) + if self.send_command(open_command).opt1 == 0: + self._start_recv_thread() + return True + else: + return False + + def close(self): + self._stop_recv_thread() + close_command = Can8DevCommand.USB_8DEV_CLOSE + self.send_command(Can8DevCommandFrame(Can8DevCommand.USB_8DEV_RESET)) #reset + self.send_command(Can8DevCommandFrame(close_command)) + + + def recv(self, timeout=None): + try: + return self._rx_queue.get(True, timeout=timeout / 1000) + except queue.Empty: + return None + + def send(self, tx_frame: Can8DevTxFrame, timeout=None): + self.data_tx_ep.write(tx_frame.to_bytes(), timeout) + + def get_version(self): + cmd_response = self.send_command( + Can8DevCommandFrame(Can8DevCommand.USB_8DEV_GET_SOFTW_HARDW_VER) + ) + version = int.from_bytes(cmd_response.data[0:4], byteorder="big") + return version + + def get_firmware_version(self): + version = self.get_version() + return "%d.%d" % ((version >> 24) & 0xFF, (version >> 16) & 0xFF) + + def get_serial_number(self): + return self.serial_number diff --git a/can/interfaces/usb2can_libusb/can_8dev_usb_utils.py b/can/interfaces/usb2can_libusb/can_8dev_usb_utils.py new file mode 100644 index 000000000..b6836303d --- /dev/null +++ b/can/interfaces/usb2can_libusb/can_8dev_usb_utils.py @@ -0,0 +1,221 @@ +from enum import Enum + +MAX_8DEV_RECV_QUEUE = 128 # Maximum number of slots in the recv queue + +USB_8DEV_VENDOR_ID = ( + 0x0483 +) # Unfortunately this is actually the ST Microelectronics Vendor ID +USB_8DEV_PRODUCT_ID = 0x1234 # Unfortunately this is pretty bogus +USB_8DEV_PRODUCT_STRING = "USB2CAN converter" # So we use this instead. Not great. + +USB_8DEV_ABP_CLOCK = 32000000 + +# USB Bulk Endpoint identifiers + +USB_8DEV_ENDP_DATA_RX = 0x81 +USB_8DEV_ENDP_DATA_TX = 0x2 +USB_8DEV_ENDP_CMD_RX = 0x83 +USB_8DEV_ENDP_CMD_TX = 0x4 + +# Open Device Options + +USB_8DEV_SILENT = 0x01 +USB_8DEV_LOOPBACK = 0x02 +USB_8DEV_DISABLE_AUTO_RESTRANS = 0x04 +USB_8DEV_STATUS_FRAME = 0x08 + +# Command options +USB_8DEV_BAUD_MANUAL = 0x09 +USB_8DEV_CMD_START = 0x11 +USB_8DEV_CMD_END = 0x22 + +USB_8DEV_CMD_SUCCESS = 0 +USB_8DEV_CMD_ERROR = 255 + +USB_8DEV_CMD_TIMEOUT = 1000 + +# Framing definitions +USB_8DEV_DATA_START = 0x55 +USB_8DEV_DATA_END = 0xAA + +USB_8DEV_TYPE_CAN_FRAME = 0 +USB_8DEV_TYPE_ERROR_FRAME = 3 + +USB_8DEV_EXTID = 0x01 +USB_8DEV_RTR = 0x02 +USB_8DEV_ERR_FLAG = 0x04 + +# Status messages +USB_8DEV_STATUSMSG_OK = 0x00 +USB_8DEV_STATUSMSG_OVERRUN = 0x01 # Overrun occured when sending */ +USB_8DEV_STATUSMSG_BUSLIGHT = 0x02 # Error counter has reached 96 */ +USB_8DEV_STATUSMSG_BUSHEAVY = 0x03 # Error count. has reached 128 */ +USB_8DEV_STATUSMSG_BUSOFF = 0x04 # Device is in BUSOFF */ +USB_8DEV_STATUSMSG_STUFF = 0x20 # Stuff Error */ +USB_8DEV_STATUSMSG_FORM = 0x21 # Form Error */ +USB_8DEV_STATUSMSG_ACK = 0x23 # Ack Error */ +USB_8DEV_STATUSMSG_BIT0 = 0x24 # Bit1 Error */ +USB_8DEV_STATUSMSG_BIT1 = 0x25 # Bit0 Error */ +USB_8DEV_STATUSMSG_CRC = 0x27 # CRC Error */ + +USB_8DEV_RP_MASK = 0x7F # Mask for Receive Error Bit */ + +# Available Commands + + +class Can8DevCommand(Enum): + USB_8DEV_RESET = 1 # Reset Device + USB_8DEV_OPEN = 2 # Open Port + USB_8DEV_CLOSE = 3 # Close Port + USB_8DEV_SET_SPEED = 4 + USB_8DEV_SET_MASK_FILTER = ( + 5 + ) # Unfortunately unknown parameters and supposedly un-implemented on early firmwares + USB_8DEV_GET_STATUS = 6 + USB_8DEV_GET_STATISTICS = 7 + USB_8DEV_GET_SERIAL = 8 + USB_8DEV_GET_SOFTW_VER = 9 + USB_8DEV_GET_HARDW_VER = 0xA + USB_8DEV_RESET_TIMESTAMP = 0xB + USB_8DEV_GET_SOFTW_HARDW_VER = 0xC + + +class Can8DevTxFrame: + flags: int + id: int + dlc: int + data: bytes + + def __init__( + self, can_id: int, dlc: int, data: bytes, is_ext: bool, is_remote: bool + ): + self.can_id = can_id + self.dlc = dlc + self.data = data + self.flags = 0 + if is_ext: + self.flags |= USB_8DEV_EXTID + if is_remote: + self.flags |= USB_8DEV_RTR + + def _pad_data(self, data: bytes): + data_bytes = bytearray(8) + for i in range(0, 7): + if i < len(data): + data_bytes[i] = data[i] + return bytes(data_bytes) + + def to_bytes(self): + cmd_buf = bytearray() + cmd_buf.append(USB_8DEV_DATA_START) + cmd_buf.append(self.flags) + id_bytes = self.can_id.to_bytes(4, byteorder="big") + cmd_buf.extend(id_bytes) + cmd_buf.append(self.dlc) + cmd_buf.extend(self._pad_data(self.data)) + cmd_buf.append(USB_8DEV_DATA_END) + return bytes(cmd_buf) + + +class Can8DevRxFrame: + data: bytes + id: int + dlc: int + timestamp: int + ext_id: bool + is_error: bool + is_remote: bool + + def __init__(self, bytes_in: bytes): + if len(bytes_in) != 21: + raise ValueError("Did not receive 21 bytes for 8Dev Data Frame") + if bytes_in[0] != USB_8DEV_DATA_START: + raise ValueError("Did not receive a valid 8Dev Data Frame") + if bytes_in[1] == USB_8DEV_TYPE_CAN_FRAME: + self.data = bytes_in[8:16] + self.dlc = bytes_in[7] + self.ext_id = bytes_in[2] & USB_8DEV_EXTID + self.is_remote = bytes_in[2] & USB_8DEV_RTR + self.id = int.from_bytes(bytes_in[3:7], byteorder="big") + self.timestamp = int.from_bytes(bytes_in[16:20], byteorder="big") + self.is_error = False + elif bytes_in[1] == USB_8DEV_TYPE_ERROR_FRAME: + self.is_error = True + self.data = bytes_in[7:15] + self.timestamp = int.from_bytes(bytes_in[16:20], byteorder="big") + else: + raise ValueError("8Dev Data Frame with Unknown Type") + + +class Can8DevCommandFrame: + command: Can8DevCommand + opt1: int + opt2: int + data: bytes + + def __init__(self, command, data=bytes(), opt1=0, opt2=0): + self.command = command + self.data = data + self.opt1 = opt1 + self.opt2 = opt2 + + def _pad_data(self, data: bytes): + data_bytes = bytearray(10) + for i in range(0, 9): + if i < len(data): + data_bytes[i] = data[i] + return bytes(data_bytes) + + def to_bytes(self): + cmd_buf = bytearray() + cmd_buf.append(USB_8DEV_CMD_START) + cmd_buf.append(0) # Supposedly could be a channel value, but unknown + cmd_buf.append(self.command.value) + cmd_buf.append(self.opt1) + cmd_buf.append(self.opt2) + cmd_buf.extend(self._pad_data(self.data)) + cmd_buf.append(USB_8DEV_CMD_END) + return bytes(cmd_buf) + + def from_bytes(byte_input: bytes): + if len(byte_input) != 16: + raise ValueError("Did not receive 16 bytes for 8Dev Command Frame") + return Can8DevCommandFrame( + Can8DevCommand(byte_input[2]), + byte_input[5:15], + byte_input[3], + byte_input[4], + ) + + +def can_8dev_open_frame( + phase_seg1: int, + phase_seg2: int, + sjw: int, + brp: int, + loopback: bool = False, + listenonly: bool = False, + oneshot: bool = False, +) -> Can8DevCommandFrame: + open_command = Can8DevCommand.USB_8DEV_OPEN + opt1 = USB_8DEV_BAUD_MANUAL + flags = 0 + if loopback: + flags |= USB_8DEV_LOOPBACK + if listenonly: + flags |= USB_8DEV_SILENT + if oneshot: + flags |= USB_8DEV_DISABLE_AUTO_RESTRANS + flags_bytes = flags.to_bytes(4, "big") + brp_bytes = brp.to_bytes(2, "big") + data = bytearray(10) + data[0] = phase_seg1 + data[1] = phase_seg2 + data[2] = sjw + data[3] = brp_bytes[0] + data[4] = brp_bytes[1] + data[5] = flags_bytes[0] + data[6] = flags_bytes[1] + data[7] = flags_bytes[2] + data[8] = flags_bytes[3] + return Can8DevCommandFrame(open_command, data, opt1) diff --git a/can/interfaces/usb2can_libusb/usb2can_libusb_bus.py b/can/interfaces/usb2can_libusb/usb2can_libusb_bus.py new file mode 100644 index 000000000..e8932d010 --- /dev/null +++ b/can/interfaces/usb2can_libusb/usb2can_libusb_bus.py @@ -0,0 +1,114 @@ +""" +This interface requires LibUSB and `pyusb` to be installed on your system. +The interface will bind by default to the first device with VID +""" + +import logging +from ctypes import byref + +from can import BusABC, Message, BitTiming +from .can_8dev_usb_utils import * + +# Set up logging +log = logging.getLogger("can.usb2can_libusb") + +try: + from .can_8dev_usb_device import * +except NameError: + log.warning( + "The PyUSB module is not installed, but it is required for USB2Can_LibUSB support. Install it using `python3 -m pip install pyusb`" + ) + + +def message_convert_tx(msg): + """convert message from PythonCAN Message to 8Devices frame""" + return Can8DevTxFrame( + can_id=msg.arbitration_id, + dlc=msg.dlc, + data=msg.data, + is_ext=msg.is_extended_id, + is_remote=msg.is_remote_frame, + ) + + +def message_convert_rx(message_rx: Can8DevRxFrame): + """convert message from 8Devices frame to PythonCAN Message""" + + if message_rx.is_error: + return Message( + timestamp=message_rx.timestamp / 1000, + is_error_frame=message_rx.is_error, + data=message_rx.data, + ) + + return Message( + timestamp=message_rx.timestamp / 1000, + is_remote_frame=message_rx.is_remote, + is_extended_id=message_rx.ext_id, + is_error_frame=message_rx.is_error, + arbitration_id=message_rx.id, + dlc=message_rx.dlc, + data=message_rx.data[: message_rx.dlc], + ) + + +class Usb2CanLibUsbBus(BusABC): + """Interface to an 8Devices USB2CAN Bus. + + This device should work on any platform with a working LibUSB and PyUSB. It was tested with a "Korlan USB2Can" but should work with the older module as well. + + Hardware filtering is not provided, if anyone knows how the 8Devices filtering command works, this would be valuable. + + Based on the in-tree Linux kernel SocketCAN driver for USB2CAN. + + :param str channel (optional): + The device's serial number. If not provided, the first matching VID/DID will match (WARNING: 8Devices reuse a random ST VID/DID, so other devices may match!) + + :param int bitrate (optional): + Bitrate of channel in bit/s. Values will be limited to a maximum of 1000 Kb/s. + Default is 500 Kbs + + :param int flags (optional): + Flags to directly pass to open function of the usb2can abstraction layer. + """ + + def __init__(self, channel=None, *args, bitrate=500000, **kwargs): + + self.can = Can8DevUSBDevice(channel) + + # convert to kb/s and cap: max rate is 1000 kb/s + baudrate = min(int(bitrate // 1000), 1000) + + self.channel_info = "USB2CAN LibUSB device {}".format( + self.can.get_serial_number() + ) + + connector = "{}; {}".format("USB2Can_LibUSB", baudrate) + + timing = BitTiming( + tseg1=6, tseg2=1, sjw=1, bitrate=bitrate, f_clock=USB_8DEV_ABP_CLOCK + ) + self.can.open(timing.tseg1, timing.tseg2, timing.sjw, timing.brp) + + super().__init__(channel=channel, bitrate=bitrate, *args, **kwargs) + + def send(self, msg, timeout=None): + tx = message_convert_tx(msg) + if timeout is not None: + timeout *= 1000 + self.can.send(tx, timeout) + + def _recv_internal(self, timeout): + if timeout is not None: + timeout *= 1000 + messagerx = self.can.recv(timeout) + rx = None + if messagerx is not None: + rx = message_convert_rx(messagerx) + return rx, False + + def shutdown(self): + """ + Shuts down connection to the device. + """ + self.can.close() diff --git a/examples/libusb_receive_all.py b/examples/libusb_receive_all.py new file mode 100644 index 000000000..ac505d648 --- /dev/null +++ b/examples/libusb_receive_all.py @@ -0,0 +1,14 @@ +import can + +filters= [{"can_id": 0x1, "can_mask": 0x3, "extended": False}, + {"can_id": 0x2, "can_mask": 0x3, "extended": True}, + {"can_id": 0x3, "can_mask": 0x3}] +with can.interface.Bus(bustype="usb2can_libusb", channel="C454B93C", bitrate=1000000, can_filters=filters) as bus: + try: + while True: + msg = bus.recv(1) + if msg is not None: + print(msg) + except KeyboardInterrupt: + pass # exit normally + diff --git a/examples/pyusblist.py b/examples/pyusblist.py new file mode 100644 index 000000000..c6bf83a8c --- /dev/null +++ b/examples/pyusblist.py @@ -0,0 +1,11 @@ +#!/usr/bin/python +import sys +import usb.core + +# find USB devices +dev = usb.core.find(find_all=True) +# loop through devices, printing vendor and product ids in decimal and hex + +for cfg in dev: + sys.stdout.write('Decimal VendorID=' + str(cfg.idVendor) + ' ProductID=' + str(cfg.idProduct) + ' serial=' + str(cfg.serial_number)+'\n') + sys.stdout.write('Hexadecimal VendorID=' + hex(cfg.idVendor) + ' ProductID=' + hex(cfg.idProduct) + '\n\n') \ No newline at end of file diff --git a/examples/receive_all_korlan.py b/examples/receive_all_korlan.py new file mode 100644 index 000000000..fc51c2e27 --- /dev/null +++ b/examples/receive_all_korlan.py @@ -0,0 +1,14 @@ +import can + +filters= [{"can_id": 0x1, "can_mask": 0x3, "extended": False}, + {"can_id": 0x2, "can_mask": 0x3, "extended": True}, + {"can_id": 0x3, "can_mask": 0x3}] +with can.interface.Bus(bustype="usb2can", channel="cde7e976", bitrate=1000000, dll='./usb2can.dll', can_filters=filters) as bus: + try: + while True: + msg = bus.recv(1) + if msg is not None: + print(msg) + except KeyboardInterrupt: + pass # exit normally + diff --git a/examples/send_one_korlan.py b/examples/send_one_korlan.py new file mode 100644 index 000000000..d604a9be4 --- /dev/null +++ b/examples/send_one_korlan.py @@ -0,0 +1,18 @@ +import can +from time import sleep +from importlib import reload + +# +for j in range(1): + with (can.interface.Bus(bustype="usb2can_libusb", channel="CDE7E976", bitrate=1000000)) as bus: + for i in range(200): + msg = can.Message(arbitration_id=0x1, data=[0, 25, 0, 1, 3, 1, 4, 1], is_extended_id=False) + try: + bus.send(msg) + #print(f"Message {i} sent on {bus.channel_info}") + except can.CanError: + print("Message NOT sent") + bus.shutdown() + print(f"closing after series {j+1}") + #sleep(1) + #reload(can) diff --git a/receive_all.py b/receive_all.py new file mode 100644 index 000000000..7ff532079 --- /dev/null +++ b/receive_all.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python + +""" +Shows how the receive messages via polling. +""" + +import can +from can.bus import BusState + + +def receive_all(): + """Receives all messages and prints them to the console until Ctrl+C is pressed.""" + + with can.interface.Bus( + bustype="pcan", channel="PCAN_USBBUS1", bitrate=250000 + ) as bus: + # bus = can.interface.Bus(bustype='ixxat', channel=0, bitrate=250000) + # bus = can.interface.Bus(bustype='vector', app_name='CANalyzer', channel=0, bitrate=250000) + + # set to read-only, only supported on some interfaces + bus.state = BusState.PASSIVE + + try: + while True: + msg = bus.recv(1) + if msg is not None: + print(msg) + + except KeyboardInterrupt: + pass # exit normally + + +if __name__ == "__main__": + receive_all() diff --git a/usb2can.dll b/usb2can.dll new file mode 100644 index 000000000..478fcc054 Binary files /dev/null and b/usb2can.dll differ