Merge pull request 'economy_resources' (#1) from economy_resources into main

Reviewed-on: https://1e101_gitea.fazziclay.com/fazziclay/VSTU_Schedule_Parser/pulls/1
This commit is contained in:
2026-03-28 22:22:54 +03:00
9 changed files with 812 additions and 601 deletions

4
.gitignore vendored
View File

@@ -4,3 +4,7 @@ __pycache__
.idea .idea
result*.json result*.json
groups.json groups.json
diffable_dates.txt
parsed/
parser.json
.env

6
Dockerfile Normal file
View File

@@ -0,0 +1,6 @@
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "-u", "main.py"]

View File

@@ -1,201 +0,0 @@
# Copyright GEMINI
import re
# --- Ресурсы для алгоритма ---
STOP_WORDS = {'пр.', 'лек.', 'лаб.', 'семинар'}
TITLES = ('доц.', 'проф.', 'асс.', 'ст.пр.')
SURNAME_ENDINGS = ('ов', 'ев', 'ин', 'ский', 'цкой', 'их', 'ых', 'ова', 'ева', 'ина', 'ская', 'ян', 'ко', "ня", "ин")
def is_surname_string(s: str) -> bool:
"""
Классифицирует строку, определяя, содержит ли она фамилию.
"""
if not isinstance(s, str) or not s:
return False
# Шаг 1: Очистка
s_clean = s.strip()
# Шаг 2: Жесткие правила "НЕ ФАМИЛИЯ"
if s_clean.lower() in STOP_WORDS:
return False
if s_clean.isupper() and len(s_clean) > 3:
return False
if re.search(r'[\(\)/«»%]', s_clean):
return False
words = s_clean.split()
if len(words) > 3:
return False
# Шаг 3: Жесткие правила "ТОЧНО ФАМИЛИЯ"
if re.search(r'\b[А-Я]\.?', s_clean): # Ищет "И.А." или "И.А"
return True
if s_clean.lower().startswith(TITLES):
return True
# Шаг 4: Эвристический анализ для оставшихся случаев
score = 0
# Правило на капитализацию
if words and words[0][0].isupper():
score += 5
else:
# Если слово не с большой буквы, это почти точно не фамилия
return False
# Правило на окончания
last_word = words[-1].lower()
if last_word.endswith(SURNAME_ENDINGS):
score += 6
# Правило на количество слов
if len(words) in [1, 2]:
score += 2
# Пороговое значение
THRESHOLD = 8
return score >= THRESHOLD
def extract_last_name(name_str: str) -> str or None:
"""
Извлекает из строки только фамилию.
Справляется с приклеенными званиями (типа "ст.пр.Дмитриев") и отбрасывает инициалы.
Ищет первое слово, написанное с заглавной буквы.
Args:
name_str: Исходная "грязная" строка с именем.
Returns:
Чистая фамилия в виде строки, или None, если фамилия не найдена.
"""
# Проверка, что на вход подана строка
if not isinstance(name_str, str):
return None
# Паттерн для поиска:
# [А-ЯЁ] - одна заглавная русская буква в начале
# [а-яё]+ - одна или более строчных русских букв после неё
# (?:-[А-ЯЁ][а-яё]+)? - опциональная часть для двойных фамилий (например, -Петров)
pattern = r'[А-ЯЁ][а-яё]+(?:-[А-ЯЁ][а-яё]+)?'
match = re.search(pattern, name_str)
if match:
return match.group(0) # Возвращаем найденное совпадение
else:
return None # Если ничего не найдено
# --- Шаг 0: Константы ---
POSITIVE_KEYWORDS = ['зал', 'ауд', 'каб', 'корп', 'кор.', "ЛК"]
NEGATIVE_KEYWORDS = ['доц', 'проф', 'асс', 'лек', 'пр']
# Транслитерация для унификации
TRANS_TABLE = str.maketrans('TCBAHIMK', 'ТКВАНІМК') # Латиница -> Кириллица
def is_room_number(s: str) -> bool:
"""
Проверяет, является ли строка номером кабинета, по многоуровневому алгоритму.
"""
# --- Шаг 1: Быстрые негативные фильтры ---
if not isinstance(s, str) or not s.strip():
return False # 1. Проверка на пустоту
if ',' in s:
return False # 2. Проверка на запятые (даты)
# 3. Проверка на "очевидный мусор"
first_word = s.strip().lower().split()[0]
if first_word in NEGATIVE_KEYWORDS:
return False
# 4. Проверка на инициалы (напр. А.Е.)
if re.search(r'\b[А-Я]\.[А-Я]\.?\b', s):
return False
# 5. Проверка на длинное слово без цифр
if not any(char.isdigit() for char in s) and len(s.split()) == 1 and len(s) > 10:
return False
# --- Шаг 2: Быстрые позитивные фильтры ---
s_lower = s.lower()
if any(keyword in s_lower for keyword in POSITIVE_KEYWORDS):
return True # 1. Проверка по ключевым словам
# --- Шаг 3: Основной анализ ---
if not any(char.isdigit() for char in s):
return False # 1. Требуется наличие цифр
# 2. Создание "чистой" версии
clean_s = s.upper().translate(TRANS_TABLE).replace(' ', '')
# 3. Комплексный паттерн для проверки
# Пояснение:
# ^...$ - шаблон должен соответствовать всей строке
# [А-ЯЁ]?-? - необязательная буква и необязательный дефис в начале
# \d+ - одна или более цифр (ядро номера)
# (?:[.-]\d+)* - необязательные группы ".число" или "-число"
# [А-ЯЁ]?$ - необязательная буква в конце
room_pattern = re.compile(r'^[А-ЯЁ]?-?\d+(?:[.-]\d+)*[А-ЯЁ]?$')
if room_pattern.fullmatch(clean_s):
return True
# --- Шаг 4: Финальное решение ---
return False
import requests
from urllib.parse import urlsplit, urlunsplit, quote
def download_file_from_url(url, output_filename):
"""
Скачивает файл по URL со спецсимволами и пробелами, сохраняя его под указанным именем.
Args:
url (str): Исходный URL, который может содержать пробелы и кириллицу.
output_filename (str): Имя файла для сохранения (например, 'calc.xls').
"""
try:
# --- Шаг 1: Правильное кодирование URL ---
# Разбираем URL на части: ('https', 'www.vstu.ru', '/path/to file.xls', '', '')
parts = urlsplit(url)
# Кодируем только путь, оставляя слэши '/' безопасными
# Это превратит ' ' в '%20', 'В' в '%D0%92' и т.д.
encoded_path = quote(parts.path, safe='/-_')
# Собираем URL обратно из частей с уже закодированным путем
encoded_url = urlunsplit((parts.scheme, parts.netloc, encoded_path, parts.query, parts.fragment))
# --- Шаг 2: Скачивание файла ---
response = requests.get(encoded_url, stream=True)
# Проверяем, успешен ли запрос (код 200 OK)
# Если сервер вернет ошибку (404, 500 и т.д.), здесь возникнет исключение
response.raise_for_status()
# --- Шаг 3: Сохранение файла ---
# Открываем файл для записи в бинарном режиме ('wb')
# Использование 'with' гарантирует, что файл будет закрыт автоматически
with open(output_filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"✅ Файл успешно скачан и сохранен как '{output_filename}'")
except requests.exceptions.RequestException as e:
print(f"❌ Ошибка скачивания: {e}")
except Exception as e:
print(f"❌ Произошла непредвиденная ошибка: {e}")

View File

