Files
VSTU_Schedule_Parser/parser.py
FazziCLAY ef430e6232
All checks were successful
Build and Run VSTU Schedule Parser / build_and_run (push) Successful in 19s
fix: .strip() weekday_key_name
2026-04-05 22:23:04 +03:00

428 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright Stanislav Mironov
# Labels of the academic-hour pairs as they are written in the sheet.
PAIR_NUMS = [
    "1-2",
    "3-4",
    "5-6",
    "7-8",
    "9-10",
    "11-12",
    "13-14",
    "15-16",
]

# Lower-case prefixes used to recognise a weekday cell (Monday .. Sunday).
WEEKDAYS_STARTSWITH = [
    "понед",
    "вторник",
    "среда",
    "четверг",
    "пятница",
    "суббота",
    "воскр",
]

# Month-name fragments: a header cell without a dash that contains one of
# these is skipped by parse_groups instead of being treated as a group name.
BAD_GROUP_NAMES = [
    "янв", "февр", "март", "апр",
    "май", "сент", "окт", "ноя",
    "дек", "июнь", "июль", "авг",
]
from datetime import time
import json
from coord import Coord, Merged
from translations import ExcelSheetReader
import utils
from collections import defaultdict
# Master switch for the debug output produced through pprint().
LOGGING = False


def pprint(*args, **kwargs):
    """Forward the arguments to print() only while LOGGING is truthy.

    Note: despite the name this is unrelated to the standard ``pprint`` module.
    """
    if not LOGGING:
        return
    print(*args, **kwargs)
def is_weeknum(text):
    """Return True when *text* begins with one of WEEKDAYS_STARTSWITH.

    Despite its name, the function recognises weekday names, not week numbers.
    Before matching, the text is stripped, ASCII spaces are removed and the
    result is lower-cased.

    Non-string values (for example ``None`` from an empty cell or a number)
    are never weekdays, so they yield False instead of raising AttributeError.
    """
    if not isinstance(text, str):
        return False
    # Normalise once instead of once per candidate prefix.
    normalized = text.strip().replace(" ", "").lower()
    return normalized.startswith(tuple(WEEKDAYS_STARTSWITH))
def is_pair(text):
    """Return True when *text* begins with one of the PAIR_NUMS labels.

    Before matching, the text is stripped, ASCII spaces are removed and the
    result is lower-cased.

    Non-string values (for example ``None`` from an empty cell or a number)
    yield False instead of raising AttributeError.
    """
    if not isinstance(text, str):
        return False
    # Normalise once instead of once per candidate label.
    normalized = text.strip().replace(" ", "").lower()
    return normalized.startswith(tuple(PAIR_NUMS))
