Source code for arch.univariate.volatility

"""
Volatility processes for ARCH model estimation.  All volatility processes must
inherit from :class:`VolatilityProcess` and provide the same methods with the
same inputs.
"""

from __future__ import annotations

from abc import ABCMeta, abstractmethod
from collections.abc import Sequence
import itertools
from typing import TYPE_CHECKING, cast
from warnings import warn

import numpy as np
from numpy.random import RandomState
from scipy.special import gammaln

from arch.typing import (
    ArrayLike1D,
    Float64Array,
    ForecastingMethod,
    Int32Array,
    RNGType,
)
from arch.univariate.distribution import Normal
from arch.utility.array import AbstractDocStringInheritor, ensure1d
from arch.utility.exceptions import InitialValueWarning, initial_value_warning

if TYPE_CHECKING:
    from arch.univariate import recursions_python as rec
else:
    try:
        from arch.univariate import recursions as rec
    except ImportError:
        from arch.univariate import recursions_python as rec

__all__ = [
    "GARCH",
    "ARCH",
    "HARCH",
    "ConstantVariance",
    "EWMAVariance",
    "RiskMetrics2006",
    "EGARCH",
    "FIGARCH",
    "FixedVariance",
    "BootstrapRng",
    "MIDASHyperbolic",
    "VolatilityProcess",
]


def _common_names(p: int, o: int, q: int) -> list[str]:
    names = ["omega"]
    names.extend(["alpha[" + str(i + 1) + "]" for i in range(p)])
    names.extend(["gamma[" + str(i + 1) + "]" for i in range(o)])
    names.extend(["beta[" + str(i + 1) + "]" for i in range(q)])
    return names


class BootstrapRng:
    """
    Simple fake RNG used to transform bootstrap-based forecasting into a standard
    simulation forecasting problem

    Parameters
    ----------
    std_resid : ndarray
        Array containing standardized residuals
    start : int
        Location of first forecast
    random_state : RandomState, optional
        NumPy RandomState instance
    """

    def __init__(
        self,
        std_resid: Float64Array,
        start: int,
        random_state: RandomState | None = None,
    ) -> None:
        if start <= 0 or start > std_resid.shape[0]:
            raise ValueError("start must be > 0 and <= len(std_resid).")

        self.std_resid: Float64Array = std_resid
        self.start: int = start
        self._index = start
        if random_state is None:
            self._random_state = RandomState()
        elif isinstance(random_state, RandomState):
            self._random_state = random_state
        else:
            raise TypeError("random_state must be a NumPy RandomState instance.")

    @property
    def random_state(self) -> RandomState:
        return self._random_state

    def rng(self) -> RNGType:
        def _rng(size: int | tuple[int, ...]) -> Float64Array:
            if self._index >= self.std_resid.shape[0]:
                raise IndexError("not enough data points.")
            index = self._random_state.random_sample(size)
            int_index = np.floor((self._index + 1) * index)
            int_index = int_index.astype(np.int64)
            self._index += 1
            return self.std_resid[int_index]

        return _rng


def ewma_recursion(
    lam: float, resids: Float64Array, sigma2: Float64Array, nobs: int, backcast: float
) -> Float64Array:
    """
    Compute variance recursion for EWMA/RiskMetrics Variance

    Parameters
    ----------
    lam : float
        Smoothing parameter
    resids : ndarray
        Residuals to use in the recursion
    sigma2 : ndarray
        Conditional variances with same shape as resids
    nobs : int
        Length of resids
    backcast : float
        Value to use when initializing the recursion
    """

    # Throw away bounds
    var_bounds = np.ones((nobs, 2)) * np.array([-1.0, 1.7e308])
    rec.garch_recursion(
        np.array([0.0, 1.0 - lam, lam]),
        resids**2.0,
        resids,
        sigma2,
        1,
        0,
        1,
        nobs,
        backcast,
        var_bounds,
    )
    return sigma2


class VarianceForecast:
    _forecasts = None
    _forecast_paths = None

    def __init__(
        self,
        forecasts: Float64Array,
        forecast_paths: Float64Array | None = None,
        shocks: Float64Array | None = None,
    ) -> None:
        self._forecasts = forecasts
        self._forecast_paths = forecast_paths
        self._shocks = shocks

    @property
    def forecasts(self) -> Float64Array | None:
        return self._forecasts

    @property
    def forecast_paths(self) -> Float64Array | None:
        return self._forecast_paths

    @property
    def shocks(self) -> Float64Array | None:
        return self._shocks


