from __future__ import annotations import uuid from types import SimpleNamespace from unittest.mock import AsyncMock import pytest from cpv3.modules.tasks.schemas import CaptionsGenerateRequest, TaskWebhookEvent from cpv3.modules.tasks.service import TaskService @pytest.mark.asyncio async def test_submit_captions_generate_reuses_existing_active_job() -> None: service = TaskService(session=AsyncMock()) existing_job_id = uuid.uuid4() existing_job = SimpleNamespace( id=existing_job_id, status="RUNNING", ) service._find_duplicate_active_job = AsyncMock(return_value=existing_job) service._submit_task = AsyncMock() response = await service.submit_captions_generate( requester=SimpleNamespace(id=uuid.uuid4()), request=CaptionsGenerateRequest( video_s3_path="projects/test/video.mp4", folder="output_files", transcription_id=uuid.uuid4(), project_id=uuid.uuid4(), preset_id=uuid.uuid4(), ), ) assert response.job_id == existing_job_id assert response.status == "RUNNING" assert response.webhook_url.endswith(f"/api/tasks/webhook/{existing_job_id}/") service._submit_task.assert_not_awaited() @pytest.mark.asyncio async def test_record_webhook_event_ignores_cancelled_job() -> None: cancelled_job = SimpleNamespace( id=uuid.uuid4(), status="CANCELLED", ) job_repo = SimpleNamespace( get_by_id=AsyncMock(return_value=cancelled_job), update=AsyncMock(), ) event_repo = SimpleNamespace(create=AsyncMock()) service = TaskService(session=AsyncMock()) service._job_repo = job_repo service._event_repo = event_repo result = await service.record_webhook_event( job_id=cancelled_job.id, event=TaskWebhookEvent( status="DONE", current_message="Готово", output_data={"output_path": "projects/test/output.mp4"}, ), ) assert result is cancelled_job job_repo.update.assert_not_awaited() event_repo.create.assert_not_awaited() @pytest.mark.asyncio async def test_cancel_job_marks_job_cancelled_and_keeps_record() -> None: job_id = uuid.uuid4() user_id = uuid.uuid4() job = SimpleNamespace( id=job_id, status="PENDING", broker_id="default:redis-message-id", job_type="CAPTIONS_GENERATE", user_id=user_id, ) cancelled_job = SimpleNamespace( id=job_id, status="CANCELLED", broker_id="default:redis-message-id", job_type="CAPTIONS_GENERATE", user_id=user_id, current_message="Отменено пользователем", ) service = TaskService(session=AsyncMock()) service._job_repo = SimpleNamespace(update=AsyncMock(return_value=cancelled_job)) service._event_repo = SimpleNamespace(create=AsyncMock()) service._cancel_dramatiq_message = AsyncMock() service._cancel_caption_render = AsyncMock() service._create_cancellation_notification = AsyncMock() service._sync_project_workspace_after_webhook = AsyncMock() result = await service.cancel_job(job) assert result is cancelled_job service._job_repo.update.assert_awaited_once() service._event_repo.create.assert_awaited_once() service._cancel_dramatiq_message.assert_awaited_once_with(job.broker_id) service._cancel_caption_render.assert_awaited_once_with(job) service._create_cancellation_notification.assert_awaited_once_with(cancelled_job) service._sync_project_workspace_after_webhook.assert_awaited_once_with(cancelled_job) @pytest.mark.asyncio async def test_record_webhook_event_updates_progress_for_conversion_job() -> None: job = SimpleNamespace( id=uuid.uuid4(), status="RUNNING", job_type="MEDIA_CONVERT", project_id=uuid.uuid4(), user_id=None, ) updated_job = SimpleNamespace(**job.__dict__, project_pct=52.5) job_repo = SimpleNamespace( get_by_id=AsyncMock(return_value=job), update=AsyncMock(return_value=updated_job), ) event_repo = SimpleNamespace(create=AsyncMock()) service = TaskService(session=AsyncMock()) service._job_repo = job_repo service._event_repo = event_repo service._sync_project_workspace_after_webhook = AsyncMock() result = await service.record_webhook_event( job_id=job.id, event=TaskWebhookEvent( progress_pct=52.5, current_message="Конвертация видео", ), ) update_call = job_repo.update.await_args.args[1] event_call = event_repo.create.await_args.args[0] assert result is updated_job assert update_call.project_pct == 52.5 assert update_call.current_message == "Конвертация видео" assert event_call.event_type == "progress" assert event_call.payload["progress_pct"] == 52.5 @pytest.mark.asyncio async def test_record_webhook_event_syncs_workspace_for_completed_supported_job() -> None: job = SimpleNamespace( id=uuid.uuid4(), status="RUNNING", job_type="SILENCE_DETECT", user_id=None, project_id=uuid.uuid4(), output_data={"silent_segments": []}, ) updated_job = SimpleNamespace(**{**job.__dict__, "status": "DONE"}) job_repo = SimpleNamespace( get_by_id=AsyncMock(return_value=job), update=AsyncMock(return_value=updated_job), ) event_repo = SimpleNamespace(create=AsyncMock()) service = TaskService(session=AsyncMock()) service._job_repo = job_repo service._event_repo = event_repo service._sync_project_workspace_after_webhook = AsyncMock() result = await service.record_webhook_event( job_id=job.id, event=TaskWebhookEvent( status="DONE", current_message="Готово", output_data={"silent_segments": []}, ), ) assert result is updated_job service._sync_project_workspace_after_webhook.assert_awaited_once_with(updated_job) @pytest.mark.asyncio async def test_record_webhook_event_projects_workspace_after_done_job() -> None: job = SimpleNamespace( id=uuid.uuid4(), status="RUNNING", job_type="MEDIA_CONVERT", project_id=uuid.uuid4(), user_id=None, output_data={"file_path": "users/test/converted.mp4"}, ) updated_job = SimpleNamespace( **{ **job.__dict__, "status": "DONE", "output_data": { "file_path": "users/test/converted.mp4", "file_id": "00000000-0000-4000-a000-000000000777", }, } ) job_repo = SimpleNamespace( get_by_id=AsyncMock(return_value=job), update=AsyncMock(return_value=updated_job), ) event_repo = SimpleNamespace(create=AsyncMock()) workspace_service = SimpleNamespace(handle_job_update=AsyncMock()) service = TaskService(session=AsyncMock()) service._job_repo = job_repo service._event_repo = event_repo service._save_convert_artifacts = AsyncMock(return_value=updated_job) service._get_project_workspace_service = lambda: workspace_service result = await service.record_webhook_event( job_id=job.id, event=TaskWebhookEvent( status="DONE", current_message="Готово", output_data={"file_path": "users/test/converted.mp4"}, ), ) assert result is updated_job service._save_convert_artifacts.assert_awaited_once_with(updated_job) workspace_service.handle_job_update.assert_awaited_once_with(job=updated_job)