strategies

lacuna.batch.strategies

Batch processing strategies for different analysis types.

This module implements the Strategy pattern for batch processing, enabling automatic optimization based on analysis characteristics:

  • ParallelStrategy: Independent per-subject processing with multiprocessing
  • SequentialStrategy: One-at-a-time processing for analyses that should not run in parallel
  • VectorizedStrategy: Batch matrix operations for connectome analyses

BatchStrategy

Bases: ABC

Abstract base class for batch processing strategies.

Each strategy implements a different approach to processing multiple subjects:

  • Parallel: Uses multiprocessing for independent analyses
  • Vectorized: Stacks data for batch matrix operations

Parameters:

Name Type Description Default
n_jobs int

Number of parallel jobs. -1 uses all available cores. Only relevant for ParallelStrategy.

-1
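To make the interface concrete, here is a minimal sketch of a custom strategy. It is self-contained, so `BatchStrategy` below is a stand-in that mirrors the abstract interface documented here rather than an import from lacuna. `SimpleLoopStrategy`, `DoubleAnalysis`, and the `analysis.run(subject)` call are hypothetical names chosen for illustration.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Optional


class BatchStrategy(ABC):
    """Stand-in mirroring the documented interface (not the lacuna class)."""

    def __init__(self, n_jobs: int = -1):
        self.n_jobs = n_jobs

    @abstractmethod
    def execute(
        self,
        inputs: list,
        analysis: Any,
        progress_callback: Optional[Callable[[int], None]] = None,
    ) -> list:
        ...

    @property
    @abstractmethod
    def name(self) -> str:
        ...


class SimpleLoopStrategy(BatchStrategy):
    """Hypothetical strategy: apply the analysis to one subject at a time."""

    def execute(self, inputs, analysis, progress_callback=None):
        results = []
        for i, subject in enumerate(inputs):
            # `run` is an assumed per-subject method on the analysis object
            results.append(analysis.run(subject))
            if progress_callback:
                progress_callback(i)
        return results

    @property
    def name(self) -> str:
        return "simple-loop"


class DoubleAnalysis:
    """Hypothetical analysis used only to exercise the strategy."""

    def run(self, subject):
        return subject * 2


seen = []
strategy = SimpleLoopStrategy()
output = strategy.execute([1, 2, 3], DoubleAnalysis(), progress_callback=seen.append)
```

Because both `execute` and `name` are abstract, a subclass that omits either one cannot be instantiated, which is how the base class enforces the contract.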
Source code in src/lacuna/batch/strategies.py
class BatchStrategy(ABC):
    """
    Abstract base class for batch processing strategies.

    Each strategy implements a different approach to processing multiple subjects:
    - Parallel: Uses multiprocessing for independent analyses
    - Vectorized: Stacks data for batch matrix operations

    Parameters
    ----------
    n_jobs : int, default=-1
        Number of parallel jobs. -1 uses all available cores.
        Only relevant for ParallelStrategy.
    """

    def __init__(self, n_jobs: int = -1):
        self.n_jobs = n_jobs

    @abstractmethod
    def execute(
        self,
        inputs: list[SubjectData],
        analysis: BaseAnalysis,
        progress_callback: Callable[[int], None] | None = None,
    ) -> list[SubjectData]:
        """
        Execute analysis on all lesions using this strategy.

        Parameters
        ----------
        inputs : list[SubjectData]
            List of lesions to process
        analysis : BaseAnalysis
            Analysis instance to apply to each lesion
        progress_callback : callable or None
            Optional callback function to report progress.
            Called with current index after each subject completes.

        Returns
        -------
        list[SubjectData]
            List of processed SubjectData objects with results added

        Raises
        ------
        RuntimeError
            If execution fails
        """
        pass

    @property
    @abstractmethod
    def name(self) -> str:
        """Strategy name for logging and debugging."""
        pass

name abstractmethod property

Strategy name for logging and debugging.

execute(inputs, analysis, progress_callback=None) abstractmethod

Execute analysis on all lesions using this strategy.

Parameters:

Name Type Description Default
inputs list[SubjectData]

List of lesions to process

required
analysis BaseAnalysis

Analysis instance to apply to each lesion

required
progress_callback callable or None

Optional callback function to report progress. Called with current index after each subject completes.

None

Returns:

Type Description
list[SubjectData]

List of processed SubjectData objects with results added

Raises:

Type Description
RuntimeError

If execution fails
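A progress callback only needs to accept one integer. The sketch below is a hypothetical callback (`ProgressCounter` is not part of lacuna) that counts calls instead of trusting the index it receives. That choice is deliberate: in the ParallelStrategy source on this page, the parallel branch calls the callback with `0` for every subject.

```python
class ProgressCounter:
    """Hypothetical progress callback that counts completed subjects."""

    def __init__(self, total: int):
        self.total = total
        self.done = 0

    def __call__(self, index: int) -> None:
        # The index is ignored on purpose; every call means "one more subject done".
        self.done += 1

    @property
    def fraction(self) -> float:
        # Guard against division by zero for an empty batch
        return self.done / self.total if self.total else 1.0


counter = ProgressCounter(total=4)
for _ in range(4):
    counter(0)  # mimics a strategy that always passes 0 as the index
```

An instance can be passed directly as `progress_callback`, since it is callable with a single `int`.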

Source code in src/lacuna/batch/strategies.py
@abstractmethod
def execute(
    self,
    inputs: list[SubjectData],
    analysis: BaseAnalysis,
    progress_callback: Callable[[int], None] | None = None,
) -> list[SubjectData]:
    """
    Execute analysis on all lesions using this strategy.

    Parameters
    ----------
    inputs : list[SubjectData]
        List of lesions to process
    analysis : BaseAnalysis
        Analysis instance to apply to each lesion
    progress_callback : callable or None
        Optional callback function to report progress.
        Called with current index after each subject completes.

    Returns
    -------
    list[SubjectData]
        List of processed SubjectData objects with results added

    Raises
    ------
    RuntimeError
        If execution fails
    """
    pass

ParallelStrategy

Bases: BatchStrategy

Parallel batch processing using joblib multiprocessing.

  • Best for independent per-subject analyses (RegionalDamage, ParcelAggregation)
  • Speedup on multi-core systems (proportional to available cores)
  • Low memory overhead

This strategy processes each subject independently using joblib.Parallel. The backend can be configured to handle different environments:

  • 'loky' (default): Robust multiprocessing for standalone scripts
  • 'threading': Thread-based parallelism for Jupyter notebooks
  • 'multiprocessing': Standard multiprocessing (less robust than loky)

Parameters:

Name Type Description Default
n_jobs int

Number of parallel jobs:

  • -1: Use all available CPU cores
  • 1: Sequential processing (useful for debugging)
  • N: Use N parallel workers

-1
backend str

Joblib backend to use:

  • 'loky': Robust multiprocessing (best for scripts)
  • 'threading': Thread-based (use in Jupyter notebooks)
  • 'multiprocessing': Standard multiprocessing

'loky'
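The constructor resolves `n_jobs=-1` to a concrete worker count. The following standalone sketch reproduces that resolution logic from the source shown on this page. The function name `resolve_n_jobs` is hypothetical and used only for illustration.

```python
import os


def resolve_n_jobs(n_jobs: int = -1) -> int:
    """Resolve -1 to a concrete worker count; pass other values through."""
    if n_jobs != -1:
        return n_jobs
    if hasattr(os, "sched_getaffinity"):
        # Where available (e.g. Linux): count the CPUs this process is
        # allowed to run on, which honors affinity masks such as taskset.
        return len(os.sched_getaffinity(0))
    # Elsewhere: fall back to the logical CPU count (None becomes 1)
    return os.cpu_count() or 1


explicit = resolve_n_jobs(4)
auto = resolve_n_jobs(-1)
```

Preferring the affinity mask over `os.cpu_count()` matters on shared machines, where a process may be restricted to fewer cores than the host reports.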

Examples:

>>> from lacuna.batch.strategies import ParallelStrategy
>>> from lacuna.analysis import RegionalDamage
>>>
>>> # For standalone scripts (default)
>>> strategy = ParallelStrategy(n_jobs=4)
>>> results = strategy.execute(lesions, RegionalDamage())
>>>
>>> # For Jupyter notebooks
>>> strategy = ParallelStrategy(n_jobs=4, backend='threading')
>>> results = strategy.execute(lesions, RegionalDamage())
Source code in src/lacuna/batch/strategies.py
class ParallelStrategy(BatchStrategy):
    """
    Parallel batch processing using joblib multiprocessing.

    Best for independent per-subject analyses (RegionalDamage, ParcelAggregation)
    Speedup on multi-core systems (proportional to available cores)
    Low memory overhead

    This strategy processes each subject independently using joblib.Parallel.
    The backend can be configured to handle different environments:
    - 'loky' (default): Robust multiprocessing for standalone scripts
    - 'threading': Thread-based parallelism for Jupyter notebooks
    - 'multiprocessing': Standard multiprocessing (less robust than loky)

    Parameters
    ----------
    n_jobs : int, default=-1
        Number of parallel jobs:
        - -1: Use all available CPU cores
        - 1: Sequential processing (useful for debugging)
        - N: Use N parallel workers
    backend : str, default='loky'
        Joblib backend to use:
        - 'loky': Robust multiprocessing (best for scripts)
        - 'threading': Thread-based (use in Jupyter notebooks)
        - 'multiprocessing': Standard multiprocessing

    Examples
    --------
    >>> from lacuna.batch.strategies import ParallelStrategy
    >>> from lacuna.analysis import RegionalDamage
    >>>
    >>> # For standalone scripts (default)
    >>> strategy = ParallelStrategy(n_jobs=4)
    >>> results = strategy.execute(lesions, RegionalDamage())
    >>>
    >>> # For Jupyter notebooks
    >>> strategy = ParallelStrategy(n_jobs=4, backend='threading')
    >>> results = strategy.execute(lesions, RegionalDamage())
    """

    def __init__(self, n_jobs: int = -1, backend: str = "loky"):
        super().__init__(n_jobs)
        self.backend = backend

        # Resolve -1 to actual core count
        if self.n_jobs == -1:
            import os

            self.n_jobs = (
                len(os.sched_getaffinity(0))
                if hasattr(os, "sched_getaffinity")
                else os.cpu_count() or 1
            )

    def execute(
        self,
        inputs: list[SubjectData],
        analysis: BaseAnalysis,
        progress_callback: Callable[[int], None] | None = None,
    ) -> list[SubjectData]:
        """
        Execute parallel batch processing.

        Processes subjects in parallel using joblib. Each subject is processed
        independently, and failures are caught and reported as warnings without
        stopping the entire batch.

        Parameters
        ----------
        inputs : list[SubjectData]
            Subjects to process
        analysis : BaseAnalysis
            Analysis to apply
        progress_callback : callable or None
            Progress reporting function

        Returns
        -------
        list[SubjectData]
            Successfully processed subjects (failures are filtered out)
        """
        # Execute in parallel
        if self.n_jobs == 1:
            # Sequential processing (useful for debugging)
            results = []
            for i, lesion in enumerate(inputs):
                result = _process_one_subject(lesion, i, analysis)
                results.append(result)
                if progress_callback:
                    progress_callback(i)
        else:
            # Parallel processing with joblib using user-specified backend.
            # Uses module-level _process_one_subject for pickle compatibility
            # with all backends including standard 'multiprocessing'.
            # For process-based backends, inner_max_num_threads=1 prevents
            # BLAS/OMP oversubscription and fork-after-import deadlocks on
            # many-core nodes. The threading backend rejects the kwarg, so we
            # only pass it where it applies.
            backend_kwargs: dict = {}
            if self.backend in ("loky", "multiprocessing"):
                backend_kwargs["inner_max_num_threads"] = 1
            with parallel_backend(self.backend, **backend_kwargs):
                results = Parallel(n_jobs=self.n_jobs)(
                    delayed(_process_one_subject)(lesion, i, analysis)
                    for i, lesion in enumerate(inputs)
                )
            # Report progress after the whole batch returns (not from inside the
            # workers, to avoid duplicate updates): one callback call per result.
            if progress_callback:
                for _ in range(len(results)):
                    progress_callback(0)  # Index is a placeholder; each call advances progress by one

        # Sort by original index and filter out failures
        results = sorted(results, key=lambda x: x[0])
        successful_results = [r[1] for r in results if r[1] is not None]

        # Warn if any subjects failed
        n_failed = len(inputs) - len(successful_results)
        if n_failed > 0:
            warnings.warn(
                f"{n_failed} out of {len(inputs)} subjects failed processing. "
                "Check warnings above for details.",
                RuntimeWarning,
                stacklevel=2,
            )

        return successful_results

    @property
    def name(self) -> str:
        return "parallel"

execute(inputs, analysis, progress_callback=None)

Execute parallel batch processing.

Processes subjects in parallel using joblib. Each subject is processed independently, and failures are caught and reported as warnings without stopping the entire batch.

Parameters:

Name Type Description Default
inputs list[SubjectData]

Subjects to process

required
analysis BaseAnalysis

Analysis to apply

required
progress_callback callable or None

Progress reporting function

None

Returns:

Type Description
list[SubjectData]

Successfully processed subjects (failures are filtered out)
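The pattern behind this method is: tag each result with its input index, never let a worker raise, then sort and drop failures. The sketch below illustrates that pattern with `concurrent.futures` from the standard library in place of joblib, so it runs without extra dependencies. `process_one`, `run_all`, and `reciprocal` are hypothetical names, and `process_one` is only an assumed approximation of the module-level `_process_one_subject` helper, whose body is not shown on this page.

```python
import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed


def process_one(subject, index, func):
    """Apply func to one subject; return (index, None) instead of raising."""
    try:
        return index, func(subject)
    except Exception as exc:
        warnings.warn(f"Subject {index} failed: {exc}", RuntimeWarning, stacklevel=2)
        return index, None


def run_all(subjects, func, n_workers=2):
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        futures = [pool.submit(process_one, s, i, func) for i, s in enumerate(subjects)]
        # as_completed yields in completion order, so each result carries its index
        results = [f.result() for f in as_completed(futures)]
    # Restore input order, then filter out failed subjects
    results.sort(key=lambda pair: pair[0])
    return [value for _, value in results if value is not None]


def reciprocal(x):
    return 1 / x


with warnings.catch_warnings():
    warnings.simplefilter("ignore", RuntimeWarning)
    ok = run_all([1, 0, 4], reciprocal)  # the middle subject raises ZeroDivisionError
```

Note that the output is shorter than the input when a subject fails, so position `i` in the result no longer corresponds to position `i` in `inputs`.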

Source code in src/lacuna/batch/strategies.py
def execute(
    self,
    inputs: list[SubjectData],
    analysis: BaseAnalysis,
    progress_callback: Callable[[int], None] | None = None,
) -> list[SubjectData]:
    """
    Execute parallel batch processing.

    Processes subjects in parallel using joblib. Each subject is processed
    independently, and failures are caught and reported as warnings without
    stopping the entire batch.

    Parameters
    ----------
    inputs : list[SubjectData]
        Subjects to process
    analysis : BaseAnalysis
        Analysis to apply
    progress_callback : callable or None
        Progress reporting function

    Returns
    -------
    list[SubjectData]
        Successfully processed subjects (failures are filtered out)
    """
    # Execute in parallel
    if self.n_jobs == 1:
        # Sequential processing (useful for debugging)
        results = []
        for i, lesion in enumerate(inputs):
            result = _process_one_subject(lesion, i, analysis)
            results.append(result)
            if progress_callback:
                progress_callback(i)
    else:
        # Parallel processing with joblib using user-specified backend.
        # Uses module-level _process_one_subject for pickle compatibility
        # with all backends including standard 'multiprocessing'.
        # For process-based backends, inner_max_num_threads=1 prevents
        # BLAS/OMP oversubscription and fork-after-import deadlocks on
        # many-core nodes. The threading backend rejects the kwarg, so we
        # only pass it where it applies.
        backend_kwargs: dict = {}
        if self.backend in ("loky", "multiprocessing"):
            backend_kwargs["inner_max_num_threads"] = 1
        with parallel_backend(self.backend, **backend_kwargs):
            results = Parallel(n_jobs=self.n_jobs)(
                delayed(_process_one_subject)(lesion, i, analysis)
                for i, lesion in enumerate(inputs)
            )
        # Report progress after the whole batch returns (not from inside the
        # workers, to avoid duplicate updates): one callback call per result.
        if progress_callback:
            for _ in range(len(results)):
                progress_callback(0)  # Index is a placeholder; each call advances progress by one

    # Sort by original index and filter out failures
    results = sorted(results, key=lambda x: x[0])
    successful_results = [r[1] for r in results if r[1] is not None]

    # Warn if any subjects failed
    n_failed = len(inputs) - len(successful_results)
    if n_failed > 0:
        warnings.warn(
            f"{n_failed} out of {len(inputs)} subjects failed processing. "
            "Check warnings above for details.",
            RuntimeWarning,
            stacklevel=2,
        )

    return successful_results

SequentialStrategy

Bases: BatchStrategy

Sequential batch processing for analyses that should not run in parallel.

Best for analyses with external dependencies that don't benefit from parallel execution (e.g., StructuralNetworkMapping with MRtrix3).

This strategy processes each subject one at a time, regardless of the n_jobs parameter. It's designed for analyses where:

  • External tools handle their own internal parallelization (e.g., tckedit -nthreads)
  • Running multiple instances in parallel causes resource contention
  • Memory-mapped files or shared resources would conflict

Parameters:

Name Type Description Default
n_jobs int

Not used for sequential processing. Kept for interface compatibility. The analysis itself may use internal parallelization (e.g., MRtrix3's -nthreads).

-1

Examples:

>>> from lacuna.batch.strategies import SequentialStrategy
>>> from lacuna.analysis import StructuralNetworkMapping
>>>
>>> strategy = SequentialStrategy()
>>> results = strategy.execute(masks, StructuralNetworkMapping(...))
Source code in src/lacuna/batch/strategies.py
class SequentialStrategy(BatchStrategy):
    """
    Sequential batch processing for analyses that should not run in parallel.

    Best for analyses with external dependencies that don't benefit from
    parallel execution (e.g., StructuralNetworkMapping with MRtrix3).

    This strategy processes each subject one at a time, regardless of the
    n_jobs parameter. It's designed for analyses where:
    - External tools handle their own internal parallelization (e.g., tckedit -nthreads)
    - Running multiple instances in parallel causes resource contention
    - Memory-mapped files or shared resources would conflict

    Parameters
    ----------
    n_jobs : int, default=-1
        Not used for sequential processing. Kept for interface compatibility.
        The analysis itself may use internal parallelization (e.g., MRtrix3's -nthreads).

    Examples
    --------
    >>> from lacuna.batch.strategies import SequentialStrategy
    >>> from lacuna.analysis import StructuralNetworkMapping
    >>>
    >>> strategy = SequentialStrategy()
    >>> results = strategy.execute(masks, StructuralNetworkMapping(...))
    """

    def __init__(self, n_jobs: int = -1):
        super().__init__(n_jobs)

    def execute(
        self,
        inputs: list[SubjectData],
        analysis: BaseAnalysis,
        progress_callback: Callable[[int], None] | None = None,
    ) -> list[SubjectData]:
        """
        Execute sequential batch processing.

        Processes subjects one at a time. Failures are caught and reported
        as warnings without stopping the entire batch.

        Parameters
        ----------
        inputs : list[SubjectData]
            Subjects to process
        analysis : BaseAnalysis
            Analysis to apply
        progress_callback : callable or None
            Progress reporting function

        Returns
        -------
        list[SubjectData]
            Successfully processed subjects (failures are filtered out)
        """
        results = []
        for i, lesion in enumerate(inputs):
            idx, result = _process_one_subject(lesion, i, analysis)
            results.append((idx, result))
            if progress_callback:
                progress_callback(i)

        # Sort by original index and filter out failures
        results = sorted(results, key=lambda x: x[0])
        successful_results = [r[1] for r in results if r[1] is not None]

        # Warn if any subjects failed
        n_failed = len(inputs) - len(successful_results)
        if n_failed > 0:
            warnings.warn(
                f"{n_failed} out of {len(inputs)} subjects failed processing. "
                "Check warnings above for details.",
                RuntimeWarning,
                stacklevel=2,
            )

        return successful_results

    @property
    def name(self) -> str:
        return "sequential"

execute(inputs, analysis, progress_callback=None)

Execute sequential batch processing.

Processes subjects one at a time. Failures are caught and reported as warnings without stopping the entire batch.

Parameters:

Name Type Description Default
inputs list[SubjectData]

Subjects to process

required
analysis BaseAnalysis

Analysis to apply

required
progress_callback callable or None

Progress reporting function

None

Returns:

Type Description
list[SubjectData]

Successfully processed subjects (failures are filtered out)
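Since failures surface only as warnings, a caller who needs to react to them can record warnings around the call. The sketch below isolates the summary warning from the source on this page in a hypothetical helper, `summarize_failures`, and shows how it could be captured with the standard `warnings` module.

```python
import warnings


def summarize_failures(n_inputs: int, n_successful: int) -> None:
    """Emit the same kind of summary warning that execute() ends with."""
    n_failed = n_inputs - n_successful
    if n_failed > 0:
        warnings.warn(
            f"{n_failed} out of {n_inputs} subjects failed processing. "
            "Check warnings above for details.",
            RuntimeWarning,
            stacklevel=2,
        )


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # make sure nothing is suppressed
    summarize_failures(n_inputs=5, n_successful=3)

# Keep only the RuntimeWarning messages for inspection
messages = [str(w.message) for w in caught if issubclass(w.category, RuntimeWarning)]
```

Comparing `len(results)` with `len(inputs)` after the call is a simpler check when only the number of failures matters.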

Source code in src/lacuna/batch/strategies.py
def execute(
    self,
    inputs: list[SubjectData],
    analysis: BaseAnalysis,
    progress_callback: Callable[[int], None] | None = None,
) -> list[SubjectData]:
    """
    Execute sequential batch processing.

    Processes subjects one at a time. Failures are caught and reported
    as warnings without stopping the entire batch.

    Parameters
    ----------
    inputs : list[SubjectData]
        Subjects to process
    analysis : BaseAnalysis
        Analysis to apply
    progress_callback : callable or None
        Progress reporting function

    Returns
    -------
    list[SubjectData]
        Successfully processed subjects (failures are filtered out)
    """
    results = []
    for i, lesion in enumerate(inputs):
        idx, result = _process_one_subject(lesion, i, analysis)
        results.append((idx, result))
        if progress_callback:
            progress_callback(i)

    # Sort by original index and filter out failures
    results = sorted(results, key=lambda x: x[0])
    successful_results = [r[1] for r in results if r[1] is not None]

    # Warn if any subjects failed
    n_failed = len(inputs) - len(successful_results)
    if n_failed > 0:
        warnings.warn(
            f"{n_failed} out of {len(inputs)} subjects failed processing. "
            "Check warnings above for details.",
            RuntimeWarning,
            stacklevel=2,
        )

    return successful_results

VectorizedStrategy

Bases: BatchStrategy

Vectorized batch processing using batched NumPy operations.

  • Best for matrix-based analyses (FunctionalNetworkMapping)
  • Speedup via optimized BLAS operations and reduced overhead
  • Moderate memory overhead (processes lesions in configurable batches)

This strategy leverages vectorized operations to process multiple lesions simultaneously through each connectome batch. Instead of:

    for lesion in lesions:
        for connectome_batch in batches:
            process(lesion, connectome_batch)

It does:

    for connectome_batch in batches:
        process_all_lesions_together(lesions_batch, connectome_batch)

This dramatically reduces overhead and enables efficient BLAS operations.

The analysis class must implement: run_batch(inputs: list[SubjectData]) -> list[SubjectData]

Parameters:

Name Type Description Default
n_jobs int

Not used for vectorized processing (uses BLAS parallelization instead). Kept for interface compatibility.

-1
lesion_batch_size int or None

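Number of lesions to process together in memory. Useful when processing hundreds of lesions.

  • None: Process all lesions together (maximum speed, high memory)
  • N: Process N lesions at a time (balanced speed/memory)

None
batch_result_callback callable or None

Optional callback invoked with the list of results from each lesion batch as soon as that batch completes (for example, to save results immediately).

None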
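The effect of `lesion_batch_size` can be sketched as plain list chunking. The helper below is a simplified illustration, not the library code: `run_in_chunks` and its parameter names are hypothetical, and a lambda stands in for `analysis.run_batch`.

```python
def run_in_chunks(inputs, run_batch, batch_size=None, on_batch=None):
    """Call run_batch on consecutive slices of inputs and collect the results."""
    if batch_size is None:
        # No batch size: hand everything to run_batch in a single call
        batch_size = max(len(inputs), 1)
    all_results = []
    for start in range(0, len(inputs), batch_size):
        chunk = inputs[start:start + batch_size]  # slicing clamps at the end
        batch_results = run_batch(chunk)
        if on_batch:
            on_batch(batch_results)  # e.g. save this batch immediately
        all_results.extend(batch_results)
    return all_results


saved_sizes = []
out = run_in_chunks(
    list(range(7)),
    run_batch=lambda chunk: [x * 10 for x in chunk],
    batch_size=3,
    on_batch=lambda results: saved_sizes.append(len(results)),
)
```

Seven inputs with a batch size of 3 produce batches of 3, 3, and 1. Peak memory is bounded by the largest batch, while results still come back in input order.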

Examples:

>>> from lacuna.batch.strategies import VectorizedStrategy
>>> from lacuna.analysis import FunctionalNetworkMapping
>>>
>>> # Process all lesions together (fastest)
>>> strategy = VectorizedStrategy()
>>> results = strategy.execute(lesions, FunctionalNetworkMapping(...))
>>>
>>> # Process 50 lesions at a time (memory-constrained)
>>> strategy = VectorizedStrategy(lesion_batch_size=50)
>>> results = strategy.execute(lesions, FunctionalNetworkMapping(...))
Source code in src/lacuna/batch/strategies.py
class VectorizedStrategy(BatchStrategy):
    """
    Vectorized batch processing using batched NumPy operations.

    Best for matrix-based analyses (FunctionalNetworkMapping)
    Speedup via optimized BLAS operations and reduced overhead
    Moderate memory overhead (processes lesions in configurable batches)

    This strategy leverages vectorized operations to process multiple lesions
    simultaneously through each connectome batch. Instead of:
        for lesion in lesions:
            for connectome_batch in batches:
                process(lesion, connectome_batch)

    It does:
        for connectome_batch in batches:
            process_all_lesions_together(lesions_batch, connectome_batch)

    This dramatically reduces overhead and enables efficient BLAS operations.

    The analysis class must implement:
        run_batch(inputs: list[SubjectData]) -> list[SubjectData]

    Parameters
    ----------
    n_jobs : int, default=-1
        Not used for vectorized processing (uses BLAS parallelization instead).
        Kept for interface compatibility.
    lesion_batch_size : int or None, default=None
        Number of lesions to process together in memory.
        - None: Process all lesions together (maximum speed, high memory)
        - N: Process N lesions at a time (balanced speed/memory)
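        Useful when processing hundreds of lesions.
    batch_result_callback : callable or None, default=None
        Optional callback invoked with the list of results from each lesion
        batch as soon as that batch completes (e.g., to save results immediately).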

    Examples
    --------
    >>> from lacuna.batch.strategies import VectorizedStrategy
    >>> from lacuna.analysis import FunctionalNetworkMapping
    >>>
    >>> # Process all lesions together (fastest)
    >>> strategy = VectorizedStrategy()
    >>> results = strategy.execute(lesions, FunctionalNetworkMapping(...))
    >>>
    >>> # Process 50 lesions at a time (memory-constrained)
    >>> strategy = VectorizedStrategy(lesion_batch_size=50)
    >>> results = strategy.execute(lesions, FunctionalNetworkMapping(...))
    """

    def __init__(
        self,
        n_jobs: int = -1,
        lesion_batch_size: int | None = None,
        batch_result_callback: Callable[[list[SubjectData]], None] | None = None,
    ):
        super().__init__(n_jobs)
        self.lesion_batch_size = lesion_batch_size
        self.batch_result_callback = batch_result_callback

    def execute(
        self,
        inputs: list[SubjectData],
        analysis: BaseAnalysis,
        progress_callback: Callable[[int], None] | None = None,
    ) -> list[SubjectData]:
        """
        Execute vectorized batch processing.

        Calls analysis.run_batch() which processes multiple lesions together
        using vectorized operations. Raises NotImplementedError if the
        analysis does not implement run_batch(); there is no sequential fallback.

        Parameters
        ----------
        inputs : list[SubjectData]
            Subjects to process
        analysis : BaseAnalysis
            Analysis to apply (must implement run_batch method)
        progress_callback : callable or None
            Progress reporting function (called once per subject after each
            lesion batch completes)

        Returns
        -------
        list[SubjectData]
            Processed subjects

        Raises
        ------
        NotImplementedError
            If analysis doesn't implement run_batch()
        """
        # Check if analysis supports batch processing
        if not hasattr(analysis, "run_batch") or not callable(analysis.run_batch):
            raise NotImplementedError(
                f"Analysis {analysis.__class__.__name__} does not implement "
                f"run_batch() method required for vectorized strategy. "
                f"Please use parallel strategy instead or implement run_batch()."
            )

        # Process all lesions together if no batch size specified
        if self.lesion_batch_size is None:
            results = analysis.run_batch(inputs)

            # Call batch result callback if provided
            if self.batch_result_callback:
                self.batch_result_callback(results)

            # Update progress
            if progress_callback:
                for i in range(len(results)):
                    progress_callback(i)

            return results

        # Process lesions in batches
        all_results = []
        n_lesions = len(inputs)

        for batch_start in range(0, n_lesions, self.lesion_batch_size):
            batch_end = min(batch_start + self.lesion_batch_size, n_lesions)
            lesion_batch = inputs[batch_start:batch_end]

            # Process this batch
            batch_results = analysis.run_batch(lesion_batch)

            # Call batch result callback if provided (for immediate saving)
            if self.batch_result_callback:
                self.batch_result_callback(batch_results)

            all_results.extend(batch_results)

            # Update progress
            if progress_callback:
                for i in range(batch_start, batch_end):
                    progress_callback(i)

        return all_results

    @property
    def name(self) -> str:
        return "vectorized"

execute(inputs, analysis, progress_callback=None)

Execute vectorized batch processing.

Calls analysis.run_batch() which processes multiple lesions together using vectorized operations. Raises NotImplementedError if the analysis does not implement run_batch(); there is no sequential fallback.

Parameters:

Name Type Description Default
inputs list[SubjectData]

Subjects to process

required
analysis BaseAnalysis

Analysis to apply (must implement run_batch method)

required
progress_callback callable or None

Progress reporting function (called once per subject after each lesion batch completes)

None

Returns:

Type Description
list[SubjectData]

Processed subjects

Raises:

Type Description
NotImplementedError

If analysis doesn't implement run_batch()
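The capability check that guards this method can be sketched in isolation. In the example below, `require_run_batch`, `PerSubjectOnly`, and `Batched` are hypothetical names used only to show which objects pass the check and which trigger the error.

```python
def require_run_batch(analysis):
    """Return analysis.run_batch, or raise if the analysis cannot run in batch."""
    run_batch = getattr(analysis, "run_batch", None)
    if not callable(run_batch):
        raise NotImplementedError(
            f"Analysis {analysis.__class__.__name__} does not implement "
            "run_batch() method required for vectorized strategy."
        )
    return run_batch


class PerSubjectOnly:
    """Hypothetical analysis with a per-subject method only."""

    def run(self, subject):
        return subject


class Batched:
    """Hypothetical analysis that supports batch processing."""

    def run_batch(self, inputs):
        return list(inputs)


batched_output = require_run_batch(Batched())([1, 2])

try:
    require_run_batch(PerSubjectOnly())
    error_message = None
except NotImplementedError as exc:
    error_message = str(exc)
```

Checking up front means an unsupported analysis fails before any data is processed, rather than partway through a batch.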

Source code in src/lacuna/batch/strategies.py
def execute(
    self,
    inputs: list[SubjectData],
    analysis: BaseAnalysis,
    progress_callback: Callable[[int], None] | None = None,
) -> list[SubjectData]:
    """
    Execute vectorized batch processing.

    Calls analysis.run_batch() which processes multiple lesions together
    using vectorized operations. Raises NotImplementedError if the
    analysis does not implement run_batch(); there is no sequential fallback.

    Parameters
    ----------
    inputs : list[SubjectData]
        Subjects to process
    analysis : BaseAnalysis
        Analysis to apply (must implement run_batch method)
    progress_callback : callable or None
        Progress reporting function (called once per subject after each
        lesion batch completes)

    Returns
    -------
    list[SubjectData]
        Processed subjects

    Raises
    ------
    NotImplementedError
        If analysis doesn't implement run_batch()
    """
    # Check if analysis supports batch processing
    if not hasattr(analysis, "run_batch") or not callable(analysis.run_batch):
        raise NotImplementedError(
            f"Analysis {analysis.__class__.__name__} does not implement "
            f"run_batch() method required for vectorized strategy. "
            f"Please use parallel strategy instead or implement run_batch()."
        )

    # Process all lesions together if no batch size specified
    if self.lesion_batch_size is None:
        results = analysis.run_batch(inputs)

        # Call batch result callback if provided
        if self.batch_result_callback:
            self.batch_result_callback(results)

        # Update progress
        if progress_callback:
            for i in range(len(results)):
                progress_callback(i)

        return results

    # Process lesions in batches
    all_results = []
    n_lesions = len(inputs)

    for batch_start in range(0, n_lesions, self.lesion_batch_size):
        batch_end = min(batch_start + self.lesion_batch_size, n_lesions)
        lesion_batch = inputs[batch_start:batch_end]

        # Process this batch
        batch_results = analysis.run_batch(lesion_batch)

        # Call batch result callback if provided (for immediate saving)
        if self.batch_result_callback:
            self.batch_result_callback(batch_results)

        all_results.extend(batch_results)

        # Update progress
        if progress_callback:
            for i in range(batch_start, batch_end):
                progress_callback(i)

    return all_results