Skip to content

core

lacuna.core

Core data structures and utilities for the lesion decoding toolkit.

AnalysisError

Bases: LacunaError, RuntimeError

Raised when analysis computation fails.

Source code in src/lacuna/core/exceptions.py
class AnalysisError(LacunaError, RuntimeError):
    """Signal that an analysis computation could not be completed.

    Derives from both ``LacunaError`` and ``RuntimeError``, so handlers
    written against either base class will catch it.
    """

BIDSValidationError

Bases: LacunaError, ValueError

Raised when BIDS dataset structure is invalid.

Source code in src/lacuna/core/exceptions.py
class BIDSValidationError(LacunaError, ValueError):
    """Signal that a BIDS dataset does not have a valid structure.

    Derives from both ``LacunaError`` and ``ValueError``, so handlers
    written against either base class will catch it.
    """

ConnectivityMatrix dataclass

Bases: DataContainer

Result container for connectivity matrices.

Stores a single connectivity matrix with optional region labels.

Attributes:

Name Type Description
name str

Name/identifier for this result

matrix ndarray

Connectivity matrix (N x N)

region_labels list of str, optional

Labels for matrix rows/columns

matrix_type (str, optional)

Type of connectivity ("structural", "functional")

metadata dict

Additional metadata about the output

Examples:

>>> import numpy as np
>>> # Create a structural connectivity matrix
>>> conn_matrix = np.array([
...     [1.0, 0.8, 0.3],
...     [0.8, 1.0, 0.5],
...     [0.3, 0.5, 1.0]
... ])
>>> conn = ConnectivityMatrix(
...     name="structural_connectivity",
...     matrix=conn_matrix,
...     region_labels=["V1", "V2", "MT"],
...     matrix_type="structural"
... )
>>> print(conn.summary())
structural_connectivity: 3x3, type=structural
Source code in src/lacuna/core/data_types.py
@dataclass
class ConnectivityMatrix(DataContainer):
    """Result container for connectivity matrices.

    Stores a single square connectivity matrix with optional region labels.
    The matrix is validated on construction: it must be two-dimensional and
    square, and when ``region_labels`` is given there must be exactly one
    label per row/column.

    Attributes
    ----------
    name : str
        Name/identifier for this result
    matrix : np.ndarray
        Connectivity matrix (N x N). Array-likes such as nested lists are
        converted with ``np.asanyarray``; existing arrays are kept as-is.
    region_labels : list of str, optional
        Labels for matrix rows/columns
    matrix_type : str, optional
        Type of connectivity ("structural", "functional")
    metadata : dict
        Additional metadata about the output

    Raises
    ------
    ValueError
        If the matrix is not 2D, is not square, or the number of region
        labels does not match the matrix size.

    Examples
    --------
    >>> import numpy as np
    >>> # Create a structural connectivity matrix
    >>> conn_matrix = np.array([
    ...     [1.0, 0.8, 0.3],
    ...     [0.8, 1.0, 0.5],
    ...     [0.3, 0.5, 1.0]
    ... ])

    >>> conn = ConnectivityMatrix(
    ...     name="structural_connectivity",
    ...     matrix=conn_matrix,
    ...     region_labels=["V1", "V2", "MT"],
    ...     matrix_type="structural"
    ... )
    >>> print(conn.summary())
    structural_connectivity: 3x3, type=structural
    """

    name: str
    matrix: np.ndarray
    region_labels: list[str] | None = None
    matrix_type: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        """Initialize the base container and validate the matrix."""
        super().__init__(name=self.name, metadata=self.metadata)

        # Accept array-likes (e.g. nested lists) instead of failing with an
        # AttributeError on ``.ndim``. ndarray instances, including
        # subclasses, pass through ``asanyarray`` unchanged.
        self.matrix = np.asanyarray(self.matrix)

        # Validate matrix shape
        if self.matrix.ndim != 2:
            raise ValueError(f"Matrix must be 2D, got shape {self.matrix.shape}")
        n_rows, n_cols = self.matrix.shape
        if n_rows != n_cols:
            raise ValueError(f"Matrix must be square, got shape {self.matrix.shape}")

        # Validate labels if provided
        if self.region_labels is not None and len(self.region_labels) != n_rows:
            raise ValueError(
                f"Number of labels ({len(self.region_labels)}) "
                f"must match matrix size ({n_rows})"
            )

    def __eq__(self, other: object) -> bool:
        """Compare two containers field by field.

        The dataclass-generated ``__eq__`` evaluates the truth value of
        ``matrix == matrix``, which raises ``ValueError`` for arrays with
        more than one element. The matrix is therefore compared with
        ``np.array_equal``; the remaining fields use ordinary equality.
        """
        if other.__class__ is not self.__class__:
            return NotImplemented
        return (
            self.name == other.name
            and self.region_labels == other.region_labels
            and self.matrix_type == other.matrix_type
            and self.metadata == other.metadata
            and np.array_equal(self.matrix, other.matrix)
        )

    def get_data(self) -> np.ndarray:
        """Get connectivity matrix.

        Returns
        -------
        np.ndarray
            The stored matrix itself (not a copy).
        """
        return self.matrix

    def summary(self) -> str:
        """Get a summary description of this result.

        Returns
        -------
        str
            ``"<name>: NxN"``, followed by ``", type=<matrix_type>"`` when a
            matrix type is set.
        """
        n_regions = self.matrix.shape[0]
        type_info = f", type={self.matrix_type}" if self.matrix_type else ""
        return f"{self.name}: {n_regions}x{n_regions}{type_info}"

    def __repr__(self) -> str:
        """Return string representation."""
        type_str = f", type='{self.matrix_type}'" if self.matrix_type else ""
        return f"ConnectivityMatrix(name='{self.name}', shape={self.matrix.shape}{type_str})"

__post_init__()

Initialize and validate matrix.

Source code in src/lacuna/core/data_types.py
def __post_init__(self):
    """Initialize the base container and validate the matrix.

    Raises
    ------
    ValueError
        If the matrix is not 2D, is not square, or the number of region
        labels does not match the matrix size.
    """
    super().__init__(name=self.name, metadata=self.metadata)

    # Accept array-likes (e.g. nested lists) instead of failing with an
    # AttributeError on ``.ndim``. ndarray instances, including subclasses,
    # pass through ``asanyarray`` unchanged.
    self.matrix = np.asanyarray(self.matrix)

    # Validate matrix shape
    if self.matrix.ndim != 2:
        raise ValueError(f"Matrix must be 2D, got shape {self.matrix.shape}")
    n_rows, n_cols = self.matrix.shape
    if n_rows != n_cols:
        raise ValueError(f"Matrix must be square, got shape {self.matrix.shape}")

    # Validate labels if provided
    if self.region_labels is not None and len(self.region_labels) != n_rows:
        raise ValueError(
            f"Number of labels ({len(self.region_labels)}) "
            f"must match matrix size ({n_rows})"
        )

__repr__()

Return string representation.

Source code in src/lacuna/core/data_types.py
def __repr__(self) -> str:
    """Build a compact description showing name, shape and optional type."""
    fields = [f"name='{self.name}'", f"shape={self.matrix.shape}"]
    if self.matrix_type:
        fields.append(f"type='{self.matrix_type}'")
    return "ConnectivityMatrix(" + ", ".join(fields) + ")"

get_data()

Get connectivity matrix.

Source code in src/lacuna/core/data_types.py
def get_data(self) -> np.ndarray:
    """Return the stored connectivity matrix.

    The array is handed back as-is (no copy is made).
    """
    connectivity = self.matrix
    return connectivity

summary()

Get a summary description of this result.

Source code in src/lacuna/core/data_types.py
def summary(self) -> str:
    """Describe the matrix as ``"<name>: NxN"`` plus an optional type suffix."""
    size = self.matrix.shape[0]
    text = f"{self.name}: {size}x{size}"
    if self.matrix_type:
        # Only mention the type when one was provided.
        text += f", type={self.matrix_type}"
    return text

CoordinateSpaceError

Bases: LacunaError, ValueError

Raised when operations require specific coordinate space.

Source code in src/lacuna/core/exceptions.py
class CoordinateSpaceError(LacunaError, ValueError):
    """Signal that an operation requires a specific coordinate space.

    Derives from both ``LacunaError`` and ``ValueError``, so handlers
    written against either base class will catch it.
    """

DataContainer

Bases: ABC

Abstract base class for unified data type containers.

This is the base class for all data container types. It provides common functionality for metadata management and a consistent interface for accessing data.

Subclasses implement specific data types:

- VoxelMap: For 3D/4D brain maps (functional connectivity, disconnection)
- ParcelData: For region-level aggregated data (atlas-based analysis)
- ConnectivityMatrix: For connectivity matrices
- SurfaceMesh: For surface-based data (vertices, faces)
- Tractogram: For tractography streamlines
- ScalarMetric: For summary statistics, scalars, and other data

Attributes:

Name Type Description
name str

Name/identifier for this data container (e.g., "rmap", "zmap")

metadata dict

Additional metadata about the data

data_type str

Type identifier for the container (set by subclasses)

Examples:

Subclasses are used to store analysis results:

>>> # VoxelMap for brain maps
>>> voxel_result = VoxelMap(
...     name="rmap",
...     data=nifti_img,
...     space="MNI152NLin6Asym",
...     resolution=2.0
... )
>>> print(voxel_result.summary())
>>> # ParcelData for region-level data
>>> parcel_result = ParcelData(
...     name="damage_scores",
...     data={"V1": 0.8, "V2": 0.6},
...     aggregation_method="mean"
... )
>>> top_regions = parcel_result.get_top_regions(n=5)
Source code in src/lacuna/core/data_types.py
class DataContainer(ABC):
    """Abstract base class for unified data type containers.

    This is the base class for all data container types. It provides
    common functionality for metadata management and a consistent interface
    for accessing data.

    Subclasses implement specific data types:

    - VoxelMap: For 3D/4D brain maps (functional connectivity, disconnection)
    - ParcelData: For region-level aggregated data (atlas-based analysis)
    - ConnectivityMatrix: For connectivity matrices
    - SurfaceMesh: For surface-based data (vertices, faces)
    - Tractogram: For tractography streamlines
    - ScalarMetric: For summary statistics, scalars, and other data

    Attributes
    ----------
    name : str
        Name/identifier for this data container (e.g., "rmap", "zmap")
    metadata : dict
        Additional metadata about the data
    data_type : str
        Type identifier for the container; initialized to the concrete
        class name (subclasses may override it afterwards)

    Examples
    --------
    Subclasses are used to store analysis results:

    >>> # VoxelMap for brain maps
    >>> voxel_result = VoxelMap(
    ...     name="rmap",
    ...     data=nifti_img,
    ...     space="MNI152NLin6Asym",
    ...     resolution=2.0
    ... )
    >>> print(voxel_result.summary())

    >>> # ParcelData for region-level data
    >>> parcel_result = ParcelData(
    ...     name="damage_scores",
    ...     data={"V1": 0.8, "V2": 0.6},
    ...     aggregation_method="mean"
    ... )
    >>> top_regions = parcel_result.get_top_regions(n=5)
    """

    def __init__(self, name: str, metadata: dict[str, Any] | None = None):
        """Initialize base data container.

        Parameters
        ----------
        name : str
            Name/identifier for this container
        metadata : dict, optional
            Additional metadata about the data. When omitted (``None``) a
            fresh empty dict is created; any dict that is passed in,
            including an empty one, is stored as-is.
        """
        self.name = name
        # Test against None rather than truthiness: ``metadata or {}`` would
        # silently swap a caller-supplied empty dict for a different object,
        # so later updates through the caller's reference would be lost.
        self.metadata = metadata if metadata is not None else {}
        self.data_type = self.__class__.__name__

    @abstractmethod
    def get_data(self, **kwargs) -> Any:
        """Get the primary data from this result.

        Parameters
        ----------
        **kwargs
            Subclass-specific options for data retrieval

        Returns
        -------
        Any
            The primary data (type depends on subclass)
        """
        pass

    @abstractmethod
    def summary(self) -> str:
        """Get a summary description of this result.

        Returns
        -------
        str
            Human-readable summary
        """
        pass

    def __repr__(self) -> str:
        """Return string representation."""
        return f"{self.data_type}(name='{self.name}', metadata={len(self.metadata)} items)"

__init__(name, metadata=None)

Initialize base data container.

Parameters:

Name Type Description Default
name str

Name/identifier for this container

required
metadata dict

Additional metadata about the data

None
Source code in src/lacuna/core/data_types.py
def __init__(self, name: str, metadata: dict[str, Any] | None = None):
    """Initialize base data container.

    Parameters
    ----------
    name : str
        Name/identifier for this container
    metadata : dict, optional
        Additional metadata about the data. When omitted (``None``) a fresh
        empty dict is created; any dict that is passed in, including an
        empty one, is stored as-is.
    """
    self.name = name
    # Test against None rather than truthiness: ``metadata or {}`` would
    # silently swap a caller-supplied empty dict for a different object,
    # so later updates through the caller's reference would be lost.
    self.metadata = metadata if metadata is not None else {}
    self.data_type = self.__class__.__name__

__repr__()

Return string representation.

Source code in src/lacuna/core/data_types.py
def __repr__(self) -> str:
    """Show the container type, its name, and how many metadata entries it has."""
    n_items = len(self.metadata)
    return f"{self.data_type}(name='{self.name}', metadata={n_items} items)"

get_data(**kwargs) abstractmethod

Get the primary data from this result.

Parameters:

Name Type Description Default
**kwargs

Subclass-specific options for data retrieval

{}

Returns:

Type Description
Any

The primary data (type depends on subclass)

Source code in src/lacuna/core/data_types.py
@abstractmethod
def get_data(self, **kwargs) -> Any:
    """Return the primary payload held by this container.

    Abstract: every concrete subclass must provide its own implementation.

    Parameters
    ----------
    **kwargs
        Subclass-specific options for data retrieval

    Returns
    -------
    Any
        The primary data (type depends on subclass)
    """

summary() abstractmethod

Get a summary description of this result.

Returns:

Type Description
str

Human-readable summary

Source code in src/lacuna/core/data_types.py
@abstractmethod
def summary(self) -> str:
    """Return a short, human-readable description of this result.

    Abstract: every concrete subclass must provide its own implementation.

    Returns
    -------
    str
        Human-readable summary
    """

EmptyMaskError

Bases: ValidationError

Raised when a mask contains no non-zero voxels.

Source code in src/lacuna/core/exceptions.py
class EmptyMaskError(ValidationError):
    """Raised when a mask contains no non-zero voxels.

    Parameters
    ----------
    subject_id : str, optional
        Identifier of the affected subject. It is stored on the exception
        as ``subject_id`` and, when truthy, included in the message.
    detail : str, optional
        Text appended to the message. When falsy, a default hint about
        checking the mask files is appended instead.
    """

    def __init__(self, subject_id: str | None = None, detail: str | None = None):
        self.subject_id = subject_id
        headline = (
            f"Empty mask for '{subject_id}': mask contains no non-zero voxels. "
            if subject_id
            else "Empty mask: mask contains no non-zero voxels. "
        )
        advice = detail or "Please ensure mask files contain valid lesion data."
        super().__init__(headline + advice)

LacunaError

Bases: Exception

Base exception for all lacuna errors.

Source code in src/lacuna/core/exceptions.py
class LacunaError(Exception):
    """Root of the lacuna exception hierarchy.

    Catching this type catches every lacuna-specific error derived from it.
    """

NiftiLoadError

Bases: LacunaError, IOError

Raised when NIfTI file loading fails.

Source code in src/lacuna/core/exceptions.py
class NiftiLoadError(LacunaError, OSError):
    """Signal that a NIfTI file could not be loaded.

    ``IOError`` is an alias of ``OSError`` in Python 3, so the base classes
    are the same as before; handlers for either name still catch this error.
    """

ParcelData dataclass

Bases: DataContainer

Result container for atlas-based region aggregation.

Attributes:

Name Type Description
name str

Name/identifier for this result

data dict

Dictionary mapping ROI identifiers to values

region_labels list of str, optional

Ordered list of region label names (from atlas metadata)

parcel_names list of str, optional

Names of atlases used in the analysis

aggregation_method (str, optional)

Method used for aggregation (e.g., "mean", "percent")

metadata dict

Additional metadata about the output

Examples:

>>> # Create parcel data from atlas-based analysis
>>> parcel_data = ParcelData(
...     name="damage_scores",
...     data={
...         "Visual_V1": 0.85,
...         "Motor_Primary": 0.42,
...         "Prefrontal_DLPFC": 0.15
...     },
...     parcel_names=["Schaefer100"],
...     aggregation_method="percent"
... )
>>> # Get top damaged regions
>>> top = parcel_data.get_top_regions(n=2)
>>> print(top)
{'Visual_V1': 0.85, 'Motor_Primary': 0.42}
>>> print(parcel_data.summary())
damage_scores: 3 regions from 1 atlases, method=percent
Source code in src/lacuna/core/data_types.py
@dataclass
class ParcelData(DataContainer):
    """Result container for atlas-based region aggregation.

    Attributes
    ----------
    name : str
        Name/identifier for this result
    data : dict
        Dictionary mapping ROI identifiers to values
    region_labels : list of str, optional
        Ordered list of region label names (from atlas metadata)
    parcel_names : list of str, optional
        Names of atlases used in the analysis
    aggregation_method : str, optional
        Method used for aggregation (e.g., "mean", "percent")
    metadata : dict
        Additional metadata about the output

    Examples
    --------
    >>> # Create parcel data from atlas-based analysis
    >>> parcel_data = ParcelData(
    ...     name="damage_scores",
    ...     data={
    ...         "Visual_V1": 0.85,
    ...         "Motor_Primary": 0.42,
    ...         "Prefrontal_DLPFC": 0.15
    ...     },
    ...     parcel_names=["Schaefer100"],
    ...     aggregation_method="percent"
    ... )

    >>> # Get top damaged regions
    >>> top = parcel_data.get_top_regions(n=2)
    >>> print(top)
    {'Visual_V1': 0.85, 'Motor_Primary': 0.42}

    >>> print(parcel_data.summary())
    damage_scores: 3 regions from 1 atlases, method=percent
    """

    name: str
    data: dict[str, float]
    region_labels: list[str] | None = None
    parcel_names: list[str] | None = None
    aggregation_method: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        """Initialize base class."""
        super().__init__(name=self.name, metadata=self.metadata)

    def get_data(self, atlas_filter: str | None = None) -> dict[str, float]:
        """Get ROI data, optionally filtered by atlas name.

        Parameters
        ----------
        atlas_filter : str, optional
            Case-insensitive substring to look for in ROI identifiers.
            When ``None``, the stored dict itself is returned (not a copy).

        Returns
        -------
        dict
            ROI identifier -> value. With a filter, a new dict holding only
            the matching entries.
        """
        if atlas_filter is None:
            return self.data

        # Lower-case the filter once instead of once per ROI.
        needle = atlas_filter.lower()
        return {roi: value for roi, value in self.data.items() if needle in roi.lower()}

    def get_top_regions(self, n: int = 10, ascending: bool = False) -> dict[str, float]:
        """Get top N regions by value.

        Parameters
        ----------
        n : int, default=10
            Maximum number of regions to return. Zero or negative values
            yield an empty dict.
        ascending : bool, default=False
            If False, the largest values come first; if True, the smallest.

        Returns
        -------
        dict
            ROI identifier -> value, ordered by rank.
        """
        ranked = sorted(self.data.items(), key=lambda item: item[1], reverse=not ascending)
        # Clamp n: a negative slice bound would silently drop the last
        # regions instead of returning "no regions".
        return dict(ranked[: max(n, 0)])

    def summary(self) -> str:
        """Get a summary description of this result."""
        n_rois = len(self.data)
        atlas_info = f"{len(self.parcel_names)} atlases" if self.parcel_names else "unknown atlases"
        method_info = f", method={self.aggregation_method}" if self.aggregation_method else ""
        return f"{self.name}: {n_rois} regions from {atlas_info}{method_info}"

    def __repr__(self) -> str:
        """Return string representation."""
        method_str = f", method='{self.aggregation_method}'" if self.aggregation_method else ""
        return f"ParcelData(name='{self.name}', n_regions={len(self.data)}{method_str})"

