import re
from typing import List, Optional

from fastapi import APIRouter, Depends, HTTPException, Request, status
from slowapi import Limiter
from slowapi.util import get_remote_address
from sqlalchemy.orm import Session

from database import get_db
from models import User
from schemas import (
    UserCreate,
    UserUpdate,
    UserResponse,
    UserWithModifications,
    RoleChangeRequest,
    LoginRequest,
    Token,
    ModificationResponse,
)
from services import UserService, AuthService
from dependencies import (
    get_current_user,
    require_admin,
    require_admin_or_owner,
    oauth2_scheme,
)

router = APIRouter()
limiter = Limiter(key_func=get_remote_address)

# Role value that grants administrative access in the checks below.
ADMIN_ROLE = "admin"

# Detail returned when an operation would leave the system without an active admin.
_LAST_ADMIN_DETAIL = (
    "Cannot deactivate this user because it is the only active admin. "
    "Please activate another admin first."
)


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _client_ip(request: Request) -> Optional[str]:
    """Return the caller's host address, or None when the client is unknown."""
    return request.client.host if request.client else None


def _require_admin_or_self(current_user: User, user_id: int) -> None:
    """Raise 403 unless the caller is an admin or is acting on their own id."""
    if current_user.rol != ADMIN_ROLE and current_user.id != user_id:
        raise HTTPException(status_code=403, detail="Insufficient permissions")


def _ensure_not_last_active_admin(
    user_service: UserService, target: Optional[User]
) -> None:
    """Raise 400 when `target` is an admin and at most one active admin exists.

    A missing target is ignored here; the service call that follows in the
    endpoint is left to report it (presumably as a 404 -- confirm in
    UserService).
    """
    if target is None or target.rol != ADMIN_ROLE:
        return
    # NOTE(review): the count is compared without checking whether `target`
    # is itself currently active, so an already-inactive admin is also
    # refused while only one active admin exists.
    if user_service.count_active_admins() <= 1:
        raise HTTPException(status_code=400, detail=_LAST_ADMIN_DETAIL)


# ---------------------------------------------------------------------------
# Authentication routes
# ---------------------------------------------------------------------------

@router.post("/login", response_model=Token)
def login(
    login_data: LoginRequest,
    db: Session = Depends(get_db),
):
    """Authenticate with email and password and return a bearer token.

    Responds with 401 when the credentials are rejected.
    """
    # NOTE(review): this endpoint is not rate limited, unlike /users/common;
    # consider applying `limiter` here to slow down credential guessing.
    user = AuthService.authenticate_user(db, login_data.email, login_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
        )

    access_token = AuthService.create_access_token(data={"user_id": user.id})
    return {
        "access_token": access_token,
        "token_type": "bearer",
        "user": user,
    }


# ---------------------------------------------------------------------------
# User routes
# ---------------------------------------------------------------------------

