"""
Task service for submitting and managing background tasks.

The module has three parts:

* Dramatiq broker configuration (executed at import time).
* Synchronous helpers and Dramatiq actors that run in worker processes and
  report their progress by updating the corresponding ``Job`` row.
* ``TaskService``, the async facade used to create a job record and enqueue
  the matching actor.
"""

from __future__ import annotations

import asyncio
import logging
import uuid
from datetime import datetime, timezone
from functools import lru_cache
from typing import Any

import dramatiq  # type: ignore[import-untyped]
from dramatiq.brokers.redis import RedisBroker  # type: ignore[import-untyped]
from pydantic import BaseModel
from sqlalchemy import create_engine, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session, sessionmaker

from cpv3.infrastructure.deps import _get_storage_service
from cpv3.infrastructure.settings import get_settings
from cpv3.modules.jobs.models import Job, JobEvent
from cpv3.modules.jobs.repository import JobRepository
from cpv3.modules.jobs.schemas import JobCreate, JobTypeEnum
from cpv3.modules.tasks.schemas import (
    CaptionsGenerateRequest,
    MediaConvertRequest,
    MediaProbeRequest,
    SilenceRemoveRequest,
    TaskSubmitResponse,
    TranscriptionGenerateRequest,
)
from cpv3.modules.transcription.repository import TranscriptionRepository
from cpv3.modules.users.models import User
from cpv3.modules.webhooks.repository import WebhookRepository
from cpv3.modules.webhooks.schemas import WebhookCreate

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Dramatiq broker setup
# ---------------------------------------------------------------------------

_settings = get_settings()
_redis_broker = RedisBroker(url=_settings.redis_url)
dramatiq.set_broker(_redis_broker)


# ---------------------------------------------------------------------------
# Sync DB helpers for Dramatiq workers
# ---------------------------------------------------------------------------


@lru_cache(maxsize=1)
def _get_sync_session_factory() -> sessionmaker:
    """Build (once per process) the sync session factory used by workers.

    The async database URL from settings is rewritten to the plain
    ``postgresql://`` scheme so a synchronous driver is used. The factory is
    cached because every ``create_engine`` call allocates its own connection
    pool; creating one per job update would leak pools and connections.
    """
    settings = get_settings()
    sync_url = settings.get_database_url().replace(
        "postgresql+asyncpg://", "postgresql://"
    )
    engine = create_engine(sync_url, pool_pre_ping=True)
    return sessionmaker(bind=engine, expire_on_commit=False)


def _get_sync_session() -> Session:
    """Create sync DB session for worker tasks.

    Returns:
        A new ``Session`` bound to the process-wide engine. The caller owns
        the session and should close it (e.g. by using it as a context
        manager).
    """
    return _get_sync_session_factory()()


def _update_job(
    job_id: uuid.UUID,
    *,
    status: str | None = None,
    current_message: str | None = None,
    progress_pct: float | None = None,
    error_message: str | None = None,
    output_data: dict | None = None,
    started_at: datetime | None = None,
    finished_at: datetime | None = None,
) -> Job | None:
    """Update job in database (sync, for workers).

    Only the keyword arguments that are not ``None`` are written to the job;
    ``None`` means "leave this field unchanged". A ``JobEvent`` row is added
    for every successful update.

    Args:
        job_id: Primary key of the job to update.
        status: New status value, if any.
        current_message: New human-readable progress message, if any.
        progress_pct: New progress percentage, if any.
        error_message: Error text to store, if any.
        output_data: Result payload to store, if any.
        started_at: Start timestamp to store, if any.
        finished_at: Finish timestamp to store, if any.

    Returns:
        The refreshed ``Job``, or ``None`` when no job with ``job_id`` exists.
    """
    # NOTE: the original code assigned ``job.project_pct``, which looks like a
    # typo that silently dropped progress updates; the value is now stored on
    # ``progress_pct``. Confirm against the ``Job`` model's column name.
    changes: dict[str, Any] = {
        "status": status,
        "current_message": current_message,
        "progress_pct": progress_pct,
        "error_message": error_message,
        "output_data": output_data,
        "started_at": started_at,
        "finished_at": finished_at,
    }

    with _get_sync_session() as session:
        job = session.execute(select(Job).where(Job.id == job_id)).scalar_one_or_none()
        if job is None:
            return None

        for attr, value in changes.items():
            if value is not None:
                setattr(job, attr, value)

        # Create event: a status change is recorded as "status_<STATUS>",
        # anything else as a plain progress event.
        event = JobEvent(
            job_id=job_id,
            event_type=f"status_{status}" if status else "progress",
            payload={"status": status or job.status, "message": current_message},
        )
        session.add(event)
        session.commit()
        session.refresh(job)
        return job


