diff --git a/pyspi/base_new.py b/pyspi/base_new.py new file mode 100644 index 0000000..c94702f --- /dev/null +++ b/pyspi/base_new.py @@ -0,0 +1,341 @@ +import numpy as np +import warnings, copy + + +from skbase import BaseObject +from pyspi.data import Data + +class BaseNewSPI(BaseObject): + """ + Base class for PySPI. This class provides a unified public API for the PySPI library. + + This class serves as the base class for all the pairwaise statistical interaction + measures (SPI) in the PySPI library. It uses the tagging system of skbase to indicate + capabilities rather than using decorators. + + Notes + ----- + This class follows a modern approach towards defining the public API by using + the tagging system of skbase. The tags are used to indicate the capabilties of the + handling the data. The old base class is still available for backward compatibility. + """ + + _tags = { + "capability-multivariate": True, + "capability-univariate": True, + "capability-bivariate": True, + "python_dependencies": "sktime", + "issigned": True, + "identifier": "base", + "name": "BasePySPI", + } + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + def _ensure_data_format(self, data: Data, data2: Data = None) -> tuple: + """ + Ensure the data is in the correct format. + + Parameters + ---------- + data : Data + The data to be checked. + data2 : Data, optional + The second data to be checked. Default is None. + + Returns + ------- + tuple + A tuple containing the data and the second data in the correct format. + """ + + if not isinstance(data, Data): + data = Data(data) + if data2 is not None and not isinstance(data2, Data): + data2 = Data(data2) + + return data, data2 + + ## Public API ## + def spi(self, data, i=None, copy_data=False): + """Compute the univariate SPI value + + Parameters + ---------- + data : Data + The data to be used for the computation. + i : int, optional + The index of the variable to be used. Default is None. + copy_data : bool, optional + Whether to copy the data or not. Default is False. + + Returns + ------- + float + The computed SPI value. + """ + + data, _ = self.ensure_data_format(data, None) + if i is None: + if data.n_processes == 1: + i=0 + else: + raise ValueError("i must be specified for multiple processes") + + if copy_data: + data = copy.deepcopy(data) + + return self._spi(data, i) + + def spi_mat(self, data, data2=None, i=None, j=None, copy_data=False): + """ + Compute the pairwise SPI matrix + + Parameters + ---------- + data : Data + The data to be used for the computation. + data2 : Data, optional + The second data to be used for the computation. Default is None. + i : int, optional + Row index filter of the output matrix. Default is None. + j : int, optional + Column index filter of the output matrix. Default is None. + copy_data : bool, optional + Whether to copy the data or not. Default is False. + + Returns + ------- + np.ndarray + The computed SPI matrix. + """ + + # get the correct formatted data + data, data2 = self.ensure_data_format(data, data2) + + if copy_data: + data = copy.deepcopy(data) + if data2 is not None: + data2 = copy.deepcopy(data2) + + if data2 is None: + data2 = data + + n_processes1 = data.n_processes + n_processes2 = data2.n_processes + + A = np.empty((n_processes1, n_processes2)) + A[:] = np.nan + + self._compute_matrix(A, data, data2) + + # Apply symmetry if the measure is undirected and comparing within the same data + if not self.get_tag("capability-directed", False) and data is data2: + li = np.tril_indices(n_processes1, -1) + A[li] = A.T[li] + + # Apply filtering if required + if i is not None: + if isinstance(i, int): + A = A[i:i+1, :] + else: + A = A[i, :] + if j is not None: + if isinstance(j, int): + A = A[:, j:j+1] + else: + A = A[:, j] + + return A + + ## Private API ## + def _spi(self, data, i=0): + """ + Internal method to compute univariate SPI + + Parameters + ---------- + data : Data + The data to be used for the computation. + i : int, optional + The index of the variable to be used. Default is 0. + + Returns + ------- + float + The computed SPI value. + + Notes + ----- + This method should be overridden by the subclasses to implement. + """ + raise NotImplementedError("_spi must be implemented in the subclass") + + def _spi_mat(self, data, data2=None, i=None, j=None): + """ + Internal method to compute the SPI matrix + + Parameters + ---------- + data : Data + The data to be used for the computation. + data2 : Data, optional + The second data to be used for the computation. Default is None. + i : int, optional + Row index filter of the output matrix. Default is None. + j : int, optional + Column index filter of the output matrix. Default is None. + + Returns + ------- + np.ndarray + The computed SPI matrix. + + Notes + ----- + This method should be overridden by the subclasses to implement. + """ + raise NotImplementedError("_spi_mat must be implemented in the subclass") + + def _compute_matrix(self, A, data, data2): + """ + Internal method to compute the SPI matrix + + Parameters + ---------- + A : np.ndarray + The matrix to be filled with the computed SPI values. + data : Data + The data to be used for the computation. + data2 : Data + The second data to be used for the computation. + + Notes + ----- + This method populates the A matrix with computed SPI values. + Can be overridden by subclasses to implement specific logic. + """ + n_processes1 = data.n_processes + n_processes2 = data2.n_processes + + for row in range(n_processes1): + for col in range(n_processes2): + # skip diagonal elements + if data is data2 and row == col and self.get_tag("capability-directed", False): + continue + A[row, col] = self._compute_pair(data, data2, row, col) + + def _compute_pair(self, data, data2, i, j): + """ + Internal method to compute the SPI value for a pair of processes + + Parameters + ---------- + data : Data + The data to be used for the computation. + data2 : Data + The second data to be used for the computation. + i : int + The index of the first process. + j : int + The index of the second process. + + Returns + ------- + float + The computed SPI value for the pair of processes. + + Notes + ----- + This method should be overridden by the subclasses to implement. + """ + raise NotImplementedError("_compute_pair must be implemented in the subclass") + +class DirectedNewSPI(BaseNewSPI): + """ + Base class for directed SPI measures. + + Captures asymmetric relationships between processes. + + Examples + -------- + >>> from pyspi import DirectedNewSPI + >>> class MyDirectedSPI(DirectedNewSPI): + ... def _compute_pair(self, data, data2, i, j): + ... # Implement the directed SPI computation here + ... return np.random.rand() + >>> my_spi = MyDirectedSPI() + >>> data = Data(np.random.rand(10, 5)) + >>> result = my_spi.spi(data, i=0) + >>> print(result) + """ + + _tags = { + "capability-directed": True, + "capability-signed": True, + "identifier": "directed", + "name": "DirectedPySPI", + } + + def __init__(self, **kwargs): + super().__init__(**kwargs) + +class UndirectedNewSPI(BaseNewSPI): + """ + Base class for undirected SPI measures. + + Captures symmetric relationships between processes. + + Examples + -------- + >>> from pyspi import UndirectedNewSPI + >>> class MyUndirectedSPI(UndirectedNewSPI): + ... def _compute_pair(self, data, data2, i, j): + ... # Implement the undirected SPI computation here + ... return np.random.rand() + >>> my_spi = MyUndirectedSPI() + >>> data = Data(np.random.rand(10, 5)) + >>> result = my_spi.spi(data, i=0) + >>> print(result) + """ + + _tags = { + "capability-directed": False, + "capability-signed": True, + "identifier": "undirected", + "name": "UndirectedPySPI", + } + + def __init__(self, **kwargs): + super().__init__(**kwargs) + +class SignedNewSPI(BaseNewSPI): + """ + Base class for signed SPI measures. + + Captures both positive and negative relationships between processes. + + Examples + -------- + >>> from pyspi import SignedNewSPI + >>> class MySignedSPI(SignedNewSPI): + ... def _compute_pair(self, data, data2, i, j): + ... # Implement the signed SPI computation here + ... return np.random.rand() + >>> my_spi = MySignedSPI() + >>> data = Data(np.random.rand(10, 5)) + >>> result = my_spi.spi(data, i=0) + >>> print(result) + """ + + _tags = { + "capability-signed": True, + "identifier": "signed ", + "name": "SignedPySPI", + } + + def __init__(self, **kwargs): + super().__init__(**kwargs) + +### Example Usage to be added later after finalizing the API design ### \ No newline at end of file diff --git a/pyspi/spi/distance_correlation.py b/pyspi/spi/distance_correlation.py new file mode 100644 index 0000000..9efb1eb --- /dev/null +++ b/pyspi/spi/distance_correlation.py @@ -0,0 +1 @@ +from pyspi.base_new import UndirectedNewSPI diff --git a/pyspi/spi/transfer_entropy.py b/pyspi/spi/transfer_entropy.py new file mode 100644 index 0000000..68eb13f --- /dev/null +++ b/pyspi/spi/transfer_entropy.py @@ -0,0 +1,210 @@ +import numpy as np + +from pyspi.base_new import DirectedNewSPI + +class TransferEntropyNewSPI(DirectedNewSPI): + """ + Transfer Entropy implementation for the new PySPI architecture. + + Transfer Entropy (TE) is an information-theoretic measure of directed + information transfer between two random processes. It quantifies how much knowing + the past of one process (source) improves the prediction of another process, + beyond what the target's own past provides. + + parameters + ---------- + lag_source : int, default=1 + The lag to be used for the source process. + lag_target : int, default=1 + The lag to be used for the target process. + estimator : str, default="binning" + Method to estimate probability distributions. + n_bins : int, default=10 + Number of bins to use for the binning estimator. + + Examples + -------- + >>> import numpy as np + >>> from pyspu.data import Data + >>> from pyspi.base_new import TransferEntropyNewSPI + >>> data = Data(np.random.rand(100, 2)) # Two processes + >>> te = TransferEntropyNewSPI(lag_source=1, lag_target=1, estimator="binning", n_bins=10) + >>> result = te.spi_mat(data) + >>> print(result.shape) # (2, 2) + """ + + _tags = { + "capability-directed": True, + "capability-signed": False, + "capability-multivariate": True, + "identifier": "transfer_entropy", + "name": "TransferEntropyNewSPI", + "python_dependencies": ["numpy", "scipy"] + } + + def __init__(self, lag_source=1, lag_target=1, estimator="binning", n_bins=10, **kwargs): + self.lag_source = lag_source + self.lag_target = lag_target + self.estimator = estimator + self.n_bins = n_bins + super().__init__(**kwargs) + + def _spi(self, data, i=0): + """ + Compute unvariate transfer entropy for a single process. + Returns 0.0, since TE requires 2 processes + + Parameters + ---------- + data : Data + The input data used for computation + i : int + Index of the process + + Returns + ------- + float + Always returns 0.0 as TE is inherently bivariate. + """ + return 0.0 + + def _compute_pair(self, data, data2, i, j): + """ + Compute transfer entropy from process i to process j. + + Parameters + ---------- + data : Data + The input data used for computation + data2 : Data + The second input data used for computation + i : int + Index of the source process + j : int + Index of the target process + + Returns + ------- + float + The transfer entropy from process i to process j. + """ + if data is data2 and i==j: + return 0.0 + + x_src = data.processes[i] + y_tgt = data.processes[j] + + return self._get_transfer_entropy(x_src, y_tgt) + + def _get_transfer_entropy(self, x_src, y_tgt): + """Compute Transfer Entropy from source to target time series. + + Parameters + ---------- + x_src : np.ndarray + Source time series. + y_tgt : np.ndarray + Target time series. + + Returns + ------- + float + Tranfer Entropy value + """ + x = x_src.flatten() if x_src.ndim > 1 else x_src + y = y_tgt.flatten() if y_tgt.ndim > 1 else y_tgt + + min_length = max(self.lag_source, self.lag_target) + 1 + if len(x) < min_length or len(y) < min_length: + raise ValueError(f"Input time series must be at least {min_length} samples long.") + + # Create lagged versions + start_idx = max(self.lag_source, self.lag_target) + x_lagged = np.array([x[i - self.lag_source] for i in range(start_idx, len(x))]) + y_lagged = np.array([y[i - self.lag_target] for i in range(start_idx, len(y))]) + y_future = np.array([y[i] for i in range(start_idx, len(y))]) + + if len(x_lagged) == 0 or len(y_lagged) == 0 or len(y_future) == 0: + return np.nan + + if self.estimator == "binning": + # Discretize the data + x_bins = np.histogram_bin_edges(x, bins=self.n_bins) + y_bins = np.histogram_bin_edges(y, bins=self.n_bins) + + x_lagged_binned = np.digitize(x_lagged, x_bins) + y_lagged_binned = np.digitize(y_lagged, y_bins) + y_future_binned = np.digitize(y_future, y_bins) + + # Compute probability distributions + p_yfuture_ylag = self._joint_prob(y_future_binned, y_lagged_binned) + p_yfuture_ylag_xlag = self._joint_prob_3d(y_future_binned, y_lagged_binned, x_lagged_binned) + + p_ylag = self._marginal_prob(y_lagged_binned) + p_ylag_xlag = self._joint_prob(y_lagged_binned, x_lagged_binned) + + # Compute conditional entropies + H1 = self._conditional_entropy(p_yfuture_ylag, p_ylag) + H2 = self._conditional_entropy(p_yfuture_ylag_xlag, p_ylag_xlag) + + te_value = H1 - H2 + return max(0.0, te_value) # TE should be non-negative + else: + raise NotImplementedError(f"Estimator {self.estimator} not implemented yet.") + + def _joint_prob(self, x, y): + """Estimate 2D joint probability distribution.""" + if len(x) == 0 or len(y) == 0: + return np.array([[1e-10]]) + + bins = (max(1, int(np.max(x)) + 1), max(1, int(np.max(y)) + 1)) + hist, _, _ = np.histogram2d(x, y, bins=bins) + return hist / np.sum(hist) if np.sum(hist) > 0 else hist + 1e-10 + + def _joint_prob_3d(self, x, y, z): + """Estimate 3D joint probability distribution.""" + if len(x) == 0 or len(y) == 0 or len(z) == 0: + return np.array([[[1e-10]]]) + + xyz = np.vstack((x, y, z)).T + unique, counts = np.unique(xyz, axis=0, return_counts=True) + + max_x = max(1, int(np.max(x)) + 1) + max_y = max(1, int(np.max(y)) + 1) + max_z = max(1, int(np.max(z)) + 1) + + hist = np.zeros((max_x, max_y, max_z)) + for (i, j, k), c in zip(unique, counts): + hist[int(i), int(j), int(k)] = c + return hist / np.sum(hist) if np.sum(hist) > 0 else hist + 1e-10 + + def _marginal_prob(self, x): + """Estimate 1D marginal probability distribution.""" + if len(x) == 0: + return np.array([1e-10]) + + bins = max(1, int(np.max(x)) + 1) + hist, _ = np.histogram(x, bins=bins) + return hist / np.sum(hist) if np.sum(hist) > 0 else hist + 1e-10 + + def _conditional_entropy(self, joint, marginal): + """Calculate conditional entropy H(Y|X) = H(X,Y) - H(X).""" + eps = 1e-10 + + if np.sum(joint) == 0 or np.sum(marginal) == 0: + return 0.0 + + joint = np.clip(joint, eps, 1.0) + marginal = np.clip(marginal, eps, 1.0) + + H_joint = -np.sum(joint * np.log(joint + eps)) + H_marginal = -np.sum(marginal * np.log(marginal + eps)) + + return H_joint - H_marginal + + + + + + +