Files
2026-04-04 00:08:27 +03:00

169 lines
6.1 KiB
Python

"""
Task request and response schemas.
"""
from __future__ import annotations
from datetime import datetime
from typing import Literal
from uuid import UUID
from pydantic import Field, model_validator
from cpv3.common.schemas import Schema
from cpv3.modules.jobs.schemas import JobStatusEnum, JobTypeEnum
# Task-facing aliases for the job enums imported from cpv3.modules.jobs.schemas.
# Each alias is bound to the very same object as the job enum it renames, so
# the two names are interchangeable everywhere in this module.
TaskTypeEnum = JobTypeEnum
TaskStatusEnum = JobStatusEnum
# --- Request schemas ---
class MediaProbeRequest(Schema):
    """Request to probe media file metadata."""

    # Required: key of the stored file whose metadata should be probed.
    file_key: str = Field(
        ...,
        description="Storage key of the file to probe",
    )
    # Optional: stays None when the request is not tied to a project.
    project_id: UUID | None = Field(
        None,
        description="Associated project ID",
    )
class SilenceRemoveRequest(Schema):
    """Request to remove silence from media file."""

    # --- Where to read from / write to (both required) ---
    file_key: str = Field(
        ...,
        description="Storage key of the input file",
    )
    out_folder: str = Field(
        ...,
        description="Output folder for processed file",
    )

    # Optional: stays None when the request is not tied to a project.
    project_id: UUID | None = Field(
        None,
        description="Associated project ID",
    )

    # --- Detection tuning; every knob is optional and has a default ---
    min_silence_duration_ms: int = Field(
        200,
        description="Minimum silence duration in milliseconds",
    )
    # NOTE(review): the default is a positive 16; presumably the consumer
    # applies its own sign convention for the threshold -- confirm.
    silence_threshold_db: int = Field(
        16,
        description="Silence threshold in decibels",
    )
    padding_ms: int = Field(
        100,
        description="Padding around non-silent segments in milliseconds",
    )
class SilenceDetectRequest(Schema):
    """Request to detect silent segments in media file."""

    # Required: key of the stored file to analyse.
    file_key: str = Field(
        ...,
        description="Storage key of the input file",
    )
    # Optional: stays None when the request is not tied to a project.
    project_id: UUID | None = Field(
        None,
        description="Associated project ID",
    )

    # --- Detection tuning; every knob is optional and has a default ---
    min_silence_duration_ms: int = Field(
        200,
        description="Minimum silence duration in milliseconds",
    )
    # NOTE(review): the default is a positive 16; presumably the consumer
    # applies its own sign convention for the threshold -- confirm.
    silence_threshold_db: int = Field(
        16,
        description="Silence threshold in decibels",
    )
    padding_ms: int = Field(
        100,
        description="Padding around non-silent segments in milliseconds",
    )
class SilenceApplyRequest(Schema):
    """Request to apply silence cuts to media file."""

    # --- Where to read from / write to (both required) ---
    file_key: str = Field(
        ...,
        description="Storage key of the input file",
    )
    out_folder: str = Field(
        ...,
        description="Output folder for processed file",
    )

    # Optional: stays None when the request is not tied to a project.
    project_id: UUID | None = Field(
        None,
        description="Associated project ID",
    )
    # Optional human-readable name for the produced file.
    output_name: str | None = Field(
        None,
        description="Display name for the output file",
    )

    # Required. Items are plain dicts: the description spells out the expected
    # keys, but no validator in this class checks them.
    cuts: list[dict] = Field(
        ...,
        description="Cut regions: [{'start_ms': int, 'end_ms': int}, ...]",
    )
class MediaConvertRequest(Schema):
    """Request to convert media file to different format."""

    # --- Where to read from / write to (both required) ---
    file_key: str = Field(
        ...,
        description="Storage key of the input file",
    )
    out_folder: str = Field(
        ...,
        description="Output folder for converted file",
    )

    # Free-form format name; falls back to "mp4" when omitted.
    output_format: str = Field(
        "mp4",
        description="Target output format",
    )
    # Optional: stays None when the request is not tied to a project.
    project_id: UUID | None = Field(
        None,
        description="Associated project ID",
    )
