economy_resources #1

Merged
fazziclay merged 14 commits from economy_resources into main 2026-03-28 22:22:55 +03:00
9 changed files with 812 additions and 601 deletions

4
.gitignore vendored
View File

@@ -4,3 +4,7 @@ __pycache__
.idea
result*.json
groups.json
diffable_dates.txt
parsed/
parser.json
.env

6
Dockerfile Normal file
View File

@@ -0,0 +1,6 @@
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "-u", "main.py"]

View File

@@ -1,201 +0,0 @@
# Copyright GEMINI
import re
# --- Ресурсы для алгоритма ---
STOP_WORDS = {'пр.', 'лек.', 'лаб.', 'семинар'}
TITLES = ('доц.', 'проф.', 'асс.', 'ст.пр.')
SURNAME_ENDINGS = ('ов', 'ев', 'ин', 'ский', 'цкой', 'их', 'ых', 'ова', 'ева', 'ина', 'ская', 'ян', 'ко', "ня", "ин")
def is_surname_string(s: str) -> bool:
"""
Классифицирует строку, определяя, содержит ли она фамилию.
"""
if not isinstance(s, str) or not s:
return False
# Шаг 1: Очистка
s_clean = s.strip()
# Шаг 2: Жесткие правила "НЕ ФАМИЛИЯ"
if s_clean.lower() in STOP_WORDS:
return False
if s_clean.isupper() and len(s_clean) > 3:
return False
if re.search(r'[\(\)/«»%]', s_clean):
return False
words = s_clean.split()
if len(words) > 3:
return False
# Шаг 3: Жесткие правила "ТОЧНО ФАМИЛИЯ"
if re.search(r'\b[А-Я]\.?', s_clean): # Ищет "И.А." или "И.А"
return True
if s_clean.lower().startswith(TITLES):
return True
# Шаг 4: Эвристический анализ для оставшихся случаев
score = 0
# Правило на капитализацию
if words and words[0][0].isupper():
score += 5
else:
# Если слово не с большой буквы, это почти точно не фамилия
return False
# Правило на окончания
last_word = words[-1].lower()
if last_word.endswith(SURNAME_ENDINGS):
score += 6
# Правило на количество слов
if len(words) in [1, 2]:
score += 2
# Пороговое значение
THRESHOLD = 8
return score >= THRESHOLD
def extract_last_name(name_str: str) -> str or None:
"""
Извлекает из строки только фамилию.
Справляется с приклеенными званиями (типа "ст.пр.Дмитриев") и отбрасывает инициалы.
Ищет первое слово, написанное с заглавной буквы.
Args:
name_str: Исходная "грязная" строка с именем.
Returns:
Чистая фамилия в виде строки, или None, если фамилия не найдена.
"""
# Проверка, что на вход подана строка
if not isinstance(name_str, str):
return None
# Паттерн для поиска:
# [А-ЯЁ] - одна заглавная русская буква в начале
# [а-яё]+ - одна или более строчных русских букв после неё
# (?:-[А-ЯЁ][а-яё]+)? - опциональная часть для двойных фамилий (например, -Петров)
pattern = r'[А-ЯЁ][а-яё]+(?:-[А-ЯЁ][а-яё]+)?'
match = re.search(pattern, name_str)
if match:
return match.group(0) # Возвращаем найденное совпадение
else:
return None # Если ничего не найдено
# --- Шаг 0: Константы ---
POSITIVE_KEYWORDS = ['зал', 'ауд', 'каб', 'корп', 'кор.', "ЛК"]
NEGATIVE_KEYWORDS = ['доц', 'проф', 'асс', 'лек', 'пр']
# Транслитерация для унификации
TRANS_TABLE = str.maketrans('TCBAHIMK', 'ТКВАНІМК') # Латиница -> Кириллица
def is_room_number(s: str) -> bool:
"""
Проверяет, является ли строка номером кабинета, по многоуровневому алгоритму.
"""
# --- Шаг 1: Быстрые негативные фильтры ---
if not isinstance(s, str) or not s.strip():
return False # 1. Проверка на пустоту
if ',' in s:
return False # 2. Проверка на запятые (даты)
# 3. Проверка на "очевидный мусор"
first_word = s.strip().lower().split()[0]
if first_word in NEGATIVE_KEYWORDS:
return False
# 4. Проверка на инициалы (напр. А.Е.)
if re.search(r'\b[А-Я]\.[А-Я]\.?\b', s):
return False
# 5. Проверка на длинное слово без цифр
if not any(char.isdigit() for char in s) and len(s.split()) == 1 and len(s) > 10:
return False
# --- Шаг 2: Быстрые позитивные фильтры ---
s_lower = s.lower()
if any(keyword in s_lower for keyword in POSITIVE_KEYWORDS):
return True # 1. Проверка по ключевым словам
# --- Шаг 3: Основной анализ ---
if not any(char.isdigit() for char in s):
return False # 1. Требуется наличие цифр
# 2. Создание "чистой" версии
clean_s = s.upper().translate(TRANS_TABLE).replace(' ', '')
# 3. Комплексный паттерн для проверки
# Пояснение:
# ^...$ - шаблон должен соответствовать всей строке
# [А-ЯЁ]?-? - необязательная буква и необязательный дефис в начале
# \d+ - одна или более цифр (ядро номера)
# (?:[.-]\d+)* - необязательные группы ".число" или "-число"
# [А-ЯЁ]?$ - необязательная буква в конце
room_pattern = re.compile(r'^[А-ЯЁ]?-?\d+(?:[.-]\d+)*[А-ЯЁ]?$')
if room_pattern.fullmatch(clean_s):
return True
# --- Шаг 4: Финальное решение ---
return False
import requests
from urllib.parse import urlsplit, urlunsplit, quote
def download_file_from_url(url, output_filename):
"""
Скачивает файл по URL со спецсимволами и пробелами, сохраняя его под указанным именем.
Args:
url (str): Исходный URL, который может содержать пробелы и кириллицу.
output_filename (str): Имя файла для сохранения (например, 'calc.xls').
"""
try:
# --- Шаг 1: Правильное кодирование URL ---
# Разбираем URL на части: ('https', 'www.vstu.ru', '/path/to file.xls', '', '')
parts = urlsplit(url)
# Кодируем только путь, оставляя слэши '/' безопасными
# Это превратит ' ' в '%20', 'В' в '%D0%92' и т.д.
encoded_path = quote(parts.path, safe='/-_')
# Собираем URL обратно из частей с уже закодированным путем
encoded_url = urlunsplit((parts.scheme, parts.netloc, encoded_path, parts.query, parts.fragment))
# --- Шаг 2: Скачивание файла ---
response = requests.get(encoded_url, stream=True)
# Проверяем, успешен ли запрос (код 200 OK)
# Если сервер вернет ошибку (404, 500 и т.д.), здесь возникнет исключение
response.raise_for_status()
# --- Шаг 3: Сохранение файла ---
# Открываем файл для записи в бинарном режиме ('wb')
# Использование 'with' гарантирует, что файл будет закрыт автоматически
with open(output_filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"✅ Файл успешно скачан и сохранен как '{output_filename}'")
except requests.exceptions.RequestException as e:
print(f"❌ Ошибка скачивания: {e}")
except Exception as e:
print(f"❌ Произошла непредвиденная ошибка: {e}")

