rabbitmq added

This commit is contained in:
2026-03-26 00:12:37 +03:00
parent 2f799fc198
commit 98d413712e
3 changed files with 107 additions and 18 deletions

1
.env Normal file
View File

@@ -0,0 +1 @@
RABBITMQ_URL=amqp://guest:guest@localhost/

View File

@@ -46,9 +46,12 @@ def parse_links(facultets):
for a in excel_tags: for a in excel_tags:
last_changed = sibling_clear_to_date(a.next_sibling) last_changed = sibling_clear_to_date(a.next_sibling)
url = urljoin(BASE_URL, a.get('href')) url = urljoin(BASE_URL, a.get('href'))
disp = a.decode_contents()
record = { record = {
"uniqpath": f"vstu.ru/rasp?dep={facultet}/{disp.strip()}",
"facultet": facultet, "facultet": facultet,
"url": url, "url": url,
"display_filename": disp,
"last_changed": last_changed "last_changed": last_changed
} }
print("Found in vstu.ru: ", record) print("Found in vstu.ru: ", record)

121
main.py
View File

@@ -4,6 +4,7 @@
import json import json
import pika
import os import os
import random import random
import time import time
@@ -15,6 +16,22 @@ import utils
import json import json
import links_parser import links_parser
import shutil import shutil
from dotenv import load_dotenv
# Pull settings from the .env file into the process environment before
# reading them.
load_dotenv()
RABBITMQ_URL = os.environ.get("RABBITMQ_URL")
EXCHANGE_NAME = os.environ.get("RABBITMQ_EXCHANGE", "vstu_schedule")

# Bind both names before connecting. If the attempt below fails they stay
# None instead of being undefined, so the "broker unavailable" state is
# explicit and can be tested with `channel is None` rather than surfacing
# later as a NameError at the first publish.
connection = None
channel = None
try:
    connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))
    channel = connection.channel()
    # Topic exchange: the publishers in this file use dotted routing keys
    # such as "parser.excel_parsed.<facultet>".
    channel.exchange_declare(exchange=EXCHANGE_NAME,
                             exchange_type='topic',
                             durable=True)
except Exception as e:
    # Broad catch kept from the original: a missing/invalid URL or an
    # unreachable broker is reported, and start-up continues.
    print("Failed to connect RabbitMQ")
    traceback.print_exception(e)
