forked from brendan-w/python-OBD
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathobd.py
More file actions
403 lines (315 loc) · 15.4 KB
/
obd.py
File metadata and controls
403 lines (315 loc) · 15.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
# -*- coding: utf-8 -*-
########################################################################
# #
# python-OBD: A python OBD-II serial module derived from pyobd #
# #
# Copyright 2004 Donour Sizemore (donour@uchicago.edu) #
# Copyright 2009 Secons Ltd. (www.obdtester.com) #
# Copyright 2009 Peter J. Creath #
# Copyright 2016 Brendan Whitfield (brendan-w.com) #
# #
########################################################################
# #
# obd.py #
# #
# This file is part of python-OBD (a derivative of pyOBD) #
# #
# python-OBD is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 2 of the License, or #
# (at your option) any later version. #
# #
# python-OBD is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with python-OBD. If not, see <http://www.gnu.org/licenses/>. #
# #
########################################################################
import logging
from .__version__ import __version__
from .elm327 import ELM327
from .commands import commands
from .OBDResponse import OBDResponse
from .utils import scan_serial, OBDStatus
from .protocols import Message
from binascii import hexlify, unhexlify
logger = logging.getLogger(__name__)

# protocol IDs (as reported by the ELM327) that are CAN protocols
CAN_PROTOCOLS = ("6", "7", "8", "9")


