import os import asyncio import json import logging import traceback from dotenv import load_dotenv import aio_pika import aiohttp from aiogram import Bot, Dispatcher, types, F from aiogram.client.session.aiohttp import AiohttpSession from aiogram.utils.keyboard import InlineKeyboardBuilder from aiogram.filters import Command from aiohttp_socks import ProxyConnector from sqlalchemy import and_, select, delete from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker from database import Chat, SentStatus, Subscription, SubType, Base from data_manager import DataManager import keyboards as kb import re import fnmatch from middleware import LoggingMiddleware load_dotenv() logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler() ] ) # --- Настройки --- engine = create_async_engine(os.getenv("DATABASE_URL")) async_session = async_sessionmaker(engine, expire_on_commit=False) data_mgr = DataManager(os.getenv("FACULTETS_JSON_URL"), os.getenv("GROUPS_JSON_URL"), os.getenv("PARSER_JSON_URL")) bot = None dp = Dispatcher() # --- RabbitMQ Consumer v2 --- async def start_rabbitmq_consumer(): while bot is None: await asyncio.sleep(1) conn = await aio_pika.connect_robust(os.getenv("RABBITMQ_URL")) print("RabbitMQ conn created") async with conn: channel = await conn.channel() # В v2 используем Topic Exchange exchange = await channel.declare_exchange("vstu_schedule", aio_pika.ExchangeType.TOPIC, durable=True) queue = await channel.declare_queue(name="vstu_tg_public_bot-queue", durable=True) # Слушаем изменения в файлах await queue.bind(exchange, routing_key="schedule_logging_service.event.excel.#") async with queue.iterator() as q_iter: async for message in q_iter: async with message.process(): data = json.loads(message.body.decode('utf-8')) print(f"{message.body_size}: data={data}") try: await process_sls_event(data) except Exception as e: print(e) print("END") async def process_sls_event(data): affected_groups = data.get("diff_summary", {}).get("affected_groups", []) ai_text = data.get("ai_summary", {}) guid = data.get("log_guid", "???") async with async_session() as session: # Получаем ВСЕ подписки на группы, чтобы проверить паттерны # (В будущем здесь можно оптимизировать, выгружая только паттерны) stmt = select(Subscription).where(Subscription.deleted == False) all_subs = (await session.execute(stmt)).scalars().all() def is_match_func(sub: Subscription, check_value: str): if check_value is None or sub is None: return False if not sub.is_pattern and sub.value.lower() == check_value.lower(): return True elif sub.is_pattern and match_pattern(sub.value, check_value): return True return False for sub in all_subs: is_match = False value = None is_skip_humanlike = False if sub.sub_type == SubType.EXCEL: fn = data.get("filename", None) is_match = is_match_func(sub, fn) if is_match: value = fn elif sub.sub_type == SubType.GROUP: for g_name in affected_groups: if not is_match: is_match = is_match_func(sub, g_name) if is_match: is_skip_humanlike = not sub.is_pattern value = g_name else: print(f"Unknown sub_type: {sub.sub_type}") if is_match: text = data.get("humanlike_message", "") if not is_skip_humanlike else "" if value in ai_text.keys(): text = ai_text.get(value, "?_?") else: text += f"\n\nAI Ревью: {ai_text.get('wide_review', '???')}" await send_sub_text(session, sub, guid, text.strip()) async def send_sub_text(session: AsyncSession, sub: Subscription, guid: str, text: str, ads=None): print("send_sub_text") is_empty = len(text) == 0 if not is_empty and ads is not None: text += ("\n" + str(ads)) status = SentStatus(sub_id=sub.id, guid=guid, end_state="START_SENDING" if not is_empty else "EMPTY_TEXT", log=text) session.add(status) await session.commit() if is_empty: return await session.refresh(status) success = False err_text = None try: await bot.send_message( sub.chat_id, f"🔔 Обновление расписания ({sub.value}):\n{text}", parse_mode="HTML" ) print(f"SENT TO TG: chat_id={sub.chat_id}!") success = True except Exception as e: err_text = str(e) status.end_state = "SUCCESS" if success else "FAILED" status.log = err_text session.add(status) await session.commit() def match_pattern(pattern: str, text: str) -> bool: """Проверяет текст на соответствие маске (иит* -> ИИТ-273)""" # fnmatch делает именно то, что ты хочешь: иит* совпадет с иит-273 return fnmatch.fnmatch(text.lower(), pattern.lower()) # --- Handlers --- @dp.message(Command("start")) @dp.message(Command("vstu_rasp")) async def cmd_start(message: types.Message): builder = InlineKeyboardBuilder() builder.row(types.InlineKeyboardButton(text="👥 Подписка на группу", callback_data="sub_nav:group")) builder.row(types.InlineKeyboardButton(text="📄 Подписка на файл", callback_data="sub_nav:excel")) builder.row(types.InlineKeyboardButton(text="📋 Мои подписки", callback_data="my_subs")) builder.row(types.InlineKeyboardButton(text="🧩 Подписка по маске", callback_data="sub_mask_info")) await message.answer("Выберите действие:", reply_markup=builder.as_markup()) async with async_session() as session: await session.merge(Chat(chat_id=message.chat.id, name=message.chat.full_name, username=message.chat.username)) await session.commit() @dp.callback_query(F.data == "menu_start") async def back_to_start(callback: types.CallbackQuery): await cmd_start(callback.message) await callback.message.delete() # Навигация: Выбор факультета @dp.callback_query(F.data.startswith("sub_nav:")) async def sub_nav_fac(callback: types.CallbackQuery): mode = callback.data.split(":")[1] # group или excel await callback.message.edit_text("Выберите:", reply_markup=kb.fac_kb(f"fac_pick:{mode}", data_mgr.facs)) # Навигация: Выбор конкретного элемента @dp.callback_query(F.data.startswith("fac_pick:")) async def sub_nav_items(callback: types.CallbackQuery): _, mode, fac_id = callback.data.split(":") if fac_id == "asp": await callback.message.answer("Аспиратны: у вас беды с названием групп (и тг бот ломается), используйте маску с фамилией преподователя, должно работать.") return if mode == "group": items = data_mgr.get_groups_by_fac(fac_id) await callback.message.edit_text("Выберите группу:", reply_markup=kb.items_kb("view_group", items, max_col=3, back="sub_nav:group")) else: items = data_mgr.get_files_by_fac(fac_id) await callback.message.edit_text("Выберите файл:", reply_markup=kb.items_kb("view_excel", items, back="sub_nav:excel")) # Просмотр группы @dp.callback_query(F.data.startswith("view_group:")) async def view_group(callback: types.CallbackQuery): g_id = callback.data.split(":")[1] g_name = data_mgr.groups[g_id]['real_name'] builder = InlineKeyboardBuilder() builder.row(types.InlineKeyboardButton(text="🔔 Подписаться", callback_data=f"suco:group:{g_name}")) builder.row(types.InlineKeyboardButton(text="⬅️ Назад", callback_data="menu_start")) await callback.message.edit_text(f"🏫 Группа: *{g_name}*", reply_markup=builder.as_markup(), parse_mode="Markdown") # Просмотр файла @dp.callback_query(F.data.startswith("view_excel:")) async def view_excel(callback: types.CallbackQuery): pre_index = callback.data.split(":")[1] # Находим имя файла для красоты h_name = "Excel файл" try: ex = data_mgr.parser['all_files'][int(pre_index)] h_name = ex['display_filename'] except Exception as e: print(f"safe?: {e}") builder = InlineKeyboardBuilder() builder.row(types.InlineKeyboardButton(text="🔔 Подписаться на файл", callback_data=f"suco:excel:{pre_index}")) builder.row(types.InlineKeyboardButton(text="⬅️ Назад", callback_data="menu_start")) await callback.message.edit_text(f"📄 Файл: *{h_name}*", reply_markup=builder.as_markup(), parse_mode="Markdown") @dp.callback_query(F.data.startswith("suco:")) async def sub_confirm(callback: types.CallbackQuery): parts = callback.data.split(":") mode, val = parts[1], parts[2] h_name = parts[3] if len(parts) > 3 else val sub_value = val if mode == "excel": pre_index = h_name try: excel = data_mgr.parser['all_files'][int(pre_index)] sub_value = excel['url'].split("/")[-1] h_name = sub_value except Exception as e: print(f"safe? e={e}") async with async_session() as session: # Регистрация пользователя если нет await session.merge(Chat(chat_id=callback.message.chat.id, name=callback.message.chat.full_name)) session.add(Subscription(chat_id=callback.message.chat.id, sub_type=SubType(mode), value=sub_value, human_name=h_name, created_by=callback.from_user.id)) await session.commit() await callback.answer("Подписка оформлена!") await callback.message.answer(f"✅ Готово! Я уведомлю, когда *{h_name}* изменится.", parse_mode="Markdown") # Команда /sub для групп @dp.message(Command("sub")) async def cmd_sub_group(message: types.Message): query = message.text.replace("/sub", "").strip() if not query: return await message.reply("Напишите группу, например: `/sub ИИТ-273`", parse_mode="Markdown") g_id, g_name = data_mgr.find_group_by_name(query) if not g_id: return await message.reply("Группа не найдена в базе v2.") builder = InlineKeyboardBuilder() builder.row(types.InlineKeyboardButton(text="🔔 Подписаться", callback_data=f"sub_confirm:group:{g_name}")) # В группах отвечаем реплаем await message.reply(f"Найдена группа: *{g_name}*", reply_markup=builder.as_markup(), parse_mode="Markdown") @dp.callback_query(F.data == "my_subs") async def my_subs(callback: types.CallbackQuery): async with async_session() as session: stmt = select(Subscription).where(and_(Subscription.chat_id == callback.message.chat.id, Subscription.deleted == False)) subs = (await session.execute(stmt)).scalars().all() if not subs: return await callback.answer("Подписок нет", show_alert=True) builder = InlineKeyboardBuilder() for s in subs: builder.row(types.InlineKeyboardButton(text=f"❌ {s.human_name}", callback_data=f"unsub:{s.id}")) builder.row(types.InlineKeyboardButton(text="⬅️ Меню", callback_data="menu_start")) await callback.message.edit_text("Ваши подписки (нажми, чтобы удалить):", reply_markup=builder.as_markup()) @dp.callback_query(F.data.startswith("unsub:")) async def unsub(callback: types.CallbackQuery): sid = int(callback.data.split(":")[1]) chat_id = callback.message.chat.id async with async_session() as session: stmt = select(Subscription).where( Subscription.id == sid, Subscription.chat_id == chat_id, Subscription.deleted == False ) result = await session.execute(stmt) sub = result.scalar_one_or_none() sub.deleted = True session.add(sub) await session.commit() # Проверяем, было ли реально что-то удалено await callback.answer("Подписка удалена") await my_subs(callback) @dp.callback_query(F.data == "sub_mask_info") async def sub_mask_info(callback: types.CallbackQuery): await callback.message.answer( "Команда для подписки по маске:\n" "`/sub_mask ИИТ*` — все группы ИИТ\n" "`/sub_mask *161` — все группы первого курса (161)\n" "`/sub_mask *` — ВООБЩЕ ВСЕ изменения в вузе\n\n" "Отправь команду в чат.", parse_mode="Markdown" ) @dp.message(Command("sub_mask")) async def cmd_sub_mask(message: types.Message): pattern = message.text.replace("/sub_mask", "").strip() if not pattern: return await message.reply("Пример: `/sub_mask ИИТ*`") # Защита от слишком коротких масок (кроме '*') if len(pattern) < 3 and pattern != "*": return await message.reply("Слишком короткая маска (минимум 3 символа или '*')") async with async_session() as session: await session.merge(Chat(chat_id=message.chat.id, name=message.chat.full_name)) session.add(Subscription( chat_id=message.chat.id, created_by=message.from_user.id, sub_type=SubType.GROUP, value=pattern, human_name=f"Маска: {pattern}", is_pattern=True )) await session.commit() await message.reply(f"✅ Подписка на маску `{pattern}` оформлена!", parse_mode="Markdown") # --- Запуск --- async def main(): global bot # Настройка прокси с таймаутом v2 (90 сек) connector = ProxyConnector.from_url(os.getenv("SOCKS5_URL")) bot = Bot(token=os.getenv("BOT_TOKEN"), session=AiohttpSession(proxy=os.getenv("SOCKS5_URL"))) # Важно: ставим таймаут для сессии aiohttp внутри бота bot.session._timeout = aiohttp.ClientTimeout(total=90) async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) await data_mgr.refresh_cache(connector) dp.update.middleware(LoggingMiddleware()) # <-- добавьте эту строку asyncio.create_task(start_rabbitmq_consumer()) # Периодическое обновление индексов async def index_refresher(): while True: await asyncio.sleep(3600) await data_mgr.refresh_cache(connector) asyncio.create_task(index_refresher()) print("Start polling") await dp.start_polling(bot) if __name__ == "__main__": asyncio.run(main())