Files
2026-04-27 23:19:04 +03:00

1900 lines
65 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Task service for submitting and managing background tasks.
"""
from __future__ import annotations
import asyncio
import io
import json
import logging
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import dramatiq # type: ignore[import-untyped]
import httpx
from dramatiq.brokers.redis import RedisBroker # type: ignore[import-untyped]
from sqlalchemy.ext.asyncio import AsyncSession
from cpv3.infrastructure.deps import _get_storage_service
from cpv3.infrastructure.settings import get_settings
from cpv3.infrastructure.storage.utils import get_user_folder
from cpv3.modules.files.repository import FileRepository
from cpv3.modules.files.schemas import FileCreate
from cpv3.modules.jobs.models import Job
from cpv3.modules.jobs.repository import JobEventRepository, JobRepository
from cpv3.modules.jobs.schemas import (
JobCreate,
JobEventCreate,
JobStatusEnum,
JobTypeEnum,
JobUpdate,
)
from cpv3.modules.media.repository import ArtifactRepository
from cpv3.modules.media.schemas import ArtifactMediaFileCreate
from cpv3.modules.project_workspaces.service import ProjectWorkspaceService
from cpv3.modules.tasks.schemas import (
CaptionsGenerateRequest,
FrameExtractRequest,
MediaConvertRequest,
MediaProbeRequest,
SilenceApplyRequest,
SilenceDetectRequest,
SilenceRemoveRequest,
TaskSubmitResponse,
TaskWebhookEvent,
TranscriptionGenerateRequest,
)
from cpv3.modules.notifications.service import NotificationService
from cpv3.modules.transcription.repository import TranscriptionRepository
from cpv3.modules.transcription.schemas import TranscriptionCreate
from cpv3.modules.users.models import User
from cpv3.modules.users.repository import UserRepository
from cpv3.modules.webhooks.repository import WebhookRepository
from cpv3.modules.webhooks.schemas import WebhookCreate
logger = logging.getLogger(__name__)
# Job lifecycle states (string values persisted in the jobs.status column).
JOB_STATUS_PENDING: JobStatusEnum = "PENDING"
JOB_STATUS_RUNNING: JobStatusEnum = "RUNNING"
JOB_STATUS_DONE: JobStatusEnum = "DONE"
JOB_STATUS_FAILED: JobStatusEnum = "FAILED"
JOB_STATUS_CANCELLED: JobStatusEnum = "CANCELLED"
# Background job types handled by the Dramatiq actors below.
JOB_TYPE_MEDIA_PROBE: JobTypeEnum = "MEDIA_PROBE"
JOB_TYPE_SILENCE_REMOVE: JobTypeEnum = "SILENCE_REMOVE"
JOB_TYPE_SILENCE_DETECT: JobTypeEnum = "SILENCE_DETECT"
JOB_TYPE_SILENCE_APPLY: JobTypeEnum = "SILENCE_APPLY"
JOB_TYPE_MEDIA_CONVERT: JobTypeEnum = "MEDIA_CONVERT"
JOB_TYPE_TRANSCRIPTION_GENERATE: JobTypeEnum = "TRANSCRIPTION_GENERATE"
JOB_TYPE_CAPTIONS_GENERATE: JobTypeEnum = "CAPTIONS_GENERATE"
JOB_TYPE_FRAME_EXTRACT: JobTypeEnum = "FRAME_EXTRACT"
# Job event types derived from webhook payloads (see _derive_event_type).
EVENT_TYPE_STATUS_PREFIX = "status_"
EVENT_TYPE_PROGRESS = "progress"
EVENT_TYPE_LOG = "log"
EVENT_TYPE_OUTPUT = "output"
EVENT_TYPE_ERROR = "error"
# Internal webhook endpoint template and HTTP timeout for delivering events.
TASK_WEBHOOK_PATH = "/api/tasks/webhook/{job_id}/"
WEBHOOK_TIMEOUT_SECONDS = 10
# User-facing error texts (Russian UI copy — do not translate).
ERROR_NO_AUDIO_STREAM = "Файл не содержит аудиодорожки"
ERROR_UNKNOWN_ENGINE = "Неизвестный движок транскрипции: {engine}"
# Maps public transcription engine ids to internal backend names.
ENGINE_MAP: dict[str, str] = {
    "whisper": "LOCAL_WHISPER",
    "google": "GOOGLE_SPEECH_CLOUD",
    "salutespeech": "SALUTE_SPEECH",
}
# User-facing progress messages (Russian UI copy — do not translate).
MESSAGE_STARTING = "Запуск"
MESSAGE_COMPLETED = "Завершено"
MESSAGE_PROBING_MEDIA = "Анализ медиафайла"
MESSAGE_PROCESSING = "Обработка"
MESSAGE_CONVERTING = "Конвертация"
MESSAGE_PREPARING_FILE = "Подготовка файла"
MESSAGE_CONVERTING_VIDEO = "Конвертация видео"
MESSAGE_UPLOADING_RESULT = "Загрузка результата"
MESSAGE_SAVING_RESULT = "Сохранение результата"
MESSAGE_RENDERING_CAPTIONS = "Рендеринг субтитров"
MESSAGE_CANCELLED = "Отменено пользователем"
MESSAGE_EXTRACTING_FRAMES = "Извлечение кадров"
MESSAGE_UPLOADING_FRAMES = "Загрузка кадров"
MESSAGE_DELETING_OLD_FRAMES = "Удаление старых кадров"
# Progress milestones (percent of overall job progress) per task type.
PROGRESS_COMPLETE = 100.0
PROGRESS_MEDIA_PROBE = 50.0
PROGRESS_SILENCE_REMOVE = 30.0
PROGRESS_MEDIA_CONVERT = 30.0
PROGRESS_MEDIA_CONVERT_PREPARING = 5.0
PROGRESS_MEDIA_CONVERT_START = 10.0
PROGRESS_MEDIA_CONVERT_END = 95.0
PROGRESS_MEDIA_CONVERT_SAVING = 99.0
PROGRESS_SILENCE_APPLY_PREPARING = 5.0
PROGRESS_SILENCE_APPLY_START = 10.0
PROGRESS_SILENCE_APPLY_END = 95.0
PROGRESS_SILENCE_APPLY_SAVING = 99.0
PROGRESS_TRANSCRIPTION_START = 20.0
PROGRESS_TRANSCRIPTION_END = 95.0
PROGRESS_CAPTIONS = 30.0
PROGRESS_FRAME_EXTRACT_START = 10.0
PROGRESS_FRAME_EXTRACT_END = 95.0
PROGRESS_SILENCE_DETECT = 30.0
PROGRESS_SILENCE_APPLY = 30.0
MESSAGE_DETECTING_SILENCE = "Обнаружение тишины"
MESSAGE_APPLYING_CUTS = "Применение вырезок"
# Minimum seconds between progress webhooks (general vs convert-style tasks).
PROGRESS_THROTTLE_SECONDS = 3.0
PROGRESS_CONVERT_THROTTLE_SECONDS = 1.0
# Statuses considered "in flight" for duplicate-job detection.
ACTIVE_JOB_STATUSES = (JOB_STATUS_PENDING, JOB_STATUS_RUNNING)
# Separator packing "queue_name:redis_message_id" into Job.broker_id.
DRAMATIQ_BROKER_REF_SEPARATOR = ":"
class JobCancelledError(RuntimeError):
    """Raised when a job was cancelled before completion.

    Actors raise this (via _raise_if_job_cancelled) to abort mid-run once the
    database already reports the job as CANCELLED, and catch it to exit
    without sending a FAILED webhook.
    """
# ---------------------------------------------------------------------------
# Dramatiq broker setup
# ---------------------------------------------------------------------------
# Import-time side effect: install the global Dramatiq Redis broker so the
# @dramatiq.actor declarations below register against it.
_settings = get_settings()
_redis_broker = RedisBroker(url=_settings.redis_url)
dramatiq.set_broker(_redis_broker)
# ---------------------------------------------------------------------------
# Webhook helpers for Dramatiq workers
# ---------------------------------------------------------------------------
def _utc_now() -> datetime:
"""Return current UTC time."""
return datetime.now(timezone.utc)
def _parse_frame_rate(rate_str: str) -> float | None:
"""Parse ffprobe frame rate string like '30/1' or '30000/1001'."""
try:
if "/" in rate_str:
num, den = rate_str.split("/")
den_val = int(den)
return round(int(num) / den_val, 3) if den_val else None
return float(rate_str)
except (ValueError, ZeroDivisionError):
return None
def _build_webhook_url(job_id: uuid.UUID) -> str:
    """Build the absolute internal webhook URL for updates on *job_id*."""
    base = get_settings().webhook_base_url.rstrip("/")
    path = TASK_WEBHOOK_PATH.format(job_id=job_id)
    return f"{base}{path}"
def _build_webhook_event_name(job_type: JobTypeEnum) -> str:
"""Build webhook event name for a job type."""
return f"task.{job_type.lower()}"
def _send_webhook_event(webhook_url: str, event: TaskWebhookEvent) -> None:
    """POST a task webhook event to the API, re-raising on any failure."""
    body = event.model_dump(mode="json", exclude_none=True)
    try:
        resp = httpx.post(webhook_url, json=body, timeout=WEBHOOK_TIMEOUT_SECONDS)
        resp.raise_for_status()
    except Exception:
        # Log with traceback, then propagate so callers can react.
        logger.exception("Failed to send task webhook event to %s", webhook_url)
        raise
def _derive_event_type(event: TaskWebhookEvent) -> str:
    """Classify a webhook payload into a job event type.

    Field priority: status > error_message > progress_pct > current_message
    > output_data; a payload with none of these set is logged as a plain log
    event.
    """
    if event.status is not None:
        return f"{EVENT_TYPE_STATUS_PREFIX}{event.status}"
    ordered_checks = (
        (event.error_message, EVENT_TYPE_ERROR),
        (event.progress_pct, EVENT_TYPE_PROGRESS),
        (event.current_message, EVENT_TYPE_LOG),
        (event.output_data, EVENT_TYPE_OUTPUT),
    )
    for field_value, derived_type in ordered_checks:
        if field_value is not None:
            return derived_type
    return EVENT_TYPE_LOG
def _run_async(coro: Any) -> Any:
"""Run async function in new event loop (for sync Dramatiq actors)."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(coro)
finally:
loop.close()
def _serialize_broker_reference(queue_name: str, redis_message_id: str) -> str:
    """Pack queue name and Dramatiq redis message id into a single field."""
    return DRAMATIQ_BROKER_REF_SEPARATOR.join((queue_name, redis_message_id))
