Add generic calendar module. (#598)

This commit is contained in:
Facetoe 2017-08-02 00:35:01 +08:00 committed by GitHub
parent fc2ae2a387
commit 2314277b94
5 changed files with 500 additions and 212 deletions

View File

@ -0,0 +1,214 @@
import inspect
import threading
from abc import abstractmethod
from datetime import datetime, timedelta
from i3pystatus import IntervalModule, formatp, SettingsBase
from i3pystatus.core.color import ColorRangeModule
from i3pystatus.core.desktop import DesktopNotification
def strip_microseconds(delta):
return delta - timedelta(microseconds=delta.microseconds)
def formatter(func):
""" Decorator to mark a CalendarEvent method as a formatter. """
func.formatter = True
return func
class CalendarEvent:
"""
Simple class representing an Event. The attributes title, start, end and recurring are required as
these will be used for the formatters. The id attribute is used to uniquely identify the event.
If a backend wishes to provide extra formatters to the user, this can be done by adding additional
methods and decorating them with the @formatter decorator. See the LightningCalendarEvent from the
lightning module for an example of this.
"""
# Unique identifier for this event
id = None
# The title of this event
title = None
# Datetime object representing when this event begins
start = None
# Datetime object representing when this event ends
end = None
# Whether or not this event is a recurring event
recurring = False
def formatters(self):
"""
Build a dictionary containing all those key/value pairs that will be exposed to the user via formatters.
"""
event_dict = dict(
title=self.title,
remaining=self.time_remaining
)
def is_formatter(x):
return inspect.ismethod(x) and hasattr(x, 'formatter') and getattr(x, 'formatter')
for method_name, method in inspect.getmembers(self, is_formatter):
event_dict[method_name] = method()
return event_dict
@property
def time_remaining(self):
return strip_microseconds(self.start - datetime.now(tz=self.start.tzinfo))
def __str__(self):
return "{}(title='{}', start={}, end={}, recurring={})" \
.format(type(self).__name__,
self.title,
repr(self.start),
repr(self.end),
self.recurring)
class CalendarBackend(SettingsBase):
"""
Base class for calendar backend. Subclasses should implement update and populate the events list.
Optionally, subclasses can override on_click to perform actions on the current event when clicked.
"""
def init(self):
self.events = []
@abstractmethod
def update(self):
""" Subclasses should implement this method and populate the events list with CalendarEvents."""
def on_click(self, event):
""" Override this method to do more interesting things with the event. """
DesktopNotification(
title=event.title,
body="{} until {}!".format(event.time_remaining, event.title),
icon='dialog-information',
urgency=1,
timeout=0,
).display()
def __iter__(self):
return iter(self.events)
def __len__(self):
return len(self.events)
class Calendar(IntervalModule, ColorRangeModule):
"""
Generic calendar module. Requires the PyPI package ``colour``.
.. rubric:: Available formatters
* {title} - the title or summary of the event
* {remaining_time} - how long until this event is due
Additional formatters may be provided by the backend, consult their documentation for details.
"""
settings = (
('format', 'Format string to display in the bar'),
('backend', 'Backend to use for collecting calendar events'),
('skip_recurring', 'Whether or not to skip recurring events'),
('update_interval', "How often in seconds to call the backend's update method"),
('urgent_seconds', "When within this many seconds of the event, set the urgent flag"),
('urgent_blink', 'Whether or not to blink when within urgent_seconds of the event'),
('dynamic_color', 'Whether or not to change color as the event approaches'),
)
required = ('backend',)
skip_recurring = False
interval = 1
backend = None
update_interval = 600
dynamic_color = True
urgent_seconds = 300
urgent_blink = False
current_event = None
urgent_acknowledged = False
format = "{title} - {remaining}"
on_rightclick = 'handle_click'
on_leftclick = 'acknowledge'
def init(self):
self.condition = threading.Condition()
self.thread = threading.Thread(target=self.update_thread, daemon=True)
self.thread.start()
self.colors = self.get_hex_color_range(self.end_color, self.start_color, self.urgent_seconds * 2)
def update_thread(self):
self.refresh_events()
while True:
with self.condition:
self.condition.wait(self.update_interval)
self.refresh_events()
def refresh_events(self):
self.backend.update()
def valid_event(ev):
if self.skip_recurring and ev.recurring:
return False
elif ev.time_remaining < timedelta(seconds=0):
return False
return True
for event in self.backend:
if valid_event(event):
if self.current_event and self.current_event.id != event.id:
self.urgent_acknowledged = False
self.current_event = event
return
self.current_event = None
def run(self):
if self.current_event and self.current_event.time_remaining > timedelta(seconds=0):
self.output = {
"full_text": formatp(self.format, **self.current_event.formatters()),
"color": self.get_color() if self.dynamic_color else None,
"urgent": self.is_urgent()
}
else:
self.output = {}
def handle_click(self):
if self.current_event:
self.backend.on_click(self.current_event)
def get_color(self):
if self.current_event.time_remaining.days > 0:
color = self.colors[-1]
else:
p = self.percentage(self.current_event.time_remaining.seconds, self.urgent_seconds)
color = self.get_gradient(p, self.colors)
return color
def is_urgent(self):
"""
Determine whether or not to set the urgent flag. If urgent_blink is set, toggles urgent flag
on and off every second.
"""
if not self.current_event:
return False
now = datetime.now(tz=self.current_event.start.tzinfo)
alert_time = now + timedelta(seconds=self.urgent_seconds)
urgent = alert_time > self.current_event.start
if urgent and self.urgent_blink:
urgent = now.second % 2 == 0 and not self.urgent_acknowledged
return urgent
def acknowledge(self):
self.urgent_acknowledged = not self.urgent_acknowledged

View File

@ -0,0 +1,116 @@
import datetime
from datetime import timezone
import httplib2
import oauth2client
import pytz
from apiclient import discovery
from dateutil import parser
from googleapiclient.errors import HttpError
from i3pystatus.calendar import CalendarBackend, CalendarEvent, formatter
from i3pystatus.core.util import user_open, require, internet
class GoogleCalendarEvent(CalendarEvent):
def __init__(self, google_event):
self.id = google_event['id']
self.title = google_event['summary']
self.start = self._parse_date(google_event['start'])
self.end = self._parse_date(google_event['end'])
self.recurring = 'recurringEventId' in google_event
self._link = google_event['htmlLink']
self._status = google_event['status']
self._kind = google_event['kind']
@formatter
def htmlLink(self):
return self._link
@formatter
def status(self):
return self._status
@formatter
def kind(self):
return self._kind
def _parse_date(self, date_section):
if 'dateTime' not in date_section:
result = parser.parse(date_section['date'])
else:
result = parser.parse(date_section['dateTime'])
return result.replace(tzinfo=timezone.utc).astimezone(tz=None)
class Google(CalendarBackend):
"""
Calendar backend for interacting with Google Calendar.
Requires the Google Calendar API package - https://developers.google.com/google-apps/calendar/quickstart/python.
Additionally requires the `colour`, `httplib2`, `oauth2client`, `pytz`, `apiclient` and `dateutil` modules.
.. rubric:: Available formatters
* `{kind}` type of event
* `{status}` eg, confirmed
* `{htmlLink}` link to the calendar event
"""
settings = (
('credential_path', 'Path to credentials'),
('days', 'Only show events between now and this many days in the future'),
)
required = ('credential_path',)
days = 7
def init(self):
self.service = None
self.events = []
@require(internet)
def update(self):
if self.service is None:
self.connect_service()
self.refresh_events()
def on_click(self, event):
user_open(event.htmlLink())
def connect_service(self):
self.logger.debug("Connecting Service..")
self.credentials = oauth2client.file.Storage(self.credential_path).get()
self.service = discovery.build('calendar', 'v3', http=self.credentials.authorize(httplib2.Http()))
def refresh_events(self):
"""
Retrieve the next N events from Google.
"""
now = datetime.datetime.now(tz=pytz.UTC)
try:
now, later = self.get_timerange_formatted(now)
events_result = self.service.events().list(
calendarId='primary',
timeMin=now,
timeMax=later,
maxResults=10,
singleEvents=True,
orderBy='startTime',
timeZone='utc'
).execute()
self.events.clear()
for event in events_result.get('items', []):
self.events.append(GoogleCalendarEvent(event))
except HttpError as e:
if e.resp.status in (500, 503):
self.logger.warn("GoogleCalendar received %s while retrieving events" % e.resp.status)
else:
raise
def get_timerange_formatted(self, now):
"""
Return two ISO8601 formatted date strings, one for timeMin, the other for timeMax (to be consumed by get_events)
"""
later = now + datetime.timedelta(days=self.days)
return now.isoformat(), later.isoformat()

View File

@ -0,0 +1,67 @@
from datetime import date, timedelta
import khal
import khal.cli
import khal.settings
from i3pystatus.calendar import CalendarBackend, CalendarEvent, formatter
class KhalEvent(CalendarEvent):
def __init__(self, khal_event):
self.id = khal_event.uid
self.start = khal_event.start_local
self.end = khal_event.end_local
self.title = khal_event.summary
self.recurring = khal_event.recurring
self._calendar = khal_event.calendar
@formatter
def calendar(self):
return self._calendar
class Khal(CalendarBackend):
"""
Backend for Khal. Requires `khal` to be installed.
.. rubric:: Available formatters
* `{calendar}` Calendar event is from.
"""
settings = (
('config_path', 'Path to your khal.conf'),
('calendars', 'Restrict to these calendars pass as a list)'),
('days', 'Check for the next X days'),
)
required = ('config_path', 'calendars')
days = 7
config_path = None
calendars = None
def init(self):
self.collection = None
self.events = []
def open_connection(self):
self.logger.debug("Opening collection with config {}".format(self.config_path))
config = khal.settings.get_config(self.config_path)
self.collection = khal.cli.build_collection(config, None)
def update(self):
if self.collection is None:
self.open_connection()
events = []
for days in range(self.days):
events += list(self.collection.get_events_on(
date.today() + timedelta(days=days))
)
# filter out unwanted calendars
self.logger.debug("calendars %s" % self.calendars)
if self.calendars is not None:
events = [evt for evt in events if evt.calendar in self.calendars]
for event in events:
self.events.append(KhalEvent(event))

View File

@ -0,0 +1,103 @@
import sqlite3
from datetime import datetime
import pytz
from dateutil.tz import tzlocal
from i3pystatus.calendar import CalendarEvent, CalendarBackend, formatter
class Flag:
PRIVATE = 1
HAS_ATTENDEES = 2
HAS_PROPERTIES = 4
EVENT_ALLDAY = 8
HAS_RECURRENCE = 16
HAS_EXCEPTIONS = 32
HAS_ATTACHMENTS = 64
HAS_RELATIONS = 128
HAS_ALARMS = 256
RECURRENCE_ID_ALLDAY = 512
class LightningCalendarEvent(CalendarEvent):
def __init__(self, row):
self.id = row['id']
self.title = row['title']
self._event_start = row['event_start']
self._event_start_tz = row['event_start_tz']
self._event_end = row['event_end']
self._event_end_tz = row['event_end_tz']
self._flags = row['flags']
self._location = row['location'] or ''
@property
def recurring(self):
return (self._flags & Flag.HAS_RECURRENCE) != 0
@property
def end(self):
return self._convert_date(self._event_end, self._event_end_tz)
@property
def start(self):
return self._convert_date(self._event_start, self._event_start_tz)
@formatter
def location(self):
return self._location
def _convert_date(self, microseconds_from_epoch, timezone):
if timezone == 'floating':
tz = tzlocal()
else:
tz = pytz.timezone(timezone)
d = datetime.fromtimestamp(microseconds_from_epoch / 1000000, tz=pytz.UTC)
return d.astimezone(tz)
class Lightning(CalendarBackend):
"""
Backend for querying the Thunderbird's Lightning database. Requires `pytz` and `dateutil`.
.. rubric:: Available formatters
* `{location}` Where the event occurs
"""
settings = (
('database_path', 'Path to local.sqlite.'),
('days', 'Only show events between now and this many days in the future'),
)
required = ('database_path',)
days = 7
database_path = None
def update(self):
with sqlite3.connect(self.database_path) as connection:
connection.row_factory = sqlite3.Row
cursor = connection.cursor()
cursor.execute("""
SELECT
id,
title,
event_start,
event_start_tz,
event_end,
event_end_tz,
flags,
cal_properties.value AS location
FROM cal_events
LEFT OUTER JOIN cal_properties ON cal_properties.item_id = id AND cal_properties.key = 'LOCATION'
WHERE
datetime(event_start / 1000000, 'unixepoch', 'localtime') < datetime('now', 'localtime', '+' || :days || ' days')
AND
datetime(event_start / 1000000, 'unixepoch', 'localtime') > datetime('now', 'localtime')
ORDER BY event_start ASC
""", dict(days=self.days))
self.events.clear()
for row in cursor:
self.events.append(LightningCalendarEvent(row))
cursor.close()

View File

@ -1,212 +0,0 @@
import datetime
import threading
import httplib2
import oauth2client
import pytz
from apiclient import discovery
from dateutil import parser
from googleapiclient.errors import HttpError
from i3pystatus import IntervalModule, logger
from i3pystatus.core.color import ColorRangeModule
from i3pystatus.core.util import user_open, internet, require
class GoogleCalendar(IntervalModule, ColorRangeModule):
"""
Simple module for displaying next Google Calendar event.
Requires the Google Calendar API package - https://developers.google.com/google-apps/calendar/quickstart/python.
Additionally requires the `colour`, `httplib2`, `oauth2client`, `pytz`, `apiclient` and `dateutil` modules.
All top level keys returned by the Google Calendar API can be used as formatters. Some
examples include:
.. rubric:: Available formatters
* `{kind}` type of event
* `{status}` eg, confirmed
* `{summary}` essentially the title
* `{remaining_time}` - how long remaining until the event
* `{start_time}` - when this event starts
* `{htmlLink}` link to the calendar event
"""
settings = (
('format', 'format string'),
("credential_path", "Path to credentials"),
("skip_recurring", "Skip recurring events."),
("update_interval", "How often (in seconds) to call the Goggle API and update events"),
("days", "Only show events between now and this many days in the future"),
("urgent_seconds", "Add urgent hint when this many seconds until event startTime"),
("urgent_blink", "Whether or not to blink when the within urgent_seconds of event start"),
("start_color", "Hex or English name for start of color range, eg '#00FF00' or 'green'"),
("end_color", "Hex or English name for end of color range, eg '#FF0000' or 'red'"),
)
required = ('credential_path',)
format = "{summary} ({remaining_time})"
credential_path = None
interval = 1
skip_recurring = True
update_interval = 60
days = 1
urgent_seconds = 300
urgent_blink = False
color = None
service = None
credentials = None
display_event = None
last_event_refresh = None
urgent_acknowledged = False
update_lock = threading.Lock()
on_rightclick = 'acknowledge'
on_leftclick = 'open_calendar'
def init(self):
self.colors = self.get_hex_color_range(self.end_color, self.start_color, self.urgent_seconds * 2)
self.last_event_refresh = datetime.datetime.now(tz=pytz.UTC) - datetime.timedelta(seconds=self.update_interval)
@require(internet)
def run(self):
if self.service is None:
self.connect_service()
now = datetime.datetime.now(tz=pytz.UTC)
if self.should_update(now):
threading.Thread(target=self.update_display_event, args=(now,), daemon=True).start()
self.refresh_output(now)
def should_update(self, now):
"""
Whether or not we should update events.
"""
wait_window = self.last_event_refresh + datetime.timedelta(seconds=self.update_interval)
if self.display_event is None:
should_update = wait_window < now
elif self.display_event['start_time'] < now:
should_update = True
elif wait_window < now:
should_update = True
else:
should_update = False
return should_update and not self.update_lock.locked()
def update_display_event(self, now):
"""
Call the Google API and attempt to update the current event.
"""
with self.update_lock:
logger.debug("Retrieving events...".format(threading.current_thread().name))
self.last_event_refresh = now
for event in self.get_events(now):
# If we don't have a dateTime just make do with a date.
if 'dateTime' not in event['start']:
event['start_time'] = pytz.utc.localize(parser.parse(event['start']['date']))
else:
event['start_time'] = parser.parse(event['start']['dateTime'])
if 'recurringEventId' in event and self.skip_recurring:
continue
elif event['start_time'] < now:
continue
# It is possible for there to be no title...
if 'summary' not in event:
event['summary'] = '(no title)'
if self.display_event:
# If this is a new event, reset the urgent_acknowledged flag.
if self.display_event['id'] != event['id']:
self.urgent_acknowledged = False
self.display_event = event
return
self.display_event = None
def refresh_output(self, now):
"""
Build our output dict.
"""
if self.display_event:
start_time = self.display_event['start_time']
alert_time = now + datetime.timedelta(seconds=self.urgent_seconds)
self.display_event['remaining_time'] = str((start_time - now)).partition('.')[0]
urgent = self.is_urgent(alert_time, start_time, now)
color = self.get_color(now, start_time)
self.output = {
'full_text': self.format.format(**self.display_event),
'color': color,
'urgent': urgent
}
else:
self.output = {
'full_text': "",
}
def is_urgent(self, alert_time, start_time, now):
"""
Determine whether or not to set the urgent flag. If urgent_blink is set, toggles urgent flag
on and off every second.
"""
urgent = alert_time > start_time
if urgent and self.urgent_blink:
urgent = now.second % 2 == 0 and not self.urgent_acknowledged
return urgent
def get_events(self, now):
"""
Retrieve the next N events from Google.
"""
events = []
try:
now, later = self.get_timerange_formatted(now)
events_result = self.service.events().list(
calendarId='primary',
timeMin=now,
timeMax=later,
maxResults=10,
singleEvents=True,
orderBy='startTime',
timeZone='utc'
).execute()
events = events_result.get('items', [])
except HttpError as e:
if e.resp.status in (500, 503):
logger.warn("GoogleCalendar received %s while retrieving events" % e.resp.status)
else:
raise
return events
def get_timerange_formatted(self, now):
"""
Return two ISO8601 formatted date strings, one for timeMin, the other for timeMax (to be consumed by get_events)
"""
later = now + datetime.timedelta(days=self.days)
return now.isoformat(), later.isoformat()
def get_color(self, now, start_time):
seconds_to_event = (start_time - now).seconds
v = self.percentage(seconds_to_event, self.urgent_seconds)
color = self.get_gradient(v, self.colors)
return color
def connect_service(self):
logger.debug("Connecting Service..")
self.credentials = oauth2client.file.Storage(self.credential_path).get()
self.service = discovery.build('calendar', 'v3', http=self.credentials.authorize(httplib2.Http()))
def open_calendar(self):
if self.display_event:
calendar_url = self.display_event.get('htmlLink', None)
if calendar_url:
user_open(calendar_url)
def acknowledge(self):
self.urgent_acknowledged = True