""" auth.py: Métodos y variables globales necesarias para la autenticación y auditoria. Notas: - El microservicio requiere que la aplicación que lo utilice comparta su SECRET_KEY - El microservicio requiere que la aplicación que lo utilice implemente un campo "user_id" en su SECRET_KEY y una expiración: exp * No se recomienda implementar dos sistemas que manipulen este token en simultaneo, puede provocar incongruencias en el módulo de auditoria. * """ import os from fastapi import Depends, HTTPException from fastapi.security import OAuth2PasswordBearer from jose import JWTError, jwt from datetime import datetime, timedelta oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") SECRET_KEY = os.getenv("SECRET_KEY") if not SECRET_KEY: raise RuntimeError("SECRET_KEY no configurada. Abortando inicio.") if len(SECRET_KEY) < 20: raise RuntimeError("SECRET_KEY demasiado corta") ALGORITHM = "HS256" def get_current_user(token: str = Depends(oauth2_scheme)): """ Obtiene un ID de usuario del JWT para auditoria. """ try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) user_id = payload.get("user_id") if not user_id: raise HTTPException(status_code=401, detail="Token inválido") return user_id # simplemente el id except JWTError: raise HTTPException(status_code=401, detail="Token inválido") def get_system_token() -> str: """Genera o renueva el token de sistema para uso interno.""" payload = { "user_id": "system", "exp": datetime.utcnow() + timedelta(hours=24) } return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) # Token global para reutilizar _system_token: str = None def get_internal_headers() -> dict: """Headers con token de sistema para llamadas internas.""" global _system_token # Renovar si no existe o está por vencer if _system_token is None or _token_expiring_soon(_system_token): _system_token = get_system_token() return { "Content-Type": "application/json", "Authorization": f"Bearer {_system_token}" } def _token_expiring_soon(token: str) -> bool: """ Verifica la expiración del token. """ try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) exp = datetime.fromtimestamp(payload["exp"]) return exp - datetime.utcnow() < timedelta(hours=1) except Exception: return True # si falla, renovar