-
-
Notifications
You must be signed in to change notification settings - Fork 36
Expand file tree
/
Copy pathsubsystem.py
More file actions
877 lines (663 loc) · 26.9 KB
/
subsystem.py
File metadata and controls
877 lines (663 loc) · 26.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
#########################################################################################
##
## SUBSYSTEM DEFINITION
## (subsystem.py)
##
## This module contains the 'Subsystem' and 'Interface' classes
## that manage subsystems that can be embedded within a larger simulation
##
#########################################################################################
# IMPORTS ===============================================================================
import numpy as np
from .connection import Connection
from .blocks._block import Block
from .optim.booster import ConnectionBooster
from .utils.graph import Graph
from .utils.register import Register
from .utils.portreference import PortReference
from .utils.deprecation import deprecated
from ._constants import (
SIM_ITERATIONS_MAX,
SIM_TOLERANCE_FPI,
LOG_ENABLE
)
# IO CLASS ==============================================================================
class Interface(Block):
"""Bare-bone block that serves as a data interface for the 'Subsystem' class.
It works like this:
- Internal blocks of the subsystem are connected to the inputs and outputs
of this Interface block via the internal connections.
- It behaves like a normal block (inherits the main 'Block' class methods).
- It implements some special methods to get and set the inputs and outputs
of the blocks, that are used to translate between the internal blocks of the
subsystem and the inputs and outputs of the subsystem.
- Handles data transfer to and from the internal subsystem blocks
to and from the inputs and outputs of the subsystem.
"""
def __len__(self):
return 0
def register_port_map(self, port_map_in, port_map_out):
"""Update the input and output registers of the interface block with port mappings
Parameters
----------
port_map_in : dict[str: int]
port alias mapping for block inputs
port_map_out : dict[str: int]
port alias mapping for block outputs
"""
self.input_port_labels = port_map_in
self.output_port_labels = port_map_out
#build registers with mappings
self.inputs = Register(mapping=port_map_in)
self.outputs = Register(mapping=port_map_out)
# MAIN SUBSYSTEM CLASS ==================================================================
class Subsystem(Block):
"""Subsystem class that holds its own blocks and connecions and
can natively interface with the main simulation loop.
IO interface is realized by a special 'Interface' block, that has extra
methods for setting and getting inputs and outputs and serves
as the interface of the internal blocks to the outside.
The subsystem doesnt use its 'inputs' and 'outputs' dicts directly.
It exclusively handles data transfer via the 'Interface' block.
This class can be used just like any other block during the simulation,
since it implements the required methods 'update' for the fixed-point
iteration (resolving algebraic loops with instant time blocks),
the 'step' method that performs timestepping (especially for dynamic
blocks with internal states) and the 'solve' method for solving the
implicit update equation for implicit solvers.
Example
-------
This is how we can wrap up multiple blocks within a subsystem.
In this case vanderpol system built from discrete components
instead of using an ODE block (in practice you should use
a monolithic ODE whenever possible due to performance).
.. code-block:: python
from pathsim import Subsystem, Interface, Connection
from pathsim.blocks import Integrator, Function
#van der Pol parameter
mu = 1000
#blocks in the subsystem
If = Interface() # this is the interface to the outside
I1 = Integrator(2)
I2 = Integrator(0)
Fn = Function(lambda x1, x2: mu*(1 - x1**2)*x2 - x1)
sub_blocks = [If, I1, I2, Fn]
#connections in the subsystem
sub_connections = [
Connection(I2, I1, Fn[1], If[1]),
Connection(I1, Fn, If),
Connection(Fn, I2)
]
#the subsystem acts just like a normal block
vdp = Subsystem(sub_blocks, sub_connections)
Parameters
----------
blocks : list[Block] | None
internal blocks of the subsystem
connections : list[Connection] | None
internal connections of the subsystem
events : list[Event] | None
tolerance_fpi : float
absolute tolerance for convergence of algebraic loops
default see ´SIM_TOLERANCE_FPI´ in ´_constants.py´
iterations_max : int
maximum allowed number of iterations for algebraic loop
solver, default see ´SIM_ITERATIONS_MAX´ in ´_constants.py´
Attributes
----------
interface : Interface
internal interface block for data transfer to the outside
graph : Graph
internal graph representation for fast system funcion
evluations using DAG with algebraic depths
boosters : None | list[ConnectionBooster]
list of boosters (fixed point accelerators) that wrap
algebraic loop closing connections assembled from the
system graph
"""
def __init__(self,
blocks=None,
connections=None,
events=None,
tolerance_fpi=SIM_TOLERANCE_FPI,
iterations_max=SIM_ITERATIONS_MAX
):
#internal integration engine -> initialized later
self.engine = None
#flag to set block (subsystem) active
self._active = True
#error tolerance for alg. loop solver
self.tolerance_fpi = tolerance_fpi
#max iterations for internal alg. loop solver
self.iterations_max = iterations_max
#operators for algebraic and dynamic components (not here)
self.op_alg = None
self.op_dyn = None
#internal graph representation -> initialized later
self.graph = None
self._graph_dirty = False
#internal algebraic loop solvers -> initialized later
self.boosters = None
#internal connecions
self.connections = list(connections) if connections else []
#collect and organize internal blocks
self.blocks = []
self.interface = None
if blocks:
for block in blocks:
if isinstance(block, Interface):
if self.interface is not None:
#interface block is already defined
raise ValueError("Subsystem can only have one 'Interface' block!")
self.interface = block
else:
#regular blocks
self.blocks.append(block)
#check if interface is defined
if self.interface is None:
raise ValueError("Subsystem 'blocks' list needs to contain 'Interface' block!")
#collect events if specified
self._events = [] if events is None else events
#assemble internal graph
self._assemble_graph()
def __len__(self):
"""Check if the Subsystem has algebraic passthrough by quering
the graph for an algebraic path from the interface to itself.
"""
is_alg = self.graph.is_algebraic_path(self.interface, self.interface)
return int(is_alg)
def __call__(self):
"""Recursively get the subsystems internal states of engines
(if available) of all internal blocks and nested subsystems
and the subsystem inputs and outputs as arrays for use outside.
Either for monitoring, postprocessing or event detection.
In any case this enables easy access to the current block state.
"""
_inputs = self.interface.outputs.to_array()
_outputs = self.interface.inputs.to_array()
_states = []
for block in self.blocks:
_i, _o, _s = block()
_states.append(_s)
return _inputs, _outputs, np.hstack(_states)
def __contains__(self, other):
"""Check if blocks and connections are already part of the subsystem
Paramters
---------
other : obj
object to check if its part of subsystem
Returns
-------
bool
"""
return other in self.blocks or other in self.connections
# adding and removing system components ---------------------------------------------------
def add_block(self, block):
"""Adds a new block to the subsystem.
This works dynamically for running simulations.
Parameters
----------
block : Block
block to add to the subsystem
"""
if block in self.blocks:
raise ValueError(f"block {block} already part of subsystem")
#initialize solver if available
if hasattr(self, '_Solver'):
block.set_solver(self._Solver, self._solver_parent, **self._solver_args)
if block.engine:
self._blocks_dyn.append(block)
self.blocks.append(block)
if self.graph:
self._graph_dirty = True
def remove_block(self, block):
"""Removes a block from the subsystem.
This works dynamically for running simulations.
Parameters
----------
block : Block
block to remove from the subsystem
"""
if block not in self.blocks:
raise ValueError(f"block {block} not part of subsystem")
self.blocks.remove(block)
#remove from dynamic list
if hasattr(self, '_blocks_dyn') and block in self._blocks_dyn:
self._blocks_dyn.remove(block)
if self.graph:
self._graph_dirty = True
def add_connection(self, connection):
"""Adds a new connection to the subsystem.
This works dynamically for running simulations.
Parameters
----------
connection : Connection
connection to add to the subsystem
"""
if connection in self.connections:
raise ValueError(f"{connection} already part of subsystem")
self.connections.append(connection)
if self.graph:
self._graph_dirty = True
def remove_connection(self, connection):
"""Removes a connection from the subsystem.
This works dynamically for running simulations.
Parameters
----------
connection : Connection
connection to remove from the subsystem
"""
if connection not in self.connections:
raise ValueError(f"{connection} not part of subsystem")
self.connections.remove(connection)
if self.graph:
self._graph_dirty = True
def add_event(self, event):
"""Adds an event to the subsystem.
This works dynamically for running simulations.
Parameters
----------
event : Event
event to add to the subsystem
"""
if event in self._events:
raise ValueError(f"{event} already part of subsystem")
self._events.append(event)
def remove_event(self, event):
"""Removes an event from the subsystem.
This works dynamically for running simulations.
Parameters
----------
event : Event
event to remove from the subsystem
"""
if event not in self._events:
raise ValueError(f"{event} not part of subsystem")
self._events.remove(event)
# subsystem graph assembly --------------------------------------------------------------
def _assemble_graph(self):
"""Assemble internal graph of subsystem for fast
algebraic evaluation during simulation.
"""
#reset all block inputs to clear stale values from removed connections
for block in self.blocks:
block.inputs.reset()
self.graph = Graph([*self.blocks, self.interface], self.connections)
self._graph_dirty = False
#create boosters for loop closing connections
if self.graph.has_loops:
self.boosters = [
ConnectionBooster(conn) for conn in self.graph.loop_closing_connections()
]
# methods for access to metadata --------------------------------------------------------
@property
def size(self):
"""Get size information from subsystem, recursively assembled
from internal blocks, including nested subsystems.
Returns
-------
sizes : tuple[int]
size of block (number of all internal blocks)
and number of internal states (from internal engines)
"""
total_n, total_nx = 0, 0
for block in self.blocks:
n, nx = block.size
total_n += n
total_nx += nx
return total_n, total_nx
# visualization -------------------------------------------------------------------------
def plot(self, *args, **kwargs):
"""Plot the simulation results by calling all the blocks
that have visualization capabilities such as the 'Scope'
and 'Spectrum'.
Parameters
----------
args : tuple
args for the plot methods
kwargs : dict
kwargs for the plot method
"""
for block in self.blocks:
block.plot(*args, **kwargs)
# extracting data -----------------------------------------------------------------------
@deprecated(version="1.0.0", reason="its against pathsims philosophy")
def collect(self):
"""Aggregate results from internal blocks."""
for block in self.blocks:
yield from block.collect()
# system management ---------------------------------------------------------------------
def reset(self):
"""Reset the subsystem interface and all internal blocks"""
#reset interface
self.interface.reset()
#reset internal blocks
for block in self.blocks:
block.reset()
@staticmethod
def _checkpoint_key(type_name, type_counts):
"""Generate a deterministic checkpoint key from block/event type
and occurrence index (e.g. 'Integrator_0', 'Scope_1').
"""
idx = type_counts.get(type_name, 0)
type_counts[type_name] = idx + 1
return f"{type_name}_{idx}"
def to_checkpoint(self, prefix, recordings=False):
"""Serialize subsystem state by recursively checkpointing internal blocks.
Parameters
----------
prefix : str
key prefix for NPZ arrays (assigned by simulation)
recordings : bool
include recording data (for Scope blocks)
Returns
-------
json_data : dict
JSON-serializable metadata
npz_data : dict
numpy arrays keyed by path
"""
json_data = {
"type": self.__class__.__name__,
"active": self._active,
"blocks": [],
}
npz_data = {}
#checkpoint interface block
if_json, if_npz = self.interface.to_checkpoint(f"{prefix}/interface", recordings=recordings)
json_data["interface"] = if_json
npz_data.update(if_npz)
#checkpoint internal blocks by type + insertion order
type_counts = {}
for block in self.blocks:
key = f"{prefix}/{self._checkpoint_key(block.__class__.__name__, type_counts)}"
b_json, b_npz = block.to_checkpoint(key, recordings=recordings)
b_json["_key"] = key
json_data["blocks"].append(b_json)
npz_data.update(b_npz)
#checkpoint subsystem-level events
if self._events:
evt_jsons = []
for i, event in enumerate(self._events):
evt_prefix = f"{prefix}/evt_{i}"
e_json, e_npz = event.to_checkpoint(evt_prefix)
evt_jsons.append(e_json)
npz_data.update(e_npz)
json_data["events"] = evt_jsons
return json_data, npz_data
def load_checkpoint(self, prefix, json_data, npz):
"""Restore subsystem state by recursively loading internal blocks.
Parameters
----------
prefix : str
key prefix for NPZ arrays (assigned by simulation)
json_data : dict
subsystem metadata from checkpoint JSON
npz : dict-like
numpy arrays from checkpoint NPZ
"""
#verify type
if json_data["type"] != self.__class__.__name__:
raise ValueError(
f"Checkpoint type mismatch: expected '{self.__class__.__name__}', "
f"got '{json_data['type']}'"
)
self._active = json_data["active"]
#restore interface block
if "interface" in json_data:
self.interface.load_checkpoint(f"{prefix}/interface", json_data["interface"], npz)
#restore internal blocks by type + insertion order
block_data = {b["_key"]: b for b in json_data.get("blocks", [])}
type_counts = {}
for block in self.blocks:
key = f"{prefix}/{self._checkpoint_key(block.__class__.__name__, type_counts)}"
if key in block_data:
block.load_checkpoint(key, block_data[key], npz)
#restore subsystem-level events
if self._events and "events" in json_data:
for i, (event, evt_data) in enumerate(zip(self._events, json_data["events"])):
event.load_checkpoint(f"{prefix}/evt_{i}", evt_data, npz)
def on(self):
"""Activate the subsystem and all internal blocks, sets the boolean
evaluation flag to 'True'.
"""
self._active = True
for block in self.blocks:
block.on()
def off(self):
"""Deactivate the subsystem and all internal blocks, sets the boolean
evaluation flag to 'False'. Also resets the subsystem.
"""
self._active = False
for block in self.blocks:
block.off()
self.reset()
def linearize(self, t):
"""Linearize the algebraic and dynamic components of the internal blocks.
This is done by linearizing the internal 'Operator' and 'DynamicOperator'
instances of all the internal blocks of the subsystem in the current system
operating point. The operators create 1st order taylor approximations
internally and use them on subsequent calls after linearization.
Recursively traverses down the hierarchy for nested subsystems and linearizes
all of them.
Parameters
----------
t : float
evaluation time
"""
for block in self.blocks:
block.linearize(t)
def delinearize(self):
"""Revert the linearization of the internal blocks."""
for block in self.blocks:
block.delinearize()
# methods for discrete event management -------------------------------------------------
@property
def events(self):
"""Recursively collect and return events spawned by the
internal blocks of the subsystem, for discrete time
blocks such as triggers / comparators, clocks, etc.
"""
_all_events = self._events.copy()
for block in self.blocks:
_all_events.extend(block.events)
return _all_events
# methods for inter-block data transfer -------------------------------------------------
@property
def inputs(self):
return self.interface.outputs
@property
def outputs(self):
return self.interface.inputs
# methods for data recording ------------------------------------------------------------
def sample(self, t, dt):
"""Update the internal connections again and sample data from
the internal blocks that implement the 'sample' method.
Parameters
----------
t : float
evaluation time
dt : float
integration timestep
"""
#record data if required
for block in self.blocks:
block.sample(t, dt)
# methods for block output and state updates --------------------------------------------
def update(self, t):
"""Update the instant time components of the internal blocks
to evaluate the (distributed) system equation.
Parameters
----------
t : float
evaluation time
"""
#lazy graph rebuild if dirty
if self._graph_dirty:
self._assemble_graph()
#evaluate DAG
self._dag(t)
#algebraic loops -> solve them
if self.graph.has_loops:
self._loops(t)
def _dag(self, t):
"""Update the directed acyclic graph components of the system.
Parameters
----------
t : float
evaluation time for system function
"""
#update interface outgoing connections
for connection in self.graph.outgoing_connections(self.interface):
if connection: connection.update()
#perform gauss-seidel iterations without error checking
for _, blocks_dag, connections_dag in self.graph.dag():
#update blocks at algebraic depth
for block in blocks_dag:
if block: block.update(t)
#update connenctions at algebraic depth (data transfer)
for connection in connections_dag:
if connection: connection.update()
def _loops(self, t):
"""Perform the algebraic loop solve of the subsystem using accelerated
fixed-point iterations on the broken loop directed graph.
Parameters
----------
t : float
evaluation time for system function
"""
#reset accelerators of loop closing connections
for con_booster in self.boosters:
con_booster.reset()
#perform solver iterations on algebraic loops
for iteration in range(1, self.iterations_max):
#iterate DAG depths of broken loops
for depth, blocks_loop, connections_loop in self.graph.loop():
#update blocks at algebraic depth
for block in blocks_loop:
if block: block.update(t)
#step accelerated connenctions at algebraic depth (data transfer)
for connection in connections_loop:
if connection: connection.update()
#step boosters of loop closing connections
max_err = 0.0
for con_booster in self.boosters:
err = con_booster.update()
if err > max_err:
max_err = err
#check convergence after first iteration
if max_err <= self.tolerance_fpi:
return
#not converged -> error
raise RuntimeError(
"algebraic loop in 'Subsystem' not converged (iters: {}, err: {})".format(
self.iterations_max, max_err)
)
# methods for blocks with integration engines -------------------------------------------
def solve(self, t, dt):
"""Advance solution of implicit update equation
for internal blocks.
Parameters
----------
t : float
evaluation time
dt : float
timestep
Returns
-------
max_error : float
maximum error of implicit update equaiton
"""
max_error = 0.0
for block in self._blocks_dyn:
if not block: continue
err = block.solve(t, dt)
if err > max_error:
max_error = err
return max_error
def step(self, t, dt):
"""Explicit component of timestep for internal blocks
including error propagation.
Notes
-----
This is pretty much an exact copy of the '_step' method
from the 'Simulation' class.
Parameters
----------
t : float
evaluation time
dt : float
timestep
Returns
-------
success : bool
indicator if the timestep was successful
max_error : float
maximum local truncation error from integration
scale : float
rescale factor for timestep
"""
#initial timestep rescale and error estimate
success, max_error_norm, min_scale = True, 0.0, None
#step blocks and get error estimates if available
for block in self._blocks_dyn:
#skip inactive internal blocks
if not block: continue
suc, err_norm, scl = block.step(t, dt)
#check solver stepping success
if not suc:
success = False
#update error tracking
if err_norm > max_error_norm:
max_error_norm = err_norm
#track minimum relevant scale directly (avoids list allocation)
if scl is not None:
if min_scale is None or scl < min_scale:
min_scale = scl
return success, max_error_norm, min_scale if min_scale is not None else 1.0
def set_solver(self, Solver, parent, **solver_args):
"""Initialize all blocks with solver for numerical integration
and additional args for the solver such as tolerances, etc.
If blocks already have solvers, change the numerical integrator
to the 'Solver' class.
Parameters
----------
Solver : Solver
numerical solver definition
parent : Solver
numerical solver instance as parent
solver_args : dict
args to initialize solver with
"""
#cache solver info for dynamic block additions
self._Solver = Solver
self._solver_parent = parent
self._solver_args = solver_args
#set integration engines and assemble list of dynamic blocks
self._blocks_dyn = []
for block in self.blocks:
block.set_solver(Solver, parent, **solver_args)
if block.engine:
self._blocks_dyn.append(block)
#only set dummy engine if subsystem has dynamic blocks
#this prevents purely algebraic subsystems from being treated as dynamic
if self._blocks_dyn:
self.engine = Solver(parent=parent, **solver_args)
else:
self.engine = None
def revert(self):
"""revert the internal blocks to the state
of the previous timestep
"""
for block in self._blocks_dyn:
if block: block.revert()
def buffer(self, dt):
"""buffer internal states of blocks with
internal integration engines
Parameters
----------
dt : float
evaluation time for buffering
"""
for block in self._blocks_dyn:
if block: block.buffer(dt)