class Parser:
    """Extract groups, per-weekday events and calendar metadata from one sheet.

    Results are accumulated on the instance: ``groups``, ``features``,
    ``week_keys_metadata``, ``raw_no_schedule``, ``parser_error`` and
    ``parser_warnings``.
    """

    # Substrings that mark the row right above the first weekday as an
    # informational row rather than the row holding the group names.
    _INFO_ROW_MARKERS = (
        "1 неделя",
        "1 НЕДЕЛЯ",
        "2 неделя",
        "2 НЕДЕЛЯ",
        "ИЗМЕНЕНИЯ",
        "изменения",
        "vtf-vstu.ru",
    )

    def __init__(self, reader: "ExcelSheetReader"):
        """Remember *reader* and initialise empty result containers."""
        self.reader = reader
        self.groups = {}  # groups that were parsed successfully
        self.features = set()  # layout features detected on this sheet
        self.week_keys_metadata = {}  # calendar: week key -> month -> day numbers
        # [min, max] inclusive range of rows that contain the schedule
        self.schedule_range_row = None
        # everything outside schedule_range_row ends up here
        self.raw_no_schedule = []
        # week key -> set of merged weekday cells; not JSON serialisable,
        # used only as input for week_keys_metadata
        self.weeknums: defaultdict = defaultdict(set)
        self.parser_error = None  # fatal parser error, if any
        self.parser_warnings = []  # non-fatal parser warnings
        pprint("Parser created for '{0}'".format(reader.info()))

    def parse(self):
        """Parse the whole sheet and fill the result attributes.

        When no weekday cell is found, ``parser_error`` is set, the sheet is
        dumped into ``raw_no_schedule`` and the method returns early.

        Raises:
            Exception: if the first weekday sits on the very first row, so
                there is no row above it that could hold the group names.
        """
        first_weekday = self.reader.find_any(WEEKDAYS_STARTSWITH, startswith=True, nospace=True)
        if first_weekday is None:
            self.features.add("no_weekdays")
            print(" -- Failed parse! -- ")
            print("дни недели не найдены!")
            self.parser_error = f"{WEEKDAYS_STARTSWITH} ни один найден в таблице. Дня недели нет."
            self.parse_raw_no_schedule()
            return

        pair_num_any = self.reader.find_any(PAIR_NUMS, nospace=True)
        head_rx = first_weekday.row - 1  # the row above the first weekday
        group_col_start = first_weekday.col + 2
        if pair_num_any is None:
            self.features.add("no_pair_numeration")
            self.parser_warnings.append(f"Нет нумерации академических часов {PAIR_NUMS}")
        else:
            self.features.add("pair_numeration")
            # More than one column between the weekday and the pair number:
            # the group columns then start right after the pair-number column.
            if pair_num_any.col - first_weekday.col > 1:
                self.features.add("weekdays_before_calendar")
                group_col_start = pair_num_any.col + 1

        if head_rx < 0:
            raise Exception("head_rx < 0: Программа пыталась найти день недели, но по всей видимости не нашла.")

        head = self.reader.get_row_values(head_rx)  # whole row (months, groups)
        pprint(f"head={head}")
        head_joined = " ||| ".join([v for v in head if isinstance(v, str) and v.strip()])
        print(head_joined)
        if not head_joined or any(marker in head_joined for marker in self._INFO_ROW_MARKERS):
            # The row is blank or informational: keep its text and take the
            # row one step higher as the header instead.
            head_rx -= 1
            self.raw_no_schedule.append(head_joined)
            head = self.reader.get_row_values(head_rx)
            pprint(f"head (upper)={head}")
            self.features.add("post_groups_info_row")

        self.groups = parse_groups(self.reader, head, group_col_start, head_rx)
        if LOGGING:
            # Guarded so the JSON dump is not built when logging is off.
            pprint(f'self.groups={json.dumps(self.groups, indent=2, ensure_ascii=False)}')
        pprint("\n\n\n")

        pair_col = pair_num_any.col if pair_num_any is not None else None
        for group in self.groups.values():
            pprint("\nSTART OF PROCESS GROUP\n")
            self.process_group(group, first_weekday, pair_col)
            pprint("\nEND OF PROCESS GROUP\n")

        # Calendar metadata is anchored at the top-most / left-most group header.
        positions = [group["position"] for group in self.groups.values()]
        if positions:
            pprint("Process weekmetadatas!")
            self.process_weekmetadatas(
                Coord(
                    row=min(pos[0] for pos in positions),
                    col=min(pos[1] for pos in positions),
                )
            )

        # Collect everything outside the schedule (approval stamps and so on).
        self.parse_raw_no_schedule()

    def parse_raw_no_schedule(self):
        """Append every non-empty row outside ``schedule_range_row`` to ``raw_no_schedule``.

        Only non-blank string values of a row are kept. When no schedule range
        is known, it is set to a far-away sentinel so that every row counts as
        "outside".
        """
        if self.schedule_range_row is None:
            self.schedule_range_row = [999999999, 999999999]  # far, far away
        first_row, last_row = self.schedule_range_row[0], self.schedule_range_row[1]
        row_count = self.reader.get_row_count()
        row = 0
        while row < row_count:
            if first_row <= row <= last_row:
                # Jump over the schedule and re-check the loop bound: the
                # schedule may reach the last row of the sheet.
                row = last_row + 1
                continue
            row_values = [
                value
                for value in self.reader.get_row_values(row)
                if isinstance(value, str) and value.strip()
            ]
            if row_values:
                self.raw_no_schedule.append(row_values)
            row += 1

    def process_weekmetadatas(self, first_group: "Coord"):
        """Build ``week_keys_metadata`` (the calendar) for every collected week key.

        For each week key the single merged weekday cell is taken, and every
        non-empty cell to the left of *first_group* on its row is treated as a
        month header; the values found below it, on the rows of the weekday
        cell, are stored under that month name.
        """
        for week_key, merged_cells in self.weeknums.items():
            pprint(week_key)
            count = len(merged_cells)
            if count != 1:
                self.week_keys_metadata[week_key] = {
                    "error": True,
                    "error_text": f"Parse error: count of found '{week_key}' (need view like WEEKDAY_1; weekday - in r; 1 - weeknum[1, 2]) is {count}; required only one!"
                }
                self.parser_warnings.append(f"Processing weekmetadata for '{week_key}' failed because count of uniqie merged cells not one (actual: {count}). :<")
                continue

            # Peek at the only element without removing it from self.weeknums.
            weekday_merged: Merged = next(iter(merged_cells))
            if weekday_merged.width() != 1:
                self.week_keys_metadata[week_key] = {
                    "error": True,
                    "error_text": f"Weekday excel block width != 1 (actual {weekday_merged.width()})"
                }
                self.parser_warnings.append(f"Processing weekmetadata for '{week_key}' failed because weekday excel block width != 1 (actual {weekday_merged.width()})")
                continue

            month_row = first_group.row
            # Walk left from the first group column down to column 0.
            for curr_col in range(first_group.col - 1, -1, -1):
                month_pos = Coord(month_row, curr_col)
                month_cell = month_pos.cell(self.reader)
                if month_cell.is_empty():
                    pprint("month cell is empty")
                    continue
                month_name = str(month_cell.value).strip()
                pprint(month_cell)
                all_nums_of_month = utils.parse_all_dirt(
                    self.reader,
                    month_pos.replace(row=weekday_merged.low.row),
                    right=1,
                    down=weekday_merged.height(),
                )
                pprint(f"all_nums_of_month={all_nums_of_month}")
                days = self.week_keys_metadata.setdefault(week_key, {}).setdefault(month_name, [])
                for raw_value in all_nums_of_month:
                    text = str(raw_value)
                    if text.lower() == month_name.lower():
                        pprint(f"Skip {raw_value} month number because it == month_name")
                        continue
                    # Drop only a trailing ".0" (floats read from the sheet) and
                    # de-duplicate on the normalised form that is stored.
                    normalized = text[:-2] if text.endswith(".0") else text
                    if normalized not in days:
                        days.append(normalized)

    def push_weekday_meta(self, weekday: str, weeknum: int, week_key_name: str, merged: "Merged"):
        """Remember the merged weekday cell *merged* under *week_key_name*."""
        self.weeknums[week_key_name].add(merged)

    def row_with_schedule_notify(self, row_coord):
        """Extend ``schedule_range_row`` so that it includes row *row_coord*."""
        if self.schedule_range_row is None:
            self.schedule_range_row = [row_coord, row_coord]
        if self.schedule_range_row[1] < row_coord:
            self.schedule_range_row[1] = row_coord
        if self.schedule_range_row[0] > row_coord:
            self.schedule_range_row[0] = row_coord

    def _smart_insert(self, first_dict, key, to_insert):
        """Store *to_insert* under *key*, growing the slot into a list on collision.

        A missing or empty-dict slot is replaced by *to_insert*; a slot that
        already holds one object becomes ``[old, to_insert]``; a list slot is
        appended to.
        """
        existing = first_dict.setdefault(key, {})
        if isinstance(existing, dict):
            first_dict[key] = [existing, to_insert] if existing else to_insert
        elif isinstance(existing, list):
            existing.append(to_insert)
        else:
            self.parser_warnings.append("Wtf? first_dict[key] not is dict and not is list??? (internal error)")

    def process_group(self, group: dict, first_weekday: "Coord", pair_pos_col):
        """Walk down the column of one group and fill ``group['slots']`` with events.

        Called once per group after parse_groups. Example of *group*:
        ``{'name': 'ИВТ-260', 'position': [5, 6], 'position_human': 'G6:J6'}``
        (plus ``'width'`` and ``'slots'``).

        Args:
            group: group record produced by parse_groups; mutated in place.
            first_weekday: coordinate of the first weekday cell; only its
                column is used to locate the weekday of every row.
            pair_pos_col: column holding the pair numbers, or None when the
                sheet has no pair numeration.
        """
        pprint(f"process_group group={group}")
        group_name = group['name']
        pprint(f"Имя группы: {group_name}")
        header_row = group['position'][0]
        header_col = group['position'][1]
        width = group['width']
        last_group_col = header_col + width - 1
        self.row_with_schedule_notify(header_row)

        # Per-weekday accumulation state; (re)initialised for every weekday
        # and reset by clean_state() after every finished event.
        event_no = 1
        is_widely = False
        override_col_range = None
        all_raw = set()
        pairs = set()
        times = []
        first_coord = None

        def clean_state():
            """Start accumulating a new event."""
            nonlocal is_widely, override_col_range, event_no, all_raw, pairs, times, first_coord
            is_widely = False
            override_col_range = None
            event_no += 1
            all_raw = set()
            pairs = set()
            first_coord = None
            times = []

        row_c1 = header_row + 1  # current row, starting right below the header
        weeknum = 1  # switches to 2 after the first gap between weekday blocks
        weekcycles = 0  # number of weekday blocks processed so far
        while row_c1 < self.reader.get_row_count():
            pos_c1 = Coord(row_c1, header_col)  # top-left corner of the current cell
            self.row_with_schedule_notify(pos_c1.row)
            if pos_c1.cell(self.reader).is_nospace_nocase_same(group_name):
                pprint("Ended with grpup name; stop moving down, break")
                break

            weekday_pos = pos_c1.replace(col=first_weekday.col)
            weekday_cell = weekday_pos.cell(self.reader)
            weekday_mr = self.reader.get_merged_coord(weekday_pos)
            weekday = weekday_cell.value
            # Empty or non-string cells cannot be a weekday; skip them instead
            # of crashing inside is_weeknum.
            if not isinstance(weekday, str) or not is_weeknum(weekday):
                row_c1 += 1
                pprint("Not weeknum!")
                if weekcycles > 0 and weeknum != 2:
                    pprint("Weeknum now 2")
                    weeknum = 2
                continue

            pprint(weekday)
            # Strip the name itself so that trailing whitespace in the cell
            # does not end up in front of the suffix.
            weekday_key_name = weekday.strip() + ("_1" if weeknum == 1 else "_2")
            self.push_weekday_meta(weekday, weeknum, weekday_key_name, weekday_mr)

            # fresh state for this weekday
            event_no = 1
            is_widely = False
            override_col_range = None
            all_raw = set()
            pairs = set()
            times = []
            first_coord = None

            row_c2 = row_c1
            while row_c2 <= weekday_mr.high.row:
                # current position inside the group and inside the weekday
                pos_c2 = Coord(row_c2, header_col)
                mr_c2 = self.reader.get_merged_coord(pos_c2)
                if first_coord is None:
                    first_coord = pos_c2.row

                pair_num_mr = None
                if pair_pos_col is not None:
                    pair_num_mr = self.reader.get_merged_coord(pos_c2.replace(col=pair_pos_col))

                # A merged cell that sticks out of the group's own columns is
                # recorded as "wide" (reported as is_flow) for this event.
                if (not is_widely) and (mr_c2.low.col < header_col or mr_c2.high.col > last_group_col):
                    is_widely = True
                    override_col_range = (mr_c2.low.col, mr_c2.high.col)

                col_low = header_col
                col_high = last_group_col
                if override_col_range is not None:
                    col_low = min(col_low, override_col_range[0])
                    col_high = max(col_high, override_col_range[1])

                dirty_line = utils.parse_all_dirt(
                    self.reader,
                    Coord(row_c2, col_low),
                    (col_high - col_low + 1),
                    1,
                    with_cells=True,
                )
                if len(dirty_line) > 0:
                    if pair_num_mr is not None:
                        raw_pair = pair_num_mr.cell(self.reader).value
                        # The cell may be empty or hold a non-string value.
                        pair_num_to_add = "" if raw_pair is None else str(raw_pair).replace(" ", "").strip()
                        if len(pair_num_to_add) == 0:
                            pair_num_to_add = "???"
                            pprint("Составители эксельки? Вы почему не указали номер пары ёклмн")
                        pairs.add(pair_num_to_add)
                    for cell in dirty_line:
                        if not cell.is_time:
                            all_raw.add(str(cell.value))
                        else:
                            times.append(str(cell.value))

                # The event ends on a row that has a bottom border and is the
                # last row of the merged cell at this position.
                if not utils.has_no_bottom_border(self.reader, pos_c2) and not (mr_c2.high.row - row_c2 > 0):
                    if all_raw:
                        pprint(f"{event_no} {pairs}: {'[wide] ' if is_widely else ''} raw={all_raw}")
                        slots = group['slots']
                        if weekday_key_name not in slots:
                            slots[weekday_key_name] = {}
                        pair_name = min(pairs, default="????")
                        obj = {
                            "object": "event",
                            "pairs": sorted(pairs),
                            "is_flow": is_widely,
                            "excel_range": utils.merged_humanize((first_coord, col_low, row_c2, col_high)),
                            "raw": sorted(all_raw),
                            "weekday": utils.weekday_to_num(weekday),
                            "weeknum": weeknum
                        }
                        if len(times) > 0:
                            obj['times'] = times
                        if pair_pos_col is None:
                            self._smart_insert(slots, weekday_key_name, obj)
                        else:
                            self._smart_insert(slots[weekday_key_name], pair_name, obj)
                    # all_raw may have been empty here; reset the state anyway
                    clean_state()

                if row_c2 >= weekday_mr.high.row:
                    clean_state()
                    pprint("Last for weekday")
                row_c2 += 1

            row_c1 += weekday_mr.height()
            weekcycles += 1
