"""add project_workspaces table

Revision ID: e6f7a8b9c0d1
Revises: d5e6f7a8b9c0
Create Date: 2026-04-07 16:00:00.000000

Creates ``project_workspaces`` and backfills one row per active project,
translating the legacy ``projects.workspace_state`` JSON into the new
workspace state document.
"""
from __future__ import annotations

import uuid
from datetime import datetime, timezone
from typing import Any, Sequence, Union

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision: str = "e6f7a8b9c0d1"
down_revision: Union[str, None] = "d5e6f7a8b9c0"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None

# Legacy wizard step -> phase stored in the new state document.
_STEP_TO_PHASE: dict[str, str] = {
    "upload": "INGEST",
    "verify": "VERIFY",
    "silence-settings": "SILENCE",
    "processing": "SILENCE",
    "fragments": "SILENCE",
    "silence-apply-processing": "SILENCE",
    "transcription-settings": "TRANSCRIPTION",
    "transcription-processing": "TRANSCRIPTION",
    "subtitle-revision": "TRANSCRIPTION",
    "caption-settings": "CAPTIONS",
    "caption-processing": "CAPTIONS",
    "caption-result": "DONE",
}

# Legacy wizard step -> (state section, status written into that section).
_STEP_TO_STATUS: dict[str, tuple[str, str]] = {
    "processing": ("silence", "DETECTING"),
    "fragments": ("silence", "REVIEWING"),
    "silence-apply-processing": ("silence", "APPLYING"),
    "transcription-processing": ("transcription", "PROCESSING"),
    "subtitle-revision": ("transcription", "REVIEWING"),
    "caption-settings": ("captions", "CONFIGURED"),
    "caption-processing": ("captions", "PROCESSING"),
    "caption-result": ("captions", "COMPLETED"),
}

# Wizard keys holding a UUID that is copied verbatim (after validation)
# into ``state[section][field]``.
_WIZARD_UUID_TARGETS: tuple[tuple[str, str, str], ...] = (
    ("silence_job_id", "silence", "detect_job_id"),
    ("transcription_artifact_id", "transcription", "artifact_id"),
    ("caption_preset_id", "captions", "preset_id"),
)


