// 356 lines, 10 KiB, TypeScript
import { Queue, UnrecoverableError, Worker, type Job } from "bullmq";

import { serverConfig } from "@/srv/config";
import {
  S3Service,
  type UploadProgress,
} from "@/srv/services/s3";
import {
  RENDER_CANCELLED_ERROR_MESSAGE,
  renderCaptionedVideo,
  type RenderProgress,
} from "@/srv/services/render_video";
import { sendWebhook } from "@/srv/services/webhook";
import type { DocumentInterface } from "@/srv/types/DocumentSchema";
import type { CaptionStyleConfigType } from "@/srv/types/CaptionStyleSchema";

/** Name of the BullMQ queue; used by both the Queue and the Worker below. */
const QUEUE_NAME = "caption-renders";

// --- Cancellation flag stored in Redis --------------------------------------
/** Prefix of the Redis key that marks a render as "cancel requested". */
const RENDER_CANCELLATION_KEY_PREFIX = "caption-render-cancel:";
/** Expiry of the cancellation flag, in seconds (one hour). */
const RENDER_CANCELLATION_TTL_SECONDS = 60 * 60;
/** How often a running job re-reads the cancellation flag, in milliseconds. */
const RENDER_CANCELLATION_POLL_MS = 1_000;

// --- Progress reporting ------------------------------------------------------
/** Minimum gap between two render-progress notifications, in milliseconds. */
const PROGRESS_THROTTLE_MS = 3_000;
/** Minimum gap between two upload-progress notifications, in milliseconds. */
const UPLOAD_PROGRESS_THROTTLE_MS = 1_500;

/** Overall percentage reported when rendering starts. */
const RENDER_PROGRESS_MIN_PCT = 5;
/** Overall percentage reported when rendering is finished. */
const RENDER_PROGRESS_MAX_PCT = 95;
/** Overall percentage reported when the upload starts. */
const UPLOAD_PROGRESS_START_PCT = 95;
/** Overall percentage reported when the upload is finished. */
const UPLOAD_PROGRESS_END_PCT = 99;

// --- Byte units (binary) -----------------------------------------------------
const KILOBYTE = 1024;
const MEGABYTE = KILOBYTE * 1024;

/** Payload stored on every job of the caption-render queue. */
export type RenderJobData = {
  /** Identifier of the render; also used as the BullMQ job id. */
  renderId: string;
  /** Source video reference handed to the S3 service to resolve a name and URL. */
  videoSrc: string;
  /** Transcription document passed to the renderer. */
  transcription: DocumentInterface;
  /** Optional folder forwarded to the S3 service. */
  folder?: string;
  /** Optional caption style passed to the renderer. */
  styleConfig?: CaptionStyleConfigType;
  /** When set, status webhooks (RUNNING / DONE / FAILED) are sent to this URL. */
  callbackUrl?: string;
};

/** Value a render job resolves with once it has completed. */
export type RenderJobResult = {
  /** Path returned by the S3 upload of the rendered video. */
  outputPath: string;
  /** Result of the DONE webhook; `false` when no callback URL was given. */
  callbackDelivered: boolean;
};

/** Outcome of a cancellation request, as returned by `cancelRender`. */
export type CancelRenderResult = {
  /** The render id the request was made for. */
  renderId: string;
  status:
    // The job had not started yet and was removed from the queue.
    | "cancelled"
    // The job is running; a cancellation flag was set for the worker.
    | "cancellation_requested"
    // The job had already finished successfully; nothing was cancelled.
    | "completed"
    // The job had already failed; nothing was cancelled.
    | "failed"
    // No job with this id exists in the queue.
    | "not_found";
};

/**
 * Formats a byte count for display using binary units.
 *
 * Values of at least one megabyte are printed as `"<x.y> MB"`, values of at
 * least one kilobyte as `"<x.y> KB"`, everything else as `"<n> B"`.
 *
 * @param bytes Size in bytes.
 * @returns Human-readable size with one decimal for KB / MB.
 */
