"""
Task service for submitting and managing background tasks.
"""
from __future__ import annotations
import asyncio
import io
import json
import logging
import time
import uuid
from pathlib import Path
from datetime import datetime, timezone
from typing import Any
import dramatiq # type: ignore[import-untyped]
from dramatiq.brokers.redis import RedisBroker # type: ignore[import-untyped]
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from cpv3.infrastructure.deps import _get_storage_service
from cpv3.infrastructure.settings import get_settings
from cpv3.modules.files.repository import FileRepository
from cpv3.modules.files.schemas import FileCreate
from cpv3.modules.jobs.models import Job
from cpv3.modules.jobs.repository import JobEventRepository, JobRepository
from cpv3.modules.jobs.schemas import (
JobCreate,
JobEventCreate,
JobStatusEnum,
JobTypeEnum,
JobUpdate,
)
from cpv3.modules.media.repository import ArtifactRepository
from cpv3.modules.media.schemas import ArtifactMediaFileCreate
from cpv3.modules.tasks.schemas import (
CaptionsGenerateRequest,
FrameExtractRequest,
MediaConvertRequest,
MediaProbeRequest,
SilenceApplyRequest,
SilenceDetectRequest,
SilenceRemoveRequest,
TaskSubmitResponse,
TaskWebhookEvent,
TranscriptionGenerateRequest,
)
from cpv3.infrastructure.storage.utils import get_user_folder
from cpv3.modules.notifications.service import NotificationService
from cpv3.modules.transcription.repository import TranscriptionRepository
from cpv3.modules.transcription.schemas import TranscriptionCreate
from cpv3.modules.users.models import User
from cpv3.modules.users.repository import UserRepository
from cpv3.modules.webhooks.repository import WebhookRepository
from cpv3.modules.webhooks.schemas import WebhookCreate
logger = logging.getLogger(__name__)
JOB_STATUS_PENDING: JobStatusEnum = "PENDING"
JOB_STATUS_RUNNING: JobStatusEnum = "RUNNING"
JOB_STATUS_DONE: JobStatusEnum = "DONE"
JOB_STATUS_FAILED: JobStatusEnum = "FAILED"
JOB_TYPE_MEDIA_PROBE: JobTypeEnum = "MEDIA_PROBE"
JOB_TYPE_SILENCE_REMOVE: JobTypeEnum = "SILENCE_REMOVE"
JOB_TYPE_SILENCE_DETECT: JobTypeEnum = "SILENCE_DETECT"
JOB_TYPE_SILENCE_APPLY: JobTypeEnum = "SILENCE_APPLY"
JOB_TYPE_MEDIA_CONVERT: JobTypeEnum = "MEDIA_CONVERT"
JOB_TYPE_TRANSCRIPTION_GENERATE: JobTypeEnum = "TRANSCRIPTION_GENERATE"
JOB_TYPE_CAPTIONS_GENERATE: JobTypeEnum = "CAPTIONS_GENERATE"
JOB_TYPE_FRAME_EXTRACT: JobTypeEnum = "FRAME_EXTRACT"
EVENT_TYPE_STATUS_PREFIX = "status_"
EVENT_TYPE_PROGRESS = "progress"
EVENT_TYPE_LOG = "log"
EVENT_TYPE_OUTPUT = "output"
EVENT_TYPE_ERROR = "error"
TASK_WEBHOOK_PATH = "/api/tasks/webhook/{job_id}/"
WEBHOOK_TIMEOUT_SECONDS = 10
ERROR_NO_AUDIO_STREAM = "The file contains no audio track"
ERROR_UNKNOWN_ENGINE = "Unknown transcription engine: {engine}"
ENGINE_MAP: dict[str, str] = {
"whisper": "LOCAL_WHISPER",
"google": "GOOGLE_SPEECH_CLOUD",
}
MESSAGE_STARTING = "Starting"
MESSAGE_COMPLETED = "Completed"
MESSAGE_PROBING_MEDIA = "Probing media"
MESSAGE_PROCESSING = "Processing"
MESSAGE_CONVERTING = "Converting"
MESSAGE_RENDERING_CAPTIONS = "Rendering captions"
MESSAGE_EXTRACTING_FRAMES = "Extracting frames"
MESSAGE_UPLOADING_FRAMES = "Uploading frames"
MESSAGE_DELETING_OLD_FRAMES = "Deleting old frames"
PROGRESS_COMPLETE = 100.0
PROGRESS_MEDIA_PROBE = 50.0
PROGRESS_SILENCE_REMOVE = 30.0
PROGRESS_MEDIA_CONVERT = 30.0
PROGRESS_TRANSCRIPTION_START = 20.0
PROGRESS_TRANSCRIPTION_END = 95.0
PROGRESS_CAPTIONS = 30.0
PROGRESS_FRAME_EXTRACT_START = 10.0
PROGRESS_FRAME_EXTRACT_END = 95.0
PROGRESS_SILENCE_DETECT = 30.0
PROGRESS_SILENCE_APPLY = 30.0
MESSAGE_DETECTING_SILENCE = "Detecting silence"
MESSAGE_APPLYING_CUTS = "Applying cuts"
PROGRESS_THROTTLE_SECONDS = 3.0
# ---------------------------------------------------------------------------
# Dramatiq broker setup
# ---------------------------------------------------------------------------
_settings = get_settings()
_redis_broker = RedisBroker(url=_settings.redis_url)
dramatiq.set_broker(_redis_broker)
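# Workers consume these actors via the Dramatiq CLI pointed at this module,
# e.g. (process/thread counts illustrative, not project policy):
#
#   dramatiq cpv3.modules.tasks.service --processes 2 --threads 4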
# ---------------------------------------------------------------------------
# Webhook helpers for Dramatiq workers
# ---------------------------------------------------------------------------
def _utc_now() -> datetime:
"""Return current UTC time."""
return datetime.now(timezone.utc)
def _parse_frame_rate(rate_str: str) -> float | None:
"""Parse ffprobe frame rate string like '30/1' or '30000/1001'."""
try:
if "/" in rate_str:
num, den = rate_str.split("/")
den_val = int(den)
return round(int(num) / den_val, 3) if den_val else None
return float(rate_str)
except (ValueError, ZeroDivisionError):
return None
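# Illustrative values (not an exhaustive spec):
#   _parse_frame_rate("30/1")       -> 30.0
#   _parse_frame_rate("30000/1001") -> 29.97   (NTSC, rounded to 3 decimals)
#   _parse_frame_rate("0/0")        -> None    (zero denominator)
#   _parse_frame_rate("abc")        -> None    (unparsable)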
def _build_webhook_url(job_id: uuid.UUID) -> str:
"""Build the internal webhook URL for task updates."""
settings = get_settings()
base_url = settings.webhook_base_url.rstrip("/")
return f"{base_url}{TASK_WEBHOOK_PATH.format(job_id=job_id)}"
def _build_webhook_event_name(job_type: JobTypeEnum) -> str:
"""Build webhook event name for a job type."""
return f"task.{job_type.lower()}"
def _send_webhook_event(webhook_url: str, event: TaskWebhookEvent) -> None:
"""Send a task webhook event to the API."""
payload = event.model_dump(mode="json", exclude_none=True)
try:
response = httpx.post(
webhook_url, json=payload, timeout=WEBHOOK_TIMEOUT_SECONDS
)
response.raise_for_status()
except Exception:
logger.exception("Failed to send task webhook event to %s", webhook_url)
raise
def _derive_event_type(event: TaskWebhookEvent) -> str:
"""Derive a job event type from a webhook event payload."""
if event.status is not None:
return f"{EVENT_TYPE_STATUS_PREFIX}{event.status}"
if event.error_message is not None:
return EVENT_TYPE_ERROR
if event.progress_pct is not None:
return EVENT_TYPE_PROGRESS
if event.current_message is not None:
return EVENT_TYPE_LOG
if event.output_data is not None:
return EVENT_TYPE_OUTPUT
return EVENT_TYPE_LOG
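# Precedence: a status change wins over an error, which wins over a progress
# update, then a log message, then raw output; a payload with none of these
# fields is still recorded as a generic "log" event.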
def _run_async(coro: Any) -> Any:
"""Run async function in new event loop (for sync Dramatiq actors)."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(coro)
finally:
loop.close()
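# Dramatiq actors run synchronously in worker threads with no running event
# loop, so each call builds and tears down a private loop instead of sharing
# one across threads.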
# ---------------------------------------------------------------------------
# Dramatiq actors
# ---------------------------------------------------------------------------
@dramatiq.actor(max_retries=3, min_backoff=1000)
def media_probe_actor(job_id: str, webhook_url: str, file_key: str) -> None:
"""Probe media file to extract metadata."""
from cpv3.modules.media.service import probe_media
job_uuid = uuid.UUID(job_id)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_RUNNING,
current_message=MESSAGE_STARTING,
started_at=_utc_now(),
),
)
try:
storage = _get_storage_service()
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
current_message=MESSAGE_PROBING_MEDIA,
progress_pct=PROGRESS_MEDIA_PROBE,
),
)
result = _run_async(probe_media(storage, file_key=file_key))
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_DONE,
current_message=MESSAGE_COMPLETED,
progress_pct=PROGRESS_COMPLETE,
output_data=result.model_dump(mode="json"),
finished_at=_utc_now(),
),
)
except Exception as exc:
logger.exception("media_probe_actor failed: %s", job_uuid)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_FAILED,
error_message=str(exc),
finished_at=_utc_now(),
),
)
raise
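# A successful probe therefore posts three webhook payloads in order, roughly
# (field values illustrative; None fields are dropped by exclude_none):
#   {"status": "RUNNING", "current_message": "Starting", "started_at": "..."}
#   {"current_message": "Probing media", "progress_pct": 50.0}
#   {"status": "DONE", "progress_pct": 100.0, "output_data": {...}, "finished_at": "..."}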
@dramatiq.actor(max_retries=3, min_backoff=1000)
def silence_remove_actor(
job_id: str,
webhook_url: str,
file_key: str,
out_folder: str,
min_silence_duration_ms: int,
silence_threshold_db: int,
padding_ms: int,
) -> None:
"""Remove silence from media file."""
from cpv3.modules.media.service import remove_silence
job_uuid = uuid.UUID(job_id)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_RUNNING,
current_message=MESSAGE_STARTING,
started_at=_utc_now(),
),
)
try:
storage = _get_storage_service()
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
current_message=MESSAGE_PROCESSING,
progress_pct=PROGRESS_SILENCE_REMOVE,
),
)
result = _run_async(
remove_silence(
storage,
file_key=file_key,
out_folder=out_folder,
min_silence_duration_ms=min_silence_duration_ms,
silence_threshold_db=silence_threshold_db,
padding_ms=padding_ms,
)
)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_DONE,
current_message=MESSAGE_COMPLETED,
progress_pct=PROGRESS_COMPLETE,
output_data={
"file_path": result.file_path,
"file_url": result.file_url,
"file_size": result.file_size,
},
finished_at=_utc_now(),
),
)
except Exception as exc:
logger.exception("silence_remove_actor failed: %s", job_uuid)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_FAILED,
error_message=str(exc),
finished_at=_utc_now(),
),
)
raise
@dramatiq.actor(max_retries=3, min_backoff=1000)
def silence_detect_actor(
job_id: str,
webhook_url: str,
file_key: str,
min_silence_duration_ms: int,
silence_threshold_db: int,
padding_ms: int,
) -> None:
"""Detect silent segments in media file."""
from cpv3.modules.media.service import detect_silence
job_uuid = uuid.UUID(job_id)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_RUNNING,
current_message=MESSAGE_STARTING,
started_at=_utc_now(),
),
)
try:
storage = _get_storage_service()
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
current_message=MESSAGE_DETECTING_SILENCE,
progress_pct=PROGRESS_SILENCE_DETECT,
),
)
result = _run_async(
detect_silence(
storage,
file_key=file_key,
min_silence_duration_ms=min_silence_duration_ms,
silence_threshold_db=silence_threshold_db,
padding_ms=padding_ms,
)
)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_DONE,
current_message=MESSAGE_COMPLETED,
progress_pct=PROGRESS_COMPLETE,
output_data=result,
finished_at=_utc_now(),
),
)
except Exception as exc:
logger.exception("silence_detect_actor failed: %s", job_uuid)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_FAILED,
error_message=str(exc),
finished_at=_utc_now(),
),
)
raise
@dramatiq.actor(max_retries=3, min_backoff=1000)
def silence_apply_actor(
job_id: str,
webhook_url: str,
file_key: str,
out_folder: str,
cuts: list[dict],
output_name: str | None,
) -> None:
"""Apply silence cuts to media file."""
from cpv3.modules.media.service import apply_silence_cuts
job_uuid = uuid.UUID(job_id)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_RUNNING,
current_message=MESSAGE_STARTING,
started_at=_utc_now(),
),
)
try:
storage = _get_storage_service()
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
current_message=MESSAGE_APPLYING_CUTS,
progress_pct=PROGRESS_SILENCE_APPLY,
),
)
result = _run_async(
apply_silence_cuts(
storage,
file_key=file_key,
out_folder=out_folder,
cuts=cuts,
output_name=output_name,
)
)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_DONE,
current_message=MESSAGE_COMPLETED,
progress_pct=PROGRESS_COMPLETE,
output_data={
"file_path": result.file_path,
"file_url": result.file_url,
"file_size": result.file_size,
},
finished_at=_utc_now(),
),
)
except Exception as exc:
logger.exception("silence_apply_actor failed: %s", job_uuid)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_FAILED,
error_message=str(exc),
finished_at=_utc_now(),
),
)
raise
@dramatiq.actor(max_retries=3, min_backoff=1000)
def media_convert_actor(
job_id: str,
webhook_url: str,
file_key: str,
out_folder: str,
output_format: str,
) -> None:
"""Convert media file to specified format."""
from cpv3.modules.media.service import convert_to_mp4
job_uuid = uuid.UUID(job_id)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_RUNNING,
current_message=MESSAGE_STARTING,
started_at=_utc_now(),
),
)
try:
if output_format.lower() != "mp4":
raise ValueError(f"Unsupported format: {output_format}")
storage = _get_storage_service()
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
current_message=MESSAGE_CONVERTING,
progress_pct=PROGRESS_MEDIA_CONVERT,
),
)
result = _run_async(
convert_to_mp4(storage, file_key=file_key, out_folder=out_folder)
)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_DONE,
current_message=MESSAGE_COMPLETED,
progress_pct=PROGRESS_COMPLETE,
output_data={
"file_path": result.file_path,
"file_url": result.file_url,
"file_size": result.file_size,
},
finished_at=_utc_now(),
),
)
except Exception as exc:
logger.exception("media_convert_actor failed: %s", job_uuid)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_FAILED,
error_message=str(exc),
finished_at=_utc_now(),
),
)
raise
@dramatiq.actor(max_retries=2, min_backoff=2000)
def transcription_generate_actor(
job_id: str,
webhook_url: str,
file_key: str,
engine: str,
language: str | None,
model: str,
) -> None:
"""Generate transcription from audio/video file."""
from cpv3.modules.transcription.service import (
transcribe_with_google_speech,
transcribe_with_whisper,
)
job_uuid = uuid.UUID(job_id)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_RUNNING,
current_message=MESSAGE_STARTING,
started_at=_utc_now(),
),
)
try:
from cpv3.modules.media.service import probe_media
storage = _get_storage_service()
probe = _run_async(probe_media(storage, file_key=file_key))
has_audio = any(s.codec_type == "audio" for s in probe.streams)
if not has_audio:
raise ValueError(ERROR_NO_AUDIO_STREAM)
# Extract probe metadata for artifact creation
duration_seconds = float(probe.format.duration) if probe.format and probe.format.duration else 0.0
video_stream = next((s for s in probe.streams if s.codec_type == "video"), None)
probe_meta = {
"duration_seconds": duration_seconds,
"frame_rate": _parse_frame_rate(video_stream.r_frame_rate) if video_stream and video_stream.r_frame_rate else None,
"width": video_stream.width if video_stream else None,
"height": video_stream.height if video_stream else None,
}
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
current_message=f"Транскрибирование ({engine})",
progress_pct=PROGRESS_TRANSCRIPTION_START,
),
)
last_report_time = time.monotonic()
def _on_whisper_progress(pct: float) -> None:
nonlocal last_report_time
now = time.monotonic()
if now - last_report_time < PROGRESS_THROTTLE_SECONDS:
return
last_report_time = now
mapped = PROGRESS_TRANSCRIPTION_START + (
pct / 100.0
) * (PROGRESS_TRANSCRIPTION_END - PROGRESS_TRANSCRIPTION_START)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
current_message=f"Транскрибирование ({engine})",
progress_pct=round(mapped, 1),
),
)
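        # The remap above is linear from whisper's 0-100% into the
        # [PROGRESS_TRANSCRIPTION_START, PROGRESS_TRANSCRIPTION_END] band,
        # e.g. pct=50 -> 20 + 0.5 * 75 = 57.5, leaving headroom for completion.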
if engine == "whisper":
document = _run_async(
transcribe_with_whisper(
storage,
file_key=file_key,
model_name=model,
language=language,
on_progress=_on_whisper_progress,
)
)
elif engine == "google":
language_codes = [language] if language else None
document = _run_async(
transcribe_with_google_speech(
storage, file_key=file_key, language_codes=language_codes
)
)
else:
raise ValueError(ERROR_UNKNOWN_ENGINE.format(engine=engine))
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_DONE,
current_message=MESSAGE_COMPLETED,
progress_pct=PROGRESS_COMPLETE,
output_data={
"document": document.model_dump(mode="json"),
"probe": probe_meta,
},
finished_at=_utc_now(),
),
)
except (ValueError, RuntimeError) as exc:
logger.exception(
"transcription_generate_actor failed (non-transient): %s", job_uuid
)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_FAILED,
error_message=str(exc),
finished_at=_utc_now(),
),
)
except Exception as exc:
logger.exception("transcription_generate_actor failed: %s", job_uuid)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_FAILED,
error_message=str(exc),
finished_at=_utc_now(),
),
)
raise
@dramatiq.actor(max_retries=2, min_backoff=2000)
def captions_generate_actor(
job_id: str,
webhook_url: str,
video_s3_path: str,
folder: str,
transcription_json: dict,
) -> None:
"""Generate captions on video."""
from cpv3.modules.captions.service import generate_captions
from cpv3.modules.transcription.schemas import Document
job_uuid = uuid.UUID(job_id)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_RUNNING,
current_message=MESSAGE_STARTING,
started_at=_utc_now(),
),
)
try:
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
current_message=MESSAGE_RENDERING_CAPTIONS,
progress_pct=PROGRESS_CAPTIONS,
),
)
document = Document.model_validate(transcription_json)
output_path = _run_async(
generate_captions(
video_s3_path=video_s3_path, folder=folder, transcription=document
)
)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_DONE,
current_message=MESSAGE_COMPLETED,
progress_pct=PROGRESS_COMPLETE,
output_data={"output_path": output_path},
finished_at=_utc_now(),
),
)
except Exception as exc:
logger.exception("captions_generate_actor failed: %s", job_uuid)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_FAILED,
error_message=str(exc),
finished_at=_utc_now(),
),
)
raise
@dramatiq.actor(max_retries=2, min_backoff=2000)
def frame_extract_actor(
job_id: str,
webhook_url: str,
file_key: str,
frames_folder: str,
regenerate: bool,
) -> None:
"""Extract video frames at 1fps for timeline thumbnails."""
from cpv3.modules.media.service import (
delete_frames,
extract_frames,
read_frames_metadata,
)
job_uuid = uuid.UUID(job_id)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_RUNNING,
current_message=MESSAGE_STARTING,
started_at=_utc_now(),
),
)
try:
storage = _get_storage_service()
# Delete old frames if regenerating
if regenerate:
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
current_message=MESSAGE_DELETING_OLD_FRAMES,
progress_pct=PROGRESS_FRAME_EXTRACT_START,
),
)
old_meta = _run_async(
read_frames_metadata(storage, frames_folder=frames_folder)
)
if old_meta is not None:
_run_async(
delete_frames(
storage,
frames_folder=frames_folder,
frame_count=old_meta.frame_count,
)
)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
current_message=MESSAGE_EXTRACTING_FRAMES,
progress_pct=PROGRESS_FRAME_EXTRACT_START,
),
)
last_report_time = time.monotonic()
def _on_progress(current: int, total: int) -> None:
nonlocal last_report_time
now = time.monotonic()
if now - last_report_time < PROGRESS_THROTTLE_SECONDS:
return
last_report_time = now
            if total <= 0:
                return
            pct = current / total
mapped = PROGRESS_FRAME_EXTRACT_START + pct * (
PROGRESS_FRAME_EXTRACT_END - PROGRESS_FRAME_EXTRACT_START
)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
current_message=MESSAGE_UPLOADING_FRAMES,
progress_pct=round(mapped, 1),
),
)
metadata = _run_async(
extract_frames(
storage,
file_key=file_key,
frames_folder=frames_folder,
on_progress=_on_progress,
)
)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_DONE,
current_message=MESSAGE_COMPLETED,
progress_pct=PROGRESS_COMPLETE,
output_data=metadata.model_dump(mode="json"),
finished_at=_utc_now(),
),
)
except Exception as exc:
logger.exception("frame_extract_actor failed: %s", job_uuid)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_FAILED,
error_message=str(exc),
finished_at=_utc_now(),
),
)
raise
# ---------------------------------------------------------------------------
# Task Service
# ---------------------------------------------------------------------------
class TaskService:
"""Service for submitting background tasks and recording webhook updates."""
def __init__(self, session: AsyncSession) -> None:
self._session = session
self._job_repo = JobRepository(session)
self._event_repo = JobEventRepository(session)
self._webhook_repo = WebhookRepository(session)
async def _create_job_and_webhook(
self,
*,
requester: User,
job_type: JobTypeEnum,
project_id: uuid.UUID | None,
input_data: dict,
) -> tuple[Job, str]:
"""Create job and webhook, return job and webhook URL."""
broker_id = uuid.uuid4().hex
job = await self._job_repo.create(
requester=requester,
data=JobCreate(
broker_id=broker_id,
project_id=project_id,
input_data=input_data,
job_type=job_type,
),
)
webhook_url = _build_webhook_url(job.id)
await self._webhook_repo.create(
requester=requester,
data=WebhookCreate(
project_id=project_id,
event=_build_webhook_event_name(job_type),
url=webhook_url,
),
)
return job, webhook_url
async def _submit_task(
self,
*,
requester: User,
job_type: JobTypeEnum,
project_id: uuid.UUID | None,
input_data: dict,
actor: Any,
actor_kwargs: dict[str, Any],
) -> TaskSubmitResponse:
job, webhook_url = await self._create_job_and_webhook(
requester=requester,
job_type=job_type,
project_id=project_id,
input_data=input_data,
)
actor.send(job_id=str(job.id), webhook_url=webhook_url, **actor_kwargs)
return TaskSubmitResponse(
job_id=job.id,
webhook_url=webhook_url,
status=JOB_STATUS_PENDING,
)
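    # Note: actor.send() only enqueues the message on the Redis broker; the
    # PENDING status reflects that no worker has picked the job up yet.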
async def record_webhook_event(
self, *, job_id: uuid.UUID, event: TaskWebhookEvent
) -> Job:
"""Apply a webhook event to the job and store a job event record."""
job = await self._job_repo.get_by_id(job_id)
if job is None:
raise ValueError(f"Job {job_id} not found")
job_update = JobUpdate(
status=event.status,
            progress_pct=event.progress_pct,
current_message=event.current_message,
error_message=event.error_message,
output_data=event.output_data,
started_at=event.started_at,
finished_at=event.finished_at,
)
job = await self._job_repo.update(job, job_update)
event_type = _derive_event_type(event)
payload = event.model_dump(mode="json", exclude_none=True)
await self._event_repo.create(
JobEventCreate(job_id=job.id, event_type=event_type, payload=payload)
)
# Save artifacts BEFORE sending notifications so data exists when frontend refetches
if (
job.job_type == JOB_TYPE_TRANSCRIPTION_GENERATE
and event.status == JOB_STATUS_DONE
):
try:
await self._save_transcription_artifacts(job)
except Exception:
logger.exception(
"Failed to save transcription artifacts for job %s", job_id
)
if job.job_type == JOB_TYPE_MEDIA_CONVERT and event.status == JOB_STATUS_DONE:
try:
await self._save_convert_artifacts(job)
except Exception:
logger.exception(
"Failed to save convert artifacts for job %s", job_id
)
# Push real-time notification via WebSocket (after artifacts are persisted)
if job.user_id is not None:
try:
notification_service = NotificationService(self._session)
await notification_service.create_task_notification(
user_id=job.user_id, job=job, event=event
)
except Exception:
logger.exception("Failed to create notification for job %s", job_id)
return job
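    # Example event bodies this method may receive from a worker (illustrative):
    #   progress: {"progress_pct": 42.5, "current_message": "Processing"}
    #   terminal: {"status": "DONE", "progress_pct": 100.0,
    #              "output_data": {...}, "finished_at": "2026-02-27T20:33:56Z"}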
async def _save_transcription_artifacts(self, job: Job) -> None:
"""Create Transcription, ArtifactMediaFile and File records."""
input_data = job.input_data or {}
output_data = job.output_data or {}
file_key: str = input_data["file_key"]
project_id: uuid.UUID | None = (
uuid.UUID(input_data["project_id"]) if input_data.get("project_id") else None
)
engine_raw: str = input_data.get("engine", "whisper")
language: str | None = input_data.get("language")
document: dict = output_data["document"]
# Resolve user
user_repo = UserRepository(self._session)
user = await user_repo.get_by_id(job.user_id) # type: ignore[arg-type]
if user is None:
logger.warning("User %s not found, skipping artifact save", job.user_id)
return
# Find or create source File record
file_repo = FileRepository(self._session)
source_file = await file_repo.get_by_path(file_key)
if source_file is None:
source_file = await file_repo.create(
requester=user,
data=FileCreate(
project_id=project_id,
original_filename=file_key.rsplit("/", 1)[-1],
path=file_key,
storage_backend="S3",
mime_type="application/octet-stream",
size_bytes=0,
is_uploaded=True,
),
)
# Upload document JSON to S3
storage = _get_storage_service()
user_folder = get_user_folder(user)
json_bytes = json.dumps(document, ensure_ascii=False).encode("utf-8")
        # Build display name: "Transcription <video_name>.json"
        video_stem = Path(source_file.original_filename).stem
        transcription_filename = f"Transcription {video_stem}.json"
artifact_key = await storage.upload_fileobj(
fileobj=io.BytesIO(json_bytes),
file_name=transcription_filename,
folder=f"{user_folder}/artifacts",
gen_name=True,
content_type="application/json",
)
# Create File record for the JSON artifact (no project_id — only reachable via artifact)
json_file = await file_repo.create(
requester=user,
data=FileCreate(
project_id=None,
original_filename=transcription_filename,
path=artifact_key,
storage_backend="S3",
mime_type="application/json",
size_bytes=len(json_bytes),
file_format="json",
is_uploaded=True,
),
)
# Create ArtifactMediaFile (no media_file_id — transcription is not a media file)
artifact_repo = ArtifactRepository(self._session)
artifact = await artifact_repo.create(
data=ArtifactMediaFileCreate(
project_id=project_id,
file_id=json_file.id,
media_file_id=None,
artifact_type="TRANSCRIPTION_JSON",
),
)
# Create Transcription record
transcription_repo = TranscriptionRepository(self._session)
engine_db = ENGINE_MAP.get(engine_raw, "LOCAL_WHISPER")
await transcription_repo.create(
data=TranscriptionCreate(
project_id=project_id,
source_file_id=source_file.id,
artifact_id=artifact.id,
engine=engine_db, # type: ignore[arg-type]
language=language,
document=document,
),
)
logger.info("Saved transcription artifacts for job %s", job.id)
async def _save_convert_artifacts(self, job: Job) -> None:
"""Create File and ArtifactMediaFile records for converted MP4."""
input_data = job.input_data or {}
output_data = job.output_data or {}
file_key: str = input_data["file_key"]
project_id: uuid.UUID | None = (
uuid.UUID(input_data["project_id"]) if input_data.get("project_id") else None
)
file_path: str = output_data["file_path"]
file_size: int = output_data.get("file_size", 0)
# Resolve user
user_repo = UserRepository(self._session)
user = await user_repo.get_by_id(job.user_id) # type: ignore[arg-type]
if user is None:
logger.warning("User %s not found, skipping convert artifact save", job.user_id)
return
# Derive output filename from source file
file_repo = FileRepository(self._session)
source_file = await file_repo.get_by_path(file_key)
if source_file is not None:
stem = Path(source_file.original_filename).stem
else:
stem = Path(file_key).stem
converted_filename = f"{stem}.mp4"
# Create File record for the converted MP4 (no project_id — only reachable via artifact)
converted_file = await file_repo.create(
requester=user,
data=FileCreate(
project_id=None,
original_filename=converted_filename,
path=file_path,
storage_backend="S3",
mime_type="video/mp4",
size_bytes=file_size,
file_format="mp4",
is_uploaded=True,
),
)
# Create ArtifactMediaFile record
artifact_repo = ArtifactRepository(self._session)
await artifact_repo.create(
data=ArtifactMediaFileCreate(
project_id=project_id,
file_id=converted_file.id,
media_file_id=None,
artifact_type="CONVERTED_VIDEO",
),
)
logger.info("Saved convert artifacts for job %s", job.id)
async def submit_media_probe(
self, *, requester: User, request: MediaProbeRequest
) -> TaskSubmitResponse:
"""Submit media probe task."""
return await self._submit_task(
requester=requester,
job_type=JOB_TYPE_MEDIA_PROBE,
project_id=request.project_id,
input_data=request.model_dump(mode="json"),
actor=media_probe_actor,
actor_kwargs={"file_key": request.file_key},
)
async def submit_silence_remove(
self, *, requester: User, request: SilenceRemoveRequest
) -> TaskSubmitResponse:
"""Submit silence removal task."""
user_folder = get_user_folder(requester)
resolved_folder = (
f"{user_folder}/{request.out_folder}"
if request.out_folder
else f"{user_folder}/output_files"
)
return await self._submit_task(
requester=requester,
job_type=JOB_TYPE_SILENCE_REMOVE,
project_id=request.project_id,
input_data=request.model_dump(mode="json"),
actor=silence_remove_actor,
actor_kwargs={
"file_key": request.file_key,
"out_folder": resolved_folder,
"min_silence_duration_ms": request.min_silence_duration_ms,
"silence_threshold_db": request.silence_threshold_db,
"padding_ms": request.padding_ms,
},
)
async def submit_silence_detect(
self, *, requester: User, request: SilenceDetectRequest
) -> TaskSubmitResponse:
"""Submit silence detection task."""
return await self._submit_task(
requester=requester,
job_type=JOB_TYPE_SILENCE_DETECT,
project_id=request.project_id,
input_data=request.model_dump(mode="json"),
actor=silence_detect_actor,
actor_kwargs={
"file_key": request.file_key,
"min_silence_duration_ms": request.min_silence_duration_ms,
"silence_threshold_db": request.silence_threshold_db,
"padding_ms": request.padding_ms,
},
)
async def submit_silence_apply(
self, *, requester: User, request: SilenceApplyRequest
) -> TaskSubmitResponse:
"""Submit silence apply task."""
user_folder = get_user_folder(requester)
resolved_folder = (
f"{user_folder}/{request.out_folder}"
if request.out_folder
else f"{user_folder}/output_files"
)
return await self._submit_task(
requester=requester,
job_type=JOB_TYPE_SILENCE_APPLY,
project_id=request.project_id,
input_data=request.model_dump(mode="json"),
actor=silence_apply_actor,
actor_kwargs={
"file_key": request.file_key,
"out_folder": resolved_folder,
"cuts": request.cuts,
"output_name": request.output_name,
},
)
async def submit_media_convert(
self, *, requester: User, request: MediaConvertRequest
) -> TaskSubmitResponse:
"""Submit media conversion task."""
user_folder = get_user_folder(requester)
resolved_folder = (
f"{user_folder}/{request.out_folder}"
if request.out_folder
else f"{user_folder}/output_files"
)
return await self._submit_task(
requester=requester,
job_type=JOB_TYPE_MEDIA_CONVERT,
project_id=request.project_id,
input_data=request.model_dump(mode="json"),
actor=media_convert_actor,
actor_kwargs={
"file_key": request.file_key,
"out_folder": resolved_folder,
"output_format": request.output_format,
},
)
async def submit_transcription_generate(
self, *, requester: User, request: TranscriptionGenerateRequest
) -> TaskSubmitResponse:
"""Submit transcription generation task."""
return await self._submit_task(
requester=requester,
job_type=JOB_TYPE_TRANSCRIPTION_GENERATE,
project_id=request.project_id,
input_data=request.model_dump(mode="json"),
actor=transcription_generate_actor,
actor_kwargs={
"file_key": request.file_key,
"engine": request.engine,
"language": request.language,
"model": request.model,
},
)
async def submit_frame_extract(
self, *, requester: User, request: FrameExtractRequest
) -> TaskSubmitResponse:
"""Submit frame extraction task."""
from cpv3.modules.media.service import get_frames_folder
user_folder = get_user_folder(requester)
frames_folder = get_frames_folder(user_folder, request.file_key)
return await self._submit_task(
requester=requester,
job_type=JOB_TYPE_FRAME_EXTRACT,
project_id=request.project_id,
input_data=request.model_dump(mode="json"),
actor=frame_extract_actor,
actor_kwargs={
"file_key": request.file_key,
"frames_folder": frames_folder,
"regenerate": request.regenerate,
},
)
async def submit_captions_generate(
self, *, requester: User, request: CaptionsGenerateRequest
) -> TaskSubmitResponse:
"""Submit captions generation task."""
transcription_repo = TranscriptionRepository(self._session)
transcription = await transcription_repo.get_by_id(request.transcription_id)
if transcription is None:
raise ValueError(f"Transcription {request.transcription_id} not found")
user_folder = get_user_folder(requester)
resolved_folder = (
f"{user_folder}/{request.folder}"
if request.folder
else f"{user_folder}/output_files"
)
return await self._submit_task(
requester=requester,
job_type=JOB_TYPE_CAPTIONS_GENERATE,
project_id=request.project_id,
input_data=request.model_dump(mode="json"),
actor=captions_generate_actor,
actor_kwargs={
"video_s3_path": request.video_s3_path,
"folder": resolved_folder,
"transcription_json": transcription.document,
},
)
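    # Illustrative submission flow from an async request handler (the session
    # and current-user dependencies are assumptions of this sketch, not part of
    # the module):
    #
    #   service = TaskService(session)
    #   response = await service.submit_media_probe(
    #       requester=current_user,
    #       request=MediaProbeRequest(file_key="users/42/input.mp4", project_id=None),
    #   )
    #   # Progress then arrives at response.webhook_url and is persisted by
    #   # record_webhook_event().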