from datetime import date
from operator import itemgetter
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional
from zlib import crc32
import numpy as np
import orjson as json
from upolygon import draw_polygon, rle_encode
import darwin.datatypes as dt
from darwin.utils import convert_polygons_to_sequences
# Notice text announcing that a low-level function will become private and
# that callers should use ``export`` instead.
# NOTE(review): nothing in the visible part of this module references this
# constant; confirm it is still used elsewhere before removing it.
DEPRECATION_MESSAGE = """
This function is going to be turned into private. This means that breaking
changes in its interface and implementation are to be expected. We encourage using ``export``
instead of calling this low-level function directly.
"""
def export(annotation_files: Iterator[dt.AnnotationFile], output_dir: Path) -> None:
    """
    Exports the given ``AnnotationFile``\\s into the coco format inside of the given ``output_dir``.

    The result is written to ``<output_dir>/output.json`` as UTF-8 encoded, 2-space
    indented JSON.

    Parameters
    ----------
    annotation_files : Iterator[dt.AnnotationFile]
        The ``AnnotationFile``\\s to be exported.
    output_dir : Path
        The folder where the new coco file will be.
    """
    output = _build_json(list(annotation_files))
    output_file_path = (output_dir / "output").with_suffix(".json")

    # Serialize before touching the file system, so that a serialization error
    # does not leave an empty or truncated output file behind.
    serialized = json.dumps(
        output, option=json.OPT_INDENT_2 | json.OPT_SERIALIZE_NUMPY
    ).decode("utf-8")

    # The payload was decoded as UTF-8, so it must be written back as UTF-8.
    # Without an explicit encoding ``open`` falls back to the platform locale,
    # which can fail or corrupt non-ASCII class/file names on some systems.
    with open(output_file_path, "w", encoding="utf-8") as f:
        f.write(serialized)
def _build_json(annotation_files: List[dt.AnnotationFile]) -> Dict[str, Any]:
    """
    Assembles the complete dictionary that gets serialized as the coco file.

    Parameters
    ----------
    annotation_files : List[dt.AnnotationFile]
        The ``AnnotationFile``\\s to be exported.

    Returns
    -------
    Dict[str, Any]
        A dictionary with the keys ``info``, ``licenses``, ``images``,
        ``annotations``, ``categories`` and ``tag_categories``.
    """
    # Category name -> id lookups are computed once and shared by the sections below.
    categories: Dict[str, int] = _calculate_categories(annotation_files)
    tag_categories: Dict[str, int] = _calculate_tag_categories(annotation_files)

    document: Dict[str, Any] = {}
    document["info"] = _build_info()
    document["licenses"] = _build_licenses()
    document["images"] = _build_images(annotation_files, tag_categories)
    document["annotations"] = list(_build_annotations(annotation_files, categories))
    document["categories"] = list(_build_categories(categories))
    document["tag_categories"] = list(_build_tag_categories(tag_categories))
    return document
def _calculate_categories(annotation_files: List[dt.AnnotationFile]) -> Dict[str, int]:
    """
    Collects the ``polygon`` and ``bounding_box`` classes found in the given files.

    Parameters
    ----------
    annotation_files : List[dt.AnnotationFile]
        The files whose ``annotation_classes`` are inspected.

    Returns
    -------
    Dict[str, int]
        Mapping of class name to category id, ordered by ascending id.
    """
    supported_types = ("polygon", "bounding_box")
    found: Dict[str, int] = {}

    for annotation_file in annotation_files:
        for annotation_class in annotation_file.annotation_classes:
            # The first occurrence of a name wins; later ones are ignored.
            if annotation_class.name in found:
                continue
            if annotation_class.annotation_type not in supported_types:
                continue
            found[annotation_class.name] = _calculate_category_id(annotation_class)

    ordered_by_id = sorted(found.items(), key=itemgetter(1))
    return dict(ordered_by_id)
def _calculate_tag_categories(
    annotation_files: List[dt.AnnotationFile],
) -> Dict[str, int]:
    """
    Collects the ``tag`` classes found in the given files.

    Parameters
    ----------
    annotation_files : List[dt.AnnotationFile]
        The files whose ``annotation_classes`` are inspected.

    Returns
    -------
    Dict[str, int]
        Mapping of tag class name to category id, ordered by ascending id.
    """
    tag_ids: Dict[str, int] = {}

    for annotation_file in annotation_files:
        for candidate in annotation_file.annotation_classes:
            already_known = candidate.name in tag_ids
            # Only the first occurrence of each tag class name is registered.
            if not already_known and candidate.annotation_type == "tag":
                tag_ids[candidate.name] = _calculate_category_id(candidate)

    pairs = list(tag_ids.items())
    pairs.sort(key=itemgetter(1))
    return dict(pairs)
def _calculate_category_id(annotation_class: dt.AnnotationClass) -> int:
    """
    Derives a category id from the class name.

    Parameters
    ----------
    annotation_class : dt.AnnotationClass
        The class to compute the id for; only its ``name`` is used.

    Returns
    -------
    int
        The CRC-32 checksum of the encoded class name.
    """
    encoded_name = annotation_class.name.encode()
    return crc32(encoded_name)
def _build_info() -> Dict[str, Any]:
    """
    Builds the ``info`` section of the coco file.

    Returns
    -------
    Dict[str, Any]
        Placeholder metadata plus the current year and creation date
        (formatted as ``YYYY/MM/DD``).
    """
    # TODO fill out these fields in a meaningful way
    placeholder = "n/a"
    created_on = date.today()

    info: Dict[str, Any] = {}
    info["description"] = "Exported from Darwin"
    info["url"] = placeholder
    info["version"] = placeholder
    info["year"] = created_on.year
    info["contributor"] = placeholder
    info["date_created"] = created_on.strftime("%Y/%m/%d")
    return info
def _build_licenses() -> List[Dict[str, Any]]:
    """
    Builds the ``licenses`` section of the coco file.

    Returns
    -------
    List[Dict[str, Any]]
        A list holding a single placeholder license with id ``0``.
    """
    placeholder_license: Dict[str, Any] = {
        "url": "n/a",
        "id": 0,
        "name": "placeholder license",
    }
    return [placeholder_license]
def _build_images(
    annotation_files: List[dt.AnnotationFile], tag_categories: Dict[str, int]
) -> List[Dict[str, Any]]:
    """
    Builds the ``images`` section of the coco file, ordered by ``seq``.

    Parameters
    ----------
    annotation_files : List[dt.AnnotationFile]
        The files to build one image entry each for.
    tag_categories : Dict[str, int]
        Mapping of tag class name to category id, used to resolve ``tag_ids``.

    Returns
    -------
    List[Dict[str, Any]]
        One image dictionary per file. Files are sorted by ascending ``seq``;
        files without a ``seq`` keep their relative order and are placed last.
    """

    def _seq_sort_key(annotation_file: Any) -> Any:
        # Sorting on the raw ``seq`` raises ``TypeError`` as soon as two files
        # have ``seq`` set to ``None`` (``None < None`` is not defined).
        # The leading flag keeps integer ordering untouched and pushes files
        # without a sequence number to the end.
        seq = annotation_file.seq
        return (seq is None, 0 if seq is None else seq)

    return [
        _build_image(annotation_file, tag_categories)
        for annotation_file in sorted(annotation_files, key=_seq_sort_key)
    ]
def _build_image(
    annotation_file: dt.AnnotationFile, tag_categories: Dict[str, int]
) -> Dict[str, Any]:
    """
    Builds the image entry for a single file.

    Parameters
    ----------
    annotation_file : dt.AnnotationFile
        The file to describe.
    tag_categories : Dict[str, int]
        Mapping of tag class name to category id.

    Returns
    -------
    Dict[str, Any]
        The image dictionary, including the ids of every ``tag`` annotation
        present in the file under ``tag_ids``.
    """
    tag_annotations = []
    for annotation in annotation_file.annotations:
        if annotation.annotation_class.annotation_type == "tag":
            tag_annotations.append(annotation)

    image: Dict[str, Any] = {
        "license": 0,
        "file_name": annotation_file.filename,
        "coco_url": "n/a",
        "height": annotation_file.image_height,
        "width": annotation_file.image_width,
        "date_captured": "",
        "flickr_url": "n/a",
        "darwin_url": annotation_file.image_url,
        "darwin_workview_url": annotation_file.workview_url,
        "id": _build_image_id(annotation_file),
    }
    # Tags are attached to the image itself rather than to the annotations list.
    image["tag_ids"] = [
        tag_categories[tag_annotation.annotation_class.name]
        for tag_annotation in tag_annotations
    ]
    return image
def _build_image_id(annotation_file: dt.AnnotationFile) -> int:
    """
    Returns the image id used for the given file.

    Parameters
    ----------
    annotation_file : dt.AnnotationFile
        The file to compute the id for.

    Returns
    -------
    int
        ``seq`` when it is truthy, otherwise the CRC-32 checksum of the remote
        path joined with the filename.
    """
    # CoCo file format requires unique image IDs
    # darwin 1.0 produces unique 'seq' values that can be used
    # darwin 2.0 does not provide `seq` so we hash the path + filename to produce a unique-enough 32bit int
    seq = annotation_file.seq
    if not seq:
        folder = Path(annotation_file.remote_path or "/")
        location = folder / Path(annotation_file.filename)
        return crc32(str(location).encode())
    return seq
def _build_annotations(
    annotation_files: List[dt.AnnotationFile], categories: Dict[str, int]
) -> Iterator[Optional[Dict[str, Any]]]:
    """
    Yields the coco annotation dictionaries for every annotation in every file.

    Parameters
    ----------
    annotation_files : List[dt.AnnotationFile]
        The files whose annotations are exported.
    categories : Dict[str, int]
        Mapping of class name to category id.

    Yields
    ------
    Dict[str, Any]
        One dictionary per annotation that could be converted. Annotations
        that produce no data are skipped, but still consume an id, so the
        emitted ids may have gaps.
    """
    flattened = (
        (annotation_file, annotation)
        for annotation_file in annotation_files
        for annotation in annotation_file.annotations
    )
    # Ids are 1-based and run across all files.
    for annotation_id, (annotation_file, annotation) in enumerate(flattened, start=1):
        built = _build_annotation(
            annotation_file, annotation_id, annotation, categories
        )
        if built:
            yield built
def _build_annotation(
    annotation_file: dt.AnnotationFile,
    annotation_id: int,
    annotation: dt.Annotation,
    categories: Dict[str, int],
) -> Optional[Dict[str, Any]]:
    """
    Converts a single annotation into its coco dictionary.

    Parameters
    ----------
    annotation_file : dt.AnnotationFile
        The file the annotation belongs to; provides the image id and size.
    annotation_id : int
        The id to assign to the resulting coco annotation.
    annotation : dt.Annotation
        The annotation to convert.
    categories : Dict[str, int]
        Mapping of class name to category id.

    Returns
    -------
    Optional[Dict[str, Any]]
        The coco annotation, or ``None`` for tags, for polygons without paths
        and for unsupported annotation types (the latter also prints a notice).
    """
    annotation_type = annotation.annotation_class.annotation_type

    if annotation_type == "tag":
        # Tags produce no entry in the annotations list.
        return None

    if annotation_type == "bounding_box":
        # A box is exported as the equivalent four-corner polygon.
        left = annotation.data["x"]
        top = annotation.data["y"]
        box_w = annotation.data["w"]
        box_h = annotation.data["h"]
        as_polygon = dt.make_polygon(
            annotation.annotation_class.name,
            [
                {"x": left, "y": top},
                {"x": left + box_w, "y": top},
                {"x": left + box_w, "y": top + box_h},
                {"x": left, "y": top + box_h},
            ],
            None,
            annotation.subs,
        )
        return _build_annotation(
            annotation_file, annotation_id, as_polygon, categories
        )

    if annotation_type != "polygon":
        print(f"skipping unsupported annotation_type '{annotation_type}'")
        return None

    def _extent(groups: Any) -> Any:
        # Smallest and largest value found across all coordinate groups.
        lowest = np.min([np.min(group) for group in groups])
        highest = np.max([np.max(group) for group in groups])
        return lowest, highest

    path_count = len(annotation.data["paths"])

    if path_count == 1:
        # Single path: exported as a plain polygon segmentation.
        sequences = convert_polygons_to_sequences(
            annotation.data["paths"], rounding=False
        )
        # Sequences are flat lists; even positions hold x, odd positions hold y.
        xs = [sequence[0::2] for sequence in sequences]
        ys = [sequence[1::2] for sequence in sequences]
        min_x, max_x = _extent(xs)
        min_y, max_y = _extent(ys)
        # Compute the area of the polygon
        poly_area = np.sum([_polygon_area(x, y) for x, y in zip(xs, ys)])
        return {
            "id": annotation_id,
            "image_id": _build_image_id(annotation_file),
            "category_id": categories[annotation.annotation_class.name],
            "segmentation": sequences,
            "area": poly_area,
            "bbox": [min_x, min_y, max_x - min_x, max_y - min_y],
            "iscrowd": 0,
            "extra": _build_extra(annotation),
        }

    if path_count > 1:
        # Several paths: rasterized into a mask and exported run-length encoded.
        mask = np.zeros((annotation_file.image_height, annotation_file.image_width))
        sequences = convert_polygons_to_sequences(annotation.data["paths"])
        draw_polygon(mask, sequences, 1)
        counts = rle_encode(mask)
        xs = [sequence[0::2] for sequence in sequences]
        ys = [sequence[1::2] for sequence in sequences]
        min_x, max_x = _extent(xs)
        min_y, max_y = _extent(ys)
        return {
            "id": annotation_id,
            "image_id": _build_image_id(annotation_file),
            "category_id": categories[annotation.annotation_class.name],
            "segmentation": {
                "counts": counts,
                "size": [annotation_file.image_height, annotation_file.image_width],
            },
            "area": 0,
            "bbox": [min_x, min_y, max_x - min_x + 1, max_y - min_y + 1],
            "iscrowd": 1,
            "extra": _build_extra(annotation),
        }

    # A polygon without any path yields nothing.
    return None
