143 lines
No EOL
5.3 KiB
Python
143 lines
No EOL
5.3 KiB
Python
from DataSourceInterface import DataSourceInterface
|
|
from datetime import datetime, timezone, timedelta, date
|
|
from dateutil.rrule import rrulestr
|
|
from dateutil.parser import parse
|
|
import calendar
|
|
|
|
class CalendarInterface (DataSourceInterface):
|
|
"""Interface for fetching and processing calendar event information."""
|
|
def __init__ (self):
|
|
self.events = []
|
|
|
|
def reload (self):
|
|
if self.is_available() == False:
|
|
return
|
|
self.events = self.__get_events__()
|
|
self.events = self.__sort_events__(self.events)
|
|
|
|
def __sort_events__ (self, events):
|
|
events.sort(key=lambda x : x.begin_datetime)
|
|
return events
|
|
|
|
def __sort_event_types__ (self, events):
|
|
multiday = [ev for ev in events if ev.multiday]
|
|
allday = [ev for ev in events if ev.allday and ev.multiday == False]
|
|
timed = [ev for ev in events if ev.allday == False and ev.multiday == False]
|
|
return multiday + allday + timed
|
|
|
|
def __get_events__ (self):
|
|
raise NotImplementedError("Functions needs to be implemented")
|
|
|
|
def get_upcoming_events (self, timespan = None, start_time = None):
|
|
if timespan is None:
|
|
timespan = timedelta(31)
|
|
if start_time == None:
|
|
local_tzinfo = datetime.now(timezone.utc).astimezone().tzinfo
|
|
start_time = datetime.now(local_tzinfo)
|
|
return self.__get_events_in_range__(start_time, timespan)
|
|
|
|
def get_today_events (self):
|
|
return self.get_day_events(date.today())
|
|
|
|
def get_day_events (self, day):
|
|
if type(day) is not type(date.today()):
|
|
raise TypeError("get_day_events only takes date-objects as parameters, not \"%s\"" % str(type(day)))
|
|
local_tzinfo = datetime.now(timezone.utc).astimezone().tzinfo
|
|
day_start = datetime(day.year, day.month, day.day, 0, 0, 0, 0, local_tzinfo)
|
|
return self.__get_events_in_range__(day_start, timedelta(1))
|
|
|
|
def get_month_events (self, month = -1, year = -1):
|
|
if month < 0:
|
|
month = datetime.now().month
|
|
if year < 0:
|
|
year = datetime.now().year
|
|
|
|
local_tzinfo = datetime.now(timezone.utc).astimezone().tzinfo
|
|
month_start = datetime(year, month, 1, 0, 0, 0, 0, local_tzinfo)
|
|
month_days = calendar.monthrange(month_start.year, month_start.month)[1]
|
|
return self.__get_events_in_range__(month_start, timedelta(month_days))
|
|
|
|
def __get_events_in_range__ (self, start, duration):
|
|
if self.events is None:
|
|
return []
|
|
|
|
if start.tzinfo is None:
|
|
raise TypeError("start datetime needs to be timezone-aware")
|
|
|
|
events_in_range = []
|
|
for event in self.events:
|
|
event_occurrence = self.__get_if_event_in_range__(event, start, duration)
|
|
if event_occurrence:
|
|
events_in_range.extend(event_occurrence)
|
|
|
|
events_in_range = self.__sort_events__(events_in_range)
|
|
return self.__sort_event_types__(events_in_range)
|
|
|
|
def __get_if_event_in_range__ (self, event, start, duration):
|
|
'''Returns list or None'''
|
|
if event is None:
|
|
return None
|
|
|
|
if event.rrule is None:
|
|
return self.__is_onetime_in_range__(event, start, duration)
|
|
else:
|
|
return self.__is_repeating_in_range__(event, start, duration)
|
|
|
|
def __is_onetime_in_range__ (self, event, start, duration):
|
|
if event.begin_datetime > start:
|
|
first_start = start
|
|
first_duration = duration
|
|
second_start = event.begin_datetime
|
|
else:
|
|
first_start = event.begin_datetime
|
|
first_duration = event.duration
|
|
second_start = start
|
|
|
|
if (second_start - first_start) < first_duration:
|
|
return [ event ]
|
|
else:
|
|
return None
|
|
|
|
def __is_repeating_in_range__ (self, event, start, duration):
|
|
end = start + duration
|
|
occurrences = []
|
|
|
|
try:
|
|
r_string=self.__add_timezoneawarness__(event.rrule)
|
|
rule=rrulestr(r_string,dtstart=event.begin_datetime)
|
|
for occurrence in rule:
|
|
if occurrence - end > timedelta(0):
|
|
return occurrences
|
|
merged_event = self.__merge_event_data__(event, start=occurrence)
|
|
if self.__is_onetime_in_range__(merged_event, start, duration):
|
|
occurrences.append(merged_event)
|
|
return occurrences
|
|
except Exception as ex:
|
|
print("\"is_repeating_in_range\" failed while processing: dtstart.tzinfo="+str(event.begin_datetime.tzinfo))
|
|
raise ex
|
|
|
|
def __merge_event_data__ (self, event, start = None):
|
|
if start is not None:
|
|
event.begin_datetime = start
|
|
event.end_datetime = start + event.duration
|
|
|
|
return event
|
|
|
|
def __add_timezoneawarness__ (self, rrule):
|
|
"""UNTIL must be specified in UTC when DTSTART is timezone-aware (which it is)"""
|
|
if "UNTIL" not in rrule:
|
|
return rrule
|
|
|
|
timezone_str = "T000000Z"
|
|
until_template = "UNTIL=YYYYMMDD"
|
|
|
|
until_index = rrule.index("UNTIL")
|
|
|
|
tz_index = until_index + len(until_template)
|
|
if tz_index < 0 or tz_index >= len(rrule):
|
|
return rrule
|
|
|
|
if rrule[tz_index] is "T":
|
|
return rrule
|
|
|
|
return rrule[:tz_index] + timezone_str + rrule[tz_index:] |