Skip to content

Commit

Permalink
voctocore/lib/config: rework code to use schedule.json instead of xml
Browse files Browse the repository at this point in the history
  • Loading branch information
Kunsi committed Apr 11, 2024
1 parent fa06579 commit b2534a5
Showing 1 changed file with 134 additions and 74 deletions.
208 changes: 134 additions & 74 deletions voctocore/lib/config.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
#!/usr/bin/env python3
import os.path
import json
import logging
import os
import os.path
import re
from configparser import DuplicateSectionError
from datetime import date, datetime, timedelta
from zoneinfo import ZoneInfo

from lib.args import Args

from vocto.config import VocConfigParser
import xml.etree.ElementTree as ET
from datetime import date, datetime, timedelta
import re
import os

__all__ = ['Config']

Expand All @@ -24,11 +27,11 @@ def scanduration(str):


class VoctocoreConfigParser(VocConfigParser):

def __init__(self):
super().__init__()
self.events = []
self.event_now = None
self.event_tz = None
self.events_update = None
self.default_insert = None

Expand Down Expand Up @@ -74,46 +77,103 @@ def getSchedule(self):
return None

def _getEvents(self):
# check if file has been changed before re-read
self.events = None
# parse XML and iterate all schedule/day elements
self.events = ET.parse(
self.getSchedule()).getroot() .findall('day/room/event')
self.log.info("read {n} events from file \'{f}\'".format(
n=len(self.events), f=self.getSchedule()))
# remember the update time
self.events_update = os.path.getmtime(self.getSchedule())
schedule_path = self.getSchedule()
if not schedule_path:
return None

try:
# return early if schedule has not been updated since we last
# read it
update_time = os.path.getmtime(schedule_path)
if self.events_update and update_time <= self.events_update:
return self.events

# try to load events from json schedule
with open(schedule_path) as f:
schedule = json.load(f)
except (FileNotFoundError, json.decoder.JSONDecodeError) as e:
self.log.error('error while loading schedule from {}: {}'.format(
schedule_path,
repr(e),
))
return None
# try parsing the schedule
try:
self.event_tz = ZoneInfo(schedule['schedule']['conference']['time_zone_name'])
events = []
for day in schedule['schedule']['conference']['days']:
for talks_in_room in day['rooms'].values():
events.extend(talks_in_room)
for talk in events:
if (
'date' not in talk
or 'duration' not in talk
or 'id' not in talk
or 'room' not in talk
or 'title' not in talk
):
# the other code assumes these values are there, exit
# early if we're missing these.
self.logger.error('found malformed talk in {} - missing id/title/date/duration!'.format(
schedule_path,
))
return None
# calculate end time for easier calculations later
start = scandatetime(talk['date'])
d_h, d_m = talk['duration'].split(':')
talk['__start_dt'] = start.replace(tzinfo=self.event_tz)
talk['__end_dt'] = (start + timedelta(hours=int(d_h), minutes=int(d_m))).replace(tzinfo=self.event_tz)
self.log.info("read {n} events from file '{f}'".format(
n=len(events),
f=schedule_path,
))
# remember the update time. do not read it again, to avoid race
# conditions where the schedule got updated between determining
# the mtime and the time we finished parsing.
self.events = events
self.events_update = update_time
except Exception as e:
self.log.error('error while parsing schedule from {}: {}'.format(
schedule_path,
repr(e),
))
return self.events

def _getEventNow(self):
# get currnet date and time
now = datetime.now()
# check for option overlay/event
if self.getScheduleEvent():
# find event by ID
for event in self._getEvents():
if event.get('id') == self.getScheduleEvent():
if event['id'] == self.getScheduleEvent():
# remember current event
self.event_now = event
else:
# check if there is no event already running
if (not self.event_now) or now > scandatetime(self.event_now.find('date').text) + scanduration(self.event_now.find('duration').text):
# inistialize a past date
past = datetime(1999, 1, 1)
# remember nearest start time
nowest = past
# iterate events
for event in self._getEvents():
# check for room
if event.find('room').text == self.getScheduleRoom() or not self.getScheduleRoom():
# get start time
time = scandatetime(event.find('date').text)
# time nearer then nowest
if now >= time and time > nowest:
# remember new nearest time
nowest = time
# rememeber current event
self.event_now = event
NOW = datetime.now(self.event_tz)
# If no event is currently running, or the current event
# has already ended, scan the schedule for the next event
# that happens.
if (
not self.event_now
or NOW >= self.event_now['__end_dt']
):
my_room = self.getScheduleRoom()
events = self._getEvents()
if not events:
return None

for event in events:
# If we have a room set, but the event is not in
# this room, ignore it.
if my_room and event['room'] != my_room:
continue

# This assumes there's no overlapping events
if event['__start_dt'] <= NOW < event['__end_dt']:
self.event_now = event
break
else:
# No event found in self.events
self.event_now = None
return self.event_now

def getOverlaysPath(self):
Expand All @@ -125,18 +185,14 @@ def getOverlaysPath(self):

def getOverlaysTitle(self):
if self.getSchedule():
try:
event = self._getEventNow()
if event:
at = scandatetime(event.find('date').text)
return (at.strftime("%Y-%m-%d %H:%M"),
(at + scanduration(event.find('duration').text)
).strftime("%Y-%m-%d %H:%M"),
event.get('id'),
event.find('title').text)
except FileNotFoundError:
self.log.error(
'schedule file \'%s\' not found', self.getSchedule())
event = self._getEventNow()
if event:
return (
event['__start_dt'].strftime("%Y-%m-%d %H:%M"),
event['__end_dt'].strftime("%Y-%m-%d %H:%M"),
event['id'],
event['title'],
)
return None

def getOverlayFilePath(self, overlay):
Expand Down Expand Up @@ -185,37 +241,41 @@ def getOverlayFiles(self):

# checkt for overlay/schedule option
if self.getSchedule():
try:

def generate(event):
''' return all available insert names for event '''
# get list of persons from event
persons = event.findall("persons/person")
# generate insert file names and names
inserts = ["event_{eid}_person_{pid}|{text}".format(
eid=event.get('id'),
pid=person.get('id'),
text=person.text) for person in persons]
# add a insert for all persons together
if len(persons) > 1:
inserts += ["event_{eid}_persons|{text}".format(
eid=event.get('id'),
text=", ".join([person.text for person in persons]))]
return inserts

# get current event
event = self._getEventNow()
# get current event
event = self._getEventNow()
if event:
# generate inserts from event
inserts = generate(event)
persons = {}
# assemble a list of persons which actually have names
for person in event.get('persons', []):
name = person.get('public_name', person.get('name'))
if name:
persons[name] = person['id']
if persons:
# ensure we always have a 'persons' overlay if we
# have persons on schedule
inserts.append(
"event_{eid}_persons|{text}".format(
eid=event['id'],
text=", ".join(persons),
)
)
# if we have more than one person, add inserts for
# each one of them
if len(persons) > 1:
for pname, pid in sorted(persons.items()):
inserts.append(
"event_{eid}_person_{pid}|{text}".format(
eid=event['id'],
pid=pid,
text=pname,
)
)
# if empty show warning
if not inserts:
self.log.warning('schedule file \'%s\' contains no information for inserts of event #%s',
self.getSchedule(),
event.get('id'))
except FileNotFoundError:
# show error at file not found
self.log.error('schedule file \'%s\' not found',
self.getSchedule())
event['id'])
# check for overlay/files option
if self.has_option('overlay', 'files'):
# add inserts from files
Expand Down

0 comments on commit b2534a5

Please sign in to comment.