Source code for darwin.utils

"""
Contains several unrelated utility functions used across the SDK.
"""

import platform
import re
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Iterable,
    Iterator,
    List,
    Optional,
    Set,
    Union,
    cast,
)

import deprecation
import numpy as np
import orjson as json
import requests
from jsonschema import exceptions, validators
from requests import Response, request
from rich.progress import ProgressType, track
from upolygon import draw_polygon

import darwin.datatypes as dt
from darwin.config import Config
from darwin.exceptions import (
    AnnotationFileValidationError,
    MissingSchema,
    OutdatedDarwinJSONFormat,
    UnknownAnnotationFileSchema,
    UnsupportedFileType,
)
from darwin.version import __version__

if TYPE_CHECKING:
    from darwin.client import Client


SUPPORTED_IMAGE_EXTENSIONS = [".png", ".jpeg", ".jpg", ".jfif", ".tif", ".tiff", ".bmp", ".svs", ".webp"]
SUPPORTED_VIDEO_EXTENSIONS = [
    ".avi",
    ".bpm",
    ".dcm",
    ".mov",
    ".mp4",
    ".pdf",
    ".nii",
    ".nii.gz",
    ".ndpi",
]
SUPPORTED_EXTENSIONS = SUPPORTED_IMAGE_EXTENSIONS + SUPPORTED_VIDEO_EXTENSIONS

_darwin_schema_cache = {}


def is_extension_allowed_by_filename(filename: str) -> bool:
    """
    Returns whether or not the given video or image extension is allowed.

    Parameters
    ----------
    filename : str
        The filename.

    Returns
    -------
    bool
        Whether or not the given extension of the filename is allowed.
    """
    return any([filename.lower().endswith(ext) for ext in SUPPORTED_EXTENSIONS])
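

# Illustrative usage (sketch; filenames are hypothetical). Multi-dot extensions such as
# ``.nii.gz`` are matched against the full filename:
# >>> is_extension_allowed_by_filename("scans/brain.nii.gz")
# True
# >>> is_extension_allowed_by_filename("notes.txt")
# False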
[docs]@deprecation.deprecated(deprecated_in="0.8.4", current_version=__version__) def is_extension_allowed(extension: str) -> bool: """ Returns whether or not the given extension is allowed. @Deprecated. Use is_extension_allowed_by_filename instead, and pass full filename. This is due to the fact that some extensions now include multiple dots, e.g. .nii.gz Parameters ---------- extension : str The extension. Returns ------- bool Whether or not the given extension is allowed. """ return extension.lower() in SUPPORTED_EXTENSIONS


def is_image_extension_allowed_by_filename(filename: str) -> bool:
    """
    Returns whether or not the given filename has an allowed image extension.

    Parameters
    ----------
    filename : str
        The filename.

    Returns
    -------
    bool
        Whether or not the given extension is allowed.
    """
    return any([filename.lower().endswith(ext) for ext in SUPPORTED_IMAGE_EXTENSIONS])
[docs]@deprecation.deprecated(deprecated_in="0.8.4", current_version=__version__) def is_image_extension_allowed(extension: str) -> bool: """ Returns whether or not the given image extension is allowed. Parameters ---------- extension : str The image extension. Returns ------- bool Whether or not the given extension is allowed. """ return extension.lower() in SUPPORTED_IMAGE_EXTENSIONS


def is_video_extension_allowed_by_filename(extension: str) -> bool:
    """
    Returns whether or not the given filename has an allowed video extension.

    Parameters
    ----------
    extension : str
        The filename (or extension) to check.

    Returns
    -------
    bool
        Whether or not the given extension is allowed.
    """
    return any([extension.lower().endswith(ext) for ext in SUPPORTED_VIDEO_EXTENSIONS])
[docs]@deprecation.deprecated(deprecated_in="0.8.4", current_version=__version__) def is_video_extension_allowed(extension: str) -> bool: """ Returns whether or not the given video extension is allowed. Parameters ---------- extension : str The video extension. Returns ------- bool Whether or not the given extension is allowed. """ return extension.lower() in SUPPORTED_VIDEO_EXTENSIONS


def urljoin(*parts: str) -> str:
    """
    Takes an unpacked list of strings and joins them to form a URL.

    Parameters
    ----------
    parts : str
        The strings to join into a URL.

    Returns
    -------
    str
        The joined URL.
    """
    return "/".join(part.strip("/") for part in parts)
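

# Illustrative usage (sketch; the URL and path segments are hypothetical). Redundant slashes
# around each part are stripped before joining:
# >>> urljoin("https://darwin.v7labs.com/", "/api/", "datasets")
# 'https://darwin.v7labs.com/api/datasets'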


def is_project_dir(project_path: Path) -> bool:
    """
    Verifies if the directory is a project from Darwin by inspecting its structure.

    Parameters
    ----------
    project_path : Path
        Directory to examine.

    Returns
    -------
    bool
        Is the directory a project from Darwin?
    """
    return (project_path / "releases").exists() and (project_path / "images").exists()


def get_progress_bar(array: List[dt.AnnotationFile], description: Optional[str] = None) -> Iterable[ProgressType]:
    """
    Get a rich progress bar for the given list of annotation files.

    Parameters
    ----------
    array : List[dt.AnnotationFile]
        The list of annotation files.
    description : Optional[str], default: None
        A description to show above the progress bar.

    Returns
    -------
    Iterable[ProgressType]
        An iterable of ``ProgressType`` to show a progress bar.
    """
    if description:
        return track(array, description=description)
    return track(array)


def prompt(msg: str, default: Optional[str] = None) -> str:
    """
    Prompt the user on a CLI to input a message.

    Parameters
    ----------
    msg : str
        Message to print.
    default : Optional[str], default: None
        Default value, shown between ``[]`` when the user is prompted.

    Returns
    -------
    str
        The input from the user, or the default value provided as parameter if the user does not
        provide one.
    """
    if default:
        msg = f"{msg} [{default}]: "
    else:
        msg = f"{msg}: "
    result = input(msg)
    if not result and default:
        return default
    return result


def find_files(
    files: List[dt.PathLike], *, files_to_exclude: List[dt.PathLike] = [], recursive: bool = True
) -> List[Path]:
    """
    Retrieve a list of all files belonging to supported extensions. The exploration can be made
    recursive and a list of files can be excluded if desired.

    Parameters
    ----------
    files : List[dt.PathLike]
        List of files that will be filtered with the supported file extensions and returned.
    files_to_exclude : List[dt.PathLike]
        List of files to exclude from the search.
    recursive : bool
        Flag for recursive search.

    Returns
    -------
    List[Path]
        List of all files belonging to supported extensions. Can't return None.
    """
    found_files: List[Path] = []
    pattern = "**/*" if recursive else "*"

    for f in files:
        path = Path(f)
        if path.is_dir():
            found_files.extend(
                [
                    path_object
                    for path_object in path.glob(pattern)
                    if is_extension_allowed_by_filename(str(path_object))
                ]
            )
        elif is_extension_allowed_by_filename(str(path)):
            found_files.append(path)
        else:
            raise UnsupportedFileType(path)

    files_to_exclude_full_paths = [str(Path(f)) for f in files_to_exclude]

    return [f for f in found_files if str(f) not in files_to_exclude_full_paths]
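

# Illustrative usage (sketch; the directory layout and returned paths are hypothetical, and the
# result order follows ``Path.glob``):
# >>> find_files(["./images"], files_to_exclude=["images/skip.jpg"])
# [PosixPath('images/cat.png'), PosixPath('images/dog.jpg')]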


def secure_continue_request() -> bool:
    """
    Asks for explicit approval from the user. Empty string not accepted.

    Returns
    -------
    bool
        True if the user wishes to continue, False otherwise.
    """
    return input("Do you want to continue? [y/N] ") in ["Y", "y"]


def persist_client_configuration(
    client: "Client", default_team: Optional[str] = None, config_path: Optional[Path] = None
) -> Config:
    """
    Authenticates the user against the server and creates a configuration file for them.

    Parameters
    ----------
    client : Client
        Client to take the configurations from.
    default_team : Optional[str], default: None
        The default team for the user.
    config_path : Optional[Path], default: None
        Specifies where to save the configuration file.

    Returns
    -------
    Config
        A configuration object to handle YAML files.
    """
    if not config_path:
        config_path = Path.home() / ".darwin" / "config.yaml"
        config_path.parent.mkdir(exist_ok=True)

    team_config: Optional[dt.Team] = client.config.get_default_team()
    if not team_config:
        raise ValueError("Unable to get default team.")

    config: Config = Config(config_path)
    config.set_team(team=team_config.slug, api_key=team_config.api_key, datasets_dir=team_config.datasets_dir)
    config.set_global(api_endpoint=client.url, base_url=client.base_url, default_team=default_team)

    return config


def _get_local_filename(metadata: Dict[str, Any]) -> str:
    if "original_filename" in metadata:
        return metadata["original_filename"]
    else:
        return metadata["filename"]


def _get_schema(data: dict) -> Optional[dict]:
    version = _parse_version(data)
    schema_url = data.get("schema_ref") or _default_schema(version)
    if not schema_url:
        return None

    if schema_url not in _darwin_schema_cache:
        response = requests.get(schema_url)
        response.raise_for_status()
        schema = response.json()
        _darwin_schema_cache[schema_url] = schema

    return _darwin_schema_cache[schema_url]


def validate_file_against_schema(path: Path) -> List:
    data, _ = load_data_from_file(path)
    return validate_data_against_schema(data)


def validate_data_against_schema(data) -> List:
    try:
        schema = _get_schema(data)
    except requests.exceptions.RequestException as e:
        raise MissingSchema(f"Error retrieving schema from url: {e}")
    if not schema:
        raise MissingSchema("Schema not found")
    validator = validators.Draft202012Validator(schema)
    errors = list(validator.iter_errors(data))
    return errors
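

# Illustrative usage (sketch; the export path is hypothetical and the schema referenced by the
# file is fetched over the network). The returned items are ``jsonschema`` validation errors:
# >>> errors = validate_file_against_schema(Path("exports/image_1.json"))
# >>> [error.message for error in errors]  # an empty list means the file is valid
# []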


def load_data_from_file(path: Path):
    with path.open() as infile:
        data = json.loads(infile.read())
    version = _parse_version(data)
    return data, version


def parse_darwin_json(path: Path, count: Optional[int]) -> Optional[dt.AnnotationFile]:
    """
    Parses the given JSON file in V7's Darwin proprietary format. Works for images, split-frame
    videos (treated as images) and playback videos.

    Parameters
    ----------
    path : Path
        Path to the file to parse.
    count : Optional[int]
        Optional count parameter. Used only if the image's sequence (``seq``) is None.

    Returns
    -------
    Optional[dt.AnnotationFile]
        An AnnotationFile with the information from the parsed JSON file, or None if there were no
        annotations in the JSON.

    Raises
    ------
    OutdatedDarwinJSONFormat
        If the given Darwin video JSON file is missing the 'width' and 'height' keys in the
        'image' dictionary.
    """
    path = Path(path)
    data, version = load_data_from_file(path)

    if "annotations" not in data:
        return None

    if version.major == 2:
        return _parse_darwin_v2(path, data)
    else:
        if "fps" in data["image"] or "frame_count" in data["image"]:
            return _parse_darwin_video(path, data, count)
        else:
            return _parse_darwin_image(path, data, count)
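

# Illustrative usage (sketch; the export path and the printed values are hypothetical and depend
# entirely on the export's contents):
# >>> annotation_file = parse_darwin_json(Path("exports/image_1.json"), count=None)
# >>> annotation_file.filename, len(annotation_file.annotations)
# ('image_1.jpg', 3)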


def _parse_darwin_v2(path: Path, data: Dict[str, Any]) -> dt.AnnotationFile:
    item = data["item"]
    item_source = item.get("source_info", {})
    slots: List[dt.Slot] = list(filter(None, map(_parse_darwin_slot, item.get("slots", []))))
    annotations: List[Union[dt.Annotation, dt.VideoAnnotation]] = _data_to_annotations(data)
    annotation_classes: Set[dt.AnnotationClass] = set([annotation.annotation_class for annotation in annotations])

    if len(slots) == 0:
        annotation_file = dt.AnnotationFile(
            version=_parse_version(data),
            path=path,
            filename=item["name"],
            item_id=item.get("source_info", {}).get("item_id", None),
            dataset_name=item.get("source_info", {}).get("dataset", {}).get("name", None),
            annotation_classes=annotation_classes,
            annotations=annotations,
            is_video=False,
            image_width=None,
            image_height=None,
            image_url=None,
            image_thumbnail_url=None,
            workview_url=item_source.get("workview_url", None),
            seq=0,
            frame_urls=None,
            remote_path=item["path"],
            slots=slots,
        )
    else:
        slot = slots[0]
        annotation_file = dt.AnnotationFile(
            version=_parse_version(data),
            path=path,
            filename=item["name"],
            item_id=item.get("source_info", {}).get("item_id", None),
            dataset_name=item.get("source_info", {}).get("dataset", {}).get("name", None),
            annotation_classes=annotation_classes,
            annotations=annotations,
            is_video=slot.frame_urls is not None,
            image_width=slot.width,
            image_height=slot.height,
            image_url=None if len(slot.source_files or []) == 0 else slot.source_files[0]["url"],
            image_thumbnail_url=slot.thumbnail_url,
            workview_url=item_source["workview_url"],
            seq=0,
            frame_urls=slot.frame_urls,
            remote_path=item["path"],
            slots=slots,
        )

    return annotation_file


def _parse_darwin_slot(data: Dict[str, Any]) -> dt.Slot:
    return dt.Slot(
        name=data["slot_name"],
        type=data["type"],
        width=data.get("width"),
        height=data.get("height"),
        source_files=data.get("source_files", []),
        thumbnail_url=data.get("thumbnail_url"),
        frame_count=data.get("frame_count"),
        frame_urls=data.get("frame_urls"),
        fps=data.get("fps"),
        metadata=data.get("metadata"),
    )


def _parse_darwin_image(path: Path, data: Dict[str, Any], count: Optional[int]) -> dt.AnnotationFile:
    annotations: List[Union[dt.Annotation, dt.VideoAnnotation]] = _data_to_annotations(data)
    annotation_classes: Set[dt.AnnotationClass] = set([annotation.annotation_class for annotation in annotations])

    slot = dt.Slot(
        name=None,
        type="image",
        source_files=[{"url": data["image"].get("url"), "file_name": _get_local_filename(data["image"])}],
        thumbnail_url=data["image"].get("thumbnail_url"),
        width=data["image"].get("width"),
        height=data["image"].get("height"),
        metadata=data["image"].get("metadata"),
    )

    annotation_file = dt.AnnotationFile(
        path=path,
        filename=_get_local_filename(data["image"]),
        annotation_classes=annotation_classes,
        annotations=annotations,
        is_video=False,
        image_width=data["image"].get("width"),
        image_height=data["image"].get("height"),
        image_url=data["image"].get("url"),
        workview_url=data["image"].get("workview_url"),
        seq=data["image"].get("seq", count),
        frame_urls=None,
        remote_path=data["image"].get("path", "/"),
        slots=[],
        image_thumbnail_url=data["image"].get("thumbnail_url"),
    )
    annotation_file.slots.append(slot)
    return annotation_file


def _parse_darwin_video(path: Path, data: Dict[str, Any], count: Optional[int]) -> dt.AnnotationFile:
    annotations: List[Union[dt.Annotation, dt.VideoAnnotation]] = _data_to_annotations(data)
    annotation_classes: Set[dt.AnnotationClass] = set([annotation.annotation_class for annotation in annotations])

    if "width" not in data["image"] or "height" not in data["image"]:
        raise OutdatedDarwinJSONFormat("Missing width/height in video, please re-export")

    slot = dt.Slot(
        name=None,
        type="video",
        source_files=[{"url": data["image"].get("url"), "file_name": _get_local_filename(data["image"])}],
        thumbnail_url=data["image"].get("thumbnail_url"),
        width=data["image"].get("width"),
        height=data["image"].get("height"),
        frame_count=data["image"].get("frame_count"),
        frame_urls=data["image"].get("frame_urls"),
        fps=data["image"].get("fps"),
        metadata=data["image"].get("metadata"),
    )

    annotation_file = dt.AnnotationFile(
        path=path,
        filename=_get_local_filename(data["image"]),
        annotation_classes=annotation_classes,
        annotations=annotations,
        is_video=True,
        image_width=data["image"].get("width"),
        image_height=data["image"].get("height"),
        image_url=data["image"].get("url"),
        workview_url=data["image"].get("workview_url"),
        seq=data["image"].get("seq", count),
        frame_urls=data["image"].get("frame_urls"),
        remote_path=data["image"].get("path", "/"),
        slots=[],
        image_thumbnail_url=data["image"].get("thumbnail_url"),
    )
    annotation_file.slots.append(slot)
    return annotation_file


def _parse_darwin_annotation(annotation: Dict[str, Any]) -> Optional[dt.Annotation]:
    slot_names = parse_slot_names(annotation)
    name: str = annotation["name"]
    main_annotation: Optional[dt.Annotation] = None

    # Darwin JSON 2.0 representation of complex polygons
    if "polygon" in annotation and "paths" in annotation["polygon"] and len(annotation["polygon"]["paths"]) > 1:
        bounding_box = annotation.get("bounding_box")
        paths = annotation["polygon"]["paths"]
        main_annotation = dt.make_complex_polygon(name, paths, bounding_box, slot_names=slot_names)
    # Darwin JSON 2.0 representation of simple polygons
    elif "polygon" in annotation and "paths" in annotation["polygon"] and len(annotation["polygon"]["paths"]) == 1:
        bounding_box = annotation.get("bounding_box")
        paths = annotation["polygon"]["paths"]
        main_annotation = dt.make_polygon(name, paths[0], bounding_box, slot_names=slot_names)
    # Darwin JSON 1.0 representation of complex and simple polygons
    elif "polygon" in annotation:
        bounding_box = annotation.get("bounding_box")
        if "additional_paths" in annotation["polygon"]:
            paths = [annotation["polygon"]["path"]] + annotation["polygon"]["additional_paths"]
            main_annotation = dt.make_complex_polygon(name, paths, bounding_box, slot_names=slot_names)
        else:
            main_annotation = dt.make_polygon(name, annotation["polygon"]["path"], bounding_box, slot_names=slot_names)
    # Darwin JSON 1.0 representation of complex polygons
    elif "complex_polygon" in annotation:
        bounding_box = annotation.get("bounding_box")
        if isinstance(annotation["complex_polygon"]["path"][0], list):
            paths = annotation["complex_polygon"]["path"]
        else:
            paths = [annotation["complex_polygon"]["path"]]

        if "additional_paths" in annotation["complex_polygon"]:
            paths.extend(annotation["complex_polygon"]["additional_paths"])

        main_annotation = dt.make_complex_polygon(name, paths, bounding_box, slot_names=slot_names)
    elif "bounding_box" in annotation:
        bounding_box = annotation["bounding_box"]
        main_annotation = dt.make_bounding_box(
            name, bounding_box["x"], bounding_box["y"], bounding_box["w"], bounding_box["h"], slot_names=slot_names
        )
    elif "tag" in annotation:
        main_annotation = dt.make_tag(name, slot_names=slot_names)
    elif "line" in annotation:
        main_annotation = dt.make_line(name, annotation["line"]["path"], slot_names=slot_names)
    elif "keypoint" in annotation:
        main_annotation = dt.make_keypoint(
            name, annotation["keypoint"]["x"], annotation["keypoint"]["y"], slot_names=slot_names
        )
"ellipse" in annotation: main_annotation = dt.make_ellipse(name, annotation["ellipse"], slot_names=slot_names) elif "cuboid" in annotation: main_annotation = dt.make_cuboid(name, annotation["cuboid"], slot_names=slot_names) elif "skeleton" in annotation: main_annotation = dt.make_skeleton(name, annotation["skeleton"]["nodes"], slot_names=slot_names) elif "table" in annotation: main_annotation = dt.make_table( name, annotation["table"]["bounding_box"], annotation["table"]["cells"], slot_names=slot_names ) elif "string" in annotation: main_annotation = dt.make_string(name, annotation["string"]["sources"], slot_names=slot_names) elif "graph" in annotation: main_annotation = dt.make_graph( name, annotation["graph"]["nodes"], annotation["graph"]["edges"], slot_names=slot_names ) if not main_annotation: print(f"[WARNING] Unsupported annotation type: '{annotation.keys()}'") return None if "id" in annotation: main_annotation.id = annotation["id"] if "instance_id" in annotation: main_annotation.subs.append(dt.make_instance_id(annotation["instance_id"]["value"])) if "attributes" in annotation: main_annotation.subs.append(dt.make_attributes(annotation["attributes"])) if "text" in annotation: main_annotation.subs.append(dt.make_text(annotation["text"]["text"])) if "inference" in annotation: main_annotation.subs.append(dt.make_opaque_sub("inference", annotation["inference"])) if "directional_vector" in annotation: main_annotation.subs.append(dt.make_opaque_sub("directional_vector", annotation["directional_vector"])) if "measures" in annotation: main_annotation.subs.append(dt.make_opaque_sub("measures", annotation["measures"])) if "auto_annotate" in annotation: main_annotation.subs.append(dt.make_opaque_sub("auto_annotate", annotation["auto_annotate"])) if annotation.get("annotators") is not None: main_annotation.annotators = _parse_annotators(annotation["annotators"]) if annotation.get("reviewers") is not None: main_annotation.reviewers = _parse_annotators(annotation["reviewers"]) return main_annotation def _parse_darwin_video_annotation(annotation: dict) -> Optional[dt.VideoAnnotation]: name = annotation["name"] frame_annotations = {} keyframes: Dict[int, bool] = {} frames = {**annotation.get("frames", {}), **annotation.get("sections", {})} for f, frame in frames.items(): frame_annotations[int(f)] = _parse_darwin_annotation( {**frame, **{"name": name, "id": annotation.get("id", None)}} ) keyframes[int(f)] = frame.get("keyframe", False) if not frame_annotations: return None main_annotation = dt.make_video_annotation( frame_annotations, keyframes, annotation.get("ranges", annotation.get("segments", [])), annotation.get("interpolated", False), slot_names=parse_slot_names(annotation), ) if "id" in annotation: main_annotation.id = annotation["id"] if "annotators" in annotation: main_annotation.annotators = _parse_annotators(annotation["annotators"]) if annotation.get("reviewers") is not None: main_annotation.reviewers = _parse_annotators(annotation["reviewers"]) return main_annotation def _parse_annotators(annotators: List[Dict[str, Any]]) -> List[dt.AnnotationAuthor]: if not (hasattr(annotators, "full_name") or not hasattr(annotators, "email")): raise AttributeError("JSON file must contain annotators with 'full_name' and 'email' fields") return [dt.AnnotationAuthor(annotator["full_name"], annotator["email"]) for annotator in annotators]


def split_video_annotation(annotation: dt.AnnotationFile) -> List[dt.AnnotationFile]:
    """
    Splits the given video ``AnnotationFile`` into several video ``AnnotationFile``\\s, one for
    each ``frame_url``.

    Parameters
    ----------
    annotation : dt.AnnotationFile
        The video ``AnnotationFile`` we want to split.

    Returns
    -------
    List[dt.AnnotationFile]
        A list with the split video ``AnnotationFile``\\s.

    Raises
    ------
    AttributeError
        If the given ``AnnotationFile`` is not a video annotation, or if the given annotation has
        no ``frame_url`` attribute.
    """
    if not annotation.is_video:
        raise AttributeError("this is not a video annotation")

    if not annotation.frame_urls:
        raise AttributeError("This Annotation has no frame urls")

    frame_annotations = []
    for i, frame_url in enumerate(annotation.frame_urls):
        annotations = [
            a.frames[i] for a in annotation.annotations if isinstance(a, dt.VideoAnnotation) and i in a.frames
        ]
        annotation_classes: Set[dt.AnnotationClass] = set([annotation.annotation_class for annotation in annotations])
        filename: str = f"{Path(annotation.filename).stem}/{i:07d}.png"
        frame_annotations.append(
            dt.AnnotationFile(
                annotation.path,
                filename,
                annotation_classes,
                annotations,
                False,
                annotation.image_width,
                annotation.image_height,
                frame_url,
                annotation.workview_url,
                annotation.seq,
                item_id=annotation.item_id,
                slots=annotation.slots,
            )
        )
    return frame_annotations


def parse_slot_names(annotation: dict) -> List[str]:
    return annotation.get("slot_names", [])


def ispolygon(annotation: dt.AnnotationClass) -> bool:
    """
    Returns whether or not the given ``AnnotationClass`` is a polygon.

    Parameters
    ----------
    annotation : AnnotationClass
        The ``AnnotationClass`` to evaluate.

    Returns
    -------
    bool
        ``True`` if the given ``AnnotationClass`` is a polygon, ``False`` otherwise.
    """
    return annotation.annotation_type in ["polygon", "complex_polygon"]


def convert_polygons_to_sequences(
    polygons: List[Union[dt.Polygon, List[dt.Polygon]]],
    height: Optional[int] = None,
    width: Optional[int] = None,
    rounding: bool = True,
) -> List[List[Union[int, float]]]:
    """
    Converts a list of polygons, encoded as a list of dictionaries, into a list of coordinate
    sequences.

    Parameters
    ----------
    polygons : Iterable[dt.Polygon]
        Non-empty list of coordinates in the format ``[{x: x1, y:y1}, ..., {x: xn, y:yn}]`` or a
        list of them as ``[[{x: x1, y:y1}, ..., {x: xn, y:yn}], ..., [{x: x1, y:y1}, ..., {x: xn, y:yn}]]``.
    height : Optional[int], default: None
        Maximum height for a polygon coordinate.
    width : Optional[int], default: None
        Maximum width for a polygon coordinate.
    rounding : bool, default: True
        Whether or not to round values when creating sequences.

    Returns
    -------
    sequences : List[List[Union[int, float]]]
        List of lists of coordinates in the format
        ``[[x1, y1, x2, y2, ..., xn, yn], ..., [x1, y1, x2, y2, ..., xn, yn]]``.

    Raises
    ------
    ValueError
        If the given list is a falsy value (such as ``[]``) or if its structure is incorrect.
    """
    if not polygons:
        raise ValueError("No polygons provided")
    # If there is a single polygon composing the instance, then this is
    # transformed to polygons = [[{x: x1, y:y1}, ..., {x: xn, y:yn}]]
    list_polygons: List[dt.Polygon] = []
    if isinstance(polygons[0], list):
        list_polygons = cast(List[dt.Polygon], polygons)
    else:
        list_polygons = cast(List[dt.Polygon], [polygons])

    if not isinstance(list_polygons[0], list) or not isinstance(list_polygons[0][0], dict):
        raise ValueError("Unknown input format")

    sequences: List[List[Union[int, float]]] = []
    for polygon in list_polygons:
        path: List[Union[int, float]] = []
        for point in polygon:
            # Clip coordinates to the image size
            x = max(min(point["x"], width - 1) if width else point["x"], 0)
            y = max(min(point["y"], height - 1) if height else point["y"], 0)
            if rounding:
                path.append(round(x))
                path.append(round(y))
            else:
                path.append(x)
                path.append(y)
        sequences.append(path)
    return sequences
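

# Illustrative usage (sketch; coordinates are hypothetical). A single triangle is flattened into
# one rounded ``[x1, y1, ..., xn, yn]`` sequence:
# >>> convert_polygons_to_sequences([{"x": 0.4, "y": 0.6}, {"x": 10.2, "y": 0.0}, {"x": 5.0, "y": 8.0}])
# [[0, 1, 10, 0, 5, 8]]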


@deprecation.deprecated(
    deprecated_in="0.7.5",
    removed_in="0.8.0",
    current_version=__version__,
    details="Do not use.",
)
def convert_sequences_to_polygons(
    sequences: List[Union[List[int], List[float]]], height: Optional[int] = None, width: Optional[int] = None
) -> Dict[str, List[dt.Polygon]]:
    """
    Converts a list of coordinate sequences into a list of polygons, encoded as lists of
    ``{x, y}`` dictionaries.

    Parameters
    ----------
    sequences : List[Union[List[int], List[float]]]
        List of coordinates in the format ``[x1, y1, x2, y2, ..., xn, yn]`` or as a list of them
        as ``[[x1, y1, x2, y2, ..., xn, yn], ..., [x1, y1, x2, y2, ..., xn, yn]]``.
    height : Optional[int], default: None
        Maximum height for a polygon coordinate.
    width : Optional[int], default: None
        Maximum width for a polygon coordinate.

    Returns
    -------
    Dict[str, List[dt.Polygon]]
        Dictionary with the key ``path`` containing a list of coordinates in the format of
        ``[[{x: x1, y:y1}, ..., {x: xn, y:yn}], ..., [{x: x1, y:y1}, ..., {x: xn, y:yn}]]``.

    Raises
    ------
    ValueError
        If ``sequences`` is a falsy value (such as ``[]``) or if it is in an incorrect format.
    """
    if not sequences:
        raise ValueError("No sequences provided")
    # If there is a single sequence composing the instance, then this is
    # transformed to sequences = [[x1, y1, ..., xn, yn]]
    if not isinstance(sequences[0], list):
        sequences = [sequences]

    if not isinstance(sequences[0][0], (int, float)):
        raise ValueError("Unknown input format")

    def grouped(iterable, n):
        return zip(*[iter(iterable)] * n)

    polygons = []
    for sequence in sequences:
        path = []
        for x, y in grouped(sequence, 2):
            # Clip coordinates to the image size
            x = max(min(x, width - 1) if width else x, 0)
            y = max(min(y, height - 1) if height else y, 0)
            path.append({"x": x, "y": y})
        polygons.append(path)
    return {"path": polygons}


@deprecation.deprecated(
    deprecated_in="0.7.5",
    removed_in="0.8.0",
    current_version=__version__,
    details="Do not use.",
)
def convert_xyxy_to_bounding_box(box: List[Union[int, float]]) -> dt.BoundingBox:
    """
    Converts a list of xy coordinates representing a bounding box into a dictionary.

    Parameters
    ----------
    box : List[Union[int, float]]
        List of coordinates in the format ``[x1, y1, x2, y2]``.

    Returns
    -------
    BoundingBox
        Bounding box in the format ``{x: x1, y: y1, h: height, w: width}``.

    Raises
    ------
    ValueError
        If ``box`` has an incorrect format.
    """
    if not isinstance(box[0], float) and not isinstance(box[0], int):
        raise ValueError("Unknown input format")

    x1, y1, x2, y2 = box
    width = x2 - x1
    height = y2 - y1
    return {"x": x1, "y": y1, "w": width, "h": height}
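

# Illustrative usage (sketch; the coordinates are hypothetical). An xyxy box of width 30 and
# height 20 becomes an xywh dictionary:
# >>> convert_xyxy_to_bounding_box([10, 5, 40, 25])
# {'x': 10, 'y': 5, 'w': 30, 'h': 20}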


@deprecation.deprecated(
    deprecated_in="0.7.5",
    removed_in="0.8.0",
    current_version=__version__,
    details="Do not use.",
)
def convert_bounding_box_to_xyxy(box: dt.BoundingBox) -> List[float]:
    """
    Converts a dictionary representing a bounding box into a list of xy coordinates.

    Parameters
    ----------
    box : BoundingBox
        Bounding box in the format ``{x: x1, y: y1, h: height, w: width}``.

    Returns
    -------
    List[float]
        List of coordinates in the format ``[x1, y1, x2, y2]``.
    """
    x2 = box["x"] + box["w"]
    y2 = box["y"] + box["h"]
    return [box["x"], box["y"], x2, y2]


def convert_polygons_to_mask(polygons: List, height: int, width: int, value: Optional[int] = 1) -> np.ndarray:
    """
    Converts a list of polygons, encoded as a list of dictionaries, into an ``nd.array`` mask.

    Parameters
    ----------
    polygons : list
        List of coordinates in the format ``[{x: x1, y:y1}, ..., {x: xn, y:yn}]`` or a list of
        them as ``[[{x: x1, y:y1}, ..., {x: xn, y:yn}], ..., [{x: x1, y:y1}, ..., {x: xn, y:yn}]]``.
    height : int
        The maximum height for the created mask.
    width : int
        The maximum width for the created mask.
    value : Optional[int], default: 1
        The drawing value for ``upolygon``.

    Returns
    -------
    ndarray
        ``ndarray`` mask of the polygon(s).
    """
    sequence = convert_polygons_to_sequences(polygons, height=height, width=width)
    mask = np.zeros((height, width)).astype(np.uint8)
    draw_polygon(mask, sequence, value)
    return mask
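

# Illustrative usage (sketch; the polygon is hypothetical). A small square polygon is rasterised
# into a uint8 mask of the requested size:
# >>> poly = [{"x": 1, "y": 1}, {"x": 3, "y": 1}, {"x": 3, "y": 3}, {"x": 1, "y": 3}]
# >>> mask = convert_polygons_to_mask(poly, height=5, width=5)
# >>> mask.shape, mask.dtype
# ((5, 5), dtype('uint8'))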


def chunk(items: List[Any], size: int) -> Iterator[Any]:
    """
    Splits the given list into chunks of the given size and yields them.

    Parameters
    ----------
    items : List[Any]
        The list of items to be split.
    size : int
        The size of each split.

    Yields
    ------
    Iterator[Any]
        A chunk of the given size.
    """
    for i in range(0, len(items), size):
        yield items[i : i + size]
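

# Illustrative usage (sketch). Splitting a list into chunks of two, with a shorter final chunk:
# >>> list(chunk([1, 2, 3, 4, 5], 2))
# [[1, 2], [3, 4], [5]]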


def is_unix_like_os() -> bool:
    """
    Returns ``True`` if the executing OS is Unix-based (Ubuntu or MacOS, for example) or ``False``
    otherwise.

    Returns
    -------
    bool
        True for Unix-based systems, False otherwise.
    """
    return platform.system() != "Windows"


def has_json_content_type(response: Response) -> bool:
    """
    Returns ``True`` if response has application/json content type or ``False`` otherwise.

    Returns
    -------
    bool
        True for application/json content type, False otherwise.
    """
    return "application/json" in response.headers.get("content-type", "")


def get_response_content(response: Response) -> Any:
    """
    Returns json content if response has application/json content-type, otherwise returns text.

    Returns
    -------
    Any
        Json or text content.
    """
    if has_json_content_type(response):
        return response.json()
    else:
        return response.text


def _parse_version(data) -> dt.AnnotationFileVersion:
    version_string = data.get("version", "1.0")
    major, minor, suffix = re.findall(r"^(\d+)\.(\d+)(.*)$", version_string)[0]
    return dt.AnnotationFileVersion(int(major), int(minor), suffix)


def _data_to_annotations(data: Dict[str, Any]) -> List[Union[dt.Annotation, dt.VideoAnnotation]]:
    raw_image_annotations = filter(lambda annotation: "frames" not in annotation, data["annotations"])
    raw_video_annotations = filter(lambda annotation: "frames" in annotation, data["annotations"])
    image_annotations: List[dt.Annotation] = list(filter(None, map(_parse_darwin_annotation, raw_image_annotations)))
    video_annotations: List[dt.VideoAnnotation] = list(
        filter(None, map(_parse_darwin_video_annotation, raw_video_annotations))
    )
    return [*image_annotations, *video_annotations]


def _supported_schema_versions():
    return {(2, 0, ""): "https://darwin-public.s3.eu-west-1.amazonaws.com/darwin_json/2.0/schema.json"}


def _default_schema(version: dt.AnnotationFileVersion):
    return _supported_schema_versions().get((version.major, version.minor, version.suffix))
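

# Illustrative usage (sketch). A "2.0" version string is split into major, minor and suffix:
# >>> v = _parse_version({"version": "2.0"})
# >>> (v.major, v.minor, v.suffix)
# (2, 0, '')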