Files
VSTU_TG-Poster/main.py
FazziCLAY a696a76f24
All checks were successful
Build and Run VSTU TG Poster / build_and_run (push) Successful in 16s
fix notice
2026-04-04 19:17:47 +03:00

145 lines
5.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import asyncio
import logging
from datetime import datetime, timezone
import aio_pika
from aiogram import Bot
from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode
from aiohttp_socks import ProxyConnector
from dotenv import load_dotenv
from aiogram.client.session.aiohttp import AiohttpSession
load_dotenv()
# --- КОНФИГУРАЦИЯ ---
RABBITMQ_URL = os.getenv("RABBITMQ_URL")
EXCHANGE_NAME = os.getenv("EXCHANGE_NAME", "vstu_schedule")
TG_TOKEN = os.getenv("TG_TOKEN")
TG_CHANNEL_ID = os.getenv("TG_CHANNEL_ID")
PROXY_URL = os.getenv("PROXY_URL")
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("TGPublisher")
# Глобальный объект бота
bot = None
async def init_bot():
"""Инициализация бота с расширенными таймаутами и SOCKS5h"""
global bot
# 1. Настройка таймаутов (увеличиваем до 90 секунд)
# Это даст прокси-серверу больше времени на 'раскачку'
session = AiohttpSession(
proxy=PROXY_URL,
timeout=90.0 # Секунды
)
bot = Bot(
token=TG_TOKEN,
session=session, # Передаем настроенную сессию
default=DefaultBotProperties(parse_mode=ParseMode.HTML)
)
# Проверка связи при старте
try:
me = await bot.get_me()
logger.info(f"[+] Бот авторизован: @{me.username} через прокси {PROXY_URL}")
except Exception as e:
logger.error(f"[!!!] Бот НЕ СМОГ авторизоваться: {e}")
async def send_to_tg(text: str) -> bool:
"""Отправляет сообщение в канал. Возвращает True при успехе."""
try:
await bot.send_message(chat_id=TG_CHANNEL_ID, text=text)
logger.info("[+] Сообщение опубликовано в Telegram.")
return True
except Exception as e:
logger.error(f"[!] Ошибка Telegram API: {e}")
return False
async def process_message(message: aio_pika.IncomingMessage):
"""Обработка события от SLS с гарантией доставки (Manual Ack)"""
try:
# Мы НЕ используем context manager 'async with message.process()',
# так как нам нужен ручной ack только после успешного TG-поста
payload = json.loads(message.body.decode())
shm = payload.get("humanlike_message", "")
if not shm:
logger.warning("Получено пустое сообщение, скип.")
await message.ack()
return
# Формируем текст
facultet = payload.get("facultet", "news").upper()
# Экранируем HTML символы на всякий случай
shm_safe = shm.replace("<", "&lt;").replace(">", "&gt;")
tg_text = f"<b>#{facultet}</b>\n\n{shm_safe}"
ai = payload.get("ai_summary")
if ai and isinstance(ai, dict) and 'wide_review' in ai:
tg_text += f"\n\n🤖 <b>AI Резюме:</b>\n<i>{ai['wide_review']}</i>"
tg_text += "\n\nОтказ от ответственности: Предоставляется КАК-ЕСТЬ без каких-либо гарантий."
# Пытаемся отправить
success = await send_to_tg(tg_text)
if success:
await message.ack() # Удаляем из RabbitMQ только после успеха в TG
else:
logger.warning("Повтор попытки через 15 секунд...")
await asyncio.sleep(15)
await message.nack(requeue=True) # Возвращаем в очередь
except json.JSONDecodeError:
logger.error("Критическая ошибка: невалидный JSON. Удаление.")
await message.ack()
except Exception as e:
logger.error(f"Непредвиденная ошибка воркера: {e}")
await message.nack(requeue=True)
async def main():
if not TG_TOKEN or not TG_CHANNEL_ID:
logger.error("Кредиты Telegram (Token/ChannelID) не найдены!")
return
# Инициализируем бота
await init_bot()
# Коннект к RabbitMQ
connection = await aio_pika.connect_robust(RABBITMQ_URL)
channel = await connection.channel()
# Рекомендуется ограничить количество сообщений, обрабатываемых за раз
await channel.set_qos(prefetch_count=1)
exchange = await channel.declare_exchange(EXCHANGE_NAME, aio_pika.ExchangeType.TOPIC, durable=True)
# Создаем независимую очередь для Telegram
queue = await channel.declare_queue("tg_publisher_queue", durable=True)
await queue.bind(exchange, routing_key="schedule_logging_service.event.excel.#")
logger.info(f"[*] TG Publisher готов к работе. Канал: {TG_CHANNEL_ID}")
await queue.consume(process_message)
try:
await asyncio.Future() # Вечный цикл
finally:
await connection.close()
# Не забываем закрыть сессию бота
if bot:
await bot.session.close()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass