diff --git a/cpv3/modules/tasks/router.py b/cpv3/modules/tasks/router.py index 9a95928..3b5249c 100644 --- a/cpv3/modules/tasks/router.py +++ b/cpv3/modules/tasks/router.py @@ -7,7 +7,7 @@ from __future__ import annotations import uuid from typing import cast -from fastapi import APIRouter, Depends, HTTPException, Request, status +from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.ext.asyncio import AsyncSession from cpv3.db.session import get_db @@ -22,6 +22,7 @@ from cpv3.modules.tasks.schemas import ( TaskStatusResponse, TaskSubmitResponse, TaskTypeEnum, + TaskWebhookEvent, TranscriptionGenerateRequest, ) from cpv3.modules.tasks.service import TaskService @@ -146,23 +147,16 @@ async def get_task_status( @router.post("/webhook/{job_id}/", include_in_schema=False) async def task_webhook_callback( job_id: uuid.UUID, - request: Request, + body: TaskWebhookEvent, db: AsyncSession = Depends(get_db), ) -> dict[str, str]: """Internal webhook endpoint for task status updates.""" + service = TaskService(db) try: - await request.json() - except Exception: + await service.record_webhook_event(job_id=job_id, event=body) + except ValueError as exc: raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid JSON payload" - ) - - job_service = JobService(db) - job = await job_service.get_job(job_id) - - if job is None: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail="Job not found" - ) + status_code=status.HTTP_404_NOT_FOUND, detail=str(exc) + ) from exc return {"status": "received", "job_id": str(job_id)} diff --git a/cpv3/modules/tasks/schemas.py b/cpv3/modules/tasks/schemas.py index 953359f..c886414 100644 --- a/cpv3/modules/tasks/schemas.py +++ b/cpv3/modules/tasks/schemas.py @@ -8,20 +8,14 @@ from datetime import datetime from typing import Literal from uuid import UUID -from pydantic import Field +from pydantic import Field, model_validator from cpv3.common.schemas import Schema +from cpv3.modules.jobs.schemas import JobStatusEnum, JobTypeEnum -TaskTypeEnum = Literal[ - "MEDIA_PROBE", - "SILENCE_REMOVE", - "MEDIA_CONVERT", - "TRANSCRIPTION_GENERATE", - "CAPTIONS_GENERATE", -] - -TaskStatusEnum = Literal["PENDING", "RUNNING", "FAILED", "CANCELLED", "DONE"] +TaskTypeEnum = JobTypeEnum +TaskStatusEnum = JobStatusEnum # --- Request schemas --- @@ -104,3 +98,33 @@ class TaskStatusResponse(Schema): output_data: dict | None = None started_at: datetime | None = None finished_at: datetime | None = None + + +class TaskWebhookEvent(Schema): + """Webhook event payload for task updates.""" + + status: TaskStatusEnum | None = None + progress_pct: float | None = None + current_message: str | None = None + error_message: str | None = None + output_data: dict | None = None + started_at: datetime | None = None + finished_at: datetime | None = None + + @model_validator(mode="after") + def validate_has_update(self) -> "TaskWebhookEvent": + has_update = any( + value is not None + for value in ( + self.status, + self.progress_pct, + self.current_message, + self.error_message, + self.output_data, + self.started_at, + self.finished_at, + ) + ) + if not has_update: + raise ValueError("Webhook event must include at least one update field.") + return self diff --git a/cpv3/modules/tasks/service.py b/cpv3/modules/tasks/service.py index 0fc5996..480a678 100644 --- a/cpv3/modules/tasks/service.py +++ b/cpv3/modules/tasks/service.py @@ -12,22 +12,27 @@ from typing import Any import dramatiq # type: ignore[import-untyped] from dramatiq.brokers.redis import RedisBroker # type: ignore[import-untyped] -from pydantic import BaseModel -from sqlalchemy import create_engine, select +import httpx from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import Session, sessionmaker from cpv3.infrastructure.deps import _get_storage_service from cpv3.infrastructure.settings import get_settings -from cpv3.modules.jobs.models import Job, JobEvent -from cpv3.modules.jobs.repository import JobRepository -from cpv3.modules.jobs.schemas import JobCreate, JobTypeEnum +from cpv3.modules.jobs.models import Job +from cpv3.modules.jobs.repository import JobEventRepository, JobRepository +from cpv3.modules.jobs.schemas import ( + JobCreate, + JobEventCreate, + JobStatusEnum, + JobTypeEnum, + JobUpdate, +) from cpv3.modules.tasks.schemas import ( CaptionsGenerateRequest, MediaConvertRequest, MediaProbeRequest, SilenceRemoveRequest, TaskSubmitResponse, + TaskWebhookEvent, TranscriptionGenerateRequest, ) from cpv3.modules.transcription.repository import TranscriptionRepository @@ -37,6 +42,40 @@ from cpv3.modules.webhooks.schemas import WebhookCreate logger = logging.getLogger(__name__) +JOB_STATUS_PENDING: JobStatusEnum = "PENDING" +JOB_STATUS_RUNNING: JobStatusEnum = "RUNNING" +JOB_STATUS_DONE: JobStatusEnum = "DONE" +JOB_STATUS_FAILED: JobStatusEnum = "FAILED" + +JOB_TYPE_MEDIA_PROBE: JobTypeEnum = "MEDIA_PROBE" +JOB_TYPE_SILENCE_REMOVE: JobTypeEnum = "SILENCE_REMOVE" +JOB_TYPE_MEDIA_CONVERT: JobTypeEnum = "MEDIA_CONVERT" +JOB_TYPE_TRANSCRIPTION_GENERATE: JobTypeEnum = "TRANSCRIPTION_GENERATE" +JOB_TYPE_CAPTIONS_GENERATE: JobTypeEnum = "CAPTIONS_GENERATE" + +EVENT_TYPE_STATUS_PREFIX = "status_" +EVENT_TYPE_PROGRESS = "progress" +EVENT_TYPE_LOG = "log" +EVENT_TYPE_OUTPUT = "output" +EVENT_TYPE_ERROR = "error" + +TASK_WEBHOOK_PATH = "/api/tasks/webhook/{job_id}/" +WEBHOOK_TIMEOUT_SECONDS = 10 + +MESSAGE_STARTING = "Starting" +MESSAGE_COMPLETED = "Completed" +MESSAGE_PROBING_MEDIA = "Probing media" +MESSAGE_PROCESSING = "Processing" +MESSAGE_CONVERTING = "Converting" +MESSAGE_RENDERING_CAPTIONS = "Rendering captions" + +PROGRESS_COMPLETE = 100.0 +PROGRESS_MEDIA_PROBE = 50.0 +PROGRESS_SILENCE_REMOVE = 30.0 +PROGRESS_MEDIA_CONVERT = 30.0 +PROGRESS_TRANSCRIPTION = 20.0 +PROGRESS_CAPTIONS = 30.0 + # --------------------------------------------------------------------------- # Dramatiq broker setup # --------------------------------------------------------------------------- @@ -47,62 +86,53 @@ dramatiq.set_broker(_redis_broker) # --------------------------------------------------------------------------- -# Sync DB helpers for Dramatiq workers +# Webhook helpers for Dramatiq workers # --------------------------------------------------------------------------- -def _get_sync_session() -> Session: - """Create sync DB session for worker tasks.""" +def _utc_now() -> datetime: + """Return current UTC time.""" + return datetime.now(timezone.utc) + + +def _build_webhook_url(job_id: uuid.UUID) -> str: + """Build the internal webhook URL for task updates.""" settings = get_settings() - sync_url = settings.get_database_url().replace( - "postgresql+asyncpg://", "postgresql://" - ) - engine = create_engine(sync_url, pool_pre_ping=True) - return sessionmaker(bind=engine, expire_on_commit=False)() + base_url = settings.webhook_base_url.rstrip("/") + return f"{base_url}{TASK_WEBHOOK_PATH.format(job_id=job_id)}" -def _update_job( - job_id: uuid.UUID, - *, - status: str | None = None, - current_message: str | None = None, - progress_pct: float | None = None, - error_message: str | None = None, - output_data: dict | None = None, - started_at: datetime | None = None, - finished_at: datetime | None = None, -) -> Job | None: - """Update job in database (sync, for workers).""" - with _get_sync_session() as session: - job = session.execute(select(Job).where(Job.id == job_id)).scalar_one_or_none() - if job is None: - return None +def _build_webhook_event_name(job_type: JobTypeEnum) -> str: + """Build webhook event name for a job type.""" + return f"task.{job_type.lower()}" - if status is not None: - job.status = status - if current_message is not None: - job.current_message = current_message - if progress_pct is not None: - job.project_pct = progress_pct - if error_message is not None: - job.error_message = error_message - if output_data is not None: - job.output_data = output_data - if started_at is not None: - job.started_at = started_at - if finished_at is not None: - job.finished_at = finished_at - # Create event - event = JobEvent( - job_id=job_id, - event_type=f"status_{status}" if status else "progress", - payload={"status": status or job.status, "message": current_message}, +def _send_webhook_event(webhook_url: str, event: TaskWebhookEvent) -> None: + """Send a task webhook event to the API.""" + payload = event.model_dump(mode="json", exclude_none=True) + try: + response = httpx.post( + webhook_url, json=payload, timeout=WEBHOOK_TIMEOUT_SECONDS ) - session.add(event) - session.commit() - session.refresh(job) - return job + response.raise_for_status() + except Exception: + logger.exception("Failed to send task webhook event to %s", webhook_url) + raise + + +def _derive_event_type(event: TaskWebhookEvent) -> str: + """Derive a job event type from a webhook event payload.""" + if event.status is not None: + return f"{EVENT_TYPE_STATUS_PREFIX}{event.status}" + if event.error_message is not None: + return EVENT_TYPE_ERROR + if event.progress_pct is not None: + return EVENT_TYPE_PROGRESS + if event.current_message is not None: + return EVENT_TYPE_LOG + if event.output_data is not None: + return EVENT_TYPE_OUTPUT + return EVENT_TYPE_LOG def _run_async(coro: Any) -> Any: @@ -121,37 +151,49 @@ def _run_async(coro: Any) -> Any: @dramatiq.actor(max_retries=3, min_backoff=1000) -def media_probe_actor(job_id: str, file_key: str) -> None: +def media_probe_actor(job_id: str, webhook_url: str, file_key: str) -> None: """Probe media file to extract metadata.""" from cpv3.modules.media.service import probe_media - _job_id = uuid.UUID(job_id) - _update_job( - _job_id, - status="RUNNING", - current_message="Starting", - started_at=datetime.now(timezone.utc), + job_uuid = uuid.UUID(job_id) + _send_webhook_event( + webhook_url, + TaskWebhookEvent( + status=JOB_STATUS_RUNNING, + current_message=MESSAGE_STARTING, + started_at=_utc_now(), + ), ) try: storage = _get_storage_service() - _update_job(_job_id, current_message="Probing media", progress_pct=50.0) - result = _run_async(probe_media(storage, file_key=file_key)) - _update_job( - _job_id, - status="DONE", - current_message="Completed", - progress_pct=100.0, - output_data=result.model_dump(mode="json"), - finished_at=datetime.now(timezone.utc), + _send_webhook_event( + webhook_url, + TaskWebhookEvent( + current_message=MESSAGE_PROBING_MEDIA, + progress_pct=PROGRESS_MEDIA_PROBE, + ), ) - except Exception as e: - logger.exception("media_probe_actor failed: %s", _job_id) - _update_job( - _job_id, - status="FAILED", - error_message=str(e), - finished_at=datetime.now(timezone.utc), + result = _run_async(probe_media(storage, file_key=file_key)) + _send_webhook_event( + webhook_url, + TaskWebhookEvent( + status=JOB_STATUS_DONE, + current_message=MESSAGE_COMPLETED, + progress_pct=PROGRESS_COMPLETE, + output_data=result.model_dump(mode="json"), + finished_at=_utc_now(), + ), + ) + except Exception as exc: + logger.exception("media_probe_actor failed: %s", job_uuid) + _send_webhook_event( + webhook_url, + TaskWebhookEvent( + status=JOB_STATUS_FAILED, + error_message=str(exc), + finished_at=_utc_now(), + ), ) raise @@ -159,6 +201,7 @@ def media_probe_actor(job_id: str, file_key: str) -> None: @dramatiq.actor(max_retries=3, min_backoff=1000) def silence_remove_actor( job_id: str, + webhook_url: str, file_key: str, out_folder: str, min_silence_duration_ms: int, @@ -168,17 +211,25 @@ def silence_remove_actor( """Remove silence from media file.""" from cpv3.modules.media.service import remove_silence - _job_id = uuid.UUID(job_id) - _update_job( - _job_id, - status="RUNNING", - current_message="Starting", - started_at=datetime.now(timezone.utc), + job_uuid = uuid.UUID(job_id) + _send_webhook_event( + webhook_url, + TaskWebhookEvent( + status=JOB_STATUS_RUNNING, + current_message=MESSAGE_STARTING, + started_at=_utc_now(), + ), ) try: storage = _get_storage_service() - _update_job(_job_id, current_message="Processing", progress_pct=30.0) + _send_webhook_event( + webhook_url, + TaskWebhookEvent( + current_message=MESSAGE_PROCESSING, + progress_pct=PROGRESS_SILENCE_REMOVE, + ), + ) result = _run_async( remove_silence( storage, @@ -189,42 +240,52 @@ def silence_remove_actor( padding_ms=padding_ms, ) ) - _update_job( - _job_id, - status="DONE", - current_message="Completed", - progress_pct=100.0, - output_data={ - "file_path": result.file_path, - "file_url": result.file_url, - "file_size": result.file_size, - }, - finished_at=datetime.now(timezone.utc), + _send_webhook_event( + webhook_url, + TaskWebhookEvent( + status=JOB_STATUS_DONE, + current_message=MESSAGE_COMPLETED, + progress_pct=PROGRESS_COMPLETE, + output_data={ + "file_path": result.file_path, + "file_url": result.file_url, + "file_size": result.file_size, + }, + finished_at=_utc_now(), + ), ) - except Exception as e: - logger.exception("silence_remove_actor failed: %s", _job_id) - _update_job( - _job_id, - status="FAILED", - error_message=str(e), - finished_at=datetime.now(timezone.utc), + except Exception as exc: + logger.exception("silence_remove_actor failed: %s", job_uuid) + _send_webhook_event( + webhook_url, + TaskWebhookEvent( + status=JOB_STATUS_FAILED, + error_message=str(exc), + finished_at=_utc_now(), + ), ) raise @dramatiq.actor(max_retries=3, min_backoff=1000) def media_convert_actor( - job_id: str, file_key: str, out_folder: str, output_format: str + job_id: str, + webhook_url: str, + file_key: str, + out_folder: str, + output_format: str, ) -> None: """Convert media file to specified format.""" from cpv3.modules.media.service import convert_to_mp4 - _job_id = uuid.UUID(job_id) - _update_job( - _job_id, - status="RUNNING", - current_message="Starting", - started_at=datetime.now(timezone.utc), + job_uuid = uuid.UUID(job_id) + _send_webhook_event( + webhook_url, + TaskWebhookEvent( + status=JOB_STATUS_RUNNING, + current_message=MESSAGE_STARTING, + started_at=_utc_now(), + ), ) try: @@ -232,36 +293,51 @@ def media_convert_actor( raise ValueError(f"Unsupported format: {output_format}") storage = _get_storage_service() - _update_job(_job_id, current_message="Converting", progress_pct=30.0) + _send_webhook_event( + webhook_url, + TaskWebhookEvent( + current_message=MESSAGE_CONVERTING, + progress_pct=PROGRESS_MEDIA_CONVERT, + ), + ) result = _run_async( convert_to_mp4(storage, file_key=file_key, out_folder=out_folder) ) - _update_job( - _job_id, - status="DONE", - current_message="Completed", - progress_pct=100.0, - output_data={ - "file_path": result.file_path, - "file_url": result.file_url, - "file_size": result.file_size, - }, - finished_at=datetime.now(timezone.utc), + _send_webhook_event( + webhook_url, + TaskWebhookEvent( + status=JOB_STATUS_DONE, + current_message=MESSAGE_COMPLETED, + progress_pct=PROGRESS_COMPLETE, + output_data={ + "file_path": result.file_path, + "file_url": result.file_url, + "file_size": result.file_size, + }, + finished_at=_utc_now(), + ), ) - except Exception as e: - logger.exception("media_convert_actor failed: %s", _job_id) - _update_job( - _job_id, - status="FAILED", - error_message=str(e), - finished_at=datetime.now(timezone.utc), + except Exception as exc: + logger.exception("media_convert_actor failed: %s", job_uuid) + _send_webhook_event( + webhook_url, + TaskWebhookEvent( + status=JOB_STATUS_FAILED, + error_message=str(exc), + finished_at=_utc_now(), + ), ) raise @dramatiq.actor(max_retries=2, min_backoff=2000) def transcription_generate_actor( - job_id: str, file_key: str, engine: str, language: str | None, model: str + job_id: str, + webhook_url: str, + file_key: str, + engine: str, + language: str | None, + model: str, ) -> None: """Generate transcription from audio/video file.""" from cpv3.modules.transcription.service import ( @@ -269,18 +345,24 @@ def transcription_generate_actor( transcribe_with_whisper, ) - _job_id = uuid.UUID(job_id) - _update_job( - _job_id, - status="RUNNING", - current_message="Starting", - started_at=datetime.now(timezone.utc), + job_uuid = uuid.UUID(job_id) + _send_webhook_event( + webhook_url, + TaskWebhookEvent( + status=JOB_STATUS_RUNNING, + current_message=MESSAGE_STARTING, + started_at=_utc_now(), + ), ) try: storage = _get_storage_service() - _update_job( - _job_id, current_message=f"Transcribing ({engine})", progress_pct=20.0 + _send_webhook_event( + webhook_url, + TaskWebhookEvent( + current_message=f"Transcribing ({engine})", + progress_pct=PROGRESS_TRANSCRIPTION, + ), ) if engine == "whisper": @@ -299,64 +381,84 @@ def transcription_generate_actor( else: raise ValueError(f"Unknown engine: {engine}") - _update_job( - _job_id, - status="DONE", - current_message="Completed", - progress_pct=100.0, - output_data={"document": document.model_dump(mode="json")}, - finished_at=datetime.now(timezone.utc), + _send_webhook_event( + webhook_url, + TaskWebhookEvent( + status=JOB_STATUS_DONE, + current_message=MESSAGE_COMPLETED, + progress_pct=PROGRESS_COMPLETE, + output_data={"document": document.model_dump(mode="json")}, + finished_at=_utc_now(), + ), ) - except Exception as e: - logger.exception("transcription_generate_actor failed: %s", _job_id) - _update_job( - _job_id, - status="FAILED", - error_message=str(e), - finished_at=datetime.now(timezone.utc), + except Exception as exc: + logger.exception("transcription_generate_actor failed: %s", job_uuid) + _send_webhook_event( + webhook_url, + TaskWebhookEvent( + status=JOB_STATUS_FAILED, + error_message=str(exc), + finished_at=_utc_now(), + ), ) raise @dramatiq.actor(max_retries=2, min_backoff=2000) def captions_generate_actor( - job_id: str, video_s3_path: str, folder: str, transcription_json: dict + job_id: str, + webhook_url: str, + video_s3_path: str, + folder: str, + transcription_json: dict, ) -> None: """Generate captions on video.""" from cpv3.modules.captions.service import generate_captions from cpv3.modules.transcription.schemas import Document - _job_id = uuid.UUID(job_id) - _update_job( - _job_id, - status="RUNNING", - current_message="Starting", - started_at=datetime.now(timezone.utc), + job_uuid = uuid.UUID(job_id) + _send_webhook_event( + webhook_url, + TaskWebhookEvent( + status=JOB_STATUS_RUNNING, + current_message=MESSAGE_STARTING, + started_at=_utc_now(), + ), ) try: - _update_job(_job_id, current_message="Rendering captions", progress_pct=30.0) + _send_webhook_event( + webhook_url, + TaskWebhookEvent( + current_message=MESSAGE_RENDERING_CAPTIONS, + progress_pct=PROGRESS_CAPTIONS, + ), + ) document = Document.model_validate(transcription_json) output_path = _run_async( generate_captions( video_s3_path=video_s3_path, folder=folder, transcription=document ) ) - _update_job( - _job_id, - status="DONE", - current_message="Completed", - progress_pct=100.0, - output_data={"output_path": output_path}, - finished_at=datetime.now(timezone.utc), + _send_webhook_event( + webhook_url, + TaskWebhookEvent( + status=JOB_STATUS_DONE, + current_message=MESSAGE_COMPLETED, + progress_pct=PROGRESS_COMPLETE, + output_data={"output_path": output_path}, + finished_at=_utc_now(), + ), ) - except Exception as e: - logger.exception("captions_generate_actor failed: %s", _job_id) - _update_job( - _job_id, - status="FAILED", - error_message=str(e), - finished_at=datetime.now(timezone.utc), + except Exception as exc: + logger.exception("captions_generate_actor failed: %s", job_uuid) + _send_webhook_event( + webhook_url, + TaskWebhookEvent( + status=JOB_STATUS_FAILED, + error_message=str(exc), + finished_at=_utc_now(), + ), ) raise @@ -367,14 +469,15 @@ def captions_generate_actor( class TaskService: - """Service for submitting background tasks.""" + """Service for submitting background tasks and recording webhook updates.""" def __init__(self, session: AsyncSession) -> None: self._session = session self._job_repo = JobRepository(session) + self._event_repo = JobEventRepository(session) self._webhook_repo = WebhookRepository(session) - async def _create_job( + async def _create_job_and_webhook( self, *, requester: User, @@ -383,7 +486,6 @@ class TaskService: input_data: dict, ) -> tuple[Job, str]: """Create job and webhook, return job and webhook URL.""" - settings = get_settings() broker_id = uuid.uuid4().hex job = await self._job_repo.create( @@ -396,92 +498,132 @@ class TaskService: ), ) - webhook_url = f"{settings.webhook_base_url}/api/tasks/webhook/{job.id}/" + webhook_url = _build_webhook_url(job.id) await self._webhook_repo.create( requester=requester, data=WebhookCreate( - project_id=project_id, event=f"task.{job_type.lower()}", url=webhook_url + project_id=project_id, + event=_build_webhook_event_name(job_type), + url=webhook_url, ), ) return job, webhook_url + async def _submit_task( + self, + *, + requester: User, + job_type: JobTypeEnum, + project_id: uuid.UUID | None, + input_data: dict, + actor: Any, + actor_kwargs: dict[str, Any], + ) -> TaskSubmitResponse: + job, webhook_url = await self._create_job_and_webhook( + requester=requester, + job_type=job_type, + project_id=project_id, + input_data=input_data, + ) + actor.send(job_id=str(job.id), webhook_url=webhook_url, **actor_kwargs) + return TaskSubmitResponse( + job_id=job.id, + webhook_url=webhook_url, + status=JOB_STATUS_PENDING, + ) + + async def record_webhook_event( + self, *, job_id: uuid.UUID, event: TaskWebhookEvent + ) -> Job: + """Apply a webhook event to the job and store a job event record.""" + job = await self._job_repo.get_by_id(job_id) + if job is None: + raise ValueError(f"Job {job_id} not found") + + job_update = JobUpdate( + status=event.status, + project_pct=event.progress_pct, + current_message=event.current_message, + error_message=event.error_message, + output_data=event.output_data, + started_at=event.started_at, + finished_at=event.finished_at, + ) + job = await self._job_repo.update(job, job_update) + + event_type = _derive_event_type(event) + payload = event.model_dump(mode="json", exclude_none=True) + await self._event_repo.create( + JobEventCreate(job_id=job.id, event_type=event_type, payload=payload) + ) + return job + async def submit_media_probe( self, *, requester: User, request: MediaProbeRequest ) -> TaskSubmitResponse: """Submit media probe task.""" - job, webhook_url = await self._create_job( + return await self._submit_task( requester=requester, - job_type="MEDIA_PROBE", + job_type=JOB_TYPE_MEDIA_PROBE, project_id=request.project_id, input_data=request.model_dump(mode="json"), - ) - media_probe_actor.send(job_id=str(job.id), file_key=request.file_key) - return TaskSubmitResponse( - job_id=job.id, webhook_url=webhook_url, status="PENDING" + actor=media_probe_actor, + actor_kwargs={"file_key": request.file_key}, ) async def submit_silence_remove( self, *, requester: User, request: SilenceRemoveRequest ) -> TaskSubmitResponse: """Submit silence removal task.""" - job, webhook_url = await self._create_job( + return await self._submit_task( requester=requester, - job_type="SILENCE_REMOVE", + job_type=JOB_TYPE_SILENCE_REMOVE, project_id=request.project_id, input_data=request.model_dump(mode="json"), - ) - silence_remove_actor.send( - job_id=str(job.id), - file_key=request.file_key, - out_folder=request.out_folder, - min_silence_duration_ms=request.min_silence_duration_ms, - silence_threshold_db=request.silence_threshold_db, - padding_ms=request.padding_ms, - ) - return TaskSubmitResponse( - job_id=job.id, webhook_url=webhook_url, status="PENDING" + actor=silence_remove_actor, + actor_kwargs={ + "file_key": request.file_key, + "out_folder": request.out_folder, + "min_silence_duration_ms": request.min_silence_duration_ms, + "silence_threshold_db": request.silence_threshold_db, + "padding_ms": request.padding_ms, + }, ) async def submit_media_convert( self, *, requester: User, request: MediaConvertRequest ) -> TaskSubmitResponse: """Submit media conversion task.""" - job, webhook_url = await self._create_job( + return await self._submit_task( requester=requester, - job_type="MEDIA_CONVERT", + job_type=JOB_TYPE_MEDIA_CONVERT, project_id=request.project_id, input_data=request.model_dump(mode="json"), - ) - media_convert_actor.send( - job_id=str(job.id), - file_key=request.file_key, - out_folder=request.out_folder, - output_format=request.output_format, - ) - return TaskSubmitResponse( - job_id=job.id, webhook_url=webhook_url, status="PENDING" + actor=media_convert_actor, + actor_kwargs={ + "file_key": request.file_key, + "out_folder": request.out_folder, + "output_format": request.output_format, + }, ) async def submit_transcription_generate( self, *, requester: User, request: TranscriptionGenerateRequest ) -> TaskSubmitResponse: """Submit transcription generation task.""" - job, webhook_url = await self._create_job( + return await self._submit_task( requester=requester, - job_type="TRANSCRIPTION_GENERATE", + job_type=JOB_TYPE_TRANSCRIPTION_GENERATE, project_id=request.project_id, input_data=request.model_dump(mode="json"), - ) - transcription_generate_actor.send( - job_id=str(job.id), - file_key=request.file_key, - engine=request.engine, - language=request.language, - model=request.model, - ) - return TaskSubmitResponse( - job_id=job.id, webhook_url=webhook_url, status="PENDING" + actor=transcription_generate_actor, + actor_kwargs={ + "file_key": request.file_key, + "engine": request.engine, + "language": request.language, + "model": request.model, + }, ) async def submit_captions_generate( @@ -493,18 +635,15 @@ class TaskService: if transcription is None: raise ValueError(f"Transcription {request.transcription_id} not found") - job, webhook_url = await self._create_job( + return await self._submit_task( requester=requester, - job_type="CAPTIONS_GENERATE", + job_type=JOB_TYPE_CAPTIONS_GENERATE, project_id=request.project_id, input_data=request.model_dump(mode="json"), - ) - captions_generate_actor.send( - job_id=str(job.id), - video_s3_path=request.video_s3_path, - folder=request.folder, - transcription_json=transcription.document, - ) - return TaskSubmitResponse( - job_id=job.id, webhook_url=webhook_url, status="PENDING" + actor=captions_generate_actor, + actor_kwargs={ + "video_s3_path": request.video_s3_path, + "folder": request.folder, + "transcription_json": transcription.document, + }, )