"""Pydantic schemas for the project workspace workflow.

Defines the persisted workspace state (``ProjectWorkspaceState``), the API
read model (``ProjectWorkspaceRead``), the discriminated union of workflow
action requests (``WorkflowActionRequest``), and a migration helper that
converts a legacy "wizard" dict into the current state shape.
"""

from __future__ import annotations

from enum import StrEnum
from typing import Annotated, Literal, get_args
from uuid import UUID

from pydantic import AliasChoices, Field, model_validator

from cpv3.common.schemas import Schema
from cpv3.modules.jobs.schemas import JobTypeEnum

# Version stamp stored on ProjectWorkspaceState; bump when the shape changes.
WORKFLOW_VERSION: int = 1

# Job-type values accepted when migrating a legacy ``active_job_type``.
# NOTE(review): this assumes JobTypeEnum is a typing.Literal — get_args() on
# an Enum class would return an empty tuple, leaving this set empty and
# silently dropping every legacy active job. TODO confirm against the
# declaration in cpv3.modules.jobs.schemas.
VALID_JOB_TYPES = set(get_args(JobTypeEnum))

# Frontend screen identifiers, one per step of the workflow wizard.
WorkspaceScreenEnum = Literal[
    "upload",
    "verify",
    "silence-settings",
    "processing",
    "fragments",
    "silence-apply-processing",
    "transcription-settings",
    "transcription-processing",
    "subtitle-revision",
    "caption-settings",
    "caption-processing",
    "caption-result",
]


class WorkflowPhase(StrEnum):
    """High-level phase of the workspace workflow."""

    INGEST = "INGEST"
    VERIFY = "VERIFY"
    SILENCE = "SILENCE"
    TRANSCRIPTION = "TRANSCRIPTION"
    CAPTIONS = "CAPTIONS"
    DONE = "DONE"


class SilenceWorkflowStatus(StrEnum):
    """Status of the silence-removal sub-workflow."""

    IDLE = "IDLE"
    CONFIGURED = "CONFIGURED"
    DETECTING = "DETECTING"
    REVIEWING = "REVIEWING"
    APPLYING = "APPLYING"
    COMPLETED = "COMPLETED"
    SKIPPED = "SKIPPED"


class TranscriptionWorkflowStatus(StrEnum):
    """Status of the transcription sub-workflow."""

    IDLE = "IDLE"
    PROCESSING = "PROCESSING"
    REVIEWING = "REVIEWING"
    COMPLETED = "COMPLETED"


class CaptionsWorkflowStatus(StrEnum):
    """Status of the caption-rendering sub-workflow."""

    IDLE = "IDLE"
    CONFIGURED = "CONFIGURED"
    PROCESSING = "PROCESSING"
    COMPLETED = "COMPLETED"


class ActiveJobState(Schema):
    """Reference to the job currently running for this workspace."""

    job_id: UUID
    job_type: JobTypeEnum


class WorkspaceViewState(Schema):
    """Client-side view state: which files are shown/selected."""

    used_file_ids: list[UUID] = Field(default_factory=list)
    selected_file_id: UUID | None = None


class SilenceSettingsState(Schema):
    """Parameters for silence detection.

    NOTE(review): units are per the field names (ms / dB); whether
    ``silence_threshold_db`` is interpreted as positive or negative dB
    depends on the detector — not visible here, confirm with the job code.
    """

    min_silence_duration_ms: int = 200
    silence_threshold_db: int = 16
    padding_ms: int = 100


class CutRegionState(Schema):
    """A time interval (in milliseconds) to cut from the media."""

    start_ms: int
    end_ms: int


class SilenceState(Schema):
    """State of the silence-removal sub-workflow."""

    status: SilenceWorkflowStatus = SilenceWorkflowStatus.IDLE
    settings: SilenceSettingsState = Field(default_factory=SilenceSettingsState)
    detect_job_id: UUID | None = None
    detected_segments: list[CutRegionState] = Field(default_factory=list)
    # Accepts the legacy key "cut_regions" on input; always serializes as
    # "reviewed_cuts".
    reviewed_cuts: list[CutRegionState] = Field(
        default_factory=list,
        validation_alias=AliasChoices("reviewed_cuts", "cut_regions"),
        serialization_alias="reviewed_cuts",
    )
    duration_ms: int | None = None
    # Accepts the legacy key "output_file_id" on input.
    applied_output_file_id: UUID | None = Field(
        default=None,
        validation_alias=AliasChoices("applied_output_file_id", "output_file_id"),
        serialization_alias="applied_output_file_id",
    )