def _parse_broker_reference(broker_id: str | None) -> tuple[str, str] | None:
    """Unpack a stored broker_id into (queue_name, redis_message_id).

    Returns None when the value is absent, has no separator, or either half
    is empty.
    """
    if not broker_id:
        return None
    queue_name, separator, redis_message_id = broker_id.partition(
        DRAMATIQ_BROKER_REF_SEPARATOR
    )
    if not separator or not queue_name or not redis_message_id:
        return None
    return queue_name, redis_message_id
def _get_job_status_sync(job_id: uuid.UUID) -> JobStatusEnum | None:
    """Read current job status using a sync connection (safe for Dramatiq workers).

    Best-effort: returns None when the job row does not exist or when the
    lookup fails for any reason; failures are logged at warning level but
    never raised, so cancellation polling cannot crash an actor.
    """
    # Local import: psycopg2 is only needed inside worker processes.
    import psycopg2

    settings = get_settings()
    dsn = (
        f"host={settings.postgres_host} port={settings.postgres_port} "
        f"dbname={settings.postgres_database} "
        f"user={settings.postgres_user} password={settings.postgres_password}"
    )
    try:
        conn = psycopg2.connect(dsn)
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT status FROM jobs WHERE id = %s", (str(job_id),))
                row = cur.fetchone()
                return row[0] if row else None
        finally:
            # Always release the connection, even if the query fails.
            conn.close()
    except Exception:
        logger.warning("Failed to check job status for %s", job_id, exc_info=True)
        return None
def _raise_if_job_cancelled(job_id: uuid.UUID) -> None:
    """Abort worker execution when the DB already marks *job_id* as cancelled."""
    if _get_job_status_sync(job_id) == JOB_STATUS_CANCELLED:
        raise JobCancelledError(MESSAGE_CANCELLED)
# ---------------------------------------------------------------------------
# Dramatiq actors
# ---------------------------------------------------------------------------
@dramatiq.actor(max_retries=0)
def media_probe_actor(job_id: str, webhook_url: str, file_key: str) -> None:
    """Probe media file to extract metadata.

    Reports RUNNING/DONE/FAILED transitions and the probe result back to the
    API over the job's webhook URL. Never retried (max_retries=0).
    """
    # Lazy import keeps media dependencies out of the broker setup path.
    from cpv3.modules.media.service import probe_media

    job_uuid = uuid.UUID(job_id)
    # Skip entirely when the job was cancelled while still queued.
    try:
        _raise_if_job_cancelled(job_uuid)
    except JobCancelledError:
        logger.info("media_probe_actor cancelled: %s", job_uuid)
        return
    _send_webhook_event(
        webhook_url,
        TaskWebhookEvent(
            status=JOB_STATUS_RUNNING,
            current_message=MESSAGE_STARTING,
            started_at=_utc_now(),
        ),
    )
    try:
        storage = _get_storage_service()
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                current_message=MESSAGE_PROBING_MEDIA,
                progress_pct=PROGRESS_MEDIA_PROBE,
            ),
        )
        result = _run_async(probe_media(storage, file_key=file_key))
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_DONE,
                current_message=MESSAGE_COMPLETED,
                progress_pct=PROGRESS_COMPLETE,
                output_data=result.model_dump(mode="json"),
                finished_at=_utc_now(),
            ),
        )
    except JobCancelledError:
        # Cancelled mid-run: exit quietly without a FAILED webhook.
        logger.info("media_probe_actor cancelled: %s", job_uuid)
        return
    except Exception as exc:
        logger.exception("media_probe_actor failed: %s", job_uuid)
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_FAILED,
                error_message=str(exc),
                finished_at=_utc_now(),
            ),
        )
@dramatiq.actor(max_retries=0)
def silence_remove_actor(
    job_id: str,
    webhook_url: str,
    file_key: str,
    out_folder: str,
    min_silence_duration_ms: int,
    silence_threshold_db: int,
    padding_ms: int,
) -> None:
    """Remove silence from media file.

    Runs the silence-removal pipeline against storage and reports
    RUNNING/DONE/FAILED plus the resulting file info over the webhook.
    """
    # Lazy import keeps media dependencies out of the broker setup path.
    from cpv3.modules.media.service import remove_silence

    job_uuid = uuid.UUID(job_id)
    # Skip entirely when the job was cancelled while still queued.
    try:
        _raise_if_job_cancelled(job_uuid)
    except JobCancelledError:
        logger.info("silence_remove_actor cancelled: %s", job_uuid)
        return
    _send_webhook_event(
        webhook_url,
        TaskWebhookEvent(
            status=JOB_STATUS_RUNNING,
            current_message=MESSAGE_STARTING,
            started_at=_utc_now(),
        ),
    )
    try:
        storage = _get_storage_service()
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                current_message=MESSAGE_PROCESSING,
                progress_pct=PROGRESS_SILENCE_REMOVE,
            ),
        )
        result = _run_async(
            remove_silence(
                storage,
                file_key=file_key,
                out_folder=out_folder,
                min_silence_duration_ms=min_silence_duration_ms,
                silence_threshold_db=silence_threshold_db,
                padding_ms=padding_ms,
            )
        )
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_DONE,
                current_message=MESSAGE_COMPLETED,
                progress_pct=PROGRESS_COMPLETE,
                output_data={
                    "file_path": result.file_path,
                    "file_url": result.file_url,
                    "file_size": result.file_size,
                },
                finished_at=_utc_now(),
            ),
        )
    except JobCancelledError:
        # Cancelled mid-run: exit quietly without a FAILED webhook.
        logger.info("silence_remove_actor cancelled: %s", job_uuid)
        return
    except Exception as exc:
        logger.exception("silence_remove_actor failed: %s", job_uuid)
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_FAILED,
                error_message=str(exc),
                finished_at=_utc_now(),
            ),
        )