function formatBytes(bytes: number): string {
  const kilobytes = bytes / KILOBYTE;

  // Pick the unit from the *rounded* kilobyte value: sizes just below one
  // megabyte would otherwise be printed as "1024.0 KB" instead of "1.0 MB".
  const roundsUpToMegabyte = Number(kilobytes.toFixed(1)) >= KILOBYTE;

  if (bytes >= MEGABYTE || roundsUpToMegabyte) {
    return `${(bytes / MEGABYTE).toFixed(1)} MB`;
  }

  if (bytes >= KILOBYTE) {
    return `${kilobytes.toFixed(1)} KB`;
  }

  return `${bytes} B`;
}

/**
 * Builds the Redis key under which the cancellation flag of a render lives.
 *
 * @param renderId Identifier of the render.
 * @returns The shared key prefix followed by `renderId`.
 */
function getRenderCancellationKey(renderId: string): string {
  return RENDER_CANCELLATION_KEY_PREFIX + renderId;
}

/**
 * Sets the cancellation flag for a render.
 *
 * The flag is the string "1" stored under the render's cancellation key, with
 * an expiry of `RENDER_CANCELLATION_TTL_SECONDS`, written through the queue's
 * own Redis client.
 *
 * @param renderId Identifier of the render to flag.
 */
async function requestRenderCancellation(renderId: string): Promise<void> {
  const flagKey = getRenderCancellationKey(renderId);
  const redis = await renderQueue.client;

  await redis.set(flagKey, "1", "EX", RENDER_CANCELLATION_TTL_SECONDS);
}

/**
 * Reads the cancellation flag of a render.
 *
 * @param renderId Identifier of the render.
 * @returns `true` only when the stored value is exactly "1".
 */
async function isRenderCancellationRequested(renderId: string): Promise<boolean> {
  const flagKey = getRenderCancellationKey(renderId);
  const redis = await renderQueue.client;

  return (await redis.get(flagKey)) === "1";
}

/**
 * Deletes the cancellation flag of a render, if there is one.
 *
 * @param renderId Identifier of the render.
 */
async function clearRenderCancellation(renderId: string): Promise<void> {
  const flagKey = getRenderCancellationKey(renderId);
  const redis = await renderQueue.client;

  await redis.del(flagKey);
}

// ---------------------------------------------------------------------------
// Queue (enqueue side)
// ---------------------------------------------------------------------------

/** Redis connection settings shared by the queue and the worker. */
const redisConnection = { url: serverConfig.REDIS_URL };

/**
 * Queue that holds caption-render jobs. Its Redis client is also used to
 * store the per-render cancellation flags.
 */
export const renderQueue = new Queue<RenderJobData, RenderJobResult>(QUEUE_NAME, {
  connection: redisConnection,
});

/**
 * Adds a render job to the queue.
 *
 * The job id is set to `data.renderId`, so the render can later be looked up
 * (and cancelled) by that id. Jobs get two attempts with exponential backoff
 * starting at 5 seconds; completed jobs are kept for 3600 and failed jobs for
 * 7200 (BullMQ `age` is expressed in seconds).
 *
 * @param data Payload of the render job.
 * @returns The id of the queued job.
 */
export async function enqueueRender(data: RenderJobData): Promise<string> {
  const job = await renderQueue.add("render", data, {
    jobId: data.renderId,
    attempts: 2,
    backoff: { type: "exponential", delay: 5_000 },
    removeOnComplete: { age: 3600 },
    removeOnFail: { age: 7200 },
  });

  // The id was supplied explicitly above, so fall back to it instead of
  // asserting non-null on the optional `job.id`.
  return job.id ?? data.renderId;
}

/**
 * Cancels a render by id.
 *
 * - No such job: returns `not_found`.
 * - Job already `completed` / `failed`: clears any leftover cancellation flag
 *   and reports that state.
 * - Job `active`: sets the cancellation flag for the worker and returns
 *   `cancellation_requested`.
 * - Any other state: removes the job from the queue and returns `cancelled`.
 *
 * @param renderId Identifier of the render (equal to the BullMQ job id).
 * @returns The render id together with the resulting status.
 * @throws Whatever `job.remove()` throws, unless the job turned out to be
 *   active in the meantime (see below).
 */
