"""
Volatility processes for ARCH model estimation. All volatility processes must
inherit from :class:`VolatilityProcess` and provide the same methods with the
same inputs.
"""
from __future__ import annotations
from abc import ABCMeta, abstractmethod
from collections.abc import Sequence
import itertools
from typing import TYPE_CHECKING, cast
from warnings import warn
import numpy as np
from numpy.random import RandomState
from scipy.special import gammaln
from arch.typing import (
ArrayLike1D,
Float64Array,
ForecastingMethod,
Int32Array,
RNGType,
)
from arch.univariate.distribution import Normal
from arch.utility.array import AbstractDocStringInheritor, ensure1d
from arch.utility.exceptions import InitialValueWarning, initial_value_warning
if TYPE_CHECKING:
from arch.univariate import recursions_python as rec
else:
try:
from arch.univariate import recursions as rec
except ImportError:
from arch.univariate import recursions_python as rec
__all__ = [
"GARCH",
"ARCH",
"HARCH",
"ConstantVariance",
"EWMAVariance",
"RiskMetrics2006",
"EGARCH",
"FIGARCH",
"FixedVariance",
"BootstrapRng",
"MIDASHyperbolic",
"VolatilityProcess",
]
def _common_names(p: int, o: int, q: int) -> list[str]:
    names = ["omega"]
    names.extend([f"alpha[{i + 1}]" for i in range(p)])
    names.extend([f"gamma[{i + 1}]" for i in range(o)])
    names.extend([f"beta[{i + 1}]" for i in range(q)])
    return names
class BootstrapRng:
"""
Simple fake RNG used to transform bootstrap-based forecasting into a standard
    simulation forecasting problem.
Parameters
----------
std_resid : ndarray
Array containing standardized residuals
start : int
Location of first forecast
random_state : RandomState, optional
NumPy RandomState instance
"""
def __init__(
self,
std_resid: Float64Array,
start: int,
random_state: RandomState | None = None,
) -> None:
if start <= 0 or start > std_resid.shape[0]:
raise ValueError("start must be > 0 and <= len(std_resid).")
self.std_resid: Float64Array = std_resid
self.start: int = start
self._index = start
if random_state is None:
self._random_state = RandomState()
elif isinstance(random_state, RandomState):
self._random_state = random_state
else:
raise TypeError("random_state must be a NumPy RandomState instance.")
@property
def random_state(self) -> RandomState:
return self._random_state
def rng(self) -> RNGType:
def _rng(size: int | tuple[int, ...]) -> Float64Array:
if self._index >= self.std_resid.shape[0]:
raise IndexError("not enough data points.")
index = self._random_state.random_sample(size)
int_index = np.floor((self._index + 1) * index)
int_index = int_index.astype(np.int64)
self._index += 1
return self.std_resid[int_index]
return _rng
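# A hedged usage sketch (not part of the original module): each call to the
# generator returned by ``BootstrapRng.rng()`` resamples, with replacement,
# from the standardized residuals observed up to an expanding index, which is
# what turns bootstrap forecasting into a standard simulation forecast. The
# seed and sizes below are illustrative.
def _demo_bootstrap_rng() -> None:
    state = RandomState(12345)
    std_resid = state.standard_normal(250)
    rng = BootstrapRng(std_resid, start=200, random_state=state).rng()
    first = rng(8)  # drawn from std_resid[:201]; the internal index advances
    second = rng(8)  # drawn from std_resid[:202]
    assert first.shape == second.shape == (8,)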
def ewma_recursion(
lam: float, resids: Float64Array, sigma2: Float64Array, nobs: int, backcast: float
) -> Float64Array:
"""
Compute variance recursion for EWMA/RiskMetrics Variance
Parameters
----------
lam : float
Smoothing parameter
resids : ndarray
Residuals to use in the recursion
sigma2 : ndarray
Conditional variances with same shape as resids
nobs : int
Length of resids
backcast : float
Value to use when initializing the recursion
"""
    # Throw-away bounds that are wide enough never to bind
var_bounds = np.ones((nobs, 2)) * np.array([-1.0, 1.7e308])
rec.garch_recursion(
np.array([0.0, 1.0 - lam, lam]),
resids**2.0,
resids,
sigma2,
1,
0,
1,
nobs,
backcast,
var_bounds,
)
return sigma2
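# The helper above reuses the GARCH(1,1) recursion with parameters
# (0, 1 - lam, lam), so it is equivalent to the textbook EWMA update
# sigma2[t] = (1 - lam) * resids[t - 1] ** 2 + lam * sigma2[t - 1] with
# sigma2[0] = backcast. A plain-NumPy reference sketch (the variance bounds,
# which never bind above, are omitted):
def _ewma_recursion_reference(
    lam: float, resids: Float64Array, backcast: float
) -> Float64Array:
    sigma2 = np.empty_like(resids)
    sigma2[0] = backcast  # the recursion starts from the backcast value
    for t in range(1, resids.shape[0]):
        sigma2[t] = (1.0 - lam) * resids[t - 1] ** 2 + lam * sigma2[t - 1]
    return sigma2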
class VarianceForecast:
_forecasts = None
_forecast_paths = None
def __init__(
self,
forecasts: Float64Array,
forecast_paths: Float64Array | None = None,
shocks: Float64Array | None = None,
) -> None:
self._forecasts = forecasts
self._forecast_paths = forecast_paths
self._shocks = shocks
@property
def forecasts(self) -> Float64Array | None:
return self._forecasts
@property
def forecast_paths(self) -> Float64Array | None:
return self._forecast_paths
@property
def shocks(self) -> Float64Array | None:
return self._shocks
class VolatilityProcess(metaclass=ABCMeta):
"""
Abstract base class for ARCH models. Allows the conditional mean model to be
specified separately from the conditional variance, even though parameters
are estimated jointly.
"""
_updatable: bool = True
def __init__(self) -> None:
self._num_params = 0
self._name = ""
self.closed_form: bool = False
self._normal = Normal()
self._min_bootstrap_obs = 100
self._start = 0
self._stop = -1
self._volatility_updater: rec.VolatilityUpdater | None = None
def __str__(self) -> str:
return self.name
def __repr__(self) -> str:
return self.__str__() + ", id: " + hex(id(self))
@property
def name(self) -> str:
"""The name of the volatility process"""
return self._name
@property
def start(self) -> int:
"""Index to use to start variance subarray selection"""
return self._start
@start.setter
def start(self, value: int) -> None:
self._start = value
@property
def stop(self) -> int:
"""Index to use to stop variance subarray selection"""
return self._stop
@stop.setter
def stop(self, value: int) -> None:
self._stop = value
@property
def num_params(self) -> int:
"""The number of parameters in the model"""
return self._num_params
@property
def updateable(self) -> bool:
"""Flag indicating that the volatility process supports update"""
return self._updatable
@property
def volatility_updater(self) -> rec.VolatilityUpdater:
"""
Get the volatility updater associated with the volatility process
Returns
-------
VolatilityUpdater
The updater class
Raises
------
NotImplementedError
If the process is not updateable
"""
if not self._updatable or self._volatility_updater is None:
raise NotImplementedError("Subclasses may optionally implement")
assert self._volatility_updater is not None
return self._volatility_updater
def update(
self,
index: int,
parameters: Float64Array,
resids: Float64Array,
sigma2: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
) -> float:
"""
Compute the variance for a single observation
Parameters
----------
index : int
The numerical index of the variance to compute
parameters : ndarray
The variance model parameters
        resids : ndarray
The residual array. Only uses ``resids[:index]`` when computing
``sigma2[index]``
sigma2 : ndarray
The array containing the variances. Only uses ``sigma2[:index]``
when computing ``sigma2[index]``. The computed value is stored
in ``sigma2[index]``.
backcast : {float, ndarray}
Value to use when initializing the recursion
var_bounds : ndarray
Array containing columns of lower and upper bounds
Returns
-------
float
The variance computed for location ``index``
"""
raise NotImplementedError("Subclasses may optionally implement")
@abstractmethod
def _check_forecasting_method(
self, method: ForecastingMethod, horizon: int
) -> None:
"""
        Verify that the requested forecasting method is valid for the specification
Parameters
----------
method : str
Forecasting method
horizon : int
Forecast horizon
Raises
------
NotImplementedError
* If method is not known or not supported
"""
def _one_step_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
horizon: int,
start_index: int,
) -> tuple[Float64Array, Float64Array]:
"""
One-step ahead forecast
Parameters
----------
parameters : ndarray
Parameters required to forecast the volatility model
resids : ndarray
Residuals to use in the recursion
backcast : float
Value to use when initializing the recursion
var_bounds : ndarray
Array containing columns of lower and upper bounds
        horizon : int
            Forecast horizon. Must be 1 or larger. Forecasts are produced
            for horizons in [1, horizon].
        start_index : int
            Index of the first in-sample observation for which to produce
            one-step forecasts
Returns
-------
        sigma2 : ndarray
            t element array containing the in-sample conditional variances
        forecasts : ndarray
            (t - start_index) by horizon array with the one-step ahead
            forecasts in the first column
"""
t = resids.shape[0]
_resids: Float64Array = np.concatenate((resids, np.array([0.0])))
_var_bounds: Float64Array = np.concatenate(
(var_bounds, np.array([[0, np.inf]]))
)
sigma2 = np.zeros(t + 1)
self.compute_variance(parameters, _resids, sigma2, backcast, _var_bounds)
forecasts = np.zeros((t - start_index, horizon))
forecasts[:, 0] = sigma2[start_index + 1 :]
sigma2 = sigma2[:-1]
return sigma2, forecasts
@abstractmethod
def _analytic_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
start: int,
horizon: int,
) -> VarianceForecast:
"""
Analytic multi-step volatility forecasts from the model
Parameters
----------
parameters : ndarray
Parameters required to forecast the volatility model
resids : ndarray
Residuals to use in the recursion
backcast : float
Value to use when initializing the recursion
var_bounds : ndarray
Array containing columns of lower and upper bounds
start : int
Index of the first observation to use as the starting point for
the forecast. Default is 0.
horizon : int
Forecast horizon. Must be 1 or larger. Forecasts are produced
for horizons in [1, horizon].
Returns
-------
forecasts : VarianceForecast
Class containing the variance forecasts, and, if using simulation
or bootstrap, the simulated paths.
"""
@abstractmethod
def _simulation_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
start: int,
horizon: int,
simulations: int,
rng: RNGType,
) -> VarianceForecast:
"""
Simulation-based volatility forecasts from the model
Parameters
----------
parameters : ndarray
Parameters required to forecast the volatility model
resids : ndarray
Residuals to use in the recursion
backcast : float
Value to use when initializing the recursion. The backcast is
assumed to be appropriately transformed so that it can be
used without modification, e.g., log of the variance backcast
in an EGARCH model.
var_bounds : ndarray
Array containing columns of lower and upper bounds
start : int
Index of the first observation to use as the starting point for
the forecast. Default is 0.
horizon : int
Forecast horizon. Must be 1 or larger. Forecasts are produced
for horizons in [1, horizon].
simulations : int
Number of simulations to run when computing the forecast using
either simulation or bootstrap.
rng : callable
Callable random number generator required if method is
            'simulation'. Must take a single shape input and return random
            samples with that shape.
Returns
-------
forecasts : VarianceForecast
Class containing the variance forecasts, and, if using simulation
or bootstrap, the simulated paths.
"""
def _bootstrap_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
start: int,
horizon: int,
simulations: int,
random_state: RandomState | None,
) -> VarianceForecast:
"""
Simulation-based volatility forecasts using model residuals
Parameters
----------
parameters : ndarray
Parameters required to forecast the volatility model
resids : ndarray
Residuals to use in the recursion
backcast : {float, ndarray}
Value to use when initializing the recursion
var_bounds : ndarray
Array containing columns of lower and upper bounds
start : int
Index of the first observation to use as the starting point for
the forecast. Default is 0.
horizon : int
Forecast horizon. Must be 1 or larger. Forecasts are produced
for horizons in [1, horizon].
simulations : int
Number of simulations to run when computing the forecast using
either simulation or bootstrap.
random_state : {RandomState, None}
NumPy RandomState instance to use in the BootstrapRng
Returns
-------
forecasts : VarianceForecast
Class containing the variance forecasts, and, if using simulation
or bootstrap, the simulated paths.
"""
sigma2 = np.empty_like(resids)
self.compute_variance(parameters, resids, sigma2, backcast, var_bounds)
std_resid = resids / np.sqrt(sigma2)
if start < self._min_bootstrap_obs:
raise ValueError(
"start must include more than {} "
"observations".format(self._min_bootstrap_obs)
)
rng = BootstrapRng(std_resid, start, random_state=random_state).rng()
return self._simulation_forecast(
parameters, resids, backcast, var_bounds, start, horizon, simulations, rng
)
def variance_bounds(self, resids: Float64Array, power: float = 2.0) -> Float64Array:
"""
Construct loose bounds for conditional variances.
These bounds are used in parameter estimation to ensure
that the log-likelihood does not produce NaN values.
Parameters
----------
resids : ndarray
Approximate residuals to use to compute the lower and upper bounds
on the conditional variance
        power : float, optional
            Power used in the model. The default, 2.0, corresponds to
            standard ARCH models that evolve in squares.
Returns
-------
var_bounds : ndarray
Array containing columns of lower and upper bounds with the same
number of elements as resids
"""
nobs = resids.shape[0]
tau = min(75, nobs)
w = 0.94 ** np.arange(tau)
w = w / sum(w)
var_bound = np.zeros(nobs)
initial_value = w.dot(resids[:tau] ** 2.0)
ewma_recursion(0.94, resids, var_bound, resids.shape[0], initial_value)
var_bounds = np.vstack((var_bound / 1e6, var_bound * 1e6)).T
var = resids.var()
min_upper_bound = 1 + (resids**2.0).max()
lower_bound, upper_bound = var / 1e8, 1e7 * (1 + (resids**2.0).max())
var_bounds[var_bounds[:, 0] < lower_bound, 0] = lower_bound
var_bounds[var_bounds[:, 1] < min_upper_bound, 1] = min_upper_bound
var_bounds[var_bounds[:, 1] > upper_bound, 1] = upper_bound
if power != 2.0:
var_bounds **= power / 2.0
return np.ascontiguousarray(var_bounds)
@abstractmethod
def starting_values(self, resids: Float64Array) -> Float64Array:
"""
Returns starting values for the ARCH model
Parameters
----------
resids : ndarray
Array of (approximate) residuals to use when computing starting
values
Returns
-------
sv : ndarray
Array of starting values
"""
def backcast(self, resids: Float64Array) -> float | Float64Array:
"""
Construct values for backcasting to start the recursion
Parameters
----------
resids : ndarray
Vector of (approximate) residuals
Returns
-------
backcast : float
Value to use in backcasting in the volatility recursion
"""
tau = min(75, resids.shape[0])
w = 0.94 ** np.arange(tau)
w = w / sum(w)
return float(np.sum((resids[:tau] ** 2.0) * w))
@abstractmethod
def bounds(self, resids: Float64Array) -> list[tuple[float, float]]:
"""
Returns bounds for parameters
Parameters
----------
resids : ndarray
Vector of (approximate) residuals
Returns
-------
bounds : list[tuple[float,float]]
List of bounds where each element is (lower, upper).
"""
@abstractmethod
def compute_variance(
self,
parameters: Float64Array,
resids: Float64Array,
sigma2: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
) -> Float64Array:
"""
Compute the variance for the ARCH model
Parameters
----------
parameters : ndarray
Model parameters
resids : ndarray
Vector of mean zero residuals
sigma2 : ndarray
Array with same size as resids to store the conditional variance
backcast : {float, ndarray}
Value to use when initializing ARCH recursion. Can be an ndarray
when the model contains multiple components.
var_bounds : ndarray
Array containing columns of lower and upper bounds
"""
@abstractmethod
def constraints(self) -> tuple[Float64Array, Float64Array]:
"""
Construct parameter constraints arrays for parameter estimation
Returns
-------
        A : ndarray
            Parameter loadings in the constraints. Shape is number of
            constraints by number of parameters
b : ndarray
Constraint values, one for each constraint
Notes
-----
Values returned are used in constructing linear inequality
constraints of the form A.dot(parameters) - b >= 0
"""
def forecast(
self,
parameters: ArrayLike1D,
resids: Float64Array,
backcast: Float64Array | float,
var_bounds: Float64Array,
start: int | None = None,
horizon: int = 1,
method: ForecastingMethod = "analytic",
simulations: int = 1000,
rng: RNGType | None = None,
random_state: RandomState | None = None,
) -> VarianceForecast:
"""
Forecast volatility from the model
Parameters
----------
parameters : {ndarray, Series}
Parameters required to forecast the volatility model
resids : ndarray
Residuals to use in the recursion
backcast : float
Value to use when initializing the recursion
var_bounds : ndarray, 2-d
Array containing columns of lower and upper bounds
        start : {None, int}
            Index of the first observation to use as the starting point for
            the forecast. The default is ``len(resids) - 1``, which produces
            a single forecast from the final observation.
horizon : int
Forecast horizon. Must be 1 or larger. Forecasts are produced
for horizons in [1, horizon].
method : {'analytic', 'simulation', 'bootstrap'}
Method to use when producing the forecast. The default is analytic.
simulations : int
Number of simulations to run when computing the forecast using
either simulation or bootstrap.
        rng : callable
            Callable random number generator required if method is
            'simulation'. Must take a single shape input and return random
            samples with that shape.
random_state : RandomState, optional
NumPy RandomState instance to use when method is 'bootstrap'
Returns
-------
forecasts : VarianceForecast
Class containing the variance forecasts, and, if using simulation
or bootstrap, the simulated paths.
Raises
------
NotImplementedError
* If method is not supported
ValueError
* If the method is not known
Notes
-----
The analytic ``method`` is not supported for all models. Attempting
to use this method when not available will raise a ValueError.
"""
parameters = np.asarray(parameters)
method_name = method.lower()
if method_name not in ("analytic", "simulation", "bootstrap"):
raise ValueError(f"{method} is not a known forecasting method")
if not isinstance(horizon, (int, np.integer)) or horizon < 1:
raise ValueError("horizon must be an integer >= 1.")
self._check_forecasting_method(cast(ForecastingMethod, method_name), horizon)
start = len(resids) - 1 if start is None else start
if method_name == "analytic":
return self._analytic_forecast(
parameters, resids, backcast, var_bounds, start, horizon
)
elif method == "simulation":
# TODO: This looks like a design flaw.It is optional above but then must
# be present. This happens because the caller of this function is
# expected to know when to provide the rng or not
assert rng is not None
return self._simulation_forecast(
parameters,
resids,
backcast,
var_bounds,
start,
horizon,
simulations,
rng,
)
else:
            if start < 10 or (horizon / start) >= 0.2:
                raise ValueError(
                    "Bootstrap forecasting requires at least 10 initial "
                    "observations and a horizon-to-start ratio below 20%."
                )
return self._bootstrap_forecast(
parameters,
resids,
backcast,
var_bounds,
start,
horizon,
simulations,
random_state,
)
@abstractmethod
def simulate(
self,
parameters: Sequence[int | float] | ArrayLike1D,
nobs: int,
rng: RNGType,
burn: int = 500,
initial_value: None | float | Float64Array = None,
) -> tuple[Float64Array, Float64Array]:
"""
Simulate data from the model
Parameters
----------
parameters : {ndarray, Series}
Parameters required to simulate the volatility model
nobs : int
Number of data points to simulate
rng : callable
Callable function that takes a single integer input and returns
a vector of random numbers
burn : int, optional
Number of additional observations to generate when initializing
the simulation
initial_value : {float, ndarray}, optional
Scalar or array of initial values to use when initializing the
simulation
Returns
-------
resids : ndarray
The simulated residuals
variance : ndarray
The simulated variance
"""
def _gaussian_loglikelihood(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
) -> float:
"""
Private implementation of a Gaussian log-likelihood for use in constructing
starting values or other quantities that do not depend on the distribution
used by the model.
"""
sigma2 = np.zeros_like(resids)
self.compute_variance(parameters, resids, sigma2, backcast, var_bounds)
return float(self._normal.loglikelihood([], resids, sigma2))
@abstractmethod
def parameter_names(self) -> list[str]:
"""
Names of model parameters
Returns
-------
        names : list[str]
            Variable names
"""
class ConstantVariance(VolatilityProcess, metaclass=AbstractDocStringInheritor):
r"""
Constant volatility process
Notes
-----
Model has the same variance in all periods
"""
def __init__(self) -> None:
super().__init__()
self._num_params = 1
self._name = "Constant Variance"
self.closed_form: bool = True
def compute_variance(
self,
parameters: Float64Array,
resids: Float64Array,
sigma2: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
) -> Float64Array:
sigma2[:] = parameters[0]
return sigma2
def starting_values(self, resids: Float64Array) -> Float64Array:
return np.array([resids.var()])
def simulate(
self,
parameters: Sequence[int | float] | ArrayLike1D,
nobs: int,
rng: RNGType,
burn: int = 500,
initial_value: None | float | Float64Array = None,
) -> tuple[Float64Array, Float64Array]:
parameters = ensure1d(parameters, "parameters", False)
errors = rng(nobs + burn)
sigma2 = np.ones(nobs + burn) * parameters[0]
data = np.sqrt(sigma2) * errors
return data[burn:], sigma2[burn:]
def constraints(self) -> tuple[Float64Array, Float64Array]:
return np.ones((1, 1)), np.zeros(1)
def backcast(self, resids: Float64Array) -> float | Float64Array:
return float(resids.var())
def bounds(self, resids: Float64Array) -> list[tuple[float, float]]:
v = float(resids.var())
return [(v / 100000.0, 10.0 * (v + float(resids.mean()) ** 2.0))]
def parameter_names(self) -> list[str]:
return ["sigma2"]
def _check_forecasting_method(
self, method: ForecastingMethod, horizon: int
) -> None:
return
def _analytic_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
start: int,
horizon: int,
) -> VarianceForecast:
t = resids.shape[0]
forecasts = np.full((t - start, horizon), np.nan)
forecasts[:, :] = parameters[0]
forecast_paths = None
return VarianceForecast(forecasts, forecast_paths)
def _simulation_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
start: int,
horizon: int,
simulations: int,
rng: RNGType,
) -> VarianceForecast:
t = resids.shape[0]
forecasts = np.empty((t - start, horizon))
forecast_paths = np.empty((t - start, simulations, horizon))
shocks = np.empty((t - start, simulations, horizon))
for i in range(t - start):
shocks[i, :, :] = np.sqrt(parameters[0]) * rng((simulations, horizon))
forecasts[:, :] = parameters[0]
forecast_paths[:, :, :] = parameters[0]
return VarianceForecast(forecasts, forecast_paths, shocks)
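# A hedged end-to-end sketch for ConstantVariance: simulate with a known
# variance and check that the starting value (the sample variance) recovers
# it. The seed, variance, and sample size are illustrative.
def _demo_constant_variance() -> None:
    state = RandomState(0)
    cv = ConstantVariance()
    resids, sigma2 = cv.simulate(np.array([1.5]), 1000, state.standard_normal)
    assert resids.shape == sigma2.shape == (1000,)
    assert np.allclose(sigma2, 1.5)
    assert abs(float(cv.starting_values(resids)[0]) - 1.5) < 0.25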
class GARCH(VolatilityProcess, metaclass=AbstractDocStringInheritor):
r"""
GARCH and related model estimation
The following models can be specified using GARCH:
* ARCH(p)
* GARCH(p,q)
* GJR-GARCH(p,o,q)
* AVARCH(p)
* AVGARCH(p,q)
* TARCH(p,o,q)
* Models with arbitrary, pre-specified powers
Parameters
----------
p : int
Order of the symmetric innovation
o : int
Order of the asymmetric innovation
q : int
Order of the lagged (transformed) conditional variance
power : float, optional
Power to use with the innovations, abs(e) ** power. Default is 2.0, which
produces ARCH and related models. Using 1.0 produces AVARCH and related
models. Other powers can be specified, although these should be strictly
positive, and usually larger than 0.25.
Examples
--------
>>> from arch.univariate import GARCH
Standard GARCH(1,1)
>>> garch = GARCH(p=1, q=1)
Asymmetric GJR-GARCH process
>>> gjr = GARCH(p=1, o=1, q=1)
Asymmetric TARCH process
>>> tarch = GARCH(p=1, o=1, q=1, power=1.0)
Notes
-----
In this class of processes, the variance dynamics are
.. math::
\sigma_{t}^{\lambda}=\omega
+ \sum_{i=1}^{p}\alpha_{i}\left|\epsilon_{t-i}\right|^{\lambda}
+\sum_{j=1}^{o}\gamma_{j}\left|\epsilon_{t-j}\right|^{\lambda}
I\left[\epsilon_{t-j}<0\right]+\sum_{k=1}^{q}\beta_{k}\sigma_{t-k}^{\lambda}
where :math:`\lambda` is the ``power``.
"""
def __init__(self, p: int = 1, o: int = 0, q: int = 1, power: float = 2.0) -> None:
super().__init__()
self.p: int = int(p)
self.o: int = int(o)
self.q: int = int(q)
self.power: float = power
self._num_params = 1 + p + o + q
if p < 0 or o < 0 or q < 0:
raise ValueError("All lags lengths must be non-negative")
if p == 0 and o == 0:
raise ValueError("One of p or o must be strictly positive")
if power <= 0.0:
raise ValueError(
"power must be strictly positive, usually larger than 0.25"
)
self._name = self._generate_name()
self._volatility_updater = rec.GARCHUpdater(self.p, self.o, self.q, self.power)
def __str__(self) -> str:
descr = self.name
if self.power != 1.0 and self.power != 2.0:
descr = descr[:-1] + ", "
else:
descr += "("
for k, v in (("p", self.p), ("o", self.o), ("q", self.q)):
if v > 0:
descr += k + ": " + str(v) + ", "
descr = descr[:-2] + ")"
return descr
def variance_bounds(self, resids: Float64Array, power: float = 2.0) -> Float64Array:
return super().variance_bounds(resids, self.power)
def _generate_name(self) -> str:
p, o, q, power = self.p, self.o, self.q, self.power # noqa: F841
if power == 2.0:
if o == 0 and q == 0:
return "ARCH"
elif o == 0:
return "GARCH"
else:
return "GJR-GARCH"
elif power == 1.0:
if o == 0 and q == 0:
return "AVARCH"
elif o == 0:
return "AVGARCH"
else:
return "TARCH/ZARCH"
else:
if o == 0 and q == 0:
return f"Power ARCH (power: {self.power:0.1f})"
elif o == 0:
return f"Power GARCH (power: {self.power:0.1f})"
else:
return f"Asym. Power GARCH (power: {self.power:0.1f})"
def bounds(self, resids: Float64Array) -> list[tuple[float, float]]:
v = float(np.mean(abs(resids) ** self.power))
bounds = [(1e-8 * v, 10.0 * float(v))]
bounds.extend([(0.0, 1.0)] * self.p)
for i in range(self.o):
if i < self.p:
bounds.append((-1.0, 2.0))
else:
bounds.append((0.0, 2.0))
bounds.extend([(0.0, 1.0)] * self.q)
return bounds
def constraints(self) -> tuple[Float64Array, Float64Array]:
p, o, q = self.p, self.o, self.q
k_arch = p + o + q
        # alpha[i] >= 0
        # alpha[i] + gamma[i] >= 0 for i <= p, otherwise gamma[i] >= 0
        # beta[i] >= 0
        # sum(alpha) + 0.5 * sum(gamma) + sum(beta) <= 1
a = np.zeros((k_arch + 2, k_arch + 1))
for i in range(k_arch + 1):
a[i, i] = 1.0
for i in range(o):
if i < p:
a[i + p + 1, i + 1] = 1.0
a[k_arch + 1, 1:] = -1.0
a[k_arch + 1, p + 1 : p + o + 1] = -0.5
b = np.zeros(k_arch + 2)
b[k_arch + 1] = -1.0
return a, b
def compute_variance(
self,
parameters: Float64Array,
resids: Float64Array,
sigma2: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
) -> Float64Array:
        # fresids is abs(resids) ** power
        # sresids is sign(resids), used to compute I(resids < 0) in the
        # recursion
power = self.power
fresids = np.abs(resids) ** power
sresids = np.sign(resids)
p, o, q = self.p, self.o, self.q
nobs = resids.shape[0]
rec.garch_recursion(
parameters, fresids, sresids, sigma2, p, o, q, nobs, backcast, var_bounds
)
inv_power = 2.0 / power
sigma2 **= inv_power
return sigma2
def backcast(self, resids: Float64Array) -> float | Float64Array:
power = self.power
tau = min(75, resids.shape[0])
w = 0.94 ** np.arange(tau)
w = w / sum(w)
backcast = np.sum((abs(resids[:tau]) ** power) * w)
return float(backcast)
def simulate(
self,
parameters: Sequence[int | float] | ArrayLike1D,
nobs: int,
rng: RNGType,
burn: int = 500,
initial_value: None | float | Float64Array = None,
) -> tuple[Float64Array, Float64Array]:
parameters = ensure1d(parameters, "parameters", False)
p, o, q, power = self.p, self.o, self.q, self.power
errors = rng(nobs + burn)
if initial_value is None:
scale = np.ones_like(parameters)
scale[p + 1 : p + o + 1] = 0.5
persistence = np.sum(parameters[1:] * scale[1:])
if (1.0 - persistence) > 0:
initial_value = parameters[0] / (1.0 - persistence)
else:
warn(initial_value_warning, InitialValueWarning)
initial_value = parameters[0]
sigma2 = np.zeros(nobs + burn)
data = np.zeros(nobs + burn)
fsigma = np.zeros(nobs + burn)
fdata = np.zeros(nobs + burn)
max_lag = np.max([p, o, q])
fsigma[:max_lag] = initial_value
sigma2[:max_lag] = initial_value ** (2.0 / power)
data[:max_lag] = np.sqrt(sigma2[:max_lag]) * errors[:max_lag]
fdata[:max_lag] = abs(data[:max_lag]) ** power
for t in range(max_lag, nobs + burn):
loc = 0
fsigma[t] = parameters[loc]
loc += 1
for j in range(p):
fsigma[t] += parameters[loc] * fdata[t - 1 - j]
loc += 1
for j in range(o):
fsigma[t] += parameters[loc] * fdata[t - 1 - j] * (data[t - 1 - j] < 0)
loc += 1
for j in range(q):
fsigma[t] += parameters[loc] * fsigma[t - 1 - j]
loc += 1
sigma2[t] = fsigma[t] ** (2.0 / power)
data[t] = errors[t] * np.sqrt(sigma2[t])
fdata[t] = abs(data[t]) ** power
return data[burn:], sigma2[burn:]
def starting_values(self, resids: Float64Array) -> Float64Array:
p, o, q = self.p, self.o, self.q
power = self.power
alphas = [0.01, 0.05, 0.1, 0.2]
gammas = alphas
abg = [0.5, 0.7, 0.9, 0.98]
abgs = list(itertools.product(*[alphas, gammas, abg]))
target = np.mean(abs(resids) ** power)
scale = np.mean(resids**2) / (target ** (2.0 / power))
target *= scale ** (power / 2)
svs = []
var_bounds = self.variance_bounds(resids)
backcast = self.backcast(resids)
llfs = np.zeros(len(abgs))
for i, values in enumerate(abgs):
alpha, gamma, agb = values
sv = (1.0 - agb) * target * np.ones(p + o + q + 1)
if p > 0:
sv[1 : 1 + p] = alpha / p
agb -= alpha
if o > 0:
sv[1 + p : 1 + p + o] = gamma / o
agb -= gamma / 2.0
if q > 0:
sv[1 + p + o : 1 + p + o + q] = agb / q
svs.append(sv)
llfs[i] = self._gaussian_loglikelihood(sv, resids, backcast, var_bounds)
loc = np.argmax(llfs)
return svs[int(loc)]
def parameter_names(self) -> list[str]:
return _common_names(self.p, self.o, self.q)
def _check_forecasting_method(
self, method: ForecastingMethod, horizon: int
) -> None:
if horizon == 1:
return
if method == "analytic" and self.power != 2.0:
raise ValueError(
"Analytic forecasts not available for horizon > 1 when power != 2"
)
return
def _analytic_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
start: int,
horizon: int,
) -> VarianceForecast:
sigma2, forecasts = self._one_step_forecast(
parameters, resids, backcast, var_bounds, horizon, start
)
if horizon == 1:
return VarianceForecast(forecasts)
t = resids.shape[0]
p, o, q = self.p, self.o, self.q
omega = parameters[0]
alpha = parameters[1 : p + 1]
gamma = parameters[p + 1 : p + o + 1]
beta = parameters[p + o + 1 :]
m = np.max([p, o, q])
_resids = np.zeros(m + horizon)
_asym_resids = np.zeros(m + horizon)
_sigma2 = np.zeros(m + horizon)
for i in range(start, t):
if i - m + 1 >= 0:
_resids[:m] = resids[i - m + 1 : i + 1]
_asym_resids[:m] = _resids[:m] * (_resids[:m] < 0)
_sigma2[:m] = sigma2[i - m + 1 : i + 1]
else: # Back-casting needed
_resids[: m - i - 1] = np.sqrt(backcast)
_resids[m - i - 1 : m] = resids[0 : i + 1]
_asym_resids = cast(np.ndarray, _resids * (_resids < 0))
_asym_resids[: m - i - 1] = np.sqrt(0.5 * backcast)
_sigma2[:m] = backcast
_sigma2[m - i - 1 : m] = sigma2[0 : i + 1]
for h in range(0, horizon):
fcast_loc = i - start
forecasts[fcast_loc, h] = omega
start_loc = h + m - 1
for j in range(p):
forecasts[fcast_loc, h] += alpha[j] * _resids[start_loc - j] ** 2
for j in range(o):
forecasts[fcast_loc, h] += (
gamma[j] * _asym_resids[start_loc - j] ** 2
)
for j in range(q):
forecasts[fcast_loc, h] += beta[j] * _sigma2[start_loc - j]
_resids[h + m] = np.sqrt(forecasts[fcast_loc, h])
_asym_resids[h + m] = np.sqrt(0.5 * forecasts[fcast_loc, h])
_sigma2[h + m] = forecasts[fcast_loc, h]
return VarianceForecast(forecasts)
def _simulate_paths(
self,
m: int,
parameters: Float64Array,
horizon: int,
std_shocks: Float64Array,
scaled_forecast_paths: Float64Array,
scaled_shock: Float64Array,
asym_scaled_shock: Float64Array,
) -> tuple[Float64Array, Float64Array, Float64Array]:
power = self.power
p, o, q = self.p, self.o, self.q
omega = parameters[0]
alpha = parameters[1 : p + 1]
gamma = parameters[p + 1 : p + o + 1]
beta = parameters[p + o + 1 :]
shock = np.empty_like(scaled_forecast_paths)
for h in range(horizon):
loc = h + m - 1
scaled_forecast_paths[:, h + m] = omega
for j in range(p):
scaled_forecast_paths[:, h + m] += alpha[j] * scaled_shock[:, loc - j]
for j in range(o):
scaled_forecast_paths[:, h + m] += (
gamma[j] * asym_scaled_shock[:, loc - j]
)
for j in range(q):
scaled_forecast_paths[:, h + m] += (
beta[j] * scaled_forecast_paths[:, loc - j]
)
shock[:, h + m] = std_shocks[:, h] * scaled_forecast_paths[:, h + m] ** (
1.0 / power
)
lt_zero = shock[:, h + m] < 0
scaled_shock[:, h + m] = np.abs(shock[:, h + m]) ** power
asym_scaled_shock[:, h + m] = scaled_shock[:, h + m] * lt_zero
forecast_paths = scaled_forecast_paths[:, m:] ** (2.0 / power)
return np.asarray(np.mean(forecast_paths, 0)), forecast_paths, shock[:, m:]
def _simulation_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
start: int,
horizon: int,
simulations: int,
rng: RNGType,
) -> VarianceForecast:
sigma2, forecasts = self._one_step_forecast(
parameters, resids, backcast, var_bounds, horizon, start
)
t = resids.shape[0]
paths = np.empty((t - start, simulations, horizon))
shocks = np.empty((t - start, simulations, horizon))
power = self.power
m = np.max([self.p, self.o, self.q])
scaled_forecast_paths = np.zeros((simulations, m + horizon))
scaled_shock = np.zeros((simulations, m + horizon))
asym_scaled_shock = np.zeros((simulations, m + horizon))
for i in range(start, t):
std_shocks = rng((simulations, horizon))
if i - m < 0:
scaled_forecast_paths[:, :m] = backcast ** (power / 2.0)
scaled_shock[:, :m] = backcast ** (power / 2.0)
asym_scaled_shock[:, :m] = (0.5 * backcast) ** (power / 2.0)
# Use actual values where available
count = i + 1
scaled_forecast_paths[:, m - count : m] = sigma2[:count] ** (
power / 2.0
)
scaled_shock[:, m - count : m] = np.abs(resids[:count]) ** power
asym = np.abs(resids[:count]) ** power * (resids[:count] < 0)
asym_scaled_shock[:, m - count : m] = asym
else:
scaled_forecast_paths[:, :m] = sigma2[i - m + 1 : i + 1] ** (
power / 2.0
)
scaled_shock[:, :m] = np.abs(resids[i - m + 1 : i + 1]) ** power
asym_scaled_shock[:, :m] = scaled_shock[:, :m] * (
resids[i - m + 1 : i + 1] < 0
)
f, p, s = self._simulate_paths(
m,
parameters,
horizon,
std_shocks,
scaled_forecast_paths,
scaled_shock,
asym_scaled_shock,
)
loc = i - start
forecasts[loc, :], paths[loc], shocks[loc] = f, p, s
return VarianceForecast(forecasts, paths, shocks)
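# A hedged usage sketch for GARCH(1,1): simulate from known parameters, fill
# the conditional variances in-place, and produce a 5-step analytic forecast
# from the final observation. Parameter values and the seed are illustrative.
def _demo_garch_forecast() -> None:
    state = RandomState(1)
    garch = GARCH(p=1, q=1)
    params = np.array([0.05, 0.1, 0.85])  # omega, alpha[1], beta[1]
    resids, _ = garch.simulate(params, 500, state.standard_normal)
    backcast = garch.backcast(resids)
    var_bounds = garch.variance_bounds(resids)
    sigma2 = np.empty_like(resids)
    garch.compute_variance(params, resids, sigma2, backcast, var_bounds)
    fcast = garch.forecast(params, resids, backcast, var_bounds, horizon=5)
    assert fcast.forecasts is not None
    assert fcast.forecasts.shape == (1, 5)  # start defaults to len(resids) - 1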
class HARCH(VolatilityProcess, metaclass=AbstractDocStringInheritor):
r"""
Heterogeneous ARCH process
Parameters
----------
lags : {list, array, int}
        List of lags to include in the model, or if scalar, includes all lags
        up to the value
Examples
--------
>>> from arch.univariate import HARCH
Lag-1 HARCH, which is identical to an ARCH(1)
>>> harch = HARCH()
More useful and realistic lag lengths
>>> harch = HARCH(lags=[1, 5, 22])
Notes
-----
In a Heterogeneous ARCH process, variance dynamics are
.. math::
\sigma_{t}^{2}=\omega + \sum_{i=1}^{m}\alpha_{l_{i}}
\left(l_{i}^{-1}\sum_{j=1}^{l_{i}}\epsilon_{t-j}^{2}\right)
In the common case where ``lags=[1,5,22]``, the model is
.. math::
\sigma_{t}^{2}=\omega+\alpha_{1}\epsilon_{t-1}^{2}
+\alpha_{5} \left(\frac{1}{5}\sum_{j=1}^{5}\epsilon_{t-j}^{2}\right)
+\alpha_{22} \left(\frac{1}{22}\sum_{j=1}^{22}\epsilon_{t-j}^{2}\right)
A HARCH process is a special case of an ARCH process where parameters in the
more general ARCH process have been restricted.
"""
def __init__(self, lags: int | Sequence[int] = 1) -> None:
super().__init__()
if not isinstance(lags, Sequence):
lag_val = int(lags)
lags = list(range(1, lag_val + 1))
lags_arr = ensure1d(lags, "lags")
self.lags: Int32Array = np.array(lags_arr, dtype=np.int32)
self._num_lags = lags_arr.shape[0]
self._num_params = self._num_lags + 1
self._name = "HARCH"
self._volatility_updater = rec.HARCHUpdater(self.lags)
def __str__(self) -> str:
descr = self.name + "(lags: "
descr += ", ".join([str(lag) for lag in self.lags])
descr += ")"
return descr
def bounds(self, resids: Float64Array) -> list[tuple[float, float]]:
lags = self.lags
k_arch = lags.shape[0]
bounds = [(0.0, 10 * float(np.mean(resids**2.0)))]
bounds.extend([(0.0, 1.0)] * k_arch)
return bounds
def constraints(self) -> tuple[Float64Array, Float64Array]:
k_arch = self._num_lags
a = np.zeros((k_arch + 2, k_arch + 1))
for i in range(k_arch + 1):
a[i, i] = 1.0
a[k_arch + 1, 1:] = -1.0
b = np.zeros(k_arch + 2)
b[k_arch + 1] = -1.0
return a, b
def compute_variance(
self,
parameters: Float64Array,
resids: Float64Array,
sigma2: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
) -> Float64Array:
lags = self.lags
nobs = resids.shape[0]
rec.harch_recursion(
parameters, resids, sigma2, lags, nobs, backcast, var_bounds
)
return sigma2
def simulate(
self,
parameters: Sequence[int | float] | ArrayLike1D,
nobs: int,
rng: RNGType,
burn: int = 500,
initial_value: None | float | Float64Array = None,
) -> tuple[Float64Array, Float64Array]:
parameters = ensure1d(parameters, "parameters", False)
lags = self.lags
errors = rng(nobs + burn)
if initial_value is None:
if (1.0 - np.sum(parameters[1:])) > 0:
initial_value = parameters[0] / (1.0 - np.sum(parameters[1:]))
else:
warn(initial_value_warning, InitialValueWarning)
initial_value = parameters[0]
sigma2 = np.empty(nobs + burn)
data = np.empty(nobs + burn)
max_lag = int(np.max(lags))
sigma2[:max_lag] = initial_value
data[:max_lag] = np.sqrt(initial_value)
for t in range(max_lag, nobs + burn):
sigma2[t] = parameters[0]
for i in range(lags.shape[0]):
param = parameters[1 + i] / lags[i]
for j in range(lags[i]):
sigma2[t] += param * data[t - 1 - j] ** 2.0
data[t] = errors[t] * np.sqrt(sigma2[t])
return data[burn:], sigma2[burn:]
def starting_values(self, resids: Float64Array) -> Float64Array:
k_arch = self._num_lags
alpha = 0.9
sv = (1.0 - alpha) * resids.var() * np.ones(k_arch + 1)
sv[1:] = alpha / k_arch
return sv
def parameter_names(self) -> list[str]:
names = ["omega"]
lags = self.lags
names.extend(["alpha[" + str(lags[i]) + "]" for i in range(self._num_lags)])
return names
def _harch_to_arch(self, params: Float64Array) -> Float64Array:
arch_params = np.zeros(1 + int(self.lags.max()))
arch_params[0] = params[0]
for param, lag in zip(params[1:], self.lags):
arch_params[1 : lag + 1] += param / lag
return arch_params
def _common_forecast_components(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
horizon: int,
) -> tuple[float, Float64Array, Float64Array]:
arch_params = self._harch_to_arch(parameters)
t = resids.shape[0]
m = int(self.lags.max())
resids2 = np.empty((t, m + horizon))
resids2[:m, :m] = backcast
sq_resids = resids**2.0
for i in range(m):
resids2[m - i - 1 :, i] = sq_resids[: (t - (m - i - 1))]
const = arch_params[0]
arch = arch_params[1:]
return const, arch, resids2
def _check_forecasting_method(
self, method: ForecastingMethod, horizon: int
) -> None:
return
def _analytic_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
start: int,
horizon: int,
) -> VarianceForecast:
const, arch, resids2 = self._common_forecast_components(
parameters, resids, backcast, horizon
)
m = int(self.lags.max())
resids2 = resids2[start:]
arch_rev = arch[::-1]
for i in range(horizon):
resids2[:, m + i] = const + resids2[:, i : (m + i)].dot(arch_rev)
return VarianceForecast(resids2[:, m:].copy())
def _simulation_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
start: int,
horizon: int,
simulations: int,
rng: RNGType,
) -> VarianceForecast:
const, arch, resids2 = self._common_forecast_components(
parameters, resids, backcast, horizon
)
t, m = resids.shape[0], int(self.lags.max())
paths = np.empty((t - start, simulations, horizon))
shocks = np.empty((t - start, simulations, horizon))
temp_resids2 = np.empty((simulations, m + horizon))
arch_rev = arch[::-1]
for i in range(start, t):
std_shocks = rng((simulations, horizon))
temp_resids2[:, :] = resids2[i : (i + 1)]
path_loc = i - start
for j in range(horizon):
paths[path_loc, :, j] = const + temp_resids2[:, j : (m + j)].dot(
arch_rev
)
shocks[path_loc, :, j] = std_shocks[:, j] * np.sqrt(
paths[path_loc, :, j]
)
temp_resids2[:, m + j] = shocks[path_loc, :, j] ** 2.0
return VarianceForecast(np.asarray(paths.mean(1)), paths, shocks)
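# HARCH is a restricted ARCH, and ``_harch_to_arch`` above makes the mapping
# explicit: each alpha[l] is spread evenly over lags 1..l. A hedged sketch
# with illustrative parameter values for lags=[1, 5]:
def _demo_harch_to_arch() -> None:
    harch = HARCH(lags=[1, 5])
    params = np.array([0.1, 0.4, 0.5])  # omega, alpha[1], alpha[5]
    arch_params = harch._harch_to_arch(params)
    # omega is unchanged; lag 1 receives 0.4 + 0.5 / 5; lags 2..5 get 0.5 / 5
    assert np.allclose(arch_params, [0.1, 0.5, 0.1, 0.1, 0.1, 0.1])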
class MIDASHyperbolic(VolatilityProcess, metaclass=AbstractDocStringInheritor):
r"""
MIDAS Hyperbolic ARCH process
Parameters
----------
m : int
Length of maximum lag to include in the model
asym : bool
Flag indicating whether to include an asymmetric term
Examples
--------
>>> from arch.univariate import MIDASHyperbolic
    22-lag MIDAS Hyperbolic process
    >>> midas = MIDASHyperbolic()
    Longer 66-period lag
    >>> midas = MIDASHyperbolic(m=66)
    Asymmetric MIDAS Hyperbolic process
    >>> midas = MIDASHyperbolic(asym=True)
Notes
-----
In a MIDAS Hyperbolic process, the variance evolves according to
.. math::
\sigma_{t}^{2}=\omega+
\sum_{i=1}^{m}\left(\alpha+\gamma I\left[\epsilon_{t-j}<0\right]\right)
\phi_{i}(\theta)\epsilon_{t-i}^{2}
where
.. math::
\phi_{i}(\theta) \propto \Gamma(i+\theta)/(\Gamma(i+1)\Gamma(\theta))
where :math:`\Gamma` is the gamma function. :math:`\{\phi_i(\theta)\}` is
normalized so that :math:`\sum \phi_i(\theta)=1`
References
----------
.. [*] Foroni, Claudia, and Massimiliano Marcellino. "A survey of
Econometric Methods for Mixed-Frequency Data". Norges Bank. (2013).
.. [*] Sheppard, Kevin. "Direct volatility modeling". Manuscript. (2018).
"""
def __init__(self, m: int = 22, asym: bool = False) -> None:
super().__init__()
self.m: int = int(m)
self._asym = bool(asym)
self._num_params = 3 + self._asym
self._name = "MIDAS Hyperbolic"
self._volatility_updater = rec.MIDASUpdater(self.m, self._asym)
def __str__(self) -> str:
descr = self.name
descr += f"(lags: {self.m}, asym: {self._asym}"
return descr
def bounds(self, resids: Float64Array) -> list[tuple[float, float]]:
bounds = [(0.0, 10 * float(np.mean(resids**2.0)))] # omega
bounds.extend([(0.0, 1.0)]) # 0 <= alpha < 1
if self._asym:
bounds.extend([(-1.0, 2.0)]) # -1 <= gamma < 2
bounds.extend([(0.0, 1.0)]) # theta
return bounds
def constraints(self) -> tuple[Float64Array, Float64Array]:
"""
Constraints
Notes
-----
Parameters are (omega, alpha, gamma, theta)
A.dot(parameters) - b >= 0
        1. omega > 0
        2. alpha > 0 or alpha + gamma > 0
        3. alpha < 1 or alpha + 0.5 * gamma < 1
        4. theta > 0
        5. theta < 1
"""
symm = not self._asym
k = 3 + self._asym
a = np.zeros((5, k))
b = np.zeros(5)
# omega
a[0, 0] = 1.0
# alpha >0 or alpha+gamma>0
# alpha<1 or alpha+0.5*gamma<1
if symm:
a[1, 1] = 1.0
a[2, 1] = -1.0
else:
a[1, 1:3] = 1.0
a[2, 1:3] = [-1, -0.5]
b[2] = -1.0
# theta
a[3, k - 1] = 1.0
a[4, k - 1] = -1.0
b[4] = -1.0
return a, b
def compute_variance(
self,
parameters: Float64Array,
resids: Float64Array,
sigma2: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
) -> Float64Array:
nobs = resids.shape[0]
weights = self._weights(parameters)
if not self._asym:
params = np.zeros(3)
params[:2] = parameters[:2]
else:
params = parameters[:3]
rec.midas_recursion(params, weights, resids, sigma2, nobs, backcast, var_bounds)
return sigma2
def simulate(
self,
parameters: Sequence[int | float] | ArrayLike1D,
nobs: int,
rng: RNGType,
burn: int = 500,
initial_value: None | float | Float64Array = None,
) -> tuple[Float64Array, Float64Array]:
parameters = np.asarray(ensure1d(parameters, "parameters", False), dtype=float)
if self._asym:
omega, alpha, gamma = parameters[:3]
else:
omega, alpha = parameters[:2]
gamma = 0
weights = self._weights(parameters)
aw = weights * alpha
gw = weights * gamma
errors = rng(nobs + burn)
if initial_value is None:
if (1.0 - alpha - 0.5 * gamma) > 0:
initial_value = parameters[0] / (1.0 - alpha - 0.5 * gamma)
else:
warn(initial_value_warning, InitialValueWarning)
initial_value = parameters[0]
m = weights.shape[0]
burn = max(burn, m)
sigma2 = np.empty(nobs + burn)
data = np.empty(nobs + burn)
sigma2[:m] = initial_value
data[:m] = np.sqrt(initial_value)
for t in range(m, nobs + burn):
sigma2[t] = omega
for i in range(m):
if t - 1 - i < m:
coef = aw[i] + 0.5 * gw[i]
else:
coef = aw[i] + gw[i] * (data[t - 1 - i] < 0)
sigma2[t] += coef * data[t - 1 - i] ** 2.0
data[t] = errors[t] * np.sqrt(sigma2[t])
return data[burn:], sigma2[burn:]
def starting_values(self, resids: Float64Array) -> Float64Array:
theta = [0.1, 0.5, 0.8, 0.9]
alpha = [0.8, 0.9, 0.95, 0.98]
var = (resids**2).mean()
var_bounds = self.variance_bounds(resids)
backcast = self.backcast(resids)
llfs = []
svs = []
for a, t in itertools.product(alpha, theta):
gamma = [0.0]
if self._asym:
gamma.extend([0.5, 0.9])
for g in gamma:
total = a + g / 2
o = (1 - min(total, 0.99)) * var
if self._asym:
sv = np.array([o, a, g, t])
else:
sv = np.array([o, a, t])
svs.append(sv)
llf = self._gaussian_loglikelihood(sv, resids, backcast, var_bounds)
llfs.append(llf)
loc = np.argmax(llfs)
return svs[int(loc)]
def parameter_names(self) -> list[str]:
names = ["omega", "alpha", "theta"]
if self._asym:
names.insert(2, "gamma")
return names
def _weights(self, params: Float64Array) -> Float64Array:
m = self.m
# Prevent 0
theta = max(params[-1], np.finfo(np.float64).eps)
j = np.arange(1.0, m + 1)
w = gammaln(theta + j) - gammaln(j + 1) - gammaln(theta)
w = np.exp(w)
return w / w.sum()
def _common_forecast_components(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
horizon: int,
    ) -> tuple[float, Float64Array, Float64Array, Float64Array, Float64Array]:
if self._asym:
omega, alpha, gamma = parameters[:3]
else:
omega, alpha = parameters[:2]
gamma = 0.0
weights = self._weights(parameters)
aw = weights * alpha
gw = weights * gamma
t = resids.shape[0]
m = self.m
resids2 = np.empty((t, m + horizon))
resids2[:m, :m] = backcast
indicator = np.empty((t, m + horizon))
indicator[:m, :m] = 0.5
sq_resids = resids**2.0
for i in range(m):
resids2[m - i - 1 :, i] = sq_resids[: (t - (m - i - 1))]
indicator[m - i - 1 :, i] = resids[: (t - (m - i - 1))] < 0
return omega, aw, gw, resids2, indicator
def _check_forecasting_method(
self, method: ForecastingMethod, horizon: int
) -> None:
return
def _analytic_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
start: int,
horizon: int,
) -> VarianceForecast:
omega, aw, gw, resids2, indicator = self._common_forecast_components(
parameters, resids, backcast, horizon
)
m = self.m
resids2 = resids2[start:].copy()
aw_rev = aw[::-1]
gw_rev = gw[::-1]
for i in range(horizon):
resids2[:, m + i] = omega + resids2[:, i : (m + i)].dot(aw_rev)
if self._asym:
resids2_ind = resids2[:, i : (m + i)] * indicator[:, i : (m + i)]
resids2[:, m + i] += resids2_ind.dot(gw_rev)
indicator[:, m + i] = 0.5
return VarianceForecast(resids2[:, m:].copy())
def _simulation_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
start: int,
horizon: int,
simulations: int,
rng: RNGType,
) -> VarianceForecast:
omega, aw, gw, resids2, indicator = self._common_forecast_components(
parameters, resids, backcast, horizon
)
t = resids.shape[0]
m = self.m
shocks = np.empty((t - start, simulations, horizon))
paths = np.empty((t - start, simulations, horizon))
temp_resids2 = np.empty((simulations, m + horizon))
temp_indicator = np.empty((simulations, m + horizon))
aw_rev = aw[::-1]
gw_rev = gw[::-1]
for i in range(start, t):
std_shocks = rng((simulations, horizon))
temp_resids2[:, :] = resids2[i : (i + 1)]
temp_indicator[:, :] = indicator[i : (i + 1)]
path_loc = i - start
for j in range(horizon):
paths[path_loc, :, j] = omega + temp_resids2[:, j : (m + j)].dot(aw_rev)
if self._asym:
temp_resids2_ind = (
temp_resids2[:, j : (m + j)] * temp_indicator[:, j : (m + j)]
)
paths[path_loc, :, j] += temp_resids2_ind.dot(gw_rev)
shocks[path_loc, :, j] = std_shocks[:, j] * np.sqrt(
paths[path_loc, :, j]
)
temp_resids2[:, m + j] = shocks[path_loc, :, j] ** 2.0
temp_indicator[:, m + j] = (shocks[path_loc, :, j] < 0).astype(
np.double
)
return VarianceForecast(np.asarray(paths.mean(1)), paths, shocks)
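# The hyperbolic weights above are phi_i(theta) proportional to
# Gamma(i + theta) / (Gamma(i + 1) * Gamma(theta)), normalized to sum to one.
# A hedged sketch checking that for theta < 1 they are positive, decay
# monotonically in the lag, and sum to one (parameter values illustrative):
def _demo_midas_weights() -> None:
    midas = MIDASHyperbolic(m=22)
    weights = midas._weights(np.array([0.1, 0.8, 0.6]))  # theta = 0.6 (last)
    assert weights.shape == (22,)
    assert np.isclose(weights.sum(), 1.0)
    assert np.all(weights > 0) and np.all(np.diff(weights) < 0)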
class ARCH(GARCH):
r"""
ARCH process
Parameters
----------
p : int
Order of the symmetric innovation
Examples
--------
    ARCH(1) process
    >>> from arch.univariate import ARCH
    >>> arch = ARCH()
    ARCH(5) process
    >>> arch = ARCH(p=5)
Notes
-----
    The variance dynamics of the estimated model are
.. math::
\sigma_t^{2}=\omega+\sum_{i=1}^{p}\alpha_{i}\epsilon_{t-i}^{2}
"""
def __init__(self, p: int = 1) -> None:
super().__init__(p, 0, 0, 2.0)
self._num_params = p + 1
def starting_values(self, resids: Float64Array) -> Float64Array:
p = self.p
alphas = np.arange(0.1, 0.95, 0.05)
svs = []
backcast = self.backcast(resids)
llfs = alphas.copy()
var_bounds = self.variance_bounds(resids)
for i, alpha in enumerate(alphas):
sv = (1.0 - alpha) * resids.var() * np.ones(p + 1)
sv[1:] = alpha / p
svs.append(sv)
llfs[i] = self._gaussian_loglikelihood(sv, resids, backcast, var_bounds)
loc = np.argmax(llfs)
return svs[int(loc)]
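# ARCH(p) is the GARCH(p, 0, 0) special case; a hedged check of the
# restriction and the implied parameter names:
def _demo_arch_is_restricted_garch() -> None:
    arch = ARCH(p=2)
    assert isinstance(arch, GARCH)
    assert (arch.p, arch.o, arch.q) == (2, 0, 0)
    assert arch.parameter_names() == ["omega", "alpha[1]", "alpha[2]"]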
class EWMAVariance(VolatilityProcess, metaclass=AbstractDocStringInheritor):
r"""
Exponentially Weighted Moving-Average (RiskMetrics) Variance process
Parameters
----------
lam : {float, None}, optional
Smoothing parameter. Default is 0.94. Set to None to estimate lam
        jointly with other model parameters.
Examples
--------
Daily RiskMetrics EWMA process
>>> from arch.univariate import EWMAVariance
>>> rm = EWMAVariance(0.94)
Notes
-----
    The variance dynamics of the model are
.. math::
\sigma_t^{2}=\lambda\sigma_{t-1}^2 + (1-\lambda)\epsilon^2_{t-1}
When lam is provided, this model has no parameters since the smoothing
parameter is treated as fixed. Set lam to ``None`` to jointly estimate this
parameter when fitting the model.
"""
def __init__(self, lam: float | None = 0.94) -> None:
super().__init__()
self.lam: float | None = lam
self._estimate_lam = lam is None
self._num_params = 1 if self._estimate_lam else 0
if lam is not None and not 0.0 < lam < 1.0:
raise ValueError("lam must be strictly between 0 and 1")
self._name = "EWMA/RiskMetrics"
self._volatility_updater = rec.EWMAUpdater(self.lam)
def __str__(self) -> str:
if self._estimate_lam:
descr = self.name + "(lam: Estimated)"
else:
assert self.lam is not None
descr = self.name + "(lam: " + f"{self.lam:0.2f}" + ")"
return descr
def starting_values(self, resids: Float64Array) -> Float64Array:
if self._estimate_lam:
return np.array([0.94])
return np.array([])
def parameter_names(self) -> list[str]:
if self._estimate_lam:
return ["lam"]
return []
def bounds(self, resids: Float64Array) -> list[tuple[float, float]]:
if self._estimate_lam:
return [(0, 1)]
return []
def compute_variance(
self,
parameters: Float64Array,
resids: Float64Array,
sigma2: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
) -> Float64Array:
lam = parameters[0] if self._estimate_lam else self.lam
return ewma_recursion(lam, resids, sigma2, resids.shape[0], float(backcast))
def constraints(self) -> tuple[Float64Array, Float64Array]:
if self._estimate_lam:
a = np.ones((1, 1))
b = np.zeros((1,))
return a, b
return np.empty((0, 0)), np.empty((0,))
def simulate(
self,
parameters: Sequence[int | float] | ArrayLike1D,
nobs: int,
rng: RNGType,
burn: int = 500,
initial_value: None | float | Float64Array = None,
) -> tuple[Float64Array, Float64Array]:
parameters = ensure1d(parameters, "parameters", False)
errors = rng(nobs + burn)
if initial_value is None:
initial_value = 1.0
sigma2 = np.zeros(nobs + burn)
data = np.zeros(nobs + burn)
sigma2[0] = initial_value
data[0] = np.sqrt(sigma2[0])
if self._estimate_lam:
lam = parameters[0]
else:
lam = self.lam
one_m_lam = 1.0 - lam
for t in range(1, nobs + burn):
sigma2[t] = lam * sigma2[t - 1] + one_m_lam * data[t - 1] ** 2.0
data[t] = np.sqrt(sigma2[t]) * errors[t]
return data[burn:], sigma2[burn:]
def _check_forecasting_method(
self, method: ForecastingMethod, horizon: int
) -> None:
return
def _analytic_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
start: int,
horizon: int,
) -> VarianceForecast:
_, forecasts = self._one_step_forecast(
parameters, resids, backcast, var_bounds, horizon, start_index=start
)
for i in range(1, horizon):
forecasts[:, i] = forecasts[:, 0]
return VarianceForecast(forecasts)
def _simulation_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
start: int,
horizon: int,
simulations: int,
rng: RNGType,
) -> VarianceForecast:
one_step = self._analytic_forecast(
parameters, resids, backcast, var_bounds, start, 1
)
t = resids.shape[0]
paths = np.empty((t - start, simulations, horizon))
shocks = np.empty((t - start, simulations, horizon))
if self._estimate_lam:
lam = parameters[0]
else:
lam = self.lam
assert one_step.forecasts is not None
for i in range(start, t):
std_shocks = rng((simulations, horizon))
path_loc = i - start
paths[path_loc, :, 0] = one_step.forecasts[path_loc]
shocks[path_loc, :, 0] = (
np.sqrt(one_step.forecasts[path_loc]) * std_shocks[:, 0]
)
for h in range(1, horizon):
paths[path_loc, :, h] = (1 - lam) * shocks[
path_loc, :, h - 1
] ** 2.0 + lam * paths[path_loc, :, h - 1]
shocks[path_loc, :, h] = (
np.sqrt(paths[path_loc, :, h]) * std_shocks[:, h]
)
return VarianceForecast(np.asarray(paths.mean(1)), paths, shocks)
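# With a fixed lam the EWMA model has no estimated parameters, and the
# analytic multi-step forecast is flat at the one-step value, as the loop in
# ``_analytic_forecast`` above shows. A hedged sketch (seed illustrative):
def _demo_ewma_flat_forecast() -> None:
    state = RandomState(2)
    ewma = EWMAVariance(0.94)
    resids = state.standard_normal(300)
    backcast = ewma.backcast(resids)
    var_bounds = ewma.variance_bounds(resids)
    fcast = ewma.forecast(np.array([]), resids, backcast, var_bounds, horizon=10)
    assert fcast.forecasts is not None
    assert np.allclose(fcast.forecasts, fcast.forecasts[0, 0])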
class RiskMetrics2006(VolatilityProcess, metaclass=AbstractDocStringInheritor):
"""
RiskMetrics 2006 Variance process
Parameters
----------
tau0 : {int, float}, optional
Length of long cycle. Default is 1560.
tau1 : {int, float}, optional
Length of short cycle. Default is 4.
kmax : int, optional
Number of components. Default is 14.
rho : float, optional
        Relative scale of adjacent cycles. Default is sqrt(2).
Examples
--------
Daily RiskMetrics 2006 process
>>> from arch.univariate import RiskMetrics2006
>>> rm = RiskMetrics2006()
Notes
-----
The variance dynamics of the model are given as a weighted average of kmax EWMA
variance processes where the smoothing parameters and weights are determined by
tau0, tau1 and rho.
    This model has no estimated parameters; the component weights and
    smoothing parameters are fixed functions of tau0, tau1, kmax and rho.
"""
def __init__(
self,
tau0: float = 1560,
tau1: float = 4,
kmax: int = 14,
rho: float = 1.4142135623730951,
) -> None:
super().__init__()
self.tau0: float = tau0
self.tau1: float = tau1
self.kmax: int = kmax
self.rho: float = rho
self._num_params = 0
if tau0 <= tau1 or tau1 <= 0:
raise ValueError("tau0 must be greater than tau1 and tau1 > 0")
if tau1 * rho ** (kmax - 1) > tau0:
raise ValueError("tau1 * rho ** (kmax-1) smaller than tau0")
if not kmax >= 1:
raise ValueError("kmax must be a positive integer")
if not rho > 1:
raise ValueError("rho must be a positive number larger than 1")
self._name = "RiskMetrics2006"
self._volatility_updater = rec.RiskMetrics2006Updater(
self.kmax,
self._ewma_combination_weights(),
self._ewma_smoothing_parameters(),
)
def __str__(self) -> str:
descr = self.name
descr += (
f"(tau0: {self.tau0:0.1f}, tau1: {self.tau1:0.1f}, "
f"kmax: {self.kmax:d}, rho: {self.rho:0.3f}"
)
descr += ")"
return descr
def _ewma_combination_weights(self) -> Float64Array:
"""
Returns
-------
weights : ndarray
Combination weights for EWMA components
"""
tau0, tau1, kmax, rho = self.tau0, self.tau1, self.kmax, self.rho
taus = tau1 * (rho ** np.arange(kmax))
w = 1 - np.log(taus) / np.log(tau0)
w = w / w.sum()
return w
def _ewma_smoothing_parameters(self) -> Float64Array:
tau1, kmax, rho = self.tau1, self.kmax, self.rho
taus = tau1 * (rho ** np.arange(kmax))
mus = cast(Float64Array, np.exp(-1.0 / taus))
return mus
def backcast(self, resids: Float64Array) -> float | Float64Array:
"""
Construct values for backcasting to start the recursion
Parameters
----------
resids : ndarray
Vector of (approximate) residuals
Returns
-------
backcast : ndarray
Backcast values for each EWMA component
"""
nobs = resids.shape[0]
mus = self._ewma_smoothing_parameters()
resids2 = resids**2.0
backcast = np.zeros(mus.shape[0])
for k in range(int(self.kmax)):
mu = mus[k]
end_point = int(max(min(np.floor(np.log(0.01) / np.log(mu)), nobs), k))
weights = mu ** np.arange(end_point)
weights = weights / weights.sum()
backcast[k] = weights.dot(resids2[:end_point])
return backcast
def starting_values(self, resids: Float64Array) -> Float64Array:
return np.empty((0,))
def parameter_names(self) -> list[str]:
return []
def variance_bounds(self, resids: Float64Array, power: float = 2.0) -> Float64Array:
return np.ones((resids.shape[0], 1)) * np.array(
[-1.0, np.finfo(np.float64).max]
)
def bounds(self, resids: Float64Array) -> list[tuple[float, float]]:
return []
def constraints(self) -> tuple[Float64Array, Float64Array]:
return np.empty((0, 0)), np.empty((0,))
def compute_variance(
self,
parameters: Float64Array,
resids: Float64Array,
sigma2: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
) -> Float64Array:
nobs = resids.shape[0]
kmax = self.kmax
w = self._ewma_combination_weights()
mus = self._ewma_smoothing_parameters()
sigma2_temp = np.zeros_like(sigma2)
backcast = cast(Float64Array, backcast)
for k in range(kmax):
mu = mus[k]
ewma_recursion(mu, resids, sigma2_temp, nobs, backcast[k])
if k == 0:
sigma2[:] = w[k] * sigma2_temp
else:
sigma2 += w[k] * sigma2_temp
return sigma2
def simulate(
self,
parameters: Sequence[int | float] | ArrayLike1D,
nobs: int,
rng: RNGType,
burn: int = 500,
initial_value: None | float | Float64Array = None,
) -> tuple[Float64Array, Float64Array]:
errors = rng(nobs + burn)
kmax = self.kmax
w = self._ewma_combination_weights()
mus = self._ewma_smoothing_parameters()
if initial_value is None:
initial_value = 1.0
sigma2s = np.zeros((nobs + burn, kmax))
sigma2s[0, :] = initial_value
sigma2 = np.zeros(nobs + burn)
data = np.zeros(nobs + burn)
data[0] = np.sqrt(initial_value)
sigma2[0] = w.dot(sigma2s[0])
for t in range(1, nobs + burn):
sigma2s[t] = mus * sigma2s[t - 1] + (1 - mus) * data[t - 1] ** 2.0
sigma2[t] = w.dot(sigma2s[t])
data[t] = np.sqrt(sigma2[t]) * errors[t]
return data[burn:], sigma2[burn:]
def _check_forecasting_method(
self, method: ForecastingMethod, horizon: int
) -> None:
return
def _analytic_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
start: int,
horizon: int,
) -> VarianceForecast:
_, forecasts = self._one_step_forecast(
parameters, resids, backcast, var_bounds, horizon, start_index=start
)
for i in range(1, horizon):
forecasts[:, i] = forecasts[:, 0]
return VarianceForecast(forecasts)
def _simulation_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
start: int,
horizon: int,
simulations: int,
rng: RNGType,
) -> VarianceForecast:
kmax = self.kmax
w = self._ewma_combination_weights()
mus = self._ewma_smoothing_parameters()
backcast = cast(Float64Array, np.asarray(backcast))
t = resids.shape[0]
paths = np.empty((t - start, simulations, horizon))
shocks = np.empty((t - start, simulations, horizon))
temp_paths = np.empty((kmax, simulations, horizon))
# We use the transpose here to get C-contiguous arrays
component_one_step = np.empty((kmax, t + 1))
_resids = np.ascontiguousarray(resids)
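        # Run the recursion for t + 1 periods so that entry t holds the
        # one-step-ahead variance implied by all t residuals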
for k in range(kmax):
mu = mus[k]
ewma_recursion(mu, _resids, component_one_step[k, :], t + 1, backcast[k])
# Transpose to be (t+1, kmax)
component_one_step = component_one_step.T
for i in range(start, t):
std_shocks = rng((simulations, horizon))
for k in range(kmax):
                # Row i + 1 is the one-step-ahead variance given data through i
                temp_paths[k, :, 0] = component_one_step[i + 1, k]
path_loc = i - start
paths[path_loc, :, 0] = w.dot(temp_paths[:, :, 0])
shocks[path_loc, :, 0] = std_shocks[:, 0] * np.sqrt(paths[path_loc, :, 0])
for j in range(1, horizon):
for k in range(kmax):
mu = mus[k]
temp_paths[k, :, j] = (
mu * temp_paths[k, :, j - 1]
+ (1 - mu) * shocks[path_loc, :, j - 1] ** 2.0
)
paths[path_loc, :, j] = w.dot(temp_paths[:, :, j])
shocks[path_loc, :, j] = std_shocks[:, j] * np.sqrt(
paths[path_loc, :, j]
)
return VarianceForecast(np.asarray(paths.mean(1)), paths, shocks)
class EGARCH(VolatilityProcess, metaclass=AbstractDocStringInheritor):
r"""
EGARCH model estimation
Parameters
----------
p : int
Order of the symmetric innovation
o : int
Order of the asymmetric innovation
q : int
Order of the lagged (transformed) conditional variance
Examples
--------
>>> from arch.univariate import EGARCH
Symmetric EGARCH(1,1)
>>> egarch = EGARCH(p=1, q=1)
Standard EGARCH process
>>> egarch = EGARCH(p=1, o=1, q=1)
Exponential ARCH process
>>> earch = EGARCH(p=5)
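    A minimal simulation sketch (assuming standard normal shocks; the
    parameter order for the EGARCH(1,1,1) instance above is omega, alpha,
    gamma, beta)
    >>> import numpy as np
    >>> rs = np.random.RandomState(0)
    >>> params = np.array([0.0, 0.1, -0.1, 0.95])
    >>> sim_data, sim_sigma2 = egarch.simulate(params, 250, rs.standard_normal)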
Notes
-----
In this class of processes, the variance dynamics are
.. math::
\ln\sigma_{t}^{2}=\omega
+\sum_{i=1}^{p}\alpha_{i}
\left(\left|e_{t-i}\right|-\sqrt{2/\pi}\right)
+\sum_{j=1}^{o}\gamma_{j} e_{t-j}
+\sum_{k=1}^{q}\beta_{k}\ln\sigma_{t-k}^{2}
where :math:`e_{t}=\epsilon_{t}/\sigma_{t}`.
"""
def __init__(self, p: int = 1, o: int = 0, q: int = 1) -> None:
super().__init__()
self.p: int = int(p)
self.o: int = int(o)
self.q: int = int(q)
self._num_params = 1 + p + o + q
if p < 0 or o < 0 or q < 0:
raise ValueError("All lags lengths must be non-negative")
if p == 0 and o == 0:
raise ValueError("One of p or o must be strictly positive")
self._name = "EGARCH" if q > 0 else "EARCH"
# Helpers for fitting variance
self._arrays: tuple[Float64Array, Float64Array, Float64Array] | None = None
self._volatility_updater = rec.EGARCHUpdater(self.p, self.o, self.q)
def __str__(self) -> str:
descr = self.name + "("
for k, v in (("p", self.p), ("o", self.o), ("q", self.q)):
if v > 0:
descr += k + ": " + str(v) + ", "
descr = descr[:-2] + ")"
return descr
def variance_bounds(self, resids: Float64Array, power: float = 2.0) -> Float64Array:
return super().variance_bounds(resids, 2.0)
def bounds(self, resids: Float64Array) -> list[tuple[float, float]]:
v = np.mean(resids**2.0)
log_const = np.log(10000.0)
lnv = np.log(v)
bounds = [(lnv - log_const, lnv + log_const)]
bounds.extend([(-np.inf, np.inf)] * (self.p + self.o))
bounds.extend([(0.0, float(self.q))] * self.q)
return bounds
def constraints(self) -> tuple[Float64Array, Float64Array]:
p, o, q = self.p, self.o, self.q
k_arch = p + o + q
a = np.zeros((1, k_arch + 1))
a[0, p + o + 1 :] = -1.0
b = np.zeros((1,))
b[0] = -1.0
return a, b
def compute_variance(
self,
parameters: Float64Array,
resids: Float64Array,
sigma2: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
) -> Float64Array:
p, o, q = self.p, self.o, self.q
nobs = resids.shape[0]
if (self._arrays is not None) and (self._arrays[0].shape[0] == nobs):
lnsigma2, std_resids, abs_std_resids = self._arrays
else:
lnsigma2 = np.empty(nobs)
abs_std_resids = np.empty(nobs)
std_resids = np.empty(nobs)
            self._arrays = (lnsigma2, std_resids, abs_std_resids)
rec.egarch_recursion(
parameters,
resids,
sigma2,
p,
o,
q,
nobs,
backcast,
var_bounds,
lnsigma2,
std_resids,
abs_std_resids,
)
return sigma2
def backcast(self, resids: Float64Array) -> float | Float64Array:
return float(np.log(super().backcast(resids)))
def simulate(
self,
parameters: Sequence[int | float] | ArrayLike1D,
nobs: int,
rng: RNGType,
burn: int = 500,
initial_value: None | float | Float64Array = None,
) -> tuple[Float64Array, Float64Array]:
parameters = ensure1d(parameters, "parameters", False)
p, o, q = self.p, self.o, self.q
errors = rng(nobs + burn)
if initial_value is None:
if q > 0:
beta_sum = np.sum(parameters[p + o + 1 :])
else:
beta_sum = 0.0
if beta_sum < 1:
initial_value = parameters[0] / (1.0 - beta_sum)
else:
warn(initial_value_warning, InitialValueWarning)
initial_value = parameters[0]
sigma2 = np.zeros(nobs + burn)
data = np.zeros(nobs + burn)
lnsigma2 = np.zeros(nobs + burn)
abserrors = cast(Float64Array, np.abs(errors))
norm_const = np.sqrt(2 / np.pi)
max_lag = np.max([p, o, q])
lnsigma2[:max_lag] = initial_value
sigma2[:max_lag] = np.exp(initial_value)
data[:max_lag] = errors[:max_lag] * np.sqrt(sigma2[:max_lag])
for t in range(max_lag, nobs + burn):
loc = 0
lnsigma2[t] = parameters[loc]
loc += 1
for j in range(p):
lnsigma2[t] += parameters[loc] * (abserrors[t - 1 - j] - norm_const)
loc += 1
for j in range(o):
lnsigma2[t] += parameters[loc] * errors[t - 1 - j]
loc += 1
for j in range(q):
lnsigma2[t] += parameters[loc] * lnsigma2[t - 1 - j]
loc += 1
sigma2 = cast(Float64Array, np.exp(lnsigma2))
data = errors * np.sqrt(sigma2)
return data[burn:], sigma2[burn:]
def starting_values(self, resids: Float64Array) -> Float64Array:
p, o, q = self.p, self.o, self.q
alphas = [0.01, 0.05, 0.1, 0.2]
gammas = [-0.1, 0.0, 0.1]
betas = [0.5, 0.7, 0.9, 0.98]
agbs = list(itertools.product(*[alphas, gammas, betas]))
target = np.log(np.mean(resids**2))
svs = []
var_bounds = self.variance_bounds(resids)
backcast = self.backcast(resids)
llfs = np.zeros(len(agbs))
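        # Grid search over the candidate (alpha, gamma, beta) combinations,
        # keeping the set with the highest Gaussian log-likelihood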
for i, values in enumerate(agbs):
alpha, gamma, beta = values
sv = (1.0 - beta) * target * np.ones(p + o + q + 1)
if p > 0:
sv[1 : 1 + p] = alpha / p
if o > 0:
sv[1 + p : 1 + p + o] = gamma / o
if q > 0:
sv[1 + p + o : 1 + p + o + q] = beta / q
svs.append(sv)
llfs[i] = self._gaussian_loglikelihood(sv, resids, backcast, var_bounds)
loc = np.argmax(llfs)
return svs[int(loc)]
def parameter_names(self) -> list[str]:
return _common_names(self.p, self.o, self.q)
def _check_forecasting_method(
self, method: ForecastingMethod, horizon: int
) -> None:
if method == "analytic" and horizon > 1:
raise ValueError("Analytic forecasts not available for horizon > 1")
return
def _analytic_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
start: int,
horizon: int,
) -> VarianceForecast:
_, forecasts = self._one_step_forecast(
parameters, resids, backcast, var_bounds, horizon, start
)
return VarianceForecast(forecasts)
def _simulation_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
start: int,
horizon: int,
simulations: int,
rng: RNGType,
) -> VarianceForecast:
sigma2, forecasts = self._one_step_forecast(
parameters, resids, backcast, var_bounds, horizon, start
)
t = resids.shape[0]
p, o, q = self.p, self.o, self.q
m = np.max([p, o, q])
lnsigma2 = cast(Float64Array, np.log(sigma2))
e = resids / np.sqrt(sigma2)
lnsigma2_mat = np.full((t, m), backcast)
e_mat = np.zeros((t, m))
abs_e_mat = np.full((t, m), np.sqrt(2 / np.pi))
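        # Build t-by-m matrices holding the m most recent lags of ln(sigma2),
        # the standardized residuals and their absolute values for each origin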
for i in range(m):
lnsigma2_mat[m - i - 1 :, i] = lnsigma2[: (t - (m - 1) + i)]
e_mat[m - i - 1 :, i] = e[: (t - (m - 1) + i)]
abs_e_mat[m - i - 1 :, i] = np.abs(e[: (t - (m - 1) + i)])
paths = np.empty((t - start, simulations, horizon))
shocks = np.empty((t - start, simulations, horizon))
sqrt2pi = np.sqrt(2 / np.pi)
_lnsigma2 = np.empty((simulations, m + horizon))
_e = np.empty((simulations, m + horizon))
_abs_e = np.empty((simulations, m + horizon))
for i in range(start, t):
std_shocks = rng((simulations, horizon))
_lnsigma2[:, :m] = lnsigma2_mat[i, :]
_e[:, :m] = e_mat[i, :]
_e[:, m:] = std_shocks
_abs_e[:, :m] = abs_e_mat[i, :]
_abs_e[:, m:] = np.abs(std_shocks)
for j in range(horizon):
loc = 0
_lnsigma2[:, m + j] = parameters[loc]
loc += 1
for k in range(p):
_lnsigma2[:, m + j] += parameters[loc] * (
_abs_e[:, m + j - 1 - k] - sqrt2pi
)
loc += 1
for k in range(o):
_lnsigma2[:, m + j] += parameters[loc] * _e[:, m + j - 1 - k]
loc += 1
for k in range(q):
_lnsigma2[:, m + j] += parameters[loc] * _lnsigma2[:, m + j - 1 - k]
loc += 1
loc = i - start
paths[loc, :, :] = np.exp(_lnsigma2[:, m:])
shocks[loc, :, :] = np.sqrt(paths[loc, :, :]) * std_shocks
return VarianceForecast(np.asarray(paths.mean(1)), paths, shocks)
class FixedVariance(VolatilityProcess, metaclass=AbstractDocStringInheritor):
"""
Fixed volatility process
Parameters
----------
variance : {array, Series}
Array containing the variances to use. Should have the same shape as the
data used in the model.
unit_scale : bool, optional
Flag whether to enforce a unit scale. If False, a scale parameter will be
estimated so that the model variance will be proportional to ``variance``.
        If True, the model variance is set to ``variance``.
Notes
-----
    Allows a fixed set of variances to be used when estimating a mean model,
    enabling GLS estimation.
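    A minimal usage sketch (assuming ``returns`` is a Series of returns and
    ``variance`` is a pre-computed conditional variance of matching shape)
    >>> from arch.univariate import ConstantMean, FixedVariance  # doctest: +SKIP
    >>> vol = FixedVariance(variance)  # doctest: +SKIP
    >>> res = ConstantMean(returns, volatility=vol).fit()  # doctest: +SKIP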
"""
def __init__(self, variance: Float64Array, unit_scale: bool = False) -> None:
super().__init__()
self._num_params = 0 if unit_scale else 1
self._unit_scale = unit_scale
self._name = "Fixed Variance"
self._name += " (Unit Scale)" if unit_scale else ""
self._variance_series = ensure1d(variance, "variance", True)
self._variance = np.asarray(variance)
def compute_variance(
self,
parameters: Float64Array,
resids: Float64Array,
sigma2: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
) -> Float64Array:
if self._stop - self._start != sigma2.shape[0]:
raise ValueError("start and stop do not have the correct values.")
sigma2[:] = self._variance[self._start : self._stop]
if not self._unit_scale:
sigma2 *= parameters[0]
return sigma2
def starting_values(self, resids: Float64Array) -> Float64Array:
if not self._unit_scale:
_resids = resids / np.sqrt(self._variance[self._start : self._stop])
return np.array([_resids.var()])
return np.empty(0)
def simulate(
self,
parameters: Sequence[int | float] | ArrayLike1D,
nobs: int,
rng: RNGType,
burn: int = 500,
initial_value: None | float | Float64Array = None,
) -> tuple[Float64Array, Float64Array]:
raise NotImplementedError("Fixed Variance processes do not support simulation")
def constraints(self) -> tuple[Float64Array, Float64Array]:
if not self._unit_scale:
return np.ones((1, 1)), np.zeros(1)
else:
return np.ones((0, 0)), np.zeros(0)
def backcast(self, resids: Float64Array) -> float | Float64Array:
return 1.0
def bounds(self, resids: Float64Array) -> list[tuple[float, float]]:
if not self._unit_scale:
v = float(np.squeeze(self.starting_values(resids)))
_resids = resids / np.sqrt(self._variance[self._start : self._stop])
mu = _resids.mean()
return [(v / 100000.0, 10.0 * (v + mu**2.0))]
return []
def parameter_names(self) -> list[str]:
if not self._unit_scale:
return ["scale"]
return []
def _check_forecasting_method(
self, method: ForecastingMethod, horizon: int
) -> None:
return
def _analytic_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
start: int,
horizon: int,
) -> VarianceForecast:
t = resids.shape[0]
forecasts = np.full((t, horizon), np.nan)
return VarianceForecast(forecasts)
def _simulation_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
start: int,
horizon: int,
simulations: int,
rng: RNGType,
) -> VarianceForecast:
t = resids.shape[0]
forecasts = np.full((t, horizon), np.nan)
        forecast_paths = np.full((t, simulations, horizon), np.nan)
shocks = np.full((t, simulations, horizon), np.nan)
return VarianceForecast(forecasts, forecast_paths, shocks)
class FIGARCH(VolatilityProcess, metaclass=AbstractDocStringInheritor):
r"""
FIGARCH model
Parameters
----------
p : {0, 1}
Order of the symmetric innovation
q : {0, 1}
Order of the lagged (transformed) conditional variance
power : float, optional
Power to use with the innovations, abs(e) ** power. Default is 2.0,
which produces FIGARCH and related models. Using 1.0 produces
FIAVARCH and related models. Other powers can be specified, although
these should be strictly positive, and usually larger than 0.25.
truncation : int, optional
Truncation point to use in ARCH(:math:`\infty`) representation.
Default is 1000.
Examples
--------
>>> from arch.univariate import FIGARCH
Standard FIGARCH
>>> figarch = FIGARCH()
FIARCH
>>> fiarch = FIGARCH(p=0)
FIAVGARCH process
>>> fiavarch = FIGARCH(power=1.0)
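    A minimal simulation sketch (assuming standard normal shocks; the
    parameter order is omega, phi, d, beta, with values chosen to satisfy
    0 <= phi <= (1 - d)/2 and 0 <= beta <= d + phi)
    >>> import numpy as np
    >>> rs = np.random.RandomState(0)
    >>> params = np.array([0.1, 0.2, 0.4, 0.5])
    >>> sim_data, sim_sigma2 = figarch.simulate(params, 250, rs.standard_normal)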
Notes
-----
In this class of processes, the variance dynamics are
.. math::
h_t = \omega + [1-\beta L - \phi L (1-L)^d] \epsilon_t^2 + \beta h_{t-1}
where ``L`` is the lag operator and ``d`` is the fractional differencing
parameter. The model is estimated using the ARCH(:math:`\infty`)
representation,
.. math::
h_t = (1-\beta)^{-1} \omega + \sum_{i=1}^\infty \lambda_i \epsilon_{t-i}^2
The weights are constructed using
.. math::
\delta_1 = d \\
\lambda_1 = d - \beta + \phi
and the recursive equations
.. math::
\delta_j = \frac{j - 1 - d}{j} \delta_{j-1} \\
\lambda_j = \beta \lambda_{j-1} + \delta_j - \phi \delta_{j-1}.
When `power` is not 2, the ARCH(:math:`\infty`) representation is still used
where :math:`\epsilon_t^2` is replaced by :math:`|\epsilon_t|^p` and
``p`` is the power.
"""
def __init__(
self, p: int = 1, q: int = 1, power: float = 2.0, truncation: int = 1000
) -> None:
super().__init__()
self.p: int = int(p)
self.q: int = int(q)
self.power: float = power
self._num_params = 2 + p + q
self._truncation = int(truncation)
if p < 0 or q < 0 or p > 1 or q > 1:
raise ValueError("p and q must be either 0 or 1.")
if self._truncation <= 0:
raise ValueError("truncation must be a positive integer")
if power <= 0.0:
raise ValueError(
"power must be strictly positive, usually larger than 0.25"
)
self._name = self._generate_name()
self._volatility_updater = rec.FIGARCHUpdater(p, q, power, truncation)
@property
def truncation(self) -> int:
"""Truncation lag for the ARCH-infinity approximation"""
return self._truncation
def __str__(self) -> str:
descr = self.name
if self.power != 1.0 and self.power != 2.0:
descr = descr[:-1] + ", "
else:
descr += "("
for k, v in (("p", self.p), ("q", self.q)):
descr += k + ": " + str(v) + ", "
descr = descr[:-2] + ")"
return descr
def variance_bounds(self, resids: Float64Array, power: float = 2.0) -> Float64Array:
return super().variance_bounds(resids, self.power)
def _generate_name(self) -> str:
q, power = self.q, self.power
if power == 2.0:
if q == 0:
return "FIARCH"
else:
return "FIGARCH"
elif power == 1.0:
if q == 0:
return "FIAVARCH"
else:
return "FIAVGARCH"
else:
if q == 0:
return f"Power FIARCH (power: {self.power:0.1f})"
else:
return f"Power FIGARCH (power: {self.power:0.1f})"
def bounds(self, resids: Float64Array) -> list[tuple[float, float]]:
eps_half = np.sqrt(np.finfo(np.float64).eps)
v = np.mean(abs(resids) ** self.power)
bounds = [(0.0, 10.0 * float(v))]
bounds.extend([(0.0, 0.5)] * self.p) # phi
bounds.extend([(0.0, 1.0 - eps_half)]) # d
bounds.extend([(0.0, 1.0 - eps_half)] * self.q) # beta
return bounds
def constraints(self) -> tuple[Float64Array, Float64Array]:
        # omega > 0                  (1 constraint)
        # 0 <= d <= 1                (2 constraints)
        # 0 <= phi <= (1 - d) / 2    (2 constraints)
        # 0 <= beta <= d + phi       (2 constraints)
a = np.array(
[
[1, 0, 0, 0],
[0, 1, 0, 0],
[0, -2, -1, 0],
[0, 0, 1, 0],
[0, 0, -1, 0],
[0, 0, 0, 1],
[0, 1, 1, -1],
]
)
b = np.array([0, 0, -1, 0, -1, 0, 0])
if not self.q:
a = a[:-2, :-1]
b = b[:-2]
if not self.p:
# Drop column 1 and rows 1 and 2
a = np.delete(a, (1,), axis=1)
a = np.delete(a, (1, 2), axis=0)
b = np.delete(b, (1, 2))
return a, b
def compute_variance(
self,
parameters: Float64Array,
resids: Float64Array,
sigma2: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
) -> Float64Array:
# fresids is abs(resids) ** power
power = self.power
fresids = np.abs(resids) ** power
p, q, truncation = self.p, self.q, self.truncation
nobs = resids.shape[0]
rec.figarch_recursion(
parameters, fresids, sigma2, p, q, nobs, truncation, backcast, var_bounds
)
inv_power = 2.0 / power
sigma2 **= inv_power
return sigma2
def backcast(self, resids: Float64Array) -> float | Float64Array:
power = self.power
tau = min(75, resids.shape[0])
w = 0.94 ** np.arange(tau)
w = w / sum(w)
backcast = float(np.sum((abs(resids[:tau]) ** power) * w))
return backcast
def simulate(
self,
parameters: Sequence[int | float] | ArrayLike1D,
nobs: int,
rng: RNGType,
burn: int = 500,
initial_value: None | float | Float64Array = None,
) -> tuple[Float64Array, Float64Array]:
parameters = ensure1d(parameters, "parameters", False)
truncation = self.truncation
p, q, power = self.p, self.q, self.power
lam = rec.figarch_weights(parameters[1:], p, q, truncation)
lam_rev = lam[::-1]
errors = rng(truncation + nobs + burn)
if initial_value is None:
persistence = np.sum(lam)
beta = parameters[-1] if q else 0.0
initial_value = parameters[0]
if beta < 1:
initial_value /= 1 - beta
if persistence < 1:
initial_value /= 1 - persistence
if persistence >= 1.0 or beta >= 1.0:
warn(initial_value_warning, InitialValueWarning)
assert initial_value is not None
sigma2 = np.empty(truncation + nobs + burn)
data = np.empty(truncation + nobs + burn)
fsigma = np.empty(truncation + nobs + burn)
fdata = np.empty(truncation + nobs + burn)
fsigma[:truncation] = initial_value
sigma2[:truncation] = initial_value ** (2.0 / power)
data[:truncation] = np.sqrt(sigma2[:truncation]) * errors[:truncation]
fdata[:truncation] = abs(data[:truncation]) ** power
omega = parameters[0]
beta = parameters[-1] if q else 0
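        # omega_tilde is the intercept of the truncated ARCH(inf)
        # representation, omega / (1 - beta)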
omega_tilde = omega / (1 - beta)
for t in range(truncation, truncation + nobs + burn):
fsigma[t] = omega_tilde + lam_rev.dot(fdata[t - truncation : t])
sigma2[t] = fsigma[t] ** (2.0 / power)
data[t] = errors[t] * np.sqrt(sigma2[t])
fdata[t] = abs(data[t]) ** power
return data[truncation + burn :], sigma2[truncation + burn :]
def starting_values(self, resids: Float64Array) -> Float64Array:
truncation = self.truncation
ds = [0.2, 0.5, 0.7]
phi_ratio = [0.2, 0.5, 0.8] if self.p else [0]
beta_ratio = [0.1, 0.5, 0.9] if self.q else [0]
power = self.power
target = np.mean(abs(resids) ** power)
scale = np.mean(resids**2) / (target ** (2.0 / power))
target *= scale ** (power / 2)
all_starting_vals = []
for d in ds:
for pr in phi_ratio:
phi = (1 - d) / 2 * pr
for br in beta_ratio:
beta = (d + phi) * br
temp = [phi, d, beta]
lam = rec.figarch_weights(np.array(temp), 1, 1, truncation)
omega = (1 - beta) * target * (1 - np.sum(lam))
all_starting_vals.append((omega, phi, d, beta))
distinct_svs = set(all_starting_vals)
starting_vals = np.array(list(distinct_svs))
if not self.q:
starting_vals = starting_vals[:, :-1]
if not self.p:
starting_vals = np.c_[starting_vals[:, [0]], starting_vals[:, 2:]]
var_bounds = self.variance_bounds(resids)
backcast = self.backcast(resids)
llfs = np.zeros(len(starting_vals))
for i, sv in enumerate(starting_vals):
llfs[i] = self._gaussian_loglikelihood(sv, resids, backcast, var_bounds)
loc = np.argmax(llfs)
return starting_vals[int(loc)]
def parameter_names(self) -> list[str]:
names = ["omega"]
if self.p:
names += ["phi"]
names += ["d"]
if self.q:
names += ["beta"]
return names
def _check_forecasting_method(
self, method: ForecastingMethod, horizon: int
) -> None:
if horizon == 1:
return
if method == "analytic" and self.power != 2.0:
raise ValueError(
"Analytic forecasts not available for horizon > 1 when power != 2"
)
return
def _analytic_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
start: int,
horizon: int,
) -> VarianceForecast:
sigma2, forecasts = self._one_step_forecast(
parameters, resids, backcast, var_bounds, horizon, start
)
if horizon == 1:
return VarianceForecast(forecasts)
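        # Multi-step analytic forecasts require power == 2 (enforced in
        # _check_forecasting_method), so squared residuals are used below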
truncation = self.truncation
p, q = self.p, self.q
lam = rec.figarch_weights(parameters[1:], p, q, truncation)
lam_rev = lam[::-1]
t = resids.shape[0]
omega = parameters[0]
beta = parameters[-1] if q else 0.0
omega_tilde = omega / (1 - beta)
temp_forecasts = np.empty(truncation + horizon)
resids2 = resids**2
for i in range(start, t):
fcast_loc = i - start
available = i + 1 - max(0, i - truncation + 1)
temp_forecasts[truncation - available : truncation] = resids2[
max(0, i - truncation + 1) : i + 1
]
if available < truncation:
temp_forecasts[: truncation - available] = backcast
for h in range(horizon):
lagged_forecasts = temp_forecasts[h : truncation + h]
temp_forecasts[truncation + h] = omega_tilde + lam_rev.dot(
lagged_forecasts
)
forecasts[fcast_loc, :] = temp_forecasts[truncation:]
return VarianceForecast(forecasts)
def _simulation_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
start: int,
horizon: int,
simulations: int,
rng: RNGType,
) -> VarianceForecast:
sigma2, forecasts = self._one_step_forecast(
parameters, resids, backcast, var_bounds, horizon, start
)
t = resids.shape[0]
paths = np.empty((t - start, simulations, horizon))
shocks = np.empty((t - start, simulations, horizon))
power = self.power
truncation = self.truncation
p, q = self.p, self.q
lam = rec.figarch_weights(parameters[1:], p, q, truncation)
lam_rev = lam[::-1]
t = resids.shape[0]
omega = parameters[0]
beta = parameters[-1] if q else 0.0
omega_tilde = omega / (1 - beta)
fpath = np.empty((simulations, truncation + horizon))
fresids = np.abs(resids) ** power
for i in range(start, t):
std_shocks = rng((simulations, horizon))
available = i + 1 - max(0, i - truncation + 1)
fpath[:, truncation - available : truncation] = fresids[
max(0, i + 1 - truncation) : i + 1
]
if available < truncation:
fpath[:, : (truncation - available)] = backcast
for h in range(horizon):
# 1. Forecast transformed variance
lagged_forecasts = fpath[:, h : truncation + h]
temp = omega_tilde + lagged_forecasts.dot(lam_rev)
# 2. Transform variance
sigma2 = temp ** (2.0 / power)
# 3. Simulate new residual
path_loc = i - start
shocks[path_loc, :, h] = std_shocks[:, h] * np.sqrt(sigma2)
paths[path_loc, :, h] = sigma2
forecasts[path_loc, h] = sigma2.mean()
# 4. Transform new residual
fpath[:, truncation + h] = np.abs(shocks[path_loc, :, h]) ** power
return VarianceForecast(forecasts, paths, shocks)
class APARCH(VolatilityProcess, metaclass=AbstractDocStringInheritor):
r"""
Asymmetric Power ARCH (APARCH) volatility process
Parameters
----------
p : int
        Order of the symmetric innovation. Must satisfy p >= o.
    o : int
        Order of the asymmetric innovation. Must satisfy o <= p.
q : int
Order of the lagged (transformed) conditional variance
    delta : float, optional
        Value to use for a fixed delta in the APARCH model. If not provided,
        delta is jointly estimated with the other model parameters. A
        user-provided delta must lie in (0.05, 4.0).
common_asym : bool, optional
Restrict all asymmetry terms to share the same asymmetry
parameter. If False (default), then there are no restrictions
on the ``o`` asymmetry parameters.
Examples
--------
>>> from arch.univariate import APARCH
Symmetric Power ARCH(1,1)
>>> aparch = APARCH(p=1, q=1)
Standard APARCH process
>>> aparch = APARCH(p=1, o=1, q=1)
    APARCH process with a fixed power parameter
>>> aparch = APARCH(p=1, o=1, q=1, delta=1.3)
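    A minimal simulation sketch (assuming standard normal shocks; with delta
    fixed at 1.3 the parameter order is omega, alpha, gamma, beta)
    >>> import numpy as np
    >>> rs = np.random.RandomState(0)
    >>> params = np.array([0.1, 0.1, 0.2, 0.85])
    >>> sim_data, sim_sigma2 = aparch.simulate(params, 250, rs.standard_normal)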
Notes
-----
In this class of processes, the variance dynamics are
.. math::
\sigma_{t}^{\delta}=\omega
+\sum_{i=1}^{p}\alpha_{i}
\left(\left|\epsilon_{t-i}\right|
-\gamma_{i}I_{[o\geq i]}\epsilon_{t-i}\right)^{\delta}
+\sum_{k=1}^{q}\beta_{k}\sigma_{t-k}^{\delta}
If ``common_asym`` is ``True``, then all of :math:`\gamma_i`
are restricted to have a common value.
"""
def __init__(
self,
p: int = 1,
o: int = 1,
q: int = 1,
delta: float | None = None,
common_asym: bool = False,
) -> None:
super().__init__()
self.p: int = int(p)
self.o: int = int(o)
self.q: int = int(q)
self._est_delta = delta is None
self._common_asym = bool(common_asym) and self.o > 0
self._delta = float(np.nan)
        if not self._est_delta:
            try:
                assert delta is not None
                self._delta = float(delta)
            except (ValueError, TypeError):
                raise TypeError("delta must be convertible to a float.")
            if not 0.05 < self._delta < 4:
                raise ValueError("delta must be between 0.05 and 4")
if p < 1 or o < 0 or q < 0:
raise ValueError("All lags lengths must be non-negative, and p >= 1")
if o > p:
raise ValueError("o must be <= p.")
self._name = "APARCH" if o > 0 else "Power ARCH"
self._sigma_delta = np.empty(0)
self._parameters = np.empty(1 + self.p + self.o + self.q + 1)
o = 1 if self._common_asym else o
self._num_params = 1 + p + o + q + int(self._est_delta)
self._repack = self._common_asym or not self._est_delta
@property
def delta(self) -> float:
"""The value of delta in the model. NaN is delta is estimated."""
return self._delta
@property
def common_asym(self) -> bool:
"""The value of delta in the model. NaN is delta is estimated."""
return self._common_asym
def _repack_parameters(self, parameters: Float64Array) -> Float64Array:
if not self._repack:
return parameters
p, o, q = self.p, self.o, self.q
_parameters = self._parameters
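        # Expand the compact estimated parameter vector into the full
        # (omega, alpha, gamma, beta, delta) layout used by the recursions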
if not self._common_asym:
# Must only have fixed delta
_parameters[: p + o + q + 1] = parameters
else:
_parameters[: p + 1] = parameters[: p + 1]
_parameters[p + 1 : p + o + 1] = parameters[p + 1]
_parameters[p + o + 1 : p + o + q + 1] = parameters[p + 2 : p + q + 2]
_parameters[-1] = parameters[-1] if self._est_delta else self.delta
return _parameters
def compute_variance(
self,
parameters: Float64Array,
resids: Float64Array,
sigma2: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
) -> Float64Array:
abs_resids = np.abs(resids)
if self._sigma_delta.shape[0] != resids.shape[0]:
self._sigma_delta = np.empty(resids.shape[0])
sigma_delta = self._sigma_delta
p, o, q = self.p, self.o, self.q
nobs = resids.shape[0]
_parameters = self._repack_parameters(parameters)
rec.aparch_recursion(
_parameters,
resids,
abs_resids,
sigma2,
sigma_delta,
p,
o,
q,
nobs,
backcast,
var_bounds,
)
return sigma2
def bounds(self, resids: Float64Array) -> list[tuple[float, float]]:
v = max(float(np.mean(abs(resids) ** 0.5)), float(np.mean(resids**2)))
bounds = [(0.0, 10.0 * float(v))]
bounds.extend([(0.0, 1.0)] * self.p)
o = 1 if self._common_asym else self.o
bounds.extend([(-0.9997, 0.9997)] * o)
bounds.extend([(0.0, 1.0)] * self.q)
if self._est_delta:
bounds.append((0.05, 4.0))
return bounds
def starting_values(self, resids: Float64Array) -> Float64Array:
p, o, q = self.p, self.o, self.q
alphas = [0.03, 0.05, 0.08, 0.15]
alpha_beta = [0.8, 0.9, 0.95, 0.975, 0.99]
gammas = [-0.5, 0, 0.5] if self.o > 0 else [0]
deltas = [0.5, 1.2, 1.8] if self._est_delta else [self._delta]
abgs = list(itertools.product(*[alphas, gammas, alpha_beta, deltas]))
svs = []
var_bounds = self.variance_bounds(resids)
backcast = self.backcast(resids)
llfs = np.zeros(len(abgs))
est_delta = int(self._est_delta)
for i, values in enumerate(abgs):
alpha, gamma, ab, delta = values
target = np.mean(abs(resids) ** delta)
scale = np.mean(resids**2) / (target ** (2.0 / delta))
target *= scale ** (delta / 2)
sv = (1.0 - ab) * target * np.ones(p + o + q + 1 + est_delta)
sv[1 : 1 + p] = alpha / p
ab -= alpha
if o > 0:
sv[1 + p : 1 + p + o] = gamma
if q > 0:
sv[1 + p + o : 1 + p + o + q] = ab / q
if est_delta:
sv[-1] = delta
svs.append(sv)
llfs[i] = self._gaussian_loglikelihood(sv, resids, backcast, var_bounds)
loc = np.argmax(llfs)
sv = svs[int(loc)]
if self._common_asym:
sv = np.r_[sv[: p + 1 + (o > 0)], sv[p + o + 1 :]]
return sv
def constraints(self) -> tuple[Float64Array, Float64Array]:
p, o, q = self.p, self.o, self.q
o = 1 if self._common_asym else o
k_arch = p + o + q
# alpha[i] > 0, p
# -1 < gamma[i] < 1, 2*o
# beta[i] > 0, q
# sum(alpha) + sum(beta) < 1, 1
ndelta = 2 * int(self._est_delta)
a = np.zeros((k_arch + o + 2 + ndelta, self._num_params))
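        # Rows: omega and alpha positivity, two-sided gamma bounds, beta
        # positivity, optional delta bounds, then the persistence constraint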
for i in range(p + 1):
a[i, i] = 1.0
for i in range(o):
a[1 + p + i, 1 + p + i] = 1.0
a[1 + p + o + i, 1 + p + i] = -1.0
for i in range(q):
            a[1 + p + 2 * o + i, 1 + p + o + i] = 1.0
if self._est_delta:
a[1 + p + 2 * o + q, 1 + p + o + q] = 1.0
a[1 + p + 2 * o + q + 1, 1 + p + o + q] = -1.0
a[-1, 1 : p + o + q + 1] = -1.0
a[-1, p + 1 : p + o + 1] = 0
b = np.zeros(k_arch + o + 2 + ndelta) # omega and alpha, > 0
b[p + 1 : p + o + 1] = -0.9997 # gamma > -.9997
b[p + o + 1 : p + 2 * o + 1] = -0.9997 # gamma < .9997
if self._est_delta:
b[-3] = 0.05 # delta > 0.05
b[-2] = -4.0 # delta < 4
b[-1] = -1.0 # sum < 1
return a, b
def parameter_names(self) -> list[str]:
names = _common_names(self.p, self.o, self.q)
if self._common_asym:
names = names[: self.p + 1] + ["gamma"] + names[1 + self.p + self.o :]
if self._est_delta:
names += ["delta"]
return names
def simulate(
self,
parameters: Sequence[int | float] | ArrayLike1D,
nobs: int,
rng: RNGType,
burn: int = 500,
initial_value: None | float | Float64Array = None,
) -> tuple[Float64Array, Float64Array]:
params = np.asarray(ensure1d(parameters, "parameters", False), dtype=float)
params = self._repack_parameters(params)
p, o, q = self.p, self.o, self.q
errors = rng(nobs + burn)
sigma2 = np.zeros(nobs + burn)
sigma_delta = np.zeros(nobs + burn)
data = np.zeros(nobs + burn)
adata = np.zeros(nobs + burn)
max_lag = np.max([p, q])
delta = params[-1]
if initial_value is None:
persistence = params[1 : p + 1].sum()
persistence += params[1 + p + o : 1 + p + o + q].sum()
if (1.0 - persistence) > 0:
initial_value = params[0] / (1.0 - persistence)
else:
warn(initial_value_warning, InitialValueWarning)
initial_value = params[0]
sigma_delta[:max_lag] = initial_value
sigma2[:max_lag] = initial_value ** (2.0 / delta)
data[:max_lag] = np.sqrt(sigma2[:max_lag]) * errors[:max_lag]
adata[:max_lag] = np.abs(data[:max_lag])
for t in range(max_lag, nobs + burn):
sigma_delta[t] = params[0]
for j in range(p):
shock = adata[t - 1 - j]
if o > j:
shock -= params[1 + p + j] * data[t - 1 - j]
sigma_delta[t] += params[1 + j] * shock**delta
for j in range(q):
sigma_delta[t] += params[1 + p + o + j] * sigma_delta[t - 1 - j]
sigma2[t] = sigma_delta[t] ** (2.0 / delta)
data[t] = errors[t] * np.sqrt(sigma2[t])
adata[t] = np.abs(data[t])
return data[burn:], sigma2[burn:]
def _simulate_paths(
self,
m: int,
parameters: Float64Array,
horizon: int,
std_shocks: Float64Array,
sigma_delta: Float64Array,
shock: Float64Array,
abs_shock: Float64Array,
) -> tuple[Float64Array, Float64Array, Float64Array]:
if self._est_delta:
delta = parameters[-1]
else:
delta = self._delta
p, o, q = self.p, self.o, self.q
omega = parameters[0]
alpha = parameters[1 : p + 1]
gamma = parameters[p + 1 : p + o + 1]
beta = parameters[p + o + 1 : p + o + q + 1]
for h in range(horizon):
loc = h + m - 1
sigma_delta[:, h + m] = omega
for j in range(p):
                _shock = abs_shock[:, loc - j]
                if self.o > j:
                    # Subtract out of place: _shock is a view into abs_shock
                    _shock = _shock - gamma[j] * shock[:, loc - j]
sigma_delta[:, h + m] += alpha[j] * (_shock**delta)
for j in range(q):
sigma_delta[:, h + m] += beta[j] * sigma_delta[:, loc - j]
shock[:, h + m] = std_shocks[:, h] * sigma_delta[:, h + m] ** (1.0 / delta)
abs_shock[:, h + m] = np.abs(shock[:, h + m])
forecast_paths = sigma_delta[:, m:] ** (2.0 / delta)
return np.asarray(np.mean(forecast_paths, 0)), forecast_paths, shock[:, m:]
def _simulation_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
start: int,
horizon: int,
simulations: int,
rng: RNGType,
) -> VarianceForecast:
sigma2, forecasts = self._one_step_forecast(
parameters, resids, backcast, var_bounds, horizon, start
)
parameters = self._repack_parameters(parameters)
t = resids.shape[0]
paths = np.empty((t - start, simulations, horizon))
shocks = np.empty((t - start, simulations, horizon))
delta = parameters[-1]
m = np.max([self.p, self.q])
sigma_delta = np.zeros((simulations, m + horizon))
shock = np.zeros((simulations, m + horizon))
abs_shock = np.zeros((simulations, m + horizon))
for i in range(start, t):
std_shocks = rng((simulations, horizon))
if i - m < 0:
sigma_delta[:, :m] = backcast ** (delta / 2.0)
shock[:, :m] = 0
abs_shock[:, :m] = backcast ** (1.0 / 2.0)
# Use actual values where available
count = i + 1
sigma_delta[:, m - count : m] = sigma2[:count] ** (delta / 2.0)
shock[:, m - count : m] = resids[:count]
abs_shock[:, m - count : m] = np.abs(resids[:count])
else:
sigma_delta[:, :m] = sigma2[i - m + 1 : i + 1] ** (delta / 2.0)
shock[:, :m] = resids[i - m + 1 : i + 1]
abs_shock[:, :m] = np.abs(resids[i - m + 1 : i + 1])
f, p, s = self._simulate_paths(
m,
parameters,
horizon,
std_shocks,
sigma_delta,
shock,
abs_shock,
)
loc = i - start
forecasts[loc, :], paths[loc], shocks[loc] = f, p, s
return VarianceForecast(forecasts, paths, shocks)
def _analytic_forecast(
self,
parameters: Float64Array,
resids: Float64Array,
backcast: float | Float64Array,
var_bounds: Float64Array,
start: int,
horizon: int,
) -> VarianceForecast:
_, forecasts = self._one_step_forecast(
parameters, resids, backcast, var_bounds, horizon, start
)
return VarianceForecast(forecasts)
def _check_forecasting_method(
self, method: ForecastingMethod, horizon: int
) -> None:
if horizon == 1:
return
if method == "analytic":
raise ValueError("Analytic forecasts not available for horizon > 1")
return
def __str__(self) -> str:
descr = self.name + "("
for k, v in (("p", self.p), ("o", self.o), ("q", self.q)):
if v > 0:
descr += f"{k}: {v}, "
if not self._est_delta:
descr += f"delta: {self.delta:0.3f}, "
if self.o > 0:
descr += f"Common Asym: {self._common_asym}, "
descr = descr[:-2] + ")"
return descr