[docs] class VolatilityProcess(metaclass=ABCMeta): """ Abstract base class for ARCH models. Allows the conditional mean model to be specified separately from the conditional variance, even though parameters are estimated jointly. """ _updatable: bool = True def __init__(self) -> None: self._num_params = 0 self._name = "" self.closed_form: bool = False self._normal = Normal() self._min_bootstrap_obs = 100 self._start = 0 self._stop = -1 self._volatility_updater: rec.VolatilityUpdater | None = None def __str__(self) -> str: return self.name def __repr__(self) -> str: return self.__str__() + ", id: " + hex(id(self)) @property def name(self) -> str: """The name of the volatility process""" return self._name @property def start(self) -> int: """Index to use to start variance subarray selection""" return self._start @start.setter def start(self, value: int) -> None: self._start = value @property def stop(self) -> int: """Index to use to stop variance subarray selection""" return self._stop @stop.setter def stop(self, value: int) -> None: self._stop = value @property def num_params(self) -> int: """The number of parameters in the model""" return self._num_params @property def updateable(self) -> bool: """Flag indicating that the volatility process supports update""" return self._updatable @property def volatility_updater(self) -> rec.VolatilityUpdater: """ Get the volatility updater associated with the volatility process Returns ------- VolatilityUpdater The updater class Raises ------ NotImplementedError If the process is not updateable """ if not self._updatable or self._volatility_updater is None: raise NotImplementedError("Subclasses may optionally implement") assert self._volatility_updater is not None return self._volatility_updater
[docs] def update( self, index: int, parameters: Float64Array, resids: Float64Array, sigma2: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, ) -> float: """ Compute the variance for a single observation Parameters ---------- index : int The numerical index of the variance to compute parameters : ndarray The variance model parameters resids : The residual array. Only uses ``resids[:index]`` when computing ``sigma2[index]`` sigma2 : ndarray The array containing the variances. Only uses ``sigma2[:index]`` when computing ``sigma2[index]``. The computed value is stored in ``sigma2[index]``. backcast : {float, ndarray} Value to use when initializing the recursion var_bounds : ndarray Array containing columns of lower and upper bounds Returns ------- float The variance computed for location ``index`` """ raise NotImplementedError("Subclasses may optionally implement")
@abstractmethod def _check_forecasting_method( self, method: ForecastingMethod, horizon: int ) -> None: """ Verify the requested forecasting method as valid for the specification Parameters ---------- method : str Forecasting method horizon : int Forecast horizon Raises ------ NotImplementedError * If method is not known or not supported """ def _one_step_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, horizon: int, start_index: int, ) -> tuple[Float64Array, Float64Array]: """ One-step ahead forecast Parameters ---------- parameters : ndarray Parameters required to forecast the volatility model resids : ndarray Residuals to use in the recursion backcast : float Value to use when initializing the recursion var_bounds : ndarray Array containing columns of lower and upper bounds horizon : int Forecast horizon. Must be 1 or larger. Forecasts are produced for horizons in [1, horizon]. Returns ------- sigma2 : ndarray t element array containing the one-step ahead forecasts forecasts : ndarray t by horizon array containing the one-step ahead forecasts in the first location """ t = resids.shape[0] _resids: Float64Array = np.concatenate((resids, np.array([0.0]))) _var_bounds: Float64Array = np.concatenate( (var_bounds, np.array([[0, np.inf]])) ) sigma2 = np.zeros(t + 1) self.compute_variance(parameters, _resids, sigma2, backcast, _var_bounds) forecasts = np.zeros((t - start_index, horizon)) forecasts[:, 0] = sigma2[start_index + 1 :] sigma2 = sigma2[:-1] return sigma2, forecasts @abstractmethod def _analytic_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, start: int, horizon: int, ) -> VarianceForecast: """ Analytic multi-step volatility forecasts from the model Parameters ---------- parameters : ndarray Parameters required to forecast the volatility model resids : ndarray Residuals to use in the recursion backcast : float Value to use when initializing the recursion var_bounds : ndarray Array containing columns of lower and upper bounds start : int Index of the first observation to use as the starting point for the forecast. Default is 0. horizon : int Forecast horizon. Must be 1 or larger. Forecasts are produced for horizons in [1, horizon]. Returns ------- forecasts : VarianceForecast Class containing the variance forecasts, and, if using simulation or bootstrap, the simulated paths. """ @abstractmethod def _simulation_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, start: int, horizon: int, simulations: int, rng: RNGType, ) -> VarianceForecast: """ Simulation-based volatility forecasts from the model Parameters ---------- parameters : ndarray Parameters required to forecast the volatility model resids : ndarray Residuals to use in the recursion backcast : float Value to use when initializing the recursion. The backcast is assumed to be appropriately transformed so that it can be used without modification, e.g., log of the variance backcast in an EGARCH model. var_bounds : ndarray Array containing columns of lower and upper bounds start : int Index of the first observation to use as the starting point for the forecast. Default is 0. horizon : int Forecast horizon. Must be 1 or larger. Forecasts are produced for horizons in [1, horizon]. simulations : int Number of simulations to run when computing the forecast using either simulation or bootstrap. rng : callable Callable random number generator required if method is 'simulation'. Must take a single shape input and return random samples numbers with that shape. Returns ------- forecasts : VarianceForecast Class containing the variance forecasts, and, if using simulation or bootstrap, the simulated paths. """ def _bootstrap_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, start: int, horizon: int, simulations: int, random_state: RandomState | None, ) -> VarianceForecast: """ Simulation-based volatility forecasts using model residuals Parameters ---------- parameters : ndarray Parameters required to forecast the volatility model resids : ndarray Residuals to use in the recursion backcast : {float, ndarray} Value to use when initializing the recursion var_bounds : ndarray Array containing columns of lower and upper bounds start : int Index of the first observation to use as the starting point for the forecast. Default is 0. horizon : int Forecast horizon. Must be 1 or larger. Forecasts are produced for horizons in [1, horizon]. simulations : int Number of simulations to run when computing the forecast using either simulation or bootstrap. random_state : {RandomState, None} NumPy RandomState instance to use in the BootstrapRng Returns ------- forecasts : VarianceForecast Class containing the variance forecasts, and, if using simulation or bootstrap, the simulated paths. """ sigma2 = np.empty_like(resids) self.compute_variance(parameters, resids, sigma2, backcast, var_bounds) std_resid = resids / np.sqrt(sigma2) if start < self._min_bootstrap_obs: raise ValueError( "start must include more than {} " "observations".format(self._min_bootstrap_obs) ) rng = BootstrapRng(std_resid, start, random_state=random_state).rng() return self._simulation_forecast( parameters, resids, backcast, var_bounds, start, horizon, simulations, rng )
[docs] def variance_bounds(self, resids: Float64Array, power: float = 2.0) -> Float64Array: """ Construct loose bounds for conditional variances. These bounds are used in parameter estimation to ensure that the log-likelihood does not produce NaN values. Parameters ---------- resids : ndarray Approximate residuals to use to compute the lower and upper bounds on the conditional variance power : float, optional Power used in the model. 2.0, the default corresponds to standard ARCH models that evolve in squares. Returns ------- var_bounds : ndarray Array containing columns of lower and upper bounds with the same number of elements as resids """ nobs = resids.shape[0] tau = min(75, nobs) w = 0.94 ** np.arange(tau) w = w / sum(w) var_bound = np.zeros(nobs) initial_value = w.dot(resids[:tau] ** 2.0) ewma_recursion(0.94, resids, var_bound, resids.shape[0], initial_value) var_bounds = np.vstack((var_bound / 1e6, var_bound * 1e6)).T var = resids.var() min_upper_bound = 1 + (resids**2.0).max() lower_bound, upper_bound = var / 1e8, 1e7 * (1 + (resids**2.0).max()) var_bounds[var_bounds[:, 0] < lower_bound, 0] = lower_bound var_bounds[var_bounds[:, 1] < min_upper_bound, 1] = min_upper_bound var_bounds[var_bounds[:, 1] > upper_bound, 1] = upper_bound if power != 2.0: var_bounds **= power / 2.0 return np.ascontiguousarray(var_bounds)
[docs] @abstractmethod def starting_values(self, resids: Float64Array) -> Float64Array: """ Returns starting values for the ARCH model Parameters ---------- resids : ndarray Array of (approximate) residuals to use when computing starting values Returns ------- sv : ndarray Array of starting values """
[docs] def backcast(self, resids: Float64Array) -> float | Float64Array: """ Construct values for backcasting to start the recursion Parameters ---------- resids : ndarray Vector of (approximate) residuals Returns ------- backcast : float Value to use in backcasting in the volatility recursion """ tau = min(75, resids.shape[0]) w = 0.94 ** np.arange(tau) w = w / sum(w) return float(np.sum((resids[:tau] ** 2.0) * w))
[docs] def backcast_transform( self, backcast: float | Float64Array ) -> float | Float64Array: """ Transformation to apply to user-provided backcast values Parameters ---------- backcast : {float, ndarray} User-provided ``backcast`` that approximates sigma2[0]. Returns ------- backcast : {float, ndarray} Backcast transformed to the model-appropriate scale """ if np.any(backcast < 0): raise ValueError("User backcast value must be strictly positive.") return backcast
[docs] @abstractmethod def bounds(self, resids: Float64Array) -> list[tuple[float, float]]: """ Returns bounds for parameters Parameters ---------- resids : ndarray Vector of (approximate) residuals Returns ------- bounds : list[tuple[float,float]] List of bounds where each element is (lower, upper). """
[docs] @abstractmethod def compute_variance( self, parameters: Float64Array, resids: Float64Array, sigma2: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, ) -> Float64Array: """ Compute the variance for the ARCH model Parameters ---------- parameters : ndarray Model parameters resids : ndarray Vector of mean zero residuals sigma2 : ndarray Array with same size as resids to store the conditional variance backcast : {float, ndarray} Value to use when initializing ARCH recursion. Can be an ndarray when the model contains multiple components. var_bounds : ndarray Array containing columns of lower and upper bounds """
[docs] @abstractmethod def constraints(self) -> tuple[Float64Array, Float64Array]: """ Construct parameter constraints arrays for parameter estimation Returns ------- A : ndarray Parameters loadings in constraint. Shape is number of constraints by number of parameters b : ndarray Constraint values, one for each constraint Notes ----- Values returned are used in constructing linear inequality constraints of the form A.dot(parameters) - b >= 0 """
[docs] def forecast( self, parameters: ArrayLike1D, resids: Float64Array, backcast: Float64Array | float, var_bounds: Float64Array, start: int | None = None, horizon: int = 1, method: ForecastingMethod = "analytic", simulations: int = 1000, rng: RNGType | None = None, random_state: RandomState | None = None, ) -> VarianceForecast: """ Forecast volatility from the model Parameters ---------- parameters : {ndarray, Series} Parameters required to forecast the volatility model resids : ndarray Residuals to use in the recursion backcast : float Value to use when initializing the recursion var_bounds : ndarray, 2-d Array containing columns of lower and upper bounds start : {None, int} Index of the first observation to use as the starting point for the forecast. Default is len(resids). horizon : int Forecast horizon. Must be 1 or larger. Forecasts are produced for horizons in [1, horizon]. method : {'analytic', 'simulation', 'bootstrap'} Method to use when producing the forecast. The default is analytic. simulations : int Number of simulations to run when computing the forecast using either simulation or bootstrap. rng : callable Callable random number generator required if method is 'simulation'. Must take a single shape input and return random samples numbers with that shape. random_state : RandomState, optional NumPy RandomState instance to use when method is 'bootstrap' Returns ------- forecasts : VarianceForecast Class containing the variance forecasts, and, if using simulation or bootstrap, the simulated paths. Raises ------ NotImplementedError * If method is not supported ValueError * If the method is not known Notes ----- The analytic ``method`` is not supported for all models. Attempting to use this method when not available will raise a ValueError. """ parameters = np.asarray(parameters) method_name = method.lower() if method_name not in ("analytic", "simulation", "bootstrap"): raise ValueError(f"{method} is not a known forecasting method") if not isinstance(horizon, (int, np.integer)) or horizon < 1: raise ValueError("horizon must be an integer >= 1.") self._check_forecasting_method(cast(ForecastingMethod, method_name), horizon) start = len(resids) - 1 if start is None else start if method_name == "analytic": return self._analytic_forecast( parameters, resids, backcast, var_bounds, start, horizon ) elif method == "simulation": # TODO: This looks like a design flaw.It is optional above but then must # be present. This happens because the caller of this function is # expected to know when to provide the rng or not assert rng is not None return self._simulation_forecast( parameters, resids, backcast, var_bounds, start, horizon, simulations, rng, ) else: if start < 10 or (horizon / start) >= 0.2: raise ValueError( "Bootstrap forecasting requires at least 10 initial " "observations, and the ratio of horizon-to-start < 20%." ) return self._bootstrap_forecast( parameters, resids, backcast, var_bounds, start, horizon, simulations, random_state, )
[docs] @abstractmethod def simulate( self, parameters: Sequence[int | float] | ArrayLike1D, nobs: int, rng: RNGType, burn: int = 500, initial_value: None | float | Float64Array = None, ) -> tuple[Float64Array, Float64Array]: """ Simulate data from the model Parameters ---------- parameters : {ndarray, Series} Parameters required to simulate the volatility model nobs : int Number of data points to simulate rng : callable Callable function that takes a single integer input and returns a vector of random numbers burn : int, optional Number of additional observations to generate when initializing the simulation initial_value : {float, ndarray}, optional Scalar or array of initial values to use when initializing the simulation Returns ------- resids : ndarray The simulated residuals variance : ndarray The simulated variance """
def _gaussian_loglikelihood( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, ) -> float: """ Private implementation of a Gaussian log-likelihood for use in constructing starting values or other quantities that do not depend on the distribution used by the model. """ sigma2 = np.zeros_like(resids) self.compute_variance(parameters, resids, sigma2, backcast, var_bounds) return float(self._normal.loglikelihood([], resids, sigma2))
[docs] @abstractmethod def parameter_names(self) -> list[str]: """ Names of model parameters Returns ------- names : list (str) Variables names """
[docs] class ConstantVariance(VolatilityProcess, metaclass=AbstractDocStringInheritor): r""" Constant volatility process Notes ----- Model has the same variance in all periods """ def __init__(self) -> None: super().__init__() self._num_params = 1 self._name = "Constant Variance" self.closed_form: bool = True
[docs] def compute_variance( self, parameters: Float64Array, resids: Float64Array, sigma2: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, ) -> Float64Array: sigma2[:] = parameters[0] return sigma2
[docs] def starting_values(self, resids: Float64Array) -> Float64Array: return np.array([resids.var()])
[docs] def simulate( self, parameters: Sequence[int | float] | ArrayLike1D, nobs: int, rng: RNGType, burn: int = 500, initial_value: None | float | Float64Array = None, ) -> tuple[Float64Array, Float64Array]: parameters = ensure1d(parameters, "parameters", False) errors = rng(nobs + burn) sigma2 = np.ones(nobs + burn) * parameters[0] data = np.sqrt(sigma2) * errors return data[burn:], sigma2[burn:]
[docs] def constraints(self) -> tuple[Float64Array, Float64Array]: return np.ones((1, 1)), np.zeros(1)
[docs] def backcast_transform( self, backcast: float | Float64Array ) -> float | Float64Array: backcast = super().backcast_transform(backcast) return backcast
[docs] def backcast(self, resids: Float64Array) -> float | Float64Array: return float(resids.var())
[docs] def bounds(self, resids: Float64Array) -> list[tuple[float, float]]: v = float(resids.var()) return [(v / 100000.0, 10.0 * (v + float(resids.mean()) ** 2.0))]
[docs] def parameter_names(self) -> list[str]: return ["sigma2"]
def _check_forecasting_method( self, method: ForecastingMethod, horizon: int ) -> None: return def _analytic_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, start: int, horizon: int, ) -> VarianceForecast: t = resids.shape[0] forecasts = np.full((t - start, horizon), np.nan) forecasts[:, :] = parameters[0] forecast_paths = None return VarianceForecast(forecasts, forecast_paths) def _simulation_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, start: int, horizon: int, simulations: int, rng: RNGType, ) -> VarianceForecast: t = resids.shape[0] forecasts = np.empty((t - start, horizon)) forecast_paths = np.empty((t - start, simulations, horizon)) shocks = np.empty((t - start, simulations, horizon)) for i in range(t - start): shocks[i, :, :] = np.sqrt(parameters[0]) * rng((simulations, horizon)) forecasts[:, :] = parameters[0] forecast_paths[:, :, :] = parameters[0] return VarianceForecast(forecasts, forecast_paths, shocks)
[docs] class GARCH(VolatilityProcess, metaclass=AbstractDocStringInheritor): r""" GARCH and related model estimation The following models can be specified using GARCH: * ARCH(p) * GARCH(p,q) * GJR-GARCH(p,o,q) * AVARCH(p) * AVGARCH(p,q) * TARCH(p,o,q) * Models with arbitrary, pre-specified powers Parameters ---------- p : int Order of the symmetric innovation o : int Order of the asymmetric innovation q : int Order of the lagged (transformed) conditional variance power : float, optional Power to use with the innovations, abs(e) ** power. Default is 2.0, which produces ARCH and related models. Using 1.0 produces AVARCH and related models. Other powers can be specified, although these should be strictly positive, and usually larger than 0.25. Examples -------- >>> from arch.univariate import GARCH Standard GARCH(1,1) >>> garch = GARCH(p=1, q=1) Asymmetric GJR-GARCH process >>> gjr = GARCH(p=1, o=1, q=1) Asymmetric TARCH process >>> tarch = GARCH(p=1, o=1, q=1, power=1.0) Notes ----- In this class of processes, the variance dynamics are .. math:: \sigma_{t}^{\lambda}=\omega + \sum_{i=1}^{p}\alpha_{i}\left|\epsilon_{t-i}\right|^{\lambda} +\sum_{j=1}^{o}\gamma_{j}\left|\epsilon_{t-j}\right|^{\lambda} I\left[\epsilon_{t-j}<0\right]+\sum_{k=1}^{q}\beta_{k}\sigma_{t-k}^{\lambda} where :math:`\lambda` is the ``power``. """ def __init__(self, p: int = 1, o: int = 0, q: int = 1, power: float = 2.0) -> None: super().__init__() self.p: int = int(p) self.o: int = int(o) self.q: int = int(q) self.power: float = power self._num_params = 1 + p + o + q if p < 0 or o < 0 or q < 0: raise ValueError("All lags lengths must be non-negative") if p == 0 and o == 0: raise ValueError("One of p or o must be strictly positive") if power <= 0.0: raise ValueError( "power must be strictly positive, usually larger than 0.25" ) self._name = self._generate_name() self._volatility_updater = rec.GARCHUpdater(self.p, self.o, self.q, self.power) def __str__(self) -> str: descr = self.name if self.power != 1.0 and self.power != 2.0: descr = descr[:-1] + ", " else: descr += "(" for k, v in (("p", self.p), ("o", self.o), ("q", self.q)): if v > 0: descr += k + ": " + str(v) + ", " descr = descr[:-2] + ")" return descr
[docs] def variance_bounds(self, resids: Float64Array, power: float = 2.0) -> Float64Array: return super().variance_bounds(resids, self.power)
def _generate_name(self) -> str: p, o, q, power = self.p, self.o, self.q, self.power # noqa: F841 if power == 2.0: if o == 0 and q == 0: return "ARCH" elif o == 0: return "GARCH" else: return "GJR-GARCH" elif power == 1.0: if o == 0 and q == 0: return "AVARCH" elif o == 0: return "AVGARCH" else: return "TARCH/ZARCH" else: if o == 0 and q == 0: return f"Power ARCH (power: {self.power:0.1f})" elif o == 0: return f"Power GARCH (power: {self.power:0.1f})" else: return f"Asym. Power GARCH (power: {self.power:0.1f})"
[docs] def bounds(self, resids: Float64Array) -> list[tuple[float, float]]: v = float(np.mean(abs(resids) ** self.power)) bounds = [(1e-8 * v, 10.0 * float(v))] bounds.extend([(0.0, 1.0)] * self.p) for i in range(self.o): if i < self.p: bounds.append((-1.0, 2.0)) else: bounds.append((0.0, 2.0)) bounds.extend([(0.0, 1.0)] * self.q) return bounds
[docs] def constraints(self) -> tuple[Float64Array, Float64Array]: p, o, q = self.p, self.o, self.q k_arch = p + o + q # alpha[i] >0 # alpha[i] + gamma[i] > 0 for i<=p, otherwise gamma[i]>0 # beta[i] >0 # sum(alpha) + 0.5 sum(gamma) + sum(beta) < 1 a = np.zeros((k_arch + 2, k_arch + 1)) for i in range(k_arch + 1): a[i, i] = 1.0 for i in range(o): if i < p: a[i + p + 1, i + 1] = 1.0 a[k_arch + 1, 1:] = -1.0 a[k_arch + 1, p + 1 : p + o + 1] = -0.5 b = np.zeros(k_arch + 2) b[k_arch + 1] = -1.0 return a, b
[docs] def compute_variance( self, parameters: Float64Array, resids: Float64Array, sigma2: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, ) -> Float64Array: # fresids is abs(resids) ** power # sresids is I(resids<0) power = self.power fresids = np.abs(resids) ** power sresids = np.sign(resids) p, o, q = self.p, self.o, self.q nobs = resids.shape[0] rec.garch_recursion( parameters, fresids, sresids, sigma2, p, o, q, nobs, backcast, var_bounds ) inv_power = 2.0 / power sigma2 **= inv_power return sigma2
[docs] def backcast_transform( self, backcast: float | Float64Array ) -> float | Float64Array: backcast = super().backcast_transform(backcast) return np.sqrt(backcast) ** self.power
[docs] def backcast(self, resids: Float64Array) -> float | Float64Array: power = self.power tau = min(75, resids.shape[0]) w = 0.94 ** np.arange(tau) w = w / sum(w) backcast = np.sum((abs(resids[:tau]) ** power) * w) return float(backcast)
[docs] def simulate( self, parameters: Sequence[int | float] | ArrayLike1D, nobs: int, rng: RNGType, burn: int = 500, initial_value: None | float | Float64Array = None, ) -> tuple[Float64Array, Float64Array]: parameters = ensure1d(parameters, "parameters", False) p, o, q, power = self.p, self.o, self.q, self.power errors = rng(nobs + burn) if initial_value is None: scale = np.ones_like(parameters) scale[p + 1 : p + o + 1] = 0.5 persistence = np.sum(parameters[1:] * scale[1:]) if (1.0 - persistence) > 0: initial_value = parameters[0] / (1.0 - persistence) else: warn(initial_value_warning, InitialValueWarning) initial_value = parameters[0] sigma2 = np.zeros(nobs + burn) data = np.zeros(nobs + burn) fsigma = np.zeros(nobs + burn) fdata = np.zeros(nobs + burn) max_lag = np.max([p, o, q]) fsigma[:max_lag] = initial_value sigma2[:max_lag] = initial_value ** (2.0 / power) data[:max_lag] = np.sqrt(sigma2[:max_lag]) * errors[:max_lag] fdata[:max_lag] = abs(data[:max_lag]) ** power for t in range(max_lag, nobs + burn): loc = 0 fsigma[t] = parameters[loc] loc += 1 for j in range(p): fsigma[t] += parameters[loc] * fdata[t - 1 - j] loc += 1 for j in range(o): fsigma[t] += parameters[loc] * fdata[t - 1 - j] * (data[t - 1 - j] < 0) loc += 1 for j in range(q): fsigma[t] += parameters[loc] * fsigma[t - 1 - j] loc += 1 sigma2[t] = fsigma[t] ** (2.0 / power) data[t] = errors[t] * np.sqrt(sigma2[t]) fdata[t] = abs(data[t]) ** power return data[burn:], sigma2[burn:]
[docs] def starting_values(self, resids: Float64Array) -> Float64Array: p, o, q = self.p, self.o, self.q power = self.power alphas = [0.01, 0.05, 0.1, 0.2] gammas = alphas abg = [0.5, 0.7, 0.9, 0.98] abgs = list(itertools.product(*[alphas, gammas, abg])) target = np.mean(abs(resids) ** power) scale = np.mean(resids**2) / (target ** (2.0 / power)) target *= scale ** (power / 2) svs = [] var_bounds = self.variance_bounds(resids) backcast = self.backcast(resids) llfs = np.zeros(len(abgs)) for i, values in enumerate(abgs): alpha, gamma, agb = values sv = (1.0 - agb) * target * np.ones(p + o + q + 1) if p > 0: sv[1 : 1 + p] = alpha / p agb -= alpha if o > 0: sv[1 + p : 1 + p + o] = gamma / o agb -= gamma / 2.0 if q > 0: sv[1 + p + o : 1 + p + o + q] = agb / q svs.append(sv) llfs[i] = self._gaussian_loglikelihood(sv, resids, backcast, var_bounds) loc = np.argmax(llfs) return svs[int(loc)]
[docs] def parameter_names(self) -> list[str]: return _common_names(self.p, self.o, self.q)
def _check_forecasting_method( self, method: ForecastingMethod, horizon: int ) -> None: if horizon == 1: return if method == "analytic" and self.power != 2.0: raise ValueError( "Analytic forecasts not available for horizon > 1 when power != 2" ) return def _analytic_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, start: int, horizon: int, ) -> VarianceForecast: sigma2, forecasts = self._one_step_forecast( parameters, resids, backcast, var_bounds, horizon, start ) if horizon == 1: return VarianceForecast(forecasts) t = resids.shape[0] p, o, q = self.p, self.o, self.q omega = parameters[0] alpha = parameters[1 : p + 1] gamma = parameters[p + 1 : p + o + 1] beta = parameters[p + o + 1 :] m = np.max([p, o, q]) _resids = np.zeros(m + horizon) _asym_resids = np.zeros(m + horizon) _sigma2 = np.zeros(m + horizon) for i in range(start, t): if i - m + 1 >= 0: _resids[:m] = resids[i - m + 1 : i + 1] _asym_resids[:m] = _resids[:m] * (_resids[:m] < 0) _sigma2[:m] = sigma2[i - m + 1 : i + 1] else: # Back-casting needed _resids[: m - i - 1] = np.sqrt(backcast) _resids[m - i - 1 : m] = resids[0 : i + 1] _asym_resids = cast(np.ndarray, _resids * (_resids < 0)) _asym_resids[: m - i - 1] = np.sqrt(0.5 * backcast) _sigma2[:m] = backcast _sigma2[m - i - 1 : m] = sigma2[0 : i + 1] for h in range(0, horizon): fcast_loc = i - start forecasts[fcast_loc, h] = omega start_loc = h + m - 1 for j in range(p): forecasts[fcast_loc, h] += alpha[j] * _resids[start_loc - j] ** 2 for j in range(o): forecasts[fcast_loc, h] += ( gamma[j] * _asym_resids[start_loc - j] ** 2 ) for j in range(q): forecasts[fcast_loc, h] += beta[j] * _sigma2[start_loc - j] _resids[h + m] = np.sqrt(forecasts[fcast_loc, h]) _asym_resids[h + m] = np.sqrt(0.5 * forecasts[fcast_loc, h]) _sigma2[h + m] = forecasts[fcast_loc, h] return VarianceForecast(forecasts) def _simulate_paths( self, m: int, parameters: Float64Array, horizon: int, std_shocks: Float64Array, scaled_forecast_paths: Float64Array, scaled_shock: Float64Array, asym_scaled_shock: Float64Array, ) -> tuple[Float64Array, Float64Array, Float64Array]: power = self.power p, o, q = self.p, self.o, self.q omega = parameters[0] alpha = parameters[1 : p + 1] gamma = parameters[p + 1 : p + o + 1] beta = parameters[p + o + 1 :] shock = np.empty_like(scaled_forecast_paths) for h in range(horizon): loc = h + m - 1 scaled_forecast_paths[:, h + m] = omega for j in range(p): scaled_forecast_paths[:, h + m] += alpha[j] * scaled_shock[:, loc - j] for j in range(o): scaled_forecast_paths[:, h + m] += ( gamma[j] * asym_scaled_shock[:, loc - j] ) for j in range(q): scaled_forecast_paths[:, h + m] += ( beta[j] * scaled_forecast_paths[:, loc - j] ) shock[:, h + m] = std_shocks[:, h] * scaled_forecast_paths[:, h + m] ** ( 1.0 / power ) lt_zero = shock[:, h + m] < 0 scaled_shock[:, h + m] = np.abs(shock[:, h + m]) ** power asym_scaled_shock[:, h + m] = scaled_shock[:, h + m] * lt_zero forecast_paths = scaled_forecast_paths[:, m:] ** (2.0 / power) return np.asarray(np.mean(forecast_paths, 0)), forecast_paths, shock[:, m:] def _simulation_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, start: int, horizon: int, simulations: int, rng: RNGType, ) -> VarianceForecast: sigma2, forecasts = self._one_step_forecast( parameters, resids, backcast, var_bounds, horizon, start ) t = resids.shape[0] paths = np.empty((t - start, simulations, horizon)) shocks = np.empty((t - start, simulations, horizon)) power = self.power m = np.max([self.p, self.o, self.q]) scaled_forecast_paths = np.zeros((simulations, m + horizon)) scaled_shock = np.zeros((simulations, m + horizon)) asym_scaled_shock = np.zeros((simulations, m + horizon)) for i in range(start, t): std_shocks = rng((simulations, horizon)) if i - m < 0: scaled_forecast_paths[:, :m] = backcast ** (power / 2.0) scaled_shock[:, :m] = backcast ** (power / 2.0) asym_scaled_shock[:, :m] = (0.5 * backcast) ** (power / 2.0) # Use actual values where available count = i + 1 scaled_forecast_paths[:, m - count : m] = sigma2[:count] ** ( power / 2.0 ) scaled_shock[:, m - count : m] = np.abs(resids[:count]) ** power asym = np.abs(resids[:count]) ** power * (resids[:count] < 0) asym_scaled_shock[:, m - count : m] = asym else: scaled_forecast_paths[:, :m] = sigma2[i - m + 1 : i + 1] ** ( power / 2.0 ) scaled_shock[:, :m] = np.abs(resids[i - m + 1 : i + 1]) ** power asym_scaled_shock[:, :m] = scaled_shock[:, :m] * ( resids[i - m + 1 : i + 1] < 0 ) f, p, s = self._simulate_paths( m, parameters, horizon, std_shocks, scaled_forecast_paths, scaled_shock, asym_scaled_shock, ) loc = i - start forecasts[loc, :], paths[loc], shocks[loc] = f, p, s return VarianceForecast(forecasts, paths, shocks)
[docs] class HARCH(VolatilityProcess, metaclass=AbstractDocStringInheritor): r""" Heterogeneous ARCH process Parameters ---------- lags : {list, array, int} List of lags to include in the model, or if scalar, includes all lags up the value Examples -------- >>> from arch.univariate import HARCH Lag-1 HARCH, which is identical to an ARCH(1) >>> harch = HARCH() More useful and realistic lag lengths >>> harch = HARCH(lags=[1, 5, 22]) Notes ----- In a Heterogeneous ARCH process, variance dynamics are .. math:: \sigma_{t}^{2}=\omega + \sum_{i=1}^{m}\alpha_{l_{i}} \left(l_{i}^{-1}\sum_{j=1}^{l_{i}}\epsilon_{t-j}^{2}\right) In the common case where ``lags=[1,5,22]``, the model is .. math:: \sigma_{t}^{2}=\omega+\alpha_{1}\epsilon_{t-1}^{2} +\alpha_{5} \left(\frac{1}{5}\sum_{j=1}^{5}\epsilon_{t-j}^{2}\right) +\alpha_{22} \left(\frac{1}{22}\sum_{j=1}^{22}\epsilon_{t-j}^{2}\right) A HARCH process is a special case of an ARCH process where parameters in the more general ARCH process have been restricted. """ def __init__(self, lags: int | Sequence[int] = 1) -> None: super().__init__() if not isinstance(lags, Sequence): lag_val = int(lags) lags = list(range(1, lag_val + 1)) lags_arr = ensure1d(lags, "lags") self.lags: Int32Array = np.array(lags_arr, dtype=np.int32) self._num_lags = lags_arr.shape[0] self._num_params = self._num_lags + 1 self._name = "HARCH" self._volatility_updater = rec.HARCHUpdater(self.lags) def __str__(self) -> str: descr = self.name + "(lags: " descr += ", ".join([str(lag) for lag in self.lags]) descr += ")" return descr
[docs] def bounds(self, resids: Float64Array) -> list[tuple[float, float]]: lags = self.lags k_arch = lags.shape[0] bounds = [(0.0, 10 * float(np.mean(resids**2.0)))] bounds.extend([(0.0, 1.0)] * k_arch) return bounds
[docs] def constraints(self) -> tuple[Float64Array, Float64Array]: k_arch = self._num_lags a = np.zeros((k_arch + 2, k_arch + 1)) for i in range(k_arch + 1): a[i, i] = 1.0 a[k_arch + 1, 1:] = -1.0 b = np.zeros(k_arch + 2) b[k_arch + 1] = -1.0 return a, b
[docs] def compute_variance( self, parameters: Float64Array, resids: Float64Array, sigma2: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, ) -> Float64Array: lags = self.lags nobs = resids.shape[0] rec.harch_recursion( parameters, resids, sigma2, lags, nobs, backcast, var_bounds ) return sigma2
[docs] def simulate( self, parameters: Sequence[int | float] | ArrayLike1D, nobs: int, rng: RNGType, burn: int = 500, initial_value: None | float | Float64Array = None, ) -> tuple[Float64Array, Float64Array]: parameters = ensure1d(parameters, "parameters", False) lags = self.lags errors = rng(nobs + burn) if initial_value is None: if (1.0 - np.sum(parameters[1:])) > 0: initial_value = parameters[0] / (1.0 - np.sum(parameters[1:])) else: warn(initial_value_warning, InitialValueWarning) initial_value = parameters[0] sigma2 = np.empty(nobs + burn) data = np.empty(nobs + burn) max_lag = int(np.max(lags)) sigma2[:max_lag] = initial_value data[:max_lag] = np.sqrt(initial_value) for t in range(max_lag, nobs + burn): sigma2[t] = parameters[0] for i in range(lags.shape[0]): param = parameters[1 + i] / lags[i] for j in range(lags[i]): sigma2[t] += param * data[t - 1 - j] ** 2.0 data[t] = errors[t] * np.sqrt(sigma2[t]) return data[burn:], sigma2[burn:]
[docs] def starting_values(self, resids: Float64Array) -> Float64Array: k_arch = self._num_lags alpha = 0.9 sv = (1.0 - alpha) * resids.var() * np.ones(k_arch + 1) sv[1:] = alpha / k_arch return sv
[docs] def parameter_names(self) -> list[str]: names = ["omega"] lags = self.lags names.extend(["alpha[" + str(lags[i]) + "]" for i in range(self._num_lags)]) return names
def _harch_to_arch(self, params: Float64Array) -> Float64Array: arch_params = np.zeros(1 + int(self.lags.max())) arch_params[0] = params[0] for param, lag in zip(params[1:], self.lags): arch_params[1 : lag + 1] += param / lag return arch_params def _common_forecast_components( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, horizon: int, ) -> tuple[float, Float64Array, Float64Array]: arch_params = self._harch_to_arch(parameters) t = resids.shape[0] m = int(self.lags.max()) resids2 = np.empty((t, m + horizon)) resids2[:m, :m] = backcast sq_resids = resids**2.0 for i in range(m): resids2[m - i - 1 :, i] = sq_resids[: (t - (m - i - 1))] const = arch_params[0] arch = arch_params[1:] return const, arch, resids2 def _check_forecasting_method( self, method: ForecastingMethod, horizon: int ) -> None: return def _analytic_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, start: int, horizon: int, ) -> VarianceForecast: const, arch, resids2 = self._common_forecast_components( parameters, resids, backcast, horizon ) m = int(self.lags.max()) resids2 = resids2[start:] arch_rev = arch[::-1] for i in range(horizon): resids2[:, m + i] = const + resids2[:, i : (m + i)].dot(arch_rev) return VarianceForecast(resids2[:, m:].copy()) def _simulation_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, start: int, horizon: int, simulations: int, rng: RNGType, ) -> VarianceForecast: const, arch, resids2 = self._common_forecast_components( parameters, resids, backcast, horizon ) t, m = resids.shape[0], int(self.lags.max()) paths = np.empty((t - start, simulations, horizon)) shocks = np.empty((t - start, simulations, horizon)) temp_resids2 = np.empty((simulations, m + horizon)) arch_rev = arch[::-1] for i in range(start, t): std_shocks = rng((simulations, horizon)) temp_resids2[:, :] = resids2[i : (i + 1)] path_loc = i - start for j in range(horizon): paths[path_loc, :, j] = const + temp_resids2[:, j : (m + j)].dot( arch_rev ) shocks[path_loc, :, j] = std_shocks[:, j] * np.sqrt( paths[path_loc, :, j] ) temp_resids2[:, m + j] = shocks[path_loc, :, j] ** 2.0 return VarianceForecast(np.asarray(paths.mean(1)), paths, shocks)
[docs] class MIDASHyperbolic(VolatilityProcess, metaclass=AbstractDocStringInheritor): r""" MIDAS Hyperbolic ARCH process Parameters ---------- m : int Length of maximum lag to include in the model asym : bool Flag indicating whether to include an asymmetric term Examples -------- >>> from arch.univariate import MIDASHyperbolic 22-lag MIDAS Hyperbolic process >>> harch = MIDASHyperbolic() Longer 66-period lag >>> harch = MIDASHyperbolic(m=66) Asymmetric MIDAS Hyperbolic process >>> harch = MIDASHyperbolic(asym=True) Notes ----- In a MIDAS Hyperbolic process, the variance evolves according to .. math:: \sigma_{t}^{2}=\omega+ \sum_{i=1}^{m}\left(\alpha+\gamma I\left[\epsilon_{t-j}<0\right]\right) \phi_{i}(\theta)\epsilon_{t-i}^{2} where .. math:: \phi_{i}(\theta) \propto \Gamma(i+\theta)/(\Gamma(i+1)\Gamma(\theta)) where :math:`\Gamma` is the gamma function. :math:`\{\phi_i(\theta)\}` is normalized so that :math:`\sum \phi_i(\theta)=1` References ---------- .. [*] Foroni, Claudia, and Massimiliano Marcellino. "A survey of Econometric Methods for Mixed-Frequency Data". Norges Bank. (2013). .. [*] Sheppard, Kevin. "Direct volatility modeling". Manuscript. (2018). """ def __init__(self, m: int = 22, asym: bool = False) -> None: super().__init__() self.m: int = int(m) self._asym = bool(asym) self._num_params = 3 + self._asym self._name = "MIDAS Hyperbolic" self._volatility_updater = rec.MIDASUpdater(self.m, self._asym) def __str__(self) -> str: descr = self.name descr += f"(lags: {self.m}, asym: {self._asym}" return descr
[docs] def bounds(self, resids: Float64Array) -> list[tuple[float, float]]: bounds = [(0.0, 10 * float(np.mean(resids**2.0)))] # omega bounds.extend([(0.0, 1.0)]) # 0 <= alpha < 1 if self._asym: bounds.extend([(-1.0, 2.0)]) # -1 <= gamma < 2 bounds.extend([(0.0, 1.0)]) # theta return bounds
[docs] def constraints(self) -> tuple[Float64Array, Float64Array]: """ Constraints Notes ----- Parameters are (omega, alpha, gamma, theta) A.dot(parameters) - b >= 0 1. omega >0 2. alpha>0 or alpha + gamma > 0 3. alpha<1 or alpha+0.5*gamma<1 4. theta > 0 5. theta < 1 """ symm = not self._asym k = 3 + self._asym a = np.zeros((5, k)) b = np.zeros(5) # omega a[0, 0] = 1.0 # alpha >0 or alpha+gamma>0 # alpha<1 or alpha+0.5*gamma<1 if symm: a[1, 1] = 1.0 a[2, 1] = -1.0 else: a[1, 1:3] = 1.0 a[2, 1:3] = [-1, -0.5] b[2] = -1.0 # theta a[3, k - 1] = 1.0 a[4, k - 1] = -1.0 b[4] = -1.0 return a, b
[docs] def compute_variance( self, parameters: Float64Array, resids: Float64Array, sigma2: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, ) -> Float64Array: nobs = resids.shape[0] weights = self._weights(parameters) if not self._asym: params = np.zeros(3) params[:2] = parameters[:2] else: params = parameters[:3] rec.midas_recursion(params, weights, resids, sigma2, nobs, backcast, var_bounds) return sigma2
[docs] def simulate( self, parameters: Sequence[int | float] | ArrayLike1D, nobs: int, rng: RNGType, burn: int = 500, initial_value: None | float | Float64Array = None, ) -> tuple[Float64Array, Float64Array]: parameters = np.asarray(ensure1d(parameters, "parameters", False), dtype=float) if self._asym: omega, alpha, gamma = parameters[:3] else: omega, alpha = parameters[:2] gamma = 0 weights = self._weights(parameters) aw = weights * alpha gw = weights * gamma errors = rng(nobs + burn) if initial_value is None: if (1.0 - alpha - 0.5 * gamma) > 0: initial_value = parameters[0] / (1.0 - alpha - 0.5 * gamma) else: warn(initial_value_warning, InitialValueWarning) initial_value = parameters[0] m = weights.shape[0] burn = max(burn, m) sigma2 = np.empty(nobs + burn) data = np.empty(nobs + burn) sigma2[:m] = initial_value data[:m] = np.sqrt(initial_value) for t in range(m, nobs + burn): sigma2[t] = omega for i in range(m): if t - 1 - i < m: coef = aw[i] + 0.5 * gw[i] else: coef = aw[i] + gw[i] * (data[t - 1 - i] < 0) sigma2[t] += coef * data[t - 1 - i] ** 2.0 data[t] = errors[t] * np.sqrt(sigma2[t]) return data[burn:], sigma2[burn:]
[docs] def starting_values(self, resids: Float64Array) -> Float64Array: theta = [0.1, 0.5, 0.8, 0.9] alpha = [0.8, 0.9, 0.95, 0.98] var = (resids**2).mean() var_bounds = self.variance_bounds(resids) backcast = self.backcast(resids) llfs = [] svs = [] for a, t in itertools.product(alpha, theta): gamma = [0.0] if self._asym: gamma.extend([0.5, 0.9]) for g in gamma: total = a + g / 2 o = (1 - min(total, 0.99)) * var if self._asym: sv = np.array([o, a, g, t]) else: sv = np.array([o, a, t]) svs.append(sv) llf = self._gaussian_loglikelihood(sv, resids, backcast, var_bounds) llfs.append(llf) loc = np.argmax(llfs) return svs[int(loc)]
[docs] def parameter_names(self) -> list[str]: names = ["omega", "alpha", "theta"] if self._asym: names.insert(2, "gamma") return names
def _weights(self, params: Float64Array) -> Float64Array: m = self.m # Prevent 0 theta = max(params[-1], np.finfo(np.float64).eps) j = np.arange(1.0, m + 1) w = gammaln(theta + j) - gammaln(j + 1) - gammaln(theta) w = np.exp(w) return w / w.sum() def _common_forecast_components( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, horizon: int, ) -> tuple[int, Float64Array, Float64Array, Float64Array, Float64Array]: if self._asym: omega, alpha, gamma = parameters[:3] else: omega, alpha = parameters[:2] gamma = 0.0 weights = self._weights(parameters) aw = weights * alpha gw = weights * gamma t = resids.shape[0] m = self.m resids2 = np.empty((t, m + horizon)) resids2[:m, :m] = backcast indicator = np.empty((t, m + horizon)) indicator[:m, :m] = 0.5 sq_resids = resids**2.0 for i in range(m): resids2[m - i - 1 :, i] = sq_resids[: (t - (m - i - 1))] indicator[m - i - 1 :, i] = resids[: (t - (m - i - 1))] < 0 return omega, aw, gw, resids2, indicator def _check_forecasting_method( self, method: ForecastingMethod, horizon: int ) -> None: return def _analytic_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, start: int, horizon: int, ) -> VarianceForecast: omega, aw, gw, resids2, indicator = self._common_forecast_components( parameters, resids, backcast, horizon ) m = self.m resids2 = resids2[start:].copy() aw_rev = aw[::-1] gw_rev = gw[::-1] for i in range(horizon): resids2[:, m + i] = omega + resids2[:, i : (m + i)].dot(aw_rev) if self._asym: resids2_ind = resids2[:, i : (m + i)] * indicator[:, i : (m + i)] resids2[:, m + i] += resids2_ind.dot(gw_rev) indicator[:, m + i] = 0.5 return VarianceForecast(resids2[:, m:].copy()) def _simulation_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, start: int, horizon: int, simulations: int, rng: RNGType, ) -> VarianceForecast: omega, aw, gw, resids2, indicator = self._common_forecast_components( parameters, resids, backcast, horizon ) t = resids.shape[0] m = self.m shocks = np.empty((t - start, simulations, horizon)) paths = np.empty((t - start, simulations, horizon)) temp_resids2 = np.empty((simulations, m + horizon)) temp_indicator = np.empty((simulations, m + horizon)) aw_rev = aw[::-1] gw_rev = gw[::-1] for i in range(start, t): std_shocks = rng((simulations, horizon)) temp_resids2[:, :] = resids2[i : (i + 1)] temp_indicator[:, :] = indicator[i : (i + 1)] path_loc = i - start for j in range(horizon): paths[path_loc, :, j] = omega + temp_resids2[:, j : (m + j)].dot(aw_rev) if self._asym: temp_resids2_ind = ( temp_resids2[:, j : (m + j)] * temp_indicator[:, j : (m + j)] ) paths[path_loc, :, j] += temp_resids2_ind.dot(gw_rev) shocks[path_loc, :, j] = std_shocks[:, j] * np.sqrt( paths[path_loc, :, j] ) temp_resids2[:, m + j] = shocks[path_loc, :, j] ** 2.0 temp_indicator[:, m + j] = (shocks[path_loc, :, j] < 0).astype( np.double ) return VarianceForecast(np.asarray(paths.mean(1)), paths, shocks)
[docs] class ARCH(GARCH): r""" ARCH process Parameters ---------- p : int Order of the symmetric innovation Examples -------- ARCH(1) process >>> from arch.univariate import ARCH ARCH(5) process >>> arch = ARCH(p=5) Notes ----- The variance dynamics of the model estimated .. math:: \sigma_t^{2}=\omega+\sum_{i=1}^{p}\alpha_{i}\epsilon_{t-i}^{2} """ def __init__(self, p: int = 1) -> None: super().__init__(p, 0, 0, 2.0) self._num_params = p + 1
[docs] def starting_values(self, resids: Float64Array) -> Float64Array: p = self.p alphas = np.arange(0.1, 0.95, 0.05) svs = [] backcast = self.backcast(resids) llfs = alphas.copy() var_bounds = self.variance_bounds(resids) for i, alpha in enumerate(alphas): sv = (1.0 - alpha) * resids.var() * np.ones(p + 1) sv[1:] = alpha / p svs.append(sv) llfs[i] = self._gaussian_loglikelihood(sv, resids, backcast, var_bounds) loc = np.argmax(llfs) return svs[int(loc)]
[docs] class EWMAVariance(VolatilityProcess, metaclass=AbstractDocStringInheritor): r""" Exponentially Weighted Moving-Average (RiskMetrics) Variance process Parameters ---------- lam : {float, None}, optional Smoothing parameter. Default is 0.94. Set to None to estimate lam jointly with other model parameters Examples -------- Daily RiskMetrics EWMA process >>> from arch.univariate import EWMAVariance >>> rm = EWMAVariance(0.94) Notes ----- The variance dynamics of the model .. math:: \sigma_t^{2}=\lambda\sigma_{t-1}^2 + (1-\lambda)\epsilon^2_{t-1} When lam is provided, this model has no parameters since the smoothing parameter is treated as fixed. Set lam to ``None`` to jointly estimate this parameter when fitting the model. """ def __init__(self, lam: float | None = 0.94) -> None: super().__init__() self.lam: float | None = lam self._estimate_lam = lam is None self._num_params = 1 if self._estimate_lam else 0 if lam is not None and not 0.0 < lam < 1.0: raise ValueError("lam must be strictly between 0 and 1") self._name = "EWMA/RiskMetrics" self._volatility_updater = rec.EWMAUpdater(self.lam) def __str__(self) -> str: if self._estimate_lam: descr = self.name + "(lam: Estimated)" else: assert self.lam is not None descr = self.name + "(lam: " + f"{self.lam:0.2f}" + ")" return descr
[docs] def starting_values(self, resids: Float64Array) -> Float64Array: if self._estimate_lam: return np.array([0.94]) return np.array([])
[docs] def parameter_names(self) -> list[str]: if self._estimate_lam: return ["lam"] return []
[docs] def bounds(self, resids: Float64Array) -> list[tuple[float, float]]: if self._estimate_lam: return [(0, 1)] return []
[docs] def compute_variance( self, parameters: Float64Array, resids: Float64Array, sigma2: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, ) -> Float64Array: lam = parameters[0] if self._estimate_lam else self.lam return ewma_recursion(lam, resids, sigma2, resids.shape[0], float(backcast))
[docs] def constraints(self) -> tuple[Float64Array, Float64Array]: if self._estimate_lam: a = np.ones((1, 1)) b = np.zeros((1,)) return a, b return np.empty((0, 0)), np.empty((0,))
[docs] def simulate( self, parameters: Sequence[int | float] | ArrayLike1D, nobs: int, rng: RNGType, burn: int = 500, initial_value: None | float | Float64Array = None, ) -> tuple[Float64Array, Float64Array]: parameters = ensure1d(parameters, "parameters", False) errors = rng(nobs + burn) if initial_value is None: initial_value = 1.0 sigma2 = np.zeros(nobs + burn) data = np.zeros(nobs + burn) sigma2[0] = initial_value data[0] = np.sqrt(sigma2[0]) if self._estimate_lam: lam = parameters[0] else: lam = self.lam one_m_lam = 1.0 - lam for t in range(1, nobs + burn): sigma2[t] = lam * sigma2[t - 1] + one_m_lam * data[t - 1] ** 2.0 data[t] = np.sqrt(sigma2[t]) * errors[t] return data[burn:], sigma2[burn:]
def _check_forecasting_method( self, method: ForecastingMethod, horizon: int ) -> None: return def _analytic_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, start: int, horizon: int, ) -> VarianceForecast: _, forecasts = self._one_step_forecast( parameters, resids, backcast, var_bounds, horizon, start_index=start ) for i in range(1, horizon): forecasts[:, i] = forecasts[:, 0] return VarianceForecast(forecasts) def _simulation_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, start: int, horizon: int, simulations: int, rng: RNGType, ) -> VarianceForecast: one_step = self._analytic_forecast( parameters, resids, backcast, var_bounds, start, 1 ) t = resids.shape[0] paths = np.empty((t - start, simulations, horizon)) shocks = np.empty((t - start, simulations, horizon)) if self._estimate_lam: lam = parameters[0] else: lam = self.lam assert one_step.forecasts is not None for i in range(start, t): std_shocks = rng((simulations, horizon)) path_loc = i - start paths[path_loc, :, 0] = one_step.forecasts[path_loc] shocks[path_loc, :, 0] = ( np.sqrt(one_step.forecasts[path_loc]) * std_shocks[:, 0] ) for h in range(1, horizon): paths[path_loc, :, h] = (1 - lam) * shocks[ path_loc, :, h - 1 ] ** 2.0 + lam * paths[path_loc, :, h - 1] shocks[path_loc, :, h] = ( np.sqrt(paths[path_loc, :, h]) * std_shocks[:, h] ) return VarianceForecast(np.asarray(paths.mean(1)), paths, shocks)
[docs] class RiskMetrics2006(VolatilityProcess, metaclass=AbstractDocStringInheritor): """ RiskMetrics 2006 Variance process Parameters ---------- tau0 : {int, float}, optional Length of long cycle. Default is 1560. tau1 : {int, float}, optional Length of short cycle. Default is 4. kmax : int, optional Number of components. Default is 14. rho : float, optional Relative scale of adjacent cycles. Default is sqrt(2) Examples -------- Daily RiskMetrics 2006 process >>> from arch.univariate import RiskMetrics2006 >>> rm = RiskMetrics2006() Notes ----- The variance dynamics of the model are given as a weighted average of kmax EWMA variance processes where the smoothing parameters and weights are determined by tau0, tau1 and rho. This model has no parameters since the smoothing parameter is fixed. """ def __init__( self, tau0: float = 1560, tau1: float = 4, kmax: int = 14, rho: float = 1.4142135623730951, ) -> None: super().__init__() self.tau0: float = tau0 self.tau1: float = tau1 self.kmax: int = kmax self.rho: float = rho self._num_params = 0 if tau0 <= tau1 or tau1 <= 0: raise ValueError("tau0 must be greater than tau1 and tau1 > 0") if tau1 * rho ** (kmax - 1) > tau0: raise ValueError("tau1 * rho ** (kmax-1) smaller than tau0") if not kmax >= 1: raise ValueError("kmax must be a positive integer") if not rho > 1: raise ValueError("rho must be a positive number larger than 1") self._name = "RiskMetrics2006" self._volatility_updater = rec.RiskMetrics2006Updater( self.kmax, self._ewma_combination_weights(), self._ewma_smoothing_parameters(), ) def __str__(self) -> str: descr = self.name descr += ( f"(tau0: {self.tau0:0.1f}, tau1: {self.tau1:0.1f}, " f"kmax: {self.kmax:d}, rho: {self.rho:0.3f}" ) descr += ")" return descr def _ewma_combination_weights(self) -> Float64Array: """ Returns ------- weights : ndarray Combination weights for EWMA components """ tau0, tau1, kmax, rho = self.tau0, self.tau1, self.kmax, self.rho taus = tau1 * (rho ** np.arange(kmax)) w = 1 - np.log(taus) / np.log(tau0) w = w / w.sum() return w def _ewma_smoothing_parameters(self) -> Float64Array: tau1, kmax, rho = self.tau1, self.kmax, self.rho taus = tau1 * (rho ** np.arange(kmax)) mus = cast(Float64Array, np.exp(-1.0 / taus)) return mus
[docs] def backcast(self, resids: Float64Array) -> float | Float64Array: """ Construct values for backcasting to start the recursion Parameters ---------- resids : ndarray Vector of (approximate) residuals Returns ------- backcast : ndarray Backcast values for each EWMA component """ nobs = resids.shape[0] mus = self._ewma_smoothing_parameters() resids2 = resids**2.0 backcast = np.zeros(mus.shape[0]) for k in range(int(self.kmax)): mu = mus[k] end_point = int(max(min(np.floor(np.log(0.01) / np.log(mu)), nobs), k)) weights = mu ** np.arange(end_point) weights = weights / weights.sum() backcast[k] = weights.dot(resids2[:end_point]) return backcast
[docs] def backcast_transform( self, backcast: float | Float64Array ) -> float | Float64Array: backcast = super().backcast_transform(backcast) mus = self._ewma_smoothing_parameters() backcast_arr = np.asarray(backcast) if backcast_arr.ndim == 0: backcast_arr = cast(np.ndarray, backcast * np.ones(mus.shape[0])) if backcast_arr.shape[0] != mus.shape[0] and backcast_arr.ndim != 0: raise ValueError( "User backcast must be either a scalar or an vector containing the " "number of\ncomponent EWMAs in the model." ) return backcast_arr
[docs] def starting_values(self, resids: Float64Array) -> Float64Array: return np.empty((0,))
[docs] def parameter_names(self) -> list[str]: return []
[docs] def variance_bounds(self, resids: Float64Array, power: float = 2.0) -> Float64Array: return np.ones((resids.shape[0], 1)) * np.array( [-1.0, np.finfo(np.float64).max] )
[docs] def bounds(self, resids: Float64Array) -> list[tuple[float, float]]: return []
[docs] def constraints(self) -> tuple[Float64Array, Float64Array]: return np.empty((0, 0)), np.empty((0,))
[docs] def compute_variance( self, parameters: Float64Array, resids: Float64Array, sigma2: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, ) -> Float64Array: nobs = resids.shape[0] kmax = self.kmax w = self._ewma_combination_weights() mus = self._ewma_smoothing_parameters() sigma2_temp = np.zeros_like(sigma2) backcast = cast(Float64Array, backcast) for k in range(kmax): mu = mus[k] ewma_recursion(mu, resids, sigma2_temp, nobs, backcast[k]) if k == 0: sigma2[:] = w[k] * sigma2_temp else: sigma2 += w[k] * sigma2_temp return sigma2
[docs] def simulate( self, parameters: Sequence[int | float] | ArrayLike1D, nobs: int, rng: RNGType, burn: int = 500, initial_value: None | float | Float64Array = None, ) -> tuple[Float64Array, Float64Array]: errors = rng(nobs + burn) kmax = self.kmax w = self._ewma_combination_weights() mus = self._ewma_smoothing_parameters() if initial_value is None: initial_value = 1.0 sigma2s = np.zeros((nobs + burn, kmax)) sigma2s[0, :] = initial_value sigma2 = np.zeros(nobs + burn) data = np.zeros(nobs + burn) data[0] = np.sqrt(initial_value) sigma2[0] = w.dot(sigma2s[0]) for t in range(1, nobs + burn): sigma2s[t] = mus * sigma2s[t - 1] + (1 - mus) * data[t - 1] ** 2.0 sigma2[t] = w.dot(sigma2s[t]) data[t] = np.sqrt(sigma2[t]) * errors[t] return data[burn:], sigma2[burn:]
def _check_forecasting_method( self, method: ForecastingMethod, horizon: int ) -> None: return def _analytic_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, start: int, horizon: int, ) -> VarianceForecast: _, forecasts = self._one_step_forecast( parameters, resids, backcast, var_bounds, horizon, start_index=start ) for i in range(1, horizon): forecasts[:, i] = forecasts[:, 0] return VarianceForecast(forecasts) def _simulation_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, start: int, horizon: int, simulations: int, rng: RNGType, ) -> VarianceForecast: kmax = self.kmax w = self._ewma_combination_weights() mus = self._ewma_smoothing_parameters() backcast = cast(Float64Array, np.asarray(backcast)) t = resids.shape[0] paths = np.empty((t - start, simulations, horizon)) shocks = np.empty((t - start, simulations, horizon)) temp_paths = np.empty((kmax, simulations, horizon)) # We use the transpose here to get C-contiguous arrays component_one_step = np.empty((kmax, t + 1)) _resids = np.ascontiguousarray(resids) for k in range(kmax): mu = mus[k] ewma_recursion(mu, _resids, component_one_step[k, :], t + 1, backcast[k]) # Transpose to be (t+1, kmax) component_one_step = component_one_step.T for i in range(start, t): std_shocks = rng((simulations, horizon)) for k in range(kmax): temp_paths[k, :, 0] = component_one_step[i, k] path_loc = i - start paths[path_loc, :, 0] = w.dot(temp_paths[:, :, 0]) shocks[path_loc, :, 0] = std_shocks[:, 0] * np.sqrt(paths[path_loc, :, 0]) for j in range(1, horizon): for k in range(kmax): mu = mus[k] temp_paths[k, :, j] = ( mu * temp_paths[k, :, j - 1] + (1 - mu) * shocks[path_loc, :, j - 1] ** 2.0 ) paths[path_loc, :, j] = w.dot(temp_paths[:, :, j]) shocks[path_loc, :, j] = std_shocks[:, j] * np.sqrt( paths[path_loc, :, j] ) return VarianceForecast(np.asarray(paths.mean(1)), paths, shocks)
[docs] class EGARCH(VolatilityProcess, metaclass=AbstractDocStringInheritor): r""" EGARCH model estimation Parameters ---------- p : int Order of the symmetric innovation o : int Order of the asymmetric innovation q : int Order of the lagged (transformed) conditional variance Examples -------- >>> from arch.univariate import EGARCH Symmetric EGARCH(1,1) >>> egarch = EGARCH(p=1, q=1) Standard EGARCH process >>> egarch = EGARCH(p=1, o=1, q=1) Exponential ARCH process >>> earch = EGARCH(p=5) Notes ----- In this class of processes, the variance dynamics are .. math:: \ln\sigma_{t}^{2}=\omega +\sum_{i=1}^{p}\alpha_{i} \left(\left|e_{t-i}\right|-\sqrt{2/\pi}\right) +\sum_{j=1}^{o}\gamma_{j} e_{t-j} +\sum_{k=1}^{q}\beta_{k}\ln\sigma_{t-k}^{2} where :math:`e_{t}=\epsilon_{t}/\sigma_{t}`. """ def __init__(self, p: int = 1, o: int = 0, q: int = 1) -> None: super().__init__() self.p: int = int(p) self.o: int = int(o) self.q: int = int(q) self._num_params = 1 + p + o + q if p < 0 or o < 0 or q < 0: raise ValueError("All lags lengths must be non-negative") if p == 0 and o == 0: raise ValueError("One of p or o must be strictly positive") self._name = "EGARCH" if q > 0 else "EARCH" # Helpers for fitting variance self._arrays: tuple[Float64Array, Float64Array, Float64Array] | None = None self._volatility_updater = rec.EGARCHUpdater(self.p, self.o, self.q) def __str__(self) -> str: descr = self.name + "(" for k, v in (("p", self.p), ("o", self.o), ("q", self.q)): if v > 0: descr += k + ": " + str(v) + ", " descr = descr[:-2] + ")" return descr
[docs] def variance_bounds(self, resids: Float64Array, power: float = 2.0) -> Float64Array: return super().variance_bounds(resids, 2.0)
[docs] def bounds(self, resids: Float64Array) -> list[tuple[float, float]]: v = np.mean(resids**2.0) log_const = np.log(10000.0) lnv = np.log(v) bounds = [(lnv - log_const, lnv + log_const)] bounds.extend([(-np.inf, np.inf)] * (self.p + self.o)) bounds.extend([(0.0, float(self.q))] * self.q) return bounds
[docs] def constraints(self) -> tuple[Float64Array, Float64Array]: p, o, q = self.p, self.o, self.q k_arch = p + o + q a = np.zeros((1, k_arch + 1)) a[0, p + o + 1 :] = -1.0 b = np.zeros((1,)) b[0] = -1.0 return a, b
[docs] def compute_variance( self, parameters: Float64Array, resids: Float64Array, sigma2: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, ) -> Float64Array: p, o, q = self.p, self.o, self.q nobs = resids.shape[0] if (self._arrays is not None) and (self._arrays[0].shape[0] == nobs): lnsigma2, std_resids, abs_std_resids = self._arrays else: lnsigma2 = np.empty(nobs) abs_std_resids = np.empty(nobs) std_resids = np.empty(nobs) self._arrays = (lnsigma2, abs_std_resids, std_resids) rec.egarch_recursion( parameters, resids, sigma2, p, o, q, nobs, backcast, var_bounds, lnsigma2, std_resids, abs_std_resids, ) return sigma2
[docs] def backcast_transform( self, backcast: float | Float64Array ) -> float | Float64Array: backcast = super().backcast_transform(backcast) return float(np.log(backcast))
[docs] def backcast(self, resids: Float64Array) -> float | Float64Array: return float(np.log(super().backcast(resids)))
[docs] def simulate( self, parameters: Sequence[int | float] | ArrayLike1D, nobs: int, rng: RNGType, burn: int = 500, initial_value: None | float | Float64Array = None, ) -> tuple[Float64Array, Float64Array]: parameters = ensure1d(parameters, "parameters", False) p, o, q = self.p, self.o, self.q errors = rng(nobs + burn) if initial_value is None: if q > 0: beta_sum = np.sum(parameters[p + o + 1 :]) else: beta_sum = 0.0 if beta_sum < 1: initial_value = parameters[0] / (1.0 - beta_sum) else: warn(initial_value_warning, InitialValueWarning) initial_value = parameters[0] sigma2 = np.zeros(nobs + burn) data = np.zeros(nobs + burn) lnsigma2 = np.zeros(nobs + burn) abserrors = cast(Float64Array, np.abs(errors)) norm_const = np.sqrt(2 / np.pi) max_lag = np.max([p, o, q]) lnsigma2[:max_lag] = initial_value sigma2[:max_lag] = np.exp(initial_value) data[:max_lag] = errors[:max_lag] * np.sqrt(sigma2[:max_lag]) for t in range(max_lag, nobs + burn): loc = 0 lnsigma2[t] = parameters[loc] loc += 1 for j in range(p): lnsigma2[t] += parameters[loc] * (abserrors[t - 1 - j] - norm_const) loc += 1 for j in range(o): lnsigma2[t] += parameters[loc] * errors[t - 1 - j] loc += 1 for j in range(q): lnsigma2[t] += parameters[loc] * lnsigma2[t - 1 - j] loc += 1 sigma2 = cast(Float64Array, np.exp(lnsigma2)) data = errors * np.sqrt(sigma2) return data[burn:], sigma2[burn:]
[docs] def starting_values(self, resids: Float64Array) -> Float64Array: p, o, q = self.p, self.o, self.q alphas = [0.01, 0.05, 0.1, 0.2] gammas = [-0.1, 0.0, 0.1] betas = [0.5, 0.7, 0.9, 0.98] agbs = list(itertools.product(*[alphas, gammas, betas])) target = np.log(np.mean(resids**2)) svs = [] var_bounds = self.variance_bounds(resids) backcast = self.backcast(resids) llfs = np.zeros(len(agbs)) for i, values in enumerate(agbs): alpha, gamma, beta = values sv = (1.0 - beta) * target * np.ones(p + o + q + 1) if p > 0: sv[1 : 1 + p] = alpha / p if o > 0: sv[1 + p : 1 + p + o] = gamma / o if q > 0: sv[1 + p + o : 1 + p + o + q] = beta / q svs.append(sv) llfs[i] = self._gaussian_loglikelihood(sv, resids, backcast, var_bounds) loc = np.argmax(llfs) return svs[int(loc)]
[docs] def parameter_names(self) -> list[str]: return _common_names(self.p, self.o, self.q)
def _check_forecasting_method( self, method: ForecastingMethod, horizon: int ) -> None: if method == "analytic" and horizon > 1: raise ValueError("Analytic forecasts not available for horizon > 1") return def _analytic_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, start: int, horizon: int, ) -> VarianceForecast: _, forecasts = self._one_step_forecast( parameters, resids, backcast, var_bounds, horizon, start ) return VarianceForecast(forecasts) def _simulation_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, start: int, horizon: int, simulations: int, rng: RNGType, ) -> VarianceForecast: sigma2, forecasts = self._one_step_forecast( parameters, resids, backcast, var_bounds, horizon, start ) t = resids.shape[0] p, o, q = self.p, self.o, self.q m = np.max([p, o, q]) lnsigma2 = cast(Float64Array, np.log(sigma2)) e = resids / np.sqrt(sigma2) lnsigma2_mat = np.full((t, m), backcast) e_mat = np.zeros((t, m)) abs_e_mat = np.full((t, m), np.sqrt(2 / np.pi)) for i in range(m): lnsigma2_mat[m - i - 1 :, i] = lnsigma2[: (t - (m - 1) + i)] e_mat[m - i - 1 :, i] = e[: (t - (m - 1) + i)] abs_e_mat[m - i - 1 :, i] = np.abs(e[: (t - (m - 1) + i)]) paths = np.empty((t - start, simulations, horizon)) shocks = np.empty((t - start, simulations, horizon)) sqrt2pi = np.sqrt(2 / np.pi) _lnsigma2 = np.empty((simulations, m + horizon)) _e = np.empty((simulations, m + horizon)) _abs_e = np.empty((simulations, m + horizon)) for i in range(start, t): std_shocks = rng((simulations, horizon)) _lnsigma2[:, :m] = lnsigma2_mat[i, :] _e[:, :m] = e_mat[i, :] _e[:, m:] = std_shocks _abs_e[:, :m] = abs_e_mat[i, :] _abs_e[:, m:] = np.abs(std_shocks) for j in range(horizon): loc = 0 _lnsigma2[:, m + j] = parameters[loc] loc += 1 for k in range(p): _lnsigma2[:, m + j] += parameters[loc] * ( _abs_e[:, m + j - 1 - k] - sqrt2pi ) loc += 1 for k in range(o): _lnsigma2[:, m + j] += parameters[loc] * _e[:, m + j - 1 - k] loc += 1 for k in range(q): _lnsigma2[:, m + j] += parameters[loc] * _lnsigma2[:, m + j - 1 - k] loc += 1 loc = i - start paths[loc, :, :] = np.exp(_lnsigma2[:, m:]) shocks[loc, :, :] = np.sqrt(paths[loc, :, :]) * std_shocks return VarianceForecast(np.asarray(paths.mean(1)), paths, shocks)
[docs] class FixedVariance(VolatilityProcess, metaclass=AbstractDocStringInheritor): """ Fixed volatility process Parameters ---------- variance : {array, Series} Array containing the variances to use. Should have the same shape as the data used in the model. unit_scale : bool, optional Flag whether to enforce a unit scale. If False, a scale parameter will be estimated so that the model variance will be proportional to ``variance``. If True, the model variance is set of ``variance`` Notes ----- Allows a fixed set of variances to be used when estimating a mean model, allowing GLS estimation. """ def __init__(self, variance: Float64Array, unit_scale: bool = False) -> None: super().__init__() self._num_params = 0 if unit_scale else 1 self._unit_scale = unit_scale self._name = "Fixed Variance" self._name += " (Unit Scale)" if unit_scale else "" self._variance_series = ensure1d(variance, "variance", True) self._variance = np.asarray(variance)
[docs] def compute_variance( self, parameters: Float64Array, resids: Float64Array, sigma2: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, ) -> Float64Array: if self._stop - self._start != sigma2.shape[0]: raise ValueError("start and stop do not have the correct values.") sigma2[:] = self._variance[self._start : self._stop] if not self._unit_scale: sigma2 *= parameters[0] return sigma2
[docs] def starting_values(self, resids: Float64Array) -> Float64Array: if not self._unit_scale: _resids = resids / np.sqrt(self._variance[self._start : self._stop]) return np.array([_resids.var()]) return np.empty(0)
[docs] def simulate( self, parameters: Sequence[int | float] | ArrayLike1D, nobs: int, rng: RNGType, burn: int = 500, initial_value: None | float | Float64Array = None, ) -> tuple[Float64Array, Float64Array]: raise NotImplementedError("Fixed Variance processes do not support simulation")
[docs] def constraints(self) -> tuple[Float64Array, Float64Array]: if not self._unit_scale: return np.ones((1, 1)), np.zeros(1) else: return np.ones((0, 0)), np.zeros(0)
[docs] def backcast(self, resids: Float64Array) -> float | Float64Array: return 1.0
[docs] def bounds(self, resids: Float64Array) -> list[tuple[float, float]]: if not self._unit_scale: v = float(np.squeeze(self.starting_values(resids))) _resids = resids / np.sqrt(self._variance[self._start : self._stop]) mu = _resids.mean() return [(v / 100000.0, 10.0 * (v + mu**2.0))] return []
[docs] def parameter_names(self) -> list[str]: if not self._unit_scale: return ["scale"] return []
def _check_forecasting_method( self, method: ForecastingMethod, horizon: int ) -> None: return def _analytic_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, start: int, horizon: int, ) -> VarianceForecast: t = resids.shape[0] forecasts = np.full((t, horizon), np.nan) return VarianceForecast(forecasts) def _simulation_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, start: int, horizon: int, simulations: int, rng: RNGType, ) -> VarianceForecast: t = resids.shape[0] forecasts = np.full((t, horizon), np.nan) forecast_paths = np.empty((t, simulations, horizon)) forecast_paths.fill(np.nan) shocks = np.full((t, simulations, horizon), np.nan) return VarianceForecast(forecasts, forecast_paths, shocks)
[docs] class FIGARCH(VolatilityProcess, metaclass=AbstractDocStringInheritor): r""" FIGARCH model Parameters ---------- p : {0, 1} Order of the symmetric innovation q : {0, 1} Order of the lagged (transformed) conditional variance power : float, optional Power to use with the innovations, abs(e) ** power. Default is 2.0, which produces FIGARCH and related models. Using 1.0 produces FIAVARCH and related models. Other powers can be specified, although these should be strictly positive, and usually larger than 0.25. truncation : int, optional Truncation point to use in ARCH(:math:`\infty`) representation. Default is 1000. Examples -------- >>> from arch.univariate import FIGARCH Standard FIGARCH >>> figarch = FIGARCH() FIARCH >>> fiarch = FIGARCH(p=0) FIAVGARCH process >>> fiavarch = FIGARCH(power=1.0) Notes ----- In this class of processes, the variance dynamics are .. math:: h_t = \omega + [1-\beta L - \phi L (1-L)^d] \epsilon_t^2 + \beta h_{t-1} where ``L`` is the lag operator and ``d`` is the fractional differencing parameter. The model is estimated using the ARCH(:math:`\infty`) representation, .. math:: h_t = (1-\beta)^{-1} \omega + \sum_{i=1}^\infty \lambda_i \epsilon_{t-i}^2 The weights are constructed using .. math:: \delta_1 = d \\ \lambda_1 = d - \beta + \phi and the recursive equations .. math:: \delta_j = \frac{j - 1 - d}{j} \delta_{j-1} \\ \lambda_j = \beta \lambda_{j-1} + \delta_j - \phi \delta_{j-1}. When `power` is not 2, the ARCH(:math:`\infty`) representation is still used where :math:`\epsilon_t^2` is replaced by :math:`|\epsilon_t|^p` and ``p`` is the power. """ def __init__( self, p: int = 1, q: int = 1, power: float = 2.0, truncation: int = 1000 ) -> None: super().__init__() self.p: int = int(p) self.q: int = int(q) self.power: float = power self._num_params = 2 + p + q self._truncation = int(truncation) if p < 0 or q < 0 or p > 1 or q > 1: raise ValueError("p and q must be either 0 or 1.") if self._truncation <= 0: raise ValueError("truncation must be a positive integer") if power <= 0.0: raise ValueError( "power must be strictly positive, usually larger than 0.25" ) self._name = self._generate_name() self._volatility_updater = rec.FIGARCHUpdater(p, q, power, truncation) @property def truncation(self) -> int: """Truncation lag for the ARCH-infinity approximation""" return self._truncation def __str__(self) -> str: descr = self.name if self.power != 1.0 and self.power != 2.0: descr = descr[:-1] + ", " else: descr += "(" for k, v in (("p", self.p), ("q", self.q)): descr += k + ": " + str(v) + ", " descr = descr[:-2] + ")" return descr
[docs] def variance_bounds(self, resids: Float64Array, power: float = 2.0) -> Float64Array: return super().variance_bounds(resids, self.power)
def _generate_name(self) -> str: q, power = self.q, self.power if power == 2.0: if q == 0: return "FIARCH" else: return "FIGARCH" elif power == 1.0: if q == 0: return "FIAVARCH" else: return "FIAVGARCH" else: if q == 0: return f"Power FIARCH (power: {self.power:0.1f})" else: return f"Power FIGARCH (power: {self.power:0.1f})"
[docs] def bounds(self, resids: Float64Array) -> list[tuple[float, float]]: eps_half = np.sqrt(np.finfo(np.float64).eps) v = np.mean(abs(resids) ** self.power) bounds = [(0.0, 10.0 * float(v))] bounds.extend([(0.0, 0.5)] * self.p) # phi bounds.extend([(0.0, 1.0 - eps_half)]) # d bounds.extend([(0.0, 1.0 - eps_half)] * self.q) # beta return bounds
[docs] def constraints(self) -> tuple[Float64Array, Float64Array]: # omega > 0 <- 1 # 0 <= d <= 1 <- 2 # 0 <= phi <= (1 - d) / 2 <- 2 # 0 <= beta <= d + phi <- 2 a = np.array( [ [1, 0, 0, 0], [0, 1, 0, 0], [0, -2, -1, 0], [0, 0, 1, 0], [0, 0, -1, 0], [0, 0, 0, 1], [0, 1, 1, -1], ] ) b = np.array([0, 0, -1, 0, -1, 0, 0]) if not self.q: a = a[:-2, :-1] b = b[:-2] if not self.p: # Drop column 1 and rows 1 and 2 a = np.delete(a, (1,), axis=1) a = np.delete(a, (1, 2), axis=0) b = np.delete(b, (1, 2)) return a, b
[docs] def compute_variance( self, parameters: Float64Array, resids: Float64Array, sigma2: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, ) -> Float64Array: # fresids is abs(resids) ** power power = self.power fresids = np.abs(resids) ** power p, q, truncation = self.p, self.q, self.truncation nobs = resids.shape[0] rec.figarch_recursion( parameters, fresids, sigma2, p, q, nobs, truncation, backcast, var_bounds ) inv_power = 2.0 / power sigma2 **= inv_power return sigma2
[docs] def backcast_transform( self, backcast: float | Float64Array ) -> float | Float64Array: backcast = super().backcast_transform(backcast) return np.sqrt(backcast) ** self.power
[docs] def backcast(self, resids: Float64Array) -> float | Float64Array: power = self.power tau = min(75, resids.shape[0]) w = 0.94 ** np.arange(tau) w = w / sum(w) backcast = float(np.sum((abs(resids[:tau]) ** power) * w)) return backcast
[docs] def simulate( self, parameters: Sequence[int | float] | ArrayLike1D, nobs: int, rng: RNGType, burn: int = 500, initial_value: None | float | Float64Array = None, ) -> tuple[Float64Array, Float64Array]: parameters = ensure1d(parameters, "parameters", False) truncation = self.truncation p, q, power = self.p, self.q, self.power lam = rec.figarch_weights(parameters[1:], p, q, truncation) lam_rev = lam[::-1] errors = rng(truncation + nobs + burn) if initial_value is None: persistence = np.sum(lam) beta = parameters[-1] if q else 0.0 initial_value = parameters[0] if beta < 1: initial_value /= 1 - beta if persistence < 1: initial_value /= 1 - persistence if persistence >= 1.0 or beta >= 1.0: warn(initial_value_warning, InitialValueWarning) assert initial_value is not None sigma2 = np.empty(truncation + nobs + burn) data = np.empty(truncation + nobs + burn) fsigma = np.empty(truncation + nobs + burn) fdata = np.empty(truncation + nobs + burn) fsigma[:truncation] = initial_value sigma2[:truncation] = initial_value ** (2.0 / power) data[:truncation] = np.sqrt(sigma2[:truncation]) * errors[:truncation] fdata[:truncation] = abs(data[:truncation]) ** power omega = parameters[0] beta = parameters[-1] if q else 0 omega_tilde = omega / (1 - beta) for t in range(truncation, truncation + nobs + burn): fsigma[t] = omega_tilde + lam_rev.dot(fdata[t - truncation : t]) sigma2[t] = fsigma[t] ** (2.0 / power) data[t] = errors[t] * np.sqrt(sigma2[t]) fdata[t] = abs(data[t]) ** power return data[truncation + burn :], sigma2[truncation + burn :]
[docs] def starting_values(self, resids: Float64Array) -> Float64Array: truncation = self.truncation ds = [0.2, 0.5, 0.7] phi_ratio = [0.2, 0.5, 0.8] if self.p else [0] beta_ratio = [0.1, 0.5, 0.9] if self.q else [0] power = self.power target = np.mean(abs(resids) ** power) scale = np.mean(resids**2) / (target ** (2.0 / power)) target *= scale ** (power / 2) all_starting_vals = [] for d in ds: for pr in phi_ratio: phi = (1 - d) / 2 * pr for br in beta_ratio: beta = (d + phi) * br temp = [phi, d, beta] lam = rec.figarch_weights(np.array(temp), 1, 1, truncation) omega = (1 - beta) * target * (1 - np.sum(lam)) all_starting_vals.append((omega, phi, d, beta)) distinct_svs = set(all_starting_vals) starting_vals = np.array(list(distinct_svs)) if not self.q: starting_vals = starting_vals[:, :-1] if not self.p: starting_vals = np.c_[starting_vals[:, [0]], starting_vals[:, 2:]] var_bounds = self.variance_bounds(resids) backcast = self.backcast(resids) llfs = np.zeros(len(starting_vals)) for i, sv in enumerate(starting_vals): llfs[i] = self._gaussian_loglikelihood(sv, resids, backcast, var_bounds) loc = np.argmax(llfs) return starting_vals[int(loc)]
[docs] def parameter_names(self) -> list[str]: names = ["omega"] if self.p: names += ["phi"] names += ["d"] if self.q: names += ["beta"] return names
def _check_forecasting_method( self, method: ForecastingMethod, horizon: int ) -> None: if horizon == 1: return if method == "analytic" and self.power != 2.0: raise ValueError( "Analytic forecasts not available for horizon > 1 when power != 2" ) return def _analytic_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, start: int, horizon: int, ) -> VarianceForecast: sigma2, forecasts = self._one_step_forecast( parameters, resids, backcast, var_bounds, horizon, start ) if horizon == 1: return VarianceForecast(forecasts) truncation = self.truncation p, q = self.p, self.q lam = rec.figarch_weights(parameters[1:], p, q, truncation) lam_rev = lam[::-1] t = resids.shape[0] omega = parameters[0] beta = parameters[-1] if q else 0.0 omega_tilde = omega / (1 - beta) temp_forecasts = np.empty(truncation + horizon) resids2 = resids**2 for i in range(start, t): fcast_loc = i - start available = i + 1 - max(0, i - truncation + 1) temp_forecasts[truncation - available : truncation] = resids2[ max(0, i - truncation + 1) : i + 1 ] if available < truncation: temp_forecasts[: truncation - available] = backcast for h in range(horizon): lagged_forecasts = temp_forecasts[h : truncation + h] temp_forecasts[truncation + h] = omega_tilde + lam_rev.dot( lagged_forecasts ) forecasts[fcast_loc, :] = temp_forecasts[truncation:] return VarianceForecast(forecasts) def _simulation_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, start: int, horizon: int, simulations: int, rng: RNGType, ) -> VarianceForecast: sigma2, forecasts = self._one_step_forecast( parameters, resids, backcast, var_bounds, horizon, start ) t = resids.shape[0] paths = np.empty((t - start, simulations, horizon)) shocks = np.empty((t - start, simulations, horizon)) power = self.power truncation = self.truncation p, q = self.p, self.q lam = rec.figarch_weights(parameters[1:], p, q, truncation) lam_rev = lam[::-1] t = resids.shape[0] omega = parameters[0] beta = parameters[-1] if q else 0.0 omega_tilde = omega / (1 - beta) fpath = np.empty((simulations, truncation + horizon)) fresids = np.abs(resids) ** power for i in range(start, t): std_shocks = rng((simulations, horizon)) available = i + 1 - max(0, i - truncation + 1) fpath[:, truncation - available : truncation] = fresids[ max(0, i + 1 - truncation) : i + 1 ] if available < truncation: fpath[:, : (truncation - available)] = backcast for h in range(horizon): # 1. Forecast transformed variance lagged_forecasts = fpath[:, h : truncation + h] temp = omega_tilde + lagged_forecasts.dot(lam_rev) # 2. Transform variance sigma2 = temp ** (2.0 / power) # 3. Simulate new residual path_loc = i - start shocks[path_loc, :, h] = std_shocks[:, h] * np.sqrt(sigma2) paths[path_loc, :, h] = sigma2 forecasts[path_loc, h] = sigma2.mean() # 4. Transform new residual fpath[:, truncation + h] = np.abs(shocks[path_loc, :, h]) ** power return VarianceForecast(forecasts, paths, shocks)
[docs] class APARCH(VolatilityProcess, metaclass=AbstractDocStringInheritor): r""" Asymmetric Power ARCH (APARCH) volatility process Parameters ---------- p : int Order of the symmetric innovation. Must satisfy p>=o. o : int Order of the asymmetric innovation. Must satisfy o<=p. q : int Order of the lagged (transformed) conditional variance delta : float, optional Value to use for a fixed delta in the APARCH model. If not provided, the value of delta is jointly estimated with other model parameters. User provided delta is restricted to lie in (0.05, 4.0). common_asym : bool, optional Restrict all asymmetry terms to share the same asymmetry parameter. If False (default), then there are no restrictions on the ``o`` asymmetry parameters. Examples -------- >>> from arch.univariate import APARCH Symmetric Power ARCH(1,1) >>> aparch = APARCH(p=1, q=1) Standard APARCH process >>> aparch = APARCH(p=1, o=1, q=1) Fixed power parameters >>> aparch = APARCH(p=1, o=1, q=1, delta=1.3) Notes ----- In this class of processes, the variance dynamics are .. math:: \sigma_{t}^{\delta}=\omega +\sum_{i=1}^{p}\alpha_{i} \left(\left|\epsilon_{t-i}\right| -\gamma_{i}I_{[o\geq i]}\epsilon_{t-i}\right)^{\delta} +\sum_{k=1}^{q}\beta_{k}\sigma_{t-k}^{\delta} If ``common_asym`` is ``True``, then all of :math:`\gamma_i` are restricted to have a common value. """ def __init__( self, p: int = 1, o: int = 1, q: int = 1, delta: float | None = None, common_asym: bool = False, ) -> None: super().__init__() self.p: int = int(p) self.o: int = int(o) self.q: int = int(q) self._est_delta = delta is None self._common_asym = bool(common_asym) and self.o > 0 self._delta = float(np.nan) if not self._est_delta: try: assert delta is not None self._delta = float(delta) except (ValueError, TypeError): raise TypeError("delta must be convertible to a float.") if not 0.05 < delta < 4: raise ValueError("delta must be between 0.05 and 4") self._delta = delta if p < 1 or o < 0 or q < 0: raise ValueError("All lags lengths must be non-negative, and p >= 1") if o > p: raise ValueError("o must be <= p.") self._name = "APARCH" if o > 0 else "Power ARCH" self._sigma_delta = np.empty(0) self._parameters = np.empty(1 + self.p + self.o + self.q + 1) o = 1 if self._common_asym else o self._num_params = 1 + p + o + q + int(self._est_delta) self._repack = self._common_asym or not self._est_delta @property def delta(self) -> float: """The value of delta in the model. NaN is delta is estimated.""" return self._delta @property def common_asym(self) -> bool: """The value of delta in the model. NaN is delta is estimated.""" return self._common_asym def _repack_parameters(self, parameters: Float64Array) -> Float64Array: if not self._repack: return parameters p, o, q = self.p, self.o, self.q _parameters = self._parameters if not self._common_asym: # Must only have fixed delta _parameters[: p + o + q + 1] = parameters else: _parameters[: p + 1] = parameters[: p + 1] _parameters[p + 1 : p + o + 1] = parameters[p + 1] _parameters[p + o + 1 : p + o + q + 1] = parameters[p + 2 : p + q + 2] _parameters[-1] = parameters[-1] if self._est_delta else self.delta return _parameters
[docs] def compute_variance( self, parameters: Float64Array, resids: Float64Array, sigma2: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, ) -> Float64Array: abs_resids = np.abs(resids) if self._sigma_delta.shape[0] != resids.shape[0]: self._sigma_delta = np.empty(resids.shape[0]) sigma_delta = self._sigma_delta p, o, q = self.p, self.o, self.q nobs = resids.shape[0] _parameters = self._repack_parameters(parameters) rec.aparch_recursion( _parameters, resids, abs_resids, sigma2, sigma_delta, p, o, q, nobs, backcast, var_bounds, ) return sigma2
[docs] def bounds(self, resids: Float64Array) -> list[tuple[float, float]]: v = max(float(np.mean(abs(resids) ** 0.5)), float(np.mean(resids**2))) bounds = [(0.0, 10.0 * float(v))] bounds.extend([(0.0, 1.0)] * self.p) o = 1 if self._common_asym else self.o bounds.extend([(-0.9997, 0.9997)] * o) bounds.extend([(0.0, 1.0)] * self.q) if self._est_delta: bounds.append((0.05, 4.0)) return bounds
[docs] def starting_values(self, resids: Float64Array) -> Float64Array: p, o, q = self.p, self.o, self.q alphas = [0.03, 0.05, 0.08, 0.15] alpha_beta = [0.8, 0.9, 0.95, 0.975, 0.99] gammas = [-0.5, 0, 0.5] if self.o > 0 else [0] deltas = [0.5, 1.2, 1.8] if self._est_delta else [self._delta] abgs = list(itertools.product(*[alphas, gammas, alpha_beta, deltas])) svs = [] var_bounds = self.variance_bounds(resids) backcast = self.backcast(resids) llfs = np.zeros(len(abgs)) est_delta = int(self._est_delta) for i, values in enumerate(abgs): alpha, gamma, ab, delta = values target = np.mean(abs(resids) ** delta) scale = np.mean(resids**2) / (target ** (2.0 / delta)) target *= scale ** (delta / 2) sv = (1.0 - ab) * target * np.ones(p + o + q + 1 + est_delta) sv[1 : 1 + p] = alpha / p ab -= alpha if o > 0: sv[1 + p : 1 + p + o] = gamma if q > 0: sv[1 + p + o : 1 + p + o + q] = ab / q if est_delta: sv[-1] = delta svs.append(sv) llfs[i] = self._gaussian_loglikelihood(sv, resids, backcast, var_bounds) loc = np.argmax(llfs) sv = svs[int(loc)] if self._common_asym: sv = np.r_[sv[: p + 1 + (o > 0)], sv[p + o + 1 :]] return sv
[docs] def constraints(self) -> tuple[Float64Array, Float64Array]: p, o, q = self.p, self.o, self.q o = 1 if self._common_asym else o k_arch = p + o + q # alpha[i] > 0, p # -1 < gamma[i] < 1, 2*o # beta[i] > 0, q # sum(alpha) + sum(beta) < 1, 1 ndelta = 2 * int(self._est_delta) a = np.zeros((k_arch + o + 2 + ndelta, self._num_params)) for i in range(p + 1): a[i, i] = 1.0 for i in range(o): a[1 + p + i, 1 + p + i] = 1.0 a[1 + p + o + i, 1 + p + i] = -1.0 for i in range(q): a[1 + p + 2 * o, 1 + p + o + i] = 1.0 if self._est_delta: a[1 + p + 2 * o + q, 1 + p + o + q] = 1.0 a[1 + p + 2 * o + q + 1, 1 + p + o + q] = -1.0 a[-1, 1 : p + o + q + 1] = -1.0 a[-1, p + 1 : p + o + 1] = 0 b = np.zeros(k_arch + o + 2 + ndelta) # omega and alpha, > 0 b[p + 1 : p + o + 1] = -0.9997 # gamma > -.9997 b[p + o + 1 : p + 2 * o + 1] = -0.9997 # gamma < .9997 if self._est_delta: b[-3] = 0.05 # delta > 0.05 b[-2] = -4.0 # delta < 4 b[-1] = -1.0 # sum < 1 return a, b
[docs] def parameter_names(self) -> list[str]: names = _common_names(self.p, self.o, self.q) if self._common_asym: names = names[: self.p + 1] + ["gamma"] + names[1 + self.p + self.o :] if self._est_delta: names += ["delta"] return names
[docs] def simulate( self, parameters: Sequence[int | float] | ArrayLike1D, nobs: int, rng: RNGType, burn: int = 500, initial_value: None | float | Float64Array = None, ) -> tuple[Float64Array, Float64Array]: params = np.asarray(ensure1d(parameters, "parameters", False), dtype=float) params = self._repack_parameters(params) p, o, q = self.p, self.o, self.q errors = rng(nobs + burn) sigma2 = np.zeros(nobs + burn) sigma_delta = np.zeros(nobs + burn) data = np.zeros(nobs + burn) adata = np.zeros(nobs + burn) max_lag = np.max([p, q]) delta = params[-1] if initial_value is None: persistence = params[1 : p + 1].sum() persistence += params[1 + p + o : 1 + p + o + q].sum() if (1.0 - persistence) > 0: initial_value = params[0] / (1.0 - persistence) else: warn(initial_value_warning, InitialValueWarning) initial_value = params[0] sigma_delta[:max_lag] = initial_value sigma2[:max_lag] = initial_value ** (2.0 / delta) data[:max_lag] = np.sqrt(sigma2[:max_lag]) * errors[:max_lag] adata[:max_lag] = np.abs(data[:max_lag]) for t in range(max_lag, nobs + burn): sigma_delta[t] = params[0] for j in range(p): shock = adata[t - 1 - j] if o > j: shock -= params[1 + p + j] * data[t - 1 - j] sigma_delta[t] += params[1 + j] * shock**delta for j in range(q): sigma_delta[t] += params[1 + p + o + j] * sigma_delta[t - 1 - j] sigma2[t] = sigma_delta[t] ** (2.0 / delta) data[t] = errors[t] * np.sqrt(sigma2[t]) adata[t] = np.abs(data[t]) return data[burn:], sigma2[burn:]
def _simulate_paths( self, m: int, parameters: Float64Array, horizon: int, std_shocks: Float64Array, sigma_delta: Float64Array, shock: Float64Array, abs_shock: Float64Array, ) -> tuple[Float64Array, Float64Array, Float64Array]: if self._est_delta: delta = parameters[-1] else: delta = self._delta p, o, q = self.p, self.o, self.q omega = parameters[0] alpha = parameters[1 : p + 1] gamma = parameters[p + 1 : p + o + 1] beta = parameters[p + o + 1 : p + o + q + 1] for h in range(horizon): loc = h + m - 1 sigma_delta[:, h + m] = omega for j in range(p): _shock = abs_shock[:, loc - j] if self.o > j: _shock -= gamma[j] * shock[:, loc - j] sigma_delta[:, h + m] += alpha[j] * (_shock**delta) for j in range(q): sigma_delta[:, h + m] += beta[j] * sigma_delta[:, loc - j] shock[:, h + m] = std_shocks[:, h] * sigma_delta[:, h + m] ** (1.0 / delta) abs_shock[:, h + m] = np.abs(shock[:, h + m]) forecast_paths = sigma_delta[:, m:] ** (2.0 / delta) return np.asarray(np.mean(forecast_paths, 0)), forecast_paths, shock[:, m:] def _simulation_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, start: int, horizon: int, simulations: int, rng: RNGType, ) -> VarianceForecast: sigma2, forecasts = self._one_step_forecast( parameters, resids, backcast, var_bounds, horizon, start ) parameters = self._repack_parameters(parameters) t = resids.shape[0] paths = np.empty((t - start, simulations, horizon)) shocks = np.empty((t - start, simulations, horizon)) delta = parameters[-1] m = np.max([self.p, self.q]) sigma_delta = np.zeros((simulations, m + horizon)) shock = np.zeros((simulations, m + horizon)) abs_shock = np.zeros((simulations, m + horizon)) for i in range(start, t): std_shocks = rng((simulations, horizon)) if i - m < 0: sigma_delta[:, :m] = backcast ** (delta / 2.0) shock[:, :m] = 0 abs_shock[:, :m] = backcast ** (1.0 / 2.0) # Use actual values where available count = i + 1 sigma_delta[:, m - count : m] = sigma2[:count] ** (delta / 2.0) shock[:, m - count : m] = resids[:count] abs_shock[:, m - count : m] = np.abs(resids[:count]) else: sigma_delta[:, :m] = sigma2[i - m + 1 : i + 1] ** (delta / 2.0) shock[:, :m] = resids[i - m + 1 : i + 1] abs_shock[:, :m] = np.abs(resids[i - m + 1 : i + 1]) f, p, s = self._simulate_paths( m, parameters, horizon, std_shocks, sigma_delta, shock, abs_shock, ) loc = i - start forecasts[loc, :], paths[loc], shocks[loc] = f, p, s return VarianceForecast(forecasts, paths, shocks) def _analytic_forecast( self, parameters: Float64Array, resids: Float64Array, backcast: float | Float64Array, var_bounds: Float64Array, start: int, horizon: int, ) -> VarianceForecast: _, forecasts = self._one_step_forecast( parameters, resids, backcast, var_bounds, horizon, start ) return VarianceForecast(forecasts) def _check_forecasting_method( self, method: ForecastingMethod, horizon: int ) -> None: if horizon == 1: return if method == "analytic": raise ValueError("Analytic forecasts not available for horizon > 1") return def __str__(self) -> str: descr = self.name + "(" for k, v in (("p", self.p), ("o", self.o), ("q", self.q)): if v > 0: descr += f"{k}: {v}, " if not self._est_delta: descr += f"delta: {self.delta:0.3f}, " if self.o > 0: descr += f"Common Asym: {self._common_asym}, " descr = descr[:-2] + ")" return descr