231 lines
8.4 KiB
Python
231 lines
8.4 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status, Request
|
|
from sqlalchemy.orm import Session
|
|
from typing import List
|
|
from slowapi import Limiter
|
|
from slowapi.util import get_remote_address
|
|
import re
|
|
|
|
|
|
from database import get_db
|
|
from models import User
|
|
from schemas import (
|
|
UserCreate, UserUpdate, UserResponse, UserWithModifications,
|
|
RoleChangeRequest, LoginRequest, Token, ModificationResponse
|
|
)
|
|
from services import UserService, AuthService
|
|
from dependencies import get_current_user, require_admin, require_admin_or_owner, oauth2_scheme
|
|
|
|
router = APIRouter()
|
|
limiter = Limiter(key_func=get_remote_address)
|
|
|
|
# Rutas de Autenticación
|
|
@router.post("/login", response_model=Token)
|
|
def login(
|
|
login_data: LoginRequest,
|
|
db: Session = Depends(get_db)
|
|
):
|
|
user_service = UserService(db)
|
|
user = AuthService.authenticate_user(db, login_data.email, login_data.password)
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Incorrect email or password",
|
|
)
|
|
|
|
access_token = AuthService.create_access_token(data={"user_id": user.id})
|
|
|
|
return {
|
|
"access_token": access_token,
|
|
"token_type": "bearer",
|
|
"user": user
|
|
}
|
|
|
|
# Rutas de Usuarios
|
|
@router.get("/users/", response_model=List[UserResponse])
|
|
def get_all_users(
|
|
skip: int = 0,
|
|
limit: int = 100,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_admin)
|
|
):
|
|
user_service = UserService(db)
|
|
return user_service.get_all_users(skip, limit)
|
|
|
|
@router.get("/users/pending", response_model=List[UserResponse])
|
|
def get_pending_users(
|
|
skip: int = 0,
|
|
limit: int = 100,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_admin)
|
|
):
|
|
user_service = UserService(db)
|
|
return user_service.get_pending_users(skip, limit)
|
|
|
|
@router.get("/users/me", response_model=UserResponse)
|
|
def get_current_user_info(current_user: User = Depends(get_current_user)):
|
|
return current_user
|
|
|
|
@router.get("/users/{user_id}", response_model=UserWithModifications)
|
|
def get_user(
|
|
user_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
user_service = UserService(db)
|
|
user = user_service.get_user_by_id(user_id)
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
# Verificar permisos (admin o el propio usuario)
|
|
if current_user.rol != 'admin' and current_user.id != user_id:
|
|
raise HTTPException(status_code=403, detail="Insufficient permissions")
|
|
|
|
return user
|
|
|
|
@router.post("/users/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
|
|
def create_user(
|
|
user_data: UserCreate,
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_admin)
|
|
):
|
|
user_service = UserService(db)
|
|
ip_address = request.client.host if request.client else None
|
|
return user_service.create_user(user_data, current_user.id, ip_address)
|
|
|
|
@router.post("/users/common", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
|
|
@limiter.limit("5/minute")
|
|
def create_common_user(
|
|
user_data: UserCreate,
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""Endpoint público para que cualquier usuario (no autenticado) pueda registrarse.
|
|
|
|
The created user will always have `rol='operator'` and `active=False`.
|
|
For audit purposes, when no authenticated updater is available we record the
|
|
created user itself as the `updater` (self-created).
|
|
"""
|
|
user_service = UserService(db)
|
|
ip_address = request.client.host if request.client else None
|
|
|
|
# Ensure role and active defaults regardless of input
|
|
user_data = user_data.copy(update={"rol": "operator", "active": False})
|
|
|
|
# Pass updater_id=None so the service will mark the new user as the updater
|
|
return user_service.create_user(user_data, updater_id=None, ip_address=ip_address)
|
|
|
|
@router.get("/logs/users", response_model=List[ModificationResponse])
|
|
def get_users_modifications(
|
|
skip: int = 0,
|
|
limit: int = 200,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
# Verificar permisos
|
|
if current_user.rol != 'admin':
|
|
raise HTTPException(status_code=403, detail="Insufficient permissions")
|
|
|
|
user_service = UserService(db)
|
|
return user_service.get_users_modifications(skip, limit)
|
|
|
|
@router.put("/users/{user_id}", response_model=UserResponse)
|
|
def update_user(
|
|
user_id: int,
|
|
user_data: UserUpdate,
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
user_service = UserService(db)
|
|
|
|
# Verificar permisos
|
|
if current_user.rol != 'admin' and current_user.id != user_id:
|
|
raise HTTPException(status_code=403, detail="Insufficient permissions")
|
|
edited_user = user_service.get_user_by_id(user_id)
|
|
|
|
"""Verify is the desactivated used is and admin, in this case check if there is at least another active admin before allowing the activation."""
|
|
if user_data.active is not None and edited_user.rol == 'admin' and user_data.active == False:
|
|
active_admins = user_service.count_active_admins()
|
|
if active_admins <= 1:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Cannot deactivate this user because it is the only active admin. Please activate another admin first."
|
|
)
|
|
|
|
|
|
ip_address = request.client.host if request.client else None
|
|
return user_service.update_user(user_id, user_data, current_user.id, ip_address)
|
|
|
|
@router.patch("/users/{user_id}/role", response_model=UserResponse)
|
|
def change_user_role(
|
|
user_id: int,
|
|
role_data: RoleChangeRequest,
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_admin)
|
|
):
|
|
user_service = UserService(db)
|
|
ip_address = request.client.host if request.client else None
|
|
return user_service.change_user_role(user_id, role_data.new_role, current_user.id, ip_address)
|
|
|
|
@router.post("/users/{user_id}/deactivate", response_model=UserResponse)
|
|
def deactivate_user(
|
|
user_id: int,
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_admin)
|
|
):
|
|
user_service = UserService(db)
|
|
ip_address = request.client.host if request.client else None
|
|
"""Verify is the desactivated used is and admin, in this case check if there is at least another active admin before allowing the activation."""
|
|
if current_user.rol == 'admin':
|
|
desactivated_user = user_service.get_user_by_id(user_id)
|
|
if desactivated_user and desactivated_user.rol == 'admin':
|
|
active_admins = user_service.count_active_admins()
|
|
if active_admins <= 1:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Cannot activate this user because it is the only active admin. Please activate another admin first."
|
|
)
|
|
return user_service.deactivate_user(user_id, current_user.id, ip_address)
|
|
|
|
@router.post("/users/{user_id}/activate", response_model=UserResponse)
|
|
def activate_user(
|
|
user_id: int,
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_admin)
|
|
):
|
|
user_service = UserService(db)
|
|
ip_address = request.client.host if request.client else None
|
|
return user_service.activate_user(user_id, current_user.id, ip_address)
|
|
|
|
@router.get("/users/{user_id}/modifications", response_model=List[ModificationResponse])
|
|
def get_user_modifications(
|
|
user_id: int,
|
|
skip: int = 0,
|
|
limit: int = 200,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
# Verificar permisos
|
|
if current_user.rol != 'admin' and current_user.id != user_id:
|
|
raise HTTPException(status_code=403, detail="Insufficient permissions")
|
|
|
|
user_service = UserService(db)
|
|
return user_service.get_user_modifications(user_id, skip, limit)
|
|
|
|
|
|
@router.delete("/users/{user_id}", response_model=UserResponse)
|
|
def delete_user(
|
|
user_id: int,
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(require_admin)
|
|
):
|
|
user_service = UserService(db)
|
|
ip_address = request.client.host if request.client else None
|
|
if current_user.rol != 'admin' and current_user.id != user_id:
|
|
raise HTTPException(status_code=403, detail="Insufficient permissions")
|
|
return user_service.delete_user(user_id, current_user.id, ip_address) |