From 98d413712ecceab19eeb05647f4b6b25e3791808 Mon Sep 17 00:00:00 2001 From: FazziCLAY Date: Thu, 26 Mar 2026 00:12:37 +0300 Subject: [PATCH] rabbitmq added --- .env | 1 + links_parser.py | 3 ++ main.py | 121 +++++++++++++++++++++++++++++++++++++++++------- 3 files changed, 107 insertions(+), 18 deletions(-) create mode 100644 .env diff --git a/.env b/.env new file mode 100644 index 0000000..416200b --- /dev/null +++ b/.env @@ -0,0 +1 @@ +RABBITMQ_URL=amqp://guest:guest@localhost/ \ No newline at end of file diff --git a/links_parser.py b/links_parser.py index d2c032c..212ba30 100644 --- a/links_parser.py +++ b/links_parser.py @@ -46,9 +46,12 @@ def parse_links(facultets): for a in excel_tags: last_changed = sibling_clear_to_date(a.next_sibling) url = urljoin(BASE_URL, a.get('href')) + disp = a.decode_contents() record = { + "uniqpath": f"vstu.ru/rasp?dep={facultet}/{disp.strip()}", "facultet": facultet, "url": url, + "display_filename": disp, "last_changed": last_changed } print("Found in vstu.ru: ", record) diff --git a/main.py b/main.py index 4e7d5f3..8ad46eb 100644 --- a/main.py +++ b/main.py @@ -4,6 +4,7 @@ import json +import pika import os import random import time @@ -15,6 +16,22 @@ import utils import json import links_parser import shutil +from dotenv import load_dotenv +load_dotenv() + +RABBITMQ_URL = os.environ.get("RABBITMQ_URL") +EXCHANGE_NAME = os.environ.get("RABBITMQ_EXCHANGE", "vstu_schedule") + +try: + connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL)) + channel = connection.channel() + + channel.exchange_declare(exchange=EXCHANGE_NAME, + exchange_type='topic', + durable=True) +except Exception as e: + print("Failed to connect RabbitMQ") + traceback.print_exception(e) def currt(): return round(time.time()) @@ -129,7 +146,6 @@ def save_parsed_state(excel_filename, obj): print(f"Saved parsed state to '{filepath}'") - def run_session(): faileds = [] @@ -158,6 +174,7 @@ def run_session(): last_changeds = set() states = [] + 
changed = False for excel_dict in EXCEL_LINKS: try: last_changeds.add(excel_dict['last_changed']) @@ -166,7 +183,7 @@ def run_session(): facultet = excel_dict['facultet'] excel_filename = excel_url.split("/")[-1] excel_dict['json_represent'] = parsed_file_path(excel_filename).split(os.path.sep)[-1] - + print(f"Processing {facultet} {excel_filename}") state = load_parsed_state(excel_filename) is_new = state is None @@ -182,6 +199,22 @@ def run_session(): except Exception as e: print(f"Excel[{excel_filename}]: failed testify last_changed") + r = "parser.excel_found." + ("same" if same_date else "different") + "." + facultet + channel.basic_publish( + exchange=EXCHANGE_NAME, + routing_key=r, + properties=pika.BasicProperties( + content_type="application/json", + delivery_mode=2 + ), + body=json.dumps({ + "type": "excel_file_found", + "same": same_date, + "excel_dict": excel_dict + }, ensure_ascii=False).encode('utf-8') + ) + print(f"RabbitMQ published r={r}") + if same_date: state['actual_at'] = currt() try: @@ -191,6 +224,7 @@ def run_session(): save_parsed_state(excel_filename, state) continue + changed = True excel_dict['different_in_this_session'] = True state['actual_at'] = currt() state['excel'] = excel_dict @@ -203,6 +237,20 @@ def run_session(): state['sheets'] = parse_sheets(download_place) + channel.basic_publish( + exchange=EXCHANGE_NAME, + routing_key="parser.excel_parsed." 
+ facultet, + properties=pika.BasicProperties( + content_type="application/json", + delivery_mode=2 + ), + body=json.dumps({ + "type": "excel_file_parsed", + "is_new": is_new, + "state": state + }, ensure_ascii=False).encode('utf-8') + ) + save_parsed_state(excel_filename, state) states.append(state) @@ -232,20 +280,35 @@ def run_session(): "all_files": EXCEL_LINKS, "faileds": faileds }, fp=fp, ensure_ascii=False) - - with open("result_v2.json", 'w', encoding="utf-8") as fp: + + if changed: all_files = states - json.dump({ - "version": 2, - "notice": "ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ: ПРЕДОСТАВЛЯЕТСЯ КАК-ЕСТЬ (AS-IS) БЕЗ КАКИХ ЛИБО ГАРАНТИЙ", - "contact": "https://fazziclay.com/ или fazziclay@gmail.com", - "api_notices": { - "just_save_and_check_diffs": "просто сохраните и проверяйте разницу" - }, - "actual_at": currt(), - "all_files": sorted(all_files, key=lambda d: d['excel']['url']), - "faileds": faileds - }, fp=fp, ensure_ascii=False) + d = { + "version": 2, + "notice": "ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ: ПРЕДОСТАВЛЯЕТСЯ КАК-ЕСТЬ (AS-IS) БЕЗ КАКИХ ЛИБО ГАРАНТИЙ", + "contact": "https://fazziclay.com/ или fazziclay@gmail.com", + "api_notices": { + "just_save_and_check_diffs": "просто сохраните и проверяйте разницу" + }, + "actual_at": currt(), + "all_files": sorted(all_files, key=lambda d: d['excel']['url']), + "faileds": faileds + } + with open("result_v2.json", 'w', encoding="utf-8") as fp: + json.dump(d, fp=fp, ensure_ascii=False) + + channel.basic_publish( + exchange=EXCHANGE_NAME, + routing_key="parser.result_v2", + properties=pika.BasicProperties( + content_type="application/json", + delivery_mode=2 + ), + body=json.dumps({ + "type": "schedule_result_v2", + "data": d + }, ensure_ascii=False).encode('utf-8') + ) # Delete a non-empty directory and its contents try: @@ -253,18 +316,24 @@ def run_session(): print(f"Directory '{DIRNAME}' and its contents deleted successfully.") except Exception as e: print(f"Error deleting directory '{DIRNAME}': {e}") + + return {"changed": 
changed} def check_dirs(): + if not os.path.exists(PARSED_DIR): os.mkdir(PARSED_DIR) -def main(): +def main(): while True: + t = utils.StepTimeCounter() + err = None + sess = None try: check_dirs() - print("BEGIN run_session();") - run_session() + print("BEGIN run_session();") + sess = run_session() print("END run_session();") if DEBUG_ONE_FAC: @@ -272,8 +341,24 @@ def main(): break except Exception as e: + err = e print("Exception in run_session();") traceback.print_exception(e) + + channel.basic_publish( + exchange=EXCHANGE_NAME, + routing_key="parser.session_end." + ('complete' if err is None else 'failed'), + properties=pika.BasicProperties( + content_type="application/json", + delivery_mode=2 + ), + body=json.dumps({ + "type": "session_end", + "err": err, + "duration": t.step(), + "session": sess + }, ensure_ascii=False).encode('utf-8') + ) print("Sleep for 30 minutes") time.sleep(60*30)
# ---------------------------------------------------------------------------
# REVIEW NOTES (commentary only; the patch text above is unchanged).
# These lines sit after the last hunk, where patch tools skip trailing text.
#
# NOTE(review): the line breaks of this patch appear to have been collapsed
# into spaces, so indentation and blank context lines are not visible here.
# Regenerate the patch from commit 98d4137 before trying to apply it.
#
# Purpose: publish the schedule parser's progress to RabbitMQ through pika.
#   * .env            - new file that defines RABBITMQ_URL.
#   * links_parser.py - each record gains "uniqpath" and "display_filename",
#                       both built from a.decode_contents().
#   * main.py         - calls load_dotenv(), opens a pika.BlockingConnection
#                       at module level and declares a durable 'topic'
#                       exchange named by RABBITMQ_EXCHANGE (default
#                       "vstu_schedule"). JSON messages are published with
#                       delivery_mode=2 under these routing keys:
#                         parser.excel_found.<same|different>.<facultet>
#                         parser.excel_parsed.<facultet>
#                         parser.result_v2
#                         parser.session_end.<complete|failed>
#                       run_session() now returns {"changed": changed}, and
#                       main() forwards that dict in the session_end message.
#                       Writing result_v2.json appears to be guarded by
#                       `if changed:` now - confirm against the real
#                       indentation.
#
# Issues to resolve before merging:
#   1. Connection failure leaves `channel` undefined. The module-level
#      try/except only prints the error, so `channel` is never bound and
#      every later channel.basic_publish(...) raises NameError. Either fail
#      fast or skip publishing when there is no channel.
#   2. The session_end payload contains "err": err, where err is the caught
#      exception object. json.dumps() raises TypeError for objects it cannot
#      serialize, so the "failed" message cannot be encoded. Send str(err)
#      or repr(err) instead.
#      NOTE(review): this publish looks like it runs after the try/except
#      inside `while True`; if so, the TypeError ends the loop - confirm.
#   3. The same BlockingConnection is reused across time.sleep(60*30).
#      NOTE(review): a blocking connection that is not serviced during the
#      sleep may be closed by the broker for missed heartbeats, which would
#      make the next publish fail - confirm heartbeat settings, or
#      reconnect once per session.
#   4. .env (with guest:guest credentials) is committed to the repository
#      and lacks a trailing newline. Consider committing a .env.example and
#      ignoring .env.
#   5. parser.excel_parsed publishes the whole `state` (including the parsed
#      sheets) and parser.result_v2 publishes the full result document.
#      NOTE(review): these bodies may be large - confirm that consumers and
#      broker limits can handle them.
#   6. The check_dirs() hunk and the `def main():` / "BEGIN run_session();"
#      replacements appear to be whitespace-only changes - presumably
#      unintentional noise; verify and drop them.
# ---------------------------------------------------------------------------