Files
VSTU_Schedule_Parser/main.py
2026-03-18 22:15:49 +03:00

254 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright Stanislav Mironov
# Project-wide convention: coordinates are given as ROW first, then COL; indices are zero-based.
import json
import os
import random
import time
import traceback
import uuid
import parser
import translations
import utils
import json
import links_parser
import shutil
def currt():
    """Return the current Unix timestamp rounded to a whole number of seconds."""
    now = time.time()
    return round(now)
# Faculty codes handed to links_parser.parse_links(); stored as a sorted list.
FACULTETS = sorted((
    "asp", "mag", "fastiv", "fat", "ftkm", "ftpp",
    "feu", "fevt", "htf", "vkf", "mmf", "fpik",
))
# Scratch directory for downloaded workbooks; run_session() deletes and recreates it.
DIRNAME = "excels"
# Directory that holds the per-workbook JSON state files.
PARSED_DIR = "parsed"
# Set to a single faculty code (e.g. 'fevt') to process only that faculty; None means all.
DEBUG_ONE_FAC = None
# Logging switch, mirrored onto the parser module so both share one value.
LOGGING = False
parser.LOGGING = LOGGING
def parse_sheets(download_place):
    """Parse every sheet of the workbook stored at *download_place*.

    A reader is created through ``translations.create_reader`` and walked
    sheet by sheet. For each sheet a ``"SHEET_<index>"`` entry is added to the
    result with the sheet index, name, reader info, the parsed groups, the
    rounded value of the timer's ``step()`` as ``parse_time`` and, when
    present, the parser's leftover raws, features, error and warnings.
    ``week_keys_metadata`` is copied only when the sheet produced at least one
    group.

    If anything raises, the exception and its traceback are printed, an
    ``"error"`` entry (message, log anchor UUID, timestamp) is stored in the
    result, and whatever was collected up to that point is still returned.
    """
    result = {}
    try:
        reader = translations.create_reader(download_place)
        print("Reader info")
        print(reader.info())
        while True:
            timer = utils.StepTimeCounter()
            print(f"Parsing sheet №{reader.get_sheet_index()+1} (from 1)")
            sheet = {
                "index": reader.get_sheet_index(),
                "name": reader.get_sheet_name(),
                "reader_info": reader.info(),
                "groups": {},
            }
            # Register the sheet before parsing so it is kept even if parsing fails.
            sheet_key = "SHEET_" + str(reader.get_sheet_index())
            result[sheet_key] = sheet

            sheet_parser = parser.Parser(reader)
            print("Parser created; parser.parse();")
            sheet_parser.parse()
            print("parsed done!")
            sheet['parse_time'] = round(timer.step())

            # Optional sections are emitted only when they carry data.
            if len(sheet_parser.raw_no_schedule) > 0:
                sheet["other_raws"] = sheet_parser.raw_no_schedule
            if len(sheet_parser.features) > 0:
                sheet["features"] = sorted(sheet_parser.features)
            if sheet_parser.parser_error is not None:
                sheet["parser_error"] = sheet_parser.parser_error
            warnings_found = sheet_parser.parser_warnings
            if warnings_found is not None and len(warnings_found) > 0:
                sheet["parser_warnings"] = warnings_found

            for group_name, group in sheet_parser.groups.items():
                sheet['week_keys_metadata'] = sheet_parser.week_keys_metadata
                sheet['groups'][group_name] = group
            print(f"Populates {len(sheet_parser.groups)} groups: " + " ".join(sheet_parser.groups.keys()))

            if reader.has_next_sheet():
                reader.next_sheet()
                print("Next sheet!")
                continue
            print("File ended")
            break
    except Exception as exc:
        print(exc)
        traceback.print_exc()
        anchor = uuid.uuid4()
        result['error'] = {
            "smile": ":(",
            "error_message": str(exc),
            "log_anchor": str(anchor),
            "time": currt(),
        }
        # The anchor is printed so the stored error can be matched to this log output.
        print(f"Log Anchor: {anchor}")
    return result
