Files
Daniil 259d3da89f rev 4
2026-04-07 13:42:45 +03:00

90 lines
3.3 KiB
Python

from __future__ import annotations
import uuid
from fastapi import APIRouter, Depends, HTTPException, Response, status
from sqlalchemy.ext.asyncio import AsyncSession
from cpv3.infrastructure.auth import get_current_user
from cpv3.db.session import get_db
from cpv3.modules.users.models import User
from cpv3.modules.webhooks.schemas import WebhookCreate, WebhookRead, WebhookUpdate
from cpv3.modules.webhooks.service import WebhookService
# Router collecting the webhook endpoints; its routes share the
# "/api/webhooks" URL prefix and the "Webhooks" tag.
router = APIRouter(prefix="/api/webhooks", tags=["Webhooks"])
@router.get("/", response_model=list[WebhookRead])
async def list_all_webhooks(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> list[WebhookRead]:
    """List webhooks for the authenticated user.

    The caller is passed to the webhook service as the requester, and every
    item the service returns is converted to a ``WebhookRead`` schema.
    """
    found = await WebhookService(db).list_webhooks(requester=current_user)
    return list(map(WebhookRead.model_validate, found))
@router.post("/", response_model=WebhookRead, status_code=status.HTTP_201_CREATED)
async def create_webhook_endpoint(
    body: WebhookCreate,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> WebhookRead:
    """Create a webhook from the request body on behalf of the caller.

    Responds with 201 and the created webhook as a ``WebhookRead`` schema.
    """
    created = await WebhookService(db).create_webhook(
        requester=current_user,
        data=body,
    )
    return WebhookRead.model_validate(created)
@router.get("/{webhook_id}/", response_model=WebhookRead)
async def retrieve_webhook_endpoint(
    webhook_id: uuid.UUID,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> WebhookRead:
    """Return a single webhook by its identifier.

    Responds with 404 when no webhook has the given id, and with 403 when
    the caller is neither staff nor the user the webhook belongs to.
    """
    webhooks = WebhookService(db)
    found = await webhooks.get_webhook(webhook_id)
    if found is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Не найдено")

    # Staff are checked first so the ownership comparison is skipped for them.
    may_access = current_user.is_staff or found.user_id == current_user.id
    if not may_access:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Доступ запрещён")

    return WebhookRead.model_validate(found)
@router.patch("/{webhook_id}/", response_model=WebhookRead)
async def patch_webhook_endpoint(
    webhook_id: uuid.UUID,
    body: WebhookUpdate,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> WebhookRead:
    """Apply the request body as an update to an existing webhook.

    Responds with 404 when no webhook has the given id, and with 403 when
    the caller is neither staff nor the user the webhook belongs to.
    On success the updated webhook is returned.
    """
    webhooks = WebhookService(db)
    existing = await webhooks.get_webhook(webhook_id)
    if existing is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Не найдено")

    # Staff are checked first so the ownership comparison is skipped for them.
    may_modify = current_user.is_staff or existing.user_id == current_user.id
    if not may_modify:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Доступ запрещён")

    updated = await webhooks.update_webhook(existing, body)
    return WebhookRead.model_validate(updated)
@router.delete("/{webhook_id}/", status_code=status.HTTP_204_NO_CONTENT)
async def delete_webhook_endpoint(
    webhook_id: uuid.UUID,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> Response:
    """Deactivate a webhook and respond with an empty 204.

    Responds with 404 when no webhook has the given id, and with 403 when
    the caller is neither staff nor the user the webhook belongs to.
    """
    webhooks = WebhookService(db)
    target = await webhooks.get_webhook(webhook_id)
    if target is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Не найдено")

    # Staff are checked first so the ownership comparison is skipped for them.
    may_remove = current_user.is_staff or target.user_id == current_user.id
    if not may_remove:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Доступ запрещён")

    # The service is asked to deactivate the webhook rather than delete it.
    await webhooks.deactivate_webhook(target)
    return Response(status_code=status.HTTP_204_NO_CONTENT)