@dataclass
class RunConfig:
"""Configuration for run commands."""
bids_dir: Path
output_dir: Path
analysis: str
participant_label: list[str] | None = None
session_id: list[str] | None = None
pattern: str | None = None
space: str | None = None
n_procs: int = -1
batch_size: int = -1
tmp_dir: Path | None = None
overwrite: bool = False
keep_intermediate: bool = False
on_empty: str = "warn" # "warn", "skip", or "error"
verbose_count: int = 0
# Analysis-specific options stored as dict
analysis_options: dict[str, Any] | None = None
@property
def is_single_file(self) -> bool:
"""Check if input is a single NIfTI file rather than BIDS directory."""
return self.bids_dir.is_file() and self.bids_dir.suffix in (".nii", ".gz")
@property
def log_level(self) -> int:
"""Convert verbose_count to log level."""
return max(25 - 5 * self.verbose_count, 10)
@property
def verbose(self) -> bool:
"""Check if verbose output is enabled."""
return self.verbose_count >= 1
@classmethod
def from_args(cls, args: Namespace) -> RunConfig:
"""Create RunConfig from parsed arguments."""
# Collect analysis-specific options based on analysis type
analysis_options: dict[str, Any] = {}
# Common analysis options
# Note: SNM uses parcellation_name (set below), not parcel_names
if (
hasattr(args, "parcel_atlases")
and args.parcel_atlases
and args.analysis not in ("snm", "structuralnetworkmapping")
):
analysis_options["parcel_names"] = args.parcel_atlases
if hasattr(args, "custom_parcellation") and args.custom_parcellation:
analysis_options["custom_parcellation"] = args.custom_parcellation
if hasattr(args, "keep_intermediate") and args.keep_intermediate:
analysis_options["keep_intermediate"] = args.keep_intermediate
# FNM/SNM connectome path - always provided as path
if hasattr(args, "connectome_path") and args.connectome_path:
analysis_options["_connectome_path"] = args.connectome_path
if hasattr(args, "method") and args.method:
analysis_options["method"] = args.method
if hasattr(args, "pini_percentile"):
analysis_options["pini_percentile"] = args.pini_percentile
# Handle --no-p-map flag (default is to compute p-map)
if hasattr(args, "no_p_map") and args.no_p_map:
analysis_options["compute_p_map"] = False
if hasattr(args, "fdr_alpha"):
fdr_alpha = args.fdr_alpha
analysis_options["fdr_alpha"] = fdr_alpha if fdr_alpha > 0 else None
if hasattr(args, "t_threshold") and args.t_threshold is not None:
analysis_options["t_threshold"] = args.t_threshold
if hasattr(args, "output_resolution") and args.output_resolution is not None:
analysis_options["output_resolution"] = args.output_resolution
if hasattr(args, "no_return_input_space") and args.no_return_input_space:
analysis_options["return_in_input_space"] = False
# SNM-specific options
if (
hasattr(args, "parcel_atlases")
and args.parcel_atlases
and args.analysis in ("snm", "structuralnetworkmapping")
):
analysis_options["parcellation_name"] = args.parcel_atlases
if hasattr(args, "compute_disconnectivity_matrix") and args.compute_disconnectivity_matrix:
analysis_options["compute_disconnectivity_matrix"] = True
if hasattr(args, "compute_roi_disconnection") and args.compute_roi_disconnection:
analysis_options["compute_roi_disconnection"] = True
# Handle --no-cache-tdi flag (default is to cache)
if hasattr(args, "no_cache_tdi") and args.no_cache_tdi:
analysis_options["cache_tdi"] = False
# Pass nprocs to analysis as n_jobs
nprocs = getattr(args, "nprocs", -1)
if nprocs != 1 and args.analysis in (
"snm",
"structuralnetworkmapping",
"fnm",
"functionalnetworkmapping",
):
analysis_options["n_jobs"] = nprocs
if hasattr(args, "show_mrtrix_output") and args.show_mrtrix_output:
analysis_options["show_mrtrix_output"] = True
# AFNM-specific options
if args.analysis in ("afnm", "acceleratedfunctionalnetworkmapping"):
if getattr(args, "matrix_path", None) is not None:
analysis_options["matrix_path"] = args.matrix_path
if getattr(args, "lesion_weighting", None) is not None:
analysis_options["lesion_weighting"] = args.lesion_weighting
return cls(
bids_dir=args.bids_dir,
output_dir=args.output_dir,
analysis=args.analysis,
participant_label=getattr(args, "participant_label", None),
session_id=getattr(args, "session_id", None),
pattern=getattr(args, "pattern", None),
space=getattr(args, "mask_space", None),
n_procs=getattr(args, "nprocs", -1),
batch_size=getattr(args, "batch_size", -1),
tmp_dir=getattr(args, "tmp_dir", None),
overwrite=getattr(args, "overwrite", False),
keep_intermediate=getattr(args, "keep_intermediate", False),
on_empty=getattr(args, "on_empty", "warn"),
verbose_count=getattr(args, "verbose_count", 0),
analysis_options=analysis_options,
)
def validate(self) -> None:
"""Validate configuration."""
if not self.bids_dir.exists():
raise ValueError(f"Input path does not exist: {self.bids_dir}")
if self.output_dir.resolve() == self.bids_dir.resolve():
raise ValueError("Output directory cannot be same as input path")
if self.is_single_file and not self.space:
raise ValueError("--mask-space is required when processing a single NIfTI file")
if self.n_procs < -1 or self.n_procs == 0:
raise ValueError(f"--nprocs must be -1 (all CPUs) or >= 1, got {self.n_procs}")
# SNM flag dependency validation
if self.analysis in ("snm", "structuralnetworkmapping"):
opts = self.analysis_options
has_atlas = "parcellation_name" in opts or opts.get("custom_parcellation")
has_disconn = opts.get("compute_disconnectivity_matrix", False)
has_roi = opts.get("compute_roi_disconnection", False)
if has_atlas and not (has_disconn or has_roi):
raise ValueError(
"--parcel-atlases/--custom-parcellation requires at least one of "
"--compute-disconnectivity-matrix or --compute-roi-disconnection"
)
if (has_disconn or has_roi) and not has_atlas:
raise ValueError(
"--compute-disconnectivity-matrix and --compute-roi-disconnection "
"require --parcel-atlases or --custom-parcellation."
)
if has_atlas and "parcellation_name" in opts:
# parcellation_name is now a list for SNM
names = opts["parcellation_name"]
if isinstance(names, str):
names = [names]
opts["parcellation_name"] = [self._validate_atlas_name(n) for n in names]
# Validate atlas names for RD and FNM (parcel_names list)
if "parcel_names" in self.analysis_options:
self.analysis_options["parcel_names"] = [
self._validate_atlas_name(n) for n in self.analysis_options["parcel_names"]
]
@staticmethod
def _validate_atlas_name(name: str) -> str:
"""Validate that an atlas name exists in the registry."""
from lacuna.assets.parcellations import PARCELLATION_REGISTRY
if name in PARCELLATION_REGISTRY:
return name
available = sorted(PARCELLATION_REGISTRY.keys())
raise ValueError(
f"Atlas '{name}' not found. "
f"Available atlases: {', '.join(available[:5])}...\n"
f"Use 'lacuna info atlases' to see all options."
)