Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions stixpy/product/product.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import numpy as np
from astropy.time import Time

__all__ = ["BaseProduct", "GenericProduct", "LevelBinary", "L1Product", "Level2"]
Expand Down Expand Up @@ -76,14 +77,14 @@ class L1Product(GenericProduct):

def __init__(self, **kwargs):
super().__init__(**kwargs)
meta = kwargs["meta"]
data = kwargs["data"]

# TODO don't change the data add new property or similar
@property
def time(self):
try:
data["time"] = Time(meta["date-obs"]) + data["time"]
time = Time(self.meta["date-obs"]) + np.atleast_1d(self.data["time"])
except KeyError:
data["time"] = Time(meta["date_obs"]) + data["time"]
time = Time(self.meta["date_obs"]) + np.atleast_1d(self.data["time"])
return time


class Level2(GenericProduct):
Expand All @@ -96,5 +97,5 @@ def __init__(self, **kwargs):
def is_datasource_for(cls, *, meta, **kwargs):
"""Determines if meta data meach Raw Pixel Data"""
level = meta["level"]
if level == "LB":
if level == "L2":
return True
156 changes: 150 additions & 6 deletions stixpy/product/sources/science.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import Union, Callable
from itertools import product
from collections import defaultdict

Expand Down Expand Up @@ -31,9 +32,48 @@
"EnergyEdgeMasks",
]

from stixpy.utils.time import times_to_indices

quantity_support()

# fmt: off
PIXEL_MAPPING = {
"all": slice(0, 12), # All pixels (top, bottom and small)
"big": slice(0, 8), # All big pixel top + bot
"top": slice(0, 4), # Top row pixels only
"bottom": slice(4, 8), # Bottom row pixels only
"small": slice(8, 12), # Small, middle row pixels only
}


DETECTOR_MAPPING = {
"all": slice(0, 32), # all detectors
"imaging": [2, 19, 21, 15, 13, 31, 20, 25, 3, 23, 7, 27, 14, 26, 30, 5, 29, 1, 24, 4, 22, 6, 28, 0,], # 30 imaging detectors (excluding BKG and CFL)
"imaging_24": [2, 19, 21, 15, 13, 31, 20, 25, 3, 23, 7, 27, 14, 26, 30, 5, 29, 1,], # 24 standard (not fine) imaging detectors
"imaging_fine": [4, 22, 6, 28, 0], # 6 finest imaging detectors
"cfl": 9, # Only CLF detector
"bkg": 8, # Only BKG detector
}
# fmt: on


def parse_time(times):
pass


def parse_energy():
pass


def parse_detectors(detectors):
if isinstance(detectors, str):
pass


def parse_pixels(pixels):
if isinstance(pixels, str):
return PIXEL_MAPPING.get(pixels, None)


class PPrintMixin:
"""
Expand Down Expand Up @@ -67,6 +107,7 @@ class IndexMasks(PPrintMixin):
"""

def __init__(self, mask_array):
mask_array = np.atleast_2d(mask_array)
masks = np.unique(mask_array, axis=0)
indices = [np.argwhere(np.all(mask_array == mask, axis=1)).reshape(-1) for mask in masks]
self.masks = masks
Expand Down Expand Up @@ -475,17 +516,15 @@ def __init__(self, *, meta, control, data, energies, idb_versions=None):
if "pixel_masks" in self.data.colnames:
self.pixel_masks = PixelMasks(self.data["pixel_masks"])
if "energy_bin_edge_mask" in self.control.colnames:
self.energy_masks = EnergyEdgeMasks(self.control["energy_bin_edge_mask"])
self.energy_masks = EnergyEdgeMasks(self.control["energy_bin_edge_mask"].squeeze())
self.dE = energies["e_high"] - energies["e_low"]

@property
def time_range(self):
"""
A `sunpy.time.TimeRange` for the data.
"""
return TimeRange(
self.data["time"][0] - self.data["timedel"][0] / 2, self.data["time"][-1] + self.data["timedel"][-1] / 2
)
return TimeRange(self.time[0] - self.data["timedel"][0] / 2, self.time[-1] + self.data["timedel"][-1] / 2)

@property
def pixels(self):
Expand Down Expand Up @@ -516,14 +555,117 @@ def times(self):
return self.data["time"]

@property
def durtaion(self):
def duration(self):
"""
An `astropy.units.Quantiy` array giving the duration or integration time
"""
return self.data["timedel"]

