Files
2026-02-03 02:15:07 +03:00

63 lines
1.8 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import BinaryIO
@dataclass(frozen=True)
class LocalConfig:
    """Immutable settings for the local filesystem storage backend."""

    # Directory under which every stored object is kept.
    root_dir: Path
class LocalStorageBackend:
    """Storage backend that keeps objects as files below a root directory.

    Every ``key`` is interpreted as a path relative to ``cfg.root_dir``.
    Keys that would resolve to a location outside that directory (for
    example ``"../x"`` or an absolute path) are rejected with ``ValueError``
    so callers cannot read, write or delete arbitrary files.
    """

    # Number of bytes moved per read/write call while streaming.
    _CHUNK_SIZE = 1024 * 1024

    def __init__(self, cfg: LocalConfig) -> None:
        """Remember *cfg* and create the root directory if it is missing."""
        self._cfg = cfg
        self._cfg.root_dir.mkdir(parents=True, exist_ok=True)

    def _full_path(self, key: str) -> Path:
        """Return the absolute filesystem path for *key*.

        Raises:
            ValueError: if *key* resolves to a path outside the root
                directory (path traversal or absolute key).
        """
        root = self._cfg.root_dir.resolve()
        candidate = (root / key).resolve()
        try:
            # relative_to() raises ValueError unless candidate is root
            # itself or lies below it.
            candidate.relative_to(root)
        except ValueError:
            raise ValueError(
                f"storage key escapes the root directory: {key!r}"
            ) from None
        return candidate

    def _copy_stream(self, src: BinaryIO, dst: BinaryIO) -> None:
        """Copy *src* to *dst* in fixed-size chunks until *src* is exhausted."""
        while True:
            chunk = src.read(self._CHUNK_SIZE)
            if not chunk:
                break
            dst.write(chunk)

    def upload_fileobj(
        self, key: str, fileobj: BinaryIO, *, content_type: str | None
    ) -> None:
        """Write the contents of *fileobj* to the file stored under *key*.

        Missing parent directories are created. An existing file is
        overwritten.

        Raises:
            ValueError: if *key* resolves outside the root directory.
        """
        # content_type is unused for filesystem backend.
        _ = content_type
        full_path = self._full_path(key)
        full_path.parent.mkdir(parents=True, exist_ok=True)
        with open(full_path, "wb") as out:
            self._copy_stream(fileobj, out)

    def download_fileobj(self, key: str, fileobj: BinaryIO) -> None:
        """Stream the file stored under *key* into *fileobj*.

        Raises:
            ValueError: if *key* resolves outside the root directory.
            FileNotFoundError: if no file exists for *key*.
        """
        with open(self._full_path(key), "rb") as src:
            self._copy_stream(src, fileobj)

    def exists(self, key: str) -> bool:
        """Return whether a filesystem entry exists for *key*."""
        return self._full_path(key).exists()

    def size(self, key: str) -> int:
        """Return the size in bytes of the file stored under *key*."""
        return self._full_path(key).stat().st_size

    def delete(self, key: str) -> None:
        """Remove the file stored under *key*; do nothing if it is absent."""
        path = self._full_path(key)
        try:
            path.unlink()
        except FileNotFoundError:
            # Already gone (possibly removed concurrently): deletion is
            # idempotent, so this is not an error.
            pass

    def read(self, key: str) -> bytes:
        """Return the full contents of the file stored under *key*."""
        return self._full_path(key).read_bytes()

    def generate_url(self, key: str) -> str:
        """Return the URL path under which the file for *key* is served."""
        # Served by cpv3 via /api/files/local/{path}
        return f"/api/files/local/{key}"