@dramatiq.actor(max_retries=0)
def silence_detect_actor(
    job_id: str,
    webhook_url: str,
    file_key: str,
    min_silence_duration_ms: int,
    silence_threshold_db: int,
    padding_ms: int,
) -> None:
    """Detect silent segments in media file.

    Analysis-only counterpart of silence_remove_actor: reports the detection
    result via webhook without producing an output file.
    """
    # Lazy import keeps media dependencies out of the broker setup path.
    from cpv3.modules.media.service import detect_silence

    job_uuid = uuid.UUID(job_id)
    # Skip entirely when the job was cancelled while still queued.
    try:
        _raise_if_job_cancelled(job_uuid)
    except JobCancelledError:
        logger.info("silence_detect_actor cancelled: %s", job_uuid)
        return
    _send_webhook_event(
        webhook_url,
        TaskWebhookEvent(
            status=JOB_STATUS_RUNNING,
            current_message=MESSAGE_STARTING,
            started_at=_utc_now(),
        ),
    )
    try:
        storage = _get_storage_service()
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                current_message=MESSAGE_DETECTING_SILENCE,
                progress_pct=PROGRESS_SILENCE_DETECT,
            ),
        )
        result = _run_async(
            detect_silence(
                storage,
                file_key=file_key,
                min_silence_duration_ms=min_silence_duration_ms,
                silence_threshold_db=silence_threshold_db,
                padding_ms=padding_ms,
            )
        )
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_DONE,
                current_message=MESSAGE_COMPLETED,
                progress_pct=PROGRESS_COMPLETE,
                output_data=result,
                finished_at=_utc_now(),
            ),
        )
    except JobCancelledError:
        # Cancelled mid-run: exit quietly without a FAILED webhook.
        logger.info("silence_detect_actor cancelled: %s", job_uuid)
        return
    except Exception as exc:
        logger.exception("silence_detect_actor failed: %s", job_uuid)
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_FAILED,
                error_message=str(exc),
                finished_at=_utc_now(),
            ),
        )
@dramatiq.actor(max_retries=0)
def silence_apply_actor(
    job_id: str,
    webhook_url: str,
    file_key: str,
    out_folder: str,
    cuts: list[dict],
    output_name: str | None,
) -> None:
    """Apply silence cuts to media file.

    Streams throttled progress webhooks while cutting, then reports the
    resulting file info. Progress 0-100% from the media pipeline is mapped
    into the SILENCE_APPLY_START..END band of overall job progress.
    """
    # Lazy import keeps media dependencies out of the broker setup path.
    from cpv3.modules.media.service import apply_silence_cuts

    job_uuid = uuid.UUID(job_id)
    # Skip entirely when the job was cancelled while still queued.
    try:
        _raise_if_job_cancelled(job_uuid)
    except JobCancelledError:
        logger.info("silence_apply_actor cancelled: %s", job_uuid)
        return
    _send_webhook_event(
        webhook_url,
        TaskWebhookEvent(
            status=JOB_STATUS_RUNNING,
            current_message=MESSAGE_PREPARING_FILE,
            progress_pct=PROGRESS_SILENCE_APPLY_PREPARING,
            started_at=_utc_now(),
        ),
    )
    try:
        storage = _get_storage_service()
        # Throttle state shared with the progress closure below.
        last_report_time = 0.0
        last_progress = PROGRESS_SILENCE_APPLY_PREPARING

        def _emit_silence_apply_progress(stage: str, pct: float | None) -> None:
            # Relay pipeline progress as webhooks; non-forced updates are
            # suppressed unless progress moved forward by >= 1 point and at
            # least PROGRESS_CONVERT_THROTTLE_SECONDS elapsed.
            nonlocal last_report_time, last_progress
            if stage == "applying_cuts":
                raw_pct = min(max(pct or 0.0, 0.0), 100.0)
                if raw_pct >= 100.0:
                    # 100% is reported by the final DONE webhook instead.
                    return
                mapped = PROGRESS_SILENCE_APPLY_START + (raw_pct / 100.0) * (
                    PROGRESS_SILENCE_APPLY_END - PROGRESS_SILENCE_APPLY_START
                )
                message = MESSAGE_APPLYING_CUTS
                force = raw_pct == 0.0
            elif stage == "uploading":
                mapped = PROGRESS_SILENCE_APPLY_END
                message = MESSAGE_UPLOADING_RESULT
                force = True
            else:
                # Unknown stages are ignored.
                return
            mapped = round(mapped, 1)
            now = time.monotonic()
            if not force:
                if mapped <= last_progress:
                    return
                if mapped - last_progress < 1.0:
                    return
                if now - last_report_time < PROGRESS_CONVERT_THROTTLE_SECONDS:
                    return
            last_report_time = now
            # Progress is monotonic: never report a lower value.
            last_progress = max(last_progress, mapped)
            _send_webhook_event(
                webhook_url,
                TaskWebhookEvent(
                    current_message=message,
                    progress_pct=last_progress,
                ),
            )

        result = _run_async(
            apply_silence_cuts(
                storage,
                file_key=file_key,
                out_folder=out_folder,
                cuts=cuts,
                output_name=output_name,
                on_progress=_emit_silence_apply_progress,
            )
        )
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                current_message=MESSAGE_SAVING_RESULT,
                progress_pct=PROGRESS_SILENCE_APPLY_SAVING,
            ),
        )
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_DONE,
                current_message=MESSAGE_COMPLETED,
                progress_pct=PROGRESS_COMPLETE,
                output_data={
                    "file_path": result.file_path,
                    "file_url": result.file_url,
                    "file_size": result.file_size,
                },
                finished_at=_utc_now(),
            ),
        )
    except JobCancelledError:
        # Cancelled mid-run: exit quietly without a FAILED webhook.
        logger.info("silence_apply_actor cancelled: %s", job_uuid)
        return
    except Exception as exc:
        logger.exception("silence_apply_actor failed: %s", job_uuid)
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_FAILED,
                error_message=str(exc),
                finished_at=_utc_now(),
            ),
        )
