From 6381b4dca91a6092ba2d5992e4d81aa4943fd466 Mon Sep 17 00:00:00 2001 From: jl1331 Date: Fri, 12 Dec 2025 12:31:55 -0500 Subject: [PATCH 1/2] first draft of project architecture for central conversion of waveforms --- central-wfs | 1 + 1 file changed, 1 insertion(+) create mode 160000 central-wfs diff --git a/central-wfs b/central-wfs new file mode 160000 index 0000000..fdb66f9 --- /dev/null +++ b/central-wfs @@ -0,0 +1 @@ +Subproject commit fdb66f9992fd6047d0a774ec84e4b9bd26917c32 From 5c66a6deb3f3ad78556d97fae41b8cda9bb17541 Mon Sep 17 00:00:00 2001 From: jl1331 Date: Fri, 12 Dec 2025 12:53:16 -0500 Subject: [PATCH 2/2] rm submodules --- central-wfs | 1 - central-wfs/README.md | 106 +++++ central-wfs/config/environment.yml | 39 ++ central-wfs/pyproject.toml | 60 +++ central-wfs/src/__init__.py | 0 central-wfs/src/__main__.py | 155 +++++++ central-wfs/src/base.py | 151 +++++++ central-wfs/src/formats/crf.py | 0 central-wfs/src/formats/csv.py | 0 central-wfs/src/formats/dicom.py | 0 central-wfs/src/formats/feather.py | 0 central-wfs/src/formats/hdf5.py | 0 central-wfs/src/formats/map.py | 36 ++ central-wfs/src/formats/parquet.py | 0 central-wfs/src/formats/wfdb.py | 0 central-wfs/src/pipeline.py | 678 ++++++++++++++++++++++++++++ central-wfs/src/qc/chs_norm.py | 3 + central-wfs/src/qc/data.py | 357 +++++++++++++++ central-wfs/src/qc/gaps.py | 64 +++ central-wfs/src/qc/ioperf.py | 177 ++++++++ central-wfs/src/registry.py | 39 ++ central-wfs/src/sites/columbia.py | 0 central-wfs/src/sites/duke.py | 0 central-wfs/src/sites/emory.py | 0 central-wfs/src/sites/mayo.py | 0 central-wfs/src/sites/mgh.py | 0 central-wfs/src/sites/mit.py | 0 central-wfs/src/sites/nationwide.py | 0 central-wfs/src/sites/pitt.py | 0 central-wfs/src/sites/seattle.py | 0 central-wfs/src/sites/tufts.py | 0 central-wfs/src/sites/ucla.py | 0 central-wfs/src/sites/ucsf.py | 0 central-wfs/src/sites/uf.py | 0 central-wfs/src/sites/uva.py | 0 central-wfs/src/utils.py | 104 +++++ 36 files changed, 1969 insertions(+), 1 deletion(-) delete mode 160000 central-wfs create mode 100644 central-wfs/README.md create mode 100644 central-wfs/config/environment.yml create mode 100644 central-wfs/pyproject.toml create mode 100644 central-wfs/src/__init__.py create mode 100644 central-wfs/src/__main__.py create mode 100644 central-wfs/src/base.py create mode 100644 central-wfs/src/formats/crf.py create mode 100644 central-wfs/src/formats/csv.py create mode 100644 central-wfs/src/formats/dicom.py create mode 100644 central-wfs/src/formats/feather.py create mode 100644 central-wfs/src/formats/hdf5.py create mode 100644 central-wfs/src/formats/map.py create mode 100644 central-wfs/src/formats/parquet.py create mode 100644 central-wfs/src/formats/wfdb.py create mode 100644 central-wfs/src/pipeline.py create mode 100644 central-wfs/src/qc/chs_norm.py create mode 100644 central-wfs/src/qc/data.py create mode 100644 central-wfs/src/qc/gaps.py create mode 100644 central-wfs/src/qc/ioperf.py create mode 100644 central-wfs/src/registry.py create mode 100644 central-wfs/src/sites/columbia.py create mode 100644 central-wfs/src/sites/duke.py create mode 100644 central-wfs/src/sites/emory.py create mode 100644 central-wfs/src/sites/mayo.py create mode 100644 central-wfs/src/sites/mgh.py create mode 100644 central-wfs/src/sites/mit.py create mode 100644 central-wfs/src/sites/nationwide.py create mode 100644 central-wfs/src/sites/pitt.py create mode 100644 central-wfs/src/sites/seattle.py create mode 100644 central-wfs/src/sites/tufts.py create mode 100644 central-wfs/src/sites/ucla.py create mode 100644 central-wfs/src/sites/ucsf.py create mode 100644 central-wfs/src/sites/uf.py create mode 100644 central-wfs/src/sites/uva.py create mode 100644 central-wfs/src/utils.py diff --git a/central-wfs b/central-wfs deleted file mode 160000 index fdb66f9..0000000 --- a/central-wfs +++ /dev/null @@ -1 +0,0 @@ -Subproject commit fdb66f9992fd6047d0a774ec84e4b9bd26917c32 diff --git a/central-wfs/README.md b/central-wfs/README.md new file mode 100644 index 0000000..9189d12 --- /dev/null +++ b/central-wfs/README.md @@ -0,0 +1,106 @@ +### Intro +Project architecture to centrally ingest CHoRUS site waveforms; identify and correct gaps; and convert to WFDB format. + +Layout mimics extensive work done [here](https://github.com/chorus-ai/chorus_waveform/tree/main) by some wonderful people. + +Below diagrams how waveform data flows with this framework, starting at site upload. + +```mermaid +--- +title: Ingestion-Conversion pipeline +--- + +flowchart TD + + A@{ shape: stadium, label: "__main__: CLI call options, summary record(s) table" } + + Z@{ shape: stadium, label: "assemble_waveforms: inventory waveform file paths" } + + B@{ shape: stadium, label: "run_benchmarks(waveform_suite_table)" } + + Y@{ shape: stadium, label: "load_signals: site/format-dependent, may be WFDB already" } + + C@{ shape: stadium, label: "waveform_summary: per channel, per chunk" } + + W@{shape: delay, label: "process_gaps: identify and convert" } + + D@{ shape: stadium, label: "write_waveforms: site/format-dependent, to WFDB" } + + E@{ shape: stadium, label: "fidelity checks: data quality, read/write operations" } + + X@{ shape: rounded, label: "site uploads data" } + + F@{ shape: database, label: "save_output_to_log: store converted metadata and data" } + + G@{ shape: rounded, label: "PerformanceCounter: memory profiler" } + + H@{ shape: lin-rect, label: "formats/ and sites/: import specific functions and methods" } + + I@{ shape: lin-rect, label: "registry: define and locate site configurations"} + + %%LINKS + A --> Z --> B --> operations + + subgraph operations + direction TB + subgraph read/write + direction TB + Y --> C --> D + end + G --> read/write + read/write --> E + I --> H --> read/write + end + X XZ@--> Z + operations oF@--> F + W -.-> read/write + + %%STYLING + oF@{ animation: fast} + XZ@{ animation: slow} +``` + +### Repo layout +``` +central_wfs/ +├── config/ # setup +│ ├── .env +│ └── environment.yml # conda/mamba install +├── src/ # Source code +│ ├── qc/ +│ │ ├── chs_norm.py # check channel mappings +│ │ ├── data.py # fidelity check on data quality +│ │ ├── gaps.py # identify and correct gaps +│ │ └── ioperf.py # monitor memory & performance +│ ├── formats/ +│ │ ├── map.py # dictionary to standardize site file extensions +│ │ ├── csv.py +│ │ ├── parquet.py +│ │ ├── wfdb.py +│ │ ├── format_n.py +│ ├── sites/ +│ │ ├── uf.py +│ │ ├── columbia.py +│ │ ├── mgh.py +│ │ ├── mit.py +│ │ └── site_n.py +│ ├── __main__.py # entry point +│ ├── __init__.py +│ ├── base.py # define site/format-specific class requirements +│ ├── registry.py # define site configs +│ └── utils.py +├── data/ +│ └── suites/ # file path tables +├── tests/ # unit/integration tests +├── docs/ # documentation +├── .gitignore +├── pyproject.toml # pip install +├── requirements.txt # python dependencies +└── README.md # project overview +``` + +#### Design notes +1. Making gap tooling central versus site-specific (currently central). +2. Abstracting site AND format class methods in preparation for potential heterogeneity of site waveform uploads. +3. Additional modifications were denoted in the neighborhood with '###' +4. Site-specific classes inherit from format-specific classes, and include a method for assembling the waveform suite table. \ No newline at end of file diff --git a/central-wfs/config/environment.yml b/central-wfs/config/environment.yml new file mode 100644 index 0000000..7532696 --- /dev/null +++ b/central-wfs/config/environment.yml @@ -0,0 +1,39 @@ +# setup: +# 1. mamba env create -f environment.yml +# 2. mamba activate central_wfs + +name: central_wfs +channels: + - conda-forge + - defaults + +dependencies: + - python=3.10 + + # SCIENTIFIC COMPUTING + - numpy + - scipy + - pandas + - polars + - pyarrow + + # DATA FORMATS + - h5py + - mne + - pyedflib + - pyyaml + - pydicom + + # SUPPORT TOOLING + - click + - tqdm + - rich + - memory_profiler + + - pip + + # SAFER WITH PIP + - pip: + - wfdb + - pydantic>=2 + - -e . \ No newline at end of file diff --git a/central-wfs/pyproject.toml b/central-wfs/pyproject.toml new file mode 100644 index 0000000..c9d790b --- /dev/null +++ b/central-wfs/pyproject.toml @@ -0,0 +1,60 @@ +# setup via: +# 1. pip install -e . + + +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "central_wfs" +version = "0.1.0" +description = "Central ingestion, gap correction, and WFDB conversion pipeline for heterogeneous medical waveform data." +authors = [ + { name = "u", email = "u@site.edu" } +] +readme = "README.md" +requires-python = ">=3.10" + +dependencies = [ + + # SCIENTIFIC COMPUTING + "numpy", + "scipy", + "pandas", + "polars", + + # DATA FORMATS + "pyyaml", # config: YAML + "h5py", + "wfdb", + "pyarrow", + "pydicom", + + "pyedflib", # EDF, BDF, EDF+ + "mne", + + # CONFIG & VALIDATION + "pydantic>=2", + + # CLI / UTILITY / LOGGING + "click", + "tqdm", + "rich", + "memory_profiler", +] + +# OPTIONAL DEPENDENCIES BY DOMAIN +[project.optional-dependencies] +testing = ["pytest", "pytest-cov"] +dev = ["black", "ruff", "mypy"] +plotting = ["matplotlib"] +excel = ["openpyxl"] + +# CONSOLE ENTRYPOINT(S) FOR CLI +[project.scripts] +ingest = "src.__main__:main" + +# SETUP TOOLS +[tool.setuptools.packages.find] +where = ["src"] \ No newline at end of file diff --git a/central-wfs/src/__init__.py b/central-wfs/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/central-wfs/src/__main__.py b/central-wfs/src/__main__.py new file mode 100644 index 0000000..2af90c1 --- /dev/null +++ b/central-wfs/src/__main__.py @@ -0,0 +1,155 @@ +# taken directly from this repo: +# https://github.com/chorus-ai/chorus_waveform/blob/main/waveform_benchmark/__main__.py + +import argparse +import csv +import os +import sys + +import pandas as pd +from tqdm import tqdm + +from src.pipeline import run_benchmarks + +def read_csv(file_path): + """ + Read in a csv file with a header + """ + data = [] + with open(file_path, 'r', newline='') as csvfile: + csv_reader = csv.reader(csvfile) + next(csv_reader) + for row in csv_reader: + data.append(row) + + return data + + +def init_summary_file(summary_file): + """ + Initialize the summary lists - either grab existing data from a file to be appended to or create empty lists for all + new data + """ + if os.path.exists(summary_file): + df_summary = pd.read_csv(summary_file) + try: + format_list = list(df_summary['format']) + waveform_list = list(df_summary['waveform']) + test_list = list(df_summary['test']) + result_list = list(df_summary['result']) + except: + print("Required columns not found in the waveform suite summary file") + sys.exit(1) + else: + format_list = [] + waveform_list = [] + test_list = [] + result_list = [] + + return format_list, waveform_list, test_list, result_list + + +def save_summary(format_list, waveform_list, test_list, result_list, summary_file): + """ + Save a summary of the results to a CSV file + """ + df_updated_summary = pd.DataFrame(zip(format_list, waveform_list, test_list, result_list), + columns=['format', 'waveform', 'test', 'result']) + + # Add columns for the last identifier for format and waveform + df_updated_summary['format_id'] = df_updated_summary['format'].str.split('.').str[-1] + df_updated_summary['waveform_id'] = df_updated_summary['waveform'].str.split('/').str[-1] + + # Reorder the columns + df_updated_summary = df_updated_summary[['test', 'waveform', 'waveform_id', 'format', 'format_id', 'result']] + + df_updated_summary.to_csv(summary_file, index=False) + + +def main(): + ap = argparse.ArgumentParser() + + ### + ap.add_argument('--site_name', '-r', + default=False, + help='The site name to configure ingestion, correction, and conversion to.') + + ap.add_argument('--input_record', '-r', + help='The record name to run benchmarking against') + ap.add_argument('--format_class', '-f', + help='The format to save the waveform in') + ap.add_argument('--physionet_directory', '-p', + default=None, + help='The physionet database directory to read the source waveform from') + ap.add_argument('--save_output_to_log', '-l', + default=False, + help='Save all of the benchmarking results to a log file') + ap.add_argument('--waveform_suite_table', '-s', + default=None, + help='A csv table with input_record, physionet directory, and format class for multiple files') + ap.add_argument('--waveform_suite_summary_file', '-w', + default='waveform_suite_benchmark_summary.csv', + help='Save a CSV summary of the waveform suite run to this path/file') + ap.add_argument('--test_only', + default=False, action='store_true', + help='Run only the tests, do not run the benchmarks') + ap.add_argument('--memory_profiling', '-m', + default=False, type=bool, action=argparse.BooleanOptionalAction, + help='Run memory profiling on the benchmarking process') + opts = ap.parse_args() + + ### + if opts.site_name is None: + ap.error('--site_name specified was invalid') + + # If log is requested send the output there + if opts.save_output_to_log: + log_file = open('benchmark_results.log', 'a') + + # Send the output to the log file + sys.stdout = log_file + + # Check conditions based on the parsed arguments + if not opts.waveform_suite_table: + # If a waveform suite table is not provided, input_record and format_class must be provided + if opts.input_record is None or opts.format_class is None: + ap.error('--input_record and --format_class are required unless --waveform_suite_table is specified') + + # If a table with multiple files is passed we loop through it and save a summary of the results + if opts.waveform_suite_table: + waveform_suite = read_csv(opts.waveform_suite_table) + + format_list, waveform_list, test_list, result_list = init_summary_file(opts.waveform_suite_summary_file) + + for waveform_file in tqdm(waveform_suite): + # Extract metadata from looped file and launch benchmarking + record = waveform_file[0] + format = waveform_file[1] + pn_dir = waveform_file[2] + format_list, waveform_list, test_list, result_list = run_benchmarks(input_record=record, + format_class=format, pn_dir=pn_dir, + format_list=format_list, + waveform_list=waveform_list, + test_list=test_list, + result_list=result_list, + test_only = opts.test_only, + mem_profile = opts.memory_profiling, + site_name = opts.site_name) + + save_summary(format_list, waveform_list, test_list, result_list, opts.waveform_suite_summary_file) + + # Run benchmarking against a single file + else: + run_benchmarks(input_record=opts.input_record, + format_class=opts.format_class, + pn_dir=opts.physionet_directory, + test_only = opts.test_only, + mem_profile = opts.memory_profiling) + + # Close the log file after the run is complete + if opts.save_output_to_log: + log_file.close() + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/central-wfs/src/base.py b/central-wfs/src/base.py new file mode 100644 index 0000000..ed37ce7 --- /dev/null +++ b/central-wfs/src/base.py @@ -0,0 +1,151 @@ +import abc +from typing import Dict, Tuple, Any, List, Array, Literal +from src.qc.gaps import ( + detect_gaps_stream, detect_gaps_pair, + correct_gaps_stream, correct_gaps_pair +) + +class BaseSite(abc.ABC): + + @abc.abstractmethod + def assemble_waveforms(self, path: str): + """ + Search directory for waveform file paths. + For each file path, extract columns: + + 'PATIENT_ID': str + 'SESSION_ID': str + 'PROCESS': bool + 'FILE_NAME': str + 'FORMAT': + + :param self: object instance + :param path: directory + :type path: str + """ + raise NotImplementedError + + @abc.abstractmethod + def write_waveforms(self, path: str, waveforms: dict): + raise NotImplementedError + + @abc.abstractmethod + def read_waveforms(self, path: str, start_time: float, end_time: float, + signal_names: list): + """ + Call on BaseFormat.read(). + Format to standard output for other operations: + + 1. quality checks + 2. gap correction + 3. WFDB conversion + + Reference site-specific name and structure irregularities. + """ + raise NotImplementedError + + @abc.abstractmethod + def get_metadata(self, path: str): + raise NotImplementedError + + @abc.abstractmethod + def gaps(self, data: Array, attrs: Dict): + """ + Prepare data and attributes for shared tool + to identify and correct gaps. + """ + # detect_gaps_pair() + # detect_gaps_stream() + # correct_gaps_pair() + # correct_gaps_stream() + raise NotImplementedError + +class BaseFormat(abc.ABC): + """ + Abstract class for extracting data and attributes from specific file format. + + For each data format that we want to benchmark, a class must be + defined that inherits from this class and defines the following + methods: + + - can_read: flag for system-readable + - read: prepare file attributes for site-specific read + - write: prepare file attributes for site-specific write + """ + + @abc.abstractmethod + def can_read(self, path: str): + """Return True if this format can read the given file.""" + raise NotImplementedError + + @abc.abstractmethod + def read(self, path: str, start_time: float = None, end_time: float = None, + signals: List[str] = None): + """ + Prepare waveform attributes from file for site-native read commands: + + 1. BaseSite.read_waveforms() + 2. BaseSite.get_metadata() + + Returns: + waveforms: dict mapping signal names to numpy arrays + metadata: dict of file metadata (sampling rate, channels, units) + """ + raise NotImplementedError + + @abc.abstractmethod + def write(self, path: str, waveforms: Dict[str, Any], metadata: Dict[str, Any]): + """ + Prepare waveform attributes from file for site-native write commands: + + 1. BaseSite.write_waveforms() + """ + raise NotImplementedError + + +# taken directly from this repo: +# https://github.com/chorus-ai/chorus_waveform/blob/main/waveform_benchmark/formats/base.py + +class BaseFormat(abc.ABC): + """ + Abstract class for data formats to be tested. + + For each data format that we want to benchmark, a class must be + defined that inherits from this class and defines the following + methods: + + - write_waveforms: take a collection of waveform data, and save + the data to one or more files on disk + + - read_waveforms: load waveforms from one or more files, and + return the requested subset of the data + """ + + @abc.abstractmethod + def write_waveforms(self, path: str, waveforms: dict): + raise NotImplementedError + + @abc.abstractmethod + def read_waveforms(self, path: str, start_time: float, end_time: float, + signal_names: list): + raise NotImplementedError + + # kwargs is a dictionary that can be used to pass additional arguments to the format + # replaces the total_length, block_length, and block_size params which are either test specific or intrinsic to the format. + def open_waveforms(self, path: str, signal_names:list, **kwargs): + raise NotImplementedError + + def read_opened_waveforms(self, opened_files: dict, start_time: float, end_time: float, + signal_names: list): + raise NotImplementedError + + def close_waveforms(self, opened_files: dict): + raise NotImplementedError + + + def open_read_close_waveforms(self, path, start_time, end_time, signal_names, **kwargs): + opened_files = self.open_waveforms(path, signal_names, **kwargs) + output = self.read_opened_waveforms(opened_files, start_time, end_time, signal_names) + self.close_waveforms(opened_files) + + return output \ No newline at end of file diff --git a/central-wfs/src/formats/crf.py b/central-wfs/src/formats/crf.py new file mode 100644 index 0000000..e69de29 diff --git a/central-wfs/src/formats/csv.py b/central-wfs/src/formats/csv.py new file mode 100644 index 0000000..e69de29 diff --git a/central-wfs/src/formats/dicom.py b/central-wfs/src/formats/dicom.py new file mode 100644 index 0000000..e69de29 diff --git a/central-wfs/src/formats/feather.py b/central-wfs/src/formats/feather.py new file mode 100644 index 0000000..e69de29 diff --git a/central-wfs/src/formats/hdf5.py b/central-wfs/src/formats/hdf5.py new file mode 100644 index 0000000..e69de29 diff --git a/central-wfs/src/formats/map.py b/central-wfs/src/formats/map.py new file mode 100644 index 0000000..a1bdc92 --- /dev/null +++ b/central-wfs/src/formats/map.py @@ -0,0 +1,36 @@ +# map of possible file format variations to their shared file extension that +# their respective base script will be called + +FILE_FORMAT_MAP = { + # CRF + "crf": "crf", + + # PARQUET + "parquet": "parquet", + "parq": "parquet", + "pq": "parquet", + + # CSV + "csv": "csv", + "tsv": "csv", # if normalized + + # DICOM + "dcm": ".dicom", + "dicom": ".dicom", + + # FEATHER + "feather": "feather", + "ft": "feather", + + # HDF5 + "hdf5": "hdf5", + "hd5": "hdf5", + "h5": "hdf5", + + # WFDB + "hea": "wfdb", # header file + "dat": "wfdb", # signal file + + # sometimes, extension ends in .gz + +} \ No newline at end of file diff --git a/central-wfs/src/formats/parquet.py b/central-wfs/src/formats/parquet.py new file mode 100644 index 0000000..e69de29 diff --git a/central-wfs/src/formats/wfdb.py b/central-wfs/src/formats/wfdb.py new file mode 100644 index 0000000..e69de29 diff --git a/central-wfs/src/pipeline.py b/central-wfs/src/pipeline.py new file mode 100644 index 0000000..a49c5f8 --- /dev/null +++ b/central-wfs/src/pipeline.py @@ -0,0 +1,678 @@ +# taken directly from this repo: +# https://github.com/chorus-ai/chorus_waveform/blob/main/waveform_benchmark/benchmark.py + + +#!/usr/bin/python3 + +import importlib +import os +import random +import sys +import tempfile + +import numpy as np + +from src.utils import load_wfdb_signals +from src.qc.ioperf import PerformanceCounter +from src.utils import repeat_test +from src.utils import median_attr +from src.utils import convert_time_value_pairs_to_nan_array +from src.registry import Registry + +from memory_profiler import memory_usage +import time + + +def append_result(format_name, waveform_name, test_name, result, format_list, waveform_list, test_list, result_list): + """ + Add a result to the summary lists + """ + format_list.append(format_name) + waveform_list.append(waveform_name) + test_list.append(test_name) + result_list.append(result) + + return format_list, waveform_list, test_list, result_list + + +def __read(fmt, path, total_length, all_channels, block_length, block_count, rng, test1=False): + reader = fmt() + for j in range(block_count): + t0 = rng.random() * (total_length - block_length) + t1 = t0 + block_length + if test1: + c = rng.choice(all_channels) + reader.read_waveforms(path, t0, t1, [c]) + else: + reader.read_waveforms(path, t0, t1, all_channels) + + # reads random channel and random block, opening file each time. + + +def _run_read_test(fmt, path, total_length, all_channels, block_length, block_count, + test_min_dur=10, test_min_iter=3, test1=False, mem_profile=False): + r = random.Random(12345) + counters = [] + mem_usages = [] + for i in repeat_test(test_min_dur, test_min_iter): + with PerformanceCounter(mem_profile=mem_profile) as pc: + if (mem_profile): + mem_usage = memory_usage( + (__read, (fmt, path, total_length, all_channels, block_length, block_count, r), {"test1": test1}), + include_children=True, max_usage=True) + mem_usages.append(mem_usage) + else: + __read(fmt, path, total_length, all_channels, block_length, block_count, r, test1=test1) + counters.append(pc) + return counters, mem_usages + + +# kwargs parameters are for any additional parameters that might be useful for optimization. +def __open(fmt, path, channels, **kwargs): + reader = fmt() + opened = reader.open_waveforms(path, channels, **kwargs) + + return (reader, opened) + + +def __close(reader, opened): + reader.close_waveforms(opened) + + +def __read_rand_channel(reader, opened, total_length, channels, block_length, block_count, rng, + rand_channel: bool = False): + # TODO test: + # reader = fmt() + for j in range(block_count): + t0 = rng.random() * (total_length - block_length) + t1 = t0 + block_length + if rand_channel: + c = rng.choice(channels) + reader.read_opened_waveforms(opened, t0, t1, [c]) + else: + reader.read_opened_waveforms(opened, t0, t1, channels) + + +# reads random channel and random block, opening file(s) once. +def _run_read_test_opened_rand_channel(fmt, path, total_length, all_channels, block_length, block_count, + test_min_dur=10, test_min_iter=3, + rand_channel: bool = True, test1ch: bool = False, mem_profile: bool = False): + r = random.Random(12345) + open_counters = [] + read_counters = [] + close_counters = [] + open_mem_usages = [] + read_mem_usages = [] + close_mem_usages = [] + + if (not rand_channel) and test1ch: + channels = [r.choice(all_channels)] + else: + channels = all_channels + rand_channel = test1ch and rand_channel + + for i in repeat_test(test_min_dur, test_min_iter): + # open + with PerformanceCounter(mem_profile=mem_profile) as pc: + if (mem_profile): + mem_usage, (reader, opened) = memory_usage( + (__open, (fmt, path, channels), + {'total_length': total_length, 'block_length': block_length, 'block_count': block_count}), + include_children=True, max_usage=True, retval=True) + open_mem_usages.append(mem_usage) + else: + (reader, opened) = __open(fmt, path, channels, total_length=total_length, + block_length=block_length, block_count=block_count) + open_counters.append(pc) + + # read + with PerformanceCounter(mem_profile=mem_profile) as pc: + if (mem_profile): + mem_usage = memory_usage((__read_rand_channel, + (reader, opened, total_length, channels, block_length, block_count, r), + {"rand_channel": rand_channel}), include_children=True, max_usage=True) + read_mem_usages.append(mem_usage) + else: + __read_rand_channel(reader, opened, total_length, channels, block_length, block_count, r, + rand_channel=rand_channel) + read_counters.append(pc) + + # close + with PerformanceCounter(mem_profile=mem_profile) as pc: + if (mem_profile): + mem_usage = memory_usage((__close, (reader, opened), {}), include_children=True, max_usage=True) + close_mem_usages.append(mem_usage) + else: + __close(reader, opened) + close_counters.append(pc) + return open_counters, open_mem_usages, read_counters, read_mem_usages, close_counters, close_mem_usages + + +def compute_snr(reference_signal, output_signal): + """ + Compute the signal-to-noise ratio (SNR) for the signal in decibels. + """ + + # Convert the data to NumPy arrays as needed. + reference_signal = np.asarray(reference_signal) + output_signal = np.asarray(output_signal) + + # Check that the signals have the same dimensions. + assert (np.array_equal(np.shape(reference_signal), np.shape(output_signal))) + + # Compute the noise in the signal. + noise_signal = output_signal - reference_signal + + # Compute the SNR with special handling for edge cases. + x = np.sum(reference_signal ** 2) + y = np.sum(noise_signal ** 2) + + if x > 0 and y > 0: + snr = 10 * np.log10(x / y) + elif y == 0: + snr = float('inf') + else: + snr = float('nan') + + return snr + + +def print_header(mem_profile: bool = False, with_ops: bool = False): + ops = "OP " if with_ops else "" + if (mem_profile): + print('%s #seek #read KiB CPU(s) Wall(s) Mem(MB)(used/maxrss/malloced) [N]' % ops) + else: + print('%s #seek #read KiB CPU(s) Wall(s) [N]' % ops) + + +def print_results(op_str: str, channels: str, counters, mem_usages, block_count, block_length, + mem_profile: bool = False): + if (mem_profile): + print('%s%6.0f %6.0f %8.0f %8.4f %8.4f %8.4f/%8.4f/%8.4f %6s open %d x %.0fs, %s' + % (op_str, + median_attr(counters, 'n_seek_calls'), + median_attr(counters, 'n_read_calls'), + median_attr(counters, 'n_bytes_read') / 1024, + median_attr(counters, 'cpu_seconds'), + median_attr(counters, 'walltime'), + # walltime / len(counters), + np.median(mem_usages), + median_attr(counters, 'max_rss'), + median_attr(counters, 'malloced'), + '[%d]' % len(counters), + block_count, + block_length, + channels)) + else: + print('%s%6.0f %6.0f %8.0f %8.4f %8.4f %6s open %d x %.0fs, %s' + % (op_str, + median_attr(counters, 'n_seek_calls'), + median_attr(counters, 'n_read_calls'), + median_attr(counters, 'n_bytes_read') / 1024, + median_attr(counters, 'cpu_seconds'), + median_attr(counters, 'walltime'), + '[%d]' % len(counters), + block_count, + block_length, + channels)) + + +def run_benchmarks(input_record, format_class, pn_dir=None, format_list=None, waveform_list=None, test_list=None, + result_list=None, mem_profile=False, test_only=False, site_name=None): + + + ### + module_name, class_name = format_class.rsplit('.', 1) + + site_config = Registry.get_site_config(site_name) + fmt_name = site_config.file_formats + fmt_module = importlib.import_module(fmt_name) + fmt = getattr(fmt_module, class_name) + + site_name = site_config.site + site_module = importlib.import_module(site_name) + site = getattr(site_module, class_name) + input_record = input_record.removesuffix('.hea') + waveforms = site.read_waveforms(input_record) + # ... + + + + # Load the class we will be testing + module_name, class_name = format_class.rsplit('.', 1) + module = importlib.import_module(module_name) + fmt = getattr(module, class_name) + ### site = getattr(module, class_name) + + # Load the example data + input_record = input_record.removesuffix('.hea') + if pn_dir: + waveforms = load_wfdb_signals(input_record, pn_dir) + ### waveforms = fmt.read_waveforms() + else: + waveforms = load_wfdb_signals(input_record) + ### waveforms = fmt.read_waveforms() + all_channels = list(waveforms.keys()) + + total_length = 0 + timepoints_per_second = 0 + actual_samples = 0 + waveform_characterizations = {} + for i, (name, waveform) in enumerate(waveforms.items()): + precise_channel_length = 0 + channel_length = waveform['chunks'][-1]['end_time'] + total_length = max(total_length, channel_length) + timepoints_per_second += waveform['samples_per_second'] + actual_samples += sum(len(chunk['samples']) + for chunk in waveform['chunks']) + + # Collect summary information about each channel in the waveform + inv_gain = 1 / waveform['chunks'][-1]['gain'] + res_rounded = f'{float(f"{inv_gain:.3g}"):g}' + resolution = f"{res_rounded}({waveforms[name]['units']})" + precise_channel_length += sum(chunk['end_time'] - chunk['start_time'] + for chunk in waveform['chunks']) + waveform_characterizations[name] = { + 'fs': waveform['samples_per_second'], + 'bit_resolution': resolution, + 'channel_length': precise_channel_length + } + + total_timepoints = total_length * timepoints_per_second + if pn_dir: + record_name = os.path.join(pn_dir, input_record) + else: + record_name = input_record + + TEST_BLOCK_LENGTHS = [ + [total_length, 1], + [500, 5], # 5 random blocks of 500 seconds + [50, 50], # 50 random blocks of 50 seconds + [5, 500], # 500 random blocks of 5 seconds + ] + + TEST_MIN_DURATION = 10 + TEST_MIN_ITERATIONS = 3 + + print('_' * 64) + print('Format: %s' % format_class) + if fmt.__doc__: + print(' (%s)' + % fmt.__doc__.strip().splitlines()[0].rstrip('.')) + + print('Record: %s' % record_name) + print(' %.0f seconds x %d channels' + % (total_length, len(all_channels))) + print(' %d timepoints, %d samples (%.1f%%)' + % (total_timepoints, actual_samples, + 100 * actual_samples / total_timepoints)) + print('_' * 64) + + # Print summary information for each waveform channel + print('Channel summary information:') + print(f" {'signal':<12} {'fs(Hz)':<10} {'Bit resolution':<20} {'Channel length(s)':<20}") + for (signal, values) in waveform_characterizations.items(): + fs_value = f"{values['fs']:.2f}" + bit_resolution_value = values['bit_resolution'] + channel_length_value = f"{values['channel_length']:.0f}" + print(f" {signal:<12} {fs_value:<10} {bit_resolution_value:<20} {channel_length_value:<20}") + print('_' * 64) + + with tempfile.TemporaryDirectory(prefix='wavetest-', dir='.') as tempdir: + path = os.path.join(tempdir, 'wavetest') + + # Write the example data to a file or files. + # time1 = time.time() + with PerformanceCounter(mem_profile=mem_profile) as pc_write: + if mem_profile: + mem_usage = memory_usage((fmt().write_waveforms, (path, waveforms), {}), + include_children=True, max_usage=True) + else: + fmt().write_waveforms(path, waveforms) + # wall_time = time.time() - time1 + + # Calculate total size of the file(s). + output_size = 0 + for subdir, dirs, files in os.walk(tempdir): + for file in files: + output_size += os.path.getsize(os.path.join(subdir, file)) + + print('Output size: %.0f KiB (%.2f bits/sample)' + % (output_size / 1024, output_size * 8 / actual_samples)) + print('CPU time: %.4f sec' % pc_write.cpu_seconds) + # print('Wall Time: %.0f s' % wall_time) + print('Wall Time: %.4f s' % pc_write.walltime) + + if (mem_profile): + print('Memory Used (memory_profiler): %.0f MiB' % mem_usage) + print('Maximum Memory Used (max_rss): %.0f MiB' % pc_write.max_rss) + print('Memory Malloced (tracemalloc): %.0f MiB' % pc_write.malloced) + + print('_' * 64) + + if format_list is not None: + # Append output size and write time + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + 'output_size', + (output_size / 1024), format_list, + waveform_list, test_list, result_list) + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + 'output_time', + pc_write.cpu_seconds, format_list, + waveform_list, test_list, result_list) + + # Fidelity Check + # Loop over each waveform + print("Fidelity check via read_waveforms:") + print() + print("Chunk\t\t\tNumeric Samples\t\t\t\t NaN Samples") + print(f"\t# Errors / Total\t{'% Eq':^8}\t{'SNR':^8}\tNaN Values Match") + + for channel, waveform in waveforms.items(): + print(f"Signal: {channel}") + # Loop over chunks + # print("Chunk\t\t Numeric Samples\t\t NaN Samples") + # print(f"\t# Errors / Total\t{'% Eq':^8}\tNaN Values Match") + + for i_ch, chunk in enumerate(waveform["chunks"]): + st = chunk["start_time"] + et = chunk["end_time"] + data = chunk["samples"] + + # read chunk from file + filedata = fmt().read_waveforms(path, st, et, [channel]) + filedata = filedata[channel] + + # Check if the filedata is in time-value pair format + if isinstance(filedata, tuple): + filedata = convert_time_value_pairs_to_nan_array(filedata, waveform, st, et) + + # compare values + + # check arrays are same size + if data.shape != filedata.shape: + print(f"{i_ch:^5}\t --- Different shapes (input: {data.shape}, file: {filedata.shape}) ---") + continue + + # check for nans in correct location + NANdiff = np.sum(np.isnan(data) != np.isnan(filedata)) + numnan = np.sum(np.isnan(data)) + numnanstr = f"{'N' if NANdiff else 'Y'} ({numnan})" + + # remove nans for equality check + data_nonan = data[~np.isnan(data)] + filedata_nonan = filedata[~np.isnan(data)] + + # use numpy's isclose to determine floating point equality + isgood = np.isclose(filedata_nonan, data_nonan, atol=0.5 / chunk['gain']) + numgood = np.sum(isgood) + fpeq_rel = numgood / len(data_nonan) if len(data_nonan) != 0 else np.nan + + # compute SNR to quantify signal fidelity + snr = compute_snr(data_nonan, filedata_nonan) + + # print to table + print( + f"{i_ch:^5}\t{len(data_nonan) - numgood:10}/{len(data_nonan):10}\t{fpeq_rel * 100:^6.3f}\t\t{snr:^6.1f}\t\t{numnanstr:^16}") + + # print up to 10 bad values if not all equal + if numgood != len(data_nonan): + print("Subset of unuequal numeric data from input:") + print(data_nonan[~isgood][:10]) + print("Subset of unuequal numeric data from formatted file:") + print(filedata_nonan[~isgood][:10]) + print(f"(Gain: {chunk['gain']})") + # print('_' * 64) + + # Fidelity Check 2 + # Loop over each waveform + + try: + print('_' * 64) + print("Fidelity check via open/read/close:") + print() + print("Chunk\t\t\tNumeric Samples\t\t\t\t NaN Samples") + print(f"\t# Errors / Total\t{'% Eq':^8}\t{'SNR':^8}\tNaN Values Match") + + for channel,waveform in waveforms.items(): + print(f"Signal: {channel}") + # Loop over chunks + # print("Chunk\t\t Numeric Samples\t\t NaN Samples") + # print(f"\t# Errors / Total\t{'% Eq':^8}\tNaN Values Match") + + for i_ch, chunk in enumerate(waveform["chunks"]): + st = chunk["start_time"] + et = chunk["end_time"] + data = chunk["samples"] + + # read chunk from file + filedata = fmt().open_read_close_waveforms(path, st, et, [channel]) + filedata = filedata[channel] + + # compare values + + # Check if the filedata is in time-value pair format + if isinstance(filedata, tuple): + filedata = convert_time_value_pairs_to_nan_array(filedata, waveform, st, et) + + # check arrays are same size + if data.shape != filedata.shape: + print(f"{i_ch:^5}\t --- Different shapes (input: {data.shape}, file: {filedata.shape}) ---") + continue + + # check for nans in correct location + NANdiff = np.sum(np.isnan(data) != np.isnan(filedata)) + numnan = np.sum(np.isnan(data)) + numnanstr = f"{'N' if NANdiff else 'Y'} ({numnan})" + + # remove nans for equality check + data_nonan = data[~np.isnan(data)] + filedata_nonan = filedata[~np.isnan(data)] + + # use numpy's isclose to determine floating point equality + isgood = np.isclose(filedata_nonan, data_nonan, atol=0.5/chunk['gain']) + numgood = np.sum(isgood) + fpeq_rel = numgood/len(data_nonan) if len(data_nonan) != 0 else np.nan + + # compute SNR to quantify signal fidelity + snr = compute_snr(data_nonan, filedata_nonan) + + # print to table + print(f"{i_ch:^5}\t{len(data_nonan)-numgood:10}/{len(data_nonan):10}\t{fpeq_rel*100:^6.3f}\t\t{snr:^6.1f}\t\t{numnanstr:^16}") + + # print up to 10 bad values if not all equal + if numgood != len(data_nonan): + print("Subset of unuequal numeric data from input:") + print(data_nonan[~isgood][:10]) + print("Subset of unuequal numeric data from formatted file:") + print(filedata_nonan[~isgood][:10]) + print(f"(Gain: {chunk['gain']})") + # print('_' * 64) + except NotImplementedError: + print("Skipping Fidelity check open/read/close - open once, read many - this format has not implemented the required functions.") + + if not test_only: + try: + print('_' * 64) + print('Open Once Read Many performance (median of N trials):') + print_header(mem_profile=mem_profile, with_ops=True) + + for block_length, block_count in TEST_BLOCK_LENGTHS: + + # time1 = time.time() + open_counters, open_mem_usages, read_counters, read_mem_usages, close_counters, close_mem_usages = \ + _run_read_test_opened_rand_channel(fmt, path, total_length, all_channels, block_length, block_count, + test_min_dur = TEST_MIN_DURATION, test_min_iter = TEST_MIN_ITERATIONS, + rand_channel = True, test1ch = False, mem_profile = mem_profile) + # walltime = time.time() - time1 + + print_results("open ", "all channels", open_counters, open_mem_usages, + block_count, block_length, mem_profile = mem_profile) + + if format_list is not None: + # Append open time result + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + f'{block_count}_all_open', + median_attr(open_counters, 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) + + print_results("read ", "all channels", + read_counters, read_mem_usages, block_count, block_length, mem_profile = mem_profile) + + if format_list is not None: + # Append read time result + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + f'{block_count}_all_read', + median_attr(read_counters, 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) + + print_results("close ", "all channels", + close_counters, close_mem_usages, block_count, block_length, mem_profile = mem_profile) + + if format_list is not None: + # Append close time result + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + f'{block_count}_all_close', + median_attr(close_counters, 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) + for block_length, block_count in TEST_BLOCK_LENGTHS: + open_counters, open_mem_usages, read_counters, read_mem_usages, close_counters, close_mem_usages = \ + _run_read_test_opened_rand_channel(fmt, path, total_length, all_channels, block_length, block_count, + test_min_dur = TEST_MIN_DURATION, test_min_iter = TEST_MIN_ITERATIONS, + rand_channel = True, test1ch = True, mem_profile = mem_profile) + + print_results("open ", "rand channel", + open_counters, open_mem_usages, block_count, block_length, mem_profile = mem_profile) + + if format_list: + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + f'{block_count}_one_open_rand', + median_attr(open_counters, 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) + + print_results("read ", "rand channel", + read_counters, read_mem_usages, block_count, block_length, mem_profile = mem_profile) + if format_list: + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + f'{block_count}_one_read_rand', + median_attr(read_counters, 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) + + print_results("close ", "rand channel", close_counters, close_mem_usages, block_count, block_length, mem_profile = mem_profile) + if format_list: + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + f'{block_count}_one_close_rand', + median_attr(close_counters, 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) + + + for block_length, block_count in TEST_BLOCK_LENGTHS: + + # time1 = time.time() + open_counters, open_mem_usages, read_counters, read_mem_usages, close_counters, close_mem_usages = \ + _run_read_test_opened_rand_channel(fmt, path, total_length, all_channels, block_length, block_count, + test_min_dur = TEST_MIN_DURATION, test_min_iter = TEST_MIN_ITERATIONS, + rand_channel = False, test1ch = True, mem_profile = mem_profile) + # walltime = time.time() - time1 + + print_results("open ", "one channel", open_counters, open_mem_usages, block_count, block_length, mem_profile = mem_profile) + + if format_list is not None: + # Append open time result + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + f'{block_count}_one_open_fixed', + median_attr(open_counters, 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) + + print_results("read ", "one channel", read_counters, read_mem_usages, block_count, block_length, mem_profile = mem_profile) + + if format_list is not None: + # Append read time result + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + f'{block_count}_one_read_fixed', + median_attr(read_counters, 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) + + print_results("close ", "one channel", close_counters, close_mem_usages, block_count, block_length, mem_profile = mem_profile) + + if format_list is not None: + # Append close time result + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + f'{block_count}_one_close_fixed', + median_attr(close_counters, 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) + + except NotImplementedError: + print( + "Skipping Open Once Read Many performance test since this format has not implemented the required functions.") + + print('_' * 64) + print('Read performance (median of N trials):') + print_header(mem_profile=mem_profile, with_ops=False) + + for block_length, block_count in TEST_BLOCK_LENGTHS: + + # time1 = time.time() + counters, mem_usages = _run_read_test(fmt, path, total_length, all_channels, + block_length, block_count, + test_min_dur=TEST_MIN_DURATION, + test_min_iter=TEST_MIN_ITERATIONS, + test1=False, mem_profile=mem_profile) + # walltime = time.time() - time1 + + print_results("", "all channels", counters, mem_usages, block_count, block_length, + mem_profile=mem_profile) + + if format_list is not None: + # Append read time result + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + f'{block_count}_all', + median_attr(counters, + 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) + + for block_length, block_count in TEST_BLOCK_LENGTHS: + counters, mem_usages = _run_read_test(fmt, path, total_length, all_channels, + block_length, block_count, + test_min_dur=TEST_MIN_DURATION, + test_min_iter=TEST_MIN_ITERATIONS, + test1=True, mem_profile=mem_profile) + # walltime = time.time() - time1 + + print_results("", "one channel", counters, mem_usages, block_count, block_length, + mem_profile=mem_profile) + + if format_list: + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + f'{block_count}_one', + median_attr(counters, + 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) + + print('_' * 64) + + if format_list is not None: + # Return the lists with appended results for this waveform + return format_list, waveform_list, test_list, result_list \ No newline at end of file diff --git a/central-wfs/src/qc/chs_norm.py b/central-wfs/src/qc/chs_norm.py new file mode 100644 index 0000000..e38a155 --- /dev/null +++ b/central-wfs/src/qc/chs_norm.py @@ -0,0 +1,3 @@ +# Check and normalize channel mappings based on: +# https://github.com/chorus-ai/chorus_waveform_resources/wiki/Waveform-Conversion#telemetry-signals +# https://github.com/chorus-ai/chorus_waveform_resources/wiki/Waveform-Conversion#electroencephalogram-signals diff --git a/central-wfs/src/qc/data.py b/central-wfs/src/qc/data.py new file mode 100644 index 0000000..3ce33d7 --- /dev/null +++ b/central-wfs/src/qc/data.py @@ -0,0 +1,357 @@ +#!/usr/bin/python3 + +# taken directly from this repo: +# https://github.com/chorus-ai/chorus_waveform/blob/main/waveform_benchmark/benchmark.py + +import importlib +import os +import random +import sys +import tempfile + +import numpy as np + +from src.utils import load_wfdb_signals +from src.qc.ioperf import PerformanceCounter +from src.utils import ( + repeat_test, + median_attr, + convert_time_value_pairs_to_nan_array +) + +from memory_profiler import memory_usage +import time + +def append_result(format_name, waveform_name, test_name, result, format_list, waveform_list, test_list, result_list): + """ + Add a result to the summary lists + """ + format_list.append(format_name) + waveform_list.append(waveform_name) + test_list.append(test_name) + result_list.append(result) + + return format_list, waveform_list, test_list, result_list + + +def __read(fmt, path, total_length, all_channels, block_length, block_count, rng, test1=False): + reader = fmt() + for j in range(block_count): + t0 = rng.random() * (total_length - block_length) + t1 = t0 + block_length + if test1: + c = rng.choice(all_channels) + reader.read_waveforms(path, t0, t1, [c]) + else: + reader.read_waveforms(path, t0, t1, all_channels) + + # reads random channel and random block, opening file each time. + + +def _run_read_test(fmt, path, total_length, all_channels, block_length, block_count, + test_min_dur=10, test_min_iter=3, test1=False, mem_profile=False): + r = random.Random(12345) + counters = [] + mem_usages = [] + for i in repeat_test(test_min_dur, test_min_iter): + with PerformanceCounter(mem_profile=mem_profile) as pc: + if (mem_profile): + mem_usage = memory_usage( + (__read, (fmt, path, total_length, all_channels, block_length, block_count, r), {"test1": test1}), + include_children=True, max_usage=True) + mem_usages.append(mem_usage) + else: + __read(fmt, path, total_length, all_channels, block_length, block_count, r, test1=test1) + counters.append(pc) + return counters, mem_usages + + +# kwargs parameters are for any additional parameters that might be useful for optimization. +def __open(fmt, path, channels, **kwargs): + reader = fmt() + opened = reader.open_waveforms(path, channels, **kwargs) + + return (reader, opened) + + +def __close(reader, opened): + reader.close_waveforms(opened) + + +def __read_rand_channel(reader, opened, total_length, channels, block_length, block_count, rng, + rand_channel: bool = False): + # TODO test: + # reader = fmt() + for j in range(block_count): + t0 = rng.random() * (total_length - block_length) + t1 = t0 + block_length + if rand_channel: + c = rng.choice(channels) + reader.read_opened_waveforms(opened, t0, t1, [c]) + else: + reader.read_opened_waveforms(opened, t0, t1, channels) + + +# reads random channel and random block, opening file(s) once. +def _run_read_test_opened_rand_channel(fmt, path, total_length, all_channels, block_length, block_count, + test_min_dur=10, test_min_iter=3, + rand_channel: bool = True, test1ch: bool = False, mem_profile: bool = False): + r = random.Random(12345) + open_counters = [] + read_counters = [] + close_counters = [] + open_mem_usages = [] + read_mem_usages = [] + close_mem_usages = [] + + if (not rand_channel) and test1ch: + channels = [r.choice(all_channels)] + else: + channels = all_channels + rand_channel = test1ch and rand_channel + + for i in repeat_test(test_min_dur, test_min_iter): + # open + with PerformanceCounter(mem_profile=mem_profile) as pc: + if (mem_profile): + mem_usage, (reader, opened) = memory_usage( + (__open, (fmt, path, channels), + {'total_length': total_length, 'block_length': block_length, 'block_count': block_count}), + include_children=True, max_usage=True, retval=True) + open_mem_usages.append(mem_usage) + else: + (reader, opened) = __open(fmt, path, channels, total_length=total_length, + block_length=block_length, block_count=block_count) + open_counters.append(pc) + + # read + with PerformanceCounter(mem_profile=mem_profile) as pc: + if (mem_profile): + mem_usage = memory_usage((__read_rand_channel, + (reader, opened, total_length, channels, block_length, block_count, r), + {"rand_channel": rand_channel}), include_children=True, max_usage=True) + read_mem_usages.append(mem_usage) + else: + __read_rand_channel(reader, opened, total_length, channels, block_length, block_count, r, + rand_channel=rand_channel) + read_counters.append(pc) + + # close + with PerformanceCounter(mem_profile=mem_profile) as pc: + if (mem_profile): + mem_usage = memory_usage((__close, (reader, opened), {}), include_children=True, max_usage=True) + close_mem_usages.append(mem_usage) + else: + __close(reader, opened) + close_counters.append(pc) + return open_counters, open_mem_usages, read_counters, read_mem_usages, close_counters, close_mem_usages + + +def compute_snr(reference_signal, output_signal): + """ + Compute the signal-to-noise ratio (SNR) for the signal in decibels. + """ + + # Convert the data to NumPy arrays as needed. + reference_signal = np.asarray(reference_signal) + output_signal = np.asarray(output_signal) + + # Check that the signals have the same dimensions. + assert (np.array_equal(np.shape(reference_signal), np.shape(output_signal))) + + # Compute the noise in the signal. + noise_signal = output_signal - reference_signal + + # Compute the SNR with special handling for edge cases. + x = np.sum(reference_signal ** 2) + y = np.sum(noise_signal ** 2) + + if x > 0 and y > 0: + snr = 10 * np.log10(x / y) + elif y == 0: + snr = float('inf') + else: + snr = float('nan') + + return snr + + +def print_header(mem_profile: bool = False, with_ops: bool = False): + ops = "OP " if with_ops else "" + if (mem_profile): + print('%s #seek #read KiB CPU(s) Wall(s) Mem(MB)(used/maxrss/malloced) [N]' % ops) + else: + print('%s #seek #read KiB CPU(s) Wall(s) [N]' % ops) + + +def print_results(op_str: str, channels: str, counters, mem_usages, block_count, block_length, + mem_profile: bool = False): + if (mem_profile): + print('%s%6.0f %6.0f %8.0f %8.4f %8.4f %8.4f/%8.4f/%8.4f %6s open %d x %.0fs, %s' + % (op_str, + median_attr(counters, 'n_seek_calls'), + median_attr(counters, 'n_read_calls'), + median_attr(counters, 'n_bytes_read') / 1024, + median_attr(counters, 'cpu_seconds'), + median_attr(counters, 'walltime'), + # walltime / len(counters), + np.median(mem_usages), + median_attr(counters, 'max_rss'), + median_attr(counters, 'malloced'), + '[%d]' % len(counters), + block_count, + block_length, + channels)) + else: + print('%s%6.0f %6.0f %8.0f %8.4f %8.4f %6s open %d x %.0fs, %s' + % (op_str, + median_attr(counters, 'n_seek_calls'), + median_attr(counters, 'n_read_calls'), + median_attr(counters, 'n_bytes_read') / 1024, + median_attr(counters, 'cpu_seconds'), + median_attr(counters, 'walltime'), + '[%d]' % len(counters), + block_count, + block_length, + channels)) + +def check_data(waveforms, format_class, format_list=None, waveform_list=None, test_list=None, + result_list=None, mem_profile=False, test_only=False): + + # Load the class we will be testing + module_name, class_name = format_class.rsplit('.', 1) + module = importlib.import_module(module_name) + fmt = getattr(module, class_name) + + with tempfile.TemporaryDirectory(prefix='wavetest-', dir='.') as tempdir: + path = os.path.join(tempdir, 'wavetest') + + # Fidelity Check + # Loop over each waveform + print("Fidelity check via read_waveforms:") + print() + print("Chunk\t\t\tNumeric Samples\t\t\t\t NaN Samples") + print(f"\t# Errors / Total\t{'% Eq':^8}\t{'SNR':^8}\tNaN Values Match") + + for channel, waveform in waveforms.items(): + print(f"Signal: {channel}") + # Loop over chunks + # print("Chunk\t\t Numeric Samples\t\t NaN Samples") + # print(f"\t# Errors / Total\t{'% Eq':^8}\tNaN Values Match") + + for i_ch, chunk in enumerate(waveform["chunks"]): + st = chunk["start_time"] + et = chunk["end_time"] + data = chunk["samples"] + + # read chunk from file + filedata = fmt().read_waveforms(path, st, et, [channel]) + filedata = filedata[channel] + + # Check if the filedata is in time-value pair format + if isinstance(filedata, tuple): + filedata = convert_time_value_pairs_to_nan_array(filedata, waveform, st, et) + + # compare values + + # check arrays are same size + if data.shape != filedata.shape: + print(f"{i_ch:^5}\t --- Different shapes (input: {data.shape}, file: {filedata.shape}) ---") + continue + + # check for nans in correct location + NANdiff = np.sum(np.isnan(data) != np.isnan(filedata)) + numnan = np.sum(np.isnan(data)) + numnanstr = f"{'N' if NANdiff else 'Y'} ({numnan})" + + # remove nans for equality check + data_nonan = data[~np.isnan(data)] + filedata_nonan = filedata[~np.isnan(data)] + + # use numpy's isclose to determine floating point equality + isgood = np.isclose(filedata_nonan, data_nonan, atol=0.5 / chunk['gain']) + numgood = np.sum(isgood) + fpeq_rel = numgood / len(data_nonan) if len(data_nonan) != 0 else np.nan + + # compute SNR to quantify signal fidelity + snr = compute_snr(data_nonan, filedata_nonan) + + # print to table + print( + f"{i_ch:^5}\t{len(data_nonan) - numgood:10}/{len(data_nonan):10}\t{fpeq_rel * 100:^6.3f}\t\t{snr:^6.1f}\t\t{numnanstr:^16}") + + # print up to 10 bad values if not all equal + if numgood != len(data_nonan): + print("Subset of unuequal numeric data from input:") + print(data_nonan[~isgood][:10]) + print("Subset of unuequal numeric data from formatted file:") + print(filedata_nonan[~isgood][:10]) + print(f"(Gain: {chunk['gain']})") + # print('_' * 64) + + # Fidelity Check 2 + # Loop over each waveform + + try: + print('_' * 64) + print("Fidelity check via open/read/close:") + print() + print("Chunk\t\t\tNumeric Samples\t\t\t\t NaN Samples") + print(f"\t# Errors / Total\t{'% Eq':^8}\t{'SNR':^8}\tNaN Values Match") + + for channel,waveform in waveforms.items(): + print(f"Signal: {channel}") + # Loop over chunks + # print("Chunk\t\t Numeric Samples\t\t NaN Samples") + # print(f"\t# Errors / Total\t{'% Eq':^8}\tNaN Values Match") + + for i_ch, chunk in enumerate(waveform["chunks"]): + st = chunk["start_time"] + et = chunk["end_time"] + data = chunk["samples"] + + # read chunk from file + filedata = fmt().open_read_close_waveforms(path, st, et, [channel]) + filedata = filedata[channel] + + # compare values + + # Check if the filedata is in time-value pair format + if isinstance(filedata, tuple): + filedata = convert_time_value_pairs_to_nan_array(filedata, waveform, st, et) + + # check arrays are same size + if data.shape != filedata.shape: + print(f"{i_ch:^5}\t --- Different shapes (input: {data.shape}, file: {filedata.shape}) ---") + continue + + # check for nans in correct location + NANdiff = np.sum(np.isnan(data) != np.isnan(filedata)) + numnan = np.sum(np.isnan(data)) + numnanstr = f"{'N' if NANdiff else 'Y'} ({numnan})" + + # remove nans for equality check + data_nonan = data[~np.isnan(data)] + filedata_nonan = filedata[~np.isnan(data)] + + # use numpy's isclose to determine floating point equality + isgood = np.isclose(filedata_nonan, data_nonan, atol=0.5/chunk['gain']) + numgood = np.sum(isgood) + fpeq_rel = numgood/len(data_nonan) if len(data_nonan) != 0 else np.nan + + # compute SNR to quantify signal fidelity + snr = compute_snr(data_nonan, filedata_nonan) + + # print to table + print(f"{i_ch:^5}\t{len(data_nonan)-numgood:10}/{len(data_nonan):10}\t{fpeq_rel*100:^6.3f}\t\t{snr:^6.1f}\t\t{numnanstr:^16}") + + # print up to 10 bad values if not all equal + if numgood != len(data_nonan): + print("Subset of unuequal numeric data from input:") + print(data_nonan[~isgood][:10]) + print("Subset of unuequal numeric data from formatted file:") + print(filedata_nonan[~isgood][:10]) + print(f"(Gain: {chunk['gain']})") + # print('_' * 64) + except NotImplementedError: + print("Skipping Fidelity check open/read/close - open once, read many - this format has not implemented the required functions.") \ No newline at end of file diff --git a/central-wfs/src/qc/gaps.py b/central-wfs/src/qc/gaps.py new file mode 100644 index 0000000..ba6ed05 --- /dev/null +++ b/central-wfs/src/qc/gaps.py @@ -0,0 +1,64 @@ +# Submissions are assumed to be formatted as pair-based OR stream-based: +# Pair-based: one timestamp for each sample +# Stream-based: more than one sample per row + # with a corresponding "start" time and "end" time for the stream. + # Indicates whether the start and end times correspond + # to the beginning or end of the first and last samples. + +from typing import List, Array + +def detect_gaps_pair(timestamps: List, waveforms: Array): + """ + Gap detection for pair-based waveform data. + + :param timestamps: (N,1) shape, for N rows/messages + and 1 timestamp for each message/sample. + :type timestamps: Array + :param waveforms: Data values and types of waveform data. + :type waveforms: Array + + + Returns: + summary_gaps: dict describing gap type(s), frequency, location(s). + identified_gaps: list of gap indices and segments to correct for. + """ + pass + +def detect_gaps_stream(timestamps: Array, waveforms: Array ): + """ + Gap detection for stream-based waveform data. + + :param timestamps: (N,2) shape, for N rows/messages + and 2 timestamps for start and end. + If initialy data has more than 2 timestamp column, + then declare 2 as the bounds of each message. + :type timestamps: Array + :param waveforms: Data values and types of waveform data. + :type waveforms: Array + + Returns: + summary_gaps: dict describing gap type(s), frequency, location(s) + identified_gaps: list of gap indices and segments to correct for + """ + pass + +def correct_gaps_pair(): + """ + Correct gaps in pair-based waveform data. + + Returns: + corrected_timestamps: array with gaps corrected + corrected_waveforms: waveform array with interpolated or filled values + """ + pass + +def correct_gaps_stream(): + """ + Correct gaps in stream-based waveform data. + + Returns: + corrected_timestamps: array with gaps corrected + corrected_waveforms: waveform array with interpolated or filled values + """ + + pass \ No newline at end of file diff --git a/central-wfs/src/qc/ioperf.py b/central-wfs/src/qc/ioperf.py new file mode 100644 index 0000000..fc85d6e --- /dev/null +++ b/central-wfs/src/qc/ioperf.py @@ -0,0 +1,177 @@ +# taken directly from this repo: +# https://github.com/chorus-ai/chorus_waveform/blob/main/waveform_benchmark/ioperf.py + +import math +import os +import re +import resource +import signal +import subprocess +import sys +import threading +import tracemalloc +import time +class PerformanceCounter: + """ + Context manager to measure CPU and I/O usage. + + This object has the following attributes, which (approximately) + represent the system resources that are used during execution of + the context manager - that is to say, anything that happens before + or after the "with" block doesn't count. + + 'n_read_calls': Number of times the read() system call was + invoked. + + 'n_seek_calls': Number of times the lseek() system call was + invoked. + + 'n_bytes_read': Number of bytes that were read from disk. + + 'cpu_seconds': Number of seconds that the CPU was busy + (multiplied by the number of CPU cores in use.) + + 'max_rss': Maximum resident set size of the process during + execution, in MB + 'mem_used': Memory allocated by the process during execution, in MB + + + If the 'clear_cache' argument is true, then attempt to clear the + system cache whenever a file is opened (so that 'n_bytes_read' + should reflect the total number of bytes retrieved.) Note that + 'clear_cache' *only* affects files that are opened through Python + (using the open() or os.open() functions) and depends on the + filesystem (it won't work on tmpfs, for instance.) + + several methods have been tested: + rusage, tracemalloc, psutil, memory_profiler + each is reporting different numbers. + + memory_profiler by default uses psutil. + rusage can only report maxrss + tracemalloc is reporting allocations. + + we will report a composite + """ + def __init__(self, clear_cache=True, mem_profile = False): + self._clear_cache = clear_cache + self.mem_profile = mem_profile + self.n_read_calls = 0 + self.n_seek_calls = 0 + self.n_bytes_read = 0 + self.cpu_seconds = 0 + if (self.mem_profile): + tracemalloc.start() + + + def __enter__(self): + if self._clear_cache: + with _nocache_lock: + _nocache_enabled.add(self) + + if self._clear_cache and not _nocache_supported: + self.n_bytes_read = math.nan + + # Run strace to track system calls. + try: + self._strace = subprocess.Popen(['strace', '-c', '-f', + '-U', 'name,calls', + '-e', 'lseek,read', + '-p', str(os.getpid())], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + # Wait for strace to attach to the current process. (Subtract + # one from n_read_calls because this counts as one.) + self._strace.stdout.readline() + self.n_read_calls -= 1 + except FileNotFoundError: + self._strace = None + self.n_read_calls = math.nan + self.n_seek_calls = math.nan + + if (self.mem_profile): + tracemalloc.reset_peak() + + self._walltime_start = time.time() + + # Measure past resource usage for this process and any children. + self._rusage_self = resource.getrusage(resource.RUSAGE_SELF) + self._rusage_children = resource.getrusage(resource.RUSAGE_CHILDREN) + + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + # Measure current resource usage for this process and any children. + rusage_self = resource.getrusage(resource.RUSAGE_SELF) + rusage_children = resource.getrusage(resource.RUSAGE_CHILDREN) + + # Calculate wall time + self.walltime = time.time() - self._walltime_start + + with _nocache_lock: + _nocache_enabled.discard(self) + + # Stop the strace process and read its output. + if self._strace is not None: + self._strace.send_signal(signal.SIGINT) + strace_data, _ = self._strace.communicate() + for m in re.finditer(rb'^\s*(\w+)\s+(\d+)\s*$', + strace_data, re.MULTILINE): + if m.group(1) == b'read': + self.n_read_calls += int(m.group(2)) + elif m.group(1) == b'lseek': + self.n_seek_calls += int(m.group(2)) + + if (self.mem_profile): + (final_usage, peak_usage) = tracemalloc.get_traced_memory() + tracemalloc.stop() + + self.malloced = (peak_usage - final_usage) / (2**20) + self.max_rss = (rusage_self.ru_maxrss + rusage_children.ru_maxrss) / float(2**10) + + + # Calculate number of bytes read. ru_inblock is always + # measured in 512-byte blocks regardless of I/O block size. + self.n_bytes_read += 512 * (rusage_self.ru_inblock + + rusage_children.ru_inblock + - self._rusage_self.ru_inblock + - self._rusage_children.ru_inblock) + + + # Calculate CPU seconds, including both user and kernel time. + self.cpu_seconds += (rusage_self.ru_utime + + rusage_self.ru_stime + + rusage_children.ru_utime + + rusage_children.ru_stime + - self._rusage_self.ru_utime + - self._rusage_self.ru_stime + - self._rusage_children.ru_utime + - self._rusage_children.ru_stime) + + +_nocache_lock = threading.Lock() +_nocache_supported = False +_nocache_enabled = set() +_nocache_loop = False + +if hasattr(os, 'posix_fadvise') and hasattr(os, 'POSIX_FADV_DONTNEED'): + _nocache_supported = True + + def _nocache_hook(event, args): + global _nocache_loop + if event == 'open' and not _nocache_loop: + with _nocache_lock: + if _nocache_enabled and isinstance(args[0], (str, bytes)): + _nocache_loop = True + fd = None + try: + fd = os.open(args[0], os.O_RDONLY | os.O_CLOEXEC) + os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_DONTNEED) + except OSError: + pass + finally: + if fd is not None: + os.close(fd) + _nocache_loop = False + + sys.addaudithook(_nocache_hook) \ No newline at end of file diff --git a/central-wfs/src/registry.py b/central-wfs/src/registry.py new file mode 100644 index 0000000..7425bfa --- /dev/null +++ b/central-wfs/src/registry.py @@ -0,0 +1,39 @@ +from dataclasses import dataclass +from typing import Dict, Tuple, Literal +from datetime import ZoneInfo +from typing import Type, Dict +from src.base import BaseFormat, BaseSite + +# SITE_REGISTRY = {} + +@dataclass(frozen=True) +class SiteInfo: + site: str + root_path: str + metadata_path: str + mapping_path: str + bedside_system: Tuple[Literal["DRAGER", "DWC", "PHILIPS", "GE", "NIHON KOHDEN"], ...] + backend_system: Tuple[Literal["CAPSULE", "BEDMASTER", "SICKBAY", "DWC"], ...] + file_formats: Tuple[Literal["CSV","H5","PARQUET","FEATHER","WFDB","DICOM"],...] + is_wfdb: bool + telemetry: bool + eeg: bool + timezone: ZoneInfo + time_base: Literal["STREAM-BASED", "PAIR-BASED"] + +class Registry: + """ + Class methods for defining and loading site settings prior to + executing ingestion, correction, and conversion pipeline. + """ + + @classmethod + def register_site(cls, site_name: str, converter_config: SiteInfo): + cls.site_config = converter_config + + @classmethod + def get_site_config(cls, site_name: str): + try: + return cls.site_config + except KeyError: + raise ValueError(f"No loader registered for site '{site_name}'") \ No newline at end of file diff --git a/central-wfs/src/sites/columbia.py b/central-wfs/src/sites/columbia.py new file mode 100644 index 0000000..e69de29 diff --git a/central-wfs/src/sites/duke.py b/central-wfs/src/sites/duke.py new file mode 100644 index 0000000..e69de29 diff --git a/central-wfs/src/sites/emory.py b/central-wfs/src/sites/emory.py new file mode 100644 index 0000000..e69de29 diff --git a/central-wfs/src/sites/mayo.py b/central-wfs/src/sites/mayo.py new file mode 100644 index 0000000..e69de29 diff --git a/central-wfs/src/sites/mgh.py b/central-wfs/src/sites/mgh.py new file mode 100644 index 0000000..e69de29 diff --git a/central-wfs/src/sites/mit.py b/central-wfs/src/sites/mit.py new file mode 100644 index 0000000..e69de29 diff --git a/central-wfs/src/sites/nationwide.py b/central-wfs/src/sites/nationwide.py new file mode 100644 index 0000000..e69de29 diff --git a/central-wfs/src/sites/pitt.py b/central-wfs/src/sites/pitt.py new file mode 100644 index 0000000..e69de29 diff --git a/central-wfs/src/sites/seattle.py b/central-wfs/src/sites/seattle.py new file mode 100644 index 0000000..e69de29 diff --git a/central-wfs/src/sites/tufts.py b/central-wfs/src/sites/tufts.py new file mode 100644 index 0000000..e69de29 diff --git a/central-wfs/src/sites/ucla.py b/central-wfs/src/sites/ucla.py new file mode 100644 index 0000000..e69de29 diff --git a/central-wfs/src/sites/ucsf.py b/central-wfs/src/sites/ucsf.py new file mode 100644 index 0000000..e69de29 diff --git a/central-wfs/src/sites/uf.py b/central-wfs/src/sites/uf.py new file mode 100644 index 0000000..e69de29 diff --git a/central-wfs/src/sites/uva.py b/central-wfs/src/sites/uva.py new file mode 100644 index 0000000..e69de29 diff --git a/central-wfs/src/utils.py b/central-wfs/src/utils.py new file mode 100644 index 0000000..b9bce31 --- /dev/null +++ b/central-wfs/src/utils.py @@ -0,0 +1,104 @@ +# taken directly from this repo: +# https://github.com/chorus-ai/chorus_waveform/blob/main/waveform_benchmark/utils.py +# https://github.com/chorus-ai/chorus_waveform/blob/main/waveform_benchmark/input.py + +import time +import numpy as np +import numpy +import wfdb + +def load_wfdb_signals(record_name, pn_dir=None, start=None, end=None): + header = wfdb.rdheader(record_name, pn_dir=pn_dir) + if start is None: + start = 0 + start = round(header.get_frame_number(start)) + if end is None: + end = header.sig_len + end = round(header.get_frame_number(end)) + + record = wfdb.rdrecord(record_name, pn_dir=pn_dir, return_res=32, + smooth_frames=False, m2s=False, + sampfrom=start, sampto=end) + + if isinstance(record, wfdb.MultiRecord): + segments = record.segments + segment_lengths = record.seg_len + else: + segments = [record] + segment_lengths = [record.sig_len] + layout = segments[0] + + waveforms = {} + + for i, name in enumerate(layout.sig_name): + waveform = { + 'units': layout.units[i], + 'samples_per_second': layout.fs * layout.samps_per_frame[i], + 'chunks': [], + } + waveforms[name] = waveform + + end_frame = 0 + for seg, seg_len in zip(segments, segment_lengths): + start_frame = end_frame + end_frame += seg_len + if seg is None or seg_len == 0: + continue + for i, name in enumerate(seg.sig_name): + chunk = { + 'start_time': start_frame / layout.fs, + 'end_time': end_frame / layout.fs, + 'start_sample': start_frame * seg.samps_per_frame[i], + 'end_sample': end_frame * seg.samps_per_frame[i], + 'gain': seg.adc_gain[i], + 'samples': seg.e_p_signal[i], + } + wave_chunks = waveforms[name]['chunks'] + if (wave_chunks + and wave_chunks[-1]['end_sample'] == chunk['start_sample'] + and wave_chunks[-1]['gain'] == chunk['gain']): + wave_chunks[-1]['end_sample'] = chunk['end_sample'] + wave_chunks[-1]['end_time'] = chunk['end_time'] + wave_chunks[-1]['samples'] = numpy.concatenate( + (wave_chunks[-1]['samples'], chunk['samples']) + ) + else: + wave_chunks.append(chunk) + + return waveforms + +def repeat_test(min_duration, min_iterations): + end = time.time() + min_duration + for _ in range(min_iterations): + yield + while time.time() < end: + yield + + +def median_attr(objects, attr): + values = [] + for obj in objects: + values.append(getattr(obj, attr)) + values.sort() + n = len(values) + return (values[n // 2] + values[(n - 1) // 2]) / 2 + + +def convert_time_value_pairs_to_nan_array(filedata, waveform, st, et): + # Write non-nan data onto nan array + read_time_data, read_value_data = filedata + freq_hz = waveform['samples_per_second'] + + num_samples = round(et * freq_hz) - round(st * freq_hz) + nan_values = np.full(num_samples, np.nan, dtype=np.float64) + + start_time_nano = round(st * (10 ** 9)) + period_ns = (10 ** 9) / freq_hz + closest_i_array = np.round((read_time_data - start_time_nano) / period_ns).astype(np.int64) + + # Make sure indices are within bounds + mask = (closest_i_array >= 0) & (closest_i_array < num_samples) + closest_i_array = closest_i_array[mask] + nan_values[closest_i_array] = read_value_data[mask] + + return nan_values \ No newline at end of file