@@ -10,6 +10,11 @@ from bs4 import BeautifulSoup
BASE_URL = "https://www.vstu.ru/" BASE_URL = "https://www.vstu.ru/"
RASP_PREFIX = "https://www.vstu.ru/student/raspisaniya/zanyatiy/index.php?dep=" RASP_PREFIX = "https://www.vstu.ru/student/raspisaniya/zanyatiy/index.php?dep="
def sibling_clear_to_date(s: str):
if s is None:
return "!!!Python None!!!"
return s.lower().replace("(последнее изменение:", "").replace(")", "").strip()
# Парсит ссылки на эксель .xls & .xlsx файлы и выдаёт их # Парсит ссылки на эксель .xls & .xlsx файлы и выдаёт их
def parse_links(facultets): def parse_links(facultets):
session = requests.Session() session = requests.Session()
@@ -27,7 +32,7 @@ def parse_links(facultets):
} }
) )
EXCEL_LINKS = {} EXCEL_LINKS = []
for facultet in facultets: for facultet in facultets:
url = RASP_PREFIX + facultet url = RASP_PREFIX + facultet
print("getting...") print("getting...")
@@ -38,19 +43,19 @@ def parse_links(facultets):
# Ищем все теги <a>, у которых атрибут href соответствует нашему паттерну # Ищем все теги <a>, у которых атрибут href соответствует нашему паттерну
excel_tags = soup.find_all('a', href=excel_pattern) excel_tags = soup.find_all('a', href=excel_pattern)
excel_links = [tag.get('href') for tag in excel_tags] for a in excel_tags:
last_changed = sibling_clear_to_date(a.next_sibling)
url = urljoin(BASE_URL, a.get('href'))
disp = a.decode_contents()
record = {
"uniqpath": f"vstu.ru/rasp?dep={facultet}/{disp.strip()}",
"facultet": facultet,
"url": url,
"display_filename": disp,
"last_changed": last_changed
}
print("Found in vstu.ru: ", record)
EXCEL_LINKS.append(record)
# Предположим, вы уже получили excel_links из одного из методов выше return sorted(EXCEL_LINKS, key=lambda x: x['url'])
# excel_links = ['../../../upload/raspisanie/z/ОН_ХТФ_1 курс.xlsx', ...]
absolute_links = [urljoin(BASE_URL, relative_link) for relative_link in excel_links]
if facultet not in EXCEL_LINKS.keys():
EXCEL_LINKS[facultet] = set()
for excel_url in absolute_links:
EXCEL_LINKS[facultet].add(excel_url)
print(f"+url {excel_url}")
return EXCEL_LINKS

417
main.py
View File