def parsed_file_path(excel_filename: str):
    """Return the path of the JSON state file that belongs to *excel_filename*.

    The final extension must be ``json``, ``xls`` or ``xlsx`` (compared
    case-insensitively). For ``xls``/``xlsx`` only that final extension is
    swapped for ``.json``; the resulting name is lower-cased and placed
    inside ``PARSED_DIR``.

    Returns:
        The state file path, or ``None`` (after printing a notice) when the
        extension is not one of the supported ones.
    """
    # rpartition isolates the LAST dot, so extension-like text earlier in the
    # name (e.g. "plan.xls_old.xls") is left untouched.
    stem, dot, extension = excel_filename.rpartition(".")
    extension_lower = extension.lower()
    if extension_lower not in ("json", "xls", "xlsx"):
        print(f"Unknown filename format: {excel_filename}")
        return None
    json_name = excel_filename
    if dot and extension_lower != "json":
        json_name = stem + ".json"
    return PARSED_DIR + os.path.sep + json_name.lower()
def load_parsed_state(excel_filename):
    """Load the previously saved JSON state for *excel_filename*.

    Returns:
        The decoded JSON object, or ``None`` when no state file exists or the
        file name has an extension that ``parsed_file_path`` does not support.

    Raises:
        json.JSONDecodeError: if the state file exists but is not valid JSON.
    """
    filepath = parsed_file_path(excel_filename)
    # parsed_file_path() yields None for unsupported extensions; passing that
    # to os.path.exists() would raise TypeError instead of meaning "no state".
    if filepath is None or not os.path.exists(filepath):
        return None
    with open(filepath, "r", encoding="utf-8") as fp:
        return json.load(fp)
def save_parsed_state(excel_filename, obj):
    """Serialize *obj* as JSON into the state file for *excel_filename*.

    The JSON is written to a temporary sibling file and then moved over the
    target with ``os.replace``, so an interrupted or failed dump never leaves
    a truncated state file behind. The target directory is created if it does
    not exist yet.

    Raises:
        TypeError: if no state path can be derived from *excel_filename*
            (unsupported extension), or if *obj* is not JSON-serializable.
        OSError: if the file cannot be written.
    """
    filepath = parsed_file_path(excel_filename)
    if filepath is None:
        raise TypeError(f"Cannot derive a state file path from {excel_filename!r}")

    parent_dir = os.path.dirname(filepath)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    tmp_path = filepath + ".tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as fp:
            json.dump(obj, fp=fp, ensure_ascii=False, sort_keys=True)
        # Swap the finished file into place in one step.
        os.replace(tmp_path, filepath)
    except BaseException:
        # Do not leave a half-written temporary file next to the real state.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
    print(f"Saved parsed state to '{filepath}'")
def _remove_download_dir():
    """Best-effort removal of the scratch download directory; failures are only printed."""
    try:
        shutil.rmtree(DIRNAME)
        print(f"Directory '{DIRNAME}' and its contents deleted successfully.")
    except Exception as e:
        print(f"Error deleting directory '{DIRNAME}': {e}")


def _reset_download_dir():
    """Delete any leftover scratch directory and create a fresh one; re-raises if creation fails."""
    _remove_download_dir()
    try:
        os.mkdir(DIRNAME)
        print(f"Directory '{DIRNAME}' created successfully.")
    except Exception:
        print(f"Failed create '{DIRNAME}': ")
        raise


def _is_unchanged(state, excel_dict, excel_filename):
    """Return True when the saved state records the same 'last_changed' value as the server entry."""
    try:
        saved = state['excel']['last_changed']
        same_date = saved == excel_dict['last_changed']
        print(f"Excel[{excel_filename}]: inServer={excel_dict['last_changed']}, inState={saved} same={same_date}")
        return same_date
    except (LookupError, TypeError):
        # A state file without the expected structure is treated as changed.
        print(f"Excel[{excel_filename}]: failed testify last_changed")
        return False


