Some modifications to Google Calendar:

* Add amazing urgent_blink feature
	* Perform updates in a background thread
	* Be more consistent with time
	* Add some enlightening comments
This commit is contained in:
Facetoe 2017-01-23 20:19:49 +08:00
parent d90511597e
commit 6c9bb749b9

View File

@ -1,4 +1,5 @@
import datetime
import threading
import httplib2
import oauth2client
@ -6,9 +7,10 @@ import pytz
from apiclient import discovery
from dateutil import parser
from googleapiclient.errors import HttpError
from i3pystatus import IntervalModule, logger
from i3pystatus.core.color import ColorRangeModule
from i3pystatus.core.util import internet, require, user_open
from i3pystatus.core.util import user_open, internet, require
class GoogleCalendar(IntervalModule, ColorRangeModule):
@ -36,51 +38,105 @@ class GoogleCalendar(IntervalModule, ColorRangeModule):
('format', 'format string'),
("credential_path", "Path to credentials"),
("skip_recurring", "Skip recurring events."),
("update_interval", "How often (in seconds) to call the Goggle API and update events"),
("days", "Only show events between now and this many days in the future"),
("urgent_seconds", "Add urgent hint when this many seconds until event startTime"),
("urgent_blink", "Whether or not to blink when the within urgent_seconds of event start"),
("start_color", "Hex or English name for start of color range, eg '#00FF00' or 'green'"),
("end_color", "Hex or English name for end of color range, eg '#FF0000' or 'red'"),
)
required = ('credential_path',)
interval = 30
format = "{summary} ({remaining_time})"
credential_path = None
interval = 1
skip_recurring = True
update_interval = 60
days = 1
urgent_seconds = 300
urgent_blink = False
color = None
service = None
credentials = None
display_event = None
last_event_refresh = None
urgent_acknowledged = False
update_lock = threading.Lock()
def on_click(self, button, **kwargs):
self.open_calendar()
on_rightclick = 'acknowledge'
on_leftclick = 'open_calendar'
def init(self):
self.colors = self.get_hex_color_range(self.end_color, self.start_color, self.urgent_seconds * 2)
self.last_event_refresh = datetime.datetime.now(tz=pytz.UTC) - datetime.timedelta(seconds=self.update_interval)
self.connect_service()
@require(internet)
def run(self):
if not self.service:
self.connect_service()
now = datetime.datetime.now(tz=pytz.UTC)
if self.should_update(now):
threading.Thread(target=self.update_display_event, args=(now,), daemon=True).start()
self.refresh_output(now)
try:
self.display_event = self.get_next_event()
except ConnectionResetError as e:
logger.warn(e)
def should_update(self, now):
"""
Whether or not we should update events.
"""
wait_window = self.last_event_refresh + datetime.timedelta(seconds=self.update_interval)
if self.display_event is None:
should_update = wait_window < now
elif self.display_event['start_time'] < now:
should_update = True
elif wait_window < now:
should_update = True
else:
should_update = False
return should_update and not self.update_lock.locked()
def update_display_event(self, now):
"""
Call the Google API and attempt to update the current event.
"""
with self.update_lock:
logger.debug("Retrieving events...".format(threading.current_thread().name))
self.last_event_refresh = now
for event in self.get_events(now):
# If we don't have a dateTime just make do with a date.
if 'dateTime' not in event['start']:
event['start_time'] = pytz.utc.localize(parser.parse(event['start']['date']))
else:
event['start_time'] = parser.parse(event['start']['dateTime'])
if 'recurringEventId' in event and self.skip_recurring:
continue
elif event['start_time'] < now:
continue
# It is possible for there to be no title...
if 'summary' not in event:
event['summary'] = '(no title)'
if self.display_event:
# If this is a new event, reset the urgent_acknowledged flag.
if self.display_event['id'] != event['id']:
self.urgent_acknowledged = False
self.display_event = event
return
self.display_event = None
def refresh_output(self, now):
"""
Build our output dict.
"""
if self.display_event:
start_time = self.display_event['start_time']
now = datetime.datetime.now(tz=pytz.UTC)
alert_time = now + datetime.timedelta(seconds=self.urgent_seconds)
self.display_event['remaining_time'] = str((start_time - now)).partition('.')[0]
urgent = alert_time > start_time
urgent = self.is_urgent(alert_time, start_time, now)
color = self.get_color(now, start_time)
self.output = {
@ -89,38 +145,27 @@ class GoogleCalendar(IntervalModule, ColorRangeModule):
'urgent': urgent
}
else:
self.display_event = None
self.output = {
'full_text': "",
}
def connect_service(self):
self.credentials = oauth2client.file.Storage(self.credential_path).get()
self.service = discovery.build('calendar', 'v3', http=self.credentials.authorize(httplib2.Http()))
def is_urgent(self, alert_time, start_time, now):
"""
Determine whether or not to set the urgent flag. If urgent_blink is set, toggles urgent flag
on and off every second.
"""
urgent = alert_time > start_time
if urgent and self.urgent_blink:
urgent = now.second % 2 == 0 and not self.urgent_acknowledged
return urgent
def get_next_event(self):
for event in self.get_events():
# If we don't have a dateTime just make do with a date.
if 'dateTime' not in event['start']:
event['start_time'] = pytz.utc.localize(parser.parse(event['start']['date']))
else:
event['start_time'] = parser.parse(event['start']['dateTime'])
now = datetime.datetime.now(tz=pytz.UTC)
if 'recurringEventId' in event and self.skip_recurring:
continue
elif event['start_time'] < now:
continue
# It is possible for there to be no title...
if 'summary' not in event:
event['summary'] = '(no title)'
return event
def get_events(self):
def get_events(self, now):
"""
Retrieve the next N events from Google.
"""
events = []
try:
now, later = self.get_timerange()
now, later = self.get_timerange_formatted(now)
events_result = self.service.events().list(
calendarId='primary',
timeMin=now,
@ -138,12 +183,12 @@ class GoogleCalendar(IntervalModule, ColorRangeModule):
raise
return events
def get_timerange(self):
now = datetime.datetime.utcnow()
def get_timerange_formatted(self, now):
"""
Return two ISO8601 formatted date strings, one for timeMin, the other for timeMax (to be consumed by get_events)
"""
later = now + datetime.timedelta(days=self.days)
now = now.isoformat() + 'Z'
later = later.isoformat() + 'Z'
return now, later
return now.isoformat(), later.isoformat()
def get_color(self, now, start_time):
seconds_to_event = (start_time - now).seconds
@ -151,8 +196,16 @@ class GoogleCalendar(IntervalModule, ColorRangeModule):
color = self.get_gradient(v, self.colors)
return color
def connect_service(self):
logger.debug("Connecting Service..")
self.credentials = oauth2client.file.Storage(self.credential_path).get()
self.service = discovery.build('calendar', 'v3', http=self.credentials.authorize(httplib2.Http()))
def open_calendar(self):
if self.display_event:
calendar_url = self.display_event.get('htmlLink', None)
if calendar_url:
user_open(calendar_url)
def acknowledge(self):
self.urgent_acknowledged = True