import numpy as np
from numpy.typing import NDArray
from scipy.special import digamma
from scipy.spatial import cKDTree
from .utils import build_tree_and_get_distances, get_counts_from_tree
from ..utils import check_idxs
[docs]
def total_correlation(
data: NDArray[np.floating],
k: int,
idxs: tuple[int, ...] | None = None,
algorithm: int = 1,
) -> tuple[NDArray[np.floating], float]:
"""
A wrapper function for the two TC functions.
Parameters
----------
data : NDArray[np.floating]
Data array of shape (n_variables, n_samples)
k : int
Number of nearest neighbors
idxs : tuple[int, ...]
Indices of variables to use (-1 means all)
algorithm : int
Whether to use algorithm 1 or 2.
Defaults to 1
See Also
--------
total_correlation_1 : The TC computed using algorithm 1.
total_correlation_2 : The TC computed using algorithm 2.
Returns
-------
NDArray[np.floating
The local total correlation for each sample.
float
The expected total correlation over all samples
"""
assert algorithm in {1, 2}, "Algorithm must be 1 or 2."
if algorithm == 1:
return total_correlation_1(data=data, k=k, idxs=idxs)
else:
return total_correlation_2(data=data, k=k, idxs=idxs)
[docs]
def total_correlation_1(
data: NDArray[np.floating], k: int, idxs: tuple[int, ...] | None = None
) -> tuple[NDArray[np.floating], float]:
r"""
Computes the Kraskov, Stogbauer, Grassberger estimate of the total correlation using the first algorithm presented Kraskov et. al (2004)
.. math::
\hat{TC}(X) = \psi(k) - (m-1)\psi(N) -\langle \psi(n_{x_{1}}+1) + \ldots + \psi(n_{x_{N}}+1)\rangle
Parameters
----------
data : NDArray[np.floating]
Data array of shape (n_variables, n_samples)
k : int
Number of nearest neighbors
idxs : tuple[int, ...]
Indices of variables to use (-1 means all)
Returns
-------
NDArray[np.floating
The local total correlation for each sample.
float
The expected total correlation over all samples
References
----------
Kraskov, A., Stögbauer, H., & Grassberger, P. (2004).
Estimating mutual information.
Physical Review E, 69(6), 066138.
https://doi.org/10.1103/PhysRevE.69.066138
Watanabe, S. (1960).
Information Theoretical Analysis of Multivariate Correlation.
IBM Journal of Research and Development, 4(1), Article 1.
https://doi.org/10.1147/rd.41.0066
"""
idxs_: tuple[int, ...] = check_idxs(idxs, data)
m: int = len(idxs_)
N: int = data.shape[1]
psi_k: float = digamma(k)
psi_N: float = digamma(N)
ptw: NDArray[np.floating] = np.full((1, N), psi_k + (m - 1) * psi_N)
distances: NDArray[np.floating]
_, distances, _ = build_tree_and_get_distances(data=data[idxs_, :], k=k)
for idx in idxs_:
tree, _, _ = build_tree_and_get_distances(data[(idx,), :], k=k)
counts: NDArray[np.integer] = get_counts_from_tree(
tree,
data[(idx,), :],
distances[:, -1],
)
ptw[0, :] -= digamma(counts + 1)
return ptw, ptw.mean()
[docs]
def total_correlation_2(
data: NDArray[np.floating], k: int, idxs: tuple[int, ...] | None = None
) -> tuple[NDArray[np.floating], float]:
r"""
Computes the Kraskov, Stogbauer, Grassberger estimate of the total correlation using the second algorithm presented in Kraskov et. al., (2004).
.. math::
\hat{TC}(X) = \psi(k) - ((m-1)/k) - (m-1)\psi(N) - \langle \psi(n_{x_{1}}) + \ldots + \psi(n_{x_{N}}) \rangle
Parameters
----------
data : NDArray[np.floating]
Data array of shape (n_variables, n_samples)
k : int
Number of nearest neighbors
idxs : tuple[int, ...]
Indices of variables to use (-1 means all)
Returns
-------
NDArray[np.floating
The local total correlation for each sample.
float
The expected total correlation over all samples
References
----------
Kraskov, A., Stögbauer, H., & Grassberger, P. (2004).
Estimating mutual information.
Physical Review E, 69(6), 066138.
https://doi.org/10.1103/PhysRevE.69.066138
Watanabe, S. (1960).
Information Theoretical Analysis of Multivariate Correlation.
IBM Journal of Research and Development, 4(1), Article 1.
https://doi.org/10.1147/rd.41.0066
"""
idxs_: tuple[int, ...] = check_idxs(idxs, data)
m: int = len(idxs_)
N: int = data.shape[1]
psi_k: float = digamma(k)
psi_N: float = digamma(N)
ptw: NDArray[np.floating] = np.full(
(1, N), psi_k - ((m - 1) / k) + ((m - 1) * psi_N)
)
distances: NDArray[np.floating]
indices: NDArray[np.integer]
_, distances, indices = build_tree_and_get_distances(data[idxs_, :], k=k)
neighbors: NDArray[np.floating] = indices[:, 1:]
for idx in idxs_:
data_idx: NDArray[np.floating] = data[
(idx,),
:,
].T
eps: NDArray[np.floating] = np.full(N, -np.inf)
for j in range(k):
norm: NDArray[np.floating] = np.linalg.norm(
data_idx - data_idx[neighbors[:, j]], ord=np.inf, axis=1
)
eps = np.maximum(norm, eps)
tree, distances, _ = build_tree_and_get_distances(data_idx.T, k=k)
counts: NDArray[np.integer] = get_counts_from_tree(
tree, data_idx.T, eps, strict=False
)
ptw[0, :] -= digamma(counts)
return ptw, ptw.mean()
[docs]
def dual_total_correlation(
data: NDArray[np.floating], k: int, idxs: tuple[int, ...] | None = None
) -> tuple[NDArray[np.floating], float]:
"""
Compute dual total correlation using KSG estimation.
Code adapted from JIDT
https://github.com/jlizier/jidt/blob/master/java/source/infodynamics/measures/continuous/kraskov/DualTotalCorrelationCalculatorKraskov.java
Parameters
----------
data : NDArray[np.floating]
Data array of shape (n_variables, n_samples)
k : int
Number of nearest neighbors
idxs : tuple[int, ...]
Indices of variables to use (-1 means all)
Returns
-------
NDArray[np.floating
The local dual total correlation for each sample.
float
The expected dual total correlation over all samples
References
----------
Abdallah, S. A., & Plumbley, M. D. (2012).
A measure of statistical complexity based on predictive information with application to finite spin systems.
Physics Letters A, 376(4), 275–281.
https://doi.org/10.1016/j.physleta.2011.10.066
Rosas, F., Mediano, P. A. M., Gastpar, M., & Jensen, H. J. (2019).
Quantifying High-order Interdependencies via Multivariate Extensions of the Mutual Information.
Physical Review E, 100(3), Article 3.
https://doi.org/10.1103/PhysRevE.100.032305
"""
idxs_: tuple[int, ...] = check_idxs(idxs, data)
m: int = len(idxs_)
N: int = data.shape[1]
psi_k: float = digamma(k)
psi_N: float = digamma(N)
ptw: NDArray[np.floating] = np.full((1, N), psi_k - psi_N)
distances: NDArray[np.floating]
tree, distances, _ = build_tree_and_get_distances(data[idxs_, :], k=k)
eps: NDArray[np.floating] = distances[:, -1]
for i in range(m):
residual_idxs: list[int] = [idxs_[j] for j in range(m) if j != i]
residual_data: NDArray[np.floating] = data[residual_idxs, :]
residual_tree = cKDTree(residual_data.T)
counts: NDArray[np.integer] = get_counts_from_tree(
residual_tree, residual_data, eps
)
ptw[0, :] -= (digamma(counts + 1) - psi_N) / (m - 1)
ptw *= m - 1
return ptw, ptw.mean()