class TranscriptionRequestState(Schema):
    """Parameters used to start a transcription job."""

    engine: Literal["whisper", "google", "salutespeech"] = "whisper"
    language: str | None = None
    model: str = "base"


class TranscriptionState(Schema):
    """State of the transcription sub-workflow."""

    status: TranscriptionWorkflowStatus = TranscriptionWorkflowStatus.IDLE
    request: TranscriptionRequestState = Field(default_factory=TranscriptionRequestState)
    job_id: UUID | None = None
    artifact_id: UUID | None = None
    transcription_id: UUID | None = None
    reviewed: bool = False


class CaptionsState(Schema):
    """State of the caption-rendering sub-workflow."""

    status: CaptionsWorkflowStatus = CaptionsWorkflowStatus.IDLE
    preset_id: UUID | None = None
    style_config: dict | None = None
    # Accepts the legacy key "job_id" on input; always serializes as
    # "render_job_id".
    render_job_id: UUID | None = Field(
        default=None,
        validation_alias=AliasChoices("render_job_id", "job_id"),
        serialization_alias="render_job_id",
    )
    output_file_id: UUID | None = None


class ProjectWorkspaceState(Schema):
    """Complete persisted workflow state for one project workspace."""

    version: int = WORKFLOW_VERSION
    phase: WorkflowPhase = WorkflowPhase.INGEST
    active_job: ActiveJobState | None = None
    source_file_id: UUID | None = None
    workspace_view: WorkspaceViewState = Field(default_factory=WorkspaceViewState)
    silence: SilenceState = Field(default_factory=SilenceState)
    transcription: TranscriptionState = Field(default_factory=TranscriptionState)
    captions: CaptionsState = Field(default_factory=CaptionsState)


class ProjectWorkspaceRead(Schema):
    """API read model: persisted state plus derived/contextual fields.

    ``project_id``, ``revision`` and ``current_screen`` are not part of
    ``ProjectWorkspaceState`` — presumably computed by the service layer
    when building the response (not visible in this module).
    """

    project_id: UUID
    revision: int
    version: int
    phase: WorkflowPhase
    current_screen: WorkspaceScreenEnum
    active_job: ActiveJobState | None
    source_file_id: UUID | None
    workspace_view: WorkspaceViewState
    silence: SilenceState
    transcription: TranscriptionState
    captions: CaptionsState


class WorkflowActionBase(Schema):
    """Common fields for every workflow action request.

    ``type`` is the discriminator of ``WorkflowActionRequest``; ``revision``
    is presumably used for optimistic concurrency — confirm in the handler.
    """

    type: str
    revision: int


class SetSourceFileAction(WorkflowActionBase):
    type: Literal["SET_SOURCE_FILE"]
    # Accepts the legacy key "source_file_id" on input.
    file_id: UUID = Field(
        validation_alias=AliasChoices("file_id", "source_file_id"),
        serialization_alias="file_id",
    )


class ResetSourceFileAction(WorkflowActionBase):
    type: Literal["RESET_SOURCE_FILE"]


class StartMediaConvertAction(WorkflowActionBase):
    type: Literal["START_MEDIA_CONVERT"]
    output_format: str = "mp4"
    out_folder: str = "output_files"


class ConfirmVerifyAction(WorkflowActionBase):
    type: Literal["CONFIRM_VERIFY"]