@dramatiq.actor(max_retries=0)
def media_convert_actor(
    job_id: str,
    webhook_url: str,
    file_key: str,
    out_folder: str,
    output_format: str,
) -> None:
    """Convert media file to specified format.

    Currently only 'mp4' is supported; any other output_format fails the job.
    Conversion progress is mapped into the MEDIA_CONVERT_START..END band and
    relayed as throttled webhooks.
    """
    # Lazy import keeps media dependencies out of the broker setup path.
    from cpv3.modules.media.service import convert_to_mp4

    job_uuid = uuid.UUID(job_id)
    # Skip entirely when the job was cancelled while still queued.
    try:
        _raise_if_job_cancelled(job_uuid)
    except JobCancelledError:
        logger.info("media_convert_actor cancelled: %s", job_uuid)
        return
    _send_webhook_event(
        webhook_url,
        TaskWebhookEvent(
            status=JOB_STATUS_RUNNING,
            current_message=MESSAGE_PREPARING_FILE,
            progress_pct=PROGRESS_MEDIA_CONVERT_PREPARING,
            started_at=_utc_now(),
        ),
    )
    try:
        if output_format.lower() != "mp4":
            raise ValueError(f"Неподдерживаемый формат: {output_format}")
        storage = _get_storage_service()
        # Throttle state shared with the progress closure below.
        last_report_time = 0.0
        last_progress = PROGRESS_MEDIA_CONVERT_PREPARING

        def _emit_convert_progress(stage: str, pct: float | None) -> None:
            # Relay pipeline progress as webhooks; non-forced updates are
            # suppressed unless progress moved forward by >= 1 point and at
            # least PROGRESS_CONVERT_THROTTLE_SECONDS elapsed.
            nonlocal last_report_time, last_progress
            if stage == "converting":
                raw_pct = min(max(pct or 0.0, 0.0), 100.0)
                if raw_pct >= 100.0:
                    # 100% is reported by the final DONE webhook instead.
                    return
                mapped = PROGRESS_MEDIA_CONVERT_START + (raw_pct / 100.0) * (
                    PROGRESS_MEDIA_CONVERT_END - PROGRESS_MEDIA_CONVERT_START
                )
                message = MESSAGE_CONVERTING_VIDEO
                force = raw_pct == 0.0
            elif stage == "uploading":
                mapped = PROGRESS_MEDIA_CONVERT_END
                message = MESSAGE_UPLOADING_RESULT
                force = True
            else:
                # Unknown stages are ignored.
                return
            mapped = round(mapped, 1)
            now = time.monotonic()
            if not force:
                if mapped <= last_progress:
                    return
                if mapped - last_progress < 1.0:
                    return
                if now - last_report_time < PROGRESS_CONVERT_THROTTLE_SECONDS:
                    return
            last_report_time = now
            # Progress is monotonic: never report a lower value.
            last_progress = max(last_progress, mapped)
            _send_webhook_event(
                webhook_url,
                TaskWebhookEvent(
                    current_message=message,
                    progress_pct=last_progress,
                ),
            )

        result = _run_async(
            convert_to_mp4(
                storage,
                file_key=file_key,
                out_folder=out_folder,
                on_progress=_emit_convert_progress,
            )
        )
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                current_message=MESSAGE_SAVING_RESULT,
                progress_pct=PROGRESS_MEDIA_CONVERT_SAVING,
            ),
        )
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_DONE,
                current_message=MESSAGE_COMPLETED,
                progress_pct=PROGRESS_COMPLETE,
                output_data={
                    "file_path": result.file_path,
                    "file_url": result.file_url,
                    "file_size": result.file_size,
                },
                finished_at=_utc_now(),
            ),
        )
    except JobCancelledError:
        # Cancelled mid-run: exit quietly without a FAILED webhook.
        logger.info("media_convert_actor cancelled: %s", job_uuid)
        return
    except Exception as exc:
        logger.exception("media_convert_actor failed: %s", job_uuid)
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_FAILED,
                error_message=str(exc),
                finished_at=_utc_now(),
            ),
        )
@dramatiq.actor(max_retries=0)
def transcription_generate_actor(
    job_id: str,
    webhook_url: str,
    file_key: str,
    engine: str,
    language: str | None,
    model: str,
) -> None:
    """Generate transcription from audio/video file.

    Probes the source first (it must contain an audio stream), dispatches to
    the requested engine ('whisper', 'google', or 'salutespeech'), and
    reports the resulting document plus probe metadata via webhook.
    """
    # Lazy import keeps heavy transcription deps out of the broker setup path.
    from cpv3.modules.transcription.service import (
        transcribe_with_google_speech,
        transcribe_with_salute_speech,
        transcribe_with_whisper,
    )

    job_uuid = uuid.UUID(job_id)
    # Skip entirely when the job was cancelled while still queued.
    try:
        _raise_if_job_cancelled(job_uuid)
    except JobCancelledError:
        logger.info("transcription_generate_actor cancelled: %s", job_uuid)
        return
    _send_webhook_event(
        webhook_url,
        TaskWebhookEvent(
            status=JOB_STATUS_RUNNING,
            current_message=MESSAGE_STARTING,
            started_at=_utc_now(),
        ),
    )
    try:
        from cpv3.modules.media.service import probe_media

        storage = _get_storage_service()
        probe = _run_async(probe_media(storage, file_key=file_key))
        # Fail fast when there is nothing to transcribe.
        has_audio = any(s.codec_type == "audio" for s in probe.streams)
        if not has_audio:
            raise ValueError(ERROR_NO_AUDIO_STREAM)
        # Extract probe metadata for artifact creation
        duration_seconds = (
            float(probe.format.duration) if probe.format and probe.format.duration else 0.0
        )
        video_stream = next((s for s in probe.streams if s.codec_type == "video"), None)
        probe_meta = {
            "duration_seconds": duration_seconds,
            "frame_rate": _parse_frame_rate(video_stream.r_frame_rate)
            if video_stream and video_stream.r_frame_rate
            else None,
            "width": video_stream.width if video_stream else None,
            "height": video_stream.height if video_stream else None,
        }
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                current_message=f"Транскрибирование ({engine})",
                progress_pct=PROGRESS_TRANSCRIPTION_START,
            ),
        )
        last_report_time = time.monotonic()

        def _on_whisper_progress(pct: float) -> None:
            # Throttled relay mapping engine 0-100% into the
            # TRANSCRIPTION_START..END band (also used by salutespeech).
            nonlocal last_report_time
            now = time.monotonic()
            if now - last_report_time < PROGRESS_THROTTLE_SECONDS:
                return
            last_report_time = now
            mapped = PROGRESS_TRANSCRIPTION_START + (pct / 100.0) * (
                PROGRESS_TRANSCRIPTION_END - PROGRESS_TRANSCRIPTION_START
            )
            _send_webhook_event(
                webhook_url,
                TaskWebhookEvent(
                    current_message=f"Транскрибирование ({engine})",
                    progress_pct=round(mapped, 1),
                ),
            )

        if engine == "whisper":
            document = _run_async(
                transcribe_with_whisper(
                    storage,
                    file_key=file_key,
                    model_name=model,
                    language=language,
                    on_progress=_on_whisper_progress,
                )
            )
        elif engine == "google":
            language_codes = [language] if language else None
            document = _run_async(
                transcribe_with_google_speech(
                    storage, file_key=file_key, language_codes=language_codes
                )
            )
        elif engine == "salutespeech":
            # SaluteSpeech needs the source sample rate; default to 16 kHz.
            audio_stream = next((s for s in probe.streams if s.codec_type == "audio"), None)
            sr = (
                int(audio_stream.sample_rate)
                if audio_stream and audio_stream.sample_rate
                else 16000
            )
            document = _run_async(
                transcribe_with_salute_speech(
                    storage,
                    file_key=file_key,
                    language=language,
                    model=model,
                    sample_rate=sr,
                    job_id=job_uuid,
                    on_progress=_on_whisper_progress,
                )
            )
        else:
            raise ValueError(ERROR_UNKNOWN_ENGINE.format(engine=engine))
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_DONE,
                current_message=MESSAGE_COMPLETED,
                progress_pct=PROGRESS_COMPLETE,
                output_data={
                    "document": document.model_dump(mode="json"),
                    "probe": probe_meta,
                },
                finished_at=_utc_now(),
            ),
        )
    except JobCancelledError:
        # Cancelled mid-run: exit quietly without a FAILED webhook.
        logger.info("transcription_generate_actor cancelled: %s", job_uuid)
        return
    except Exception as exc:
        logger.exception("transcription_generate_actor failed: %s", job_uuid)
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_FAILED,
                error_message=str(exc),
                finished_at=_utc_now(),
            ),
        )
