diff --git a/.gitignore b/.gitignore index 99010fe..26105a5 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,7 @@ __pycache__ .idea result*.json groups.json +diffable_dates.txt +parsed/ +parser.json +.env \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..604a7d8 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,6 @@ +FROM python:3.11-slim +WORKDIR /app +COPY requirements.txt . +RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt +COPY . . +CMD ["python", "-u", "main.py"] \ No newline at end of file diff --git a/aigenerated.py b/aigenerated.py deleted file mode 100644 index 468509c..0000000 --- a/aigenerated.py +++ /dev/null @@ -1,201 +0,0 @@ -# Copyright GEMINI - - -import re - -# --- Ресурсы для алгоритма --- -STOP_WORDS = {'пр.', 'лек.', 'лаб.', 'семинар'} -TITLES = ('доц.', 'проф.', 'асс.', 'ст.пр.') -SURNAME_ENDINGS = ('ов', 'ев', 'ин', 'ский', 'цкой', 'их', 'ых', 'ова', 'ева', 'ина', 'ская', 'ян', 'ко', "ня", "ин") - -def is_surname_string(s: str) -> bool: - """ - Классифицирует строку, определяя, содержит ли она фамилию. - """ - if not isinstance(s, str) or not s: - return False - - # Шаг 1: Очистка - s_clean = s.strip() - - # Шаг 2: Жесткие правила "НЕ ФАМИЛИЯ" - if s_clean.lower() in STOP_WORDS: - return False - if s_clean.isupper() and len(s_clean) > 3: - return False - if re.search(r'[\(\)/«»%]', s_clean): - return False - - words = s_clean.split() - if len(words) > 3: - return False - - # Шаг 3: Жесткие правила "ТОЧНО ФАМИЛИЯ" - if re.search(r'\b[А-Я]\.?', s_clean): # Ищет "И.А." или "И.А" - return True - if s_clean.lower().startswith(TITLES): - return True - - # Шаг 4: Эвристический анализ для оставшихся случаев - score = 0 - - # Правило на капитализацию - if words and words[0][0].isupper(): - score += 5 - else: - # Если слово не с большой буквы, это почти точно не фамилия - return False - - # Правило на окончания - last_word = words[-1].lower() - if last_word.endswith(SURNAME_ENDINGS): - score += 6 - - # Правило на количество слов - if len(words) in [1, 2]: - score += 2 - - # Пороговое значение - THRESHOLD = 8 - return score >= THRESHOLD - - - -def extract_last_name(name_str: str) -> str or None: - """ - Извлекает из строки только фамилию. - - Справляется с приклеенными званиями (типа "ст.пр.Дмитриев") и отбрасывает инициалы. - Ищет первое слово, написанное с заглавной буквы. - - Args: - name_str: Исходная "грязная" строка с именем. - - Returns: - Чистая фамилия в виде строки, или None, если фамилия не найдена. - """ - # Проверка, что на вход подана строка - if not isinstance(name_str, str): - return None - - # Паттерн для поиска: - # [А-ЯЁ] - одна заглавная русская буква в начале - # [а-яё]+ - одна или более строчных русских букв после неё - # (?:-[А-ЯЁ][а-яё]+)? - опциональная часть для двойных фамилий (например, -Петров) - pattern = r'[А-ЯЁ][а-яё]+(?:-[А-ЯЁ][а-яё]+)?' - - match = re.search(pattern, name_str) - - if match: - return match.group(0) # Возвращаем найденное совпадение - else: - return None # Если ничего не найдено - - -# --- Шаг 0: Константы --- -POSITIVE_KEYWORDS = ['зал', 'ауд', 'каб', 'корп', 'кор.', "ЛК"] -NEGATIVE_KEYWORDS = ['доц', 'проф', 'асс', 'лек', 'пр'] - -# Транслитерация для унификации -TRANS_TABLE = str.maketrans('TCBAHIMK', 'ТКВАНІМК') # Латиница -> Кириллица - -def is_room_number(s: str) -> bool: - """ - Проверяет, является ли строка номером кабинета, по многоуровневому алгоритму. - """ - # --- Шаг 1: Быстрые негативные фильтры --- - if not isinstance(s, str) or not s.strip(): - return False # 1. Проверка на пустоту - - if ',' in s: - return False # 2. Проверка на запятые (даты) - - # 3. Проверка на "очевидный мусор" - first_word = s.strip().lower().split()[0] - if first_word in NEGATIVE_KEYWORDS: - return False - - # 4. Проверка на инициалы (напр. А.Е.) - if re.search(r'\b[А-Я]\.[А-Я]\.?\b', s): - return False - - # 5. Проверка на длинное слово без цифр - if not any(char.isdigit() for char in s) and len(s.split()) == 1 and len(s) > 10: - return False - - # --- Шаг 2: Быстрые позитивные фильтры --- - s_lower = s.lower() - if any(keyword in s_lower for keyword in POSITIVE_KEYWORDS): - return True # 1. Проверка по ключевым словам - - # --- Шаг 3: Основной анализ --- - if not any(char.isdigit() for char in s): - return False # 1. Требуется наличие цифр - - # 2. Создание "чистой" версии - clean_s = s.upper().translate(TRANS_TABLE).replace(' ', '') - - # 3. Комплексный паттерн для проверки - # Пояснение: - # ^...$ - шаблон должен соответствовать всей строке - # [А-ЯЁ]?-? - необязательная буква и необязательный дефис в начале - # \d+ - одна или более цифр (ядро номера) - # (?:[.-]\d+)* - необязательные группы ".число" или "-число" - # [А-ЯЁ]?$ - необязательная буква в конце - room_pattern = re.compile(r'^[А-ЯЁ]?-?\d+(?:[.-]\d+)*[А-ЯЁ]?$') - - if room_pattern.fullmatch(clean_s): - return True - - # --- Шаг 4: Финальное решение --- - return False - - -import requests -from urllib.parse import urlsplit, urlunsplit, quote - -def download_file_from_url(url, output_filename): - """ - Скачивает файл по URL со спецсимволами и пробелами, сохраняя его под указанным именем. - - Args: - url (str): Исходный URL, который может содержать пробелы и кириллицу. - output_filename (str): Имя файла для сохранения (например, 'calc.xls'). - """ - try: - # --- Шаг 1: Правильное кодирование URL --- - # Разбираем URL на части: ('https', 'www.vstu.ru', '/path/to file.xls', '', '') - parts = urlsplit(url) - - # Кодируем только путь, оставляя слэши '/' безопасными - # Это превратит ' ' в '%20', 'В' в '%D0%92' и т.д. - encoded_path = quote(parts.path, safe='/-_') - - # Собираем URL обратно из частей с уже закодированным путем - encoded_url = urlunsplit((parts.scheme, parts.netloc, encoded_path, parts.query, parts.fragment)) - - - # --- Шаг 2: Скачивание файла --- - response = requests.get(encoded_url, stream=True) - - # Проверяем, успешен ли запрос (код 200 OK) - # Если сервер вернет ошибку (404, 500 и т.д.), здесь возникнет исключение - response.raise_for_status() - - # --- Шаг 3: Сохранение файла --- - # Открываем файл для записи в бинарном режиме ('wb') - # Использование 'with' гарантирует, что файл будет закрыт автоматически - with open(output_filename, 'wb') as f: - for chunk in response.iter_content(chunk_size=8192): - f.write(chunk) - - print(f"✅ Файл успешно скачан и сохранен как '{output_filename}'") - - except requests.exceptions.RequestException as e: - print(f"❌ Ошибка скачивания: {e}") - - except Exception as e: - print(f"❌ Произошла непредвиденная ошибка: {e}") - - - diff --git a/links_parser.py b/links_parser.py index 3553032..212ba30 100644 --- a/links_parser.py +++ b/links_parser.py @@ -10,6 +10,11 @@ from bs4 import BeautifulSoup BASE_URL = "https://www.vstu.ru/" RASP_PREFIX = "https://www.vstu.ru/student/raspisaniya/zanyatiy/index.php?dep=" +def sibling_clear_to_date(s: str): + if s is None: + return "!!!Python None!!!" + return s.lower().replace("(последнее изменение:", "").replace(")", "").strip() + # Парсит ссылки на эксель .xls & .xlsx файлы и выдаёт их def parse_links(facultets): session = requests.Session() @@ -27,7 +32,7 @@ def parse_links(facultets): } ) - EXCEL_LINKS = {} + EXCEL_LINKS = [] for facultet in facultets: url = RASP_PREFIX + facultet print("getting...") @@ -38,19 +43,19 @@ def parse_links(facultets): # Ищем все теги , у которых атрибут href соответствует нашему паттерну excel_tags = soup.find_all('a', href=excel_pattern) - excel_links = [tag.get('href') for tag in excel_tags] + for a in excel_tags: + last_changed = sibling_clear_to_date(a.next_sibling) + url = urljoin(BASE_URL, a.get('href')) + disp = a.decode_contents() + record = { + "uniqpath": f"vstu.ru/rasp?dep={facultet}/{disp.strip()}", + "facultet": facultet, + "url": url, + "display_filename": disp, + "last_changed": last_changed + } + print("Found in vstu.ru: ", record) + EXCEL_LINKS.append(record) - # Предположим, вы уже получили excel_links из одного из методов выше - # excel_links = ['../../../upload/raspisanie/z/ОН_ХТФ_1 курс.xlsx', ...] - - absolute_links = [urljoin(BASE_URL, relative_link) for relative_link in excel_links] - - if facultet not in EXCEL_LINKS.keys(): - EXCEL_LINKS[facultet] = set() - - for excel_url in absolute_links: - EXCEL_LINKS[facultet].add(excel_url) - print(f"+url {excel_url}") - - return EXCEL_LINKS + return sorted(EXCEL_LINKS, key=lambda x: x['url']) diff --git a/main.py b/main.py index f78a644..d4341d0 100644 --- a/main.py +++ b/main.py @@ -4,133 +4,93 @@ import json +import pika import os +import random import time import traceback import uuid -import aigenerated import parser import translations import utils import json import links_parser import shutil +from dotenv import load_dotenv +load_dotenv() + +RABBITMQ_URL = os.environ.get("RABBITMQ_URL") +EXCHANGE_NAME = os.environ.get("RABBITMQ_EXCHANGE", "vstu_schedule") + +try: + connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL)) + channel = connection.channel() + + channel.exchange_declare(exchange=EXCHANGE_NAME, + exchange_type='topic', + durable=True) +except Exception as e: + print("Failed to connect RabbitMQ") + traceback.print_exception(e) def currt(): return round(time.time()) -FACULTETS = [ +FACULTETS = sorted([ "asp", "mag", "fastiv", "fat", "ftkm", "ftpp", "feu", "fevt", "htf", "vkf", "mmf", "fpik" -] +]) DIRNAME = "excels" +PARSED_DIR = "parsed" -DEBUG_ONE_FAC = None #'htf' -result_groups = {} -result = { - "version": 1, - "notice": "ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ: Данные, доступ к API и т.д. предоставляется КАК-ЕСТЬ (AS-IS) без каких либо, явно или не явно подразумеваемых гарантий.\n\nПарсер написал: Миронов Станислав\n\nИсточник данных: https://www.vstu.ru/student/raspisaniya/zanyatiy/index.php", - "actual_at": round(time.time()), - "documentation": "https://fazziclay.com/api/v1/vstu_schedule_parser/scheme.json", - "daypicture": "QwQ", - "daycite": "running on a rope", - "contact": "https://fazziclay.com/", - "university": "VSTU", - "university_site": "https://www.vstu.ru/", - "source": "https://fazziclay.com/api/v1/vstu_schedule_parser/result.json", - "stat": { - "total_parsing_time": -1, - }, - "api_notices": { - "updated_at": 1757688552, - "text": "Пожалуйста сохраняйте 'updated_at', это время изменения ЭТОГО текста. Тут возможно будут появлятся важные BREAKING CHANGES и дедлайны к ним.\nПо хорошему если updated_at другой по сравнению с вашем кэшем это сообщение должно отправляться вам в телеграм как уведомление о поедстоящих изменениях\nwarning=True значит 'text' содержит важное а не как щас hint.\n\n ~fazziclay aka Stanislav;", - "warning": False, - "tut-plavayuschaya-struktura": "required only 'updated_at', 'text' and 'warning'" - }, - "doubled_groups": [], - "debug": { - "bleu~~": 1 - }, - "excels": [], - "facultets": FACULTETS, +DEBUG_ONE_FAC = None #'fevt' +DEBUG_NO_SAVE_STATES = False - "emptykey1": "", - "emptykey2": "", - - "groups": result_groups, - - "emptykey3": "", - "emptykey4": "", - "see_header_at_top_of_this_file": "SEE TOP OF THIS FILE | ОБРАТИТЕ ВНИМАНИЕ НА ВЕРХ ЭТОГО ФАЙЛА" -} - -def process_excel_file(facultet, excel_url, counter, timeid): - is_xlsx = excel_url.endswith(".xlsx") - filename = f"{DIRNAME}/" + timeid + f"_[C{counter}]_" + facultet + ".xls" + ("x" if is_xlsx else "") - - excel_info = { - "filename": excel_url.split("/")[-1], - "url": excel_url, - "download_place": filename, - "stat": { - "download": -1, - "create_reader": -1, - "parse": -1, - "cycles": 0 - }, - "group_names_parsed": [], - "facultet": facultet, - "counter": counter - } - parser.LOGGING = False +parser.LOGGING = LOGGING = True +def parse_sheets(download_place): + to_return = {} try: - t = utils.StepTimeCounter() - aigenerated.download_file_from_url(excel_url, filename) - excel_info["stat"]['download'] = t.step() - - reader = translations.create_reader(filename) + reader = translations.create_reader(download_place) print("Reader info") print(reader.info()) - excel_info["stat"]['create_reader'] = t.step() - + while True: - excel_info['stat']['cycles'] += 1 + t = utils.StepTimeCounter() print(f"Parsing sheet №{reader.get_sheet_index()+1} (from 1)") + sheet_dict = { + "index": reader.get_sheet_index(), + "name": reader.get_sheet_name(), + "reader_info": reader.info(), + "groups": {} + } + to_return["SHEET_"+str(reader.get_sheet_index())] = sheet_dict prs = parser.Parser(reader) print("Parser created; parser.parse();") prs.parse() print("parsed done!") + sheet_dict['parse_time'] = round(t.step()) + + if len(prs.raw_no_schedule) > 0: + sheet_dict["other_raws"] = prs.raw_no_schedule + + if len(prs.features) > 0: + sheet_dict["features"] = sorted(prs.features) + if prs.parser_error is not None: - excel_info["parser_error_cycle_" + str(excel_info['stat']['cycles'])] = prs.parser_error + sheet_dict["parser_error"] = prs.parser_error + + if prs.parser_warnings is not None and len(prs.parser_warnings) > 0: + sheet_dict["parser_warnings"] = prs.parser_warnings - for group_name in prs.groups.keys(): - if group_name in result_groups.keys(): - print(f" -- WTF -- Doubled groups -- name: {group_name}") - if 'warning_doubled_groups_skip' not in excel_info.keys(): - excel_info['warning_doubled_groups_skip'] = [] - - excel_info['warning_doubled_groups_skip'].append(group_name) - result['doubled_groups'].append(group_name) + for group_name_key in prs.groups.keys(): + gr = prs.groups[group_name_key] + sheet_dict['week_keys_metadata'] = prs.week_keys_metadata + sheet_dict['groups'][group_name_key] = gr + - - continue - - gr = result_groups[group_name] = prs.groups[group_name] - gr['facultet'] = facultet - gr['data_source'] = excel_url.split("/")[-1] - gr['debug'] = { - "counter": counter, - "timeid": timeid, - "excel_url": excel_url, - "reader_info": reader.info(), - "reader_sheet_index": reader.get_sheet_index(), - "filename": filename - } - excel_info["group_names_parsed"].append(group_name) - - print(f"Populates {len(prs.groups)} groups to result: " + " ".join(prs.groups.keys())) + print(f"Populates {len(prs.groups)} groups: " + " ".join(prs.groups.keys())) if not reader.has_next_sheet(): print("File ended") @@ -138,33 +98,60 @@ def process_excel_file(facultet, excel_url, counter, timeid): else: reader.next_sheet() print("Next sheet!") - - excel_info["stat"]['parse'] = t.step() - except Exception as e: - print(f"Error while {excel_url}") print(e) traceback.print_exc() u = uuid.uuid4() - excel_info['error'] = { + to_return['error'] = { "smile": ":(", "error_message": str(e), "log_anchor": str(u), "time": currt() } print(f"Log Anchor: {u}") - faileds.append({ - "ex": e, - "fac": facultet, - "url": excel_url - }) - - result['excels'].append(excel_info) + + return to_return -faileds = [] -def main(): +def parsed_file_path(excel_filename: str): + format = excel_filename.split(".")[-1] + fl = format.lower() + + if fl not in ["json", "xls", "xlsx"]: + print(f"Unknown filename format: {excel_filename}") + return + + if fl != "json": + excel_filename = excel_filename.replace("." + format, ".json") + + excel_filename = excel_filename.lower() + filepath = PARSED_DIR + os.path.sep + excel_filename + return filepath + +def load_parsed_state(excel_filename): + filepath = parsed_file_path(excel_filename) + if not os.path.exists(filepath): + return + + with open(filepath, "r", encoding="utf-8") as fp: + return json.load(fp=fp) + +def save_parsed_state(excel_filename, obj): + filepath = parsed_file_path(excel_filename) + if DEBUG_NO_SAVE_STATES: + print("Saved! (fake because DEBUG_NO_SAVE_STATES)") + + with open(filepath, "w", encoding="utf-8") as fp: + json.dump(obj, fp=fp, ensure_ascii=False, sort_keys=True) + + print(f"Saved parsed state to '{filepath}'") + +def run_session(): + faileds = [] + t = utils.StepTimeCounter() + + # Delete tempdir try: try: shutil.rmtree(DIRNAME) @@ -177,42 +164,206 @@ def main(): print(f"Failed create '{DIRNAME}': ") raise e + print("main(); parse links starting...") EXCEL_LINKS = links_parser.parse_links(FACULTETS if DEBUG_ONE_FAC is None else [DEBUG_ONE_FAC]) - counter = 0 - timeid = str(round(time.time())) - for facultet in EXCEL_LINKS.keys(): - counter += 1000 - print(f"\n\n-- Факультет '{facultet}' --") - facultet_urls = EXCEL_LINKS[facultet] - for excel_url in facultet_urls: - counter += 1 - print(f"\n\n-- Ссылка --") - print(f"{excel_url}") + + if len(EXCEL_LINKS) < 5 and not DEBUG_ONE_FAC: + raise Exception("Safety exception: excel links count < 5; maybe in vstu.ru tech works") + + + last_changeds = set() + states = [] + changed = False + for excel_dict in EXCEL_LINKS: + try: + last_changeds.add(excel_dict['last_changed']) + + excel_url = excel_dict['url'] + facultet = excel_dict['facultet'] + excel_filename = excel_url.split("/")[-1] + excel_dict['json_represent'] = parsed_file_path(excel_filename).split(os.path.sep)[-1] + print(f"Processing {facultet} {excel_filename}") + + state = load_parsed_state(excel_filename) + is_new = state is None + if is_new: + state = {} - print("Start processing excel file") - process_excel_file(facultet, excel_url, counter, timeid) - print("Excel file processing done!") + else: + same_date = False + try: + same_date = state['excel']['last_changed'] == excel_dict['last_changed'] + print(f"Excel[{excel_filename}]: inServer={excel_dict['last_changed']}, inState={state['excel']['last_changed']} same={same_date}") + + except Exception as e: + print(f"Excel[{excel_filename}]: failed testify last_changed") + + r = "parser.excel_found." + ("same" if same_date else "different") + "." + facultet + channel.basic_publish( + exchange=EXCHANGE_NAME, + routing_key=r, + properties=pika.BasicProperties( + content_type="application/json", + delivery_mode=2 + ), + body=json.dumps({ + "type": "excel_file_found", + "same": same_date, + "excel_dict": excel_dict + }, ensure_ascii=False).encode('utf-8') + ) + print(f"RabbitMQ published r={r}") + + if same_date: + state['actual_at'] = currt() + try: + del state['excel']['different_in_this_session'] + except: pass + states.append(state) + save_parsed_state(excel_filename, state) + continue + + changed = True + excel_dict['different_in_this_session'] = True + state['actual_at'] = currt() + state['excel'] = excel_dict + + is_xlsx = excel_url.endswith(".xlsx") + download_place = f"{DIRNAME}/" + excel_filename + "_" + facultet + ".xls" + ("x" if is_xlsx else "") + utils.download_file_from_url(excel_url, download_place) + sha1hash = utils.calculate_sha1(download_place) + state['excel']['sha1hash'] = sha1hash + + state['sheets'] = parse_sheets(download_place) + + channel.basic_publish( + exchange=EXCHANGE_NAME, + routing_key="parser.excel_parsed." + facultet, + properties=pika.BasicProperties( + content_type="application/json", + delivery_mode=2 + ), + body=json.dumps({ + "type": "excel_file_parsed", + "is_new": is_new, + "state": state + }, ensure_ascii=False).encode('utf-8') + ) + + save_parsed_state(excel_filename, state) + states.append(state) + + except Exception as e: + faileds.append({ + "uuid": str(uuid.uuid4()), + "exception": str(e), + "traceback": traceback.format_exception(e), + "context": f"Failed process excel file {excel_dict['url']}" + }) + traceback.print_exception(e) - print("Saving result.json") - result['stat']['total_parsing_time'] = t.step() + with open("parser.json", 'w', encoding="utf-8") as fp: + lc = {"*_x": ":("} + try: + s = sorted(last_changeds) + lc = { + "early": s[0], + "newly": s[-1] + } + except: pass + + json.dump({ + "last_changeds": lc, + "actual_at": currt(), + "all_files": EXCEL_LINKS, + "faileds": faileds + }, fp=fp, ensure_ascii=False) + + if changed: + all_files = states + d = { + "version": 2, + "notice": "ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ: ПРЕДОСТАВЛЯЕТСЯ КАК-ЕСТЬ (AS-IS) БЕЗ КАКИХ ЛИБО ГАРАНТИЙ", + "contact": "https://fazziclay.com/ или fazziclay@gmail.com", + "api_notices": { + "just_save_and_check_diffs": "просто сохраните и проверяйте разницу" + }, + "actual_at": currt(), + "all_files": sorted(all_files, key=lambda d: d['excel']['url']), + "faileds": faileds + } + with open("result_v2.json", 'w', encoding="utf-8") as fp: + json.dump(d, fp=fp, ensure_ascii=False) + + channel.basic_publish( + exchange=EXCHANGE_NAME, + routing_key="parser.result_v2", + properties=pika.BasicProperties( + content_type="application/json", + delivery_mode=2 + ), + body=json.dumps({ + "type": "schedule_result_v2", + "data": d + }, ensure_ascii=False).encode('utf-8') + ) - json.dump(result, open('result.json', 'w'), indent=2, ensure_ascii=False) - print("Saved to result.json indent=2") - - json.dump(result, open('result-no-indent.json', 'w'), ensure_ascii=False) - print("Saved to result-no-indent.json") - - print("Faileds:") - print(faileds) - - # Delete a non-empty directory and its contents + # Delete a non-empty directory and its contents try: shutil.rmtree(DIRNAME) print(f"Directory '{DIRNAME}' and its contents deleted successfully.") except Exception as e: print(f"Error deleting directory '{DIRNAME}': {e}") + + return {"changed": changed} + +def check_dirs(): + + if not os.path.exists(PARSED_DIR): + os.mkdir(PARSED_DIR) + +def main(): + while True: + t = utils.StepTimeCounter() + err = None + sess = None + try: + check_dirs() + + print("BEGIN run_session();") + sess = run_session() + print("END run_session();") + + if DEBUG_ONE_FAC: + print("DEBUG_ONE_FAC; break infinity-loop") + break + + except Exception as e: + err = e + print("Exception in run_session();") + traceback.print_exception(e) + + channel.basic_publish( + exchange=EXCHANGE_NAME, + routing_key="parser.session_end." + ('complete' if err is None else 'failed'), + properties=pika.BasicProperties( + content_type="application/json", + delivery_mode=2 + ), + body=json.dumps({ + "type": "session_end", + "err": str(err) if err else None, + "duration": t.step(), + "session": sess + }, ensure_ascii=False).encode('utf-8') + ) + + print("Sleep for 30 minutes") + time.sleep(60*30) + print("Wake up!") + if __name__ == "__main__": diff --git a/parser.py b/parser.py index 4a3f5c8..220c450 100644 --- a/parser.py +++ b/parser.py @@ -3,223 +3,393 @@ PAIR_NUMS = [ "1-2", "3-4", "5-6", "7-8", "9-10", "11-12", "13-14", "15-16" ] +WEEKDAYS_STARTSWITH = [ + "понед", + "вторник", + "среда", + "четверг", + "пятница", + "суббота", + "воскр" +] +BAD_GROUP_NAMES = [ + "янв", "февр", "март", "апр", "май", "сент", "окт", "ноя", "дек", "июнь", "июль", "авг" +] + +from datetime import time import json -import uuid -import aigenerated from coord import Coord, Merged from translations import ExcelSheetReader import utils +from collections import defaultdict -LOGGING = True +LOGGING = False def pprint(*args, **kwargs): if LOGGING: print(*args, **kwargs) + +def is_weeknum(text): + for wd in WEEKDAYS_STARTSWITH: + if text.strip().replace(" ", "").lower().startswith(wd): + return True + return False + +def is_pair(text): + for p in PAIR_NUMS: + if text.strip().replace(" ", "").lower().startswith(p): + return True + return False class Parser: def __init__(self, reader: ExcelSheetReader): self.reader = reader - self.groups = {} - self.teachers = set() - self.places = set() - self.parser_error = None + self.groups = {} # Группы которые удалось распарсить + self.features = set() # фичи данной страницы + self.week_keys_metadata = {} # календарик + self.schedule_range_row = None # [min, max] диапазон col включительно где расписание + self.raw_no_schedule = [] # всё что не schedule_range_row отправляется сюда ('СОГЛАСОВАНО:', etc..) + + self.weeknums: defaultdict = defaultdict(set) # no support json! (для week_keys_metadata) + self.parser_error = None # ошибка парсера перед выходом + self.parser_warnings = [] # предупреждения парсера pprint("Parser created for '{0}'".format(reader.info())) def parse(self): - monday = self.reader.find("ПОНЕДЕЛЬНИК") - if monday is None: + # Характерные признаки разных сеток + no_pair_numeration = False + col_distance_pair_weekday = None + weekday_firstly_calendar = False + + first_weekday = self.reader.find_any(WEEKDAYS_STARTSWITH, startswith=True, nospace=True) + + if first_weekday is None: + self.features.add("no_weekdays") print(" -- Failed parse! -- ") - print("ПОНЕДЕЛЬНИК НЕ НАЙДЕН!") - self.parser_error = "'ПОНЕДЕЛЬНИК' не найден в таблице." + print("дни недели не найдены!") + self.parser_error = f"{WEEKDAYS_STARTSWITH} ни один найден в таблице. Дня недели нет." + self.parse_raw_no_schedule() return - head_rx = monday.row - 1 # выше первого понидельника + pair_num_any = self.reader.find_any(PAIR_NUMS, nospace=True) + if pair_num_any is None: + no_pair_numeration = True + self.features.add("no_pair_numeration") + self.parser_warnings.append(f"Нет нумерации академических часов {PAIR_NUMS}") + + else: + self.features.add("pair_numeration") + col_distance_pair_weekday = pair_num_any.col - first_weekday.col + + head_rx = first_weekday.row - 1 # выше первого понидельника + group_col_start = first_weekday.col + 2 + if col_distance_pair_weekday is not None: + if col_distance_pair_weekday > 1: + weekday_firstly_calendar = True + self.features.add("weekdays_before_calendar") + group_col_start = pair_num_any.col + 1 + if head_rx < 0: - raise Exception("head_rx < 0: Программа пыталась найти 'ПОНЕДЕЛЬНИК', но по всей видимости не нашла.") + raise Exception("head_rx < 0: Программа пыталась найти день недели, но по всей видимости не нашла.") head = self.reader.get_row_values(head_rx) # get all ROW (months, groups) pprint(f"head={head}") - self.groups = parse_groups(self.reader, head, monday, head_rx) # parse groups to self.groups + + head_joined = " ||| ".join([v for v in head if isinstance(v, str) and v.strip()]) + print(head_joined) + if (len(head_joined) == 0) or "1 неделя" in head_joined or "1 НЕДЕЛЯ" in head_joined or "2 неделя" in head_joined or "2 НЕДЕЛЯ" in head_joined or "ИЗМЕНЕНИЯ" in head_joined or "изменения" in head_joined or "vtf-vstu.ru" in head_joined: + head_rx -= 1 + self.raw_no_schedule.append(head_joined) + head = self.reader.get_row_values(head_rx) # get all ROW (months, groups) + pprint(f"head (upper)={head}") + self.features.add("post_groups_info_row") + + self.groups = parse_groups(self.reader, head, group_col_start, head_rx) # parse groups to self.groups pprint(f'self.groups={json.dumps(self.groups, indent=2, ensure_ascii=False)}') - + pprint("\n\n\n") for group in self.groups.values(): pprint("\nSTART OF PROCESS GROUP\n") - self.process_group(group, monday) + self.process_group(group, first_weekday, pair_num_any.col if pair_num_any else None) pprint("\nEND OF PROCESS GROUP\n") - pprint(self.teachers) + # week metadatas parse + S = 9999999 + group_min_col = S + group_min_row = S + + for x in self.groups.values(): + p = x['position'] + group_min_row = min(p[0], group_min_row) + group_min_col = min(p[1], group_min_col) + + if group_min_row != S and group_min_col != S: + pprint("Process weekmetadatas!") + self.process_weekmetadatas(Coord(row=group_min_row, col=group_min_col)) + + # parse no-schedule raws (согласовано, и т.д.) + self.parse_raw_no_schedule() - def parse_potokoviy(self, merged: Merged): - speaker = None - location = None + + def parse_raw_no_schedule(self): + """Распарсить всё за пределами self.schedule_range_row в self.raw_no_schedule""" + if self.schedule_range_row is None: + self.schedule_range_row = [999999999, 999999999] # прекрасное далёко + + row = 0 + while row < self.reader.get_row_count(): + if row >= self.schedule_range_row[0] and row <= self.schedule_range_row[1]: + row = self.schedule_range_row[1] + 1 + + row_values = self.reader.get_row_values(row) + row_values = [v for v in row_values if isinstance(v, str) and v.strip()] + if len(row_values) > 0: + self.raw_no_schedule.append(row_values) + + row += 1 - # speaker - low = merged.low - speaker_pos = low.shift(down=merged.height()) - speaker = speaker_pos.cell(self.reader).value + def process_weekmetadatas(self, first_group: "Coord"): + """Обработать календарик""" + for x in self.weeknums.keys(): + pprint(x) + set_of_merged: set = self.weeknums[x] + l = len(set_of_merged) + if l != 1: + self.week_keys_metadata[x] = { + "error": True, + "error_text": f"Parse error: count of found '{x}' (need view like WEEKDAY_1; weekday - in r; 1 - weeknum[1, 2]) is {l}; required only one!" + } + self.parser_warnings.append(f"Processing weekmetadata for '{x}' failed because count of uniqie merged cells not one (actual: {l}). :<") + continue + + weekday_merged: Merged = set_of_merged.pop() + if weekday_merged.width() != 1: + self.week_keys_metadata[x] = { + "error": True, + "error_text": f"Weekday excel block width != 1 (actual {weekday_merged.width()})" + } + self.parser_warnings.append(f"Processing weekmetadata for '{x}' failed because weekday excel block width != 1 (actual {weekday_merged.width()})") + continue + + month_row = first_group.row + curr_col = first_group.col - 1 + while curr_col >= 0: + month_pos = Coord(month_row, curr_col) + month_cell = month_pos.cell(self.reader) + if month_cell.is_empty(): + pprint("month cell is empty") + curr_col -= 1 + continue + + month_name = str(month_cell.value).strip() + pprint(month_cell) + all_nums_of_month = utils.parse_all_dirt(self.reader, month_pos.replace(row=weekday_merged.low.row), right=1, down=weekday_merged.height()) + pprint(f"all_nums_of_month={all_nums_of_month}") + if (x not in self.week_keys_metadata.keys()): + self.week_keys_metadata[x] = {} + + if (month_name not in self.week_keys_metadata[x].keys()): + self.week_keys_metadata[x][month_name] = [] + + for x2 in all_nums_of_month: + if x2.lower() == month_name.lower(): + pprint(f"Skip {x2} month number because it == month_name") + continue + + m = self.week_keys_metadata[x][month_name] + if x2 not in m: + try: + m.append(str(x2).replace(".0", "")) + except: + m.append(x2) + + curr_col -= 1 + - # location - location = merged.high.shift(down=1).cell(self.reader).value + def push_weekday_meta(self, weekday: str, weeknum: int, week_key_name: str, merged: "Merged"): + self.weeknums[week_key_name].add(merged) + + def row_with_schedule_notify(self, row_coord): + """Вызывается каждый раз когда в переданной row обранужено расписание""" + if self.schedule_range_row is None: + self.schedule_range_row = [row_coord, row_coord] + + if self.schedule_range_row[1] < row_coord: + self.schedule_range_row[1] = row_coord + + if self.schedule_range_row[0] > row_coord: + self.schedule_range_row[0] = row_coord - return {"loc": str(location).strip(), "leader": str(speaker).strip(), "name": str(merged.cell(self.reader).value).strip()} - - def process_group(self, group, monday): + def process_group(self, group: dict, first_weekday: Coord, pair_pos_col): """ Обработать группы, выполняется для каждой группы, после того как они распарены (parse_groups) group = {'name': 'ИВТ-260', 'position': [5, 6], 'position_human': 'G6:J6'} """ pprint(f"process_group group={group}") group_name = group['name'] - pprint(group_name) - row = group['position'][0] + 1 # counter for while, +1 for shift down; также номер строки в таблице (вроде с нуля) + pprint(F"Имя группы: {group_name}") + row_c1 = group['position'][0] + 1 # counter for while, +1 for shift down; также номер строки в таблице (вроде с нуля) + self.row_with_schedule_notify(group['position'][0]) + group_header_pos = Coord(group['position'][0], group['position'][1]) + width = group['width'] weeknum = 1 # номер недели, щёлкнет +1 при каком-то условии. - previous_pair = None - while row < self.reader.get_row_count(): # maybe условие чтобы не уйти ниже чем есть строк - pos = Coord(row, group['position'][1]) # текущая позиция, верхний левый угол (=low) - pprint(f"while pos={pos}") - pos_right = pos.shift(right=3) - pair_pos = pos.replace(col=5) - weekday_pos = pos.replace(col=4) - merged = self.reader.get_merged_coord(pos) - merged_cell = merged.cell(self.reader) - cv = merged_cell.value - # В конце (12 пара:>) название группы, можно использовать как якорь - if utils.unspace(cv) == group_name: - pprint("Lesson == group name; ending group loop.") - break - - weekday_mr = self.reader.get_merged_coord(weekday_pos) - weekday = utils.unspace(weekday_mr.cell(self.reader).value) - pair_mr = self.reader.get_merged_coord(pair_pos) - pair = utils.unspace(pair_mr.cell(self.reader).value) - - skip = 0 - if weekday == "": - if weeknum == 1: - weeknum += 1 - pprint("------") - skip = 1 - row += 1 - else: - break + + weekcycles = 0 + while row_c1 < self.reader.get_row_count(): + pos_c1 = Coord(row_c1, group['position'][1]) # текущая позиция, верхний левый угол (=low) + self.row_with_schedule_notify(pos_c1.row) - if not skip: - next = 3 # на сколько пыгнуть для следующего шага? + if pos_c1.cell(self.reader).is_nospace_nocase_same(group_name): + pprint("Ended with grpup name; stop moving down, break") + break + + weekday_pos = pos_c1.replace(col=first_weekday.col) + weekday_cell = weekday_pos.cell(self.reader) + weekday_mr = self.reader.get_merged_coord(weekday_pos) + weekday = weekday_cell.value + + if not is_weeknum(weekday): + row_c1 += 1 + pprint("Not weeknum!") + if weekcycles > 0: + if (weeknum != 2): + pprint("Weeknum now 2") + weekday = 0 + weeknum = 2 + continue + + pprint(weekday) + weekday_key_name = weekday + ("_1" if weeknum == 1 else "_2") + self.push_weekday_meta(weekday, weeknum, weekday_key_name, weekday_mr) + + # state + event_no = 1 + is_widely = False + override_col_range = None + all_raw = set() + pairs = set() + times = [] + first_coord = None + + row_c2 = row_c1 + while row_c2 <= weekday_mr.high.row: + pos_c2 = Coord(row_c2, group['position'][1]) # текущая позиция (внутри группы, внутри дня недели), верхний левый угол (=low) + cell_c2 = pos_c2.cell(self.reader) + mr_c2 = self.reader.get_merged_coord(pos_c2) - is_empty_lesson = len(utils.parse_all_dirt(self.reader, pos, 4, 3)) == 0 # если в поле не найдено ничего.. - parsed_discipline_name = None - parsed_location = None - parsed_leader = None - pairs = 1 - wtf_tomanypairs = False - is_solid = pos_right in merged - parsed_uncotigorized = [] - is_wide_maybe_potokoviy = merged.width() > 4 # потоковая ли лекция (занимает несколько групп.) - - if not is_empty_lesson: - cur = pos.shift(down=2) - while utils.has_no_bottom_border(self.reader, cur): - next += 3 - pairs += 1 - pprint(f"next = {next} cur={cur}") - if pairs >= 7: - wtf_tomanypairs = True - break - cur = cur.shift(down=3) - - if is_wide_maybe_potokoviy: - ret = self.parse_potokoviy(merged) - parsed_location = ret['loc'] - parsed_leader = ret['leader'] - parsed_discipline_name = ret['name'] - parsed_uncotigorized = list(utils.parse_all_dirt(self.reader, merged.low, merged.width(), next)) - - else: - if (is_solid): - parsed_discipline_name = cv - - parsed_uncotigorized = list(utils.parse_all_dirt(self.reader, merged.low, 4, next)) - - # попытка исправить пару (1-2) если пустая. - fuck_empty_pair_in_excel = pair == "" - previous_dump = previous_pair - if fuck_empty_pair_in_excel: - if previous_pair is None or previous_pair == "": - pair = f"EMPTY_IN_EXCEL_{uuid.uuid4()}" - else: - pair = utils.next_element(PAIR_NUMS, previous_pair) + if first_coord is None: + first_coord = pos_c2.row - if pair != "": - previous_pair = pair if next == 3 else None # костыль чтобы избежать гипотетически не верной даты. + pair_num = None + pair_num_mr = None + if pair_pos_col is not None: + pair_num = pos_c2.replace(col=pair_pos_col) + pair_num_mr = self.reader.get_merged_coord(pair_num) + + if (not is_widely) and (mr_c2.low.col < group_header_pos.col or mr_c2.high.col > group_header_pos.col + width - 1): + is_widely = True + override_col_range = (mr_c2.low.col, mr_c2.high.col) + + col_low = group_header_pos.col + col_high = group_header_pos.col + width - 1 + if override_col_range is not None: + col_low = min(col_low, override_col_range[0]) + col_high = max(col_high, override_col_range[1]) - # пытаемся из некотегорезированных данных выцепить место и лидера (препода) - prepods = set() - if parsed_leader is not None: prepods.add(parsed_leader.strip()) - - locations = set() - if parsed_location is not None: locations.add(parsed_location.strip().replace(" ", "")) - - for x in list(parsed_uncotigorized): - if aigenerated.is_surname_string(x): - prepods.add(x.strip()) - - if aigenerated.is_room_number(x): - locations.add(x.strip().replace(" ", "") if x is not None else None) - - # попытка починить пустую дисциплину - if parsed_discipline_name is None: - l = sorted(utils.remove_from_list(list(parsed_uncotigorized), list(locations | prepods | set([parsed_location, parsed_leader])))) - parsed_discipline_name = " ".join(l) - - # чистим сеты от мусора - utils.discards_list(prepods, nones=True, emptystrings=True) - utils.discards_list(locations, nones=True, emptystrings=True) - utils.discards_list(parsed_uncotigorized, nones=True, emptystrings=True) - - # если не пустой предмет то записываем его - if not is_empty_lesson: - slots = group['slots'] - w = weekday + ("_1" if weeknum == 1 else "_2") - if w not in slots.keys(): - slots[w] = {} + dirty_line = utils.parse_all_dirt(self.reader, Coord(row_c2, col_low), (col_high - col_low + 1), 1, with_cells=True) + if len(dirty_line) > 0: + if pair_num_mr is not None: + pair_num_to_add = pair_num_mr.cell(self.reader).value.replace(" ", "").strip() + if len(pair_num_to_add) == 0: + pair_num_to_add = "???" + pprint("Составители эксельки? Вы почему не указали номер пары ёклмн") + pairs.add(pair_num_to_add) - today = slots[w] - today[pair] = { - "excel_pos": str(pos), - "discipline_name": parsed_discipline_name.strip(), - "locations": sorted(locations), - "leads": sorted(prepods), - "is_solid": is_solid, - "time_coeff": pairs, - "is_flow": is_wide_maybe_potokoviy, - "lefttopmerged": { - "width": merged.width(), - "height": merged.height(), - "excel_range": utils.merged_humanize(merged.as_numbers()) - }, - "raw": sorted(parsed_uncotigorized), - "weekday": utils.weekday_to_num(weekday), - "weeknum": weeknum - } - if fuck_empty_pair_in_excel: - today[pair]['pair_num_empty'] = { - "prev": previous_dump, - "restored": pair != "", - "pair": pair - } - if wtf_tomanypairs: - today[pair]['to_many_parsing_time_coeff'] = True - + for cell in dirty_line: + if not cell.is_time: + all_raw.add(str(cell.value)) + else: + dt: time = cell.value + times.append(str(dt)) - # INCREMENT на next и конец цикла. - row += next + def clean_state(): + nonlocal is_widely, override_col_range, event_no, all_raw, pairs, times, first_coord + is_widely = False + override_col_range = None + event_no += 1 + all_raw = set() + pairs = set() + first_coord = None + times = [] + + + if not utils.has_no_bottom_border(self.reader, pos_c2) and not(mr_c2.high.row - row_c2 > 0): + if not (len(all_raw) == 0): + # this code last for current state event + pprint(f"№{event_no} {pairs}: {'[wide] ' if is_widely else ''} raw={all_raw}") + + slots = group['slots'] + w = weekday_key_name + if w not in slots.keys(): + slots[w] = {} + + pair_name = "????" + try: + pair_name = sorted(pairs)[0] + except: pass + + obj = { + "object": "event", + "pairs": sorted(pairs), + "is_flow": is_widely, + "excel_range": utils.merged_humanize((first_coord, col_low, row_c2, col_high)), + "raw": sorted(all_raw), + "weekday": utils.weekday_to_num(weekday), + "weeknum": weeknum + } + if len(times) > 0: + obj['times'] = times + + def smart_insert(first_dict, key, to_insert): + if key not in first_dict.keys(): + first_dict[key] = {} + if isinstance(first_dict[key], dict): + if len(first_dict[key].keys()) == 0: + first_dict[key] = to_insert + else: + p = first_dict[key] + first_dict[key] = [p, to_insert] + + elif isinstance(first_dict[key], list): + first_dict[key].append(to_insert) + + else: + self.parser_warnings.append("Wtf? first_dict[key] not is dict and not is list??? (internal error)") + + if pair_pos_col is None: + smart_insert(slots, w, obj) + + else: + smart_insert(slots[w], pair_name, obj) + # here may be a empty all_raw + clean_state() + first_coord = None + + + if row_c2 >= weekday_mr.high.row: + clean_state() + pprint("Last for weekday") -def parse_groups(reader: "ExcelSheetReader", head, monday: Coord, head_rx): + row_c2 += 1 + + row_c1 += weekday_mr.height() + weekcycles += 1 + +def parse_groups(reader: "ExcelSheetReader", head, col_start, head_rx): """Распознать список групп и метаданные к ним, по сути получить список названий группы и координат её верхнего header-а (AQ6:AT6)""" groups = {} i = 0 @@ -227,21 +397,26 @@ def parse_groups(reader: "ExcelSheetReader", head, monday: Coord, head_rx): x = head[i] pprint(f"while i={i} head[i]={x}") merged = reader.get_merged_coord(Coord(head_rx, i)) - if i > monday.col + 1: - if merged is None or x == "": - break - - if merged.width() != 4: - pprint(f"WARNING: group header witdh !=4 (found: {merged.width()}); blocks !=4 not supported by parser.") + if i >= col_start: + if merged is None or x == "" or x is None: break name = utils.unspace(x) - groups[name] = { - "name": name, - "position": [head_rx, i], - "position_human": utils.merged_humanize(merged.as_numbers()), - "slots": {} - } + skip = False + if "-" not in name: + for x in BAD_GROUP_NAMES: + if x in name.lower(): + skip = True + pprint(f"Skip groupname {name} because not dash in name and in blacklist") + + if not skip: + groups[name.lower()] = { + "name": name, + "position": [head_rx, i], + "width": merged.width(), + "position_human": utils.merged_humanize(merged.as_numbers()), + "slots": {} + } if merged is None: i += 1 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..ac55965 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +openpyxl +xlrd +beautifulsoup4 +requests +pika +python-dotenv \ No newline at end of file diff --git a/translations.py b/translations.py index c61b79d..4c7466d 100644 --- a/translations.py +++ b/translations.py @@ -1,6 +1,7 @@ -# --- Абстрактный базовый класс (Контракт) --- +# Copyright Stanislav Mironov from abc import ABC, abstractmethod +from datetime import time import openpyxl import xlrd @@ -10,13 +11,23 @@ from coord import Coord, Merged EMPTY_CTYPES = [xlrd.XL_CELL_EMPTY, xlrd.XL_CELL_BLANK] class TranschendentnostCell: - def __init__(self, value, is_empty): + def __init__(self, value, is_empty, is_time=False): self.value = value + self.is_time = isinstance(value, time) or is_time self._is_empty = is_empty + + def is_nospace_nocase_same(self, query): + try: + if self.value.lower().replace(" ", "").strip() == query.lower().replace(" ", "").strip(): + return True + except: pass + + return False def is_empty(self): return self._is_empty +# --- Абстрактный базовый класс (Контракт) --- class ExcelSheetReader(ABC): """ Абстрактный базовый класс, определяющий интерфейс для чтения данных из листа Excel. @@ -28,6 +39,10 @@ class ExcelSheetReader(ABC): @abstractmethod def get_sheet_index(self): pass + + @abstractmethod + def get_sheet_name(self): + pass @abstractmethod def has_next_sheet(self): @@ -71,16 +86,28 @@ class ExcelSheetReader(ABC): return "TODO: info" @abstractmethod - def cell(self, row, col): + def cell(self, row, col) -> TranschendentnostCell: """Возвращает абстрактную клетку""" pass - def find(self, query = None): + def find(self, query = None, startswith=False, nospace=False): + return self.find_any([query], startswith=startswith, nospace=nospace) + + def find_any(self, query = None, startswith=False, nospace=False): for rx in range(self.get_row_count()): i = 0 for x in self.get_row_values(rx): - if x == query: - return Coord(rx, i) + if nospace: + x = str(x).replace(" ", "").strip() + + for query_selected in query: + if x == query_selected: + return Coord(rx, i) + elif startswith: + try: + if str(x).lower().startswith(query_selected.lower()): + return Coord(rx, i) + except: pass i += 1 return None @@ -104,7 +131,6 @@ class ExcelSheetReader(ABC): # --- Реализация №1: Обертка для xlrd --- - class XlrdSheetReader(ExcelSheetReader): def __init__(self, file_path, sheet_index=0): super().__init__(file_path) @@ -117,6 +143,9 @@ class XlrdSheetReader(ExcelSheetReader): def init_sheet(self): self.sheet = self.book.sheet_by_index(self.sheet_index) + + def get_sheet_name(self): + return self.sheet.name def has_next_sheet(self): return self.sheet_index < len(self.book.sheet_names())-1 @@ -140,7 +169,24 @@ class XlrdSheetReader(ExcelSheetReader): def cell(self, row, col): """Возвращает абстрактную клетку""" c = self.sheet.cell(row, col) - return TranschendentnostCell(c.value, c.ctype in EMPTY_CTYPES) + is_empty = c.ctype in EMPTY_CTYPES + is_time = c.ctype == xlrd.XL_CELL_DATE + value = c.value + if is_empty: + value = "" + elif is_time: + if isinstance(value, float): + if value <= 1: + seconds = round(value * 86400) + minutes, seconds = divmod(seconds, 60) + hours, minutes = divmod(minutes, 60) + value = time(hour=hours, second=seconds, minute=minutes) + else: + print(f"TODO: value is {value} its unix? not 0.xxxxxxxx") + else: + is_time = False + print("IsTime but not float!") + return TranschendentnostCell(value, is_empty, is_time=is_time) def get_border_style(self, coord: Coord, side): row = coord.row @@ -173,7 +219,6 @@ class XlrdSheetReader(ExcelSheetReader): # --- Реализация №2: Обертка-транслятор для openpyxl --- - class OpenpyxlSheetReader(ExcelSheetReader): def __init__(self, file_path, sheet_name=None): super().__init__(file_path) @@ -192,6 +237,9 @@ class OpenpyxlSheetReader(ExcelSheetReader): def get_sheet_index(self): return self.sheet_index + def get_sheet_name(self): + return self.workbook.sheetnames[self.sheet_index] + def has_next_sheet(self): return self.sheet_index < len(self.workbook.sheetnames)-1 @@ -221,7 +269,7 @@ class OpenpyxlSheetReader(ExcelSheetReader): c = self._get_cell(row, col) is_empty = (c.value is None) - return TranschendentnostCell("" if is_empty else c.value, is_empty) + return TranschendentnostCell("" if is_empty else c.value, is_empty, is_time=isinstance(c.value, time)) def get_cell_value(self, row, col): cell = self._get_cell(row, col) @@ -260,8 +308,7 @@ class OpenpyxlSheetReader(ExcelSheetReader): return [] -# --- Фабричная функция (Ваша единственная точка входа) --- - +# --- Фабричная функция (единственная точка входа) --- def create_reader(file_path, **kwargs) -> ExcelSheetReader: """ Создает и возвращает подходящий экземпляр ридера в зависимости от расширения файла. diff --git a/utils.py b/utils.py index 7fc2cb2..29b515b 100644 --- a/utils.py +++ b/utils.py @@ -1,12 +1,82 @@ - # Copyright Stanislav Mironov import time import xlrd -from coord import Coord, Merged +from coord import Coord from translations import ExcelSheetReader -import re +import hashlib + +import requests +from urllib.parse import urlsplit, urlunsplit, quote + +def download_file_from_url(url, output_filename): + """ + Скачивает файл по URL со спецсимволами и пробелами, сохраняя его под указанным именем. + + Args: + url (str): Исходный URL, который может содержать пробелы и кириллицу. + output_filename (str): Имя файла для сохранения (например, 'calc.xls'). + """ + try: + # --- Шаг 1: Правильное кодирование URL --- + # Разбираем URL на части: ('https', 'www.vstu.ru', '/path/to file.xls', '', '') + parts = urlsplit(url) + + # Кодируем только путь, оставляя слэши '/' безопасными + # Это превратит ' ' в '%20', 'В' в '%D0%92' и т.д. + encoded_path = quote(parts.path, safe='/-_') + + # Собираем URL обратно из частей с уже закодированным путем + encoded_url = urlunsplit((parts.scheme, parts.netloc, encoded_path, parts.query, parts.fragment)) + + + # --- Шаг 2: Скачивание файла --- + response = requests.get(encoded_url, stream=True) + + # Проверяем, успешен ли запрос (код 200 OK) + # Если сервер вернет ошибку (404, 500 и т.д.), здесь возникнет исключение + response.raise_for_status() + + # --- Шаг 3: Сохранение файла --- + # Открываем файл для записи в бинарном режиме ('wb') + # Использование 'with' гарантирует, что файл будет закрыт автоматически + with open(output_filename, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + + print(f"✅ Файл успешно скачан и сохранен как '{output_filename}'") + + except requests.exceptions.RequestException as e: + print(f"❌ Ошибка скачивания: {e}") + + except Exception as e: + print(f"❌ Произошла непредвиденная ошибка: {e}") + + +def calculate_sha1(filepath): + """ + Calculates the SHA1 hash of a given file. + + Args: + filepath (str): The path to the file. + + Returns: + str: The hexadecimal representation of the SHA1 hash, or None if the file is not found. + """ + sha1_hash = hashlib.sha1() + try: + with open(filepath, "rb") as f: + # Read the file in chunks to handle large files efficiently + for chunk in iter(lambda: f.read(4096), b""): + sha1_hash.update(chunk) + return sha1_hash.hexdigest() + except FileNotFoundError: + print(f"Error: File not found at {filepath}") + return None + except Exception as e: + print(f"An error occurred: {e}") + return None class StepTimeCounter: def __init__(self): @@ -53,7 +123,7 @@ def remove_from_list(l: list, todel: list): return l -def parse_all_dirt(reader: "ExcelSheetReader", min_pos, right, down): +def parse_all_dirt(reader: "ExcelSheetReader", min_pos: Coord, right, down, with_cells=False): RET = set() row = min_pos.row @@ -61,67 +131,14 @@ def parse_all_dirt(reader: "ExcelSheetReader", min_pos, right, down): col = min_pos.col while col < min_pos.col + right: #print(excel_coordinate(row, col)) - cv = reader.get_cell_value(row, col) - value = str(cv).strip() - if cv is not None and len(value) > 0: - RET.add(value) + cv = reader.cell(row, col) + if cv is not None and not cv.is_empty(): + RET.add(cv if with_cells else str(cv.value)) col += 1 row += 1 return RET -# GEMINI GENERATED -def normalize_name(raw_name): - """ - Приводит разнородные записи ФИО к единому структурированному виду. - """ - # Шаг 1: Очистка - name = re.sub(r'\s+', ' ', raw_name).strip() - - # Шаг 2: Извлечение звания - known_titles = ['проф.', 'доц.', 'акад.', 'к.т.н.', 'д.м.н.'] - title = None - for t in known_titles: - if name.lower().startswith(t): - title = t - # Удаляем звание из строки, убираем лишние пробелы - name = name[len(t):].strip() - break - - # Шаг 3 и 4: Разделение и идентификация - parts = name.split(' ') - last_name = None - initials = None - - # Простой эвристический анализ - # Ищем инициалы (содержат точку или состоят из 1-2 заглавных букв) - initials_parts = [] - name_parts = [] - - for part in parts: - if '.' in part or (1 <= len(part) <= 2 and part.isupper()): - initials_parts.append(part) - else: - # Считаем все остальное частью фамилии (для двойных фамилий) - name_parts.append(part) - - if name_parts: - last_name = " ".join(name_parts) - - if initials_parts: - initials = "".join(initials_parts) # Сливаем "А." и "Н." в "А.Н." - - # Если фамилия не найдена (например, только инициалы), - # но есть части, считаем первую часть фамилией - if not last_name and name_parts: - last_name = name_parts[0] - - return { - "last_name": last_name, - "initials": initials, - "title": title - } - def excel_coordinate(row, col): """ Преобразует координаты строки и столбца (начиная с 0) в эквивалент Excel (например, A7, CB34). @@ -165,7 +182,7 @@ def find(sh, query = None): return None def weekday_to_num(st: str): - if st.upper().strip() == "ПОНЕДЕЛЬНИК": + if st.upper().strip().startswith("ПОНЕД"): return 1 if st.upper().strip() == "ВТОРНИК": return 2 @@ -177,8 +194,9 @@ def weekday_to_num(st: str): return 5 if st.upper().strip() == "СУББОТА": return 6 - if st.upper().strip() == "ВОСКРЕСЕНЬЕ": + if st.upper().strip().startswith("ВОСКР"): return 7 + print(f"Unknown weekday num for str: {st}; returnted -1") return -1 \ No newline at end of file