# Copyright Stanislav Mironov
"""Parser for timetable spreadsheets read through ``ExcelSheetReader``.

The sheet is expected to contain a column of weekday names, optionally a
column with pair numbers ("1-2", "3-4", ...), and a header row with group
names.  ``Parser.parse`` fills ``Parser.groups`` with the events found under
every group header and collects everything else as raw text.
"""

import json
from collections import defaultdict
from datetime import time

import utils
from coord import Coord, Merged
from translations import ExcelSheetReader

# Labels used for the numbering of academic hours (pairs).
PAIR_NUMS = [
    "1-2",
    "3-4",
    "5-6",
    "7-8",
    "9-10",
    "11-12",
    "13-14",
    "15-16",
]

# Lowercase prefixes a weekday cell may start with.
WEEKDAYS_STARTSWITH = [
    "понед",
    "вторник",
    "среда",
    "четверг",
    "пятница",
    "суббота",
    "воскр",
]

# Month-name fragments: a header cell without a dash that contains one of
# these is not treated as a group name (see ``parse_groups``).
BAD_GROUP_NAMES = [
    "янв",
    "февр",
    "март",
    "апр",
    "май",
    "сент",
    "окт",
    "ноя",
    "дек",
    "июнь",
    "июль",
    "авг",
]

# Switch for the debug output produced by ``pprint``.
LOGGING = False


def pprint(*args, **kwargs):
    """Forward the arguments to ``print`` only when ``LOGGING`` is enabled."""
    if LOGGING:
        print(*args, **kwargs)


def _starts_with_any(text, prefixes) -> bool:
    """Return True if *text*, stripped of spaces and lowercased, starts with any prefix.

    Non-string values (for example the value of an empty or numeric cell)
    never match instead of raising ``AttributeError``.
    """
    if not isinstance(text, str):
        return False
    normalized = text.strip().replace(" ", "").lower()
    return normalized.startswith(tuple(prefixes))


def is_weeknum(text) -> bool:
    """Return True if *text* starts with one of ``WEEKDAYS_STARTSWITH``."""
    return _starts_with_any(text, WEEKDAYS_STARTSWITH)


def is_pair(text) -> bool:
    """Return True if *text* starts with one of ``PAIR_NUMS``."""
    return _starts_with_any(text, PAIR_NUMS)


