commit
e9b1976e1e
@ -60,7 +60,7 @@ class IOHandler:
|
||||
Raises EOFError if the end of stream has been reached"""
|
||||
|
||||
try:
|
||||
line = self.inp.readline().decode("utf-8").strip()
|
||||
line = self.inp.readline().strip()
|
||||
except KeyboardInterrupt:
|
||||
raise EOFError()
|
||||
|
||||
|
@ -5,12 +5,22 @@ from i3pystatus import (
|
||||
I3statusHandler,
|
||||
mailchecker,
|
||||
modsde,
|
||||
notmuchmailchecker,
|
||||
notmuch,
|
||||
thunderbird,
|
||||
regex,
|
||||
)
|
||||
|
||||
status = I3statusHandler()
|
||||
|
||||
# Regular expression file watcher
|
||||
# If you're using a thinkpad, chances are that this displays your fan speed and level
|
||||
regexsettings = {
|
||||
"regex": "speed:\s+([0-9]+)\nlevel:\s+([a-zA-Z0-9]+)",
|
||||
"file": "/proc/acpi/ibm/fan",
|
||||
"format": "{0} [{1}]",
|
||||
}
|
||||
status.register(regex.Regex(regexsettings))
|
||||
|
||||
# The imap checker module
|
||||
mailsettings = {
|
||||
"color": "#ff0000",
|
||||
@ -46,8 +56,9 @@ status.register_module(mde)
|
||||
|
||||
# the notmuch mail checker module
|
||||
db_path = "path_to_your_notmuch_database"
|
||||
notmuch = notmuchmailchecker.NotmuchMailChecker(db_path)
|
||||
status.register_module(notmuch)
|
||||
nm = notmuch.NotmuchMailChecker(db_path)
|
||||
status.register_module(nm)
|
||||
|
||||
|
||||
# the thunderbird dbus new mail checker module
|
||||
tb = thunderbirdnewmail.ThunderbirdMailChecker()
|
||||
|
@ -20,27 +20,21 @@ class MailChecker(IntervalModule):
|
||||
"servers": []
|
||||
}
|
||||
|
||||
servers = []
|
||||
|
||||
def __init__(self, settings = None):
|
||||
self.settings.update(settings)
|
||||
|
||||
for server in settings["servers"]:
|
||||
srv = MailChecker.MailServer(server)
|
||||
self.servers.append(srv)
|
||||
self.servers = list(map(MailChecker.MailServer, settings["servers"]))
|
||||
|
||||
def run(self):
|
||||
unread = sum([server.get_unread_count() for server in self.servers])
|
||||
unread = sum(map(lambda server: server.get_unread_count(), self.servers))
|
||||
|
||||
if not unread:
|
||||
return None
|
||||
|
||||
self.output = {
|
||||
"full_text" : "%d new email%s" % (unread, ("s" if unread > 1 else "")),
|
||||
"name" : "newmail",
|
||||
"urgent" : "true",
|
||||
"color" : self.settings["color"]
|
||||
}
|
||||
if unread:
|
||||
self.output = {
|
||||
"full_text" : "%d new email%s" % (unread, ("s" if unread > 1 else "")),
|
||||
"name" : "newmail",
|
||||
"urgent" : "true",
|
||||
"color" : self.settings["color"]
|
||||
}
|
||||
|
||||
class MailServer:
|
||||
"""
|
||||
@ -50,20 +44,13 @@ class MailChecker(IntervalModule):
|
||||
tries to reconnect. It checks every "pause" seconds.
|
||||
"""
|
||||
|
||||
host = ""
|
||||
port = ""
|
||||
imap_class = imaplib.IMAP4
|
||||
username = ""
|
||||
password = ""
|
||||
connection = None
|
||||
|
||||
def __init__(self, settings_dict):
|
||||
self.host = settings_dict["host"]
|
||||
self.port = settings_dict["port"]
|
||||
self.username = settings_dict["username"]
|
||||
self.password = settings_dict["password"]
|
||||
self.__dict__.update(settings_dict)
|
||||
|
||||
if settings_dict["ssl"]:
|
||||
if self.ssl:
|
||||
self.imap_class = imaplib.IMAP4_SSL
|
||||
|
||||
def get_connection(self):
|
||||
@ -87,5 +74,5 @@ class MailChecker(IntervalModule):
|
||||
conn = self.get_connection()
|
||||
if conn:
|
||||
unread += len(conn.search(None,"UnSeen")[1][0].split())
|
||||
|
||||
|
||||
return unread
|
||||
|
30
i3pystatus/regex.py
Normal file
30
i3pystatus/regex.py
Normal file
@ -0,0 +1,30 @@
|
||||
import re
|
||||
|
||||
from i3pystatus import IntervalModule
|
||||
|
||||
class Regex(IntervalModule):
|
||||
"""
|
||||
Simple regex file watcher
|
||||
|
||||
Settings:
|
||||
* flags — Python.re flags
|
||||
* regex — regular expression
|
||||
* file — file to search for regex matches
|
||||
* format — new-style format string used for output, default is "{0}"
|
||||
"""
|
||||
|
||||
flags = 0
|
||||
format = "{0}"
|
||||
|
||||
def __init__(self, settings):
|
||||
self.__dict__.update(settings)
|
||||
|
||||
self.re = re.compile(self.regex, self.flags)
|
||||
|
||||
def run(self):
|
||||
with open(self.file, "r") as f:
|
||||
match = self.re.search(f.read())
|
||||
self.output = self.output = {
|
||||
"full_text" : self.format.format(*match.groups()),
|
||||
"name" : "regex",
|
||||
}
|
@ -7,15 +7,17 @@
|
||||
# The plugin must be active and thunderbird running for the module to work
|
||||
# properly.
|
||||
|
||||
import dbus, gobject
|
||||
from dbus.mainloop.glib import DBusGMainLoop
|
||||
import json
|
||||
import threading
|
||||
import time
|
||||
from functools import partial
|
||||
|
||||
from i3pystatus import AsyncModule
|
||||
import dbus, gobject
|
||||
from dbus.mainloop.glib import DBusGMainLoop
|
||||
|
||||
class ThunderbirdMailChecker(AsyncModule):
|
||||
from i3pystatus import IntervalModule
|
||||
|
||||
class ThunderbirdMailChecker(IntervalModule):
|
||||
"""
|
||||
This class listens for dbus signals emitted by
|
||||
the dbus-sender extension for thunderbird.
|
||||
@ -24,8 +26,8 @@ class ThunderbirdMailChecker(AsyncModule):
|
||||
settings = {
|
||||
"format": "%d new email"
|
||||
}
|
||||
|
||||
unread = set()
|
||||
interval = 1
|
||||
|
||||
def __init__(self, settings=None):
|
||||
if settings is not None:
|
||||
@ -43,10 +45,7 @@ class ThunderbirdMailChecker(AsyncModule):
|
||||
dbus.mainloop.glib.threads_init()
|
||||
self.context = loop.get_context()
|
||||
|
||||
def mainloop(self):
|
||||
while True:
|
||||
self.context.iteration(False)
|
||||
time.sleep(1)
|
||||
self.run = partial(self.context.iteration, False)
|
||||
|
||||
def new_msg(self, id, author, subject):
|
||||
if id not in self.unread:
|
||||
@ -59,7 +58,7 @@ class ThunderbirdMailChecker(AsyncModule):
|
||||
self._output()
|
||||
|
||||
def _output(self):
|
||||
self.context.iteration(False)
|
||||
self.run()
|
||||
|
||||
unread = len(self.unread)
|
||||
if unread:
|
||||
|
Loading…
Reference in New Issue
Block a user