"""
Tools for dealing with a directed graph.
"""
import numpy as np
from scipy import sparse
from scipy.sparse import csgraph
from math import gcd
from numba import jit
from .util import check_random_state
# Decorator for *_components properties
[docs]def annotate_nodes(func):
def new_func(self):
list_of_components = func(self)
if self.node_labels is not None:
return [self.node_labels[c] for c in list_of_components]
return list_of_components
return new_func
[docs]class DiGraph:
r"""
Class for a directed graph. It stores useful information about the
graph structure such as strong connectivity [1]_ and periodicity
[2]_.
Parameters
----------
adj_matrix : array_like(ndim=2)
Adjacency matrix representing a directed graph. Must be of shape
n x n.
weighted : bool, optional(default=False)
Whether to treat `adj_matrix` as a weighted adjacency matrix.
node_labels : array_like(default=None)
Array_like of length n containing the labels associated with the
nodes, which must be homogeneous in type. If None, the labels
default to integers 0 through n-1.
Attributes
----------
csgraph : scipy.sparse.csr_matrix
Compressed sparse representation of the digraph.
is_strongly_connected : bool
Indicate whether the digraph is strongly connected.
num_strongly_connected_components : int
The number of the strongly connected components.
strongly_connected_components_indices : list(ndarray(int))
List of numpy arrays containing the indices of the strongly
connected components.
strongly_connected_components : list(ndarray)
List of numpy arrays containing the strongly connected
components, where the nodes are annotated with their labels (if
`node_labels` is not None).
num_sink_strongly_connected_components : int
The number of the sink strongly connected components.
sink_strongly_connected_components_indices : list(ndarray(int))
List of numpy arrays containing the indices of the sink strongly
connected components.
sink_strongly_connected_components : list(ndarray)
List of numpy arrays containing the sink strongly connected
components, where the nodes are annotated with their labels (if
`node_labels` is not None).
is_aperiodic : bool
Indicate whether the digraph is aperiodic.
period : int
The period of the digraph. Defined only for a strongly connected
digraph.
cyclic_components_indices : list(ndarray(int))
List of numpy arrays containing the indices of the cyclic
components.
cyclic_components : list(ndarray)
List of numpy arrays containing the cyclic components, where the
nodes are annotated with their labels (if `node_labels` is not
None).
References
----------
.. [1] `Strongly connected component
<http://en.wikipedia.org/wiki/Strongly_connected_component>`_,
Wikipedia.
.. [2] `Aperiodic graph
<http://en.wikipedia.org/wiki/Aperiodic_graph>`_, Wikipedia.
"""
def __init__(self, adj_matrix, weighted=False, node_labels=None):
if weighted:
dtype = None
else:
dtype = bool
self.csgraph = sparse.csr_matrix(adj_matrix, dtype=dtype)
m, n = self.csgraph.shape
if n != m:
raise ValueError('input matrix must be square')
self.n = n # Number of nodes
# Call the setter method
self.node_labels = node_labels
self._num_scc = None
self._scc_proj = None
self._sink_scc_labels = None
self._period = None
def __repr__(self):
return self.__str__()
def __str__(self):
return "Directed Graph:\n - n(number of nodes): {n}".format(n=self.n)
@property
def node_labels(self):
return self._node_labels
@node_labels.setter
def node_labels(self, values):
if values is None:
self._node_labels = None
else:
values = np.asarray(values)
if (values.ndim < 1) or (values.shape[0] != self.n):
raise ValueError(
'node_labels must be an array_like of length n'
)
if np.issubdtype(values.dtype, np.object_):
raise ValueError(
'data in node_labels must be homogeneous in type'
)
self._node_labels = values
def _find_scc(self):
"""
Set ``self._num_scc`` and ``self._scc_proj``
by calling ``scipy.sparse.csgraph.connected_components``:
* docs.scipy.org/doc/scipy/reference/sparse.csgraph.html
* github.com/scipy/scipy/blob/master/scipy/sparse/csgraph/_traversal.pyx
``self._scc_proj`` is a list of length `n` that assigns to each node
the label of the strongly connected component to which it belongs.
"""
# Find the strongly connected components
self._num_scc, self._scc_proj = \
csgraph.connected_components(self.csgraph, connection='strong')
@property
def num_strongly_connected_components(self):
if self._num_scc is None:
self._find_scc()
return self._num_scc
@property
def scc_proj(self):
if self._scc_proj is None:
self._find_scc()
return self._scc_proj
@property
def is_strongly_connected(self):
return (self.num_strongly_connected_components == 1)
def _condensation_lil(self):
"""
Return the sparse matrix representation of the condensation digraph
in lil format.
"""
condensation_lil = sparse.lil_matrix(
(self.num_strongly_connected_components,
self.num_strongly_connected_components), dtype=bool
)
scc_proj = self.scc_proj
for node_from, node_to in _csr_matrix_indices(self.csgraph):
scc_from, scc_to = scc_proj[node_from], scc_proj[node_to]
if scc_from != scc_to:
condensation_lil[scc_from, scc_to] = True
return condensation_lil
def _find_sink_scc(self):
"""
Set self._sink_scc_labels, which is a list containing the labels of
the strongly connected components.
"""
condensation_lil = self._condensation_lil()
# A sink SCC is a SCC such that none of its members is strongly
# connected to nodes in other SCCs
# Those k's such that graph_condensed_lil.rows[k] == []
self._sink_scc_labels = \
np.where(np.logical_not(condensation_lil.rows))[0]
@property
def sink_scc_labels(self):
if self._sink_scc_labels is None:
self._find_sink_scc()
return self._sink_scc_labels
@property
def num_sink_strongly_connected_components(self):
return len(self.sink_scc_labels)
@property
def strongly_connected_components_indices(self):
if self.is_strongly_connected:
return [np.arange(self.n)]
else:
return [np.where(self.scc_proj == k)[0]
for k in range(self.num_strongly_connected_components)]
@property
@annotate_nodes
def strongly_connected_components(self):
return self.strongly_connected_components_indices
@property
def sink_strongly_connected_components_indices(self):
if self.is_strongly_connected:
return [np.arange(self.n)]
else:
return [np.where(self.scc_proj == k)[0]
for k in self.sink_scc_labels.tolist()]
@property
@annotate_nodes
def sink_strongly_connected_components(self):
return self.sink_strongly_connected_components_indices
def _compute_period(self):
"""
Set ``self._period`` and ``self._cyclic_components_proj``.
Use the algorithm described in:
J. P. Jarvis and D. R. Shier,
"Graph-Theoretic Analysis of Finite Markov Chains," 1996.
"""
# Degenerate graph with a single node (which is strongly connected)
# csgraph.reconstruct_path would raise an exception
# github.com/scipy/scipy/issues/4018
if self.n == 1:
if self.csgraph[0, 0] == 0: # No edge: "trivial graph"
self._period = 1 # Any universally accepted definition?
self._cyclic_components_proj = np.zeros(self.n, dtype=int)
return None
else: # Self loop
self._period = 1
self._cyclic_components_proj = np.zeros(self.n, dtype=int)
return None
if not self.is_strongly_connected:
raise NotImplementedError(
'Not defined for a non strongly-connected digraph'
)
if np.any(self.csgraph.diagonal() > 0):
self._period = 1
self._cyclic_components_proj = np.zeros(self.n, dtype=int)
return None
# Construct a breadth-first search tree rooted at 0
node_order, predecessors = \
csgraph.breadth_first_order(self.csgraph, i_start=0)
bfs_tree_csr = \
csgraph.reconstruct_path(self.csgraph, predecessors)
# Edges not belonging to tree_csr
non_bfs_tree_csr = self.csgraph - bfs_tree_csr
non_bfs_tree_csr.eliminate_zeros()
# Distance to 0
level = np.zeros(self.n, dtype=int)
for i in range(1, self.n):
level[node_order[i]] = level[predecessors[node_order[i]]] + 1
# Determine the period
d = 0
for node_from, node_to in _csr_matrix_indices(non_bfs_tree_csr):
value = level[node_from] - level[node_to] + 1
d = gcd(d, value)
if d == 1:
self._period = 1
self._cyclic_components_proj = np.zeros(self.n, dtype=int)
return None
self._period = d
self._cyclic_components_proj = level % d
@property
def period(self):
if self._period is None:
self._compute_period()
return self._period
@property
def is_aperiodic(self):
return (self.period == 1)
@property
def cyclic_components_indices(self):
if self.is_aperiodic:
return [np.arange(self.n)]
else:
return [np.where(self._cyclic_components_proj == k)[0]
for k in range(self.period)]
@property
@annotate_nodes
def cyclic_components(self,):
return self.cyclic_components_indices
[docs] def subgraph(self, nodes):
"""
Return the subgraph consisting of the given nodes and edges
between thses nodes.
Parameters
----------
nodes : array_like(int, ndim=1)
Array of node indices.
Returns
-------
DiGraph
A DiGraph representing the subgraph.
"""
adj_matrix = self.csgraph[np.ix_(nodes, nodes)]
weighted = True # To copy the dtype
if self.node_labels is not None:
node_labels = self.node_labels[nodes]
else:
node_labels = None
return DiGraph(adj_matrix, weighted=weighted, node_labels=node_labels)
def _csr_matrix_indices(S):
"""
Generate the indices of nonzero entries of a csr_matrix S
"""
m, n = S.shape
for i in range(m):
for j in range(S.indptr[i], S.indptr[i+1]):
row_index, col_index = i, S.indices[j]
yield row_index, col_index
[docs]def random_tournament_graph(n, random_state=None):
"""
Return a random tournament graph [1]_ with n nodes.
Parameters
----------
n : scalar(int)
Number of nodes.
random_state : int or np.random.RandomState/Generator, optional
Random seed (integer) or np.random.RandomState or Generator
instance to set the initial state of the random number generator
for reproducibility. If None, a randomly initialized RandomState
is used.
Returns
-------
DiGraph
A DiGraph representing the tournament graph.
References
----------
.. [1] `Tournament (graph theory)
<https://en.wikipedia.org/wiki/Tournament_(graph_theory)>`_,
Wikipedia.
"""
random_state = check_random_state(random_state)
num_edges = n * (n-1) // 2
r = random_state.random(num_edges)
row = np.empty(num_edges, dtype=int)
col = np.empty(num_edges, dtype=int)
_populate_random_tournament_row_col(n, r, row, col)
data = np.ones(num_edges, dtype=bool)
adj_matrix = sparse.coo_matrix((data, (row, col)), shape=(n, n))
return DiGraph(adj_matrix)
@jit(nopython=True, cache=True)
def _populate_random_tournament_row_col(n, r, row, col):
"""
Populate ndarrays `row` and `col` with directed edge indices
determined by random numbers in `r` for a tournament graph with n
nodes, which has num_edges = n * (n-1) // 2 edges.
Parameters
----------
n : scalar(int)
Number of nodes.
r : ndarray(float, ndim=1)
ndarray of length num_edges containing random numbers in [0, 1).
row, col : ndarray(int, ndim=1)
ndarrays of length num_edges to be modified in place.
"""
k = 0
for i in range(n):
for j in range(i+1, n):
if r[k] < 0.5:
row[k], col[k] = i, j
else:
row[k], col[k] = j, i
k += 1