Files
2026-06-09 21:18:13 -03:00

127 lines
3.8 KiB
Python

"""
Senders.py
Contiene los endpoints de CRUD de remitentes.
"""
from fastapi import APIRouter, Depends, HTTPException, Request, status
from sqlalchemy.orm import Session
from database import get_db
import models
import schemas
from typing import List
from auth import get_current_user
from audit import log_action
# Router instance on which the sender endpoints in this module are registered
# through the @router.<method> decorators.
router = APIRouter()
@router.post("/senders/", response_model=schemas.SenderResponse, tags=['Senders'])
def create_sender(
    sender: schemas.SenderCreate,
    request: Request,
    db: Session = Depends(get_db),
    current_user: int = Depends(get_current_user)
):
    """Create a sender and write a 'create' entry to the audit log.

    Args:
        sender: Payload describing the sender to create.
        request: Incoming request; used only to obtain the client address.
        db: Database session supplied by ``get_db``.
        current_user: Value supplied by ``get_current_user``; passed to the
            audit log as ``user_id``.

    Returns:
        The newly stored ``models.Sender`` after it has been refreshed.

    Raises:
        HTTPException: 400 when a sender with the same ``id_telegram``
            is already stored.
    """
    existing = (
        db.query(models.Sender)
        .filter(models.Sender.id_telegram == sender.id_telegram)
        .first()
    )
    if existing:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Sender with Telegram ID {sender.id_telegram} already exists"
        )

    db_sender = models.Sender(**sender.model_dump())
    db.add(db_sender)
    # Emit the INSERT before logging; the commit below finalizes the work.
    db.flush()

    # request.client can be None (no client address available), in which
    # case dereferencing .host would raise AttributeError and fail the
    # request after the row was already flushed.
    # NOTE(review): assumes log_action tolerates ip_address=None - confirm.
    client_ip = request.client.host if request.client else None

    log_action(
        db=db, entity_type='sender', entity_id=db_sender.id_telegram,
        action='create', user_id=current_user,
        after=db_sender, ip_address=client_ip
    )
    db.commit()
    db.refresh(db_sender)
    return db_sender
@router.get("/senders/", response_model=List[schemas.SenderResponse], tags=['Senders'])
def read_senders(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
    current_user: int = Depends(get_current_user)
):
    """Return one page of senders.

    Args:
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows to return.
        db: Database session supplied by ``get_db``.
        current_user: Value supplied by ``get_current_user``; not used in
            the body, declared so that dependency runs for this endpoint.

    Returns:
        A list of ``models.Sender`` rows ordered by ``id_telegram``.
    """
    # OFFSET/LIMIT without ORDER BY leaves the row order up to the database,
    # so consecutive pages could repeat or miss rows. Ordering by the
    # Telegram ID makes pagination deterministic.
    return (
        db.query(models.Sender)
        .order_by(models.Sender.id_telegram)
        .offset(skip)
        .limit(limit)
        .all()
    )
@router.get("/senders/{sender_id}", response_model=schemas.SenderResponse, tags=['Senders'])
def read_sender(
    sender_id: int,
    db: Session = Depends(get_db),
    current_user: int = Depends(get_current_user)
):
    """Fetch a single sender by its Telegram ID.

    Args:
        sender_id: Value matched against ``models.Sender.id_telegram``.
        db: Database session supplied by ``get_db``.
        current_user: Value supplied by ``get_current_user``; not used in
            the body.

    Returns:
        The first matching ``models.Sender``.

    Raises:
        HTTPException: 404 when no sender matches ``sender_id``.
    """
    by_telegram_id = db.query(models.Sender).filter(
        models.Sender.id_telegram == sender_id
    )
    match = by_telegram_id.first()
    if match:
        return match
    raise HTTPException(status_code=404, detail="Sender not found")
@router.put("/senders/{sender_id}", response_model=schemas.SenderResponse, tags=['Senders'])
def update_sender(
    sender_id: int,
    sender: schemas.SenderCreate,
    request: Request,
    db: Session = Depends(get_db),
    current_user: int = Depends(get_current_user)
):
    """Overwrite a sender's fields and write an 'update' audit entry.

    Args:
        sender_id: Value matched against ``models.Sender.id_telegram``.
        sender: Payload whose dumped fields are copied onto the stored row.
        request: Incoming request; used only to obtain the client address.
        db: Database session supplied by ``get_db``.
        current_user: Value supplied by ``get_current_user``; passed to the
            audit log as ``user_id``.

    Returns:
        The updated ``models.Sender`` after it has been refreshed.

    Raises:
        HTTPException: 404 when no sender matches ``sender_id``.
    """
    db_sender = (
        db.query(models.Sender)
        .filter(models.Sender.id_telegram == sender_id)
        .first()
    )
    if not db_sender:
        raise HTTPException(status_code=404, detail="Sender not found")

    # Capture plain values now: db_sender is mutated in place below, so the
    # audit log needs its own copy of the previous state.
    snapshot_fields = (
        'id_telegram', 'type', 'username', 'first_name', 'last_name', 'phone',
    )
    before_snapshot = {
        name: getattr(db_sender, name) for name in snapshot_fields
    }

    # NOTE(review): every dumped field is applied as-is. If the payload
    # carries an id_telegram different from sender_id, the row would be
    # re-keyed while the audit entry still records sender_id - confirm
    # whether such requests should be rejected.
    for field, value in sender.model_dump().items():
        setattr(db_sender, field, value)

    # request.client can be None (no client address available); guard the
    # lookup instead of failing the request with AttributeError.
    # NOTE(review): assumes log_action tolerates ip_address=None - confirm.
    client_ip = request.client.host if request.client else None

    log_action(
        db=db, entity_type='sender', entity_id=sender_id,
        action='update', user_id=current_user,
        before=before_snapshot, after=db_sender,
        ip_address=client_ip
    )
    db.commit()
    db.refresh(db_sender)
    return db_sender
@router.delete("/senders/{sender_id}", tags=['Senders'])
def delete_sender(
    sender_id: int,
    request: Request,
    db: Session = Depends(get_db),
    current_user: int = Depends(get_current_user)
):
    """Delete a sender and write a 'delete' audit entry.

    Args:
        sender_id: Value matched against ``models.Sender.id_telegram``.
        request: Incoming request; used only to obtain the client address.
        db: Database session supplied by ``get_db``.
        current_user: Value supplied by ``get_current_user``; passed to the
            audit log as ``user_id``.

    Returns:
        A dict with a single ``"message"`` key confirming the deletion.

    Raises:
        HTTPException: 404 when no sender matches ``sender_id``.
    """
    db_sender = (
        db.query(models.Sender)
        .filter(models.Sender.id_telegram == sender_id)
        .first()
    )
    if not db_sender:
        raise HTTPException(status_code=404, detail="Sender not found")

    # request.client can be None (no client address available); guard the
    # lookup instead of failing the request with AttributeError.
    # NOTE(review): assumes log_action tolerates ip_address=None - confirm.
    client_ip = request.client.host if request.client else None

    # Log while the row still exists so its state can be recorded as "before".
    log_action(
        db=db, entity_type='sender', entity_id=sender_id,
        action='delete', user_id=current_user,
        before=db_sender, ip_address=client_ip
    )
    db.delete(db_sender)
    db.commit()
    return {"message": "Sender deleted successfully"}