"""Async repositories for MediaFile and ArtifactMediaFile rows."""

from __future__ import annotations

import uuid

from sqlalchemy import Select, select
from sqlalchemy.ext.asyncio import AsyncSession

from cpv3.modules.media.models import ArtifactMediaFile, MediaFile
from cpv3.modules.media.schemas import (
    ArtifactMediaFileCreate,
    ArtifactMediaFileUpdate,
    MediaFileCreate,
    MediaFileUpdate,
)
from cpv3.modules.users.models import User


class MediaFileRepository:
    """Data-access layer for MediaFile records, backed by an AsyncSession."""

    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def list_all(self, *, requester: User) -> list[MediaFile]:
        """Return non-deleted media files, newest first.

        Staff requesters receive every row; any other requester only
        receives the rows whose ``owner_id`` matches their own id.
        """
        query: Select[tuple[MediaFile]] = select(MediaFile).where(
            MediaFile.is_deleted.is_(False)
        )
        if not requester.is_staff:
            query = query.where(MediaFile.owner_id == requester.id)
        query = query.order_by(MediaFile.created_at.desc())

        rows = await self._session.execute(query)
        return list(rows.scalars().all())

    async def get_by_id(self, media_file_id: uuid.UUID) -> MediaFile | None:
        """Return the media file with the given id, or None.

        None is returned both when no row matches and when the matching
        row is flagged as deleted.
        """
        query = select(MediaFile).where(MediaFile.id == media_file_id)
        rows = await self._session.execute(query)
        found = rows.scalar_one_or_none()
        # The deleted flag is checked on the loaded object, not in the query.
        return None if found is None or found.is_deleted else found

    async def create(self, *, requester: User, data: MediaFileCreate) -> MediaFile:
        """Insert a media file owned by ``requester`` and return it.

        The row is committed and then refreshed from the database before
        being returned.
        """
        new_file = MediaFile(
            owner_id=requester.id,
            project_id=data.project_id,
            duration_seconds=data.duration_seconds,
            frame_rate=data.frame_rate,
            width=data.width,
            height=data.height,
            probe_json=data.probe_json,
            notes=data.notes,
            meta=data.meta,
        )
        self._session.add(new_file)
        await self._session.commit()
        await self._session.refresh(new_file)
        return new_file

    async def update(self, media_file: MediaFile, data: MediaFileUpdate) -> MediaFile:
        """Apply the explicitly-set fields of ``data`` to ``media_file``.

        Only fields that were set on ``data`` are considered, and of those
        any whose value is None is skipped, so this method never writes
        None onto the model. The change is committed and the instance is
        refreshed before it is returned.
        """
        changes = data.model_dump(exclude_unset=True)
        for field_name, new_value in changes.items():
            if new_value is None:
                continue
            setattr(media_file, field_name, new_value)

        await self._session.commit()
        await self._session.refresh(media_file)
        return media_file

    async def mark_deleted(self, media_file: MediaFile) -> None:
        """Soft-delete ``media_file`` by setting its flag and committing."""
        media_file.is_deleted = True
        await self._session.commit()


class ArtifactRepository:
    """Data-access layer for ArtifactMediaFile records, backed by an AsyncSession."""

    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def list_all(self) -> list[ArtifactMediaFile]:
        """Return every non-deleted artifact, newest first."""
        query = select(ArtifactMediaFile)
        query = query.where(ArtifactMediaFile.is_deleted.is_(False))
        query = query.order_by(ArtifactMediaFile.created_at.desc())

        rows = await self._session.execute(query)
        return list(rows.scalars().all())

    async def get_by_id(self, artifact_id: uuid.UUID) -> ArtifactMediaFile | None:
        """Return the artifact with the given id, or None.

        None is returned both when no row matches and when the matching
        row is flagged as deleted.
        """
        query = select(ArtifactMediaFile).where(ArtifactMediaFile.id == artifact_id)
        rows = await self._session.execute(query)
        found = rows.scalar_one_or_none()
        # The deleted flag is checked on the loaded object, not in the query.
        return None if found is None or found.is_deleted else found

    async def create(self, data: ArtifactMediaFileCreate) -> ArtifactMediaFile:
        """Insert an artifact built from ``data`` and return it.

        The row is committed and then refreshed from the database before
        being returned.
        """
        new_artifact = ArtifactMediaFile(
            project_id=data.project_id,
            file_id=data.file_id,
            media_file_id=data.media_file_id,
            artifact_type=data.artifact_type,
        )
        self._session.add(new_artifact)
        await self._session.commit()
        await self._session.refresh(new_artifact)
        return new_artifact

    async def update(
        self, artifact: ArtifactMediaFile, data: ArtifactMediaFileUpdate
    ) -> ArtifactMediaFile:
        """Apply the explicitly-set fields of ``data`` to ``artifact``.

        Only fields that were set on ``data`` are considered, and of those
        any whose value is None is skipped, so this method never writes
        None onto the model. The change is committed and the instance is
        refreshed before it is returned.
        """
        changes = data.model_dump(exclude_unset=True)
        for field_name, new_value in changes.items():
            if new_value is None:
                continue
            setattr(artifact, field_name, new_value)

        await self._session.commit()
        await self._session.refresh(artifact)
        return artifact

    async def mark_deleted(self, artifact: ArtifactMediaFile) -> None:
        """Soft-delete ``artifact`` by setting its flag and committing."""
        artifact.is_deleted = True
        await self._session.commit()