diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..817009e --- /dev/null +++ b/.flake8 @@ -0,0 +1,3 @@ +[flake8] +# Exclude all __init__.py files +exclude = __init__.py, ./tests/**/** \ No newline at end of file diff --git a/.github/workflows/lint-and-test.yml b/.github/workflows/lint-and-test.yml new file mode 100644 index 0000000..52bcdca --- /dev/null +++ b/.github/workflows/lint-and-test.yml @@ -0,0 +1,36 @@ +name: Lint with flake8 and test with PyTest + +on: [pull_request] + +jobs: + flake8-lint: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: "3.12" + + - name: Install testing dependencies + run: | + pip install flake8 pep8-naming pytest pytest-cov + + - name: Install package dependencies + run: | + pip install -e . + + - name: Run flake8 + run: | + flake8 EEGPhasePy + + - name: Run PyTest + run: | + pytest --cov --cov-branch --cov-report=xml + + - name: Upload coverage reports to Codecov + uses: codecov/codecov-action@v5 + with: + token: ${{ secrets.CODECOV_TOKEN }} diff --git a/.gitignore b/.gitignore index 018e25c..ede1a33 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,6 @@ -phase-eeg \ No newline at end of file +__pycache__ +venv +dist +EEGPhasePy.egg-info +eegphasepy.egg-info +tests/test_data/** \ No newline at end of file diff --git a/.readthedocs.yaml b/.readthedocs.yaml new file mode 100644 index 0000000..7cbd526 --- /dev/null +++ b/.readthedocs.yaml @@ -0,0 +1,21 @@ +# Read the Docs configuration file +# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details + +# Required +version: 2 + +# Set the OS, Python version, and other tools you might need +build: + os: ubuntu-24.04 + tools: + python: "3.13" + +# Build documentation in the "docs/" directory with Sphinx +sphinx: + configuration: docs/source/conf.py +# Optionally, but recommended, +# declare the Python requirements required to build your documentation +# See https://docs.readthedocs.io/en/stable/guides/reproducible-builds.html +python: + install: + - requirements: docs/requirements.txt diff --git a/EEGPhasePy/__pycache__/__init__.cpython-39.pyc b/EEGPhasePy/__pycache__/__init__.cpython-39.pyc deleted file mode 100644 index 33d7670..0000000 Binary files a/EEGPhasePy/__pycache__/__init__.cpython-39.pyc and /dev/null differ diff --git a/EEGPhasePy/estimators/__init__.py b/EEGPhasePy/estimators/__init__.py index 4f62f58..d079dc7 100644 --- a/EEGPhasePy/estimators/__init__.py +++ b/EEGPhasePy/estimators/__init__.py @@ -1 +1,3 @@ -from .etp import ETP \ No newline at end of file +from .etp import ETP +from .estimator import Estimator +from .phastimate import PHASTIMATE diff --git a/EEGPhasePy/estimators/__pycache__/__init__.cpython-39.pyc b/EEGPhasePy/estimators/__pycache__/__init__.cpython-39.pyc deleted file mode 100644 index 77887fb..0000000 Binary files a/EEGPhasePy/estimators/__pycache__/__init__.cpython-39.pyc and /dev/null differ diff --git a/EEGPhasePy/estimators/__pycache__/etp.cpython-39.pyc b/EEGPhasePy/estimators/__pycache__/etp.cpython-39.pyc deleted file mode 100644 index 877d816..0000000 Binary files a/EEGPhasePy/estimators/__pycache__/etp.cpython-39.pyc and /dev/null differ diff --git a/EEGPhasePy/estimators/estimator.py b/EEGPhasePy/estimators/estimator.py new file mode 100644 index 0000000..68a0b2d --- /dev/null +++ b/EEGPhasePy/estimators/estimator.py @@ -0,0 +1,333 @@ +import numpy as np +import scipy.signal as signal +import scipy.stats as stats +import matplotlib + +from ..utils.check import _check_array_dimensions, _check_type +from ..viz import plot_polar_histogram, plot_waveform_average + + +class Estimator: + ''' + Base model for all phase estimators. This class contains the helper + functions, validation and base parameters needed for all estimators. + ''' + + def __init__(self, + real_time_filter: np.ndarray, + ground_truth_filter: np.ndarray, + sampling_rate: int, + window_len: int = 500, + window_edge: int = 40): + ''' + Base model for all phase estimators. This class contains the helper + functions, validation and base parameters needed for all estimators. + + Parameters + ---------- + real_time_filter + Filter parameters for filter to apply for predicting phase. + Accounts for FIR or IIR filters + ground_truth_filter + Filter parameters for identifying "true" phase. See + :cite:t:Zrenner2020-zb for an in-depth discussion + on "true" phase. Accounts for FIR or IIR filters + sampling_rate : int + Original sampling rate of data. + window_len : int + Window length in ms. Optional parameter to specify window length + to run pseudo-real-time simulations or to train `window_len` + dependent models with. This should match whatever is used in + real-time + window_edge : int + Window edge to remove in ms. Optional parameter to specify edge to + remove after applying `real_time_filter` + ''' + + _check_type(real_time_filter, ['array']) + _check_type(ground_truth_filter, ['array']) + _check_type(window_len, ['int']) + _check_type(window_edge, ['int']) + _check_type(sampling_rate, ['int']) + + _check_array_dimensions(real_time_filter, [(1,), (1, 1)]) + _check_array_dimensions(ground_truth_filter, [(1,), (1, 1)]) + + self.ground_truth_filter: np.ndarray = ground_truth_filter + self.sampling_rate: int = sampling_rate + + self.real_time_filter: np.ndarray = real_time_filter + self.window_len: int = window_len + self.window_edge: int = window_edge + + def _filter_data(self, dsp_filter: np.ndarray | list, + data: np.ndarray | list) -> np.ndarray: + ''' + Forward/backward filters data using an FIR or IIR filter + + Parameters + ---------- + dsp_filter : array_like (n_parameters) | array_like (2, n_parameters) + Filter to apply to data + data : array (n_samples,) + 1D array repreenting window to filter + + ------- + Returns + ------- + filtered_data : array (n_samples,) + Data after filtering + ''' + return signal.filtfilt(dsp_filter, 1.0, data) \ + if len(np.shape(dsp_filter)) == 1 \ + else signal.filtfilt(dsp_filter[0], dsp_filter[1], data) + + def get_phase_from_triggers(self, + data: np.ndarray, + triggers: list | np.ndarray, + degree=False, + full_circle=False) -> np.ndarray: + ''' + Get the corresponding phase for each trigger sample + + Parameters + ---------- + data : array_like (n_samples) + The ground truth filtered EEG data in array format that the + triggers correspond to + triggers : array_like (n_samples) + The sample number each trigger occurred at within the + given EEG data + degree=False : bool + Whether to convert the phase data into degree. By default this is + value is false + full_circle=False : bool + Whether to express phase values in full circle format (i.e. 0 to + 360 or 0 to :math:`2\\pi`) or the default format + (-180 to 180 or :math:r`-\\pi` to :math:`\\pi`) + + + Returns + ------- + phase_data : array_like (n_samples) + An array containing the phase, in radians, each trigger in the + `triggers` argument occurred at + ''' + _check_type(data, ["array"]) + _check_type(triggers, ["array"]) + _check_type(degree, ["bool"]) + _check_type(full_circle, ["bool"]) + _check_array_dimensions(data, [(1,)]) + _check_array_dimensions(triggers, [(1,)]) + + filtered_data = self._filter_data(self.ground_truth_filter, data) + phase = np.angle(signal.hilbert(filtered_data)[triggers]) + + if degree: + phase = np.rad2deg(phase) + + if full_circle: + phase = phase % 360 if degree else phase % (2*np.pi) + + return phase + + def get_waveforms_from_triggers(self, + data: np.ndarray, + triggers: list | np.ndarray, + tmin: int | float, + tmax: int | float) -> np.ndarray: + ''' + Get the corresponding waveform window for each trigger sample + + Parameters + ---------- + data : array_like (n_samples) + The ground truth filtered EEG data in array format that the + triggers correspond to + triggers : array_like (n_samples) + The sample number each trigger occurred at within the given + EEG data + tmin : int | float + The time in seconds pre-trigger to include in the waveform window + tmax : int | float + The time in seconds post-trigger to include in the waveform window + + Returns + ------- + waveform_data : array_like (n_waveforms, n_samples) + A 2D array containing the each waveform + ''' + _check_type(data, ["array"]) + _check_type(triggers, ["array"]) + _check_type(tmin, ["int", "float"]) + _check_type(tmax, ["int", "float"]) + _check_array_dimensions(data, [(1,)]) + _check_array_dimensions(triggers, [(1,)]) + + fs = self.sampling_rate + waveform_windows = [] + window_start_sample = int(tmin * fs) + window_end_sample = int(tmax * fs) + + filtered_data = self._filter_data(self.ground_truth_filter, data) + + for trigger_sample in triggers: + waveform_windows.append( + filtered_data[trigger_sample - window_start_sample: + trigger_sample + window_end_sample]) + + return waveform_windows + + def mean_phase_from_triggers(self, + data: np.ndarray, + triggers: list | np.ndarray, + degree=False) -> float: + ''' + Get the circular mean for phase from trigger samples + + Parameters + ----------- + data : array_like (n_samples) + The raw EEG data array + triggers : array_like (n_samples) + The samples at which triggers occurred + degree : bool + Optional. By default is False. Whether to plot the polar histogram + in degrees or radians + + Returns + --------- + circular_mean_phase : float + The circular mean phase + ''' + phase_data = self.get_phase_from_triggers( + data, triggers, degree=False) + + if degree: + return stats.circmean(np.rad2deg(phase_data) % 360) + else: + return stats.circmean(phase_data % 2*np.pi) + + def std_phase_from_triggers(self, + data: np.ndarray, + triggers: list | np.ndarray, + degree=False) -> float: + ''' + Get the circular standard deviation for phase from trigger samples + + Parameters + ----------- + data : array_like (n_samples) + The raw EEG data array + triggers : array_like (n_samples) + The samples at which triggers occurred + degree : bool + Optional. By default is False. Whether to plot the polar histogram + in degrees or radians + + Returns + -------- + phase_circular_std : float + The phase data's circular standard deviation + ''' + phase_data = self.get_phase_from_triggers( + data, triggers, degree=False) + + if degree: + return stats.circstd(np.rad2deg(phase_data) % 360) + else: + return stats.circstd(phase_data % 2*np.pi) + + def phase_accuracy_from_triggers(self, + data: np.ndarray[float | int], + triggers: list[int] | np.ndarray[int], + target_phase: float) -> float: + ''' + Compute the accuracy of the triggers to the target phase. An accuracy + of 50% means that the targeting is completely random. An accuracy + below 50% means, the triggers tend to occur more often at the opposite + phase. + + Parameters + ----------- + data : array_like (n_samples) + The raw EEG data array + triggers : array_like (n_samples) + The array containing the samples trigger occurred at + target_phase : float + The phase to target given in radians + + Returns + -------- + accuracy : float + The accuracy in decimal form + ''' + if len(triggers) == 0: + return 0 + + phase = self.get_phase_from_triggers(data, triggers) + + phase_difference = np.exp(phase*1j - target_phase*1j) + mean_degree_difference = np.abs(np.angle( + np.sum(phase_difference), deg=True)) / (len(triggers) * 180) + + return 1 - mean_degree_difference + + def plot_mean_std_waveform_from_triggers(self, + data: np.ndarray, + triggers: list | np.ndarray, + tmin: float, + tmax: float) -> \ + matplotlib.pyplot.Figure: + ''' + Plot mean waveform and standard deviation highlight from a set of + triggers + + Parameters + ----------- + data : array_like (n_samples) + The raw EEG data array + triggers : array_like (n_samples) + The samples at which triggers occurred + tmin : float + The time in seconds pre-trigger to include in the waveform window + tmax : float + The time in seconds post-trigger to include in the waveform window + + Returns + -------- + waveform_avg_plot: matplotlib.pyplot.Figure + The pyplot figure of the average waveform + ''' + fs = self.sampling_rate + waveforms = self.get_waveforms_from_triggers( + data, triggers, tmin, tmax) + + # Need to find t_trigger + t_trigger = len(waveforms[0]) - (int(fs * tmin)) + + return plot_waveform_average(waveforms, fs, t_trigger) + + def polar_histogram_from_triggers(self, + data: np.ndarray, + triggers: list | np.ndarray) -> \ + matplotlib.pyplot.Figure: + ''' + Plot polar histogram from trigger samples and raw EEG data + + Parameters + ----------- + data : array_like (n_samples) + The raw EEG data array + triggers : array_like (n_samples) + The samples at which triggers occurred + + Returns + -------- + polar_histogram_figure : matplotlib.pyplot.Figure + The pyplot figure of the polar histogram + ''' + phase_data = self.get_phase_from_triggers( + data, triggers, degree=False) + + return plot_polar_histogram(phase_data) diff --git a/EEGPhasePy/estimators/etp.py b/EEGPhasePy/estimators/etp.py index 81a12a5..48d7c27 100644 --- a/EEGPhasePy/estimators/etp.py +++ b/EEGPhasePy/estimators/etp.py @@ -1,163 +1,171 @@ import numpy as np import scipy.signal as signal import scipy.stats as stats +from .estimator import Estimator from ..utils.check import _check_array_dimensions, _check_type -class ETP: - def __init__(self, real_time_filter, ground_truth_filter, sampling_rate, window_len=500, window_edge=40): - ''' - Construct a model for the educated-temporal-prediction (ETP) model of phase estimation (Shirinpour et al., 2020) - - Parameters - ---------- - real_time_filter : array_like shape (n_parameters) | array_like shape (2, n_parameters)\n - Filter parameters for filter to apply for predicting phase (should be constructed with fs of 1000). - Accounts for FIR or IIR filters\n - ground_truth_filter : array_like shape (n_parameters) | array_like shape (2, n_parameters)\n - Filter parameters for filter to use during ETP training (should be constructed with fs of 1000). - Accounts for FIR or IIR filters\n - sampling_rate : int - Original sampling rate of data. As per Shirinpour et al., 2020, data are downsampled to 1kHz\n - window_len : 500 | int - Window length in ms. Optional parameter to specify window length to train ETP with. This should match whatever is used in real-time\n - window_edge : 40 | int - Window edge to remove in ms. Optional parameter to specify edge to remove after applying real_time_filter\n - ''' - self.Tadj = None - - _check_type(real_time_filter, 'array') - _check_type(ground_truth_filter, 'array') - _check_type(window_len, 'int') - _check_type(window_edge, 'int') - _check_type(sampling_rate, 'int') - - _check_array_dimensions(real_time_filter, [(1,), (1, 1)]) - _check_array_dimensions(ground_truth_filter, [(1,), (1,1)]) - - self.ground_truth_filter = ground_truth_filter - self.sampling_rate = sampling_rate - - self.real_time_filter = real_time_filter - self.window_len = window_len - self.window_edge = window_edge - - def _filter_data(self, dsp_filter, data): - ''' - Forward/backward filters data using an FIR or IIR filter - - Parameters - ---------- - dsp_filter : array_like shape (n_parameters) | array_like shape (2, n_parameters) - Filter to apply to data - data : array (n_samples,) - 1D array repreenting window to filter - - ------- - Returns - ------- - filtered_data : array (n_samples,) - Data after filtering - ''' - return signal.filtfilt(dsp_filter, 1.0, data) if hasattr(self.ground_truth_filter, '__len__') \ - else signal.filtfilt(dsp_filter[0], dsp_filter[0], data) - - def fit(self, training_data, min_ipi): - ''' - Estimates ideal Tadj for training data and updates self object with new Tadj. - - Parameters - ---------- - training_data : array (n_samples,) - 1D samples array of channel to estimate Tadj for - min_ipi : int - Minimum inter peak interval, should be the period of the upper frequency of the target band - - ------- - Returns - ------- - self - ''' - _check_type(training_data, "array") - _check_type(min_ipi, "int") - _check_array_dimensions(training_data, [(1,)]) # ensure training data is 1D - - fs = 1000 - resampled_training_data = signal.resample(training_data, int(fs*len(training_data) / self.sampling_rate)) - filtered_data = self._filter_data(self.ground_truth_filter, resampled_training_data) - # ground truth is hard to define for phase estimation, see Zrenner et al., 2020 for a more detailed discussion - ground_truth_phase = np.angle(signal.hilbert(filtered_data), deg=True) % 360 - - training_window_data = filtered_data[:90*fs] - peaks = signal.find_peaks(training_window_data)[0] - inter_peak_interval = np.diff(peaks) - inter_peak_interval = inter_peak_interval[inter_peak_interval > min_ipi] # remove IPIs that are too short - period = round(np.exp(np.nanmean(np.log(inter_peak_interval)))) - - bias = 0 - bias_direction = None - last_mean = None - mean_differences = [] - while True: - triggered_phases = [] - - for i in range(255): - window_i = 90*fs + 350*i - window_data = resampled_training_data[window_i:window_i + self.window_len] - filtered_window = self._filter_data(self.real_time_filter, window_data)[:-self.window_edge] - peaks = signal.find_peaks(filtered_window)[0] - trigger_i = window_i + peaks[-1] + period + bias - triggered_phases.append(ground_truth_phase[trigger_i]) - - mean_phase = stats.circmean(np.deg2rad(triggered_phases)) - - if bias_direction == None: - bias_direction = 1 if np.rad2deg(mean_phase)%360 > 180 else -1 - - if not last_mean == None: - difference_mean = 1 - np.real(np.exp(1j*last_mean - 1j*mean_phase)) - if len(mean_differences) > 0 and mean_differences[-1] < difference_mean: - self.Tadj = period + bias - bias_direction - return self - - mean_differences.append(difference_mean) - last_mean = mean_phase - - bias += bias_direction - - def predict(self, data, target_phase): - ''' - Predicts the next sample target phase occurs at - - Parameters - ---------- - data : array_like (n_samples) - Window of EEG data from target channel to predict from - target_phase : float - Target phase to predict in radians - - ------- - Returns - ------- - relative_next_phase : int - Next sample target phase occurs, defined relative to window start +class ETP(Estimator): ''' + The Educated Temporal Prediction (ETP) model :cite:t:`Shirinpour2020-ef`. + If you use this class please cite :cite:t:`Shirinpour2020-ef` - _check_type(data, "array") - _check_type(target_phase, "float") + ETP was first described by :cite:t:`Shirinpour2020-ef`. A more in depth + description can be found there. - _check_array_dimensions(data, [(1,)]) + Briefly, this EEG phase estimation model works by estimating the average + inter-peak interval for the target EEG band. In real-time, ETP predicts + the next time the target phase will occur at (:math:`T_{adj}`) + ''' - fs = 1000 - downsampled_window = signal.resample(data, int(fs*len(data) / self.sampling_rate)) - filtered_window = self._filter_data(self.real_time_filter, downsampled_window) - peaks = signal.find_peaks(filtered_window)[0] + def __init__(self, + real_time_filter: np.ndarray, + ground_truth_filter: np.ndarray, + sampling_rate: int, + window_len: int = 500, + window_edge: int = 40): + ''' + Construct a model for the educated-temporal-prediction (ETP) model of + phase estimation :cite:t:`Shirinpour2020-ef` + + Parameters + ---------- + real_time_filter + Filter parameters for filter to apply for predicting phase. + Accounts for FIR or IIR filters + ground_truth_filter + Filter parameters for identifying "true" phase. Used + to fit ETP. See :cite:t:Zrenner2020-zb for an + in-depth discussion on "true" phase. Accounts for FIR + or IIR filters + sampling_rate : int + Original sampling rate of data. + window_len : 500 | int + Window length in ms. Optional parameter to specify window length + to train ETP with. This should match whatever is used in real-time + window_edge : 40 | int + Window edge to remove in ms. Optional parameter to specify edge to + remove after applying real_time_filter + ''' + super().__init__(real_time_filter, ground_truth_filter, + sampling_rate, window_len, window_edge) + self.tadj: int + + def fit(self, training_data: np.ndarray | list, min_ipi: int): + ''' + Estimates ideal tadj for training data and updates self object with + new tadj. + + In cases of high phase instability, ETP may not converge onto the + ideal tadj. Using a more aggressive ground-truth filter typically helps + + Parameters + ---------- + training_data : array (n_samples,) + 1D samples array of channel to estimate tadj for + min_ipi : int + Minimum inter peak interval in number of samples, should be the + period of the upper frequency of the target band + + + Returns + ------- + self + ''' + _check_type(training_data, ["array"]) + _check_type(min_ipi, ["int"]) + # ensure training data is 1D + _check_array_dimensions(training_data, [(1,)]) + + fs = self.sampling_rate + filtered_data = self._filter_data( + self.ground_truth_filter, training_data) + # ground truth is hard to define for phase estimation, see Zrenner + # et al., 2020 for a more detailed discussion + ground_truth_phase = np.angle( + signal.hilbert(filtered_data), deg=True) % 360 + + training_window_data = filtered_data[:90*fs] + peaks = signal.find_peaks(training_window_data)[0] + inter_peak_interval = np.diff(peaks) + # remove IPIs that are too short + inter_peak_interval = inter_peak_interval[inter_peak_interval > + min_ipi] + period = round(np.exp(np.nanmean(np.log(inter_peak_interval)))) + + bias = 0 + bias_direction = None + last_mean = None + mean_differences = [] + + window_len = int((self.window_len / 1000) * fs) + + while True: + triggered_phases = [] + + for i in range(255): + window_i = 90*fs + 350*i + window_data = training_data[window_i:window_i + window_len] + filtered_window = self._filter_data( + self.real_time_filter, + window_data)[:-self.window_edge] + + peaks = signal.find_peaks(filtered_window)[0] + trigger_i = window_i + peaks[-1] + period + bias + triggered_phases.append(ground_truth_phase[trigger_i]) + + mean_phase = stats.circmean(np.deg2rad(triggered_phases)) + + if bias_direction is None: + bias_direction = 1 if np.rad2deg( + mean_phase) % 360 > 180 else -1 + + if last_mean is not None: + difference_mean = 1 - \ + np.real(np.exp(1j*last_mean - 1j*mean_phase)) + if len(mean_differences) > 0 and \ + mean_differences[-1] < difference_mean: + self.tadj = period + bias - bias_direction + return self + + mean_differences.append(difference_mean) + last_mean = mean_phase + + bias += bias_direction + + def predict(self, data: np.ndarray | list, target_phase: int | float) \ + -> np.int64: + ''' + Predicts the next sample target phase occurs at + + Parameters + ---------- + data : array_like (n_samples) + Window of EEG data from target channel to predict from + target_phase : int | float + Target phase to predict in radians + + + Returns + ------- + relative_next_phase : int + Next sample target phase occurs, defined relative to window start + ''' + + _check_type(data, ["array"]) + _check_type(target_phase, ["float", "int"]) + + _check_array_dimensions(data, [(1,)]) + + filtered_window = self._filter_data(self.real_time_filter, data) + peaks = signal.find_peaks(filtered_window)[0] - if len(peaks) == 0: - raise RuntimeError("No peaks could be found in the window passed into the `predict` method") + if len(peaks) == 0: + raise RuntimeError( + "No peaks could be found in the window passed into the \ + `predict` method") - Tadj = self.Tadj * target_phase/2*np.pi + tadj: int = int(self.tadj * target_phase/(2*np.pi)) - return peaks[-1] + Tadj - + return peaks[-1] + tadj diff --git a/EEGPhasePy/estimators/phastimate.py b/EEGPhasePy/estimators/phastimate.py new file mode 100644 index 0000000..5ba8eaa --- /dev/null +++ b/EEGPhasePy/estimators/phastimate.py @@ -0,0 +1,331 @@ +import numpy as np +import scipy.signal as signal +from typing import Literal, Callable +from statsmodels.regression.linear_model import yule_walker +import pygad +from bayes_opt import BayesianOptimization + +from .estimator import Estimator +from ..utils.check import _check_array_dimensions, _check_type + + +class PHASTIMATE(Estimator): + ''' + Class for the PHASTIMATE algorithm created by :cite:t:`Zrenner2020-zb` + + If you use this class, please cite :cite:t:`Zrenner2020-zb` + + PHASTIMATE uses an autoregressive approach to fill in data impacted by + filter edge effects then applys a hilbert transform to extract the phase + at the current time. It is best used in real-time environments where + minimal delay in your system (particularly EEG -> computer) is guarenteed, + because forward forecasting more than a couple of milliseconds beyond t = 0 + typically results in inaccurate phase predictions. This implementation does + not support forward forecasting beyond t = 0 + + PHASTIMATE also comes with genetic optimization for the phase estimation + algorithms including order and window_edge. Our implementation of + PHASTIMATE does not include optimization over filter parameters. + ''' + + def __init__(self, real_time_filter: np.ndarray, + ground_truth_filter: np.ndarray, + sampling_rate: int, + window_len=500, + window_edge=40, + ar_order=30): + ''' + Constructor for PHASTIMATE class + + Parameters + ---------- + real_time_filter + Filter parameters for filter to apply for predicting phase. + Accounts for FIR or IIR filters + ground_truth_filter + Filter parameters for identifying "true" phase. See + :cite:t:Zrenner2020-zb for an in-depth discussion + on "true" phase. Accounts for FIR or IIR filters + sampling_rate : int + Original sampling rate of data. + window_len : int + Window length in ms. Optional parameter to specify window length + to run pseudo-real-time simulations or to train `window_len` + dependent models with. This should match whatever is used in + real-time + window_edge : int + Window edge to remove in ms. Optional parameter to specify edge to + remove after applying `real_time_filter` + ar_order : int + The order for the auto-regressive model + ''' + super().__init__(real_time_filter, ground_truth_filter, + sampling_rate, window_len, window_edge) + + _check_type(ar_order, ['int']) + self.ar_order = ar_order + + def _ar_forecast(self, data: np.ndarray | list, + ar_params: np.ndarray | list, steps=10) -> np.ndarray: + """ + Forecast future values from an AR process. + + Parameters + ---------- + data : array-like + Time series data. + ar_params : array-like + AR coefficients (phi_1, ..., phi_p). + steps : int + Number of steps to forecast. + + Returns + -------- + forecasted_data : ndarray + The forecasted series + """ + p = len(ar_params) + forecast = list(data[-p:]) # start with the last p values + + for _ in range(steps): + new_val = np.dot(ar_params, forecast[-p:][::-1]) # weighted sum + forecast.append(new_val) + np.seterr(invalid='ignore') + np.seterr(over='ignore') + + return np.array(forecast[p:]) + + def _generate_black_box_function(self, + data: np.ndarray[float] | list[float]): + ''' + Generates a function that runs a pseudo-real-time simulation of the + autoregressive model on `data` and computes accuracy to peaks. + The pseudo-real-time simulation will use the current window_len + and will use a step of 0.01s. When the target phase is detected, + the window will jump by 1 x its length + + Parameters + ----------- + data : np.ndarray[float] | list[float] + The (n_samples,) array containing the unfiltered EEG data to use + for simulations + + Returns + -------- + black_box_function : function + The black box function that simulates the AR model with provided + parameters and outputs accuracy + ''' + _check_type(data, ['array']) + _check_array_dimensions(data, [(1,)]) + + def black_box_function(edge: float, ar_order: float) -> float: + self.window_edge = int(edge) + self.ar_order = int(ar_order) + + fs = self.sampling_rate + window_i = 0 + window_len = self.window_len + window_step = int(0.01*fs) + + triggers = [] + + while window_i + window_len < len(data): + window_data = data[window_i:window_i + window_len] + if self.predict(window_data, 0, 10): + triggers.append(window_i + window_len) + window_i += window_len + + window_i += window_step + + accuracy = self.phase_accuracy_from_triggers(data, triggers, 0) + + return accuracy + + return black_box_function + + def _generate_fitness_function(self, + data: np.ndarray[float] | list[float]) \ + -> Callable[..., float]: + ''' + Generates a function that runs a pseudo-real-time simulation of the + autoregressive model on `data` and computes accuracy to peaks. + The pseudo-real-time simulation will use provided window_len and will + use a step of 0.05s. When the target phase is detected, + the window will jump by 1 x its length + + Parameters + ----------- + data : np.ndarray[float] | list[float] + The (n_samples,) array containing the unfiltered EEG data to use + for simulations + + Returns + -------- + fitness_function : function + The fitness function that simulates the AR model with provided + parameters and outputs accuracy + ''' + _check_type(data, ['array']) + _check_array_dimensions(data, [(1,)]) + + def fitness_function(ga_instance: pygad.GA, + solution: list[int], + solution_i: int) -> float: + _solution = [int(sol) for sol in solution] + + self.window_edge = _solution[0] + self.ar_order = _solution[1] + + fs = self.sampling_rate + window_i = 0 + window_len = self.window_len + window_step = int(0.05*fs) + + triggers = [] + + while window_i + window_len < len(data): + window_data = data[window_i:window_i + window_len] + if self.predict(window_data, 0, 5): + triggers.append(window_i + window_len) + window_i += window_len + + window_i += window_step + + accuracy = self.phase_accuracy_from_triggers(data, triggers, 0) + + return accuracy + + return fitness_function + + def optimize_parameters(self, + data: np.ndarray[float] | list[float], + method: Literal["bayesian", "genetic"] + = "bayesian") -> None: + ''' + Perform optimization over the amount of edge removed following + filtering and autoregressive order. This method will update the + properties of the current PHASTIMATE instance + instance. + + Parameters + ----------- + data : np.ndarray[float] | list[float] + The (n_samples,) array containing the unfiltered EEG data to use + for optimization + method : "bayesian" | "genetic" + Whether to perform bayesian optimization or genetic optimization + ''' + if method == "bayesian": + parameter_bounds = { + "edge": [5, float(np.min([60, self.window_len / 8]))], + "ar_order": [1.0, 0.1 * self.sampling_rate] + } + optimizer = BayesianOptimization( + self._generate_black_box_function(data), + pbounds=parameter_bounds) + + print( + "[PHASTIMATE Bayesian Optimization] Starting bayesian" + + " optimization....") + optimizer.maximize(10, n_iter=100) + print("[PHASTIMATE Bayesian Optimization] Optimization complete") + + print("[PHASTIMATE Bayesian Optimization] Accuracy of best " + + "parameters", + optimizer.max['target']) + self.window_edge = int(optimizer.max["params"]["edge"]) + self.ar_order = int(optimizer.max["params"]["ar_order"]) + else: + gene_space = [ + list(np.arange(10, np.min( + [80, self.window_len / 8]), 5, dtype=int)), + list(np.arange(1, int(0.1 * self.sampling_rate), dtype=int)) + ] + num_generations = 20 + num_parents_mating = 4 + + fitness_function = self._generate_fitness_function(data) + + sol_per_pop = 5 + num_genes = len(gene_space) + + parent_selection_type = "rws" + + def on_gen(ga_instance) -> None: + print("[PHASTIMATE Genetic Optimization] Generation : ", + ga_instance.generations_completed) + print("[PHASTIMATE Genetic Optimization] Accuracy of the best" + + " solution :", ga_instance.best_solution()[1]) + + print("[PHASTIMATE Genetic Optimization] Starting genetic " + + "optimization") + ga_instance = pygad.GA(num_generations=num_generations, + num_parents_mating=num_parents_mating, + fitness_func=fitness_function, + sol_per_pop=sol_per_pop, + num_genes=num_genes, + gene_space=gene_space, + parent_selection_type=parent_selection_type, + on_generation=on_gen, + mutation_percent_genes=50) + ga_instance.run() + + solution, solution_fitness, solution_i = \ + ga_instance.best_solution() + print("[PHASTIMATE Genetic Optimization] Optimization complete") + print( + "[PHASTIMATE Genetic Optimization]" + + "Accuracy of best parameters: " + str(solution_fitness)) + + self.window_edge = int(solution[0]) + self.ar_order = int(solution[1]) + + def predict(self, + data: np.ndarray[float] | list[float], + target_phase: float | int, + tolerance: float | int = 5) -> bool: + ''' + Predict whether the phase at the current time matches the target phase + + Parameters + ----------- + data : np.ndarray[float] | list[float] + The (n_samples,) array containing the unfiltered EEG data in the + current window + target_phase : float | int + The phase in degrees that the current phase should match + tolerance : float | int + The tolerance between the current phase and target phase. The + target phase has to be within `tolerance` degrees for this + method to return `True` + + Returns + -------- + current_phase_matches_target : bool + Whether the current phase matches the target phase with a given + tolerance + + ''' + _check_type(data, ['array']) + _check_type(target_phase, ['int', 'float']) + _check_type(tolerance, ['int', 'float']) + _check_array_dimensions(data, [(1,)]) + + edge = int((self.window_edge / 1000) * self.sampling_rate) + filtered_data = self._filter_data( + self.real_time_filter, data)[edge:-edge] + + ar_params, _ = yule_walker(filtered_data, self.ar_order) + forecasted_data = self._ar_forecast(filtered_data, ar_params, 2*edge) + full_data = np.concatenate([filtered_data, forecasted_data]) + + analytic_signal = signal.hilbert(full_data) + phase_t0 = np.angle(analytic_signal[-edge], deg=True) % 360 + + if target_phase % 360 - tolerance < 0: + return np.isclose(phase_t0, target_phase % 360, atol=tolerance) or\ + np.isclose(phase_t0, 360 - (target_phase % + 360), atol=tolerance) + else: + return np.isclose(phase_t0, target_phase % 360, atol=tolerance) diff --git a/EEGPhasePy/tests/__init__.py b/EEGPhasePy/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/EEGPhasePy/tests/__pycache__/__init__.cpython-39.pyc b/EEGPhasePy/tests/__pycache__/__init__.cpython-39.pyc deleted file mode 100644 index 7942cb9..0000000 Binary files a/EEGPhasePy/tests/__pycache__/__init__.cpython-39.pyc and /dev/null differ diff --git a/EEGPhasePy/tests/__pycache__/test_etp_estimator.cpython-39-pytest-8.3.4.pyc b/EEGPhasePy/tests/__pycache__/test_etp_estimator.cpython-39-pytest-8.3.4.pyc deleted file mode 100644 index 393e10e..0000000 Binary files a/EEGPhasePy/tests/__pycache__/test_etp_estimator.cpython-39-pytest-8.3.4.pyc and /dev/null differ diff --git a/EEGPhasePy/tests/test_etp_estimator.py b/EEGPhasePy/tests/test_etp_estimator.py deleted file mode 100644 index 7ddb7ce..0000000 --- a/EEGPhasePy/tests/test_etp_estimator.py +++ /dev/null @@ -1,131 +0,0 @@ -import pytest -import numpy as np -import scipy.signal as signal - - -from ..estimators import ETP - -real_time_filter_fir = signal.firwin(128, [8, 13], fs=2048, pass_zero=False) -real_time_filter_iir = signal.butter(3, [8, 13], btype="bandpass", fs=2048) - -ground_truth_filter_fir = signal.firwin(300, [8, 13], fs=2048, pass_zero=False) -ground_truth_filter_iir = signal.butter(3, [8, 13], btype="bandpass", fs=2048) - -param_to_prop_map = ["real_time_filter", "ground_truth_filter", "sampling_rate", "window_len", "window_edge"] - -def construct_etp_with_paramset(param_set): - ''' - Constructs the ETP class with passed in parameters and returns the constructed ETP object - - Parameters - ---------- - - param_set : array - Array of parameters in order to pass into ETP - - ------- - Returns - ------- - - ETP : the constructed ETP object - ''' - - if len(param_set) == 3: - return ETP(param_set[0], param_set[1], param_set[2]) - else: - return ETP(param_set[0], param_set[1], param_set[2], window_len=param_set[3], window_edge=param_set[4]) - -def construct_default_etp(): - ''' - Constructs an ETP class with default parameters - - ------- - Returns - ------- - - etp : ETP object - ''' - return ETP(real_time_filter_fir, ground_truth_filter_fir, 2048) - -def test_etp_construct(): - # check ETP fails with TypeError if invalid type passed in - # check ETP fails with ValueError if filter parameter array of wrong dimension - # check ETP constructs correctly with the same values - etp_parameter_sets = [ - [real_time_filter_fir, ground_truth_filter_fir, 2048], # pass - [real_time_filter_iir, ground_truth_filter_iir, 2048], # pass - [real_time_filter_fir, ground_truth_filter_fir, 2048.5], # fail - [real_time_filter_fir, 1, 2048], # fail - [1, ground_truth_filter_fir, 2048], # fail - [real_time_filter_fir, ground_truth_filter_fir, 2048, 1000, 80], # pass - [real_time_filter_fir, ground_truth_filter_fir, 2048, 1200.2, 60], # fail - [real_time_filter_fir, ground_truth_filter_fir, 2048, 1000, 74.3], # fail - [np.zeros((1, 1, 1)), ground_truth_filter_fir, 2048], # fail - [real_time_filter_iir, np.zeros((1, 1, 1)), 2048], # fail - ] - etp_parameter_outputs = [ - False, - False, - [TypeError, "Value must be an int type"], - [TypeError, "Value must be an array type"], - [TypeError, "Value must be an array type"], - False, - [TypeError, "Value must be an int type"], - [TypeError, "Value must be an int type"], - [ValueError, "The provided array has the wrong dimension. Arrays can have the following dimensions: 1D 2D"], - [ValueError, "The provided array has the wrong dimension. Arrays can have the following dimensions: 1D 2D"] - ] - - for i, param_set in enumerate(etp_parameter_sets): - if not type(etp_parameter_outputs[i]) == bool: - with pytest.raises(etp_parameter_outputs[i][0], match=r"" + etp_parameter_outputs[i][1] +""): - construct_etp_with_paramset(param_set) - else: - etp = construct_etp_with_paramset(param_set) - for j, prop in enumerate(param_to_prop_map[:len(param_set)]): - if hasattr(param_set[j], '__len__'): - assert (np.array(getattr(etp, prop)) == np.array(param_set[j])).all() - else: - assert getattr(etp, prop) == param_set[j] - -def test_etp_fit(): - # should fail if training data not 1D - # should fail if min_ipi not int - # should fail if training_data not arr - - etp_fit_parameters = [ - [np.sin(78.5*np.arange(0, 200, 1/2048)), 63], # pass - [1, 63], # fail - [np.zeros((100, 2)), 63], # fail - [np.zeros(100), 62.5] # fail - ] - - etp_fit_parameter_outputs = [ - False, - [TypeError, "Value must be an array type"], - [ValueError, "The provided array has the wrong dimension. Arrays can have the following dimensions: 1D"], - [TypeError, "Value must be an int type"] - ] - - for i, param_set in enumerate(etp_fit_parameters): - if not type(etp_fit_parameter_outputs[i]) == bool: - with pytest.raises(etp_fit_parameter_outputs[i][0], match=r"" + etp_fit_parameter_outputs[i][1] +""): - etp = construct_default_etp() - etp.fit(param_set[0], param_set[1]) - else: - etp = construct_default_etp() - etp.fit(param_set[0], param_set[1]) - assert etp.Tadj == pytest.approx(80, 2) # check if Tadj roughly equal to pre-defined period (80ms) - -def test_etp_predict(): - # Load test files - # Test if predict successfuly predicts with no errors - # Test if predict fails when given non-array - # Test if predict fails when given 2D array - # Test if predict fails when given non-float target_phase - pass - -def test_etp_predict_performance(): - # Load test files - # Test if predict performs as expected on test files mean == mean and std == std for phase - pass \ No newline at end of file diff --git a/EEGPhasePy/utils/__pycache__/__init__.cpython-39.pyc b/EEGPhasePy/utils/__pycache__/__init__.cpython-39.pyc deleted file mode 100644 index 1f587ce..0000000 Binary files a/EEGPhasePy/utils/__pycache__/__init__.cpython-39.pyc and /dev/null differ diff --git a/EEGPhasePy/utils/__pycache__/check.cpython-39.pyc b/EEGPhasePy/utils/__pycache__/check.cpython-39.pyc deleted file mode 100644 index a70728c..0000000 Binary files a/EEGPhasePy/utils/__pycache__/check.cpython-39.pyc and /dev/null differ diff --git a/EEGPhasePy/utils/check.py b/EEGPhasePy/utils/check.py index 3543827..e090320 100644 --- a/EEGPhasePy/utils/check.py +++ b/EEGPhasePy/utils/check.py @@ -1,62 +1,80 @@ import numpy as np + def _is_array(value): - ''' - Checks if a value is an array. \n - - Parameters - ---------- - value : any - Value to check - - ------- - Returns - ------- - bool - True if value is an array; False otherwise - ''' - return hasattr(value, '__len__') + ''' + Checks if a value is an array. \n + + Parameters + ---------- + value : any + Value to check + + Returns + ------- + bool + True if value is an array; False otherwise + ''' + return hasattr(value, '__len__') + def _check_array_dimensions(test_array, target_shape_structures): - ''' - Checks if two arrays share the same dimensions (i.e. both are 1D, 2D, 3D, etc). Raises value error if - array doesn't match target dimension - - Parameters - ---------- - - test_array : array_like - Array to test dimensions of - target_shape_structure : array_like - Array representing the target structures - ''' - np_test_array = np.array(test_array) - failed = True - for struct in target_shape_structures: - np_target_shape_structure = np.array(struct) - if len(np_test_array.shape) == len(np_target_shape_structure): - failed = False - - if failed: - raise ValueError("The provided array has the wrong dimension. Arrays can have the following dimensions:" + "".join([" %dD" % len(_struct) for _struct in target_shape_structures])) - -def _check_type(value, types): - ''' - Checks if value matches a specific type(s). Raises type error if value doesn't match type - - Parameters - ---------- - - value : any - Value to be checked - types : array of "array" | "int" | "float" - Type value should match. - ''' - - if types == "array" and not _is_array(value): - raise TypeError("Value must be an array type") - elif types == "int" and not isinstance(value, int): - raise TypeError("Value must be an int type") - elif types == "float" and not isinstance(value, float): - raise TypeError("Value must be a float type") - + ''' + Checks if two arrays share the same dimensions (i.e. both are 1D, 2D, 3D, + etc). Raises value error if array doesn't match target dimension + + Parameters + ---------- + test_array : array_like + Array to test dimensions of + target_shape_structure : array_like + Array representing the target structures + ''' + failed = True + for struct in target_shape_structures: + np_target_shape_structure = np.array(struct) + if len(np.shape(test_array)) == len(np_target_shape_structure): + failed = False + + if failed: + raise ValueError("The provided array has the wrong dimension. " + + "Arrays can have the following dimensions:" + + "".join([" %dD" % len(_struct) + for _struct in target_shape_structures])) + + +def _check_type(value: any, types: list): + ''' + Checks if value matches a specific type(s). Raises type error if value + doesn't match type + + Parameters + ---------- + + value : any + Value to be checked + types : array of "array" | "int" | "float" | "bool" + Type value should match. + ''' + + for i, type in enumerate(types): + if type == "array" and _is_array(value): + break + elif type == "int" and isinstance(value, int): + break + elif type == "float" and isinstance(value, float): + break + elif type == "bool" and isinstance(value, bool): + break + elif i + 1 >= len(types): + if len(types) == 1: + type_to_message_map = { + "array": "an array", + "int": "an int", + "float": "a float", + "bool": "a bool" + } + raise TypeError("Value must be " + + type_to_message_map[type] + " type") + else: + raise TypeError("Value must be one of: " + ' or '.join(types)) diff --git a/EEGPhasePy/viz.py b/EEGPhasePy/viz.py new file mode 100644 index 0000000..cad04e8 --- /dev/null +++ b/EEGPhasePy/viz.py @@ -0,0 +1,108 @@ +''' +EEGPhasePy visualization module + +The visualization module consists of helper functions for plotting +phase histograms and pre-trigger/post-trigger waveforms +''' +import numpy as np +import matplotlib.pyplot as plt + +from .utils.check import _check_type, _check_array_dimensions + + +def plot_polar_histogram(phase_data: np.ndarray[float | int], + bin_width=22.5) -> plt.Figure: + ''' + Plot the polar histogram for an array of phases + + Parameters + ----------- + phase_data : ndarray[float | int] + An array of floats containing the phases to construct a histogram out + of. Must be in radians + bar_width : float | int + The width of the bin in degrees + + Returns + -------- + figure : matplotlib.pyplot.Figure + The figure object for the polar histogram + ''' + _check_type(phase_data, ['array']) + _check_type(bin_width, ['int', 'float']) + + _check_array_dimensions(phase_data, [(1,)]) + + fig = plt.figure() + + deg_phase = np.rad2deg(phase_data) + degrees_full_circle = [phase % 360 for phase in deg_phase] + + bin_size = bin_width + a, b = np.histogram(degrees_full_circle, + bins=np.arange(0, 360+bin_size, bin_size)) + centers = np.deg2rad(np.ediff1d(b)//2 + b[:-1]) + + ax = fig.add_subplot(111, projection='polar') + ax.bar(centers, a, width=np.deg2rad(bin_size), bottom=0.0, alpha=0.8, + facecolor=(3/255, 148/255, 252/255), + edgecolor=(0/255, 98/255, 255/255), + linewidth=2) + ax.set_theta_zero_location("N") + ax.set_theta_direction(-1) + + return fig + + +def plot_waveform_average(waveforms: np.ndarray[float | int], + fs: int, + t_trigger: int, + show_std=True) -> plt.Figure: + ''' + Plot the average waveform and standard deviation as a highlight around the + mean waveform + + Parameters + ---------- + waveforms : array_like (n_trigger_segments, n_time) + A 2D array containing some pre-trigger and post-trigger waveform + content + fs : int + The sampling rate of the data + t_trigger : int + The sample number that corresponds to `t = 0` + show_std : bool + Defaults to True. Whether to show the standard deviation highlight + + Returns + -------- + waveform_figure : matplotlib.pyplot.Figure + The waveform figure object + ''' + _check_type(waveforms, ['array']) + _check_type(show_std, ['bool']) + + _check_array_dimensions(waveforms, [(1, 1)]) + + waveform_data = None + if np.max(waveforms) < 5e-5: + waveform_data = waveforms * 1e6 + else: + waveform_data = waveforms + + mean_waveform = np.mean(waveform_data, axis=0) + waveform_std = np.std(waveform_data, axis=0) + + time_start = (len(mean_waveform) - t_trigger) / fs + time_end = (len(mean_waveform) - fs*time_start) / fs + time_data = np.arange(-time_start, time_end, 1/fs) + + fig = plt.figure() + ax = fig.add_subplot(111) + ax.plot(time_data, mean_waveform, color=(0/255, 98/255, 255/255), alpha=1) + ax.fill_between(time_data, mean_waveform - waveform_std, mean_waveform + + waveform_std, alpha=0.2, color=(0/255, 98/255, 255/255)) + ax.set_xlabel("Time (s)") + ax.set_ylabel("Amplitude (μV)") + + return fig diff --git a/EEGPhasePy/viz/__init__.py b/EEGPhasePy/viz/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/LISCENCE.md b/LISCENCE.md new file mode 100644 index 0000000..8dc8b02 --- /dev/null +++ b/LISCENCE.md @@ -0,0 +1,11 @@ +Copyright 2025 EEGPhasePy Authors + +Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/docs/.gitignore b/docs/.gitignore new file mode 100644 index 0000000..0e73d88 --- /dev/null +++ b/docs/.gitignore @@ -0,0 +1,3 @@ +build +sg_execution_times.rst +gallery_examples \ No newline at end of file diff --git a/docs/Makefile b/docs/Makefile new file mode 100644 index 0000000..d0c3cbf --- /dev/null +++ b/docs/Makefile @@ -0,0 +1,20 @@ +# Minimal makefile for Sphinx documentation +# + +# You can set these variables from the command line, and also +# from the environment for the first two. +SPHINXOPTS ?= +SPHINXBUILD ?= sphinx-build +SOURCEDIR = source +BUILDDIR = build + +# Put it first so that "make" without argument is like "make help". +help: + @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) + +.PHONY: help Makefile + +# Catch-all target: route all unknown targets to Sphinx using the new +# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). +%: Makefile + @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) diff --git a/docs/examples/GALLERY_HEADER.rst b/docs/examples/GALLERY_HEADER.rst new file mode 100644 index 0000000..2b56f9d --- /dev/null +++ b/docs/examples/GALLERY_HEADER.rst @@ -0,0 +1,3 @@ +Gallery +---------- +These are some examples for how to use ``EEGPhasePy`` \ No newline at end of file diff --git a/docs/examples/ar-bayesian-optimization.py b/docs/examples/ar-bayesian-optimization.py new file mode 100644 index 0000000..78d89ce --- /dev/null +++ b/docs/examples/ar-bayesian-optimization.py @@ -0,0 +1,74 @@ +''' +Bayesian optimization of PHASTIMATE parameters +================================================ +''' + +# %% [markdown] +# It may be ideal in certain circumstances to systematically optimize the parameters involved in the PHASTIMATE toolbox. For instance, you may be dealing with a population known to contain a large degree of inter-individual variability in the SNR of your target frequency band. Thus, the PHASTIMATE parameters that work for one individual may not work well for another. You can optptimize the `window_edge` and `ar_order` parameters of PHASTIMATE to tailor the algorithm on an inter-individual basis. +# +# Below we showcase how this can be done in EEGPhasePy using Bayesian optimization. If you use this algorithm please cite the BayesianOptimization library (https://github.com/bayesian-optimization/BayesianOptimization) + +# %% +from EEGPhasePy.estimators import PHASTIMATE +import sys +import numpy as np +import scipy.signal as signal + +sys.path.append("../../..") + + +# %% [markdown] +# Generate the simulated training and testing EEG data + +# %% +fs = 2000 +time_data = np.arange(0, 90, 1/fs) + +training_signal_clean = np.sin(2 * np.pi * 10 * time_data) +training_signal = training_signal_clean + \ + np.random.normal(0, 2, len(time_data)) + +testing_signal_clean = np.sin(2 * np.pi * 10 * time_data) +testing_signal = testing_signal_clean + np.random.normal(0, 2, len(time_data)) + +# %% [markdown] +# Define the real-time and ground-truth alpha bandpass filters. +# +# The real-time filter should be lower order since it will be applied to a window with fewer samples than the ground truth filter. The ground truth filter is what will be used to extract the phase for phase stats (i.e. accuracy, mean and std of phase) as well as to generate the polar histogram and trigger waveform plots + +# %% +rt_filter = signal.firwin(120, [8, 12], fs=fs, pass_zero=False) +gt_filter = signal.firwin(300, [8, 12], fs=fs, pass_zero=False) + +# %% [markdown] +# Next, we will instantiate PHASTIMATE. + +# %% +phastimate = PHASTIMATE(rt_filter, gt_filter, fs) + +# %% [markdown] +# Now we will run the bayesian optimization algorithm using the `optimize_parameters` method with the `method` argument being set to bayesian. This method will update the `window_edge` and `ar_order` that result in the phase estimation accuracy + +# %% +phastimate.optimize_parameters(training_signal, 'bayesian') + +# %% +window_i = 2*fs +window_len = int(0.5*fs) +window_step = int(0.06*fs) # using a step of 60 ms + +triggers = [] + +while window_i + window_len < len(testing_signal): + window = testing_signal[window_i:window_i + window_len] + + if phastimate.predict(window, 0, 15): + triggers.append(window_i + window_len) + + window_i += window_step + +# sphinx_gallery_thumbnail_number = 1 +polar_hist_fig = phastimate.polar_histogram_from_triggers( + testing_signal, triggers) +waveform_fig = phastimate.plot_mean_std_waveform_from_triggers( + testing_signal, triggers, tmin=0.1, tmax=0.1) diff --git a/docs/examples/ar-genetic-optimization.py b/docs/examples/ar-genetic-optimization.py new file mode 100644 index 0000000..3a37e69 --- /dev/null +++ b/docs/examples/ar-genetic-optimization.py @@ -0,0 +1,74 @@ +''' +Genetic optimization of PHASTIMATE parameters +================================================ +''' + + +# %% [markdown] +# It may be ideal in certain circumstances to systematically optimize the parameters involved in the PHASTIMATE toolbox. For instance, you may be dealing with a population known to contain a large degree of inter-individual variability in the SNR of your target frequency band. Thus, the PHASTIMATE parameters that work for one individual may not work well for another. You can optptimize the `window_edge` and `ar_order` parameters of PHASTIMATE to tailor the algorithm on an inter-individual basis. +# +# Below we showcase how this can be done in EEGPhasePy using genetic optimization. If you use this algorithm please cite PyGAD (https://link.springer.com/article/10.1007/s11042-023-17167-y) + +# %% +from EEGPhasePy.estimators import PHASTIMATE +import sys +import numpy as np +import scipy.signal as signal + +sys.path.append("../../..") + + +# %% [markdown] +# Generate the simulated training and testing EEG data + +# %% +fs = 2000 +time_data = np.arange(0, 90, 1/fs) + +training_signal_clean = np.sin(2 * np.pi * 10 * time_data) +training_signal = training_signal_clean + \ + np.random.normal(0, 2, len(time_data)) + +testing_signal_clean = np.sin(2 * np.pi * 10 * time_data) +testing_signal = testing_signal_clean + np.random.normal(0, 2, len(time_data)) + +# %% [markdown] +# Define the real-time and ground-truth alpha bandpass filters. +# +# The real-time filter should be lower order since it will be applied to a window with fewer samples than the ground truth filter. The ground truth filter is what will be used to extract the phase for phase stats (i.e. accuracy, mean and std of phase) as well as to generate the polar histogram and trigger waveform plots + +# %% +rt_filter = signal.firwin(120, [8, 12], fs=fs, pass_zero=False) +gt_filter = signal.firwin(300, [8, 12], fs=fs, pass_zero=False) + +# %% [markdown] +# Next, we will instantiate PHASTIMATE. + +# %% +phastimate = PHASTIMATE(rt_filter, gt_filter, fs) + +# %% [markdown] +# Now we will run the genetic optimization algorithm using the `optimize_parameters` method with the `method` argument being set to genetic. This method will update the `window_edge` and `ar_order` that result in the phase estimation accuracy + +# %% +phastimate.optimize_parameters(training_signal, 'genetic') + +# %% +window_i = 2*fs +window_len = int(0.5*fs) +window_step = int(0.06*fs) # using a step of 60 ms + +triggers = [] + +while window_i + window_len < len(testing_signal): + window = testing_signal[window_i:window_i + window_len] + + if phastimate.predict(window, 0, 15): + triggers.append(window_i + window_len) + + window_i += window_step + +polar_hist_fig = phastimate.polar_histogram_from_triggers( + testing_signal, triggers) +waveform_fig = phastimate.plot_mean_std_waveform_from_triggers( + testing_signal, triggers, tmin=0.1, tmax=0.1) diff --git a/docs/examples/ar-phase-estimation.py b/docs/examples/ar-phase-estimation.py new file mode 100644 index 0000000..89d8fa7 --- /dev/null +++ b/docs/examples/ar-phase-estimation.py @@ -0,0 +1,144 @@ +''' +Alpha phase estimation using PHASTIMATE +================================================ +''' + + +# %% +from EEGPhasePy.estimators import PHASTIMATE +import sys +import numpy as np +import scipy.signal as signal + +sys.path.append("../../..") + + +# %% [markdown] +# Generate the simulated training and testing EEG data + +# %% +fs = 2000 +time_data = np.arange(0, 200, 1/fs) + +signal_clean = np.sin(2 * np.pi * 10 * time_data) +signal_noisy = signal_clean + np.random.normal(0, 2, len(time_data)) + +# %% [markdown] +# Define the real-time and ground-truth alpha bandpass filters. +# +# The real-time filter should be lower order since it will be applied to a window with fewer samples than the ground truth filter. The ground truth filter is what will be used to extract the phase for phase stats (i.e. accuracy, mean and std of phase) as well as to generate the polar histogram and trigger waveform plots + +# %% +rt_filter = signal.firwin(120, [8, 12], fs=fs, pass_zero=False) +gt_filter = signal.firwin(300, [8, 12], fs=fs, pass_zero=False) + +# %% [markdown] +# Next, we will instantiate PHASTIMATE. +# +# Unlike ETP, PHASTIMATE doesn't necessarily require training data. With the right parameters +# it can work out of the box. However, you may benefit from optimizing the `window_edge` and `ar_order` parameters in the class, especially +# when there is a variability in SNR across participants. + +# %% +phastimate = PHASTIMATE(rt_filter, gt_filter, fs, window_edge=35, ar_order=15) + +# %% [markdown] +# Now, we will run a pseudo-real-time simulation using the testing signal. Here, we use the `predict` method of the `phastimate` +# object. `predict` works by returning a boolean value about whether the current phase is your target phase. A tolerance value may also be +# passed in. By default tolerance is 5 degrees. Tolerance indicates how close the current phase must be to the target phase for `predict` +# to return `True`. + +# %% +window_i = 2*fs +window_len = int(0.5*fs) +window_step = int(0.06*fs) # using a step of 60 ms + +triggers = [] + +while window_i + window_len < len(signal_noisy): + window = signal_noisy[window_i:window_i + window_len] + + if phastimate.predict(window, 0, 15): + triggers.append(window_i + window_len) + + window_i += window_step + +# %% +polar_hist_fig = phastimate.polar_histogram_from_triggers( + signal_noisy, triggers) +waveform_fig = phastimate.plot_mean_std_waveform_from_triggers( + signal_noisy, triggers, tmin=0.1, tmax=0.1) + +phase_standard_deviation = phastimate.std_phase_from_triggers( + signal_noisy, triggers, degree=True) +phase_mean = phastimate.mean_phase_from_triggers( + signal_noisy, triggers, degree=True) +accuracy = phastimate.phase_accuracy_from_triggers(signal_noisy, triggers, 0) + +print("Mean phase: " + str(phase_mean)) +print("Std phase: " + str(phase_standard_deviation)) +print("Accuracy: " + str(100*accuracy)) + +# %% [markdown] +# As with ETP, PHASTIMATE can also quite easily detect different phases simply by changing the `target_phase` argument (which uses degrees) + +# %% +window_i = 2*fs +window_len = int(0.5*fs) +window_step = int(0.06*fs) # using a step of 60 ms + +triggers = [] + +while window_i + window_len < len(signal_noisy): + window = signal_noisy[window_i:window_i + window_len] + + if phastimate.predict(window, 270, 2): + triggers.append(window_i + window_len) + window_i += window_step + +polar_hist_fig = phastimate.polar_histogram_from_triggers( + signal_noisy, triggers) +waveform_fig = phastimate.plot_mean_std_waveform_from_triggers( + signal_noisy, triggers, tmin=0.05, tmax=0.05) + +phase_standard_deviation = phastimate.std_phase_from_triggers( + signal_noisy, triggers, degree=True) +phase_mean = phastimate.mean_phase_from_triggers( + signal_noisy, triggers, degree=True) +accuracy = phastimate.phase_accuracy_from_triggers(signal_noisy, triggers, 270) + +print("Mean phase: " + str(phase_mean)) +print("Std phase: " + str(phase_standard_deviation)) +print("Accuracy: " + str(100*accuracy)) + +# %% [markdown] +# Similarly, to target troughs, instead of peaks, we just change the `target_phase` to :math:`90\deg` + +# %% +window_i = 2*fs +window_len = int(0.5*fs) +window_step = int(0.06*fs) # using a step of 60 ms + +triggers = [] + +while window_i + window_len < len(signal_noisy): + window = signal_noisy[window_i:window_i + window_len] + + if phastimate.predict(window, 180, 5): + triggers.append(window_i + window_len) + window_i += window_step + +polar_hist_fig = phastimate.polar_histogram_from_triggers( + signal_noisy, triggers) +waveform_fig = phastimate.plot_mean_std_waveform_from_triggers( + signal_noisy, triggers, tmin=0.05, tmax=0.05) + +phase_standard_deviation = phastimate.std_phase_from_triggers( + signal_noisy, triggers, degree=True) +phase_mean = phastimate.mean_phase_from_triggers( + signal_noisy, triggers, degree=True) +accuracy = phastimate.phase_accuracy_from_triggers(signal_noisy, triggers, 180) + +print("Mean phase: " + str(phase_mean)) +print("Std phase: " + str(phase_standard_deviation)) +print("Accuracy: " + str(100*accuracy)) diff --git a/docs/examples/etp-based-phase-estimation.py b/docs/examples/etp-based-phase-estimation.py new file mode 100644 index 0000000..9797b34 --- /dev/null +++ b/docs/examples/etp-based-phase-estimation.py @@ -0,0 +1,147 @@ +''' +Alpha phase estimation using ETP +=================================== +''' + + +# %% +from EEGPhasePy.estimators import ETP +import sys +import numpy as np +import scipy.signal as signal + +sys.path.append("../../..") + + +# %% [markdown] +# Generate the simulated training and testing EEG data + +# %% +fs = 2000 +time_data = np.arange(0, 200, 1/fs) + +training_signal_clean = np.sin(2 * np.pi * 10 * time_data) +training_signal = training_signal_clean + \ + np.random.normal(0, 2, len(time_data)) + +testing_signal_clean = np.sin(2 * np.pi * 10 * time_data) +testing_signal = testing_signal_clean + np.random.normal(0, 2, len(time_data)) + +# %% [markdown] +# Define the real-time and ground-truth alpha bandpass filters. +# +# The real-time filter should be lower order since it will be applied to a window with fewer samples than the ground truth filter. The ground truth filter is what will be used to extract the phase for phase stats (i.e. accuracy, mean and std of phase) as well as to generate the polar histogram and trigger waveform plots + +# %% +rt_filter = signal.firwin(120, [8, 12], fs=fs, pass_zero=False) +gt_filter = signal.firwin(300, [8, 12], fs=fs, pass_zero=False) + +# %% [markdown] +# Construct the ETP class and fit it to the training data. We will use the default edge and window length for this example +# +# *A note on window length and edge effects:* +# +# When looking at other frequency bands that have more phase instability (as is the case with theta) or in cases of lower SNR, such as extracting phase data from the leisoned cortex of stroke patients, adjusting the window length and edge removed can significantly improve ETP performance. By increasing window length, more data is included in the sliding window allowing for a higher order filter to be used. This will help to more effectively extract the "signal" from "noise". However, increasing filter order will also increase the amount of data that is effected by filtering. Thus, you will need to increase the amount of signal removed from the edge of the window to compensate. This can potentially compromise ETP performance. It is important to balance all 3 of these factors + +# %% +etp = ETP(rt_filter, gt_filter, fs) +etp.fit(training_signal, min_ipi=int(fs * 1/12)) + +# %% [markdown] +# Now that ETP has estimated the optimal `Tadj` (inter-peak-interval), we can run pseudo-real-time simulations on the testing data +# +# All estimators in EEGPhasePy contain a `predict` method that takes the unfiltered sliding window data as an argument. `predict` filters the data with the provided real-time filter and removes the specified edge from the data. Across estimators the functionality and output can differ. With ETP, the `predict` method will output how much time from the moment `predict` was called, must pass before the target phase will occur. You can use this information to schedule your phase-triggered stimulus. +# +# In our pseudo-real-time simulations we will store the sample at which our target phase was predicted to occur so that we can quantify and visualize the performance of ETP + +# %% +window_i = 2*fs +window_len = int(0.5*fs) +window_step = int(0.06*fs) # using a step of 60 ms + +triggers = [] + +while window_i + window_len < len(testing_signal): + window = testing_signal[window_i:window_i + window_len] + + if window_i + window_len + etp.predict(window, 0) < len(testing_signal) - window_len: + triggers.append(window_i + window_len + etp.predict(window, 0)) + window_i += window_step + +# %% +polar_hist_fig = etp.polar_histogram_from_triggers(testing_signal, triggers) +waveform_fig = etp.plot_mean_std_waveform_from_triggers( + testing_signal, triggers, tmin=0.1, tmax=0.1) + +phase_standard_deviation = etp.std_phase_from_triggers( + testing_signal, triggers, degree=True) +phase_mean = etp.mean_phase_from_triggers( + testing_signal, triggers, degree=True) +accuracy = etp.phase_accuracy_from_triggers(testing_signal, triggers, 0) + +print("Mean phase: " + str(phase_mean)) +print("Std phase: " + str(phase_standard_deviation)) +print("Accuracy: " + str(100*accuracy)) + +# %% [markdown] +# Above, we estimated peaks using ETP. However, with ETP you can easily estimate any other phase by changing the `target_phase` argument to the desired phase in radians. Below we estimate alpha rising phase by simply changing `target_phase` to :math:`3\pi/2` + +# %% +window_i = 2*fs +window_len = int(0.5*fs) +window_step = int(0.06*fs) # using a step of 60 ms + +triggers = [] + +while window_i + window_len < len(testing_signal): + window = testing_signal[window_i:window_i + window_len] + + if window_i + window_len + etp.predict(window, 0) < len(testing_signal) - 2*window_len: + triggers.append(window_i + window_len + + etp.predict(window, (3/2)*np.pi)) + window_i += window_step + +polar_hist_fig = etp.polar_histogram_from_triggers(testing_signal, triggers) +waveform_fig = etp.plot_mean_std_waveform_from_triggers( + testing_signal, triggers, tmin=0.05, tmax=0.05) + +phase_standard_deviation = etp.std_phase_from_triggers( + testing_signal, triggers, degree=True) +phase_mean = etp.mean_phase_from_triggers( + testing_signal, triggers, degree=True) +accuracy = etp.phase_accuracy_from_triggers(testing_signal, triggers, 270) + +print("Mean phase: " + str(phase_mean)) +print("Std phase: " + str(phase_standard_deviation)) +print("Accuracy: " + str(100*accuracy)) + +# %% [markdown] +# Similarly, to target troughs, instead of peaks, we just change the `target_phase` to :math:`\pi` + +# %% +window_i = 2*fs +window_len = int(0.5*fs) +window_step = int(0.06*fs) # using a step of 60 ms + +triggers = [] + +while window_i + window_len < len(testing_signal): + window = testing_signal[window_i:window_i + window_len] + + if window_i + window_len + etp.predict(window, 0) < len(testing_signal) - 2*window_len: + triggers.append(window_i + window_len + etp.predict(window, np.pi)) + window_i += window_step + +polar_hist_fig = etp.polar_histogram_from_triggers(testing_signal, triggers) +waveform_fig = etp.plot_mean_std_waveform_from_triggers( + testing_signal, triggers, tmin=0.05, tmax=0.05) + +phase_standard_deviation = etp.std_phase_from_triggers( + testing_signal, triggers, degree=True) +phase_mean = etp.mean_phase_from_triggers( + testing_signal, triggers, degree=True) +accuracy = etp.phase_accuracy_from_triggers(testing_signal, triggers, 180) + +print("Mean phase: " + str(phase_mean)) +print("Std phase: " + str(phase_standard_deviation)) +print("Accuracy: " + str(100*accuracy)) diff --git a/docs/examples/polar-histogram.py b/docs/examples/polar-histogram.py new file mode 100644 index 0000000..162a42e --- /dev/null +++ b/docs/examples/polar-histogram.py @@ -0,0 +1,30 @@ +''' +Polar histograms +================= +''' + + +# %% +import EEGPhasePy.viz as viz +import sys +import numpy as np +import scipy.signal as signal + +sys.path.append("../../..") + + +# %% +fs = 2000 +time_data = np.arange(0, 200, 1/fs) + +signal_clean = np.sin(2 * np.pi * 10 * time_data) + +# %% [markdown] +# Here, we identify the location of the peaks in our signal and add some random gaussian noise to the phase data of our peaks to simulate the results of real phase estimation. The `viz.plot_polar_histogram` method requires only that the phase data be passed into it + +# %% +peaks = signal.find_peaks(signal_clean)[0] +phase_data = np.angle(signal.hilbert(signal_clean)) + +polar_hist = viz.plot_polar_histogram( + phase_data[peaks] + np.random.normal(0, 0.7, len(peaks))) diff --git a/docs/make.bat b/docs/make.bat new file mode 100644 index 0000000..747ffb7 --- /dev/null +++ b/docs/make.bat @@ -0,0 +1,35 @@ +@ECHO OFF + +pushd %~dp0 + +REM Command file for Sphinx documentation + +if "%SPHINXBUILD%" == "" ( + set SPHINXBUILD=sphinx-build +) +set SOURCEDIR=source +set BUILDDIR=build + +%SPHINXBUILD% >NUL 2>NUL +if errorlevel 9009 ( + echo. + echo.The 'sphinx-build' command was not found. Make sure you have Sphinx + echo.installed, then set the SPHINXBUILD environment variable to point + echo.to the full path of the 'sphinx-build' executable. Alternatively you + echo.may add the Sphinx directory to PATH. + echo. + echo.If you don't have Sphinx installed, grab it from + echo.https://www.sphinx-doc.org/ + exit /b 1 +) + +if "%1" == "" goto help + +%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% +goto end + +:help +%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% + +:end +popd diff --git a/docs/requirements.txt b/docs/requirements.txt new file mode 100644 index 0000000..ca8e7a7 --- /dev/null +++ b/docs/requirements.txt @@ -0,0 +1,4 @@ +sphinxcontrib-bibtex +sphinx-rtd-theme +nbsphinx +sphinx-gallery \ No newline at end of file diff --git a/docs/source/api.rst b/docs/source/api.rst new file mode 100644 index 0000000..fb14acc --- /dev/null +++ b/docs/source/api.rst @@ -0,0 +1,24 @@ +API Reference +=================== + +Here you can find the API reference for EEGPhasePy. + +Estimators +---------------- +.. autoclass:: EEGPhasePy.estimators.Estimator + :members: + :inherited-members: + +.. autoclass:: EEGPhasePy.estimators.ETP + :members: + :inherited-members: + +.. autoclass:: EEGPhasePy.estimators.PHASTIMATE + :members: + :inherited-members: + + +Visualization +------------------ +.. automodule:: EEGPhasePy.viz + :members: \ No newline at end of file diff --git a/docs/source/conf.py b/docs/source/conf.py new file mode 100644 index 0000000..c3e84b5 --- /dev/null +++ b/docs/source/conf.py @@ -0,0 +1,54 @@ +# Configuration file for the Sphinx documentation builder. +# +# For the full list of built-in configuration values, see the documentation: +# https://www.sphinx-doc.org/en/master/usage/configuration.html + +# -- Project information ----------------------------------------------------- +# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information + +import os +import sys +sys.path.insert(0, os.path.abspath("../../")) + +project = 'EEGPhasePy' +copyright = '2026, EEGPhasePy Authors' +author = 'EEGPhasePy Authors' + +# -- General configuration --------------------------------------------------- +# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration +extensions = [ + 'sphinx.ext.autodoc', + 'sphinx.ext.napoleon', + 'sphinxcontrib.bibtex', + 'sphinx.ext.intersphinx', + 'sphinx.ext.doctest', + 'sphinx_gallery.gen_gallery' +] + +intersphinx_mapping = { + 'python': ('https://docs.python.org/3/', None), + 'numpy': ('https://numpy.org/doc/stable/', None), + "matplotlib": ("https://matplotlib.org/stable/", None), + 'scipy': ('https://docs.scipy.org/doc/scipy/', None) +} + +sphinx_gallery_conf = { + 'examples_dirs': '../examples', # path to your example scripts + 'gallery_dirs': 'gallery_examples', # path to where to save gallery + 'ignore_pattern': r"(^|/)(GALLERY_HEADER\.rst|__init__\.py)$", +} + +# autosummary_generate = True +autodoc_typehints = "description" + +templates_path = ['_templates'] +exclude_patterns = [] + +bibtex_bibfiles = ['references.bib'] +bibtext_default_style = 'unsrt' + +# -- Options for HTML output ------------------------------------------------- +# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output + +html_theme = 'sphinx_rtd_theme' +html_static_path = ['_static'] diff --git a/docs/source/index.rst b/docs/source/index.rst new file mode 100644 index 0000000..1748af0 --- /dev/null +++ b/docs/source/index.rst @@ -0,0 +1,45 @@ +EEGPhasePy documentation +======================== + +Tookit for developing and analyzing real-time and pseudo-real-time EEG phase estimation. + +Getting started +---------------- +To get started with EEGPhasePy, you will first need to install the packaeg from PyPi by running the command: + +.. code-block:: Python + :caption: Install EEGPhasePy + + pip install eegphasepy + +Next, look through our usage guide for phase estimation. We recommend beginning with ETP phase estimation first. + + +Usage +---------------- +.. toctree:: + :maxdepth: 3 + + usage/etp + usage/ar + usage/optimization + usage/plots + usage/stats + + +Gallery +--------- +.. minigallery:: ../examples/*.py + +Reference +---------------- +.. toctree:: + :maxdepth: 3 + + api + +Bibliography +------------------ +.. bibliography:: references.bib + :style: unsrt + :all: \ No newline at end of file diff --git a/docs/source/references.bib b/docs/source/references.bib new file mode 100644 index 0000000..3291518 --- /dev/null +++ b/docs/source/references.bib @@ -0,0 +1,108 @@ +@ARTICLE{Shirinpour2020-ef, + title = "Experimental evaluation of methods for real-time {EEG} + phase-specific transcranial magnetic stimulation", + author = "Shirinpour, Sina and Alekseichuk, Ivan and Mantell, Kathleen and + Opitz, Alexander", + abstract = "OBJECTIVE: Real-time approaches for transcranial magnetic + stimulation (TMS) based on a specific EEG phase are a promising + avenue for more precise neuromodulation interventions. However, + optimal approaches to reliably extract the EEG phase in a + frequency band of interest to inform TMS are still to be + identified. Here, we implement a new real-time phase detection + method for closed-loop EEG-TMS for robust phase extraction. We + compare this algorithm with state-of-the-art methods and + evaluate its performance both in silico and experimentally. + APPROACH: We propose a new robust algorithm (Educated Temporal + Prediction) for delivering real-time EEG phase-specific + stimulation based on short prerecorded EEG training data. This + method estimates the interpeak period from a training period and + applies a bias correction to predict future peaks. We compare + the accuracy and computation speed of the ETP algorithm with two + existing methods (Fourier based, Autoregressive Prediction) + using prerecorded resting EEG data and real-time experiments. + MAIN RESULTS: We found that Educated Temporal Prediction + performs with higher accuracy than Fourier-based or + Autoregressive methods both in silico and in vivo while being + computationally more efficient. Further, we document the + dependency of the EEG signal-to-noise ratio (SNR) on algorithm + accuracy across all algorithms. SIGNIFICANCE: Our results give + important insights for real-time EEG-TMS technical development + as well as experimental design. Due to its robustness and + computational efficiency, our method can find broad use in + experimental research or clinical applications. Through open + sharing of code for all three methods, we enable broad access of + TMS-EEG real-time algorithms to the community.", + journal = "J. Neural Eng.", + publisher = "IOP Publishing", + volume = 17, + number = 4, + pages = "046002", + month = jul, + year = 2020, + keywords = "Electroencephalography; Real-time brain stimulation; + Transcranial magnetic stimulation", + copyright = "http://iopscience.iop.org/page/copyright", + language = "en" +} +@ARTICLE{Zrenner2020-zb, + title = "The shaky ground truth of real-time phase estimation", + author = "Zrenner, Christoph and Galevska, Dragana and Nieminen, Jaakko O + and Baur, David and Stefanou, Maria-Ioanna and Ziemann, Ulf", + abstract = "Instantaneous phase of brain oscillations in + electroencephalography (EEG) is a measure of brain state that is + relevant to neuronal processing and modulates evoked responses. + However, determining phase at the time of a stimulus with + standard signal processing methods is not possible due to the + stimulus artifact masking the future part of the signal. Here, + we quantify the degree to which signal-to-noise ratio and + instantaneous amplitude of the signal affect the variance of + phase estimation error and the precision with which ``ground + truth'' phase is even defined, using both the variance of + equivalent estimators and realistic simulated EEG data with + known synthetic phase. Necessary experimental conditions are + specified in which pre-stimulus phase estimation is meaningfully + possible based on instantaneous amplitude and signal-to-noise + ratio of the oscillation of interest. An open source toolbox is + made available for causal (using pre-stimulus signal only) phase + estimation along with a EEG dataset consisting of recordings + from 140 participants and a best practices workflow for + algorithm optimization and benchmarking. As an illustration, + post-hoc sorting of open-loop transcranial magnetic stimulation + (TMS) trials according to pre-stimulus sensorimotor $\mu$-rhythm + phase is performed to demonstrate modulation of corticospinal + excitability, as indexed by the amplitude of motor evoked + potentials.", + journal = "Neuroimage", + publisher = "Elsevier BV", + volume = 214, + number = 116761, + pages = "116761", + month = jul, + year = 2020, + keywords = "Brain state; EEG; EEG-TMS; Estimator; Oscillation; Phase; + Real-time; TMS", + copyright = "http://creativecommons.org/licenses/by-nc-nd/4.0/", + language = "en" +} +@ARTICLE{Zrenner2018-hh, + title = "Real-time {EEG-defined} excitability states determine efficacy + of {TMS-induced} plasticity in human motor cortex", + author = "Zrenner, Christoph and Desideri, Debora and Belardinelli, Paolo + and Ziemann, Ulf", + journal = "Brain Stimul.", + publisher = "Elsevier BV", + volume = 11, + number = 2, + pages = "374--389", + month = mar, + year = 2018, + language = "en" +} +@article{gad2023pygad, + title={Pygad: An intuitive genetic algorithm python library}, + author={Gad, Ahmed Fawzy}, + journal={Multimedia Tools and Applications}, + pages={1--14}, + year={2023}, + publisher={Springer} +} \ No newline at end of file diff --git a/docs/source/usage/ar.rst b/docs/source/usage/ar.rst new file mode 100644 index 0000000..866b73f --- /dev/null +++ b/docs/source/usage/ar.rst @@ -0,0 +1,108 @@ +Autoregressive phase estimation +================================= + +Background +------------ +Autoregressive (AR) phase estimation is a form of phase estimation that uses autoregressive forecasting to fill in the data impacted by +filter edge effects and tens of milliseconds ahead of the current point in time, to account for Hilbert transform edge effects. AR was first introduced +by :cite:t:`Zrenner2018-hh` and a toolbox for AR phase estimation known as PHASTIMATE was published by :cite:t:`Zrenner2020-zb` + +AR is best used in environments that make gaurentees about communication delays as foreward forecasting beyond the current point in time significantly reduces +AR phase estimation performance. Further, to the best of our knowledge, AR has not been applied as a phase estimation algorithm in environments +where delay gaurentees were made. It is also important to note that AR requires a high packet send rate from your EEG amplifier. This is due to the +algorithm estimating the current phase. Considering phase changes rapidly across the vast majority of EEG frequencies (except for low-delta) +if the algorithm is only run a couple of times per second, it will very rarely encounter a sliding window whose end contains the desired phase. +The first application of AR ran the algorithm at a frequency of 500Hz (i.e. 500 sliding windows passed into the algorithm / second) :cite:p:`Zrenner2018-hh`. + + +Usage +--------- +AR phase estimation is implemented in the :py:class:`EEGPhasePy.estimators.PHASTIMATE` class. If you use this class +please cite :cite:t:`Zrenner2020-zb`. We'll start off by importing the PHASTIMATE class, numpy to simulate EEG data +and scipy.signal for filtering. + +.. doctest:: + :hide: + :pyversion: == 3.12 + +.. testsetup:: * + import numpy as np + import scipy.signal as signal + + from EEGPhasePy.estimators import PHASTIMATE + +.. code-block:: Python + :caption: Imports + + import numpy as np + import scipy.signal as signal + + from EEGPhasePy.estimators import PHASTIMATE + +Next, we'll create one 200s long signal simulating the human alpha rhythym (assuming 10hz here) with some added gaussian noise. + +.. testcode:: + + fs = 2000 + time_data = np.arange(0, 200, 1/fs) + + signal_clean = np.sin(2 * np.pi * 10 * time_data) + signal_noisy = training_signal_clean + np.random.normal(0, 2, len(time_data)) + +After our signals have been created, we need to construct our real-time and ground-truth filter. For simplicity, +we have assumed that a higher order filter will allow us to effectively obtain ground truth data for most of the signal. +The purpose of the ground-truth filter is to obtain the true phase. This filter would be used when computing +phase stats (e.g. circular mean and std) based on triggers. The real-time filter is used to filter the raw window +of EEG data passed into the :py:meth:`EEGPhasePy.estimators.PHASTIMATE.predict` method. + +.. note:: A note on the "ground truth" filter + + We use ground-truth here quite loosely. It is very challenging to + obtain a true "ground-truth" for EEG phase. If you're interested in understanding + more of the nuance associated with this, check out :cite:t:`Zrenner2020-zb`. + +.. testcode:: + + rt_filter = signal.firwin(120, [8, 12], fs=fs, pass_zero=False) + gt_filter = signal.firwin(300, [8, 12], fs=fs, pass_zero=False) + +Next, we will instantiate :py:class:`EEGPhasePy.estimators.PHASTIMATE`. + +Unlike ETP, PHASTIMATE doesn't necessarily require training data. With the right parameters +it can work out of the box. However, you may benefit from optimizing the ``window_edge`` and ``ar_order`` parameters in the class, especially +when there is variability in SNR across participants. Our implementation of the PHASTIMATE toolbox includes genetic optimization in a similar +manner as what :cite:t:`Zrenner2020-zb` proposed along with a Bayesian Optimization of these parameters. To run an optimization of these parameters +additional resting-state EEG data would need to be collected, similar to ETP. We have included examples of both optimization approaches at the +bottom of this page. + +.. testcode:: + + phastimate = PHASTIMATE(rt_filter, gt_filter, fs) + +Now, we will run a pseudo-real-time simulation using the testing signal. Here, we use :py:meth:`EEGPhasePy.estimators.PHASTIMATE.predict` +object. :py:meth:`EEGPhasePy.estimators.PHASTIMATE.predict` works by returning a boolean value about whether the current phase is your target phase. A tolerance value may also be +passed in. By default tolerance is 5 degrees. Tolerance indicates how close the current phase must be to the target phase for :py:meth:`EEGPhasePy.estimators.PHASTIMATE.predict` +to return ``True``. + +.. testcode:: + + window_start = 2*fs + window_len = int(0.5*fs) + window_step = int(0.06*fs) # using a step of 60 ms + + triggers = [] + + while window_start + window_len < len(signal_noisy): + window = signal_noisy[window_start:window_start + window_len] + + if phastimate.predict(window, 0, 15): + triggers.append(window_start + window_len) + + window_start += window_step + + polar_hist_fig = phastimate.polar_histogram_from_triggers(signal_noisy, triggers) + waveform_fig = phastimate.plot_mean_std_waveform_from_triggers(signal_noisy, triggers, tmin=0.1, tmax=0.1) + +Examples +---------- +.. minigallery:: ../examples/ar-* \ No newline at end of file diff --git a/docs/source/usage/etp.rst b/docs/source/usage/etp.rst new file mode 100644 index 0000000..3022b2b --- /dev/null +++ b/docs/source/usage/etp.rst @@ -0,0 +1,102 @@ +ETP-based phase estimation +=========================== + +Background +------------ +Educated Temporal Prediction (ETP) is a form of phase estimation that uses the time of the latest peak in the current time window and the average +inter-peak-interval to predict the next time the desired phase will occur at. ETP was first described by :cite:t:`Shirinpour2020-ef`, so please cite +:cite:t:`Shirinpour2020-ef` when using the :py:class:`EEGPhasePy.estimators.ETP` class of EEGPhasePy. + +The primary benefit of ETP over other phase estimation methods is its ability to work in environments with large and uncertain (within limits) +delays. This is especially useful considering most EEG amplifiers don't make gaurentees about communication delays and the computer running ETP in real-time will likely suffer from +jitter in scheduling your phase triggered stimulus (e.g. `transcranial electrical stimulation (tES) `_ +or `transcranial magnetic stimulation (TMS) `_). ETP allows you to account for these delays while suffering less of an accuracy +dip compared to other phase estimation algorithms + + +Usage +--------- +Using ETP is quite simple. We'll start off by importing the ETP class, :mod:`numpy` to simulate EEG data and :mod:`scipy.signal` for filtering. + +.. doctest:: + :hide: + :pyversion: == 3.12 + +.. testsetup:: * + import numpy as np + import scipy.signal as signal + + from EEGPhasePy.estimators import ETP + +.. code-block:: Python + :caption: Imports + + import numpy as np + import scipy.signal as signal + + from EEGPhasePy.estimators import ETP + +Next, we'll create two 200 s long signals simulating the human alpha rhythym (assuming 10 Hz here) and adding in some gaussian noise. +Our training signal will be used to fit the ETP algorithm and then we will test it on the testing signal. + +.. testcode:: + + fs = 2000 + time_data = np.arange(0, 200, 1/fs) + + training_signal_clean = np.sin(2 * np.pi * 10 * time_data) + training_signal = training_signal_clean + np.random.normal(0, 2, len(time_data)) + + testing_signal_clean = np.sin(2 * np.pi * 10 * time_data) + testing_signal = testing_signal_clean + np.random.normal(0, 2, len(time_data)) + +After our signals have been created, we need to construct our real-time and ground-truth filter. For simplicity, +we have assumed that a higher order filter will allow us to effectively obtain ground truth data for most of the signal. +The purpose of the ground-truth filter is to obtain the true phase. This filter would be used when training ETP and for +computing phase stats (e.g. circular mean and std) based on triggers. The real-time filter is used to filter the raw window +of EEG data passed into the `predict` method. + +.. note:: A note on the "ground truth" filter + + We use ground-truth here quite loosely. It is very challenging to + obtain a true "ground-truth" for EEG phase. If you're interested in understanding + more of the nuance associated with this, check out :cite:t:`Zrenner2020-zb`. + +.. testcode:: + + rt_filter = signal.firwin(120, [8, 12], fs=fs, pass_zero=False) + gt_filter = signal.firwin(300, [8, 12], fs=fs, pass_zero=False) + +Next, we will instantiate ETP and fit it to our training data. The `min_ipi` parameter specifies the shortest +time between peaks the algorithm should expect. This is used to limit the effect phase-slips or phase-resets have +on fitting ETP. + +.. testcode:: + + etp = ETP(rt_filter, gt_filter, fs) + etp.fit(training_signal, min_ipi=int(fs * 1/12)) + +Then, we will run a pseudo-real-time simulation using the testing signal. Here, we use the `predict` method of the `etp` +object. `predict` works by returning the next sample your target phase will occur at. + +.. testcode:: + + window_start = 0 + window_len = int(0.5*fs) + window_step = int(0.06*fs) # using a step of 60 ms + + triggers = [] + + while window_start + window_len < len(testing_signal): + window = testing_signal[window_start:window_start + window_len] + + if window_start + window_len + etp.predict(window, 0) < len(testing_signal) - window_len: + triggers.append(window_start + window_len + etp.predict(window, 0)) + window_start += window_step + + polar_hist_fig = etp.polar_histogram_from_triggers(testing_signal, triggers) + waveform_fig = etp.plot_mean_std_waveform_from_triggers(testing_signal, triggers, tmin=0.1, tmax=0.1) + +Examples +---------- +.. minigallery:: ../examples/etp-* \ No newline at end of file diff --git a/docs/source/usage/optimization.rst b/docs/source/usage/optimization.rst new file mode 100644 index 0000000..8f6022f --- /dev/null +++ b/docs/source/usage/optimization.rst @@ -0,0 +1,14 @@ +Optimizing PHASTIMATE parameters +========================================================================= + +The :py:meth:`EEGPhasePy.estimators.PHASTIMATE` parameters of ``window_edge`` and ``ar_order`` significantly impact the performance of :py:meth:`EEGPhasePy.estimators.PHASTIMATE` performance. +In certain cases, such as when dealing with high inter-individual variability or working with a frequency band that is not commonly studied, using +an optimization algorithm to select these parameters is ideal. We have implemented 2 approaches for optimizing these parameters. + +Genetic Optimization +-------------------- +.. minigallery:: ../examples/ar-genetic-* + +Bayesian Optiization +--------------------- +.. minigallery:: ../examples/ar-bayesian-* \ No newline at end of file diff --git a/docs/source/usage/plots.rst b/docs/source/usage/plots.rst new file mode 100644 index 0000000..c811d4c --- /dev/null +++ b/docs/source/usage/plots.rst @@ -0,0 +1,61 @@ +Plotting +======================== + +EEGPhasePy comes with 2 plotting helper methods. We provide helper functions to plot the polar histogram given a set of trigger samples and +to plot the pre/post-trigger waveform average with standard deviation highlights. Each plotting method returns its :py:mod:`matplotlib` figure object. + +All estimators contain stats and plotting helper methods. We will describe how the plotting methods in EEGPhasePy can be used +with and without an estimator. + +Polar histogram +----------------- +Assuming you have an array, ``triggers``, that contains the sample values at which your target phase was triggered at +you can plot a polar histogram using an existing estimator with the code below: + +.. doctest:: + :hide: + :pyversion: == 3.12 + + +.. code-block:: Python + :caption: Plotting polar histogram + + polar_hist_fig = etp.polar_histogram_from_triggers(testing_signal, triggers) + +If you want to plot a polar histogram without creating an estimator, you can do so by using the :py:mod:`EEGPhasePy.viz` module. See the example code below: + +.. testcode:: + + import numpy as np + import scipy.signal as signal + + import EEGPhasePy.viz as viz + + fs = 2000 + time_data = np.arange(0, 200, 1/fs) + + signal_clean = np.sin(2 * np.pi * 10 * time_data) + + peaks = signal.find_peaks(signal_clean)[0] + phase_data = np.angle(signal.hilbert(signal_clean)) + + polar_hist = viz.plot_polar_histogram(phase_data[peaks] + np.random.normal(0, 0.7, len(peaks))) + + +Waveform average +------------------ +Plotting the average waveform with standard deivation highlights follows similar logic to the polar histogram. With the waveform average +however, you must specify how much time before and after the stimulus your plot should show. The time before stimulus delievery can be specified +using ``tmin`` (always positive, larger values will encompass a greater amount of pre-stimulus time) and ``tmax``. You can +plot the pre/post-stimulus waveform given an array of trigger samples and a pre-exisitng estimator using the code below: + +.. code-block:: Python + :caption: Plotting waveform average + + waveform_fig = etp.plot_mean_std_waveform_from_triggers(testing_signal, triggers, tmin=0.1, tmax=0.1) + +Unlike the polar histogram, you can only plot waveforms using an existing estimator. + +Examples +---------- +.. minigallery:: ../examples/*-phase-estimation.py diff --git a/docs/source/usage/stats.rst b/docs/source/usage/stats.rst new file mode 100644 index 0000000..743747d --- /dev/null +++ b/docs/source/usage/stats.rst @@ -0,0 +1,34 @@ +Phase estimation statistics +==================================== + +When performing phase estimation, you will likely need to quantify one or more of mean, standard deviation and accuracy. All estimators +contain helper methods for automatically computing mean, standard deviation and accuracy of triggers to a target phase. + +Mean and standard deviation +----------------------------- +Mean and standard deviation (std) are computed using :py:mod:`scipy.stats` circular statistics. To compute the mean and standard deviation of phase +given a list of trigger samples, you can use the :py:meth:`EEGPhasePy.estimators.Estimator.mean_phase_from_triggers` and :py:meth:`EEGPhasePy.estimators.Estimator.std_phase_from_triggers`. You can specify radian or degree output +when calling each method. See below: + +.. code-block:: Python + :caption: Computing mean and std using etp + + phase_standard_deviation = etp.std_phase_from_triggers(testing_signal, triggers, degree=True) + phase_mean = etp.mean_phase_from_triggers(testing_signal, triggers, degree=True) + + +Accuracy +--------- +Accuracy is defined as 50% begin completely random, 0% begin completely opposite to the target phase and 100% being exactly at the target phase. +We calculate accuracy based no the equation described by :cite:t:`Shirinpour2020-ef`. You can specify the target phase when you call the :py:meth:`EEGPhasePy.estimators.Estimator.phase_accuracy_from_triggers` +method: + +.. code-block:: Python + :caption: Computing accuracy to target phase from triggers + + accuracy = etp.phase_accuracy_from_triggers(testing_signal, triggers, 0) + + +Examples +---------- +.. minigallery:: ../examples/polar-histogram.py \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..1353400 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,34 @@ +[build-system] +requires = ["setuptools >= 77.0.3"] +build-backend = "setuptools.build_meta" + +[project] +name = "eegphasepy" +version = "0.0.5" +authors = [ + { name="Ameer Hamoodi", email="hamoodia@mcmaster.ca" }, + { name="Christian Brodbeck", email="brodbecc@mcmaster.ca"}, + { name="Mustaali Hussain", email="hussam55@mcmaster.ca"}, + { name="Aimee Nelson", email="nelsonaj@mcmaster.ca"} +] +description = "A toolkit for EEG phase estimation" +readme = "README.md" +requires-python = ">=3.9" +classifiers = [ + "Programming Language :: Python :: 3", + "Operating System :: OS Independent", +] +license = "BSD-3-Clause" +license-files = ["LICENCSE"] +dependencies = [ + "matplotlib >= 3.8", + "numpy >= 1.26,<3", + "scipy >= 1.11", + "statsmodels >= 0.14", + "pygad >= 3.5.0", + "bayesian-optimization >= 3.1.0" +] + +[project.urls] +Homepage = "https://github.com/AmeerHamoodi/EEGPhasePy" +Issues = "https://github.com/AmeerHamoodi/EEGPhasePy/issues" \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..4d2cb5a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +matplotlib >= 3.8 +numpy >= 1.26,<3 +scipy >= 1.11 +statsmodels >= 0.14 +pygad >= 3.5.0 +bayesian-optimization >= 3.1.0 \ No newline at end of file diff --git a/EEGPhasePy/performance/__init__.py b/tests/__init__.py similarity index 100% rename from EEGPhasePy/performance/__init__.py rename to tests/__init__.py diff --git a/tests/test_base_estimator.py b/tests/test_base_estimator.py new file mode 100644 index 0000000..b3f5e97 --- /dev/null +++ b/tests/test_base_estimator.py @@ -0,0 +1,152 @@ +from EEGPhasePy.estimators import Estimator +import pytest +import numpy as np +import scipy.signal as signal + +from EEGPhasePy.utils import check + +real_time_filter_fir = signal.firwin(128, [8, 13], fs=2048, pass_zero=False) +real_time_filter_iir = signal.butter(3, [8, 13], btype="bandpass", fs=2048) + +ground_truth_filter_fir = signal.firwin(300, [8, 13], fs=2048, pass_zero=False) +ground_truth_filter_iir = signal.butter(3, [8, 13], btype="bandpass", fs=2048) + +param_to_prop_map = ["real_time_filter", "ground_truth_filter", + "sampling_rate", "window_len", "window_edge"] + + +def construct_estimator_with_paramset(param_set): + ''' + Constructs the Estimator class with passed in parameters and returns the constructed Estimator object + + Parameters + ---------- + + param_set : array + Array of parameters in order to pass into Estimator + + ------- + Returns + ------- + + estimator : the constructed Estimator object + ''' + + if len(param_set) == 3: + return Estimator(param_set[0], param_set[1], param_set[2]) + else: + return Estimator(param_set[0], param_set[1], param_set[2], window_len=param_set[3], window_edge=param_set[4]) + + +def test_estimator_construct(): + estimator_parameter_sets = [ + [real_time_filter_fir, ground_truth_filter_fir, 2048], # pass + [real_time_filter_iir, ground_truth_filter_iir, 2048], # pass + [real_time_filter_fir, ground_truth_filter_fir, 2048.5], # fail + [real_time_filter_fir, 1, 2048], # fail + [1, ground_truth_filter_fir, 2048], # fail + [real_time_filter_fir, ground_truth_filter_fir, 2048, 1000, 80], # pass + [real_time_filter_fir, ground_truth_filter_fir, 2048, 1200.2, 60], # fail + [real_time_filter_fir, ground_truth_filter_fir, 2048, 1000, 74.3], # fail + [np.zeros((1, 1, 1)), ground_truth_filter_fir, 2048], # fail + [real_time_filter_iir, np.zeros((1, 1, 1)), 2048], # fail + ] + estimator_parameter_outputs = [ + False, + False, + [TypeError, "Value must be an int type"], + [TypeError, "Value must be an array type"], + [TypeError, "Value must be an array type"], + False, + [TypeError, "Value must be an int type"], + [TypeError, "Value must be an int type"], + [ValueError, "The provided array has the wrong dimension. Arrays can have the following dimensions: 1D 2D"], + [ValueError, "The provided array has the wrong dimension. Arrays can have the following dimensions: 1D 2D"] + ] + + for i, param_set in enumerate(estimator_parameter_sets): + if not type(estimator_parameter_outputs[i]) == bool: + with pytest.raises(estimator_parameter_outputs[i][0], match=r"" + estimator_parameter_outputs[i][1] + ""): + construct_estimator_with_paramset(param_set) + else: + etp = construct_estimator_with_paramset(param_set) + for j, prop in enumerate(param_to_prop_map[:len(param_set)]): + if hasattr(param_set[j], '__len__'): + assert (np.array(getattr(etp, prop)) == + np.array(param_set[j])).all() + else: + assert getattr(etp, prop) == param_set[j] + + +def test_estimator_get_phase_at_triggers(): + trigger_param_sets = [ + [np.zeros((1000, 2)), [1, 2, 3, 4]], + [np.zeros(1000), [[1, 2, 3, 4]]], + [1, [1, 2, 3, 4]], + [np.zeros(1000), 1], + [np.zeros(1000), [1, 2, 3, 4], 1], + [np.zeros(1000), [1, 2, 3, 4], True], + [np.zeros(1000), [1, 2, 3, 4], True, 1], + [np.zeros(1000), [1, 2, 3, 4], True, True], + [np.sin(2 * np.pi * 5 * np.arange(0, 2, 1/500)), [125, 225]], + [np.sin(2 * np.pi * 5 * np.arange(0, 2, 1/500)), [238, 338, 438], True], + [np.sin(2 * np.pi * 5 * np.arange(0, 2, 1/500)), + [238, 121, 221], True, True], + [np.sin(2 * np.pi * 5 * np.arange(0, 2, 1/500)), + [238, 121, 221], False, True], + ] + + trigger_param_outputs = [ + [ValueError, "The provided array has the wrong dimension. Arrays can have the following dimensions: 1D"], + [ValueError, "The provided array has the wrong dimension. Arrays can have the following dimensions: 1D"], + [TypeError, "Value must be an array"], + [TypeError, "Value must be an array"], + [TypeError, "Value must be a bool"], + False, + [TypeError, "Value must be a bool"], + False, + [0, 0], + [45, 45, 45], + [45, 345, 345], + [0.25*np.pi, 6.02, 6.02] + ] + + custom_filter = signal.firwin(100, [1, 5], fs=500, pass_zero=False) + estimator = Estimator(custom_filter, custom_filter, 500) + for i, param_set in enumerate(trigger_param_sets): + if check._is_array(trigger_param_outputs[i]) and (trigger_param_outputs[i][0] == ValueError or trigger_param_outputs[i][0] == TypeError): + with pytest.raises(trigger_param_outputs[i][0], match=r"" + trigger_param_outputs[i][1] + ""): + phase = estimator.get_phase_from_triggers(*param_set) + elif check._is_array(trigger_param_outputs[i]): + phase = estimator.get_phase_from_triggers(*param_set) + + for j, true_phase in enumerate(trigger_param_outputs[i]): + if true_phase <= 2*np.pi: + assert np.abs(phase[j]) == pytest.approx( + true_phase, abs=0.1) + else: + assert np.abs(phase[j]) == pytest.approx(true_phase, abs=6) + else: + phase = estimator.get_phase_from_triggers(*param_set) + + +def test_estimator_compute_phase_stats(): + phase_param_sets = [ + [np.sin(2 * np.pi * 5 * np.arange(0, 10, 1/500)), + [125, 225, 425, 625, 525, 825]], + [np.sin(2 * np.pi * 5 * np.arange(0, 2, 1/500)) + + np.random.normal(0, 1, size=1000), [125, 225, 325]], + ] + + trigger_param_outputs = [ + [0, 0], + [0, 1] + ] + + custom_filter = signal.firwin(100, [1, 5], fs=500, pass_zero=False) + estimator = Estimator(custom_filter, custom_filter, 500) + for i, param_set in enumerate(phase_param_sets): + assert estimator.mean_phase_from_triggers( + param_set[0], param_set[1], degree=True) == pytest.approx(trigger_param_outputs[i][0], abs=6) + assert estimator.std_phase_from_triggers( + param_set[0], param_set[1], degree=True) == pytest.approx(trigger_param_outputs[i][1], abs=15) diff --git a/EEGPhasePy/tests/test_data/.gitkeep b/tests/test_data/.gitkeep similarity index 100% rename from EEGPhasePy/tests/test_data/.gitkeep rename to tests/test_data/.gitkeep diff --git a/tests/test_etp_estimator.py b/tests/test_etp_estimator.py new file mode 100644 index 0000000..bcba030 --- /dev/null +++ b/tests/test_etp_estimator.py @@ -0,0 +1,156 @@ +import pytest +import numpy as np +import scipy.signal as signal +import scipy.stats as stats +import os + +if not os.getenv('GITHUB_ACTIONS') == 'true': + import mne + + +from EEGPhasePy.estimators import ETP + +real_time_filter_fir = signal.firwin(128, [8, 13], fs=2048, pass_zero=False) +real_time_filter_iir = signal.butter(3, [8, 13], btype="bandpass", fs=2048) + +ground_truth_filter_fir = signal.firwin(300, [8, 13], fs=2048, pass_zero=False) +ground_truth_filter_iir = signal.butter(3, [8, 13], btype="bandpass", fs=2048) + +param_to_prop_map = ["real_time_filter", "ground_truth_filter", + "sampling_rate", "window_len", "window_edge"] + + +def construct_etp_with_paramset(param_set): + ''' + Constructs the ETP class with passed in parameters and returns the constructed ETP object + + Parameters + ---------- + + param_set : array + Array of parameters in order to pass into ETP + + ------- + Returns + ------- + + ETP : the constructed ETP object + ''' + + if len(param_set) == 3: + return ETP(param_set[0], param_set[1], param_set[2]) + else: + return ETP(param_set[0], param_set[1], param_set[2], window_len=param_set[3], window_edge=param_set[4]) + + +def construct_default_etp(): + ''' + Constructs an ETP class with default parameters + + ------- + Returns + ------- + + etp : ETP object + ''' + return ETP(real_time_filter_fir, ground_truth_filter_fir, 2048) + + +def test_etp_fit(): + # should fail if training data not 1D + # should fail if min_ipi not int + # should fail if training_data not arr + + etp_fit_parameters = [ + [np.sin(78.5*np.arange(0, 200, 1/2048)), 63], # pass + [1, 63], # fail + [np.zeros((100, 2)), 63], # fail + [np.zeros(100), 62.5] # fail + ] + + etp_fit_parameter_outputs = [ + False, + [TypeError, "Value must be an array type"], + [ValueError, "The provided array has the wrong dimension. Arrays can have the following dimensions: 1D"], + [TypeError, "Value must be an int type"] + ] + + for i, param_set in enumerate(etp_fit_parameters): + if not type(etp_fit_parameter_outputs[i]) == bool: + with pytest.raises(etp_fit_parameter_outputs[i][0], match=r"" + etp_fit_parameter_outputs[i][1] + ""): + etp = construct_default_etp() + etp.fit(param_set[0], param_set[1]) + else: + etp = construct_default_etp() + etp.fit(param_set[0], param_set[1]) + # check if Tadj roughly equal to pre-defined period (80ms) + assert etp.tadj == pytest.approx(80, 2) + + +def test_etp_predict(): + # Test if predict successfuly predicts with no errors + # Test if predict fails when given non-array + # Test if predict fails when given 2D array + # Test if predict fails when given non-float target_phase + + etp_predict_parameters = [ + [np.sin(78.5*np.arange(0, 0.5, 1/2048)), 1.96], + [15, 1.96], + [np.zeros((500, 2)), 1.96], + [np.sin(78.5*np.arange(0, 0.5, 1/2048)), "string"] + ] + + etp_predict_parameter_outputs = [ + False, + [TypeError, "Value must be an array type"], + [ValueError, "The provided array has the wrong dimension. Arrays can have the following dimensions: 1D"], + [TypeError, "Value must be one of: float or int"] + ] + + for i, param_set in enumerate(etp_predict_parameters): + if not type(etp_predict_parameter_outputs[i]) == bool: + with pytest.raises(etp_predict_parameter_outputs[i][0], match=r"" + etp_predict_parameter_outputs[i][1] + ""): + etp = construct_default_etp() + etp.fit(np.sin(78.5*np.arange(0, 200, 1/2048)), 63) + etp.predict(param_set[0], param_set[1]) + else: + etp = construct_default_etp() + etp.fit(np.sin(78.5*np.arange(0, 200, 1/2048)), 63) + + assert isinstance(etp.predict( + param_set[0], param_set[1]), np.int64) + + +def test_etp_real_data(): + if not os.getenv('GITHUB_ACTIONS') == 'true': + # Load test files + training_data = mne.io.read_raw_curry( + "./tests/test_data/training-rsEEG.cdt") + testing_data = mne.io.read_raw_curry( + "./tests/test_data/testing-rsEEG.cdt") + fs = 2048 + + C3_train_data = training_data.pick("C3").get_data()[0] + C3_test_data = testing_data.pick("C3").get_data()[0] + + # Test if predict performs as expected on test files mean == mean and std == std for phase + + etp = construct_default_etp() + etp.fit(C3_train_data, 83) + + window_i = 0 + window_step = int(0.063 * fs) + window_len = int(0.5 * fs) + + triggers = [] + + while window_i + window_len < len(C3_test_data): + window_data = C3_test_data[window_i:window_i + window_len] + + triggers.append(etp.predict(window_data, 0) + window_i) + + window_i += window_step + + # Obtain phase from trigger indecies + assert stats.circmean(etp.get_phase_from_triggers(C3_test_data, triggers, degree=True, full_circle=True)) == pytest.approx(0, 5) \ + or stats.circmean(etp.get_phase_from_triggers(C3_test_data, triggers, degree=True, full_circle=True)) == pytest.approx(360, 5) diff --git a/tests/test_phastimate_estimator.py b/tests/test_phastimate_estimator.py new file mode 100644 index 0000000..f495b43 --- /dev/null +++ b/tests/test_phastimate_estimator.py @@ -0,0 +1,78 @@ +from EEGPhasePy.estimators import PHASTIMATE +import pytest +import numpy as np +import scipy.signal as signal + +from EEGPhasePy.utils import check + +# seed is necessary since AR can sometimes be inaccurate, inaccuracy is a result of the gaussian random noise +np.random.seed(123) + + +def test_phastimate_construct(): + rt_filter = signal.firwin(200, [8, 12], pass_zero=False, fs=500) + gt_filter = signal.firwin(300, [8, 12], pass_zero=False, fs=500) + + phastimate_paramset = [ + [rt_filter, gt_filter, 500, 500, 40, 1.5], + [rt_filter, gt_filter, 500, 500, 40, 35] + ] + + phastimate_output = [ + [TypeError, 'Value must be an int'], + False + ] + + for i, paramset in enumerate(phastimate_paramset): + if check._is_array(phastimate_output[i]): + with pytest.raises(phastimate_output[i][0], match=r"" + phastimate_output[i][1] + ""): + phastimate = PHASTIMATE(*paramset) + else: + phastimate = PHASTIMATE(*paramset) + + +def test_phastimate_predict(): + fs = 500 + rt_filter = signal.firwin(80, [8, 12], pass_zero=False, fs=fs) + gt_filter = signal.firwin(300, [8, 12], pass_zero=False, fs=fs) + + phastimate = PHASTIMATE(rt_filter, gt_filter, fs, + ar_order=12, window_edge=64) + + time_data = np.arange(0, 2, 1/fs) + + clean_signal = np.sin(2 * np.pi * 10 * time_data) + ground_truth_phase = np.angle(signal.hilbert(clean_signal), deg=True) % 360 + + minimally_noisy_alpha = clean_signal + \ + np.random.normal(0, 2, size=len(time_data)) + + phastimate_predict_paramset = [ + [100, 25], + [minimally_noisy_alpha, [25]], + [[minimally_noisy_alpha], 25], + [minimally_noisy_alpha, 25, [100]], + [minimally_noisy_alpha[-500:-250], ground_truth_phase[-220], 10], + [minimally_noisy_alpha[-500:-250], ground_truth_phase[-250], 10], + [minimally_noisy_alpha[-470:-220], ground_truth_phase[-220], 10], + [minimally_noisy_alpha[-470:-220], ground_truth_phase[-200], 0.5], + ] + + phastimate_output = [ + [TypeError, "Value must be an array type"], + [TypeError, "Value must be one of: int or float"], + [ValueError, "The provided array has the wrong dimension. Arrays can have the following dimensions: 1D"], + [TypeError, "Value must be one of: int or float"], + np.False_, + np.True_, + np.True_, + np.False_ + ] + + for i, param_set in enumerate(phastimate_predict_paramset): + if check._is_array(phastimate_output[i]) and (phastimate_output[i][0] == ValueError or phastimate_output[i][0] == TypeError): + with pytest.raises(phastimate_output[i][0], match=r"" + phastimate_output[i][1] + ""): + phase_match = phastimate.predict(*param_set) + else: + phase_match = phastimate.predict(*param_set) + assert phase_match == phastimate_output[i]