# Remotion render polling cadence and overall fallback timeout (seconds).
RENDER_POLL_INTERVAL_SECONDS = 5
RENDER_POLL_TIMEOUT_SECONDS = 600
@dramatiq.actor(max_retries=0)
def captions_generate_actor(
    job_id: str,
    webhook_url: str,
    video_s3_path: str,
    folder: str,
    transcription_json: dict,
    style_config: dict | None = None,
) -> None:
    """Generate captions on video (async via Remotion + BullMQ).

    Submits a render to the Remotion service with our webhook URL as the
    callback (Remotion reports progress/completion directly), then polls the
    render status as a fallback so a lost callback cannot strand the job.
    """
    # Lazy imports keep caption rendering deps out of the broker setup path.
    from cpv3.modules.captions.service import generate_captions
    from cpv3.modules.transcription.schemas import Document

    job_uuid = uuid.UUID(job_id)
    # Skip entirely when the job was cancelled while still queued.
    try:
        _raise_if_job_cancelled(job_uuid)
    except JobCancelledError:
        logger.info("captions_generate_actor cancelled: %s", job_uuid)
        return
    _send_webhook_event(
        webhook_url,
        TaskWebhookEvent(
            status=JOB_STATUS_RUNNING,
            current_message=MESSAGE_STARTING,
            started_at=_utc_now(),
        ),
    )
    try:
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                current_message=MESSAGE_RENDERING_CAPTIONS,
                progress_pct=PROGRESS_CAPTIONS,
            ),
        )
        document = Document.model_validate(transcription_json)
        # Call Remotion with callback_url so it sends progress webhooks directly
        render_id = _run_async(
            generate_captions(
                video_s3_path=video_s3_path,
                folder=folder,
                transcription=document,
                style_config=style_config,
                callback_url=webhook_url,
                render_id=job_id,
            )
        )
        _raise_if_job_cancelled(job_uuid)
        # Publish the render id so cancellation can target the right render.
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                current_message=MESSAGE_RENDERING_CAPTIONS,
                output_data={"render_id": render_id},
            ),
        )
        # Polling fallback: wait for Remotion to finish rendering
        # Primary progress is delivered via Remotion → webhook directly
        settings = get_settings()
        elapsed = 0.0
        last_polled_status: str | None = None
        while elapsed < RENDER_POLL_TIMEOUT_SECONDS:
            _raise_if_job_cancelled(job_uuid)
            time.sleep(RENDER_POLL_INTERVAL_SECONDS)
            elapsed += RENDER_POLL_INTERVAL_SECONDS
            try:
                resp = httpx.get(
                    f"{settings.remotion_service_url}/api/render/{render_id}",
                    timeout=10,
                )
                resp.raise_for_status()
                data = resp.json()
            except Exception:
                # Transient poll failures are tolerated; retry until timeout.
                logger.warning("Render poll failed for %s, retrying...", render_id)
                continue
            status = data.get("status")
            # Log only on status transitions to avoid flooding the log.
            if status != last_polled_status:
                logger.info(
                    "Remotion render %s status=%s progress=%s callback_delivered=%s",
                    render_id,
                    status,
                    data.get("progress_pct"),
                    data.get("callback_delivered"),
                )
                last_polled_status = status
            if status == "done":
                if data.get("callback_delivered") is True:
                    # Remotion already sent DONE webhook — exit cleanly
                    return
                output_path = data.get("output_path")
                if not output_path:
                    raise RuntimeError(
                        "Remotion render finished without output_path in polling response"
                    )
                logger.warning(
                    "Remotion render %s finished without confirmed DONE webhook, sending fallback completion",
                    render_id,
                )
                _send_webhook_event(
                    webhook_url,
                    TaskWebhookEvent(
                        status=JOB_STATUS_DONE,
                        progress_pct=PROGRESS_COMPLETE,
                        current_message="Готово",
                        output_data={"output_path": str(output_path)},
                        finished_at=_utc_now(),
                    ),
                )
                return
            if status == "failed":
                error = data.get("error", "Render failed")
                raise RuntimeError(f"Remotion render failed: {error}")
        raise TimeoutError(
            f"Render {render_id} did not complete within {RENDER_POLL_TIMEOUT_SECONDS}s"
        )
    except JobCancelledError:
        # Cancelled mid-run: exit quietly without a FAILED webhook.
        logger.info("captions_generate_actor cancelled: %s", job_uuid)
        return
    except Exception as exc:
        logger.exception("captions_generate_actor failed: %s", job_uuid)
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_FAILED,
                error_message=str(exc),
                finished_at=_utc_now(),
            ),
        )
@dramatiq.actor(max_retries=0)
def frame_extract_actor(
    job_id: str,
    webhook_url: str,
    file_key: str,
    frames_folder: str,
    regenerate: bool,
) -> None:
    """Extract video frames at 1fps for timeline thumbnails.

    When regenerate is true, previously extracted frames are deleted first.
    Upload progress is relayed as throttled webhooks in the
    FRAME_EXTRACT_START..END band.
    """
    # Lazy import keeps media dependencies out of the broker setup path.
    from cpv3.modules.media.service import (
        delete_frames,
        extract_frames,
        read_frames_metadata,
    )

    job_uuid = uuid.UUID(job_id)
    # Skip entirely when the job was cancelled while still queued.
    try:
        _raise_if_job_cancelled(job_uuid)
    except JobCancelledError:
        logger.info("frame_extract_actor cancelled: %s", job_uuid)
        return
    _send_webhook_event(
        webhook_url,
        TaskWebhookEvent(
            status=JOB_STATUS_RUNNING,
            current_message=MESSAGE_STARTING,
            started_at=_utc_now(),
        ),
    )
    try:
        storage = _get_storage_service()
        # Delete old frames if regenerating
        if regenerate:
            _send_webhook_event(
                webhook_url,
                TaskWebhookEvent(
                    current_message=MESSAGE_DELETING_OLD_FRAMES,
                    progress_pct=PROGRESS_FRAME_EXTRACT_START,
                ),
            )
            old_meta = _run_async(read_frames_metadata(storage, frames_folder=frames_folder))
            if old_meta is not None:
                _run_async(
                    delete_frames(
                        storage,
                        frames_folder=frames_folder,
                        frame_count=old_meta.frame_count,
                    )
                )
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                current_message=MESSAGE_EXTRACTING_FRAMES,
                progress_pct=PROGRESS_FRAME_EXTRACT_START,
            ),
        )
        last_report_time = time.monotonic()

        def _on_progress(current: int, total: int) -> None:
            # Throttled relay mapping frame-upload completion (current/total)
            # into the FRAME_EXTRACT_START..END band.
            nonlocal last_report_time
            now = time.monotonic()
            if now - last_report_time < PROGRESS_THROTTLE_SECONDS:
                return
            last_report_time = now
            pct = current / total
            mapped = PROGRESS_FRAME_EXTRACT_START + pct * (
                PROGRESS_FRAME_EXTRACT_END - PROGRESS_FRAME_EXTRACT_START
            )
            _send_webhook_event(
                webhook_url,
                TaskWebhookEvent(
                    current_message=MESSAGE_UPLOADING_FRAMES,
                    progress_pct=round(mapped, 1),
                ),
            )

        metadata = _run_async(
            extract_frames(
                storage,
                file_key=file_key,
                frames_folder=frames_folder,
                on_progress=_on_progress,
            )
        )
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_DONE,
                current_message=MESSAGE_COMPLETED,
                progress_pct=PROGRESS_COMPLETE,
                output_data=metadata.model_dump(mode="json"),
                finished_at=_utc_now(),
            ),
        )
    except JobCancelledError:
        # Cancelled mid-run: exit quietly without a FAILED webhook.
        logger.info("frame_extract_actor cancelled: %s", job_uuid)
        return
    except Exception as exc:
        logger.exception("frame_extract_actor failed: %s", job_uuid)
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(
                status=JOB_STATUS_FAILED,
                error_message=str(exc),
                finished_at=_utc_now(),
            ),
        )
# ---------------------------------------------------------------------------
# Task Service
# ---------------------------------------------------------------------------
class TaskService:
"""Service for submitting background tasks and recording webhook updates."""
    def __init__(self, session: AsyncSession) -> None:
        """Bind the service to a DB session and its job/event/webhook repositories."""
        self._session = session
        self._job_repo = JobRepository(session)
        self._event_repo = JobEventRepository(session)
        self._webhook_repo = WebhookRepository(session)
    def _get_project_workspace_service(self) -> ProjectWorkspaceService:
        """Build a workspace service that shares this service's DB session."""
        return ProjectWorkspaceService(self._session)
    async def _update_job_broker_reference(self, job: Job, broker_reference: str) -> Job:
        """Persist the transport-specific broker reference after enqueueing.

        Commits immediately so cancellation can later locate the queued
        Dramatiq message, then returns the refreshed job row.
        """
        job.broker_id = broker_reference
        await self._session.commit()
        await self._session.refresh(job)
        return job
