127 lines
3.8 KiB
Python
127 lines
3.8 KiB
Python
"""
|
|
Senders.py
|
|
Contiene los endpoints de CRUD de remitentes.
|
|
"""
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Request, status
|
|
from sqlalchemy.orm import Session
|
|
from database import get_db
|
|
import models
|
|
import schemas
|
|
from typing import List
|
|
from auth import get_current_user
|
|
from audit import log_action
|
|
|
|
# Router on which the sender endpoints in this module are registered
# through the @router.<method> decorators.
router = APIRouter()
|
|
|
|
@router.post("/senders/", response_model=schemas.SenderResponse, tags=['Senders'])
def create_sender(
    sender: schemas.SenderCreate,
    request: Request,
    db: Session = Depends(get_db),
    current_user: int = Depends(get_current_user)
):
    """Create a sender unless one with the same Telegram ID already exists.

    Args:
        sender: Payload describing the sender; its fields are passed
            unchanged to the ``models.Sender`` constructor.
        request: Incoming request, used only to read the client address
            for the audit entry.
        db: Database session supplied by ``get_db``.
        current_user: Value returned by ``get_current_user``; forwarded to
            ``log_action`` as ``user_id``.

    Returns:
        The newly stored ``models.Sender`` row, refreshed after commit.

    Raises:
        HTTPException: 400 when a sender with the same ``id_telegram``
            is already stored.
    """
    existing = db.query(models.Sender).filter(
        models.Sender.id_telegram == sender.id_telegram
    ).first()
    if existing:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Sender with Telegram ID {sender.id_telegram} already exists"
        )

    db_sender = models.Sender(**sender.model_dump())
    db.add(db_sender)
    # Flush before auditing so the INSERT is issued ahead of the commit below
    # (presumably so log_action sees a fully populated row -- confirm).
    db.flush()

    # request.client can be None (the server may not provide a peer address);
    # guard the access so auditing does not fail with AttributeError.
    client_ip = request.client.host if request.client else None

    log_action(
        db=db, entity_type='sender', entity_id=db_sender.id_telegram,
        action='create', user_id=current_user,
        after=db_sender, ip_address=client_ip
    )

    db.commit()
    db.refresh(db_sender)
    return db_sender
|
|
|
|
@router.get("/senders/", response_model=List[schemas.SenderResponse], tags=['Senders'])
def read_senders(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
    current_user: int = Depends(get_current_user)
):
    """Return one page of senders, ordered by Telegram ID.

    Args:
        skip: Number of rows to skip (applied as the query OFFSET).
        limit: Maximum number of rows to return (applied as the query LIMIT).
        db: Database session supplied by ``get_db``.
        current_user: Value returned by ``get_current_user``. Not read in
            the body; declaring it makes the dependency run for this route.

    Returns:
        A list of ``models.Sender`` rows.
    """
    # OFFSET/LIMIT without ORDER BY leaves the row order up to the database,
    # so consecutive pages could overlap or skip rows. Ordering by the
    # identifier used for lookups elsewhere in this module makes paging stable.
    return (
        db.query(models.Sender)
        .order_by(models.Sender.id_telegram)
        .offset(skip)
        .limit(limit)
        .all()
    )
|
|
|
|
@router.get("/senders/{sender_id}", response_model=schemas.SenderResponse, tags=['Senders'])
def read_sender(
    sender_id: int,
    db: Session = Depends(get_db),
    current_user: int = Depends(get_current_user)
):
    """Fetch the sender whose ``id_telegram`` equals ``sender_id``.

    Args:
        sender_id: Telegram ID taken from the URL path.
        db: Database session supplied by ``get_db``.
        current_user: Value returned by ``get_current_user``. Not read in
            the body; declaring it makes the dependency run for this route.

    Returns:
        The matching ``models.Sender`` row.

    Raises:
        HTTPException: 404 when no sender matches.
    """
    matching = db.query(models.Sender).filter(
        models.Sender.id_telegram == sender_id
    )
    found = matching.first()
    if found:
        return found
    raise HTTPException(status_code=404, detail="Sender not found")
|
|
|
|
@router.put("/senders/{sender_id}", response_model=schemas.SenderResponse, tags=['Senders'])
def update_sender(
    sender_id: int,
    sender: schemas.SenderCreate,
    request: Request,
    db: Session = Depends(get_db),
    current_user: int = Depends(get_current_user)
):
    """Overwrite the stored sender identified by ``sender_id`` with the payload.

    Args:
        sender_id: Telegram ID taken from the URL path.
        sender: New values; every field of the payload is copied onto the row.
        request: Incoming request, used only to read the client address
            for the audit entry.
        db: Database session supplied by ``get_db``.
        current_user: Value returned by ``get_current_user``; forwarded to
            ``log_action`` as ``user_id``.

    Returns:
        The updated ``models.Sender`` row, refreshed after commit.

    Raises:
        HTTPException: 404 when no sender matches ``sender_id``.
    """
    db_sender = db.query(models.Sender).filter(
        models.Sender.id_telegram == sender_id
    ).first()
    if not db_sender:
        raise HTTPException(status_code=404, detail="Sender not found")

    # Copy the current values into a plain dict now: db_sender is mutated
    # just below, so passing the object itself would lose the "before" state.
    before_snapshot = {
        'id_telegram': db_sender.id_telegram,
        'type': db_sender.type,
        'username': db_sender.username,
        'first_name': db_sender.first_name,
        'last_name': db_sender.last_name,
        'phone': db_sender.phone,
    }

    # NOTE(review): the payload schema carries id_telegram, so a body whose
    # id_telegram differs from sender_id changes the row's identifier while
    # the audit entry below is still recorded under sender_id -- confirm
    # whether that is intended.
    for field, value in sender.model_dump().items():
        setattr(db_sender, field, value)

    # request.client can be None (the server may not provide a peer address);
    # guard the access so auditing does not fail with AttributeError.
    client_ip = request.client.host if request.client else None

    log_action(
        db=db, entity_type='sender', entity_id=sender_id,
        action='update', user_id=current_user,
        before=before_snapshot, after=db_sender,
        ip_address=client_ip
    )

    db.commit()
    db.refresh(db_sender)
    return db_sender
|
|
|
|
@router.delete("/senders/{sender_id}", tags=['Senders'])
def delete_sender(
    sender_id: int,
    request: Request,
    db: Session = Depends(get_db),
    current_user: int = Depends(get_current_user)
):
    """Delete the sender identified by ``sender_id``.

    Args:
        sender_id: Telegram ID taken from the URL path.
        request: Incoming request, used only to read the client address
            for the audit entry.
        db: Database session supplied by ``get_db``.
        current_user: Value returned by ``get_current_user``; forwarded to
            ``log_action`` as ``user_id``.

    Returns:
        A dict with a single ``"message"`` key confirming the deletion.

    Raises:
        HTTPException: 404 when no sender matches ``sender_id``.
    """
    db_sender = db.query(models.Sender).filter(
        models.Sender.id_telegram == sender_id
    ).first()
    if not db_sender:
        raise HTTPException(status_code=404, detail="Sender not found")

    # request.client can be None (the server may not provide a peer address);
    # guard the access so auditing does not fail with AttributeError.
    client_ip = request.client.host if request.client else None

    # The audit call comes first, while the row is still attached to the
    # session and can be passed as the "before" state.
    log_action(
        db=db, entity_type='sender', entity_id=sender_id,
        action='delete', user_id=current_user,
        before=db_sender, ip_address=client_ip
    )

    db.delete(db_sender)
    db.commit()
    return {"message": "Sender deleted successfully"}