def _build_extra(annotation: dt.Annotation) -> Dict[str, Any]:
    """
    Gathers the optional sub-annotation payloads of an annotation.

    Parameters
    ----------
    annotation : dt.Annotation
        The annotation whose sub-annotations are looked up via ``get_sub``.

    Returns
    -------
    Dict[str, Any]
        The ``data`` of the ``instance_id``, ``attributes`` and ``text``
        sub-annotations, keyed by those names; missing ones are left out.
    """
    wanted = ("instance_id", "attributes", "text")
    # Look everything up first, then keep only the sub-annotations that exist.
    looked_up = {sub_type: annotation.get_sub(sub_type) for sub_type in wanted}
    return {sub_type: sub.data for sub_type, sub in looked_up.items() if sub}
def _build_categories(categories: Dict[str, int]) -> Iterator[Dict[str, Any]]:
    """
    Yields the ``categories`` entries of the coco file.

    Parameters
    ----------
    categories : Dict[str, int]
        Mapping of class name to category id.

    Yields
    ------
    Dict[str, Any]
        One dictionary per category with ``id``, ``name`` and a fixed
        ``supercategory`` of ``"root"``.
    """
    yield from (
        {"id": category_id, "name": category_name, "supercategory": "root"}
        for category_name, category_id in categories.items()
    )
def _build_tag_categories(categories: Dict[str, int]) -> Iterator[Dict[str, Any]]:
    """
    Yields the ``tag_categories`` entries of the coco file.

    Parameters
    ----------
    categories : Dict[str, int]
        Mapping of tag class name to category id.

    Yields
    ------
    Dict[str, Any]
        One dictionary per tag category with ``id`` and ``name``.
    """
    for tag_name in categories:
        tag_id = categories[tag_name]
        yield {"id": tag_id, "name": tag_name}
def _polygon_area(x: np.ndarray, y: np.ndarray) -> float:
    """
    Computes the area enclosed by a polygon given as separate coordinate arrays.

    Parameters
    ----------
    x : np.ndarray
        The x coordinates of the vertices, in order.
    y : np.ndarray
        The y coordinates of the vertices, in the same order as ``x``.

    Returns
    -------
    float
        Half the absolute difference of the two cross-multiplied sums, where
        each vertex is paired with the previous one (wrapping around).
    """
    # np.roll(..., 1) shifts every vertex one position forward so that each
    # coordinate lines up with its predecessor.
    previous_y = np.roll(y, 1)
    previous_x = np.roll(x, 1)
    forward_sum = np.dot(x, previous_y)
    backward_sum = np.dot(y, previous_x)
    return 0.5 * np.abs(forward_sum - backward_sum)