# diff --git a/alembic/versions/e6f7a8b9c0d1_add_project_workspaces_table.py b/alembic/versions/e6f7a8b9c0d1_add_project_workspaces_table.py
# new file mode 100644 index 0000000..6a94dd7 --- /dev/null +++ b/alembic/versions/e6f7a8b9c0d1_add_project_workspaces_table.py
# @@ -0,0 +1,244 @@
"""add project_workspaces table

Revision ID: e6f7a8b9c0d1
Revises: d5e6f7a8b9c0
Create Date: 2026-04-07 16:00:00.000000

"""

from __future__ import annotations

import uuid
from datetime import datetime, timezone
from typing import Any, Sequence, Union

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql


revision: str = "e6f7a8b9c0d1"
down_revision: Union[str, None] = "d5e6f7a8b9c0"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def _utc_now() -> datetime:
    """Return the current timezone-aware UTC timestamp."""
    return datetime.now(timezone.utc)


def _parse_uuid(raw_value: object) -> str | None:
    """Coerce an arbitrary legacy value to a canonical UUID string, or None.

    Returns None for None input or anything that does not parse as a UUID,
    so malformed legacy data is dropped rather than aborting the migration.
    """
    if raw_value is None:
        return None
    try:
        return str(uuid.UUID(str(raw_value)))
    except (TypeError, ValueError):
        return None


def _default_state() -> dict[str, Any]:
    """Build the v1 default workspace-state document (plain JSON types only).

    Deliberately self-contained (no import from cpv3.*) so the migration
    stays runnable even if the application schema module changes later.
    """
    return {
        "version": 1,
        "phase": "INGEST",
        "active_job": None,
        "source_file_id": None,
        "workspace_view": {
            "used_file_ids": [],
            "selected_file_id": None,
        },
        "silence": {
            "status": "IDLE",
            "settings": {
                "min_silence_duration_ms": 200,
                "silence_threshold_db": 16,
                "padding_ms": 100,
            },
            "detect_job_id": None,
            "detected_segments": [],
            "reviewed_cuts": [],
            "duration_ms": None,
            "applied_output_file_id": None,
        },
        "transcription": {
            "status": "IDLE",
            "request": {
                "engine": "whisper",
                "language": None,
                "model": "base",
            },
            "job_id": None,
            "artifact_id": None,
            "transcription_id": None,
            "reviewed": False,
        },
        "captions": {
            "status": "IDLE",
            "preset_id": None,
            "style_config": None,
            "render_job_id": None,
            "output_file_id": None,
        },
    }


def _backfill_state(legacy_workspace_state: dict | None) -> dict[str, Any]:
    """Translate a legacy ``projects.workspace_state`` blob into the v1 state dict.

    Starts from :func:`_default_state` and overlays whatever the legacy
    ``wizard`` sub-document provides; anything missing or malformed is
    simply ignored (best-effort backfill). A standalone, dict-based copy
    of ``build_workspace_state_from_legacy`` from the schemas module.
    """
    state = _default_state()
    if not isinstance(legacy_workspace_state, dict):
        return state

    wizard = legacy_workspace_state.get("wizard")
    if not isinstance(wizard, dict):
        wizard = {}

    # Map the legacy wizard screen name onto the coarse workflow phase.
    current_step = wizard.get("current_step")
    step_phase_map = {
        "upload": "INGEST",
        "verify": "VERIFY",
        "silence-settings": "SILENCE",
        "processing": "SILENCE",
        "fragments": "SILENCE",
        "silence-apply-processing": "SILENCE",
        "transcription-settings": "TRANSCRIPTION",
        "transcription-processing": "TRANSCRIPTION",
        "subtitle-revision": "TRANSCRIPTION",
        "caption-settings": "CAPTIONS",
        "caption-processing": "CAPTIONS",
        "caption-result": "DONE",
    }
    if current_step in step_phase_map:
        state["phase"] = step_phase_map[current_step]

    source_file_id = _parse_uuid(wizard.get("primary_file_id"))
    if source_file_id is not None:
        state["source_file_id"] = source_file_id

    silence_job_id = _parse_uuid(wizard.get("silence_job_id"))
    if silence_job_id is not None:
        state["silence"]["detect_job_id"] = silence_job_id

    transcription_artifact_id = _parse_uuid(wizard.get("transcription_artifact_id"))
    if transcription_artifact_id is not None:
        state["transcription"]["artifact_id"] = transcription_artifact_id

    caption_preset_id = _parse_uuid(wizard.get("caption_preset_id"))
    if caption_preset_id is not None:
        state["captions"]["preset_id"] = caption_preset_id

    caption_style_config = wizard.get("caption_style_config")
    if isinstance(caption_style_config, dict):
        state["captions"]["style_config"] = caption_style_config

    # A captioned output video means the whole legacy workflow finished.
    captioned_video_file_id = _parse_uuid(wizard.get("captioned_video_file_id"))
    if captioned_video_file_id is not None:
        state["captions"]["output_file_id"] = captioned_video_file_id
        state["captions"]["status"] = "COMPLETED"
        state["phase"] = "DONE"

    active_job_id = _parse_uuid(wizard.get("active_job_id"))
    active_job_type = wizard.get("active_job_type")
    if active_job_id is not None and isinstance(active_job_type, str):
        state["active_job"] = {
            "job_id": active_job_id,
            "job_type": active_job_type,
        }
        # NOTE(review): nesting of these two checks inside the active-job
        # branch is reconstructed from a whitespace-mangled source —
        # confirm against the original file.
        if active_job_type == "TRANSCRIPTION_GENERATE":
            state["transcription"]["job_id"] = active_job_id
        if active_job_type == "CAPTIONS_GENERATE":
            state["captions"]["render_job_id"] = active_job_id

    silence_settings = wizard.get("silence_settings")
    if isinstance(silence_settings, dict):
        state["silence"]["settings"] = {
            "min_silence_duration_ms": silence_settings.get("min_silence_duration_ms", 200),
            "silence_threshold_db": silence_settings.get("silence_threshold_db", 16),
            "padding_ms": silence_settings.get("padding_ms", 100),
        }
        state["silence"]["status"] = "CONFIGURED"

    # Per-step sub-statuses refine the phase mapping above.
    if current_step == "processing":
        state["silence"]["status"] = "DETECTING"
    elif current_step == "fragments":
        state["silence"]["status"] = "REVIEWING"
    elif current_step == "silence-apply-processing":
        state["silence"]["status"] = "APPLYING"
    elif current_step == "transcription-processing":
        state["transcription"]["status"] = "PROCESSING"
    elif current_step == "subtitle-revision":
        state["transcription"]["status"] = "REVIEWING"
    elif current_step == "caption-settings":
        state["captions"]["status"] = "CONFIGURED"
    elif current_step == "caption-processing":
        state["captions"]["status"] = "PROCESSING"
    elif current_step == "caption-result":
        state["captions"]["status"] = "COMPLETED"

    used_files = legacy_workspace_state.get("used_files")
    if isinstance(used_files, list):
        # De-duplicate while preserving first-seen order.
        parsed_ids: list[str] = []
        for item in used_files:
            if not isinstance(item, dict):
                continue
            parsed_id = _parse_uuid(item.get("id"))
            if parsed_id is not None and parsed_id not in parsed_ids:
                parsed_ids.append(parsed_id)
        state["workspace_view"]["used_file_ids"] = parsed_ids
        if source_file_id in parsed_ids:
            state["workspace_view"]["selected_file_id"] = source_file_id

    return state


def upgrade() -> None:
    # Create the new one-row-per-project workspace table, then backfill it
    # from the legacy JSON column on `projects` for all active projects.
    op.create_table(
        "project_workspaces",
        sa.Column(
            "project_id",
            postgresql.UUID(as_uuid=True),
            sa.ForeignKey("projects.id", ondelete="CASCADE"),
            primary_key=True,
        ),
        sa.Column("revision", sa.Integer(), nullable=False, server_default=sa.text("0")),
        sa.Column("state", postgresql.JSONB(astext_type=sa.Text()), nullable=False),
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
    )

    connection = op.get_bind()
    # Lightweight table stubs: avoids importing ORM models into the migration.
    projects_table = sa.table(
        "projects",
        sa.column("id", postgresql.UUID(as_uuid=True)),
        sa.column("workspace_state", sa.JSON()),
        sa.column("is_active", sa.Boolean()),
    )
    workspaces_table = sa.table(
        "project_workspaces",
        sa.column("project_id", postgresql.UUID(as_uuid=True)),
        sa.column("revision", sa.Integer()),
        sa.column("state", postgresql.JSONB(astext_type=sa.Text())),
        sa.column("created_at", sa.DateTime(timezone=True)),
        sa.column("updated_at", sa.DateTime(timezone=True)),
    )

    rows = connection.execute(
        sa.select(projects_table.c.id, projects_table.c.workspace_state).where(
            projects_table.c.is_active.is_(True)
        )
    )

    # One shared timestamp for the whole backfill batch.
    now = _utc_now()
    payloads = [
        {
            "project_id": row.id,
            "revision": 0,
            "state": _backfill_state(row.workspace_state),
            "created_at": now,
            "updated_at": now,
        }
        for row in rows
    ]
    if payloads:
        # executemany-style bulk insert; skipped entirely when there is nothing to backfill.
        connection.execute(sa.insert(workspaces_table), payloads)


def downgrade() -> None:
    # Dropping the table discards the backfilled state; the legacy
    # `projects.workspace_state` column is untouched by this revision.
    op.drop_table("project_workspaces")


# diff --git a/cpv3/api/v1/router.py b/cpv3/api/v1/router.py
# index 8192d04..739cf31 100644 --- a/cpv3/api/v1/router.py +++ b/cpv3/api/v1/router.py
# @@ -10,6 +10,7 @@
from cpv3.modules.captions.router import router as captions_router
from cpv3.modules.files.router import router as files_router
from cpv3.modules.jobs.router import events_router, jobs_router
from cpv3.modules.media.router import artifacts_router, media_router, mediafiles_router
from cpv3.modules.project_workspaces.router import router as project_workspaces_router  # + added by this change
from cpv3.modules.projects.router import router as projects_router
from cpv3.modules.system.router import router as system_router
from cpv3.modules.tasks.router import router as tasks_router
# @@ -29,6 +30,7 @@
api_router.include_router(users_router)

# Projects
api_router.include_router(projects_router)
api_router.include_router(project_workspaces_router)  # + added by this change

# Files (storage module renamed)
api_router.include_router(files_router)


# diff --git a/cpv3/db/models.py b/cpv3/db/models.py
# index 7778e56..43127fe 100644 --- a/cpv3/db/models.py +++ b/cpv3/db/models.py
# @@ -2,6 +2,7 @@
from cpv3.db.base import Base
from cpv3.modules.captions.models import CaptionPreset
from cpv3.modules.jobs.models import Job, JobEvent
from cpv3.modules.media.models import ArtifactMediaFile, MediaFile
from cpv3.modules.project_workspaces.models import ProjectWorkspace  # + added by this change
from cpv3.modules.projects.models import Project
from cpv3.modules.files.models import File
from cpv3.modules.transcription.models import Transcription
# @@ -14,6 +15,7 @@ __all__ = [
    "CaptionPreset",
    "User",
    "Project",
    "ProjectWorkspace",
    "File",
    "MediaFile",
    "ArtifactMediaFile",


# diff --git a/cpv3/modules/project_workspaces/__init__.py b/cpv3/modules/project_workspaces/__init__.py
# new file mode 100644 index 0000000..96e4e13 --- /dev/null +++ b/cpv3/modules/project_workspaces/__init__.py
# @@ -0,0 +1 @@
"""Typed project workspace module."""


# diff --git a/cpv3/modules/project_workspaces/models.py b/cpv3/modules/project_workspaces/models.py
# new file mode 100644 index 0000000..b92bd6e --- /dev/null +++ b/cpv3/modules/project_workspaces/models.py
# @@ -0,0 +1,30 @@
from __future__ import annotations

import uuid
from datetime import datetime

from sqlalchemy import DateTime, ForeignKey, Integer, JSON
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import Mapped, mapped_column

from cpv3.db.base import Base, utcnow
# JSON on generic backends, native JSONB on PostgreSQL.
STATE_JSON_TYPE = JSON().with_variant(JSONB(), "postgresql")


class ProjectWorkspace(Base):
    """One workspace row per project: a versioned JSON state document."""

    __tablename__ = "project_workspaces"

    # PK doubles as the FK to the owning project; row dies with the project.
    project_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("projects.id", ondelete="CASCADE"),
        primary_key=True,
    )
    # Optimistic-concurrency counter, bumped on every state write.
    revision: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
    state: Mapped[dict] = mapped_column(STATE_JSON_TYPE, default=dict, nullable=False)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow)
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=utcnow,
        onupdate=utcnow,
    )


# diff --git a/cpv3/modules/project_workspaces/repository.py b/cpv3/modules/project_workspaces/repository.py
# new file mode 100644 index 0000000..20da3c6 --- /dev/null +++ b/cpv3/modules/project_workspaces/repository.py
# @@ -0,0 +1,74 @@
from __future__ import annotations

import uuid

from sqlalchemy import select, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

from cpv3.db.base import utcnow
from cpv3.modules.project_workspaces.models import ProjectWorkspace


class WorkspaceRevisionConflictError(RuntimeError):
    """Raised when the optimistic workspace revision check fails."""


class ProjectWorkspaceRepository:
    """Async persistence layer for ProjectWorkspace rows.

    NOTE(review): methods commit/rollback the session themselves, so the
    caller must not be holding an open transaction of its own.
    """

    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def get_by_project_id(self, project_id: uuid.UUID) -> ProjectWorkspace | None:
        """Fetch the workspace row for a project, or None if absent."""
        result = await self._session.execute(
            select(ProjectWorkspace).where(ProjectWorkspace.project_id == project_id)
        )
        return result.scalar_one_or_none()

    async def create(self, *, project_id: uuid.UUID, state: dict) -> ProjectWorkspace:
        """Insert a fresh workspace at revision 0 and return the persisted row."""
        workspace = ProjectWorkspace(project_id=project_id, revision=0, state=state)
        self._session.add(workspace)
        await self._session.commit()
        await self._session.refresh(workspace)
        return workspace

    async def get_or_create(self, *, project_id: uuid.UUID, state: dict) -> ProjectWorkspace:
        """Return the existing workspace, creating it if missing.

        A concurrent creator losing the PK race is handled by catching
        IntegrityError, rolling back, and re-reading the winner's row;
        the original error is re-raised if the row still cannot be found.
        """
        workspace = await self.get_by_project_id(project_id)
        if workspace is not None:
            return workspace

        try:
            return await self.create(project_id=project_id, state=state)
        except IntegrityError:
            await self._session.rollback()
            workspace = await self.get_by_project_id(project_id)
            if workspace is None:
                raise
            return workspace

    async def update_state(
        self,
        *,
        project_id: uuid.UUID,
        expected_revision: int,
        state: dict,
    ) -> ProjectWorkspace:
        """Compare-and-swap the state document.

        The UPDATE is guarded by ``revision == expected_revision``; if no
        row matched (stale revision — or, indistinguishably, a missing
        row) WorkspaceRevisionConflictError is raised after rollback.
        On success the revision is incremented and the row re-read.
        """
        stmt = (
            update(ProjectWorkspace)
            .where(ProjectWorkspace.project_id == project_id)
            .where(ProjectWorkspace.revision == expected_revision)
            .values(
                state=state,
                revision=expected_revision + 1,
                updated_at=utcnow(),
            )
        )
        result = await self._session.execute(stmt)
        if result.rowcount != 1:
            await self._session.rollback()
            raise WorkspaceRevisionConflictError

        await self._session.commit()
        workspace = await self.get_by_project_id(project_id)
        if workspace is None:
            raise RuntimeError("Workspace disappeared after update")
        return workspace


