"""Source code for darwin.client."""

import json
import logging
import os
import zlib
from datetime import datetime
from logging import Logger
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Union, cast

import requests
from requests import Response
from requests.adapters import HTTPAdapter
from requests.exceptions import HTTPError
from tenacity import (
    RetryCallState,
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    stop_after_delay,
    wait_exponential_jitter,
)
from tenacity.wait import wait_exponential

from darwin.backend_v2 import BackendV2
from darwin.config import Config
from darwin.dataset.identifier import DatasetIdentifier
from darwin.dataset.remote_dataset import RemoteDataset
from darwin.dataset.remote_dataset_v2 import RemoteDatasetV2
from darwin.datatypes import (
    AnnotatorReportGrouping,
    DarwinVersionNumber,
    Feature,
    ObjectStore,
    ReportJob,
    Team,
    UnknownType,
)
from darwin.exceptions import (
    InsufficientStorage,
    InvalidLogin,
    MissingConfig,
    NameTaken,
    NotFound,
    RequestEntitySizeExceeded,
    Unauthorized,
    ValidationError,
)
from darwin.future.core.client import ClientCore, DarwinConfig
from darwin.future.core.properties import create_property as create_property_future
from darwin.future.core.properties import (
    get_team_full_properties as get_team_full_properties_future,
)
from darwin.future.core.properties import (
    get_team_properties as get_team_properties_future,
)
from darwin.future.core.properties import update_property as update_property_future
from darwin.future.core.types.common import JSONDict
from darwin.future.data_objects.properties import FullProperty
from darwin.utils import (
    get_response_content,
    has_json_content_type,
    is_project_dir,
    urljoin,
)
from darwin.utils.get_item_count import get_item_count

# Retry/backoff tuning for HTTP requests; each value can be overridden
# through an environment variable.
INITIAL_WAIT = int(os.getenv("DARWIN_RETRY_INITIAL_WAIT", "60"))  # seconds before first retry
MAX_WAIT = int(os.getenv("DARWIN_RETRY_MAX_WAIT", "300"))  # cap on the exponential backoff, in seconds
MAX_RETRIES = int(os.getenv("DARWIN_RETRY_MAX_ATTEMPTS", "10"))  # total attempts before giving up

# One hour in seconds; used as the polling deadline for report jobs.
HOUR = 60 * 60


class JobPendingException(Exception):
    """Raised when a requested job is not finished or failed."""
def log_rate_limit_exceeded(retry_state: RetryCallState) -> None:
    """Print how long the next retry will wait after a rate-limit response.

    NOTE(review): looks like a tenacity ``before_sleep`` hook, but it is not
    referenced by the retry decorators visible in this module — confirm callers.

    Parameters
    ----------
    retry_state : RetryCallState
        The current state of the retry mechanism.
    """
    # next_action.sleep is the delay (seconds) tenacity is about to apply.
    wait_time = retry_state.next_action.sleep
    print(f"Rate limit exceeded. Retrying in {wait_time:.2f} seconds...")
def retry_if_status_code_429(retry_state: RetryCallState) -> bool:
    """Tell tenacity to retry when the failed call raised an HTTP 429.

    Parameters
    ----------
    retry_state : RetryCallState
        The current state of the retry mechanism.

    Returns
    -------
    bool
        True if the outcome was an ``HTTPError`` with status code 429,
        False otherwise.
    """
    exception = retry_state.outcome.exception()
    if isinstance(exception, HTTPError):
        response: Response = exception.response
        return response.status_code == 429
    return False
def retry_if_status_code_429_or_5xx(retry_state: RetryCallState) -> bool:
    """
    Decide whether a failed request should be retried.

    A retry is triggered for HTTP 429 (rate limit) and for the transient
    server errors 500, 502, 503 and 504.

    Parameters
    ----------
    retry_state : RetryCallState
        The current state of the retry mechanism

    Returns
    -------
    bool
        True if the request should be retried, False otherwise
    """
    retryable_statuses = {429, 500, 502, 503, 504}
    error = retry_state.outcome.exception()
    if not isinstance(error, HTTPError):
        return False
    return error.response.status_code in retryable_statuses