class TranscriptionGenerateRequest(Schema):
    """Request to generate transcription from audio/video file."""

    # Required: key of the stored file to transcribe.
    file_key: str = Field(
        ...,
        description="Storage key of the input file",
    )
    # Optional: stays None when the request is not tied to a project.
    project_id: UUID | None = Field(
        None,
        description="Associated project ID",
    )

    # Only the three listed engine names are accepted; "whisper" is the default.
    engine: Literal["whisper", "google", "salutespeech"] = Field(
        "whisper",
        description="Transcription engine to use",
    )
    # Optional language code; None when the caller does not specify one.
    language: str | None = Field(
        None,
        description="Language code (e.g., 'en')",
    )
    # Per its description this is a whisper model size; it is a plain string
    # here, so this class does not restrict the accepted values.
    model: str = Field(
        "base",
        description="Model size for whisper",
    )
class CaptionsGenerateRequest(Schema):
    """Request to generate captions/subtitles video."""

    # --- Required inputs ---
    video_s3_path: str = Field(
        ...,
        description="S3 path to the video file",
    )
    folder: str = Field(
        ...,
        description="Output folder for rendered video",
    )
    transcription_id: UUID = Field(
        ...,
        description="ID of the transcription to use",
    )

    # Optional: stays None when the request is not tied to a project.
    project_id: UUID | None = Field(
        None,
        description="Associated project ID",
    )

    # --- Styling: a stored preset and/or an inline config, both optional ---
    # NOTE(review): the two descriptions below disagree ("mutually exclusive"
    # vs "overrides"), and nothing in this class enforces either rule --
    # confirm which one the consumer actually applies.
    preset_id: UUID | None = Field(
        None,
        description="Caption style preset ID (mutually exclusive with style_config)",
    )
    style_config: dict | None = Field(
        None,
        description="Inline caption style config (overrides preset_id)",
    )
class FrameExtractRequest(Schema):
    """Request to extract video frames for timeline thumbnails."""

    # Required: key of the video to extract frames from.
    file_key: str = Field(
        ...,
        description="S3 key of the video file",
    )
    # Optional: stays None when the request is not tied to a project.
    project_id: UUID | None = Field(
        None,
        description="Associated project ID",
    )
    # Off by default; callers opt in to discarding existing frames.
    regenerate: bool = Field(
        False,
        description="Delete existing frames and re-extract",
    )
# --- Response schemas ---
class TaskSubmitResponse(Schema):
    """Response after submitting a background task."""

    # Required: identifier the client uses to track the submitted job.
    job_id: UUID = Field(
        ...,
        description="Job ID for tracking",
    )
    # Required: URL associated with status updates for this job.
    webhook_url: str = Field(
        ...,
        description="Webhook URL for status updates",
    )
    # The default is written as a plain string while the field is typed as
    # TaskStatusEnum. Pydantic skips validation of defaults unless asked, so
    # without validate_default=True an instance built without `status` would
    # carry a raw str instead of a value coerced to the declared type.
    status: TaskStatusEnum = Field(
        default="PENDING",
        validate_default=True,
        description="Initial task status",
    )
class TaskStatusResponse(Schema):
    """Response for task status query."""

    # --- Always present ---
    job_id: UUID
    status: TaskStatusEnum
    job_type: TaskTypeEnum

    # --- Optional details; each one defaults to None when not supplied ---
    progress_pct: float | None = None
    current_message: str | None = None
    error_message: str | None = None
    output_data: dict | None = None  # free-form dict; shape is not constrained here

    # --- Optional timestamps ---
    started_at: datetime | None = None
    finished_at: datetime | None = None
class TaskWebhookEvent(Schema):
    """Webhook event payload for task updates."""

    # Every field is optional on its own; the validator below requires that
    # at least one of them is actually provided.
    status: TaskStatusEnum | None = None
    progress_pct: float | None = None
    current_message: str | None = None
    error_message: str | None = None
    output_data: dict | None = None
    started_at: datetime | None = None
    finished_at: datetime | None = None

    @model_validator(mode="after")
    def validate_has_update(self) -> "TaskWebhookEvent":
        """Reject an event in which every updatable field is None.

        Returns:
            The validated instance, unchanged.

        Raises:
            ValueError: If all seven fields are None.
        """
        updatable_values = (
            self.status,
            self.progress_pct,
            self.current_message,
            self.error_message,
            self.output_data,
            self.started_at,
            self.finished_at,
        )
        if all(item is None for item in updatable_values):
            # Message (Russian): "A webhook event must contain at least one
            # updatable field."
            raise ValueError("Событие вебхука должно содержать хотя бы одно обновляемое поле.")
        return self