First commit

This commit is contained in:
unknown
2026-06-09 21:18:13 -03:00
commit 5bff6b938b
66 changed files with 10922 additions and 0 deletions
+71
View File
@@ -0,0 +1,71 @@
"""
auth.py:
Métodos y variables globales necesarias para la autenticación y auditoria.
Notas:
- El microservicio requiere que la aplicación que lo utilice comparta su SECRET_KEY
- El microservicio requiere que la aplicación que lo utilice implemente un campo "user_id" en su SECRET_KEY y una expiración: exp
* No se recomienda implementar dos sistemas que manipulen este token en simultaneo, puede provocar incongruencias en el módulo de auditoria. *
"""
import os
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from datetime import datetime, timedelta
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
SECRET_KEY = os.getenv("SECRET_KEY")
if not SECRET_KEY:
raise RuntimeError("SECRET_KEY no configurada. Abortando inicio.")
if len(SECRET_KEY) < 20:
raise RuntimeError("SECRET_KEY demasiado corta")
ALGORITHM = "HS256"
def get_current_user(token: str = Depends(oauth2_scheme)):
"""
Obtiene un ID de usuario del JWT para auditoria.
"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id = payload.get("user_id")
if not user_id:
raise HTTPException(status_code=401, detail="Token inválido")
return user_id # simplemente el id
except JWTError:
raise HTTPException(status_code=401, detail="Token inválido")
def get_system_token() -> str:
"""Genera o renueva el token de sistema para uso interno."""
payload = {
"user_id": "system",
"exp": datetime.utcnow() + timedelta(hours=24)
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
# Token global para reutilizar
_system_token: str = None
def get_internal_headers() -> dict:
"""Headers con token de sistema para llamadas internas."""
global _system_token
# Renovar si no existe o está por vencer
if _system_token is None or _token_expiring_soon(_system_token):
_system_token = get_system_token()
return {
"Content-Type": "application/json",
"Authorization": f"Bearer {_system_token}"
}
def _token_expiring_soon(token: str) -> bool:
"""
Verifica la expiración del token.
"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
exp = datetime.fromtimestamp(payload["exp"])
return exp - datetime.utcnow() < timedelta(hours=1)
except Exception:
return True # si falla, renovar