import { Queue, UnrecoverableError, Worker, type Job } from "bullmq";
import { serverConfig } from "@/srv/config";
import { S3Service, type UploadProgress } from "@/srv/services/s3";
import {
  RENDER_CANCELLED_ERROR_MESSAGE,
  renderCaptionedVideo,
  type RenderProgress,
} from "@/srv/services/render_video";
import { sendWebhook } from "@/srv/services/webhook";
import type { DocumentInterface } from "@/srv/types/DocumentSchema";
import type { CaptionStyleConfigType } from "@/srv/types/CaptionStyleSchema";

/** Name of the BullMQ queue shared by the enqueue side and the worker. */
const QUEUE_NAME = "caption-renders";

/** Minimum interval between render-progress notifications. */
const PROGRESS_THROTTLE_MS = 3_000;
/** Minimum interval between upload-progress notifications when the percentage has not grown. */
const UPLOAD_PROGRESS_THROTTLE_MS = 1_500;

/** Rendered frames are mapped onto this percentage range. */
const RENDER_PROGRESS_MIN_PCT = 5;
const RENDER_PROGRESS_MAX_PCT = 95;
/** Upload progress is mapped onto this percentage range; 100 is reported only with DONE. */
const UPLOAD_PROGRESS_START_PCT = 95;
const UPLOAD_PROGRESS_END_PCT = 99;

const KILOBYTE = 1024;
const MEGABYTE = 1024 * KILOBYTE;

/** Redis key prefix of the per-render "cancellation requested" flag. */
const RENDER_CANCELLATION_KEY_PREFIX = "caption-render-cancel:";
/** The cancellation flag expires on its own after one hour. */
const RENDER_CANCELLATION_TTL_SECONDS = 60 * 60;
/** How often an active job polls Redis for the cancellation flag. */
const RENDER_CANCELLATION_POLL_MS = 1_000;

/** Payload stored in a render job. `renderId` doubles as the BullMQ job id. */
export type RenderJobData = {
  renderId: string;
  folder?: string;
  videoSrc: string;
  transcription: DocumentInterface;
  styleConfig?: CaptionStyleConfigType;
  callbackUrl?: string;
};

/** Value returned by a successfully processed render job. */
export type RenderJobResult = {
  outputPath: string;
  callbackDelivered: boolean;
};

/** Outcome of a cancellation request, see {@link cancelRender}. */
export type CancelRenderResult = {
  renderId: string;
  status:
    | "cancelled"
    | "cancellation_requested"
    | "completed"
    | "failed"
    | "not_found";
};

/**
 * Formats a byte count for progress messages using binary units.
 *
 * @param bytes - Number of bytes.
 * @returns `"<n.n> MB"`, `"<n.n> KB"` or `"<n> B"` depending on magnitude.
 */
function formatBytes(bytes: number): string {
  if (bytes >= MEGABYTE) {
    return `${(bytes / MEGABYTE).toFixed(1)} MB`;
  }
  if (bytes >= KILOBYTE) {
    return `${(bytes / KILOBYTE).toFixed(1)} KB`;
  }
  return `${bytes} B`;
}

/** Builds the Redis key that holds the cancellation flag of `renderId`. */
function getRenderCancellationKey(renderId: string): string {
  return `${RENDER_CANCELLATION_KEY_PREFIX}${renderId}`;
}

/** Sets the cancellation flag for `renderId`; it expires after the TTL. */
async function requestRenderCancellation(renderId: string): Promise<void> {
  const client = await renderQueue.client;
  await client.set(
    getRenderCancellationKey(renderId),
    "1",
    "EX",
    RENDER_CANCELLATION_TTL_SECONDS,
  );
}

/** Returns `true` when the cancellation flag for `renderId` is currently set. */
async function isRenderCancellationRequested(
  renderId: string,
): Promise<boolean> {
  const client = await renderQueue.client;
  const value = await client.get(getRenderCancellationKey(renderId));
  return value === "1";
}