@@ -4,133 +4,93 @@
import json import json
import pika
import os import os
import random
import time import time
import traceback import traceback
import uuid import uuid
import aigenerated
import parser import parser
import translations import translations
import utils import utils
import json import json
import links_parser import links_parser
import shutil import shutil
from dotenv import load_dotenv
load_dotenv()
RABBITMQ_URL = os.environ.get("RABBITMQ_URL")
EXCHANGE_NAME = os.environ.get("RABBITMQ_EXCHANGE", "vstu_schedule")
try:
connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))
channel = connection.channel()
channel.exchange_declare(exchange=EXCHANGE_NAME,
exchange_type='topic',
durable=True)
except Exception as e:
print("Failed to connect RabbitMQ")
traceback.print_exception(e)
def currt(): def currt():
return round(time.time()) return round(time.time())
FACULTETS = [ FACULTETS = sorted([
"asp", "mag", "fastiv", "fat", "ftkm", "ftpp", "feu", "fevt", "htf", "vkf", "mmf", "fpik" "asp", "mag", "fastiv", "fat", "ftkm", "ftpp", "feu", "fevt", "htf", "vkf", "mmf", "fpik"
] ])
DIRNAME = "excels" DIRNAME = "excels"
PARSED_DIR = "parsed"
DEBUG_ONE_FAC = None #'htf' DEBUG_ONE_FAC = None #'fevt'
result_groups = {} DEBUG_NO_SAVE_STATES = False
result = {
"version": 1,
"notice": "ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ: Данные, доступ к API и т.д. предоставляется КАК-ЕСТЬ (AS-IS) без каких либо, явно или не явно подразумеваемых гарантий.\n\nПарсер написал: Миронов Станислав\n\nИсточник данных: https://www.vstu.ru/student/raspisaniya/zanyatiy/index.php",
"actual_at": round(time.time()),
"documentation": "https://fazziclay.com/api/v1/vstu_schedule_parser/scheme.json",
"daypicture": "QwQ",
"daycite": "running on a rope",
"contact": "https://fazziclay.com/",
"university": "VSTU",
"university_site": "https://www.vstu.ru/",
"source": "https://fazziclay.com/api/v1/vstu_schedule_parser/result.json",
"stat": {
"total_parsing_time": -1,
},
"api_notices": {
"updated_at": 1757688552,
"text": "Пожалуйста сохраняйте 'updated_at', это время изменения ЭТОГО текста. Тут возможно будут появлятся важные BREAKING CHANGES и дедлайны к ним.\nПо хорошему если updated_at другой по сравнению с вашем кэшем это сообщение должно отправляться вам в телеграм как уведомление о поедстоящих изменениях\nwarning=True значит 'text' содержит важное а не как щас hint.\n\n ~fazziclay aka Stanislav;",
"warning": False,
"tut-plavayuschaya-struktura": "required only 'updated_at', 'text' and 'warning'"
},
"doubled_groups": [],
"debug": {
"bleu~~": 1
},
"excels": [],
"facultets": FACULTETS,
"emptykey1": "", parser.LOGGING = LOGGING = True
"emptykey2": "",
"groups": result_groups,
"emptykey3": "",
"emptykey4": "",
"see_header_at_top_of_this_file": "SEE TOP OF THIS FILE | ОБРАТИТЕ ВНИМАНИЕ НА ВЕРХ ЭТОГО ФАЙЛА"
}
def process_excel_file(facultet, excel_url, counter, timeid):
is_xlsx = excel_url.endswith(".xlsx")
filename = f"{DIRNAME}/" + timeid + f"_[C{counter}]_" + facultet + ".xls" + ("x" if is_xlsx else "")
excel_info = {
"filename": excel_url.split("/")[-1],
"url": excel_url,
"download_place": filename,
"stat": {
"download": -1,
"create_reader": -1,
"parse": -1,
"cycles": 0
},
"group_names_parsed": [],
"facultet": facultet,
"counter": counter
}
parser.LOGGING = False
def parse_sheets(download_place):
to_return = {}
try: try:
t = utils.StepTimeCounter() reader = translations.create_reader(download_place)
aigenerated.download_file_from_url(excel_url, filename)
excel_info["stat"]['download'] = t.step()
reader = translations.create_reader(filename)
print("Reader info") print("Reader info")
print(reader.info()) print(reader.info())
excel_info["stat"]['create_reader'] = t.step()
while True: while True:
excel_info['stat']['cycles'] += 1 t = utils.StepTimeCounter()
print(f"Parsing sheet №{reader.get_sheet_index()+1} (from 1)") print(f"Parsing sheet №{reader.get_sheet_index()+1} (from 1)")
sheet_dict = {
"index": reader.get_sheet_index(),
"name": reader.get_sheet_name(),
"reader_info": reader.info(),
"groups": {}
}
to_return["SHEET_"+str(reader.get_sheet_index())] = sheet_dict
prs = parser.Parser(reader) prs = parser.Parser(reader)
print("Parser created; parser.parse();") print("Parser created; parser.parse();")
prs.parse() prs.parse()
print("parsed done!") print("parsed done!")
sheet_dict['parse_time'] = round(t.step())
if len(prs.raw_no_schedule) > 0:
sheet_dict["other_raws"] = prs.raw_no_schedule
if len(prs.features) > 0:
sheet_dict["features"] = sorted(prs.features)
if prs.parser_error is not None: if prs.parser_error is not None:
excel_info["parser_error_cycle_" + str(excel_info['stat']['cycles'])] = prs.parser_error sheet_dict["parser_error"] = prs.parser_error
if prs.parser_warnings is not None and len(prs.parser_warnings) > 0:
sheet_dict["parser_warnings"] = prs.parser_warnings
for group_name in prs.groups.keys(): for group_name_key in prs.groups.keys():
if group_name in result_groups.keys(): gr = prs.groups[group_name_key]
print(f" -- WTF -- Doubled groups -- name: {group_name}") sheet_dict['week_keys_metadata'] = prs.week_keys_metadata
if 'warning_doubled_groups_skip' not in excel_info.keys(): sheet_dict['groups'][group_name_key] = gr
excel_info['warning_doubled_groups_skip'] = []
excel_info['warning_doubled_groups_skip'].append(group_name)
result['doubled_groups'].append(group_name)
print(f"Populates {len(prs.groups)} groups: " + " ".join(prs.groups.keys()))
continue
gr = result_groups[group_name] = prs.groups[group_name]
gr['facultet'] = facultet
gr['data_source'] = excel_url.split("/")[-1]
gr['debug'] = {
"counter": counter,
"timeid": timeid,
"excel_url": excel_url,
"reader_info": reader.info(),
"reader_sheet_index": reader.get_sheet_index(),
"filename": filename
}
excel_info["group_names_parsed"].append(group_name)
print(f"Populates {len(prs.groups)} groups to result: " + " ".join(prs.groups.keys()))
if not reader.has_next_sheet(): if not reader.has_next_sheet():
print("File ended") print("File ended")
@@ -138,33 +98,60 @@ def process_excel_file(facultet, excel_url, counter, timeid):
else: else:
reader.next_sheet() reader.next_sheet()
print("Next sheet!") print("Next sheet!")
excel_info["stat"]['parse'] = t.step()
except Exception as e: except Exception as e:
print(f"Error while {excel_url}")
print(e) print(e)
traceback.print_exc() traceback.print_exc()
u = uuid.uuid4() u = uuid.uuid4()
excel_info['error'] = { to_return['error'] = {
"smile": ":(", "smile": ":(",
"error_message": str(e), "error_message": str(e),
"log_anchor": str(u), "log_anchor": str(u),
"time": currt() "time": currt()
} }
print(f"Log Anchor: {u}") print(f"Log Anchor: {u}")
faileds.append({
"ex": e, return to_return
"fac": facultet,
"url": excel_url
})
result['excels'].append(excel_info)
faileds = [] def parsed_file_path(excel_filename: str):
def main(): format = excel_filename.split(".")[-1]
fl = format.lower()
if fl not in ["json", "xls", "xlsx"]:
print(f"Unknown filename format: {excel_filename}")
return
if fl != "json":
excel_filename = excel_filename.replace("." + format, ".json")
excel_filename = excel_filename.lower()
filepath = PARSED_DIR + os.path.sep + excel_filename
return filepath
def load_parsed_state(excel_filename):
filepath = parsed_file_path(excel_filename)
if not os.path.exists(filepath):
return
with open(filepath, "r", encoding="utf-8") as fp:
return json.load(fp=fp)
def save_parsed_state(excel_filename, obj):
filepath = parsed_file_path(excel_filename)
if DEBUG_NO_SAVE_STATES:
print("Saved! (fake because DEBUG_NO_SAVE_STATES)")
with open(filepath, "w", encoding="utf-8") as fp:
json.dump(obj, fp=fp, ensure_ascii=False, sort_keys=True)
print(f"Saved parsed state to '{filepath}'")
def run_session():
faileds = []
t = utils.StepTimeCounter() t = utils.StepTimeCounter()
# Delete tempdir
try: try:
try: try:
shutil.rmtree(DIRNAME) shutil.rmtree(DIRNAME)
@@ -177,42 +164,206 @@ def main():
print(f"Failed create '{DIRNAME}': ") print(f"Failed create '{DIRNAME}': ")
raise e raise e
print("main(); parse links starting...") print("main(); parse links starting...")
EXCEL_LINKS = links_parser.parse_links(FACULTETS if DEBUG_ONE_FAC is None else [DEBUG_ONE_FAC]) EXCEL_LINKS = links_parser.parse_links(FACULTETS if DEBUG_ONE_FAC is None else [DEBUG_ONE_FAC])
counter = 0
timeid = str(round(time.time())) if len(EXCEL_LINKS) < 5 and not DEBUG_ONE_FAC:
for facultet in EXCEL_LINKS.keys(): raise Exception("Safety exception: excel links count < 5; maybe in vstu.ru tech works")
counter += 1000
print(f"\n\n-- Факультет '{facultet}' --")
facultet_urls = EXCEL_LINKS[facultet] last_changeds = set()
for excel_url in facultet_urls: states = []
counter += 1 changed = False
print(f"\n\n-- Ссылка --") for excel_dict in EXCEL_LINKS:
print(f"{excel_url}") try:
last_changeds.add(excel_dict['last_changed'])
excel_url = excel_dict['url']
facultet = excel_dict['facultet']
excel_filename = excel_url.split("/")[-1]
excel_dict['json_represent'] = parsed_file_path(excel_filename).split(os.path.sep)[-1]
print(f"Processing {facultet} {excel_filename}")
state = load_parsed_state(excel_filename)
is_new = state is None
if is_new:
state = {}
print("Start processing excel file") else:
process_excel_file(facultet, excel_url, counter, timeid) same_date = False
print("Excel file processing done!") try:
same_date = state['excel']['last_changed'] == excel_dict['last_changed']
print(f"Excel[{excel_filename}]: inServer={excel_dict['last_changed']}, inState={state['excel']['last_changed']} same={same_date}")
except Exception as e:
print(f"Excel[{excel_filename}]: failed testify last_changed")
r = "parser.excel_found." + ("same" if same_date else "different") + "." + facultet
channel.basic_publish(
exchange=EXCHANGE_NAME,
routing_key=r,
properties=pika.BasicProperties(
content_type="application/json",
delivery_mode=2
),
body=json.dumps({
"type": "excel_file_found",
"same": same_date,
"excel_dict": excel_dict
}, ensure_ascii=False).encode('utf-8')
)
print(f"RabbitMQ published r={r}")
if same_date:
state['actual_at'] = currt()
try:
del state['excel']['different_in_this_session']
except: pass
states.append(state)
save_parsed_state(excel_filename, state)
continue
changed = True
excel_dict['different_in_this_session'] = True
state['actual_at'] = currt()
state['excel'] = excel_dict
is_xlsx = excel_url.endswith(".xlsx")
download_place = f"{DIRNAME}/" + excel_filename + "_" + facultet + ".xls" + ("x" if is_xlsx else "")
utils.download_file_from_url(excel_url, download_place)
sha1hash = utils.calculate_sha1(download_place)
state['excel']['sha1hash'] = sha1hash
state['sheets'] = parse_sheets(download_place)
channel.basic_publish(
exchange=EXCHANGE_NAME,
routing_key="parser.excel_parsed." + facultet,
properties=pika.BasicProperties(
content_type="application/json",
delivery_mode=2
),
body=json.dumps({
"type": "excel_file_parsed",
"is_new": is_new,
"state": state
}, ensure_ascii=False).encode('utf-8')
)
save_parsed_state(excel_filename, state)
states.append(state)
except Exception as e:
faileds.append({
"uuid": str(uuid.uuid4()),
"exception": str(e),
"traceback": traceback.format_exception(e),
"context": f"Failed process excel file {excel_dict['url']}"
})
traceback.print_exception(e)
print("Saving result.json")
result['stat']['total_parsing_time'] = t.step() with open("parser.json", 'w', encoding="utf-8") as fp:
lc = {"*_x": ":("}
try:
s = sorted(last_changeds)
lc = {
"early": s[0],
"newly": s[-1]
}
except: pass
json.dump({
"last_changeds": lc,
"actual_at": currt(),
"all_files": EXCEL_LINKS,
"faileds": faileds
}, fp=fp, ensure_ascii=False)
if changed:
all_files = states
d = {
"version": 2,
"notice": "ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ: ПРЕДОСТАВЛЯЕТСЯ КАК-ЕСТЬ (AS-IS) БЕЗ КАКИХ ЛИБО ГАРАНТИЙ",
"contact": "https://fazziclay.com/ или fazziclay@gmail.com",
"api_notices": {
"just_save_and_check_diffs": "просто сохраните и проверяйте разницу"
},
"actual_at": currt(),
"all_files": sorted(all_files, key=lambda d: d['excel']['url']),
"faileds": faileds
}
with open("result_v2.json", 'w', encoding="utf-8") as fp:
json.dump(d, fp=fp, ensure_ascii=False)
channel.basic_publish(
exchange=EXCHANGE_NAME,
routing_key="parser.result_v2",
properties=pika.BasicProperties(
content_type="application/json",
delivery_mode=2
),
body=json.dumps({
"type": "schedule_result_v2",
"data": d
}, ensure_ascii=False).encode('utf-8')
)
json.dump(result, open('result.json', 'w'), indent=2, ensure_ascii=False) # Delete a non-empty directory and its contents
print("Saved to result.json indent=2")
json.dump(result, open('result-no-indent.json', 'w'), ensure_ascii=False)
print("Saved to result-no-indent.json")
print("Faileds:")
print(faileds)
# Delete a non-empty directory and its contents
try: try:
shutil.rmtree(DIRNAME) shutil.rmtree(DIRNAME)
print(f"Directory '{DIRNAME}' and its contents deleted successfully.") print(f"Directory '{DIRNAME}' and its contents deleted successfully.")
except Exception as e: except Exception as e:
print(f"Error deleting directory '{DIRNAME}': {e}") print(f"Error deleting directory '{DIRNAME}': {e}")
return {"changed": changed}
def check_dirs():
if not os.path.exists(PARSED_DIR):
os.mkdir(PARSED_DIR)
def main():
while True:
t = utils.StepTimeCounter()
err = None
sess = None
try:
check_dirs()
print("BEGIN run_session();")
sess = run_session()
print("END run_session();")
if DEBUG_ONE_FAC:
print("DEBUG_ONE_FAC; break infinity-loop")
break
except Exception as e:
err = e
print("Exception in run_session();")
traceback.print_exception(e)
channel.basic_publish(
exchange=EXCHANGE_NAME,
routing_key="parser.session_end." + ('complete' if err is None else 'failed'),
properties=pika.BasicProperties(
content_type="application/json",
delivery_mode=2
),
body=json.dumps({
"type": "session_end",
"err": str(err) if err else None,
"duration": t.step(),
"session": sess
}, ensure_ascii=False).encode('utf-8')
)
print("Sleep for 30 minutes")
time.sleep(60*30)
print("Wake up!")
if __name__ == "__main__": if __name__ == "__main__":