class Parser:
    """Extract groups, their events and the week calendar from one sheet."""

    def __init__(self, reader: ExcelSheetReader):
        self.reader = reader
        self.groups = {}  # groups that were parsed successfully
        self.features = set()  # layout features detected on this sheet
        self.week_keys_metadata = {}  # the small calendar (month -> numbers) per week key
        # [min, max] inclusive range of rows that contain the schedule
        self.schedule_range_row = None
        # everything outside schedule_range_row ends up here ('СОГЛАСОВАНО:', etc.)
        self.raw_no_schedule = []
        # week key -> set of merged weekday ranges; not JSON serializable,
        # used as the input for week_keys_metadata
        self.weeknums: defaultdict = defaultdict(set)
        self.parser_error = None  # fatal parser error recorded before returning
        self.parser_warnings = []  # non-fatal parser warnings
        pprint("Parser created for '{0}'".format(reader.info()))

    def parse(self):
        """Parse the whole sheet.

        Fills ``groups``, ``features``, ``week_keys_metadata``,
        ``raw_no_schedule`` and ``parser_warnings``.  When no weekday cell is
        found, ``parser_error`` is set, only the raw rows are collected and
        the method returns early.

        Raises:
            Exception: if the first weekday is located on the first row, so
                there is no header row above it.
        """
        # Distinctive traits of the different sheet layouts.
        col_distance_pair_weekday = None

        first_weekday = self.reader.find_any(WEEKDAYS_STARTSWITH, startswith=True, nospace=True)
        if first_weekday is None:
            self.features.add("no_weekdays")
            print(" -- Failed parse! -- ")
            print("дни недели не найдены!")
            self.parser_error = f"{WEEKDAYS_STARTSWITH} ни один найден в таблице. Дня недели нет."
            self.parse_raw_no_schedule()
            return

        pair_num_any = self.reader.find_any(PAIR_NUMS, nospace=True)
        if pair_num_any is None:
            self.features.add("no_pair_numeration")
            self.parser_warnings.append(f"Нет нумерации академических часов {PAIR_NUMS}")
        else:
            self.features.add("pair_numeration")
            col_distance_pair_weekday = pair_num_any.col - first_weekday.col

        head_rx = first_weekday.row - 1  # the row above the first weekday cell
        group_col_start = first_weekday.col + 2
        if col_distance_pair_weekday is not None:
            if col_distance_pair_weekday > 1:
                # Something (the calendar) sits between weekdays and pair numbers.
                self.features.add("weekdays_before_calendar")
            group_col_start = pair_num_any.col + 1

        if head_rx < 0:
            raise Exception("head_rx < 0: Программа пыталась найти день недели, но по всей видимости не нашла.")

        head = self.reader.get_row_values(head_rx)  # whole row (months, groups)
        pprint(f"head={head}")
        head_joined = " ||| ".join([v for v in head if isinstance(v, str) and v.strip()])
        print(head_joined)

        info_row_markers = (
            "1 неделя",
            "1 НЕДЕЛЯ",
            "2 неделя",
            "2 НЕДЕЛЯ",
            "ИЗМЕНЕНИЯ",
            "изменения",
            "vtf-vstu.ru",
        )
        if any(marker in head_joined for marker in info_row_markers):
            # The row above the weekdays is an info row; groups are one row higher.
            head_rx -= 1
            self.raw_no_schedule.append(head_joined)
            head = self.reader.get_row_values(head_rx)  # whole row (months, groups)
            pprint(f"head (upper)={head}")
            self.features.add("post_groups_info_row")

        self.groups = parse_groups(self.reader, head, group_col_start, head_rx)
        pprint(f'self.groups={json.dumps(self.groups, indent=2, ensure_ascii=False)}')
        pprint("\n\n\n")

        pair_col = pair_num_any.col if pair_num_any else None
        for group in self.groups.values():
            pprint("\nSTART OF PROCESS GROUP\n")
            self.process_group(group, first_weekday, pair_col)
            pprint("\nEND OF PROCESS GROUP\n")

        # Week metadata: anchored at the top-most / left-most group header.
        if self.groups:
            positions = [g['position'] for g in self.groups.values()]
            group_min_row = min(p[0] for p in positions)
            group_min_col = min(p[1] for p in positions)
            pprint("Process weekmetadatas!")
            self.process_weekmetadatas(Coord(row=group_min_row, col=group_min_col))

        # Rows that are not part of the schedule (approvals and the like).
        self.parse_raw_no_schedule()

    def parse_raw_no_schedule(self):
        """Collect the text of every row outside ``schedule_range_row`` into ``raw_no_schedule``."""
        if self.schedule_range_row is None:
            # No schedule rows were seen: use a range far beyond any sheet.
            self.schedule_range_row = [999999999, 999999999]
        first_row, last_row = self.schedule_range_row

        row = 0
        while row < self.reader.get_row_count():
            if first_row <= row <= last_row:
                # Jump over the schedule and re-check the loop condition, so a
                # schedule reaching the last row never reads past the sheet.
                row = last_row + 1
                continue
            texts = [v for v in self.reader.get_row_values(row) if isinstance(v, str) and v.strip()]
            if texts:
                self.raw_no_schedule.append(texts)
            row += 1

    def process_weekmetadatas(self, first_group: "Coord"):
        """Fill ``week_keys_metadata`` from the columns to the left of the groups.

        For every week key collected in ``weeknums`` the columns left of
        *first_group* are walked right-to-left.  A non-empty cell in the
        header row is taken as a month name, and the values found in that
        column within the weekday's rows are stored under it.

        Args:
            first_group: position of the top-most, left-most group header.
        """
        for week_key, merged_set in self.weeknums.items():
            pprint(week_key)
            found = len(merged_set)
            if found != 1:
                self.week_keys_metadata[week_key] = {
                    "error": True,
                    "error_text": f"Parse error: count of found '{week_key}' (need view like WEEKDAY_1; weekday - in r; 1 - weeknum[1, 2]) is {found}; required only one!"
                }
                self.parser_warnings.append(f"Processing weekmetadata for '{week_key}' failed because count of uniqie merged cells not one (actual: {found}). :<")
                continue

            # Read the single element without removing it from self.weeknums.
            weekday_merged: Merged = next(iter(merged_set))
            if weekday_merged.width() != 1:
                self.week_keys_metadata[week_key] = {
                    "error": True,
                    "error_text": f"Weekday excel block width != 1 (actual {weekday_merged.width()})"
                }
                self.parser_warnings.append(f"Processing weekmetadata for '{week_key}' failed because weekday excel block width != 1 (actual {weekday_merged.width()})")
                continue

            month_row = first_group.row
            curr_col = first_group.col - 1
            while curr_col >= 0:
                month_pos = Coord(month_row, curr_col)
                month_cell = month_pos.cell(self.reader)
                if month_cell.is_empty():
                    pprint("month cell is empty")
                    curr_col -= 1
                    continue

                month_name = str(month_cell.value).strip()
                pprint(month_cell)
                all_nums_of_month = utils.parse_all_dirt(
                    self.reader,
                    month_pos.replace(row=weekday_merged.low.row),
                    right=1,
                    down=weekday_merged.height(),
                )
                pprint(f"all_nums_of_month={all_nums_of_month}")

                month_numbers = self.week_keys_metadata.setdefault(week_key, {}).setdefault(month_name, [])
                for raw_value in all_nums_of_month:
                    text = str(raw_value)
                    if text.lower() == month_name.lower():
                        pprint(f"Skip {raw_value} month number because it == month_name")
                        continue
                    # Numeric cells arrive as "1.0": drop only a trailing ".0"
                    # so that values such as "30.09" stay intact.
                    if text.endswith(".0"):
                        text = text[:-2]
                    # De-duplicate on the stored (normalized) form.
                    if text not in month_numbers:
                        month_numbers.append(text)
                curr_col -= 1

    def push_weekday_meta(self, weekday: str, weeknum: int, week_key_name: str, merged: "Merged"):
        """Remember the merged range of a weekday cell under its week key."""
        self.weeknums[week_key_name].add(merged)

    def row_with_schedule_notify(self, row_coord):
        """Extend ``schedule_range_row`` so that it includes *row_coord*.

        Called every time a row that belongs to the schedule is visited.
        """
        if self.schedule_range_row is None:
            self.schedule_range_row = [row_coord, row_coord]
            return
        if row_coord > self.schedule_range_row[1]:
            self.schedule_range_row[1] = row_coord
        if row_coord < self.schedule_range_row[0]:
            self.schedule_range_row[0] = row_coord

    def _insert_event(self, container: dict, key, event: dict):
        """Store *event* under *key*: alone at first, as a list once there are several."""
        current = container.setdefault(key, {})
        if isinstance(current, dict):
            # An empty dict is the "nothing stored yet" placeholder.
            container[key] = [current, event] if current else event
        elif isinstance(current, list):
            current.append(event)
        else:
            self.parser_warnings.append("Wtf? first_dict[key] not is dict and not is list??? (internal error)")

    def process_group(self, group: dict, first_weekday: Coord, pair_pos_col):
        """Walk down the column of one group and collect its events.

        Runs once per group after the groups were recognised by
        ``parse_groups``.  Events are written into ``group['slots']`` keyed by
        ``<weekday>_<weeknum>`` and, when pair numbers exist, by pair number.

        Args:
            group: entry produced by ``parse_groups``, e.g.
                ``{'name': 'ИВТ-260', 'position': [5, 6], 'position_human': 'G6:J6'}``
                (plus ``width`` and ``slots``).
            first_weekday: position of the first weekday cell; its column is
                the weekday column.
            pair_pos_col: column holding pair numbers, or None if the sheet
                has no pair numbering.
        """
        pprint(f"process_group group={group}")
        group_name = group['name']
        pprint(F"Имя группы: {group_name}")

        header_row, header_col = group['position'][0], group['position'][1]
        self.row_with_schedule_notify(header_row)
        group_header_pos = Coord(header_row, header_col)
        width = group['width']
        slots = group['slots']

        # Per-event state; (re)initialised for every weekday below.
        event_no = 1
        is_widely = False
        override_col_range = None
        all_raw = set()
        pairs = set()
        times = []
        first_coord = None

        def clean_state():
            """Start collecting the next event."""
            nonlocal is_widely, override_col_range, event_no, all_raw, pairs, times, first_coord
            is_widely = False
            override_col_range = None
            event_no += 1
            all_raw = set()
            pairs = set()
            first_coord = None
            times = []

        weeknum = 1  # switches to 2 once a non-weekday row follows a parsed weekday
        weekcycles = 0  # number of weekday blocks processed so far
        row_c1 = header_row + 1  # current sheet row, starting right below the header

        while row_c1 < self.reader.get_row_count():
            pos_c1 = Coord(row_c1, header_col)  # current position, top-left corner
            self.row_with_schedule_notify(pos_c1.row)
            if pos_c1.cell(self.reader).is_nospace_nocase_same(group_name):
                pprint("Ended with grpup name; stop moving down, break")
                break

            weekday_pos = pos_c1.replace(col=first_weekday.col)
            weekday_cell = weekday_pos.cell(self.reader)
            weekday_mr = self.reader.get_merged_coord(weekday_pos)
            weekday = weekday_cell.value

            if not is_weeknum(weekday):
                row_c1 += 1
                pprint("Not weeknum!")
                if weekcycles > 0 and weeknum != 2:
                    pprint("Weeknum now 2")
                    weeknum = 2
                continue

            pprint(weekday)
            weekday_key_name = weekday + ("_1" if weeknum == 1 else "_2")
            self.push_weekday_meta(weekday, weeknum, weekday_key_name, weekday_mr)

            # Fresh state for this weekday.
            event_no = 1
            is_widely = False
            override_col_range = None
            all_raw = set()
            pairs = set()
            times = []
            first_coord = None

            row_c2 = row_c1
            while row_c2 <= weekday_mr.high.row:
                # Current position inside the group and inside the weekday.
                pos_c2 = Coord(row_c2, header_col)
                mr_c2 = self.reader.get_merged_coord(pos_c2)
                if first_coord is None:
                    first_coord = pos_c2.row

                pair_num_mr = None
                if pair_pos_col is not None:
                    pair_num = pos_c2.replace(col=pair_pos_col)
                    pair_num_mr = self.reader.get_merged_coord(pair_num)

                group_last_col = group_header_pos.col + width - 1
                if (not is_widely) and (mr_c2.low.col < group_header_pos.col or mr_c2.high.col > group_last_col):
                    # The cell is merged beyond the group's own columns.
                    is_widely = True
                    override_col_range = (mr_c2.low.col, mr_c2.high.col)

                col_low = group_header_pos.col
                col_high = group_last_col
                if override_col_range is not None:
                    col_low = min(col_low, override_col_range[0])
                    col_high = max(col_high, override_col_range[1])

                dirty_line = utils.parse_all_dirt(
                    self.reader,
                    Coord(row_c2, col_low),
                    (col_high - col_low + 1),
                    1,
                    with_cells=True,
                )
                if len(dirty_line) > 0:
                    if pair_num_mr is not None:
                        pair_num_to_add = pair_num_mr.cell(self.reader).value.replace(" ", "").strip()
                        if len(pair_num_to_add) == 0:
                            pair_num_to_add = "???"
                            pprint("Составители эксельки? Вы почему не указали номер пары ёклмн")
                        pairs.add(pair_num_to_add)
                    for cell in dirty_line:
                        if not cell.is_time:
                            all_raw.add(str(cell.value))
                        else:
                            dt: time = cell.value
                            times.append(str(dt))

                ends_merged_cell = not (mr_c2.high.row - row_c2 > 0)
                if not utils.has_no_bottom_border(self.reader, pos_c2) and ends_merged_cell:
                    if all_raw:
                        # This row closes the event that is being collected.
                        pprint(f"№{event_no} {pairs}: {'[wide] ' if is_widely else ''} raw={all_raw}")
                        if weekday_key_name not in slots:
                            slots[weekday_key_name] = {}
                        pair_name = min(pairs, default="????")
                        obj = {
                            "object": "event",
                            "pairs": sorted(pairs),
                            "is_flow": is_widely,
                            "excel_range": utils.merged_humanize((first_coord, col_low, row_c2, col_high)),
                            "raw": sorted(all_raw),
                            "weekday": utils.weekday_to_num(weekday),
                            "weeknum": weeknum
                        }
                        if len(times) > 0:
                            obj['times'] = times
                        if pair_pos_col is None:
                            self._insert_event(slots, weekday_key_name, obj)
                        else:
                            self._insert_event(slots[weekday_key_name], pair_name, obj)
                    # all_raw may have been empty here; reset either way.
                    clean_state()

                if row_c2 >= weekday_mr.high.row:
                    clean_state()
                    pprint("Last for weekday")
                row_c2 += 1

            row_c1 += weekday_mr.height()
            weekcycles += 1