def _utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC ``datetime``."""
    return datetime.now(timezone.utc)


def _parse_uuid(raw_value: object) -> str | None:
    """Return *raw_value* as a canonical UUID string, or ``None``.

    ``None`` and anything ``uuid.UUID`` cannot parse yield ``None`` instead
    of raising, so malformed legacy values are simply skipped.
    """
    if raw_value is None:
        return None
    try:
        return str(uuid.UUID(str(raw_value)))
    except (TypeError, ValueError):
        return None


def _default_state() -> dict[str, Any]:
    """Build a fresh, fully-populated default workspace state document.

    A new nested structure is created on every call, so callers may mutate
    the result freely.
    """
    return {
        "version": 1,
        "phase": "INGEST",
        "active_job": None,
        "source_file_id": None,
        "workspace_view": {
            "used_file_ids": [],
            "selected_file_id": None,
        },
        "silence": {
            "status": "IDLE",
            "settings": {
                "min_silence_duration_ms": 200,
                "silence_threshold_db": 16,
                "padding_ms": 100,
            },
            "detect_job_id": None,
            "detected_segments": [],
            "reviewed_cuts": [],
            "duration_ms": None,
            "applied_output_file_id": None,
        },
        "transcription": {
            "status": "IDLE",
            "request": {
                "engine": "whisper",
                "language": None,
                "model": "base",
            },
            "job_id": None,
            "artifact_id": None,
            "transcription_id": None,
            "reviewed": False,
        },
        "captions": {
            "status": "IDLE",
            "preset_id": None,
            "style_config": None,
            "render_job_id": None,
            "output_file_id": None,
        },
    }


def _backfill_state(legacy_workspace_state: dict | None) -> dict[str, Any]:
    """Translate a legacy ``workspace_state`` value into the new state document.

    Starts from ``_default_state()`` and overlays whatever can be recovered
    from the legacy ``wizard`` and ``used_files`` entries. Every legacy field
    is validated before use; missing or malformed values leave the
    corresponding default untouched.

    Args:
        legacy_workspace_state: The legacy JSON value. Anything that is not
            a ``dict`` produces the plain default state.

    Returns:
        A new state dictionary with the same shape as ``_default_state()``.
    """
    state = _default_state()
    if not isinstance(legacy_workspace_state, dict):
        return state

    wizard = legacy_workspace_state.get("wizard")
    if not isinstance(wizard, dict):
        wizard = {}

    # The step comes from arbitrary JSON: an unhashable value (list/object)
    # would raise TypeError on the dict lookups below and abort the whole
    # migration, so anything that is not a string is treated as absent.
    current_step = wizard.get("current_step")
    if not isinstance(current_step, str):
        current_step = None

    phase = _STEP_TO_PHASE.get(current_step)
    if phase is not None:
        state["phase"] = phase

    source_file_id = _parse_uuid(wizard.get("primary_file_id"))
    if source_file_id is not None:
        state["source_file_id"] = source_file_id

    for wizard_key, section, field in _WIZARD_UUID_TARGETS:
        parsed = _parse_uuid(wizard.get(wizard_key))
        if parsed is not None:
            state[section][field] = parsed

    caption_style_config = wizard.get("caption_style_config")
    if isinstance(caption_style_config, dict):
        state["captions"]["style_config"] = caption_style_config

    # A rendered captioned video forces the phase to DONE, overriding the
    # phase derived from the wizard step above.
    captioned_video_file_id = _parse_uuid(wizard.get("captioned_video_file_id"))
    if captioned_video_file_id is not None:
        state["captions"]["output_file_id"] = captioned_video_file_id
        state["captions"]["status"] = "COMPLETED"
        state["phase"] = "DONE"

    active_job_id = _parse_uuid(wizard.get("active_job_id"))
    active_job_type = wizard.get("active_job_type")
    if active_job_id is not None and isinstance(active_job_type, str):
        state["active_job"] = {
            "job_id": active_job_id,
            "job_type": active_job_type,
        }
        if active_job_type == "TRANSCRIPTION_GENERATE":
            state["transcription"]["job_id"] = active_job_id
        if active_job_type == "CAPTIONS_GENERATE":
            state["captions"]["render_job_id"] = active_job_id

    silence_settings = wizard.get("silence_settings")
    if isinstance(silence_settings, dict):
        # Fall back per key to the values from the default state so the
        # defaults are defined in exactly one place.
        fallback_settings = state["silence"]["settings"]
        state["silence"]["settings"] = {
            key: silence_settings.get(key, fallback)
            for key, fallback in fallback_settings.items()
        }
        state["silence"]["status"] = "CONFIGURED"

    # Applied after the blocks above on purpose: the step-specific status
    # wins over the "CONFIGURED"/"COMPLETED" statuses set earlier.
    step_status = _STEP_TO_STATUS.get(current_step)
    if step_status is not None:
        section, status = step_status
        state[section]["status"] = status

    used_files = legacy_workspace_state.get("used_files")
    if isinstance(used_files, list):
        candidate_ids = (
            _parse_uuid(item.get("id"))
            for item in used_files
            if isinstance(item, dict)
        )
        # dict.fromkeys de-duplicates in O(n) while keeping first-seen order.
        file_ids = list(
            dict.fromkeys(fid for fid in candidate_ids if fid is not None)
        )
        state["workspace_view"]["used_file_ids"] = file_ids
        if source_file_id is not None and source_file_id in file_ids:
            state["workspace_view"]["selected_file_id"] = source_file_id

    return state


def upgrade() -> None:
    """Create ``project_workspaces`` and backfill it from active projects.

    The table is keyed by ``project_id`` (foreign key to ``projects.id`` with
    ``ON DELETE CASCADE``). One row is inserted for every project whose
    ``is_active`` flag is true, with ``revision`` 0, a state derived from the
    project's legacy ``workspace_state``, and a single shared timestamp for
    ``created_at`` and ``updated_at``.
    """
    op.create_table(
        "project_workspaces",
        sa.Column(
            "project_id",
            postgresql.UUID(as_uuid=True),
            sa.ForeignKey("projects.id", ondelete="CASCADE"),
            primary_key=True,
        ),
        sa.Column(
            "revision",
            sa.Integer(),
            nullable=False,
            server_default=sa.text("0"),
        ),
        sa.Column(
            "state",
            postgresql.JSONB(astext_type=sa.Text()),
            nullable=False,
        ),
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
    )

    bind = op.get_bind()

    # Lightweight table stubs: migrations must not import application models.
    projects = sa.table(
        "projects",
        sa.column("id", postgresql.UUID(as_uuid=True)),
        sa.column("workspace_state", sa.JSON()),
        sa.column("is_active", sa.Boolean()),
    )
    workspaces = sa.table(
        "project_workspaces",
        sa.column("project_id", postgresql.UUID(as_uuid=True)),
        sa.column("revision", sa.Integer()),
        sa.column("state", postgresql.JSONB(astext_type=sa.Text())),
        sa.column("created_at", sa.DateTime(timezone=True)),
        sa.column("updated_at", sa.DateTime(timezone=True)),
    )

    active_projects = bind.execute(
        sa.select(projects.c.id, projects.c.workspace_state).where(
            projects.c.is_active.is_(True)
        )
    )

    stamp = _utc_now()
    payloads = [
        {
            "project_id": project.id,
            "revision": 0,
            "state": _backfill_state(project.workspace_state),
            "created_at": stamp,
            "updated_at": stamp,
        }
        for project in active_projects
    ]

    # Skip the INSERT entirely when there is nothing to backfill.
    if payloads:
        bind.execute(sa.insert(workspaces), payloads)


def downgrade() -> None:
    """Drop the ``project_workspaces`` table, discarding all stored state."""
    op.drop_table("project_workspaces")