""" Task service for submitting and managing background tasks. """ from __future__ import annotations import asyncio import logging import uuid from datetime import datetime, timezone from typing import Any import dramatiq # type: ignore[import-untyped] from dramatiq.brokers.redis import RedisBroker # type: ignore[import-untyped] import httpx from sqlalchemy.ext.asyncio import AsyncSession from cpv3.infrastructure.deps import _get_storage_service from cpv3.infrastructure.settings import get_settings from cpv3.modules.jobs.models import Job from cpv3.modules.jobs.repository import JobEventRepository, JobRepository from cpv3.modules.jobs.schemas import ( JobCreate, JobEventCreate, JobStatusEnum, JobTypeEnum, JobUpdate, ) from cpv3.modules.tasks.schemas import ( CaptionsGenerateRequest, MediaConvertRequest, MediaProbeRequest, SilenceRemoveRequest, TaskSubmitResponse, TaskWebhookEvent, TranscriptionGenerateRequest, ) from cpv3.modules.transcription.repository import TranscriptionRepository from cpv3.modules.users.models import User from cpv3.modules.webhooks.repository import WebhookRepository from cpv3.modules.webhooks.schemas import WebhookCreate logger = logging.getLogger(__name__) JOB_STATUS_PENDING: JobStatusEnum = "PENDING" JOB_STATUS_RUNNING: JobStatusEnum = "RUNNING" JOB_STATUS_DONE: JobStatusEnum = "DONE" JOB_STATUS_FAILED: JobStatusEnum = "FAILED" JOB_TYPE_MEDIA_PROBE: JobTypeEnum = "MEDIA_PROBE" JOB_TYPE_SILENCE_REMOVE: JobTypeEnum = "SILENCE_REMOVE" JOB_TYPE_MEDIA_CONVERT: JobTypeEnum = "MEDIA_CONVERT" JOB_TYPE_TRANSCRIPTION_GENERATE: JobTypeEnum = "TRANSCRIPTION_GENERATE" JOB_TYPE_CAPTIONS_GENERATE: JobTypeEnum = "CAPTIONS_GENERATE" EVENT_TYPE_STATUS_PREFIX = "status_" EVENT_TYPE_PROGRESS = "progress" EVENT_TYPE_LOG = "log" EVENT_TYPE_OUTPUT = "output" EVENT_TYPE_ERROR = "error" TASK_WEBHOOK_PATH = "/api/tasks/webhook/{job_id}/" WEBHOOK_TIMEOUT_SECONDS = 10 MESSAGE_STARTING = "Starting" MESSAGE_COMPLETED = "Completed" MESSAGE_PROBING_MEDIA = "Probing media" MESSAGE_PROCESSING = "Processing" MESSAGE_CONVERTING = "Converting" MESSAGE_RENDERING_CAPTIONS = "Rendering captions" PROGRESS_COMPLETE = 100.0 PROGRESS_MEDIA_PROBE = 50.0 PROGRESS_SILENCE_REMOVE = 30.0 PROGRESS_MEDIA_CONVERT = 30.0 PROGRESS_TRANSCRIPTION = 20.0 PROGRESS_CAPTIONS = 30.0 # --------------------------------------------------------------------------- # Dramatiq broker setup # --------------------------------------------------------------------------- _settings = get_settings() _redis_broker = RedisBroker(url=_settings.redis_url) dramatiq.set_broker(_redis_broker) # --------------------------------------------------------------------------- # Webhook helpers for Dramatiq workers # --------------------------------------------------------------------------- def _utc_now() -> datetime: """Return current UTC time.""" return datetime.now(timezone.utc) def _build_webhook_url(job_id: uuid.UUID) -> str: """Build the internal webhook URL for task updates.""" settings = get_settings() base_url = settings.webhook_base_url.rstrip("/") return f"{base_url}{TASK_WEBHOOK_PATH.format(job_id=job_id)}" def _build_webhook_event_name(job_type: JobTypeEnum) -> str: """Build webhook event name for a job type.""" return f"task.{job_type.lower()}" def _send_webhook_event(webhook_url: str, event: TaskWebhookEvent) -> None: """Send a task webhook event to the API.""" payload = event.model_dump(mode="json", exclude_none=True) try: response = httpx.post( webhook_url, json=payload, timeout=WEBHOOK_TIMEOUT_SECONDS ) response.raise_for_status() except Exception: logger.exception("Failed to send task webhook event to %s", webhook_url) raise def _derive_event_type(event: TaskWebhookEvent) -> str: """Derive a job event type from a webhook event payload.""" if event.status is not None: return f"{EVENT_TYPE_STATUS_PREFIX}{event.status}" if event.error_message is not None: return EVENT_TYPE_ERROR if event.progress_pct is not None: return EVENT_TYPE_PROGRESS if event.current_message is not None: return EVENT_TYPE_LOG if event.output_data is not None: return EVENT_TYPE_OUTPUT return EVENT_TYPE_LOG def _run_async(coro: Any) -> Any: """Run async function in new event loop (for sync Dramatiq actors).""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: return loop.run_until_complete(coro) finally: loop.close() # --------------------------------------------------------------------------- # Dramatiq actors # --------------------------------------------------------------------------- @dramatiq.actor(max_retries=3, min_backoff=1000) def media_probe_actor(job_id: str, webhook_url: str, file_key: str) -> None: """Probe media file to extract metadata.""" from cpv3.modules.media.service import probe_media job_uuid = uuid.UUID(job_id) _send_webhook_event( webhook_url, TaskWebhookEvent( status=JOB_STATUS_RUNNING, current_message=MESSAGE_STARTING, started_at=_utc_now(), ), ) try: storage = _get_storage_service() _send_webhook_event( webhook_url, TaskWebhookEvent( current_message=MESSAGE_PROBING_MEDIA, progress_pct=PROGRESS_MEDIA_PROBE, ), ) result = _run_async(probe_media(storage, file_key=file_key)) _send_webhook_event( webhook_url, TaskWebhookEvent( status=JOB_STATUS_DONE, current_message=MESSAGE_COMPLETED, progress_pct=PROGRESS_COMPLETE, output_data=result.model_dump(mode="json"), finished_at=_utc_now(), ), ) except Exception as exc: logger.exception("media_probe_actor failed: %s", job_uuid) _send_webhook_event( webhook_url, TaskWebhookEvent( status=JOB_STATUS_FAILED, error_message=str(exc), finished_at=_utc_now(), ), ) raise @dramatiq.actor(max_retries=3, min_backoff=1000) def silence_remove_actor( job_id: str, webhook_url: str, file_key: str, out_folder: str, min_silence_duration_ms: int, silence_threshold_db: int, padding_ms: int, ) -> None: """Remove silence from media file.""" from cpv3.modules.media.service import remove_silence job_uuid = uuid.UUID(job_id) _send_webhook_event( webhook_url, TaskWebhookEvent( status=JOB_STATUS_RUNNING, current_message=MESSAGE_STARTING, started_at=_utc_now(), ), ) try: storage = _get_storage_service() _send_webhook_event( webhook_url, TaskWebhookEvent( current_message=MESSAGE_PROCESSING, progress_pct=PROGRESS_SILENCE_REMOVE, ), ) result = _run_async( remove_silence( storage, file_key=file_key, out_folder=out_folder, min_silence_duration_ms=min_silence_duration_ms, silence_threshold_db=silence_threshold_db, padding_ms=padding_ms, ) ) _send_webhook_event( webhook_url, TaskWebhookEvent( status=JOB_STATUS_DONE, current_message=MESSAGE_COMPLETED, progress_pct=PROGRESS_COMPLETE, output_data={ "file_path": result.file_path, "file_url": result.file_url, "file_size": result.file_size, }, finished_at=_utc_now(), ), ) except Exception as exc: logger.exception("silence_remove_actor failed: %s", job_uuid) _send_webhook_event( webhook_url, TaskWebhookEvent( status=JOB_STATUS_FAILED, error_message=str(exc), finished_at=_utc_now(), ), ) raise @dramatiq.actor(max_retries=3, min_backoff=1000) def media_convert_actor( job_id: str, webhook_url: str, file_key: str, out_folder: str, output_format: str, ) -> None: """Convert media file to specified format.""" from cpv3.modules.media.service import convert_to_mp4 job_uuid = uuid.UUID(job_id) _send_webhook_event( webhook_url, TaskWebhookEvent( status=JOB_STATUS_RUNNING, current_message=MESSAGE_STARTING, started_at=_utc_now(), ), ) try: if output_format.lower() != "mp4": raise ValueError(f"Unsupported format: {output_format}") storage = _get_storage_service() _send_webhook_event( webhook_url, TaskWebhookEvent( current_message=MESSAGE_CONVERTING, progress_pct=PROGRESS_MEDIA_CONVERT, ), ) result = _run_async( convert_to_mp4(storage, file_key=file_key, out_folder=out_folder) ) _send_webhook_event( webhook_url, TaskWebhookEvent( status=JOB_STATUS_DONE, current_message=MESSAGE_COMPLETED, progress_pct=PROGRESS_COMPLETE, output_data={ "file_path": result.file_path, "file_url": result.file_url, "file_size": result.file_size, }, finished_at=_utc_now(), ), ) except Exception as exc: logger.exception("media_convert_actor failed: %s", job_uuid) _send_webhook_event( webhook_url, TaskWebhookEvent( status=JOB_STATUS_FAILED, error_message=str(exc), finished_at=_utc_now(), ), ) raise @dramatiq.actor(max_retries=2, min_backoff=2000) def transcription_generate_actor( job_id: str, webhook_url: str, file_key: str, engine: str, language: str | None, model: str, ) -> None: """Generate transcription from audio/video file.""" from cpv3.modules.transcription.service import ( transcribe_with_google_speech, transcribe_with_whisper, ) job_uuid = uuid.UUID(job_id) _send_webhook_event( webhook_url, TaskWebhookEvent( status=JOB_STATUS_RUNNING, current_message=MESSAGE_STARTING, started_at=_utc_now(), ), ) try: storage = _get_storage_service() _send_webhook_event( webhook_url, TaskWebhookEvent( current_message=f"Transcribing ({engine})", progress_pct=PROGRESS_TRANSCRIPTION, ), ) if engine == "whisper": document = _run_async( transcribe_with_whisper( storage, file_key=file_key, model_name=model, language=language ) ) elif engine == "google": language_codes = [language] if language else None document = _run_async( transcribe_with_google_speech( storage, file_key=file_key, language_codes=language_codes ) ) else: raise ValueError(f"Unknown engine: {engine}") _send_webhook_event( webhook_url, TaskWebhookEvent( status=JOB_STATUS_DONE, current_message=MESSAGE_COMPLETED, progress_pct=PROGRESS_COMPLETE, output_data={"document": document.model_dump(mode="json")}, finished_at=_utc_now(), ), ) except Exception as exc: logger.exception("transcription_generate_actor failed: %s", job_uuid) _send_webhook_event( webhook_url, TaskWebhookEvent( status=JOB_STATUS_FAILED, error_message=str(exc), finished_at=_utc_now(), ), ) raise @dramatiq.actor(max_retries=2, min_backoff=2000) def captions_generate_actor( job_id: str, webhook_url: str, video_s3_path: str, folder: str, transcription_json: dict, ) -> None: """Generate captions on video.""" from cpv3.modules.captions.service import generate_captions from cpv3.modules.transcription.schemas import Document job_uuid = uuid.UUID(job_id) _send_webhook_event( webhook_url, TaskWebhookEvent( status=JOB_STATUS_RUNNING, current_message=MESSAGE_STARTING, started_at=_utc_now(), ), ) try: _send_webhook_event( webhook_url, TaskWebhookEvent( current_message=MESSAGE_RENDERING_CAPTIONS, progress_pct=PROGRESS_CAPTIONS, ), ) document = Document.model_validate(transcription_json) output_path = _run_async( generate_captions( video_s3_path=video_s3_path, folder=folder, transcription=document ) ) _send_webhook_event( webhook_url, TaskWebhookEvent( status=JOB_STATUS_DONE, current_message=MESSAGE_COMPLETED, progress_pct=PROGRESS_COMPLETE, output_data={"output_path": output_path}, finished_at=_utc_now(), ), ) except Exception as exc: logger.exception("captions_generate_actor failed: %s", job_uuid) _send_webhook_event( webhook_url, TaskWebhookEvent( status=JOB_STATUS_FAILED, error_message=str(exc), finished_at=_utc_now(), ), ) raise # --------------------------------------------------------------------------- # Task Service # --------------------------------------------------------------------------- class TaskService: """Service for submitting background tasks and recording webhook updates.""" def __init__(self, session: AsyncSession) -> None: self._session = session self._job_repo = JobRepository(session) self._event_repo = JobEventRepository(session) self._webhook_repo = WebhookRepository(session) async def _create_job_and_webhook( self, *, requester: User, job_type: JobTypeEnum, project_id: uuid.UUID | None, input_data: dict, ) -> tuple[Job, str]: """Create job and webhook, return job and webhook URL.""" broker_id = uuid.uuid4().hex job = await self._job_repo.create( requester=requester, data=JobCreate( broker_id=broker_id, project_id=project_id, input_data=input_data, job_type=job_type, ), ) webhook_url = _build_webhook_url(job.id) await self._webhook_repo.create( requester=requester, data=WebhookCreate( project_id=project_id, event=_build_webhook_event_name(job_type), url=webhook_url, ), ) return job, webhook_url async def _submit_task( self, *, requester: User, job_type: JobTypeEnum, project_id: uuid.UUID | None, input_data: dict, actor: Any, actor_kwargs: dict[str, Any], ) -> TaskSubmitResponse: job, webhook_url = await self._create_job_and_webhook( requester=requester, job_type=job_type, project_id=project_id, input_data=input_data, ) actor.send(job_id=str(job.id), webhook_url=webhook_url, **actor_kwargs) return TaskSubmitResponse( job_id=job.id, webhook_url=webhook_url, status=JOB_STATUS_PENDING, ) async def record_webhook_event( self, *, job_id: uuid.UUID, event: TaskWebhookEvent ) -> Job: """Apply a webhook event to the job and store a job event record.""" job = await self._job_repo.get_by_id(job_id) if job is None: raise ValueError(f"Job {job_id} not found") job_update = JobUpdate( status=event.status, project_pct=event.progress_pct, current_message=event.current_message, error_message=event.error_message, output_data=event.output_data, started_at=event.started_at, finished_at=event.finished_at, ) job = await self._job_repo.update(job, job_update) event_type = _derive_event_type(event) payload = event.model_dump(mode="json", exclude_none=True) await self._event_repo.create( JobEventCreate(job_id=job.id, event_type=event_type, payload=payload) ) return job async def submit_media_probe( self, *, requester: User, request: MediaProbeRequest ) -> TaskSubmitResponse: """Submit media probe task.""" return await self._submit_task( requester=requester, job_type=JOB_TYPE_MEDIA_PROBE, project_id=request.project_id, input_data=request.model_dump(mode="json"), actor=media_probe_actor, actor_kwargs={"file_key": request.file_key}, ) async def submit_silence_remove( self, *, requester: User, request: SilenceRemoveRequest ) -> TaskSubmitResponse: """Submit silence removal task.""" return await self._submit_task( requester=requester, job_type=JOB_TYPE_SILENCE_REMOVE, project_id=request.project_id, input_data=request.model_dump(mode="json"), actor=silence_remove_actor, actor_kwargs={ "file_key": request.file_key, "out_folder": request.out_folder, "min_silence_duration_ms": request.min_silence_duration_ms, "silence_threshold_db": request.silence_threshold_db, "padding_ms": request.padding_ms, }, ) async def submit_media_convert( self, *, requester: User, request: MediaConvertRequest ) -> TaskSubmitResponse: """Submit media conversion task.""" return await self._submit_task( requester=requester, job_type=JOB_TYPE_MEDIA_CONVERT, project_id=request.project_id, input_data=request.model_dump(mode="json"), actor=media_convert_actor, actor_kwargs={ "file_key": request.file_key, "out_folder": request.out_folder, "output_format": request.output_format, }, ) async def submit_transcription_generate( self, *, requester: User, request: TranscriptionGenerateRequest ) -> TaskSubmitResponse: """Submit transcription generation task.""" return await self._submit_task( requester=requester, job_type=JOB_TYPE_TRANSCRIPTION_GENERATE, project_id=request.project_id, input_data=request.model_dump(mode="json"), actor=transcription_generate_actor, actor_kwargs={ "file_key": request.file_key, "engine": request.engine, "language": request.language, "model": request.model, }, ) async def submit_captions_generate( self, *, requester: User, request: CaptionsGenerateRequest ) -> TaskSubmitResponse: """Submit captions generation task.""" transcription_repo = TranscriptionRepository(self._session) transcription = await transcription_repo.get_by_id(request.transcription_id) if transcription is None: raise ValueError(f"Transcription {request.transcription_id} not found") return await self._submit_task( requester=requester, job_type=JOB_TYPE_CAPTIONS_GENERATE, project_id=request.project_id, input_data=request.model_dump(mode="json"), actor=captions_generate_actor, actor_kwargs={ "video_s3_path": request.video_s3_path, "folder": request.folder, "transcription_json": transcription.document, }, )