"""
Implements the Kalman filter for a linear Gaussian state space model.
References
----------
https://python.quantecon.org/kalman.html
"""
from textwrap import dedent
import numpy as np
from scipy.linalg import inv
from ._lss import LinearStateSpace
from ._matrix_eqn import solve_discrete_riccati
[docs]class Kalman:
r"""
Implements the Kalman filter for the Gaussian state space model
.. math::
x_{t+1} = A x_t + C w_{t+1} \\
y_t = G x_t + H v_t
Here :math:`x_t` is the hidden state and :math:`y_t` is the measurement.
The shocks :math:`w_t` and :math:`v_t` are iid standard normals. Below
we use the notation
.. math::
Q := CC'
R := HH'
Parameters
----------
ss : instance of LinearStateSpace
An instance of the quantecon.lss.LinearStateSpace class
x_hat : scalar(float) or array_like(float), optional(default=None)
An n x 1 array representing the mean x_hat of the
prior/predictive density. Set to zero if not supplied.
Sigma : scalar(float) or array_like(float), optional(default=None)
An n x n array representing the covariance matrix Sigma of
the prior/predictive density. Must be positive definite.
Set to the identity if not supplied.
Attributes
----------
Sigma, x_hat : as above
Sigma_infinity : array_like or scalar(float)
The infinite limit of Sigma_t
K_infinity : array_like or scalar(float)
The stationary Kalman gain.
References
----------
https://python.quantecon.org/kalman.html
"""
def __init__(self, ss, x_hat=None, Sigma=None):
self.ss = ss
self.set_state(x_hat, Sigma)
self._K_infinity = None
self._Sigma_infinity = None
[docs] def set_state(self, x_hat, Sigma):
if Sigma is None:
self.Sigma = np.identity(self.ss.n)
else:
self.Sigma = np.atleast_2d(Sigma)
if x_hat is None:
self.x_hat = np.zeros((self.ss.n, 1))
else:
self.x_hat = np.atleast_2d(x_hat)
self.x_hat.shape = self.ss.n, 1
def __repr__(self):
return self.__str__()
def __str__(self):
m = """\
Kalman filter:
- dimension of state space : {n}
- dimension of observation equation : {k}
"""
return dedent(m.format(n=self.ss.n, k=self.ss.k))
@property
def Sigma_infinity(self):
if self._Sigma_infinity is None:
self.stationary_values()
return self._Sigma_infinity
@property
def K_infinity(self):
if self._K_infinity is None:
self.stationary_values()
return self._K_infinity
[docs] def whitener_lss(self):
r"""
This function takes the linear state space system
that is an input to the Kalman class and it converts
that system to the time-invariant whitener represenation
given by
.. math::
\tilde{x}_{t+1}^* = \tilde{A} \tilde{x} + \tilde{C} v
a = \tilde{G} \tilde{x}
where
.. math::
\tilde{x}_t = [x+{t}, \hat{x}_{t}, v_{t}]
and
.. math::
\tilde{A} =
\begin{bmatrix}
A & 0 & 0 \\
KG & A-KG & KH \\
0 & 0 & 0 \\
\end{bmatrix}
.. math::
\tilde{C} =
\begin{bmatrix}
C & 0 \\
0 & 0 \\
0 & I \\
\end{bmatrix}
.. math::
\tilde{G} =
\begin{bmatrix}
G & -G & H \\
\end{bmatrix}
with :math:`A, C, G, H` coming from the linear state space system
that defines the Kalman instance
Returns
-------
whitened_lss : LinearStateSpace
This is the linear state space system that represents
the whitened system
"""
K = self.K_infinity
# Get the matrix sizes
n, k, m, l = self.ss.n, self.ss.k, self.ss.m, self.ss.l
A, C, G, H = self.ss.A, self.ss.C, self.ss.G, self.ss.H
Atil = np.vstack([np.hstack([A, np.zeros((n, n)), np.zeros((n, l))]),
np.hstack([np.dot(K, G),
A-np.dot(K, G),
np.dot(K, H)]),
np.zeros((l, 2*n + l))])
Ctil = np.vstack([np.hstack([C, np.zeros((n, l))]),
np.zeros((n, m+l)),
np.hstack([np.zeros((l, m)), np.eye(l)])])
Gtil = np.hstack([G, -G, H])
whitened_lss = LinearStateSpace(Atil, Ctil, Gtil)
self.whitened_lss = whitened_lss
return whitened_lss
[docs] def prior_to_filtered(self, y):
r"""
Updates the moments (x_hat, Sigma) of the time t prior to the
time t filtering distribution, using current measurement :math:`y_t`.
The updates are according to
.. math::
\hat{x}^F = \hat{x} + \Sigma G' (G \Sigma G' + R)^{-1}
(y - G \hat{x})
\Sigma^F = \Sigma - \Sigma G' (G \Sigma G' + R)^{-1} G
\Sigma
Parameters
----------
y : scalar or array_like(float)
The current measurement
"""
# === simplify notation === #
G, H = self.ss.G, self.ss.H
R = np.dot(H, H.T)
# === and then update === #
y = np.atleast_2d(y)
y.shape = self.ss.k, 1
E = np.dot(self.Sigma, G.T)
F = np.dot(np.dot(G, self.Sigma), G.T) + R
M = np.dot(E, inv(F))
self.x_hat = self.x_hat + np.dot(M, (y - np.dot(G, self.x_hat)))
self.Sigma = self.Sigma - np.dot(M, np.dot(G, self.Sigma))
[docs] def filtered_to_forecast(self):
"""
Updates the moments of the time t filtering distribution to the
moments of the predictive distribution, which becomes the time
t+1 prior
"""
# === simplify notation === #
A, C = self.ss.A, self.ss.C
Q = np.dot(C, C.T)
# === and then update === #
self.x_hat = np.dot(A, self.x_hat)
self.Sigma = np.dot(A, np.dot(self.Sigma, A.T)) + Q
[docs] def update(self, y):
"""
Updates x_hat and Sigma given k x 1 ndarray y. The full
update, from one period to the next
Parameters
----------
y : np.ndarray
A k x 1 ndarray y representing the current measurement
"""
self.prior_to_filtered(y)
self.filtered_to_forecast()
[docs] def stationary_values(self, method='doubling'):
r"""
Computes the limit of :math:`\Sigma_t` as t goes to infinity by
solving the associated Riccati equation. The outputs are stored in the
attributes `K_infinity` and `Sigma_infinity`. Computation is via the
doubling algorithm (default) or a QZ decomposition method (see the
documentation in `matrix_eqn.solve_discrete_riccati`).
Parameters
----------
method : str, optional(default="doubling")
Solution method used in solving the associated Riccati
equation, str in {'doubling', 'qz'}.
Returns
-------
Sigma_infinity : array_like or scalar(float)
The infinite limit of :math:`\Sigma_t`
K_infinity : array_like or scalar(float)
The stationary Kalman gain.
"""
# === simplify notation === #
A, C, G, H = self.ss.A, self.ss.C, self.ss.G, self.ss.H
Q, R = np.dot(C, C.T), np.dot(H, H.T)
# === solve Riccati equation, obtain Kalman gain === #
Sigma_infinity = solve_discrete_riccati(A.T, G.T, Q, R, method=method)
temp1 = np.dot(np.dot(A, Sigma_infinity), G.T)
temp2 = inv(np.dot(G, np.dot(Sigma_infinity, G.T)) + R)
K_infinity = np.dot(temp1, temp2)
# == record as attributes and return == #
self._Sigma_infinity, self._K_infinity = Sigma_infinity, K_infinity
return Sigma_infinity, K_infinity
[docs] def stationary_coefficients(self, j, coeff_type='ma'):
"""
Wold representation moving average or VAR coefficients for the
steady state Kalman filter.
Parameters
----------
j : int
The lag length
coeff_type : string, either 'ma' or 'var' (default='ma')
The type of coefficent sequence to compute. Either 'ma' for
moving average or 'var' for VAR.
"""
# == simplify notation == #
A, G = self.ss.A, self.ss.G
K_infinity = self.K_infinity
# == compute and return coefficients == #
coeffs = []
i = 1
if coeff_type == 'ma':
coeffs.append(np.identity(self.ss.k))
P_mat = A
P = np.identity(self.ss.n) # Create a copy
elif coeff_type == 'var':
coeffs.append(np.dot(G, K_infinity))
P_mat = A - np.dot(K_infinity, G)
P = np.copy(P_mat) # Create a copy
else:
raise ValueError("Unknown coefficient type")
while i <= j:
coeffs.append(np.dot(np.dot(G, P), K_infinity))
P = np.dot(P, P_mat)
i += 1
return coeffs
[docs] def stationary_innovation_covar(self):
# == simplify notation == #
H, G = self.ss.H, self.ss.G
R = np.dot(H, H.T)
Sigma_infinity = self.Sigma_infinity
return np.dot(G, np.dot(Sigma_infinity, G.T)) + R