init: new structure + fix lint errors
This commit is contained in:
@@ -0,0 +1,55 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Any, Literal
|
||||
|
||||
import jwt
|
||||
from passlib.context import CryptContext # type: ignore[import-untyped]
|
||||
|
||||
from cpv3.infrastructure.settings import get_settings
|
||||
|
||||
# Use bcrypt_sha256 to lift the 72-byte password limit while still verifying legacy bcrypt hashes.
|
||||
pwd_context = CryptContext(schemes=["bcrypt_sha256", "bcrypt"], deprecated="auto")
|
||||
|
||||
|
||||
def hash_password(password: str) -> str:
|
||||
return pwd_context.hash(password)
|
||||
|
||||
|
||||
def verify_password(password: str, password_hash: str) -> bool:
|
||||
return pwd_context.verify(password, password_hash)
|
||||
|
||||
|
||||
def utcnow() -> datetime:
|
||||
return datetime.now(timezone.utc)
|
||||
|
||||
|
||||
def create_token(
|
||||
*,
|
||||
subject: str,
|
||||
token_type: Literal["access", "refresh"],
|
||||
expires_in: timedelta,
|
||||
extra: dict[str, Any] | None = None,
|
||||
) -> str:
|
||||
settings = get_settings()
|
||||
|
||||
now = utcnow()
|
||||
payload: dict[str, Any] = {
|
||||
"sub": subject,
|
||||
"type": token_type,
|
||||
"iat": int(now.timestamp()),
|
||||
"exp": int((now + expires_in).timestamp()),
|
||||
}
|
||||
if extra:
|
||||
payload.update(extra)
|
||||
|
||||
return jwt.encode(
|
||||
payload, settings.jwt_secret_key, algorithm=settings.jwt_algorithm
|
||||
)
|
||||
|
||||
|
||||
def decode_token(token: str) -> dict[str, Any]:
|
||||
settings = get_settings()
|
||||
return jwt.decode(
|
||||
token, settings.jwt_secret_key, algorithms=[settings.jwt_algorithm]
|
||||
)
|
||||
Reference in New Issue
Block a user