diff --git a/aeon/forecasting/stats/__init__.py b/aeon/forecasting/stats/__init__.py
index 79f3acab48..6f6d38c200 100644
--- a/aeon/forecasting/stats/__init__.py
+++ b/aeon/forecasting/stats/__init__.py
@@ -1,13 +1,17 @@
 """Stats based forecasters."""
 
 __all__ = [
-    "ETS",
     "ARIMA",
+    "AutoTAR",
+    "ETS",
+    "TAR",
     "Theta",
-    "TVPForecaster",
+    "TVP",
 ]
 
 from aeon.forecasting.stats._arima import ARIMA
+from aeon.forecasting.stats._auto_tar import AutoTAR
 from aeon.forecasting.stats._ets import ETS
+from aeon.forecasting.stats._tar import TAR
 from aeon.forecasting.stats._theta import Theta
-from aeon.forecasting.stats._tvp import TVPForecaster
+from aeon.forecasting.stats._tvp import TVP
diff --git a/aeon/forecasting/stats/_ar.py b/aeon/forecasting/stats/_ar.py
new file mode 100644
index 0000000000..a60d5fed62
--- /dev/null
+++ b/aeon/forecasting/stats/_ar.py
@@ -0,0 +1,283 @@
+from __future__ import annotations
+
+import numpy as np
+from numba import njit
+
+from aeon.forecasting.base import BaseForecaster, IterativeForecastingMixin
+
+
+class AR(BaseForecaster, IterativeForecastingMixin):
+    r"""Autoregressive (AR) forecaster fit by OLS.
+
+    Parameters are estimated by ordinary least squares on the lag-matrix
+    design. Optionally, the AR order can be *selected* by scanning orders
+    ``0..p_max`` with an information criterion (AIC by default), reusing the
+    same lag matrix and OLS routine for efficiency (no nonlinear optimisation).
+
+    Parameters
+    ----------
+    ar_order : int | None, default=None
+        If an ``int``, fit a fixed-order AR(p) with that order.
+        If ``None``, the order is selected by scanning ``0..p_max`` and choosing
+        the minimum information criterion (``criterion``).
+    p_max : int, default=10
+        Maximum order to consider when ``ar_order is None``.
+    criterion : {"AIC", "BIC", "AICc"}, default="AIC"
+        Information criterion for order selection when scanning.
+    demean : bool, default=True
+        If ``True``, subtract the training mean before fitting (common practice).
+        If ``False``, an intercept is estimated in OLS.
+
+    Attributes
+    ----------
+    p_ : int
+        Selected/used AR order.
+    intercept_ : float
+        Estimated intercept term (0.0 if ``demean=True``).
+    coef_ : np.ndarray of shape (p_,)
+        AR coefficients for lags 1..p_.
+    forecast_ : float
+        One-step-ahead forecast from the end of training.
+    params_ : dict
+        Snapshot including order-selection diagnostics.
+
+    Notes
+    -----
+    *Design alignment.* For a chosen maximum lag ``maxlag``, the design rows
+    correspond to times ``t = maxlag .. n-1``. For order ``p``, we use the first
+    ``p`` columns of the lag matrix ``[y_{t-1}, ..., y_{t-maxlag}]``.
+
+    The OLS implementation uses normal equations with an explicit intercept term
+    (unless ``demean=True``), returning the residual sum of squares for criterion
+    computation:
+
+    ``AIC = n_eff * log(max(RSS/n_eff, tiny)) + 2*k``, where ``k = p + 1`` if an
+    intercept is fit, else ``k = p``.
+
+    """
+
+    def __init__(
+        self,
+        ar_order: int | None = None,
+        *,
+        p_max: int = 10,
+        criterion: str = "AIC",
+        demean: bool = True,
+    ) -> None:
+        self.ar_order = ar_order
+        self.p_max = p_max
+        self.criterion = criterion
+        self.demean = demean
+        super().__init__(horizon=1, axis=1)
+
+    # ---------------------------------------------------------------------
+    # aeon required internals
+    # ---------------------------------------------------------------------
+    def _fit(self, y: np.ndarray, exog: np.ndarray | None = None) -> AR:
+        self._validate_params()
+        y = np.asarray(y, dtype=np.float64).squeeze()
+        if y.ndim != 1:
+            raise ValueError("y must be a 1D array-like")
+        y = np.ascontiguousarray(y)
+        n = y.shape[0]
+
+        # centring (if requested)
+        if self.demean:
+            self._y_mean_ = float(y.mean())
+            yc = y - self._y_mean_
+            fit_intercept = False
+        else:
+            self._y_mean_ = 0.0
+            yc = y
+            fit_intercept = True
+
+        # Resolve order and build lag matrix up to needed maxlag
+        if self.ar_order is None:
+            if self.p_max < 0:
+                raise ValueError("p_max must be >= 0 when ar_order=None")
+            maxlag = int(self.p_max)
+        else:
+            if self.ar_order < 0:
+                raise ValueError("ar_order must be >= 0")
+            maxlag = int(self.ar_order)
+
+        if n <= maxlag:
+            raise RuntimeError(
+                f"Not enough observations (n={n}) for maxlag={maxlag}. Provide more "
+                f"data or lower order."
+            )
+
+        X_full = _make_lag_matrix(yc, maxlag)  # shape (rows, maxlag)
+        y_resp = yc[maxlag:]
+        rows = y_resp.shape[0]
+
+        # If fixed order
+        if self.ar_order is not None:
+            p = int(self.ar_order)
+            if p == 0:
+                # intercept-only (if fit_intercept) or mean-zero (demeaned)
+                i, b, rss = _ols_fit_with_rss(
+                    np.empty((rows, 0)), y_resp, fit_intercept
+                )
+            else:
+                i, b, rss = _ols_fit_with_rss(X_full[:, :p], y_resp, fit_intercept)
+            self.p_ = p
+            self.intercept_ = float(i)
+            self.coef_ = np.ascontiguousarray(b, dtype=np.float64)
+            crit_value = _criterion_value(self.criterion, rss, rows, p, fit_intercept)
+            self.params_ = {
+                "selection": {
+                    "mode": "fixed",
+                    "criterion": self.criterion,
+                    "value": float(crit_value),
+                },
+                "order": int(self.p_),
+            }
+        else:
+            # Scan 0..p_max using shared design and OLS
+            best = (
+                np.inf,
+                0,
+                0.0,
+                np.zeros(0, dtype=np.float64),
+                np.inf,
+            )  # (crit, p, i, b, rss)
+            for p in range(0, maxlag + 1):
+                if p == 0:
+                    i, b, rss = _ols_fit_with_rss(
+                        np.empty((rows, 0)), y_resp, fit_intercept
+                    )
+                else:
+                    i, b, rss = _ols_fit_with_rss(X_full[:, :p], y_resp, fit_intercept)
+                crit = _criterion_value(self.criterion, rss, rows, p, fit_intercept)
+                if crit < best[0]:
+                    best = (crit, p, i, b.copy(), rss)
+            crit, p, i, b, rss = best
+            self.p_ = int(p)
+            self.intercept_ = float(i)
+            self.coef_ = np.ascontiguousarray(b, dtype=np.float64)
+            self.params_ = {
+                "selection": {
+                    "mode": "scan",
+                    "criterion": self.criterion,
+                    "value": float(crit),
+                    "p_max": int(maxlag),
+                },
+                "order": int(self.p_),
+            }
+
+        # one-step forecast from end of training; re-add the mean removed above
+        yhat = _ar_predict(yc, self.intercept_, self.coef_, self.p_)
+        self.forecast_ = yhat + self._y_mean_
+        return self
+
+    def _predict(self, y: np.ndarray, exog: np.ndarray | None = None) -> float:
+        y = np.asarray(y, dtype=np.float64).squeeze()
+        if y.ndim != 1:
+            raise ValueError("y must be a 1D array-like")
+        # apply the same centring used in fit, then shift the forecast back
+        yc = y - self._y_mean_
+        return _ar_predict(yc, self.intercept_, self.coef_, self.p_) + self._y_mean_
+
+    # ---------------------------------------------------------------------
+    # validation helpers
+    # ---------------------------------------------------------------------
+    def 
_validate_params(self) -> None: + if self.ar_order is not None and not isinstance(self.ar_order, int): + raise TypeError("ar_order must be an int or None") + if not isinstance(self.p_max, int) or self.p_max < 0: + raise TypeError("p_max must be a non-negative int") + if self.criterion not in {"AIC", "BIC", "AICc"}: + raise ValueError("criterion must be one of {'AIC','BIC','AICc'}") + if not isinstance(self.demean, (bool, np.bool_)): + raise TypeError("demean must be a bool") + + +# ============================ shared Numba utilities ============================ + + +@njit(cache=True, fastmath=True) +def _make_lag_matrix(y: np.ndarray, maxlag: int) -> np.ndarray: + """Lag matrix with columns [y_{t-1}, ..., y_{t-maxlag}] (trim='both').""" + n = y.shape[0] + rows = n - maxlag + out = np.empty((rows, maxlag), dtype=np.float64) + for i in range(rows): + base = maxlag + i + for k in range(maxlag): + out[i, k] = y[base - (k + 1)] + return out + + +@njit(cache=True, fastmath=True) +def _ols_fit_with_rss( + X: np.ndarray, y: np.ndarray, fit_intercept: bool +) -> tuple[float, np.ndarray, float]: + """OLS via normal equations; return (intercept, coef, rss). + + If ``fit_intercept`` is ``False``, the intercept is forced to 0 and the + returned value is 0.0. Coefficients will have shape ``(n_features,)``. + """ + n_samples, n_features = X.shape + + if fit_intercept: + Xb = np.empty((n_samples, n_features + 1), dtype=np.float64) + Xb[:, 0] = 1.0 + if n_features: + Xb[:, 1:] = X + XtX = Xb.T @ Xb + Xty = Xb.T @ y + beta = np.linalg.solve(XtX, Xty) + pred = Xb @ beta + resid = y - pred + rss = float(resid @ resid) + return float(beta[0]), beta[1:], rss + else: + if n_features: + XtX = X.T @ X + Xty = X.T @ y + beta = np.linalg.solve(XtX, Xty) + pred = X @ beta + resid = y - pred + rss = float(resid @ resid) + return 0.0, beta, rss + else: + # no features, no intercept: y is mean-zero => model predicts 0 + rss = float(y @ y) + return 0.0, np.zeros(0, dtype=np.float64), rss + + +@njit(cache=True, fastmath=True) +def _criterion_value( + name: str, rss: float, n_eff: int, p: int, fit_intercept: bool +) -> float: + """Compute AIC/BIC/AICc from RSS for an AR(p) regression. + + k = number of free parameters = p + (1 if fit_intercept else 0) + """ + if n_eff <= 0: + return np.inf + sigma2 = rss / n_eff + if sigma2 <= 1e-300: + sigma2 = 1e-300 + + k = p + (1 if fit_intercept else 0) + + if name == "AIC": + return n_eff * np.log(sigma2) + 2.0 * k + elif name == "BIC": + return n_eff * np.log(sigma2) + k * np.log(max(2, n_eff)) + else: # AICc + denom = max(1.0, (n_eff - k - 1)) + return n_eff * np.log(sigma2) + 2.0 * k + (2.0 * k * (k + 1)) / denom + + +@njit(cache=True, fastmath=True) +def _ar_predict(y: np.ndarray, intercept: float, coef: np.ndarray, p: int) -> float: + """One-step-ahead forecast from the end of ``y`` for an AR(p) model.""" + if p == 0: + return intercept + val = intercept + for j in range(p): + val += coef[j] * y[-(j + 1)] + return val diff --git a/aeon/forecasting/stats/_auto_tar.py b/aeon/forecasting/stats/_auto_tar.py new file mode 100644 index 0000000000..3f9c1360fa --- /dev/null +++ b/aeon/forecasting/stats/_auto_tar.py @@ -0,0 +1,531 @@ +from __future__ import annotations + +from collections.abc import Iterable + +import numpy as np +from numba import njit + +from aeon.forecasting.base import BaseForecaster, IterativeForecastingMixin + +# Import all shared helpers from TAR (adjust import path if not packaged together) +# If these files live side-by-side, `from _tar import ...` also works. 
+from aeon.forecasting.stats._tar import ( + _aic_value, + _numba_predict, + _ols_fit_with_rss, + _prepare_design, + _subset_rows_cols, + _subset_target, +) + + +class AutoTAR(BaseForecaster, IterativeForecastingMixin): + r"""Threshold Autoregressive (AutoTAR) forecaster with fast threshold search. + + AutoTAR [1] assumes two regimes: when the threshold variable :math:`z_t = y_{t-d}` + is **at or below** a threshold :math:`r` (below/left regime) or **above** it + (above/right regime). The model fits separate AR(p) processes to each regime + by OLS. If not fixed by the user, the AR orders, delay, and threshold are + chosen by grid search with model selection by **AIC**. By convention, ties + are split as: below/left if :math:`z_t \le r`, above/right if :math:`z_t > r`. + + Parameters + ---------- + threshold : float | None, default=None + Fixed threshold :math:`r`. If ``None``, search over trimmed quantile candidates. + delay : int | None, default=None + Threshold delay :math:`d`. If ``None``, search over ``1..max_delay``. + ar_order : int | tuple[int, int] | None, default=None + If ``int``, same order in both regimes. If tuple, fixed ``(p_below, p_above)``. + If ``None``, search both in ``0..max_order``. + max_order : int, default=3 + Upper bound for per-regime order search when ``ar_order`` is ``None``. + max_delay : int, default=3 + Upper bound for delay search when ``delay`` is ``None``. + threshold_trim : float in [0, 0.5), default=0.15 + Fraction trimmed from each tail of the threshold variable distribution. + min_regime_frac : float in (0, 0.5], default=0.10 + Minimum fraction of rows required in each regime to accept a split. + min_points_offset : int, default=0 + Additional absolute minimum observations required in each regime. + max_threshold_candidates : int | None, default=100 + Maximum number of quantile-like candidates per (p_below,p_above,d). + If ``None``, uses all indices inside the trimmed region. + + Attributes + ---------- + threshold_ : float + Selected/used threshold. + delay_ : int + Selected/used delay. + p_below_ : int + AR order in the below/left regime. + p_above_ : int + AR order in the above/right regime. + intercept_below_, coef_below_ : float, np.ndarray + OLS parameters for the below/left regime. + intercept_above_, coef_above_ : float, np.ndarray + OLS parameters for the above/right regime. + params_ : dict + Snapshot of selected parameters and AIC. + forecast_ : float + One-step-ahead forecast from end of training. + + References + ---------- + .. [1] Tong, H., & Lim, K. S. (1980). + Threshold autoregression, limit cycles and cyclical data. + *Journal of the Royal Statistical Society: Series B*, 42(3), 245–292. 
+ """ + + def __init__( + self, + threshold: float | None = None, + delay: int | None = None, + ar_order: int | tuple[int, int] | None = None, + *, + max_order: int = 3, + max_delay: int = 3, + threshold_trim: float = 0.15, + min_regime_frac: float = 0.10, + min_points_offset: int = 0, + max_threshold_candidates: int | None = 100, + ) -> None: + self.threshold = threshold + self.delay = delay + self.ar_order = ar_order + self.max_order = int(max_order) + self.max_delay = int(max_delay) + self.threshold_trim = float(threshold_trim) + self.min_regime_frac = float(min_regime_frac) + self.min_points_offset = int(min_points_offset) + self.max_threshold_candidates = max_threshold_candidates + super().__init__(horizon=1, axis=1) + + def _iter_orders(self) -> Iterable[tuple[int, int]]: + if isinstance(self.ar_order, tuple): + yield self.ar_order[0], self.ar_order[1] + return + if isinstance(self.ar_order, int): + p = self.ar_order + yield p, p + return + for pL in range(self.max_order + 1): + for pR in range(self.max_order + 1): + yield pL, pR + + def _iter_delays(self) -> Iterable[int]: + if isinstance(self.delay, int): + yield self.delay + elif self.delay is None: + yield from range(1, self.max_delay + 1) + else: + raise ValueError("delay must be int or None") + + def _fit(self, y: np.ndarray, exog: np.ndarray | None = None) -> AutoTAR: + self._validate_params() + y = np.ascontiguousarray(np.asarray(y, dtype=np.float64)).squeeze() + n = y.shape[0] + + best_aic = np.inf + found = False + fixed_r = self.threshold + + for pL, pR in self._iter_orders(): + base_maxlag = max(pL, pR) + for d in self._iter_delays(): + maxlag = max(base_maxlag, d) + if n <= maxlag + 1: + continue + + if fixed_r is None: + cap = ( + -1 + if (self.max_threshold_candidates is None) + else self.max_threshold_candidates + ) + r, aic_val, iL, bL, iR, bR, _, _ = _numba_threshold_search( + y, + pL, + pR, + d, + self.threshold_trim, + self.min_regime_frac, + self.min_points_offset, + cap, + ) + else: + # Evaluate a single fixed threshold with AIC. + X_full, y_resp, z = _prepare_design(y, maxlag, d) + thr = float(fixed_r) + mask_R = z > thr + nR = int(mask_R.sum()) + nL = y_resp.shape[0] - nR + min_per = max( + int(np.ceil(self.min_regime_frac * y_resp.shape[0])), + self.min_points_offset, + pL + 1, + pR + 1, + ) + if (nL < min_per) or (nR < min_per): + continue + XL = ( + _subset_rows_cols(X_full, mask_R, False, pL) + if pL > 0 + else np.empty((nL, 0)) + ) + XR = ( + _subset_rows_cols(X_full, mask_R, True, pR) + if pR > 0 + else np.empty((nR, 0)) + ) + yL = _subset_target(y_resp, mask_R, False) + yR = _subset_target(y_resp, mask_R, True) + iL, bL, rssL = _ols_fit_with_rss(XL, yL) + iR, bR, rssR = _ols_fit_with_rss(XR, yR) + rss = rssL + rssR + kpars = (1 + pL) + (1 + pR) + aic_val = _aic_value(rss, y_resp.shape[0], kpars) + r = thr + + if aic_val < best_aic: + best_aic = aic_val + self.p_below_ = pL + self.p_above_ = pR + self.delay_ = d + self.threshold_ = r + self.intercept_below_ = iL + self.coef_below_ = bL + self.intercept_above_ = iR + self.coef_above_ = bR + found = True + + if not found: + raise RuntimeError( + "No valid AutoTAR fit found. Consider relaxing trims or widening the " + "search space." 
+ ) + + self.forecast_ = _numba_predict( + y, + self.delay_, + self.threshold_, + self.intercept_below_, + self.coef_below_, + self.p_below_, + self.intercept_above_, + self.coef_above_, + self.p_above_, + ) + + self.params_ = { + "threshold": self.threshold_, + "delay": self.delay_, + "regime_below": { + "order": self.p_below_, + "intercept": self.intercept_below_, + "coef": self.coef_below_, + }, + "regime_above": { + "order": self.p_above_, + "intercept": self.intercept_above_, + "coef": self.coef_above_, + }, + "selection": {"criterion": "AIC", "value": best_aic}, + } + return self + + def _predict(self, y: np.ndarray, exog: np.ndarray | None = None) -> float: + y = np.ascontiguousarray(np.asarray(y, dtype=np.float64)).squeeze() + return _numba_predict( + y, + self.delay_, + self.threshold_, + self.intercept_below_, + self.coef_below_, + self.p_below_, + self.intercept_above_, + self.coef_above_, + self.p_above_, + ) + + def _validate_params(self) -> None: + """Validate constructor parameters for type and value ranges.""" + if not isinstance(self.max_order, int) or self.max_order < 0: + raise TypeError("max_order must be an int >= 0") + if not isinstance(self.max_delay, int) or self.max_delay < 1: + raise TypeError("max_delay must be an int >= 1") + + if self.ar_order is not None: + if isinstance(self.ar_order, int): + if self.ar_order < 0: + raise ValueError("ar_order int must be >= 0") + elif isinstance(self.ar_order, tuple): + if len(self.ar_order) != 2: + raise TypeError( + "ar_order tuple must have length 2: (p_below, p_above)" + ) + pL, pR = self.ar_order + if not (isinstance(pL, int) and isinstance(pR, int)): + raise TypeError("ar_order tuple entries must be ints") + if pL < 0 or pR < 0: + raise ValueError("ar_order tuple entries must be >= 0") + else: + raise TypeError("ar_order must be int, (int,int), or None") + + if self.delay is not None: + if not isinstance(self.delay, int) or self.delay < 1: + raise TypeError("delay must be an int >= 1 or None") + + if self.threshold is not None: + if not isinstance(self.threshold, (int, float, np.floating)): + raise TypeError("threshold must be a real number or None") + if not np.isfinite(self.threshold): + raise ValueError("threshold must be finite") + + if not (0.0 <= self.threshold_trim < 0.5): + raise ValueError("threshold_trim must be in [0, 0.5)") + if not (0.0 < self.min_regime_frac <= 0.5): + raise ValueError("min_regime_frac must be in (0, 0.5]") + if not isinstance(self.min_points_offset, int) or self.min_points_offset < 0: + raise TypeError("min_points_offset must be an int >= 0") + if self.max_threshold_candidates is not None: + if ( + not isinstance(self.max_threshold_candidates, int) + or self.max_threshold_candidates < 1 + ): + raise TypeError("max_threshold_candidates must be int >= 1 or None") + + +# ============================ AutoTAR-specific kernel ============================ + + +@njit(cache=True, fastmath=True) +def _numba_threshold_search( + y: np.ndarray, + p_below: int, + p_above: int, + d: int, + trim_frac: float, + min_frac: float, + min_offset: int, + max_cands: int, # <= 0 → use all candidates in trimmed span +) -> tuple[float, float, float, np.ndarray, float, np.ndarray, int, int]: + """Search threshold using quantile-capped candidates + prefix/suffix stats. + + Below-regime means z_t <= r, above-regime means z_t > r. 
+ """ + maxlag = max(p_below, p_above, d) + X_full, y_resp, z = _prepare_design(y, maxlag, d) + rows = y_resp.shape[0] + if rows <= 0: + return ( + 0.0, + np.inf, + 0.0, + np.empty(0), + 0.0, + np.empty(0), + 0, + 0, + ) + + # Augmented per-row designs (intercept + lags for each regime) + X_below_aug = np.empty((rows, p_below + 1), dtype=np.float64) + X_above_aug = np.empty((rows, p_above + 1), dtype=np.float64) + for i in range(rows): + X_below_aug[i, 0] = 1.0 + X_above_aug[i, 0] = 1.0 + for c in range(p_below): + X_below_aug[i, c + 1] = X_full[i, c] + for c in range(p_above): + X_above_aug[i, c + 1] = X_full[i, c] + + order = np.argsort(z) + z_sorted = z[order] + y_sorted = y_resp[order] + X_below_sorted = X_below_aug[order, :] + X_above_sorted = X_above_aug[order, :] + + lower = int(np.floor(trim_frac * rows)) + upper = rows - lower + if upper <= lower + 1: + # Fallback: evaluate median split with full OLS + r_med = np.median(z_sorted) + mask_above = z > r_med + n_above = int(mask_above.sum()) + n_below = rows - n_above + if (n_below == 0) or (n_above == 0): + return ( + r_med, + np.inf, + 0.0, + np.empty(0), + 0.0, + np.empty(0), + n_below, + n_above, + ) + + Xb = ( + _subset_rows_cols(X_full, mask_above, False, p_below) + if p_below > 0 + else np.empty((n_below, 0)) + ) + Xa = ( + _subset_rows_cols(X_full, mask_above, True, p_above) + if p_above > 0 + else np.empty((n_above, 0)) + ) + yb = _subset_target(y_resp, mask_above, False) + ya = _subset_target(y_resp, mask_above, True) + + i_below, b_below, rss_below = _ols_fit_with_rss(Xb, yb) + i_above, b_above, rss_above = _ols_fit_with_rss(Xa, ya) + + k = (1 + p_below) + (1 + p_above) + aic_val = _aic_value(rss_below + rss_above, rows, k) + return ( + r_med, + aic_val, + i_below, + b_below, + i_above, + b_above, + n_below, + n_above, + ) + + min_per = max( + int(np.ceil(min_frac * rows)), + min_offset, + p_below + 1, + p_above + 1, + ) + + # Candidate indices inside trimmed span + span = upper - lower + m = span if (max_cands <= 0) else (max_cands if span > max_cands else span) + idx = np.empty(m, dtype=np.int64) + if m == 1: + idx[0] = (lower + upper - 1) // 2 + else: + for j in range(m): + pos = lower + (j * (span - 1)) / (m - 1) + k = int(np.floor(pos + 0.5)) + if k < lower: + k = lower + if k > (upper - 1): + k = upper - 1 + idx[j] = k + # Deduplicate adjacent duplicates + w = 1 + for j in range(1, m): + if idx[j] != idx[w - 1]: + idx[w] = idx[j] + w += 1 + m = w + + # Prefix/suffix sufficient statistics + Sxx_below = np.zeros((p_below + 1, p_below + 1), dtype=np.float64) + Sxy_below = np.zeros((p_below + 1,), dtype=np.float64) + Syy_below = 0.0 + n_below = 0 + + Sxx_above = np.zeros((p_above + 1, p_above + 1), dtype=np.float64) + Sxy_above = np.zeros((p_above + 1,), dtype=np.float64) + Syy_above = 0.0 + for i in range(rows): + x_above_row = X_above_sorted[i] + yi = y_sorted[i] + for r0 in range(p_above + 1): + v0 = x_above_row[r0] + for r1 in range(p_above + 1): + Sxx_above[r0, r1] += v0 * x_above_row[r1] + for r0 in range(p_above + 1): + Sxy_above[r0] += x_above_row[r0] * yi + Syy_above += yi * yi + n_above = rows + + ridge = 1e-12 + best_aic = np.inf + best_r = 0.0 + + best_i_below = 0.0 + best_b_below = np.empty(0, dtype=np.float64) + best_i_above = 0.0 + best_b_above = np.empty(0, dtype=np.float64) + best_n_below = 0 + best_n_above = 0 + + prev = -1 + for j in range(m): + cut = idx[j] + # Move rows (prev+1 .. 
cut) from ABOVE → BELOW + for t in range(prev + 1, cut + 1): + x_below_row = X_below_sorted[t] + x_above_row = X_above_sorted[t] + yi = y_sorted[t] + + # Add to BELOW + for r0 in range(p_below + 1): + v0 = x_below_row[r0] + for r1 in range(p_below + 1): + Sxx_below[r0, r1] += v0 * x_below_row[r1] + for r0 in range(p_below + 1): + Sxy_below[r0] += x_below_row[r0] * yi + Syy_below += yi * yi + n_below += 1 + + # Remove from ABOVE + for r0 in range(p_above + 1): + v0 = x_above_row[r0] + for r1 in range(p_above + 1): + Sxx_above[r0, r1] -= v0 * x_above_row[r1] + for r0 in range(p_above + 1): + Sxy_above[r0] -= x_above_row[r0] * yi + Syy_above -= yi * yi + n_above -= 1 + + prev = cut + + if (n_below < min_per) or (n_above < min_per): + continue + + # Solve (copies avoid accumulating ridge across candidates) + S_below = Sxx_below.copy() + for r0 in range(p_below + 1): + S_below[r0, r0] += ridge + beta_below = np.linalg.solve(S_below, Sxy_below) + + S_above = Sxx_above.copy() + for r0 in range(p_above + 1): + S_above[r0, r0] += ridge + beta_above = np.linalg.solve(S_above, Sxy_above) + + rss_below = Syy_below - np.dot(beta_below, Sxy_below) + rss_above = Syy_above - np.dot(beta_above, Sxy_above) + rss = rss_below + rss_above + kpars = (1 + p_below) + (1 + p_above) + aic_val = _aic_value(rss, rows, kpars) + + if aic_val < best_aic: + best_aic = aic_val + best_r = z_sorted[cut] + best_i_below = beta_below[0] + best_b_below = beta_below[1:].copy() + best_i_above = beta_above[0] + best_b_above = beta_above[1:].copy() + best_n_below = n_below + best_n_above = n_above + + return ( + best_r, + best_aic, + best_i_below, + best_b_below, + best_i_above, + best_b_above, + best_n_below, + best_n_above, + ) diff --git a/aeon/forecasting/stats/_tar.py b/aeon/forecasting/stats/_tar.py new file mode 100644 index 0000000000..caf5976e4a --- /dev/null +++ b/aeon/forecasting/stats/_tar.py @@ -0,0 +1,341 @@ +from __future__ import annotations + +import numpy as np +from numba import njit + +from aeon.forecasting.base import BaseForecaster, IterativeForecastingMixin + + +class TAR(BaseForecaster, IterativeForecastingMixin): + r"""Threshold Autoregressive (TAR) forecaster with fixed parameters. + + Two regimes split by a threshold :math:`r` on the variable :math:`z_t=y_{t-d}`: + observations with :math:`z_t \le r` follow the **below/left** AR model, and + those with :math:`z_t > r` follow the **above/right** AR model. Each regime is + fit by OLS. **No parameter optimisation/search** is performed. + + Defaults: + - ``delay=1`` + - ``ar_order=(2, 2)`` (AR(2) in each regime) + - ``threshold=None`` → set to the **median** of the aligned threshold variable + computed on the training window. + + Parameters + ---------- + threshold : float | None, default=None + Fixed threshold :math:`r`. If ``None``, it is set in ``fit`` to the median + of :math:`z_t=y_{t-d}` over the aligned training rows. + delay : int, default=1 + Threshold delay :math:`d \ge 1` for :math:`z_t = y_{t-d}`. + ar_order : int | tuple[int, int], default=(2, 2) + If ``int``, use the same AR order in both regimes. + If tuple, use ``(p_below, p_above)`` for the two regimes. + + Attributes + ---------- + threshold_ : float + The threshold actually used (either provided or the computed median). + delay_ : int + The fixed delay actually used. + p_below_, p_above_ : int + AR orders used in the below/left and above/right regimes, respectively. + intercept_below_, coef_below_ : float, np.ndarray + OLS parameters for the below/left regime (:math:`z_t \le r`). 
+ intercept_above_, coef_above_ : float, np.ndarray + OLS parameters for the above/right regime (:math:`z_t > r`). + forecast_ : float + One-step-ahead forecast from the end of training. + params_ : dict + Snapshot of configuration and a simple AIC diagnostic. + + References + ---------- + Tong, H., & Lim, K. S. (1980). + Threshold autoregression, limit cycles and cyclical data. + *JRSS-B*, 42(3), 245–292. + """ + + def __init__( + self, + threshold: float | None = None, + delay: int = 1, + ar_order: int | tuple[int, int] = (2, 2), + ) -> None: + self.threshold = threshold + self.delay = delay + self.ar_order = ar_order + super().__init__(horizon=1, axis=1) + + def _fit(self, y: np.ndarray, exog: np.ndarray | None = None) -> TAR: + self._validate_params() + y = y.squeeze() + y = np.ascontiguousarray(np.asarray(y, dtype=np.float64)) + n = y.shape[0] + + # Resolve orders + if isinstance(self.ar_order, int): + p_below = p_above = int(self.ar_order) + else: + p_below = int(self.ar_order[0]) + p_above = int(self.ar_order[1]) + + maxlag = max(p_below, p_above, self.delay) + if n <= maxlag: + raise RuntimeError( + f"Not enough observations (n={n}) for maxlag={maxlag}. " + "Provide more data or lower delay/order." + ) + + # Design matrices aligned to t = maxlag .. n-1 + X_full = _make_lag_matrix(y, maxlag) # shape (rows, maxlag) + y_resp = y[maxlag:] # shape (rows,) + rows = y_resp.shape[0] + + # Threshold variable z_t = y_{t-d} + base = maxlag - self.delay + z = y[base : base + rows] + + # Default threshold to the median of z if not provided + if self.threshold is not None: + thr = self.threshold + else: + thr = np.median(z) + + # Regime mask and sizes + mask_R = z > thr + nR = int(mask_R.sum()) + nL = rows - nR + + minL = p_below + 1 + minR = p_above + 1 + if nL < minL or nR < minR: + raise RuntimeError( + "Insufficient data per regime at the chosen threshold: " + f"below n={nL} (need ≥ {minL}), above n={nR} (need ≥ {minR}). " + "Consider providing a different threshold, delay, or orders." + ) + + # Per-regime designs + if p_below > 0: + XL = X_full[~mask_R, :p_below] + else: + XL = np.empty((nL, 0), dtype=np.float64) + if p_above > 0: + XR = X_full[mask_R, :p_above] + else: + XR = np.empty((nR, 0), dtype=np.float64) + yL = y_resp[~mask_R] + yR = y_resp[mask_R] + + # OLS fits + iL, bL, rssL = _ols_fit_with_rss(XL, yL) + iR, bR, rssR = _ols_fit_with_rss(XR, yR) + + # Persist learned params + self.threshold_ = thr + self.delay_ = int(self.delay) + self.p_below_ = p_below + self.p_above_ = p_above + self.intercept_below_ = float(iL) + self.coef_below_ = np.ascontiguousarray(bL, dtype=np.float64) + self.intercept_above_ = float(iR) + self.coef_above_ = np.ascontiguousarray(bR, dtype=np.float64) + + # 1-step forecast + self.forecast_ = _numba_predict( + y, + self.delay_, + self.threshold_, + self.intercept_below_, + self.coef_below_, + self.p_below_, + self.intercept_above_, + self.coef_above_, + self.p_above_, + ) + + # Simple AIC diagnostic (sum RSS; k counts both regimes incl. 
intercepts) + rss = rssL + rssR + k = (1 + p_below) + (1 + p_above) + aic = _aic_value(rss, rows, k) + self.params_ = { + "threshold": self.threshold_, + "delay": self.delay_, + "regime_below": { + "order": self.p_below_, + "intercept": self.intercept_below_, + "coef": self.coef_below_, + "n": int(nL), + }, + "regime_above": { + "order": self.p_above_, + "intercept": self.intercept_above_, + "coef": self.coef_above_, + "n": int(nR), + }, + "selection": {"criterion": "AIC", "value": float(aic)}, + } + return self + + def _predict(self, y: np.ndarray, exog: np.ndarray | None = None) -> float: + y = np.ascontiguousarray(np.asarray(y, dtype=np.float64)).squeeze() + return _numba_predict( + y, + self.delay_, + self.threshold_, + self.intercept_below_, + self.coef_below_, + self.p_below_, + self.intercept_above_, + self.coef_above_, + self.p_above_, + ) + + def _validate_params(self) -> None: + """Validate fixed-parameter configuration for types and ranges.""" + if self.threshold is not None: + if not isinstance( + self.threshold, (int, float, np.floating) + ) or not np.isfinite(self.threshold): + raise TypeError("threshold must be a finite real number or None") + if not isinstance(self.delay, int) or self.delay < 1: + raise TypeError("delay must be an int >= 1") + if isinstance(self.ar_order, int): + if self.ar_order < 0: + raise ValueError("ar_order int must be >= 0") + elif isinstance(self.ar_order, tuple): + if len(self.ar_order) != 2: + raise TypeError("ar_order tuple must be (p_below, p_above)") + pL, pR = self.ar_order + if not (isinstance(pL, int) and isinstance(pR, int)): + raise TypeError("ar_order tuple entries must be ints") + if pL < 0 or pR < 0: + raise ValueError("ar_order tuple entries must be >= 0") + else: + raise TypeError("ar_order must be int or (int, int)") + + +# ============================ shared Numba utilities ============================ + + +@njit(cache=True, fastmath=True) +def _make_lag_matrix(y: np.ndarray, maxlag: int) -> np.ndarray: + """Build lag matrix with columns [y_{t-1}, ..., y_{t-maxlag}] (trim='both').""" + n = y.shape[0] + rows = n - maxlag + out = np.empty((rows, maxlag), dtype=np.float64) + for i in range(rows): + base = maxlag + i + for k in range(maxlag): + out[i, k] = y[base - (k + 1)] + return out + + +@njit(cache=True, fastmath=True) +def _prepare_design( + y: np.ndarray, maxlag: int, d: int +) -> tuple[np.ndarray, np.ndarray, np.ndarray]: + """Build lagged design X, response y_resp, and threshold var z=y_{t-d} (aligned).""" + X_full = _make_lag_matrix(y, maxlag) + y_resp = y[maxlag:] + rows = y_resp.shape[0] + z = np.empty(rows, dtype=np.float64) + base = maxlag - d + for i in range(rows): + z[i] = y[base + i] # y_{t-d} + return X_full, y_resp, z + + +@njit(cache=True, fastmath=True) +def _ols_fit_with_rss(X: np.ndarray, y: np.ndarray) -> tuple[float, np.ndarray, float]: + """OLS via normal equations; return (intercept, coef, rss).""" + n_samples, n_features = X.shape + Xb = np.empty((n_samples, n_features + 1), dtype=np.float64) + Xb[:, 0] = 1.0 + if n_features: + Xb[:, 1:] = X + XtX = Xb.T @ Xb + Xty = Xb.T @ y + beta = np.linalg.solve(XtX, Xty) + pred = Xb @ beta + resid = y - pred + rss = float(resid @ resid) + return float(beta[0]), beta[1:], rss + + +@njit(cache=True, fastmath=True) +def _subset_rows_cols( + X: np.ndarray, mask_true: np.ndarray, choose_true: bool, keep_cols: int +) -> np.ndarray: + """Select rows by mask and first keep_cols columns (Numba-friendly).""" + rows = 0 + for i in range(mask_true.size): + if mask_true[i] == 
choose_true: + rows += 1 + out = np.empty((rows, keep_cols), dtype=np.float64) + r = 0 + for i in range(mask_true.size): + if mask_true[i] == choose_true: + for c in range(keep_cols): + out[r, c] = X[i, c] + r += 1 + return out + + +@njit(cache=True, fastmath=True) +def _subset_target( + y: np.ndarray, mask_true: np.ndarray, choose_true: bool +) -> np.ndarray: + """Select target rows by mask (Numba-friendly).""" + rows = 0 + for i in range(mask_true.size): + if mask_true[i] == choose_true: + rows += 1 + out = np.empty(rows, dtype=np.float64) + r = 0 + for i in range(mask_true.size): + if mask_true[i] == choose_true: + out[r] = y[i] + r += 1 + return out + + +@njit(cache=True, fastmath=True) +def _aic_value(rss: float, n_eff: int, k: int) -> float: + """AIC ∝ n*log(max(RSS/n, tiny)) + 2k.""" + if n_eff <= 0: + return np.inf + sigma2 = rss / n_eff + if sigma2 <= 1e-300: + sigma2 = 1e-300 + return n_eff * np.log(sigma2) + 2.0 * k + + +@njit(cache=True, fastmath=True) +def _numba_predict( + y: np.ndarray, + delay: int, + thr: float, + iL: float, + bL: np.ndarray, + pL: int, + iR: float, + bR: np.ndarray, + pR: int, +) -> float: + """One-step forecast from end of y with fitted TAR params.""" + regime_right = y[-delay] > thr + if regime_right: + if pR == 0: + return iR + val = iR + for j in range(pR): + val += bR[j] * y[-(j + 1)] + return val + else: + if pL == 0: + return iL + val = iL + for j in range(pL): + val += bL[j] * y[-(j + 1)] + return val diff --git a/aeon/forecasting/stats/_tvp.py b/aeon/forecasting/stats/_tvp.py index fd0168ceb5..57ddd7309b 100644 --- a/aeon/forecasting/stats/_tvp.py +++ b/aeon/forecasting/stats/_tvp.py @@ -9,7 +9,7 @@ ) -class TVPForecaster(BaseForecaster, DirectForecastingMixin, IterativeForecastingMixin): +class TVP(BaseForecaster, DirectForecastingMixin, IterativeForecastingMixin): r"""Time-Varying Parameter (TVP) Forecaster using Kalman filter as described in [1]. 
This forecaster models the target series using a time-varying linear autoregression: diff --git a/aeon/forecasting/stats/tests/test_tar.py b/aeon/forecasting/stats/tests/test_tar.py new file mode 100644 index 0000000000..825ac7bcdd --- /dev/null +++ b/aeon/forecasting/stats/tests/test_tar.py @@ -0,0 +1,332 @@ +"""Tests for TAR (fixed) and AutoTAR (search) forecasters.""" + +from __future__ import annotations + +import math + +import numpy as np +import pytest + +from aeon.forecasting.stats import TAR, AutoTAR + +# --------------------------- helpers --------------------------- + + +def _gen_tar_series( + n: int, + phi_L: float = 0.2, + phi_R: float = 0.9, + r: float = 0.0, + d: int = 1, + sigma: float = 1.0, + seed: int = 123, +) -> np.ndarray: + """Generate a synthetic TAR(1,1) time series.""" + rng = np.random.default_rng(seed) + y = np.zeros(n, dtype=float) + eps = rng.normal(scale=sigma, size=n) + for t in range(1, n): + z = y[t - d] if t - d >= 0 else -np.inf + phi = phi_R if z > r else phi_L + y[t] = phi * y[t - 1] + eps[t] + return y + + +def _aligned_z(y: np.ndarray, delay: int, pL: int, pR: int) -> np.ndarray: + """Compute aligned threshold variable z_t = y_{t-d} for given (pL, pR, d).""" + maxlag = max(delay, pL, pR) + rows = y.shape[0] - maxlag + base = maxlag - delay + return y[base : base + rows] + + +def _compute_trim_band( + y: np.ndarray, delay: int, pL: int, pR: int, trim: float +) -> tuple[float, float]: + """Compute the [low, high] values of the trimmed threshold candidate band.""" + z = _aligned_z(y, delay, pL, pR) + z_sorted = np.sort(z) + rows = z_sorted.shape[0] + lower = int(np.floor(trim * rows)) + upper = rows - lower + lower_val = z_sorted[lower] if rows > 0 and lower < rows else -np.inf + upper_val = z_sorted[upper - 1] if rows > 0 and (upper - 1) >= 0 else np.inf + return lower_val, upper_val + + +# ============================ AutoTAR (search) ============================ + + +def test_autotar_fit_and_basic_attrs_exist(): + """AutoTAR fitting sets core learned attributes and forecast_.""" + y = _gen_tar_series(500, phi_L=0.2, phi_R=0.8, r=0.0, d=1, sigma=0.8, seed=7) + f = AutoTAR(threshold=None, delay=None, ar_order=None, max_order=3, max_delay=3) + f.fit(y) + for attr in [ + "threshold_", + "delay_", + "p_below_", + "p_above_", + "intercept_below_", + "coef_below_", + "intercept_above_", + "coef_above_", + ]: + assert hasattr(f, attr) + assert isinstance(f.forecast_, float) + assert isinstance(f.params_, dict) + assert f.params_["selection"]["criterion"] == "AIC" + + +def test_autotar_search_ranges_respected(): + """AutoTAR search-selected orders and delay stay within bounds.""" + y = _gen_tar_series(500, seed=1) + f = AutoTAR(threshold=None, delay=None, ar_order=None, max_order=2, max_delay=2) + f.fit(y) + assert 0 <= f.p_below_ <= 2 + assert 0 <= f.p_above_ <= 2 + assert 1 <= f.delay_ <= 2 + + +def test_autotar_fixed_params_branch_respected(): + """AutoTAR fixed threshold + fixed (pL,pR,d) branch is respected.""" + y = _gen_tar_series(600, phi_L=0.1, phi_R=0.7, r=0.25, d=2, seed=3) + f = AutoTAR(threshold=0.25, delay=2, ar_order=(1, 1)) + f.fit(y) + assert math.isclose(f.threshold_, 0.25, rel_tol=0, abs_tol=0) + assert f.delay_ == 2 + assert f.p_below_ == 1 and f.p_above_ == 1 + + +def test_autotar_threshold_within_trim_band(): + """AutoTAR learned threshold lies within the trimmed candidate range.""" + y = _gen_tar_series(400, phi_L=0.3, phi_R=0.85, r=0.0, d=1, sigma=0.9, seed=11) + f = AutoTAR( + threshold=None, + delay=None, + ar_order=None, + 
max_order=3, + max_delay=3, + threshold_trim=0.15, + ) + f.fit(y) + low, high = _compute_trim_band( + y, delay=f.delay_, pL=f.p_below_, pR=f.p_above_, trim=0.15 + ) + assert low <= f.threshold_ <= high + + +def test_autotar_predict_matches_internal_one_step(): + """AutoTAR one-step prediction matches forecast_ from fit.""" + y = _gen_tar_series(150, seed=9) + f = AutoTAR(threshold=None, delay=None, ar_order=None) + f.fit(y) + yhat_internal = f._predict(y) + assert isinstance(yhat_internal, float) + assert np.isfinite(f.forecast_) + assert math.isclose(f.forecast_, yhat_internal, rel_tol=1e-12, abs_tol=1e-12) + + +@pytest.mark.parametrize( + "phi_L,phi_R,r,d", + [ + (0.1, 0.8, 0.0, 1), + (0.3, 0.9, 0.25, 1), + (0.2, 0.7, -0.2, 2), + ], +) +def test_autotar_parameter_recovery_is_reasonable( + phi_L: float, phi_R: float, r: float, d: int +): + """AutoTAR fitted coefficients are close for synthetic TAR(1,1).""" + y = _gen_tar_series(600, phi_L=phi_L, phi_R=phi_R, r=r, d=d, sigma=0.8, seed=123) + f = AutoTAR( + threshold=None, delay=None, ar_order=None, max_order=1, max_delay=max(1, d) + ) + f.fit(y) + if f.p_below_ >= 1: + assert abs(float(f.coef_below_[0]) - phi_L) < 0.30 + if f.p_above_ >= 1: + assert abs(float(f.coef_above_[0]) - phi_R) < 0.30 + assert 1 <= f.delay_ <= max(3, d) + + +def test_autotar_max_threshold_candidates_caps_workload(): + """AutoTAR: setting a tiny candidate cap still fits and records AIC.""" + y = _gen_tar_series(400, seed=21) + f = AutoTAR(threshold=None, delay=None, ar_order=None, max_threshold_candidates=10) + f.fit(y) + assert hasattr(f, "threshold_") + assert np.isfinite(f.params_["selection"]["value"]) + + +def test_autotar_fixed_threshold_branch_and_tie_rule(): + """AutoTAR: when threshold is fixed and z == r, tie goes to below/left regime.""" + y = _gen_tar_series(800, seed=5) + delay = 2 + thr = float(y[-delay]) # force tie at prediction time + f = AutoTAR(threshold=thr, delay=delay, ar_order=(1, 1)) + f.fit(y) + + y_arr = np.asarray(y, dtype=float) + below_pred = f.intercept_below_ + if f.p_below_ >= 1: + below_pred += float(f.coef_below_[0]) * y_arr[-1] + if f.p_below_ >= 2: + below_pred += float(f.coef_below_[1]) * y_arr[-2] + yhat = f._predict(y) + assert math.isclose(yhat, below_pred, rel_tol=1e-12, abs_tol=1e-12) + + +def test_autotar_none_max_threshold_candidates_uses_full_span(): + """AutoTAR: max_threshold_candidates=None uses all trimmed candidates.""" + y = _gen_tar_series(450, seed=17) + f = AutoTAR( + threshold=None, + delay=None, + ar_order=None, + max_threshold_candidates=None, + threshold_trim=0.1, + ) + f.fit(y) + assert np.isfinite(f.params_["selection"]["value"]) + + +def test_autotar_no_valid_fit_raises(): + """AutoTAR: if every combo is invalid (e.g., delay too large), fit raises.""" + y = _gen_tar_series(50, seed=2) + f = AutoTAR(threshold=None, delay=200, ar_order=(0, 0)) + with pytest.raises(RuntimeError): + f.fit(y) + + +@pytest.mark.parametrize( + "kwargs,exc", + [ + (dict(max_order=-1), TypeError), + (dict(max_delay=0), TypeError), + (dict(ar_order=(-1, 1)), ValueError), + (dict(ar_order=(1.5, 1)), TypeError), + (dict(delay=0), TypeError), + (dict(threshold=float("nan")), ValueError), + (dict(threshold="bad"), TypeError), + (dict(threshold_trim=0.55), ValueError), + (dict(min_regime_frac=0.0), ValueError), + (dict(min_points_offset=-1), TypeError), + (dict(max_threshold_candidates=0), TypeError), + ], +) +def test_autotar_validation_errors(kwargs, exc): + """AutoTAR: constructor parameter validation raises clear errors.""" + y = 
_gen_tar_series(200, seed=3) + f = AutoTAR(**kwargs) + with pytest.raises(exc): + f.fit(y) + + +def test_autotar_predict_accepts_list_and_ndarray(): + """AutoTAR: _predict should accept list input and ndarray input seamlessly.""" + y = _gen_tar_series(300, seed=19) + f = AutoTAR(threshold=None, delay=None, ar_order=None) + f.fit(y) + yhat1 = f._predict(y) # ndarray + yhat2 = f._predict(list(y)) # list + assert math.isclose(yhat1, yhat2, rel_tol=0, abs_tol=0) + + +def test_autotar_median_fallback_path_runs(): + """AutoTAR: force trimmed span so small it hits the median-fallback path.""" + # Make rows small relative to trim: rows ≈ n - maxlag; pick n modest, trim high. + y = _gen_tar_series(12, seed=42) + f = AutoTAR( + threshold=None, + delay=1, + ar_order=(1, 1), + threshold_trim=0.49, + max_order=1, + max_delay=1, + ) + f.fit(y) # should not crash; exercises the fallback branch + # The threshold should be close to the median of aligned z + z = _aligned_z(y, delay=f.delay_, pL=f.p_below_, pR=f.p_above_) + assert math.isclose(f.threshold_, float(np.median(z)), rel_tol=0, abs_tol=1e-12) + + +def test_autotar_single_candidate_path_runs(): + """AutoTAR: force single-candidate path (m==1) in threshold search.""" + y = _gen_tar_series(60, seed=7) + # Use a trim that makes a tiny span, then cap to 1 candidate + f = AutoTAR( + threshold=None, + delay=1, + ar_order=None, + max_order=1, + max_delay=1, + threshold_trim=0.25, + max_threshold_candidates=1, + ) + f.fit(y) + assert hasattr(f, "threshold_") and np.isfinite(f.params_["selection"]["value"]) + + +# ============================ TAR (fixed) ============================ + + +def test_tar_defaults_use_median_threshold_and_ar22(): + """TAR defaults: delay=1, ar_order=(2,2), threshold=median(z).""" + y = _gen_tar_series(200, seed=10) + f = TAR() # defaults + f.fit(y) + assert f.delay_ == 1 + assert f.p_below_ == 2 and f.p_above_ == 2 + # Check threshold equals median of aligned z + z = _aligned_z(y, delay=f.delay_, pL=f.p_below_, pR=f.p_above_) + assert math.isclose(f.threshold_, float(np.median(z)), rel_tol=0, abs_tol=1e-12) + + +def test_tar_ar_int_sets_both_and_fixed_threshold(): + """TAR: ar_order=int sets both regimes; explicit threshold respected.""" + y = _gen_tar_series(250, seed=22) + thr = 0.0 + f = TAR(threshold=thr, delay=1, ar_order=1) + f.fit(y) + assert f.p_below_ == 1 and f.p_above_ == 1 + assert math.isclose(f.threshold_, thr, rel_tol=0, abs_tol=0) + + +def test_tar_predict_matches_internal_one_step_and_types(): + """TAR: one-step prediction matches forecast_ and handles list input.""" + y = _gen_tar_series(220, seed=5) + f = TAR() # defaults use median threshold, (2,2) + f.fit(y) + yhat_nd = f._predict(y) + yhat_list = f._predict(list(y)) + assert math.isclose(f.forecast_, yhat_nd, rel_tol=0, abs_tol=0) + assert math.isclose(yhat_nd, yhat_list, rel_tol=0, abs_tol=0) + + +def test_tar_insufficient_data_per_regime_raises(): + """TAR: threshold causing empty regime should raise (identifiability).""" + y = _gen_tar_series(120, seed=99) + # Choose a very large threshold so mask_R is all False → above regime empty + f = TAR(threshold=1e9, delay=1, ar_order=(2, 2)) + with pytest.raises(RuntimeError): + f.fit(y) + + +@pytest.mark.parametrize( + "kwargs,exc", + [ + (dict(threshold="bad"), TypeError), + (dict(delay=0), TypeError), + (dict(ar_order=(-1, 1)), ValueError), + (dict(ar_order=(1,)), TypeError), + (dict(ar_order=(1.5, 1)), TypeError), + ], +) +def test_tar_validation_errors(kwargs, exc): + """TAR: validation errors for bad 
config."""
+    y = _gen_tar_series(120, seed=3)
+    f = TAR(**kwargs)  # type: ignore[arg-type]
+    with pytest.raises(exc):
+        f.fit(y)
diff --git a/aeon/forecasting/stats/tests/test_tvp.py b/aeon/forecasting/stats/tests/test_tvp.py
index 89e78aa3ff..ad1240c284 100644
--- a/aeon/forecasting/stats/tests/test_tvp.py
+++ b/aeon/forecasting/stats/tests/test_tvp.py
@@ -6,20 +6,20 @@
 import numpy as np
 
-from aeon.forecasting.stats._tvp import TVPForecaster
+from aeon.forecasting.stats._tvp import TVP
 
 
 def test_direct():
     """Test aeon TVP Forecaster equivalent to statsmodels."""
     expected = np.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0])
 
-    tvp = TVPForecaster(window=5, horizon=1, var=0.01, beta_var=0.01)
+    tvp = TVP(window=5, horizon=1, var=0.01, beta_var=0.01)
     p = tvp.forecast(expected)
     p2 = tvp.direct_forecast(expected, prediction_horizon=5)
     assert p == p2[0]
 
 
 def test_static_ar1_convergence_to_ols():
-    """Test TVPForecaster converges to the OLS solution for a static AR(1) process."""
+    """Test TVP converges to the OLS solution for a static AR(1) process."""
     # Simulate AR(1) data with constant parameters
     rng = np.random.RandomState(0)
     true_phi = 0.6
@@ -32,7 +32,7 @@
     for t in range(1, n):
         y[t] = true_intercept + true_phi * y[t - 1] + rng.normal(0, noise_std)
     # Fit with beta_var=0 (no parameter drift) and observation variance = noise_var
-    forecaster = TVPForecaster(window=1, horizon=1, var=noise_std**2, beta_var=0.0)
+    forecaster = TVP(window=1, horizon=1, var=noise_std**2, beta_var=0.0)
     forecaster.fit(y)
     beta_est = forecaster._beta  # [intercept, phi] estimated
     # Compute static OLS estimates for comparison
@@ -67,8 +67,8 @@
     # Second half (t=100 to 199) with phi2
     for t in range(100, n):
         y[t] = intercept + phi2 * y[t - 1] + rng.normal(0, noise_std)
-    # Fit TVPForecaster with nonzero beta_var to allow parameter drift
-    forecaster = TVPForecaster(window=1, horizon=1, var=noise_std**2, beta_var=0.1)
+    # Fit TVP with nonzero beta_var to allow parameter drift
+    forecaster = TVP(window=1, horizon=1, var=noise_std**2, beta_var=0.1)
     forecaster.fit(y)
     beta_final = forecaster._beta
     # Compute OLS on first and second half segments for reference
diff --git a/docs/api_reference/forecasting.rst b/docs/api_reference/forecasting.rst
index c721c969cf..780dc53944 100644
--- a/docs/api_reference/forecasting.rst
+++ b/docs/api_reference/forecasting.rst
@@ -35,5 +35,7 @@ Statistical Models
 
     ARIMA
+    AutoTAR
     ETS
+    TAR
     Theta
-    TVPForecaster
+    TVP
diff --git a/examples/forecasting/regression.ipynb b/examples/forecasting/regression.ipynb
index 35e3b847ba..c885458d6d 100644
--- a/examples/forecasting/regression.ipynb
+++ b/examples/forecasting/regression.ipynb
@@ -344,7 +344,14 @@
    "source": [
     "## Time-Varying Parameter (TVP) Forecaster\n",
     "\n",
-    "The `aeon` RegressionForecaster defaults to fits a standard linear regression on the windowed series using the sklearn `LinearRegression` estimator. This fits the linear regression parameters with the standard least squares algorithm. An alternative approach in the forecasting literature is to fit the linear regression parameters adaptively, so that more recent observations can have a greater weight in setting the parameters. One way of doing this is to use a Kalman filter. This considers the correlation between parameters in addition to the forecasting error. This is implemented in `aeon` as the `TVPForecaster`.\n"
+    "The `aeon` RegressionForecaster by default fits a standard linear regression on the \n",
+    "windowed series using the sklearn `LinearRegression` estimator. This fits the linear \n",
+    "regression parameters with the standard least squares algorithm. An alternative \n",
+    "approach in the forecasting literature is to fit the linear regression parameters \n",
+    "adaptively, so that more recent observations can have a greater weight in setting the\n",
+    " parameters. One way of doing this is to use a Kalman filter. This considers the \n",
+    " correlation between parameters in addition to the forecasting error. This is \n",
+    " implemented in `aeon` as the `TVP` forecaster.\n"
    ],
    "id": "2850a18027127db0"
   },
@@ -357,9 +364,9 @@
   },
   "cell_type": "code",
   "source": [
-    "from aeon.forecasting.stats import TVPForecaster\n",
+    "from aeon.forecasting.stats import TVP\n",
     "\n",
-    "tvp = TVPForecaster(window=50)\n",
+    "tvp = TVP(window=50)\n",
     "tvp.fit(y_train)\n",
     "pred = tvp.predict(y_train)"
    ],
@@ -438,9 +445,11 @@
    "source": [
     "TVP has two parameters that control the weight given to more recent observations. These are `var` and `coeff_var`. `var` represents the variation in the data. A small value of `var`, such as the default of 0.01 indicates there is less noise in the data and recent values will have a greater effect on parameter updates. It is a bit like the learning rate used in gradient descent or reinforcement learning. A large `var` value (e.g. greater than 1.0) will mean new values will adjust the parameters less at each update.\n",
     "\n",
-    "The second parameter, `beta_var`, estimates the variation in parameters over time. It has a similar effect on updates to `var` in that it effects how much parameters are changed at each stage, but it is modelling a different type on uncertainty. `TVPRegression` maintains an estimate of the covariance of the parameters in `fit` and this is adjusted after each update and then `beta_var` is added to perturb the matrix. Small `var` means the Kalman weights.\n",
+    "The second parameter, `beta_var`, estimates the variation in parameters over time. It\n",
+    " has a similar effect on updates to `var` in that it affects how much the parameters \n",
+    " are changed at each stage, but it is modelling a different type of uncertainty. `TVP` maintains an estimate of the covariance of the parameters in `fit`; this is adjusted after each update, and then `beta_var` is added to perturb the matrix.\n",
     "\n",
-    "Add some examples to demonstrate the difference\n"
+    "\n"
    ],
    "id": "ca7da34828be1c2"
   },
@@ -453,7 +462,7 @@
   },
   "cell_type": "code",
   "source": [
-    "tvp2 = TVPForecaster(var=1.0, beta_var=1.0, window=50)\n",
+    "tvp2 = TVP(var=1.0, beta_var=1.0, window=50)\n",
     "direct2 = tvp.direct_forecast(airline, prediction_horizon=12)\n",
     "plt.plot(\n",
     "    np.arange(0, len(direct)),\n",
diff --git a/pyproject.toml b/pyproject.toml
index 13711c013c..6e8df23fa6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -169,7 +169,7 @@
 testpaths = "aeon"
 doctest_optionflags = [
     "NORMALIZE_WHITESPACE",
     "ELLIPSIS",
-    "FLOAT_CMP",
+#    "FLOAT_CMP",
 ]
 addopts = [
     "--durations=20",
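
A minimal usage sketch of the new estimators (not part of the patch). It assumes the `TAR`/`AutoTAR` exports added in `__init__.py` above and the usual aeon convention that `fit` accepts a 1D numpy series; the synthetic series and parameter values are illustrative only.

```python
import numpy as np

from aeon.forecasting.stats import TAR, AutoTAR

# Synthetic two-regime series: AR(1) with phi = 0.2 below zero and 0.9 above.
rng = np.random.default_rng(0)
y = np.zeros(500)
for t in range(1, 500):
    phi = 0.9 if y[t - 1] > 0.0 else 0.2
    y[t] = phi * y[t - 1] + rng.normal(scale=0.5)

# Fixed-parameter TAR: delay=1, AR(2) in each regime, threshold defaulting to
# the median of the aligned threshold variable z_t = y_{t-1}.
tar = TAR()
tar.fit(y)
print(tar.threshold_, tar.forecast_)

# AutoTAR: grid search over per-regime orders, delay and threshold, with the
# best combination selected by AIC.
auto = AutoTAR(max_order=2, max_delay=2)
auto.fit(y)
print(auto.params_["selection"], auto.delay_, (auto.p_below_, auto.p_above_))
```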
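The speed of `_numba_threshold_search` comes from sorting rows by the threshold variable once and then moving rows between regimes while updating the normal-equation sufficient statistics in place, rather than refitting OLS per candidate. A small self-contained numpy check of that rank-one update idea, on toy data (names here are illustrative, not from the patch):

```python
import numpy as np

rng = np.random.default_rng(1)
X = rng.normal(size=(8, 3))  # rows assumed sorted by the threshold variable z
y = rng.normal(size=8)

cut = 4  # first `cut` rows currently belong to the below regime
Sxx = X[:cut].T @ X[:cut]    # X'X for the below regime
Sxy = X[:cut].T @ y[:cut]    # X'y for the below regime

# Moving the next sorted row into the below regime is an O(p^2) update ...
x_new, y_new = X[cut], y[cut]
Sxx += np.outer(x_new, x_new)
Sxy += x_new * y_new

# ... and matches a fresh computation over the first cut+1 rows.
assert np.allclose(Sxx, X[: cut + 1].T @ X[: cut + 1])
assert np.allclose(Sxy, X[: cut + 1].T @ y[: cut + 1])
```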