Skip to content

parcel_aggregation

lacuna.analysis.parcel_aggregation

Parcellation aggregation module.

Provides flexible ROI-level aggregation of voxel-level maps across multiple parcellations. This is the core tool for extracting region-of-interest statistics from mask images, connectivity maps, or any other spatial maps.

Examples:

>>> from lacuna import SubjectData
>>> from lacuna.analysis import ParcelAggregation
>>>
>>> # Use bundled atlases
>>> mask = SubjectData.from_nifti("mask.nii.gz")
>>> analysis = ParcelAggregation(
...     source="maskimg",
...     aggregation="percent"
... )
>>> result = analysis.run(mask)
>>> print(result.results["ParcelAggregation"])
>>>
>>> # Use custom atlas directory
>>> analysis = ParcelAggregation(
...     atlas_dir="/data/atlases",
...     source="maskimg",
...     aggregation="percent"
... )
>>> result = analysis.run(mask)
>>>
>>> # Aggregate functional connectivity map by atlas regions
>>> from lacuna.analysis import FunctionalNetworkMapping
>>> fnm = FunctionalNetworkMapping(connectome_name="GSP1000")
>>> result = fnm.run(mask)
>>>
>>> # Now aggregate the network map to atlas ROIs
>>> agg = ParcelAggregation(
...     source="FunctionalNetworkMapping.zmap",
...     aggregation="mean"
... )
>>> final = agg.run(result)

ParcelAggregation

Bases: BaseAnalysis

Atlas aggregation analysis.

Computations performed in input data space (atlases transformed to match input).

Aggregate voxel-level maps to ROI-level statistics using atlases.

This is a composable analysis that can: 1. Compute regional damage from lesion masks (percent overlap, volume) 2. Aggregate connectivity maps from network analyses (mean, sum, etc.) 3. Extract any voxel-level map to atlas ROI statistics

The analysis discovers all atlases in the specified directory and computes the specified aggregation method for each region in each atlas.

Computation Space: Atlases are automatically transformed to match the input data's coordinate space and resolution (parsed from metadata or BIDS-style filenames: tpl-{SPACE}res-{RES}...). If an atlas is already in the input space, no transformation is performed. After transformation, nilearn resamples the atlas to precisely match the input resolution for exact alignment.

Attributes:

Name Type Description
TARGET_SPACE None

Space is determined from the input data. Atlases are transformed to match the input data's coordinate space automatically.

TARGET_RESOLUTION None

Resolution is determined from the input data. Atlases are transformed to match the input resolution, then nilearn resamples for precise alignment.

batch_strategy str

Batch processing strategy. Set to "sequential" to avoid race conditions with threading backends when accessing shared atlas resources.

Parameters:

Name Type Description Default
source str or list[str] or dict[str, str | list[str]]

Source of data to aggregate. Accepts multiple formats:

String format: - "maskimg": Use the lesion mask directly - "{AnalysisName}.{result_key}": Use result from previous analysis Example: "FunctionalNetworkMapping.correlation_map"

List format: - List of strings in the above formats for multi-source aggregation Example: ["SubjectData.mask_img", "FunctionalNetworkMapping.correlation_map"]

Dict format (recommended for multi-source): - Mapping of analysis namespace to result key(s) Example: {"FunctionalNetworkMapping": "rmap"} Example: {"FunctionalNetworkMapping": ["rmap", "zmap"]} Example: {"SubjectData": "maskimg", "FunctionalNetworkMapping": ["rmap", "zmap"]}

"maskimg"
aggregation str

Aggregation method to use. Options: - "mean": Mean value across ROI voxels - "sum": Sum of values across ROI voxels - "percent": Percentage of ROI voxels that are non-zero (for binary masks) - "volume": Volume (in mm³) of non-zero voxels in ROI - "median": Median value across ROI voxels - "std": Standard deviation across ROI voxels

"mean"
parcel_names list of str or None

Names of atlases from the registry to process (e.g., "schaefer2018parcels100networks7"). If None, all registered atlases are processed. Use register_parcellation() or register_parcellationes_from_directory() to add custom atlases. If None, all parcellations found in atlas_dir will be processed. Example: ["schaefer2018parcels100networks7", "tian2020parcels16"]

None

Raises:

Type Description
ValueError

If atlas_dir doesn't exist, is empty, or aggregation method is invalid.

FileNotFoundError

If specified atlas directory doesn't exist.

Notes
  • Both 3D and 4D atlases support automatic resampling to match source data spatial resolution via nilearn
  • 3D atlases: integer labels, use NiftiLabelsMasker with nearest-neighbor interpolation to preserve labels
  • 4D atlases: automatically detect binary (0/1) vs probabilistic (0.0-1.0)
  • Binary: use nearest-neighbor interpolation to preserve binary masks
  • Probabilistic: use continuous interpolation for probability values
  • For 3D atlases: regions defined by integer labels (automatically rounded)
  • For 4D atlases: each volume is a binary or probability map for one region
  • 4D probabilistic maps are thresholded at threshold parameter if provided
  • Results stored in SubjectData.results["ParcelAggregation"] as dict mapping parcellation_name_region_name -> aggregated_value

Examples:

>>> # Use all bundled/registered atlases
>>> analysis = ParcelAggregation(
...     source="maskimg",
...     aggregation="percent"
... )
>>>
>>> # Use specific registered atlases
>>> analysis = ParcelAggregation(
...     source="maskimg",
...     aggregation="percent",
...     parcel_names=["schaefer2018parcels100networks7", "tian2020parcels16"]
... )
>>>
>>> # Register custom atlases first, then use them
>>> from lacuna.assets.parcellations import register_parcellations_from_directory
>>> register_parcellationes_from_directory("/data/my_atlases")
>>> analysis = ParcelAggregation(
...     source="maskimg",
...     aggregation="percent"
... )
>>>
>>> # Average functional connectivity per ROI
>>> analysis = ParcelAggregation(
...     source="FunctionalNetworkMapping.network_map",
...     aggregation="mean"
... )
See Also

RegionalDamage : Convenience wrapper for lesion overlap analysis BaseAnalysis : Parent class defining analysis interface

