Source code for syntropy.gaussian.decompositions

import numpy as np
import networkx as nx

from .shannon import (
    local_differential_entropy,
    mutual_information,
)

from .utils import check_cov
from ..utils import make_powerset
from ..lattices import load_lattice, mobius_inversion
from numpy.typing import NDArray
from typing import Callable, Any

Atom = tuple[tuple[int, ...], ...]


# %%
def unpack_atom(atom: Atom) -> set[int, ...]:
    """
    A utitlity function to unpack tuples.
    """
    varset: set[int, ...] = set()
    for s in {*atom}:
        varset.update(set(s))

    return varset


def local_precompute_sources(
    data: NDArray[np.floating], cov: NDArray[np.floating] | None = None
):
    """
    A utility function that pre-computes the local entropies
    of every subset of the data. This speeds up the PID/GID/PED
    by orders of magnitude.

    Parameters
    ----------
    data : NDArray[np.floating]
        The data in channels x samples format.
    cov : NDArray[np.floating], optional
        The covariance matrix of the data.
        The default is COV_NULL.

    Returns
    -------
    dict
        A dictionary of the local entropies for every source.

    """
    cov_: NDArray[np.floating] = check_cov(cov, data)

    N: int = data.shape[0]

    joint: tuple[int, ...] = tuple(i for i in range(N))
    sources: list[tuple[int, ...]] = list(make_powerset(joint))
    sources.remove(())

    return {
        source: local_differential_entropy(data[source, :], cov_[source, :][:, source])
        for source in sources
    }


def hmin_differential_redundancy(
    atom: Atom,
    sources: dict,
) -> NDArray[np.floating]:
    """
    For a collection of sources :math:`\\alpha=\\{a_1, a_2, \\ldots, a_k\\}`, computes
    the redundnat entropy shared by all sources as:

    :math:`h_{\\cap}^{min}(\\alpha) = \\min_{i}(a_{i})`

    Parameters
    ----------
    atom : tuple
        The partial information atom. In the form :math:`((a_1,),(a_2,)\\ldots)`.
    sources : dict
        The pre-computed collection of sources returned by 
        `precompute_local_entropies`.

    Returns
    -------
    NDArray[np.floating]
        The local redundancies for each sample.

    References
    ----------
    Finn, C., & Lizier, J. T. (2020).
    Generalised Measures of Multivariate Information Content.
    Entropy, 22(2), Article 2.
    https://doi.org/10.3390/e22020216


    """
    N: int = sources[atom[0]].shape[1]
    h_min: NDArray[np.floating] = np.repeat(np.inf, N)
    source: tuple = tuple()

    for source in atom:
        # i+ = min(h(source))
        h_min = np.minimum(h_min, sources[source])

    return h_min


def mmi_differential_redundancy(
    atom: Atom,
    inputs: tuple[int, ...],
    target: tuple[int, ...],
    cov: NDArray[np.floating],
    single_target_flag: bool = True,
) -> float:
    mn: float = np.inf
    if single_target_flag is True:
        for idxs_x in atom:
            mi = mutual_information(idxs_x=idxs_x, idxs_y=target, cov=cov)
            if mi < mn:
                mn = mi
    elif single_target_flag is False:
        atom_inputs = atom[0]
        atom_targets = list()

        for x in atom[1]:
            atom_targets.append(tuple(target[i] for i in x))
        atom_targets = tuple(atom_targets)

        for idxs_x in atom_inputs:
            for idxs_y in atom_targets:
                mi = mutual_information(idxs_x=idxs_x, idxs_y=idxs_y, cov=cov)
                if mi < mn:
                    mn = mi

    return mn