async def _find_duplicate_active_job(
self,
*,
requester: User,
job_type: JobTypeEnum,
project_id: uuid.UUID | None,
input_data: dict,
) -> Job | None:
"""Reuse an already running job for the same request payload."""
jobs = await self._job_repo.list_active_by_type(
requester=requester,
job_type=job_type,
project_id=project_id,
statuses=ACTIVE_JOB_STATUSES,
)
for job in jobs:
if job.input_data == input_data:
return job
return None
    async def _create_cancellation_notification(self, job: Job) -> None:
        """Emit a single cancellation notification to the current user.

        No-op for jobs that have no owning user.
        """
        if job.user_id is None:
            return
        notification_service = NotificationService(self._session)
        await notification_service.create_task_notification(
            user_id=job.user_id,
            job=job,
            event=TaskWebhookEvent(
                status=JOB_STATUS_CANCELLED,
                current_message=MESSAGE_CANCELLED,
                finished_at=job.finished_at,
            ),
        )
    def _cancel_dramatiq_message_sync(self, broker_id: str | None) -> None:
        """Remove a queued Dramatiq message from Redis when possible.

        Best-effort: silently does nothing when broker_id is absent or
        malformed. Removes the message id from the main queue list and
        message hash, the delayed queue (.DQ) and its hash, and any matching
        ack sets, in one pipeline.
        NOTE(review): key names mirror Dramatiq RedisBroker internals —
        re-verify on dramatiq upgrades.
        """
        broker_reference = _parse_broker_reference(broker_id)
        if broker_reference is None:
            return
        queue_name, redis_message_id = broker_reference
        namespace = _redis_broker.namespace
        queue_key = f"{namespace}:{queue_name}"
        queue_messages_key = f"{queue_key}.msgs"
        delayed_queue_key = f"{queue_key}.DQ"
        delayed_messages_key = f"{delayed_queue_key}.msgs"
        acks_pattern = f"{namespace}:__acks__.*.{queue_name}"
        pipeline = _redis_broker.client.pipeline()
        pipeline.lrem(queue_key, 1, redis_message_id)
        pipeline.hdel(queue_messages_key, redis_message_id)
        pipeline.lrem(delayed_queue_key, 1, redis_message_id)
        pipeline.hdel(delayed_messages_key, redis_message_id)
        # Ack sets are per-consumer; scan for every consumer of this queue.
        for key in _redis_broker.client.scan_iter(match=acks_pattern):
            pipeline.srem(key, redis_message_id)
        pipeline.execute()
    async def _cancel_dramatiq_message(self, broker_id: str | None) -> None:
        """Run Redis queue cleanup for a Dramatiq message off the event loop.

        Delegates to the sync implementation via asyncio.to_thread so the
        blocking Redis calls cannot stall the event loop.
        """
        await asyncio.to_thread(self._cancel_dramatiq_message_sync, broker_id)
    async def _cancel_caption_render(self, job: Job) -> None:
        """Cancel the downstream Remotion render when it has already been queued.

        No-op for non-caption jobs. A 404 from the render service means the
        render no longer exists and is treated as already cancelled; other
        HTTP errors are raised.
        """
        if job.job_type != JOB_TYPE_CAPTIONS_GENERATE:
            return
        output_data = job.output_data or {}
        # The job id doubles as the render id at submit time, so fall back to it.
        render_id = output_data.get("render_id") or str(job.id)
        settings = get_settings()
        async with httpx.AsyncClient(timeout=WEBHOOK_TIMEOUT_SECONDS) as client:
            response = await client.delete(
                f"{settings.remotion_service_url}/api/render/{render_id}"
            )
            if response.status_code == 404:
                return
            response.raise_for_status()
    async def _create_job_and_webhook(
        self,
        *,
        requester: User,
        job_type: JobTypeEnum,
        project_id: uuid.UUID | None,
        input_data: dict,
    ) -> tuple[Job, str]:
        """Create job and webhook, return job and webhook URL.

        The job is created with a random-hex placeholder broker_id; the real
        queue reference is written later by _update_job_broker_reference once
        the actor has been enqueued.
        """
        broker_id = uuid.uuid4().hex
        job = await self._job_repo.create(
            requester=requester,
            data=JobCreate(
                broker_id=broker_id,
                project_id=project_id,
                input_data=input_data,
                job_type=job_type,
            ),
        )
        webhook_url = _build_webhook_url(job.id)
        await self._webhook_repo.create(
            requester=requester,
            data=WebhookCreate(
                project_id=project_id,
                event=_build_webhook_event_name(job_type),
                url=webhook_url,
            ),
        )
        return job, webhook_url
    async def _submit_task(
        self,
        *,
        requester: User,
        job_type: JobTypeEnum,
        project_id: uuid.UUID | None,
        input_data: dict,
        actor: Any,
        actor_kwargs: dict[str, Any],
    ) -> TaskSubmitResponse:
        """Create the job + webhook, enqueue the actor, record the broker ref.

        Returns the submit response with the new job id and webhook URL; the
        job stays PENDING until the worker reports RUNNING via webhook.
        """
        job, webhook_url = await self._create_job_and_webhook(
            requester=requester,
            job_type=job_type,
            project_id=project_id,
            input_data=input_data,
        )
        message = actor.send(job_id=str(job.id), webhook_url=webhook_url, **actor_kwargs)
        # Store "queue:message_id" so a still-queued task can be cancelled later.
        redis_message_id = message.options.get("redis_message_id")
        if redis_message_id:
            broker_reference = _serialize_broker_reference(
                message.queue_name,
                str(redis_message_id),
            )
            await self._update_job_broker_reference(job, broker_reference)
        return TaskSubmitResponse(
            job_id=job.id,
            webhook_url=webhook_url,
            status=JOB_STATUS_PENDING,
        )