@router.get("/users/", response_model=List[UserResponse])
def get_all_users(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    """List users with `skip`/`limit` pagination (admin only)."""
    return UserService(db).get_all_users(skip, limit)


@router.get("/users/pending", response_model=List[UserResponse])
def get_pending_users(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    """List pending users with `skip`/`limit` pagination (admin only)."""
    return UserService(db).get_pending_users(skip, limit)


@router.get("/users/me", response_model=UserResponse)
def get_current_user_info(current_user: User = Depends(get_current_user)):
    """Return the authenticated user."""
    return current_user


@router.get("/users/{user_id}", response_model=UserWithModifications)
def get_user(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one user; allowed for admins and for the user themselves.

    Responds with 403 when the caller may not view the user and with 404
    when the user does not exist.
    """
    # Check permissions before the lookup so that non-admin callers cannot
    # tell existing ids from missing ones by comparing 404 and 403 responses.
    _require_admin_or_self(current_user, user_id)

    user = UserService(db).get_user_by_id(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user


@router.post("/users/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(
    user_data: UserCreate,
    request: Request,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    """Create a user on behalf of the calling admin."""
    return UserService(db).create_user(user_data, current_user.id, _client_ip(request))


@router.post(
    "/users/common",
    response_model=UserResponse,
    status_code=status.HTTP_201_CREATED,
)
@limiter.limit("5/minute")
def create_common_user(
    user_data: UserCreate,
    request: Request,
    db: Session = Depends(get_db),
):
    """Public endpoint that lets anyone (unauthenticated) register.

    The created user will always have `rol='operator'` and `active=False`.
    For audit purposes, when no authenticated updater is available we record
    the created user itself as the `updater` (self-created).
    """
    # Force role and active state regardless of what the client submitted.
    # NOTE(review): `.copy(update=...)` is the pydantic v1 spelling; if the
    # project is on pydantic v2 the equivalent is `model_copy` -- confirm.
    user_data = user_data.copy(update={"rol": "operator", "active": False})

    # updater_id=None asks the service to mark the new user as its own updater.
    return UserService(db).create_user(
        user_data, updater_id=None, ip_address=_client_ip(request)
    )


@router.get("/logs/users", response_model=List[ModificationResponse])
def get_users_modifications(
    skip: int = 0,
    limit: int = 200,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List user modification records with pagination (admin only)."""
    if current_user.rol != ADMIN_ROLE:
        raise HTTPException(status_code=403, detail="Insufficient permissions")
    return UserService(db).get_users_modifications(skip, limit)


@router.put("/users/{user_id}", response_model=UserResponse)
def update_user(
    user_id: int,
    user_data: UserUpdate,
    request: Request,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Update a user; allowed for admins and for the user themselves.

    Responds with 403 when the caller may not edit the user and with 400 when
    the update would deactivate the only active admin.
    """
    _require_admin_or_self(current_user, user_id)
    # NOTE(review): non-admin callers reach the service with the full
    # UserUpdate payload; confirm the schema/service stops them from changing
    # privileged fields such as `active` on their own account.

    user_service = UserService(db)

    # Only a request that explicitly sets active=False can remove an admin
    # from the active set, so the last-admin guard is limited to that case.
    if user_data.active is False:
        # The lookup may return None for an unknown id; the guard tolerates
        # that instead of failing on attribute access.
        edited_user = user_service.get_user_by_id(user_id)
        _ensure_not_last_active_admin(user_service, edited_user)

    return user_service.update_user(
        user_id, user_data, current_user.id, _client_ip(request)
    )


@router.patch("/users/{user_id}/role", response_model=UserResponse)
def change_user_role(
    user_id: int,
    role_data: RoleChangeRequest,
    request: Request,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    """Change a user's role (admin only)."""
    # NOTE(review): no last-active-admin guard is applied here, so the only
    # active admin could be demoted unless the service prevents it -- confirm.
    return UserService(db).change_user_role(
        user_id, role_data.new_role, current_user.id, _client_ip(request)
    )


@router.post("/users/{user_id}/deactivate", response_model=UserResponse)
def deactivate_user(
    user_id: int,
    request: Request,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    """Deactivate a user (admin only).

    Responds with 400 when the target is an admin and is the only active one.
    """
    user_service = UserService(db)

    # Refuse to deactivate an admin when no other active admin would remain.
    target_user = user_service.get_user_by_id(user_id)
    _ensure_not_last_active_admin(user_service, target_user)

    return user_service.deactivate_user(user_id, current_user.id, _client_ip(request))


@router.post("/users/{user_id}/activate", response_model=UserResponse)
def activate_user(
    user_id: int,
    request: Request,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    """Activate a user (admin only)."""
    return UserService(db).activate_user(user_id, current_user.id, _client_ip(request))


@router.get("/users/{user_id}/modifications", response_model=List[ModificationResponse])
def get_user_modifications(
    user_id: int,
    skip: int = 0,
    limit: int = 200,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List one user's modification records; admins or the user themselves."""
    _require_admin_or_self(current_user, user_id)
    return UserService(db).get_user_modifications(user_id, skip, limit)


@router.delete("/users/{user_id}", response_model=UserResponse)
def delete_user(
    user_id: int,
    request: Request,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    """Delete a user (admin only)."""
    # Kept from the original as a defensive check; presumably redundant
    # because `require_admin` already restricts this route -- confirm.
    _require_admin_or_self(current_user, user_id)

    # NOTE(review): no last-active-admin guard is applied here, so the only
    # active admin could be deleted unless the service prevents it -- confirm.
    return UserService(db).delete_user(user_id, current_user.id, _client_ip(request))