# Source code for quantecon.markov.core

r"""
This file contains some useful objects for handling a finite-state
discrete-time Markov chain.

Definitions and Some Basic Facts about Markov Chains
----------------------------------------------------

Let :math:\{X_t\} be a Markov chain represented by an :math:n \times
n stochastic matrix :math:P. State :math:i *has access* to state
:math:j, denoted :math:i \to j, if :math:i = j or :math:P^k[i, j]
> 0 for some :math:k = 1, 2, \ldots; :math:i and j *communicate*,
denoted :math:i \leftrightarrow j, if :math:i \to j and :math:j \to
i. The binary relation :math:\leftrightarrow is an equivalent
relation. A *communication class* of the Markov chain :math:\{X_t\},
or of the stochastic matrix :math:P, is an equivalent class of
:math:\leftrightarrow. Equivalently, a communication class is a
*strongly connected component* (SCC) in the associated *directed graph*
:math:\Gamma(P), a directed graph with :math:n nodes where there is
an edge from :math:i to :math:j if and only if :math:P[i, j] > 0.
The Markov chain, or the stochastic matrix, is *irreducible* if it
admits only one communication class, or equivalently, if
:math:\Gamma(P) is *strongly connected*.

A state :math:i is *recurrent* if :math:i \to j implies :math:j \to
i; it is *transient* if it is not recurrent. For any :math:i, j
contained in a communication class, :math:i is recurrent if and only
if :math:j is recurrent. Therefore, recurrence is a property of a
communication class. Thus, a communication class is a *recurrent class*
if it contains a recurrent state. Equivalently, a recurrent class is a
SCC that corresponds to a sink node in the *condensation* of the
directed graph :math:\Gamma(P), where the condensation of
:math:\Gamma(P) is a directed graph in which each SCC is replaced with
a single node and there is an edge from one SCC :math:C to another SCC
:math:C' if :math:C \neq C' and there is an edge from some node in
:math:C to some node in :math:C'. A recurrent class is also called a
*closed communication class*. The condensation is acyclic, so that there
exists at least one recurrent class.

For example, if the entries of :math:P are all strictly positive, then
the whole state space is a communication class as well as a recurrent
class. (More generally, if there is only one communication class, then
it is a recurrent class.) As another example, consider the stochastic
matrix :math:P = [[1, 0], [0,5, 0.5]]. This has two communication
classes, :math:\{0\} and :math:\{1\}, and :math:\{0\} is the only
recurrent class.

A *stationary distribution* of the Markov chain :math:\{X_t\}, or of
the stochastic matrix :math:P, is a nonnegative vector :math:x such
that :math:x' P = x' and :math:x' \mathbf{1} = 1, where
:math:\mathbf{1} is the vector of ones. The Markov chain has a unique
stationary distribution if and only if it has a unique recurrent class.
More generally, each recurrent class has a unique stationary
distribution whose support equals that recurrent class. The set of all
stationary distributions is given by the convex hull of these unique
stationary distributions for the recurrent classes.

A natural number :math:d is the *period* of state :math:i if it is
the greatest common divisor of all :math:k's such that :math:P^k[i,
i] > 0; equivalently, it is the GCD of the lengths of the cycles in
:math:\Gamma(P) passing through :math:i. For any :math:i, j
contained in a communication class, :math:i has period :math:d if
and only if :math:j has period :math:d. The *period* of an
irreducible Markov chain (or of an irreducible stochastic matrix) is the
period of any state. We define the period of a general (not necessarily
irreducible) Markov chain to be the least common multiple of the periods
of its recurrent classes, where the period of a recurrent class is the
period of any state in that class. A Markov chain is *aperiodic* if its
period is one. A Markov chain is irreducible and aperiodic if and only
if it is *uniformly ergodic*, i.e., there exists some :math:m such
that :math:P^m[i, j] > 0 for all :math:i, j (in this case, :math:P
is also called *primitive*).

Suppose that an irreducible Markov chain has period :math:d. Fix any
state, say state :math:0. For each :math:m = 0, \ldots, d-1, let
:math:S_m be the set of states :math:i such that :math:P^{kd+m}[0,
i] > 0 for some :math:k. These sets :math:S_0, \ldots, S_{d-1}
constitute a partition of the state space and are called the *cyclic
classes*. For each :math:S_m and each :math:i \in S_m, we have
:math:\sum_{j \in S_{m+1}} P[i, j] = 1, where :math:S_d = S_0.

"""
import numbers
from fractions import gcd
import numpy as np
from scipy import sparse
from numba import jit

from .gth_solve import gth_solve
from ..graph_tools import DiGraph
from ..util import searchsorted, check_random_state

[docs]class MarkovChain:
"""
Class for a finite-state discrete-time Markov chain. It stores
useful information such as the stationary distributions, and
communication, recurrent, and cyclic classes, and allows simulation
of state transitions.

Parameters
----------
P : array_like or scipy sparse matrix (float, ndim=2)
The transition matrix.  Must be of shape n x n.

state_values : array_like(default=None)
Array_like of length n containing the values associated with the
states, which must be homogeneous in type. If None, the values
default to integers 0 through n-1.

Attributes
----------
P : ndarray or scipy.sparse.csr_matrix (float, ndim=2)
See Parameters

stationary_distributions : array_like(float, ndim=2)
Array containing stationary distributions, one for each
recurrent class, as rows.

is_irreducible : bool
Indicate whether the Markov chain is irreducible.

num_communication_classes : int
The number of the communication classes.

communication_classes_indices : list(ndarray(int))
List of numpy arrays containing the indices of the communication
classes.

communication_classes : list(ndarray)
List of numpy arrays containing the communication classes, where
the states are annotated with their values (if state_values is
not None).

num_recurrent_classes : int
The number of the recurrent classes.

recurrent_classes_indices : list(ndarray(int))
List of numpy arrays containing the indices of the recurrent
classes.

recurrent_classes : list(ndarray)
List of numpy arrays containing the recurrent classes, where the
states are annotated with their values (if state_values is not
None).

is_aperiodic : bool
Indicate whether the Markov chain is aperiodic.

period : int
The period of the Markov chain.

cyclic_classes_indices : list(ndarray(int))
List of numpy arrays containing the indices of the cyclic
classes. Defined only when the Markov chain is irreducible.

cyclic_classes : list(ndarray)
List of numpy arrays containing the cyclic classes, where the
states are annotated with their values (if state_values is not
None). Defined only when the Markov chain is irreducible.

Notes
-----
In computing stationary distributions, if the input matrix is a
sparse matrix, internally it is converted to a dense matrix.

"""

def __init__(self, P, state_values=None):
if sparse.issparse(P):  # Sparse matrix
self.P = sparse.csr_matrix(P)
self.is_sparse = True
else:  # Dense matrix
self.P = np.asarray(P)
self.is_sparse = False

# Check Properties
# Double check that P is a square matrix
if len(self.P.shape) != 2 or self.P.shape != self.P.shape:
raise ValueError('P must be a square matrix')

# The number of states
self.n = self.P.shape

# Double check that P is a nonnegative matrix
if not self.is_sparse:
data_nonnegative = (self.P >= 0)  # ndarray
else:
data_nonnegative = (self.P.data >= 0)  # csr_matrx
if not np.all(data_nonnegative):
raise ValueError('P must be nonnegative')

# Double check that the rows of P sum to one
row_sums = self.P.sum(axis=1)
if self.is_sparse:  # row_sums is np.matrix (ndim=2)
row_sums = row_sums.getA1()
if not np.allclose(row_sums, np.ones(self.n)):
raise ValueError('The rows of P must sum to 1')

# Call the setter method
self.state_values = state_values

# To analyze the structure of P as a directed graph
self._digraph = None

self._stationary_dists = None
self._cdfs = None  # For dense matrix
self._cdfs1d = None  # For sparse matrix

def __repr__(self):
msg = "Markov chain with transition matrix \nP = \n{0}"

if self._stationary_dists is None:
return msg.format(self.P)
else:
msg = msg + "\nand stationary distributions \n{1}"
return msg.format(self.P, self._stationary_dists)

def __str__(self):
return str(self.__repr__)

@property
def state_values(self):
return self._state_values

