forked from hardbyte/python-can
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinterface.py
More file actions
119 lines (95 loc) · 4.68 KB
/
interface.py
File metadata and controls
119 lines (95 loc) · 4.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import can
from can.util import load_config, choose_socketcan_implementation
from can.broadcastmanager import CyclicSendTaskABC, MultiRateCyclicSendTaskABC
# Every interface name accepted as a value of can.rc['interface'].
VALID_INTERFACES = {
    'kvaser',
    'serial',
    'pcan',
    'socketcan_native',
    'socketcan_ctypes',
    'socketcan',
    'usb2can',
    'ixxat',
    'virtual',
}
class Bus(object):
    """
    Instantiates a CAN Bus of the given `bustype`, falls back to reading a
    configuration file from default locations.

    Calling ``Bus(...)`` never yields an instance of this class: ``__new__``
    picks the backend class matching ``can.rc['interface']`` and returns an
    instance of that class instead.
    """

    @classmethod
    def __new__(cls, other, channel=None, *args, **kwargs):
        """
        Select the backend bus class and return an instance of it.

        Because ``__new__`` is wrapped in ``classmethod``, the class arrives
        twice: once bound as ``cls`` and once more as the first positional
        argument, which is absorbed by ``other`` and otherwise unused.

        :param other: duplicate of the class being instantiated; ignored.
        :param channel: channel handed to the backend; when ``None`` the
            value of ``can.rc['channel']`` is used.
        :param args: accepted but not forwarded to the backend.
        :param kwargs: forwarded to the backend. A ``bustype`` entry is
            removed and stored in ``can.rc['interface']`` first.
        :raises NotImplementedError: if the configured interface is not in
            ``VALID_INTERFACES`` or has no backend branch below.
        """
        if 'bustype' in kwargs:
            can.rc['interface'] = kwargs.pop('bustype')

        if 'interface' not in can.rc or 'channel' not in can.rc or can.rc['interface'] is None:
            can.log.debug("Loading default configuration")
            # Load defaults
            can.rc = load_config()

        # 'socketcan' is a generic name with no backend branch of its own.
        # Resolve it once the configuration is final so it works whether it
        # came from the `bustype` keyword, a preset can.rc or load_config().
        if can.rc['interface'] == 'socketcan':
            can.rc['interface'] = choose_socketcan_implementation()

        if can.rc['interface'] not in VALID_INTERFACES:
            raise NotImplementedError('Invalid CAN Bus Type - {}'.format(can.rc['interface']))

        # Import the correct Bus backend. Imports are done lazily so that only
        # the selected backend module is loaded.
        interface = can.rc['interface']
        if interface == 'kvaser':
            from can.interfaces.kvaser import KvaserBus
            cls = KvaserBus
        elif interface == 'socketcan_ctypes':
            from can.interfaces.socketcan_ctypes import SocketscanCtypes_Bus
            cls = SocketscanCtypes_Bus
        elif interface == 'socketcan_native':
            from can.interfaces.socketcan_native import SocketscanNative_Bus
            cls = SocketscanNative_Bus
        elif interface == 'serial':
            from can.interfaces.serial_can import SerialBus
            cls = SerialBus
        elif interface == 'pcan':
            from can.interfaces.pcan import PcanBus
            cls = PcanBus
        elif interface == 'usb2can':
            from can.interfaces.usb2canInterface import Usb2canBus
            cls = Usb2canBus
        elif interface == 'ixxat':
            from can.interfaces.ixxat import IXXATBus
            cls = IXXATBus
        elif interface == 'virtual':
            from can.interfaces.virtual import VirtualBus
            cls = VirtualBus
        else:
            raise NotImplementedError("CAN interface '{}' not supported".format(interface))

        if channel is None:
            channel = can.rc['channel']

        # NOTE(review): extra positional arguments (*args) are dropped here,
        # unlike the cyclic send task factories in this file - confirm this
        # is intentional before changing it.
        return cls(channel, **kwargs)