def _process_excel(excel_dict):
    """Refresh the saved state for one workbook entry, downloading and parsing it if it changed."""
    excel_url = excel_dict['url']
    facultet = excel_dict['facultet']
    excel_filename = excel_url.split("/")[-1]

    state_path = parsed_file_path(excel_filename)
    if state_path is None:
        # Fail with a readable message instead of an AttributeError on None.
        raise ValueError(f"Unsupported file format: {excel_filename}")
    excel_dict['json_represent'] = state_path.split(os.path.sep)[-1]

    state = load_parsed_state(excel_filename)
    if not isinstance(state, dict):
        # No saved state yet (or a state file that is not a JSON object).
        state = {}
    elif _is_unchanged(state, excel_dict, excel_filename):
        # Nothing new on the server: only refresh the timestamp and drop the
        # marker that a previous session may have left behind.
        state['actual_at'] = currt()
        state['excel'].pop('different_in_this_session', None)
        save_parsed_state(excel_filename, state)
        return

    excel_dict['different_in_this_session'] = True
    state['actual_at'] = currt()
    state['excel'] = excel_dict

    # Case-insensitive, to agree with parsed_file_path(), which accepts
    # ".XLSX" as well as ".xlsx".
    extension = ".xlsx" if excel_url.lower().endswith(".xlsx") else ".xls"
    download_place = f"{DIRNAME}/{excel_filename}_{facultet}{extension}"
    utils.download_file_from_url(excel_url, download_place)
    state['excel']['sha1hash'] = utils.calculate_sha1(download_place)
    state['sheets'] = parse_sheets(download_place)
    save_parsed_state(excel_filename, state)


def _last_changed_range(last_changeds):
    """Return the smallest and largest 'last_changed' values, or a placeholder if none can be ordered."""
    try:
        ordered = sorted(last_changeds)
        return {"early": ordered[0], "newly": ordered[-1]}
    except (IndexError, TypeError):
        # Empty set (no files) or values that cannot be compared to each other.
        return {"*_x": ":("}


def run_session():
    """Run one full pass: fetch the link list, refresh every workbook, write the summary.

    Steps:
      1. Recreate the scratch directory ``DIRNAME``.
      2. Ask ``links_parser.parse_links`` for the workbook entries of all
         faculties (or only ``DEBUG_ONE_FAC`` when it is set).
      3. For each entry, update its saved state; a failure on one entry is
         recorded in the summary and does not stop the others.
      4. Write ``parser.json`` with the range of 'last_changed' values, the
         current timestamp, all entries and the recorded failures.
      5. Remove the scratch directory, even if an earlier step raised.

    Raises:
        Exception: whatever escapes directory creation, link parsing or the
            writing of ``parser.json``.
    """
    faileds = []
    _reset_download_dir()
    try:
        print("main(); parse links starting...")
        excel_links = links_parser.parse_links(FACULTETS if DEBUG_ONE_FAC is None else [DEBUG_ONE_FAC])
        last_changeds = set()
        for excel_dict in excel_links:
            try:
                last_changeds.add(excel_dict['last_changed'])
                _process_excel(excel_dict)
            except Exception as e:
                # The entry itself may be what is malformed, so read the URL
                # defensively: the failure report must never raise.
                url = excel_dict.get('url', '<unknown>') if isinstance(excel_dict, dict) else '<unknown>'
                faileds.append({
                    "uuid": str(uuid.uuid4()),
                    "exception": str(e),
                    "traceback": traceback.format_exception(e),
                    "context": f"Failed process excel file {url}"
                })
                traceback.print_exception(e)

        summary = {
            "last_changeds": _last_changed_range(last_changeds),
            "actual_at": currt(),
            "all_files": excel_links,
            "faileds": faileds
        }
        with open("parser.json", 'w', encoding="utf-8") as fp:
            json.dump(summary, fp=fp, ensure_ascii=False)
    finally:
        # Delete the scratch directory and its contents.
        _remove_download_dir()
def main():
    """Run parsing sessions in an endless loop with a 30-minute pause between them.

    An exception escaping ``run_session()`` is printed together with its
    traceback; the loop then carries on with the usual pause.
    """
    pause_seconds = 60 * 30
    while True:
        try:
            print("BEGIN run_session();")
            run_session()
            print("END run_session();")
        except Exception as exc:
            print("Exception in run_session();")
            traceback.print_exception(exc)
        print("Sleep for 30 minutes")
        time.sleep(pause_seconds)
        print("Wake up!")
# Script entry point: runs only when the file is executed directly, not when imported.
if __name__ == "__main__":
    print("Start")
    main()
    # Reached only if main() returns or an exception not caught inside it ends the loop.
    print("Bye!")