Skip to content

Utilities API

pictologics.utilities.dicom_database

DICOM Database Module

This module provides dataclass-based hierarchical organization of DICOM files with completeness validation and multi-level DataFrame exports.

The implementation supports parallel processing for improved performance on large datasets, with stateless file processing and immutable intermediate results.

DicomInstance dataclass

Represents a single DICOM file/instance.

Attributes:

Name Type Description
sop_instance_uid str

Unique identifier for this instance.

file_path Path

Absolute path to the DICOM file.

instance_number Optional[int]

Instance number within the series.

image_position_patient Optional[tuple[float, float, float]]

(x, y, z) position in patient coordinates.

image_orientation_patient Optional[tuple[float, ...]]

Direction cosines for row and column.

slice_location Optional[float]

Slice location value from DICOM header.

acquisition_datetime Optional[str]

Combined acquisition date and time.

projection_score Optional[float]

Calculated projection onto slice normal for sorting.

metadata dict[str, Any]

Additional extracted metadata tags.

tags dict[str, Any]

Additional per-instance tag data (see source; kept separate from metadata).

Source code in pictologics/utilities/dicom_database.py
@dataclass
class DicomInstance:
    """Represents a single DICOM file/instance.

    Attributes:
        sop_instance_uid: Unique identifier for this instance.
        file_path: Absolute path to the DICOM file.
        instance_number: Instance number within the series.
        image_position_patient: (x, y, z) position in patient coordinates.
        image_orientation_patient: Direction cosines for row and column.
        slice_location: Slice location value from DICOM header.
        acquisition_datetime: Combined acquisition date and time.
        projection_score: Calculated projection onto slice normal for sorting.
        metadata: Additional extracted metadata tags.
        tags: Additional per-instance tag data (purpose not documented in
            this file — verify against the metadata extractor).
    """

    sop_instance_uid: str
    file_path: Path
    instance_number: Optional[int] = None
    image_position_patient: Optional[tuple[float, float, float]] = None
    image_orientation_patient: Optional[tuple[float, ...]] = None
    slice_location: Optional[float] = None
    acquisition_datetime: Optional[str] = None
    projection_score: Optional[float] = None
    metadata: dict[str, Any] = field(default_factory=dict)
    tags: dict[str, Any] = field(default_factory=dict)

DicomSeries dataclass

Represents a DICOM series containing multiple instances.

Attributes:

Name Type Description
series_instance_uid str

Unique identifier for this series.

series_number Optional[int]

Series number within the study.

series_description Optional[str]

Description of the series.

modality Optional[str]

Imaging modality (CT, MR, etc.).

frame_of_reference_uid Optional[str]

Frame of reference UID.

instances list[DicomInstance]

List of DicomInstance objects in this series.

common_metadata dict[str, Any]

Metadata tags identical across all instances.

Source code in pictologics/utilities/dicom_database.py
@dataclass
class DicomSeries:
    """Represents a DICOM series containing multiple instances.

    Attributes:
        series_instance_uid: Unique identifier for this series.
        series_number: Series number within the study.
        series_description: Description of the series.
        modality: Imaging modality (CT, MR, etc.).
        frame_of_reference_uid: Frame of reference UID.
        instances: List of DicomInstance objects in this series.
        common_metadata: Metadata tags identical across all instances.
    """

    series_instance_uid: str
    series_number: Optional[int] = None
    series_description: Optional[str] = None
    modality: Optional[str] = None
    frame_of_reference_uid: Optional[str] = None
    instances: list[DicomInstance] = field(default_factory=list)
    common_metadata: dict[str, Any] = field(default_factory=dict)

    def get_sorted_instances(self) -> list[DicomInstance]:
        """Return instances sorted by spatial position (projection score).

        Uses the same methodology as pictologics.loader for spatial sorting.
        Falls back to instance number if projection scores are not available.
        """
        # Spatial sort only when *every* instance carries a score; a single
        # missing score sends the whole series to the instance-number path.
        if all(inst.projection_score is not None for inst in self.instances):
            return sorted(self.instances, key=lambda x: x.projection_score or 0)
        # NOTE: instances with a missing (or zero) instance_number sort as 0.
        return sorted(
            self.instances, key=lambda x: x.instance_number if x.instance_number else 0
        )

    def check_completeness(self, spacing_tolerance: float = 0.1) -> dict[str, Any]:
        """Check if the series has all expected slices.

        Uses geometric validation based on ImagePositionPatient projection
        to detect missing slices and gaps.

        Args:
            spacing_tolerance: Tolerance for gap detection (default 10%).

        Returns:
            Dictionary with completeness information.
        """
        # Optimistic defaults: the series is reported complete unless gap or
        # missing-instance evidence is found below.
        result: dict[str, Any] = {
            "series_uid": self.series_instance_uid,
            "total_instances": len(self.instances),
            "expected_instances": len(self.instances),
            "is_complete": True,
            "has_gaps": False,
            "gap_indices": [],
            "gap_positions": [],
            "spacing_mm": None,
            "spacing_std": None,
            "spacing_uniform": True,
            "first_slice_position": None,
            "last_slice_position": None,
            "frame_of_reference_uid": self.frame_of_reference_uid,
            "warnings": [],
        }

        # Fewer than two slices: there is no inter-slice spacing to validate.
        if len(self.instances) < 2:
            result["warnings"].append("Series has fewer than 2 instances")
            return result

        # Get sorted instances by projection score
        sorted_instances = self.get_sorted_instances()

        # Check if we have projection scores for geometric validation
        projection_scores = [
            inst.projection_score
            for inst in sorted_instances
            if inst.projection_score is not None
        ]

        if len(projection_scores) < 2:
            result["warnings"].append(
                "Insufficient spatial data for geometric completeness check"
            )
            # Fall back to instance number validation: expect a contiguous
            # run from the lowest to the highest observed instance number.
            instance_numbers = [
                inst.instance_number
                for inst in sorted_instances
                if inst.instance_number is not None
            ]
            if len(instance_numbers) >= 2:
                instance_numbers_sorted = sorted(instance_numbers)
                expected_range = set(
                    range(instance_numbers_sorted[0], instance_numbers_sorted[-1] + 1)
                )
                missing = expected_range - set(instance_numbers)
                if missing:
                    result["is_complete"] = False
                    result["has_gaps"] = True
                    result["gap_indices"] = sorted(missing)
                    result["expected_instances"] = len(expected_range)
            return result

        # Calculate spacings between consecutive slices
        # NOTE(review): if only *some* instances carry projection scores, the
        # sort above fell back to instance numbers, so these scores may not be
        # monotonic; np.abs below masks the sign but not the ordering.
        spacings = np.diff(projection_scores)
        median_spacing = float(np.median(np.abs(spacings)))
        spacing_std = float(np.std(np.abs(spacings)))

        result["spacing_mm"] = median_spacing
        result["spacing_std"] = spacing_std
        result["first_slice_position"] = projection_scores[0]
        result["last_slice_position"] = projection_scores[-1]

        # Check for uniform spacing
        if median_spacing > 0:
            spacing_cv = spacing_std / median_spacing  # Coefficient of variation
            result["spacing_uniform"] = spacing_cv < spacing_tolerance

        # Detect gaps (spacing significantly larger than expected)
        gap_threshold = median_spacing * (1 + spacing_tolerance)
        gap_indices = []
        gap_positions = []

        for i, spacing in enumerate(np.abs(spacings)):
            # NOTE(review): effective cutoff is median * (1 + tolerance) * 1.5;
            # the extra 1.5 looks like a deliberate safety margin — confirm
            # against the intended detection sensitivity.
            if spacing > gap_threshold * 1.5:  # Gap detected
                gap_indices.append(i + 1)
                gap_positions.append(projection_scores[i])

        if gap_indices:
            result["has_gaps"] = True
            result["gap_indices"] = gap_indices
            result["gap_positions"] = gap_positions

            # Estimate expected instances based on position range and median spacing
            position_range = abs(projection_scores[-1] - projection_scores[0])
            if median_spacing > 0:
                expected = int(round(position_range / median_spacing)) + 1
                result["expected_instances"] = expected
                result["is_complete"] = False

        return result

    def get_instance_uids(self) -> list[str]:
        """Get list of all instance SOPInstanceUIDs."""
        return [inst.sop_instance_uid for inst in self.instances]

    def get_file_paths(self) -> list[str]:
        """Get list of all instance file paths as strings."""
        return [str(inst.file_path) for inst in self.instances]
check_completeness(spacing_tolerance=0.1)

Check if the series has all expected slices.

Uses geometric validation based on ImagePositionPatient projection to detect missing slices and gaps.

Parameters:

Name Type Description Default
spacing_tolerance float

Tolerance for gap detection (default 10%).

0.1

Returns:

Type Description
dict[str, Any]

Dictionary with completeness information.

Source code in pictologics/utilities/dicom_database.py
def check_completeness(self, spacing_tolerance: float = 0.1) -> dict[str, Any]:
    """Check if the series has all expected slices.

    Uses geometric validation based on ImagePositionPatient projection
    to detect missing slices and gaps.

    Args:
        spacing_tolerance: Tolerance for gap detection (default 10%).

    Returns:
        Dictionary with completeness information.
    """
    # Optimistic defaults: the series is reported complete unless gap or
    # missing-instance evidence is found below.
    result: dict[str, Any] = {
        "series_uid": self.series_instance_uid,
        "total_instances": len(self.instances),
        "expected_instances": len(self.instances),
        "is_complete": True,
        "has_gaps": False,
        "gap_indices": [],
        "gap_positions": [],
        "spacing_mm": None,
        "spacing_std": None,
        "spacing_uniform": True,
        "first_slice_position": None,
        "last_slice_position": None,
        "frame_of_reference_uid": self.frame_of_reference_uid,
        "warnings": [],
    }

    # Fewer than two slices: there is no inter-slice spacing to validate.
    if len(self.instances) < 2:
        result["warnings"].append("Series has fewer than 2 instances")
        return result

    # Get sorted instances by projection score
    sorted_instances = self.get_sorted_instances()

    # Check if we have projection scores for geometric validation
    projection_scores = [
        inst.projection_score
        for inst in sorted_instances
        if inst.projection_score is not None
    ]

    if len(projection_scores) < 2:
        result["warnings"].append(
            "Insufficient spatial data for geometric completeness check"
        )
        # Fall back to instance number validation: expect a contiguous run
        # from the lowest to the highest observed instance number.
        instance_numbers = [
            inst.instance_number
            for inst in sorted_instances
            if inst.instance_number is not None
        ]
        if len(instance_numbers) >= 2:
            instance_numbers_sorted = sorted(instance_numbers)
            expected_range = set(
                range(instance_numbers_sorted[0], instance_numbers_sorted[-1] + 1)
            )
            missing = expected_range - set(instance_numbers)
            if missing:
                result["is_complete"] = False
                result["has_gaps"] = True
                result["gap_indices"] = sorted(missing)
                result["expected_instances"] = len(expected_range)
        return result

    # Calculate spacings between consecutive slices
    # NOTE(review): if only *some* instances carry projection scores, the
    # sort above fell back to instance numbers, so these scores may not be
    # monotonic; np.abs below masks the sign but not the ordering.
    spacings = np.diff(projection_scores)
    median_spacing = float(np.median(np.abs(spacings)))
    spacing_std = float(np.std(np.abs(spacings)))

    result["spacing_mm"] = median_spacing
    result["spacing_std"] = spacing_std
    result["first_slice_position"] = projection_scores[0]
    result["last_slice_position"] = projection_scores[-1]

    # Check for uniform spacing
    if median_spacing > 0:
        spacing_cv = spacing_std / median_spacing  # Coefficient of variation
        result["spacing_uniform"] = spacing_cv < spacing_tolerance

    # Detect gaps (spacing significantly larger than expected)
    gap_threshold = median_spacing * (1 + spacing_tolerance)
    gap_indices = []
    gap_positions = []

    for i, spacing in enumerate(np.abs(spacings)):
        # NOTE(review): effective cutoff is median * (1 + tolerance) * 1.5;
        # the extra 1.5 looks like a deliberate safety margin — confirm
        # against the intended detection sensitivity.
        if spacing > gap_threshold * 1.5:  # Gap detected
            gap_indices.append(i + 1)
            gap_positions.append(projection_scores[i])

    if gap_indices:
        result["has_gaps"] = True
        result["gap_indices"] = gap_indices
        result["gap_positions"] = gap_positions

        # Estimate expected instances based on position range and median spacing
        position_range = abs(projection_scores[-1] - projection_scores[0])
        if median_spacing > 0:
            expected = int(round(position_range / median_spacing)) + 1
            result["expected_instances"] = expected
            result["is_complete"] = False

    return result

get_file_paths()

Get list of all instance file paths as strings.

Source code in pictologics/utilities/dicom_database.py
def get_file_paths(self) -> list[str]:
    """Return every instance's file path, rendered as a string."""
    paths: list[str] = []
    for entry in self.instances:
        paths.append(str(entry.file_path))
    return paths

get_instance_uids()

Get list of all instance SOPInstanceUIDs.

Source code in pictologics/utilities/dicom_database.py
def get_instance_uids(self) -> list[str]:
    """Return the SOPInstanceUID of every instance in this series."""
    collected: list[str] = []
    for entry in self.instances:
        collected.append(entry.sop_instance_uid)
    return collected

get_sorted_instances()

Return instances sorted by spatial position (projection score).

Uses the same methodology as pictologics.loader for spatial sorting. Falls back to instance number if projection scores are not available.

Source code in pictologics/utilities/dicom_database.py
def get_sorted_instances(self) -> list[DicomInstance]:
    """Return instances ordered by spatial position (projection score).

    Mirrors the sorting methodology of pictologics.loader; when any
    instance lacks a projection score, ordering falls back to the DICOM
    instance number.
    """
    missing_score = any(inst.projection_score is None for inst in self.instances)
    if missing_score:
        # Instances without a usable instance number sort first (as 0).
        return sorted(
            self.instances,
            key=lambda inst: inst.instance_number if inst.instance_number else 0,
        )
    return sorted(self.instances, key=lambda inst: inst.projection_score or 0)

DicomStudy dataclass

Represents a DICOM study containing multiple series.

Attributes:

Name Type Description
study_instance_uid str

Unique identifier for this study.

study_date Optional[str]

Date of the study.

study_time Optional[str]

Time of the study.

study_description Optional[str]

Description of the study.

series list[DicomSeries]

List of DicomSeries objects in this study.

common_metadata dict[str, Any]

Metadata tags identical across all series.

Source code in pictologics/utilities/dicom_database.py
@dataclass
class DicomStudy:
    """A single DICOM study, grouping every series acquired under it.

    Attributes:
        study_instance_uid: Unique identifier for this study.
        study_date: Date of the study.
        study_time: Time of the study.
        study_description: Description of the study.
        series: List of DicomSeries objects in this study.
        common_metadata: Metadata tags identical across all series.
    """

    study_instance_uid: str
    study_date: Optional[str] = None
    study_time: Optional[str] = None
    study_description: Optional[str] = None
    series: list[DicomSeries] = field(default_factory=list)
    common_metadata: dict[str, Any] = field(default_factory=dict)

    def get_instance_uids(self) -> list[str]:
        """Get list of all instance SOPInstanceUIDs in this study."""
        return [uid for child in self.series for uid in child.get_instance_uids()]

    def get_file_paths(self) -> list[str]:
        """Get list of all instance file paths in this study."""
        return [path for child in self.series for path in child.get_file_paths()]

get_file_paths()

Get list of all instance file paths in this study.

Source code in pictologics/utilities/dicom_database.py
def get_file_paths(self) -> list[str]:
    """Flatten the file paths of every instance across this study's series."""
    return [path for child in self.series for path in child.get_file_paths()]

get_instance_uids()

Get list of all instance SOPInstanceUIDs in this study.

Source code in pictologics/utilities/dicom_database.py
def get_instance_uids(self) -> list[str]:
    """Flatten the SOPInstanceUIDs of every instance across this study's series."""
    return [uid for child in self.series for uid in child.get_instance_uids()]

DicomPatient dataclass

Represents a DICOM patient containing multiple studies.

Attributes:

Name Type Description
patient_id str

Patient identifier.

patients_name Optional[str]

Patient's name.

patients_birth_date Optional[str]

Patient's birth date.

patients_sex Optional[str]

Patient's sex.

studies list[DicomStudy]

List of DicomStudy objects for this patient.

common_metadata dict[str, Any]

Metadata tags identical across all studies.

Source code in pictologics/utilities/dicom_database.py
@dataclass
class DicomPatient:
    """A single patient record holding every study acquired for them.

    Attributes:
        patient_id: Patient identifier.
        patients_name: Patient's name.
        patients_birth_date: Patient's birth date.
        patients_sex: Patient's sex.
        studies: List of DicomStudy objects for this patient.
        common_metadata: Metadata tags identical across all studies.
    """

    patient_id: str
    patients_name: Optional[str] = None
    patients_birth_date: Optional[str] = None
    patients_sex: Optional[str] = None
    studies: list[DicomStudy] = field(default_factory=list)
    common_metadata: dict[str, Any] = field(default_factory=dict)

    def get_instance_uids(self) -> list[str]:
        """Get list of all instance SOPInstanceUIDs for this patient."""
        return [uid for study in self.studies for uid in study.get_instance_uids()]

    def get_file_paths(self) -> list[str]:
        """Get list of all instance file paths for this patient."""
        return [path for study in self.studies for path in study.get_file_paths()]

get_file_paths()

Get list of all instance file paths for this patient.

Source code in pictologics/utilities/dicom_database.py
def get_file_paths(self) -> list[str]:
    """Flatten the file paths of every instance across this patient's studies."""
    return [path for study in self.studies for path in study.get_file_paths()]

get_instance_uids()

Get list of all instance SOPInstanceUIDs for this patient.

Source code in pictologics/utilities/dicom_database.py
def get_instance_uids(self) -> list[str]:
    """Flatten the SOPInstanceUIDs of every instance across this patient's studies."""
    return [uid for study in self.studies for uid in study.get_instance_uids()]

DicomDatabase dataclass

Top-level database containing all patients.

This class provides the main interface for building a DICOM database from folders and exporting to various formats.

Attributes:

Name Type Description
patients list[DicomPatient]

List of DicomPatient objects.

spacing_tolerance float

Tolerance for gap detection in completeness checks.

Source code in pictologics/utilities/dicom_database.py
(Rendered line-number gutter removed — the source spans lines 280–771 of pictologics/utilities/dicom_database.py.)
@dataclass
class DicomDatabase:
    """Top-level database containing all patients.

    This class provides the main interface for building a DICOM database
    from folders and exporting to various formats.

    Attributes:
        patients: List of DicomPatient objects.
        spacing_tolerance: Tolerance for gap detection in completeness checks.
    """

    patients: list[DicomPatient] = field(default_factory=list)
    spacing_tolerance: float = 0.1

    @classmethod
    def from_folders(
        cls,
        paths: list[str | Path],
        recursive: bool = True,
        spacing_tolerance: float = 0.1,
        show_progress: bool = True,
        extract_private_tags: bool = True,
        num_workers: Optional[int] = None,
        split_multiseries: bool = True,
    ) -> "DicomDatabase":
        """Build a database from folder paths.

        Args:
            paths: List of folder paths to scan.
            recursive: Whether to scan subdirectories.
            spacing_tolerance: Tolerance for gap detection (default 10%).
            show_progress: Whether to display progress bars.
            extract_private_tags: Whether to extract vendor-specific private tags.
            num_workers: Number of parallel workers. None=auto (cpu_count-1),
                        1=sequential (no multiprocessing).
            split_multiseries: Whether to split multi-phase series (e.g. cardiac)
                              into separate series based on tags or spatial duplicates.

        Returns:
            DicomDatabase instance populated with all discovered DICOM files.

        Example:
            Build database from multiple folders:

            ```python
            from pictologics.utilities.dicom_database import DicomDatabase

            db = DicomDatabase.from_folders(
                paths=["data/patient1", "data/patient2"],
                recursive=True,
                num_workers=4
            )
            print(f"Found {len(db.patients)} patients")
            ```
        """
        folder_paths = [Path(entry) for entry in paths]
        worker_count = _get_num_workers(num_workers)

        # Discovery first: bail out early with an empty database when the
        # scan finds nothing DICOM-like.
        candidate_files = _scan_dicom_files(
            folder_paths, recursive, show_progress, worker_count
        )
        if not candidate_files:
            return cls(patients=[], spacing_tolerance=spacing_tolerance)

        # Per-file metadata extraction (parallelised when worker_count > 1),
        # followed by hierarchy assembly and deterministic ordering.
        records = _extract_all_metadata(
            candidate_files, show_progress, extract_private_tags, worker_count
        )
        hierarchy = _build_hierarchy(records, spacing_tolerance, split_multiseries)
        return cls(
            patients=_sort_hierarchy(hierarchy),
            spacing_tolerance=spacing_tolerance,
        )

    # ========================================================================
    # DataFrame Export Methods
    # ========================================================================

    def get_patients_df(self, include_instance_lists: bool = False) -> pd.DataFrame:
        """Export patient-level summary DataFrame.

        Args:
            include_instance_lists: Whether to include InstanceSOPUIDs and
                InstanceFilePaths columns. Defaults to False to reduce memory.

        Returns:
            DataFrame with patient information and aggregated statistics.
        """
        records: list[Dict[str, Any]] = []
        for patient in self.patients:
            # Aggregate counts across the whole patient hierarchy.
            instance_count = 0
            for study in patient.studies:
                for series in study.series:
                    instance_count += len(series.instances)

            record: Dict[str, Any] = {
                "PatientID": patient.patient_id,
                "PatientsName": patient.patients_name,
                "PatientsBirthDate": patient.patients_birth_date,
                "PatientsSex": patient.patients_sex,
                "NumStudies": len(patient.studies),
                "NumSeries": sum(len(study.series) for study in patient.studies),
                "NumInstances": instance_count,
            }
            if include_instance_lists:
                record["InstanceSOPUIDs"] = patient.get_instance_uids()
                record["InstanceFilePaths"] = patient.get_file_paths()

            # Date range across this patient's studies (None when no study
            # carries a date).
            dates = [study.study_date for study in patient.studies if study.study_date]
            record["EarliestStudyDate"] = min(dates) if dates else None
            record["LatestStudyDate"] = max(dates) if dates else None

            # Patient-level common metadata never overwrites explicit columns.
            for key, value in patient.common_metadata.items():
                record.setdefault(key, value)

            records.append(record)

        return pd.DataFrame(records)

    def get_studies_df(
        self,
        patient_id: Optional[str] = None,
        include_instance_lists: bool = False,
    ) -> pd.DataFrame:
        """Export study-level summary DataFrame.

        Args:
            patient_id: Optional filter by patient ID.
            include_instance_lists: Whether to include InstanceSOPUIDs and
                InstanceFilePaths columns. Defaults to False to reduce memory.

        Returns:
            DataFrame with study information.
        """
        records: list[Dict[str, Any]] = []
        for patient in self.patients:
            # Apply the optional patient filter.
            if patient_id and patient.patient_id != patient_id:
                continue

            for study in patient.studies:
                record: Dict[str, Any] = {
                    # Patient info
                    "PatientID": patient.patient_id,
                    "PatientsName": patient.patients_name,
                    "PatientsBirthDate": patient.patients_birth_date,
                    "PatientsSex": patient.patients_sex,
                    # Study info
                    "StudyInstanceUID": study.study_instance_uid,
                    "StudyDate": study.study_date,
                    "StudyTime": study.study_time,
                    "StudyDescription": study.study_description,
                    "NumSeries": len(study.series),
                    "NumInstances": sum(len(s.instances) for s in study.series),
                }
                if include_instance_lists:
                    record["InstanceSOPUIDs"] = study.get_instance_uids()
                    record["InstanceFilePaths"] = study.get_file_paths()

                # Distinct modalities across the study's series (order follows
                # set iteration and is unspecified).
                record["ModalitiesPresent"] = list(
                    {s.modality for s in study.series if s.modality}
                )

                # Patient-level metadata takes precedence over study-level on
                # key clashes; neither overwrites explicit columns.
                for source in (patient.common_metadata, study.common_metadata):
                    for key, value in source.items():
                        record.setdefault(key, value)

                records.append(record)

        return pd.DataFrame(records)

    def get_series_df(
        self,
        patient_id: Optional[str] = None,
        study_uid: Optional[str] = None,
        include_instance_lists: bool = False,
    ) -> pd.DataFrame:
        """Export series-level summary DataFrame with completeness info.

        Args:
            patient_id: Optional filter by patient ID.
            study_uid: Optional filter by study UID.
            include_instance_lists: Whether to include InstanceSOPUIDs and
                InstanceFilePaths columns. Defaults to False to reduce memory.

        Returns:
            DataFrame with series information including completeness validation.
        """
        records = []
        for patient in self.patients:
            if patient_id and patient.patient_id != patient_id:
                continue

            for study in patient.studies:
                if study_uid and study.study_instance_uid != study_uid:
                    continue

                for series in study.series:
                    # Geometric completeness check, using the database-wide
                    # spacing tolerance.
                    check = series.check_completeness(self.spacing_tolerance)

                    record = {
                        # Patient info
                        "PatientID": patient.patient_id,
                        "PatientsName": patient.patients_name,
                        # Study info
                        "StudyInstanceUID": study.study_instance_uid,
                        "StudyDate": study.study_date,
                        "StudyDescription": study.study_description,
                        # Series info
                        "SeriesInstanceUID": series.series_instance_uid,
                        "SeriesNumber": series.series_number,
                        "SeriesDescription": series.series_description,
                        "Modality": series.modality,
                        "FrameOfReferenceUID": series.frame_of_reference_uid,
                        # Completeness
                        "NumInstances": check["total_instances"],
                        "ExpectedInstances": check["expected_instances"],
                        "IsComplete": check["is_complete"],
                        "HasGaps": check["has_gaps"],
                        "GapIndices": check["gap_indices"],
                        "SpacingMM": check["spacing_mm"],
                        "SpacingUniform": check["spacing_uniform"],
                        "FirstSlicePosition": check["first_slice_position"],
                        "LastSlicePosition": check["last_slice_position"],
                        "CompletenessWarnings": check["warnings"],
                    }
                    if include_instance_lists:
                        record["InstanceSOPUIDs"] = series.get_instance_uids()
                        record["InstanceFilePaths"] = series.get_file_paths()

                    # Common metadata in precedence order patient > study >
                    # series; never overwrites explicit columns.
                    for source in (
                        patient.common_metadata,
                        study.common_metadata,
                        series.common_metadata,
                    ):
                        for key, value in source.items():
                            record.setdefault(key, value)

                    records.append(record)

        return pd.DataFrame(records)

    def get_instances_df(
        self,
        patient_id: Optional[str] = None,
        study_uid: Optional[str] = None,
        series_uid: Optional[str] = None,
    ) -> pd.DataFrame:
        """Export instance-level detail DataFrame.

        Walks the patient -> study -> series -> instance hierarchy and emits
        one row per DICOM instance, optionally restricted by the filters.

        Args:
            patient_id: Optional filter by patient ID.
            study_uid: Optional filter by study UID.
            series_uid: Optional filter by series UID.

        Returns:
            DataFrame with complete instance information.
        """
        records: list[dict[str, Any]] = []

        for pat in self.patients:
            if patient_id and pat.patient_id != patient_id:
                continue

            for st in pat.studies:
                if study_uid and st.study_instance_uid != study_uid:
                    continue

                for ser in st.series:
                    if series_uid and ser.series_instance_uid != series_uid:
                        continue

                    for inst in ser.instances:
                        rec: dict[str, Any] = {
                            # Hierarchy identifiers
                            "PatientID": pat.patient_id,
                            "StudyInstanceUID": st.study_instance_uid,
                            "SeriesInstanceUID": ser.series_instance_uid,
                            "SOPInstanceUID": inst.sop_instance_uid,
                            # Per-instance attributes
                            "FilePath": str(inst.file_path),
                            "InstanceNumber": inst.instance_number,
                            "SliceLocation": inst.slice_location,
                            "ProjectionScore": inst.projection_score,
                            "AcquisitionDateTime": inst.acquisition_datetime,
                        }

                        # Split the 3-tuple position into one column per axis;
                        # a missing position yields explicit None columns.
                        pos = inst.image_position_patient or (None, None, None)
                        rec["ImagePositionPatient_X"] = pos[0]
                        rec["ImagePositionPatient_Y"] = pos[1]
                        rec["ImagePositionPatient_Z"] = pos[2]

                        rec["ImageOrientationPatient"] = (
                            inst.image_orientation_patient
                        )

                        # Denormalized parent-level context for convenience.
                        rec["PatientsName"] = pat.patients_name
                        rec["StudyDate"] = st.study_date
                        rec["StudyDescription"] = st.study_description
                        rec["SeriesNumber"] = ser.series_number
                        rec["SeriesDescription"] = ser.series_description
                        rec["Modality"] = ser.modality

                        # Instance metadata never overrides the columns above.
                        for tag, tag_value in inst.metadata.items():
                            rec.setdefault(tag, tag_value)

                        records.append(rec)

        return pd.DataFrame(records)

    # ========================================================================
    # Export Methods
    # ========================================================================

    def export_csv(
        self,
        base_path: str,
        levels: Optional[list[str]] = None,
        include_instance_lists: bool = False,
    ) -> dict[str, str]:
        """Export DataFrames to separate CSV files.

        The parent directory of ``base_path`` is created if it does not
        already exist. Unknown level names are silently skipped.

        Args:
            base_path: Base path for output files (without extension).
            levels: List of levels to export ('patients', 'studies', 'series',
                   'instances'). Defaults to all levels.
            include_instance_lists: Whether to include InstanceSOPUIDs and
                InstanceFilePaths columns. Defaults to False to reduce file size.

        Returns:
            Dictionary mapping level names to created file paths.

        Example:
            Export database to CSV files:

            ```python
            files = db.export_csv(
                base_path="output/dicom_db",
                levels=["patients", "studies", "series"]
            )
            # Creates output/dicom_db_patients.csv, output/dicom_db_studies.csv, etc.
            ```
        """
        if levels is None:
            levels = ["patients", "studies", "series", "instances"]

        # Ensure the output directory exists so to_csv() does not raise
        # FileNotFoundError on a fresh path like "output/dicom_db".
        Path(base_path).parent.mkdir(parents=True, exist_ok=True)

        created_files: dict[str, str] = {}

        for level in levels:
            if level == "patients":
                df = self.get_patients_df(include_instance_lists=include_instance_lists)
            elif level == "studies":
                df = self.get_studies_df(include_instance_lists=include_instance_lists)
            elif level == "series":
                df = self.get_series_df(include_instance_lists=include_instance_lists)
            elif level == "instances":
                df = self.get_instances_df()
            else:
                # Unknown level name: skip rather than raise.
                continue

            file_path = f"{base_path}_{level}.csv"
            # Serialize list-valued cells (e.g. GapIndices, InstanceSOPUIDs)
            # as JSON strings so they survive the CSV round-trip. One pass per
            # column replaces the original detect-then-convert double scan;
            # non-list cells are returned unchanged.
            for col in df.columns:
                df[col] = df[col].map(
                    lambda cell: json.dumps(cell) if isinstance(cell, list) else cell
                )
            df.to_csv(file_path, index=False)
            created_files[level] = file_path

        return created_files

    def export_json(
        self,
        json_path: str,
        include_instance_lists: bool = True,
    ) -> str:
        """Export full hierarchy to JSON.

        Serializes the entire patient/study/series/instance tree, including
        per-series completeness reports, to a single JSON document.

        Args:
            json_path: Path for the output JSON file.
            include_instance_lists: Whether to include per-instance file paths
                in the JSON output. Defaults to True for full export.

        Returns:
            Path to the created file.

        Example:
            Export full database hierarchy to JSON:

            ```python
            json_path = db.export_json("output/db.json")
            ```
        """
        payload: dict[str, list[Any]] = {"patients": []}

        for pat in self.patients:
            pat_node: dict[str, Any] = {
                "patient_id": pat.patient_id,
                "patients_name": pat.patients_name,
                "patients_birth_date": pat.patients_birth_date,
                "patients_sex": pat.patients_sex,
                "common_metadata": pat.common_metadata,
                "studies": [],
            }
            if include_instance_lists:
                pat_node["instance_uids"] = pat.get_instance_uids()
                pat_node["file_paths"] = pat.get_file_paths()

            for st in pat.studies:
                st_node: dict[str, Any] = {
                    "study_instance_uid": st.study_instance_uid,
                    "study_date": st.study_date,
                    "study_time": st.study_time,
                    "study_description": st.study_description,
                    "common_metadata": st.common_metadata,
                    "series": [],
                }

                for ser in st.series:
                    report = ser.check_completeness(self.spacing_tolerance)

                    # Collect instance nodes first, then embed them in the
                    # series node below.
                    inst_nodes: list[dict[str, Any]] = []
                    for ins in ser.instances:
                        node: dict[str, Any] = {
                            "sop_instance_uid": ins.sop_instance_uid,
                            "instance_number": ins.instance_number,
                            "image_position_patient": ins.image_position_patient,
                            "slice_location": ins.slice_location,
                            "projection_score": ins.projection_score,
                            "metadata": ins.metadata,
                        }
                        if include_instance_lists:
                            node["file_path"] = str(ins.file_path)
                        inst_nodes.append(node)

                    st_node["series"].append(
                        {
                            "series_instance_uid": ser.series_instance_uid,
                            "series_number": ser.series_number,
                            "series_description": ser.series_description,
                            "modality": ser.modality,
                            "frame_of_reference_uid": ser.frame_of_reference_uid,
                            "common_metadata": ser.common_metadata,
                            "completeness": report,
                            "instances": inst_nodes,
                        }
                    )

                pat_node["studies"].append(st_node)

            payload["patients"].append(pat_node)

        # default=str makes non-JSON values (e.g. Path, datetime) serializable.
        with open(json_path, "w") as f:
            json.dump(payload, f, indent=2, default=str)

        return json_path

export_csv(base_path, levels=None, include_instance_lists=False)

Export DataFrames to separate CSV files.

Parameters:

Name Type Description Default
base_path str

Base path for output files (without extension).

required
levels Optional[list[str]]

List of levels to export ('patients', 'studies', 'series', 'instances'). Defaults to all levels.

None
include_instance_lists bool

Whether to include InstanceSOPUIDs and InstanceFilePaths columns. Defaults to False to reduce file size.

False

Returns:

Type Description
dict[str, str]

Dictionary mapping level names to created file paths.

Example

Export database to CSV files:

files = db.export_csv(
    base_path="output/dicom_db",
    levels=["patients", "studies", "series"]
)
# Creates output/dicom_db_patients.csv, output/dicom_db_studies.csv, etc.
Source code in pictologics/utilities/dicom_database.py
def export_csv(
    self,
    base_path: str,
    levels: Optional[list[str]] = None,
    include_instance_lists: bool = False,
) -> dict[str, str]:
    """Export DataFrames to separate CSV files.

    The parent directory of ``base_path`` is created if it does not
    already exist. Unknown level names are silently skipped.

    Args:
        base_path: Base path for output files (without extension).
        levels: List of levels to export ('patients', 'studies', 'series',
               'instances'). Defaults to all levels.
        include_instance_lists: Whether to include InstanceSOPUIDs and
            InstanceFilePaths columns. Defaults to False to reduce file size.

    Returns:
        Dictionary mapping level names to created file paths.

    Example:
        Export database to CSV files:

        ```python
        files = db.export_csv(
            base_path="output/dicom_db",
            levels=["patients", "studies", "series"]
        )
        # Creates output/dicom_db_patients.csv, output/dicom_db_studies.csv, etc.
        ```
    """
    if levels is None:
        levels = ["patients", "studies", "series", "instances"]

    # Ensure the output directory exists so to_csv() does not raise
    # FileNotFoundError on a fresh path like "output/dicom_db".
    Path(base_path).parent.mkdir(parents=True, exist_ok=True)

    created_files: dict[str, str] = {}

    for level in levels:
        if level == "patients":
            df = self.get_patients_df(include_instance_lists=include_instance_lists)
        elif level == "studies":
            df = self.get_studies_df(include_instance_lists=include_instance_lists)
        elif level == "series":
            df = self.get_series_df(include_instance_lists=include_instance_lists)
        elif level == "instances":
            df = self.get_instances_df()
        else:
            # Unknown level name: skip rather than raise.
            continue

        file_path = f"{base_path}_{level}.csv"
        # Serialize list-valued cells (e.g. GapIndices, InstanceSOPUIDs)
        # as JSON strings so they survive the CSV round-trip. One pass per
        # column replaces the original detect-then-convert double scan;
        # non-list cells are returned unchanged.
        for col in df.columns:
            df[col] = df[col].map(
                lambda cell: json.dumps(cell) if isinstance(cell, list) else cell
            )
        df.to_csv(file_path, index=False)
        created_files[level] = file_path

    return created_files

export_json(json_path, include_instance_lists=True)

Export full hierarchy to JSON.

Parameters:

Name Type Description Default
json_path str

Path for the output JSON file.

required
include_instance_lists bool

Whether to include per-instance file paths in the JSON output. Defaults to True for full export.

True

Returns:

Type Description
str

Path to the created file.

Example

Export full database hierarchy to JSON:

json_path = db.export_json("output/db.json")
Source code in pictologics/utilities/dicom_database.py
def export_json(
    self,
    json_path: str,
    include_instance_lists: bool = True,
) -> str:
    """Export full hierarchy to JSON.

    Serializes the entire patient/study/series/instance tree, including
    per-series completeness reports, to a single JSON document.

    Args:
        json_path: Path for the output JSON file.
        include_instance_lists: Whether to include per-instance file paths
            in the JSON output. Defaults to True for full export.

    Returns:
        Path to the created file.

    Example:
        Export full database hierarchy to JSON:

        ```python
        json_path = db.export_json("output/db.json")
        ```
    """
    payload: dict[str, list[Any]] = {"patients": []}

    for pat in self.patients:
        pat_node: dict[str, Any] = {
            "patient_id": pat.patient_id,
            "patients_name": pat.patients_name,
            "patients_birth_date": pat.patients_birth_date,
            "patients_sex": pat.patients_sex,
            "common_metadata": pat.common_metadata,
            "studies": [],
        }
        if include_instance_lists:
            pat_node["instance_uids"] = pat.get_instance_uids()
            pat_node["file_paths"] = pat.get_file_paths()

        for st in pat.studies:
            st_node: dict[str, Any] = {
                "study_instance_uid": st.study_instance_uid,
                "study_date": st.study_date,
                "study_time": st.study_time,
                "study_description": st.study_description,
                "common_metadata": st.common_metadata,
                "series": [],
            }

            for ser in st.series:
                report = ser.check_completeness(self.spacing_tolerance)

                # Collect instance nodes first, then embed them in the
                # series node below.
                inst_nodes: list[dict[str, Any]] = []
                for ins in ser.instances:
                    node: dict[str, Any] = {
                        "sop_instance_uid": ins.sop_instance_uid,
                        "instance_number": ins.instance_number,
                        "image_position_patient": ins.image_position_patient,
                        "slice_location": ins.slice_location,
                        "projection_score": ins.projection_score,
                        "metadata": ins.metadata,
                    }
                    if include_instance_lists:
                        node["file_path"] = str(ins.file_path)
                    inst_nodes.append(node)

                st_node["series"].append(
                    {
                        "series_instance_uid": ser.series_instance_uid,
                        "series_number": ser.series_number,
                        "series_description": ser.series_description,
                        "modality": ser.modality,
                        "frame_of_reference_uid": ser.frame_of_reference_uid,
                        "common_metadata": ser.common_metadata,
                        "completeness": report,
                        "instances": inst_nodes,
                    }
                )

            pat_node["studies"].append(st_node)

        payload["patients"].append(pat_node)

    # default=str makes non-JSON values (e.g. Path, datetime) serializable.
    with open(json_path, "w") as f:
        json.dump(payload, f, indent=2, default=str)

    return json_path

from_folders(paths, recursive=True, spacing_tolerance=0.1, show_progress=True, extract_private_tags=True, num_workers=None, split_multiseries=True) classmethod

Build a database from folder paths.

Parameters:

Name Type Description Default
paths list[str | Path]

List of folder paths to scan.

required
recursive bool

Whether to scan subdirectories.

True
spacing_tolerance float

Tolerance for gap detection (default 10%).

0.1
show_progress bool

Whether to display progress bars.

True
extract_private_tags bool

Whether to extract vendor-specific private tags.

True
num_workers Optional[int]

Number of parallel workers. None=auto (cpu_count-1), 1=sequential (no multiprocessing).

None
split_multiseries bool

Whether to split multi-phase series (e.g. cardiac) into separate series based on tags or spatial duplicates.

True

Returns:

Type Description
'DicomDatabase'

DicomDatabase instance populated with all discovered DICOM files.

Example

Build database from multiple folders:

from pictologics.utilities.dicom_database import DicomDatabase

db = DicomDatabase.from_folders(
    paths=["data/patient1", "data/patient2"],
    recursive=True,
    num_workers=4
)
print(f"Found {len(db.patients)} patients")
Source code in pictologics/utilities/dicom_database.py
@classmethod
def from_folders(
    cls,
    paths: list[str | Path],
    recursive: bool = True,
    spacing_tolerance: float = 0.1,
    show_progress: bool = True,
    extract_private_tags: bool = True,
    num_workers: Optional[int] = None,
    split_multiseries: bool = True,
) -> "DicomDatabase":
    """Build a database from folder paths.

    The pipeline runs in four stages: file discovery, per-file metadata
    extraction (optionally parallel), hierarchy construction, and sorting.

    Args:
        paths: List of folder paths to scan.
        recursive: Whether to scan subdirectories.
        spacing_tolerance: Tolerance for gap detection (default 10%).
        show_progress: Whether to display progress bars.
        extract_private_tags: Whether to extract vendor-specific private tags.
        num_workers: Number of parallel workers. None=auto (cpu_count-1),
                    1=sequential (no multiprocessing).
        split_multiseries: Whether to split multi-phase series (e.g. cardiac)
                          into separate series based on tags or spatial duplicates.

    Returns:
        DicomDatabase instance populated with all discovered DICOM files.

    Example:
        Build database from multiple folders:

        ```python
        from pictologics.utilities.dicom_database import DicomDatabase

        db = DicomDatabase.from_folders(
            paths=["data/patient1", "data/patient2"],
            recursive=True,
            num_workers=4
        )
        print(f"Found {len(db.patients)} patients")
        ```
    """
    folder_paths = [Path(p) for p in paths]
    worker_count = _get_num_workers(num_workers)

    # Stage 1: locate candidate DICOM files under the given folders.
    discovered = _scan_dicom_files(folder_paths, recursive, show_progress, worker_count)
    if not discovered:
        # Nothing found: return an empty database rather than failing.
        return cls(patients=[], spacing_tolerance=spacing_tolerance)

    # Stage 2: read per-file metadata, in parallel when worker_count > 1.
    metadata_records = _extract_all_metadata(
        discovered, show_progress, extract_private_tags, worker_count
    )

    # Stage 3: assemble the patient/study/series tree from the flat records,
    # then (stage 4) sort it so repeated runs produce identical ordering.
    patient_tree = _build_hierarchy(metadata_records, spacing_tolerance, split_multiseries)
    return cls(patients=_sort_hierarchy(patient_tree), spacing_tolerance=spacing_tolerance)

get_instances_df(patient_id=None, study_uid=None, series_uid=None)

Export instance-level detail DataFrame.

Parameters:

Name Type Description Default
patient_id Optional[str]

Optional filter by patient ID.

None
study_uid Optional[str]

Optional filter by study UID.

None
series_uid Optional[str]

Optional filter by series UID.

None

Returns:

Type Description
DataFrame

DataFrame with complete instance information.

Source code in pictologics/utilities/dicom_database.py
def get_instances_df(
    self,
    patient_id: Optional[str] = None,
    study_uid: Optional[str] = None,
    series_uid: Optional[str] = None,
) -> pd.DataFrame:
    """Export instance-level detail DataFrame.

    Walks the patient -> study -> series -> instance hierarchy and emits
    one row per DICOM instance, optionally restricted by the filters.

    Args:
        patient_id: Optional filter by patient ID.
        study_uid: Optional filter by study UID.
        series_uid: Optional filter by series UID.

    Returns:
        DataFrame with complete instance information.
    """
    records: list[dict[str, Any]] = []

    for pat in self.patients:
        if patient_id and pat.patient_id != patient_id:
            continue

        for st in pat.studies:
            if study_uid and st.study_instance_uid != study_uid:
                continue

            for ser in st.series:
                if series_uid and ser.series_instance_uid != series_uid:
                    continue

                for inst in ser.instances:
                    rec: dict[str, Any] = {
                        # Hierarchy identifiers
                        "PatientID": pat.patient_id,
                        "StudyInstanceUID": st.study_instance_uid,
                        "SeriesInstanceUID": ser.series_instance_uid,
                        "SOPInstanceUID": inst.sop_instance_uid,
                        # Per-instance attributes
                        "FilePath": str(inst.file_path),
                        "InstanceNumber": inst.instance_number,
                        "SliceLocation": inst.slice_location,
                        "ProjectionScore": inst.projection_score,
                        "AcquisitionDateTime": inst.acquisition_datetime,
                    }

                    # Split the 3-tuple position into one column per axis;
                    # a missing position yields explicit None columns.
                    pos = inst.image_position_patient or (None, None, None)
                    rec["ImagePositionPatient_X"] = pos[0]
                    rec["ImagePositionPatient_Y"] = pos[1]
                    rec["ImagePositionPatient_Z"] = pos[2]

                    rec["ImageOrientationPatient"] = (
                        inst.image_orientation_patient
                    )

                    # Denormalized parent-level context for convenience.
                    rec["PatientsName"] = pat.patients_name
                    rec["StudyDate"] = st.study_date
                    rec["StudyDescription"] = st.study_description
                    rec["SeriesNumber"] = ser.series_number
                    rec["SeriesDescription"] = ser.series_description
                    rec["Modality"] = ser.modality

                    # Instance metadata never overrides the columns above.
                    for tag, tag_value in inst.metadata.items():
                        rec.setdefault(tag, tag_value)

                    records.append(rec)

    return pd.DataFrame(records)

get_patients_df(include_instance_lists=False)

Export patient-level summary DataFrame.

Parameters:

Name Type Description Default
include_instance_lists bool

Whether to include InstanceSOPUIDs and InstanceFilePaths columns. Defaults to False to reduce memory.

False

Returns:

Type Description
DataFrame

DataFrame with patient information and aggregated statistics.

Source code in pictologics/utilities/dicom_database.py
def get_patients_df(self, include_instance_lists: bool = False) -> pd.DataFrame:
    """Export patient-level summary DataFrame.

    Emits one row per patient with aggregated study/series/instance counts
    and the earliest/latest study dates.

    Args:
        include_instance_lists: Whether to include InstanceSOPUIDs and
            InstanceFilePaths columns. Defaults to False to reduce memory.

    Returns:
        DataFrame with patient information and aggregated statistics.
    """
    records = []
    for pat in self.patients:
        # Flatten all series once so both counts reuse the same list.
        all_series = [ser for st in pat.studies for ser in st.series]

        rec: dict[str, Any] = {
            "PatientID": pat.patient_id,
            "PatientsName": pat.patients_name,
            "PatientsBirthDate": pat.patients_birth_date,
            "PatientsSex": pat.patients_sex,
            "NumStudies": len(pat.studies),
            "NumSeries": len(all_series),
            "NumInstances": sum(len(ser.instances) for ser in all_series),
        }
        if include_instance_lists:
            rec["InstanceSOPUIDs"] = pat.get_instance_uids()
            rec["InstanceFilePaths"] = pat.get_file_paths()

        # Study date range (None when no study carries a date).
        dates = [st.study_date for st in pat.studies if st.study_date]
        rec["EarliestStudyDate"] = min(dates) if dates else None
        rec["LatestStudyDate"] = max(dates) if dates else None

        # Patient-level shared metadata never overrides the columns above.
        for tag, tag_value in pat.common_metadata.items():
            rec.setdefault(tag, tag_value)

        records.append(rec)

    return pd.DataFrame(records)

get_series_df(patient_id=None, study_uid=None, include_instance_lists=False)

Export series-level summary DataFrame with completeness info.

Parameters:

Name Type Description Default
patient_id Optional[str]

Optional filter by patient ID.

None
study_uid Optional[str]

Optional filter by study UID.

None
include_instance_lists bool

Whether to include InstanceSOPUIDs and InstanceFilePaths columns. Defaults to False to reduce memory.

False

Returns:

Type Description
DataFrame

DataFrame with series information including completeness validation.

Source code in pictologics/utilities/dicom_database.py
def get_series_df(
    self,
    patient_id: Optional[str] = None,
    study_uid: Optional[str] = None,
    include_instance_lists: bool = False,
) -> pd.DataFrame:
    """Export series-level summary DataFrame with completeness info.

    Emits one row per series, combining patient/study identifiers with the
    series' completeness report.

    Args:
        patient_id: Optional filter by patient ID.
        study_uid: Optional filter by study UID.
        include_instance_lists: Whether to include InstanceSOPUIDs and
            InstanceFilePaths columns. Defaults to False to reduce memory.

    Returns:
        DataFrame with series information including completeness validation.
    """
    # (column name, completeness-report key) pairs, in output column order.
    completeness_columns = (
        ("NumInstances", "total_instances"),
        ("ExpectedInstances", "expected_instances"),
        ("IsComplete", "is_complete"),
        ("HasGaps", "has_gaps"),
        ("GapIndices", "gap_indices"),
        ("SpacingMM", "spacing_mm"),
        ("SpacingUniform", "spacing_uniform"),
        ("FirstSlicePosition", "first_slice_position"),
        ("LastSlicePosition", "last_slice_position"),
        ("CompletenessWarnings", "warnings"),
    )

    records = []
    for pat in self.patients:
        if patient_id and pat.patient_id != patient_id:
            continue

        for st in pat.studies:
            if study_uid and st.study_instance_uid != study_uid:
                continue

            for ser in st.series:
                report = ser.check_completeness(self.spacing_tolerance)

                rec = {
                    # Patient info
                    "PatientID": pat.patient_id,
                    "PatientsName": pat.patients_name,
                    # Study info
                    "StudyInstanceUID": st.study_instance_uid,
                    "StudyDate": st.study_date,
                    "StudyDescription": st.study_description,
                    # Series info
                    "SeriesInstanceUID": ser.series_instance_uid,
                    "SeriesNumber": ser.series_number,
                    "SeriesDescription": ser.series_description,
                    "Modality": ser.modality,
                    "FrameOfReferenceUID": ser.frame_of_reference_uid,
                }
                # Completeness report, flattened into columns.
                for column, report_key in completeness_columns:
                    rec[column] = report[report_key]

                if include_instance_lists:
                    rec["InstanceSOPUIDs"] = ser.get_instance_uids()
                    rec["InstanceFilePaths"] = ser.get_file_paths()

                # Shared metadata from every level; existing columns win.
                for level_metadata in (
                    pat.common_metadata,
                    st.common_metadata,
                    ser.common_metadata,
                ):
                    for tag, tag_value in level_metadata.items():
                        rec.setdefault(tag, tag_value)

                records.append(rec)

    return pd.DataFrame(records)

get_studies_df(patient_id=None, include_instance_lists=False)

Export study-level summary DataFrame.

Parameters:

Name Type Description Default
patient_id Optional[str]

Optional filter by patient ID.

None
include_instance_lists bool

Whether to include InstanceSOPUIDs and InstanceFilePaths columns. Defaults to False to reduce memory.

False

Returns:

Type Description
DataFrame

DataFrame with study information.

Source code in pictologics/utilities/dicom_database.py
def get_studies_df(
    self,
    patient_id: Optional[str] = None,
    include_instance_lists: bool = False,
) -> pd.DataFrame:
    """Export study-level summary DataFrame.

    Builds one row per study, combining patient demographics, study
    identifiers, instance counts, and any common metadata propagated from
    the patient and study levels (existing columns are never overwritten
    by metadata values).

    Args:
        patient_id: Optional filter by patient ID.
        include_instance_lists: Whether to include InstanceSOPUIDs and
            InstanceFilePaths columns. Defaults to False to reduce memory.

    Returns:
        DataFrame with study information. ``ModalitiesPresent`` holds the
        unique modalities of the study's series in sorted order.
    """
    rows = []
    for patient in self.patients:
        if patient_id and patient.patient_id != patient_id:
            continue

        for study in patient.studies:
            row: Dict[str, Any] = {
                # Patient info
                "PatientID": patient.patient_id,
                "PatientsName": patient.patients_name,
                "PatientsBirthDate": patient.patients_birth_date,
                "PatientsSex": patient.patients_sex,
                # Study info
                "StudyInstanceUID": study.study_instance_uid,
                "StudyDate": study.study_date,
                "StudyTime": study.study_time,
                "StudyDescription": study.study_description,
                "NumSeries": len(study.series),
                "NumInstances": sum(
                    len(series.instances) for series in study.series
                ),
            }
            if include_instance_lists:
                row["InstanceSOPUIDs"] = study.get_instance_uids()
                row["InstanceFilePaths"] = study.get_file_paths()

            # Collect modalities present; sorted so the output is
            # deterministic (iterating a bare set varies between runs
            # due to string hash randomization).
            modalities = sorted({s.modality for s in study.series if s.modality})
            row["ModalitiesPresent"] = modalities

            # Add common metadata (patient level first; never overwrite
            # an existing column)
            for key, value in patient.common_metadata.items():
                if key not in row:
                    row[key] = value
            for key, value in study.common_metadata.items():
                if key not in row:
                    row[key] = value

            rows.append(row)

    return pd.DataFrame(rows)

pictologics.utilities.dicom_utils

DICOM Utility Functions.

This module provides shared utility functions for working with DICOM files, including multi-phase series detection and splitting logic used by both the DicomDatabase and the image loader.

DicomPhaseInfo dataclass

Information about a detected phase in a DICOM series.

Attributes:

Name Type Description
index int

Zero-based index of this phase.

num_slices int

Number of slices/instances in this phase.

file_paths list[Path]

List of file paths belonging to this phase.

label Optional[str]

Human-readable label (e.g., "Phase 0%", "Echo 1").

split_tag Optional[str]

The DICOM tag used to detect this phase, or "spatial" if detected via duplicate positions.

split_value Optional[Any]

The value of the split tag for this phase.

Source code in pictologics/utilities/dicom_utils.py
@dataclass
class DicomPhaseInfo:
    """Information about a detected phase in a DICOM series.

    Attributes:
        index: Zero-based index of this phase.
        num_slices: Number of slices/instances in this phase.
        file_paths: List of file paths belonging to this phase.
        label: Human-readable label (e.g., "Phase 0%", "Echo 1").
        split_tag: The DICOM tag used to detect this phase, or "spatial"
            if detected via duplicate positions.
        split_value: The value of the split tag for this phase.
    """

    index: int  # zero-based phase index
    num_slices: int  # number of instances assigned to this phase
    file_paths: list[Path]  # files belonging to this phase
    label: Optional[str] = None  # display label, e.g. "Phase 40%" or "Echo 1"
    split_tag: Optional[str] = None  # DICOM tag used for splitting, or "spatial"
    split_value: Optional[Any] = None  # value of split_tag shared by this phase

get_dicom_phases(path, recursive=False)

Discover phases in a DICOM series directory.

Scans a directory for DICOM files and detects if the series contains multiple phases (e.g., cardiac phases, temporal positions, echo numbers). This is useful before calling load_image() with a specific dataset_index.

Multi-phase detection uses the same logic as :class:DicomDatabase to ensure consistent behavior across the library.

Parameters:

Name Type Description Default
path str

Path to directory containing DICOM files.

required
recursive bool

If True, recursively searches subdirectories. Default False.

False

Returns:

Type Description
list[DicomPhaseInfo]

List of :class:DicomPhaseInfo objects describing each detected phase.

list[DicomPhaseInfo]

For single-phase series, returns a list with one element.

Raises:

Type Description
FileNotFoundError

If the path does not exist.

ValueError

If no DICOM files are found.

Example

Discover phases before loading:

from pictologics.utilities import get_dicom_phases
from pictologics import load_image

# Discover phases in a cardiac CT directory
phases = get_dicom_phases("cardiac_ct/")
print(f"Found {len(phases)} phases:")
for phase in phases:
    print(f"  Phase {phase.index}: {phase.num_slices} slices - {phase.label}")

# Load the 5th phase (40%)
img = load_image("cardiac_ct/", dataset_index=4)

# Check if series is multi-phase
if len(phases) > 1:
    print("Multi-phase series detected!")
else:
    print("Single-phase series")
See Also
  • :func:load_image: Main image loading function with dataset_index support.
  • :class:DicomDatabase: Full DICOM database parsing with automatic phase splitting.
Source code in pictologics/utilities/dicom_utils.py
def get_dicom_phases(
    path: str,
    recursive: bool = False,
) -> list[DicomPhaseInfo]:
    """Discover phases in a DICOM series directory.

    Scans a directory for DICOM files and detects if the series contains
    multiple phases (e.g., cardiac phases, temporal positions, echo numbers).
    This is useful before calling ``load_image()`` with a specific ``dataset_index``.

    Multi-phase detection uses the same logic as :class:`DicomDatabase` to ensure
    consistent behavior across the library.

    Args:
        path: Path to directory containing DICOM files.
        recursive: If True, recursively searches subdirectories. Default False.

    Returns:
        List of :class:`DicomPhaseInfo` objects describing each detected phase.
        For single-phase series, returns a list with one element.

    Raises:
        FileNotFoundError: If the path does not exist.
        ValueError: If no DICOM files are found.

    Example:
        Discover phases before loading:

        ```python
        from pictologics.utilities import get_dicom_phases
        from pictologics import load_image

        # Discover phases in a cardiac CT directory
        phases = get_dicom_phases("cardiac_ct/")
        print(f"Found {len(phases)} phases:")
        for phase in phases:
            print(f"  Phase {phase.index}: {phase.num_slices} slices - {phase.label}")

        # Load the 5th phase (40%)
        img = load_image("cardiac_ct/", dataset_index=4)

        # Check if series is multi-phase
        if len(phases) > 1:
            print("Multi-phase series detected!")
        else:
            print("Single-phase series")
        ```

    See Also:
        - :func:`load_image`: Main image loading function with ``dataset_index`` support.
        - :class:`DicomDatabase`: Full DICOM database parsing with automatic phase splitting.
    """
    path_obj = Path(path)
    if not path_obj.exists():
        raise FileNotFoundError(f"Path does not exist: {path}")

    # Collect DICOM files
    if path_obj.is_file():
        # A single file is accepted only if it passes pydicom's magic check.
        if pydicom.misc.is_dicom(path_obj):
            dicom_files = [path_obj]
        else:
            raise ValueError(f"File is not a DICOM file: {path}")
    else:
        if recursive:
            candidates = list(path_obj.rglob("*"))
        else:
            candidates = list(path_obj.iterdir())

        # Keep only regular files that look like DICOM (magic-number check).
        dicom_files = [
            f for f in candidates if f.is_file() and pydicom.misc.is_dicom(f)
        ]

    if not dicom_files:
        raise ValueError(f"No DICOM files found in: {path}")

    # Extract metadata for phase detection
    file_metadata: list[dict[str, Any]] = []
    for f in dicom_files:
        try:
            # Header-only read: pixel data is not needed for phase detection.
            dcm = pydicom.dcmread(f, stop_before_pixels=True)
            meta: dict[str, Any] = {
                "file_path": f,
                "InstanceNumber": getattr(dcm, "InstanceNumber", None),
            }

            # Extract position
            try:
                ipp = dcm.ImagePositionPatient
                meta["ImagePositionPatient"] = (
                    float(ipp[0]),
                    float(ipp[1]),
                    float(ipp[2]),
                )
            except (AttributeError, IndexError, TypeError):
                meta["ImagePositionPatient"] = None

            # Extract multi-phase tags
            for tag in MULTI_PHASE_TAGS:
                val = getattr(dcm, tag, None)
                if val is not None:
                    meta[tag] = val

            file_metadata.append(meta)
        except Exception:
            # Best-effort scan: silently skip files pydicom cannot parse.
            continue

    if not file_metadata:
        raise ValueError(f"Could not read any DICOM files from: {path}")

    # Split into phases
    phases = split_dicom_phases(file_metadata)

    # Determine which tag was used for splitting.
    # split_dicom_phases does not report the tag it used, so re-derive it:
    # find a tag whose value differs between the first phase and any later
    # phase. Tag values are assumed consistent within a phase (they were
    # the grouping key) — the first instance of each phase is representative.
    split_tag = None
    if len(phases) > 1:
        # Check which tag has different values across phases
        for tag in MULTI_PHASE_TAGS:
            first_val = phases[0][0].get(tag) if phases[0] else None
            if first_val is not None:
                # Check if other phases have different values
                for phase in phases[1:]:
                    if phase and phase[0].get(tag) != first_val:
                        split_tag = tag
                        break
            if split_tag:
                break
        if not split_tag:
            split_tag = "spatial"  # Fallback was used
    # Build DicomPhaseInfo objects
    result: list[DicomPhaseInfo] = []
    for i, phase_meta in enumerate(phases):
        file_paths = [m["file_path"] for m in phase_meta]
        # For "spatial" splits this lookup yields None (no such tag exists).
        split_value = phase_meta[0].get(split_tag) if split_tag and phase_meta else None

        # Generate a human-readable label keyed on the detected split tag.
        if split_tag == "NominalPercentageOfCardiacPhase":
            label = f"Phase {split_value}%" if split_value is not None else f"Phase {i}"
        elif split_tag == "TemporalPositionIdentifier":
            label = (
                f"Temporal {split_value}" if split_value is not None else f"Time {i}"
            )
        elif split_tag == "EchoNumber":
            label = f"Echo {split_value}" if split_value is not None else f"Echo {i}"
        elif split_tag == "AcquisitionNumber":
            label = (
                f"Acquisition {split_value}" if split_value is not None else f"Acq {i}"
            )
        elif split_tag == "TriggerTime":
            label = (
                f"Trigger {split_value}ms"
                if split_value is not None
                else f"Trigger {i}"
            )
        elif split_tag == "spatial":
            # Spatial volumes are 1-based for display; tag-based labels use
            # the tag value (or 0-based index) instead.
            label = f"Volume {i + 1}"
        else:
            label = f"Dataset {i}"

        result.append(
            DicomPhaseInfo(
                index=i,
                num_slices=len(file_paths),
                file_paths=file_paths,
                label=label,
                split_tag=split_tag,
                split_value=split_value,
            )
        )

    return result

split_dicom_phases(file_metadata)

Split DICOM file metadata into multiple phases/groups.

This function detects multi-phase DICOM series (e.g., cardiac phases, multi-echo, dynamic contrast) and splits them into separate groups.

The detection strategy is: (1) check distinctive DICOM tags (CardiacPhase, TemporalPosition, etc.); if a tag has more than one unique value, use it to group files. (2) As a fallback, check for duplicate spatial positions (ImagePositionPatient); if duplicates exist, group files by order of appearance.

Parameters:

Name Type Description Default
file_metadata list[dict[str, Any]]

List of dictionaries, each containing at minimum 'file_path' (path to the DICOM file); optionally 'ImagePositionPatient' (a tuple of (x, y, z)) and any of the MULTI_PHASE_TAGS.

required

Returns:

Type Description
list[list[dict[str, Any]]]

List of lists, where each inner list contains metadata dicts

list[list[dict[str, Any]]]

for one phase. Single-phase series return [[all_metadata]].

Example

Split DICOM metadata into separate phases:

from pictologics.utilities.dicom_utils import split_dicom_phases
from pathlib import Path

# Assume metadata list already collected
metadata = [
    {'file_path': Path('slice1.dcm'), 'CardiacPhase': 0},
    {'file_path': Path('slice2.dcm'), 'CardiacPhase': 10},
    # ... more files
]
phases = split_dicom_phases(metadata)
print(f"Found {len(phases)} phases")
Source code in pictologics/utilities/dicom_utils.py
def split_dicom_phases(
    file_metadata: list[dict[str, Any]],
) -> list[list[dict[str, Any]]]:
    """Split DICOM file metadata into multiple phases/groups.

    Detects multi-phase series (cardiac phases, multi-echo, dynamic
    contrast, ...) and partitions the input metadata accordingly.

    Two strategies are tried in order:

    1. Tag-based: the first tag in ``MULTI_PHASE_TAGS`` that carries more
       than one unique value across *all* files defines the grouping.
    2. Spatial fallback: if several files share the same
       ``ImagePositionPatient``, the duplicates are assumed to belong to
       interleaved volumes and are distributed across groups in
       InstanceNumber order.

    Args:
        file_metadata: List of dictionaries containing at minimum
            'file_path' (path to the DICOM file); optionally
            'ImagePositionPatient' as an (x, y, z) tuple and any of the
            MULTI_PHASE_TAGS.

    Returns:
        List of lists of metadata dicts, one inner list per phase.
        Single-phase input is returned unchanged as ``[file_metadata]``.

    Example:
        ```python
        from pictologics.utilities.dicom_utils import split_dicom_phases
        from pathlib import Path

        metadata = [
            {'file_path': Path('slice1.dcm'), 'CardiacPhase': 0},
            {'file_path': Path('slice2.dcm'), 'CardiacPhase': 10},
        ]
        phases = split_dicom_phases(metadata)
        print(f"Found {len(phases)} phases")
        ```
    """
    # A single file (or empty input) can never be multi-phase.
    if len(file_metadata) < 2:
        return [file_metadata]

    # Strategy 1: group by the first multi-phase tag that partitions
    # every file into more than one group.
    for tag in MULTI_PHASE_TAGS:
        grouped: dict[Any, list[dict[str, Any]]] = {}
        for entry in file_metadata:
            tag_value = entry.get(tag)
            if tag_value is None:
                continue
            grouped.setdefault(tag_value, []).append(entry)

        covers_all = (
            sum(len(members) for members in grouped.values()) == len(file_metadata)
        )
        if len(grouped) > 1 and covers_all:
            # Deterministic phase order: ascending tag value.
            return [grouped[key] for key in sorted(grouped)]

    # Strategy 2 (fallback): duplicate ImagePositionPatient values imply
    # several interleaved volumes sharing the same slice locations.
    by_position: dict[tuple[float, float, float], list[dict[str, Any]]] = {}
    for entry in file_metadata:
        position = entry.get("ImagePositionPatient")
        if position:
            key = tuple(position) if isinstance(position, (list, tuple)) else position
            by_position.setdefault(key, []).append(entry)

    if not any(len(members) > 1 for members in by_position.values()):
        # No duplicated positions — treat as a single phase.
        return [file_metadata]

    # One phase per repetition of the most-duplicated position.
    phase_count = max(len(members) for members in by_position.values())
    groups: list[list[dict[str, Any]]] = [[] for _ in range(phase_count)]

    # InstanceNumber order makes the round-robin assignment reproducible.
    ordered = sorted(
        file_metadata,
        key=lambda entry: entry.get("InstanceNumber", 0) or 0,
    )

    by_position_ordered: dict[tuple[float, float, float], list[dict[str, Any]]] = {}
    for entry in ordered:
        position = entry.get("ImagePositionPatient")
        if position:
            key = tuple(position) if isinstance(position, (list, tuple)) else position
            by_position_ordered.setdefault(key, []).append(entry)
        else:
            # Entries with no position cannot be phase-assigned; park them
            # in the first group.
            groups[0].append(entry)

    # The i-th occurrence of each position goes to the i-th phase; any
    # overflow beyond phase_count lands in the last group.
    for members in by_position_ordered.values():
        for slot, entry in enumerate(members):
            target = slot if slot < phase_count else -1  # pragma: no cover (overflow)
            groups[target].append(entry)

    # Drop phases that received no files.
    return [group for group in groups if group]

pictologics.utilities.sr_parser

DICOM Structured Report (SR) Parser

This module provides functionality for parsing DICOM Structured Reports (SR) and extracting measurement data into structured formats (DataFrames, CSV, JSON).

Supports TID1500 (Measurement Report) and other common SR templates. Uses highdicom for robust SR parsing and content extraction.

SRMeasurement dataclass

Represents a single measurement from an SR document.

This dataclass captures individual measurement values extracted from DICOM Structured Reports, including the measurement name, value, units, and associated context.

Attributes:

Name Type Description
name str

Measurement concept name (e.g., "Agatston Score", "Volume").

value float

Numerical measurement value.

unit str

Unit of measurement (e.g., "mm", "HU", "1" for unitless).

finding_type Optional[str]

Type of finding this measurement relates to (optional).

finding_site Optional[str]

Anatomical site of finding (optional).

derivation Optional[str]

How the measurement was derived (optional).

tracking_id Optional[str]

Optional tracking identifier for longitudinal studies.

metadata dict[str, Any]

Additional extracted attributes not captured above.

Source code in pictologics/utilities/sr_parser.py
@dataclass
class SRMeasurement:
    """Represents a single measurement from an SR document.

    This dataclass captures individual measurement values extracted from
    DICOM Structured Reports, including the measurement name, value,
    units, and associated context.

    Attributes:
        name: Measurement concept name (e.g., "Agatston Score", "Volume").
        value: Numerical measurement value.
        unit: Unit of measurement (e.g., "mm", "HU", "1" for unitless).
        finding_type: Type of finding this measurement relates to (optional).
        finding_site: Anatomical site of finding (optional).
        derivation: How the measurement was derived (optional).
        tracking_id: Optional tracking identifier for longitudinal studies.
        metadata: Additional extracted attributes not captured above.
    """

    name: str  # measurement concept name
    value: float  # numerical value
    unit: str  # unit string; "1" denotes a unitless quantity
    finding_type: Optional[str] = None  # finding this measurement belongs to
    finding_site: Optional[str] = None  # anatomical site of the finding
    derivation: Optional[str] = None  # how the value was derived
    tracking_id: Optional[str] = None  # longitudinal tracking identifier
    metadata: dict[str, Any] = field(default_factory=dict)  # extra attributes

SRMeasurementGroup dataclass

Represents a group of related measurements.

SR documents often organize measurements into groups based on anatomical site, finding type, or other criteria. This dataclass captures such groupings.

Attributes:

Name Type Description
group_id Optional[str]

Identifier for this measurement group (optional).

finding_type Optional[str]

Type of finding for this group (optional).

finding_site Optional[str]

Anatomical site for this group (optional).

measurements list[SRMeasurement]

List of SRMeasurement objects in this group.

metadata dict[str, Any]

Additional group-level attributes.

Source code in pictologics/utilities/sr_parser.py
@dataclass
class SRMeasurementGroup:
    """Represents a group of related measurements.

    SR documents often organize measurements into groups based on
    anatomical site, finding type, or other criteria. This dataclass
    captures such groupings.

    Attributes:
        group_id: Identifier for this measurement group (optional).
        finding_type: Type of finding for this group (optional).
        finding_site: Anatomical site for this group (optional).
        measurements: List of SRMeasurement objects in this group.
        metadata: Additional group-level attributes.
    """

    group_id: Optional[str] = None  # group identifier
    finding_type: Optional[str] = None  # group-level finding type
    finding_site: Optional[str] = None  # group-level anatomical site
    measurements: list[SRMeasurement] = field(default_factory=list)  # members
    metadata: dict[str, Any] = field(default_factory=dict)  # extra attributes

SRDocument dataclass

Represents a parsed DICOM Structured Report.

This class provides the main interface for accessing SR content, following the same pattern as DicomDatabase. It can be constructed from a file using the from_file() class method.

Attributes:

Name Type Description
file_path Path

Path to the source SR file.

sop_instance_uid str

Unique identifier for this SR instance.

template_id Optional[str]

SR template identifier (e.g., "1500" for TID1500).

document_title Optional[str]

Title of the SR document.

measurement_groups list[SRMeasurementGroup]

List of SRMeasurementGroup objects.

patient_id Optional[str]

Patient identifier.

study_instance_uid Optional[str]

Study UID.

series_instance_uid Optional[str]

Series UID.

content_datetime Optional[str]

When the SR was created.

metadata dict[str, Any]

Additional document-level attributes.

Example

Load and parse an SR document:

from pictologics.utilities.sr_parser import SRDocument

sr = SRDocument.from_file("measurements.dcm")
print(f"Template: {sr.template_id}")
print(f"Groups: {len(sr.measurement_groups)}")

# Export measurements to DataFrame
df = sr.get_measurements_df()
print(df[["measurement_name", "value", "unit"]])

# Export to CSV
sr.export_csv("measurements.csv")
Source code in pictologics/utilities/sr_parser.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
@dataclass
class SRDocument:
    """Represents a parsed DICOM Structured Report.

    This class provides the main interface for accessing SR content,
    following the same pattern as DicomDatabase. It can be constructed
    from a file using the `from_file()` class method.

    Attributes:
        file_path: Path to the source SR file.
        sop_instance_uid: Unique identifier for this SR instance.
        template_id: SR template identifier (e.g., "1500" for TID1500).
        document_title: Title of the SR document.
        measurement_groups: List of SRMeasurementGroup objects.
        patient_id: Patient identifier.
        study_instance_uid: Study UID.
        series_instance_uid: Series UID.
        content_datetime: When the SR was created.
        metadata: Additional document-level attributes.

    Example:
        Load and parse an SR document:

        ```python
        from pictologics.utilities.sr_parser import SRDocument

        sr = SRDocument.from_file("measurements.dcm")
        print(f"Template: {sr.template_id}")
        print(f"Groups: {len(sr.measurement_groups)}")

        # Export measurements to DataFrame
        df = sr.get_measurements_df()
        print(df[["measurement_name", "value", "unit"]])

        # Export to CSV
        sr.export_csv("measurements.csv")
        ```
    """

    file_path: Path  # source SR file on disk
    sop_instance_uid: str  # SOPInstanceUID of this SR instance
    template_id: Optional[str] = None  # e.g. "1500" for TID1500
    document_title: Optional[str] = None  # CodeMeaning of the document concept
    measurement_groups: list[SRMeasurementGroup] = field(default_factory=list)
    patient_id: Optional[str] = None  # PatientID
    study_instance_uid: Optional[str] = None  # StudyInstanceUID
    series_instance_uid: Optional[str] = None  # SeriesInstanceUID
    content_datetime: Optional[str] = None  # "DATE" or "DATEThhmmss" combined form
    metadata: dict[str, Any] = field(default_factory=dict)  # e.g. private tags

    @classmethod
    def from_file(
        cls,
        path: str | Path,
        extract_private_tags: bool = False,
    ) -> "SRDocument":
        """Load and parse an SR document from file.

        This method reads a DICOM Structured Report file and extracts
        all measurement content into the hierarchical dataclass structure.
        Follows the same pattern as DicomDatabase.from_folders().

        Args:
            path: Path to DICOM SR file.
            extract_private_tags: Whether to extract vendor-specific tags
                into the metadata dictionaries. Defaults to False.

        Returns:
            SRDocument instance with parsed content.

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: If the file is not a valid DICOM SR object.
        """
        import pydicom

        path_obj = Path(path)
        if not path_obj.exists():
            raise FileNotFoundError(f"SR file not found: {path}")

        # Load the DICOM file
        try:
            dcm = pydicom.dcmread(str(path_obj))
        except Exception as e:
            raise ValueError(f"Failed to read DICOM file: {e}") from e

        # Accept any SOP class in the SR Document Storage family. All SR
        # storage SOP classes (Basic Text SR, Enhanced SR, Comprehensive
        # [3D] SR, Extensible SR, Procedure Log, Key Object Selection,
        # X-Ray Radiation Dose SR, ...) share this UID prefix, so a prefix
        # check is simpler and more complete than an explicit list.
        sr_family_prefix = "1.2.840.10008.5.1.4.1.1.88."
        sop_class = str(getattr(dcm, "SOPClassUID", ""))
        if not sop_class.startswith(sr_family_prefix):
            raise ValueError(
                f"File is not a DICOM SR document. SOPClassUID: {sop_class}"
            )

        # Extract basic document info; empty strings collapse to None.
        sop_instance_uid = str(getattr(dcm, "SOPInstanceUID", ""))
        patient_id = str(getattr(dcm, "PatientID", "")) or None
        study_uid = str(getattr(dcm, "StudyInstanceUID", "")) or None
        series_uid = str(getattr(dcm, "SeriesInstanceUID", "")) or None

        # Combine ContentDate/ContentTime into a single "DATEThhmmss" string.
        content_date = getattr(dcm, "ContentDate", None)
        content_time = getattr(dcm, "ContentTime", None)
        if content_date:
            content_datetime = str(content_date)
            if content_time:
                content_datetime += f"T{content_time}"
        else:
            content_datetime = None

        # Extract document title from ConceptNameCodeSequence
        doc_title = None
        if hasattr(dcm, "ConceptNameCodeSequence") and dcm.ConceptNameCodeSequence:
            concept = dcm.ConceptNameCodeSequence[0]
            doc_title = str(getattr(concept, "CodeMeaning", "")) or None

        # Extract template ID if present (e.g. "1500" for TID1500)
        template_id = None
        if hasattr(dcm, "ContentTemplateSequence") and dcm.ContentTemplateSequence:
            template = dcm.ContentTemplateSequence[0]
            template_id = str(getattr(template, "TemplateIdentifier", "")) or None

        # Parse content sequence for measurements
        measurement_groups = _parse_content_sequence(dcm, extract_private_tags)

        # Build metadata dict
        metadata: dict[str, Any] = {}
        if extract_private_tags:
            # Best-effort extraction of private tags; unreadable values
            # are skipped rather than failing the whole parse.
            for elem in dcm:
                if elem.tag.is_private:
                    try:
                        metadata[elem.keyword or str(elem.tag)] = str(elem.value)
                    except Exception:
                        pass

        return cls(
            file_path=path_obj,
            sop_instance_uid=sop_instance_uid,
            template_id=template_id,
            document_title=doc_title,
            measurement_groups=measurement_groups,
            patient_id=patient_id,
            study_instance_uid=study_uid,
            series_instance_uid=series_uid,
            content_datetime=content_datetime,
            metadata=metadata,
        )

    def get_measurements_df(self) -> pd.DataFrame:
        """Export all measurements as a DataFrame.

        Returns a flat DataFrame with all measurements from all groups,
        including group context for each measurement.

        Returns:
            DataFrame with columns:
            - group_id: Identifier of the measurement group
            - finding_type: Type of finding
            - finding_site: Anatomical site
            - measurement_name: Name of the measurement
            - value: Numerical value
            - unit: Unit of measurement
            - derivation: How it was derived
            - tracking_id: Tracking identifier
        """
        rows = []
        for group in self.measurement_groups:
            for meas in group.measurements:
                rows.append(
                    {
                        "group_id": group.group_id,
                        "finding_type": group.finding_type or meas.finding_type,
                        "finding_site": group.finding_site or meas.finding_site,
                        "measurement_name": meas.name,
                        "value": meas.value,
                        "unit": meas.unit,
                        "derivation": meas.derivation,
                        "tracking_id": meas.tracking_id,
                    }
                )

        return pd.DataFrame(rows)

    def get_summary(self) -> dict[str, Any]:
        """Get document summary without full parsing.

        Returns:
            Dictionary with summary information including:
            - sop_instance_uid
            - template_id
            - document_title
            - num_groups
            - num_measurements
            - patient_id
            - study_instance_uid
        """
        total_measurements = sum(len(g.measurements) for g in self.measurement_groups)
        return {
            "sop_instance_uid": self.sop_instance_uid,
            "template_id": self.template_id,
            "document_title": self.document_title,
            "num_groups": len(self.measurement_groups),
            "num_measurements": total_measurements,
            "patient_id": self.patient_id,
            "study_instance_uid": self.study_instance_uid,
            "content_datetime": self.content_datetime,
        }

    def export_csv(self, path: str | Path) -> Path:
        """Export measurements to CSV file.

        Args:
            path: Output path for the CSV file.

        Returns:
            Path to the created CSV file.
        """
        path_obj = Path(path)
        df = self.get_measurements_df()
        df.to_csv(path_obj, index=False)
        return path_obj

    def export_json(self, path: str | Path) -> Path:
        """Export full SR content to JSON.

        Exports the complete document structure including all groups,
        measurements, and metadata.

        Args:
            path: Output path for the JSON file.

        Returns:
            Path to the created JSON file.
        """
        import json

        path_obj = Path(path)

        # Build JSON structure
        data: dict[str, Any] = {
            "sop_instance_uid": self.sop_instance_uid,
            "template_id": self.template_id,
            "document_title": self.document_title,
            "patient_id": self.patient_id,
            "study_instance_uid": self.study_instance_uid,
            "series_instance_uid": self.series_instance_uid,
            "content_datetime": self.content_datetime,
            "metadata": self.metadata,
            "measurement_groups": [],
        }

        for group in self.measurement_groups:
            group_data: dict[str, Any] = {
                "group_id": group.group_id,
                "finding_type": group.finding_type,
                "finding_site": group.finding_site,
                "metadata": group.metadata,
                "measurements": [],
            }
            for meas in group.measurements:
                meas_data = {
                    "name": meas.name,
                    "value": meas.value,
                    "unit": meas.unit,
                    "finding_type": meas.finding_type,
                    "finding_site": meas.finding_site,
                    "derivation": meas.derivation,
                    "tracking_id": meas.tracking_id,
                    "metadata": meas.metadata,
                }
                group_data["measurements"].append(meas_data)
            data["measurement_groups"].append(group_data)

        with open(path_obj, "w") as f:
            json.dump(data, f, indent=2)

        return path_obj

    # ========================================================================
    # Batch Processing (from_folders)
    # ========================================================================

    @classmethod
    def from_folders(
        cls,
        paths: list[str | Path],
        recursive: bool = True,
        show_progress: bool = True,
        num_workers: Optional[int] = None,
        output_dir: Optional[str | Path] = None,
        export_csv: bool = True,
        export_json: bool = True,
        extract_private_tags: bool = False,
    ) -> "SRBatch":
        """Batch process SR files from folders.

        Scans directories for DICOM SR files, parses each one, and optionally
        exports individual CSV/JSON files plus a combined output and log.

        This method follows the same pattern as DicomDatabase.from_folders().

        Args:
            paths: List of folder paths to scan for SR files.
            recursive: Whether to scan subdirectories (default: True).
            show_progress: Whether to display progress bars (default: True).
            num_workers: Number of parallel workers. None=auto (cpu_count-1),
                        1=sequential (no multiprocessing).
            output_dir: If specified, exports each SR to this directory.
            export_csv: Export individual CSV files (default: True).
            export_json: Export individual JSON files (default: True).
            extract_private_tags: Whether to extract private tags (default: False).

        Returns:
            SRBatch containing all parsed documents and processing log.

        Example:
            Process all SR files in a folder:

            ```python
            from pictologics.utilities.sr_parser import SRDocument

            # Process folder
            batch = SRDocument.from_folders(["sr_data/"])
            print(f"Found {len(batch.documents)} SR files")
            df = batch.get_combined_measurements_df()

            # Process with exports
            batch = SRDocument.from_folders(
                ["sr_data/"],
                output_dir="sr_exports/",
                export_csv=True,
                export_json=True
            )
            batch.export_log("sr_exports/processing_log.csv")
            ```
        """
        import os
        from concurrent.futures import ProcessPoolExecutor

        from tqdm import tqdm

        # Auto mode leaves one core free for the main process.
        if num_workers is None:
            cores = os.cpu_count()
            num_workers = max(1, (cores - 1) if cores else 1)

        # Step 1: discover SR files under every existing path.
        sr_files: list[Path] = []
        for raw in paths:
            root = Path(raw)
            if not root.exists():
                continue
            if root.is_file():
                candidates = iter([root])
            else:
                walk = root.rglob("*") if recursive else root.iterdir()
                candidates = (p for p in walk if p.is_file())
            sr_files.extend(p for p in candidates if is_dicom_sr(p))

        if not sr_files:
            return SRBatch(documents=[], processing_log=[], output_dir=None)

        # Create the export directory up front when requested.
        out_path = Path(output_dir) if output_dir else None
        if out_path:
            out_path.mkdir(parents=True, exist_ok=True)

        # Step 2: run the stateless worker over every file.
        tasks = [
            (f, extract_private_tags, out_path, export_csv, export_json)
            for f in sr_files
        ]

        if num_workers == 1:
            # Sequential: process in-process, one file at a time.
            bar = tqdm(tasks, desc="Processing SR files", disable=not show_progress)
            results = [_process_sr_file_worker(t) for t in bar]
        else:
            # Parallel: fan out across a process pool.
            with ProcessPoolExecutor(max_workers=num_workers) as pool:
                results = list(
                    tqdm(
                        pool.map(_process_sr_file_worker, tasks),
                        total=len(tasks),
                        desc="Processing SR files",
                        disable=not show_progress,
                    )
                )

        processing_log: list[dict[str, Any]] = [r["log"] for r in results]
        documents: list["SRDocument"] = [
            r["document"] for r in results if r["document"] is not None
        ]

        return SRBatch(
            documents=documents,
            processing_log=processing_log,
            output_dir=out_path,
        )

export_csv(path)

Export measurements to CSV file.

Parameters:

Name Type Description Default
path str | Path

Output path for the CSV file.

required

Returns:

Type Description
Path

Path to the created CSV file.

Source code in pictologics/utilities/sr_parser.py
def export_csv(self, path: str | Path) -> Path:
    """Export measurements to CSV file.

    Args:
        path: Output path for the CSV file.

    Returns:
        Path to the created CSV file.
    """
    path_obj = Path(path)
    df = self.get_measurements_df()
    df.to_csv(path_obj, index=False)
    return path_obj

export_json(path)

Export full SR content to JSON.

Exports the complete document structure including all groups, measurements, and metadata.

Parameters:

Name Type Description Default
path str | Path

Output path for the JSON file.

required

Returns:

Type Description
Path

Path to the created JSON file.

Source code in pictologics/utilities/sr_parser.py
def export_json(self, path: str | Path) -> Path:
    """Export full SR content to JSON.

    Exports the complete document structure including all groups,
    measurements, and metadata.

    Args:
        path: Output path for the JSON file.

    Returns:
        Path to the created JSON file.
    """
    import json

    path_obj = Path(path)

    # Build JSON structure
    data: dict[str, Any] = {
        "sop_instance_uid": self.sop_instance_uid,
        "template_id": self.template_id,
        "document_title": self.document_title,
        "patient_id": self.patient_id,
        "study_instance_uid": self.study_instance_uid,
        "series_instance_uid": self.series_instance_uid,
        "content_datetime": self.content_datetime,
        "metadata": self.metadata,
        "measurement_groups": [],
    }

    for group in self.measurement_groups:
        group_data: dict[str, Any] = {
            "group_id": group.group_id,
            "finding_type": group.finding_type,
            "finding_site": group.finding_site,
            "metadata": group.metadata,
            "measurements": [],
        }
        for meas in group.measurements:
            meas_data = {
                "name": meas.name,
                "value": meas.value,
                "unit": meas.unit,
                "finding_type": meas.finding_type,
                "finding_site": meas.finding_site,
                "derivation": meas.derivation,
                "tracking_id": meas.tracking_id,
                "metadata": meas.metadata,
            }
            group_data["measurements"].append(meas_data)
        data["measurement_groups"].append(group_data)

    with open(path_obj, "w") as f:
        json.dump(data, f, indent=2)

    return path_obj

from_file(path, extract_private_tags=False) classmethod

Load and parse an SR document from file.

This method reads a DICOM Structured Report file and extracts all measurement content into the hierarchical dataclass structure. Follows the same pattern as DicomDatabase.from_folders().

Parameters:

Name Type Description Default
path str | Path

Path to DICOM SR file.

required
extract_private_tags bool

Whether to extract vendor-specific tags into the metadata dictionaries. Defaults to False.

False

Returns:

Type Description
'SRDocument'

SRDocument instance with parsed content.

Raises:

Type Description
FileNotFoundError

If the file does not exist.

ValueError

If the file is not a valid DICOM SR object.

Source code in pictologics/utilities/sr_parser.py
@classmethod
def from_file(
    cls,
    path: str | Path,
    extract_private_tags: bool = False,
) -> "SRDocument":
    """Load and parse an SR document from file.

    This method reads a DICOM Structured Report file and extracts
    all measurement content into the hierarchical dataclass structure.
    Follows the same pattern as DicomDatabase.from_folders().

    Args:
        path: Path to DICOM SR file.
        extract_private_tags: Whether to extract vendor-specific tags
            into the metadata dictionaries. Defaults to False.

    Returns:
        SRDocument instance with parsed content.

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: If the file is not a valid DICOM SR object.
    """
    import pydicom

    path_obj = Path(path)
    if not path_obj.exists():
        raise FileNotFoundError(f"SR file not found: {path}")

    # Load the DICOM file
    try:
        dcm = pydicom.dcmread(str(path_obj))
    except Exception as e:
        raise ValueError(f"Failed to read DICOM file: {e}") from e

    # Check if it's an SR document
    sr_sop_classes = [
        "1.2.840.10008.5.1.4.1.1.88.11",  # Basic Text SR
        "1.2.840.10008.5.1.4.1.1.88.22",  # Enhanced SR
        "1.2.840.10008.5.1.4.1.1.88.33",  # Comprehensive SR
        "1.2.840.10008.5.1.4.1.1.88.34",  # Comprehensive 3D SR
        "1.2.840.10008.5.1.4.1.1.88.35",  # Extensible SR
        "1.2.840.10008.5.1.4.1.1.88.40",  # Procedure Log
    ]
    sop_class = str(getattr(dcm, "SOPClassUID", ""))
    if sop_class not in sr_sop_classes:
        raise ValueError(
            f"File is not a DICOM SR document. SOPClassUID: {sop_class}"
        )

    # Extract basic document info
    sop_instance_uid = str(getattr(dcm, "SOPInstanceUID", ""))
    patient_id = str(getattr(dcm, "PatientID", "")) or None
    study_uid = str(getattr(dcm, "StudyInstanceUID", "")) or None
    series_uid = str(getattr(dcm, "SeriesInstanceUID", "")) or None

    # Content datetime
    content_date = getattr(dcm, "ContentDate", None)
    content_time = getattr(dcm, "ContentTime", None)
    if content_date:
        content_datetime = str(content_date)
        if content_time:
            content_datetime += f"T{content_time}"
    else:
        content_datetime = None

    # Extract document title from ConceptNameCodeSequence
    doc_title = None
    if hasattr(dcm, "ConceptNameCodeSequence") and dcm.ConceptNameCodeSequence:
        concept = dcm.ConceptNameCodeSequence[0]
        doc_title = str(getattr(concept, "CodeMeaning", "")) or None

    # Extract template ID if present
    template_id = None
    if hasattr(dcm, "ContentTemplateSequence") and dcm.ContentTemplateSequence:
        template = dcm.ContentTemplateSequence[0]
        template_id = str(getattr(template, "TemplateIdentifier", "")) or None

    # Parse content sequence for measurements
    measurement_groups = _parse_content_sequence(dcm, extract_private_tags)

    # Build metadata dict
    metadata: dict[str, Any] = {}
    if extract_private_tags:
        # Extract any private tags
        for elem in dcm:
            if elem.tag.is_private:
                try:
                    metadata[elem.keyword or str(elem.tag)] = str(elem.value)
                except Exception:
                    pass

    return cls(
        file_path=path_obj,
        sop_instance_uid=sop_instance_uid,
        template_id=template_id,
        document_title=doc_title,
        measurement_groups=measurement_groups,
        patient_id=patient_id,
        study_instance_uid=study_uid,
        series_instance_uid=series_uid,
        content_datetime=content_datetime,
        metadata=metadata,
    )

from_folders(paths, recursive=True, show_progress=True, num_workers=None, output_dir=None, export_csv=True, export_json=True, extract_private_tags=False) classmethod

Batch process SR files from folders.

Scans directories for DICOM SR files, parses each one, and optionally exports individual CSV/JSON files plus a combined output and log.

This method follows the same pattern as DicomDatabase.from_folders().

Parameters:

Name Type Description Default
paths list[str | Path]

List of folder paths to scan for SR files.

required
recursive bool

Whether to scan subdirectories (default: True).

True
show_progress bool

Whether to display progress bars (default: True).

True
num_workers Optional[int]

Number of parallel workers. None=auto (cpu_count-1), 1=sequential (no multiprocessing).

None
output_dir Optional[str | Path]

If specified, exports each SR to this directory.

None
export_csv bool

Export individual CSV files (default: True).

True
export_json bool

Export individual JSON files (default: True).

True
extract_private_tags bool

Whether to extract private tags (default: False).

False

Returns:

Type Description
'SRBatch'

SRBatch containing all parsed documents and processing log.

Example

Process all SR files in a folder:

from pictologics.utilities.sr_parser import SRDocument

# Process folder
batch = SRDocument.from_folders(["sr_data/"])
print(f"Found {len(batch.documents)} SR files")
df = batch.get_combined_measurements_df()

# Process with exports
batch = SRDocument.from_folders(
    ["sr_data/"],
    output_dir="sr_exports/",
    export_csv=True,
    export_json=True
)
batch.export_log("sr_exports/processing_log.csv")
Source code in pictologics/utilities/sr_parser.py
@classmethod
def from_folders(
    cls,
    paths: list[str | Path],
    recursive: bool = True,
    show_progress: bool = True,
    num_workers: Optional[int] = None,
    output_dir: Optional[str | Path] = None,
    export_csv: bool = True,
    export_json: bool = True,
    extract_private_tags: bool = False,
) -> "SRBatch":
    """Batch process SR files from folders.

    Scans directories for DICOM SR files, parses each one, and optionally
    exports individual CSV/JSON files plus a combined output and log.

    This method follows the same pattern as DicomDatabase.from_folders().

    Args:
        paths: List of folder paths to scan for SR files.
        recursive: Whether to scan subdirectories (default: True).
        show_progress: Whether to display progress bars (default: True).
        num_workers: Number of parallel workers. None=auto (cpu_count-1),
                    1=sequential (no multiprocessing).
        output_dir: If specified, exports each SR to this directory.
        export_csv: Export individual CSV files (default: True).
        export_json: Export individual JSON files (default: True).
        extract_private_tags: Whether to extract private tags (default: False).

    Returns:
        SRBatch containing all parsed documents and processing log.

    Example:
        Process all SR files in a folder:

        ```python
        from pictologics.utilities.sr_parser import SRDocument

        # Process folder
        batch = SRDocument.from_folders(["sr_data/"])
        print(f"Found {len(batch.documents)} SR files")
        df = batch.get_combined_measurements_df()

        # Process with exports
        batch = SRDocument.from_folders(
            ["sr_data/"],
            output_dir="sr_exports/",
            export_csv=True,
            export_json=True
        )
        batch.export_log("sr_exports/processing_log.csv")
        ```
    """
    import os
    from concurrent.futures import ProcessPoolExecutor

    from tqdm import tqdm

    # Convert paths to Path objects
    path_objs = [Path(p) for p in paths]

    # Determine number of workers
    if num_workers is None:
        cpu_count = os.cpu_count()
        num_workers = max(1, (cpu_count - 1) if cpu_count else 1)

    # Step 1: Discover all SR files
    sr_files: list[Path] = []
    for path_obj in path_objs:
        if not path_obj.exists():
            continue
        if path_obj.is_file():
            if is_dicom_sr(path_obj):
                sr_files.append(path_obj)
        else:
            # Directory
            iterator = path_obj.rglob("*") if recursive else path_obj.iterdir()
            for f in iterator:
                if f.is_file() and is_dicom_sr(f):
                    sr_files.append(f)

    if not sr_files:
        return SRBatch(documents=[], processing_log=[], output_dir=None)

    # Create output directory if specified
    out_path = Path(output_dir) if output_dir else None
    if out_path:
        out_path.mkdir(parents=True, exist_ok=True)

    # Step 2: Process each SR file
    processing_log: list[dict[str, Any]] = []
    documents: list["SRDocument"] = []

    # Prepare worker arguments
    worker_args = [
        (f, extract_private_tags, out_path, export_csv, export_json)
        for f in sr_files
    ]

    if num_workers == 1:
        # Sequential processing
        iterator = tqdm(
            worker_args, desc="Processing SR files", disable=not show_progress
        )
        for args in iterator:
            result = _process_sr_file_worker(args)
            processing_log.append(result["log"])
            if result["document"] is not None:
                documents.append(result["document"])
    else:
        # Parallel processing
        with ProcessPoolExecutor(max_workers=num_workers) as executor:
            results = list(
                tqdm(
                    executor.map(_process_sr_file_worker, worker_args),
                    total=len(worker_args),
                    desc="Processing SR files",
                    disable=not show_progress,
                )
            )
        for result in results:
            processing_log.append(result["log"])
            if result["document"] is not None:
                documents.append(result["document"])  # pragma: no cover

    return SRBatch(
        documents=documents,
        processing_log=processing_log,
        output_dir=out_path,
    )

get_measurements_df()

Export all measurements as a DataFrame.

Returns a flat DataFrame with all measurements from all groups, including group context for each measurement.

Returns:

Type Description
DataFrame

DataFrame with columns:

  • group_id: Identifier of the measurement group
  • finding_type: Type of finding
  • finding_site: Anatomical site
  • measurement_name: Name of the measurement
  • value: Numerical value
  • unit: Unit of measurement
  • derivation: How it was derived
  • tracking_id: Tracking identifier
Source code in pictologics/utilities/sr_parser.py
def get_measurements_df(self) -> pd.DataFrame:
    """Export all measurements as a DataFrame.

    Returns a flat DataFrame with all measurements from all groups,
    including group context for each measurement.

    Returns:
        DataFrame with columns:
        - group_id: Identifier of the measurement group
        - finding_type: Type of finding
        - finding_site: Anatomical site
        - measurement_name: Name of the measurement
        - value: Numerical value
        - unit: Unit of measurement
        - derivation: How it was derived
        - tracking_id: Tracking identifier
    """
    rows = []
    for group in self.measurement_groups:
        for meas in group.measurements:
            rows.append(
                {
                    "group_id": group.group_id,
                    "finding_type": group.finding_type or meas.finding_type,
                    "finding_site": group.finding_site or meas.finding_site,
                    "measurement_name": meas.name,
                    "value": meas.value,
                    "unit": meas.unit,
                    "derivation": meas.derivation,
                    "tracking_id": meas.tracking_id,
                }
            )

    return pd.DataFrame(rows)

get_summary()

Get document summary without full parsing.

Returns:

Type Description
dict[str, Any]

Dictionary with summary information including:

  • sop_instance_uid
  • template_id
  • document_title
  • num_groups
  • num_measurements
  • patient_id
  • study_instance_uid
Source code in pictologics/utilities/sr_parser.py
def get_summary(self) -> dict[str, Any]:
    """Get document summary without full parsing.

    Returns:
        Dictionary with summary information including:
        - sop_instance_uid
        - template_id
        - document_title
        - num_groups
        - num_measurements
        - patient_id
        - study_instance_uid
    """
    total_measurements = sum(len(g.measurements) for g in self.measurement_groups)
    return {
        "sop_instance_uid": self.sop_instance_uid,
        "template_id": self.template_id,
        "document_title": self.document_title,
        "num_groups": len(self.measurement_groups),
        "num_measurements": total_measurements,
        "patient_id": self.patient_id,
        "study_instance_uid": self.study_instance_uid,
        "content_datetime": self.content_datetime,
    }

SRBatch dataclass

Collection of parsed SR documents from batch processing.

This class holds the results of batch SR processing via SRDocument.from_folders(). It provides access to all parsed documents and methods for combined exports.

Attributes:

Name Type Description
documents list[SRDocument]

List of successfully parsed SRDocument objects.

processing_log list[dict[str, Any]]

Log entries for each processed file (success/error).

output_dir Optional[Path]

Directory where individual exports were written.

Example
from pictologics.utilities.sr_parser import SRDocument

batch = SRDocument.from_folders(["sr_data/"], output_dir="exports/")
print(f"Processed {len(batch.documents)} SR files")
df = batch.get_combined_measurements_df()
batch.export_log("exports/processing_log.csv")
Source code in pictologics/utilities/sr_parser.py
@dataclass
class SRBatch:
    """Collection of parsed SR documents from batch processing.

    This class holds the results of batch SR processing via
    SRDocument.from_folders(). It provides access to all parsed documents
    and methods for combined exports.

    Attributes:
        documents: List of successfully parsed SRDocument objects.
        processing_log: Log entries for each processed file (success/error).
        output_dir: Directory where individual exports were written.

    Example:
        ```python
        from pictologics.utilities.sr_parser import SRDocument

        batch = SRDocument.from_folders(["sr_data/"], output_dir="exports/")
        print(f"Processed {len(batch.documents)} SR files")
        df = batch.get_combined_measurements_df()
        batch.export_log("exports/processing_log.csv")
        ```
    """

    documents: list[SRDocument] = field(default_factory=list)
    processing_log: list[dict[str, Any]] = field(default_factory=list)
    output_dir: Optional[Path] = None

    def get_combined_measurements_df(self) -> pd.DataFrame:
        """Combine measurements from all documents into a single DataFrame.

        Each measurement row includes the source document's SOP Instance UID,
        patient ID, and study UID for traceability.

        Returns:
            DataFrame with all measurements from all documents.
        """
        all_rows: list[dict[str, Any]] = []

        for doc in self.documents:
            for group in doc.measurement_groups:
                for meas in group.measurements:
                    all_rows.append(
                        {
                            "sop_instance_uid": doc.sop_instance_uid,
                            "patient_id": doc.patient_id,
                            "study_instance_uid": doc.study_instance_uid,
                            "group_finding_type": group.finding_type,
                            "group_finding_site": group.finding_site,
                            "measurement_name": meas.name,
                            "value": meas.value,
                            "unit": meas.unit,
                            "finding_type": meas.finding_type,
                            "finding_site": meas.finding_site,
                            "derivation": meas.derivation,
                            "tracking_id": meas.tracking_id,
                        }
                    )

        return pd.DataFrame(all_rows)

    def export_combined_csv(self, path: str | Path) -> Path:
        """Export combined measurements to a single CSV file.

        Args:
            path: Output path for the combined CSV.

        Returns:
            Path to the created CSV file.
        """
        path_obj = Path(path)
        df = self.get_combined_measurements_df()
        df.to_csv(path_obj, index=False)
        return path_obj

    def export_log(self, path: str | Path) -> Path:
        """Export processing log to CSV.

        The log contains one row per processed file with status,
        output paths, and any error messages.

        Args:
            path: Output path for the log CSV.

        Returns:
            Path to the created CSV file.
        """
        path_obj = Path(path)
        df = pd.DataFrame(self.processing_log)
        df.to_csv(path_obj, index=False)
        return path_obj

export_combined_csv(path)

Export combined measurements to a single CSV file.

Parameters:

Name Type Description Default
path str | Path

Output path for the combined CSV.

required

Returns:

Type Description
Path

Path to the created CSV file.

Source code in pictologics/utilities/sr_parser.py
def export_combined_csv(self, path: str | Path) -> Path:
    """Export combined measurements to a single CSV file.

    Args:
        path: Output path for the combined CSV.

    Returns:
        Path to the created CSV file.
    """
    path_obj = Path(path)
    df = self.get_combined_measurements_df()
    df.to_csv(path_obj, index=False)
    return path_obj

export_log(path)

Export processing log to CSV.

The log contains one row per processed file with status, output paths, and any error messages.

Parameters:

Name Type Description Default
path str | Path

Output path for the log CSV.

required

Returns:

Type Description
Path

Path to the created CSV file.

Source code in pictologics/utilities/sr_parser.py
def export_log(self, path: str | Path) -> Path:
    """Export processing log to CSV.

    The log contains one row per processed file with status,
    output paths, and any error messages.

    Args:
        path: Output path for the log CSV.

    Returns:
        Path to the created CSV file.
    """
    path_obj = Path(path)
    df = pd.DataFrame(self.processing_log)
    df.to_csv(path_obj, index=False)
    return path_obj

get_combined_measurements_df()

Combine measurements from all documents into a single DataFrame.

Each measurement row includes the source document's SOP Instance UID, patient ID, and study UID for traceability.

Returns:

Type Description
DataFrame

DataFrame with all measurements from all documents.

Source code in pictologics/utilities/sr_parser.py
def get_combined_measurements_df(self) -> pd.DataFrame:
    """Combine measurements from all documents into a single DataFrame.

    Each measurement row includes the source document's SOP Instance UID,
    patient ID, and study UID for traceability.

    Returns:
        DataFrame with all measurements from all documents.
    """
    all_rows: list[dict[str, Any]] = []

    for doc in self.documents:
        for group in doc.measurement_groups:
            for meas in group.measurements:
                all_rows.append(
                    {
                        "sop_instance_uid": doc.sop_instance_uid,
                        "patient_id": doc.patient_id,
                        "study_instance_uid": doc.study_instance_uid,
                        "group_finding_type": group.finding_type,
                        "group_finding_site": group.finding_site,
                        "measurement_name": meas.name,
                        "value": meas.value,
                        "unit": meas.unit,
                        "finding_type": meas.finding_type,
                        "finding_site": meas.finding_site,
                        "derivation": meas.derivation,
                        "tracking_id": meas.tracking_id,
                    }
                )

    return pd.DataFrame(all_rows)

is_dicom_sr(path)

Check if a DICOM file is a Structured Report.

Parameters:

Name Type Description Default
path str | Path

Path to the potential DICOM file.

required

Returns:

Type Description
bool

True if the file is a DICOM SR object, False otherwise.

Source code in pictologics/utilities/sr_parser.py
def is_dicom_sr(path: str | Path) -> bool:
    """Check if a DICOM file is a Structured Report.

    Args:
        path: Path to the potential DICOM file.

    Returns:
        True if the file is a DICOM SR object, False otherwise.
    """
    import pydicom

    # SOP Class UIDs for the recognized SR storage classes.
    sr_sop_classes = {
        "1.2.840.10008.5.1.4.1.1.88.11",  # Basic Text SR
        "1.2.840.10008.5.1.4.1.1.88.22",  # Enhanced SR
        "1.2.840.10008.5.1.4.1.1.88.33",  # Comprehensive SR
        "1.2.840.10008.5.1.4.1.1.88.34",  # Comprehensive 3D SR
        "1.2.840.10008.5.1.4.1.1.88.35",  # Extensible SR
        "1.2.840.10008.5.1.4.1.1.88.40",  # Procedure Log
    }

    # Best-effort: any read/parse failure means "not an SR".
    try:
        dataset = pydicom.dcmread(str(path), stop_before_pixels=True)
        return str(getattr(dataset, "SOPClassUID", "")) in sr_sop_classes
    except Exception:
        return False

pictologics.utilities.visualization

Visualization Module

This module provides utilities for visualizing medical images and segmentation masks. It supports interactive slice scrolling and batch export of images.

Key Features

  • Interactive slice viewer with matplotlib
  • Flexible display modes: image-only, mask-only, or overlay
  • Multi-label mask support (up to 20+ labels with distinct colors)
  • Window/Level normalization for CT/MR viewing
  • Configurable output formats (PNG, JPEG, TIFF)
  • Flexible slice selection for batch export

Display Modes

The visualization functions support three display modes based on which inputs are provided:

  1. Image + Mask (Overlay Mode): Both image and mask are provided. The mask is overlaid on the grayscale image with the specified transparency (alpha) and colormap.

  2. Image Only: Only image is provided (mask=None). The image is displayed as grayscale, optionally with window/level normalization applied.

  3. Mask Only: Only mask is provided (image=None). The mask can be displayed either:

     • As a colormap visualization (mask_as_colormap=True, default): Each unique label value gets a distinct color from the specified colormap.
     • As grayscale (mask_as_colormap=False): Values are normalized to 0-255.

Window/Level Normalization

For medical imaging (CT, MR), window/level controls are essential for proper visualization. When window_center and window_width are specified:

  • window_center (Level): The center value of the display window (default: None, which falls back to min-max scaling)
  • window_width (Width): The range of values displayed around the center (default: None, which falls back to min-max scaling)

Values outside [center - width/2, center + width/2] are clipped to black/white.

Common presets: - Soft tissue: Center=40, Width=400 - Bone: Center=400, Width=1800 - Lung: Center=-600, Width=1500 - Brain: Center=40, Width=80

visualize_slices(image=None, mask=None, alpha=0.25, colormap='tab20', axis=2, initial_slice=None, window_title='Slice Viewer', window_center=None, window_width=None, mask_as_colormap=True)

Display interactive slice viewer with scrolling.

This function supports three display modes:

  1. Image + Mask (Overlay Mode): Both image and mask are provided. The mask is overlaid on the grayscale image with transparency.

  2. Image Only: Only image is provided. Displays grayscale slices, optionally with window/level normalization.

  3. Mask Only: Only mask is provided. Displays mask visualization using either a colormap or grayscale display.

Parameters:

Name Type Description Default
image Optional[Image]

Optional Pictologics Image object containing the image data.

None
mask Optional[Image]

Optional Pictologics Image object containing the mask data.

None
alpha float

Transparency of mask overlay (0-1). Only used in overlay mode.

0.25
colormap str

Colormap for mask labels. Options: - "tab10": 10 distinct colors - "tab20": 20 distinct colors (default) - "Set1": 9 bold colors - "Set2": 8 pastel colors - "Paired": 12 paired colors

'tab20'
axis int

Axis along which to slice (0=sagittal, 1=coronal, 2=axial).

2
initial_slice Optional[int]

Initial slice to display (default: middle).

None
window_title str

Title for the viewer window.

'Slice Viewer'
window_center Optional[float]

Window center (level) for normalization. Default: None (min-max).

None
window_width Optional[float]

Window width for normalization. Default: None (min-max).

None
mask_as_colormap bool

If True and mask-only mode, display with colormap. If False, display as grayscale.

True

Raises:

Type Description
ValueError

If neither image nor mask is provided, or if shapes don't match when both are provided.

Example

Visualise slices interactively:

from pictologics import load_image
from pictologics.utilities import visualize_slices

# View image with mask overlay
img = load_image("scan.nii.gz")
mask = load_image("segmentation.nii.gz")
visualize_slices(image=img, mask=mask)

# View image only
visualize_slices(image=img, window_center=40, window_width=400)

# View mask only with colormap
visualize_slices(mask=mask)
Source code in pictologics/utilities/visualization.py
def visualize_slices(
    image: Optional[Image] = None,
    mask: Optional[Image] = None,
    alpha: float = 0.25,
    colormap: str = "tab20",
    axis: int = 2,
    initial_slice: Optional[int] = None,
    window_title: str = "Slice Viewer",
    window_center: Optional[float] = None,
    window_width: Optional[float] = None,
    mask_as_colormap: bool = True,
) -> None:
    """
    Open an interactive matplotlib viewer for scrolling through slices.

    Three display modes are supported depending on which inputs are given:

    1. **Image + Mask (Overlay Mode)**: both `image` and `mask` are
       provided; the mask is drawn over the grayscale image with
       transparency.

    2. **Image Only**: only `image` is provided; grayscale slices are
       shown, optionally window/level normalized.

    3. **Mask Only**: only `mask` is provided; the mask is rendered with
       a colormap or as grayscale.

    Args:
        image: Optional Pictologics Image object containing the image data.
        mask: Optional Pictologics Image object containing the mask data.
        alpha: Transparency of mask overlay (0-1). Only used in overlay mode.
        colormap: Colormap for mask labels. Options:
            - "tab10": 10 distinct colors
            - "tab20": 20 distinct colors (default)
            - "Set1": 9 bold colors
            - "Set2": 8 pastel colors
            - "Paired": 12 paired colors
        axis: Axis along which to slice (0=sagittal, 1=coronal, 2=axial).
        initial_slice: Initial slice to display (default: middle).
        window_title: Title for the viewer window.
        window_center: Window center (level) for normalization. Default: None (min-max).
        window_width: Window width for normalization. Default: None (min-max).
        mask_as_colormap: If True and mask-only mode, display with colormap.
            If False, display as grayscale.

    Raises:
        ValueError: If neither image nor mask is provided, or if shapes don't match
            when both are provided.

    Example:
        Visualise slices interactively:

        ```python
        from pictologics import load_image
        from pictologics.utilities import visualize_slices

        # View image with mask overlay
        img = load_image("scan.nii.gz")
        mask = load_image("segmentation.nii.gz")
        visualize_slices(image=img, mask=mask)

        # View image only
        visualize_slices(image=img, window_center=40, window_width=400)

        # View mask only with colormap
        visualize_slices(mask=mask)
        ```
    """
    import matplotlib.pyplot as plt
    from matplotlib.widgets import Slider

    if image is None and mask is None:
        raise ValueError("At least one of image or mask must be provided.")

    # Both inputs must agree on shape before any rendering happens.
    if image is not None and mask is not None:
        if image.array.shape != mask.array.shape:
            raise ValueError(
                f"Image shape {image.array.shape} does not match "
                f"mask shape {mask.array.shape}"
            )

    # Shape reference comes from whichever input is available.
    ref_array = _get_reference_array(image, mask)
    num_slices = ref_array.shape[axis]

    # Default to the middle slice when no start position is given.
    if initial_slice is None:
        initial_slice = num_slices // 2

    fig, ax = plt.subplots(1, 1, figsize=(10, 10))
    plt.subplots_adjust(bottom=0.15)

    def take_plane(arr, idx):  # type: ignore[no-untyped-def]
        # Extract the 2D plane at position idx along the configured axis.
        if axis == 0:
            return arr[idx, :, :]
        if axis == 1:
            return arr[:, idx, :]
        return arr[:, :, idx]

    def render(idx: int):  # type: ignore[no-untyped-def]
        # Build the RGBA frame for slice idx from whichever inputs exist.
        img_plane = take_plane(image.array, idx) if image is not None else None
        mask_plane = take_plane(mask.array, idx) if mask is not None else None
        return _create_display_rgba(
            img_plane,
            mask_plane,
            alpha,
            colormap,
            window_center,
            window_width,
            mask_as_colormap,
        )

    # Initial frame.
    im = ax.imshow(render(initial_slice), aspect="equal")
    ax.set_title(f"Slice {initial_slice}/{num_slices - 1}")
    ax.axis("off")

    # Slider occupies a thin strip below the image axes.
    ax_slider = plt.axes((0.15, 0.05, 0.7, 0.03))
    slider = Slider(
        ax=ax_slider,
        label="Slice",
        valmin=0,
        valmax=num_slices - 1,
        valinit=initial_slice,
        valstep=1,
    )

    def update(val: float) -> None:
        idx = int(val)
        im.set_data(render(idx))
        ax.set_title(f"Slice {idx}/{num_slices - 1}")
        fig.canvas.draw_idle()

    slider.on_changed(update)

    # Mouse wheel steps the slider one slice at a time, clamped to range.
    def on_scroll(event) -> None:  # type: ignore[no-untyped-def]
        step = 1 if event.button == "up" else -1
        slider.set_val(min(max(slider.val + step, 0), num_slices - 1))

    fig.canvas.mpl_connect("scroll_event", on_scroll)

    fig.suptitle(window_title)
    plt.show()

save_slices(output_dir, image=None, mask=None, slice_selection='10%', format='png', dpi=300, alpha=0.25, colormap='tab20', axis=2, filename_prefix='slice', window_center=None, window_width=None, mask_as_colormap=True)

Save image slices to files.

This function supports three display modes:

  1. Image + Mask (Overlay Mode): Both image and mask are provided. The mask is overlaid on the grayscale image with transparency.

  2. Image Only: Only image is provided. Saves grayscale slices, optionally with window/level normalization.

  3. Mask Only: Only mask is provided. Saves mask visualization using either a colormap or grayscale display.

Parameters:

Name Type Description Default
output_dir str

Directory to save output images.

required
image Optional[Image]

Optional Pictologics Image object containing the image data.

None
mask Optional[Image]

Optional Pictologics Image object containing the mask data.

None
slice_selection Union[str, int, list[int]]

Slice selection specification: - "every_N" or "N": Every Nth slice - "N%": Slices at each N% interval (e.g., "10%" = ~10 images) - int: Single slice index - list[int]: Specific slice indices

'10%'
format str

Output format ("png", "jpeg", "tiff").

'png'
dpi int

Output resolution in dots per inch.

300
alpha float

Transparency of mask overlay (0-1). Only used in overlay mode.

0.25
colormap str

Colormap for mask labels. Options: - "tab10": 10 distinct colors - "tab20": 20 distinct colors (default) - "Set1": 9 bold colors - "Set2": 8 pastel colors - "Paired": 12 paired colors

'tab20'
axis int

Axis along which to slice (0=sagittal, 1=coronal, 2=axial).

2
filename_prefix str

Prefix for output filenames.

'slice'
window_center Optional[float]

Window center (level) for normalization. Default: None (min-max).

None
window_width Optional[float]

Window width for normalization. Default: None (min-max).

None
mask_as_colormap bool

If True and mask-only mode, display with colormap. If False, display as grayscale.

True

Returns:

Type Description
list[str]

List of paths to saved files.

Raises:

Type Description
ValueError

If neither image nor mask is provided, or if shapes don't match when both are provided.

Example

Save image slices with and without mask overlay:

from pictologics import load_image
from pictologics.utilities import save_slices

# Save image with mask overlay
img = load_image("scan.nii.gz")
mask = load_image("segmentation.nii.gz")
files = save_slices("output/", image=img, mask=mask, slice_selection="10%")

# Save image only (no mask)
files = save_slices("output/", image=img, slice_selection="10%")

# Save mask only with colormap
files = save_slices("output/", mask=mask, slice_selection="10%")
Source code in pictologics/utilities/visualization.py
def save_slices(
    output_dir: str,
    image: Optional[Image] = None,
    mask: Optional[Image] = None,
    slice_selection: Union[str, int, list[int]] = "10%",
    format: str = "png",
    dpi: int = 300,
    alpha: float = 0.25,
    colormap: str = "tab20",
    axis: int = 2,
    filename_prefix: str = "slice",
    window_center: Optional[float] = None,
    window_width: Optional[float] = None,
    mask_as_colormap: bool = True,
) -> list[str]:
    """
    Save image slices to files.

    Three display modes are supported depending on which inputs are given:

    1. **Image + Mask (Overlay Mode)**: both `image` and `mask` are
       provided; the mask is overlaid on the grayscale image with
       transparency.

    2. **Image Only**: only `image` is provided; grayscale slices are
       saved, optionally window/level normalized.

    3. **Mask Only**: only `mask` is provided; the mask is rendered with
       a colormap or as grayscale.

    Args:
        output_dir: Directory to save output images.
        image: Optional Pictologics Image object containing the image data.
        mask: Optional Pictologics Image object containing the mask data.
        slice_selection: Slice selection specification:
            - "every_N" or "N": Every Nth slice
            - "N%": Slices at each N% interval (e.g., "10%" = ~10 images)
            - int: Single slice index
            - list[int]: Specific slice indices
        format: Output format ("png", "jpeg", "tiff").
        dpi: Output resolution in dots per inch.
        alpha: Transparency of mask overlay (0-1). Only used in overlay mode.
        colormap: Colormap for mask labels. Options:
            - "tab10": 10 distinct colors
            - "tab20": 20 distinct colors (default)
            - "Set1": 9 bold colors
            - "Set2": 8 pastel colors
            - "Paired": 12 paired colors
        axis: Axis along which to slice (0=sagittal, 1=coronal, 2=axial).
        filename_prefix: Prefix for output filenames.
        window_center: Window center (level) for normalization. Default: None (min-max).
        window_width: Window width for normalization. Default: None (min-max).
        mask_as_colormap: If True and mask-only mode, display with colormap.
            If False, display as grayscale.

    Returns:
        List of paths to saved files.

    Raises:
        ValueError: If neither image nor mask is provided, or if shapes don't match
            when both are provided.

    Example:
        Save image slices with and without mask overlay:

        ```python
        from pictologics import load_image
        from pictologics.utilities import save_slices

        # Save image with mask overlay
        img = load_image("scan.nii.gz")
        mask = load_image("segmentation.nii.gz")
        files = save_slices("output/", image=img, mask=mask, slice_selection="10%")

        # Save image only (no mask)
        files = save_slices("output/", image=img, slice_selection="10%")

        # Save mask only with colormap
        files = save_slices("output/", mask=mask, slice_selection="10%")
        ```
    """
    if image is None and mask is None:
        raise ValueError("At least one of image or mask must be provided.")

    # Both inputs must agree on shape before any rendering happens.
    if image is not None and mask is not None:
        if image.array.shape != mask.array.shape:
            raise ValueError(
                f"Image shape {image.array.shape} does not match "
                f"mask shape {mask.array.shape}"
            )

    # Shape reference comes from whichever input is available.
    ref_array = _get_reference_array(image, mask)

    # Ensure the destination directory exists.
    out_path = Path(output_dir)
    out_path.mkdir(parents=True, exist_ok=True)

    num_slices = ref_array.shape[axis]
    slice_indices = _parse_slice_selection(slice_selection, num_slices)

    # Normalize the format name; anything unrecognized falls back to PNG.
    format = format.lower()
    if format == "jpg":
        format = "jpeg"
    if format not in ("png", "jpeg", "tiff"):
        format = "png"
    ext = {"png": ".png", "jpeg": ".jpg", "tiff": ".tiff"}[format]

    # Pixel dimensions are scaled relative to a 72-dpi baseline.
    scale_factor = dpi / 72.0

    def take_plane(arr, idx):  # type: ignore[no-untyped-def]
        # Extract the 2D plane at position idx along the configured axis.
        if axis == 0:
            return arr[idx, :, :]
        if axis == 1:
            return arr[:, idx, :]
        return arr[:, :, idx]

    saved_files: list[str] = []

    for idx in slice_indices:
        img_plane = take_plane(image.array, idx) if image is not None else None
        mask_plane = take_plane(mask.array, idx) if mask is not None else None

        rgba = _create_display_rgba(
            img_plane,
            mask_plane,
            alpha,
            colormap,
            window_center,
            window_width,
            mask_as_colormap,
        )

        pil_img = PILImage.fromarray(rgba)

        # Resample only when the target dpi differs from the baseline.
        if scale_factor != 1.0:
            h, w = rgba.shape[:2]
            pil_img = pil_img.resize(
                (int(w * scale_factor), int(h * scale_factor)),
                PILImage.Resampling.LANCZOS,
            )

        # JPEG has no alpha channel; drop it before saving.
        if format == "jpeg":
            pil_img = pil_img.convert("RGB")

        filepath = out_path / f"{filename_prefix}_{idx:04d}{ext}"
        pil_img.save(filepath, dpi=(dpi, dpi))
        saved_files.append(str(filepath))

    return saved_files