333 lines
11 KiB
Python
333 lines
11 KiB
Python
from sqlalchemy.orm import Session
|
|
from jose import JWTError, jwt
|
|
from passlib.context import CryptContext
|
|
from fastapi import HTTPException, status
|
|
from datetime import datetime, timedelta
|
|
import os
|
|
from typing import Optional
|
|
|
|
from models import User, UserAudit
|
|
from schemas import UserCreate, UserUpdate
|
|
|
|
# Configuración de seguridad
|
|
SECRET_KEY = os.getenv("SECRET_KEY")
|
|
if not SECRET_KEY:
|
|
raise RuntimeError("SECRET_KEY no configurada. Abortando inicio.")
|
|
if len(SECRET_KEY) < 20:
|
|
raise RuntimeError("SECRET_KEY demasiado corta")
|
|
ALGORITHM = "HS256"
|
|
ACCESS_TOKEN_EXPIRE_MINUTES = 30
|
|
|
|
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
|
|
|
class AuthService:
|
|
@staticmethod
|
|
def hash_password(password: str) -> str:
|
|
return pwd_context.hash(password)
|
|
|
|
@staticmethod
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
|
return pwd_context.verify(plain_password, hashed_password)
|
|
|
|
@staticmethod
|
|
def authenticate_user(db: Session, email: str, password: str) -> Optional[User]:
|
|
user = db.query(User).filter(User.email == email, User.active == True).first()
|
|
if not user:
|
|
return None
|
|
if not AuthService.verify_password(password, user.password):
|
|
return None
|
|
return user
|
|
|
|
@staticmethod
|
|
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
|
|
to_encode = data.copy()
|
|
if expires_delta:
|
|
expire = datetime.utcnow() + expires_delta
|
|
else:
|
|
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
|
|
to_encode.update({"exp": expire})
|
|
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
|
return encoded_jwt
|
|
|
|
@staticmethod
|
|
def verify_token(token: str):
|
|
credentials_exception = HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Could not validate credentials",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
try:
|
|
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
|
user_id: int = payload.get("user_id")
|
|
if user_id is None:
|
|
raise credentials_exception
|
|
return user_id
|
|
except JWTError:
|
|
raise credentials_exception
|
|
|
|
class UserService:
|
|
def __init__(self, db: Session):
|
|
self.db = db
|
|
|
|
def get_user_by_id(self, user_id: int) -> Optional[User]:
|
|
return self.db.query(User).filter(User.id == user_id).first()
|
|
|
|
def get_user_by_email(self, email: str) -> Optional[User]:
|
|
return self.db.query(User).filter(User.email == email).first()
|
|
|
|
def get_all_users(self, skip: int = 0, limit: int = 100):
|
|
return self.db.query(User).offset(skip).limit(limit).all()
|
|
|
|
def get_pending_users(self, skip: int = 0, limit: int = 100):
|
|
return self.db.query(User).filter(User.active == False).offset(skip).limit(limit).all()
|
|
|
|
def count_active_admins(self) -> int:
|
|
"""Helper function to count the number of active admin users."""
|
|
return self.db.query(User).filter(User.rol == 'admin', User.active == True).count()
|
|
|
|
def create_user(self, user_data: UserCreate, updater_id: Optional[int] = None, ip_address: str = None) -> User:
|
|
if self.get_user_by_email(user_data.email):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Failed to create user"
|
|
)
|
|
|
|
if user_data.rol not in ['admin', 'operator']:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Invalid role specified"
|
|
)
|
|
|
|
if user_data.password and len(user_data.password) < 8:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Password must be at least 8 characters long"
|
|
)
|
|
|
|
hashed_password = AuthService.hash_password(user_data.password)
|
|
|
|
user = User(
|
|
name=user_data.name,
|
|
email=user_data.email,
|
|
password=hashed_password,
|
|
rol=user_data.rol,
|
|
active=user_data.active
|
|
)
|
|
|
|
self.db.add(user)
|
|
self.db.flush()
|
|
|
|
# If no updater_id provided (unauthenticated creation), mark the user as the updater (self-created)
|
|
audit_updater = updater_id if updater_id is not None else user.id
|
|
|
|
audit = UserAudit(
|
|
user_id=user.id,
|
|
updater_id=audit_updater,
|
|
action="create",
|
|
after_value=f"User created with role: {user.rol}",
|
|
snapshot={
|
|
"name": user.name,
|
|
"email": user.email,
|
|
"rol": user.rol,
|
|
"active": user.active
|
|
},
|
|
ip_address=ip_address
|
|
)
|
|
self.db.add(audit)
|
|
self.db.commit()
|
|
self.db.refresh(user)
|
|
|
|
return user
|
|
|
|
def update_user(self, user_id: int, user_data: UserUpdate, updater_id: int, ip_address: str = None) -> User:
|
|
user = self.get_user_by_id(user_id)
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="User not found"
|
|
)
|
|
|
|
if user_data.rol not in ['admin', 'operator']:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Invalid role specified"
|
|
)
|
|
|
|
if user_data.password and len(user_data.password) < 8:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Password must be at least 8 characters long"
|
|
)
|
|
|
|
changes = []
|
|
snapshot = {
|
|
"name": user.name,
|
|
"email": user.email,
|
|
"rol": user.rol,
|
|
"active": user.active
|
|
}
|
|
|
|
for field, value in user_data.dict(exclude_unset=True).items():
|
|
if hasattr(user, field):
|
|
old_value = getattr(user, field)
|
|
|
|
if field == "password":
|
|
value = AuthService.hash_password(value)
|
|
old_value = "***"
|
|
|
|
if old_value != value:
|
|
setattr(user, field, value)
|
|
|
|
audit = UserAudit(
|
|
user_id=user_id,
|
|
updater_id=updater_id,
|
|
action="update",
|
|
updated_attribute=field,
|
|
before_value=str(old_value),
|
|
after_value=str(value) if field != "password" else "***",
|
|
snapshot=snapshot,
|
|
ip_address=ip_address
|
|
)
|
|
self.db.add(audit)
|
|
changes.append(field)
|
|
|
|
if changes:
|
|
self.db.commit()
|
|
self.db.refresh(user)
|
|
|
|
return user
|
|
|
|
def change_user_role(self, user_id: int, new_role: str, updater_id: int, ip_address: str = None) -> User:
|
|
user = self.get_user_by_id(user_id)
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="User not found"
|
|
)
|
|
|
|
old_role = user.rol
|
|
snapshot = {
|
|
"name": user.name,
|
|
"email": user.email,
|
|
"rol": user.rol,
|
|
"active": user.active
|
|
}
|
|
|
|
user.rol = new_role
|
|
|
|
audit = UserAudit(
|
|
user_id=user_id,
|
|
updater_id=updater_id,
|
|
action="role_change",
|
|
updated_attribute="rol",
|
|
before_value=old_role,
|
|
after_value=new_role,
|
|
snapshot=snapshot,
|
|
ip_address=ip_address
|
|
)
|
|
self.db.add(audit)
|
|
self.db.commit()
|
|
self.db.refresh(user)
|
|
|
|
return user
|
|
|
|
def deactivate_user(self, user_id: int, updater_id: int, ip_address: str = None) -> User:
|
|
user = self.get_user_by_id(user_id)
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
user.active = False
|
|
|
|
audit = UserAudit(
|
|
user_id=user_id,
|
|
updater_id=updater_id,
|
|
action="deactivate",
|
|
updated_attribute="active",
|
|
before_value="True",
|
|
after_value="False",
|
|
snapshot={
|
|
"name": user.name,
|
|
"email": user.email,
|
|
"rol": user.rol,
|
|
"active": True
|
|
},
|
|
ip_address=ip_address
|
|
)
|
|
self.db.add(audit)
|
|
self.db.commit()
|
|
self.db.refresh(user)
|
|
|
|
return user
|
|
|
|
def activate_user(self, user_id: int, updater_id: int, ip_address: str = None) -> User:
|
|
user = self.get_user_by_id(user_id)
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
user.active = True
|
|
|
|
audit = UserAudit(
|
|
user_id=user_id,
|
|
updater_id=updater_id,
|
|
action="activate",
|
|
updated_attribute="active",
|
|
before_value="False",
|
|
after_value="True",
|
|
snapshot={
|
|
"name": user.name,
|
|
"email": user.email,
|
|
"rol": user.rol,
|
|
"active": False
|
|
},
|
|
ip_address=ip_address
|
|
)
|
|
self.db.add(audit)
|
|
self.db.commit()
|
|
self.db.refresh(user)
|
|
|
|
return user
|
|
|
|
def get_user_modifications(self, user_id: int, skip: int = 0, limit: int = 50):
|
|
user = self.get_user_by_id(user_id)
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
return (self.db.query(UserAudit)
|
|
.filter(UserAudit.user_id == user_id)
|
|
.order_by(UserAudit.modification_date.desc())
|
|
.offset(skip)
|
|
.limit(limit)
|
|
.all())
|
|
|
|
def get_users_modifications(self, skip: int = 0, limit: int = 100):
|
|
return (self.db.query(UserAudit)
|
|
.order_by(UserAudit.modification_date.desc())
|
|
.offset(skip)
|
|
.limit(limit)
|
|
.all())
|
|
|
|
def delete_user(self, user_id: int, updater_id: int, ip_address: str = None):
|
|
user = self.get_user_by_id(user_id)
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
snapshot = {
|
|
"name": user.name,
|
|
"email": user.email,
|
|
"rol": user.rol,
|
|
"active": user.active
|
|
}
|
|
|
|
self.db.delete(user)
|
|
|
|
audit = UserAudit(
|
|
user_id=user_id,
|
|
updater_id=updater_id,
|
|
action="delete",
|
|
after_value="User deleted",
|
|
snapshot=snapshot,
|
|
ip_address=ip_address
|
|
)
|
|
self.db.add(audit)
|
|
self.db.commit() |