__post_init__()

Initialize base class.

Source code in src/lacuna/core/data_types.py
def __post_init__(self):
    """Initialize base class.

    Passes the ``name`` and ``metadata`` field values to the parent
    initializer.
    """
    super().__init__(name=self.name, metadata=self.metadata)

__repr__()

Return string representation.

Source code in src/lacuna/core/data_types.py
def __repr__(self) -> str:
    """Show the name, the number of regions and, when set, the aggregation method."""
    fields = [f"name='{self.name}'", f"n_regions={len(self.data)}"]
    if self.aggregation_method:
        fields.append(f"method='{self.aggregation_method}'")
    return "ParcelData(" + ", ".join(fields) + ")"

get_data(atlas_filter=None)

Get ROI data, optionally filtered by atlas name.

Source code in src/lacuna/core/data_types.py
def get_data(self, atlas_filter: str | None = None) -> dict[str, float]:
    """Get ROI data, optionally filtered by atlas name.

    Parameters
    ----------
    atlas_filter : str, optional
        Case-insensitive substring to look for in ROI identifiers. When
        ``None``, the stored dict itself is returned (not a copy).

    Returns
    -------
    dict
        ROI identifier -> value. With a filter, a new dict holding only the
        matching entries.
    """
    if atlas_filter is None:
        return self.data

    # Lower-case the filter once instead of once per ROI.
    needle = atlas_filter.lower()
    return {roi: value for roi, value in self.data.items() if needle in roi.lower()}

get_top_regions(n=10, ascending=False)

Get top N regions by value.

Source code in src/lacuna/core/data_types.py
def get_top_regions(self, n: int = 10, ascending: bool = False) -> dict[str, float]:
    """Get top N regions by value.

    Parameters
    ----------
    n : int, default=10
        Maximum number of regions to return. Zero or negative values yield
        an empty dict.
    ascending : bool, default=False
        If False, the largest values come first; if True, the smallest.

    Returns
    -------
    dict
        ROI identifier -> value, ordered by rank.
    """
    ranked = sorted(self.data.items(), key=lambda item: item[1], reverse=not ascending)
    # Clamp n: a negative slice bound would silently drop the last regions
    # instead of returning "no regions".
    return dict(ranked[: max(n, 0)])

summary()

Get a summary description of this result.

Source code in src/lacuna/core/data_types.py
def summary(self) -> str:
    """Describe the result: region count, atlas count and optional method."""
    if self.parcel_names:
        atlas_part = f"{len(self.parcel_names)} atlases"
    else:
        atlas_part = "unknown atlases"

    text = f"{self.name}: {len(self.data)} regions from {atlas_part}"
    if self.aggregation_method:
        text += f", method={self.aggregation_method}"
    return text

Pipeline

Declarative analysis workflow definition.

Pipeline allows defining a sequence of analyses that will be run in order on each subject. It supports batch processing with configurable parallelization.

Parameters:

Name Type Description Default
name str

Human-readable name for the pipeline

None
description str

Description of what the pipeline does

None

Examples:

>>> from lacuna.analysis import RegionalDamage, FunctionalNetworkMapping, ParcelAggregation
>>> from lacuna import Pipeline
>>> # Define pipeline
>>> pipeline = Pipeline(name="Standard Lesion Analysis")
>>> pipeline.add(RegionalDamage())
>>> pipeline.add(FunctionalNetworkMapping())
>>> pipeline.add(ParcelAggregation(parc_names=["Schaefer100"]))
>>> # Run on single subject
>>> result = pipeline.run(mask_data)
>>> # Run on multiple subjects in parallel
>>> results = pipeline.run_batch(subjects, n_jobs=-1)
>>> # Get workflow description
>>> print(pipeline.describe())
Pipeline: Standard Lesion Analysis

Steps:
  1. RegionalDamage
  2. FunctionalNetworkMapping (atlas='schaefer100')
  3. ParcelAggregation (parc_names=['Schaefer100'])
Source code in src/lacuna/core/pipeline.py
class Pipeline:
    """
    Declarative analysis workflow definition.

    Pipeline allows defining a sequence of analyses that will be run
    in order on each subject. It supports batch processing with
    configurable parallelization.

    Parameters
    ----------
    name : str, optional
        Human-readable name for the pipeline. Falls back to
        "Unnamed Pipeline" when omitted or empty.
    description : str, optional
        Description of what the pipeline does

    Examples
    --------
    >>> from lacuna.analysis import RegionalDamage, FunctionalNetworkMapping, ParcelAggregation
    >>> from lacuna import Pipeline

    >>> # Define pipeline
    >>> pipeline = Pipeline(name="Standard Lesion Analysis")
    >>> pipeline.add(RegionalDamage())
    >>> pipeline.add(FunctionalNetworkMapping())
    >>> pipeline.add(ParcelAggregation(parc_names=["Schaefer100"]))

    >>> # Run on single subject
    >>> result = pipeline.run(mask_data)

    >>> # Run on multiple subjects in parallel
    >>> results = pipeline.run_batch(subjects, n_jobs=-1)

    >>> # Get workflow description (parameter values are shown via repr();
    >>> # the parameters listed depend on each analysis's attributes)
    >>> print(pipeline.describe())
    Pipeline: Standard Lesion Analysis
    <BLANKLINE>
    Steps:
      1. RegionalDamage
      2. FunctionalNetworkMapping (atlas='schaefer100')
      3. ParcelAggregation (parc_names=['Schaefer100'])
    """

    def __init__(
        self,
        name: str | None = None,
        description: str | None = None,
    ):
        self.name = name or "Unnamed Pipeline"
        self.description = description
        self._steps: list[PipelineStep] = []

    def add(
        self,
        analysis: BaseAnalysis,
        name: str | None = None,
    ) -> Pipeline:
        """
        Add an analysis step to the pipeline.

        Parameters
        ----------
        analysis : BaseAnalysis
            The analysis module to add
        name : str, optional
            Human-readable name for this step

        Returns
        -------
        Pipeline
            Self for method chaining
        """
        step = PipelineStep(analysis=analysis, name=name)
        self._steps.append(step)
        return self

    def run(self, data: SubjectData, verbose: bool = False) -> SubjectData:
        """
        Run the pipeline on a single subject.

        Each step's ``analysis.run`` receives the output of the previous
        step; the first step receives ``data``.

        Parameters
        ----------
        data : SubjectData
            Input data to process
        verbose : bool, default=False
            If True, print progress messages. If False, run silently.

        Returns
        -------
        SubjectData
            Processed data with all analysis results

        Raises
        ------
        TypeError
            If data is not a SubjectData instance
        """
        # Validate input type
        if not isinstance(data, SubjectData):
            raise TypeError(
                f"Unsupported input type: {type(data).__name__}\n" "Supported types: SubjectData"
            )

        result = data

        # Create logger for analysis section headers
        logger = ConsoleLogger(verbose=verbose, width=70)

        for step in self._steps:
            # Run the analysis
            if verbose:
                logger.section(f"Running {step.name}")

            result = step.analysis.run(result)

        return result

    def run_batch(
        self,
        data_list: list[SubjectData],
        n_jobs: int = -1,
        show_progress: bool = True,
        parallel: bool = True,
    ) -> list[SubjectData | ParcelData]:
        """
        Run the pipeline on multiple subjects.

        With ``parallel=False`` or ``n_jobs == 1`` the subjects are processed
        one after another through :meth:`run`. Otherwise each step is handed
        to ``batch_process`` for the whole batch before the next step starts.

        Parameters
        ----------
        data_list : list of SubjectData
            List of subjects to process
        n_jobs : int, default=-1
            Number of parallel jobs (-1 uses all CPUs)
        show_progress : bool, default=True
            Show progress bar
        parallel : bool, default=True
            Whether to process subjects in parallel

        Returns
        -------
        list of SubjectData or ParcelData
            Processed data for each subject
        """
        if not parallel or n_jobs == 1:
            # Sequential processing
            iterator = data_list
            if show_progress:
                from tqdm import tqdm

                iterator = tqdm(data_list, desc=self.name)

            return [self.run(data, verbose=False) for data in iterator]

        # Imported lazily (only the parallel path needs it) and once, rather
        # than on every loop iteration.
        from lacuna.batch.api import batch_process

        # Parallel processing - run each step as a batch
        step_results: list[SubjectData | ParcelData] = list(data_list)

        for step in self._steps:
            step_results = batch_process(
                inputs=step_results,  # type: ignore[arg-type]
                analysis=step.analysis,
                n_jobs=n_jobs,
                show_progress=show_progress,
            )

        return step_results

    def describe(self) -> str:
        """
        Get a human-readable description of the pipeline.

        The text consists of a "Pipeline: <name>" header, the optional
        description indented by two spaces, an empty line, a "Steps:" line
        and one numbered line per step.

        Returns
        -------
        str
            Multi-line description of the pipeline
        """
        lines = [f"Pipeline: {self.name}"]

        if self.description:
            lines.append(f"  {self.description}")

        lines.append("")
        lines.append("Steps:")

        for i, step in enumerate(self._steps, 1):
            # Get analysis parameters for display
            params = self._get_analysis_params(step.analysis)
            if params:
                lines.append(f"  {i}. {step.name} ({params})")
            else:
                lines.append(f"  {i}. {step.name}")

        return "\n".join(lines)

    def _get_analysis_params(self, analysis: BaseAnalysis) -> str:
        """Extract key parameters from analysis for display.

        Returns a comma-separated ``attr=repr(value)`` listing of the known
        parameter attributes that exist on ``analysis`` and are not None;
        an empty string when there are none.
        """
        params = []

        # Try common parameter names; a missing attribute is treated like None
        for attr in ("parc_names", "atlas", "threshold", "source"):
            val = getattr(analysis, attr, None)
            if val is not None:
                params.append(f"{attr}={val!r}")

        return ", ".join(params)

    def __len__(self) -> int:
        """Return number of steps in pipeline."""
        return len(self._steps)

    def __repr__(self) -> str:
        """Return a short representation with the name and step count."""
        return f"Pipeline(name={self.name!r}, steps={len(self._steps)})"

__len__()

Return number of steps in pipeline.

Source code in src/lacuna/core/pipeline.py
def __len__(self) -> int:
    """Report how many analysis steps have been added to the pipeline."""
    step_count = len(self._steps)
    return step_count

add(analysis, name=None)

Add an analysis step to the pipeline.

Parameters:

Name Type Description Default
analysis BaseAnalysis

The analysis module to add

required
name str

Human-readable name for this step

None

Returns:

Type Description
Pipeline

Self for method chaining

Source code in src/lacuna/core/pipeline.py
def add(
    self,
    analysis: BaseAnalysis,
    name: str | None = None,
) -> Pipeline:
    """
    Append an analysis to the end of the pipeline.

    Parameters
    ----------
    analysis : BaseAnalysis
        The analysis module to add
    name : str, optional
        Human-readable name for this step

    Returns
    -------
    Pipeline
        The pipeline itself, so calls can be chained
    """
    # Wrap the analysis in a PipelineStep and queue it in one go.
    self._steps.append(PipelineStep(analysis=analysis, name=name))
    return self

describe()

Get a human-readable description of the pipeline.

Returns:

Type Description
str

Multi-line description of the pipeline

Source code in src/lacuna/core/pipeline.py
def describe(self) -> str:
    """
    Render the pipeline as multi-line, human-readable text.

    The text consists of a "Pipeline: <name>" header, the optional
    description indented by two spaces, an empty line, a "Steps:" line and
    one numbered line per step.

    Returns
    -------
    str
        Multi-line description of the pipeline
    """
    header = [f"Pipeline: {self.name}"]
    if self.description:
        header.append(f"  {self.description}")
    header += ["", "Steps:"]

    entries = []
    for index, step in enumerate(self._steps, start=1):
        # Parameters are shown in parentheses only when there are any.
        shown = self._get_analysis_params(step.analysis)
        suffix = f" ({shown})" if shown else ""
        entries.append(f"  {index}. {step.name}{suffix}")

    return "\n".join(header + entries)

run(data, verbose=False)

Run the pipeline on a single subject.

Parameters:

Name Type Description Default
data SubjectData

Input data to process

required
verbose bool

If True, print progress messages. If False, run silently.

False

Returns:

Type Description
SubjectData

Processed data with all analysis results

Raises:

Type Description
TypeError

If data is not a SubjectData instance

Source code in src/lacuna/core/pipeline.py
def run(self, data: SubjectData, verbose: bool = False) -> SubjectData:
    """
    Execute every step of the pipeline, in order, on one subject.

    Each step's ``analysis.run`` receives the output of the previous step;
    the first step receives ``data``.

    Parameters
    ----------
    data : SubjectData
        Input data to process
    verbose : bool, default=False
        If True, print progress messages. If False, run silently.

    Returns
    -------
    SubjectData
        Processed data with all analysis results

    Raises
    ------
    TypeError
        If data is not a SubjectData instance
    """
    # Reject anything that is not a SubjectData up front.
    if not isinstance(data, SubjectData):
        raise TypeError(
            f"Unsupported input type: {type(data).__name__}\nSupported types: SubjectData"
        )

    # Logger used for the per-step section headers.
    logger = ConsoleLogger(verbose=verbose, width=70)

    current = data
    for step in self._steps:
        if verbose:
            logger.section(f"Running {step.name}")
        # Feed the previous step's output into the next analysis.
        current = step.analysis.run(current)

    return current

run_batch(data_list, n_jobs=-1, show_progress=True, parallel=True)

Run the pipeline on multiple subjects.

Parameters:

Name Type Description Default
data_list list of SubjectData

List of subjects to process

required
n_jobs int

Number of parallel jobs (-1 uses all CPUs)

-1
show_progress bool

Show progress bar

True
parallel bool

Whether to process subjects in parallel

True

Returns:

Type Description
list of SubjectData or ParcelData

Processed data for each subject

Source code in src/lacuna/core/pipeline.py
def run_batch(
    self,
    data_list: list[SubjectData],
    n_jobs: int = -1,
    show_progress: bool = True,
    parallel: bool = True,
) -> list[SubjectData | ParcelData]:
    """
    Run the pipeline on multiple subjects.

    With ``parallel=False`` or ``n_jobs == 1`` the subjects are processed
    one after another through ``self.run``. Otherwise each step is handed to
    ``batch_process`` for the whole batch before the next step starts.

    Parameters
    ----------
    data_list : list of SubjectData
        List of subjects to process
    n_jobs : int, default=-1
        Number of parallel jobs (-1 uses all CPUs)
    show_progress : bool, default=True
        Show progress bar
    parallel : bool, default=True
        Whether to process subjects in parallel

    Returns
    -------
    list of SubjectData or ParcelData
        Processed data for each subject
    """
    if not parallel or n_jobs == 1:
        # Sequential processing
        iterator = data_list
        if show_progress:
            from tqdm import tqdm

            iterator = tqdm(data_list, desc=self.name)

        return [self.run(data, verbose=False) for data in iterator]

    # Imported lazily (only the parallel path needs it) and once, rather
    # than on every loop iteration.
    from lacuna.batch.api import batch_process

    # Parallel processing - run each step as a batch
    step_results: list[SubjectData | ParcelData] = list(data_list)

    for step in self._steps:
        step_results = batch_process(
            inputs=step_results,  # type: ignore[arg-type]
            analysis=step.analysis,
            n_jobs=n_jobs,
            show_progress=show_progress,
        )

    return step_results

ProvenanceError

Bases: LacunaError, RuntimeError

Raised when provenance tracking encounters issues.

Source code in src/lacuna/core/exceptions.py
class ProvenanceError(LacunaError, RuntimeError):
    """Signal that provenance tracking ran into a problem."""

ScalarMetric dataclass

Bases: DataContainer

Result container for miscellaneous data.

This class handles summary statistics, scalar values, metadata, and any other data that doesn't fit into specific result types.

Attributes:

Name Type Description
name str

Name/identifier for this result

data Any

The data (can be scalar, dict, list, etc.)

data_type (str, optional)

Type description (e.g., "scalar", "summary_stats", "metadata")

metadata dict

Additional metadata about the output

