from __future__ import annotations import uuid from sqlalchemy import Select, select from sqlalchemy.ext.asyncio import AsyncSession from cpv3.modules.users.models import User from cpv3.modules.webhooks.models import Webhook from cpv3.modules.webhooks.schemas import WebhookCreate, WebhookUpdate class WebhookRepository: """Repository for Webhook database operations.""" def __init__(self, session: AsyncSession) -> None: self._session = session async def list_all(self, *, requester: User) -> list[Webhook]: stmt: Select[tuple[Webhook]] = select(Webhook).where( Webhook.is_active.is_(True) ) if not requester.is_staff: stmt = stmt.where(Webhook.user_id == requester.id) result = await self._session.execute(stmt.order_by(Webhook.created_at.desc())) return list(result.scalars().all()) async def get_by_id(self, webhook_id: uuid.UUID) -> Webhook | None: result = await self._session.execute( select(Webhook) .where(Webhook.id == webhook_id) .where(Webhook.is_active.is_(True)) ) return result.scalar_one_or_none() async def create(self, *, requester: User, data: WebhookCreate) -> Webhook: webhook = Webhook( user_id=requester.id, project_id=data.project_id, event=data.event, url=data.url, secret=data.secret, is_active=data.is_active, ) self._session.add(webhook) await self._session.commit() await self._session.refresh(webhook) return webhook async def update(self, webhook: Webhook, data: WebhookUpdate) -> Webhook: for key, value in data.model_dump(exclude_unset=True).items(): if value is not None: setattr(webhook, key, value) await self._session.commit() await self._session.refresh(webhook) return webhook async def deactivate(self, webhook: Webhook) -> None: webhook.is_active = False await self._session.commit()