This commit is contained in:
@@ -17,6 +17,7 @@ from typing import Any
|
||||
import dramatiq # type: ignore[import-untyped]
|
||||
import httpx
|
||||
from dramatiq.brokers.redis import RedisBroker # type: ignore[import-untyped]
|
||||
from dramatiq.middleware.time_limit import TimeLimitExceeded
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
|
||||
@@ -103,6 +104,7 @@ MESSAGE_UPLOADING_RESULT = "Загрузка результата"
|
||||
MESSAGE_SAVING_RESULT = "Сохранение результата"
|
||||
MESSAGE_RENDERING_CAPTIONS = "Рендеринг субтитров"
|
||||
MESSAGE_CANCELLED = "Отменено пользователем"
|
||||
MESSAGE_TASK_TIMED_OUT = "Задача превысила лимит времени"
|
||||
MESSAGE_EXTRACTING_FRAMES = "Извлечение кадров"
|
||||
MESSAGE_UPLOADING_FRAMES = "Загрузка кадров"
|
||||
MESSAGE_DELETING_OLD_FRAMES = "Удаление старых кадров"
|
||||
@@ -136,6 +138,7 @@ PROGRESS_CONVERT_THROTTLE_SECONDS = 1.0
|
||||
|
||||
ACTIVE_JOB_STATUSES = (JOB_STATUS_PENDING, JOB_STATUS_RUNNING)
|
||||
DRAMATIQ_BROKER_REF_SEPARATOR = ":"
|
||||
MEDIA_TASK_TIME_LIMIT_MS = 60 * 60 * 1000
|
||||
|
||||
|
||||
class JobCancelledError(RuntimeError):
|
||||
@@ -196,6 +199,35 @@ def _send_webhook_event(webhook_url: str, event: TaskWebhookEvent) -> None:
|
||||
raise
|
||||
|
||||
|
||||
def _send_failed_webhook_event(
|
||||
webhook_url: str,
|
||||
*,
|
||||
error_message: str,
|
||||
current_message: str | None = None,
|
||||
) -> None:
|
||||
"""Send a FAILED event with a user-facing message."""
|
||||
_send_webhook_event(
|
||||
webhook_url,
|
||||
TaskWebhookEvent(
|
||||
status=JOB_STATUS_FAILED,
|
||||
progress_pct=0,
|
||||
current_message=current_message,
|
||||
error_message=error_message,
|
||||
finished_at=_utc_now(),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def _handle_time_limit_exceeded(actor_name: str, job_id: uuid.UUID, webhook_url: str) -> None:
|
||||
"""Mark a timed-out actor as failed instead of leaving the job RUNNING."""
|
||||
logger.exception("%s timed out: %s", actor_name, job_id)
|
||||
_send_failed_webhook_event(
|
||||
webhook_url,
|
||||
error_message=MESSAGE_TASK_TIMED_OUT,
|
||||
current_message=MESSAGE_TASK_TIMED_OUT,
|
||||
)
|
||||
|
||||
|
||||
def _derive_event_type(event: TaskWebhookEvent) -> str:
|
||||
"""Derive a job event type from a webhook event payload."""
|
||||
if event.status is not None:
|
||||
@@ -328,7 +360,7 @@ def media_probe_actor(job_id: str, webhook_url: str, file_key: str) -> None:
|
||||
)
|
||||
|
||||
|
||||
@dramatiq.actor(max_retries=0)
|
||||
@dramatiq.actor(max_retries=0, time_limit=MEDIA_TASK_TIME_LIMIT_MS)
|
||||
def silence_remove_actor(
|
||||
job_id: str,
|
||||
webhook_url: str,
|
||||
@@ -392,19 +424,15 @@ def silence_remove_actor(
|
||||
except JobCancelledError:
|
||||
logger.info("silence_remove_actor cancelled: %s", job_uuid)
|
||||
return
|
||||
except TimeLimitExceeded:
|
||||
_handle_time_limit_exceeded("silence_remove_actor", job_uuid, webhook_url)
|
||||
return
|
||||
except Exception as exc:
|
||||
logger.exception("silence_remove_actor failed: %s", job_uuid)
|
||||
_send_webhook_event(
|
||||
webhook_url,
|
||||
TaskWebhookEvent(
|
||||
status=JOB_STATUS_FAILED,
|
||||
error_message=str(exc),
|
||||
finished_at=_utc_now(),
|
||||
),
|
||||
)
|
||||
_send_failed_webhook_event(webhook_url, error_message=str(exc))
|
||||
|
||||
|
||||
@dramatiq.actor(max_retries=0)
|
||||
@dramatiq.actor(max_retries=0, time_limit=MEDIA_TASK_TIME_LIMIT_MS)
|
||||
def silence_detect_actor(
|
||||
job_id: str,
|
||||
webhook_url: str,
|
||||
@@ -462,19 +490,15 @@ def silence_detect_actor(
|
||||
except JobCancelledError:
|
||||
logger.info("silence_detect_actor cancelled: %s", job_uuid)
|
||||
return
|
||||
except TimeLimitExceeded:
|
||||
_handle_time_limit_exceeded("silence_detect_actor", job_uuid, webhook_url)
|
||||
return
|
||||
except Exception as exc:
|
||||
logger.exception("silence_detect_actor failed: %s", job_uuid)
|
||||
_send_webhook_event(
|
||||
webhook_url,
|
||||
TaskWebhookEvent(
|
||||
status=JOB_STATUS_FAILED,
|
||||
error_message=str(exc),
|
||||
finished_at=_utc_now(),
|
||||
),
|
||||
)
|
||||
_send_failed_webhook_event(webhook_url, error_message=str(exc))
|
||||
|
||||
|
||||
@dramatiq.actor(max_retries=0)
|
||||
@dramatiq.actor(max_retries=0, time_limit=MEDIA_TASK_TIME_LIMIT_MS)
|
||||
def silence_apply_actor(
|
||||
job_id: str,
|
||||
webhook_url: str,
|
||||
@@ -580,19 +604,15 @@ def silence_apply_actor(
|
||||
except JobCancelledError:
|
||||
logger.info("silence_apply_actor cancelled: %s", job_uuid)
|
||||
return
|
||||
except TimeLimitExceeded:
|
||||
_handle_time_limit_exceeded("silence_apply_actor", job_uuid, webhook_url)
|
||||
return
|
||||
except Exception as exc:
|
||||
logger.exception("silence_apply_actor failed: %s", job_uuid)
|
||||
_send_webhook_event(
|
||||
webhook_url,
|
||||
TaskWebhookEvent(
|
||||
status=JOB_STATUS_FAILED,
|
||||
error_message=str(exc),
|
||||
finished_at=_utc_now(),
|
||||
),
|
||||
)
|
||||
_send_failed_webhook_event(webhook_url, error_message=str(exc))
|
||||
|
||||
|
||||
@dramatiq.actor(max_retries=0)
|
||||
@dramatiq.actor(max_retries=0, time_limit=MEDIA_TASK_TIME_LIMIT_MS)
|
||||
def media_convert_actor(
|
||||
job_id: str,
|
||||
webhook_url: str,
|
||||
@@ -698,16 +718,12 @@ def media_convert_actor(
|
||||
except JobCancelledError:
|
||||
logger.info("media_convert_actor cancelled: %s", job_uuid)
|
||||
return
|
||||
except TimeLimitExceeded:
|
||||
_handle_time_limit_exceeded("media_convert_actor", job_uuid, webhook_url)
|
||||
return
|
||||
except Exception as exc:
|
||||
logger.exception("media_convert_actor failed: %s", job_uuid)
|
||||
_send_webhook_event(
|
||||
webhook_url,
|
||||
TaskWebhookEvent(
|
||||
status=JOB_STATUS_FAILED,
|
||||
error_message=str(exc),
|
||||
finished_at=_utc_now(),
|
||||
),
|
||||
)
|
||||
_send_failed_webhook_event(webhook_url, error_message=str(exc))
|
||||
|
||||
|
||||
@dramatiq.actor(max_retries=0)
|
||||
|
||||
@@ -2,7 +2,8 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import uuid
|
||||
from types import SimpleNamespace
|
||||
|
||||
from dramatiq.middleware.time_limit import TimeLimitExceeded
|
||||
|
||||
from cpv3.infrastructure.storage.types import FileInfo
|
||||
from cpv3.modules.media import service as media_service
|
||||
@@ -91,3 +92,39 @@ def test_media_convert_actor_emits_precise_progress_updates(monkeypatch) -> None
|
||||
(99.0, "Сохранение результата"),
|
||||
(100.0, "Завершено"),
|
||||
]
|
||||
|
||||
|
||||
def test_media_convert_actor_marks_job_failed_on_time_limit(monkeypatch) -> None:
|
||||
sent_events: list[task_service.TaskWebhookEvent] = []
|
||||
|
||||
async def fake_convert_to_mp4(
|
||||
_storage: object,
|
||||
*,
|
||||
file_key: str,
|
||||
out_folder: str,
|
||||
on_progress,
|
||||
) -> FileInfo:
|
||||
_ = (file_key, out_folder, on_progress)
|
||||
raise TimeLimitExceeded
|
||||
|
||||
monkeypatch.setattr(media_service, "convert_to_mp4", fake_convert_to_mp4)
|
||||
monkeypatch.setattr(task_service, "_run_async", asyncio.run)
|
||||
monkeypatch.setattr(task_service, "_raise_if_job_cancelled", lambda _job_id: None)
|
||||
monkeypatch.setattr(task_service, "_get_storage_service", lambda: object())
|
||||
monkeypatch.setattr(
|
||||
task_service,
|
||||
"_send_webhook_event",
|
||||
lambda _url, event: sent_events.append(event),
|
||||
)
|
||||
|
||||
task_service.media_convert_actor.fn(
|
||||
job_id=str(uuid.uuid4()),
|
||||
webhook_url="http://backend.test/api/tasks/webhook/job-1/",
|
||||
file_key="uploads/source.mkv",
|
||||
out_folder="projects/1",
|
||||
output_format="mp4",
|
||||
)
|
||||
|
||||
assert sent_events[-1].status == "FAILED"
|
||||
assert sent_events[-1].current_message == "Задача превысила лимит времени"
|
||||
assert sent_events[-1].error_message == "Задача превысила лимит времени"
|
||||
|
||||
Reference in New Issue
Block a user