From b2534a555cc1a2a880b50b5c86960c132d6caa24 Mon Sep 17 00:00:00 2001 From: Franziska Kunsmann Date: Thu, 11 Apr 2024 19:35:14 +0200 Subject: [PATCH] voctocore/lib/config: rework code to use schedule.json instead of xml --- voctocore/lib/config.py | 208 ++++++++++++++++++++++++++-------------- 1 file changed, 134 insertions(+), 74 deletions(-) diff --git a/voctocore/lib/config.py b/voctocore/lib/config.py index e5a65b66..a954505d 100644 --- a/voctocore/lib/config.py +++ b/voctocore/lib/config.py @@ -1,13 +1,16 @@ #!/usr/bin/env python3 -import os.path +import json import logging +import os +import os.path +import re from configparser import DuplicateSectionError +from datetime import date, datetime, timedelta +from zoneinfo import ZoneInfo + from lib.args import Args + from vocto.config import VocConfigParser -import xml.etree.ElementTree as ET -from datetime import date, datetime, timedelta -import re -import os __all__ = ['Config'] @@ -24,11 +27,11 @@ def scanduration(str): class VoctocoreConfigParser(VocConfigParser): - def __init__(self): super().__init__() self.events = [] self.event_now = None + self.event_tz = None self.events_update = None self.default_insert = None @@ -74,46 +77,103 @@ def getSchedule(self): return None def _getEvents(self): - # check if file has been changed before re-read - self.events = None - # parse XML and iterate all schedule/day elements - self.events = ET.parse( - self.getSchedule()).getroot() .findall('day/room/event') - self.log.info("read {n} events from file \'{f}\'".format( - n=len(self.events), f=self.getSchedule())) - # remember the update time - self.events_update = os.path.getmtime(self.getSchedule()) + schedule_path = self.getSchedule() + if not schedule_path: + return None + + try: + # return early if schedule has not been updated since we last + # read it + update_time = os.path.getmtime(schedule_path) + if self.events_update and update_time <= self.events_update: + return self.events + + # try to load events from json schedule + with open(schedule_path) as f: + schedule = json.load(f) + except (FileNotFoundError, json.decoder.JSONDecodeError) as e: + self.log.error('error while loading schedule from {}: {}'.format( + schedule_path, + repr(e), + )) + return None + # try parsing the schedule + try: + self.event_tz = ZoneInfo(schedule['schedule']['conference']['time_zone_name']) + events = [] + for day in schedule['schedule']['conference']['days']: + for talks_in_room in day['rooms'].values(): + events.extend(talks_in_room) + for talk in events: + if ( + 'date' not in talk + or 'duration' not in talk + or 'id' not in talk + or 'room' not in talk + or 'title' not in talk + ): + # the other code assumes these values are there, exit + # early if we're missing these. + self.logger.error('found malformed talk in {} - missing id/title/date/duration!'.format( + schedule_path, + )) + return None + # calculate end time for easier calculations later + start = scandatetime(talk['date']) + d_h, d_m = talk['duration'].split(':') + talk['__start_dt'] = start.replace(tzinfo=self.event_tz) + talk['__end_dt'] = (start + timedelta(hours=int(d_h), minutes=int(d_m))).replace(tzinfo=self.event_tz) + self.log.info("read {n} events from file '{f}'".format( + n=len(events), + f=schedule_path, + )) + # remember the update time. do not read it again, to avoid race + # conditions where the schedule got updated between determining + # the mtime and the time we finished parsing. + self.events = events + self.events_update = update_time + except Exception as e: + self.log.error('error while parsing schedule from {}: {}'.format( + schedule_path, + repr(e), + )) return self.events def _getEventNow(self): - # get currnet date and time - now = datetime.now() # check for option overlay/event if self.getScheduleEvent(): # find event by ID for event in self._getEvents(): - if event.get('id') == self.getScheduleEvent(): + if event['id'] == self.getScheduleEvent(): # remember current event self.event_now = event else: - # check if there is no event already running - if (not self.event_now) or now > scandatetime(self.event_now.find('date').text) + scanduration(self.event_now.find('duration').text): - # inistialize a past date - past = datetime(1999, 1, 1) - # remember nearest start time - nowest = past - # iterate events - for event in self._getEvents(): - # check for room - if event.find('room').text == self.getScheduleRoom() or not self.getScheduleRoom(): - # get start time - time = scandatetime(event.find('date').text) - # time nearer then nowest - if now >= time and time > nowest: - # remember new nearest time - nowest = time - # rememeber current event - self.event_now = event + NOW = datetime.now(self.event_tz) + # If no event is currently running, or the current event + # has already ended, scan the schedule for the next event + # that happens. + if ( + not self.event_now + or NOW >= self.event_now['__end_dt'] + ): + my_room = self.getScheduleRoom() + events = self._getEvents() + if not events: + return None + + for event in events: + # If we have a room set, but the event is not in + # this room, ignore it. + if my_room and event['room'] != my_room: + continue + + # This assumes there's no overlapping events + if event['__start_dt'] <= NOW < event['__end_dt']: + self.event_now = event + break + else: + # No event found in self.events + self.event_now = None return self.event_now def getOverlaysPath(self): @@ -125,18 +185,14 @@ def getOverlaysPath(self): def getOverlaysTitle(self): if self.getSchedule(): - try: - event = self._getEventNow() - if event: - at = scandatetime(event.find('date').text) - return (at.strftime("%Y-%m-%d %H:%M"), - (at + scanduration(event.find('duration').text) - ).strftime("%Y-%m-%d %H:%M"), - event.get('id'), - event.find('title').text) - except FileNotFoundError: - self.log.error( - 'schedule file \'%s\' not found', self.getSchedule()) + event = self._getEventNow() + if event: + return ( + event['__start_dt'].strftime("%Y-%m-%d %H:%M"), + event['__end_dt'].strftime("%Y-%m-%d %H:%M"), + event['id'], + event['title'], + ) return None def getOverlayFilePath(self, overlay): @@ -185,37 +241,41 @@ def getOverlayFiles(self): # checkt for overlay/schedule option if self.getSchedule(): - try: - - def generate(event): - ''' return all available insert names for event ''' - # get list of persons from event - persons = event.findall("persons/person") - # generate insert file names and names - inserts = ["event_{eid}_person_{pid}|{text}".format( - eid=event.get('id'), - pid=person.get('id'), - text=person.text) for person in persons] - # add a insert for all persons together - if len(persons) > 1: - inserts += ["event_{eid}_persons|{text}".format( - eid=event.get('id'), - text=", ".join([person.text for person in persons]))] - return inserts - - # get current event - event = self._getEventNow() + # get current event + event = self._getEventNow() + if event: # generate inserts from event - inserts = generate(event) + persons = {} + # assemble a list of persons which actually have names + for person in event.get('persons', []): + name = person.get('public_name', person.get('name')) + if name: + persons[name] = person['id'] + if persons: + # ensure we always have a 'persons' overlay if we + # have persons on schedule + inserts.append( + "event_{eid}_persons|{text}".format( + eid=event['id'], + text=", ".join(persons), + ) + ) + # if we have more than one person, add inserts for + # each one of them + if len(persons) > 1: + for pname, pid in sorted(persons.items()): + inserts.append( + "event_{eid}_person_{pid}|{text}".format( + eid=event['id'], + pid=pid, + text=pname, + ) + ) # if empty show warning if not inserts: self.log.warning('schedule file \'%s\' contains no information for inserts of event #%s', self.getSchedule(), - event.get('id')) - except FileNotFoundError: - # show error at file not found - self.log.error('schedule file \'%s\' not found', - self.getSchedule()) + event['id']) # check for overlay/files option if self.has_option('overlay', 'files'): # add inserts from files