@state_values.setter
def state_values(self, values):
if values is None:
self._state_values = None
else:
values = np.asarray(values)
if (values.ndim < 1) or (values.shape != self.n):
raise ValueError(
'state_values must be an array_like of length n'
)
if np.issubdtype(values.dtype, np.object_):
raise ValueError(
'data in state_values must be homogeneous in type'
)
self._state_values = values

[docs]    def get_index(self, value):
"""
Return the index (or indices) of the given value (or values) in
state_values.

Parameters
----------
value
Value(s) to get the index (indices) for.

Returns
-------
idx : int or ndarray(int)
Index of value if value is a single state value; array
of indices if value is an array_like of state values.

"""
if self.state_values is None:
state_values_ndim = 1
else:
state_values_ndim = self.state_values.ndim

values = np.asarray(value)

if values.ndim <= state_values_ndim - 1:
return self._get_index(value)
elif values.ndim == state_values_ndim:  # array of values
k = values.shape
idx = np.empty(k, dtype=int)
for i in range(k):
idx[i] = self._get_index(values[i])
return idx
else:
raise ValueError('invalid value')

def _get_index(self, value):
"""
Return the index of the given value in state_values.

Parameters
----------
value
Value to get the index for.

Returns
-------
idx : int
Index of value.

"""

if self.state_values is None:
if isinstance(value, numbers.Integral) and (0 <= value < self.n):
return value
else:
raise ValueError(error_msg)

# if self.state_values is not None:
if self.state_values.ndim == 1:
try:
idx = np.where(self.state_values == value)
return idx
except IndexError:
raise ValueError(error_msg)
else:
idx = 0
while idx < self.n:
if np.array_equal(self.state_values[idx], value):
return idx
idx += 1
raise ValueError(error_msg)

@property
def digraph(self):
if self._digraph is None:
self._digraph = DiGraph(self.P, node_labels=self.state_values)
return self._digraph

@property
def is_irreducible(self):
return self.digraph.is_strongly_connected

@property
def num_communication_classes(self):
return self.digraph.num_strongly_connected_components

@property
def communication_classes_indices(self):
return self.digraph.strongly_connected_components_indices

@property
def communication_classes(self):
return self.digraph.strongly_connected_components

@property
def num_recurrent_classes(self):
return self.digraph.num_sink_strongly_connected_components

@property
def recurrent_classes_indices(self):
return self.digraph.sink_strongly_connected_components_indices

@property
def recurrent_classes(self):
return self.digraph.sink_strongly_connected_components

@property
def is_aperiodic(self):
if self.is_irreducible:
return self.digraph.is_aperiodic
else:
return self.period == 1

@property
def period(self):
if self.is_irreducible:
return self.digraph.period
else:
rec_classes = self.recurrent_classes

# Determine the period, the LCM of the periods of rec_classes
d = 1
for rec_class in rec_classes:
period = self.digraph.subgraph(rec_class).period
d = (d * period) // gcd(d, period)

return d

@property
def cyclic_classes(self):
if not self.is_irreducible:
raise NotImplementedError(
'Not defined for a reducible Markov chain'
)
else:
return self.digraph.cyclic_components

@property
def cyclic_classes_indices(self):
if not self.is_irreducible:
raise NotImplementedError(
'Not defined for a reducible Markov chain'
)
else:
return self.digraph.cyclic_components_indices

def _compute_stationary(self):
"""
Store the stationary distributions in self._stationary_distributions.

"""
if self.is_irreducible:
if not self.is_sparse:  # Dense
stationary_dists = gth_solve(self.P).reshape(1, self.n)
else:  # Sparse
stationary_dists = \
gth_solve(self.P.toarray(),
overwrite=True).reshape(1, self.n)
else:
rec_classes = self.recurrent_classes_indices
stationary_dists = np.zeros((len(rec_classes), self.n))
for i, rec_class in enumerate(rec_classes):
P_rec_class = self.P[np.ix_(rec_class, rec_class)]
if self.is_sparse:
P_rec_class = P_rec_class.toarray()
stationary_dists[i, rec_class] = \
gth_solve(P_rec_class, overwrite=True)

