Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions imap_processing/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,7 @@ class Hi(ProcessInstrument):

def do_processing( # noqa: PLR0912
self, dependencies: ProcessingInputCollection
) -> list[xr.Dataset | Path]:
) -> list[xr.Dataset]:
"""
Perform IMAP-Hi specific processing.

Expand All @@ -789,10 +789,6 @@ def do_processing( # noqa: PLR0912
print(f"Processing IMAP-Hi {self.data_level}")
datasets: list[xr.Dataset] = []

# Check self.repointing is not None (for mypy type checking)
if self.repointing is None:
raise ValueError("Repointing must be provided for Hi processing.")

if self.data_level == "l1a":
science_files = dependencies.get_file_paths(source="hi")
if len(science_files) != 1:
Expand All @@ -806,6 +802,12 @@ def do_processing( # noqa: PLR0912
if l0_files:
datasets = hi_l1b.housekeeping(l0_files[0])
elif "goodtimes" in self.descriptor:
# Check self.repointing is not None (for mypy type checking)
if self.repointing is None:
raise ValueError(
"Repointing must be provided for Hi Goodtimes processing."
)

# Goodtimes processing
l1b_de_paths = dependencies.get_file_paths(
source="hi", data_type="l1b", descriptor="de"
Expand Down
83 changes: 47 additions & 36 deletions imap_processing/hi/hi_goodtimes.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
CalibrationProductConfig,
CoincidenceBitmap,
HiConstants,
compute_qualified_event_mask,
parse_sensor_number,
)
from imap_processing.quality_flags import ImapHiL1bDeFlags
Expand Down Expand Up @@ -229,11 +230,28 @@ def _apply_goodtimes_filters(
stats = goodtimes_ds.goodtimes.get_cull_statistics()
logger.info(f"Initial good bins: {stats['good_bins']}/{stats['total_bins']}")

# Build set of qualified coincidence types from calibration product config
qualified_coincidence_types: set[int] = set()
for coin_types in cal_product_config["coincidence_type_values"]:
qualified_coincidence_types.update(coin_types)
logger.info(f"Qualified coincidence types: {qualified_coincidence_types}")
# Pre-compute qualified event masks for each dataset
# These masks check BOTH coincidence_type AND TOF windows
for l1b_de in l1b_de_datasets:
ccsds_index = l1b_de["ccsds_index"].values

# Handle invalid events (FILLVAL trigger_id) to avoid IndexError
# For pointings with no valid events, trigger_id will be at FILLVAL
trigger_id_fillval = l1b_de["trigger_id"].attrs.get("FILLVAL", 65535)
valid_events = l1b_de["trigger_id"].values != trigger_id_fillval

# Initialize with -1 (won't match any config row since ESA energy steps > 0)
esa_energy_steps = np.full(len(ccsds_index), -1, dtype=np.int32)
if np.any(valid_events):
esa_energy_steps[valid_events] = l1b_de["esa_energy_step"].values[
ccsds_index[valid_events]
]

l1b_de["qualified_mask"] = xr.DataArray(
compute_qualified_event_mask(l1b_de, cal_product_config, esa_energy_steps),
dims=["event_met"],
)
logger.info("Pre-computed qualified event masks for all datasets")

# === Apply culling filters ===

Expand All @@ -259,15 +277,13 @@ def _apply_goodtimes_filters(
goodtimes_ds,
l1b_de_datasets,
current_index,
qualified_coincidence_types,
)

# 6. Statistical Filter 2 - short-lived event pulses
logger.info("Applying filter: mark_statistical_filter_2")
mark_statistical_filter_2(
goodtimes_ds,
current_l1b_de,
qualified_coincidence_types,
)


Expand Down Expand Up @@ -1390,7 +1406,7 @@ def mark_statistical_filter_0(

def _compute_qualified_counts_per_sweep(
l1b_de: xr.Dataset,
qualified_coincidence_types: set[int],
qualified_mask: np.ndarray,
) -> xr.Dataset:
"""
Compute qualified calibration product counts per 8-spin interval and reshape.
Expand All @@ -1402,8 +1418,9 @@ def _compute_qualified_counts_per_sweep(
----------
l1b_de : xarray.Dataset
L1B Direct Event dataset with esa_sweep coordinate on epoch dimension.
qualified_coincidence_types : set[int]
Set of coincidence type integers that qualify for calibration products.
qualified_mask : np.ndarray
Boolean mask indicating which events qualify for calibration products.
This mask should check BOTH coincidence_type AND TOF windows.

Returns
-------
Expand All @@ -1416,13 +1433,12 @@ def _compute_qualified_counts_per_sweep(
raise ValueError("Dataset must have esa_sweep coordinate")

# Get values needed for counting
coincidence_type = l1b_de["coincidence_type"].values
ccsds_index = l1b_de["ccsds_index"].values
esa_sweep = l1b_de.coords["esa_sweep"].values
esa_energy_step = l1b_de["esa_energy_step"].values

# Identify qualified events
is_qualified = np.isin(coincidence_type, list(qualified_coincidence_types))
# Use pre-computed qualified mask
is_qualified = qualified_mask

# Map qualified events to their packet's (esa_sweep, esa_energy_step)
qualified_packet_idx = ccsds_index[is_qualified]
Expand Down Expand Up @@ -1460,17 +1476,16 @@ def _compute_qualified_counts_per_sweep(

def _build_per_sweep_datasets(
l1b_de_datasets: list[xr.Dataset],
qualified_coincidence_types: set[int],
) -> dict[int, xr.Dataset]:
"""
Build per-sweep datasets with qualified counts for each Pointing.

Parameters
----------
l1b_de_datasets : list[xarray.Dataset]
List of L1B DE datasets for multiple Pointings.
qualified_coincidence_types : set[int]
Set of coincidence type integers that qualify for calibration products.
List of L1B DE datasets for multiple Pointings. Each dataset must
contain a "qualified_mask" DataArray indicating which events qualify
for calibration products.

Returns
-------
Expand All @@ -1484,7 +1499,7 @@ def _build_per_sweep_datasets(
# Add esa_sweep coordinate and compute counts per 8-spin interval
l1b_de_with_sweep = _add_sweep_indices(l1b_de)
per_sweep = _compute_qualified_counts_per_sweep(
l1b_de_with_sweep, qualified_coincidence_types
l1b_de_with_sweep, l1b_de["qualified_mask"].values
)
per_sweep_datasets[i] = per_sweep

Expand Down Expand Up @@ -1684,7 +1699,6 @@ def mark_statistical_filter_1(
goodtimes_ds: xr.Dataset,
l1b_de_datasets: list[xr.Dataset],
current_index: int,
qualified_coincidence_types: set[int],
consecutive_threshold_sigma: float = HiConstants.STAT_FILTER_1_CONSECUTIVE_SIGMA,
extreme_threshold_sigma: float = HiConstants.STAT_FILTER_1_EXTREME_SIGMA,
min_consecutive_intervals: int = HiConstants.STAT_FILTER_1_MIN_CONSECUTIVE,
Expand All @@ -1711,11 +1725,11 @@ def mark_statistical_filter_1(
Goodtimes dataset for the current Pointing to update.
l1b_de_datasets : list[xarray.Dataset]
List of L1B DE datasets for surrounding Pointings. Typically includes
current plus 3 preceding and 3 following Pointings.
current plus 3 preceding and 3 following Pointings. Each dataset must
contain a "qualified_mask" DataArray indicating which events qualify
for calibration products (checking both coincidence_type AND TOF).
current_index : int
Index of the current Pointing in l1b_de_datasets.
qualified_coincidence_types : set[int]
Set of coincidence type integers that qualify for calibration products.
consecutive_threshold_sigma : float, optional
Sigma multiplier for consecutive interval check.
Default is HiConstants.STAT_FILTER_1_CONSECUTIVE_SIGMA.
Expand Down Expand Up @@ -1758,9 +1772,7 @@ def mark_statistical_filter_1(
)

# Step 1: Build per-sweep datasets with qualified counts for each Pointing
per_sweep_datasets = _build_per_sweep_datasets(
l1b_de_datasets, qualified_coincidence_types
)
per_sweep_datasets = _build_per_sweep_datasets(l1b_de_datasets)

# Step 2: Compute median and sigma per ESA energy step using xarray
median_per_esa, sigma_per_esa = _compute_median_and_sigma_per_esa(
Expand Down Expand Up @@ -1902,13 +1914,14 @@ def _compute_bins_for_cluster(
# Generate bin indices with wrapping using modulo
bins_to_mark = np.arange(bin_low, bin_high + 1) % n_bins

logger.debug(f"Cluster {cluster_start} to {cluster_end} bins: {bins_to_mark}")

return bins_to_mark


def mark_statistical_filter_2(
goodtimes_ds: xr.Dataset,
l1b_de: xr.Dataset,
qualified_coincidence_types: set[int],
min_events: int = HiConstants.STAT_FILTER_2_MIN_EVENTS,
max_time_delta: float = HiConstants.STAT_FILTER_2_MAX_TIME_DELTA,
bin_padding: int = HiConstants.STAT_FILTER_2_BIN_PADDING,
Expand Down Expand Up @@ -1942,9 +1955,8 @@ def mark_statistical_filter_2(
- coincidence_type: detector coincidence bitmap
- nominal_bin: spacecraft spin bin (0-89)
- esa_step: ESA energy step for each packet
qualified_coincidence_types : set[int]
Set of coincidence type integers qualifying as calibration
products 1 or 2.
- qualified_mask: boolean mask indicating which events qualify for
calibration products (checking both coincidence_type AND TOF windows)
min_events : int, optional
Minimum events to form a pulse cluster.
Default is HiConstants.STAT_FILTER_2_MIN_EVENTS.
Expand Down Expand Up @@ -1981,19 +1993,18 @@ def mark_statistical_filter_2(

# Add event-level coordinates for grouping
l1b_de_with_sweep = l1b_de_with_sweep.assign_coords(
event_sweep=("event", esa_sweep[ccsds_index]),
event_step=("event", esa_step[ccsds_index]),
event_sweep=("event_met", esa_sweep[ccsds_index]),
event_step=("event_met", esa_step[ccsds_index]),
)

# Filter to qualified events
coincidence_type = l1b_de_with_sweep["coincidence_type"].values
is_qualified = np.isin(coincidence_type, list(qualified_coincidence_types))
# Get qualified mask from the dataset
qualified_mask = l1b_de["qualified_mask"].values

if not np.any(is_qualified):
if not np.any(qualified_mask):
logger.info("Statistical Filter 2: No qualified events found")
return

qualified_events = l1b_de_with_sweep.isel(event=is_qualified)
qualified_events = l1b_de_with_sweep.isel(event_met=qualified_mask)

n_clusters_found = 0
n_bins_marked = 0
Expand Down
122 changes: 26 additions & 96 deletions imap_processing/hi/hi_l1c.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@

import logging
from pathlib import Path
from typing import NamedTuple

import numpy as np
import pandas as pd
import xarray as xr
from numpy import typing as npt
from numpy._typing import NDArray

from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes
from imap_processing.cdf.utils import parse_filename_like
Expand All @@ -19,6 +17,7 @@
HiConstants,
create_dataset_variables,
full_dataarray,
iter_qualified_events_by_config,
parse_sensor_number,
)
from imap_processing.spice.geometry import (
Expand Down Expand Up @@ -369,106 +368,37 @@ def pset_counts(
)
de_ds = de_ds.isel(event_met=good_mask)

# Get esa_energy_step for each event (recorded per packet, use ccsds_index)
esa_energy_steps = l1b_de_dataset["esa_energy_step"].data[de_ds["ccsds_index"].data]

# The calibration product configuration potentially has different coincidence
# types for each ESA and different TOF windows for each calibration product,
# esa energy step combination. Because of this we need to filter DEs that
# belong to each combo individually.
# Loop over the esa_energy_step values first
for esa_energy, esa_df in config_df.groupby(level="esa_energy_step"):
# Create a mask for all DEs at the current esa_energy_step.
# esa_energy_step is recorded for each packet rather than for each DE,
# so we use ccsds_index to get the esa_energy_step for each DE
esa_mask = (
l1b_de_dataset["esa_energy_step"].data[de_ds["ccsds_index"].data]
== esa_energy
# esa energy step combination. Use the shared generator to iterate over all
# config combinations and get qualified event masks.
for esa_energy, config_row, qualified_mask in iter_qualified_events_by_config(
de_ds, config_df, esa_energy_steps
):
Comment on lines +378 to +380
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you take this out iter_qualified_events_by_config( de_ds, config_df, esa_energy_steps ) and assign to a variable and then use in the for loop? It may help with performance.

# Filter events using the qualified mask
filtered_de_ds = de_ds.isel(event_met=qualified_mask)

# Bin remaining DEs into spin-bins
i_esa = np.flatnonzero(pset_coords["esa_energy_step"].data == esa_energy)[0]
# spin_phase is in the range [0, 1). Multiplying by N_SPIN_BINS and
# truncating to an integer gives the correct bin index
spin_bin_indices = (filtered_de_ds["spin_phase"].data * N_SPIN_BINS).astype(int)
# When iterating over rows of a dataframe, the names of the multi-index
# are not preserved. Below, `config_row.Index[0]` gets the
# calibration_prod value from the namedtuple representing the
# dataframe row. We map this to the array index using cal_prod_to_index.
i_cal_prod = cal_prod_to_index[config_row.Index[0]]
np.add.at(
counts_var["counts"].data[0, i_esa, i_cal_prod],
spin_bin_indices,
1,
)
# Now loop over the calibration products for the current ESA energy
for config_row in esa_df.itertuples():
# Remove DEs that are not at the current ESA energy and in the list
# of coincidence types for the current calibration product
type_mask = de_ds["coincidence_type"].isin(
config_row.coincidence_type_values
)
filtered_de_ds = de_ds.isel(event_met=(esa_mask & type_mask))

# Use the TOF window mask to remove DEs with TOFs outside the allowed range
tof_fill_vals = {
f"tof_{detector_pair}": l1b_de_dataset[f"tof_{detector_pair}"].attrs[
"FILLVAL"
]
for detector_pair in CalibrationProductConfig.tof_detector_pairs
}
tof_in_window_mask = get_tof_window_mask(
filtered_de_ds, config_row, tof_fill_vals
)
filtered_de_ds = filtered_de_ds.isel(event_met=tof_in_window_mask)

# Bin remaining DEs into spin-bins
i_esa = np.flatnonzero(pset_coords["esa_energy_step"].data == esa_energy)[0]
# spin_phase is in the range [0, 1). Multiplying by N_SPIN_BINS and
# truncating to an integer gives the correct bin index
spin_bin_indices = (filtered_de_ds["spin_phase"].data * N_SPIN_BINS).astype(
int
)
# When iterating over rows of a dataframe, the names of the multi-index
# are not preserved. Below, `config_row.Index[0]` gets the
# calibration_prod value from the namedtuple representing the
# dataframe row. We map this to the array index using cal_prod_to_index.
i_cal_prod = cal_prod_to_index[config_row.Index[0]]
np.add.at(
counts_var["counts"].data[0, i_esa, i_cal_prod],
spin_bin_indices,
1,
)
return counts_var


def get_tof_window_mask(
de_ds: xr.Dataset, prod_config_row: NamedTuple, fill_vals: dict
) -> NDArray[bool]:
"""
Generate a mask indicating which DEs to keep based on TOF windows.

Parameters
----------
de_ds : xarray.Dataset
The Direct Event Dataset for the DEs to filter based on the TOF
windows.
prod_config_row : NamedTuple
A single row of the prod config dataframe represented as a named tuple.
fill_vals : dict
A dictionary containing the fill values used in the input DE TOF
dataframe values. This value should be derived from the L1B DE CDF
TOF variable attributes.

Returns
-------
window_mask : np.ndarray
A mask with one entry per DE in the input `de_df` indicating which DEs
contain TOF values within the windows specified by `prod_config_row`.
The mask is intended to directly filter the DE dataframe.
"""
detector_pairs = CalibrationProductConfig.tof_detector_pairs
tof_in_window_mask = np.empty(
(len(detector_pairs), len(de_ds["event_met"])), dtype=bool
)
for i_pair, detector_pair in enumerate(detector_pairs):
low_limit = getattr(prod_config_row, f"tof_{detector_pair}_low")
high_limit = getattr(prod_config_row, f"tof_{detector_pair}_high")
tof_array = de_ds[f"tof_{detector_pair}"].data
# The TOF in window mask contains True wherever the TOF is within
# the configuration low/high bounds OR the FILLVAL is present. The
# FILLVAL indicates that the detector pair was not hit. DEs with
# the incorrect coincidence_type are already filtered out and this
# implementation simplifies combining the tof_in_window_masks in
# the next step.
tof_in_window_mask[i_pair] = np.logical_or(
np.logical_and(low_limit <= tof_array, tof_array <= high_limit),
tof_array == fill_vals[f"tof_{detector_pair}"],
)
return np.all(tof_in_window_mask, axis=0)


def pset_backgrounds(pset_coords: dict[str, xr.DataArray]) -> dict[str, xr.DataArray]:
"""
Calculate pointing set backgrounds and background uncertainties.
Expand Down
Loading