Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 136 additions & 35 deletions imap_processing/tests/ultra/unit/test_ultra_l1b_culling.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@
flag_low_voltage,
flag_rates,
flag_scattering,
flag_statistical_outliers,
get_binned_energy_ranges,
get_binned_spins_edges,
get_de_rejection_mask,
get_energy_and_spin_dependent_rejection_mask,
get_energy_histogram,
get_energy_range_flags,
get_n_sigma,
get_poisson_stats,
get_pulses_per_spin,
get_spin_data,
get_valid_earth_angle_events,
Expand Down Expand Up @@ -337,7 +339,6 @@ def test_flag_low_voltage(test_data):
"leftdeflection_v": np.full(n_spins, 1.5),
}
)
flagged = 65535
spins = np.arange(n_spins)
spin_bin_size = 5
spin_period = np.full(n_spins, 15.0)
Expand All @@ -353,14 +354,14 @@ def test_flag_low_voltage(test_data):
# Check quality flag shape
assert quality_flags.shape == (len(spin_tbin_edges) - 1,)
# Check that every spin is flagged for low voltage
assert np.all(quality_flags == flagged)
assert np.all(quality_flags)

# Set only the first spin to be below threshold
mock_status_dataset["rightdeflection_v"].data[1:] += 5000
mock_status_dataset["leftdeflection_v"].data[1:] += 5000
quality_flags = flag_low_voltage(spin_tbin_edges, mock_status_dataset)
# Check that only the first spin is flagged for low voltage
assert np.all(quality_flags[0] == flagged)
assert np.all(quality_flags[0])
# The rest should not be flagged
assert np.all(quality_flags[1:] == 0)

Expand Down Expand Up @@ -388,9 +389,7 @@ def test_flag_low_voltage_incomplete_bins(test_data):

# check quality flag
assert quality_flags.shape == (n_spins // spin_bin_size,)
# Check that every spin is flagged for low voltage
flagged = 65535
assert np.all(quality_flags == flagged)
assert np.all(quality_flags)


def test_expand_bin_flags_to_spins(caplog):
Expand Down Expand Up @@ -479,9 +478,7 @@ def test_validate_voltage_cull():
xspin.spin_start_time.values,
spin_bin_size,
)
lv_flags = flag_low_voltage(
spin_tbin_edges, status_ds, lv_threshold, low_voltage_flag=1
)
lv_flags = flag_low_voltage(spin_tbin_edges, status_ds, lv_threshold)

assert np.array_equal(lv_flags, validation_low_voltage_qf)

Expand Down Expand Up @@ -644,10 +641,10 @@ def test_flag_high_energy():
energy_range_edges = np.array([3, 5, 7, 18, 25]) # Example energy bin edges
# Spin bin 1 (events 0-3) 4 events fall within the culling energy bin
# - This is above all the energy thresholds except the second one (10),
# so it should be flagged for all energy ranges except flag #2
# so it should be flagged for all energy ranges except flag the 2nd
# Spin bin 2 (events 4-7) only 1 event falls within the culling energy bin
# - This is above the lowest energy threshold (1) but below the rest, so
# it should only be flagged with the last energy range flag (#3)
# it should only be flagged at the last energy range
# Spin bin 3 (events 8-11) No events fall within the culling energy bin,
# so it should not be flagged for any energy range
energy = np.array([17, 16, 12, 15, 5, 8, 4, 6, 4, 1, 22, 20])
Expand All @@ -666,26 +663,31 @@ def test_flag_high_energy():
spin_tbin_edges = np.arange(
start=0, stop=len(energy) + 1, step=4
) # create spin bins of 4 seconds
energy_range_flags = get_energy_range_flags(energy_range_edges)
quality_flags = flag_high_energy(
de_dataset,
spin_tbin_edges,
energy_range_edges,
energy_range_flags,
None,
cull_thresholds,
90,
)