self._stationary_dists = stationary_dists

@property
def stationary_distributions(self):
if self._stationary_dists is None:
self._compute_stationary()
return self._stationary_dists

@property
def cdfs(self):
if (self._cdfs is None) and not self.is_sparse:
# See issue #137#issuecomment-96128186
cdfs = np.empty((self.n, self.n), order='C')
np.cumsum(self.P, axis=-1, out=cdfs)
self._cdfs = cdfs
return self._cdfs

@property
def cdfs1d(self):
if (self._cdfs1d is None) and self.is_sparse:
data = self.P.data
indices = self.P.indices
indptr = self.P.indptr

cdfs1d = np.empty(self.P.nnz, order='C')
for i in range(self.n):
cdfs1d[indptr[i]:indptr[i+1]] = \
data[indptr[i]:indptr[i+1]].cumsum()
self._cdfs1d = cdfs1d
return self._cdfs1d

[docs]    def simulate_indices(self, ts_length, init=None, num_reps=None,
random_state=None):
"""
Simulate time series of state transitions, where state indices
are returned.

Parameters
----------
ts_length : scalar(int)
Length of each simulation.

init : int or array_like(int, ndim=1), optional
Initial state(s). If None, the initial state is randomly
drawn.

num_reps : scalar(int), optional(default=None)
Number of repetitions of simulation.

random_state : int or np.random.RandomState, optional
Random seed (integer) or np.random.RandomState instance to
set the initial state of the random number generator for
reproducibility. If None, a randomly initialized RandomState
is used.

Returns
-------
X : ndarray(ndim=1 or 2)
Array containing the state values of the sample path(s). See
the simulate method for more information.

"""
random_state = check_random_state(random_state)
dim = 1  # Dimension of the returned array: 1 or 2

msg_out_of_range = 'index {init} is out of the state space'

try:
k = len(init)  # init is an array
dim = 2
init_states = np.asarray(init, dtype=int)
# Check init_states are in the state space
if (init_states >= self.n).any() or (init_states < -self.n).any():
idx = np.where(
(init_states >= self.n) + (init_states < -self.n)
)
raise ValueError(msg_out_of_range.format(init=idx))
if num_reps is not None:
k *= num_reps
init_states = np.tile(init_states, num_reps)
except TypeError:  # init is a scalar(int) or None
k = 1
if num_reps is not None:
dim = 2
k = num_reps
if init is None:
init_states = random_state.randint(self.n, size=k)
elif isinstance(init, numbers.Integral):
# Check init is in the state space
if init >= self.n or init < -self.n:
raise ValueError(msg_out_of_range.format(init=init))
init_states = np.ones(k, dtype=int) * init
else:
raise ValueError(
'init must be int, array_like of ints, or None'
)

# === set up array to store output === #
X = np.empty((k, ts_length), dtype=int)

# Random values, uniformly sampled from [0, 1)
random_values = random_state.random_sample(size=(k, ts_length-1))

# Generate sample paths and store in X
if not self.is_sparse:  # Dense
_generate_sample_paths(
self.cdfs, init_states, random_values, out=X
)
else:  # Sparse
_generate_sample_paths_sparse(
self.cdfs1d, self.P.indices, self.P.indptr, init_states,
random_values, out=X
)

if dim == 1:
return X
else:
return X

[docs]    def simulate(self, ts_length, init=None, num_reps=None, random_state=None):
"""
Simulate time series of state transitions, where the states are
annotated with their values (if state_values is not None).

Parameters
----------
ts_length : scalar(int)
Length of each simulation.

init : scalar or array_like, optional(default=None)
Initial state values(s). If None, the initial state is
randomly drawn.

num_reps : scalar(int), optional(default=None)
Number of repetitions of simulation.

random_state : int or np.random.RandomState, optional
Random seed (integer) or np.random.RandomState instance to
set the initial state of the random number generator for
reproducibility. If None, a randomly initialized RandomState
is used.

Returns
-------
X : ndarray(ndim=1 or 2)
Array containing the sample path(s), of shape (ts_length,)
if init is a scalar (integer) or None and num_reps is None;
of shape (k, ts_length) otherwise, where k = len(init) if
(init, num_reps) = (array, None), k = num_reps if (init,
num_reps) = (int or None, int), and k = len(init)*num_reps
if (init, num_reps) = (array, int).

"""
if init is not None:
init_idx = self.get_index(init)
else:
init_idx = None
X = self.simulate_indices(ts_length, init=init_idx, num_reps=num_reps,
random_state=random_state)

