Files
VSTU_Vk-Poster/main.py
FazziCLAY 6b72b9d479
All checks were successful
Build and Run VSTU Vk Poster / build_and_run (push) Successful in 24s
Initial commit
2026-04-04 18:18:05 +03:00

119 lines
5.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import asyncio
import logging
import aiohttp
import aio_pika
from dotenv import load_dotenv
load_dotenv()
# --- КОНФИГУРАЦИЯ ---
RABBITMQ_URL = os.getenv("RABBITMQ_URL")
EXCHANGE_NAME = os.getenv("OUT_EXCHANGE", "vstu_schedule")
VK_TOKEN = os.getenv("VK_TOKEN")
VK_GROUP_ID = os.getenv("VK_GROUP_ID") # 237378775
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("VKPublisher")
async def post_to_vk(text: str) -> bool:
"""Возвращает True при успехе, False при временной ошибке"""
url = "https://api.vk.com/method/wall.post"
params = {
"owner_id": f"-{VK_GROUP_ID}",
"from_group": 1,
"message": text,
"access_token": VK_TOKEN,
"v": "5.131"
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(url, data=params, timeout=15) as resp:
result = await resp.json()
if "error" in result:
err_code = result['error']['error_code']
logger.error(f"[!] VK Error {err_code}: {result['error']['error_msg']}")
# Если ошибка в токене или правах (код 5, 15, 100) — ретрай бесполезен
if err_code in [5, 15, 100]:
return True # Условно "успех", чтобы не зацикливать очередь
return False # Временная ошибка (лимиты, сервер)
return True
except Exception as e:
logger.error(f"Network error in post_to_vk: {e}")
return False
async def process_message(message: aio_pika.IncomingMessage):
"""Парсит событие от SLS с гарантией доставки"""
try:
# Мы НЕ используем 'async with message.process()', чтобы контролировать ack/nack вручную
payload = json.loads(message.body.decode())
# 1. Подготовка текста (Логика данных)
shm = payload.get("humanlike_message", "")
if not shm:
logger.warning("Пустое сообщение. Удаляем из очереди.")
await message.ack() # Подтверждаем, чтобы не висело, раз оно пустое
return
facultet = payload.get("facultet", "news").upper()
vk_text = f"#{facultet}@vstu_rasp\n\n{shm}"
ai = payload.get("ai_summary")
if ai and isinstance(ai, dict) and 'wide_review' in ai:
vk_text += f"\n\n🤖 AI Резюме: {ai['wide_review']}"
vk_text += "\n\n🔗 fazziclay.com"
# 2. Попытка публикации (Логика сети)
success = await post_to_vk(vk_text)
if success:
# ТОЛЬКО ТЕПЕРЬ удаляем из очереди
await message.ack()
logger.info(f"[v] Сообщение успешно обработано и подтверждено.")
else:
# VK API недоступен или лимиты — возвращаем в очередь
logger.warning(f"[!] Ошибка публикации. Возврат сообщения в очередь для повтора...")
await asyncio.sleep(10) # Небольшая пауза, чтобы не спамить ретраями мгновенно
await message.nack(requeue=True)
except json.JSONDecodeError:
logger.error("Критическая ошибка: битый JSON. Удаление сообщения.")
await message.ack() # Ack, потому что ретрай не поможет распарсить плохой JSON
except Exception as e:
logger.error(f"Непредвиденная ошибка в воркере: {e}")
# В случае неизвестной ошибки лучше вернуть в очередь (requeue)
await message.nack(requeue=True)
async def main():
if not VK_TOKEN:
logger.error("VK_TOKEN не найден в .env!")
return
connection = await aio_pika.connect_robust(RABBITMQ_URL)
channel = await connection.channel()
# Объявляем тот же обменник, в который пишет SLS
exchange = await channel.declare_exchange(EXCHANGE_NAME, aio_pika.ExchangeType.TOPIC, durable=True)
# Создаем свою очередь для VK (чтобы сообщения не пересекались с телеграмом)
queue = await channel.declare_queue("vk_publisher_queue", durable=True)
# Подписываемся на события изменения файлов
# Маска: schedule_logging_service.event.excel.#
await queue.bind(exchange, routing_key="schedule_logging_service.event.excel.#")
logger.info(f"[*] VK Publisher запущен. Ожидание событий из {EXCHANGE_NAME}...")
await queue.consume(process_message)
try:
await asyncio.Future() # Вечный цикл
finally:
await connection.close()
if __name__ == "__main__":
asyncio.run(main())