View File

@@ -10,6 +10,11 @@ from bs4 import BeautifulSoup
BASE_URL = "https://www.vstu.ru/"
RASP_PREFIX = "https://www.vstu.ru/student/raspisaniya/zanyatiy/index.php?dep="
def sibling_clear_to_date(s: str):
if s is None:
return "!!!Python None!!!"
return s.lower().replace("(последнее изменение:", "").replace(")", "").strip()
# Парсит ссылки на эксель .xls & .xlsx файлы и выдаёт их
def parse_links(facultets):
session = requests.Session()
@@ -27,7 +32,7 @@ def parse_links(facultets):
}
)
EXCEL_LINKS = {}
EXCEL_LINKS = []
for facultet in facultets:
url = RASP_PREFIX + facultet
print("getting...")
@@ -38,19 +43,19 @@ def parse_links(facultets):
# Ищем все теги <a>, у которых атрибут href соответствует нашему паттерну
excel_tags = soup.find_all('a', href=excel_pattern)
excel_links = [tag.get('href') for tag in excel_tags]
for a in excel_tags:
last_changed = sibling_clear_to_date(a.next_sibling)
url = urljoin(BASE_URL, a.get('href'))
disp = a.decode_contents()
record = {
"uniqpath": f"vstu.ru/rasp?dep={facultet}/{disp.strip()}",
"facultet": facultet,
"url": url,
"display_filename": disp,
"last_changed": last_changed
}
print("Found in vstu.ru: ", record)
EXCEL_LINKS.append(record)
# Предположим, вы уже получили excel_links из одного из методов выше
# excel_links = ['../../../upload/raspisanie/z/ОН_ХТФ_1 курс.xlsx', ...]
absolute_links = [urljoin(BASE_URL, relative_link) for relative_link in excel_links]
if facultet not in EXCEL_LINKS.keys():
EXCEL_LINKS[facultet] = set()
for excel_url in absolute_links:
EXCEL_LINKS[facultet].add(excel_url)
print(f"+url {excel_url}")
return EXCEL_LINKS
return sorted(EXCEL_LINKS, key=lambda x: x['url'])

417
main.py
View File