class OBD(object):
    """
    Class representing an OBD-II connection
    with its assorted commands/sensors.

    Attributes:
        interface: the ELM327 connection object, or None when no
            connection is open
        supported_commands: set of command objects that query() will
            send without force=True
        fast: global switch for the query optimizations
    """

    def __init__(self, portstr=None, baudrate=None, protocol=None, fast=True):
        """
        Connects to an adapter and loads the car's supported commands.

        portstr: serial port to use. When None, the ports returned by
            scan_serial() are tried in order.
        baudrate, protocol: passed straight through to ELM327()
        fast: when False, disables the query optimizations
        """
        self.interface = None
        self.supported_commands = set(commands.base_commands())
        self.fast = fast  # global switch for disabling optimizations
        self.__last_command = b""  # used for running the previous command with a CR
        self.__frame_counts = {}  # keeps track of the number of return frames for each command

        logger.info("!=================== python-OBD_multi (v%s) ===================!", __version__)
        self.__connect(portstr, baudrate, protocol)  # initialize by connecting and loading sensors
        self.__load_commands()  # try to load the car's supported commands
        logger.info("===================================================================")

    def __connect(self, portstr, baudrate, protocol):
        """
        Attempts to instantiate an ELM327 connection object.

        Leaves self.interface as None when no port is available or
        when the connection attempt ends up NOT_CONNECTED.
        """
        if portstr is None:
            logger.info("Using scan_serial to select port")
            portnames = scan_serial()
            logger.info("Available ports: %s", portnames)

            if not portnames:
                logger.warning("No OBD-II adapters found")
                return

            for port in portnames:
                logger.info("Attempting to use port: %s", port)
                self.interface = ELM327(port, baudrate, protocol)

                if self.interface.status() >= OBDStatus.ELM_CONNECTED:
                    break  # success! stop searching for serial
        else:
            logger.info("Explicit port defined")
            self.interface = ELM327(portstr, baudrate, protocol)

        # if the connection failed, close it
        if self.interface.status() == OBDStatus.NOT_CONNECTED:
            # the ELM327 class will report its own errors
            self.close()

    def __load_commands(self):
        """
        Queries for available PIDs, sets their support status,
        and compiles a list of command objects.

        Does nothing (beyond a warning) unless the status is
        CAR_CONNECTED.
        """
        if self.status() != OBDStatus.CAR_CONNECTED:
            logger.warning("Cannot load commands: No connection to car")
            return

        logger.info("querying for supported commands")

        for get in commands.pid_getters():
            # PID listing commands should sequentially become supported
            # Mode 1 PID 0 is assumed to always be supported
            if not self.test_cmd(get, warn=False):
                continue

            # when querying, only use the blocking OBD.query()
            # prevents problems when query is redefined in a subclass (like Async)
            response = OBD.query(self, get)

            if response.is_null():
                logger.info("No valid data for PID listing command: %s", get)
                continue

            mode = get.mode

            # loop through PIDs bitarray
            for i, bit in enumerate(response.value):
                if not bit:
                    continue

                # bit i of the listing refers to the PID i + 1 past the getter's own PID
                pid = get.pid + i + 1

                if commands.has_pid(mode, pid):
                    self.supported_commands.add(commands[mode][pid])

                # set support for mode 2 commands
                if mode == 1 and commands.has_pid(2, pid):
                    self.supported_commands.add(commands[2][pid])

        logger.info("finished querying with %d commands supported", len(self.supported_commands))

    def close(self):
        """
        Closes the connection, and clears supported_commands
        """
        self.supported_commands = set()

        if self.interface is not None:
            logger.info("Closing connection")
            self.interface.close()
            self.interface = None

    def status(self):
        """ returns the OBD connection status """
        if self.interface is None:
            return OBDStatus.NOT_CONNECTED
        return self.interface.status()

    # not sure how useful this would be

    # def ecus(self):
    #     """ returns a list of ECUs in the vehicle """
    #     if self.interface is None:
    #         return []
    #     else:
    #         return self.interface.ecus()

    def protocol_name(self):
        """ returns the name of the protocol being used by the ELM327 ("" when there is no interface) """
        if self.interface is None:
            return ""
        return self.interface.protocol_name()

    def protocol_id(self):
        """ returns the ID of the protocol being used by the ELM327 ("" when there is no interface) """
        if self.interface is None:
            return ""
        return self.interface.protocol_id()

    def port_name(self):
        """ Returns the name of the currently connected port ("" when there is no interface) """
        if self.interface is None:
            return ""
        return self.interface.port_name()

    def is_connected(self):
        """
        Returns a boolean for whether a connection with the car was made.

        Note: this function returns False when:
        obd.status = OBDStatus.ELM_CONNECTED
        """
        return self.status() == OBDStatus.CAR_CONNECTED

    def print_commands(self):
        """
        Utility function meant for working in interactive mode.
        Prints all commands supported by the car.
        """
        for c in self.supported_commands:
            print(str(c))

    def supports(self, cmd):
        """
        Returns a boolean for whether the given command
        is supported by the car
        """
        return cmd in self.supported_commands

    def test_cmd(self, cmd, warn=True):
        """
        Returns a boolean for whether a command will
        be sent without using force=True.

        cmd: the command object to check
        warn: when True, the reason for a rejection is logged as a warning
        """
        # test if the command is supported
        if not self.supports(cmd):
            if warn:
                logger.warning("'%s' is not supported", cmd)
            return False

        # mode 06 is only implemented for the CAN protocols.
        # self.protocol_id() is used rather than self.interface.protocol_id()
        # so that this check cannot raise when there is no interface
        if cmd.mode == 6 and self.protocol_id() not in CAN_PROTOCOLS:
            if warn:
                logger.warning("Mode 06 commands are only supported over CAN protocols")
            return False

        return True

    def query(self, cmd, force=False):
        """
        primary API function. Sends commands to the car, and
        protects against sending unsupported commands.

        cmd: the command object to send
        force: when True, the test_cmd() checks are skipped

        Returns the response object computed by the command, or an
        empty OBDResponse() when nothing was sent or nothing valid
        came back.
        """
        if self.status() == OBDStatus.NOT_CONNECTED:
            logger.warning("Query failed, no connection available")
            return OBDResponse()

        # if the user forces, skip all checks
        if not force and not self.test_cmd(cmd):
            return OBDResponse()

        # send command and retrieve message
        logger.info("Sending command: %s", cmd)
        cmd_string = self.__build_command_string(cmd)
        messages = self.interface.send_and_parse(cmd_string)

        # if we're sending a new command, note it
        # first check that the current command WASN'T sent as an empty CR
        # (CR is added by the ELM327 class)
        if cmd_string:
            self.__last_command = cmd_string

        # if we don't already know how many frames this command returns,
        # log it, so we can specify it next time
        if (cmd not in self.__frame_counts) and (cmd.bytes > 0):
            frame_count = sum(len(m.frames) for m in messages)
            # a query that returned nothing tells us nothing about the real
            # frame count, so don't cache a zero for all future queries
            if frame_count > 0:
                self.__frame_counts[cmd] = frame_count

        if not messages:
            logger.info("No valid OBD Messages returned")
            return OBDResponse()

        return cmd(messages)  # compute a response object

    def __build_command_string(self, cmd):
        """
        assembles the appropriate command string

        Returns b"" when the resulting string is identical to the last
        one sent and self.fast is set.
        """
        cmd_string = cmd.command

        # if we know the number of frames that this command returns,
        # only wait for exactly that number. This avoids some harsh
        # timeouts from the ELM, thus speeding up queries.
        if self.fast and cmd.fast and (cmd in self.__frame_counts):
            cmd_string += str(self.__frame_counts[cmd]).encode()

        # if we sent this last time, just send a CR
        # (CR is added by the ELM327 class)
        if self.fast and (cmd_string == self.__last_command):
            cmd_string = b""

        return cmd_string

    def query_multi(self, *cmds, **kwargs):
        """
        primary API function. Sends multiple commands to
        the car for CAN ONLY, and protects against sending
        unsupported commands.

        cmds: between 1 and 6 command objects of the same mode and ECU,
            each with a definite (> 0) byte count
        force: (keyword) when True, the test_cmd() checks are skipped

        Over a non-CAN protocol this falls back to one query() per
        command, passing the keyword arguments through.

        returns a tuple of OBDResponse objects in the order
        of *cmds
        """
        force = kwargs.get("force", False)

        # setup a dict with empty responses for each command
        responses = {cmd: OBDResponse() for cmd in cmds}

        def response():
            """ converts the responses dict into a tuple ordered like cmds """
            return tuple(responses[cmd] for cmd in cmds)

        # pre-flight checks
        if self.status() == OBDStatus.NOT_CONNECTED:
            logger.warning("Query failed, no connection available")
            return response()

        if self.protocol_id() not in CAN_PROTOCOLS:
            logger.info("using query_multi over non-CAN protocol")
            # slow fallback
            return tuple(self.query(cmd, **kwargs) for cmd in cmds)

        if (len(cmds) == 0) or (len(cmds) > 6):
            logger.warning("query_multi accepts between 1 and 6 commands")
            return response()

        # check each command for support, skip tests if forced.
        # a list (not a generator) is used so that every command is
        # tested, and therefore every unsupported one is warned about
        if not force and not all([self.test_cmd(cmd) for cmd in cmds]):
            return response()

        first = cmds[0]

        if not all(cmd.mode == first.mode for cmd in cmds):
            logger.warning("commands for query_multi() must be of the same mode")
            return response()

        if not all(cmd.ecu == first.ecu for cmd in cmds):
            logger.warning("commands for query_multi() must be listening for the same ECU")
            return response()

        if not all(cmd.bytes > 0 for cmd in cmds):
            logger.warning("commands for query_multi() must return a definite number of bytes")
            return response()

        # build the request: the mode part once, then each command's PID part
        cmd_string = first.command[:2]
        for cmd in cmds:
            cmd_string += cmd.command[2:]

        # send request
        messages = self.interface.send_and_parse(cmd_string)

        # note what was sent. Otherwise a following query() of the command
        # sent before this one would be shortened to a bare CR, which
        # repeats THIS multi-PID request instead of the intended command
        self.__last_command = cmd_string

        if not messages:
            logger.info("No valid OBD Messages returned")
            return response()

        # filter by ECU, in case multiple messages were returned
        # (unlikely, though possible with multiple ECUs)
        messages = [m for m in messages if (first.ecu & m.ecu) > 0]

        if len(messages) != 1:
            logger.info("query_multi recieved no messages from the ECU of interest")
            return response()

        master = messages[0]  # the message that contains our response
        mode = master.data.pop(0)  # the mode byte (ie, for mode 01 this would be 0x41)

        cmds_by_pid = {cmd.pid: cmd for cmd in cmds}

        # split the single message into multiple smaller messages.
        # data is separated by PID prefixes:
        #
        # 04 3F 05 44 0B 21 0C 17 B8
        # [   ] [   ] [   ] [      ]
        #  ^^ first byte is always the PID

        while len(master.data) > 0:
            pid = master.data[0]
            cmd = cmds_by_pid.get(pid)

            # if the PID we pulled out wasn't one of the commands we were given
            # then something is very wrong. Abort, and proceed with whatever
            # we've decoded so far
            if cmd is None:
                logger.info("query_multi encountered unexpected PID: %s", hex(pid)[2:])
                break

            # cmd.bytes minus the mode byte: this figure INCLUDES the PID byte
            length = cmd.bytes - 1

            # if the message doesn't have enough data left in it to fulfill a
            # PID, then abort, and proceed with whatever we've decoded so far
            if length > len(master.data):
                logger.info("query_multi did not recieve enough data")
                break

            # construct a new message
            message = Message(master.frames)  # copy of the original lines
            message.ecu = master.ecu
            message.data = master.data[:length]
            message.data.insert(0, mode)  # prepend the original mode byte

            # NOTE: OBDCommands perform their own length checking
            responses[cmd] = cmd([message])

            # remove what we just read
            master.data = master.data[length:]

        # return responses in the order that they were specified
        return response()