Source code in src/lacuna/analysis/parcel_aggregation.py
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
class ParcelAggregation(BaseAnalysis):
    """Atlas aggregation analysis.

    Computations performed in input data space (atlases transformed to match input).

    Aggregate voxel-level maps to ROI-level statistics using atlases.

    This is a composable analysis that can:
    1. Compute regional damage from lesion masks (percent overlap, volume)
    2. Aggregate connectivity maps from network analyses (mean, sum, etc.)
    3. Extract any voxel-level map to atlas ROI statistics

    The analysis discovers all atlases in the specified directory and computes
    the specified aggregation method for each region in each atlas.

    **Computation Space:**
    Atlases are automatically transformed to match the input data's coordinate space
    and resolution (parsed from metadata or BIDS-style filenames: tpl-{SPACE}_res-{RES}_...).
    If an atlas is already in the input space, no transformation is performed.
    After transformation, nilearn resamples the atlas to precisely match the input
    resolution for exact alignment.

    Attributes
    ----------
    TARGET_SPACE : None
        Space is determined from the input data. Atlases are transformed to match
        the input data's coordinate space automatically.
    TARGET_RESOLUTION : None
        Resolution is determined from the input data. Atlases are transformed to
        match the input resolution, then nilearn resamples for precise alignment.
    batch_strategy : str
        Batch processing strategy. Set to "sequential" to avoid race conditions
        with threading backends when accessing shared atlas resources.

    Parameters
    ----------
    source : str or list[str] or dict[str, str | list[str]], default="maskimg"
        Source of data to aggregate. Accepts multiple formats:

        **String format:**
        - "maskimg": Use the lesion mask directly
        - "{AnalysisName}.{result_key}": Use result from previous analysis
          Example: "FunctionalNetworkMapping.correlation_map"

        **List format:**
        - List of strings in the above formats for multi-source aggregation
          Example: ["SubjectData.mask_img", "FunctionalNetworkMapping.correlation_map"]

        **Dict format (recommended for multi-source):**
        - Mapping of analysis namespace to result key(s)
          Example: {"FunctionalNetworkMapping": "rmap"}
          Example: {"FunctionalNetworkMapping": ["rmap", "zmap"]}
          Example: {"SubjectData": "maskimg", "FunctionalNetworkMapping": ["rmap", "zmap"]}

    aggregation : str, default="mean"
        Aggregation method to use. Options:
        - "mean": Mean value across ROI voxels
        - "sum": Sum of values across ROI voxels
        - "percent": Percentage of ROI voxels that are non-zero (for binary masks)
        - "volume": Volume (in mm³) of non-zero voxels in ROI
        - "median": Median value across ROI voxels
        - "std": Standard deviation across ROI voxels
    parcel_names : list of str or None, default=None
        Names of atlases from the registry to process (e.g., "schaefer2018parcels100networks7").
        If None, all registered atlases are processed.
        Use register_parcellation() or register_parcellationes_from_directory() to add custom atlases.
        If None, all parcellations found in atlas_dir will be processed.
        Example: ["schaefer2018parcels100networks7", "tian2020parcels16"]

    Raises
    ------
    ValueError
        If atlas_dir doesn't exist, is empty, or aggregation method is invalid.
    FileNotFoundError
        If specified atlas directory doesn't exist.

    Notes
    -----
    - Both 3D and 4D atlases support automatic resampling to match source data
      spatial resolution via nilearn
    - 3D atlases: integer labels, use NiftiLabelsMasker with nearest-neighbor
      interpolation to preserve labels
    - 4D atlases: automatically detect binary (0/1) vs probabilistic (0.0-1.0)
      * Binary: use nearest-neighbor interpolation to preserve binary masks
      * Probabilistic: use continuous interpolation for probability values
    - For 3D atlases: regions defined by integer labels (automatically rounded)
    - For 4D atlases: each volume is a binary or probability map for one region
    - 4D probabilistic maps are thresholded at `threshold` parameter if provided
    - Results stored in SubjectData.results["ParcelAggregation"] as dict
      mapping parcellation_name_region_name -> aggregated_value

    Examples
    --------
    >>> # Use all bundled/registered atlases
    >>> analysis = ParcelAggregation(
    ...     source="maskimg",
    ...     aggregation="percent"
    ... )
    >>>
    >>> # Use specific registered atlases
    >>> analysis = ParcelAggregation(
    ...     source="maskimg",
    ...     aggregation="percent",
    ...     parcel_names=["schaefer2018parcels100networks7", "tian2020parcels16"]
    ... )
    >>>
    >>> # Register custom atlases first, then use them
    >>> from lacuna.assets.parcellations import register_parcellations_from_directory
    >>> register_parcellationes_from_directory("/data/my_atlases")
    >>> analysis = ParcelAggregation(
    ...     source="maskimg",
    ...     aggregation="percent"
    ... )
    >>>
    >>> # Average functional connectivity per ROI
    >>> analysis = ParcelAggregation(
    ...     source="FunctionalNetworkMapping.network_map",
    ...     aggregation="mean"
    ... )

    See Also
    --------
    RegionalDamage : Convenience wrapper for lesion overlap analysis
    BaseAnalysis : Parent class defining analysis interface
    """

    #: Space is determined from the input data
    TARGET_SPACE = None
    #: Resolution is determined from the input data
    TARGET_RESOLUTION = None
    #: Preferred batch processing strategy (sequential to avoid threading race conditions)
    batch_strategy: str = "sequential"

    VALID_AGGREGATIONS = ["mean", "sum", "percent", "volume", "median", "std"]
    VALID_SOURCES = ["maskimg"]

    def __init__(
        self,
        source: str | list[str] | dict[str, str | list[str]] = "maskimg",
        aggregation: str = "mean",
        parcel_names: list[str] | None = None,
        verbose: bool = False,
        keep_intermediate: bool = False,
    ):
        """Initialize ParcelAggregation analysis.

        Parameters
        ----------
        source : str or list[str] or dict, default="maskimg"
            Source of data to aggregate.
        aggregation : str, default="mean"
            Aggregation method to use.
        parcel_names : list of str or None, default=None
            Names of atlases from the registry to process.
        verbose : bool, default=False
            If True, print progress messages.
        keep_intermediate : bool, default=False
            If True, include intermediate results (e.g., warped mask images)
            in the output. Useful for debugging and quality control.
        """
        super().__init__(verbose=verbose, keep_intermediate=keep_intermediate)

        # Initialize logger for warnings and info messages
        from lacuna.utils.logging import ConsoleLogger

        self.logger = ConsoleLogger(verbose=verbose)

        # Normalize and validate source parameter
        self.sources = self._normalize_sources(source)
        self.source = source  # Keep original for compatibility
        self.aggregation = aggregation
        self.parcel_names = parcel_names

        # Validate aggregation method
        if aggregation not in self.VALID_AGGREGATIONS:
            from lacuna.utils.suggestions import format_suggestions, suggest_similar

            suggestions = suggest_similar(aggregation, list(self.VALID_AGGREGATIONS))
            hint = format_suggestions(suggestions)
            msg = (
                f"Invalid aggregation method: '{aggregation}'\n"
                f"Valid options: {', '.join(self.VALID_AGGREGATIONS)}"
            )
            if hint:
                msg = f"{msg}\n{hint}"
            raise ValueError(msg)

        # Threshold validation removed - accepts any float value (T061)
        # This allows for flexible thresholding (e.g., negative z-scores, arbitrary cutoffs)

        # Validate parcel_names if provided
        if parcel_names is not None:
            if not isinstance(parcel_names, list):
                raise TypeError(
                    f"parcel_names must be a list of strings or None, got {type(parcel_names).__name__}"
                )
            if not all(isinstance(name, str) for name in parcel_names):
                raise TypeError("All items in parcel_names must be strings")
            if not parcel_names:
                raise ValueError(
                    "parcel_names cannot be an empty list (use None to process all atlases)"
                )

        # Will be populated in _validate_inputs (thread-safe)
        self.atlases = []
        self._atlases_lock = threading.Lock()
        # Cache for loaded+transformed atlas images, keyed by
        # (atlas_path, input_space, input_resolution) to avoid redundant
        # disk I/O and spatial transformations across subjects
        self._atlas_cache: dict[tuple, nib.Nifti1Image] = {}

    def __getstate__(self):
        """Exclude non-picklable lock from serialization for multiprocessing."""
        state = self.__dict__.copy()
        # Remove the lock - it can't be pickled
        state.pop("_atlases_lock", None)
        # Don't serialize the atlas cache - rebuild in new process
        state.pop("_atlas_cache", None)
        return state

    def __setstate__(self, state):
        """Recreate lock after unpickling for multiprocessing."""
        self.__dict__.update(state)
        # Recreate the lock in the new process
        self._atlases_lock = threading.Lock()
        # Recreate empty atlas cache in the new process
        self._atlas_cache = {}

    def _normalize_sources(self, source: str | list[str] | dict[str, str | list[str]]) -> list[str]:
        """
        Normalize source parameter to a list of sources.

        Parameters
        ----------
        source : str or list[str] or dict[str, str | list[str]]
            Source specification in one of these formats:
            - str: Single source like "maskimg" or "FunctionalNetworkMapping.correlation_map"
            - list[str]: Multiple sources as strings
            - dict: Mapping of namespace to key(s), e.g.,
              {"FunctionalNetworkMapping": "rmap"} or
              {"FunctionalNetworkMapping": ["rmap", "zmap"]}

        Returns
        -------
        list[str]
            Normalized list of source strings in "Namespace.key" format.

        Raises
        ------
        TypeError
            If source is not str, list[str], or dict.
        ValueError
            If source list/dict is empty.

        Examples
        --------
        >>> agg._normalize_sources("maskimg")
        ['mask_img']
        >>> agg._normalize_sources({"FunctionalNetworkMapping": "rmap"})
        ['FunctionalNetworkMapping.rmap']
        >>> agg._normalize_sources({"FunctionalNetworkMapping": ["rmap", "zmap"]})
        ['FunctionalNetworkMapping.rmap', 'FunctionalNetworkMapping.zmap']
        """
        if isinstance(source, str):
            return [source]
        elif isinstance(source, dict):
            if not source:
                raise ValueError("source dict cannot be empty")
            sources = []
            for namespace, keys in source.items():
                if not isinstance(namespace, str):
                    raise TypeError(f"Source namespace must be str, got {type(namespace).__name__}")
                if isinstance(keys, str):
                    # Single key: {"FunctionalNetworkMapping": "rmap"}
                    sources.append(f"{namespace}.{keys}")
                elif isinstance(keys, list):
                    # Multiple keys: {"FunctionalNetworkMapping": ["rmap", "zmap"]}
                    if not keys:
                        raise ValueError(f"Source keys for '{namespace}' cannot be empty")
                    for key in keys:
                        if not isinstance(key, str):
                            raise TypeError(f"Source key must be str, got {type(key).__name__}")
                        sources.append(f"{namespace}.{key}")
                else:
                    raise TypeError(
                        f"Source value must be str or list[str], got {type(keys).__name__}"
                    )
            return sources
        elif isinstance(source, list):
            if not source:
                raise ValueError("source cannot be empty list")
            if not all(isinstance(s, str) for s in source):
                raise TypeError("All items in source list must be strings")
            return source
        else:
            raise TypeError(f"source must be str, list[str], or dict, got {type(source).__name__}")

    def run(
        self, data: "SubjectData | nib.Nifti1Image | list[nib.Nifti1Image]"
    ) -> "SubjectData | ParcelData | list[ParcelData]":
        """
        Execute atlas aggregation analysis on various input types.

        Supports flexible input types with matching return types:
        - SubjectData -> SubjectData (with results attached)
        - nibabel.Nifti1Image -> ParcelData
        - list[nibabel.Nifti1Image] -> list[ParcelData]

        Parameters
        ----------
        data : SubjectData or nibabel.Nifti1Image or list[nibabel.Nifti1Image]
            Input data to aggregate:
            - SubjectData: Standard workflow, returns SubjectData with results
            - nibabel.Nifti1Image: Single image, returns ParcelData
            - list[nibabel.Nifti1Image]: Batch processing, returns list of results

        Returns
        -------
        SubjectData or ParcelData or list[ParcelData]
            Results matching input type:
            - SubjectData input: New SubjectData instance with results in .results dict
            - nibabel input: Single ParcelData
            - list input: List of ParcelData objects (one per input image)

        Raises
        ------
        ValueError
            If input validation fails or source data not found.
        TypeError
            If input type is not supported.

        Notes
        -----
        This method overrides BaseAnalysis.run() to support flexible input types.
        The base class run() is designed for SubjectData only.

        Examples
        --------
        >>> # SubjectData input
        >>> mask_data = SubjectData(mask_img, space='MNI152NLin6Asym', resolution=2)
        >>> analysis = ParcelAggregation(aggregation='percent')
        >>> result = analysis.run(mask_data)
        >>> isinstance(data, SubjectData)
        True

        >>> # Nibabel image input
        >>> import nibabel as nib
        >>> img = nib.load('mask.nii.gz')
        >>> result = analysis.run(img)
        >>> isinstance(result, ParcelData)
        True

        >>> # List of images
        >>> images = [nib.load(f'mask_{i}.nii.gz') for i in range(5)]
        >>> results = analysis.run(images)
        >>> len(results) == 5
        True
        """
        from lacuna.core.data_types import VoxelMap

        # Detect input type and delegate to appropriate handler
        if isinstance(data, SubjectData):
            # Standard SubjectData workflow - use base class run()
            return super().run(data)

        elif isinstance(data, VoxelMap):
            # VoxelMap - run directly without SubjectData wrapper
            return self._run_voxelmap(data)

        elif isinstance(data, nib.Nifti1Image):
            # Single nibabel image - return ParcelData
            return self._run_single_image(data)

        elif isinstance(data, list):
            # List of images or VoxelMaps - return list of results
            if not data:
                raise ValueError("Empty list provided - at least one image required")

            # Check if all are VoxelMaps or all are Images
            if all(isinstance(item, VoxelMap) for item in data):
                # Process VoxelMaps directly
                return [self._run_voxelmap(vm) for vm in data]

            elif all(isinstance(img, nib.Nifti1Image) for img in data):
                return self._run_batch_images(data)

            else:
                raise TypeError(
                    "When providing a list, all items must be of the same type: "
                    "either all VoxelMap or all nibabel.Nifti1Image objects"
                )

        else:
            raise TypeError(
                f"Unsupported input type: {type(data).__name__}\n"
                "Supported types: SubjectData, VoxelMap, nibabel.Nifti1Image, "
                "list[VoxelMap], list[nibabel.Nifti1Image]"
            )

    def _run_single_image(self, img: nib.Nifti1Image) -> "ParcelData":
        """
        Run aggregation on a single nibabel image.

        This method auto-detects space and resolution from the image header,
        then runs aggregation directly without requiring a SubjectData wrapper.
        This allows processing of continuous-valued images (not just binary masks).

        Parameters
        ----------
        img : nibabel.Nifti1Image
            Input image to aggregate

        Returns
        -------
        ParcelData
            Aggregation result combining all atlas aggregations
        """
        # Load atlases using same logic as _run_voxelmap
        if not hasattr(self, "atlases") or not self.atlases:
            self.atlases = self._load_parcellations_from_registry()

        # Auto-detect space and resolution from image header
        input_space = SubjectData._detect_space_from_image(img)
        input_resolution = SubjectData._detect_resolution_from_image(img)

        # Use detected space or fall back with warning
        if input_space is None:
            input_space = "MNI152NLin6Asym"
            self.logger.warning(
                "Could not auto-detect coordinate space from image header. "
                "Assuming MNI152NLin6Asym. For explicit control, use SubjectData wrapper: "
                "SubjectData(img, space='...', resolution=...)"
            )
        if input_resolution is None:
            input_resolution = float(round(abs(img.affine[0, 0])))
            self.logger.info(
                f"Could not detect resolution from image header, using voxel size: {input_resolution}mm"
            )

        # Calculate voxel volume from source data
        voxel_volume_mm3 = np.abs(np.linalg.det(img.affine[:3, :3]))

        # Collect all ROI results across atlases
        all_roi_data = {}

        # Process each atlas
        for atlas_info in self.atlases:
            parcellation_name = atlas_info["name"]
            atlas_space = atlas_info.get("space")
            atlas_resolution = atlas_info.get("resolution")

            # Load atlas image
            atlas_img = nib.load(atlas_info["atlas_path"])

            # Transform atlas to match input data space if needed
            atlas_img = self._ensure_atlas_matches_input_space(
                atlas_img=atlas_img,
                atlas_space=atlas_space,
                atlas_resolution=atlas_resolution,
                input_space=input_space,
                input_resolution=input_resolution,
                input_affine=img.affine,
                parcellation_name=parcellation_name,
            )

            labels = atlas_info["labels"]
            atlas_data = atlas_img.get_fdata()

            if atlas_data.ndim == 3:
                # 3D integer-labeled atlas
                atlas_results = self._aggregate_3d_atlas(img, atlas_img, labels, voxel_volume_mm3)
            elif atlas_data.ndim == 4:
                # 4D probabilistic atlas
                atlas_results = self._aggregate_4d_atlas(img, atlas_img, labels, voxel_volume_mm3)
            else:
                continue

            # Merge results from this atlas
            all_roi_data.update(atlas_results)

        # Return single ParcelData with all ROI results
        from lacuna.core.data_types import ParcelData

        return ParcelData(
            name=f"{self.aggregation}_aggregation",
            data=all_roi_data,
            parcel_names=(
                self.parcel_names if self.parcel_names else [a["name"] for a in self.atlases]
            ),
            aggregation_method=self.aggregation,
            metadata={
                "source": "Nifti1Image",
                "n_regions": len(all_roi_data),
                "space": input_space,
                "resolution": input_resolution,
            },
        )

    def _run_batch_images(self, images: list[nib.Nifti1Image]) -> list["ParcelData"]:
        """
        Run aggregation on a batch of nibabel images.

        Parameters
        ----------
        images : list[nibabel.Nifti1Image]
            List of images to aggregate

        Returns
        -------
        list[ParcelData]
            List of aggregation results (one per input image)
        """
        results = []
        for img in images:
            result = self._run_single_image(img)
            results.append(result)

        return results

    def _run_voxelmap(self, voxel_map: "VoxelMap") -> "ParcelData":
        """
        Run aggregation on a VoxelMap directly.

        This bypasses SubjectData validation since VoxelMaps can contain
        continuous values (e.g., correlation maps, z-scores).

        Parameters
        ----------
        voxel_map : VoxelMap
            VoxelMap containing the data to aggregate

        Returns
        -------
        ParcelData
            Aggregation result combining all atlas aggregations
        """
        # Load atlases using same logic as _load_parcellations_from_registry
        if not hasattr(self, "atlases") or not self.atlases:
            self.atlases = self._load_parcellations_from_registry()

        # Get space and resolution from VoxelMap
        input_space = voxel_map.space
        input_resolution = voxel_map.resolution
        source_img = voxel_map.data

        # Calculate voxel volume from source data
        voxel_volume_mm3 = np.abs(np.linalg.det(source_img.affine[:3, :3]))

        # Collect all ROI results across atlases
        all_roi_data = {}

        # Process each atlas
        for atlas_info in self.atlases:
            parcellation_name = atlas_info["name"]
            atlas_space = atlas_info.get("space")
            atlas_resolution = atlas_info.get("resolution")

            # Load atlas image
            atlas_img = nib.load(atlas_info["atlas_path"])

            # Transform atlas to match input data space if needed
            atlas_img = self._ensure_atlas_matches_input_space(
                atlas_img=atlas_img,
                atlas_space=atlas_space,
                atlas_resolution=atlas_resolution,
                input_space=input_space,
                input_resolution=input_resolution,
                input_affine=source_img.affine,
                parcellation_name=parcellation_name,
            )

            labels = atlas_info["labels"]
            atlas_data = atlas_img.get_fdata()

            if atlas_data.ndim == 3:
                # 3D integer-labeled atlas
                atlas_results = self._aggregate_3d_atlas(
                    source_img, atlas_img, labels, voxel_volume_mm3
                )
            elif atlas_data.ndim == 4:
                # 4D probabilistic atlas
                atlas_results = self._aggregate_4d_atlas(
                    source_img, atlas_img, labels, voxel_volume_mm3
                )
            else:
                continue

            # Merge results from this atlas
            all_roi_data.update(atlas_results)

        # Return single ParcelData with all ROI results
        from lacuna.core.data_types import ParcelData

        return ParcelData(
            name=f"{self.aggregation}_aggregation",
            data=all_roi_data,
            parcel_names=(
                self.parcel_names if self.parcel_names else [a["name"] for a in self.atlases]
            ),
            aggregation_method=self.aggregation,
            metadata={
                "source": "VoxelMap",
                "source_name": voxel_map.name,
                "n_regions": len(all_roi_data),
                "space": input_space,
                "resolution": input_resolution,
            },
        )

    def _validate_inputs(self, mask_data: SubjectData) -> None:
        """
        Validate lesion data and load atlases from registry.

        Parameters
        ----------
        mask_data : SubjectData
            Lesion data to validate

        Raises
        ------
        ValueError
            If lesion data is invalid or source data not found
        """
        # Build list of available sources
        available = ["SubjectData.mask_img"]
        if mask_data.results:
            for analysis_name, analysis_results in mask_data.results.items():
                for key in analysis_results.keys():
                    available.append(f"{analysis_name}.{key}")

        # Validate each source exists
        missing_sources = []
        for src in self.sources:
            source_img = self._get_source_image_for_source(mask_data, src)
            if source_img is None:
                missing_sources.append(src)

        if missing_sources:
            from lacuna.utils.suggestions import format_suggestions, suggest_similar

            suggestions = []
            for missing in missing_sources:
                similar = suggest_similar(missing, available)
                if similar:
                    suggestions.extend(similar)

            error_msg = (
                f"Source data not found: {missing_sources}\n"
                "Check that the source exists in SubjectData.\n"
                f"Available sources: {', '.join(available)}"
            )
            if suggestions:
                error_msg += f"\n\nDid you mean: {format_suggestions(suggestions)}?"

            raise ValueError(error_msg)

        # Load atlases from registry (thread-safe)
        # Use lock to prevent race condition where multiple threads
        # simultaneously check 'if not self.atlases' and all try to load
        with self._atlases_lock:
            if not self.atlases:
                self.atlases = self._load_parcellations_from_registry()

        if not self.atlases:
            if self.parcel_names is not None:
                raise ValueError(
                    f"No matching parcellations found for specified names: {self.parcel_names}\n"
                    "Available parcellations in registry: check list_parcellations()\n"
                    "Use register_parcellation() or register_parcellationes_from_directory() to add atlases"
                )
            else:
                raise ValueError(
                    "No valid parcellations found in registry\n"
                    "Use register_parcellation() or register_parcellationes_from_directory() to add atlases"
                )

        # Warn if some requested atlases weren't found
        if self.parcel_names is not None:
            found_names = {atlas["name"] for atlas in self.atlases}
            missing_names = set(self.parcel_names) - found_names
            if missing_names:
                self.logger.warning(
                    f"Some requested parcellations were not found: {sorted(missing_names)}. "
                    f"Found: {sorted(found_names)}"
                )

    def _load_parcellations_from_registry(self) -> list[dict]:
        """
        Load atlases from the registry (bundled or user-registered).

        Returns
        -------
        list[dict]
            List of atlas dictionaries with keys: name, image, labels, space, resolution
        """
        from lacuna.assets.parcellations.loader import BUNDLED_PARCELLATIONS_DIR

        # Get atlases from registry (filter by names if provided)
        if self.parcel_names is not None:
            # Load specific atlases by name
            atlases_data = []
            for name in self.parcel_names:
                try:
                    atlas = load_parcellation(name)

                    # Resolve paths (absolute or relative to bundled dir)
                    atlas_filename_path = Path(atlas.metadata.parcellation_filename)
                    if atlas_filename_path.is_absolute():
                        atlas_path = atlas_filename_path
                    else:
                        atlas_path = (
                            BUNDLED_PARCELLATIONS_DIR / atlas.metadata.parcellation_filename
                        )

                    labels_filename_path = Path(atlas.metadata.labels_filename)
                    if labels_filename_path.is_absolute():
                        labels_path = labels_filename_path
                    else:
                        labels_path = BUNDLED_PARCELLATIONS_DIR / atlas.metadata.labels_filename

                    atlases_data.append(
                        {
                            "name": name,
                            "atlas_path": atlas_path,
                            "labels_path": labels_path,
                            "labels": atlas.labels,
                            "space": atlas.metadata.space,
                            "resolution": atlas.metadata.resolution,
                            "is_4d": getattr(atlas.metadata, "is_4d", False),
                        }
                    )
                except KeyError:
                    # Atlas not in registry - will be caught by validation
                    pass
        else:
            # Load all registered atlases
            atlas_metadatas = list_parcellations()
            atlases_data = []
            for metadata in atlas_metadatas:
                atlas = load_parcellation(metadata.name)

                # Resolve paths (absolute or relative to bundled dir)
                atlas_filename_path = Path(atlas.metadata.parcellation_filename)
                if atlas_filename_path.is_absolute():
                    atlas_path = atlas_filename_path
                else:
                    atlas_path = BUNDLED_PARCELLATIONS_DIR / atlas.metadata.parcellation_filename

                labels_filename_path = Path(atlas.metadata.labels_filename)
                if labels_filename_path.is_absolute():
                    labels_path = labels_filename_path
                else:
                    labels_path = BUNDLED_PARCELLATIONS_DIR / atlas.metadata.labels_filename

                atlases_data.append(
                    {
                        "name": metadata.name,
                        "atlas_path": atlas_path,
                        "labels_path": labels_path,
                        "labels": atlas.labels,
                        "space": metadata.space,
                        "resolution": metadata.resolution,
                        "is_4d": getattr(metadata, "is_4d", False),
                    }
                )

        return atlases_data

    def _ensure_atlas_matches_input_space(
        self,
        atlas_img: nib.Nifti1Image,
        atlas_space: str,
        atlas_resolution: int,
        input_space: str,
        input_resolution: int,
        input_affine: np.ndarray,
        parcellation_name: str | None = None,
    ) -> nib.Nifti1Image:
        """
        Transform atlas to match input data space if spaces don't match.

        This allows ParcelAggregation to work with any voxel-level image,
        not just lesion data, by transforming the atlas to the input space.

        Parameters
        ----------
        atlas_img : nib.Nifti1Image
            Atlas image to potentially transform
        atlas_space : str
            Atlas coordinate space (e.g., 'MNI152NLin6Asym')
        atlas_resolution : int
            Atlas resolution in mm (e.g., 1 or 2)
        input_space : str
            Input data coordinate space
        input_resolution : int
            Input data resolution in mm
        input_affine : np.ndarray
            Input data affine matrix

        Returns
        -------
        nib.Nifti1Image
            Atlas in input space (transformed if needed, original if already matching)
        """
        # If atlas doesn't specify space, assume it matches
        if atlas_space is None:
            return atlas_img

        # Validate declared space against image header (affine and shape)
        from lacuna.core.spaces import (
            REFERENCE_SHAPES,
            detect_space_from_header,
            spaces_are_equivalent,
        )

        detected = detect_space_from_header(atlas_img)
        if detected is None:
            # Affine check failed (e.g. flipped data strides) — fall back to
            # shape + voxel-size matching, which is orientation-independent.
            img_shape = atlas_img.shape[:3]
            voxel_size = round(float(atlas_img.header.get_zooms()[0]), 1)
            shape_to_space = {
                shape: space
                for (space, res), shape in REFERENCE_SHAPES.items()
                if res == voxel_size
            }
            if img_shape in shape_to_space:
                detected = (shape_to_space[img_shape], voxel_size)

        if detected is not None:
            detected_space, _ = detected
            if not spaces_are_equivalent(detected_space, atlas_space):
                raise ValueError(
                    f"Parcellation '{parcellation_name}': declared space is "
                    f"'{atlas_space}' but image header matches "
                    f"'{detected_space}'. Check that the correct coordinate "
                    f"space was specified for this atlas."
                )

        if spaces_are_equivalent(atlas_space, input_space):
            # Same space or equivalent alias - no coordinate transformation needed
            # (nilearn will handle resolution resampling during aggregation)
            return atlas_img

        # Need to transform atlas to input space
        from lacuna.core.spaces import CoordinateSpace
        from lacuna.spatial.transform import transform_image

        # Create target space matching input data
        target_space = CoordinateSpace(
            identifier=input_space,
            resolution=input_resolution,
            reference_affine=input_affine,
        )

        # Transform atlas using nearest neighbor to preserve labels
        # Logging is handled by transform_image
        return transform_image(
            img=atlas_img,
            source_space=atlas_space,
            target_space=target_space,
            source_resolution=atlas_resolution,
            interpolation="nearest",  # Preserve integer labels
            image_name=f"atlas '{parcellation_name}'" if parcellation_name else "atlas",
            verbose=self.verbose,
        )

    def _run_analysis(self, mask_data: SubjectData) -> dict[str, "DataContainer"]:
        """
        Compute ROI-level aggregation for all atlases and sources.

        Parameters
        ----------
        mask_data : SubjectData
            Validated lesion data

        Returns
        -------
        dict[str, DataContainer]
            Dictionary mapping BIDS-style keys to ParcelData objects.
            Keys follow the pattern: parc-{atlas}_source-{SourceClass}_desc-{key}
        """
        # Log analysis start with atlas names
        n_atlases = len(self.atlases) if hasattr(self, "atlases") and self.atlases else 0
        n_sources = len(self.sources)
        atlas_names = [a["name"] for a in self.atlases] if self.atlases else []
        self.logger.info(
            f"Aggregating {n_sources} source(s) across {n_atlases} atlas(es): {', '.join(atlas_names)}"
        )

        # Get input data space/resolution once
        input_space = mask_data.space
        input_resolution = mask_data.resolution

        # Collect results with BIDS-style keys
        all_results: dict[str, DataContainer] = {}

        # Process each source
        for source in self.sources:
            # Parse source string to extract source class and key
            if "." in source:
                # Cross-analysis source: "AnalysisName.result_key"
                source_class, source_key = source.split(".", 1)
            else:
                # Direct source: "maskimg" -> from SubjectData
                source_class = "SubjectData"
                source_key = source

            # Get source image for this source
            source_img = self._get_source_image_for_source(mask_data, source)

            # Calculate voxel volume from source data
            voxel_volume_mm3 = np.abs(np.linalg.det(source_img.affine[:3, :3]))

            # Process each atlas
            for atlas_info in self.atlases:
                parcellation_name = atlas_info["name"]
                atlas_space = atlas_info.get("space")
                atlas_resolution = atlas_info.get("resolution")

                # Load and transform atlas (cached across subjects)
                cache_key = (
                    atlas_info["atlas_path"],
                    input_space,
                    input_resolution,
                )
                if cache_key in self._atlas_cache:
                    atlas_img = self._atlas_cache[cache_key]
                else:
                    atlas_img = nib.load(atlas_info["atlas_path"])
                    atlas_img = self._ensure_atlas_matches_input_space(
                        atlas_img=atlas_img,
                        atlas_space=atlas_space,
                        atlas_resolution=atlas_resolution,
                        input_space=input_space,
                        input_resolution=input_resolution,
                        input_affine=source_img.affine,
                        parcellation_name=parcellation_name,
                    )
                    self._atlas_cache[cache_key] = atlas_img

                # Store warped atlas as intermediate if requested
                if self.keep_intermediate:
                    from lacuna.core.data_types import VoxelMap

                    # Build unique key for this atlas + source combination
                    intermediate_key = f"warped_atlas_{parcellation_name}_{source_key}"
                    warped_atlas = VoxelMap(
                        name=f"warped_{parcellation_name}",
                        data=atlas_img,
                        space=input_space,
                        resolution=input_resolution,
                        metadata={
                            "original_space": atlas_space,
                            "original_resolution": atlas_resolution,
                            "parcellation_name": parcellation_name,
                            "source": source,
                            "description": (
                                f"Atlas '{parcellation_name}' transformed from "
                                f"{atlas_space}@{atlas_resolution}mm to "
                                f"{input_space}@{input_resolution}mm"
                            ),
                        },
                    )
                    all_results[intermediate_key] = warped_atlas

                labels = atlas_info["labels"]
                atlas_data = atlas_img.get_fdata()

                # Warn if nilearn will resample atlas to match source resolution
                atlas_shape = atlas_data.shape[:3]  # Handle 4D atlases
                source_shape = source_img.get_fdata().shape
                if source_shape != atlas_shape:
                    self.logger.info(
                        f"Resampling parcellation '{parcellation_name}' to match source data "
                        f"(source: {source_shape}, parcellation: {atlas_shape})"
                    )

                if atlas_data.ndim == 3:
                    # 3D integer-labeled atlas - use nilearn NiftiLabelsMasker
                    atlas_results = self._aggregate_3d_atlas(
                        source_img, atlas_img, labels, voxel_volume_mm3
                    )
                elif atlas_data.ndim == 4:
                    # 4D probabilistic atlas - use nilearn resampling
                    atlas_results = self._aggregate_4d_atlas(
                        source_img, atlas_img, labels, voxel_volume_mm3
                    )
                else:
                    self.logger.warning(
                        f"Skipping parcellation '{parcellation_name}': "
                        f"unexpected dimensions {atlas_data.ndim}D"
                    )
                    continue

                # Create ParcelData for this atlas + source combination
                roi_result = ParcelData(
                    name=parcellation_name,
                    data=atlas_results,
                    parcel_names=[parcellation_name],
                    aggregation_method=self.aggregation,
                    metadata={
                        "source": source,
                        "source_class": source_class,
                        "source_key": source_key,
                        "n_regions": len(atlas_results),
                    },
                )

                # Build BIDS-style result key
                result_key = build_result_key(
                    atlas=parcellation_name,
                    source=source_class,
                    desc=source_key,
                )

                all_results[result_key] = roi_result

        self.logger.success(f"Aggregation complete ({len(all_results)} results)")
        return all_results

    def _aggregate_3d_atlas(
        self,
        source_img: nib.Nifti1Image,
        atlas_img: nib.Nifti1Image,
        labels: dict[int, str],
        voxel_volume_mm3: float,
    ) -> dict[str, float]:
        """
        Aggregate source data for 3D integer-labeled atlas using nilearn.

        Uses nilearn's NiftiLabelsMasker for robust extraction with automatic
        resampling, masking, and efficient computation.

        Note: Suppresses nilearn's verbose label removal warnings when verbose is False.

        Parameters
        ----------
        source_img : nib.Nifti1Image
            Source image to aggregate
        atlas_img : nib.Nifti1Image
            3D atlas with integer labels
        labels : dict[int, str]
            Mapping from region ID to region name
        voxel_volume_mm3 : float
            Volume of one voxel in mm³ (for volume aggregation)

        Returns
        -------
        dict[str, float]
            Mapping from region name to aggregated value
        """
        import warnings

        # Suppress nilearn's verbose label removal warnings unless in verbose mode
        # These warnings come from sklearn's set_output and are too verbose for standard use
        if not self.verbose:
            warnings.filterwarnings(
                "ignore",
                message=".*following labels were removed.*",
                category=UserWarning,
                module="sklearn",
            )

        # Map our aggregation methods to nilearn strategies
        strategy_map = {
            "mean": "mean",
            "sum": "sum",
            "median": "median",
            "std": "standard_deviation",
            "percent": "mean",  # Will multiply by 100
            "volume": "sum",  # Will multiply by voxel_volume_mm3
        }

        if self.aggregation not in strategy_map:
            raise ValueError(f"Unknown aggregation method: {self.aggregation}")

        strategy = strategy_map[self.aggregation]

        # Create label names list (NiftiLabelsMasker expects ordered list)
        # Background (0) should not be included
        atlas_data = atlas_img.get_fdata()

        # Round atlas values to ensure integer labels
        # This handles edge cases where resampling or data type conversion
        # might introduce small floating point values
        atlas_data_rounded = np.round(atlas_data).astype(int)

        region_ids = np.unique(atlas_data_rounded)
        region_ids = region_ids[
            region_ids > 0
        ]  # Exclude background        # Build ordered list of label names
        label_names = [labels.get(int(rid), f"Region{int(rid)}") for rid in sorted(region_ids)]

        # Initialize NiftiLabelsMasker with appropriate settings
        masker = NiftiLabelsMasker(
            labels_img=atlas_img,
            labels=label_names,
            background_label=0,
            strategy=strategy,
            resampling_target="data",  # Resample atlas to match source data
            standardize=False,  # Don't normalize for static maps
            detrend=False,  # No detrending for static maps
            memory=None,  # No caching for now
            verbose=0,
            keep_masked_labels=False,  # Remove empty region signals (future nilearn default)
        )

        # Extract values - nilearn expects 4D input (add time dimension if needed)
        if source_img.ndim == 3:
            # Add a dummy 4th dimension for time
            source_data_4d = source_img.get_fdata()[..., np.newaxis]
            source_img_4d = nib.Nifti1Image(source_data_4d, source_img.affine)
        else:
            source_img_4d = source_img

        # Transform: returns (n_timepoints, n_regions) array
        region_values = masker.fit_transform(source_img_4d)

        # Squeeze to get (n_regions,) for single timepoint
        if region_values.shape[0] == 1:
            region_values = region_values.squeeze(axis=0)

        # Apply post-processing based on aggregation type
        if self.aggregation == "percent":
            # Convert mean (0-1) to percentage (0-100)
            region_values = region_values * 100
        elif self.aggregation == "volume":
            # Convert count to volume (mm³)
            region_values = region_values * voxel_volume_mm3

        # Build results dict
        # Note: region_values length might not match label_names if regions are lost during resampling
        # We zip without strict=True and handle the mismatch
        results = {}
        for i, value in enumerate(region_values):
            if i < len(label_names):
                label_name = label_names[i]
            else:
                # Fallback if we get more regions than expected
                label_name = f"Region{i}"
            results[label_name] = float(value)

        return results

    def _aggregate_4d_atlas(
        self,
        source_img: nib.Nifti1Image,
        atlas_img: nib.Nifti1Image,
        labels: dict[int, str],
        voxel_volume_mm3: float,
    ) -> dict[str, float]:
        """
        Aggregate source data for 4D atlas with automatic resampling.

        Uses nilearn's resample_to_img for automatic spatial alignment of atlas
        to source data. Detects binary vs probabilistic atlases and uses appropriate
        interpolation method ('nearest' for binary, 'continuous' for probabilistic).

        Parameters
        ----------
        source_img : nib.Nifti1Image
            Source image to aggregate
        atlas_img : nib.Nifti1Image
            4D atlas (x, y, z, n_regions) with binary or probability maps
        labels : dict[int, str]
            Mapping from region ID to region name
        voxel_volume_mm3 : float
            Volume of one voxel in mm³ (for volume aggregation)

        Returns
        -------
        dict[str, float]
            Mapping from region name to aggregated value
        """
        # Detect if atlas is binary (only 0s and 1s) or probabilistic
        atlas_data_orig = atlas_img.get_fdata()
        unique_values = np.unique(atlas_data_orig)
        is_binary = np.all(np.isin(unique_values, [0.0, 1.0]))

        # Use appropriate interpolation based on atlas type
        # 'nearest' for binary to preserve 0/1 values
        # 'continuous' for probabilistic to interpolate between probability values
        interpolation = "nearest" if is_binary else "continuous"

        # Resample atlas to match source data spatial resolution
        atlas_resampled = resample_to_img(
            atlas_img,
            source_img,
            interpolation=interpolation,
            copy=True,
            force_resample=True,
            copy_header=True,
        )

        source_data = source_img.get_fdata()
        atlas_data = atlas_resampled.get_fdata()

        results = {}
        n_regions = atlas_data.shape[3]

        # Get sorted label IDs to map volume indices to label IDs
        # Volume index i corresponds to the i-th label ID in sorted order
        sorted_label_ids = sorted(labels.keys())

        # Validate that we have the right number of labels
        if len(sorted_label_ids) != n_regions:
            self.logger.warning(
                f"Number of volumes ({n_regions}) does not match number of labels "
                f"({len(sorted_label_ids)}). Using available labels."
            )

        for region_idx in range(n_regions):
            # Get probability map for this region
            prob_map = atlas_data[:, :, :, region_idx]

            # Create binary mask from non-zero probability values
            region_mask = prob_map > 0

            # Get values in this region
            region_values = source_data[region_mask]

            # Compute aggregation
            value = self._compute_aggregation(region_values, region_mask, voxel_volume_mm3)

            # Map volume index to label ID using sorted label IDs
            # Volume 0 → sorted_label_ids[0] (could be 0, 1, or any starting ID)
            # Volume 1 → sorted_label_ids[1], etc.
            if region_idx < len(sorted_label_ids):
                region_id = sorted_label_ids[region_idx]
                region_name = labels[region_id]
            else:
                # Fallback if more volumes than labels
                region_name = f"Region{region_idx}"

            results[region_name] = value

        return results

    def _compute_aggregation(
        self,
        region_values: np.ndarray,
        region_mask: np.ndarray,
        voxel_volume_mm3: float,
    ) -> float:
        """
        Compute specified aggregation method on region values.

        Parameters
        ----------
        region_values : np.ndarray
            Values within the region
        region_mask : np.ndarray
            Boolean mask for the region
        voxel_volume_mm3 : float
            Volume of one voxel in mm³

        Returns
        -------
        float
            Aggregated value
        """
        if len(region_values) == 0:
            return 0.0

        if self.aggregation == "mean":
            return float(np.mean(region_values))

        elif self.aggregation == "sum":
            return float(np.sum(region_values))

        elif self.aggregation == "median":
            return float(np.median(region_values))

        elif self.aggregation == "std":
            return float(np.std(region_values))

        elif self.aggregation == "percent":
            # Percentage of ROI voxels that are non-zero
            n_total = np.sum(region_mask)
            n_nonzero = np.sum(region_values > 0)
            return (n_nonzero / n_total * 100) if n_total > 0 else 0.0

        elif self.aggregation == "volume":
            # Volume of non-zero voxels in mm³
            n_nonzero = np.sum(region_values > 0)
            return n_nonzero * voxel_volume_mm3

        else:
            raise ValueError(f"Unknown aggregation method: {self.aggregation}")

    def _get_source_image_for_source(
        self, mask_data: SubjectData, source: str
    ) -> nib.Nifti1Image | None:
        """
        Get source image from SubjectData for a specific source string.

        Parameters
        ----------
        mask_data : SubjectData
            Lesion data containing source
        source : str
            Source specification (e.g., "SubjectData.mask_img", "FunctionalNetworkMapping.correlation_map")

        Returns
        -------
        nib.Nifti1Image or None
            Source image, or None if not found
        """
        # Handle "SubjectData.mask_img" or just "maskimg"
        if source == "maskimg" or source == "SubjectData.mask_img":
            return mask_data.mask_img

        # Result from previous analysis: "AnalysisName.result_key"
        if "." in source:
            analysis_name, result_key = source.split(".", 1)

            # Handle SubjectData prefix
            if analysis_name == "SubjectData":
                if result_key == "maskimg":
                    return mask_data.mask_img
                return None

            if analysis_name in mask_data.results:
                analysis_results = mask_data.results[analysis_name]

                if result_key in analysis_results:
                    result = analysis_results[result_key]

                    # If it's a NIfTI image, return it
                    if isinstance(result, nib.Nifti1Image):
                        return result

                    # If it's a VoxelMap, return the underlying image
                    from lacuna.core.data_types import VoxelMap

                    if isinstance(result, VoxelMap):
                        return result.data

                    # If it's a path, load it
                    if isinstance(result, (str, Path)):
                        result_path = Path(result)
                        if result_path.exists():
                            return nib.load(result_path)

        return None

    def _get_source_image(self, mask_data: SubjectData) -> nib.Nifti1Image | None:
        """
        Get source image from SubjectData based on first source in sources list.

        This is a compatibility method for single-source usage.

        Parameters
        ----------
        mask_data : SubjectData
            Lesion data containing source

        Returns
        -------
        nib.Nifti1Image or None
            Source image, or None if not found
        """
        if self.sources:
            return self._get_source_image_for_source(mask_data, self.sources[0])
        return None

    def _get_parameters(self) -> dict:
        """Get analysis parameters for provenance and display.

        Returns
        -------
        dict
            Dictionary of parameter names and values.
        """
        return {
            "source": self.source,
            "aggregation": self.aggregation,
            "parcel_names": self.parcel_names,
            "num_atlases": len(self.atlases) if hasattr(self, "atlases") else None,
            "verbose": self.verbose,
        }

