90 lines
3.3 KiB
Python
90 lines
3.3 KiB
Python
from __future__ import annotations
|
|
|
|
import uuid
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Response, status
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from cpv3.infrastructure.auth import get_current_user
|
|
from cpv3.db.session import get_db
|
|
from cpv3.modules.users.models import User
|
|
from cpv3.modules.webhooks.schemas import WebhookCreate, WebhookRead, WebhookUpdate
|
|
from cpv3.modules.webhooks.service import WebhookService
|
|
|
|
# Router for the webhook endpoints: every route registered on it is served
# under the "/api/webhooks" prefix and carries the "Webhooks" tag.
router = APIRouter(prefix="/api/webhooks", tags=["Webhooks"])
|
|
|
|
|
|
# GET /api/webhooks/ -- list webhooks for the authenticated caller.
# No filtering is done here: the requester is handed to WebhookService and
# whatever it returns is serialized as WebhookRead.
@router.get("/", response_model=list[WebhookRead])
async def list_all_webhooks(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> list[WebhookRead]:
    records = await WebhookService(db).list_webhooks(requester=current_user)
    return list(map(WebhookRead.model_validate, records))
|
|
|
|
|
|
# POST /api/webhooks/ -- create a webhook from the request body on behalf of
# the authenticated caller; responds with 201 and the new record as WebhookRead.
@router.post("/", response_model=WebhookRead, status_code=status.HTTP_201_CREATED)
async def create_webhook_endpoint(
    body: WebhookCreate,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> WebhookRead:
    created = await WebhookService(db).create_webhook(
        requester=current_user,
        data=body,
    )
    return WebhookRead.model_validate(created)
|
|
|
|
|
|
# GET /api/webhooks/{webhook_id}/ -- fetch a single webhook.
# Responds 404 when the service finds nothing for the id, and 403 when the
# caller is neither staff nor the webhook's owner.
@router.get("/{webhook_id}/", response_model=WebhookRead)
async def retrieve_webhook_endpoint(
    webhook_id: uuid.UUID,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> WebhookRead:
    record = await WebhookService(db).get_webhook(webhook_id)
    if record is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Не найдено")

    # Staff may access any webhook; other users only the ones they own.
    may_access = current_user.is_staff or record.user_id == current_user.id
    if not may_access:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Доступ запрещён")

    return WebhookRead.model_validate(record)
|
|
|
|
|
|
# PATCH /api/webhooks/{webhook_id}/ -- apply the update payload to a webhook.
# Responds 404 when the service finds nothing for the id, and 403 when the
# caller is neither staff nor the webhook's owner; otherwise returns the
# record produced by the service's update call.
@router.patch("/{webhook_id}/", response_model=WebhookRead)
async def patch_webhook_endpoint(
    webhook_id: uuid.UUID,
    body: WebhookUpdate,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> WebhookRead:
    webhooks = WebhookService(db)

    existing = await webhooks.get_webhook(webhook_id)
    if existing is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Не найдено")

    # Staff may modify any webhook; other users only the ones they own.
    may_modify = current_user.is_staff or existing.user_id == current_user.id
    if not may_modify:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Доступ запрещён")

    updated = await webhooks.update_webhook(existing, body)
    return WebhookRead.model_validate(updated)
|
|
|
|
|
|
# DELETE /api/webhooks/{webhook_id}/ -- hand the webhook to the service's
# deactivate_webhook call and answer with an empty 204.
# Responds 404 when the service finds nothing for the id, and 403 when the
# caller is neither staff nor the webhook's owner.
@router.delete("/{webhook_id}/", status_code=status.HTTP_204_NO_CONTENT)
async def delete_webhook_endpoint(
    webhook_id: uuid.UUID,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> Response:
    webhooks = WebhookService(db)

    target = await webhooks.get_webhook(webhook_id)
    if target is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Не найдено")

    # Staff may remove any webhook; other users only the ones they own.
    may_remove = current_user.is_staff or target.user_id == current_user.id
    if not may_remove:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Доступ запрещён")

    await webhooks.deactivate_webhook(target)
    return Response(status_code=status.HTTP_204_NO_CONTENT)
|