Probabilistic Inference

This module provides inference mechanisms for probabilistic models.

Summary

Inference Classes

ForwardInference

Forward inference engine for probabilistic models.

DeterministicInference

Deterministic forward inference for probabilistic graphical models.

AncestralSamplingInference

Ancestral sampling inference for probabilistic graphical models.

Class Documentation

class ForwardInference(probabilistic_model: ProbabilisticModel, graph_learner: BaseGraphLearner | None = None, *args, **kwargs)[source]

Bases: BaseInference

Forward inference engine for probabilistic models.

This class implements forward inference through a probabilistic model by topologically sorting variables and computing them in dependency order. It supports parallel computation within topological levels and can optionally use a learned graph structure.

The inference engine: - Automatically sorts variables in topological order - Computes variables level-by-level (variables at same depth processed in parallel) - Supports GPU parallelization via CUDA streams - Supports CPU parallelization via threading - Handles interventions via _InterventionWrapper

probabilistic_model

The probabilistic model to perform inference on.

Type:

ProbabilisticModel

graph_learner

Optional graph structure learner.

Type:

BaseGraphLearner

concept_map

Maps concept names to Variable objects.

Type:

Dict[str, Variable]

sorted_variables

Variables in topological order.

Type:

List[Variable]

levels

Variables grouped by topological depth.

Type:

List[List[Variable]]

Parameters:
  • probabilistic_model – The probabilistic model to perform inference on.

  • graph_learner – Optional graph learner for weighted adjacency structure.

Raises:

RuntimeError – If the model contains cycles (not a DAG).

Example

>>> import torch
>>> from torch.distributions import Bernoulli
>>> from torch_concepts import InputVariable, EndogenousVariable
>>> from torch_concepts.distributions import Delta
>>> from torch_concepts.nn import ForwardInference, ParametricCPD, ProbabilisticModel
>>>
>>> # Create a simple model: latent -> A -> B
>>> # Where A is a root concept and B depends on A
>>>
>>> # Define variables
>>> input_var = InputVariable('input', parents=[], distribution=Delta, size=10)
>>> var_A = EndogenousVariable('A', parents=['input'], distribution=Bernoulli, size=1)
>>> var_B = EndogenousVariable('B', parents=['A'], distribution=Bernoulli, size=1)
>>>
>>> # Define CPDs (modules that compute each variable)
>>> from torch.nn import Identity, Linear
>>> latent_cpd = ParametricCPD('input', parametrization=Identity())
>>> cpd_A = ParametricCPD('A', parametrization=Linear(10, 1))  # latent -> A
>>> cpd_B = ParametricCPD('B', parametrization=Linear(1, 1))   # A -> B
>>>
>>> # Create probabilistic model
>>> pgm = ProbabilisticModel(
...     variables=[input_var, var_A, var_B],
...     parametric_cpds=[latent_cpd, cpd_A, cpd_B]
... )
>>>
>>> # Create forward inference engine
>>> inference = ForwardInference(pgm)
>>>
>>> # Check topological order
>>> print([v.concepts[0] for v in inference.sorted_variables])
>>> # ['input', 'A', 'B']
>>>
>>> # Check levels (for parallel computation)
>>> for i, level in enumerate(inference.levels):
...     print(f"Level {i}: {[v.concepts[0] for v in level]}")
>>> # Level 0: ['input']
>>> # Level 1: ['A']
>>> # Level 2: ['B']
abstract get_results(results: tensor, parent_variable: Variable)[source]

Process the raw output tensor from a CPD.

This method should be implemented by subclasses to handle distribution-specific processing (e.g., sampling from Bernoulli, taking argmax from Categorical, etc.).

Parameters:
  • results – Raw output tensor from the CPD.

  • parent_variable – The variable being computed.

Returns:

Processed output tensor.

predict(external_inputs: Dict[str, Tensor], debug: bool = False, device: str = 'auto') Dict[str, Tensor][source]