__getstate__()

Exclude non-picklable lock from serialization for multiprocessing.

Source code in src/lacuna/analysis/parcel_aggregation.py
def __getstate__(self):
    """Exclude non-picklable lock from serialization for multiprocessing."""
    state = self.__dict__.copy()
    # Remove the lock - it can't be pickled
    state.pop("_atlases_lock", None)
    # Don't serialize the atlas cache - rebuild in new process
    state.pop("_atlas_cache", None)
    return state

__init__(source='maskimg', aggregation='mean', parcel_names=None, verbose=False, keep_intermediate=False)

Initialize ParcelAggregation analysis.

Parameters:

Name Type Description Default
source str or list[str] or dict

Source of data to aggregate.

"maskimg"
aggregation str

Aggregation method to use.

"mean"
parcel_names list of str or None

Names of atlases from the registry to process.

None
verbose bool

If True, print progress messages.

False
keep_intermediate bool

If True, include intermediate results (e.g., warped mask images) in the output. Useful for debugging and quality control.

False
Source code in src/lacuna/analysis/parcel_aggregation.py
def __init__(
    self,
    source: str | list[str] | dict[str, str | list[str]] = "maskimg",
    aggregation: str = "mean",
    parcel_names: list[str] | None = None,
    verbose: bool = False,
    keep_intermediate: bool = False,
):
    """Initialize ParcelAggregation analysis.

    Parameters
    ----------
    source : str or list[str] or dict, default="maskimg"
        Source of data to aggregate.
    aggregation : str, default="mean"
        Aggregation method to use.
    parcel_names : list of str or None, default=None
        Names of atlases from the registry to process.
    verbose : bool, default=False
        If True, print progress messages.
    keep_intermediate : bool, default=False
        If True, include intermediate results (e.g., warped mask images)
        in the output. Useful for debugging and quality control.
    """
    super().__init__(verbose=verbose, keep_intermediate=keep_intermediate)

    # Initialize logger for warnings and info messages
    from lacuna.utils.logging import ConsoleLogger

    self.logger = ConsoleLogger(verbose=verbose)

    # Normalize and validate source parameter
    self.sources = self._normalize_sources(source)
    self.source = source  # Keep original for compatibility
    self.aggregation = aggregation
    self.parcel_names = parcel_names

    # Validate aggregation method
    if aggregation not in self.VALID_AGGREGATIONS:
        from lacuna.utils.suggestions import format_suggestions, suggest_similar

        suggestions = suggest_similar(aggregation, list(self.VALID_AGGREGATIONS))
        hint = format_suggestions(suggestions)
        msg = (
            f"Invalid aggregation method: '{aggregation}'\n"
            f"Valid options: {', '.join(self.VALID_AGGREGATIONS)}"
        )
        if hint:
            msg = f"{msg}\n{hint}"
        raise ValueError(msg)

    # Threshold validation removed - accepts any float value (T061)
    # This allows for flexible thresholding (e.g., negative z-scores, arbitrary cutoffs)

    # Validate parcel_names if provided
    if parcel_names is not None:
        if not isinstance(parcel_names, list):
            raise TypeError(
                f"parcel_names must be a list of strings or None, got {type(parcel_names).__name__}"
            )
        if not all(isinstance(name, str) for name in parcel_names):
            raise TypeError("All items in parcel_names must be strings")
        if not parcel_names:
            raise ValueError(
                "parcel_names cannot be an empty list (use None to process all atlases)"
            )

    # Will be populated in _validate_inputs (thread-safe)
    self.atlases = []
    self._atlases_lock = threading.Lock()
    # Cache for loaded+transformed atlas images, keyed by
    # (atlas_path, input_space, input_resolution) to avoid redundant
    # disk I/O and spatial transformations across subjects
    self._atlas_cache: dict[tuple, nib.Nifti1Image] = {}