Source code in src/lacuna/core/data_types.py
@dataclass
class ScalarMetric(DataContainer):
    """Container for results that have no dedicated result type.

    Holds scalar values, summary statistics, free-form metadata, or any
    other payload that does not belong to a more specific container.

    Attributes
    ----------
    name : str
        Name/identifier for this result
    data : Any
        The payload (scalar, dict, list, ...)
    data_type : str, optional
        Label describing the payload (e.g., "scalar", "summary_stats",
        "metadata"). Inferred from ``data`` when left as None.
    metadata : dict
        Additional metadata about the output
    """

    name: str
    data: Any
    data_type: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        """Run base-class initialisation, then settle on a ``data_type``."""
        # Capture the caller's value first; it is re-applied after the
        # base-class initialiser has run.
        resolved_type = self.data_type
        super().__init__(name=self.name, metadata=self.metadata)

        if resolved_type is None:
            # Nothing declared: derive a label from the payload's Python type.
            payload = self.data
            if isinstance(payload, (int, float, bool)):
                resolved_type = "scalar"
            elif isinstance(payload, dict):
                resolved_type = "dictionary"
            elif isinstance(payload, (list, tuple)):
                resolved_type = "sequence"
            else:
                resolved_type = "unknown"

        self.data_type = resolved_type

    def get_data(self) -> Any:
        """Return the stored payload."""
        payload = self.data
        return payload

    def summary(self) -> str:
        """Return a one-line, human-readable description of this result."""
        kind = self.data_type
        if kind == "scalar":
            return f"{self.name}: {self.data} ({type(self.data).__name__})"
        if kind == "dictionary":
            return f"{self.name}: dict with {len(self.data)} keys"
        if kind == "sequence":
            return f"{self.name}: {type(self.data).__name__} with {len(self.data)} items"
        # Any other label is reported verbatim.
        return f"{self.name}: {self.data_type}"

    def __repr__(self) -> str:
        """Return a compact representation showing name and data type."""
        label, kind = self.name, self.data_type
        return f"ScalarMetric(name='{label}', data_type='{kind}')"

__post_init__()

Initialize base class and infer data_type if needed.

Source code in src/lacuna/core/data_types.py
def __post_init__(self):
    """Run base-class initialisation, then settle on a ``data_type``."""
    # Capture the caller's value first; it is re-applied after the
    # base-class initialiser has run.
    resolved_type = self.data_type
    super().__init__(name=self.name, metadata=self.metadata)

    if resolved_type is None:
        # Nothing declared: derive a label from the payload's Python type.
        payload = self.data
        if isinstance(payload, (int, float, bool)):
            resolved_type = "scalar"
        elif isinstance(payload, dict):
            resolved_type = "dictionary"
        elif isinstance(payload, (list, tuple)):
            resolved_type = "sequence"
        else:
            resolved_type = "unknown"

    self.data_type = resolved_type

__repr__()

Return string representation.

Source code in src/lacuna/core/data_types.py
def __repr__(self) -> str:
    """Return a compact representation showing name and data type."""
    label, kind = self.name, self.data_type
    return f"ScalarMetric(name='{label}', data_type='{kind}')"

get_data()

Get the data.

Source code in src/lacuna/core/data_types.py
def get_data(self) -> Any:
    """Return the stored payload."""
    payload = self.data
    return payload

summary()

Get a summary description of this result.

Source code in src/lacuna/core/data_types.py
def summary(self) -> str:
    """Return a one-line, human-readable description of this result."""
    kind = self.data_type
    if kind == "scalar":
        return f"{self.name}: {self.data} ({type(self.data).__name__})"
    if kind == "dictionary":
        return f"{self.name}: dict with {len(self.data)} keys"
    if kind == "sequence":
        return f"{self.name}: {type(self.data).__name__} with {len(self.data)} items"
    # Any other label is reported verbatim.
    return f"{self.name}: {self.data_type}"

SpatialMismatchError

Bases: ValidationError

Raised when spatial properties (affine, shape) don't match.

Source code in src/lacuna/core/exceptions.py
class SpatialMismatchError(ValidationError):
    """Signal that spatial properties (affine, shape) are inconsistent."""

SubjectData

Central data container for a single research participant's mask-based analysis.

This class encapsulates binary mask image data, spatial metadata, subject identifiers, processing provenance, and analysis results. It enforces immutability-by-convention: transformations should return new instances rather than modifying in place.

Parameters:

Name Type Description Default
mask_img Nifti1Image

Binary mask (3D only, values must be 0 or 1).

required
space str

Coordinate space identifier (e.g., 'MNI152NLin6Asym'). If not provided, it is read from the metadata dict or, failing that, auto-detected from the image affine.

None
resolution float

Spatial resolution in millimeters (e.g., 1.0, 2.0). If not provided, it is read from the metadata dict or, failing that, auto-detected from the image voxel size (isotropic images only).

None
metadata dict

Additional subject metadata (e.g., session info, patient ID). 'subject_id' defaults to "sub-unknown" if not provided. Note: Direct kwargs (space, resolution) override metadata dict values.

None
provenance list of dict

Processing history (for deserialization only).

None
results dict

Analysis results (for deserialization only).

None

Raises:

Type Description
ValueError

If space or resolution cannot be determined (via kwargs, metadata dict, or auto-detection from the image), if a declared space or resolution does not match the image, if mask_img is not 3D, or if mask_img is not binary (0/1 values only).

Attributes:

Name Type Description
mask_img Nifti1Image

The binary mask image (read-only).

affine ndarray

4x4 affine transformation matrix (read-only).

space str

Coordinate space identifier (e.g., 'MNI152NLin6Asym').

resolution float

Spatial resolution in millimeters.

metadata ImmutableDict

Subject and session metadata (read-only view).

provenance list

Processing history (read-only view).

results dict

Analysis results (read-only view, nested structure).

Examples:

>>> import nibabel as nib
>>> mask_img = nib.load("mask.nii.gz")
>>> mask_data = SubjectData(
...     mask_img,
...     space="MNI152NLin6Asym",
...     resolution=2,
...     metadata={"subject_id": "sub-001"}
... )
>>> print(f"Volume: {mask_data.get_volume_mm3()} mm³")
>>> print(f"Space: {mask_data.space}")
>>> print(f"Resolution: {mask_data.resolution}mm")
Source code in src/lacuna/core/subject_data.py
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
class SubjectData:
    """
    Central data container for a single research participant's mask-based analysis.

    This class encapsulates binary mask image data, spatial metadata, subject
    identifiers, processing provenance, and analysis results. It enforces
    immutability-by-convention: transformations should return new instances
    rather than modifying in place.

    Parameters
    ----------
    mask_img : nibabel.Nifti1Image
        Binary mask (3D only, values must be 0 or 1).
    space : str, optional
        Coordinate space identifier (e.g., 'MNI152NLin6Asym').
        If not provided, must be in metadata dict.
    resolution : float, optional
        Spatial resolution in millimeters (e.g., 1.0, 2.0).
        If not provided, must be in metadata dict.
    metadata : dict, optional
        Additional subject metadata (e.g., session info, patient ID).
        'subject_id' defaults to "sub-unknown" if not provided.
        Note: Direct kwargs (space, resolution) override metadata dict values.
    provenance : list of dict, optional
        Processing history (for deserialization only).
    results : dict, optional
        Analysis results (for deserialization only).

    Raises
    ------
    ValueError
        If space or resolution is not provided (via kwargs or metadata dict),
        if mask_img is not 3D, or if mask_img is not binary (0/1 values only).

    Attributes
    ----------
    mask_img : nibabel.Nifti1Image
        The binary mask image (read-only).
    affine : np.ndarray
        4x4 affine transformation matrix (read-only).
    space : str
        Coordinate space identifier (e.g., 'MNI152NLin6Asym').
    resolution : float
        Spatial resolution in millimeters.
    metadata : ImmutableDict
        SubjectData and session metadata (read-only view).
    provenance : list
        Processing history (read-only view).
    results : dict
        Analysis results (read-only view, nested structure).

    Examples
    --------
    >>> import nibabel as nib
    >>> mask_img = nib.load("mask.nii.gz")

    # Recommended: Direct kwargs for space and resolution
    >>> mask_data = SubjectData(
    ...     mask_img,
    ...     space="MNI152NLin6Asym",
    ...     resolution=2,
    ...     metadata={"subject_id": "sub-001"}
    ... )

    >>> print(f"Volume: {mask_data.get_volume_mm3()} mm³")
    >>> print(f"Space: {mask_data.space}")
    >>> print(f"Resolution: {mask_data.resolution}mm")
    """

    def __init__(
        self,
        mask_img: nib.Nifti1Image,
        space: str | None = None,
        resolution: float | None = None,
        metadata: dict[str, Any] | None = None,
        provenance: list[dict[str, Any]] | None = None,
        results: dict[str, Any] | None = None,
    ):
        """Validate the mask and resolve space, resolution and metadata.

        For both ``space`` and ``resolution`` the value is resolved in this
        order: direct keyword argument, then the ``metadata`` dict, then
        auto-detection from the image. A declared value that contradicts the
        detected one raises ``ValueError``. An all-zero mask is accepted but
        logged as a warning.
        """
        import logging

        logger = logging.getLogger("lacuna")

        # Validate mask image
        validate_nifti_image(mask_img, require_3d=True, check_affine=True)

        # Validate binary mask
        mask_data = mask_img.get_fdata()
        unique_values = np.unique(mask_data)
        if not np.all(np.isin(unique_values, [0, 1])):
            raise ValueError(
                "mask_img must be a binary mask with only 0 and 1 values.\n"
                f"Found unique values: {unique_values}\n"
                "Please binarize your lesion mask before creating SubjectData."
            )

        # Check if mask is empty (no non-zero voxels). This is not an error:
        # the subject is kept and a warning is logged.
        self._is_empty_mask = not np.any(mask_data > 0)
        if self._is_empty_mask:
            # Read the id from the caller's metadata as given; the
            # "sub-unknown" default has not been applied yet at this point.
            subject_id = metadata.get("subject_id") if metadata else None
            if subject_id:
                logger.warning(
                    f"Empty mask for '{subject_id}': mask contains no non-zero voxels. "
                    "Analyses will produce zero-valued outputs for this subject."
                )
            else:
                logger.warning(
                    "Empty mask: mask contains no non-zero voxels. "
                    "Analyses will produce zero-valued outputs for this subject."
                )

        # Store image
        self._mask_img = mask_img

        # Extract and validate affine
        self._affine = mask_img.affine.copy()
        validate_affine(self._affine)

        # Setup metadata - direct kwargs take priority over metadata dict
        if metadata is None:
            metadata = {}
        else:
            # Convert to regular dict in case ImmutableDict was passed
            metadata = dict(metadata)
        if "subject_id" not in metadata:
            metadata["subject_id"] = "sub-unknown"

        # Supported template spaces
        # NLin6 and 2009c are user-facing; 2009b is internal (dTOR985 tractogram)
        SUPPORTED_TEMPLATE_SPACES = [
            "MNI152NLin6Asym",
            "MNI152NLin2009bAsym",
            "MNI152NLin2009cAsym",
        ]

        # Always detect space from image for validation
        detected_space = self._detect_space_from_image(mask_img)

        # Handle space parameter - direct kwarg takes priority, then metadata dict, then auto-detect
        if space is not None:
            declared_space = space
        elif "space" in metadata:
            declared_space = metadata["space"]
        else:
            declared_space = None

        # Validate declared space matches detected space if both available
        if declared_space is not None and detected_space is not None:
            # Import spaces module for equivalence check
            from .spaces import spaces_are_equivalent

            if not spaces_are_equivalent(declared_space, detected_space):
                raise ValueError(
                    f"Space mismatch: declared space '{declared_space}' "
                    f"does not match detected space '{detected_space}' from image affine.\n"
                    "The space must match the affine transformation in the image header.\n"
                    f"Either use space='{detected_space}' or verify the image is in the "
                    f"'{declared_space}' coordinate space."
                )
            self._space = declared_space
        elif declared_space is not None:
            # Declared but not detected — warn user that we can't verify
            logger.warning(
                f"Cannot verify declared space '{declared_space}' from image affine. "
                "The image affine does not match any known template reference. "
                "Proceeding with declared space — results may be incorrect if "
                "the image is not actually in this coordinate space."
            )
            self._space = declared_space
        elif detected_space is not None:
            # Auto-detected from image affine
            self._space = detected_space
        else:
            raise ValueError(
                "Coordinate space must be specified via 'space' parameter.\n"
                "This is required for spatial validation in analysis modules.\n"
                f"Supported spaces: {', '.join(SUPPORTED_TEMPLATE_SPACES)}\n"
                "Example: SubjectData(img, space='MNI152NLin6Asym', resolution=2)"
            )

        # Validate space is in supported list
        if self._space not in SUPPORTED_TEMPLATE_SPACES:
            from lacuna.utils.suggestions import format_suggestions, suggest_similar

            suggestions = suggest_similar(self._space, SUPPORTED_TEMPLATE_SPACES)
            hint = format_suggestions(suggestions)
            msg = (
                f"Invalid space '{self._space}'. "
                f"Supported spaces: {', '.join(SUPPORTED_TEMPLATE_SPACES)}\n"
                "Note: 'native' space is not supported. Use the actual template space instead.\n"
                "Example: SubjectData(img, space='MNI152NLin6Asym', resolution=2)"
            )
            if hint:
                msg = f"{msg}\n{hint}"
            raise ValueError(msg)

        # Handle resolution parameter - direct kwarg takes priority, then metadata dict, then auto-detect
        # Always detect actual resolution from image for validation
        detected_res = self._detect_resolution_from_image(mask_img)

        if resolution is not None:
            declared_resolution = float(resolution)
        elif "resolution" in metadata:
            declared_resolution = float(metadata["resolution"])
        else:
            declared_resolution = None

        # Validate declared resolution matches actual resolution if both available
        if declared_resolution is not None and detected_res is not None:
            # Allow small tolerance for floating point comparison
            if abs(declared_resolution - detected_res) > 0.1:
                raise ValueError(
                    f"Resolution mismatch: declared resolution ({declared_resolution}mm) "
                    f"does not match actual image resolution ({detected_res}mm).\n"
                    "The resolution must match the voxel dimensions in the image affine.\n"
                    f"Either use resolution={detected_res} or resample the image to "
                    f"{declared_resolution}mm resolution first."
                )
            self._resolution = declared_resolution
        elif declared_resolution is not None:
            # Declared but not detected (anisotropic image) - trust user
            self._resolution = declared_resolution
        elif detected_res is not None:
            # Auto-detected from isotropic image
            self._resolution = detected_res
        else:
            raise ValueError(
                "Spatial resolution must be specified via 'resolution' parameter (in mm).\n"
                "This is required for spatial validation and template matching.\n"
                "Common values: 1, 2 (for 1mm or 2mm resolution)\n"
                "Example: SubjectData(img, space='MNI152NLin6Asym', resolution=2)"
            )

        # Store space and resolution in metadata for consistency
        metadata["space"] = self._space
        metadata["resolution"] = self._resolution

        self._metadata = metadata.copy()

        # Setup provenance (empty list for new objects)
        self._provenance = list(provenance) if provenance is not None else []

        # Setup results (nested dict: analysis -> result_name -> result_object)
        # Handle format migration: dict[str, list] -> dict[str, dict[str, Any]]
        if results is not None:
            self._results = self._normalize_results_format(dict(results))
        else:
            self._results = {}

        # Track coordinate space (extracted from metadata or provenance)
        self._coordinate_space = self._infer_coordinate_space()

    @staticmethod
    def _detect_space_from_image(img: nib.Nifti1Image) -> str | None:
        """
        Attempt to detect coordinate space from image header.

        Delegates to ``get_image_space`` from the sibling ``spaces`` module
        and returns the ``identifier`` of whatever it reports. Detection is
        best-effort: any failure yields None instead of propagating.

        Parameters
        ----------
        img : nibabel.Nifti1Image
            Image to detect space from

        Returns
        -------
        str or None
            Detected space identifier, or None if cannot be determined.
        """
        import logging

        from .spaces import SpaceDetectionError

        try:
            from .spaces import get_image_space

            detected = get_image_space(img)
            if detected is not None:
                return detected.identifier
        except SpaceDetectionError:
            # Affine doesn't match any known reference — legitimate
            return None
        except Exception as exc:
            # Still best-effort, but leave a trace so unexpected failures
            # (as opposed to "no match") can be diagnosed.
            logging.getLogger("lacuna").debug(
                "Unexpected error during coordinate space detection: %s",
                exc,
                exc_info=True,
            )
        return None

    @staticmethod
    def _detect_resolution_from_image(img: nib.Nifti1Image) -> float | None:
        """
        Detect resolution from image voxel dimensions.

        Returns the resolution only if the image has isotropic voxels
        (within 0.1mm tolerance).

        Parameters
        ----------
        img : nibabel.Nifti1Image
            Image to detect resolution from

        Returns
        -------
        float or None
            Resolution in mm (if isotropic), or None if anisotropic.
        """
        try:
            # Get voxel dimensions from affine
            voxel_dims = np.abs(np.diag(img.affine[:3, :3]))

            # Check if approximately isotropic (within 0.1mm tolerance)
            if np.allclose(voxel_dims, voxel_dims[0], atol=0.1):
                return float(round(voxel_dims[0]))
        except Exception:
            pass
        return None

    @staticmethod
    def _normalize_results_format(results: dict[str, Any]) -> dict[str, dict[str, Any]]:
        """Convert results format to nested dict format.

        Results format: dict[str, list] or dict[str, Any] with non-dict values
        Nested dict format: dict[str, dict[str, Any]]

        Parameters
        ----------
        results : dict
            Results in non-dict values format

        Returns
        -------
        dict[str, dict[str, Any]]
            Results in nested dict format
        """
        normalized = {}
        for namespace, value in results.items():
            if isinstance(value, dict):
                # Already new format
                normalized[namespace] = value
            elif isinstance(value, list):
                # Old format: list of results -> dict with index keys
                normalized[namespace] = {f"result_{i}": v for i, v in enumerate(value)}
            else:
                # Single result object -> wrap in dict
                normalized[namespace] = {"default": value}
        return normalized

    @classmethod
    def from_nifti(
        cls,
        mask_path: str | Path,
        space: str | None = None,
        resolution: float | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> SubjectData:
        """
        Build a SubjectData object from a mask stored as a NIfTI file.

        Parameters
        ----------
        mask_path : str or Path
            Path to mask NIfTI file.
        space : str, optional
            Coordinate space identifier (e.g., 'MNI152NLin6Asym').
            If omitted (and absent from ``metadata``), auto-detection from
            the image and file path is attempted.
        resolution : float, optional
            Spatial resolution in millimeters (e.g., 1.0, 2.0).
            If omitted (and absent from ``metadata``), auto-detection from
            the image and file path is attempted.
        metadata : dict, optional
            Additional subject metadata (e.g., session info). The caller's
            dict is copied, never modified. When missing, 'subject_id' and
            'session_id' are filled from ``sub-``/``ses-`` entities in the
            file name if present.

        Returns
        -------
        SubjectData
            Loaded mask data object.

        Raises
        ------
        FileNotFoundError
            If file path doesn't exist.
        NiftiLoadError
            If the image fails to load for any other reason.
        ValueError
            If 'space' or 'resolution' cannot be determined.

        Examples
        --------
        >>> mask_data = SubjectData.from_nifti(
        ...     "mask.nii.gz",
        ...     space="MNI152NLin6Asym",
        ...     resolution=2.0
        ... )
        >>> mask_data = SubjectData.from_nifti(
        ...     "mask.nii.gz",
        ...     space="MNI152NLin6Asym",
        ...     resolution=2.0,
        ...     metadata={"subject_id": "sub-001", "session": "baseline"}
        ... )
        """
        mask_path = Path(mask_path)

        # Load the mask; a missing file propagates unchanged, every other
        # failure is reported as NiftiLoadError.
        try:
            mask_img = nib.load(mask_path)
        except FileNotFoundError:
            raise
        except Exception as e:
            raise NiftiLoadError(f"Failed to load mask from {mask_path}: {e}") from e

        # Work on a private copy so the caller's dict stays untouched
        metadata = {} if metadata is None else metadata.copy()

        # Fill BIDS-like entities from the file name when not supplied:
        # the first "_"-separated token carrying the prefix wins.
        base_name = mask_path.stem.replace(".nii", "")
        for meta_key, prefix in (("subject_id", "sub-"), ("session_id", "ses-")):
            if meta_key in metadata:
                continue
            entity = next(
                (token for token in base_name.split("_") if token.startswith(prefix)),
                None,
            )
            if entity is not None:
                metadata[meta_key] = entity

        # Explicit keyword arguments override whatever metadata carried
        if space is not None:
            metadata["space"] = space
        if resolution is not None:
            metadata["resolution"] = resolution

        # Try auto-detection only if something is still missing
        needs_detection = "space" not in metadata or "resolution" not in metadata
        if needs_detection:
            try:
                # Import lazily to avoid circular imports at module load time
                from .spaces import get_image_space

                detected = get_image_space(mask_img, filepath=mask_path)
                if detected is not None:
                    # Only fill the gaps; never overwrite supplied values
                    if "space" not in metadata:
                        metadata["space"] = detected.identifier
                    if "resolution" not in metadata:
                        metadata["resolution"] = detected.resolution
            except Exception:
                # Detection is best-effort; leave metadata untouched and allow
                # __init__ to raise a helpful error if necessary.
                pass

        return cls(mask_img=mask_img, metadata=metadata)

    def validate(self) -> bool:
        """
        Validate data integrity.

        Re-runs the NIfTI image check (requiring a 3D image) on the stored
        mask and the affine check on the stored affine. The exact criteria
        are defined by ``validate_nifti_image`` and ``validate_affine``.

        Returns
        -------
        bool
            True if all checks pass.

        Raises
        ------
        ValidationError
            If critical invariants violated (presumably raised by the
            validation helpers; confirm against their implementation).

        Notes
        -----
        This method does not check for an empty mask itself.

        Examples
        --------
        >>> mask_data.validate()
        True
        """
        # Validate images
        validate_nifti_image(self._mask_img, require_3d=True)

        # Validate affine
        validate_affine(self._affine)

        # Note: An empty mask is detected in __init__, which logs a warning
        # and keeps the subject; it does not raise.

        return True

    def copy(self) -> SubjectData:
        """
        Create a new SubjectData instance carrying the same data.

        Metadata, provenance and results are deep-copied. The mask image
        object itself is passed to the new instance unchanged (it is not
        duplicated). The new instance is built through the regular
        constructor, so all constructor validation runs again.

        Returns
        -------
        SubjectData
            New instance with the same mask, space and resolution and with
            independent metadata, provenance and results.

        Examples
        --------
        >>> mask_copy = mask_data.copy()
        >>> mask_copy is mask_data
        False
        """
        duplicated_state = {
            "metadata": copy.deepcopy(self._metadata),
            "provenance": copy.deepcopy(self._provenance),
            "results": copy.deepcopy(self._results),
        }
        return SubjectData(
            mask_img=self._mask_img,
            space=self._space,
            resolution=self._resolution,
            **duplicated_state,
        )

    def get_coordinate_space(self) -> str:
        """
        Return the coordinate space tracked for this subject.

        The value is the one stored on the instance at construction time.

        Returns
        -------
        str
            Coordinate space identifier (e.g., 'MNI152NLin6Asym').

        Examples
        --------
        >>> mask_data.get_coordinate_space()
        'MNI152NLin6Asym'
        """
        tracked_space = self._coordinate_space
        return tracked_space

    def get_volume_mm3(self) -> float:
        """
        Calculate mask volume in cubic millimeters.

        The volume of a single voxel is the absolute determinant of the
        affine's upper-left 3x3 block. For axis-aligned affines this equals
        the product of the voxel sizes; unlike a product of the diagonal
        entries it also holds for rotated (oblique) or axis-permuted affines.

        Returns
        -------
        float
            Total mask volume (number of voxels > 0 times voxel volume).

        Examples
        --------
        >>> volume = mask_data.get_volume_mm3()
        >>> print(f"Mask volume: {volume:.2f} mm³")
        """
        mask_data = self._mask_img.get_fdata()
        num_voxels = np.count_nonzero(mask_data > 0)

        # |det| of the linear part = volume spanned by one voxel in mm^3
        voxel_volume_mm3 = abs(np.linalg.det(self._affine[:3, :3]))

        return float(num_voxels * voxel_volume_mm3)

    def to_dict(self) -> dict[str, Any]:
        """
        Serialize to JSON-compatible dictionary (excludes image data).

        Returns
        -------
        dict
            Metadata, provenance, and results (no NIfTI arrays).

        Examples
        --------
        >>> data = mask_data.to_dict()
        >>> import json
        >>> json.dumps(data)  # Should succeed
        """
        return {
            "metadata": copy.deepcopy(self._metadata),
            "provenance": copy.deepcopy(self._provenance),
            "results": copy.deepcopy(self._results),
            "coordinate_space": self._coordinate_space,
            "affine": self._affine.tolist(),  # Convert numpy to list for JSON
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any], mask_img: nib.Nifti1Image) -> SubjectData:
        """
        Deserialize from dictionary + NIfTI image.

        Parameters
        ----------
        data : dict
            Output from to_dict().
        mask_img : nibabel.Nifti1Image
            Mask image (loaded separately).

        Returns
        -------
        SubjectData
            Reconstructed object.

        Examples
        --------
        >>> data = mask_data.to_dict()
        >>> mask_img = nib.load("mask.nii.gz")
        >>> mask_restored = SubjectData.from_dict(data, mask_img)
        """
        return cls(
            mask_img=mask_img,
            metadata=data.get("metadata"),
            provenance=data.get("provenance"),
            results=data.get("results"),
        )

    def add_result(self, namespace: str, results: dict[str, Any]) -> SubjectData:
        """
        Create new SubjectData with additional analysis results.

        This method follows immutability-by-convention: it returns a new instance
        with the updated results rather than modifying the current instance.

        Parameters
        ----------
        namespace : str
            Result namespace (e.g., 'FunctionalNetworkMapping', 'ParcelAggregation').
            Should match the analysis module name for clarity.
        results : dict[str, Any]
            Analysis results as a dict mapping result names to result objects.
            For single result: {"result_name": result_object}
            For multiple results (e.g., multi-atlas): {"Schaefer100": roi_result1, "Tian": roi_result2}

        Returns
        -------
        SubjectData
            New instance with added results.

        Raises
        ------
        ValueError
            If namespace already exists in results.

        Examples
        --------
        >>> # Single result
        >>> results = {"default": VoxelMapResult(...)}
        >>> lesion_with_results = lesion.add_result("VolumeAnalysis", results)
        >>> "VolumeAnalysis" in lesion_with_results.results
        True
        >>>
        >>> # Multi-atlas results
        >>> results = {"Schaefer100": roi_result1, "Tian": roi_result2}
        >>> lesion_with_results = lesion.add_result("ParcelAggregation", results)
        >>> lesion_with_results.results["ParcelAggregation"]["Schaefer100"]
        ParcelData(...)
        """
        if namespace in self._results:
            raise ValueError(
                f"Result namespace '{namespace}' already exists. "
                f"Use a different namespace or create a new SubjectData instance."
            )

        # Create new results dict with added namespace
        new_results = copy.deepcopy(self._results)
        new_results[namespace] = copy.deepcopy(results)

        # Return new instance
        return SubjectData(
            mask_img=self._mask_img,
            space=self._space,
            resolution=self._resolution,
            metadata=copy.deepcopy(self._metadata),
            provenance=copy.deepcopy(self._provenance),
            results=new_results,
        )

    def add_provenance(self, record: dict[str, Any]) -> SubjectData:
        """
        Create new SubjectData with additional provenance record.

        This method follows immutability-by-convention: it returns a new instance
        with the updated provenance history rather than modifying the current instance.

        Parameters
        ----------
        record : dict
            Provenance record (from create_provenance_record() or compatible dict).
            Must contain 'function', 'parameters', 'timestamp', and 'version' keys.

        Returns
        -------
        SubjectData
            New instance with appended provenance.

        Raises
        ------
        ValueError
            If record is missing required fields.

        Examples
        --------
        >>> from lacuna.core.provenance import create_provenance_record
        >>> prov = create_provenance_record(
        ...     function="lacuna.analysis.RegionalDamage",
        ...     version="0.1.0"
        ... )
        >>> result = mask_data.add_provenance(prov)
        >>> len(result.provenance) == len(mask_data.provenance) + 1
        True
        """
        # Validate record has required fields
        required_fields = ["function", "parameters", "timestamp", "version"]
        missing_fields = [f for f in required_fields if f not in record]
        if missing_fields:
            raise ValueError(
                f"Provenance record missing required fields: {missing_fields}. "
                f"Use create_provenance_record() to create valid records."
            )

        # Create new provenance list with appended record
        new_provenance = copy.deepcopy(self._provenance)
        new_provenance.append(copy.deepcopy(record))

        # Return new instance
        return SubjectData(
            mask_img=self._mask_img,
            space=self._space,
            resolution=self._resolution,
            metadata=copy.deepcopy(self._metadata),
            provenance=new_provenance,
            results=copy.deepcopy(self._results),
        )

    def _infer_coordinate_space(self) -> str:
        """
        Get coordinate space.

        Returns the coordinate space identifier (e.g., 'MNI152NLin6Asym').
        This is always present (validated in __init__).

        """
        return self._space

    # Read-only properties

    @property
    def is_empty_mask(self) -> bool:
        """Whether the mask contains no non-zero voxels."""
        return self._is_empty_mask

    @property
    def mask_img(self) -> nib.Nifti1Image:
        """Binary mask image."""
        return self._mask_img

    @property
    def affine(self) -> np.ndarray:
        """4x4 affine transformation matrix (voxel to world)."""
        return self._affine.copy()  # Return copy to prevent modification

    @property
    def metadata(self) -> ImmutableDict:
        """
        Subject and session metadata wrapped in a read-only view.

        Each access wraps the stored metadata in an ``ImmutableDict`` labelled
        "metadata". To change metadata, build a new SubjectData instance with
        the desired values instead of modifying the returned object.

        Returns
        -------
        ImmutableDict
            Read-only view of metadata. Raises TypeError on modification attempts.

        Examples
        --------
        >>> mask_data.metadata["subject_id"]  # OK - reading
        'sub-001'
        >>> mask_data.metadata["new_key"] = "value"  # Raises TypeError
        Traceback (most recent call last):
            ...
        TypeError: Cannot modify SubjectData.metadata - it is immutable.
        To update metadata, create a new SubjectData instance instead.
        """
        read_only_view = ImmutableDict(self._metadata, "metadata")
        return read_only_view

    @property
    def provenance(self) -> list[dict[str, Any]]:
        """Processing history (immutable view)."""
        return copy.deepcopy(self._provenance)  # Deep copy for nested dicts

    @property
    def results(self) -> dict[str, dict[str, Any]]:
        """Analysis results (immutable view).

        Returns dict mapping analysis namespace to result dict.
        Result dict maps result names to result objects.

        Access pattern: ``results['AnalysisName']['result_name']``
        """
        return copy.deepcopy(self._results)  # Deep copy for nested structures

    @property
    def space(self) -> str:
        """
        Coordinate space identifier (e.g., 'MNI152NLin6Asym').

        Returns
        -------
        str
            The coordinate space.

        Examples
        --------
        >>> mask_data.space
        'MNI152NLin6Asym'
        """
        return self._space

    @property
    def resolution(self) -> float:
        """
        Spatial resolution in millimeters.

        Returns
        -------
        float
            The spatial resolution.

        Examples
        --------
        >>> mask_data.resolution
        2.0
        """
        return self._resolution

    def __getattr__(self, name: str) -> dict[str, Any]:
        """Enable attribute-based access to analysis results.

        Allows accessing results via `mask_data.AnalysisName` instead of
        `mask_data.results['AnalysisName']`.

        Parameters
        ----------
        name : str
            Analysis namespace (e.g., "ParcelAggregation", "RegionalDamage")

        Returns
        -------
        dict[str, Any]
            Result dictionary for the requested analysis

        Raises
        ------
        AttributeError
            If the attribute doesn't exist in results

        Examples
        --------
        >>> # After running ParcelAggregation:
        >>> mask_data.ParcelAggregation["Schaefer100"]
        ParcelData(...)
        >>> # Equivalent to:
        >>> mask_data.results["ParcelAggregation"]["Schaefer100"]
        ParcelData(...)
        """
        # Only intercept result namespace lookups, not internal attributes
        if name.startswith("_"):
            raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")

        # Check if name exists in results
        if name in self._results:
            return self._results[name]  # Return reference, not copy

        # Not found - raise AttributeError with helpful message
        available = ", ".join(self._results.keys()) if self._results else "none"
        raise AttributeError(
            f"'{type(self).__name__}' object has no attribute '{name}'.\n"
            f"Available analysis results: {available}"
        )

    def get_result(
        self,
        analysis: str,
        pattern: str | None = None,
        unwrap: bool = True,
    ) -> Any:
        """
        Get result by analysis name with optional glob pattern filtering.

        This method provides a convenient way to access results using
        glob patterns for flexible filtering.

        Parameters
        ----------
        analysis : str
            Analysis namespace (e.g., "ParcelAggregation", "FunctionalNetworkMapping").
        pattern : str, optional
            Glob pattern to match result keys (e.g., "*rmap*",
            "atlas-Schaefer*"). Supports fnmatch-style wildcards:
            - ``*`` matches any sequence of characters
            - ``?`` matches any single character
            - ``[seq]`` matches any character in seq
        unwrap : bool, default=True
            If True, call `.get_data()` on result objects to return raw data
            (e.g., numpy arrays, nibabel images) instead of wrapper objects.

        Returns
        -------
        Any
            - If no pattern: dict of all results for the analysis
            - If single match: the result value directly
            - If multiple matches: dict of matching results
            - If unwrap=True: raw data via `.get_data()` instead of wrappers

        Raises
        ------
        KeyError
            If analysis namespace not found, or if no results match pattern.

        Examples
        --------
        >>> # Get all ParcelAggregation results
        >>> results = subject.get_result("ParcelAggregation")

        >>> # Get by glob pattern
        >>> z_map = subject.get_result("FunctionalNetworkMapping", pattern="*zmap*")

        >>> # Get unwrapped data directly (nibabel image instead of VoxelMap)
        >>> corr_img = subject.get_result(
        ...     "FunctionalNetworkMapping", pattern="*rmap*", unwrap=True
        ... )
        >>> corr_img.shape  # Access numpy array directly
        (91, 109, 91)

        See Also
        --------
        results : Property for accessing all results.
        lacuna.core.keys.build_result_key : Build key from components.
        lacuna.core.keys.parse_result_key : Parse key into components.
        """
        from fnmatch import fnmatch

        from lacuna.utils.suggestions import format_suggestions, suggest_similar

        if analysis not in self._results:
            available = list(self._results.keys())
            suggestions = suggest_similar(analysis, available)
            hint = format_suggestions(suggestions)
            msg = f"Analysis '{analysis}' not found in results."
            if hint:
                msg = f"{msg} {hint}"
            raise KeyError(msg)

        analysis_results = self._results[analysis]

        def _unwrap_value(val: Any) -> Any:
            """Call get_data() on result objects if they have it.

            Skips nibabel images (deprecated get_data()) - returns as-is.
            """
            import nibabel as nib

            # Skip nibabel images - they're already raw data
            if isinstance(val, nib.Nifti1Image):
                return val
            # Call get_data() on wrapper objects (VoxelMap, ParcelData, etc.)
            if hasattr(val, "get_data") and callable(val.get_data):
                return val.get_data()
            return val

        def _unwrap_dict(d: dict) -> dict:
            """Unwrap all values in a dict."""
            return {k: _unwrap_value(v) for k, v in d.items()}

        # If no pattern, return all results for this analysis
        if pattern is None:
            if unwrap:
                return _unwrap_dict(analysis_results)
            return analysis_results

        # Filter results by glob pattern
        matching = {}
        for key, value in analysis_results.items():
            if fnmatch(key, pattern):
                matching[key] = value

        if len(matching) == 0:
            # No matches found - provide suggestions
            available_keys = list(analysis_results.keys())
            suggestions = suggest_similar(pattern, available_keys)
            hint = format_suggestions(suggestions)
            msg = f"No results found in {analysis} matching pattern={pattern!r}."
            if hint:
                msg = f"{msg} {hint}"
            raise KeyError(msg)

        if len(matching) == 1:
            # Single match - return the value directly
            result = next(iter(matching.values()))
            if unwrap:
                return _unwrap_value(result)
            return result

        # Multiple matches - return as dict
        if unwrap:
            return _unwrap_dict(matching)
        return matching

affine property

4x4 affine transformation matrix (voxel to world).

is_empty_mask property

Whether the mask contains no non-zero voxels.

mask_img property

Binary mask image.

metadata property

SubjectData and session metadata (read-only view).

Returns an immutable dictionary that prevents modifications with clear error messages. To update metadata, create a new SubjectData instance with the desired metadata.

Returns:

Type Description
ImmutableDict