async def record_webhook_event(self, *, job_id: uuid.UUID, event: TaskWebhookEvent) -> Job:
"""Apply a webhook event to the job and store a job event record."""
job = await self._job_repo.get_by_id(job_id)
if job is None:
raise ValueError(f"Задача {job_id} не найдена")
if job.status in (JOB_STATUS_DONE, JOB_STATUS_FAILED, JOB_STATUS_CANCELLED):
logger.info("Ignoring webhook for terminal job %s (status=%s)", job_id, job.status)
return job
job_update = JobUpdate(
status=event.status,
project_pct=event.progress_pct,
current_message=event.current_message,
error_message=event.error_message,
output_data=event.output_data,
started_at=event.started_at,
finished_at=event.finished_at,
)
job = await self._job_repo.update(job, job_update)
event_type = _derive_event_type(event)
payload = event.model_dump(mode="json", exclude_none=True)
await self._event_repo.create(
JobEventCreate(job_id=job.id, event_type=event_type, payload=payload)
)
# Save artifacts BEFORE sending notifications so data exists when frontend refetches
if job.job_type == JOB_TYPE_TRANSCRIPTION_GENERATE and event.status == JOB_STATUS_DONE:
try:
job = await self._save_transcription_artifacts(job)
except Exception:
logger.exception("Failed to save transcription artifacts for job %s", job_id)
if job.job_type == JOB_TYPE_MEDIA_CONVERT and event.status == JOB_STATUS_DONE:
try:
job = await self._save_convert_artifacts(job)
except Exception:
logger.exception("Failed to save convert artifacts for job %s", job_id)
if job.job_type == JOB_TYPE_SILENCE_APPLY and event.status == JOB_STATUS_DONE:
try:
job = await self._save_silence_apply_artifacts(job)
except Exception:
logger.exception("Failed to save silence apply artifacts for job %s", job_id)
if job.job_type == JOB_TYPE_CAPTIONS_GENERATE and event.status == JOB_STATUS_DONE:
try:
job = await self._save_captions_artifacts(job)
except Exception:
logger.exception("Failed to save captions artifacts for job %s", job_id)
try:
await self._sync_project_workspace_after_webhook(job)
except Exception:
logger.exception("Failed to project workspace state for job %s", job_id)
# Push real-time notification via WebSocket (after artifacts are persisted)
if job.user_id is not None:
try:
notification_service = NotificationService(self._session)
await notification_service.create_task_notification(
user_id=job.user_id, job=job, event=event
)
except Exception:
logger.exception("Failed to create notification for job %s", job_id)
return job
async def _sync_project_workspace_after_webhook(self, job: Job) -> None:
if job.project_id is None:
return
await self._get_project_workspace_service().handle_job_update(job=job)
async def cancel_job(self, job: Job) -> Job:
"""Cancel a job, clean queued transport state and ignore late webhooks."""
if job.status in (JOB_STATUS_DONE, JOB_STATUS_FAILED, JOB_STATUS_CANCELLED):
return job
try:
await self._cancel_dramatiq_message(job.broker_id)
except Exception:
logger.exception("Failed to cancel Dramatiq message for job %s", job.id)
try:
await self._cancel_caption_render(job)
except Exception:
logger.exception("Failed to cancel caption render for job %s", job.id)
finished_at = _utc_now()
job = await self._job_repo.update(
job,
JobUpdate(
status=JOB_STATUS_CANCELLED,
current_message=MESSAGE_CANCELLED,
finished_at=finished_at,
),
)
await self._event_repo.create(
JobEventCreate(
job_id=job.id,
event_type=f"{EVENT_TYPE_STATUS_PREFIX}{JOB_STATUS_CANCELLED}",
payload={
"status": JOB_STATUS_CANCELLED,
"current_message": MESSAGE_CANCELLED,
"finished_at": finished_at.isoformat(),
},
)
)
try:
await self._create_cancellation_notification(job)
except Exception:
logger.exception("Failed to create cancellation notification for job %s", job.id)
try:
await self._sync_project_workspace_after_webhook(job)
except Exception:
logger.exception("Failed to project workspace state for cancelled job %s", job.id)
return job
async def _save_transcription_artifacts(self, job: Job) -> Job:
"""Create Transcription, ArtifactMediaFile and File records."""
input_data = job.input_data or {}
output_data = job.output_data or {}
file_key: str = input_data["file_key"]
project_id: uuid.UUID | None = (
uuid.UUID(input_data["project_id"]) if input_data.get("project_id") else None
)
engine_raw: str = input_data.get("engine", "whisper")
language: str | None = input_data.get("language")
document: dict = output_data["document"]
# Resolve user
user_repo = UserRepository(self._session)
user = await user_repo.get_by_id(job.user_id) # type: ignore[arg-type]
if user is None:
logger.warning("User %s not found, skipping artifact save", job.user_id)
return job
# Find or create source File record
file_repo = FileRepository(self._session)
source_file = await file_repo.get_by_path(file_key)
if source_file is None:
source_file = await file_repo.create(
requester=user,
data=FileCreate(
project_id=project_id,
original_filename=file_key.rsplit("/", 1)[-1],
path=file_key,
storage_backend="S3",
mime_type="application/octet-stream",
size_bytes=0,
is_uploaded=True,
),
)
# Upload document JSON to S3
storage = _get_storage_service()
user_folder = get_user_folder(user)
json_bytes = json.dumps(document, ensure_ascii=False).encode("utf-8")
# Build display name: "Транскрибация <video_name>.json"
video_stem = Path(source_file.original_filename).stem
transcription_filename = f"Транскрибация {video_stem}.json"
artifact_key = await storage.upload_fileobj(
fileobj=io.BytesIO(json_bytes),
file_name=transcription_filename,
folder=f"{user_folder}/artifacts",
gen_name=True,
content_type="application/json",
)
# Create File record for the JSON artifact (no project_id — only reachable via artifact)
json_file = await file_repo.create(
requester=user,
data=FileCreate(
project_id=None,
original_filename=transcription_filename,
path=artifact_key,
storage_backend="S3",
mime_type="application/json",
size_bytes=len(json_bytes),
file_format="json",
is_uploaded=True,
),
)
# Create ArtifactMediaFile (no media_file_id — transcription is not a media file)
artifact_repo = ArtifactRepository(self._session)
artifact = await artifact_repo.create(
data=ArtifactMediaFileCreate(
project_id=project_id,
file_id=json_file.id,
media_file_id=None,
artifact_type="TRANSCRIPTION_JSON",
),
)
# Create Transcription record
transcription_repo = TranscriptionRepository(self._session)
engine_db = ENGINE_MAP.get(engine_raw, "LOCAL_WHISPER")
transcription = await transcription_repo.create(
data=TranscriptionCreate(
project_id=project_id,
source_file_id=source_file.id,
artifact_id=artifact.id,
engine=engine_db, # type: ignore[arg-type]
language=language,
document=document,
),
)
updated_output = dict(output_data)
updated_output["artifact_id"] = str(artifact.id)
updated_output["transcription_id"] = str(transcription.id)
updated_output["source_file_id"] = str(source_file.id)
job = await self._job_repo.update(job, JobUpdate(output_data=updated_output))
logger.info("Saved transcription artifacts for job %s", job.id)
return job
async def _save_convert_artifacts(self, job: Job) -> Job:
"""Create File and ArtifactMediaFile records for converted MP4."""
input_data = job.input_data or {}
output_data = job.output_data or {}
file_key: str = input_data["file_key"]
project_id: uuid.UUID | None = (
uuid.UUID(input_data["project_id"]) if input_data.get("project_id") else None
)
file_path: str = output_data["file_path"]
file_size: int = output_data.get("file_size", 0)
# Resolve user
user_repo = UserRepository(self._session)
user = await user_repo.get_by_id(job.user_id) # type: ignore[arg-type]
if user is None:
logger.warning("User %s not found, skipping convert artifact save", job.user_id)
return job
# Derive output filename from source file
file_repo = FileRepository(self._session)
source_file = await file_repo.get_by_path(file_key)
if source_file is not None:
stem = Path(source_file.original_filename).stem
else:
stem = Path(file_key).stem
converted_filename = f"Конвертированое видео {stem}.mp4"
# Create File record for the converted MP4 (no project_id — only reachable via artifact)
converted_file = await file_repo.create(
requester=user,
data=FileCreate(
project_id=None,
original_filename=converted_filename,
path=file_path,
storage_backend="S3",
mime_type="video/mp4",
size_bytes=file_size,
file_format="mp4",
is_uploaded=True,
),
)
# Create ArtifactMediaFile record
artifact_repo = ArtifactRepository(self._session)
await artifact_repo.create(
data=ArtifactMediaFileCreate(
project_id=project_id,
file_id=converted_file.id,
media_file_id=None,
artifact_type="CONVERTED_VIDEO",
),
)
updated_output = dict(output_data)
updated_output["file_id"] = str(converted_file.id)
job = await self._job_repo.update(job, JobUpdate(output_data=updated_output))
logger.info("Saved convert artifacts for job %s", job.id)
return job
async def _save_silence_apply_artifacts(self, job: Job) -> Job:
"""Create File and ArtifactMediaFile records for silence-applied video."""
input_data = job.input_data or {}
output_data = job.output_data or {}
file_key: str = input_data["file_key"]
project_id: uuid.UUID | None = (
uuid.UUID(input_data["project_id"]) if input_data.get("project_id") else None
)
file_path: str = output_data["file_path"]
file_size: int = output_data.get("file_size", 0)
user_repo = UserRepository(self._session)
user = await user_repo.get_by_id(job.user_id) # type: ignore[arg-type]
if user is None:
logger.warning("User %s not found, skipping silence apply artifact save", job.user_id)
return job
file_repo = FileRepository(self._session)
source_file = await file_repo.get_by_path(file_key)
if source_file is not None:
stem = Path(source_file.original_filename).stem
else:
stem = Path(file_key).stem
processed_filename = f"Видео без тишины {stem}.mp4"
processed_file = await file_repo.create(
requester=user,
data=FileCreate(
project_id=project_id,
original_filename=processed_filename,
path=file_path,
storage_backend="S3",
mime_type="video/mp4",
size_bytes=file_size,
file_format="mp4",
is_uploaded=True,
),
)
artifact_repo = ArtifactRepository(self._session)
await artifact_repo.create(
data=ArtifactMediaFileCreate(
project_id=project_id,
file_id=processed_file.id,
media_file_id=None,
artifact_type="SILENCE_REMOVED_VIDEO",
),
)
updated_output = dict(output_data)
updated_output["file_id"] = str(processed_file.id)
job = await self._job_repo.update(job, JobUpdate(output_data=updated_output))
logger.info(
"Saved silence apply artifacts for job %s (file_id=%s)",
job.id,
processed_file.id,
)
return job
async def _save_captions_artifacts(self, job: Job) -> Job:
"""Create File and ArtifactMediaFile records for captioned video."""
input_data = job.input_data or {}
output_data = job.output_data or {}
video_s3_path: str = input_data["video_s3_path"]
project_id: uuid.UUID | None = (
uuid.UUID(input_data["project_id"]) if input_data.get("project_id") else None
)
output_path: str = output_data["output_path"]
# Resolve user
user_repo = UserRepository(self._session)
user = await user_repo.get_by_id(job.user_id) # type: ignore[arg-type]
if user is None:
logger.warning("User %s not found, skipping captions artifact save", job.user_id)
return job
# Get file size from S3
storage = _get_storage_service()
file_size = await storage.size(output_path)
# Derive output filename from source video
file_repo = FileRepository(self._session)
source_file = await file_repo.get_by_path(video_s3_path)
if source_file is not None:
stem = Path(source_file.original_filename).stem
else:
stem = Path(video_s3_path).stem
captioned_filename = f"Видео с субтитрами {stem}.mp4"
# Create File record
captioned_file = await file_repo.create(
requester=user,
data=FileCreate(
project_id=project_id,
original_filename=captioned_filename,
path=output_path,
storage_backend="S3",
mime_type="video/mp4",
size_bytes=file_size,
file_format="mp4",
is_uploaded=True,
),
)
# Create ArtifactMediaFile record
artifact_repo = ArtifactRepository(self._session)
await artifact_repo.create(
data=ArtifactMediaFileCreate(
project_id=project_id,
file_id=captioned_file.id,
media_file_id=None,
artifact_type="RENDERED_VIDEO",
),
)
# Update job output_data with file_id so frontend can reference it
updated_output = dict(output_data)
updated_output["file_id"] = str(captioned_file.id)
job = await self._job_repo.update(job, JobUpdate(output_data=updated_output))
logger.info("Saved captions artifacts for job %s (file_id=%s)", job.id, captioned_file.id)
return job
async def submit_media_probe(
self, *, requester: User, request: MediaProbeRequest
) -> TaskSubmitResponse:
"""Submit media probe task."""
return await self._submit_task(
requester=requester,
job_type=JOB_TYPE_MEDIA_PROBE,
project_id=request.project_id,
input_data=request.model_dump(mode="json"),
actor=media_probe_actor,
actor_kwargs={"file_key": request.file_key},
)
async def submit_silence_remove(
self, *, requester: User, request: SilenceRemoveRequest
) -> TaskSubmitResponse:
"""Submit silence removal task."""
user_folder = get_user_folder(requester)
resolved_folder = (
f"{user_folder}/{request.out_folder}"
if request.out_folder
else f"{user_folder}/output_files"
)
return await self._submit_task(
requester=requester,
job_type=JOB_TYPE_SILENCE_REMOVE,
project_id=request.project_id,
input_data=request.model_dump(mode="json"),
actor=silence_remove_actor,
actor_kwargs={
"file_key": request.file_key,
"out_folder": resolved_folder,
"min_silence_duration_ms": request.min_silence_duration_ms,
"silence_threshold_db": request.silence_threshold_db,
"padding_ms": request.padding_ms,
},
)
async def submit_silence_detect(
self, *, requester: User, request: SilenceDetectRequest
) -> TaskSubmitResponse:
"""Submit silence detection task."""
return await self._submit_task(
requester=requester,
job_type=JOB_TYPE_SILENCE_DETECT,
project_id=request.project_id,
input_data=request.model_dump(mode="json"),
actor=silence_detect_actor,
actor_kwargs={
"file_key": request.file_key,
"min_silence_duration_ms": request.min_silence_duration_ms,
"silence_threshold_db": request.silence_threshold_db,
"padding_ms": request.padding_ms,
},
)
async def submit_silence_apply(
self, *, requester: User, request: SilenceApplyRequest
) -> TaskSubmitResponse:
"""Submit silence apply task."""
user_folder = get_user_folder(requester)
resolved_folder = (
f"{user_folder}/{request.out_folder}"
if request.out_folder
else f"{user_folder}/output_files"
)
return await self._submit_task(
requester=requester,
job_type=JOB_TYPE_SILENCE_APPLY,
project_id=request.project_id,
input_data=request.model_dump(mode="json"),
actor=silence_apply_actor,
actor_kwargs={
"file_key": request.file_key,
"out_folder": resolved_folder,
"cuts": request.cuts,
"output_name": request.output_name,
},
)
async def submit_media_convert(
self, *, requester: User, request: MediaConvertRequest
) -> TaskSubmitResponse:
"""Submit media conversion task."""
user_folder = get_user_folder(requester)
resolved_folder = (
f"{user_folder}/{request.out_folder}"
if request.out_folder
else f"{user_folder}/output_files"
)
return await self._submit_task(
requester=requester,
job_type=JOB_TYPE_MEDIA_CONVERT,
project_id=request.project_id,
input_data=request.model_dump(mode="json"),
actor=media_convert_actor,
actor_kwargs={
"file_key": request.file_key,
"out_folder": resolved_folder,
"output_format": request.output_format,
},
)
async def submit_transcription_generate(
self, *, requester: User, request: TranscriptionGenerateRequest
) -> TaskSubmitResponse:
"""Submit transcription generation task."""
return await self._submit_task(
requester=requester,
job_type=JOB_TYPE_TRANSCRIPTION_GENERATE,
project_id=request.project_id,
input_data=request.model_dump(mode="json"),
actor=transcription_generate_actor,
actor_kwargs={
"file_key": request.file_key,
"engine": request.engine,
"language": request.language,
"model": request.model,
},
)
async def submit_frame_extract(
self, *, requester: User, request: FrameExtractRequest
) -> TaskSubmitResponse:
"""Submit frame extraction task."""
from cpv3.modules.media.service import get_frames_folder
user_folder = get_user_folder(requester)
frames_folder = get_frames_folder(user_folder, request.file_key)
return await self._submit_task(
requester=requester,
job_type=JOB_TYPE_FRAME_EXTRACT,
project_id=request.project_id,
input_data=request.model_dump(mode="json"),
actor=frame_extract_actor,
actor_kwargs={
"file_key": request.file_key,
"frames_folder": frames_folder,
"regenerate": request.regenerate,
},
)
async def submit_captions_generate(
self, *, requester: User, request: CaptionsGenerateRequest
) -> TaskSubmitResponse:
"""Submit captions generation task."""
from cpv3.modules.captions.service import CaptionPresetService
input_data = request.model_dump(mode="json")
existing_job = await self._find_duplicate_active_job(
requester=requester,
job_type=JOB_TYPE_CAPTIONS_GENERATE,
project_id=request.project_id,
input_data=input_data,
)
if existing_job is not None:
return TaskSubmitResponse(
job_id=existing_job.id,
webhook_url=_build_webhook_url(existing_job.id),
status=existing_job.status,
)
transcription_repo = TranscriptionRepository(self._session)
transcription = await transcription_repo.get_by_id(request.transcription_id)
if transcription is None:
raise ValueError(f"Транскрипция {request.transcription_id} не найдена")
user_folder = get_user_folder(requester)
resolved_folder = (
f"{user_folder}/{request.folder}" if request.folder else f"{user_folder}/output_files"
)
# Resolve style config from preset or inline override
preset_service = CaptionPresetService(self._session)
style_config = await preset_service.resolve_style_config(
preset_id=request.preset_id,
inline_config=request.style_config,
)
return await self._submit_task(
requester=requester,
job_type=JOB_TYPE_CAPTIONS_GENERATE,
project_id=request.project_id,
input_data=input_data,
actor=captions_generate_actor,
actor_kwargs={
"video_s3_path": request.video_s3_path,
"folder": resolved_folder,
"transcription_json": transcription.document,
"style_config": style_config,
},
)