/** Removes the cancellation flag for `renderId` (no-op when absent). */
async function clearRenderCancellation(renderId: string): Promise<void> {
  const client = await renderQueue.client;
  await client.del(getRenderCancellationKey(renderId));
}

/**
 * Decides whether an error caught while processing a job is the result of a
 * cancellation request rather than a genuine failure.
 *
 * The upload is aborted with a *string* reason, so the rejection coming out of
 * the upload is not guaranteed to be an `Error` carrying the cancellation
 * message; the abort signal state is therefore consulted as well.
 */
function isRenderCancellationError(
  err: unknown,
  uploadSignal: AbortSignal,
): boolean {
  if (uploadSignal.aborted) {
    return true;
  }
  if (err instanceof Error) {
    return err.message === RENDER_CANCELLED_ERROR_MESSAGE;
  }
  return err === RENDER_CANCELLED_ERROR_MESSAGE;
}

// ---------------------------------------------------------------------------
// Queue (enqueue side)
// ---------------------------------------------------------------------------

const redisConnection = { url: serverConfig.REDIS_URL };

/** Queue instance used for enqueueing, job lookup and Redis access. */
export const renderQueue = new Queue<RenderJobData, RenderJobResult>(
  QUEUE_NAME,
  { connection: redisConnection },
);

/**
 * Adds a render job to the queue, using `data.renderId` as the job id.
 *
 * @param data - Job payload.
 * @returns The id of the enqueued job.
 */
export async function enqueueRender(data: RenderJobData): Promise<string> {
  const job = await renderQueue.add("render", data, {
    jobId: data.renderId,
    attempts: 2,
    backoff: { type: "exponential", delay: 5_000 },
    removeOnComplete: { age: 3600 },
    removeOnFail: { age: 7200 },
  });

  // The job id was supplied explicitly above, so fall back to it instead of
  // asserting non-null.
  return job.id ?? data.renderId;
}

/**
 * Cancels a render.
 *
 * - Unknown job: `not_found`.
 * - Already finished job: its final state (`completed` / `failed`) is reported.
 * - Active job: a cancellation flag is set for the worker to pick up
 *   (`cancellation_requested`).
 * - Any other state: the job is removed from the queue (`cancelled`).
 *
 * @param renderId - Id of the render / BullMQ job.
 */
export async function cancelRender(
  renderId: string,
): Promise<CancelRenderResult> {
  const job = await renderQueue.getJob(renderId);
  if (!job) {
    return { renderId, status: "not_found" };
  }

  const state = await job.getState();

  if (state === "active") {
    await requestRenderCancellation(renderId);
    return { renderId, status: "cancellation_requested" };
  }

  // In every non-active state a stale flag must not outlive the request.
  await clearRenderCancellation(renderId);

  if (state === "completed" || state === "failed") {
    return { renderId, status: state };
  }

  // NOTE(review): if a worker picks the job up between getState() and
  // remove(), remove() may reject — confirm whether callers should retry.
  await job.remove();
  return { renderId, status: "cancelled" };
}

// ---------------------------------------------------------------------------
// Worker (processing side)
// ---------------------------------------------------------------------------

/**
 * Processes one render job: renders the captioned video, uploads it and
 * reports progress through BullMQ and, when `callbackUrl` is set, webhooks.
 *
 * @param job - The BullMQ job to process.
 * @returns The uploaded output path and whether the DONE webhook was delivered.
 * @throws UnrecoverableError when the render was cancelled, so that BullMQ
 *   does not retry it.
 * @throws The original error on any other failure (after a FAILED webhook).
 */
