From cc9580a5838b9139ea44e24f21935218116dbcc8 Mon Sep 17 00:00:00 2001 From: Kevin Sheppard Date: Tue, 5 Nov 2024 17:04:14 +0000 Subject: [PATCH] TYP: Improve typing accuracy --- arch/bootstrap/base.py | 4 ++-- arch/tests/univariate/test_mean.py | 3 ++- arch/unitroot/unitroot.py | 16 +++++++------ arch/univariate/base.py | 37 ++++++++++++++++++++++++++++-- arch/univariate/mean.py | 36 ++++++++++++++++++++++++++++- arch/univariate/volatility.py | 1 + 6 files changed, 84 insertions(+), 13 deletions(-) diff --git a/arch/bootstrap/base.py b/arch/bootstrap/base.py index 5fde14c86d..5132d539a2 100644 --- a/arch/bootstrap/base.py +++ b/arch/bootstrap/base.py @@ -1271,7 +1271,7 @@ def _resample(self) -> tuple[tuple[ArrayLike, ...], dict[str, ArrayLike]]: pos_data: list[Union[NDArray, pd.DataFrame, pd.Series]] = [] for values in self._args: if isinstance(values, (pd.Series, pd.DataFrame)): - assert isinstance(indices, NDArray) + assert isinstance(indices, np.ndarray) pos_data.append(values.iloc[indices]) else: assert isinstance(values, np.ndarray) @@ -1279,7 +1279,7 @@ def _resample(self) -> tuple[tuple[ArrayLike, ...], dict[str, ArrayLike]]: named_data: dict[str, Union[NDArray, pd.DataFrame, pd.Series]] = {} for key, values in self._kwargs.items(): if isinstance(values, (pd.Series, pd.DataFrame)): - assert isinstance(indices, NDArray) + assert isinstance(indices, np.ndarray) named_data[key] = values.iloc[indices] else: assert isinstance(values, np.ndarray) diff --git a/arch/tests/univariate/test_mean.py b/arch/tests/univariate/test_mean.py index cec971d21a..86490a5583 100644 --- a/arch/tests/univariate/test_mean.py +++ b/arch/tests/univariate/test_mean.py @@ -29,6 +29,7 @@ from arch.typing import Literal from arch.univariate.base import ( ARCHModel, + ARCHModelFixedResult, ARCHModelForecast, ARCHModelResult, _align_forecast, @@ -139,7 +140,7 @@ def simulated_data(request): for a, b in itertools.product(simple_mean_models, analytic_volatility_processes) ], ) -def forecastable_model(request): +def forecastable_model(request) -> tuple[ARCHModelResult, ARCHModelFixedResult]: mod: ARCHModel vol: VolatilityProcess mod, vol = request.param diff --git a/arch/unitroot/unitroot.py b/arch/unitroot/unitroot.py index c00c0dcca7..0b4882a0e9 100644 --- a/arch/unitroot/unitroot.py +++ b/arch/unitroot/unitroot.py @@ -17,6 +17,7 @@ diag, diff, empty, + float64, full, hstack, inf, @@ -235,7 +236,7 @@ def _autolag_ols_low_memory( trendx.append(empty((nobs, 0))) else: if "tt" in trend: - tt = arange(1, nobs + 1, dtype=float)[:, None] ** 2 + tt = arange(1, nobs + 1, dtype=float64)[:, None] ** 2 tt *= sqrt(5) / float(nobs) ** (5 / 2) trendx.append(tt) if "t" in trend: @@ -402,6 +403,7 @@ def _df_select_lags( if max_lags is None: max_lags = int(ceil(12.0 * power(nobs / 100.0, 1 / 4.0))) max_lags = max(min(max_lags, max_max_lags), 0) + assert isinstance(max_lags, int) if max_lags > 119: warnings.warn( "The value of max_lags was not specified and has been calculated as " @@ -1970,8 +1972,8 @@ def auto_bandwidth( float The estimated optimal bandwidth. """ - y = ensure1d(y, "y") - if y.shape[0] < 2: + y_arr = ensure1d(y, "y") + if y_arr.shape[0] < 2: raise ValueError("Data must contain more than one observation") lower_kernel = kernel.lower() @@ -1987,12 +1989,12 @@ def auto_bandwidth( else: raise ValueError("Unknown kernel") - n = int(4 * ((len(y) / 100) ** n_power)) + n = int(4 * ((len(y_arr) / 100) ** n_power)) sig = (n + 1) * [0] for i in range(n + 1): - a = list(y[i:]) - b = list(y[: len(y) - i]) + a = list(y_arr[i:]) + b = list(y_arr[: len(y_arr) - i]) sig[i] = int(npsum([i * j for (i, j) in zip(a, b)])) sigma_m1 = sig[1 : len(sig)] # sigma without the 1st element @@ -2018,6 +2020,6 @@ def auto_bandwidth( else: # kernel == "qs": gamma = 1.3221 * (((s2 / s0) ** 2) ** t_power) - bandwidth = gamma * power(len(y), t_power) + bandwidth = gamma * power(len(y_arr), t_power) return bandwidth diff --git a/arch/univariate/base.py b/arch/univariate/base.py index 9fdc6a61ae..4103c6f087 100644 --- a/arch/univariate/base.py +++ b/arch/univariate/base.py @@ -7,7 +7,7 @@ from copy import deepcopy import datetime as dt from functools import cached_property -from typing import Any, Callable, Optional, Union, cast +from typing import Any, Callable, Optional, Union, cast, overload import warnings import numpy as np @@ -410,6 +410,38 @@ def _fit_parameterless_model( deepcopy(self), ) + @overload + def _loglikelihood( + self, + parameters: Float64Array, + sigma2: Float64Array, + backcast: Union[float, Float64Array], + var_bounds: Float64Array, + ) -> float: # pragma: no cover + ... # pragma: no cover + + @overload + def _loglikelihood( + self, + parameters: Float64Array, + sigma2: Float64Array, + backcast: Union[float, Float64Array], + var_bounds: Float64Array, + individual: Literal[False] = ..., + ) -> float: # pragma: no cover + ... # pragma: no cover + + @overload + def _loglikelihood( + self, + parameters: Float64Array, + sigma2: Float64Array, + backcast: Union[float, Float64Array], + var_bounds: Float64Array, + individual: Literal[True] = ..., + ) -> Float64Array: # pragma: no cover + ... # pragma: no cover + def _loglikelihood( self, parameters: Float64Array, @@ -706,6 +738,7 @@ def fit( if starting_values is not None: assert sv is not None sv = ensure1d(sv, "starting_values") + assert isinstance(sv, (np.ndarray, pd.Series)) valid = sv.shape[0] == num_params if a.shape[0] > 0: satisfies_constraints = a.dot(sv) - b >= 0 @@ -1362,7 +1395,7 @@ def _set_tight_x(axis: Axes, index: pd.Index) -> None: ax = fig.add_subplot(2, 1, 1) ax.plot(self._index.values, self.resid / self.conditional_volatility) ax.set_title("Standardized Residuals") - ax.axes.xaxis.set_ticklabels([]) + ax.set_xticklabels([]) _set_tight_x(ax, self._index) ax = fig.add_subplot(2, 1, 2) diff --git a/arch/univariate/mean.py b/arch/univariate/mean.py index d0d4e075a1..264b07e208 100644 --- a/arch/univariate/mean.py +++ b/arch/univariate/mean.py @@ -5,7 +5,7 @@ from collections.abc import Mapping, Sequence import copy -from typing import TYPE_CHECKING, Callable, Optional, Union, cast +from typing import TYPE_CHECKING, Callable, Optional, Union, cast, overload import numpy as np import pandas as pd @@ -546,7 +546,9 @@ def _check_specification(self) -> None: if isinstance(self._x, pd.Series): self._x = pd.DataFrame(self._x) elif self._x.ndim == 1: + assert isinstance(self._x, np.ndarray) self._x = np.asarray(self._x)[:, None] + assert isinstance(self._x, (np.ndarray, pd.DataFrame)) if self._x.ndim != 2 or self._x.shape[0] != self._y.shape[0]: raise ValueError( "x must be nobs by n, where nobs is the same as " @@ -1723,6 +1725,38 @@ def resids( def starting_values(self) -> Float64Array: return np.r_[super().starting_values(), 0.0] + @overload + def _loglikelihood( + self, + parameters: Float64Array, + sigma2: Float64Array, + backcast: Union[float, Float64Array], + var_bounds: Float64Array, + ) -> float: # pragma: no cover + ... # pragma: no cover + + @overload + def _loglikelihood( + self, + parameters: Float64Array, + sigma2: Float64Array, + backcast: Union[float, Float64Array], + var_bounds: Float64Array, + individual: Literal[False] = ..., + ) -> float: # pragma: no cover + ... # pragma: no cover + + @overload + def _loglikelihood( + self, + parameters: Float64Array, + sigma2: Float64Array, + backcast: Union[float, Float64Array], + var_bounds: Float64Array, + individual: Literal[True] = ..., + ) -> Float64Array: # pragma: no cover + ... # pragma: no cover + def _loglikelihood( self, parameters: Float64Array, diff --git a/arch/univariate/volatility.py b/arch/univariate/volatility.py index f41e625ecc..aae4991a5a 100644 --- a/arch/univariate/volatility.py +++ b/arch/univariate/volatility.py @@ -2111,6 +2111,7 @@ def compute_variance( var_bounds: Float64Array, ) -> Float64Array: lam = parameters[0] if self._estimate_lam else self.lam + assert isinstance(lam, float) return ewma_recursion(lam, resids, sigma2, resids.shape[0], float(backcast)) def constraints(self) -> tuple[Float64Array, Float64Array]: