Source code for darwin.importer.formats.coco

from logging import getLogger
from pathlib import Path
from typing import Dict, Iterator, List, Optional

import orjson as json
from upolygon import find_contours, rle_decode

import darwin.datatypes as dt
from darwin.path_utils import deconstruct_full_path
from darwin.utils import attempt_decode

# Notice text warning callers that a low-level function is about to become
# private and pointing them at ``parse_annotation`` instead. It is not
# referenced in the visible part of this module.
DEPRECATION_MESSAGE = """

This function is going to be turned into private. This means that breaking 
changes in its interface and implementation are to be expected. We encourage using ``parse_annotation`` 
instead of calling this low-level function directly.
"""


# Module-level logger, named after this module.
logger = getLogger(__name__)


def parse_path(path: Path) -> Optional[List[dt.AnnotationFile]]:
    """
    Read a ``coco`` export from disk and convert it into darwin annotation files.

    Parameters
    ----------
    path : Path
        Location of the ``coco`` file.

    Returns
    -------
    Optional[List[dt.AnnotationFile]]
        ``None`` when the file suffix is not exactly ``.json``; otherwise the
        list of annotation files produced by ``parse_json`` from the decoded
        file content.
    """
    if path.suffix == ".json":
        decoded = attempt_decode(path)
        return list(parse_json(path, decoded))

    # Anything that is not a ``.json`` file is not handled by this importer.
    return None
def parse_json(
    path: Path, data: Dict[str, dt.UnknownType]
) -> Iterator[dt.AnnotationFile]:
    """
    Parses the given ``json`` structure into an ``Iterator[dt.AnnotationFile]``.

    One ``dt.AnnotationFile`` is yielded per image id, in the order the ids are
    first seen (first every entry of ``data["images"]``, then any additional id
    referenced only by ``data["annotations"]``).

    Parameters
    ----------
    path : Path
        The ``Path`` where file containing the ``data`` is.
    data : Dict[str, Any]
        The ``json`` data to process. Must contain the ``"annotations"``,
        ``"images"`` and ``"categories"`` keys; ``"tag_categories"`` is optional.

    Returns
    -------
    Iterator[dt.AnnotationFile]
        An iterator of all parsed annotation files.

    Raises
    ------
    KeyError
        If a required key is missing, if an image references an unknown tag id,
        or if an annotation references an image id that is not in ``"images"``.
    """
    annotations = data["annotations"]
    image_lookup_table = {image["id"]: image for image in data["images"]}
    category_lookup_table = {
        category["id"]: category for category in data["categories"]
    }
    tag_category_lookup_table = {
        category["id"]: category for category in data.get("tag_categories") or []
    }

    # image id -> annotations collected for that image (tags first, then shapes).
    image_annotations: Dict[dt.UnknownType, List[dt.UnknownType]] = {}

    # Register every image up front so images without annotations still yield a file.
    for image in data["images"]:
        collected = image_annotations.setdefault(image["id"], [])
        for tag_id in image.get("tag_ids") or []:
            tag = tag_category_lookup_table[tag_id]
            collected.append(dt.make_tag(tag["name"]))

    # ``parse_annotation`` itself reads "category_id" and "segmentation", so a
    # missing key still surfaces as ``KeyError`` without probing them here.
    for annotation in annotations:
        image_annotations.setdefault(annotation["image_id"], []).extend(
            parse_annotation(annotation, category_lookup_table)
        )

    for image_id, collected in image_annotations.items():
        # Use the id exactly as it appears in the file first, so string ids work.
        # Fall back to the integer form only for files that mix "1" and 1.
        if image_id in image_lookup_table:
            image = image_lookup_table[image_id]
        else:
            image = image_lookup_table[int(image_id)]

        parsed_annotations = list(filter(None, collected))
        annotation_classes = {
            parsed.annotation_class for parsed in parsed_annotations
        }
        remote_path, filename = deconstruct_full_path(image["file_name"])
        yield dt.AnnotationFile(
            path,
            filename,
            annotation_classes,
            parsed_annotations,
            remote_path=remote_path,
        )
def _coco_flat_to_points(coordinates) -> List[Dict[str, dt.UnknownType]]:
    """Turn a flat ``[x0, y0, x1, y1, ...]`` sequence into ``{"x", "y"}`` points."""
    flat = iter(coordinates)
    # Zipping one iterator with itself consumes the values two at a time; a
    # trailing unpaired coordinate is dropped.
    return [{"x": x, "y": y} for x, y in zip(flat, flat)]


def parse_annotation(
    annotation: Dict[str, dt.UnknownType],
    category_lookup_table: Dict[str, dt.UnknownType],
) -> List[dt.Annotation]:
    """
    Parses the given ``json`` dictionary into darwin ``Annotation`` objects if possible.

    Parameters
    ----------
    annotation : Dict[str, dt.UnknownType]
        The ``json`` dictionary to parse. Must contain ``"category_id"`` and
        ``"segmentation"``; ``"bbox"`` is read when the segmentation is empty.
    category_lookup_table : Dict[str, dt.UnknownType]
        Dictionary with all the categories from the ``coco`` file, keyed by
        category id.

    Returns
    -------
    List[dt.Annotation]
        A list with one bounding box (empty segmentation with a 4-value
        ``bbox``, optionally nested one level) or one polygon (RLE ``dict`` or
        coordinate ``list`` segmentation). An empty list is returned for crowd
        annotations and for anything that cannot be converted.

    Raises
    ------
    KeyError
        If ``"category_id"`` or ``"segmentation"`` is missing, if the category
        is unknown, or if ``"bbox"`` is missing while the segmentation is empty.
    """
    category = category_lookup_table[annotation["category_id"]]
    segmentation = annotation["segmentation"]

    if annotation.get("iscrowd") == 1:
        logger.warning(
            "Skipping annotation %s because it is a crowd "
            "annotation, and Darwin does not support import of COCO crowd annotations.",
            annotation.get("id"),
        )
        return []

    if not segmentation:
        # No mask available: fall back to the bounding box, accepting both
        # ``[x, y, w, h]`` and the nested ``[[x, y, w, h]]`` layout.
        bbox = annotation["bbox"]
        if len(bbox) == 1 and isinstance(bbox[0], (list, tuple)):
            bbox = bbox[0]
        if len(bbox) != 4:
            # Neither a usable mask nor a usable box: nothing to import.
            return []
        x, y, w, h = map(int, bbox)
        return [dt.make_bounding_box(category["name"], x, y, w, h)]

    if isinstance(segmentation, dict):
        logger.warning(
            "warning, converting complex coco rle mask to polygon, could take some time"
        )
        counts = segmentation["counts"]
        if not isinstance(counts, list):
            # Compressed string form; expand it to the integer run-length list.
            counts = decode_binary_rle(counts)
        mask = rle_decode(counts, segmentation["size"][::-1])
        _labels, external, _internal = find_contours(mask)
        paths = [
            _coco_flat_to_points(external_path)
            for external_path in external
            # Keep only contours with at least 3 points.
            if len(external_path) // 2 > 2
        ]
        return [dt.make_polygon(category["name"], paths)]

    if isinstance(segmentation, list):
        # A single flat polygon is wrapped so both layouts share one code path.
        paths = segmentation if isinstance(segmentation[0], list) else [segmentation]
        point_paths = [_coco_flat_to_points(path) for path in paths]
        return [dt.make_polygon(category["name"], point_paths)]

    return []
def _decode_file(current_encoding: str, path: Path):
    """
    Load the ``coco`` file at ``path`` with the given text encoding and parse it.

    Parameters
    ----------
    current_encoding : str
        Encoding passed to ``Path.open``. The special value ``"system_default"``
        opens the file without an explicit encoding.
    path : Path
        The file to read.

    Returns
    -------
    list
        Everything produced by ``parse_json`` for the decoded content.
    """
    # Only pass ``encoding`` when one was actually requested.
    open_kwargs = (
        {} if current_encoding == "system_default" else {"encoding": current_encoding}
    )
    with path.open(**open_kwargs) as handle:
        payload = json.loads(handle.read())
    return list(parse_json(path, payload))
def decode_binary_rle(data: str) -> List[int]:
    """
    Decodes binary rle to integer list rle.

    Each character carries, after subtracting 48 from its code point, five
    payload bits (``0x1F``) and a continuation flag (``0x20``). In the last
    character of a value, bit ``0x10`` marks the value as negative. From the
    fourth value onwards each value is stored relative to the value two
    positions earlier.

    Parameters
    ----------
    data : str
        The compressed run-length string.

    Returns
    -------
    List[int]
        The decoded run lengths.
    """
    decoded: List[int] = []
    total = len(data)
    cursor = 0

    while cursor < total:
        value = 0
        shift = 0
        while True:
            chunk = ord(data[cursor]) - 48
            cursor += 1
            value |= (chunk & 0x1F) << shift
            shift += 5
            if not chunk & 0x20:
                # Final chunk of this value: sign-extend when the top bit is set.
                if chunk & 0x10:
                    value |= -1 << shift
                break

        # Values after the third are deltas against the value two slots back.
        if len(decoded) > 2:
            value += decoded[-2]
        decoded.append(value)

    return decoded