batch

lacuna.batch

Batch processing infrastructure for multi-subject analysis pipelines.

This module provides efficient batch processing capabilities with automatic strategy selection based on analysis characteristics and computational resources.

Key Components
  • batch_process(): Main entry point for batch processing
  • BatchStrategy: Abstract base class for processing strategies
  • ParallelStrategy: Multi-core parallelization using joblib
  • SequentialStrategy: Sequential processing for analyses with internal parallelization
  • VectorizedStrategy: Batch matrix operations for connectome analyses
  • select_strategy(): Automatic strategy selection logic

Examples:

>>> from lacuna.batch import batch_process
>>> from lacuna.analysis import RegionalDamage
>>> from lacuna.io import load_bids_dataset
>>>
>>> # Load subjects
>>> dataset = load_bids_dataset("path/to/bids")
>>> lesions = list(dataset.values())
>>>
>>> # Batch process with automatic optimization
>>> analysis = RegionalDamage()
>>> results = batch_process(lesions, analysis)
>>>
>>> # Process with specific number of cores
>>> results = batch_process(lesions, analysis, n_jobs=4)

BatchStrategy

Bases: ABC

Abstract base class for batch processing strategies.

Each strategy implements a different approach to processing multiple subjects:
  • Parallel: Uses multiprocessing for independent analyses
  • Vectorized: Stacks data for batch matrix operations

Parameters:

Name Type Description Default
n_jobs int

Number of parallel jobs. -1 uses all available cores. Only relevant for ParallelStrategy.

-1
Source code in src/lacuna/batch/strategies.py
class BatchStrategy(ABC):
    """
    Abstract base class for batch processing strategies.

    Each strategy implements a different approach to processing multiple subjects:
    - Parallel: Uses multiprocessing for independent analyses
    - Vectorized: Stacks data for batch matrix operations

    Parameters
    ----------
    n_jobs : int, default=-1
        Number of parallel jobs. -1 uses all available cores.
        Only relevant for ParallelStrategy.
    """

    def __init__(self, n_jobs: int = -1):
        self.n_jobs = n_jobs

    @abstractmethod
    def execute(
        self,
        inputs: list[SubjectData],
        analysis: BaseAnalysis,
        progress_callback: Callable[[int], None] | None = None,
    ) -> list[SubjectData]:
        """
        Execute analysis on all lesions using this strategy.

        Parameters
        ----------
        inputs : list[SubjectData]
            List of lesions to process
        analysis : BaseAnalysis
            Analysis instance to apply to each lesion
        progress_callback : callable or None
            Optional callback function to report progress.
            Called with current index after each subject completes.

        Returns
        -------
        list[SubjectData]
            List of processed SubjectData objects with results added

        Raises
        ------
        RuntimeError
            If execution fails
        """
        pass

    @property
    @abstractmethod
    def name(self) -> str:
        """Strategy name for logging and debugging."""
        pass

name abstractmethod property

Strategy name for logging and debugging.

execute(inputs, analysis, progress_callback=None) abstractmethod

Execute analysis on all lesions using this strategy.

Parameters:

Name Type Description Default
inputs list[SubjectData]

List of lesions to process

required
analysis BaseAnalysis

Analysis instance to apply to each lesion

required
progress_callback callable or None

Optional callback function to report progress. Called with current index after each subject completes.

None

Returns:

Type Description
list[SubjectData]

List of processed SubjectData objects with results added

Raises:

Type Description
RuntimeError

If execution fails

Source code in src/lacuna/batch/strategies.py
@abstractmethod
def execute(
    self,
    inputs: list[SubjectData],
    analysis: BaseAnalysis,
    progress_callback: Callable[[int], None] | None = None,
) -> list[SubjectData]:
    """
    Execute analysis on all lesions using this strategy.

    Parameters
    ----------
    inputs : list[SubjectData]
        List of lesions to process
    analysis : BaseAnalysis
        Analysis instance to apply to each lesion
    progress_callback : callable or None
        Optional callback function to report progress.
        Called with current index after each subject completes.

    Returns
    -------
    list[SubjectData]
        List of processed SubjectData objects with results added

    Raises
    ------
    RuntimeError
        If execution fails
    """
    pass
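A concrete strategy only needs to provide execute() and name. The following is a minimal sketch, not lacuna code: the BatchStrategy class here is a local stand-in that mirrors the documented interface, and SimpleLoopStrategy, DoubleIt, and the analysis.run() method are hypothetical names used for illustration.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any, Callable


class BatchStrategy(ABC):
    """Local stand-in mirroring the documented interface (not the lacuna class)."""

    def __init__(self, n_jobs: int = -1):
        self.n_jobs = n_jobs

    @abstractmethod
    def execute(
        self,
        inputs: list[Any],
        analysis: Any,
        progress_callback: Callable[[int], None] | None = None,
    ) -> list[Any]:
        ...

    @property
    @abstractmethod
    def name(self) -> str:
        ...


class SimpleLoopStrategy(BatchStrategy):
    """Hypothetical strategy: apply the analysis to each input in order."""

    def execute(self, inputs, analysis, progress_callback=None):
        results = []
        for i, subject in enumerate(inputs):
            # `run` is an assumed method name for this toy analysis.
            results.append(analysis.run(subject))
            if progress_callback:
                progress_callback(i)
        return results

    @property
    def name(self) -> str:
        return "simple-loop"


class DoubleIt:
    """Toy analysis used only for this sketch."""

    def run(self, subject):
        return subject * 2


seen: list[int] = []
strategy = SimpleLoopStrategy(n_jobs=1)
out = strategy.execute([1, 2, 3], DoubleIt(), progress_callback=seen.append)
```

Because both members are abstract, instantiating the base class directly raises TypeError, which catches incomplete subclasses early.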

ParallelStrategy

Bases: BatchStrategy

Parallel batch processing using joblib multiprocessing.

  • Best for independent per-subject analyses (RegionalDamage, ParcelAggregation)
  • Speedup on multi-core systems (proportional to available cores)
  • Low memory overhead

This strategy processes each subject independently using joblib.Parallel. The backend can be configured to handle different environments:
  • 'loky' (default): Robust multiprocessing for standalone scripts
  • 'threading': Thread-based parallelism for Jupyter notebooks
  • 'multiprocessing': Standard multiprocessing (less robust than loky)

Parameters:

Name Type Description Default
n_jobs int

Number of parallel jobs:
  • -1: Use all available CPU cores
  • 1: Sequential processing (useful for debugging)
  • N: Use N parallel workers

-1
backend str

Joblib backend to use:
  • 'loky': Robust multiprocessing (best for scripts)
  • 'threading': Thread-based (use in Jupyter notebooks)
  • 'multiprocessing': Standard multiprocessing

'loky'
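The -1 default for n_jobs is resolved to a concrete worker count when the strategy is constructed. A minimal sketch of that resolution, following the logic in the class source; the helper name resolve_n_jobs is hypothetical:

```python
import os


def resolve_n_jobs(n_jobs: int) -> int:
    """Turn the -1 sentinel into a worker count; pass other values through."""
    if n_jobs != -1:
        return n_jobs
    if hasattr(os, "sched_getaffinity"):
        # Counts only the CPUs this process is allowed to run on, which can be
        # fewer than the machine total (for example under taskset or a cpuset).
        return len(os.sched_getaffinity(0))
    # Platforms without sched_getaffinity: fall back to the logical CPU count.
    return os.cpu_count() or 1


print(resolve_n_jobs(-1))  # machine dependent
print(resolve_n_jobs(4))
```

Preferring the affinity mask over os.cpu_count() avoids starting more workers than the scheduler will actually run in parallel on shared or containerized nodes.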

Examples:

>>> from lacuna.batch.strategies import ParallelStrategy
>>> from lacuna.analysis import RegionalDamage
>>>
>>> # For standalone scripts (default)
>>> strategy = ParallelStrategy(n_jobs=4)
>>> results = strategy.execute(lesions, RegionalDamage())
>>>
>>> # For Jupyter notebooks
>>> strategy = ParallelStrategy(n_jobs=4, backend='threading')
>>> results = strategy.execute(lesions, RegionalDamage())
Source code in src/lacuna/batch/strategies.py
class ParallelStrategy(BatchStrategy):
    """
    Parallel batch processing using joblib multiprocessing.

    Best for independent per-subject analyses (RegionalDamage, ParcelAggregation)
    Speedup on multi-core systems (proportional to available cores)
    Low memory overhead

    This strategy processes each subject independently using joblib.Parallel.
    The backend can be configured to handle different environments:
    - 'loky' (default): Robust multiprocessing for standalone scripts
    - 'threading': Thread-based parallelism for Jupyter notebooks
    - 'multiprocessing': Standard multiprocessing (less robust than loky)

    Parameters
    ----------
    n_jobs : int, default=-1
        Number of parallel jobs:
        - -1: Use all available CPU cores
        - 1: Sequential processing (useful for debugging)
        - N: Use N parallel workers
    backend : str, default='loky'
        Joblib backend to use:
        - 'loky': Robust multiprocessing (best for scripts)
        - 'threading': Thread-based (use in Jupyter notebooks)
        - 'multiprocessing': Standard multiprocessing

    Examples
    --------
    >>> from lacuna.batch.strategies import ParallelStrategy
    >>> from lacuna.analysis import RegionalDamage
    >>>
    >>> # For standalone scripts (default)
    >>> strategy = ParallelStrategy(n_jobs=4)
    >>> results = strategy.execute(lesions, RegionalDamage())
    >>>
    >>> # For Jupyter notebooks
    >>> strategy = ParallelStrategy(n_jobs=4, backend='threading')
    >>> results = strategy.execute(lesions, RegionalDamage())
    """

    def __init__(self, n_jobs: int = -1, backend: str = "loky"):
        super().__init__(n_jobs)
        self.backend = backend

        # Resolve -1 to actual core count
        if self.n_jobs == -1:
            import os

            self.n_jobs = (
                len(os.sched_getaffinity(0))
                if hasattr(os, "sched_getaffinity")
                else os.cpu_count() or 1
            )

    def execute(
        self,
        inputs: list[SubjectData],
        analysis: BaseAnalysis,
        progress_callback: Callable[[int], None] | None = None,
    ) -> list[SubjectData]:
        """
        Execute parallel batch processing.

        Processes subjects in parallel using joblib. Each subject is processed
        independently, and failures are caught and reported as warnings without
        stopping the entire batch.

        Parameters
        ----------
        inputs : list[SubjectData]
            Subjects to process
        analysis : BaseAnalysis
            Analysis to apply
        progress_callback : callable or None
            Progress reporting function

        Returns
        -------
        list[SubjectData]
            Successfully processed subjects (failures are filtered out)
        """
        # Execute in parallel
        if self.n_jobs == 1:
            # Sequential processing (useful for debugging)
            results = []
            for i, lesion in enumerate(inputs):
                result = _process_one_subject(lesion, i, analysis)
                results.append(result)
                if progress_callback:
                    progress_callback(i)
        else:
            # Parallel processing with joblib using user-specified backend.
            # Uses module-level _process_one_subject for pickle compatibility
            # with all backends including standard 'multiprocessing'.
            # For process-based backends, inner_max_num_threads=1 prevents
            # BLAS/OMP oversubscription and fork-after-import deadlocks on
            # many-core nodes. The threading backend rejects the kwarg, so we
            # only pass it where it applies.
            backend_kwargs: dict = {}
            if self.backend in ("loky", "multiprocessing"):
                backend_kwargs["inner_max_num_threads"] = 1
            with parallel_backend(self.backend, **backend_kwargs):
                results = Parallel(n_jobs=self.n_jobs)(
                    delayed(_process_one_subject)(lesion, i, analysis)
                    for i, lesion in enumerate(inputs)
                )
            # Report progress only after the whole batch has finished, with one
            # callback call per result (the index passed is always 0)
            if progress_callback:
                for _ in range(len(results)):
                    progress_callback(0)  # Index doesn't matter, just triggers update

        # Sort by original index and filter out failures
        results = sorted(results, key=lambda x: x[0])
        successful_results = [r[1] for r in results if r[1] is not None]

        # Warn if any subjects failed
        n_failed = len(inputs) - len(successful_results)
        if n_failed > 0:
            warnings.warn(
                f"{n_failed} out of {len(inputs)} subjects failed processing. "
                "Check warnings above for details.",
                RuntimeWarning,
                stacklevel=2,
            )

        return successful_results

    @property
    def name(self) -> str:
        return "parallel"

execute(inputs, analysis, progress_callback=None)

Execute parallel batch processing.

Processes subjects in parallel using joblib. Each subject is processed independently, and failures are caught and reported as warnings without stopping the entire batch.

Parameters:

Name Type Description Default
inputs list[SubjectData]

Subjects to process

required
analysis BaseAnalysis

Analysis to apply

required
progress_callback callable or None

Progress reporting function

None

Returns:

Type Description
list[SubjectData]

Successfully processed subjects (failures are filtered out)
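Failures are filtered out by tagging each result with its input index, sorting, and dropping entries whose result is None. The sketch below reproduces that pattern with the standard library only. The process_one function is a hypothetical stand-in for the module-level _process_one_subject helper, whose body is not shown on this page, and the analysis is assumed to be a plain callable.

```python
import warnings


def process_one(subject, index, analysis):
    """Hypothetical stand-in: return (index, result), or (index, None) on failure."""
    try:
        return index, analysis(subject)
    except Exception as exc:
        warnings.warn(f"Subject {index} failed: {exc}", RuntimeWarning, stacklevel=2)
        return index, None


def run_all(inputs, analysis):
    pairs = [process_one(subject, i, analysis) for i, subject in enumerate(inputs)]
    # Restore input order, then keep only the successful results.
    pairs.sort(key=lambda pair: pair[0])
    successes = [result for _, result in pairs if result is not None]
    n_failed = len(inputs) - len(successes)
    if n_failed:
        warnings.warn(
            f"{n_failed} out of {len(inputs)} subjects failed processing.",
            RuntimeWarning,
            stacklevel=2,
        )
    return successes


def reciprocal(x):
    return 1 / x


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    results = run_all([1, 0, 4], reciprocal)  # the middle subject divides by zero
```

One consequence of this design is that the returned list can be shorter than the input list, so a position in the output does not necessarily correspond to the same position in the input.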

Source code in src/lacuna/batch/strategies.py
def execute(
    self,
    inputs: list[SubjectData],
    analysis: BaseAnalysis,
    progress_callback: Callable[[int], None] | None = None,
) -> list[SubjectData]:
    """
    Execute parallel batch processing.

    Processes subjects in parallel using joblib. Each subject is processed
    independently, and failures are caught and reported as warnings without
    stopping the entire batch.

    Parameters
    ----------
    inputs : list[SubjectData]
        Subjects to process
    analysis : BaseAnalysis
        Analysis to apply
    progress_callback : callable or None
        Progress reporting function

    Returns
    -------
    list[SubjectData]
        Successfully processed subjects (failures are filtered out)
    """
    # Execute in parallel
    if self.n_jobs == 1:
        # Sequential processing (useful for debugging)
        results = []
        for i, lesion in enumerate(inputs):
            result = _process_one_subject(lesion, i, analysis)
            results.append(result)
            if progress_callback:
                progress_callback(i)
    else:
        # Parallel processing with joblib using user-specified backend.
        # Uses module-level _process_one_subject for pickle compatibility
        # with all backends including standard 'multiprocessing'.
        # For process-based backends, inner_max_num_threads=1 prevents
        # BLAS/OMP oversubscription and fork-after-import deadlocks on
        # many-core nodes. The threading backend rejects the kwarg, so we
        # only pass it where it applies.
        backend_kwargs: dict = {}
        if self.backend in ("loky", "multiprocessing"):
            backend_kwargs["inner_max_num_threads"] = 1
        with parallel_backend(self.backend, **backend_kwargs):
            results = Parallel(n_jobs=self.n_jobs)(
                delayed(_process_one_subject)(lesion, i, analysis)
                for i, lesion in enumerate(inputs)
            )
        # Report progress only after the whole batch has finished, with one
        # callback call per result (the index passed is always 0)
        if progress_callback:
            for _ in range(len(results)):
                progress_callback(0)  # Index doesn't matter, just triggers update

    # Sort by original index and filter out failures
    results = sorted(results, key=lambda x: x[0])
    successful_results = [r[1] for r in results if r[1] is not None]

    # Warn if any subjects failed
    n_failed = len(inputs) - len(successful_results)
    if n_failed > 0:
        warnings.warn(
            f"{n_failed} out of {len(inputs)} subjects failed processing. "
            "Check warnings above for details.",
            RuntimeWarning,
            stacklevel=2,
        )

    return successful_results
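In the parallel branch above, progress_callback is called with 0 once per result after the whole batch finishes, while the n_jobs == 1 branch passes the real index. A callback that counts calls instead of trusting the index works in both cases. A minimal sketch; ProgressCounter is a hypothetical helper, not part of lacuna:

```python
class ProgressCounter:
    """Progress callback that counts invocations and ignores the index value."""

    def __init__(self, total: int):
        self.total = total
        self.done = 0

    def __call__(self, index: int) -> None:
        # The index is deliberately unused: it may be 0 on every call.
        self.done += 1

    @property
    def fraction(self) -> float:
        return self.done / self.total if self.total else 1.0


counter = ProgressCounter(total=3)
for _ in range(3):
    counter(0)  # mimics how the parallel branch reports progress

print(f"{counter.done}/{counter.total} subjects reported")
```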

SequentialStrategy

Bases: BatchStrategy

Sequential batch processing for analyses that should not run in parallel.

Best for analyses with external dependencies that don't benefit from parallel execution (e.g., StructuralNetworkMapping with MRtrix3).

This strategy processes each subject one at a time, regardless of the n_jobs parameter. It's designed for analyses where:
  • External tools handle their own internal parallelization (e.g., tckedit -nthreads)
  • Running multiple instances in parallel causes resource contention
  • Memory-mapped files or shared resources would conflict

Parameters:

Name Type Description Default
n_jobs int

Not used for sequential processing. Kept for interface compatibility. The analysis itself may use internal parallelization (e.g., MRtrix3's -nthreads).

-1

Examples:

>>> from lacuna.batch.strategies import SequentialStrategy
>>> from lacuna.analysis import StructuralNetworkMapping
>>>
>>> strategy = SequentialStrategy()
>>> results = strategy.execute(masks, StructuralNetworkMapping(...))
Source code in src/lacuna/batch/strategies.py
class SequentialStrategy(BatchStrategy):
    """
    Sequential batch processing for analyses that should not run in parallel.

    Best for analyses with external dependencies that don't benefit from
    parallel execution (e.g., StructuralNetworkMapping with MRtrix3).

    This strategy processes each subject one at a time, regardless of the
    n_jobs parameter. It's designed for analyses where:
    - External tools handle their own internal parallelization (e.g., tckedit -nthreads)
    - Running multiple instances in parallel causes resource contention
    - Memory-mapped files or shared resources would conflict

    Parameters
    ----------
    n_jobs : int, default=-1
        Not used for sequential processing. Kept for interface compatibility.
        The analysis itself may use internal parallelization (e.g., MRtrix3's -nthreads).

    Examples
    --------
    >>> from lacuna.batch.strategies import SequentialStrategy
    >>> from lacuna.analysis import StructuralNetworkMapping
    >>>
    >>> strategy = SequentialStrategy()
    >>> results = strategy.execute(masks, StructuralNetworkMapping(...))
    """

    def __init__(self, n_jobs: int = -1):
        super().__init__(n_jobs)

    def execute(
        self,
        inputs: list[SubjectData],
        analysis: BaseAnalysis,
        progress_callback: Callable[[int], None] | None = None,
    ) -> list[SubjectData]:
        """
        Execute sequential batch processing.

        Processes subjects one at a time. Failures are caught and reported
        as warnings without stopping the entire batch.

        Parameters
        ----------
        inputs : list[SubjectData]
            Subjects to process
        analysis : BaseAnalysis
            Analysis to apply
        progress_callback : callable or None
            Progress reporting function

        Returns
        -------
        list[SubjectData]
            Successfully processed subjects (failures are filtered out)
        """
        results = []
        for i, lesion in enumerate(inputs):
            idx, result = _process_one_subject(lesion, i, analysis)
            results.append((idx, result))
            if progress_callback:
                progress_callback(i)

        # Sort by original index and filter out failures
        results = sorted(results, key=lambda x: x[0])
        successful_results = [r[1] for r in results if r[1] is not None]

        # Warn if any subjects failed
        n_failed = len(inputs) - len(successful_results)
        if n_failed > 0:
            warnings.warn(
                f"{n_failed} out of {len(inputs)} subjects failed processing. "
                "Check warnings above for details.",
                RuntimeWarning,
                stacklevel=2,
            )

        return successful_results

    @property
    def name(self) -> str:
        return "sequential"

execute(inputs, analysis, progress_callback=None)

Execute sequential batch processing.

Processes subjects one at a time. Failures are caught and reported as warnings without stopping the entire batch.

Parameters:

Name Type Description Default
inputs list[SubjectData]

Subjects to process

required
analysis BaseAnalysis

Analysis to apply

required
progress_callback callable or None

Progress reporting function

None

Returns:

Type Description
list[SubjectData]

Successfully processed subjects (failures are filtered out)

Source code in src/lacuna/batch/strategies.py
def execute(
    self,
    inputs: list[SubjectData],
    analysis: BaseAnalysis,
    progress_callback: Callable[[int], None] | None = None,
) -> list[SubjectData]:
    """
    Execute sequential batch processing.

    Processes subjects one at a time. Failures are caught and reported
    as warnings without stopping the entire batch.

    Parameters
    ----------
    inputs : list[SubjectData]
        Subjects to process
    analysis : BaseAnalysis
        Analysis to apply
    progress_callback : callable or None
        Progress reporting function

    Returns
    -------
    list[SubjectData]
        Successfully processed subjects (failures are filtered out)
    """
    results = []
    for i, lesion in enumerate(inputs):
        idx, result = _process_one_subject(lesion, i, analysis)
        results.append((idx, result))
        if progress_callback:
            progress_callback(i)

    # Sort by original index and filter out failures
    results = sorted(results, key=lambda x: x[0])
    successful_results = [r[1] for r in results if r[1] is not None]

    # Warn if any subjects failed
    n_failed = len(inputs) - len(successful_results)
    if n_failed > 0:
        warnings.warn(
            f"{n_failed} out of {len(inputs)} subjects failed processing. "
            "Check warnings above for details.",
            RuntimeWarning,
            stacklevel=2,
        )

    return successful_results

VectorizedStrategy

Bases: BatchStrategy

Vectorized batch processing using batched NumPy operations.

  • Best for matrix-based analyses (FunctionalNetworkMapping)
  • Speedup via optimized BLAS operations and reduced overhead
  • Moderate memory overhead (processes lesions in configurable batches)

This strategy leverages vectorized operations to process multiple lesions simultaneously through each connectome batch. Instead of:

    for lesion in lesions:
        for connectome_batch in batches:
            process(lesion, connectome_batch)

It does:

    for connectome_batch in batches:
        process_all_lesions_together(lesions_batch, connectome_batch)

This dramatically reduces overhead and enables efficient BLAS operations.
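The effect of swapping the loop order can be illustrated with a toy model that counts how often each connectome batch is loaded. This is a pure-Python sketch under simplifying assumptions: CountingLoader, per_lesion, and batched are hypothetical names, the data are tiny lists, and a real implementation would replace the inner loop over lesions with a single matrix product.

```python
class CountingLoader:
    """Toy connectome source that records how many batch loads were requested."""

    def __init__(self, batches):
        self.batches = batches
        self.loads = 0

    def load(self, i):
        self.loads += 1
        return self.batches[i]


def dot(a, b):
    return sum(x * y for x, y in zip(a, b))


def per_lesion(lesions, loader):
    """Lesion-outer loop: every lesion reloads every connectome batch."""
    scores = []
    for lesion in lesions:
        total = 0.0
        for i in range(len(loader.batches)):
            for row in loader.load(i):
                total += dot(lesion, row)
        scores.append(total)
    return scores


def batched(lesions, loader):
    """Batch-outer loop: each connectome batch is loaded once and shared."""
    scores = [0.0] * len(lesions)
    for i in range(len(loader.batches)):
        rows = loader.load(i)
        for k, lesion in enumerate(lesions):
            scores[k] += sum(dot(lesion, row) for row in rows)
    return scores


batches = [[[1.0, 0.0], [0.0, 1.0]], [[2.0, 2.0]]]
lesions = [[1.0, 2.0], [3.0, 4.0], [0.0, 1.0]]
slow, fast = CountingLoader(batches), CountingLoader(batches)
slow_scores = per_lesion(lesions, slow)
fast_scores = batched(lesions, fast)
print(slow.loads, fast.loads)
```

Both orderings produce the same scores; the batch-outer version issues one load per connectome batch instead of one per lesion and batch.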

The analysis class must implement: run_batch(inputs: list[SubjectData]) -> list[SubjectData]

Parameters:

Name Type Description Default
n_jobs int

Not used for vectorized processing (uses BLAS parallelization instead). Kept for interface compatibility.

-1
lesion_batch_size int or None

Number of lesions to process together in memory. Useful when processing hundreds of lesions.
  • None: Process all lesions together (maximum speed, high memory)
  • N: Process N lesions at a time (balanced speed/memory)

None
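The lesion_batch_size parameter splits the input list into consecutive slices, with a shorter final slice when the count does not divide evenly. A minimal sketch of that slicing, modeled on the range-and-slice loop in the class source; the generator name iter_batches is hypothetical:

```python
def iter_batches(items, batch_size):
    """Yield (start_index, chunk) pairs; batch_size=None yields one chunk with everything."""
    if batch_size is None:
        yield 0, list(items)
        return
    for start in range(0, len(items), batch_size):
        # Slicing past the end is safe, so the last chunk is simply shorter.
        yield start, items[start:start + batch_size]


chunks = list(iter_batches(list(range(7)), 3))
everything = list(iter_batches(list(range(7)), None))
print(chunks)
```

With seven inputs and a batch size of three, the chunks hold three, three, and one item.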

Examples:

>>> from lacuna.batch.strategies import VectorizedStrategy
>>> from lacuna.analysis import FunctionalNetworkMapping
>>>
>>> # Process all lesions together (fastest)
>>> strategy = VectorizedStrategy()
>>> results = strategy.execute(lesions, FunctionalNetworkMapping(...))
>>>
>>> # Process 50 lesions at a time (memory-constrained)
>>> strategy = VectorizedStrategy(lesion_batch_size=50)
>>> results = strategy.execute(lesions, FunctionalNetworkMapping(...))
Source code in src/lacuna/batch/strategies.py
class VectorizedStrategy(BatchStrategy):
    """
    Vectorized batch processing using batched NumPy operations.

    Best for matrix-based analyses (FunctionalNetworkMapping)
    Speedup via optimized BLAS operations and reduced overhead
    Moderate memory overhead (processes lesions in configurable batches)

    This strategy leverages vectorized operations to process multiple lesions
    simultaneously through each connectome batch. Instead of:
        for lesion in lesions:
            for connectome_batch in batches:
                process(lesion, connectome_batch)

    It does:
        for connectome_batch in batches:
            process_all_lesions_together(lesions_batch, connectome_batch)

    This dramatically reduces overhead and enables efficient BLAS operations.

    The analysis class must implement:
        run_batch(inputs: list[SubjectData]) -> list[SubjectData]

    Parameters
    ----------
    n_jobs : int, default=-1
        Not used for vectorized processing (uses BLAS parallelization instead).
        Kept for interface compatibility.
    lesion_batch_size : int or None, default=None
        Number of lesions to process together in memory.
        - None: Process all lesions together (maximum speed, high memory)
        - N: Process N lesions at a time (balanced speed/memory)
        Useful when processing hundreds of lesions.

    Examples
    --------
    >>> from lacuna.batch.strategies import VectorizedStrategy
    >>> from lacuna.analysis import FunctionalNetworkMapping
    >>>
    >>> # Process all lesions together (fastest)
    >>> strategy = VectorizedStrategy()
    >>> results = strategy.execute(lesions, FunctionalNetworkMapping(...))
    >>>
    >>> # Process 50 lesions at a time (memory-constrained)
    >>> strategy = VectorizedStrategy(lesion_batch_size=50)
    >>> results = strategy.execute(lesions, FunctionalNetworkMapping(...))
    """

    def __init__(
        self,
        n_jobs: int = -1,
        lesion_batch_size: int | None = None,
        batch_result_callback: Callable[[list[SubjectData]], None] | None = None,
    ):
        super().__init__(n_jobs)
        self.lesion_batch_size = lesion_batch_size
        self.batch_result_callback = batch_result_callback

    def execute(
        self,
        inputs: list[SubjectData],
        analysis: BaseAnalysis,
        progress_callback: Callable[[int], None] | None = None,
    ) -> list[SubjectData]:
        """
        Execute vectorized batch processing.

        Calls analysis.run_batch(), which processes multiple lesions together
        using vectorized operations. Raises NotImplementedError if
        run_batch() is not implemented (there is no sequential fallback).

        Parameters
        ----------
        inputs : list[SubjectData]
            Subjects to process
        analysis : BaseAnalysis
            Analysis to apply (must implement run_batch method)
        progress_callback : callable or None
            Progress reporting function (called once per subject, after the
            lesion batch containing that subject completes)

        Returns
        -------
        list[SubjectData]
            Processed subjects

        Raises
        ------
        NotImplementedError
            If analysis doesn't implement run_batch()
        """
        # Check if analysis supports batch processing
        if not hasattr(analysis, "run_batch") or not callable(analysis.run_batch):
            raise NotImplementedError(
                f"Analysis {analysis.__class__.__name__} does not implement "
                f"run_batch() method required for vectorized strategy. "
                f"Please use parallel strategy instead or implement run_batch()."
            )

        # Process all lesions together if no batch size specified
        if self.lesion_batch_size is None:
            results = analysis.run_batch(inputs)

            # Call batch result callback if provided
            if self.batch_result_callback:
                self.batch_result_callback(results)

            # Update progress
            if progress_callback:
                for i in range(len(results)):
                    progress_callback(i)

            return results

        # Process lesions in batches
        all_results = []
        n_lesions = len(inputs)

        for batch_start in range(0, n_lesions, self.lesion_batch_size):
            batch_end = min(batch_start + self.lesion_batch_size, n_lesions)
            lesion_batch = inputs[batch_start:batch_end]

            # Process this batch
            batch_results = analysis.run_batch(lesion_batch)

            # Call batch result callback if provided (for immediate saving)
            if self.batch_result_callback:
                self.batch_result_callback(batch_results)

            all_results.extend(batch_results)

            # Update progress
            if progress_callback:
                for i in range(batch_start, batch_end):
                    progress_callback(i)

        return all_results

    @property
    def name(self) -> str:
        return "vectorized"

execute(inputs, analysis, progress_callback=None)

Execute vectorized batch processing.

Calls analysis.run_batch(), which processes multiple lesions together using vectorized operations. Raises NotImplementedError if the analysis does not implement run_batch(); there is no automatic fallback to sequential processing.

Parameters:

Name Type Description Default
inputs list[SubjectData]

Subjects to process

required
analysis BaseAnalysis

Analysis to apply (must implement run_batch method)

required
progress_callback callable or None

Progress reporting function (called once per subject, after the lesion batch containing that subject completes)

None

Returns:

Type Description
list[SubjectData]

Processed subjects

Raises:

Type Description
NotImplementedError

If analysis doesn't implement run_batch()
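Since execute() raises when run_batch() is missing, a caller may want to test for it before choosing the vectorized strategy. A minimal sketch of such a check, equivalent to the hasattr/callable test in the source; supports_vectorized and the two toy classes are hypothetical:

```python
def supports_vectorized(analysis) -> bool:
    """True if the analysis exposes a callable run_batch attribute."""
    return callable(getattr(analysis, "run_batch", None))


class WithBatch:
    """Toy analysis that implements the batch entry point."""

    def run_batch(self, inputs):
        return list(inputs)


class WithoutBatch:
    """Toy analysis with no batch entry point."""


# Pick a strategy name that is safe for each analysis.
choice_a = "vectorized" if supports_vectorized(WithBatch()) else "parallel"
choice_b = "vectorized" if supports_vectorized(WithoutBatch()) else "parallel"
print(choice_a, choice_b)
```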

Source code in src/lacuna/batch/strategies.py
def execute(
    self,
    inputs: list[SubjectData],
    analysis: BaseAnalysis,
    progress_callback: Callable[[int], None] | None = None,
) -> list[SubjectData]:
    """
    Execute vectorized batch processing.

    Calls analysis.run_batch(), which processes multiple lesions together
    using vectorized operations. Raises NotImplementedError if
    run_batch() is not implemented (there is no sequential fallback).

    Parameters
    ----------
    inputs : list[SubjectData]
        Subjects to process
    analysis : BaseAnalysis
        Analysis to apply (must implement run_batch method)
    progress_callback : callable or None
        Progress reporting function (called once per subject, after the
        lesion batch containing that subject completes)

    Returns
    -------
    list[SubjectData]
        Processed subjects

    Raises
    ------
    NotImplementedError
        If analysis doesn't implement run_batch()
    """
    # Check if analysis supports batch processing
    if not hasattr(analysis, "run_batch") or not callable(analysis.run_batch):
        raise NotImplementedError(
            f"Analysis {analysis.__class__.__name__} does not implement "
            f"run_batch() method required for vectorized strategy. "
            f"Please use parallel strategy instead or implement run_batch()."
        )

    # Process all lesions together if no batch size specified
    if self.lesion_batch_size is None:
        results = analysis.run_batch(inputs)

        # Call batch result callback if provided
        if self.batch_result_callback:
            self.batch_result_callback(results)

        # Update progress
        if progress_callback:
            for i in range(len(results)):
                progress_callback(i)

        return results

    # Process lesions in batches
    all_results = []
    n_lesions = len(inputs)

    for batch_start in range(0, n_lesions, self.lesion_batch_size):
        batch_end = min(batch_start + self.lesion_batch_size, n_lesions)
        lesion_batch = inputs[batch_start:batch_end]

        # Process this batch
        batch_results = analysis.run_batch(lesion_batch)

        # Call batch result callback if provided (for immediate saving)
        if self.batch_result_callback:
            self.batch_result_callback(batch_results)

        all_results.extend(batch_results)

        # Update progress
        if progress_callback:
            for i in range(batch_start, batch_end):
                progress_callback(i)

    return all_results

batch_process(inputs=None, analysis=None, n_jobs=-1, show_progress=True, strategy=None, backend='loky', lesion_batch_size=None, batch_result_callback=None, progress_desc=None)

Process multiple subjects through an analysis pipeline with automatic optimization.

This function automatically selects the optimal processing strategy based on the analysis type and available system resources. It provides progress monitoring and graceful error handling for individual subject failures.

Parameters:

Name Type Description Default
inputs list[SubjectData] or list[VoxelMap]

List of SubjectData or VoxelMap objects to process. All items must be of the same type (no mixing).

None
analysis BaseAnalysis

Analysis instance to apply to each input

None
n_jobs int

Number of parallel jobs:
  • -1: Use all available CPU cores
  • 1: Sequential processing (useful for debugging)
  • N: Use N parallel workers

-1
show_progress bool

Display progress bar during processing

True
strategy str or None

Force specific strategy:
  • None: Automatic selection based on analysis.batch_strategy
  • "parallel": Force parallel processing (joblib multiprocessing)
  • "sequential": Force sequential processing (one at a time)
  • "vectorized": Force vectorized processing (batch matrix operations)

None
backend str

Joblib backend for parallel processing:
  • 'loky': Robust multiprocessing (best for standalone scripts)
  • 'threading': Thread-based parallelism (use in Jupyter notebooks to avoid pickling issues)
  • 'multiprocessing': Standard multiprocessing
Note: Only applies when using parallel processing (more than one worker).

'loky'
lesion_batch_size int or None

For vectorized strategy: number of lesions to process together in memory. Only applies when using vectorized strategy; ignored for parallel strategy.
  • None: Process all lesions at once (fastest, high memory)
  • N: Process N lesions at a time (balanced speed/memory)

None
batch_result_callback callable or None

Callback function called after each lesion batch is processed. Signature: callback(batch_results: list[SubjectData]) -> None. Use this to save results immediately and free memory. Example: batch_result_callback=lambda batch: [save(r) for r in batch]

None

Returns:

Type Description
list[SubjectData]

List of processed SubjectData objects with results added. Subjects that failed processing are excluded (warnings are emitted).

Raises:

Type Description
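ValueError

If inputs is empty, or analysis is missing or not a BaseAnalysis instance

TypeError

If inputs contains items of an invalid type or mixes types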

RuntimeError

If strategy selection or execution fails

Examples:

Basic usage with automatic optimization:

>>> from lacuna import batch_process
>>> from lacuna.analysis import RegionalDamage
>>> from lacuna.io import load_bids_dataset
>>>
>>> # Load subjects
>>> dataset = load_bids_dataset("path/to/bids")
>>> lesions = list(dataset.values())
>>>
>>> # Process with automatic strategy selection
>>> analysis = RegionalDamage()
>>> results = batch_process(lesions, analysis)
>>> print(f"Processed {len(results)} subjects")

Use in Jupyter notebooks (threading backend to avoid pickling issues):

>>> results = batch_process(
...     lesions,
...     analysis,
...     n_jobs=-1,
...     backend='threading'  # Works in Jupyter!
... )

Control parallelization:

>>> # Use all cores (default)
>>> results = batch_process(lesions, analysis, n_jobs=-1)
>>>
>>> # Use 4 cores
>>> results = batch_process(lesions, analysis, n_jobs=4)
>>>
>>> # Sequential (debugging)
>>> results = batch_process(lesions, analysis, n_jobs=1)

Chain multiple analyses:

>>> from lacuna.analysis import RegionalDamage, ParcelAggregation
>>>
>>> # First analysis
>>> regional = RegionalDamage()
>>> after_regional = batch_process(lesions, regional)
>>>
>>> # Second analysis on results
>>> aggregation = ParcelAggregation(source="maskimg")
>>> final = batch_process(after_regional, aggregation)
Notes
  • Progress bar requires tqdm package
  • Parallel processing requires joblib package
  • Individual subject failures emit warnings but don't stop the batch
  • Strategy selection is automatic based on analysis.batch_strategy attribute
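
The "warn and continue" behaviour described in the notes can be pictured with a short standalone sketch. The `run_tolerant` helper, the toy `halve_even` analysis, and the choice of `RuntimeWarning` are assumptions made for illustration; the warning category and message that lacuna actually emits are not shown in this section.

```python
import warnings


def run_tolerant(subjects: list, analyze) -> list:
    """Hypothetical helper: keep successful results, warn about failures."""
    results = []
    for index, subject in enumerate(subjects):
        try:
            results.append(analyze(subject))
        except Exception as exc:  # one bad subject must not abort the batch
            warnings.warn(
                f"Subject {index} failed: {exc}", RuntimeWarning, stacklevel=2
            )
    return results


def halve_even(value: int) -> int:
    """Toy analysis that fails on odd inputs."""
    if value % 2:
        raise ValueError(f"{value} is odd")
    return value // 2


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    results = run_tolerant([2, 3, 8], halve_even)

print(results)      # results for the two subjects that succeeded
print(len(caught))  # number of warnings recorded
```

Since failed subjects are dropped from the returned list, comparing `len(results)` with the number of inputs is a quick way to notice that something was skipped.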
Source code in src/lacuna/batch/api.py
def batch_process(
    inputs: list[SubjectData | VoxelMap] | None = None,
    analysis: BaseAnalysis | None = None,
    n_jobs: int = -1,
    show_progress: bool = True,
    strategy: str | None = None,
    backend: str = "loky",
    lesion_batch_size: int | None = None,
    batch_result_callback: Callable | None = None,
    progress_desc: str | None = None,
) -> list[SubjectData | ParcelData]:
    """
    Process multiple subjects through an analysis pipeline with automatic optimization.

    The processing strategy is taken from the strategy argument when given,
    otherwise from the analysis's batch_strategy attribute. The function also
    provides progress monitoring and graceful error handling for individual
    subject failures.

    Parameters
    ----------
    inputs : list[SubjectData] or list[VoxelMap]
        List of SubjectData or VoxelMap objects to process.
        All items must be of the same type (no mixing).
    analysis : BaseAnalysis
        Analysis instance to apply to each input
    n_jobs : int, default=-1
        Number of parallel jobs:
        - -1: Use all available CPU cores
        - 1: Sequential processing (useful for debugging)
        - N: Use N parallel workers
    show_progress : bool, default=True
        Display progress bar during processing
    strategy : str or None, default=None
        Force specific strategy:
        - None: Automatic selection based on analysis.batch_strategy
        - "parallel": Force parallel processing (joblib multiprocessing)
        - "sequential": Force sequential processing (one at a time)
        - "vectorized": Force vectorized processing (batch matrix operations)
    backend : str, default='loky'
        Joblib backend for parallel processing:
        - 'loky': Robust multiprocessing (best for standalone scripts)
        - 'threading': Thread-based parallelism (use in Jupyter notebooks to avoid pickling issues)
        - 'multiprocessing': Standard multiprocessing
        Note: Only applies when using parallel processing (n_jobs != 1)
    lesion_batch_size : int or None, default=None
        For vectorized strategy: number of lesions to process together in memory.
        - None: Process all lesions at once (fastest, high memory)
        - N: Process N lesions at a time (balanced speed/memory)
        Only applies when using vectorized strategy. Ignored for parallel strategy.
    batch_result_callback : callable or None, default=None
        Callback function called after each lesion batch is processed.
        Signature: callback(batch_results: list[SubjectData]) -> None
        Use this to save results immediately and free memory.
        Example: batch_result_callback=lambda batch: [save(r) for r in batch]
    progress_desc : str or None, default=None
        Description shown on the progress bar. If None, the analysis
        class name is used.

    Returns
    -------
    list[SubjectData]
        List of processed SubjectData objects with results added.
        Subjects that failed processing are excluded (warnings are emitted).

    Raises
    ------
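    ValueError
        If inputs is empty, or analysis is missing or not a BaseAnalysis instance
    TypeError
        If inputs contains items of an invalid type or mixes types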
    RuntimeError
        If strategy selection or execution fails

    Examples
    --------
    Basic usage with automatic optimization:

    >>> from lacuna import batch_process
    >>> from lacuna.analysis import RegionalDamage
    >>> from lacuna.io import load_bids_dataset
    >>>
    >>> # Load subjects
    >>> dataset = load_bids_dataset("path/to/bids")
    >>> lesions = list(dataset.values())
    >>>
    >>> # Process with automatic strategy selection
    >>> analysis = RegionalDamage()
    >>> results = batch_process(lesions, analysis)
    >>> print(f"Processed {len(results)} subjects")

    Use in Jupyter notebooks (threading backend to avoid pickling issues):

    >>> results = batch_process(
    ...     lesions,
    ...     analysis,
    ...     n_jobs=-1,
    ...     backend='threading'  # Works in Jupyter!
    ... )

    Control parallelization:

    >>> # Use all cores (default)
    >>> results = batch_process(lesions, analysis, n_jobs=-1)
    >>>
    >>> # Use 4 cores
    >>> results = batch_process(lesions, analysis, n_jobs=4)
    >>>
    >>> # Sequential (debugging)
    >>> results = batch_process(lesions, analysis, n_jobs=1)

    Chain multiple analyses:

    >>> from lacuna.analysis import RegionalDamage, ParcelAggregation
    >>>
    >>> # First analysis
    >>> regional = RegionalDamage()
    >>> after_regional = batch_process(lesions, regional)
    >>>
    >>> # Second analysis on results
    >>> aggregation = ParcelAggregation(source="maskimg")
    >>> final = batch_process(after_regional, aggregation)

    Notes
    -----
    - Progress bar requires tqdm package
    - Parallel processing requires joblib package
    - Individual subject failures emit warnings but don't stop the batch
    - Strategy selection is automatic based on analysis.batch_strategy attribute
    """
    # Validate inputs
    if not inputs:
        raise ValueError("inputs cannot be empty")

    # Validate analysis parameter
    if analysis is None:
        raise ValueError("analysis parameter is required")

    # Validate input types (raises TypeError for invalid/mixed types)
    _detect_input_type(inputs)

    if not isinstance(analysis, BaseAnalysis):
        raise ValueError(f"analysis must be a BaseAnalysis instance, got {type(analysis)}")

    # Select processing strategy
    strategy_instance = select_strategy(
        analysis=analysis,
        n_subjects=len(inputs),
        n_jobs=n_jobs,
        force_strategy=strategy,
        backend=backend,
        lesion_batch_size=lesion_batch_size,
        batch_result_callback=batch_result_callback,
    )

    # Setup progress tracking
    progress_bar = None
    if show_progress:
        # Build progress bar description
        desc = progress_desc or analysis.__class__.__name__
        progress_bar = tqdm(
            total=len(inputs),
            desc=desc,
            unit="mask",
        )

        def progress_callback(idx: int) -> None:
            """Update progress bar."""
            if progress_bar:
                progress_bar.update(1)

    else:
        progress_callback = None

    # Execute batch processing
    try:
        results = strategy_instance.execute(
            inputs=inputs,
            analysis=analysis,
            progress_callback=progress_callback,
        )
    finally:
        # Close progress bar
        if progress_bar:
            progress_bar.close()

    return results
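
The `try`/`finally` around `execute` ensures the progress bar is closed even when processing raises. The sketch below shows the same pattern in isolation. `FakeBar` is a hypothetical stand-in for a tqdm bar, and `run_with_progress` is an illustrative helper, not lacuna code.

```python
class FakeBar:
    """Hypothetical stand-in for a tqdm progress bar."""

    def __init__(self, total: int) -> None:
        self.total = total
        self.count = 0
        self.closed = False

    def update(self, n: int = 1) -> None:
        self.count += n

    def close(self) -> None:
        self.closed = True


def run_with_progress(items: list, work, bar=None) -> list:
    """Apply `work` to each item, ticking `bar` and always closing it."""
    # The callback closes over `bar`; it stays None when progress is disabled
    progress_callback = (lambda idx: bar.update(1)) if bar is not None else None
    results = []
    try:
        for idx, item in enumerate(items):
            results.append(work(item))
            if progress_callback:
                progress_callback(idx)
    finally:
        # Runs on success and on error, so the bar is never left open
        if bar is not None:
            bar.close()
    return results


def fail_on_two(x: int) -> int:
    if x == 2:
        raise ValueError("boom")
    return x


bar = FakeBar(total=3)
try:
    run_with_progress([1, 2, 3], fail_on_two, bar=bar)
except ValueError:
    pass

print(bar.count, bar.closed)
```

Here the second item raises, so only one tick is recorded, yet the bar is still closed before the exception reaches the caller.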

select_strategy(analysis, n_subjects, n_jobs=-1, force_strategy=None, backend='loky', lesion_batch_size=None, batch_result_callback=None)

Select batch processing strategy based on analysis.batch_strategy attribute.

Parameters:

Name Type Description Default
analysis BaseAnalysis

Analysis instance to be executed

required
n_subjects int

Number of subjects to process (currently unused, reserved for future use)

required
n_jobs int

Number of parallel jobs requested

-1
force_strategy str or None

Override automatic selection. Options:
  • "parallel": Force parallel processing
  • "sequential": Force sequential processing
  • "vectorized": Force vectorized processing

None
backend str

Joblib backend for parallel processing:
  • 'loky': Robust multiprocessing (best for standalone scripts)
  • 'threading': Thread-based (use in Jupyter notebooks)
  • 'multiprocessing': Standard multiprocessing

'loky'
lesion_batch_size int or None

For vectorized strategy: number of lesions to process together.

None
batch_result_callback callable or None

Callback function called after each batch is processed.

None

Returns:

Type Description
BatchStrategy

Instantiated strategy ready for execution

Raises:

Type Description
ValueError

If force_strategy is invalid

Source code in src/lacuna/batch/selection.py
def select_strategy(
    analysis: BaseAnalysis,
    n_subjects: int,
    n_jobs: int = -1,
    force_strategy: str | None = None,
    backend: str = "loky",
    lesion_batch_size: int | None = None,
    batch_result_callback: Callable | None = None,
) -> BatchStrategy:
    """
    Select batch processing strategy based on analysis.batch_strategy attribute.

    Parameters
    ----------
    analysis : BaseAnalysis
        Analysis instance to be executed
    n_subjects : int
        Number of subjects to process (currently unused, reserved for future use)
    n_jobs : int, default=-1
        Number of parallel jobs requested
    force_strategy : str or None, default=None
        Override automatic selection. Options:
        - "parallel": Force parallel processing
        - "sequential": Force sequential processing
        - "vectorized": Force vectorized processing
    backend : str, default='loky'
        Joblib backend for parallel processing:
        - 'loky': Robust multiprocessing (best for standalone scripts)
        - 'threading': Thread-based (use in Jupyter notebooks)
        - 'multiprocessing': Standard multiprocessing
    lesion_batch_size : int or None, default=None
        For vectorized strategy: number of lesions to process together.
    batch_result_callback : callable or None, default=None
        Callback function called after each batch is processed.

    Returns
    -------
    BatchStrategy
        Instantiated strategy ready for execution

    Raises
    ------
    ValueError
        If force_strategy is invalid
    """
    # Handle force override
    if force_strategy is not None:
        force_strategy = force_strategy.lower()

        if force_strategy == "parallel":
            return ParallelStrategy(n_jobs=n_jobs, backend=backend)
        elif force_strategy == "sequential":
            return SequentialStrategy(n_jobs=n_jobs)
        elif force_strategy == "vectorized":
            return VectorizedStrategy(
                n_jobs=n_jobs,
                lesion_batch_size=lesion_batch_size,
                batch_result_callback=batch_result_callback,
            )
        else:
            raise ValueError(
                f"Unknown strategy '{force_strategy}'. "
                f"Available strategies: 'parallel', 'sequential', 'vectorized'"
            )

    # Get strategy from analysis class attribute
    preferred_strategy = getattr(analysis, "batch_strategy", "parallel")

    # Dispatch to appropriate strategy
    if preferred_strategy == "vectorized":
        return VectorizedStrategy(
            n_jobs=n_jobs,
            lesion_batch_size=lesion_batch_size,
            batch_result_callback=batch_result_callback,
        )
    elif preferred_strategy == "sequential":
        return SequentialStrategy(n_jobs=n_jobs)
    elif preferred_strategy == "parallel":
        return ParallelStrategy(n_jobs=n_jobs, backend=backend)
    else:
        # Unknown strategy - fall back to parallel with warning
        warnings.warn(
            f"Unknown batch_strategy '{preferred_strategy}' in "
            f"{analysis.__class__.__name__}. Falling back to parallel.",
            RuntimeWarning,
            stacklevel=2,
        )
        return ParallelStrategy(n_jobs=n_jobs, backend=backend)
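
The selection order above (forced name first, then the analysis's `batch_strategy` attribute, then a fallback to parallel with a warning) can be exercised without lacuna using a small sketch. The `pick_strategy_name` helper and the three toy classes are hypothetical. The helper returns a strategy name rather than a strategy instance to keep the example self-contained.

```python
import warnings

KNOWN_STRATEGIES = ("parallel", "sequential", "vectorized")


def pick_strategy_name(analysis, force_strategy=None) -> str:
    """Hypothetical mirror of the selection order; returns a name only."""
    # 1. An explicit override wins and must be a known name
    if force_strategy is not None:
        name = force_strategy.lower()
        if name not in KNOWN_STRATEGIES:
            raise ValueError(f"Unknown strategy '{name}'")
        return name

    # 2. Otherwise use the attribute on the analysis, defaulting to parallel
    preferred = getattr(analysis, "batch_strategy", "parallel")
    if preferred in KNOWN_STRATEGIES:
        return preferred

    # 3. Unknown attribute values fall back to parallel with a warning
    warnings.warn(
        f"Unknown batch_strategy '{preferred}'. Falling back to parallel.",
        RuntimeWarning,
        stacklevel=2,
    )
    return "parallel"


class VectorizedAnalysis:
    batch_strategy = "vectorized"


class PlainAnalysis:
    pass  # no batch_strategy attribute


class MisspelledAnalysis:
    batch_strategy = "vectorised"


print(pick_strategy_name(VectorizedAnalysis()))
print(pick_strategy_name(PlainAnalysis()))
print(pick_strategy_name(VectorizedAnalysis(), force_strategy="Sequential"))
```

Note the asymmetry this illustrates: an unknown forced name raises `ValueError`, while an unknown `batch_strategy` attribute only warns and falls back to parallel.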