def _run_async(coro: Any) -> Any:
    """Run async function in new event loop (for sync Dramatiq actors).

    Args:
        coro: Awaitable to drive to completion.

    Returns:
        Whatever ``coro`` returns.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(coro)
    finally:
        # Unregister the loop before closing it so later code in this thread
        # never picks up a closed loop as its "current" event loop.
        asyncio.set_event_loop(None)
        loop.close()


def _mark_running(job_id: uuid.UUID) -> None:
    """Record that a worker has started processing the job."""
    _update_job(
        job_id,
        status="RUNNING",
        current_message="Starting",
        started_at=datetime.now(timezone.utc),
    )


def _mark_done(job_id: uuid.UUID, output_data: dict) -> None:
    """Record successful completion of the job together with its output."""
    _update_job(
        job_id,
        status="DONE",
        current_message="Completed",
        progress_pct=100.0,
        output_data=output_data,
        finished_at=datetime.now(timezone.utc),
    )


def _mark_failed(job_id: uuid.UUID, exc: BaseException) -> None:
    """Record job failure without masking the error that caused it.

    The update is best-effort: if the bookkeeping write itself fails, that
    failure is logged and swallowed so the caller can re-raise the original
    exception.
    """
    try:
        _update_job(
            job_id,
            status="FAILED",
            error_message=str(exc),
            finished_at=datetime.now(timezone.utc),
        )
    except Exception:
        logger.exception("Could not record failure for job %s", job_id)


# ---------------------------------------------------------------------------
# Dramatiq actors
# ---------------------------------------------------------------------------


@dramatiq.actor(max_retries=3, min_backoff=1000)
def media_probe_actor(job_id: str, file_key: str) -> None:
    """Probe media file to extract metadata.

    Args:
        job_id: String form of the job UUID to report progress on.
        file_key: Storage key of the media file to probe.

    Raises:
        Exception: Any processing error is recorded on the job and re-raised
            so the actor's retry settings apply.
    """
    from cpv3.modules.media.service import probe_media

    _job_id = uuid.UUID(job_id)
    _mark_running(_job_id)

    try:
        storage = _get_storage_service()
        _update_job(_job_id, current_message="Probing media", progress_pct=50.0)

        result = _run_async(probe_media(storage, file_key=file_key))

        _mark_done(_job_id, result.model_dump(mode="json"))
    except Exception as e:
        logger.exception("media_probe_actor failed: %s", _job_id)
        _mark_failed(_job_id, e)
        raise


@dramatiq.actor(max_retries=3, min_backoff=1000)
def silence_remove_actor(
    job_id: str,
    file_key: str,
    out_folder: str,
    min_silence_duration_ms: int,
    silence_threshold_db: int,
    padding_ms: int,
) -> None:
    """Remove silence from media file.

    Args:
        job_id: String form of the job UUID to report progress on.
        file_key: Storage key of the input media file.
        out_folder: Destination folder passed through to ``remove_silence``.
        min_silence_duration_ms: Passed through to ``remove_silence``.
        silence_threshold_db: Passed through to ``remove_silence``.
        padding_ms: Passed through to ``remove_silence``.

    Raises:
        Exception: Any processing error is recorded on the job and re-raised
            so the actor's retry settings apply.
    """
    from cpv3.modules.media.service import remove_silence

    _job_id = uuid.UUID(job_id)
    _mark_running(_job_id)

    try:
        storage = _get_storage_service()
        _update_job(_job_id, current_message="Processing", progress_pct=30.0)

        result = _run_async(
            remove_silence(
                storage,
                file_key=file_key,
                out_folder=out_folder,
                min_silence_duration_ms=min_silence_duration_ms,
                silence_threshold_db=silence_threshold_db,
                padding_ms=padding_ms,
            )
        )

        _mark_done(
            _job_id,
            {
                "file_path": result.file_path,
                "file_url": result.file_url,
                "file_size": result.file_size,
            },
        )
    except Exception as e:
        logger.exception("silence_remove_actor failed: %s", _job_id)
        _mark_failed(_job_id, e)
        raise


@dramatiq.actor(max_retries=3, min_backoff=1000)
def media_convert_actor(
    job_id: str, file_key: str, out_folder: str, output_format: str
) -> None:
    """Convert media file to specified format.

    Only ``mp4`` (case-insensitive) is accepted; any other format fails the
    job with a ``ValueError``.

    Args:
        job_id: String form of the job UUID to report progress on.
        file_key: Storage key of the input media file.
        out_folder: Destination folder passed through to ``convert_to_mp4``.
        output_format: Requested target format.

    Raises:
        ValueError: If ``output_format`` is not ``mp4``.
        Exception: Any processing error is recorded on the job and re-raised
            so the actor's retry settings apply.
    """
    from cpv3.modules.media.service import convert_to_mp4

    _job_id = uuid.UUID(job_id)
    _mark_running(_job_id)

    try:
        if output_format.lower() != "mp4":
            raise ValueError(f"Unsupported format: {output_format}")

        storage = _get_storage_service()
        _update_job(_job_id, current_message="Converting", progress_pct=30.0)

        result = _run_async(
            convert_to_mp4(storage, file_key=file_key, out_folder=out_folder)
        )

        _mark_done(
            _job_id,
            {
                "file_path": result.file_path,
                "file_url": result.file_url,
                "file_size": result.file_size,
            },
        )
    except Exception as e:
        logger.exception("media_convert_actor failed: %s", _job_id)
        _mark_failed(_job_id, e)
        raise


@dramatiq.actor(max_retries=2, min_backoff=2000)
def transcription_generate_actor(
    job_id: str, file_key: str, engine: str, language: str | None, model: str
) -> None:
    """Generate transcription from audio/video file.

    Args:
        job_id: String form of the job UUID to report progress on.
        file_key: Storage key of the audio/video file.
        engine: Transcription backend; ``"whisper"`` or ``"google"``.
        language: Optional language hint. For the Google engine it is sent as
            a single-element list of language codes.
        model: Model name; used by the Whisper engine only.

    Raises:
        ValueError: If ``engine`` is not a known backend.
        Exception: Any processing error is recorded on the job and re-raised
            so the actor's retry settings apply.
    """
    from cpv3.modules.transcription.service import (
        transcribe_with_google_speech,
        transcribe_with_whisper,
    )

    _job_id = uuid.UUID(job_id)
    _mark_running(_job_id)

    try:
        storage = _get_storage_service()
        _update_job(
            _job_id, current_message=f"Transcribing ({engine})", progress_pct=20.0
        )

        if engine == "whisper":
            document = _run_async(
                transcribe_with_whisper(
                    storage, file_key=file_key, model_name=model, language=language
                )
            )
        elif engine == "google":
            language_codes = [language] if language else None
            document = _run_async(
                transcribe_with_google_speech(
                    storage, file_key=file_key, language_codes=language_codes
                )
            )
        else:
            raise ValueError(f"Unknown engine: {engine}")

        _mark_done(_job_id, {"document": document.model_dump(mode="json")})
    except Exception as e:
        logger.exception("transcription_generate_actor failed: %s", _job_id)
        _mark_failed(_job_id, e)
        raise


@dramatiq.actor(max_retries=2, min_backoff=2000)
def captions_generate_actor(
    job_id: str, video_s3_path: str, folder: str, transcription_json: dict
) -> None:
    """Generate captions on video.

    Args:
        job_id: String form of the job UUID to report progress on.
        video_s3_path: Video location passed through to ``generate_captions``.
        folder: Folder passed through to ``generate_captions``.
        transcription_json: Serialized transcription; validated into a
            ``Document`` before rendering.

    Raises:
        Exception: Any validation or processing error is recorded on the job
            and re-raised so the actor's retry settings apply.
    """
    from cpv3.modules.captions.service import generate_captions
    from cpv3.modules.transcription.schemas import Document

    _job_id = uuid.UUID(job_id)
    _mark_running(_job_id)

    try:
        _update_job(_job_id, current_message="Rendering captions", progress_pct=30.0)

        document = Document.model_validate(transcription_json)
        output_path = _run_async(
            generate_captions(
                video_s3_path=video_s3_path, folder=folder, transcription=document
            )
        )

        _mark_done(_job_id, {"output_path": output_path})
    except Exception as e:
        logger.exception("captions_generate_actor failed: %s", _job_id)
        _mark_failed(_job_id, e)
        raise


# ---------------------------------------------------------------------------
# Task Service
# ---------------------------------------------------------------------------


class TaskService:
    """Service for submitting background tasks.

    Each ``submit_*`` method creates a job record plus a webhook record,
    enqueues the matching Dramatiq actor and returns a ``TaskSubmitResponse``
    with status ``"PENDING"``.
    """

    def __init__(self, session: AsyncSession) -> None:
        """Bind the service and its repositories to ``session``."""
        self._session = session
        self._job_repo = JobRepository(session)
        self._webhook_repo = WebhookRepository(session)

    async def _create_job(
        self,
        *,
        requester: User,
        job_type: JobTypeEnum,
        project_id: uuid.UUID | None,
        input_data: dict,
    ) -> tuple[Job, str]:
        """Create job and webhook, return job and webhook URL.

        Args:
            requester: User on whose behalf the job is created.
            job_type: Kind of job; also used (lower-cased) in the webhook
                event name ``task.<job_type>``.
            project_id: Optional project the job belongs to.
            input_data: JSON-serializable request payload stored on the job.

        Returns:
            The created job and the webhook URL derived from its id.
        """
        settings = get_settings()
        broker_id = uuid.uuid4().hex

        job = await self._job_repo.create(
            requester=requester,
            data=JobCreate(
                broker_id=broker_id,
                project_id=project_id,
                input_data=input_data,
                job_type=job_type,
            ),
        )

        webhook_url = f"{settings.webhook_base_url}/api/tasks/webhook/{job.id}/"
        await self._webhook_repo.create(
            requester=requester,
            data=WebhookCreate(
                project_id=project_id, event=f"task.{job_type.lower()}", url=webhook_url
            ),
        )

        return job, webhook_url

    async def submit_media_probe(
        self, *, requester: User, request: MediaProbeRequest
    ) -> TaskSubmitResponse:
        """Submit media probe task."""
        job, webhook_url = await self._create_job(
            requester=requester,
            job_type="MEDIA_PROBE",
            project_id=request.project_id,
            input_data=request.model_dump(mode="json"),
        )

        media_probe_actor.send(job_id=str(job.id), file_key=request.file_key)

        return TaskSubmitResponse(
            job_id=job.id, webhook_url=webhook_url, status="PENDING"
        )

    async def submit_silence_remove(
        self, *, requester: User, request: SilenceRemoveRequest
    ) -> TaskSubmitResponse:
        """Submit silence removal task."""
        job, webhook_url = await self._create_job(
            requester=requester,
            job_type="SILENCE_REMOVE",
            project_id=request.project_id,
            input_data=request.model_dump(mode="json"),
        )

        silence_remove_actor.send(
            job_id=str(job.id),
            file_key=request.file_key,
            out_folder=request.out_folder,
            min_silence_duration_ms=request.min_silence_duration_ms,
            silence_threshold_db=request.silence_threshold_db,
            padding_ms=request.padding_ms,
        )

        return TaskSubmitResponse(
            job_id=job.id, webhook_url=webhook_url, status="PENDING"
        )

    async def submit_media_convert(
        self, *, requester: User, request: MediaConvertRequest
    ) -> TaskSubmitResponse:
        """Submit media conversion task."""
        job, webhook_url = await self._create_job(
            requester=requester,
            job_type="MEDIA_CONVERT",
            project_id=request.project_id,
            input_data=request.model_dump(mode="json"),
        )

        media_convert_actor.send(
            job_id=str(job.id),
            file_key=request.file_key,
            out_folder=request.out_folder,
            output_format=request.output_format,
        )

        return TaskSubmitResponse(
            job_id=job.id, webhook_url=webhook_url, status="PENDING"
        )

    async def submit_transcription_generate(
        self, *, requester: User, request: TranscriptionGenerateRequest
    ) -> TaskSubmitResponse:
        """Submit transcription generation task."""
        job, webhook_url = await self._create_job(
            requester=requester,
            job_type="TRANSCRIPTION_GENERATE",
            project_id=request.project_id,
            input_data=request.model_dump(mode="json"),
        )

        transcription_generate_actor.send(
            job_id=str(job.id),
            file_key=request.file_key,
            engine=request.engine,
            language=request.language,
            model=request.model,
        )

        return TaskSubmitResponse(
            job_id=job.id, webhook_url=webhook_url, status="PENDING"
        )

    async def submit_captions_generate(
        self, *, requester: User, request: CaptionsGenerateRequest
    ) -> TaskSubmitResponse:
        """Submit captions generation task.

        The referenced transcription is loaded first and its stored document
        is sent to the worker.

        Raises:
            ValueError: If the transcription referenced by
                ``request.transcription_id`` does not exist.
        """
        transcription_repo = TranscriptionRepository(self._session)
        transcription = await transcription_repo.get_by_id(request.transcription_id)
        if transcription is None:
            raise ValueError(f"Transcription {request.transcription_id} not found")

        job, webhook_url = await self._create_job(
            requester=requester,
            job_type="CAPTIONS_GENERATE",
            project_id=request.project_id,
            input_data=request.model_dump(mode="json"),
        )

        captions_generate_actor.send(
            job_id=str(job.id),
            video_s3_path=request.video_s3_path,
            folder=request.folder,
            transcription_json=transcription.document,
        )

        return TaskSubmitResponse(
            job_id=job.id, webhook_url=webhook_url, status="PENDING"
        )