forked from hardbyte/python-can
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnixnet.py
More file actions
270 lines (221 loc) · 8.93 KB
/
Copy pathnixnet.py
File metadata and controls
270 lines (221 loc) · 8.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
"""
NI-XNET interface module.
Implementation references:
NI-XNET Hardware and Software Manual: https://www.ni.com/pdf/manuals/372840h.pdf
NI-XNET Python implementation: https://github.com/ni/nixnet-python
Authors: Javier Rubio Giménez <jvrr20@gmail.com>, Jose A. Escobar <joseleescobar@hotmail.com>
"""
import logging
import sys
import time
import struct
from can import CanError, BusABC, Message
logger = logging.getLogger(__name__)
if sys.platform == "win32":
try:
from nixnet import (
session,
types,
constants,
errors,
system,
database,
XnetError,
)
except ImportError:
logger.error("Error, NIXNET python module cannot be loaded.")
raise ImportError()
else:
logger.error("NI-XNET interface is only available on Windows systems")
raise NotImplementedError("NiXNET is not supported on not Win32 platforms")
class NiXNETcanBus(BusABC):
"""
The CAN Bus implemented for the NI-XNET interface.
"""
def __init__(
self,
channel,
can_filters=None,
bitrate=None,
fd=False,
fd_bitrate=None,
brs=False,
can_termination=False,
log_errors=True,
**kwargs
):
"""
:param str channel:
Name of the object to open (e.g. 'CAN0')
:param int bitrate:
Bitrate in bits/s
:param list can_filters:
See :meth:`can.BusABC.set_filters`.
:param bool log_errors:
If True, communication errors will appear as CAN messages with
``is_error_frame`` set to True and ``arbitration_id`` will identify
the error (default True)
:raises can.interfaces.nixnet.NiXNETError:
If starting communication fails
"""
self._rx_queue = []
self.channel = channel
self.channel_info = "NI-XNET: " + channel
# Set database for the initialization
if not fd:
database_name = ":memory:"
else:
if not brs:
database_name = ":can_fd:"
else:
database_name = ":can_fd_brs:"
try:
# We need two sessions for this application, one to send frames and another to receive them
self.__session_send = session.FrameOutStreamSession(
channel, database_name=database_name
)
self.__session_receive = session.FrameInStreamSession(
channel, database_name=database_name
)
# We stop the sessions to allow reconfiguration, as by default they autostart at creation
self.__session_send.stop()
self.__session_receive.stop()
# See page 1017 of NI-XNET Hardware and Software Manual to set custom can configuration
if bitrate:
self.__session_send.intf.baud_rate = bitrate
self.__session_receive.intf.baud_rate = bitrate
if fd_bitrate:
# See page 951 of NI-XNET Hardware and Software Manual to set custom can configuration
self.__session_send.intf.can_fd_baud_rate = fd_bitrate
self.__session_receive.intf.can_fd_baud_rate = fd_bitrate
if can_termination:
self.__session_send.intf.can_term = constants.CanTerm.ON
self.__session_receive.intf.can_term = constants.CanTerm.ON
self.__session_receive.queue_size = 512
# Once that all the parameters have been restarted, we start the sessions
self.__session_send.start()
self.__session_receive.start()
except errors.XnetError as err:
raise NiXNETError(function="__init__", error_message=err.args[0]) from None
self._is_filtered = False
super(NiXNETcanBus, self).__init__(
channel=channel,
can_filters=can_filters,
bitrate=bitrate,
log_errors=log_errors,
**kwargs
)
def _recv_internal(self, timeout):
try:
if len(self._rx_queue) == 0:
fr = self.__session_receive.frames.read(4, timeout=0)
for f in fr:
self._rx_queue.append(f)
can_frame = self._rx_queue.pop(0)
# Timestamp should be converted from raw frame format(100ns increment from(12:00 a.m. January 1 1601 Coordinated
# Universal Time (UTC)) to epoch time(number of seconds from January 1, 1970 (midnight UTC/GMT))
msg = Message(
timestamp=can_frame.timestamp / 10000000.0 - 11644473600,
channel=self.channel,
is_remote_frame=can_frame.type == constants.FrameType.CAN_REMOTE,
is_error_frame=can_frame.type == constants.FrameType.CAN_BUS_ERROR,
is_fd=(
can_frame.type == constants.FrameType.CANFD_DATA
or can_frame.type == constants.FrameType.CANFDBRS_DATA
),
bitrate_switch=can_frame.type == constants.FrameType.CANFDBRS_DATA,
is_extended_id=can_frame.identifier.extended,
# Get identifier from CanIdentifier structure
arbitration_id=can_frame.identifier.identifier,
dlc=len(can_frame.payload),
data=can_frame.payload,
)
return msg, self._filters is None
except Exception as e:
# print('Error: ', e)
return None, self._filters is None
def send(self, msg, timeout=None):
"""
Send a message using NI-XNET.
:param can.Message msg:
Message to send
:param float timeout:
Max time to wait for the device to be ready in seconds, None if time is infinite
:raises can.interfaces.nixnet.NiXNETError:
If writing to transmit buffer fails.
It does not wait for message to be ACKed currently.
"""
if timeout is None:
timeout = constants.TIMEOUT_INFINITE
if msg.is_remote_frame:
type_message = constants.FrameType.CAN_REMOTE
elif msg.is_error_frame:
type_message = constants.FrameType.CAN_BUS_ERROR
elif msg.is_fd:
if msg.bitrate_switch:
type_message = constants.FrameType.CANFDBRS_DATA
else:
type_message = constants.FrameType.CANFD_DATA
else:
type_message = constants.FrameType.CAN_DATA
can_frame = types.CanFrame(
types.CanIdentifier(msg.arbitration_id, msg.is_extended_id),
type=type_message,
payload=msg.data,
)
try:
self.__session_send.frames.write([can_frame], timeout)
except errors.XnetError as err:
raise NiXNETError(function="send", error_message=err.args[0]) from None
def reset(self):
"""
Resets network interface. Stops network interface, then resets the CAN
chip to clear the CAN error counters (clear error passive state).
Resetting includes clearing all entries from read and write queues.
"""
self.__session_send.flush()
self.__session_receive.flush()
self.__session_send.stop()
self.__session_receive.stop()
self.__session_send.start()
self.__session_receive.start()
def shutdown(self):
"""Close object."""
self.__session_send.flush()
self.__session_receive.flush()
self.__session_send.stop()
self.__session_receive.stop()
self.__session_send.close()
self.__session_receive.close()
@staticmethod
def _detect_available_configs():
configs = []
try:
with system.System() as nixnet_system:
for interface in nixnet_system.intf_refs_can:
cahnnel = str(interface)
logger.debug(
"Found channel index %d: %s", interface.port_num, cahnnel
)
configs.append(
{
"interface": "nixnet",
"channel": cahnnel,
"can_term_available": interface.can_term_cap
== constants.CanTermCap.YES,
}
)
except XnetError as error:
logger.debug("An error occured while searching for configs: %s", str(error))
return configs
# To-Do review error management, I don't like this implementation
class NiXNETError(CanError):
"""Error from NI-XNET driver."""
def __init__(self, function="", error_message=""):
super(NiXNETError, self).__init__()
#: Function that failed
self.function = function
#: Arguments passed to function
self.error_message = error_message
def __str__(self):
return "Function %s failed:\n%s" % (self.function, self.error_message)