First version of rrule support. Redesigned CalendarInterface. Dropped Support for get_week_events

This commit is contained in:
Maximilian Giller 2019-03-24 11:07:22 +01:00
parent c854e7eec1
commit 88ace91239
2 changed files with 106 additions and 32 deletions

View file

@ -1,46 +1,116 @@
from DataSourceInterface import DataSourceInterface from DataSourceInterface import DataSourceInterface
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
from dateutil.rrule import rrulestr
from dateutil.parser import parse
import calendar
class CalendarInterface (DataSourceInterface): class CalendarInterface (DataSourceInterface):
"""Interface for fetching and processing calendar event information.""" """Interface for fetching and processing calendar event information."""
def __init__ (self): def __init__ (self):
self.loaded_events = self.__get_events__() self.events = self.__get_events__()
self.__sort_events__() self.events = self.__sort_events__(self.events)
def __sort_events__(self): def __sort_events__ (self, events):
self.loaded_events.sort(key=lambda x : x.begin_datetime) events.sort(key=lambda x : x.begin_datetime)
return events
def __get_events__ (self): def __get_events__ (self):
raise NotImplementedError("Functions needs to be implemented") raise NotImplementedError("Functions needs to be implemented")
def get_upcoming_events(self): def get_upcoming_events (self, timespan = None):
return self.__get_events_to_filter__(lambda x : (x.begin_datetime - datetime.now(timezone.utc)) > timedelta(0)) if timespan is None:
timespan = timedelta(31)
return self.__get_events_in_range__(datetime.now(), timespan)
def get_today_events (self): def get_today_events (self):
return self.get_day_events(datetime.now(timezone.utc)) return self.get_day_events(datetime.now())
def get_day_events (self, date): def get_day_events (self, date):
if type(date) is type(datetime.now()): day_start = datetime(date.year, date.month, date.day)
date = date.date() return self.__get_events_in_range__(day_start, timedelta(1))
return self.__get_events_to_filter__(lambda x : (x.begin_datetime.date() - date) <= timedelta(0) and (x.end_datetime.date() - date) >= timedelta(0))
def get_month_events (self, month = -1): def get_month_events (self, month = -1):
if month < 0: if month < 0:
month = datetime.now().month month = datetime.now().month
return self.__get_events_to_filter__(lambda x : x.begin_datetime.month == month or x.end_datetime.month == month)
month_start = datetime(datetime.now().year, month, 1)
month_days = calendar.monthrange(month_start.year, month_start.month)[1]
return self.__get_events_in_range__(month_start, timedelta(month_days))
def get_week_events (self, week = -1): def get_week_events (self, week = -1):
raise NotImplementedError("Support dropped. Needs update.")
if week < 0 and week_starts_on == "Monday": if week < 0 and week_starts_on == "Monday":
week = int(datetime.now().strftime('%W')) + 1 week = int(datetime.now().strftime('%W')) + 1
elif week < 0: elif week < 0:
week = int(datetime.now().strftime('%U')) + 1 week = int(datetime.now().strftime('%U')) + 1
if week_starts_on == "Monday": if week_starts_on == "Monday":
return self.__get_events_to_filter__(lambda x : int(x.begin_datetime.strftime('%W')) + 1 == week or int(x.end_datetime.strftime('%W')) + 1 == week) return self.__get_events_in_range__(lambda x : int(x.begin_datetime.strftime('%W')) + 1 == week or int(x.end_datetime.strftime('%W')) + 1 == week)
else: else:
return self.__get_events_to_filter__(lambda x : int(x.begin_datetime.strftime('%U')) + 1 == week or int(x.end_datetime.strftime('%U')) + 1 == week) return self.__get_events_in_range__(lambda x : int(x.begin_datetime.strftime('%U')) + 1 == week or int(x.end_datetime.strftime('%U')) + 1 == week)
def __get_events_to_filter__(self, event_filter): def __get_events_in_range__ (self, start, duration):
if self.loaded_events is None: if self.events is None:
return [] return []
return [event for event in self.loaded_events if event_filter(event)]
if start.tzinfo is None:
start = start.replace(tzinfo=timezone.utc)
events_in_range = []
for event in self.events:
event_occurrence = self.__get_if_event_in_range__(event, start, duration)
if event_occurrence:
events_in_range.extend(event_occurrence)
events_in_range = self.__sort_events__(events_in_range)
return events_in_range
def __get_if_event_in_range__ (self, event, start, duration):
'''Returns list or None'''
if event is None:
return None
if event.rrule is None:
return self.__is_onetime_in_range__(event, start, duration)
else:
return self.__is_repeating_in_range__(event, start, duration)
def __is_onetime_in_range__ (self, event, start, duration):
if event.begin_datetime > start:
first_start = start
first_duration = duration
second_start = event.begin_datetime
second_duration = event.duration
else:
first_start = event.begin_datetime
first_duration = event.duration
second_start = start
second_duration = duration
event_end = event.begin_datetime + event.duration
if (second_start - first_start) < first_duration:
return [ event ]
else:
return None
def __is_repeating_in_range__ (self, event, start, duration):
end = start + duration
occurrences = []
r_string=event.rrule
rule=rrulestr(r_string,dtstart=parse(str(event.begin_datetime)))
for occurrence in rule:
if occurrence - end > timedelta(0):
return occurrences
merged_event = self.__merge_event_data__(event, start=occurrence)
if self.__is_onetime_in_range__(merged_event, start, duration):
occurrences.append(merged_event)
return occurrences
def __merge_event_data__ (self, event, start = None):
if start is not None:
event.begin_datetime = start
event.end_datetime = start + event.duration
return event

