All checks were successful
Build and Run VSTU Schedule Parser / build_and_run (push) Successful in 19s
428 lines
19 KiB
Python
428 lines
19 KiB
Python
# Copyright Stanislav Mironov
|
||
|
||
# Labels of the numbered lesson slots ("pairs") looked up in the sheet.
# Used by is_pair() and by Parser.parse() to locate the pair-number column.
PAIR_NUMS = [
    "1-2", "3-4", "5-6", "7-8", "9-10", "11-12", "13-14", "15-16"
]

# Lower-case prefixes of the Russian weekday names (Monday..Sunday).
# Cell text is matched with startswith() after removing spaces, so the
# abbreviated entries ("понед", "воскр") also match the full words.
WEEKDAYS_STARTSWITH = [
    "понед",
    "вторник",
    "среда",
    "четверг",
    "пятница",
    "суббота",
    "воскр"
]

# Lower-case fragments of month names. parse_groups() skips a header cell as
# "not a group" when it has no dash and contains one of these fragments.
BAD_GROUP_NAMES = [
    "янв", "февр", "март", "апр", "май", "сент", "окт", "ноя", "дек", "июнь", "июль", "авг"
]
|
||
|
||
from datetime import time
|
||
import json
|
||
from coord import Coord, Merged
|
||
from translations import ExcelSheetReader
|
||
import utils
|
||
from collections import defaultdict
|
||
|
||
# Module-wide switch for verbose tracing; while it is False, pprint() is silent.
LOGGING = False


def pprint(*args, **kwargs):
    """Forward all arguments to print(), but only when LOGGING is enabled."""
    if not LOGGING:
        return
    print(*args, **kwargs)
|
||
|
||
def is_weeknum(text):
    """Return True when a cell value starts with a weekday name.

    Despite the name, this checks for a weekday, not a week number: the
    value is stripped, spaces are removed, it is lower-cased and then
    compared against the prefixes in WEEKDAYS_STARTSWITH.

    Non-string values (for example None or a number from an empty or
    numeric cell) are never weekdays, so they yield False instead of
    raising AttributeError.
    """
    if not isinstance(text, str):
        return False
    # Normalise once instead of once per candidate prefix.
    normalized = text.strip().replace(" ", "").lower()
    return normalized.startswith(tuple(WEEKDAYS_STARTSWITH))
|
||
|
||
def is_pair(text):
    """Return True when a cell value starts with a pair label from PAIR_NUMS.

    The value is stripped, spaces are removed and it is lower-cased before
    the prefix comparison.

    Non-string values (for example None or a number from an empty or
    numeric cell) yield False instead of raising AttributeError.
    """
    if not isinstance(text, str):
        return False
    # Normalise once instead of once per candidate label.
    normalized = text.strip().replace(" ", "").lower()
    return normalized.startswith(tuple(PAIR_NUMS))
