1900 lines
65 KiB
Python
1900 lines
65 KiB
Python
"""
|
||
Task service for submitting and managing background tasks.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import io
|
||
import json
|
||
import logging
|
||
import time
|
||
import uuid
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
import dramatiq # type: ignore[import-untyped]
|
||
import httpx
|
||
from dramatiq.brokers.redis import RedisBroker # type: ignore[import-untyped]
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
|
||
from cpv3.infrastructure.deps import _get_storage_service
|
||
from cpv3.infrastructure.settings import get_settings
|
||
from cpv3.infrastructure.storage.utils import get_user_folder
|
||
from cpv3.modules.files.repository import FileRepository
|
||
from cpv3.modules.files.schemas import FileCreate
|
||
from cpv3.modules.jobs.models import Job
|
||
from cpv3.modules.jobs.repository import JobEventRepository, JobRepository
|
||
from cpv3.modules.jobs.schemas import (
|
||
JobCreate,
|
||
JobEventCreate,
|
||
JobStatusEnum,
|
||
JobTypeEnum,
|
||
JobUpdate,
|
||
)
|
||
from cpv3.modules.media.repository import ArtifactRepository
|
||
from cpv3.modules.media.schemas import ArtifactMediaFileCreate
|
||
from cpv3.modules.project_workspaces.service import ProjectWorkspaceService
|
||
from cpv3.modules.tasks.schemas import (
|
||
CaptionsGenerateRequest,
|
||
FrameExtractRequest,
|
||
MediaConvertRequest,
|
||
MediaProbeRequest,
|
||
SilenceApplyRequest,
|
||
SilenceDetectRequest,
|
||
SilenceRemoveRequest,
|
||
TaskSubmitResponse,
|
||
TaskWebhookEvent,
|
||
TranscriptionGenerateRequest,
|
||
)
|
||
from cpv3.modules.notifications.service import NotificationService
|
||
from cpv3.modules.transcription.repository import TranscriptionRepository
|
||
from cpv3.modules.transcription.schemas import TranscriptionCreate
|
||
from cpv3.modules.users.models import User
|
||
from cpv3.modules.users.repository import UserRepository
|
||
from cpv3.modules.webhooks.repository import WebhookRepository
|
||
from cpv3.modules.webhooks.schemas import WebhookCreate
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
JOB_STATUS_PENDING: JobStatusEnum = "PENDING"
|
||
JOB_STATUS_RUNNING: JobStatusEnum = "RUNNING"
|
||
JOB_STATUS_DONE: JobStatusEnum = "DONE"
|
||
JOB_STATUS_FAILED: JobStatusEnum = "FAILED"
|
||
JOB_STATUS_CANCELLED: JobStatusEnum = "CANCELLED"
|
||
|
||
JOB_TYPE_MEDIA_PROBE: JobTypeEnum = "MEDIA_PROBE"
|
||
JOB_TYPE_SILENCE_REMOVE: JobTypeEnum = "SILENCE_REMOVE"
|
||
JOB_TYPE_SILENCE_DETECT: JobTypeEnum = "SILENCE_DETECT"
|
||
JOB_TYPE_SILENCE_APPLY: JobTypeEnum = "SILENCE_APPLY"
|
||
JOB_TYPE_MEDIA_CONVERT: JobTypeEnum = "MEDIA_CONVERT"
|
||
JOB_TYPE_TRANSCRIPTION_GENERATE: JobTypeEnum = "TRANSCRIPTION_GENERATE"
|
||
JOB_TYPE_CAPTIONS_GENERATE: JobTypeEnum = "CAPTIONS_GENERATE"
|
||
JOB_TYPE_FRAME_EXTRACT: JobTypeEnum = "FRAME_EXTRACT"
|
||
|
||
EVENT_TYPE_STATUS_PREFIX = "status_"
|
||
EVENT_TYPE_PROGRESS = "progress"
|
||
EVENT_TYPE_LOG = "log"
|
||
EVENT_TYPE_OUTPUT = "output"
|
||
EVENT_TYPE_ERROR = "error"
|
||
|
||
TASK_WEBHOOK_PATH = "/api/tasks/webhook/{job_id}/"
|
||
WEBHOOK_TIMEOUT_SECONDS = 10
|
||
|
||
ERROR_NO_AUDIO_STREAM = "Файл не содержит аудиодорожки"
|
||
ERROR_UNKNOWN_ENGINE = "Неизвестный движок транскрипции: {engine}"
|
||
|
||
ENGINE_MAP: dict[str, str] = {
|
||
"whisper": "LOCAL_WHISPER",
|
||
"google": "GOOGLE_SPEECH_CLOUD",
|
||
"salutespeech": "SALUTE_SPEECH",
|
||
}
|
||
|
||
MESSAGE_STARTING = "Запуск"
|
||
MESSAGE_COMPLETED = "Завершено"
|
||
MESSAGE_PROBING_MEDIA = "Анализ медиафайла"
|
||
MESSAGE_PROCESSING = "Обработка"
|
||
MESSAGE_CONVERTING = "Конвертация"
|
||
MESSAGE_PREPARING_FILE = "Подготовка файла"
|
||
MESSAGE_CONVERTING_VIDEO = "Конвертация видео"
|
||
MESSAGE_UPLOADING_RESULT = "Загрузка результата"
|
||
MESSAGE_SAVING_RESULT = "Сохранение результата"
|
||
MESSAGE_RENDERING_CAPTIONS = "Рендеринг субтитров"
|
||
MESSAGE_CANCELLED = "Отменено пользователем"
|
||
MESSAGE_EXTRACTING_FRAMES = "Извлечение кадров"
|
||
MESSAGE_UPLOADING_FRAMES = "Загрузка кадров"
|
||
MESSAGE_DELETING_OLD_FRAMES = "Удаление старых кадров"
|
||
|
||
PROGRESS_COMPLETE = 100.0
|
||
PROGRESS_MEDIA_PROBE = 50.0
|
||
PROGRESS_SILENCE_REMOVE = 30.0
|
||
PROGRESS_MEDIA_CONVERT = 30.0
|
||
PROGRESS_MEDIA_CONVERT_PREPARING = 5.0
|
||
PROGRESS_MEDIA_CONVERT_START = 10.0
|
||
PROGRESS_MEDIA_CONVERT_END = 95.0
|
||
PROGRESS_MEDIA_CONVERT_SAVING = 99.0
|
||
PROGRESS_SILENCE_APPLY_PREPARING = 5.0
|
||
PROGRESS_SILENCE_APPLY_START = 10.0
|
||
PROGRESS_SILENCE_APPLY_END = 95.0
|
||
PROGRESS_SILENCE_APPLY_SAVING = 99.0
|
||
PROGRESS_TRANSCRIPTION_START = 20.0
|
||
PROGRESS_TRANSCRIPTION_END = 95.0
|
||
PROGRESS_CAPTIONS = 30.0
|
||
PROGRESS_FRAME_EXTRACT_START = 10.0
|
||
PROGRESS_FRAME_EXTRACT_END = 95.0
|
||
|
||
PROGRESS_SILENCE_DETECT = 30.0
|
||
PROGRESS_SILENCE_APPLY = 30.0
|
||
|
||
MESSAGE_DETECTING_SILENCE = "Обнаружение тишины"
|
||
MESSAGE_APPLYING_CUTS = "Применение вырезок"
|
||
|
||
PROGRESS_THROTTLE_SECONDS = 3.0
|
||
PROGRESS_CONVERT_THROTTLE_SECONDS = 1.0
|
||
|
||
ACTIVE_JOB_STATUSES = (JOB_STATUS_PENDING, JOB_STATUS_RUNNING)
|
||
DRAMATIQ_BROKER_REF_SEPARATOR = ":"
|
||
|
||
|
||
class JobCancelledError(RuntimeError):
|
||
"""Raised when a job was cancelled before completion."""
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Dramatiq broker setup
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_settings = get_settings()
|
||
_redis_broker = RedisBroker(url=_settings.redis_url)
|
||
dramatiq.set_broker(_redis_broker)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Webhook helpers for Dramatiq workers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _utc_now() -> datetime:
|
||
"""Return current UTC time."""
|
||
return datetime.now(timezone.utc)
|
||
|
||
|
||
def _parse_frame_rate(rate_str: str) -> float | None:
|
||
"""Parse ffprobe frame rate string like '30/1' or '30000/1001'."""
|
||
try:
|
||
if "/" in rate_str:
|
||
num, den = rate_str.split("/")
|
||
den_val = int(den)
|
||
return round(int(num) / den_val, 3) if den_val else None
|
||
return float(rate_str)
|
||
except (ValueError, ZeroDivisionError):
|
||
return None
|
||
|
||
|
||
def _build_webhook_url(job_id: uuid.UUID) -> str:
|
||
"""Build the internal webhook URL for task updates."""
|
||
settings = get_settings()
|
||
base_url = settings.webhook_base_url.rstrip("/")
|
||
return f"{base_url}{TASK_WEBHOOK_PATH.format(job_id=job_id)}"
|
||
|
||
|
||
def _build_webhook_event_name(job_type: JobTypeEnum) -> str:
|
||
"""Build webhook event name for a job type."""
|
||
return f"task.{job_type.lower()}"
|
||
|
||
|
||
def _send_webhook_event(webhook_url: str, event: TaskWebhookEvent) -> None:
|
||
"""Send a task webhook event to the API."""
|
||
payload = event.model_dump(mode="json", exclude_none=True)
|
||
try:
|
||
response = httpx.post(webhook_url, json=payload, timeout=WEBHOOK_TIMEOUT_SECONDS)
|
||
response.raise_for_status()
|
||
except Exception:
|
||
logger.exception("Failed to send task webhook event to %s", webhook_url)
|
||
raise
|
||
|
||
|
||
def _derive_event_type(event: TaskWebhookEvent) -> str:
|
||
"""Derive a job event type from a webhook event payload."""
|
||
if event.status is not None:
|
||
return f"{EVENT_TYPE_STATUS_PREFIX}{event.status}"
|
||
if event.error_message is not None:
|
||
return EVENT_TYPE_ERROR
|
||
if event.progress_pct is not None:
|
||
return EVENT_TYPE_PROGRESS
|
||
if event.current_message is not None:
|
||
return EVENT_TYPE_LOG
|
||
if event.output_data is not None:
|
||
return EVENT_TYPE_OUTPUT
|
||
return EVENT_TYPE_LOG
|
||
|
||
|
||
def _run_async(coro: Any) -> Any:
|
||
"""Run async function in new event loop (for sync Dramatiq actors)."""
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
try:
|
||
return loop.run_until_complete(coro)
|
||
finally:
|
||
loop.close()
|
||
|
||
|
||
def _serialize_broker_reference(queue_name: str, redis_message_id: str) -> str:
|
||
"""Serialize queue name and Dramatiq redis message id into one field."""
|
||
return f"{queue_name}{DRAMATIQ_BROKER_REF_SEPARATOR}{redis_message_id}"
|
||
|
||
|
||
def _parse_broker_reference(broker_id: str | None) -> tuple[str, str] | None:
|
||
"""Parse queue name and Dramatiq redis message id from stored broker_id."""
|
||
if not broker_id or DRAMATIQ_BROKER_REF_SEPARATOR not in broker_id:
|
||
return None
|
||
|
||
queue_name, redis_message_id = broker_id.split(DRAMATIQ_BROKER_REF_SEPARATOR, 1)
|
||
if not queue_name or not redis_message_id:
|
||
return None
|
||
return queue_name, redis_message_id
|
||
|
||
|
||
def _get_job_status_sync(job_id: uuid.UUID) -> JobStatusEnum | None:
|
||
"""Read current job status using a sync connection (safe for Dramatiq workers)."""
|
||
import psycopg2
|
||
|
||
settings = get_settings()
|
||
dsn = (
|
||
f"host={settings.postgres_host} port={settings.postgres_port} "
|
||
f"dbname={settings.postgres_database} "
|
||
f"user={settings.postgres_user} password={settings.postgres_password}"
|
||
)
|
||
try:
|
||
conn = psycopg2.connect(dsn)
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute("SELECT status FROM jobs WHERE id = %s", (str(job_id),))
|
||
row = cur.fetchone()
|
||
return row[0] if row else None
|
||
finally:
|
||
conn.close()
|
||
except Exception:
|
||
logger.warning("Failed to check job status for %s", job_id, exc_info=True)
|
||
return None
|
||
|
||
|
||
def _raise_if_job_cancelled(job_id: uuid.UUID) -> None:
|
||
"""Stop worker execution when the job is already cancelled in the database."""
|
||
status = _get_job_status_sync(job_id)
|
||
if status == JOB_STATUS_CANCELLED:
|
||
raise JobCancelledError(MESSAGE_CANCELLED)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Dramatiq actors
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@dramatiq.actor(max_retries=0)
|
||
def media_probe_actor(job_id: str, webhook_url: str, file_key: str) -> None:
|
||
"""Probe media file to extract metadata."""
|
||
from cpv3.modules.media.service import probe_media
|
||
|
||
job_uuid = uuid.UUID(job_id)
|
||
try:
|
||
_raise_if_job_cancelled(job_uuid)
|
||
except JobCancelledError:
|
||
logger.info("media_probe_actor cancelled: %s", job_uuid)
|
||
return
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_RUNNING,
|
||
current_message=MESSAGE_STARTING,
|
||
started_at=_utc_now(),
|
||
),
|
||
)
|
||
|
||
try:
|
||
storage = _get_storage_service()
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
current_message=MESSAGE_PROBING_MEDIA,
|
||
progress_pct=PROGRESS_MEDIA_PROBE,
|
||
),
|
||
)
|
||
result = _run_async(probe_media(storage, file_key=file_key))
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_DONE,
|
||
current_message=MESSAGE_COMPLETED,
|
||
progress_pct=PROGRESS_COMPLETE,
|
||
output_data=result.model_dump(mode="json"),
|
||
finished_at=_utc_now(),
|
||
),
|
||
)
|
||
except JobCancelledError:
|
||
logger.info("media_probe_actor cancelled: %s", job_uuid)
|
||
return
|
||
except Exception as exc:
|
||
logger.exception("media_probe_actor failed: %s", job_uuid)
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_FAILED,
|
||
error_message=str(exc),
|
||
finished_at=_utc_now(),
|
||
),
|
||
)
|
||
|
||
|
||
@dramatiq.actor(max_retries=0)
|
||
def silence_remove_actor(
|
||
job_id: str,
|
||
webhook_url: str,
|
||
file_key: str,
|
||
out_folder: str,
|
||
min_silence_duration_ms: int,
|
||
silence_threshold_db: int,
|
||
padding_ms: int,
|
||
) -> None:
|
||
"""Remove silence from media file."""
|
||
from cpv3.modules.media.service import remove_silence
|
||
|
||
job_uuid = uuid.UUID(job_id)
|
||
try:
|
||
_raise_if_job_cancelled(job_uuid)
|
||
except JobCancelledError:
|
||
logger.info("silence_remove_actor cancelled: %s", job_uuid)
|
||
return
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_RUNNING,
|
||
current_message=MESSAGE_STARTING,
|
||
started_at=_utc_now(),
|
||
),
|
||
)
|
||
|
||
try:
|
||
storage = _get_storage_service()
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
current_message=MESSAGE_PROCESSING,
|
||
progress_pct=PROGRESS_SILENCE_REMOVE,
|
||
),
|
||
)
|
||
result = _run_async(
|
||
remove_silence(
|
||
storage,
|
||
file_key=file_key,
|
||
out_folder=out_folder,
|
||
min_silence_duration_ms=min_silence_duration_ms,
|
||
silence_threshold_db=silence_threshold_db,
|
||
padding_ms=padding_ms,
|
||
)
|
||
)
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_DONE,
|
||
current_message=MESSAGE_COMPLETED,
|
||
progress_pct=PROGRESS_COMPLETE,
|
||
output_data={
|
||
"file_path": result.file_path,
|
||
"file_url": result.file_url,
|
||
"file_size": result.file_size,
|
||
},
|
||
finished_at=_utc_now(),
|
||
),
|
||
)
|
||
except JobCancelledError:
|
||
logger.info("silence_remove_actor cancelled: %s", job_uuid)
|
||
return
|
||
except Exception as exc:
|
||
logger.exception("silence_remove_actor failed: %s", job_uuid)
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_FAILED,
|
||
error_message=str(exc),
|
||
finished_at=_utc_now(),
|
||
),
|
||
)
|
||
|
||
|
||
@dramatiq.actor(max_retries=0)
|
||
def silence_detect_actor(
|
||
job_id: str,
|
||
webhook_url: str,
|
||
file_key: str,
|
||
min_silence_duration_ms: int,
|
||
silence_threshold_db: int,
|
||
padding_ms: int,
|
||
) -> None:
|
||
"""Detect silent segments in media file."""
|
||
from cpv3.modules.media.service import detect_silence
|
||
|
||
job_uuid = uuid.UUID(job_id)
|
||
try:
|
||
_raise_if_job_cancelled(job_uuid)
|
||
except JobCancelledError:
|
||
logger.info("silence_detect_actor cancelled: %s", job_uuid)
|
||
return
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_RUNNING,
|
||
current_message=MESSAGE_STARTING,
|
||
started_at=_utc_now(),
|
||
),
|
||
)
|
||
|
||
try:
|
||
storage = _get_storage_service()
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
current_message=MESSAGE_DETECTING_SILENCE,
|
||
progress_pct=PROGRESS_SILENCE_DETECT,
|
||
),
|
||
)
|
||
result = _run_async(
|
||
detect_silence(
|
||
storage,
|
||
file_key=file_key,
|
||
min_silence_duration_ms=min_silence_duration_ms,
|
||
silence_threshold_db=silence_threshold_db,
|
||
padding_ms=padding_ms,
|
||
)
|
||
)
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_DONE,
|
||
current_message=MESSAGE_COMPLETED,
|
||
progress_pct=PROGRESS_COMPLETE,
|
||
output_data=result,
|
||
finished_at=_utc_now(),
|
||
),
|
||
)
|
||
except JobCancelledError:
|
||
logger.info("silence_detect_actor cancelled: %s", job_uuid)
|
||
return
|
||
except Exception as exc:
|
||
logger.exception("silence_detect_actor failed: %s", job_uuid)
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_FAILED,
|
||
error_message=str(exc),
|
||
finished_at=_utc_now(),
|
||
),
|
||
)
|
||
|
||
|
||
@dramatiq.actor(max_retries=0)
|
||
def silence_apply_actor(
|
||
job_id: str,
|
||
webhook_url: str,
|
||
file_key: str,
|
||
out_folder: str,
|
||
cuts: list[dict],
|
||
output_name: str | None,
|
||
) -> None:
|
||
"""Apply silence cuts to media file."""
|
||
from cpv3.modules.media.service import apply_silence_cuts
|
||
|
||
job_uuid = uuid.UUID(job_id)
|
||
try:
|
||
_raise_if_job_cancelled(job_uuid)
|
||
except JobCancelledError:
|
||
logger.info("silence_apply_actor cancelled: %s", job_uuid)
|
||
return
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_RUNNING,
|
||
current_message=MESSAGE_PREPARING_FILE,
|
||
progress_pct=PROGRESS_SILENCE_APPLY_PREPARING,
|
||
started_at=_utc_now(),
|
||
),
|
||
)
|
||
|
||
try:
|
||
storage = _get_storage_service()
|
||
last_report_time = 0.0
|
||
last_progress = PROGRESS_SILENCE_APPLY_PREPARING
|
||
|
||
def _emit_silence_apply_progress(stage: str, pct: float | None) -> None:
|
||
nonlocal last_report_time, last_progress
|
||
|
||
if stage == "applying_cuts":
|
||
raw_pct = min(max(pct or 0.0, 0.0), 100.0)
|
||
if raw_pct >= 100.0:
|
||
return
|
||
mapped = PROGRESS_SILENCE_APPLY_START + (raw_pct / 100.0) * (
|
||
PROGRESS_SILENCE_APPLY_END - PROGRESS_SILENCE_APPLY_START
|
||
)
|
||
message = MESSAGE_APPLYING_CUTS
|
||
force = raw_pct == 0.0
|
||
elif stage == "uploading":
|
||
mapped = PROGRESS_SILENCE_APPLY_END
|
||
message = MESSAGE_UPLOADING_RESULT
|
||
force = True
|
||
else:
|
||
return
|
||
|
||
mapped = round(mapped, 1)
|
||
now = time.monotonic()
|
||
if not force:
|
||
if mapped <= last_progress:
|
||
return
|
||
if mapped - last_progress < 1.0:
|
||
return
|
||
if now - last_report_time < PROGRESS_CONVERT_THROTTLE_SECONDS:
|
||
return
|
||
|
||
last_report_time = now
|
||
last_progress = max(last_progress, mapped)
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
current_message=message,
|
||
progress_pct=last_progress,
|
||
),
|
||
)
|
||
|
||
result = _run_async(
|
||
apply_silence_cuts(
|
||
storage,
|
||
file_key=file_key,
|
||
out_folder=out_folder,
|
||
cuts=cuts,
|
||
output_name=output_name,
|
||
on_progress=_emit_silence_apply_progress,
|
||
)
|
||
)
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
current_message=MESSAGE_SAVING_RESULT,
|
||
progress_pct=PROGRESS_SILENCE_APPLY_SAVING,
|
||
),
|
||
)
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_DONE,
|
||
current_message=MESSAGE_COMPLETED,
|
||
progress_pct=PROGRESS_COMPLETE,
|
||
output_data={
|
||
"file_path": result.file_path,
|
||
"file_url": result.file_url,
|
||
"file_size": result.file_size,
|
||
},
|
||
finished_at=_utc_now(),
|
||
),
|
||
)
|
||
except JobCancelledError:
|
||
logger.info("silence_apply_actor cancelled: %s", job_uuid)
|
||
return
|
||
except Exception as exc:
|
||
logger.exception("silence_apply_actor failed: %s", job_uuid)
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_FAILED,
|
||
error_message=str(exc),
|
||
finished_at=_utc_now(),
|
||
),
|
||
)
|
||
|
||
|
||
@dramatiq.actor(max_retries=0)
|
||
def media_convert_actor(
|
||
job_id: str,
|
||
webhook_url: str,
|
||
file_key: str,
|
||
out_folder: str,
|
||
output_format: str,
|
||
) -> None:
|
||
"""Convert media file to specified format."""
|
||
from cpv3.modules.media.service import convert_to_mp4
|
||
|
||
job_uuid = uuid.UUID(job_id)
|
||
try:
|
||
_raise_if_job_cancelled(job_uuid)
|
||
except JobCancelledError:
|
||
logger.info("media_convert_actor cancelled: %s", job_uuid)
|
||
return
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_RUNNING,
|
||
current_message=MESSAGE_PREPARING_FILE,
|
||
progress_pct=PROGRESS_MEDIA_CONVERT_PREPARING,
|
||
started_at=_utc_now(),
|
||
),
|
||
)
|
||
|
||
try:
|
||
if output_format.lower() != "mp4":
|
||
raise ValueError(f"Неподдерживаемый формат: {output_format}")
|
||
|
||
storage = _get_storage_service()
|
||
last_report_time = 0.0
|
||
last_progress = PROGRESS_MEDIA_CONVERT_PREPARING
|
||
|
||
def _emit_convert_progress(stage: str, pct: float | None) -> None:
|
||
nonlocal last_report_time, last_progress
|
||
|
||
if stage == "converting":
|
||
raw_pct = min(max(pct or 0.0, 0.0), 100.0)
|
||
if raw_pct >= 100.0:
|
||
return
|
||
mapped = PROGRESS_MEDIA_CONVERT_START + (raw_pct / 100.0) * (
|
||
PROGRESS_MEDIA_CONVERT_END - PROGRESS_MEDIA_CONVERT_START
|
||
)
|
||
message = MESSAGE_CONVERTING_VIDEO
|
||
force = raw_pct == 0.0
|
||
elif stage == "uploading":
|
||
mapped = PROGRESS_MEDIA_CONVERT_END
|
||
message = MESSAGE_UPLOADING_RESULT
|
||
force = True
|
||
else:
|
||
return
|
||
|
||
mapped = round(mapped, 1)
|
||
now = time.monotonic()
|
||
if not force:
|
||
if mapped <= last_progress:
|
||
return
|
||
if mapped - last_progress < 1.0:
|
||
return
|
||
if now - last_report_time < PROGRESS_CONVERT_THROTTLE_SECONDS:
|
||
return
|
||
|
||
last_report_time = now
|
||
last_progress = max(last_progress, mapped)
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
current_message=message,
|
||
progress_pct=last_progress,
|
||
),
|
||
)
|
||
|
||
result = _run_async(
|
||
convert_to_mp4(
|
||
storage,
|
||
file_key=file_key,
|
||
out_folder=out_folder,
|
||
on_progress=_emit_convert_progress,
|
||
)
|
||
)
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
current_message=MESSAGE_SAVING_RESULT,
|
||
progress_pct=PROGRESS_MEDIA_CONVERT_SAVING,
|
||
),
|
||
)
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_DONE,
|
||
current_message=MESSAGE_COMPLETED,
|
||
progress_pct=PROGRESS_COMPLETE,
|
||
output_data={
|
||
"file_path": result.file_path,
|
||
"file_url": result.file_url,
|
||
"file_size": result.file_size,
|
||
},
|
||
finished_at=_utc_now(),
|
||
),
|
||
)
|
||
except JobCancelledError:
|
||
logger.info("media_convert_actor cancelled: %s", job_uuid)
|
||
return
|
||
except Exception as exc:
|
||
logger.exception("media_convert_actor failed: %s", job_uuid)
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_FAILED,
|
||
error_message=str(exc),
|
||
finished_at=_utc_now(),
|
||
),
|
||
)
|
||
|
||
|
||
@dramatiq.actor(max_retries=0)
|
||
def transcription_generate_actor(
|
||
job_id: str,
|
||
webhook_url: str,
|
||
file_key: str,
|
||
engine: str,
|
||
language: str | None,
|
||
model: str,
|
||
) -> None:
|
||
"""Generate transcription from audio/video file."""
|
||
from cpv3.modules.transcription.service import (
|
||
transcribe_with_google_speech,
|
||
transcribe_with_salute_speech,
|
||
transcribe_with_whisper,
|
||
)
|
||
|
||
job_uuid = uuid.UUID(job_id)
|
||
try:
|
||
_raise_if_job_cancelled(job_uuid)
|
||
except JobCancelledError:
|
||
logger.info("transcription_generate_actor cancelled: %s", job_uuid)
|
||
return
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_RUNNING,
|
||
current_message=MESSAGE_STARTING,
|
||
started_at=_utc_now(),
|
||
),
|
||
)
|
||
|
||
try:
|
||
from cpv3.modules.media.service import probe_media
|
||
|
||
storage = _get_storage_service()
|
||
|
||
probe = _run_async(probe_media(storage, file_key=file_key))
|
||
has_audio = any(s.codec_type == "audio" for s in probe.streams)
|
||
if not has_audio:
|
||
raise ValueError(ERROR_NO_AUDIO_STREAM)
|
||
|
||
# Extract probe metadata for artifact creation
|
||
duration_seconds = (
|
||
float(probe.format.duration) if probe.format and probe.format.duration else 0.0
|
||
)
|
||
video_stream = next((s for s in probe.streams if s.codec_type == "video"), None)
|
||
probe_meta = {
|
||
"duration_seconds": duration_seconds,
|
||
"frame_rate": _parse_frame_rate(video_stream.r_frame_rate)
|
||
if video_stream and video_stream.r_frame_rate
|
||
else None,
|
||
"width": video_stream.width if video_stream else None,
|
||
"height": video_stream.height if video_stream else None,
|
||
}
|
||
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
current_message=f"Транскрибирование ({engine})",
|
||
progress_pct=PROGRESS_TRANSCRIPTION_START,
|
||
),
|
||
)
|
||
|
||
last_report_time = time.monotonic()
|
||
|
||
def _on_whisper_progress(pct: float) -> None:
|
||
nonlocal last_report_time
|
||
now = time.monotonic()
|
||
if now - last_report_time < PROGRESS_THROTTLE_SECONDS:
|
||
return
|
||
last_report_time = now
|
||
mapped = PROGRESS_TRANSCRIPTION_START + (pct / 100.0) * (
|
||
PROGRESS_TRANSCRIPTION_END - PROGRESS_TRANSCRIPTION_START
|
||
)
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
current_message=f"Транскрибирование ({engine})",
|
||
progress_pct=round(mapped, 1),
|
||
),
|
||
)
|
||
|
||
if engine == "whisper":
|
||
document = _run_async(
|
||
transcribe_with_whisper(
|
||
storage,
|
||
file_key=file_key,
|
||
model_name=model,
|
||
language=language,
|
||
on_progress=_on_whisper_progress,
|
||
)
|
||
)
|
||
elif engine == "google":
|
||
language_codes = [language] if language else None
|
||
document = _run_async(
|
||
transcribe_with_google_speech(
|
||
storage, file_key=file_key, language_codes=language_codes
|
||
)
|
||
)
|
||
elif engine == "salutespeech":
|
||
audio_stream = next((s for s in probe.streams if s.codec_type == "audio"), None)
|
||
sr = (
|
||
int(audio_stream.sample_rate)
|
||
if audio_stream and audio_stream.sample_rate
|
||
else 16000
|
||
)
|
||
document = _run_async(
|
||
transcribe_with_salute_speech(
|
||
storage,
|
||
file_key=file_key,
|
||
language=language,
|
||
model=model,
|
||
sample_rate=sr,
|
||
job_id=job_uuid,
|
||
on_progress=_on_whisper_progress,
|
||
)
|
||
)
|
||
else:
|
||
raise ValueError(ERROR_UNKNOWN_ENGINE.format(engine=engine))
|
||
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_DONE,
|
||
current_message=MESSAGE_COMPLETED,
|
||
progress_pct=PROGRESS_COMPLETE,
|
||
output_data={
|
||
"document": document.model_dump(mode="json"),
|
||
"probe": probe_meta,
|
||
},
|
||
finished_at=_utc_now(),
|
||
),
|
||
)
|
||
except JobCancelledError:
|
||
logger.info("transcription_generate_actor cancelled: %s", job_uuid)
|
||
return
|
||
except Exception as exc:
|
||
logger.exception("transcription_generate_actor failed: %s", job_uuid)
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_FAILED,
|
||
error_message=str(exc),
|
||
finished_at=_utc_now(),
|
||
),
|
||
)
|
||
|
||
|
||
RENDER_POLL_INTERVAL_SECONDS = 5
|
||
RENDER_POLL_TIMEOUT_SECONDS = 600
|
||
|
||
|
||
@dramatiq.actor(max_retries=0)
|
||
def captions_generate_actor(
|
||
job_id: str,
|
||
webhook_url: str,
|
||
video_s3_path: str,
|
||
folder: str,
|
||
transcription_json: dict,
|
||
style_config: dict | None = None,
|
||
) -> None:
|
||
"""Generate captions on video (async via Remotion + BullMQ)."""
|
||
from cpv3.modules.captions.service import generate_captions
|
||
from cpv3.modules.transcription.schemas import Document
|
||
|
||
job_uuid = uuid.UUID(job_id)
|
||
try:
|
||
_raise_if_job_cancelled(job_uuid)
|
||
except JobCancelledError:
|
||
logger.info("captions_generate_actor cancelled: %s", job_uuid)
|
||
return
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_RUNNING,
|
||
current_message=MESSAGE_STARTING,
|
||
started_at=_utc_now(),
|
||
),
|
||
)
|
||
|
||
try:
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
current_message=MESSAGE_RENDERING_CAPTIONS,
|
||
progress_pct=PROGRESS_CAPTIONS,
|
||
),
|
||
)
|
||
document = Document.model_validate(transcription_json)
|
||
|
||
# Call Remotion with callback_url so it sends progress webhooks directly
|
||
render_id = _run_async(
|
||
generate_captions(
|
||
video_s3_path=video_s3_path,
|
||
folder=folder,
|
||
transcription=document,
|
||
style_config=style_config,
|
||
callback_url=webhook_url,
|
||
render_id=job_id,
|
||
)
|
||
)
|
||
_raise_if_job_cancelled(job_uuid)
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
current_message=MESSAGE_RENDERING_CAPTIONS,
|
||
output_data={"render_id": render_id},
|
||
),
|
||
)
|
||
|
||
# Polling fallback: wait for Remotion to finish rendering
|
||
# Primary progress is delivered via Remotion → webhook directly
|
||
settings = get_settings()
|
||
elapsed = 0.0
|
||
last_polled_status: str | None = None
|
||
while elapsed < RENDER_POLL_TIMEOUT_SECONDS:
|
||
_raise_if_job_cancelled(job_uuid)
|
||
time.sleep(RENDER_POLL_INTERVAL_SECONDS)
|
||
elapsed += RENDER_POLL_INTERVAL_SECONDS
|
||
|
||
try:
|
||
resp = httpx.get(
|
||
f"{settings.remotion_service_url}/api/render/{render_id}",
|
||
timeout=10,
|
||
)
|
||
resp.raise_for_status()
|
||
data = resp.json()
|
||
except Exception:
|
||
logger.warning("Render poll failed for %s, retrying...", render_id)
|
||
continue
|
||
|
||
status = data.get("status")
|
||
if status != last_polled_status:
|
||
logger.info(
|
||
"Remotion render %s status=%s progress=%s callback_delivered=%s",
|
||
render_id,
|
||
status,
|
||
data.get("progress_pct"),
|
||
data.get("callback_delivered"),
|
||
)
|
||
last_polled_status = status
|
||
|
||
if status == "done":
|
||
if data.get("callback_delivered") is True:
|
||
# Remotion already sent DONE webhook — exit cleanly
|
||
return
|
||
|
||
output_path = data.get("output_path")
|
||
if not output_path:
|
||
raise RuntimeError(
|
||
"Remotion render finished without output_path in polling response"
|
||
)
|
||
|
||
logger.warning(
|
||
"Remotion render %s finished without confirmed DONE webhook, sending fallback completion",
|
||
render_id,
|
||
)
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_DONE,
|
||
progress_pct=PROGRESS_COMPLETE,
|
||
current_message="Готово",
|
||
output_data={"output_path": str(output_path)},
|
||
finished_at=_utc_now(),
|
||
),
|
||
)
|
||
return
|
||
if status == "failed":
|
||
error = data.get("error", "Render failed")
|
||
raise RuntimeError(f"Remotion render failed: {error}")
|
||
|
||
raise TimeoutError(
|
||
f"Render {render_id} did not complete within {RENDER_POLL_TIMEOUT_SECONDS}s"
|
||
)
|
||
|
||
except JobCancelledError:
|
||
logger.info("captions_generate_actor cancelled: %s", job_uuid)
|
||
return
|
||
except Exception as exc:
|
||
logger.exception("captions_generate_actor failed: %s", job_uuid)
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_FAILED,
|
||
error_message=str(exc),
|
||
finished_at=_utc_now(),
|
||
),
|
||
)
|
||
|
||
|
||
@dramatiq.actor(max_retries=0)
|
||
def frame_extract_actor(
|
||
job_id: str,
|
||
webhook_url: str,
|
||
file_key: str,
|
||
frames_folder: str,
|
||
regenerate: bool,
|
||
) -> None:
|
||
"""Extract video frames at 1fps for timeline thumbnails."""
|
||
from cpv3.modules.media.service import (
|
||
delete_frames,
|
||
extract_frames,
|
||
read_frames_metadata,
|
||
)
|
||
|
||
job_uuid = uuid.UUID(job_id)
|
||
try:
|
||
_raise_if_job_cancelled(job_uuid)
|
||
except JobCancelledError:
|
||
logger.info("frame_extract_actor cancelled: %s", job_uuid)
|
||
return
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_RUNNING,
|
||
current_message=MESSAGE_STARTING,
|
||
started_at=_utc_now(),
|
||
),
|
||
)
|
||
|
||
try:
|
||
storage = _get_storage_service()
|
||
|
||
# Delete old frames if regenerating
|
||
if regenerate:
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
current_message=MESSAGE_DELETING_OLD_FRAMES,
|
||
progress_pct=PROGRESS_FRAME_EXTRACT_START,
|
||
),
|
||
)
|
||
old_meta = _run_async(read_frames_metadata(storage, frames_folder=frames_folder))
|
||
if old_meta is not None:
|
||
_run_async(
|
||
delete_frames(
|
||
storage,
|
||
frames_folder=frames_folder,
|
||
frame_count=old_meta.frame_count,
|
||
)
|
||
)
|
||
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
current_message=MESSAGE_EXTRACTING_FRAMES,
|
||
progress_pct=PROGRESS_FRAME_EXTRACT_START,
|
||
),
|
||
)
|
||
|
||
last_report_time = time.monotonic()
|
||
|
||
def _on_progress(current: int, total: int) -> None:
|
||
nonlocal last_report_time
|
||
now = time.monotonic()
|
||
if now - last_report_time < PROGRESS_THROTTLE_SECONDS:
|
||
return
|
||
last_report_time = now
|
||
pct = current / total
|
||
mapped = PROGRESS_FRAME_EXTRACT_START + pct * (
|
||
PROGRESS_FRAME_EXTRACT_END - PROGRESS_FRAME_EXTRACT_START
|
||
)
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
current_message=MESSAGE_UPLOADING_FRAMES,
|
||
progress_pct=round(mapped, 1),
|
||
),
|
||
)
|
||
|
||
metadata = _run_async(
|
||
extract_frames(
|
||
storage,
|
||
file_key=file_key,
|
||
frames_folder=frames_folder,
|
||
on_progress=_on_progress,
|
||
)
|
||
)
|
||
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_DONE,
|
||
current_message=MESSAGE_COMPLETED,
|
||
progress_pct=PROGRESS_COMPLETE,
|
||
output_data=metadata.model_dump(mode="json"),
|
||
finished_at=_utc_now(),
|
||
),
|
||
)
|
||
except JobCancelledError:
|
||
logger.info("frame_extract_actor cancelled: %s", job_uuid)
|
||
return
|
||
except Exception as exc:
|
||
logger.exception("frame_extract_actor failed: %s", job_uuid)
|
||
_send_webhook_event(
|
||
webhook_url,
|
||
TaskWebhookEvent(
|
||
status=JOB_STATUS_FAILED,
|
||
error_message=str(exc),
|
||
finished_at=_utc_now(),
|
||
),
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Task Service
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class TaskService:
|
||
"""Service for submitting background tasks and recording webhook updates."""
|
||
|
||
def __init__(self, session: AsyncSession) -> None:
|
||
self._session = session
|
||
self._job_repo = JobRepository(session)
|
||
self._event_repo = JobEventRepository(session)
|
||
self._webhook_repo = WebhookRepository(session)
|
||
|
||
def _get_project_workspace_service(self):
|
||
return ProjectWorkspaceService(self._session)
|
||
|
||
async def _update_job_broker_reference(self, job: Job, broker_reference: str) -> Job:
|
||
"""Persist the transport-specific broker reference after enqueueing."""
|
||
job.broker_id = broker_reference
|
||
await self._session.commit()
|
||
await self._session.refresh(job)
|
||
return job
|
||
|
||
async def _find_duplicate_active_job(
|
||
self,
|
||
*,
|
||
requester: User,
|
||
job_type: JobTypeEnum,
|
||
project_id: uuid.UUID | None,
|
||
input_data: dict,
|
||
) -> Job | None:
|
||
"""Reuse an already running job for the same request payload."""
|
||
jobs = await self._job_repo.list_active_by_type(
|
||
requester=requester,
|
||
job_type=job_type,
|
||
project_id=project_id,
|
||
statuses=ACTIVE_JOB_STATUSES,
|
||
)
|
||
|
||
for job in jobs:
|
||
if job.input_data == input_data:
|
||
return job
|
||
|
||
return None
|
||
|
||
async def _create_cancellation_notification(self, job: Job) -> None:
|
||
"""Emit a single cancellation notification to the current user."""
|
||
if job.user_id is None:
|
||
return
|
||
|
||
notification_service = NotificationService(self._session)
|
||
await notification_service.create_task_notification(
|
||
user_id=job.user_id,
|
||
job=job,
|
||
event=TaskWebhookEvent(
|
||
status=JOB_STATUS_CANCELLED,
|
||
current_message=MESSAGE_CANCELLED,
|
||
finished_at=job.finished_at,
|
||
),
|
||
)
|
||
|
||
def _cancel_dramatiq_message_sync(self, broker_id: str | None) -> None:
|
||
"""Remove a queued Dramatiq message from Redis when possible."""
|
||
broker_reference = _parse_broker_reference(broker_id)
|
||
if broker_reference is None:
|
||
return
|
||
|
||
queue_name, redis_message_id = broker_reference
|
||
namespace = _redis_broker.namespace
|
||
queue_key = f"{namespace}:{queue_name}"
|
||
queue_messages_key = f"{queue_key}.msgs"
|
||
delayed_queue_key = f"{queue_key}.DQ"
|
||
delayed_messages_key = f"{delayed_queue_key}.msgs"
|
||
acks_pattern = f"{namespace}:__acks__.*.{queue_name}"
|
||
|
||
pipeline = _redis_broker.client.pipeline()
|
||
pipeline.lrem(queue_key, 1, redis_message_id)
|
||
pipeline.hdel(queue_messages_key, redis_message_id)
|
||
pipeline.lrem(delayed_queue_key, 1, redis_message_id)
|
||
pipeline.hdel(delayed_messages_key, redis_message_id)
|
||
|
||
for key in _redis_broker.client.scan_iter(match=acks_pattern):
|
||
pipeline.srem(key, redis_message_id)
|
||
|
||
pipeline.execute()
|
||
|
||
async def _cancel_dramatiq_message(self, broker_id: str | None) -> None:
|
||
"""Run Redis queue cleanup for a Dramatiq message off the event loop."""
|
||
await asyncio.to_thread(self._cancel_dramatiq_message_sync, broker_id)
|
||
|
||
async def _cancel_caption_render(self, job: Job) -> None:
|
||
"""Cancel the downstream Remotion render when it has already been queued."""
|
||
if job.job_type != JOB_TYPE_CAPTIONS_GENERATE:
|
||
return
|
||
|
||
output_data = job.output_data or {}
|
||
render_id = output_data.get("render_id") or str(job.id)
|
||
|
||
settings = get_settings()
|
||
async with httpx.AsyncClient(timeout=WEBHOOK_TIMEOUT_SECONDS) as client:
|
||
response = await client.delete(
|
||
f"{settings.remotion_service_url}/api/render/{render_id}"
|
||
)
|
||
if response.status_code == 404:
|
||
return
|
||
response.raise_for_status()
|
||
|
||
async def _create_job_and_webhook(
|
||
self,
|
||
*,
|
||
requester: User,
|
||
job_type: JobTypeEnum,
|
||
project_id: uuid.UUID | None,
|
||
input_data: dict,
|
||
) -> tuple[Job, str]:
|
||
"""Create job and webhook, return job and webhook URL."""
|
||
broker_id = uuid.uuid4().hex
|
||
|
||
job = await self._job_repo.create(
|
||
requester=requester,
|
||
data=JobCreate(
|
||
broker_id=broker_id,
|
||
project_id=project_id,
|
||
input_data=input_data,
|
||
job_type=job_type,
|
||
),
|
||
)
|
||
|
||
webhook_url = _build_webhook_url(job.id)
|
||
await self._webhook_repo.create(
|
||
requester=requester,
|
||
data=WebhookCreate(
|
||
project_id=project_id,
|
||
event=_build_webhook_event_name(job_type),
|
||
url=webhook_url,
|
||
),
|
||
)
|
||
|
||
return job, webhook_url
|
||
|
||
async def _submit_task(
|
||
self,
|
||
*,
|
||
requester: User,
|
||
job_type: JobTypeEnum,
|
||
project_id: uuid.UUID | None,
|
||
input_data: dict,
|
||
actor: Any,
|
||
actor_kwargs: dict[str, Any],
|
||
) -> TaskSubmitResponse:
|
||
job, webhook_url = await self._create_job_and_webhook(
|
||
requester=requester,
|
||
job_type=job_type,
|
||
project_id=project_id,
|
||
input_data=input_data,
|
||
)
|
||
message = actor.send(job_id=str(job.id), webhook_url=webhook_url, **actor_kwargs)
|
||
redis_message_id = message.options.get("redis_message_id")
|
||
if redis_message_id:
|
||
broker_reference = _serialize_broker_reference(
|
||
message.queue_name,
|
||
str(redis_message_id),
|
||
)
|
||
await self._update_job_broker_reference(job, broker_reference)
|
||
|
||
return TaskSubmitResponse(
|
||
job_id=job.id,
|
||
webhook_url=webhook_url,
|
||
status=JOB_STATUS_PENDING,
|
||
)
|
||
|
||
async def record_webhook_event(self, *, job_id: uuid.UUID, event: TaskWebhookEvent) -> Job:
|
||
"""Apply a webhook event to the job and store a job event record."""
|
||
job = await self._job_repo.get_by_id(job_id)
|
||
if job is None:
|
||
raise ValueError(f"Задача {job_id} не найдена")
|
||
|
||
if job.status in (JOB_STATUS_DONE, JOB_STATUS_FAILED, JOB_STATUS_CANCELLED):
|
||
logger.info("Ignoring webhook for terminal job %s (status=%s)", job_id, job.status)
|
||
return job
|
||
|
||
job_update = JobUpdate(
|
||
status=event.status,
|
||
project_pct=event.progress_pct,
|
||
current_message=event.current_message,
|
||
error_message=event.error_message,
|
||
output_data=event.output_data,
|
||
started_at=event.started_at,
|
||
finished_at=event.finished_at,
|
||
)
|
||
job = await self._job_repo.update(job, job_update)
|
||
|
||
event_type = _derive_event_type(event)
|
||
payload = event.model_dump(mode="json", exclude_none=True)
|
||
await self._event_repo.create(
|
||
JobEventCreate(job_id=job.id, event_type=event_type, payload=payload)
|
||
)
|
||
|
||
# Save artifacts BEFORE sending notifications so data exists when frontend refetches
|
||
if job.job_type == JOB_TYPE_TRANSCRIPTION_GENERATE and event.status == JOB_STATUS_DONE:
|
||
try:
|
||
job = await self._save_transcription_artifacts(job)
|
||
except Exception:
|
||
logger.exception("Failed to save transcription artifacts for job %s", job_id)
|
||
|
||
if job.job_type == JOB_TYPE_MEDIA_CONVERT and event.status == JOB_STATUS_DONE:
|
||
try:
|
||
job = await self._save_convert_artifacts(job)
|
||
except Exception:
|
||
logger.exception("Failed to save convert artifacts for job %s", job_id)
|
||
|
||
if job.job_type == JOB_TYPE_SILENCE_APPLY and event.status == JOB_STATUS_DONE:
|
||
try:
|
||
job = await self._save_silence_apply_artifacts(job)
|
||
except Exception:
|
||
logger.exception("Failed to save silence apply artifacts for job %s", job_id)
|
||
|
||
if job.job_type == JOB_TYPE_CAPTIONS_GENERATE and event.status == JOB_STATUS_DONE:
|
||
try:
|
||
job = await self._save_captions_artifacts(job)
|
||
except Exception:
|
||
logger.exception("Failed to save captions artifacts for job %s", job_id)
|
||
|
||
try:
|
||
await self._sync_project_workspace_after_webhook(job)
|
||
except Exception:
|
||
logger.exception("Failed to project workspace state for job %s", job_id)
|
||
|
||
# Push real-time notification via WebSocket (after artifacts are persisted)
|
||
if job.user_id is not None:
|
||
try:
|
||
notification_service = NotificationService(self._session)
|
||
await notification_service.create_task_notification(
|
||
user_id=job.user_id, job=job, event=event
|
||
)
|
||
except Exception:
|
||
logger.exception("Failed to create notification for job %s", job_id)
|
||
|
||
return job
|
||
|
||
async def _sync_project_workspace_after_webhook(self, job: Job) -> None:
|
||
if job.project_id is None:
|
||
return
|
||
await self._get_project_workspace_service().handle_job_update(job=job)
|
||
|
||
async def cancel_job(self, job: Job) -> Job:
|
||
"""Cancel a job, clean queued transport state and ignore late webhooks."""
|
||
if job.status in (JOB_STATUS_DONE, JOB_STATUS_FAILED, JOB_STATUS_CANCELLED):
|
||
return job
|
||
|
||
try:
|
||
await self._cancel_dramatiq_message(job.broker_id)
|
||
except Exception:
|
||
logger.exception("Failed to cancel Dramatiq message for job %s", job.id)
|
||
|
||
try:
|
||
await self._cancel_caption_render(job)
|
||
except Exception:
|
||
logger.exception("Failed to cancel caption render for job %s", job.id)
|
||
|
||
finished_at = _utc_now()
|
||
job = await self._job_repo.update(
|
||
job,
|
||
JobUpdate(
|
||
status=JOB_STATUS_CANCELLED,
|
||
current_message=MESSAGE_CANCELLED,
|
||
finished_at=finished_at,
|
||
),
|
||
)
|
||
|
||
await self._event_repo.create(
|
||
JobEventCreate(
|
||
job_id=job.id,
|
||
event_type=f"{EVENT_TYPE_STATUS_PREFIX}{JOB_STATUS_CANCELLED}",
|
||
payload={
|
||
"status": JOB_STATUS_CANCELLED,
|
||
"current_message": MESSAGE_CANCELLED,
|
||
"finished_at": finished_at.isoformat(),
|
||
},
|
||
)
|
||
)
|
||
|
||
try:
|
||
await self._create_cancellation_notification(job)
|
||
except Exception:
|
||
logger.exception("Failed to create cancellation notification for job %s", job.id)
|
||
|
||
try:
|
||
await self._sync_project_workspace_after_webhook(job)
|
||
except Exception:
|
||
logger.exception("Failed to project workspace state for cancelled job %s", job.id)
|
||
|
||
return job
|
||
|
||
async def _save_transcription_artifacts(self, job: Job) -> Job:
|
||
"""Create Transcription, ArtifactMediaFile and File records."""
|
||
input_data = job.input_data or {}
|
||
output_data = job.output_data or {}
|
||
|
||
file_key: str = input_data["file_key"]
|
||
project_id: uuid.UUID | None = (
|
||
uuid.UUID(input_data["project_id"]) if input_data.get("project_id") else None
|
||
)
|
||
engine_raw: str = input_data.get("engine", "whisper")
|
||
language: str | None = input_data.get("language")
|
||
|
||
document: dict = output_data["document"]
|
||
|
||
# Resolve user
|
||
user_repo = UserRepository(self._session)
|
||
user = await user_repo.get_by_id(job.user_id) # type: ignore[arg-type]
|
||
if user is None:
|
||
logger.warning("User %s not found, skipping artifact save", job.user_id)
|
||
return job
|
||
|
||
# Find or create source File record
|
||
file_repo = FileRepository(self._session)
|
||
source_file = await file_repo.get_by_path(file_key)
|
||
if source_file is None:
|
||
source_file = await file_repo.create(
|
||
requester=user,
|
||
data=FileCreate(
|
||
project_id=project_id,
|
||
original_filename=file_key.rsplit("/", 1)[-1],
|
||
path=file_key,
|
||
storage_backend="S3",
|
||
mime_type="application/octet-stream",
|
||
size_bytes=0,
|
||
is_uploaded=True,
|
||
),
|
||
)
|
||
|
||
# Upload document JSON to S3
|
||
storage = _get_storage_service()
|
||
user_folder = get_user_folder(user)
|
||
json_bytes = json.dumps(document, ensure_ascii=False).encode("utf-8")
|
||
|
||
# Build display name: "Транскрибация <video_name>.json"
|
||
video_stem = Path(source_file.original_filename).stem
|
||
transcription_filename = f"Транскрибация {video_stem}.json"
|
||
|
||
artifact_key = await storage.upload_fileobj(
|
||
fileobj=io.BytesIO(json_bytes),
|
||
file_name=transcription_filename,
|
||
folder=f"{user_folder}/artifacts",
|
||
gen_name=True,
|
||
content_type="application/json",
|
||
)
|
||
|
||
# Create File record for the JSON artifact (no project_id — only reachable via artifact)
|
||
json_file = await file_repo.create(
|
||
requester=user,
|
||
data=FileCreate(
|
||
project_id=None,
|
||
original_filename=transcription_filename,
|
||
path=artifact_key,
|
||
storage_backend="S3",
|
||
mime_type="application/json",
|
||
size_bytes=len(json_bytes),
|
||
file_format="json",
|
||
is_uploaded=True,
|
||
),
|
||
)
|
||
|
||
# Create ArtifactMediaFile (no media_file_id — transcription is not a media file)
|
||
artifact_repo = ArtifactRepository(self._session)
|
||
artifact = await artifact_repo.create(
|
||
data=ArtifactMediaFileCreate(
|
||
project_id=project_id,
|
||
file_id=json_file.id,
|
||
media_file_id=None,
|
||
artifact_type="TRANSCRIPTION_JSON",
|
||
),
|
||
)
|
||
|
||
# Create Transcription record
|
||
transcription_repo = TranscriptionRepository(self._session)
|
||
engine_db = ENGINE_MAP.get(engine_raw, "LOCAL_WHISPER")
|
||
transcription = await transcription_repo.create(
|
||
data=TranscriptionCreate(
|
||
project_id=project_id,
|
||
source_file_id=source_file.id,
|
||
artifact_id=artifact.id,
|
||
engine=engine_db, # type: ignore[arg-type]
|
||
language=language,
|
||
document=document,
|
||
),
|
||
)
|
||
|
||
updated_output = dict(output_data)
|
||
updated_output["artifact_id"] = str(artifact.id)
|
||
updated_output["transcription_id"] = str(transcription.id)
|
||
updated_output["source_file_id"] = str(source_file.id)
|
||
job = await self._job_repo.update(job, JobUpdate(output_data=updated_output))
|
||
|
||
logger.info("Saved transcription artifacts for job %s", job.id)
|
||
return job
|
||
|
||
async def _save_convert_artifacts(self, job: Job) -> Job:
|
||
"""Create File and ArtifactMediaFile records for converted MP4."""
|
||
input_data = job.input_data or {}
|
||
output_data = job.output_data or {}
|
||
|
||
file_key: str = input_data["file_key"]
|
||
project_id: uuid.UUID | None = (
|
||
uuid.UUID(input_data["project_id"]) if input_data.get("project_id") else None
|
||
)
|
||
|
||
file_path: str = output_data["file_path"]
|
||
file_size: int = output_data.get("file_size", 0)
|
||
|
||
# Resolve user
|
||
user_repo = UserRepository(self._session)
|
||
user = await user_repo.get_by_id(job.user_id) # type: ignore[arg-type]
|
||
if user is None:
|
||
logger.warning("User %s not found, skipping convert artifact save", job.user_id)
|
||
return job
|
||
|
||
# Derive output filename from source file
|
||
file_repo = FileRepository(self._session)
|
||
source_file = await file_repo.get_by_path(file_key)
|
||
if source_file is not None:
|
||
stem = Path(source_file.original_filename).stem
|
||
else:
|
||
stem = Path(file_key).stem
|
||
converted_filename = f"Конвертированое видео {stem}.mp4"
|
||
|
||
# Create File record for the converted MP4 (no project_id — only reachable via artifact)
|
||
converted_file = await file_repo.create(
|
||
requester=user,
|
||
data=FileCreate(
|
||
project_id=None,
|
||
original_filename=converted_filename,
|
||
path=file_path,
|
||
storage_backend="S3",
|
||
mime_type="video/mp4",
|
||
size_bytes=file_size,
|
||
file_format="mp4",
|
||
is_uploaded=True,
|
||
),
|
||
)
|
||
|
||
# Create ArtifactMediaFile record
|
||
artifact_repo = ArtifactRepository(self._session)
|
||
await artifact_repo.create(
|
||
data=ArtifactMediaFileCreate(
|
||
project_id=project_id,
|
||
file_id=converted_file.id,
|
||
media_file_id=None,
|
||
artifact_type="CONVERTED_VIDEO",
|
||
),
|
||
)
|
||
|
||
updated_output = dict(output_data)
|
||
updated_output["file_id"] = str(converted_file.id)
|
||
job = await self._job_repo.update(job, JobUpdate(output_data=updated_output))
|
||
|
||
logger.info("Saved convert artifacts for job %s", job.id)
|
||
return job
|
||
|
||
async def _save_silence_apply_artifacts(self, job: Job) -> Job:
|
||
"""Create File and ArtifactMediaFile records for silence-applied video."""
|
||
input_data = job.input_data or {}
|
||
output_data = job.output_data or {}
|
||
|
||
file_key: str = input_data["file_key"]
|
||
project_id: uuid.UUID | None = (
|
||
uuid.UUID(input_data["project_id"]) if input_data.get("project_id") else None
|
||
)
|
||
|
||
file_path: str = output_data["file_path"]
|
||
file_size: int = output_data.get("file_size", 0)
|
||
|
||
user_repo = UserRepository(self._session)
|
||
user = await user_repo.get_by_id(job.user_id) # type: ignore[arg-type]
|
||
if user is None:
|
||
logger.warning("User %s not found, skipping silence apply artifact save", job.user_id)
|
||
return job
|
||
|
||
file_repo = FileRepository(self._session)
|
||
source_file = await file_repo.get_by_path(file_key)
|
||
if source_file is not None:
|
||
stem = Path(source_file.original_filename).stem
|
||
else:
|
||
stem = Path(file_key).stem
|
||
processed_filename = f"Видео без тишины {stem}.mp4"
|
||
|
||
processed_file = await file_repo.create(
|
||
requester=user,
|
||
data=FileCreate(
|
||
project_id=project_id,
|
||
original_filename=processed_filename,
|
||
path=file_path,
|
||
storage_backend="S3",
|
||
mime_type="video/mp4",
|
||
size_bytes=file_size,
|
||
file_format="mp4",
|
||
is_uploaded=True,
|
||
),
|
||
)
|
||
|
||
artifact_repo = ArtifactRepository(self._session)
|
||
await artifact_repo.create(
|
||
data=ArtifactMediaFileCreate(
|
||
project_id=project_id,
|
||
file_id=processed_file.id,
|
||
media_file_id=None,
|
||
artifact_type="SILENCE_REMOVED_VIDEO",
|
||
),
|
||
)
|
||
|
||
updated_output = dict(output_data)
|
||
updated_output["file_id"] = str(processed_file.id)
|
||
job = await self._job_repo.update(job, JobUpdate(output_data=updated_output))
|
||
|
||
logger.info(
|
||
"Saved silence apply artifacts for job %s (file_id=%s)",
|
||
job.id,
|
||
processed_file.id,
|
||
)
|
||
return job
|
||
|
||
async def _save_captions_artifacts(self, job: Job) -> Job:
|
||
"""Create File and ArtifactMediaFile records for captioned video."""
|
||
input_data = job.input_data or {}
|
||
output_data = job.output_data or {}
|
||
|
||
video_s3_path: str = input_data["video_s3_path"]
|
||
project_id: uuid.UUID | None = (
|
||
uuid.UUID(input_data["project_id"]) if input_data.get("project_id") else None
|
||
)
|
||
|
||
output_path: str = output_data["output_path"]
|
||
|
||
# Resolve user
|
||
user_repo = UserRepository(self._session)
|
||
user = await user_repo.get_by_id(job.user_id) # type: ignore[arg-type]
|
||
if user is None:
|
||
logger.warning("User %s not found, skipping captions artifact save", job.user_id)
|
||
return job
|
||
|
||
# Get file size from S3
|
||
storage = _get_storage_service()
|
||
file_size = await storage.size(output_path)
|
||
|
||
# Derive output filename from source video
|
||
file_repo = FileRepository(self._session)
|
||
source_file = await file_repo.get_by_path(video_s3_path)
|
||
if source_file is not None:
|
||
stem = Path(source_file.original_filename).stem
|
||
else:
|
||
stem = Path(video_s3_path).stem
|
||
captioned_filename = f"Видео с субтитрами {stem}.mp4"
|
||
|
||
# Create File record
|
||
captioned_file = await file_repo.create(
|
||
requester=user,
|
||
data=FileCreate(
|
||
project_id=project_id,
|
||
original_filename=captioned_filename,
|
||
path=output_path,
|
||
storage_backend="S3",
|
||
mime_type="video/mp4",
|
||
size_bytes=file_size,
|
||
file_format="mp4",
|
||
is_uploaded=True,
|
||
),
|
||
)
|
||
|
||
# Create ArtifactMediaFile record
|
||
artifact_repo = ArtifactRepository(self._session)
|
||
await artifact_repo.create(
|
||
data=ArtifactMediaFileCreate(
|
||
project_id=project_id,
|
||
file_id=captioned_file.id,
|
||
media_file_id=None,
|
||
artifact_type="RENDERED_VIDEO",
|
||
),
|
||
)
|
||
|
||
# Update job output_data with file_id so frontend can reference it
|
||
updated_output = dict(output_data)
|
||
updated_output["file_id"] = str(captioned_file.id)
|
||
job = await self._job_repo.update(job, JobUpdate(output_data=updated_output))
|
||
|
||
logger.info("Saved captions artifacts for job %s (file_id=%s)", job.id, captioned_file.id)
|
||
return job
|
||
|
||
async def submit_media_probe(
|
||
self, *, requester: User, request: MediaProbeRequest
|
||
) -> TaskSubmitResponse:
|
||
"""Submit media probe task."""
|
||
return await self._submit_task(
|
||
requester=requester,
|
||
job_type=JOB_TYPE_MEDIA_PROBE,
|
||
project_id=request.project_id,
|
||
input_data=request.model_dump(mode="json"),
|
||
actor=media_probe_actor,
|
||
actor_kwargs={"file_key": request.file_key},
|
||
)
|
||
|
||
async def submit_silence_remove(
|
||
self, *, requester: User, request: SilenceRemoveRequest
|
||
) -> TaskSubmitResponse:
|
||
"""Submit silence removal task."""
|
||
user_folder = get_user_folder(requester)
|
||
resolved_folder = (
|
||
f"{user_folder}/{request.out_folder}"
|
||
if request.out_folder
|
||
else f"{user_folder}/output_files"
|
||
)
|
||
return await self._submit_task(
|
||
requester=requester,
|
||
job_type=JOB_TYPE_SILENCE_REMOVE,
|
||
project_id=request.project_id,
|
||
input_data=request.model_dump(mode="json"),
|
||
actor=silence_remove_actor,
|
||
actor_kwargs={
|
||
"file_key": request.file_key,
|
||
"out_folder": resolved_folder,
|
||
"min_silence_duration_ms": request.min_silence_duration_ms,
|
||
"silence_threshold_db": request.silence_threshold_db,
|
||
"padding_ms": request.padding_ms,
|
||
},
|
||
)
|
||
|
||
async def submit_silence_detect(
|
||
self, *, requester: User, request: SilenceDetectRequest
|
||
) -> TaskSubmitResponse:
|
||
"""Submit silence detection task."""
|
||
return await self._submit_task(
|
||
requester=requester,
|
||
job_type=JOB_TYPE_SILENCE_DETECT,
|
||
project_id=request.project_id,
|
||
input_data=request.model_dump(mode="json"),
|
||
actor=silence_detect_actor,
|
||
actor_kwargs={
|
||
"file_key": request.file_key,
|
||
"min_silence_duration_ms": request.min_silence_duration_ms,
|
||
"silence_threshold_db": request.silence_threshold_db,
|
||
"padding_ms": request.padding_ms,
|
||
},
|
||
)
|
||
|
||
async def submit_silence_apply(
|
||
self, *, requester: User, request: SilenceApplyRequest
|
||
) -> TaskSubmitResponse:
|
||
"""Submit silence apply task."""
|
||
user_folder = get_user_folder(requester)
|
||
resolved_folder = (
|
||
f"{user_folder}/{request.out_folder}"
|
||
if request.out_folder
|
||
else f"{user_folder}/output_files"
|
||
)
|
||
return await self._submit_task(
|
||
requester=requester,
|
||
job_type=JOB_TYPE_SILENCE_APPLY,
|
||
project_id=request.project_id,
|
||
input_data=request.model_dump(mode="json"),
|
||
actor=silence_apply_actor,
|
||
actor_kwargs={
|
||
"file_key": request.file_key,
|
||
"out_folder": resolved_folder,
|
||
"cuts": request.cuts,
|
||
"output_name": request.output_name,
|
||
},
|
||
)
|
||
|
||
async def submit_media_convert(
|
||
self, *, requester: User, request: MediaConvertRequest
|
||
) -> TaskSubmitResponse:
|
||
"""Submit media conversion task."""
|
||
user_folder = get_user_folder(requester)
|
||
resolved_folder = (
|
||
f"{user_folder}/{request.out_folder}"
|
||
if request.out_folder
|
||
else f"{user_folder}/output_files"
|
||
)
|
||
return await self._submit_task(
|
||
requester=requester,
|
||
job_type=JOB_TYPE_MEDIA_CONVERT,
|
||
project_id=request.project_id,
|
||
input_data=request.model_dump(mode="json"),
|
||
actor=media_convert_actor,
|
||
actor_kwargs={
|
||
"file_key": request.file_key,
|
||
"out_folder": resolved_folder,
|
||
"output_format": request.output_format,
|
||
},
|
||
)
|
||
|
||
async def submit_transcription_generate(
|
||
self, *, requester: User, request: TranscriptionGenerateRequest
|
||
) -> TaskSubmitResponse:
|
||
"""Submit transcription generation task."""
|
||
return await self._submit_task(
|
||
requester=requester,
|
||
job_type=JOB_TYPE_TRANSCRIPTION_GENERATE,
|
||
project_id=request.project_id,
|
||
input_data=request.model_dump(mode="json"),
|
||
actor=transcription_generate_actor,
|
||
actor_kwargs={
|
||
"file_key": request.file_key,
|
||
"engine": request.engine,
|
||
"language": request.language,
|
||
"model": request.model,
|
||
},
|
||
)
|
||
|
||
async def submit_frame_extract(
|
||
self, *, requester: User, request: FrameExtractRequest
|
||
) -> TaskSubmitResponse:
|
||
"""Submit frame extraction task."""
|
||
from cpv3.modules.media.service import get_frames_folder
|
||
|
||
user_folder = get_user_folder(requester)
|
||
frames_folder = get_frames_folder(user_folder, request.file_key)
|
||
|
||
return await self._submit_task(
|
||
requester=requester,
|
||
job_type=JOB_TYPE_FRAME_EXTRACT,
|
||
project_id=request.project_id,
|
||
input_data=request.model_dump(mode="json"),
|
||
actor=frame_extract_actor,
|
||
actor_kwargs={
|
||
"file_key": request.file_key,
|
||
"frames_folder": frames_folder,
|
||
"regenerate": request.regenerate,
|
||
},
|
||
)
|
||
|
||
async def submit_captions_generate(
|
||
self, *, requester: User, request: CaptionsGenerateRequest
|
||
) -> TaskSubmitResponse:
|
||
"""Submit captions generation task."""
|
||
from cpv3.modules.captions.service import CaptionPresetService
|
||
|
||
input_data = request.model_dump(mode="json")
|
||
existing_job = await self._find_duplicate_active_job(
|
||
requester=requester,
|
||
job_type=JOB_TYPE_CAPTIONS_GENERATE,
|
||
project_id=request.project_id,
|
||
input_data=input_data,
|
||
)
|
||
if existing_job is not None:
|
||
return TaskSubmitResponse(
|
||
job_id=existing_job.id,
|
||
webhook_url=_build_webhook_url(existing_job.id),
|
||
status=existing_job.status,
|
||
)
|
||
|
||
transcription_repo = TranscriptionRepository(self._session)
|
||
transcription = await transcription_repo.get_by_id(request.transcription_id)
|
||
if transcription is None:
|
||
raise ValueError(f"Транскрипция {request.transcription_id} не найдена")
|
||
|
||
user_folder = get_user_folder(requester)
|
||
resolved_folder = (
|
||
f"{user_folder}/{request.folder}" if request.folder else f"{user_folder}/output_files"
|
||
)
|
||
|
||
# Resolve style config from preset or inline override
|
||
preset_service = CaptionPresetService(self._session)
|
||
style_config = await preset_service.resolve_style_config(
|
||
preset_id=request.preset_id,
|
||
inline_config=request.style_config,
|
||
)
|
||
|
||
return await self._submit_task(
|
||
requester=requester,
|
||
job_type=JOB_TYPE_CAPTIONS_GENERATE,
|
||
project_id=request.project_id,
|
||
input_data=input_data,
|
||
actor=captions_generate_actor,
|
||
actor_kwargs={
|
||
"video_s3_path": request.video_s3_path,
|
||
"folder": resolved_folder,
|
||
"transcription_json": transcription.document,
|
||
"style_config": style_config,
|
||
},
|
||
)
|