""" api_implementations.py Contiene métodos utilizados por el alimentador a lo largo de "chats.py" para manipular los endpoints del sistema. """ from dotenv import load_dotenv import os import requests from fastapi import APIRouter, HTTPException, status from typing import Dict, Optional from auth import get_internal_headers ##TODO EL QUILOMBO NUEVO## API_BASE_URL = os.getenv('API_URL') TIMEOUT = 5.0 router = APIRouter() load_dotenv() def patch_api_sync(validated_data: Dict, endpoint: str) -> Optional[Dict]: """Función síncrona para hacer PUT a la API""" try: response = requests.patch( f"{API_BASE_URL}/{endpoint}/", json=validated_data, headers=get_internal_headers(), timeout=TIMEOUT ) if response.status_code == status.HTTP_400_BAD_REQUEST: error_detail = response.json().get("detail", "") return error_detail response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=f"Connection failed: {str(e)}" ) def post_api_sync(validated_data: Dict, endpoint: str) -> Optional[Dict]: """Función síncrona para hacer POST a la API""" try: response = requests.post( f"{API_BASE_URL}/{endpoint}/", json=validated_data, headers=get_internal_headers(), timeout=TIMEOUT ) if response.status_code == status.HTTP_400_BAD_REQUEST: error_detail = response.json().get("detail", "") if "already exists" in error_detail: return None response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=f"Connection failed: {str(e)}" ) def get_api_sync(endpoint: str): """Función síncrona para hacer GET a la API""" try: response = requests.get( f"{API_BASE_URL}/{endpoint}/", headers=get_internal_headers() ) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=f"Connection failed: {str(e)}" ) def create_telegram_group(group_data: Dict) -> Optional[Dict]: """Crea un grupo en la API remota (versión síncrona)""" try: validated_data = { "id_telegram": group_data["id_telegram"], "name": group_data["name"], "description": group_data["description"], "type": group_data["type"], "message_position": 0 } return post_api_sync(validated_data, "groups") except Exception as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid group data: {str(e)}" ) def refresh_telegram_group(channel: int, group_data: Dict) -> Optional[Dict]: """Crea un grupo en la API remota (versión síncrona)""" try: validated_data = { "name": str(group_data["name"]), "description": str(group_data["description"]), "type": str(group_data["type"]) } print(f"La data que vamos intentar cargar es {validated_data}") return patch_api_sync(validated_data, f"groups/{channel}") except Exception as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid group data: {str(e)}" ) def act_message_posittion(group_id: int, message_id: int): try: validated_data = { "message_position": message_id } return patch_api_sync(validated_data, f"groups/{group_id}/update-position") except Exception as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid group data: {str(e)}" ) def create_sender(sender: Dict) -> Optional[Dict]: """Crea un mensaje en la API remota (versión síncrona)""" try: sender_data = { "id_telegram": sender["id_telegram"], "type": sender["type"], "username": sender["username"], "first_name": str(sender["first_name"]), "last_name": str(sender["last_name"]), "phone": str(sender["phone"]) } print(sender_data) return post_api_sync(sender_data, "senders") except Exception as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid group data: {str(e)}" ) def create_message(message: Dict) -> Optional[Dict]: """Crea un mensaje en la API remota (versión síncrona)""" try: message_data = { "id_mess_g": message["id_mess_g"], "content": str(message["content"]), "date": message["date"], "sender_id": message["sender_id"], "group_id": message["group_id"] } response = post_api_sync(message_data, "messages") if response is None: return None if message['attachments'] is not None: try: create_attachment(message['attachments']) except Exception as e: print(f"Failed to create attachment for message {response['id_mess_g']}: {str(e)}") return response except Exception as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid group data: {str(e)}" ) def create_attachment(attachment: Dict) -> Optional[Dict]: """Crea un mensaje en la API remota (versión síncrona)""" try: attachment_data = { "message_id": attachment["message_id"], "type": attachment["type"], "description": str(attachment["description"]), "isDownloaded": "false", "group_id": attachment["group_id"] } print(f"Trying to load attachment data: {attachment_data}") return post_api_sync(attachment_data, "attachments") except Exception as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid group data: {str(e)}" )