Files
main_backend/docs/superpowers/specs/2026-03-15-backend-comprehensive-refactor-design.md
Daniil 4b90925c2a docs: fix spec issues from review
- Remove contradictory tasks/artifacts.py (artifacts go to owning modules)
- Distinguish simple (5) vs complex (3) actors for _run_actor() pattern
- Make apply_partial_update sync (no I/O)
- Remove soft_delete helper (one-liner, confusing with dual patterns)
- Fix false dead code claim about media/service.py json import
- Add all project_pct → progress_pct reference locations
- Add broker import order risk and worker command change
- Add _run_async() and _parse_frame_rate() placement

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 23:13:25 +03:00

390 lines
18 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Backend Comprehensive Refactor — Design Spec
**Date:** 2026-03-15
**Scope:** `cofee_backend/` — all modules, common layer, infrastructure layer
**Approach:** Top-down (decompose tasks/ first, then extract shared helpers, then apply across modules)
## Problem Summary
The backend has grown feature-by-feature and accumulated structural debt:
1. **`tasks/service.py`** is 1673 lines mixing four concerns: broker infrastructure, actor lifecycle, artifact persistence, and task submission orchestration. Eight actors repeat an identical ~40-line lifecycle skeleton.
2. **CRUD modules** repeat the same patterns: repository update loops (`model_dump` + commit + refresh), router permission checks (fetch-or-404, owner-or-staff, raise 403), inline error strings.
3. **Auth/ownership** is inconsistent — some modules enforce ownership, others don't.
4. **Layering** is inconsistent — some routers bypass the service layer and call repositories directly.
5. **Error messages** are inline strings instead of `ERROR_` constants.
6. **Dead code**, naming issues, and minor bugs exist across several modules.
## Guiding Principles
- **DRY:** Extract repeated patterns into small free functions, not base classes
- **KISS:** Helpers are plain functions — no magic, no decorators, no hidden behavior
- **Developer-readable:** Every call site shows what it does; no hidden abstractions
- **Preserve structure:** Keep the existing module pattern; allow extra files only when a single `service.py` would exceed ~300 lines
- **Shared helpers in `common/`:** Application-level helpers (DB update, HTTP permission checks)
- **Infrastructure for infrastructure:** Only broker/transport concerns go in `infrastructure/`
## Phase 1: Decompose `tasks/service.py`
### 1.1 New file layout
```
cpv3/modules/tasks/
├── __init__.py (unchanged)
├── schemas.py (unchanged)
├── router.py (unchanged)
├── service.py TaskService class only: submit_* methods + record_webhook_event + cancel_job
├── actors.py NEW: all @dramatiq.actor functions, _run_actor() helper, _run_async(), _parse_frame_rate()
├── webhooks.py NEW: webhook transport (_send_webhook_event, _build_webhook_url, event helpers)
└── constants.py NEW: ERROR_, MESSAGE_, STATUS_ constants, JobCancelledError, all in Russian
```
**Note:** `tasks/artifacts.py` is NOT created. Artifact persistence moves directly to the owning modules (see Section 1.3).
New infrastructure file:
```
cpv3/infrastructure/dramatiq.py
- RedisBroker setup and initialization
- _serialize_broker_reference() / _parse_broker_reference()
- _get_job_status_sync() (raw psycopg2 cancellation check)
- _raise_if_job_cancelled()
- _run_async() (sync wrapper for running async code in worker context)
```
**Important:** `infrastructure/dramatiq.py` MUST be imported before any actor module to ensure the broker is set. Add `import cpv3.infrastructure.dramatiq` at the top of `tasks/actors.py`. The Dramatiq worker command changes from `uv run dramatiq cpv3.modules.tasks.service` to `uv run dramatiq cpv3.modules.tasks.actors`. Update `CLAUDE.md`, `docker-compose.yml`, and any deployment configs accordingly.
### 1.2 The `_run_actor()` pattern
Two variants: a simple lifecycle helper for straightforward actors, and manual lifecycle management for complex actors that need intermediate progress or polling.
#### Simple actors (5 of 8): `_run_actor()`
Used by: `media_probe_actor`, `silence_remove_actor`, `silence_detect_actor`, `silence_apply_actor`, `media_convert_actor`
```python
def _run_actor(
job_id: str,
webhook_url: str,
work_fn: Callable[..., dict[str, Any]],
*,
starting_message: str = MESSAGE_STARTING,
**work_kwargs: Any,
) -> None:
"""Shared actor lifecycle: cancel-check → RUNNING → work → DONE/FAILED.
work_fn is synchronous. For async work, use _run_async() inside work_fn.
work_fn receives job_uuid (uuid.UUID) and webhook_url (str) as kwargs
plus any additional **work_kwargs.
"""
job_uuid = uuid.UUID(job_id)
try:
_raise_if_job_cancelled(job_uuid)
except JobCancelledError:
logger.info("Actor cancelled before start: %s", job_uuid)
return
_send_webhook_event(webhook_url, TaskWebhookEvent(
status=JOB_STATUS_RUNNING,
current_message=starting_message,
started_at=_utc_now(),
))
try:
result = work_fn(job_uuid=job_uuid, webhook_url=webhook_url, **work_kwargs)
_send_webhook_event(webhook_url, TaskWebhookEvent(
status=JOB_STATUS_DONE,
output=result,
finished_at=_utc_now(),
))
except JobCancelledError:
logger.info("Actor cancelled mid-run: %s", job_uuid)
except Exception as exc:
logger.exception("Actor failed: %s", job_uuid)
_send_webhook_event(webhook_url, TaskWebhookEvent(
status=JOB_STATUS_FAILED,
error_message=str(exc),
finished_at=_utc_now(),
))
```
Each simple actor becomes ~5-15 lines defining only its unique work function and parameters.
#### Complex actors (3 of 8): manual lifecycle with shared helpers
`transcription_generate_actor`, `captions_generate_actor`, and `frame_extract_actor` do NOT use `_run_actor()` because they need:
- Intermediate progress webhooks from inside the work function
- Polling loops with `time.sleep()` and repeated cancellation checks (`captions_generate`)
- Progress callbacks from external tools (`_on_whisper_progress` in transcription)
- Multi-phase status updates (e.g., "deleting old frames → extracting → uploading" in frame_extract)
These actors keep their own lifecycle management but use the shared helpers from `tasks/webhooks.py` (`_send_webhook_event`) and `infrastructure/dramatiq.py` (`_raise_if_job_cancelled`). They still benefit from the decomposition because constants, webhook transport, and broker infrastructure are no longer duplicated — just the lifecycle wrapper is actor-specific.
Approximate line reduction: ~40-60 lines each (down from ~100-130) due to extracted constants, imports, and webhook helpers.
### 1.3 Artifact persistence → owning modules
Move artifact-saving logic to the modules that own those domain concepts:
- `_save_transcription_artifacts()``transcription/service.py` as `TranscriptionService.save_artifacts()`
- `_save_convert_artifacts()``media/service.py` as `MediaService.save_artifacts()`
- `_save_captions_artifacts()``captions/service.py` as `CaptionService.save_artifacts()`
`TaskService.record_webhook_event()` dispatches to these module-owned methods.
### 1.4 What stays in `tasks/service.py`
Only the `TaskService` class (~300-400 lines):
- `__init__()` with session + job repo
- `submit_media_probe()`, `submit_silence_detect()`, `submit_silence_remove()`, `submit_convert()`, `submit_frame_extract()`, `submit_transcription_generate()`, `submit_captions_generate()`
- `record_webhook_event()` — orchestration only, delegates artifact saving
- `cancel_job()`
### 1.5 Constants language
All user-facing constants in `tasks/constants.py` are in Russian. The current mix of English and Russian (`tasks/service.py:93-103`) is unified to Russian.
## Phase 2: Shared Helpers in `common/`
### 2.1 `common/db.py` — Repository helpers
Two free functions:
```python
def apply_partial_update(instance: object, data: BaseModel) -> None:
"""Apply non-None fields from a Pydantic schema to a SQLAlchemy model.
Synchronous — no I/O, just attribute setting."""
async def commit_and_refresh(session: AsyncSession, instance: object) -> None:
"""Commit the session and refresh the instance."""
```
**Note:** `apply_partial_update` is deliberately synchronous — it only does `model_dump` + `setattr`, no I/O. `soft_delete` is NOT extracted as a shared helper — it's a one-liner (`setattr` + commit) and the `is_active` vs `is_deleted` split makes a generic helper more confusing than helpful. Each repository keeps its own delete method with a comment noting which soft-delete pattern it uses.
Documentation comment in `common/db.py` explains the two soft-delete patterns:
```python
# Soft-delete patterns in the codebase:
# - is_active=False: users, projects, jobs, webhooks, transcription, captions
# - is_deleted=True: files, media (MediaFile, ArtifactMediaFile)
# NOT unified in this refactor — requires data migration.
# Each repository handles its own soft-delete; see the module's repository.py.
```
### 2.2 `common/http.py` — Router helpers
Two free functions + two error constants:
```python
ERROR_NOT_FOUND = "Объект не найден"
ERROR_FORBIDDEN = "Доступ запрещён"
def ensure_found(instance: object | None, detail: str = ERROR_NOT_FOUND) -> None:
"""Raise 404 if instance is None."""
def ensure_owner_or_staff(
instance: object,
user: User,
*,
owner_field: str = "owner_id",
) -> None:
"""Raise 403 if user is not staff and doesn't own the resource."""
```
## Phase 3: Apply Shared Helpers Across All CRUD Modules
### 3.1 Repository updates (7 modules)
Replace `model_dump` loop + commit + refresh with `apply_partial_update()` + `commit_and_refresh()`:
- `files/repository.py`
- `media/repository.py` (×2: MediaFile + Artifact)
- `projects/repository.py`
- `jobs/repository.py` (×2: Job + JobEvent)
- `webhooks/repository.py`
- `transcription/repository.py`
- `captions/repository.py` (special case: `style_config` nested serialization stays explicit)
### 3.2 Router permission checks (7 modules)
Replace inline 404/403 checks with `ensure_found()` + `ensure_owner_or_staff()`:
- `users/router.py` (3 sites)
- `projects/router.py` (3 sites)
- `files/router.py` (3 sites + storage path check)
- `media/router.py` (6 sites: mediafiles + artifacts)
- `jobs/router.py` (6 sites: jobs + events)
- `webhooks/router.py` (3 sites)
- `captions/router.py` (2 sites)
### 3.3 Add missing permission checks
While applying helpers, add ownership checks where currently absent:
| Module | Current state | Fix |
|--------|--------------|-----|
| `transcription/router.py` | No ownership checks | Add `ensure_owner_or_staff()` via parent project |
| `media/router.py` artifacts | `_ = current_user` (ignores user) | Add ownership check via parent media file |
| `captions/router.py` get-by-id | Returns any preset by ID | Only return if system preset or owned by requester |
| `jobs/router.py` events | No user scoping on events | Add ownership check via parent job |
## Phase 4: Error Handling, Naming, Dead Code, Security
### 4.1 Error message constants
Extract inline strings into `ERROR_` constants at the top of each file:
- `infrastructure/auth.py``ERROR_TOKEN_EXPIRED`, `ERROR_INVALID_TOKEN`, `ERROR_USER_NOT_FOUND`
- `users/service.py``ERROR_EMAIL_ALREADY_EXISTS`, `ERROR_INVALID_CREDENTIALS`
- `users/router.py``ERROR_PASSWORD_MISMATCH`, `ERROR_INCORRECT_PASSWORD`
- `files/router.py``ERROR_FILE_TOO_LARGE`, `ERROR_EMPTY_FILE`
- `media/service.py``ERROR_PROBE_FAILED`, `ERROR_NO_VIDEO_STREAM`, `ERROR_FFMPEG_FAILED`
### 4.2 Naming fix
Rename `project_pct``progress_pct` in:
- `jobs/models.py`
- `jobs/schemas.py` (3 occurrences)
- `tasks/service.py` (references in `record_webhook_event` and submit methods)
- `tasks/router.py` (maps job field to response)
- `notifications/service.py` (reads progress for notification display)
- `tests/integration/test_jobs_endpoints.py` (test data and assertions)
- Alembic migration for the DB column rename
- Check frontend for field dependency before applying
### 4.3 Dead code removal
| Dead code | Location |
|-----------|----------|
| Legacy service function exports | `users/service.py:60-105`, `projects/service.py:46-69` |
| `WordOptions.max_line_count` field | `transcription/schemas.py` — field defined but never read in any code path. Check frontend API types before removing to confirm it's not sent by the client. If frontend sends it, keep the field but add a `# TODO: implement` comment. |
| `filename_without_ext` dead assignment | `media/service.py:434` — the variable is assigned but never used at this line (earlier assignments at ~396 are valid) |
**Correction:** The top-level `import json` in `media/service.py:7` is NOT dead — `json.loads()` and `json.dumps()` are used at multiple points. Do not remove it.
### 4.4 Notification completeness
Add missing entries to `notifications/service.py`:
- `JOB_TYPE_LABELS`: add `FRAME_EXTRACT` (Russian label)
- `STATUS_TITLES`: add `CANCELLED` (Russian label)
- All notification messages must be in Russian
### 4.5 Duplicate WebSocket auth
Extract `decode_user_id_from_token(token: str) -> uuid.UUID` in `infrastructure/auth.py`.
Both `get_current_user()` and `notifications/router.py` WebSocket handler reuse it.
WebSocket handler keeps its own session management.
### 4.6 Settings fix
Change `remotion_service_url` default from `http://localhost:8001` to `http://localhost:3001` in `infrastructure/settings.py`.
### 4.7 Path traversal fix
Add root boundary check in `files/router.py` local file serving:
```python
full_path = (settings.local_storage_dir / file_path).resolve()
root = Path(settings.local_storage_dir).resolve()
if not str(full_path).startswith(str(root)):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_FORBIDDEN)
```
### 4.8 Mutable default fix
Change `media/schemas.py:138` from `streams: list[StreamSchema] = []` to `streams: list[StreamSchema] = Field(default_factory=list)`.
## Phase 5: Consistent Layering and Tests
### 5.1 Fix Router → Service → Repository layering
| Module | Current state | Fix |
|--------|--------------|-----|
| `media/router.py` | CRUD calls `MediaFileRepository` directly | Route through `MediaService` — add thin CRUD wrappers |
| `transcription/router.py` | All routes call `TranscriptionRepository` directly | Route through `TranscriptionService` — add thin CRUD wrappers |
| `notifications/router.py` | Uses `NotificationRepository` directly | Route through `NotificationService` — add list/get/mark_read |
Service methods are pass-through calls to the repository. No new business logic invented. Value is consistent architecture.
### 5.2 Test additions
| Test | Reason |
|------|--------|
| Unit test `_run_actor()` lifecycle (cancel/success/failure) | Core new abstraction |
| Unit test `ensure_found()` / `ensure_owner_or_staff()` | Shared helpers |
| Unit test `apply_partial_update()` / `commit_and_refresh()` | Shared helpers |
| Integration tests for new ownership checks (transcription, media artifacts, caption presets, job events) | New permission enforcement |
| Test path traversal prevention on local file route | Security fix |
No full test rewrites. No new integration test infrastructure.
### 5.3 Update `CLAUDE.md`
Reflect the new architecture:
- Module file rule updated: extra files allowed when `service.py` would exceed ~300 lines
- Document `common/db.py` and `common/http.py`
- Document `infrastructure/dramatiq.py`
- Document `ensure_found` / `ensure_owner_or_staff` as the standard router permission pattern
- Document `apply_partial_update` / `commit_and_refresh` as the standard repository update pattern
## Files Changed Summary
### New files
| File | Purpose |
|------|---------|
| `cpv3/common/db.py` | Repository helpers: apply_partial_update, commit_and_refresh |
| `cpv3/common/http.py` | Router helpers: ensure_found, ensure_owner_or_staff, ERROR_ constants |
| `cpv3/infrastructure/dramatiq.py` | RedisBroker setup, broker-id serialization, cancellation check, _run_async() |
| `cpv3/modules/tasks/actors.py` | All @dramatiq.actor functions + _run_actor() lifecycle helper + _parse_frame_rate() |
| `cpv3/modules/tasks/webhooks.py` | Webhook transport helpers |
| `cpv3/modules/tasks/constants.py` | All ERROR_, MESSAGE_, STATUS_ constants (Russian) |
### Modified files (by phase)
**Phase 1 — tasks decomposition:**
- `cpv3/modules/tasks/service.py` — stripped to TaskService class only
- `cpv3/modules/transcription/service.py` — receives `save_artifacts()`
- `cpv3/modules/media/service.py` — receives `save_artifacts()`
- `cpv3/modules/captions/service.py` — receives `save_artifacts()`
**Phase 3 — apply shared helpers:**
- All 7 module `repository.py` files — use `apply_partial_update()` + `commit_and_refresh()`
- All 7 module `router.py` files — use `ensure_found()` + `ensure_owner_or_staff()`
**Phase 4 — error handling, cleanup:**
- `cpv3/infrastructure/auth.py` — error constants + shared token decoder
- `cpv3/infrastructure/settings.py` — fix remotion port default
- `cpv3/modules/users/service.py` — error constants + remove dead exports
- `cpv3/modules/users/router.py` — error constants
- `cpv3/modules/projects/service.py` — remove dead exports
- `cpv3/modules/files/router.py` — path traversal fix + error constants
- `cpv3/modules/media/service.py` — error constants + dead code removal
- `cpv3/modules/media/schemas.py` — mutable default fix
- `cpv3/modules/transcription/schemas.py` — remove dead `max_line_count`
- `cpv3/modules/notifications/service.py` — complete label maps (Russian)
- `cpv3/modules/notifications/router.py` — use shared token decoder
- `cpv3/modules/jobs/models.py` — rename `project_pct``progress_pct`
- `cpv3/modules/jobs/schemas.py` — rename `project_pct``progress_pct`
**Phase 5 — layering + tests:**
- `cpv3/modules/media/router.py` — route through service
- `cpv3/modules/transcription/router.py` — route through service
- `cpv3/modules/notifications/router.py` — route through service
- `cofee_backend/CLAUDE.md` — updated architecture docs
- New test files for helpers and permission checks
### Alembic migration
One migration for `project_pct``progress_pct` column rename in jobs table.
## Risks and Mitigations
| Risk | Mitigation |
|------|-----------|
| Actor refactor breaks background processing | Test `_run_actor()` lifecycle thoroughly before deploying. Keep old actors available on a branch for rollback. |
| Dramatiq broker import order | `infrastructure/dramatiq.py` must be imported before any actor module. Enforce via explicit import at top of `tasks/actors.py`. Worker command changes to `uv run dramatiq cpv3.modules.tasks.actors`. |
| `progress_pct` rename breaks frontend | Check frontend API types for `project_pct` usage before applying migration. Coordinate with frontend update. |
| Missing auth checks break existing workflows | New permission checks only restrict access that was unintentionally open. Test with real user flows. |
| Shared helpers become a dumping ground | Keep `common/db.py` and `common/http.py` minimal. New helpers need clear justification. |