531
parser.py
View File

@@ -3,223 +3,393 @@
PAIR_NUMS = [ PAIR_NUMS = [
"1-2", "3-4", "5-6", "7-8", "9-10", "11-12", "13-14", "15-16" "1-2", "3-4", "5-6", "7-8", "9-10", "11-12", "13-14", "15-16"
] ]
WEEKDAYS_STARTSWITH = [
"понед",
"вторник",
"среда",
"четверг",
"пятница",
"суббота",
"воскр"
]
BAD_GROUP_NAMES = [
"янв", "февр", "март", "апр", "май", "сент", "окт", "ноя", "дек", "июнь", "июль", "авг"
]
from datetime import time
import json import json
import uuid
import aigenerated
from coord import Coord, Merged from coord import Coord, Merged
from translations import ExcelSheetReader from translations import ExcelSheetReader
import utils import utils
from collections import defaultdict
LOGGING = True LOGGING = False
def pprint(*args, **kwargs): def pprint(*args, **kwargs):
if LOGGING: if LOGGING:
print(*args, **kwargs) print(*args, **kwargs)
def is_weeknum(text):
for wd in WEEKDAYS_STARTSWITH:
if text.strip().replace(" ", "").lower().startswith(wd):
return True
return False
def is_pair(text):
for p in PAIR_NUMS:
if text.strip().replace(" ", "").lower().startswith(p):
return True
return False
class Parser: class Parser:
def __init__(self, reader: ExcelSheetReader): def __init__(self, reader: ExcelSheetReader):
self.reader = reader self.reader = reader
self.groups = {} self.groups = {} # Группы которые удалось распарсить
self.teachers = set() self.features = set() # фичи данной страницы
self.places = set() self.week_keys_metadata = {} # календарик
self.parser_error = None self.schedule_range_row = None # [min, max] диапазон col включительно где расписание
self.raw_no_schedule = [] # всё что не schedule_range_row отправляется сюда ('СОГЛАСОВАНО:', etc..)
self.weeknums: defaultdict = defaultdict(set) # no support json! (для week_keys_metadata)
self.parser_error = None # ошибка парсера перед выходом
self.parser_warnings = [] # предупреждения парсера
pprint("Parser created for '{0}'".format(reader.info())) pprint("Parser created for '{0}'".format(reader.info()))
def parse(self): def parse(self):
monday = self.reader.find("ПОНЕДЕЛЬНИК") # Характерные признаки разных сеток
if monday is None: no_pair_numeration = False
col_distance_pair_weekday = None
weekday_firstly_calendar = False
first_weekday = self.reader.find_any(WEEKDAYS_STARTSWITH, startswith=True, nospace=True)
if first_weekday is None:
self.features.add("no_weekdays")
print(" -- Failed parse! -- ") print(" -- Failed parse! -- ")
print("ПОНЕДЕЛЬНИК НЕ НАЙДЕН!") print("дни недели не найдены!")
self.parser_error = "'ПОНЕДЕЛЬНИК' не найден в таблице." self.parser_error = f"{WEEKDAYS_STARTSWITH} ни один найден в таблице. Дня недели нет."
self.parse_raw_no_schedule()
return return
head_rx = monday.row - 1 # выше первого понидельника pair_num_any = self.reader.find_any(PAIR_NUMS, nospace=True)
if pair_num_any is None:
no_pair_numeration = True
self.features.add("no_pair_numeration")
self.parser_warnings.append(f"Нет нумерации академических часов {PAIR_NUMS}")
else:
self.features.add("pair_numeration")
col_distance_pair_weekday = pair_num_any.col - first_weekday.col
head_rx = first_weekday.row - 1 # выше первого понидельника
group_col_start = first_weekday.col + 2
if col_distance_pair_weekday is not None:
if col_distance_pair_weekday > 1:
weekday_firstly_calendar = True
self.features.add("weekdays_before_calendar")
group_col_start = pair_num_any.col + 1
if head_rx < 0: if head_rx < 0:
raise Exception("head_rx < 0: Программа пыталась найти 'ПОНЕДЕЛЬНИК', но по всей видимости не нашла.") raise Exception("head_rx < 0: Программа пыталась найти день недели, но по всей видимости не нашла.")
head = self.reader.get_row_values(head_rx) # get all ROW (months, groups) head = self.reader.get_row_values(head_rx) # get all ROW (months, groups)
pprint(f"head={head}") pprint(f"head={head}")
self.groups = parse_groups(self.reader, head, monday, head_rx) # parse groups to self.groups
head_joined = " ||| ".join([v for v in head if isinstance(v, str) and v.strip()])
print(head_joined)
if (len(head_joined) == 0) or "1 неделя" in head_joined or "1 НЕДЕЛЯ" in head_joined or "2 неделя" in head_joined or "2 НЕДЕЛЯ" in head_joined or "ИЗМЕНЕНИЯ" in head_joined or "изменения" in head_joined or "vtf-vstu.ru" in head_joined:
head_rx -= 1
self.raw_no_schedule.append(head_joined)
head = self.reader.get_row_values(head_rx) # get all ROW (months, groups)
pprint(f"head (upper)={head}")
self.features.add("post_groups_info_row")
self.groups = parse_groups(self.reader, head, group_col_start, head_rx) # parse groups to self.groups
pprint(f'self.groups={json.dumps(self.groups, indent=2, ensure_ascii=False)}') pprint(f'self.groups={json.dumps(self.groups, indent=2, ensure_ascii=False)}')
pprint("\n\n\n") pprint("\n\n\n")
for group in self.groups.values(): for group in self.groups.values():
pprint("\nSTART OF PROCESS GROUP\n") pprint("\nSTART OF PROCESS GROUP\n")
self.process_group(group, monday) self.process_group(group, first_weekday, pair_num_any.col if pair_num_any else None)
pprint("\nEND OF PROCESS GROUP\n") pprint("\nEND OF PROCESS GROUP\n")
pprint(self.teachers) # week metadatas parse
S = 9999999
group_min_col = S
group_min_row = S
for x in self.groups.values():
p = x['position']
group_min_row = min(p[0], group_min_row)
group_min_col = min(p[1], group_min_col)
if group_min_row != S and group_min_col != S:
pprint("Process weekmetadatas!")
self.process_weekmetadatas(Coord(row=group_min_row, col=group_min_col))
# parse no-schedule raws (согласовано, и т.д.)
self.parse_raw_no_schedule()
def parse_potokoviy(self, merged: Merged):
speaker = None def parse_raw_no_schedule(self):
location = None """Распарсить всё за пределами self.schedule_range_row в self.raw_no_schedule"""
if self.schedule_range_row is None:
self.schedule_range_row = [999999999, 999999999] # прекрасное далёко
row = 0
while row < self.reader.get_row_count():
if row >= self.schedule_range_row[0] and row <= self.schedule_range_row[1]:
row = self.schedule_range_row[1] + 1
row_values = self.reader.get_row_values(row)
row_values = [v for v in row_values if isinstance(v, str) and v.strip()]
if len(row_values) > 0:
self.raw_no_schedule.append(row_values)
row += 1
# speaker def process_weekmetadatas(self, first_group: "Coord"):
low = merged.low """Обработать календарик"""
speaker_pos = low.shift(down=merged.height()) for x in self.weeknums.keys():
speaker = speaker_pos.cell(self.reader).value pprint(x)
set_of_merged: set = self.weeknums[x]
l = len(set_of_merged)
if l != 1:
self.week_keys_metadata[x] = {
"error": True,
"error_text": f"Parse error: count of found '{x}' (need view like WEEKDAY_1; weekday - in r; 1 - weeknum[1, 2]) is {l}; required only one!"
}
self.parser_warnings.append(f"Processing weekmetadata for '{x}' failed because count of uniqie merged cells not one (actual: {l}). :<")
continue
weekday_merged: Merged = set_of_merged.pop()
if weekday_merged.width() != 1:
self.week_keys_metadata[x] = {
"error": True,
"error_text": f"Weekday excel block width != 1 (actual {weekday_merged.width()})"
}
self.parser_warnings.append(f"Processing weekmetadata for '{x}' failed because weekday excel block width != 1 (actual {weekday_merged.width()})")
continue
month_row = first_group.row
curr_col = first_group.col - 1
while curr_col >= 0:
month_pos = Coord(month_row, curr_col)
month_cell = month_pos.cell(self.reader)
if month_cell.is_empty():
pprint("month cell is empty")
curr_col -= 1
continue
month_name = str(month_cell.value).strip()
pprint(month_cell)
all_nums_of_month = utils.parse_all_dirt(self.reader, month_pos.replace(row=weekday_merged.low.row), right=1, down=weekday_merged.height())
pprint(f"all_nums_of_month={all_nums_of_month}")
if (x not in self.week_keys_metadata.keys()):
self.week_keys_metadata[x] = {}
if (month_name not in self.week_keys_metadata[x].keys()):
self.week_keys_metadata[x][month_name] = []
for x2 in all_nums_of_month:
if x2.lower() == month_name.lower():
pprint(f"Skip {x2} month number because it == month_name")
continue
m = self.week_keys_metadata[x][month_name]
if x2 not in m:
try:
m.append(str(x2).replace(".0", ""))
except:
m.append(x2)
curr_col -= 1
# location def push_weekday_meta(self, weekday: str, weeknum: int, week_key_name: str, merged: "Merged"):
location = merged.high.shift(down=1).cell(self.reader).value self.weeknums[week_key_name].add(merged)
def row_with_schedule_notify(self, row_coord):
"""Вызывается каждый раз когда в переданной row обранужено расписание"""
if self.schedule_range_row is None:
self.schedule_range_row = [row_coord, row_coord]
if self.schedule_range_row[1] < row_coord:
self.schedule_range_row[1] = row_coord
if self.schedule_range_row[0] > row_coord:
self.schedule_range_row[0] = row_coord
return {"loc": str(location).strip(), "leader": str(speaker).strip(), "name": str(merged.cell(self.reader).value).strip()} def process_group(self, group: dict, first_weekday: Coord, pair_pos_col):
def process_group(self, group, monday):
""" """
Обработать группы, выполняется для каждой группы, после того как они распарены (parse_groups) Обработать группы, выполняется для каждой группы, после того как они распарены (parse_groups)
group = {'name': 'ИВТ-260', 'position': [5, 6], 'position_human': 'G6:J6'} group = {'name': 'ИВТ-260', 'position': [5, 6], 'position_human': 'G6:J6'}
""" """
pprint(f"process_group group={group}") pprint(f"process_group group={group}")
group_name = group['name'] group_name = group['name']
pprint(group_name) pprint(F"Имя группы: {group_name}")
row = group['position'][0] + 1 # counter for while, +1 for shift down; также номер строки в таблице (вроде с нуля) row_c1 = group['position'][0] + 1 # counter for while, +1 for shift down; также номер строки в таблице (вроде с нуля)
self.row_with_schedule_notify(group['position'][0])
group_header_pos = Coord(group['position'][0], group['position'][1])
width = group['width']
weeknum = 1 # номер недели, щёлкнет +1 при каком-то условии. weeknum = 1 # номер недели, щёлкнет +1 при каком-то условии.
previous_pair = None
while row < self.reader.get_row_count(): # maybe условие чтобы не уйти ниже чем есть строк weekcycles = 0
pos = Coord(row, group['position'][1]) # текущая позиция, верхний левый угол (=low) while row_c1 < self.reader.get_row_count():
pprint(f"while pos={pos}") pos_c1 = Coord(row_c1, group['position'][1]) # текущая позиция, верхний левый угол (=low)
pos_right = pos.shift(right=3) self.row_with_schedule_notify(pos_c1.row)
pair_pos = pos.replace(col=5)
weekday_pos = pos.replace(col=4)
merged = self.reader.get_merged_coord(pos)
merged_cell = merged.cell(self.reader)
cv = merged_cell.value
# В конце (12 пара:>) название группы, можно использовать как якорь
if utils.unspace(cv) == group_name:
pprint("Lesson == group name; ending group loop.")
break
weekday_mr = self.reader.get_merged_coord(weekday_pos)
weekday = utils.unspace(weekday_mr.cell(self.reader).value)
pair_mr = self.reader.get_merged_coord(pair_pos)
pair = utils.unspace(pair_mr.cell(self.reader).value)
skip = 0
if weekday == "":
if weeknum == 1:
weeknum += 1
pprint("------")
skip = 1
row += 1
else:
break
if not skip: if pos_c1.cell(self.reader).is_nospace_nocase_same(group_name):
next = 3 # на сколько пыгнуть для следующего шага? pprint("Ended with grpup name; stop moving down, break")
break
weekday_pos = pos_c1.replace(col=first_weekday.col)
weekday_cell = weekday_pos.cell(self.reader)
weekday_mr = self.reader.get_merged_coord(weekday_pos)
weekday = weekday_cell.value
if not is_weeknum(weekday):
row_c1 += 1
pprint("Not weeknum!")
if weekcycles > 0:
if (weeknum != 2):
pprint("Weeknum now 2")
weekday = 0
weeknum = 2
continue
pprint(weekday)
weekday_key_name = weekday + ("_1" if weeknum == 1 else "_2")
self.push_weekday_meta(weekday, weeknum, weekday_key_name, weekday_mr)
# state
event_no = 1
is_widely = False
override_col_range = None
all_raw = set()
pairs = set()
times = []
first_coord = None
row_c2 = row_c1
while row_c2 <= weekday_mr.high.row:
pos_c2 = Coord(row_c2, group['position'][1]) # текущая позиция (внутри группы, внутри дня недели), верхний левый угол (=low)
cell_c2 = pos_c2.cell(self.reader)
mr_c2 = self.reader.get_merged_coord(pos_c2)
is_empty_lesson = len(utils.parse_all_dirt(self.reader, pos, 4, 3)) == 0 # если в поле не найдено ничего.. if first_coord is None:
parsed_discipline_name = None first_coord = pos_c2.row
parsed_location = None
parsed_leader = None
pairs = 1
wtf_tomanypairs = False
is_solid = pos_right in merged
parsed_uncotigorized = []
is_wide_maybe_potokoviy = merged.width() > 4 # потоковая ли лекция (занимает несколько групп.)
if not is_empty_lesson:
cur = pos.shift(down=2)
while utils.has_no_bottom_border(self.reader, cur):
next += 3
pairs += 1
pprint(f"next = {next} cur={cur}")
if pairs >= 7:
wtf_tomanypairs = True
break
cur = cur.shift(down=3)
if is_wide_maybe_potokoviy:
ret = self.parse_potokoviy(merged)
parsed_location = ret['loc']
parsed_leader = ret['leader']
parsed_discipline_name = ret['name']
parsed_uncotigorized = list(utils.parse_all_dirt(self.reader, merged.low, merged.width(), next))
else:
if (is_solid):
parsed_discipline_name = cv
parsed_uncotigorized = list(utils.parse_all_dirt(self.reader, merged.low, 4, next))
# попытка исправить пару (1-2) если пустая.
fuck_empty_pair_in_excel = pair == ""
previous_dump = previous_pair
if fuck_empty_pair_in_excel:
if previous_pair is None or previous_pair == "":
pair = f"EMPTY_IN_EXCEL_{uuid.uuid4()}"
else:
pair = utils.next_element(PAIR_NUMS, previous_pair)
if pair != "": pair_num = None
previous_pair = pair if next == 3 else None # костыль чтобы избежать гипотетически не верной даты. pair_num_mr = None
if pair_pos_col is not None:
pair_num = pos_c2.replace(col=pair_pos_col)
pair_num_mr = self.reader.get_merged_coord(pair_num)
if (not is_widely) and (mr_c2.low.col < group_header_pos.col or mr_c2.high.col > group_header_pos.col + width - 1):
is_widely = True
override_col_range = (mr_c2.low.col, mr_c2.high.col)
col_low = group_header_pos.col
col_high = group_header_pos.col + width - 1
if override_col_range is not None:
col_low = min(col_low, override_col_range[0])
col_high = max(col_high, override_col_range[1])
# пытаемся из некотегорезированных данных выцепить место и лидера (препода) dirty_line = utils.parse_all_dirt(self.reader, Coord(row_c2, col_low), (col_high - col_low + 1), 1, with_cells=True)
prepods = set() if len(dirty_line) > 0:
if parsed_leader is not None: prepods.add(parsed_leader.strip()) if pair_num_mr is not None:
pair_num_to_add = pair_num_mr.cell(self.reader).value.replace(" ", "").strip()
locations = set() if len(pair_num_to_add) == 0:
if parsed_location is not None: locations.add(parsed_location.strip().replace(" ", "")) pair_num_to_add = "???"
pprint("Составители эксельки? Вы почему не указали номер пары ёклмн")
for x in list(parsed_uncotigorized): pairs.add(pair_num_to_add)
if aigenerated.is_surname_string(x):
prepods.add(x.strip())
if aigenerated.is_room_number(x):
locations.add(x.strip().replace(" ", "") if x is not None else None)
# попытка починить пустую дисциплину
if parsed_discipline_name is None:
l = sorted(utils.remove_from_list(list(parsed_uncotigorized), list(locations | prepods | set([parsed_location, parsed_leader]))))
parsed_discipline_name = " ".join(l)
# чистим сеты от мусора
utils.discards_list(prepods, nones=True, emptystrings=True)
utils.discards_list(locations, nones=True, emptystrings=True)
utils.discards_list(parsed_uncotigorized, nones=True, emptystrings=True)
# если не пустой предмет то записываем его
if not is_empty_lesson:
slots = group['slots']
w = weekday + ("_1" if weeknum == 1 else "_2")
if w not in slots.keys():
slots[w] = {}
today = slots[w] for cell in dirty_line:
today[pair] = { if not cell.is_time:
"excel_pos": str(pos), all_raw.add(str(cell.value))
"discipline_name": parsed_discipline_name.strip(), else:
"locations": sorted(locations), dt: time = cell.value
"leads": sorted(prepods), times.append(str(dt))
"is_solid": is_solid,
"time_coeff": pairs,
"is_flow": is_wide_maybe_potokoviy,
"lefttopmerged": {
"width": merged.width(),
"height": merged.height(),
"excel_range": utils.merged_humanize(merged.as_numbers())
},
"raw": sorted(parsed_uncotigorized),
"weekday": utils.weekday_to_num(weekday),
"weeknum": weeknum
}
if fuck_empty_pair_in_excel:
today[pair]['pair_num_empty'] = {
"prev": previous_dump,
"restored": pair != "",
"pair": pair
}
if wtf_tomanypairs:
today[pair]['to_many_parsing_time_coeff'] = True
# INCREMENT на next и конец цикла. def clean_state():
row += next nonlocal is_widely, override_col_range, event_no, all_raw, pairs, times, first_coord
is_widely = False
override_col_range = None
event_no += 1
all_raw = set()
pairs = set()
first_coord = None
times = []
if not utils.has_no_bottom_border(self.reader, pos_c2) and not(mr_c2.high.row - row_c2 > 0):
if not (len(all_raw) == 0):
# this code last for current state event
pprint(f"{event_no} {pairs}: {'[wide] ' if is_widely else ''} raw={all_raw}")
slots = group['slots']
w = weekday_key_name
if w not in slots.keys():
slots[w] = {}
pair_name = "????"
try:
pair_name = sorted(pairs)[0]
except: pass
obj = {
"object": "event",
"pairs": sorted(pairs),
"is_flow": is_widely,
"excel_range": utils.merged_humanize((first_coord, col_low, row_c2, col_high)),
"raw": sorted(all_raw),
"weekday": utils.weekday_to_num(weekday),
"weeknum": weeknum
}
if len(times) > 0:
obj['times'] = times
def smart_insert(first_dict, key, to_insert):
if key not in first_dict.keys():
first_dict[key] = {}
if isinstance(first_dict[key], dict):
if len(first_dict[key].keys()) == 0:
first_dict[key] = to_insert
else:
p = first_dict[key]
first_dict[key] = [p, to_insert]
elif isinstance(first_dict[key], list):
first_dict[key].append(to_insert)
else:
self.parser_warnings.append("Wtf? first_dict[key] not is dict and not is list??? (internal error)")
if pair_pos_col is None:
smart_insert(slots, w, obj)
else:
smart_insert(slots[w], pair_name, obj)
# here may be a empty all_raw
clean_state()
first_coord = None
if row_c2 >= weekday_mr.high.row:
clean_state()
pprint("Last for weekday")
def parse_groups(reader: "ExcelSheetReader", head, monday: Coord, head_rx): row_c2 += 1
row_c1 += weekday_mr.height()
weekcycles += 1
def parse_groups(reader: "ExcelSheetReader", head, col_start, head_rx):
"""Распознать список групп и метаданные к ним, по сути получить список названий группы и координат её верхнего header-а (AQ6:AT6)""" """Распознать список групп и метаданные к ним, по сути получить список названий группы и координат её верхнего header-а (AQ6:AT6)"""
groups = {} groups = {}
i = 0 i = 0
@@ -227,21 +397,26 @@ def parse_groups(reader: "ExcelSheetReader", head, monday: Coord, head_rx):
x = head[i] x = head[i]
pprint(f"while i={i} head[i]={x}") pprint(f"while i={i} head[i]={x}")
merged = reader.get_merged_coord(Coord(head_rx, i)) merged = reader.get_merged_coord(Coord(head_rx, i))
if i > monday.col + 1: if i >= col_start:
if merged is None or x == "": if merged is None or x == "" or x is None:
break
if merged.width() != 4:
pprint(f"WARNING: group header witdh !=4 (found: {merged.width()}); blocks !=4 not supported by parser.")
break break
name = utils.unspace(x) name = utils.unspace(x)
groups[name] = { skip = False
"name": name, if "-" not in name:
"position": [head_rx, i], for x in BAD_GROUP_NAMES:
"position_human": utils.merged_humanize(merged.as_numbers()), if x in name.lower():
"slots": {} skip = True
} pprint(f"Skip groupname {name} because not dash in name and in blacklist")
if not skip:
groups[name.lower()] = {
"name": name,
"position": [head_rx, i],
"width": merged.width(),
"position_human": utils.merged_humanize(merged.as_numbers()),
"slots": {}
}
if merged is None: if merged is None:
i += 1 i += 1

6
requirements.txt Normal file
View File

@@ -0,0 +1,6 @@
openpyxl
xlrd
beautifulsoup4
requests
pika
python-dotenv

View File

@@ -1,6 +1,7 @@
# --- Абстрактный базовый класс (Контракт) --- # Copyright Stanislav Mironov
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from datetime import time
import openpyxl import openpyxl
import xlrd import xlrd
@@ -10,13 +11,23 @@ from coord import Coord, Merged
EMPTY_CTYPES = [xlrd.XL_CELL_EMPTY, xlrd.XL_CELL_BLANK] EMPTY_CTYPES = [xlrd.XL_CELL_EMPTY, xlrd.XL_CELL_BLANK]
class TranschendentnostCell: class TranschendentnostCell:
def __init__(self, value, is_empty): def __init__(self, value, is_empty, is_time=False):
self.value = value self.value = value
self.is_time = isinstance(value, time) or is_time
self._is_empty = is_empty self._is_empty = is_empty
def is_nospace_nocase_same(self, query):
try:
if self.value.lower().replace(" ", "").strip() == query.lower().replace(" ", "").strip():
return True
except: pass
return False
def is_empty(self): def is_empty(self):
return self._is_empty return self._is_empty
# --- Абстрактный базовый класс (Контракт) ---
class ExcelSheetReader(ABC): class ExcelSheetReader(ABC):
""" """
Абстрактный базовый класс, определяющий интерфейс для чтения данных из листа Excel. Абстрактный базовый класс, определяющий интерфейс для чтения данных из листа Excel.
@@ -28,6 +39,10 @@ class ExcelSheetReader(ABC):
@abstractmethod @abstractmethod
def get_sheet_index(self): def get_sheet_index(self):
pass pass
@abstractmethod
def get_sheet_name(self):
pass
@abstractmethod @abstractmethod
def has_next_sheet(self): def has_next_sheet(self):
@@ -71,16 +86,28 @@ class ExcelSheetReader(ABC):
return "TODO: info" return "TODO: info"
@abstractmethod @abstractmethod
def cell(self, row, col): def cell(self, row, col) -> TranschendentnostCell:
"""Возвращает абстрактную клетку""" """Возвращает абстрактную клетку"""
pass pass
def find(self, query = None): def find(self, query = None, startswith=False, nospace=False):
return self.find_any([query], startswith=startswith, nospace=nospace)
def find_any(self, query = None, startswith=False, nospace=False):
for rx in range(self.get_row_count()): for rx in range(self.get_row_count()):
i = 0 i = 0
for x in self.get_row_values(rx): for x in self.get_row_values(rx):
if x == query: if nospace:
return Coord(rx, i) x = str(x).replace(" ", "").strip()
for query_selected in query:
if x == query_selected:
return Coord(rx, i)
elif startswith:
try:
if str(x).lower().startswith(query_selected.lower()):
return Coord(rx, i)
except: pass
i += 1 i += 1
return None return None
@@ -104,7 +131,6 @@ class ExcelSheetReader(ABC):
# --- Реализация №1: Обертка для xlrd --- # --- Реализация №1: Обертка для xlrd ---
class XlrdSheetReader(ExcelSheetReader): class XlrdSheetReader(ExcelSheetReader):
def __init__(self, file_path, sheet_index=0): def __init__(self, file_path, sheet_index=0):
super().__init__(file_path) super().__init__(file_path)
@@ -117,6 +143,9 @@ class XlrdSheetReader(ExcelSheetReader):
def init_sheet(self): def init_sheet(self):
self.sheet = self.book.sheet_by_index(self.sheet_index) self.sheet = self.book.sheet_by_index(self.sheet_index)
def get_sheet_name(self):
return self.sheet.name
def has_next_sheet(self): def has_next_sheet(self):
return self.sheet_index < len(self.book.sheet_names())-1 return self.sheet_index < len(self.book.sheet_names())-1
@@ -140,7 +169,24 @@ class XlrdSheetReader(ExcelSheetReader):
def cell(self, row, col): def cell(self, row, col):
"""Возвращает абстрактную клетку""" """Возвращает абстрактную клетку"""
c = self.sheet.cell(row, col) c = self.sheet.cell(row, col)
return TranschendentnostCell(c.value, c.ctype in EMPTY_CTYPES) is_empty = c.ctype in EMPTY_CTYPES
is_time = c.ctype == xlrd.XL_CELL_DATE
value = c.value
if is_empty:
value = ""
elif is_time:
if isinstance(value, float):
if value <= 1:
seconds = round(value * 86400)
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
value = time(hour=hours, second=seconds, minute=minutes)
else:
print(f"TODO: value is {value} its unix? not 0.xxxxxxxx")
else:
is_time = False
print("IsTime but not float!")
return TranschendentnostCell(value, is_empty, is_time=is_time)
def get_border_style(self, coord: Coord, side): def get_border_style(self, coord: Coord, side):
row = coord.row row = coord.row
@@ -173,7 +219,6 @@ class XlrdSheetReader(ExcelSheetReader):
# --- Реализация №2: Обертка-транслятор для openpyxl --- # --- Реализация №2: Обертка-транслятор для openpyxl ---
class OpenpyxlSheetReader(ExcelSheetReader): class OpenpyxlSheetReader(ExcelSheetReader):
def __init__(self, file_path, sheet_name=None): def __init__(self, file_path, sheet_name=None):
super().__init__(file_path) super().__init__(file_path)
@@ -192,6 +237,9 @@ class OpenpyxlSheetReader(ExcelSheetReader):
def get_sheet_index(self): def get_sheet_index(self):
return self.sheet_index return self.sheet_index
def get_sheet_name(self):
return self.workbook.sheetnames[self.sheet_index]
def has_next_sheet(self): def has_next_sheet(self):
return self.sheet_index < len(self.workbook.sheetnames)-1 return self.sheet_index < len(self.workbook.sheetnames)-1
@@ -221,7 +269,7 @@ class OpenpyxlSheetReader(ExcelSheetReader):
c = self._get_cell(row, col) c = self._get_cell(row, col)
is_empty = (c.value is None) is_empty = (c.value is None)
return TranschendentnostCell("" if is_empty else c.value, is_empty) return TranschendentnostCell("" if is_empty else c.value, is_empty, is_time=isinstance(c.value, time))
def get_cell_value(self, row, col): def get_cell_value(self, row, col):
cell = self._get_cell(row, col) cell = self._get_cell(row, col)
@@ -260,8 +308,7 @@ class OpenpyxlSheetReader(ExcelSheetReader):
return [] return []
# --- Фабричная функция (Ваша единственная точка входа) --- # --- Фабричная функция (единственная точка входа) ---
def create_reader(file_path, **kwargs) -> ExcelSheetReader: def create_reader(file_path, **kwargs) -> ExcelSheetReader:
""" """
Создает и возвращает подходящий экземпляр ридера в зависимости от расширения файла. Создает и возвращает подходящий экземпляр ридера в зависимости от расширения файла.

