Source code for darwin.utils.utils

"""
Contains several unrelated utility functions used across the SDK.
"""

import platform
import re
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Iterable,
    Iterator,
    List,
    Optional,
    Sequence,
    Set,
    Tuple,
    Union,
    cast,
)

import json_stream
import numpy as np
import orjson as json
import requests
from json_stream.base import PersistentStreamingJSONList, PersistentStreamingJSONObject
from jsonschema import validators
from natsort import natsorted
from requests import Response
from rich.progress import ProgressType, track
from upolygon import draw_polygon

import darwin.datatypes as dt
from darwin.config import Config
from darwin.exceptions import (
    MissingSchema,
    OutdatedDarwinJSONFormat,
    UnrecognizableFileEncoding,
    UnsupportedFileType,
)
from darwin.future.data_objects.properties import SelectedProperty

if TYPE_CHECKING:
    from darwin.client import Client


SUPPORTED_IMAGE_EXTENSIONS = [
    ".png",
    ".jpeg",
    ".jpg",
    ".jfif",
    ".tif",
    ".tiff",
    ".qtiff",
    ".bmp",
    ".svs",
    ".webp",
    ".JPEG",
    ".JPG",
]
SUPPORTED_VIDEO_EXTENSIONS = [
    ".avi",
    ".bpm",
    ".dcm",
    ".mov",
    ".mp4",
    ".mkv",
    ".hevc",
    ".pdf",
    ".nii",
    ".nii.gz",
    ".ndpi",
    ".rvg",
]
SUPPORTED_EXTENSIONS = SUPPORTED_IMAGE_EXTENSIONS + SUPPORTED_VIDEO_EXTENSIONS

# Define incompatible `item_merge_mode` arguments
PRESERVE_FOLDERS_KEY = "preserve_folders"
AS_FRAMES_KEY = "as_frames"
EXTRACT_VIEWS_KEY = "extract_views"

# Define reasons for blocking slot uploads
BLOCKED_UPLOAD_ERROR_ALREADY_EXISTS = "ALREADY_EXISTS"
BLOCKED_UPLOAD_ERROR_FILE_UPLOAD_TIMEOUT = "FILE_UPLOAD_TIMEOUT"
BLOCKED_UPLOAD_ERROR_FILE_UPLOAD_FAILED = "FILE_UPLOAD_FAILED"
BLOCKED_UPLOAD_ERROR_UNEXPECTED_ERROR = "UNEXPECTED_ERROR"
BLOCKED_UPLOAD_ERROR_ITEM_COUNT_LIMIT_EXCEEDED = "ITEM_COUNT_LIMIT_EXCEEDED"

SLOTS_GRID_MAP = {
    1: [[["0"]]],
    2: [[["0"]], [["1"]]],
    3: [[["0"]], [["1"]], [["2"]]],
    4: [[["0"], ["2"]], [["1"], ["3"]]],
    5: [[["0"], ["3"]], [["1"], ["4"]], [["2"]]],
    6: [[["0"], ["3"]], [["1"], ["4"]], [["2"], ["5"]]],
    7: [[["0"], ["3"], ["6"]], [["1"], ["4"]], [["2"], ["5"]]],
    8: [[["0"], ["3"], ["6"]], [["1"], ["4"], ["7"]], [["2"], ["5"]]],
    9: [[["0"], ["3"], ["6"]], [["1"], ["4"], ["7"]], [["2"], ["5"], ["8"]]],
    10: [[["0"], ["4"], ["8"]], [["1"], ["5"], ["9"]], [["2"], ["6"]], [["3"], ["7"]]],
    11: [
        [["0"], ["4"], ["8"]],
        [["1"], ["5"], ["9"]],
        [["2"], ["6"], ["10"]],
        [["3"], ["7"]],
    ],
    12: [
        [["0"], ["4"], ["8"]],
        [["1"], ["5"], ["9"]],
        [["2"], ["6"], ["10"]],
        [["3"], ["7"], ["11"]],
    ],
    13: [
        [["0"], ["4"], ["8"], ["12"]],
        [["1"], ["5"], ["9"]],
        [["2"], ["6"], ["10"]],
        [["3"], ["7"], ["11"]],
    ],
    14: [
        [["0"], ["4"], ["8"], ["12"]],
        [["1"], ["5"], ["9"], ["13"]],
        [["2"], ["6"], ["10"]],
        [["3"], ["7"], ["11"]],
    ],
    15: [
        [["0"], ["4"], ["8"], ["12"]],
        [["1"], ["5"], ["9"], ["13"]],
        [["2"], ["6"], ["10"], ["14"]],
        [["3"], ["7"], ["11"]],
    ],
    16: [
        [["0"], ["4"], ["8"], ["12"]],
        [["1"], ["5"], ["9"], ["13"]],
        [["2"], ["6"], ["10"], ["14"]],
        [["3"], ["7"], ["11"], ["15"]],
    ],
}
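
# Illustrative reading of SLOTS_GRID_MAP (example, not part of the module; the
# column/row interpretation is an assumption): each entry maps a slot count to
# a nested grid of cell slot names. One plausible reading for four slots is a
# 2x2 grid:
#
#   SLOTS_GRID_MAP[4] == [[["0"], ["2"]], [["1"], ["3"]]]
#
# i.e. the first outer list stacks slots "0" and "2", the second "1" and "3".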


_darwin_schema_cache = {}


def is_extension_allowed_by_filename(filename: str) -> bool:
    """
    Returns whether or not the given video or image extension is allowed.

    Parameters
    ----------
    filename : str
        The filename.

    Returns
    -------
    bool
        Whether or not the given extension of the filename is allowed.
    """
    return any(filename.lower().endswith(ext) for ext in SUPPORTED_EXTENSIONS)


def is_image_extension_allowed_by_filename(filename: str) -> bool:
    """
    Returns whether or not the given image extension is allowed.

    Parameters
    ----------
    filename : str
        The filename.

    Returns
    -------
    bool
        Whether or not the given extension is allowed.
    """
    return any(filename.lower().endswith(ext) for ext in SUPPORTED_IMAGE_EXTENSIONS)


def is_file_extension_allowed(filename: str) -> bool:
    """
    Returns whether or not the given file extension is allowed.

    Parameters
    ----------
    filename : str
        The name of the file.

    Returns
    -------
    bool
        Whether or not the given extension is allowed.
    """
    return any(filename.lower().endswith(ext) for ext in SUPPORTED_EXTENSIONS)


def urljoin(*parts: str) -> str:
    """
    Takes an unpacked list of strings and joins them to form a URL.

    Parameters
    ----------
    parts : str
        The strings that form the URL.

    Returns
    -------
    str
        The joined URL.
    """
    return "/".join(part.strip("/") for part in parts)


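# Usage sketch for urljoin (illustrative, not part of the module): parts are
# stripped of surrounding slashes before joining, so duplicate separators
# collapse cleanly.
#
#   urljoin("https://darwin.v7labs.com/", "/api/", "datasets")
#   # -> "https://darwin.v7labs.com/api/datasets"

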
def is_project_dir(project_path: Path) -> bool:
    """
    Verifies if the directory is a project from Darwin by inspecting its structure.

    Parameters
    ----------
    project_path : Path
        Directory to examine.

    Returns
    -------
    bool
        Is the directory a project from Darwin?
    """
    return (project_path / "releases").exists() and (project_path / "images").exists()


def get_progress_bar(
    array: List[dt.AnnotationFile], description: Optional[str] = None
) -> Iterable[ProgressType]:
    """
    Get a rich progress bar for the given list of annotation files.

    Parameters
    ----------
    array : List[dt.AnnotationFile]
        The list of annotation files.
    description : Optional[str], default: None
        A description to show above the progress bar.

    Returns
    -------
    Iterable[ProgressType]
        An iterable of ``ProgressType`` to show a progress bar.
    """
    if description:
        return track(array, description=description)
    return track(array)


def prompt(msg: str, default: Optional[str] = None) -> str:
    """
    Prompt the user on a CLI to input a message.

    Parameters
    ----------
    msg : str
        Message to print.
    default : Optional[str], default: None
        Default value, shown between [] when the user is prompted.

    Returns
    -------
    str
        The input from the user, or the default value if the user does not provide one.
    """
    if default:
        msg = f"{msg} [{default}]: "
    else:
        msg = f"{msg}: "
    result = input(msg)
    if not result and default:
        return default
    return result


def find_files(
    files: List[dt.PathLike],
    *,
    files_to_exclude: List[dt.PathLike] = [],
    recursive: bool = True,
    sort: bool = False,
) -> List[Path]:
    """
    Retrieve a list of all files belonging to supported extensions. The exploration can be made
    recursive and a list of files can be excluded if desired.

    Parameters
    ----------
    files : List[dt.PathLike]
        List of files that will be filtered with the supported file extensions and returned.
    files_to_exclude : List[dt.PathLike]
        List of files to exclude from the search.
    recursive : bool
        Flag for recursive search.
    sort : bool
        Flag for sorting the files naturally, i.e. file2.txt will come before file10.txt.

    Returns
    -------
    List[Path]
        List of all files belonging to supported extensions. Can't return None.
    """
    found_files: List[Path] = []
    pattern = "**/*" if recursive else "*"

    for f in files:
        path = Path(f)
        if path.is_dir():
            found_files.extend(
                [
                    path_object
                    for path_object in path.glob(pattern)
                    if is_extension_allowed_by_filename(str(path_object))
                ]
            )
        elif is_extension_allowed_by_filename(str(path)):
            found_files.append(path)
        else:
            raise UnsupportedFileType(path)

    files_to_exclude_full_paths = [str(Path(f)) for f in files_to_exclude]
    filtered_files = [
        f for f in found_files if str(f) not in files_to_exclude_full_paths
    ]

    if sort:
        return natsorted(filtered_files)
    return filtered_files


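# Usage sketch for find_files (illustrative; the paths are hypothetical):
#
#   files = find_files(["./data"], files_to_exclude=["data/skip.png"], sort=True)
#   # -> naturally sorted list of supported image/video files under ./data,
#   #    e.g. [Path("data/img2.png"), Path("data/img10.png")]

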
def secure_continue_request() -> bool:
    """
    Asks for explicit approval from the user. Empty string not accepted.

    Returns
    -------
    bool
        True if the user wishes to continue, False otherwise.
    """
    return input("Do you want to continue? [y/N] ") in ["Y", "y"]


def persist_client_configuration(
    client: "Client",
    default_team: Optional[str] = None,
    config_path: Optional[Path] = None,
) -> Config:
    """
    Authenticates the user against the server and creates a configuration file for them.

    Parameters
    ----------
    client : Client
        Client to take the configurations from.
    default_team : Optional[str], default: None
        The default team for the user.
    config_path : Optional[Path], default: None
        Specifies where to save the configuration file.

    Returns
    -------
    Config
        A configuration object to handle YAML files.
    """
    if not config_path:
        config_path = Path.home() / ".darwin" / "config.yaml"
        config_path.parent.mkdir(exist_ok=True)

    team_config: Optional[dt.Team] = client.config.get_default_team()
    if not team_config:
        raise ValueError("Unable to get default team.")

    config: Config = Config(config_path)
    config.set_team(
        team=team_config.slug,
        api_key=team_config.api_key,
        datasets_dir=team_config.datasets_dir,
    )
    config.set_global(
        api_endpoint=client.url, base_url=client.base_url, default_team=default_team
    )

    return config


def _get_local_filename(metadata: Dict[str, Any]) -> str:
    if "original_filename" in metadata:
        return metadata["original_filename"]
    else:
        return metadata["filename"]


def _get_schema(data: dict) -> Optional[dict]:
    version = _parse_version(data)
    schema_url = data.get("schema_ref") or _default_schema(version)
    if not schema_url:
        return None

    if schema_url not in _darwin_schema_cache:
        response = requests.get(schema_url)
        response.raise_for_status()
        schema = response.json()
        _darwin_schema_cache[schema_url] = schema

    return _darwin_schema_cache[schema_url]


def validate_file_against_schema(path: Path) -> List:
    data, _ = load_data_from_file(path)
    return validate_data_against_schema(data)


def validate_data_against_schema(data) -> List:
    try:
        schema = _get_schema(data)
    except requests.exceptions.RequestException as e:
        raise MissingSchema(f"Error retrieving schema from url: {e}")
    if not schema:
        raise MissingSchema("Schema not found")
    validator = validators.Draft202012Validator(schema)
    errors = list(validator.iter_errors(data))
    return errors


def attempt_decode(path: Path) -> dict:
    try:
        with path.open() as infile:
            data = json.loads(infile.read())
        return data
    except Exception:
        pass
    encodings = ["utf-8", "utf-16", "utf-32", "ascii"]
    for encoding in encodings:
        try:
            with path.open(encoding=encoding) as infile:
                data = json.loads(infile.read())
            return data
        except Exception:
            continue
    raise UnrecognizableFileEncoding(
        f"Unable to load file {path} with any encodings: {encodings}"
    )


def load_data_from_file(path: Path) -> Tuple[dict, dt.AnnotationFileVersion]:
    data = attempt_decode(path)
    version = _parse_version(data)
    return data, version


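# Usage sketch (illustrative; "annotation.json" is a hypothetical path):
#
#   data, version = load_data_from_file(Path("annotation.json"))
#   # data is the decoded JSON dict; version is an AnnotationFileVersion whose
#   # major/minor fields reflect the file's "version" key (e.g. 2 and 0 for
#   # Darwin JSON 2.0 exports).

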
def parse_darwin_json(
    path: Path, count: Optional[int] = None
) -> Optional[dt.AnnotationFile]:
    """
    Parses the given JSON file in v7's darwin proprietary format. Works for images, split frame
    videos (treated as images) and playback videos.

    Parameters
    ----------
    path : Path
        Path to the file to parse.
    count : Optional[int]
        Optional count parameter. Used only if the data's image sequence is None.

    Returns
    -------
    Optional[dt.AnnotationFile]
        An AnnotationFile with the information from the parsed JSON file, or None, if there were
        no annotations in the JSON.

    Raises
    ------
    OutdatedDarwinJSONFormat
        If the given darwin video JSON file is missing the 'width' and 'height' keys in the
        'image' dictionary.
    """
    path = Path(path)
    data, version = load_data_from_file(path)
    if "annotations" not in data:
        return None

    return _parse_darwin_v2(path, data)


def stream_darwin_json(path: Path) -> PersistentStreamingJSONObject:
    """
    Returns a Darwin JSON file as a persistent stream. This allows for parsing large files
    without loading them entirely into memory.

    Parameters
    ----------
    path : Path
        Path to the file to parse.

    Returns
    -------
    PersistentStreamingJSONObject
        A stream of the JSON file.
    """
    with path.open() as infile:
        return json_stream.load(infile, persistent=True)


def get_image_path_from_stream(
    darwin_json: PersistentStreamingJSONObject,
    images_dir: Path,
    annotation_filepath: Path,
    with_folders: bool = True,
) -> Path:
    """
    Returns the path to the image file associated with the given darwin json file.
    Compatible with Darwin JSON V2, as well as releases in folders and flat structures.

    Parameters
    ----------
    darwin_json : PersistentStreamingJSONObject
        A stream of the JSON file.
    images_dir : Path
        Path to the directory containing the images.
    annotation_filepath : Path
        Path to the annotation file. Used if loading the JSON as a stream fails.
    with_folders : bool
        Flag to determine if the release was pulled with or without folders.

    Returns
    -------
    Path
        Path to the image file.
    """
    try:
        item_name_stem = Path(darwin_json["item"]["name"]).stem
        source_name_suffix = Path(
            darwin_json["item"]["slots"][0]["source_files"][0]["file_name"]
        ).suffix
        local_file_name = Path(item_name_stem + source_name_suffix)
        if not with_folders:
            return images_dir / local_file_name
        else:
            return (
                images_dir
                / (Path(darwin_json["item"]["path"].lstrip("/\\")))
                / local_file_name
            )
    except OSError:
        # Load in the JSON as normal
        darwin_json = parse_darwin_json(path=annotation_filepath)
        if not with_folders:
            return images_dir / Path(darwin_json.filename)
        else:
            return images_dir / Path(darwin_json.full_path.lstrip("/\\"))


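# Usage sketch (illustrative; all paths are hypothetical):
#
#   annotation_path = Path("releases/latest/annotations/img.json")
#   stream = stream_darwin_json(annotation_path)
#   image_path = get_image_path_from_stream(stream, Path("images"), annotation_path)
#   # -> e.g. Path("images/folder/img.jpg") for a release pulled with folders

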
def is_stream_list_empty(json_list: PersistentStreamingJSONList) -> bool:
    try:
        json_list[0]
    except IndexError:
        return True

    return False


def _parse_darwin_v2(path: Path, data: Dict[str, Any]) -> dt.AnnotationFile:
    item = data["item"]
    item_source = item.get("source_info", {})
    slots: List[dt.Slot] = list(
        filter(None, map(_parse_darwin_slot, item.get("slots", [])))
    )
    annotations: List[Union[dt.Annotation, dt.VideoAnnotation]] = _data_to_annotations(
        data
    )
    annotation_classes: Set[dt.AnnotationClass] = {
        annotation.annotation_class for annotation in annotations
    }

    if len(slots) == 0:
        annotation_file = dt.AnnotationFile(
            version=_parse_version(data),
            path=path,
            filename=item["name"],
            item_id=item.get("source_info", {}).get("item_id", None),
            dataset_name=item.get("source_info", {})
            .get("dataset", {})
            .get("name", None),
            annotation_classes=annotation_classes,
            annotations=annotations,
            is_video=False,
            image_width=None,
            image_height=None,
            image_url=None,
            image_thumbnail_url=None,
            workview_url=item_source.get("workview_url", None),
            seq=0,
            frame_urls=None,
            remote_path=item["path"],
            slots=slots,
            item_properties=data.get("properties", []),
        )
    else:
        slot = slots[0]
        annotation_file = dt.AnnotationFile(
            version=_parse_version(data),
            path=path,
            filename=item["name"],
            item_id=item.get("source_info", {}).get("item_id", None),
            dataset_name=item.get("source_info", {})
            .get("dataset", {})
            .get("name", None),
            annotation_classes=annotation_classes,
            annotations=annotations,
            is_video=slot.frame_urls is not None or slot.frame_manifest is not None,
            image_width=slot.width,
            image_height=slot.height,
            image_url=(
                None if len(slot.source_files or []) == 0 else slot.source_files[0].url
            ),
            image_thumbnail_url=slot.thumbnail_url,
            workview_url=item_source.get("workview_url", None),
            seq=0,
            frame_urls=slot.frame_urls,
            remote_path=item["path"],
            slots=slots,
            frame_count=slot.frame_count,
            item_properties=data.get("properties", []),
        )

    return annotation_file


def get_annotations_in_slot(
    slot_name: str, annotations: Sequence[Union[dt.Annotation, dt.VideoAnnotation]]
) -> List[Union[dt.Annotation, dt.VideoAnnotation]]:
    return [
        annotation
        for annotation in annotations
        if hasattr(annotation, "slot_names")
        and annotation.slot_names
        and annotation.slot_names[0] == slot_name
    ]


def _parse_darwin_slot(data: Dict[str, Any]) -> dt.Slot:
    source_files_data = data.get("source_files", [])
    source_files = [
        dt.SourceFile(file_name=source_file["file_name"], url=source_file.get("url"))
        for source_file in source_files_data
    ]

    return dt.Slot(
        name=data["slot_name"],
        type=data["type"],
        width=data.get("width"),
        height=data.get("height"),
        source_files=source_files,
        thumbnail_url=data.get("thumbnail_url"),
        frame_count=data.get("frame_count"),
        frame_urls=data.get("frame_urls"),
        fps=data.get("fps"),
        metadata=data.get("metadata"),
        segments=data.get("segments", []),
        frame_manifest=data.get("frame_manifests"),
    )


def _parse_darwin_image(
    path: Path, data: Dict[str, Any], count: Optional[int]
) -> dt.AnnotationFile:
    annotations: List[Union[dt.Annotation, dt.VideoAnnotation]] = _data_to_annotations(
        data
    )
    annotation_classes: Set[dt.AnnotationClass] = {
        annotation.annotation_class for annotation in annotations
    }

    slot = dt.Slot(
        name=None,
        type="image",
        source_files=[
            dt.SourceFile(
                file_name=_get_local_filename(data["image"]),
                url=data["image"].get("url"),
            )
        ],
        thumbnail_url=data["image"].get("thumbnail_url"),
        width=data["image"].get("width"),
        height=data["image"].get("height"),
        metadata=data["image"].get("metadata"),
    )

    annotation_file = dt.AnnotationFile(
        path=path,
        filename=_get_local_filename(data["image"]),
        annotation_classes=annotation_classes,
        annotations=annotations,
        is_video=False,
        image_width=data["image"].get("width"),
        image_height=data["image"].get("height"),
        image_url=data["image"].get("url"),
        workview_url=data["image"].get("workview_url"),
        seq=data["image"].get("seq", count),
        frame_urls=None,
        remote_path=data["image"].get("path", "/"),
        slots=[],
        image_thumbnail_url=data["image"].get("thumbnail_url"),
    )
    annotation_file.slots.append(slot)
    return annotation_file


def _parse_darwin_video(
    path: Path, data: Dict[str, Any], count: Optional[int]
) -> dt.AnnotationFile:
    annotations: List[Union[dt.Annotation, dt.VideoAnnotation]] = _data_to_annotations(
        data
    )
    annotation_classes: Set[dt.AnnotationClass] = {
        annotation.annotation_class for annotation in annotations
    }

    if "width" not in data["image"] or "height" not in data["image"]:
        raise OutdatedDarwinJSONFormat(
            "Missing width/height in video, please re-export"
        )

    slot = dt.Slot(
        name=None,
        type="video",
        source_files=[
            dt.SourceFile(
                file_name=_get_local_filename(data["image"]),
                url=data["image"].get("url"),
            )
        ],
        thumbnail_url=data["image"].get("thumbnail_url"),
        width=data["image"].get("width"),
        height=data["image"].get("height"),
        frame_count=data["image"].get("frame_count"),
        frame_urls=data["image"].get("frame_urls"),
        fps=data["image"].get("fps"),
        metadata=data["image"].get("metadata"),
    )

    annotation_file = dt.AnnotationFile(
        path=path,
        filename=_get_local_filename(data["image"]),
        annotation_classes=annotation_classes,
        annotations=annotations,
        is_video=True,
        image_width=data["image"].get("width"),
        image_height=data["image"].get("height"),
        image_url=data["image"].get("url"),
        workview_url=data["image"].get("workview_url"),
        seq=data["image"].get("seq", count),
        frame_urls=data["image"].get("frame_urls"),
        remote_path=data["image"].get("path", "/"),
        slots=[],
        image_thumbnail_url=data["image"].get("thumbnail_url"),
    )
    annotation_file.slots.append(slot)
    return annotation_file


def _parse_darwin_annotation(
    annotation: Dict[str, Any],
    only_keyframes: bool = False,
    annotation_type: Optional[str] = None,
    annotation_data: Optional[Dict] = None,
) -> Optional[dt.Annotation]:
    slot_names = parse_slot_names(annotation)
    name: str = annotation["name"].strip()
    main_annotation: Optional[dt.Annotation] = None

    # Darwin JSON 2.0 representation of polygons
    if "polygon" in annotation and "paths" in annotation["polygon"]:
        bounding_box = annotation.get("bounding_box")
        paths = annotation["polygon"]["paths"]
        main_annotation = dt.make_polygon(
            name, paths, bounding_box, slot_names=slot_names
        )
    elif "polygon" in annotation and "path" in annotation["polygon"]:
        bounding_box = annotation.get("bounding_box")
        path = annotation["polygon"]["path"]
        main_annotation = dt.make_polygon(
            name, path, bounding_box, slot_names=slot_names
        )
    elif "bounding_box" in annotation:
        bounding_box = annotation["bounding_box"]
        main_annotation = dt.make_bounding_box(
            name,
            bounding_box["x"],
            bounding_box["y"],
            bounding_box["w"],
            bounding_box["h"],
            slot_names=slot_names,
        )
    elif "tag" in annotation:
        main_annotation = dt.make_tag(name, slot_names=slot_names)
    elif "line" in annotation:
        main_annotation = dt.make_line(
            name, annotation["line"]["path"], slot_names=slot_names
        )
    elif "keypoint" in annotation:
        main_annotation = dt.make_keypoint(
            name,
            annotation["keypoint"]["x"],
            annotation["keypoint"]["y"],
            slot_names=slot_names,
        )
    elif "ellipse" in annotation:
        main_annotation = dt.make_ellipse(
            name, annotation["ellipse"], slot_names=slot_names
        )
    elif "cuboid" in annotation:
        main_annotation = dt.make_cuboid(
            name, annotation["cuboid"], slot_names=slot_names
        )
    elif "skeleton" in annotation:
        main_annotation = dt.make_skeleton(
            name, annotation["skeleton"]["nodes"], slot_names=slot_names
        )
    elif "table" in annotation:
        main_annotation = dt.make_table(
            name,
            annotation["table"]["bounding_box"],
            annotation["table"]["cells"],
            slot_names=slot_names,
        )
    elif "simple_table" in annotation:
        main_annotation = dt.make_simple_table(
            name,
            annotation["simple_table"]["bounding_box"],
            annotation["simple_table"]["col_offsets"],
            annotation["simple_table"]["row_offsets"],
            slot_names=slot_names,
        )
    elif "string" in annotation:
        main_annotation = dt.make_string(
            name, annotation["string"]["sources"], slot_names=slot_names
        )
    elif "graph" in annotation:
        main_annotation = dt.make_graph(
            name,
            annotation["graph"]["nodes"],
            annotation["graph"]["edges"],
            slot_names=slot_names,
        )
    elif "mask" in annotation:
        main_annotation = dt.make_mask(name, slot_names=slot_names)
    elif "raster_layer" in annotation:
        raster_layer = annotation["raster_layer"]
        main_annotation = dt.make_raster_layer(
            name,
            raster_layer["mask_annotation_ids_mapping"],
            raster_layer["total_pixels"],
            raster_layer["dense_rle"],
            slot_names=slot_names,
        )
    elif only_keyframes:
        main_annotation = make_keyframe_annotation(
            annotation_type, annotation_data, name, slot_names
        )

    if not main_annotation:
        print(f"[WARNING] Unsupported annotation type: '{annotation.keys()}'")
        return None

    if "id" in annotation:
        main_annotation.id = annotation["id"]
    if "instance_id" in annotation:
        main_annotation.subs.append(
            dt.make_instance_id(annotation["instance_id"]["value"])
        )
    if "attributes" in annotation:
        main_annotation.subs.append(dt.make_attributes(annotation["attributes"]))
    if "text" in annotation:
        main_annotation.subs.append(dt.make_text(annotation["text"]["text"]))
    if "inference" in annotation:
        main_annotation.subs.append(
            dt.make_opaque_sub("inference", annotation["inference"])
        )
    if "directional_vector" in annotation:
        main_annotation.subs.append(
            dt.make_opaque_sub("directional_vector", annotation["directional_vector"])
        )
    if "measures" in annotation:
        main_annotation.subs.append(
            dt.make_opaque_sub("measures", annotation["measures"])
        )
    if annotation.get("annotators") is not None:
        main_annotation.annotators = _parse_annotators(annotation["annotators"])
    if annotation.get("reviewers") is not None:
        main_annotation.reviewers = _parse_annotators(annotation["reviewers"])
    if "properties" in annotation:
        main_annotation.properties = _parse_properties(annotation["properties"])

    return main_annotation


def make_keyframe_annotation(
    annotation_type: Optional[str],
    annotation_data: Optional[Dict],
    name: str,
    slot_names: List[str],
) -> dt.Annotation:
    if annotation_type == "polygon":
        return dt.make_polygon(
            name, annotation_data["paths"], annotation_data["bounding_box"]
        )
    elif annotation_type == "bounding_box":
        return dt.make_bounding_box(
            name,
            annotation_data["x"],
            annotation_data["y"],
            annotation_data["w"],
            annotation_data["h"],
        )
    elif annotation_type == "tag":
        return dt.make_tag(name)
    elif annotation_type == "line":
        return dt.make_line(name, annotation_data["path"])
    elif annotation_type == "keypoint":
        return dt.make_keypoint(name, annotation_data["x"], annotation_data["y"])
    elif annotation_type == "ellipse":
        return dt.make_ellipse(name, annotation_data)
    elif annotation_type == "cuboid":
        return dt.make_cuboid(name, annotation_data)
    elif annotation_type == "skeleton":
        return dt.make_skeleton(name, annotation_data["nodes"])
    elif annotation_type == "table":
        return dt.make_table(
            name, annotation_data["bounding_box"], annotation_data["cells"]
        )
    elif annotation_type == "simple_table":
        return dt.make_simple_table(
            name,
            annotation_data["bounding_box"],
            annotation_data["col_offsets"],
            annotation_data["row_offsets"],
        )
    elif annotation_type == "string":
        return dt.make_string(name, annotation_data["sources"])
    elif annotation_type == "graph":
        return dt.make_graph(name, annotation_data["nodes"], annotation_data["edges"])
    elif annotation_type == "mask":
        return dt.make_mask(name)
    elif annotation_type == "raster_layer":
        return dt.make_raster_layer(
            name,
            annotation_data["mask_annotation_ids_mapping"],
            annotation_data["total_pixels"],
            annotation_data["dense_rle"],
        )
    else:
        raise ValueError(f"Unsupported annotation type: '{annotation_type}'")


def update_annotation_data(
    main_annotation_data: Dict[str, Any],
    annotation_type: Optional[str],
    annotation_data: Optional[Dict],
) -> Optional[Dict]:
    if annotation_type == "polygon":
        bounding_box = main_annotation_data.get("bounding_box")
        paths = main_annotation_data["paths"]
        annotation_data = {"paths": paths, "bounding_box": bounding_box}
    elif annotation_type == "bounding_box":
        annotation_data = {
            "x": main_annotation_data["x"],
            "y": main_annotation_data["y"],
            "w": main_annotation_data["w"],
            "h": main_annotation_data["h"],
        }
    elif annotation_type == "tag":
        annotation_data = {}
    elif annotation_type == "line":
        annotation_data = {"path": main_annotation_data["path"]}
    elif annotation_type == "keypoint":
        annotation_data = {
            "x": main_annotation_data["x"],
            "y": main_annotation_data["y"],
        }
    elif annotation_type == "ellipse":
        annotation_data = {
            "angle": main_annotation_data["angle"],
            "center": main_annotation_data["center"],
            "radius": main_annotation_data["radius"],
        }
    elif annotation_type == "cuboid":
        annotation_data = {
            "back": main_annotation_data["back"],
            "front": main_annotation_data["front"],
        }
    elif annotation_type == "skeleton":
        annotation_data = {"nodes": main_annotation_data["nodes"]}
    elif annotation_type == "table":
        annotation_data = {
            "bounding_box": main_annotation_data["bounding_box"],
            "cells": main_annotation_data["cells"],
        }
    elif annotation_type == "string":
        annotation_data = {"sources": main_annotation_data["sources"]}
    elif annotation_type == "graph":
        annotation_data = {
            "nodes": main_annotation_data["nodes"],
            "edges": main_annotation_data["edges"],
        }
    elif annotation_type == "mask":
        annotation_data = {}
    elif annotation_type == "raster_layer":
        annotation_data = {
            "dense_rle": main_annotation_data["dense_rle"],
            "mask_annotation_ids_mapping": main_annotation_data[
                "mask_annotation_ids_mapping"
            ],
            "total_pixels": main_annotation_data["total_pixels"],
        }
    return annotation_data


def _parse_darwin_video_annotation(annotation: dict) -> Optional[dt.VideoAnnotation]:
    name = annotation["name"].strip()
    frame_annotations = {}
    keyframes: Dict[int, bool] = {}
    frames = {**annotation.get("frames", {}), **annotation.get("sections", {})}
    only_keyframes = annotation.get("only_keyframes", False)
    annotation_type, annotation_data = None, None
    if only_keyframes:
        for f, frame in frames.items():
            annotation_type, annotation_data = get_annotation_type_and_data(
                frame, annotation_type, annotation_data
            )
            if annotation_type:
                break
    for f, frame in frames.items():
        frame_annotations[int(f)] = _parse_darwin_annotation(
            {**frame, **{"name": name, "id": annotation.get("id", None)}},
            only_keyframes,
            annotation_type,
            annotation_data,
        )
        # If we hit a keyframe, we need to update annotation_data for frames
        # later on that may be missing a main type
        if only_keyframes:
            annotation_data = update_annotation_data(
                frame_annotations[int(f)].data,
                annotation_type,
                annotation_data,
            )
        keyframes[int(f)] = frame.get("keyframe", False)

    if not frame_annotations or None in frame_annotations.values():
        return None

    main_annotation = dt.make_video_annotation(
        frame_annotations,
        keyframes,
        annotation.get("ranges", annotation.get("segments", [])),
        annotation.get("interpolated", False),
        slot_names=parse_slot_names(annotation),
        properties=_parse_properties(annotation.get("properties", [])),
        hidden_areas=annotation.get("hidden_areas", []),
    )

    if "id" in annotation:
        main_annotation.id = annotation["id"]
    if "annotators" in annotation:
        main_annotation.annotators = _parse_annotators(annotation["annotators"])
    if annotation.get("reviewers") is not None:
        main_annotation.reviewers = _parse_annotators(annotation["reviewers"])

    return main_annotation


def get_annotation_type_and_data(
    frame: Dict, annotation_type: str, annotation_data: Dict
) -> Tuple[Optional[str], Optional[Dict]]:
    """
    Returns the type of a given video annotation and its data.
    """
    if "polygon" in frame:
        if frame["polygon"].get("paths"):
            bounding_box = frame.get("bounding_box")
            paths = frame["polygon"]["paths"]
            annotation_type = "polygon"
            annotation_data = {"paths": paths, "bounding_box": bounding_box}
        else:
            bounding_box = frame.get("bounding_box")
            path = frame["polygon"]["path"]
            annotation_type = "polygon"
            annotation_data = {"paths": path, "bounding_box": bounding_box}
    elif "bounding_box" in frame:
        bounding_box = frame["bounding_box"]
        annotation_type = "bounding_box"
        annotation_data = {
            "x": bounding_box["x"],
            "y": bounding_box["y"],
            "w": bounding_box["w"],
            "h": bounding_box["h"],
        }
    elif "tag" in frame:
        annotation_type = "tag"
        annotation_data = {}
    elif "line" in frame:
        annotation_type = "line"
        annotation_data = {"path": frame["line"]["path"]}
    elif "keypoint" in frame:
        annotation_type = "keypoint"
        annotation_data = {
            "x": frame["keypoint"]["x"],
            "y": frame["keypoint"]["y"],
        }
    elif "ellipse" in frame:
        annotation_type = "ellipse"
        annotation_data = frame["ellipse"]
    elif "cuboid" in frame:
        annotation_type = "cuboid"
        annotation_data = frame["cuboid"]
    elif "skeleton" in frame:
        annotation_type = "skeleton"
        annotation_data = {"nodes": frame["skeleton"]["nodes"]}
    elif "table" in frame:
        annotation_type = "table"
        annotation_data = {
            "bounding_box": frame["table"]["bounding_box"],
            "cells": frame["table"]["cells"],
        }
    elif "string" in frame:
        annotation_type = "string"
        annotation_data = {"sources": frame["string"]["sources"]}
    elif "graph" in frame:
        annotation_type = "graph"
        annotation_data = {
            "nodes": frame["graph"]["nodes"],
            "edges": frame["graph"]["edges"],
        }
    elif "mask" in frame:
        annotation_type = "mask"
        annotation_data = {}
    elif "raster_layer" in frame:
        raster_layer = frame["raster_layer"]
        annotation_type = "raster_layer"
        annotation_data = {
            "dense_rle": raster_layer["dense_rle"],
            "mask_annotation_ids_mapping": raster_layer["mask_annotation_ids_mapping"],
            "total_pixels": raster_layer["total_pixels"],
        }

    return annotation_type, annotation_data


def _parse_darwin_raster_annotation(annotation: dict) -> Optional[dt.Annotation]:
    if not annotation.get("raster_layer"):
        raise ValueError("Raster annotation must have a 'raster_layer' field")
    id: Optional[str] = annotation.get("id")
    name: Optional[str] = annotation.get("name")
    raster_layer: Optional[dt.JSONFreeForm] = annotation.get("raster_layer")
    slot_names: Optional[List[str]] = parse_slot_names(annotation)
    if not id or not name or not raster_layer:
        raise ValueError(
            "Raster annotation must have an 'id', 'name' and 'raster_layer' field"
        )

    dense_rle, mask_annotation_ids_mapping, total_pixels = (
        raster_layer.get("dense_rle", None),
        raster_layer.get("mask_annotation_ids_mapping", None),
        raster_layer.get("total_pixels", None),
    )

    if not dense_rle or not mask_annotation_ids_mapping or not total_pixels:
        raise ValueError(
            "Raster annotation must have a 'dense_rle', 'mask_annotation_ids_mapping' and 'total_pixels' field"
        )

    new_annotation = dt.Annotation(
        dt.AnnotationClass(name.strip(), "raster_layer"),
        {
            "dense_rle": dense_rle,
            "mask_annotation_ids_mapping": mask_annotation_ids_mapping,
            "total_pixels": total_pixels,
        },
        slot_names=slot_names or [],
        id=id,
    )
    return new_annotation


def _parse_darwin_mask_annotation(annotation: dict) -> Optional[dt.Annotation]:
    id: Optional[str] = annotation.get("id")
    name: Optional[str] = annotation.get("name")
    mask: Optional[dt.JSONFreeForm] = annotation.get("mask")
    slot_names: Optional[List[str]] = parse_slot_names(annotation)
    if not id or not name or mask is None:
        raise ValueError("Mask annotation must have an 'id', 'name' and 'mask' field")
    if ("sparse_rle" in mask) and (mask["sparse_rle"] is not None):
        raise ValueError("Mask annotation field 'sparse_rle' must contain a null value")
    new_annotation = dt.Annotation(
        dt.AnnotationClass(name.strip(), "mask"),
        mask,
        slot_names=slot_names or [],
        id=id,
    )
    return new_annotation


def _parse_annotators(annotators: List[Dict[str, Any]]) -> List[dt.AnnotationAuthor]:
    if any("full_name" not in a or "email" not in a for a in annotators):
        raise AttributeError(
            "JSON file must contain annotators with 'full_name' and 'email' fields"
        )
    return [
        dt.AnnotationAuthor(annotator["full_name"], annotator["email"])
        for annotator in annotators
    ]


def _parse_properties(
    properties: List[Dict[str, Any]],
) -> Optional[List[SelectedProperty]]:
    selected_properties = []
    for property in properties:
        frame_index = property.get("frame_index")
        selected_properties.append(
            SelectedProperty(
                frame_index=frame_index if frame_index is not None else "global",
                name=property.get("name", None),
                value=property.get("value", None),
            )
        )
    return selected_properties or None


def split_video_annotation(annotation: dt.AnnotationFile) -> List[dt.AnnotationFile]:
    """
    Splits the given video ``AnnotationFile`` into several video ``AnnotationFile``\\s, one for
    each ``frame_url``.

    Parameters
    ----------
    annotation : dt.AnnotationFile
        The video ``AnnotationFile`` we want to split.

    Returns
    -------
    List[dt.AnnotationFile]
        A list with the split video ``AnnotationFile``\\s.

    Raises
    ------
    AttributeError
        If the given ``AnnotationFile`` is not a video annotation, or if the given annotation has
        no ``frame_url`` attribute.
    """
    if not annotation.is_video:
        raise AttributeError("This is not a video annotation")

    # Changed from annotation.frame_urls to annotation.frame_count with frame_urls as backup,
    # due to the addition of the long videos feature, where frame_urls is no longer available.
    # frame_count should be available for both, however existing annotations will not have this.
    if not annotation.frame_count and not annotation.frame_urls:
        raise AttributeError("This Annotation has no frames")

    urls = annotation.frame_urls or [None] * (annotation.frame_count or 1)
    frame_annotations = []
    for i, frame_url in enumerate(urls):
        annotations = [
            a.frames[i]
            for a in annotation.annotations
            if isinstance(a, dt.VideoAnnotation) and i in a.frames
        ]
        annotation_classes: Set[dt.AnnotationClass] = {
            annotation.annotation_class for annotation in annotations
        }
        filename: str = f"{Path(annotation.filename).stem}/{i:07d}.png"
        frame_annotations.append(
            dt.AnnotationFile(
                annotation.path,
                filename,
                annotation_classes,
                annotations,
                [],
                False,
                annotation.image_width,
                annotation.image_height,
                frame_url,
                annotation.workview_url,
                annotation.seq,
                dataset_name=annotation.dataset_name,
                item_id=annotation.item_id,
                slots=annotation.slots,
                remote_path=annotation.remote_path,
            )
        )
    return frame_annotations


def parse_slot_names(annotation: dict) -> List[str]:
    return annotation.get("slot_names", [])


def ispolygon(annotation: dt.AnnotationClass) -> bool:
    """
    Returns whether or not the given ``AnnotationClass`` is a polygon.

    Parameters
    ----------
    annotation : AnnotationClass
        The ``AnnotationClass`` to evaluate.

    Returns
    -------
    bool
        ``True`` if the given ``AnnotationClass`` is a polygon, ``False`` otherwise.
    """
    return annotation.annotation_type == "polygon"


def convert_polygons_to_sequences(
    polygons: List[Union[dt.Polygon, List[dt.Polygon]]],
    height: Optional[int] = None,
    width: Optional[int] = None,
    rounding: bool = True,
) -> List[List[Union[int, float]]]:
    """
    Converts a list of polygons, encoded as lists of dictionaries, into a list of sequences of
    coordinates.

    Parameters
    ----------
    polygons : Iterable[dt.Polygon]
        Non-empty list of coordinates in the format ``[{x: x1, y:y1}, ..., {x: xn, y:yn}]`` or a
        list of them as
        ``[[{x: x1, y:y1}, ..., {x: xn, y:yn}], ..., [{x: x1, y:y1}, ..., {x: xn, y:yn}]]``.
    height : Optional[int], default: None
        Maximum height for a polygon coordinate.
    width : Optional[int], default: None
        Maximum width for a polygon coordinate.
    rounding : bool, default: True
        Whether or not to round values when creating sequences.

    Returns
    -------
    sequences : List[List[Union[int, float]]]
        List of arrays of coordinates in the format
        ``[[x1, y1, x2, y2, ..., xn, yn], ..., [x1, y1, x2, y2, ..., xn, yn]]``.

    Raises
    ------
    ValueError
        If the given list is a falsy value (such as ``[]``) or if its structure is incorrect.
    """
    if not polygons:
        raise ValueError("No polygons provided")
    # If there is a single polygon composing the instance then this is
    # transformed to polygons = [[{x: x1, y:y1}, ..., {x: xn, y:yn}]]
    list_polygons: List[dt.Polygon] = []
    if isinstance(polygons[0], list):
        list_polygons = cast(List[dt.Polygon], polygons)
    else:
        list_polygons = cast(List[dt.Polygon], [polygons])

    if not isinstance(list_polygons[0], list) or not isinstance(
        list_polygons[0][0], dict
    ):
        raise ValueError("Unknown input format")

    sequences: List[List[Union[int, float]]] = []
    for polygon in list_polygons:
        path: List[Union[int, float]] = []
        for point in polygon:
            # Clip coordinates to the image size
            x = max(min(point["x"], width - 1) if width else point["x"], 0)
            y = max(min(point["y"], height - 1) if height else point["y"], 0)
            if rounding:
                path.append(round(x))
                path.append(round(y))
            else:
                path.append(x)
                path.append(y)
        sequences.append(path)
    return sequences


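# Worked example (illustrative, not part of the module): a triangle clipped to
# a 100x100 image with rounding enabled.
#
#   convert_polygons_to_sequences(
#       [{"x": 10.4, "y": 10.6}, {"x": 120.0, "y": 50.0}, {"x": -5.0, "y": 80.0}],
#       height=100, width=100,
#   )
#   # -> [[10, 11, 99, 50, 0, 80]]  (x clipped to [0, width - 1], then rounded)

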
def convert_xyxy_to_bounding_box(box: List[Union[int, float]]) -> dt.BoundingBox:
    """
    Converts a list of xy coordinates representing a bounding box into a dictionary. This is used
    by in-platform model training.

    Parameters
    ----------
    box : List[Union[int, float]]
        List of coordinates in the format ``[x1, y1, x2, y2]``.

    Returns
    -------
    BoundingBox
        Bounding box in the format ``{x: x1, y: y1, h: height, w: width}``.

    Raises
    ------
    ValueError
        If ``box`` has an incorrect format.
    """
    if not isinstance(box[0], float) and not isinstance(box[0], int):
        raise ValueError("Unknown input format")

    x1, y1, x2, y2 = box
    width = x2 - x1
    height = y2 - y1
    return {"x": x1, "y": y1, "w": width, "h": height}


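# Worked example (illustrative, not part of the module):
#
#   convert_xyxy_to_bounding_box([10.0, 20.0, 50.0, 80.0])
#   # -> {"x": 10.0, "y": 20.0, "w": 40.0, "h": 60.0}

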
def convert_polygons_to_mask(
    polygons: List, height: int, width: int, value: Optional[int] = 1
) -> np.ndarray:
    """
    Converts a list of polygons, encoded as a list of dictionaries, into an ``ndarray`` mask.

    Parameters
    ----------
    polygons : list
        List of coordinates in the format ``[{x: x1, y:y1}, ..., {x: xn, y:yn}]`` or a list of
        them as ``[[{x: x1, y:y1}, ..., {x: xn, y:yn}], ..., [{x: x1, y:y1}, ..., {x: xn, y:yn}]]``.
    height : int
        The maximum height for the created mask.
    width : int
        The maximum width for the created mask.
    value : Optional[int], default: 1
        The drawing value for ``upolygon``.

    Returns
    -------
    ndarray
        ``ndarray`` mask of the polygon(s).
    """
    sequence = convert_polygons_to_sequences(polygons, height=height, width=width)
    mask = np.zeros((height, width)).astype(np.uint8)
    draw_polygon(mask, sequence, value)
    return mask


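# Usage sketch (illustrative, not part of the module): rasterise a square
# polygon into a binary mask.
#
#   mask = convert_polygons_to_mask(
#       [{"x": 2, "y": 2}, {"x": 7, "y": 2}, {"x": 7, "y": 7}, {"x": 2, "y": 7}],
#       height=10, width=10,
#   )
#   # mask is a 10x10 uint8 ndarray with 1s inside the square, 0s elsewhere.

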
def chunk(items: List[Any], size: int) -> Iterator[Any]:
    """
    Splits the given list into chunks of the given size and yields them.

    Parameters
    ----------
    items : List[Any]
        The list of items to be split.
    size : int
        The size of each split.

    Yields
    ------
    Iterator[Any]
        A chunk of the given size.
    """
    for i in range(0, len(items), size):
        yield items[i : i + size]


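# Usage sketch (illustrative, not part of the module):
#
#   list(chunk([1, 2, 3, 4, 5], 2))
#   # -> [[1, 2], [3, 4], [5]]

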
def is_unix_like_os() -> bool:
    """
    Returns ``True`` if the executing OS is Unix-based (Ubuntu or MacOS, for example) or
    ``False`` otherwise.

    Returns
    -------
    bool
        True for Unix-based systems, False otherwise.
    """
    return platform.system() != "Windows"


def has_json_content_type(response: Response) -> bool:
    """
    Returns ``True`` if response has application/json content type or ``False`` otherwise.

    Returns
    -------
    bool
        True for application/json content type, False otherwise.
    """
    return "application/json" in response.headers.get("content-type", "")


def get_response_content(response: Response) -> Any:
    """
    Returns json content if response has application/json content-type, otherwise returns text.

    Returns
    -------
    Any
        Json or text content.
    """
    if has_json_content_type(response):
        return response.json()
    else:
        return response.text


def _parse_version(data: dict) -> dt.AnnotationFileVersion:
    version_string = data.get("version", "1.0")
    major, minor, suffix = re.findall(r"^(\d+)\.(\d+)(.*)$", version_string)[0]
    return dt.AnnotationFileVersion(int(major), int(minor), suffix)


def _data_to_annotations(
    data: Dict[str, Any],
) -> List[Union[dt.Annotation, dt.VideoAnnotation]]:
    raw_image_annotations = filter(
        lambda annotation: (
            ("frames" not in annotation)
            and ("raster_layer" not in annotation)
            and ("mask" not in annotation)
        ),
        data["annotations"],
    )
    raw_video_annotations = filter(
        lambda annotation: "frames" in annotation, data["annotations"]
    )
    raw_raster_annotations = filter(
        lambda annotation: "raster_layer" in annotation, data["annotations"]
    )
    raw_mask_annotations = filter(
        lambda annotation: "mask" in annotation, data["annotations"]
    )
    image_annotations: List[dt.Annotation] = list(
        filter(None, map(_parse_darwin_annotation, raw_image_annotations))
    )
    video_annotations: List[dt.VideoAnnotation] = list(
        filter(None, map(_parse_darwin_video_annotation, raw_video_annotations))
    )
    raster_annotations: List[dt.Annotation] = list(
        filter(None, map(_parse_darwin_raster_annotation, raw_raster_annotations))
    )
    mask_annotations: List[dt.Annotation] = list(
        filter(None, map(_parse_darwin_mask_annotation, raw_mask_annotations))
    )

    return [
        *image_annotations,
        *video_annotations,
        *raster_annotations,
        *mask_annotations,
    ]


def _supported_schema_versions() -> Dict[Tuple[int, int, str], str]:
    return {
        (
            2,
            0,
            "",
        ): "https://darwin-public.s3.eu-west-1.amazonaws.com/darwin_json/2.0/schema.json"
    }


def _default_schema(version: dt.AnnotationFileVersion) -> Optional[str]:
    return _supported_schema_versions().get(
        (version.major, version.minor, version.suffix)
    )


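# Illustrative behaviour of _parse_version (example, not part of the module):
#
#   _parse_version({"version": "2.0"})        # -> AnnotationFileVersion(2, 0, "")
#   _parse_version({})                        # defaults to "1.0" -> (1, 0, "")
#   _parse_version({"version": "2.1-beta"})   # suffix captured -> (2, 1, "-beta")

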
def get_annotation_files_from_dir(path: Path) -> Iterator[str]:
    """
    Returns an iterator of all the JSON annotation files in the given directory.
    Ignores the .v7/metadata.json properties manifest file if present.

    Parameters
    ----------
    path : Path
        The directory to search for JSON annotation files.

    Returns
    -------
    Iterator[str]
        An iterator of all the JSON annotation files in the given directory.
    """
    return (
        str(filepath)
        for filepath in sorted(path.glob("**/*.json"))
        if "/.v7/" not in str(filepath) and "\\.v7\\" not in str(filepath)
    )


def convert_sequences_to_polygons(
    sequences: List[Union[List[int], List[float]]],
    height: Optional[int] = None,
    width: Optional[int] = None,
) -> Dict[str, List[dt.Polygon]]:
    """
    Converts a list of sequences of coordinates into a list of polygons, encoded as lists of
    dictionaries of coordinates. This is used by the backend.

    Parameters
    ----------
    sequences : List[Union[List[int], List[float]]]
        List of arrays of coordinates in the format ``[x1, y1, x2, y2, ..., xn, yn]`` or as a
        list of them as ``[[x1, y1, x2, y2, ..., xn, yn], ..., [x1, y1, x2, y2, ..., xn, yn]]``.
    height : Optional[int], default: None
        Maximum height for a polygon coordinate.
    width : Optional[int], default: None
        Maximum width for a polygon coordinate.

    Returns
    -------
    Dict[str, List[dt.Polygon]]
        Dictionary with the key ``path`` containing a list of coordinates in the format of
        ``[[{x: x1, y:y1}, ..., {x: xn, y:yn}], ..., [{x: x1, y:y1}, ..., {x: xn, y:yn}]]``.

    Raises
    ------
    ValueError
        If sequences is a falsy value (such as ``[]``) or if it is in an incorrect format.
    """
    if not sequences:
        raise ValueError("No sequences provided")
    # If there is a single sequence composing the instance then this is
    # transformed to polygons = [[x1, y1, ..., xn, yn]]
    if not isinstance(sequences[0], list):
        sequences = [sequences]

    if not isinstance(sequences[0][0], (int, float)):
        raise ValueError("Unknown input format")

    def grouped(iterable, n):
        return zip(*[iter(iterable)] * n)

    polygons = []
    for sequence in sequences:
        path = []
        for x, y in grouped(sequence, 2):
            # Clip coordinates to the image size
            x = max(min(x, width - 1) if width else x, 0)
            y = max(min(y, height - 1) if height else y, 0)
            path.append({"x": x, "y": y})
        polygons.append(path)
    return {"path": polygons}


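# Round-trip sketch (illustrative, not part of the module): sequences and
# polygons are inverse encodings of the same paths.
#
#   seqs = [[10, 10, 50, 10, 50, 50]]
#   convert_sequences_to_polygons(seqs)
#   # -> {"path": [[{"x": 10, "y": 10}, {"x": 50, "y": 10}, {"x": 50, "y": 50}]]}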