class SetSilenceSettingsAction(WorkflowActionBase):
    type: Literal["SET_SILENCE_SETTINGS"]
    settings: SilenceSettingsState = Field(default_factory=SilenceSettingsState)

    @model_validator(mode="before")
    @classmethod
    def normalize_settings(cls, data: object) -> object:
        """Accept a flat legacy payload by nesting it under ``settings``.

        If the incoming dict has no ``settings`` key, the three setting
        fields are lifted from the top level (falling back to the same
        defaults as ``SilenceSettingsState``).
        """
        if not isinstance(data, dict) or "settings" in data:
            return data
        return {
            **data,
            "settings": {
                "min_silence_duration_ms": data.get("min_silence_duration_ms", 200),
                "silence_threshold_db": data.get("silence_threshold_db", 16),
                "padding_ms": data.get("padding_ms", 100),
            },
        }


class StartSilenceDetectAction(WorkflowActionBase):
    type: Literal["START_SILENCE_DETECT"]


class SetSilenceCutsAction(WorkflowActionBase):
    type: Literal["SET_SILENCE_CUTS"]
    # Accepts legacy keys "reviewed_cuts" / "cut_regions" on input.
    cuts: list[CutRegionState] = Field(
        validation_alias=AliasChoices("cuts", "reviewed_cuts", "cut_regions"),
    )


class SkipSilenceApplyAction(WorkflowActionBase):
    type: Literal["SKIP_SILENCE_APPLY"]


class StartSilenceApplyAction(WorkflowActionBase):
    type: Literal["START_SILENCE_APPLY"]
    cuts: list[CutRegionState] | None = None
    out_folder: str = "output_files"
    output_name: str | None = None


class ReopenSilenceReviewAction(WorkflowActionBase):
    type: Literal["REOPEN_SILENCE_REVIEW"]


class StartTranscriptionAction(WorkflowActionBase):
    """Start a transcription job.

    Parameters may arrive either flat (``engine``/``language``/``model``)
    or nested under ``request``; the validator keeps both forms in sync.
    """

    type: Literal["START_TRANSCRIPTION"]
    engine: Literal["whisper", "google", "salutespeech"] = "whisper"
    language: str | None = None
    model: str = "base"
    request: TranscriptionRequestState | None = None

    @model_validator(mode="after")
    def normalize_request(self) -> "StartTranscriptionAction":
        """Synchronize the flat fields and the nested ``request``.

        No ``request`` given: build one from the flat fields. ``request``
        given: the nested values win and are copied back to the flat fields.
        """
        if self.request is None:
            self.request = TranscriptionRequestState(
                engine=self.engine,
                language=self.language,
                model=self.model,
            )
            return self
        self.engine = self.request.engine
        self.language = self.request.language
        self.model = self.request.model
        return self


class ReopenTranscriptionConfigAction(WorkflowActionBase):
    type: Literal["REOPEN_TRANSCRIPTION_CONFIG"]


class MarkTranscriptionReviewedAction(WorkflowActionBase):
    type: Literal["MARK_TRANSCRIPTION_REVIEWED"]


class SelectCaptionPresetAction(WorkflowActionBase):
    type: Literal["SELECT_CAPTION_PRESET"]
    preset_id: UUID | None = None
    style_config: dict | None = None


class StartCaptionRenderAction(WorkflowActionBase):
    type: Literal["START_CAPTION_RENDER"]
    # NOTE(review): other actions call this field "out_folder"; renaming
    # would break the wire format, so the inconsistency is only flagged here.
    folder: str = "output_files"


class ReopenCaptionConfigAction(WorkflowActionBase):
    type: Literal["REOPEN_CAPTION_CONFIG"]


class SetWorkspaceViewAction(WorkflowActionBase):
    type: Literal["SET_WORKSPACE_VIEW"]
    workspace_view: WorkspaceViewState

    @model_validator(mode="before")
    @classmethod
    def normalize_workspace_view(cls, data: object) -> object:
        """Accept a flat legacy payload by nesting it under ``workspace_view``."""
        if not isinstance(data, dict) or "workspace_view" in data:
            return data
        return {
            **data,
            "workspace_view": {
                "used_file_ids": data.get("used_file_ids", []),
                "selected_file_id": data.get("selected_file_id"),
            },
        }