142
utils.py
View File

@@ -1,12 +1,82 @@
# Copyright Stanislav Mironov # Copyright Stanislav Mironov
import time import time
import xlrd import xlrd
from coord import Coord, Merged from coord import Coord
from translations import ExcelSheetReader from translations import ExcelSheetReader
import re
import hashlib
import requests
from urllib.parse import urlsplit, urlunsplit, quote
def download_file_from_url(url, output_filename):
"""
Скачивает файл по URL со спецсимволами и пробелами, сохраняя его под указанным именем.
Args:
url (str): Исходный URL, который может содержать пробелы и кириллицу.
output_filename (str): Имя файла для сохранения (например, 'calc.xls').
"""
try:
# --- Шаг 1: Правильное кодирование URL ---
# Разбираем URL на части: ('https', 'www.vstu.ru', '/path/to file.xls', '', '')
parts = urlsplit(url)
# Кодируем только путь, оставляя слэши '/' безопасными
# Это превратит ' ' в '%20', 'В' в '%D0%92' и т.д.
encoded_path = quote(parts.path, safe='/-_')
# Собираем URL обратно из частей с уже закодированным путем
encoded_url = urlunsplit((parts.scheme, parts.netloc, encoded_path, parts.query, parts.fragment))
# --- Шаг 2: Скачивание файла ---
response = requests.get(encoded_url, stream=True)
# Проверяем, успешен ли запрос (код 200 OK)
# Если сервер вернет ошибку (404, 500 и т.д.), здесь возникнет исключение
response.raise_for_status()
# --- Шаг 3: Сохранение файла ---
# Открываем файл для записи в бинарном режиме ('wb')
# Использование 'with' гарантирует, что файл будет закрыт автоматически
with open(output_filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"✅ Файл успешно скачан и сохранен как '{output_filename}'")
except requests.exceptions.RequestException as e:
print(f"❌ Ошибка скачивания: {e}")
except Exception as e:
print(f"❌ Произошла непредвиденная ошибка: {e}")
def calculate_sha1(filepath):
"""
Calculates the SHA1 hash of a given file.
Args:
filepath (str): The path to the file.
Returns:
str: The hexadecimal representation of the SHA1 hash, or None if the file is not found.
"""
sha1_hash = hashlib.sha1()
try:
with open(filepath, "rb") as f:
# Read the file in chunks to handle large files efficiently
for chunk in iter(lambda: f.read(4096), b""):
sha1_hash.update(chunk)
return sha1_hash.hexdigest()
except FileNotFoundError:
print(f"Error: File not found at {filepath}")
return None
except Exception as e:
print(f"An error occurred: {e}")
return None
class StepTimeCounter: class StepTimeCounter:
def __init__(self): def __init__(self):
@@ -53,7 +123,7 @@ def remove_from_list(l: list, todel: list):
return l return l
def parse_all_dirt(reader: "ExcelSheetReader", min_pos, right, down): def parse_all_dirt(reader: "ExcelSheetReader", min_pos: Coord, right, down, with_cells=False):
RET = set() RET = set()
row = min_pos.row row = min_pos.row
@@ -61,67 +131,14 @@ def parse_all_dirt(reader: "ExcelSheetReader", min_pos, right, down):
col = min_pos.col col = min_pos.col
while col < min_pos.col + right: while col < min_pos.col + right:
#print(excel_coordinate(row, col)) #print(excel_coordinate(row, col))
cv = reader.get_cell_value(row, col) cv = reader.cell(row, col)
value = str(cv).strip() if cv is not None and not cv.is_empty():
if cv is not None and len(value) > 0: RET.add(cv if with_cells else str(cv.value))
RET.add(value)
col += 1 col += 1
row += 1 row += 1
return RET return RET
# GEMINI GENERATED
def normalize_name(raw_name):
"""
Приводит разнородные записи ФИО к единому структурированному виду.
"""
# Шаг 1: Очистка
name = re.sub(r'\s+', ' ', raw_name).strip()
# Шаг 2: Извлечение звания
known_titles = ['проф.', 'доц.', 'акад.', 'к.т.н.', 'д.м.н.']
title = None
for t in known_titles:
if name.lower().startswith(t):
title = t
# Удаляем звание из строки, убираем лишние пробелы
name = name[len(t):].strip()
break
# Шаг 3 и 4: Разделение и идентификация
parts = name.split(' ')
last_name = None
initials = None
# Простой эвристический анализ
# Ищем инициалы (содержат точку или состоят из 1-2 заглавных букв)
initials_parts = []
name_parts = []
for part in parts:
if '.' in part or (1 <= len(part) <= 2 and part.isupper()):
initials_parts.append(part)
else:
# Считаем все остальное частью фамилии (для двойных фамилий)
name_parts.append(part)
if name_parts:
last_name = " ".join(name_parts)
if initials_parts:
initials = "".join(initials_parts) # Сливаем "А." и "Н." в "А.Н."
# Если фамилия не найдена (например, только инициалы),
# но есть части, считаем первую часть фамилией
if not last_name and name_parts:
last_name = name_parts[0]
return {
"last_name": last_name,
"initials": initials,
"title": title
}
def excel_coordinate(row, col): def excel_coordinate(row, col):
""" """
Преобразует координаты строки и столбца (начиная с 0) в эквивалент Excel (например, A7, CB34). Преобразует координаты строки и столбца (начиная с 0) в эквивалент Excel (например, A7, CB34).
@@ -165,7 +182,7 @@ def find(sh, query = None):
return None return None
def weekday_to_num(st: str): def weekday_to_num(st: str):
if st.upper().strip() == "ПОНЕДЕЛЬНИК": if st.upper().strip().startswith("ПОНЕД"):
return 1 return 1
if st.upper().strip() == "ВТОРНИК": if st.upper().strip() == "ВТОРНИК":
return 2 return 2
@@ -177,8 +194,9 @@ def weekday_to_num(st: str):
return 5 return 5
if st.upper().strip() == "СУББОТА": if st.upper().strip() == "СУББОТА":
return 6 return 6
if st.upper().strip() == "ВОСКРЕСЕНЬЕ": if st.upper().strip().startswith("ВОСКР"):
return 7 return 7
print(f"Unknown weekday num for str: {st}; returnted -1")
return -1 return -1