# diff --git a/cpv3/modules/project_workspaces/router.py b/cpv3/modules/project_workspaces/router.py
# new file mode 100644 index 0000000..eec0b04 --- /dev/null +++ b/cpv3/modules/project_workspaces/router.py
# @@ -0,0 +1,78 @@
from __future__ import annotations

import uuid

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession

from cpv3.db.session import get_db
from cpv3.infrastructure.auth import get_current_user
from cpv3.modules.projects.service import ProjectService
from cpv3.modules.project_workspaces.schemas import (
    ProjectWorkspaceRead,
    WorkflowActionRequest,
)
from cpv3.modules.project_workspaces.service import (
    ProjectWorkspaceRevisionConflictError,
    ProjectWorkspaceService,
    ProjectWorkflowValidationError,
)
from cpv3.modules.users.models import User

router = APIRouter(prefix="/api/projects", tags=["Project Workspaces"])


@router.get("/{project_id}/workspace", response_model=ProjectWorkspaceRead)
@router.get(
    "/{project_id}/workspace/",
    response_model=ProjectWorkspaceRead,
    include_in_schema=False,
)
async def get_project_workspace(
    project_id: uuid.UUID,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> ProjectWorkspaceRead:
    """Return (lazily creating) the workspace for a project.

    404 if the project does not exist; 403 unless the caller is staff or
    the project owner.
    """
    project_service = ProjectService(db)
    project = await project_service.get_project(project_id)
    if project is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Не найдено")

    if not current_user.is_staff and project.owner_id != current_user.id:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Доступ запрещён")

    workspace_service = ProjectWorkspaceService(db)
    return await workspace_service.get_workspace(project=project)


@router.post("/{project_id}/workflow/actions", response_model=ProjectWorkspaceRead)
@router.post(
    "/{project_id}/workflow/actions/",
    response_model=ProjectWorkspaceRead,
    include_in_schema=False,
)
async def dispatch_project_workflow_action(
    project_id: uuid.UUID,
    body: WorkflowActionRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> ProjectWorkspaceRead:
    """Apply a typed workflow action to the project workspace.

    Same 404/403 guards as the GET endpoint; maps a stale-revision error
    to 409 and a workflow validation error to 400.
    """
    project_service = ProjectService(db)
    project = await project_service.get_project(project_id)
    if project is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Не найдено")

    if not current_user.is_staff and project.owner_id != current_user.id:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Доступ запрещён")

    workspace_service = ProjectWorkspaceService(db)
    try:
        return await workspace_service.apply_action(
            project=project,
            requester=current_user,
            action=body,
        )
    except ProjectWorkspaceRevisionConflictError as exc:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(exc)) from exc
    except
ProjectWorkflowValidationError as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc


# diff --git a/cpv3/modules/project_workspaces/schemas.py b/cpv3/modules/project_workspaces/schemas.py
# new file mode 100644 index 0000000..cb485e0 --- /dev/null +++ b/cpv3/modules/project_workspaces/schemas.py
# @@ -0,0 +1,471 @@
from __future__ import annotations

from enum import StrEnum
from typing import Annotated, Literal, get_args
from uuid import UUID

from pydantic import AliasChoices, Field, model_validator

from cpv3.common.schemas import Schema
from cpv3.modules.jobs.schemas import JobTypeEnum


WORKFLOW_VERSION = 1
# NOTE(review): assumes JobTypeEnum is a typing.Literal — get_args() yields
# its member strings; confirm against cpv3.modules.jobs.schemas.
VALID_JOB_TYPES = set(get_args(JobTypeEnum))

# Legacy wizard screen identifiers, kept for the read model's current_screen.
WorkspaceScreenEnum = Literal[
    "upload",
    "verify",
    "silence-settings",
    "processing",
    "fragments",
    "silence-apply-processing",
    "transcription-settings",
    "transcription-processing",
    "subtitle-revision",
    "caption-settings",
    "caption-processing",
    "caption-result",
]


class WorkflowPhase(StrEnum):
    """Coarse stage of the end-to-end editing workflow."""

    INGEST = "INGEST"
    VERIFY = "VERIFY"
    SILENCE = "SILENCE"
    TRANSCRIPTION = "TRANSCRIPTION"
    CAPTIONS = "CAPTIONS"
    DONE = "DONE"


class SilenceWorkflowStatus(StrEnum):
    """Sub-status of the silence detect/review/apply stage."""

    IDLE = "IDLE"
    CONFIGURED = "CONFIGURED"
    DETECTING = "DETECTING"
    REVIEWING = "REVIEWING"
    APPLYING = "APPLYING"
    COMPLETED = "COMPLETED"
    SKIPPED = "SKIPPED"


class TranscriptionWorkflowStatus(StrEnum):
    """Sub-status of the transcription stage."""

    IDLE = "IDLE"
    PROCESSING = "PROCESSING"
    REVIEWING = "REVIEWING"
    COMPLETED = "COMPLETED"


class CaptionsWorkflowStatus(StrEnum):
    """Sub-status of the caption-render stage."""

    IDLE = "IDLE"
    CONFIGURED = "CONFIGURED"
    PROCESSING = "PROCESSING"
    COMPLETED = "COMPLETED"


class ActiveJobState(Schema):
    """The single background job currently driving the workflow, if any."""

    job_id: UUID
    job_type: JobTypeEnum


class WorkspaceViewState(Schema):
    """UI-only selection state: files shown and the one highlighted."""

    used_file_ids: list[UUID] = Field(default_factory=list)
    selected_file_id: UUID | None = None


class SilenceSettingsState(Schema):
    """Tunables for silence detection (defaults mirror the migration backfill)."""

    min_silence_duration_ms: int = 200
    silence_threshold_db: int = 16
    padding_ms: int = 100


class CutRegionState(Schema):
    """A half-open-ish cut interval in milliseconds (start/end as stored)."""

    start_ms: int
    end_ms: int


class SilenceState(Schema):
    """Full silence-stage state; aliases accept the legacy field names."""

    status: SilenceWorkflowStatus = SilenceWorkflowStatus.IDLE
    settings: SilenceSettingsState = Field(default_factory=SilenceSettingsState)
    detect_job_id: UUID | None = None
    detected_segments: list[CutRegionState] = Field(default_factory=list)
    # Accepts legacy key "cut_regions" on input, always emits "reviewed_cuts".
    reviewed_cuts: list[CutRegionState] = Field(
        default_factory=list,
        validation_alias=AliasChoices("reviewed_cuts", "cut_regions"),
        serialization_alias="reviewed_cuts",
    )
    duration_ms: int | None = None
    # Accepts legacy key "output_file_id" on input.
    applied_output_file_id: UUID | None = Field(
        default=None,
        validation_alias=AliasChoices("applied_output_file_id", "output_file_id"),
        serialization_alias="applied_output_file_id",
    )


class TranscriptionRequestState(Schema):
    """Parameters the transcription job was (or will be) started with."""

    engine: Literal["whisper", "google", "salutespeech"] = "whisper"
    language: str | None = None
    model: str = "base"


class TranscriptionState(Schema):
    """Full transcription-stage state."""

    status: TranscriptionWorkflowStatus = TranscriptionWorkflowStatus.IDLE
    request: TranscriptionRequestState = Field(default_factory=TranscriptionRequestState)
    job_id: UUID | None = None
    artifact_id: UUID | None = None
    transcription_id: UUID | None = None
    reviewed: bool = False


class CaptionsState(Schema):
    """Full captions-stage state; alias accepts the legacy job-id key."""

    status: CaptionsWorkflowStatus = CaptionsWorkflowStatus.IDLE
    preset_id: UUID | None = None
    style_config: dict | None = None
    # Accepts legacy key "job_id" on input, always emits "render_job_id".
    render_job_id: UUID | None = Field(
        default=None,
        validation_alias=AliasChoices("render_job_id", "job_id"),
        serialization_alias="render_job_id",
    )
    output_file_id: UUID | None = None


class ProjectWorkspaceState(Schema):
    """The versioned state document persisted in project_workspaces.state."""

    version: int = WORKFLOW_VERSION
    phase: WorkflowPhase = WorkflowPhase.INGEST
    active_job: ActiveJobState | None = None
    source_file_id: UUID | None = None
    workspace_view: WorkspaceViewState = Field(default_factory=WorkspaceViewState)
    silence: SilenceState = Field(default_factory=SilenceState)
    transcription: TranscriptionState = Field(default_factory=TranscriptionState)
    captions: CaptionsState = Field(default_factory=CaptionsState)


class ProjectWorkspaceRead(Schema):
    """API read model: the state document flattened with row metadata."""

    project_id: UUID
    revision: int
    version: int
    phase: WorkflowPhase
    current_screen: WorkspaceScreenEnum
    active_job: ActiveJobState | None
    source_file_id: UUID | None
    workspace_view: WorkspaceViewState
    silence: SilenceState
    transcription: TranscriptionState
    captions: CaptionsState


class WorkflowActionBase(Schema):
    """Common fields for every workflow action: discriminator + CAS revision."""

    type: str
    revision: int


class SetSourceFileAction(WorkflowActionBase):
    type: Literal["SET_SOURCE_FILE"]
    # Accepts legacy key "source_file_id" on input.
    file_id: UUID = Field(
        validation_alias=AliasChoices("file_id", "source_file_id"),
        serialization_alias="file_id",
    )


class ResetSourceFileAction(WorkflowActionBase):
    type: Literal["RESET_SOURCE_FILE"]


class StartMediaConvertAction(WorkflowActionBase):
    type: Literal["START_MEDIA_CONVERT"]
    output_format: str = "mp4"
    out_folder: str = "output_files"


class ConfirmVerifyAction(WorkflowActionBase):
    type: Literal["CONFIRM_VERIFY"]


class SetSilenceSettingsAction(WorkflowActionBase):
    type: Literal["SET_SILENCE_SETTINGS"]
    settings: SilenceSettingsState = Field(default_factory=SilenceSettingsState)

    @model_validator(mode="before")
    @classmethod
    def normalize_settings(cls, data: object) -> object:
        """Lift legacy flat payloads into the nested ``settings`` shape."""
        if not isinstance(data, dict) or "settings" in data:
            return data

        return {
            **data,
            "settings": {
                "min_silence_duration_ms": data.get("min_silence_duration_ms", 200),
                "silence_threshold_db": data.get("silence_threshold_db", 16),
                "padding_ms": data.get("padding_ms", 100),
            },
        }


class StartSilenceDetectAction(WorkflowActionBase):
    type: Literal["START_SILENCE_DETECT"]


class SetSilenceCutsAction(WorkflowActionBase):
    type: Literal["SET_SILENCE_CUTS"]
    # Accepts legacy keys "reviewed_cuts" / "cut_regions" on input.
    cuts: list[CutRegionState] = Field(
        validation_alias=AliasChoices("cuts", "reviewed_cuts", "cut_regions"),
    )


class SkipSilenceApplyAction(WorkflowActionBase):
    type: Literal["SKIP_SILENCE_APPLY"]


class StartSilenceApplyAction(WorkflowActionBase):
    type: Literal["START_SILENCE_APPLY"]
    cuts: list[CutRegionState] | None = None
    out_folder: str = "output_files"
    output_name: str | None = None


class ReopenSilenceReviewAction(WorkflowActionBase):
    type: Literal["REOPEN_SILENCE_REVIEW"]


class StartTranscriptionAction(WorkflowActionBase):
    type: Literal["START_TRANSCRIPTION"]
    engine: Literal["whisper", "google", "salutespeech"] = "whisper"
    language: str | None = None
    model: str = "base"
    request: TranscriptionRequestState | None = None

    @model_validator(mode="after")
    def normalize_request(self) -> "StartTranscriptionAction":
        """Keep the flat fields and the nested ``request`` in sync.

        If no nested request was sent, one is built from the flat fields;
        otherwise the nested request wins and overwrites the flat fields.
        """
        if self.request is None:
            self.request = TranscriptionRequestState(
                engine=self.engine,
                language=self.language,
                model=self.model,
            )
            return self

        self.engine = self.request.engine
        self.language = self.request.language
        self.model = self.request.model
        return self


class ReopenTranscriptionConfigAction(WorkflowActionBase):
    type: Literal["REOPEN_TRANSCRIPTION_CONFIG"]


class MarkTranscriptionReviewedAction(WorkflowActionBase):
    type: Literal["MARK_TRANSCRIPTION_REVIEWED"]


class SelectCaptionPresetAction(WorkflowActionBase):
    type: Literal["SELECT_CAPTION_PRESET"]
    preset_id: UUID | None = None
    style_config: dict | None = None


class StartCaptionRenderAction(WorkflowActionBase):
    type: Literal["START_CAPTION_RENDER"]
    folder: str = "output_files"


class ReopenCaptionConfigAction(WorkflowActionBase):
    type: Literal["REOPEN_CAPTION_CONFIG"]


class SetWorkspaceViewAction(WorkflowActionBase):
    type: Literal["SET_WORKSPACE_VIEW"]
    workspace_view: WorkspaceViewState

    @model_validator(mode="before")
    @classmethod
    def normalize_workspace_view(cls, data: object) -> object:
        """Lift legacy flat payloads into the nested ``workspace_view`` shape."""
        if not isinstance(data, dict) or "workspace_view" in data:
            return data

        return {
            **data,
            "workspace_view": {
                "used_file_ids": data.get("used_file_ids", []),
                "selected_file_id": data.get("selected_file_id"),
            },
        }


# Discriminated union: FastAPI/Pydantic pick the concrete action by "type".
WorkflowActionRequest = Annotated[
    (
        SetSourceFileAction
        | ResetSourceFileAction
        | StartMediaConvertAction
        | ConfirmVerifyAction
        | SetSilenceSettingsAction
        | StartSilenceDetectAction
        | SetSilenceCutsAction
        | SkipSilenceApplyAction
        | StartSilenceApplyAction
        | ReopenSilenceReviewAction
        | StartTranscriptionAction
        | ReopenTranscriptionConfigAction
        | MarkTranscriptionReviewedAction
        | SelectCaptionPresetAction
        | StartCaptionRenderAction
        | ReopenCaptionConfigAction
        | SetWorkspaceViewAction
    ),
    Field(discriminator="type"),
]


def build_default_workspace_state() -> ProjectWorkspaceState:
    """Return a fresh v1 state with all stages idle."""
    return ProjectWorkspaceState()


def build_workspace_state_from_legacy(
    legacy_workspace_state: dict | None,
) -> ProjectWorkspaceState:
    """Typed counterpart of the migration's ``_backfill_state``.

    Best-effort: malformed or missing legacy fields are ignored. Unlike
    the migration copy, this version validates the active job type
    against VALID_JOB_TYPES and re-derives phase/status from the active
    job and produced artifacts at the end.
    """
    state = build_default_workspace_state()
    if not isinstance(legacy_workspace_state, dict):
        return state

    wizard = legacy_workspace_state.get("wizard")
    if not isinstance(wizard, dict):
        wizard = {}

    source_file_id = _parse_uuid(wizard.get("primary_file_id"))
    if source_file_id is not None:
        state.source_file_id = source_file_id

    # Collect unique legacy file ids, keeping first-seen order.
    used_file_ids: list[UUID] = []
    used_files = legacy_workspace_state.get("used_files")
    if isinstance(used_files, list):
        for item in used_files:
            if not isinstance(item, dict):
                continue
            file_id = _parse_uuid(item.get("id"))
            if file_id is not None and file_id not in used_file_ids:
                used_file_ids.append(file_id)

    # The source file is always listed, and listed first.
    if source_file_id is not None and source_file_id not in used_file_ids:
        used_file_ids.insert(0, source_file_id)

    state.workspace_view.used_file_ids = used_file_ids
    if source_file_id is not None and source_file_id in used_file_ids:
        state.workspace_view.selected_file_id = source_file_id

    active_job_id = _parse_uuid(wizard.get("active_job_id"))
    active_job_type = wizard.get("active_job_type")
    if active_job_id is not None and active_job_type in VALID_JOB_TYPES:
        state.active_job = ActiveJobState(
            job_id=active_job_id,
            job_type=active_job_type,
        )

    silence_job_id = _parse_uuid(wizard.get("silence_job_id"))
    if silence_job_id is not None:
        state.silence.detect_job_id = silence_job_id

    transcription_artifact_id = _parse_uuid(wizard.get("transcription_artifact_id"))
    if transcription_artifact_id is not None:
        state.transcription.artifact_id = transcription_artifact_id

    caption_preset_id = _parse_uuid(wizard.get("caption_preset_id"))
    if caption_preset_id is not None:
        state.captions.preset_id = caption_preset_id

    caption_style_config = wizard.get("caption_style_config")
    if isinstance(caption_style_config, dict):
        state.captions.style_config = caption_style_config

    captioned_video_file_id = _parse_uuid(wizard.get("captioned_video_file_id"))
    if captioned_video_file_id is not None:
        state.captions.output_file_id = captioned_video_file_id

    silence_settings = wizard.get("silence_settings")
    if isinstance(silence_settings, dict):
        state.silence.settings = SilenceSettingsState.model_validate(silence_settings)
        state.silence.status = SilenceWorkflowStatus.CONFIGURED

    # Map the legacy wizard screen onto phase, then refine sub-statuses.
    current_step = wizard.get("current_step")
    step_phase_map = {
        "upload": WorkflowPhase.INGEST,
        "verify": WorkflowPhase.VERIFY,
        "silence-settings": WorkflowPhase.SILENCE,
        "processing": WorkflowPhase.SILENCE,
        "fragments": WorkflowPhase.SILENCE,
        "silence-apply-processing": WorkflowPhase.SILENCE,
        "transcription-settings": WorkflowPhase.TRANSCRIPTION,
        "transcription-processing": WorkflowPhase.TRANSCRIPTION,
        "subtitle-revision": WorkflowPhase.TRANSCRIPTION,
        "caption-settings": WorkflowPhase.CAPTIONS,
        "caption-processing": WorkflowPhase.CAPTIONS,
        "caption-result": WorkflowPhase.DONE,
    }
    if current_step in step_phase_map:
        state.phase = step_phase_map[current_step]

    if current_step == "processing":
        state.silence.status = SilenceWorkflowStatus.DETECTING
    elif current_step == "fragments":
        state.silence.status = SilenceWorkflowStatus.REVIEWING
    elif current_step == "silence-apply-processing":
        state.silence.status = SilenceWorkflowStatus.APPLYING
    elif current_step == "transcription-processing":
        state.transcription.status = TranscriptionWorkflowStatus.PROCESSING
    elif current_step == "subtitle-revision":
        state.transcription.status = TranscriptionWorkflowStatus.REVIEWING
    elif current_step == "caption-settings":
        state.captions.status = CaptionsWorkflowStatus.CONFIGURED
    elif current_step == "caption-processing":
        state.captions.status = CaptionsWorkflowStatus.PROCESSING
    elif current_step == "caption-result":
        state.captions.status = CaptionsWorkflowStatus.COMPLETED

    # A still-running job overrides the step-derived phase/status.
    if state.active_job is not None:
        if state.active_job.job_type == "MEDIA_CONVERT":
            state.phase = WorkflowPhase.VERIFY
        elif state.active_job.job_type == "SILENCE_DETECT":
            state.phase = WorkflowPhase.SILENCE
            state.silence.status = SilenceWorkflowStatus.DETECTING
            state.silence.detect_job_id = state.active_job.job_id
        elif state.active_job.job_type == "SILENCE_APPLY":
            state.phase = WorkflowPhase.SILENCE
            state.silence.status = SilenceWorkflowStatus.APPLYING
        elif state.active_job.job_type == "TRANSCRIPTION_GENERATE":
            state.phase = WorkflowPhase.TRANSCRIPTION
            state.transcription.status = TranscriptionWorkflowStatus.PROCESSING
            state.transcription.job_id = state.active_job.job_id
        elif state.active_job.job_type == "CAPTIONS_GENERATE":
            state.phase = WorkflowPhase.CAPTIONS
            state.captions.status = CaptionsWorkflowStatus.PROCESSING
            state.captions.render_job_id = state.active_job.job_id

    # Finally, infer progress from produced artifacts (strongest evidence last).
    if captioned_video_file_id is not None:
        state.phase = WorkflowPhase.DONE
        state.captions.status = CaptionsWorkflowStatus.COMPLETED
    elif transcription_artifact_id is not None and (
        state.transcription.status == TranscriptionWorkflowStatus.IDLE
    ):
        state.phase = WorkflowPhase.TRANSCRIPTION
        state.transcription.status = TranscriptionWorkflowStatus.REVIEWING
    elif silence_job_id is not None and state.silence.status == SilenceWorkflowStatus.IDLE:
        state.phase = WorkflowPhase.SILENCE
        state.silence.status = SilenceWorkflowStatus.REVIEWING
    elif source_file_id is not None and state.phase == WorkflowPhase.INGEST:
        state.phase = WorkflowPhase.VERIFY

    return state


def _parse_uuid(value: object) -> UUID | None:
    """Coerce an arbitrary value to UUID, returning None when unparseable."""
    if value is None:
        return None
    try:
        return UUID(str(value))
    except (TypeError, ValueError):
        return None


# Backward-compatible aliases used by existing tests and frontend hand-written types.
TaskWorkflowActiveJob = ActiveJobState
SilenceSettingsPayload = SilenceSettingsState
WorkflowSilenceState = SilenceState
WorkflowTranscriptionRequest = TranscriptionRequestState


# diff --git a/cpv3/modules/project_workspaces/service.py b/cpv3/modules/project_workspaces/service.py
# new file mode 100644 index 0000000..bce09d0 --- /dev/null +++ b/cpv3/modules/project_workspaces/service.py
# @@ -0,0 +1,824 @@
from __future__ import annotations

from dataclasses import dataclass
from uuid import UUID

from sqlalchemy.ext.asyncio import AsyncSession

from cpv3.modules.captions.models import CaptionPreset
from cpv3.modules.captions.repository import CaptionPresetRepository
from cpv3.modules.files.models import File
from cpv3.modules.files.repository import FileRepository
from cpv3.modules.jobs.models import Job
from cpv3.modules.media.repository import ArtifactRepository
from cpv3.modules.project_workspaces.models import ProjectWorkspace
from cpv3.modules.project_workspaces.repository import (
    ProjectWorkspaceRepository,
    WorkspaceRevisionConflictError as RepositoryWorkspaceRevisionConflictError,
)
from cpv3.modules.project_workspaces.schemas import (
    ActiveJobState,
    CaptionsState,
    CaptionsWorkflowStatus,
    ConfirmVerifyAction,
    CutRegionState,
    MarkTranscriptionReviewedAction,
    ProjectWorkspaceRead,
    ProjectWorkspaceState,
    ReopenCaptionConfigAction,
    ReopenSilenceReviewAction,
    ReopenTranscriptionConfigAction,
    ResetSourceFileAction,
    SelectCaptionPresetAction,
    SetSilenceCutsAction,
    SetSilenceSettingsAction,
    SetSourceFileAction,
    SetWorkspaceViewAction,
    SilenceState,
    SilenceWorkflowStatus,
    SkipSilenceApplyAction,
    StartCaptionRenderAction,
    StartMediaConvertAction,
    StartSilenceApplyAction,
    StartSilenceDetectAction,
    StartTranscriptionAction,
    TranscriptionState,
    TranscriptionWorkflowStatus,
    WorkflowActionRequest,
    WorkflowPhase,
    WorkspaceScreenEnum,
    WorkspaceViewState,
    build_workspace_state_from_legacy,
)
from cpv3.modules.projects.models import Project
from cpv3.modules.projects.repository import ProjectRepository
from cpv3.modules.tasks.schemas import (
    CaptionsGenerateRequest,
    MediaConvertRequest,
    SilenceApplyRequest,
    SilenceDetectRequest,
    TranscriptionGenerateRequest,
)
from cpv3.modules.transcription.repository import TranscriptionRepository
from cpv3.modules.users.models import User


class ProjectWorkspaceRevisionConflictError(RuntimeError):
    """Raised when an action carries a stale workspace revision."""

    pass


class ProjectWorkflowValidationError(ValueError):
    """Raised when an action is invalid for the current workflow state."""

    pass


# Backward-compatible alias for callers importing the old name.
WorkspaceRevisionConflictError = ProjectWorkspaceRevisionConflictError


@dataclass(slots=True)
class _WorkspaceItemValidationResult:
    """Internal carrier: a validated item id plus its File row (if loaded)."""

    item_id: UUID
    file: File | None = None


class ProjectWorkspaceService:
    """Orchestrates workspace state: reads, typed actions, and job-event updates."""

    def __init__(self, session: AsyncSession) -> None:
        self._session = session
        self._repo = ProjectWorkspaceRepository(session)
        self._project_repo = ProjectRepository(session)
        self._file_repo = FileRepository(session)
        self._artifact_repo = ArtifactRepository(session)
        self._transcription_repo = TranscriptionRepository(session)
        self._caption_preset_repo = CaptionPresetRepository(session)
        # Indirection so tests can swap the task-service construction.
        self._task_service_factory = self._build_task_service

    async def create_for_project(self, project: Project) -> ProjectWorkspaceRead:
        """Ensure a workspace exists for the project and return its read model."""
        workspace = await self._get_or_create_workspace(project=project)
        return self._to_read_model(workspace)

    async def get_workspace(self, *, project: Project) -> ProjectWorkspaceRead:
        """Return the (lazily created) workspace read model."""
        workspace = await self._get_or_create_workspace(project=project)
        return self._to_read_model(workspace)

    async def apply_action(
        self,
        *,
        project: Project,
        requester: User,
        action: WorkflowActionRequest,
    ) -> ProjectWorkspaceRead:
        """Apply a typed action under optimistic concurrency control.

        Raises ProjectWorkspaceRevisionConflictError if the action's
        revision does not match the stored one (checked up front and
        again inside the CAS write).
        """
        workspace = await self._get_or_create_workspace(project=project)
        if workspace.revision != action.revision:
            raise ProjectWorkspaceRevisionConflictError("Версия рабочего пространства устарела")

        state = ProjectWorkspaceState.model_validate(workspace.state)
        next_state = await self._apply_action_to_state(
            project=project,
            requester=requester,
            state=state,
            action=action,
        )
        workspace = await self._save_workspace_state(
            project_id=project.id,
            expected_revision=workspace.revision,
            state=next_state,
        )
        return self._to_read_model(workspace)

    async def handle_job_update(self, *, job: Job) -> ProjectWorkspaceRead | None:
        """Fold a job status change into the owning project's workspace.

        Returns None when the job has no project or the project is gone.
        A lost CAS race is tolerated: the freshest workspace is re-read
        and returned instead of retrying.
        """
        if job.project_id is None:
            return None

        project = await self._project_repo.get_by_id(job.project_id)
        if project is None:
            return None

        workspace = await self._get_or_create_workspace(project=project)
        state = ProjectWorkspaceState.model_validate(workspace.state)
        next_state = self._apply_job_event_to_state(state, job)

        # No-op events skip the write (and the revision bump) entirely.
        if next_state.model_dump(mode="json") == state.model_dump(mode="json"):
            return self._to_read_model(workspace)

        try:
            workspace = await self._save_workspace_state(
                project_id=project.id,
                expected_revision=workspace.revision,
                state=next_state,
            )
        except ProjectWorkspaceRevisionConflictError:
            workspace = await self._get_or_create_workspace(project=project)

        return self._to_read_model(workspace)

    async def apply_job_update(
        self,
        *,
        project: Project,
        job: Job,
    ) -> ProjectWorkspaceRead | None:
        """Same as handle_job_update, for callers that already hold the project."""
        workspace = await self._get_or_create_workspace(project=project)
        state = ProjectWorkspaceState.model_validate(workspace.state)
        next_state = self._apply_job_event_to_state(state, job)

        if next_state.model_dump(mode="json") == state.model_dump(mode="json"):
            return self._to_read_model(workspace)

        try:
            workspace = await self._save_workspace_state(
                project_id=project.id,
                expected_revision=workspace.revision,
                state=next_state,
            )
        except ProjectWorkspaceRevisionConflictError:
            workspace = await self._get_or_create_workspace(project=project)

        return self._to_read_model(workspace)

    async def apply_job_event(self, job: Job) -> None:
        """Fire-and-forget wrapper around handle_job_update."""
        await self.handle_job_update(job=job)

    async def _get_or_create_workspace(self, *, project: Project) -> ProjectWorkspace:
        """Fetch the workspace, creating it from legacy project state if absent."""
        workspace = await self._repo.get_by_project_id(project.id)
        if workspace is not None:
            return workspace

        initial_state = build_workspace_state_from_legacy(getattr(project, "workspace_state", None))
        state_payload = initial_state.model_dump(mode="json")
        # Prefer the race-safe path when the repository provides it.
        get_or_create = getattr(self._repo, "get_or_create", None)
        if callable(get_or_create):
            return await get_or_create(project_id=project.id, state=state_payload)
        return await self._repo.create(project_id=project.id, state=state_payload)

    async def _save_workspace_state(
        self,
        *,
        project_id: UUID,
        expected_revision: int,
        state: ProjectWorkspaceState,
    ) -> ProjectWorkspace:
        """Persist a state document, translating the repository conflict error."""
        try:
            return await self._repo.update_state(
                project_id=project_id,
                expected_revision=expected_revision,
                state=state.model_dump(mode="json"),
            )
        except RepositoryWorkspaceRevisionConflictError as exc:
            raise ProjectWorkspaceRevisionConflictError(
                "Версия рабочего пространства устарела"
            ) from exc

    async def _apply_action_to_state(
        self,
        *,
        project: Project,
        requester: User,
        state: ProjectWorkspaceState,
        action: WorkflowActionRequest,
    ) -> ProjectWorkspaceState:
        # Work on a deep copy so a validation failure leaves `state` untouched.
        next_state = state.model_copy(deep=True)

        if isinstance(action,
# NOTE(review): SOURCE truncates here, mid-dispatch — remainder of
# _apply_action_to_state (and the rest of ProjectWorkspaceService) is not
# visible in this chunk.
SetSourceFileAction): + file_result = await self._validate_accessible_file( + requester=requester, + project=project, + file_id=action.file_id, + ) + next_state.phase = WorkflowPhase.VERIFY + next_state.active_job = None + next_state.source_file_id = file_result.item_id + next_state.silence = SilenceState() + next_state.transcription = TranscriptionState() + next_state.captions = CaptionsState() + next_state.workspace_view = WorkspaceViewState( + used_file_ids=[file_result.item_id], + selected_file_id=file_result.item_id, + ) + return next_state + + if isinstance(action, ResetSourceFileAction): + return ProjectWorkspaceState(version=state.version) + + if isinstance(action, StartMediaConvertAction): + self._require_phase(next_state, WorkflowPhase.VERIFY) + source_file = await self._require_source_file( + next_state, + project=project, + requester=requester, + ) + task_service = self._task_service_factory() + response = await task_service.submit_media_convert( + requester=requester, + request=MediaConvertRequest( + file_key=source_file.path, + out_folder=action.out_folder, + output_format=action.output_format, + project_id=project.id, + ), + ) + next_state.active_job = ActiveJobState( + job_id=response.job_id, + job_type="MEDIA_CONVERT", + ) + return next_state + + if isinstance(action, ConfirmVerifyAction): + self._require_phase(next_state, WorkflowPhase.VERIFY) + await self._require_source_file( + next_state, + project=project, + requester=requester, + ) + next_state.phase = WorkflowPhase.SILENCE + next_state.active_job = None + next_state.silence.status = SilenceWorkflowStatus.CONFIGURED + return next_state + + if isinstance(action, SetSilenceSettingsAction): + self._require_phase(next_state, WorkflowPhase.SILENCE) + if next_state.silence.status in { + SilenceWorkflowStatus.DETECTING, + SilenceWorkflowStatus.APPLYING, + }: + raise ProjectWorkflowValidationError("Нельзя менять настройки во время обработки") + next_state.silence.settings = action.settings + 
next_state.silence.status = SilenceWorkflowStatus.CONFIGURED + return next_state + + if isinstance(action, StartSilenceDetectAction): + self._require_phase(next_state, WorkflowPhase.SILENCE) + source_file = await self._require_source_file( + next_state, + project=project, + requester=requester, + ) + task_service = self._task_service_factory() + response = await task_service.submit_silence_detect( + requester=requester, + request=SilenceDetectRequest( + file_key=source_file.path, + project_id=project.id, + min_silence_duration_ms=next_state.silence.settings.min_silence_duration_ms, + silence_threshold_db=next_state.silence.settings.silence_threshold_db, + padding_ms=next_state.silence.settings.padding_ms, + ), + ) + next_state.active_job = ActiveJobState( + job_id=response.job_id, + job_type="SILENCE_DETECT", + ) + next_state.silence.status = SilenceWorkflowStatus.DETECTING + next_state.silence.detect_job_id = response.job_id + next_state.silence.detected_segments = [] + next_state.silence.reviewed_cuts = [] + next_state.silence.duration_ms = None + next_state.silence.applied_output_file_id = None + return next_state + + if isinstance(action, SetSilenceCutsAction): + self._require_phase(next_state, WorkflowPhase.SILENCE) + next_state.silence.reviewed_cuts = [ + CutRegionState.model_validate(cut) for cut in action.cuts + ] + next_state.silence.status = SilenceWorkflowStatus.REVIEWING + return next_state + + if isinstance(action, SkipSilenceApplyAction): + self._require_phase(next_state, WorkflowPhase.SILENCE) + next_state.phase = WorkflowPhase.TRANSCRIPTION + next_state.active_job = None + next_state.silence.status = SilenceWorkflowStatus.SKIPPED + return next_state + + if isinstance(action, StartSilenceApplyAction): + self._require_phase(next_state, WorkflowPhase.SILENCE) + source_file = await self._require_source_file( + next_state, + project=project, + requester=requester, + ) + if action.cuts is not None: + next_state.silence.reviewed_cuts = [ + 
CutRegionState.model_validate(cut) for cut in action.cuts + ] + if not next_state.silence.reviewed_cuts: + raise ProjectWorkflowValidationError("Нет выбранных фрагментов для применения") + task_service = self._task_service_factory() + response = await task_service.submit_silence_apply( + requester=requester, + request=SilenceApplyRequest( + file_key=source_file.path, + out_folder=action.out_folder, + project_id=project.id, + output_name=action.output_name, + cuts=[cut.model_dump(mode="json") for cut in next_state.silence.reviewed_cuts], + ), + ) + next_state.active_job = ActiveJobState( + job_id=response.job_id, + job_type="SILENCE_APPLY", + ) + next_state.silence.status = SilenceWorkflowStatus.APPLYING + return next_state + + if isinstance(action, ReopenSilenceReviewAction): + if not next_state.silence.detected_segments: + raise ProjectWorkflowValidationError("Нет результатов детекции тишины") + next_state.phase = WorkflowPhase.SILENCE + next_state.active_job = None + next_state.silence.status = SilenceWorkflowStatus.REVIEWING + return next_state + + if isinstance(action, StartTranscriptionAction): + if next_state.phase not in {WorkflowPhase.SILENCE, WorkflowPhase.TRANSCRIPTION}: + raise ProjectWorkflowValidationError("Транскрибация пока недоступна") + transcription_file = await self._resolve_transcription_input_file( + next_state, + project=project, + requester=requester, + ) + request_payload = action.request or TranscriptionGenerateRequest( + engine=action.engine, + language=action.language, + model=action.model, + file_key=transcription_file.path, + project_id=project.id, + ) + task_service = self._task_service_factory() + response = await task_service.submit_transcription_generate( + requester=requester, + request=TranscriptionGenerateRequest( + file_key=transcription_file.path, + project_id=project.id, + engine=request_payload.engine, + language=request_payload.language, + model=request_payload.model, + ), + ) + next_state.phase = WorkflowPhase.TRANSCRIPTION 
+ next_state.active_job = ActiveJobState( + job_id=response.job_id, + job_type="TRANSCRIPTION_GENERATE", + ) + next_state.transcription.status = TranscriptionWorkflowStatus.PROCESSING + next_state.transcription.request = action.request or next_state.transcription.request + next_state.transcription.job_id = response.job_id + next_state.transcription.artifact_id = None + next_state.transcription.transcription_id = None + next_state.transcription.reviewed = False + next_state.captions = CaptionsState() + return next_state + + if isinstance(action, ReopenTranscriptionConfigAction): + if next_state.phase not in { + WorkflowPhase.TRANSCRIPTION, + WorkflowPhase.CAPTIONS, + WorkflowPhase.DONE, + }: + raise ProjectWorkflowValidationError("Нельзя вернуться к настройкам транскрибации") + next_state.phase = WorkflowPhase.TRANSCRIPTION + next_state.active_job = None + next_state.transcription = TranscriptionState( + request=next_state.transcription.request, + ) + next_state.captions = CaptionsState() + return next_state + + if isinstance(action, MarkTranscriptionReviewedAction): + self._require_phase(next_state, WorkflowPhase.TRANSCRIPTION) + if next_state.transcription.transcription_id is None: + raise ProjectWorkflowValidationError("Сначала завершите транскрибацию") + next_state.transcription.reviewed = True + next_state.transcription.status = TranscriptionWorkflowStatus.COMPLETED + next_state.phase = WorkflowPhase.CAPTIONS + if next_state.captions.status == CaptionsWorkflowStatus.IDLE: + next_state.captions.status = CaptionsWorkflowStatus.CONFIGURED + return next_state + + if isinstance(action, SelectCaptionPresetAction): + if next_state.phase not in {WorkflowPhase.CAPTIONS, WorkflowPhase.DONE}: + raise ProjectWorkflowValidationError("Сначала завершите транскрибацию") + preset = await self._validate_caption_preset( + requester=requester, + preset_id=action.preset_id, + ) + next_state.phase = WorkflowPhase.CAPTIONS + next_state.active_job = None + 
next_state.captions.preset_id = action.preset_id + next_state.captions.style_config = ( + action.style_config + if action.style_config is not None + else (preset.style_config if preset is not None else None) + ) + next_state.captions.render_job_id = None + next_state.captions.output_file_id = None + next_state.captions.status = CaptionsWorkflowStatus.CONFIGURED + return next_state + + if isinstance(action, StartCaptionRenderAction): + if next_state.phase not in {WorkflowPhase.CAPTIONS, WorkflowPhase.DONE}: + raise ProjectWorkflowValidationError("Рендер субтитров пока недоступен") + if next_state.transcription.transcription_id is None: + raise ProjectWorkflowValidationError("Сначала завершите транскрибацию") + video_file = await self._resolve_caption_video_file( + next_state, + project=project, + requester=requester, + ) + transcription = await self._transcription_repo.get_by_id( + next_state.transcription.transcription_id + ) + if transcription is None: + raise ProjectWorkflowValidationError("Транскрипция не найдена") + + task_service = self._task_service_factory() + response = await task_service.submit_captions_generate( + requester=requester, + request=CaptionsGenerateRequest( + video_s3_path=video_file.path, + folder=action.folder, + transcription_id=transcription.id, + project_id=project.id, + preset_id=next_state.captions.preset_id, + style_config=next_state.captions.style_config, + ), + ) + next_state.phase = WorkflowPhase.CAPTIONS + next_state.active_job = ActiveJobState( + job_id=response.job_id, + job_type="CAPTIONS_GENERATE", + ) + next_state.captions.render_job_id = response.job_id + next_state.captions.output_file_id = None + next_state.captions.status = CaptionsWorkflowStatus.PROCESSING + return next_state + + if isinstance(action, ReopenCaptionConfigAction): + if next_state.phase not in {WorkflowPhase.CAPTIONS, WorkflowPhase.DONE}: + raise ProjectWorkflowValidationError("Нельзя вернуться к настройкам рендера") + next_state.phase = 
WorkflowPhase.CAPTIONS + next_state.active_job = None + next_state.captions.render_job_id = None + next_state.captions.output_file_id = None + next_state.captions.status = CaptionsWorkflowStatus.CONFIGURED + return next_state + + if isinstance(action, SetWorkspaceViewAction): + await self._validate_workspace_view_items( + requester=requester, + project=project, + used_file_ids=action.workspace_view.used_file_ids, + selected_file_id=action.workspace_view.selected_file_id, + ) + next_state.workspace_view = action.workspace_view + return next_state + + raise ProjectWorkflowValidationError("Неподдерживаемое действие") + + def _apply_job_event_to_state( + self, + state: ProjectWorkspaceState, + job: Job, + ) -> ProjectWorkspaceState: + next_state = state.model_copy(deep=True) + output_data = job.output_data or {} + + if next_state.active_job is not None and next_state.active_job.job_id == job.id: + next_state.active_job = None + + if job.status in {"FAILED", "CANCELLED"}: + if job.job_type == "MEDIA_CONVERT": + next_state.phase = WorkflowPhase.VERIFY + elif job.job_type == "SILENCE_DETECT": + next_state.phase = WorkflowPhase.SILENCE + next_state.silence.status = SilenceWorkflowStatus.CONFIGURED + next_state.silence.detect_job_id = None + elif job.job_type == "SILENCE_APPLY": + next_state.phase = WorkflowPhase.SILENCE + next_state.silence.status = ( + SilenceWorkflowStatus.REVIEWING + if next_state.silence.detected_segments + else SilenceWorkflowStatus.CONFIGURED + ) + elif job.job_type == "TRANSCRIPTION_GENERATE": + next_state.phase = WorkflowPhase.TRANSCRIPTION + next_state.transcription.status = TranscriptionWorkflowStatus.IDLE + next_state.transcription.job_id = None + elif job.job_type == "CAPTIONS_GENERATE": + next_state.phase = WorkflowPhase.CAPTIONS + next_state.captions.status = CaptionsWorkflowStatus.CONFIGURED + next_state.captions.render_job_id = None + return next_state + + if job.status != "DONE": + return next_state + + if job.job_type == "MEDIA_CONVERT": 
+ converted_file_id = self._parse_uuid(output_data.get("file_id")) + if converted_file_id is None: + return next_state + next_state.source_file_id = converted_file_id + next_state.phase = WorkflowPhase.VERIFY + self._append_used_file(next_state, converted_file_id) + next_state.workspace_view.selected_file_id = converted_file_id + return next_state + + if job.job_type == "SILENCE_DETECT": + silent_segments = output_data.get("silent_segments") + if not isinstance(silent_segments, list): + return next_state + cut_regions = [CutRegionState.model_validate(item) for item in silent_segments] + next_state.phase = WorkflowPhase.SILENCE + next_state.silence.detect_job_id = job.id + next_state.silence.detected_segments = cut_regions + next_state.silence.reviewed_cuts = cut_regions + next_state.silence.duration_ms = self._parse_int(output_data.get("duration_ms")) + next_state.silence.status = SilenceWorkflowStatus.REVIEWING + return next_state + + if job.job_type == "SILENCE_APPLY": + output_file_id = self._parse_uuid(output_data.get("file_id")) + if output_file_id is None: + return next_state + next_state.phase = WorkflowPhase.TRANSCRIPTION + next_state.silence.applied_output_file_id = output_file_id + next_state.silence.status = SilenceWorkflowStatus.COMPLETED + self._append_used_file(next_state, output_file_id) + next_state.workspace_view.selected_file_id = output_file_id + return next_state + + if job.job_type == "TRANSCRIPTION_GENERATE": + artifact_id = self._parse_uuid(output_data.get("artifact_id")) + transcription_id = self._parse_uuid(output_data.get("transcription_id")) + if artifact_id is None or transcription_id is None: + return next_state + next_state.phase = WorkflowPhase.TRANSCRIPTION + next_state.transcription.status = TranscriptionWorkflowStatus.REVIEWING + next_state.transcription.job_id = job.id + next_state.transcription.artifact_id = artifact_id + next_state.transcription.transcription_id = transcription_id + next_state.transcription.reviewed = False + 
return next_state + + if job.job_type == "CAPTIONS_GENERATE": + output_file_id = self._parse_uuid(output_data.get("file_id")) + if output_file_id is None: + return next_state + next_state.phase = WorkflowPhase.DONE + next_state.captions.status = CaptionsWorkflowStatus.COMPLETED + next_state.captions.render_job_id = job.id + next_state.captions.output_file_id = output_file_id + self._append_used_file(next_state, output_file_id) + next_state.workspace_view.selected_file_id = output_file_id + return next_state + + return next_state + + def _to_read_model(self, workspace: ProjectWorkspace) -> ProjectWorkspaceRead: + state = ProjectWorkspaceState.model_validate(workspace.state) + return ProjectWorkspaceRead( + project_id=workspace.project_id, + revision=workspace.revision, + version=state.version, + phase=state.phase, + current_screen=self._derive_current_screen(state), + active_job=state.active_job, + source_file_id=state.source_file_id, + workspace_view=state.workspace_view, + silence=state.silence, + transcription=state.transcription, + captions=state.captions, + ) + + def _derive_current_screen(self, state: ProjectWorkspaceState) -> WorkspaceScreenEnum: + if state.phase == WorkflowPhase.INGEST: + return "upload" + if state.phase == WorkflowPhase.VERIFY: + return "verify" + if state.phase == WorkflowPhase.SILENCE: + if state.silence.status == SilenceWorkflowStatus.DETECTING: + return "processing" + if state.silence.status == SilenceWorkflowStatus.REVIEWING: + return "fragments" + if state.silence.status == SilenceWorkflowStatus.APPLYING: + return "silence-apply-processing" + return "silence-settings" + if state.phase == WorkflowPhase.TRANSCRIPTION: + if state.transcription.status == TranscriptionWorkflowStatus.PROCESSING: + return "transcription-processing" + if state.transcription.status in { + TranscriptionWorkflowStatus.REVIEWING, + TranscriptionWorkflowStatus.COMPLETED, + }: + return "subtitle-revision" + return "transcription-settings" + if state.phase == 
WorkflowPhase.CAPTIONS: + if state.captions.status == CaptionsWorkflowStatus.PROCESSING: + return "caption-processing" + if state.captions.status == CaptionsWorkflowStatus.COMPLETED: + return "caption-result" + return "caption-settings" + return "caption-result" + + async def _validate_accessible_file( + self, + *, + requester: User, + project: Project, + file_id: UUID, + ) -> _WorkspaceItemValidationResult: + file = await self._file_repo.get_by_id(file_id) + if file is None: + raise ProjectWorkflowValidationError("Файл не найден") + if file.project_id not in {None, project.id}: + raise ProjectWorkflowValidationError("Файл не относится к текущему проекту") + if not requester.is_staff and file.owner_id not in {None, requester.id}: + raise ProjectWorkflowValidationError("Файл недоступен") + return _WorkspaceItemValidationResult(item_id=file.id, file=file) + + async def _require_source_file( + self, + state: ProjectWorkspaceState, + *, + project: Project, + requester: User, + ) -> File: + if state.source_file_id is None: + raise ProjectWorkflowValidationError("Сначала выберите исходный файл") + validation = await self._validate_accessible_file( + requester=requester, + project=project, + file_id=state.source_file_id, + ) + if validation.file is None: + raise ProjectWorkflowValidationError("Исходный файл не найден") + return validation.file + + async def _resolve_transcription_input_file( + self, + state: ProjectWorkspaceState, + *, + project: Project, + requester: User, + ) -> File: + if state.silence.applied_output_file_id is not None: + validation = await self._validate_accessible_file( + requester=requester, + project=project, + file_id=state.silence.applied_output_file_id, + ) + if validation.file is not None: + return validation.file + return await self._require_source_file( + state, + project=project, + requester=requester, + ) + + async def _resolve_caption_video_file( + self, + state: ProjectWorkspaceState, + *, + project: Project, + requester: User, + ) -> 
File: + return await self._resolve_transcription_input_file( + state, + project=project, + requester=requester, + ) + + async def _validate_caption_preset( + self, + *, + requester: User, + preset_id: UUID | None, + ) -> CaptionPreset | None: + if preset_id is None: + return None + preset = await self._caption_preset_repo.get_by_id(preset_id) + if preset is None: + raise ProjectWorkflowValidationError("Пресет субтитров не найден") + if not requester.is_staff and not preset.is_system and preset.user_id != requester.id: + raise ProjectWorkflowValidationError("Пресет субтитров недоступен") + return preset + + async def _validate_workspace_view_items( + self, + *, + requester: User, + project: Project, + used_file_ids: list[UUID], + selected_file_id: UUID | None, + ) -> None: + if selected_file_id is not None and selected_file_id not in used_file_ids: + raise ProjectWorkflowValidationError( + "Выбранный файл должен входить в список используемых файлов" + ) + seen: set[UUID] = set() + for item_id in used_file_ids: + if item_id in seen: + continue + seen.add(item_id) + await self._validate_workspace_item( + requester=requester, + project=project, + item_id=item_id, + ) + + async def _validate_workspace_item( + self, + *, + requester: User, + project: Project, + item_id: UUID, + ) -> None: + file = await self._file_repo.get_by_id(item_id) + if file is not None: + if file.project_id not in {None, project.id}: + raise ProjectWorkflowValidationError("Файл не относится к текущему проекту") + if not requester.is_staff and file.owner_id not in {None, requester.id}: + raise ProjectWorkflowValidationError("Файл недоступен") + return + + artifact = await self._artifact_repo.get_by_id(item_id) + if artifact is None or artifact.project_id not in {None, project.id}: + raise ProjectWorkflowValidationError("Элемент рабочего пространства не найден") + + def _build_task_service(self): + from cpv3.modules.tasks.service import TaskService + + return TaskService(self._session) + + def 
_require_phase( + self, + state: ProjectWorkspaceState, + required_phase: WorkflowPhase, + ) -> None: + if state.phase != required_phase: + raise ProjectWorkflowValidationError( + f"Ожидалась фаза {required_phase}, текущая фаза {state.phase}" + ) + + def _append_used_file( + self, + state: ProjectWorkspaceState, + file_id: UUID, + ) -> None: + if file_id not in state.workspace_view.used_file_ids: + state.workspace_view.used_file_ids.append(file_id) + + def _parse_int(self, value: object) -> int | None: + if value is None: + return None + try: + return int(value) + except (TypeError, ValueError): + return None + + def _parse_uuid(self, value: object) -> UUID | None: + if value is None: + return None + try: + return UUID(str(value)) + except (TypeError, ValueError): + return None diff --git a/cpv3/modules/projects/schemas.py b/cpv3/modules/projects/schemas.py index a8cdc80..b827bc3 100644 --- a/cpv3/modules/projects/schemas.py +++ b/cpv3/modules/projects/schemas.py @@ -19,8 +19,6 @@ class ProjectRead(Schema): folder: str | None status: ProjectStatusEnum - workspace_state: dict | None - is_active: bool created_at: datetime updated_at: datetime @@ -38,4 +36,3 @@ class ProjectUpdate(Schema): language: str | None = None folder: str | None = None status: ProjectStatusEnum | None = None - workspace_state: dict | None = None diff --git a/cpv3/modules/projects/service.py b/cpv3/modules/projects/service.py index 87114be..86c546a 100644 --- a/cpv3/modules/projects/service.py +++ b/cpv3/modules/projects/service.py @@ -14,6 +14,7 @@ class ProjectService: """Service for project business logic and orchestration.""" def __init__(self, session: AsyncSession) -> None: + self._session = session self._repo = ProjectRepository(session) async def list_projects( @@ -32,9 +33,13 @@ class ProjectService: async def create_project(self, *, requester: User, data: ProjectCreate) -> Project: folder = f"/{requester.username}/{data.name}" - return await self._repo.create( + project = await 
self._repo.create( requester=requester, data=data, folder=folder, status="DRAFT", ) + from cpv3.modules.project_workspaces.service import ProjectWorkspaceService + + await ProjectWorkspaceService(self._session).create_for_project(project) + return project async def update_project(self, project: Project, data: ProjectUpdate) -> Project: return await self._repo.update(project, data) diff --git a/cpv3/modules/tasks/service.py b/cpv3/modules/tasks/service.py index cb04e7c..a568a0e 100644 --- a/cpv3/modules/tasks/service.py +++ b/cpv3/modules/tasks/service.py @@ -36,6 +36,7 @@ from cpv3.modules.jobs.schemas import ( ) from cpv3.modules.media.repository import ArtifactRepository from cpv3.modules.media.schemas import ArtifactMediaFileCreate +from cpv3.modules.project_workspaces.service import ProjectWorkspaceService from cpv3.modules.tasks.schemas import ( CaptionsGenerateRequest, FrameExtractRequest, @@ -140,6 +141,7 @@ DRAMATIQ_BROKER_REF_SEPARATOR = ":" class JobCancelledError(RuntimeError): """Raised when a job was cancelled before completion.""" + # --------------------------------------------------------------------------- # Dramatiq broker setup # --------------------------------------------------------------------------- @@ -808,10 +810,12 @@ def transcription_generate_actor( ) ) elif engine == "salutespeech": - audio_stream = next( - (s for s in probe.streams if s.codec_type == "audio"), None + audio_stream = next((s for s in probe.streams if s.codec_type == "audio"), None) + sr = ( + int(audio_stream.sample_rate) + if audio_stream and audio_stream.sample_rate + else 16000 ) - sr = int(audio_stream.sample_rate) if audio_stream and audio_stream.sample_rate else 16000 document = _run_async( transcribe_with_salute_speech( storage, @@ -1125,6 +1129,9 @@ class TaskService: self._event_repo = JobEventRepository(session) self._webhook_repo = WebhookRepository(session) + def _get_project_workspace_service(self): + return ProjectWorkspaceService(self._session) + async 
def _update_job_broker_reference(self, job: Job, broker_reference: str) -> Job: """Persist the transport-specific broker reference after enqueueing.""" job.broker_id = broker_reference @@ -1310,28 +1317,33 @@ class TaskService: # Save artifacts BEFORE sending notifications so data exists when frontend refetches if job.job_type == JOB_TYPE_TRANSCRIPTION_GENERATE and event.status == JOB_STATUS_DONE: try: - await self._save_transcription_artifacts(job) + job = await self._save_transcription_artifacts(job) except Exception: logger.exception("Failed to save transcription artifacts for job %s", job_id) if job.job_type == JOB_TYPE_MEDIA_CONVERT and event.status == JOB_STATUS_DONE: try: - await self._save_convert_artifacts(job) + job = await self._save_convert_artifacts(job) except Exception: logger.exception("Failed to save convert artifacts for job %s", job_id) if job.job_type == JOB_TYPE_SILENCE_APPLY and event.status == JOB_STATUS_DONE: try: - await self._save_silence_apply_artifacts(job) + job = await self._save_silence_apply_artifacts(job) except Exception: logger.exception("Failed to save silence apply artifacts for job %s", job_id) if job.job_type == JOB_TYPE_CAPTIONS_GENERATE and event.status == JOB_STATUS_DONE: try: - await self._save_captions_artifacts(job) + job = await self._save_captions_artifacts(job) except Exception: logger.exception("Failed to save captions artifacts for job %s", job_id) + try: + await self._sync_project_workspace_after_webhook(job) + except Exception: + logger.exception("Failed to sync project workspace state for job %s", job_id) + # Push real-time notification via WebSocket (after artifacts are persisted) if job.user_id is not None: try: @@ -1344,6 +1356,11 @@ class TaskService: return job + async def _sync_project_workspace_after_webhook(self, job: Job) -> None: + if job.project_id is None: + return + await self._get_project_workspace_service().handle_job_update(job=job) + async def cancel_job(self, job: Job) -> Job: """Cancel a job, clean 
queued transport state and ignore late webhooks.""" if job.status in (JOB_STATUS_DONE, JOB_STATUS_FAILED, JOB_STATUS_CANCELLED): @@ -1386,9 +1403,14 @@ class TaskService: except Exception: logger.exception("Failed to create cancellation notification for job %s", job.id) + try: + await self._sync_project_workspace_after_webhook(job) + except Exception: + logger.exception("Failed to sync project workspace state for cancelled job %s", job.id) + return job - async def _save_transcription_artifacts(self, job: Job) -> None: + async def _save_transcription_artifacts(self, job: Job) -> Job: """Create Transcription, ArtifactMediaFile and File records.""" input_data = job.input_data or {} output_data = job.output_data or {} @@ -1407,7 +1429,7 @@ class TaskService: user = await user_repo.get_by_id(job.user_id) # type: ignore[arg-type] if user is None: logger.warning("User %s not found, skipping artifact save", job.user_id) - return + return job # Find or create source File record file_repo = FileRepository(self._session) @@ -1472,7 +1494,7 @@ class TaskService: # Create Transcription record transcription_repo = TranscriptionRepository(self._session) engine_db = ENGINE_MAP.get(engine_raw, "LOCAL_WHISPER") - await transcription_repo.create( + transcription = await transcription_repo.create( data=TranscriptionCreate( project_id=project_id, source_file_id=source_file.id, @@ -1483,9 +1505,16 @@ class TaskService: ), ) - logger.info("Saved transcription artifacts for job %s", job.id) + updated_output = dict(output_data) + updated_output["artifact_id"] = str(artifact.id) + updated_output["transcription_id"] = str(transcription.id) + updated_output["source_file_id"] = str(source_file.id) + job = await self._job_repo.update(job, JobUpdate(output_data=updated_output)) - async def _save_convert_artifacts(self, job: Job) -> None: + logger.info("Saved transcription artifacts for job %s", job.id) + return job + + async def _save_convert_artifacts(self, job: Job) -> Job: """Create File and 
ArtifactMediaFile records for converted MP4.""" input_data = job.input_data or {} output_data = job.output_data or {} @@ -1503,7 +1532,7 @@ class TaskService: user = await user_repo.get_by_id(job.user_id) # type: ignore[arg-type] if user is None: logger.warning("User %s not found, skipping convert artifact save", job.user_id) - return + return job # Derive output filename from source file file_repo = FileRepository(self._session) @@ -1542,11 +1571,12 @@ class TaskService: updated_output = dict(output_data) updated_output["file_id"] = str(converted_file.id) - await self._job_repo.update(job, JobUpdate(output_data=updated_output)) + job = await self._job_repo.update(job, JobUpdate(output_data=updated_output)) logger.info("Saved convert artifacts for job %s", job.id) + return job - async def _save_silence_apply_artifacts(self, job: Job) -> None: + async def _save_silence_apply_artifacts(self, job: Job) -> Job: """Create File and ArtifactMediaFile records for silence-applied video.""" input_data = job.input_data or {} output_data = job.output_data or {} @@ -1562,10 +1592,8 @@ class TaskService: user_repo = UserRepository(self._session) user = await user_repo.get_by_id(job.user_id) # type: ignore[arg-type] if user is None: - logger.warning( - "User %s not found, skipping silence apply artifact save", job.user_id - ) - return + logger.warning("User %s not found, skipping silence apply artifact save", job.user_id) + return job file_repo = FileRepository(self._session) source_file = await file_repo.get_by_path(file_key) @@ -1601,15 +1629,16 @@ class TaskService: updated_output = dict(output_data) updated_output["file_id"] = str(processed_file.id) - await self._job_repo.update(job, JobUpdate(output_data=updated_output)) + job = await self._job_repo.update(job, JobUpdate(output_data=updated_output)) logger.info( "Saved silence apply artifacts for job %s (file_id=%s)", job.id, processed_file.id, ) + return job - async def _save_captions_artifacts(self, job: Job) -> None: + 
async def _save_captions_artifacts(self, job: Job) -> Job: """Create File and ArtifactMediaFile records for captioned video.""" input_data = job.input_data or {} output_data = job.output_data or {} @@ -1626,7 +1655,7 @@ class TaskService: user = await user_repo.get_by_id(job.user_id) # type: ignore[arg-type] if user is None: logger.warning("User %s not found, skipping captions artifact save", job.user_id) - return + return job # Get file size from S3 storage = _get_storage_service() @@ -1670,11 +1699,10 @@ class TaskService: # Update job output_data with file_id so frontend can reference it updated_output = dict(output_data) updated_output["file_id"] = str(captioned_file.id) - job = await self._job_repo.update( - job, JobUpdate(output_data=updated_output) - ) + job = await self._job_repo.update(job, JobUpdate(output_data=updated_output)) logger.info("Saved captions artifacts for job %s (file_id=%s)", job.id, captioned_file.id) + return job async def submit_media_probe( self, *, requester: User, request: MediaProbeRequest diff --git a/pyproject.toml b/pyproject.toml index d6a220a..98a9dd9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,7 @@ dependencies = [ "redis>=5.0.0", "psycopg2-binary>=2.9.9", "tiktoken>=0.3.3", + "greenlet>=3.3.0", ] [dependency-groups] diff --git a/tests/conftest.py b/tests/conftest.py index f694b4e..1729999 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,11 +6,14 @@ from __future__ import annotations import uuid from datetime import timedelta +from pathlib import Path +from tempfile import NamedTemporaryFile from typing import AsyncGenerator from unittest.mock import AsyncMock, MagicMock import pytest from httpx import ASGITransport, AsyncClient +from sqlalchemy import create_engine from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from cpv3.db.base import Base @@ -22,20 +25,23 @@ from cpv3.main import app from cpv3.modules.users.models import User -# Use in-memory SQLite for 
tests (or configure a test database) -TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:" - - @pytest.fixture async def test_engine(): """Create a test database engine with tables.""" - engine = create_async_engine(TEST_DATABASE_URL, echo=False) - async with engine.begin() as conn: - await conn.run_sync(Base.metadata.create_all) - yield engine - async with engine.begin() as conn: - await conn.run_sync(Base.metadata.drop_all) - await engine.dispose() + with NamedTemporaryFile(suffix=".sqlite3", delete=False) as tmp_db: + db_path = Path(tmp_db.name) + + sync_engine = create_engine(f"sqlite:///{db_path}", echo=False) + Base.metadata.create_all(bind=sync_engine) + + engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}", echo=False) + try: + yield engine + finally: + await engine.dispose() + Base.metadata.drop_all(bind=sync_engine) + sync_engine.dispose() + db_path.unlink(missing_ok=True) @pytest.fixture diff --git a/tests/integration/test_project_workspaces_endpoints.py b/tests/integration/test_project_workspaces_endpoints.py new file mode 100644 index 0000000..b235876 --- /dev/null +++ b/tests/integration/test_project_workspaces_endpoints.py @@ -0,0 +1,212 @@ +from __future__ import annotations + +import uuid + +import pytest +from httpx import AsyncClient +from sqlalchemy.ext.asyncio import AsyncSession + +from cpv3.modules.files.models import File +from cpv3.modules.projects.models import Project +from cpv3.modules.tasks.schemas import TaskSubmitResponse +from cpv3.modules.tasks.service import TaskService +from cpv3.modules.users.models import User + +pytest.importorskip("greenlet") + + +@pytest.fixture +async def workflow_project(test_db_session: AsyncSession, test_user: User) -> Project: + project = Project( + id=uuid.uuid4(), + owner_id=test_user.id, + name="Workflow Project", + description="Typed workflow test project", + language="ru", + status="DRAFT", + is_active=True, + ) + test_db_session.add(project) + await test_db_session.commit() + await 
test_db_session.refresh(project) + return project + + +@pytest.fixture +async def source_file( + test_db_session: AsyncSession, + test_user: User, + workflow_project: Project, +) -> File: + file = File( + id=uuid.uuid4(), + owner_id=test_user.id, + project_id=workflow_project.id, + original_filename="source.mp4", + path="users/test/source.mp4", + storage_backend="S3", + mime_type="video/mp4", + size_bytes=1024, + file_format="mp4", + is_uploaded=True, + is_deleted=False, + ) + test_db_session.add(file) + await test_db_session.commit() + await test_db_session.refresh(file) + return file + + +class TestProjectWorkspaceEndpoints: + async def test_get_workspace_returns_default_state( + self, + auth_client: AsyncClient, + workflow_project: Project, + ) -> None: + response = await auth_client.get(f"/api/projects/{workflow_project.id}/workspace") + + assert response.status_code == 200 + data = response.json() + assert data["project_id"] == str(workflow_project.id) + assert data["revision"] == 0 + assert data["version"] == 1 + assert data["phase"] == "INGEST" + assert data["current_screen"] == "upload" + assert data["source_file_id"] is None + assert data["active_job"] is None + assert data["workspace_view"] == { + "used_file_ids": [], + "selected_file_id": None, + } + + async def test_get_workspace_forbidden_for_other_users_project( + self, + auth_client: AsyncClient, + test_db_session: AsyncSession, + other_user: User, + ) -> None: + foreign_project = Project( + id=uuid.uuid4(), + owner_id=other_user.id, + name="Other Project", + description=None, + language="ru", + status="DRAFT", + is_active=True, + ) + test_db_session.add(foreign_project) + await test_db_session.commit() + + response = await auth_client.get(f"/api/projects/{foreign_project.id}/workspace") + + assert response.status_code == 403 + + async def test_set_source_file_action_updates_workspace( + self, + auth_client: AsyncClient, + workflow_project: Project, + source_file: File, + ) -> None: + response = 
await auth_client.post( + f"/api/projects/{workflow_project.id}/workflow/actions", + json={ + "type": "SET_SOURCE_FILE", + "revision": 0, + "file_id": str(source_file.id), + }, + ) + + assert response.status_code == 200 + data = response.json() + assert data["revision"] == 1 + assert data["phase"] == "VERIFY" + assert data["current_screen"] == "verify" + assert data["source_file_id"] == str(source_file.id) + assert data["workspace_view"] == { + "used_file_ids": [str(source_file.id)], + "selected_file_id": str(source_file.id), + } + + async def test_action_returns_conflict_on_stale_revision( + self, + auth_client: AsyncClient, + workflow_project: Project, + source_file: File, + ) -> None: + first_response = await auth_client.post( + f"/api/projects/{workflow_project.id}/workflow/actions", + json={ + "type": "SET_SOURCE_FILE", + "revision": 0, + "file_id": str(source_file.id), + }, + ) + assert first_response.status_code == 200 + + response = await auth_client.post( + f"/api/projects/{workflow_project.id}/workflow/actions", + json={ + "type": "RESET_SOURCE_FILE", + "revision": 0, + }, + ) + + assert response.status_code == 409 + + async def test_start_media_convert_action_sets_active_job( + self, + auth_client: AsyncClient, + workflow_project: Project, + source_file: File, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + async def fake_submit_media_convert( + self, + *, + requester: User, + request, + ) -> TaskSubmitResponse: + assert requester.id == workflow_project.owner_id + assert request.file_key == source_file.path + assert request.project_id == workflow_project.id + return TaskSubmitResponse( + job_id=uuid.UUID("00000000-0000-4000-a000-000000000123"), + webhook_url=("http://test/api/tasks/webhook/00000000-0000-4000-a000-000000000123/"), + status="PENDING", + ) + + monkeypatch.setattr( + TaskService, + "submit_media_convert", + fake_submit_media_convert, + ) + + set_source_response = await auth_client.post( + 
f"/api/projects/{workflow_project.id}/workflow/actions", + json={ + "type": "SET_SOURCE_FILE", + "revision": 0, + "file_id": str(source_file.id), + }, + ) + assert set_source_response.status_code == 200 + + response = await auth_client.post( + f"/api/projects/{workflow_project.id}/workflow/actions", + json={ + "type": "START_MEDIA_CONVERT", + "revision": 1, + "output_format": "mp4", + "out_folder": "output_files", + }, + ) + + assert response.status_code == 200 + data = response.json() + assert data["revision"] == 2 + assert data["phase"] == "VERIFY" + assert data["current_screen"] == "verify" + assert data["active_job"] == { + "job_id": "00000000-0000-4000-a000-000000000123", + "job_type": "MEDIA_CONVERT", + } diff --git a/tests/unit/test_project_workspace_service.py b/tests/unit/test_project_workspace_service.py new file mode 100644 index 0000000..3b388da --- /dev/null +++ b/tests/unit/test_project_workspace_service.py @@ -0,0 +1,210 @@ +from __future__ import annotations + +import uuid +from types import SimpleNamespace + +import pytest + +from cpv3.modules.project_workspaces.schemas import ( + ProjectWorkspaceState, + build_workspace_state_from_legacy, +) +from cpv3.modules.project_workspaces.service import ProjectWorkspaceService + + +def test_build_workspace_state_from_legacy_maps_known_fields() -> None: + source_file_id = uuid.uuid4() + active_job_id = uuid.uuid4() + silence_job_id = uuid.uuid4() + artifact_id = uuid.uuid4() + + workspace = build_workspace_state_from_legacy( + { + "wizard": { + "current_step": "subtitle-revision", + "primary_file_id": str(source_file_id), + "active_job_id": str(active_job_id), + "active_job_type": "TRANSCRIPTION_GENERATE", + "silence_job_id": str(silence_job_id), + "transcription_artifact_id": str(artifact_id), + "silence_settings": { + "min_silence_duration_ms": 350, + "silence_threshold_db": 21, + "padding_ms": 180, + }, + "unknown_field": "ignored", + }, + "used_files": [ + {"id": str(source_file_id), "path": 
"users/test/source.mp4"}, + {"id": "not-a-uuid", "path": "broken"}, + ], + "unknown_root": {"ignored": True}, + } + ) + + assert workspace.phase == "TRANSCRIPTION" + assert workspace.source_file_id == source_file_id + assert workspace.active_job is not None + assert workspace.active_job.job_id == active_job_id + assert workspace.active_job.job_type == "TRANSCRIPTION_GENERATE" + assert workspace.workspace_view.used_file_ids == [source_file_id] + assert workspace.workspace_view.selected_file_id == source_file_id + assert workspace.silence.detect_job_id == silence_job_id + assert workspace.silence.settings.min_silence_duration_ms == 350 + assert workspace.silence.settings.silence_threshold_db == 21 + assert workspace.silence.settings.padding_ms == 180 + assert workspace.transcription.artifact_id == artifact_id + assert workspace.captions.output_file_id is None + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ("job_type", "output_data", "initial_state", "expected"), + [ + ( + "MEDIA_CONVERT", + {"file_id": "00000000-0000-4000-a000-000000000101"}, + { + "phase": "VERIFY", + "source_file_id": "00000000-0000-4000-a000-000000000001", + "active_job": { + "job_id": "00000000-0000-4000-a000-000000000010", + "job_type": "MEDIA_CONVERT", + }, + }, + { + "phase": "VERIFY", + "source_file_id": "00000000-0000-4000-a000-000000000101", + "active_job": None, + "current_screen": "verify", + }, + ), + ( + "SILENCE_DETECT", + { + "silent_segments": [{"start_ms": 100, "end_ms": 220}], + "duration_ms": 1000, + }, + { + "phase": "SILENCE", + "source_file_id": "00000000-0000-4000-a000-000000000001", + "active_job": { + "job_id": "00000000-0000-4000-a000-000000000011", + "job_type": "SILENCE_DETECT", + }, + "silence": {"status": "DETECTING"}, + }, + { + "phase": "SILENCE", + "active_job": None, + "silence_status": "REVIEWING", + "current_screen": "fragments", + }, + ), + ( + "SILENCE_APPLY", + {"file_id": "00000000-0000-4000-a000-000000000102"}, + { + "phase": "SILENCE", + 
"source_file_id": "00000000-0000-4000-a000-000000000001", + "active_job": { + "job_id": "00000000-0000-4000-a000-000000000012", + "job_type": "SILENCE_APPLY", + }, + "silence": {"status": "APPLYING"}, + }, + { + "phase": "TRANSCRIPTION", + "active_job": None, + "silence_status": "COMPLETED", + "current_screen": "transcription-settings", + }, + ), + ( + "TRANSCRIPTION_GENERATE", + { + "artifact_id": "00000000-0000-4000-a000-000000000103", + "transcription_id": "00000000-0000-4000-a000-000000000104", + }, + { + "phase": "TRANSCRIPTION", + "source_file_id": "00000000-0000-4000-a000-000000000001", + "active_job": { + "job_id": "00000000-0000-4000-a000-000000000013", + "job_type": "TRANSCRIPTION_GENERATE", + }, + "transcription": {"status": "PROCESSING"}, + }, + { + "phase": "TRANSCRIPTION", + "active_job": None, + "transcription_status": "REVIEWING", + "current_screen": "subtitle-revision", + }, + ), + ( + "CAPTIONS_GENERATE", + {"file_id": "00000000-0000-4000-a000-000000000105"}, + { + "phase": "CAPTIONS", + "source_file_id": "00000000-0000-4000-a000-000000000001", + "active_job": { + "job_id": "00000000-0000-4000-a000-000000000014", + "job_type": "CAPTIONS_GENERATE", + }, + "captions": {"status": "PROCESSING"}, + }, + { + "phase": "DONE", + "active_job": None, + "captions_status": "COMPLETED", + "current_screen": "caption-result", + }, + ), + ], +) +async def test_apply_job_event_advances_workspace_for_done_jobs( + job_type: str, + output_data: dict[str, object], + initial_state: dict[str, object], + expected: dict[str, object], +) -> None: + service = ProjectWorkspaceService(session=SimpleNamespace()) + + state = ProjectWorkspaceState.model_validate( + { + "version": 1, + "phase": "INGEST", + "active_job": None, + "source_file_id": None, + "workspace_view": {"used_file_ids": [], "selected_file_id": None}, + "silence": {}, + "transcription": {}, + "captions": {}, + **initial_state, + } + ) + + job = SimpleNamespace( + id=uuid.UUID(str(state.active_job.job_id)) if 
state.active_job else uuid.uuid4(), + project_id=uuid.uuid4(), + job_type=job_type, + status="DONE", + output_data=output_data, + ) + + next_state = service._apply_job_event_to_state(state, job) + current_screen = service._derive_current_screen(next_state) + + assert next_state.phase == expected["phase"] + assert next_state.active_job == expected["active_job"] + assert current_screen == expected["current_screen"] + + if "source_file_id" in expected: + assert str(next_state.source_file_id) == expected["source_file_id"] + if "silence_status" in expected: + assert next_state.silence.status == expected["silence_status"] + if "transcription_status" in expected: + assert next_state.transcription.status == expected["transcription_status"] + if "captions_status" in expected: + assert next_state.captions.status == expected["captions_status"] diff --git a/tests/unit/test_project_workspaces_service.py b/tests/unit/test_project_workspaces_service.py new file mode 100644 index 0000000..ad2951d --- /dev/null +++ b/tests/unit/test_project_workspaces_service.py @@ -0,0 +1,331 @@ +from __future__ import annotations + +import uuid +from types import SimpleNamespace +from unittest.mock import AsyncMock + +import pytest + +from cpv3.modules.project_workspaces.schemas import ( + ActiveJobState, + ProjectWorkspaceState, + SetSourceFileAction, + SilenceSettingsState, + SilenceState, + StartSilenceDetectAction, + StartTranscriptionAction, + TranscriptionRequestState, +) +from cpv3.modules.project_workspaces.service import ( + ProjectWorkspaceService, + WorkspaceRevisionConflictError, +) + + +@pytest.mark.asyncio +async def test_get_workspace_returns_default_state_when_workspace_missing() -> None: + project = SimpleNamespace(id=uuid.uuid4(), workspace_state=None) + + service = ProjectWorkspaceService(session=AsyncMock()) + service._repo = SimpleNamespace( + get_by_project_id=AsyncMock(return_value=None), + create=AsyncMock( + return_value=SimpleNamespace( + project_id=project.id, + 
revision=0, + state=ProjectWorkspaceState().model_dump(mode="json"), + ) + ), + ) + + workspace = await service.get_workspace(project=project) + + assert workspace.revision == 0 + assert workspace.phase == "INGEST" + assert workspace.current_screen == "upload" + assert workspace.active_job is None + assert workspace.source_file_id is None + assert workspace.workspace_view.used_file_ids == [] + assert workspace.workspace_view.selected_file_id is None + + +@pytest.mark.asyncio +async def test_apply_action_set_source_file_moves_workspace_to_verify() -> None: + project = SimpleNamespace(id=uuid.uuid4(), workspace_state=None) + requester = SimpleNamespace(id=uuid.uuid4(), is_staff=False) + file_id = uuid.uuid4() + workspace_row = SimpleNamespace( + project_id=project.id, + revision=0, + state=ProjectWorkspaceState().model_dump(mode="json"), + ) + saved_state: dict[str, object] = {} + + async def update_state(*, project_id, expected_revision, state): + saved_state.update(state) + return SimpleNamespace( + project_id=project_id, + revision=expected_revision + 1, + state=state, + ) + + service = ProjectWorkspaceService(session=AsyncMock()) + service._repo = SimpleNamespace( + get_by_project_id=AsyncMock(return_value=workspace_row), + create=AsyncMock(), + update_state=AsyncMock(side_effect=update_state), + ) + service._file_repo = SimpleNamespace( + get_by_id=AsyncMock( + return_value=SimpleNamespace( + id=file_id, + owner_id=requester.id, + project_id=project.id, + path="users/test/source.mp4", + ) + ) + ) + + workspace = await service.apply_action( + requester=requester, + project=project, + action=SetSourceFileAction( + type="SET_SOURCE_FILE", + revision=0, + file_id=file_id, + ), + ) + + assert workspace.revision == 1 + assert workspace.phase == "VERIFY" + assert workspace.current_screen == "verify" + assert workspace.source_file_id == file_id + assert saved_state["source_file_id"] == str(file_id) + assert saved_state["workspace_view"] == { + "used_file_ids": 
[str(file_id)], + "selected_file_id": str(file_id), + } + + +@pytest.mark.asyncio +async def test_apply_action_rejects_stale_revision() -> None: + project = SimpleNamespace(id=uuid.uuid4(), workspace_state=None) + workspace_row = SimpleNamespace( + project_id=project.id, + revision=2, + state=ProjectWorkspaceState().model_dump(mode="json"), + ) + + service = ProjectWorkspaceService(session=AsyncMock()) + service._repo = SimpleNamespace( + get_by_project_id=AsyncMock(return_value=workspace_row), + create=AsyncMock(), + update_state=AsyncMock(), + ) + + with pytest.raises(WorkspaceRevisionConflictError): + await service.apply_action( + requester=SimpleNamespace(id=uuid.uuid4(), is_staff=False), + project=project, + action=SetSourceFileAction( + type="SET_SOURCE_FILE", + revision=1, + file_id=uuid.uuid4(), + ), + ) + + +@pytest.mark.asyncio +async def test_start_silence_detect_submits_task_and_tracks_active_job() -> None: + project = SimpleNamespace(id=uuid.uuid4(), workspace_state=None) + requester = SimpleNamespace(id=uuid.uuid4(), is_staff=False) + source_file_id = uuid.uuid4() + workspace_state = ProjectWorkspaceState( + phase="SILENCE", + source_file_id=source_file_id, + silence=SilenceState( + status="CONFIGURED", + settings=SilenceSettingsState( + min_silence_duration_ms=250, + silence_threshold_db=18, + padding_ms=125, + ), + ), + ) + workspace_row = SimpleNamespace( + project_id=project.id, + revision=0, + state=workspace_state.model_dump(mode="json"), + ) + submitted_response = SimpleNamespace(job_id=uuid.uuid4(), status="PENDING") + + async def update_state(*, project_id, expected_revision, state): + return SimpleNamespace( + project_id=project_id, + revision=expected_revision + 1, + state=state, + ) + + task_service = SimpleNamespace( + submit_silence_detect=AsyncMock(return_value=submitted_response), + ) + + service = ProjectWorkspaceService(session=AsyncMock()) + service._repo = SimpleNamespace( + get_by_project_id=AsyncMock(return_value=workspace_row), 
+ create=AsyncMock(), + update_state=AsyncMock(side_effect=update_state), + ) + service._file_repo = SimpleNamespace( + get_by_id=AsyncMock( + return_value=SimpleNamespace( + id=source_file_id, + owner_id=requester.id, + project_id=project.id, + path="projects/test/video.mp4", + ) + ) + ) + service._task_service_factory = lambda: task_service + + workspace = await service.apply_action( + requester=requester, + project=project, + action=StartSilenceDetectAction( + type="START_SILENCE_DETECT", + revision=0, + ), + ) + + task_service.submit_silence_detect.assert_awaited_once() + assert workspace.current_screen == "processing" + assert workspace.active_job == ActiveJobState( + job_id=submitted_response.job_id, + job_type="SILENCE_DETECT", + ) + assert workspace.silence.detect_job_id == submitted_response.job_id + + +@pytest.mark.asyncio +async def test_start_transcription_persists_request_and_processing_job() -> None: + project = SimpleNamespace(id=uuid.uuid4(), workspace_state=None) + requester = SimpleNamespace(id=uuid.uuid4(), is_staff=False) + source_file_id = uuid.uuid4() + workspace_state = ProjectWorkspaceState( + phase="TRANSCRIPTION", + source_file_id=source_file_id, + ) + workspace_row = SimpleNamespace( + project_id=project.id, + revision=3, + state=workspace_state.model_dump(mode="json"), + ) + submitted_response = SimpleNamespace(job_id=uuid.uuid4(), status="PENDING") + + async def update_state(*, project_id, expected_revision, state): + return SimpleNamespace( + project_id=project_id, + revision=expected_revision + 1, + state=state, + ) + + task_service = SimpleNamespace( + submit_transcription_generate=AsyncMock(return_value=submitted_response), + ) + + service = ProjectWorkspaceService(session=AsyncMock()) + service._repo = SimpleNamespace( + get_by_project_id=AsyncMock(return_value=workspace_row), + create=AsyncMock(), + update_state=AsyncMock(side_effect=update_state), + ) + service._file_repo = SimpleNamespace( + get_by_id=AsyncMock( + 
return_value=SimpleNamespace( + id=source_file_id, + owner_id=requester.id, + project_id=project.id, + path="projects/test/video.mp4", + ) + ) + ) + service._task_service_factory = lambda: task_service + + request = TranscriptionRequestState(engine="whisper", language="ru", model="base") + workspace = await service.apply_action( + requester=requester, + project=project, + action=StartTranscriptionAction( + type="START_TRANSCRIPTION", + revision=3, + request=request, + ), + ) + + assert workspace.current_screen == "transcription-processing" + assert workspace.transcription.request == request + assert workspace.active_job == ActiveJobState( + job_id=submitted_response.job_id, + job_type="TRANSCRIPTION_GENERATE", + ) + + +@pytest.mark.asyncio +async def test_apply_job_update_moves_transcription_job_to_review() -> None: + project = SimpleNamespace(id=uuid.uuid4(), workspace_state=None) + job_id = uuid.uuid4() + transcription_id = uuid.uuid4() + artifact_id = uuid.uuid4() + workspace_state = ProjectWorkspaceState( + phase="TRANSCRIPTION", + active_job=ActiveJobState(job_id=job_id, job_type="TRANSCRIPTION_GENERATE"), + transcription={ + "status": "PROCESSING", + "job_id": job_id, + "artifact_id": None, + "transcription_id": None, + "reviewed": False, + }, + ) + workspace_row = SimpleNamespace( + project_id=project.id, + revision=4, + state=workspace_state.model_dump(mode="json"), + ) + + async def update_state(*, project_id, expected_revision, state): + return SimpleNamespace( + project_id=project_id, + revision=expected_revision + 1, + state=state, + ) + + service = ProjectWorkspaceService(session=AsyncMock()) + service._repo = SimpleNamespace( + get_by_project_id=AsyncMock(return_value=workspace_row), + create=AsyncMock(), + update_state=AsyncMock(side_effect=update_state), + ) + + workspace = await service.apply_job_update( + project=project, + job=SimpleNamespace( + id=job_id, + project_id=project.id, + job_type="TRANSCRIPTION_GENERATE", + status="DONE", + 
output_data={ + "transcription_id": str(transcription_id), + "artifact_id": str(artifact_id), + }, + ), + ) + + assert workspace is not None + assert workspace.revision == 5 + assert workspace.phase == "TRANSCRIPTION" + assert workspace.current_screen == "subtitle-revision" + assert workspace.active_job is None + assert workspace.transcription.transcription_id == transcription_id + assert workspace.transcription.artifact_id == artifact_id + assert workspace.transcription.reviewed is False diff --git a/tests/unit/test_task_service.py b/tests/unit/test_task_service.py index faa0c8b..d421af3 100644 --- a/tests/unit/test_task_service.py +++ b/tests/unit/test_task_service.py @@ -12,134 +12,218 @@ from cpv3.modules.tasks.service import TaskService @pytest.mark.asyncio async def test_submit_captions_generate_reuses_existing_active_job() -> None: - service = TaskService(session=AsyncMock()) - existing_job_id = uuid.uuid4() - existing_job = SimpleNamespace( - id=existing_job_id, - status="RUNNING", - ) + service = TaskService(session=AsyncMock()) + existing_job_id = uuid.uuid4() + existing_job = SimpleNamespace( + id=existing_job_id, + status="RUNNING", + ) - service._find_duplicate_active_job = AsyncMock(return_value=existing_job) - service._submit_task = AsyncMock() + service._find_duplicate_active_job = AsyncMock(return_value=existing_job) + service._submit_task = AsyncMock() - response = await service.submit_captions_generate( - requester=SimpleNamespace(id=uuid.uuid4()), - request=CaptionsGenerateRequest( - video_s3_path="projects/test/video.mp4", - folder="output_files", - transcription_id=uuid.uuid4(), - project_id=uuid.uuid4(), - preset_id=uuid.uuid4(), - ), - ) + response = await service.submit_captions_generate( + requester=SimpleNamespace(id=uuid.uuid4()), + request=CaptionsGenerateRequest( + video_s3_path="projects/test/video.mp4", + folder="output_files", + transcription_id=uuid.uuid4(), + project_id=uuid.uuid4(), + preset_id=uuid.uuid4(), + ), + ) - assert 
response.job_id == existing_job_id - assert response.status == "RUNNING" - assert response.webhook_url.endswith(f"/api/tasks/webhook/{existing_job_id}/") - service._submit_task.assert_not_awaited() + assert response.job_id == existing_job_id + assert response.status == "RUNNING" + assert response.webhook_url.endswith(f"/api/tasks/webhook/{existing_job_id}/") + service._submit_task.assert_not_awaited() @pytest.mark.asyncio async def test_record_webhook_event_ignores_cancelled_job() -> None: - cancelled_job = SimpleNamespace( - id=uuid.uuid4(), - status="CANCELLED", - ) - job_repo = SimpleNamespace( - get_by_id=AsyncMock(return_value=cancelled_job), - update=AsyncMock(), - ) - event_repo = SimpleNamespace(create=AsyncMock()) + cancelled_job = SimpleNamespace( + id=uuid.uuid4(), + status="CANCELLED", + ) + job_repo = SimpleNamespace( + get_by_id=AsyncMock(return_value=cancelled_job), + update=AsyncMock(), + ) + event_repo = SimpleNamespace(create=AsyncMock()) - service = TaskService(session=AsyncMock()) - service._job_repo = job_repo - service._event_repo = event_repo + service = TaskService(session=AsyncMock()) + service._job_repo = job_repo + service._event_repo = event_repo - result = await service.record_webhook_event( - job_id=cancelled_job.id, - event=TaskWebhookEvent( - status="DONE", - current_message="Готово", - output_data={"output_path": "projects/test/output.mp4"}, - ), - ) + result = await service.record_webhook_event( + job_id=cancelled_job.id, + event=TaskWebhookEvent( + status="DONE", + current_message="Готово", + output_data={"output_path": "projects/test/output.mp4"}, + ), + ) - assert result is cancelled_job - job_repo.update.assert_not_awaited() - event_repo.create.assert_not_awaited() + assert result is cancelled_job + job_repo.update.assert_not_awaited() + event_repo.create.assert_not_awaited() @pytest.mark.asyncio async def test_cancel_job_marks_job_cancelled_and_keeps_record() -> None: - job_id = uuid.uuid4() - user_id = uuid.uuid4() - job = 
SimpleNamespace( - id=job_id, - status="PENDING", - broker_id="default:redis-message-id", - job_type="CAPTIONS_GENERATE", - user_id=user_id, - ) - cancelled_job = SimpleNamespace( - id=job_id, - status="CANCELLED", - broker_id="default:redis-message-id", - job_type="CAPTIONS_GENERATE", - user_id=user_id, - current_message="Отменено пользователем", - ) + job_id = uuid.uuid4() + user_id = uuid.uuid4() + job = SimpleNamespace( + id=job_id, + status="PENDING", + broker_id="default:redis-message-id", + job_type="CAPTIONS_GENERATE", + user_id=user_id, + ) + cancelled_job = SimpleNamespace( + id=job_id, + status="CANCELLED", + broker_id="default:redis-message-id", + job_type="CAPTIONS_GENERATE", + user_id=user_id, + current_message="Отменено пользователем", + ) - service = TaskService(session=AsyncMock()) - service._job_repo = SimpleNamespace(update=AsyncMock(return_value=cancelled_job)) - service._event_repo = SimpleNamespace(create=AsyncMock()) - service._cancel_dramatiq_message = AsyncMock() - service._cancel_caption_render = AsyncMock() - service._create_cancellation_notification = AsyncMock() + service = TaskService(session=AsyncMock()) + service._job_repo = SimpleNamespace(update=AsyncMock(return_value=cancelled_job)) + service._event_repo = SimpleNamespace(create=AsyncMock()) + service._cancel_dramatiq_message = AsyncMock() + service._cancel_caption_render = AsyncMock() + service._create_cancellation_notification = AsyncMock() + service._sync_project_workspace_after_webhook = AsyncMock() - result = await service.cancel_job(job) + result = await service.cancel_job(job) - assert result is cancelled_job - service._job_repo.update.assert_awaited_once() - service._event_repo.create.assert_awaited_once() - service._cancel_dramatiq_message.assert_awaited_once_with(job.broker_id) - service._cancel_caption_render.assert_awaited_once_with(job) - service._create_cancellation_notification.assert_awaited_once_with( - cancelled_job - ) + assert result is cancelled_job + 
service._job_repo.update.assert_awaited_once() + service._event_repo.create.assert_awaited_once() + service._cancel_dramatiq_message.assert_awaited_once_with(job.broker_id) + service._cancel_caption_render.assert_awaited_once_with(job) + service._create_cancellation_notification.assert_awaited_once_with(cancelled_job) + service._sync_project_workspace_after_webhook.assert_awaited_once_with(cancelled_job) @pytest.mark.asyncio async def test_record_webhook_event_updates_progress_for_conversion_job() -> None: - job = SimpleNamespace( - id=uuid.uuid4(), - status="RUNNING", - job_type="MEDIA_CONVERT", - user_id=None, - ) - updated_job = SimpleNamespace(**job.__dict__, project_pct=52.5) - job_repo = SimpleNamespace( - get_by_id=AsyncMock(return_value=job), - update=AsyncMock(return_value=updated_job), - ) - event_repo = SimpleNamespace(create=AsyncMock()) + job = SimpleNamespace( + id=uuid.uuid4(), + status="RUNNING", + job_type="MEDIA_CONVERT", + project_id=uuid.uuid4(), + user_id=None, + ) + updated_job = SimpleNamespace(**job.__dict__, project_pct=52.5) + job_repo = SimpleNamespace( + get_by_id=AsyncMock(return_value=job), + update=AsyncMock(return_value=updated_job), + ) + event_repo = SimpleNamespace(create=AsyncMock()) - service = TaskService(session=AsyncMock()) - service._job_repo = job_repo - service._event_repo = event_repo + service = TaskService(session=AsyncMock()) + service._job_repo = job_repo + service._event_repo = event_repo + service._sync_project_workspace_after_webhook = AsyncMock() - result = await service.record_webhook_event( - job_id=job.id, - event=TaskWebhookEvent( - progress_pct=52.5, - current_message="Конвертация видео", - ), - ) + result = await service.record_webhook_event( + job_id=job.id, + event=TaskWebhookEvent( + progress_pct=52.5, + current_message="Конвертация видео", + ), + ) - update_call = job_repo.update.await_args.args[1] - event_call = event_repo.create.await_args.args[0] + update_call = job_repo.update.await_args.args[1] + 
event_call = event_repo.create.await_args.args[0] - assert result is updated_job - assert update_call.project_pct == 52.5 - assert update_call.current_message == "Конвертация видео" - assert event_call.event_type == "progress" - assert event_call.payload["progress_pct"] == 52.5 + assert result is updated_job + assert update_call.project_pct == 52.5 + assert update_call.current_message == "Конвертация видео" + assert event_call.event_type == "progress" + assert event_call.payload["progress_pct"] == 52.5 + + +@pytest.mark.asyncio +async def test_record_webhook_event_syncs_workspace_for_completed_supported_job() -> None: + job = SimpleNamespace( + id=uuid.uuid4(), + status="RUNNING", + job_type="SILENCE_DETECT", + user_id=None, + project_id=uuid.uuid4(), + output_data={"silent_segments": []}, + ) + updated_job = SimpleNamespace(**{**job.__dict__, "status": "DONE"}) + job_repo = SimpleNamespace( + get_by_id=AsyncMock(return_value=job), + update=AsyncMock(return_value=updated_job), + ) + event_repo = SimpleNamespace(create=AsyncMock()) + + service = TaskService(session=AsyncMock()) + service._job_repo = job_repo + service._event_repo = event_repo + service._sync_project_workspace_after_webhook = AsyncMock() + + result = await service.record_webhook_event( + job_id=job.id, + event=TaskWebhookEvent( + status="DONE", + current_message="Готово", + output_data={"silent_segments": []}, + ), + ) + + assert result is updated_job + service._sync_project_workspace_after_webhook.assert_awaited_once_with(updated_job) + + +@pytest.mark.asyncio +async def test_record_webhook_event_projects_workspace_after_done_job() -> None: + job = SimpleNamespace( + id=uuid.uuid4(), + status="RUNNING", + job_type="MEDIA_CONVERT", + project_id=uuid.uuid4(), + user_id=None, + output_data={"file_path": "users/test/converted.mp4"}, + ) + updated_job = SimpleNamespace( + **{ + **job.__dict__, + "status": "DONE", + "output_data": { + "file_path": "users/test/converted.mp4", + "file_id": 
"00000000-0000-4000-a000-000000000777", + }, + } + ) + job_repo = SimpleNamespace( + get_by_id=AsyncMock(return_value=job), + update=AsyncMock(return_value=updated_job), + ) + event_repo = SimpleNamespace(create=AsyncMock()) + workspace_service = SimpleNamespace(handle_job_update=AsyncMock()) + + service = TaskService(session=AsyncMock()) + service._job_repo = job_repo + service._event_repo = event_repo + service._save_convert_artifacts = AsyncMock(return_value=updated_job) + service._get_project_workspace_service = lambda: workspace_service + + result = await service.record_webhook_event( + job_id=job.id, + event=TaskWebhookEvent( + status="DONE", + current_message="Готово", + output_data={"file_path": "users/test/converted.mp4"}, + ), + ) + + assert result is updated_job + service._save_convert_artifacts.assert_awaited_once_with(updated_job) + workspace_service.handle_job_update.assert_awaited_once_with(job=updated_job) diff --git a/uv.lock b/uv.lock index 25b7d3a..a328f0a 100644 --- a/uv.lock +++ b/uv.lock @@ -404,6 +404,7 @@ dependencies = [ { name = "dramatiq", extra = ["redis"] }, { name = "fastapi" }, { name = "google-cloud-speech" }, + { name = "greenlet" }, { name = "httpx" }, { name = "openai-whisper" }, { name = "passlib", extra = ["bcrypt"] }, @@ -444,6 +445,7 @@ requires-dist = [ { name = "dramatiq", extras = ["redis"], specifier = ">=1.17.0" }, { name = "fastapi", specifier = ">=0.115.0" }, { name = "google-cloud-speech", specifier = ">=2.34.0" }, + { name = "greenlet", specifier = ">=3.3.0" }, { name = "httpx", specifier = ">=0.27.0" }, { name = "openai-whisper", specifier = ">=20250625" }, { name = "passlib", extras = ["bcrypt"], specifier = ">=1.7.4" }, @@ -628,6 +630,7 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/1f/cb/48e964c452ca2b92175a9b2dca037a553036cb053ba69e284650ce755f13/greenlet-3.3.0-cp311-cp311-macosx_11_0_universal2.whl", hash = "sha256:e29f3018580e8412d6aaf5641bb7745d38c85228dacf51a73bd4e26ddf2a6a8e", size = 274908, 
upload-time = "2025-12-04T14:23:26.435Z" }, { url = "https://files.pythonhosted.org/packages/28/da/38d7bff4d0277b594ec557f479d65272a893f1f2a716cad91efeb8680953/greenlet-3.3.0-cp311-cp311-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:a687205fb22794e838f947e2194c0566d3812966b41c78709554aa883183fb62", size = 577113, upload-time = "2025-12-04T14:50:05.493Z" }, { url = "https://files.pythonhosted.org/packages/3c/f2/89c5eb0faddc3ff014f1c04467d67dee0d1d334ab81fadbf3744847f8a8a/greenlet-3.3.0-cp311-cp311-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:4243050a88ba61842186cb9e63c7dfa677ec146160b0efd73b855a3d9c7fcf32", size = 590338, upload-time = "2025-12-04T14:57:41.136Z" }, + { url = "https://files.pythonhosted.org/packages/80/d7/db0a5085035d05134f8c089643da2b44cc9b80647c39e93129c5ef170d8f/greenlet-3.3.0-cp311-cp311-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:670d0f94cd302d81796e37299bcd04b95d62403883b24225c6b5271466612f45", size = 601098, upload-time = "2025-12-04T15:07:11.898Z" }, { url = "https://files.pythonhosted.org/packages/dc/a6/e959a127b630a58e23529972dbc868c107f9d583b5a9f878fb858c46bc1a/greenlet-3.3.0-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:6cb3a8ec3db4a3b0eb8a3c25436c2d49e3505821802074969db017b87bc6a948", size = 590206, upload-time = "2025-12-04T14:26:01.254Z" }, { url = "https://files.pythonhosted.org/packages/48/60/29035719feb91798693023608447283b266b12efc576ed013dd9442364bb/greenlet-3.3.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:2de5a0b09eab81fc6a382791b995b1ccf2b172a9fec934747a7a23d2ff291794", size = 1550668, upload-time = "2025-12-04T15:04:22.439Z" }, { url = "https://files.pythonhosted.org/packages/0a/5f/783a23754b691bfa86bd72c3033aa107490deac9b2ef190837b860996c9f/greenlet-3.3.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:4449a736606bd30f27f8e1ff4678ee193bc47f6ca810d705981cfffd6ce0d8c5", size = 1615483, upload-time = 
"2025-12-04T14:27:28.083Z" }, @@ -635,6 +638,7 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f8/0a/a3871375c7b9727edaeeea994bfff7c63ff7804c9829c19309ba2e058807/greenlet-3.3.0-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:b01548f6e0b9e9784a2c99c5651e5dc89ffcbe870bc5fb2e5ef864e9cc6b5dcb", size = 276379, upload-time = "2025-12-04T14:23:30.498Z" }, { url = "https://files.pythonhosted.org/packages/43/ab/7ebfe34dce8b87be0d11dae91acbf76f7b8246bf9d6b319c741f99fa59c6/greenlet-3.3.0-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:349345b770dc88f81506c6861d22a6ccd422207829d2c854ae2af8025af303e3", size = 597294, upload-time = "2025-12-04T14:50:06.847Z" }, { url = "https://files.pythonhosted.org/packages/a4/39/f1c8da50024feecd0793dbd5e08f526809b8ab5609224a2da40aad3a7641/greenlet-3.3.0-cp312-cp312-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:e8e18ed6995e9e2c0b4ed264d2cf89260ab3ac7e13555b8032b25a74c6d18655", size = 607742, upload-time = "2025-12-04T14:57:42.349Z" }, + { url = "https://files.pythonhosted.org/packages/77/cb/43692bcd5f7a0da6ec0ec6d58ee7cddb606d055ce94a62ac9b1aa481e969/greenlet-3.3.0-cp312-cp312-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:c024b1e5696626890038e34f76140ed1daf858e37496d33f2af57f06189e70d7", size = 622297, upload-time = "2025-12-04T15:07:13.552Z" }, { url = "https://files.pythonhosted.org/packages/75/b0/6bde0b1011a60782108c01de5913c588cf51a839174538d266de15e4bf4d/greenlet-3.3.0-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:047ab3df20ede6a57c35c14bf5200fcf04039d50f908270d3f9a7a82064f543b", size = 609885, upload-time = "2025-12-04T14:26:02.368Z" }, { url = "https://files.pythonhosted.org/packages/49/0e/49b46ac39f931f59f987b7cd9f34bfec8ef81d2a1e6e00682f55be5de9f4/greenlet-3.3.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:2d9ad37fc657b1102ec880e637cccf20191581f75c64087a549e66c57e1ceb53", size = 1567424, upload-time = 
"2025-12-04T15:04:23.757Z" }, { url = "https://files.pythonhosted.org/packages/05/f5/49a9ac2dff7f10091935def9165c90236d8f175afb27cbed38fb1d61ab6b/greenlet-3.3.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:83cd0e36932e0e7f36a64b732a6f60c2fc2df28c351bae79fbaf4f8092fe7614", size = 1636017, upload-time = "2025-12-04T14:27:29.688Z" }, @@ -642,6 +646,7 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/02/2f/28592176381b9ab2cafa12829ba7b472d177f3acc35d8fbcf3673d966fff/greenlet-3.3.0-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:a1e41a81c7e2825822f4e068c48cb2196002362619e2d70b148f20a831c00739", size = 275140, upload-time = "2025-12-04T14:23:01.282Z" }, { url = "https://files.pythonhosted.org/packages/2c/80/fbe937bf81e9fca98c981fe499e59a3f45df2a04da0baa5c2be0dca0d329/greenlet-3.3.0-cp313-cp313-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:9f515a47d02da4d30caaa85b69474cec77b7929b2e936ff7fb853d42f4bf8808", size = 599219, upload-time = "2025-12-04T14:50:08.309Z" }, { url = "https://files.pythonhosted.org/packages/c2/ff/7c985128f0514271b8268476af89aee6866df5eec04ac17dcfbc676213df/greenlet-3.3.0-cp313-cp313-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:7d2d9fd66bfadf230b385fdc90426fcd6eb64db54b40c495b72ac0feb5766c54", size = 610211, upload-time = "2025-12-04T14:57:43.968Z" }, + { url = "https://files.pythonhosted.org/packages/79/07/c47a82d881319ec18a4510bb30463ed6891f2ad2c1901ed5ec23d3de351f/greenlet-3.3.0-cp313-cp313-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:30a6e28487a790417d036088b3bcb3f3ac7d8babaa7d0139edbaddebf3af9492", size = 624311, upload-time = "2025-12-04T15:07:14.697Z" }, { url = "https://files.pythonhosted.org/packages/fd/8e/424b8c6e78bd9837d14ff7df01a9829fc883ba2ab4ea787d4f848435f23f/greenlet-3.3.0-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:087ea5e004437321508a8d6f20efc4cfec5e3c30118e1417ea96ed1d93950527", size = 612833, upload-time = 
"2025-12-04T14:26:03.669Z" }, { url = "https://files.pythonhosted.org/packages/b5/ba/56699ff9b7c76ca12f1cdc27a886d0f81f2189c3455ff9f65246780f713d/greenlet-3.3.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:ab97cf74045343f6c60a39913fa59710e4bd26a536ce7ab2397adf8b27e67c39", size = 1567256, upload-time = "2025-12-04T15:04:25.276Z" }, { url = "https://files.pythonhosted.org/packages/1e/37/f31136132967982d698c71a281a8901daf1a8fbab935dce7c0cf15f942cc/greenlet-3.3.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:5375d2e23184629112ca1ea89a53389dddbffcf417dad40125713d88eb5f96e8", size = 1636483, upload-time = "2025-12-04T14:27:30.804Z" }, @@ -649,6 +654,7 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d7/7c/f0a6d0ede2c7bf092d00bc83ad5bafb7e6ec9b4aab2fbdfa6f134dc73327/greenlet-3.3.0-cp314-cp314-macosx_11_0_universal2.whl", hash = "sha256:60c2ef0f578afb3c8d92ea07ad327f9a062547137afe91f38408f08aacab667f", size = 275671, upload-time = "2025-12-04T14:23:05.267Z" }, { url = "https://files.pythonhosted.org/packages/44/06/dac639ae1a50f5969d82d2e3dd9767d30d6dbdbab0e1a54010c8fe90263c/greenlet-3.3.0-cp314-cp314-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0a5d554d0712ba1de0a6c94c640f7aeba3f85b3a6e1f2899c11c2c0428da9365", size = 646360, upload-time = "2025-12-04T14:50:10.026Z" }, { url = "https://files.pythonhosted.org/packages/e0/94/0fb76fe6c5369fba9bf98529ada6f4c3a1adf19e406a47332245ef0eb357/greenlet-3.3.0-cp314-cp314-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:3a898b1e9c5f7307ebbde4102908e6cbfcb9ea16284a3abe15cab996bee8b9b3", size = 658160, upload-time = "2025-12-04T14:57:45.41Z" }, + { url = "https://files.pythonhosted.org/packages/93/79/d2c70cae6e823fac36c3bbc9077962105052b7ef81db2f01ec3b9bf17e2b/greenlet-3.3.0-cp314-cp314-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:dcd2bdbd444ff340e8d6bdf54d2f206ccddbb3ccfdcd3c25bf4afaa7b8f0cf45", size = 671388, upload-time = 
"2025-12-04T15:07:15.789Z" }, { url = "https://files.pythonhosted.org/packages/b8/14/bab308fc2c1b5228c3224ec2bf928ce2e4d21d8046c161e44a2012b5203e/greenlet-3.3.0-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:5773edda4dc00e173820722711d043799d3adb4f01731f40619e07ea2750b955", size = 660166, upload-time = "2025-12-04T14:26:05.099Z" }, { url = "https://files.pythonhosted.org/packages/4b/d2/91465d39164eaa0085177f61983d80ffe746c5a1860f009811d498e7259c/greenlet-3.3.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:ac0549373982b36d5fd5d30beb8a7a33ee541ff98d2b502714a09f1169f31b55", size = 1615193, upload-time = "2025-12-04T15:04:27.041Z" }, { url = "https://files.pythonhosted.org/packages/42/1b/83d110a37044b92423084d52d5d5a3b3a73cafb51b547e6d7366ff62eff1/greenlet-3.3.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:d198d2d977460358c3b3a4dc844f875d1adb33817f0613f663a656f463764ccc", size = 1683653, upload-time = "2025-12-04T14:27:32.366Z" }, @@ -656,6 +662,7 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a0/66/bd6317bc5932accf351fc19f177ffba53712a202f9df10587da8df257c7e/greenlet-3.3.0-cp314-cp314t-macosx_11_0_universal2.whl", hash = "sha256:d6ed6f85fae6cdfdb9ce04c9bf7a08d666cfcfb914e7d006f44f840b46741931", size = 282638, upload-time = "2025-12-04T14:25:20.941Z" }, { url = "https://files.pythonhosted.org/packages/30/cf/cc81cb030b40e738d6e69502ccbd0dd1bced0588e958f9e757945de24404/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:d9125050fcf24554e69c4cacb086b87b3b55dc395a8b3ebe6487b045b2614388", size = 651145, upload-time = "2025-12-04T14:50:11.039Z" }, { url = "https://files.pythonhosted.org/packages/9c/ea/1020037b5ecfe95ca7df8d8549959baceb8186031da83d5ecceff8b08cd2/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:87e63ccfa13c0a0f6234ed0add552af24cc67dd886731f2261e46e241608bee3", size = 654236, upload-time = 
"2025-12-04T14:57:47.007Z" }, + { url = "https://files.pythonhosted.org/packages/69/cc/1e4bae2e45ca2fa55299f4e85854606a78ecc37fead20d69322f96000504/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:2662433acbca297c9153a4023fe2161c8dcfdcc91f10433171cf7e7d94ba2221", size = 662506, upload-time = "2025-12-04T15:07:16.906Z" }, { url = "https://files.pythonhosted.org/packages/57/b9/f8025d71a6085c441a7eaff0fd928bbb275a6633773667023d19179fe815/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:3c6e9b9c1527a78520357de498b0e709fb9e2f49c3a513afd5a249007261911b", size = 653783, upload-time = "2025-12-04T14:26:06.225Z" }, { url = "https://files.pythonhosted.org/packages/f6/c7/876a8c7a7485d5d6b5c6821201d542ef28be645aa024cfe1145b35c120c1/greenlet-3.3.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:286d093f95ec98fdd92fcb955003b8a3d054b4e2cab3e2707a5039e7b50520fd", size = 1614857, upload-time = "2025-12-04T15:04:28.484Z" }, { url = "https://files.pythonhosted.org/packages/4f/dc/041be1dff9f23dac5f48a43323cd0789cb798342011c19a248d9c9335536/greenlet-3.3.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:6c10513330af5b8ae16f023e8ddbfb486ab355d04467c4679c5cfe4659975dd9", size = 1676034, upload-time = "2025-12-04T14:27:33.531Z" },