def crop_by_value(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def crop_by_value(
def crop_by_values(

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this function take high-level objects, e.g. Time, SkyCord, SpectralCoord? Or just lower level Quantity?

Copy link
Member Author

@samaloney samaloney May 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Low level objects plus a few strings for different pixel/detector combos e.g cpd.crop_by_vales([Time(...), Time(...)], None, 'big', [4, 10]*u.keV). Since the axis have well defined should be easy to validate.

self,
values,
):
r"""
Crop the data by a values e.g time, energy.

Parameters
----------
values

Returns
-------

Examples
--------
>>> cpd.crop_by_value()

"""

def rebin(self, *indices, operator: Callable = np.sum):
r"""
Rebin the data using the given indices.

Parameters
----------
operator :
indices :

Returns
-------

Examples
--------
cpd.rebin(None, None, None, [1, 5])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be good for this to follow the NDCube.rebin API as much as makes sense. In that case, rebin should take:

  • integers which are an integer factor of the length of the axis to which it corresponds, or:
  • a sequence of indices representing the bin edges along that axis.

In this case, the equivalent API for no rebinning should be 1, not None.

Alternatively, this method could be renamed to avoid confusion with NDCube.rebin to something like rebin_by_edges. In that case, it could take None or a sequence of indices for each axis.

cpd.rebin(None, None, None, [[1, 5], [5, 10]])
cpd.rebin([0:-10], None, None, [[1, 5], [5, 10]])


"""

def rebin_by_vales(self, *values, operator: Callable = np.sum):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def rebin_by_vales(self, *values, operator: Callable = np.sum):
def rebin_by_values(self, *values, operator: Callable = np.sum):

r"""
Rebin the data using the given values.

Parameters
----------
values

Returns
-------

Examples
--------
cpd.rebin_by_vales('all', 'imaging', 'big', [4, 10]*u.keV)
cpd.rebin_by_vales([t1, t2]', 'imaging', 'big', [[4, 10], [10, 15]]*u.keV)
cpd.rebin_by_vales([[t1, t2], [t2, t3]]', 'imaging', 'big', [[4, 10], [10, 15]]*u.keV)


"""

def __getitem__(self, item: Union[int, tuple[int, ...], slice]):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def __getitem__(self, item: Union[int, tuple[int, ...], slice]):
def __getitem__(self, item: Union[int, tuple[int, slice, ...], slice]):

r"""Slicing"""
if isinstance(item, int):
# single
data = self.data[item : item + 1].copy()
energies = self.energies.copy()
meta = self.meta.copy()
idb = self.idb_versions.copy()
ci = data["control_index"]
control = self.control[np.argwhere(self.control["index"] == ci)].copy()
return type(self)(meta=meta, control=control, data=data, energies=energies, idb_versions=idb)
elif isinstance(item, tuple):
time_slice = item[0]
missing_dims = len(self.data["counts"].shape) - len(item)
count_slice = list(item[1:]) + [None] * missing_dims
data = self.data[time_slice].copy()
data["counts"] = data["counts"][*count_slice]
energies = self.energies.copy()
meta = self.meta.copy()
idb = self.idb_versions.copy()
ci = set(data["control_index"])
control = self.control[np.argwhere(np.isin(self.control["index"], list(ci)))].copy()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this work but really need to rename and add a few more properties for this to make sense. Convert all the masks to be named _mask or _requested and then add some more props for detectors, pixels, energies which would indicate what contained in the object which would be less/different to the request as slice, rebin or crops.

return type(self)(meta=meta, control=control, data=data, energies=energies, idb_versions=idb)

elif isinstance(item, slice):
# Slice
data = self.data[item].copy()
energies = self.energies.copy()
meta = self.meta.copy()
idb = self.idb_versions.copy()
ci = set(data["control_index"])
control = self.control[np.argwhere(np.isin(self.control["index"], list(ci)))].copy()
return type(self)(meta=meta, control=control, data=data, energies=energies, idb_versions=idb)
else:
raise TypeError(f"Index must be int, not {type(item)}.")

def get_data(
self, time_indices=None, energy_indices=None, detector_indices=None, pixel_indices=None, sum_all_times=False
self,
time=None,
time_indices=None,
energy_indices=None,
detector_indices=None,
pixel_indices=None,
sum_all_times=False,
):
"""
Return the counts, errors, times, durations and energies for selected data.
Expand Down Expand Up @@ -633,6 +775,8 @@ def get_data(
energies = QTable(energies * u.keV, names=["e_low", "e_high"])

t_norm = self.data["timedel"]
if time is not None:
time_indices = times_to_indices(time, times)
if time_indices is not None:
time_indices = np.asarray(time_indices)
if time_indices.ndim == 1:
Expand Down
3 changes: 3 additions & 0 deletions stixpy/utils/time.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import astropy.units as u
import numpy as np
from astropy.time import Time
from astropy.units import Quantity
from numpy.testing import assert_equal


Expand All @@ -22,6 +23,8 @@ def times_to_indices(in_times, obs_times, unit=u.ms, decimals=3):
-------
Array of indices
"""
if not isinstance(in_times, (Time, Quantity)):
return in_times
unique_only = False
shape = None
relative_times = np.around((obs_times - obs_times[0]).to(unit), decimals=decimals)
Expand Down