forked from hardbyte/python-can
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathplayer.py
More file actions
129 lines (104 loc) · 3.9 KB
/
player.py
File metadata and controls
129 lines (104 loc) · 3.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
"""
This module contains the generic :class:`LogReader` as
well as :class:`MessageSync` which plays back messages
in the recorded order and time intervals.
"""
import pathlib
from time import time, sleep
import typing
from pkg_resources import iter_entry_points
if typing.TYPE_CHECKING:
import can
from .generic import BaseIOHandler
from .asc import ASCReader
from .blf import BLFReader
from .canutils import CanutilsLogReader
from .csv import CSVReader
from .sqlite import SqliteReader
class LogReader(BaseIOHandler):
    """
    Replay logged CAN messages from a file.

    The format is determined from the file suffix, which can be one of:

      * .asc
      * .blf
      * .csv
      * .db
      * .log

    Additional suffixes may be registered by plugins through the
    ``can.io.message_reader`` entry point group.

    Exposes a simple iterator interface, to use simply:

        >>> for msg in LogReader("some/path/to/my_file.log"):
        ...     print(msg)

    .. note::
        There are no time delays, if you want to reproduce the measured
        delays between messages look at the :class:`can.MessageSync` class.

    .. note::
        This class itself is just a dispatcher, and any positional and keyword
        arguments are passed on to the returned instance.
    """

    # Set to True once the plugin entry points have been merged into
    # ``message_readers``, so that they are only scanned once per process.
    fetched_plugins = False

    # Maps a lower-case file suffix (including the dot) to the reader class
    # that is instantiated for it.
    message_readers = {
        ".asc": ASCReader,
        ".blf": BLFReader,
        ".csv": CSVReader,
        ".db": SqliteReader,
        ".log": CanutilsLogReader,
    }

    @staticmethod
    def __new__(cls, filename: "can.typechecking.StringPathLike", *args, **kwargs):
        """
        Create the reader that matches the suffix of *filename*.

        :param filename: the filename/path of the file to read from
        :param args: positional arguments forwarded to the selected reader
        :param kwargs: keyword arguments forwarded to the selected reader
        :returns: an instance of the reader class registered for the suffix
        :raises ValueError: if the filename's suffix is of an unknown file type
        """
        if not LogReader.fetched_plugins:
            LogReader.message_readers.update(
                {
                    reader.name: reader.load()
                    for reader in iter_entry_points("can.io.message_reader")
                }
            )
            LogReader.fetched_plugins = True

        suffix = pathlib.PurePath(filename).suffix.lower()

        # Only the lookup is guarded: a KeyError raised while constructing the
        # reader must propagate unchanged instead of being misreported as an
        # unknown file format.
        try:
            reader_type = LogReader.message_readers[suffix]
        except KeyError:
            raise ValueError(
                f'No read support for this unknown log format "{suffix}"'
            ) from None

        return reader_type(filename, *args, **kwargs)
class MessageSync:  # pylint: disable=too-few-public-methods
    """
    Used to iterate over some given messages in the recorded time.
    """

    def __init__(
        self,
        messages: typing.Iterable["can.Message"],
        timestamps: bool = True,
        gap: float = 0.0001,
        skip: float = 60.0,
    ) -> None:
        """Creates a new **MessageSync** instance.

        :param messages: An iterable of :class:`can.Message` instances.
        :param timestamps: Use the messages' timestamps. If False, uses the *gap* parameter
                           as the time between messages.
        :param gap: Minimum time between sent messages in seconds
        :param skip: Skip periods of inactivity greater than this (in seconds).
        """
        self.raw_messages = messages
        self.timestamps = timestamps
        self.gap = gap
        self.skip = skip

    def __iter__(self) -> typing.Generator["can.Message", None, None]:
        """Yield the messages one by one, sleeping before each of them.

        With ``timestamps`` enabled, the sleep before a message is the time
        still missing until its recorded offset (relative to the first
        message) is reached, capped at ``skip`` and never below ``gap``.
        Recorded idle time that was cut short by ``skip`` is subtracted from
        all later offsets, so only the long pause itself is shortened.

        With ``timestamps`` disabled, ``gap`` seconds are slept before every
        message.
        """
        playback_start_time = time()
        recorded_start_time = None
        # Total recorded idle time removed from the playback so far because
        # it exceeded ``skip``.
        skipped_time = 0.0

        for message in self.raw_messages:
            # Work out the correct wait time
            if self.timestamps:
                if recorded_start_time is None:
                    recorded_start_time = message.timestamp

                current_offset = time() - playback_start_time
                recorded_offset_from_start = (
                    message.timestamp - recorded_start_time - skipped_time
                )
                remaining_gap = max(0.0, recorded_offset_from_start - current_offset)
                sleep_period = max(self.gap, min(self.skip, remaining_gap))

                # Remember how much of the recorded pause is not waited for.
                # Without this, every following message would still appear to
                # be far in the future and would also wait ``skip`` seconds.
                if remaining_gap > sleep_period:
                    skipped_time += remaining_gap - sleep_period
            else:
                sleep_period = self.gap

            sleep(sleep_period)
            yield message