"""Tests for ffmpeg progress parsing and for progress webhooks sent by task actors."""

from __future__ import annotations

import asyncio
from itertools import count
from types import SimpleNamespace
import uuid

from cpv3.modules.media import service as media_service
from cpv3.modules.tasks import service as task_service


def _install_task_service_fakes(monkeypatch) -> list[task_service.TaskWebhookEvent]:
    """Replace the task-service collaborators with in-memory fakes.

    Patches the async runner, the cancellation check, the storage factory,
    the webhook sender and the monotonic clock on ``task_service``.

    Returns:
        The list that every event passed to the patched webhook sender is
        appended to, in call order.
    """
    recorded: list[task_service.TaskWebhookEvent] = []
    # Each clock read yields 0.0, 2.0, 4.0, ... so time always moves forward.
    clock_ticks = map(float, count(step=2))

    def record_event(_url, event) -> None:
        recorded.append(event)

    monkeypatch.setattr(task_service, "_run_async", asyncio.run)
    monkeypatch.setattr(task_service, "_raise_if_job_cancelled", lambda _job_id: None)
    monkeypatch.setattr(task_service, "_get_storage_service", lambda: object())
    monkeypatch.setattr(task_service, "_send_webhook_event", record_event)
    monkeypatch.setattr(task_service.time, "monotonic", lambda: next(clock_ticks))
    return recorded


def test_parse_ffmpeg_progress_time_seconds_from_timecode() -> None:
    """A well-formed ``out_time`` timecode is converted to seconds."""
    snapshot = {
        "out_time": "00:00:12.500000",
        "progress": "continue",
    }

    seconds = media_service._get_ffmpeg_output_time_seconds(snapshot)

    assert seconds == 12.5


def test_parse_ffmpeg_progress_time_seconds_returns_none_for_invalid_snapshot() -> None:
    """An ``out_time`` value that is not a timecode produces ``None``."""
    snapshot = {
        "out_time": "not-a-timecode",
        "progress": "continue",
    }

    seconds = media_service._get_ffmpeg_output_time_seconds(snapshot)

    assert seconds is None


def test_media_convert_actor_emits_intermediate_progress_events(monkeypatch) -> None:
    """The convert actor reports the expected progress percentages and messages."""
    sent_events = _install_task_service_fakes(monkeypatch)

    async def fake_convert_to_mp4(
        _storage: object,
        *,
        file_key: str,
        out_folder: str,
        on_progress: object | None = None,
    ) -> SimpleNamespace:
        assert file_key == "uploads/source.mov"
        assert out_folder == "projects/1"
        assert callable(on_progress)

        # Replay a fixed sequence of stage reports through the callback.
        for stage, pct in (
            ("converting", 0.0),
            ("converting", 50.0),
            ("converting", 100.0),
            ("uploading", None),
        ):
            on_progress(stage, pct)

        return SimpleNamespace(
            file_path="projects/1/converted/video.mp4",
            file_url="https://example.com/video.mp4",
            file_size=123,
        )

    monkeypatch.setattr(media_service, "convert_to_mp4", fake_convert_to_mp4)

    task_service.media_convert_actor.fn(
        job_id=str(uuid.uuid4()),
        webhook_url="http://backend.test/api/tasks/webhook/job-1/",
        file_key="uploads/source.mov",
        out_folder="projects/1",
        output_format="mp4",
    )

    progress_events = [event for event in sent_events if event.progress_pct is not None]
    reported_percents = [event.progress_pct for event in progress_events]
    reported_messages = [event.current_message for event in progress_events]

    assert reported_percents == [
        5.0,
        10.0,
        52.5,
        95.0,
        99.0,
        100.0,
    ]
    assert reported_messages == [
        "Подготовка файла",
        "Конвертация видео",
        "Конвертация видео",
        "Загрузка результата",
        "Сохранение результата",
        "Завершено",
    ]

    final_event = sent_events[-1]
    assert final_event.status == task_service.JOB_STATUS_DONE
    assert final_event.output_data == {
        "file_path": "projects/1/converted/video.mp4",
        "file_url": "https://example.com/video.mp4",
        "file_size": 123,
    }


def test_silence_apply_actor_emits_intermediate_progress_events(monkeypatch) -> None:
    """The silence-apply actor reports the expected progress percentages and messages."""
    sent_events = _install_task_service_fakes(monkeypatch)

    async def fake_apply_silence_cuts(
        _storage: object,
        *,
        file_key: str,
        out_folder: str,
        cuts: list[dict],
        output_name: str | None = None,
        on_progress: object | None = None,
    ) -> SimpleNamespace:
        assert file_key == "uploads/source.mp4"
        assert out_folder == "projects/1"
        assert cuts == [{"start_ms": 100, "end_ms": 200}]
        assert output_name == "edited.mp4"
        assert callable(on_progress)

        # Replay a fixed sequence of stage reports through the callback.
        for stage, pct in (
            ("applying_cuts", 0.0),
            ("applying_cuts", 50.0),
            ("applying_cuts", 100.0),
            ("uploading", None),
        ):
            on_progress(stage, pct)

        return SimpleNamespace(
            file_path="projects/1/silent/edited.mp4",
            file_url="https://example.com/edited.mp4",
            file_size=456,
        )

    monkeypatch.setattr(media_service, "apply_silence_cuts", fake_apply_silence_cuts)

    task_service.silence_apply_actor.fn(
        job_id=str(uuid.uuid4()),
        webhook_url="http://backend.test/api/tasks/webhook/job-2/",
        file_key="uploads/source.mp4",
        out_folder="projects/1",
        cuts=[{"start_ms": 100, "end_ms": 200}],
        output_name="edited.mp4",
    )

    progress_events = [event for event in sent_events if event.progress_pct is not None]
    reported_percents = [event.progress_pct for event in progress_events]
    reported_messages = [event.current_message for event in progress_events]

    assert reported_percents == [
        5.0,
        10.0,
        52.5,
        95.0,
        99.0,
        100.0,
    ]
    assert reported_messages == [
        "Подготовка файла",
        "Применение вырезок",
        "Применение вырезок",
        "Загрузка результата",
        "Сохранение результата",
        "Завершено",
    ]

    final_event = sent_events[-1]
    assert final_event.status == task_service.JOB_STATUS_DONE
    assert final_event.output_data == {
        "file_path": "projects/1/silent/edited.mp4",
        "file_url": "https://example.com/edited.mp4",
        "file_size": 456,
    }