remotion_service/docs/superpowers/specs/2026-04-03-salutespeech-transcription-design.md

SaluteSpeech Transcription Engine — Design Spec

Date: 2026-04-03
Status: Approved
Scope: Backend (primary), Frontend (minor)

Overview

Add SaluteSpeech (Sber) as a third transcription engine alongside Local Whisper and Google Speech Cloud. SaluteSpeech provides async REST-based speech recognition with word-level timestamps, domain-specific models (general/finance/medicine), and supports Russian and English.

Decisions

| Decision | Choice | Rationale |
| --- | --- | --- |
| API protocol | REST (not gRPC) | No gRPC deps in codebase; REST covers the full async flow |
| Implementation pattern | Direct integration (Approach A) | Matches existing if/elif dispatch, no new abstractions |
| HTTP client | httpx (sync) | Already used in workers (tasks/service.py:12) |
| TLS certificates | Bundled PEM in repo, path via Settings | Self-contained, no Dockerfile changes |
| Token caching | Module-level globals + threading.Lock | Thread-safe for Dramatiq multi-thread workers; matches existing pattern |
| Token TTL | time.monotonic() + actual expires_at from response | Avoids clock drift vs a hardcoded 30 min |
| Engine short name | "salutespeech" | API boundary name; maps to DB "SALUTE_SPEECH" |
| SaluteSpeech plan | SALUTE_SPEECH_PERS | Personal scope, max 5 parallel streams |
| pip package | None (raw HTTP) | The salute_speech package is unmaintained |
| Frontend model selector | Shown for SaluteSpeech (general/finance/medicine) | Meaningful differentiator; follows the Whisper conditional pattern |

SaluteSpeech API Flow

1. Auth:    POST https://ngw.devices.sberbank.ru:9443/api/v2/oauth
2. Upload:  POST https://smartspeech.sber.ru/rest/v1/data:upload
3. Task:    POST https://smartspeech.sber.ru/rest/v1/speech:async_recognize
4. Poll:    GET  https://smartspeech.sber.ru/rest/v1/task:get?id=<task_id>
5. Download: GET https://smartspeech.sber.ru/rest/v1/data:download?response_file_id=<id>

Token TTL: 30 min (from API response expires_at). Refresh when < 60s remaining. Uploaded files retained 72 hours server-side. Task statuses: NEW → RUNNING → DONE | ERROR.

Backend — Authentication & HTTP Client

Token Cache

Module-level cache with threading.Lock for Dramatiq thread safety:

```python
import threading
import time
import uuid

import httpx

_salute_token_lock = threading.Lock()
_salute_token: str | None = None
_salute_token_expires_at: float = 0.0  # deadline on the time.monotonic() clock

def _get_salute_access_token(client: httpx.Client) -> str:
    global _salute_token, _salute_token_expires_at
    with _salute_token_lock:
        if _salute_token and time.monotonic() < _salute_token_expires_at - SALUTE_TOKEN_REFRESH_MARGIN_SECONDS:
            return _salute_token
        settings = get_settings()
        response = client.post(
            SALUTE_AUTH_URL,
            headers={
                "Authorization": f"Basic {settings.salute_auth_key}",
                "RqUID": str(uuid.uuid4()),
                "Content-Type": "application/x-www-form-urlencoded",
            },
            content=f"scope={settings.salute_scope}",
        )
        response.raise_for_status()
        data = response.json()
        _salute_token = data["access_token"]
        # expires_at is Unix time in milliseconds; convert to a monotonic offset
        expires_in_seconds = (data["expires_at"] / 1000) - time.time()
        _salute_token_expires_at = time.monotonic() + expires_in_seconds
        return _salute_token
```

Settings (3 new fields in infrastructure/settings.py)

```python
# SaluteSpeech
salute_auth_key: str = Field(default="", alias="SALUTE_AUTH_KEY")
salute_ca_cert_path: Path | None = Field(default=None, alias="SALUTE_CA_CERT_PATH")
salute_scope: str = Field(default="SALUTE_SPEECH_PERS", alias="SALUTE_SCOPE")
```

  • SALUTE_AUTH_KEY — base64 Authorization Key from Sber Studio
  • SALUTE_CA_CERT_PATH — path to the bundled Russian CA PEM (e.g., ./.certs/russian_trusted_root_ca.pem)
  • SALUTE_SCOPE — OAuth scope (SALUTE_SPEECH_PERS)
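A matching `.env` fragment might look like this (values are illustrative placeholders, not real credentials):

```
# SaluteSpeech (placeholder values)
SALUTE_AUTH_KEY=base64-encoded-authorization-key-from-sber-studio
SALUTE_CA_CERT_PATH=./.certs/russian_trusted_root_ca.pem
SALUTE_SCOPE=SALUTE_SPEECH_PERS
```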

Per-Job httpx Client

Created in _salute_transcribe_sync(), passed to all helpers for connection reuse:

```python
verify = str(settings.salute_ca_cert_path) if settings.salute_ca_cert_path else True
with httpx.Client(verify=verify, timeout=30.0) as client:
    token = _get_salute_access_token(client)
    file_id = _upload_salute_audio(client, token, audio_bytes, content_type)
    task_id = _create_salute_task(client, token, file_id, language, model, encoding, sample_rate)
    result_file_id = _poll_salute_task(client, token, task_id, job_uuid, on_progress)
    raw_result = _download_salute_result(client, token, result_file_id)
    return _build_document_from_salute_result(raw_result)
```

Cert File

Bundled at cofee_backend/.certs/russian_trusted_root_ca.pem. Downloaded from https://gu-st.ru/content/Other/doc/russian_trusted_root_ca.cer. Only the public root CA — no private keys or secrets.

Backend — Transcription Flow & Helpers

Function Structure (in transcription/service.py)

```
_get_salute_access_token(client)                              → str
_upload_salute_audio(client, token, data, content_type)       → str (request_file_id)
_create_salute_task(client, token, file_id, lang, model, ...) → str (task_id)
_poll_salute_task(client, token, task_id, job_uuid, on_prog)  → str (response_file_id)
_download_salute_result(client, token, response_file_id)      → dict
_parse_salute_time(s: str)                                    → float ("0.480s" → 0.48)
_build_document_from_salute_result(raw: dict)                 → Document
_salute_transcribe_sync(*, local_file_path, language, model, job_id, on_progress) → Document
async transcribe_with_salute_speech(storage, *, file_key, ...) → Document
```

Upload

Read local file as bytes, send raw binary to /data:upload with appropriate Content-Type. No ffmpeg conversion — SaluteSpeech natively supports MP3, WAV, OGG, FLAC.

Audio Encoding Detection

```python
SALUTE_ENCODING_MAP: dict[str, str] = {
    ".mp3": "MP3",
    ".wav": "PCM_S16LE",
    ".ogg": "opus",
    ".flac": "FLAC",
}

SALUTE_CONTENT_TYPE_MAP: dict[str, str] = {
    ".mp3": "audio/mpeg",
    ".wav": "audio/wav",
    ".ogg": "audio/ogg",
    ".flac": "audio/flac",
}
```

Create Task

JSON body with request_file_id + options:

```json
{
    "options": {
        "audio_encoding": "MP3",
        "sample_rate": 16000,
        "language": "ru-RU",
        "model": "general",
        "channels_count": 1,
        "hypotheses_count": 1
    },
    "request_file_id": "<file_id>"
}
```

Language mapping: "ru" → "ru-RU", "en" → "en-US", None/auto → "ru-RU" (default).

sample_rate — extracted from probe data (the actor already runs probe_media() before transcription). Parse from the audio stream's sample_rate field, fallback to 16000.

Poll Loop

Check every 5 seconds. Three critical additions vs the existing engines:

  1. Cancellation check: _raise_if_job_cancelled(job_uuid) each iteration
  2. Progress reporting: on_progress callback during the poll so the UI shows activity
  3. Timeout: SALUTE_POLL_TIMEOUT_SECONDS = 600

```python
def _poll_salute_task(client, token, task_id, job_uuid, on_progress):
    start = time.monotonic()
    while True:
        if time.monotonic() - start > SALUTE_POLL_TIMEOUT_SECONDS:
            raise TimeoutError(ERROR_SALUTE_TIMEOUT)
        _raise_if_job_cancelled(job_uuid)

        resp = client.get(f"{SALUTE_API_BASE}/task:get", params={"id": task_id}, ...)
        status = resp.json()["result"]["status"]

        if status == "DONE":
            return resp.json()["result"]["response_file_id"]
        if status == "ERROR":
            raise RuntimeError(ERROR_SALUTE_TASK_FAILED.format(detail=...))

        # Progress: rough estimate based on elapsed poll time, capped at 95%
        if on_progress:
            elapsed = time.monotonic() - start
            on_progress(min(elapsed / SALUTE_POLL_TIMEOUT_SECONDS * 100, 95.0))

        time.sleep(SALUTE_POLL_INTERVAL_SECONDS)
```

Download & Parse

Download JSON from /data:download. Result structure:

```json
{
    "results": [{
        "text": "...",
        "normalized_text": "...",
        "start": "0.480s",
        "end": "3.600s",
        "word_alignments": [
            {"word": "...", "start": "0.480s", "end": "0.840s"}
        ]
    }]
}
```

Parse into SaluteSpeechSegment/SaluteSpeechWord, then _make_document_from_segments() → Document.

Constants

```python
SALUTE_POLL_INTERVAL_SECONDS = 5.0
SALUTE_POLL_TIMEOUT_SECONDS = 600
SALUTE_AUTH_URL = "https://ngw.devices.sberbank.ru:9443/api/v2/oauth"
SALUTE_API_BASE = "https://smartspeech.sber.ru/rest/v1"
SALUTE_TOKEN_REFRESH_MARGIN_SECONDS = 60

# User-facing error messages (Russian): authorization failed / file upload
# failed / recognition failed / recognition timed out.
ERROR_SALUTE_AUTH_FAILED = "Ошибка авторизации SaluteSpeech: {detail}"
ERROR_SALUTE_UPLOAD_FAILED = "Ошибка загрузки файла в SaluteSpeech: {detail}"
ERROR_SALUTE_TASK_FAILED = "Ошибка распознавания SaluteSpeech: {detail}"
ERROR_SALUTE_TIMEOUT = "Превышено время ожидания распознавания SaluteSpeech"
```

Backend — Schemas & DB Model

New Schemas (in transcription/schemas.py)

```python
class SaluteSpeechWord(Schema):
    word: str
    start: float
    end: float

class SaluteSpeechSegment(Schema):
    text: str
    start: float
    end: float
    words: list[SaluteSpeechWord] = []

class SaluteSpeechResult(Schema):
    text: str
    segments: list[SaluteSpeechSegment]
    language: str

class SaluteSpeechParams(Schema):
    file_path: str
    language: str | None = None
    model: str = "general"
```

Engine Enum

```python
# transcription/schemas.py
TranscriptionEngineEnum = Literal["LOCAL_WHISPER", "GOOGLE_SPEECH_CLOUD", "SALUTE_SPEECH"]
```

Type Unions

Extend _make_document_from_segments() and DocumentBuilder.compute_segment_lines() to accept SaluteSpeechSegment in their type unions.

DB Model

No changes. engine column is String(32), stores "SALUTE_SPEECH" as a plain string. No migration needed.

Backend — Task Dispatch

ENGINE_MAP (tasks/service.py)

```python
ENGINE_MAP: dict[str, str] = {
    "whisper": "LOCAL_WHISPER",
    "google": "GOOGLE_SPEECH_CLOUD",
    "salutespeech": "SALUTE_SPEECH",
}
```

Task Schema (tasks/schemas.py)

```python
engine: Literal["whisper", "google", "salutespeech"] = "whisper"
```

Actor Dispatch

New elif branch in transcription_generate_actor after the Google branch:

```python
elif engine == "salutespeech":
    document = _run_async(
        transcribe_with_salute_speech(
            storage,
            file_key=file_key,
            language=language,
            model=model,
            job_id=job_uuid,
            on_progress=_on_whisper_progress,
        )
    )
```

Direct Endpoint (optional, for testing)

```python
# transcription/router.py
@router.post("/salute-speech/", response_model=Document)
```

Frontend Changes

TranscriptionModal.tsx & TranscriptionSettingsStep.tsx

Both files get identical changes (constants are duplicated in both):

Engine options:

```typescript
const ENGINE_OPTIONS = [
    { value: "whisper", label: "Whisper (локальный)" },
    { value: "google", label: "Google Speech" },
    { value: "salutespeech", label: "SaluteSpeech" },
]
```

Type:

```typescript
engine: "whisper" | "google" | "salutespeech"
```

Model options — split by engine:

```typescript
const WHISPER_MODEL_OPTIONS = [
    { value: "base", label: "Base" },
    { value: "small", label: "Small" },
    { value: "medium", label: "Medium" },
    { value: "large", label: "Large" },
]

const SALUTE_MODEL_OPTIONS = [
    { value: "general", label: "Общая" },
    { value: "finance", label: "Финансы" },
    { value: "medicine", label: "Медицина" },
]
```

Conditional model dropdown:

```tsx
{(engine === "whisper" || engine === "salutespeech") && (
    <Select
        options={engine === "whisper" ? WHISPER_MODEL_OPTIONS : SALUTE_MODEL_OPTIONS}
    />
)}
```

Model reset on engine change: a useEffect on the engine field resets model to "base" (whisper) or "general" (salutespeech).

Language options — no changes. The existing auto / ru / en set covers both SaluteSpeech languages. The mapping ("ru" → "ru-RU") happens in the backend.

Files Changed

Backend (8 files)

| File | Change |
| --- | --- |
| infrastructure/settings.py | Add 3 SaluteSpeech settings fields |
| transcription/schemas.py | Add SaluteSpeech schema types, extend engine enum |
| transcription/service.py | Add ~8 functions for the SaluteSpeech flow |
| transcription/router.py | Add optional /salute-speech/ direct endpoint |
| tasks/schemas.py | Extend the engine Literal to include "salutespeech" |
| tasks/service.py | Add ENGINE_MAP entry + elif dispatch branch |
| .certs/russian_trusted_root_ca.pem | New file — bundled Russian CA cert |
| .env | Add SALUTE_AUTH_KEY, SALUTE_CA_CERT_PATH |

Frontend (2 files)

| File | Change |
| --- | --- |
| TranscriptionModal.tsx | Add engine option, split model options, engine-change effect |
| TranscriptionSettingsStep.tsx | Same changes (duplicated constants) |

Error Handling

  • Auth failure (401/403) → ERROR_SALUTE_AUTH_FAILED with detail, job fails
  • Upload failure (4xx/5xx) → ERROR_SALUTE_UPLOAD_FAILED, job fails
  • Task error (status=ERROR) → ERROR_SALUTE_TASK_FAILED, job fails
  • Poll timeout (>600s) → ERROR_SALUTE_TIMEOUT, job fails
  • Job cancelled → JobCancelledError raised during the poll loop, actor exits cleanly
  • Partial failure (upload ok, task creation fails) → no cleanup needed, uploaded files expire after 72h

No retry logic for 4xx errors. Connect/timeout errors bubble up to Dramatiq (max_retries=0).

Not In Scope

  • Speaker diarization (available in API but not exposed)
  • Profanity filter (available but not exposed)
  • Hint words (available but not exposed)
  • Emotion detection (available but not exposed)
  • Sync recognition mode (only async implemented)
  • Additional languages beyond ru/en (kk-KZ, ky-KG, uz-UZ require special arrangement with Sber)

These can be added later by extending SaluteSpeechParams and the task creation options.