-
-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathfmu.py
More file actions
313 lines (236 loc) · 10.9 KB
/
fmu.py
File metadata and controls
313 lines (236 loc) · 10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
#########################################################################################
##
## FUNCTIONAL MOCK-UP UNIT (FMU) BLOCKS
## (pathsim/blocks/fmu.py)
##
#########################################################################################
# IMPORTS ===============================================================================
import bisect
from ._block import Block
from .dynsys import DynamicalSystem
from ..events.schedule import Schedule, ScheduleList
from ..events.zerocrossing import ZeroCrossing
from ..utils.fmuwrapper import FMUWrapper
# BLOCKS ================================================================================
class CoSimulationFMU(Block):
"""Co-Simulation FMU block using FMPy with support for FMI 2.0 and FMI 3.0.
This block wraps an FMU (Functional Mock-up Unit) for co-simulation.
The FMU encapsulates a simulation model that can be executed independently
and synchronized with the main simulation at discrete communication points.
Parameters
----------
fmu_path : str
path to the FMU file (.fmu)
instance_name : str, optional
name for the FMU instance (default: 'fmu_instance')
start_values : dict, optional
dictionary of variable names and their initial values
dt : float, optional
communication step size for co-simulation. If None, uses the FMU's
default experiment step size if available.
Attributes
----------
fmu_wrapper : FMUWrapper
version-agnostic FMU wrapper instance providing access to model_description,
fmu, and other FMPy objects for advanced usage
dt : float
communication step size
"""
def __init__(self, fmu_path, instance_name="fmu_instance", start_values=None, dt=None):
super().__init__()
self.start_values = start_values
# Create and initialize FMU wrapper
self.fmu_wrapper = FMUWrapper(fmu_path, instance_name, mode='cosimulation')
self.fmu_wrapper.initialize(start_values, start_time=0.0)
# Determine step size
self.dt = dt if dt is not None else self.fmu_wrapper.default_step_size
if self.dt is None:
raise ValueError("No step size provided and FMU has no default experiment step size")
# Setup block I/O from FMU variables
self.inputs, self.outputs = self.fmu_wrapper.create_port_registers()
# Scheduled co-simulation step
self.events = [
Schedule(
t_start=0,
t_period=self.dt,
func_act=self._step_fmu
)
]
# Read initial outputs
self.outputs.update_from_array(self.fmu_wrapper.get_outputs_as_array())
def _step_fmu(self, t):
"""Perform one FMU co-simulation step."""
self.fmu_wrapper.set_inputs_from_array(self.inputs.to_array())
result = self.fmu_wrapper.do_step(
current_time=t,
step_size=self.dt
)
if result.terminate_simulation:
raise RuntimeError("FMU requested simulation termination")
self.outputs.update_from_array(self.fmu_wrapper.get_outputs_as_array())
def reset(self):
"""Reset the FMU instance."""
super().reset()
self.fmu_wrapper.reset()
self.fmu_wrapper.initialize(self.start_values, start_time=0.0)
self.outputs.update_from_array(self.fmu_wrapper.get_outputs_as_array())
def __len__(self):
"""FMU is a discrete time source-like block without direct passthrough."""
return 0
class ModelExchangeFMU(DynamicalSystem):
"""Model Exchange FMU block using FMPy with support for FMI 2.0 and FMI 3.0.
This block wraps an FMU (Functional Mock-up Unit) for model exchange.
The FMU provides the right-hand side of an ODE system that is integrated
by PathSim's numerical solvers. Internal FMU events (state events, time
events, and step completion events) are translated to PathSim events.
Parameters
----------
fmu_path : str
path to the FMU file (.fmu)
instance_name : str, optional
name for the FMU instance (default: 'fmu_instance')
start_values : dict, optional
dictionary of variable names and their initial values
tolerance : float, optional
tolerance for event detection (default: 1e-10)
verbose : bool, optional
enable verbose output (default: False)
Attributes
----------
fmu_wrapper : FMUWrapper
version-agnostic FMU wrapper instance providing access to model_description,
fmu, and other FMPy objects for advanced usage
time_event : ScheduleList or None
dynamic time event for FMU-scheduled events
"""
def __init__(self, fmu_path, instance_name="fmu_instance", start_values=None,
tolerance=1e-10, verbose=False):
self.tolerance = tolerance
self.verbose = verbose
self.start_values = start_values
# Create and initialize FMU wrapper
self.fmu_wrapper = FMUWrapper(fmu_path, instance_name, mode='model_exchange')
event_info = self.fmu_wrapper.initialize(start_values, start_time=0.0, tolerance=tolerance)
# Store initial time event if defined
self._initial_time_event = (
event_info.next_event_time
if event_info and event_info.next_event_time_defined
else None
)
# Enter continuous time mode
self.fmu_wrapper.enter_continuous_time_mode()
# Initialize parent DynamicalSystem with FMU dynamics
# Use FMU's Jacobian if available (providesDirectionalDerivative=true)
jac_func = self._get_jacobian if self.fmu_wrapper.provides_jacobian else None
super().__init__(
func_dyn=self._get_derivatives,
func_alg=self._get_outputs,
initial_value=self.fmu_wrapper.get_continuous_states(),
jac_dyn=jac_func
)
# Setup block I/O from FMU variables
self.inputs, self.outputs = self.fmu_wrapper.create_port_registers()
# Initialize time event manager
self.time_event = None
# Create state event (zero-crossing) for each event indicator
for i in range(self.fmu_wrapper.n_event_indicators):
self.events.append(
ZeroCrossing(
func_evt=lambda t, idx=i: self._get_event_indicator(idx),
func_act=self._handle_event,
tolerance=self.tolerance
)
)
# Cache capability flag for sample() performance
self._needs_completed_integrator_step = self.fmu_wrapper.needs_completed_integrator_step
# Schedule initial time event if any
if self._initial_time_event is not None:
self._update_time_events(self._initial_time_event)
def _get_derivatives(self, x, u, t):
"""Evaluate FMU derivatives (RHS of ODE)."""
if self.fmu_wrapper.n_states == 0:
return []
self.fmu_wrapper.set_time(t)
self.fmu_wrapper.set_continuous_states(x)
self.fmu_wrapper.set_inputs_from_array(u)
return self.fmu_wrapper.get_derivatives()
def _get_jacobian(self, x, u, t):
"""Evaluate Jacobian of FMU derivatives w.r.t. states (∂ẋ/∂x)."""
self.fmu_wrapper.set_time(t)
self.fmu_wrapper.set_continuous_states(x)
self.fmu_wrapper.set_inputs_from_array(u)
return self.fmu_wrapper.get_state_jacobian()
def _get_outputs(self, x, u, t):
"""Evaluate FMU outputs (algebraic part)."""
self.fmu_wrapper.set_time(t)
self.fmu_wrapper.set_continuous_states(x)
self.fmu_wrapper.set_inputs_from_array(u)
return self.fmu_wrapper.get_outputs_as_array()
def _get_event_indicator(self, idx):
"""Get value of a specific event indicator."""
return self.fmu_wrapper.get_event_indicators()[idx]
def _handle_event(self, t):
"""Handle FMU event with fixed-point iteration for discrete states."""
if self.verbose:
print(f"FMU event detected at t={t}")
self.fmu_wrapper.enter_event_mode()
# Iterate until discrete states stabilize
while True:
event_info = self.fmu_wrapper.update_discrete_states()
if event_info.terminate_simulation:
raise RuntimeError("FMU requested simulation termination")
if not event_info.discrete_states_need_update:
break
self.fmu_wrapper.enter_continuous_time_mode()
# Update continuous states if changed
if event_info.values_changed:
x_new = self.fmu_wrapper.get_continuous_states()
self.engine.set(x_new)
if self.verbose:
print(f"Continuous states updated after event: {x_new}")
# Schedule new time events
if event_info.next_event_time_defined:
self._update_time_events(event_info.next_event_time)
if self.verbose:
print(f"Next time event scheduled at t={event_info.next_event_time}")
def _update_time_events(self, next_time):
"""Update or create time event schedule."""
if self.time_event is None:
self.time_event = ScheduleList(
times_evt=[next_time],
func_act=self._handle_event,
tolerance=self.tolerance
)
self.events.append(self.time_event)
elif next_time not in self.time_event.times_evt:
bisect.insort(self.time_event.times_evt, next_time)
def sample(self, t, dt):
"""Sample block after successful timestep and handle FMU step completion events."""
super().sample(t, dt)
if self._needs_completed_integrator_step:
enter_event_mode, terminate_simulation = self.fmu_wrapper.completed_integrator_step()
if terminate_simulation:
raise RuntimeError("FMU requested simulation termination")
if enter_event_mode:
if self.verbose:
print(f"Step completion event at t={t}")
self._handle_event(t)
def reset(self):
"""Reset the FMU instance."""
super().reset()
self.fmu_wrapper.reset()
# Re-initialize FMU
event_info = self.fmu_wrapper.initialize(
self.start_values, start_time=0.0, tolerance=self.tolerance
)
self.fmu_wrapper.enter_continuous_time_mode()
# Reset to initial states
self.engine.set(self.fmu_wrapper.get_continuous_states())
# Reset time events
if self.time_event is not None:
self.time_event.times_evt.clear()
# Schedule initial time event from re-initialization or cached initial
if event_info and event_info.next_event_time_defined:
self._update_time_events(event_info.next_event_time)
elif self._initial_time_event is not None:
self._update_time_events(self._initial_time_event)