refactor: big, more patterns\n\nBREAKING CHANGES

This commit is contained in:
2026-03-16 20:53:42 +03:00
parent 2105e9bc36
commit 7e0e4a0b71
5 changed files with 416 additions and 244 deletions

453
parser.py
View File

@@ -3,10 +3,21 @@
# Academic-hour range labels ("pairs") as they are written in the sheet.
# parse() searches the sheet for any of them to locate the pair-number column.
PAIR_NUMS = [
"1-2", "3-4", "5-6", "7-8", "9-10", "11-12", "13-14", "15-16"
]
# Lower-case prefixes that identify a weekday cell (six entries, no Sunday).
# is_weeknum() matches them with startswith() after stripping and removing spaces.
WEEKDAYS_STARTSWITH = [
"понед",
"вторник",
"среда",
"четверг",
"пятница",
"суббота"
]
# Month-name fragments. parse_groups() does not register a header cell as a group
# when its name has no dash and contains one of these fragments.
bad_group_names = [
"янв", "февр", "март", "апр", "май", "сент", "окт", "ноя", "дек", "июнь", "июль", "авг"
]
from datetime import time
import json
import uuid
import aigenerated
from coord import Coord, Merged
from translations import ExcelSheetReader
import utils
@@ -17,50 +28,127 @@ LOGGING = False
def pprint(*args, **kwargs):
    """Forward the call to print() only while the module-level LOGGING flag is truthy."""
    if not LOGGING:
        return
    print(*args, **kwargs)
def is_weeknum(text):
    """Return True when *text* starts with one of the WEEKDAYS_STARTSWITH prefixes.

    Despite its name, this recognises a weekday label, not a week number.
    The value is stripped, has its spaces removed and is lower-cased before
    the prefix comparison.

    Cell values are not guaranteed to be strings (other code in this file
    filters them with ``isinstance(v, str)``), so any non-string value such as
    ``None`` yields False instead of raising AttributeError.
    """
    if not isinstance(text, str):
        return False
    # Normalise once instead of once per candidate prefix.
    normalized = text.strip().replace(" ", "").lower()
    # str.startswith accepts a tuple and is True if any prefix matches.
    return normalized.startswith(tuple(WEEKDAYS_STARTSWITH))
def is_pair(text):
    """Return True when *text* starts with one of the PAIR_NUMS labels.

    The value is stripped, has its spaces removed and is lower-cased before
    the prefix comparison.

    Non-string values (for example ``None`` or a number) yield False instead
    of raising AttributeError.
    """
    if not isinstance(text, str):
        return False
    # Normalise once instead of once per candidate label.
    normalized = text.strip().replace(" ", "").lower()
    # str.startswith accepts a tuple and is True if any prefix matches.
    return normalized.startswith(tuple(PAIR_NUMS))