class CyclicSendTask(CyclicSendTaskABC):
    """
    Factory for the cyclic send task of the configured CAN interface.

    Instantiating this class returns an instance of the backend's own
    ``CyclicSendTask``. Only the ``socketcan_ctypes`` and
    ``socketcan_native`` interfaces provide one.
    """

    @classmethod
    def __new__(cls, other, channel, *args, **kwargs):
        """
        Select the backend ``CyclicSendTask`` class and return an instance.

        Because ``__new__`` is wrapped in ``classmethod``, the class arrives
        twice: once bound as ``cls`` and once more as the first positional
        argument, which is absorbed by ``other`` and otherwise unused.

        :param other: duplicate of the class being instantiated; ignored.
        :param channel: forwarded to the backend task as first argument.
        :param args: forwarded to the backend task.
        :param kwargs: forwarded to the backend task.
        :raises NotImplementedError: if the configured interface is not in
            ``VALID_INTERFACES`` or has no cyclic send task implementation.
        """
        # If can.rc doesn't look valid: load default
        if 'interface' not in can.rc or 'channel' not in can.rc:
            can.log.debug("Loading default configuration")
            can.rc = load_config()

        # Report the active configuration through the logger rather than
        # writing to stdout from library code.
        can.log.debug("CAN configuration: {}".format(can.rc))

        interface = can.rc['interface']
        if interface not in VALID_INTERFACES:
            raise NotImplementedError('Invalid CAN Bus Type - {}'.format(interface))

        # Import the correct implementation of CyclicSendTask
        if interface == 'socketcan_ctypes':
            from can.interfaces.socketcan_ctypes import CyclicSendTask as _ctypesCyclicSendTask
            task_cls = _ctypesCyclicSendTask
        elif interface == 'socketcan_native':
            from can.interfaces.socketcan_native import CyclicSendTask as _nativeCyclicSendTask
            task_cls = _nativeCyclicSendTask
        else:
            # Falling through with the class unchanged would call this very
            # __new__ again and recurse until the interpreter gives up, so
            # fail explicitly instead.
            raise NotImplementedError("Current CAN interface doesn't support CyclicSendTask")

        return task_cls(channel, *args, **kwargs)
class MultiRateCyclicSendTask(MultiRateCyclicSendTaskABC):
    """
    Factory for the multi-rate cyclic send task of the configured interface.

    Instantiating this class returns an instance of the backend's own
    ``MultiRateCyclicSendTask``. Only the ``socketcan_ctypes`` and
    ``socketcan_native`` interfaces provide one.
    """

    @classmethod
    def __new__(cls, other, channel, *args, **kwargs):
        """
        Select the backend ``MultiRateCyclicSendTask`` class and return an
        instance of it.

        Because ``__new__`` is wrapped in ``classmethod``, the class arrives
        twice: once bound as ``cls`` and once more as the first positional
        argument, which is absorbed by ``other`` and otherwise unused.

        :param other: duplicate of the class being instantiated; ignored.
        :param channel: forwarded to the backend task as first argument.
        :param args: forwarded to the backend task.
        :param kwargs: forwarded to the backend task.
        :raises NotImplementedError: if the configured interface is not in
            ``VALID_INTERFACES`` or has no multi-rate cyclic send task
            implementation.
        """
        # If can.rc doesn't look valid: load default
        if 'interface' not in can.rc or 'channel' not in can.rc:
            can.log.debug("Loading default configuration")
            can.rc = load_config()

        # Report the active configuration through the logger rather than
        # writing to stdout from library code.
        can.log.debug("CAN configuration: {}".format(can.rc))

        interface = can.rc['interface']
        if interface not in VALID_INTERFACES:
            raise NotImplementedError('Invalid CAN Bus Type - {}'.format(interface))

        # Import the correct implementation of MultiRateCyclicSendTask
        if interface == 'socketcan_ctypes':
            from can.interfaces.socketcan_ctypes import MultiRateCyclicSendTask as _ctypesMultiRateCyclicSendTask
            task_cls = _ctypesMultiRateCyclicSendTask
        elif interface == 'socketcan_native':
            from can.interfaces.socketcan_native import MultiRateCyclicSendTask as _nativeMultiRateCyclicSendTask
            task_cls = _nativeMultiRateCyclicSendTask
        else:
            # Falling through with the class unchanged would call this very
            # __new__ again and recurse until the interpreter gives up, so
            # fail explicitly instead.
            raise NotImplementedError("Current CAN interface doesn't support MultiRateCyclicSendTask")

        return task_cls(channel, *args, **kwargs)