import os import json import asyncio import logging import aiohttp import aio_pika from dotenv import load_dotenv load_dotenv() # --- КОНФИГУРАЦИЯ --- RABBITMQ_URL = os.getenv("RABBITMQ_URL") EXCHANGE_NAME = os.getenv("OUT_EXCHANGE", "vstu_schedule") VK_TOKEN = os.getenv("VK_TOKEN") VK_GROUP_ID = os.getenv("VK_GROUP_ID") # 237378775 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger("VKPublisher") async def post_to_vk(text: str) -> bool: """Возвращает True при успехе, False при временной ошибке""" url = "https://api.vk.com/method/wall.post" params = { "owner_id": f"-{VK_GROUP_ID}", "from_group": 1, "message": text, "access_token": VK_TOKEN, "v": "5.131" } try: async with aiohttp.ClientSession() as session: async with session.post(url, data=params, timeout=15) as resp: result = await resp.json() if "error" in result: err_code = result['error']['error_code'] logger.error(f"[!] VK Error {err_code}: {result['error']['error_msg']}") # Если ошибка в токене или правах (код 5, 15, 100) — ретрай бесполезен if err_code in [5, 15, 100]: return True # Условно "успех", чтобы не зацикливать очередь return False # Временная ошибка (лимиты, сервер) return True except Exception as e: logger.error(f"Network error in post_to_vk: {e}") return False async def process_message(message: aio_pika.IncomingMessage): """Парсит событие от SLS с гарантией доставки""" try: # Мы НЕ используем 'async with message.process()', чтобы контролировать ack/nack вручную payload = json.loads(message.body.decode()) # 1. Подготовка текста (Логика данных) shm = payload.get("humanlike_message", "") if not shm: logger.warning("Пустое сообщение. Удаляем из очереди.") await message.ack() # Подтверждаем, чтобы не висело, раз оно пустое return facultet = payload.get("facultet", "news").upper() vk_text = f"#{facultet}@vstu_rasp\n\n{shm}" ai = payload.get("ai_summary") if ai and isinstance(ai, dict) and 'wide_review' in ai: vk_text += f"\n\n🤖 AI Резюме: {ai['wide_review']}" vk_text += "\n\nОтказ от ответственности: Предоставляется КАК-ЕСТЬ без каких-либо гарантий." # 2. Попытка публикации (Логика сети) success = await post_to_vk(vk_text) if success: # ТОЛЬКО ТЕПЕРЬ удаляем из очереди await message.ack() logger.info(f"[v] Сообщение успешно обработано и подтверждено.") else: # VK API недоступен или лимиты — возвращаем в очередь logger.warning(f"[!] Ошибка публикации. Возврат сообщения в очередь для повтора...") await asyncio.sleep(10) # Небольшая пауза, чтобы не спамить ретраями мгновенно await message.nack(requeue=True) except json.JSONDecodeError: logger.error("Критическая ошибка: битый JSON. Удаление сообщения.") await message.ack() # Ack, потому что ретрай не поможет распарсить плохой JSON except Exception as e: logger.error(f"Непредвиденная ошибка в воркере: {e}") # В случае неизвестной ошибки лучше вернуть в очередь (requeue) await message.nack(requeue=True) async def main(): if not VK_TOKEN: logger.error("VK_TOKEN не найден в .env!") return connection = await aio_pika.connect_robust(RABBITMQ_URL) channel = await connection.channel() # Объявляем тот же обменник, в который пишет SLS exchange = await channel.declare_exchange(EXCHANGE_NAME, aio_pika.ExchangeType.TOPIC, durable=True) # Создаем свою очередь для VK (чтобы сообщения не пересекались с телеграмом) queue = await channel.declare_queue("vk_publisher_queue", durable=True) # Подписываемся на события изменения файлов # Маска: schedule_logging_service.event.excel.# await queue.bind(exchange, routing_key="schedule_logging_service.event.excel.#") logger.info(f"[*] VK Publisher запущен. Ожидание событий из {EXCHANGE_NAME}...") await queue.consume(process_message) try: await asyncio.Future() # Вечный цикл finally: await connection.close() if __name__ == "__main__": asyncio.run(main())