From ccb97eba2e62832af225cac00c0d9dd0d671b06e Mon Sep 17 00:00:00 2001 From: Daniil Date: Sun, 3 May 2026 19:59:30 +0300 Subject: [PATCH] fix: fix dramatiq failing ffmpeg task --- cpv3/modules/tasks/service.py | 88 +++++++++++++++----------- tests/unit/test_media_convert_tasks.py | 39 +++++++++++- 2 files changed, 90 insertions(+), 37 deletions(-) diff --git a/cpv3/modules/tasks/service.py b/cpv3/modules/tasks/service.py index a568a0e..d725c6b 100644 --- a/cpv3/modules/tasks/service.py +++ b/cpv3/modules/tasks/service.py @@ -17,6 +17,7 @@ from typing import Any import dramatiq # type: ignore[import-untyped] import httpx from dramatiq.brokers.redis import RedisBroker # type: ignore[import-untyped] +from dramatiq.middleware.time_limit import TimeLimitExceeded from sqlalchemy.ext.asyncio import AsyncSession @@ -103,6 +104,7 @@ MESSAGE_UPLOADING_RESULT = "Загрузка результата" MESSAGE_SAVING_RESULT = "Сохранение результата" MESSAGE_RENDERING_CAPTIONS = "Рендеринг субтитров" MESSAGE_CANCELLED = "Отменено пользователем" +MESSAGE_TASK_TIMED_OUT = "Задача превысила лимит времени" MESSAGE_EXTRACTING_FRAMES = "Извлечение кадров" MESSAGE_UPLOADING_FRAMES = "Загрузка кадров" MESSAGE_DELETING_OLD_FRAMES = "Удаление старых кадров" @@ -136,6 +138,7 @@ PROGRESS_CONVERT_THROTTLE_SECONDS = 1.0 ACTIVE_JOB_STATUSES = (JOB_STATUS_PENDING, JOB_STATUS_RUNNING) DRAMATIQ_BROKER_REF_SEPARATOR = ":" +MEDIA_TASK_TIME_LIMIT_MS = 60 * 60 * 1000 class JobCancelledError(RuntimeError): @@ -196,6 +199,35 @@ def _send_webhook_event(webhook_url: str, event: TaskWebhookEvent) -> None: raise +def _send_failed_webhook_event( + webhook_url: str, + *, + error_message: str, + current_message: str | None = None, +) -> None: + """Send a FAILED event with a user-facing message.""" + _send_webhook_event( + webhook_url, + TaskWebhookEvent( + status=JOB_STATUS_FAILED, + progress_pct=0, + current_message=current_message, + error_message=error_message, + finished_at=_utc_now(), + ), + ) + + +def _handle_time_limit_exceeded(actor_name: str, job_id: uuid.UUID, webhook_url: str) -> None: + """Mark a timed-out actor as failed instead of leaving the job RUNNING.""" + logger.exception("%s timed out: %s", actor_name, job_id) + _send_failed_webhook_event( + webhook_url, + error_message=MESSAGE_TASK_TIMED_OUT, + current_message=MESSAGE_TASK_TIMED_OUT, + ) + + def _derive_event_type(event: TaskWebhookEvent) -> str: """Derive a job event type from a webhook event payload.""" if event.status is not None: @@ -328,7 +360,7 @@ def media_probe_actor(job_id: str, webhook_url: str, file_key: str) -> None: ) -@dramatiq.actor(max_retries=0) +@dramatiq.actor(max_retries=0, time_limit=MEDIA_TASK_TIME_LIMIT_MS) def silence_remove_actor( job_id: str, webhook_url: str, @@ -392,19 +424,15 @@ def silence_remove_actor( except JobCancelledError: logger.info("silence_remove_actor cancelled: %s", job_uuid) return + except TimeLimitExceeded: + _handle_time_limit_exceeded("silence_remove_actor", job_uuid, webhook_url) + return except Exception as exc: logger.exception("silence_remove_actor failed: %s", job_uuid) - _send_webhook_event( - webhook_url, - TaskWebhookEvent( - status=JOB_STATUS_FAILED, - error_message=str(exc), - finished_at=_utc_now(), - ), - ) + _send_failed_webhook_event(webhook_url, error_message=str(exc)) -@dramatiq.actor(max_retries=0) +@dramatiq.actor(max_retries=0, time_limit=MEDIA_TASK_TIME_LIMIT_MS) def silence_detect_actor( job_id: str, webhook_url: str, @@ -462,19 +490,15 @@ def silence_detect_actor( except JobCancelledError: logger.info("silence_detect_actor cancelled: %s", job_uuid) return + except TimeLimitExceeded: + _handle_time_limit_exceeded("silence_detect_actor", job_uuid, webhook_url) + return except Exception as exc: logger.exception("silence_detect_actor failed: %s", job_uuid) - _send_webhook_event( - webhook_url, - TaskWebhookEvent( - status=JOB_STATUS_FAILED, - error_message=str(exc), - finished_at=_utc_now(), - ), - ) + _send_failed_webhook_event(webhook_url, error_message=str(exc)) -@dramatiq.actor(max_retries=0) +@dramatiq.actor(max_retries=0, time_limit=MEDIA_TASK_TIME_LIMIT_MS) def silence_apply_actor( job_id: str, webhook_url: str, @@ -580,19 +604,15 @@ def silence_apply_actor( except JobCancelledError: logger.info("silence_apply_actor cancelled: %s", job_uuid) return + except TimeLimitExceeded: + _handle_time_limit_exceeded("silence_apply_actor", job_uuid, webhook_url) + return except Exception as exc: logger.exception("silence_apply_actor failed: %s", job_uuid) - _send_webhook_event( - webhook_url, - TaskWebhookEvent( - status=JOB_STATUS_FAILED, - error_message=str(exc), - finished_at=_utc_now(), - ), - ) + _send_failed_webhook_event(webhook_url, error_message=str(exc)) -@dramatiq.actor(max_retries=0) +@dramatiq.actor(max_retries=0, time_limit=MEDIA_TASK_TIME_LIMIT_MS) def media_convert_actor( job_id: str, webhook_url: str, @@ -698,16 +718,12 @@ def media_convert_actor( except JobCancelledError: logger.info("media_convert_actor cancelled: %s", job_uuid) return + except TimeLimitExceeded: + _handle_time_limit_exceeded("media_convert_actor", job_uuid, webhook_url) + return except Exception as exc: logger.exception("media_convert_actor failed: %s", job_uuid) - _send_webhook_event( - webhook_url, - TaskWebhookEvent( - status=JOB_STATUS_FAILED, - error_message=str(exc), - finished_at=_utc_now(), - ), - ) + _send_failed_webhook_event(webhook_url, error_message=str(exc)) @dramatiq.actor(max_retries=0) diff --git a/tests/unit/test_media_convert_tasks.py b/tests/unit/test_media_convert_tasks.py index 03ce9c7..4e6ee17 100644 --- a/tests/unit/test_media_convert_tasks.py +++ b/tests/unit/test_media_convert_tasks.py @@ -2,7 +2,8 @@ from __future__ import annotations import asyncio import uuid -from types import SimpleNamespace + +from dramatiq.middleware.time_limit import TimeLimitExceeded from cpv3.infrastructure.storage.types import FileInfo from cpv3.modules.media import service as media_service @@ -91,3 +92,39 @@ def test_media_convert_actor_emits_precise_progress_updates(monkeypatch) -> None (99.0, "Сохранение результата"), (100.0, "Завершено"), ] + + +def test_media_convert_actor_marks_job_failed_on_time_limit(monkeypatch) -> None: + sent_events: list[task_service.TaskWebhookEvent] = [] + + async def fake_convert_to_mp4( + _storage: object, + *, + file_key: str, + out_folder: str, + on_progress, + ) -> FileInfo: + _ = (file_key, out_folder, on_progress) + raise TimeLimitExceeded + + monkeypatch.setattr(media_service, "convert_to_mp4", fake_convert_to_mp4) + monkeypatch.setattr(task_service, "_run_async", asyncio.run) + monkeypatch.setattr(task_service, "_raise_if_job_cancelled", lambda _job_id: None) + monkeypatch.setattr(task_service, "_get_storage_service", lambda: object()) + monkeypatch.setattr( + task_service, + "_send_webhook_event", + lambda _url, event: sent_events.append(event), + ) + + task_service.media_convert_actor.fn( + job_id=str(uuid.uuid4()), + webhook_url="http://backend.test/api/tasks/webhook/job-1/", + file_key="uploads/source.mkv", + out_folder="projects/1", + output_format="mp4", + ) + + assert sent_events[-1].status == "FAILED" + assert sent_events[-1].current_message == "Задача превысила лимит времени" + assert sent_events[-1].error_message == "Задача превысила лимит времени"