diff --git a/pyhctsa/calculator.py b/pyhctsa/calculator.py
index 40fcded..cd714ea 100644
--- a/pyhctsa/calculator.py
+++ b/pyhctsa/calculator.py
@@ -4,6 +4,7 @@
 from itertools import product
 from pathlib import Path
 from typing import Union, Any, Callable
+import logging
 
 import numpy as np
 import pandas as pd
@@ -38,8 +39,8 @@ def classify_output(res) -> int:
         out = 4
     return out
 
-def _apply_selection_wrapper(func:Callable, filter_keys:Union[str, list[str]],
-                             keep:bool=True) -> Callable:
+def _apply_selection_wrapper(func: Callable, filter_keys: Union[str, list[str]],
+                             keep: bool = True) -> Callable:
     """
     Wraps a function to selectively filter keys from its dict output.
 
@@ -69,7 +70,7 @@ def wrapper(*args, **kwargs):
         if isinstance(result, dict):
             missing = [k for k in keys if k not in result] # log all of the missing keys
             if missing:
-                print(f'Warning: time-series features for func {func} not found {missing}')
+                logging.info(f'Time-series features for func {func} not found: {missing}')
             if keep:
                 return {k: result[k] for k in keys if k in result}
             else:
@@ -86,7 +87,7 @@ def _standardise_inputs(data) -> list[np.ndarray]:
     elif data.ndim == 2:
         if data.shape[0] > data.shape[1]:
             # notify the user to check that the shapes make sense
-            print(f"Check that the shape of the 2D input is such "
+            logging.warning(f"Check that the shape of the 2D input is such "
                   f"that (n_series, n_samples). Got shape: {data.shape}")
         return [np.asarray(row, dtype=float) for row in data]
     else:
@@ -216,7 +217,7 @@ def _check_deps(self, module_key, feature_name, config):
         missing = [dep for dep in deps_to_check if not _check_optional_deps(dep)]
         if missing:
             full_name = f"{module_key}.{feature_name}"
-            print(f"Skipping function '{full_name}' - missing dependencies: {', '.join(missing)}")
+            logging.info(f"Skipping function '{full_name}' - missing dependencies: {', '.join(missing)}")
             self._skipped_functions.append((full_name, missing))
             return False
     return True
@@ -229,7 +230,7 @@ def _build_feature_funcs(self):
         try:
             module = importlib.import_module(f"{self._operations_package}.{module_key}")
         except ImportError as e:
-            print(f"Failed to import module '{module_key}': {e}")
+            logging.warning(f"Failed to import module '{module_key}': {e}")
             # Skip all functions in this module since we can't import it
             for feature_name in self.config[module_key].keys():
                 skipped_functions.append((f"{module_key}.{feature_name}", ["import_error"]))
@@ -273,7 +274,7 @@ def _build_feature_funcs(self):
     # store information about skipped functions for later reference
     self._skipped_functions = skipped_functions
     if skipped_functions:
-        print(f"Total functions skipped due to missing dependencies: {len(skipped_functions)}")
+        logging.info(f"Total functions skipped due to missing dependencies: {len(skipped_functions)}")
 
     return feature_funcs
diff --git a/pyhctsa/distribute.py b/pyhctsa/distribute.py
index ba79823..ba441ea 100644
--- a/pyhctsa/distribute.py
+++ b/pyhctsa/distribute.py
@@ -60,7 +60,7 @@ class LocalDistributor(BaseDistributor):
         Number of worker processes to use. If ``None``, defaults to
         ``multiprocessing.cpu_count()``.
""" - def __init__(self, n_workers : Union[int, None] = None): + def __init__(self, n_workers: Union[int, None] = None): self.n_workers = n_workers or mp.cpu_count() try: pathos_mp.set_start_method('spawn', force=True) diff --git a/pyhctsa/operations/correlation.py b/pyhctsa/operations/correlation.py index 1a35a09..d728b2e 100644 --- a/pyhctsa/operations/correlation.py +++ b/pyhctsa/operations/correlation.py @@ -157,7 +157,7 @@ def add_noise(y: ArrayLike, tau: Union[int, str] = 1, ami_method: str = 'even', return out -def first_under_fn(x : ArrayLike, m : ArrayLike, p : ArrayLike) -> float: +def first_under_fn(x: ArrayLike, m: ArrayLike, p: ArrayLike) -> float: """ Find the value of m for the first time p goes under the threshold, x. p and m are vectors of the same length @@ -916,7 +916,7 @@ def stick_angles(y: ArrayLike) -> dict: return out -def _sub_statav(x: ArrayLike, n : int) -> tuple: +def _sub_statav(x: ArrayLike, n: int) -> tuple: # helper function nn = len(x) if nn < 2 * n: # not long enough @@ -1280,7 +1280,7 @@ def embed2_shapes(y: ArrayLike, tau: Union[str, int, None] = 'tau', counts -= 1 # ignore self counts if np.all(counts == 0): - print("No counts detected!") + logging.warning("embed2_shapes: no counts detected!") return np.nan # Return basic statistics on the counts @@ -1535,7 +1535,7 @@ def acf_y(t): for i, t in enumerate(tau): if np.any(np.isnan(y)): good_r = (~np.isnan(y[:N-t])) & (~np.isnan(y[t:])) - print(f'NaNs in time series, computing for {np.sum(good_r)}/{len(good_r)} pairs of points') + logging.info(f'NaNs in time series, computing for {np.sum(good_r)}/{len(good_r)} pairs of points.') y1 = y[:N-t] y1n = y1[good_r] - np.mean(y1[good_r]) y2 = y[t:] @@ -1719,7 +1719,7 @@ def _stat_av(y: ArrayLike, window_stat: str = 'mean', num_seg: int = 5, inc_move logging.warning(f"Time-series of length {len(y)} is too short for {num_seg} windows") return np.nan inc = np.floor(win_length/inc_move) # increment to move at each step - # if incrment rounded down to zero, prop it up + # if increment rounded down to zero, prop it up if inc == 0: inc = 1 @@ -1829,7 +1829,7 @@ def autocorr_shape(y: ArrayLike, stop_when: Union[int, str] = 'pos_drown') -> di # Check for good behavior if np.any(np.isnan(acf)): - # This is an anomalous time series (e.g., all constant, or conatining NaNs) + # This is an anomalous time series (e.g., all constant, or containing NaNs) out = np.nan out = {} diff --git a/pyhctsa/operations/distribution.py b/pyhctsa/operations/distribution.py index 0dffdc7..ab5eb1e 100644 --- a/pyhctsa/operations/distribution.py +++ b/pyhctsa/operations/distribution.py @@ -73,11 +73,11 @@ def compare_ks_fit(x: ArrayLike, what_distn: str) -> dict: elif what_distn == 'exp': # Check positivity if np.any(x < 0): - print("The data contains negative values, but Exponential is a positive-only distribution.") + logging.warning("The data contains negative values, but Exponential is a positive-only distribution.") return np.nan # Check constant if np.all(x == x[0]): - print("Data are a constant") + logging.warning("Data are a constant.") return np.nan # Fit Exponential distribution (equivalent to expfit in MATLAB) _, lam = expon.fit(x, floc=0) # force support at 0 @@ -91,7 +91,7 @@ def compare_ks_fit(x: ArrayLike, what_distn: str) -> dict: elif what_distn == 'logn': # Check positivity if np.any(x <= 0): - print("The data are not positive, but Log-Normal is a positive-only distribution.") + logging.warning("The data are not positive, but Log-Normal is a positive-only distribution.") return 
         # Fit log-normal distribution
         shape, loc, scale = lognorm.fit(x, floc=0) # sigma, 0, exp(mu)
@@ -107,7 +107,7 @@ def compare_ks_fit(x: ArrayLike, what_distn: str) -> dict:
 
         ffit_func = lambda xi: lognorm.pdf(xi, s=sigma, loc=0, scale=np.exp(mu))
     else:
-        raise ValueError(f"Unknown distribution: {what_distn}.")
+        raise ValueError(f"Unknown distribution: {what_distn}.")
 
     # ----------------------------
     # Estimate smoothed empirical distribution
diff --git a/pyhctsa/operations/entropy.py b/pyhctsa/operations/entropy.py
index 8efe530..54c6cf3 100644
--- a/pyhctsa/operations/entropy.py
+++ b/pyhctsa/operations/entropy.py
@@ -1,5 +1,6 @@
 from math import factorial
 from typing import Optional, Union
+import logging
 
 import numpy as np
 from numpy.typing import ArrayLike
@@ -265,7 +266,7 @@ def multi_scale_entropy(
             pp_text = f"after {pre_process_how} pre-processing"
         else:
             pp_text = ""
-        print(f"Warning: Not enough samples ({len(y)} {pp_text}) to compute SampEn at multiple scales")
+        logging.warning(f"Not enough samples ({len(y)} {pp_text}) to compute sample entropy at multiple scales")
         return {'out': np.nan}
 
     # Output raw values
diff --git a/pyhctsa/operations/graph.py b/pyhctsa/operations/graph.py
index 351ada7..2b9be3b 100644
--- a/pyhctsa/operations/graph.py
+++ b/pyhctsa/operations/graph.py
@@ -3,6 +3,7 @@
 import scipy
 from scipy.stats import expon, norm
 from ts2vg import NaturalVG
+import logging
 
 from pyhctsa.operations.correlation import autocorr, first_crossing
 from pyhctsa.operations.entropy import distribution_entropy
@@ -87,7 +88,7 @@ def visibility_graph(y: ArrayLike, meth: str = 'horiz', max_l: int = 5000) -> di
     N = len(y)
     if N > max_l:
         # too long to store in memory
-        print(f"Time series ({N} > {max_l}) is too long for visibility graph."
+        logging.info(f"Time series ({N} > {max_l}) is too long for visibility graph."
               f"Analyzing the first {max_l} samples.")
         y = y[:max_l]
         N = len(y)
diff --git a/pyhctsa/operations/information.py b/pyhctsa/operations/information.py
index a259f0f..3c6c37a 100644
--- a/pyhctsa/operations/information.py
+++ b/pyhctsa/operations/information.py
@@ -189,7 +189,7 @@ def _mi_bin(v1: ArrayLike, v2: ArrayLike, r1: Union[str, list] = 'range',
     if np.any(mask):
         mi = np.sum(p_ij[mask] * np.log(p_ij[mask] / p_ixp_j[mask]))
     else:
-        print("The histograms aren't catching any points. Perhaps due to an inappropriate custom range for binning the data.")
+        logging.warning("The histograms aren't catching any points. Perhaps due to an inappropriate custom range for binning the data.")
         mi = np.nan
 
     return mi
@@ -403,7 +403,7 @@ def automutual_info(
     for k, delay in enumerate(time_delay):
         # check enough samples to compute automutual info
         if delay > n - min_samples:
-            # time sereis too short - keep the remaining values as NaNs
+            # time series too short - keep the remaining values as NaNs
            break
 
         # form the time-delay vectors y1 and y2
@@ -424,8 +424,8 @@
             amis[k] = mi_calc.computeAverageLocalOfObservations()
 
     if np.isnan(amis).any():
-        print(
-            f"Warning: Time series (n={n}) is too short for automutual information calculations "
+        logging.warning(
+            f"Time series (n={n}) is too short for automutual information calculations "
             f"up to lags of {max(time_delay)}"
         )
 
@@ -457,9 +457,9 @@ def mutual_info(
 
     Parameters
     ----------
-    y1 : ArrayLike
+    y1 : array-like
         First input time series.
-    y2 : ArrayLike
+    y2 : array-like
         Second input time series.
     est_method : str, optional
         Estimation method to use:
@@ -679,7 +679,7 @@ def _rm_info(*args):
         return
 
     # some initial tests on the input arguments
-    x = np.array(args[0]) # make sure the imputs are in numpy array form
+    x = np.array(args[0]) # make sure the inputs are in numpy array form
     y = np.array(args[1])
 
     x_shape = x.shape
@@ -689,23 +689,23 @@ def _rm_info(*args):
     len_y = y_shape[0]
 
     if len(x_shape) != 1: # makes sure x is a row vector
-        print("Error: invalid dimension of x")
+        logging.warning("Invalid dimension of x")
         return
 
     if len(y_shape) != 1:
-        print("Error: invalid dimension of y")
+        logging.warning("Invalid dimension of y")
         return
 
     if len_x != len_y: # makes sure x and y have the same amount of elements
-        print("Error: unequal length of x and y")
+        logging.warning("Unequal length of x and y")
         return
 
     if n_args > 5:
-        print("Error: too many arguments")
+        logging.warning("Too many arguments")
         return
 
     if n_args < 2:
-        print("Error: not enough arguments")
+        logging.warning("Not enough arguments")
         return
 
     # setting up variables depending on amount of inputs
@@ -869,19 +869,19 @@ def _rm_histogram_2(*args):
     leny = yshape[0]
 
     if len(xshape) != 1: # makes sure x is a row vector
-        print("Error: invalid dimension of x")
+        logging.warning("Invalid dimension of x")
         return
 
     if len(yshape) != 1:
-        print("Error: invalid dimension of y")
+        logging.warning("Invalid dimension of y")
         return
 
     if lenx != leny: # makes sure x and y have the same amount of elements
-        print("Error: unequal length of x and y")
+        logging.warning("Unequal length of x and y")
         return
 
     if nargin > 3:
-        print("Error: too many arguments")
+        logging.warning("Too many arguments")
         return
 
     if nargin == 2:
@@ -909,16 +909,16 @@ def _rm_histogram_2(*args):
 
     # checking descriptor to make sure it is valid, otherwise print an error
     if ncellx < 1:
-        print("Error: invalid number of cells in X dimension")
+        logging.warning("Invalid number of cells in X dimension")
 
     if ncelly < 1:
-        print("Error: invalid number of cells in Y dimension")
+        logging.warning("Invalid number of cells in Y dimension")
 
     if upperx <= lowerx:
-        print("Error: invalid bounds in X dimension")
+        logging.warning("Invalid bounds in X dimension")
 
     if uppery <= lowery:
-        print("Error: invalid bounds in Y dimension")
+        logging.warning("Invalid bounds in Y dimension")
 
     result = np.zeros([int(ncellx), int(ncelly)], dtype=int)
     # should do the same thing as matlab: result(1:ncellx,1:ncelly) = 0;
diff --git a/pyhctsa/operations/medical.py b/pyhctsa/operations/medical.py
index 76b7d09..cfb715f 100644
--- a/pyhctsa/operations/medical.py
+++ b/pyhctsa/operations/medical.py
@@ -271,7 +271,6 @@ def pol_var(x: ArrayLike, d: float = 1, D: int = 6) -> float:
 
     i = 0
     pc = 0
-    # seqcnt = 0
     while i <= (N-D):
         x_seq = x_sym[i:(i+D)]
         if np.array_equal(x_seq, z_seq) or np.array_equal(x_seq, o_seq):
diff --git a/pyhctsa/operations/model_fit.py b/pyhctsa/operations/model_fit.py
index d395453..6255588 100644
--- a/pyhctsa/operations/model_fit.py
+++ b/pyhctsa/operations/model_fit.py
@@ -9,6 +9,7 @@
 from statsmodels.stats.diagnostic import acorr_ljungbox
 from statsmodels.tsa.ar_model import AutoReg, ar_select_order
 from lmfit.models import SineModel
+import logging
 
 from ..operations.correlation import autocorr, first_crossing
 from ..operations.stationarity import sliding_window
@@ -306,7 +307,7 @@ def local_simple(y: ArrayLike, forecast_meth: str = 'mean',
     Returns
     -------
     dict
-        Dictionary containing output statistics on the residuals of the simple fortecasting method.
+        Dictionary containing output statistics on the residuals of the simple forecasting method.
     """
     y = np.asarray(y)
 
@@ -315,11 +316,11 @@ def local_simple(y: ArrayLike, forecast_meth: str = 'mean',
     if train_length == 'ac':
         lp = first_crossing(y, 'ac', 0, 'discrete')
     else:
-        #the e length of the subsegment preceeding to use to predict the subsequent value
+        # the length of the preceding subsegment used to predict the subsequent value
         lp = train_length
     evalr = np.arange(lp, N) #range over which to evaluate the forecast
     if np.size(evalr) == 0:
-        print("This time series is too short for forecasting")
+        logging.warning("This time series is too short for forecasting")
         return np.nan
     res = np.zeros(len(evalr))
     if forecast_meth == 'mean':
@@ -400,15 +401,15 @@ def exp_smoothing(x: ArrayLike, n_train: Union[None, int, float] = None,
     min_train, max_train = 100, 1000
 
     if n_train > max_train:
-        print(f"Training set size reduced from {n_train} to {max_train}.")
+        logging.info(f"Training set size reduced from {n_train} to {max_train}.")
         n_train = max_train
 
     if n_train < min_train:
-        print(f"Training set size increased from {n_train} to {min_train}.")
+        logging.info(f"Training set size increased from {n_train} to {min_train}.")
         n_train = min_train
 
     if N < n_train:
-        print("Time series is too short for the specified training size.")
+        logging.warning("Time series is too short for the specified training size.")
         return np.nan
 
     # --- Find Optimal Alpha ---
@@ -427,7 +428,7 @@ def exp_smoothing(x: ArrayLike, n_train: Union[None, int, float] = None,
     # Check for valid RMSEs before fitting
     valid_indices = ~np.isnan(rmses)
     if np.sum(valid_indices) < 3:
-        print("Not enough valid points for quadratic fit; choosing best alpha from search.")
+        logging.info("Not enough valid points for quadratic fit; choosing best alpha from search.")
         alphamin = alphar[np.nanargmin(rmses)] if np.any(valid_indices) else 0.5
     else:
         # Fit quadratic to the 3 points with the lowest RMSE
@@ -463,7 +464,7 @@ def exp_smoothing(x: ArrayLike, n_train: Union[None, int, float] = None,
     valid_ref = ~np.isnan(rmses_ref)
 
     if not np.any(valid_ref):
-        print("Could not compute RMSE in refined search; using previous alpha.")
+        logging.info("Could not compute RMSE in refined search; using previous alpha.")
     else:
         p2 = np.polyfit(alphar_ref[valid_ref], rmses_ref[valid_ref], 2)
         if p2[0] < 0: # Bad fit, fallback to best alpha in search
@@ -482,7 +483,7 @@ def exp_smoothing(x: ArrayLike, n_train: Union[None, int, float] = None,
     yp, xp = y_fit[2:], x[2:]
 
     if len(yp) < 2:
-        print("Not enough points to calculate residual statistics.")
+        logging.warning("Not enough points to calculate residual statistics.")
         residout = {'mean': np.nan, 'std': np.nan, 'AC1': np.nan}
     else:
         residuals = yp - xp
@@ -626,7 +627,7 @@ def ar_cov(y: ArrayLike, p: int = 2) -> dict:
 
     return out
 
-def _arconf_from_arfit(fitted_ar, the_conf_interval : float = 0.95) -> dict:
+def _arconf_from_arfit(fitted_ar, the_conf_interval: float = 0.95) -> dict:
     params = fitted_ar.params
     has_intercept = fitted_ar.model.trend == 'c'
     if has_intercept:
diff --git a/pyhctsa/operations/nonlinearity.py b/pyhctsa/operations/nonlinearity.py
index 69f09b9..728ae2b 100644
--- a/pyhctsa/operations/nonlinearity.py
+++ b/pyhctsa/operations/nonlinearity.py
@@ -2,6 +2,7 @@
 
 import numpy as np
 from numpy.typing import ArrayLike
+import logging
 from sklearn.decomposition import PCA
 
 
@@ -83,7 +84,7 @@ def _ms_embed(z, v, w):
     lags = np.sort(lags)
     dim = len(lags)
     if n <= lags[-1]:
-        print("Vector is too small to be embedded with the given lags.")
given lags.") + logging.warning("Vector is too small to be embedded with the given lags.") return np.full((dim, 1), np.nan), None w_win = lags[-1] - lags[0] # window width (renamed to avoid shadowing arg) @@ -251,11 +252,12 @@ def nlpe(y: ArrayLike, de: int = 3, tau: Union[int, str] = 1, max_n: int = 5000) if n > max_n: # crop the time series to the first max_n samples y = y[:max_n] - print(f"Michael Small's nlpe code is only being evaluated on the first {max_n} (/{n}) samples.") + logging.info(f"Michael Small's nlpe code is only being evaluated on the first {max_n} (/{n}) samples.") n = max_n if n < 20: # short time series cause problems - print(f'Time series (N = {len(y)}) is too short.') + logging.warning(f'Time series (N = {len(y)}) is too short.') + return np.nan # run the nonlinear prediction error code res = _ms_nlpe(y, de, tau) @@ -302,18 +304,18 @@ def embed_pca(y: ArrayLike, tau: Union[str, int] = 'ac', m: int = 3) -> dict: if tau == 'ac': tau = first_crossing(y, 'ac', 0, 'discrete') if np.isnan(tau): - print('Could not get time delay by ACF (time series too short?)') + logging.warning('Could not get time delay by ACF (time series too short?)') return np.nan elif tau == 'mi': tau = first_min(y, 'mi') if np.isnan(tau): - print('Could not get time delay by mutual information (time series too short?)') + logging.warning('Could not get time delay by mutual information (time series too short?)') return np.nan else: raise ValueError(f'Invalid time-delay method: {tau}. Choose either mi or ac.') n_embed = n - (m-1)*tau if n_embed <= 0: - print(f'Time series (N = {n}) too short to embed with these embedding parameters.') + logging.warning(f'Time series (N = {n}) too short to embed with these embedding parameters.') return np.nan y_embed = np.zeros((n_embed, m)) diff --git a/pyhctsa/operations/pre_process.py b/pyhctsa/operations/pre_process.py index 96a26f5..ecab178 100644 --- a/pyhctsa/operations/pre_process.py +++ b/pyhctsa/operations/pre_process.py @@ -2,6 +2,7 @@ from numpy.typing import ArrayLike from scipy.signal import lfilter, resample_poly from statsmodels.tsa.tsatools import detrend +import logging from ..operations.distribution import outlier_test from ..operations.stationarity import sliding_window, stat_av @@ -114,7 +115,7 @@ def preproc_compare(y: ArrayLike, detrend_meth: str = 'medianf') -> dict: try: order = int(order) except ValueError: - print(f"Could not convert order: `{order}' to integer.") + logging.warning(f"Could not convert order: `{order}' to integer.") y_d = detrend(y, order=order, axis=0) # 2) Differencing @@ -126,7 +127,7 @@ def preproc_compare(y: ArrayLike, detrend_meth: str = 'medianf') -> dict: try: ndiff = int(ndiff) except ValueError: - print(f"Could not convert ndiff: `{ndiff}' to integer.") + logging.warning(f"Could not convert ndiff: `{ndiff}' to integer.") y_d = np.diff(y, n=ndiff, axis=0) # 3) Median filter @@ -138,7 +139,7 @@ def preproc_compare(y: ArrayLike, detrend_meth: str = 'medianf') -> dict: try: med_ord = int(med_ord) except ValueError: - print(f"Could not convert median order: `{med_ord}' to integer.") + logging.warning(f"Could not convert median order: `{med_ord}' to integer.") y_d = _med_filt_1d(y, med_ord) # 4) Running average @@ -150,7 +151,7 @@ def preproc_compare(y: ArrayLike, detrend_meth: str = 'medianf') -> dict: try: rav_wsize = int(rav_wsize) except ValueError: - print(f"Could not running average window size: `{rav_wsize}' to integer.") + logging.warning(f"Could not running average window size: `{rav_wsize}' to integer.") y_d = 
         y_d = lfilter(np.ones(rav_wsize)/rav_wsize, [1], y)
 
     elif 'resample' in detrend_meth:
diff --git a/pyhctsa/operations/scaling.py b/pyhctsa/operations/scaling.py
index 5949b21..3f62c32 100644
--- a/pyhctsa/operations/scaling.py
+++ b/pyhctsa/operations/scaling.py
@@ -5,6 +5,7 @@
 from scipy.stats import iqr
 from scipy.interpolate import interp1d
 import statsmodels.api as sm
+import logging
 
 from ..toolboxes.Max_Little import fastdfa
 from ..utils import make_mat_buffer
@@ -46,7 +47,7 @@ def fast_dfa(y: ArrayLike) -> float:
 
 def fluctuation_analysis(x: ArrayLike, q: Union[float, int] = 2,
                          wtf: str = 'rsrange', tau_step: int = 1, k: int = 1,
-                         lag: Union[int, None] = None, log_inc: bool = True):
+                         lag: Union[int, None] = None, log_inc: bool = True) -> dict:
     """
     Implements fluctuation analysis by a variety of methods.
 
@@ -68,7 +69,7 @@ def fluctuation_analysis(x: ArrayLike, q: Union[float, int] = 2,
 
     Parameters
     ----------
-    x : ArrayLike
+    x : array-like
         The input time series.
     q : Union[float, int], optional
         The parameter in the fluctuation function. The default is q = 2 which gives RMS
@@ -123,7 +124,7 @@ def fluctuation_analysis(x: ArrayLike, q: Union[float, int] = 2,
     taur = np.arange(5, np.floor(N/2) + 1, tau_step) # maybe increased??
     ntau = len(taur) # analyze the time series across this many timescales
     if ntau < 8: # fewer than 8 points
-        print(f'This time series (N = {N}) is too short to analyze using this fluctuation analysis')
+        logging.warning(f'This time series (N = {N}) is too short to analyze using this fluctuation analysis.')
         out = np.nan
         return out
diff --git a/pyhctsa/operations/spectral.py b/pyhctsa/operations/spectral.py
index 3233712..fc4db3f 100644
--- a/pyhctsa/operations/spectral.py
+++ b/pyhctsa/operations/spectral.py
@@ -418,7 +418,7 @@ def _findpeaks(s, min_pk_dist=0, sort_str='none'):
 
     return pk_height, pk_loc
 
-def give_me_robust_stats(x_data, y_data, field_name) -> dict:
+def give_me_robust_stats(x_data: ArrayLike, y_data: ArrayLike, field_name: str) -> dict:
     """
     Statistics based on a robust linear fit
     """
diff --git a/pyhctsa/operations/stationarity.py b/pyhctsa/operations/stationarity.py
index e75abf7..a85bf52 100644
--- a/pyhctsa/operations/stationarity.py
+++ b/pyhctsa/operations/stationarity.py
@@ -55,14 +55,11 @@ def local_distributions(y: ArrayLike, num_segs: int = 5, each_or_par: str = 'par
         start_idx = i * lseg
         end_idx = (i + 1) * lseg
         segment_data = y[start_idx:end_idx]
-        #kde = KDEUnivariate(segment_data)
         kde = gaussian_kde(segment_data, bw_method="scott")
-        #kde.fit(bw="scott") # tune bw adjustment factor empiricially?
         dns[:, i] = kde.evaluate(r)
 
     # Compare the local distributions
     if each_or_par in ["par", "parent"]:
         #Compares each subdistribtuion to the parent (full signal) distribution
-        #kde = KDEUnivariate(y).fit(bw="scott")
         kde = gaussian_kde(y, bw_method="scott")
         pardn = kde.evaluate(r)
         divs = np.zeros(num_segs)
@@ -260,7 +257,6 @@ def moment_corr(x: ArrayLike, window_length: Union[None, float] = None,
     # first calculate the first moment in all the windows
     M1 = _calc_me_moments(x_buff, mom_1)
     M2 = _calc_me_moments(x_buff, mom_2)
-    #print(M1)
 
     out = {}
     rmat = np.corrcoef(M1, M2)
@@ -480,7 +476,7 @@ def kpss_test(y: ArrayLike, lags: Union[int, list] = 0) -> dict:
 
     Parameters
     ----------
-    y : ArrayLike
+    y : array-like
         The input time series to analyze for stationarity
     lags : Union[int, list], optional
         Either:
@@ -552,7 +548,7 @@ def range_evolve(y: ArrayLike) -> dict:
 
     fullr = np.ptp(y)
 
-    # return number of unqiue entries in a vector, x
+    # return number of unique entries in a vector, x
     lunique = lambda x : len(np.unique(x))
     out['totnuq'] = lunique(cums)
@@ -680,7 +676,7 @@ def local_global(y: ArrayLike, subset_how: str = 'l', n: Union[int, float, None]
 
     Parameters
     -----------
-    y : ArrayLike
+    y : array-like
         The time series to analyse.
     subset_how : str, optional
         The method to select the local subset of time series:
@@ -720,7 +716,6 @@ def local_global(y: ArrayLike, subset_how: str = 'l', n: Union[int, float, None]
     elif subset_how == 'p':
         # take initial proportion n of time series
         r = np.arange(int(np.floor(N*n)))
-        #print(r)
     elif subset_how == 'unicg':
         r = np.round(np.linspace(1, N, n)).astype(int) - 1
     else:
@@ -756,7 +751,7 @@ def fit_polynomial(y: ArrayLike, k: int = 1) -> float:
 
     Parameters
     -----------
-    y : ArrayLike
+    y : array-like
         the time series to analyze.
     k : int, optional
         the order of the polynomial to fit to y. Default is 1.
@@ -942,10 +937,10 @@ def stat_av(y: ArrayLike, what_type: str = 'seg', extra_param: int = 5) -> float
             pn = int(np.floor(N / extra_param))
             M = np.array([np.mean(y[j*extra_param:(j+1)*extra_param]) for j in range(pn)])
         else:
-            print(f"This time series (N = {N}) is too short for StatAv({what_type},'{extra_param}')")
+            logging.warning(f"This time series (N = {N}) is too short for stat_av({what_type},'{extra_param}')")
             return np.nan
     else:
-        raise ValueError(f"Error evaluating StatAv of type '{what_type}', please select either 'seg' or 'len'")
+        raise ValueError(f"Error evaluating stat_av of type '{what_type}', please select either 'seg' or 'len'")
 
     s = np.std(y, ddof=1) # should be 1 (for a z-scored time-series input)
     sdav = np.std(M, ddof=1)
@@ -954,7 +949,7 @@ def stat_av(y: ArrayLike, what_type: str = 'seg', extra_param: int = 5) -> float
     return float(out)
 
 def sliding_window(y: ArrayLike, window_stat: str = 'mean', across_win_stat: str = 'std',
-                   num_seg: int = 5, inc_move: int = 2):
+                   num_seg: int = 5, inc_move: int = 2) -> dict:
     """
     Sliding window measures of stationarity.
@@ -1083,7 +1078,7 @@ def sliding_window(y: ArrayLike, window_stat: str = 'mean', across_win_stat: str
     return out
 
 def _get_window(step_ind, inc, win_length):
-    # helper funtion to convert a step index (stepInd) to a range of indices corresponding to that window
+    # helper function to convert a step index (stepInd) to a range of indices corresponding to that window
     start_idx = (step_ind) * inc
     end_idx = (step_ind) * inc + win_length
diff --git a/pyhctsa/operations/symbolic.py b/pyhctsa/operations/symbolic.py
index d9dbffa..06bd13c 100644
--- a/pyhctsa/operations/symbolic.py
+++ b/pyhctsa/operations/symbolic.py
@@ -85,7 +85,7 @@ def surprise(y: ArrayLike, what_prior: str = 'dist', memory: float = 0.2, num_gr
             store[i] = p
         elif what_prior == 'T1':
             # uses one-point correlations in memory to inform the next point
-            # estimate transition probabilites from data in memory
+            # estimate transition probabilities from data in memory
             # find where in memory this has been observbed before, and preceded it
             memory_data = yth[rs[0, i] - memory:rs[0, i]]
             inmem = np.where(memory_data[:-1] == yth[rs[0, i] - 1])[0]
@@ -124,8 +124,7 @@ def surprise(y: ArrayLike, what_prior: str = 'dist', memory: float = 0.2, num_gr
         out['min'] = np.nan
 
     # Calculate statistics
-    #print(sum(store))
-    out['max'] = np.max(store) # maximum amount of information you cna gain in this way
+    out['max'] = np.max(store) # maximum amount of information you can gain in this way
     out['mean'] = np.mean(store)
     out['sum'] = np.sum(store)
     out['median'] = np.median(store)
@@ -588,7 +587,7 @@ def transition_matrix(y: ArrayLike, how_to_cg: str = 'quantile',
     num_groups : int, optional
         number of groups in the course-graining. Default is 2.
     tau : int or str, optional
-        analyze transition matricies corresponding to this lag. We
+        analyze transition matrices corresponding to this lag. We
         could either downsample the time series at this lag and then
         do the discretization as normal, or do the discretization and
         then just look at this dicrete lag. Here we do the former. Can also set tau to 'ac'
diff --git a/pyhctsa/operations/wavelet.py b/pyhctsa/operations/wavelet.py
index 4e0731a..7efcf90 100644
--- a/pyhctsa/operations/wavelet.py
+++ b/pyhctsa/operations/wavelet.py
@@ -7,6 +7,7 @@
 import numpy as np
 from numpy.typing import ArrayLike
 import pywt
+import logging
 
 from ..utils import sign_change
 from pywt._extensions._pywt import (
@@ -26,9 +27,9 @@ def _custom_cwt(data, scales, wavelet, sampling_period=1., method='conv', axis=-
 
     Parameters
     ----------
-    data : array_like
+    data : array-like
         Input signal
-    scales : array_like
+    scales : array-like
         The wavelet scales to use. One can use
         ``f = scale2frequency(wavelet, scale)/sampling_period`` to determine
         what physical frequency, ``f``. Here, ``f`` is in hertz when the
@@ -251,9 +252,9 @@ def scal_2_freq(y: ArrayLike, w_name: str = 'db3', a_max: int = 5, delta: int =
     if a_max == 'max':
         a_max = max_level
     if max_level < a_max:
-        print(f'Chosen level {a_max} is too large for this wavelet on this signal...')
+        logging.info(f'Chosen level {a_max} is too large for this wavelet on this signal...')
         a_max = max_level # set to max allowed level
-        print(f'changed to maximum level computed with wmaxlev: {a_max}')
+        logging.info(f'Changed to maximum level computed with wmaxlev: {a_max}')
 
     # % Define scales.
     scales = np.arange(1, a_max+1)
@@ -305,7 +306,7 @@ def dwt_coeff(y: ArrayLike, w_name: str = 'db3', level: int = 3) -> dict:
         level = pywt.dwt_max_level(N, w_name)
     max_level_allowed = pywt.dwt_max_level(N, w_name)
     if max_level_allowed < level:
-        print("Chosen level is too large for this wavelet on this signal....\n")
+        logging.warning("Chosen level is too large for this wavelet on this signal.")
     #%% Perform Wavelet Decomposition
     C, L = None, None
     if max_level_allowed < level: # if level exceeds max level, just use max level instead
@@ -425,7 +426,7 @@ def cwt(y: ArrayLike, w_name: str = 'db3', max_scale: int = 32) -> dict:
 
     return out
 
-def _slosr(xx : ArrayLike) -> int:
+def _slosr(xx: ArrayLike) -> int:
     # helper function for detail_coeffs
     the_max_level = len(xx)
     slosr = np.zeros(the_max_level-2)
@@ -464,9 +465,9 @@ def detail_coeffs(y: ArrayLike, w_name: str = 'db3', max_level: Union[int, str]
     if max_level == 'max':
         max_level = pywt.dwt_max_level(N, w_name)
     if pywt.dwt_max_level(N, w_name) < max_level:
-        print(f"Chosen wavelet level is too large for the {w_name} wavelet for this signal of length N = {N}")
+        logging.info(f"Chosen wavelet level is too large for the {w_name} wavelet for this signal of length N = {N}")
         max_level = pywt.dwt_max_level(N, w_name)
-        print(f"Using a wavelet level of {max_level} instead.")
+        logging.info(f"Using a wavelet level of {max_level} instead.")
     # Perform a single-level wavelet decomposition
     means = np.zeros(max_level) # mean detail coefficient magnitude at each level
     medians = np.zeros(max_level) # median detail coefficient magnitude at each level
@@ -578,7 +579,7 @@ def wl_coeffs(y: ArrayLike, w_name: str = 'db3', level: Union[int, str] = 3) ->
 
     return out
 
-def wavedec(data: ArrayLike, wavelet: str, mode: str ='symmetric', level: int = 1, axis=-1):
+def wavedec(data: ArrayLike, wavelet: str, mode: str ='symmetric', level: int = 1, axis=-1) -> tuple:
     """
     Multiple level 1-D discrete fast wavelet decomposition.
     Taken from https://github.com/izlotnik/wavelet-wrcoef/blob/master/wrcoef.py
diff --git a/pyhctsa/utils.py b/pyhctsa/utils.py
index eddc428..6823880 100644
--- a/pyhctsa/utils.py
+++ b/pyhctsa/utils.py
@@ -8,6 +8,7 @@
 import yaml
 from pyhctsa import __version__
 from pathlib import Path
+import logging
 
 import numpy as np
 from numpy.typing import ArrayLike
@@ -100,28 +101,28 @@ def _check_optional_deps(dep: str) -> bool:
         except (ImportError, AttributeError):
             return False
         except jpype.JVMNotFoundException:
-            print("JVM not found. Please check your JAVA_HOME or installation.")
+            logging.warning("JVM not found. Please check your JAVA_HOME or installation.")
             return False
         return True
     except PackageNotFoundError:
         return False
 
-def _validate_data(ts : np.ndarray) -> bool:
+def _validate_data(ts: np.ndarray) -> bool:
     """validate a time series before computing features"""
     if len(ts) < 100:
-        print("Time series is too short!")
+        logging.warning("Time series is too short!")
         return False
     if np.all(ts == ts[0]):
         # constant time series
         # maybe do a tolerance instead?
- print("Time series is constant") + logging.warning("Time series is constant.") return False if np.any(np.isnan(ts)): # data contains nans - print("Time series contains NaNs") + logging.warning("Time series contains NaNs.") return False if np.any(np.isinf(ts)): - print("Time series contains Inf") + logging.warning("Time series contains Inf.") return False return True @@ -193,7 +194,7 @@ def get_dataset(which: str = "e1000") -> list: print(f"Loaded dataset of {len(dataset)} time series.") return dataset -def _preprocess_decorator(zscore : bool = False, absval : bool = False) -> Callable: +def _preprocess_decorator(zscore: bool = False, absval: bool = False) -> Callable: """ Decorator to preprocess time series data before feature computation. @@ -230,7 +231,7 @@ def wrapper(x, *args, **kwargs): return wrapper return decorator -def z_score(x : ArrayLike) -> np.ndarray: +def z_score(x: ArrayLike) -> np.ndarray: """ Z-score the input data vector. @@ -277,7 +278,7 @@ def z_score(x : ArrayLike) -> np.ndarray: return zscored_data -def histc(x : ArrayLike, bins : ArrayLike) -> int: +def histc(x: ArrayLike, bins: ArrayLike) -> int: """Counts the number of values in x that are within each specified bin.""" # Get indices of the bins to which each value in input array belongs. map_to_bins = np.digitize(x, bins) @@ -286,8 +287,8 @@ def histc(x : ArrayLike, bins : ArrayLike) -> int: res[el-1] += 1 # Increment appropriate bin. return res -def bin_picker(x_min : float, x_max : float, n_bins : Union[None, int], - bin_width_est : Union[None, float] = None) -> np.ndarray: +def bin_picker(x_min: float, x_max: float, n_bins: Union[None, int], + bin_width_est: Union[None, float] = None) -> np.ndarray: """ Choose histogram bins. @@ -386,7 +387,7 @@ def bin_picker(x_min : float, x_max : float, n_bins : Union[None, int], return edges -def simple_binner(x_data : ArrayLike, num_bins : int) -> tuple: +def simple_binner(x_data: ArrayLike, num_bins: int) -> tuple: """ Generate a histogram from equally spaced bins. @@ -459,7 +460,7 @@ def point_of_crossing(x: ArrayLike, threshold: float) -> tuple: return fc, poc -def sign_change(y : Union[list, np.ndarray], do_find: int = 0) -> ArrayLike: +def sign_change(y: Union[list, np.ndarray], do_find: int = 0) -> ArrayLike: """ Where a data vector changes sign. @@ -477,7 +478,7 @@ def sign_change(y : Union[list, np.ndarray], do_find: int = 0) -> ArrayLike: return indexs -def make_buffer(y : ArrayLike, buffer_size : int) -> np.ndarray: +def make_buffer(y: ArrayLike, buffer_size: int) -> np.ndarray: """ Make a buffered version of a time series. @@ -506,8 +507,8 @@ def make_buffer(y : ArrayLike, buffer_size : int) -> np.ndarray: return y_buffer -def make_mat_buffer(x : ArrayLike, n : int, p : int = 0, - opt : Union[str, None] = None) -> np.ndarray: +def make_mat_buffer(x: ArrayLike, n: int, p: int = 0, + opt: Union[str, None] = None) -> np.ndarray: ''' Create a buffer array. @@ -567,20 +568,20 @@ def make_mat_buffer(x : ArrayLike, n : int, p : int = 0, return result -def binarize(y : ArrayLike, binarize_how : str = 'diff') -> ArrayLike: +def binarize(y: ArrayLike, binarize_how: str = 'diff') -> ArrayLike: """ Converts an input vector into a binarized version. Parameters ----------- - y : array_like + y : array-like The input time series binarize_how : str, optional Method to binarize the time series: 'diff', 'mean', 'median', 'iqr'. 
 
     Returns
     --------
-    y_bin : array_like
+    y_bin : array-like
         The binarized time series
     """
     if binarize_how == 'diff':
@@ -613,7 +614,7 @@ def _step_binary(x : ArrayLike) -> ArrayLike:
 
     return y
 
-def x_corr(x : ArrayLike, y : ArrayLike, normed : bool = True, max_lags : int = 10) -> tuple:
+def x_corr(x: ArrayLike, y: ArrayLike, normed: bool = True, max_lags: int = 10) -> tuple:
     """
     Calculates the cross-correlation coefficients or inner products between
     two signals at various lags. The function computes correlations up to a specified