Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
2575950
ENH: Pass Acceleration Data to Parachute Trigger Functions (RocketPy-…
ViniciusCMB Dec 4, 2025
7ec7ac9
ENH: Add acceleration-based parachute triggers with IMU sensor simula…
ViniciusCMB Dec 7, 2025
18aab57
ENH: Enable sensors+acceleration triggers in parachutes
ViniciusCMB Dec 8, 2025
0932277
Merge branch 'develop' into enh/acceleration-data-to-trigger-parachutes
ViniciusCMB Dec 8, 2025
916b0fc
STY: Apply ruff format to flight.py
ViniciusCMB Dec 8, 2025
8e88fa8
STY: Fix pylint for parachute triggers
ViniciusCMB Dec 9, 2025
e84df40
Merge branch 'develop' into enh/acceleration-data-to-trigger-parachutes
ViniciusCMB Dec 9, 2025
a7f5c3a
TST: Add comprehensive unit tests for acceleration-based parachute tr…
ViniciusCMB Dec 9, 2025
ad7e6d0
DOC: Add comprehensive documentation for acceleration-based parachute…
ViniciusCMB Dec 9, 2025
8f65022
STY: Apply ruff format to test_parachute_trigger_acceleration.py
ViniciusCMB Dec 9, 2025
d45b3a5
STY: Fix pylint warnings in test_parachute_triggers.py
ViniciusCMB Dec 9, 2025
6d79d62
FIX: Add missing u_dot parameter to direct triggerfunc calls
ViniciusCMB Dec 9, 2025
5619b3a
STY: Apply ruff format to flight.py
ViniciusCMB Dec 9, 2025
c66cffc
MNT: align acceleration trigger API with review feedback
ViniciusCMB Mar 22, 2026
56c1e51
Merge branch 'develop' into enh/acceleration-data-to-trigger-parachutes
ViniciusCMB Mar 22, 2026
02d344d
STY: apply ruff format fixes
ViniciusCMB Mar 22, 2026
00e94b9
STY: apply ruff format fixes to flight.py
ViniciusCMB Mar 22, 2026
a0cb73d
STY: Apply ruff check after pip update
ViniciusCMB Mar 25, 2026
aba648b
Merge branch 'develop' into enh/acceleration-data-to-trigger-parachutes
ViniciusCMB Mar 25, 2026
6beab3c
Merge branch 'develop' into enh/acceleration-data-to-trigger-parachutes
MateusStano Mar 27, 2026
aa89d50
Merge branch 'develop' into enh/acceleration-data-to-trigger-parachutes
MateusStano Mar 27, 2026
6a9795d
Merge branch 'develop' into enh/acceleration-data-to-trigger-parachutes
MateusStano Apr 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
ENH: Pass Acceleration Data to Parachute Trigger Functions (RocketPy-…
…Team#XXX)

* ENH: add u_dot parameter computation inside parachute trigger evaluation.

* ENH: add acceleration_noise_function parameter to Flight class for realistic IMU simulation.

* ENH: implement automatic detection of trigger signature to compute derivatives only when needed.

* TST: add unit tests for parachute trigger with acceleration data and noise injection.

* TST: add test runner for trigger acceleration validation without full test suite dependencies.
  • Loading branch information
ViniciusCMB committed Dec 4, 2025
commit 25759506f8006c2d699f6df42ad58edf8fe33fbc
137 changes: 118 additions & 19 deletions rocketpy/simulation/flight.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from functools import cached_property

import numpy as np
import inspect
from scipy.integrate import BDF, DOP853, LSODA, RK23, RK45, OdeSolver, Radau

from rocketpy.simulation.flight_data_exporter import FlightDataExporter
Expand Down Expand Up @@ -94,6 +95,12 @@ class Flight:
function evaluation points and then interpolation is used to
calculate them and feed the triggers. Can greatly improve run
time in some cases.
Note
----
Calculating the derivative `u_dot` inside parachute trigger checks will
cause additional calls to the equations of motion (extra physics
evaluations). This increases CPU cost but enables realistic avionics
algorithms that rely on accelerometer data.
Comment thread
ViniciusCMB marked this conversation as resolved.
Outdated
Flight.terminate_on_apogee : bool
Whether to terminate simulation when rocket reaches apogee.
Flight.solver : scipy.integrate.LSODA
Expand Down Expand Up @@ -487,6 +494,7 @@ def __init__( # pylint: disable=too-many-arguments,too-many-statements
name="Flight",
equations_of_motion="standard",
ode_solver="LSODA",
acceleration_noise_function=None,
):
Comment thread
ViniciusCMB marked this conversation as resolved.
"""Run a trajectory simulation.

Expand Down Expand Up @@ -600,6 +608,13 @@ def __init__( # pylint: disable=too-many-arguments,too-many-statements
self.name = name
self.equations_of_motion = equations_of_motion
self.ode_solver = ode_solver
# Function that returns accelerometer noise vector [ax_noise, ay_noise, az_noise]
# It should be a callable that returns an array-like of length 3.
self.acceleration_noise_function = (
acceleration_noise_function
if acceleration_noise_function is not None
else (lambda: np.zeros(3))
)
Comment thread
ViniciusCMB marked this conversation as resolved.
Outdated

# Controller initialization
self.__init_controllers()
Expand Down Expand Up @@ -739,11 +754,14 @@ def __simulate(self, verbose):
) = self.__calculate_and_save_pressure_signals(
parachute, node.t, self.y_sol[2]
)
if parachute.triggerfunc(
if self._evaluate_parachute_trigger(
parachute,
noisy_pressure,
height_above_ground_level,
self.y_sol,
self.sensors,
phase.derivative,
self.t,
):
# Remove parachute from flight parachutes
self.parachutes.remove(parachute)
Expand Down Expand Up @@ -772,8 +790,7 @@ def __simulate(self, verbose):
lambda self, parachute_porosity=parachute.porosity: setattr(
self, "parachute_porosity", parachute_porosity
),
lambda self,
added_mass_coefficient=parachute.added_mass_coefficient: setattr(
lambda self, added_mass_coefficient=parachute.added_mass_coefficient: setattr(
self,
"parachute_added_mass_coefficient",
added_mass_coefficient,
Expand Down Expand Up @@ -998,11 +1015,14 @@ def __simulate(self, verbose):
)

# Check for parachute trigger
if parachute.triggerfunc(
if self._evaluate_parachute_trigger(
parachute,
noisy_pressure,
height_above_ground_level,
overshootable_node.y_sol,
self.sensors,
phase.derivative,
overshootable_node.t,
):
# Remove parachute from flight parachutes
self.parachutes.remove(parachute)
Expand All @@ -1020,30 +1040,25 @@ def __simulate(self, verbose):
i += 1
# Create flight phase for time after inflation
callbacks = [
lambda self,
parachute_cd_s=parachute.cd_s: setattr(
lambda self, parachute_cd_s=parachute.cd_s: setattr(
self, "parachute_cd_s", parachute_cd_s
),
lambda self,
parachute_radius=parachute.radius: setattr(
lambda self, parachute_radius=parachute.radius: setattr(
self,
"parachute_radius",
parachute_radius,
),
lambda self,
parachute_height=parachute.height: setattr(
lambda self, parachute_height=parachute.height: setattr(
self,
"parachute_height",
parachute_height,
),
lambda self,
parachute_porosity=parachute.porosity: setattr(
lambda self, parachute_porosity=parachute.porosity: setattr(
self,
"parachute_porosity",
parachute_porosity,
),
lambda self,
added_mass_coefficient=parachute.added_mass_coefficient: setattr(
lambda self, added_mass_coefficient=parachute.added_mass_coefficient: setattr(
self,
"parachute_added_mass_coefficient",
added_mass_coefficient,
Expand Down Expand Up @@ -1124,6 +1139,88 @@ def __calculate_and_save_pressure_signals(self, parachute, t, z):

return noisy_pressure, height_above_ground_level

def _evaluate_parachute_trigger(
self, parachute, pressure, height, y, sensors, derivative_func, t
):
"""Evaluate parachute trigger, optionally computing u_dot (acceleration).

This helper preserves backward compatibility with existing trigger
signatures and will compute ``u_dot`` only if the original user
provided trigger function expects an acceleration argument (detected
by parameter name such as 'u_dot', 'udot', 'acc', or 'acceleration').

Parameters
----------
parachute : Parachute
Parachute object.
pressure : float
Noisy pressure value passed to trigger.
height : float
Height above ground level passed to trigger.
y : array
State vector at evaluation time.
sensors : list
Sensors list passed to trigger.
derivative_func : callable
Function to compute derivatives: derivative_func(t, y)
t : float
Time at which to evaluate derivatives.

Returns
-------
bool
True if trigger condition met, False otherwise.
"""
# If original trigger is not callable (e.g. numeric or 'apogee'),
# use the prepared wrapper in Parachute
trig_original = parachute.trigger
if not callable(trig_original):
return parachute.triggerfunc(pressure, height, y, sensors)

try:
sig = inspect.signature(trig_original)
params = list(sig.parameters.values())
except (ValueError, TypeError):
return parachute.triggerfunc(pressure, height, y, sensors)

# Detect whether the user-provided trigger expects acceleration
acc_names = {"u_dot", "udot", "acc", "acceleration"}
wants_u_dot = any(p.name in acc_names for p in params)
wants_sensors = any("sensor" in p.name for p in params)

if wants_u_dot:
# Compute derivative and add optional accelerometer noise
u_dot = np.array(derivative_func(t, y), dtype=float)
try:
noise = np.asarray(self.acceleration_noise_function())
if noise.size == 3:
# u_dot layout: [vx, vy, vz, ax, ay, az, ...]
u_dot[3:6] = u_dot[3:6] + noise
except Exception:
# If noise function fails, ignore and continue
pass

# Call user function according to detected signature
try:
if wants_sensors:
# common case: (p, h, y, sensors, u_dot)
return trig_original(pressure, height, y, sensors, u_dot)
# fallback by arg count
if len(params) == 4:
# could be (p, h, y, u_dot)
return trig_original(pressure, height, y, u_dot)
if len(params) == 5:
# could be (p, h, y, sensors, u_dot)
return trig_original(pressure, height, y, sensors, u_dot)
# try calling with u_dot as kwarg
return trig_original(pressure, height, y, u_dot=u_dot)
except TypeError:
# If calling the original fails, fallback to wrapper
return parachute.triggerfunc(pressure, height, y, sensors)

# Default: don't compute u_dot and use existing wrapper
return parachute.triggerfunc(pressure, height, y, sensors)

def __init_solution_monitors(self):
# Initialize solution monitors
self.out_of_rail_time = 0
Expand Down Expand Up @@ -1439,7 +1536,9 @@ def udot_rail2(self, t, u, post_processing=False): # pragma: no cover
# Hey! We will finish this function later, now we just can use u_dot
return self.u_dot_generalized(t, u, post_processing=post_processing)

def u_dot(self, t, u, post_processing=False): # pylint: disable=too-many-locals,too-many-statements
def u_dot(
self, t, u, post_processing=False
): # pylint: disable=too-many-locals,too-many-statements
"""Calculates derivative of u state vector with respect to time
when rocket is flying in 6 DOF motion during ascent out of rail
and descent without parachute.
Expand Down Expand Up @@ -1759,7 +1858,9 @@ def u_dot(self, t, u, post_processing=False): # pylint: disable=too-many-locals

return u_dot

def u_dot_generalized(self, t, u, post_processing=False): # pylint: disable=too-many-locals,too-many-statements
def u_dot_generalized(
self, t, u, post_processing=False
): # pylint: disable=too-many-locals,too-many-statements
"""Calculates derivative of u state vector with respect to time when the
rocket is flying in 6 DOF motion in space and significant mass variation
effects exist. Typical flight phases include powered ascent after launch
Expand Down Expand Up @@ -3571,9 +3672,7 @@ def add(self, flight_phase, index=None): # TODO: quite complex method
new_index = (
index - 1
if flight_phase.t < previous_phase.t
else index + 1
if flight_phase.t > next_phase.t
else index
else index + 1 if flight_phase.t > next_phase.t else index
)
flight_phase.t += adjust
self.add(flight_phase, new_index)
Expand Down
139 changes: 139 additions & 0 deletions tests/unit/_run_parachute_trigger_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import numpy as np
import traceback

from rocketpy.simulation.flight import Flight
from rocketpy.rocket.parachute import Parachute


def _test_trigger_receives_u_dot_and_noise():
def derivative_func(t, y):
return np.array([0, 0, 0, 1.0, 2.0, 3.0, 0, 0, 0, 0, 0, 0, 0])

recorded = {}

def user_trigger(p, h, y, u_dot):
recorded["u_dot"] = np.array(u_dot)
return True

parachute = Parachute(
name="test",
cd_s=1.0,
trigger=user_trigger,
sampling_rate=100,
)

dummy = type("D", (), {})()
dummy.acceleration_noise_function = lambda: np.array([0.1, -0.2, 0.3])

res = Flight._evaluate_parachute_trigger(
dummy,
parachute,
pressure=0.0,
height=10.0,
y=np.zeros(13),
sensors=[],
derivative_func=derivative_func,
t=0.0,
)

assert res is True
assert "u_dot" in recorded
assert np.allclose(recorded["u_dot"][3:6], np.array([1.1, 1.8, 3.3]))


def _test_trigger_with_sensors_and_u_dot():
def derivative_func(t, y):
return np.array([0, 0, 0, -1.0, -2.0, -3.0, 0, 0, 0, 0, 0, 0, 0])

recorded = {}

def user_trigger(p, h, y, sensors, u_dot):
recorded["sensors"] = sensors
recorded["u_dot"] = np.array(u_dot)
return False

parachute = Parachute(
name="test2",
cd_s=1.0,
trigger=user_trigger,
sampling_rate=100,
)

dummy = type("D", (), {})()
dummy.acceleration_noise_function = lambda: np.array([0.0, 0.0, 0.0])

res = Flight._evaluate_parachute_trigger(
dummy,
parachute,
pressure=0.0,
height=5.0,
y=np.zeros(13),
sensors=["s1"],
derivative_func=derivative_func,
t=1.234,
)

assert res is False
assert recorded["sensors"] == ["s1"]
assert np.allclose(recorded["u_dot"][3:6], np.array([-1.0, -2.0, -3.0]))


def _test_legacy_trigger_does_not_compute_u_dot():
def derivative_func(t, y):
raise RuntimeError("derivative should not be called for legacy triggers")

called = {}

def legacy_trigger(p, h, y):
called["ok"] = True
return True

parachute = Parachute(
name="legacy",
cd_s=1.0,
trigger=legacy_trigger,
sampling_rate=100,
)

dummy = type("D", (), {})()
dummy.acceleration_noise_function = lambda: np.zeros(3)

res = Flight._evaluate_parachute_trigger(
dummy,
parachute,
pressure=0.0,
height=0.0,
y=np.zeros(13),
sensors=[],
derivative_func=derivative_func,
t=0.0,
)

assert res is True
assert called.get("ok", False) is True


def run_all():
tests = [
_test_trigger_receives_u_dot_and_noise,
_test_trigger_with_sensors_and_u_dot,
_test_legacy_trigger_does_not_compute_u_dot,
]
failures = 0
for t in tests:
name = t.__name__
try:
t()
print(f"[PASS] {name}")
except Exception:
failures += 1
print(f"[FAIL] {name}")
traceback.print_exc()
if failures:
print(f"{failures} test(s) failed")
raise SystemExit(1)
print("All tests passed")


if __name__ == "__main__":
run_all()
Loading