Files
2026-02-03 02:15:07 +03:00

125 lines
3.6 KiB
Python

from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from os import path
from tempfile import NamedTemporaryFile
from typing import BinaryIO, Callable, Protocol
from uuid import uuid4
import anyio
import anyio.to_thread
from cpv3.infrastructure.storage.types import FileInfo
@dataclass(frozen=True)
class TempFile:
    """Handle to a temporary file on local disk plus its cleanup callback.

    The handle can be used directly (call ``cleanup()`` when done) or as a
    context manager, in which case ``cleanup`` runs when the ``with`` block
    exits, whether or not an exception was raised.
    """

    # Filesystem path of the temporary file.
    path: str
    # Zero-argument callable that disposes of the temporary file.
    cleanup: Callable[[], None]

    def __enter__(self) -> TempFile:
        """Return the handle itself so ``with ... as tmp`` exposes ``tmp.path``."""
        return self

    def __exit__(self, exc_type: object, exc: object, tb: object) -> None:
        """Run ``cleanup``; returning ``None`` lets any exception propagate."""
        self.cleanup()
class StorageBackend(Protocol):
    """Protocol defining the interface for storage backends."""

    def upload_fileobj(
        self,
        key: str,
        fileobj: BinaryIO,
        *,
        content_type: str | None,
    ) -> None:
        """Store the data read from ``fileobj`` under ``key``."""

    def download_fileobj(self, key: str, fileobj: BinaryIO) -> None:
        """Write the object stored under ``key`` into ``fileobj``."""

    def exists(self, key: str) -> bool:
        """Report whether an object is stored under ``key``."""

    def size(self, key: str) -> int:
        """Return the size of the object stored under ``key``."""

    def delete(self, key: str) -> None:
        """Remove the object stored under ``key``."""

    def read(self, key: str) -> bytes:
        """Return the full content of the object stored under ``key``."""

    def generate_url(self, key: str) -> str:
        """Return a URL for the object stored under ``key``."""
class StorageService:
    """High-level async storage service wrapping a backend.

    Each backend call is executed in a worker thread through
    ``anyio.to_thread.run_sync`` so the coroutine awaiting it does not run
    the synchronous backend code on the event loop.
    """

    def __init__(self, backend: StorageBackend) -> None:
        """Keep a reference to the backend every operation is delegated to."""
        self._backend = backend

    def _make_key(self, file_name: str, folder: str, gen_name: bool) -> str:
        """Build the storage key for a file.

        Args:
            file_name: Name of the file; when ``gen_name`` is true only its
                extension is kept.
            folder: Key prefix; an empty string means "no prefix".
            gen_name: When true, replace the base name with a random
                ``uuid4`` hex string.

        Returns:
            ``folder`` joined with the (possibly generated) file name, or just
            the file name when ``folder`` is empty.
        """
        # Storage keys are joined with POSIX rules so the separator is always
        # "/"; os.path.join would emit backslashes on Windows.
        import posixpath

        if gen_name:
            _, ext = path.splitext(file_name)
            file_name = f"{uuid4().hex}{ext}"
        return posixpath.join(folder, file_name) if folder else file_name

    async def upload_fileobj(
        self,
        *,
        fileobj: BinaryIO,
        file_name: str,
        folder: str = "",
        gen_name: bool = True,
        content_type: str | None = None,
    ) -> str:
        """Upload ``fileobj`` to the backend and return the key it was stored under.

        Args:
            fileobj: Seekable binary stream; it is rewound to position 0
                before the upload starts.
            file_name: Original file name, see ``_make_key``.
            folder: Optional key prefix.
            gen_name: Generate a random name instead of using ``file_name``.
            content_type: Passed through to the backend unchanged.

        Returns:
            The storage key of the uploaded object.
        """
        key = self._make_key(file_name, folder, gen_name)

        def _upload() -> None:
            # Rewind so the whole stream is uploaded even if it was read before.
            fileobj.seek(0)
            self._backend.upload_fileobj(key, fileobj, content_type=content_type)

        await anyio.to_thread.run_sync(_upload)
        return key

    async def exists(self, key: str) -> bool:
        """Return the backend's answer to whether ``key`` exists."""
        return await anyio.to_thread.run_sync(self._backend.exists, key)

    async def delete(self, key: str) -> None:
        """Delete ``key`` through the backend."""
        await anyio.to_thread.run_sync(self._backend.delete, key)

    async def size(self, key: str) -> int:
        """Return the size the backend reports for ``key``."""
        return await anyio.to_thread.run_sync(self._backend.size, key)

    async def read(self, key: str) -> bytes:
        """Return the bytes the backend reads for ``key``."""
        return await anyio.to_thread.run_sync(self._backend.read, key)

    async def url(self, key: str) -> str:
        """Return the URL the backend generates for ``key``."""
        return await anyio.to_thread.run_sync(self._backend.generate_url, key)

    async def get_file_info(self, key: str) -> FileInfo:
        """Collect path, URL, size and base name of a stored file.

        Raises:
            FileNotFoundError: If the backend reports that ``key`` does not exist.
        """
        if not await self.exists(key):
            raise FileNotFoundError(f"File '{key}' does not exist")

        file_url = await self.url(key)
        file_size = await self.size(key)
        return FileInfo(
            file_path=key,
            file_url=file_url,
            file_size=file_size,
            filename=path.basename(key),
        )

    async def download_to_temp(self, key: str) -> TempFile:
        """Download ``key`` into a new temporary file on local disk.

        The temporary file keeps the extension of ``key`` (``.bin`` when the
        key has none). It is not deleted automatically: the caller must invoke
        ``cleanup`` on the returned ``TempFile`` once finished with it.

        Raises:
            FileNotFoundError: If the backend reports that ``key`` does not exist.
        """
        import os
        from contextlib import suppress

        if not await self.exists(key):
            raise FileNotFoundError(f"File '{key}' does not exist")

        _, ext = path.splitext(key)
        suffix = ext if ext else ".bin"

        # delete=False: only the unique name is reserved here; the file is
        # reopened for writing inside the worker thread.
        with NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
            out_path = tmp.name

        def _cleanup() -> None:
            # Tolerate a file that is already gone instead of a racy
            # exists-then-remove check.
            with suppress(FileNotFoundError):
                os.remove(out_path)

        def _download() -> None:
            with open(out_path, "wb") as out:
                self._backend.download_fileobj(key, out)

        try:
            await anyio.to_thread.run_sync(_download)
        except BaseException:
            # The caller never receives the handle on failure or cancellation,
            # so the temporary file must be removed here or it would leak.
            _cleanup()
            raise

        return TempFile(path=out_path, cleanup=_cleanup)