Source code for darwin.exporter.formats.mask

import colorsys
import math
import os
from csv import writer as csv_writer
from pathlib import Path
from typing import Dict, Iterable, List, Set, Tuple, get_args

import numpy as np

try:
    from numpy.typing import NDArray
except ImportError:
    NDArray = Any  # type:ignore # noqa F821
from PIL import Image
from upolygon import draw_polygon

import darwin.datatypes as dt
from darwin.exceptions import DarwinException
from darwin.utils import convert_polygons_to_sequences


[docs] def get_palette(mode: dt.MaskTypes.Mode, categories: List[str]) -> dt.MaskTypes.Palette: """ Returns a palette for the given mode and categories. Parameters ---------- mode: dt.MaskTypes.Mode The mode to use for the palette. categories: List[str] A list of categories to be rendered. Returns ------- dt.MaskTypes.Palette A dict of categories and their corresponding palette value. """ if mode not in get_args(dt.MaskTypes.Mode): raise ValueError(f"Unknown mode {mode}.") from DarwinException if not isinstance(categories, list) or not categories: raise ValueError(f"categories must be a non-empty list. Got {categories}.") num_categories: int = len(categories) if mode == "index": if num_categories > 254: raise ValueError("maximum number of classes supported: 254.") palette = {c: i for i, c in enumerate(categories)} if mode == "grey": if num_categories > 254: raise ValueError("maximum number of classes supported: 254.") elif num_categories == 1: raise ValueError( "only having the '__background__' class is not allowed. Please add more classes." ) palette = { c: int(i * 255 / (num_categories - 1)) for i, c in enumerate(categories) } if mode == "rgb": if num_categories > 360: raise ValueError("maximum number of classes supported: 360.") palette = {c: i for i, c in enumerate(categories)} if not palette: raise ValueError( "Failed to generate a palette.", mode, categories ) from DarwinException return palette
[docs] def get_rgb_colours( categories: dt.MaskTypes.CategoryList, ) -> Tuple[dt.MaskTypes.RgbColors, dt.MaskTypes.RgbPalette]: """ Returns a list of RGB colours and a dict of categories and their corresponding RGB palette value. Parameters ---------- categories: dt.MaskTypes.CategoryList A list of categories to be rendered. Returns ------- dt.MaskTypes.RgbColors A list of RGB colours for each category. dt.MaskTypes.RgbPalette A dict of categories and their corresponding RGB palette value. """ num_categories: int = len(categories) # Generate HSV colours for all classes except for BG SATURATION_OF_COLOUR: float = 0.8 VALUE_OF_COLOUR: float = 1.0 hsv_colours: dt.MaskTypes.HsvColors = [ (x / num_categories, SATURATION_OF_COLOUR, VALUE_OF_COLOUR) for x in range(num_categories - 1) ] rgb_colour_list: dt.MaskTypes.RgbColorList = [ [int(e * 255) for e in colorsys.hsv_to_rgb(*x)] for x in hsv_colours ] # Now we add BG class with [0 0 0] RGB value rgb_colour_list.insert(0, [0, 0, 0]) palette_rgb: dt.MaskTypes.RgbPalette = dict(zip(categories, rgb_colour_list)) rgb_colours: dt.MaskTypes.RgbColors = [c for e in rgb_colour_list for c in e] return rgb_colours, palette_rgb
[docs] def get_render_mode(annotations: List[dt.AnnotationLike]) -> dt.MaskTypes.TypeOfRender: """ Returns the type of render mode for the given annotations. Parameters ---------- annotations: List[dt.AnnotationLike] A list of annotations to be rendered. Returns ------- TypeOfRenderType A string reading either "raster" or "polygon". """ non_video_annotations: List[dt.Annotation] = [ a for a in annotations if not isinstance(a, dt.VideoAnnotation) ] if not non_video_annotations: return "polygon" list_of_types: List[str] = [ a.annotation_class.annotation_type for a in non_video_annotations ] types: Set[str] = set(list_of_types) is_raster_mask = ("mask" in types) and ("raster_layer" in types) is_polygon = "polygon" in types raster_layer_count = len([a for a in types if a == "raster_layer"]) if is_raster_mask and is_polygon: raise ValueError( "Cannot have both raster and polygon annotations in the same file" ) if is_raster_mask and raster_layer_count > 1: raise ValueError("Cannot have more than one raster layer in the same file") if is_raster_mask: return "raster" if is_polygon: return "polygon" raise ValueError( "No renderable annotations found in file, found types: " + ",".join(list_of_types) )
[docs] def rle_decode( rle: dt.MaskTypes.UndecodedRLE, label_colours: Dict[int, int] ) -> List[int]: """Decodes a run-length encoded list of integers and substitutes labels by colours. Args: rle (List[int]): A run-length encoded list of integers. Returns: List[int]: The decoded list of integers. """ if len(rle) % 2 != 0: raise ValueError("RLE must be a list of pairs of integers.") output = [] for i in range(0, len(rle), 2): output += [label_colours[rle[i]]] * rle[i + 1] return output
[docs] def get_or_generate_colour(cat_name: str, colours: dt.MaskTypes.ColoursDict) -> int: """ Returns the colour for the given category name, or generates a new one if it doesn't exist. Parameters ---------- cat_name: str The name of the category. colours: dt.MaskTypes.ColoursDict A dictionary of category names and their corresponding colours. Returns ------- int - the integer for the colour name. These will later be reassigned to a wider spread across the colour spectrum. """ if cat_name not in colours: colours[cat_name] = len(colours) + 1 return colours[cat_name]
[docs] def render_polygons( mask: NDArray, colours: dt.MaskTypes.ColoursDict, categories: dt.MaskTypes.CategoryList, annotations: List[dt.AnnotationLike], annotation_file: dt.AnnotationFile, height: int, width: int, ) -> dt.MaskTypes.RendererReturn: """ Renders the polygons in the given annotations onto the given mask. Parameters ---------- mask: NDArray The mask to render the polygons onto. colours: dt.MaskTypes.ColoursDict A dictionary of category names and their corresponding colours. categories: dt.MaskTypes.CategoryList A list of category names. annotations: List[dt.AnnotationLike] A list of annotations to be rendered. annotation_file: dt.AnnotationFile The annotation file that the annotations belong to. height: int The height of the image. width: int The width of the image. Returns ------- Tuple[List[Exception], Image, dt.MaskTypes.CategoryList, dt.MaskTypes.ColoursDict] """ errors: List[Exception] = [] filtered_annotations: List[dt.Annotation] = [ a for a in annotations if not isinstance(a, dt.VideoAnnotation) ] beyond_window = annotations_exceed_window(filtered_annotations, height, width) if beyond_window: # If the annotations exceed the window, we need to offset the mask to fit them all in. # Capture the offsets so we can shift the annotations back to their original positions later x_min, x_max, y_min, y_max = get_extents(filtered_annotations, height, width) new_height = y_max - y_min new_width = x_max - x_min mask = np.zeros((new_height, new_width), dtype=np.uint8) offset_x, offset_y = -x_min, -y_min for a in filtered_annotations: try: cat = a.annotation_class.name if cat not in categories: categories.append(cat) if a.annotation_class.annotation_type == "polygon": polygon = a.data["paths"] else: raise ValueError( f"Unknown annotation type {a.annotation_class.annotation_type}" ) if beyond_window: # Offset the polygon by the minimum x and y values to shift it to new frame of reference polygon_off = offset_polygon(polygon, offset_x, offset_y) sequence = convert_polygons_to_sequences( polygon_off, height=new_height, width=new_width ) else: sequence = convert_polygons_to_sequences( polygon, height=height, width=width ) colour_to_draw = categories.index(cat) mask = draw_polygon(mask, sequence, colour_to_draw) if cat not in colours: colours[cat] = colour_to_draw except Exception as e: errors.append(e) continue if beyond_window: # crop the mask to the original image size and in the correct offset location mask = mask[offset_y : offset_y + height, offset_x : offset_x + width] # It's not necessary to return the mask, it's modified in place, but it's more explicit return errors, mask, categories, colours
[docs] def render_raster( mask: NDArray, colours: dt.MaskTypes.ColoursDict, categories: dt.MaskTypes.CategoryList, annotations: List[dt.AnnotationLike], annotation_file: dt.AnnotationFile, height: int, width: int, ) -> dt.MaskTypes.RendererReturn: """ Renders the raster layers in the given annotations onto the given mask. Parameters ---------- mask: NDArray The mask to render the polygons onto. Not used. Only returned if no errors occur. colours: dt.MaskTypes.ColoursDict The colours list. Only returned if no errors occur. annotations: List[dt.AnnotationLike] A list of annotations to be rendered. annotation_file: dt.AnnotationFile Not used. Present for interface consistency. height: int The height of the image. width: int The width of the image. Returns ------- Tuple[List[Exception], Image, dt.MaskTypes.CategoryList, dt.MaskTypes.ColoursDict] """ errors: List[Exception] = [] raster_layer: dt.RasterLayer mask_colours: Dict[str, int] = {} label_colours: Dict[int, int] = {0: 0} for a in annotations: if isinstance(a, dt.VideoAnnotation): continue if a.annotation_class.annotation_type == "mask" and a.id: new_mask = dt.AnnotationMask( id=a.id, name=a.annotation_class.name, slot_names=a.slot_names, ) try: new_mask.validate() except Exception as e: errors.append(e) continue # Add the category to the list of categories if new_mask.name not in categories: categories.append(new_mask.name) colour_to_draw = categories.index(new_mask.name) if new_mask.id not in mask_colours: mask_colours[new_mask.id] = colour_to_draw if new_mask.name not in colours: colours[new_mask.name] = colour_to_draw raster_layer_list = [ a for a in annotations if a.annotation_class.annotation_type == "raster_layer" ] if len(raster_layer_list) == 0: errors.append( ValueError(f"File {annotation_file.filename} has no raster layer") ) return errors, mask, categories, colours if len(raster_layer_list) > 1: errors.append( ValueError( f"File {annotation_file.filename} has more than one raster layer" ) ) return errors, mask, categories, colours rl = raster_layer_list[0] if isinstance(rl, dt.VideoAnnotation): return errors, mask, categories, colours raster_layer = dt.RasterLayer( rle=rl.data["dense_rle"], slot_names=a.slot_names, mask_annotation_ids_mapping=rl.data["mask_annotation_ids_mapping"], total_pixels=rl.data["total_pixels"], ) raster_layer.validate() for uuid, label in raster_layer.mask_annotation_ids_mapping.items(): colour_to_draw = mask_colours.get(uuid) if colour_to_draw is None: errors.append( ValueError( f"Could not find mask with uuid {uuid} among masks in the file {annotation_file.filename}." ) ) return errors, mask, categories, colours label_colours[label] = colour_to_draw decoded = rle_decode(raster_layer.rle, label_colours) mask = np.array(decoded, dtype=np.uint8).reshape(height, width) return errors, mask, categories, colours
[docs] def export( annotation_files: Iterable[dt.AnnotationFile], output_dir: Path, mode: dt.MaskTypes.Mode, ) -> None: masks_dir: Path = output_dir / "masks" masks_dir.mkdir(exist_ok=True, parents=True) annotation_files = list(annotation_files) accepted_types = ["polygon", "raster_layer", "mask"] all_classes_sets: List[Set[dt.AnnotationClass]] = [ a.annotation_classes for a in annotation_files ] if len(all_classes_sets) > 0: all_classes: Set[dt.AnnotationClass] = set.union(*all_classes_sets) categories: List[str] = ["__background__"] + sorted( {c.name for c in all_classes if c.annotation_type in accepted_types}, key=lambda x: x.lower(), ) palette = get_palette(mode, categories) else: categories = ["__background__"] palette = {} colours: dt.MaskTypes.ColoursDict = {} for annotation_file in annotation_files: image_rel_path = os.path.splitext(annotation_file.full_path)[0].lstrip("/") outfile = masks_dir / f"{image_rel_path}.png" outfile.parent.mkdir(parents=True, exist_ok=True) height = annotation_file.image_height width = annotation_file.image_width if height is None or width is None: raise ValueError( f"Annotation file {annotation_file.filename} references an image with no height or width" ) mask: NDArray = np.zeros((height, width)).astype(np.uint8) annotations: List[dt.AnnotationLike] = [ a for a in annotation_file.annotations if a.annotation_class.annotation_type in accepted_types ] render_type = get_render_mode(annotations) if render_type == "raster": # Add categories to list errors, mask, categories, colours = render_raster( mask, colours, categories, annotations, annotation_file, height, width ) else: # Add categories to list errors, mask, categories, colours = render_polygons( mask, colours, categories, annotations, annotation_file, height, width ) if errors: print(f"Errors rendering {annotation_file.filename}:") for e in errors: print(e) raise DarwinException.from_multiple_exceptions(errors) # Map to palette mask = np.array( mask, dtype=np.uint8 ) # Final double check that type is using correct dtype if mode == "rgb": rgb_colours, palette_rgb = get_rgb_colours(categories) image = Image.fromarray(mask, "P") image.putpalette(rgb_colours) image = image.convert("RGB") elif mode == "grey": for value, colour in enumerate(palette.values()): mask = np.where(mask == value, colour, mask) image = Image.fromarray(mask) else: image = Image.fromarray(mask) image.save(outfile) with open(output_dir / "class_mapping.csv", "w", newline="") as f: writer = csv_writer(f) writer.writerow(["class_name", "class_color"]) for class_key in categories: if mode == "rgb": col = palette_rgb[class_key] writer.writerow([class_key, f"{col[0]} {col[1]} {col[2]}"]) else: writer.writerow([class_key, f"{palette[class_key]}"])
[docs] def annotations_exceed_window( annotations: List[dt.Annotation], height: int, width: int ) -> bool: """Check if any annotations exceed the image window Args: annotations (List[dt.Annotation]): List of annotations height (int): height of image width (int): width of image Returns: bool: True if any annotation exceeds window, false otherwise """ for item in annotations: if "bounding_box" not in item.data: continue bbox = item.data["bounding_box"] if bbox["x"] < 0: return True if bbox["y"] < 0: return True if bbox["x"] + bbox["w"] > width: return True if bbox["y"] + bbox["h"] > height: return True return False
[docs] def get_extents( annotations: List[dt.Annotation], height: int = 0, width: int = 0 ) -> Tuple[int, int, int, int]: """Create a bounding box around all annotations in discrete pixel space Args: annotations (List[dt.Annotation]): List of annotations height (int): Height to start with width (int): Width to start with Returns: Tuple[int, int, int, int]: x_min, x_max, y_min, y_max """ x_min = y_min = 0 x_max, y_max = width, height for item in annotations: if "bounding_box" not in item.data: continue bbox = item.data["bounding_box"] x_min = min(x_min, bbox["x"]) x_max = max(x_max, bbox["x"] + bbox["w"]) y_min = min(y_min, bbox["y"]) y_max = max(y_max, bbox["y"] + bbox["h"]) return math.floor(x_min), math.ceil(x_max), math.floor(y_min), math.ceil(y_max)
[docs] def offset_polygon(polygon: List, offset_x: int, offset_y: int) -> List: """Offsets a polygon by a given amount Args: polygon (List): List of coordinates offset_x (int): x offset value offset_y (int): y offset value Returns: List: polygon with offset applied """ return offset_polygon_paths(polygon, offset_x, offset_y)
[docs] def offset_polygon_paths(polygons: List, offset_x: int, offset_y: int) -> List: new_polygons = [] for polygon in polygons: new_polygons.append(offset_simple_polygon(polygon, offset_x, offset_y)) return new_polygons
[docs] def offset_simple_polygon(polygon: List, offset_x: int, offset_y: int) -> List: new_polygon = [] for point in polygon: new_polygon.append({"x": point["x"] + offset_x, "y": point["y"] + offset_y}) return new_polygon