# Source code for quantecon.lss

"""
Computes quantities associated with the Gaussian linear state space model.

References
----------

https://lectures.quantecon.org/py/linear_models.html

"""

from textwrap import dedent
import numpy as np
from scipy.linalg import solve
from .matrix_eqn import solve_discrete_lyapunov
from numba import jit
from .util import check_random_state

[docs]@jit
def simulate_linear_model(A, x0, v, ts_length):
r"""
This is a separate function for simulating a vector linear system of
the form

.. math::

x_{t+1} = A x_t + v_t

given :math:x_0 = x0

Here :math:x_t and :math:v_t are both n x 1 and :math:A is n x n.

The purpose of separating this functionality out is to target it for
optimization by Numba.  For the same reason, matrix multiplication is
broken down into for loops.

Parameters
----------
A : array_like or scalar(float)
Should be n x n
x0 : array_like
Should be n x 1.  Initial condition
v : np.ndarray
Should be n x ts_length-1.  Its t-th column is used as the time t
shock :math:v_t
ts_length : int
The length of the time series

Returns
--------
x : np.ndarray
Time series with ts_length columns, the t-th column being :math:x_t
"""
A = np.asarray(A)
n = A.shape
x = np.empty((n, ts_length))
x[:, 0] = x0
for t in range(ts_length-1):
# x[:, t+1] = A.dot(x[:, t]) + v[:, t]
for i in range(n):
x[i, t+1] = v[i, t]                   # Shock
for j in range(n):
x[i, t+1] += A[i, j] * x[j, t]   # Dot Product
return x

[docs]class LinearStateSpace:
r"""
A class that describes a Gaussian linear state space model of the
form:

.. math::

x_{t+1} = A x_t + C w_{t+1}

y_t = G x_t + H v_t

where :math:{w_t} and :math:{v_t} are independent and standard normal
with dimensions k and l respectively.  The initial conditions are
:math:\mu_0 and :math:\Sigma_0 for :math:x_0 \sim N(\mu_0, \Sigma_0).
When :math:\Sigma_0=0, the draw of :math:x_0 is exactly :math:\mu_0.

Parameters
----------
A : array_like or scalar(float)
Part of the state transition equation.  It should be n x n
C : array_like or scalar(float)
Part of the state transition equation.  It should be n x m
G : array_like or scalar(float)
Part of the observation equation.  It should be k x n
H : array_like or scalar(float), optional(default=None)
Part of the observation equation.  It should be k x l
mu_0 : array_like or scalar(float), optional(default=None)
This is the mean of initial draw and is n x 1
Sigma_0 : array_like or scalar(float), optional(default=None)
This is the variance of the initial draw and is n x n and
also should be positive definite and symmetric

Attributes
----------
A, C, G, H, mu_0, Sigma_0 : see Parameters
n, k, m, l : scalar(int)
The dimensions of x_t, y_t, w_t and v_t respectively

"""

def __init__(self, A, C, G, H=None, mu_0=None, Sigma_0=None):
self.A, self.G, self.C = list(map(self.convert, (A, G, C)))
# = Check Input Shapes = #
ni, nj = self.A.shape
if ni != nj:
raise ValueError(
"Matrix A (shape: %s) needs to be square" % (self.A.shape, ))
if ni != self.C.shape:
raise ValueError(
"Matrix C (shape: %s) does not have compatible dimensions "
"with A. It should be shape: %s" % (self.C.shape, (ni, 1)))
self.m = self.C.shape
self.k, self.n = self.G.shape
if self.n != ni:
raise ValueError("Matrix G (shape: %s) does not have compatible"
"dimensions with A (%s)" % (self.G.shape,
self.A.shape))
if H is None:
self.H = None
self.l = None
else:
self.H = self.convert(H)
self.l = self.H.shape
if mu_0 is None:
self.mu_0 = np.zeros((self.n, 1))
else:
self.mu_0 = self.convert(mu_0)
self.mu_0.shape = self.n, 1
if Sigma_0 is None:
self.Sigma_0 = np.zeros((self.n, self.n))
else:
self.Sigma_0 = self.convert(Sigma_0)

def __repr__(self):
return self.__str__()

def __str__(self):
m = """\
Linear Gaussian state space model:
- dimension of state space          : {n}
- number of innovations             : {m}
- dimension of observation equation : {k}
"""
return dedent(m.format(n=self.n, k=self.k, m=self.m))

[docs]    def convert(self, x):
"""
Convert array_like objects (lists of lists, floats, etc.) into
well formed 2D NumPy arrays

"""
return np.atleast_2d(np.asarray(x, dtype='float'))