async function processRenderJob(
  job: Job<RenderJobData, RenderJobResult>,
): Promise<RenderJobResult> {
  const { folder, videoSrc, transcription, styleConfig, callbackUrl } =
    job.data;
  const logPrefix = `[render-job:${job.id}]`;
  const renderId = String(job.id);

  const s3Service = new S3Service("captioned", folder);
  const filename = s3Service.getFileName(videoSrc);
  const videoUrl = await s3Service.getFileURL(videoSrc);

  const uploadAbortController = new AbortController();
  let cancellationPollTimer: ReturnType<typeof setInterval> | null = null;
  let outputLocalPath: string | undefined;

  console.log(`${logPrefix} Starting job for ${videoSrc}`);

  const ensureNotCancelled = async (): Promise<void> => {
    if (await isRenderCancellationRequested(renderId)) {
      throw new Error(RENDER_CANCELLED_ERROR_MESSAGE);
    }
  };

  // Runs on a timer; aborts the upload once the cancellation flag shows up.
  const pollCancellation = (): void => {
    void isRenderCancellationRequested(renderId)
      .then((cancelRequested) => {
        if (cancelRequested && !uploadAbortController.signal.aborted) {
          uploadAbortController.abort(RENDER_CANCELLED_ERROR_MESSAGE);
        }
      })
      .catch((error: unknown) => {
        // A failed poll must not surface as an unhandled rejection.
        console.error(`${logPrefix} Failed to poll cancellation flag:`, error);
      });
  };

  let lastProgressSend = 0;
  let lastUploadProgressSend = 0;
  let lastUploadPct = UPLOAD_PROGRESS_START_PCT;

  // Fire-and-forget progress notification to the webhook and to BullMQ.
  const pushRunningProgress = (
    progressPct: number,
    currentMessage: string,
  ): void => {
    if (callbackUrl) {
      void sendWebhook(callbackUrl, {
        status: "RUNNING",
        progress_pct: progressPct,
        current_message: currentMessage,
      }).catch((error: unknown) => {
        console.error(`${logPrefix} Failed to send progress webhook:`, error);
      });
    }

    void job.updateProgress(progressPct).catch((error: unknown) => {
      console.error(`${logPrefix} Failed to update BullMQ progress:`, error);
    });
  };

  const onProgress = (progress: RenderProgress): void => {
    if (progress.totalFrames <= 0) {
      return;
    }

    const now = Date.now();
    if (now - lastProgressSend < PROGRESS_THROTTLE_MS) {
      return;
    }
    lastProgressSend = now;

    // Map rendered frames onto [MIN, MAX] and never exceed MAX.
    const pct = Math.min(
      Math.round(
        (progress.renderedFrames / progress.totalFrames) *
          (RENDER_PROGRESS_MAX_PCT - RENDER_PROGRESS_MIN_PCT),
      ) + RENDER_PROGRESS_MIN_PCT,
      RENDER_PROGRESS_MAX_PCT,
    );

    pushRunningProgress(
      pct,
      `Рендер субтитров: ${progress.renderedFrames}/${progress.totalFrames}`,
    );
  };

  const onUploadProgress = (progress: UploadProgress): void => {
    const now = Date.now();
    const uploadPct = Math.min(
      UPLOAD_PROGRESS_START_PCT +
        Math.round(
          progress.percent *
            (UPLOAD_PROGRESS_END_PCT - UPLOAD_PROGRESS_START_PCT),
        ),
      UPLOAD_PROGRESS_END_PCT,
    );

    // Report immediately when the percentage grows, otherwise throttle.
    const shouldSend =
      uploadPct > lastUploadPct ||
      now - lastUploadProgressSend >= UPLOAD_PROGRESS_THROTTLE_MS;
    if (!shouldSend) {
      return;
    }

    lastUploadProgressSend = now;
    lastUploadPct = uploadPct;

    pushRunningProgress(
      uploadPct,
      `Загрузка видео: ${Math.round(progress.percent * 100)}% (${formatBytes(progress.uploadedBytes)} / ${formatBytes(progress.totalBytes)})`,
    );
  };

  try {
    await ensureNotCancelled();

    // Started inside the try block so that the timer is always cleared in
    // `finally`, even when the start webhook or progress update throws.
    cancellationPollTimer = setInterval(
      pollCancellation,
      RENDER_CANCELLATION_POLL_MS,
    );

    // Send RUNNING webhook
    if (callbackUrl) {
      const delivered = await sendWebhook(callbackUrl, {
        status: "RUNNING",
        progress_pct: RENDER_PROGRESS_MIN_PCT,
        current_message: "Рендер субтитров",
        started_at: new Date().toISOString(),
      });
      console.log(`${logPrefix} Start webhook delivered: ${delivered}`);
    }

    await job.updateProgress(RENDER_PROGRESS_MIN_PCT);

    const result = await renderCaptionedVideo(
      transcription,
      videoUrl,
      filename,
      styleConfig,
      onProgress,
      () => isRenderCancellationRequested(renderId),
    );
    outputLocalPath = result.output;

    console.log(`${logPrefix} Render finished, starting upload`);
    await ensureNotCancelled();

    pushRunningProgress(
      UPLOAD_PROGRESS_START_PCT,
      "Рендер завершён, начинаем загрузку видео",
    );

    const s3OutPath = await s3Service.uploadFile(
      result.output,
      filename,
      onUploadProgress,
      uploadAbortController.signal,
    );

    console.log(`${logPrefix} Upload completed: ${s3OutPath}`);
    pushRunningProgress(
      UPLOAD_PROGRESS_END_PCT,
      "Загрузка завершена, публикуем результат",
    );

    // Send DONE webhook
    let callbackDelivered = false;
    if (callbackUrl) {
      callbackDelivered = await sendWebhook(callbackUrl, {
        status: "DONE",
        progress_pct: 100,
        current_message: "Готово",
        output_data: { output_path: s3OutPath },
        finished_at: new Date().toISOString(),
      });
      console.log(`${logPrefix} Done webhook delivered: ${callbackDelivered}`);
    }

    return { outputPath: s3OutPath, callbackDelivered };
  } catch (err) {
    if (isRenderCancellationError(err, uploadAbortController.signal)) {
      console.log(`${logPrefix} Cancellation requested`);
      // The job is enqueued with `attempts: 2` and the cancellation flag is
      // cleared below, so a plain error would make BullMQ render the video
      // again. UnrecoverableError fails the job without further attempts.
      throw new UnrecoverableError(RENDER_CANCELLED_ERROR_MESSAGE);
    }

    console.error(`${logPrefix} Job failed:`, err);

    // Send FAILED webhook
    if (callbackUrl) {
      await sendWebhook(callbackUrl, {
        status: "FAILED",
        progress_pct: 0,
        current_message: "Ошибка рендера",
        error_message: err instanceof Error ? err.message : String(err),
        finished_at: new Date().toISOString(),
      });
    }

    throw err;
  } finally {
    if (cancellationPollTimer) {
      clearInterval(cancellationPollTimer);
    }

    // Cleanup is best-effort: an error here must neither replace the job's
    // own result/error nor prevent the local file from being removed.
    try {
      await clearRenderCancellation(renderId);
    } catch (error) {
      console.error(`${logPrefix} Failed to clear cancellation flag:`, error);
    }

    if (outputLocalPath) {
      try {
        await Bun.file(outputLocalPath).delete();
        console.log(`${logPrefix} Cleaned up local output ${outputLocalPath}`);
      } catch (error) {
        console.warn(
          `${logPrefix} Failed to clean up local output ${outputLocalPath}:`,
          error,
        );
      }
    }
  }
}

/**
 * Creates and starts the worker that processes render jobs with the
 * configured concurrency, and attaches completion/failure logging.
 *
 * @returns The started BullMQ worker.
 */
export function startRenderWorker(): Worker<RenderJobData, RenderJobResult> {
  const worker = new Worker<RenderJobData, RenderJobResult>(
    QUEUE_NAME,
    processRenderJob,
    {
      connection: redisConnection,
      concurrency: serverConfig.MAX_CONCURRENT_RENDERS,
    },
  );

  worker.on("failed", (job, err) => {
    console.error(`Render job ${job?.id} failed:`, err.message);
  });

  worker.on("completed", (job) => {
    console.log(`Render job ${job.id} completed`);
  });

  return worker;
}