-
-
Notifications
You must be signed in to change notification settings - Fork 36
Expand file tree
/
Copy pathdelay.py
More file actions
216 lines (158 loc) · 6.16 KB
/
delay.py
File metadata and controls
216 lines (158 loc) · 6.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
#########################################################################################
##
## TIME DOMAIN DELAY BLOCK
## (blocks/delay.py)
##
#########################################################################################
# IMPORTS ===============================================================================
import numpy as np
from collections import deque
from ._block import Block
from ..utils.adaptivebuffer import AdaptiveBuffer
from ..events.schedule import Schedule
from ..utils.mutable import mutable
# BLOCKS ================================================================================
@mutable
class Delay(Block):
    """Delays the input signal by a time constant 'tau' in seconds.

    Supports two modes of operation:

    **Continuous mode** (default, ``sampling_period=None``):
    Uses an adaptive interpolating buffer for continuous-time delay.

    .. math::

        y(t) =
        \\begin{cases}
        x(t - \\tau) & , t \\geq \\tau \\\\
        0 & , t < \\tau
        \\end{cases}

    **Discrete mode** (``sampling_period`` provided):
    Uses a ring buffer with scheduled sampling events for N-sample delay,
    where ``N = max(1, round(tau / sampling_period))``, i.e. the delay is
    always at least one sample.

    .. math::

        y[k] = x[k - N]

    The ring buffer is pre-filled with ``N`` zeros, so the output is zero
    until ``N`` samples have been taken.

    Note
    ----
    In continuous mode, the internal adaptive buffer uses interpolation for
    the evaluation. This is required to be compatible with variable step solvers.
    It has a drawback however. The order of the ode solver used will degrade
    when this block is used, due to the interpolation.

    Note
    ----
    In continuous mode this block supports vector input, meaning we can have
    multiple parallel delay paths through this block. The discrete mode, as
    implemented here, reads only ``inputs[0]`` and writes only ``outputs[0]``.

    Example
    -------
    Continuous-time delay:

    .. code-block:: python

        #5 time units delay
        D = Delay(tau=5)

    Discrete-time N-sample delay (10 samples):

    .. code-block:: python

        D = Delay(tau=0.01, sampling_period=0.001)

    Parameters
    ----------
    tau : float
        delay time constant in seconds
    sampling_period : float, None
        sampling period for discrete mode, default is continuous mode

    Attributes
    ----------
    _buffer : AdaptiveBuffer
        internal interpolatable adaptive rolling buffer (continuous mode)
    _n : int
        number of samples of delay (discrete mode)
    _ring : deque
        internal ring buffer for N-sample delay (discrete mode)
    _sample_next_timestep : bool
        set by the scheduled event, consumed by 'sample' (discrete mode)
    """

    def __init__(self, tau=1e-3, sampling_period=None):
        super().__init__()

        #time delay in seconds
        self.tau = tau

        #params for sampling
        self.sampling_period = sampling_period

        if sampling_period is None:

            #continuous mode: adaptive buffer with interpolation
            self._buffer = AdaptiveBuffer(self.tau)

        else:

            #discrete mode: ring buffer with N-sample delay (at least one sample)
            self._n = max(1, round(self.tau / self.sampling_period))

            #'maxlen = N + 1' so that after each append the leftmost entry
            #is the value sampled N samples ago
            self._ring = deque([0.0] * self._n, maxlen=self._n + 1)

            #flag to indicate this is a timestep to sample
            self._sample_next_timestep = False

            #internal scheduled event for periodic sampling
            def _sample(t):
                self._sample_next_timestep = True

            self.events = [
                Schedule(
                    t_start=0,
                    t_period=sampling_period,
                    func_act=_sample
                    )
                ]


    def __len__(self):
        """Algebraic passthrough length of the block, always zero."""
        #no passthrough by definition
        return 0


    def reset(self):
        """Reset the block and bring the delay storage back to the
        state it has right after construction.
        """
        super().reset()

        if self.sampling_period is None:

            #clear the adaptive buffer
            self._buffer.clear()

        else:

            #clear the ring buffer and refill it with the initial zeros
            self._ring.clear()
            self._ring.extend([0.0] * self._n)

            #discard a sampling request that was raised before the reset,
            #otherwise the next 'sample' call would append an unscheduled
            #value and shift the delay line by one sample
            self._sample_next_timestep = False


    def to_checkpoint(self, prefix, recordings=False):
        """Serialize Delay state including buffer data.

        Parameters
        ----------
        prefix : str
            key prefix under which the buffer data is stored
        recordings : bool
            forwarded unchanged to the parent class implementation

        Returns
        -------
        json_data : dict
            parent class data extended by 'sampling_period' and, in
            discrete mode, the pending sampling flag
        npz_data : dict
            parent class data extended by the buffer content
        """
        json_data, npz_data = super().to_checkpoint(prefix, recordings=recordings)

        json_data["sampling_period"] = self.sampling_period

        if self.sampling_period is None:

            #continuous mode: adaptive buffer
            npz_data.update(self._buffer.to_checkpoint(f"{prefix}/buffer"))

        else:

            #discrete mode: ring buffer
            npz_data[f"{prefix}/ring"] = np.array(list(self._ring))
            json_data["_sample_next_timestep"] = self._sample_next_timestep

        return json_data, npz_data


    def load_checkpoint(self, prefix, json_data, npz):
        """Restore Delay state including buffer data.

        The mode (continuous / discrete) is taken from this instance,
        not from the checkpoint.

        Parameters
        ----------
        prefix : str
            key prefix that was used in 'to_checkpoint'
        json_data : dict
            json part of the checkpoint
        npz : mapping
            array part of the checkpoint, looked up by key
        """
        super().load_checkpoint(prefix, json_data, npz)

        if self.sampling_period is None:

            #continuous mode
            self._buffer.load_checkpoint(npz, f"{prefix}/buffer")

        else:

            #discrete mode: keep the current ring if the key is missing
            ring_key = f"{prefix}/ring"
            if ring_key in npz:
                self._ring.clear()
                self._ring.extend(npz[ring_key].tolist())

            #missing flag in the checkpoint means no pending sample
            self._sample_next_timestep = json_data.get("_sample_next_timestep", False)


    def update(self, t):
        """Evaluation of the buffer at different times
        via interpolation (continuous) or ring buffer lookup (discrete).

        Parameters
        ----------
        t : float
            evaluation time
        """
        if self.sampling_period is None:

            #continuous mode: retrieve value from buffer
            y = self._buffer.get(t)
            self.outputs.update_from_array(y)

        else:

            #discrete mode: output the oldest value in the ring buffer
            self.outputs[0] = self._ring[0]


    def sample(self, t, dt):
        """Sample input values and time of sampling
        and add them to the buffer.

        Parameters
        ----------
        t : float
            evaluation time for sampling
        dt : float
            integration timestep
        """
        if self.sampling_period is None:

            #continuous mode: add new value to buffer
            self._buffer.add(t, self.inputs.to_array())

        else:

            #discrete mode: only sample on scheduled events
            if self._sample_next_timestep:
                self._ring.append(self.inputs[0])
                self._sample_next_timestep = False