[docs]    def simulate(self, ts_length=100, random_state=None):
r"""
Simulate a time series of length ts_length, first drawing

.. math::

x_0 \sim N(\mu_0, \Sigma_0)

Parameters
----------
ts_length : scalar(int), optional(default=100)
The length of the simulation
random_state : int or np.random.RandomState, optional
Random seed (integer) or np.random.RandomState instance to set
the initial state of the random number generator for
reproducibility. If None, a randomly initialized RandomState is
used.

Returns
-------
x : array_like(float)
An n x ts_length array, where the t-th column is :math:x_t
y : array_like(float)
A k x ts_length array, where the t-th column is :math:y_t

"""
random_state = check_random_state(random_state)

x0 = random_state.multivariate_normal(self.mu_0.flatten(),
self.Sigma_0)
w = random_state.randn(self.m, ts_length-1)
v = self.C.dot(w)  # Multiply each w_t by C to get v_t = C w_t
# == simulate time series == #
x = simulate_linear_model(self.A, x0, v, ts_length)

if self.H is not None:
v = random_state.randn(self.l, ts_length)
y = self.G.dot(x) + self.H.dot(v)
else:
y = self.G.dot(x)

return x, y

[docs]    def replicate(self, T=10, num_reps=100, random_state=None):
r"""
Simulate num_reps observations of :math:x_T and :math:y_T given
:math:x_0 \sim N(\mu_0, \Sigma_0).

Parameters
----------
T : scalar(int), optional(default=10)
The period that we want to replicate values for
num_reps : scalar(int), optional(default=100)
The number of replications that we want
random_state : int or np.random.RandomState, optional
Random seed (integer) or np.random.RandomState instance to set
the initial state of the random number generator for
reproducibility. If None, a randomly initialized RandomState is
used.

Returns
-------
x : array_like(float)
An n x num_reps array, where the j-th column is the j_th
observation of :math:x_T

y : array_like(float)
A k x num_reps array, where the j-th column is the j_th
observation of :math:y_T

"""
random_state = check_random_state(random_state)

x = np.empty((self.n, num_reps))
for j in range(num_reps):
x_T, _ = self.simulate(ts_length=T+1, random_state=random_state)
x[:, j] = x_T[:, -1]
if self.H is not None:
v = random_state.randn(self.l, num_reps)
y = self.G.dot(x) + self.H.dot(v)
else:
y = self.G.dot(x)

return x, y

[docs]    def moment_sequence(self):
r"""
Create a generator to calculate the population mean and
variance-covariance matrix for both :math:x_t and :math:y_t
starting at the initial condition (self.mu_0, self.Sigma_0).
Each iteration produces a 4-tuple of items (mu_x, mu_y, Sigma_x,
Sigma_y) for the next period.

Yields
------
mu_x : array_like(float)
An n x 1 array representing the population mean of x_t
mu_y : array_like(float)
A  k x 1 array representing the population mean of y_t
Sigma_x : array_like(float)
An n x n array representing the variance-covariance matrix
of x_t
Sigma_y : array_like(float)
A k x k array representing the variance-covariance matrix
of y_t

"""
# == Simplify names == #
A, C, G, H = self.A, self.C, self.G, self.H
# == Initial moments == #
mu_x, Sigma_x = self.mu_0, self.Sigma_0

while 1:
mu_y = G.dot(mu_x)
if H is None:
Sigma_y = G.dot(Sigma_x).dot(G.T)
else:
Sigma_y = G.dot(Sigma_x).dot(G.T) + H.dot(H.T)

yield mu_x, mu_y, Sigma_x, Sigma_y

# == Update moments of x == #
mu_x = A.dot(mu_x)
Sigma_x = A.dot(Sigma_x).dot(A.T) + C.dot(C.T)

[docs]    def stationary_distributions(self):
r"""
Compute the moments of the stationary distributions of :math:x_t and
:math:y_t if possible. Computation is by solving the discrete
Lyapunov equation.

Returns
-------
mu_x : array_like(float)
An n x 1 array representing the stationary mean of :math:x_t
mu_y : array_like(float)
An k x 1 array representing the stationary mean of :math:y_t
Sigma_x : array_like(float)
An n x n array representing the stationary var-cov matrix
of :math:x_t
Sigma_y : array_like(float)
An k x k array representing the stationary var-cov matrix
of :math:y_t
Sigma_yx : array_like(float)
An k x n array representing the stationary cov matrix
between :math:y_t and :math:x_t.

"""
self.__partition()
num_const, sorted_idx = self.num_const, self.sorted_idx
A21, A22 = self.A21, self.A22
CC2 = self.C2 @ self.C2.T
n = self.n

