""" Task service for submitting and managing background tasks. """ from __future__ import annotations import asyncio import io import json import logging import time import uuid from datetime import datetime, timezone from pathlib import Path from typing import Any import dramatiq # type: ignore[import-untyped] import httpx from dramatiq.brokers.redis import RedisBroker # type: ignore[import-untyped] from sqlalchemy.ext.asyncio import AsyncSession from cpv3.infrastructure.deps import _get_storage_service from cpv3.infrastructure.settings import get_settings from cpv3.infrastructure.storage.utils import get_user_folder from cpv3.modules.files.repository import FileRepository from cpv3.modules.files.schemas import FileCreate from cpv3.modules.jobs.models import Job from cpv3.modules.jobs.repository import JobEventRepository, JobRepository from cpv3.modules.jobs.schemas import ( JobCreate, JobEventCreate, JobStatusEnum, JobTypeEnum, JobUpdate, ) from cpv3.modules.media.repository import ArtifactRepository from cpv3.modules.media.schemas import ArtifactMediaFileCreate from cpv3.modules.tasks.schemas import ( CaptionsGenerateRequest, FrameExtractRequest, MediaConvertRequest, MediaProbeRequest, SilenceApplyRequest, SilenceDetectRequest, SilenceRemoveRequest, TaskSubmitResponse, TaskWebhookEvent, TranscriptionGenerateRequest, ) from cpv3.modules.notifications.service import NotificationService from cpv3.modules.transcription.repository import TranscriptionRepository from cpv3.modules.transcription.schemas import TranscriptionCreate from cpv3.modules.users.models import User from cpv3.modules.users.repository import UserRepository from cpv3.modules.webhooks.repository import WebhookRepository from cpv3.modules.webhooks.schemas import WebhookCreate logger = logging.getLogger(__name__) JOB_STATUS_PENDING: JobStatusEnum = "PENDING" JOB_STATUS_RUNNING: JobStatusEnum = "RUNNING" JOB_STATUS_DONE: JobStatusEnum = "DONE" JOB_STATUS_FAILED: JobStatusEnum = "FAILED" 
JOB_STATUS_CANCELLED: JobStatusEnum = "CANCELLED" JOB_TYPE_MEDIA_PROBE: JobTypeEnum = "MEDIA_PROBE" JOB_TYPE_SILENCE_REMOVE: JobTypeEnum = "SILENCE_REMOVE" JOB_TYPE_SILENCE_DETECT: JobTypeEnum = "SILENCE_DETECT" JOB_TYPE_SILENCE_APPLY: JobTypeEnum = "SILENCE_APPLY" JOB_TYPE_MEDIA_CONVERT: JobTypeEnum = "MEDIA_CONVERT" JOB_TYPE_TRANSCRIPTION_GENERATE: JobTypeEnum = "TRANSCRIPTION_GENERATE" JOB_TYPE_CAPTIONS_GENERATE: JobTypeEnum = "CAPTIONS_GENERATE" JOB_TYPE_FRAME_EXTRACT: JobTypeEnum = "FRAME_EXTRACT" EVENT_TYPE_STATUS_PREFIX = "status_" EVENT_TYPE_PROGRESS = "progress" EVENT_TYPE_LOG = "log" EVENT_TYPE_OUTPUT = "output" EVENT_TYPE_ERROR = "error" TASK_WEBHOOK_PATH = "/api/tasks/webhook/{job_id}/" WEBHOOK_TIMEOUT_SECONDS = 10 ERROR_NO_AUDIO_STREAM = "Файл не содержит аудиодорожки" ERROR_UNKNOWN_ENGINE = "Неизвестный движок транскрипции: {engine}" ENGINE_MAP: dict[str, str] = { "whisper": "LOCAL_WHISPER", "google": "GOOGLE_SPEECH_CLOUD", "salutespeech": "SALUTE_SPEECH", } MESSAGE_STARTING = "Запуск" MESSAGE_COMPLETED = "Завершено" MESSAGE_PROBING_MEDIA = "Анализ медиафайла" MESSAGE_PROCESSING = "Обработка" MESSAGE_CONVERTING = "Конвертация" MESSAGE_PREPARING_FILE = "Подготовка файла" MESSAGE_CONVERTING_VIDEO = "Конвертация видео" MESSAGE_UPLOADING_RESULT = "Загрузка результата" MESSAGE_SAVING_RESULT = "Сохранение результата" MESSAGE_RENDERING_CAPTIONS = "Рендеринг субтитров" MESSAGE_CANCELLED = "Отменено пользователем" MESSAGE_EXTRACTING_FRAMES = "Извлечение кадров" MESSAGE_UPLOADING_FRAMES = "Загрузка кадров" MESSAGE_DELETING_OLD_FRAMES = "Удаление старых кадров" PROGRESS_COMPLETE = 100.0 PROGRESS_MEDIA_PROBE = 50.0 PROGRESS_SILENCE_REMOVE = 30.0 PROGRESS_MEDIA_CONVERT = 30.0 PROGRESS_MEDIA_CONVERT_PREPARING = 5.0 PROGRESS_MEDIA_CONVERT_START = 10.0 PROGRESS_MEDIA_CONVERT_END = 95.0 PROGRESS_MEDIA_CONVERT_SAVING = 99.0 PROGRESS_SILENCE_APPLY_PREPARING = 5.0 PROGRESS_SILENCE_APPLY_START = 10.0 PROGRESS_SILENCE_APPLY_END = 95.0 PROGRESS_SILENCE_APPLY_SAVING = 99.0 
# Transcription progress is mapped into the 20..95 window while the engine runs.
PROGRESS_TRANSCRIPTION_START = 20.0
PROGRESS_TRANSCRIPTION_END = 95.0
PROGRESS_CAPTIONS = 30.0
# Frame-extraction progress window while frames are produced and uploaded.
PROGRESS_FRAME_EXTRACT_START = 10.0
PROGRESS_FRAME_EXTRACT_END = 95.0
PROGRESS_SILENCE_DETECT = 30.0
PROGRESS_SILENCE_APPLY = 30.0
MESSAGE_DETECTING_SILENCE = "Обнаружение тишины"
MESSAGE_APPLYING_CUTS = "Применение вырезок"
# Minimum interval between progress webhooks (generic vs. convert-style loops).
PROGRESS_THROTTLE_SECONDS = 3.0
PROGRESS_CONVERT_THROTTLE_SECONDS = 1.0
# Statuses that count as "still in flight" for duplicate-job detection.
ACTIVE_JOB_STATUSES = (JOB_STATUS_PENDING, JOB_STATUS_RUNNING)
# Separator used to pack "<queue_name>:<redis_message_id>" into Job.broker_id.
DRAMATIQ_BROKER_REF_SEPARATOR = ":"


class JobCancelledError(RuntimeError):
    """Raised when a job was cancelled before completion."""


# ---------------------------------------------------------------------------
# Dramatiq broker setup
# ---------------------------------------------------------------------------
# NOTE: import-time side effect — importing this module configures the global
# Dramatiq broker for both the API process and the workers.
_settings = get_settings()
_redis_broker = RedisBroker(url=_settings.redis_url)
dramatiq.set_broker(_redis_broker)


# ---------------------------------------------------------------------------
# Webhook helpers for Dramatiq workers
# ---------------------------------------------------------------------------
def _utc_now() -> datetime:
    """Return current UTC time (timezone-aware)."""
    return datetime.now(timezone.utc)


