Skip to content

io

lacuna.io

Input/Output module for loading and saving lesion data.

Provides functions for:

- Loading lesion masks from NIfTI files
- Loading BIDS datasets
- Exporting results to BIDS derivatives format
- Saving NIfTI files
- Exporting analysis results to CSV/TSV/JSON
- Fetching and caching reference datasets (atlases, templates)
- Converting connectome data to Lacuna HDF5 format
- Downloading and registering connectomes (GSP1000, dTOR985)

BidsError

Bases: LacunaError

Raised when BIDS dataset operations fail.

Source code in src/lacuna/io/bids.py
class BidsError(LacunaError):
    """Error raised when an operation on a BIDS dataset fails."""

ConnectomeSource dataclass

Configuration for a fetchable connectome source.

Source code in src/lacuna/io/downloaders/base.py
@dataclass
class ConnectomeSource:
    """Configuration for a fetchable connectome source.

    Groups everything needed to describe one downloadable connectome:
    its identity (``name``, ``display_name``, ``type``, ``description``),
    where it is hosted (``source_type`` plus the Dataverse- or
    Figshare-specific fields), processing defaults, and descriptive
    metadata. Only the first five fields are required; all others have
    defaults.
    """

    name: str
    """Unique identifier (e.g., 'gsp1000', 'dtor985')."""

    display_name: str
    """Human-readable name (e.g., 'GSP1000 Functional Connectome')."""

    type: Literal["functional", "structural"]
    """Connectome type determining processing pipeline."""

    description: str
    """User-facing description of the connectome."""

    source_type: Literal["dataverse", "figshare", "github"]
    """Download source requiring specific authentication/handling."""

    # Dataverse-specific fields (relevant when source_type == "dataverse")
    persistent_id: str | None = None
    """DOI for Dataverse datasets (e.g., 'doi:10.7910/DVN/ILXIKS')."""

    dataverse_server: str = "https://dataverse.harvard.edu"
    """Dataverse server URL."""

    # Figshare-specific fields (relevant when source_type == "figshare")
    download_url: str | None = None
    """Direct download URL for Figshare files (deprecated, use article_id)."""

    article_id: int | None = None
    """Figshare article ID for API-based downloads."""

    # Processing defaults applied when converting the downloaded data
    default_batches: int = 10
    """Default number of HDF5 batches (functional only)."""

    requires_mask: bool = False
    """Whether brain mask is needed for processing."""

    mask_url: str | None = None
    """URL to download brain mask if required."""

    # Descriptive metadata shown to users / stored alongside the connectome
    n_subjects: int = 0
    """Number of subjects in the connectome."""

    space: str = "MNI152NLin6Asym"
    """Coordinate space."""

    estimated_size_gb: float = 0.0
    """Estimated download size in GB for user information."""

    citation: str = ""
    """Citation text for this connectome dataset."""

article_id = None class-attribute instance-attribute

Figshare article ID for API-based downloads.

citation = '' class-attribute instance-attribute

Citation text for this connectome dataset.

dataverse_server = 'https://dataverse.harvard.edu' class-attribute instance-attribute

Dataverse server URL.

default_batches = 10 class-attribute instance-attribute

Default number of HDF5 batches (functional only).

description instance-attribute

User-facing description of the connectome.

display_name instance-attribute

Human-readable name (e.g., 'GSP1000 Functional Connectome').

download_url = None class-attribute instance-attribute

Direct download URL for Figshare files (deprecated, use article_id).

estimated_size_gb = 0.0 class-attribute instance-attribute

Estimated download size in GB for user information.

mask_url = None class-attribute instance-attribute

URL to download brain mask if required.

n_subjects = 0 class-attribute instance-attribute

Number of subjects in the connectome.

name instance-attribute

Unique identifier (e.g., 'gsp1000', 'dtor985').

persistent_id = None class-attribute instance-attribute

DOI for Dataverse datasets (e.g., 'doi:10.7910/DVN/ILXIKS').

requires_mask = False class-attribute instance-attribute

Whether brain mask is needed for processing.

source_type instance-attribute

Download source requiring specific authentication/handling.

space = 'MNI152NLin6Asym' class-attribute instance-attribute

Coordinate space.

type instance-attribute

Connectome type determining processing pipeline.

FetchConfig dataclass

Configuration for a connectome fetch operation.

Source code in src/lacuna/io/downloaders/base.py
@dataclass
class FetchConfig:
    """Configuration for a connectome fetch operation.

    Bundles the target connectome, output location, authentication,
    processing, registration and overwrite/resume options for one fetch.
    Use :meth:`from_cli_args` to build an instance from parsed CLI
    arguments and :meth:`get_api_key` to resolve the effective API key.
    """

    connectome: str
    """Connectome name to fetch (e.g., 'gsp1000', 'dtor985')."""

    output_dir: Path
    """Directory for processed output files."""

    # Authentication
    api_key: str | None = None
    """Dataverse API key (for GSP1000). Can also use DATAVERSE_API_KEY env var."""

    # Processing options
    batches: int = 10
    """Number of HDF5 batch files for functional connectomes."""

    keep_original: bool = True
    """Keep original downloaded files after processing."""

    # Registration
    register: bool = True
    """Automatically register connectome after processing."""

    register_name: str | None = None
    """Custom name for registration. Defaults to source name (e.g., 'GSP1000')."""

    # Behavior
    force: bool = False
    """Overwrite existing files and registrations."""

    resume: bool = True
    """Resume interrupted downloads."""

    @classmethod
    def from_cli_args(cls, args: argparse.Namespace) -> FetchConfig:
        """Create config from CLI arguments.

        Parameters
        ----------
        args : argparse.Namespace
            Parsed CLI arguments. Any of the expected attributes may be
            missing or set to ``None``.

        Returns
        -------
        FetchConfig
            Config populated from ``args``. An attribute that is absent
            *or* ``None`` falls back to the field default, because argparse
            stores ``None`` for options the user did not supply and a plain
            ``getattr`` default would never apply in that case.
        """

        def option(name: str, default):
            # Treat "attribute missing" and "attribute is None" identically.
            value = getattr(args, name, None)
            return default if value is None else value

        return cls(
            connectome=option("connectome", ""),
            output_dir=Path(option("output_dir", ".")),
            api_key=option("api_key", None),
            batches=option("batches", 10),
            # The CLI exposes negative flags; invert them for the config.
            keep_original=not option("no_keep_original", False),
            register=not option("no_register", False),
            register_name=option("register_name", None),
            force=option("force", False),
            resume=option("resume", True),
        )

    def get_api_key(self) -> str | None:
        """Resolve the API key to use.

        Returns
        -------
        str or None
            The first non-empty value among: the ``api_key`` field, the
            ``DATAVERSE_API_KEY`` environment variable, and the value
            returned by ``_load_config_file_key()``.
        """
        return (
            self.api_key
            or os.environ.get("DATAVERSE_API_KEY")
            or _load_config_file_key()
        )

api_key = None class-attribute instance-attribute

Dataverse API key (for GSP1000). Can also use DATAVERSE_API_KEY env var.

batches = 10 class-attribute instance-attribute

Number of HDF5 batch files for functional connectomes.

connectome instance-attribute

Connectome name to fetch (e.g., 'gsp1000', 'dtor985').

force = False class-attribute instance-attribute

Overwrite existing files and registrations.

keep_original = True class-attribute instance-attribute

Keep original downloaded files after processing.

output_dir instance-attribute

Directory for processed output files.

register = True class-attribute instance-attribute

Automatically register connectome after processing.

register_name = None class-attribute instance-attribute

Custom name for registration. Defaults to source name (e.g., 'GSP1000').

resume = True class-attribute instance-attribute

Resume interrupted downloads.

from_cli_args(args) classmethod

Create config from CLI arguments.

Source code in src/lacuna/io/downloaders/base.py
@classmethod
def from_cli_args(cls, args: argparse.Namespace) -> FetchConfig:
    """Create config from CLI arguments.

    Parameters
    ----------
    args : argparse.Namespace
        Parsed CLI arguments. Any of the expected attributes may be
        missing or set to ``None``.

    Returns
    -------
    FetchConfig
        Config populated from ``args``. An attribute that is absent *or*
        ``None`` falls back to the field default, because argparse stores
        ``None`` for options the user did not supply and a plain
        ``getattr`` default would never apply in that case.
    """

    def option(name: str, default):
        # Treat "attribute missing" and "attribute is None" identically.
        value = getattr(args, name, None)
        return default if value is None else value

    return cls(
        connectome=option("connectome", ""),
        output_dir=Path(option("output_dir", ".")),
        api_key=option("api_key", None),
        batches=option("batches", 10),
        # The CLI exposes negative flags; invert them for the config.
        keep_original=not option("no_keep_original", False),
        register=not option("no_register", False),
        register_name=option("register_name", None),
        force=option("force", False),
        resume=option("resume", True),
    )

get_api_key()

Get the API key from this config object, the DATAVERSE_API_KEY environment variable, or the config file (checked in that order).

Source code in src/lacuna/io/downloaders/base.py
def get_api_key(self) -> str | None:
    """Resolve the API key to use.

    Returns the first non-empty value among: the ``api_key`` field, the
    ``DATAVERSE_API_KEY`` environment variable, and the value returned by
    ``_load_config_file_key()``.
    """
    # ``or`` short-circuits, so later sources are only consulted when the
    # earlier ones are empty or unset.
    return (
        self.api_key
        or os.environ.get("DATAVERSE_API_KEY")
        or _load_config_file_key()
    )

FetchProgress dataclass

Progress information for fetch operations.

Source code in src/lacuna/io/downloaders/base.py
@dataclass
class FetchProgress:
    """Snapshot of how far a fetch operation has progressed.

    Tracks file-level progress across the whole operation and byte-level
    progress of the download currently in flight.
    """

    phase: Literal["download", "processing", "registration"]
    """Current operation phase."""

    current_file: str
    """Name of file currently being processed."""

    files_completed: int
    """Number of files completed."""

    files_total: int
    """Total number of files to process."""

    bytes_transferred: int = 0
    """Bytes transferred in current download."""

    bytes_total: int = 0
    """Total bytes for current download."""

    message: str = ""
    """Human-readable status message."""

    @property
    def percent_complete(self) -> float:
        """Share of files completed, as a percentage (0.0 when no files are expected)."""
        total = self.files_total
        return 0.0 if total == 0 else (self.files_completed / total) * 100

    @property
    def download_percent(self) -> float:
        """Share of the current download transferred, as a percentage (0.0 when size is 0)."""
        total = self.bytes_total
        return 0.0 if total == 0 else (self.bytes_transferred / total) * 100

bytes_total = 0 class-attribute instance-attribute

Total bytes for current download.

bytes_transferred = 0 class-attribute instance-attribute

Bytes transferred in current download.

current_file instance-attribute

Name of file currently being processed.

download_percent property

Current file download percentage.

files_completed instance-attribute

Number of files completed.

files_total instance-attribute

Total number of files to process.

message = '' class-attribute instance-attribute

Human-readable status message.

percent_complete property

Overall percentage completion.

phase instance-attribute

Current operation phase.

FetchResult dataclass

Result of a connectome fetch operation.

Source code in src/lacuna/io/downloaders/base.py
@dataclass
class FetchResult:
    """Outcome of a connectome fetch operation.

    Records whether the fetch succeeded, what was produced, whether it was
    registered, timing information, and any warnings or error message.
    """

    success: bool
    """Whether the operation completed successfully."""

    connectome_name: str
    """Name of the fetched connectome."""

    output_dir: Path
    """Directory containing processed files."""

    output_files: list[Path] = field(default_factory=list)
    """List of created output files."""

    registered: bool = False
    """Whether the connectome was registered."""

    register_name: str | None = None
    """Name used for registration, or None if not registered."""

    duration_seconds: float = 0.0
    """Total operation time in seconds."""

    download_time_seconds: float = 0.0
    """Time spent downloading."""

    processing_time_seconds: float = 0.0
    """Time spent processing."""

    warnings: list[str] = field(default_factory=list)
    """Non-fatal warnings encountered."""

    error: str | None = None
    """Error message if success=False."""

    def summary(self) -> str:
        """Return a short human-readable report of the fetch outcome.

        A failed fetch yields a single line with the error; a successful
        one yields a multi-line overview of output, file count,
        registration name and timings.
        """
        # Handle the failure case first so the success report is unnested.
        if not self.success:
            return f"❌ Failed to fetch {self.connectome_name}: {self.error}"

        return (
            f"✅ Successfully fetched {self.connectome_name}\n"
            f"   Output: {self.output_dir}\n"
            f"   Files: {len(self.output_files)}\n"
            f"   Registered as: {self.register_name or 'not registered'}\n"
            f"   Time: {self.download_time_seconds:.1f}s download, "
            f"{self.processing_time_seconds:.1f}s processing"
        )

connectome_name instance-attribute

Name of the fetched connectome.

download_time_seconds = 0.0 class-attribute instance-attribute

Time spent downloading.

duration_seconds = 0.0 class-attribute instance-attribute

Total operation time in seconds.

error = None class-attribute instance-attribute

Error message if success=False.

output_dir instance-attribute

Directory containing processed files.

output_files = field(default_factory=list) class-attribute instance-attribute

List of created output files.

processing_time_seconds = 0.0 class-attribute instance-attribute

Time spent processing.

register_name = None class-attribute instance-attribute

Name used for registration, or None if not registered.

registered = False class-attribute instance-attribute

Whether the connectome was registered.

success instance-attribute

Whether the operation completed successfully.

warnings = field(default_factory=list) class-attribute instance-attribute

Non-fatal warnings encountered.

summary()

Generate human-readable summary.

Source code in src/lacuna/io/downloaders/base.py
def summary(self) -> str:
    """Return a short human-readable report of the fetch outcome.

    A failed fetch yields a single line with the error; a successful one
    yields a multi-line overview of output, file count, registration name
    and timings.
    """
    # Handle the failure case first so the success report is unnested.
    if not self.success:
        return f"❌ Failed to fetch {self.connectome_name}: {self.error}"

    return (
        f"✅ Successfully fetched {self.connectome_name}\n"
        f"   Output: {self.output_dir}\n"
        f"   Files: {len(self.output_files)}\n"
        f"   Registered as: {self.register_name or 'not registered'}\n"
        f"   Time: {self.download_time_seconds:.1f}s download, "
        f"{self.processing_time_seconds:.1f}s processing"
    )

batch_export_to_csv(mask_data_list, output_path, analysis_name=None, include_metadata=True)

Export results from multiple SubjectData objects to a single CSV.

Combines results from multiple subjects into one CSV file with each row representing one subject. Ideal for group-level statistical analysis.

Parameters:

Name Type Description Default
mask_data_list list[SubjectData]

List of SubjectData objects (typically from batch processing)

required
output_path str or Path

Output CSV file path

required
analysis_name str

Specific analysis to export. If None, exports all results.

None
include_metadata bool

Include subject metadata as columns

True

Returns:

Type Description
Path

Path to created CSV file

Raises:

Type Description
ValueError

If list is empty or subjects have no results

Examples:

>>> from lacuna.io import load_bids_dataset, batch_export_to_csv
>>> from lacuna.analysis import RegionalDamage
>>>
>>> # Load multiple subjects
>>> dataset = load_bids_dataset("bids_dir")
>>> analysis = RegionalDamage()
>>>
>>> # Run analysis on all subjects
>>> results = [analysis.run(lesion) for lesion in dataset.values()]
>>>
>>> # Export to single CSV for group analysis
>>> batch_export_to_csv(results, "group_results.csv")
Notes
  • All subjects must have the same analysis results structure
  • Missing values are filled with NaN
  • Each row represents one subject
  • Columns are shared across all subjects
Source code in src/lacuna/io/export.py
def batch_export_to_csv(
    mask_data_list: list[SubjectData],
    output_path: str | Path,
    analysis_name: str | None = None,
    include_metadata: bool = True,
) -> Path:
    """
    Export results from multiple SubjectData objects to a single CSV.

    Combines results from multiple subjects into one CSV file with each
    row representing one subject. Ideal for group-level statistical analysis.

    Parameters
    ----------
    mask_data_list : list[SubjectData]
        List of SubjectData objects (typically from batch processing)
    output_path : str or Path
        Output CSV file path. Missing parent directories are created.
    analysis_name : str, optional
        Specific analysis to export. If None, exports all results.
    include_metadata : bool, default=True
        Include subject metadata as columns

    Returns
    -------
    Path
        Path to created CSV file

    Raises
    ------
    ValueError
        If list is empty or subjects have no results

    Examples
    --------
    >>> from lacuna.io import load_bids_dataset, batch_export_to_csv
    >>> from lacuna.analysis import RegionalDamage
    >>>
    >>> # Load multiple subjects
    >>> dataset = load_bids_dataset("bids_dir")
    >>> analysis = RegionalDamage()
    >>>
    >>> # Run analysis on all subjects
    >>> results = [analysis.run(lesion) for lesion in dataset.values()]
    >>>
    >>> # Export to single CSV for group analysis
    >>> batch_export_to_csv(results, "group_results.csv")

    Notes
    -----
    - Subjects without results (or without ``analysis_name``, when given)
      are skipped
    - Dict results are flattened to ``"<analysis>.<key>"`` columns;
      single-element lists/tuples are unwrapped to their only value
    - Values missing for a subject are NaN in the table
    - Each row represents one subject
    - Columns are shared across all subjects
    """
    if not mask_data_list:
        raise ValueError("mask_data_list is empty")

    output_path = Path(output_path)

    # Collect one flat row per exportable subject
    rows = []
    for mask_data in mask_data_list:
        if not mask_data.results:
            continue  # Skip subjects without results

        row_data = {}

        # Add metadata if requested
        if include_metadata:
            row_data["subject_id"] = mask_data.metadata.get("subject_id", "unknown")
            row_data["session_id"] = mask_data.metadata.get("session_id", "")
            row_data["coordinate_space"] = mask_data.get_coordinate_space()

        # Filter by analysis name
        if analysis_name:
            if analysis_name not in mask_data.results:
                continue  # Skip subjects without this analysis
            results_to_export = {analysis_name: mask_data.results[analysis_name]}
        else:
            results_to_export = mask_data.results

        # Flatten results
        for analysis, results_dict in results_to_export.items():
            if not isinstance(results_dict, dict):
                row_data[analysis] = results_dict
                continue
            for key, value in results_dict.items():
                is_singleton = isinstance(value, (list, tuple)) and len(value) == 1
                row_data[f"{analysis}.{key}"] = value[0] if is_singleton else value

        rows.append(row_data)

    if not rows:
        raise ValueError("No results to export. Ensure subjects have analysis results.")

    # Create the target directory so nested output paths do not fail
    # (mirrors export_provenance_to_json).
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Create DataFrame and save
    pd.DataFrame(rows).to_csv(output_path, index=False)

    return output_path

batch_export_to_tsv(mask_data_list, output_path, analysis_name=None, include_metadata=True)

Export results from multiple SubjectData objects to a single TSV.

Identical to batch_export_to_csv but uses tab delimiter. TSV is preferred in neuroimaging for BIDS compatibility.

Parameters:

Name Type Description Default
mask_data_list list[SubjectData]

List of SubjectData objects

required
output_path str or Path

Output TSV file path

required
analysis_name str

Specific analysis to export

None
include_metadata bool

Include subject metadata as columns

True

Returns:

Type Description
Path

Path to created TSV file

Raises:

Type Description
ValueError

If list is empty or subjects have no results

Examples:

>>> from lacuna.io import batch_export_to_tsv
>>>
>>> # Export group results to BIDS-compatible TSV
>>> batch_export_to_tsv(results, "group_results.tsv")
See Also

batch_export_to_csv : CSV batch export

Source code in src/lacuna/io/export.py
def batch_export_to_tsv(
    mask_data_list: list[SubjectData],
    output_path: str | Path,
    analysis_name: str | None = None,
    include_metadata: bool = True,
) -> Path:
    """
    Export results from multiple SubjectData objects to a single TSV.

    Identical to batch_export_to_csv but uses tab delimiter.
    TSV is preferred in neuroimaging for BIDS compatibility.

    Parameters
    ----------
    mask_data_list : list[SubjectData]
        List of SubjectData objects
    output_path : str or Path
        Output TSV file path. Missing parent directories are created.
    analysis_name : str, optional
        Specific analysis to export. If None, exports all results.
    include_metadata : bool, default=True
        Include subject metadata as columns

    Returns
    -------
    Path
        Path to created TSV file

    Raises
    ------
    ValueError
        If list is empty or subjects have no results

    Examples
    --------
    >>> from lacuna.io import batch_export_to_tsv
    >>>
    >>> # Export group results to BIDS-compatible TSV
    >>> batch_export_to_tsv(results, "group_results.tsv")

    See Also
    --------
    batch_export_to_csv : CSV batch export
    """
    if not mask_data_list:
        raise ValueError("mask_data_list is empty")

    output_path = Path(output_path)

    # Collect one flat row per exportable subject (same rules as the CSV export)
    rows = []
    for mask_data in mask_data_list:
        if not mask_data.results:
            continue  # Skip subjects without results

        row_data = {}

        if include_metadata:
            row_data["subject_id"] = mask_data.metadata.get("subject_id", "unknown")
            row_data["session_id"] = mask_data.metadata.get("session_id", "")
            row_data["coordinate_space"] = mask_data.get_coordinate_space()

        if analysis_name:
            if analysis_name not in mask_data.results:
                continue  # Skip subjects without this analysis
            results_to_export = {analysis_name: mask_data.results[analysis_name]}
        else:
            results_to_export = mask_data.results

        # Dict results become "<analysis>.<key>" columns; single-element
        # lists/tuples are unwrapped to their only value.
        for analysis, results_dict in results_to_export.items():
            if not isinstance(results_dict, dict):
                row_data[analysis] = results_dict
                continue
            for key, value in results_dict.items():
                is_singleton = isinstance(value, (list, tuple)) and len(value) == 1
                row_data[f"{analysis}.{key}"] = value[0] if is_singleton else value

        rows.append(row_data)

    if not rows:
        raise ValueError("No results to export. Ensure subjects have analysis results.")

    # Create the target directory so nested output paths do not fail
    # (mirrors export_provenance_to_json).
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Create DataFrame and save with tab delimiter
    pd.DataFrame(rows).to_csv(output_path, sep="\t", index=False)

    return output_path

export_bids_derivatives(subject_data, output_dir, export_lesion_mask=True, export_voxelmaps=True, export_parcel_data=True, export_connectivity=True, export_scalars=True, export_provenance=True, overwrite=False)

Export SubjectData and all its analysis results to BIDS derivatives format.

Exports the full spectrum of results stored in a SubjectData object:

- Lesion mask as NIfTI
- VoxelMaps (correlation maps, disconnection maps, etc.) as NIfTI
- ParcelData (regional values) as TSV
- ConnectivityMatrix as TSV
- ScalarMetric and other scalars as JSON
- Processing provenance as JSON

Parameters:

Name Type Description Default
subject_data SubjectData

Processed lesion data with analysis results.

required
output_dir str or Path

Root directory for derivatives (e.g., 'derivatives/lacuna-v0.1.0').

required
export_lesion_mask bool

Save the original lesion mask as NIfTI file.

True
export_voxelmaps bool

Save VoxelMap results (e.g., correlation maps, z-maps) as NIfTI files.

True
export_parcel_data bool

Save ParcelData results (regional aggregations) as TSV files.

True
export_connectivity bool

Save ConnectivityMatrix results as TSV files.

True
export_scalars bool

Save ScalarMetric and other scalar results as JSON files.

True
export_provenance bool

Save processing provenance as JSON.

True
overwrite bool

Overwrite existing files.

False

Returns:

Type Description
Path

Path to created subject derivatives directory.

Raises:

Type Description
FileExistsError

If output files exist and overwrite=False.

ValueError

If subject_data has no subject_id in metadata.

Examples:

>>> # Export all results
>>> output_path = export_bids_derivatives(
...     subject_data,
...     'derivatives/lacuna-v0.1.0'
... )
>>> print(f"Derivatives saved to: {output_path}")
>>>
>>> # Export only VoxelMaps (NIfTI files)
>>> export_bids_derivatives(
...     subject_data,
...     'derivatives/lacuna-v0.1.0',
...     export_lesion_mask=False,
...     export_parcel_data=False,
...     export_connectivity=False,
...     export_scalars=False,
...     export_provenance=False
... )
Source code in src/lacuna/io/bids.py
def export_bids_derivatives(
    subject_data: SubjectData,
    output_dir: str | Path,
    export_lesion_mask: bool = True,
    export_voxelmaps: bool = True,
    export_parcel_data: bool = True,
    export_connectivity: bool = True,
    export_scalars: bool = True,
    export_provenance: bool = True,
    overwrite: bool = False,
) -> Path:
    """
    Export SubjectData and all its analysis results to BIDS derivatives format.

    Exports the full spectrum of results stored in a SubjectData object:
    - Lesion mask as NIfTI
    - VoxelMaps (correlation maps, disconnection maps, etc.) as NIfTI
    - ParcelData (regional values) as TSV
    - ConnectivityMatrix as TSV
    - ScalarMetric and other scalars as JSON
    - Processing provenance as JSON

    Parameters
    ----------
    subject_data : SubjectData
        Processed lesion data with analysis results.
    output_dir : str or Path
        Root directory for derivatives (e.g., 'derivatives/lacuna-v0.1.0').
    export_lesion_mask : bool, default=True
        Save the original lesion mask as NIfTI file.
    export_voxelmaps : bool, default=True
        Save VoxelMap results (e.g., correlation maps, z-maps) as NIfTI files.
    export_parcel_data : bool, default=True
        Save ParcelData results (regional aggregations) as TSV files.
    export_connectivity : bool, default=True
        Save ConnectivityMatrix results as TSV files.
    export_scalars : bool, default=True
        Save ScalarMetric and other scalar results as JSON files.
    export_provenance : bool, default=True
        Save processing provenance as JSON.
    overwrite : bool, default=False
        Overwrite existing files.

    Returns
    -------
    Path
        Path to created subject derivatives directory.

    Raises
    ------
    FileExistsError
        If the lesion mask or provenance file exists and overwrite=False.
    ValueError
        If subject_data has no subject_id in metadata.

    Notes
    -----
    - A result is routed by its type only. Disabling one of the typed
      exports (VoxelMap, ParcelData, ConnectivityMatrix) skips those
      results entirely; they are never written as scalar JSON instead.
    - Existing scalar JSON and tractogram files are skipped silently when
      overwrite=False rather than raising.
    - Tractogram results have no dedicated flag and are always exported.
    - Results that cannot be serialised to JSON are skipped without
      touching any file on disk.
    - ``dataset_description.json`` is written to ``output_dir`` only if it
      does not exist yet.

    Examples
    --------
    >>> # Export all results
    >>> output_path = export_bids_derivatives(
    ...     subject_data,
    ...     'derivatives/lacuna-v0.1.0'
    ... )
    >>> print(f"Derivatives saved to: {output_path}")
    >>>
    >>> # Export only VoxelMaps (NIfTI files)
    >>> export_bids_derivatives(
    ...     subject_data,
    ...     'derivatives/lacuna-v0.1.0',
    ...     export_lesion_mask=False,
    ...     export_parcel_data=False,
    ...     export_connectivity=False,
    ...     export_scalars=False,
    ...     export_provenance=False
    ... )
    """
    import nibabel as nib

    from ..core.data_types import (
        ConnectivityMatrix,
        ScalarMetric,
        Tractogram,
        VoxelMap,
    )
    from ..core.data_types import (
        ParcelData as ParcelDataType,
    )

    output_dir = Path(output_dir)

    # Validate metadata
    if "subject_id" not in subject_data.metadata:
        raise ValueError("SubjectData metadata must contain 'subject_id' for BIDS export")

    subject_id = subject_data.metadata["subject_id"]
    session_id = subject_data.metadata.get("session_id")

    # Determine base filename
    if session_id:
        base_name = f"{subject_id}_{session_id}"
    else:
        base_name = subject_id

    # Create subject directory
    subject_dir = output_dir / subject_id
    if session_id:
        subject_dir = subject_dir / session_id

    # Create dataset_description.json if it doesn't exist
    desc_file = output_dir / "dataset_description.json"
    if not desc_file.exists():
        desc_file.parent.mkdir(parents=True, exist_ok=True)
        from .. import __version__

        dataset_description = {
            "Name": "Lacuna Derivatives",
            "BIDSVersion": "1.6.0",
            "GeneratedBy": [
                {
                    "Name": "lacuna",
                    "Version": __version__,
                    "Description": "Lesion network mapping and analysis toolkit",
                }
            ],
        }
        with open(desc_file, "w", encoding="utf-8") as f:
            json.dump(dataset_description, f, indent=2)

    # Create anat/ directory for all derivatives (BIDS compliant)
    # All lesion-derived outputs go in anat/ per BIDS derivatives spec
    anat_dir = subject_dir / "anat"
    anat_dir.mkdir(parents=True, exist_ok=True)

    # Save lesion mask - use label entity per BIDS spec
    # Preserve original label from metadata if available (e.g., WMH, acuteinfarct, lacune)
    label = subject_data.metadata.get("label", "lesion")
    if export_lesion_mask:
        coord_space = subject_data.get_coordinate_space()
        mask_bf = BidsFilename(space=coord_space, suffix="mask")
        lesion_filename = f"{base_name}_label-{label}_{mask_bf}.nii.gz"
        lesion_path = anat_dir / lesion_filename

        if lesion_path.exists() and not overwrite:
            raise FileExistsError(
                f"Lesion mask already exists: {lesion_path}. Use overwrite=True to replace."
            )

        nib.save(subject_data.mask_img, lesion_path)

    # Optional "_label-<label>" filename part shared by tractogram and scalar outputs
    label_part = f"_label-{label}" if label else ""

    # Save analysis results
    if subject_data.results:
        for namespace, results_data in subject_data.results.items():
            if not isinstance(results_data, dict):
                continue

            for key, value in results_data.items():
                # Dispatch on the result TYPE first and consult the export
                # flag inside the branch. Combining both in one condition
                # would let a disabled VoxelMap/ParcelData/ConnectivityMatrix
                # fall through to the scalar branch and be dumped as JSON.

                # VoxelMap -> NIfTI (goes to anat/ for spatial data)
                if isinstance(value, VoxelMap):
                    if not export_voxelmaps:
                        continue
                    bf = BidsFilename.from_result_key(key, "map", namespace=namespace)
                    if value.space:
                        bf.space = value.space
                    export_voxelmap(
                        value,
                        anat_dir,
                        subject_id=subject_id,
                        session_id=session_id,
                        desc=str(bf),
                        label=label,
                        overwrite=overwrite,
                    )

                # ParcelData -> TSV (goes to anat/ for BIDS compliance)
                elif isinstance(value, ParcelDataType):
                    if not export_parcel_data:
                        continue
                    bf = BidsFilename.from_result_key(key, "values", namespace=namespace)
                    _export_parcel_data(
                        value,
                        anat_dir,
                        subject_id=subject_id,
                        session_id=session_id,
                        desc=str(bf),
                        label=label,
                        overwrite=overwrite,
                    )

                # ConnectivityMatrix -> TSV (goes to anat/ for BIDS compliance)
                elif isinstance(value, ConnectivityMatrix):
                    if not export_connectivity:
                        continue
                    bf = BidsFilename.from_result_key(key, "connmatrix", namespace=namespace)
                    export_connectivity_matrix(
                        value,
                        anat_dir,
                        subject_id=subject_id,
                        session_id=session_id,
                        desc=str(bf),
                        label=label,
                        overwrite=overwrite,
                    )

                # Tractogram -> .tck file (goes to anat/ for BIDS compliance)
                elif isinstance(value, Tractogram):
                    bf = BidsFilename.from_result_key(key, "tractogram", namespace=namespace)
                    suffix = value.tractogram_path.suffix or ".tck"
                    tck_path = anat_dir / f"{base_name}{label_part}_{bf}{suffix}"

                    if not tck_path.exists() or overwrite:
                        try:
                            value.save(tck_path)
                        except FileNotFoundError:
                            # Best effort: source file no longer exists and
                            # there is no in-memory data to write.
                            pass

                # ScalarMetric or other serializable -> JSON (goes to anat/ for BIDS compliance)
                elif export_scalars:
                    if isinstance(value, ScalarMetric):
                        data_to_save = value.get_data()
                    else:
                        data_to_save = value

                    # Serialise BEFORE touching the filesystem so that a
                    # non-serialisable result never leaves a truncated file
                    # behind or clobbers an existing one.
                    try:
                        bf = BidsFilename.from_result_key(key, "metrics", namespace=namespace)
                        bids_key = str(bf)
                        payload = json.dumps(data_to_save, indent=2, default=str)
                    except (TypeError, ValueError):
                        # Skip non-serializable results
                        continue

                    results_path = anat_dir / f"{base_name}{label_part}_{bids_key}.json"
                    if results_path.exists() and not overwrite:
                        continue

                    results_path.write_text(payload, encoding="utf-8")

    # Save provenance (goes to anat/ for BIDS compliance)
    if export_provenance and subject_data.provenance:
        prov_filename = f"{base_name}_desc-provenance.json"
        prov_path = anat_dir / prov_filename

        if prov_path.exists() and not overwrite:
            raise FileExistsError(
                f"Provenance file already exists: {prov_path}. Use overwrite=True to replace."
            )

        # Convert provenance to serializable format
        prov_data = []
        for step in subject_data.provenance:
            if hasattr(step, "to_dict"):
                prov_data.append(step.to_dict())
            elif isinstance(step, dict):
                prov_data.append(step)
            else:
                prov_data.append(str(step))

        with open(prov_path, "w", encoding="utf-8") as f:
            json.dump(prov_data, f, indent=2, default=str)

    return subject_dir

export_provenance_to_json(mask_data, output_path, indent=2)

Export provenance data to JSON format.

Saves the complete processing history and metadata as a standalone JSON file for reproducibility and audit trails.

Parameters:

Name Type Description Default
mask_data SubjectData

SubjectData object with provenance data

required
output_path str or Path

Output JSON file path

required
indent int

JSON indentation for readability (0 for compact)

2

Returns:

Type Description
Path

Path to created JSON file

Raises:

Type Description
ValueError

If mask_data has no provenance data

Examples:

>>> from lacuna.io import export_provenance_to_json
>>>
>>> # Export provenance history
>>> export_provenance_to_json(result, "provenance.json")
>>>
>>> # Export compact JSON
>>> export_provenance_to_json(result, "prov.json", indent=0)
Notes

Provenance includes:

- Source file paths
- Processing steps (transformations, analyses)
- Software versions
- Timestamps
- Parameters used for each operation

Source code in src/lacuna/io/export.py
def export_provenance_to_json(
    mask_data: SubjectData,
    output_path: str | Path,
    indent: int = 2,
) -> Path:
    """
    Export provenance data to JSON format.

    Saves the complete processing history and metadata as a standalone
    JSON file for reproducibility and audit trails.

    Parameters
    ----------
    mask_data : SubjectData
        SubjectData object with provenance data
    output_path : str or Path
        Output JSON file path. Missing parent directories are created.
    indent : int, default=2
        JSON indentation for readability (0 or a negative value for compact)

    Returns
    -------
    Path
        Path to created JSON file

    Raises
    ------
    ValueError
        If mask_data has no provenance data

    Examples
    --------
    >>> from lacuna.io import export_provenance_to_json
    >>>
    >>> # Export provenance history
    >>> export_provenance_to_json(result, "provenance.json")
    >>>
    >>> # Export compact JSON
    >>> export_provenance_to_json(result, "prov.json", indent=0)

    Notes
    -----
    Provenance includes:
    - Source file paths
    - Processing steps (transformations, analyses)
    - Software versions
    - Timestamps
    - Parameters used for each operation

    Values that JSON cannot encode natively are converted the same way the
    BIDS derivatives exporter converts provenance steps: objects exposing a
    ``to_dict()`` method are exported as that dictionary, anything else
    falls back to ``str(value)``. Provenance that is already JSON-native is
    written unchanged.
    """
    output_path = Path(output_path)

    if not mask_data.provenance:
        raise ValueError(
            "SubjectData has no provenance data to export.\n"
            "Provenance is automatically tracked during analysis operations."
        )

    def _to_jsonable(value):
        # Only invoked by json for objects it cannot encode on its own.
        to_dict = getattr(value, "to_dict", None)
        if callable(to_dict):
            return to_dict()
        return str(value)

    # Serialize before touching the filesystem so that a serialization
    # failure cannot leave a truncated file behind.
    payload = json.dumps(
        mask_data.provenance,
        indent=indent if indent > 0 else None,
        default=_to_jsonable,
    )

    # Ensure parent directory exists
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(payload, encoding="utf-8")

    return output_path

export_results_to_csv(mask_data, output_path, analysis_name=None, include_metadata=True)

Export analysis results to CSV format.

Converts nested results dictionary to a flat CSV structure suitable for statistical analysis or visualization in external tools.

Parameters:

Name Type Description Default
mask_data SubjectData

SubjectData object with analysis results

required
output_path str or Path

Output CSV file path

required
analysis_name str

Specific analysis to export. If None, exports all results. Example: "RegionalDamage", "ParcelAggregation"

None
include_metadata bool

Include subject metadata (subject_id, session_id, etc.) as columns

True

Returns:

Type Description
Path

Path to created CSV file

Raises:

Type Description
ValueError

If mask_data has no results or specified analysis not found

Examples:

>>> from lacuna import SubjectData
>>> from lacuna.analysis import RegionalDamage
>>> from lacuna.io import export_results_to_csv
>>>
>>> lesion = SubjectData.from_nifti("lesion.nii.gz")
>>> analysis = RegionalDamage()
>>> result = analysis.run(lesion)
>>>
>>> # Export all results
>>> export_results_to_csv(result, "results.csv")
>>>
>>> # Export specific analysis
>>> export_results_to_csv(result, "damage.csv", analysis_name="RegionalDamage")
Notes
  • Results are flattened: nested dicts become columns with dot notation
  • Example: {"ParcelAggregation": {"region1": 0.5}} becomes the column "ParcelAggregation.region1" with value 0.5
  • Multiple analyses create multiple columns
  • Metadata columns (if included): subject_id, session_id, coordinate_space
Source code in src/lacuna/io/export.py
def export_results_to_csv(
    mask_data: SubjectData,
    output_path: str | Path,
    analysis_name: str | None = None,
    include_metadata: bool = True,
) -> Path:
    """
    Export analysis results to CSV format.

    Converts nested results dictionary to a flat CSV structure suitable
    for statistical analysis or visualization in external tools.

    Parameters
    ----------
    mask_data : SubjectData
        SubjectData object with analysis results
    output_path : str or Path
        Output CSV file path. Missing parent directories are created.
    analysis_name : str, optional
        Specific analysis to export. If None, exports all results.
        Example: "RegionalDamage", "ParcelAggregation"
    include_metadata : bool, default=True
        Include subject metadata (subject_id, session_id, etc.) as columns

    Returns
    -------
    Path
        Path to created CSV file

    Raises
    ------
    ValueError
        If mask_data has no results or specified analysis not found

    Examples
    --------
    >>> from lacuna import SubjectData
    >>> from lacuna.analysis import RegionalDamage
    >>> from lacuna.io import export_results_to_csv
    >>>
    >>> lesion = SubjectData.from_nifti("lesion.nii.gz")
    >>> analysis = RegionalDamage()
    >>> result = analysis.run(lesion)
    >>>
    >>> # Export all results
    >>> export_results_to_csv(result, "results.csv")
    >>>
    >>> # Export specific analysis
    >>> export_results_to_csv(result, "damage.csv", analysis_name="RegionalDamage")

    Notes
    -----
    - Results are flattened: nested dicts become columns with dot notation
    - Example: {"ParcelAggregation": {"region1": 0.5}} becomes the column
      "ParcelAggregation.region1" with value 0.5
    - Single-element lists/tuples are unwrapped to their only element
    - Multiple analyses create multiple columns
    - Metadata columns (if included): subject_id, session_id, coordinate_space
    - The output is a single data row (one subject)
    """
    output_path = Path(output_path)

    if not mask_data.results:
        raise ValueError("SubjectData has no results to export")

    # Filter by analysis name if specified
    if analysis_name:
        if analysis_name not in mask_data.results:
            available = list(mask_data.results.keys())
            raise ValueError(
                f"Analysis '{analysis_name}' not found in results.\nAvailable analyses: {available}"
            )
        results_to_export = {analysis_name: mask_data.results[analysis_name]}
    else:
        results_to_export = mask_data.results

    # Flatten results to single row
    row_data = {}

    # Metadata columns come first so they lead the header row
    if include_metadata:
        row_data["subject_id"] = mask_data.metadata.get("subject_id", "unknown")
        row_data["session_id"] = mask_data.metadata.get("session_id", "")
        row_data["coordinate_space"] = mask_data.get_coordinate_space()

    # Flatten nested results
    for analysis, results_dict in results_to_export.items():
        if not isinstance(results_dict, dict):
            # Non-dict result, store as-is under the analysis name
            row_data[analysis] = results_dict
            continue
        for key, value in results_dict.items():
            # Unwrap one-element sequences so the cell holds a scalar
            if isinstance(value, (list, tuple)) and len(value) == 1:
                value = value[0]
            # Column name: Analysis.key
            row_data[f"{analysis}.{key}"] = value

    # Ensure parent directory exists (consistent with the JSON exporters)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Create DataFrame and save
    pd.DataFrame([row_data]).to_csv(output_path, index=False)

    return output_path

export_results_to_json(mask_data, output_path, analysis_name=None, include_metadata=True, include_provenance=False, indent=2)

Export analysis results to JSON format.

Creates a JSON file with analysis results, optionally including metadata and provenance. Useful for web applications or further programmatic processing.

Parameters:

Name Type Description Default
mask_data SubjectData

SubjectData object with analysis results

required
output_path str or Path

Output JSON file path

required
analysis_name str

Specific analysis to export. If None, exports all results.

None
include_metadata bool

Include subject metadata in JSON

True
include_provenance bool

Include provenance data in JSON

False
indent int

JSON indentation for readability (0 for compact)

2

Returns:

Type Description
Path

Path to created JSON file

Raises:

Type Description
ValueError

If mask_data has no results or specified analysis not found

Examples:

>>> from lacuna.io import export_results_to_json
>>>
>>> # Export all results with metadata
>>> export_results_to_json(result, "results.json")
>>>
>>> # Export specific analysis with full provenance
>>> export_results_to_json(
...     result,
...     "damage_full.json",
...     analysis_name="RegionalDamage",
...     include_provenance=True
... )
>>>
>>> # Compact JSON for web APIs
>>> export_results_to_json(result, "api_response.json", indent=0)
Notes

JSON structure: {"metadata": {...}, "results": {...}, "provenance": {...}}, where "metadata" is present only if include_metadata=True, "results" always holds the analysis results, and "provenance" is present only if include_provenance=True.

Source code in src/lacuna/io/export.py
def export_results_to_json(
    mask_data: SubjectData,
    output_path: str | Path,
    analysis_name: str | None = None,
    include_metadata: bool = True,
    include_provenance: bool = False,
    indent: int = 2,
) -> Path:
    """
    Export analysis results to JSON format.

    Creates a JSON file with analysis results, optionally including
    metadata and provenance. Useful for web applications or further
    programmatic processing.

    Parameters
    ----------
    mask_data : SubjectData
        SubjectData object with analysis results
    output_path : str or Path
        Output JSON file path. Missing parent directories are created.
    analysis_name : str, optional
        Specific analysis to export. If None, exports all results.
    include_metadata : bool, default=True
        Include subject metadata in JSON
    include_provenance : bool, default=False
        Include provenance data in JSON (omitted when there is none)
    indent : int, default=2
        JSON indentation for readability (0 or a negative value for compact)

    Returns
    -------
    Path
        Path to created JSON file

    Raises
    ------
    ValueError
        If mask_data has no results or specified analysis not found
    TypeError
        If the results or metadata contain values JSON cannot encode

    Examples
    --------
    >>> from lacuna.io import export_results_to_json
    >>>
    >>> # Export all results with metadata
    >>> export_results_to_json(result, "results.json")
    >>>
    >>> # Export specific analysis with full provenance
    >>> export_results_to_json(
    ...     result,
    ...     "damage_full.json",
    ...     analysis_name="RegionalDamage",
    ...     include_provenance=True
    ... )
    >>>
    >>> # Compact JSON for web APIs
    >>> export_results_to_json(result, "api_response.json", indent=0)

    Notes
    -----
    JSON structure:
    {
        "metadata": {...},          # If include_metadata=True
        "results": {...},           # Analysis results
        "provenance": {...}         # If include_provenance=True
    }

    Provenance entries that JSON cannot encode natively are converted the
    same way the BIDS derivatives exporter converts them: objects exposing
    ``to_dict()`` are exported as that dictionary, anything else falls back
    to ``str(value)``. Results and metadata are still encoded strictly, so
    unsupported values there raise instead of being silently stringified.
    """
    output_path = Path(output_path)

    if not mask_data.results:
        raise ValueError("SubjectData has no results to export")

    # Build export data structure
    export_data: dict[str, Any] = {}

    # Add metadata if requested (copied so the subject's own dict is not mutated)
    if include_metadata:
        export_data["metadata"] = dict(mask_data.metadata)
        export_data["metadata"]["coordinate_space"] = mask_data.get_coordinate_space()

    # Add results
    if analysis_name:
        if analysis_name not in mask_data.results:
            available = list(mask_data.results.keys())
            raise ValueError(
                f"Analysis '{analysis_name}' not found in results.\nAvailable analyses: {available}"
            )
        export_data["results"] = {analysis_name: mask_data.results[analysis_name]}
    else:
        export_data["results"] = mask_data.results

    # Add provenance if requested
    if include_provenance and mask_data.provenance:

        def _to_jsonable(value):
            # Only invoked by json for objects it cannot encode on its own.
            to_dict = getattr(value, "to_dict", None)
            if callable(to_dict):
                return to_dict()
            return str(value)

        # Round-trip through JSON so the lenient fallback applies to the
        # provenance only and never to results/metadata.
        export_data["provenance"] = json.loads(
            json.dumps(mask_data.provenance, default=_to_jsonable)
        )

    # Serialize before touching the filesystem so that a serialization
    # failure cannot leave a truncated file behind.
    payload = json.dumps(export_data, indent=indent if indent > 0 else None)

    # Ensure parent directory exists
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(payload, encoding="utf-8")

    return output_path

export_results_to_tsv(mask_data, output_path, analysis_name=None, include_metadata=True)

Export analysis results to TSV (tab-separated values) format.

Identical to export_results_to_csv but uses tab delimiter. TSV is preferred in neuroimaging for BIDS compatibility.

Parameters:

Name Type Description Default
mask_data SubjectData

SubjectData object with analysis results

required
output_path str or Path

Output TSV file path

required
analysis_name str

Specific analysis to export. If None, exports all results.

None
include_metadata bool

Include subject metadata as columns

True

Returns:

Type Description
Path

Path to created TSV file

Raises:

Type Description
ValueError

If mask_data has no results or specified analysis not found

Examples:

>>> from lacuna.io import export_results_to_tsv
>>>
>>> # Export to TSV (BIDS-compatible format)
>>> export_results_to_tsv(result, "results.tsv")
>>>
>>> # Export specific analysis without metadata
>>> export_results_to_tsv(
...     result,
...     "atlas_only.tsv",
...     analysis_name="ParcelAggregation",
...     include_metadata=False
... )
See Also

export_results_to_csv : CSV export (identical but comma-delimited)

Source code in src/lacuna/io/export.py
def export_results_to_tsv(
    mask_data: SubjectData,
    output_path: str | Path,
    analysis_name: str | None = None,
    include_metadata: bool = True,
) -> Path:
    """
    Export analysis results to TSV (tab-separated values) format.

    Identical to export_results_to_csv but uses tab delimiter.
    TSV is preferred in neuroimaging for BIDS compatibility.

    Parameters
    ----------
    mask_data : SubjectData
        SubjectData object with analysis results
    output_path : str or Path
        Output TSV file path. Missing parent directories are created.
    analysis_name : str, optional
        Specific analysis to export. If None, exports all results.
    include_metadata : bool, default=True
        Include subject metadata as columns

    Returns
    -------
    Path
        Path to created TSV file

    Raises
    ------
    ValueError
        If mask_data has no results or specified analysis not found

    Examples
    --------
    >>> from lacuna.io import export_results_to_tsv
    >>>
    >>> # Export to TSV (BIDS-compatible format)
    >>> export_results_to_tsv(result, "results.tsv")
    >>>
    >>> # Export specific analysis without metadata
    >>> export_results_to_tsv(
    ...     result,
    ...     "atlas_only.tsv",
    ...     analysis_name="ParcelAggregation",
    ...     include_metadata=False
    ... )

    See Also
    --------
    export_results_to_csv : CSV export (identical but comma-delimited)
    """
    output_path = Path(output_path)

    if not mask_data.results:
        raise ValueError("SubjectData has no results to export")

    # Filter by analysis name if specified
    if analysis_name:
        if analysis_name not in mask_data.results:
            available = list(mask_data.results.keys())
            raise ValueError(
                f"Analysis '{analysis_name}' not found in results.\nAvailable analyses: {available}"
            )
        results_to_export = {analysis_name: mask_data.results[analysis_name]}
    else:
        results_to_export = mask_data.results

    # Flatten results to single row
    row_data = {}

    # Metadata columns come first so they lead the header row
    if include_metadata:
        row_data["subject_id"] = mask_data.metadata.get("subject_id", "unknown")
        row_data["session_id"] = mask_data.metadata.get("session_id", "")
        row_data["coordinate_space"] = mask_data.get_coordinate_space()

    # Flatten nested results into "Analysis.key" columns
    for analysis, results_dict in results_to_export.items():
        if not isinstance(results_dict, dict):
            # Non-dict result, store as-is under the analysis name
            row_data[analysis] = results_dict
            continue
        for key, value in results_dict.items():
            # Unwrap one-element sequences so the cell holds a scalar
            if isinstance(value, (list, tuple)) and len(value) == 1:
                value = value[0]
            row_data[f"{analysis}.{key}"] = value

    # Ensure parent directory exists (consistent with the JSON exporters)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Create DataFrame and save with tab delimiter
    pd.DataFrame([row_data]).to_csv(output_path, sep="\t", index=False)

    return output_path

fetch_connectome(name, output_dir, **kwargs)

Generic fetch function that dispatches to specific connectome fetchers.

Parameters:

Name Type Description Default
name str

Connectome name ('gsp1000', 'dtor985', 'hcp1065'); matched case-insensitively.

required
output_dir str or Path

Directory for output files.

required
**kwargs

Additional arguments passed to specific fetch function.

{}

Returns:

Type Description
FetchResult

Result from the specific fetch operation.

Raises:

Type Description
ValueError

If connectome name is not recognized.

Examples:

>>> from lacuna.io import fetch_connectome
>>> result = fetch_connectome("gsp1000", "/data", api_key="key", batches=50)
Source code in src/lacuna/io/fetch.py
def fetch_connectome(
    name: str,
    output_dir: str | Path,
    **kwargs,
) -> FetchResult:
    """
    Dispatch a fetch request to the fetcher for the named connectome.

    The name is matched case-insensitively against the registered
    connectome sources and then routed to the matching ``fetch_*`` function.

    Parameters
    ----------
    name : str
        Connectome name ('gsp1000', 'dtor985', 'hcp1065').
    output_dir : str or Path
        Directory for output files.
    **kwargs
        Additional arguments forwarded unchanged to the specific fetch function.

    Returns
    -------
    FetchResult
        Result from the specific fetch operation.

    Raises
    ------
    ValueError
        If the connectome name is not a registered source, or if it is
        registered but has no fetch function wired up here.

    Examples
    --------
    >>> from lacuna.io import fetch_connectome
    >>> result = fetch_connectome("gsp1000", "/data", api_key="key", batches=50)
    """
    from .downloaders import CONNECTOME_SOURCES

    key = name.lower()

    # Reject names that are not registered sources, listing the valid ones.
    if key not in CONNECTOME_SOURCES:
        known = ", ".join(CONNECTOME_SOURCES.keys())
        raise ValueError(f"Unknown connectome '{key}'. Available: {known}")

    if key == "gsp1000":
        return fetch_gsp1000(output_dir, **kwargs)
    if key == "dtor985":
        return fetch_dtor985(output_dir, **kwargs)
    if key == "hcp1065":
        return fetch_hcp1065(output_dir, **kwargs)

    # Registered source without a fetcher in this dispatcher.
    raise ValueError(f"No fetch implementation for '{key}'")

fetch_dtor985(output_dir, *, api_key=None, keep_original=True, register=True, register_name='dTOR985', force=False, progress_callback=None, verbose=False)

Download, convert, and register the dTOR985 structural tractogram.

Downloads the Diffusion Tensor Imaging Open Resource 985-subject tractogram from Figshare in TrackVis (.trk) format, converts to MRtrix3 (.tck) format, and optionally registers for use with StructuralNetworkMapping.

Parameters:

Name Type Description Default
output_dir str or Path

Directory for output .tck file.

required
api_key str

Figshare API key for authenticated downloads. If not provided, uses FIGSHARE_API_KEY environment variable. Get one from https://figshare.com/account/applications.

None
keep_original bool

Keep original .trk file after conversion.

True
register bool

Automatically register tractogram after processing.

True
register_name str

Name for tractogram registration.

"dTOR985"
force bool

Overwrite existing files and registrations.

False
progress_callback callable

Function called with FetchProgress updates during operation.

None
verbose bool

Print informational messages.

False

Returns:

Type Description
FetchResult

Result containing output path, registration status, and timing.

Raises:

Type Description
DownloadError

If download fails or API key is missing.

ProcessingError

If .trk to .tck conversion fails.

Examples:

>>> from lacuna.io import fetch_dtor985
>>> result = fetch_dtor985("/data/connectomes/dtor985", api_key="YOUR_TOKEN")
>>> print(result.output_files[0])  # Path to .tck file
Source code in src/lacuna/io/fetch.py
def fetch_dtor985(
    output_dir: str | Path,
    *,
    api_key: str | None = None,
    keep_original: bool = True,
    register: bool = True,
    register_name: str = "dTOR985",
    force: bool = False,
    progress_callback: Callable[[FetchProgress], None] | None = None,
    verbose: bool = False,
) -> FetchResult:
    """
    Download, convert, and register the dTOR985 structural tractogram.

    Downloads the Diffusion Tensor Imaging Open Resource 985-subject tractogram
    from Figshare in TrackVis (.trk) format, converts to MRtrix3 (.tck) format,
    and optionally registers for use with StructuralNetworkMapping.

    Parameters
    ----------
    output_dir : str or Path
        Directory for output .tck file. Created if it does not exist.
    api_key : str, optional
        Figshare API key for authenticated downloads. If not provided,
        uses FIGSHARE_API_KEY environment variable. Get one from
        https://figshare.com/account/applications.
    keep_original : bool, default=True
        Keep original .trk file after conversion.
    register : bool, default=True
        Automatically register tractogram after processing.
    register_name : str, default="dTOR985"
        Name for tractogram registration.
    force : bool, default=False
        Overwrite existing files and registrations.
    progress_callback : callable, optional
        Function called with FetchProgress updates during operation.
    verbose : bool, default=False
        Print informational messages.

    Returns
    -------
    FetchResult
        Result containing output path, registration status, and timing.

    Raises
    ------
    DownloadError
        If download fails or API key is missing.
    ProcessingError
        If .trk to .tck conversion fails. Any other exception raised during
        download, conversion, or registration is also re-raised as a
        ProcessingError with the original exception as its cause.

    Examples
    --------
    >>> from lacuna.io import fetch_dtor985
    >>> result = fetch_dtor985("/data/connectomes/dtor985", api_key="YOUR_TOKEN")
    >>> print(result.output_files[0])  # Path to .tck file

    Notes
    -----
    If ``<output_dir>/<source.name>.tck`` already exists and ``force`` is
    False, download and conversion are skipped: the existing file is returned
    (with a warning recorded in the result) after the registration step.
    When the original .trk is kept, it is listed before the .tck in
    ``output_files``.
    """
    # Function-scope imports.
    # NOTE(review): presumably deferred to avoid import cycles or import cost - confirm.
    from ..core.exceptions import DownloadError, ProcessingError
    from .convert import trk_to_tck
    from .downloaders import CONNECTOME_SOURCES
    from .downloaders.figshare import FigshareDownloader

    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Timers and accumulated warnings that end up in the returned FetchResult.
    start_time = time.time()
    download_time = 0.0
    processing_time = 0.0
    warn_list: list[str] = []

    source = CONNECTOME_SOURCES["dtor985"]

    # Check if .tck already exists
    tck_path = output_dir / f"{source.name}.tck"
    # NOTE: this initial .trk path is never read; it is replaced by the path
    # returned from the downloader before its first use below.
    trk_path = output_dir / f"{source.name}.trk"

    # Fast path: reuse an already-converted tractogram unless force is set.
    if tck_path.exists() and not force:
        if verbose:
            print(f"Using existing .tck file: {tck_path}")
        # Also recorded as a warning so the reuse is visible in the result.
        warn_list.append(f"Using existing .tck file: {tck_path}")

        # Registration is delegated to the helper; it receives `register`
        # and its return value is reported as `registered`.
        registered = _register_dtor985(
            register, register_name, source, tck_path, progress_callback, warn_list
        )

        # Nothing was downloaded or converted, so those timings are zero.
        return FetchResult(
            success=True,
            connectome_name="dtor985",
            output_dir=output_dir,
            output_files=[tck_path],
            registered=registered,
            register_name=register_name if registered else None,
            duration_seconds=time.time() - start_time,
            download_time_seconds=0.0,
            processing_time_seconds=0.0,
            warnings=warn_list,
        )

    try:
        # Phase 1: Download
        download_start = time.time()

        if progress_callback:
            progress_callback(
                FetchProgress(
                    phase="download",
                    current_file="",
                    files_completed=0,
                    files_total=1,
                    message="Downloading dTOR985 tractogram...",
                )
            )

        downloader = FigshareDownloader(source, api_key=api_key)
        downloaded_files = downloader.download(
            output_path=output_dir,
            progress_callback=progress_callback,
        )

        # An empty result is treated as a failed download.
        if not downloaded_files:
            raise DownloadError(url=source.download_url or "", reason="No files downloaded")

        # Only the first downloaded file is used as the .trk input.
        trk_path = downloaded_files[0]
        download_time = time.time() - download_start

        # Phase 2: Convert to .tck
        processing_start = time.time()

        if progress_callback:
            progress_callback(
                FetchProgress(
                    phase="processing",
                    current_file=trk_path.name,
                    files_completed=0,
                    files_total=1,
                    message="Converting to .tck format...",
                )
            )

        # The output name is derived from the downloaded file, so it may
        # differ from the `{source.name}.tck` path checked at the top; hence
        # the second existence check below.
        tck_path = trk_path.with_suffix(".tck")

        if tck_path.exists() and not force:
            if verbose:
                print(f"Using existing .tck file: {tck_path}")
            warn_list.append(f"Using existing .tck file: {tck_path}")
        else:
            tck_path = trk_to_tck(trk_path, tck_path)

        # Drop the source .trk once it is no longer needed.
        if not keep_original and trk_path.exists():
            trk_path.unlink()

        processing_time = time.time() - processing_start

        # Phase 3: Registration
        registered = _register_dtor985(
            register, register_name, source, tck_path, progress_callback, warn_list
        )

        duration = time.time() - start_time

        # The .tck is always reported; a kept .trk is listed ahead of it.
        output_files = [tck_path]
        if keep_original and trk_path.exists():
            output_files.insert(0, trk_path)

        return FetchResult(
            success=True,
            connectome_name="dtor985",
            output_dir=output_dir,
            output_files=output_files,
            registered=registered,
            register_name=register_name if registered else None,
            duration_seconds=duration,
            download_time_seconds=download_time,
            processing_time_seconds=processing_time,
            warnings=warn_list,
        )

    except (DownloadError, ProcessingError):
        # Already one of the documented error types: propagate unchanged.
        raise
    except Exception as e:
        # Anything else is wrapped, keeping the original as the cause.
        raise ProcessingError(operation="fetch_dtor985", reason=str(e)) from e

fetch_gsp1000(output_dir, *, api_key=None, batches=10, test_mode=False, skip_checksum=False, register=True, register_name='GSP1000', force=False, progress_callback=None, verbose=False)

Download, process, and register the GSP1000 functional connectome.

Downloads the Brain Genomics Superstruct Project 1000-subject resting-state fMRI dataset from Harvard Dataverse, converts to HDF5 batch format, and optionally registers for use with FunctionalNetworkMapping.

Parameters:

Name Type Description Default
output_dir str or Path

Directory for output HDF5 batch files.

required
api_key str

Harvard Dataverse API key. If not provided, looks for DATAVERSE_API_KEY environment variable.

None
batches int

Number of HDF5 batch files to create. More batches = lower RAM usage. Recommendations: 4GB RAM → 100, 8GB → 50, 16GB → 25, 32GB+ → 10.

10
test_mode bool

If True, downloads only 1 tarball (~2GB) to test the full pipeline.

False
skip_checksum bool

Skip checksum verification. Use when Dataverse metadata is outdated.

False
register bool

Automatically register connectome after processing.

True
register_name str

Name for connectome registration.

"GSP1000"
force bool

Overwrite existing files and registrations.

False
progress_callback callable

Function called with FetchProgress updates during operation.

None
verbose bool

Print informational messages.

False

Returns:

Type Description
FetchResult

Result containing output paths, registration status, and timing.

Raises:

Type Description
AuthenticationError

If API key is missing or invalid.

DownloadError

If download fails after retries.

ProcessingError

If NIfTI to HDF5 conversion fails.

Examples:

>>> from lacuna.io import fetch_gsp1000
>>> result = fetch_gsp1000(
...     output_dir="/data/connectomes/gsp1000",
...     api_key="your-dataverse-api-key",
...     batches=50
... )
>>> print(result.summary())
Source code in src/lacuna/io/fetch.py
def fetch_gsp1000(
    output_dir: str | Path,
    *,
    api_key: str | None = None,
    batches: int = 10,
    test_mode: bool = False,
    skip_checksum: bool = False,
    register: bool = True,
    register_name: str = "GSP1000",
    force: bool = False,
    progress_callback: Callable[[FetchProgress], None] | None = None,
    verbose: bool = False,
) -> FetchResult:
    """
    Download, process, and register the GSP1000 functional connectome.

    Downloads the Brain Genomics Superstruct Project 1000-subject resting-state
    fMRI dataset from Harvard Dataverse, converts to HDF5 batch format, and
    optionally registers for use with FunctionalNetworkMapping.

    Raw downloads are stored in ``output_dir / "raw"`` and HDF5 batches in
    ``output_dir / "processed"``. If HDF5 files already exist in the processed
    directory and ``force`` is False, download and conversion are skipped and
    only the registration step runs (unless the existing data looks like a
    leftover test-mode run, in which case it is regenerated).

    Parameters
    ----------
    output_dir : str or Path
        Directory for output HDF5 batch files.
    api_key : str, optional
        Harvard Dataverse API key. If not provided, looks for DATAVERSE_API_KEY
        environment variable.
    batches : int, default=10
        Number of HDF5 batch files to create. More batches = lower RAM usage.
        Recommendations: 4GB RAM → 100, 8GB → 50, 16GB → 25, 32GB+ → 10.
        Must be at least 1 (ignored in test mode).
    test_mode : bool, default=False
        If True, downloads only 1 tarball (~2GB) to test the full pipeline.
    skip_checksum : bool, default=False
        Skip checksum verification. Use when Dataverse metadata is outdated.
    register : bool, default=True
        Automatically register connectome after processing.
    register_name : str, default="GSP1000"
        Name for connectome registration.
    force : bool, default=False
        Overwrite existing files and registrations.
    progress_callback : callable, optional
        Function called with FetchProgress updates during operation.
    verbose : bool, default=False
        Print informational messages.

    Returns
    -------
    FetchResult
        Result containing output paths, registration status, and timing.
        Tarball extraction is counted as part of the processing time.

    Raises
    ------
    AuthenticationError
        If API key is missing or invalid.
    DownloadError
        If download fails after retries.
    ProcessingError
        If ``batches`` is smaller than 1, or if extraction or NIfTI to HDF5
        conversion fails.

    Examples
    --------
    >>> from lacuna.io import fetch_gsp1000
    >>> result = fetch_gsp1000(
    ...     output_dir="/data/connectomes/gsp1000",
    ...     api_key="your-dataverse-api-key",
    ...     batches=50
    ... )
    >>> print(result.summary())
    """
    from ..core.exceptions import AuthenticationError, DownloadError, ProcessingError
    from .convert import gsp1000_to_hdf5
    from .downloaders import CONNECTOME_SOURCES
    from .downloaders.dataverse import DataverseDownloader

    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    start_time = time.time()
    download_time = 0.0
    processing_time = 0.0
    warn_list: list[str] = []

    source = CONNECTOME_SOURCES["gsp1000"]

    # Create directories
    raw_dir = output_dir / "raw"
    processed_dir = output_dir / "processed"
    raw_dir.mkdir(parents=True, exist_ok=True)
    processed_dir.mkdir(parents=True, exist_ok=True)

    # Check if processed files already exist
    stale_test_data = False
    existing_hdf5 = list(processed_dir.glob("*.h5")) + list(processed_dir.glob("*.hdf5"))
    if existing_hdf5 and not force:
        # Detect stale test-mode data: single chunk with ≤10 subjects
        if not test_mode and len(existing_hdf5) == 1:
            try:
                import h5py

                with h5py.File(existing_hdf5[0], "r") as hf:
                    if hf.attrs.get("n_subjects", 0) <= 10:
                        stale_test_data = True
            except Exception:
                # Best-effort probe: if the file cannot be inspected, treat it
                # as regular existing data rather than failing the fetch.
                pass

        if stale_test_data:
            if verbose:
                print("Existing HDF5 appears to be from test mode — overwriting with full dataset")
            warn_list.append("Overwriting stale test-mode HDF5 data")
        else:
            if verbose:
                print(f"Using existing HDF5 files: {processed_dir} ({len(existing_hdf5)} files)")
            warn_list.append(f"Using existing HDF5 files: {processed_dir}")

            # Skip to registration phase
            registered = _register_gsp1000(
                register, register_name, source, processed_dir, progress_callback, warn_list
            )

            return FetchResult(
                success=True,
                connectome_name="gsp1000",
                output_dir=processed_dir,
                output_files=existing_hdf5,
                registered=registered,
                register_name=register_name if registered else None,
                duration_seconds=time.time() - start_time,
                download_time_seconds=0.0,
                processing_time_seconds=0.0,
                warnings=warn_list,
            )

    # Reject an unusable batch count *before* the (very large) download starts;
    # previously batches=0 only failed with a ZeroDivisionError after download.
    if not test_mode and batches < 1:
        raise ProcessingError(
            operation="fetch_gsp1000",
            reason=f"batches must be a positive integer, got {batches!r}",
        )

    try:
        # Phase 1: Download
        download_start = time.time()

        if progress_callback:
            progress_callback(
                FetchProgress(
                    phase="download",
                    current_file="",
                    files_completed=0,
                    files_total=1,
                    message="Initializing download...",
                )
            )

        downloader = DataverseDownloader(source, api_key=api_key)
        downloader.download(
            output_path=raw_dir,
            progress_callback=progress_callback,
            test_mode=test_mode,
            skip_checksum=skip_checksum,
        )

        download_time = time.time() - download_start

        # Phase 2: Extract tarballs
        # Extraction is reported as "processing", so its time is counted there.
        processing_start = time.time()

        if progress_callback:
            progress_callback(
                FetchProgress(
                    phase="processing",
                    current_file="",
                    files_completed=0,
                    files_total=1,
                    message="Extracting tarballs...",
                )
            )

        import tarfile

        # The "data" extraction filter rejects members that would escape
        # raw_dir (absolute paths, "..", unsafe links). It only exists on
        # Python versions that ship extraction filters, hence the probe.
        supports_filter = hasattr(tarfile, "data_filter")

        tar_files = list(raw_dir.glob("*.tar"))
        for tar_path in tar_files:
            with tarfile.open(tar_path, "r") as tar:
                if supports_filter:
                    tar.extractall(path=raw_dir, filter="data")
                else:
                    tar.extractall(path=raw_dir)

        # Phase 3: Convert to HDF5
        if progress_callback:
            progress_callback(
                FetchProgress(
                    phase="processing",
                    current_file="",
                    files_completed=0,
                    files_total=1,
                    message="Converting to HDF5 format...",
                )
            )

        if test_mode:
            subjects_per_chunk = 10
            max_subjects = 10
            warn_list.append("Test mode: using first 10 subjects in single chunk")
        else:
            # 1000 subjects spread over the requested number of batches
            subjects_per_chunk = max(1, 1000 // batches)
            max_subjects = None

        # Find brain mask
        mask_path = _find_brain_mask(raw_dir)

        # Run conversion (overwrite if force or stale test-mode data detected)
        output_files = gsp1000_to_hdf5(
            gsp_dir=raw_dir,
            mask_path=mask_path,
            output_dir=processed_dir,
            subjects_per_chunk=subjects_per_chunk,
            max_subjects=max_subjects,
            overwrite=force or stale_test_data,
        )

        processing_time = time.time() - processing_start

        # Phase 4: Registration
        registered = _register_gsp1000(
            register, register_name, source, processed_dir, progress_callback, warn_list
        )

        duration = time.time() - start_time

        return FetchResult(
            success=True,
            connectome_name="gsp1000",
            output_dir=processed_dir,
            output_files=output_files,
            registered=registered,
            register_name=register_name if registered else None,
            duration_seconds=duration,
            download_time_seconds=download_time,
            processing_time_seconds=processing_time,
            warnings=warn_list,
        )

    except (AuthenticationError, DownloadError, ProcessingError):
        raise
    except Exception as e:
        # Normalize any unexpected failure into the documented error type
        raise ProcessingError(operation="fetch_gsp1000", reason=str(e)) from e

fetch_hcp1065(output_dir, *, keep_original=True, register=True, register_name='HCP1065', force=False, progress_callback=None, verbose=False)

Download, merge, and register the HCP1065 structural tractogram.

Downloads the Human Connectome Project 1065-subject averaged tractography atlas from GitHub Releases as a zip of TrackVis (.trk) files, merges all tract files (excluding cranial nerves) into a single MRtrix3 (.tck) file, and optionally registers for use with StructuralNetworkMapping.

Parameters:

Name Type Description Default
output_dir str or Path

Directory for output .tck file.

required
keep_original bool

Keep original .zip file and extracted tracts after merging.

True
register bool

Automatically register tractogram after processing.

True
register_name str

Name for tractogram registration.

"HCP1065"
force bool

Overwrite existing files and registrations.

False
progress_callback callable

Function called with FetchProgress updates during operation.

None
verbose bool

Print informational messages.

False

Returns:

Type Description
FetchResult

Result containing output path, registration status, and timing.

Raises:

Type Description
DownloadError

If download fails.

ProcessingError

If extraction or merging fails.

Examples:

>>> from lacuna.io import fetch_hcp1065
>>> result = fetch_hcp1065("/data/connectomes/hcp1065")
>>> print(result.output_files[-1])  # Path to .tck file (always the last entry)
Source code in src/lacuna/io/fetch.py
def fetch_hcp1065(
    output_dir: str | Path,
    *,
    keep_original: bool = True,
    register: bool = True,
    register_name: str = "HCP1065",
    force: bool = False,
    progress_callback: Callable[[FetchProgress], None] | None = None,
    verbose: bool = False,
) -> FetchResult:
    """
    Download, merge, and register the HCP1065 structural tractogram.

    Downloads the Human Connectome Project 1065-subject averaged tractography
    atlas from GitHub Releases as a zip of TrackVis (.trk) files, merges all
    tract files (excluding cranial nerves) into a single MRtrix3 (.tck) file,
    and optionally registers for use with StructuralNetworkMapping.

    If the merged .tck file already exists in ``output_dir`` and ``force`` is
    False, download and merging are skipped and only registration runs.

    Parameters
    ----------
    output_dir : str or Path
        Directory for output .tck file.
    keep_original : bool, default=True
        Keep original .zip file and extracted tracts after merging.
    register : bool, default=True
        Automatically register tractogram after processing.
    register_name : str, default="HCP1065"
        Name for tractogram registration.
    force : bool, default=False
        Overwrite existing files and registrations.
    progress_callback : callable, optional
        Function called with FetchProgress updates during operation.
    verbose : bool, default=False
        Print informational messages.

    Returns
    -------
    FetchResult
        Result containing output path, registration status, and timing.
        The merged .tck file is always the last entry of ``output_files``;
        after a fresh download with ``keep_original=True`` the downloaded
        .zip precedes it.

    Raises
    ------
    DownloadError
        If download fails.
    ProcessingError
        If extraction or merging fails.

    Examples
    --------
    >>> from lacuna.io import fetch_hcp1065
    >>> result = fetch_hcp1065("/data/connectomes/hcp1065")
    >>> print(result.output_files[-1])  # Path to .tck file (always last)
    """
    from ..core.exceptions import DownloadError, ProcessingError
    from .convert import merge_trk_to_tck
    from .downloaders import CONNECTOME_SOURCES
    from .downloaders.github import GithubReleaseDownloader

    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    start_time = time.time()
    download_time = 0.0
    processing_time = 0.0
    warn_list: list[str] = []

    source = CONNECTOME_SOURCES["hcp1065"]

    # Check if .tck already exists
    tck_path = output_dir / f"{source.name}.tck"

    if tck_path.exists() and not force:
        if verbose:
            print(f"Using existing .tck file: {tck_path}")
        warn_list.append(f"Using existing .tck file: {tck_path}")

        registered = _register_hcp1065(
            register, register_name, source, tck_path, progress_callback, warn_list
        )

        return FetchResult(
            success=True,
            connectome_name="hcp1065",
            output_dir=output_dir,
            output_files=[tck_path],
            registered=registered,
            register_name=register_name if registered else None,
            duration_seconds=time.time() - start_time,
            download_time_seconds=0.0,
            processing_time_seconds=0.0,
            warnings=warn_list,
        )

    try:
        # Phase 1: Download zip
        download_start = time.time()

        if progress_callback:
            progress_callback(
                FetchProgress(
                    phase="download",
                    current_file="",
                    files_completed=0,
                    files_total=1,
                    message="Downloading HCP1065 tractography atlas...",
                )
            )

        downloader = GithubReleaseDownloader(source)
        downloaded_files = downloader.download(
            output_path=output_dir,
            progress_callback=progress_callback,
        )

        if not downloaded_files:
            raise DownloadError(url=source.download_url or "", reason="No files downloaded")

        zip_path = downloaded_files[0]
        download_time = time.time() - download_start

        # Phase 2: Extract zip
        processing_start = time.time()

        if progress_callback:
            progress_callback(
                FetchProgress(
                    phase="processing",
                    current_file=zip_path.name,
                    files_completed=0,
                    files_total=1,
                    message="Extracting tract files...",
                )
            )

        import zipfile

        # Reuse a previous extraction unless it is missing, empty, or forced
        extract_dir = output_dir / "hcp1065_tracts"
        if not extract_dir.exists() or not any(extract_dir.iterdir()) or force:
            extract_dir.mkdir(parents=True, exist_ok=True)
            with zipfile.ZipFile(zip_path, "r") as zf:
                zf.extractall(extract_dir)

        # Phase 3: Merge .trk files to single .tck
        if progress_callback:
            progress_callback(
                FetchProgress(
                    phase="processing",
                    current_file="",
                    files_completed=0,
                    files_total=1,
                    message="Merging tract files to .tck format...",
                )
            )

        # The "existing .tck and not force" case already returned above, so
        # reaching this point always means the merge has to be (re)run.
        tck_path = merge_trk_to_tck(
            source_dir=extract_dir,
            output_path=tck_path,
            overwrite=force,
        )

        # Cleanup originals if requested
        if not keep_original:
            import shutil

            if zip_path.exists():
                zip_path.unlink()
            if extract_dir.exists():
                shutil.rmtree(extract_dir)

        processing_time = time.time() - processing_start

        # Phase 4: Registration
        registered = _register_hcp1065(
            register, register_name, source, tck_path, progress_callback, warn_list
        )

        duration = time.time() - start_time

        # Merged .tck stays last; the retained zip (if any) is listed first
        output_files = [tck_path]
        if keep_original and zip_path.exists():
            output_files.insert(0, zip_path)

        return FetchResult(
            success=True,
            connectome_name="hcp1065",
            output_dir=output_dir,
            output_files=output_files,
            registered=registered,
            register_name=register_name if registered else None,
            duration_seconds=duration,
            download_time_seconds=download_time,
            processing_time_seconds=processing_time,
            warnings=warn_list,
        )

    except (DownloadError, ProcessingError):
        raise
    except Exception as e:
        # Normalize any unexpected failure into the documented error type
        raise ProcessingError(operation="fetch_hcp1065", reason=str(e)) from e

get_connectome_path(name_or_path)

Resolve a connectome name or path to its file location.

For registered connectomes, looks up path in registry. For paths, validates existence.

Parameters:

Name Type Description Default
name_or_path str

Either a registered connectome name (e.g., "GSP1000") or a direct path to .h5 file or directory.

required

Returns:

Type Description
Path

Resolved path to connectome data.

Raises:

Type Description
FileNotFoundError

If connectome cannot be resolved.

Examples:

>>> path = get_connectome_path("GSP1000")  # Registered name
>>> path = get_connectome_path("/data/my_connectome.h5")  # Direct path
Source code in src/lacuna/io/fetch.py
def get_connectome_path(name_or_path: str) -> Path:
    """
    Resolve a connectome name or path to its file location.

    For registered connectomes, looks up path in registry.
    For paths, validates existence.

    Parameters
    ----------
    name_or_path : str
        Either a registered connectome name (e.g., "GSP1000") or
        a direct path to .h5 file or directory.

    Returns
    -------
    Path
        Resolved path to connectome data.

    Raises
    ------
    FileNotFoundError
        If connectome cannot be resolved.

    Examples
    --------
    >>> path = get_connectome_path("GSP1000")  # Registered name
    >>> path = get_connectome_path("/data/my_connectome.h5")  # Direct path
    """
    # Check if it's a path
    path = Path(name_or_path)
    if path.exists():
        return path

    # Try looking up in registry
    try:
        from ..assets.connectomes import get_functional_connectome

        return get_functional_connectome(name_or_path).data_path
    except (ImportError, KeyError, AttributeError):
        pass

    # Check cache directory
    cache_dir = get_data_dir() / "connectomes"
    candidates = [
        cache_dir / name_or_path,
        cache_dir / name_or_path.lower(),
        cache_dir / f"{name_or_path}.h5",
        cache_dir / f"{name_or_path.lower()}.h5",
    ]

    for candidate in candidates:
        if candidate.exists():
            return candidate

    raise FileNotFoundError(
        f"Connectome '{name_or_path}' not found.\n"
        "Options:\n"
        "  - Provide a direct path to an existing .h5 file or directory\n"
        "  - Register a connectome using lacuna.assets.connectomes\n"
        "  - Download using: lacuna.io.fetch_gsp1000() or fetch_dtor985()\n\n"
        "Quick start:\n"
        "1. Get API key from https://dataverse.harvard.edu/\n"
        "2. Run:\n"
        "   lacuna fetch gsp1000 /path/to/output --api-key YOUR_KEY\n\n"
        "Or in Python:\n"
        "   from lacuna.io import fetch_gsp1000\n"
        "   fetch_gsp1000('/path/to/output', api_key='YOUR_KEY')"
    )

get_data_dir()

Get the data cache directory following XDG Base Directory specification.

Priority: 1. LACUNA_DATA_DIR environment variable (explicit user choice) 2. XDG_CACHE_HOME/lacuna (XDG standard) 3. ~/.cache/lacuna (fallback)

Returns:

Type Description
Path

Absolute path to data cache directory

Examples:

>>> data_dir = get_data_dir()
>>> print(data_dir)
/home/user/.cache/lacuna
>>> import os
>>> os.environ['LACUNA_DATA_DIR'] = '/mnt/nvme/lacuna_data'
>>> data_dir = get_data_dir()
>>> print(data_dir)
/mnt/nvme/lacuna_data
Source code in src/lacuna/io/fetch.py
def get_data_dir() -> Path:
    """
    Get the data cache directory following XDG Base Directory specification.

    Priority:
    1. LACUNA_DATA_DIR environment variable (explicit user choice)
    2. XDG_CACHE_HOME/lacuna (XDG standard; only if XDG_CACHE_HOME is an
       absolute path, relative values are ignored as the specification
       requires)
    3. ~/.cache/lacuna (fallback)

    The directory is not created by this function.

    Returns
    -------
    Path
        Absolute path to data cache directory

    Examples
    --------
    >>> data_dir = get_data_dir()
    >>> print(data_dir)
    /home/user/.cache/lacuna

    >>> import os
    >>> os.environ['LACUNA_DATA_DIR'] = '/mnt/nvme/lacuna_data'
    >>> data_dir = get_data_dir()
    >>> print(data_dir)
    /mnt/nvme/lacuna_data
    """
    if env_dir := os.getenv("LACUNA_DATA_DIR"):
        return Path(env_dir).expanduser().resolve()

    if xdg_cache := os.getenv("XDG_CACHE_HOME"):
        xdg_path = Path(xdg_cache)
        # A relative XDG_CACHE_HOME is invalid per the XDG spec and would make
        # the cache location depend on the current working directory.
        if xdg_path.is_absolute():
            return xdg_path / "lacuna"

    return Path.home() / ".cache" / "lacuna"

get_fetch_status(name)

Get the current status of a connectome (downloaded, processed, registered).

Parameters:

Name Type Description Default
name str

Connectome name ('gsp1000', 'dtor985').

required

Returns:

Type Description
dict

Status information including: - downloaded: bool - processed: bool - registered: bool - location: Path | None - size_bytes: int | None

Source code in src/lacuna/io/fetch.py
def get_fetch_status(name: str) -> dict:
    """
    Report what is present in the data cache for a fetchable connectome.

    Only the cache layout ``<data dir>/connectomes/<name>[/processed]`` is
    inspected; the connectome registry is not consulted.

    Parameters
    ----------
    name : str
        Connectome name ('gsp1000', 'dtor985'). Matched case-insensitively.

    Returns
    -------
    dict
        Status information including:
        - downloaded: bool, the connectome cache folder exists and is non-empty
        - processed: bool, its ``processed`` subfolder exists and is non-empty
        - registered: bool, currently always False
        - location: Path | None, processed folder if present, else cache folder
        - size_bytes: int | None, total size of all files under ``location``

    Raises
    ------
    ValueError
        If ``name`` is not a known connectome source.
    """
    from .downloaders import CONNECTOME_SOURCES

    name = name.lower()
    if name not in CONNECTOME_SOURCES:
        raise ValueError(f"Unknown connectome '{name}'")

    def _has_entries(directory: Path) -> bool:
        # True only for an existing directory with at least one entry
        return directory.exists() and any(directory.iterdir())

    def _total_size(directory: Path) -> int:
        # Sum of the sizes of every regular file below `directory`
        return sum(entry.stat().st_size for entry in directory.rglob("*") if entry.is_file())

    connectome_root = get_data_dir() / "connectomes" / name
    processed_root = connectome_root / "processed"

    is_downloaded = _has_entries(connectome_root)
    is_processed = _has_entries(processed_root)

    # Prefer the processed folder as the reported location when it has content
    if is_processed:
        location, size_bytes = processed_root, _total_size(processed_root)
    elif is_downloaded:
        location, size_bytes = connectome_root, _total_size(connectome_root)
    else:
        location, size_bytes = None, None

    return {
        "downloaded": is_downloaded,
        "processed": is_processed,
        "registered": False,  # TODO: Check actual registry
        "location": location,
        "size_bytes": size_bytes,
    }

gsp1000_to_hdf5(gsp_dir, mask_path, output_dir, subjects_per_chunk=10, *, max_subjects=None, overwrite=False)

Convert GSP1000 functional data to Lacuna-compatible HDF5 chunks.

Scans a directory of functional NIfTI files from the GSP1000 dataset, extracts time-series from within a brain mask, and saves the data into multiple smaller HDF5 chunk files for efficient analysis.

Expected GSP1000 directory structure: gsp_dir/ └── sub-*/ └── func/ └── *bld001_rest_*_finalmask.nii.gz

Parameters:

Name Type Description Default
gsp_dir str | Path

Path to the GSP1000 dataset directory

required
mask_path str | Path

Path to MNI152 brain mask (.nii.gz)

required
output_dir str | Path

Directory where chunk HDF5 files will be saved

required
subjects_per_chunk int

Number of subjects to include in each chunk file

10
max_subjects int

Maximum number of subjects to process. If set, only the first max_subjects files are used. Useful for test mode.

None
overwrite bool

Whether to overwrite existing chunk files

False

Returns:

Type Description
list[Path]

List of created chunk file paths

Raises:

Type Description
FileNotFoundError

If GSP directory or mask file not found

ValueError

If no matching NIfTI files found in GSP directory

Examples:

>>> chunk_files = gsp1000_to_hdf5(
...     gsp_dir="/data/GSP1000",
...     mask_path="/data/templates/MNI152_T1_2mm_Brain_Mask.nii.gz",
...     output_dir="/data/connectomes/gsp1000_chunks",
...     subjects_per_chunk=10
... )
>>> print(f"Created {len(chunk_files)} chunk files")
Notes
  • Each chunk file is self-contained with all necessary metadata
  • Timeseries are NOT preprocessed (demeaning, variance normalization) to preserve raw data - preprocessing happens during analysis
  • HDF5 files use chunking (1, n_timepoints, n_voxels) for efficient subject-wise access
Source code in src/lacuna/io/convert.py
def gsp1000_to_hdf5(
    gsp_dir: str | Path,
    mask_path: str | Path,
    output_dir: str | Path,
    subjects_per_chunk: int = 10,
    *,
    max_subjects: int | None = None,
    overwrite: bool = False,
) -> list[Path]:
    """
    Convert GSP1000 functional data to Lacuna-compatible HDF5 chunks.

    Scans a directory of functional NIfTI files from the GSP1000 dataset,
    extracts time-series from within a brain mask, and saves the data into
    multiple smaller HDF5 chunk files for efficient analysis.

    Expected GSP1000 directory structure:
        gsp_dir/
        └── sub-*/
            └── func/
                └── *bld001_rest_*_finalmask.nii.gz

    Parameters
    ----------
    gsp_dir : str | Path
        Path to the GSP1000 dataset directory
    mask_path : str | Path
        Path to MNI152 brain mask (.nii.gz)
    output_dir : str | Path
        Directory where chunk HDF5 files will be saved
    subjects_per_chunk : int, default=10
        Number of subjects to include in each chunk file
    max_subjects : int, optional
        Maximum number of subjects to process. If set, only the first
        ``max_subjects`` files are used. Useful for test mode.
    overwrite : bool, default=False
        Whether to overwrite existing chunk files

    Returns
    -------
    list[Path]
        List of created chunk file paths

    Raises
    ------
    FileNotFoundError
        If GSP directory or mask file not found
    ValueError
        If no matching NIfTI files found in GSP directory

    Examples
    --------
    >>> chunk_files = gsp1000_to_hdf5(
    ...     gsp_dir="/data/GSP1000",
    ...     mask_path="/data/templates/MNI152_T1_2mm_Brain_Mask.nii.gz",
    ...     output_dir="/data/connectomes/gsp1000_chunks",
    ...     subjects_per_chunk=10
    ... )
    >>> print(f"Created {len(chunk_files)} chunk files")

    Notes
    -----
    - Each chunk file is self-contained with all necessary metadata
    - Timeseries are NOT preprocessed (demeaning, variance normalization)
      to preserve raw data - preprocessing happens during analysis
    - HDF5 files use chunking (1, n_timepoints, n_voxels) for efficient
      subject-wise access
    """
    gsp_dir = Path(gsp_dir)
    mask_path = Path(mask_path)
    output_dir = Path(output_dir)

    # Validate inputs
    if not gsp_dir.exists():
        raise FileNotFoundError(f"GSP directory not found: {gsp_dir}")
    if not mask_path.exists():
        raise FileNotFoundError(f"Mask file not found: {mask_path}")

    # Find all functional NIfTI files
    search_pattern = str(gsp_dir / "sub-*" / "func" / "*bld001_rest_*_finalmask.nii.gz")
    all_subject_files = sorted(glob.glob(search_pattern))

    if not all_subject_files:
        raise ValueError(
            f"No NIfTI files found matching pattern: {search_pattern}\n"
            "Expected GSP1000 structure: sub-*/func/*bld001_rest_*_finalmask.nii.gz"
        )

    if max_subjects is not None and len(all_subject_files) > max_subjects:
        all_subject_files = all_subject_files[:max_subjects]

    n_total_subjects = len(all_subject_files)
    print(f"Found {n_total_subjects} subject files")

    # Load brain mask metadata once
    print(f"Loading brain mask from: {mask_path}")
    mask_img = nib.load(mask_path)
    mask_data = mask_img.get_fdata().astype(bool)
    mask_affine = mask_img.affine
    in_mask_indices = np.where(mask_data)
    n_voxels = len(in_mask_indices[0])

    # Get number of timepoints from first subject
    first_img = nib.load(all_subject_files[0])
    n_timepoints = first_img.shape[3]

    print(f"Mask contains {n_voxels:,} in-brain voxels")
    print(f"Detected {n_timepoints} timepoints per subject")

    # Split subjects into chunks
    subject_chunks = [
        all_subject_files[i : i + subjects_per_chunk]
        for i in range(0, n_total_subjects, subjects_per_chunk)
    ]
    print(f"Data will be split into {len(subject_chunks)} chunk files")

    # Create output directory
    output_dir.mkdir(parents=True, exist_ok=True)

    # Process each chunk
    created_files = []
    for chunk_idx, chunk_files in enumerate(tqdm(subject_chunks, desc="Processing chunks")):
        chunk_filename = output_dir / f"gsp1000_chunk_{chunk_idx:03d}.h5"

        if chunk_filename.exists() and not overwrite:
            print(f"  Skipping existing chunk: {chunk_filename.name}")
            created_files.append(chunk_filename)
            continue

        n_subjects_in_chunk = len(chunk_files)

        with h5py.File(chunk_filename, "w") as hf:
            # Create timeseries dataset with chunking for efficient access
            timeseries_dset = hf.create_dataset(
                "timeseries",
                shape=(n_subjects_in_chunk, n_timepoints, n_voxels),
                dtype=np.float32,
                chunks=(1, n_timepoints, n_voxels),
                compression="gzip",
                compression_opts=1,  # Minimal compression for speed
            )

            # Store metadata (makes each chunk self-contained)
            hf.create_dataset("mask_indices", data=np.vstack(in_mask_indices).T)
            hf.create_dataset("mask_affine", data=mask_affine)

            # Attributes
            hf.attrs["n_subjects"] = n_subjects_in_chunk
            hf.attrs["n_timepoints"] = n_timepoints
            hf.attrs["n_voxels"] = n_voxels
            hf.attrs["mask_shape"] = mask_data.shape
            hf.attrs["space"] = "MNI152_2mm"
            hf.attrs["description"] = f"GSP1000 functional connectome chunk {chunk_idx}"
            hf.attrs["source"] = "Harvard Dataverse doi:10.7910/DVN/ILXIKS"

            # Process subjects in this chunk
            for subj_idx, file_path in enumerate(
                tqdm(
                    chunk_files,
                    desc=f"  Chunk {chunk_idx + 1}/{len(subject_chunks)}",
                    leave=False,
                )
            ):
                # Load 4D functional data
                func_img = nib.load(file_path)
                func_data = func_img.get_fdata()

                # Extract timeseries from masked voxels and transpose
                # Shape: (n_timepoints, n_voxels)
                subject_timeseries = func_data[in_mask_indices].T

                # Store in HDF5
                timeseries_dset[subj_idx, :, :] = subject_timeseries

        created_files.append(chunk_filename)

    print("\n✅ Conversion complete!")
    print(f"Created {len(created_files)} chunk files in: {output_dir}")

    return created_files

list_fetchable_connectomes()

List all connectomes available for fetching.

Returns:

Type Description
list of ConnectomeSource

Available connectome sources with metadata.

Examples:

>>> from lacuna.io import list_fetchable_connectomes
>>> for source in list_fetchable_connectomes():
...     print(f"{source.name}: {source.display_name}")
Source code in src/lacuna/io/fetch.py
def list_fetchable_connectomes() -> list[ConnectomeSource]:
    """
    Return every connectome source that can be fetched.

    Returns
    -------
    list of ConnectomeSource
        A new list holding the values of the downloader registry
        (``CONNECTOME_SOURCES``), in the registry's iteration order.

    Examples
    --------
    >>> from lacuna.io import list_fetchable_connectomes
    >>> for source in list_fetchable_connectomes():
    ...     print(f"{source.name}: {source.display_name}")
    """
    # Imported at call time rather than at module import time.
    from .downloaders import CONNECTOME_SOURCES

    registered_sources = CONNECTOME_SOURCES.values()
    return [*registered_sources]

load_bids_dataset(bids_root, pattern='*', suffix='_mask.nii.gz', recursive=True, space=None, resolution=None, subjects=None)

Load mask files from a BIDS dataset using pattern matching.

This function finds all files matching the pattern and suffix in the BIDS dataset structure and loads them as SubjectData objects. No external BIDS validation library (pybids) is required.

Parameters:

Name Type Description Default
bids_root str or Path

Path to BIDS dataset root directory (or any directory containing masks).

required
pattern str

Glob/fnmatch pattern to filter files. Matched against the full filename (without path). Examples: - "*" : All mask files - "CAS001*" : All masks for subject CAS001 - "*ses-01*" : All session 01 masks - "*acuteinfarct*" : All acute infarct masks - "CAS001*ses-01*acuteinfarct" : Specific subject, session, and label

"*"
suffix str

File suffix to search for. Common options: - "_mask.nii.gz" : Standard BIDS mask suffix - "_mask.nii" : Uncompressed masks - ".nii.gz" : Any NIfTI file

"_mask.nii.gz"
recursive bool

If True, search recursively in subdirectories.

True
space str or None

Coordinate space for loaded masks. If None, attempts to detect from filename (_space-XXX) or sidecar JSON. If detection fails and space is not provided, a warning is emitted and the file is skipped. Supported spaces: MNI152NLin6Asym, MNI152NLin2009cAsym

None
resolution float or None

Voxel resolution in mm. If None, attempts to detect from filename (_res-X) or sidecar JSON.

None
subjects list of str

List of subject IDs to include (without 'sub-' prefix). If provided, only files from these subjects will be loaded. This is more efficient than loading all subjects and filtering afterward.

None

Returns:

Type Description
dict of str -> SubjectData

Dictionary mapping filenames (without suffix) to SubjectData objects.

Raises:

Type Description
FileNotFoundError

If bids_root doesn't exist.

BidsError

If no matching files are found.

Examples:

Load all masks in a BIDS dataset:

>>> dataset = load_bids_dataset('/data/METAVCI_PSCI_BIDS')
>>> print(f"Loaded {len(dataset)} masks")

Load specific subject:

>>> dataset = load_bids_dataset(
...     '/data/METAVCI_PSCI_BIDS',
...     pattern="CAS001*"
... )

Load specific session and label:

>>> dataset = load_bids_dataset(
...     '/data/METAVCI_PSCI_BIDS',
...     pattern="CAS001*ses-01*acuteinfarct"
... )

Load from a specific subject's anat folder:

>>> dataset = load_bids_dataset(
...     '/data/METAVCI_PSCI_BIDS/sub-CAS001/ses-01/anat',
...     pattern="*WMH*"
... )

Load all WMH masks across all subjects:

>>> dataset = load_bids_dataset(
...     '/data/METAVCI_PSCI_BIDS',
...     pattern="*WMH*"
... )

Load masks with explicit space (when not in filename):

>>> dataset = load_bids_dataset(
...     '/data/METAVCI_PSCI_BIDS',
...     pattern="*CAS005*",
...     space="MNI152NLin6Asym",
...     resolution=2.0
... )
Source code in src/lacuna/io/bids.py
def _strip_nifti_extension(filename: str) -> str:
    """Return ``filename`` without a trailing ``.nii.gz`` or ``.nii`` extension."""
    for extension in (".nii.gz", ".nii"):
        if filename.endswith(extension):
            return filename[: -len(extension)]
    return filename


def _subject_from_path(filepath: Path, bids_root: Path) -> str | None:
    """
    Return the ``sub-<label>`` entity that applies to ``filepath``, or None.

    The part of the path below ``bids_root`` is searched first so that a
    ``sub-`` fragment in a parent directory of the dataset cannot shadow the
    real subject. Only when nothing is found there is the full path consulted,
    taking the match closest to the file (this covers the case where
    ``bids_root`` itself points inside a subject directory).
    """
    import re

    subject_re = re.compile(r"sub-([^/_]+)")

    # as_posix() turns Windows backslashes into "/", so the label stops at the
    # directory boundary instead of swallowing the rest of the path.
    full_path = filepath.as_posix()
    try:
        inside_root = filepath.relative_to(bids_root).as_posix()
    except ValueError:
        inside_root = full_path

    match = subject_re.search(inside_root)
    if match:
        return f"sub-{match.group(1)}"

    labels = subject_re.findall(full_path)
    return f"sub-{labels[-1]}" if labels else None


def load_bids_dataset(
    bids_root: str | Path,
    pattern: str = "*",
    suffix: str = "_mask.nii.gz",
    recursive: bool = True,
    space: str | None = None,
    resolution: float | None = None,
    subjects: list[str] | None = None,
) -> dict[str, SubjectData]:
    """
    Load mask files from a BIDS dataset using pattern matching.

    This function finds all files matching the pattern and suffix in the BIDS
    dataset structure and loads them as SubjectData objects. No external BIDS
    validation library (pybids) is required.

    Parameters
    ----------
    bids_root : str or Path
        Path to BIDS dataset root directory (or any directory containing masks).
    pattern : str, default="*"
        Glob/fnmatch pattern to filter files. Matched against the filename
        (without path and without the NIfTI extension); the pattern may match
        anywhere in the name. Examples:
        - "*" : All mask files
        - "CAS001*" : All masks for subject CAS001
        - "*ses-01*" : All session 01 masks
        - "*acuteinfarct*" : All acute infarct masks
        - "CAS001*ses-01*acuteinfarct" : Specific subject, session, and label
    suffix : str, default="_mask.nii.gz"
        File suffix to search for. Common options:
        - "_mask.nii.gz" : Standard BIDS mask suffix
        - "_mask.nii" : Uncompressed masks
        - ".nii.gz" : Any NIfTI file
    recursive : bool, default=True
        If True, search recursively in subdirectories.
    space : str or None, default=None
        Coordinate space for loaded masks. If None, attempts to detect from
        filename (_space-XXX) or sidecar JSON. If detection fails and space
        is not provided, a warning is emitted and the file is skipped.
        Supported spaces: MNI152NLin6Asym, MNI152NLin2009cAsym
    resolution : float or None, default=None
        Voxel resolution in mm. If None, attempts to detect from filename
        (_res-X) or sidecar JSON.
    subjects : list of str, optional
        List of subject IDs to include, with or without the 'sub-' prefix.
        A single ID passed as a plain string is treated as a one-element
        list. If provided, only files from these subjects will be loaded.
        This is more efficient than loading all subjects and filtering
        afterward.

    Returns
    -------
    dict of str -> SubjectData
        Dictionary mapping filenames (without suffix) to SubjectData objects.

    Raises
    ------
    FileNotFoundError
        If bids_root doesn't exist.
    BidsError
        If no matching files are found, or none of the matching files could
        be loaded.

    Examples
    --------
    Load all masks in a BIDS dataset:

    >>> dataset = load_bids_dataset('/data/METAVCI_PSCI_BIDS')
    >>> print(f"Loaded {len(dataset)} masks")

    Load specific subject:

    >>> dataset = load_bids_dataset(
    ...     '/data/METAVCI_PSCI_BIDS',
    ...     pattern="CAS001*"
    ... )

    Load specific session and label:

    >>> dataset = load_bids_dataset(
    ...     '/data/METAVCI_PSCI_BIDS',
    ...     pattern="CAS001*ses-01*acuteinfarct"
    ... )

    Load from a specific subject's anat folder:

    >>> dataset = load_bids_dataset(
    ...     '/data/METAVCI_PSCI_BIDS/sub-CAS001/ses-01/anat',
    ...     pattern="*WMH*"
    ... )

    Load all WMH masks across all subjects:

    >>> dataset = load_bids_dataset(
    ...     '/data/METAVCI_PSCI_BIDS',
    ...     pattern="*WMH*"
    ... )

    Load masks with explicit space (when not in filename):

    >>> dataset = load_bids_dataset(
    ...     '/data/METAVCI_PSCI_BIDS',
    ...     pattern="*CAS005*",
    ...     space="MNI152NLin6Asym",
    ...     resolution=2.0
    ... )
    """
    bids_root = Path(bids_root)

    # Check if path exists
    if not bids_root.exists():
        raise FileNotFoundError(f"Directory not found: {bids_root}")

    # Find all files carrying the requested suffix
    finder = bids_root.rglob if recursive else bids_root.glob
    all_files = list(finder(f"*{suffix}"))

    # Filter by pattern. "*" also matches the empty string, so "*{pattern}*"
    # already covers the exact, prefix-only and suffix-only forms.
    wildcard_pattern = f"*{pattern}*"
    pattern_matches = [
        filepath
        for filepath in all_files
        if fnmatch.fnmatch(_strip_nifti_extension(filepath.name), wildcard_pattern)
    ]
    matching_files = pattern_matches

    # Filter by subject IDs if specified (before loading for efficiency)
    if subjects:
        # A bare string would otherwise be iterated character by character.
        if isinstance(subjects, str):
            subjects = [subjects]

        # Normalize subject IDs (handle with/without 'sub-' prefix)
        normalized_subjects = {
            subj if subj.startswith("sub-") else f"sub-{subj}" for subj in subjects
        }
        matching_files = [
            filepath
            for filepath in pattern_matches
            if _subject_from_path(filepath, bids_root) in normalized_subjects
        ]

    if not matching_files:
        subject_msg = f" for subjects {subjects}" if subjects else ""
        # Build diagnostic message
        diag_parts = [
            f"No files matching pattern '{pattern}' with suffix '{suffix}'{subject_msg} "
            f"found in: {bids_root}",
            f"Searched {'recursively' if recursive else 'non-recursively'}.",
        ]
        if subjects and all_files:
            # Files exist but were filtered out — show what was found
            n_pattern = len(pattern_matches)
            diag_parts.append(
                f"Found {len(all_files)} file(s) with suffix '{suffix}', "
                f"{n_pattern} matched pattern '{pattern}', "
                f"but none matched subjects {subjects}."
            )
            # Show sample filenames to help debugging
            sample = [f.name for f in all_files[:5]]
            diag_parts.append(f"Sample files found: {sample}")
        elif not all_files:
            diag_parts.append(f"No files with suffix '{suffix}' exist under {bids_root}.")
        raise BidsError("\n".join(diag_parts))

    # Load each file as SubjectData
    mask_data_dict = {}

    for filepath in sorted(matching_files):
        # Create key from filename (without NIfTI extension)
        filename = filepath.name
        key = _strip_nifti_extension(filename)

        # Build metadata from BIDS entities in filename
        metadata = _parse_bids_entities(filename)
        metadata["source_path"] = str(filepath)
        metadata["bids_root"] = str(bids_root)

        # Parse sidecar JSON if available
        sidecar_data = _parse_sidecar(filepath)

        # Get space: function parameter > sidecar JSON > filename entity
        file_space = (
            space  # Function parameter takes precedence
            or sidecar_data.get("Space")
            or sidecar_data.get("space")
            or metadata.get("space")
        )

        # Get resolution: function parameter > sidecar JSON > filename entity
        file_resolution = _parse_resolution(
            resolution  # Function parameter takes precedence
            or sidecar_data.get("Resolution")
            or sidecar_data.get("resolution")
            or metadata.get("resolution")
        )

        # Best-effort loading: a file that cannot be loaded is reported via a
        # warning and skipped so the remaining files are still returned.
        try:
            mask_data = SubjectData.from_nifti(
                mask_path=filepath,
                metadata=metadata,
                space=file_space,
                resolution=file_resolution,
            )
            mask_data_dict[key] = mask_data
        except Exception as e:
            warnings.warn(
                f"Failed to load {filepath}: {e}",
                UserWarning,
                stacklevel=2,
            )

    if not mask_data_dict:
        raise BidsError(
            f"No valid mask files could be loaded from: {bids_root}\n"
            f"Pattern: '{pattern}', Suffix: '{suffix}'"
        )

    return mask_data_dict

merge_trk_to_tck(source_dir, output_path, *, exclude_patterns=None, overwrite=False)

Merge multiple TrackVis .trk/.trk.gz tractograms into a single MRtrix3 .tck file.

Recursively finds all .trk and .trk.gz files in the source directory, loads their streamlines (excluding files matching specified patterns), and saves them as a single merged .tck tractogram.

Parameters:

Name Type Description Default
source_dir str | Path

Directory containing .trk/.trk.gz tract files (searched recursively).

required
output_path str | Path

Output path for the merged .tck file.

required
exclude_patterns list[str]

List of patterns to match against file paths for exclusion. Files whose path contains any of these strings (case-insensitive) are skipped. Default: ["cranial nerve", "cranial_nerve"].

None
overwrite bool

Whether to overwrite an existing output file.

False

Returns:

Type Description
Path

Path to the created .tck file.

Raises:

Type Description
FileNotFoundError

If source directory not found.

ValueError

If no .trk/.trk.gz files found or output is not .tck format.

RuntimeError

If merging fails.

Examples:

>>> tck_path = merge_trk_to_tck(
...     source_dir="/data/hcp1065_tracts",
...     output_path="/data/hcp1065.tck",
... )
Source code in src/lacuna/io/convert.py
def merge_trk_to_tck(
    source_dir: str | Path,
    output_path: str | Path,
    *,
    exclude_patterns: list[str] | None = None,
    overwrite: bool = False,
) -> Path:
    """
    Combine every TrackVis tractogram under a directory into one MRtrix3 .tck file.

    All ``.trk.gz`` and ``.trk`` files below ``source_dir`` are collected
    recursively, files whose path contains an excluded substring are dropped,
    and the streamlines of the remaining files are concatenated and written
    to ``output_path``.

    Parameters
    ----------
    source_dir : str | Path
        Directory containing .trk/.trk.gz tract files (searched recursively).
    output_path : str | Path
        Output path for the merged .tck file.
    exclude_patterns : list[str], optional
        Substrings matched case-insensitively against each file path; a file
        whose path contains any of them is skipped.
        Default: ``["cranial nerve", "cranial_nerve"]``.
    overwrite : bool, default=False
        Whether to overwrite an existing output file. When False and the
        output already exists, the existing path is returned untouched.

    Returns
    -------
    Path
        Path to the created (or already existing) .tck file.

    Raises
    ------
    FileNotFoundError
        If source directory not found.
    ValueError
        If no .trk/.trk.gz files found, all of them are excluded, or the
        output is not .tck format.
    RuntimeError
        If no streamlines could be loaded or saving fails.

    Examples
    --------
    >>> tck_path = merge_trk_to_tck(
    ...     source_dir="/data/hcp1065_tracts",
    ...     output_path="/data/hcp1065.tck",
    ... )
    """
    from nibabel.streamlines import TckFile, Tractogram

    source_dir = Path(source_dir)
    output_path = Path(output_path)

    if exclude_patterns is None:
        exclude_patterns = ["cranial nerve", "cranial_nerve"]

    # --- argument validation -------------------------------------------
    if not source_dir.exists():
        raise FileNotFoundError(f"Source directory not found: {source_dir}")

    if output_path.suffix != ".tck":
        raise ValueError(f"Output must be .tck format, got: {output_path.suffix}")

    if not overwrite and output_path.exists():
        print(f"Output file already exists: {output_path}")
        return output_path

    # --- discovery: compressed files first, each group sorted ----------
    trk_files = [
        *sorted(source_dir.rglob("*.trk.gz")),
        *sorted(source_dir.rglob("*.trk")),
    ]
    if not trk_files:
        raise ValueError(
            f"No .trk or .trk.gz files found in: {source_dir}\n"
            "Expected directory containing tractography files."
        )

    # --- exclusion (case-insensitive substring match on the path) ------
    exclude_lower = [needle.lower() for needle in exclude_patterns]

    def _is_excluded(candidate: Path) -> bool:
        lowered = str(candidate).lower()
        return any(needle in lowered for needle in exclude_lower)

    filtered_files = [candidate for candidate in trk_files if not _is_excluded(candidate)]
    if not filtered_files:
        raise ValueError(
            f"All {len(trk_files)} tract files were excluded by patterns: {exclude_patterns}"
        )

    print(
        f"Found {len(filtered_files)} tract files ({len(trk_files) - len(filtered_files)} excluded)"
    )

    # --- load and concatenate ------------------------------------------
    all_streamlines = []
    files_processed = 0

    print("Loading and merging streamlines...")
    for trk_path in tqdm(filtered_files, desc="Merging tracts"):
        # A file that fails to load is reported and skipped, not fatal.
        try:
            loaded = nib.streamlines.load(str(trk_path))
            all_streamlines.extend(loaded.streamlines)
            files_processed += 1
        except Exception as e:
            print(f"  Warning: Error loading {trk_path.name}: {e}")

    if not all_streamlines:
        raise RuntimeError("No streamlines loaded from any tract file.")

    print(f"Processed {files_processed} files, {len(all_streamlines)} total streamlines")

    # --- write ----------------------------------------------------------
    output_path.parent.mkdir(parents=True, exist_ok=True)

    print(f"Saving merged tractogram to {output_path}...")
    try:
        merged = Tractogram(
            streamlines=all_streamlines,
            affine_to_rasmm=np.eye(4),
        )
        TckFile(merged).save(str(output_path))
    except Exception as e:
        raise RuntimeError(f"Failed to save merged tractogram: {e}") from e

    print(f"Merge complete: {output_path}")
    return output_path

save_nifti(mask_data, output_path, save_anatomical=False)

Save lesion mask to NIfTI file.

Parameters:

Name Type Description Default
mask_data SubjectData

Lesion data to save.

required
output_path str or Path

Path for output NIfTI file (e.g., 'lesion.nii.gz').

required
save_anatomical bool

Also save anatomical image (if present) to adjacent file.

False

Raises:

Type Description
ValueError

If output_path doesn't have .nii or .nii.gz extension.

Examples:

>>> save_nifti(mask_data, 'output/lesion.nii.gz')
>>> save_nifti(mask_data, 'output/lesion.nii.gz', save_anatomical=True)
Source code in src/lacuna/io/bids.py
def save_nifti(
    mask_data: SubjectData, output_path: str | Path, save_anatomical: bool = False
) -> None:
    """
    Save lesion mask to NIfTI file.

    Parameters
    ----------
    mask_data : SubjectData
        Lesion data to save; its ``mask_img`` attribute is what gets written.
    output_path : str or Path
        Path for output NIfTI file (e.g., 'lesion.nii.gz'). Missing parent
        directories are created.
    save_anatomical : bool, default=False
        Intended to also save the anatomical image to an adjacent file.
        NOTE: this function does not currently act on the flag; only the
        lesion mask is written regardless of its value.

    Raises
    ------
    ValueError
        If output_path doesn't end in .nii or .nii.gz.

    Examples
    --------
    >>> save_nifti(mask_data, 'output/lesion.nii.gz')
    >>> save_nifti(mask_data, 'output/lesion.nii.gz', save_anatomical=True)
    """
    output_path = Path(output_path)

    # Validate against the whole filename: Path.suffix is ".gz" for every
    # gzip file, so checking it alone would let "x.tar.gz" or "x.gz" through.
    if not output_path.name.endswith((".nii", ".nii.gz")):
        raise ValueError(
            f"Output path must have .nii or .nii.gz extension, got: {output_path.name}"
        )

    # Imported only after the arguments are known to be valid.
    import nibabel as nib

    # Create parent directory if needed
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Save lesion mask
    nib.save(mask_data.mask_img, output_path)

trk_to_tck(trk_path, output_path, *, overwrite=False)

Convert TrackVis .trk tractogram to MRtrix3 .tck format using nibabel.

This conversion is necessary because StructuralNetworkMapping uses MRtrix3 tools (tckedit, tckmap, mrcalc) which require .tck format. The default dTOR985 tractogram is distributed in .trk format.

Uses nibabel's streamlines module for pure Python conversion without requiring MRtrix3 to be installed.

Parameters:

Name Type Description Default
trk_path str | Path

Path to input TrackVis .trk file (e.g., dTOR985.trk)

required
output_path str | Path

Output path for MRtrix3 .tck file

required
overwrite bool

Whether to overwrite existing output file

False

Returns:

Type Description
Path

Path to created .tck file

Raises:

Type Description
FileNotFoundError

If trk file not found

ValueError

If input is not .trk or output is not .tck format

RuntimeError

If conversion fails

Examples:

>>> # Convert dTOR985 tractogram
>>> tck_path = trk_to_tck(
...     trk_path="/data/dTOR985.trk",
...     output_path="/data/dTOR985.tck"
... )
>>>
>>> # Later use in analysis:
>>> analysis = StructuralNetworkMapping(tractogram_path="/data/dTOR985.tck")
Notes
  • Uses nibabel for pure Python conversion (no external dependencies)
  • Preserves streamline coordinates and header information
  • The .tck file can be much larger than .trk due to format differences
  • For dTOR985: expect ~5-10GB .tck file from ~2GB .trk file
See Also

nibabel.streamlines: https://nipy.org/nibabel/reference/nibabel.streamlines.html

Source code in src/lacuna/io/convert.py
def trk_to_tck(
    trk_path: str | Path,
    output_path: str | Path,
    *,
    overwrite: bool = False,
) -> Path:
    """
    Convert a TrackVis .trk tractogram into an MRtrix3 .tck file with nibabel.

    StructuralNetworkMapping relies on MRtrix3 tools (tckedit, tckmap, mrcalc)
    that only read .tck, while the default dTOR985 tractogram ships as .trk.
    The conversion is done with nibabel's streamlines module, so MRtrix3 does
    not have to be installed.

    Parameters
    ----------
    trk_path : str | Path
        Path to input TrackVis .trk file (e.g., dTOR985.trk)
    output_path : str | Path
        Output path for MRtrix3 .tck file
    overwrite : bool, default=False
        Whether to overwrite existing output file. When False and the output
        already exists, the existing path is returned without converting.

    Returns
    -------
    Path
        Path to created (or already existing) .tck file

    Raises
    ------
    FileNotFoundError
        If trk file not found
    ValueError
        If input is not .trk or output is not .tck format
    RuntimeError
        If conversion fails

    Examples
    --------
    >>> # Convert dTOR985 tractogram
    >>> tck_path = trk_to_tck(
    ...     trk_path="/data/dTOR985.trk",
    ...     output_path="/data/dTOR985.tck"
    ... )
    >>>
    >>> # Later use in analysis:
    >>> analysis = StructuralNetworkMapping(tractogram_path="/data/dTOR985.tck")

    Notes
    -----
    - Uses nibabel for pure Python conversion (no external tools required)
    - The tractogram loaded from the .trk file is handed to the .tck writer
      unchanged
    - The .tck file can be much larger than .trk due to format differences
    - For dTOR985: expect ~5-10GB .tck file from ~2GB .trk file

    See Also
    --------
    nibabel.streamlines: https://nipy.org/nibabel/reference/nibabel.streamlines.html
    """
    from nibabel.streamlines import TckFile, TrkFile

    source = Path(trk_path)
    target = Path(output_path)

    # Extension checks come first, before touching the filesystem.
    source_ext = source.suffix
    if source_ext != ".trk":
        raise ValueError(
            f"Input must be .trk format, got: {source_ext}\n"
            "Expected TrackVis .trk file (e.g., dTOR985.trk)"
        )

    target_ext = target.suffix
    if target_ext != ".tck":
        raise ValueError(
            f"Output must be .tck format, got: {target_ext}\n"
            "MRtrix3 tools require .tck format"
        )

    if not source.exists():
        raise FileNotFoundError(f"TRK file not found: {source}")

    # An existing result is reused unless the caller asked to overwrite it.
    if not overwrite and target.exists():
        print(f"Output file already exists: {target}")
        return target

    for banner_line in (
        "🚀 Converting .trk to .tck format...",
        f"Input:  {source}",
        f"Output: {target}",
    ):
        print(banner_line)

    # Make sure the destination directory exists
    target.parent.mkdir(parents=True, exist_ok=True)

    try:
        print("Loading .trk file...")
        loaded_trk = TrkFile.load(str(source))

        print("Creating .tck file...")
        tck_writer = TckFile(tractogram=loaded_trk.tractogram)

        print("Writing to disk...")
        tck_writer.save(str(target))

    except Exception as e:
        # Any failure in load/convert/save surfaces as one error type.
        raise RuntimeError(
            f"Conversion failed: {e}\n"
            "Check that .trk file is valid and nibabel is properly installed."
        ) from e

    print(f"✅ Conversion complete: {target}")
    print("Note: Keep .tck file for StructuralNetworkMapping analyses")

    return target

validate_bids_derivatives(derivatives_dir, raise_on_error=True)

Validate BIDS derivatives directory structure.

Checks that a derivatives directory follows BIDS specifications: - Has dataset_description.json - SubjectData directories follow naming conventions - Files follow BIDS naming patterns - Required metadata is present

Parameters:

Name Type Description Default
derivatives_dir str or Path

Path to derivatives directory (e.g., 'derivatives/lacuna-v0.1.0')

required
raise_on_error bool

If True, raises BidsError on validation failure. If False, returns errors as list.

True

Returns:

Type Description
dict[str, list[str]]

Dictionary with validation results: - 'errors': List of error messages (MUST fix) - 'warnings': List of warning messages (SHOULD fix) Empty lists indicate passing validation.

Raises:

Type Description
BidsError

If validation fails and raise_on_error=True

FileNotFoundError

If derivatives_dir doesn't exist

Examples:

>>> from lacuna.io import validate_bids_derivatives
>>>
>>> # Validate after export
>>> validate_bids_derivatives('derivatives/lacuna-v0.1.0')
{'errors': [], 'warnings': []}
>>>
>>> # Check without raising exceptions
>>> result = validate_bids_derivatives('derivatives/lacuna-v0.1.0', raise_on_error=False)
>>> if result['errors']:
...     print(f"Found {len(result['errors'])} errors")
Notes

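Validation checks: - dataset_description.json exists and is valid JSON - Contains required fields: Name, BIDSVersion, GeneratedBy - SubjectData directories match pattern: sub-<label>[/ses-<label>] - File naming follows BIDS conventions - No unexpected files in root directory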

Source code in src/lacuna/io/bids.py
def validate_bids_derivatives(
    derivatives_dir: str | Path,
    raise_on_error: bool = True,
) -> dict[str, list[str]]:
    """
    Validate BIDS derivatives directory structure.

    Checks that a derivatives directory follows BIDS specifications:
    - Has dataset_description.json
    - SubjectData directories follow naming conventions
    - Files follow BIDS naming patterns
    - Required metadata is present

    Parameters
    ----------
    derivatives_dir : str or Path
        Path to derivatives directory (e.g., 'derivatives/lacuna-v0.1.0')
    raise_on_error : bool, default=True
        If True, raises BidsError on validation failure.
        If False, returns errors as list.

    Returns
    -------
    dict[str, list[str]]
        Dictionary with validation results:
        - 'errors': List of error messages (MUST fix)
        - 'warnings': List of warning messages (SHOULD fix)
        Empty lists indicate passing validation.

    Raises
    ------
    BidsError
        If validation fails and raise_on_error=True
    FileNotFoundError
        If derivatives_dir doesn't exist

    Examples
    --------
    >>> from lacuna.io import validate_bids_derivatives
    >>>
    >>> # Validate after export
    >>> validate_bids_derivatives('derivatives/lacuna-v0.1.0')
    {'errors': [], 'warnings': []}
    >>>
    >>> # Check without raising exceptions
    >>> result = validate_bids_derivatives('derivatives/lacuna-v0.1.0', raise_on_error=False)
    >>> if result['errors']:
    ...     print(f"Found {len(result['errors'])} errors")

    Notes
    -----
    Validation checks:
    - dataset_description.json exists and is valid JSON
    - Contains required fields: Name, BIDSVersion, GeneratedBy
    - SubjectData directories match pattern: sub-<label>[/ses-<label>]
    - File naming follows BIDS conventions
    - No unexpected files in root directory
    """
    derivatives_dir = Path(derivatives_dir)
    errors = []
    warnings_list = []

    # Check directory exists
    if not derivatives_dir.exists():
        raise FileNotFoundError(f"Derivatives directory not found: {derivatives_dir}")

    if not derivatives_dir.is_dir():
        errors.append(f"Path is not a directory: {derivatives_dir}")
        if raise_on_error:
            raise BidsError("Validation failed:\n" + "\n".join(errors))
        return {"errors": errors, "warnings": warnings_list}

    # Check for dataset_description.json
    desc_file = derivatives_dir / "dataset_description.json"
    if not desc_file.exists():
        errors.append(
            "Missing required file: dataset_description.json\n"
            "This file is required for BIDS derivatives."
        )
    else:
        # Validate dataset_description.json content
        try:
            with open(desc_file) as f:
                desc_data = json.load(f)

            # Check required fields
            required_fields = ["Name", "BIDSVersion", "GeneratedBy"]
            for field in required_fields:
                if field not in desc_data:
                    errors.append(f"dataset_description.json missing required field: '{field}'")

            # Check GeneratedBy structure if present
            if "GeneratedBy" in desc_data:
                if not isinstance(desc_data["GeneratedBy"], list):
                    errors.append("dataset_description.json: 'GeneratedBy' must be a list")
                elif desc_data["GeneratedBy"]:
                    # Check first entry has required fields
                    gen_by = desc_data["GeneratedBy"][0]
                    if not isinstance(gen_by, dict):
                        errors.append(
                            "dataset_description.json: GeneratedBy entries must be objects"
                        )
                    elif "Name" not in gen_by:
                        warnings_list.append(
                            "dataset_description.json: GeneratedBy entry should have 'Name' field"
                        )

        except json.JSONDecodeError as e:
            errors.append(f"dataset_description.json is not valid JSON: {e}")
        except Exception as e:
            errors.append(f"Error reading dataset_description.json: {e}")

    # Check subject directories
    subject_dirs = [d for d in derivatives_dir.iterdir() if d.is_dir()]

    if not subject_dirs:
        warnings_list.append("No subject directories found in derivatives")
    else:
        for subj_dir in subject_dirs:
            subj_name = subj_dir.name

            # Check subject directory naming
            if not subj_name.startswith("sub-"):
                # Skip non-subject directories (like sourcedata, code)
                if subj_name not in ["sourcedata", "code", ".git"]:
                    warnings_list.append(
                        f"Directory '{subj_name}' doesn't follow BIDS naming "
                        f"(should start with 'sub-')"
                    )
                continue

            # Check for expected subdirectories (all outputs go to anat/ per BIDS spec)
            expected_subdirs = ["anat", "func", "dwi"]
            has_subdirs = any((subj_dir / sd).exists() for sd in expected_subdirs)

            if not has_subdirs:
                warnings_list.append(
                    f"SubjectData '{subj_name}' has no standard BIDS subdirectories "
                    f"(anat, func, dwi)"
                )

            # Check for session subdirectories
            session_dirs = [
                d for d in subj_dir.iterdir() if d.is_dir() and d.name.startswith("ses-")
            ]
            for ses_dir in session_dirs:
                ses_name = ses_dir.name
                # Validate session naming
                if not ses_name.startswith("ses-"):
                    warnings_list.append(
                        f"Session directory '{ses_name}' in {subj_name} doesn't follow "
                        f"BIDS naming (should start with 'ses-')"
                    )

    # Check for unexpected files in root
    root_files = [f for f in derivatives_dir.iterdir() if f.is_file()]
    expected_root_files = [
        "dataset_description.json",
        "README",
        "README.md",
        "CHANGES",
        "LICENSE",
        ".bidsignore",
    ]

    for root_file in root_files:
        if root_file.name not in expected_root_files:
            warnings_list.append(
                f"Unexpected file in derivatives root: {root_file.name}\n"
                f"Consider moving to a subject directory or removing"
            )

    # Raise error if requested and errors found
    if errors and raise_on_error:
        error_msg = "BIDS derivatives validation failed:\n\nErrors:\n" + "\n".join(
            f"  - {e}" for e in errors
        )
        if warnings_list:
            error_msg += "\n\nWarnings:\n" + "\n".join(f"  - {w}" for w in warnings_list)
        raise BidsError(error_msg)

    return {"errors": errors, "warnings": warnings_list}