Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions central-wfs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
### Intro
Project architecture to centrally ingest CHoRUS site waveforms; identify and correct gaps; and convert to WFDB format.

Layout mimics extensive work done [here](https://github.com/chorus-ai/chorus_waveform/tree/main) by some wonderful people.

The diagram below shows how waveform data flows through this framework, starting at site upload.

```mermaid
---
title: Ingestion-Conversion pipeline
---

flowchart TD

A@{ shape: stadium, label: "__main__: CLI call options, summary record(s) table" }

Z@{ shape: stadium, label: "assemble_waveforms: inventory waveform file paths" }

B@{ shape: stadium, label: "run_benchmarks(waveform_suite_table)" }

Y@{ shape: stadium, label: "load_signals: site/format-dependent, may be WFDB already" }

C@{ shape: stadium, label: "waveform_summary: per channel, per chunk" }

W@{shape: delay, label: "process_gaps: identify and correct" }

D@{ shape: stadium, label: "write_waveforms: site/format-dependent, to WFDB" }

E@{ shape: stadium, label: "fidelity checks: data quality, read/write operations" }

X@{ shape: rounded, label: "site uploads data" }

F@{ shape: database, label: "save_output_to_log: store converted metadata and data" }

G@{ shape: rounded, label: "PerformanceCounter: memory profiler" }

H@{ shape: lin-rect, label: "formats/ and sites/: import specific functions and methods" }

I@{ shape: lin-rect, label: "registry: define and locate site configurations"}

%%LINKS
A --> Z --> B --> operations

subgraph operations
direction TB
subgraph read/write
direction TB
Y --> C --> D
end
G --> read/write
read/write --> E
I --> H --> read/write
end
X XZ@--> Z
operations oF@--> F
W -.-> read/write

%%STYLING
oF@{ animation: fast}
XZ@{ animation: slow}
```

### Repo layout
```
central_wfs/
├── config/ # setup
│ ├── .env
│ └── environment.yml # conda/mamba install
├── src/ # Source code
│ ├── qc/
│ │ ├── chs_norm.py # check channel mappings
│ │ ├── data.py # fidelity check on data quality
│ │ ├── gaps.py # identify and correct gaps
│ │ └── ioperf.py # monitor memory & performance
│ ├── formats/
│ │ ├── map.py # dictionary to standardize site file extensions
│ │ ├── csv.py
│ │ ├── parquet.py
│ │ ├── wfdb.py
│ │ └── format_n.py
│ ├── sites/
│ │ ├── uf.py
│ │ ├── columbia.py
│ │ ├── mgh.py
│ │ ├── mit.py
│ │ └── site_n.py
│ ├── __main__.py # entry point
│ ├── __init__.py
│ ├── base.py # define site/format-specific class requirements
│ ├── registry.py # define site configs
│ └── utils.py
├── data/
│ └── suites/ # file path tables
├── tests/ # unit/integration tests
├── docs/ # documentation
├── .gitignore
├── pyproject.toml # pip install
├── requirements.txt # python dependencies
└── README.md # project overview
```

#### Design notes
1. Gap tooling can be central or site-specific; it is currently central.
2. Abstracting site AND format class methods in preparation for potential heterogeneity of site waveform uploads.
3. Additional modifications to the upstream code are marked with a nearby `###` comment.
4. Site-specific classes inherit from format-specific classes, and include a method for assembling the waveform suite table.
39 changes: 39 additions & 0 deletions central-wfs/config/environment.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# setup:
# 1. mamba env create -f environment.yml
# 2. mamba activate central_wfs

name: central_wfs
channels:
- conda-forge
- defaults

dependencies:
- python=3.10

# SCIENTIFIC COMPUTING
- numpy
- scipy
- pandas
- polars
- pyarrow

# DATA FORMATS
- h5py
- mne
- pyedflib
- pyyaml
- pydicom

# SUPPORT TOOLING
- click
- tqdm
- rich
- memory_profiler

- pip

# SAFER WITH PIP
- pip:
- wfdb
- pydantic>=2
- -e .
60 changes: 60 additions & 0 deletions central-wfs/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# setup via:
# 1. pip install -e .


[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"

[project]
name = "central_wfs"
version = "0.1.0"
description = "Central ingestion, gap correction, and WFDB conversion pipeline for heterogeneous medical waveform data."
authors = [
{ name = "u", email = "u@site.edu" }
]
readme = "README.md"
requires-python = ">=3.10"

dependencies = [

# SCIENTIFIC COMPUTING
"numpy",
"scipy",
"pandas",
"polars",

# DATA FORMATS
"pyyaml", # config: YAML
"h5py",
"wfdb",
"pyarrow",
"pydicom",

"pyedflib", # EDF, BDF, EDF+
"mne",

# CONFIG & VALIDATION
"pydantic>=2",

# CLI / UTILITY / LOGGING
"click",
"tqdm",
"rich",
"memory_profiler",
]

# OPTIONAL DEPENDENCIES BY DOMAIN
[project.optional-dependencies]
testing = ["pytest", "pytest-cov"]
dev = ["black", "ruff", "mypy"]
plotting = ["matplotlib"]
excel = ["openpyxl"]

# CONSOLE ENTRYPOINT(S) FOR CLI
[project.scripts]
ingest = "src.__main__:main"

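# SETUP TOOLS
# NOTE(review): `where = ["src"]` makes setuptools look for packages *inside* src/, yet the
# console script and the code import the package as `src` (e.g. `src.__main__:main`) — confirm
# the intended layout so the `ingest` entry point resolves after install.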
[tool.setuptools.packages.find]
where = ["src"]
Empty file added central-wfs/src/__init__.py
Empty file.
155 changes: 155 additions & 0 deletions central-wfs/src/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# taken directly from this repo:
# https://github.com/chorus-ai/chorus_waveform/blob/main/waveform_benchmark/__main__.py

import argparse
import csv
import os
import sys

import pandas as pd
from tqdm import tqdm

from src.pipeline import run_benchmarks

def read_csv(file_path):
    """
    Read in a csv file with a header

    The first row is treated as the header and discarded; every remaining row
    is returned as a list of strings, in file order. A completely empty file
    (no header at all) yields an empty list instead of raising StopIteration.
    """
    with open(file_path, 'r', newline='') as csvfile:
        csv_reader = csv.reader(csvfile)
        # Supplying a default keeps a zero-byte file from raising StopIteration.
        next(csv_reader, None)
        return list(csv_reader)


def init_summary_file(summary_file):
    """
    Initialize the summary lists - either grab existing data from a file to be appended to or create empty lists for all
    new data

    Returns a 4-tuple ``(format_list, waveform_list, test_list, result_list)``.
    If ``summary_file`` exists but lacks one of the 'format', 'waveform',
    'test' or 'result' columns, a message is printed and the process exits
    with status 1.
    """
    if not os.path.exists(summary_file):
        return [], [], [], []

    df_summary = pd.read_csv(summary_file)

    columns = []
    for name in ('format', 'waveform', 'test', 'result'):
        try:
            columns.append(list(df_summary[name]))
        except KeyError:
            # Only a missing column is reported this way; any other failure
            # (including KeyboardInterrupt) propagates instead of being
            # mislabelled as a schema problem.
            print("Required columns not found in the waveform suite summary file")
            sys.exit(1)

    format_list, waveform_list, test_list, result_list = columns
    return format_list, waveform_list, test_list, result_list


def save_summary(format_list, waveform_list, test_list, result_list, summary_file):
    """
    Save a summary of the results to a CSV file

    The four lists are combined row-wise with ``zip`` (so the shortest list
    decides the row count) and written to ``summary_file`` without an index,
    together with two derived identifier columns.
    """
    rows = list(zip(format_list, waveform_list, test_list, result_list))
    summary = pd.DataFrame(rows, columns=['format', 'waveform', 'test', 'result'])

    # Short identifiers: the text after the last '.' of the format and after
    # the last '/' of the waveform.
    format_parts = summary['format'].str.split('.')
    waveform_parts = summary['waveform'].str.split('/')
    summary['format_id'] = format_parts.str[-1]
    summary['waveform_id'] = waveform_parts.str[-1]

    # Column order of the written file.
    output_columns = ['test', 'waveform', 'waveform_id', 'format', 'format_id', 'result']
    summary[output_columns].to_csv(summary_file, index=False)


def _build_parser():
    """Build the argument parser for the ingestion / benchmarking command line."""
    ap = argparse.ArgumentParser()

    ###
    # No short alias here: '-r' belongs to --input_record, and registering it
    # twice makes argparse raise "conflicting option string" at start-up.
    # The default is None so the "missing site" check in main() can fire.
    ap.add_argument('--site_name',
                    default=None,
                    help='The site name to configure ingestion, correction, and conversion to.')

    ap.add_argument('--input_record', '-r',
                    help='The record name to run benchmarking against')
    ap.add_argument('--format_class', '-f',
                    help='The format to save the waveform in')
    ap.add_argument('--physionet_directory', '-p',
                    default=None,
                    help='The physionet database directory to read the source waveform from')
    # Takes a value; any non-empty value (even the text "False") enables logging.
    ap.add_argument('--save_output_to_log', '-l',
                    default=False,
                    help='Save all of the benchmarking results to a log file')
    ap.add_argument('--waveform_suite_table', '-s',
                    default=None,
                    help='A csv table with input_record, physionet directory, and format class for multiple files')
    ap.add_argument('--waveform_suite_summary_file', '-w',
                    default='waveform_suite_benchmark_summary.csv',
                    help='Save a CSV summary of the waveform suite run to this path/file')
    ap.add_argument('--test_only',
                    default=False, action='store_true',
                    help='Run only the tests, do not run the benchmarks')
    # BooleanOptionalAction produces the bool itself; passing `type=` to it is
    # deprecated since Python 3.12.
    ap.add_argument('--memory_profiling', '-m',
                    default=False, action=argparse.BooleanOptionalAction,
                    help='Run memory profiling on the benchmarking process')
    return ap


def main():
    """
    Command-line entry point: run the benchmarks for one record or for a suite.

    With --waveform_suite_table, every non-blank row of the table is read by
    position as (record, format class, physionet directory), benchmarked, and
    the accumulated results are written to --waveform_suite_summary_file.
    Otherwise a single record given by --input_record / --format_class is run.

    Invalid argument combinations exit through ``ArgumentParser.error``
    (exit status 2). When --save_output_to_log is set, stdout is redirected to
    'benchmark_results.log' for the duration of the run and restored afterwards.
    """
    ap = _build_parser()
    opts = ap.parse_args()

    ###
    if not opts.site_name:
        ap.error('--site_name specified was invalid')

    # Validate before opening the log so a usage error cannot leak the file.
    if not opts.waveform_suite_table:
        # If a waveform suite table is not provided, input_record and format_class must be provided
        if opts.input_record is None or opts.format_class is None:
            ap.error('--input_record and --format_class are required unless --waveform_suite_table is specified')

    # If log is requested send the output there
    log_file = None
    original_stdout = sys.stdout
    if opts.save_output_to_log:
        log_file = open('benchmark_results.log', 'a')
        sys.stdout = log_file

    try:
        # If a table with multiple files is passed we loop through it and save a summary of the results
        if opts.waveform_suite_table:
            waveform_suite = read_csv(opts.waveform_suite_table)

            format_list, waveform_list, test_list, result_list = init_summary_file(
                opts.waveform_suite_summary_file)

            for waveform_file in tqdm(waveform_suite):
                # csv.reader yields [] for blank lines; there is nothing to run for those.
                if not waveform_file:
                    continue

                # Extract metadata from looped file and launch benchmarking
                record = waveform_file[0]
                format_class = waveform_file[1]
                pn_dir = waveform_file[2]
                format_list, waveform_list, test_list, result_list = run_benchmarks(
                    input_record=record,
                    format_class=format_class,
                    pn_dir=pn_dir,
                    format_list=format_list,
                    waveform_list=waveform_list,
                    test_list=test_list,
                    result_list=result_list,
                    test_only=opts.test_only,
                    mem_profile=opts.memory_profiling,
                    site_name=opts.site_name)

            save_summary(format_list, waveform_list, test_list, result_list,
                         opts.waveform_suite_summary_file)

        # Run benchmarking against a single file
        else:
            # site_name is forwarded here as well, matching the suite path.
            run_benchmarks(input_record=opts.input_record,
                           format_class=opts.format_class,
                           pn_dir=opts.physionet_directory,
                           test_only=opts.test_only,
                           mem_profile=opts.memory_profiling,
                           site_name=opts.site_name)
    finally:
        # Restore stdout and close the log even when the run fails part-way.
        if log_file is not None:
            sys.stdout = original_stdout
            log_file.close()


if __name__ == '__main__':
    main()
Loading