def log_retry_error(retry_state: RetryCallState) -> None:
    """
    Logs information about why a request is being retried.

    Parameters
    ----------
    retry_state : RetryCallState
        The current state of the retry mechanism
    """
    wait_time = retry_state.next_action.sleep
    error = retry_state.outcome.exception()
    if not isinstance(error, HTTPError):
        return
    response: Response = error.response
    # 429 gets a dedicated message; any other retried status is a server error.
    if response.status_code == 429:
        print(f"Rate limit exceeded. Retrying in {wait_time:.2f} seconds...")
    else:
        print(
            f"Server error {response.status_code}. Retrying in {wait_time:.2f} seconds..."
        )
class Client:
    def __init__(
        self,
        config: Config,
        default_team: Optional[str] = None,
        log: Optional[Logger] = None,
    ):
        """HTTP client for the Darwin (V7) API.

        Parameters
        ----------
        config : Config
            Configuration providing api endpoint, base url and team API keys.
        default_team : Optional[str]
            Team slug used when a call does not specify one; falls back to
            the config's ``global/default_team``.
        log : Optional[Logger]
            Logger to use; defaults to the ``"darwin"`` logger.
        """
        self.config: Config = config
        self.url: str = config.get("global/api_endpoint")
        self.base_url: str = config.get("global/base_url")
        self.default_team: str = default_team or config.get("global/default_team")
        self.features: Dict[str, List[Feature]] = {}
        self._newer_version: Optional[DarwinVersionNumber] = None

        # One pooled session is reused for every HTTPS call made by this client.
        self.session = requests.Session()
        self.session.mount("https://", HTTPAdapter(pool_maxsize=100))

        self.log: Logger = log if log is not None else logging.getLogger("darwin")

    @staticmethod
    def _get_item_count(dataset_dict: Dict[str, UnknownType]) -> int:
        """
        Returns the number of items in the dataset.

        Prefers the explicit ``num_items`` field; otherwise sums
        ``num_images`` and ``num_videos`` (a missing count contributes zero).

        Parameters
        ----------
        dataset_dict: Dict[str, UnknownType]
            The dataset dictionary.

        Returns
        -------
        int
            The number of items in the dataset.
        """
        explicit_count: Optional[int] = dataset_dict.get("num_items")
        if explicit_count is not None:
            return explicit_count
        image_count = dataset_dict.get("num_images") or 0
        video_count = dataset_dict.get("num_videos") or 0
        return image_count + video_count
[docs] def list_local_datasets(self, team_slug: Optional[str] = None) -> Iterator[Path]: """ Returns a list of all local folders which are detected as dataset. Parameters ---------- team_slug: Optional[str] The team slug of the dataset. Defaults to None. Returns ------- Iterator[Path] Iterator of all local datasets """ team_configs: List[Team] = [] if team_slug: team_data: Optional[Team] = self.config.get_team(team_slug) if team_data: team_configs.append(team_data) else: team_configs = self.config.get_all_teams() for team_config in team_configs: projects_team: Path = Path(team_config.datasets_dir) / team_config.slug for project_path in projects_team.glob("*"): if project_path.is_dir() and is_project_dir(project_path): yield Path(project_path)
[docs] def list_remote_datasets( self, team_slug: Optional[str] = None ) -> Iterator[RemoteDatasetV2]: """ Returns a list of all available datasets with the team currently authenticated against. Parameters ---------- team_slug: Optional[str] The team slug of the dataset. Defaults to None. Returns ------- Iterator[RemoteDataset] List of all remote datasets """ response: List[Dict[str, UnknownType]] = cast( List[Dict[str, UnknownType]], self._get("/datasets/", team_slug=team_slug) ) for dataset in response: yield RemoteDatasetV2( name=dataset["name"], slug=dataset["slug"], team=team_slug or self.default_team, dataset_id=dataset["id"], item_count=get_item_count(dataset), progress=dataset["progress"], client=self, )
    def get_remote_dataset(
        self, dataset_identifier: Union[str, DatasetIdentifier]
    ) -> RemoteDatasetV2:
        """
        Get a remote dataset based on its identifier.

        Parameters
        ----------
        dataset_identifier : Union[str, DatasetIdentifier]
            Identifier of the dataset. Can be the string version or a
            DatasetIdentifier object.

        Returns
        -------
        RemoteDatasetV2
            Initialized dataset.

        Raises
        -------
        NotFound
            If no dataset with the given identifier was found.
        """
        parsed_dataset_identifier: DatasetIdentifier = DatasetIdentifier.parse(
            dataset_identifier
        )
        if not parsed_dataset_identifier.team_slug:
            parsed_dataset_identifier.team_slug = self.default_team
        try:
            matching_datasets: List[RemoteDatasetV2] = [
                dataset
                for dataset in self.list_remote_datasets(
                    team_slug=parsed_dataset_identifier.team_slug
                )
                if dataset.slug == parsed_dataset_identifier.dataset_slug
            ]
        except Unauthorized:
            # There is a chance that we tried to access an open dataset:
            # fetch it directly by team/dataset slug instead of listing.
            dataset: Dict[str, UnknownType] = cast(
                Dict[str, UnknownType],
                self._get(
                    f"{parsed_dataset_identifier.team_slug}/{parsed_dataset_identifier.dataset_slug}"
                ),
            )
            # If there isn't a record of this team, create one (with an empty
            # API key, since open datasets need no authentication).
            if not self.config.get_team(
                parsed_dataset_identifier.team_slug, raise_on_invalid_team=False
            ):
                datasets_dir: Path = Path.home() / ".darwin" / "datasets"
                self.config.set_team(
                    team=parsed_dataset_identifier.team_slug,
                    api_key="",
                    datasets_dir=str(datasets_dir),
                )
            # Open datasets report no progress here; 0 is used as a placeholder.
            return RemoteDatasetV2(
                name=dataset["name"],
                slug=dataset["slug"],
                team=parsed_dataset_identifier.team_slug,
                dataset_id=dataset["id"],
                item_count=get_item_count(dataset),
                progress=0,
                client=self,
            )
        if not matching_datasets:
            raise NotFound(str(parsed_dataset_identifier))
        # An explicit version in the identifier selects that release.
        if parsed_dataset_identifier.version:
            matching_datasets[0].release = parsed_dataset_identifier.version
        return matching_datasets[0]