def currt(): def currt():
return round(time.time()) return round(time.time())
@@ -129,7 +146,6 @@ def save_parsed_state(excel_filename, obj):
print(f"Saved parsed state to '{filepath}'") print(f"Saved parsed state to '{filepath}'")
def run_session(): def run_session():
faileds = [] faileds = []
@@ -158,6 +174,7 @@ def run_session():
last_changeds = set() last_changeds = set()
states = [] states = []
changed = False
for excel_dict in EXCEL_LINKS: for excel_dict in EXCEL_LINKS:
try: try:
last_changeds.add(excel_dict['last_changed']) last_changeds.add(excel_dict['last_changed'])
@@ -166,7 +183,7 @@ def run_session():
facultet = excel_dict['facultet'] facultet = excel_dict['facultet']
excel_filename = excel_url.split("/")[-1] excel_filename = excel_url.split("/")[-1]
excel_dict['json_represent'] = parsed_file_path(excel_filename).split(os.path.sep)[-1] excel_dict['json_represent'] = parsed_file_path(excel_filename).split(os.path.sep)[-1]
print(f"Processing {facultet} {excel_filename}")
state = load_parsed_state(excel_filename) state = load_parsed_state(excel_filename)
is_new = state is None is_new = state is None
@@ -182,6 +199,22 @@ def run_session():
except Exception as e: except Exception as e:
print(f"Excel[{excel_filename}]: failed testify last_changed") print(f"Excel[{excel_filename}]: failed testify last_changed")
r = "parser.excel_found." + ("same" if same_date else "different") + "." + facultet
channel.basic_publish(
exchange=EXCHANGE_NAME,
routing_key=r,
properties=pika.BasicProperties(
content_type="application/json",
delivery_mode=2
),
body=json.dumps({
"type": "excel_file_found",
"same": same_date,
"excel_dict": excel_dict
}, ensure_ascii=False).encode('utf-8')
)
print(f"RabbitMQ published r={r}")
if same_date: if same_date:
state['actual_at'] = currt() state['actual_at'] = currt()
try: try:
@@ -191,6 +224,7 @@ def run_session():
save_parsed_state(excel_filename, state) save_parsed_state(excel_filename, state)
continue continue
changed = True
excel_dict['different_in_this_session'] = True excel_dict['different_in_this_session'] = True
state['actual_at'] = currt() state['actual_at'] = currt()
state['excel'] = excel_dict state['excel'] = excel_dict
@@ -203,6 +237,20 @@ def run_session():
state['sheets'] = parse_sheets(download_place) state['sheets'] = parse_sheets(download_place)
channel.basic_publish(
exchange=EXCHANGE_NAME,
routing_key="parser.excel_parsed." + facultet,
properties=pika.BasicProperties(
content_type="application/json",
delivery_mode=2
),
body=json.dumps({
"type": "excel_file_parsed",
"is_new": is_new,
"state": state
}, ensure_ascii=False).encode('utf-8')
)
save_parsed_state(excel_filename, state) save_parsed_state(excel_filename, state)
states.append(state) states.append(state)
@@ -232,20 +280,35 @@ def run_session():
"all_files": EXCEL_LINKS, "all_files": EXCEL_LINKS,
"faileds": faileds "faileds": faileds
}, fp=fp, ensure_ascii=False) }, fp=fp, ensure_ascii=False)
with open("result_v2.json", 'w', encoding="utf-8") as fp: if changed:
all_files = states all_files = states
json.dump({ d = {
"version": 2, "version": 2,
"notice": "ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ: ПРЕДОСТАВЛЯЕТСЯ КАК-ЕСТЬ (AS-IS) БЕЗ КАКИХ ЛИБО ГАРАНТИЙ", "notice": "ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ: ПРЕДОСТАВЛЯЕТСЯ КАК-ЕСТЬ (AS-IS) БЕЗ КАКИХ ЛИБО ГАРАНТИЙ",
"contact": "https://fazziclay.com/ или fazziclay@gmail.com", "contact": "https://fazziclay.com/ или fazziclay@gmail.com",
"api_notices": { "api_notices": {
"just_save_and_check_diffs": "просто сохраните и проверяйте разницу" "just_save_and_check_diffs": "просто сохраните и проверяйте разницу"
}, },
"actual_at": currt(), "actual_at": currt(),
"all_files": sorted(all_files, key=lambda d: d['excel']['url']), "all_files": sorted(all_files, key=lambda d: d['excel']['url']),
"faileds": faileds "faileds": faileds
}, fp=fp, ensure_ascii=False) }
with open("result_v2.json", 'w', encoding="utf-8") as fp:
json.dump(d, fp=fp, ensure_ascii=False)
channel.basic_publish(
exchange=EXCHANGE_NAME,
routing_key="parser.result_v2",
properties=pika.BasicProperties(
content_type="application/json",
delivery_mode=2
),
body=json.dumps({
"type": "schedule_result_v2",
"data": d
}, ensure_ascii=False).encode('utf-8')
)
# Delete a non-empty directory and its contents # Delete a non-empty directory and its contents
try: try:
@@ -253,18 +316,24 @@ def run_session():
print(f"Directory '{DIRNAME}' and its contents deleted successfully.") print(f"Directory '{DIRNAME}' and its contents deleted successfully.")
except Exception as e: except Exception as e:
print(f"Error deleting directory '{DIRNAME}': {e}") print(f"Error deleting directory '{DIRNAME}': {e}")
return {"changed": changed}
def check_dirs(): def check_dirs():
if not os.path.exists(PARSED_DIR): if not os.path.exists(PARSED_DIR):
os.mkdir(PARSED_DIR) os.mkdir(PARSED_DIR)
def main(): def main():
while True: while True:
t = utils.StepTimeCounter()
err = None
sess = None
try: try:
check_dirs() check_dirs()
print("BEGIN run_session();") print("BEGIN run_session();")
run_session() sess = run_session()
print("END run_session();") print("END run_session();")
if DEBUG_ONE_FAC: if DEBUG_ONE_FAC:
@@ -272,8 +341,24 @@ def main():
break break
except Exception as e: except Exception as e:
err = e
print("Exception in run_session();") print("Exception in run_session();")
traceback.print_exception(e) traceback.print_exception(e)
# Announce how the session ended. `err` is either None or the exception
# caught above; an exception instance is not JSON-serializable, so it is
# sent as its repr() string. Passing the raw object made json.dumps raise
# TypeError on exactly the failure path, outside the try/except.
channel.basic_publish(
    exchange=EXCHANGE_NAME,
    routing_key="parser.session_end." + ('complete' if err is None else 'failed'),
    properties=pika.BasicProperties(
        content_type="application/json",
        delivery_mode=2
    ),
    body=json.dumps({
        "type": "session_end",
        "err": None if err is None else repr(err),
        "duration": t.step(),
        "session": sess
    }, ensure_ascii=False).encode('utf-8')
)
print("Sleep for 30 minutes") print("Sleep for 30 minutes")
time.sleep(60*30) time.sleep(60*30)