Skip to content

main

lacuna.cli.main

Lacuna CLI main module.

This module provides the main entry point for the Lacuna CLI, orchestrating the workflow from argument parsing through analysis execution to output writing.

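Commands: `lacuna fetch` - download and set up connectomes; `lacuna run` - run analyses (rd, fnm, snm, afnm); `lacuna collect` - aggregate results across subjects; `lacuna info` - display available resources. The entry point also routes the `bidsify`, `parcellate`, `tutorial` and `check` commands.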

Functions: main: Main CLI entry point that parses arguments and runs the workflow.

RunConfig dataclass

Configuration for run commands.

Source code in src/lacuna/cli/main.py
@dataclass
class RunConfig:
    """Configuration for run commands.

    Instances are normally built from parsed CLI arguments with
    :meth:`from_args` and then checked with :meth:`validate`.
    """

    # Input path: either a BIDS directory or a single NIfTI file.
    bids_dir: Path
    output_dir: Path
    # Analysis name or alias, e.g. "snm" / "structuralnetworkmapping".
    analysis: str
    participant_label: list[str] | None = None
    session_id: list[str] | None = None
    pattern: str | None = None
    # Populated from --mask-space; mandatory for single-file input.
    space: str | None = None
    # -1 means "all CPUs"; otherwise must be >= 1 (enforced in validate()).
    n_procs: int = -1
    batch_size: int = -1
    tmp_dir: Path | None = None
    overwrite: bool = False
    keep_intermediate: bool = False
    on_empty: str = "warn"  # "warn", "skip", or "error"
    verbose_count: int = 0
    # Analysis-specific options stored as dict
    analysis_options: dict[str, Any] | None = None

    @property
    def is_single_file(self) -> bool:
        """Check if input is a single NIfTI file rather than BIDS directory.

        Only names ending in ``.nii`` or ``.nii.gz`` qualify; other gzip
        files (e.g. ``archive.tar.gz``) are not treated as NIfTI images.
        """
        return self.bids_dir.is_file() and self.bids_dir.name.endswith((".nii", ".nii.gz"))

    @property
    def log_level(self) -> int:
        """Convert verbose_count to log level.

        Starts at 25 and drops by 5 per verbosity step, never below 10.
        """
        return max(25 - 5 * self.verbose_count, 10)

    @property
    def verbose(self) -> bool:
        """Check if verbose output is enabled."""
        return self.verbose_count >= 1

    @classmethod
    def from_args(cls, args: Namespace) -> RunConfig:
        """Create RunConfig from parsed arguments.

        Parameters
        ----------
        args : Namespace
            Parsed CLI arguments. ``bids_dir``, ``output_dir`` and
            ``analysis`` must be present; every other attribute is optional
            and is skipped (or defaulted) when missing.

        Returns
        -------
        RunConfig
            Configuration whose ``analysis_options`` holds the
            analysis-specific settings collected from ``args``.
        """
        snm_names = ("snm", "structuralnetworkmapping")
        fnm_names = ("fnm", "functionalnetworkmapping")
        afnm_names = ("afnm", "acceleratedfunctionalnetworkmapping")
        is_snm = args.analysis in snm_names

        options: dict[str, Any] = {}
        parcel_atlases = getattr(args, "parcel_atlases", None)

        # Common analysis options
        # Note: SNM uses parcellation_name (set below), not parcel_names
        if parcel_atlases and not is_snm:
            options["parcel_names"] = parcel_atlases
        if getattr(args, "custom_parcellation", None):
            options["custom_parcellation"] = args.custom_parcellation
        if getattr(args, "keep_intermediate", None):
            options["keep_intermediate"] = args.keep_intermediate

        # FNM/SNM connectome path - always provided as path
        if getattr(args, "connectome_path", None):
            options["_connectome_path"] = args.connectome_path
        if getattr(args, "method", None):
            options["method"] = args.method
        # Forwarded whenever the attribute exists, even if its value is None.
        if hasattr(args, "pini_percentile"):
            options["pini_percentile"] = args.pini_percentile
        # Handle --no-p-map flag (default is to compute p-map)
        if getattr(args, "no_p_map", None):
            options["compute_p_map"] = False
        if hasattr(args, "fdr_alpha"):
            fdr_alpha = args.fdr_alpha
            # A missing (None) or non-positive alpha is stored as None; the
            # None guard avoids a TypeError from comparing None with 0.
            options["fdr_alpha"] = (
                fdr_alpha if fdr_alpha is not None and fdr_alpha > 0 else None
            )
        if getattr(args, "t_threshold", None) is not None:
            options["t_threshold"] = args.t_threshold
        if getattr(args, "output_resolution", None) is not None:
            options["output_resolution"] = args.output_resolution
        if getattr(args, "no_return_input_space", None):
            options["return_in_input_space"] = False

        # SNM-specific options
        if parcel_atlases and is_snm:
            options["parcellation_name"] = parcel_atlases
        if getattr(args, "compute_disconnectivity_matrix", None):
            options["compute_disconnectivity_matrix"] = True
        if getattr(args, "compute_roi_disconnection", None):
            options["compute_roi_disconnection"] = True
        # Handle --no-cache-tdi flag (default is to cache)
        if getattr(args, "no_cache_tdi", None):
            options["cache_tdi"] = False
        # Pass nprocs to analysis as n_jobs (only when it is not 1)
        nprocs = getattr(args, "nprocs", -1)
        if nprocs != 1 and args.analysis in snm_names + fnm_names:
            options["n_jobs"] = nprocs
        if getattr(args, "show_mrtrix_output", None):
            options["show_mrtrix_output"] = True

        # AFNM-specific options
        if args.analysis in afnm_names:
            if getattr(args, "matrix_path", None) is not None:
                options["matrix_path"] = args.matrix_path
            if getattr(args, "lesion_weighting", None) is not None:
                options["lesion_weighting"] = args.lesion_weighting

        return cls(
            bids_dir=args.bids_dir,
            output_dir=args.output_dir,
            analysis=args.analysis,
            participant_label=getattr(args, "participant_label", None),
            session_id=getattr(args, "session_id", None),
            pattern=getattr(args, "pattern", None),
            space=getattr(args, "mask_space", None),
            n_procs=nprocs,
            batch_size=getattr(args, "batch_size", -1),
            tmp_dir=getattr(args, "tmp_dir", None),
            overwrite=getattr(args, "overwrite", False),
            keep_intermediate=getattr(args, "keep_intermediate", False),
            on_empty=getattr(args, "on_empty", "warn"),
            verbose_count=getattr(args, "verbose_count", 0),
            analysis_options=options,
        )

    def validate(self) -> None:
        """Validate configuration.

        Atlas names found in ``analysis_options`` are checked against the
        parcellation registry and written back as lists.

        Raises
        ------
        ValueError
            If the input path is missing, the output equals the input,
            ``--mask-space`` is absent for single-file input, ``n_procs`` is
            out of range, the SNM flag dependencies are violated, or an atlas
            name is unknown.
        """
        if not self.bids_dir.exists():
            raise ValueError(f"Input path does not exist: {self.bids_dir}")
        if self.output_dir.resolve() == self.bids_dir.resolve():
            raise ValueError("Output directory cannot be same as input path")
        if self.is_single_file and not self.space:
            raise ValueError("--mask-space is required when processing a single NIfTI file")
        if self.n_procs < -1 or self.n_procs == 0:
            raise ValueError(f"--nprocs must be -1 (all CPUs) or >= 1, got {self.n_procs}")

        # analysis_options defaults to None; treat that as "no options" so the
        # membership tests below cannot raise TypeError. Writes only happen
        # when a key is present, i.e. when opts is the real dict.
        opts = self.analysis_options if self.analysis_options is not None else {}

        # SNM flag dependency validation
        if self.analysis in ("snm", "structuralnetworkmapping"):
            has_atlas = "parcellation_name" in opts or opts.get("custom_parcellation")
            has_disconn = opts.get("compute_disconnectivity_matrix", False)
            has_roi = opts.get("compute_roi_disconnection", False)

            if has_atlas and not (has_disconn or has_roi):
                raise ValueError(
                    "--parcel-atlases/--custom-parcellation requires at least one of "
                    "--compute-disconnectivity-matrix or --compute-roi-disconnection"
                )
            if (has_disconn or has_roi) and not has_atlas:
                raise ValueError(
                    "--compute-disconnectivity-matrix and --compute-roi-disconnection "
                    "require --parcel-atlases or --custom-parcellation."
                )
            if has_atlas and "parcellation_name" in opts:
                # parcellation_name is now a list for SNM
                names = opts["parcellation_name"]
                if isinstance(names, str):
                    names = [names]
                opts["parcellation_name"] = [self._validate_atlas_name(n) for n in names]

        # Validate atlas names for RD and FNM (parcel_names list)
        if "parcel_names" in opts:
            opts["parcel_names"] = [self._validate_atlas_name(n) for n in opts["parcel_names"]]

    @staticmethod
    def _validate_atlas_name(name: str) -> str:
        """Validate that an atlas name exists in the registry.

        Returns the name unchanged when it is registered; otherwise raises
        ``ValueError`` listing the first five available atlases.
        """
        # Imported lazily, as in the original, so the registry is only loaded
        # when an atlas name actually has to be checked.
        from lacuna.assets.parcellations import PARCELLATION_REGISTRY

        if name in PARCELLATION_REGISTRY:
            return name

        available = sorted(PARCELLATION_REGISTRY.keys())
        raise ValueError(
            f"Atlas '{name}' not found. "
            f"Available atlases: {', '.join(available[:5])}...\n"
            f"Use 'lacuna info atlases' to see all options."
        )

is_single_file property

Check if input is a single NIfTI file rather than BIDS directory.

log_level property

Convert verbose_count to log level.

verbose property

Check if verbose output is enabled.

from_args(args) classmethod

Create RunConfig from parsed arguments.

Source code in src/lacuna/cli/main.py
@classmethod
def from_args(cls, args: Namespace) -> RunConfig:
    """Create RunConfig from parsed arguments.

    Parameters
    ----------
    args : Namespace
        Parsed CLI arguments. ``bids_dir``, ``output_dir`` and ``analysis``
        must be present; every other attribute is optional and is skipped
        (or defaulted) when missing.

    Returns
    -------
    RunConfig
        Configuration whose ``analysis_options`` holds the analysis-specific
        settings collected from ``args``.
    """
    snm_names = ("snm", "structuralnetworkmapping")
    fnm_names = ("fnm", "functionalnetworkmapping")
    afnm_names = ("afnm", "acceleratedfunctionalnetworkmapping")
    is_snm = args.analysis in snm_names

    options: dict[str, Any] = {}
    parcel_atlases = getattr(args, "parcel_atlases", None)

    # Common analysis options
    # Note: SNM uses parcellation_name (set below), not parcel_names
    if parcel_atlases and not is_snm:
        options["parcel_names"] = parcel_atlases
    if getattr(args, "custom_parcellation", None):
        options["custom_parcellation"] = args.custom_parcellation
    if getattr(args, "keep_intermediate", None):
        options["keep_intermediate"] = args.keep_intermediate

    # FNM/SNM connectome path - always provided as path
    if getattr(args, "connectome_path", None):
        options["_connectome_path"] = args.connectome_path
    if getattr(args, "method", None):
        options["method"] = args.method
    # Forwarded whenever the attribute exists, even if its value is None.
    if hasattr(args, "pini_percentile"):
        options["pini_percentile"] = args.pini_percentile
    # Handle --no-p-map flag (default is to compute p-map)
    if getattr(args, "no_p_map", None):
        options["compute_p_map"] = False
    if hasattr(args, "fdr_alpha"):
        fdr_alpha = args.fdr_alpha
        # A missing (None) or non-positive alpha is stored as None; the None
        # guard avoids a TypeError from comparing None with 0.
        options["fdr_alpha"] = (
            fdr_alpha if fdr_alpha is not None and fdr_alpha > 0 else None
        )
    if getattr(args, "t_threshold", None) is not None:
        options["t_threshold"] = args.t_threshold
    if getattr(args, "output_resolution", None) is not None:
        options["output_resolution"] = args.output_resolution
    if getattr(args, "no_return_input_space", None):
        options["return_in_input_space"] = False

    # SNM-specific options
    if parcel_atlases and is_snm:
        options["parcellation_name"] = parcel_atlases
    if getattr(args, "compute_disconnectivity_matrix", None):
        options["compute_disconnectivity_matrix"] = True
    if getattr(args, "compute_roi_disconnection", None):
        options["compute_roi_disconnection"] = True
    # Handle --no-cache-tdi flag (default is to cache)
    if getattr(args, "no_cache_tdi", None):
        options["cache_tdi"] = False
    # Pass nprocs to analysis as n_jobs (only when it is not 1)
    nprocs = getattr(args, "nprocs", -1)
    if nprocs != 1 and args.analysis in snm_names + fnm_names:
        options["n_jobs"] = nprocs
    if getattr(args, "show_mrtrix_output", None):
        options["show_mrtrix_output"] = True

    # AFNM-specific options
    if args.analysis in afnm_names:
        if getattr(args, "matrix_path", None) is not None:
            options["matrix_path"] = args.matrix_path
        if getattr(args, "lesion_weighting", None) is not None:
            options["lesion_weighting"] = args.lesion_weighting

    return cls(
        bids_dir=args.bids_dir,
        output_dir=args.output_dir,
        analysis=args.analysis,
        participant_label=getattr(args, "participant_label", None),
        session_id=getattr(args, "session_id", None),
        pattern=getattr(args, "pattern", None),
        space=getattr(args, "mask_space", None),
        n_procs=nprocs,
        batch_size=getattr(args, "batch_size", -1),
        tmp_dir=getattr(args, "tmp_dir", None),
        overwrite=getattr(args, "overwrite", False),
        keep_intermediate=getattr(args, "keep_intermediate", False),
        on_empty=getattr(args, "on_empty", "warn"),
        verbose_count=getattr(args, "verbose_count", 0),
        analysis_options=options,
    )

validate()

Validate configuration.

Source code in src/lacuna/cli/main.py
def validate(self) -> None:
    """Validate configuration.

    Atlas names found in ``analysis_options`` are checked against the
    parcellation registry and written back as lists.

    Raises
    ------
    ValueError
        If the input path is missing, the output equals the input,
        ``--mask-space`` is absent for single-file input, ``n_procs`` is out
        of range, the SNM flag dependencies are violated, or an atlas name is
        unknown.
    """
    if not self.bids_dir.exists():
        raise ValueError(f"Input path does not exist: {self.bids_dir}")
    if self.output_dir.resolve() == self.bids_dir.resolve():
        raise ValueError("Output directory cannot be same as input path")
    if self.is_single_file and not self.space:
        raise ValueError("--mask-space is required when processing a single NIfTI file")
    if self.n_procs < -1 or self.n_procs == 0:
        raise ValueError(f"--nprocs must be -1 (all CPUs) or >= 1, got {self.n_procs}")

    # analysis_options may be None; treat that as "no options" so the
    # membership tests below cannot raise TypeError. Writes only happen when
    # a key is present, i.e. when opts is the real dict.
    opts = self.analysis_options if self.analysis_options is not None else {}

    # SNM flag dependency validation
    if self.analysis in ("snm", "structuralnetworkmapping"):
        has_atlas = "parcellation_name" in opts or opts.get("custom_parcellation")
        has_disconn = opts.get("compute_disconnectivity_matrix", False)
        has_roi = opts.get("compute_roi_disconnection", False)

        if has_atlas and not (has_disconn or has_roi):
            raise ValueError(
                "--parcel-atlases/--custom-parcellation requires at least one of "
                "--compute-disconnectivity-matrix or --compute-roi-disconnection"
            )
        if (has_disconn or has_roi) and not has_atlas:
            raise ValueError(
                "--compute-disconnectivity-matrix and --compute-roi-disconnection "
                "require --parcel-atlases or --custom-parcellation."
            )
        if has_atlas and "parcellation_name" in opts:
            # parcellation_name is now a list for SNM
            names = opts["parcellation_name"]
            if isinstance(names, str):
                names = [names]
            opts["parcellation_name"] = [self._validate_atlas_name(n) for n in names]

    # Validate atlas names for RD and FNM (parcel_names list)
    if "parcel_names" in opts:
        opts["parcel_names"] = [self._validate_atlas_name(n) for n in opts["parcel_names"]]

main(argv=None)

Main CLI entry point.

Parses command-line arguments and routes to appropriate command handler.

Parameters:

Name Type Description Default
argv list of str

Command-line arguments. If None, uses sys.argv[1:].

None

Returns:

Type Description
int

Exit code (0 for success, non-zero for errors).

Source code in src/lacuna/cli/main.py
def main(argv: list[str] | None = None) -> int:
    """
    Run the Lacuna command-line interface.

    Builds the argument parser, parses ``argv`` and hands the parsed
    arguments to the handler registered for the selected sub-command.

    Parameters
    ----------
    argv : list of str, optional
        Command-line arguments. If None, uses sys.argv[1:].

    Returns
    -------
    int
        Exit code (0 for success, non-zero for errors).
    """
    from lacuna.cli.parser import build_parser

    cli_args = sys.argv[1:] if argv is None else argv

    parser = build_parser()
    args = parser.parse_args(cli_args)

    # Sub-command name -> handler; each handler returns the exit code.
    dispatch = {
        "fetch": _handle_fetch_command,
        "run": _handle_run_command,
        "collect": _handle_collect_command,
        "info": _handle_info_command,
        "bidsify": _handle_bidsify_command,
        "parcellate": _handle_parcellate_command,
        "tutorial": _handle_tutorial_command,
        "check": _handle_check_command,
    }
    handler = dispatch.get(args.command)
    if handler is not None:
        return handler(args)

    # No command specified - show help
    parser.print_help()
    return EXIT_SUCCESS