5 changes: 5 additions & 0 deletions changelog/196.feature.rst
@@ -0,0 +1,5 @@
Add the ability to control the data normalisation applied by `~stixpy.product.sources.science.ScienceData.get_data` via the `vtype` keyword:
* 'c' - count [c]
* 'cr' - count rate [c/s]
* 'dcr' - differential count rate [c/(s keV)]
* 'dcrf' - differential count rate flux (geometric area) [c/(s keV cm^2)]
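A minimal usage sketch of the new keyword; the `Product` loader call and the file name are illustrative assumptions and not part of this change:

    from stixpy.product import Product

    cpd = Product("solo_L1_stix-sci-xray-cpd_example_V02.fits")  # hypothetical file name

    # the first return value carries the units selected by vtype
    counts, errors, times, dt, energies = cpd.get_data(vtype="c")  # counts [ct]
    rates, rate_errors, *_ = cpd.get_data(vtype="cr")              # count rate [ct / s]
    dcr, dcr_errors, *_ = cpd.get_data(vtype="dcr")                # differential count rate [ct / (s keV)]
    flux, flux_errors, *_ = cpd.get_data(vtype="dcrf")             # differential count rate flux [ct / (s keV cm**2)]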
74 changes: 65 additions & 9 deletions stixpy/product/sources/science.py
@@ -16,6 +16,7 @@
from matplotlib.widgets import Slider
from sunpy.time.timerange import TimeRange

from stixpy.config.instrument import STIX_INSTRUMENT
from stixpy.io.readers import read_subc_params
from stixpy.product.product import L1Product

@@ -149,6 +150,7 @@ class SpectrogramPlotMixin:
def plot_spectrogram(
self,
axes=None,
vtype="dcr",
time_indices=None,
energy_indices=None,
detector_indices="all",
@@ -213,7 +215,11 @@ def plot_spectrogram(
pid = pixel_indices

counts, errors, times, timedeltas, energies = self.get_data(
detector_indices=did, pixel_indices=pid, time_indices=time_indices, energy_indices=energy_indices
vtype=vtype,
detector_indices=did,
pixel_indices=pid,
time_indices=time_indices,
energy_indices=energy_indices,
)
counts = counts.to(u.ct / u.s / u.keV)
errors = errors.to(u.ct / u.s / u.keV)
@@ -257,6 +263,7 @@ class TimesSeriesPlotMixin:

def plot_timeseries(
self,
vtype="dcr",
time_indices=None,
energy_indices=None,
detector_indices="all",
@@ -270,8 +277,12 @@

Parameters
----------
axes : optional `matplotlib.axes`
The axes the plot the spectrogram.
vtype : str
Type of value to return, which controls the normalisation:
* 'c' - count [c]
* 'cr' - count rate [c/s]
* 'dcr' - differential count rate [c/(s keV)]
* 'dcrf' - differential count rate flux (geometric area) [c/(s keV cm^2)]
time_indices : `list` or `numpy.ndarray`
If an 1xN array will be treated as mask if 2XN array will sum data between given
indices. For example `time_indices=[0, 2, 5]` would return only the first, third and
@@ -288,6 +299,8 @@ def plot_timeseries(
If a 1xN array it will be treated as a mask, if a 2xN array the data will be summed between the given
indices. For example `pixel_indices=[0, 2, 5]` would return only the first, third and
sixth pixels while `pixel_indices=[[0, 2],[3, 5]]` would sum the data between.
axes : optional `matplotlib.axes`
The axes on which to plot the time series.
error_bar : optional `bool`
Add error bars to plot.
**plot_kwargs : `dict`
@@ -348,12 +361,18 @@ class PixelPlotMixin:
Pixel plot mixin providing pixel plotting for pixel data.
"""

def plot_pixels(self, *, kind="pixels", time_indices=None, energy_indices=None, fig=None, cmap=None):
def plot_pixels(self, *, vtype="dcr", kind="pixels", time_indices=None, energy_indices=None, fig=None, cmap=None):
"""
Plot individual pixel data for each detector.

Parameters
----------
vtype : str
Type of value to return, which controls the normalisation:
* 'c' - count [c]
* 'cr' - count rate [c/s]
* 'dcr' - differential count rate [c/(s keV)]
* 'dcrf' - differential count rate flux (geometric area) [c/(s keV cm^2)]
kind : `str`, one of 'pixels', 'errorbar', 'config'
This sets the visualization type of the subplots. The data will then be shown in the selected style.
time_indices : `list` or `numpy.ndarray`
@@ -387,7 +406,9 @@ def plot_pixels(self, *, kind="pixels", time_indices=None, energy_indices=None,
else:
fig, axes = plt.subplots(nrows=4, ncols=8, sharex=True, sharey=True, figsize=(7, 7))

counts, count_err, times, dt, energies = self.get_data(time_indices=time_indices, energy_indices=energy_indices)
counts, count_err, times, dt, energies = self.get_data(
vtype=vtype, time_indices=time_indices, energy_indices=energy_indices
)

imaging_mask = np.ones(32, bool)
imaging_mask[8:10] = False
@@ -854,15 +875,28 @@ def duration(self):
return self.data["timedel"]

def get_data(
self, time_indices=None, energy_indices=None, detector_indices=None, pixel_indices=None, sum_all_times=False
self,
*,
vtype="dcr",
time_indices=None,
energy_indices=None,
detector_indices=None,
pixel_indices=None,
sum_all_times=False,
):
"""
r"""
Return the counts, errors, times, durations and energies for selected data.

Optionally summing in time and or energy.

Parameters
----------
vtype : str
Type of value to return, which controls the normalisation:
* 'c' - count [c]
* 'cr' - count rate [c/s]
* 'dcr' - differential count rate [c/(s keV)]
* 'dcrf' - differential count rate flux (geometric area) [c/(s keV cm^2)]
time_indices : `list` or `numpy.ndarray`
If an 1xN array will be treated as mask if 2XN array will sum data between given
indices. For example `time_indices=[0, 2, 5]` would return only the first, third and
@@ -998,14 +1032,36 @@ def get_data(
counts_var = np.sum(counts_var, axis=0, keepdims=True)
t_norm = np.sum(dt)

t_norm = t_norm.to("s")

if e_norm.size != 1:
e_norm = e_norm.reshape(1, 1, 1, -1)

if t_norm.size != 1:
t_norm = t_norm.reshape(-1, 1, 1, 1)

counts_err = np.sqrt(counts * u.ct + counts_var) / (e_norm * t_norm)
counts = counts / (e_norm * t_norm)
pixel_areas = STIX_INSTRUMENT.pixel_config["Area"].to("cm2")
a_norm = []
for pixel_mask in self.data["pixel_masks"]:
indices = np.nonzero(pixel_mask)
areas = np.full(12, 0 * u.cm**2)
areas[indices] = pixel_areas[indices].value
Comment on lines +1044 to +1048
Copilot AI Oct 16, 2025
a_norm is built from self.data['pixel_masks'] for all original time steps, then reshaped to t_norm.size. If time_indices reduces or merges time bins, len(self.data['pixel_masks']) can differ from t_norm.size, causing reshape/broadcasting errors for vtype='dcrf' or misaligned areas. Slice/aggregate pixel_masks using the same time selection logic as counts/t_norm (e.g., index masks when filtering; for merged ranges, compute a dt-weighted mean or representative area per output bin) before reshaping.

Suggested change
    # Aggregate pixel masks according to time_indices logic
    pixel_masks = self.data["pixel_masks"]
    if time_indices is not None:
        if time_indices.ndim == 1:
            pixel_masks = [pixel_masks[i] for i in time_indices]
        elif time_indices.ndim == 2:
            # For each merged bin, aggregate pixel masks (dt-weighted mean)
            pixel_masks_agg = []
            for idx, (tl, th) in enumerate(time_indices):
                # Get time durations for weighting
                dt_bin = self.data["timedel"][tl : th + 1].to("s").value
                masks_bin = np.array(pixel_masks[tl : th + 1])
                # Weighted mean mask (float, but we want area)
                weighted_mask = np.average(masks_bin, axis=0, weights=dt_bin)
                # For each pixel, if mask is >0.5, treat as active (or use weighted area)
                pixel_masks_agg.append(weighted_mask)
            pixel_masks = pixel_masks_agg
    elif t_norm.size == 1 and len(pixel_masks) > 1:
        # If all times summed, aggregate all masks (dt-weighted mean)
        dt_bin = self.data["timedel"].to("s").value
        masks_bin = np.array(pixel_masks)
        weighted_mask = np.average(masks_bin, axis=0, weights=dt_bin)
        pixel_masks = [weighted_mask]
    # Build a_norm from pixel_masks
    a_norm = []
    for pixel_mask in pixel_masks:
        indices = np.nonzero(pixel_mask)
        areas = np.full(12, 0 * u.cm**2)
        # If mask is float (from weighted mean), use mask value as fraction of area
        if np.issubdtype(pixel_mask.dtype, np.floating):
            areas = pixel_areas.value * pixel_mask
        else:
            areas[indices] = pixel_areas[indices].value

Member Author
So this is an interesting case: if we sum on ground over an RCR change, what area should we return, the average, or should we mask it out because we don't know?

@jajmitchell Oct 16, 2025
Maybe we could sum over each RCR state separately and then average them in the bin once normalised?

Member Author
Potentially, but as was discussed at the STIX meeting, data directly before/after are not reliable, so I think I will set all values for this time to be masked.
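A rough sketch of the masking idea described in the reply above, not the implementation in this PR: if the pixel mask changes within a summed bin (for example across an RCR change), the bin's area is set to NaN so the resulting `dcrf` values come out masked. The helper name and the `merged_bins` argument are illustrative assumptions.

    import numpy as np
    import astropy.units as u

    def areas_per_summed_bin(pixel_masks, pixel_areas, merged_bins):
        """Illustrative sketch: per-bin pixel areas, NaN where the mask changes within a bin."""
        a_norm = []
        for tl, th in merged_bins:  # inclusive index ranges summed on ground
            masks = np.atleast_2d(np.asarray(pixel_masks[tl : th + 1]))
            areas = np.where(masks[0] > 0, pixel_areas.to_value(u.cm**2), 0.0)
            if not (masks == masks[0]).all():
                # mask state (e.g. RCR) changed inside this bin -> flag the whole bin as unreliable
                areas = np.full_like(areas, np.nan)
            a_norm.append(areas)
        return np.vstack(a_norm) * u.cm**2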

a_norm.append(areas)
a_norm = np.vstack(a_norm).reshape(t_norm.size, 1, -1, 1) * u.cm**2

if vtype == "c":
norm = 1
elif vtype == "cr":
norm = 1 / t_norm
elif vtype == "dcr":
norm = 1 / (e_norm * t_norm)
elif vtype == "dcrf":
norm = 1 / (e_norm * t_norm * a_norm)
else:
raise ValueError("Unknown vtype, must be one of 'c', 'cr', 'dcr' or 'dcrf'.")

counts_err = np.sqrt(counts * u.ct + counts_var) * norm
counts = counts * norm

return counts, counts_err, times, t_norm, energies

45 changes: 30 additions & 15 deletions stixpy/product/sources/tests/test_science.py
@@ -45,67 +45,71 @@ def spec():
@pytest.mark.remote_data
def test_sciencedata_get_data(cpd):
tot = cpd.data["counts"]
norm = cpd.data["timedel"].reshape(5, 1, 1, 1) * cpd.dE
norm = cpd.data["timedel"].reshape(5, 1, 1, 1).to(u.s) * cpd.dE
rate = tot / norm
error = np.sqrt(tot * u.ct + cpd.data["counts_comp_err"] ** 2) / norm
r, re, t, dt, e = cpd.get_data()
r, re, t, dt, e = cpd.get_data(vtype="dcr")
assert_allclose(rate, r)
assert_allclose(error, re)

# Detector sum
tot = cpd.data["counts"][:, 0:32, ...].sum(axis=1, keepdims=True)
norm = cpd.data["timedel"].reshape(5, 1, 1, 1) * cpd.dE
norm = cpd.data["timedel"].reshape(5, 1, 1, 1).to(u.s) * cpd.dE
rate = tot / norm
error = np.sqrt(tot * u.ct + cpd.data["counts_comp_err"][:, 0:32, ...].sum(axis=1, keepdims=True) ** 2) / norm
r, re, t, dt, e = cpd.get_data(detector_indices=[[0, 31]])
r, re, t, dt, e = cpd.get_data(vtype="dcr", detector_indices=[[0, 31]])
assert_allclose(rate, r)
assert_allclose(error, re, atol=1e-3)

# Pixel sum
tot = cpd.data["counts"][..., 0:12, :].sum(axis=2, keepdims=True)
norm = cpd.data["timedel"].reshape(5, 1, 1, 1) * cpd.dE
norm = cpd.data["timedel"].reshape(5, 1, 1, 1).to(u.s) * cpd.dE
rate = tot / norm
error = np.sqrt(tot * u.ct + cpd.data["counts_comp_err"][..., 0:12, :].sum(axis=2, keepdims=True) ** 2) / norm
r, re, t, dt, e = cpd.get_data(pixel_indices=[[0, 11]])
r, re, t, dt, e = cpd.get_data(vtype="dcr", pixel_indices=[[0, 11]])
assert_allclose(rate, r)
assert_allclose(error, re)

# Detector and Pixel sum
tot = cpd.data["counts"][:, 0:32, 0:12, :].sum(axis=(1, 2), keepdims=True)
norm = cpd.data["timedel"].reshape(5, 1, 1, 1) * cpd.dE
norm = cpd.data["timedel"].reshape(5, 1, 1, 1).to(u.s) * cpd.dE
rate = tot / norm
error = (
np.sqrt(tot * u.ct + cpd.data["counts_comp_err"][:, 0:32, 0:12, :].sum(axis=(1, 2), keepdims=True) ** 2) / norm
)
r, re, t, dt, e = cpd.get_data(pixel_indices=[[0, 11]], detector_indices=[[0, 31]])
r, re, t, dt, e = cpd.get_data(vtype="dcr", pixel_indices=[[0, 11]], detector_indices=[[0, 31]])
assert_allclose(rate, r)
assert_allclose(error, re, atol=1e-3)

# Energy sum
tot = cpd.data["counts"][..., 1:31].sum(axis=3, keepdims=True)
norm = cpd.data["timedel"].reshape(5, 1, 1, 1) * (cpd.energies[30]["e_high"] - cpd.energies[1]["e_low"])
norm = cpd.data["timedel"].reshape(5, 1, 1, 1).to(u.s) * (cpd.energies[30]["e_high"] - cpd.energies[1]["e_low"])
rate = tot / norm
error = np.sqrt(tot * u.ct + cpd.data["counts_comp_err"][..., 1:31].sum(axis=3, keepdims=True) ** 2) / norm
r, re, t, dt, e = cpd.get_data(energy_indices=[[1, 30]])
r, re, t, dt, e = cpd.get_data(vtype="dcr", energy_indices=[[1, 30]])
assert_allclose(rate, r)
assert_allclose(error, re, atol=1e-3)

# Time sum
tot = cpd.data["counts"][:, ...].sum(axis=0, keepdims=True)
norm = cpd.data["timedel"].sum() * cpd.dE
norm = cpd.data["timedel"].sum().to(u.s) * cpd.dE
rate = tot / norm
error = np.sqrt(tot * u.ct + cpd.data["counts_comp_err"][:, ...].sum(axis=0, keepdims=True) ** 2) / norm
r, re, t, dt, e = cpd.get_data(time_indices=[[0, 4]])
r, re, t, dt, e = cpd.get_data(vtype="dcr", time_indices=[[0, 4]])
assert_allclose(rate, r)
assert_allclose(error, re)

# Sum everything down to one number
tot = cpd.data["counts"][..., 1:31].sum(keepdims=True)
norm = cpd.data["timedel"].sum() * (cpd.energies[30]["e_high"] - cpd.energies[1]["e_low"])
norm = cpd.data["timedel"].sum().to(u.s) * (cpd.energies[30]["e_high"] - cpd.energies[1]["e_low"])
rate = tot / norm
error = np.sqrt(tot * u.ct + cpd.data["counts_comp_err"][..., 1:31].sum(keepdims=True) ** 2) / norm
r, re, t, dt, e = cpd.get_data(
time_indices=[[0, 4]], energy_indices=[[1, 30]], pixel_indices=[[0, 11]], detector_indices=[[0, 31]]
vtype="dcr",
time_indices=[[0, 4]],
energy_indices=[[1, 30]],
pixel_indices=[[0, 11]],
detector_indices=[[0, 31]],
)
assert_allclose(rate, r)
assert_allclose(error, re, atol=1e-3)
@@ -121,7 +125,9 @@ def test_sciencedata_get_data(cpd):
cpd.energies["e_high"][1:-1] = ((np.arange(31) / 30)[1:] * u.keV).astype(np.float32)
cpd.energies["e_low"][1:-1] = ((np.arange(31) / 30)[:-1] * u.keV).astype(np.float32)
cpd.data["time"] = cpd.time_range.start + np.arange(5) / 5 * u.s
count, count_err, times, timedel, energies = cpd.get_data(time_indices=[[0, 4]], energy_indices=[[1, 30]])
count, count_err, times, timedel, energies = cpd.get_data(
vtype="dcr", time_indices=[[0, 4]], energy_indices=[[1, 30]]
)
assert_allclose(count, 1 * u.ct / (u.s * u.keV))
assert_allclose(count_err, np.sqrt(2) * u.ct / (u.s * u.keV)) # 1 + 1

@@ -186,6 +192,15 @@ def test_spectrogram(spec):
assert spec.data["counts"].shape == (num_times, num_energies - 1)


@pytest.mark.remote_data
@pytest.mark.parametrize(
"vtype, unit", [("c", "ct"), ("cr", "ct / s"), ("dcr", "ct / (s keV)"), ("dcrf", "ct / (s keV cm**2)")]
)
def test_get_data_vtype(cpd, vtype, unit):
data, *_ = cpd.get_data(vtype=vtype)
assert data.unit == u.Unit(unit)


@pytest.mark.remote_data
def test_cpd_plot_pixel(cpd):
cpd.plot_pixels()