# check shape
assert len(quality_flags) == len(spin_tbin_edges) - 1
np.testing.assert_array_equal(
quality_flags.shape, (len(energy_range_edges) - 1, len(spin_tbin_edges) - 1)
)
# Assert that the first spin bin is flagged for high energy for all energy ranges
# except the second one
assert quality_flags[0] == (2**0 | 2**2 | 2**3)
assert quality_flags[0, 0]
assert not quality_flags[1, 0]
assert quality_flags[2, 0]
assert quality_flags[3, 0]
# Assert that the second spin bin is only flagged for high energy for the last
# energy range
assert quality_flags[1] == 2**3
# Assert that the third spin bin is not flagged for any energy range
assert quality_flags[2] == 0
# # energy range
assert quality_flags[3, 1]
assert not np.any(quality_flags[0:3, 1])
# # Assert that the third spin bin is not flagged for any energy range
assert not np.any(quality_flags[:, 2])


@pytest.mark.external_test_data
Expand Down Expand Up @@ -720,23 +722,122 @@ def test_validate_high_energy_cull():
intervals, _, _ = build_energy_bins()
# Get the energy ranges
energy_ranges = get_binned_energy_ranges(intervals)
flags = get_energy_range_flags(energy_ranges)
e_flags = flag_high_energy(
de_ds, spin_tbin_edges, energy_ranges, flags, mock_thresholds
)
# The ULTRA IT flags are shaped n_energy_ranges, spin_bin while the SDC
# is only spin_bin but different flags are set for different energy ranges, so we
# need to check each energy separately
# We also need to invert the expected flags since the ULTRA IT mask is True
# for good spins (counts below threshold) while the SDC quality flags are set
# for bad spins (counts exceed threshold).
for i in range(expected_qf.shape[0]):
np.testing.assert_array_equal(
(e_flags & 2**i) > 0,
~expected_qf[i, :].astype(bool),
err_msg=f"High energy flag mismatch for energy range {i} with edges"
f" {energy_ranges[i]}-{energy_ranges[i + 1]}",
)
de_ds, spin_tbin_edges, energy_ranges, None, mock_thresholds
)
np.testing.assert_array_equal(e_flags, ~expected_qf.astype(bool))


def test_flag_statistical_outliers():
"""Tests flag_statistical_outliers function."""
energy_range_edges = np.array([3, 5, 7, 18, 25]) # Example energy bin edges
n_spin_bins = 12
spin_step = 7
energy = np.full(spin_step * n_spin_bins, 0)
# Make sure there are at least 3 other bins with counts in each energy bin so that
# the statistics can be calculated.
energy[::spin_step] = 3
energy[1::spin_step] = 5
energy[2::spin_step] = 7
energy[3::spin_step] = 18
# Make the last spin bin have higher counts. It should get flagged as an outlier.
energy[-spin_step:] = 23

de_dataset = xr.Dataset(
{
"de_event_met": ("epoch", np.arange(len(energy))),
"energy_spacecraft": ("epoch", energy),
"quality_outliers": ("epoch", np.full(len(energy), 0)),
"quality_scattering": ("epoch", np.full(len(energy), 0)),
"ebin": ("epoch", np.full(len(energy), 10)),
}
)
spin_tbin_edges = np.arange(
start=0, stop=len(energy) + 1, step=spin_step
) # create spin bins of 7 seconds
quality_flags, convergence, iterations, std_diff = flag_statistical_outliers(
de_dataset,
spin_tbin_edges,
energy_range_edges,
np.zeros((len(energy_range_edges) - 1, len(spin_tbin_edges) - 1), dtype=bool),
combine_flags_across_energy_bins=True,
)

