56 lines
1.4 KiB
Python
56 lines
1.4 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any, Literal
|
|
|
|
import jwt
|
|
from passlib.context import CryptContext # type: ignore[import-untyped]
|
|
|
|
from cpv3.infrastructure.settings import get_settings
|
|
|
|
# Use bcrypt_sha256 to lift the 72-byte password limit while still verifying legacy bcrypt hashes.
|
|
pwd_context = CryptContext(schemes=["bcrypt_sha256", "bcrypt"], deprecated="auto")
|
|
|
|
|
|
def hash_password(password: str) -> str:
|
|
return pwd_context.hash(password)
|
|
|
|
|
|
def verify_password(password: str, password_hash: str) -> bool:
|
|
return pwd_context.verify(password, password_hash)
|
|
|
|
|
|
def utcnow() -> datetime:
|
|
return datetime.now(timezone.utc)
|
|
|
|
|
|
def create_token(
|
|
*,
|
|
subject: str,
|
|
token_type: Literal["access", "refresh"],
|
|
expires_in: timedelta,
|
|
extra: dict[str, Any] | None = None,
|
|
) -> str:
|
|
settings = get_settings()
|
|
|
|
now = utcnow()
|
|
payload: dict[str, Any] = {
|
|
"sub": subject,
|
|
"type": token_type,
|
|
"iat": int(now.timestamp()),
|
|
"exp": int((now + expires_in).timestamp()),
|
|
}
|
|
if extra:
|
|
payload.update(extra)
|
|
|
|
return jwt.encode(
|
|
payload, settings.jwt_secret_key, algorithm=settings.jwt_algorithm
|
|
)
|
|
|
|
|
|
def decode_token(token: str) -> dict[str, Any]:
|
|
settings = get_settings()
|
|
return jwt.decode(
|
|
token, settings.jwt_secret_key, algorithms=[settings.jwt_algorithm]
|
|
)
|