def ipm_differential_redundancy(
    atom: Atom,
    inputs: tuple[int, ...],
    target: tuple[int, ...],
    sources: dict[tuple[int, ...], NDArray[np.floating]],
    single_target_flag: bool = True,
) -> NDArray[np.floating]:
    """

    Parameters
    ----------
    atom : Atom
        The partial information or integrated information atom.

    inputs : tuple[int, ...]
        The indices of the input elements.

    target : tuple[int, ...]
        The indices of the target element(s)

    sources : dict[tuple[int, ...], NDArray[np.floating]]
        The local entropies retruned by `precompute_local_entropies`

    single_target_flag : bool
        Whether to do a single-target PID or multi-target PhiID.

    Returns
    -------
    NDArray[np.floating]


    """
    joint: tuple[int, ...] = inputs + target
    num_inputs: int = len(inputs)
    num_target: int = len(target)

    target_: tuple[int, ...] = tuple(
        i for i in range(num_inputs, num_inputs + num_target)
    )

    if single_target_flag is True:
        mn_inputs = hmin_differential_redundancy(atom=atom, sources=sources)
        mn_target = sources[target_]
        mn_joint = hmin_differential_redundancy(
            tuple(x + target_ for x in atom), sources=sources
        )
    elif single_target_flag is False:
        atom_inputs = atom[0]
        atom_targets = list()
        for x in atom[1]:
            atom_targets.append(tuple(target_[i] for i in x))
        atom_targets = tuple(atom_targets)

        mn_inputs = hmin_differential_redundancy(atom=atom_inputs, sources=sources)
        mn_target = hmin_differential_redundancy(atom=atom_targets, sources=sources)
        mn_joint = np.repeat(np.inf, repeats=mn_inputs.shape[1])
        for s1 in atom_inputs:
            for s2 in atom_targets:
                joint = s1 + s2
                mn_joint = np.minimum(mn_joint, sources[joint])

    diff = mn_inputs + mn_target - mn_joint

    return diff


def _pid(
    inputs: tuple[int, ...],
    target: tuple[int, ...],
    data: NDArray[np.floating] | None,
    cov: NDArray[np.floating] | None = None,
    redundancy_function: str = "ipm",
    single_target_flag: bool = True,
) -> dict[Atom, float] | tuple[dict[Atom, float], dict[Atom, NDArray[np.floating]]]:
    """
    A utility function that computes the guts of the PID/PhiID, depending 
    the value of single_target_flag. 

    Parameters
    ----------
    inputs : tuple[int, ...]
        The indices of the input elements. 
        
    target : tuple[int, ...]
        The indices of the target elements. 
        
    data : NDArray[np.floating]
        The numpy array, assumed to be in channels x time format.
    
    cov : NDArray[np.floating], optional
        The covariance matrix. If none is not provided, it is computed
        from the data directly.
        
    redundancy_function : str
        The redundancy function. 
        Options are `ipm` and `mmi`

    single_target_flag : bool
        Whether to do the PID or PhiID. 

    Returns
    -------
    dict[Atom, float] | tuple[dict[Atom, float], dict[Atom, NDArray[np.floating]]]


    """
    assert redundancy_function in {"ipm", "mmi"}, (
        "The available redundancy functions are Finn and Lizier's ipm and the minimum mutual information."
    )
    if redundancy_function == "ipm":
        assert data is not None, (
            "You must provide data for the ipm redundancy function."
        )

    cov_ = check_cov(cov, data)

    joint = inputs + target
    num_inputs = len(inputs)
    assert num_inputs in (2, 3, 4), (
        "Currently, syntropy only supports PIDs on 2, 3, and 4 inputs."
    )

    num_target = len(target)
    if single_target_flag is False:
        assert num_target in (2, 3), (
            "Currently syntropy only supports \Phi-IDs on 2, and 3 targets."
        )
        lattice: nx.DiGraph = load_lattice(num_inputs=num_inputs, num_target=num_target)
    elif single_target_flag is True:
        lattice: nx.DiGraph = load_lattice(num_inputs=num_inputs)

    kwargs: dict = {
        "inputs": inputs,
        "target": target,
        "single_target_flag": single_target_flag,
    }

    if redundancy_function == "mmi":
        redundancy_func: Callable = mmi_differential_redundancy
        kwargs["cov"] = cov_
        result: nx.DiGraph = mobius_inversion(redundancy_func, lattice, kwargs)

        avg: dict[Atom, float] = {
            node: result.nodes[node]["pi"] for node in result.nodes
        }

        return avg
    elif redundancy_function == "ipm":
        redundancy_func: Callable = ipm_differential_redundancy
        sources = local_precompute_sources(
            data=data[joint, :], cov=cov_[np.ix_(joint, joint)]
        )
        kwargs["sources"] = sources
        result: nx.DiGraph = mobius_inversion(redundancy_func, lattice, kwargs)

        ptw: dict[Atom, NDArray[np.floating]] = {
            node: result.nodes[node]["pi"] for node in result.nodes
        }
        avg: dict[Atom, float] = {key: ptw[key].mean() for key in ptw.keys()}

        return ptw, avg


