Files
remotion_service/docs/superpowers/specs/2026-04-03-salutespeech-transcription-design.md
2026-04-03 23:47:58 +03:00

411 lines
14 KiB
Markdown

# SaluteSpeech Transcription Engine — Design Spec
**Date:** 2026-04-03
**Status:** Approved
**Scope:** Backend (primary), Frontend (minor)
## Overview
Add SaluteSpeech (Sber) as a third transcription engine alongside Local Whisper and Google Speech Cloud. SaluteSpeech provides async REST-based speech recognition with word-level timestamps, domain-specific models (general/finance/medicine), and supports Russian and English.
## Decisions
| Decision | Choice | Rationale |
|----------|--------|-----------|
| API protocol | REST (not gRPC) | No gRPC deps in codebase, REST covers full async flow |
| Implementation pattern | Direct integration (Approach A) | Matches existing if/elif dispatch, no new abstractions |
| HTTP client | `httpx` (sync) | Already used in workers (`tasks/service.py:12`) |
| TLS certificates | Bundled PEM in repo, path via Settings | Self-contained, no Dockerfile changes |
| Token caching | Module-level globals + `threading.Lock` | Thread-safe for Dramatiq multi-thread workers, matches existing pattern |
| Token TTL | `time.monotonic()` + actual `expires_at` from response | Avoids clock drift vs hardcoded 30 min |
| Engine short name | `"salutespeech"` | API boundary name, maps to DB `"SALUTE_SPEECH"` |
| SaluteSpeech plan | `SALUTE_SPEECH_PERS` | Personal scope, max 5 parallel streams |
| pip package | None (raw HTTP) | `salute_speech` package is unmaintained |
| Frontend model selector | Shown for SaluteSpeech (general/finance/medicine) | Meaningful differentiator, follows Whisper conditional pattern |
## SaluteSpeech API Flow
```
1. Auth: POST https://ngw.devices.sberbank.ru:9443/api/v2/oauth
2. Upload: POST https://smartspeech.sber.ru/rest/v1/data:upload
3. Task: POST https://smartspeech.sber.ru/rest/v1/speech:async_recognize
4. Poll: GET https://smartspeech.sber.ru/rest/v1/task:get?id=<task_id>
5. Download: GET https://smartspeech.sber.ru/rest/v1/data:download?response_file_id=<id>
```
Token TTL: 30 min (from API response `expires_at`). Refresh when < 60s remaining.
Uploaded files retained 72 hours server-side.
Task statuses: NEW → RUNNING → DONE | ERROR.
## Backend — Authentication & HTTP Client
### Token Cache
Module-level cache with `threading.Lock` for Dramatiq thread safety:
```python
import threading
_salute_token_lock = threading.Lock()
_salute_token: str | None = None
_salute_token_expires_at: float = 0.0 # time.monotonic()
def _get_salute_access_token(client: httpx.Client) -> str:
global _salute_token, _salute_token_expires_at
with _salute_token_lock:
if _salute_token and time.monotonic() < _salute_token_expires_at - SALUTE_TOKEN_REFRESH_MARGIN_SECONDS:
return _salute_token
settings = get_settings()
response = client.post(
SALUTE_AUTH_URL,
headers={
"Authorization": f"Basic {settings.salute_auth_key}",
"RqUID": str(uuid.uuid4()),
"Content-Type": "application/x-www-form-urlencoded",
},
content=f"scope={settings.salute_scope}",
)
response.raise_for_status()
data = response.json()
_salute_token = data["access_token"]
# expires_at is Unix ms; convert to monotonic offset
expires_in_seconds = (data["expires_at"] / 1000) - time.time()
_salute_token_expires_at = time.monotonic() + expires_in_seconds
return _salute_token
```
### Settings (3 new fields in `infrastructure/settings.py`)
```python
# SaluteSpeech
salute_auth_key: str = Field(default="", alias="SALUTE_AUTH_KEY")
salute_ca_cert_path: Path | None = Field(default=None, alias="SALUTE_CA_CERT_PATH")
salute_scope: str = Field(default="SALUTE_SPEECH_PERS", alias="SALUTE_SCOPE")
```
- `SALUTE_AUTH_KEY` — base64 Authorization Key from Sber Studio
- `SALUTE_CA_CERT_PATH` — path to bundled Russian CA PEM (e.g., `./.certs/russian_trusted_root_ca.pem`)
- `SALUTE_SCOPE` — OAuth scope (`SALUTE_SPEECH_PERS`)
### Per-Job httpx Client
Created in `_salute_transcribe_sync()`, passed to all helpers for connection reuse:
```python
verify = str(settings.salute_ca_cert_path) if settings.salute_ca_cert_path else True
with httpx.Client(verify=verify, timeout=30.0) as client:
token = _get_salute_access_token(client)
file_id = _upload_salute_audio(client, token, audio_bytes, content_type)
task_id = _create_salute_task(client, token, file_id, language, model, encoding, sample_rate)
result_file_id = _poll_salute_task(client, token, task_id, job_uuid, on_progress)
raw_result = _download_salute_result(client, token, result_file_id)
return _build_document_from_salute_result(raw_result)
```
### Cert File
Bundled at `cofee_backend/.certs/russian_trusted_root_ca.pem`. Downloaded from `https://gu-st.ru/content/Other/doc/russian_trusted_root_ca.cer`. Only the public root CA — no private keys or secrets.
## Backend — Transcription Flow & Helpers
### Function Structure (in `transcription/service.py`)
```
_get_salute_access_token(client) → str
_upload_salute_audio(client, token, data, content_type) → str (request_file_id)
_create_salute_task(client, token, file_id, lang, model, ...) → str (task_id)
_poll_salute_task(client, token, task_id, job_uuid, on_prog) → str (response_file_id)
_download_salute_result(client, token, response_file_id) → dict
_parse_salute_time(s: str) → float → "0.480s" → 0.48
_build_document_from_salute_result(raw: dict) → Document
_salute_transcribe_sync(*, local_file_path, language, model, job_id, on_progress) → Document
async transcribe_with_salute_speech(storage, *, file_key, ...) → Document
```
### Upload
Read local file as bytes, send raw binary to `/data:upload` with appropriate `Content-Type`. No ffmpeg conversion — SaluteSpeech natively supports MP3, WAV, OGG, FLAC.
### Audio Encoding Detection
```python
SALUTE_ENCODING_MAP: dict[str, str] = {
".mp3": "MP3",
".wav": "PCM_S16LE",
".ogg": "opus",
".flac": "FLAC",
}
SALUTE_CONTENT_TYPE_MAP: dict[str, str] = {
".mp3": "audio/mpeg",
".wav": "audio/wav",
".ogg": "audio/ogg",
".flac": "audio/flac",
}
```
### Create Task
JSON body with `request_file_id` + options:
```json
{
"options": {
"audio_encoding": "MP3",
"sample_rate": 16000,
"language": "ru-RU",
"model": "general",
"channels_count": 1,
"hypotheses_count": 1
},
"request_file_id": "<file_id>"
}
```
Language mapping: `"ru"``"ru-RU"`, `"en"``"en-US"`, `None`/auto → `"ru-RU"` (default).
`sample_rate` — extracted from probe data (the actor already runs `probe_media()` before transcription). Parse from the audio stream's `sample_rate` field, fallback to `16000`.
### Poll Loop
Check every 5 seconds. Three critical additions vs existing engines:
1. **Cancellation check**`_raise_if_job_cancelled(job_uuid)` each iteration
2. **Progress reporting**`on_progress` callback during poll so UI shows activity
3. **Timeout**`SALUTE_POLL_TIMEOUT_SECONDS = 600`
```python
def _poll_salute_task(client, token, task_id, job_uuid, on_progress):
start = time.monotonic()
while True:
if time.monotonic() - start > SALUTE_POLL_TIMEOUT_SECONDS:
raise TimeoutError(ERROR_SALUTE_TIMEOUT)
_raise_if_job_cancelled(job_uuid)
resp = client.get(f"{SALUTE_API_BASE}/task:get", params={"id": task_id}, ...)
status = resp.json()["result"]["status"]
if status == "DONE":
return resp.json()["result"]["response_file_id"]
if status == "ERROR":
raise RuntimeError(ERROR_SALUTE_TASK_FAILED.format(detail=...))
# Progress: estimate based on poll iteration
if on_progress:
elapsed = time.monotonic() - start
on_progress(min(elapsed / SALUTE_POLL_TIMEOUT_SECONDS * 100, 95.0))
time.sleep(SALUTE_POLL_INTERVAL_SECONDS)
```
### Download & Parse
Download JSON from `/data:download`. Result structure:
```json
{
"results": [{
"text": "...",
"normalized_text": "...",
"start": "0.480s",
"end": "3.600s",
"word_alignments": [
{"word": "...", "start": "0.480s", "end": "0.840s"}
]
}]
}
```
Parse into `SaluteSpeechSegment`/`SaluteSpeechWord`, then `_make_document_from_segments()``Document`.
### Constants
```python
SALUTE_POLL_INTERVAL_SECONDS = 5.0
SALUTE_POLL_TIMEOUT_SECONDS = 600
SALUTE_AUTH_URL = "https://ngw.devices.sberbank.ru:9443/api/v2/oauth"
SALUTE_API_BASE = "https://smartspeech.sber.ru/rest/v1"
SALUTE_TOKEN_REFRESH_MARGIN_SECONDS = 60
ERROR_SALUTE_AUTH_FAILED = "Ошибка авторизации SaluteSpeech: {detail}"
ERROR_SALUTE_UPLOAD_FAILED = "Ошибка загрузки файла в SaluteSpeech: {detail}"
ERROR_SALUTE_TASK_FAILED = "Ошибка распознавания SaluteSpeech: {detail}"
ERROR_SALUTE_TIMEOUT = "Превышено время ожидания распознавания SaluteSpeech"
```
## Backend — Schemas & DB Model
### New Schemas (in `transcription/schemas.py`)
```python
class SaluteSpeechWord(Schema):
word: str
start: float
end: float
class SaluteSpeechSegment(Schema):
text: str
start: float
end: float
words: list[SaluteSpeechWord] = []
class SaluteSpeechResult(Schema):
text: str
segments: list[SaluteSpeechSegment]
language: str
class SaluteSpeechParams(Schema):
file_path: str
language: str | None = None
model: str = "general"
```
### Engine Enum
```python
# transcription/schemas.py
TranscriptionEngineEnum = Literal["LOCAL_WHISPER", "GOOGLE_SPEECH_CLOUD", "SALUTE_SPEECH"]
```
### Type Unions
Extend `_make_document_from_segments()` and `DocumentBuilder.compute_segment_lines()` to accept `SaluteSpeechSegment` in their type unions.
### DB Model
No changes. `engine` column is `String(32)`, stores `"SALUTE_SPEECH"` as a plain string. No migration needed.
## Backend — Task Dispatch
### ENGINE_MAP (`tasks/service.py`)
```python
ENGINE_MAP: dict[str, str] = {
"whisper": "LOCAL_WHISPER",
"google": "GOOGLE_SPEECH_CLOUD",
"salutespeech": "SALUTE_SPEECH",
}
```
### Task Schema (`tasks/schemas.py`)
```python
engine: Literal["whisper", "google", "salutespeech"] = "whisper"
```
### Actor Dispatch
New `elif` branch in `transcription_generate_actor` after the Google branch:
```python
elif engine == "salutespeech":
document = _run_async(
transcribe_with_salute_speech(
storage,
file_key=file_key,
language=language,
model=model,
job_id=job_uuid,
on_progress=_on_whisper_progress,
)
)
```
### Direct Endpoint (optional, for testing)
```python
# transcription/router.py
@router.post("/salute-speech/", response_model=Document)
```
## Frontend Changes
### TranscriptionModal.tsx & TranscriptionSettingsStep.tsx
Both files get identical changes (constants are duplicated in both):
**Engine options:**
```typescript
const ENGINE_OPTIONS = [
{ value: "whisper", label: "Whisper (локальный)" },
{ value: "google", label: "Google Speech" },
{ value: "salutespeech", label: "SaluteSpeech" },
]
```
**Type:**
```typescript
engine: "whisper" | "google" | "salutespeech"
```
**Model options — split by engine:**
```typescript
const WHISPER_MODEL_OPTIONS = [
{ value: "base", label: "Base" },
{ value: "small", label: "Small" },
{ value: "medium", label: "Medium" },
{ value: "large", label: "Large" },
]
const SALUTE_MODEL_OPTIONS = [
{ value: "general", label: "Общая" },
{ value: "finance", label: "Финансы" },
{ value: "medicine", label: "Медицина" },
]
```
**Conditional model dropdown:**
```typescript
{(engine === "whisper" || engine === "salutespeech") && (
<Select
options={engine === "whisper" ? WHISPER_MODEL_OPTIONS : SALUTE_MODEL_OPTIONS}
/>
)}
```
**Model reset on engine change**`useEffect` on engine field, reset model to `"base"` (whisper) or `"general"` (salutespeech).
**Language options** — no changes. Existing `auto / ru / en` covers both SaluteSpeech languages. Mapping (`"ru"``"ru-RU"`) happens in backend.
## Files Changed
### Backend (8 files)
| File | Change |
|------|--------|
| `infrastructure/settings.py` | Add 3 SaluteSpeech settings fields |
| `transcription/schemas.py` | Add SaluteSpeech schema types, extend engine enum |
| `transcription/service.py` | Add ~8 functions for SaluteSpeech flow |
| `transcription/router.py` | Add optional `/salute-speech/` direct endpoint |
| `tasks/schemas.py` | Extend engine Literal to include `"salutespeech"` |
| `tasks/service.py` | Add `ENGINE_MAP` entry + `elif` dispatch branch |
| `.certs/russian_trusted_root_ca.pem` | New file — bundled Russian CA cert |
| `.env` | Add `SALUTE_AUTH_KEY`, `SALUTE_CA_CERT_PATH` |
### Frontend (2 files)
| File | Change |
|------|--------|
| `TranscriptionModal.tsx` | Add engine option, split model options, engine change effect |
| `TranscriptionSettingsStep.tsx` | Same changes (duplicated constants) |
## Error Handling
- **Auth failure** (401/403) → `ERROR_SALUTE_AUTH_FAILED` with detail, job fails
- **Upload failure** (4xx/5xx) → `ERROR_SALUTE_UPLOAD_FAILED`, job fails
- **Task error** (status=ERROR) → `ERROR_SALUTE_TASK_FAILED`, job fails
- **Poll timeout** (>600s) → `ERROR_SALUTE_TIMEOUT`, job fails
- **Job cancelled** → `JobCancelledError` raised during poll loop, actor exits cleanly
- **Partial failure** (upload ok, task creation fails) → no cleanup needed, uploaded files expire after 72h
No retry logic for 4xx errors. Connect/timeout errors bubble up to Dramatiq (max_retries=0).
## Not In Scope
- Speaker diarization (available in API but not exposed)
- Profanity filter (available but not exposed)
- Hint words (available but not exposed)
- Emotion detection (available but not exposed)
- Sync recognition mode (only async implemented)
- Additional languages beyond ru/en (kk-KZ, ky-KG, uz-UZ require special arrangement with Sber)
These can be added later by extending `SaluteSpeechParams` and the task creation options.