Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 59 additions & 23 deletions imap_processing/idex/idex_l2a.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ def idex_l2a(l1b_dataset: xr.Dataset, ancillary_files: dict) -> xr.Dataset:
vectorize=True,
output_dtypes=[np.float64] * 6,
Comment thread
lacoak21 marked this conversation as resolved.
keep_attrs=True,
kwargs={"waveform_name": waveform},
)
# Calculate mass and velocity estimates
velocity_mass_results = xr.apply_ufunc(
Expand Down Expand Up @@ -749,7 +750,8 @@ def calculate_area_under_emg(time_slice: np.ndarray, param: np.ndarray) -> float
def estimate_dust_mass(
low_sampling_time: xr.DataArray,
target_signal: xr.DataArray,
remove_noise: bool = True,
remove_noise: bool = False,
waveform_name: str = "",
) -> tuple[NDArray, float, float, float, NDArray]:
Comment on lines 750 to 755
"""
Filter and fit the target or ion grid signals to get the total dust impact charge.
Expand All @@ -763,6 +765,8 @@ def estimate_dust_mass(
remove_noise : bool
If true, attempt to remove background noise, otherwise fit on the unfiltered
signal.
waveform_name : str
Channel name used to select channel-specific fit bounds.

Returns
-------
Expand All @@ -782,34 +786,62 @@ def estimate_dust_mass(
"""
signal = np.array(target_signal.data)
time = np.array(low_sampling_time.data)
good_mask = np.logical_and(
time >= BaselineNoiseTime.START,
Comment thread
lacoak21 marked this conversation as resolved.
time <= BaselineNoiseTime.STOP,
)
# window_start = float(np.min(time))
window_stop = float(np.min(time)) + 5.0
# good_mask = np.logical_and(time >= window_start, time <= window_stop)
good_mask = time <= window_stop
if not np.any(good_mask):
logger.warning(
"Unable to find baseline noise. "
f"There is no signal from {BaselineNoiseTime.START} to "
f"{BaselineNoiseTime.STOP} ns."
f"There is no signal in the first 5 microseconds of the waveform "
f"(Beginning to {window_stop} us)."
)
if remove_noise:
# Remove noise due to "microphonics"
signal = remove_signal_noise(time, signal, good_mask)
Comment thread
lacoak21 marked this conversation as resolved.
# Time before image charge
pre = -2.0
# Get signal values where the time is before the image charge
signal_before_imapact = signal[time < pre]
# Center the baseline signal around zero
signal_baseline = signal_before_imapact - np.mean(signal_before_imapact)
logger.debug(
"estimate_dust_mass fits the raw low-rate waveform directly; "
"remove_noise is ignored for this fit path."
)
Comment thread
lacoak21 marked this conversation as resolved.
signal_before_impact = signal[good_mask]
baseline_mean = float(np.mean(signal_before_impact))
channel_name = waveform_name or str(getattr(target_signal, "name", ""))

# Initial Guess for the parameters of the ion grid signal
time_of_impact = 0.0 # Time of dust hit
constant_offset = 0.0 # Initial baseline
amplitude: float = np.max(signal) # Signal height
rise_time = 0.371 # How fast the signal rises (s)
discharge_time = 0.371 # How fast signal decays (s)

p0 = [time_of_impact, constant_offset, amplitude, rise_time, discharge_time]
constant_offset = baseline_mean
signal_relative_to_baseline = np.asarray(signal - baseline_mean, dtype=float)
if np.any(np.isfinite(signal_relative_to_baseline)):
amplitude = float(
signal_relative_to_baseline[
np.nanargmax(np.abs(signal_relative_to_baseline))
]
)
else:
amplitude = float("nan")
if channel_name != "Ion_Grid" and (not np.isfinite(amplitude) or amplitude <= 0.0):
amplitude = float(np.max(signal) - baseline_mean)
if not np.isfinite(amplitude):
amplitude = float(np.max(signal))
if channel_name != "Ion_Grid" and amplitude <= 0.0:
amplitude = float(np.max(signal))

rise_time_0 = 0.371 # How fast the signal rises (s)
discharge_time_0 = 37.1 # How fast signal decays (s)
Comment thread
lacoak21 marked this conversation as resolved.

p0 = [time_of_impact, constant_offset, amplitude, rise_time_0, discharge_time_0]
positive_min = float(np.finfo(float).eps)
amplitude_lower_bound: float = positive_min
amplitude_upper_bound: float = float(np.inf)
if channel_name == "Ion_Grid":
if np.isfinite(amplitude) and amplitude < 0.0:
amplitude_lower_bound = float(-np.inf)
amplitude_upper_bound = -positive_min
else:
amplitude_lower_bound = positive_min
amplitude_upper_bound = float(np.inf)
bounds = (
[-np.inf, -np.inf, amplitude_lower_bound, positive_min, positive_min],
[np.inf, np.inf, amplitude_upper_bound, np.inf, np.inf],
)

try:
with np.errstate(invalid="ignore", over="ignore"):
Expand All @@ -818,6 +850,7 @@ def estimate_dust_mass(
time,
signal,
p0=p0,
bounds=bounds,
maxfev=100_000, # , epsfcn=1e-10
)
except RuntimeError as e:
Comment thread
lacoak21 marked this conversation as resolved.
Expand All @@ -836,8 +869,11 @@ def estimate_dust_mass(
)

impact_fit = fit_impact(time, *param)
# Calculate the resulting signal amplitude after removing baseline noise
sig_amp = max(impact_fit) - np.mean(signal_baseline)
# Evaluate the analytic extremum of the fitted pulse instead of relying on the
# discrete sample grid.
t_max = param[0] + param[3] * np.log((param[4] / param[3]) + 1.0)
y_max = float(fit_impact(np.asarray([t_max], dtype=float), *param)[0])
sig_amp = abs(float(y_max - param[1]))
Comment thread
lacoak21 marked this conversation as resolved.
chisqr, redchi = chi_square(signal, impact_fit, len(p0))

return param, float(sig_amp), chisqr, redchi, impact_fit
Expand Down
140 changes: 140 additions & 0 deletions imap_processing/tests/idex/test_idex_l2a.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,146 @@ def test_estimate_dust_mass_no_noise_removal():
assert np.allclose(result, signal)


def test_estimate_dust_mass_logs_baseline_warning(caplog):
"""
Test that estimate_dust_mass() logs a warning if no baseline is found.
"""
time = xr.DataArray(np.linspace(-60, 60, 16))
signal = xr.DataArray(np.linspace(0, 1, 16))
original_any = np.any

def fake_any(arr):
fake_any.calls += 1
if fake_any.calls == 1:
return False
return original_any(arr)

fake_any.calls = 0

with (
mock.patch("imap_processing.idex.idex_l2a.np.any", side_effect=fake_any),
mock.patch(
"imap_processing.idex.idex_l2a.curve_fit",
return_value=(np.array([0.0, 0.0, 1.0, 0.371, 37.1]), None),
),
caplog.at_level("WARNING"),
):
estimate_dust_mass(time, signal)

assert any(
"Unable to find baseline noise" in message
for message in caplog.text.splitlines()
)


def test_estimate_dust_mass_remove_noise_logs_debug(caplog):
"""
Test that estimate_dust_mass() logs that remove_noise is ignored.
"""
time = xr.DataArray(np.linspace(-60, 60, 64))
signal = xr.DataArray(
fit_impact(
time.data,
time_of_impact=0.0,
constant_offset=1.0,
amplitude=10.0,
rise_time=0.371,
discharge_time=0.371,
)
)

with caplog.at_level("DEBUG"):
estimate_dust_mass(time, signal, remove_noise=True)

assert any(
"remove_noise is ignored for this fit path" in message
for message in caplog.text.splitlines()
)


def test_estimate_dust_mass_nonfinite_signal_fallbacks():
"""
Test fallback handling when the input signal is entirely non-finite.
"""
time = xr.DataArray(np.linspace(-60, 60, 16))
signal = xr.DataArray(np.full(16, np.nan))

with mock.patch(
"imap_processing.idex.idex_l2a.curve_fit",
return_value=(np.array([0.0, 0.0, 1.0, 0.371, 37.1]), None),
) as mocked_curve_fit:
estimate_dust_mass(time, signal)

assert np.isnan(mocked_curve_fit.call_args.kwargs["p0"][2])


def test_estimate_dust_mass_non_ion_grid_negative_amplitude_fallback():
"""
Test the non-Ion_Grid negative-amplitude fallback path.
"""
time = xr.DataArray(np.linspace(-60, 60, 16))
signal = xr.DataArray(np.full(16, -2.0))

with mock.patch(
"imap_processing.idex.idex_l2a.curve_fit",
return_value=(np.array([0.0, -2.0, -2.0, 0.371, 37.1]), None),
) as mocked_curve_fit:
estimate_dust_mass(time, signal)

assert mocked_curve_fit.call_args.kwargs["p0"][2] == -2.0


def test_estimate_dust_mass_ion_grid_negative_amplitude_bounds():
"""
Test that Ion Grid fits allow negative amplitudes.
"""
time = xr.DataArray(np.linspace(-60, 60, 64))
signal = xr.DataArray(
fit_impact(
time.data,
time_of_impact=0.0,
constant_offset=0.0,
amplitude=-5.0,
rise_time=0.371,
discharge_time=0.371,
)
)

with mock.patch(
"imap_processing.idex.idex_l2a.curve_fit",
return_value=(np.array([0.0, 0.0, -5.0, 0.371, 37.1]), None),
) as mocked_curve_fit:
estimate_dust_mass(time, signal, waveform_name="Ion_Grid")

bounds = mocked_curve_fit.call_args.kwargs["bounds"]
assert bounds[0][2] == -np.inf
assert bounds[1][2] < 0.0


def test_estimate_dust_mass_curve_fit_failure_returns_nans(caplog):
"""
Test that estimate_dust_mass() returns NaNs if the fit fails.
"""
time = xr.DataArray(np.linspace(-60, 60, 16))
signal = xr.DataArray(np.linspace(0, 1, 16))

with (
mock.patch(
"imap_processing.idex.idex_l2a.curve_fit",
side_effect=RuntimeError("fit failed"),
),
caplog.at_level("WARNING"),
):
param, sig_amp, chisqr, redchi, result = estimate_dust_mass(time, signal)

assert any("Failed to fit curve" in message for message in caplog.text.splitlines())
assert np.all(np.isnan(param))
assert np.isnan(sig_amp)
assert np.isnan(chisqr)
assert np.isnan(redchi)
assert np.all(np.isnan(result))


def test_lowpass_filter():
"""
Tests that the lowpass filter is filtering out high frequency signals.
Expand Down
Loading