# Annotate states
if self.state_values is not None:
X = self.state_values[X]

return X

@jit(nopython=True)
def _generate_sample_paths(P_cdfs, init_states, random_values, out):
"""
Generate num_reps sample paths of length ts_length, where num_reps =
out.shape and ts_length = out.shape.

Parameters
----------
P_cdfs : ndarray(float, ndim=2)
Array containing as rows the CDFs of the state transition.

init_states : array_like(int, ndim=1)
Array containing the initial states. Its length must be equal to
num_reps.

random_values : ndarray(float, ndim=2)
Array containing random values from [0, 1). Its shape must be
equal to (num_reps, ts_length-1)

out : ndarray(int, ndim=2)
Array to store the sample paths.

Notes
-----
This routine is jit-complied by Numba.

"""
num_reps, ts_length = out.shape

for i in range(num_reps):
out[i, 0] = init_states[i]
for t in range(ts_length-1):
out[i, t+1] = searchsorted(P_cdfs[out[i, t]], random_values[i, t])

@jit(nopython=True)
def _generate_sample_paths_sparse(P_cdfs1d, indices, indptr, init_states,
random_values, out):
"""
For sparse matrix.

Generate num_reps sample paths of length ts_length, where num_reps =
out.shape and ts_length = out.shape.

Parameters
----------
P_cdfs1d : ndarray(float, ndim=1)
1D array containing the CDFs of the state transition.

indices : ndarray(int, ndim=1)
CSR format index array.

indptr : ndarray(int, ndim=1)
CSR format index pointer array.

init_states : array_like(int, ndim=1)
Array containing the initial states. Its length must be equal to
num_reps.

random_values : ndarray(float, ndim=2)
Array containing random values from [0, 1). Its shape must be
equal to (num_reps, ts_length-1)

out : ndarray(int, ndim=2)
Array to store the sample paths.

Notes
-----
This routine is jit-complied by Numba.

"""
num_reps, ts_length = out.shape

for i in range(num_reps):
out[i, 0] = init_states[i]
for t in range(ts_length-1):
k = searchsorted(P_cdfs1d[indptr[out[i, t]]:indptr[out[i, t]+1]],
random_values[i, t])
out[i, t+1] = indices[indptr[out[i, t]]+k]

[docs]def mc_compute_stationary(P):
"""
Computes stationary distributions of P, one for each recurrent
class. Any stationary distribution is written as a convex
combination of these distributions.

Returns
-------
stationary_dists : array_like(float, ndim=2)
Array containing the stationary distributions as its rows.

"""
return MarkovChain(P).stationary_distributions

[docs]def mc_sample_path(P, init=0, sample_size=1000, random_state=None):
"""
Generates one sample path from the Markov chain represented by
(n x n) transition matrix P on state space S = {{0,...,n-1}}.

Parameters
----------
P : array_like(float, ndim=2)
A Markov transition matrix.

init : array_like(float ndim=1) or scalar(int), optional(default=0)
If init is an array_like, then it is treated as the initial
distribution across states.  If init is a scalar, then it
treated as the deterministic initial state.

sample_size : scalar(int), optional(default=1000)
The length of the sample path.

random_state : int or np.random.RandomState, optional
Random seed (integer) or np.random.RandomState instance to set
the initial state of the random number generator for
reproducibility. If None, a randomly initialized RandomState is
used.

Returns
-------
X : array_like(int, ndim=1)
The simulation of states.

"""
random_state = check_random_state(random_state)

if isinstance(init, numbers.Integral):
X_0 = init
else:
cdf0 = np.cumsum(init)
u_0 = random_state.random_sample()
X_0 = searchsorted(cdf0, u_0)

mc = MarkovChain(P)
return mc.simulate(ts_length=sample_size, init=X_0,
random_state=random_state)