def _parse_frame_rate(rate_str: str) -> float | None:
    """Parse ffprobe frame rate string like '30/1' or '30000/1001'.

    Returns the rate rounded to 3 decimals, or ``None`` when the string is
    malformed or the denominator is zero.
    """
    try:
        if "/" in rate_str:
            num, den = rate_str.split("/")
            den_val = int(den)
            # A zero denominator (e.g. "0/0") means "unknown", not an error.
            return round(int(num) / den_val, 3) if den_val else None
        return float(rate_str)
    except (ValueError, ZeroDivisionError):
        return None


def _build_webhook_url(job_id: uuid.UUID) -> str:
    """Build the internal webhook URL workers use to report task updates."""
    settings = get_settings()
    base_url = settings.webhook_base_url.rstrip("/")
    return f"{base_url}{TASK_WEBHOOK_PATH.format(job_id=job_id)}"


def _build_webhook_event_name(job_type: JobTypeEnum) -> str:
    """Build webhook event name for a job type, e.g. ``task.media_probe``."""
    return f"task.{job_type.lower()}"


def _send_webhook_event(webhook_url: str, event: TaskWebhookEvent) -> None:
    """Send a task webhook event to the API.

    Raises on any transport/HTTP failure after logging it, so callers decide
    whether a lost update is fatal.
    """
    payload = event.model_dump(mode="json", exclude_none=True)
    try:
        response = httpx.post(webhook_url, json=payload, timeout=WEBHOOK_TIMEOUT_SECONDS)
        response.raise_for_status()
    except Exception:
        logger.exception("Failed to send task webhook event to %s", webhook_url)
        raise


def _derive_event_type(event: TaskWebhookEvent) -> str:
    """Derive a job event type from a webhook event payload.

    Priority: status change > error > progress > log message > output; falls
    back to a plain log event when no field is set.
    """
    if event.status is not None:
        return f"{EVENT_TYPE_STATUS_PREFIX}{event.status}"
    if event.error_message is not None:
        return EVENT_TYPE_ERROR
    if event.progress_pct is not None:
        return EVENT_TYPE_PROGRESS
    if event.current_message is not None:
        return EVENT_TYPE_LOG
    if event.output_data is not None:
        return EVENT_TYPE_OUTPUT
    return EVENT_TYPE_LOG