[docs] def partial_information_decomposition( inputs: tuple[int, ...], target: tuple[int, ...], data: NDArray[np.floating] | None, cov: NDArray[np.floating] | None = None, redundancy_function: str = "ipm", ) -> Any: """ Computes the partial information decomposition for up to four input variables onto one (potentially joint) target variable. The available redundancy functions are :math:`MMI` and :math:`i_{\\min}` from Finn and Lizier. .. math:: i_{pm}(\\alpha;t) &= \\min h(\\alpha_i) - \min h(\\alpha_i|t) \\\\ i_{MMI}(\\alpha;t) &= \\min_i I(\\alpha_i;T) Parameters ---------- inputs : tuple[int, ...] The indices of the input elements. target : tuple[int, ...] The indices of the target elements. data : NDArray[np.floating] The numpy array, assumed to be in channels x time format. cov : NDArray[np.floating], optional The covariance matrix. If none is not provided, it is computed from the data directly. redundancy_function : str The redundancy function. Options are `ipm` and `mmi` Returns ------- Any References ---------- Williams, P. L., & Beer, R. D. (2010). Nonnegative Decomposition of Multivariate Information. arXiv:1004.2515 [Math-Ph, Physics:Physics, q-Bio]. http://arxiv.org/abs/1004.2515 Finn, C., & Lizier, J. T. (2018). Pointwise Partial Information Decomposition Using the Specificity and Ambiguity Lattices. Entropy, 20(4), Article 4. https://doi.org/10.3390/e20040297 """ return _pid( inputs=inputs, target=target, data=data, cov=cov, redundancy_function=redundancy_function, single_target_flag=True, )
[docs] def integrated_information_decomposition( inputs: tuple[int, ...], target: tuple[int, ...], data: NDArray[np.floating] | None, cov: NDArray[np.floating] | None = None, redundancy_function: str = "ipm", ) -> Any: """ Computes the integrated information decomposition introduced by Rosas, Mediano, et al. The PhiID relaxes the requirement of only having a single target, and instead allows for redundant-redundant, synergistic-synergistic, etc interactions. Available redundancy functions are: .. math:: i_{pm}(\\alpha;\\beta) &= \\min_i h(\\alpha_i) + \\min_i h(\\beta_i) - \min h(\\alpha_i, \\beta_i) \\\\ i_{MMI}(\\alpha;\\beta) &= \\min_{ij} I(\\alpha_i;\\beta_j) Parameters ---------- inputs : tuple[int, ...] The indices of the input elements. target : tuple[int, ...] The indices of the target elements. data : NDArray[np.floating] The numpy array, assumed to be in channels x time format. cov : NDArray[np.floating], optional The covariance matrix. If none is not provided, it is computed from the data directly. redundancy_function : str The redundancy function. Options are `ipm` and `mmi` Returns ------- Any References ---------- Mediano, P. A. M., Rosas, F. E., Luppi, A. I., Carhart-Harris, R. L., Bor, D., Seth, A. K., & Barrett, A. B. (2025). Toward a unified taxonomy of information dynamics via Integrated Information Decomposition. Proceedings of the National Academy of Sciences, 122(39), e2423297122. https://doi.org/10.1073/pnas.2423297122 Rosas, F. E., Mediano, P. A. M., Jensen, H. J., Seth, A. K., Barrett, A. B., Carhart-Harris, R. L., & Bor, D. (2020). Reconciling emergences: An information-theoretic approach to identify causal emergence in multivariate data. PLOS Computational Biology, 16(12), Article 12. https://doi.org/10.1371/journal.pcbi.1008289 """ return _pid( inputs=inputs, target=target, data=data, cov=cov, redundancy_function=redundancy_function, single_target_flag=False, )
[docs] def partial_entropy_decomposition( data: NDArray[np.floating], inputs: tuple[int, ...] | None = None, cov: NDArray[np.floating] | None = None, ) -> tuple[dict[Atom, NDArray[np.floating]], dict[Atom, float]]: """ Computes the partial entropy decomposition of a joint distribution with up to four elements. Uses the Gaussian hmin estimator. See: Parameters ---------- inputs : tuple[int, ...] The indices of the elements to analyze. data : NDArray[np.floating] The numpy array, assumed to be in channels x time format. cov : NDArray[np.floating], optional The covariance matrix. If none is not provided, it is computed from the data directly. Returns ------- ptw : dict The dictionary of local values for each partial entropy atom. The local values are represented by a numpy array of the same length as the data array. avg : dict The dictionary of expected values for each partial entropy atom. References ---------- Finn, C., & Lizier, J. T. (2020). Generalised Measures of Multivariate Information Content. Entropy, 22(2), Article 2. https://doi.org/10.3390/e22020216 """ cov_: NDArray[np.floating] = check_cov(cov, data) if inputs is None: inputs = tuple(i for i in range(data.shape[0])) num_inputs: int = len(inputs) lattice = load_lattice(num_inputs=num_inputs) sources: dict[tuple[int, ...], NDArray[np.floating]] = local_precompute_sources( data=data, cov=cov_ ) kwargs = {"sources": sources} result = mobius_inversion(hmin_differential_redundancy, lattice, kwargs) ptw = {node: result.nodes[node]["pi"] for node in result.nodes} avg = {key: ptw[key].mean() for key in ptw.keys()} return ptw, avg
[docs] def generalized_information_decomposition( inputs: tuple[int, ...], data: NDArray[np.floating], cov_posterior: NDArray[np.floating], cov_prior: NDArray[np.floating], ) -> tuple[dict[Atom, NDArray[np.floating]], dict[Atom, float]]: """ Computes the generalized information decomposition from Varley et al. The GID is a decomposition of the Kullback-Leibler divergence of a posterior distribution from a prior distribution. The available redundancy function is the Gaussian hmin. Parameters ---------- inputs : tuple[int, ...] The indices of the elements to analyze. data : NDArray[np.floating] The numpy array, assumed to be in channels x time format. cov_posterior : NDArray[np.floating] The covariance matrix that defines the prior distribution. cov_prior : NDArray[np.floating] The covariance matrix that defines the posterior distribution. Returns ------- ptw : dict The dictionary of local values for each partial entropy atom. The local values are represented by a numpy array of the same length as the data array. avg : dict The dictionary of expected values for each partial entropy atom. References ---------- Varley, T. F. (2024). Generalized decomposition of multivariate information. PLOS ONE, 19(2), e0297128. https://doi.org/10.1371/journal.pone.0297128 """ ptw_prior: dict[Atom, NDArray[np.floating]] ptw_prior, _ = partial_entropy_decomposition( data=data, inputs=inputs, cov=cov_prior ) ptw_posterior: dict[Atom, NDArray[np.floating]] ptw_posterior, _ = partial_entropy_decomposition( data=data, inputs=inputs, cov=cov_posterior ) ptw: dict[Atom, NDArray[np.floating]] = { key: ptw_prior[key] - ptw_posterior[key] for key in ptw_prior.keys() } avg: dict[Atom, float] = {key: ptw[key].mean() for key in ptw.keys()} return ptw, avg
[docs] def representational_complexity( avg: dict[Atom, float], comparator: Callable = min ) -> float: """ Computes the representational complexity of a given partial information or entropy lattice. The representational complexity is a measure of how much partial information atoms of a given degree of synergy contribute to the overall mutual information or entropy. Parameters ---------- avg : dict The dictionary of partial information/entropy atoms. Returned from any of the above functions. comparator : function, optional Whether to consider the minimum complexity of an atom. or the maximum complexity of an atom. Options are: min, max, np.min, np.max. The default is min, following the original work by Ehrlich et al.,. Returns ------- float The representational complexity. References ---------- Ehrlich, D. A., Schneider, A. C., Priesemann, V., Wibral, M., & Makkeh, A. (2023). A Measure of the Complexity of Neural Representations based on Partial Information Decomposition. Transactions on Machine Learning Research. https://openreview.net/forum?id=R8TU3pfzFr """ assert comparator in (min, max, np.min, np.max), "The comparator must be min or max" rc: float = 0.0 atom: Atom for atom in avg.keys(): rc += avg[atom] * comparator(len(source) for source in atom) return rc / sum(avg.values())
[docs] def idep_partial_information_decomposition( inputs: tuple[tuple[int, ...], tuple[int, ...]], target: tuple[int, ...], cov: NDArray[np.floating] | None, data: NDArray[np.floating] | None = None ) -> dict[str, float]: """ Computes the I_dep partial information decomposition for Gaussian systems using the dependency constraint method from Kay & Ince (2018). Currently only supports 2 predictors (univariate or multivariate). Adapted from: https://github.com/robince/partial-info-decomp/blob/master/calc_pi_Idep_mvn.m Parameters ---------- inputs: tuple[tuple[int, ...], tuple[int,...]], The indices of the two predictor variables/sets. Must have length 2. target : tuple[int, ...] The indices of the target variable(s). data : NDArray[np.floating] | None The data in channels x samples format. Optional if cov provided. cov : NDArray[np.floating] | None The covariance matrix. If None, computed from data. Returns ------- dict[str, float] Dictionary with keys: 'unq0', 'unq1', 'red', 'syn' References ---------- Kay, J. W., & Ince, R. A. A. (2018). Exact Partial Information Decompositions for Gaussian Systems Based on Dependency Constraints. Entropy, 20(4), 240. https://doi.org/10.3390/e20040240 """ assert len(inputs) == 2, "I_dep currently only supports 2 predictors" if cov is None: assert data is not None, "You must provide something" cov = np.cov(data, ddof=0) # Extract indices input_0 = inputs[0] input_1 = inputs[1] n0, n1, nt = len(input_0), len(input_1), len(target) # Extract block covariances C_00 = cov[np.ix_(input_0, input_0)] C_11 = cov[np.ix_(input_1, input_1)] C_tt = cov[np.ix_(target, target)] C_01 = cov[np.ix_(input_0, input_1)] C_0t = cov[np.ix_(input_0, target)] C_1t = cov[np.ix_(input_1, target)] # Cholesky decomposition (upper triangular) # Note: np.linalg.cholesky returns lower triangular, so we transpose chol_00 = np.linalg.cholesky(C_00).T chol_11 = np.linalg.cholesky(C_11).T chol_tt = np.linalg.cholesky(C_tt).T # Compute P, Q, R using Kay & Ince Appendix D (Equation A5) # P = inv(chol_xx)' * Cxy * inv(chol_yy) inv_chol_00 = np.linalg.inv(chol_00) inv_chol_11 = np.linalg.inv(chol_11) inv_chol_tt = np.linalg.inv(chol_tt) P = inv_chol_00.T @ C_01 @ inv_chol_11 Q = inv_chol_00.T @ C_0t @ inv_chol_tt R = inv_chol_11.T @ C_1t @ inv_chol_tt # Build standardized covariance matrix for MI calculations # Compute basic mutual informations on ORIGINAL covariance mi_x0_y = mutual_information(idxs_x=input_0, idxs_y=target, cov=cov) mi_x1_y = mutual_information(idxs_x=input_1, idxs_y=target, cov=cov) mi_x01_y = mutual_information(idxs_x=input_0 + input_1, idxs_y=target, cov=cov) # Compute edge values using TRUE identity matrices edge_b = mi_x0_y # Edge i: using identity matrices (Kay & Ince Table 9) eye_n1 = np.eye(n1) eye_n2 = np.eye(nt) numerator_det = np.linalg.slogdet(eye_n1 - R @ Q.T @ Q @ R.T)[1] denom1_det = np.linalg.slogdet(eye_n2 - Q.T @ Q)[1] denom2_det = np.linalg.slogdet(eye_n2 - R.T @ R)[1] edge_i = 0.5 * (numerator_det - denom1_det - denom2_det) - mi_x1_y # Edge k: build standardized Sigma_Z and compute its determinant Sigma_Z = np.zeros((n0 + n1 + nt, n0 + n1 + nt)) Sigma_Z[:n0, :n0] = np.eye(n0) Sigma_Z[n0 : n0 + n1, n0 : n0 + n1] = np.eye(n1) Sigma_Z[n0 + n1 :, n0 + n1 :] = np.eye(nt) Sigma_Z[:n0, n0 : n0 + n1] = P Sigma_Z[n0 : n0 + n1, :n0] = P.T Sigma_Z[:n0, n0 + n1 :] = Q Sigma_Z[n0 + n1 :, :n0] = Q.T Sigma_Z[n0 : n0 + n1, n0 + n1 :] = R Sigma_Z[n0 + n1 :, n0 : n0 + n1] = R.T k_num = 0.5 * np.linalg.slogdet(np.eye(n1) - P.T @ P)[1] k_den = 0.5 * np.linalg.slogdet(Sigma_Z)[1] edge_k = k_num - k_den - mi_x1_y # Unique information from X0 is minimum across all edges adding X0Y unq0 = min(edge_b, edge_i, edge_k) # Derive other components red = mi_x0_y - unq0 unq1 = mi_x1_y - red syn = mi_x01_y - mi_x1_y - unq0 return { "unq0": unq0, "unq1": unq1, "red": red, "syn": syn, # # For debugging/analysis # 'I_X0_Y': mi_x0_y, # 'I_X1_Y': mi_x1_y, # 'I_X0X1_Y': mi_x01_y, # 'edges': {'b': edge_b, 'i': edge_i, 'k': edge_k} }