from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import numpy as np
import torch
from PIL import Image as PILImage
from torch import Tensor
from torchvision.transforms.functional import to_tensor
from darwin.cli_functions import _error, _load_client
from darwin.client import Client
from darwin.dataset.identifier import DatasetIdentifier
from darwin.dataset.local_dataset import LocalDataset
from darwin.torch.transforms import (
Compose,
ConvertPolygonsToInstanceMasks,
ConvertPolygonsToSemanticMask,
)
from darwin.torch.utils import clamp_bbox_to_image_size, polygon_area
from darwin.utils import convert_polygons_to_sequences
def get_dataset(
dataset_slug: str,
dataset_type: str,
partition: Optional[str] = None,
split: str = "default",
split_type: str = "random",
transform: Optional[List] = None,
client: Optional[Client] = None,
) -> LocalDataset:
"""
Creates and returns a ``LocalDataset``.
Parameters
----------
dataset_slug : str
Slug of the dataset to retrieve.
dataset_type : str
The type of dataset ``["classification", "instance-segmentation", "object-detection", "semantic-segmentation"]``.
partition : Optional[str], default: None
Selects one of the partitions ``["train", "val", "test", None]``.
split : str, default: "default"
Selects the split that defines the percentages used.
split_type : str, default: "random"
Heuristic used to do the split ``["random", "stratified"]``.
transform : Optional[List], default: None
List of PyTorch transforms.
client : Optional[Client], default: None
Client to use to retrieve the dataset.
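Examples
--------
A minimal usage sketch; ``my-team/my-dataset`` is a placeholder slug for a
dataset that has already been pulled locally:
>>> dataset = get_dataset("my-team/my-dataset", dataset_type="classification")
>>> image, target = dataset[0]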
"""
dataset_functions = {
"classification": ClassificationDataset,
"instance-segmentation": InstanceSegmentationDataset,
"semantic-segmentation": SemanticSegmentationDataset,
"object-detection": ObjectDetectionDataset,
}
dataset_function = dataset_functions.get(dataset_type)
if not dataset_function:
list_of_types = ", ".join(dataset_functions.keys())
return _error(f"dataset_type needs to be one of '{list_of_types}'")
identifier = DatasetIdentifier.parse(dataset_slug)
if client is None:
client = _load_client()
for p in client.list_local_datasets(team_slug=identifier.team_slug):
if identifier.dataset_slug == p.name:
return dataset_function(
dataset_path=p,
partition=partition,
split=split,
split_type=split_type,
release_name=identifier.version,
transform=transform,
)
_error(
f"Dataset '{identifier.dataset_slug}' does not exist locally. "
f"Use 'darwin dataset remote' to see all the available datasets, "
f"and 'darwin dataset pull' to pull them."
)
class ClassificationDataset(LocalDataset):
"""
Represents a LocalDataset used for training on classification tasks.
Attributes
----------
transform : Optional[Callable], default: None
torchvision transform function to run on the dataset.
is_multi_label : bool, default: False
Whether the dataset is multilabel or not.
Parameters
----------
transform: Optional[Union[Callable, List[Callable]]], default: None
torchvision function or list to set the ``transform`` attribute. If it is a list, it will
be composed via torchvision.
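Examples
--------
A sketch of wiring the dataset into a ``DataLoader``; the slug below is a
placeholder for a dataset that has already been pulled locally:
>>> from torch.utils.data import DataLoader
>>> dataset = get_dataset("my-team/my-dataset", dataset_type="classification")
>>> loader = DataLoader(dataset, batch_size=8, shuffle=True)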
"""
def __init__(
self, transform: Optional[Union[Callable, List]] = None, **kwargs
) -> None:
super().__init__(annotation_type="tag", **kwargs)
if transform is not None and isinstance(transform, list):
transform = Compose(transform)
self.transform: Optional[Callable] = transform
self.is_multi_label = False
self.check_if_multi_label()
def __getitem__(self, index: int) -> Tuple[Tensor, Tensor]:
"""
See superclass for documentation.
Parameters
----------
index : int
The index of the image.
Returns
-------
Tuple[Tensor, Tensor]
A tuple of tensors, where the first value is the image tensor and the second is the
target's tensor.
"""
img: PILImage.Image = self.get_image(index)
if self.transform is not None:
img_tensor = self.transform(img)
else:
img_tensor = to_tensor(img)
target = self.get_target(index)
return img_tensor, target
def get_target(self, index: int) -> Tensor:
"""
Returns the classification target.
Parameters
----------
index : int
Index of the image.
Returns
-------
Tensor
The target's tensor.
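Examples
--------
For a single-label dataset the target is a scalar class index; for a
multi-label dataset it is a multi-hot vector over ``self.classes``
(a hypothetical three-class dataset is shown):
>>> dataset.get_target(0)  # single-label
tensor(2)
>>> dataset.get_target(0)  # multi-label, classes 0 and 2 present
tensor([1., 0., 1.])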
"""
data = self.parse_json(index)
annotations = data.pop("annotations")
tags = [
a.annotation_class.name
for a in annotations
if a.annotation_class.annotation_type == "tag"
]
if not self.is_multi_label:
# Binary or multiclass must have a label per image
assert len(tags) >= 1, f"No tags were found for index={index}"
target: Tensor = torch.tensor(self.classes.index(tags[0]))
else:
target = torch.zeros(len(self.classes))
# One-hot encode all the tags; the vector stays all zeros if the image/frame has no tags
for tag in tags:
idx = self.classes.index(tag)
target[idx] = 1
return target
def check_if_multi_label(self) -> None:
"""
Loops over all the ``.json`` files and checks whether at least one file
contains more than one tag; if so, the dataset is assumed to be multi-label.
"""
for idx in range(len(self)):
target = self.parse_json(idx)
annotations = target.pop("annotations")
tags = [
a.annotation_class.name
for a in annotations
if a.annotation_class.annotation_type == "tag"
]
if len(tags) > 1:
self.is_multi_label = True
break
def get_class_idx(self, index: int) -> int:
"""
Returns the ``category_id`` of the image with the given index.
Parameters
----------
index : int
Index of the image.
Returns
-------
int
``category_id`` of the image.
"""
target: Tensor = self.get_target(index)
# For single-label datasets the target is the class index itself
return int(target.item())
def measure_weights(self) -> np.ndarray:
"""
Computes the class-balancing weights (not the frequencies) over the train set.
The weights are proportional to the inverse of the class frequencies and sum to 1.
Returns
-------
np.ndarray[float]
Weight for each class in the train set, as a normalized 1D array.
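Examples
--------
A sketch of feeding the weights into a loss function; assumes a
single-label dataset:
>>> weights = dataset.measure_weights()
>>> criterion = torch.nn.CrossEntropyLoss(weight=torch.from_numpy(weights).float())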
"""
# Collect all the labels by iterating over the whole dataset
labels = []
for i, _filename in enumerate(self.images_path):
target: Tensor = self.get_target(i)
if self.is_multi_label:
# get the indices of the classes present
target = torch.where(target == 1)[0]
labels.extend(target.tolist())
else:
labels.append(target.item())
return self._compute_weights(labels)
class InstanceSegmentationDataset(LocalDataset):
"""
Represents an instance of a LocalDataset used for training on instance segmentation tasks.
Parameters
----------
transform: Optional[Union[Callable, List[Callable]]], default: None
torchvision function or list to set the ``transform`` attribute. If it is a list, it will
be composed via torchvision.
Attributes
----------
transform : Optional[Callable], default: None
torchvision transform function to run on the dataset.
convert_polygons : ConvertPolygonsToInstanceMasks
Object used to convert polygons to instance masks.
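Examples
--------
Detection-style targets are per-image dicts, so batching needs a collate
function that keeps them as tuples (a common torchvision pattern, shown
here as a sketch with a placeholder slug):
>>> from torch.utils.data import DataLoader
>>> dataset = get_dataset("my-team/my-dataset", dataset_type="instance-segmentation")
>>> loader = DataLoader(dataset, batch_size=2, collate_fn=lambda batch: tuple(zip(*batch)))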
"""
def __init__(self, transform: Optional[Union[Callable, List]] = None, **kwargs):
super().__init__(annotation_type="polygon", **kwargs)
if transform is not None and isinstance(transform, list):
transform = Compose(transform)
self.transform: Optional[Callable] = transform
self.convert_polygons = ConvertPolygonsToInstanceMasks()
def __getitem__(self, index: int) -> Tuple[Tensor, Dict[str, Any]]:
"""
Notes
-----
The return value is a dict with the following fields:
image_id : int
Index of the image inside the dataset
image_path: str
The path to the image on the file system
labels : tensor(n)
The class label of each one of the instances
masks : tensor(n, H, W)
Segmentation mask of each one of the instances
boxes : tensor(n, 4)
Coordinates of the bounding box enclosing the instances as ``[x1, y1, x2, y2]``
area : float
Area in pixels of each one of the instances
"""
img: PILImage.Image = self.get_image(index)
target: Dict[str, Any] = self.get_target(index)
img, target = self.convert_polygons(img, target)
if self.transform is not None:
img_tensor, target = self.transform(img, target)
else:
img_tensor = to_tensor(img)
return img_tensor, target
def get_target(self, index: int) -> Dict[str, Any]:
"""
Builds and returns the target dictionary for the item at the given index.
The target dictionary will have the following format:
.. code-block:: python
{
"annotations": [
{
"category_id": int,
"segmentation": List[List[int | float]],
"bbox": List[float],
"area": float
}
]
}
Parameters
----------
index : int
The actual index of the item in the ``Dataset``.
Returns
-------
Dict[str, Any]
The target.
"""
target = self.parse_json(index)
annotations = []
for annotation in target["annotations"]:
# Darwin V2 exports store polygons under "paths"; older exports use "path"
path_key = "paths" if "paths" in annotation.data else "path"
if path_key not in annotation.data:
print(
f"Warning: missing polygon in annotation {self.annotations_path[index]}"
)
continue
# Extract the sequences of coordinates from the polygon annotation
sequences = convert_polygons_to_sequences(
annotation.data[path_key],
height=target["height"],
width=target["width"],
rounding=False,
)
# Compute the bbox of the polygon
x_coords = [s[0::2] for s in sequences]
y_coords = [s[1::2] for s in sequences]
min_x: float = np.min([np.min(x_coord) for x_coord in x_coords])
min_y: float = np.min([np.min(y_coord) for y_coord in y_coords])
max_x: float = np.max([np.max(x_coord) for x_coord in x_coords])
max_y: float = np.max([np.max(y_coord) for y_coord in y_coords])
# Clamp the coordinates to the image dimensions
min_x = max(0, min_x)
min_y = max(0, min_y)
max_x = min(target["width"] - 1, max_x)
max_y = min(target["height"] - 1, max_y)
assert min_x < max_x and min_y < max_y
# Convert to XYWH
w: float = max_x - min_x
h: float = max_y - min_y
# Compute the area of the polygon
# TODO: fix with additive/subtractive paths in complex polygons
poly_area: float = np.sum(
[
polygon_area(x_coord, y_coord)
for x_coord, y_coord in zip(x_coords, y_coords)
]
)
# Create and append the new entry for this annotation
annotations.append(
{
"category_id": self.classes.index(annotation.annotation_class.name),
"segmentation": sequences,
"bbox": [min_x, min_y, w, h],
"area": poly_area,
}
)
target["annotations"] = annotations
return target
def measure_weights(self) -> np.ndarray:
"""
Computes the class-balancing weights (not the frequencies) over the train set.
The weights are proportional to the inverse of the class frequencies and sum to 1.
Returns
-------
class_weights : np.ndarray[float]
Weight for each class in the train set, as a normalized 1D array.
"""
# Collect all the labels by iterating over the whole dataset
labels: List[int] = []
for i, _ in enumerate(self.images_path):
target = self.get_target(i)
labels.extend([a["category_id"] for a in target["annotations"]])
return self._compute_weights(labels)
class SemanticSegmentationDataset(LocalDataset):
"""
Represents an instance of a LocalDataset used for training on semantic segmentation tasks.
Parameters
----------
transform : Optional[Union[List[Callable], Callable]], default: None
torchvision function or list to set the ``transform`` attribute. If it is a list, it will
be composed via torchvision.
Attributes
----------
transform : Optional[Callable], default: None
torchvision transform function(s) to run on the dataset.
convert_polygons : ConvertPolygonsToSemanticMask
Object used to convert polygons to semantic masks.
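Examples
--------
A sketch with a placeholder slug, assuming the dataset was pulled locally;
each item pairs an image tensor with a target dict containing the mask:
>>> dataset = get_dataset("my-team/my-dataset", dataset_type="semantic-segmentation")
>>> img, target = dataset[0]
>>> mask = target["mask"]  # (H, W) tensor where each pixel encodes a class label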
"""
def __init__(
self, transform: Optional[Union[List[Callable], Callable]] = None, **kwargs
):
super().__init__(annotation_type="polygon", **kwargs)
if "__background__" not in self.classes:
self.classes.insert(0, "__background__")
self.num_classes += 1
if transform is not None and isinstance(transform, list):
transform = Compose(transform)
self.transform: Optional[Callable] = transform
self.convert_polygons = ConvertPolygonsToSemanticMask()
def __getitem__(self, index: int) -> Tuple[Tensor, Dict[str, Any]]:
"""
See superclass for documentation.
Notes
-----
The return value is a dict with the following fields:
image_id : int
Index of the image inside the dataset
image_path: str
The path to the image on the file system
mask : tensor(H, W)
Segmentation mask where each pixel encodes a class label
"""
img: PILImage.Image = self.get_image(index)
target: Dict[str, Any] = self.get_target(index)
img, target = self.convert_polygons(img, target)
if self.transform is not None:
img_tensor, target = self.transform(img, target)
else:
img_tensor = to_tensor(img)
return img_tensor, target
def get_target(self, index: int) -> Dict[str, Any]:
"""
Builds and returns the target dictionary for the item at the given index.
The returned dictionary has the following structure:
.. code-block:: python
{
"annotations": [
{
"category_id": int,
"segmentation": List[List[float | int]]
}
]
}
Parameters
----------
index : int
The actual index of the item in the ``Dataset``.
Returns
-------
Dict[str, Any]
The target.
"""
target = self.parse_json(index)
annotations: List[Dict[str, Union[int, List[List[Union[int, float]]]]]] = []
for obj in target["annotations"]:
if "paths" in obj.data:
paths = obj.data["paths"]
else:
paths = [obj.data["path"]]
for path in paths:
sequences = convert_polygons_to_sequences(
path,
height=target["height"],
width=target["width"],
)
# Discard polygons with fewer than three points (a flattened sequence
# of n points holds 2n values, so we require at least 6)
sequences[:] = [s for s in sequences if len(s) >= 6]
if not sequences:
continue
annotations.append(
{
"category_id": self.classes.index(obj.annotation_class.name),
"segmentation": sequences,
}
)
target["annotations"] = annotations
return target
def measure_weights(self) -> np.ndarray:
"""
Computes the class-balancing weights (not the frequencies) over the train set.
The weights are proportional to the inverse of the class frequencies and sum to 1.
Returns
-------
class_weights : np.ndarray[float]
Weight for each class in the train set, as a normalized 1D array.
"""
# Collect all the labels by iterating over the whole dataset,
# seeding with the background class since it never appears as an annotation
BACKGROUND_CLASS: int = 0
labels = [BACKGROUND_CLASS]
for i, _ in enumerate(self.images_path):
target = self.get_target(i)
labels.extend([a["category_id"] for a in target["annotations"]])
return self._compute_weights(labels)
class ObjectDetectionDataset(LocalDataset):
"""
Represents an instance of a LocalDataset used for training on object detection tasks.
Parameters
----------
transform : Optional[Union[List[Callable], Callable]], default: None
torchvision function or list to set the ``transform`` attribute. If it is a list, it will
be composed via torchvision.
Attributes
----------
transform : Optional[Callable], default: None
torchvision transform function(s) to run on the dataset.
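Examples
--------
A sketch of iterating the dataset; the slug below is a placeholder for a
dataset that has already been pulled locally:
>>> dataset = get_dataset("my-team/my-dataset", dataset_type="object-detection")
>>> img, target = dataset[0]
>>> boxes = target["boxes"]  # (n, 4) tensor in [x, y, w, h] (COCO) format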
"""
def __init__(self, transform: Optional[Union[List[Callable], Callable]] = None, **kwargs):
super().__init__(annotation_type="bounding_box", **kwargs)
if transform is not None and isinstance(transform, list):
transform = Compose(transform)
self.transform: Optional[Callable] = transform
def __getitem__(self, index: int) -> Tuple[Tensor, Dict[str, Any]]:
"""
Notes
-----
The return value is a dict with the following fields:
image_id : int
Index of the image inside the dataset
image_path: str
The path to the image on the file system
labels : tensor(n)
The class label of each one of the instances
boxes : tensor(n, 4)
Coordinates of the bounding box enclosing the instances as [x, y, w, h] (COCO format)
area : float
Area in pixels of each one of the instances
"""
img: PILImage.Image = self.get_image(index)
target: Dict[str, Any] = self.get_target(index)
width, height = img.size
target = clamp_bbox_to_image_size(target, width, height)
if self.transform is not None:
img_tensor, target = self.transform(img, target)
else:
img_tensor = to_tensor(img)
return img_tensor, target
def get_target(self, index: int) -> Dict[str, Tensor]:
"""
Builds and returns the target dictionary for the item at the given index.
The returned dictionary has the following structure:
.. code-block:: python
{
"boxes": Tensor,
"area": Tensor,
"labels": Tensor,
"image_id": Tensor,
"iscrowd": Tensor
}
Parameters
----------
index : int
The actual index of the item in the ``Dataset``.
Returns
-------
Dict[str, Tensor]
The target.
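Examples
--------
The boxes are stored as ``[x, y, w, h]``, while torchvision detection
models expect ``[x1, y1, x2, y2]``; a conversion sketch using
``torchvision.ops.box_convert`` might look like:
>>> from torchvision.ops import box_convert
>>> target = dataset.get_target(0)
>>> xyxy = box_convert(target["boxes"], in_fmt="xywh", out_fmt="xyxy")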
"""
target = self.parse_json(index)
annotations = target.pop("annotations")
targets = []
for annotation in annotations:
bbox = (
annotation.data
if annotation.annotation_class.annotation_type == "bounding_box"
else annotation.data["bounding_box"]
)
x = bbox["x"]
y = bbox["y"]
w = bbox["w"]
h = bbox["h"]
bbox = torch.tensor([x, y, w, h])
area = bbox[2] * bbox[3]
label = torch.tensor(self.classes.index(annotation.annotation_class.name))
ann = {"bbox": bbox, "area": area, "label": label}
targets.append(ann)
# following https://pytorch.org/tutorials/intermediate/torchvision_tutorial.html
stacked_targets = {
"boxes": torch.stack([v["bbox"] for v in targets]),
"area": torch.stack([v["area"] for v in targets]),
"labels": torch.stack([v["label"] for v in targets]),
"image_id": torch.tensor([index]),
}
stacked_targets["iscrowd"] = torch.zeros_like(stacked_targets["labels"])
return stacked_targets
def measure_weights(self) -> np.ndarray:
"""
Computes the class-balancing weights (not the frequencies) over the train set.
The weights are proportional to the inverse of the class frequencies and sum to 1.
Returns
-------
class_weights : np.ndarray[float]
Weight for each class in the train set, as a normalized 1D array.
"""
# Collect all the labels by iterating over the whole dataset
labels = []
for i, _ in enumerate(self.images_path):
target = self.get_target(i)
labels.extend(target["labels"].tolist())
return self._compute_weights(labels)