33 lines
1.2 KiB
Python
33 lines
1.2 KiB
Python
from __future__ import annotations
|
|
|
|
import uuid
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from cpv3.modules.webhooks.models import Webhook
|
|
from cpv3.modules.webhooks.repository import WebhookRepository
|
|
from cpv3.modules.webhooks.schemas import WebhookCreate, WebhookUpdate
|
|
from cpv3.modules.users.models import User
|
|
|
|
|
|
class WebhookService:
|
|
"""Service for webhook business logic and orchestration."""
|
|
|
|
def __init__(self, session: AsyncSession) -> None:
|
|
self._repo = WebhookRepository(session)
|
|
|
|
async def list_webhooks(self, *, requester: User) -> list[Webhook]:
|
|
return await self._repo.list_all(requester=requester)
|
|
|
|
async def get_webhook(self, webhook_id: uuid.UUID) -> Webhook | None:
|
|
return await self._repo.get_by_id(webhook_id)
|
|
|
|
async def create_webhook(self, *, requester: User, data: WebhookCreate) -> Webhook:
|
|
return await self._repo.create(requester=requester, data=data)
|
|
|
|
async def update_webhook(self, webhook: Webhook, data: WebhookUpdate) -> Webhook:
|
|
return await self._repo.update(webhook, data)
|
|
|
|
async def deactivate_webhook(self, webhook: Webhook) -> None:
|
|
await self._repo.deactivate(webhook)
|