Read-only view of metadata. Raises TypeError on modification attempts.

Examples:

>>> mask_data.metadata["subject_id"]  # OK - reading
'sub-001'
>>> mask_data.metadata["new_key"] = "value"  # Raises TypeError
Traceback (most recent call last):
    ...
TypeError: Cannot modify SubjectData.metadata - it is immutable.
To update metadata, create a new SubjectData instance instead.

provenance property

Processing history (immutable view).

resolution property

Spatial resolution in millimeters.

Returns:

Type Description
float

The spatial resolution.

Examples:

>>> mask_data.resolution
2.0

results property

Analysis results (immutable view).

Returns dict mapping analysis namespace to result dict. Result dict maps result names to result objects.

Access pattern: results['AnalysisName']['result_name']

space property

Coordinate space identifier (e.g., 'MNI152NLin6Asym').

Returns:

Type Description
str

The coordinate space.

Examples:

>>> mask_data.space
'MNI152NLin6Asym'

__getattr__(name)

Enable attribute-based access to analysis results.

Allows accessing results via mask_data.AnalysisName instead of mask_data.results['AnalysisName'].

Parameters:

Name Type Description Default
name str

Analysis namespace (e.g., "ParcelAggregation", "RegionalDamage")

required

Returns:

Type Description
dict[str, Any]

Result dictionary for the requested analysis

Raises:

Type Description
AttributeError

If the attribute doesn't exist in results

Examples:

>>> # After running ParcelAggregation:
>>> mask_data.ParcelAggregation["Schaefer100"]
ParcelData(...)
>>> # Equivalent to:
>>> mask_data.results["ParcelAggregation"]["Schaefer100"]
ParcelData(...)
Source code in src/lacuna/core/subject_data.py
def __getattr__(self, name: str) -> dict[str, Any]:
    """Enable attribute-based access to analysis results.

    Allows accessing results via `mask_data.AnalysisName` instead of
    `mask_data.results['AnalysisName']`.

    Parameters
    ----------
    name : str
        Analysis namespace (e.g., "ParcelAggregation", "RegionalDamage")

    Returns
    -------
    dict[str, Any]
        Result dictionary for the requested analysis

    Raises
    ------
    AttributeError
        If the attribute doesn't exist in results

    Examples
    --------
    >>> # After running ParcelAggregation:
    >>> mask_data.ParcelAggregation["Schaefer100"]
    ParcelData(...)
    >>> # Equivalent to:
    >>> mask_data.results["ParcelAggregation"]["Schaefer100"]
    ParcelData(...)
    """
    # Only intercept result namespace lookups, not internal attributes
    if name.startswith("_"):
        raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")

    # Check if name exists in results
    if name in self._results:
        return self._results[name]  # Return reference, not copy

    # Not found - raise AttributeError with helpful message
    available = ", ".join(self._results.keys()) if self._results else "none"
    raise AttributeError(
        f"'{type(self).__name__}' object has no attribute '{name}'.\n"
        f"Available analysis results: {available}"
    )

add_provenance(record)

Create new SubjectData with additional provenance record.

This method follows immutability-by-convention: it returns a new instance with the updated provenance history rather than modifying the current instance.

Parameters:

Name Type Description Default
record dict

Provenance record (from create_provenance_record() or compatible dict). Must contain 'function', 'parameters', 'timestamp', and 'version' keys.

required

Returns:

Type Description
SubjectData

New instance with appended provenance.

Raises:

Type Description
ValueError

If record is missing required fields.

Examples:

>>> from lacuna.core.provenance import create_provenance_record
>>> prov = create_provenance_record(
...     function="lacuna.analysis.RegionalDamage",
...     version="0.1.0"
... )
>>> result = mask_data.add_provenance(prov)
>>> len(result.provenance) == len(mask_data.provenance) + 1
True
Source code in src/lacuna/core/subject_data.py
def add_provenance(self, record: dict[str, Any]) -> SubjectData:
    """
    Create new SubjectData with additional provenance record.

    This method follows immutability-by-convention: it returns a new instance
    with the updated provenance history rather than modifying the current instance.

    Parameters
    ----------
    record : dict
        Provenance record (from create_provenance_record() or compatible dict).
        Must contain 'function', 'parameters', 'timestamp', and 'version' keys.

    Returns
    -------
    SubjectData
        New instance with appended provenance.

    Raises
    ------
    ValueError
        If record is missing required fields.

    Examples
    --------
    >>> from lacuna.core.provenance import create_provenance_record
    >>> prov = create_provenance_record(
    ...     function="lacuna.analysis.RegionalDamage",
    ...     version="0.1.0"
    ... )
    >>> result = mask_data.add_provenance(prov)
    >>> len(result.provenance) == len(mask_data.provenance) + 1
    True
    """
    # Validate record has required fields
    required_fields = ["function", "parameters", "timestamp", "version"]
    missing_fields = [f for f in required_fields if f not in record]
    if missing_fields:
        raise ValueError(
            f"Provenance record missing required fields: {missing_fields}. "
            f"Use create_provenance_record() to create valid records."
        )

    # Create new provenance list with appended record
    new_provenance = copy.deepcopy(self._provenance)
    new_provenance.append(copy.deepcopy(record))

    # Return new instance
    return SubjectData(
        mask_img=self._mask_img,
        space=self._space,
        resolution=self._resolution,
        metadata=copy.deepcopy(self._metadata),
        provenance=new_provenance,
        results=copy.deepcopy(self._results),
    )

add_result(namespace, results)

Create new SubjectData with additional analysis results.

This method follows immutability-by-convention: it returns a new instance with the updated results rather than modifying the current instance.

Parameters:

Name Type Description Default
namespace str

Result namespace (e.g., 'FunctionalNetworkMapping', 'ParcelAggregation'). Should match the analysis module name for clarity.

required
results dict[str, Any]

Analysis results as a dict mapping result names to result objects. For a single result: {"result_name": result_object}. For multiple results (e.g., multi-atlas): {"Schaefer100": roi_result1, "Tian": roi_result2}.

required

Returns:

Type Description
SubjectData

New instance with added results.

Raises:

Type Description
ValueError

If namespace already exists in results.

Examples:

>>> # Single result
>>> results = {"default": VoxelMapResult(...)}
>>> lesion_with_results = lesion.add_result("VolumeAnalysis", results)
>>> "VolumeAnalysis" in lesion_with_results.results
True
>>>
>>> # Multi-atlas results
>>> results = {"Schaefer100": roi_result1, "Tian": roi_result2}
>>> lesion_with_results = lesion.add_result("ParcelAggregation", results)
>>> lesion_with_results.results["ParcelAggregation"]["Schaefer100"]
ParcelData(...)
Source code in src/lacuna/core/subject_data.py
def add_result(self, namespace: str, results: dict[str, Any]) -> SubjectData:
    """
    Create new SubjectData with additional analysis results.

    This method follows immutability-by-convention: it returns a new instance
    with the updated results rather than modifying the current instance.

    Parameters
    ----------
    namespace : str
        Result namespace (e.g., 'FunctionalNetworkMapping', 'ParcelAggregation').
        Should match the analysis module name for clarity.
    results : dict[str, Any]
        Analysis results as a dict mapping result names to result objects.
        For single result: {"result_name": result_object}
        For multiple results (e.g., multi-atlas): {"Schaefer100": roi_result1, "Tian": roi_result2}

    Returns
    -------
    SubjectData
        New instance with added results.

    Raises
    ------
    ValueError
        If namespace already exists in results.

    Examples
    --------
    >>> # Single result
    >>> results = {"default": VoxelMapResult(...)}
    >>> lesion_with_results = lesion.add_result("VolumeAnalysis", results)
    >>> "VolumeAnalysis" in lesion_with_results.results
    True
    >>>
    >>> # Multi-atlas results
    >>> results = {"Schaefer100": roi_result1, "Tian": roi_result2}
    >>> lesion_with_results = lesion.add_result("ParcelAggregation", results)
    >>> lesion_with_results.results["ParcelAggregation"]["Schaefer100"]
    ParcelData(...)
    """
    if namespace in self._results:
        raise ValueError(
            f"Result namespace '{namespace}' already exists. "
            f"Use a different namespace or create a new SubjectData instance."
        )

    # Create new results dict with added namespace
    new_results = copy.deepcopy(self._results)
    new_results[namespace] = copy.deepcopy(results)

    # Return new instance
    return SubjectData(
        mask_img=self._mask_img,
        space=self._space,
        resolution=self._resolution,
        metadata=copy.deepcopy(self._metadata),
        provenance=copy.deepcopy(self._provenance),
        results=new_results,
    )

copy()

Create a deep copy of this SubjectData instance.

Returns:

Type Description
SubjectData

Independent copy with same data.

Examples:

>>> mask_copy = mask_data.copy()
>>> mask_copy is mask_data
False
Source code in src/lacuna/core/subject_data.py
def copy(self) -> SubjectData:
    """
    Build a new SubjectData that duplicates this instance.

    Metadata, provenance, and results are deep-copied before being handed
    to the constructor, so the new object's containers are independent of
    this one's. The mask image, space, and resolution are passed through
    unchanged.

    Returns
    -------
    SubjectData
        Newly constructed instance carrying the same data.

    Examples
    --------
    >>> mask_copy = mask_data.copy()
    >>> mask_copy is mask_data
    False
    """
    init_kwargs = {
        "mask_img": self._mask_img,
        "space": self._space,
        "resolution": self._resolution,
        "metadata": copy.deepcopy(self._metadata),
        "provenance": copy.deepcopy(self._provenance),
        "results": copy.deepcopy(self._results),
    }
    return SubjectData(**init_kwargs)

from_dict(data, mask_img) classmethod

Deserialize from dictionary + NIfTI image.

Parameters:

Name Type Description Default
data dict

Output from to_dict().

required
mask_img Nifti1Image

Mask image (loaded separately).

required

Returns:

Type Description
SubjectData

Reconstructed object.

Examples:

>>> data = mask_data.to_dict()
>>> mask_img = nib.load("mask.nii.gz")
>>> mask_restored = SubjectData.from_dict(data, mask_img)
Source code in src/lacuna/core/subject_data.py
@classmethod
def from_dict(cls, data: dict[str, Any], mask_img: nib.Nifti1Image) -> SubjectData:
    """
    Deserialize from dictionary + NIfTI image.

    Parameters
    ----------
    data : dict
        Output from to_dict().
    mask_img : nibabel.Nifti1Image
        Mask image (loaded separately).

    Returns
    -------
    SubjectData
        Reconstructed object.

    Examples
    --------
    >>> data = mask_data.to_dict()
    >>> mask_img = nib.load("mask.nii.gz")
    >>> mask_restored = SubjectData.from_dict(data, mask_img)
    """
    return cls(
        mask_img=mask_img,
        metadata=data.get("metadata"),
        provenance=data.get("provenance"),
        results=data.get("results"),
    )

from_nifti(mask_path, space=None, resolution=None, metadata=None) classmethod

Load mask data from NIfTI file.

Parameters:

Name Type Description Default
mask_path str or Path

Path to mask NIfTI file.

required
space str

Coordinate space identifier (e.g., 'MNI152NLin6Asym'). If not provided, will attempt auto-detection from image header/filename.

None
resolution float

Spatial resolution in millimeters (e.g., 1.0, 2.0). If not provided, will attempt auto-detection from image header/filename.

None
metadata dict

Additional subject metadata (e.g., session info). 'subject_id' auto-generated from filename if not provided.

None

Returns:

Type Description
SubjectData

Loaded mask data object.

Raises:

Type Description
FileNotFoundError

If file path doesn't exist.

NiftiLoadError

If image fails to load or validate.

ValueError

If 'space' or 'resolution' cannot be determined.

Examples:

>>> mask_data = SubjectData.from_nifti(
...     "mask.nii.gz",
...     space="MNI152NLin6Asym",
...     resolution=2.0
... )
>>> mask_data = SubjectData.from_nifti(
...     "mask.nii.gz",
...     space="MNI152NLin6Asym",
...     resolution=2.0,
...     metadata={"subject_id": "sub-001", "session": "baseline"}
... )
Source code in src/lacuna/core/subject_data.py
@classmethod
def from_nifti(
    cls,
    mask_path: str | Path,
    space: str | None = None,
    resolution: float | None = None,
    metadata: dict[str, Any] | None = None,
) -> SubjectData:
    """
    Build a SubjectData from a mask stored in a NIfTI file.

    The caller's ``metadata`` dict is shallow-copied, never modified. Missing
    ``subject_id`` / ``session_id`` entries are filled from the first
    ``sub-...`` / ``ses-...`` underscore-separated token of the file name,
    when one exists.

    Parameters
    ----------
    mask_path : str or Path
        Path to mask NIfTI file.
    space : str, optional
        Coordinate space identifier (e.g., 'MNI152NLin6Asym').
        If not provided, will attempt auto-detection from image header/filename.
    resolution : float, optional
        Spatial resolution in millimeters (e.g., 1.0, 2.0).
        If not provided, will attempt auto-detection from image header/filename.
    metadata : dict, optional
        Additional subject metadata (e.g., session info).
        'subject_id' auto-generated from filename if not provided.

    Returns
    -------
    SubjectData
        Loaded mask data object.

    Raises
    ------
    FileNotFoundError
        If file path doesn't exist.
    NiftiLoadError
        If image fails to load or validate.
    ValueError
        If 'space' or 'resolution' cannot be determined.

    Examples
    --------
    >>> mask_data = SubjectData.from_nifti(
    ...     "mask.nii.gz",
    ...     space="MNI152NLin6Asym",
    ...     resolution=2.0
    ... )
    >>> mask_data = SubjectData.from_nifti(
    ...     "mask.nii.gz",
    ...     space="MNI152NLin6Asym",
    ...     resolution=2.0,
    ...     metadata={"subject_id": "sub-001", "session": "baseline"}
    ... )
    """
    mask_path = Path(mask_path)

    # A missing file keeps its native exception type; any other load failure
    # is wrapped in the project's NiftiLoadError.
    try:
        mask_img = nib.load(mask_path)
    except FileNotFoundError:
        raise
    except Exception as e:
        raise NiftiLoadError(f"Failed to load mask from {mask_path}: {e}") from e

    # Work on a private dict so the caller's mapping is never modified.
    metadata = {} if metadata is None else metadata.copy()

    # Fill BIDS-style entities from the file name when the caller gave none.
    # ``stem`` only drops the last suffix, so ".nii" left over from
    # ".nii.gz" is removed explicitly.
    filename = mask_path.stem.replace(".nii", "")
    for meta_key, prefix in (("subject_id", "sub-"), ("session_id", "ses-")):
        if meta_key in metadata or prefix not in filename:
            continue
        entity = next(
            (token for token in filename.split("_") if token.startswith(prefix)),
            None,
        )
        if entity is not None:
            metadata[meta_key] = entity

    # Explicit keyword arguments take priority over metadata entries.
    if space is not None:
        metadata["space"] = space
    if resolution is not None:
        metadata["resolution"] = resolution

    # Auto-detection is the last resort, used only for whichever of the two
    # entries is still absent.
    if not ("space" in metadata and "resolution" in metadata):
        try:
            # Import lazily to avoid circular imports at module load time
            from .spaces import get_image_space

            detected = get_image_space(mask_img, filepath=mask_path)
            if detected is not None:
                if "space" not in metadata:
                    metadata["space"] = detected.identifier
                if "resolution" not in metadata:
                    metadata["resolution"] = detected.resolution
        except Exception:
            # Detection is best-effort; leave metadata untouched and allow
            # __init__ to raise a helpful error if necessary.
            pass

    return cls(mask_img=mask_img, metadata=metadata)

get_coordinate_space()

Get current coordinate space from metadata.

Returns:

Type Description
str

Coordinate space identifier (e.g., 'MNI152NLin6Asym').

Examples:

>>> mask_data.get_coordinate_space()
'MNI152NLin6Asym'
Source code in src/lacuna/core/subject_data.py
def get_coordinate_space(self) -> str:
    """
    Return the coordinate space identifier stored on this object.

    Returns
    -------
    str
        The stored coordinate space identifier (e.g., 'MNI152NLin6Asym').

    Examples
    --------
    >>> mask_data.get_coordinate_space()
    'MNI152NLin6Asym'
    """
    space_identifier = self._coordinate_space
    return space_identifier

get_result(analysis, pattern=None, unwrap=True)

Get result by analysis name with optional glob pattern filtering.

This method provides a convenient way to access results using glob patterns for flexible filtering.

Parameters:

Name Type Description Default
analysis str

Analysis namespace (e.g., "ParcelAggregation", "FunctionalNetworkMapping").

required
pattern str

Glob pattern to match result keys (e.g., "*rmap*", "atlas-Schaefer*"). Supports fnmatch-style wildcards: - * matches any sequence of characters - ? matches any single character - [seq] matches any character in seq

None
unwrap bool

If True, call .get_data() on result objects to return raw data (e.g., numpy arrays, nibabel images) instead of wrapper objects.

True

Returns:

Type Description
Any
  • If no pattern: dict of all results for the analysis
  • If single match: the result value directly
  • If multiple matches: dict of matching results
  • If unwrap=True: raw data via .get_data() instead of wrappers

Raises:

Type Description
KeyError

If analysis namespace not found, or if no results match pattern.

Examples:

>>> # Get all ParcelAggregation results
>>> results = subject.get_result("ParcelAggregation")
>>> # Get by glob pattern
>>> z_map = subject.get_result("FunctionalNetworkMapping", pattern="*zmap*")
>>> # Get unwrapped data directly (nibabel image instead of VoxelMap)
>>> corr_img = subject.get_result(
...     "FunctionalNetworkMapping", pattern="*rmap*", unwrap=True
... )
>>> corr_img.shape  # Access numpy array directly
(91, 109, 91)
See Also

results : Property for accessing all results.
lacuna.core.keys.build_result_key : Build key from components.
lacuna.core.keys.parse_result_key : Parse key into components.