export async function cancelRender(renderId: string): Promise<CancelRenderResult> {
  const job = await renderQueue.getJob(renderId);
  if (!job) {
    return { renderId, status: "not_found" };
  }

  const state = await job.getState();

  if (state === "completed" || state === "failed") {
    await clearRenderCancellation(renderId);
    return { renderId, status: state };
  }

  if (state === "active") {
    await requestRenderCancellation(renderId);
    return { renderId, status: "cancellation_requested" };
  }

  await clearRenderCancellation(renderId);

  try {
    await job.remove();
  } catch (error) {
    // A worker may have picked the job up between getState() and remove();
    // BullMQ then refuses to remove the locked job. Fall back to the
    // cooperative cancellation path instead of surfacing the error.
    if ((await job.getState()) === "active") {
      await requestRenderCancellation(renderId);
      return { renderId, status: "cancellation_requested" };
    }
    throw error;
  }

  return { renderId, status: "cancelled" };
}

// ---------------------------------------------------------------------------
// Worker (processing side)
// ---------------------------------------------------------------------------

/**
 * Processes one render job: renders the captioned video, uploads the result
 * and reports progress.
 *
 * Progress is written to the BullMQ job and, when `callbackUrl` is set, sent
 * as webhooks: RUNNING at start and during work, DONE on success, FAILED on
 * error. Rendering maps onto `RENDER_PROGRESS_MIN_PCT..RENDER_PROGRESS_MAX_PCT`
 * and the upload onto `UPLOAD_PROGRESS_START_PCT..UPLOAD_PROGRESS_END_PCT`.
 *
 * Cancellation is cooperative. The Redis flag is checked before the render and
 * before the upload, handed to the renderer as a callback, and polled on a
 * timer that aborts the upload. A cancelled job sends no webhook and fails
 * with an `UnrecoverableError`, so BullMQ does not use the remaining attempt
 * to run the render again.
 *
 * NOTE(review): a non-cancellation failure sends the FAILED webhook on every
 * attempt, including attempts that BullMQ will retry — confirm this is what
 * webhook consumers expect.
 *
 * @param job The BullMQ job carrying the render payload.
 * @returns The uploaded output path and whether the DONE webhook was delivered.
 * @throws UnrecoverableError with `RENDER_CANCELLED_ERROR_MESSAGE` when the
 *   render was cancelled; otherwise rethrows the original failure.
 */