@@ -4,133 +4,93 @@
import json
import pika
import os
import random
import time
import traceback
import uuid
import aigenerated
import parser
import translations
import utils
import json
import links_parser
import shutil
from dotenv import load_dotenv
load_dotenv()
RABBITMQ_URL = os.environ.get("RABBITMQ_URL")
EXCHANGE_NAME = os.environ.get("RABBITMQ_EXCHANGE", "vstu_schedule")
try:
connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))
channel = connection.channel()
channel.exchange_declare(exchange=EXCHANGE_NAME,
exchange_type='topic',
durable=True)
except Exception as e:
print("Failed to connect RabbitMQ")
traceback.print_exception(e)
def currt():
return round(time.time())
FACULTETS = [
FACULTETS = sorted([
"asp", "mag", "fastiv", "fat", "ftkm", "ftpp", "feu", "fevt", "htf", "vkf", "mmf", "fpik"
]
])
DIRNAME = "excels"
PARSED_DIR = "parsed"
DEBUG_ONE_FAC = None #'htf'
result_groups = {}
result = {
"version": 1,
"notice": "ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ: Данные, доступ к API и т.д. предоставляется КАК-ЕСТЬ (AS-IS) без каких либо, явно или не явно подразумеваемых гарантий.\n\nПарсер написал: Миронов Станислав\n\nИсточник данных: https://www.vstu.ru/student/raspisaniya/zanyatiy/index.php",
"actual_at": round(time.time()),
"documentation": "https://fazziclay.com/api/v1/vstu_schedule_parser/scheme.json",
"daypicture": "QwQ",
"daycite": "running on a rope",
"contact": "https://fazziclay.com/",
"university": "VSTU",
"university_site": "https://www.vstu.ru/",
"source": "https://fazziclay.com/api/v1/vstu_schedule_parser/result.json",
"stat": {
"total_parsing_time": -1,
},
"api_notices": {
"updated_at": 1757688552,
"text": "Пожалуйста сохраняйте 'updated_at', это время изменения ЭТОГО текста. Тут возможно будут появлятся важные BREAKING CHANGES и дедлайны к ним.\nПо хорошему если updated_at другой по сравнению с вашем кэшем это сообщение должно отправляться вам в телеграм как уведомление о поедстоящих изменениях\nwarning=True значит 'text' содержит важное а не как щас hint.\n\n ~fazziclay aka Stanislav;",
"warning": False,
"tut-plavayuschaya-struktura": "required only 'updated_at', 'text' and 'warning'"
},
"doubled_groups": [],
"debug": {
"bleu~~": 1
},
"excels": [],
"facultets": FACULTETS,
DEBUG_ONE_FAC = None #'fevt'
DEBUG_NO_SAVE_STATES = False
"emptykey1": "",
"emptykey2": "",
"groups": result_groups,
"emptykey3": "",
"emptykey4": "",
"see_header_at_top_of_this_file": "SEE TOP OF THIS FILE | ОБРАТИТЕ ВНИМАНИЕ НА ВЕРХ ЭТОГО ФАЙЛА"
}
def process_excel_file(facultet, excel_url, counter, timeid):
is_xlsx = excel_url.endswith(".xlsx")
filename = f"{DIRNAME}/" + timeid + f"_[C{counter}]_" + facultet + ".xls" + ("x" if is_xlsx else "")
excel_info = {
"filename": excel_url.split("/")[-1],
"url": excel_url,
"download_place": filename,
"stat": {
"download": -1,
"create_reader": -1,
"parse": -1,
"cycles": 0
},
"group_names_parsed": [],
"facultet": facultet,
"counter": counter
}
parser.LOGGING = False
parser.LOGGING = LOGGING = True
def parse_sheets(download_place):
to_return = {}
try:
t = utils.StepTimeCounter()
aigenerated.download_file_from_url(excel_url, filename)
excel_info["stat"]['download'] = t.step()
reader = translations.create_reader(filename)
reader = translations.create_reader(download_place)
print("Reader info")
print(reader.info())
excel_info["stat"]['create_reader'] = t.step()
while True:
excel_info['stat']['cycles'] += 1
t = utils.StepTimeCounter()
print(f"Parsing sheet №{reader.get_sheet_index()+1} (from 1)")
sheet_dict = {
"index": reader.get_sheet_index(),
"name": reader.get_sheet_name(),
"reader_info": reader.info(),
"groups": {}
}
to_return["SHEET_"+str(reader.get_sheet_index())] = sheet_dict
prs = parser.Parser(reader)
print("Parser created; parser.parse();")
prs.parse()
print("parsed done!")
sheet_dict['parse_time'] = round(t.step())
if len(prs.raw_no_schedule) > 0:
sheet_dict["other_raws"] = prs.raw_no_schedule
if len(prs.features) > 0:
sheet_dict["features"] = sorted(prs.features)
if prs.parser_error is not None:
excel_info["parser_error_cycle_" + str(excel_info['stat']['cycles'])] = prs.parser_error
sheet_dict["parser_error"] = prs.parser_error
if prs.parser_warnings is not None and len(prs.parser_warnings) > 0:
sheet_dict["parser_warnings"] = prs.parser_warnings
for group_name in prs.groups.keys():
if group_name in result_groups.keys():
print(f" -- WTF -- Doubled groups -- name: {group_name}")
if 'warning_doubled_groups_skip' not in excel_info.keys():
excel_info['warning_doubled_groups_skip'] = []
excel_info['warning_doubled_groups_skip'].append(group_name)
result['doubled_groups'].append(group_name)
for group_name_key in prs.groups.keys():
gr = prs.groups[group_name_key]
sheet_dict['week_keys_metadata'] = prs.week_keys_metadata
sheet_dict['groups'][group_name_key] = gr
continue
gr = result_groups[group_name] = prs.groups[group_name]
gr['facultet'] = facultet
gr['data_source'] = excel_url.split("/")[-1]
gr['debug'] = {
"counter": counter,
"timeid": timeid,
"excel_url": excel_url,
"reader_info": reader.info(),
"reader_sheet_index": reader.get_sheet_index(),
"filename": filename
}
excel_info["group_names_parsed"].append(group_name)
print(f"Populates {len(prs.groups)} groups to result: " + " ".join(prs.groups.keys()))
print(f"Populates {len(prs.groups)} groups: " + " ".join(prs.groups.keys()))
if not reader.has_next_sheet():
print("File ended")
@@ -138,33 +98,60 @@ def process_excel_file(facultet, excel_url, counter, timeid):
else:
reader.next_sheet()
print("Next sheet!")
excel_info["stat"]['parse'] = t.step()
except Exception as e:
print(f"Error while {excel_url}")
print(e)
traceback.print_exc()
u = uuid.uuid4()
excel_info['error'] = {
to_return['error'] = {
"smile": ":(",
"error_message": str(e),
"log_anchor": str(u),
"time": currt()
}
print(f"Log Anchor: {u}")
faileds.append({
"ex": e,
"fac": facultet,
"url": excel_url
})
result['excels'].append(excel_info)
return to_return
faileds = []
def main():
def parsed_file_path(excel_filename: str):
format = excel_filename.split(".")[-1]
fl = format.lower()
if fl not in ["json", "xls", "xlsx"]:
print(f"Unknown filename format: {excel_filename}")
return
if fl != "json":
excel_filename = excel_filename.replace("." + format, ".json")
excel_filename = excel_filename.lower()
filepath = PARSED_DIR + os.path.sep + excel_filename
return filepath
def load_parsed_state(excel_filename):
filepath = parsed_file_path(excel_filename)
if not os.path.exists(filepath):
return
with open(filepath, "r", encoding="utf-8") as fp:
return json.load(fp=fp)
def save_parsed_state(excel_filename, obj):
filepath = parsed_file_path(excel_filename)
if DEBUG_NO_SAVE_STATES:
print("Saved! (fake because DEBUG_NO_SAVE_STATES)")
with open(filepath, "w", encoding="utf-8") as fp:
json.dump(obj, fp=fp, ensure_ascii=False, sort_keys=True)
print(f"Saved parsed state to '{filepath}'")
def run_session():
faileds = []
t = utils.StepTimeCounter()
# Delete tempdir
try:
try:
shutil.rmtree(DIRNAME)
@@ -177,42 +164,206 @@ def main():
print(f"Failed create '{DIRNAME}': ")
raise e
print("main(); parse links starting...")
EXCEL_LINKS = links_parser.parse_links(FACULTETS if DEBUG_ONE_FAC is None else [DEBUG_ONE_FAC])
counter = 0
timeid = str(round(time.time()))
for facultet in EXCEL_LINKS.keys():
counter += 1000
print(f"\n\n-- Факультет '{facultet}' --")
facultet_urls = EXCEL_LINKS[facultet]
for excel_url in facultet_urls:
counter += 1
print(f"\n\n-- Ссылка --")
print(f"{excel_url}")
if len(EXCEL_LINKS) < 5 and not DEBUG_ONE_FAC:
raise Exception("Safety exception: excel links count < 5; maybe in vstu.ru tech works")
last_changeds = set()
states = []
changed = False
for excel_dict in EXCEL_LINKS:
try:
last_changeds.add(excel_dict['last_changed'])
excel_url = excel_dict['url']
facultet = excel_dict['facultet']
excel_filename = excel_url.split("/")[-1]
excel_dict['json_represent'] = parsed_file_path(excel_filename).split(os.path.sep)[-1]
print(f"Processing {facultet} {excel_filename}")
state = load_parsed_state(excel_filename)
is_new = state is None
if is_new:
state = {}
print("Start processing excel file")
process_excel_file(facultet, excel_url, counter, timeid)
print("Excel file processing done!")
else:
same_date = False
try:
same_date = state['excel']['last_changed'] == excel_dict['last_changed']
print(f"Excel[{excel_filename}]: inServer={excel_dict['last_changed']}, inState={state['excel']['last_changed']} same={same_date}")
except Exception as e:
print(f"Excel[{excel_filename}]: failed testify last_changed")
r = "parser.excel_found." + ("same" if same_date else "different") + "." + facultet
channel.basic_publish(
exchange=EXCHANGE_NAME,
routing_key=r,
properties=pika.BasicProperties(
content_type="application/json",
delivery_mode=2
),
body=json.dumps({
"type": "excel_file_found",
"same": same_date,
"excel_dict": excel_dict
}, ensure_ascii=False).encode('utf-8')
)
print(f"RabbitMQ published r={r}")
if same_date:
state['actual_at'] = currt()
try:
del state['excel']['different_in_this_session']
except: pass
states.append(state)
save_parsed_state(excel_filename, state)
continue
changed = True
excel_dict['different_in_this_session'] = True
state['actual_at'] = currt()
state['excel'] = excel_dict
is_xlsx = excel_url.endswith(".xlsx")
download_place = f"{DIRNAME}/" + excel_filename + "_" + facultet + ".xls" + ("x" if is_xlsx else "")
utils.download_file_from_url(excel_url, download_place)
sha1hash = utils.calculate_sha1(download_place)
state['excel']['sha1hash'] = sha1hash
state['sheets'] = parse_sheets(download_place)
channel.basic_publish(
exchange=EXCHANGE_NAME,
routing_key="parser.excel_parsed." + facultet,
properties=pika.BasicProperties(
content_type="application/json",
delivery_mode=2
),
body=json.dumps({
"type": "excel_file_parsed",
"is_new": is_new,
"state": state
}, ensure_ascii=False).encode('utf-8')
)
save_parsed_state(excel_filename, state)
states.append(state)
except Exception as e:
faileds.append({
"uuid": str(uuid.uuid4()),
"exception": str(e),
"traceback": traceback.format_exception(e),
"context": f"Failed process excel file {excel_dict['url']}"
})
traceback.print_exception(e)
print("Saving result.json")
result['stat']['total_parsing_time'] = t.step()
with open("parser.json", 'w', encoding="utf-8") as fp:
lc = {"*_x": ":("}
try:
s = sorted(last_changeds)
lc = {
"early": s[0],
"newly": s[-1]
}
except: pass
json.dump({
"last_changeds": lc,
"actual_at": currt(),
"all_files": EXCEL_LINKS,
"faileds": faileds
}, fp=fp, ensure_ascii=False)
if changed:
all_files = states
d = {
"version": 2,
"notice": "ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ: ПРЕДОСТАВЛЯЕТСЯ КАК-ЕСТЬ (AS-IS) БЕЗ КАКИХ ЛИБО ГАРАНТИЙ",
"contact": "https://fazziclay.com/ или fazziclay@gmail.com",
"api_notices": {
"just_save_and_check_diffs": "просто сохраните и проверяйте разницу"
},
"actual_at": currt(),
"all_files": sorted(all_files, key=lambda d: d['excel']['url']),
"faileds": faileds
}
with open("result_v2.json", 'w', encoding="utf-8") as fp:
json.dump(d, fp=fp, ensure_ascii=False)
channel.basic_publish(
exchange=EXCHANGE_NAME,
routing_key="parser.result_v2",
properties=pika.BasicProperties(
content_type="application/json",
delivery_mode=2
),
body=json.dumps({
"type": "schedule_result_v2",
"data": d
}, ensure_ascii=False).encode('utf-8')
)
json.dump(result, open('result.json', 'w'), indent=2, ensure_ascii=False)
print("Saved to result.json indent=2")
json.dump(result, open('result-no-indent.json', 'w'), ensure_ascii=False)
print("Saved to result-no-indent.json")
print("Faileds:")
print(faileds)
# Delete a non-empty directory and its contents
# Delete a non-empty directory and its contents
try:
shutil.rmtree(DIRNAME)
print(f"Directory '{DIRNAME}' and its contents deleted successfully.")
except Exception as e:
print(f"Error deleting directory '{DIRNAME}': {e}")
return {"changed": changed}
def check_dirs():
if not os.path.exists(PARSED_DIR):
os.mkdir(PARSED_DIR)
def main():
while True:
t = utils.StepTimeCounter()
err = None
sess = None
try:
check_dirs()
print("BEGIN run_session();")
sess = run_session()
print("END run_session();")
if DEBUG_ONE_FAC:
print("DEBUG_ONE_FAC; break infinity-loop")
break
except Exception as e:
err = e
print("Exception in run_session();")
traceback.print_exception(e)
channel.basic_publish(
exchange=EXCHANGE_NAME,
routing_key="parser.session_end." + ('complete' if err is None else 'failed'),
properties=pika.BasicProperties(
content_type="application/json",
delivery_mode=2
),
body=json.dumps({
"type": "session_end",
"err": str(err) if err else None,
"duration": t.step(),
"session": sess
}, ensure_ascii=False).encode('utf-8')
)
print("Sleep for 30 minutes")
time.sleep(60*30)
print("Wake up!")
if __name__ == "__main__":

531
parser.py
View File

@@ -3,223 +3,393 @@
PAIR_NUMS = [
"1-2", "3-4", "5-6", "7-8", "9-10", "11-12", "13-14", "15-16"
]
WEEKDAYS_STARTSWITH = [
"понед",
"вторник",
"среда",
"четверг",
"пятница",
"суббота",
"воскр"
]
BAD_GROUP_NAMES = [
"янв", "февр", "март", "апр", "май", "сент", "окт", "ноя", "дек", "июнь", "июль", "авг"
]
from datetime import time
import json
import uuid
import aigenerated
from coord import Coord, Merged
from translations import ExcelSheetReader
import utils
from collections import defaultdict
LOGGING = True
LOGGING = False
def pprint(*args, **kwargs):
if LOGGING:
print(*args, **kwargs)
def is_weeknum(text):
for wd in WEEKDAYS_STARTSWITH:
if text.strip().replace(" ", "").lower().startswith(wd):
return True
return False
def is_pair(text):
for p in PAIR_NUMS:
if text.strip().replace(" ", "").lower().startswith(p):
return True
return False
class Parser:
def __init__(self, reader: ExcelSheetReader):
self.reader = reader
self.groups = {}
self.teachers = set()
self.places = set()
self.parser_error = None
self.groups = {} # Группы которые удалось распарсить
self.features = set() # фичи данной страницы
self.week_keys_metadata = {} # календарик
self.schedule_range_row = None # [min, max] диапазон col включительно где расписание
self.raw_no_schedule = [] # всё что не schedule_range_row отправляется сюда ('СОГЛАСОВАНО:', etc..)
self.weeknums: defaultdict = defaultdict(set) # no support json! (для week_keys_metadata)
self.parser_error = None # ошибка парсера перед выходом
self.parser_warnings = [] # предупреждения парсера
pprint("Parser created for '{0}'".format(reader.info()))
def parse(self):
monday = self.reader.find("ПОНЕДЕЛЬНИК")
if monday is None:
# Характерные признаки разных сеток
no_pair_numeration = False
col_distance_pair_weekday = None
weekday_firstly_calendar = False
first_weekday = self.reader.find_any(WEEKDAYS_STARTSWITH, startswith=True, nospace=True)
if first_weekday is None:
self.features.add("no_weekdays")
print(" -- Failed parse! -- ")
print("ПОНЕДЕЛЬНИК НЕ НАЙДЕН!")
self.parser_error = "'ПОНЕДЕЛЬНИК' не найден в таблице."
print("дни недели не найдены!")
self.parser_error = f"{WEEKDAYS_STARTSWITH} ни один найден в таблице. Дня недели нет."
self.parse_raw_no_schedule()
return
head_rx = monday.row - 1 # выше первого понидельника
pair_num_any = self.reader.find_any(PAIR_NUMS, nospace=True)
if pair_num_any is None:
no_pair_numeration = True
self.features.add("no_pair_numeration")
self.parser_warnings.append(f"Нет нумерации академических часов {PAIR_NUMS}")
else:
self.features.add("pair_numeration")
col_distance_pair_weekday = pair_num_any.col - first_weekday.col
head_rx = first_weekday.row - 1 # выше первого понидельника
group_col_start = first_weekday.col + 2
if col_distance_pair_weekday is not None:
if col_distance_pair_weekday > 1:
weekday_firstly_calendar = True
self.features.add("weekdays_before_calendar")
group_col_start = pair_num_any.col + 1
if head_rx < 0:
raise Exception("head_rx < 0: Программа пыталась найти 'ПОНЕДЕЛЬНИК', но по всей видимости не нашла.")
raise Exception("head_rx < 0: Программа пыталась найти день недели, но по всей видимости не нашла.")
head = self.reader.get_row_values(head_rx) # get all ROW (months, groups)
pprint(f"head={head}")
self.groups = parse_groups(self.reader, head, monday, head_rx) # parse groups to self.groups
head_joined = " ||| ".join([v for v in head if isinstance(v, str) and v.strip()])
print(head_joined)
if (len(head_joined) == 0) or "1 неделя" in head_joined or "1 НЕДЕЛЯ" in head_joined or "2 неделя" in head_joined or "2 НЕДЕЛЯ" in head_joined or "ИЗМЕНЕНИЯ" in head_joined or "изменения" in head_joined or "vtf-vstu.ru" in head_joined:
head_rx -= 1
self.raw_no_schedule.append(head_joined)
head = self.reader.get_row_values(head_rx) # get all ROW (months, groups)
pprint(f"head (upper)={head}")
self.features.add("post_groups_info_row")
self.groups = parse_groups(self.reader, head, group_col_start, head_rx) # parse groups to self.groups
pprint(f'self.groups={json.dumps(self.groups, indent=2, ensure_ascii=False)}')
pprint("\n\n\n")
for group in self.groups.values():
pprint("\nSTART OF PROCESS GROUP\n")
self.process_group(group, monday)
self.process_group(group, first_weekday, pair_num_any.col if pair_num_any else None)
pprint("\nEND OF PROCESS GROUP\n")
pprint(self.teachers)
# week metadatas parse
S = 9999999
group_min_col = S
group_min_row = S
for x in self.groups.values():
p = x['position']
group_min_row = min(p[0], group_min_row)
group_min_col = min(p[1], group_min_col)
if group_min_row != S and group_min_col != S:
pprint("Process weekmetadatas!")
self.process_weekmetadatas(Coord(row=group_min_row, col=group_min_col))
# parse no-schedule raws (согласовано, и т.д.)
self.parse_raw_no_schedule()
def parse_potokoviy(self, merged: Merged):
speaker = None
location = None
def parse_raw_no_schedule(self):
"""Распарсить всё за пределами self.schedule_range_row в self.raw_no_schedule"""
if self.schedule_range_row is None:
self.schedule_range_row = [999999999, 999999999] # прекрасное далёко
row = 0
while row < self.reader.get_row_count():
if row >= self.schedule_range_row[0] and row <= self.schedule_range_row[1]:
row = self.schedule_range_row[1] + 1
row_values = self.reader.get_row_values(row)
row_values = [v for v in row_values if isinstance(v, str) and v.strip()]
if len(row_values) > 0:
self.raw_no_schedule.append(row_values)
row += 1
# speaker
low = merged.low
speaker_pos = low.shift(down=merged.height())
speaker = speaker_pos.cell(self.reader).value
def process_weekmetadatas(self, first_group: "Coord"):
"""Обработать календарик"""
for x in self.weeknums.keys():
pprint(x)
set_of_merged: set = self.weeknums[x]
l = len(set_of_merged)
if l != 1:
self.week_keys_metadata[x] = {
"error": True,
"error_text": f"Parse error: count of found '{x}' (need view like WEEKDAY_1; weekday - in r; 1 - weeknum[1, 2]) is {l}; required only one!"
}
self.parser_warnings.append(f"Processing weekmetadata for '{x}' failed because count of uniqie merged cells not one (actual: {l}). :<")
continue
weekday_merged: Merged = set_of_merged.pop()
if weekday_merged.width() != 1:
self.week_keys_metadata[x] = {
"error": True,
"error_text": f"Weekday excel block width != 1 (actual {weekday_merged.width()})"
}
self.parser_warnings.append(f"Processing weekmetadata for '{x}' failed because weekday excel block width != 1 (actual {weekday_merged.width()})")
continue
month_row = first_group.row
curr_col = first_group.col - 1
while curr_col >= 0:
month_pos = Coord(month_row, curr_col)
month_cell = month_pos.cell(self.reader)
if month_cell.is_empty():
pprint("month cell is empty")
curr_col -= 1
continue
month_name = str(month_cell.value).strip()
pprint(month_cell)
all_nums_of_month = utils.parse_all_dirt(self.reader, month_pos.replace(row=weekday_merged.low.row), right=1, down=weekday_merged.height())
pprint(f"all_nums_of_month={all_nums_of_month}")
if (x not in self.week_keys_metadata.keys()):
self.week_keys_metadata[x] = {}
if (month_name not in self.week_keys_metadata[x].keys()):
self.week_keys_metadata[x][month_name] = []
for x2 in all_nums_of_month:
if x2.lower() == month_name.lower():
pprint(f"Skip {x2} month number because it == month_name")
continue
m = self.week_keys_metadata[x][month_name]
if x2 not in m:
try:
m.append(str(x2).replace(".0", ""))
except:
m.append(x2)
curr_col -= 1
# location
location = merged.high.shift(down=1).cell(self.reader).value
def push_weekday_meta(self, weekday: str, weeknum: int, week_key_name: str, merged: "Merged"):
self.weeknums[week_key_name].add(merged)
def row_with_schedule_notify(self, row_coord):
"""Вызывается каждый раз когда в переданной row обранужено расписание"""
if self.schedule_range_row is None:
self.schedule_range_row = [row_coord, row_coord]
if self.schedule_range_row[1] < row_coord:
self.schedule_range_row[1] = row_coord
if self.schedule_range_row[0] > row_coord:
self.schedule_range_row[0] = row_coord
return {"loc": str(location).strip(), "leader": str(speaker).strip(), "name": str(merged.cell(self.reader).value).strip()}
def process_group(self, group, monday):
def process_group(self, group: dict, first_weekday: Coord, pair_pos_col):
"""
Обработать группы, выполняется для каждой группы, после того как они распарены (parse_groups)
group = {'name': 'ИВТ-260', 'position': [5, 6], 'position_human': 'G6:J6'}
"""
pprint(f"process_group group={group}")
group_name = group['name']
pprint(group_name)
row = group['position'][0] + 1 # counter for while, +1 for shift down; также номер строки в таблице (вроде с нуля)
pprint(F"Имя группы: {group_name}")
row_c1 = group['position'][0] + 1 # counter for while, +1 for shift down; также номер строки в таблице (вроде с нуля)
self.row_with_schedule_notify(group['position'][0])
group_header_pos = Coord(group['position'][0], group['position'][1])
width = group['width']
weeknum = 1 # номер недели, щёлкнет +1 при каком-то условии.
previous_pair = None
while row < self.reader.get_row_count(): # maybe условие чтобы не уйти ниже чем есть строк
pos = Coord(row, group['position'][1]) # текущая позиция, верхний левый угол (=low)
pprint(f"while pos={pos}")
pos_right = pos.shift(right=3)
pair_pos = pos.replace(col=5)
weekday_pos = pos.replace(col=4)
merged = self.reader.get_merged_coord(pos)
merged_cell = merged.cell(self.reader)
cv = merged_cell.value
# В конце (12 пара:>) название группы, можно использовать как якорь
if utils.unspace(cv) == group_name:
pprint("Lesson == group name; ending group loop.")
break
weekday_mr = self.reader.get_merged_coord(weekday_pos)
weekday = utils.unspace(weekday_mr.cell(self.reader).value)
pair_mr = self.reader.get_merged_coord(pair_pos)
pair = utils.unspace(pair_mr.cell(self.reader).value)
skip = 0
if weekday == "":
if weeknum == 1:
weeknum += 1
pprint("------")
skip = 1
row += 1
else:
break
weekcycles = 0
while row_c1 < self.reader.get_row_count():
pos_c1 = Coord(row_c1, group['position'][1]) # текущая позиция, верхний левый угол (=low)
self.row_with_schedule_notify(pos_c1.row)
if not skip:
next = 3 # на сколько пыгнуть для следующего шага?
if pos_c1.cell(self.reader).is_nospace_nocase_same(group_name):
pprint("Ended with grpup name; stop moving down, break")
break
weekday_pos = pos_c1.replace(col=first_weekday.col)
weekday_cell = weekday_pos.cell(self.reader)
weekday_mr = self.reader.get_merged_coord(weekday_pos)
weekday = weekday_cell.value
if not is_weeknum(weekday):
row_c1 += 1
pprint("Not weeknum!")
if weekcycles > 0:
if (weeknum != 2):
pprint("Weeknum now 2")
weekday = 0
weeknum = 2
continue
pprint(weekday)
weekday_key_name = weekday + ("_1" if weeknum == 1 else "_2")
self.push_weekday_meta(weekday, weeknum, weekday_key_name, weekday_mr)
# state
event_no = 1
is_widely = False
override_col_range = None
all_raw = set()
pairs = set()
times = []
first_coord = None
row_c2 = row_c1
while row_c2 <= weekday_mr.high.row:
pos_c2 = Coord(row_c2, group['position'][1]) # текущая позиция (внутри группы, внутри дня недели), верхний левый угол (=low)
cell_c2 = pos_c2.cell(self.reader)
mr_c2 = self.reader.get_merged_coord(pos_c2)
is_empty_lesson = len(utils.parse_all_dirt(self.reader, pos, 4, 3)) == 0 # если в поле не найдено ничего..
parsed_discipline_name = None
parsed_location = None
parsed_leader = None
pairs = 1
wtf_tomanypairs = False
is_solid = pos_right in merged
parsed_uncotigorized = []
is_wide_maybe_potokoviy = merged.width() > 4 # потоковая ли лекция (занимает несколько групп.)
if not is_empty_lesson:
cur = pos.shift(down=2)
while utils.has_no_bottom_border(self.reader, cur):
next += 3
pairs += 1
pprint(f"next = {next} cur={cur}")
if pairs >= 7:
wtf_tomanypairs = True
break
cur = cur.shift(down=3)
if is_wide_maybe_potokoviy:
ret = self.parse_potokoviy(merged)
parsed_location = ret['loc']
parsed_leader = ret['leader']
parsed_discipline_name = ret['name']
parsed_uncotigorized = list(utils.parse_all_dirt(self.reader, merged.low, merged.width(), next))
else:
if (is_solid):
parsed_discipline_name = cv
parsed_uncotigorized = list(utils.parse_all_dirt(self.reader, merged.low, 4, next))
# попытка исправить пару (1-2) если пустая.
fuck_empty_pair_in_excel = pair == ""
previous_dump = previous_pair
if fuck_empty_pair_in_excel:
if previous_pair is None or previous_pair == "":
pair = f"EMPTY_IN_EXCEL_{uuid.uuid4()}"
else:
pair = utils.next_element(PAIR_NUMS, previous_pair)
if first_coord is None:
first_coord = pos_c2.row
if pair != "":
previous_pair = pair if next == 3 else None # костыль чтобы избежать гипотетически не верной даты.
pair_num = None
pair_num_mr = None
if pair_pos_col is not None:
pair_num = pos_c2.replace(col=pair_pos_col)
pair_num_mr = self.reader.get_merged_coord(pair_num)
if (not is_widely) and (mr_c2.low.col < group_header_pos.col or mr_c2.high.col > group_header_pos.col + width - 1):
is_widely = True
override_col_range = (mr_c2.low.col, mr_c2.high.col)
col_low = group_header_pos.col
col_high = group_header_pos.col + width - 1
if override_col_range is not None:
col_low = min(col_low, override_col_range[0])
col_high = max(col_high, override_col_range[1])
# пытаемся из некотегорезированных данных выцепить место и лидера (препода)
prepods = set()
if parsed_leader is not None: prepods.add(parsed_leader.strip())
locations = set()
if parsed_location is not None: locations.add(parsed_location.strip().replace(" ", ""))
for x in list(parsed_uncotigorized):
if aigenerated.is_surname_string(x):
prepods.add(x.strip())
if aigenerated.is_room_number(x):
locations.add(x.strip().replace(" ", "") if x is not None else None)
# попытка починить пустую дисциплину
if parsed_discipline_name is None:
l = sorted(utils.remove_from_list(list(parsed_uncotigorized), list(locations | prepods | set([parsed_location, parsed_leader]))))
parsed_discipline_name = " ".join(l)
# чистим сеты от мусора
utils.discards_list(prepods, nones=True, emptystrings=True)
utils.discards_list(locations, nones=True, emptystrings=True)
utils.discards_list(parsed_uncotigorized, nones=True, emptystrings=True)
# если не пустой предмет то записываем его
if not is_empty_lesson:
slots = group['slots']
w = weekday + ("_1" if weeknum == 1 else "_2")
if w not in slots.keys():
slots[w] = {}
dirty_line = utils.parse_all_dirt(self.reader, Coord(row_c2, col_low), (col_high - col_low + 1), 1, with_cells=True)
if len(dirty_line) > 0:
if pair_num_mr is not None:
pair_num_to_add = pair_num_mr.cell(self.reader).value.replace(" ", "").strip()
if len(pair_num_to_add) == 0:
pair_num_to_add = "???"
pprint("Составители эксельки? Вы почему не указали номер пары ёклмн")
pairs.add(pair_num_to_add)
today = slots[w]
today[pair] = {
"excel_pos": str(pos),
"discipline_name": parsed_discipline_name.strip(),
"locations": sorted(locations),
"leads": sorted(prepods),
"is_solid": is_solid,
"time_coeff": pairs,
"is_flow": is_wide_maybe_potokoviy,
"lefttopmerged": {
"width": merged.width(),
"height": merged.height(),
"excel_range": utils.merged_humanize(merged.as_numbers())
},
"raw": sorted(parsed_uncotigorized),
"weekday": utils.weekday_to_num(weekday),
"weeknum": weeknum
}
if fuck_empty_pair_in_excel:
today[pair]['pair_num_empty'] = {
"prev": previous_dump,
"restored": pair != "",
"pair": pair
}
if wtf_tomanypairs:
today[pair]['to_many_parsing_time_coeff'] = True
for cell in dirty_line:
if not cell.is_time:
all_raw.add(str(cell.value))
else:
dt: time = cell.value
times.append(str(dt))
# INCREMENT на next и конец цикла.
row += next
def clean_state():
nonlocal is_widely, override_col_range, event_no, all_raw, pairs, times, first_coord
is_widely = False
override_col_range = None
event_no += 1
all_raw = set()
pairs = set()
first_coord = None
times = []
if not utils.has_no_bottom_border(self.reader, pos_c2) and not(mr_c2.high.row - row_c2 > 0):
if not (len(all_raw) == 0):
# this code last for current state event
pprint(f"{event_no} {pairs}: {'[wide] ' if is_widely else ''} raw={all_raw}")
slots = group['slots']
w = weekday_key_name
if w not in slots.keys():
slots[w] = {}
pair_name = "????"
try:
pair_name = sorted(pairs)[0]
except: pass
obj = {
"object": "event",
"pairs": sorted(pairs),
"is_flow": is_widely,
"excel_range": utils.merged_humanize((first_coord, col_low, row_c2, col_high)),
"raw": sorted(all_raw),
"weekday": utils.weekday_to_num(weekday),
"weeknum": weeknum
}
if len(times) > 0:
obj['times'] = times
def smart_insert(first_dict, key, to_insert):
if key not in first_dict.keys():
first_dict[key] = {}
if isinstance(first_dict[key], dict):
if len(first_dict[key].keys()) == 0:
first_dict[key] = to_insert
else:
p = first_dict[key]
first_dict[key] = [p, to_insert]
elif isinstance(first_dict[key], list):
first_dict[key].append(to_insert)
else:
self.parser_warnings.append("Wtf? first_dict[key] not is dict and not is list??? (internal error)")
if pair_pos_col is None:
smart_insert(slots, w, obj)
else:
smart_insert(slots[w], pair_name, obj)
# here may be a empty all_raw
clean_state()
first_coord = None
if row_c2 >= weekday_mr.high.row:
clean_state()
pprint("Last for weekday")
def parse_groups(reader: "ExcelSheetReader", head, monday: Coord, head_rx):
row_c2 += 1
row_c1 += weekday_mr.height()
weekcycles += 1
def parse_groups(reader: "ExcelSheetReader", head, col_start, head_rx):
"""Распознать список групп и метаданные к ним, по сути получить список названий группы и координат её верхнего header-а (AQ6:AT6)"""
groups = {}
i = 0
@@ -227,21 +397,26 @@ def parse_groups(reader: "ExcelSheetReader", head, monday: Coord, head_rx):
x = head[i]
pprint(f"while i={i} head[i]={x}")
merged = reader.get_merged_coord(Coord(head_rx, i))
if i > monday.col + 1:
if merged is None or x == "":
break
if merged.width() != 4:
pprint(f"WARNING: group header witdh !=4 (found: {merged.width()}); blocks !=4 not supported by parser.")
if i >= col_start:
if merged is None or x == "" or x is None:
break
name = utils.unspace(x)
groups[name] = {
"name": name,
"position": [head_rx, i],
"position_human": utils.merged_humanize(merged.as_numbers()),
"slots": {}
}
skip = False
if "-" not in name:
for x in BAD_GROUP_NAMES:
if x in name.lower():
skip = True
pprint(f"Skip groupname {name} because not dash in name and in blacklist")
if not skip:
groups[name.lower()] = {
"name": name,
"position": [head_rx, i],
"width": merged.width(),
"position_human": utils.merged_humanize(merged.as_numbers()),
"slots": {}
}
if merged is None:
i += 1

6
requirements.txt Normal file
View File

@@ -0,0 +1,6 @@
openpyxl
xlrd
beautifulsoup4
requests
pika
python-dotenv

View File

@@ -1,6 +1,7 @@
# --- Абстрактный базовый класс (Контракт) ---
# Copyright Stanislav Mironov
from abc import ABC, abstractmethod
from datetime import time
import openpyxl
import xlrd
@@ -10,13 +11,23 @@ from coord import Coord, Merged
EMPTY_CTYPES = [xlrd.XL_CELL_EMPTY, xlrd.XL_CELL_BLANK]
class TranschendentnostCell:
def __init__(self, value, is_empty):
def __init__(self, value, is_empty, is_time=False):
self.value = value
self.is_time = isinstance(value, time) or is_time
self._is_empty = is_empty
def is_nospace_nocase_same(self, query):
try:
if self.value.lower().replace(" ", "").strip() == query.lower().replace(" ", "").strip():
return True
except: pass
return False
def is_empty(self):
return self._is_empty
# --- Абстрактный базовый класс (Контракт) ---
class ExcelSheetReader(ABC):
"""
Абстрактный базовый класс, определяющий интерфейс для чтения данных из листа Excel.
@@ -28,6 +39,10 @@ class ExcelSheetReader(ABC):
@abstractmethod
def get_sheet_index(self):
pass
@abstractmethod
def get_sheet_name(self):
pass
@abstractmethod
def has_next_sheet(self):
@@ -71,16 +86,28 @@ class ExcelSheetReader(ABC):
return "TODO: info"
@abstractmethod
def cell(self, row, col):
def cell(self, row, col) -> TranschendentnostCell:
"""Возвращает абстрактную клетку"""
pass
def find(self, query = None):
def find(self, query = None, startswith=False, nospace=False):
return self.find_any([query], startswith=startswith, nospace=nospace)
def find_any(self, query = None, startswith=False, nospace=False):
for rx in range(self.get_row_count()):
i = 0
for x in self.get_row_values(rx):
if x == query:
return Coord(rx, i)
if nospace:
x = str(x).replace(" ", "").strip()
for query_selected in query:
if x == query_selected:
return Coord(rx, i)
elif startswith:
try:
if str(x).lower().startswith(query_selected.lower()):
return Coord(rx, i)
except: pass
i += 1
return None
@@ -104,7 +131,6 @@ class ExcelSheetReader(ABC):
# --- Реализация №1: Обертка для xlrd ---
class XlrdSheetReader(ExcelSheetReader):
def __init__(self, file_path, sheet_index=0):
super().__init__(file_path)
@@ -117,6 +143,9 @@ class XlrdSheetReader(ExcelSheetReader):
def init_sheet(self):
self.sheet = self.book.sheet_by_index(self.sheet_index)
def get_sheet_name(self):
return self.sheet.name
def has_next_sheet(self):
return self.sheet_index < len(self.book.sheet_names())-1
@@ -140,7 +169,24 @@ class XlrdSheetReader(ExcelSheetReader):
def cell(self, row, col):
"""Возвращает абстрактную клетку"""
c = self.sheet.cell(row, col)
return TranschendentnostCell(c.value, c.ctype in EMPTY_CTYPES)
is_empty = c.ctype in EMPTY_CTYPES
is_time = c.ctype == xlrd.XL_CELL_DATE
value = c.value
if is_empty:
value = ""
elif is_time:
if isinstance(value, float):
if value <= 1:
seconds = round(value * 86400)
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
value = time(hour=hours, second=seconds, minute=minutes)
else:
print(f"TODO: value is {value} its unix? not 0.xxxxxxxx")
else:
is_time = False
print("IsTime but not float!")
return TranschendentnostCell(value, is_empty, is_time=is_time)
def get_border_style(self, coord: Coord, side):
row = coord.row
@@ -173,7 +219,6 @@ class XlrdSheetReader(ExcelSheetReader):
# --- Реализация №2: Обертка-транслятор для openpyxl ---
class OpenpyxlSheetReader(ExcelSheetReader):
def __init__(self, file_path, sheet_name=None):
super().__init__(file_path)
@@ -192,6 +237,9 @@ class OpenpyxlSheetReader(ExcelSheetReader):
def get_sheet_index(self):
return self.sheet_index
def get_sheet_name(self):
return self.workbook.sheetnames[self.sheet_index]
def has_next_sheet(self):
return self.sheet_index < len(self.workbook.sheetnames)-1
@@ -221,7 +269,7 @@ class OpenpyxlSheetReader(ExcelSheetReader):
c = self._get_cell(row, col)
is_empty = (c.value is None)
return TranschendentnostCell("" if is_empty else c.value, is_empty)
return TranschendentnostCell("" if is_empty else c.value, is_empty, is_time=isinstance(c.value, time))
def get_cell_value(self, row, col):
cell = self._get_cell(row, col)
@@ -260,8 +308,7 @@ class OpenpyxlSheetReader(ExcelSheetReader):
return []
# --- Фабричная функция (Ваша единственная точка входа) ---
# --- Фабричная функция (единственная точка входа) ---
def create_reader(file_path, **kwargs) -> ExcelSheetReader:
"""
Создает и возвращает подходящий экземпляр ридера в зависимости от расширения файла.

142
utils.py
View File

@@ -1,12 +1,82 @@
# Copyright Stanislav Mironov
import time
import xlrd
from coord import Coord, Merged
from coord import Coord
from translations import ExcelSheetReader
import re
import hashlib
import requests
from urllib.parse import urlsplit, urlunsplit, quote
def download_file_from_url(url, output_filename):
"""
Скачивает файл по URL со спецсимволами и пробелами, сохраняя его под указанным именем.
Args:
url (str): Исходный URL, который может содержать пробелы и кириллицу.
output_filename (str): Имя файла для сохранения (например, 'calc.xls').
"""
try:
# --- Шаг 1: Правильное кодирование URL ---
# Разбираем URL на части: ('https', 'www.vstu.ru', '/path/to file.xls', '', '')
parts = urlsplit(url)
# Кодируем только путь, оставляя слэши '/' безопасными
# Это превратит ' ' в '%20', 'В' в '%D0%92' и т.д.
encoded_path = quote(parts.path, safe='/-_')
# Собираем URL обратно из частей с уже закодированным путем
encoded_url = urlunsplit((parts.scheme, parts.netloc, encoded_path, parts.query, parts.fragment))
# --- Шаг 2: Скачивание файла ---
response = requests.get(encoded_url, stream=True)
# Проверяем, успешен ли запрос (код 200 OK)
# Если сервер вернет ошибку (404, 500 и т.д.), здесь возникнет исключение
response.raise_for_status()
# --- Шаг 3: Сохранение файла ---
# Открываем файл для записи в бинарном режиме ('wb')
# Использование 'with' гарантирует, что файл будет закрыт автоматически
with open(output_filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"✅ Файл успешно скачан и сохранен как '{output_filename}'")
except requests.exceptions.RequestException as e:
print(f"❌ Ошибка скачивания: {e}")
except Exception as e:
print(f"❌ Произошла непредвиденная ошибка: {e}")
def calculate_sha1(filepath):
"""
Calculates the SHA1 hash of a given file.
Args:
filepath (str): The path to the file.
Returns:
str: The hexadecimal representation of the SHA1 hash, or None if the file is not found.
"""
sha1_hash = hashlib.sha1()
try:
with open(filepath, "rb") as f:
# Read the file in chunks to handle large files efficiently
for chunk in iter(lambda: f.read(4096), b""):
sha1_hash.update(chunk)
return sha1_hash.hexdigest()
except FileNotFoundError:
print(f"Error: File not found at {filepath}")
return None
except Exception as e:
print(f"An error occurred: {e}")
return None
class StepTimeCounter:
def __init__(self):
@@ -53,7 +123,7 @@ def remove_from_list(l: list, todel: list):
return l
def parse_all_dirt(reader: "ExcelSheetReader", min_pos, right, down):
def parse_all_dirt(reader: "ExcelSheetReader", min_pos: Coord, right, down, with_cells=False):
RET = set()
row = min_pos.row
@@ -61,67 +131,14 @@ def parse_all_dirt(reader: "ExcelSheetReader", min_pos, right, down):
col = min_pos.col
while col < min_pos.col + right:
#print(excel_coordinate(row, col))
cv = reader.get_cell_value(row, col)
value = str(cv).strip()
if cv is not None and len(value) > 0:
RET.add(value)
cv = reader.cell(row, col)
if cv is not None and not cv.is_empty():
RET.add(cv if with_cells else str(cv.value))
col += 1
row += 1
return RET
# GEMINI GENERATED
def normalize_name(raw_name):
"""
Приводит разнородные записи ФИО к единому структурированному виду.
"""
# Шаг 1: Очистка
name = re.sub(r'\s+', ' ', raw_name).strip()
# Шаг 2: Извлечение звания
known_titles = ['проф.', 'доц.', 'акад.', 'к.т.н.', 'д.м.н.']
title = None
for t in known_titles:
if name.lower().startswith(t):
title = t
# Удаляем звание из строки, убираем лишние пробелы
name = name[len(t):].strip()
break
# Шаг 3 и 4: Разделение и идентификация
parts = name.split(' ')
last_name = None
initials = None
# Простой эвристический анализ
# Ищем инициалы (содержат точку или состоят из 1-2 заглавных букв)
initials_parts = []
name_parts = []
for part in parts:
if '.' in part or (1 <= len(part) <= 2 and part.isupper()):
initials_parts.append(part)
else:
# Считаем все остальное частью фамилии (для двойных фамилий)
name_parts.append(part)
if name_parts:
last_name = " ".join(name_parts)
if initials_parts:
initials = "".join(initials_parts) # Сливаем "А." и "Н." в "А.Н."
# Если фамилия не найдена (например, только инициалы),
# но есть части, считаем первую часть фамилией
if not last_name and name_parts:
last_name = name_parts[0]
return {
"last_name": last_name,
"initials": initials,
"title": title
}
def excel_coordinate(row, col):
"""
Преобразует координаты строки и столбца (начиная с 0) в эквивалент Excel (например, A7, CB34).
@@ -165,7 +182,7 @@ def find(sh, query = None):
return None
def weekday_to_num(st: str):
if st.upper().strip() == "ПОНЕДЕЛЬНИК":
if st.upper().strip().startswith("ПОНЕД"):
return 1
if st.upper().strip() == "ВТОРНИК":
return 2
@@ -177,8 +194,9 @@ def weekday_to_num(st: str):
return 5
if st.upper().strip() == "СУББОТА":
return 6
if st.upper().strip() == "ВОСКРЕСЕНЬЕ":
if st.upper().strip().startswith("ВОСКР"):
return 7
print(f"Unknown weekday num for str: {st}; returnted -1")
return -1