Source code in src/lacuna/core/subject_data.py
def get_result(
    self,
    analysis: str,
    pattern: str | None = None,
    unwrap: bool = True,
) -> Any:
    """
    Look up stored results for one analysis, optionally filtered by a glob.

    Parameters
    ----------
    analysis : str
        Analysis namespace (e.g., "ParcelAggregation", "FunctionalNetworkMapping").
    pattern : str, optional
        fnmatch-style glob applied to the result keys of that analysis
        (e.g., "*rmap*", "atlas-Schaefer*"):
        - ``*`` matches any sequence of characters
        - ``?`` matches any single character
        - ``[seq]`` matches any character in seq
    unwrap : bool, default=True
        If True, replace every returned value that exposes a callable
        ``get_data`` attribute by the result of calling it. nibabel
        ``Nifti1Image`` objects are returned unchanged.

    Returns
    -------
    Any
        - If no pattern: dict of all results for the analysis
        - If single match: the result value directly
        - If multiple matches: dict of matching results
        - If unwrap=True: raw data via `.get_data()` instead of wrappers

    Raises
    ------
    KeyError
        If analysis namespace not found, or if no results match pattern.

    Examples
    --------
    >>> # Get all ParcelAggregation results
    >>> results = subject.get_result("ParcelAggregation")

    >>> # Get by glob pattern
    >>> z_map = subject.get_result("FunctionalNetworkMapping", pattern="*zmap*")

    >>> # Get unwrapped data directly (nibabel image instead of VoxelMap)
    >>> corr_img = subject.get_result(
    ...     "FunctionalNetworkMapping", pattern="*rmap*", unwrap=True
    ... )
    >>> corr_img.shape  # Access numpy array directly
    (91, 109, 91)

    See Also
    --------
    results : Property for accessing all results.
    lacuna.core.keys.build_result_key : Build key from components.
    lacuna.core.keys.parse_result_key : Parse key into components.
    """
    from fnmatch import fnmatch

    from lacuna.utils.suggestions import format_suggestions, suggest_similar

    def _lookup_error(base_msg: str, query: str, candidates: list) -> KeyError:
        """Build the KeyError, appending the suggestion hint when there is one."""
        hint = format_suggestions(suggest_similar(query, candidates))
        return KeyError(f"{base_msg} {hint}" if hint else base_msg)

    def _raw(item: Any) -> Any:
        """Return ``item.get_data()`` for wrapper objects, ``item`` otherwise."""
        import nibabel as nib

        # nibabel images are handed back untouched: they are already the raw
        # payload and their own get_data() is deprecated.
        is_wrapper = (
            not isinstance(item, nib.Nifti1Image)
            and hasattr(item, "get_data")
            and callable(item.get_data)
        )
        return item.get_data() if is_wrapper else item

    if analysis not in self._results:
        raise _lookup_error(
            f"Analysis '{analysis}' not found in results.",
            analysis,
            list(self._results.keys()),
        )

    analysis_results = self._results[analysis]

    if pattern is None:
        # No filtering requested: the whole namespace is the selection.
        selected = analysis_results
    else:
        selected = {
            key: value
            for key, value in analysis_results.items()
            if fnmatch(key, pattern)
        }

        if not selected:
            raise _lookup_error(
                f"No results found in {analysis} matching pattern={pattern!r}.",
                pattern,
                list(analysis_results.keys()),
            )

        if len(selected) == 1:
            # A unique match is returned bare rather than inside a dict.
            (only_value,) = selected.values()
            return _raw(only_value) if unwrap else only_value

    if unwrap:
        return {key: _raw(value) for key, value in selected.items()}
    return selected

get_volume_mm3()

Calculate mask volume in cubic millimeters.

Returns:

Type Description
float

Total mask volume (number of voxels with a value greater than zero * voxel volume).

Examples:

>>> volume = mask_data.get_volume_mm3()
>>> print(f"Mask volume: {volume:.2f} mm³")
Source code in src/lacuna/core/subject_data.py
def get_volume_mm3(self) -> float:
    """
    Calculate mask volume in cubic millimeters.

    The volume of a single voxel is the absolute determinant of the 3x3
    linear part of the affine. Unlike the product of the diagonal entries,
    this stays correct when the affine is rotated, oblique, or has its axes
    permuted or flipped; for axis-aligned affines both give the same value.

    Returns
    -------
    float
        Number of voxels with a value greater than zero multiplied by the
        volume of one voxel.

    Examples
    --------
    >>> volume = mask_data.get_volume_mm3()
    >>> print(f"Mask volume: {volume:.2f} mm³")
    """
    mask_data = self._mask_img.get_fdata()
    num_voxels = np.count_nonzero(mask_data > 0)

    # |det| of the linear part is the volume of the parallelepiped spanned by
    # the three voxel axes. Reading only the diagonal would yield 0 for an
    # affine whose axes are swapped and too small a value for a rotated one.
    voxel_volume_mm3 = abs(np.linalg.det(self._affine[:3, :3]))

    return float(num_voxels * voxel_volume_mm3)

to_dict()

Serialize to JSON-compatible dictionary (excludes image data).

Returns:

Type Description
dict

Metadata, provenance, and results (no NIfTI arrays).

Examples:

>>> data = mask_data.to_dict()
>>> import json
>>> json.dumps(data)  # Should succeed
Source code in src/lacuna/core/subject_data.py
def to_dict(self) -> dict[str, Any]:
    """
    Export metadata, provenance, results, space and affine as a plain dict.

    The image data itself is not included. Metadata, provenance and results
    are deep-copied, so changes to the returned dict do not affect this
    object.

    Returns
    -------
    dict
        Keys ``"metadata"``, ``"provenance"``, ``"results"``,
        ``"coordinate_space"`` and ``"affine"`` (the affine as nested lists).

    Examples
    --------
    >>> data = mask_data.to_dict()
    >>> import json
    >>> json.dumps(data)  # Should succeed
    """
    # NOTE(review): the copied values are not converted here, so JSON
    # compatibility presumably relies on them already being serializable.
    serialized: dict[str, Any] = {}
    serialized["metadata"] = copy.deepcopy(self._metadata)
    serialized["provenance"] = copy.deepcopy(self._provenance)
    serialized["results"] = copy.deepcopy(self._results)
    serialized["coordinate_space"] = self._coordinate_space
    # numpy arrays are not JSON-serializable; nested lists are.
    serialized["affine"] = self._affine.tolist()
    return serialized

validate()

Validate data integrity.

Checks that affine is invertible, image is 3D, and spatial properties are consistent.

Returns:

Type Description
bool

True if all checks pass.

Warns:

Type Description
UserWarning

If mask is empty or has suspicious properties.

Raises:

Type Description
ValidationError

If critical invariants violated.

Examples:

>>> mask_data.validate()
True
Source code in src/lacuna/core/subject_data.py
def validate(self) -> bool:
    """
    Validate data integrity.

    Runs ``validate_nifti_image`` on the mask image (with ``require_3d=True``)
    and then ``validate_affine`` on the stored affine. Whatever those helpers
    raise propagates to the caller.

    Returns
    -------
    bool
        True if both checks return without raising.

    Raises
    ------
    ValidationError
        Presumably raised by the validation helpers when a check fails;
        the helpers are defined outside this method, so confirm there.

    Notes
    -----
    No emptiness check and no warning is issued here; per the inline note
    below, the empty-mask check happens at construction time.

    Examples
    --------
    >>> mask_data.validate()
    True
    """
    # Image check: must satisfy the helper's rules for a 3D image
    validate_nifti_image(self._mask_img, require_3d=True)

    # Affine check: delegated entirely to the helper
    validate_affine(self._affine)

    # Note: Empty mask check is performed at __init__ time and raises EmptyMaskError

    return True

SurfaceMesh dataclass

Bases: DataContainer

Result container for surface-based data.

Attributes:

Name Type Description
name str

Name/identifier for this result

vertices ndarray

Vertex coordinates (N x 3)

faces ndarray

Triangle faces (M x 3, indices into vertices)

vertex_data (ndarray, optional)

Per-vertex values (N,) - e.g., correlation, thickness

hemisphere (str, optional)

Hemisphere identifier ("L", "R", "both")

surface_type (str, optional)

Type of surface (e.g., "pial", "white", "inflated")

metadata dict

Additional metadata about the output

Source code in src/lacuna/core/data_types.py
@dataclass
class SurfaceMesh(DataContainer):
    """Container for a triangulated surface and optional per-vertex values.

    Attributes
    ----------
    name : str
        Name/identifier for this result
    vertices : np.ndarray
        Vertex coordinates (N x 3)
    faces : np.ndarray
        Triangle faces (M x 3, indices into vertices)
    vertex_data : np.ndarray, optional
        Per-vertex values (N,) - e.g., correlation, thickness
    hemisphere : str, optional
        Hemisphere identifier ("L", "R", "both")
    surface_type : str, optional
        Type of surface (e.g., "pial", "white", "inflated")
    metadata : dict
        Additional metadata about the output
    """

    name: str
    vertices: np.ndarray
    faces: np.ndarray
    vertex_data: np.ndarray | None = None
    hemisphere: str | None = None
    surface_type: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        """Run base-class initialization, then check the array shapes.

        Raises
        ------
        ValueError
            If vertices or faces are not two-dimensional with three columns,
            or if vertex_data has a different length than vertices.
        """
        base_kwargs = {"name": self.name, "metadata": self.metadata}
        super().__init__(**base_kwargs)

        # Vertices: one xyz triple per row.
        vertices_ok = self.vertices.ndim == 2 and self.vertices.shape[1] == 3
        if not vertices_ok:
            raise ValueError(f"Vertices must be N x 3, got shape {self.vertices.shape}")

        # Faces: three vertex indices per row.
        faces_ok = self.faces.ndim == 2 and self.faces.shape[1] == 3
        if not faces_ok:
            raise ValueError(f"Faces must be M x 3, got shape {self.faces.shape}")

        # Per-vertex values are optional, but must line up with the vertices.
        if (
            self.vertex_data is not None
            and self.vertex_data.shape[0] != self.vertices.shape[0]
        ):
            raise ValueError(
                f"Vertex data length ({self.vertex_data.shape[0]}) "
                f"must match number of vertices ({self.vertices.shape[0]})"
            )

    def get_data(self) -> np.ndarray:
        """Return the per-vertex values; raise ValueError if there are none."""
        values = self.vertex_data
        if values is None:
            raise ValueError("No vertex data available")
        return values

    def get_mesh(self) -> tuple[np.ndarray, np.ndarray]:
        """Return the geometry as a ``(vertices, faces)`` pair."""
        mesh = (self.vertices, self.faces)
        return mesh

    def summary(self) -> str:
        """Return a one-line description: counts, data presence, optional tags."""
        pieces = [
            f"{len(self.vertices)} vertices",
            f"{len(self.faces)} faces",
            "mesh only" if self.vertex_data is None else "with vertex data",
        ]
        # Hemisphere and surface type are only mentioned when set (truthy).
        if self.hemisphere:
            pieces.append(f"hemisphere={self.hemisphere}")
        if self.surface_type:
            pieces.append(f"type={self.surface_type}")
        return f"{self.name}: " + ", ".join(pieces)

    def __repr__(self) -> str:
        """Return a compact representation with the name and element counts."""
        described = ", ".join(
            (
                f"name='{self.name}'",
                f"n_vertices={len(self.vertices)}",
                f"n_faces={len(self.faces)}",
            )
        )
        return f"SurfaceMesh({described})"

__post_init__()

Initialize and validate surface data.

Source code in src/lacuna/core/data_types.py
def __post_init__(self):
    """Run base-class initialization, then check the array shapes.

    Raises
    ------
    ValueError
        If vertices or faces are not two-dimensional with three columns,
        or if vertex_data has a different length than vertices.
    """
    base_kwargs = {"name": self.name, "metadata": self.metadata}
    super().__init__(**base_kwargs)

    # Vertices: one xyz triple per row.
    vertices_ok = self.vertices.ndim == 2 and self.vertices.shape[1] == 3
    if not vertices_ok:
        raise ValueError(f"Vertices must be N x 3, got shape {self.vertices.shape}")

    # Faces: three vertex indices per row.
    faces_ok = self.faces.ndim == 2 and self.faces.shape[1] == 3
    if not faces_ok:
        raise ValueError(f"Faces must be M x 3, got shape {self.faces.shape}")

    # Per-vertex values are optional, but must line up with the vertices.
    if (
        self.vertex_data is not None
        and self.vertex_data.shape[0] != self.vertices.shape[0]
    ):
        raise ValueError(
            f"Vertex data length ({self.vertex_data.shape[0]}) "
            f"must match number of vertices ({self.vertices.shape[0]})"
        )

__repr__()

Return string representation.

Source code in src/lacuna/core/data_types.py
def __repr__(self) -> str:
    """Return a compact representation with the name and element counts."""
    described = ", ".join(
        (
            f"name='{self.name}'",
            f"n_vertices={len(self.vertices)}",
            f"n_faces={len(self.faces)}",
        )
    )
    return f"SurfaceMesh({described})"

get_data()

Get per-vertex data.

Source code in src/lacuna/core/data_types.py
def get_data(self) -> np.ndarray:
    """Return the per-vertex values; raise ValueError if there are none."""
    values = self.vertex_data
    if values is None:
        raise ValueError("No vertex data available")
    return values

get_mesh()

Get surface mesh.

Source code in src/lacuna/core/data_types.py
def get_mesh(self) -> tuple[np.ndarray, np.ndarray]:
    """Return the geometry as a ``(vertices, faces)`` pair."""
    mesh = (self.vertices, self.faces)
    return mesh

summary()

Get a summary description of this result.

Source code in src/lacuna/core/data_types.py
def summary(self) -> str:
    """Return a one-line description: counts, data presence, optional tags."""
    pieces = [
        f"{len(self.vertices)} vertices",
        f"{len(self.faces)} faces",
        "mesh only" if self.vertex_data is None else "with vertex data",
    ]
    # Hemisphere and surface type are only mentioned when set (truthy).
    if self.hemisphere:
        pieces.append(f"hemisphere={self.hemisphere}")
    if self.surface_type:
        pieces.append(f"type={self.surface_type}")
    return f"{self.name}: " + ", ".join(pieces)

Tractogram dataclass

Bases: DataContainer

Result container for tractography streamlines.

Primary storage is path-based. Optionally stores streamlines in memory for immediate access. Use nibabel or dipy to load tractograms from disk.

Attributes:

Name Type Description
name str

Name/identifier for this result

tractogram_path Path

Path to saved tractogram file (.tck, .trk)

streamlines (list or ndarray, optional)

Optional in-memory streamlines, each as (N_points, 3) array

metadata dict

Additional metadata about the output

Source code in src/lacuna/core/data_types.py
@dataclass
class Tractogram(DataContainer):
    """Result container for tractography streamlines.

    Primary storage is path-based. Optionally stores streamlines in memory
    for immediate access. Use nibabel or dipy to load tractograms from disk.

    Attributes
    ----------
    name : str
        Name/identifier for this result
    tractogram_path : Path
        Path to saved tractogram file (.tck, .trk)
    streamlines : list or np.ndarray, optional
        Optional in-memory streamlines, each as (N_points, 3) array
    metadata : dict
        Additional metadata about the output
    """

    name: str
    tractogram_path: Path
    streamlines: list[np.ndarray] | np.ndarray | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        """Initialize the base container and require a tractogram path.

        Raises
        ------
        ValueError
            If ``tractogram_path`` is None.
        """
        super().__init__(name=self.name, metadata=self.metadata)

        # The path is the primary storage, so it must always be present.
        if self.tractogram_path is None:
            raise ValueError("tractogram_path is required")

    def get_data(self) -> list[np.ndarray] | np.ndarray | Path:
        """Get tractogram data.

        Returns
        -------
        streamlines or path
            Returns in-memory streamlines if available, otherwise returns path.
            Use nibabel.streamlines.load() to load from path.

        Examples
        --------
        >>> result = Tractogram(name="tracts", tractogram_path=Path("tracts.tck"))
        >>> data = result.get_data()  # Returns Path
        >>> # Load with nibabel:
        >>> import nibabel as nib
        >>> tractogram = nib.streamlines.load(str(data))
        """
        if self.streamlines is not None:
            return self.streamlines
        return self.tractogram_path

    def summary(self) -> str:
        """Get a summary description of this result."""
        storage = "in-memory" if self.streamlines is not None else "on-disk"
        return f"{self.name}: {storage}, path={self.tractogram_path.name}"

    def __repr__(self) -> str:
        """Return string representation."""
        in_mem = self.streamlines is not None
        return (
            f"Tractogram("
            f"name='{self.name}', "
            f"path='{self.tractogram_path.name}', "
            f"in_memory={in_mem})"
        )

    def save(self, output_path: Path) -> Path:
        """Save tractogram to disk by copying the source file.

        The source file is checked before anything is written, so a failed
        save does not create the destination directory as a side effect.

        Parameters
        ----------
        output_path : Path
            Destination file path. Missing parent directories are created.

        Returns
        -------
        Path
            Path to the saved file.

        Raises
        ------
        FileNotFoundError
            If the source file at ``tractogram_path`` does not exist.
        """
        import shutil

        # Fail early: creating directories for a copy that cannot happen
        # would leave empty folders behind.
        if not self.tractogram_path.exists():
            raise FileNotFoundError(
                f"Cannot save tractogram: source file '{self.tractogram_path}' "
                f"no longer exists."
            )

        output_path.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(self.tractogram_path, output_path)
        return output_path

__post_init__()

Initialize and validate tractogram data.

Source code in src/lacuna/core/data_types.py
def __post_init__(self):
    """Initialize the base container and require a tractogram path.

    Raises
    ------
    ValueError
        If ``tractogram_path`` is None.
    """
    base_kwargs = {"name": self.name, "metadata": self.metadata}
    super().__init__(**base_kwargs)

    # The path is the primary storage, so it must always be present.
    path_missing = self.tractogram_path is None
    if path_missing:
        raise ValueError("tractogram_path is required")

__repr__()

Return string representation.

Source code in src/lacuna/core/data_types.py
def __repr__(self) -> str:
    """Return a compact representation: name, file name and in-memory flag."""
    described = ", ".join(
        (
            f"name='{self.name}'",
            f"path='{self.tractogram_path.name}'",
            f"in_memory={self.streamlines is not None}",
        )
    )
    return f"Tractogram({described})"

get_data()

Get tractogram data.

Returns:

Type Description
streamlines or path

Returns in-memory streamlines if available, otherwise returns path. Use nibabel.streamlines.load() to load from path.

Examples:

>>> result = Tractogram(name="tracts", tractogram_path=Path("tracts.tck"))
>>> data = result.get_data()  # Returns Path
>>> # Load with nibabel:
>>> import nibabel as nib
>>> tractogram = nib.streamlines.load(str(data))
Source code in src/lacuna/core/data_types.py
def get_data(self) -> list[np.ndarray] | np.ndarray | Path:
    """Get tractogram data.

    Returns
    -------
    streamlines or path
        Returns in-memory streamlines if available, otherwise returns path.
        Use nibabel.streamlines.load() to load from path.

    Examples
    --------
    >>> result = Tractogram(name="tracts", tractogram_path=Path("tracts.tck"))
    >>> data = result.get_data()  # Returns Path
    >>> # Load with nibabel:
    >>> import nibabel as nib
    >>> tractogram = nib.streamlines.load(str(data))
    """
    if self.streamlines is not None:
        return self.streamlines
    return self.tractogram_path

save(output_path)

Save tractogram to disk by copying the source file.

Parameters:

Name Type Description Default
output_path Path

Destination file path.

required

Returns:

Type Description
Path

Path to the saved file.

Source code in src/lacuna/core/data_types.py
def save(self, output_path: Path) -> Path:
    """Save tractogram to disk by copying the source file.

    The source file is checked before anything is written, so a failed
    save does not create the destination directory as a side effect.

    Parameters
    ----------
    output_path : Path
        Destination file path. Missing parent directories are created.

    Returns
    -------
    Path
        Path to the saved file.

    Raises
    ------
    FileNotFoundError
        If the source file at ``tractogram_path`` does not exist.
    """
    import shutil

    # Fail early: creating directories for a copy that cannot happen
    # would leave empty folders behind.
    if not self.tractogram_path.exists():
        raise FileNotFoundError(
            f"Cannot save tractogram: source file '{self.tractogram_path}' "
            f"no longer exists."
        )

    output_path.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(self.tractogram_path, output_path)
    return output_path

summary()

Get a summary description of this result.

Source code in src/lacuna/core/data_types.py
def summary(self) -> str:
    """Return a one-line description: name, storage mode and file name."""
    if self.streamlines is None:
        storage = "on-disk"
    else:
        storage = "in-memory"
    file_name = self.tractogram_path.name
    return f"{self.name}: {storage}, path={file_name}"

ValidationError

Bases: LacunaError, ValueError

Raised when data validation fails.

Source code in src/lacuna/core/exceptions.py
class ValidationError(LacunaError, ValueError):
    """Raised when data validation fails.

    Derives from both ``LacunaError`` and ``ValueError``, so handlers for
    either type catch it.
    """

VoxelMap dataclass

Bases: DataContainer

Result container for voxel-level brain maps.

This class stores voxel-level analysis outputs (e.g., functional connectivity maps, structural disconnection maps) in their native computation space.

Attributes:

Name Type Description
name str

Name/identifier for this result

data Nifti1Image

Brain map in its computation space

space str

Coordinate space identifier (e.g., 'MNI152NLin6Asym')

resolution float

Resolution in mm (e.g., 1.0, 2.0)

metadata dict

Additional metadata about the output

Examples:

>>> import nibabel as nib
>>> import numpy as np
>>> # Create a sample brain map
>>> data = np.random.randn(91, 109, 91)
>>> img = nib.Nifti1Image(data, np.eye(4) * 2)
>>> voxel_map = VoxelMap(
...     name="functional_connectivity",
...     data=img,
...     space="MNI152NLin6Asym",
...     resolution=2.0,
...     metadata={"seed": "PCC"}
... )
>>> print(voxel_map.summary())
functional_connectivity: (91, 109, 91) voxels, space=MNI152NLin6Asym, resolution=2.0mm
Source code in src/lacuna/core/data_types.py
@dataclass
class VoxelMap(DataContainer):
    """Container for a voxel-level brain map and its coordinate space.

    Holds voxel-level analysis outputs (e.g., functional connectivity maps,
    structural disconnection maps) in the space they were computed in.

    Attributes
    ----------
    name : str
        Name/identifier for this result
    data : nib.Nifti1Image
        Brain map in its computation space
    space : str
        Coordinate space identifier (e.g., 'MNI152NLin6Asym')
    resolution : float
        Resolution in mm (e.g., 1.0, 2.0)
    metadata : dict
        Additional metadata about the output

    Examples
    --------
    >>> import nibabel as nib
    >>> import numpy as np
    >>> # Create a sample brain map
    >>> data = np.random.randn(91, 109, 91)
    >>> img = nib.Nifti1Image(data, np.eye(4) * 2)

    >>> voxel_map = VoxelMap(
    ...     name="functional_connectivity",
    ...     data=img,
    ...     space="MNI152NLin6Asym",
    ...     resolution=2.0,
    ...     metadata={"seed": "PCC"}
    ... )
    >>> print(voxel_map.summary())
    functional_connectivity: (91, 109, 91) voxels, space=MNI152NLin6Asym, resolution=2.0mm
    """

    name: str
    data: nib.Nifti1Image
    space: str
    resolution: float
    metadata: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        """Forward name and metadata to the base-class initializer."""
        base_kwargs = {"name": self.name, "metadata": self.metadata}
        super().__init__(**base_kwargs)

    def get_data(self) -> nib.Nifti1Image:
        """Return the stored brain map image."""
        image = self.data
        return image

    def summary(self) -> str:
        """Return a one-line description: name, shape, space and resolution."""
        details = ", ".join(
            (
                f"{self.data.shape} voxels",
                f"space={self.space}",
                f"resolution={self.resolution}mm",
            )
        )
        return f"{self.name}: {details}"

    def __repr__(self) -> str:
        """Return a compact representation of name, shape, space, resolution."""
        described = ", ".join(
            (
                f"name='{self.name}'",
                f"shape={self.data.shape}",
                f"space='{self.space}'",
                f"resolution={self.resolution}",
            )
        )
        return f"VoxelMap({described})"

__post_init__()

Initialize base class.

Source code in src/lacuna/core/data_types.py
def __post_init__(self):
    """Forward name and metadata to the base-class initializer."""
    base_kwargs = {"name": self.name, "metadata": self.metadata}
    super().__init__(**base_kwargs)

__repr__()

Return string representation.

Source code in src/lacuna/core/data_types.py
def __repr__(self) -> str:
    """Return a compact representation of name, shape, space, resolution."""
    described = ", ".join(
        (
            f"name='{self.name}'",
            f"shape={self.data.shape}",
            f"space='{self.space}'",
            f"resolution={self.resolution}",
        )
    )
    return f"VoxelMap({described})"

get_data()

Get the brain map data.

Source code in src/lacuna/core/data_types.py
def get_data(self) -> nib.Nifti1Image:
    """Return the stored brain map image."""
    image = self.data
    return image

summary()

Get a summary description of this result.

Source code in src/lacuna/core/data_types.py
def summary(self) -> str:
    """Return a one-line description: name, shape, space and resolution."""
    details = ", ".join(
        (
            f"{self.data.shape} voxels",
            f"space={self.space}",
            f"resolution={self.resolution}mm",
        )
    )
    return f"{self.name}: {details}"

analyze(data, *, steps, n_jobs=1, show_progress=True, verbose=False)

Run an analysis pipeline defined by a steps dictionary.

This function provides a flexible interface for running analysis workflows. The steps dictionary defines which analyses to run and their parameters. Analyses are executed in the order they appear in the dictionary.

Parameters:

Name Type Description Default
data SubjectData or list of SubjectData

Input data to analyze. Single subject or batch of subjects.

required
steps dict[str, dict | None]

Analysis steps to run. Keys are analysis class names (must match exactly), values are dicts of kwargs for that analysis, or None for defaults.

Available analyses (use list_analyses() to see all): - "RegionalDamage": Parcel-based lesion quantification - "FunctionalNetworkMapping": Functional lesion network mapping - "StructuralNetworkMapping": Structural lesion network mapping - "ParcelAggregation": Aggregate voxel maps to parcels

Required parameters vary by analysis: - FunctionalNetworkMapping requires "connectome_name" - StructuralNetworkMapping requires "connectome_name" - Others have sensible defaults

required
n_jobs int

Number of parallel jobs for batch processing. Use -1 for all CPUs.

1
show_progress bool

Show tqdm progress bar during batch processing.

True
verbose bool

If True, print progress messages. If False, run silently.

False

Returns:

Type Description
SubjectData or list of SubjectData

Analyzed data with results. If input was a list, returns a list. Results are stored in subject.results dict keyed by analysis name.

Raises:

Type Description
TypeError

If data is not SubjectData or list of SubjectData.

KeyError

If an analysis name in steps is not recognized.

ValueError

If required parameters are missing for an analysis.

Examples:

Basic usage with RegionalDamage defaults:

>>> from lacuna import analyze, SubjectData
>>> result = analyze(mask_data, steps={"RegionalDamage": None})

With functional network mapping (connectome_name is required):

>>> result = analyze(
...     mask_data,
...     steps={
...         "RegionalDamage": None,
...         "FunctionalNetworkMapping": {"connectome_name": "GSP1000"},
...     }
... )

With custom parameters:

>>> result = analyze(
...     mask_data,
...     steps={
...         "RegionalDamage": {"parcel_names": ["schaefer2018parcels100networks7"]},
...         "FunctionalNetworkMapping": {
...             "connectome_name": "GSP1000",
...             "method": "boes",
...         },
...     }
... )

Batch processing with parallelization:

>>> results = analyze(
...     [subject1, subject2, subject3],
...     steps={"FunctionalNetworkMapping": {"connectome_name": "GSP1000"}},
...     n_jobs=-1,
...     show_progress=True,
... )
Source code in src/lacuna/core/pipeline.py
def analyze(
    data: SubjectData | list[SubjectData],
    *,
    steps: dict[str, dict | None],
    n_jobs: int = 1,
    show_progress: bool = True,
    verbose: bool = False,
) -> SubjectData | list[SubjectData]:
    """
    Run an analysis pipeline defined by a steps dictionary.

    This function provides a flexible interface for running analysis workflows.
    The `steps` dictionary defines which analyses to run and their parameters.
    Analyses are executed in the order they appear in the dictionary.

    Parameters
    ----------
    data : SubjectData or list of SubjectData
        Input data to analyze. Single subject or batch of subjects.
    steps : dict[str, dict | None]
        Analysis steps to run. Keys are analysis class names (must match
        exactly), values are dicts of kwargs for that analysis, or None
        for defaults. The kwargs dicts are copied, never mutated.

        Available analyses (use `list_analyses()` to see all):
        - "RegionalDamage": Parcel-based lesion quantification
        - "FunctionalNetworkMapping": Functional lesion network mapping
        - "StructuralNetworkMapping": Structural lesion network mapping
        - "ParcelAggregation": Aggregate voxel maps to parcels

        Required parameters vary by analysis:
        - FunctionalNetworkMapping requires "connectome_name"
        - StructuralNetworkMapping requires "connectome_name"
        - Others have sensible defaults
    n_jobs : int, default=1
        Number of parallel jobs for batch processing. Use -1 for all CPUs.
        Any value other than 1 dispatches the batch through joblib.
    show_progress : bool, default=True
        Show tqdm progress bar during batch processing. Silently skipped
        when tqdm is not installed.
    verbose : bool, default=False
        If True, print progress messages. If False, run silently.
        Forwarded to every analysis that does not set its own "verbose"
        kwarg, and to the pipeline run.

    Returns
    -------
    SubjectData or list of SubjectData
        Analyzed data with results. If input was a list, returns a list.
        Results are stored in `subject.results` dict keyed by analysis name.

    Raises
    ------
    TypeError
        If `data` is a list containing an item that is not a SubjectData
        instance. A non-list `data` is handed to the pipeline without a
        type check in this function.
    KeyError
        If an analysis name in steps is not recognized.
    ValueError
        If `steps` is empty, or if an analysis cannot be instantiated with
        the given parameters (e.g. a required parameter is missing).

    Examples
    --------
    Basic usage with RegionalDamage defaults:

    >>> from lacuna import analyze, SubjectData
    >>> result = analyze(mask_data, steps={"RegionalDamage": None})

    With functional network mapping (connectome_name is required):

    >>> result = analyze(
    ...     mask_data,
    ...     steps={
    ...         "RegionalDamage": None,
    ...         "FunctionalNetworkMapping": {"connectome_name": "GSP1000"},
    ...     }
    ... )

    With custom parameters:

    >>> result = analyze(
    ...     mask_data,
    ...     steps={
    ...         "RegionalDamage": {"parcel_names": ["schaefer2018parcels100networks7"]},
    ...         "FunctionalNetworkMapping": {
    ...             "connectome_name": "GSP1000",
    ...             "method": "boes",
    ...         },
    ...     }
    ... )

    Batch processing with parallelization:

    >>> results = analyze(
    ...     [subject1, subject2, subject3],
    ...     steps={"FunctionalNetworkMapping": {"connectome_name": "GSP1000"}},
    ...     n_jobs=-1,
    ...     show_progress=True,
    ... )
    """
    from lacuna.analysis import get_analysis, list_analyses

    # Validate steps is not empty
    if not steps:
        raise ValueError(
            "steps cannot be empty. Provide at least one analysis, e.g., "
            '{"RegionalDamage": None}'
        )

    # Get available analysis names for validation
    available_analyses = dict(list_analyses())

    # Build list of analysis instances
    analyses: list = []
    for analysis_name, kwargs in steps.items():
        # Strict validation: analysis must exist
        if analysis_name not in available_analyses:
            available_names = sorted(available_analyses.keys())
            raise KeyError(
                f"Unknown analysis: {analysis_name!r}. " f"Available analyses: {available_names}"
            )

        # Get the analysis class
        analysis_cls = get_analysis(analysis_name)

        # None means "use defaults"; otherwise copy so the caller's dict is
        # never mutated by the verbose injection below.
        kwargs = {} if kwargs is None else kwargs.copy()

        # Add verbose if not specified
        if "verbose" not in kwargs:
            kwargs["verbose"] = verbose

        # Instantiate the analysis; constructor signature errors are surfaced
        # as ValueError so callers see a parameter problem, not a TypeError.
        try:
            analysis = analysis_cls(**kwargs)
        except TypeError as e:
            raise ValueError(
                f"Invalid parameters for {analysis_name}: {e}. "
                f"Check required parameters for this analysis."
            ) from e

        analyses.append(analysis)

    # Build pipeline
    pipeline = Pipeline(name="analyze")
    for analysis in analyses:
        pipeline.add(analysis)

    # Helper function to run on single subject
    def run_single(subject: SubjectData) -> SubjectData:
        return pipeline.run(subject, verbose=verbose)

    def _maybe_progress(items):
        """Wrap ``items`` in a tqdm bar when requested and tqdm is importable."""
        if not show_progress:
            return items
        # Only the import itself is guarded: an ImportError raised while an
        # analysis runs must propagate instead of being mistaken for a missing
        # tqdm (which previously caused the whole batch to be re-run).
        try:
            from tqdm import tqdm
        except ImportError:
            return items
        return tqdm(items, desc="Analyzing")

    # Single subject: no batching, no progress bar
    if not isinstance(data, list):
        return run_single(data)

    # Validate all inputs upfront before any processing
    for i, item in enumerate(data):
        if not isinstance(item, SubjectData):
            raise TypeError(
                f"Item {i} in data list has unsupported type: {type(item).__name__}. "
                f"All items must be SubjectData instances."
            )

    if n_jobs == 1:
        # Sequential processing
        return [run_single(d) for d in _maybe_progress(data)]

    # Parallel processing with joblib.
    # inner_max_num_threads=1 caps BLAS/OMP threads inside each worker
    # to avoid fork-after-import deadlocks on many-core nodes.
    from joblib import Parallel, delayed, parallel_backend

    with parallel_backend("loky", inner_max_num_threads=1):
        results = Parallel(n_jobs=n_jobs)(
            delayed(run_single)(d) for d in _maybe_progress(data)
        )
    return list(results)

build_result_key(atlas, source, desc=None)

Build a BIDS-style result key from components.

Creates a structured key string in the format: atlas-{atlas}_source-{source}[_desc-{desc}]

The desc component is optional and automatically omitted when source is InputMask/SubjectData (the mask itself is the primary data, no additional description needed).

Parameters:

Name Type Description Default
atlas str

Atlas/parcellation name (e.g., "Schaefer100", "Tian_S4").

required
source str

Source analysis class name (e.g., "SubjectData", "FunctionalNetworkMapping"). Will be converted to appropriate source abbreviation (e.g., SubjectData -> InputMask).

required
desc str

Description/key within the source (e.g., "rmap"). Ignored for InputMask source (automatically omitted).

None

Returns:

Type Description
str

BIDS-style result key.

Examples:

>>> build_result_key("Schaefer100", "FunctionalNetworkMapping", "rmap")
'atlas-Schaefer100_source-FunctionalNetworkMapping_desc-rmap'
>>> build_result_key("tian2020parcels16", "SubjectData", "maskimg")
'atlas-tian2020parcels16_source-InputMask'
>>> build_result_key("tian2020parcels16", "SubjectData")
'atlas-tian2020parcels16_source-InputMask'
>>> build_result_key("schaefer2018parcels200networks7", "RegionalDamage", "damagescore")
'atlas-schaefer2018parcels200networks7_source-RegionalDamage_desc-damagescore'
Source code in src/lacuna/core/keys.py
def build_result_key(atlas: str, source: str, desc: str | None = None) -> str:
    """
    Assemble a BIDS-style result key.

    The key has the layout ``atlas-{atlas}_source-{source}[_desc-{desc}]``.
    The ``_desc-`` suffix is appended only when `desc` is non-empty and the
    resolved source is not ``InputMask``; for ``InputMask`` the mask itself
    is the data, so no description is attached.

    Parameters
    ----------
    atlas : str
        Atlas/parcellation name (e.g., "Schaefer100", "Tian_S4").
    source : str
        Source analysis class name (e.g., "SubjectData",
        "FunctionalNetworkMapping"). It is looked up in
        ``SOURCE_ABBREVIATIONS``; names without an entry are used verbatim.
    desc : str, optional
        Description/key within the source (e.g., "rmap"). Dropped when the
        resolved source is ``InputMask``.

    Returns
    -------
    str
        BIDS-style result key.

    Examples
    --------
    >>> build_result_key("Schaefer100", "FunctionalNetworkMapping", "rmap")
    'atlas-Schaefer100_source-FunctionalNetworkMapping_desc-rmap'

    >>> build_result_key("tian2020parcels16", "SubjectData", "maskimg")
    'atlas-tian2020parcels16_source-InputMask'

    >>> build_result_key("tian2020parcels16", "SubjectData")
    'atlas-tian2020parcels16_source-InputMask'

    >>> build_result_key("schaefer2018parcels200networks7", "RegionalDamage", "damagescore")
    'atlas-schaefer2018parcels200networks7_source-RegionalDamage_desc-damagescore'
    """
    # Unknown class names fall through unchanged
    resolved_source = SOURCE_ABBREVIATIONS.get(source, source)
    result_key = f"atlas-{atlas}_source-{resolved_source}"

    # InputMask keys never carry a description; other sources only when given
    if resolved_source != "InputMask" and desc:
        result_key += f"_desc-{desc}"

    return result_key