Perform forward pass prediction across the entire probabilistic model.

This method processes variables level-by-level, exploiting parallelism within each level. On GPU, uses CUDA streams for parallel computation. On CPU, uses ThreadPoolExecutor.

Parameters:
  • external_inputs – Dictionary mapping root variable names to input tensors.

  • debug – If True, runs sequentially for easier debugging (disables parallelism).

  • device – Device to use for computation. Options: - ‘auto’ (default): Automatically detect and use CUDA if available, else CPU - ‘cuda’ or ‘gpu’: Force use of CUDA (will raise error if not available) - ‘cpu’: Force use of CPU even if CUDA is available

Returns:

Dictionary mapping concept names to their output tensors.

Raises:

RuntimeError – If device=’cuda’/’gpu’ is specified but CUDA is not available.

get_parent_kwargs(parametric_cpd, parent_input: List[Tensor] | Tensor | None = None, parent_endogenous: List[Tensor] | Tensor | None = None) Dict[str, Tensor][source]

Prepare keyword arguments for CPD forward pass based on parent outputs.

This method inspects the CPD’s forward signature and constructs appropriate kwargs, separating endogenous (from probabilistic parents) and latent features (from continuous parents).

Parameters:
  • parametric_cpd – The CPD module to call.

  • parent_input – List of continuous parent outputs (latent/exogenous).

  • parent_endogenous – List of probabilistic parent outputs (concept endogenous).

Returns:

Dictionary of kwargs ready for parametric_cpd.forward(**kwargs).

query(query_concepts: List[str], evidence: Dict[str, Tensor], debug: bool = False, device: str = 'auto') Tensor[source]

Execute forward pass and return only specified concepts concatenated.

This method runs full inference via predict() and then extracts and concatenates only the requested concepts in the specified order.

Parameters:
  • query_concepts – List of concept names to retrieve (e.g., [“C”, “B”, “A”]).

  • evidence – Dictionary of {root_concept_name: input_tensor}.

  • debug – If True, runs in debug mode (sequential execution).

  • device – Device to use for computation. Options: - ‘auto’ (default): Automatically detect and use CUDA if available, else CPU - ‘cuda’ or ‘gpu’: Force use of CUDA (will raise error if not available) - ‘cpu’: Force use of CPU even if CUDA is available

Returns:

Single tensor containing concatenated predictions for requested concepts, ordered as requested (Batch x TotalFeatures).

Raises:
  • ValueError – If requested concept was not computed.

  • RuntimeError – If batch sizes don’t match across concepts.

  • RuntimeError – If concatenation produces unexpected feature dimension.

  • RuntimeError – If device=’cuda’/’gpu’ is specified but CUDA is not available.

property available_query_vars: Set[str]

Get all variable names available for querying.

Returns:

Set of concept names that can be queried.

unrolled_probabilistic_model() ProbabilisticModel[source]

Build an ‘unrolled’ view of the ProbabilisticModel based on graph_learner adjacency.

This method creates a modified PGM that reflects the learned graph structure, applying rules for keeping/dropping CPDs based on root/non-root status and recursively pruning unused variables.

Rules: - For root columns (no incoming edges): keep row CPD, drop column CPD - For non-root columns: keep column CPD, drop row CPD - Recursively drop variables whose children are all dropped - Apply adjacency gating to remove zero-weight edges

Returns:

Modified ProbabilisticModel with unrolled structure.

Raises:
  • RuntimeError – If graph_learner is not set or lacks weighted_adj.

  • RuntimeError – If adjacency shape doesn’t match label lengths.

training: bool
class DeterministicInference(probabilistic_model: ProbabilisticModel, graph_learner: BaseGraphLearner | None = None, *args, **kwargs)[source]

Bases: ForwardInference

Deterministic forward inference for probabilistic graphical models.