def _run_async(coro: Any) -> Any:
    """Run an awaitable to completion in a fresh event loop.

    Dramatiq actors are synchronous; each call gets its own loop which is
    always closed afterwards.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()


def _serialize_broker_reference(queue_name: str, redis_message_id: str) -> str:
    """Serialize queue name and Dramatiq redis message id into one field."""
    return f"{queue_name}{DRAMATIQ_BROKER_REF_SEPARATOR}{redis_message_id}"


def _parse_broker_reference(broker_id: str | None) -> tuple[str, str] | None:
    """Parse queue name and Dramatiq redis message id from stored broker_id.

    Returns ``None`` for empty/legacy values that do not contain the
    separator or have an empty half.
    """
    if not broker_id or DRAMATIQ_BROKER_REF_SEPARATOR not in broker_id:
        return None
    # Split on the first separator only — the message id may contain ":".
    queue_name, redis_message_id = broker_id.split(DRAMATIQ_BROKER_REF_SEPARATOR, 1)
    if not queue_name or not redis_message_id:
        return None
    return queue_name, redis_message_id


def _get_job_status_sync(job_id: uuid.UUID) -> JobStatusEnum | None:
    """Read current job status using a sync connection (safe for Dramatiq workers).

    Uses a short-lived psycopg2 connection instead of the async SQLAlchemy
    stack because workers run outside the API's event loop. Returns ``None``
    when the job does not exist or the check fails (best-effort).
    """
    import psycopg2

    settings = get_settings()
    # NOTE(review): a password containing spaces or quotes would break this
    # DSN string — consider passing keyword arguments to psycopg2.connect.
    dsn = (
        f"host={settings.postgres_host} port={settings.postgres_port} "
        f"dbname={settings.postgres_database} "
        f"user={settings.postgres_user} password={settings.postgres_password}"
    )
    try:
        conn = psycopg2.connect(dsn)
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT status FROM jobs WHERE id = %s", (str(job_id),))
                row = cur.fetchone()
                return row[0] if row else None
        finally:
            conn.close()
    except Exception:
        # Cancellation checks are advisory; never crash the worker here.
        logger.warning("Failed to check job status for %s", job_id, exc_info=True)
        return None


def _raise_if_job_cancelled(job_id: uuid.UUID) -> None:
    """Stop worker execution when the job is already cancelled in the database."""
    status = _get_job_status_sync(job_id)
    if status == JOB_STATUS_CANCELLED:
        raise JobCancelledError(MESSAGE_CANCELLED)


# ---------------------------------------------------------------------------
# Dramatiq actors
# ---------------------------------------------------------------------------
@dramatiq.actor(max_retries=0)
def media_probe_actor(job_id: str, webhook_url: str, file_key: str) -> None:
    """Probe media file to extract metadata.

    Reports RUNNING -> progress -> DONE/FAILED back to the API via webhooks.
    """
    from cpv3.modules.media.service import probe_media

    job_uuid = uuid.UUID(job_id)
    # Bail out quietly if the user cancelled the job before it started.
    try:
        _raise_if_job_cancelled(job_uuid)
    except JobCancelledError:
        logger.info("media_probe_actor cancelled: %s", job_uuid)
        return
    _send_webhook_event(
        webhook_url,
        TaskWebhookEvent(
            status=JOB_STATUS_RUNNING,
            current_message=MESSAGE_STARTING,
            started_at=_utc_now(),
        ),
    )
    try:
        storage = _get_storage_service()
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                current_message=MESSAGE_PROBING_MEDIA,
                progress_pct=PROGRESS_MEDIA_PROBE,
            ),
        )
        result = _run_async(probe_media(storage, file_key=file_key))
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_DONE,
                current_message=MESSAGE_COMPLETED,
                progress_pct=PROGRESS_COMPLETE,
                output_data=result.model_dump(mode="json"),
                finished_at=_utc_now(),
            ),
        )
    except JobCancelledError:
        logger.info("media_probe_actor cancelled: %s", job_uuid)
        return
    except Exception as exc:
        logger.exception("media_probe_actor failed: %s", job_uuid)
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_FAILED,
                error_message=str(exc),
                finished_at=_utc_now(),
            ),
        )
@dramatiq.actor(max_retries=0)
def silence_remove_actor(
    job_id: str,
    webhook_url: str,
    file_key: str,
    out_folder: str,
    min_silence_duration_ms: int,
    silence_threshold_db: int,
    padding_ms: int,
) -> None:
    """Remove silence from media file and upload the trimmed result."""
    from cpv3.modules.media.service import remove_silence

    job_uuid = uuid.UUID(job_id)
    # Bail out quietly if the user cancelled the job before it started.
    try:
        _raise_if_job_cancelled(job_uuid)
    except JobCancelledError:
        logger.info("silence_remove_actor cancelled: %s", job_uuid)
        return
    _send_webhook_event(
        webhook_url,
        TaskWebhookEvent(
            status=JOB_STATUS_RUNNING,
            current_message=MESSAGE_STARTING,
            started_at=_utc_now(),
        ),
    )
    try:
        storage = _get_storage_service()
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                current_message=MESSAGE_PROCESSING,
                progress_pct=PROGRESS_SILENCE_REMOVE,
            ),
        )
        result = _run_async(
            remove_silence(
                storage,
                file_key=file_key,
                out_folder=out_folder,
                min_silence_duration_ms=min_silence_duration_ms,
                silence_threshold_db=silence_threshold_db,
                padding_ms=padding_ms,
            )
        )
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_DONE,
                current_message=MESSAGE_COMPLETED,
                progress_pct=PROGRESS_COMPLETE,
                output_data={
                    "file_path": result.file_path,
                    "file_url": result.file_url,
                    "file_size": result.file_size,
                },
                finished_at=_utc_now(),
            ),
        )
    except JobCancelledError:
        logger.info("silence_remove_actor cancelled: %s", job_uuid)
        return
    except Exception as exc:
        logger.exception("silence_remove_actor failed: %s", job_uuid)
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_FAILED,
                error_message=str(exc),
                finished_at=_utc_now(),
            ),
        )


@dramatiq.actor(max_retries=0)
def silence_detect_actor(
    job_id: str,
    webhook_url: str,
    file_key: str,
    min_silence_duration_ms: int,
    silence_threshold_db: int,
    padding_ms: int,
) -> None:
    """Detect silent segments in media file; output is the raw segment data."""
    from cpv3.modules.media.service import detect_silence

    job_uuid = uuid.UUID(job_id)
    try:
        _raise_if_job_cancelled(job_uuid)
    except JobCancelledError:
        logger.info("silence_detect_actor cancelled: %s", job_uuid)
        return
    _send_webhook_event(
        webhook_url,
        TaskWebhookEvent(
            status=JOB_STATUS_RUNNING,
            current_message=MESSAGE_STARTING,
            started_at=_utc_now(),
        ),
    )
    try:
        storage = _get_storage_service()
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                current_message=MESSAGE_DETECTING_SILENCE,
                progress_pct=PROGRESS_SILENCE_DETECT,
            ),
        )
        result = _run_async(
            detect_silence(
                storage,
                file_key=file_key,
                min_silence_duration_ms=min_silence_duration_ms,
                silence_threshold_db=silence_threshold_db,
                padding_ms=padding_ms,
            )
        )
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_DONE,
                current_message=MESSAGE_COMPLETED,
                progress_pct=PROGRESS_COMPLETE,
                # detect_silence returns webhook-ready output data as-is.
                output_data=result,
                finished_at=_utc_now(),
            ),
        )
    except JobCancelledError:
        logger.info("silence_detect_actor cancelled: %s", job_uuid)
        return
    except Exception as exc:
        logger.exception("silence_detect_actor failed: %s", job_uuid)
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_FAILED,
                error_message=str(exc),
                finished_at=_utc_now(),
            ),
        )


@dramatiq.actor(max_retries=0)
def silence_apply_actor(
    job_id: str,
    webhook_url: str,
    file_key: str,
    out_folder: str,
    cuts: list[dict],
    output_name: str | None,
) -> None:
    """Apply silence cuts to media file and upload the edited result."""
    from cpv3.modules.media.service import apply_silence_cuts

    job_uuid = uuid.UUID(job_id)
    try:
        _raise_if_job_cancelled(job_uuid)
    except JobCancelledError:
        logger.info("silence_apply_actor cancelled: %s", job_uuid)
        return
    _send_webhook_event(
        webhook_url,
        TaskWebhookEvent(
            status=JOB_STATUS_RUNNING,
            current_message=MESSAGE_PREPARING_FILE,
            progress_pct=PROGRESS_SILENCE_APPLY_PREPARING,
            started_at=_utc_now(),
        ),
    )
    try:
        storage = _get_storage_service()
        # Throttling state for the progress callback below.
        last_report_time = 0.0
        last_progress = PROGRESS_SILENCE_APPLY_PREPARING

        def _emit_silence_apply_progress(stage: str, pct: float | None) -> None:
            """Map service-stage progress into the 10..95 window and throttle it.

            Monotonic (never reports a lower percent), at most one report per
            PROGRESS_CONVERT_THROTTLE_SECONDS and per whole percent, except
            for "forced" stage-boundary reports.
            """
            nonlocal last_report_time, last_progress
            if stage == "applying_cuts":
                raw_pct = min(max(pct or 0.0, 0.0), 100.0)
                # 100% is reported by the DONE event, not the callback.
                if raw_pct >= 100.0:
                    return
                mapped = PROGRESS_SILENCE_APPLY_START + (raw_pct / 100.0) * (
                    PROGRESS_SILENCE_APPLY_END - PROGRESS_SILENCE_APPLY_START
                )
                message = MESSAGE_APPLYING_CUTS
                force = raw_pct == 0.0
            elif stage == "uploading":
                mapped = PROGRESS_SILENCE_APPLY_END
                message = MESSAGE_UPLOADING_RESULT
                force = True
            else:
                # Unknown stages are ignored.
                return
            mapped = round(mapped, 1)
            now = time.monotonic()
            if not force:
                if mapped <= last_progress:
                    return
                if mapped - last_progress < 1.0:
                    return
                if now - last_report_time < PROGRESS_CONVERT_THROTTLE_SECONDS:
                    return
            last_report_time = now
            last_progress = max(last_progress, mapped)
            _send_webhook_event(
                webhook_url,
                TaskWebhookEvent(
                    current_message=message,
                    progress_pct=last_progress,
                ),
            )

        result = _run_async(
            apply_silence_cuts(
                storage,
                file_key=file_key,
                out_folder=out_folder,
                cuts=cuts,
                output_name=output_name,
                on_progress=_emit_silence_apply_progress,
            )
        )
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                current_message=MESSAGE_SAVING_RESULT,
                progress_pct=PROGRESS_SILENCE_APPLY_SAVING,
            ),
        )
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_DONE,
                current_message=MESSAGE_COMPLETED,
                progress_pct=PROGRESS_COMPLETE,
                output_data={
                    "file_path": result.file_path,
                    "file_url": result.file_url,
                    "file_size": result.file_size,
                },
                finished_at=_utc_now(),
            ),
        )
    except JobCancelledError:
        logger.info("silence_apply_actor cancelled: %s", job_uuid)
        return
    except Exception as exc:
        logger.exception("silence_apply_actor failed: %s", job_uuid)
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_FAILED,
                error_message=str(exc),
                finished_at=_utc_now(),
            ),
        )


@dramatiq.actor(max_retries=0)
def media_convert_actor(
    job_id: str,
    webhook_url: str,
    file_key: str,
    out_folder: str,
    output_format: str,
) -> None:
    """Convert media file to the specified format (currently mp4 only)."""
    from cpv3.modules.media.service import convert_to_mp4

    job_uuid = uuid.UUID(job_id)
    try:
        _raise_if_job_cancelled(job_uuid)
    except JobCancelledError:
        logger.info("media_convert_actor cancelled: %s", job_uuid)
        return
    _send_webhook_event(
        webhook_url,
        TaskWebhookEvent(
            status=JOB_STATUS_RUNNING,
            current_message=MESSAGE_PREPARING_FILE,
            progress_pct=PROGRESS_MEDIA_CONVERT_PREPARING,
            started_at=_utc_now(),
        ),
    )
    try:
        # Only mp4 output is implemented; reject anything else up front.
        if output_format.lower() != "mp4":
            raise ValueError(f"Неподдерживаемый формат: {output_format}")
        storage = _get_storage_service()
        # Throttling state for the progress callback below.
        last_report_time = 0.0
        last_progress = PROGRESS_MEDIA_CONVERT_PREPARING

        def _emit_convert_progress(stage: str, pct: float | None) -> None:
            """Map ffmpeg conversion progress into 10..95 and throttle reports.

            Same monotonic/throttled scheme as the silence-apply callback.
            """
            nonlocal last_report_time, last_progress
            if stage == "converting":
                raw_pct = min(max(pct or 0.0, 0.0), 100.0)
                if raw_pct >= 100.0:
                    return
                mapped = PROGRESS_MEDIA_CONVERT_START + (raw_pct / 100.0) * (
                    PROGRESS_MEDIA_CONVERT_END - PROGRESS_MEDIA_CONVERT_START
                )
                message = MESSAGE_CONVERTING_VIDEO
                force = raw_pct == 0.0
            elif stage == "uploading":
                mapped = PROGRESS_MEDIA_CONVERT_END
                message = MESSAGE_UPLOADING_RESULT
                force = True
            else:
                return
            mapped = round(mapped, 1)
            now = time.monotonic()
            if not force:
                if mapped <= last_progress:
                    return
                if mapped - last_progress < 1.0:
                    return
                if now - last_report_time < PROGRESS_CONVERT_THROTTLE_SECONDS:
                    return
            last_report_time = now
            last_progress = max(last_progress, mapped)
            _send_webhook_event(
                webhook_url,
                TaskWebhookEvent(
                    current_message=message,
                    progress_pct=last_progress,
                ),
            )

        result = _run_async(
            convert_to_mp4(
                storage,
                file_key=file_key,
                out_folder=out_folder,
                on_progress=_emit_convert_progress,
            )
        )
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                current_message=MESSAGE_SAVING_RESULT,
                progress_pct=PROGRESS_MEDIA_CONVERT_SAVING,
            ),
        )
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_DONE,
                current_message=MESSAGE_COMPLETED,
                progress_pct=PROGRESS_COMPLETE,
                output_data={
                    "file_path": result.file_path,
                    "file_url": result.file_url,
                    "file_size": result.file_size,
                },
                finished_at=_utc_now(),
            ),
        )
    except JobCancelledError:
        logger.info("media_convert_actor cancelled: %s", job_uuid)
        return
    except Exception as exc:
        logger.exception("media_convert_actor failed: %s", job_uuid)
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_FAILED,
                error_message=str(exc),
                finished_at=_utc_now(),
            ),
        )


@dramatiq.actor(max_retries=0)
def transcription_generate_actor(
    job_id: str,
    webhook_url: str,
    file_key: str,
    engine: str,
    language: str | None,
    model: str,
) -> None:
    """Generate transcription from audio/video file.

    Supports the "whisper", "google" and "salutespeech" engines; the DONE
    event carries the transcription document plus probe metadata used later
    for artifact creation.
    """
    from cpv3.modules.transcription.service import (
        transcribe_with_google_speech,
        transcribe_with_salute_speech,
        transcribe_with_whisper,
    )

    job_uuid = uuid.UUID(job_id)
    try:
        _raise_if_job_cancelled(job_uuid)
    except JobCancelledError:
        logger.info("transcription_generate_actor cancelled: %s", job_uuid)
        return
    _send_webhook_event(
        webhook_url,
        TaskWebhookEvent(
            status=JOB_STATUS_RUNNING,
            current_message=MESSAGE_STARTING,
            started_at=_utc_now(),
        ),
    )
    try:
        from cpv3.modules.media.service import probe_media

        storage = _get_storage_service()
        # Probe first: fail fast on files with no audio stream.
        probe = _run_async(probe_media(storage, file_key=file_key))
        has_audio = any(s.codec_type == "audio" for s in probe.streams)
        if not has_audio:
            raise ValueError(ERROR_NO_AUDIO_STREAM)
        # Extract probe metadata for artifact creation
        duration_seconds = (
            float(probe.format.duration) if probe.format and probe.format.duration else 0.0
        )
        video_stream = next((s for s in probe.streams if s.codec_type == "video"), None)
        probe_meta = {
            "duration_seconds": duration_seconds,
            "frame_rate": _parse_frame_rate(video_stream.r_frame_rate)
            if video_stream and video_stream.r_frame_rate
            else None,
            "width": video_stream.width if video_stream else None,
            "height": video_stream.height if video_stream else None,
        }
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                current_message=f"Транскрибирование ({engine})",
                progress_pct=PROGRESS_TRANSCRIPTION_START,
            ),
        )
        last_report_time = time.monotonic()

        def _on_whisper_progress(pct: float) -> None:
            """Throttled engine progress callback mapped into 20..95."""
            nonlocal last_report_time
            now = time.monotonic()
            if now - last_report_time < PROGRESS_THROTTLE_SECONDS:
                return
            last_report_time = now
            mapped = PROGRESS_TRANSCRIPTION_START + (pct / 100.0) * (
                PROGRESS_TRANSCRIPTION_END - PROGRESS_TRANSCRIPTION_START
            )
            _send_webhook_event(
                webhook_url,
                TaskWebhookEvent(
                    current_message=f"Транскрибирование ({engine})",
                    progress_pct=round(mapped, 1),
                ),
            )

        if engine == "whisper":
            document = _run_async(
                transcribe_with_whisper(
                    storage,
                    file_key=file_key,
                    model_name=model,
                    language=language,
                    on_progress=_on_whisper_progress,
                )
            )
        elif engine == "google":
            language_codes = [language] if language else None
            document = _run_async(
                transcribe_with_google_speech(
                    storage, file_key=file_key, language_codes=language_codes
                )
            )
        elif engine == "salutespeech":
            # SaluteSpeech needs the source sample rate; default to 16 kHz.
            audio_stream = next(
                (s for s in probe.streams if s.codec_type == "audio"), None
            )
            sr = int(audio_stream.sample_rate) if audio_stream and audio_stream.sample_rate else 16000
            document = _run_async(
                transcribe_with_salute_speech(
                    storage,
                    file_key=file_key,
                    language=language,
                    model=model,
                    sample_rate=sr,
                    job_id=job_uuid,
                    on_progress=_on_whisper_progress,
                )
            )
        else:
            raise ValueError(ERROR_UNKNOWN_ENGINE.format(engine=engine))
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_DONE,
                current_message=MESSAGE_COMPLETED,
                progress_pct=PROGRESS_COMPLETE,
                output_data={
                    "document": document.model_dump(mode="json"),
                    "probe": probe_meta,
                },
                finished_at=_utc_now(),
            ),
        )
    except JobCancelledError:
        logger.info("transcription_generate_actor cancelled: %s", job_uuid)
        return
    except Exception as exc:
        logger.exception("transcription_generate_actor failed: %s", job_uuid)
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_FAILED,
                error_message=str(exc),
                finished_at=_utc_now(),
            ),
        )


# Polling cadence for the Remotion render fallback loop below.
RENDER_POLL_INTERVAL_SECONDS = 5
RENDER_POLL_TIMEOUT_SECONDS = 600


@dramatiq.actor(max_retries=0)
def captions_generate_actor(
    job_id: str,
    webhook_url: str,
    video_s3_path: str,
    folder: str,
    transcription_json: dict,
    style_config: dict | None = None,
) -> None:
    """Generate captions on video (async via Remotion + BullMQ).

    Remotion is given the webhook URL as its callback, so progress and the
    DONE event normally arrive directly from Remotion; this actor polls the
    render status only as a fallback.
    """
    from cpv3.modules.captions.service import generate_captions
    from cpv3.modules.transcription.schemas import Document

    job_uuid = uuid.UUID(job_id)
    try:
        _raise_if_job_cancelled(job_uuid)
    except JobCancelledError:
        logger.info("captions_generate_actor cancelled: %s", job_uuid)
        return
    _send_webhook_event(
        webhook_url,
        TaskWebhookEvent(
            status=JOB_STATUS_RUNNING,
            current_message=MESSAGE_STARTING,
            started_at=_utc_now(),
        ),
    )
    try:
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                current_message=MESSAGE_RENDERING_CAPTIONS,
                progress_pct=PROGRESS_CAPTIONS,
            ),
        )
        document = Document.model_validate(transcription_json)
        # Call Remotion with callback_url so it sends progress webhooks directly
        render_id = _run_async(
            generate_captions(
                video_s3_path=video_s3_path,
                folder=folder,
                transcription=document,
                style_config=style_config,
                callback_url=webhook_url,
                render_id=job_id,
            )
        )
        _raise_if_job_cancelled(job_uuid)
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                current_message=MESSAGE_RENDERING_CAPTIONS,
                output_data={"render_id": render_id},
            ),
        )
        # Polling fallback: wait for Remotion to finish rendering.
        # Primary progress is delivered via Remotion -> webhook directly.
        settings = get_settings()
        elapsed = 0.0
        last_polled_status: str | None = None
        while elapsed < RENDER_POLL_TIMEOUT_SECONDS:
            _raise_if_job_cancelled(job_uuid)
            time.sleep(RENDER_POLL_INTERVAL_SECONDS)
            elapsed += RENDER_POLL_INTERVAL_SECONDS
            try:
                resp = httpx.get(
                    f"{settings.remotion_service_url}/api/render/{render_id}",
                    timeout=10,
                )
                resp.raise_for_status()
                data = resp.json()
            except Exception:
                # Transient poll failures are tolerated; the timeout bounds us.
                logger.warning("Render poll failed for %s, retrying...", render_id)
                continue
            status = data.get("status")
            if status != last_polled_status:
                logger.info(
                    "Remotion render %s status=%s progress=%s callback_delivered=%s",
                    render_id,
                    status,
                    data.get("progress_pct"),
                    data.get("callback_delivered"),
                )
                last_polled_status = status
            if status == "done":
                if data.get("callback_delivered") is True:
                    # Remotion already sent the DONE webhook — exit cleanly.
                    return
                output_path = data.get("output_path")
                if not output_path:
                    raise RuntimeError(
                        "Remotion render finished without output_path in polling response"
                    )
                # Callback was lost: synthesize the completion event ourselves.
                logger.warning(
                    "Remotion render %s finished without confirmed DONE webhook, sending fallback completion",
                    render_id,
                )
                _send_webhook_event(
                    webhook_url,
                    TaskWebhookEvent(
                        status=JOB_STATUS_DONE,
                        progress_pct=PROGRESS_COMPLETE,
                        current_message="Готово",
                        output_data={"output_path": str(output_path)},
                        finished_at=_utc_now(),
                    ),
                )
                return
            if status == "failed":
                error = data.get("error", "Render failed")
                raise RuntimeError(f"Remotion render failed: {error}")
        raise TimeoutError(
            f"Render {render_id} did not complete within {RENDER_POLL_TIMEOUT_SECONDS}s"
        )
    except JobCancelledError:
        logger.info("captions_generate_actor cancelled: %s", job_uuid)
        return
    except Exception as exc:
        logger.exception("captions_generate_actor failed: %s", job_uuid)
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_FAILED,
                error_message=str(exc),
                finished_at=_utc_now(),
            ),
        )


@dramatiq.actor(max_retries=0)
def frame_extract_actor(
    job_id: str,
    webhook_url: str,
    file_key: str,
    frames_folder: str,
    regenerate: bool,
) -> None:
    """Extract video frames at 1fps for timeline thumbnails.

    When ``regenerate`` is set, previously extracted frames are deleted first.
    """
    from cpv3.modules.media.service import (
        delete_frames,
        extract_frames,
        read_frames_metadata,
    )

    job_uuid = uuid.UUID(job_id)
    try:
        _raise_if_job_cancelled(job_uuid)
    except JobCancelledError:
        logger.info("frame_extract_actor cancelled: %s", job_uuid)
        return
    _send_webhook_event(
        webhook_url,
        TaskWebhookEvent(
            status=JOB_STATUS_RUNNING,
            current_message=MESSAGE_STARTING,
            started_at=_utc_now(),
        ),
    )
    try:
        storage = _get_storage_service()
        # Delete old frames if regenerating
        if regenerate:
            _send_webhook_event(
                webhook_url,
                TaskWebhookEvent(
                    current_message=MESSAGE_DELETING_OLD_FRAMES,
                    progress_pct=PROGRESS_FRAME_EXTRACT_START,
                ),
            )
            old_meta = _run_async(read_frames_metadata(storage, frames_folder=frames_folder))
            if old_meta is not None:
                _run_async(
                    delete_frames(
                        storage,
                        frames_folder=frames_folder,
                        frame_count=old_meta.frame_count,
                    )
                )
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                current_message=MESSAGE_EXTRACTING_FRAMES,
                progress_pct=PROGRESS_FRAME_EXTRACT_START,
            ),
        )
        last_report_time = time.monotonic()

        def _on_progress(current: int, total: int) -> None:
            """Throttled frame-upload progress mapped into 10..95."""
            nonlocal last_report_time
            now = time.monotonic()
            if now - last_report_time < PROGRESS_THROTTLE_SECONDS:
                return
            last_report_time = now
            # NOTE(review): assumes total > 0 — confirm extract_frames never
            # invokes the callback with total == 0.
            pct = current / total
            mapped = PROGRESS_FRAME_EXTRACT_START + pct * (
                PROGRESS_FRAME_EXTRACT_END - PROGRESS_FRAME_EXTRACT_START
            )
            _send_webhook_event(
                webhook_url,
                TaskWebhookEvent(
                    current_message=MESSAGE_UPLOADING_FRAMES,
                    progress_pct=round(mapped, 1),
                ),
            )

        metadata = _run_async(
            extract_frames(
                storage,
                file_key=file_key,
                frames_folder=frames_folder,
                on_progress=_on_progress,
            )
        )
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_DONE,
                current_message=MESSAGE_COMPLETED,
                progress_pct=PROGRESS_COMPLETE,
                output_data=metadata.model_dump(mode="json"),
                finished_at=_utc_now(),
            ),
        )
    except JobCancelledError:
        logger.info("frame_extract_actor cancelled: %s", job_uuid)
        return
    except Exception as exc:
        logger.exception("frame_extract_actor failed: %s", job_uuid)
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_FAILED,
                error_message=str(exc),
                finished_at=_utc_now(),
            ),
        )


# ---------------------------------------------------------------------------
# Task Service
# ---------------------------------------------------------------------------
class TaskService:
    """Service for submitting background tasks and recording webhook updates."""

    def __init__(self, session: AsyncSession) -> None:
        """Bind the service and its repositories to one DB session."""
        self._session = session
        self._job_repo = JobRepository(session)
        self._event_repo = JobEventRepository(session)
        self._webhook_repo = WebhookRepository(session)

    async def _update_job_broker_reference(self, job: Job, broker_reference: str) -> Job:
        """Persist the transport-specific broker reference after enqueueing."""
        job.broker_id = broker_reference
        await self._session.commit()
        await self._session.refresh(job)
        return job

    async def _find_duplicate_active_job(
        self,
        *,
        requester: User,
        job_type: JobTypeEnum,
        project_id: uuid.UUID | None,
        input_data: dict,
    ) -> Job | None:
        """Reuse an already running job for the same request payload.

        Returns the first PENDING/RUNNING job of the same type whose
        ``input_data`` matches exactly, or ``None``.
        """
        jobs = await self._job_repo.list_active_by_type(
            requester=requester,
            job_type=job_type,
            project_id=project_id,
            statuses=ACTIVE_JOB_STATUSES,
        )
        for job in jobs:
            if job.input_data == input_data:
                return job
        return None

    async def _create_cancellation_notification(self, job: Job) -> None:
        """Emit a single cancellation notification to the current user."""
        if job.user_id is None:
            return
        notification_service = NotificationService(self._session)
        await notification_service.create_task_notification(
            user_id=job.user_id,
            job=job,
            event=TaskWebhookEvent(
                status=JOB_STATUS_CANCELLED,
                current_message=MESSAGE_CANCELLED,
                finished_at=job.finished_at,
            ),
        )

    def _cancel_dramatiq_message_sync(self, broker_id: str | None) -> None:
        """Remove a queued Dramatiq message from Redis when possible.

        Best-effort cleanup of the normal queue, the delayed queue and any
        ack sets; a message already consumed by a worker is unaffected.
        """
        broker_reference = _parse_broker_reference(broker_id)
        if broker_reference is None:
            return
        queue_name, redis_message_id = broker_reference
        # Key layout follows Dramatiq's RedisBroker conventions.
        namespace = _redis_broker.namespace
        queue_key = f"{namespace}:{queue_name}"
        queue_messages_key = f"{queue_key}.msgs"
        delayed_queue_key = f"{queue_key}.DQ"
        delayed_messages_key = f"{delayed_queue_key}.msgs"
        acks_pattern = f"{namespace}:__acks__.*.{queue_name}"
        pipeline = _redis_broker.client.pipeline()
        pipeline.lrem(queue_key, 1, redis_message_id)
        pipeline.hdel(queue_messages_key, redis_message_id)
        pipeline.lrem(delayed_queue_key, 1, redis_message_id)
        pipeline.hdel(delayed_messages_key, redis_message_id)
        for key in _redis_broker.client.scan_iter(match=acks_pattern):
            pipeline.srem(key, redis_message_id)
        pipeline.execute()

    async def _cancel_dramatiq_message(self, broker_id: str | None) -> None:
        """Run Redis queue cleanup for a Dramatiq message off the event loop."""
        await asyncio.to_thread(self._cancel_dramatiq_message_sync, broker_id)

    async def _cancel_caption_render(self, job: Job) -> None:
        """Cancel the downstream Remotion render when it has already been queued.

        No-op for non-caption jobs; a 404 from Remotion means the render is
        already gone and is not an error.
        """
        if job.job_type != JOB_TYPE_CAPTIONS_GENERATE:
            return
        output_data = job.output_data or {}
        # The job id doubles as the render id until Remotion reports its own.
        render_id = output_data.get("render_id") or str(job.id)
        settings = get_settings()
        async with httpx.AsyncClient(timeout=WEBHOOK_TIMEOUT_SECONDS) as client:
            response = await client.delete(
                f"{settings.remotion_service_url}/api/render/{render_id}"
            )
            if response.status_code == 404:
                return
            response.raise_for_status()

    async def _create_job_and_webhook(
        self,
        *,
        requester: User,
        job_type: JobTypeEnum,
        project_id: uuid.UUID | None,
        input_data: dict,
    ) -> tuple[Job, str]:
        """Create job and webhook, return job and webhook URL."""
        # Placeholder reference; replaced with the real queue:message id after send.
        broker_id = uuid.uuid4().hex
        job = await self._job_repo.create(
            requester=requester,
            data=JobCreate(
                broker_id=broker_id,
                project_id=project_id,
                input_data=input_data,
                job_type=job_type,
            ),
        )
        webhook_url = _build_webhook_url(job.id)
        await self._webhook_repo.create(
            requester=requester,
            data=WebhookCreate(
                project_id=project_id,
                event=_build_webhook_event_name(job_type),
                url=webhook_url,
            ),
        )
        return job, webhook_url

    async def _submit_task(
        self,
        *,
        requester: User,
        job_type: JobTypeEnum,
        project_id: uuid.UUID | None,
        input_data: dict,
        actor: Any,
        actor_kwargs: dict[str, Any],
    ) -> TaskSubmitResponse:
        """Create the job record, enqueue the Dramatiq actor and return a handle."""
        job, webhook_url = await self._create_job_and_webhook(
            requester=requester,
            job_type=job_type,
            project_id=project_id,
            input_data=input_data,
        )
        message = actor.send(job_id=str(job.id), webhook_url=webhook_url, **actor_kwargs)
        # Store the real broker reference so the queued message can be cancelled.
        redis_message_id = message.options.get("redis_message_id")
        if redis_message_id:
            broker_reference = _serialize_broker_reference(
                message.queue_name,
                str(redis_message_id),
            )
            await self._update_job_broker_reference(job, broker_reference)
        return TaskSubmitResponse(
            job_id=job.id,
            webhook_url=webhook_url,
            status=JOB_STATUS_PENDING,
        )

    async def record_webhook_event(self, *, job_id: uuid.UUID, event: TaskWebhookEvent) -> Job:
        """Apply a webhook event to the job and store a job event record.

        Late webhooks for terminal jobs (DONE/FAILED/CANCELLED) are ignored.
        Artifacts are persisted before notifications are emitted.

        Raises:
            ValueError: when no job with ``job_id`` exists.
        """
        job = await self._job_repo.get_by_id(job_id)
        if job is None:
            raise ValueError(f"Задача {job_id} не найдена")
        if job.status in (JOB_STATUS_DONE, JOB_STATUS_FAILED, JOB_STATUS_CANCELLED):
            logger.info("Ignoring webhook for terminal job %s (status=%s)", job_id, job.status)
            return job
        job_update = JobUpdate(
            status=event.status,
            # NOTE(review): JobUpdate field is named project_pct but carries
            # progress_pct — confirm the schema field name is intentional.
            project_pct=event.progress_pct,
            current_message=event.current_message,
            error_message=event.error_message,
            output_data=event.output_data,
            started_at=event.started_at,
            finished_at=event.finished_at,
        )
        job = await self._job_repo.update(job, job_update)
        event_type = _derive_event_type(event)
        payload = event.model_dump(mode="json", exclude_none=True)
        await self._event_repo.create(
            JobEventCreate(job_id=job.id, event_type=event_type, payload=payload)
        )
        # Save artifacts BEFORE sending notifications so data exists when
        # the frontend refetches. Artifact failures are logged, not raised.
        if job.job_type == JOB_TYPE_TRANSCRIPTION_GENERATE and event.status == JOB_STATUS_DONE:
            try:
                await self._save_transcription_artifacts(job)
            except Exception:
                logger.exception("Failed to save transcription artifacts for job %s", job_id)
        if job.job_type == JOB_TYPE_MEDIA_CONVERT and event.status == JOB_STATUS_DONE:
            try:
                await self._save_convert_artifacts(job)
            except Exception:
                logger.exception("Failed to save convert artifacts for job %s", job_id)
        if job.job_type == JOB_TYPE_SILENCE_APPLY and event.status == JOB_STATUS_DONE:
            try:
                await self._save_silence_apply_artifacts(job)
            except Exception:
                logger.exception("Failed to save silence apply artifacts for job %s", job_id)
        if job.job_type == JOB_TYPE_CAPTIONS_GENERATE and event.status == JOB_STATUS_DONE:
            try:
                await self._save_captions_artifacts(job)
            except Exception:
                logger.exception("Failed to save captions artifacts for job %s", job_id)
        # Push real-time notification via WebSocket (after artifacts are persisted)
        if job.user_id is not None:
            try:
                notification_service = NotificationService(self._session)
                await notification_service.create_task_notification(
                    user_id=job.user_id, job=job, event=event
                )
            except Exception:
                logger.exception("Failed to create notification for job %s", job_id)
        return job

    async def cancel_job(self, job: Job) -> Job:
        """Cancel a job, clean queued transport state and ignore late webhooks.

        Idempotent: a job already in a terminal state is returned unchanged.
        Broker/render cleanup is best-effort; the DB status flip is what makes
        workers stop (via _raise_if_job_cancelled).
        """
        if job.status in (JOB_STATUS_DONE, JOB_STATUS_FAILED, JOB_STATUS_CANCELLED):
            return job
        try:
            await self._cancel_dramatiq_message(job.broker_id)
        except Exception:
            logger.exception("Failed to cancel Dramatiq message for job %s", job.id)
        try:
            await self._cancel_caption_render(job)
        except Exception:
            logger.exception("Failed to cancel caption render for job %s", job.id)
        finished_at = _utc_now()
        job = await self._job_repo.update(
            job,
            JobUpdate(
                status=JOB_STATUS_CANCELLED,
                current_message=MESSAGE_CANCELLED,
                finished_at=finished_at,
            ),
        )
        await self._event_repo.create(
            JobEventCreate(
                job_id=job.id,
                event_type=f"{EVENT_TYPE_STATUS_PREFIX}{JOB_STATUS_CANCELLED}",
                payload={
                    "status": JOB_STATUS_CANCELLED,
                    "current_message": MESSAGE_CANCELLED,
                    "finished_at": finished_at.isoformat(),
                },
            )
        )
        try:
            await self._create_cancellation_notification(job)
        except Exception:
            logger.exception("Failed to create cancellation notification for job %s", job.id)
        return job

    async def _save_transcription_artifacts(self, job: Job) -> None:
        """Create Transcription, ArtifactMediaFile and File records.

        Uploads the transcription document as a JSON artifact to S3 and links
        it to the source file; silently skips when the job's user is missing.
        """
        input_data = job.input_data or {}
        output_data = job.output_data or {}
        file_key: str = input_data["file_key"]
        project_id: uuid.UUID | None = (
            uuid.UUID(input_data["project_id"]) if input_data.get("project_id") else None
        )
        engine_raw: str = input_data.get("engine", "whisper")
        language: str | None = input_data.get("language")
        document: dict = output_data["document"]
        # Resolve user
        user_repo = UserRepository(self._session)
        user = await user_repo.get_by_id(job.user_id)  # type: ignore[arg-type]
        if user is None:
            logger.warning("User %s not found, skipping artifact save", job.user_id)
            return
        # Find or create source File record
        file_repo = FileRepository(self._session)
        source_file = await file_repo.get_by_path(file_key)
        if source_file is None:
            source_file = await file_repo.create(
                requester=user,
                data=FileCreate(
                    project_id=project_id,
                    original_filename=file_key.rsplit("/", 1)[-1],
                    path=file_key,
                    storage_backend="S3",
                    mime_type="application/octet-stream",
                    size_bytes=0,
                    is_uploaded=True,
                ),
            )
        # Upload document JSON to S3
        storage = _get_storage_service()
        user_folder = get_user_folder(user)
        json_bytes = json.dumps(document, ensure_ascii=False).encode("utf-8")
        # Build display name, e.g. "Транскрибация <video stem>.json"
        video_stem = Path(source_file.original_filename).stem
        transcription_filename = f"Транскрибация {video_stem}.json"
        artifact_key = await storage.upload_fileobj(
            fileobj=io.BytesIO(json_bytes),
            file_name=transcription_filename,
            folder=f"{user_folder}/artifacts",
            gen_name=True,
            content_type="application/json",
        )
        # Create File record for the JSON artifact (no project_id — only reachable via artifact)
        json_file = await file_repo.create(
            requester=user,
            data=FileCreate(
                project_id=None,
                original_filename=transcription_filename,
                path=artifact_key,
                storage_backend="S3",
                mime_type="application/json",
                size_bytes=len(json_bytes),
                file_format="json",
                is_uploaded=True,
            ),
        )
        # Create ArtifactMediaFile (no media_file_id — transcription is not a media file)
        artifact_repo = ArtifactRepository(self._session)
        artifact = await artifact_repo.create(
            data=ArtifactMediaFileCreate(
                project_id=project_id,
                file_id=json_file.id,
                media_file_id=None,
                artifact_type="TRANSCRIPTION_JSON",
            ),
        )
        # Create Transcription record
        transcription_repo = TranscriptionRepository(self._session)
        engine_db = ENGINE_MAP.get(engine_raw, "LOCAL_WHISPER")
        await transcription_repo.create(
            data=TranscriptionCreate(
                project_id=project_id,
                source_file_id=source_file.id,
                artifact_id=artifact.id,
engine=engine_db, # type: ignore[arg-type] language=language, document=document, ), ) logger.info("Saved transcription artifacts for job %s", job.id) async def _save_convert_artifacts(self, job: Job) -> None: """Create File and ArtifactMediaFile records for converted MP4.""" input_data = job.input_data or {} output_data = job.output_data or {} file_key: str = input_data["file_key"] project_id: uuid.UUID | None = ( uuid.UUID(input_data["project_id"]) if input_data.get("project_id") else None ) file_path: str = output_data["file_path"] file_size: int = output_data.get("file_size", 0) # Resolve user user_repo = UserRepository(self._session) user = await user_repo.get_by_id(job.user_id) # type: ignore[arg-type] if user is None: logger.warning("User %s not found, skipping convert artifact save", job.user_id) return # Derive output filename from source file file_repo = FileRepository(self._session) source_file = await file_repo.get_by_path(file_key) if source_file is not None: stem = Path(source_file.original_filename).stem else: stem = Path(file_key).stem converted_filename = f"Конвертированое видео {stem}.mp4" # Create File record for the converted MP4 (no project_id — only reachable via artifact) converted_file = await file_repo.create( requester=user, data=FileCreate( project_id=None, original_filename=converted_filename, path=file_path, storage_backend="S3", mime_type="video/mp4", size_bytes=file_size, file_format="mp4", is_uploaded=True, ), ) # Create ArtifactMediaFile record artifact_repo = ArtifactRepository(self._session) await artifact_repo.create( data=ArtifactMediaFileCreate( project_id=project_id, file_id=converted_file.id, media_file_id=None, artifact_type="CONVERTED_VIDEO", ), ) updated_output = dict(output_data) updated_output["file_id"] = str(converted_file.id) await self._job_repo.update(job, JobUpdate(output_data=updated_output)) logger.info("Saved convert artifacts for job %s", job.id) async def _save_silence_apply_artifacts(self, job: Job) -> None: 
"""Create File and ArtifactMediaFile records for silence-applied video.""" input_data = job.input_data or {} output_data = job.output_data or {} file_key: str = input_data["file_key"] project_id: uuid.UUID | None = ( uuid.UUID(input_data["project_id"]) if input_data.get("project_id") else None ) file_path: str = output_data["file_path"] file_size: int = output_data.get("file_size", 0) user_repo = UserRepository(self._session) user = await user_repo.get_by_id(job.user_id) # type: ignore[arg-type] if user is None: logger.warning( "User %s not found, skipping silence apply artifact save", job.user_id ) return file_repo = FileRepository(self._session) source_file = await file_repo.get_by_path(file_key) if source_file is not None: stem = Path(source_file.original_filename).stem else: stem = Path(file_key).stem processed_filename = f"Видео без тишины {stem}.mp4" processed_file = await file_repo.create( requester=user, data=FileCreate( project_id=project_id, original_filename=processed_filename, path=file_path, storage_backend="S3", mime_type="video/mp4", size_bytes=file_size, file_format="mp4", is_uploaded=True, ), ) artifact_repo = ArtifactRepository(self._session) await artifact_repo.create( data=ArtifactMediaFileCreate( project_id=project_id, file_id=processed_file.id, media_file_id=None, artifact_type="SILENCE_REMOVED_VIDEO", ), ) updated_output = dict(output_data) updated_output["file_id"] = str(processed_file.id) await self._job_repo.update(job, JobUpdate(output_data=updated_output)) logger.info( "Saved silence apply artifacts for job %s (file_id=%s)", job.id, processed_file.id, ) async def _save_captions_artifacts(self, job: Job) -> None: """Create File and ArtifactMediaFile records for captioned video.""" input_data = job.input_data or {} output_data = job.output_data or {} video_s3_path: str = input_data["video_s3_path"] project_id: uuid.UUID | None = ( uuid.UUID(input_data["project_id"]) if input_data.get("project_id") else None ) output_path: str = 
output_data["output_path"] # Resolve user user_repo = UserRepository(self._session) user = await user_repo.get_by_id(job.user_id) # type: ignore[arg-type] if user is None: logger.warning("User %s not found, skipping captions artifact save", job.user_id) return # Get file size from S3 storage = _get_storage_service() file_size = await storage.size(output_path) # Derive output filename from source video file_repo = FileRepository(self._session) source_file = await file_repo.get_by_path(video_s3_path) if source_file is not None: stem = Path(source_file.original_filename).stem else: stem = Path(video_s3_path).stem captioned_filename = f"Видео с субтитрами {stem}.mp4" # Create File record captioned_file = await file_repo.create( requester=user, data=FileCreate( project_id=project_id, original_filename=captioned_filename, path=output_path, storage_backend="S3", mime_type="video/mp4", size_bytes=file_size, file_format="mp4", is_uploaded=True, ), ) # Create ArtifactMediaFile record artifact_repo = ArtifactRepository(self._session) await artifact_repo.create( data=ArtifactMediaFileCreate( project_id=project_id, file_id=captioned_file.id, media_file_id=None, artifact_type="RENDERED_VIDEO", ), ) # Update job output_data with file_id so frontend can reference it updated_output = dict(output_data) updated_output["file_id"] = str(captioned_file.id) job = await self._job_repo.update( job, JobUpdate(output_data=updated_output) ) logger.info("Saved captions artifacts for job %s (file_id=%s)", job.id, captioned_file.id) async def submit_media_probe( self, *, requester: User, request: MediaProbeRequest ) -> TaskSubmitResponse: """Submit media probe task.""" return await self._submit_task( requester=requester, job_type=JOB_TYPE_MEDIA_PROBE, project_id=request.project_id, input_data=request.model_dump(mode="json"), actor=media_probe_actor, actor_kwargs={"file_key": request.file_key}, ) async def submit_silence_remove( self, *, requester: User, request: SilenceRemoveRequest ) -> 
TaskSubmitResponse: """Submit silence removal task.""" user_folder = get_user_folder(requester) resolved_folder = ( f"{user_folder}/{request.out_folder}" if request.out_folder else f"{user_folder}/output_files" ) return await self._submit_task( requester=requester, job_type=JOB_TYPE_SILENCE_REMOVE, project_id=request.project_id, input_data=request.model_dump(mode="json"), actor=silence_remove_actor, actor_kwargs={ "file_key": request.file_key, "out_folder": resolved_folder, "min_silence_duration_ms": request.min_silence_duration_ms, "silence_threshold_db": request.silence_threshold_db, "padding_ms": request.padding_ms, }, ) async def submit_silence_detect( self, *, requester: User, request: SilenceDetectRequest ) -> TaskSubmitResponse: """Submit silence detection task.""" return await self._submit_task( requester=requester, job_type=JOB_TYPE_SILENCE_DETECT, project_id=request.project_id, input_data=request.model_dump(mode="json"), actor=silence_detect_actor, actor_kwargs={ "file_key": request.file_key, "min_silence_duration_ms": request.min_silence_duration_ms, "silence_threshold_db": request.silence_threshold_db, "padding_ms": request.padding_ms, }, ) async def submit_silence_apply( self, *, requester: User, request: SilenceApplyRequest ) -> TaskSubmitResponse: """Submit silence apply task.""" user_folder = get_user_folder(requester) resolved_folder = ( f"{user_folder}/{request.out_folder}" if request.out_folder else f"{user_folder}/output_files" ) return await self._submit_task( requester=requester, job_type=JOB_TYPE_SILENCE_APPLY, project_id=request.project_id, input_data=request.model_dump(mode="json"), actor=silence_apply_actor, actor_kwargs={ "file_key": request.file_key, "out_folder": resolved_folder, "cuts": request.cuts, "output_name": request.output_name, }, ) async def submit_media_convert( self, *, requester: User, request: MediaConvertRequest ) -> TaskSubmitResponse: """Submit media conversion task.""" user_folder = get_user_folder(requester) 
resolved_folder = ( f"{user_folder}/{request.out_folder}" if request.out_folder else f"{user_folder}/output_files" ) return await self._submit_task( requester=requester, job_type=JOB_TYPE_MEDIA_CONVERT, project_id=request.project_id, input_data=request.model_dump(mode="json"), actor=media_convert_actor, actor_kwargs={ "file_key": request.file_key, "out_folder": resolved_folder, "output_format": request.output_format, }, ) async def submit_transcription_generate( self, *, requester: User, request: TranscriptionGenerateRequest ) -> TaskSubmitResponse: """Submit transcription generation task.""" return await self._submit_task( requester=requester, job_type=JOB_TYPE_TRANSCRIPTION_GENERATE, project_id=request.project_id, input_data=request.model_dump(mode="json"), actor=transcription_generate_actor, actor_kwargs={ "file_key": request.file_key, "engine": request.engine, "language": request.language, "model": request.model, }, ) async def submit_frame_extract( self, *, requester: User, request: FrameExtractRequest ) -> TaskSubmitResponse: """Submit frame extraction task.""" from cpv3.modules.media.service import get_frames_folder user_folder = get_user_folder(requester) frames_folder = get_frames_folder(user_folder, request.file_key) return await self._submit_task( requester=requester, job_type=JOB_TYPE_FRAME_EXTRACT, project_id=request.project_id, input_data=request.model_dump(mode="json"), actor=frame_extract_actor, actor_kwargs={ "file_key": request.file_key, "frames_folder": frames_folder, "regenerate": request.regenerate, }, ) async def submit_captions_generate( self, *, requester: User, request: CaptionsGenerateRequest ) -> TaskSubmitResponse: """Submit captions generation task.""" from cpv3.modules.captions.service import CaptionPresetService input_data = request.model_dump(mode="json") existing_job = await self._find_duplicate_active_job( requester=requester, job_type=JOB_TYPE_CAPTIONS_GENERATE, project_id=request.project_id, input_data=input_data, ) if 
existing_job is not None: return TaskSubmitResponse( job_id=existing_job.id, webhook_url=_build_webhook_url(existing_job.id), status=existing_job.status, ) transcription_repo = TranscriptionRepository(self._session) transcription = await transcription_repo.get_by_id(request.transcription_id) if transcription is None: raise ValueError(f"Транскрипция {request.transcription_id} не найдена") user_folder = get_user_folder(requester) resolved_folder = ( f"{user_folder}/{request.folder}" if request.folder else f"{user_folder}/output_files" ) # Resolve style config from preset or inline override preset_service = CaptionPresetService(self._session) style_config = await preset_service.resolve_style_config( preset_id=request.preset_id, inline_config=request.style_config, ) return await self._submit_task( requester=requester, job_type=JOB_TYPE_CAPTIONS_GENERATE, project_id=request.project_id, input_data=input_data, actor=captions_generate_actor, actor_kwargs={ "video_s3_path": request.video_s3_path, "folder": resolved_folder, "transcription_json": transcription.document, "style_config": style_config, }, )