from __future__ import annotations

from datetime import datetime, timedelta, timezone
from typing import Any, Literal

import jwt
from passlib.context import CryptContext  # type: ignore[import-untyped]

from cpv3.infrastructure.settings import get_settings

# Use bcrypt_sha256 to lift the 72-byte password limit while still verifying
# legacy bcrypt hashes.
pwd_context = CryptContext(schemes=["bcrypt_sha256", "bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    """Return a hash of *password* produced by the module-level ``pwd_context``."""
    return pwd_context.hash(password)


def verify_password(password: str, password_hash: str) -> bool:
    """Return whether *password* matches the stored *password_hash*.

    The check is delegated to ``pwd_context.verify``, so any scheme listed in
    the context (including the legacy ``bcrypt`` one) is accepted.

    NOTE(review): passlib is expected to raise rather than return ``False``
    when *password_hash* is not in a recognised format -- confirm that callers
    handle this.
    """
    return pwd_context.verify(password, password_hash)


def utcnow() -> datetime:
    """Return the current time as a timezone-aware UTC ``datetime``."""
    return datetime.now(timezone.utc)


def create_token(
    *,
    subject: str,
    token_type: Literal["access", "refresh"],
    expires_in: timedelta,
    extra: dict[str, Any] | None = None,
) -> str:
    """Build and sign a JWT for *subject*.

    Args:
        subject: Value stored in the ``sub`` claim.
        token_type: Value stored in the ``type`` claim.
        expires_in: Lifetime of the token; ``exp`` is set to now + this delta.
        extra: Optional additional claims merged into the payload. Keys that
            collide with the claims managed here (``sub``, ``type``, ``iat``,
            ``exp``) are ignored so that callers cannot accidentally change a
            token's subject, kind or expiry.

    Returns:
        The encoded token, signed with the key and algorithm from settings.
    """
    settings = get_settings()
    now = utcnow()

    payload: dict[str, Any] = {
        "sub": subject,
        "type": token_type,
        # Timestamps are stored as whole seconds since the epoch.
        "iat": int(now.timestamp()),
        "exp": int((now + expires_in).timestamp()),
    }
    if extra:
        # Only add claims that do not clash with the ones set above; the
        # claims derived from the explicit arguments always win.
        payload.update(
            {key: value for key, value in extra.items() if key not in payload}
        )

    return jwt.encode(
        payload, settings.jwt_secret_key, algorithm=settings.jwt_algorithm
    )


def decode_token(token: str) -> dict[str, Any]:
    """Verify *token* and return its claims.

    Decoding uses the secret key from settings and accepts only the configured
    algorithm. Errors raised by ``jwt.decode`` (for example for a bad signature
    or an expired token) propagate to the caller. The ``type`` claim is not
    checked here; callers must validate it themselves.
    """
    settings = get_settings()
    return jwt.decode(
        token, settings.jwt_secret_key, algorithms=[settings.jwt_algorithm]
    )