This inference engine performs deterministic (maximum likelihood) inference by returning raw endogenous/outputs from CPDs without sampling. It’s useful for prediction tasks where you want the most likely values rather than samples from the distribution.

Inherits all functionality from ForwardInference but implements get_results() to return raw outputs without stochastic sampling.

Example

>>> import torch
>>> from torch.distributions import Bernoulli
>>> from torch_concepts import InputVariable, EndogenousVariable
>>> from torch_concepts.distributions import Delta
>>> from torch_concepts.nn import DeterministicInference, ParametricCPD, ProbabilisticModel, LinearCC
>>>
>>> # Create a simple PGM: latent -> A -> B
>>> input_var = InputVariable('input', parents=[], distribution=Delta, size=10)
>>> var_A = EndogenousVariable('A', parents=['input'], distribution=Bernoulli, size=1)
>>> var_B = EndogenousVariable('B', parents=['A'], distribution=Bernoulli, size=1)
>>>
>>> # Define CPDs
>>> from torch.nn import Identity, Linear
>>> cpd_emb = ParametricCPD('input', parametrization=Identity())
>>> cpd_A = ParametricCPD('A', parametrization=Linear(10, 1))
>>> cpd_B = ParametricCPD('B', parametrization=LinearCC(1, 1))
>>>
>>> # Create probabilistic model
>>> pgm = ProbabilisticModel(
...     variables=[input_var, var_A, var_B],
...     parametric_cpds=[cpd_emb, cpd_A, cpd_B]
... )
>>>
>>> # Create deterministic inference engine
>>> inference = DeterministicInference(pgm)
>>>
>>> # Perform inference - returns endogenous, not samples
>>> x = torch.randn(4, 10)  # batch_size=4, latent_size=10
>>> results = inference.predict({'input': x})
>>>
>>> # Results contain raw endogenous for Bernoulli variables
>>> print(results['A'].shape)  # torch.Size([4, 1]) - endogenous, not {0,1}
>>> print(results['B'].shape)  # torch.Size([4, 1]) - endogenous, not {0,1}
>>>
>>> # Query specific concepts - returns concatenated endogenous
>>> output = inference.query(['B', 'A'], evidence={'input': x})
>>> print(output.shape)  # torch.Size([4, 2])
>>> # output contains [logit_B, logit_A] for each sample
>>>
>>> # Convert endogenous to probabilities if needed
>>> prob_A = torch.sigmoid(results['A'])
>>> print(prob_A.shape)  # torch.Size([4, 1])
>>>
>>> # Get hard predictions (0 or 1)
>>> pred_A = (prob_A > 0.5).float()
>>> print(pred_A)  # Binary predictions
get_results(results: tensor, parent_variable: Variable) Tensor[source]

Return raw output without sampling.

Parameters:
  • results – Raw output tensor from the CPD.

  • parent_variable – The variable being computed (unused in deterministic mode).

Returns:

Raw output tensor (endogenous for probabilistic variables).

Return type:

torch.Tensor

training: bool
class AncestralSamplingInference(probabilistic_model: ProbabilisticModel, graph_learner: BaseGraphLearner | None = None, log_probs: bool = True, **dist_kwargs)[source]

Bases: ForwardInference

Ancestral sampling inference for probabilistic graphical models.

This inference engine performs ancestral (forward) sampling by drawing samples from the distributions defined by each variable. It’s useful for generating realistic samples from the model and for tasks requiring stochastic predictions.

The sampling respects the probabilistic structure: - Samples from Bernoulli distributions using .sample() - Uses reparameterization (.rsample()) for RelaxedBernoulli and RelaxedOneHotCategorical - Supports custom distribution kwargs (e.g., temperature for Gumbel-Softmax)

Parameters:
  • probabilistic_model – The probabilistic model to perform inference on.

  • graph_learner – Optional graph learner for weighted adjacency structure.

  • **dist_kwargs – Additional kwargs passed to distribution constructors (e.g., temperature for relaxed distributions).