|
||
|
||
class Parser:
    """Parse one schedule worksheet into groups, events and calendar metadata.

    Results are accumulated on the instance:

    * ``groups`` - groups found in the header row (see ``parse_groups``),
      each with a ``slots`` dict filled by ``process_group``;
    * ``features`` - string flags describing the layout of this sheet;
    * ``week_keys_metadata`` - the small calendar, keyed by "<weekday>_<weeknum>";
    * ``raw_no_schedule`` - text found outside of the schedule rows;
    * ``parser_error`` / ``parser_warnings`` - diagnostics.
    """

    # Substrings telling that the row right above the first weekday is an
    # informational row, so the group headers are one row higher.
    _INFO_ROW_MARKERS = (
        "1 неделя", "1 НЕДЕЛЯ", "2 неделя", "2 НЕДЕЛЯ",
        "ИЗМЕНЕНИЯ", "изменения", "vtf-vstu.ru",
    )

    def __init__(self, reader: "ExcelSheetReader"):
        """Create an empty parser bound to ``reader`` (one sheet)."""
        self.reader = reader
        self.groups = {}  # groups that were parsed successfully
        self.features = set()  # layout features of this sheet
        self.week_keys_metadata = {}  # the small calendar
        # [min, max] inclusive range of ROWS that hold the schedule
        # (the original comment said "col", but only row indices are stored).
        self.schedule_range_row = None
        # everything outside schedule_range_row goes here ('СОГЛАСОВАНО:', etc.)
        self.raw_no_schedule = []

        # Not JSON-serialisable; input for week_keys_metadata.
        self.weeknums: defaultdict = defaultdict(set)
        self.parser_error = None  # error text set before an early exit
        self.parser_warnings = []  # non-fatal parser warnings
        pprint(f"Parser created for '{reader.info()}'")

    def parse(self):
        """Run the whole parsing pipeline for the sheet.

        Steps: locate the first weekday cell and the pair-number column,
        find the group header row above the first weekday, parse the groups,
        process every group, build the calendar metadata and finally collect
        the text outside of the schedule rows.

        Raises:
            Exception: when there is no row above the first weekday (or above
                the info row) that could hold the group headers.
        """
        first_weekday = self.reader.find_any(WEEKDAYS_STARTSWITH, startswith=True, nospace=True)

        if first_weekday is None:
            self.features.add("no_weekdays")
            print(" -- Failed parse! -- ")
            print("дни недели не найдены!")
            self.parser_error = f"{WEEKDAYS_STARTSWITH} ни один найден в таблице. Дня недели нет."
            self.parse_raw_no_schedule()
            return

        # Distinctive traits of the different grid layouts are recorded in
        # self.features.
        pair_num_any = self.reader.find_any(PAIR_NUMS, nospace=True)
        group_col_start = first_weekday.col + 2
        if pair_num_any is None:
            self.features.add("no_pair_numeration")
            self.parser_warnings.append(f"Нет нумерации академических часов {PAIR_NUMS}")
        else:
            self.features.add("pair_numeration")
            if pair_num_any.col - first_weekday.col > 1:
                # Something sits between the weekday and pair-number columns,
                # so groups start right after the pair-number column.
                self.features.add("weekdays_before_calendar")
                group_col_start = pair_num_any.col + 1

        head_rx = first_weekday.row - 1  # the row above the first weekday
        if head_rx < 0:
            raise Exception("head_rx < 0: Программа пыталась найти день недели, но по всей видимости не нашла.")

        head = self.reader.get_row_values(head_rx)  # whole row (months, groups)
        pprint(f"head={head}")

        head_joined = " ||| ".join([v for v in head if isinstance(v, str) and v.strip()])
        print(head_joined)
        if not head_joined or any(marker in head_joined for marker in self._INFO_ROW_MARKERS):
            # This is an info row; the real header row is one row higher.
            head_rx -= 1
            if head_rx < 0:
                # Without this check a negative index would be handed to the
                # reader and to every group position.
                raise Exception("head_rx < 0: no row above the info row can hold the group headers.")
            self.raw_no_schedule.append(head_joined)
            head = self.reader.get_row_values(head_rx)  # whole row (months, groups)
            pprint(f"head (upper)={head}")
            self.features.add("post_groups_info_row")

        self.groups = parse_groups(self.reader, head, group_col_start, head_rx)
        pprint(f'self.groups={json.dumps(self.groups, indent=2, ensure_ascii=False)}')

        pprint("\n\n\n")

        pair_col = pair_num_any.col if pair_num_any else None
        for group in self.groups.values():
            pprint("\nSTART OF PROCESS GROUP\n")
            self.process_group(group, first_weekday, pair_col)
            pprint("\nEND OF PROCESS GROUP\n")

        # Calendar metadata: anchored at the smallest row and the smallest
        # column among all group header positions.
        if self.groups:
            positions = [g['position'] for g in self.groups.values()]
            group_min_row = min(p[0] for p in positions)
            group_min_col = min(p[1] for p in positions)
            pprint("Process weekmetadatas!")
            self.process_weekmetadatas(Coord(row=group_min_row, col=group_min_col))

        # Text outside of the schedule (approvals and so on).
        self.parse_raw_no_schedule()

    def parse_raw_no_schedule(self):
        """Collect the text of every row outside ``schedule_range_row``.

        For each such row the non-blank string values are appended to
        ``raw_no_schedule`` as one list. When no schedule row was ever
        registered, ``schedule_range_row`` is set to a far-away sentinel so
        that all rows are collected.
        """
        if self.schedule_range_row is None:
            self.schedule_range_row = [999999999, 999999999]  # far, far away

        first_row, last_row = self.schedule_range_row
        for row in range(self.reader.get_row_count()):
            # Skip the schedule rows. Iterating over range() also guarantees
            # that no row past the end of the sheet is read when the schedule
            # reaches the last row.
            if first_row <= row <= last_row:
                continue

            texts = [v for v in self.reader.get_row_values(row) if isinstance(v, str) and v.strip()]
            if texts:
                self.raw_no_schedule.append(texts)

    def process_weekmetadatas(self, first_group: "Coord"):
        """Build the small calendar (``week_keys_metadata``).

        For every key collected in ``weeknums`` exactly one merged weekday
        block of width 1 is required; otherwise an error entry is stored for
        the key and a warning is recorded. For a valid block, the cells to
        the left of ``first_group`` in its row are treated as month names and
        the values found under each month, within the weekday block's rows,
        are stored as ``week_keys_metadata[key][month_name]``.
        """
        for key, merged_cells in self.weeknums.items():
            pprint(key)
            count = len(merged_cells)
            if count != 1:
                self.week_keys_metadata[key] = {
                    "error": True,
                    "error_text": f"Parse error: count of found '{key}' (need view like WEEKDAY_1; weekday - in r; 1 - weeknum[1, 2]) is {count}; required only one!"
                }
                self.parser_warnings.append(f"Processing weekmetadata for '{key}' failed because count of uniqie merged cells not one (actual: {count}). :<")
                continue

            # Peek at the only element instead of pop(), so that
            # self.weeknums is not emptied as a side effect.
            weekday_merged: Merged = next(iter(merged_cells))
            if weekday_merged.width() != 1:
                self.week_keys_metadata[key] = {
                    "error": True,
                    "error_text": f"Weekday excel block width != 1 (actual {weekday_merged.width()})"
                }
                self.parser_warnings.append(f"Processing weekmetadata for '{key}' failed because weekday excel block width != 1 (actual {weekday_merged.width()})")
                continue

            month_row = first_group.row
            # Walk left from the column just before the first group.
            for curr_col in range(first_group.col - 1, -1, -1):
                month_pos = Coord(month_row, curr_col)
                month_cell = month_pos.cell(self.reader)
                if month_cell.is_empty():
                    pprint("month cell is empty")
                    continue

                month_name = str(month_cell.value).strip()
                pprint(month_cell)
                all_nums_of_month = utils.parse_all_dirt(self.reader, month_pos.replace(row=weekday_merged.low.row), right=1, down=weekday_merged.height())
                pprint(f"all_nums_of_month={all_nums_of_month}")

                days = self.week_keys_metadata.setdefault(key, {}).setdefault(month_name, [])
                for raw_value in all_nums_of_month:
                    text = str(raw_value)
                    if text.lower() == month_name.lower():
                        pprint(f"Skip {raw_value} month number because it == month_name")
                        continue

                    # Normalise first ("1.0" -> "1") and de-duplicate on the
                    # normalised value; checking the raw value let the same
                    # day be stored several times.
                    day = text.replace(".0", "")
                    if day not in days:
                        days.append(day)

    def push_weekday_meta(self, weekday: str, weeknum: int, week_key_name: str, merged: "Merged"):
        """Remember the merged weekday block ``merged`` under ``week_key_name``.

        ``weekday`` and ``weeknum`` are accepted but not used here.
        """
        self.weeknums[week_key_name].add(merged)

    def row_with_schedule_notify(self, row_coord):
        """Widen ``schedule_range_row`` so that it includes ``row_coord``.

        Called every time schedule content is detected in the given row.
        """
        bounds = self.schedule_range_row
        if bounds is None:
            self.schedule_range_row = [row_coord, row_coord]
            return

        if bounds[1] < row_coord:
            bounds[1] = row_coord

        if bounds[0] > row_coord:
            bounds[0] = row_coord

    def _insert_event(self, container, key, event):
        """Store ``event`` under ``container[key]``.

        The first event is stored as is, a second one turns the value into a
        two-element list, and further events are appended to that list.
        """
        if key not in container:
            container[key] = {}

        current = container[key]
        if isinstance(current, dict):
            # An empty dict is only a placeholder.
            container[key] = [current, event] if current else event
        elif isinstance(current, list):
            current.append(event)
        else:
            self.parser_warnings.append("Wtf? first_dict[key] not is dict and not is list??? (internal error)")

    def process_group(self, group: dict, first_weekday: "Coord", pair_pos_col):
        """Walk down one group's column and fill ``group['slots']`` with events.

        Runs for every group after ``parse_groups``. Example of ``group``:
        ``{'name': 'ИВТ-260', 'position': [5, 6], 'position_human': 'G6:J6'}``
        (the ``width`` and ``slots`` keys are read as well).

        Args:
            group: group description produced by ``parse_groups``.
            first_weekday: coordinate of the first weekday cell; only its
                column is used, to read the weekday of every row.
            pair_pos_col: column with the pair numbers, or None when the
                sheet has no pair numeration.
        """
        pprint(f"process_group group={group}")
        group_name = group['name']
        pprint(F"Имя группы: {group_name}")
        # Row counter of the outer loop; +1 moves below the group header.
        row_c1 = group['position'][0] + 1
        self.row_with_schedule_notify(group['position'][0])
        group_header_pos = Coord(group['position'][0], group['position'][1])
        width = group['width']
        weeknum = 1  # switches to 2 once a non-weekday row follows a processed weekday

        # State of the event that is currently being accumulated.
        event_no = 1
        is_widely = False
        override_col_range = None
        all_raw = set()
        pairs = set()
        times = []
        first_coord = None

        def reset_event_state():
            """Start accumulating the next event from scratch."""
            nonlocal is_widely, override_col_range, event_no, all_raw, pairs, times, first_coord
            is_widely = False
            override_col_range = None
            event_no += 1
            all_raw = set()
            pairs = set()
            first_coord = None
            times = []

        weekcycles = 0
        while row_c1 < self.reader.get_row_count():
            pos_c1 = Coord(row_c1, group['position'][1])  # current position, top-left corner (= low)
            self.row_with_schedule_notify(pos_c1.row)

            if pos_c1.cell(self.reader).is_nospace_nocase_same(group_name):
                pprint("Ended with grpup name; stop moving down, break")
                break

            weekday_pos = pos_c1.replace(col=first_weekday.col)
            weekday_cell = weekday_pos.cell(self.reader)
            weekday_mr = self.reader.get_merged_coord(weekday_pos)
            weekday = weekday_cell.value

            if not is_weeknum(weekday):
                row_c1 += 1
                pprint("Not weeknum!")
                if weekcycles > 0 and weeknum != 2:
                    pprint("Weeknum now 2")
                    weeknum = 2
                continue

            pprint(weekday)
            weekday_key_name = (str(weekday) + ("_1" if weeknum == 1 else "_2")).strip()
            self.push_weekday_meta(weekday, weeknum, weekday_key_name, weekday_mr)

            # Fresh state for this weekday block.
            event_no = 1
            is_widely = False
            override_col_range = None
            all_raw = set()
            pairs = set()
            times = []
            first_coord = None

            row_c2 = row_c1
            while row_c2 <= weekday_mr.high.row:
                # Current position inside the group and the weekday block.
                pos_c2 = Coord(row_c2, group['position'][1])
                mr_c2 = self.reader.get_merged_coord(pos_c2)

                if first_coord is None:
                    first_coord = pos_c2.row

                pair_num_mr = None
                if pair_pos_col is not None:
                    pair_num = pos_c2.replace(col=pair_pos_col)
                    pair_num_mr = self.reader.get_merged_coord(pair_num)

                # A merged cell reaching outside the group's own columns is
                # reported as "is_flow" in the event.
                if (not is_widely) and (mr_c2.low.col < group_header_pos.col or mr_c2.high.col > group_header_pos.col + width - 1):
                    is_widely = True
                    override_col_range = (mr_c2.low.col, mr_c2.high.col)

                col_low = group_header_pos.col
                col_high = group_header_pos.col + width - 1
                if override_col_range is not None:
                    col_low = min(col_low, override_col_range[0])
                    col_high = max(col_high, override_col_range[1])

                dirty_line = utils.parse_all_dirt(self.reader, Coord(row_c2, col_low), (col_high - col_low + 1), 1, with_cells=True)
                if len(dirty_line) > 0:
                    if pair_num_mr is not None:
                        # The cell may be empty or numeric, so convert before
                        # calling the string methods.
                        raw_pair = pair_num_mr.cell(self.reader).value
                        pair_num_to_add = "" if raw_pair is None else str(raw_pair).replace(" ", "").strip()
                        if len(pair_num_to_add) == 0:
                            pair_num_to_add = "???"
                            pprint("Составители эксельки? Вы почему не указали номер пары ёклмн")
                        pairs.add(pair_num_to_add)

                    for cell in dirty_line:
                        if not cell.is_time:
                            all_raw.add(str(cell.value))
                        else:
                            dt: time = cell.value
                            times.append(str(dt))

                # The event ends on a row that has a bottom border and is the
                # last row of the merged cell at this position.
                if not utils.has_no_bottom_border(self.reader, pos_c2) and not (mr_c2.high.row - row_c2 > 0):
                    if all_raw:
                        pprint(f"№{event_no} {pairs}: {'[wide] ' if is_widely else ''} raw={all_raw}")

                        slots = group['slots']
                        w = weekday_key_name
                        if w not in slots:
                            slots[w] = {}

                        pair_name = min(pairs, default="????")

                        obj = {
                            "object": "event",
                            "pairs": sorted(pairs),
                            "is_flow": is_widely,
                            "excel_range": utils.merged_humanize((first_coord, col_low, row_c2, col_high)),
                            "raw": sorted(all_raw),
                            "weekday": utils.weekday_to_num(weekday),
                            "weeknum": weeknum
                        }
                        if len(times) > 0:
                            obj['times'] = times

                        if pair_pos_col is None:
                            self._insert_event(slots, w, obj)
                        else:
                            self._insert_event(slots[w], pair_name, obj)

                    # all_raw may have been empty here; reset in any case.
                    reset_event_state()

                if row_c2 >= weekday_mr.high.row:
                    reset_event_state()
                    pprint("Last for weekday")

                row_c2 += 1

            row_c1 += weekday_mr.height()
            weekcycles += 1
