71 lines
2.5 KiB
Python
71 lines
2.5 KiB
Python
"""
|
|
auth.py:
|
|
Métodos y variables globales necesarias para la autenticación y auditoria.
|
|
Notas:
|
|
- El microservicio requiere que la aplicación que lo utilice comparta su SECRET_KEY
|
|
- El microservicio requiere que la aplicación que lo utilice implemente un campo "user_id" en su SECRET_KEY y una expiración: exp
|
|
* No se recomienda implementar dos sistemas que manipulen este token en simultaneo, puede provocar incongruencias en el módulo de auditoria. *
|
|
"""
|
|
|
|
import os
|
|
from fastapi import Depends, HTTPException
|
|
from fastapi.security import OAuth2PasswordBearer
|
|
from jose import JWTError, jwt
|
|
from datetime import datetime, timedelta
|
|
|
|
|
|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
|
SECRET_KEY = os.getenv("SECRET_KEY")
|
|
if not SECRET_KEY:
|
|
raise RuntimeError("SECRET_KEY no configurada. Abortando inicio.")
|
|
if len(SECRET_KEY) < 20:
|
|
raise RuntimeError("SECRET_KEY demasiado corta")
|
|
ALGORITHM = "HS256"
|
|
|
|
def get_current_user(token: str = Depends(oauth2_scheme)):
|
|
"""
|
|
Obtiene un ID de usuario del JWT para auditoria.
|
|
"""
|
|
try:
|
|
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
|
user_id = payload.get("user_id")
|
|
if not user_id:
|
|
raise HTTPException(status_code=401, detail="Token inválido")
|
|
return user_id # simplemente el id
|
|
except JWTError:
|
|
raise HTTPException(status_code=401, detail="Token inválido")
|
|
|
|
def get_system_token() -> str:
|
|
"""Genera o renueva el token de sistema para uso interno."""
|
|
payload = {
|
|
"user_id": "system",
|
|
"exp": datetime.utcnow() + timedelta(hours=24)
|
|
}
|
|
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
# Token global para reutilizar
|
|
_system_token: str = None
|
|
|
|
def get_internal_headers() -> dict:
|
|
"""Headers con token de sistema para llamadas internas."""
|
|
global _system_token
|
|
|
|
# Renovar si no existe o está por vencer
|
|
if _system_token is None or _token_expiring_soon(_system_token):
|
|
_system_token = get_system_token()
|
|
|
|
return {
|
|
"Content-Type": "application/json",
|
|
"Authorization": f"Bearer {_system_token}"
|
|
}
|
|
|
|
def _token_expiring_soon(token: str) -> bool:
|
|
"""
|
|
Verifica la expiración del token.
|
|
"""
|
|
try:
|
|
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
|
exp = datetime.fromtimestamp(payload["exp"])
|
|
return exp - datetime.utcnow() < timedelta(hours=1)
|
|
except Exception:
|
|
return True # si falla, renovar |