# check shape
np.testing.assert_array_equal(
quality_flags.shape, (len(energy_range_edges) - 1, len(spin_tbin_edges) - 1)
)
# check that none of the flags are set except for the last spin bin
# since combine_flags_across_energy_bins is True, the entire last spin bin should
# be flagged even though only one energy bin had high counts
expected_flags = np.zeros(
(len(energy_range_edges) - 1, len(spin_tbin_edges) - 1), dtype=bool
)
expected_flags[:, -1] = True
np.testing.assert_array_equal(quality_flags, expected_flags)
# all energy bins should have converged
# The first 2 didn't have enough events to calculate statistics, but they should
# still be marked as converged
assert np.all(convergence)
# All energy bins should have iterated 1 time except the last one which should have
# iterated twice.
assert np.all(iterations[:-1] == 1)
assert iterations[-1] == 2
# Check that all std_diff values were set (not zero) except the last one
assert np.all(std_diff[:-1] != 0)
assert std_diff[-1] == 0


def test_flag_statistical_outliers_invalid_events():
"""Tests flag_statistical_outliers function when there are no valid events."""
energy_range_edges = np.array([3, 5, 7, 18, 25]) # Example energy bin edges
energy = np.arange(25)
de_dataset = xr.Dataset(
{
"de_event_met": ("epoch", np.arange(len(energy))),
"energy_spacecraft": ("epoch", energy),
"quality_outliers": ("epoch", np.full(len(energy), 0)),
"quality_scattering": ("epoch", np.full(len(energy), 0)),
"ebin": ("epoch", np.full(len(energy), 10)),
}
)
spin_tbin_edges = np.arange(
start=0, stop=len(energy) + 1, step=5
) # create spin bins of 5 seconds
mask = np.ones((len(energy_range_edges) - 1, len(spin_tbin_edges) - 1), dtype=bool)
quality_flags, convergence, iterations, std_diff = flag_statistical_outliers(
de_dataset,
spin_tbin_edges,
energy_range_edges,
mask,
)
# check that all flags are set because there are no valid events in any energy bin
# so it fails the stat outlier check by default.
np.testing.assert_array_equal(
quality_flags, np.ones_like(quality_flags, dtype=bool)
)
# check that all energy bins are marked as converged (no valid events is not a
# failure case for convergence since we just can't calculate statistics.
assert np.all(convergence)
# check that there were no iterations
assert np.sum(iterations) == 0
# Check that std_diff is all invalid (-1)
assert np.all(std_diff == -1)


def test_get_poisson_stats():
"""Tests get_poisson_stats function."""
counts = np.full(20, 0)
counts[-1] = 100 # Make the last bin have high counts
std_diff, outlier_mask = get_poisson_stats(counts)
# std_diff should be counts/sqrt(counts) = sqrt(counts)
assert (
std_diff == np.std(counts) / np.sqrt(5) - 1
) # std_diff should be counts/sqrt(counts) = sqrt(counts)
assert (
np.sum(outlier_mask) == 1
) # Only the last bin should be flagged as an outlier
assert outlier_mask[-1]


def test_get_energy_range_flags():
Expand Down
4 changes: 4 additions & 0 deletions imap_processing/ultra/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,7 @@ class UltraConstants:
)
# Use the channel defined below to determine which spins are contaminated
HIGH_ENERGY_CULL_CHANNEL = 4
# Number of iterations to perform for statistical outlier culling.
STAT_CULLING_N_ITER = 5
# Sigma threshold to use for statistical outlier culling.
STAT_CULLING_STD_THRESHOLD = 0.05
46 changes: 33 additions & 13 deletions imap_processing/ultra/l1b/extendedspin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import xarray as xr
from numpy.typing import NDArray

