"""
Storage uploader module for sending video artifacts to external cloud storage.
Supports AWS S3, Google Cloud Storage, and Azure Blob Storage.
Note on retry strategy:
- All cloud SDKs are configured to use their built-in robust retry logic for
transient HTTP errors (429, 5xx) and internal timeouts.
- We add a thin outer retry layer using `tenacity` ONLY for immediate network-level
failures (ConnectionError, socket.timeout) that may occur before SDK logic engages.
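Example usage (illustrative sketch only; ``my_object_store`` and the paths are
placeholders, and a real run needs valid credentials for the chosen provider)::

    client = create_storage_client(my_object_store)
    # SDK-level retries happen inside upload_file; the tenacity wrapper also
    # retries plain connection/timeout errors.
    upload_with_retry(client, "/tmp/video.mp4", "prefix/item_uuid/video.mp4")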
"""
import mimetypes
import os
import socket
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from typing import Optional
from tenacity import (
retry,
retry_if_exception,
stop_after_delay,
wait_exponential_jitter,
)
from darwin.datatypes import ObjectStore
class StorageClient(ABC):
"""Abstract base class for cloud storage clients."""
@abstractmethod
def upload_file(self, local_path: str, storage_key: str) -> None:
"""
Upload a file to cloud storage.
Parameters
----------
local_path : str
Path to local file to upload
storage_key : str
Storage key (path) where file will be stored
Raises
------
Exception
If upload fails
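Examples
--------
Sketch of a hypothetical subclass (not part of this module), showing the
contract an implementation must satisfy::

    import os, shutil

    class LocalCopyClient(StorageClient):
        def upload_file(self, local_path: str, storage_key: str) -> None:
            dest = os.path.join("/tmp/fake-bucket", storage_key)
            os.makedirs(os.path.dirname(dest), exist_ok=True)
            shutil.copy(local_path, dest)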
"""
pass
class S3StorageClient(StorageClient):
"""AWS S3 storage client implementation.
Uses boto3's "standard" retry mode for consistent retry behavior:
- 5 attempts total (1 initial + 4 retries)
- Exponential backoff with jitter
- Retries on throttling (429), server errors (5xx), and transient connection errors
"""
def __init__(self, bucket: str, region: Optional[str], prefix: str):
"""
Initialize S3 storage client.
Parameters
----------
bucket : str
S3 bucket name
region : Optional[str]
AWS region (optional, uses environment or default)
prefix : str
Base prefix for all storage keys
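Examples
--------
Illustrative only; the bucket name is a placeholder and the call assumes AWS
credentials are available in the environment::

    client = S3StorageClient(bucket="my-bucket", region="eu-west-1", prefix="videos")
    client.upload_file("/tmp/video.mp4", "videos/item_uuid/video.mp4")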
"""
try:
import boto3
from botocore.config import Config
except ImportError:
raise ImportError(
"boto3 is required for AWS S3 storage. Install with: pip install darwin-py[storage_aws]"
)
# Use region from ObjectStore if available, otherwise from env or default
session_config = {}
if region:
session_config["region_name"] = region
elif os.getenv("AWS_REGION"):
session_config["region_name"] = os.getenv("AWS_REGION")
retry_config = Config(
retries={
"mode": "standard", # Recommended mode with exponential backoff
"max_attempts": 5, # 5 attempts (1 initial + 4 retries)
}
)
self.s3_client = boto3.client("s3", config=retry_config, **session_config)
self.bucket = bucket
self.prefix = prefix
def upload_file(self, local_path: str, storage_key: str) -> None:
"""Upload file to S3."""
# Handle gzip content encoding
extra_args = {}
if local_path.endswith(".gz"):
extra_args = {"ContentEncoding": "gzip"}
self.s3_client.upload_file(
local_path, self.bucket, storage_key, ExtraArgs=extra_args
)
class GCSStorageClient(StorageClient):
"""Google Cloud Storage client implementation.
Uses google-cloud-storage's built-in retry (DEFAULT_RETRY):
- Retries on 429, 500, 502, 503, 504 errors
- Exponential backoff with jitter
- upload_from_filename() uses DEFAULT_RETRY automatically
"""
def __init__(self, bucket: str, prefix: str):
"""
Initialize GCS storage client.
Parameters
----------
bucket : str
GCS bucket name
prefix : str
Base prefix for all storage keys
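Examples
--------
Illustrative only; the bucket name is a placeholder and the call assumes
Application Default Credentials are configured::

    client = GCSStorageClient(bucket="my-bucket", prefix="videos")
    client.upload_file("/tmp/video.mp4", "videos/item_uuid/video.mp4")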
"""
try:
from google.cloud import storage
except ImportError:
raise ImportError(
"google-cloud-storage is required for GCS storage. Install with: pip install darwin-py[storage_gcp]"
)
self.storage_client = storage.Client()
self.bucket = self.storage_client.bucket(bucket)
self.prefix = prefix
def upload_file(self, local_path: str, storage_key: str) -> None:
"""Upload file to GCS."""
blob = self.bucket.blob(storage_key)
# Handle gzip content encoding
if local_path.endswith(".gz"):
blob.content_encoding = "gzip"
blob.upload_from_filename(local_path)
class AzureStorageClient(StorageClient):
"""Azure Blob Storage client implementation.
Uses azure-storage-blob's built-in ExponentialRetry policy:
- Configured here for 5 retries with exponential backoff and jitter
- Retries on 429, 500, 502, 503, 504 errors
"""
def __init__(self, account_name: str, container: str, prefix: str):
"""
Initialize Azure storage client.
Parameters
----------
account_name : str
Azure storage account name
container : str
Azure container name
prefix : str
Base prefix for all storage keys (within the container)
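Examples
--------
Illustrative only; the account and container names are placeholders. The
client authenticates with AZURE_STORAGE_CONNECTION_STRING,
AZURE_STORAGE_ACCOUNT_KEY, or DefaultAzureCredential, in that order::

    client = AzureStorageClient(
        account_name="myaccount", container="videos", prefix="2024"
    )
    client.upload_file("/tmp/video.mp4", "2024/item_uuid/video.mp4")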
"""
try:
from azure.storage.blob import BlobServiceClient, ExponentialRetry
except ImportError:
raise ImportError(
"azure-storage-blob is required for Azure storage. Install with: pip install darwin-py[storage_azure]"
)
# Configure retry policy: up to 5 retries with exponential backoff
retry_policy = ExponentialRetry(retry_total=5)
connection_string = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
account_key = os.getenv("AZURE_STORAGE_ACCOUNT_KEY")
if connection_string:
self.blob_service_client = BlobServiceClient.from_connection_string(
connection_string, retry_policy=retry_policy
)
else:
# Use account name from ObjectStore with either account key or DefaultAzureCredential
account_url = f"https://{account_name}.blob.core.windows.net"
if account_key:
# Use account key if provided
self.blob_service_client = BlobServiceClient(
account_url=account_url,
credential=account_key,
retry_policy=retry_policy,
)
else:
# Use DefaultAzureCredential (supports managed identity, Azure CLI, etc.)
try:
from azure.identity import DefaultAzureCredential
except ImportError:
raise ImportError(
"azure-identity is required for DefaultAzureCredential. "
"Install with: pip install darwin-py[storage_azure]"
)
self.blob_service_client = BlobServiceClient(
account_url=account_url,
credential=DefaultAzureCredential(),
retry_policy=retry_policy,
)
self.container_client = self.blob_service_client.get_container_client(container)
self.prefix = prefix
def upload_file(self, local_path: str, storage_key: str) -> None:
"""Upload file to Azure Blob Storage."""
from azure.storage.blob import ContentSettings
blob_client = self.container_client.get_blob_client(storage_key)
# Detect content type from file extension
content_type, _ = mimetypes.guess_type(local_path)
# Build content settings
settings_kwargs = {}
if content_type:
settings_kwargs["content_type"] = content_type
if local_path.endswith(".gz"):
settings_kwargs["content_encoding"] = "gzip"
content_settings = (
ContentSettings(**settings_kwargs) if settings_kwargs else None
)
with open(local_path, "rb") as data:
blob_client.upload_blob(
data, overwrite=True, content_settings=content_settings
)
def create_storage_client(object_store: ObjectStore) -> StorageClient:
"""
Create storage client based on ObjectStore provider.
Parameters
----------
object_store : ObjectStore
ObjectStore configuration containing provider, bucket, region, etc.
Returns
-------
StorageClient
Appropriate storage client for the provider
Raises
------
ValueError
If provider is not supported
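Examples
--------
Illustrative only; ``store`` is an ObjectStore with placeholder values. For
Azure, ``bucket`` holds the storage account name and the first ``prefix``
segment selects the container::

    # store.provider == "azure", store.bucket == "myaccount",
    # store.prefix == "videos/2024"
    client = create_storage_client(store)
    # -> AzureStorageClient for container "videos" with key prefix "2024"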
"""
if object_store.provider == "aws":
return S3StorageClient(
bucket=object_store.bucket,
region=object_store.region,
prefix=object_store.prefix,
)
elif object_store.provider == "gcp":
return GCSStorageClient(bucket=object_store.bucket, prefix=object_store.prefix)
elif object_store.provider == "azure":
# For Azure: bucket field contains storage account name
# Prefix format: "container-name/folder-name"
# If blank, defaults to "data" container
if not object_store.prefix or object_store.prefix.strip() == "":
# Default to "data" container if prefix is blank
container = "data"
prefix = ""
elif "/" in object_store.prefix:
# Extract container from first segment: "container-name/folder-name"
container, _, prefix = object_store.prefix.partition("/")
else:
# No slash: treat entire prefix as container with empty path
# E.g., "mycontainer" → container="mycontainer", prefix=""
container = object_store.prefix
prefix = ""
return AzureStorageClient(
account_name=object_store.bucket,
container=container,
prefix=prefix,
)
else:
raise ValueError(
f"Unsupported storage provider: {object_store.provider}. "
f"Supported providers: aws, gcp, azure"
)
def _is_retryable_error(exception: Exception) -> bool:
"""
Check if an exception is a transient network error that should be retried.
This is a thin outer retry layer. Cloud SDKs handle most transient errors
internally (429, 5xx). We retry only on:
- Connection errors (ConnectionError, ConnectionResetError, BrokenPipeError)
- Timeout errors (TimeoutError, socket.timeout)
These can occur at network level before/after SDK retry logic.
Parameters
----------
exception : Exception
Exception to check
Returns
-------
bool
True if exception should be retried
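Examples
--------
Doctest-style illustration: direct network errors and wrapped causes are
retryable, everything else is not.

>>> _is_retryable_error(ConnectionResetError())
True
>>> wrapped = RuntimeError("SDK wrapper")
>>> wrapped.__cause__ = socket.timeout()
>>> _is_retryable_error(wrapped)
True
>>> _is_retryable_error(ValueError("bad input"))
False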
"""
# Retryable network/connection errors
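# Note: socket.timeout is an alias of TimeoutError on Python 3.10+; on older
# versions it is a separate OSError subclass, so it is listed explicitly.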
if isinstance(exception, (ConnectionError, TimeoutError, socket.timeout)):
return True
# Check wrapped cause (SDKs often wrap underlying errors)
cause = exception.__cause__
if cause is not None and cause is not exception:
return _is_retryable_error(cause)
return False
@retry(
reraise=True,
wait=wait_exponential_jitter(initial=0.5, max=5, jitter=1),
stop=stop_after_delay(30),
retry=retry_if_exception(_is_retryable_error),
)
def upload_with_retry(client: StorageClient, local_path: str, storage_key: str) -> None:
"""
Upload file with retry for connection/timeout errors.
Cloud SDKs handle HTTP-level retries (429, 5xx) internally.
This adds an outer retry layer for network-level failures.
Parameters
----------
client : StorageClient
Storage client to use for upload
local_path : str
Path to local file
storage_key : str
Storage key where file will be stored
Raises
------
Exception
If upload fails after retries or on non-retryable error
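Examples
--------
Illustrative only; ``client`` is any StorageClient and the paths are
placeholders. Connection and timeout errors are retried with jittered
exponential backoff for up to ~30 seconds; other errors propagate immediately::

    upload_with_retry(client, "/tmp/video.mp4", "prefix/item_uuid/video.mp4")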
"""
client.upload_file(local_path, storage_key)
def upload_artifacts(
object_store: ObjectStore,
local_artifacts_dir: str,
source_file: str,
storage_key_prefix: str,
max_workers: int = 10,
) -> None:
"""
Upload all artifacts with retry logic.
Uploads the source video file and all extracted artifacts to cloud storage.
If any upload fails after retries, the error is re-raised; uploads already
submitted to the thread pool still run to completion before the error
propagates.
Parameters
----------
object_store : ObjectStore
ObjectStore configuration
local_artifacts_dir : str
Local directory containing extracted artifacts
source_file : str
Path to source video file
storage_key_prefix : str
Prefix for all storage keys (e.g., prefix/item_uuid/files/slot_uuid)
max_workers : int
Number of ThreadPoolExecutor workers (default: 10)
Raises
------
Exception
If any upload fails after retries
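Examples
--------
Illustrative only; paths and the key prefix are placeholders. A file at
``/tmp/artifacts/segments/high/000000.ts`` would be uploaded under
``prefix/item_uuid/files/slot_uuid/segments/high/000000.ts`` and the source
file under ``prefix/item_uuid/files/slot_uuid/video.mp4``::

    upload_artifacts(
        object_store=my_object_store,
        local_artifacts_dir="/tmp/artifacts",
        source_file="/tmp/video.mp4",
        storage_key_prefix="prefix/item_uuid/files/slot_uuid",
        max_workers=4,
    )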
"""
client = create_storage_client(object_store)
# Collect all files to upload
files_to_upload = []
# Source file
source_filename = os.path.basename(source_file)
files_to_upload.append((source_file, f"{storage_key_prefix}/{source_filename}"))
# Walk artifacts directory
for root, dirs, files in os.walk(local_artifacts_dir):
for file in files:
local_path = os.path.join(root, file)
relative_path = os.path.relpath(local_path, local_artifacts_dir)
storage_key = f"{storage_key_prefix}/{relative_path}".replace(os.sep, "/")
files_to_upload.append((local_path, storage_key))
# Upload with ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [
executor.submit(upload_with_retry, client, local, key)
for local, key in files_to_upload
]
# Propagate the first failure encountered; the executor context waits for
# remaining in-flight uploads to finish before the exception leaves this block
for future in futures:
future.result() # Raises if upload failed