forked from IMAP-Science-Operations-Center/imap_processing
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
588 lines (513 loc) · 18.8 KB
/
utils.py
File metadata and controls
588 lines (513 loc) · 18.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
"""
Various classes and functions used throughout CoDICE processing.
This module contains utility classes and functions that are used by various
other CoDICE processing modules.
"""
import json
from collections.abc import Callable
from dataclasses import dataclass
from enum import IntEnum
from pathlib import Path
import numpy as np
import xarray as xr
from imap_processing.codice import constants
@dataclass
class ViewTabInfo:
"""
Class to hold view table information.
Attributes
----------
apid : int
The APID for the packet.
collapse_table : int
Collapse table id used to determine the collapse pattern.
compression : int
Compression algorithm used for the packet.
sensor : int
Sensor id (0 for LO, 1 for HI).
three_d_collapsed : int
The 3D collapsed value from the LUT.
view_id : int
The view identifier from the packet.
"""
apid: int
collapse_table: int
compression: int
sensor: int
three_d_collapsed: int
view_id: int
class CODICEAPID(IntEnum):
"""Create ENUM for CoDICE APIDs."""
COD_AUT = 1120
COD_BOOT_HK = 1121
COD_BOOT_MEMDMP = 1122
COD_COUNTS_COMMON = 1135
COD_NHK = 1136
COD_EVTMSG = 1137
COD_MEMDMP = 1138
COD_SHK = 1139
COD_RTS = 1141
COD_DIAG_CDHFPGA = 1144
COD_DIAG_SNSR_HV = 1145
COD_DIAG_OPTC_HV = 1146
COD_DIAG_APDFPGA = 1147
COD_DIAG_SSDFPGA = 1148
COD_DIAG_FSW = 1149
COD_DIAG_SYSVARS = 1150
COD_LO_IAL = 1152
COD_LO_PHA = 1153
COD_LO_SW_PRIORITY_COUNTS = 1155
COD_LO_SW_SPECIES_COUNTS = 1156
COD_LO_NSW_SPECIES_COUNTS = 1157
COD_LO_SW_ANGULAR_COUNTS = 1158
COD_LO_NSW_ANGULAR_COUNTS = 1159
COD_LO_NSW_PRIORITY_COUNTS = 1160
COD_LO_INST_COUNTS_AGGREGATED = 1161
COD_LO_INST_COUNTS_SINGLES = 1162
COD_HI_IAL = 1168
COD_HI_PHA = 1169
COD_HI_INST_COUNTS_AGGREGATED = 1170
COD_HI_INST_COUNTS_SINGLES = 1171
COD_HI_OMNI_SPECIES_COUNTS = 1172
COD_HI_SECT_SPECIES_COUNTS = 1173
COD_HI_INST_COUNTS_PRIORITIES = 1174
COD_CSTOL_CONFIG = 2457
class CoDICECompression(IntEnum):
"""Create ENUM for CoDICE compression algorithms."""
NO_COMPRESSION = 0
LOSSY_A = 1
LOSSY_B = 2
LOSSLESS = 3
LOSSY_A_LOSSLESS = 4
LOSSY_B_LOSSLESS = 5
PACK_24_BIT = 6
class SegmentedPacketOrder(IntEnum):
"""ENUM for segmented packet order."""
UNSEGMENTED = 3
FIRST_SEGMENT = 1
CONTINUATION_SEGMENT = 0
LAST_SEGMENT = 2
def read_sci_lut(file_path: Path, table_id: str) -> dict:
"""
Read the SCI-LUT JSON file for a specific table ID.
Parameters
----------
file_path : pathlib.Path
Path to the SCI-LUT JSON file.
table_id : str
Table identifier to extract from the JSON.
Returns
-------
dict
The SCI-LUT data for the specified table id.
"""
sci_lut_data = json.loads(file_path.read_text()).get(f"{table_id}")
if sci_lut_data is None:
raise ValueError(f"SCI-LUT file does not have data for table ID {table_id}.")
return sci_lut_data
def get_view_tab_info(json_data: dict, view_id: int, apid: int) -> dict:
"""
Get the view table information for a specific view and APID.
Parameters
----------
json_data : dict
The JSON data loaded from the SCI-LUT file.
view_id : int
The view ID from the packet.
apid : int
The APID from the packet.
Returns
-------
dict
The view table information containing details like sensor,
collapse_table, data_product, etc.
"""
apid_hex = f"0x{apid:X}"
# This is how we get view information that will be used to get
# collapse pattern:
# table_id -> view_tab -> (view_id, apid) -> sensor -> collapse_table ->compression
# 'view_tab': {'(0, 0x480)': {'collapse_table': 0, '3d_collapse': 1, 'sensor': 0,
# 'compression':0}
view_tab = json_data.get("view_tab").get(f"({view_id}, {apid_hex})")
return view_tab
def get_view_tab_obj(
lut_file: Path, table_id: str, view_id: int, apid: int
) -> tuple[dict, "ViewTabInfo"]:
"""
Read the SCI-LUT and build a ViewTabInfo for the given table ID.
Parameters
----------
lut_file : pathlib.Path
Path to the SCI-LUT JSON file.
table_id : str
Table identifier to extract from the JSON.
view_id : int
The view ID from the packet.
apid : int
The APID from the packet.
Returns
-------
tuple[dict, ViewTabInfo]
The SCI-LUT data dict and a populated ViewTabInfo for the given table ID.
"""
sci_lut_data = read_sci_lut(lut_file, table_id)
view_tab_info = get_view_tab_info(sci_lut_data, view_id, apid)
view_tab_obj = ViewTabInfo(
apid=apid,
view_id=view_id,
sensor=view_tab_info["sensor"],
three_d_collapsed=view_tab_info["3d_collapse"],
collapse_table=view_tab_info["collapse_table"],
compression=view_tab_info["compression"],
)
return sci_lut_data, view_tab_obj
def process_by_table_id(
unpacked_dataset: xr.Dataset,
lut_file: Path,
process_fn: Callable[..., xr.Dataset],
) -> xr.Dataset:
"""
Split dataset by unique table_id values, process each group, and recombine.
This is the shared wrapper logic used by all non-DE/NHK L1A processing
functions. It extracts the fields that are uniform across a packet stream
(``view_id``, ``apid``, ``plan_id``, ``plan_step``), iterates over every
unique ``table_id`` found in the dataset, filters to that group via
``isel``, calls *process_fn* for each group, and finally concatenates the
results sorted by epoch.
Parameters
----------
unpacked_dataset : xarray.Dataset
Full unpacked dataset from the L0 packet file.
lut_file : pathlib.Path
Path to the SCI-LUT JSON file passed through to *process_fn*.
process_fn : Callable
The private ``_process_xxx`` function to call for each table_id group.
It must accept the signature
``(group_ds, lut_file, table_id, view_id, apid, plan_id, plan_step)``
and return an ``xr.Dataset``.
Returns
-------
xarray.Dataset
Combined L1A dataset sorted by epoch.
"""
view_id = unpacked_dataset["view_id"].values[0]
apid = unpacked_dataset["pkt_apid"].values[0]
plan_id = unpacked_dataset["plan_id"].values[0]
plan_step = unpacked_dataset["plan_step"].values[0]
unique_table_ids = np.unique(unpacked_dataset["table_id"].values)
processed = [
process_fn(
unpacked_dataset.isel(
epoch=unpacked_dataset["table_id"].values == table_id
),
lut_file,
table_id,
view_id,
apid,
plan_id,
plan_step,
)
for table_id in unique_table_ids
]
if len(processed) == 1:
return processed[0]
return xr.concat(processed, dim="epoch").sortby("epoch")
def get_collapse_pattern_shape(
json_data: dict, sensor_id: int, collapse_table_id: int
) -> tuple[int, ...]:
"""
Get the collapse pattern for a specific sensor id and collapse table id.
Parameters
----------
json_data : dict
The JSON data loaded from the SCI-LUT file.
sensor_id : int
Sensor identifier (0 for LO, 1 for HI).
collapse_table_id : int
Collapse table id to look up in the SCI-LUT.
Returns
-------
tuple[int, int]
(<spin_sector, inst_azimuth>) describing the collapsed pattern. Examples:
``(1,)`` for a fully collapsed 1-D pattern or ``(N, M)`` for a
reduced 2-D pattern.
"""
sensor = "lo" if sensor_id == 0 else "hi"
collapse_matrix = np.array(
json_data[f"collapse_{sensor}"][f"{collapse_table_id}"]["matrix"]
)
# Analyze the collapse pattern matrix to determine its reduced shape.
# Steps:
# - Extract non-zero elements from the matrix.
# - Reshape to group unique non-zero rows and columns.
# - If all non-zero values are identical, return (1,) for a fully collapsed pattern.
# - Otherwise, compute the number of unique rows and columns to describe the
# reduced shape.
non_zero_data = np.where(collapse_matrix != 0)
non_zero_reformatted = collapse_matrix[non_zero_data].reshape(
np.unique(non_zero_data[0]).size, np.unique(non_zero_data[1]).size
)
if np.unique(non_zero_reformatted).size == 1:
# all non-zero values are identical means -> fully collapsed
return (1,)
# If not fully collapsed, find repeated patterns in rows and columns
# to reduce shape further.
unique_rows = np.unique(non_zero_reformatted, axis=0)
unique_columns = np.unique(non_zero_reformatted, axis=1)
# Unique spin sectors and instrument azimuths to unpack data
unique_spin_sectors = unique_columns.shape[1]
unique_inst_azs = unique_rows.shape[0]
return (unique_spin_sectors, unique_inst_azs)
def get_counters_aggregated_pattern(
json_data: dict, sensor_id: int, collapse_table_id: int
) -> dict:
"""
Return the aggregated counters pattern from the SCI-LUT JSON.
The counters aggregated pattern is stored as {key: list} in the SCI-LUT JSON.
Each variable can be turned on and off in-flight. Because of that, we need to
be flexible. If any variable is turned off, its corresponding row in the
matrix will be all zeros and fill CDF variable for that row with zeros.
Parameters
----------
json_data : dict
The JSON data loaded from the SCI-LUT file.
sensor_id : int
Sensor identifier (0 for LO, 1 for HI).
collapse_table_id : int
Collapse table id to look up in the SCI-LUT.
Returns
-------
dict
The counters key and its corresponding collapse pattern.
"""
sensor = "lo" if sensor_id == 0 else "hi"
full_matrix = json_data[f"collapse_{sensor}"][f"{collapse_table_id}"]["variables"]
# Filter non-zero rows only
non_zero_rows = {
k: data_list for k, data_list in full_matrix.items() if 0 not in data_list
}
# Sort keys in order of unique num of their list.
# Eg. CoDICE Hi's counters-aggregated is not collapsed
# in the order of row by row. It could have collected in this order:
# [
# [1....1],
# [2....2],
# [3....3],
# [4....4],
# [7....7],
# [8....8],
# [11....11],
# [5....5],
# [6....6],
# [9....9],
# [10....10],
# ]
# Sort to get:
# [
# [1....1],
# [2....2],
# ...
# [11....11],
in_order_rows = dict(sorted(non_zero_rows.items(), key=lambda item: item[1][0]))
# Now get collapse pattern for all variables by finding
# collapse pattern for the first key. Then replace all key's
# with that because it should be same. If not,
# that will effect these remaining logic.
first_key = next(iter(in_order_rows))
collapse_patterns = np.array(in_order_rows[first_key])
# We only look for collapse pattern of columns because each variable
# are rows in the collapse pattern matrix.
unique_columns = np.unique(collapse_patterns, axis=0)
unique_spin_sectors = unique_columns.shape[0]
for key in in_order_rows:
in_order_rows[key] = unique_spin_sectors
return in_order_rows
def index_to_position(
json_data: dict, sensor_id: int, collapse_table_id: int
) -> np.ndarray:
"""
Get the indices of non-zero unique rows in the collapse pattern matrix.
Parameters
----------
json_data : dict
The JSON data loaded from the SCI-LUT file.
sensor_id : int
Sensor identifier (0 for LO, 1 for HI).
collapse_table_id : int
Collapse table id to look up in the SCI-LUT.
Returns
-------
np.ndarray
Array of indices corresponding to non-zero unique rows.
"""
sensor = "lo" if sensor_id == 0 else "hi"
collapse_matrix = np.array(
json_data[f"collapse_{sensor}"][f"{collapse_table_id}"]["matrix"]
)
# Find unique non-zero rows and their original indices
non_zero_row_mask = np.any(collapse_matrix != 0, axis=1)
non_zero_rows = collapse_matrix[non_zero_row_mask]
_, unique_indices = np.unique(non_zero_rows, axis=0, return_index=True)
non_zero_row_indices = np.flatnonzero(non_zero_row_mask)[unique_indices]
return non_zero_row_indices
def get_codice_epoch_time(
acq_start_seconds: np.ndarray,
acq_start_subseconds: np.ndarray,
spin_period: np.ndarray,
view_tab_obj: ViewTabInfo,
) -> tuple[np.ndarray, np.ndarray]:
"""
Calculate center time and delta.
Parameters
----------
acq_start_seconds : np.ndarray
Array of acquisition start seconds.
acq_start_subseconds : np.ndarray
Array of acquisition start subseconds.
spin_period : np.ndarray
Array of spin periods.
view_tab_obj : ViewTabInfo
The view table information object. It contains information such as sensor ID
and three_d_collapsed value and others.
Returns
-------
tuple[np.ndarray, np.ndarray]
(center_times (s), delta_times (ns)). center_times is converted to
nanoseconds at CDF write time.
"""
# If Lo sensor
if view_tab_obj.sensor == 0:
# Lo sensor, we need to set spins to be constant.
# 32 half spins makes full 16 spins for all non direct event products.
# But Lo direct event's spins is also 16 spins. Because of that, we can use
# the same calculation for all Lo products.
num_spins = 16.0
# If Hi sensor and Direct Event product
elif view_tab_obj.sensor == 1 and view_tab_obj.apid == CODICEAPID.COD_HI_PHA:
# Use constant 16 spins for Hi PHA
num_spins = 16.0
# If Non-Direct Event Hi product
else:
# Use 3d_collapsed value from LUT for other Hi products
num_spins = view_tab_obj.three_d_collapsed
# Units of 'spin ticks', where one 'spin tick' equals 320 microseconds.
# It takes multiple spins to collect data for a view.
spin_period_ns = spin_period.astype(np.float64) * 320 * 1e3 # Convert to ns
delta_times = (num_spins * spin_period_ns) / 2
# subseconds need to converted to seconds using this formula per CoDICE team:
# subseconds / 65536 gives seconds
center_times_seconds = (
acq_start_seconds + acq_start_subseconds / 65536 + (delta_times / 1e9)
)
return center_times_seconds, delta_times
def calculate_acq_time_per_step(
low_stepping_tab: dict, esa_step_dim: int = 128
) -> np.ndarray:
"""
Calculate acquisition time per step from low stepping table.
Parameters
----------
low_stepping_tab : dict
The low stepping table from the SCI-LUT JSON.
esa_step_dim : int
The ESA step dimension size.
Returns
-------
np.ndarray
Array of acquisition times per step of shape (num_esa_steps,).
"""
# These tunable values are used to calculate acquisition time per step
tunable_values = low_stepping_tab["tunable_values"]
# pre-calculate values
sector_time = tunable_values["spin_time_ms"] / tunable_values["num_sectors_ms"]
sector_margin_ms = tunable_values["sector_margin_ms"]
dwell_fraction = tunable_values["dwell_fraction_percentage"]
min_hv_settle_ms = tunable_values["min_hv_settle_ms"]
max_hv_settle_ms = tunable_values["max_hv_settle_ms"]
num_steps_data = np.array(
low_stepping_tab["num_steps"].get("data"), dtype=np.float64
)
# If num_steps_data is less than 128, pad with nan
if len(num_steps_data) < constants.NUM_ESA_STEPS:
pad_size = esa_step_dim - len(num_steps_data)
num_steps_data = np.concatenate((num_steps_data, np.full(pad_size, np.nan)))
# Total non-acquisition time is in column (BD) of science LUT
dwell_fraction_percentage = float(sector_time) * (100.0 - dwell_fraction) / 100.0
# Calculate HV settle time per step not adjusted for Min/Max.
# It's in column (BF) of science LUT.
non_adjusted_hv_settle_per_step = (
dwell_fraction_percentage - sector_margin_ms
) / num_steps_data
hv_settle_per_step = np.minimum(
np.maximum(non_adjusted_hv_settle_per_step, min_hv_settle_ms), max_hv_settle_ms
)
# initialize array of nans for acquisition time per step
acq_time_per_step: np.ndarray = np.full(esa_step_dim, np.nan, dtype=np.float64)
# acquisition time per step in milliseconds
# sector_time - sector_margin_ms / num_steps - hv_settle_per_step
acq_time_per_step[: len(num_steps_data)] = (
(sector_time - sector_margin_ms) / num_steps_data
) - hv_settle_per_step
# Convert to seconds
return acq_time_per_step / 1e3
def get_energy_info(
energy_table: np.ndarray,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Calculate energy bin centers and deltas from energy table.
Parameters
----------
energy_table : np.ndarray
The species plus and minus energy array.
Returns
-------
centers : np.ndarray
The geometric centers of the energy bins.
deltas_minus : np.ndarray
The delta minus values of the energy bins.
deltas_plus : np.ndarray
The delta plus values of the energy bins.
"""
# Find the geometric centers and deltas of the energy bins
# The delta minus is the difference between the center of the bin
# and the 'left edge' of the bin. The delta plus is the difference
# between the 'right edge' of the bin and the center of the bin
min_energy = np.array(energy_table["min_energy"], dtype=np.float64)
max_energy = np.array(energy_table["max_energy"], dtype=np.float64)
centers = np.sqrt(min_energy * max_energy)
deltas_minus = centers - min_energy
deltas_plus = max_energy - centers
return centers, deltas_minus, deltas_plus
def apply_replacements_to_attrs(attrs: dict, replacements: dict) -> dict:
"""
Return a shallow-copied attrs dict with placeholders replaced.
This helper replaces occurrences of placeholders like '{species}' and
'{direction}' in string values using simple str.replace calls. It does
not use str.format to avoid errors when templates contain braces for
other reasons.
Parameters
----------
attrs : dict
The attributes dictionary to process (string values may contain
placeholders).
replacements : dict
Mapping of placeholder names (without braces) to replacement values.
Returns
-------
dict
New attributes dict with replacements applied to string values.
"""
if not isinstance(attrs, dict):
return attrs
new = {}
for k, v in attrs.items():
if isinstance(v, str):
s = v
for name, val in replacements.items():
if val is None:
continue
s = s.replace(f"{{{name}}}", str(val))
new[k] = s
else:
new[k] = v
return new