import argparse
import concurrent.futures
import datetime
import json
import os
import sys
import traceback
from glob import glob
from itertools import tee
from pathlib import Path
from typing import Dict, Iterator, List, NoReturn, Optional, Set, Union
import humanize
from rich.console import Console
from rich.live import Live
from rich.progress import (
BarColumn,
DownloadColumn,
Progress,
SpinnerColumn,
TaskID,
TextColumn,
TimeRemainingColumn,
TransferSpeedColumn,
)
from rich.table import Table
from rich.theme import Theme
from darwin.client import Client
from darwin.config import Config
from darwin.dataset import RemoteDataset
from darwin.dataset.identifier import DatasetIdentifier
from darwin.dataset.release import Release
from darwin.dataset.split_manager import split_dataset
from darwin.dataset.upload_manager import LocalFile
from darwin.dataset.utils import get_release_path
from darwin.datatypes import (
AnnotatorReportGrouping,
ExportParser,
ImportParser,
NumberLike,
PathLike,
Team,
UnknownType,
)
from darwin.exceptions import (
AnnotationFileValidationError,
IncompatibleOptions,
InvalidLogin,
MissingConfig,
MissingDependency,
MissingSchema,
NameTaken,
NotFound,
Unauthenticated,
UnknownAnnotationFileSchema,
UnrecognizableFileEncoding,
UnsupportedExportFormat,
UnsupportedFileType,
ValidationError,
)
from darwin.exporter import ExporterNotFoundError, export_annotations, get_exporter
from darwin.exporter.formats import supported_formats as export_formats
from darwin.extractor import video
from darwin.importer import ImporterNotFoundError, get_importer, import_annotations
from darwin.importer.formats import supported_formats as import_formats
from darwin.item import DatasetItem
from darwin.utils import (
BLOCKED_UPLOAD_ERROR_ALREADY_EXISTS,
find_files,
persist_client_configuration,
prompt,
secure_continue_request,
validate_file_against_schema,
)
def validate_api_key(api_key: str) -> None:
"""
Validates the given API key. Exits the application if it fails validation.
Parameters
----------
api_key: str
The API key to be validated.
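Examples
--------
A minimal sketch; the key below is the illustrative example key used in the
error messages, not a real credential:
>>> validate_api_key("DHMhAWr.BHucps-tKMAi6rWF1xieOpUvNe5WzrHP")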
"""
example_key = "DHMhAWr.BHucps-tKMAi6rWF1xieOpUvNe5WzrHP"
if len(api_key) != 40:
_error(f"Expected key to be 40 characters long\n(example: {example_key})")
if "." not in api_key:
_error(f"Expected key formatted as prefix . suffix\n(example: {example_key})")
if len(api_key.split(".")[0]) != 7:
_error(f"Expected key prefix to be 7 characters long\n(example: {example_key})")
def authenticate(
api_key: str,
default_team: Optional[Union[str, bool]] = None,
datasets_dir: Optional[Path] = None,
) -> Config:
"""
Authenticates the API key against the server and creates a configuration file for it.
Parameters
----------
api_key : str
API key to use for the client login.
default_team: Optional[Union[str, bool]]
Either a flag to make the authenticated team the default one, or the slug of the team to set as default. If None, the user is prompted. Defaults to None.
datasets_dir: Optional[Path]
Dataset directory on the file system. Defaults to None.
Returns
-------
Config
A configuration object to handle YAML files.
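Examples
--------
A minimal sketch, assuming the key is valid for your team (the key shown is the
illustrative example from ``validate_api_key``; the paths are placeholders):
>>> config = authenticate(
...     "DHMhAWr.BHucps-tKMAi6rWF1xieOpUvNe5WzrHP",
...     default_team=True,
...     datasets_dir=Path("~/.darwin/datasets"),
... )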
"""
# Resolve the home folder if the dataset_dir starts with ~ or ~user
validate_api_key(api_key)
try:
client = Client.from_api_key(api_key=api_key)
config_path = Path.home() / ".darwin" / "config.yaml"
config_path.parent.mkdir(exist_ok=True)
if default_team is None:
default_team_name = (
client.default_team
if input(f"Make {client.default_team} the default team? [y/N] ")
in ["Y", "y"]
else None
)
elif default_team is False:
default_team_name = None
elif default_team is True:
default_team_name = client.default_team
else:
default_team_name = default_team
if datasets_dir is None:
datasets_dir = Path(prompt("Datasets directory", "~/.darwin/datasets"))
datasets_dir = Path(datasets_dir).expanduser()
Path(datasets_dir).mkdir(parents=True, exist_ok=True)
client.set_datasets_dir(datasets_dir)
return persist_client_configuration(client, default_team=default_team_name)
except InvalidLogin:
_error("Invalid API key")
def current_team() -> None:
"""Print the team currently authenticated against."""
client: Client = _load_client()
print(client.default_team)
def list_teams() -> None:
"""Print a table of teams to which the client belong to."""
for team in _config().get_all_teams():
if team.default:
print(f"{team.slug} (default)")
else:
print(team.slug)
def set_team(team_slug: str) -> None:
"""
Switches the client to the selected team and persists the change in the configuration file.
Parameters
----------
team_slug : str
Slug of the team to switch to.
"""
config = _config()
config.set_default_team(team_slug)
def set_compression_level(compression_level: int) -> None:
"""
Changes the compression level of text/JSON content sent to the Darwin APIs and persists the change in the configuration file.
Ranges from 0 (no compression) to 9 (best compression). By default, 0 is used.
Parameters
----------
compression_level : int
Compression level to use.
"""
config = _config()
config.set_compression_level(compression_level)
def create_dataset(dataset_slug: str) -> None:
"""
Creates a dataset remotely. Exits the application if the dataset's name is already taken or is
not valid.
Parameters
----------
dataset_slug : str
Slug of the new dataset.
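Examples
--------
A minimal sketch; "my-team/my-dataset" is an illustrative slug, not a real dataset:
>>> create_dataset("my-team/my-dataset")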
"""
identifier: DatasetIdentifier = DatasetIdentifier.parse(dataset_slug)
client: Client = _load_client(team_slug=identifier.team_slug)
try:
dataset: RemoteDataset = client.create_dataset(name=identifier.dataset_slug)
print(
f"Dataset '{dataset.name}' ({dataset.team}/{dataset.slug}) has been created.\nAccess at {dataset.remote_path}"
)
print_new_version_info(client)
except NameTaken:
_error(f"Dataset name '{identifier.dataset_slug}' is already taken.")
except ValidationError:
_error(f"Dataset name '{identifier.dataset_slug}' is not valid.")
def local(team: Optional[str] = None) -> None:
"""
Lists locally synced datasets stored in the datasets directory.
Parameters
----------
team: Optional[str]
The name of the team to list, or the default one if no team is given. Defaults to None.
"""
table: Table = Table(show_header=True, header_style="bold cyan")
table.add_column("Name")
table.add_column("Image Count", justify="right")
table.add_column("Sync Date", justify="right")
table.add_column("Size", justify="right")
client: Client = _load_client()
for dataset_path in client.list_local_datasets(team_slug=team):
files_in_dataset_path = find_files([dataset_path])
table.add_row(
f"{dataset_path.parent.name}/{dataset_path.name}",
str(len(files_in_dataset_path)),
humanize.naturaldate(
datetime.datetime.fromtimestamp(dataset_path.stat().st_mtime)
),
humanize.naturalsize(sum(p.stat().st_size for p in files_in_dataset_path)),
)
Console().print(table)
def path(dataset_slug: str) -> Path:
"""
Returns the absolute path of the specified dataset.
Exits the application if the dataset does not exist locally.
Parameters
----------
dataset_slug: str
The dataset's slug.
Returns
-------
Path
The absolute path of the dataset.
"""
identifier: DatasetIdentifier = DatasetIdentifier.parse(dataset_slug)
client: Client = _load_client()
for path in client.list_local_datasets(team_slug=identifier.team_slug):
if identifier.dataset_slug == path.name:
return path
_error(
f"Dataset '{identifier.dataset_slug}' does not exist locally. "
f"Use 'darwin dataset remote' to see all the available datasets, "
f"and 'darwin dataset pull' to pull them."
)
def url(dataset_slug: str) -> None:
"""
Prints the URL of the specified dataset.
Exits the application if no dataset was found.
Parameters
----------
dataset_slug: str
The dataset's slug.
"""
client: Client = _load_client()
try:
remote_dataset: RemoteDataset = client.get_remote_dataset(
dataset_identifier=dataset_slug
)
print(remote_dataset.remote_path)
except NotFound as e:
_error(f"Dataset '{e.name}' does not exist.")
def export_dataset(
dataset_slug: str,
include_url_token: bool,
name: str,
annotation_class_ids: Optional[List[str]] = None,
include_authorship: bool = False,
version: Optional[str] = None,
) -> None:
"""
Create a new release for the dataset.
Parameters
----------
dataset_slug : str
Slug of the dataset to which we perform the operation on.
include_url_token : bool, default: False
If ``True`` includes the url token, if ``False`` does not.
name : str
Name of the release.
annotation_class_ids : Optional[List[str]], default: None
List of the classes to filter.
include_authorship : bool, default: False
If ``True`` include annotator and reviewer metadata for each annotation.
version : Optional[str], default: None
When used with a V2 dataset, forces generation of either Darwin JSON 1.0 (legacy) or the newer 2.0 format.
Omit this option to use your team's default.
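Examples
--------
A minimal sketch; the dataset slug and release name are illustrative:
>>> export_dataset("my-team/my-dataset", include_url_token=False, name="v1.0")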
"""
client: Client = _load_client()
identifier: DatasetIdentifier = DatasetIdentifier.parse(dataset_slug)
ds: RemoteDataset = client.get_remote_dataset(identifier)
try:
ds.export(
annotation_class_ids=annotation_class_ids,
name=name,
include_url_token=include_url_token,
include_authorship=include_authorship,
version=version,
)
except ValidationError:
_error("Nothing to export")
else:
identifier.version = name
print(f"Dataset {dataset_slug} successfully exported to {identifier}")
print_new_version_info(client)
def pull_dataset(
dataset_slug: str,
only_annotations: bool = False,
folders: bool = True,
video_frames: bool = False,
force_slots: bool = False,
ignore_slots: bool = False,
no_folders: bool = False,
retry: bool = False,
retry_timeout: int = 600,
retry_interval: int = 10,
) -> None:
"""
Downloads a remote dataset (images and annotations) in the datasets directory.
Exits the application if the dataset is not found, the user is not authenticated, there are no
releases, or the export format of the latest release is not supported.
Parameters
----------
dataset_slug: str
Slug of the dataset to which we perform the operation on.
only_annotations: bool
Download only the annotations and no corresponding images. Defaults to False.
folders: bool
Recreates the folders in the dataset. Defaults to True.
video_frames: bool
Pulls video frames images instead of video files. Defaults to False.
force_slots: bool
Pulls all slots of items into a deeper file structure ({prefix}/{item_name}/{slot_name}/{file_name}). Defaults to False.
no_folders: bool
Does not recreate the folders in the dataset. Defaults to False.
retry: bool
If True, will repeatedly try to download the release if it is still processing until the timeout is reached.
retry_timeout: int
If retrying, the total time to wait for the release to be ready for download. Defaults to 600.
retry_interval: int
If retrying, the time to wait between checks of whether the release is ready for download. Defaults to 10.
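Examples
--------
A minimal sketch; "my-team/my-dataset:latest" is an illustrative identifier:
>>> pull_dataset("my-team/my-dataset:latest", video_frames=True, retry=True)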
"""
version: str = DatasetIdentifier.parse(dataset_slug).version or "latest"
client: Client = _load_client(maybe_guest=True)
try:
dataset: RemoteDataset = client.get_remote_dataset(
dataset_identifier=dataset_slug
)
except NotFound:
_error(
f"Dataset '{dataset_slug}' does not exist, please check the spelling. "
"Use 'darwin remote' to list all the remote datasets."
)
except Unauthenticated:
_error("please re-authenticate")
if no_folders:
folders = False
try:
release: Release = dataset.get_release(version, retry)
dataset.pull(
release=release,
only_annotations=only_annotations,
use_folders=folders,
video_frames=video_frames,
force_slots=force_slots,
ignore_slots=ignore_slots,
retry=retry,
retry_timeout=retry_timeout,
retry_interval=retry_interval,
)
print_new_version_info(client)
except NotFound:
_error(
f"Version '{dataset.identifier}:{version}' does not exist. "
f"Use 'darwin dataset releases' to list all available versions."
)
except UnsupportedExportFormat as uef:
_error(
f"Version '{dataset.identifier}:{version}' is of format '{uef.format}', "
f"only the darwin formats ('json', 'darwin_json_2') are supported for `darwin dataset pull`"
)
except MissingDependency as e:
_error(str(e))
print(f"Dataset {release.identifier} downloaded at {dataset.local_path} .")
def split(
dataset_slug: str, val_percentage: float, test_percentage: float, seed: int = 0
) -> None:
"""
Splits a local version of a dataset into train, validation, and test partitions.
Parameters
----------
dataset_slug: str
Slug of the dataset to which we perform the operation on.
val_percentage: float
Percentage in the validation set.
test_percentage: float
Percentage in the test set.
seed: int
Random seed. Defaults to 0.
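Examples
--------
A minimal sketch; assumes "my-dataset" has already been pulled locally:
>>> split("my-dataset", val_percentage=0.1, test_percentage=0.2, seed=42)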
"""
identifier: DatasetIdentifier = DatasetIdentifier.parse(dataset_slug)
client: Client = _load_client()
for p in client.list_local_datasets(team_slug=identifier.team_slug):
if identifier.dataset_slug == p.name:
try:
split_path = split_dataset(
dataset_path=p,
release_name=identifier.version,
val_percentage=val_percentage,
test_percentage=test_percentage,
split_seed=seed,
)
print(f"Partition lists saved at {split_path}")
return
except ImportError as e:
_error(e.msg)
except NotFound as e:
_error(e.name)
except ValueError as e:
_error(e.args[0])
_error(
f"Dataset '{identifier.dataset_slug}' does not exist locally. "
f"Use 'darwin dataset remote' to see all the available datasets, "
f"and 'darwin dataset pull' to pull them."
)
def list_remote_datasets(all_teams: bool, team: Optional[str] = None) -> None:
"""
Lists remote datasets with their annotation progress.
Parameters
----------
all_teams: bool
If True, lists remote datasets from all teams; if False, lists only datasets from the given
team.
team: Optional[str]
Name of the team whose datasets we want to see. Uses the default team if none is given.
Defaults to None.
"""
# TODO: add listing open datasets
table: Table = Table(show_header=True, header_style="bold cyan")
table.add_column("Name")
table.add_column("Item Count", justify="right")
table.add_column("Complete Items", justify="right")
datasets: List[RemoteDataset] = []
client: Optional[Client] = None
if all_teams:
teams: List[Team] = _config().get_all_teams()
for a_team in teams:
client = _load_client(a_team.slug)
datasets += list(client.list_remote_datasets())
else:
client = _load_client(team)
datasets = list(client.list_remote_datasets())
for dataset in datasets:
table.add_row(
f"{dataset.team}/{dataset.slug}",
str(dataset.item_count),
f"{dataset.progress * 100:.1f}%",
)
if table.row_count == 0:
print("No dataset available.")
else:
Console().print(table)
print_new_version_info(client)
def remove_remote_dataset(dataset_slug: str) -> None:
"""
Removes a remote dataset from the workview. The dataset gets archived.
Exits the application if no dataset with the given slug is found.
Parameters
----------
dataset_slug: str
The dataset's slug.
"""
client: Client = _load_client()
try:
dataset: RemoteDataset = client.get_remote_dataset(
dataset_identifier=dataset_slug
)
print(f"About to delete {dataset.identifier} on darwin.")
if not secure_continue_request():
print("Cancelled.")
return
dataset.remove_remote()
print_new_version_info(client)
except NotFound:
_error(f"No dataset with name '{dataset_slug}'")
def dataset_list_releases(dataset_slug: str) -> None:
"""
Lists all the releases from the given dataset.
Exits the application if no dataset with the given slug is found.
Parameters
----------
dataset_slug: str
The dataset's slug.
"""
client: Client = _load_client()
try:
dataset: RemoteDataset = client.get_remote_dataset(
dataset_identifier=dataset_slug
)
releases: List[Release] = dataset.get_releases()
if len(releases) == 0:
print("No available releases, export one first.")
return
table: Table = Table(show_header=True, header_style="bold cyan")
table.add_column("Name")
table.add_column("Item Count", justify="right")
table.add_column("Class Count", justify="right")
table.add_column("Export Date", justify="right")
for release in releases:
if not release.available:
continue
table.add_row(
str(release.identifier),
str(release.image_count),
str(release.class_count),
str(release.export_date),
)
Console().print(table)
print_new_version_info(client)
except NotFound:
_error(f"No dataset with name '{dataset_slug}'")
def upload_data(
dataset_identifier: str,
files: Optional[List[Union[PathLike, LocalFile]]],
files_to_exclude: Optional[List[PathLike]],
fps: int,
path: Optional[str],
frames: bool,
extract_views: bool = False,
handle_as_slices: bool = False,
preserve_folders: bool = False,
verbose: bool = False,
item_merge_mode: Optional[str] = None,
) -> None:
"""
Uploads the provided files to the remote dataset.
Exits the application if no dataset with the given name is found, the files in the given path
have unsupported formats, or if there are no files found in the given path.
Parameters
----------
dataset_identifier : str
Slug of the dataset to retrieve.
files : List[Union[PathLike, LocalFile]]
List of files to upload. Can be None.
files_to_exclude : List[PathLike]
List of files to exclude from the file scan (which is done only if files is None).
fps : int
Frame rate to split videos in.
path : Optional[str]
If provided, files will be placed under this path in the V7 platform. If `preserve_folders`
is `True` then it must be possible to draw a relative path from this folder to the one the
files are in, otherwise an error will be raised.
frames : bool
Specify whether the files will be uploaded as a list of frames or not.
extract_views : bool
If providing a volume, specify whether to extract the orthogonal views or not.
handle_as_slices : bool
Whether to upload DICOM files as slices. Defaults to False.
preserve_folders : bool
Specify whether or not to preserve folder paths when uploading.
verbose : bool
Specify whether to print full traces when uploading files or not.
item_merge_mode : Optional[str]
If set, each file path passed to `files_to_upload` behaves as follows:
- Paths pointing directly to individual files are ignored
- Paths pointing to folders of files will be uploaded according to the following mode rules.
Note that folders will not be recursively searched, so only files in the first level of the folder will be uploaded:
- "slots": Each file in the folder will be uploaded to a different slot of the same item.
- "series": All `.dcm` files in the folder will be concatenated into a single slot. All other files are ignored.
- "channels": Each file in the folder will be uploaded to a different channel of the same item.
"""
client: Client = _load_client()
try:
max_workers: int = concurrent.futures.ThreadPoolExecutor()._max_workers # type: ignore
dataset: RemoteDataset = client.get_remote_dataset(
dataset_identifier=dataset_identifier
)
sync_metadata: Progress = Progress(
SpinnerColumn(), TextColumn("[bold blue]Syncing metadata")
)
overall_progress = Progress(
TextColumn("[bold blue]{task.fields[filename]}"),
BarColumn(),
"{task.completed} of {task.total}",
)
file_progress = Progress(
TextColumn("[bold green]{task.fields[filename]}", justify="right"),
BarColumn(),
"[progress.percentage]{task.percentage:>3.1f}%",
DownloadColumn(),
"•",
TransferSpeedColumn(),
"•",
TimeRemainingColumn(),
)
progress_table: Table = Table.grid()
progress_table.add_row(sync_metadata)
progress_table.add_row(file_progress)
progress_table.add_row(overall_progress)
with Live(progress_table):
sync_task: TaskID = sync_metadata.add_task("")
file_tasks: Dict[str, TaskID] = {}
overall_task = overall_progress.add_task(
"[green]Total progress",
filename="Total progress",
total=0,
visible=False,
)
def progress_callback(
total_file_count: NumberLike, file_advancement: NumberLike
) -> None:
sync_metadata.update(sync_task, visible=False)
overall_progress.update(
overall_task,
total=total_file_count,
advance=file_advancement,
visible=True,
)
def file_upload_callback(
file_name: str,
file_total_bytes: NumberLike,
file_bytes_sent: NumberLike,
) -> None:
if file_name not in file_tasks:
file_tasks[file_name] = file_progress.add_task(
f"[blue]{file_name}", filename=file_name, total=file_total_bytes
)
# Rich has a concurrency issue, so sometimes updating progress
# or removing a task fails. Wrapping this logic around a try/catch block
# is a workaround, we should consider solving this properly (e.g.: using locks)
try:
file_progress.update(
file_tasks[file_name], completed=file_bytes_sent
)
for task in file_progress.tasks:
if task.finished and len(file_progress.tasks) >= max_workers:
file_progress.remove_task(task.id)
except Exception:
pass
upload_manager = dataset.push(
files,
files_to_exclude=files_to_exclude,
fps=fps,
as_frames=frames,
extract_views=extract_views,
handle_as_slices=handle_as_slices,
path=path,
preserve_folders=preserve_folders,
progress_callback=progress_callback,
file_upload_callback=file_upload_callback,
item_merge_mode=item_merge_mode,
)
console = Console(theme=_console_theme())
console.print()
if not upload_manager.blocked_count and not upload_manager.error_count:
console.print(
f"All {upload_manager.total_count} files have been successfully uploaded.\n",
style="success",
)
return
already_existing_items = []
other_skipped_items = []
for item in upload_manager.blocked_items:
for slot in item.slots:
if (slot.reason is not None) and (
slot.reason.upper() == BLOCKED_UPLOAD_ERROR_ALREADY_EXISTS
):
already_existing_items.append(item)
else:
other_skipped_items.append(item)
if already_existing_items:
console.print(
f"Skipped {len(already_existing_items)} files already in the dataset.\n",
style="warning",
)
if upload_manager.error_count or other_skipped_items:
error_count = upload_manager.error_count + len(other_skipped_items)
console.print(
f"{error_count} files couldn't be uploaded because an error occurred.\n",
style="error",
)
if not verbose and upload_manager.error_count:
console.print('Re-run with "--verbose" for further details')
return
error_table: Table = Table(
"Dataset Item ID",
"Filename",
"Remote Path",
"Stage",
"Reason",
show_header=True,
header_style="bold cyan",
)
for item in upload_manager.blocked_items:
for slot in item.slots:
if (slot.reason is not None) and (
slot.reason.upper() != BLOCKED_UPLOAD_ERROR_ALREADY_EXISTS
):
error_table.add_row(
str(item.dataset_item_id),
item.filename,
item.path,
"UPLOAD_REQUEST",
slot.reason,
)
for error in upload_manager.errors:
for local_file in upload_manager.local_files:
if local_file.local_path != error.file_path:
continue
for pending_item in upload_manager.pending_items:
if pending_item.filename != local_file.data["filename"]:
continue
error_table.add_row(
str(pending_item.dataset_item_id),
pending_item.filename,
pending_item.path,
error.stage.name,
str(error.error),
)
break
if error_table.row_count:
console.print(error_table)
print_new_version_info(client)
except NotFound as e:
_error(f"No dataset with name '{e.name}'")
except UnsupportedFileType as e:
_error(f"Unsupported file type {e.path.suffix} ({e.path.name})")
except ValueError as e:
_error(f"{e}")
def dataset_import(
dataset_slug: str,
format: str,
files: List[PathLike],
append: bool,
class_prompt: bool = True,
delete_for_empty: bool = False,
import_annotators: bool = False,
import_reviewers: bool = False,
overwrite: bool = False,
use_multi_cpu: bool = False,
cpu_limit: Optional[int] = None,
) -> None:
"""
Imports annotation files to the given dataset.
Exits the application if no dataset with the given slug is found.
Parameters
----------
dataset_slug : str
The dataset's slug.
format : str
Format of the export files.
files : List[PathLike]
List of where the files are.
append : bool
If ``True`` it appends the annotation from the files to the dataset, if ``False`` it will
override the dataset's current annotations with the ones from the given files.
Incompatible with ``delete-for-empty``.
delete_for_empty : bool, default: False
If ``True`` will use empty annotation files to delete all annotations from the remote file.
If ``False``, empty annotation files will simply be skipped.
Only works for V2 datasets.
Incompatible with ``append``.
import_annotators : bool, default: False
If ``True`` it will import the annotators from the files to the dataset, if available.
If ``False`` it will not import the annotators.
import_reviewers : bool, default: False
If ``True`` it will import the reviewers from the files to the dataset, if available.
If ``False`` it will not import the reviewers.
overwrite : bool, default: False
If ``True`` it will bypass the warning that the import will overwrite the current annotations, if any are present.
If ``False`` the warning will be shown before any existing annotations are overwritten.
use_multi_cpu : bool, default: False
If ``True`` it will use multiple CPUs to speed up the import process.
cpu_limit : Optional[int], default: Core count - 2
The maximum number of CPUs to use for the import process.
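Examples
--------
A minimal sketch; the dataset slug and annotation file path are illustrative:
>>> dataset_import(
...     "my-team/my-dataset",
...     format="coco",
...     files=["annotations/instances.json"],
...     append=False,
... )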
"""
client: Client = _load_client(dataset_identifier=dataset_slug)
try:
importer: ImportParser = get_importer(format)
dataset: RemoteDataset = client.get_remote_dataset(
dataset_identifier=dataset_slug
)
if cpu_limit is not None:
use_multi_cpu = True
import_annotations(
dataset,
importer,
files,
append,
class_prompt,
delete_for_empty,
import_annotators,
import_reviewers,
overwrite,
use_multi_cpu,
cpu_limit,
)
except ImporterNotFoundError:
_error(
f"Unsupported import format: {format}, currently supported: {import_formats}"
)
except AttributeError as e:
_error(f"Internal problem with import occured: {str(e)}")
except NotFound as e:
_error(f"No dataset with name '{e.name}'")
except IncompatibleOptions as e:
_error(str(e))
except UnrecognizableFileEncoding as e:
_error(str(e))
except UnknownAnnotationFileSchema as e:
_error(str(e))
except AnnotationFileValidationError as e:
_error(str(e))
def list_files(
dataset_slug: str,
statuses: Optional[str],
path: Optional[str],
only_filenames: bool,
sort_by: Optional[str] = "updated_at:desc",
) -> None:
"""
Lists all files from the given dataset.
Exits the application if it finds unknown file statuses, if no dataset with the given slug is
found, or if another general error occurs.
Parameters
----------
dataset_slug: str
The dataset's slug.
statuses: Optional[str]
Only list files with the given statuses. Valid statuses are: 'annotate', 'archived',
'complete', 'new', 'review'.
path: Optional[str]
Only list files whose path matches.
only_filenames: bool
If True, only prints the filenames; if False, it prints the full file URL.
sort_by: Optional[str]
Sort order for listing files. Defaults to 'updated_at:desc'.
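Examples
--------
A minimal sketch; the dataset slug is illustrative:
>>> list_files("my-team/my-dataset", statuses="complete", path=None, only_filenames=True)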
"""
client: Client = _load_client(dataset_identifier=dataset_slug)
try:
dataset: RemoteDataset = client.get_remote_dataset(
dataset_identifier=dataset_slug
)
filters: Dict[str, UnknownType] = {}
if statuses:
for status in statuses.split(","):
if not _has_valid_status(status):
_error(
f"Invalid status '{status}', available statuses: annotate, archived, complete, new, review"
)
filters["statuses"] = statuses
else:
filters["statuses"] = "new,annotate,review,complete"
if path:
filters["path"] = path
if not sort_by:
sort_by = "updated_at:desc"
table: Table = Table(show_header=True, header_style="bold cyan")
table.add_column("Name", justify="left")
if not only_filenames:
table.add_column("Status", justify="left")
table.add_column("URL", justify="left")
for file in dataset.fetch_remote_files(filters, sort_by): # type: ignore
if only_filenames:
table.add_row(file.filename)
else:
image_url = dataset.workview_url_for_item(file)
table.add_row(
file.filename,
f"{file.status if not file.archived else 'archived'}",
image_url,
)
Console().print(table)
except NotFound as e:
_error(f"No dataset with name '{e.name}'")
except ValueError as e:
_error(str(e))
def set_file_status(dataset_slug: str, status: str, files: List[str]) -> None:
"""
Sets the status of the given files from the given dataset.
Exits the application if the given status is unknown or if no dataset was found.
Parameters
----------
dataset_slug: str
The dataset's slug.
status: str
The new status for the files.
files: List[str]
Names of the files we want to update.
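Examples
--------
A minimal sketch; the dataset slug and filenames are illustrative:
>>> set_file_status("my-team/my-dataset", "archived", ["one.jpg", "two.jpg"])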
"""
available_statuses = ["archived", "new", "restore-archived", "complete"]
if status not in available_statuses:
_error(
f"Invalid status '{status}', available statuses: {', '.join(available_statuses)}"
)
client: Client = _load_client(dataset_identifier=dataset_slug)
try:
dataset: RemoteDataset = client.get_remote_dataset(
dataset_identifier=dataset_slug
)
items: Iterator[DatasetItem] = dataset.fetch_remote_files(
{"item_names": ",".join(files)}
)
if status == "archived":
dataset.archive(items)
elif status == "new":
dataset.move_to_new(items)
elif status == "restore-archived":
dataset.restore_archived(items)
elif status == "complete":
dataset.complete(items)
except NotFound as e:
_error(f"No dataset with name '{e.name}'")
except ValueError as e:
_error(str(e))
def delete_files(
dataset_slug: str, files: List[str], skip_user_confirmation: bool = False
) -> None:
"""
Deletes the files from the given dataset.
Exits the application if no dataset with the given slug is found or a general error occurs.
Parameters
----------
dataset_slug : str
The dataset's slug.
files : List[str]
The list of filenames to delete.
skip_user_confirmation : bool, default: False
If ``True``, skips user confirmation; if ``False``, it will prompt the user.
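Examples
--------
A minimal sketch; the dataset slug and filenames are illustrative:
>>> delete_files("my-team/my-dataset", ["one.jpg", "two.jpg"], skip_user_confirmation=True)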
"""
client: Client = _load_client(dataset_identifier=dataset_slug)
try:
console = Console(theme=_console_theme(), stderr=True)
dataset: RemoteDataset = client.get_remote_dataset(
dataset_identifier=dataset_slug
)
items, items_2 = tee(dataset.fetch_remote_files({"item_names": files}))
if not skip_user_confirmation and not secure_continue_request():
console.print("Cancelled.")
return
found_filenames: Set[str] = {item.filename for item in items_2}
not_found_filenames: Set[str] = set(files) - found_filenames
for filename in not_found_filenames:
console.print(f"File not found: {filename}", style="warning")
with console.status("[bold red]Deleting files..."):
dataset.delete_items(items)
console.print("Operation successfully completed!", style="success")
except NotFound as e:
_error(f"No dataset with name '{e.name}'")
except Exception:
_error("An error has occurred, please try again later.")
def validate_schemas(
location: str,
pattern: bool = False,
silent: bool = False,
output: Optional[Path] = None,
) -> None:
"""
Validation entry point for the CLI. Validates the annotation files found at the given location against their schema, printing the results and optionally saving them to an output file.
Parameters
----------
location : str
Path to a file, folder, or glob pattern describing the files to validate.
pattern : bool, optional
If True, treat ``location`` as a glob-style pattern. Defaults to False.
silent : bool, optional
If True, only errors are printed to the console. Defaults to False.
output : Optional[Path], optional
File or directory to save the validation report to. Defaults to None.
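Examples
--------
A minimal sketch; the paths are illustrative:
>>> validate_schemas("exports/*.json", pattern=True, output=Path("report.json"))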
"""
all_errors = {}
if pattern:
to_validate = [Path(filename) for filename in glob(location)]
elif os.path.isfile(location):
to_validate = [Path(location)]
elif os.path.isdir(location):
to_validate = [Path(filename) for filename in Path(location).glob("*.json")]
else:
to_validate = []
console = Console(theme=_console_theme(), stderr=True)
if not to_validate:
console.print("No files found to validate", style="warning")
return
console.print(f"Validating schemas for {len(to_validate)} files")
for file in to_validate:
try:
errors = [
{"message": e.message, "location": e.json_path}
for e in validate_file_against_schema(file)
]
except MissingSchema as e:
errors = [{"message": e.message, "location": "schema link"}]
all_errors[str(file)] = errors
if not errors:
if not silent:
console.print(f"{str(file)}: No Errors", style="success")
continue
console.print(f"{str(file)}: {len(errors)} errors", style="error")
for error in errors:
console.print(f"\t- Problem found in {error['location']}", style="error")
console.print(f"\t\t- {error['message']}", style="error")
if output:
try:
filename: Path = output
if os.path.isdir(output):
filename = Path(os.path.join(output, "report.json"))
with open(filename, "w") as outfile:
json.dump(all_errors, outfile, indent=2)
console.print(f"Writing report to {filename}", style="success")
except Exception as e:
console.print(f"Error writing output file with {e}", style="error")
console.print("Did you supply an invalid filename?")
def dataset_convert(
dataset_identifier: str,
format: str,
output_dir: Optional[PathLike] = None,
) -> None:
"""
Converts the annotations from the given dataset to the given format.
Exits the application if no dataset with the given slug exists or no releases for the dataset
were previously pulled.
Parameters
----------
dataset_identifier : str
The dataset identifier, normally in the "<team-slug>/<dataset-slug>:<version>" form.
format : str
The format we want to convert to.
output_dir : Optional[PathLike], default: None
The folder where the exported annotation files will be placed. If None, it will be inside the
annotations folder of the dataset, under 'other_formats/{format}'.
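Examples
--------
A minimal sketch; assumes a release of "my-team/my-dataset" has already been pulled:
>>> dataset_convert("my-team/my-dataset", format="coco", output_dir="converted/")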
"""
identifier: DatasetIdentifier = DatasetIdentifier.parse(dataset_identifier)
client: Client = _load_client(team_slug=identifier.team_slug)
try:
parser: ExportParser = get_exporter(format)
dataset: RemoteDataset = client.get_remote_dataset(
dataset_identifier=identifier
)
if not dataset.local_path.exists():
_error(
f"No annotations downloaded for dataset f{dataset}, first pull a release using "
f"'darwin dataset pull {identifier}'"
)
release_path: Path = get_release_path(dataset.local_path, identifier.version)
annotations_path: Path = release_path / "annotations"
if output_dir is None:
output_dir = release_path / "other_formats" / format
else:
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
export_annotations(parser, [annotations_path], output_dir)
except ExporterNotFoundError:
_error(
f"Unsupported export format: {format}, currently supported: {export_formats}"
)
except AttributeError:
_error(
f"Unsupported export format: {format}, currently supported: {export_formats}"
)
except NotFound as e:
_error(f"No dataset with name '{e.name}'")
def convert(
format: str,
files: List[PathLike],
output_dir: Path,
) -> None:
"""
Converts the given files to the specified format.
Parameters
----------
format: str
The target format to export to.
files: List[PathLike]
List of files to be converted.
output_dir: Path
Folder where the exported annotations will be placed.
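Examples
--------
A minimal sketch; the input file and output directory are illustrative:
>>> convert("coco", ["annotations/darwin.json"], Path("./converted"))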
"""
try:
parser: ExportParser = get_exporter(format)
except ExporterNotFoundError:
_error(f"Unsupported export format, currently supported: {export_formats}")
except AttributeError:
_error(f"Unsupported export format, currently supported: {export_formats}")
export_annotations(
parser,
files,
output_dir,
split_sequences=(format != "nifti"),
)
def report_annotators(
dataset_slugs: list[str],
start: datetime.datetime,
stop: datetime.datetime,
group_by: list[AnnotatorReportGrouping],
pretty: bool,
) -> None:
"""
Prints an annotators report in CSV format.
Parameters
----------
dataset_slugs : list[str]
Slugs of datasets to include in the report.
start : datetime.datetime
Timezone aware report start DateTime.
stop : datetime.datetime
Timezone aware report end DateTime.
group_by: list[AnnotatorReportGrouping]
Non-empty list of grouping options for the report.
pretty : bool
If ``True``, it will print the output in a Rich formatted table.
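Examples
--------
A minimal sketch; the slug is illustrative, the datetimes must be timezone aware, and
``AnnotatorReportGrouping.DATASETS`` is assumed here as one of the available grouping options:
>>> report_annotators(
...     ["my-dataset"],
...     start=datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc),
...     stop=datetime.datetime(2024, 2, 1, tzinfo=datetime.timezone.utc),
...     group_by=[AnnotatorReportGrouping.DATASETS],
...     pretty=True,
... )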
"""
client: Client = _load_client()
console = Console(theme=_console_theme())
dataset_ids = []
for dataset in client.list_remote_datasets():
if dataset.slug in dataset_slugs:
dataset_ids.append(dataset.dataset_id)
dataset_slugs.remove(dataset.slug)
if dataset_slugs:
_error(f"Datasets '{dataset_slugs}' do not exist.")
report: str = client.get_annotators_report(
dataset_ids,
start,
stop,
group_by,
).text
# the API does not return CSV headers if the report is empty
if not report:
report = "timestamp,dataset_id,dataset_name,dataset_slug,workflow_id,workflow_name,current_stage_id,current_stage_name,actor_id,actor_type,actor_email,actor_full_name,active_time,total_annotations,review_pass_rate,total_items_annotated,time_per_annotation,time_per_item\n"
if not pretty:
print(report)
return
lines: List[str] = report.split("\n")
lines.pop(0) # remove csv headers
lines.pop() # remove last line, which is empty
table: Table = Table(show_header=True, header_style="bold cyan")
table.add_column("Date")
for header in [
"Dataset Id",
"Dataset Name",
"Dataset Slug",
"Workflow Id",
"Workflow Name",
"Current Stage Id",
"Current Stage Name",
"User Id",
"User Type",
"Email",
"Full Name",
"Active Time",
"Total Annotations",
"Review Pass Rate",
"Total Items Annotated",
"Time Per Annotation",
"Time Per Item",
]:
table.add_column(header, justify="right")
for row in lines:
table.add_row(*row.split(","))
console.print(table)
def help(parser: argparse.ArgumentParser, subparser: Optional[str] = None) -> None:
"""
Prints the help text for the given command.
Parameters
----------
parser: argparse.ArgumentParser
The parser used to read input from the user.
subparser: Optional[str]
Actions from the parser to be processed. Defaults to None.
"""
if subparser:
parser = next(
action.choices[subparser]
for action in parser._actions
if isinstance(action, argparse._SubParsersAction)
and subparser in action.choices
)
actions = [
action
for action in parser._actions
if isinstance(action, argparse._SubParsersAction)
]
print(parser.description)
print("\nCommands:")
for action in actions:
# get all subparsers and print help
for choice in sorted(action._choices_actions, key=lambda x: x.dest):
print(" {:<19} {}".format(choice.dest, choice.help))
def print_new_version_info(client: Optional[Client] = None) -> None:
"""
Prints a message informing the user of a new darwin-py version.
Does nothing if no new version is available or if no client is provided.
Parameters
----------
client: Optional[Client]
The client containing information about the new version. Defaults to None.
"""
if not client or not client.newer_darwin_version:
return
(a, b, c) = tuple(client.newer_darwin_version)
console = Console(theme=_console_theme(), stderr=True)
console.print(
f"A newer version of darwin-py ({a}.{b}.{c}) is available!",
"Run the following command to install it:",
"",
f" pip install darwin-py=={a}.{b}.{c}",
"",
sep="\n",
style="warning",
)
def _error(message: str) -> NoReturn:
console = Console(theme=_console_theme())
console.print(f"Error: {message}", style="error")
sys.exit(1)
def _config() -> Config:
return Config(Path.home() / ".darwin" / "config.yaml")
def _load_client(
team_slug: Optional[str] = None,
maybe_guest: bool = False,
dataset_identifier: Optional[str] = None,
) -> Client:
"""Fetches a client
Parameters
----------
maybe_guest : bool
Flag to make a guest client, if config is missing
Returns
-------
Client
The client requested
"""
if not team_slug and dataset_identifier:
team_slug = DatasetIdentifier.parse(dataset_identifier).team_slug
try:
api_key = os.getenv("DARWIN_API_KEY")
if api_key:
client = Client.from_api_key(api_key)
else:
config_dir = Path.home() / ".darwin" / "config.yaml"
client = Client.from_config(config_dir, team_slug=team_slug)
return client
except MissingConfig:
if maybe_guest:
return Client.from_guest()
else:
_error("Authenticate first")
except InvalidLogin:
_error("Please re-authenticate")
except Unauthenticated:
_error("Please re-authenticate")
def _console_theme() -> Theme:
return Theme(
{"success": "bold green", "warning": "bold yellow", "error": "bold red"}
)
def _has_valid_status(status: str) -> bool:
return status in ["new", "annotate", "review", "complete", "archived"]