Source code for arch.covariance.kernel

from __future__ import annotations

from arch.compat.numba import jit

from abc import ABC, abstractmethod
from functools import cached_property
from typing import SupportsInt, cast

import numpy as np
from pandas import DataFrame, Index
from pandas.util._decorators import Substitution

from arch.typing import ArrayLike, Float64Array
from arch.utility.array import AbstractDocStringInheritor, ensure1d, ensure2d

__all__ = [
    "Bartlett",
    "Parzen",
    "ParzenCauchy",
    "ParzenGeometric",
    "ParzenRiesz",
    "TukeyHamming",
    "TukeyHanning",
    "TukeyParzen",
    "CovarianceEstimate",
    "CovarianceEstimator",
    "QuadraticSpectral",
    "Andrews",
    "Gallant",
    "NeweyWest",
]

KERNELS = [
    "Bartlett",
    "Parzen",
    "ParzenCauchy",
    "ParzenGeometric",
    "ParzenRiesz",
    "TukeyHamming",
    "TukeyHanning",
    "TukeyParzen",
    "QuadraticSpectral",
    "Andrews",
    "Gallant",
    "NeweyWest",
]


[docs] class CovarianceEstimate: r""" Covariance estimate using a long-run covariance estimator Parameters ---------- short_run : ndarray The short-run covariance estimate. one_sided_strict : ndarray The one-sided strict covariance estimate. columns : {None, list[str]} Column labels to use if covariance estimates are returned as DataFrames. long_run : ndarray, default None The long-run covariance estimate. If not provided, computed from short_run and one_sided_strict. one_sided_strict : ndarray, default None The one-sided-strict covariance estimate. If not provided, computed from short_run and one_sided_strict. Notes ----- If :math:`\Gamma_0` is the short-run covariance and :math:`\Lambda_1` is the one-sided strict covariance, then the long-run covariance is defined .. math:: \Omega = \Gamma_0 + \Lambda_1 + \Lambda_1^\prime and the one-sided covariance is .. math:: \Lambda_0 = \Gamma_0 + \Lambda_1. """ def __init__( self, short_run: Float64Array, one_sided_strict: Float64Array, columns: Index | list[str] | None = None, long_run: Float64Array | None = None, one_sided: Float64Array | None = None, ) -> None: self._sr = short_run self._oss = one_sided_strict self._columns = columns self._long_run = long_run self._one_sided = one_sided def _wrap(self, value: Float64Array) -> Float64Array | DataFrame: if self._columns is not None: return DataFrame(value, columns=self._columns, index=self._columns) return value @cached_property def long_run(self) -> Float64Array | DataFrame: """ The long-run covariance estimate. """ if self._long_run is not None: long_run = self._long_run else: long_run = self._sr + self._oss + self._oss.T return self._wrap(long_run) @cached_property def short_run(self) -> Float64Array | DataFrame: """ The short-run covariance estimate. """ return self._wrap(self._sr) @cached_property def one_sided(self) -> Float64Array | DataFrame: """ The one-sided covariance estimate. """ if self._one_sided is not None: one_sided = self._one_sided else: one_sided = self._sr + self._oss return self._wrap(one_sided) @cached_property def one_sided_strict(self) -> Float64Array | DataFrame: """ The one-sided strict covariance estimate. """ return self._wrap(self._oss)
@jit(nopython=True) def _cov_jit(df, k, num_weights, w, x): oss = np.zeros((k, k)) for i in range(1, num_weights): oss += w[i] * (x[i:].T @ x[:-i]) / df return oss class CovarianceEstimator(ABC): r""" %(kernel_name)s kernel covariance estimation. Parameters ---------- x : array_like The data to use in covariance estimation. bandwidth : float, default None The kernel's bandwidth. If None, optimal bandwidth is estimated. df_adjust : int, default 0 Degrees of freedom to remove when adjusting the covariance. Uses the number of observations in x minus df_adjust when dividing inner-products. center : bool, default True A flag indicating whether x should be demeaned before estimating the covariance. weights : array_like, default None An array of weights used to combine when estimating optimal bandwidth. If not provided, a vector of 1s is used. Must have nvar elements. force_int : bool, default False Force bandwidth to be an integer. Notes ----- The kernel weights are computed using .. math:: %(formula)s where :math:`z=\frac{h}{H}, h=0, 1, \ldots, H` where H is the bandwidth. """ _name = "" def __init__( self, x: ArrayLike, bandwidth: float | None = None, df_adjust: int = 0, center: bool = True, weights: ArrayLike | None = None, force_int: bool = False, ): self._x_orig = ensure2d(x, "x") self._x = np.asarray(self._x_orig) self._center = center if self._center: self._x = self._x - self._x.mean(0) if bandwidth is not None: if not np.isscalar(bandwidth) or (cast(float, bandwidth) < 0.0): raise ValueError("bandwidth must be a non-negative scalar.") self._bandwidth = bandwidth self._auto_bandwidth = bandwidth is None if not np.isscalar(df_adjust) or (cast(int, df_adjust) < 0): raise ValueError("df_adjust must be a non-negative integer.") self._df_adjust = int(cast(SupportsInt, df_adjust)) self._df = self._x.shape[0] - self._df_adjust if self._df <= 0: raise ValueError( "Degrees of freedom is <= 0 after adjusting the sample " "size of x using df_adjust. df_adjust must be less than" f" {self._x.shape[0]}" ) if weights is None: xw = self._x_weights = np.ones((self._x.shape[1], 1)) else: xw = ensure1d(np.asarray(weights), "weights", series=False) xw = self._x_weights = xw[:, None] if ( xw.shape[0] != self._x.shape[1] or xw.shape[1] != 1 or np.any(xw < 0) or np.all(xw == 0) ): raise ValueError( f"weights must be a 1 by {self._x.shape[1]} (x.shape[1]) " f"array with non-negative values where at least one value is " "strictly greater than 0." ) self._force_int = force_int def __str__(self) -> str: out = ( f"Kernel: {self.name}", f"Bandwidth: {self.bandwidth}", f"Degree of Freedom Adjustment: {self._df_adjust}", f"Centered: {self.centered}", f"Automatic Bandwidth: {self._auto_bandwidth}", ) return "\n".join(out) def __repr__(self) -> str: return self.__str__() + f"\nID: {hex(id(self))}" @property def name(self) -> str: """ The covariance estimator's name. Returns ------- str The covariance estimator's name. """ return self._name @property def bandwidth(self) -> float: """ The bandwidth used by the covariance estimator. Returns ------- float The user-provided or estimated optimal bandwidth. """ if self._bandwidth is None: self._bandwidth = self.opt_bandwidth if self._force_int: return float(int(np.ceil(self._bandwidth))) return self._bandwidth @property def centered(self) -> bool: """ Flag indicating whether the data are centered (demeaned). Returns ------- bool A flag indicating whether the estimator is centered. """ return self._center @property @abstractmethod def kernel_const(self) -> float: """ The constant used in optimal bandwidth calculation. Returns ------- float The constant value used in the optimal bandwidth calculation. """ @property @abstractmethod def bandwidth_scale(self) -> float: """ The power used in optimal bandwidth calculation. Returns ------- float The power value used in the optimal bandwidth calculation. """ @property @abstractmethod def rate(self) -> float: """ The optimal rate used in bandwidth selection. Controls the number of lags used in the variance estimate that determines the estimate of the optimal bandwidth. Returns ------- float The rate used in bandwidth selection. """ def _alpha_q(self) -> float: q = self.bandwidth_scale v = self._x @ self._x_weights nobs = v.shape[0] n = int(np.ceil(4 * ((nobs / 100) ** self.rate))) f_0s = 0.0 f_qs = 0.0 for j in range(n + 1): sig_j = float(np.squeeze(v[j:].T @ v[: (nobs - j)])) / nobs scale = 1 + (j != 0) f_0s += scale * sig_j f_qs += scale * j**q * sig_j return (f_qs / f_0s) ** 2 @cached_property def opt_bandwidth(self) -> float: r""" Estimate optimal bandwidth. Returns ------- float The estimated optimal bandwidth. Notes ----- Computed as .. math:: \hat{b}_T = c_k \left[\hat{\alpha}\left(q\right) T \right]^{\frac{1}{2q+1}} where :math:`c_k` is a kernel-dependent constant, T is the sample size, q determines the optimal bandwidth rate for the kernel. """ c = self.kernel_const q = self.bandwidth_scale nobs = self._x.shape[0] alpha_q = self._alpha_q() bw = c * (alpha_q * nobs) ** (1 / (2 * q + 1)) if self._force_int: bw = np.ceil(bw) return min(bw, nobs - 1.0) @abstractmethod def _weights(self) -> Float64Array: """ Compute the kernel's weights """ @cached_property def kernel_weights(self) -> Float64Array: """ Weights used in covariance calculation. Returns ------- ndarray The weight vector including 1 in position 0. """ return self._weights() @cached_property def cov(self) -> CovarianceEstimate: """ The estimated covariances. Returns ------- CovarianceEstimate Covariance estimate instance containing 4 estimates: * long_run * short_run * one_sided * one_sided_strict See Also -------- CovarianceEstimate """ x = np.ascontiguousarray(self._x) k = x.shape[1] df = self._df sr = x.T @ x / df w = self.kernel_weights num_weights = w.shape[0] oss = _cov_jit(df, k, num_weights, w, x) labels = self._x_orig.columns if isinstance(self._x_orig, DataFrame) else None return CovarianceEstimate(sr, oss, labels) @property def force_int(self) -> bool: """ Flag indicating whether the bandwidth is restricted to be an integer. """ return self._force_int _bartlett_formula = """\ w=\\begin{cases} 1-\\left|z\\right| & z\\leq1 \\\\ 0 & z>1 \\end{cases} """
[docs] @Substitution(kernel_name="Bartlett's (Newey-West)", formula=_bartlett_formula) class Bartlett(CovarianceEstimator, metaclass=AbstractDocStringInheritor): @property def kernel_const(self) -> float: return 1.1447 @property def bandwidth_scale(self) -> float: return 1.0 @property def rate(self) -> float: return 2 / 9 def _weights(self) -> Float64Array: bw = self.bandwidth return (bw + 1 - np.arange(int(bw + 1), dtype="double")) / (bw + 1)
_parzen_formula = """\ w=\\begin{cases}\ 1-6z^{2}\\left(1-z\\right) & z\\leq\\frac{1}{2} \\\\ \ 2\\left(1-z\\right)^{3} & \\frac{1}{2}<z\\leq1 \\\\ \ 0 & z>1 \ \\end{cases} """
[docs] @Substitution(kernel_name="Parzen's", formula=_parzen_formula) class Parzen(CovarianceEstimator, metaclass=AbstractDocStringInheritor): @property def kernel_const(self) -> float: return 2.6614 @property def bandwidth_scale(self) -> float: return 2 @property def rate(self) -> float: return 4 / 25 def _weights(self) -> Float64Array: bw = self.bandwidth x = np.arange(int(bw + 1), dtype="double") / (bw + 1) w = np.empty_like(x) loc = x <= 0.5 w[loc] = 1 - 6 * x[loc] ** 2 * (1 - x[loc]) w[~loc] = 2 * (1 - x[~loc]) ** 3 return w
_parzen_reisz_formula = """\ w=\\begin{cases} \ 1-z^2 & z\\leq1 \\\\ \ 0 & z>1 \ \\end{cases} \ """
[docs] @Substitution(kernel_name="Parzen-Reisz", formula=_parzen_reisz_formula) class ParzenRiesz(CovarianceEstimator, metaclass=AbstractDocStringInheritor): @property def kernel_const(self) -> float: return 1.1340 @property def bandwidth_scale(self) -> float: return 2 @property def rate(self) -> float: return 4 / 25 def _weights(self) -> Float64Array: bw = self.bandwidth x = np.arange(int(bw + 1), dtype="double") / (bw + 1) return 1 - x**2
_parzen_geometric_formula = """\ w=\\begin{cases} \ \\frac{1}{1+z} & z\\leq1 \\\\ \ 0 & z>1 \ \\end{cases} \ """
[docs] @Substitution(kernel_name="Parzen's Geometric", formula=_parzen_geometric_formula) class ParzenGeometric(CovarianceEstimator, metaclass=AbstractDocStringInheritor): @property def kernel_const(self) -> float: return 1.0000 @property def bandwidth_scale(self) -> float: return 1 @property def rate(self) -> float: return 2 / 9 def _weights(self) -> Float64Array: bw = self.bandwidth x = np.arange(int(bw + 1), dtype="double") / (bw + 1) return 1 / (1 + x)
_parzen_cauchy_formula = """\ w=\\begin{cases} \ \\frac{1}{1+z^2} & z\\leq1 \\\\ \ 0 & z>1 \ \\end{cases} \ """
[docs] @Substitution(kernel_name="Parzen's Cauchy", formula=_parzen_cauchy_formula) class ParzenCauchy(CovarianceEstimator, metaclass=AbstractDocStringInheritor): @property def kernel_const(self) -> float: return 1.0924 @property def bandwidth_scale(self) -> float: return 2 @property def rate(self) -> float: return 4 / 25 def _weights(self) -> Float64Array: bw = self.bandwidth x = np.arange(int(bw + 1), dtype="double") / (bw + 1) return 1 / (1 + x**2)
_tukey_hamming_formula = """\ w=\\begin{cases} \ 0.54 + 0.46 \\cos{\\pi z} & z\\leq1 \\\\ \ 0 & z>1 \ \\end{cases} \ """
[docs] @Substitution(kernel_name="Tukey-Hamming", formula=_tukey_hamming_formula) class TukeyHamming(CovarianceEstimator, metaclass=AbstractDocStringInheritor): @property def kernel_const(self) -> float: return 1.6694 @property def bandwidth_scale(self) -> float: return 2 @property def rate(self) -> float: return 4 / 25 def _weights(self) -> Float64Array: bw = self.bandwidth x = np.arange(int(bw + 1), dtype="double") / (bw + 1) return 0.54 + 0.46 * np.cos(np.pi * x)
_tukey_hanning_formula = """\ w=\\begin{cases} \ \\frac{1}{2} + \\frac{1}{2} \\cos{\\pi z} & z\\leq1 \\\\ \ 0 & z>1 \ \\end{cases} \ """
[docs] @Substitution(kernel_name="Tukey-Hanning", formula=_tukey_hanning_formula) class TukeyHanning(CovarianceEstimator, metaclass=AbstractDocStringInheritor): @property def kernel_const(self) -> float: return 1.7462 @property def bandwidth_scale(self) -> float: return 2 @property def rate(self) -> float: return 4 / 25 def _weights(self) -> Float64Array: bw = self.bandwidth x = np.arange(int(bw + 1), dtype="double") / (bw + 1) return 0.5 + 0.5 * np.cos(np.pi * x)
_tukey_parzen_formula = """\ w=\\begin{cases} \ 0.436 + 0.564 \\cos{\\pi z} & z\\leq1 \\\\ \ 0 & z>1 \ \\end{cases} \ """
[docs] @Substitution(kernel_name="Tukey-Parzen", formula=_tukey_parzen_formula) class TukeyParzen(CovarianceEstimator, metaclass=AbstractDocStringInheritor): @property def kernel_const(self) -> float: return 1.8576 @property def bandwidth_scale(self) -> float: return 2 @property def rate(self) -> float: return 4 / 25 def _weights(self) -> Float64Array: bw = self.bandwidth x = np.arange(int(bw + 1), dtype="double") / (bw + 1) return 0.436 + 0.564 * np.cos(np.pi * x)
_qs_name = "Quadratic-Spectral (Andrews')" _qs_formula = """\ w=\\begin{cases} \ 1 & z=0\\\\ \ \\frac{3}{x^{2}}\\left(\\frac{\\sin x}{x}-\\cos x\\right),x=\\frac{6\\pi z}{5} & z>0 \ \\end{cases} \ """
[docs] @Substitution(kernel_name=_qs_name, formula=_qs_formula) class QuadraticSpectral(CovarianceEstimator, metaclass=AbstractDocStringInheritor): @property def kernel_const(self) -> float: return 1.3221 @property def bandwidth_scale(self) -> float: return 2 @property def rate(self) -> float: return 2 / 25 def _weights(self) -> Float64Array: bw = self.bandwidth nobs = self._x.shape[0] w = np.zeros(nobs) w[0] = 1.0 if bw > 0: x = np.arange(1, nobs) / bw z = 6 * np.pi * x / 5 w[1:] = 3 / z**2 * (np.sin(z) / z - np.cos(z)) return w
[docs] class Gallant(Parzen): """ Alternative name for Parzen covariance estimator. See Also -------- Parzen """
[docs] class Andrews(QuadraticSpectral): """ Alternative name of the QuadraticSpectral covariance estimator. See Also -------- QuadraticSpectral """
[docs] class NeweyWest(Bartlett): """ Alternative name for Bartlett covariance estimator. See Also -------- Bartlett """