|
||
|
||
def parse_groups(reader: "ExcelSheetReader", head, col_start, head_rx):
    """Recognise the groups listed in the header row.

    Walks the values of ``head`` from left to right, advancing by the width
    of the merged cell at each column. Starting from ``col_start`` every
    non-empty header becomes a group; the walk stops at the first empty
    header or at a column without a merged-cell record. Headers that have
    no dash and contain a fragment from BAD_GROUP_NAMES are skipped.

    Returns a dict keyed by the lower-cased group name; each value holds the
    name, the header position ``[head_rx, column]``, the header width, the
    human-readable range (like ``AQ6:AT6``) and an empty ``slots`` dict.
    """
    found = {}
    column = 0
    total = len(head)
    while column < total:
        header_value = head[column]
        pprint(f"while i={column} head[i]={header_value}")
        span = reader.get_merged_coord(Coord(head_rx, column))

        if column >= col_start:
            if span is None or header_value == "" or header_value is None:
                break

            name = utils.unspace(header_value)
            lowered = name.lower()
            blacklisted = False
            if "-" not in name:
                for fragment in BAD_GROUP_NAMES:
                    if fragment in lowered:
                        blacklisted = True
                        pprint(f"Skip groupname {name} because not dash in name and in blacklist")

            if not blacklisted:
                found[lowered] = {
                    "name": name,
                    "position": [head_rx, column],
                    "width": span.width(),
                    "position_human": utils.merged_humanize(span.as_numbers()),
                    "slots": {}
                }

        # Jump over the whole merged header, or one column when there is none.
        column += 1 if span is None else span.width()

    return found
|
||
|