class Parser:
def __init__(self, reader: ExcelSheetReader):
# Store the sheet reader and reset every piece of parser state.
# NOTE(review): several attributes are assigned twice below. This text looks like a
# flattened diff in which the uncommented assignments are the pre-refactor lines and the
# commented ones their replacements; confirm against the real file. The final values
# are the same either way.
self.reader = reader
self.groups = {}
self.week_keys_metadata = {}
self.groups = {} # Groups that were parsed successfully
self.features = set() # features detected on this sheet
self.week_keys_metadata = {} # the little calendar
self.schedule_range_row = None # [min, max] inclusive range that holds the schedule (the original note said "col"; row_with_schedule_notify() stores row numbers here)
self.raw_no_schedule = [] # everything outside schedule_range_row goes here ('СОГЛАСОВАНО:', etc..)
self.weeknums: defaultdict = defaultdict(set) # no support json!
self.parser_error = None
self.parser_warnings = []
self.weeknums: defaultdict = defaultdict(set) # no support json! (for week_keys_metadata)
self.parser_error = None # parser error recorded before giving up
self.parser_warnings = [] # parser warnings
pprint("Parser created for '{0}'".format(reader.info()))
def parse(self):
# Parse the whole sheet: find the first weekday cell and (optionally) the pair-number
# column, read the group header row, process every group, then the calendar block and
# finally the rows outside the schedule.
# When no weekday cell is found, self.parser_error is set and the method returns early.
# Raises Exception when the computed header row index is negative.
# NOTE(review): this text looks like a flattened diff. Lines that use `monday` appear to be
# the pre-refactor versions of the `first_weekday` lines next to them; confirm against the real file.
monday = self.reader.find("ПОНЕДЕЛЬНИК")
if monday is None:
# Characteristic traits of the different timetable grids
# NOTE(review): no_pair_numeration and weekday_firstly_calendar are only assigned in this
# method, never read; the same information is recorded in self.features.
no_pair_numeration = False
col_distance_pair_weekday = None
weekday_firstly_calendar = False
# Look for any cell that starts with a weekday prefix (spaces ignored).
first_weekday = self.reader.find_any(WEEKDAYS_STARTSWITH, startswith=True, nospace=True)
if first_weekday is None:
self.features.add("no_weekdays")
print(" -- Failed parse! -- ")
print("ПОНЕДЕЛЬНИК НЕ НАЙДЕН!")
self.parser_error = "'ПОНЕДЕЛЬНИК' не найден в таблице."
print("дни недели не найдены!")
self.parser_error = f"{WEEKDAYS_STARTSWITH} ни один найден в таблице. Дня недели нет."
return
if monday.col != 4:
print("--- warning parse! ---")
print(f"Monday col != 4 (actual: {monday})")
self.parser_warnings.append(f"Monday col != 4 (actual: {monday}); Это, наверное, может работать не стабильно!")
# Look for a pair-number cell; a sheet without one is still parsed, with a warning.
pair_num_any = self.reader.find_any(PAIR_NUMS, nospace=True)
if pair_num_any is None:
no_pair_numeration = True
self.features.add("no_pair_numeration")
self.parser_warnings.append(f"Нет нумерации академических часов {PAIR_NUMS}")
else:
self.features.add("pair_numeration")
# Number of columns between the weekday column and the pair-number column.
col_distance_pair_weekday = pair_num_any.col - first_weekday.col
head_rx = monday.row - 1 # above the first Monday
head_rx = first_weekday.row - 1 # above the first Monday
# Default: group headers are searched starting two columns right of the weekday column.
group_col_start = first_weekday.col + 2
if col_distance_pair_weekday is not None:
if col_distance_pair_weekday > 1:
# The pair-number column is not adjacent to the weekday column (presumably the
# calendar sits between them); groups then start right after the pair-number column.
weekday_firstly_calendar = True
self.features.add("weekdays_before_calendar")
group_col_start = pair_num_any.col + 1
if head_rx < 0:
raise Exception("head_rx < 0: Программа пыталась найти 'ПОНЕДЕЛЬНИК', но по всей видимости не нашла.")
raise Exception("head_rx < 0: Программа пыталась найти день недели, но по всей видимости не нашла.")
head = self.reader.get_row_values(head_rx) # get all ROW (months, groups)
pprint(f"head={head}")
self.groups = parse_groups(self.reader, head, monday, head_rx) # parse groups to self.groups
# Join the non-blank string cells of the candidate header row to inspect its content.
head_joined = " ||| ".join([v for v in head if isinstance(v, str) and v.strip()])
print(head_joined)
# If that row carries week / "changes" / site markers, it is treated as an info row
# and the group header is read from one row higher.
if "1 неделя" in head_joined or "1 НЕДЕЛЯ" in head_joined or "2 неделя" in head_joined or "2 НЕДЕЛЯ" in head_joined or "ИЗМЕНЕНИЯ" in head_joined or "изменения" in head_joined or "vtf-vstu.ru" in head_joined:
head_rx -= 1
head = self.reader.get_row_values(head_rx) # get all ROW (months, groups)
pprint(f"head (upper)={head}")
self.features.add("post_groups_info_row")
self.groups = parse_groups(self.reader, head, group_col_start, head_rx) # parse groups to self.groups
pprint(f'self.groups={json.dumps(self.groups, indent=2, ensure_ascii=False)}')
pprint("\n\n\n")
for group in self.groups.values():
pprint("\nSTART OF PROCESS GROUP\n")
self.process_group(group, monday)
# The pair-number column is passed as None when the sheet has no pair numbering.
self.process_group(group, first_weekday, pair_num_any.col if pair_num_any else None)
pprint("\nEND OF PROCESS GROUP\n")
self.process_weekmetadatas(monday)
# NOTE(review): the `def` below appears to be a pre-refactor line kept by the diff view;
# the statements after it read `self.groups` and look like the continuation of parse().
def process_weekmetadatas(self, first_monday: "Coord"):
# week metadatas parse
# Find the smallest row and the smallest column among all group header positions;
# S is a sentinel meaning "no group seen".
S = 9999999
group_min_col = S
group_min_row = S
for x in self.groups.values():
p = x['position']
group_min_row = min(p[0], group_min_row)
group_min_col = min(p[1], group_min_col)
if group_min_row != S and group_min_col != S:
pprint("Process weekmetadatas!")
self.process_weekmetadatas(Coord(row=group_min_row, col=group_min_col))
# parse no-schedule rows ("approved by" stamps, etc.)
self.parse_raw_no_schedule()
def parse_raw_no_schedule(self):
"""Распарсить всё за пределами self.schedule_range_row в self.raw_no_schedule"""
if self.schedule_range_row is None:
return
row = 0
while row < self.reader.get_row_count():
if row >= self.schedule_range_row[0] and row <= self.schedule_range_row[1]:
row = self.schedule_range_row[1] + 1
row_values = self.reader.get_row_values(row)
row_values = [v for v in row_values if isinstance(v, str) and v.strip()]
if len(row_values) > 0:
self.raw_no_schedule.append(row_values)
row += 1
def process_weekmetadatas(self, first_group: "Coord"):
"""Обработать календарик"""
for x in self.weeknums.keys():
pprint(x)
set_of_merged: set = self.weeknums[x]
@@ -82,14 +170,16 @@ class Parser:
self.parser_warnings.append(f"Processing weekmetadata for '{x}' failed because weekday excel block width != 1 (actual {weekday_merged.width()})")
continue
month_row = first_monday.row - 1
curr_col = weekday_merged.low.col - 1
month_row = first_group.row
curr_col = first_group.col - 1
while curr_col >= 0:
month_pos = Coord(month_row, curr_col)
month_cell = month_pos.cell(self.reader)
if month_cell.is_empty():
pprint("month cell is empty")
break
curr_col -= 1
continue
month_name = str(month_cell.value).strip()
pprint(month_cell)
all_nums_of_month = utils.parse_all_dirt(self.reader, month_pos.replace(row=weekday_merged.low.row), right=1, down=weekday_merged.height())
@@ -117,6 +207,16 @@ class Parser:
def push_weekday_meta(self, weekday: str, weeknum: int, week_key_name: str, merged: "Merged"):
self.weeknums[week_key_name].add(merged)
def row_with_schedule_notify(self, row_coord):
if self.schedule_range_row is None:
self.schedule_range_row = [row_coord, row_coord]
if self.schedule_range_row[1] < row_coord:
self.schedule_range_row[1] = row_coord
if self.schedule_range_row[0] > row_coord:
self.schedule_range_row[0] = row_coord
def parse_potokoviy(self, merged: Merged):
speaker = None
@@ -132,163 +232,157 @@ class Parser:
return {"loc": str(location).strip(), "leader": str(speaker).strip(), "name": str(merged.cell(self.reader).value).strip()}
def process_group(self, group: dict, monday: Coord):
# NOTE(review): two signatures in a row. This text appears to be a flattened diff in which
# the line above is the pre-refactor signature and the line below its replacement. Lines
# further down that use `monday`, `row`, `pos`, `merged`, `pair` or `next` look like
# pre-refactor leftovers as well; confirm against the real file before relying on them.
def process_group(self, group: dict, first_weekday: Coord, pair_pos_col):
"""
Process one group; runs for every group after they have been discovered (parse_groups).
group = {'name': 'ИВТ-260', 'position': [5, 6], 'position_human': 'G6:J6'}
"""
# first_weekday: only its column is read (to locate the weekday cell of each row).
# pair_pos_col: column of the pair-number cells, or None when the sheet has none.
# Besides the keys shown in the example, the code below also reads group['width'] and group['slots'].
pprint(f"process_group group={group}")
group_name = group['name']
pprint(group_name)
row = group['position'][0] + 1 # counter for while, +1 for shift down; also the row number in the sheet (apparently zero-based)
pprint(F"Имя группы: {group_name}")
row_c1 = group['position'][0] + 1 # counter for while, +1 for shift down; also the row number in the sheet (apparently zero-based)
# The header row itself counts as part of the schedule range.
self.row_with_schedule_notify(group['position'][0])
group_header_pos = Coord(group['position'][0], group['position'][1])
width = group['width']
weeknum = 1 # week number; bumped by +1 under some condition.
previous_pair = None
while row < self.reader.get_row_count(): # maybe a condition so that we do not go below the last row
pos = Coord(row, group['position'][1]) # current position, top-left corner (=low)
pprint(f"while pos={pos}")
pos_right = pos.shift(right=3)
pair_pos = pos.replace(col=monday.col + 1)
weekday_pos = pos.replace(col=monday.col)
merged = self.reader.get_merged_coord(pos)
merged_cell = merged.cell(self.reader)
cv = merged_cell.value
# At the end (pair 12 :>) there is the group name; it can be used as an anchor
if utils.unspace(cv) == group_name:
pprint("Lesson == group name; ending group loop.")
break
# Number of weekday blocks processed so far.
weekcycles = 0
# Outer walk: starts on the row below the group header and moves down the group's column.
while row_c1 < self.reader.get_row_count():
pos_c1 = Coord(row_c1, group['position'][1]) # current position, top-left corner (=low)
self.row_with_schedule_notify(pos_c1.row)
weekday_mr = self.reader.get_merged_coord(weekday_pos)
weekday = utils.unspace(weekday_mr.cell(self.reader).value)
pair_mr = self.reader.get_merged_coord(pair_pos)
pair = utils.unspace(pair_mr.cell(self.reader).value)
skip = 0
if weekday == "":
if weeknum == 1:
weeknum += 1
pprint("------")
skip = 1
row += 1
else:
break
if not skip:
next = 3 # how far to jump for the next step?
# A cell that repeats the group name marks the bottom of the group's column.
if pos_c1.cell(self.reader).is_nospace_nocase_same(group_name):
pprint("Ended with grpup name; stop moving down, break")
break
weekday_pos = pos_c1.replace(col=first_weekday.col)
weekday_cell = weekday_pos.cell(self.reader)
weekday_mr = self.reader.get_merged_coord(weekday_pos)
weekday = weekday_cell.value
# A row without a weekday label is stepped over one row at a time. Once at least one
# weekday block has been processed, such a row switches the week number to 2.
if not is_weeknum(weekday):
row_c1 += 1
pprint("Not weeknum!")
if weekcycles > 0:
if (weeknum != 2):
pprint("Weeknum now 2")
# NOTE(review): this value looks unused, since `weekday` is reassigned from the cell
# at the top of the next iteration; confirm.
weekday = 0
weeknum = 2
continue
pprint(weekday)
# Slot key: the weekday cell text followed by "_1" or "_2" for the week number.
weekday_key_name = weekday + ("_1" if weeknum == 1 else "_2")
self.push_weekday_meta(weekday, weeknum, weekday_key_name, weekday_mr)
# state
# Per-event accumulators; clean_state() below resets them.
event_no = 1
is_widely = False
override_col_range = None
all_raw = set()
pairs = set()
times = []
first_coord = None
# Inner walk: every row covered by the weekday's merged block.
row_c2 = row_c1
while row_c2 <= weekday_mr.high.row:
pos_c2 = Coord(row_c2, group['position'][1]) # current position (inside the group, inside the weekday), top-left corner (=low)
cell_c2 = pos_c2.cell(self.reader)
mr_c2 = self.reader.get_merged_coord(pos_c2)
weekday_key_name = weekday + ("_1" if weeknum == 1 else "_2")
self.push_weekday_meta(weekday, weeknum, weekday_key_name, weekday_mr)
# Remember the first row of the event currently being accumulated.
if first_coord is None:
first_coord = pos_c2.row
is_empty_lesson = len(utils.parse_all_dirt(self.reader, pos, 4, 3)) == 0 # if nothing was found in the field..
parsed_discipline_name = None
parsed_location = None
parsed_leader = None
pairs = 1
wtf_tomanypairs = False
is_solid = pos_right in merged
parsed_uncotigorized = []
is_wide_maybe_potokoviy = merged.width() > 4 # whether this is a flow lecture (spans several groups)
if not is_empty_lesson:
cur = pos.shift(down=2)
while utils.has_no_bottom_border(self.reader, cur):
next += 3
pairs += 1
pprint(f"next = {next} cur={cur}")
if pairs >= 7:
wtf_tomanypairs = True
break
cur = cur.shift(down=3)
if is_wide_maybe_potokoviy:
ret = self.parse_potokoviy(merged)
parsed_location = ret['loc']
parsed_leader = ret['leader']
parsed_discipline_name = ret['name']
parsed_uncotigorized = list(utils.parse_all_dirt(self.reader, merged.low, merged.width(), next))
else:
if (is_solid):
parsed_discipline_name = cv
parsed_uncotigorized = list(utils.parse_all_dirt(self.reader, merged.low, 4, next))
# attempt to repair the pair label (1-2) when it is empty.
fuck_empty_pair_in_excel = pair == ""
previous_dump = previous_pair
if fuck_empty_pair_in_excel:
if previous_pair is None or previous_pair == "":
pair = f"EMPTY_IN_EXCEL"
else:
pair = utils.next_element(PAIR_NUMS, previous_pair)
# Pair-number cell on this row; stays None when the sheet has no pair-number column.
pair_num = None
pair_num_mr = None
if pair_pos_col is not None:
pair_num = pos_c2.replace(col=pair_pos_col)
pair_num_mr = self.reader.get_merged_coord(pair_num)
if pair != "":
previous_pair = pair if next == 3 else None # hack to avoid a hypothetically wrong date.
# A merged cell that reaches outside the group's own columns marks the event as wide;
# its column span is remembered until the state is reset.
if (not is_widely) and (mr_c2.low.col < group_header_pos.col or mr_c2.high.col > group_header_pos.col + width - 1):
is_widely = True
override_col_range = (mr_c2.low.col, mr_c2.high.col)
# Columns to scan on this row: the group's own columns, widened by the wide span if any.
col_low = group_header_pos.col
col_high = group_header_pos.col + width - 1
if override_col_range is not None:
col_low = min(col_low, override_col_range[0])
col_high = max(col_high, override_col_range[1])
# try to pull the location and the leader (teacher) out of the uncategorised data
prepods = set()
if parsed_leader is not None: prepods.add(parsed_leader.strip())
locations = set()
if parsed_location is not None: locations.add(parsed_location.strip().replace(" ", ""))
for x in list(parsed_uncotigorized):
if aigenerated.is_surname_string(x):
prepods.add(x.strip())
if aigenerated.is_room_number(x):
locations.add(x.strip().replace(" ", "") if x is not None else None)
# attempt to repair an empty discipline name
if parsed_discipline_name is None:
l = sorted(utils.remove_from_list(list(parsed_uncotigorized), list(locations | prepods | set([parsed_location, parsed_leader]))))
parsed_discipline_name = " ".join(l)
# clean the sets of junk
utils.discards_list(prepods, nones=True, emptystrings=True)
utils.discards_list(locations, nones=True, emptystrings=True)
utils.discards_list(parsed_uncotigorized, nones=True, emptystrings=True)
# if the lesson is not empty, record it
if not is_empty_lesson:
slots = group['slots']
w = weekday_key_name
if w not in slots.keys():
slots[w] = {}
# Collect the cells found on this single row across the scanned columns.
dirty_line = utils.parse_all_dirt(self.reader, Coord(row_c2, col_low), (col_high - col_low + 1), 1, with_cells=True)
if len(dirty_line) > 0:
if pair_num_mr is not None:
# NOTE(review): assumes the pair-number cell value is a string; an empty cell holding
# None would fail on .replace(). Confirm what the reader returns.
pair_num_to_add = pair_num_mr.cell(self.reader).value.replace(" ", "").strip()
if len(pair_num_to_add) == 0:
pair_num_to_add = "???"
pairs.add(pair_num_to_add)
today = slots[w]
today[pair] = {
"excel_pos": str(pos),
"discipline_name": parsed_discipline_name.strip(),
"locations": sorted(locations),
"leads": sorted(prepods),
"is_solid": is_solid,
"time_coeff": pairs,
"is_flow": is_wide_maybe_potokoviy,
"lefttopmerged": {
"width": merged.width(),
"height": merged.height(),
"excel_range": utils.merged_humanize(merged.as_numbers())
},
"raw": sorted(parsed_uncotigorized),
"weekday": utils.weekday_to_num(weekday),
"weeknum": weeknum
}
if fuck_empty_pair_in_excel:
today[pair]['pair_num_empty'] = {
"prev": previous_dump,
"restored": pair != "",
"pair": pair
}
if wtf_tomanypairs:
today[pair]['to_many_parsing_time_coeff'] = True
# Time cells go to `times` (as strings); every other cell's text goes to `all_raw`.
for cell in dirty_line:
if not cell.is_time:
all_raw.add(str(cell.value))
else:
dt: time = cell.value
times.append(str(dt))
# INCREMENT by next and end of the loop.
row += next
# Reset the per-event accumulators and advance the event counter.
def clean_state():
nonlocal is_widely, override_col_range, event_no, all_raw, pairs, times, first_coord
is_widely = False
override_col_range = None
event_no += 1
all_raw = set()
pairs = set()
first_coord = None
times = []
# An event is closed on a row where has_no_bottom_border() is false for the cell
# (presumably: the cell has a bottom border) and its merged block does not extend below.
if not utils.has_no_bottom_border(self.reader, pos_c2) and not(mr_c2.high.row - row_c2 > 0):
if not (len(all_raw) == 0):
# this code last for current state event
pprint(f"{event_no} {pairs}: {'[wide] ' if is_widely else ''} raw={all_raw}")
slots = group['slots']
w = weekday_key_name
if w not in slots.keys():
slots[w] = {}
# Key of the event: the smallest collected pair label, "????" when none was collected.
# NOTE(review): the bare `except` hides every error, not only the IndexError of an
# empty `pairs`.
pair_name = "????"
try:
pair_name = sorted(pairs)[0]
except: pass
today = slots[w]
obj = {
"object": "event",
"pairs": sorted(pairs),
"is_flow": is_widely,
"excel_range": utils.merged_humanize((first_coord, col_low, row_c2, col_high)),
"raw": sorted(all_raw),
"weekday": utils.weekday_to_num(weekday),
"weeknum": weeknum
}
if len(times) > 0:
obj['times'] = times
# NOTE(review): without a pair-number column the whole day entry is replaced by this one
# event, so an earlier event of the same day would presumably be lost. Confirm intent.
if pair_pos_col is None:
slots[w] = obj
else:
today[pair_name] = obj
# here may be a empty all_raw
clean_state()
first_coord = None
# Last row of the weekday block: drop whatever is still accumulated.
if row_c2 >= weekday_mr.high.row:
clean_state()
pprint("Last for weekday")
row_c2 += 1
# Jump over the whole weekday block and count it.
row_c1 += weekday_mr.height()
weekcycles += 1
def parse_groups(reader: "ExcelSheetReader", head, monday: Coord, head_rx):
def parse_groups(reader: "ExcelSheetReader", head, col_start, head_rx):
"""Распознать список групп и метаданные к ним, по сути получить список названий группы и координат её верхнего header-а (AQ6:AT6)"""
groups = {}
i = 0
@@ -296,21 +390,26 @@ def parse_groups(reader: "ExcelSheetReader", head, monday: Coord, head_rx):
x = head[i]
pprint(f"while i={i} head[i]={x}")
merged = reader.get_merged_coord(Coord(head_rx, i))
if i > monday.col + 1:
if merged is None or x == "":
break
if merged.width() != 4:
pprint(f"WARNING: group header witdh !=4 (found: {merged.width()}); blocks !=4 not supported by parser.")
if i >= col_start:
if merged is None or x == "" or x is None:
break
name = utils.unspace(x)
groups[name] = {
"name": name,
"position": [head_rx, i],
"position_human": utils.merged_humanize(merged.as_numbers()),
"slots": {}
}
skip = False
if "-" not in name:
for x in bad_group_names:
if x in name.lower():
skip = True
pprint(f"Skip groupname {name} because not dash in name and in blacklist")
if not skip:
groups[name] = {
"name": name,
"position": [head_rx, i],
"width": merged.width(),
"position_human": utils.merged_humanize(merged.as_numbers()),
"slots": {}
}
if merged is None:
i += 1