def parse_groups(reader: "ExcelSheetReader", head, col_start, head_rx):
    """Recognise the groups listed in the header row.

    Walks *head* (the values of row *head_rx*) from left to right, stepping
    over merged cells as a whole. Columns before *col_start* are only stepped
    over. From *col_start* on, every header cell becomes a group unless its
    name has no dash and contains a BAD_GROUP_NAMES fragment. The scan stops
    at the first cell that is empty or has no merged range.

    Returns a dict keyed by the lower-cased group name; every record holds the
    name, the header position, its width, a human-readable range (such as
    AQ6:AT6) and an empty ``slots`` dict.
    """
    found = {}
    total = len(head)
    col = 0
    while col < total:
        header_value = head[col]
        pprint(f"while i={col} head[i]={header_value}")
        span = reader.get_merged_coord(Coord(head_rx, col))
        if col >= col_start:
            if span is None or header_value == "" or header_value is None:
                break
            name = utils.unspace(header_value)
            lowered = name.lower()
            blacklisted = False
            if "-" not in name:
                for fragment in BAD_GROUP_NAMES:
                    if fragment in lowered:
                        blacklisted = True
                        pprint(f"Skip groupname {name} because not dash in name and in blacklist")
            if not blacklisted:
                found[lowered] = {
                    "name": name,
                    "position": [head_rx, col],
                    "width": span.width(),
                    "position_human": utils.merged_humanize(span.as_numbers()),
                    "slots": {}
                }
        # Step over the whole merged cell, or a single column when unmerged.
        col += 1 if span is None else span.width()
    return found