View file

@ -2,6 +2,7 @@ from CalendarInterface import CalendarInterface
from CalendarEvent import CalendarEvent from CalendarEvent import CalendarEvent
from ics import Calendar from ics import Calendar
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
import re
from settings import week_starts_on from settings import week_starts_on
try: try:
from urllib.request import urlopen from urllib.request import urlopen
@ -36,9 +37,8 @@ class IcalEvents(CalendarInterface):
for calendar in urls: for calendar in urls:
decode = str(urlopen(calendar).read().decode()) decode = str(urlopen(calendar).read().decode())
fixed_decode = self.__fix_errors__(decode)
ical = Calendar(fixed_decode) ical = Calendar(decode)
for event in ical.events: for event in ical.events:
cal_event = CalendarEvent() cal_event = CalendarEvent()
@ -50,20 +50,18 @@ class IcalEvents(CalendarInterface):
cal_event.description = event.description cal_event.description = event.description
cal_event.location = event.location cal_event.location = event.location
cal_event.allday = event.all_day cal_event.allday = event.all_day
cal_event.rrule = self.__extract_rrule__(event)
if cal_event.allday: if cal_event.allday:
cal_event.end_datetime = cal_event.end_datetime - timedelta(2) cal_event.end_datetime = cal_event.end_datetime - timedelta(1)
cal_event.duration = cal_event.duration - timedelta(1)
loaded_events.append(cal_event) loaded_events.append(cal_event)
return loaded_events return loaded_events
except: except BaseException as ex:
print(ex)
return loaded_events return loaded_events
def __fix_errors__(self, decode):
decode = self.__remove_alarms__(decode)
return decode.replace('BEGIN:VALARM\r\nACTION:NONE','BEGIN:VALARM\r\nACTION:DISPLAY\r\nDESCRIPTION:') \
.replace('BEGIN:VALARM\r\nACTION:EMAIL','BEGIN:VALARM\r\nACTION:DISPLAY\r\nDESCRIPTION:')
def __remove_alarms__(self, decode): def __remove_alarms__(self, decode):
alarm_begin = 'BEGIN:VALARM' alarm_begin = 'BEGIN:VALARM'
alarm_end = 'END:VALARM' alarm_end = 'END:VALARM'
@ -76,3 +74,9 @@ class IcalEvents(CalendarInterface):
endAlarmIndex = decode.find(alarm_end, beginAlarmIndex) endAlarmIndex = decode.find(alarm_end, beginAlarmIndex)
decode = decode[:beginAlarmIndex] + decode[endAlarmIndex + len(alarm_end) + len(lineseparation):] decode = decode[:beginAlarmIndex] + decode[endAlarmIndex + len(alarm_end) + len(lineseparation):]
return decode return decode
def __extract_rrule__(self, event):
if re.search('RRULE',str(event)) is None:
return None
return re.search('RRULE:(.+?)\n',str(event)).group(1).rstrip()