Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
2575950
ENH: Pass Acceleration Data to Parachute Trigger Functions (RocketPy-…
ViniciusCMB Dec 4, 2025
7ec7ac9
ENH: Add acceleration-based parachute triggers with IMU sensor simula…
ViniciusCMB Dec 7, 2025
18aab57
ENH: Enable sensors+acceleration triggers in parachutes
ViniciusCMB Dec 8, 2025
0932277
Merge branch 'develop' into enh/acceleration-data-to-trigger-parachutes
ViniciusCMB Dec 8, 2025
916b0fc
STY: Apply ruff format to flight.py
ViniciusCMB Dec 8, 2025
8e88fa8
STY: Fix pylint for parachute triggers
ViniciusCMB Dec 9, 2025
e84df40
Merge branch 'develop' into enh/acceleration-data-to-trigger-parachutes
ViniciusCMB Dec 9, 2025
a7f5c3a
TST: Add comprehensive unit tests for acceleration-based parachute tr…
ViniciusCMB Dec 9, 2025
ad7e6d0
DOC: Add comprehensive documentation for acceleration-based parachute…
ViniciusCMB Dec 9, 2025
8f65022
STY: Apply ruff format to test_parachute_trigger_acceleration.py
ViniciusCMB Dec 9, 2025
d45b3a5
STY: Fix pylint warnings in test_parachute_triggers.py
ViniciusCMB Dec 9, 2025
6d79d62
FIX: Add missing u_dot parameter to direct triggerfunc calls
ViniciusCMB Dec 9, 2025
5619b3a
STY: Apply ruff format to flight.py
ViniciusCMB Dec 9, 2025
c66cffc
MNT: align acceleration trigger API with review feedback
ViniciusCMB Mar 22, 2026
56c1e51
Merge branch 'develop' into enh/acceleration-data-to-trigger-parachutes
ViniciusCMB Mar 22, 2026
02d344d
STY: apply ruff format fixes
ViniciusCMB Mar 22, 2026
00e94b9
STY: apply ruff format fixes to flight.py
ViniciusCMB Mar 22, 2026
a0cb73d
STY: Apply ruff check after pip update
ViniciusCMB Mar 25, 2026
aba648b
Merge branch 'develop' into enh/acceleration-data-to-trigger-parachutes
ViniciusCMB Mar 25, 2026
6beab3c
Merge branch 'develop' into enh/acceleration-data-to-trigger-parachutes
MateusStano Mar 27, 2026
aa89d50
Merge branch 'develop' into enh/acceleration-data-to-trigger-parachutes
MateusStano Mar 27, 2026
6a9795d
Merge branch 'develop' into enh/acceleration-data-to-trigger-parachutes
MateusStano Apr 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
ENH: Add acceleration-based parachute triggers with IMU sensor simula…
…tion

This enhancement enables realistic avionics algorithms by providing access to
acceleration data (u_dot) within parachute trigger functions, simulating how
real flight computers use accelerometers (IMU) to detect flight phases.

* ENH: Pass acceleration data (u_dot) to parachute trigger callbacks
  - Flight class now computes and injects u_dot derivatives into triggers
  - Automatic detection of trigger signatures to optimize performance
  - Only calculates u_dot when trigger explicitly requires it

* ENH: Add built-in acceleration-based trigger functions
  - detect_apogee_acceleration: Detects apogee via vz ≈ 0 and az < 0
  - detect_motor_burnout: Detects motor shutdown by acceleration drop
  - detect_freefall: Detects free-fall condition (low total acceleration)
  - detect_liftoff: Detects liftoff by high total acceleration
  - altitude_trigger_factory: Factory for altitude-based triggers

* ENH: Implement optional accelerometer noise injection
  - New parameter acceleration_noise_function in Flight.__init__
  - Simulates MEMS accelerometer noise (typical 0.1-0.3 m/s²)
  - Applied transparently to all acceleration-based triggers

* TST: Add comprehensive unit tests for trigger evaluation
  - Validates u_dot computation and noise injection
  - Tests backward compatibility with legacy 3-parameter triggers
  - Ensures performance optimization skips u_dot for non-accelerating triggers

Notes
-----
- Triggers can now use signatures: (p, h, y) or (p, h, y, u_dot/acc/acceleration)
- Built-in string triggers: apogee, apogee_acc, burnout, freefall, liftoff
- Performance: u_dot computation doubles physics evaluations at trigger points
- Fully backward compatible with existing altitude and custom triggers
  • Loading branch information
ViniciusCMB committed Dec 7, 2025
commit 7ec7ac9e2ac7b2642350c7620bbd9d510577ce2c
230 changes: 206 additions & 24 deletions rocketpy/rocket/parachute.py
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you really need try/except blocks here?

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,145 @@
from ..mathutils.function import Function
from ..prints.parachute_prints import _ParachutePrints

def detect_motor_burnout(pressure, height, state_vector, u_dot):
"""Detect motor burnout by sudden drop in acceleration.

Returns True when vertical acceleration becomes significantly negative
(indicating end of propulsion phase) OR when total acceleration drops below
a threshold indicating coasting/free-fall has begun.
"""
try:
if u_dot is None or len(u_dot) < 6:
return False
Comment thread
ViniciusCMB marked this conversation as resolved.
Outdated

ax = float(u_dot[3])
ay = float(u_dot[4])
az = float(u_dot[5])

# Defensive checks for NaN/Inf
if not all(np.isfinite([ax, ay, az])):
return False

total_acc = np.sqrt(ax * ax + ay * ay + az * az)
if not np.isfinite(total_acc):
return False
Comment thread
ViniciusCMB marked this conversation as resolved.
Outdated

# Additional safety: ignore spurious low-accel readings at t~0 by
# requiring the rocket to be above a small altitude and still ascending
vz = float(state_vector[5]) if len(state_vector) > 5 else 0
Comment thread
ViniciusCMB marked this conversation as resolved.
Outdated
if not np.isfinite(vz):
return False

if height < 5.0 or vz <= 0.5:
return False

# Burnout detected when:
# 1. Vertical acceleration becomes very negative (end of thrust phase)
# 2. OR total acceleration drops below 2.0 m/s² (coasting detected)
return az < -8.0 or total_acc < 2.0
except (ValueError, TypeError, IndexError):
return False


def detect_apogee_acceleration(pressure, height, state_vector, u_dot):
"""Detect apogee using near-zero vertical velocity and negative vertical accel.

Apogee occurs when the rocket reaches its highest point, characterized by
vertical velocity approaching zero and negative (downward) acceleration.
"""
try:
if state_vector is None or u_dot is None:
return False
if len(state_vector) < 6 or len(u_dot) < 6:
return False

vz = float(state_vector[5])
az = float(u_dot[5])
if not all(np.isfinite([vz, az])):
return False

# Slightly more permissive thresholds to avoid spurious misses
return abs(vz) < 1.0 and az < -0.1
except (ValueError, TypeError, IndexError):
return False


def detect_freefall(pressure, height, state_vector, u_dot):
"""Detect free-fall when total acceleration magnitude is low.

Free-fall is characterized by acceleration magnitude close to gravitational
acceleration (approximately -g in the vertical direction), or when the rocket
is in a ballistic coasting phase with minimal thrust or drag effects.
"""
try:
if u_dot is None or len(u_dot) < 6:
return False

ax = float(u_dot[3])
ay = float(u_dot[4])
az = float(u_dot[5])
if not all(np.isfinite([ax, ay, az])):
return False

total_acc = np.sqrt(ax * ax + ay * ay + az * az)
if not np.isfinite(total_acc):
return False

# Require the rocket to be descending and above a small altitude to
# avoid false positives before launch.
vz = float(state_vector[5]) if len(state_vector) > 5 else 0
if not np.isfinite(vz):
return False

if height < 5.0 or vz >= -0.2:
return False

# More sensitive threshold: detect free-fall at lower acceleration
return total_acc < 11.5
except (ValueError, TypeError, IndexError):
return False


def detect_liftoff(pressure, height, state_vector, u_dot):
"""Detect liftoff by high total acceleration.

Liftoff is characterized by a sudden increase in acceleration as the motor
ignites and begins producing thrust.
"""
try:
if u_dot is None or len(u_dot) < 6:
return False

ax = float(u_dot[3])
ay = float(u_dot[4])
az = float(u_dot[5])
if not all(np.isfinite([ax, ay, az])):
return False

total_acc = np.sqrt(ax * ax + ay * ay + az * az)
if not np.isfinite(total_acc):
return False

return total_acc > 15.0
except (ValueError, TypeError, IndexError):
return False


def altitude_trigger_factory(target_altitude, require_descent=True):
"""Return a trigger that deploys when altitude <= target_altitude.

If require_descent is True, also require vertical velocity negative
(descending) to avoid firing during ascent.
"""

def trigger(pressure, height, state_vector, u_dot=None):
vz = float(state_vector[5])
if require_descent:
return (height <= target_altitude) and (vz < 0)
return height <= target_altitude

return trigger
Comment thread
ViniciusCMB marked this conversation as resolved.


Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

Please remove this function. It is not used currently

class Parachute:
"""Keeps information of the parachute, which is modeled as a hemispheroid.
Expand Down Expand Up @@ -222,47 +361,90 @@ def __init__(
def __evaluate_trigger_function(self, trigger):
"""This is used to set the triggerfunc attribute that will be used to
interact with the Flight class.

Notes
-----
The resulting triggerfunc always has signature (p, h, y, fourth) so
Flight can pass either the sensors list or the u_dot (derivative)
depending on the runtime behaviour.
"""
# pylint: disable=unused-argument, function-redefined
# The parachute is deployed by a custom function
if callable(trigger):
# work around for having added sensors to parachute triggers
# to avoid breaking changes
triggerfunc = trigger
sig = signature(triggerfunc)
if len(sig.parameters) == 3:

def triggerfunc(p, h, y, sensors):
return trigger(p, h, y)
# Helper to wrap any callable to the internal (p, h, y, fourth) API
def _make_wrapper(fn):
sig = signature(fn)
params = list(sig.parameters.keys())

self.triggerfunc = triggerfunc
# detect if user function expects acceleration-like argument
expects_udot = any(
name.lower() in ("u_dot", "udot", "acc", "acceleration")
for name in params[3:]
)

def wrapper(p, h, y, fourth): # fourth can be sensors or u_dot
# Support both 3- and 4-arg user functions
num_params = len(sig.parameters)
if num_params == 3:
return fn(p, h, y)
if num_params == 4:
return fn(p, h, y, fourth)
# fallback: try calling with four args, otherwise three
try:
return fn(p, h, y, fourth)
except TypeError:
return fn(p, h, y)

# attach metadata so Flight can decide whether to compute u_dot
wrapper._expects_udot = expects_udot
wrapper._wrapped_fn = fn
return wrapper

# Callable provided by user
if callable(trigger):
self.triggerfunc = _make_wrapper(trigger)
return

# Numeric altitude trigger
if isinstance(trigger, (int, float)):

elif isinstance(trigger, (int, float)):
# The parachute is deployed at a given height
def triggerfunc(p, h, y, sensors): # pylint: disable=unused-argument
# p = pressure considering parachute noise signal
# h = height above ground level considering parachute noise signal
# y = [x, y, z, vx, vy, vz, e0, e1, e2, e3, w1, w2, w3]
return y[5] < 0 and h < trigger

self.triggerfunc = triggerfunc
return

# String: map to built-in triggers
if isinstance(trigger, str):
key = trigger.strip().lower()
mapping = {
"apogee_acc": detect_apogee_acceleration,
"burnout": detect_motor_burnout,
"freefall": detect_freefall,
"liftoff": detect_liftoff,
}
if key in mapping:
self.triggerfunc = _make_wrapper(mapping[key])
return

# Special case: "apogee" (legacy support)
if isinstance(trigger, str) and trigger.lower() == "apogee":

Comment on lines +374 to +376
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not legacy support, it is a valid feature

Suggested change
# Special case: "apogee" (legacy support)
if isinstance(trigger, str) and trigger.lower() == "apogee":
# Special case: "apogee"
if isinstance(trigger, str) and trigger.lower() == "apogee":

elif trigger.lower() == "apogee":
# The parachute is deployed at apogee
def triggerfunc(p, h, y, sensors): # pylint: disable=unused-argument
# p = pressure considering parachute noise signal
# h = height above ground level considering parachute noise signal
# y = [x, y, z, vx, vy, vz, e0, e1, e2, e3, w1, w2, w3]
return y[5] < 0

self.triggerfunc = triggerfunc

else:
raise ValueError(
f"Unable to set the trigger function for parachute '{self.name}'. "
+ "Trigger must be a callable, a float value or the string 'apogee'. "
+ "See the Parachute class documentation for more information."
)
return

# If we reach this point, the trigger is invalid
raise ValueError(
f"Unable to set the trigger function for parachute '{self.name}'. "
+ "Trigger must be a callable, a float value or one of the strings "
+ "('apogee','burnout','freefall','liftoff'). "
Comment thread
ViniciusCMB marked this conversation as resolved.
Outdated
+ "See the Parachute class documentation for more information."
)

def __str__(self):
"""Returns a string representation of the Parachute class.
Expand Down
88 changes: 39 additions & 49 deletions rocketpy/simulation/flight.py
Original file line number Diff line number Diff line change
Expand Up @@ -1145,9 +1145,10 @@ def _evaluate_parachute_trigger(
"""Evaluate parachute trigger, optionally computing u_dot (acceleration).

This helper preserves backward compatibility with existing trigger
signatures and will compute ``u_dot`` only if the original user
provided trigger function expects an acceleration argument (detected
by parameter name such as 'u_dot', 'udot', 'acc', or 'acceleration').
signatures and will compute ``u_dot`` only if the prepared wrapper in
`Parachute` or the original trigger signature requests an acceleration
argument (detected by parameter name such as 'u_dot', 'udot', 'acc',
or 'acceleration').

Parameters
----------
Expand All @@ -1171,55 +1172,44 @@ def _evaluate_parachute_trigger(
bool
True if trigger condition met, False otherwise.
"""
# If original trigger is not callable (e.g. numeric or 'apogee'),
# use the prepared wrapper in Parachute
trig_original = parachute.trigger
if not callable(trig_original):
return parachute.triggerfunc(pressure, height, y, sensors)

try:
sig = inspect.signature(trig_original)
params = list(sig.parameters.values())
except (ValueError, TypeError):
return parachute.triggerfunc(pressure, height, y, sensors)

# Detect whether the user-provided trigger expects acceleration
acc_names = {"u_dot", "udot", "acc", "acceleration"}
wants_u_dot = any(p.name in acc_names for p in params)
wants_sensors = any("sensor" in p.name for p in params)

if wants_u_dot:
# Compute derivative and add optional accelerometer noise
u_dot = np.array(derivative_func(t, y), dtype=float)
# Prefer the wrapper metadata: check if the prepared wrapper expects u_dot
triggerfunc = parachute.triggerfunc
expects_udot = getattr(triggerfunc, "_expects_udot", False)

# If the wrapper didn't advertise, inspect the original user trigger
if not expects_udot:
trig_original = getattr(parachute, "trigger", None)
if callable(trig_original):
try:
sig = inspect.signature(trig_original)
params = list(sig.parameters.values())
acc_names = {"u_dot", "udot", "acc", "acceleration"}
if any(p.name.lower() in acc_names for p in params):
expects_udot = True
except (ValueError, TypeError):
expects_udot = False

# If the trigger expects acceleration, compute u_dot and inject noise
if expects_udot:
try:
noise = np.asarray(self.acceleration_noise_function())
if noise.size == 3:
# u_dot layout: [vx, vy, vz, ax, ay, az, ...]
u_dot[3:6] = u_dot[3:6] + noise
u_dot = np.array(derivative_func(t, y), dtype=float)
Comment thread
ViniciusCMB marked this conversation as resolved.
Outdated
try:
noise = np.asarray(self.acceleration_noise_function())
if noise.size == 3:
# u_dot layout: [vx, vy, vz, ax, ay, az, ...]
u_dot[3:6] = u_dot[3:6] + noise
except Exception:
# ignore noise errors and continue
pass
fourth_arg = u_dot
except Exception:
# If noise function fails, ignore and continue
pass
# Fallback to sensors if derivative computation fails
fourth_arg = sensors
else:
fourth_arg = sensors

# Call user function according to detected signature
try:
if wants_sensors:
# common case: (p, h, y, sensors, u_dot)
return trig_original(pressure, height, y, sensors, u_dot)
# fallback by arg count
if len(params) == 4:
# could be (p, h, y, u_dot)
return trig_original(pressure, height, y, u_dot)
if len(params) == 5:
# could be (p, h, y, sensors, u_dot)
return trig_original(pressure, height, y, sensors, u_dot)
# try calling with u_dot as kwarg
return trig_original(pressure, height, y, u_dot=u_dot)
except TypeError:
# If calling the original fails, fallback to wrapper
return parachute.triggerfunc(pressure, height, y, sensors)

# Default: don't compute u_dot and use existing wrapper
return parachute.triggerfunc(pressure, height, y, sensors)
# Call the prepared wrapper (it will forward args to the user's fn)
return triggerfunc(pressure, height, y, fourth_arg)

def __init_solution_monitors(self):
# Initialize solution monitors
Expand Down
Loading
Loading