if num_const > 0:
μ = solve(np.eye(n-num_const) - A22, A21)
else:
μ = solve(np.eye(n-num_const) - A22, np.zeros((n, 1)))
Σ = solve_discrete_lyapunov(A22, CC2, method='bartels-stewart')

mu_x = np.empty((n, 1))
mu_x[:num_const] = self.mu_0[sorted_idx[:num_const]]
mu_x[num_const:] = μ

Sigma_x = np.zeros((n, n))
Sigma_x[num_const:, num_const:] = Σ

mu_x = self.P.T @ mu_x
Sigma_x = self.P.T @ Sigma_x @ self.P

mu_y = self.G @ mu_x
Sigma_y = self.G @ Sigma_x @ self.G.T
if self.H is not None:
Sigma_y += self.H @ self.H.T
Sigma_yx = self.G @ Sigma_x

self.mu_x, self.mu_y = mu_x, mu_y
self.Sigma_x, self.Sigma_y, self.Sigma_yx = Sigma_x, Sigma_y, Sigma_yx

return mu_x, mu_y, Sigma_x, Sigma_y, Sigma_yx

[docs]    def geometric_sums(self, beta, x_t):
r"""
Forecast the geometric sums

.. math::

S_x := E \Big[ \sum_{j=0}^{\infty} \beta^j x_{t+j} | x_t \Big]

S_y := E \Big[ \sum_{j=0}^{\infty} \beta^j y_{t+j} | x_t \Big]

Parameters
----------
beta : scalar(float)
Discount factor, in [0, 1)

beta : array_like(float)
The term x_t for conditioning

Returns
-------
S_x : array_like(float)
Geometric sum as defined above

S_y : array_like(float)
Geometric sum as defined above

"""

I = np.identity(self.n)
S_x = solve(I - beta * self.A, x_t)
S_y = self.G.dot(S_x)

return S_x, S_y

[docs]    def impulse_response(self, j=5):
r"""
Pulls off the imuplse response coefficients to a shock
in :math:w_{t} for :math:x and :math:y

Important to note: We are uninterested in the shocks to
v for this method

* :math:x coefficients are :math:C, AC, A^2 C...
* :math:y coefficients are :math:GC, GAC, GA^2C...

Parameters
----------
j : Scalar(int)
Number of coefficients that we want

Returns
-------
xcoef : list(array_like(float, 2))
The coefficients for x
ycoef : list(array_like(float, 2))
The coefficients for y
"""
# Pull out matrices
A, C, G, H = self.A, self.C, self.G, self.H
Apower = np.copy(A)

# Create room for coefficients
xcoef = [C]
ycoef = [np.dot(G, C)]

for i in range(j):
xcoef.append(np.dot(Apower, C))
ycoef.append(np.dot(G, np.dot(Apower, C)))
Apower = np.dot(Apower, A)

return xcoef, ycoef

def __partition(self):
r"""
Reorder the states by shifting the constant terms to the
top of the state vector. Then partition the linear state
space model following Appendix C in RMT Ch2 such that the
A22 matrix associated with non-constant states have eigenvalues
all strictly smaller than 1.

.. math::
\left[\begin{array}{c}
const\\
x_{2,t+1}
\end{array}\right]=\left[\begin{array}{cc}
I & 0\\
A_{21} & A_{22}
\end{array}\right]\left[\begin{array}{c}
1\\
x_{2,t}
\end{array}\right]+\left[\begin{array}{c}
0\\
C_{2}
\end{array}\right]w_{t+1}

Returns
-------
A22 : array_like(float)
Lower right part of the reordered matrix A.
A21 : array_like(float)
Lower left part of the reordered matrix A.
"""
A, C = self.A, self.C
n = self.n

sorted_idx = []
A_diag = np.diag(A)
num_const = 0
for idx in range(n):
if (A_diag[idx] == 1) and (C[idx, :] == 0).all() and \
np.linalg.norm(A[idx, :]) == 1:
sorted_idx.insert(0, idx)
num_const += 1
else:
sorted_idx.append(idx)
self.num_const = num_const
self.sorted_idx = sorted_idx

P = np.zeros((n, n))
P[range(n), sorted_idx] = 1

sorted_A = P @ A @ P.T
sorted_C = P @ C
A21 = sorted_A[num_const:, :num_const]
A22 = sorted_A[num_const:, num_const:]
C2 = sorted_C[num_const:, :]

self.P, self.A21, self.A22, self.C2 = P, A21, A22, C2

return A21, A22