Hextof lab loader #534

Open · wants to merge 17 commits into base: main
4 changes: 4 additions & 0 deletions .cspell/custom-dictionary.txt
@@ -43,6 +43,8 @@ cdeform
 cdeformfield
 cdisp
 centroidnn
+cfel
+CFEL
 chessy
 clim
 cmap
@@ -63,6 +65,7 @@ cryo
 cstart
 cstep
 csvfile
+cumsum
 custom-dictionary
 cval
 cvdist
@@ -169,6 +172,7 @@ joblib
 jpars
 jupyterlab
 kernelspec
+kmic
 kmodem
 KTOF
 kwds
11 changes: 7 additions & 4 deletions src/sed/config/flash_example_config.yaml
@@ -10,8 +10,6 @@ core:
   beamtime_id: 11019101
   # the year of the beamtime
   year: 2023
-  # the instrument used
-  instrument: hextof # hextof, wespe, etc
   # The paths to the raw and parquet data directories. If these are not
   # provided, the loader will try to find the data based on year beamtimeID etc
   # paths:
@@ -32,6 +30,7 @@ core:
   # (Not to be changed by user)
   beamtime_dir:
     pg2: "/asap3/flash/gpfs/pg2/"
+    cfel: "/asap3/fs-flash-o/gpfs/hextof/"
 
 binning:
   # Histogram computation mode to use.
@@ -60,6 +59,11 @@ dataframe:
   # Columns used for jitter correction
   jitter_cols: [dldPosX, dldPosY, dldTimeSteps]
 
+  # The index and formats of the data
+  index: [trainId, pulseId, electronId]
+  formats: [per_train, per_pulse, per_electron]
+  fill_formats: [per_train, per_pulse] # Channels with this format will be forward filled
+
   # Column settings
   columns:
     x: dldPosX
@@ -211,8 +215,7 @@ dataframe:
 
 # metadata collection from scicat
 # metadata:
-#   scicat_url: <URL>
-#   scicat_token: <TOKEN>
+#   archiver_url: <URL>
 
 # The nexus collection routine shall be finalized soon for both instruments
 nexus:
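The new `index`, `formats`, and `fill_formats` keys describe how channels of different granularity are aligned: channels whose format is listed in `fill_formats` (per_train, per_pulse) are forward-filled onto the electron-resolved rows. A minimal pandas sketch of that forward-fill semantics (the frame is synthetic and this is not the loader's actual implementation):

```python
import pandas as pd

# Hypothetical frame on a (trainId, pulseId, electronId)-like index: the
# per_train channel is only set on the first row of each train, NaN elsewhere.
df = pd.DataFrame(
    {
        "delayStage": [1.0, None, None, 2.0, None, None],  # per_train
        "dldPosX": [10, 11, 12, 13, 14, 15],               # per_electron
    }
)

# Forward-fill only the channels whose format is listed in fill_formats.
fill_cols = ["delayStage"]
df[fill_cols] = df[fill_cols].ffill()

print(df["delayStage"].tolist())  # [1.0, 1.0, 1.0, 2.0, 2.0, 2.0]
```

Per-electron channels are left untouched; only the coarse-grained values are propagated down to each electron row.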
160 changes: 160 additions & 0 deletions src/sed/config/lab_example_config.yaml
@@ -0,0 +1,160 @@
# This file contains the default configuration for the CFEL lab loader.

core:
  # defines the loader
  loader: cfel
  # Since this will most likely run on Maxwell, we have a lot of cores at our disposal
  num_cores: 10
  # the ID number of the beamtime
  beamtime_id: 11021732
  # the year of the beamtime
  year: 2025

  # The paths to the raw and parquet data directories. If these are not
  # provided, the loader will try to find the data based on year, beamtime ID, etc.
  paths:
    # location of the raw data.
    raw: "/asap3/fs-flash-o/gpfs/hextof/2025/data/11021732/raw/"
    # location of the intermediate parquet files.
    processed: "."

  # The beamtime directories for different DAQ systems.
  # (Not to be changed by user)
  beamtime_dir:
    pg2: "/asap3/flash/gpfs/pg2/"
    cfel: "/asap3/fs-flash-o/gpfs/hextof/"


dataframe:
  daq: fl1user3 # DAQ system name to resolve filenames/paths
  ubid_offset: 5 # Offset correction to the pulseId
  forward_fill_iterations: 0 # Number of iterations to fill the pulseId forward
  split_sector_id_from_dld_time: True # Remove reserved bits for dldSectorID from dldTimeSteps column
  sector_id_reserved_bits: 3 # Bits reserved for dldSectorID in the dldTimeSteps column
  sector_delays: [0., 0., 0., 0., 0., 0., 0., 0.] # Sector delays

  first_event_time_stamp_key: /ScanParam/StartTime
  ms_markers_key: /SlowData/exposure_time

  # Time and binning settings
  tof_binwidth: 2.0576131995767355E-11 # Base time-of-flight bin width in seconds
  tof_binning: 8 # Binning parameter for time-of-flight data

  # The index and formats of the data
  index: [countId]
  formats: [per_file, per_train, per_electron]
  fill_formats: [per_train] # Channels with this format will be forward filled

  # Columns used for jitter correction
  jitter_cols: [dldPosX, dldPosY, dldTimeSteps]

  # Column settings
  columns:
    x: dldPosX
    corrected_x: X
    kx: kx
    y: dldPosY
    corrected_y: Y
    ky: ky
    tof: dldTimeSteps
    tof_ns: dldTime
    corrected_tof: tm
    timestamp: timeStamp
    auxiliary: dldAux
    sector_id: dldSectorID
    delay: delayStage
    corrected_delay: pumpProbeTime

  units:
    # These are the units of the columns
    dldPosX: 'step'
    dldPosY: 'step'
    dldTimeSteps: 'step'
    tof_voltage: 'V'
    extractorVoltage: 'V'
    extractorCurrent: 'A'
    cryoTemperature: 'K'
    sampleTemperature: 'K'
    dldTime: 'ns'
    delay: 'ps'
    timeStamp: 's'
    energy: 'eV'
    E: 'eV'
    kx: '1/A'
    ky: '1/A'

  # The channels to load.
  # channels have the following structure:
  # <channelAlias>:
  #   format: per_pulse/per_electron/per_train
  #   index_key: the hdf5 index key
  #   dataset_key: the hdf5 dataset key
  #   slice: int to slice multidimensional data along axis=1. If not defined, no slicing is applied
  #   dtype: the datatype of the data
  #   sub_channels: further aliases, used if the data is multidimensional and needs to be split
  #   into different columns; currently used for the auxiliary channel
  #     <subChannelAlias>:
  #       slice: int to slice multidimensional data along axis=1. Must be defined
  #       dtype: the datatype of the data

  channels:
    # event key
    countId:
      format: per_file
      dataset_key: /DLD/NumOfEvents

    # detector x position
    dldPosX:
      format: per_electron
      dataset_key: /DLD/DLD/xPos
      # dtype: uint32

    # detector y position
    dldPosY:
      format: per_electron
      dataset_key: /DLD/DLD/yPos
      # dtype: uint32

    # Detector time-of-flight channel
    # if split_sector_id_from_dld_time is set to True, this will also generate
    # the dldSectorID channel
    dldTimeSteps:
      format: per_electron
      dataset_key: /DLD/DLD/times
      # dtype: uint32

    # The auxiliary channel has a special structure where the group further contains
    # a multidimensional structure, so further aliases are defined below
    dldAux:
      format: per_train
      dataset_key: "/SlowData/hextof/dld/info/Aux"
      sub_channels:
        sampleBias:
          slice: 0
          dtype: float32
        tofVoltage:
          slice: 1
          dtype: float64
        extractorVoltage:
          slice: 2
        extractorCurrent:
          slice: 3
        cryoTemperature:
          slice: 4
        sampleTemperature:
          slice: 5
        dldTimeBinSize:
          slice: 15

    vuRead:
      format: per_train
      dataset_key: /SlowData/hextof/logic/kmic1/Sample_VURead


# metadata collection from scicat
# metadata:
#   archiver_url: <URL>

# The nexus collection routine shall be finalized soon for both instruments
# nexus:
#   reader: "mpes"
#   definition: "NXmpes"
#   input_files: ["NXmpes_config-HEXTOF.json"]
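The `dldAux` channel stores several slow-control quantities in one multidimensional per_train dataset, and each `sub_channels` entry picks one column out of it via its `slice` index. A NumPy sketch of that splitting (the array is synthetic; the aliases, slices, and dtypes follow the config above):

```python
import numpy as np

# Synthetic stand-in for the /SlowData/hextof/dld/info/Aux dataset:
# one row per train, one column per auxiliary quantity.
aux = np.arange(32, dtype=np.float64).reshape(2, 16)  # 2 trains, 16 slots

sub_channels = {
    "sampleBias": {"slice": 0, "dtype": "float32"},
    "tofVoltage": {"slice": 1, "dtype": "float64"},
    "dldTimeBinSize": {"slice": 15, "dtype": "float64"},
}

# Split the 2D block into one 1D column per alias, cast to the requested dtype.
columns = {
    name: aux[:, spec["slice"]].astype(spec["dtype"])
    for name, spec in sub_channels.items()
}

print(columns["sampleBias"])      # [ 0. 16.]
print(columns["dldTimeBinSize"])  # [15. 31.]
```

Sub-channels without an explicit `dtype` (e.g. `extractorVoltage`) would simply keep the dataset's native type.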
4 changes: 3 additions & 1 deletion src/sed/core/config_model.py
@@ -57,7 +57,6 @@ class CoreModel(BaseModel):
     num_cores: Optional[PositiveInt] = None
     year: Optional[int] = None
     beamtime_id: Optional[Union[int, str]] = None
-    instrument: Optional[str] = None
     beamline: Optional[str] = None
     copy_tool: Optional[CopyToolModel] = None
     stream_name_prefixes: Optional[dict] = None
@@ -140,6 +139,9 @@ class DataframeModel(BaseModel):
     sector_id_reserved_bits: Optional[int] = None
     sector_delays: Optional[Sequence[float]] = None
     daq: Optional[str] = None
+    index: Optional[Sequence[str]] = None
+    formats: Optional[Union[Sequence[str], str]] = None
+    fill_formats: Optional[Union[Sequence[str], str]] = None
     # SXP specific settings
     num_trains: Optional[PositiveInt] = None
     num_pulses: Optional[PositiveInt] = None
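Because the new `index`/`formats`/`fill_formats` fields are all `Optional` with `None` defaults, existing configs that omit them keep validating. A minimal pydantic v2 sketch with a stripped-down stand-in model (not the full `DataframeModel`):

```python
from typing import Optional, Sequence, Union

from pydantic import BaseModel


class MiniDataframeModel(BaseModel):
    """Reduced stand-in for sed's DataframeModel, showing only the new fields."""

    index: Optional[Sequence[str]] = None
    formats: Optional[Union[Sequence[str], str]] = None
    fill_formats: Optional[Union[Sequence[str], str]] = None


# Configs written before this change simply omit the new keys...
old_style = MiniDataframeModel()

# ...while new-style configs list the index levels and channel formats.
new_style = MiniDataframeModel(
    index=["trainId", "pulseId", "electronId"],
    formats=["per_train", "per_pulse", "per_electron"],
    fill_formats=["per_train", "per_pulse"],
)
```

The `Union[Sequence[str], str]` annotation mirrors the model diff: a single format string or a list of them are both accepted.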
Empty file added src/sed/loader/cfel/__init__.py
Empty file.
106 changes: 106 additions & 0 deletions src/sed/loader/cfel/buffer_handler.py
@@ -0,0 +1,106 @@
from __future__ import annotations

import time
from pathlib import Path

import dask.dataframe as dd

from sed.core.logging import setup_logging
from sed.loader.cfel.dataframe import DataFrameCreator
from sed.loader.flash.buffer_handler import BufferFilePaths
from sed.loader.flash.buffer_handler import BufferHandler as BaseBufferHandler
from sed.loader.flash.utils import get_channels
from sed.loader.flash.utils import get_dtypes

logger = setup_logging("cfel_buffer_handler")


class BufferHandler(BaseBufferHandler):
    """
    A class for handling the creation and manipulation of buffer files using DataFrameCreator.
    """

    def __init__(
        self,
        config: dict,
    ) -> None:
        """
        Initializes the BufferHandler.

        Args:
            config (dict): The configuration dictionary.
        """
        super().__init__(config)

    def _save_buffer_file(self, paths: dict[str, Path]) -> None:
        """Creates the electron and timed buffer files from the raw H5 file."""
        logger.debug(f"Processing file: {paths['raw'].stem}")
        start_time = time.time()

        # Create DataFrameCreator and get the dataframe
        dfc = DataFrameCreator(config_dataframe=self._config, h5_path=paths["raw"])
        df = dfc.df

        # Save electron-resolved dataframe
        electron_channels = get_channels(self._config, "per_electron")
        dtypes = get_dtypes(self._config, df.columns.values)
        electron_df = df.dropna(subset=electron_channels).astype(dtypes).reset_index()
        logger.debug(f"Saving electron buffer with shape: {electron_df.shape}")
        electron_df.to_parquet(paths["electron"])

        # Create and save timed dataframe
        df_timed = dfc.df_timed
        dtypes = get_dtypes(self._config, df_timed.columns.values)
        timed_df = df_timed.astype(dtypes)
        logger.debug(f"Saving timed buffer with shape: {timed_df.shape}")
        timed_df.to_parquet(paths["timed"])

        logger.debug(f"Processed {paths['raw'].stem} in {time.time() - start_time:.2f}s")

    def process_and_load_dataframe(
        self,
        h5_paths: list[Path],
        folder: Path,
        force_recreate: bool = False,
        suffix: str = "",
        debug: bool = False,
        remove_invalid_files: bool = False,
        filter_timed_by_electron: bool = True,
    ) -> tuple[dd.DataFrame, dd.DataFrame]:
        """
        Runs the buffer file creation process.
        Does a schema check on the buffer files and creates them if they are missing.
        Performs forward filling and splits the sector ID from the DLD time lazily.

        Args:
            h5_paths (list[Path]): List of paths to H5 files.
            folder (Path): Path to the folder for processed files.
            force_recreate (bool): Flag to force recreation of buffer files.
            suffix (str): Suffix for buffer file names.
            debug (bool): Flag to enable debug mode.
            remove_invalid_files (bool): Flag to remove invalid files.
            filter_timed_by_electron (bool): Flag to filter timed data by valid electron events.

        Returns:
            tuple[dd.DataFrame, dd.DataFrame]: The electron and timed dataframes.
        """
        self.filter_timed_by_electron = filter_timed_by_electron
        if remove_invalid_files:
            h5_paths = self._validate_h5_files(self._config, h5_paths)

        self.fp = BufferFilePaths(h5_paths, folder, suffix)

        if not force_recreate:
            schema_set = set(
                get_channels(self._config, formats="all", index=True, extend_aux=True)
                + [self._config["columns"].get("timestamp")],
            )
            self._schema_check(self.fp["timed"], schema_set)
            self._schema_check(self.fp["electron"], schema_set)

        self._save_buffer_files(force_recreate, debug)

        self._get_dataframes()

        return self.df["electron"], self.df["timed"]
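The electron-frame step in `_save_buffer_file` boils down to: drop rows without a valid electron hit, cast dtypes, and materialise the index as columns before writing parquet. A pandas-only sketch of that pattern (synthetic data; the dtype mapping here is hypothetical, not what `get_dtypes` returns):

```python
import numpy as np
import pandas as pd

# Synthetic raw frame: NaN in the per_electron channels marks rows
# that carry no electron hit (e.g. train-only rows).
df = pd.DataFrame(
    {
        "dldPosX": [1.0, np.nan, 3.0],
        "dldPosY": [4.0, np.nan, 6.0],
        "delayStage": [0.1, 0.2, 0.3],
    },
    index=pd.Index([0, 0, 1], name="countId"),
)

electron_channels = ["dldPosX", "dldPosY"]
dtypes = {"dldPosX": "uint32", "dldPosY": "uint32"}  # hypothetical mapping

# Keep only rows with a valid electron, cast, and turn the index into a
# column so it survives the round trip through parquet.
electron_df = df.dropna(subset=electron_channels).astype(dtypes).reset_index()

print(electron_df.shape)  # (2, 4)
```

`reset_index()` is what makes `countId` a regular column in the buffer file, matching the `index: [countId]` setting in the lab config.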