- Remove contradictory tasks/artifacts.py (artifacts go to owning modules) - Distinguish simple (5) vs complex (3) actors for _run_actor() pattern - Make apply_partial_update sync (no I/O) - Remove soft_delete helper (one-liner, confusing with dual patterns) - Fix false dead code claim about media/service.py json import - Add all project_pct → progress_pct reference locations - Add broker import order risk and worker command change - Add _run_async() and _parse_frame_rate() placement Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
18 KiB
Backend Comprehensive Refactor — Design Spec
Date: 2026-03-15
Scope: cofee_backend/ — all modules, common layer, infrastructure layer
Approach: Top-down (decompose tasks/ first, then extract shared helpers, then apply across modules)
Problem Summary
The backend has grown feature-by-feature and accumulated structural debt:
tasks/service.pyis 1673 lines mixing four concerns: broker infrastructure, actor lifecycle, artifact persistence, and task submission orchestration. Eight actors repeat an identical ~40-line lifecycle skeleton.- CRUD modules repeat the same patterns: repository update loops (
model_dump+ commit + refresh), router permission checks (fetch-or-404, owner-or-staff, raise 403), inline error strings. - Auth/ownership is inconsistent — some modules enforce ownership, others don't.
- Layering is inconsistent — some routers bypass the service layer and call repositories directly.
- Error messages are inline strings instead of
ERROR_constants. - Dead code, naming issues, and minor bugs exist across several modules.
Guiding Principles
- DRY: Extract repeated patterns into small free functions, not base classes
- KISS: Helpers are plain functions — no magic, no decorators, no hidden behavior
- Developer-readable: Every call site shows what it does; no hidden abstractions
- Preserve structure: Keep the existing module pattern; allow extra files only when a single
service.pywould exceed ~300 lines - Shared helpers in
common/: Application-level helpers (DB update, HTTP permission checks) - Infrastructure for infrastructure: Only broker/transport concerns go in
infrastructure/
Phase 1: Decompose tasks/service.py
1.1 New file layout
cpv3/modules/tasks/
├── __init__.py (unchanged)
├── schemas.py (unchanged)
├── router.py (unchanged)
├── service.py TaskService class only: submit_* methods + record_webhook_event + cancel_job
├── actors.py NEW: all @dramatiq.actor functions, _run_actor() helper, _run_async(), _parse_frame_rate()
├── webhooks.py NEW: webhook transport (_send_webhook_event, _build_webhook_url, event helpers)
└── constants.py NEW: ERROR_, MESSAGE_, STATUS_ constants, JobCancelledError, all in Russian
Note: tasks/artifacts.py is NOT created. Artifact persistence moves directly to the owning modules (see Section 1.3).
New infrastructure file:
cpv3/infrastructure/dramatiq.py
- RedisBroker setup and initialization
- _serialize_broker_reference() / _parse_broker_reference()
- _get_job_status_sync() (raw psycopg2 cancellation check)
- _raise_if_job_cancelled()
- _run_async() (sync wrapper for running async code in worker context)
Important: infrastructure/dramatiq.py MUST be imported before any actor module to ensure the broker is set. Add import cpv3.infrastructure.dramatiq at the top of tasks/actors.py. The Dramatiq worker command changes from uv run dramatiq cpv3.modules.tasks.service to uv run dramatiq cpv3.modules.tasks.actors. Update CLAUDE.md, docker-compose.yml, and any deployment configs accordingly.
1.2 The _run_actor() pattern
Two variants: a simple lifecycle helper for straightforward actors, and manual lifecycle management for complex actors that need intermediate progress or polling.
Simple actors (5 of 8): _run_actor()
Used by: media_probe_actor, silence_remove_actor, silence_detect_actor, silence_apply_actor, media_convert_actor
def _run_actor(
job_id: str,
webhook_url: str,
work_fn: Callable[..., dict[str, Any]],
*,
starting_message: str = MESSAGE_STARTING,
**work_kwargs: Any,
) -> None:
"""Shared actor lifecycle: cancel-check → RUNNING → work → DONE/FAILED.
work_fn is synchronous. For async work, use _run_async() inside work_fn.
work_fn receives job_uuid (uuid.UUID) and webhook_url (str) as kwargs
plus any additional **work_kwargs.
"""
job_uuid = uuid.UUID(job_id)
try:
_raise_if_job_cancelled(job_uuid)
except JobCancelledError:
logger.info("Actor cancelled before start: %s", job_uuid)
return
_send_webhook_event(webhook_url, TaskWebhookEvent(
status=JOB_STATUS_RUNNING,
current_message=starting_message,
started_at=_utc_now(),
))
try:
result = work_fn(job_uuid=job_uuid, webhook_url=webhook_url, **work_kwargs)
_send_webhook_event(webhook_url, TaskWebhookEvent(
status=JOB_STATUS_DONE,
output=result,
finished_at=_utc_now(),
))
except JobCancelledError:
logger.info("Actor cancelled mid-run: %s", job_uuid)
except Exception as exc:
logger.exception("Actor failed: %s", job_uuid)
_send_webhook_event(webhook_url, TaskWebhookEvent(
status=JOB_STATUS_FAILED,
error_message=str(exc),
finished_at=_utc_now(),
))
Each simple actor becomes ~5-15 lines defining only its unique work function and parameters.
Complex actors (3 of 8): manual lifecycle with shared helpers
transcription_generate_actor, captions_generate_actor, and frame_extract_actor do NOT use _run_actor() because they need:
- Intermediate progress webhooks from inside the work function
- Polling loops with
time.sleep()and repeated cancellation checks (captions_generate) - Progress callbacks from external tools (
_on_whisper_progressin transcription) - Multi-phase status updates (e.g., "deleting old frames → extracting → uploading" in frame_extract)
These actors keep their own lifecycle management but use the shared helpers from tasks/webhooks.py (_send_webhook_event) and infrastructure/dramatiq.py (_raise_if_job_cancelled). They still benefit from the decomposition because constants, webhook transport, and broker infrastructure are no longer duplicated — just the lifecycle wrapper is actor-specific.
Approximate line reduction: ~40-60 lines each (down from ~100-130) due to extracted constants, imports, and webhook helpers.
1.3 Artifact persistence → owning modules
Move artifact-saving logic to the modules that own those domain concepts:
_save_transcription_artifacts()→transcription/service.pyasTranscriptionService.save_artifacts()_save_convert_artifacts()→media/service.pyasMediaService.save_artifacts()_save_captions_artifacts()→captions/service.pyasCaptionService.save_artifacts()
TaskService.record_webhook_event() dispatches to these module-owned methods.
1.4 What stays in tasks/service.py
Only the TaskService class (~300-400 lines):
__init__()with session + job reposubmit_media_probe(),submit_silence_detect(),submit_silence_remove(),submit_convert(),submit_frame_extract(),submit_transcription_generate(),submit_captions_generate()record_webhook_event()— orchestration only, delegates artifact savingcancel_job()
1.5 Constants language
All user-facing constants in tasks/constants.py are in Russian. The current mix of English and Russian (tasks/service.py:93-103) is unified to Russian.
Phase 2: Shared Helpers in common/
2.1 common/db.py — Repository helpers
Two free functions:
def apply_partial_update(instance: object, data: BaseModel) -> None:
"""Apply non-None fields from a Pydantic schema to a SQLAlchemy model.
Synchronous — no I/O, just attribute setting."""
async def commit_and_refresh(session: AsyncSession, instance: object) -> None:
"""Commit the session and refresh the instance."""
Note: apply_partial_update is deliberately synchronous — it only does model_dump + setattr, no I/O. soft_delete is NOT extracted as a shared helper — it's a one-liner (setattr + commit) and the is_active vs is_deleted split makes a generic helper more confusing than helpful. Each repository keeps its own delete method with a comment noting which soft-delete pattern it uses.
Documentation comment in common/db.py explains the two soft-delete patterns:
# Soft-delete patterns in the codebase:
# - is_active=False: users, projects, jobs, webhooks, transcription, captions
# - is_deleted=True: files, media (MediaFile, ArtifactMediaFile)
# NOT unified in this refactor — requires data migration.
# Each repository handles its own soft-delete; see the module's repository.py.
2.2 common/http.py — Router helpers
Two free functions + two error constants:
ERROR_NOT_FOUND = "Объект не найден"
ERROR_FORBIDDEN = "Доступ запрещён"
def ensure_found(instance: object | None, detail: str = ERROR_NOT_FOUND) -> None:
"""Raise 404 if instance is None."""
def ensure_owner_or_staff(
instance: object,
user: User,
*,
owner_field: str = "owner_id",
) -> None:
"""Raise 403 if user is not staff and doesn't own the resource."""
Phase 3: Apply Shared Helpers Across All CRUD Modules
3.1 Repository updates (7 modules)
Replace model_dump loop + commit + refresh with apply_partial_update() + commit_and_refresh():
files/repository.pymedia/repository.py(×2: MediaFile + Artifact)projects/repository.pyjobs/repository.py(×2: Job + JobEvent)webhooks/repository.pytranscription/repository.pycaptions/repository.py(special case:style_confignested serialization stays explicit)
3.2 Router permission checks (7 modules)
Replace inline 404/403 checks with ensure_found() + ensure_owner_or_staff():
users/router.py(3 sites)projects/router.py(3 sites)files/router.py(3 sites + storage path check)media/router.py(6 sites: mediafiles + artifacts)jobs/router.py(6 sites: jobs + events)webhooks/router.py(3 sites)captions/router.py(2 sites)
3.3 Add missing permission checks
While applying helpers, add ownership checks where currently absent:
| Module | Current state | Fix |
|---|---|---|
transcription/router.py |
No ownership checks | Add ensure_owner_or_staff() via parent project |
media/router.py artifacts |
_ = current_user (ignores user) |
Add ownership check via parent media file |
captions/router.py get-by-id |
Returns any preset by ID | Only return if system preset or owned by requester |
jobs/router.py events |
No user scoping on events | Add ownership check via parent job |
Phase 4: Error Handling, Naming, Dead Code, Security
4.1 Error message constants
Extract inline strings into ERROR_ constants at the top of each file:
infrastructure/auth.py→ERROR_TOKEN_EXPIRED,ERROR_INVALID_TOKEN,ERROR_USER_NOT_FOUNDusers/service.py→ERROR_EMAIL_ALREADY_EXISTS,ERROR_INVALID_CREDENTIALSusers/router.py→ERROR_PASSWORD_MISMATCH,ERROR_INCORRECT_PASSWORDfiles/router.py→ERROR_FILE_TOO_LARGE,ERROR_EMPTY_FILEmedia/service.py→ERROR_PROBE_FAILED,ERROR_NO_VIDEO_STREAM,ERROR_FFMPEG_FAILED
4.2 Naming fix
Rename project_pct → progress_pct in:
jobs/models.pyjobs/schemas.py(3 occurrences)tasks/service.py(references inrecord_webhook_eventand submit methods)tasks/router.py(maps job field to response)notifications/service.py(reads progress for notification display)tests/integration/test_jobs_endpoints.py(test data and assertions)- Alembic migration for the DB column rename
- Check frontend for field dependency before applying
4.3 Dead code removal
| Dead code | Location |
|---|---|
| Legacy service function exports | users/service.py:60-105, projects/service.py:46-69 |
WordOptions.max_line_count field |
transcription/schemas.py — field defined but never read in any code path. Check frontend API types before removing to confirm it's not sent by the client. If frontend sends it, keep the field but add a # TODO: implement comment. |
filename_without_ext dead assignment |
media/service.py:434 — the variable is assigned but never used at this line (earlier assignments at ~396 are valid) |
Correction: The top-level import json in media/service.py:7 is NOT dead — json.loads() and json.dumps() are used at multiple points. Do not remove it.
4.4 Notification completeness
Add missing entries to notifications/service.py:
JOB_TYPE_LABELS: addFRAME_EXTRACT(Russian label)STATUS_TITLES: addCANCELLED(Russian label)- All notification messages must be in Russian
4.5 Duplicate WebSocket auth
Extract decode_user_id_from_token(token: str) -> uuid.UUID in infrastructure/auth.py.
Both get_current_user() and notifications/router.py WebSocket handler reuse it.
WebSocket handler keeps its own session management.
4.6 Settings fix
Change remotion_service_url default from http://localhost:8001 to http://localhost:3001 in infrastructure/settings.py.
4.7 Path traversal fix
Add root boundary check in files/router.py local file serving:
full_path = (settings.local_storage_dir / file_path).resolve()
root = Path(settings.local_storage_dir).resolve()
if not str(full_path).startswith(str(root)):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_FORBIDDEN)
4.8 Mutable default fix
Change media/schemas.py:138 from streams: list[StreamSchema] = [] to streams: list[StreamSchema] = Field(default_factory=list).
Phase 5: Consistent Layering and Tests
5.1 Fix Router → Service → Repository layering
| Module | Current state | Fix |
|---|---|---|
media/router.py |
CRUD calls MediaFileRepository directly |
Route through MediaService — add thin CRUD wrappers |
transcription/router.py |
All routes call TranscriptionRepository directly |
Route through TranscriptionService — add thin CRUD wrappers |
notifications/router.py |
Uses NotificationRepository directly |
Route through NotificationService — add list/get/mark_read |
Service methods are pass-through calls to the repository. No new business logic invented. Value is consistent architecture.
5.2 Test additions
| Test | Reason |
|---|---|
Unit test _run_actor() lifecycle (cancel/success/failure) |
Core new abstraction |
Unit test ensure_found() / ensure_owner_or_staff() |
Shared helpers |
Unit test apply_partial_update() / commit_and_refresh() |
Shared helpers |
| Integration tests for new ownership checks (transcription, media artifacts, caption presets, job events) | New permission enforcement |
| Test path traversal prevention on local file route | Security fix |
No full test rewrites. No new integration test infrastructure.
5.3 Update CLAUDE.md
Reflect the new architecture:
- Module file rule updated: extra files allowed when
service.pywould exceed ~300 lines - Document
common/db.pyandcommon/http.py - Document
infrastructure/dramatiq.py - Document
ensure_found/ensure_owner_or_staffas the standard router permission pattern - Document
apply_partial_update/commit_and_refreshas the standard repository update pattern
Files Changed Summary
New files
| File | Purpose |
|---|---|
cpv3/common/db.py |
Repository helpers: apply_partial_update, commit_and_refresh |
cpv3/common/http.py |
Router helpers: ensure_found, ensure_owner_or_staff, ERROR_ constants |
cpv3/infrastructure/dramatiq.py |
RedisBroker setup, broker-id serialization, cancellation check, _run_async() |
cpv3/modules/tasks/actors.py |
All @dramatiq.actor functions + _run_actor() lifecycle helper + _parse_frame_rate() |
cpv3/modules/tasks/webhooks.py |
Webhook transport helpers |
cpv3/modules/tasks/constants.py |
All ERROR_, MESSAGE_, STATUS_ constants (Russian) |
Modified files (by phase)
Phase 1 — tasks decomposition:
cpv3/modules/tasks/service.py— stripped to TaskService class onlycpv3/modules/transcription/service.py— receivessave_artifacts()cpv3/modules/media/service.py— receivessave_artifacts()cpv3/modules/captions/service.py— receivessave_artifacts()
Phase 3 — apply shared helpers:
- All 7 module
repository.pyfiles — useapply_partial_update()+commit_and_refresh() - All 7 module
router.pyfiles — useensure_found()+ensure_owner_or_staff()
Phase 4 — error handling, cleanup:
cpv3/infrastructure/auth.py— error constants + shared token decodercpv3/infrastructure/settings.py— fix remotion port defaultcpv3/modules/users/service.py— error constants + remove dead exportscpv3/modules/users/router.py— error constantscpv3/modules/projects/service.py— remove dead exportscpv3/modules/files/router.py— path traversal fix + error constantscpv3/modules/media/service.py— error constants + dead code removalcpv3/modules/media/schemas.py— mutable default fixcpv3/modules/transcription/schemas.py— remove deadmax_line_countcpv3/modules/notifications/service.py— complete label maps (Russian)cpv3/modules/notifications/router.py— use shared token decodercpv3/modules/jobs/models.py— renameproject_pct→progress_pctcpv3/modules/jobs/schemas.py— renameproject_pct→progress_pct
Phase 5 — layering + tests:
cpv3/modules/media/router.py— route through servicecpv3/modules/transcription/router.py— route through servicecpv3/modules/notifications/router.py— route through servicecofee_backend/CLAUDE.md— updated architecture docs- New test files for helpers and permission checks
Alembic migration
One migration for project_pct → progress_pct column rename in jobs table.
Risks and Mitigations
| Risk | Mitigation |
|---|---|
| Actor refactor breaks background processing | Test _run_actor() lifecycle thoroughly before deploying. Keep old actors available on a branch for rollback. |
| Dramatiq broker import order | infrastructure/dramatiq.py must be imported before any actor module. Enforce via explicit import at top of tasks/actors.py. Worker command changes to uv run dramatiq cpv3.modules.tasks.actors. |
progress_pct rename breaks frontend |
Check frontend API types for project_pct usage before applying migration. Coordinate with frontend update. |
| Missing auth checks break existing workflows | New permission checks only restrict access that was unintentionally open. Test with real user flows. |
| Shared helpers become a dumping ground | Keep common/db.py and common/http.py minimal. New helpers need clear justification. |