def parse_groups(reader: "ExcelSheetReader", head, col_start, head_rx):
    """Recognise the groups in a header row.

    Walks *head* left to right, stepping by the width of each merged range.
    From column *col_start* on, every cell is taken as a group name until an
    empty or unmerged cell is reached.  Names without a dash that contain a
    fragment from ``BAD_GROUP_NAMES`` are skipped.

    Args:
        reader: sheet reader used to resolve merged ranges.
        head: values of the header row.
        col_start: first column that may contain a group name.
        head_rx: index of the header row.

    Returns:
        dict mapping the lowercased group name to its ``name``, ``position``
        (``[row, col]``), ``width``, ``position_human`` and empty ``slots``.
    """
    groups = {}
    i = 0
    while i < len(head):
        cell_value = head[i]
        pprint(f"while i={i} head[i]={cell_value}")
        merged = reader.get_merged_coord(Coord(head_rx, i))

        if i >= col_start:
            if merged is None or cell_value == "" or cell_value is None:
                break
            name = utils.unspace(cell_value)
            lowered = name.lower()
            skip = "-" not in name and any(bad in lowered for bad in BAD_GROUP_NAMES)
            if skip:
                pprint(f"Skip groupname {name} because not dash in name and in blacklist")
            else:
                groups[lowered] = {
                    "name": name,
                    "position": [head_rx, i],
                    "width": merged.width(),
                    "position_human": utils.merged_humanize(merged.as_numbers()),
                    "slots": {}
                }

        # Always advance by at least one column so the walk cannot stall.
        i += 1 if merged is None else max(merged.width(), 1)
    return groups