import os import json import asyncio import logging from datetime import datetime, timezone import aio_pika from aiogram import Bot from aiogram.client.default import DefaultBotProperties from aiogram.enums import ParseMode from aiohttp_socks import ProxyConnector from dotenv import load_dotenv from aiogram.client.session.aiohttp import AiohttpSession load_dotenv() # --- КОНФИГУРАЦИЯ --- RABBITMQ_URL = os.getenv("RABBITMQ_URL") EXCHANGE_NAME = os.getenv("EXCHANGE_NAME", "vstu_schedule") TG_TOKEN = os.getenv("TG_TOKEN") TG_CHANNEL_ID = os.getenv("TG_CHANNEL_ID") PROXY_URL = os.getenv("PROXY_URL") logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger("TGPublisher") # Глобальный объект бота bot = None async def init_bot(): """Инициализация бота с расширенными таймаутами и SOCKS5h""" global bot # 1. Настройка таймаутов (увеличиваем до 90 секунд) # Это даст прокси-серверу больше времени на 'раскачку' session = AiohttpSession( proxy=PROXY_URL, timeout=90.0 # Секунды ) bot = Bot( token=TG_TOKEN, session=session, # Передаем настроенную сессию default=DefaultBotProperties(parse_mode=ParseMode.HTML) ) # Проверка связи при старте try: me = await bot.get_me() logger.info(f"[+] Бот авторизован: @{me.username} через прокси {PROXY_URL}") except Exception as e: logger.error(f"[!!!] Бот НЕ СМОГ авторизоваться: {e}") async def send_to_tg(text: str) -> bool: """Отправляет сообщение в канал. Возвращает True при успехе.""" try: await bot.send_message(chat_id=TG_CHANNEL_ID, text=text) logger.info("[+] Сообщение опубликовано в Telegram.") return True except Exception as e: logger.error(f"[!] Ошибка Telegram API: {e}") return False async def process_message(message: aio_pika.IncomingMessage): """Обработка события от SLS с гарантией доставки (Manual Ack)""" try: # Мы НЕ используем context manager 'async with message.process()', # так как нам нужен ручной ack только после успешного TG-поста payload = json.loads(message.body.decode()) shm = payload.get("humanlike_message", "") if not shm: logger.warning("Получено пустое сообщение, скип.") await message.ack() return # Формируем текст facultet = payload.get("facultet", "news").upper() # Экранируем HTML символы на всякий случай shm_safe = shm.replace("<", "<").replace(">", ">") tg_text = f"#{facultet}\n\n{shm_safe}" ai = payload.get("ai_summary") if ai and isinstance(ai, dict) and 'wide_review' in ai: tg_text += f"\n\n🤖 AI Резюме:\n{ai['wide_review']}" tg_text += "\n\n🔗 fazziclay.com" # Пытаемся отправить success = await send_to_tg(tg_text) if success: await message.ack() # Удаляем из RabbitMQ только после успеха в TG else: logger.warning("Повтор попытки через 15 секунд...") await asyncio.sleep(15) await message.nack(requeue=True) # Возвращаем в очередь except json.JSONDecodeError: logger.error("Критическая ошибка: невалидный JSON. Удаление.") await message.ack() except Exception as e: logger.error(f"Непредвиденная ошибка воркера: {e}") await message.nack(requeue=True) async def main(): if not TG_TOKEN or not TG_CHANNEL_ID: logger.error("Кредиты Telegram (Token/ChannelID) не найдены!") return # Инициализируем бота await init_bot() # Коннект к RabbitMQ connection = await aio_pika.connect_robust(RABBITMQ_URL) channel = await connection.channel() # Рекомендуется ограничить количество сообщений, обрабатываемых за раз await channel.set_qos(prefetch_count=1) exchange = await channel.declare_exchange(EXCHANGE_NAME, aio_pika.ExchangeType.TOPIC, durable=True) # Создаем независимую очередь для Telegram queue = await channel.declare_queue("tg_publisher_queue", durable=True) await queue.bind(exchange, routing_key="schedule_logging_service.event.excel.#") logger.info(f"[*] TG Publisher готов к работе. Канал: {TG_CHANNEL_ID}") await queue.consume(process_message) try: await asyncio.Future() # Вечный цикл finally: await connection.close() # Не забываем закрыть сессию бота if bot: await bot.session.close() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: pass