check_spatial_match(img1, img2, check_shape=True, check_affine=True, atol=0.001)

Check if two images have matching spatial properties.

Parameters:

Name Type Description Default
img1 Nifti1Image

First image.

required
img2 Nifti1Image

Second image.

required
check_shape bool

Check if shapes match.

True
check_affine bool

Check if affines match (within tolerance).

True
atol float

Absolute tolerance for affine comparison (in mm).

1e-3

Returns:

Type Description
bool

True if images match spatially.

Raises:

Type Description
SpatialMismatchError

If spatial properties don't match.

Examples:

>>> import nibabel as nib
>>> lesion = nib.load("lesion.nii.gz")
>>> anat = nib.load("anatomical.nii.gz")
>>> check_spatial_match(lesion, anat)
Source code in src/lacuna/core/validation.py
def check_spatial_match(
    img1: nib.Nifti1Image,
    img2: nib.Nifti1Image,
    check_shape: bool = True,
    check_affine: bool = True,
    atol: float = 1e-3,
) -> bool:
    """
    Verify that two images share the same grid.

    Parameters
    ----------
    img1 : nibabel.Nifti1Image
        First image.
    img2 : nibabel.Nifti1Image
        Second image.
    check_shape : bool, default=True
        Compare the ``shape`` of both images.
    check_affine : bool, default=True
        Compare the ``affine`` of both images with ``np.allclose``.
    atol : float, default=1e-3
        Absolute tolerance passed to the affine comparison (in mm).

    Returns
    -------
    bool
        Always True; a mismatch is reported by raising instead.

    Raises
    ------
    SpatialMismatchError
        If an enabled check finds differing shapes or affines.

    Examples
    --------
    >>> import nibabel as nib
    >>> lesion = nib.load("lesion.nii.gz")
    >>> anat = nib.load("anatomical.nii.gz")
    >>> check_spatial_match(lesion, anat)
    True
    """
    # Shape is compared first, so a shape mismatch wins over an affine one
    if check_shape and img1.shape != img2.shape:
        raise SpatialMismatchError(f"Image shapes don't match: {img1.shape} vs {img2.shape}")

    if not check_affine:
        return True

    first_affine, second_affine = img1.affine, img2.affine
    if np.allclose(first_affine, second_affine, atol=atol):
        return True

    raise SpatialMismatchError(
        f"Affine matrices don't match (tolerance={atol} mm).\n"
        f"Image 1:\n{first_affine}\n"
        f"Image 2:\n{second_affine}"
    )

create_provenance_record(function, parameters, version, output_space=None)

Create a provenance record for a transformation.

Parameters:

Name Type Description Default
function str

Fully qualified function name (e.g., 'lacuna.analysis.RegionalDamage').

required
parameters dict

Function parameters (must be JSON-serializable).

required
version str

Package version at time of execution.

required
output_space str

Resulting coordinate space (if spatial operation).

None

Returns:

Type Description
dict

Provenance record with function, parameters, timestamp, version.

Raises:

Type Description
ProvenanceError

If parameters are not JSON-serializable.

Examples:

>>> record = create_provenance_record(
...     function="lacuna.analysis.RegionalDamage",
...     parameters={"parcel_names": ["schaefer2018parcels100networks7"]},
...     version="0.1.0",
... )
>>> record['function']
'lacuna.analysis.RegionalDamage'
Source code in src/lacuna/core/provenance.py
def create_provenance_record(
    function: str,
    parameters: dict[str, Any],
    version: str,
    output_space: str | None = None,
) -> dict[str, Any]:
    """
    Create a provenance record for a transformation.

    Parameters
    ----------
    function : str
        Fully qualified function name (e.g., 'lacuna.analysis.RegionalDamage').
    parameters : dict
        Function parameters (must be JSON-serializable).
    version : str
        Package version at time of execution.
    output_space : str, optional
        Resulting coordinate space (if spatial operation).

    Returns
    -------
    dict
        Provenance record with function, parameters, timestamp, version.

    Raises
    ------
    ProvenanceError
        If parameters are not JSON-serializable.

    Examples
    --------
    >>> record = create_provenance_record(
    ...     function="lacuna.analysis.RegionalDamage",
    ...     parameters={"parcel_names": ["schaefer2018parcels100networks7"]},
    ...     version="0.1.0",
    ... )
    >>> record['function']
    'lacuna.analysis.RegionalDamage'
    """
    # Validate parameters are serializable
    try:
        import json

        json.dumps(parameters)
    except (TypeError, ValueError) as e:
        raise ProvenanceError(f"Parameters must be JSON-serializable, got error: {e}") from e

    # Create record
    record = {
        "function": function,
        "parameters": parameters,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "version": version,
    }

    if output_space is not None:
        record["output_space"] = output_space

    return record

ensure_ras_plus(img)

Ensure image is in RAS+ orientation (nilearn standard).

Parameters:

Name Type Description Default
img Nifti1Image

Input image (any orientation).

required

Returns:

Type Description
Nifti1Image

Image reoriented to RAS+ (if necessary).

Notes

RAS+ means voxel indices increase toward: - First axis: Left to Right - Second axis: Posterior to Anterior - Third axis: Inferior to Superior

Examples:

>>> import nibabel as nib
>>> img = nib.load("lesion.nii.gz")
>>> img_ras = ensure_ras_plus(img)
Source code in src/lacuna/core/validation.py
def ensure_ras_plus(img: nib.Nifti1Image) -> nib.Nifti1Image:
    """
    Ensure image is in RAS+ orientation (nilearn standard).

    Parameters
    ----------
    img : nibabel.Nifti1Image
        Input image (any orientation).

    Returns
    -------
    nibabel.Nifti1Image
        Image reoriented to RAS+. When the image is already RAS+ the input
        object itself is returned.

    Notes
    -----
    RAS+ means voxel indices increase toward:
    - First axis: Left to Right
    - Second axis: Posterior to Anterior
    - Third axis: Inferior to Superior

    Examples
    --------
    >>> import nibabel as nib
    >>> img = nib.load("lesion.nii.gz")
    >>> img_ras = ensure_ras_plus(img)
    """
    # Orientation of the voxel axes as implied by the affine
    current_ornt = nib.io_orientation(img.affine)

    # RAS+ target: axes in order (0, 1, 2), none flipped. The same array is
    # also what an identity (no-op) orientation transform looks like.
    ras_ornt = np.array([[0, 1], [1, 1], [2, 1]])

    # Axis permutation/flips needed to go from the current orientation to RAS+
    transform = nib.orientations.ornt_transform(current_ornt, ras_ornt)

    # Reorient only when the transform is not the identity
    if not np.array_equal(transform, ras_ornt):
        img = img.as_reoriented(transform)

    return img

get_source_abbreviation(class_name)

Validate and return the source name for an analysis class.

This function validates that the class name is a known analysis type and returns the appropriate source abbreviation for result keys.

Parameters:

Name Type Description Default
class_name str

Analysis class name (e.g., "FunctionalNetworkMapping", "SubjectData").

required

Returns:

Type Description
str

The source abbreviation for use in result keys.

Raises:

Type Description
KeyError

If class_name is not a known analysis class.

Examples:

>>> get_source_abbreviation("FunctionalNetworkMapping")
'FunctionalNetworkMapping'
>>> get_source_abbreviation("SubjectData")
'InputMask'
Source code in src/lacuna/core/keys.py
def get_source_abbreviation(class_name: str) -> str:
    """
    Look up the result-key source name for an analysis class.

    Unlike a plain ``dict.get`` with fallback, this lookup is strict: a
    class name without an entry in ``SOURCE_ABBREVIATIONS`` is an error.

    Parameters
    ----------
    class_name : str
        Analysis class name (e.g., "FunctionalNetworkMapping", "SubjectData").

    Returns
    -------
    str
        The source abbreviation for use in result keys.

    Raises
    ------
    KeyError
        If class_name has no entry in ``SOURCE_ABBREVIATIONS``. The message
        lists the known class names in sorted order.

    Examples
    --------
    >>> get_source_abbreviation("FunctionalNetworkMapping")
    'FunctionalNetworkMapping'
    >>> get_source_abbreviation("SubjectData")
    'InputMask'
    """
    if class_name in SOURCE_ABBREVIATIONS:
        return SOURCE_ABBREVIATIONS[class_name]

    known_names = sorted(SOURCE_ABBREVIATIONS.keys())
    available = ", ".join(known_names)
    raise KeyError(f"Unknown analysis class '{class_name}'. " f"Known classes: {available}")

merge_provenance(base_provenance, new_provenance)

Merge two provenance lists.

Parameters:

Name Type Description Default
base_provenance list

Base provenance history.

required
new_provenance list

New provenance to append.

required

Returns:

Type Description
list

Merged provenance list (ordered chronologically).

Source code in src/lacuna/core/provenance.py
def merge_provenance(base_provenance: list, new_provenance: list) -> list:
    """
    Concatenate two provenance histories into a new list.

    Parameters
    ----------
    base_provenance : list
        Base provenance history.
    new_provenance : list
        New provenance to append.

    Returns
    -------
    list
        A fresh list holding the base entries followed by the new entries.
        Neither input is modified and no re-sorting is performed, so the
        result is chronological only if the inputs already are.
    """
    # Unpacking builds a new list, leaving both arguments untouched
    return [*base_provenance, *new_provenance]

parse_result_key(key)

Parse a BIDS-style result key into its components.

Extracts key-value pairs from a structured key string in the format: atlas-{atlas}_source-{source}[_desc-{desc}]

Parameters:

Name Type Description Default
key str

BIDS-style result key to parse.

required

Returns:

Type Description
dict[str, str]

Dictionary with parsed components. Keys are "atlas", "source", "desc". Missing components will not be present in the returned dict.

Raises:

Type Description
ValueError

If key is empty or has invalid format.

Examples:

>>> parse_result_key("atlas-Schaefer100_source-FunctionalNetworkMapping_desc-rmap")
{'atlas': 'Schaefer100', 'source': 'FunctionalNetworkMapping', 'desc': 'rmap'}
>>> parse_result_key("atlas-Tian_S4_source-InputMask")
{'atlas': 'Tian_S4', 'source': 'InputMask'}
Source code in src/lacuna/core/keys.py
def parse_result_key(key: str) -> dict[str, str]:
    """
    Split a BIDS-style result key into its named components.

    The expected layout is ``atlas-{atlas}_source-{source}[_desc-{desc}]``.
    The key is split on underscores; a piece that starts with a known
    prefix (``atlas-``, ``source-``, ``desc-``, or the legacy ``parc-``
    which is stored as "atlas") opens a new component, and every other
    piece is re-joined with ``_`` onto the component currently being read.
    This is what lets values such as ``Tian_S4`` contain underscores.

    Parameters
    ----------
    key : str
        BIDS-style result key to parse.

    Returns
    -------
    dict[str, str]
        Dictionary with parsed components. Keys are "atlas", "source", "desc".
        Missing components will not be present in the returned dict.

    Raises
    ------
    ValueError
        If key is empty, or if it does not begin with a known prefix.

    Examples
    --------
    >>> parse_result_key("atlas-Schaefer100_source-FunctionalNetworkMapping_desc-rmap")
    {'atlas': 'Schaefer100', 'source': 'FunctionalNetworkMapping', 'desc': 'rmap'}

    >>> parse_result_key("atlas-Tian_S4_source-InputMask")
    {'atlas': 'Tian_S4', 'source': 'InputMask'}
    """
    if not key:
        raise ValueError("Result key cannot be empty")

    # (prefix, component name) pairs; "parc-" is a legacy alias for "atlas-"
    known_prefixes = (
        ("atlas-", "atlas"),
        ("parc-", "atlas"),
        ("source-", "source"),
        ("desc-", "desc"),
    )

    parsed: dict[str, str] = {}
    active_field: str | None = None
    value_pieces: list[str] = []

    for token in key.split("_"):
        hit = next(
            ((prefix, field) for prefix, field in known_prefixes if token.startswith(prefix)),
            None,
        )

        if hit is not None:
            # A new component begins: store the one collected so far first
            if active_field is not None:
                parsed[active_field] = "_".join(value_pieces)
            prefix, active_field = hit
            value_pieces = [token[len(prefix):]]
        elif active_field is not None:
            # Plain piece: part of an underscore-containing value
            value_pieces.append(token)
        else:
            # Text before any recognised prefix cannot belong to a component
            raise ValueError(
                f"Invalid result key format: '{key}'. "
                "Expected format: atlas-{atlas}_source-{source}[_desc-{desc}]"
            )

    # Store the trailing component
    if active_field is not None:
        parsed[active_field] = "_".join(value_pieces)

    return parsed

validate_affine(affine)

Validate an affine transformation matrix.

Parameters:

Name Type Description Default
affine (ndarray, shape(4, 4))

Affine matrix to validate.

required

Raises:

Type Description
ValidationError

If affine is invalid.

Source code in src/lacuna/core/validation.py
def validate_affine(affine: np.ndarray) -> None:
    """
    Check that a matrix is a usable 4x4 affine.

    The checks run in a fixed order and the first failure is reported:
    type, shape, finiteness, invertibility, and finally the bottom row.

    Parameters
    ----------
    affine : ndarray, shape (4, 4)
        Affine matrix to validate.

    Raises
    ------
    ValidationError
        If affine is not a numpy array, is not 4x4, contains NaN/inf,
        cannot be inverted, or has a last row other than [0, 0, 0, 1].
    """
    if not isinstance(affine, np.ndarray):
        raise ValidationError(f"Affine must be numpy array, got {type(affine)}")

    dims = affine.shape
    if dims != (4, 4):
        raise ValidationError(f"Affine must be 4x4, got shape {dims}")

    every_entry_finite = np.isfinite(affine).all()
    if not every_entry_finite:
        raise ValidationError("Affine contains NaN or inf values")

    # The inverse itself is not needed; inversion is attempted only to
    # find out whether numpy reports the matrix as singular.
    try:
        np.linalg.inv(affine)
    except np.linalg.LinAlgError as e:
        raise ValidationError("Affine matrix is not invertible") from e

    # The bottom row must be [0, 0, 0, 1]
    bottom_row = affine[3, :]
    if not np.allclose(bottom_row, [0, 0, 0, 1]):
        raise ValidationError(f"Affine last row must be [0, 0, 0, 1], got {bottom_row}")

validate_nifti_image(img, require_3d=True, check_affine=True)

Validate NIfTI image properties.

Parameters:

Name Type Description Default
img Nifti1Image

Image to validate.

required
require_3d bool

Raise error if image is not 3D.

True
check_affine bool

Verify affine matrix is invertible.

True

Raises:

Type Description
ValidationError

If validation fails.

Examples:

>>> import nibabel as nib
>>> img = nib.load("lesion.nii.gz")
>>> validate_nifti_image(img)
Source code in src/lacuna/core/validation.py
def validate_nifti_image(
    img: nib.Nifti1Image, require_3d: bool = True, check_affine: bool = True
) -> None:
    """
    Validate NIfTI image properties.

    Parameters
    ----------
    img : nibabel.Nifti1Image
        Image to validate.
    require_3d : bool, default=True
        Raise error if image is not 3D.
    check_affine : bool, default=True
        Verify that the affine matrix contains only finite values and is
        invertible.

    Raises
    ------
    ValidationError
        If `img` is not a Nifti1Image, is not 3D (when `require_3d`), or has
        a non-finite or non-invertible affine (when `check_affine`).

    Examples
    --------
    >>> import nibabel as nib
    >>> img = nib.load("lesion.nii.gz")
    >>> validate_nifti_image(img)
    """
    if not isinstance(img, nib.Nifti1Image):
        raise ValidationError(f"Expected Nifti1Image, got {type(img)}")

    # Check dimensionality
    shape = img.shape
    if require_3d and len(shape) != 3:
        raise ValidationError(f"Image must be 3D, got shape {shape} ({len(shape)} dimensions)")

    if not check_affine:
        return

    affine = img.affine

    # Finiteness is tested before inversion (same order as validate_affine):
    # inverting a matrix with NaN/inf entries is not guaranteed to raise, and
    # when it does the "not invertible" message would hide the real cause.
    if not np.all(np.isfinite(affine)):
        raise ValidationError("Affine matrix contains NaN or inf values")

    # The inverse is discarded; only numpy's singularity report matters
    try:
        np.linalg.inv(affine)
    except np.linalg.LinAlgError as e:
        raise ValidationError("Affine matrix is not invertible") from e

validate_provenance_record(record)

Validate a provenance record structure.

Parameters:

Name Type Description Default
record dict

Provenance record to validate.

required

Raises:

Type Description
ProvenanceError

If record structure is invalid.

Source code in src/lacuna/core/provenance.py
def validate_provenance_record(record: dict[str, Any]) -> None:
    """
    Check that a provenance record has the expected structure.

    A valid record contains the keys "function", "parameters", "timestamp"
    and "version"; its timestamp is accepted by ``datetime.fromisoformat``
    and its parameters are a dict. Extra keys are not inspected.

    Parameters
    ----------
    record : dict
        Provenance record to validate.

    Raises
    ------
    ProvenanceError
        If a required field is missing (the first missing one, in the order
        listed above, is reported), the timestamp cannot be parsed, or the
        parameters are not a dict.
    """
    mandatory = ("function", "parameters", "timestamp", "version")

    first_missing = next((name for name in mandatory if name not in record), None)
    if first_missing is not None:
        raise ProvenanceError(f"Provenance record missing required field: {first_missing}")

    # TypeError covers non-string timestamps, ValueError malformed strings
    stamp = record["timestamp"]
    try:
        datetime.fromisoformat(stamp)
    except (ValueError, TypeError) as e:
        raise ProvenanceError(f"Invalid timestamp format: {stamp}") from e

    params = record["parameters"]
    if not isinstance(params, dict):
        raise ProvenanceError(f"Parameters must be dict, got {type(params)}")