diff --git a/.claude/skills/codex/SKILL.md b/.claude/skills/codex/SKILL.md new file mode 100644 index 0000000..58a4a74 --- /dev/null +++ b/.claude/skills/codex/SKILL.md @@ -0,0 +1,61 @@ +--- +name: codex +description: "Use the `codex exec` CLI to consult an OpenAI-powered codebase oracle. Trigger this skill whenever Claude Code would benefit from a second opinion on the current codebase — architecture questions, debugging strategies, implementation alternatives, understanding unfamiliar code patterns, or when the user explicitly asks to 'ask codex', 'check with codex', 'consult codex', or 'codex exec'. Also trigger when Claude is uncertain about a complex codebase decision, needs to understand legacy code, or wants to validate an approach before executing. The codex command scans the current codebase and sends context + the question to an OpenAI model, returning an independent analysis." +--- + +# Codex — Codebase Oracle via OpenAI + +`codex` is a CLI tool that scans the current project and answers questions about it using an OpenAI model. Claude Code can shell out to it for a second perspective. + +## Usage + +```bash +codex exec "<question>" +``` + +The command automatically: +1. Scans the current working directory for codebase context +2. Sends the question + context to an OpenAI model +3. Returns the answer to stdout + +## When to use codex + +- **Architecture decisions** — "What pattern is this codebase using for state management?" +- **Debugging help** — "Why might the auth middleware be failing silently?" +- **Code understanding** — "Explain the data flow in the payments module" +- **Implementation review** — "What's the best way to add caching to this service?" +- **Second opinion** — When you want to validate your approach before making changes +- **Legacy code** — When encountering unfamiliar patterns or undocumented code +- **User asks directly** — Any time the user says "ask codex" or "check with codex" + +## How to call it + +Run it as a bash command.
The question should be specific and self-contained — codex already has codebase context, so focus the question on what you need to know. + +```bash +# Ask about architecture +codex exec "What design patterns are used in the src/services directory?" + +# Ask for debugging help +codex exec "The /api/users endpoint returns 500 when called with a missing email field. What could cause this based on the route handler and validation logic?" + +# Ask for implementation guidance +codex exec "What's the best way to add rate limiting to the existing Express middleware chain?" +``` + +## Reading the response + +The answer comes back as plain text on stdout. Read it, weigh it against your own analysis, and use the best parts. Codex is a consultant, not an authority — if its suggestion conflicts with what you know about the codebase, trust your direct analysis of the code. + +## Tips for good questions + +- Be specific: "Why does UserService.create() call normalize() twice?" beats "What's wrong with user creation?" 
+- Include symptoms when debugging: mention error messages, failing tests, unexpected behavior +- Scope it: ask about a specific module or file path rather than the entire codebase at once +- One question at a time: don't bundle multiple unrelated questions into a single exec call + +## Requirements + +- The `codex` CLI must be installed and on PATH +- An OpenAI API key must be configured (codex handles this internally) +- Run from within a project directory so codex can scan the codebase diff --git a/.continue/mcpServers/new-mcp-server.yaml b/.continue/mcpServers/new-mcp-server.yaml new file mode 100644 index 0000000..0e32aa6 --- /dev/null +++ b/.continue/mcpServers/new-mcp-server.yaml @@ -0,0 +1,10 @@ +name: New MCP server +version: 0.0.1 +schema: v1 +mcpServers: + - name: New MCP server + command: npx + args: + - -y + - + env: {} diff --git a/.env b/.env new file mode 100644 index 0000000..ffa3be5 --- /dev/null +++ b/.env @@ -0,0 +1 @@ +REMOTION_SERVICE_URL=http://remotion:3001 \ No newline at end of file diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..b89c8b7 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,58 @@ +# CLAUDE.md — Coffee Project Backend + +See also the monorepo-level `../CLAUDE.md` for full architecture overview. + +## Commands + +```bash +uv sync # Install dependencies +uv run uvicorn cpv3.main:app --reload # Dev server (localhost:8000) +uv run pytest # Run all tests +uv run pytest tests/integration/<file>.py # Single test file +uv run alembic revision --autogenerate -m "msg" # Create migration +uv run alembic upgrade head # Apply migrations +uv run dramatiq cpv3.modules.tasks.service # Start background worker +uv run ruff check cpv3/ # Lint +uv run ruff format cpv3/ # Auto-format +docker-compose up # Full stack (DB, Redis, MinIO, API, Worker) +``` + +## Architecture + +Layered module pattern. Each module has exactly 5-6 files (`__init__.py`, `models.py`, `schemas.py`, `repository.py`, `service.py`, `router.py`).
No subdirectories within modules. When in doubt, put logic in `service.py`. + +``` +cpv3/ +├── main.py # FastAPI app, CORS, router registration +├── api/v1/router.py # Aggregates all module routers +├── common/schemas.py # Base Schema class (Pydantic, from_attributes=True) +├── db/ # Base, session, model imports +├── infrastructure/ # Auth, security, settings, storage +└── modules/ # Feature modules (users, projects, media, files, etc.) +``` + +## Key Patterns + +- **Database**: PostgreSQL 16, async SQLAlchemy (`AsyncSession`), all models inherit `Base` + `BaseModelMixin`. +- **Soft deletes**: `is_deleted` flag — repositories filter by default. +- **Auth**: JWT access + refresh tokens in HTTP-only cookies. Use `get_current_user` dependency. +- **Config**: `get_settings()` from `cpv3/infrastructure/settings.py` (cached with `@lru_cache`). +- **Background tasks**: Dramatiq with Redis broker. Actors live in `cpv3/modules/tasks/service.py`. +- **Package manager**: **uv** only — `uv sync`, `uv add <package>`, `uv run <command>`. +- **Linting**: Ruff (line length 100, config in `pyproject.toml`). + +## Common Gotchas + +- **Async sessions**: Never mix sync and async DB operations. Use `await` on all session calls. +- **Module structure**: Do not create extra files or subdirectories. Standard files only: `models.py`, `schemas.py`, `repository.py`, `service.py`, `router.py`. +- **Schemas**: Always inherit from `cpv3.common.schemas.Schema`, not raw Pydantic `BaseModel`. +- **Imports**: Use absolute paths (`from cpv3.modules.media.schemas import ...`), not relative. +- **Error messages**: Store as module-level constants with `ERROR_` prefix, not inline strings. +- **CPU-bound work**: Use `anyio.to_thread.run_sync()` to avoid blocking the event loop.
+ +## Docker Services + +``` +postgres → localhost:5332 minio → localhost:9000 (console: 9001) +redis → localhost:6379 api → localhost:8000 (OpenAPI at /api/schema/) +``` diff --git a/CODEX_REFACTOR_PLAN.md b/CODEX_REFACTOR_PLAN.md new file mode 100644 index 0000000..c538c2e --- /dev/null +++ b/CODEX_REFACTOR_PLAN.md @@ -0,0 +1,393 @@ +# CODEX Refactor Plan + +## Chosen direction + +This plan adopts **Option 1: in-place cleanup inside current boundaries**. + +The goal is to make backend feature work safer and the code easier to read **without moving far from the current implementation**. + +This means: + +- keep the existing module structure (`models.py`, `schemas.py`, `repository.py`, `service.py`, `router.py`) +- keep public HTTP routes and response contracts stable +- avoid hidden abstractions, base classes, registries, or plugin-style indirection +- prefer explicit, local helpers over framework-like refactors +- optimize first for **developer readability and safer future changes** + +## Why this option + +The biggest current readability and change-risk hotspot is `cpv3/modules/tasks/service.py`. + +Right now it mixes several responsibilities in one place: + +- broker/bootstrap concerns +- Dramatiq actor definitions +- task submission flow +- job lifecycle side effects +- webhook application +- artifact persistence +- notification sending +- cancellation side effects + +The highest concentration of risk is in: + +- `cpv3/modules/tasks/service.py` — especially `TaskService` and `record_webhook_event()` +- `cpv3/modules/jobs/router.py` — cancellation path delegates into task orchestration +- `cpv3/modules/captions/service.py` — already contains both preset CRUD and Remotion client logic + +A bigger ownership move would be possible later, but it is not the safest first step. + +## Primary objectives + +1. Make task/job orchestration easier to understand. +2. Reduce the blast radius of changes in `TaskService`. +3. 
Replace raw internal JSON/dict handling with typed, explicit internal contracts. +4. Keep behavior stable for current API consumers and existing module boundaries. +5. Improve confidence with focused unit and integration coverage before and during refactoring. + +## Constraints + +### Must keep + +- Current backend module structure. +- Current route layout and public API behavior. +- Current job types and overall task flow. +- Current ownership model at module level. +- Explicit code over clever abstractions. + +### Allowed + +- Small internal contract cleanup. +- Small private helper extraction inside existing files. +- Better naming, smaller methods, and clearer sequencing. +- Add or expand tests. + +### Not allowed in this phase + +- New module subdirectories or extra architecture layers. +- Generic handler registries. +- Base service hierarchies. +- Large movement of lifecycle ownership from `tasks` into `jobs`. +- Pushing more orchestration into `captions`, `media`, or `transcription` before task contracts are cleaned up. +- Broad request/response or frontend-facing contract changes. + +## Refactor scope + +### In scope + +- `cpv3/modules/tasks/service.py` +- `cpv3/modules/tasks/schemas.py` +- `cpv3/modules/tasks/router.py` where needed for clarity only +- `cpv3/modules/jobs/router.py` where needed for clearer delegation only +- tests around task submission, webhook handling, cancellation, and caption task behavior + +### Nearby modules to touch carefully if needed + +- `cpv3/modules/captions/service.py` +- `cpv3/modules/media/service.py` +- `cpv3/modules/transcription/repository.py` +- `cpv3/modules/jobs/service.py` + +These are supporting touches only. The refactor should remain centered in `tasks`. 
+ +## Target end state + +After this phase: + +- `TaskService` still owns task orchestration, +- but each major step is explicit and small, +- stored job payloads are validated through typed internal schemas, +- webhook application is broken into readable stages, +- repeated submission logic is reduced through simple private helpers, +- actor flow is easier to scan, +- tests cover the critical seams that future feature work will depend on. + +`tasks/service.py` may still be large after this phase, but it should stop being a "many responsibilities blurred together" file. + +## Main decisions + +### 1. Keep orchestration in `tasks` for now + +Do **not** move job lifecycle ownership into `jobs/service.py` in this phase. + +Reason: + +- it is a valid long-term direction, +- but it changes semantic ownership of cancellation and webhook application, +- which raises regression risk before the current flow is properly typed and decomposed. + +### 2. Add typed internal payload schemas + +Add explicit internal schemas in `cpv3/modules/tasks/schemas.py` for per-job stored payloads. + +These schemas are for internal readability and validation only. + +They should: + +- represent the current JSON shape stored in `job.input_data` / `job.output_data` +- remain compatible with existing persisted data +- avoid forcing a database migration in this phase +- be explicit per job type rather than generic + +Examples of the kind of models expected: + +- media probe input/output +- silence detect input/output +- silence apply input/output +- media convert input/output +- transcription generate input/output +- frame extract input/output +- captions generate input/output + +Important: the database can still store JSON, but `TaskService` should stop passing raw dicts around when internal meaning matters. + +### 3. Split `record_webhook_event()` into explicit stages + +Refactor `record_webhook_event()` into small private methods with one purpose each. 
+ +Target shape: + +- load and validate job state +- ignore terminal jobs when appropriate +- apply status update +- append job event row +- run job-type-specific done hooks +- send notification +- return updated job + +This keeps behavior in one place while making the sequencing obvious. + +### 4. Reduce duplication only where it is obvious + +Allowed helper extraction: + +- output folder resolution +- broker reference update +- source file lookup/create flow +- artifact persistence helpers +- notification dispatch helper +- common actor start/fail envelope if the resulting code is still explicit + +Do **not** introduce a generic task framework. + +### 5. Keep domain persistence mostly where it is today + +Do not push transcription/media/captions result persistence back into their modules yet. + +Reason: + +- domain ownership may improve long-term, +- but doing it before task contracts are typed risks scattering orchestration across several large files. + +This phase should first make the existing orchestration understandable. + +## Execution phases + +### Phase 0 — Lock behavior with tests + +Before changing the flow, expand tests around the current seams. + +Priority coverage: + +- `tests/unit/test_task_service.py` + - duplicate active job reuse + - terminal job webhook ignore behavior + - webhook event update sequencing + - artifact persistence trigger points + - cancellation path behavior +- `tests/unit/test_caption_tasks.py` + - caption render fallback behavior + - callback confirmation behavior + - cancellation-related safety where relevant +- integration coverage where current behavior is externally visible + - `tests/integration/test_jobs_endpoints.py` + - `tests/integration/test_captions_endpoints.py` if affected + +The goal is not broad test expansion. The goal is to pin the behavior of the exact code being refactored. + +### Phase 1 — Introduce typed internal task payloads + +Add internal schemas in `cpv3/modules/tasks/schemas.py` for job payloads. 
+ +Implementation rules: + +- keep schema names explicit per job type +- support current field names +- make optional fields tolerant enough for existing rows where necessary +- use these schemas at service boundaries where `input_data` and `output_data` are read or produced + +Expected outcome: + +- internal payload meaning becomes visible from types instead of being inferred from raw dict access +- future changes become easier to review safely + +### Phase 2 — Decompose webhook application flow + +Refactor `TaskService.record_webhook_event()` into a sequence of small private methods. + +Recommended decomposition: + +- `_load_job_for_webhook(...)` +- `_should_ignore_webhook(...)` +- `_apply_job_status_update(...)` +- `_create_job_event(...)` +- `_run_done_side_effects(...)` +- `_send_job_notification(...)` + +Method names may differ, but the responsibilities should remain this explicit. + +Important: + +- keep sequencing behavior unchanged +- preserve the current order where artifacts are saved before notifications when that order matters + +### Phase 3 — Normalize submission helpers + +Refactor repetitive submit methods without hiding behavior. + +The submit methods should remain explicit, but repeated steps can be centralized where the code becomes clearer: + +- resolve user output folder +- prepare actor kwargs +- create job + webhook +- enqueue actor +- persist broker reference +- build submit response + +The final submit methods should still read as business flows, not as thin wrappers over a generic registry. + +### Phase 4 — Clean actor envelope duplication + +If still useful after earlier phases, standardize only the repeated envelope around actors: + +- cancellation check +- running event emission +- success/failure reporting + +This should stay simple and local. + +If the helper makes actor behavior harder to follow, do not extract it. 
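Phases 2–4 above can be sketched together as a skeleton. The private method names follow Phase 2's recommended decomposition; the dict-based job store and all method bodies are stand-ins for illustration, not repo code.

```python
import asyncio

TERMINAL = {"done", "failed", "cancelled"}


class TaskService:
    """Sketch of the decomposed webhook flow; bodies are stand-ins."""

    def __init__(self, jobs: dict[str, dict]) -> None:
        self._jobs = jobs
        self.calls: list[str] = []  # records sequencing for the demo

    async def record_webhook_event(self, job_id: str, event: dict) -> dict:
        job = await self._load_job_for_webhook(job_id)
        if self._should_ignore_webhook(job):
            return job
        self._apply_job_status_update(job, event)
        self._create_job_event(job, event)
        # Ordering matters: done hooks persist artifacts *before* notification.
        await self._run_done_side_effects(job)
        await self._send_job_notification(job)
        return job

    async def _load_job_for_webhook(self, job_id: str) -> dict:
        self.calls.append("load")
        return self._jobs[job_id]

    def _should_ignore_webhook(self, job: dict) -> bool:
        # Terminal jobs ignore late webhooks, matching current behavior.
        return job["status"] in TERMINAL

    def _apply_job_status_update(self, job: dict, event: dict) -> None:
        self.calls.append("status")
        job["status"] = event["status"]

    def _create_job_event(self, job: dict, event: dict) -> None:
        self.calls.append("event")
        job.setdefault("events", []).append(event["status"])

    async def _run_done_side_effects(self, job: dict) -> None:
        if job["status"] == "done":
            self.calls.append("artifacts")

    async def _send_job_notification(self, job: dict) -> None:
        self.calls.append("notify")


svc = TaskService({"j1": {"status": "running"}})
job = asyncio.run(svc.record_webhook_event("j1", {"status": "done"}))
print(svc.calls)  # load, status, event, artifacts, notify
```

Each stage stays small enough to test in isolation, while `record_webhook_event()` remains the single readable entry point that preserves the sequencing.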
+ +### Phase 5 — Reassess, do not auto-expand scope + +After the in-place cleanup is done, reassess whether a second phase is justified. + +Only then consider: + +- moving lifecycle semantics into `jobs` +- pushing result persistence into owning modules + +Those are explicitly **not** part of the current plan. + +## File-by-file plan + +### `cpv3/modules/tasks/schemas.py` + +- Add explicit internal payload schemas for job input/output. +- Keep public API request/response schemas stable. +- Prefer additive internal typing over schema churn. + +### `cpv3/modules/tasks/service.py` + +- Keep this file as the orchestration center for this phase. +- Decompose large methods into readable private steps. +- Replace raw internal dict usage with typed parsing/serialization where the meaning matters. +- Reduce repeated code only through local, explicit helpers. + +### `cpv3/modules/tasks/router.py` + +- Keep routes unchanged. +- Only adjust delegation or error handling if required to support clearer internal service boundaries. + +### `cpv3/modules/jobs/router.py` + +- Keep the current cancel entrypoint behavior. +- If touched, only make delegation clearer. +- Do not move cancel semantics into `jobs/service.py` in this phase. + +### `cpv3/modules/captions/service.py` + +- Keep current ownership. +- Do not add more orchestration here in phase 1. +- Only make supporting adjustments if typed payload work needs a clearer boundary. + +### `cpv3/modules/media/service.py` + +- Keep media processing helpers as-is unless task cleanup reveals a very small, obvious extraction need. +- Avoid expanding this file into a second orchestration center. + +## Risk controls + +### Preserve persisted JSON compatibility + +Existing jobs may already contain payloads in the database. 
+ +Therefore: + +- typed internal schemas must support current persisted shapes +- avoid renaming stored keys unless there is a very strong reason +- prefer backward-compatible parsing over cleanup for cleanup’s sake + +### Preserve side-effect ordering + +Current flow saves some artifacts before notification dispatch. + +That ordering matters because clients may refetch immediately after notification. + +Do not change this ordering unless there is a proven reason and test coverage for the new behavior. + +### Avoid ownership churn during cleanup + +This phase is about readability and safer edits, not about redistributing the architecture. + +If a change starts to feel like "this really belongs in another module," mark it for reassessment after phase 1 instead of expanding scope immediately. + +## Verification strategy + +For implementation of this plan, verification should include: + +- targeted unit tests for `TaskService` +- targeted unit tests for caption task actor behavior +- relevant integration tests for jobs/tasks endpoints +- Ruff checks for touched files + +The most important verification is behavioral: + +- task submission still works +- duplicate job reuse still works +- webhook updates still drive correct job state +- artifact persistence still happens before notification where required +- cancellation still behaves as before + +## Definition of done for this phase + +Phase 1 is complete when all of the following are true: + +- internal job payload handling is typed where it matters +- `record_webhook_event()` is decomposed into explicit, readable steps +- repetitive submit flow is reduced without introducing hidden abstractions +- tests protect the main task orchestration seams +- public APIs remain effectively unchanged +- the backend is easier to change safely without rediscovering task flow each time + +## Explicit non-goals + +This plan does **not** include: + +- moving lifecycle ownership into `jobs/service.py` +- redesigning the task system +- 
introducing a generic abstraction layer for tasks +- moving artifact persistence into every owning domain module +- broad route redesign +- frontend contract changes +- module structure changes + +## Follow-up after this phase + +Once this plan is implemented and stable, reassess whether there is enough remaining pressure to justify a second phase. + +If yes, the next candidate should be: + +- **Option 2** selectively, and only where phase 1 proves that lifecycle ownership is still the main source of friction. + +Until then, keep the refactor intentionally conservative. diff --git a/alembic/versions/d5e6f7a8b9c0_add_caption_presets_table.py b/alembic/versions/d5e6f7a8b9c0_add_caption_presets_table.py new file mode 100644 index 0000000..41a90d3 --- /dev/null +++ b/alembic/versions/d5e6f7a8b9c0_add_caption_presets_table.py @@ -0,0 +1,192 @@ +"""add caption_presets table + +Revision ID: d5e6f7a8b9c0 +Revises: c4d5e6f7a8b9 +Create Date: 2026-03-14 12:00:00.000000 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. 
+revision: str = "d5e6f7a8b9c0" +down_revision: Union[str, None] = "c4d5e6f7a8b9" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "caption_presets", + sa.Column("user_id", sa.UUID(), nullable=True), + sa.Column("name", sa.String(length=128), nullable=False), + sa.Column("description", sa.Text(), nullable=True), + sa.Column("is_system", sa.Boolean(), nullable=False, server_default=sa.text("false")), + sa.Column("style_config", sa.JSON(), nullable=False), + sa.Column("preview_url", sa.String(length=512), nullable=True), + sa.Column("id", sa.UUID(), nullable=False), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()), + sa.Column("is_active", sa.Boolean(), nullable=False, server_default=sa.text("true")), + sa.ForeignKeyConstraint(["user_id"], ["users.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index( + op.f("ix_caption_presets_user_id"), + "caption_presets", + ["user_id"], + unique=False, + ) + + # Seed system presets + caption_presets = sa.table( + "caption_presets", + sa.column("id", sa.UUID()), + sa.column("user_id", sa.UUID()), + sa.column("name", sa.String()), + sa.column("description", sa.Text()), + sa.column("is_system", sa.Boolean()), + sa.column("style_config", sa.JSON()), + sa.column("preview_url", sa.String()), + sa.column("is_active", sa.Boolean()), + ) + + op.bulk_insert( + caption_presets, + [ + { + "id": "00000000-0000-4000-a000-000000000001", + "user_id": None, + "name": "Классические", + "description": "Белый текст с тенью, жёлтая подсветка слова", + "is_system": True, + "style_config": { + "text": { + "font_family": "Lobster", + "font_size": 40, + "font_weight": 400, + "text_color": "#FFFFFF", + "highlight_color": "#FFCC00", + "text_shadow": "2px 2px 4px 
rgba(0,0,0,0.5)", + "text_stroke_width": 0, + "text_stroke_color": "#000000", + }, + "layout": { + "vertical_position": "bottom", + "horizontal_alignment": "center", + "padding_px": 20, + "max_width_pct": 90, + "lines_per_screen": 2, + }, + "animation": { + "highlight_style": "color_scale", + "highlight_scale": 1.1, + "segment_transition": "fade", + "fade_duration_frames": 3, + "animation_speed": 1.0, + }, + "background": { + "bg_color": "rgba(0,0,0,0)", + "bg_blur_px": 0, + "bg_glow_color": None, + "bg_border_radius_px": 0, + "bg_padding_px": 0, + }, + }, + "preview_url": None, + "is_active": True, + }, + { + "id": "00000000-0000-4000-a000-000000000002", + "user_id": None, + "name": "Неон", + "description": "Голубой текст с неоновым свечением, пурпурная подсветка", + "is_system": True, + "style_config": { + "text": { + "font_family": "Courier New", + "font_size": 45, + "font_weight": 400, + "text_color": "#00FFFF", + "highlight_color": "#FF00FF", + "text_shadow": "0 0 5px #0ff, 0 0 10px #0ff, 0 0 20px #0ff", + "text_stroke_width": 0, + "text_stroke_color": "#000000", + }, + "layout": { + "vertical_position": "bottom", + "horizontal_alignment": "center", + "padding_px": 20, + "max_width_pct": 90, + "lines_per_screen": 2, + }, + "animation": { + "highlight_style": "color_scale", + "highlight_scale": 1.2, + "segment_transition": "fade", + "fade_duration_frames": 3, + "animation_speed": 1.0, + }, + "background": { + "bg_color": "rgba(0,0,0,0.6)", + "bg_blur_px": 0, + "bg_glow_color": None, + "bg_border_radius_px": 15, + "bg_padding_px": 20, + }, + }, + "preview_url": None, + "is_active": True, + }, + { + "id": "00000000-0000-4000-a000-000000000003", + "user_id": None, + "name": "Минимализм", + "description": "Маленький белый текст без фона, плавное появление", + "is_system": True, + "style_config": { + "text": { + "font_family": "Inter", + "font_size": 28, + "font_weight": 300, + "text_color": "#FFFFFF", + "highlight_color": "#FFFFFF", + "text_shadow": "1px 1px 2px 
rgba(0,0,0,0.3)", + "text_stroke_width": 0, + "text_stroke_color": "#000000", + }, + "layout": { + "vertical_position": "bottom", + "horizontal_alignment": "center", + "padding_px": 30, + "max_width_pct": 80, + "lines_per_screen": 1, + }, + "animation": { + "highlight_style": "color", + "highlight_scale": 1.0, + "segment_transition": "fade", + "fade_duration_frames": 5, + "animation_speed": 1.0, + }, + "background": { + "bg_color": "rgba(0,0,0,0)", + "bg_blur_px": 0, + "bg_glow_color": None, + "bg_border_radius_px": 0, + "bg_padding_px": 0, + }, + }, + "preview_url": None, + "is_active": True, + }, + ], + ) + + +def downgrade() -> None: + op.drop_index(op.f("ix_caption_presets_user_id"), table_name="caption_presets") + op.drop_table("caption_presets") diff --git a/cpv3/db/models.py b/cpv3/db/models.py index 3bd05a1..7778e56 100644 --- a/cpv3/db/models.py +++ b/cpv3/db/models.py @@ -1,4 +1,5 @@ from cpv3.db.base import Base +from cpv3.modules.captions.models import CaptionPreset from cpv3.modules.jobs.models import Job, JobEvent from cpv3.modules.media.models import ArtifactMediaFile, MediaFile from cpv3.modules.projects.models import Project @@ -10,6 +11,7 @@ from cpv3.modules.webhooks.models import Webhook __all__ = [ "Base", + "CaptionPreset", "User", "Project", "File", diff --git a/cpv3/modules/captions/models.py b/cpv3/modules/captions/models.py new file mode 100644 index 0000000..855fb37 --- /dev/null +++ b/cpv3/modules/captions/models.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +import uuid + +from sqlalchemy import Boolean, ForeignKey, JSON, String, Text +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.orm import Mapped, mapped_column + +from cpv3.db.base import Base, BaseModelMixin + + +class CaptionPreset(Base, BaseModelMixin): + __tablename__ = "caption_presets" + + user_id: Mapped[uuid.UUID | None] = mapped_column( + UUID(as_uuid=True), + ForeignKey("users.id", ondelete="CASCADE"), + nullable=True, + index=True, + ) + name: 
Mapped[str] = mapped_column(String(128)) + description: Mapped[str | None] = mapped_column(Text, nullable=True) + is_system: Mapped[bool] = mapped_column(Boolean, default=False) + style_config: Mapped[dict] = mapped_column(JSON, nullable=False) + preview_url: Mapped[str | None] = mapped_column(String(512), nullable=True) diff --git a/cpv3/modules/captions/repository.py b/cpv3/modules/captions/repository.py new file mode 100644 index 0000000..4a68ae2 --- /dev/null +++ b/cpv3/modules/captions/repository.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +import uuid + +from sqlalchemy import Select, or_, select +from sqlalchemy.ext.asyncio import AsyncSession + +from cpv3.modules.captions.models import CaptionPreset +from cpv3.modules.captions.schemas import CaptionPresetCreate, CaptionPresetUpdate + + +class CaptionPresetRepository: + """Repository for CaptionPreset database operations.""" + + def __init__(self, session: AsyncSession) -> None: + self._session = session + + async def list_all_for_user(self, user_id: uuid.UUID) -> list[CaptionPreset]: + """Return system presets + user's own presets.""" + stmt: Select[tuple[CaptionPreset]] = ( + select(CaptionPreset) + .where(CaptionPreset.is_active.is_(True)) + .where( + or_( + CaptionPreset.is_system.is_(True), + CaptionPreset.user_id == user_id, + ) + ) + .order_by(CaptionPreset.is_system.desc(), CaptionPreset.created_at.desc()) + ) + result = await self._session.execute(stmt) + return list(result.scalars().all()) + + async def get_by_id(self, preset_id: uuid.UUID) -> CaptionPreset | None: + result = await self._session.execute( + select(CaptionPreset) + .where(CaptionPreset.id == preset_id) + .where(CaptionPreset.is_active.is_(True)) + ) + return result.scalar_one_or_none() + + async def create( + self, *, user_id: uuid.UUID | None, data: CaptionPresetCreate + ) -> CaptionPreset: + preset = CaptionPreset( + user_id=user_id, + name=data.name, + description=data.description, + is_system=user_id is None, + 
style_config=data.style_config.model_dump(mode="json"), + ) + self._session.add(preset) + await self._session.commit() + await self._session.refresh(preset) + return preset + + async def update(self, preset: CaptionPreset, data: CaptionPresetUpdate) -> CaptionPreset: + # model_dump() already serializes nested models to plain dicts, + # so style_config needs no special case here. + for key, value in data.model_dump(exclude_unset=True, mode="json").items(): + if value is not None: + setattr(preset, key, value) + await self._session.commit() + await self._session.refresh(preset) + return preset + + async def deactivate(self, preset: CaptionPreset) -> None: + preset.is_active = False + await self._session.commit() diff --git a/cpv3/modules/captions/router.py b/cpv3/modules/captions/router.py index 96ee3a6..940f4f7 100644 --- a/cpv3/modules/captions/router.py +++ b/cpv3/modules/captions/router.py @@ -1,15 +1,30 @@ from __future__ import annotations -from fastapi import APIRouter, Depends +import uuid +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy.ext.asyncio import AsyncSession + +from cpv3.db.session import get_db from cpv3.infrastructure.auth import get_current_user -from cpv3.modules.captions.schemas import CaptionsRequest, CaptionsResponse -from cpv3.modules.captions.service import generate_captions +from cpv3.modules.captions.schemas import ( + CaptionPresetCreate, + CaptionPresetRead, + CaptionPresetUpdate, + CaptionsRequest, + CaptionsResponse, +) +from cpv3.modules.captions.service import CaptionPresetService, generate_captions +from cpv3.modules.users.models import User router = APIRouter(prefix="/api/captions", tags=["Captions"]) +# --------------------------------------------------------------------------- +# Legacy direct render endpoint +# --------------------------------------------------------------------------- + + @router.post("/get_video/", response_model=CaptionsResponse) async def get_video( body: CaptionsRequest, current_user: User =
Depends(get_current_user)
@@ -21,3 +36,87 @@ async def get_video(
         transcription=body.transcription,
     )
     return CaptionsResponse(result=result)
+
+
+# ---------------------------------------------------------------------------
+# Preset CRUD
+# ---------------------------------------------------------------------------
+
+
+@router.get("/presets/", response_model=list[CaptionPresetRead])
+async def list_presets(
+    current_user: User = Depends(get_current_user),
+    db: AsyncSession = Depends(get_db),
+) -> list[CaptionPresetRead]:
+    """List system presets + user's own presets."""
+    service = CaptionPresetService(db)
+    presets = await service.list_presets(user_id=current_user.id)
+    return [CaptionPresetRead.model_validate(p, from_attributes=True) for p in presets]
+
+
+@router.post(
+    "/presets/",
+    response_model=CaptionPresetRead,
+    status_code=status.HTTP_201_CREATED,
+)
+async def create_preset(
+    body: CaptionPresetCreate,
+    current_user: User = Depends(get_current_user),
+    db: AsyncSession = Depends(get_db),
+) -> CaptionPresetRead:
+    """Create a user preset."""
+    service = CaptionPresetService(db)
+    preset = await service.create_preset(user_id=current_user.id, data=body)
+    return CaptionPresetRead.model_validate(preset, from_attributes=True)
+
+
+@router.get("/presets/{preset_id}/", response_model=CaptionPresetRead)
+async def get_preset(
+    preset_id: uuid.UUID,
+    current_user: User = Depends(get_current_user),
+    db: AsyncSession = Depends(get_db),
+) -> CaptionPresetRead:
+    """Get a single preset."""
+    _ = current_user
+    service = CaptionPresetService(db)
+    try:
+        preset = await service.get_preset(preset_id=preset_id)
+    except ValueError as exc:
+        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc))
+    return CaptionPresetRead.model_validate(preset, from_attributes=True)
+
+
+@router.patch("/presets/{preset_id}/", response_model=CaptionPresetRead)
+async def update_preset(
+    preset_id: uuid.UUID,
+    body: CaptionPresetUpdate,
+    current_user: User = Depends(get_current_user),
+    db: AsyncSession = Depends(get_db),
+) -> CaptionPresetRead:
+    """Update a user preset (cannot edit system or others' presets)."""
+    service = CaptionPresetService(db)
+    try:
+        preset = await service.update_preset(
+            preset_id=preset_id, user_id=current_user.id, data=body
+        )
+    except ValueError as exc:
+        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc))
+    except PermissionError as exc:
+        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(exc))
+    return CaptionPresetRead.model_validate(preset, from_attributes=True)
+
+
+@router.delete("/presets/{preset_id}/", status_code=status.HTTP_204_NO_CONTENT)
+async def delete_preset(
+    preset_id: uuid.UUID,
+    current_user: User = Depends(get_current_user),
+    db: AsyncSession = Depends(get_db),
+) -> None:
+    """Delete a user preset (cannot delete system or others' presets)."""
+    service = CaptionPresetService(db)
+    try:
+        await service.delete_preset(preset_id=preset_id, user_id=current_user.id)
+    except ValueError as exc:
+        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc))
+    except PermissionError as exc:
+        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(exc))
diff --git a/cpv3/modules/captions/schemas.py b/cpv3/modules/captions/schemas.py
index 8fe6012..79ef612 100644
--- a/cpv3/modules/captions/schemas.py
+++ b/cpv3/modules/captions/schemas.py
@@ -1,9 +1,96 @@
 from __future__ import annotations
 
+from datetime import datetime
+from typing import Literal
+from uuid import UUID
+
+from pydantic import Field
+
 from cpv3.common.schemas import Schema
 from cpv3.modules.transcription.schemas import Document
 
+
+# ---------------------------------------------------------------------------
+# Caption style config sub-schemas
+# ---------------------------------------------------------------------------
+
+
+class CaptionTextStyle(Schema):
+    font_family: str = "Lobster"
+    font_size: int = 40
+    font_weight: int = 400
+    text_color: str = "#FFFFFF"
+    highlight_color: str = "#FFCC00"
+    text_shadow: str | None = "2px 2px 4px rgba(0,0,0,0.5)"
+    text_stroke_width: float = 0
+    text_stroke_color: str = "#000000"
+
+
+class CaptionLayoutStyle(Schema):
+    vertical_position: Literal["top", "center", "bottom"] = "bottom"
+    horizontal_alignment: Literal["left", "center", "right"] = "center"
+    padding_px: int = 20
+    max_width_pct: int = 90
+    lines_per_screen: int = 2
+
+
+class CaptionAnimationStyle(Schema):
+    highlight_style: Literal["color", "scale", "underline", "color_scale"] = "color"
+    highlight_scale: float = 1.1
+    segment_transition: Literal["fade", "slide", "none"] = "fade"
+    fade_duration_frames: int = 3
+    animation_speed: float = 1.0
+
+
+class CaptionBackgroundStyle(Schema):
+    bg_color: str = "rgba(0,0,0,0.6)"
+    bg_blur_px: int = 0
+    bg_glow_color: str | None = None
+    bg_border_radius_px: int = 15
+    bg_padding_px: int = 20
+
+
+class CaptionStyleConfig(Schema):
+    text: CaptionTextStyle = Field(default_factory=CaptionTextStyle)
+    layout: CaptionLayoutStyle = Field(default_factory=CaptionLayoutStyle)
+    animation: CaptionAnimationStyle = Field(default_factory=CaptionAnimationStyle)
+    background: CaptionBackgroundStyle = Field(default_factory=CaptionBackgroundStyle)
+
+
+# ---------------------------------------------------------------------------
+# Preset CRUD schemas
+# ---------------------------------------------------------------------------
+
+
+class CaptionPresetCreate(Schema):
+    name: str = Field(..., max_length=128)
+    description: str | None = None
+    style_config: CaptionStyleConfig
+
+
+class CaptionPresetUpdate(Schema):
+    name: str | None = Field(default=None, max_length=128)
+    description: str | None = None
+    style_config: CaptionStyleConfig | None = None
+
+
+class CaptionPresetRead(Schema):
+    id: UUID
+    user_id: UUID | None
+    name: str
+    description: str | None
+    is_system: bool
+    style_config: CaptionStyleConfig
+    preview_url: str | None
+    created_at: datetime
+    updated_at: datetime
+
+
+# ---------------------------------------------------------------------------
+# Existing request/response schemas
+# ---------------------------------------------------------------------------
+
+
 class CaptionsRequest(Schema):
     folder: str
     video_s3_path: str
diff --git a/cpv3/modules/captions/service.py b/cpv3/modules/captions/service.py
index 2824d80..703040a 100644
--- a/cpv3/modules/captions/service.py
+++ b/cpv3/modules/captions/service.py
@@ -1,31 +1,129 @@
 from __future__ import annotations
 
+import uuid
+
 import httpx
+from sqlalchemy.ext.asyncio import AsyncSession
 
 from cpv3.infrastructure.settings import get_settings
+from cpv3.modules.captions.models import CaptionPreset
+from cpv3.modules.captions.repository import CaptionPresetRepository
+from cpv3.modules.captions.schemas import (
+    CaptionPresetCreate,
+    CaptionPresetUpdate,
+    CaptionStyleConfig,
+)
 from cpv3.modules.transcription.schemas import Document
 
+ERROR_PRESET_NOT_FOUND = "Пресет субтитров не найден"
+ERROR_PRESET_FORBIDDEN = "Нельзя изменять чужой или системный пресет"
+
+
+# ---------------------------------------------------------------------------
+# Caption rendering (calls Remotion service)
+# ---------------------------------------------------------------------------
+
 
 async def generate_captions(
-    *, video_s3_path: str, folder: str, transcription: Document
+    *,
+    video_s3_path: str,
+    folder: str,
+    transcription: Document,
+    style_config: dict | None = None,
+    callback_url: str | None = None,
+    render_id: str | None = None,
 ) -> str:
-    """Generate captions for a video using the Remotion service."""
+    """Generate captions for a video using the Remotion service.
+
+    Returns render_id (async mode with callback_url) or S3 output path (sync fallback).
+    """
     settings = get_settings()
-    payload = {
+    payload: dict = {
         "folder": folder,
         "videoSrc": video_s3_path,
         "transcription": transcription.model_dump(),
     }
+    if style_config is not None:
+        payload["styleConfig"] = style_config
+    if callback_url is not None:
+        payload["callbackUrl"] = callback_url
+    if render_id is not None:
+        payload["renderId"] = render_id
 
-    async with httpx.AsyncClient(timeout=300) as client:
-        resp = await client.post(
-            f"{settings.remotion_service_url}/api/render", json=payload
-        )
+    timeout = 30.0 if callback_url else 300.0
+
+    async with httpx.AsyncClient(timeout=timeout) as client:
+        resp = await client.post(f"{settings.remotion_service_url}/api/render", json=payload)
         resp.raise_for_status()
 
     data = resp.json()
-    if not isinstance(data, dict) or "output" not in data:
-        raise RuntimeError("Unexpected response from remotion service")
+    # Async mode: Remotion returns { renderId, status: "queued" }
+    if callback_url and "renderId" in data:
+        return str(data["renderId"])
 
-    return str(data["output"])
+    # Sync fallback: Remotion returns { output: "s3/path" }
+    if isinstance(data, dict) and "output" in data:
+        return str(data["output"])
+
+    raise RuntimeError("Unexpected response from remotion service")
+
+
+# ---------------------------------------------------------------------------
+# Preset service
+# ---------------------------------------------------------------------------
+
+
+class CaptionPresetService:
+    """Business logic for caption preset CRUD with ownership checks."""
+
+    def __init__(self, session: AsyncSession) -> None:
+        self._repo = CaptionPresetRepository(session)
+
+    async def list_presets(self, *, user_id: uuid.UUID) -> list[CaptionPreset]:
+        return await self._repo.list_all_for_user(user_id)
+
+    async def get_preset(self, *, preset_id: uuid.UUID) -> CaptionPreset:
+        preset = await self._repo.get_by_id(preset_id)
+        if preset is None:
+            raise ValueError(ERROR_PRESET_NOT_FOUND)
+        return preset
+
+    async def create_preset(
+        self, *, user_id: uuid.UUID, data: CaptionPresetCreate
+    ) -> CaptionPreset:
+        return await self._repo.create(user_id=user_id, data=data)
+
+    async def update_preset(
+        self,
+        *,
+        preset_id: uuid.UUID,
+        user_id: uuid.UUID,
+        data: CaptionPresetUpdate,
+    ) -> CaptionPreset:
+        preset = await self.get_preset(preset_id=preset_id)
+        if preset.is_system or preset.user_id != user_id:
+            raise PermissionError(ERROR_PRESET_FORBIDDEN)
+        return await self._repo.update(preset, data)
+
+    async def delete_preset(self, *, preset_id: uuid.UUID, user_id: uuid.UUID) -> None:
+        preset = await self.get_preset(preset_id=preset_id)
+        if preset.is_system or preset.user_id != user_id:
+            raise PermissionError(ERROR_PRESET_FORBIDDEN)
+        await self._repo.deactivate(preset)
+
+    async def resolve_style_config(
+        self,
+        *,
+        preset_id: uuid.UUID | None = None,
+        inline_config: dict | None = None,
+    ) -> dict | None:
+        """Resolve a style config from preset_id or inline override."""
+        if inline_config is not None:
+            # Validate by parsing, then return as dict
+            cfg = CaptionStyleConfig.model_validate(inline_config)
+            return cfg.model_dump(mode="json")
+        if preset_id is not None:
+            preset = await self.get_preset(preset_id=preset_id)
+            return preset.style_config
+        return None
diff --git a/cpv3/modules/jobs/repository.py b/cpv3/modules/jobs/repository.py
index ce6002c..1cc7d90 100644
--- a/cpv3/modules/jobs/repository.py
+++ b/cpv3/modules/jobs/repository.py
@@ -35,6 +35,31 @@ class JobRepository:
         )
         return result.scalar_one_or_none()
 
+    async def list_active_by_type(
+        self,
+        *,
+        requester: User,
+        job_type: str,
+        project_id: uuid.UUID | None,
+        statuses: tuple[str, ...],
+    ) -> list[Job]:
+        stmt: Select[tuple[Job]] = (
+            select(Job)
+            .where(Job.is_active.is_(True))
+            .where(Job.user_id == requester.id)
+            .where(Job.job_type == job_type)
+            .where(Job.status.in_(statuses))
+            .order_by(Job.created_at.desc())
+        )
+
+        if project_id is None:
+            stmt = stmt.where(Job.project_id.is_(None))
+        else:
+            stmt = stmt.where(Job.project_id == project_id)
+
+        result = await self._session.execute(stmt)
+        return list(result.scalars().all())
+
     async def create(self, *, requester: User, data: JobCreate) -> Job:
         job = Job(
             user_id=requester.id,
diff --git a/cpv3/modules/jobs/router.py b/cpv3/modules/jobs/router.py
index 0d48dcd..4b6a80a 100644
--- a/cpv3/modules/jobs/router.py
+++ b/cpv3/modules/jobs/router.py
@@ -16,6 +16,7 @@ from cpv3.modules.jobs.schemas import (
     JobUpdate,
 )
 from cpv3.modules.jobs.service import JobService
+from cpv3.modules.tasks.service import TaskService
 from cpv3.modules.users.models import User
 
 jobs_router = APIRouter(prefix="/api/jobs", tags=["jobs"])
@@ -75,6 +76,11 @@ async def patch_job_endpoint(
     if not current_user.is_staff and job.user_id != current_user.id:
         raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden")
 
+    if body.status == "CANCELLED":
+        task_service = TaskService(db)
+        job = await task_service.cancel_job(job)
+        return JobRead.model_validate(job)
+
     job = await service.update_job(job, body)
     return JobRead.model_validate(job)
diff --git a/cpv3/modules/media/service.py b/cpv3/modules/media/service.py
index 3000098..4b0279e 100644
--- a/cpv3/modules/media/service.py
+++ b/cpv3/modules/media/service.py
@@ -339,7 +339,7 @@ async def convert_to_mp4(
 
     try:
         filename_without_ext = path.splitext(path.basename(file_key))[0]
-        mp4_filename = filename_without_ext + ".mp4"
+        mp4_filename = f"Конвертированное видео {filename_without_ext}.mp4"
 
         with NamedTemporaryFile(suffix=".mp4", delete=False) as out:
             out_path = out.name
diff --git a/cpv3/modules/tasks/schemas.py b/cpv3/modules/tasks/schemas.py
index c5abe21..c57d0a9 100644
--- a/cpv3/modules/tasks/schemas.py
+++ b/cpv3/modules/tasks/schemas.py
@@ -37,9 +37,7 @@ class SilenceRemoveRequest(Schema):
     min_silence_duration_ms: int = Field(
         default=200, description="Minimum silence duration in milliseconds"
     )
-    silence_threshold_db: int = Field(
-        default=16, description="Silence threshold in decibels"
-    )
+    silence_threshold_db: int = Field(default=16, description="Silence threshold in decibels")
     padding_ms: int = Field(
         default=100, description="Padding around non-silent segments in milliseconds"
     )
@@ -53,9 +51,7 @@ class SilenceDetectRequest(Schema):
     min_silence_duration_ms: int = Field(
         default=200, description="Minimum silence duration in milliseconds"
     )
-    silence_threshold_db: int = Field(
-        default=16, description="Silence threshold in decibels"
-    )
+    silence_threshold_db: int = Field(default=16, description="Silence threshold in decibels")
     padding_ms: int = Field(
         default=100, description="Padding around non-silent segments in milliseconds"
     )
@@ -67,9 +63,7 @@ class SilenceApplyRequest(Schema):
     file_key: str = Field(..., description="Storage key of the input file")
     out_folder: str = Field(..., description="Output folder for processed file")
     project_id: UUID | None = Field(default=None, description="Associated project ID")
-    output_name: str | None = Field(
-        default=None, description="Display name for the output file"
-    )
+    output_name: str | None = Field(default=None, description="Display name for the output file")
     cuts: list[dict] = Field(
         ..., description="Cut regions: [{'start_ms': int, 'end_ms': int}, ...]"
     )
@@ -103,6 +97,12 @@ class CaptionsGenerateRequest(Schema):
     folder: str = Field(..., description="Output folder for rendered video")
     transcription_id: UUID = Field(..., description="ID of the transcription to use")
     project_id: UUID | None = Field(default=None, description="Associated project ID")
+    preset_id: UUID | None = Field(
+        default=None, description="Caption style preset ID (mutually exclusive with style_config)"
+    )
+    style_config: dict | None = Field(
+        default=None, description="Inline caption style config (overrides preset_id)"
+    )
 
 
 class FrameExtractRequest(Schema):
@@ -110,9 +110,7 @@
     file_key: str = Field(..., description="S3 key of the video file")
     project_id: UUID | None = Field(default=None, description="Associated project ID")
-    regenerate: bool = Field(
-        default=False, description="Delete existing frames and re-extract"
-    )
+    regenerate: bool = Field(default=False, description="Delete existing frames and re-extract")
 
 
 # --- Response schemas ---
diff --git a/cpv3/modules/tasks/service.py b/cpv3/modules/tasks/service.py
index cb2bb08..6f753dc 100644
--- a/cpv3/modules/tasks/service.py
+++ b/cpv3/modules/tasks/service.py
@@ -10,17 +10,19 @@
 import json
 import logging
 import time
 import uuid
-from pathlib import Path
 from datetime import datetime, timezone
+from pathlib import Path
 from typing import Any
 
 import dramatiq  # type: ignore[import-untyped]
-from dramatiq.brokers.redis import RedisBroker  # type: ignore[import-untyped]
 import httpx
+from dramatiq.brokers.redis import RedisBroker  # type: ignore[import-untyped]
 from sqlalchemy.ext.asyncio import AsyncSession
+
 from cpv3.infrastructure.deps import _get_storage_service
 from cpv3.infrastructure.settings import get_settings
+from cpv3.infrastructure.storage.utils import get_user_folder
 from cpv3.modules.files.repository import FileRepository
 from cpv3.modules.files.schemas import FileCreate
 from cpv3.modules.jobs.models import Job
@@ -46,7 +48,6 @@ from cpv3.modules.tasks.schemas import (
     TaskWebhookEvent,
     TranscriptionGenerateRequest,
 )
-from cpv3.infrastructure.storage.utils import get_user_folder
 from cpv3.modules.notifications.service import NotificationService
 from cpv3.modules.transcription.repository import TranscriptionRepository
 from cpv3.modules.transcription.schemas import TranscriptionCreate
@@ -61,6 +62,7 @@ JOB_STATUS_PENDING: JobStatusEnum = "PENDING"
 JOB_STATUS_RUNNING: JobStatusEnum = "RUNNING"
 JOB_STATUS_DONE: JobStatusEnum = "DONE"
 JOB_STATUS_FAILED: JobStatusEnum = "FAILED"
+JOB_STATUS_CANCELLED: JobStatusEnum = "CANCELLED"
 
 JOB_TYPE_MEDIA_PROBE: JobTypeEnum = "MEDIA_PROBE"
 JOB_TYPE_SILENCE_REMOVE: JobTypeEnum = "SILENCE_REMOVE"
@@ -94,6 +96,7 @@ MESSAGE_PROBING_MEDIA = "Probing media"
 MESSAGE_PROCESSING = "Processing"
 MESSAGE_CONVERTING = "Converting"
 MESSAGE_RENDERING_CAPTIONS = "Rendering captions"
+MESSAGE_CANCELLED = "Отменено пользователем"
 MESSAGE_EXTRACTING_FRAMES = "Извлечение кадров"
 MESSAGE_UPLOADING_FRAMES = "Загрузка кадров"
 MESSAGE_DELETING_OLD_FRAMES = "Удаление старых кадров"
@@ -116,6 +119,13 @@ MESSAGE_APPLYING_CUTS = "Применение вырезок"
 
 PROGRESS_THROTTLE_SECONDS = 3.0
 
+ACTIVE_JOB_STATUSES = (JOB_STATUS_PENDING, JOB_STATUS_RUNNING)
+DRAMATIQ_BROKER_REF_SEPARATOR = ":"
+
+
+class JobCancelledError(RuntimeError):
+    """Raised when a job was cancelled before completion."""
+
 # ---------------------------------------------------------------------------
 # Dramatiq broker setup
 # ---------------------------------------------------------------------------
@@ -163,9 +173,7 @@ def _send_webhook_event(webhook_url: str, event: TaskWebhookEvent) -> None:
     """Send a task webhook event to the API."""
     payload = event.model_dump(mode="json", exclude_none=True)
     try:
-        response = httpx.post(
-            webhook_url, json=payload, timeout=WEBHOOK_TIMEOUT_SECONDS
-        )
+        response = httpx.post(webhook_url, json=payload, timeout=WEBHOOK_TIMEOUT_SECONDS)
         response.raise_for_status()
     except Exception:
         logger.exception("Failed to send task webhook event to %s", webhook_url)
@@ -197,17 +205,69 @@ def _run_async(coro: Any) -> Any:
         loop.close()
 
 
+def _serialize_broker_reference(queue_name: str, redis_message_id: str) -> str:
+    """Serialize queue name and Dramatiq redis message id into one field."""
+    return f"{queue_name}{DRAMATIQ_BROKER_REF_SEPARATOR}{redis_message_id}"
+
+
+def _parse_broker_reference(broker_id: str | None) -> tuple[str, str] | None:
+    """Parse queue name and Dramatiq redis message id from stored broker_id."""
+    if not broker_id or DRAMATIQ_BROKER_REF_SEPARATOR not in broker_id:
+        return None
+
+    queue_name, redis_message_id = broker_id.split(DRAMATIQ_BROKER_REF_SEPARATOR, 1)
+    if not queue_name or not redis_message_id:
+        return None
+    return queue_name, redis_message_id
+
+
+def _get_job_status_sync(job_id: uuid.UUID) -> JobStatusEnum | None:
+    """Read current job status using a sync connection (safe for Dramatiq workers)."""
+    import psycopg2
+
+    settings = get_settings()
+    dsn = (
+        f"host={settings.postgres_host} port={settings.postgres_port} "
+        f"dbname={settings.postgres_database} "
+        f"user={settings.postgres_user} password={settings.postgres_password}"
+    )
+    try:
+        conn = psycopg2.connect(dsn)
+        try:
+            with conn.cursor() as cur:
+                cur.execute("SELECT status FROM jobs WHERE id = %s", (str(job_id),))
+                row = cur.fetchone()
+                return row[0] if row else None
+        finally:
+            conn.close()
+    except Exception:
+        logger.warning("Failed to check job status for %s", job_id, exc_info=True)
+        return None
+
+
+def _raise_if_job_cancelled(job_id: uuid.UUID) -> None:
+    """Stop worker execution when the job is already cancelled in the database."""
+    status = _get_job_status_sync(job_id)
+    if status == JOB_STATUS_CANCELLED:
+        raise JobCancelledError(MESSAGE_CANCELLED)
+
+
 # ---------------------------------------------------------------------------
 # Dramatiq actors
 # ---------------------------------------------------------------------------
 
 
-@dramatiq.actor(max_retries=3, min_backoff=1000)
+@dramatiq.actor(max_retries=0)
 def media_probe_actor(job_id: str, webhook_url: str, file_key: str) -> None:
     """Probe media file to extract metadata."""
     from cpv3.modules.media.service import probe_media
 
     job_uuid = uuid.UUID(job_id)
+    try:
+        _raise_if_job_cancelled(job_uuid)
+    except JobCancelledError:
+        logger.info("media_probe_actor cancelled: %s", job_uuid)
+        return
     _send_webhook_event(
         webhook_url,
         TaskWebhookEvent(
@@ -237,6 +297,9 @@ def media_probe_actor(job_id: str, webhook_url: str, file_key: str) -> None:
                 finished_at=_utc_now(),
             ),
         )
+    except JobCancelledError:
+        logger.info("media_probe_actor cancelled: %s", job_uuid)
+        return
     except Exception as exc:
         logger.exception("media_probe_actor failed: %s", job_uuid)
         _send_webhook_event(
@@ -247,10 +310,9 @@
                 finished_at=_utc_now(),
             ),
         )
-        raise
 
 
-@dramatiq.actor(max_retries=3, min_backoff=1000)
+@dramatiq.actor(max_retries=0)
 def silence_remove_actor(
     job_id: str,
     webhook_url: str,
@@ -264,6 +326,11 @@
     from cpv3.modules.media.service import remove_silence
 
     job_uuid = uuid.UUID(job_id)
+    try:
+        _raise_if_job_cancelled(job_uuid)
+    except JobCancelledError:
+        logger.info("silence_remove_actor cancelled: %s", job_uuid)
+        return
     _send_webhook_event(
         webhook_url,
         TaskWebhookEvent(
@@ -306,6 +373,9 @@
                 finished_at=_utc_now(),
             ),
         )
+    except JobCancelledError:
+        logger.info("silence_remove_actor cancelled: %s", job_uuid)
+        return
     except Exception as exc:
         logger.exception("silence_remove_actor failed: %s", job_uuid)
         _send_webhook_event(
@@ -316,10 +386,9 @@
                 finished_at=_utc_now(),
             ),
         )
-        raise
 
 
-@dramatiq.actor(max_retries=3, min_backoff=1000)
+@dramatiq.actor(max_retries=0)
 def silence_detect_actor(
     job_id: str,
     webhook_url: str,
@@ -332,6 +401,11 @@
     from cpv3.modules.media.service import detect_silence
 
     job_uuid = uuid.UUID(job_id)
+    try:
+        _raise_if_job_cancelled(job_uuid)
+    except JobCancelledError:
+        logger.info("silence_detect_actor cancelled: %s", job_uuid)
+        return
     _send_webhook_event(
         webhook_url,
         TaskWebhookEvent(
@@ -369,6 +443,9 @@
                 finished_at=_utc_now(),
             ),
         )
+    except JobCancelledError:
+        logger.info("silence_detect_actor cancelled: %s", job_uuid)
+        return
     except Exception as exc:
         logger.exception("silence_detect_actor failed: %s", job_uuid)
         _send_webhook_event(
@@ -379,10 +456,9 @@
                 finished_at=_utc_now(),
             ),
         )
-        raise
 
 
-@dramatiq.actor(max_retries=3, min_backoff=1000)
+@dramatiq.actor(max_retries=0)
 def silence_apply_actor(
     job_id: str,
     webhook_url: str,
@@ -395,6 +471,11 @@
     from cpv3.modules.media.service import apply_silence_cuts
 
     job_uuid = uuid.UUID(job_id)
+    try:
+        _raise_if_job_cancelled(job_uuid)
+    except JobCancelledError:
+        logger.info("silence_apply_actor cancelled: %s", job_uuid)
+        return
     _send_webhook_event(
         webhook_url,
         TaskWebhookEvent(
@@ -436,6 +517,9 @@
                 finished_at=_utc_now(),
             ),
         )
+    except JobCancelledError:
+        logger.info("silence_apply_actor cancelled: %s", job_uuid)
+        return
     except Exception as exc:
         logger.exception("silence_apply_actor failed: %s", job_uuid)
         _send_webhook_event(
@@ -446,10 +530,9 @@
                 finished_at=_utc_now(),
             ),
         )
-        raise
 
 
-@dramatiq.actor(max_retries=3, min_backoff=1000)
+@dramatiq.actor(max_retries=0)
 def media_convert_actor(
     job_id: str,
     webhook_url: str,
@@ -461,6 +544,11 @@
     from cpv3.modules.media.service import convert_to_mp4
 
     job_uuid = uuid.UUID(job_id)
+    try:
+        _raise_if_job_cancelled(job_uuid)
+    except JobCancelledError:
+        logger.info("media_convert_actor cancelled: %s", job_uuid)
+        return
     _send_webhook_event(
         webhook_url,
         TaskWebhookEvent(
@@ -482,9 +570,7 @@
                 progress_pct=PROGRESS_MEDIA_CONVERT,
             ),
         )
-        result = _run_async(
-            convert_to_mp4(storage, file_key=file_key, out_folder=out_folder)
-        )
+        result = _run_async(convert_to_mp4(storage, file_key=file_key, out_folder=out_folder))
         _send_webhook_event(
             webhook_url,
             TaskWebhookEvent(
@@ -499,6 +585,9 @@
                 finished_at=_utc_now(),
             ),
         )
+    except JobCancelledError:
+        logger.info("media_convert_actor cancelled: %s", job_uuid)
+        return
     except Exception as exc:
         logger.exception("media_convert_actor failed: %s", job_uuid)
         _send_webhook_event(
@@ -509,10 +598,9 @@
                 finished_at=_utc_now(),
            ),
        )
-        raise
 
 
-@dramatiq.actor(max_retries=2, min_backoff=2000)
+@dramatiq.actor(max_retries=0)
 def transcription_generate_actor(
     job_id: str,
     webhook_url: str,
@@ -528,6 +616,11 @@ def transcription_generate_actor(
     )
 
     job_uuid = uuid.UUID(job_id)
+    try:
+        _raise_if_job_cancelled(job_uuid)
+    except JobCancelledError:
+        logger.info("transcription_generate_actor cancelled: %s", job_uuid)
+        return
     _send_webhook_event(
         webhook_url,
         TaskWebhookEvent(
@@ -548,11 +641,15 @@
             raise ValueError(ERROR_NO_AUDIO_STREAM)
 
         # Extract probe metadata for artifact creation
-        duration_seconds = float(probe.format.duration) if probe.format and probe.format.duration else 0.0
+        duration_seconds = (
+            float(probe.format.duration) if probe.format and probe.format.duration else 0.0
+        )
         video_stream = next((s for s in probe.streams if s.codec_type == "video"), None)
         probe_meta = {
             "duration_seconds": duration_seconds,
-            "frame_rate": _parse_frame_rate(video_stream.r_frame_rate) if video_stream and video_stream.r_frame_rate else None,
+            "frame_rate": _parse_frame_rate(video_stream.r_frame_rate)
+            if video_stream and video_stream.r_frame_rate
+            else None,
             "width": video_stream.width if video_stream else None,
             "height": video_stream.height if video_stream else None,
         }
@@ -573,9 +670,9 @@
             if now - last_report_time < PROGRESS_THROTTLE_SECONDS:
                 return
             last_report_time = now
-            mapped = PROGRESS_TRANSCRIPTION_START + (
-                pct / 100.0
-            ) * (PROGRESS_TRANSCRIPTION_END - PROGRESS_TRANSCRIPTION_START)
+            mapped = PROGRESS_TRANSCRIPTION_START + (pct / 100.0) * (
+                PROGRESS_TRANSCRIPTION_END - PROGRESS_TRANSCRIPTION_START
+            )
             _send_webhook_event(
                 webhook_url,
                 TaskWebhookEvent(
@@ -617,18 +714,9 @@
                 finished_at=_utc_now(),
             ),
         )
-    except (ValueError, RuntimeError) as exc:
-        logger.exception(
-            "transcription_generate_actor failed (non-transient): %s", job_uuid
-        )
-        _send_webhook_event(
-            webhook_url,
-            TaskWebhookEvent(
-                status=JOB_STATUS_FAILED,
-                error_message=str(exc),
-                finished_at=_utc_now(),
-            ),
-        )
+    except JobCancelledError:
+        logger.info("transcription_generate_actor cancelled: %s", job_uuid)
+        return
     except Exception as exc:
         logger.exception("transcription_generate_actor failed: %s", job_uuid)
         _send_webhook_event(
@@ -639,22 +727,31 @@
                 finished_at=_utc_now(),
             ),
         )
-        raise
 
 
-@dramatiq.actor(max_retries=2, min_backoff=2000)
+RENDER_POLL_INTERVAL_SECONDS = 5
+RENDER_POLL_TIMEOUT_SECONDS = 600
+
+
+@dramatiq.actor(max_retries=0)
 def captions_generate_actor(
     job_id: str,
     webhook_url: str,
     video_s3_path: str,
     folder: str,
     transcription_json: dict,
+    style_config: dict | None = None,
 ) -> None:
-    """Generate captions on video."""
+    """Generate captions on video (async via Remotion + BullMQ)."""
     from cpv3.modules.captions.service import generate_captions
     from cpv3.modules.transcription.schemas import Document
 
     job_uuid = uuid.UUID(job_id)
+    try:
+        _raise_if_job_cancelled(job_uuid)
+    except JobCancelledError:
+        logger.info("captions_generate_actor cancelled: %s", job_uuid)
+        return
     _send_webhook_event(
         webhook_url,
         TaskWebhookEvent(
@@ -673,21 +770,96 @@
             ),
         )
         document = Document.model_validate(transcription_json)
-        output_path = _run_async(
+
+        # Call Remotion with callback_url so it sends progress webhooks directly
+        render_id = _run_async(
             generate_captions(
-                video_s3_path=video_s3_path, folder=folder, transcription=document
+                video_s3_path=video_s3_path,
+                folder=folder,
+                transcription=document,
+                style_config=style_config,
+                callback_url=webhook_url,
+                render_id=job_id,
             )
         )
+        _raise_if_job_cancelled(job_uuid)
         _send_webhook_event(
             webhook_url,
             TaskWebhookEvent(
-                status=JOB_STATUS_DONE,
-                current_message=MESSAGE_COMPLETED,
-                progress_pct=PROGRESS_COMPLETE,
-                output_data={"output_path": output_path},
-                finished_at=_utc_now(),
+                current_message=MESSAGE_RENDERING_CAPTIONS,
+                output_data={"render_id": render_id},
             ),
         )
+
+        # Polling fallback: wait for Remotion to finish rendering
+        # Primary progress is delivered via Remotion → webhook directly
+        settings = get_settings()
+        elapsed = 0.0
+        last_polled_status: str | None = None
+        while elapsed < RENDER_POLL_TIMEOUT_SECONDS:
+            _raise_if_job_cancelled(job_uuid)
+            time.sleep(RENDER_POLL_INTERVAL_SECONDS)
+            elapsed += RENDER_POLL_INTERVAL_SECONDS
+
+            try:
+                resp = httpx.get(
+                    f"{settings.remotion_service_url}/api/render/{render_id}",
+                    timeout=10,
+                )
+                resp.raise_for_status()
+                data = resp.json()
+            except Exception:
+                logger.warning("Render poll failed for %s, retrying...", render_id)
+                continue
+
+            status = data.get("status")
+            if status != last_polled_status:
+                logger.info(
+                    "Remotion render %s status=%s progress=%s callback_delivered=%s",
+                    render_id,
+                    status,
+                    data.get("progress_pct"),
+                    data.get("callback_delivered"),
+                )
+                last_polled_status = status
+
+            if status == "done":
+                if data.get("callback_delivered") is True:
+                    # Remotion already sent DONE webhook — exit cleanly
+                    return
+
+                output_path = data.get("output_path")
+                if not output_path:
+                    raise RuntimeError(
+                        "Remotion render finished without output_path in polling response"
+                    )
+
+                logger.warning(
+                    "Remotion render %s finished without confirmed DONE webhook, sending fallback completion",
+                    render_id,
+                )
+                _send_webhook_event(
+                    webhook_url,
+                    TaskWebhookEvent(
+                        status=JOB_STATUS_DONE,
+                        progress_pct=PROGRESS_COMPLETE,
+                        current_message="Готово",
+                        output_data={"output_path": str(output_path)},
+                        finished_at=_utc_now(),
+                    ),
+                )
+                return
+            if status == "failed":
+                error = data.get("error", "Render failed")
+                raise RuntimeError(f"Remotion render failed: {error}")
+
+        raise TimeoutError(
+            f"Render {render_id} did not complete within {RENDER_POLL_TIMEOUT_SECONDS}s"
+        )
+
+    except JobCancelledError:
+        logger.info("captions_generate_actor cancelled: %s", job_uuid)
+        return
     except Exception as exc:
         logger.exception("captions_generate_actor failed: %s", job_uuid)
         _send_webhook_event(
@@ -698,10 +870,9 @@
                 finished_at=_utc_now(),
             ),
         )
-        raise
 
 
-@dramatiq.actor(max_retries=2, min_backoff=2000)
+@dramatiq.actor(max_retries=0)
 def frame_extract_actor(
     job_id: str,
     webhook_url: str,
@@ -717,6 +888,11 @@
     )
 
     job_uuid = uuid.UUID(job_id)
+    try:
+        _raise_if_job_cancelled(job_uuid)
+    except JobCancelledError:
+        logger.info("frame_extract_actor cancelled: %s", job_uuid)
+        return
     _send_webhook_event(
         webhook_url,
         TaskWebhookEvent(
@@ -738,9 +914,7 @@
                 progress_pct=PROGRESS_FRAME_EXTRACT_START,
             ),
         )
-        old_meta = _run_async(
-            read_frames_metadata(storage, frames_folder=frames_folder)
-        )
+        old_meta = _run_async(read_frames_metadata(storage, frames_folder=frames_folder))
         if old_meta is not None:
             _run_async(
                 delete_frames(
@@ -797,6 +971,9 @@
                 finished_at=_utc_now(),
             ),
         )
+    except JobCancelledError:
+        logger.info("frame_extract_actor cancelled: %s", job_uuid)
+        return
     except Exception as exc:
         logger.exception("frame_extract_actor failed: %s", job_uuid)
         _send_webhook_event(
@@ -807,7 +984,6 @@
                 finished_at=_utc_now(),
             ),
         )
-        raise
 
 
 # ---------------------------------------------------------------------------
@@ -824,6 +1000,97 @@ class TaskService:
         self._event_repo = JobEventRepository(session)
         self._webhook_repo = WebhookRepository(session)
 
+    async def _update_job_broker_reference(self, job: Job, broker_reference: str) -> Job:
+        """Persist the transport-specific broker reference after enqueueing."""
+        job.broker_id = broker_reference
+        await self._session.commit()
+        await self._session.refresh(job)
+        return job
+
+    async def _find_duplicate_active_job(
+        self,
+        *,
+        requester: User,
+        job_type: JobTypeEnum,
+        project_id: uuid.UUID | None,
+        input_data: dict,
+    ) -> Job | None:
+        """Reuse an already running job for the same request payload."""
+        jobs = await self._job_repo.list_active_by_type(
+            requester=requester,
+            job_type=job_type,
+            project_id=project_id,
+            statuses=ACTIVE_JOB_STATUSES,
+        )
+
+        for job in jobs:
+            if job.input_data == input_data:
+                return job
+
+        return None
+
+    async def _create_cancellation_notification(self, job: Job) -> None:
+        """Emit a single cancellation notification to the current user."""
+        if job.user_id is None:
+            return
+
+        notification_service = NotificationService(self._session)
+        await notification_service.create_task_notification(
+            user_id=job.user_id,
+            job=job,
+            event=TaskWebhookEvent(
+                status=JOB_STATUS_CANCELLED,
+                current_message=MESSAGE_CANCELLED,
+                finished_at=job.finished_at,
+            ),
+        )
+
+    def _cancel_dramatiq_message_sync(self, broker_id: str | None) -> None:
+        """Remove a queued Dramatiq message from Redis when possible."""
+        broker_reference = _parse_broker_reference(broker_id)
+        if broker_reference is None:
+            return
+
+        queue_name, redis_message_id = broker_reference
+        namespace = _redis_broker.namespace
+        queue_key = f"{namespace}:{queue_name}"
+        queue_messages_key = f"{queue_key}.msgs"
+        delayed_queue_key = f"{queue_key}.DQ"
+        delayed_messages_key = f"{delayed_queue_key}.msgs"
+        acks_pattern = f"{namespace}:__acks__.*.{queue_name}"
+
+        pipeline = _redis_broker.client.pipeline()
+        pipeline.lrem(queue_key, 1, redis_message_id)
+        pipeline.hdel(queue_messages_key, redis_message_id)
+        pipeline.lrem(delayed_queue_key, 1, redis_message_id)
+        pipeline.hdel(delayed_messages_key, redis_message_id)
+
+        for key in _redis_broker.client.scan_iter(match=acks_pattern):
+            pipeline.srem(key, redis_message_id)
+
+        pipeline.execute()
+
+    async def _cancel_dramatiq_message(self, broker_id: str | None) -> None:
+        """Run Redis queue cleanup for a Dramatiq message off the event loop."""
+        await asyncio.to_thread(self._cancel_dramatiq_message_sync, broker_id)
+
+    async def _cancel_caption_render(self, job: Job) -> None:
+        """Cancel the downstream Remotion render when it has already been queued."""
+        if job.job_type != JOB_TYPE_CAPTIONS_GENERATE:
+            return
+
+        output_data = job.output_data or {}
+        render_id = output_data.get("render_id") or str(job.id)
+
+        settings = get_settings()
+        async with httpx.AsyncClient(timeout=WEBHOOK_TIMEOUT_SECONDS) as client:
+            response = await client.delete(
+                f"{settings.remotion_service_url}/api/render/{render_id}"
+            )
+            if response.status_code == 404:
+                return
+            response.raise_for_status()
+
     async def _create_job_and_webhook(
         self,
         *,
@@ -873,21 +1140,31 @@
             project_id=project_id,
             input_data=input_data,
         )
-        actor.send(job_id=str(job.id), webhook_url=webhook_url, **actor_kwargs)
+        message = actor.send(job_id=str(job.id), webhook_url=webhook_url, **actor_kwargs)
+        redis_message_id = message.options.get("redis_message_id")
+        if redis_message_id:
+            broker_reference = _serialize_broker_reference(
+                message.queue_name,
+                str(redis_message_id),
+            )
+            await self._update_job_broker_reference(job, broker_reference)
+
         return TaskSubmitResponse(
             job_id=job.id,
             webhook_url=webhook_url,
             status=JOB_STATUS_PENDING,
         )
 
-    async def record_webhook_event(
-        self, *, job_id: uuid.UUID, event: TaskWebhookEvent
-    ) -> Job:
+    async def record_webhook_event(self, *, job_id: uuid.UUID, event: TaskWebhookEvent) -> Job:
         """Apply a webhook event to the job and store a job event record."""
         job = await self._job_repo.get_by_id(job_id)
         if job is None:
             raise ValueError(f"Job {job_id} not found")
 
+        if job.status in (JOB_STATUS_DONE, JOB_STATUS_FAILED, JOB_STATUS_CANCELLED):
+            logger.info("Ignoring webhook for terminal job %s (status=%s)", job_id, job.status)
+            return job
+
         job_update = JobUpdate(
             status=event.status,
             project_pct=event.progress_pct,
@@ -906,24 +1183,23 @@ class TaskService:
         )
 
         # Save artifacts BEFORE sending notifications so data exists when frontend refetches
-        if (
-            job.job_type == JOB_TYPE_TRANSCRIPTION_GENERATE
-            and event.status == JOB_STATUS_DONE
-        ):
+        if job.job_type == JOB_TYPE_TRANSCRIPTION_GENERATE and event.status == JOB_STATUS_DONE:
             try:
                 await self._save_transcription_artifacts(job)
             except Exception:
-                logger.exception(
-                    "Failed to save transcription artifacts for job %s", job_id
-                )
+                logger.exception("Failed to save transcription artifacts for job %s", job_id)
 
         if job.job_type == JOB_TYPE_MEDIA_CONVERT and event.status == JOB_STATUS_DONE:
             try:
                 await self._save_convert_artifacts(job)
             except Exception:
-                logger.exception(
-                    "Failed to save convert artifacts for job %s", job_id
-                )
+                logger.exception("Failed to save convert artifacts for job %s", job_id)
+
+        if job.job_type == JOB_TYPE_CAPTIONS_GENERATE and event.status == JOB_STATUS_DONE:
+            try:
+                await self._save_captions_artifacts(job)
+            except Exception:
+                logger.exception("Failed to save captions artifacts for job %s", job_id)
 
         # Push real-time notification via WebSocket (after artifacts are persisted)
         if job.user_id is not None:
@@ -937,6 +1213,50 @@
 
         return job
 
+    async def cancel_job(self, job: Job) -> Job:
+        """Cancel a job, clean queued transport state and ignore late webhooks."""
+        if job.status in (JOB_STATUS_DONE, JOB_STATUS_FAILED, JOB_STATUS_CANCELLED):
+            return job
+
+        try:
+            await self._cancel_dramatiq_message(job.broker_id)
+        except Exception:
+            logger.exception("Failed to cancel Dramatiq message for job %s", job.id)
+
+        try:
+            await self._cancel_caption_render(job)
+        except Exception:
+            logger.exception("Failed to cancel caption render for job %s", job.id)
+
+        finished_at = _utc_now()
+        job = await self._job_repo.update(
+            job,
+            JobUpdate(
+                status=JOB_STATUS_CANCELLED,
+                current_message=MESSAGE_CANCELLED,
+                finished_at=finished_at,
+            ),
+        )
+
+        await self._event_repo.create(
+            JobEventCreate(
+                job_id=job.id,
+                event_type=f"{EVENT_TYPE_STATUS_PREFIX}{JOB_STATUS_CANCELLED}",
+                payload={
+                    "status": JOB_STATUS_CANCELLED,
+                    "current_message": MESSAGE_CANCELLED,
+                    "finished_at": finished_at.isoformat(),
+                },
+            )
+        )
+
+        try:
+            await self._create_cancellation_notification(job)
+        except Exception:
+            logger.exception("Failed to create cancellation notification for job %s", job.id)
+
+        return job
+
     async def _save_transcription_artifacts(self, job: 
Job) -> None:
         """Create Transcription, ArtifactMediaFile and File records."""
         input_data = job.input_data or {}
@@ -980,9 +1300,9 @@ class TaskService:
         user_folder = get_user_folder(user)
 
         json_bytes = json.dumps(document, ensure_ascii=False).encode("utf-8")
-        # Build display name: "Транскрипция .json"
+        # Build display name: "Транскрибация .json"
         video_stem = Path(source_file.original_filename).stem
-        transcription_filename = f"Транскрипция {video_stem}.json"
+        transcription_filename = f"Транскрибация {video_stem}.json"
 
         artifact_key = await storage.upload_fileobj(
             fileobj=io.BytesIO(json_bytes),
@@ -1061,7 +1381,7 @@ class TaskService:
             stem = Path(source_file.original_filename).stem
         else:
             stem = Path(file_key).stem
-        converted_filename = f"{stem}.mp4"
+        converted_filename = f"Конвертированное видео {stem}.mp4"
 
         # Create File record for the converted MP4 (no project_id — only reachable via artifact)
         converted_file = await file_repo.create(
@@ -1091,6 +1411,73 @@ class TaskService:
 
         logger.info("Saved convert artifacts for job %s", job.id)
 
+    async def _save_captions_artifacts(self, job: Job) -> None:
+        """Create File and ArtifactMediaFile records for captioned video."""
+        input_data = job.input_data or {}
+        output_data = job.output_data or {}
+
+        video_s3_path: str = input_data["video_s3_path"]
+        project_id: uuid.UUID | None = (
+            uuid.UUID(input_data["project_id"]) if input_data.get("project_id") else None
+        )
+
+        output_path: str = output_data["output_path"]
+
+        # Resolve user
+        user_repo = UserRepository(self._session)
+        user = await user_repo.get_by_id(job.user_id)  # type: ignore[arg-type]
+        if user is None:
+            logger.warning("User %s not found, skipping captions artifact save", job.user_id)
+            return
+
+        # Get file size from S3
+        storage = _get_storage_service()
+        file_size = await storage.size(output_path)
+
+        # Derive output filename from source video
+        file_repo = FileRepository(self._session)
+        source_file = await file_repo.get_by_path(video_s3_path)
+        if 
source_file is not None: + stem = Path(source_file.original_filename).stem + else: + stem = Path(video_s3_path).stem + captioned_filename = f"Видео с субтитрами {stem}.mp4" + + # Create File record + captioned_file = await file_repo.create( + requester=user, + data=FileCreate( + project_id=project_id, + original_filename=captioned_filename, + path=output_path, + storage_backend="S3", + mime_type="video/mp4", + size_bytes=file_size, + file_format="mp4", + is_uploaded=True, + ), + ) + + # Create ArtifactMediaFile record + artifact_repo = ArtifactRepository(self._session) + await artifact_repo.create( + data=ArtifactMediaFileCreate( + project_id=project_id, + file_id=captioned_file.id, + media_file_id=None, + artifact_type="RENDERED_VIDEO", + ), + ) + + # Update job output_data with file_id so frontend can reference it + updated_output = dict(output_data) + updated_output["file_id"] = str(captioned_file.id) + job = await self._job_repo.update( + job, JobUpdate(output_data=updated_output) + ) + + logger.info("Saved captions artifacts for job %s (file_id=%s)", job.id, captioned_file.id) + async def submit_media_probe( self, *, requester: User, request: MediaProbeRequest ) -> TaskSubmitResponse: @@ -1238,6 +1625,22 @@ class TaskService: self, *, requester: User, request: CaptionsGenerateRequest ) -> TaskSubmitResponse: """Submit captions generation task.""" + from cpv3.modules.captions.service import CaptionPresetService + + input_data = request.model_dump(mode="json") + existing_job = await self._find_duplicate_active_job( + requester=requester, + job_type=JOB_TYPE_CAPTIONS_GENERATE, + project_id=request.project_id, + input_data=input_data, + ) + if existing_job is not None: + return TaskSubmitResponse( + job_id=existing_job.id, + webhook_url=_build_webhook_url(existing_job.id), + status=existing_job.status, + ) + transcription_repo = TranscriptionRepository(self._session) transcription = await transcription_repo.get_by_id(request.transcription_id) if transcription is 
None: @@ -1245,20 +1648,26 @@ class TaskService: user_folder = get_user_folder(requester) resolved_folder = ( - f"{user_folder}/{request.folder}" - if request.folder - else f"{user_folder}/output_files" + f"{user_folder}/{request.folder}" if request.folder else f"{user_folder}/output_files" + ) + + # Resolve style config from preset or inline override + preset_service = CaptionPresetService(self._session) + style_config = await preset_service.resolve_style_config( + preset_id=request.preset_id, + inline_config=request.style_config, ) return await self._submit_task( requester=requester, job_type=JOB_TYPE_CAPTIONS_GENERATE, project_id=request.project_id, - input_data=request.model_dump(mode="json"), + input_data=input_data, actor=captions_generate_actor, actor_kwargs={ "video_s3_path": request.video_s3_path, "folder": resolved_folder, "transcription_json": transcription.document, + "style_config": style_config, }, ) diff --git a/docker-compose.yml b/docker-compose.yml index 9d01eec..97228b3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -72,7 +72,7 @@ services: REDIS_URL: redis://redis:6379/0 WEBHOOK_BASE_URL: http://api:8000 - REMOTION_SERVICE_URL: ${REMOTION_SERVICE_URL:-http://localhost:8001} + REMOTION_SERVICE_URL: ${REMOTION_SERVICE_URL:-http://remotion:3001} ports: - "8000:8000" command: > diff --git a/query_codex.md b/query_codex.md new file mode 100644 index 0000000..70ee27a --- /dev/null +++ b/query_codex.md @@ -0,0 +1,17 @@ +The user wants to refactor a backend project following DRY and KISS, keeping it developer-readable without hidden abstractions. +The user explicitly rejected a plan that did "in-place cleanup" of `cpv3/modules/tasks/service.py` and instead wants a different approach. + +Currently, `tasks/service.py` is a 1600+ line monolith that: +1. Defines Dramatiq actors +2. Orchestrates job creation and duplication-checks +3. Has `submit_X` methods (e.g. `submit_media_probe`, `submit_transcription_generate`) +4. 
Has a giant `record_webhook_event` that updates the `Job` state
+5. Contains domain-specific artifact saving (e.g. `_save_transcription_artifacts`, `_save_convert_artifacts`)
+6. Handles cancellation logic
+
+Constraints:
+1. Must use the exact module structure: `models.py`, `schemas.py`, `repository.py`, `service.py`, `router.py` (no subdirectories within modules).
+2. Explicit, local helpers over hidden abstractions (no generic handler registries).
+
+What is the best architectural approach to distribute these responsibilities across `jobs`, `tasks`, and domain modules (`media`, `transcription`, `captions`)?
+Provide a concrete file-by-file blueprint showing where each responsibility belongs and what the cross-module calls would look like, avoiding circular dependencies.
diff --git a/tests/unit/test_caption_tasks.py b/tests/unit/test_caption_tasks.py
new file mode 100644
index 0000000..8e7e863
--- /dev/null
+++ b/tests/unit/test_caption_tasks.py
@@ -0,0 +1,153 @@
+from __future__ import annotations
+
+import asyncio
+import uuid
+from types import SimpleNamespace
+
+from cpv3.modules.captions import service as captions_service
+from cpv3.modules.tasks import service as task_service
+
+
+def _make_document_json() -> dict:
+    return {
+        "segments": [
+            {
+                "text": "Привет",
+                "semantic_tags": [],
+                "structure_tags": [],
+                "time": {"start": 0.0, "end": 1.0},
+                "lines": [
+                    {
+                        "text": "Привет",
+                        "semantic_tags": [],
+                        "structure_tags": [],
+                        "time": {"start": 0.0, "end": 1.0},
+                        "words": [
+                            {
+                                "text": "Привет",
+                                "semantic_tags": [],
+                                "structure_tags": [],
+                                "time": {"start": 0.0, "end": 1.0},
+                            }
+                        ],
+                    }
+                ],
+            }
+        ]
+    }
+
+
+class _FakeResponse:
+    def __init__(self, payload: dict) -> None:
+        self._payload = payload
+
+    def raise_for_status(self) -> None:
+        return None
+
+    def json(self) -> dict:
+        return self._payload
+
+
+def test_captions_generate_actor_sends_fallback_done_when_callback_not_confirmed(
+    
monkeypatch, +) -> None: + sent_events: list[task_service.TaskWebhookEvent] = [] + + async def fake_generate_captions(**_: object) -> str: + return "render-123" + + monkeypatch.setattr(captions_service, "generate_captions", fake_generate_captions) + monkeypatch.setattr(task_service, "_run_async", asyncio.run) + monkeypatch.setattr(task_service, "_raise_if_job_cancelled", lambda _job_id: None) + monkeypatch.setattr( + task_service, + "_send_webhook_event", + lambda _url, event: sent_events.append(event), + ) + monkeypatch.setattr(task_service.time, "sleep", lambda _seconds: None) + monkeypatch.setattr( + task_service, + "get_settings", + lambda: SimpleNamespace(remotion_service_url="http://remotion.test"), + ) + monkeypatch.setattr( + task_service.httpx, + "get", + lambda *_args, **_kwargs: _FakeResponse( + { + "status": "done", + "output_path": "projects/1/captioned/video.mp4", + "callback_delivered": False, + } + ), + ) + + task_service.captions_generate_actor.fn( + job_id=str(uuid.uuid4()), + webhook_url="http://backend.test/api/tasks/webhook/job-1/", + video_s3_path="uploads/source.mp4", + folder="projects/1", + transcription_json=_make_document_json(), + style_config=None, + ) + + done_events = [ + event for event in sent_events if event.status == task_service.JOB_STATUS_DONE + ] + + assert len(done_events) == 1 + assert done_events[0].progress_pct == task_service.PROGRESS_COMPLETE + assert done_events[0].current_message == "Готово" + assert done_events[0].output_data == { + "output_path": "projects/1/captioned/video.mp4" + } + + +def test_captions_generate_actor_skips_fallback_when_done_webhook_confirmed( + monkeypatch, +) -> None: + sent_events: list[task_service.TaskWebhookEvent] = [] + + async def fake_generate_captions(**_: object) -> str: + return "render-123" + + monkeypatch.setattr(captions_service, "generate_captions", fake_generate_captions) + monkeypatch.setattr(task_service, "_run_async", asyncio.run) + monkeypatch.setattr(task_service, 
"_raise_if_job_cancelled", lambda _job_id: None) + monkeypatch.setattr( + task_service, + "_send_webhook_event", + lambda _url, event: sent_events.append(event), + ) + monkeypatch.setattr(task_service.time, "sleep", lambda _seconds: None) + monkeypatch.setattr( + task_service, + "get_settings", + lambda: SimpleNamespace(remotion_service_url="http://remotion.test"), + ) + monkeypatch.setattr( + task_service.httpx, + "get", + lambda *_args, **_kwargs: _FakeResponse( + { + "status": "done", + "output_path": "projects/1/captioned/video.mp4", + "callback_delivered": True, + } + ), + ) + + task_service.captions_generate_actor.fn( + job_id=str(uuid.uuid4()), + webhook_url="http://backend.test/api/tasks/webhook/job-1/", + video_s3_path="uploads/source.mp4", + folder="projects/1", + transcription_json=_make_document_json(), + style_config=None, + ) + + done_events = [ + event for event in sent_events if event.status == task_service.JOB_STATUS_DONE + ] + + assert done_events == [] diff --git a/tests/unit/test_task_service.py b/tests/unit/test_task_service.py new file mode 100644 index 0000000..80bf9ab --- /dev/null +++ b/tests/unit/test_task_service.py @@ -0,0 +1,108 @@ +from __future__ import annotations + +import uuid +from types import SimpleNamespace +from unittest.mock import AsyncMock + +import pytest + +from cpv3.modules.tasks.schemas import CaptionsGenerateRequest, TaskWebhookEvent +from cpv3.modules.tasks.service import TaskService + + +@pytest.mark.asyncio +async def test_submit_captions_generate_reuses_existing_active_job() -> None: + service = TaskService(session=AsyncMock()) + existing_job_id = uuid.uuid4() + existing_job = SimpleNamespace( + id=existing_job_id, + status="RUNNING", + ) + + service._find_duplicate_active_job = AsyncMock(return_value=existing_job) + service._submit_task = AsyncMock() + + response = await service.submit_captions_generate( + requester=SimpleNamespace(id=uuid.uuid4()), + request=CaptionsGenerateRequest( + 
video_s3_path="projects/test/video.mp4", + folder="output_files", + transcription_id=uuid.uuid4(), + project_id=uuid.uuid4(), + preset_id=uuid.uuid4(), + ), + ) + + assert response.job_id == existing_job_id + assert response.status == "RUNNING" + assert response.webhook_url.endswith(f"/api/tasks/webhook/{existing_job_id}/") + service._submit_task.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_record_webhook_event_ignores_cancelled_job() -> None: + cancelled_job = SimpleNamespace( + id=uuid.uuid4(), + status="CANCELLED", + ) + job_repo = SimpleNamespace( + get_by_id=AsyncMock(return_value=cancelled_job), + update=AsyncMock(), + ) + event_repo = SimpleNamespace(create=AsyncMock()) + + service = TaskService(session=AsyncMock()) + service._job_repo = job_repo + service._event_repo = event_repo + + result = await service.record_webhook_event( + job_id=cancelled_job.id, + event=TaskWebhookEvent( + status="DONE", + current_message="Готово", + output_data={"output_path": "projects/test/output.mp4"}, + ), + ) + + assert result is cancelled_job + job_repo.update.assert_not_awaited() + event_repo.create.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_cancel_job_marks_job_cancelled_and_keeps_record() -> None: + job_id = uuid.uuid4() + user_id = uuid.uuid4() + job = SimpleNamespace( + id=job_id, + status="PENDING", + broker_id="default:redis-message-id", + job_type="CAPTIONS_GENERATE", + user_id=user_id, + ) + cancelled_job = SimpleNamespace( + id=job_id, + status="CANCELLED", + broker_id="default:redis-message-id", + job_type="CAPTIONS_GENERATE", + user_id=user_id, + current_message="Отменено пользователем", + ) + + service = TaskService(session=AsyncMock()) + service._job_repo = SimpleNamespace(update=AsyncMock(return_value=cancelled_job)) + service._event_repo = SimpleNamespace(create=AsyncMock()) + service._cancel_dramatiq_message = AsyncMock() + service._cancel_caption_render = AsyncMock() + service._create_cancellation_notification = 
AsyncMock() + + result = await service.cancel_job(job) + + assert result is cancelled_job + service._job_repo.update.assert_awaited_once() + service._event_repo.create.assert_awaited_once() + service._cancel_dramatiq_message.assert_awaited_once_with(job.broker_id) + service._cancel_caption_render.assert_awaited_once_with(job) + service._create_cancellation_notification.assert_awaited_once_with( + cancelled_job + )