async function processRenderJob(
  job: Job<RenderJobData, RenderJobResult>,
): Promise<RenderJobResult> {
  const { folder, videoSrc, transcription, styleConfig, callbackUrl } =
    job.data;
  const logPrefix = `[render-job:${job.id}]`;
  const renderId = String(job.id);

  const uploadAbortController = new AbortController();
  let cancellationPollTimer: ReturnType<typeof setInterval> | null = null;
  let outputLocalPath: string | undefined;

  let lastProgressSend = 0;
  let lastUploadProgressSend = 0;
  let lastUploadPct = UPLOAD_PROGRESS_START_PCT;

  // Fire-and-forget RUNNING webhooks that have not settled yet. They are
  // awaited before a terminal webhook so a late RUNNING update cannot arrive
  // after DONE / FAILED.
  const inFlightProgressWebhooks = new Set<Promise<void>>();

  const flushProgressWebhooks = async (): Promise<void> => {
    await Promise.allSettled([...inFlightProgressWebhooks]);
  };

  const ensureNotCancelled = async (): Promise<void> => {
    if (await isRenderCancellationRequested(renderId)) {
      throw new Error(RENDER_CANCELLED_ERROR_MESSAGE);
    }
  };

  // Publishes a RUNNING update to the webhook (if any) and to the BullMQ job
  // without blocking the caller; failures are logged, never thrown.
  const pushRunningProgress = (progressPct: number, currentMessage: string) => {
    if (callbackUrl) {
      const pending: Promise<void> = (async () => {
        try {
          await sendWebhook(callbackUrl, {
            status: "RUNNING",
            progress_pct: progressPct,
            current_message: currentMessage,
          });
        } catch (error) {
          console.error(`${logPrefix} Failed to send progress webhook:`, error);
        }
      })();
      inFlightProgressWebhooks.add(pending);
      void pending.finally(() => {
        inFlightProgressWebhooks.delete(pending);
      });
    }

    void job.updateProgress(progressPct).catch((error) => {
      console.error(`${logPrefix} Failed to update BullMQ progress:`, error);
    });
  };

  // Render progress: throttled, scaled from rendered/total frames.
  const onProgress = (progress: RenderProgress) => {
    if (progress.totalFrames <= 0) {
      return;
    }

    const now = Date.now();
    if (now - lastProgressSend >= PROGRESS_THROTTLE_MS) {
      lastProgressSend = now;
      const pct = Math.min(
        Math.round(
          (progress.renderedFrames / progress.totalFrames) *
            (RENDER_PROGRESS_MAX_PCT - RENDER_PROGRESS_MIN_PCT),
        ) + RENDER_PROGRESS_MIN_PCT,
        RENDER_PROGRESS_MAX_PCT,
      );
      pushRunningProgress(
        pct,
        `Рендер субтитров: ${progress.renderedFrames}/${progress.totalFrames}`,
      );
    }
  };

  // Upload progress: sent when the overall percentage advances or the
  // throttle interval has elapsed.
  const onUploadProgress = (progress: UploadProgress) => {
    const now = Date.now();
    const uploadPct = Math.min(
      UPLOAD_PROGRESS_START_PCT +
        Math.round(
          progress.percent *
            (UPLOAD_PROGRESS_END_PCT - UPLOAD_PROGRESS_START_PCT),
        ),
      UPLOAD_PROGRESS_END_PCT,
    );
    const shouldSend =
      uploadPct > lastUploadPct ||
      now - lastUploadProgressSend >= UPLOAD_PROGRESS_THROTTLE_MS;

    if (!shouldSend) {
      return;
    }

    lastUploadProgressSend = now;
    lastUploadPct = uploadPct;

    pushRunningProgress(
      uploadPct,
      `Загрузка видео: ${Math.round(progress.percent * 100)}% (${formatBytes(progress.uploadedBytes)} / ${formatBytes(progress.totalBytes)})`,
    );
  };

  // Everything that can fail runs inside the try block so the poll timer is
  // always stopped, the cancellation flag is always cleared, and setup
  // failures are reported through the FAILED webhook as well.
  try {
    const s3Service = new S3Service("captioned", folder);
    const filename = s3Service.getFileName(videoSrc);
    const videoUrl = await s3Service.getFileURL(videoSrc);

    console.log(`${logPrefix} Starting job for ${videoSrc}`);

    await ensureNotCancelled();

    cancellationPollTimer = setInterval(() => {
      void isRenderCancellationRequested(renderId)
        .then((cancelRequested) => {
          if (cancelRequested) {
            uploadAbortController.abort(RENDER_CANCELLED_ERROR_MESSAGE);
          }
        })
        .catch((error: unknown) => {
          // A failed poll must not become an unhandled rejection; the next
          // tick simply tries again.
          console.error(`${logPrefix} Failed to poll cancellation flag:`, error);
        });
    }, RENDER_CANCELLATION_POLL_MS);

    // Send RUNNING webhook
    if (callbackUrl) {
      const delivered = await sendWebhook(callbackUrl, {
        status: "RUNNING",
        progress_pct: RENDER_PROGRESS_MIN_PCT,
        current_message: "Рендер субтитров",
        started_at: new Date().toISOString(),
      });
      console.log(`${logPrefix} Start webhook delivered: ${delivered}`);
    }

    await job.updateProgress(RENDER_PROGRESS_MIN_PCT);

    const result = await renderCaptionedVideo(
      transcription,
      videoUrl,
      filename,
      styleConfig,
      onProgress,
      () => isRenderCancellationRequested(renderId),
    );
    outputLocalPath = result.output;
    console.log(`${logPrefix} Render finished, starting upload`);

    await ensureNotCancelled();

    pushRunningProgress(
      UPLOAD_PROGRESS_START_PCT,
      "Рендер завершён, начинаем загрузку видео",
    );

    const s3OutPath = await s3Service.uploadFile(
      result.output,
      filename,
      onUploadProgress,
      uploadAbortController.signal,
    );
    console.log(`${logPrefix} Upload completed: ${s3OutPath}`);

    pushRunningProgress(
      UPLOAD_PROGRESS_END_PCT,
      "Загрузка завершена, публикуем результат",
    );

    // Send DONE webhook
    let callbackDelivered = false;
    if (callbackUrl) {
      await flushProgressWebhooks();
      callbackDelivered = await sendWebhook(callbackUrl, {
        status: "DONE",
        progress_pct: 100,
        current_message: "Готово",
        output_data: { output_path: s3OutPath },
        finished_at: new Date().toISOString(),
      });
      console.log(`${logPrefix} Done webhook delivered: ${callbackDelivered}`);
    }

    return { outputPath: s3OutPath, callbackDelivered };
  } catch (err) {
    // The abort controller is only ever aborted by the cancellation poll, so
    // an aborted signal means "cancelled" even when the upload surfaces its
    // own abort error instead of the cancellation message.
    const cancelled =
      uploadAbortController.signal.aborted ||
      (err instanceof Error && err.message === RENDER_CANCELLED_ERROR_MESSAGE);

    if (cancelled) {
      console.log(`${logPrefix} Cancellation requested`);
      // UnrecoverableError makes BullMQ fail the job without retrying it.
      throw new UnrecoverableError(RENDER_CANCELLED_ERROR_MESSAGE);
    }

    console.error(`${logPrefix} Job failed:`, err);
    // Send FAILED webhook
    if (callbackUrl) {
      try {
        await flushProgressWebhooks();
        await sendWebhook(callbackUrl, {
          status: "FAILED",
          progress_pct: 0,
          current_message: "Ошибка рендера",
          error_message: err instanceof Error ? err.message : String(err),
          finished_at: new Date().toISOString(),
        });
      } catch (webhookError) {
        // Keep the original failure as the job's error.
        console.error(`${logPrefix} Failed to send FAILED webhook:`, webhookError);
      }
    }
    throw err;
  } finally {
    if (cancellationPollTimer) {
      clearInterval(cancellationPollTimer);
    }

    try {
      await clearRenderCancellation(renderId);
    } catch (error) {
      // Do not let a Redis hiccup replace the job's real outcome or skip the
      // local cleanup below; the flag expires on its own.
      console.error(`${logPrefix} Failed to clear cancellation flag:`, error);
    }

    if (outputLocalPath) {
      try {
        await Bun.file(outputLocalPath).delete();
        console.log(`${logPrefix} Cleaned up local output ${outputLocalPath}`);
      } catch (error) {
        console.warn(
          `${logPrefix} Failed to clean up local output ${outputLocalPath}:`,
          error,
        );
      }
    }
  }
}

/**
 * Starts the worker that consumes the caption-render queue.
 *
 * Jobs are handled by `processRenderJob`, with at most
 * `serverConfig.MAX_CONCURRENT_RENDERS` running at the same time. Job
 * completion, job failure and worker-level errors are logged.
 *
 * @returns The started worker, so the caller can close it on shutdown.
 */
export function startRenderWorker(): Worker<RenderJobData, RenderJobResult> {
  const worker = new Worker<RenderJobData, RenderJobResult>(
    QUEUE_NAME,
    processRenderJob,
    {
      connection: redisConnection,
      concurrency: serverConfig.MAX_CONCURRENT_RENDERS,
    },
  );

  // Without a listener, an emitted "error" event (e.g. a Redis connection
  // problem) is raised as an unhandled exception.
  worker.on("error", (err) => {
    console.error("Render worker error:", err);
  });

  worker.on("failed", (job, err) => {
    console.error(`Render job ${job?.id} failed:`, err.message);
  });

  worker.on("completed", (job) => {
    console.log(`Render job ${job.id} completed`);
  });

  return worker;
}