Example

>>> import torch
>>> from torch.distributions import Bernoulli
>>> from torch_concepts import InputVariable
>>> from torch_concepts.distributions import Delta
>>> from torch_concepts.nn import AncestralSamplingInference, ParametricCPD, ProbabilisticModel
>>> from torch_concepts import EndogenousVariable
>>> from torch_concepts.nn import LinearCC
>>>
>>> # Create a simple PGM: embedding -> A -> B
>>> embedding_var = InputVariable('embedding', parents=[], distribution=Delta, size=10)
>>> var_A = EndogenousVariable('A', parents=['embedding'], distribution=Bernoulli, size=1)
>>> var_B = EndogenousVariable('B', parents=['A'], distribution=Bernoulli, size=1)
>>>
>>> # Define CPDs
>>> from torch.nn import Identity, Linear
>>> cpd_emb = ParametricCPD('embedding', parametrization=Identity())
>>> cpd_A = ParametricCPD('A', parametrization=Linear(10, 1))
>>> cpd_B = ParametricCPD('B', parametrization=LinearCC(1, 1))
>>>
>>> # Create probabilistic model
>>> pgm = ProbabilisticModel(
...     variables=[embedding_var, var_A, var_B],
...     parametric_cpds=[cpd_emb, cpd_A, cpd_B]
... )
>>>
>>> # Create ancestral sampling inference engine
>>> inference = AncestralSamplingInference(pgm)
>>>
>>> # Perform inference - returns samples, not endogenous
>>> x = torch.randn(4, 10)  # batch_size=4, embedding_size=10
>>> results = inference.predict({'embedding': x})
>>>
>>> # Results contain binary samples {0, 1} for Bernoulli variables
>>> print(results['A'].shape)  # torch.Size([4, 1])
>>> print(results['A'].unique())  # tensor([0., 1.]) - actual samples
>>> print(results['B'].shape)  # torch.Size([4, 1])
>>> print(results['B'].unique())  # tensor([0., 1.]) - actual samples
>>>
>>> # Query specific concepts - returns concatenated samples
>>> samples = inference.query(['B', 'A'], evidence={'embedding': x})
>>> print(samples.shape)  # torch.Size([4, 2])
>>> # samples contains [sample_B, sample_A] for each instance
>>> print(samples)  # All values are 0 or 1
>>>
>>> # Multiple runs produce different samples (stochastic)
>>> samples1 = inference.query(['A'], evidence={'embedding': x})
>>> samples2 = inference.query(['A'], evidence={'embedding': x})
>>> print(torch.equal(samples1, samples2))  # Usually False (different samples)
>>>
>>> # With relaxed distributions (requires temperature)
>>> from torch.distributions import RelaxedBernoulli
>>> var_A_relaxed = InputVariable('A', parents=['embedding'],
...                               distribution=RelaxedBernoulli, size=1)
>>> pgm = ProbabilisticModel(
...     variables=[embedding_var, var_A_relaxed, var_B],
...     parametric_cpds=[cpd_emb, cpd_A, cpd_B]
... )
>>> inference_relaxed = AncestralSamplingInference(pgm, temperature=0.05)
>>> # Now uses reparameterization trick (.rsample())
>>>
>>> # Query returns continuous values in [0, 1] for relaxed distributions
>>> relaxed_samples = inference_relaxed.query(['A'], evidence={'embedding': x})
>>> # relaxed_samples will be continuous, not binary
get_results(results: tensor, parent_variable: Variable) Tensor[source]

Sample from the distribution parameterized by the results.

This method creates a distribution using the variable’s distribution type and the computed endogenous/parameters, then draws a sample.

Parameters:
  • results – Raw output tensor from the CPD (endogenous or parameters).

  • parent_variable – The variable being computed (defines distribution type).

Returns:

Sampled values from the distribution.

Return type:

torch.Tensor

training: bool