from __future__ import annotations from abc import ABC, abstractmethod from dataclasses import dataclass from os import path from tempfile import NamedTemporaryFile from typing import BinaryIO, Callable, Protocol from uuid import uuid4 import anyio import anyio.to_thread from cpv3.infrastructure.storage.types import FileInfo @dataclass(frozen=True) class TempFile: path: str cleanup: Callable[[], None] class StorageBackend(Protocol): """Protocol defining the interface for storage backends.""" def upload_fileobj( self, key: str, fileobj: BinaryIO, *, content_type: str | None ) -> None: ... def download_fileobj(self, key: str, fileobj: BinaryIO) -> None: ... def exists(self, key: str) -> bool: ... def size(self, key: str) -> int: ... def delete(self, key: str) -> None: ... def read(self, key: str) -> bytes: ... def generate_url(self, key: str) -> str: ... class StorageService: """High-level async storage service wrapping a backend.""" def __init__(self, backend: StorageBackend) -> None: self._backend = backend def _make_key(self, file_name: str, folder: str, gen_name: bool) -> str: if gen_name: _, ext = path.splitext(file_name) file_name = f"{uuid4().hex}{ext if ext else ''}" return path.join(folder, file_name) if folder else file_name async def upload_fileobj( self, *, fileobj: BinaryIO, file_name: str, folder: str = "", gen_name: bool = True, content_type: str | None = None, ) -> str: key = self._make_key(file_name, folder, gen_name) def _upload() -> None: fileobj.seek(0) self._backend.upload_fileobj(key, fileobj, content_type=content_type) await anyio.to_thread.run_sync(_upload) return key async def exists(self, key: str) -> bool: return await anyio.to_thread.run_sync(lambda: self._backend.exists(key)) async def delete(self, key: str) -> None: await anyio.to_thread.run_sync(lambda: self._backend.delete(key)) async def size(self, key: str) -> int: return await anyio.to_thread.run_sync(lambda: self._backend.size(key)) async def read(self, key: str) -> bytes: return await anyio.to_thread.run_sync(lambda: self._backend.read(key)) async def url(self, key: str) -> str: return await anyio.to_thread.run_sync(lambda: self._backend.generate_url(key)) async def get_file_info(self, key: str) -> FileInfo: if not await self.exists(key): raise FileNotFoundError(f"File '{key}' does not exist") file_url = await self.url(key) file_size = await self.size(key) return FileInfo( file_path=key, file_url=file_url, file_size=file_size, filename=path.basename(key), ) async def download_to_temp(self, key: str) -> TempFile: if not await self.exists(key): raise FileNotFoundError(f"File '{key}' does not exist") _, ext = path.splitext(key) suffix = ext if ext else ".bin" out_path: str with NamedTemporaryFile(suffix=suffix, delete=False) as tmp: out_path = tmp.name def _download() -> None: with open(out_path, "wb") as out: self._backend.download_fileobj(key, out) await anyio.to_thread.run_sync(_download) def _cleanup() -> None: import os if os.path.exists(out_path): os.remove(out_path) return TempFile(path=out_path, cleanup=_cleanup)