
"""
Holds helper functions that deal with downloading videos and images.
"""

import functools
import os
import time
import urllib.parse
from collections import Counter
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Optional,
    Tuple,
    TYPE_CHECKING,
)

import numpy as np
import orjson as json
import requests
from PIL import Image
from rich.console import Console

import darwin.datatypes as dt
from darwin.dataset.utils import (
    sanitize_filename,
    SUPPORTED_IMAGE_EXTENSIONS,
    SUPPORTED_VIDEO_EXTENSIONS,
)
from darwin.datatypes import AnnotationFile
from darwin.exceptions import MissingDependency
from darwin.utils import (
    attempt_decode,
    get_response_content,
    has_json_content_type,
    is_file_extension_allowed,
    parse_darwin_json,
)

if TYPE_CHECKING:
    from darwin.client import Client


def download_all_images_from_annotations(
    client: "Client",
    annotations_path: Path,
    images_path: Path,
    force_replace: bool = False,
    remove_extra: bool = False,
    annotation_format: str = "json",
    use_folders: bool = True,
    video_frames: bool = False,
    force_slots: bool = False,
    ignore_slots: bool = False,
) -> Tuple[Callable[[], Iterable[Any]], int]:
    """
    Downloads all the images corresponding to a project.

    Parameters
    ----------
    client : Client
        Client of the current team
    annotations_path : Path
        Path where the annotations are located
    images_path : Path
        Path where to download the images
    force_replace : bool, default: False
        Forces the re-download of an existing image
    remove_extra : bool, default: False
        Removes local files that would not be overwritten by the release being pulled
    annotation_format : str, default: "json"
        Format of the annotations. Currently only JSON and XML are supported
    use_folders : bool, default: True
        Recreates the remote folder structure locally
    video_frames : bool, default: False
        Pulls video frame images instead of video files
    force_slots : bool, default: False
        Pulls all slots of items into a deeper file structure
        ({prefix}/{item_name}/{slot_name}/{file_name}). If False, only multi-slotted
        items and items whose slots contain multiple source files are downloaded
        into the deeper file structure
    ignore_slots : bool, default: False
        Pulls only the first slot of each item, bypassing the per-slot file structure

    Returns
    -------
    generator : function
        Generator for doing the actual downloads
    count : int
        The number of files

    Raises
    ------
    ValueError
        If the given annotation file is not in Darwin (json) or Pascal VOC (xml) format.
    """
    Path(images_path).mkdir(exist_ok=True)
    if annotation_format not in ["json", "xml"]:
        raise ValueError(f"Annotation format {annotation_format} not supported")

    # Collect the images that are already present in the images folder
    existing_images = {
        image
        for image in images_path.rglob("*")
        if is_file_extension_allowed(image.name)
    }

    annotations_to_download_path: List = []
    for annotation_path in annotations_path.glob(f"*.{annotation_format}"):
        annotation = parse_darwin_json(annotation_path, count=0)
        if annotation is None:
            continue

        if not force_replace:
            # Check the planned paths for the image against the existing images
            planned_image_paths = _get_planned_image_paths(
                annotation, images_path, use_folders
            )
            if all(
                planned_image_path in existing_images
                for planned_image_path in planned_image_paths
            ):
                continue

        if force_slots:
            force_slots_for_item = True
        else:
            force_slots_for_item = len(annotation.slots) > 1 or any(
                len(slot.source_files) > 1 for slot in annotation.slots
            )
        annotations_to_download_path.append((annotation_path, force_slots_for_item))

    if remove_extra:
        annotations = (
            parse_darwin_json(annotation_path)
            for annotation_path in annotations_path.glob(f"*.{annotation_format}")
        )
        release_image_paths = [
            _get_planned_image_paths(annotation, images_path, use_folders)
            for annotation in annotations
        ]
        # Flatten the list of per-annotation path lists
        if any(isinstance(i, list) for i in release_image_paths):
            release_image_paths = [
                item for sublist in release_image_paths for item in sublist
            ]
        for existing_image in existing_images:
            if existing_image not in release_image_paths:
                print(f"Removing {existing_image} as it is not part of this release")
                existing_image.unlink()
        _remove_empty_directories(images_path)

    # Create the generator with the partial functions
    download_functions: List = []
    for annotation_path, force_slots in annotations_to_download_path:
        file_download_functions = lazy_download_image_from_annotation(
            client,
            annotation_path,
            images_path,
            annotation_format,
            use_folders,
            video_frames,
            force_slots,
            ignore_slots,
        )
        download_functions.extend(file_download_functions)

    if not use_folders:
        _check_for_duplicate_local_filepaths(download_functions)

    return lambda: download_functions, len(download_functions)
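
# Usage sketch (illustrative, not part of this module): how a caller might
# consume the lazy downloader returned above. `Client.local()` and the example
# paths are assumptions for the sketch.
#
#   from pathlib import Path
#   from darwin.client import Client
#
#   client = Client.local()
#   downloader, count = download_all_images_from_annotations(
#       client,
#       annotations_path=Path("releases/latest/annotations"),
#       images_path=Path("releases/latest/images"),
#   )
#   print(f"Downloading {count} files")
#   for download in downloader():
#       download()  # each callable downloads one file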

def lazy_download_image_from_annotation(
    client: "Client",
    annotation_path: Path,
    images_path: Path,
    annotation_format: str,
    use_folders: bool,
    video_frames: bool,
    force_slots: bool,
    ignore_slots: bool = False,
) -> Iterable[Callable[[], None]]:
    """
    Returns functions to download an image given an annotation. Same as
    `download_image_from_annotation` but returns Callables that trigger the download
    instead of fetching files internally.

    Parameters
    ----------
    client : Client
        Client of the current team
    annotation_path : Path
        Path to the annotation file corresponding to the dataset file
    images_path : Path
        Path where to download the image
    annotation_format : str
        Format of the annotations. Currently only JSON is supported
    use_folders : bool
        Recreates the remote folder structure locally
    video_frames : bool
        Pulls video frame images instead of video files
    force_slots : bool
        Pulls all slots of items into a deeper file structure
        ({prefix}/{item_name}/{slot_name}/{file_name})
    ignore_slots : bool, default: False
        Pulls only the first slot of each item, bypassing the per-slot file structure

    Raises
    ------
    NotImplementedError
        If the format of the annotation is not supported.
    """
    if annotation_format == "json":
        return _download_image_from_json_annotation(
            client,
            annotation_path,
            images_path,
            use_folders,
            video_frames,
            force_slots,
            ignore_slots,
        )
    else:
        console = Console()
        console.print("[bold red]Unsupported file format. Please use 'json'.")
        raise NotImplementedError
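
# The laziness above comes from functools.partial: each returned callable
# captures its arguments when the download plan is built and performs the
# download only when invoked. A minimal sketch of the pattern (`_fetch` and
# `plan`, a list of (url, dest) pairs, are hypothetical names):
#
#   import functools
#
#   def _fetch(url: str, dest: Path) -> None:
#       ...  # download url to dest
#
#   jobs = [functools.partial(_fetch, url, dest) for url, dest in plan]
#   for job in jobs:
#       job()  # the network call happens here, not at planning time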

def _download_image_from_json_annotation(
    client: "Client",
    annotation_path: Path,
    image_path: Path,
    use_folders: bool,
    video_frames: bool,
    force_slots: bool,
    ignore_slots: bool = False,
) -> Iterable[Callable[[], None]]:
    annotation = parse_darwin_json(annotation_path, count=0)
    if annotation is None:
        return []

    # If we are using folders, extract the path for the image and create the folder if needed
    sub_path = annotation.remote_path if use_folders else Path("/")
    parent_path = Path(image_path) / Path(sub_path).relative_to(Path(sub_path).anchor)
    parent_path.mkdir(exist_ok=True, parents=True)

    annotation.slots.sort(key=lambda slot: slot.name or "0")
    if len(annotation.slots) > 0:
        if ignore_slots:
            return _download_single_slot_from_json_annotation(
                annotation,
                client,
                parent_path,
                annotation_path,
                video_frames,
                use_folders,
            )
        if force_slots:
            return _download_all_slots_from_json_annotation(
                annotation, client, parent_path, video_frames
            )
        else:
            return _download_single_slot_from_json_annotation(
                annotation,
                client,
                parent_path,
                annotation_path,
                video_frames,
                use_folders,
            )

    return []


def _download_all_slots_from_json_annotation(
    annotation: dt.AnnotationFile,
    client: "Client",
    parent_path: Path,
    video_frames: bool,
) -> Iterable[Callable[[], None]]:
    generator = []
    for slot in annotation.slots:
        if not slot.name:
            raise ValueError("Slot name is required to download all slots")
        slot_path = (
            parent_path
            / sanitize_filename(annotation.filename)
            / sanitize_filename(slot.name)
        )
        slot_path.mkdir(exist_ok=True, parents=True)
        if video_frames and slot.type != "image":
            video_path: Path = slot_path
            video_path.mkdir(exist_ok=True, parents=True)
            if not slot.frame_urls:
                segment_manifests = get_segment_manifests(slot, slot_path, client)
                for index, manifest in enumerate(segment_manifests):
                    if slot.segments is None:
                        raise ValueError("No segments found")
                    segment_url = slot.segments[index]["url"]
                    path = video_path / f".{index:07d}.ts"
                    generator.append(
                        functools.partial(
                            _download_and_extract_video_segment,
                            segment_url,
                            client,
                            path,
                            manifest,
                        )
                    )
            else:
                for i, frame_url in enumerate(slot.frame_urls or []):
                    path = video_path / f"{i:07d}.png"
                    generator.append(
                        functools.partial(_download_image, frame_url, path, client, slot)
                    )
        else:
            for upload in slot.source_files:
                file_path = slot_path / sanitize_filename(upload.file_name)
                generator.append(
                    functools.partial(
                        _download_image_with_trace,
                        annotation,
                        upload.url,
                        file_path,
                        client,
                    )
                )
    return generator


def _download_single_slot_from_json_annotation(
    annotation: dt.AnnotationFile,
    client: "Client",
    parent_path: Path,
    annotation_path: Path,
    video_frames: bool,
    use_folders: bool = True,
) -> Iterable[Callable[[], None]]:
    slot = annotation.slots[0]
    generator = []

    if video_frames and slot.type != "image":
        video_path: Path = parent_path / (
            annotation_path.stem if not use_folders else Path(annotation.filename).stem
        )
        video_path.mkdir(exist_ok=True, parents=True)

        # A missing frame_urls list indicates a long video, which is downloaded
        # through its segments and manifests instead of per-frame URLs
        if not slot.frame_urls:
            segment_manifests = get_segment_manifests(slot, video_path, client)
            for index, manifest in enumerate(segment_manifests):
                if slot.segments is None:
                    raise ValueError("No segments found")
                segment_url = slot.segments[index]["url"]
                path = video_path / f".{index:07d}.ts"
                generator.append(
                    functools.partial(
                        _download_and_extract_video_segment,
                        segment_url,
                        client,
                        path,
                        manifest,
                    )
                )
        else:
            for i, frame_url in enumerate(slot.frame_urls):
                path = video_path / f"{i:07d}.png"
                generator.append(
                    functools.partial(_download_image, frame_url, path, client, slot)
                )
    else:
        if len(slot.source_files) > 0:
            image = slot.source_files[0]
            image_url = image.url
            image_filename = image.file_name

            # .nii.gz is a double extension, so Path.suffix cannot be used directly
            if image_filename.endswith(".nii.gz"):
                suffix = ".nii.gz"
                stem = annotation.filename[: -len(suffix)]
            else:
                suffix = Path(image_filename).suffix
                stem = Path(annotation.filename).stem
            filename = str(Path(stem + suffix))
            image_path = parent_path / sanitize_filename(filename or annotation.filename)

            generator.append(
                functools.partial(
                    _download_image_with_trace,
                    annotation,
                    image_url,
                    image_path,
                    client,
                )
            )
    return generator


def _update_local_path(annotation: AnnotationFile, url, local_path):
    if annotation.version.major == 1:
        return

    # We modify the raw JSON, as the internal representation doesn't store all the data
    raw_annotation = attempt_decode(annotation.path)
    for slot in raw_annotation["item"]["slots"]:
        for source_file in slot["source_files"]:
            if source_file["url"] == url:
                source_file["local_path"] = str(local_path)
    with annotation.path.open(mode="w") as file:
        op = json.dumps(raw_annotation, option=json.OPT_INDENT_2).decode("utf-8")
        file.write(op)


def _download_image(
    url: str, path: Path, client: "Client", slot: Optional[dt.Slot] = None
) -> None:
    if path.exists():
        return
    TIMEOUT: int = 60
    start: float = time.time()

    transform_file_function = None
    if slot and slot.metadata and slot.metadata.get("colorspace") == "RG16":
        transform_file_function = _rg16_to_grayscale
    while True:
        # Signed ("token" in url) and unsigned URLs go through the same client call
        response: requests.Response = client._get_raw_from_full_url(url, stream=True)
        # Correct status: download image
        if response.ok and has_json_content_type(response):
            # This branch works around a V1 edge case where a video file from external
            # storage could be registered with multiple keys (one file consists of several others)
            _fetch_multiple_files(path, response, transform_file_function)
            return
        elif response.ok:
            _write_file(path, response, transform_file_function)
            return
        # Fatal-error status: fail
        if 400 <= response.status_code <= 499:
            raise Exception(
                f"Request to ({url}) failed. Status code: {response.status_code}, content:\n{get_response_content(response)}."
            )
        # Timeout
        if time.time() - start > TIMEOUT:
            raise Exception(f"Timeout url request ({url}) after {TIMEOUT} seconds.")
        time.sleep(1)


def _download_image_with_trace(annotation, image_url, image_path, client):
    _download_image(image_url, image_path, client)
    _update_local_path(annotation, image_url, image_path)


def _fetch_multiple_files(
    path: Path, response: requests.Response, transform_file_function=None
) -> None:
    obj = response.json()
    if "urls" not in obj:
        raise Exception(f"Malformed response: {obj}")
    urls = obj["urls"]
    # Remove the extension from the file path, e.g. /some/path/example.dcm -> /some/path/example,
    # and create a directory of that name
    dir_path = Path(path).with_suffix("")
    dir_path.mkdir(exist_ok=True, parents=True)
    for url in urls:
        # The filename is the last segment of the HTTP path
        filename = urllib.parse.urlparse(url).path.rsplit("/", 1)[-1]
        path = dir_path / filename
        response = requests.get(url, stream=True)
        if response.ok:
            _write_file(path, response, transform_file_function)
        else:
            raise Exception(
                f"Request to ({url}) failed. Status code: {response.status_code}, content:\n{get_response_content(response)}."
            )


def _write_file(
    path: Path, response: requests.Response, transform_file_function=None
) -> None:
    with open(str(path), "wb") as file:
        for chunk in response:
            file.write(chunk)
    if transform_file_function is not None:
        transform_file_function(path)


def _rg16_to_grayscale(path):
    # Decodes a custom 16-bit grayscale encoding stored on the (R, G) channels
    # into a regular 8-bit grayscale image,
    # e.g. R=0x12, G=0x34 -> 0x1234 -> 0x1234 / 65536 * 255 ≈ 0x12
    image = Image.open(path)
    image_2d_rgb = np.asarray(image)

    image_2d_r = np.uint16(image_2d_rgb[:, :, 0]) << 8
    image_2d_g = np.uint16(image_2d_rgb[:, :, 1])

    image_2d_gray = np.bitwise_or(image_2d_r, image_2d_g)
    image_2d_gray = image_2d_gray / (1 << 16) * 255

    new_image = Image.fromarray(np.uint8(image_2d_gray), mode="L")
    new_image.save(path)


def _download_and_extract_video_segment(
    url: str, client: "Client", path: Path, manifest: dt.SegmentManifest
) -> None:
    _download_video_segment_file(url, client, path)
    _extract_frames_from_segment(path, manifest)
    path.unlink()


def _extract_frames_from_segment(path: Path, manifest: dt.SegmentManifest) -> None:
    # Import cv2 lazily to avoid a hard dependency on OpenCV when the optional extra is not installed
    try:
        from cv2 import VideoCapture, imwrite  # pylint: disable=import-outside-toplevel
    except ImportError as e:
        raise MissingDependency(
            "Missing Dependency: OpenCV required for Video Extraction. Install with `pip install darwin-py\[ocv]`"
        ) from e
    cap = VideoCapture(str(path))

    # Read and save frames. This iterates over every frame because frame seeking
    # in OpenCV is not reliable or guaranteed.
    frames_to_extract = {
        item.frame: item.visible_frame for item in manifest.items if item.visibility
    }
    frame_index = 0
    while cap.isOpened():
        success, frame = cap.read()
        if frame is None:
            break
        if not success:
            raise ValueError(
                f"Failed to read frame {frame_index} from video segment {path}"
            )
        if frame_index in frames_to_extract:
            visible_frame = frames_to_extract.pop(frame_index)
            frame_path = path.parent / f"{visible_frame:07d}.png"
            imwrite(str(frame_path), frame)
            if not frames_to_extract:
                break
        frame_index += 1
    cap.release()


def _download_video_segment_file(url: str, client: "Client", path: Path) -> None:
    auth_token = "token" in url
    response = client._get_raw_from_full_url(url, stream=True, auth_token=auth_token)
    if not response.ok or (400 <= response.status_code <= 499):
        raise Exception(
            f"Request to ({url}) failed. Status code: {response.status_code}, content:\n{get_response_content(response)}."
        )
    # Write the segment to its (hidden) .ts path
    with open(str(path), "wb") as file:
        for chunk in response:
            file.write(chunk)
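
# The segment workflow above and the manifest helpers below fit together as
# follows: per-slot frame manifests are fetched as plain-text files, parsed into
# dt.SegmentManifest objects, and each segment is downloaded to a hidden
# ".NNNNNNN.ts" file from which the visible frames are extracted as
# "NNNNNNN.png". A condensed sketch mirroring the call sites above:
#
#   manifests = get_segment_manifests(slot, video_path, client)
#   for i, manifest in enumerate(manifests):
#       _download_and_extract_video_segment(
#           slot.segments[i]["url"], client, video_path / f".{i:07d}.ts", manifest
#       )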

def download_manifest_txts(
    urls: List[str], client: "Client", folder: Path
) -> List[Path]:
    paths = []
    for index, url in enumerate(urls):
        auth_token = "token" in url
        response = client._get_raw_from_full_url(url, stream=True, auth_token=auth_token)
        if not response.ok or (400 <= response.status_code <= 499):
            raise Exception(
                f"Request to ({url}) failed. Status code: {response.status_code}, content:\n{get_response_content(response)}."
            )
        if not response.content:
            raise Exception(f"Manifest file ({url}) is empty.")
        path = folder / f"manifest_{index + 1}.txt"
        with open(str(path), "wb") as file:
            file.write(response.content)
        paths.append(path)
    return paths

def get_segment_manifests(
    slot: dt.Slot, parent_path: Path, client: "Client"
) -> List[dt.SegmentManifest]:
    with TemporaryDirectory(dir=parent_path) as tmpdirname:
        tmpdir = Path(tmpdirname)
        if slot.frame_manifest is None:
            raise ValueError("No frame manifest found")
        frame_urls = [item["url"] for item in slot.frame_manifest]
        manifest_paths = download_manifest_txts(frame_urls, client, tmpdir)
        segment_manifests = _parse_manifests(manifest_paths, slot.name or "0")

    return segment_manifests
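
# The manifest files parsed below hold one "frame:segment:visibility:timestamp"
# record per line. Two example records (values illustrative):
#
#   0:0:1:0.0    -> frame 0, segment 0, visible, at t=0.0s
#   1:0:0:0.04   -> frame 1, segment 0, not visible (skipped during extraction)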

def _parse_manifests(paths: List[Path], slot: str) -> List[dt.SegmentManifest]:
    all_manifests: Dict[int, List[dt.ManifestItem]] = {}
    visible_frame_index = 0
    for path in paths:
        with open(path) as infile:
            for line in infile:
                frame, segment_str, visibility, timestamp = line.strip("\n").split(":")
                segment_int = int(segment_str)
                if segment_int not in all_manifests:
                    all_manifests[segment_int] = []
                if bool(int(visibility)):
                    all_manifests[segment_int].append(
                        dt.ManifestItem(
                            int(frame),
                            None,
                            segment_int,
                            True,
                            float(timestamp),
                            visible_frame_index,
                        )
                    )
                    visible_frame_index += 1
                else:
                    all_manifests[segment_int].append(
                        dt.ManifestItem(
                            int(frame), None, segment_int, False, float(timestamp), None
                        )
                    )

    # Create a list of segments, sorted by segment number, with all items sorted by frame number
    segments = []
    for segment_int, seg_manifests in all_manifests.items():
        seg_manifests.sort(key=lambda x: x.frame)
        segments.append(
            dt.SegmentManifest(
                slot=slot,
                segment=segment_int,
                total_frames=len(seg_manifests),
                items=seg_manifests,
            )
        )

    # Calculate the absolute frame number for each item, as manifests are per segment
    absolute_frame = 0
    for segment in segments:
        for item in segment.items:
            item.absolute_frame = absolute_frame
            absolute_frame += 1
    return segments


def _get_planned_image_paths(
    annotation: dt.AnnotationFile, images_path: Path, use_folders: bool
) -> List[Path]:
    """
    Returns the local path(s) that a dataset file will be downloaded to as part of a release.
    For multi-slotted items, returns one path for each slot.

    Parameters
    ----------
    annotation : AnnotationFile
        Annotation file corresponding to the dataset file
    images_path : Path
        Local directory where the dataset files will be downloaded to
    use_folders : bool
        Whether to recreate the remote folder structure locally for this release
    """
    file_paths = []
    filename = Path(annotation.filename)
    if len(annotation.slots) == 1 and len(annotation.slots[0].source_files) == 1:
        if use_folders and annotation.remote_path != "/":
            return [images_path / Path(annotation.remote_path.lstrip("/\\")) / filename]
        else:
            return [images_path / filename]
    else:
        for slot in annotation.slots:
            if len(slot.source_files) > 1:
                # Check that the item is either a DICOM series or a frame extracted from a video
                is_dicom_series = all(
                    source_file.file_name.endswith(".dcm")  # type: ignore
                    for source_file in slot.source_files
                )
                is_extracted_frame = (
                    len(slot.source_files) == 2
                    and any(
                        source_file.file_name.endswith(ext)  # type: ignore
                        for ext in SUPPORTED_VIDEO_EXTENSIONS
                        for source_file in slot.source_files
                    )
                    and any(
                        source_file.file_name.endswith(ext)  # type: ignore
                        for ext in SUPPORTED_IMAGE_EXTENSIONS
                        for source_file in slot.source_files
                    )
                )
                if is_extracted_frame:
                    # Select only the image if it's an extracted frame
                    frame_source_file = next(
                        source_file
                        for source_file in slot.source_files
                        if any(
                            source_file.file_name.endswith(ext)  # type: ignore
                            for ext in SUPPORTED_IMAGE_EXTENSIONS
                        )
                    )
                    slot.source_files = [frame_source_file]
                if not is_dicom_series and not is_extracted_frame:
                    raise ValueError(
                        "This slot contains data that is not a DICOM series or a frame extracted from a video"
                    )
            slot_name = Path(slot.name)
            for source_file in slot.source_files:
                file_name = source_file.file_name  # type: ignore
                if use_folders and annotation.remote_path != "/":
                    file_paths.append(
                        images_path
                        / Path(annotation.remote_path.lstrip("/\\"))
                        / filename
                        / slot_name
                        / file_name
                    )
                else:
                    file_paths.append(images_path / filename / slot_name / file_name)
    return file_paths


def _remove_empty_directories(images_path: Path) -> bool:
    """
    Recursively removes empty directories in the given path.

    Parameters
    ----------
    images_path : Path
        Path to remove empty subdirectories from

    Returns
    -------
    bool
        Whether the directory was empty and has been removed
    """
    entries = os.listdir(images_path)
    is_empty = True
    for entry in entries:
        full_path = Path(os.path.join(images_path, entry))
        if os.path.isdir(full_path):
            if not _remove_empty_directories(full_path):
                is_empty = False
        else:
            if entry == ".DS_Store":
                os.remove(full_path)
                print(f"Removed file: {full_path}")
            else:
                is_empty = False

    # If the directory is empty, remove it
    if is_empty and images_path != images_path.parent:
        os.rmdir(images_path)
        print(f"Removed empty directory: {images_path}")

    return is_empty


def _check_for_duplicate_local_filepaths(
    download_functions: List[Callable[[], None]],
) -> None:
    """
    If pulling a release without folders, check for duplicate filepaths in the download
    functions. This can arise when pulling a flat release with identically named dataset
    items in different folders.

    If duplicates are found, display a warning message but do not block the download.

    Parameters
    ----------
    download_functions : List[Callable[[], None]]
        A list of download functions. Each one is responsible for downloading a single file
    """
    # Each download function is a functools.partial whose third positional
    # argument is the local path the file will be written to
    image_paths_to_download = [
        download_function.args[2] for download_function in download_functions
    ]
    path_counts = Counter(image_paths_to_download)
    duplicate_download_paths = {
        path: count for path, count in path_counts.items() if count > 1
    }
    if duplicate_download_paths:
        console = Console()
        console.print(
            "[bold yellow]Warning: Identical filenames detected in your export release. \n\nYou are pulling a flat release with identically named dataset items.\nThe release will still be pulled, but to prevent overwriting your dataset files, please re-pull the release with the folder structure. This can be done as follows:[/bold yellow]"
        )
        console.print(
            "[bold yellow]- CLI: darwin dataset pull team_slug/dataset_slug --folders\n- SDK: dataset.pull(use_folders=True)[/bold yellow]\n"
        )
        console.print("[bold yellow]The following paths are duplicated:[/bold yellow]")
        for path, count in duplicate_download_paths.items():
            console.print(
                f"[bold yellow]- {path} is duplicated {count} times[/bold yellow]"
            )
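
# Example of the duplicate scenario the warning above targets (illustrative
# paths): two remote items "a/image.jpg" and "b/image.jpg" pulled with
# use_folders=False both plan to land at the same flat location, so the
# Counter reports a count of 2 (repr shown for a POSIX system).
#
#   from collections import Counter
#   from pathlib import Path
#
#   planned = [Path("images/image.jpg"), Path("images/image.jpg")]
#   Counter(planned)  # Counter({PosixPath('images/image.jpg'): 2})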