from imap_processing.quality_flags import ImapRatesUltraFlags
from imap_processing.ultra.constants import UltraConstants
from imap_processing.ultra.l1b.ultra_l1b_culling import (
count_rejected_events_per_spin,
Expand All @@ -15,6 +14,7 @@
flag_imap_instruments,
flag_low_voltage,
flag_rates,
flag_statistical_outliers,
get_binned_energy_ranges,
get_binned_spins_edges,
get_energy_histogram,
Expand Down Expand Up @@ -75,27 +75,32 @@ def calculate_extendedspin(
spin, spin_period, spin_starttime, spin_bin_size
)
voltage_qf = flag_low_voltage(spin_tbin_edges, status_dataset)
voltage_qf = expand_bin_flags_to_spins(len(spin), voltage_qf, spin_bin_size)
# Get energy bins used at l1c
intervals, _, _ = build_energy_bins()
# Get the energy ranges
energy_ranges = get_binned_energy_ranges(intervals)
energy_bin_flags = get_energy_range_flags(energy_ranges)
# Calculate the high energy quality flags using the de dataset with low voltage
# events removed. Use the same spin and energy bins that
# were used for low voltage flags to maintain consistency in the flags.
valid_voltage_spins = spin[np.where(voltage_qf == 0)]
valid_de_spins = np.isin(de_dataset["spin"].values, valid_voltage_spins)
de_dataset_filtered = de_dataset.isel(epoch=valid_de_spins)
# Calculate the high energy quality flags
energy_thresholds = UltraConstants.HIGH_ENERGY_CULL_THRESHOLDS
high_energy_qf = flag_high_energy(
de_dataset_filtered,
de_dataset,
spin_tbin_edges,
energy_ranges,
energy_bin_flags,
voltage_qf,
energy_thresholds,
instrument_id,
)
# Combine high energy and voltage flags to use for statistical outlier flagging.
mask = (
voltage_qf[np.newaxis, :] | high_energy_qf
) # Shape (n_energy_bins, n_spins_bins)
stat_outliers_qf, _, _, _ = flag_statistical_outliers(
de_dataset,
spin_tbin_edges,
energy_ranges,
mask,
instrument_id,
)
# Get the number of pulses per spin.
pulses = get_pulses_per_spin(aux_dataset, rates_dataset)

Expand Down Expand Up @@ -132,8 +137,25 @@ def calculate_extendedspin(
stop_per_spin[valid] = pulses.stop_per_spin[idx[valid]]
coin_per_spin[valid] = pulses.coin_per_spin[idx[valid]]

# high energy and statistical outlier flags are energy dependent boolean arrays
# with shape (n_energy_bins, n_spin_bins). We want to collapse the energy dimension
# using a bitwise OR to get a single boolean flag per spin.
high_energy_qf = np.bitwise_or.reduce(
high_energy_qf * energy_bin_flags[:, np.newaxis], axis=0
)
stat_outliers_qf = np.bitwise_or.reduce(
stat_outliers_qf * energy_bin_flags[:, np.newaxis], axis=0
)
# Low voltage flag is shape (n_spin_bins,) but we want to convert from a boolean
# to a bitwise flag to be consistent with the other flags, where each spin that
# is flagged will have the bitflag of all the energy flags combined.
voltage_qf = voltage_qf * np.bitwise_or.reduce(energy_bin_flags)
# Expand binned quality flags to individual spins.
high_energy_qf = expand_bin_flags_to_spins(len(spin), high_energy_qf, spin_bin_size)
voltage_qf = expand_bin_flags_to_spins(len(spin), voltage_qf, spin_bin_size)
stat_outliers_qf = expand_bin_flags_to_spins(
len(spin), stat_outliers_qf, spin_bin_size
)
# account for rates spins which are not in the direct event spins
extendedspin_dict["start_pulses_per_spin"] = start_per_spin
extendedspin_dict["stop_pulses_per_spin"] = stop_per_spin
Expand All @@ -146,9 +168,7 @@ def calculate_extendedspin(
extendedspin_dict["quality_low_voltage"] = voltage_qf # shape (nspin,)
# TODO calculate flags for high energy (SEPS) and statistics culling
# Initialize these flags to NONE for now.
extendedspin_dict["quality_statistics"] = np.full_like(
voltage_qf, ImapRatesUltraFlags.NONE.value, np.uint16
) # shape (nspin,)
extendedspin_dict["quality_statistics"] = stat_outliers_qf # shape (nspin,)
extendedspin_dict["quality_high_energy"] = high_energy_qf # shape (nspin,)
# Add an array of flags for each energy bin. Shape: (n_energy_bins)
extendedspin_dict["energy_range_flags"] = energy_bin_flags
Expand Down
Loading