From 2314277b947089ef18634c8c5e12a7da86f6536d Mon Sep 17 00:00:00 2001 From: Facetoe Date: Wed, 2 Aug 2017 00:35:01 +0800 Subject: [PATCH] Add generic calendar module. (#598) --- i3pystatus/calendar/__init__.py | 214 +++++++++++++++++++++++++++ i3pystatus/calendar/google.py | 116 +++++++++++++++ i3pystatus/calendar/khal_calendar.py | 67 +++++++++ i3pystatus/calendar/lightning.py | 103 +++++++++++++ i3pystatus/google_calendar.py | 212 -------------------------- 5 files changed, 500 insertions(+), 212 deletions(-) create mode 100644 i3pystatus/calendar/__init__.py create mode 100644 i3pystatus/calendar/google.py create mode 100644 i3pystatus/calendar/khal_calendar.py create mode 100644 i3pystatus/calendar/lightning.py delete mode 100644 i3pystatus/google_calendar.py diff --git a/i3pystatus/calendar/__init__.py b/i3pystatus/calendar/__init__.py new file mode 100644 index 0000000..033b39c --- /dev/null +++ b/i3pystatus/calendar/__init__.py @@ -0,0 +1,214 @@ +import inspect +import threading +from abc import abstractmethod +from datetime import datetime, timedelta + +from i3pystatus import IntervalModule, formatp, SettingsBase +from i3pystatus.core.color import ColorRangeModule +from i3pystatus.core.desktop import DesktopNotification + + +def strip_microseconds(delta): + return delta - timedelta(microseconds=delta.microseconds) + + +def formatter(func): + """ Decorator to mark a CalendarEvent method as a formatter. """ + func.formatter = True + return func + + +class CalendarEvent: + """ + Simple class representing an Event. The attributes title, start, end and recurring are required as + these will be used for the formatters. The id attribute is used to uniquely identify the event. + + If a backend wishes to provide extra formatters to the user, this can be done by adding additional + methods and decorating them with the @formatter decorator. See the LightningCalendarEvent from the + lightning module for an example of this. + """ + + # Unique identifier for this event + id = None + + # The title of this event + title = None + + # Datetime object representing when this event begins + start = None + + # Datetime object representing when this event ends + end = None + + # Whether or not this event is a recurring event + recurring = False + + def formatters(self): + """ + Build a dictionary containing all those key/value pairs that will be exposed to the user via formatters. + """ + event_dict = dict( + title=self.title, + remaining=self.time_remaining + ) + + def is_formatter(x): + return inspect.ismethod(x) and hasattr(x, 'formatter') and getattr(x, 'formatter') + + for method_name, method in inspect.getmembers(self, is_formatter): + event_dict[method_name] = method() + return event_dict + + @property + def time_remaining(self): + return strip_microseconds(self.start - datetime.now(tz=self.start.tzinfo)) + + def __str__(self): + return "{}(title='{}', start={}, end={}, recurring={})" \ + .format(type(self).__name__, + self.title, + repr(self.start), + repr(self.end), + self.recurring) + + +class CalendarBackend(SettingsBase): + """ + Base class for calendar backend. Subclasses should implement update and populate the events list. + + Optionally, subclasses can override on_click to perform actions on the current event when clicked. + """ + + def init(self): + self.events = [] + + @abstractmethod + def update(self): + """ Subclasses should implement this method and populate the events list with CalendarEvents.""" + + def on_click(self, event): + """ Override this method to do more interesting things with the event. """ + DesktopNotification( + title=event.title, + body="{} until {}!".format(event.time_remaining, event.title), + icon='dialog-information', + urgency=1, + timeout=0, + ).display() + + def __iter__(self): + return iter(self.events) + + def __len__(self): + return len(self.events) + + +class Calendar(IntervalModule, ColorRangeModule): + """ + Generic calendar module. Requires the PyPI package ``colour``. + + .. rubric:: Available formatters + + * {title} - the title or summary of the event + * {remaining_time} - how long until this event is due + + Additional formatters may be provided by the backend, consult their documentation for details. + """ + + settings = ( + ('format', 'Format string to display in the bar'), + ('backend', 'Backend to use for collecting calendar events'), + ('skip_recurring', 'Whether or not to skip recurring events'), + ('update_interval', "How often in seconds to call the backend's update method"), + ('urgent_seconds', "When within this many seconds of the event, set the urgent flag"), + ('urgent_blink', 'Whether or not to blink when within urgent_seconds of the event'), + ('dynamic_color', 'Whether or not to change color as the event approaches'), + ) + + required = ('backend',) + + skip_recurring = False + interval = 1 + backend = None + update_interval = 600 + dynamic_color = True + urgent_seconds = 300 + urgent_blink = False + + current_event = None + urgent_acknowledged = False + + format = "{title} - {remaining}" + + on_rightclick = 'handle_click' + on_leftclick = 'acknowledge' + + def init(self): + self.condition = threading.Condition() + self.thread = threading.Thread(target=self.update_thread, daemon=True) + self.thread.start() + self.colors = self.get_hex_color_range(self.end_color, self.start_color, self.urgent_seconds * 2) + + def update_thread(self): + self.refresh_events() + while True: + with self.condition: + self.condition.wait(self.update_interval) + self.refresh_events() + + def refresh_events(self): + self.backend.update() + + def valid_event(ev): + if self.skip_recurring and ev.recurring: + return False + elif ev.time_remaining < timedelta(seconds=0): + return False + return True + + for event in self.backend: + if valid_event(event): + if self.current_event and self.current_event.id != event.id: + self.urgent_acknowledged = False + self.current_event = event + return + self.current_event = None + + def run(self): + if self.current_event and self.current_event.time_remaining > timedelta(seconds=0): + self.output = { + "full_text": formatp(self.format, **self.current_event.formatters()), + "color": self.get_color() if self.dynamic_color else None, + "urgent": self.is_urgent() + } + else: + self.output = {} + + def handle_click(self): + if self.current_event: + self.backend.on_click(self.current_event) + + def get_color(self): + if self.current_event.time_remaining.days > 0: + color = self.colors[-1] + else: + p = self.percentage(self.current_event.time_remaining.seconds, self.urgent_seconds) + color = self.get_gradient(p, self.colors) + return color + + def is_urgent(self): + """ + Determine whether or not to set the urgent flag. If urgent_blink is set, toggles urgent flag + on and off every second. + """ + if not self.current_event: + return False + now = datetime.now(tz=self.current_event.start.tzinfo) + alert_time = now + timedelta(seconds=self.urgent_seconds) + urgent = alert_time > self.current_event.start + if urgent and self.urgent_blink: + urgent = now.second % 2 == 0 and not self.urgent_acknowledged + return urgent + + def acknowledge(self): + self.urgent_acknowledged = not self.urgent_acknowledged diff --git a/i3pystatus/calendar/google.py b/i3pystatus/calendar/google.py new file mode 100644 index 0000000..46b674e --- /dev/null +++ b/i3pystatus/calendar/google.py @@ -0,0 +1,116 @@ +import datetime +from datetime import timezone + +import httplib2 +import oauth2client +import pytz +from apiclient import discovery +from dateutil import parser +from googleapiclient.errors import HttpError +from i3pystatus.calendar import CalendarBackend, CalendarEvent, formatter +from i3pystatus.core.util import user_open, require, internet + + +class GoogleCalendarEvent(CalendarEvent): + def __init__(self, google_event): + self.id = google_event['id'] + self.title = google_event['summary'] + self.start = self._parse_date(google_event['start']) + self.end = self._parse_date(google_event['end']) + self.recurring = 'recurringEventId' in google_event + self._link = google_event['htmlLink'] + self._status = google_event['status'] + self._kind = google_event['kind'] + + @formatter + def htmlLink(self): + return self._link + + @formatter + def status(self): + return self._status + + @formatter + def kind(self): + return self._kind + + def _parse_date(self, date_section): + if 'dateTime' not in date_section: + result = parser.parse(date_section['date']) + else: + result = parser.parse(date_section['dateTime']) + return result.replace(tzinfo=timezone.utc).astimezone(tz=None) + + +class Google(CalendarBackend): + """ + Calendar backend for interacting with Google Calendar. + + Requires the Google Calendar API package - https://developers.google.com/google-apps/calendar/quickstart/python. + Additionally requires the `colour`, `httplib2`, `oauth2client`, `pytz`, `apiclient` and `dateutil` modules. + + .. rubric:: Available formatters + + * `{kind}` — type of event + * `{status}` — eg, confirmed + * `{htmlLink}` — link to the calendar event + """ + + settings = ( + ('credential_path', 'Path to credentials'), + ('days', 'Only show events between now and this many days in the future'), + ) + + required = ('credential_path',) + + days = 7 + + def init(self): + self.service = None + self.events = [] + + @require(internet) + def update(self): + if self.service is None: + self.connect_service() + self.refresh_events() + + def on_click(self, event): + user_open(event.htmlLink()) + + def connect_service(self): + self.logger.debug("Connecting Service..") + self.credentials = oauth2client.file.Storage(self.credential_path).get() + self.service = discovery.build('calendar', 'v3', http=self.credentials.authorize(httplib2.Http())) + + def refresh_events(self): + """ + Retrieve the next N events from Google. + """ + now = datetime.datetime.now(tz=pytz.UTC) + try: + now, later = self.get_timerange_formatted(now) + events_result = self.service.events().list( + calendarId='primary', + timeMin=now, + timeMax=later, + maxResults=10, + singleEvents=True, + orderBy='startTime', + timeZone='utc' + ).execute() + self.events.clear() + for event in events_result.get('items', []): + self.events.append(GoogleCalendarEvent(event)) + except HttpError as e: + if e.resp.status in (500, 503): + self.logger.warn("GoogleCalendar received %s while retrieving events" % e.resp.status) + else: + raise + + def get_timerange_formatted(self, now): + """ + Return two ISO8601 formatted date strings, one for timeMin, the other for timeMax (to be consumed by get_events) + """ + later = now + datetime.timedelta(days=self.days) + return now.isoformat(), later.isoformat() diff --git a/i3pystatus/calendar/khal_calendar.py b/i3pystatus/calendar/khal_calendar.py new file mode 100644 index 0000000..774d5ab --- /dev/null +++ b/i3pystatus/calendar/khal_calendar.py @@ -0,0 +1,67 @@ +from datetime import date, timedelta + +import khal +import khal.cli +import khal.settings +from i3pystatus.calendar import CalendarBackend, CalendarEvent, formatter + + +class KhalEvent(CalendarEvent): + def __init__(self, khal_event): + self.id = khal_event.uid + self.start = khal_event.start_local + self.end = khal_event.end_local + self.title = khal_event.summary + self.recurring = khal_event.recurring + self._calendar = khal_event.calendar + + @formatter + def calendar(self): + return self._calendar + + +class Khal(CalendarBackend): + """ + Backend for Khal. Requires `khal` to be installed. + + .. rubric:: Available formatters + * `{calendar}` — Calendar event is from. + """ + + settings = ( + ('config_path', 'Path to your khal.conf'), + ('calendars', 'Restrict to these calendars pass as a list)'), + ('days', 'Check for the next X days'), + ) + + required = ('config_path', 'calendars') + + days = 7 + + config_path = None + calendars = None + + def init(self): + self.collection = None + self.events = [] + + def open_connection(self): + self.logger.debug("Opening collection with config {}".format(self.config_path)) + config = khal.settings.get_config(self.config_path) + self.collection = khal.cli.build_collection(config, None) + + def update(self): + if self.collection is None: + self.open_connection() + events = [] + for days in range(self.days): + events += list(self.collection.get_events_on( + date.today() + timedelta(days=days)) + ) + + # filter out unwanted calendars + self.logger.debug("calendars %s" % self.calendars) + if self.calendars is not None: + events = [evt for evt in events if evt.calendar in self.calendars] + for event in events: + self.events.append(KhalEvent(event)) diff --git a/i3pystatus/calendar/lightning.py b/i3pystatus/calendar/lightning.py new file mode 100644 index 0000000..3f797ad --- /dev/null +++ b/i3pystatus/calendar/lightning.py @@ -0,0 +1,103 @@ +import sqlite3 +from datetime import datetime + +import pytz +from dateutil.tz import tzlocal +from i3pystatus.calendar import CalendarEvent, CalendarBackend, formatter + + +class Flag: + PRIVATE = 1 + HAS_ATTENDEES = 2 + HAS_PROPERTIES = 4 + EVENT_ALLDAY = 8 + HAS_RECURRENCE = 16 + HAS_EXCEPTIONS = 32 + HAS_ATTACHMENTS = 64 + HAS_RELATIONS = 128 + HAS_ALARMS = 256 + RECURRENCE_ID_ALLDAY = 512 + + +class LightningCalendarEvent(CalendarEvent): + def __init__(self, row): + self.id = row['id'] + self.title = row['title'] + self._event_start = row['event_start'] + self._event_start_tz = row['event_start_tz'] + self._event_end = row['event_end'] + self._event_end_tz = row['event_end_tz'] + self._flags = row['flags'] + self._location = row['location'] or '' + + @property + def recurring(self): + return (self._flags & Flag.HAS_RECURRENCE) != 0 + + @property + def end(self): + return self._convert_date(self._event_end, self._event_end_tz) + + @property + def start(self): + return self._convert_date(self._event_start, self._event_start_tz) + + @formatter + def location(self): + return self._location + + def _convert_date(self, microseconds_from_epoch, timezone): + if timezone == 'floating': + tz = tzlocal() + else: + tz = pytz.timezone(timezone) + d = datetime.fromtimestamp(microseconds_from_epoch / 1000000, tz=pytz.UTC) + return d.astimezone(tz) + + +class Lightning(CalendarBackend): + """ + Backend for querying the Thunderbird's Lightning database. Requires `pytz` and `dateutil`. + + .. rubric:: Available formatters + + * `{location}` — Where the event occurs + """ + + settings = ( + ('database_path', 'Path to local.sqlite.'), + ('days', 'Only show events between now and this many days in the future'), + ) + + required = ('database_path',) + + days = 7 + + database_path = None + + def update(self): + with sqlite3.connect(self.database_path) as connection: + connection.row_factory = sqlite3.Row + cursor = connection.cursor() + cursor.execute(""" + SELECT + id, + title, + event_start, + event_start_tz, + event_end, + event_end_tz, + flags, + cal_properties.value AS location + FROM cal_events + LEFT OUTER JOIN cal_properties ON cal_properties.item_id = id AND cal_properties.key = 'LOCATION' + WHERE + datetime(event_start / 1000000, 'unixepoch', 'localtime') < datetime('now', 'localtime', '+' || :days || ' days') + AND + datetime(event_start / 1000000, 'unixepoch', 'localtime') > datetime('now', 'localtime') + ORDER BY event_start ASC + """, dict(days=self.days)) + self.events.clear() + for row in cursor: + self.events.append(LightningCalendarEvent(row)) + cursor.close() diff --git a/i3pystatus/google_calendar.py b/i3pystatus/google_calendar.py deleted file mode 100644 index 6c98fa7..0000000 --- a/i3pystatus/google_calendar.py +++ /dev/null @@ -1,212 +0,0 @@ -import datetime -import threading - -import httplib2 -import oauth2client -import pytz -from apiclient import discovery -from dateutil import parser -from googleapiclient.errors import HttpError - -from i3pystatus import IntervalModule, logger -from i3pystatus.core.color import ColorRangeModule -from i3pystatus.core.util import user_open, internet, require - - -class GoogleCalendar(IntervalModule, ColorRangeModule): - """ - Simple module for displaying next Google Calendar event. - - Requires the Google Calendar API package - https://developers.google.com/google-apps/calendar/quickstart/python. - Additionally requires the `colour`, `httplib2`, `oauth2client`, `pytz`, `apiclient` and `dateutil` modules. - - All top level keys returned by the Google Calendar API can be used as formatters. Some - examples include: - - .. rubric:: Available formatters - - * `{kind}` — type of event - * `{status}` — eg, confirmed - * `{summary}` — essentially the title - * `{remaining_time}` - how long remaining until the event - * `{start_time}` - when this event starts - * `{htmlLink}` — link to the calendar event - - - """ - settings = ( - ('format', 'format string'), - ("credential_path", "Path to credentials"), - ("skip_recurring", "Skip recurring events."), - ("update_interval", "How often (in seconds) to call the Goggle API and update events"), - ("days", "Only show events between now and this many days in the future"), - ("urgent_seconds", "Add urgent hint when this many seconds until event startTime"), - ("urgent_blink", "Whether or not to blink when the within urgent_seconds of event start"), - ("start_color", "Hex or English name for start of color range, eg '#00FF00' or 'green'"), - ("end_color", "Hex or English name for end of color range, eg '#FF0000' or 'red'"), - ) - - required = ('credential_path',) - - format = "{summary} ({remaining_time})" - credential_path = None - interval = 1 - - skip_recurring = True - update_interval = 60 - days = 1 - urgent_seconds = 300 - urgent_blink = False - color = None - - service = None - credentials = None - - display_event = None - last_event_refresh = None - urgent_acknowledged = False - update_lock = threading.Lock() - - on_rightclick = 'acknowledge' - on_leftclick = 'open_calendar' - - def init(self): - self.colors = self.get_hex_color_range(self.end_color, self.start_color, self.urgent_seconds * 2) - self.last_event_refresh = datetime.datetime.now(tz=pytz.UTC) - datetime.timedelta(seconds=self.update_interval) - - @require(internet) - def run(self): - if self.service is None: - self.connect_service() - now = datetime.datetime.now(tz=pytz.UTC) - if self.should_update(now): - threading.Thread(target=self.update_display_event, args=(now,), daemon=True).start() - self.refresh_output(now) - - def should_update(self, now): - """ - Whether or not we should update events. - """ - wait_window = self.last_event_refresh + datetime.timedelta(seconds=self.update_interval) - if self.display_event is None: - should_update = wait_window < now - elif self.display_event['start_time'] < now: - should_update = True - elif wait_window < now: - should_update = True - else: - should_update = False - return should_update and not self.update_lock.locked() - - def update_display_event(self, now): - """ - Call the Google API and attempt to update the current event. - """ - with self.update_lock: - logger.debug("Retrieving events...".format(threading.current_thread().name)) - self.last_event_refresh = now - for event in self.get_events(now): - # If we don't have a dateTime just make do with a date. - if 'dateTime' not in event['start']: - event['start_time'] = pytz.utc.localize(parser.parse(event['start']['date'])) - else: - event['start_time'] = parser.parse(event['start']['dateTime']) - - if 'recurringEventId' in event and self.skip_recurring: - continue - elif event['start_time'] < now: - continue - - # It is possible for there to be no title... - if 'summary' not in event: - event['summary'] = '(no title)' - - if self.display_event: - # If this is a new event, reset the urgent_acknowledged flag. - if self.display_event['id'] != event['id']: - self.urgent_acknowledged = False - self.display_event = event - return - self.display_event = None - - def refresh_output(self, now): - """ - Build our output dict. - """ - if self.display_event: - start_time = self.display_event['start_time'] - alert_time = now + datetime.timedelta(seconds=self.urgent_seconds) - self.display_event['remaining_time'] = str((start_time - now)).partition('.')[0] - urgent = self.is_urgent(alert_time, start_time, now) - color = self.get_color(now, start_time) - - self.output = { - 'full_text': self.format.format(**self.display_event), - 'color': color, - 'urgent': urgent - } - else: - self.output = { - 'full_text': "", - } - - def is_urgent(self, alert_time, start_time, now): - """ - Determine whether or not to set the urgent flag. If urgent_blink is set, toggles urgent flag - on and off every second. - """ - urgent = alert_time > start_time - if urgent and self.urgent_blink: - urgent = now.second % 2 == 0 and not self.urgent_acknowledged - return urgent - - def get_events(self, now): - """ - Retrieve the next N events from Google. - """ - events = [] - try: - now, later = self.get_timerange_formatted(now) - events_result = self.service.events().list( - calendarId='primary', - timeMin=now, - timeMax=later, - maxResults=10, - singleEvents=True, - orderBy='startTime', - timeZone='utc' - ).execute() - events = events_result.get('items', []) - except HttpError as e: - if e.resp.status in (500, 503): - logger.warn("GoogleCalendar received %s while retrieving events" % e.resp.status) - else: - raise - return events - - def get_timerange_formatted(self, now): - """ - Return two ISO8601 formatted date strings, one for timeMin, the other for timeMax (to be consumed by get_events) - """ - later = now + datetime.timedelta(days=self.days) - return now.isoformat(), later.isoformat() - - def get_color(self, now, start_time): - seconds_to_event = (start_time - now).seconds - v = self.percentage(seconds_to_event, self.urgent_seconds) - color = self.get_gradient(v, self.colors) - return color - - def connect_service(self): - logger.debug("Connecting Service..") - self.credentials = oauth2client.file.Storage(self.credential_path).get() - self.service = discovery.build('calendar', 'v3', http=self.credentials.authorize(httplib2.Http())) - - def open_calendar(self): - if self.display_event: - calendar_url = self.display_event.get('htmlLink', None) - if calendar_url: - user_open(calendar_url) - - def acknowledge(self): - self.urgent_acknowledged = True