Files
2026-06-09 21:18:13 -03:00

150 lines
4.5 KiB
Python

"""
Main.py
Contiene métodos principales del sistema.
"""
import datetime
from xmlrpc import client
from fastapi import FastAPI
import models
from database import engine
from routers import groups, messages, senders, attachments, rules, alerts, mannage, notes, audit, stats
from integrations.chats import TelegramChatSingleton
import asyncio
import time
models.Base.metadata.create_all(bind=engine)
app = FastAPI(
title="Telegram Monitor API",
description="API para monitoreo de mensajes de Telegram",
version="0.1.0"
)
app.include_router(groups.router)
app.include_router(messages.router)
app.include_router(senders.router)
app.include_router(attachments.router)
app.include_router(rules.router)
app.include_router(alerts.router)
app.include_router(notes.router)
app.include_router(mannage.router)
app.include_router(audit.router)
app.include_router(stats.router)
@app.get("/")
def read_root():
return {"message": "Bienvenido a la API de Monitor de Telegram"}
@app.get("/health")
def health_check():
return {"status": "healthy"}
""""
Diccionario para almacenar las variables de estado.
"""
scraper_status = {
"is_active": False,
"last_cycle": None,
"last_error": None,
}
"""
Configuración del alimentador:
* Se recomienda no tener un cicle_time acotado para evitar bloqueos por "Flood" a la API de Telegram.
* Error_delay representa el tiempo a esperar hasta que se reinicie el alimentador tras un error en el ciclo.
"""
scraper_config = {
"cicle_time": 200,
"error_delay": 60,
}
@app.get("/telegram-status")
def telegram_status():
"""
Verificar estado del cliente del bucle principal
Reintenta la conexión desde un hilo nuevo para verificar si el archivo de sesión aún es válido
Modifica "is_active" y "last_error" dependiendo del estado de la sesión
"""
manager = TelegramChatSingleton._manager
# Verificar si el manager y cliente existen
if manager is None:
status = {
"is_active": False,
"last_cycle": scraper_status.get("last_cycle"),
"last_error": "Cliente de Telegram no inicializado",
}
print(status)
return status
# Verificar estado de la sesión con timeout
try:
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(manager.is_active)
try:
is_active = future.result(timeout=5)
scraper_status["is_active"] = is_active
if is_active:
scraper_status["last_error"] = None
else:
scraper_status["last_error"] = "Sesión no autorizada"
except concurrent.futures.TimeoutError:
scraper_status["is_active"] = False
scraper_status["last_error"] = "Timeout verificando estado de sesión"
except Exception as e:
scraper_status["is_active"] = False
scraper_status["last_error"] = f"Error verificando estado: {str(e)}"
except Exception as e:
scraper_status["is_active"] = False
scraper_status["last_error"] = f"Error inesperado: {str(e)}"
status = {
"is_active": scraper_status["is_active"],
"last_cycle": scraper_status.get("last_cycle"),
"last_error": scraper_status.get("last_error"),
}
print(status)
return status
def run_sync_scraper():
"""
Inicializa el alimentador,
"""
while True:
try:
scraper = TelegramChatSingleton.get_scraper()
scraper_status["is_active"] = (
scraper.client is not None and scraper.client.is_connected()
)
scraper_status["last_error"] = None
print("Updatring Telegram Groups")
scraper.add_chats()
print("Feeding Groups")
scraper.feeder_loop()
scraper_status["last_cycle"] = datetime.datetime.utcnow().isoformat()
print("Cycle completed, waiting 5 minutes")
time.sleep(scraper_config["cicle_time"])
except Exception as e:
print(f"Error in scraper: {e}")
scraper_status["is_active"] = False
scraper_status["last_error"] = str(e)
# Ya no reseteamos scraper=None — el Singleton y el manager
# manejan la reconexión internamente en get_client()
time.sleep(scraper_config["error_delay"])
@app.on_event("startup")
async def startup_event():
asyncio.create_task(asyncio.to_thread(run_sync_scraper))