[docs] def create_dataset( self, name: str, team_slug: Optional[str] = None ) -> RemoteDataset: """ Create a remote dataset. Parameters ---------- name : str Name of the dataset to create. team_slug: Optional[str] Team slug of the team the dataset will belong to. Defaults to None. Returns ------- RemoteDataset The created dataset. """ dataset: Dict[str, UnknownType] = cast( Dict[str, UnknownType], self._post("/datasets", {"name": name}, team_slug=team_slug), ) return RemoteDatasetV2( name=dataset["name"], team=team_slug or self.default_team, slug=dataset["slug"], dataset_id=dataset["id"], item_count=get_item_count(dataset), progress=0, client=self, )
[docs] def archive_remote_dataset(self, dataset_id: int, team_slug: str) -> None: """ Archive (soft delete) a remote dataset. Parameters ---------- dataset_id: int Id of the dataset to archive. team_slug: str Team slug of the dataset. """ self._put(f"datasets/{dataset_id}/archive", payload={}, team_slug=team_slug)
[docs] def fetch_remote_classes( self, team_slug: Optional[str] = None ) -> List[Dict[str, UnknownType]]: """ Fetches all remote classes on the remote dataset. Parameters ---------- team_slug: Optional[str] The team slug of the dataset. Defaults to None. Returns ------- Dict[str, UnknownType] None if no information about the team is found, a List of Annotation classes otherwise. Raises ------ ValueError If no team was found. """ the_team: Optional[Team] = self.config.get_team(team_slug or self.default_team) if not the_team: raise ValueError("No team was found.") the_team_slug: str = the_team.slug response: Dict[str, UnknownType] = cast( Dict[str, UnknownType], self._get(f"/teams/{the_team_slug}/annotation_classes?include_tags=true"), ) return response["annotation_classes"]
[docs] def update_annotation_class( self, class_id: int, payload: Dict[str, UnknownType] ) -> Dict[str, UnknownType]: """ Updates the AnnotationClass with the given id. Parameters ---------- class_id: int The id of the AnnotationClass to update. payload: Dict[str, UnknownType] A dictionary with the changes to perform. Returns ------- Dict[str, UnknownType] A dictionary with the result of the operation. """ response: Dict[str, UnknownType] = cast( Dict[str, UnknownType], self._put(f"/annotation_classes/{class_id}", payload), ) return response
[docs] def create_annotation_class( self, dataset_id: int, type_ids: List[int], name: str ) -> Dict[str, UnknownType]: """ Creates an AnnotationClass. Parameters ---------- dataset_id: int The id of the dataset this AnnotationClass will belong to originaly. type_ids: List[int] A list of type ids for the annotations this class will hold. name: str The name of the annotation class. Returns ------- Dict[str, UnknownType] A dictionary with the result of the operation. """ response: Dict[str, UnknownType] = cast( Dict[str, UnknownType], self._post( "/annotation_classes", payload={ "dataset_id": dataset_id, "name": name, "metadata": {"_color": "auto"}, "annotation_type_ids": type_ids, "datasets": [{"id": dataset_id}], }, ), ) return response
[docs] def fetch_remote_attributes(self, dataset_id: int) -> List[Dict[str, UnknownType]]: """ Fetches all attributes remotely. Parameters ---------- dataset_id: int The id of the dataset with the attributes we want to fetch. Returns ------- List[Dict[str, UnknownType]] A List with the attributes, where each attribute is a dictionary. """ response: List[Dict[str, UnknownType]] = cast( List[Dict[str, UnknownType]], self._get(f"/datasets/{dataset_id}/attributes"), ) return response
[docs] def load_feature_flags(self, team_slug: Optional[str] = None) -> None: """ Loads in memory the set of current features enabled for a team. Parameters ---------- team_slug: Optional[str] Team slug of the team the dataset will belong to. Defaults to None. """ the_team: Optional[Team] = self.config.get_team(team_slug or self.default_team) if not the_team: return None the_team_slug: str = the_team.slug self.features[the_team_slug] = self.get_team_features(the_team_slug)
[docs] def get_team_features(self, team_slug: str) -> List[Feature]: """ Gets all the features for the given team together with their statuses. Parameters ---------- team_slug: Optional[str] Team slug of the team the dataset will belong to. Defaults to None. Returns ------- List[Feature] List of Features for the given team. """ response: List[Dict[str, UnknownType]] = cast( List[Dict[str, UnknownType]], self._get(f"/teams/{team_slug}/features") ) features: List[Feature] = [] for feature in response: features.append( Feature(name=str(feature["name"]), enabled=bool(feature["enabled"])) ) return features
[docs] def feature_enabled( self, feature_name: str, team_slug: Optional[str] = None ) -> bool: """ Returns whether or not a given feature is enabled for a team. Parameters ---------- feature_name: str The name of the feature. team_slug: Optional[str] Team slug of the team the dataset will belong to. Defaults to None. Returns ------- bool False if the given feature is not enabled OR the team was not found. True otherwise. """ the_team: Optional[Team] = self.config.get_team(team_slug or self.default_team) if not the_team: return False the_team_slug: str = the_team.slug if the_team_slug not in self.features: self.load_feature_flags(the_team_slug) team_features: List[Feature] = self.features[the_team_slug] for feature in team_features: if feature.name == feature_name: return feature.enabled return False
[docs] def get_datasets_dir(self, team_slug: Optional[str] = None) -> str: """ Gets the dataset directory of the specified team or the default one Parameters ---------- team_slug: Optional[str] Team slug of the team the dataset will belong to. Defaults to None. Returns ------- str Path of the datasets for the selected team or the default one, or None if the Team was not found. Raises ------ ValueError If no team was found. """ the_team: Optional[Team] = self.config.get_team(team_slug or self.default_team) if not the_team: raise ValueError("No team was found.") return the_team.datasets_dir
[docs] def set_datasets_dir( self, datasets_dir: Path, team_slug: Optional[str] = None ) -> None: """ Sets the dataset directory of the specified team or the default one. Parameters ---------- datasets_dir: Path Path to set as dataset directory of the team. team_slug: Optional[str] Team slug of the team the dataset will belong to. Defaults to None. """ self.config.put( f"teams/{team_slug or self.default_team}/datasets_dir", datasets_dir )
[docs] def annotation_types(self) -> List[Dict[str, UnknownType]]: """ Returns a list of annotation types. Returns ------ List[Dict[str, UnknownType]] A list with the annotation types as dictionaries. """ response: List[Dict[str, UnknownType]] = cast( List[Dict[str, UnknownType]], self._get("/annotation_types") ) return response
[docs] def get_annotators_report( self, dataset_ids: list[int], start: datetime, stop: datetime, group_by: list[AnnotatorReportGrouping], team_slug: Optional[str] = None, ) -> Response: """ Gets annotators the report for the given dataset. Parameters ---------- dataset_ids: int Ids of the datasets to include in the report. start : datetime.datetime Timezone aware report start DateTime stop : datetime.datetime Timezone aware report end DateTime group_by: list[AnnotatorReportGrouping] Non-empty list of grouping options for the report. team_slug: Optional[str] Team slug of the team the dataset will belong to. Defaults to None. Returns ------ Response The raw response of the report (CSV format) or None if the Team was not found. Raises ------ ValueError If no team was found. If start or stop parameters are not timezone aware. If no group_by options provided. """ if start.utcoffset() is None or stop.utcoffset() is None: raise ValueError( "start and stop parameters must be timezone aware (e.g. 
2024-11-04T00:00:00Z)" ) if not group_by: raise ValueError( f"At least one group_by option is required, any of: {[option.value for option in AnnotatorReportGrouping]}" ) the_team: Optional[Team] = self.config.get_team(team_slug or self.default_team) if not the_team: raise ValueError("No team was found.") the_team_slug: str = the_team.slug response_data = self._post( f"/v3/teams/{the_team_slug}/reports/annotator/jobs", { "start": start.isoformat(timespec="seconds"), "stop": stop.isoformat(timespec="seconds"), "dataset_ids": dataset_ids, "group_by": [option.value for option in group_by], "format": "csv", "metrics": [ "active_time", "total_annotations", "total_items_annotated", "time_per_item", "time_per_annotation", "review_pass_rate", ], }, the_team_slug, ) report_job = ReportJob.model_validate(response_data) finished_report_job = self.poll_pending_report_job(the_team_slug, report_job.id) assert isinstance(finished_report_job.url, str) return self._get_raw_from_full_url(finished_report_job.url, the_team_slug)
    @retry(
        reraise=True,
        wait=wait_exponential(max=MAX_WAIT),
        stop=stop_after_delay(2 * HOUR),
        retry=retry_if_exception_type(JobPendingException),
    )
    def poll_pending_report_job(self, team_slug: str, job_id: str) -> ReportJob:
        """Poll an annotator-report job until it reaches a terminal state.

        Retried with exponential backoff (via tenacity) for up to 2 hours
        while the job is still pending; the last exception is re-raised once
        the deadline expires.

        Parameters
        ----------
        team_slug : str
            Team slug of the team the report job belongs to.
        job_id : str
            Id of the report job to poll.

        Returns
        -------
        ReportJob
            The finished report job (its ``url`` points at the generated report).

        Raises
        ------
        ValueError
            If the report job failed server-side.
        JobPendingException
            If the job is still pending when the retry deadline expires.
        """
        job_status_url = f"/v3/teams/{team_slug}/reports/annotator/jobs/{job_id}"
        response_data = self._get(job_status_url, team_slug)
        report_job = ReportJob.model_validate(response_data)
        if report_job.status == "failed":
            raise ValueError("Building an annotator report failed, try again later.")
        # Any non-terminal status triggers another retry via the decorator above.
        if report_job.status != "finished":
            raise JobPendingException(
                f"Polling for generated report results timed out, job status can be requested manually: {urljoin(self.url, job_status_url)}"
            )
        return report_job
