from __future__ import annotations import uuid from sqlalchemy.ext.asyncio import AsyncSession from cpv3.modules.webhooks.models import Webhook from cpv3.modules.webhooks.repository import WebhookRepository from cpv3.modules.webhooks.schemas import WebhookCreate, WebhookUpdate from cpv3.modules.users.models import User class WebhookService: """Service for webhook business logic and orchestration.""" def __init__(self, session: AsyncSession) -> None: self._repo = WebhookRepository(session) async def list_webhooks(self, *, requester: User) -> list[Webhook]: return await self._repo.list_all(requester=requester) async def get_webhook(self, webhook_id: uuid.UUID) -> Webhook | None: return await self._repo.get_by_id(webhook_id) async def create_webhook(self, *, requester: User, data: WebhookCreate) -> Webhook: return await self._repo.create(requester=requester, data=data) async def update_webhook(self, webhook: Webhook, data: WebhookUpdate) -> Webhook: return await self._repo.update(webhook, data) async def deactivate_webhook(self, webhook: Webhook) -> None: await self._repo.deactivate(webhook)