# Discriminated union of every action, dispatched on the "type" field.
WorkflowActionRequest = Annotated[
    (
        SetSourceFileAction
        | ResetSourceFileAction
        | StartMediaConvertAction
        | ConfirmVerifyAction
        | SetSilenceSettingsAction
        | StartSilenceDetectAction
        | SetSilenceCutsAction
        | SkipSilenceApplyAction
        | StartSilenceApplyAction
        | ReopenSilenceReviewAction
        | StartTranscriptionAction
        | ReopenTranscriptionConfigAction
        | MarkTranscriptionReviewedAction
        | SelectCaptionPresetAction
        | StartCaptionRenderAction
        | ReopenCaptionConfigAction
        | SetWorkspaceViewAction
    ),
    Field(discriminator="type"),
]


def build_default_workspace_state() -> ProjectWorkspaceState:
    """Return a fresh workspace state with all defaults."""
    return ProjectWorkspaceState()


def build_workspace_state_from_legacy(
    legacy_workspace_state: dict | None,
) -> ProjectWorkspaceState:
    """Build a ``ProjectWorkspaceState`` from a legacy "wizard" dict.

    Best-effort migration: every value is type-checked and invalid entries
    are silently skipped, so any malformed legacy payload degrades to the
    default state rather than raising. Statement order matters — the phase/
    status assignments below deliberately override one another (current_step
    mapping, then active-job overrides, then artifact-based fallbacks).

    NOTE(review): the legacy key names ("wizard", "primary_file_id", ...)
    are assumed from this code alone — confirm against the old writer.
    """
    state = build_default_workspace_state()
    if not isinstance(legacy_workspace_state, dict):
        return state
    wizard = legacy_workspace_state.get("wizard")
    if not isinstance(wizard, dict):
        wizard = {}

    # Source file.
    source_file_id = _parse_uuid(wizard.get("primary_file_id"))
    if source_file_id is not None:
        state.source_file_id = source_file_id

    # Used files: dedupe, keep order, ensure the source file comes first.
    used_file_ids: list[UUID] = []
    used_files = legacy_workspace_state.get("used_files")
    if isinstance(used_files, list):
        for item in used_files:
            if not isinstance(item, dict):
                continue
            file_id = _parse_uuid(item.get("id"))
            if file_id is not None and file_id not in used_file_ids:
                used_file_ids.append(file_id)
    if source_file_id is not None and source_file_id not in used_file_ids:
        used_file_ids.insert(0, source_file_id)
    state.workspace_view.used_file_ids = used_file_ids
    if source_file_id is not None and source_file_id in used_file_ids:
        state.workspace_view.selected_file_id = source_file_id

    # Active job — only migrated when the legacy type is a known job type.
    active_job_id = _parse_uuid(wizard.get("active_job_id"))
    active_job_type = wizard.get("active_job_type")
    if active_job_id is not None and active_job_type in VALID_JOB_TYPES:
        state.active_job = ActiveJobState(
            job_id=active_job_id,
            job_type=active_job_type,
        )

    # Per-sub-workflow identifiers and configs.
    silence_job_id = _parse_uuid(wizard.get("silence_job_id"))
    if silence_job_id is not None:
        state.silence.detect_job_id = silence_job_id
    transcription_artifact_id = _parse_uuid(wizard.get("transcription_artifact_id"))
    if transcription_artifact_id is not None:
        state.transcription.artifact_id = transcription_artifact_id
    caption_preset_id = _parse_uuid(wizard.get("caption_preset_id"))
    if caption_preset_id is not None:
        state.captions.preset_id = caption_preset_id
    caption_style_config = wizard.get("caption_style_config")
    if isinstance(caption_style_config, dict):
        state.captions.style_config = caption_style_config
    captioned_video_file_id = _parse_uuid(wizard.get("captioned_video_file_id"))
    if captioned_video_file_id is not None:
        state.captions.output_file_id = captioned_video_file_id
    silence_settings = wizard.get("silence_settings")
    if isinstance(silence_settings, dict):
        state.silence.settings = SilenceSettingsState.model_validate(silence_settings)
        state.silence.status = SilenceWorkflowStatus.CONFIGURED

    # Map the legacy wizard screen onto phase and sub-workflow statuses.
    current_step = wizard.get("current_step")
    step_phase_map = {
        "upload": WorkflowPhase.INGEST,
        "verify": WorkflowPhase.VERIFY,
        "silence-settings": WorkflowPhase.SILENCE,
        "processing": WorkflowPhase.SILENCE,
        "fragments": WorkflowPhase.SILENCE,
        "silence-apply-processing": WorkflowPhase.SILENCE,
        "transcription-settings": WorkflowPhase.TRANSCRIPTION,
        "transcription-processing": WorkflowPhase.TRANSCRIPTION,
        "subtitle-revision": WorkflowPhase.TRANSCRIPTION,
        "caption-settings": WorkflowPhase.CAPTIONS,
        "caption-processing": WorkflowPhase.CAPTIONS,
        "caption-result": WorkflowPhase.DONE,
    }
    if current_step in step_phase_map:
        state.phase = step_phase_map[current_step]
    if current_step == "processing":
        state.silence.status = SilenceWorkflowStatus.DETECTING
    elif current_step == "fragments":
        state.silence.status = SilenceWorkflowStatus.REVIEWING
    elif current_step == "silence-apply-processing":
        state.silence.status = SilenceWorkflowStatus.APPLYING
    elif current_step == "transcription-processing":
        state.transcription.status = TranscriptionWorkflowStatus.PROCESSING
    elif current_step == "subtitle-revision":
        state.transcription.status = TranscriptionWorkflowStatus.REVIEWING
    elif current_step == "caption-settings":
        state.captions.status = CaptionsWorkflowStatus.CONFIGURED
    elif current_step == "caption-processing":
        state.captions.status = CaptionsWorkflowStatus.PROCESSING
    elif current_step == "caption-result":
        state.captions.status = CaptionsWorkflowStatus.COMPLETED

    # A running job overrides whatever the screen mapping said.
    if state.active_job is not None:
        if state.active_job.job_type == "MEDIA_CONVERT":
            state.phase = WorkflowPhase.VERIFY
        elif state.active_job.job_type == "SILENCE_DETECT":
            state.phase = WorkflowPhase.SILENCE
            state.silence.status = SilenceWorkflowStatus.DETECTING
            state.silence.detect_job_id = state.active_job.job_id
        elif state.active_job.job_type == "SILENCE_APPLY":
            state.phase = WorkflowPhase.SILENCE
            state.silence.status = SilenceWorkflowStatus.APPLYING
        elif state.active_job.job_type == "TRANSCRIPTION_GENERATE":
            state.phase = WorkflowPhase.TRANSCRIPTION
            state.transcription.status = TranscriptionWorkflowStatus.PROCESSING
            state.transcription.job_id = state.active_job.job_id
        elif state.active_job.job_type == "CAPTIONS_GENERATE":
            state.phase = WorkflowPhase.CAPTIONS
            state.captions.status = CaptionsWorkflowStatus.PROCESSING
            state.captions.render_job_id = state.active_job.job_id

    # Final fallbacks based on which artifacts exist (most-advanced first).
    if captioned_video_file_id is not None:
        state.phase = WorkflowPhase.DONE
        state.captions.status = CaptionsWorkflowStatus.COMPLETED
    elif transcription_artifact_id is not None and (
        state.transcription.status == TranscriptionWorkflowStatus.IDLE
    ):
        state.phase = WorkflowPhase.TRANSCRIPTION
        state.transcription.status = TranscriptionWorkflowStatus.REVIEWING
    elif silence_job_id is not None and state.silence.status == SilenceWorkflowStatus.IDLE:
        state.phase = WorkflowPhase.SILENCE
        state.silence.status = SilenceWorkflowStatus.REVIEWING
    elif source_file_id is not None and state.phase == WorkflowPhase.INGEST:
        state.phase = WorkflowPhase.VERIFY
    return state


def _parse_uuid(value: object) -> UUID | None:
    """Coerce ``value`` to a :class:`UUID`, returning ``None`` on failure."""
    if value is None:
        return None
    try:
        return UUID(str(value))
    except (TypeError, ValueError):
        return None


# Backward-compatible aliases used by existing tests and frontend hand-written types.
TaskWorkflowActiveJob = ActiveJobState
SilenceSettingsPayload = SilenceSettingsState
WorkflowSilenceState = SilenceState
WorkflowTranscriptionRequest = TranscriptionRequestState