[docs] def fetch_binary(self, url: str) -> Response: """ Fetches binary data from the given url via a stream. Parameters ---------- url: str The full url to download the binary data. Returns ------- Response ``request``'s Response object. """ response: Response = self._get_raw_from_full_url(url, stream=True) return response
[docs] @classmethod def local(cls, team_slug: Optional[str] = None) -> "Client": """ Factory method to use the default configuration file to init the client Parameters ---------- team_slug: Optional[str] Team slug of the team the dataset will belong to. Defaults to None. Returns ------- Client The initialized client. """ config_path: Path = Path.home() / ".darwin" / "config.yaml" return Client.from_config(config_path, team_slug=team_slug)
[docs] @classmethod def from_config( cls, config_path: Path, team_slug: Optional[str] = None ) -> "Client": """ Factory method to create a client from the configuration file passed as parameter Parameters ---------- config_path : str Path to a configuration file to use to create the client team_slug: Optional[str] Team slug of the team the dataset will belong to. Defaults to None. Returns ------- Client The initialized client. """ if not config_path.exists(): raise MissingConfig() config = Config(config_path) return cls(config=config, default_team=team_slug)
[docs] @classmethod def from_guest(cls, datasets_dir: Optional[Path] = None) -> "Client": """ Factory method to create a client and access datasets as a guest. Parameters ---------- datasets_dir : Optional[Path] String where the client should be initialized from (aka the root path). Defaults to None. Returns ------- Client The initialized client. """ if datasets_dir is None: datasets_dir = Path.home() / ".darwin" / "datasets" config: Config = Config(path=None) config.set_global( api_endpoint=Client.default_api_url(), base_url=Client.default_base_url() ) return cls(config=config)
[docs] @classmethod def from_api_key( cls, api_key: str, datasets_dir: Optional[Path] = None ) -> "Client": """ Factory method to create a client given an API key. Parameters ---------- api_key: str API key to use to authenticate the client datasets_dir : Optional[Path] String where the client should be initialized from (aka the root path). Defaults to None. Returns ------- Client The initialized client. """ if not datasets_dir: datasets_dir = Path.home() / ".darwin" / "datasets" headers: Dict[str, str] = { "Content-Type": "application/json", "Authorization": f"ApiKey {api_key}", } api_url: str = Client.default_api_url() response: requests.Response = requests.get( urljoin(api_url, "/users/token_info"), headers=headers ) if not response.ok: raise InvalidLogin() data: Dict[str, UnknownType] = response.json() team: str = data["selected_team"]["slug"] config: Config = Config(path=None) config.set_team(team=team, api_key=api_key, datasets_dir=str(datasets_dir)) config.set_global(api_endpoint=api_url, base_url=Client.default_base_url()) return cls(config=config, default_team=team)
[docs] @staticmethod def default_api_url() -> str: """ Returns the default api url. Returns ------- str The default api url. """ return f"{Client.default_base_url()}/api/"
[docs] @staticmethod def default_base_url() -> str: """ Returns the default base url. Returns ------- str The default base url. """ return os.getenv("DARWIN_BASE_URL", "https://darwin.v7labs.com")
def _get_headers( self, team_slug: Optional[str] = None, compressed: bool = False, auth_token: Optional[bool] = False, ) -> Dict[str, str]: headers: Dict[str, str] = {"Content-Type": "application/json"} if auth_token: return headers api_key: Optional[str] = None team_config: Optional[Team] = self.config.get_team( team_slug or self.default_team, raise_on_invalid_team=False ) if team_config: api_key = team_config.api_key if api_key and len(api_key) > 0: headers["Authorization"] = f"ApiKey {api_key}" if compressed: headers["X-Darwin-Payload-Compression-Version"] = "1" from darwin.version import __version__ headers["User-Agent"] = f"darwin-py/{__version__}" return headers @retry( wait=wait_exponential_jitter(initial=INITIAL_WAIT, max=MAX_WAIT), stop=stop_after_attempt(MAX_RETRIES), retry=retry_if_status_code_429_or_5xx, before_sleep=log_retry_error, ) def _get_raw_from_full_url( self, url: str, team_slug: Optional[str] = None, stream: bool = False, auth_token: Optional[bool] = False, ) -> Response: response: Response = self.session.get( url, headers=self._get_headers(team_slug, auth_token=auth_token), stream=stream, ) self.log.debug( f"Client GET request response ({get_response_content(response)}) with status " f"({response.status_code}). 
" f"Client: ({self})" f"Request: (url={url})" ) self._raise_if_known_error(response, url) response.raise_for_status() return response def _get_raw( self, endpoint: str, team_slug: Optional[str] = None, stream: bool = False, ) -> Response: return self._get_raw_from_full_url( urljoin(self.url, endpoint), team_slug, stream=stream ) def _get( self, endpoint: str, team_slug: Optional[str] = None ) -> Union[Dict[str, UnknownType], List[Dict[str, UnknownType]]]: response = self._get_raw(endpoint, team_slug) return self._decode_response(response) @retry( wait=wait_exponential_jitter(initial=INITIAL_WAIT, max=MAX_WAIT), stop=stop_after_attempt(MAX_RETRIES), retry=retry_if_status_code_429_or_5xx, before_sleep=log_retry_error, ) def _put_raw( self, endpoint: str, payload: Dict[str, UnknownType], team_slug: Optional[str] = None, ) -> Response: response: requests.Response = self.session.put( urljoin(self.url, endpoint), json=payload, headers=self._get_headers(team_slug), ) self.log.debug( f"Client PUT request got response ({get_response_content(response)}) with status " f"({response.status_code}). 
" f"Client: ({self})" f"Request: (endpoint={endpoint}, payload={payload})" ) self._raise_if_known_error(response, urljoin(self.url, endpoint)) response.raise_for_status() return response def _put( self, endpoint: str, payload: Dict[str, UnknownType], team_slug: Optional[str] = None, ) -> Union[Dict[str, UnknownType], List[Dict[str, UnknownType]]]: response: Response = self._put_raw(endpoint, payload, team_slug) return self._decode_response(response) @retry( wait=wait_exponential_jitter(initial=INITIAL_WAIT, max=MAX_WAIT), stop=stop_after_attempt(MAX_RETRIES), retry=retry_if_status_code_429_or_5xx, before_sleep=log_retry_error, ) def _post_raw( self, endpoint: str, payload: Optional[Dict[str, UnknownType]] = None, team_slug: Optional[str] = None, ) -> Response: if payload is None: payload = {} compression_level = int( self.config.get("global/payload_compression_level", "0") ) if compression_level > 0: compressed_payload = zlib.compress( json.dumps(payload).encode("utf-8"), level=compression_level ) response: Response = requests.post( urljoin(self.url, endpoint), data=compressed_payload, headers=self._get_headers(team_slug, compressed=True), ) else: response: Response = requests.post( urljoin(self.url, endpoint), json=payload, headers=self._get_headers(team_slug), ) self.log.debug( f"Client POST request response ({get_response_content(response)}) with unexpected status " f"({response.status_code}). 
" f"Client: ({self})" f"Request: (endpoint={endpoint}, payload={payload})" ) self._raise_if_known_error(response, urljoin(self.url, endpoint)) response.raise_for_status() return response def _post( self, endpoint: str, payload: Optional[Dict[str, UnknownType]] = None, team_slug: Optional[str] = None, ) -> Union[Dict[str, UnknownType], List[Dict[str, UnknownType]]]: response: Response = self._post_raw(endpoint, payload, team_slug) return self._decode_response(response) @retry( wait=wait_exponential_jitter(initial=INITIAL_WAIT, max=MAX_WAIT), stop=stop_after_attempt(MAX_RETRIES), retry=retry_if_status_code_429_or_5xx, before_sleep=log_retry_error, ) def _delete( self, endpoint: str, payload: Optional[Dict[str, UnknownType]] = None, team_slug: Optional[str] = None, ) -> Union[Dict[str, UnknownType], List[Dict[str, UnknownType]]]: if payload is None: payload = {} response: requests.Response = self.session.delete( urljoin(self.url, endpoint), json=payload, headers=self._get_headers(team_slug), ) self.log.debug( f"Client DELETE request response ({get_response_content(response)}) with unexpected status " f"({response.status_code}). 
" f"Client: ({self})" f"Request: (endpoint={endpoint})" ) self._raise_if_known_error(response, urljoin(self.url, endpoint)) response.raise_for_status() return self._decode_response(response) def _raise_if_known_error(self, response: Response, url: str) -> None: if response.status_code == 401: raise Unauthorized() if response.status_code == 404: raise NotFound(url) if response.status_code == 413: raise RequestEntitySizeExceeded(url) if has_json_content_type(response): body = response.json() is_name_taken: Optional[bool] = None if isinstance(body, Dict): errors = body.get("errors") if errors and isinstance(errors, list): for error in errors: # we haven't really implemented this yet pass if errors and isinstance(errors, Dict): is_name_taken = errors.get("name") == ["has already been taken"] if response.status_code == 422: if is_name_taken: raise NameTaken raise ValidationError(body) if response.status_code == 429: error_code: Optional[str] = None try: error_code = response.json()["errors"]["code"] except Exception: pass if error_code == "INSUFFICIENT_REMAINING_STORAGE": raise InsufficientStorage() def _decode_response( self, response: requests.Response ) -> Union[Dict[str, UnknownType], List[Dict[str, UnknownType]]]: """Decode the response as JSON entry or return a dictionary with the error Parameters ---------- response: requests.Response Response to decode debug : bool Debugging flag. 
In this case failed requests get printed Returns ------- dict JSON decoded entry or error """ if "latest-darwin-py" in response.headers: self._handle_latest_darwin_py(response.headers["latest-darwin-py"]) try: return response.json() except ValueError: self.log.error(f"[ERROR {response.status_code}] {response.text}") response.close() return { "error": "Response is not JSON encoded", "status_code": response.status_code, "text": response.text, } def _handle_latest_darwin_py(self, server_latest_version: str) -> None: try: def parse_version(version: str) -> DarwinVersionNumber: (major, minor, patch) = version.split(".") return (int(major), int(minor), int(patch)) from darwin.version import __version__ current_version = parse_version(__version__) latest_version = parse_version(server_latest_version) if current_version >= latest_version: return self._newer_version = latest_version except Exception: pass @property def newer_darwin_version(self) -> Optional[DarwinVersionNumber]: return self._newer_version def __str__(self) -> str: return f"Client(default_team={self.default_team})" @property def api_v2(self) -> BackendV2: team = self.config.get_default_team() if not team: raise ValueError("No team was found.") return BackendV2(self, team.slug)
[docs] def get_team_properties( self, team_slug: Optional[str] = None, include_property_values: bool = True ) -> List[FullProperty]: darwin_config = DarwinConfig.from_old(self.config, team_slug) future_client = ClientCore(darwin_config) if not include_property_values: return get_team_properties_future( client=future_client, team_slug=team_slug or self.default_team, ) return get_team_full_properties_future( client=future_client, team_slug=team_slug or self.default_team, )
[docs] def create_property( self, team_slug: Optional[str], params: Union[FullProperty, JSONDict] ) -> FullProperty: darwin_config = DarwinConfig.from_old(self.config, team_slug) future_client = ClientCore(darwin_config) return create_property_future( client=future_client, team_slug=team_slug or self.default_team, params=params, )
[docs] def update_property( self, team_slug: Optional[str], params: Union[FullProperty, JSONDict] ) -> FullProperty: darwin_config = DarwinConfig.from_old(self.config, team_slug) future_client = ClientCore(darwin_config) return update_property_future( client=future_client, team_slug=team_slug or self.default_team, params=params, )
[docs] def get_external_storage( self, team_slug: Optional[str] = None, name: Optional[str] = None ) -> ObjectStore: """ Get an external storage connection by name. If no name is provided, the default team's external storage connection will be returned. Parameters ---------- team_slug: Optional[str] The team slug. name: Optional[str] The name of the external storage connection. Returns ------- Optional[ObjectStore] The external storage connection with the given name. Raises ------ ValueError If no external storage connection is found in the team. ValueError If no name is provided and the default external storage connection is read-only. ValueError If provided connection name is read-only. """ if not team_slug: team_slug = self.default_team connections = self.list_external_storage_connections(team_slug) if not connections: raise ValueError( f"No external storage connections found in the team: {team_slug}. Please configure one.\n\nGuidelines can be found here: https://docs.v7labs.com/docs/external-storage-configuration" ) # If no name is provided, return the default connection if name is None: for connection in connections: if connection.default: if connection.readonly: raise ValueError( "The default external storage connection is read-only. darwin-py only supports read-write configuration.\n\nPlease use the REST API to register items from read-only storage: https://docs.v7labs.com/docs/registering-items-from-external-storage#read-only-registration" ) return connection # If a name is provided, return the connection with the given name for connection in connections: if connection.name == name: if connection.readonly: raise ValueError( "The selected external storage connection is read-only. 
darwin-py only supports read-write configuraiton.\n\nPlease use the REST API to register items from read-only storage: https://docs.v7labs.com/docs/registering-items-from-external-storage#read-only-registration" ) return connection raise ValueError( f"No external storage connection found with the name: {name} in the team {team_slug}. Please configure one.\n\nGuidelines can be found at https://docs.v7labs.com/docs/external-storage-configuration" )
[docs] def list_external_storage_connections(self, team_slug: str) -> List[ObjectStore]: """ Returns a list of all available external storage connections. Parameters ---------- team_slug: str The team slug. Returns ------- List[ObjectStore] A list of all available external storage connections. """ response: List[Dict[str, UnknownType]] = cast( List[Dict[str, UnknownType]], self._get(f"/teams/{team_slug}/storage"), ) return [ ObjectStore( name=connection["name"], prefix=connection["prefix"], readonly=connection["readonly"], provider=connection["provider"], default=connection["default"], ) for connection in response ]