from __future__ import annotations import uuid import httpx from sqlalchemy.ext.asyncio import AsyncSession from cpv3.infrastructure.settings import get_settings from cpv3.modules.captions.models import CaptionPreset from cpv3.modules.captions.repository import CaptionPresetRepository from cpv3.modules.captions.schemas import ( CaptionPresetCreate, CaptionPresetUpdate, CaptionStyleConfig, ) from cpv3.modules.transcription.schemas import Document ERROR_PRESET_NOT_FOUND = "Пресет субтитров не найден" ERROR_PRESET_FORBIDDEN = "Нельзя изменять чужой или системный пресет" # --------------------------------------------------------------------------- # Caption rendering (calls Remotion service) # --------------------------------------------------------------------------- async def generate_captions( *, video_s3_path: str, folder: str, transcription: Document, style_config: dict | None = None, callback_url: str | None = None, render_id: str | None = None, ) -> str: """Generate captions for a video using the Remotion service. Returns render_id (async mode with callback_url) or S3 output path (sync fallback). """ settings = get_settings() payload: dict = { "folder": folder, "videoSrc": video_s3_path, "transcription": transcription.model_dump(), } if style_config is not None: payload["styleConfig"] = style_config if callback_url is not None: payload["callbackUrl"] = callback_url if render_id is not None: payload["renderId"] = render_id timeout = 30.0 if callback_url else 300.0 async with httpx.AsyncClient(timeout=timeout) as client: resp = await client.post(f"{settings.remotion_service_url}/api/render", json=payload) resp.raise_for_status() data = resp.json() # Async mode: Remotion returns { renderId, status: "queued" } if callback_url and "renderId" in data: return str(data["renderId"]) # Sync fallback: Remotion returns { output: "s3/path" } if isinstance(data, dict) and "output" in data: return str(data["output"]) raise RuntimeError("Unexpected response from remotion service") # --------------------------------------------------------------------------- # Preset service # --------------------------------------------------------------------------- class CaptionPresetService: """Business logic for caption preset CRUD with ownership checks.""" def __init__(self, session: AsyncSession) -> None: self._repo = CaptionPresetRepository(session) async def list_presets(self, *, user_id: uuid.UUID) -> list[CaptionPreset]: return await self._repo.list_all_for_user(user_id) async def get_preset(self, *, preset_id: uuid.UUID) -> CaptionPreset: preset = await self._repo.get_by_id(preset_id) if preset is None: raise ValueError(ERROR_PRESET_NOT_FOUND) return preset async def create_preset( self, *, user_id: uuid.UUID, data: CaptionPresetCreate ) -> CaptionPreset: return await self._repo.create(user_id=user_id, data=data) async def update_preset( self, *, preset_id: uuid.UUID, user_id: uuid.UUID, data: CaptionPresetUpdate, ) -> CaptionPreset: preset = await self.get_preset(preset_id=preset_id) if preset.is_system or preset.user_id != user_id: raise PermissionError(ERROR_PRESET_FORBIDDEN) return await self._repo.update(preset, data) async def delete_preset(self, *, preset_id: uuid.UUID, user_id: uuid.UUID) -> None: preset = await self.get_preset(preset_id=preset_id) if preset.is_system or preset.user_id != user_id: raise PermissionError(ERROR_PRESET_FORBIDDEN) await self._repo.deactivate(preset) async def resolve_style_config( self, *, preset_id: uuid.UUID | None = None, inline_config: dict | None = None, ) -> dict | None: """Resolve a style config from preset_id or inline override.""" if inline_config is not None: # Validate by parsing, then return as dict cfg = CaptionStyleConfig.model_validate(inline_config) return cfg.model_dump(mode="json") if preset_id is not None: preset = await self.get_preset(preset_id=preset_id) return preset.style_config return None