Source code for darwin.importer.importer

import concurrent.futures
import copy
import json
import uuid
from collections import defaultdict
from logging import getLogger
from multiprocessing import cpu_count
from pathlib import Path
from time import perf_counter
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Generator,
    Iterable,
    List,
    Optional,
    Set,
    Tuple,
    Union,
)

from darwin.datatypes import (
    AnnotationFile,
    Property,
    PropertyClass,
    parse_property_classes,
)
from darwin.future.data_objects.properties import (
    FullProperty,
    PropertyGranularity,
    PropertyType,
    PropertyValue,
    SelectedProperty,
)
from darwin.item import DatasetItem
from darwin.path_utils import is_properties_enabled, parse_metadata
from darwin.utils.utils import _parse_annotators

Unknown = Any  # type: ignore
import numpy as np
from tqdm import tqdm

if TYPE_CHECKING:
    from darwin.client import Client
    from darwin.dataset.remote_dataset import RemoteDataset

from rich.console import Console
from rich.theme import Theme

import darwin.datatypes as dt
from darwin.datatypes import PathLike
from darwin.exceptions import IncompatibleOptions, RequestEntitySizeExceeded
from darwin.utils import secure_continue_request
from darwin.utils.flatten_list import flatten_list

logger = getLogger(__name__)

try:
    from mpire import WorkerPool

    MPIRE_AVAILABLE = True
except ImportError:
    MPIRE_AVAILABLE = False

# Classes missing import support on backend side
UNSUPPORTED_CLASSES = ["string", "graph"]

# Classes that are defined on team level automatically and available in all datasets
GLOBAL_CLASSES = ["__raster_layer__"]

# Add a length of 5 for URL encoding overhead and separators per filename
FILENAME_OVERHEAD = 5
MAX_URL_LENGTH = 2000
BASE_URL_LENGTH = 200

DEPRECATION_MESSAGE = """

This function is going to be turned into private. This means that breaking
changes in its interface and implementation are to be expected. We encourage using ``import_annotations``
instead of calling this low-level function directly.

"""


def _build_main_annotations_lookup_table(
    annotation_classes: List[Dict[str, Unknown]],
) -> Dict[str, Unknown]:
    MAIN_ANNOTATION_TYPES = [
        "bounding_box",
        "cuboid",
        "ellipse",
        "keypoint",
        "line",
        "link",
        "polygon",
        "skeleton",
        "tag",
        "string",
        "table",
        "simple_table",
        "graph",
        "mask",
        "raster_layer",
    ]
    lookup: Dict[str, Unknown] = {}
    for cls in annotation_classes:
        for annotation_type in cls["annotation_types"]:
            if annotation_type in MAIN_ANNOTATION_TYPES:
                if annotation_type not in lookup:
                    lookup[annotation_type] = {}
                lookup[annotation_type][cls["name"]] = cls["id"]

    return lookup


def _find_and_parse(  # noqa: C901
    importer: Callable[[Path], Union[List[dt.AnnotationFile], dt.AnnotationFile, None]],
    file_paths: List[PathLike],
    console: Optional[Console] = None,
    use_multi_cpu: bool = True,
    cpu_limit: int = 1,
    legacy_remote_file_slot_affine_maps: Optional[
        Dict[str, Dict[Path, np.ndarray]]
    ] = {},
    pixdims_and_primary_planes: Optional[Dict[str, Dict[Path, np.ndarray]]] = {},
) -> Optional[Iterable[dt.AnnotationFile]]:
    is_console = console is not None

    logger = getLogger(__name__)

    def perf_time(reset: bool = False) -> Generator[float, float, None]:
        start = perf_counter()
        yield start
        while True:
            if reset:
                start = perf_counter()
            yield perf_counter() - start

    def maybe_console(*args: Union[str, int, float]) -> None:
        if console is not None:
            console.print(*[f"[{str(next(perf_time()))} seconds elapsed]", *args])
        else:
            logger.info(*[f"[{str(next(perf_time()))}]", *args])

    maybe_console("Parsing files... ")

    files: List[Path] = _get_files_for_parsing(file_paths)

    maybe_console(f"Found {len(files)} files")

    if use_multi_cpu and MPIRE_AVAILABLE and cpu_limit > 1:
        maybe_console(f"Using multiprocessing with {cpu_limit} workers")
        try:
            with WorkerPool(cpu_limit) as pool:
                if importer.__module__ == "darwin.importer.formats.nifti":
                    parsed_files = pool.map(
                        lambda file: importer(
                            file,
                            legacy_remote_file_slot_affine_maps=legacy_remote_file_slot_affine_maps,  # type: ignore
                            pixdims_and_primary_planes=pixdims_and_primary_planes,  # type: ignore
                        ),
                        tqdm(files),
                    )
                else:
                    parsed_files = pool.map(
                        importer, tqdm(files) if is_console else files
                    )
        except KeyboardInterrupt:
            maybe_console("Keyboard interrupt. Stopping.")
            return None
        except Exception as e:
            maybe_console(f"Error: {e}")
            return None

    else:
        maybe_console("Using single CPU")
        if importer.__module__ == "darwin.importer.formats.nifti":
            parsed_files = [
                importer(
                    file,
                    legacy_remote_file_slot_affine_maps=legacy_remote_file_slot_affine_maps,  # type: ignore
                    pixdims_and_primary_planes=pixdims_and_primary_planes,  # type: ignore
                )
                for file in tqdm(files)
            ]
        else:
            parsed_files = [
                importer(file) for file in (tqdm(files) if is_console else files)
            ]
    parsed_files = [f for f in parsed_files if f is not None]

    maybe_console("Finished.")
    # Sometimes we have a list of lists of AnnotationFile, sometimes we have a list of AnnotationFile
    # We flatten the list of lists
    if not parsed_files:
        return None
    if isinstance(parsed_files, list):
        if isinstance(parsed_files[0], list):
            parsed_files = [item for sublist in parsed_files for item in sublist]
    else:
        parsed_files = [parsed_files]

    parsed_files = [f for f in parsed_files if f is not None]
    return parsed_files


def _get_files_for_parsing(file_paths: List[PathLike]) -> List[Path]:
    packed_files = [
        filepath.glob("**/*") if filepath.is_dir() else [filepath]
        for filepath in map(Path, file_paths)
    ]
    return [file for files in packed_files for file in files]


def _build_attribute_lookup(dataset: "RemoteDataset") -> Dict[str, Unknown]:
    attributes: List[Dict[str, Unknown]] = dataset.fetch_remote_attributes()
    lookup: Dict[str, Unknown] = {}
    for attribute in attributes:
        class_id = attribute["class_id"]
        if class_id not in lookup:
            lookup[class_id] = {}
        lookup[class_id][attribute["name"]] = attribute["id"]
    return lookup


def _get_remote_files_ready_for_import(
    dataset: "RemoteDataset",
    filenames: List[str],
    chunk_size: int = 100,
) -> Dict[str, Dict[str, Any]]:
    """
    Fetches remote files that are ready for import from the datasets in chunks; by
    default 100 filenames at a time.

    The output is a dictionary for each remote file with the following keys:
    - "item_id": Item ID
    - "slot_names": A list of each slot name for the item
    - "layout": The layout of the item

    Fetching slot names & layout is necessary here to avoid double-trip to API downstream for remote files.

    Raises a ValueError if any of the remote files are not in the `new`, `annotate`,
    `review`, `complete`, or `archived` statuses.

    Parameters
    ----------
    dataset : RemoteDataset
        The remote dataset to fetch the files from.
    filenames : List[str]
        A list of filenames to fetch.
    chunk_size : int
        The number of filenames to fetch at a time.
    """
    remote_files = {}
    remote_files_not_ready_for_import = {}
    for i in range(0, len(filenames), chunk_size):
        chunk = filenames[i : i + chunk_size]
        for remote_file in dataset.fetch_remote_files(
            {"types": "image,playback_video,video_frame", "item_names": chunk}
        ):
            if remote_file.status not in [
                "new",
                "annotate",
                "review",
                "complete",
                "archived",
            ]:
                remote_files_not_ready_for_import[remote_file.full_path] = (
                    remote_file.status
                )
            else:
                slot_names = _get_slot_names(remote_file)
                remote_files[remote_file.full_path] = {
                    "item_id": remote_file.id,
                    "slot_names": slot_names,
                    "layout": remote_file.layout,
                }
    if remote_files_not_ready_for_import:
        console = Console(theme=_console_theme())
        console.print(
            "The following files are either still processing, or failed to process, so annotations cannot be imported:",
            style="warning",
        )
        for file, status in remote_files_not_ready_for_import.items():
            console.print(f"  - {file}, status: {status}")
        raise ValueError(
            "Some files targeted for annotation import are either still processing, or failed to process, so annotations cannot be imported."
        )
    return remote_files


def _get_slot_names(remote_file: DatasetItem) -> List[str]:
    """
    Returns a list of slot names for a dataset item:
    - If the item's layout is V1 or V2, it is multi-slotted.
      In this case we return the slot names in the order they appear in `slots`.
      This ensures that the default slot is the first item in the list
    - If the item's layout is V3, it is multi-channeled.
      In this case we return the slot names in the order they appear in `slots_grid`.
      This ensures that the base slot is the first item in the list

    Parameters
    ----------
    remote_file : DatasetItem
        A DatasetItem object representing a single remote dataset item

    Returns
    -------
    List[str]
        A list of slot names associated with the item
    """
    layout_version = remote_file.layout["version"]
    if layout_version == 1 or layout_version == 2:
        return [slot["slot_name"] for slot in remote_file.slots]
    elif layout_version == 3:
        return list(remote_file.layout["slots_grid"][0][0])


def _resolve_annotation_classes(
    local_annotation_classes: List[dt.AnnotationClass],
    classes_in_dataset: Dict[str, Unknown],
    classes_in_team: Dict[str, Unknown],
) -> Tuple[Set[dt.AnnotationClass], Set[dt.AnnotationClass]]:
    local_classes_not_in_dataset: Set[dt.AnnotationClass] = set()
    local_classes_not_in_team: Set[dt.AnnotationClass] = set()

    for local_cls in local_annotation_classes:
        local_annotation_type = (
            local_cls.annotation_internal_type or local_cls.annotation_type
        )
        # Only add the new class if it doesn't exist remotely already
        if (
            local_annotation_type in classes_in_dataset
            and local_cls.name in classes_in_dataset[local_annotation_type]
        ):
            continue

        # Only add the new class if it's not included in the list of the missing classes already
        if local_cls.name in [
            missing_class.name for missing_class in local_classes_not_in_dataset
        ]:
            continue
        if local_cls.name in [
            missing_class.name for missing_class in local_classes_not_in_team
        ]:
            continue

        if (
            local_annotation_type in classes_in_team
            and local_cls.name in classes_in_team[local_annotation_type]
        ):
            local_classes_not_in_dataset.add(local_cls)
        else:
            local_classes_not_in_team.add(local_cls)

    return local_classes_not_in_dataset, local_classes_not_in_team


def _get_team_properties_annotation_lookup(
    client: "Client", team_slug: str
) -> Tuple[Dict[Tuple[str, Optional[int]], FullProperty], Dict[str, FullProperty]]:
    """
    Returns two lookup dictionaries for team properties:
     - team_properties_annotation_lookup: (property-name, annotation_class_id): FullProperty object
     - team_item_properties_lookup: property-name: FullProperty object

    Args:
        client (Client): Darwin Client object
        team_slug (str): Team slug

    Returns:
        Tuple[Dict[Tuple[str, Optional[int]], FullProperty], Dict[str, FullProperty]: Tuple of two dictionaries
    """
    # get team properties -> List[FullProperty]
    team_properties = client.get_team_properties(team_slug)

    # (property-name, annotation_class_id): FullProperty object
    team_properties_annotation_lookup: Dict[Tuple[str, Optional[int]], FullProperty] = (
        {}
    )

    # property-name: FullProperty object
    team_item_properties_lookup: Dict[str, FullProperty] = {}
    for prop in team_properties:
        if (
            prop.granularity.value == "section"
            or prop.granularity.value == "annotation"
        ):
            team_properties_annotation_lookup[(prop.name, prop.annotation_class_id)] = (
                prop
            )
        elif prop.granularity.value == "item":
            team_item_properties_lookup[prop.name] = prop

    return team_properties_annotation_lookup, team_item_properties_lookup


def _update_payload_with_properties(
    annotations: List[Dict[str, Unknown]],
    annotation_id_property_map: Dict[str, Dict[str, Dict[str, Set[str]]]],
) -> None:
    """
    Updates the annotations with the properties that were created/updated during the import.

    Args:
        annotations (List[dt.Annotation]): List of annotations
        annotation_id_property_map: Dict[str, Dict[str, Dict[str, Set[str]]]]: Dict of annotation.id to frame_index -> property id -> property val ids
    """
    if not annotation_id_property_map:
        return

    for annotation in annotations:
        annotation_id = annotation["id"]

        if annotation_id_property_map.get(annotation_id):
            _map = {}
            for _frame_index, _property_map in annotation_id_property_map[
                annotation_id
            ].items():
                _map[_frame_index] = {}
                for prop_id, prop_val_set in dict(_property_map).items():
                    prop_val_list = list(prop_val_set)
                    _map[_frame_index][prop_id] = prop_val_list

            annotation["annotation_properties"] = dict(_map)


def _serialize_item_level_properties(
    item_property_values: List[Dict[str, str]],
    client: "Client",
    dataset: "RemoteDataset",
    import_annotators: bool,
    import_reviewers: bool,
) -> List[Dict[str, Any]]:
    """
    Returns serialized item-level properties to be added to the annotation import payload.

    Args:
        item_property_values (List[Dict[str, str]]): A list of dictionaries containing item property values.
        client (Client): The client instance used to interact with the API.
        dataset (RemoteDataset): The remote dataset instance.
        import_annotators (bool): Flag indicating whether to import annotators.
        import_reviewers (bool): Flag indicating whether to import reviewers.

    Returns:
        List[Dict[str, Any]]: A list of serialized item-level properties for the annotation import payload.
    """
    if not item_property_values:
        return []

    serialized_item_level_properties: List[Dict[str, Any]] = []
    actors: List[dt.DictFreeForm] = []
    # Get team properties
    _, team_item_properties_lookup = _get_team_properties_annotation_lookup(
        client, dataset.team
    )
    # We will skip text item properties that have value null
    for item_property_value in item_property_values:
        item_property = team_item_properties_lookup[item_property_value["name"]]
        item_property_id = item_property.id
        value = None
        if (
            item_property.type == "single_select"
            or item_property.type == "multi_select"
        ):
            item_property_value_id = next(
                (
                    pv.id
                    for pv in item_property.property_values or []
                    if pv.value == item_property_value["value"]
                ),
                None,
            )
            if item_property_value_id is not None:
                value = {"id": item_property_value_id}
        elif item_property.type == "text" and item_property_value["value"] is not None:
            value = {"text": item_property_value["value"]}
        if value is not None:
            actors: List[dt.DictFreeForm] = []
            actors.extend(
                _handle_annotators(
                    import_annotators, item_property_value=item_property_value
                )
            )

            actors.extend(
                _handle_reviewers(
                    import_reviewers, item_property_value=item_property_value
                )
            )
            serialized_item_level_properties.append(
                {
                    "actors": actors,
                    "property_id": item_property_id,
                    "value": value,
                }
            )

    return serialized_item_level_properties


def _parse_metadata_file(
    metadata_path: Union[Path, bool],
) -> Tuple[List[PropertyClass], List[Dict[str, str]]]:
    if isinstance(metadata_path, Path):
        metadata = parse_metadata(metadata_path)
        metadata_property_classes = parse_property_classes(metadata)
        metadata_item_props = metadata.get("properties", [])
        return metadata_property_classes, metadata_item_props
    return [], []


def _build_metadata_lookups(
    metadata_property_classes: List[PropertyClass],
    metadata_item_props: List[Dict[str, str]],
) -> Tuple[
    Set[Tuple[str, str]],
    Dict[Tuple[str, str], Property],
    Dict[Tuple[int, str], Property],
    Dict[str, Property],
]:
    metadata_classes_lookup = set()
    metadata_cls_prop_lookup = {}
    metadata_cls_id_prop_lookup = {}
    metadata_item_prop_lookup = {}

    for _cls in metadata_property_classes:
        metadata_classes_lookup.add((_cls.name, _cls.type))
        for _prop in _cls.properties or []:
            metadata_cls_prop_lookup[(_cls.name, _prop.name)] = _prop
    for _item_prop in metadata_item_props:
        metadata_item_prop_lookup[_item_prop["name"]] = _item_prop

    return (
        metadata_classes_lookup,
        metadata_cls_prop_lookup,
        metadata_cls_id_prop_lookup,
        metadata_item_prop_lookup,
    )


def _import_properties(
    metadata_path: Union[Path, bool],
    item_properties: List[Dict[str, str]],
    client: "Client",
    annotations: List[dt.Annotation],
    annotation_class_ids_map: Dict[Tuple[str, str], str],
    dataset: "RemoteDataset",
) -> Dict[str, Dict[str, Dict[str, Set[str]]]]:
    """
    Creates/Updates missing/mismatched properties from annotation & metadata.json file to team-properties.
    As the properties are created/updated, the annotation_id_property_map is updated with the new/old property ids.
    ^ This is used in the import-annotations payload later on.

    Args:
        metadata_path (Union[Path, bool]): Path object to .v7/metadata.json file
        client (Client): Darwin Client object
        item_properties (List[Dict[str, str]]): List of item-level properties present in the annotation file
        annotations (List[dt.Annotation]): List of annotations
        annotation_class_ids_map (Dict[Tuple[str, str], str]): Dict of annotation class names/types to annotation class ids
        dataset (RemoteDataset): RemoteDataset object

    Raises:
        ValueError: raise error if annotation class not present in metadata and in team-properties
        ValueError: raise error if annotation-property not present in metadata and in team-properties
        ValueError: raise error if property value is missing for a property that requires a value
        ValueError: raise error if property value/type is different in m_prop (.v7/metadata.json) options

    Returns:
        Dict[str, Dict[str, Dict[str, Set[str]]]]: Dict of annotation.id to frame_index -> property id -> property val ids
    """
    annotation_property_map: Dict[str, Dict[str, Dict[str, Set[str]]]] = {}

    # Parse metadata
    metadata_property_classes, metadata_item_props = _parse_metadata_file(metadata_path)

    # Get team properties
    (
        team_properties_annotation_lookup,
        team_item_properties_lookup,
    ) = _get_team_properties_annotation_lookup(client, dataset.team)

    # Build metadata lookups
    (
        metadata_classes_lookup,
        metadata_cls_prop_lookup,
        metadata_cls_id_prop_lookup,
        metadata_item_prop_lookup,
    ) = _build_metadata_lookups(metadata_property_classes, metadata_item_props)

    # (annotation-id): dt.Annotation object
    annotation_id_map: Dict[str, dt.Annotation] = {}

    annotation_and_section_level_properties_to_create: List[FullProperty] = []
    annotation_and_section_level_properties_to_update: List[FullProperty] = []
    for annotation in annotations:
        annotation_name = annotation.annotation_class.name
        annotation_type = annotation_type = (
            annotation.annotation_class.annotation_internal_type
            or annotation.annotation_class.annotation_type
        )
        annotation_name_type = (annotation_name, annotation_type)
        if annotation_name_type not in annotation_class_ids_map:
            continue
        annotation_class_id = int(annotation_class_ids_map[annotation_name_type])
        if not annotation.id:
            continue
        annotation_id = annotation.id
        if annotation_id not in annotation_property_map:
            annotation_property_map[annotation_id] = defaultdict(
                lambda: defaultdict(set)
            )
        annotation_id_map[annotation_id] = annotation

        # loop on annotation properties and check if they exist in metadata & team
        for a_prop in annotation.properties or []:
            a_prop: SelectedProperty

            # raise error if annotation-property not present in metadata
            if (annotation_name, a_prop.name) not in metadata_cls_prop_lookup:
                # check if they are present in team properties
                if (
                    a_prop.name,
                    annotation_class_id,
                ) in team_properties_annotation_lookup:
                    # get team property
                    t_prop: FullProperty = team_properties_annotation_lookup[
                        (a_prop.name, annotation_class_id)
                    ]
                    if t_prop.type == "text":
                        set_text_property_value(
                            annotation_property_map,
                            annotation_id,
                            a_prop,
                            t_prop,
                        )
                        continue

                    # if property value is None, update annotation_property_map with empty set
                    if a_prop.value is None:
                        assert t_prop.id is not None

                        annotation_property_map[annotation_id][str(a_prop.frame_index)][
                            t_prop.id
                        ] = set()
                        continue

                    # get team property value
                    t_prop_val = None
                    for prop_val in t_prop.property_values or []:
                        if prop_val.value == a_prop.value:
                            t_prop_val = prop_val
                            break

                    # if property value exists in team properties, update annotation_property_map
                    if t_prop_val:
                        assert t_prop.id is not None
                        assert t_prop_val.id is not None
                        annotation_property_map[annotation_id][str(a_prop.frame_index)][
                            t_prop.id
                        ].add(t_prop_val.id)
                        continue

                # TODO: Change this so that if a property isn't found in the metadata, we can create it assuming it's an option, multi-select with no description (DAR-2920)
                raise ValueError(
                    f"Annotation: '{annotation_name}' -> Property '{a_prop.name}' not found in {metadata_path}"
                )

            # get metadata property
            m_prop: Property = metadata_cls_prop_lookup[(annotation_name, a_prop.name)]

            # update metadata-property lookup
            metadata_cls_id_prop_lookup[(annotation_class_id, a_prop.name)] = m_prop

            # get metadata property type
            m_prop_type: PropertyType = m_prop.type

            # get metadata property options
            m_prop_options: List[Dict[str, str]] = m_prop.property_values or []

            # check if property value is missing for a property that requires a value
            if m_prop.required and not a_prop.value:
                raise ValueError(
                    f"Annotation: '{annotation_name}' -> Property '{a_prop.name}' requires a value!"
                )

            # check if property and annotation class exists in team
            if (
                a_prop.name,
                annotation_class_id,
            ) not in team_properties_annotation_lookup:
                # check if fullproperty exists in annotation_and_section_level_properties_to_create
                for full_property in annotation_and_section_level_properties_to_create:
                    if (
                        full_property.name == a_prop.name
                        and full_property.annotation_class_id == annotation_class_id
                    ):
                        # make sure property_values is not None
                        if full_property.property_values is None:
                            full_property.property_values = []

                        property_values = full_property.property_values
                        if a_prop.value is None:
                            # skip creating property if property value is None
                            continue
                        # find property value in m_prop (.v7/metadata.json) options
                        for m_prop_option in m_prop_options:
                            if m_prop_option.get("value") == a_prop.value:
                                # check if property value exists in property_values
                                for prop_val in property_values:
                                    if prop_val.value == a_prop.value:
                                        break
                                else:
                                    # update property_values with new value
                                    full_property.property_values.append(
                                        PropertyValue(
                                            value=m_prop_option.get("value"),  # type: ignore
                                            color=m_prop_option.get("color"),  # type: ignore
                                        )
                                    )
                                break
                        break
                else:
                    property_values = []
                    if a_prop.value is None:
                        # skip creating property if property value is None
                        continue
                    # find property value in m_prop (.v7/metadata.json) options
                    for m_prop_option in m_prop_options:
                        if m_prop_option.get("value") == a_prop.value:
                            property_values.append(
                                PropertyValue(
                                    value=m_prop_option.get("value"),  # type: ignore
                                    color=m_prop_option.get("color"),  # type: ignore
                                )
                            )
                            break
                    # if it doesn't exist, create it
                    for prop in annotation_and_section_level_properties_to_create:
                        if (
                            prop.name == a_prop.name
                            and prop.annotation_class_id == annotation_class_id
                        ):
                            current_prop_values = [
                                value.value for value in prop.property_values
                            ]
                            for value in property_values:
                                if value.value not in current_prop_values:
                                    prop.property_values.append(value)
                            break
                    else:
                        full_property = FullProperty(
                            name=a_prop.name,
                            type=m_prop_type,  # type from .v7/metadata.json
                            required=m_prop.required,  # required from .v7/metadata.json
                            description=m_prop.description
                            or "property-created-during-annotation-import",
                            slug=client.default_team,
                            annotation_class_id=int(annotation_class_id),
                            property_values=property_values,
                            granularity=PropertyGranularity(m_prop.granularity),
                        )
                    # Don't attempt the same propery creation multiple times
                    if (
                        full_property
                        not in annotation_and_section_level_properties_to_create
                    ):
                        annotation_and_section_level_properties_to_create.append(
                            full_property
                        )
                continue

            # check if property value is different in m_prop (.v7/metadata.json) options
            if m_prop.type != "text":
                for m_prop_option in m_prop_options:
                    if m_prop_option.get("value") == a_prop.value:
                        break
                else:
                    if a_prop.value:
                        raise ValueError(
                            f"Annotation: '{annotation_name}' -> Property '{a_prop.value}' not found in .v7/metadata.json, found: {m_prop.property_values}"
                        )

            # get team property
            t_prop: FullProperty = team_properties_annotation_lookup[
                (a_prop.name, annotation_class_id)
            ]

            if a_prop.value is None:
                # if property value is None, update annotation_property_map with empty set
                assert t_prop.id is not None
                annotation_property_map[annotation_id][str(a_prop.frame_index)][
                    t_prop.id
                ] = set()
                continue

            # check if property value is different in t_prop (team) options
            if t_prop.type != "text":
                for t_prop_val in t_prop.property_values or []:
                    if t_prop_val.value == a_prop.value:
                        break
                else:
                    # if it is, update it
                    full_property = FullProperty(
                        id=t_prop.id,
                        name=a_prop.name,
                        type=m_prop_type,
                        required=m_prop.required,
                        description=m_prop.description
                        or "property-updated-during-annotation-import",
                        slug=client.default_team,
                        annotation_class_id=int(annotation_class_id),
                        property_values=[
                            PropertyValue(
                                value=a_prop.value,
                                color=m_prop_option.get("color"),  # type: ignore
                            )
                        ],
                        granularity=t_prop.granularity,
                    )
                    # Don't attempt the same propery update multiple times
                    if (
                        full_property
                        not in annotation_and_section_level_properties_to_update
                    ):
                        annotation_and_section_level_properties_to_update.append(
                            full_property
                        )
                    continue

            assert t_prop.id is not None

            if t_prop.type == "text":
                set_text_property_value(
                    annotation_property_map, annotation_id, a_prop, t_prop
                )
            else:
                assert t_prop_val.id is not None
                annotation_property_map[annotation_id][str(a_prop.frame_index)][
                    t_prop.id
                ].add(t_prop_val.id)

    # Create/Update team item properties based on metadata
    (
        item_properties_to_create_from_metadata,
        item_properties_to_update_from_metadata,
    ) = _create_update_item_properties(
        _normalize_item_properties(metadata_item_prop_lookup),
        team_item_properties_lookup,
        client,
    )

    console = Console(theme=_console_theme())

    properties_to_create = (
        annotation_and_section_level_properties_to_create
        + item_properties_to_create_from_metadata
    )
    properties_to_update = (
        annotation_and_section_level_properties_to_update
        + item_properties_to_update_from_metadata
    )

    created_properties = []
    if properties_to_create:
        console.print(f"Creating {len(properties_to_create)} properties:", style="info")
        for full_property in properties_to_create:
            if full_property.granularity.value == "item":
                console.print(
                    f"- Creating item-level property '{full_property.name}' of type: {full_property.type}"
                )
            else:
                console.print(
                    f"- Creating property '{full_property.name}' of type {full_property.type}",
                )
            prop = client.create_property(
                team_slug=full_property.slug, params=full_property
            )
            created_properties.append(prop)

    updated_properties = []
    if properties_to_update:
        console.print(
            f"Performing {len(properties_to_update)} property update(s):", style="info"
        )
        for full_property in properties_to_update:
            if full_property.granularity.value == "item":
                console.print(
                    f"- Updating item-level property '{full_property.name}' with new value: {full_property.property_values[0].value}",
                )
            else:
                console.print(
                    f"- Updating property '{full_property.name}' of type {full_property.type}",
                )
            prop = client.update_property(
                team_slug=full_property.slug, params=full_property
            )
            updated_properties.append(prop)

    # get latest team properties
    (
        team_properties_annotation_lookup,
        team_item_properties_lookup,
    ) = _get_team_properties_annotation_lookup(client, dataset.team)

    # Update item-level properties from annotations
    _, item_properties_to_update_from_annotations = _create_update_item_properties(
        _normalize_item_properties(item_properties),
        team_item_properties_lookup,
        client,
    )

    if item_properties_to_update_from_annotations:
        console.print(
            f"Performing {len(item_properties_to_update_from_annotations)} property update(s):",
            style="info",
        )
        for full_property in item_properties_to_update_from_annotations:
            if full_property.granularity.value == "item":
                console.print(
                    f"- Updating item-level property '{full_property.name}' with new value: {full_property.property_values[0].value}"
                )
            else:
                console.print(
                    f"- Updating property {full_property.name} ({full_property.type})",
                )
            prop = client.update_property(
                team_slug=full_property.slug, params=full_property
            )
            updated_properties.append(prop)

    # get latest team properties
    (
        team_properties_annotation_lookup,
        team_item_properties_lookup,
    ) = _get_team_properties_annotation_lookup(client, dataset.team)

    # loop over metadata_cls_id_prop_lookup, and update additional metadata property values
    for (annotation_class_id, prop_name), m_prop in metadata_cls_id_prop_lookup.items():
        # does the annotation-property exist in the team? if not, skip
        if (prop_name, annotation_class_id) not in team_properties_annotation_lookup:
            continue

        # get metadata property values
        m_prop_values = {
            m_prop_val["value"]: m_prop_val
            for m_prop_val in m_prop.property_values or []
            if m_prop_val["value"]
        }

        # get team property
        t_prop: FullProperty = team_properties_annotation_lookup[
            (prop_name, annotation_class_id)
        ]

        # get team property values
        t_prop_values = [prop_val.value for prop_val in t_prop.property_values or []]

        # get diff of metadata property values and team property values
        extra_values = set(m_prop_values.keys()) - set(t_prop_values)

        # if there are extra values in metadata, create a new FullProperty with the extra values
        if extra_values:
            extra_property_values = [
                PropertyValue(
                    value=m_prop_values[extra_value].get("value"),  # type: ignore
                    color=m_prop_values[extra_value].get("color"),  # type: ignore
                )
                for extra_value in extra_values
            ]
            full_property = FullProperty(
                id=t_prop.id,
                name=t_prop.name,
                type=t_prop.type,
                required=t_prop.required,
                description=t_prop.description,
                slug=client.default_team,
                annotation_class_id=t_prop.annotation_class_id,
                property_values=extra_property_values,
                granularity=PropertyGranularity(t_prop.granularity.value),
            )
            console.print(
                f"Updating property {full_property.name} ({full_property.type}) with extra metadata values {extra_values}",
                style="info",
            )
            prop = client.update_property(
                team_slug=full_property.slug, params=full_property
            )

    # update annotation_property_map with property ids from created_properties & updated_properties
    for annotation_id, _ in annotation_property_map.items():
        if not annotation_id_map.get(annotation_id):
            continue
        annotation = annotation_id_map[annotation_id]
        annotation_class = annotation.annotation_class
        annotation_class_name = annotation_class.name
        annotation_type = (
            annotation_class.annotation_internal_type
            or annotation_class.annotation_type
        )
        annotation_class_id = annotation_class_ids_map[
            (annotation_class_name, annotation_type)
        ]
        for a_prop in annotation.properties or []:
            frame_index = str(a_prop.frame_index)

            for prop in created_properties + updated_properties:
                if (
                    prop.name == a_prop.name
                    and annotation_class_id == prop.annotation_class_id
                ):
                    if a_prop.value is None:
                        if not annotation_property_map[annotation_id][frame_index][
                            prop.id
                        ]:
                            annotation_property_map[annotation_id][frame_index][
                                prop.id
                            ] = set()
                            break

                    if prop.type == "text":
                        set_text_property_value(
                            annotation_property_map, annotation_id, a_prop, prop
                        )
                    else:
                        for prop_val in prop.property_values or []:
                            if prop_val.value == a_prop.value:
                                annotation_property_map[annotation_id][frame_index][
                                    prop.id
                                ].add(prop_val.id)
                                break
                    break
    _assign_item_properties_to_dataset(
        item_properties, team_item_properties_lookup, client, dataset, console
    )

    return annotation_property_map


def _normalize_item_properties(
    item_properties: Union[Dict[str, Dict[str, Any]], List[Dict[str, str]]],
) -> Dict[str, Dict[str, Any]]:
    """
    Normalizes item properties to a common dictionary format.

    Args:
        item_properties (Union[Dict[str, Dict[str, Any]], List[Dict[str, str]]]): Item properties in different formats.

    Returns:
        Dict[str, Dict[str, Any]]: Normalized item properties.
    """
    if isinstance(item_properties, dict):
        return item_properties

    normalized_properties = defaultdict(lambda: {"property_values": []})
    if item_properties:
        for item_prop in item_properties:
            name = item_prop["name"]
            value = item_prop["value"]
            if value:
                normalized_properties[name]["property_values"].append({"value": value})

    return normalized_properties


def _create_update_item_properties(
    item_properties: Dict[str, Dict[str, Any]],
    team_item_properties_lookup: Dict[str, FullProperty],
    client: "Client",
) -> Tuple[List[FullProperty], List[FullProperty]]:
    """
    Compares item-level properties present in `item_properties` with the team item properties and plans to create or update them.

    Args:
        item_properties (Dict[str, Dict[str, Any]]): Dictionary of item-level properties present in the annotation file
        team_item_properties_lookup (Dict[str, FullProperty]): Lookup of team item properties
        client (Client): Darwin Client object

    Returns:
        Tuple[List[FullProperty], List[FullProperty]]: Tuple of lists of properties to be created and updated
    """
    create_properties = []
    update_properties = []
    for item_prop_name, m_prop in item_properties.items():
        m_prop_values = [
            prop_val["value"] for prop_val in m_prop.get("property_values", [])
        ]

        # If the property exists in the team, check that all values are present
        if item_prop_name in team_item_properties_lookup:
            t_prop = team_item_properties_lookup[item_prop_name]
            # If the property is a text property it won't have predefined values, so continue
            if t_prop.type == "text":
                continue
            t_prop_values = [
                prop_val.value for prop_val in t_prop.property_values or []
            ]

            # Add one update per missing property value
            for m_prop_value in m_prop_values:
                if m_prop_value not in t_prop_values:
                    update_property = FullProperty(
                        id=t_prop.id,
                        name=t_prop.name,
                        type=t_prop.type,
                        required=t_prop.required,
                        description=t_prop.description,
                        slug=client.default_team,
                        annotation_class_id=t_prop.annotation_class_id,
                        property_values=[PropertyValue(value=m_prop_value)],
                        granularity=PropertyGranularity.item,
                    )
                    update_properties.append(update_property)

        # If the property does not exist in the team, create it
        else:
            # If we've already planned to create this property, simply extend the property values
            for prop in create_properties:
                if prop.name == item_prop_name:
                    current_prop_values = [
                        value.value for value in prop.property_values
                    ]
                    if prop.property_values is None:
                        prop.property_values = []
                    for val in m_prop_values:
                        if val.value not in current_prop_values:
                            prop.property_values.append(PropertyValue(value=val))
                    break
            else:
                full_property = FullProperty(
                    name=item_prop_name,
                    type=m_prop.get("type", "multi_select"),
                    required=bool(m_prop.get("required", False)),
                    description=m_prop.get(
                        "description", "property-created-during-annotation-import"
                    ),
                    slug=client.default_team,
                    annotation_class_id=None,
                    property_values=[PropertyValue(value=val) for val in m_prop_values],
                    granularity=PropertyGranularity.item,
                )
                create_properties.append(full_property)

    return create_properties, update_properties


def _assign_item_properties_to_dataset(
    item_properties: List[Dict[str, str]],
    team_item_properties_lookup: Dict[str, FullProperty],
    client: "Client",
    dataset: "RemoteDataset",
    console: Console,
) -> None:
    """
    Ensures that all item-level properties to be imported are assigned to the target dataset

    Args:
        item_properties (List[Dict[str, str]]): List of item-level properties present in the annotation file
        team_item_properties_lookup (Dict[str, FullProperty]): Server- side state of item-level properties
        client (Client): Darwin Client object
        dataset (RemoteDataset): RemoteDataset object
        console (Console): Rich Console
    """
    if item_properties:
        item_properties_set = {prop["name"] for prop in item_properties}
        for item_property in item_properties_set:
            for team_prop in team_item_properties_lookup:
                if team_prop == item_property:
                    prop_datasets = (
                        team_item_properties_lookup[team_prop].dataset_ids or []
                    )
                    if dataset.dataset_id not in prop_datasets:
                        updated_property = team_item_properties_lookup[team_prop]
                        updated_property.dataset_ids.append(dataset.dataset_id)
                        updated_property.property_values = (
                            []
                        )  # Necessary to clear, otherwise we're trying to add the exsting values to themselves
                        console.print(
                            f"Adding item-level property '{updated_property.name}' to the dataset '{dataset.name}' ",
                            style="info",
                        )
                        client.update_property(dataset.team, updated_property)


[docs] def import_annotations( # noqa: C901 dataset: "RemoteDataset", importer: Callable[[Path], Union[List[dt.AnnotationFile], dt.AnnotationFile, None]], file_paths: List[PathLike], append: bool, class_prompt: bool = True, delete_for_empty: bool = False, import_annotators: bool = False, import_reviewers: bool = False, overwrite: bool = False, use_multi_cpu: bool = False, cpu_limit: Optional[int] = None, ) -> None: """ Imports the given given Annotations into the given Dataset. Parameters ---------- dataset : RemoteDataset Dataset where the Annotations will be imported to. importer : Callable[[Path], Union[List[dt.AnnotationFile], dt.AnnotationFile, None]] Parsing module containing the logic to parse the given Annotation files given in ``files_path``. See ``importer/format`` for a list of out of supported parsers. file_paths : List[PathLike] A list of ``Path``'s or strings containing the Annotations we wish to import. append : bool If ``True`` appends the given annotations to the datasets. If ``False`` will override them. Incompatible with ``delete-for-empty``. class_prompt : bool If ``False`` classes will be created and added to the datasets without requiring a user's prompt. delete_for_empty : bool, default: False If ``True`` will use empty annotation files to delete all annotations from the remote file. If ``False``, empty annotation files will simply be skipped. Only works for V2 datasets. Incompatible with ``append``. import_annotators : bool, default: False If ``True`` it will import the annotators from the files to the dataset, if available. If ``False`` it will not import the annotators. import_reviewers : bool, default: False If ``True`` it will import the reviewers from the files to the dataset, if . If ``False`` it will not import the reviewers. overwrite : bool, default: False If ``True`` it will bypass a warning that the import will overwrite the current annotations if any are present. If ``False`` this warning will be skipped and the import will overwrite the current annotations without warning. use_multi_cpu : bool, default: True If ``True`` will use multiple available CPU cores to parse the annotation files. If ``False`` will use only the current Python process, which runs in one core. Processing using multiple cores is faster, but may slow down a machine also running other processes. Processing with one core is slower, but will run well alongside other processes. cpu_limit : int, default: 2 less than total cpu count The maximum number of CPU cores to use when ``use_multi_cpu`` is ``True``. If ``cpu_limit`` is greater than the number of available CPU cores, it will be set to the number of available cores. If ``cpu_limit`` is less than 1, it will be set to CPU count - 2. If ``cpu_limit`` is omitted, it will be set to CPU count - 2. Raises ------- ValueError - If ``file_paths`` is not a list. - If the application is unable to fetch any remote classes. - If the application was unable to find/parse any annotation files. - If the application was unable to fetch remote file list. IncompatibleOptions - If both ``append`` and ``delete_for_empty`` are specified as ``True``. """ console = Console(theme=_console_theme()) if append and delete_for_empty: raise IncompatibleOptions( "The options 'append' and 'delete_for_empty' cannot be used together. Use only one of them." ) cpu_limit, use_multi_cpu = _get_multi_cpu_settings( cpu_limit, cpu_count(), use_multi_cpu ) if use_multi_cpu: console.print(f"Using {cpu_limit} CPUs for parsing...", style="info") else: console.print("Using 1 CPU for parsing...", style="info") if not isinstance(file_paths, list): raise ValueError( f"file_paths must be a list of 'Path' or 'str'. Current value: {file_paths}" ) console.print("Fetching remote class list...", style="info") team_classes: List[dt.DictFreeForm] = dataset.fetch_remote_classes(True) if not team_classes: raise ValueError("Unable to fetch remote class list.") classes_in_dataset: dt.DictFreeForm = _build_main_annotations_lookup_table( [ cls for cls in team_classes if cls["available"] or cls["name"] in GLOBAL_CLASSES ] ) classes_in_team: dt.DictFreeForm = _build_main_annotations_lookup_table( [ cls for cls in team_classes if not cls["available"] and cls["name"] not in GLOBAL_CLASSES ] ) attributes = _build_attribute_lookup(dataset) console.print("Retrieving local annotations ...", style="info") local_files = [] local_files_missing_remotely = [] remote_files_targeted_by_import = _get_remote_files_targeted_by_import( importer, file_paths, dataset, console, use_multi_cpu, cpu_limit ) ( legacy_remote_file_slot_affine_maps, pixdims_and_primary_planes, ) = _get_remote_medical_file_transform_requirements( remote_files_targeted_by_import, console ) if importer.__module__ == "darwin.importer.formats.nifti": maybe_parsed_files: Optional[Iterable[dt.AnnotationFile]] = _find_and_parse( importer, file_paths, console, use_multi_cpu, cpu_limit, legacy_remote_file_slot_affine_maps, pixdims_and_primary_planes, ) else: maybe_parsed_files: Optional[Iterable[dt.AnnotationFile]] = _find_and_parse( importer, file_paths, console, use_multi_cpu, cpu_limit, ) if not maybe_parsed_files: raise ValueError("Not able to parse any files.") parsed_files: List[AnnotationFile] = flatten_list(list(maybe_parsed_files)) filenames: List[str] = [ parsed_file.filename for parsed_file in parsed_files if parsed_file is not None ] console.print("Fetching remote file list...", style="info") # This call will only filter by filename; so can return a superset of matched files across different paths # There is logic in this function to then include paths to narrow down to the single correct matching file remote_files: Dict[str, Dict[str, Any]] = {} # Try to fetch files in large chunks; in case the filenames are too large and exceed the url size # retry in smaller chunks chunk_size = 100 while chunk_size > 0: try: remote_files = _get_remote_files_ready_for_import( dataset, filenames, chunk_size, ) break except RequestEntitySizeExceeded: chunk_size -= 8 if chunk_size <= 0: raise ValueError("Unable to fetch remote file list.") for parsed_file in parsed_files: if parsed_file.full_path not in remote_files: local_files_missing_remotely.append(parsed_file) else: local_files.append(parsed_file) annotation_format = _get_annotation_format(importer) local_files, slot_errors, slot_warnings = _verify_slot_annotation_alignment( local_files, remote_files ) _display_slot_warnings_and_errors( slot_errors, slot_warnings, annotation_format, console ) if annotation_format == "darwin": dataset.client.load_feature_flags() # Check if the flag exists. When the flag is deprecated in the future we will always perform this check static_instance_id_feature_flag_exists = any( feature.name == "STATIC_INSTANCE_ID" for feature in dataset.client.features.get(dataset.team, []) ) check_for_multi_instance_id_annotations = ( static_instance_id_feature_flag_exists and dataset.client.feature_enabled("STATIC_INSTANCE_ID") ) or not static_instance_id_feature_flag_exists if check_for_multi_instance_id_annotations: _warn_for_annotations_with_multiple_instance_ids(local_files, console) console.print( f"{len(local_files) + len(local_files_missing_remotely)} annotation file(s) found.", style="info", ) if local_files_missing_remotely: console.print( f"{len(local_files_missing_remotely)} file(s) are missing from the dataset", style="warning", ) for local_file in local_files_missing_remotely: console.print( f"\t{local_file.path}: '{local_file.full_path}'", style="warning" ) if class_prompt and not secure_continue_request(): return ( local_classes_not_in_dataset, local_classes_not_in_team, ) = _resolve_annotation_classes( [ annotation_class for file in local_files for annotation_class in file.annotation_classes ], classes_in_dataset, classes_in_team, ) console.print( f"{len(local_classes_not_in_team)} classes needs to be created.", style="info" ) console.print( f"{len(local_classes_not_in_dataset)} classes needs to be added to {dataset.identifier}", style="info", ) missing_skeletons: List[dt.AnnotationClass] = list( filter(_is_skeleton_class, local_classes_not_in_team) ) missing_skeleton_names: str = ", ".join(map(_get_skeleton_name, missing_skeletons)) if missing_skeletons: console.print( f"Found missing skeleton classes: {missing_skeleton_names}. Missing Skeleton classes cannot be created. Exiting now.", style="error", ) return if local_classes_not_in_team: console.print("About to create the following classes", style="info") for missing_class in local_classes_not_in_team: console.print( f"\t{missing_class.name}, type: {missing_class.annotation_internal_type or missing_class.annotation_type}", style="info", ) if class_prompt and not secure_continue_request(): return for missing_class in local_classes_not_in_team: dataset.create_annotation_class( missing_class.name, missing_class.annotation_internal_type or missing_class.annotation_type, ) if local_classes_not_in_dataset: console.print( f"About to add the following classes to {dataset.identifier}", style="info" ) for cls in local_classes_not_in_dataset: dataset.add_annotation_class(cls) # Refetch classes to update mappings if local_classes_not_in_team or local_classes_not_in_dataset: maybe_remote_classes: List[dt.DictFreeForm] = dataset.fetch_remote_classes() if not maybe_remote_classes: raise ValueError("Unable to fetch remote classes.") remote_classes = _build_main_annotations_lookup_table(maybe_remote_classes) else: remote_classes = _build_main_annotations_lookup_table(team_classes) if delete_for_empty: console.print( "Importing annotations...\nEmpty annotation file(s) will clear all existing annotations in matching remote files.", style="info", ) else: console.print( "Importing annotations...\nEmpty annotations will be skipped, if you want to delete annotations rerun with '--delete-for-empty'.", style="info", ) if not append and not overwrite: continue_to_overwrite = _overwrite_warning( dataset.client, dataset, local_files, remote_files, console ) if not continue_to_overwrite: return def import_annotation(parsed_file): image_id = remote_files[parsed_file.full_path]["item_id"] default_slot_name = remote_files[parsed_file.full_path]["slot_names"][0] if parsed_file.slots and parsed_file.slots[0].name: default_slot_name = parsed_file.slots[0].name metadata_path = is_properties_enabled(parsed_file.path) errors, _ = _import_annotations( dataset.client, image_id, remote_classes, attributes, parsed_file.annotations, parsed_file.item_properties, default_slot_name, dataset, append, delete_for_empty, import_annotators, import_reviewers, metadata_path, ) if errors: console.print(f"Errors importing {parsed_file.filename}", style="error") for error in errors: console.print(f"\t{error}", style="error") def process_local_file(local_file): if local_file is None: parsed_files = [] elif not isinstance(local_file, List): parsed_files = [local_file] else: parsed_files = local_file # Remove files missing on the server missing_files = [ missing_file.full_path for missing_file in local_files_missing_remotely ] parsed_files = [ parsed_file for parsed_file in parsed_files if parsed_file.full_path not in missing_files ] files_to_not_track = [ file_to_track for file_to_track in parsed_files if not (file_to_track.annotations or file_to_track.item_properties) and (not delete_for_empty) ] for file in files_to_not_track: console.print( f"{file.filename} has no annotations. Skipping upload...", style="warning", ) files_to_track = [ file for file in parsed_files if file not in files_to_not_track ] if files_to_track: _warn_unsupported_annotations(files_to_track) if use_multi_cpu: with concurrent.futures.ThreadPoolExecutor( max_workers=cpu_limit ) as executor: futures = [ executor.submit(import_annotation, file) for file in files_to_track ] for _ in tqdm( concurrent.futures.as_completed(futures), total=len(futures), desc="Importing annotations from local file", ): future = next(concurrent.futures.as_completed(futures)) try: future.result() except Exception as exc: console.print( f"Generated an exception: {exc}", style="error" ) else: for file in tqdm( files_to_track, desc="Importing annotations from local file" ): import_annotation(file) if use_multi_cpu: with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_limit) as executor: futures = [ executor.submit(process_local_file, local_file) for local_file in local_files ] for _ in tqdm( concurrent.futures.as_completed(futures), total=len(futures), desc="Processing local annotation files", ): future = next(concurrent.futures.as_completed(futures)) try: future.result() except Exception as exc: console.print(f"Generated an exception: {exc}", style="error") else: for local_file in tqdm(local_files, desc="Processing local annotation files"): process_local_file(local_file)
def _get_multi_cpu_settings( cpu_limit: Optional[int], cpu_count: int, use_multi_cpu: bool ) -> Tuple[int, bool]: if cpu_limit == 1 or cpu_count == 1 or not use_multi_cpu: return 1, False if cpu_limit is None: return max([cpu_count - 2, 2]), True return cpu_limit if cpu_limit <= cpu_count else cpu_count, True def _warn_unsupported_annotations(parsed_files: List[AnnotationFile]) -> None: console = Console(theme=_console_theme()) for parsed_file in parsed_files: skipped_annotations = [] for annotation in parsed_file.annotations: if annotation.annotation_class.annotation_type in UNSUPPORTED_CLASSES: skipped_annotations.append(annotation) if len(skipped_annotations) > 0: types = { c.annotation_class.annotation_type for c in skipped_annotations } # noqa: C417 console.print( f"Import of annotation class types '{', '.join(types)}' is not yet supported. Skipping {len(skipped_annotations)} " + "annotations from '{parsed_file.full_path}'.\n", style="warning", ) def _is_skeleton_class(the_class: dt.AnnotationClass) -> bool: return ( the_class.annotation_internal_type or the_class.annotation_type ) == "skeleton" def _get_skeleton_name(skeleton: dt.AnnotationClass) -> str: return skeleton.name def _handle_subs( annotation: dt.Annotation, data: dt.DictFreeForm, annotation_class_id: str, attributes: Dict[str, dt.UnknownType], include_empty_attributes: Optional[bool] = False, ) -> dt.DictFreeForm: for sub in annotation.subs: if sub.annotation_type == "text": data["text"] = {"text": sub.data} elif sub.annotation_type == "attributes": attributes_with_key = [] for attr in sub.data: if ( annotation_class_id in attributes and attr in attributes[annotation_class_id] ): attributes_with_key.append(attributes[annotation_class_id][attr]) else: print( f"The attribute '{attr}' for class '{annotation.annotation_class.name}' was not imported." ) data["attributes"] = {"attributes": attributes_with_key} elif sub.annotation_type == "instance_id": data["instance_id"] = {"value": sub.data} else: data[sub.annotation_type] = sub.data if not data.get("attributes") and include_empty_attributes: data["attributes"] = {"attributes": []} return data def _format_polygon_for_import( annotation: dt.Annotation, data: dt.DictFreeForm ) -> dt.DictFreeForm: if "polygon" in data: if len(annotation.data["paths"]) > 1: data["polygon"] = { "path": annotation.data["paths"][0], "additional_paths": annotation.data["paths"][1:], } elif len(annotation.data["paths"]) == 1: data["polygon"] = {"path": annotation.data["paths"][0]} return data def _annotators_or_reviewers_to_payload( actors: List[dt.AnnotationAuthor], role: dt.AnnotationAuthorRole ) -> List[dt.DictFreeForm]: return [{"email": actor.email, "role": role.value} for actor in actors] def _handle_reviewers( import_reviewers: bool, annotation: Optional[dt.Annotation] = None, item_property_value: Optional[Dict[str, Any]] = None, ) -> List[dt.DictFreeForm]: if import_reviewers: if annotation and annotation.reviewers: return _annotators_or_reviewers_to_payload( annotation.reviewers, dt.AnnotationAuthorRole.REVIEWER ) elif item_property_value and "reviewers" in item_property_value: return _annotators_or_reviewers_to_payload( _parse_annotators(item_property_value["reviewers"]), dt.AnnotationAuthorRole.REVIEWER, ) return [] def _handle_annotators( import_annotators: bool, annotation: Optional[dt.Annotation] = None, item_property_value: Optional[Dict[str, Any]] = None, ) -> List[dt.DictFreeForm]: if import_annotators: if annotation and annotation.annotators: return _annotators_or_reviewers_to_payload( annotation.annotators, dt.AnnotationAuthorRole.ANNOTATOR ) elif item_property_value and "annotators" in item_property_value: return _annotators_or_reviewers_to_payload( _parse_annotators(item_property_value["annotators"]), dt.AnnotationAuthorRole.ANNOTATOR, ) return [] def _handle_video_annotation_subs(annotation: dt.VideoAnnotation): """ Remove duplicate sub-annotations from the VideoAnnotation.annotation(s) to be imported. """ last_subs = None for frame_index, _annotation in annotation.frames.items(): _annotation: dt.Annotation subs = [] for sub in _annotation.subs: if ( last_subs is not None and all( any( last_sub.annotation_type == sub.annotation_type and last_sub.data == sub.data for last_sub in last_subs ) for sub in _annotation.subs ) and not annotation.keyframes[frame_index] ): # drop sub-annotation whenever we know it didn't change since last one # which likely wouldn't create on backend side sub-annotation keyframe. # this is a workaround for the backend not handling duplicate sub-annotations. continue subs.append(sub) last_subs = _annotation.subs _annotation.subs = subs def _get_annotation_data( annotation: dt.AnnotationLike, annotation_class_id: str, attributes: dt.DictFreeForm ) -> dt.DictFreeForm: annotation_class = annotation.annotation_class if isinstance(annotation, dt.VideoAnnotation): _handle_video_annotation_subs(annotation) data = annotation.get_data( only_keyframes=True, post_processing=lambda annotation, data: _handle_subs( annotation, _format_polygon_for_import(annotation, data), annotation_class_id, attributes, include_empty_attributes=True, ), ) else: data = {annotation_class.annotation_type: annotation.data} data = _format_polygon_for_import(annotation, data) data = _handle_subs(annotation, data, annotation_class_id, attributes) return data def _handle_slot_names( annotation: dt.Annotation, dataset_version: int, default_slot_name: str ) -> dt.Annotation: if not annotation.slot_names and dataset_version > 1: annotation.slot_names.extend([default_slot_name]) return annotation def _get_overwrite_value(append: bool) -> str: return "false" if append else "true" def _parse_empty_masks( annotation: dt.Annotation, raster_layer: dt.Annotation, raster_layer_dense_rle_ids: Optional[Set[str]] = None, raster_layer_dense_rle_ids_frames: Optional[Dict[int, Set[str]]] = None, ): """ Check if the mask is empty (i.e. masks that do not have a corresponding raster layer) if so, skip import of the mask. This function is used for both dt.Annotation and dt.VideoAnnotation objects. Args: annotation (dt.Annotation or dt.VideoAnnotation): annotation object to be imported raster_layer (dt.Annotation or dt.VideoAnnotation): raster layer object to be imported raster_layer_dense_rle_ids (Optional[Set[str]], optional): raster-layer dense_rle_ids. Defaults to None. raster_layer_dense_rle_ids_frames (Optional[Dict[int, Set[str]]], optional): raster-layer dense_rle_ids for each frame. Defaults to None. Returns: tuple[Optional[Set[str]], Optional[Dict[int, Set[str]]]]: raster_layer_dense_rle_ids, raster_layer_dense_rle_ids_frames """ # For dt.VideoAnnotation, create dense_rle ids for each frame. if raster_layer_dense_rle_ids_frames is None and isinstance( annotation, dt.VideoAnnotation ): assert isinstance(raster_layer, dt.VideoAnnotation) # build a dict of frame_index: set of dense_rle_ids (for each frame in VideoAnnotation object) raster_layer_dense_rle_ids_frames = {} for frame_index, _rl in raster_layer.frames.items(): raster_layer_dense_rle_ids_frames[frame_index] = set( _rl.data["dense_rle"][::2] ) # check every frame # - if the 'annotation_class_id' is in raster_layer's mask_annotation_ids_mapping dict # - if the 'dense_rle_id' is in raster_layer's dense_rle_ids_frames dict # if not, skip import of the mask, and remove it from mask_annotation_ids_mapping for frame_index, _annotation in annotation.frames.items(): _annotation_id = _annotation.id if ( frame_index in raster_layer_dense_rle_ids_frames and raster_layer.frames[frame_index].data[ "mask_annotation_ids_mapping" ][_annotation_id] not in raster_layer_dense_rle_ids_frames[frame_index] ): # skip import of the mask, and remove it from mask_annotation_ids_mapping logger.warning( f"Skipping import of mask annotation '{_annotation.annotation_class.name}' as it does not have a corresponding raster layer" ) del raster_layer.frames[frame_index]["mask_annotation_ids_mapping"][ _annotation_id ] return raster_layer_dense_rle_ids, raster_layer_dense_rle_ids_frames # For dt.Annotation, create dense_rle ids. elif raster_layer_dense_rle_ids is None and isinstance(annotation, dt.Annotation): assert isinstance(raster_layer, dt.Annotation) # build a set of dense_rle_ids (for the Annotation object) raster_layer_dense_rle_ids = set(raster_layer.data["dense_rle"][::2]) # check the annotation (i.e. mask) # - if the 'annotation_class_id' is in raster_layer's mask_annotation_ids_mapping dict # - if the 'dense_rle_id' is in raster_layer's dense_rle_ids dict # if not, skip import of the mask, and remove it from mask_annotation_ids_mapping _annotation_id = annotation.id if ( raster_layer.data["mask_annotation_ids_mapping"][_annotation_id] not in raster_layer_dense_rle_ids ): # skip import of the mask, and remove it from mask_annotation_ids_mapping logger.warning( f"Skipping import of mask annotation '{annotation.annotation_class.name}' as it does not have a corresponding raster layer" ) del raster_layer.data["mask_annotation_ids_mapping"][_annotation_id] return raster_layer_dense_rle_ids, raster_layer_dense_rle_ids_frames return raster_layer_dense_rle_ids, raster_layer_dense_rle_ids_frames def _import_annotations( client: "Client", # TODO: This is unused, should it be? id: Union[str, int], remote_classes: dt.DictFreeForm, attributes: dt.DictFreeForm, annotations: List[dt.Annotation], item_properties: List[Dict[str, str]], default_slot_name: str, dataset: "RemoteDataset", append: bool, delete_for_empty: bool, # TODO: This is unused, should it be? import_annotators: bool, import_reviewers: bool, metadata_path: Union[Path, bool] = False, ) -> Tuple[dt.ErrorList, dt.Success]: errors: dt.ErrorList = [] success: dt.Success = dt.Success.SUCCESS raster_layer: Optional[dt.Annotation] = None raster_layer_dense_rle_ids: Optional[Set[str]] = None raster_layer_dense_rle_ids_frames: Optional[Dict[int, Set[str]]] = None serialized_annotations = [] annotation_class_ids_map: Dict[Tuple[str, str], str] = {} for annotation in annotations: annotation_class = annotation.annotation_class annotation_type = ( annotation_class.annotation_internal_type or annotation_class.annotation_type ) if ( ( annotation_type not in remote_classes or annotation_class.name not in remote_classes[annotation_type] ) and annotation_type != "raster_layer" # We do not skip raster layers as they are always available. ): if annotation_type not in remote_classes: logger.warning( f"Annotation type '{annotation_type}' is not in the remote classes, skipping import of annotation '{annotation_class.name}'" ) else: logger.warning( f"Annotation '{annotation_class.name}' is not in the remote classes, skipping import" ) continue annotation_class_id: str = remote_classes[annotation_type][ annotation_class.name ] data = _get_annotation_data(annotation, annotation_class_id, attributes) # check if the mask is empty (i.e. masks that do not have a corresponding raster layer) if so, skip import of the mask if annotation_type == "mask": if raster_layer is None: raster_layer = next( ( a for a in annotations if a.annotation_class.annotation_type == "raster_layer" ), None, ) if raster_layer: ( raster_layer_dense_rle_ids, raster_layer_dense_rle_ids_frames, ) = _parse_empty_masks( annotation, raster_layer, raster_layer_dense_rle_ids, raster_layer_dense_rle_ids_frames, ) actors: List[dt.DictFreeForm] = [] actors.extend(_handle_annotators(import_annotators, annotation=annotation)) actors.extend(_handle_reviewers(import_reviewers, annotation=annotation)) # Insert the default slot name if not available in the import source annotation = _handle_slot_names(annotation, dataset.version, default_slot_name) annotation_class_ids_map[(annotation_class.name, annotation_type)] = ( annotation_class_id ) serial_obj = { "annotation_class_id": annotation_class_id, "data": data, "context_keys": {"slot_names": annotation.slot_names}, } annotation.id = annotation.id or str(uuid.uuid4()) serial_obj["id"] = annotation.id if actors: serial_obj["actors"] = actors # type: ignore serialized_annotations.append(serial_obj) annotation_id_property_map = _import_properties( metadata_path, item_properties, client, annotations, # type: ignore annotation_class_ids_map, dataset, ) _update_payload_with_properties(serialized_annotations, annotation_id_property_map) serialized_item_level_properties = _serialize_item_level_properties( item_properties, client, dataset, import_annotators, import_reviewers ) payload: dt.DictFreeForm = {"annotations": serialized_annotations} if serialized_item_level_properties: payload["properties"] = serialized_item_level_properties payload["overwrite"] = _get_overwrite_value(append) try: dataset.import_annotation(id, payload=payload) except RequestEntitySizeExceeded: logger.warning( "Annotation payload exceeds request entity size. Splitting payload into smaller chunks for import." ) payloads = _split_payloads(payload) for chunked_payload in payloads: try: dataset.import_annotation(id, payload=chunked_payload) except Exception as e: errors.append(e) success = dt.Success.FAILURE except Exception as e: errors.append(e) success = dt.Success.FAILURE return errors, success # mypy: ignore-errors def _console_theme() -> Theme: return Theme( { "success": "bold green", "warning": "bold yellow", "error": "bold red", "info": "bold deep_sky_blue1", } ) def _overwrite_warning( client: "Client", dataset: "RemoteDataset", local_files: List[dt.AnnotationFile], remote_files: Dict[str, Dict[str, Any]], console: Console, ) -> bool: """ Determines if any dataset items targeted for import already have annotations or item-level properties that will be overwritten. If they do, a warning is displayed to the user and they are prompted to confirm if they want to proceed with the import. Parameters ---------- client : Client The Darwin Client object. dataset : RemoteDataset The dataset where the annotations will be imported. files : List[dt.AnnotationFile] The list of local annotation files to will be imported. remote_files : Dict[str, Tuple[str, str]] A dictionary of the remote files in the dataset. console : Console The console object. Returns ------- bool True if the user wants to proceed with the import, False otherwise. """ files_with_annotations_to_overwrite = [] files_with_item_properties_to_overwrite = [] for local_file in local_files: item_id = remote_files.get(local_file.full_path)["item_id"] # type: ignore # Check if the item has annotations that will be overwritten remote_annotations = client.api_v2._get_remote_annotations( item_id, dataset.team, ) if ( remote_annotations and local_file.full_path not in files_with_annotations_to_overwrite ): files_with_annotations_to_overwrite.append(local_file.full_path) # Check if the item has item-level properties that will be overwritten if local_file.item_properties: response: Dict[str, List[Dict[str, str]]] = ( client.api_v2._get_properties_state_for_item(item_id, dataset.team) ) item_property_ids_with_populated_values = [ property_data["id"] for property_data in response["properties"] if property_data["values"] ] if item_property_ids_with_populated_values: files_with_item_properties_to_overwrite.append(local_file.full_path) if files_with_annotations_to_overwrite or files_with_item_properties_to_overwrite: # Overwriting of annotations if files_with_annotations_to_overwrite: console.print( f"The following {len(files_with_annotations_to_overwrite)} dataset item(s) have annotations that will be overwritten by this import:", style="warning", ) for file in files_with_annotations_to_overwrite: console.print(f"- {file}", style="warning") # Overwriting of item-level-properties if files_with_item_properties_to_overwrite: console.print( f"The following {len(files_with_item_properties_to_overwrite)} dataset item(s) have item-level properties that will be overwritten by this import:", style="warning", ) for file in files_with_item_properties_to_overwrite: console.print(f"- {file}", style="warning") proceed = input("Do you want to proceed with the import? [y/N] ") if proceed.lower() != "y": return False return True def _get_annotation_format( importer: Callable[[Path], Union[List[dt.AnnotationFile], dt.AnnotationFile, None]], ) -> str: """ Returns the annotation format of the importer used to parse local annotation files Parameters ---------- importer : Callable[[Path], Union[List[dt.AnnotationFile], dt.AnnotationFile, None]] The importer used to parse local annotation files Returns ------- annotation_format : str The annotation format of the importer used to parse local files """ return importer.__module__.split(".")[3] def _verify_slot_annotation_alignment( local_files: List[dt.AnnotationFile], remote_files: Dict[str, Dict[str, Any]], ) -> Tuple[List[dt.AnnotationFile], Dict[str, List[str]], Dict[str, List[str]]]: """ Runs slot alignment validation against annotations being imported. The following checks are run: - For multi-slotted items: - For every annotation not uploaded to a specific slot: A non-blocking warning is generated explaining that it will be uploaded to the default slot - For multi-channeled items: - For every annotation not uploaded to a specific slot: A non-blocking warning is generated explaining that it will be uploaded to the base slot - For every annotation uploaded to a slot other than the base slot: A blocking error is generated explaining that annotations can only be uploaded to the base slot of multi-channeled items Files that generate exclusively non-blocking warnings will have those warnings displayed, but their import will continue. Files that generate any blocking error will only have their blocking errors displayed, and they are removed from the list of files to be imported. Errors can only be generated by referring to multi-slotted or multi-channeled items. These concepts are only supported by Darwin JSON 2.0, so stop imports of other formats if any warnings occur. Parameters ---------- local_files : List[dt.AnnotationFile] A list of local annotation files to be uploaded remote_files : Dict[str, Dict[str, Any]] Information about each remote dataset item that corresponds to the local annotation file being uploaded Returns ------- local_files : List[dt.AnnotationFile] A pruned list of the input annotation flies. It excludes any input files that generated a blocking warning slot_errors : Dict[str, List[str]] A dictionary of blocking errors for each file slot_warnings : Dict[str, List[str]] A dictionary of non-blocking warnings for each file """ slot_errors, slot_warnings = {}, {} for local_file in local_files: remote_file = remote_files[local_file.full_path] local_file_path = str(local_file.path) if len(remote_file["slot_names"]) == 1: continue # Skip single-slotted items base_slot = remote_file["slot_names"][0] layout_version = remote_file["layout"]["version"] if layout_version == 1 or layout_version == 2: # Multi-slotted item for annotation in local_file.annotations: try: annotation_slot = annotation.slot_names[0] except IndexError: if local_file_path not in slot_warnings: slot_warnings[local_file_path] = [] slot_warnings[local_file_path].append( f"Annotation imported to multi-slotted item not assigned slot. Uploading to the default slot: {base_slot}" ) elif layout_version == 3: # Channeled item for annotation in local_file.annotations: try: annotation_slot = annotation.slot_names[0] except IndexError: if local_file_path not in slot_warnings: slot_warnings[local_file_path] = [] slot_warnings[local_file_path].append( f"Annotation imported to multi-channeled item not assigned a slot. Uploading to the base slot: {base_slot}" ) annotation.slot_names = [base_slot] continue if annotation_slot != base_slot: if local_file_path not in slot_errors: slot_errors[local_file_path] = [] slot_errors[local_file_path].append( f"Annotation is linked to slot {annotation_slot} of the multi-channeled item {local_file.full_path}. Annotations uploaded to multi-channeled items have to be uploaded to the base slot, which for this item is {base_slot}." ) else: raise Exception(f"Unknown layout version: {layout_version}") # Remove non-blocking warnings if there are corresponding blocking warnings for key in slot_errors.keys(): if key in slot_warnings: del slot_warnings[key] local_files = [ local_file for local_file in local_files if str(local_file.path) not in slot_errors ] return local_files, slot_errors, slot_warnings def _display_slot_warnings_and_errors( slot_errors: Dict[str, List[str]], slot_warnings: Dict[str, List[str]], annotation_format: str, console: Console, ) -> None: """ Displays slot warnings and errors. Parameters ---------- local_files : List[dt.AnnotationFile] A list of local annotation files to be uploaded slot_errors : Dict[str, List[str]] A dictionary of blocking warnings for each file slot_warnings : Dict[str, List[str]] A dictionary of non-blocking warnings for each file annotation_format : str The annotation format of the importer used to parse local files console : Console The console object Raises ------ TypeError If there are any warnings generated and the annotation format is not Darwin JSON 2.0 or NifTI """ # Warnings can only be generated by referring to slots, which is only supported by the Darwin JSON & NiFTI formats # Therefore, stop imports of all other formats if there are any warnings supported_formats = ["darwin", "nifti"] if (slot_errors or slot_warnings) and annotation_format not in supported_formats: raise TypeError( "You are attempting to import annotations to multi-slotted or multi-channeled items using an annotation format that doesn't support them. To import annotations to multi-slotted or multi-channeled items, please use the Darwin JSON 2.0 format: https://docs.v7labs.com/reference/darwin-json" ) if slot_warnings: console.print( f"WARNING: {len(slot_warnings)} file(s) have the following non-blocking warnings. Imports of these files will continue:", style="warning", ) for file in slot_warnings: console.print(f"- File: {file}, warnings:", style="info") for warning in slot_warnings[file]: console.print(f" - {warning}") if slot_errors: console.print( f"WARNING: {len(slot_errors)} file(s) have the following blocking issues and will not be imported. Please resolve these issues and re-import them.", style="warning", ) for file in slot_errors: console.print(f"- File: {file}, issues:", style="info") for warning in slot_errors[file]: console.print(f" - {warning}") def _warn_for_annotations_with_multiple_instance_ids( local_files: List[dt.AnnotationFile], console: Console ) -> None: """ Warns the user if any video annotations have multiple unique instance IDs. This function checks each video annotation in the provided list of local annotation files for multiple instance ID values. A warning is printed to the console for each instance of this occurrence. Parameters ---------- local_files : List[dt.AnnotationFile] A list of local annotation files to be checked. console : Console The console object used to print warnings and messages. """ files_with_multi_instance_id_annotations = {} files_with_video_annotations = [ local_file for local_file in local_files if local_file.is_video ] for file in files_with_video_annotations: for annotation in file.annotations: unique_instance_ids = [] for frame_idx in annotation.frames: # type: ignore for subannotation in annotation.frames[frame_idx].subs: # type: ignore if subannotation.annotation_type == "instance_id": instance_id = subannotation.data if instance_id not in unique_instance_ids: unique_instance_ids.append(instance_id) if len(unique_instance_ids) > 1: if file.path not in files_with_multi_instance_id_annotations: files_with_multi_instance_id_annotations[file.path] = 1 else: files_with_multi_instance_id_annotations[file.path] += 1 if files_with_multi_instance_id_annotations: console.print( "The following files have annotation(s) with multiple instance ID values. Instance IDs are static, so only the first instance ID of each annotation will be imported:", style="warning", ) for file in files_with_multi_instance_id_annotations: console.print( f"- File: {file} has {files_with_multi_instance_id_annotations[file]} annotation(s) with multiple instance IDs" ) def _split_payloads( payload: Dict[str, Any], max_payload_size: int = 32_000_000 ) -> List[Dict[str, Any]]: """ This function takes an input payload and splits it into smaller payloads, ensuring each chunk does not exceed the specified maximum size. This is useful when importing annotations, as it prevents HTTP 413 errors (`RequestEntitySizeExceeded`) from occurring due to large request entity sizes. Parameters ---------- payload : Dict[str, Any] The input payload to be split. max_payload_size : int, optional The maximum size of each split payload. Defaults to 32,000,000 bytes. Returns ------- List[Dict[str, Any]] A list of split payloads, each not exceeding the specified maximum size. Raises ------ ValueError If any single annotation exceeds the `max_payload_size` limit """ payloads = [] base_payload = {"annotations": [], "overwrite": payload["overwrite"]} current_payload = copy.deepcopy(base_payload) current_payload_size = 0 for annotation in payload["annotations"]: annotation_size = len(json.dumps({"annotations": [annotation]}).encode("utf-8")) if current_payload_size + annotation_size < max_payload_size: current_payload["annotations"].append(annotation) current_payload_size += annotation_size else: if annotation_size > max_payload_size: raise ValueError( f"One or more annotations exceed the maximum allowed size of 32 MiB ({max_payload_size})" ) payloads.append(current_payload) current_payload = copy.deepcopy(base_payload) current_payload["overwrite"] = ( False # Required to make sure subsequent payloads don't overwrite previous ones ) current_payload["annotations"].append(annotation) current_payload_size = annotation_size if current_payload["annotations"]: payloads.append(current_payload) return payloads def _get_remote_files_targeted_by_import( importer: Callable[[Path], Union[List[dt.AnnotationFile], dt.AnnotationFile, None]], file_paths: List[PathLike], dataset: "RemoteDataset", console: Optional[Console] = None, use_multi_cpu: bool = True, cpu_limit: int = 1, ) -> List[DatasetItem]: """ Parses local annotations files for import and returns a list of remote dataset items targeted by the import. Handles chunking of requests if there are many files to avoid URL length issues. Parameters ---------- importer: Callable[[Path], Union[List[dt.AnnotationFile], dt.AnnotationFile, None]] The importer used to parse local annotation files file_paths: List[PathLike] A list of local annotation files to be uploaded dataset: RemoteDataset The remote dataset to fetch files from console: Optional[Console] The console object use_multi_cpu: bool Whether to use multi-CPU processing cpu_limit: int The number of CPUs to use for processing Returns ------- List[DatasetItem] A list of remote dataset items targeted by the import Raises ------ ValueError If no files could be parsed or if the URL becomes too long even with minimum chunk size """ maybe_parsed_files = _find_and_parse( importer, file_paths, console, use_multi_cpu, cpu_limit, ) if not maybe_parsed_files: raise ValueError("Not able to parse any files.") remote_filenames = list({file.filename for file in maybe_parsed_files}) remote_filepaths = [file.full_path for file in maybe_parsed_files] all_remote_files: List[DatasetItem] = [] current_chunk: List[str] = [] current_length = BASE_URL_LENGTH max_chunk_length = MAX_URL_LENGTH - BASE_URL_LENGTH for filename in remote_filenames: filename_length = len(filename) + FILENAME_OVERHEAD if current_length + filename_length > max_chunk_length and current_chunk: remote_files = dataset.fetch_remote_files( filters={"item_names": current_chunk} ) all_remote_files.extend(remote_files) current_chunk = [] current_length = BASE_URL_LENGTH current_chunk.append(filename) current_length += filename_length if current_chunk: remote_files = dataset.fetch_remote_files(filters={"item_names": current_chunk}) all_remote_files.extend(remote_files) return [ remote_file for remote_file in all_remote_files if remote_file.full_path in remote_filepaths ] def _parse_plane_map( medical_metadata: Dict[str, Any], slot_name: str, remote_file_path: str, console: Console, ) -> Optional[str]: """Parse and validate the plane map from medical metadata. Args: medical_metadata: Medical metadata dictionary slot_name: Name of the slot remote_file_path: Path to the remote file console: Console for logging warnings Returns: The plane map string if valid, None otherwise """ try: return medical_metadata["plane_map"][slot_name] except (KeyError, TypeError): console.print( f"Missing plane_map for slot {slot_name} in file {remote_file_path}", style="warning", ) return None def _parse_pixdims( medical_metadata: Dict[str, Any], slot_name: str, remote_file_path: str, console: Console, ) -> Optional[List[float]]: """Parse and validate the pixdims from medical metadata. Args: medical_metadata: Medical metadata dictionary slot_name: Name of the slot remote_file_path: Path to the remote file console: Console for logging warnings Returns: List of float pixdims if valid, None otherwise """ try: raw_pixdims = medical_metadata.get("pixdims") if not raw_pixdims: console.print( f"Missing pixdims for slot {slot_name} in file {remote_file_path}", style="warning", ) return None return [float(dim) for dim in raw_pixdims] except (ValueError, TypeError): console.print( f"Invalid pixdims format for slot {slot_name} in file {remote_file_path}", style="warning", ) return None def _parse_affine( medical_metadata: Dict[str, Any], slot_name: str, remote_file_path: str, console: Console, ) -> Optional[np.ndarray]: """Parse and validate the affine matrix from medical metadata. Args: medical_metadata: Medical metadata dictionary slot_name: Name of the slot remote_file_path: Path to the remote file console: Console for logging warnings Returns: Numpy array of affine matrix if valid, None otherwise """ try: affine = medical_metadata.get("affine") if affine: return np.array(affine, dtype=np.float64) return None except (ValueError, TypeError): console.print( f"Invalid affine matrix format for slot {slot_name} in file {remote_file_path}", style="warning", ) return None def _get_remote_medical_file_transform_requirements( remote_files_targeted_by_import: List[DatasetItem], console: Console ) -> Tuple[Dict[Path, Dict[str, Any]], Dict[Path, Dict[Path, Tuple[List[float], str]]]]: """ Parse remote files targeted by import and extract medical file transform requirements. For medical files, extracts pixdims and primary plane information. For legacy NifTI files, extracts affine matrix information. Args: remote_files_targeted_by_import: List of remote files targeted by the import console: Console for logging warnings Returns: Tuple containing: - Dictionary mapping remote files to slot affine matrices for legacy NifTI scaling - Dictionary mapping remote files to pixdims and primary plane information """ legacy_remote_file_slot_affine_map = {} pixdims_and_primary_planes = {} for remote_file in remote_files_targeted_by_import: if not remote_file.slots: continue slot_pixdim_primary_plane_map = {} slot_affine_map = {} remote_path = Path(remote_file.full_path) for slot in remote_file.slots: if not slot_is_medical(slot): continue slot_name = slot["slot_name"] medical_metadata = slot["metadata"]["medical"] primary_plane = _parse_plane_map( medical_metadata, slot_name, remote_file.full_path, console ) pixdims = _parse_pixdims( medical_metadata, slot_name, remote_file.full_path, console ) if primary_plane is not None and pixdims is not None: slot_pixdim_primary_plane_map[slot_name] = (pixdims, primary_plane) # Parse affine matrix for legacy NifTI files if not slot_is_handled_by_monai(slot): affine = _parse_affine( medical_metadata, slot_name, remote_file.full_path, console ) if affine is not None: slot_affine_map[slot_name] = affine if slot_pixdim_primary_plane_map: pixdims_and_primary_planes[remote_path] = slot_pixdim_primary_plane_map if slot_affine_map: legacy_remote_file_slot_affine_map[remote_path] = slot_affine_map return legacy_remote_file_slot_affine_map, pixdims_and_primary_planes
[docs] def slot_is_medical(slot: Dict[str, Any]) -> bool: return slot.get("metadata", {}).get("medical") is not None
[docs] def slot_is_handled_by_monai(slot: Dict[str, Any]) -> bool: return slot.get("metadata", {}).get("medical", {}).get("handler") == "MONAI"
[docs] def set_text_property_value(annotation_property_map, annotation_id, a_prop, t_prop): if a_prop.value is None: # here we will remove the property value annotation_property_map[annotation_id][str(a_prop.frame_index)][t_prop.id] = [] else: annotation_property_map[annotation_id][str(a_prop.frame_index)][t_prop.id] = [ {"text": a_prop.value} ]