chore: claude final touches

Daniil
2026-03-17 18:11:23 +03:00
parent 4b90925c2a
commit 0299949553
21 changed files with 1915 additions and 101 deletions
+61
@@ -0,0 +1,61 @@
---
name: codex
description: "Use the `codex exec` CLI to consult an OpenAI-powered codebase oracle. Trigger this skill whenever Claude Code would benefit from a second opinion on the current codebase — architecture questions, debugging strategies, implementation alternatives, understanding unfamiliar code patterns, or when the user explicitly asks to 'ask codex', 'check with codex', 'consult codex', or 'codex exec'. Also trigger when Claude is uncertain about a complex codebase decision, needs to understand legacy code, or wants to validate an approach before executing. The codex command scans the current codebase and sends context + the question to an OpenAI model, returning an independent analysis."
---
# Codex — Codebase Oracle via OpenAI
`codex` is a CLI tool that scans the current project and answers questions about it using an OpenAI model. Claude Code can shell out to it for a second perspective.
## Usage
```bash
codex exec "<your question here>"
```
The command automatically:
1. Scans the current working directory for codebase context
2. Sends the question + context to an OpenAI model
3. Returns the answer to stdout
## When to use codex
- **Architecture decisions** — "What pattern is this codebase using for state management?"
- **Debugging help** — "Why might the auth middleware be failing silently?"
- **Code understanding** — "Explain the data flow in the payments module"
- **Implementation review** — "What's the best way to add caching to this service?"
- **Second opinion** — When you want to validate your approach before making changes
- **Legacy code** — When encountering unfamiliar patterns or undocumented code
- **User asks directly** — Any time the user says "ask codex" or "check with codex"
## How to call it
Run it as a bash command. The question should be specific and self-contained — codex already has codebase context, so focus the question on what you need to know.
```bash
# Ask about architecture
codex exec "What design patterns are used in the src/services directory?"
# Ask for debugging help
codex exec "The /api/users endpoint returns 500 when called with a missing email field. What could cause this based on the route handler and validation logic?"
# Ask for implementation guidance
codex exec "What's the best way to add rate limiting to the existing Express middleware chain?"
```
## Reading the response
The answer comes back as plain text on stdout. Read it, weigh it against your own analysis, and use the best parts. Codex is a consultant, not an authority — if its suggestion conflicts with what you know about the codebase, trust your direct analysis of the code.
## Tips for good questions
- Be specific: "Why does UserService.create() call normalize() twice?" beats "What's wrong with user creation?"
- Include symptoms when debugging: mention error messages, failing tests, unexpected behavior
- Scope it: ask about a specific module or file path rather than the entire codebase at once
- One question at a time: don't bundle multiple unrelated questions into a single exec call
## Requirements
- The `codex` CLI must be installed and on PATH
- An OpenAI API key must be configured (codex handles this internally)
- Run from within a project directory so codex can scan the codebase
+10
@@ -0,0 +1,10 @@
name: New MCP server
version: 0.0.1
schema: v1
mcpServers:
- name: New MCP server
command: npx
args:
- -y
- <your-mcp-server>
env: {}
+1
@@ -0,0 +1 @@
REMOTION_SERVICE_URL=http://remotion:3001
+58
@@ -0,0 +1,58 @@
# CLAUDE.md — Coffee Project Backend
See also the monorepo-level `../CLAUDE.md` for full architecture overview.
## Commands
```bash
uv sync # Install dependencies
uv run uvicorn cpv3.main:app --reload # Dev server (localhost:8000)
uv run pytest # Run all tests
uv run pytest tests/integration/<file>.py # Single test file
uv run alembic revision --autogenerate -m "msg" # Create migration
uv run alembic upgrade head # Apply migrations
uv run dramatiq cpv3.modules.tasks.service # Start background worker
uv run ruff check cpv3/ # Lint
uv run ruff format cpv3/ # Auto-format
docker-compose up # Full stack (DB, Redis, MinIO, API, Worker)
```
## Architecture
Layered module pattern. Each module has the same five or six files (`__init__.py`, `models.py`, `schemas.py`, `repository.py`, `service.py`, `router.py`) and nothing else. No subdirectories within modules. When in doubt, put logic in `service.py`.
```
cpv3/
├── main.py # FastAPI app, CORS, router registration
├── api/v1/router.py # Aggregates all module routers
├── common/schemas.py # Base Schema class (Pydantic, from_attributes=True)
├── db/ # Base, session, model imports
├── infrastructure/ # Auth, security, settings, storage
└── modules/ # Feature modules (users, projects, media, files, etc.)
```
## Key Patterns
- **Database**: PostgreSQL 16, async SQLAlchemy (`AsyncSession`), all models inherit `Base` + `BaseModelMixin`.
- **Soft deletes**: `is_deleted` flag — repositories filter it out by default (see the sketch after this list).
- **Auth**: JWT access + refresh tokens in HTTP-only cookies. Use `get_current_user` dependency.
- **Config**: `get_settings()` from `cpv3/infrastructure/settings.py` (cached with `@lru_cache`).
- **Background tasks**: Dramatiq with Redis broker. Actors live in `cpv3/modules/tasks/service.py`.
- **Package manager**: **uv** only — `uv sync`, `uv add <pkg>`, `uv run <cmd>`.
- **Linting**: Ruff (line length 100, config in `pyproject.toml`).
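A minimal sketch of the default soft-delete filter referenced above — the repository and query are illustrative, and `is_deleted` is assumed to live on the mapped model:

```python
# Illustrative sketch — assumes Project carries the is_deleted flag.
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from cpv3.modules.projects.models import Project

class ProjectRepository:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def list_active(self) -> list[Project]:
        # Repositories exclude soft-deleted rows unless explicitly asked not to.
        stmt = select(Project).where(Project.is_deleted.is_(False))
        result = await self._session.execute(stmt)
        return list(result.scalars().all())
```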
## Common Gotchas
- **Async sessions**: Never mix sync and async DB operations. Use `await` on all session calls.
- **Module structure**: Do not create extra files or subdirectories. Standard files only: `models.py`, `schemas.py`, `repository.py`, `service.py`, `router.py`.
- **Schemas**: Always inherit from `cpv3.common.schemas.Schema`, not raw Pydantic `BaseModel`.
- **Imports**: Use absolute paths (`from cpv3.modules.media.schemas import ...`), not relative.
- **Error messages**: Store as module-level constants with `ERROR_` prefix, not inline strings.
- **CPU-bound work**: Use `anyio.to_thread.run_sync()` to avoid blocking the event loop.
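A minimal sketch of the CPU-bound offload pattern (function names are illustrative):

```python
# Sketch only — _heavy_transform stands in for any synchronous CPU-bound call.
import anyio

def _heavy_transform(data: bytes) -> bytes:
    ...  # expensive synchronous work

async def handle(data: bytes) -> bytes:
    # Run the blocking call in a worker thread so the event loop keeps serving requests.
    return await anyio.to_thread.run_sync(_heavy_transform, data)
```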
## Docker Services
```
postgres → localhost:5332
redis    → localhost:6379
minio    → localhost:9000 (console: 9001)
api      → localhost:8000 (OpenAPI at /api/schema/)
```
+393
@@ -0,0 +1,393 @@
# CODEX Refactor Plan
## Chosen direction
This plan adopts **Option 1: in-place cleanup inside current boundaries**.
The goal is to make backend feature work safer and the code easier to read **without moving far from the current implementation**.
This means:
- keep the existing module structure (`models.py`, `schemas.py`, `repository.py`, `service.py`, `router.py`)
- keep public HTTP routes and response contracts stable
- avoid hidden abstractions, base classes, registries, or plugin-style indirection
- prefer explicit, local helpers over framework-like refactors
- optimize first for **developer readability and safer future changes**
## Why this option
The biggest current readability and change-risk hotspot is `cpv3/modules/tasks/service.py`.
Right now it mixes several responsibilities in one place:
- broker/bootstrap concerns
- Dramatiq actor definitions
- task submission flow
- job lifecycle side effects
- webhook application
- artifact persistence
- notification sending
- cancellation side effects
The highest concentration of risk is in:
- `cpv3/modules/tasks/service.py` — especially `TaskService` and `record_webhook_event()`
- `cpv3/modules/jobs/router.py` — cancellation path delegates into task orchestration
- `cpv3/modules/captions/service.py` — already contains both preset CRUD and Remotion client logic
A bigger ownership move would be possible later, but it is not the safest first step.
## Primary objectives
1. Make task/job orchestration easier to understand.
2. Reduce the blast radius of changes in `TaskService`.
3. Replace raw internal JSON/dict handling with typed, explicit internal contracts.
4. Keep behavior stable for current API consumers and existing module boundaries.
5. Improve confidence with focused unit and integration coverage before and during refactoring.
## Constraints
### Must keep
- Current backend module structure.
- Current route layout and public API behavior.
- Current job types and overall task flow.
- Current ownership model at module level.
- Explicit code over clever abstractions.
### Allowed
- Small internal contract cleanup.
- Small private helper extraction inside existing files.
- Better naming, smaller methods, and clearer sequencing.
- Add or expand tests.
### Not allowed in this phase
- New module subdirectories or extra architecture layers.
- Generic handler registries.
- Base service hierarchies.
- Large movement of lifecycle ownership from `tasks` into `jobs`.
- Pushing more orchestration into `captions`, `media`, or `transcription` before task contracts are cleaned up.
- Broad request/response or frontend-facing contract changes.
## Refactor scope
### In scope
- `cpv3/modules/tasks/service.py`
- `cpv3/modules/tasks/schemas.py`
- `cpv3/modules/tasks/router.py` where needed for clarity only
- `cpv3/modules/jobs/router.py` where needed for clearer delegation only
- tests around task submission, webhook handling, cancellation, and caption task behavior
### Nearby modules to touch carefully if needed
- `cpv3/modules/captions/service.py`
- `cpv3/modules/media/service.py`
- `cpv3/modules/transcription/repository.py`
- `cpv3/modules/jobs/service.py`
These are supporting touches only. The refactor should remain centered in `tasks`.
## Target end state
After this phase:
- `TaskService` still owns task orchestration,
- but each major step is explicit and small,
- stored job payloads are validated through typed internal schemas,
- webhook application is broken into readable stages,
- repeated submission logic is reduced through simple private helpers,
- actor flow is easier to scan,
- tests cover the critical seams that future feature work will depend on.
`tasks/service.py` may still be large after this phase, but it should stop being a "many responsibilities blurred together" file.
## Main decisions
### 1. Keep orchestration in `tasks` for now
Do **not** move job lifecycle ownership into `jobs/service.py` in this phase.
Reason:
- it is a valid long-term direction,
- but it changes semantic ownership of cancellation and webhook application,
- which raises regression risk before the current flow is properly typed and decomposed.
### 2. Add typed internal payload schemas
Add explicit internal schemas in `cpv3/modules/tasks/schemas.py` for per-job stored payloads.
These schemas are for internal readability and validation only.
They should:
- represent the current JSON shape stored in `job.input_data` / `job.output_data`
- remain compatible with existing persisted data
- avoid forcing a database migration in this phase
- be explicit per job type rather than generic
Examples of the kind of models expected:
- media probe input/output
- silence detect input/output
- silence apply input/output
- media convert input/output
- transcription generate input/output
- frame extract input/output
- captions generate input/output
Important: the database can still store JSON, but `TaskService` should stop passing raw dicts around when internal meaning matters.
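For illustration, one input/output pair might look like the sketch below. Field names are assumptions to be checked against real persisted rows, and in the codebase these models would inherit `cpv3.common.schemas.Schema` rather than raw `BaseModel`:

```python
# Hypothetical shapes — confirm field names against the JSON actually stored
# in job.input_data / job.output_data before adopting.
from uuid import UUID
from pydantic import BaseModel, ConfigDict

class MediaProbeInput(BaseModel):
    model_config = ConfigDict(extra="ignore")  # tolerate keys in older persisted rows

    file_key: str
    project_id: UUID | None = None

class MediaProbeOutput(BaseModel):
    model_config = ConfigDict(extra="ignore")

    duration_seconds: float | None = None
    width: int | None = None
    height: int | None = None
```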
### 3. Split `record_webhook_event()` into explicit stages
Refactor `record_webhook_event()` into small private methods with one purpose each.
Target shape:
- load and validate job state
- ignore terminal jobs when appropriate
- apply status update
- append job event row
- run job-type-specific done hooks
- send notification
- return updated job
This keeps behavior in one place while making the sequencing obvious.
### 4. Reduce duplication only where it is obvious
Allowed helper extraction:
- output folder resolution
- broker reference update
- source file lookup/create flow
- artifact persistence helpers
- notification dispatch helper
- common actor start/fail envelope if the resulting code is still explicit
Do **not** introduce a generic task framework.
### 5. Keep domain persistence mostly where it is today
Do not push transcription/media/captions result persistence back into their modules yet.
Reason:
- domain ownership may improve long-term,
- but doing it before task contracts are typed risks scattering orchestration across several large files.
This phase should first make the existing orchestration understandable.
## Execution phases
### Phase 0 — Lock behavior with tests
Before changing the flow, expand tests around the current seams.
Priority coverage:
- `tests/unit/test_task_service.py`
- duplicate active job reuse
- terminal job webhook ignore behavior
- webhook event update sequencing
- artifact persistence trigger points
- cancellation path behavior
- `tests/unit/test_caption_tasks.py`
- caption render fallback behavior
- callback confirmation behavior
- cancellation-related safety where relevant
- integration coverage where current behavior is externally visible
- `tests/integration/test_jobs_endpoints.py`
- `tests/integration/test_captions_endpoints.py` if affected
The goal is not broad test expansion. The goal is to pin the behavior of the exact code being refactored.
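As an example of the intended granularity, a behavior-pinning test could look like this sketch — the fixtures and the submit method name are assumptions, to be aligned with the real `TaskService` API:

```python
# Sketch — fixture names and submit_media_convert are placeholders.
import pytest

@pytest.mark.anyio
async def test_duplicate_active_job_is_reused(task_service, user, convert_request):
    first = await task_service.submit_media_convert(requester=user, data=convert_request)
    second = await task_service.submit_media_convert(requester=user, data=convert_request)
    # Re-submitting the same payload while a job is PENDING/RUNNING must reuse it.
    assert second.job_id == first.job_id
```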
### Phase 1 — Introduce typed internal task payloads
Add internal schemas in `cpv3/modules/tasks/schemas.py` for job payloads.
Implementation rules:
- keep schema names explicit per job type
- support current field names
- make optional fields tolerant enough for existing rows where necessary
- use these schemas at service boundaries where `input_data` and `output_data` are read or produced
Expected outcome:
- internal payload meaning becomes visible from types instead of being inferred from raw dict access
- future changes become easier to review safely
### Phase 2 — Decompose webhook application flow
Refactor `TaskService.record_webhook_event()` into a sequence of small private methods.
Recommended decomposition:
- `_load_job_for_webhook(...)`
- `_should_ignore_webhook(...)`
- `_apply_job_status_update(...)`
- `_create_job_event(...)`
- `_run_done_side_effects(...)`
- `_send_job_notification(...)`
Method names may differ, but the responsibilities should remain this explicit.
Important:
- keep sequencing behavior unchanged
- preserve the current order where artifacts are saved before notifications when that order matters
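A sketch of the resulting shape, reusing the suggested names — signatures are assumptions, and the sequencing deliberately mirrors current behavior:

```python
# Sketch only; each private method keeps exactly one of the stages above.
async def record_webhook_event(self, job_id: uuid.UUID, event: TaskWebhookEvent) -> Job:
    job = await self._load_job_for_webhook(job_id)
    if self._should_ignore_webhook(job, event):  # e.g. job already terminal
        return job
    job = await self._apply_job_status_update(job, event)
    await self._create_job_event(job, event)
    if event.status == JOB_STATUS_DONE:
        # Preserve the existing order: artifacts are persisted before the
        # notification, because clients may refetch immediately after it.
        await self._run_done_side_effects(job, event)
    await self._send_job_notification(job, event)
    return job
```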
### Phase 3 — Normalize submission helpers
Refactor repetitive submit methods without hiding behavior.
The submit methods should remain explicit, but repeated steps can be centralized where the code becomes clearer:
- resolve user output folder
- prepare actor kwargs
- create job + webhook
- enqueue actor
- persist broker reference
- build submit response
The final submit methods should still read as business flows, not as thin wrappers over a generic registry.
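A sketch of how a submit method could read afterwards — every private helper here is a hypothetical name for one of the steps listed above:

```python
# Illustrative flow; helper names are placeholders, the explicit sequence is the point.
async def submit_media_convert(
    self, *, requester: User, data: MediaConvertRequest
) -> TaskSubmitResponse:
    out_folder = await self._resolve_output_folder(requester)      # resolve user output folder
    job, webhook_url = await self._create_job_and_webhook(         # create job + webhook
        requester=requester, job_type=JOB_TYPE_MEDIA_CONVERT, data=data
    )
    message = media_convert_actor.send(                            # enqueue actor
        str(job.id), webhook_url, data.file_key, out_folder
    )
    await self._update_job_broker_reference(                       # persist broker reference
        job, self._broker_reference_from(message)
    )
    return self._build_submit_response(job)                        # build submit response
```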
### Phase 4 — Clean actor envelope duplication
If still useful after earlier phases, standardize only the repeated envelope around actors:
- cancellation check
- running event emission
- success/failure reporting
This should stay simple and local.
If the helper makes actor behavior harder to follow, do not extract it.
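If it does get extracted, the envelope could stay as small as this sketch. It reuses names that already exist in `tasks/service.py`; the helper itself is an assumption, not a commitment:

```python
# Sketch only — if this obscures any single actor, keep that actor inline.
import uuid
from typing import Callable

def run_actor_envelope(job_id: str, webhook_url: str, body: Callable[[], dict | None]) -> None:
    job_uuid = uuid.UUID(job_id)
    try:
        _raise_if_job_cancelled(job_uuid)  # cancellation check
        _send_webhook_event(webhook_url, TaskWebhookEvent(status=JOB_STATUS_RUNNING))  # running event
        output = body()  # actor-specific work stays explicit at the call site
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(status=JOB_STATUS_DONE, output_data=output, finished_at=_utc_now()),
        )
    except JobCancelledError:
        return  # cancelled jobs exit quietly, matching current actor behavior
    except Exception as exc:
        _send_webhook_event(
            webhook_url,
            TaskWebhookEvent(status=JOB_STATUS_FAILED, error_message=str(exc), finished_at=_utc_now()),
        )
```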
### Phase 5 — Reassess, do not auto-expand scope
After the in-place cleanup is done, reassess whether a second phase is justified.
Only then consider:
- moving lifecycle semantics into `jobs`
- pushing result persistence into owning modules
Those are explicitly **not** part of the current plan.
## File-by-file plan
### `cpv3/modules/tasks/schemas.py`
- Add explicit internal payload schemas for job input/output.
- Keep public API request/response schemas stable.
- Prefer additive internal typing over schema churn.
### `cpv3/modules/tasks/service.py`
- Keep this file as the orchestration center for this phase.
- Decompose large methods into readable private steps.
- Replace raw internal dict usage with typed parsing/serialization where the meaning matters.
- Reduce repeated code only through local, explicit helpers.
### `cpv3/modules/tasks/router.py`
- Keep routes unchanged.
- Only adjust delegation or error handling if required to support clearer internal service boundaries.
### `cpv3/modules/jobs/router.py`
- Keep the current cancel entrypoint behavior.
- If touched, only make delegation clearer.
- Do not move cancel semantics into `jobs/service.py` in this phase.
### `cpv3/modules/captions/service.py`
- Keep current ownership.
- Do not add more orchestration here in phase 1.
- Only make supporting adjustments if typed payload work needs a clearer boundary.
### `cpv3/modules/media/service.py`
- Keep media processing helpers as-is unless task cleanup reveals a very small, obvious extraction need.
- Avoid expanding this file into a second orchestration center.
## Risk controls
### Preserve persisted JSON compatibility
Existing jobs may already contain payloads in the database.
Therefore:
- typed internal schemas must support current persisted shapes
- avoid renaming stored keys unless there is a very strong reason
- prefer backward-compatible parsing over cleanup for cleanup's sake
### Preserve side-effect ordering
Current flow saves some artifacts before notification dispatch.
That ordering matters because clients may refetch immediately after notification.
Do not change this ordering unless there is a proven reason and test coverage for the new behavior.
### Avoid ownership churn during cleanup
This phase is about readability and safer edits, not about redistributing the architecture.
If a change starts to feel like "this really belongs in another module," mark it for reassessment after phase 1 instead of expanding scope immediately.
## Verification strategy
For implementation of this plan, verification should include:
- targeted unit tests for `TaskService`
- targeted unit tests for caption task actor behavior
- relevant integration tests for jobs/tasks endpoints
- Ruff checks for touched files
The most important verification is behavioral:
- task submission still works
- duplicate job reuse still works
- webhook updates still drive correct job state
- artifact persistence still happens before notification where required
- cancellation still behaves as before
## Definition of done for this phase
This phase is complete when all of the following are true:
- internal job payload handling is typed where it matters
- `record_webhook_event()` is decomposed into explicit, readable steps
- repetitive submit flow is reduced without introducing hidden abstractions
- tests protect the main task orchestration seams
- public APIs remain effectively unchanged
- the backend is easier to change safely without rediscovering task flow each time
## Explicit non-goals
This plan does **not** include:
- moving lifecycle ownership into `jobs/service.py`
- redesigning the task system
- introducing a generic abstraction layer for tasks
- moving artifact persistence into every owning domain module
- broad route redesign
- frontend contract changes
- module structure changes
## Follow-up after this phase
Once this plan is implemented and stable, reassess whether there is enough remaining pressure to justify a second phase.
If yes, the next candidate should be:
- **Option 2** selectively, and only where phase 1 proves that lifecycle ownership is still the main source of friction.
Until then, keep the refactor intentionally conservative.
@@ -0,0 +1,192 @@
"""add caption_presets table
Revision ID: d5e6f7a8b9c0
Revises: c4d5e6f7a8b9
Create Date: 2026-03-14 12:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "d5e6f7a8b9c0"
down_revision: Union[str, None] = "c4d5e6f7a8b9"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"caption_presets",
sa.Column("user_id", sa.UUID(), nullable=True),
sa.Column("name", sa.String(length=128), nullable=False),
sa.Column("description", sa.Text(), nullable=True),
sa.Column("is_system", sa.Boolean(), nullable=False, server_default=sa.text("false")),
sa.Column("style_config", sa.JSON(), nullable=False),
sa.Column("preview_url", sa.String(length=512), nullable=True),
sa.Column("id", sa.UUID(), nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
sa.Column("is_active", sa.Boolean(), nullable=False, server_default=sa.text("true")),
sa.ForeignKeyConstraint(["user_id"], ["users.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
op.f("ix_caption_presets_user_id"),
"caption_presets",
["user_id"],
unique=False,
)
# Seed system presets
caption_presets = sa.table(
"caption_presets",
sa.column("id", sa.UUID()),
sa.column("user_id", sa.UUID()),
sa.column("name", sa.String()),
sa.column("description", sa.Text()),
sa.column("is_system", sa.Boolean()),
sa.column("style_config", sa.JSON()),
sa.column("preview_url", sa.String()),
sa.column("is_active", sa.Boolean()),
)
op.bulk_insert(
caption_presets,
[
{
"id": "00000000-0000-4000-a000-000000000001",
"user_id": None,
"name": "Классические",
"description": "Белый текст с тенью, жёлтая подсветка слова",
"is_system": True,
"style_config": {
"text": {
"font_family": "Lobster",
"font_size": 40,
"font_weight": 400,
"text_color": "#FFFFFF",
"highlight_color": "#FFCC00",
"text_shadow": "2px 2px 4px rgba(0,0,0,0.5)",
"text_stroke_width": 0,
"text_stroke_color": "#000000",
},
"layout": {
"vertical_position": "bottom",
"horizontal_alignment": "center",
"padding_px": 20,
"max_width_pct": 90,
"lines_per_screen": 2,
},
"animation": {
"highlight_style": "color_scale",
"highlight_scale": 1.1,
"segment_transition": "fade",
"fade_duration_frames": 3,
"animation_speed": 1.0,
},
"background": {
"bg_color": "rgba(0,0,0,0)",
"bg_blur_px": 0,
"bg_glow_color": None,
"bg_border_radius_px": 0,
"bg_padding_px": 0,
},
},
"preview_url": None,
"is_active": True,
},
{
"id": "00000000-0000-4000-a000-000000000002",
"user_id": None,
"name": "Неон",
"description": "Голубой текст с неоновым свечением, пурпурная подсветка",
"is_system": True,
"style_config": {
"text": {
"font_family": "Courier New",
"font_size": 45,
"font_weight": 400,
"text_color": "#00FFFF",
"highlight_color": "#FF00FF",
"text_shadow": "0 0 5px #0ff, 0 0 10px #0ff, 0 0 20px #0ff",
"text_stroke_width": 0,
"text_stroke_color": "#000000",
},
"layout": {
"vertical_position": "bottom",
"horizontal_alignment": "center",
"padding_px": 20,
"max_width_pct": 90,
"lines_per_screen": 2,
},
"animation": {
"highlight_style": "color_scale",
"highlight_scale": 1.2,
"segment_transition": "fade",
"fade_duration_frames": 3,
"animation_speed": 1.0,
},
"background": {
"bg_color": "rgba(0,0,0,0.6)",
"bg_blur_px": 0,
"bg_glow_color": None,
"bg_border_radius_px": 15,
"bg_padding_px": 20,
},
},
"preview_url": None,
"is_active": True,
},
{
"id": "00000000-0000-4000-a000-000000000003",
"user_id": None,
"name": "Минимализм",
"description": "Маленький белый текст без фона, плавное появление",
"is_system": True,
"style_config": {
"text": {
"font_family": "Inter",
"font_size": 28,
"font_weight": 300,
"text_color": "#FFFFFF",
"highlight_color": "#FFFFFF",
"text_shadow": "1px 1px 2px rgba(0,0,0,0.3)",
"text_stroke_width": 0,
"text_stroke_color": "#000000",
},
"layout": {
"vertical_position": "bottom",
"horizontal_alignment": "center",
"padding_px": 30,
"max_width_pct": 80,
"lines_per_screen": 1,
},
"animation": {
"highlight_style": "color",
"highlight_scale": 1.0,
"segment_transition": "fade",
"fade_duration_frames": 5,
"animation_speed": 1.0,
},
"background": {
"bg_color": "rgba(0,0,0,0)",
"bg_blur_px": 0,
"bg_glow_color": None,
"bg_border_radius_px": 0,
"bg_padding_px": 0,
},
},
"preview_url": None,
"is_active": True,
},
],
)
def downgrade() -> None:
op.drop_index(op.f("ix_caption_presets_user_id"), table_name="caption_presets")
op.drop_table("caption_presets")
+2
@@ -1,4 +1,5 @@
from cpv3.db.base import Base
from cpv3.modules.captions.models import CaptionPreset
from cpv3.modules.jobs.models import Job, JobEvent
from cpv3.modules.media.models import ArtifactMediaFile, MediaFile
from cpv3.modules.projects.models import Project
@@ -10,6 +11,7 @@ from cpv3.modules.webhooks.models import Webhook
__all__ = [
"Base",
"CaptionPreset",
"User",
"Project",
"File",
+25
@@ -0,0 +1,25 @@
from __future__ import annotations
import uuid
from sqlalchemy import Boolean, ForeignKey, JSON, String, Text
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column
from cpv3.db.base import Base, BaseModelMixin
class CaptionPreset(Base, BaseModelMixin):
__tablename__ = "caption_presets"
user_id: Mapped[uuid.UUID | None] = mapped_column(
UUID(as_uuid=True),
ForeignKey("users.id", ondelete="CASCADE"),
nullable=True,
index=True,
)
name: Mapped[str] = mapped_column(String(128))
description: Mapped[str | None] = mapped_column(Text, nullable=True)
is_system: Mapped[bool] = mapped_column(Boolean, default=False)
style_config: Mapped[dict] = mapped_column(JSON, nullable=False)
preview_url: Mapped[str | None] = mapped_column(String(512), nullable=True)
+70
@@ -0,0 +1,70 @@
from __future__ import annotations
import uuid
from sqlalchemy import Select, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from cpv3.modules.captions.models import CaptionPreset
from cpv3.modules.captions.schemas import CaptionPresetCreate, CaptionPresetUpdate
class CaptionPresetRepository:
"""Repository for CaptionPreset database operations."""
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def list_all_for_user(self, user_id: uuid.UUID) -> list[CaptionPreset]:
"""Return system presets + user's own presets."""
stmt: Select[tuple[CaptionPreset]] = (
select(CaptionPreset)
.where(CaptionPreset.is_active.is_(True))
.where(
or_(
CaptionPreset.is_system.is_(True),
CaptionPreset.user_id == user_id,
)
)
.order_by(CaptionPreset.is_system.desc(), CaptionPreset.created_at.desc())
)
result = await self._session.execute(stmt)
return list(result.scalars().all())
async def get_by_id(self, preset_id: uuid.UUID) -> CaptionPreset | None:
result = await self._session.execute(
select(CaptionPreset)
.where(CaptionPreset.id == preset_id)
.where(CaptionPreset.is_active.is_(True))
)
return result.scalar_one_or_none()
async def create(
self, *, user_id: uuid.UUID | None, data: CaptionPresetCreate
) -> CaptionPreset:
preset = CaptionPreset(
user_id=user_id,
name=data.name,
description=data.description,
is_system=user_id is None,
style_config=data.style_config.model_dump(mode="json"),
)
self._session.add(preset)
await self._session.commit()
await self._session.refresh(preset)
return preset
async def update(self, preset: CaptionPreset, data: CaptionPresetUpdate) -> CaptionPreset:
for key in data.model_dump(exclude_unset=True):
    # model_dump() flattens nested models into plain dicts, so read the
    # still-typed value from the schema and only dump style_config here.
    value = getattr(data, key)
    if value is None:
        continue
    if key == "style_config":
        setattr(preset, key, value.model_dump(mode="json"))
    else:
        setattr(preset, key, value)
await self._session.commit()
await self._session.refresh(preset)
return preset
async def deactivate(self, preset: CaptionPreset) -> None:
preset.is_active = False
await self._session.commit()
+102 -3
@@ -1,15 +1,30 @@
from __future__ import annotations
from fastapi import APIRouter, Depends
import uuid
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from cpv3.db.session import get_db
from cpv3.infrastructure.auth import get_current_user
from cpv3.modules.captions.schemas import CaptionsRequest, CaptionsResponse
from cpv3.modules.captions.service import generate_captions
from cpv3.modules.captions.schemas import (
CaptionPresetCreate,
CaptionPresetRead,
CaptionPresetUpdate,
CaptionsRequest,
CaptionsResponse,
)
from cpv3.modules.captions.service import CaptionPresetService, generate_captions
from cpv3.modules.users.models import User
router = APIRouter(prefix="/api/captions", tags=["Captions"])
# ---------------------------------------------------------------------------
# Legacy direct render endpoint
# ---------------------------------------------------------------------------
@router.post("/get_video/", response_model=CaptionsResponse)
async def get_video(
body: CaptionsRequest, current_user: User = Depends(get_current_user)
@@ -21,3 +36,87 @@ async def get_video(
transcription=body.transcription,
)
return CaptionsResponse(result=result)
# ---------------------------------------------------------------------------
# Preset CRUD
# ---------------------------------------------------------------------------
@router.get("/presets/", response_model=list[CaptionPresetRead])
async def list_presets(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> list[CaptionPresetRead]:
"""List system presets + user's own presets."""
service = CaptionPresetService(db)
presets = await service.list_presets(user_id=current_user.id)
return [CaptionPresetRead.model_validate(p, from_attributes=True) for p in presets]
@router.post(
"/presets/",
response_model=CaptionPresetRead,
status_code=status.HTTP_201_CREATED,
)
async def create_preset(
body: CaptionPresetCreate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> CaptionPresetRead:
"""Create a user preset."""
service = CaptionPresetService(db)
preset = await service.create_preset(user_id=current_user.id, data=body)
return CaptionPresetRead.model_validate(preset, from_attributes=True)
@router.get("/presets/{preset_id}/", response_model=CaptionPresetRead)
async def get_preset(
preset_id: uuid.UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> CaptionPresetRead:
"""Get a single preset."""
_ = current_user
service = CaptionPresetService(db)
try:
preset = await service.get_preset(preset_id=preset_id)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc))
return CaptionPresetRead.model_validate(preset, from_attributes=True)
@router.patch("/presets/{preset_id}/", response_model=CaptionPresetRead)
async def update_preset(
preset_id: uuid.UUID,
body: CaptionPresetUpdate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> CaptionPresetRead:
"""Update a user preset (cannot edit system or others' presets)."""
service = CaptionPresetService(db)
try:
preset = await service.update_preset(
preset_id=preset_id, user_id=current_user.id, data=body
)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc))
except PermissionError as exc:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(exc))
return CaptionPresetRead.model_validate(preset, from_attributes=True)
@router.delete("/presets/{preset_id}/", status_code=status.HTTP_204_NO_CONTENT)
async def delete_preset(
preset_id: uuid.UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> None:
"""Delete a user preset (cannot delete system or others' presets)."""
service = CaptionPresetService(db)
try:
await service.delete_preset(preset_id=preset_id, user_id=current_user.id)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc))
except PermissionError as exc:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(exc))
+87
@@ -1,9 +1,96 @@
from __future__ import annotations
from datetime import datetime
from typing import Literal
from uuid import UUID
from pydantic import Field
from cpv3.common.schemas import Schema
from cpv3.modules.transcription.schemas import Document
# ---------------------------------------------------------------------------
# Caption style config sub-schemas
# ---------------------------------------------------------------------------
class CaptionTextStyle(Schema):
font_family: str = "Lobster"
font_size: int = 40
font_weight: int = 400
text_color: str = "#FFFFFF"
highlight_color: str = "#FFCC00"
text_shadow: str | None = "2px 2px 4px rgba(0,0,0,0.5)"
text_stroke_width: float = 0
text_stroke_color: str = "#000000"
class CaptionLayoutStyle(Schema):
vertical_position: Literal["top", "center", "bottom"] = "bottom"
horizontal_alignment: Literal["left", "center", "right"] = "center"
padding_px: int = 20
max_width_pct: int = 90
lines_per_screen: int = 2
class CaptionAnimationStyle(Schema):
highlight_style: Literal["color", "scale", "underline", "color_scale"] = "color"
highlight_scale: float = 1.1
segment_transition: Literal["fade", "slide", "none"] = "fade"
fade_duration_frames: int = 3
animation_speed: float = 1.0
class CaptionBackgroundStyle(Schema):
bg_color: str = "rgba(0,0,0,0.6)"
bg_blur_px: int = 0
bg_glow_color: str | None = None
bg_border_radius_px: int = 15
bg_padding_px: int = 20
class CaptionStyleConfig(Schema):
text: CaptionTextStyle = Field(default_factory=CaptionTextStyle)
layout: CaptionLayoutStyle = Field(default_factory=CaptionLayoutStyle)
animation: CaptionAnimationStyle = Field(default_factory=CaptionAnimationStyle)
background: CaptionBackgroundStyle = Field(default_factory=CaptionBackgroundStyle)
# ---------------------------------------------------------------------------
# Preset CRUD schemas
# ---------------------------------------------------------------------------
class CaptionPresetCreate(Schema):
name: str = Field(..., max_length=128)
description: str | None = None
style_config: CaptionStyleConfig
class CaptionPresetUpdate(Schema):
name: str | None = Field(default=None, max_length=128)
description: str | None = None
style_config: CaptionStyleConfig | None = None
class CaptionPresetRead(Schema):
id: UUID
user_id: UUID | None
name: str
description: str | None
is_system: bool
style_config: CaptionStyleConfig
preview_url: str | None
created_at: datetime
updated_at: datetime
# ---------------------------------------------------------------------------
# Existing request/response schemas
# ---------------------------------------------------------------------------
class CaptionsRequest(Schema):
folder: str
video_s3_path: str
+107 -9
@@ -1,31 +1,129 @@
from __future__ import annotations
import uuid
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from cpv3.infrastructure.settings import get_settings
from cpv3.modules.captions.models import CaptionPreset
from cpv3.modules.captions.repository import CaptionPresetRepository
from cpv3.modules.captions.schemas import (
CaptionPresetCreate,
CaptionPresetUpdate,
CaptionStyleConfig,
)
from cpv3.modules.transcription.schemas import Document
ERROR_PRESET_NOT_FOUND = "Пресет субтитров не найден"
ERROR_PRESET_FORBIDDEN = "Нельзя изменять чужой или системный пресет"
# ---------------------------------------------------------------------------
# Caption rendering (calls Remotion service)
# ---------------------------------------------------------------------------
async def generate_captions(
*, video_s3_path: str, folder: str, transcription: Document
*,
video_s3_path: str,
folder: str,
transcription: Document,
style_config: dict | None = None,
callback_url: str | None = None,
render_id: str | None = None,
) -> str:
"""Generate captions for a video using the Remotion service."""
"""Generate captions for a video using the Remotion service.
Returns render_id (async mode with callback_url) or S3 output path (sync fallback).
"""
settings = get_settings()
payload = {
payload: dict = {
"folder": folder,
"videoSrc": video_s3_path,
"transcription": transcription.model_dump(),
}
if style_config is not None:
payload["styleConfig"] = style_config
if callback_url is not None:
payload["callbackUrl"] = callback_url
if render_id is not None:
payload["renderId"] = render_id
async with httpx.AsyncClient(timeout=300) as client:
resp = await client.post(
f"{settings.remotion_service_url}/api/render", json=payload
)
timeout = 30.0 if callback_url else 300.0
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.post(f"{settings.remotion_service_url}/api/render", json=payload)
resp.raise_for_status()
data = resp.json()
if not isinstance(data, dict) or "output" not in data:
# Async mode: Remotion returns { renderId, status: "queued" }
if callback_url and "renderId" in data:
return str(data["renderId"])
# Sync fallback: Remotion returns { output: "s3/path" }
if isinstance(data, dict) and "output" in data:
return str(data["output"])
raise RuntimeError("Unexpected response from remotion service")
return str(data["output"])
# ---------------------------------------------------------------------------
# Preset service
# ---------------------------------------------------------------------------
class CaptionPresetService:
"""Business logic for caption preset CRUD with ownership checks."""
def __init__(self, session: AsyncSession) -> None:
self._repo = CaptionPresetRepository(session)
async def list_presets(self, *, user_id: uuid.UUID) -> list[CaptionPreset]:
return await self._repo.list_all_for_user(user_id)
async def get_preset(self, *, preset_id: uuid.UUID) -> CaptionPreset:
preset = await self._repo.get_by_id(preset_id)
if preset is None:
raise ValueError(ERROR_PRESET_NOT_FOUND)
return preset
async def create_preset(
self, *, user_id: uuid.UUID, data: CaptionPresetCreate
) -> CaptionPreset:
return await self._repo.create(user_id=user_id, data=data)
async def update_preset(
self,
*,
preset_id: uuid.UUID,
user_id: uuid.UUID,
data: CaptionPresetUpdate,
) -> CaptionPreset:
preset = await self.get_preset(preset_id=preset_id)
if preset.is_system or preset.user_id != user_id:
raise PermissionError(ERROR_PRESET_FORBIDDEN)
return await self._repo.update(preset, data)
async def delete_preset(self, *, preset_id: uuid.UUID, user_id: uuid.UUID) -> None:
preset = await self.get_preset(preset_id=preset_id)
if preset.is_system or preset.user_id != user_id:
raise PermissionError(ERROR_PRESET_FORBIDDEN)
await self._repo.deactivate(preset)
async def resolve_style_config(
self,
*,
preset_id: uuid.UUID | None = None,
inline_config: dict | None = None,
) -> dict | None:
"""Resolve a style config from preset_id or inline override."""
if inline_config is not None:
# Validate by parsing, then return as dict
cfg = CaptionStyleConfig.model_validate(inline_config)
return cfg.model_dump(mode="json")
if preset_id is not None:
preset = await self.get_preset(preset_id=preset_id)
return preset.style_config
return None
+25
@@ -35,6 +35,31 @@ class JobRepository:
)
return result.scalar_one_or_none()
async def list_active_by_type(
self,
*,
requester: User,
job_type: str,
project_id: uuid.UUID | None,
statuses: tuple[str, ...],
) -> list[Job]:
stmt: Select[tuple[Job]] = (
select(Job)
.where(Job.is_active.is_(True))
.where(Job.user_id == requester.id)
.where(Job.job_type == job_type)
.where(Job.status.in_(statuses))
.order_by(Job.created_at.desc())
)
if project_id is None:
stmt = stmt.where(Job.project_id.is_(None))
else:
stmt = stmt.where(Job.project_id == project_id)
result = await self._session.execute(stmt)
return list(result.scalars().all())
async def create(self, *, requester: User, data: JobCreate) -> Job:
job = Job(
user_id=requester.id,
+6
@@ -16,6 +16,7 @@ from cpv3.modules.jobs.schemas import (
JobUpdate,
)
from cpv3.modules.jobs.service import JobService
from cpv3.modules.tasks.service import TaskService
from cpv3.modules.users.models import User
jobs_router = APIRouter(prefix="/api/jobs", tags=["jobs"])
@@ -75,6 +76,11 @@ async def patch_job_endpoint(
if not current_user.is_staff and job.user_id != current_user.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden")
if body.status == "CANCELLED":
task_service = TaskService(db)
job = await task_service.cancel_job(job)
return JobRead.model_validate(job)
job = await service.update_job(job, body)
return JobRead.model_validate(job)
+1 -1
@@ -339,7 +339,7 @@ async def convert_to_mp4(
try:
filename_without_ext = path.splitext(path.basename(file_key))[0]
mp4_filename = filename_without_ext + ".mp4"
mp4_filename = f"Конвертированое видео {filename_without_ext}.mp4"
with NamedTemporaryFile(suffix=".mp4", delete=False) as out:
out_path = out.name
+10 -12
@@ -37,9 +37,7 @@ class SilenceRemoveRequest(Schema):
min_silence_duration_ms: int = Field(
default=200, description="Minimum silence duration in milliseconds"
)
silence_threshold_db: int = Field(
default=16, description="Silence threshold in decibels"
)
silence_threshold_db: int = Field(default=16, description="Silence threshold in decibels")
padding_ms: int = Field(
default=100, description="Padding around non-silent segments in milliseconds"
)
@@ -53,9 +51,7 @@ class SilenceDetectRequest(Schema):
min_silence_duration_ms: int = Field(
default=200, description="Minimum silence duration in milliseconds"
)
silence_threshold_db: int = Field(
default=16, description="Silence threshold in decibels"
)
silence_threshold_db: int = Field(default=16, description="Silence threshold in decibels")
padding_ms: int = Field(
default=100, description="Padding around non-silent segments in milliseconds"
)
@@ -67,9 +63,7 @@ class SilenceApplyRequest(Schema):
file_key: str = Field(..., description="Storage key of the input file")
out_folder: str = Field(..., description="Output folder for processed file")
project_id: UUID | None = Field(default=None, description="Associated project ID")
output_name: str | None = Field(
default=None, description="Display name for the output file"
)
output_name: str | None = Field(default=None, description="Display name for the output file")
cuts: list[dict] = Field(
..., description="Cut regions: [{'start_ms': int, 'end_ms': int}, ...]"
)
@@ -103,6 +97,12 @@ class CaptionsGenerateRequest(Schema):
folder: str = Field(..., description="Output folder for rendered video")
transcription_id: UUID = Field(..., description="ID of the transcription to use")
project_id: UUID | None = Field(default=None, description="Associated project ID")
preset_id: UUID | None = Field(
default=None, description="Caption style preset ID (mutually exclusive with style_config)"
)
style_config: dict | None = Field(
default=None, description="Inline caption style config (overrides preset_id)"
)
class FrameExtractRequest(Schema):
@@ -110,9 +110,7 @@ class FrameExtractRequest(Schema):
file_key: str = Field(..., description="S3 key of the video file")
project_id: UUID | None = Field(default=None, description="Associated project ID")
regenerate: bool = Field(
default=False, description="Delete existing frames and re-extract"
)
regenerate: bool = Field(default=False, description="Delete existing frames and re-extract")
# --- Response schemas ---
+480 -71
@@ -10,17 +10,19 @@ import json
import logging
import time
import uuid
from pathlib import Path
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import dramatiq # type: ignore[import-untyped]
from dramatiq.brokers.redis import RedisBroker # type: ignore[import-untyped]
import httpx
from dramatiq.brokers.redis import RedisBroker # type: ignore[import-untyped]
from sqlalchemy.ext.asyncio import AsyncSession
from cpv3.infrastructure.deps import _get_storage_service
from cpv3.infrastructure.settings import get_settings
from cpv3.infrastructure.storage.utils import get_user_folder
from cpv3.modules.files.repository import FileRepository
from cpv3.modules.files.schemas import FileCreate
from cpv3.modules.jobs.models import Job
@@ -46,7 +48,6 @@ from cpv3.modules.tasks.schemas import (
TaskWebhookEvent,
TranscriptionGenerateRequest,
)
from cpv3.infrastructure.storage.utils import get_user_folder
from cpv3.modules.notifications.service import NotificationService
from cpv3.modules.transcription.repository import TranscriptionRepository
from cpv3.modules.transcription.schemas import TranscriptionCreate
@@ -61,6 +62,7 @@ JOB_STATUS_PENDING: JobStatusEnum = "PENDING"
JOB_STATUS_RUNNING: JobStatusEnum = "RUNNING"
JOB_STATUS_DONE: JobStatusEnum = "DONE"
JOB_STATUS_FAILED: JobStatusEnum = "FAILED"
JOB_STATUS_CANCELLED: JobStatusEnum = "CANCELLED"
JOB_TYPE_MEDIA_PROBE: JobTypeEnum = "MEDIA_PROBE"
JOB_TYPE_SILENCE_REMOVE: JobTypeEnum = "SILENCE_REMOVE"
@@ -94,6 +96,7 @@ MESSAGE_PROBING_MEDIA = "Probing media"
MESSAGE_PROCESSING = "Processing"
MESSAGE_CONVERTING = "Converting"
MESSAGE_RENDERING_CAPTIONS = "Rendering captions"
MESSAGE_CANCELLED = "Отменено пользователем"
MESSAGE_EXTRACTING_FRAMES = "Извлечение кадров"
MESSAGE_UPLOADING_FRAMES = "Загрузка кадров"
MESSAGE_DELETING_OLD_FRAMES = "Удаление старых кадров"
@@ -116,6 +119,13 @@ MESSAGE_APPLYING_CUTS = "Применение вырезок"
PROGRESS_THROTTLE_SECONDS = 3.0
ACTIVE_JOB_STATUSES = (JOB_STATUS_PENDING, JOB_STATUS_RUNNING)
DRAMATIQ_BROKER_REF_SEPARATOR = ":"
class JobCancelledError(RuntimeError):
"""Raised when a job was cancelled before completion."""
# ---------------------------------------------------------------------------
# Dramatiq broker setup
# ---------------------------------------------------------------------------
@@ -163,9 +173,7 @@ def _send_webhook_event(webhook_url: str, event: TaskWebhookEvent) -> None:
"""Send a task webhook event to the API."""
payload = event.model_dump(mode="json", exclude_none=True)
try:
response = httpx.post(
webhook_url, json=payload, timeout=WEBHOOK_TIMEOUT_SECONDS
)
response = httpx.post(webhook_url, json=payload, timeout=WEBHOOK_TIMEOUT_SECONDS)
response.raise_for_status()
except Exception:
logger.exception("Failed to send task webhook event to %s", webhook_url)
@@ -197,17 +205,69 @@ def _run_async(coro: Any) -> Any:
loop.close()
def _serialize_broker_reference(queue_name: str, redis_message_id: str) -> str:
"""Serialize queue name and Dramatiq redis message id into one field."""
return f"{queue_name}{DRAMATIQ_BROKER_REF_SEPARATOR}{redis_message_id}"
def _parse_broker_reference(broker_id: str | None) -> tuple[str, str] | None:
"""Parse queue name and Dramatiq redis message id from stored broker_id."""
if not broker_id or DRAMATIQ_BROKER_REF_SEPARATOR not in broker_id:
return None
queue_name, redis_message_id = broker_id.split(DRAMATIQ_BROKER_REF_SEPARATOR, 1)
if not queue_name or not redis_message_id:
return None
return queue_name, redis_message_id
def _get_job_status_sync(job_id: uuid.UUID) -> JobStatusEnum | None:
"""Read current job status using a sync connection (safe for Dramatiq workers)."""
import psycopg2
settings = get_settings()
dsn = (
f"host={settings.postgres_host} port={settings.postgres_port} "
f"dbname={settings.postgres_database} "
f"user={settings.postgres_user} password={settings.postgres_password}"
)
try:
conn = psycopg2.connect(dsn)
try:
with conn.cursor() as cur:
cur.execute("SELECT status FROM jobs WHERE id = %s", (str(job_id),))
row = cur.fetchone()
return row[0] if row else None
finally:
conn.close()
except Exception:
logger.warning("Failed to check job status for %s", job_id, exc_info=True)
return None
def _raise_if_job_cancelled(job_id: uuid.UUID) -> None:
"""Stop worker execution when the job is already cancelled in the database."""
status = _get_job_status_sync(job_id)
if status == JOB_STATUS_CANCELLED:
raise JobCancelledError(MESSAGE_CANCELLED)
# ---------------------------------------------------------------------------
# Dramatiq actors
# ---------------------------------------------------------------------------
@dramatiq.actor(max_retries=3, min_backoff=1000)
@dramatiq.actor(max_retries=0)
def media_probe_actor(job_id: str, webhook_url: str, file_key: str) -> None:
"""Probe media file to extract metadata."""
from cpv3.modules.media.service import probe_media
job_uuid = uuid.UUID(job_id)
try:
_raise_if_job_cancelled(job_uuid)
except JobCancelledError:
logger.info("media_probe_actor cancelled: %s", job_uuid)
return
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
@@ -237,6 +297,9 @@ def media_probe_actor(job_id: str, webhook_url: str, file_key: str) -> None:
finished_at=_utc_now(),
),
)
except JobCancelledError:
logger.info("media_probe_actor cancelled: %s", job_uuid)
return
except Exception as exc:
logger.exception("media_probe_actor failed: %s", job_uuid)
_send_webhook_event(
@@ -247,10 +310,9 @@ def media_probe_actor(job_id: str, webhook_url: str, file_key: str) -> None:
finished_at=_utc_now(),
),
)
raise
@dramatiq.actor(max_retries=3, min_backoff=1000)
@dramatiq.actor(max_retries=0)
def silence_remove_actor(
job_id: str,
webhook_url: str,
@@ -264,6 +326,11 @@ def silence_remove_actor(
from cpv3.modules.media.service import remove_silence
job_uuid = uuid.UUID(job_id)
try:
_raise_if_job_cancelled(job_uuid)
except JobCancelledError:
logger.info("silence_remove_actor cancelled: %s", job_uuid)
return
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
@@ -306,6 +373,9 @@ def silence_remove_actor(
finished_at=_utc_now(),
),
)
except JobCancelledError:
logger.info("silence_remove_actor cancelled: %s", job_uuid)
return
except Exception as exc:
logger.exception("silence_remove_actor failed: %s", job_uuid)
_send_webhook_event(
@@ -316,10 +386,9 @@ def silence_remove_actor(
finished_at=_utc_now(),
),
)
raise
@dramatiq.actor(max_retries=3, min_backoff=1000)
@dramatiq.actor(max_retries=0)
def silence_detect_actor(
job_id: str,
webhook_url: str,
@@ -332,6 +401,11 @@ def silence_detect_actor(
from cpv3.modules.media.service import detect_silence
job_uuid = uuid.UUID(job_id)
try:
_raise_if_job_cancelled(job_uuid)
except JobCancelledError:
logger.info("silence_detect_actor cancelled: %s", job_uuid)
return
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
@@ -369,6 +443,9 @@ def silence_detect_actor(
finished_at=_utc_now(),
),
)
except JobCancelledError:
logger.info("silence_detect_actor cancelled: %s", job_uuid)
return
except Exception as exc:
logger.exception("silence_detect_actor failed: %s", job_uuid)
_send_webhook_event(
@@ -379,10 +456,9 @@ def silence_detect_actor(
finished_at=_utc_now(),
),
)
raise
@dramatiq.actor(max_retries=3, min_backoff=1000)
@dramatiq.actor(max_retries=0)
def silence_apply_actor(
job_id: str,
webhook_url: str,
@@ -395,6 +471,11 @@ def silence_apply_actor(
from cpv3.modules.media.service import apply_silence_cuts
job_uuid = uuid.UUID(job_id)
try:
_raise_if_job_cancelled(job_uuid)
except JobCancelledError:
logger.info("silence_apply_actor cancelled: %s", job_uuid)
return
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
@@ -436,6 +517,9 @@ def silence_apply_actor(
finished_at=_utc_now(),
),
)
except JobCancelledError:
logger.info("silence_apply_actor cancelled: %s", job_uuid)
return
except Exception as exc:
logger.exception("silence_apply_actor failed: %s", job_uuid)
_send_webhook_event(
@@ -446,10 +530,9 @@ def silence_apply_actor(
finished_at=_utc_now(),
),
)
raise
@dramatiq.actor(max_retries=3, min_backoff=1000)
@dramatiq.actor(max_retries=0)
def media_convert_actor(
job_id: str,
webhook_url: str,
@@ -461,6 +544,11 @@ def media_convert_actor(
from cpv3.modules.media.service import convert_to_mp4
job_uuid = uuid.UUID(job_id)
try:
_raise_if_job_cancelled(job_uuid)
except JobCancelledError:
logger.info("media_convert_actor cancelled: %s", job_uuid)
return
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
@@ -482,9 +570,7 @@ def media_convert_actor(
progress_pct=PROGRESS_MEDIA_CONVERT,
),
)
result = _run_async(
convert_to_mp4(storage, file_key=file_key, out_folder=out_folder)
)
result = _run_async(convert_to_mp4(storage, file_key=file_key, out_folder=out_folder))
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
@@ -499,6 +585,9 @@ def media_convert_actor(
finished_at=_utc_now(),
),
)
except JobCancelledError:
logger.info("media_convert_actor cancelled: %s", job_uuid)
return
except Exception as exc:
logger.exception("media_convert_actor failed: %s", job_uuid)
_send_webhook_event(
@@ -509,10 +598,9 @@ def media_convert_actor(
finished_at=_utc_now(),
),
)
raise
@dramatiq.actor(max_retries=2, min_backoff=2000)
@dramatiq.actor(max_retries=0)
def transcription_generate_actor(
job_id: str,
webhook_url: str,
@@ -528,6 +616,11 @@ def transcription_generate_actor(
)
job_uuid = uuid.UUID(job_id)
try:
_raise_if_job_cancelled(job_uuid)
except JobCancelledError:
logger.info("transcription_generate_actor cancelled: %s", job_uuid)
return
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
@@ -548,11 +641,15 @@ def transcription_generate_actor(
raise ValueError(ERROR_NO_AUDIO_STREAM)
# Extract probe metadata for artifact creation
duration_seconds = float(probe.format.duration) if probe.format and probe.format.duration else 0.0
duration_seconds = (
float(probe.format.duration) if probe.format and probe.format.duration else 0.0
)
video_stream = next((s for s in probe.streams if s.codec_type == "video"), None)
probe_meta = {
"duration_seconds": duration_seconds,
"frame_rate": _parse_frame_rate(video_stream.r_frame_rate) if video_stream and video_stream.r_frame_rate else None,
"frame_rate": _parse_frame_rate(video_stream.r_frame_rate)
if video_stream and video_stream.r_frame_rate
else None,
"width": video_stream.width if video_stream else None,
"height": video_stream.height if video_stream else None,
}
@@ -573,9 +670,9 @@ def transcription_generate_actor(
if now - last_report_time < PROGRESS_THROTTLE_SECONDS:
return
last_report_time = now
mapped = PROGRESS_TRANSCRIPTION_START + (
pct / 100.0
) * (PROGRESS_TRANSCRIPTION_END - PROGRESS_TRANSCRIPTION_START)
mapped = PROGRESS_TRANSCRIPTION_START + (pct / 100.0) * (
PROGRESS_TRANSCRIPTION_END - PROGRESS_TRANSCRIPTION_START
)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
@@ -617,18 +714,9 @@ def transcription_generate_actor(
finished_at=_utc_now(),
),
)
except (ValueError, RuntimeError) as exc:
logger.exception(
"transcription_generate_actor failed (non-transient): %s", job_uuid
)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_FAILED,
error_message=str(exc),
finished_at=_utc_now(),
),
)
except JobCancelledError:
logger.info("transcription_generate_actor cancelled: %s", job_uuid)
return
except Exception as exc:
logger.exception("transcription_generate_actor failed: %s", job_uuid)
_send_webhook_event(
@@ -639,22 +727,31 @@ def transcription_generate_actor(
finished_at=_utc_now(),
),
)
raise
@dramatiq.actor(max_retries=2, min_backoff=2000)
RENDER_POLL_INTERVAL_SECONDS = 5
RENDER_POLL_TIMEOUT_SECONDS = 600
@dramatiq.actor(max_retries=0)
def captions_generate_actor(
job_id: str,
webhook_url: str,
video_s3_path: str,
folder: str,
transcription_json: dict,
style_config: dict | None = None,
) -> None:
"""Generate captions on video."""
"""Generate captions on video (async via Remotion + BullMQ)."""
from cpv3.modules.captions.service import generate_captions
from cpv3.modules.transcription.schemas import Document
job_uuid = uuid.UUID(job_id)
try:
_raise_if_job_cancelled(job_uuid)
except JobCancelledError:
logger.info("captions_generate_actor cancelled: %s", job_uuid)
return
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
@@ -673,21 +770,96 @@ def captions_generate_actor(
),
)
document = Document.model_validate(transcription_json)
output_path = _run_async(
# Call Remotion with callback_url so it sends progress webhooks directly
render_id = _run_async(
generate_captions(
video_s3_path=video_s3_path, folder=folder, transcription=document
video_s3_path=video_s3_path,
folder=folder,
transcription=document,
style_config=style_config,
callback_url=webhook_url,
render_id=job_id,
)
)
_raise_if_job_cancelled(job_uuid)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
current_message=MESSAGE_RENDERING_CAPTIONS,
output_data={"render_id": render_id},
),
)
# Polling fallback: wait for Remotion to finish rendering
# Primary progress is delivered via Remotion → webhook directly
settings = get_settings()
elapsed = 0.0
last_polled_status: str | None = None
while elapsed < RENDER_POLL_TIMEOUT_SECONDS:
_raise_if_job_cancelled(job_uuid)
time.sleep(RENDER_POLL_INTERVAL_SECONDS)
elapsed += RENDER_POLL_INTERVAL_SECONDS
try:
resp = httpx.get(
f"{settings.remotion_service_url}/api/render/{render_id}",
timeout=10,
)
resp.raise_for_status()
data = resp.json()
except Exception:
logger.warning("Render poll failed for %s, retrying...", render_id)
continue
status = data.get("status")
if status != last_polled_status:
logger.info(
"Remotion render %s status=%s progress=%s callback_delivered=%s",
render_id,
status,
data.get("progress_pct"),
data.get("callback_delivered"),
)
last_polled_status = status
if status == "done":
if data.get("callback_delivered") is True:
# Remotion already sent DONE webhook — exit cleanly
return
output_path = data.get("output_path")
if not output_path:
raise RuntimeError(
"Remotion render finished without output_path in polling response"
)
logger.warning(
"Remotion render %s finished without confirmed DONE webhook, sending fallback completion",
render_id,
)
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
status=JOB_STATUS_DONE,
current_message="Готово",
progress_pct=PROGRESS_COMPLETE,
output_data={"output_path": str(output_path)},
finished_at=_utc_now(),
),
)
return
if status == "failed":
error = data.get("error", "Render failed")
raise RuntimeError(f"Remotion render failed: {error}")
raise TimeoutError(
f"Render {render_id} did not complete within {RENDER_POLL_TIMEOUT_SECONDS}s"
)
except JobCancelledError:
logger.info("captions_generate_actor cancelled: %s", job_uuid)
return
except Exception as exc:
logger.exception("captions_generate_actor failed: %s", job_uuid)
_send_webhook_event(
@@ -698,10 +870,9 @@ def captions_generate_actor(
finished_at=_utc_now(),
),
)
raise
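# The polling loop above reads these fields from GET /api/render/{render_id}.
# Sketch of the assumed response shape, inferred from usage in this file rather
# than from a published Remotion-service schema:
from typing import TypedDict

class RenderStatusResponse(TypedDict, total=False):
    status: str  # observed values: "done", "failed"; anything else is treated as in progress
    progress_pct: float  # logged while polling
    callback_delivered: bool  # True once the render service delivered the DONE webhook itself
    output_path: str  # present when status == "done"
    error: str  # present when status == "failed"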
@dramatiq.actor(max_retries=0)
def frame_extract_actor(
job_id: str,
webhook_url: str,
@@ -717,6 +888,11 @@ def frame_extract_actor(
)
job_uuid = uuid.UUID(job_id)
try:
_raise_if_job_cancelled(job_uuid)
except JobCancelledError:
logger.info("frame_extract_actor cancelled: %s", job_uuid)
return
_send_webhook_event(
webhook_url,
TaskWebhookEvent(
@@ -738,9 +914,7 @@ def frame_extract_actor(
progress_pct=PROGRESS_FRAME_EXTRACT_START,
),
)
old_meta = _run_async(read_frames_metadata(storage, frames_folder=frames_folder))
if old_meta is not None:
_run_async(
delete_frames(
@@ -797,6 +971,9 @@ def frame_extract_actor(
finished_at=_utc_now(),
),
)
except JobCancelledError:
logger.info("frame_extract_actor cancelled: %s", job_uuid)
return
except Exception as exc:
logger.exception("frame_extract_actor failed: %s", job_uuid)
_send_webhook_event(
@@ -807,7 +984,6 @@ def frame_extract_actor(
finished_at=_utc_now(),
),
)
raise
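# `_run_async` bridges the synchronous Dramatiq actors to the async service layer.
# It is defined outside this hunk; the tests below monkeypatch it with asyncio.run,
# so a stand-in consistent with that usage (an assumption, not the committed code):
import asyncio
from collections.abc import Coroutine
from typing import Any, TypeVar

T = TypeVar("T")

def _run_async(coro: Coroutine[Any, Any, T]) -> T:
    # Runs the coroutine to completion on a fresh event loop per call.
    return asyncio.run(coro)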
# ---------------------------------------------------------------------------
@@ -824,6 +1000,97 @@ class TaskService:
self._event_repo = JobEventRepository(session)
self._webhook_repo = WebhookRepository(session)
async def _update_job_broker_reference(self, job: Job, broker_reference: str) -> Job:
"""Persist the transport-specific broker reference after enqueueing."""
job.broker_id = broker_reference
await self._session.commit()
await self._session.refresh(job)
return job
async def _find_duplicate_active_job(
self,
*,
requester: User,
job_type: JobTypeEnum,
project_id: uuid.UUID | None,
input_data: dict,
) -> Job | None:
"""Reuse an already running job for the same request payload."""
jobs = await self._job_repo.list_active_by_type(
requester=requester,
job_type=job_type,
project_id=project_id,
statuses=ACTIVE_JOB_STATUSES,
)
for job in jobs:
if job.input_data == input_data:
return job
return None
async def _create_cancellation_notification(self, job: Job) -> None:
"""Emit a single cancellation notification to the current user."""
if job.user_id is None:
return
notification_service = NotificationService(self._session)
await notification_service.create_task_notification(
user_id=job.user_id,
job=job,
event=TaskWebhookEvent(
status=JOB_STATUS_CANCELLED,
current_message=MESSAGE_CANCELLED,
finished_at=job.finished_at,
),
)
def _cancel_dramatiq_message_sync(self, broker_id: str | None) -> None:
"""Remove a queued Dramatiq message from Redis when possible."""
broker_reference = _parse_broker_reference(broker_id)
if broker_reference is None:
return
queue_name, redis_message_id = broker_reference
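# Dramatiq's Redis broker keeps queued message ids in a list at "<namespace>:<queue>",
# message payloads in the sibling ".msgs" hash, delayed messages in the ".DQ"/".DQ.msgs"
# pair, and in-flight ids in per-worker "__acks__" sets; the id is removed from all of them.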
namespace = _redis_broker.namespace
queue_key = f"{namespace}:{queue_name}"
queue_messages_key = f"{queue_key}.msgs"
delayed_queue_key = f"{queue_key}.DQ"
delayed_messages_key = f"{delayed_queue_key}.msgs"
acks_pattern = f"{namespace}:__acks__.*.{queue_name}"
pipeline = _redis_broker.client.pipeline()
pipeline.lrem(queue_key, 1, redis_message_id)
pipeline.hdel(queue_messages_key, redis_message_id)
pipeline.lrem(delayed_queue_key, 1, redis_message_id)
pipeline.hdel(delayed_messages_key, redis_message_id)
for key in _redis_broker.client.scan_iter(match=acks_pattern):
pipeline.srem(key, redis_message_id)
pipeline.execute()
async def _cancel_dramatiq_message(self, broker_id: str | None) -> None:
"""Run Redis queue cleanup for a Dramatiq message off the event loop."""
await asyncio.to_thread(self._cancel_dramatiq_message_sync, broker_id)
async def _cancel_caption_render(self, job: Job) -> None:
"""Cancel the downstream Remotion render when it has already been queued."""
if job.job_type != JOB_TYPE_CAPTIONS_GENERATE:
return
output_data = job.output_data or {}
render_id = output_data.get("render_id") or str(job.id)
settings = get_settings()
async with httpx.AsyncClient(timeout=WEBHOOK_TIMEOUT_SECONDS) as client:
response = await client.delete(
f"{settings.remotion_service_url}/api/render/{render_id}"
)
if response.status_code == 404:
return
response.raise_for_status()
async def _create_job_and_webhook(
self,
*,
@@ -873,21 +1140,31 @@ class TaskService:
project_id=project_id,
input_data=input_data,
)
message = actor.send(job_id=str(job.id), webhook_url=webhook_url, **actor_kwargs)
redis_message_id = message.options.get("redis_message_id")
if redis_message_id:
broker_reference = _serialize_broker_reference(
message.queue_name,
str(redis_message_id),
)
await self._update_job_broker_reference(job, broker_reference)
return TaskSubmitResponse(
job_id=job.id,
webhook_url=webhook_url,
status=JOB_STATUS_PENDING,
)
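# `_serialize_broker_reference` / `_parse_broker_reference` are used above but defined
# outside this hunk. The test fixture broker_id="default:redis-message-id" suggests a
# plain "<queue_name>:<redis_message_id>" string; a minimal sketch under that assumption:
def _serialize_broker_reference(queue_name: str, redis_message_id: str) -> str:
    return f"{queue_name}:{redis_message_id}"

def _parse_broker_reference(broker_id: str | None) -> tuple[str, str] | None:
    if not broker_id:
        return None
    queue_name, sep, redis_message_id = broker_id.partition(":")
    if not sep or not queue_name or not redis_message_id:
        return None
    return queue_name, redis_message_id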
async def record_webhook_event(self, *, job_id: uuid.UUID, event: TaskWebhookEvent) -> Job:
"""Apply a webhook event to the job and store a job event record."""
job = await self._job_repo.get_by_id(job_id)
if job is None:
raise ValueError(f"Job {job_id} not found")
if job.status in (JOB_STATUS_DONE, JOB_STATUS_FAILED, JOB_STATUS_CANCELLED):
logger.info("Ignoring webhook for terminal job %s (status=%s)", job_id, job.status)
return job
job_update = JobUpdate(
status=event.status,
project_pct=event.progress_pct,
@@ -906,24 +1183,23 @@ class TaskService:
)
# Save artifacts BEFORE sending notifications so data exists when frontend refetches
if (
job.job_type == JOB_TYPE_TRANSCRIPTION_GENERATE
and event.status == JOB_STATUS_DONE
):
if job.job_type == JOB_TYPE_TRANSCRIPTION_GENERATE and event.status == JOB_STATUS_DONE:
try:
await self._save_transcription_artifacts(job)
except Exception:
logger.exception("Failed to save transcription artifacts for job %s", job_id)
if job.job_type == JOB_TYPE_MEDIA_CONVERT and event.status == JOB_STATUS_DONE:
try:
await self._save_convert_artifacts(job)
except Exception:
logger.exception("Failed to save convert artifacts for job %s", job_id)
if job.job_type == JOB_TYPE_CAPTIONS_GENERATE and event.status == JOB_STATUS_DONE:
try:
await self._save_captions_artifacts(job)
except Exception:
logger.exception("Failed to save captions artifacts for job %s", job_id)
# Push real-time notification via WebSocket (after artifacts are persisted)
if job.user_id is not None:
@@ -937,6 +1213,50 @@ class TaskService:
return job
async def cancel_job(self, job: Job) -> Job:
"""Cancel a job, clean queued transport state and ignore late webhooks."""
if job.status in (JOB_STATUS_DONE, JOB_STATUS_FAILED, JOB_STATUS_CANCELLED):
return job
try:
await self._cancel_dramatiq_message(job.broker_id)
except Exception:
logger.exception("Failed to cancel Dramatiq message for job %s", job.id)
try:
await self._cancel_caption_render(job)
except Exception:
logger.exception("Failed to cancel caption render for job %s", job.id)
finished_at = _utc_now()
job = await self._job_repo.update(
job,
JobUpdate(
status=JOB_STATUS_CANCELLED,
current_message=MESSAGE_CANCELLED,
finished_at=finished_at,
),
)
await self._event_repo.create(
JobEventCreate(
job_id=job.id,
event_type=f"{EVENT_TYPE_STATUS_PREFIX}{JOB_STATUS_CANCELLED}",
payload={
"status": JOB_STATUS_CANCELLED,
"current_message": MESSAGE_CANCELLED,
"finished_at": finished_at.isoformat(),
},
)
)
try:
await self._create_cancellation_notification(job)
except Exception:
logger.exception("Failed to create cancellation notification for job %s", job.id)
return job
async def _save_transcription_artifacts(self, job: Job) -> None:
"""Create Transcription, ArtifactMediaFile and File records."""
input_data = job.input_data or {}
@@ -980,9 +1300,9 @@ class TaskService:
user_folder = get_user_folder(user)
json_bytes = json.dumps(document, ensure_ascii=False).encode("utf-8")
# Build display name: "Транскрипция <video_name>.json"
# Build display name: "Транскрибация <video_name>.json"
video_stem = Path(source_file.original_filename).stem
transcription_filename = f"Транскрипция {video_stem}.json"
transcription_filename = f"Транскрибация {video_stem}.json"
artifact_key = await storage.upload_fileobj(
fileobj=io.BytesIO(json_bytes),
@@ -1061,7 +1381,7 @@ class TaskService:
stem = Path(source_file.original_filename).stem
else:
stem = Path(file_key).stem
converted_filename = f"{stem}.mp4"
converted_filename = f"Конвертированое видео {stem}.mp4"
# Create File record for the converted MP4 (no project_id — only reachable via artifact)
converted_file = await file_repo.create(
@@ -1091,6 +1411,73 @@ class TaskService:
logger.info("Saved convert artifacts for job %s", job.id)
async def _save_captions_artifacts(self, job: Job) -> None:
"""Create File and ArtifactMediaFile records for captioned video."""
input_data = job.input_data or {}
output_data = job.output_data or {}
video_s3_path: str = input_data["video_s3_path"]
project_id: uuid.UUID | None = (
uuid.UUID(input_data["project_id"]) if input_data.get("project_id") else None
)
output_path: str = output_data["output_path"]
# Resolve user
user_repo = UserRepository(self._session)
user = await user_repo.get_by_id(job.user_id) # type: ignore[arg-type]
if user is None:
logger.warning("User %s not found, skipping captions artifact save", job.user_id)
return
# Get file size from S3
storage = _get_storage_service()
file_size = await storage.size(output_path)
# Derive output filename from source video
file_repo = FileRepository(self._session)
source_file = await file_repo.get_by_path(video_s3_path)
if source_file is not None:
stem = Path(source_file.original_filename).stem
else:
stem = Path(video_s3_path).stem
captioned_filename = f"Видео с субтитрами {stem}.mp4"
# Create File record
captioned_file = await file_repo.create(
requester=user,
data=FileCreate(
project_id=project_id,
original_filename=captioned_filename,
path=output_path,
storage_backend="S3",
mime_type="video/mp4",
size_bytes=file_size,
file_format="mp4",
is_uploaded=True,
),
)
# Create ArtifactMediaFile record
artifact_repo = ArtifactRepository(self._session)
await artifact_repo.create(
data=ArtifactMediaFileCreate(
project_id=project_id,
file_id=captioned_file.id,
media_file_id=None,
artifact_type="RENDERED_VIDEO",
),
)
# Update job output_data with file_id so frontend can reference it
updated_output = dict(output_data)
updated_output["file_id"] = str(captioned_file.id)
job = await self._job_repo.update(
job, JobUpdate(output_data=updated_output)
)
logger.info("Saved captions artifacts for job %s (file_id=%s)", job.id, captioned_file.id)
async def submit_media_probe(
self, *, requester: User, request: MediaProbeRequest
) -> TaskSubmitResponse:
@@ -1238,6 +1625,22 @@ class TaskService:
self, *, requester: User, request: CaptionsGenerateRequest
) -> TaskSubmitResponse:
"""Submit captions generation task."""
from cpv3.modules.captions.service import CaptionPresetService
input_data = request.model_dump(mode="json")
existing_job = await self._find_duplicate_active_job(
requester=requester,
job_type=JOB_TYPE_CAPTIONS_GENERATE,
project_id=request.project_id,
input_data=input_data,
)
if existing_job is not None:
return TaskSubmitResponse(
job_id=existing_job.id,
webhook_url=_build_webhook_url(existing_job.id),
status=existing_job.status,
)
transcription_repo = TranscriptionRepository(self._session)
transcription = await transcription_repo.get_by_id(request.transcription_id)
if transcription is None:
@@ -1245,20 +1648,26 @@ class TaskService:
user_folder = get_user_folder(requester)
resolved_folder = (
f"{user_folder}/{request.folder}"
if request.folder
else f"{user_folder}/output_files"
f"{user_folder}/{request.folder}" if request.folder else f"{user_folder}/output_files"
)
# Resolve style config from preset or inline override
preset_service = CaptionPresetService(self._session)
style_config = await preset_service.resolve_style_config(
preset_id=request.preset_id,
inline_config=request.style_config,
)
return await self._submit_task(
requester=requester,
job_type=JOB_TYPE_CAPTIONS_GENERATE,
project_id=request.project_id,
input_data=request.model_dump(mode="json"),
input_data=input_data,
actor=captions_generate_actor,
actor_kwargs={
"video_s3_path": request.video_s3_path,
"folder": resolved_folder,
"transcription_json": transcription.document,
"style_config": style_config,
},
)
+1 -1
View File
@@ -72,7 +72,7 @@ services:
REDIS_URL: redis://redis:6379/0
WEBHOOK_BASE_URL: http://api:8000
REMOTION_SERVICE_URL: ${REMOTION_SERVICE_URL:-http://remotion:3001}
ports:
- "8000:8000"
command: >
+17
View File
@@ -0,0 +1,17 @@
The user wants to refactor a backend project following DRY and KISS, keeping it developer-readable without hidden abstractions.
The user explicitly rejected a plan that did "in-place cleanup" of `cpv3/modules/tasks/service.py` and instead wants a different approach.
Currently, `tasks/service.py` is a 1600+ line monolith that:
1. Defines Dramatiq actors
2. Orchestrates job creation and duplicate-job checks
3. Has `submit_X` methods (e.g. `submit_media_probe`, `submit_transcription_generate`)
4. Has a giant `record_webhook_event` that updates the `Job` state
5. Contains domain-specific artifact saving (e.g. `_save_transcription_artifacts`, `_save_convert_artifacts`)
6. Handles cancellation logic.
Constraints:
1. Must use the exact module structure: `models.py`, `schemas.py`, `repository.py`, `service.py`, `router.py` (no subdirectories within modules).
2. Explicit, local helpers over hidden abstractions (no generic handler registries).
What is the best architectural approach to distribute these responsibilities across `jobs`, `tasks`, and domain modules (`media`, `transcription`, `captions`)?
Provide a concrete file-by-file blueprint for where each responsibility belongs and what the cross-module calls would look like, avoiding circular dependencies.
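One layout that would satisfy these constraints, sketched for orientation only (not codex's answer, and not what this commit implements):
cpv3/modules/jobs: Job model, repository, generic status transitions (cancel_job, record_webhook_event), no domain knowledge
cpv3/modules/tasks: Dramatiq actors and submit_* orchestration, a thin layer calling into jobs and the domain modules
cpv3/modules/media: _save_convert_artifacts next to its own schemas and repository
cpv3/modules/transcription: _save_transcription_artifacts
cpv3/modules/captions: _save_captions_artifacts and Remotion render cancellation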
+155
View File
@@ -0,0 +1,155 @@
from __future__ import annotations
import asyncio
import uuid
from types import SimpleNamespace
from cpv3.modules.captions import service as captions_service
from cpv3.modules.tasks import service as task_service
def _make_document_json() -> dict:
return {
"segments": [
{
"text": "Привет",
"semantic_tags": [],
"structure_tags": [],
"time": {"start": 0.0, "end": 1.0},
"lines": [
{
"text": "Привет",
"semantic_tags": [],
"structure_tags": [],
"time": {"start": 0.0, "end": 1.0},
"words": [
{
"text": "Привет",
"semantic_tags": [],
"structure_tags": [],
"time": {"start": 0.0, "end": 1.0},
}
],
}
],
}
]
}
class _FakeResponse:
def __init__(self, payload: dict) -> None:
self._payload = payload
def raise_for_status(self) -> None:
return None
def json(self) -> dict:
return self._payload
def test_captions_generate_actor_sends_fallback_done_when_callback_not_confirmed(
monkeypatch,
) -> None:
sent_events: list[task_service.TaskWebhookEvent] = []
async def fake_generate_captions(**_: object) -> str:
return "render-123"
monkeypatch.setattr(captions_service, "generate_captions", fake_generate_captions)
monkeypatch.setattr(task_service, "_run_async", asyncio.run)
monkeypatch.setattr(task_service, "_raise_if_job_cancelled", lambda _job_id: None)
monkeypatch.setattr(
task_service,
"_send_webhook_event",
lambda _url, event: sent_events.append(event),
)
monkeypatch.setattr(task_service.time, "sleep", lambda _seconds: None)
monkeypatch.setattr(
task_service,
"get_settings",
lambda: SimpleNamespace(remotion_service_url="http://remotion.test"),
)
monkeypatch.setattr(
task_service.httpx,
"get",
lambda *_args, **_kwargs: _FakeResponse(
{
"status": "done",
"output_path": "projects/1/captioned/video.mp4",
"callback_delivered": False,
}
),
)
task_service.captions_generate_actor.fn(
job_id=str(uuid.uuid4()),
webhook_url="http://backend.test/api/tasks/webhook/job-1/",
video_s3_path="uploads/source.mp4",
folder="projects/1",
transcription_json=_make_document_json(),
style_config=None,
)
done_events = [
event for event in sent_events if event.status == task_service.JOB_STATUS_DONE
]
assert len(done_events) == 1
assert done_events[0].progress_pct == task_service.PROGRESS_COMPLETE
assert done_events[0].current_message == "Готово"
assert done_events[0].output_data == {
"output_path": "projects/1/captioned/video.mp4"
}
def test_captions_generate_actor_skips_fallback_when_done_webhook_confirmed(
monkeypatch,
) -> None:
sent_events: list[task_service.TaskWebhookEvent] = []
async def fake_generate_captions(**_: object) -> str:
return "render-123"
monkeypatch.setattr(captions_service, "generate_captions", fake_generate_captions)
monkeypatch.setattr(task_service, "_run_async", asyncio.run)
monkeypatch.setattr(task_service, "_raise_if_job_cancelled", lambda _job_id: None)
monkeypatch.setattr(
task_service,
"_send_webhook_event",
lambda _url, event: sent_events.append(event),
)
monkeypatch.setattr(task_service.time, "sleep", lambda _seconds: None)
monkeypatch.setattr(
task_service,
"get_settings",
lambda: SimpleNamespace(remotion_service_url="http://remotion.test"),
)
monkeypatch.setattr(
task_service.httpx,
"get",
lambda *_args, **_kwargs: _FakeResponse(
{
"status": "done",
"output_path": "projects/1/captioned/video.mp4",
"callback_delivered": True,
}
),
)
task_service.captions_generate_actor.fn(
job_id=str(uuid.uuid4()),
webhook_url="http://backend.test/api/tasks/webhook/job-1/",
video_s3_path="uploads/source.mp4",
folder="projects/1",
transcription_json=_make_document_json(),
style_config=None,
)
done_events = [
event for event in sent_events if event.status == task_service.JOB_STATUS_DONE
]
assert done_events == []
+108
View File
@@ -0,0 +1,108 @@
from __future__ import annotations
import uuid
from types import SimpleNamespace
from unittest.mock import AsyncMock
import pytest
from cpv3.modules.tasks.schemas import CaptionsGenerateRequest, TaskWebhookEvent
from cpv3.modules.tasks.service import TaskService
@pytest.mark.asyncio
async def test_submit_captions_generate_reuses_existing_active_job() -> None:
service = TaskService(session=AsyncMock())
existing_job_id = uuid.uuid4()
existing_job = SimpleNamespace(
id=existing_job_id,
status="RUNNING",
)
service._find_duplicate_active_job = AsyncMock(return_value=existing_job)
service._submit_task = AsyncMock()
response = await service.submit_captions_generate(
requester=SimpleNamespace(id=uuid.uuid4()),
request=CaptionsGenerateRequest(
video_s3_path="projects/test/video.mp4",
folder="output_files",
transcription_id=uuid.uuid4(),
project_id=uuid.uuid4(),
preset_id=uuid.uuid4(),
),
)
assert response.job_id == existing_job_id
assert response.status == "RUNNING"
assert response.webhook_url.endswith(f"/api/tasks/webhook/{existing_job_id}/")
service._submit_task.assert_not_awaited()
@pytest.mark.asyncio
async def test_record_webhook_event_ignores_cancelled_job() -> None:
cancelled_job = SimpleNamespace(
id=uuid.uuid4(),
status="CANCELLED",
)
job_repo = SimpleNamespace(
get_by_id=AsyncMock(return_value=cancelled_job),
update=AsyncMock(),
)
event_repo = SimpleNamespace(create=AsyncMock())
service = TaskService(session=AsyncMock())
service._job_repo = job_repo
service._event_repo = event_repo
result = await service.record_webhook_event(
job_id=cancelled_job.id,
event=TaskWebhookEvent(
status="DONE",
current_message="Готово",
output_data={"output_path": "projects/test/output.mp4"},
),
)
assert result is cancelled_job
job_repo.update.assert_not_awaited()
event_repo.create.assert_not_awaited()
@pytest.mark.asyncio
async def test_cancel_job_marks_job_cancelled_and_keeps_record() -> None:
job_id = uuid.uuid4()
user_id = uuid.uuid4()
job = SimpleNamespace(
id=job_id,
status="PENDING",
broker_id="default:redis-message-id",
job_type="CAPTIONS_GENERATE",
user_id=user_id,
)
cancelled_job = SimpleNamespace(
id=job_id,
status="CANCELLED",
broker_id="default:redis-message-id",
job_type="CAPTIONS_GENERATE",
user_id=user_id,
current_message="Отменено пользователем",
)
service = TaskService(session=AsyncMock())
service._job_repo = SimpleNamespace(update=AsyncMock(return_value=cancelled_job))
service._event_repo = SimpleNamespace(create=AsyncMock())
service._cancel_dramatiq_message = AsyncMock()
service._cancel_caption_render = AsyncMock()
service._create_cancellation_notification = AsyncMock()
result = await service.cancel_job(job)
assert result is cancelled_job
service._job_repo.update.assert_awaited_once()
service._event_repo.create.assert_awaited_once()
service._cancel_dramatiq_message.assert_awaited_once_with(job.broker_id)
service._cancel_caption_render.assert_awaited_once_with(job)
service._create_cancellation_notification.assert_awaited_once_with(
cancelled_job
)