Source code for darwin.dataset.upload_manager

from __future__ import annotations
import concurrent.futures
import os
import time
from dataclasses import dataclass
from enum import Enum
from pathlib import Path, PurePosixPath
import zipfile
import tempfile
from typing import (
    TYPE_CHECKING,
    Any,
    BinaryIO,
    Callable,
    Iterator,
    List,
    Optional,
    Set,
    Tuple,
    Dict,
)
import requests

from darwin.datatypes import PathLike, Slot, SourceFile
from darwin.doc_enum import DocEnum
from darwin.path_utils import construct_full_path
from darwin.utils import chunk
from darwin.utils.utils import is_image_extension_allowed_by_filename, SLOTS_GRID_MAP

if TYPE_CHECKING:
    from darwin.client import Client
    from darwin.dataset import RemoteDataset
    from darwin.dataset.identifier import DatasetIdentifier

from abc import ABC, abstractmethod


class ItemMergeMode(Enum):
    SLOTS = "slots"
    SERIES = "series"
    CHANNELS = "channels"
class ItemPayload:
    """
    Represents an item's payload.

    Parameters
    ----------
    dataset_item_id : int
        The id of this dataset item.
    filename : str
        The filename of the file this ``ItemPayload`` represents.
    path : str
        The path to ``filename``.
    reasons : Optional[List[str]], default: None
        A per-slot reason to upload this ``ItemPayload``.
    slots : List[Dict[str, str]]
        The slot payloads returned by the API for this item.

    Attributes
    ----------
    dataset_item_id : int
        The id of this dataset item.
    filename : str
        The filename of the file this ``ItemPayload`` represents.
    path : str
        The path to ``filename``.
    """

    def __init__(
        self,
        *,
        dataset_item_id: int,
        filename: str,
        path: str,
        reasons: Optional[List[str]] = None,
        slots: List[Dict[str, str]],
    ):
        self.dataset_item_id = dataset_item_id
        self.filename = filename
        self.path = PurePosixPath(path).as_posix()
        self.slots = [
            Slot(
                type=slot["type"],
                source_files=[SourceFile(file_name=slot["file_name"])],
                name=slot["slot_name"],
                upload_id=slot["upload_id"] if "upload_id" in slot else None,
                reason=slot["reason"] if "reason" in slot else None,
            )
            for slot in slots
        ]
    @staticmethod
    def parse_v2(payload):
        return ItemPayload(
            dataset_item_id=payload.get("id", None),
            filename=payload["name"],
            path=payload["path"],
            reasons=[slot.get("reason", None) for slot in payload["slots"]],
            slots=payload["slots"],
        )
    @property
    def full_path(self) -> str:
        """The full ``Path`` (with filename included) to the file."""
        return construct_full_path(self.path, self.filename)
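
# Illustrative sketch (not part of the library source): how a registration response
# item returned by the API maps onto an ``ItemPayload`` via ``parse_v2``. All field
# values below are made up for demonstration purposes.
def _example_item_payload() -> ItemPayload:
    api_item = {
        "id": 42,
        "name": "image_1.jpg",
        "path": "/",
        "slots": [
            {
                "type": "image",
                "file_name": "image_1.jpg",
                "slot_name": "0",
                "upload_id": "a-fake-upload-id",
            }
        ],
    }
    payload = ItemPayload.parse_v2(api_item)
    # payload.full_path -> "/image_1.jpg"
    return payload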
class UploadStage(DocEnum):
    """
    The different stages of uploading a file.
    """

    REQUEST_SIGNATURE = 0, "First stage, when authentication is being performed."
    UPLOAD_TO_S3 = 1, "Second stage, when the file is being uploaded to S3."
    CONFIRM_UPLOAD_COMPLETE = (
        2,
        "Final stage, when we confirm the file was correctly uploaded.",
    )
    OTHER = 3, "If the stage of the upload process is unknown."
@dataclass
class UploadRequestError(Exception):
    """
    Error thrown when uploading a file fails with an unrecoverable error.
    """

    #: The ``Path`` of the file being uploaded.
    file_path: Path

    #: The ``UploadStage`` when the failure happened.
    stage: UploadStage

    #: The ``Exception`` that triggered this unrecoverable error.
    error: Optional[Exception] = None
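
# Illustrative sketch (not part of the library source): inspecting upload failures.
# ``UploadHandler`` (defined below) collects ``UploadRequestError``s in its ``errors``
# list instead of raising them, so callers typically report them after uploading.
def _example_report_errors(errors: List[UploadRequestError]) -> None:
    for error in errors:
        print(f"{error.file_path}: failed at stage {error.stage.name} ({error.error})")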
class LocalFile:
    """
    Represents a locally stored file.

    Parameters
    ----------
    local_path : PathLike
        The ``Path`` of the file.
    kwargs : Any
        Data relative to this file. Can be anything.

    Attributes
    ----------
    local_path : PathLike
        The ``Path`` of the file.
    data : Dict[str, str]
        Dictionary with metadata relative to this file. It has the following format:

        .. code-block:: python

            {
                "filename": "a_filename",
                "path": "a path"
            }

        - ``data["filename"]`` will hold the value passed as ``filename`` from ``kwargs`` or default to ``self.local_path.name``
        - ``data["path"]`` will hold the value passed as ``path`` from ``kwargs`` or default to ``"/"``
    """

    def __init__(
        self,
        local_path: PathLike,
        **kwargs,
    ):
        self.local_path = Path(local_path)
        self.data = kwargs
        self._type_check(kwargs)

    def _type_check(self, args) -> None:
        self.data["filename"] = args.get("filename") or self.local_path.name
        self.data["path"] = args.get("path") or "/"
    def serialize(self):
        return {
            "files": [{"file_name": self.data["filename"], "slot_name": "0"}],
            "name": self.data["filename"],
        }
    def serialize_darwin_json_v2(self):
        optional_properties = ["tags", "fps", "as_frames", "extract_views"]
        slot = {"file_name": self.data["filename"], "slot_name": "0"}
        for optional_property in optional_properties:
            if optional_property in self.data:
                slot[optional_property] = self.data.get(optional_property)

        return {
            "slots": [slot],
            "name": self.data["filename"],
            "path": self.data["path"],
        }
    @property
    def full_path(self) -> str:
        """The full ``Path`` (with filename included) to the item."""
        return construct_full_path(self.data["path"], self.data["filename"])
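
# Illustrative sketch (not part of the library source): the ``filename`` and ``path``
# kwargs override the defaults derived from ``local_path``. The paths used here are
# hypothetical.
def _example_local_file() -> Dict[str, Any]:
    local_file = LocalFile(
        "/data/images/cat.jpg", filename="cat_renamed.jpg", path="/animals"
    )
    # local_file.full_path -> "/animals/cat_renamed.jpg"
    # The serialized payload below contains a single slot named "0":
    # {"slots": [{"file_name": "cat_renamed.jpg", "slot_name": "0"}],
    #  "name": "cat_renamed.jpg", "path": "/animals"}
    return local_file.serialize_darwin_json_v2()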
class MultiFileItem:
    def __init__(
        self, directory: Path, files: List[Path], merge_mode: ItemMergeMode, fps: int
    ):
        self.directory = directory
        self.name = directory.name
        self.files = [LocalFile(file, fps=fps) for file in files]
        self.merge_mode = merge_mode
        self._prepare_local_files_and_create_layout()

    def _prepare_local_files_and_create_layout(self):
        """
        This function:
        - Ensures that the files to be uploaded are valid for the given merge mode
        - Creates a LayoutV3 object for `ItemMergeMode.SLOTS` & `ItemMergeMode.CHANNELS` items

        For `ItemMergeMode.SERIES` items:
        - Every slice is zipped into a single source file. This is necessary to upload individual DICOM slices as volumetric series
        - Layout is set to `None` because the files are zipped into a single source file

        Raises
        ------
        ValueError
            - If no DICOM files are found in the directory for `ItemMergeMode.SERIES` items
            - If the number of files is greater than 16 for `ItemMergeMode.CHANNELS` items
        """
        self.slot_names = []
        if self.merge_mode == ItemMergeMode.SLOTS:
            num_viewports = min(len(self.files), 16)
            slots_grid = SLOTS_GRID_MAP[num_viewports]
            self.layout = {
                "version": 3,
                "slots_grid": slots_grid,
            }
            self.slot_names = [str(i) for i in range(len(self.files))]
        elif self.merge_mode == ItemMergeMode.SERIES:
            self.files = [
                file for file in self.files if file.local_path.suffix.lower() == ".dcm"
            ]
            if not self.files:
                raise ValueError("No `.dcm` files found in 1st level of directory")
            self._create_series_zip()
            self.layout = None
            self.slot_names = ["0"]
        elif self.merge_mode == ItemMergeMode.CHANNELS:
            # Currently, only image files are supported in multi-channel items. This is planned to change in the future
            self.files = [
                file
                for file in self.files
                if is_image_extension_allowed_by_filename(str(file.local_path))
            ]
            if not self.files:
                raise ValueError(
                    "No supported filetypes found in 1st level of directory. Currently, multi-channel items only support images."
                )
            if len(self.files) > 16:
                raise ValueError(
                    f"No multi-channel item can have more than 16 files. The following directory has {len(self.files)} files: {self.directory}"
                )
            self.layout = {
                "version": 3,
                "slots_grid": [[[file.local_path.name for file in self.files]]],
            }
            self.slot_names = self.layout["slots_grid"][0][0]
    def serialize_darwin_json_v2(self):
        optional_properties = ["fps"]
        slots = []
        for idx, local_file in enumerate(self.files):
            slot = {
                "file_name": local_file.data["filename"],
                "slot_name": self.slot_names[idx],
            }
            for optional_property in optional_properties:
                if optional_property in local_file.data:
                    slot[optional_property] = local_file.data.get(optional_property)
            slots.append(slot)

        return {
            "slots": slots,
            "layout": self.layout,
            "name": self.name,
            "path": "/",
        }
    def _create_series_zip(self):
        """
        For a given series `MultiFileItem`:
        - Zip all `.dcm` files into a temporary zip file
        - Replace all `.dcm` files with the zip file

        This is necessary to upload individual DICOM slices as volumetric series
        """
        self._temp_zip_dir = tempfile.TemporaryDirectory()
        zip_path = Path(self._temp_zip_dir.name) / f"{self.name}.dcm"
        with zipfile.ZipFile(zip_path, "w") as zip_file:
            for local_file in self.files:
                zip_file.write(local_file.local_path, local_file.local_path.name)
        self.files = [LocalFile(zip_path)]

    @property
    def full_path(self) -> str:
        """The full ``Path`` (with filename included) to the item"""
        return "/" + self.name
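
# Illustrative sketch (not part of the library source): grouping the first-level files
# of a (hypothetical) directory into a single multi-slot item. With
# ``ItemMergeMode.SLOTS`` each file gets its own slot ("0", "1", ...) and a version-3
# layout is derived from ``SLOTS_GRID_MAP``; the ``fps`` value here is arbitrary.
def _example_multi_file_item(directory: Path) -> MultiFileItem:
    files = sorted(f for f in directory.iterdir() if f.is_file())
    item = MultiFileItem(directory, files, merge_mode=ItemMergeMode.SLOTS, fps=0)
    # item.serialize_darwin_json_v2() produces one slot per file plus the layout
    return item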
class FileMonitor(object):
    """
    Monitors the progress of a :class:`BufferedReader`.

    To use this monitor, you construct your :class:`BufferedReader` as you normally
    would, then construct this object with it as argument.

    Parameters
    ----------
    io : BinaryIO
        IO object used by this class. Dependency injection.
    file_size : int
        The size of the file in bytes.
    callback : Callable[["FileMonitor"], None]
        Callable function used by this class. Dependency injection via constructor.

    Attributes
    ----------
    io : BinaryIO
        IO object used by this class. Dependency injection.
    callback : Callable[["FileMonitor"], None]
        Callable function used by this class. Dependency injection.
    bytes_read : int
        Amount of bytes read from the IO.
    len : int
        Total size of the IO.
    """

    def __init__(
        self, io: BinaryIO, file_size: int, callback: Callable[["FileMonitor"], None]
    ):
        self.io: BinaryIO = io
        self.callback: Callable[["FileMonitor"], None] = callback
        self.bytes_read: int = 0
        self.len: int = file_size
    def read(self, size: int = -1) -> Any:
        """
        Reads the given amount of bytes from the configured IO and calls the configured
        callback for each block read. The callback is passed a reference to this
        object, which can be used to read the current ``self.bytes_read``.

        Parameters
        ----------
        size : int, default: -1
            The number of bytes to read. Defaults to -1, so all bytes until EOF are read.

        Returns
        -------
        data: Any
            Data read from the IO.
        """
        data: Any = self.io.read(size)
        self.bytes_read += len(data)
        self.callback(self)
        return data
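
# Illustrative sketch (not part of the library source): wrapping a file handle in a
# ``FileMonitor`` so that a callback observes upload progress. ``upload_url`` is a
# hypothetical pre-signed URL; ``requests.put`` streams from any object with ``read``.
def _example_monitored_put(file_path: Path, upload_url: str) -> None:
    def on_progress(monitor: FileMonitor) -> None:
        print(f"read {monitor.bytes_read}/{monitor.len} bytes")

    file_size = file_path.stat().st_size
    with file_path.open("rb") as handle:
        requests.put(upload_url, data=FileMonitor(handle, file_size, on_progress))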
ByteReadCallback = Callable[[Optional[str], float, float], None]
ProgressCallback = Callable[[int, float], None]
FileUploadCallback = Callable[[str, int, int], None]
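
# Illustrative sketch (not part of the library source): callables matching the aliases
# above, as they are invoked further down in this module. ``ByteReadCallback`` and
# ``FileUploadCallback`` receive (file_name, total_bytes, bytes_sent), while
# ``ProgressCallback`` receives (pending_file_count, advancement).
def _example_byte_read_callback(
    file_name: Optional[str], total_bytes: float, bytes_sent: float
) -> None:
    print(f"{file_name}: {bytes_sent}/{total_bytes} bytes sent")


def _example_progress_callback(pending_file_count: int, advancement: float) -> None:
    print(f"advanced {advancement} out of {pending_file_count} pending files")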
class UploadHandler(ABC):
    """
    Handles the upload of files into a ``RemoteDataset``, keeping track of failures.

    Parameters
    ----------
    dataset : RemoteDataset
        Target ``RemoteDataset`` where we want to upload our files to.
    local_files : List[LocalFile]
        List of ``LocalFile``\\s to be uploaded.
    multi_file_items : Optional[List[MultiFileItem]], default: None
        List of ``MultiFileItem``\\s to be uploaded.
    handle_as_slices : Optional[bool], default: False
        Flag forwarded to the registration request as the ``handle_as_slices`` option.

    Attributes
    ----------
    dataset : RemoteDataset
        Target ``RemoteDataset`` where we want to upload our files to.
    errors : List[UploadRequestError]
        List of errors that happened during the upload process.
    local_files : List[LocalFile]
        List of ``LocalFile``\\s to be uploaded.
    multi_file_items : List[MultiFileItem]
        List of ``MultiFileItem``\\s to be uploaded.
    blocked_items : List[ItemPayload]
        List of items that could not be uploaded.
    pending_items : List[ItemPayload]
        List of items waiting to be uploaded.
    """

    def __init__(
        self,
        dataset: "RemoteDataset",
        local_files: List[LocalFile],
        multi_file_items: Optional[List[MultiFileItem]] = None,
        handle_as_slices: Optional[bool] = False,
    ):
        self._progress: Optional[
            Iterator[Callable[[Optional[ByteReadCallback]], None]]
        ] = None
        self.multi_file_items = multi_file_items
        self.local_files = local_files
        self.dataset: RemoteDataset = dataset
        self.errors: List[UploadRequestError] = []
        self.blocked_items, self.pending_items = self._request_upload(
            handle_as_slices=handle_as_slices
        )
    @staticmethod
    def build(
        dataset: "RemoteDataset",
        local_files: List[LocalFile],
        handle_as_slices: Optional[bool] = False,
    ):
        return UploadHandlerV2(dataset, local_files, handle_as_slices=handle_as_slices)
    @property
    def client(self) -> "Client":
        """The ``Client`` used by this ``UploadHandler``\\'s ``RemoteDataset``."""
        return self.dataset.client

    @property
    def dataset_identifier(self) -> "DatasetIdentifier":
        """The ``DatasetIdentifier`` of this ``UploadHandler``\\'s ``RemoteDataset``."""
        return self.dataset.identifier

    @property
    def blocked_count(self) -> int:
        """Number of items that could not be uploaded successfully."""
        return len(self.blocked_items)

    @property
    def error_count(self) -> int:
        """Number of errors that prevented items from being uploaded."""
        return len(self.errors)

    @property
    def pending_count(self) -> int:
        """Number of items waiting to be uploaded."""
        return len(self.pending_items)

    @property
    def total_count(self) -> int:
        """Total number of blocked and pending items."""
        return self.pending_count + self.blocked_count

    @property
    def progress(self):
        """Current level of upload progress."""
        return self._progress
    def prepare_upload(
        self,
    ) -> Optional[Iterator[Callable[[Optional[ByteReadCallback]], None]]]:
        self._progress = self._upload_files()
        return self._progress
    def upload(
        self,
        multi_threaded: bool = True,
        progress_callback: Optional[ProgressCallback] = None,
        file_upload_callback: Optional[FileUploadCallback] = None,
        max_workers: Optional[int] = None,
    ) -> None:
        if not self._progress:
            self.prepare_upload()

        if progress_callback:
            progress_callback(self.pending_count, 0)

        # needed to ensure that we don't mark a file as completed twice
        file_complete: Set[str] = set()

        def callback(file_name, file_total_bytes, file_bytes_sent):
            if file_upload_callback:
                file_upload_callback(file_name, file_total_bytes, file_bytes_sent)

            if progress_callback:
                if (
                    file_total_bytes == file_bytes_sent
                    and file_name not in file_complete
                ):
                    file_complete.add(file_name)
                    progress_callback(self.pending_count, 1)

        if max_workers:
            if max_workers < 1:
                raise ValueError("max_workers must be greater than 0")
            elif max_workers > concurrent.futures.ThreadPoolExecutor()._max_workers:
                raise ValueError(
                    f"max_workers must be less than or equal to {concurrent.futures.ThreadPoolExecutor()._max_workers}"
                )

        if multi_threaded and self.progress:
            with concurrent.futures.ThreadPoolExecutor(
                max_workers=max_workers
            ) as executor:
                future_to_progress = {
                    executor.submit(f, callback) for f in self.progress
                }
                for future in concurrent.futures.as_completed(future_to_progress):
                    try:
                        future.result()
                    except Exception as exc:
                        print("exception", exc)
        elif self.progress:
            for file_to_upload in self.progress:
                file_to_upload(callback)
    @abstractmethod
    def _request_upload(
        self, handle_as_slices: Optional[bool] = False
    ) -> Tuple[List[ItemPayload], List[ItemPayload]]:
        pass

    @abstractmethod
    def _upload_files(self) -> Iterator[Callable[[Optional[ByteReadCallback]], None]]:
        pass

    @abstractmethod
    def _upload_file(
        self,
        dataset_item_id: int,
        file_path: Path,
        byte_read_callback: Optional[ByteReadCallback],
    ) -> None:
        pass
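
# Illustrative sketch (not part of the library source): a typical driver loop around an
# ``UploadHandler``. In the library this flow is normally driven by higher-level
# dataset code (e.g. ``RemoteDataset.push``); here ``dataset`` and ``local_files`` are
# assumed to exist already.
def _example_upload(dataset: "RemoteDataset", local_files: List[LocalFile]) -> None:
    handler = UploadHandler.build(dataset, local_files)

    def on_progress(pending_file_count: int, advancement: float) -> None:
        print(f"advanced {advancement} of {pending_file_count} pending files")

    handler.prepare_upload()
    handler.upload(multi_threaded=True, progress_callback=on_progress)

    for error in handler.errors:
        print(f"failed to upload {error.file_path} at stage {error.stage.name}")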
class UploadHandlerV2(UploadHandler):
    def __init__(
        self,
        dataset: "RemoteDataset",
        local_files: List[LocalFile],
        multi_file_items: Optional[List[MultiFileItem]] = None,
        handle_as_slices: Optional[bool] = False,
    ):
        super().__init__(
            dataset=dataset,
            local_files=local_files,
            multi_file_items=multi_file_items,
            handle_as_slices=handle_as_slices,
        )

    def _request_upload(
        self, handle_as_slices: Optional[bool] = False
    ) -> Tuple[List[ItemPayload], List[ItemPayload]]:
        blocked_items = []
        items = []
        chunk_size: int = _upload_chunk_size()
        single_file_items = self.local_files
        upload_payloads = []
        if self.multi_file_items:
            upload_payloads.extend(
                [
                    {
                        "items": [
                            file.serialize_darwin_json_v2() for file in file_chunk
                        ],
                        "options": {
                            "handle_as_slices": handle_as_slices,
                        },
                    }
                    for file_chunk in chunk(self.multi_file_items, chunk_size)
                ]
            )
            local_files_for_multi_file_items = [
                file
                for multi_file_item in self.multi_file_items
                for file in multi_file_item.files
            ]
            single_file_items = [
                file
                for file in single_file_items
                if file not in local_files_for_multi_file_items
            ]

        upload_payloads.extend(
            [
                {
                    "items": [file.serialize_darwin_json_v2() for file in file_chunk],
                    "options": {"handle_as_slices": handle_as_slices},
                }
                for file_chunk in chunk(single_file_items, chunk_size)
            ]
        )

        dataset_slug: str = self.dataset_identifier.dataset_slug
        team_slug: Optional[str] = self.dataset_identifier.team_slug

        for upload_payload in upload_payloads:
            data: Dict[str, Any] = self.client.api_v2.register_data(
                dataset_slug, upload_payload, team_slug=team_slug
            )
            blocked_items.extend(
                [ItemPayload.parse_v2(item) for item in data["blocked_items"]]
            )
            items.extend([ItemPayload.parse_v2(item) for item in data["items"]])
        return blocked_items, items

    def _upload_files(self) -> Iterator[Callable[[Optional[ByteReadCallback]], None]]:
        def upload_function(
            dataset_slug, local_path, upload_id
        ) -> Callable[[Optional[ByteReadCallback]], None]:
            return lambda byte_read_callback=None: self._upload_file(
                dataset_slug, local_path, upload_id, byte_read_callback
            )

        file_lookup = {file.full_path: file for file in self.local_files}
        for item in self.pending_items:
            for slot in item.slots:
                upload_id = slot.upload_id
                slot_path = (
                    Path(item.path) / Path(slot.source_files[0].file_name)
                ).as_posix()
                file = file_lookup.get(str(slot_path))
                if not file:
                    raise ValueError(
                        f"Cannot match {slot_path} from payload with files to upload"
                    )
                yield upload_function(
                    self.dataset.identifier.dataset_slug, file.local_path, upload_id
                )

    def _upload_file(
        self,
        dataset_slug: str,
        file_path: Path,
        upload_id: str,
        byte_read_callback: Optional[ByteReadCallback],
    ) -> None:
        try:
            self._do_upload_file(
                dataset_slug, file_path, upload_id, byte_read_callback
            )
        except UploadRequestError as e:
            self.errors.append(e)
        except Exception as e:
            self.errors.append(
                UploadRequestError(
                    file_path=file_path, stage=UploadStage.OTHER, error=e
                )
            )

    def _do_upload_file(
        self,
        dataset_slug: str,
        file_path: Path,
        upload_id: str,
        byte_read_callback: Optional[ByteReadCallback] = None,
    ) -> None:
        team_slug: Optional[str] = self.dataset_identifier.team_slug

        try:
            sign_response: Dict[str, Any] = self.client.api_v2.sign_upload(
                dataset_slug, upload_id, team_slug=team_slug
            )
        except Exception as e:
            raise UploadRequestError(
                file_path=file_path, stage=UploadStage.REQUEST_SIGNATURE, error=e
            )

        upload_url = sign_response["upload_url"]

        try:
            file_size = file_path.stat().st_size
            if byte_read_callback:
                byte_read_callback(str(file_path), file_size, 0)

            def callback(monitor):
                if byte_read_callback:
                    byte_read_callback(str(file_path), file_size, monitor.bytes_read)

            with file_path.open("rb") as m:
                monitor = FileMonitor(m, file_size, callback)

                retries = 0

                while retries < 5:
                    upload_response = requests.put(f"{upload_url}", data=monitor)
                    # If S3 is getting too many requests it will return 503; sleep and retry
                    if upload_response.status_code != 503:
                        break
                    time.sleep(2**retries)
                    retries += 1

                upload_response.raise_for_status()
        except Exception as e:
            raise UploadRequestError(
                file_path=file_path, stage=UploadStage.UPLOAD_TO_S3, error=e
            )

        try:
            self.client.api_v2.confirm_upload(
                dataset_slug, upload_id, team_slug=team_slug
            )
        except Exception as e:
            raise UploadRequestError(
                file_path=file_path, stage=UploadStage.CONFIRM_UPLOAD_COMPLETE, error=e
            )
DEFAULT_UPLOAD_CHUNK_SIZE: int = 500


def _upload_chunk_size() -> int:
    """
    Gets the chunk size to be used from the OS environment, or uses the default one if
    that is not possible. The default chunk size is 500.

    Returns
    -------
    int
        The chunk size to be used.
    """
    env_chunk: Optional[str] = os.getenv("DARWIN_UPLOAD_CHUNK_SIZE")
    if env_chunk is None:
        return DEFAULT_UPLOAD_CHUNK_SIZE

    try:
        return int(env_chunk)
    except ValueError:
        print("Cannot cast environment variable DARWIN_UPLOAD_CHUNK_SIZE to integer")
        print(f"Setting chunk size to {DEFAULT_UPLOAD_CHUNK_SIZE}")
        return DEFAULT_UPLOAD_CHUNK_SIZE
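
# Illustrative sketch (not part of the library source): overriding the registration
# chunk size through the environment before the upload handler registers items.
def _example_override_chunk_size() -> int:
    os.environ["DARWIN_UPLOAD_CHUNK_SIZE"] = "100"
    return _upload_chunk_size()  # -> 100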