__setstate__(state)

Recreate lock after unpickling for multiprocessing.

Source code in src/lacuna/analysis/parcel_aggregation.py
def __setstate__(self, state):
    """Recreate lock after unpickling for multiprocessing."""
    self.__dict__.update(state)
    # Recreate the lock in the new process
    self._atlases_lock = threading.Lock()
    # Recreate empty atlas cache in the new process
    self._atlas_cache = {}

run(data)

Execute atlas aggregation analysis on various input types.

Supports flexible input types with matching return types: - SubjectData -> SubjectData (with results attached) - nibabel.Nifti1Image -> ParcelData - list[nibabel.Nifti1Image] -> list[ParcelData]

Parameters:

Name Type Description Default
data SubjectData or Nifti1Image or list[Nifti1Image]

Input data to aggregate: - SubjectData: Standard workflow, returns SubjectData with results - nibabel.Nifti1Image: Single image, returns ParcelData - list[nibabel.Nifti1Image]: Batch processing, returns list of results

required

Returns:

Type Description
SubjectData or ParcelData or list[ParcelData]

Results matching input type: - SubjectData input: New SubjectData instance with results in .results dict - nibabel input: Single ParcelData - list input: List of ParcelData objects (one per input image)

Raises:

Type Description
ValueError

If input validation fails or source data not found.

TypeError

If input type is not supported.

Notes

This method overrides BaseAnalysis.run() to support flexible input types. The base class run() is designed for SubjectData only.

Examples:

>>> # SubjectData input
>>> mask_data = SubjectData(mask_img, space='MNI152NLin6Asym', resolution=2)
>>> analysis = ParcelAggregation(aggregation='percent')
>>> result = analysis.run(mask_data)
>>> isinstance(data, SubjectData)
True
>>> # Nibabel image input
>>> import nibabel as nib
>>> img = nib.load('mask.nii.gz')
>>> result = analysis.run(img)
>>> isinstance(result, ParcelData)
True
>>> # List of images
>>> images = [nib.load(f'mask_{i}.nii.gz') for i in range(5)]
>>> results = analysis.run(images)
>>> len(results) == 5
True
Source code in src/lacuna/analysis/parcel_aggregation.py
def run(
    self, data: "SubjectData | nib.Nifti1Image | list[nib.Nifti1Image]"
) -> "SubjectData | ParcelData | list[ParcelData]":
    """
    Execute atlas aggregation analysis on various input types.

    Supports flexible input types with matching return types:
    - SubjectData -> SubjectData (with results attached)
    - nibabel.Nifti1Image -> ParcelData
    - list[nibabel.Nifti1Image] -> list[ParcelData]

    Parameters
    ----------
    data : SubjectData or nibabel.Nifti1Image or list[nibabel.Nifti1Image]
        Input data to aggregate:
        - SubjectData: Standard workflow, returns SubjectData with results
        - nibabel.Nifti1Image: Single image, returns ParcelData
        - list[nibabel.Nifti1Image]: Batch processing, returns list of results

    Returns
    -------
    SubjectData or ParcelData or list[ParcelData]
        Results matching input type:
        - SubjectData input: New SubjectData instance with results in .results dict
        - nibabel input: Single ParcelData
        - list input: List of ParcelData objects (one per input image)

    Raises
    ------
    ValueError
        If input validation fails or source data not found.
    TypeError
        If input type is not supported.

    Notes
    -----
    This method overrides BaseAnalysis.run() to support flexible input types.
    The base class run() is designed for SubjectData only.

    Examples
    --------
    >>> # SubjectData input
    >>> mask_data = SubjectData(mask_img, space='MNI152NLin6Asym', resolution=2)
    >>> analysis = ParcelAggregation(aggregation='percent')
    >>> result = analysis.run(mask_data)
    >>> isinstance(data, SubjectData)
    True

    >>> # Nibabel image input
    >>> import nibabel as nib
    >>> img = nib.load('mask.nii.gz')
    >>> result = analysis.run(img)
    >>> isinstance(result, ParcelData)
    True

    >>> # List of images
    >>> images = [nib.load(f'mask_{i}.nii.gz') for i in range(5)]
    >>> results = analysis.run(images)
    >>> len(results) == 5
    True
    """
    from lacuna.core.data_types import VoxelMap

    # Detect input type and delegate to appropriate handler
    if isinstance(data, SubjectData):
        # Standard SubjectData workflow - use base class run()
        return super().run(data)

    elif isinstance(data, VoxelMap):
        # VoxelMap - run directly without SubjectData wrapper
        return self._run_voxelmap(data)

    elif isinstance(data, nib.Nifti1Image):
        # Single nibabel image - return ParcelData
        return self._run_single_image(data)

    elif isinstance(data, list):
        # List of images or VoxelMaps - return list of results
        if not data:
            raise ValueError("Empty list provided - at least one image required")

        # Check if all are VoxelMaps or all are Images
        if all(isinstance(item, VoxelMap) for item in data):
            # Process VoxelMaps directly
            return [self._run_voxelmap(vm) for vm in data]

        elif all(isinstance(img, nib.Nifti1Image) for img in data):
            return self._run_batch_images(data)

        else:
            raise TypeError(
                "When providing a list, all items must be of the same type: "
                "either all VoxelMap or all nibabel.Nifti1Image objects"
            )

    else:
        raise TypeError(
            f"Unsupported input type: {type(data).__name__}\n"
            "Supported types: SubjectData, VoxelMap, nibabel.Nifti1Image, "
            "list[VoxelMap], list[nibabel.Nifti1Image]"
        )