This commit is contained in:
enkore 2013-10-01 15:22:09 +02:00
parent 21cd318c29
commit 78c01dd3e5
35 changed files with 416 additions and 162 deletions

View File

@ -16,10 +16,12 @@ __all__ = [
"formatp",
]
def main():
parser = argparse.ArgumentParser(description="A replacement for i3status")
parser.add_argument("-c", "--config", action="store", help="Config file")
parser.add_argument("-t", "--test", action="store_true", help="Test modules")
parser.add_argument(
"-t", "--test", action="store_true", help="Test modules")
args = parser.parse_args()
config = Config(config_file=args.config)

View File

@ -2,7 +2,9 @@ from alsaaudio import Mixer, ALSAAudioError
from i3pystatus import IntervalModule
class ALSA(IntervalModule):
"""
Shows volume of ALSA mixer. You can also use this for inputs, btw.
@ -54,7 +56,8 @@ class ALSA(IntervalModule):
}
def create_mixer(self):
self.alsamixer = Mixer(control=self.mixer, id=self.mixer_id, cardindex=self.card)
self.alsamixer = Mixer(
control=self.mixer, id=self.mixer_id, cardindex=self.card)
def run(self):
self.create_mixer()
@ -62,10 +65,11 @@ class ALSA(IntervalModule):
muted = False
if self.has_mute:
muted = self.alsamixer.getmute()[self.channel] == 1
self.fdict["volume"] = self.alsamixer.getvolume()[self.channel]
self.fdict["muted"] = self.muted if muted else self.muted
self.output = {
"full_text" : self.format.format(**self.fdict),
"full_text": self.format.format(**self.fdict),
"color": self.color_muted if muted else self.color,
}

View File

@ -1,6 +1,8 @@
from i3pystatus.file import File
class Backlight(File):
"""
Screen backlight info
@ -17,15 +19,15 @@ class Backlight(File):
)
required = ()
backlight="acpi_video0"
format="{brightness}/{max_brightness}"
backlight = "acpi_video0"
format = "{brightness}/{max_brightness}"
base_path = "/sys/class/backlight/{backlight}/"
components={
components = {
"brightness": (int, "brightness"),
"max_brightness": (int, "max_brightness"),
}
transforms={
transforms = {
"percentage": lambda cdict: (cdict["brightness"] / cdict["max_brightness"]) * 100,
}

View File

@ -4,11 +4,13 @@
import re
import configparser
from i3pystatus import IntervalModule
from i3pystatus import IntervalModule, formatp
from i3pystatus.core.util import PrefixedKeyDict, lchop, TimeWrapper
from i3pystatus.core.desktop import display_notification
class UEventParser(configparser.ConfigParser):
@staticmethod
def parse_file(file):
parser = UEventParser()
@ -25,7 +27,9 @@ class UEventParser(configparser.ConfigParser):
def read_string(self, string):
super().read_string("[id10t]\n" + string)
class Battery:
@staticmethod
def create(from_file):
batinfo = UEventParser.parse_file(from_file)
@ -52,33 +56,41 @@ class Battery:
else:
return "Full"
class BatteryCharge(Battery):
def consumption(self):
return self.bat["VOLTAGE_NOW"] * self.bat["CURRENT_NOW"] # V * A = W
return self.bat["VOLTAGE_NOW"] * self.bat["CURRENT_NOW"] # V * A = W
def _percentage(self, design):
return self.bat["CHARGE_NOW"] / self.bat["CHARGE_FULL"+design]
return self.bat["CHARGE_NOW"] / self.bat["CHARGE_FULL" + design]
def remaining(self):
if self.status() == "Discharging":
return self.bat["CHARGE_NOW"] / self.bat["CURRENT_NOW"] * 60 # Ah / A = h * 60 min = min
# Ah / A = h * 60 min = min
return self.bat["CHARGE_NOW"] / self.bat["CURRENT_NOW"] * 60
else:
return (self.bat["CHARGE_FULL"] - self.bat["CHARGE_NOW"]) / self.bat["CURRENT_NOW"] * 60
class BatteryEnergy(Battery):
def consumption(self):
return self.bat["POWER_NOW"]
def _percentage(self, design):
return self.bat["ENERGY_NOW"] / self.bat["ENERGY_FULL"+design]
return self.bat["ENERGY_NOW"] / self.bat["ENERGY_FULL" + design]
def remaining(self):
if self.status() == "Discharging":
return self.bat["ENERGY_NOW"] / self.bat["POWER_NOW"] * 60 # Wh / W = h * 60 min = min
# Wh / W = h * 60 min = min
return self.bat["ENERGY_NOW"] / self.bat["POWER_NOW"] * 60
else:
return (self.bat["ENERGY_FULL"] - self.bat["ENERGY_NOW"]) / self.bat["POWER_NOW"] * 60
class BatteryChecker(IntervalModule):
"""
This class uses the /sys/class/power_supply//uevent interface to check for the
battery status
@ -98,8 +110,10 @@ class BatteryChecker(IntervalModule):
"format",
("alert", "Display a libnotify-notification on low battery"),
"alert_percentage",
("alert_format_title", "The title of the notification, all formatters can be used"),
("alert_format_body", "The body text of the notification, all formatters can be used"),
("alert_format_title",
"The title of the notification, all formatters can be used"),
("alert_format_body",
"The body text of the notification, all formatters can be used"),
("path", "Override the default-generated path"),
("status", "A dictionary mapping ('DIS', 'CHR', 'FULL') to alternative names"),
)
@ -110,7 +124,7 @@ class BatteryChecker(IntervalModule):
"DIS": "DIS",
"FULL": "FULL",
}
alert = False
alert_percentage = 10
alert_format_title = "Low battery"
@ -120,7 +134,8 @@ class BatteryChecker(IntervalModule):
def init(self):
if not self.path:
self.path = "/sys/class/power_supply/{0}/uevent".format(self.battery_ident)
self.path = "/sys/class/power_supply/{0}/uevent".format(
self.battery_ident)
def run(self):
urgent = False
@ -152,8 +167,8 @@ class BatteryChecker(IntervalModule):
if self.alert and fdict["status"] == "DIS" and fdict["percentage"] <= self.alert_percentage:
display_notification(
title=self.alert_format_title.format(**fdict),
body=self.alert_format_body.format(**fdict),
title=formatp(self.alert_format_title, **fdict),
body=formatp(self.alert_format_body, **fdict),
icon="battery-caution",
urgency=2,
)
@ -161,7 +176,7 @@ class BatteryChecker(IntervalModule):
fdict["status"] = self.status[fdict["status"]]
self.output = {
"full_text": self.format.format(**fdict).strip(),
"full_text": formatp(self.format, **fdict).strip(),
"instance": self.battery_ident,
"urgent": urgent,
"color": color

View File

@ -6,7 +6,9 @@ from threading import Thread
from i3pystatus.core import io, util
from i3pystatus.core.modules import Module, START_HOOKS
class Status:
def __init__(self, standalone=False, interval=1, input_stream=sys.stdin):
self.standalone = standalone
if standalone:
@ -28,7 +30,7 @@ class Status:
return None
def run_command_endpoint(self):
for command in io.JSONIO(io=io.IOHandler(sys.stdin, open(os.devnull,"w")), skiplines=1).read():
for command in io.JSONIO(io=io.IOHandler(sys.stdin, open(os.devnull, "w")), skiplines=1).read():
module = self.modules.get_by_id(command["instance"])
if module:
module.on_click(command["button"])

View File

@ -18,6 +18,7 @@ SEARCHPATH = (
class ConfigFinder:
def __init__(self, searchpath=SEARCHPATH):
self.searchpath = searchpath
@ -37,10 +38,12 @@ class ConfigFinder:
else:
failed.append(path)
raise RuntimeError("Didn't find a config file, tried\n * {mods}".format(mods="\n * ".join(failed)))
raise RuntimeError(
"Didn't find a config file, tried\n * {mods}".format(mods="\n * ".join(failed)))
class Config:
def __init__(self, config_file=None):
self.finder = ConfigFinder()
self.config_file = config_file or self.finder.find_config_file()
@ -52,12 +55,15 @@ class Config:
@contextlib.contextmanager
def setup():
import i3pystatus
class TestStatus(i3pystatus.Status):
def run(self):
self.modules.reverse()
self.call_start_hooks()
for module in self.modules:
sys.stdout.write("{module}: ".format(module=module.__name__))
sys.stdout.write(
"{module}: ".format(module=module.__name__))
sys.stdout.flush()
test = module.test()
if test is not True:
@ -72,5 +78,6 @@ class Config:
i3pystatus.Status = i3pystatus.Status.__bases__[0]
with setup():
print("Using configuration file {file}\n".format(file=self.config_file))
print(
"Using configuration file {file}\n".format(file=self.config_file))
self.run()

View File

@ -1,27 +1,34 @@
class ConfigError(Exception):
"""ABC for configuration exceptions"""
def __init__(self, module, *args, **kwargs):
message = "Module '{0}': {1}".format(module, self.format(*args, **kwargs))
message = "Module '{0}': {1}".format(
module, self.format(*args, **kwargs))
super().__init__(message)
class ConfigKeyError(ConfigError, KeyError):
def format(self, key):
return "invalid option '{0}'".format(key)
class ConfigMissingError(ConfigError):
def format(self, missing):
return "missing required options: {0}".format(missing)
class ConfigAmbigiousClassesError(ConfigError):
def format(self, ambigious_classes):
return "ambigious module specification, found multiple classes: {0}".format(ambigious_classes)
class ConfigInvalidModuleError(ConfigError):
def format(self):
return "no class found"

View File

@ -4,6 +4,7 @@ from importlib import import_module
class ClassFinder:
"""Support class to find classes of specific bases in a module"""
def __init__(self, baseclass):
@ -44,5 +45,6 @@ class ClassFinder:
elif inspect.isclass(module) and issubclass(module, self.baseclass):
return module(*args, **kwargs)
elif args or kwargs:
raise ValueError("Additional arguments are invalid if 'module' is already an object")
raise ValueError(
"Additional arguments are invalid if 'module' is already an object")
return module

View File

@ -5,7 +5,9 @@ import sys
import threading
from contextlib import contextmanager
class IOHandler:
def __init__(self, inp=sys.stdin, out=sys.stdout):
self.inp = inp
self.out = out
@ -42,7 +44,9 @@ class IOHandler:
raise EOFError()
return line
class StandaloneIO(IOHandler):
"""
I/O handler for standalone usage of i3pystatus (w/o i3status)
@ -69,9 +73,11 @@ class StandaloneIO(IOHandler):
def read_line(self):
self.n += 1
return self.proto[min(self.n, len(self.proto)-1)]
return self.proto[min(self.n, len(self.proto) - 1)]
class JSONIO:
def __init__(self, io, skiplines=2):
self.io = io
for i in range(skiplines):

View File

@ -9,6 +9,7 @@ __all__ = [
"Module", "AsyncModule", "IntervalModule",
]
class Module(SettingsBase):
output = None
position = 0
@ -34,9 +35,9 @@ class Module(SettingsBase):
pass
def on_click(self, button):
if button == 1: # Left mouse button
if button == 1: # Left mouse button
self.on_leftclick()
elif button == 3: # Right mouse button
elif button == 3: # Right mouse button
self.on_rightclick()
@chain
@ -52,8 +53,9 @@ class Module(SettingsBase):
def __repr__(self):
return self.__class__.__name__
class IntervalModule(Module):
interval = 5 # seconds
interval = 5 # seconds
managers = {}
def registered(self, status_handler):

View File

@ -1,7 +1,9 @@
from i3pystatus.core.util import KeyConstraintDict
from i3pystatus.core.exceptions import ConfigKeyError, ConfigMissingError
class SettingsBase:
"""
Support class for providing a nice and flexible settings interface
@ -26,6 +28,7 @@ class SettingsBase:
def __init__(self, *args, **kwargs):
def flatten_setting(setting):
return setting[0] if isinstance(setting, tuple) else setting
def flatten_settings(settings):
return tuple(flatten_setting(setting) for setting in settings)
@ -44,14 +47,16 @@ class SettingsBase:
try:
sm.update(settings_source)
except KeyError as exc:
raise ConfigKeyError(type(self).__name__, key=exc.args[0]) from exc
raise ConfigKeyError(type(self).__name__, key=exc.args[0]) from exc
try:
self.__dict__.update(sm)
except KeyConstraintDict.MissingKeys as exc:
raise ConfigMissingError(type(self).__name__, missing=exc.keys) from exc
raise ConfigMissingError(
type(self).__name__, missing=exc.keys) from exc
self.__name__ = "{}.{}".format(self.__module__, self.__class__.__name__)
self.__name__ = "{}.{}".format(
self.__module__, self.__class__.__name__)
self.init()

View File

@ -8,6 +8,7 @@ timer = time.perf_counter if hasattr(time, "perf_counter") else time.clock
class Thread(threading.Thread):
def __init__(self, target_interval, workloads=None, start_barrier=1):
super().__init__()
self.workloads = workloads or []
@ -36,7 +37,8 @@ class Thread(threading.Thread):
time.sleep(0.4)
def execute_workloads(self):
for workload in self: workload()
for workload in self:
workload()
self.workloads.sort(key=lambda workload: workload.time)
def run(self):
@ -54,6 +56,7 @@ class Thread(threading.Thread):
class Wrapper:
def __init__(self, workload):
self.workload = workload
@ -62,6 +65,7 @@ class Wrapper:
class ExceptionWrapper(Wrapper):
def __call__(self):
try:
self.workload()
@ -84,6 +88,7 @@ class WorkloadWrapper(Wrapper):
class Manager:
def __init__(self, target_interval):
self.target_interval = target_interval
self.upper_bound = target_interval * 1.1
@ -108,7 +113,8 @@ class Manager:
return partition(workloads, self.lower_bound, lambda workload: workload.time)
def create_threads(self, threads):
for workloads in threads: self.create_thread(workloads)
for workloads in threads:
self.create_thread(workloads)
def create_thread(self, workloads):
thread = Thread(self.target_interval, workloads, start_barrier=0)
@ -119,4 +125,5 @@ class Manager:
self.threads[0].append(self.wrap(workload))
def start(self):
for thread in self.threads: thread.start()
for thread in self.threads:
thread.start()

View File

@ -7,17 +7,20 @@ import string
from i3pystatus.core.exceptions import *
from i3pystatus.core.imputil import ClassFinder
def chain(fun):
def chained(self, *args, **kwargs):
fun(self, *args, **kwargs)
return self
return chained
def lchop(string, prefix):
if string.startswith(prefix):
return string[len(prefix):]
return string
def popwhile(predicate, iterable):
while iterable:
item = iterable.pop()
@ -26,6 +29,7 @@ def popwhile(predicate, iterable):
else:
break
def partition(iterable, limit, key=lambda x: x):
def pop_partition():
sum = 0.0
@ -40,18 +44,22 @@ def partition(iterable, limit, key=lambda x: x):
return partitions
def round_dict(dic, places):
for key, value in dic.items():
dic[key] = round(value, places)
class ModuleList(collections.UserList):
def __init__(self, status_handler, module_base):
self.status_handler = status_handler
self.finder = ClassFinder(module_base)
super().__init__()
def append(self, module, *args, **kwargs):
module = self.finder.instanciate_class_from_module(module, *args, **kwargs)
module = self.finder.instanciate_class_from_module(
module, *args, **kwargs)
module.registered(self.status_handler)
super().append(module)
return module
@ -63,7 +71,9 @@ class ModuleList(collections.UserList):
return module
return None
class PrefixedKeyDict(collections.UserDict):
def __init__(self, prefix):
super().__init__()
@ -72,8 +82,11 @@ class PrefixedKeyDict(collections.UserDict):
def __setitem__(self, key, value):
super().__setitem__(self.prefix + key, value)
class KeyConstraintDict(collections.UserDict):
class MissingKeys(Exception):
def __init__(self, keys):
self.keys = keys
@ -104,11 +117,13 @@ class KeyConstraintDict(collections.UserDict):
def missing(self):
return self.required_keys - (self.seen_keys & self.required_keys)
def convert_position(pos, json):
if pos < 0:
pos = len(json) + (pos+1)
pos = len(json) + (pos + 1)
return pos
def flatten(l):
l = list(l)
i = 0
@ -123,6 +138,7 @@ def flatten(l):
i += 1
return l
def formatp(string, **kwargs):
"""
Function for advanced format strings with partial formatting
@ -149,17 +165,25 @@ def formatp(string, **kwargs):
"""
class Token:
string = ""
def __repr__(self):
return "<%s> " % self.__class__.__name__
class OpeningBracket(Token):
def __repr__(self):
return "<Group>"
class ClosingBracket(Token):
def __repr__(self):
return "</Group>"
class String(Token):
def __init__(self, str):
self.string = str
def __repr__(self):
return super().__repr__() + repr(self.string)
@ -187,9 +211,11 @@ def formatp(string, **kwargs):
if prev != "\\" and char in TOKENS:
token = TOKENS[char]()
token.index = next
if char == "]": level -= 1
if char == "]":
level -= 1
token.level = level
if char == "[": level += 1
if char == "[":
level += 1
stack.append(token)
else:
if stack and isinstance(stack[-1], String):
@ -214,7 +240,7 @@ def formatp(string, **kwargs):
while items[0].level > level:
nested.append(items.pop(0))
if nested:
subtree.append(build_tree(nested, level+1))
subtree.append(build_tree(nested, level + 1))
item = items.pop(0)
if item.string:
@ -242,7 +268,9 @@ def formatp(string, **kwargs):
formatp.field_re = re.compile(r"({(\w+)[^}]*})")
class TimeWrapper:
class TimeTemplate(string.Template):
delimiter = "%"
idpattern = r"[a-zA-Z]"

View File

@ -3,7 +3,9 @@ import os
from i3pystatus import IntervalModule
from .core.util import round_dict
class Disk(IntervalModule):
"""
Gets `{used}`, `{free}`, `{available}` and `{total}` amount of bytes on the given mounted filesystem.
@ -18,7 +20,7 @@ class Disk(IntervalModule):
required = ("path",)
color = "#FFFFFF"
format = "{free}/{avail}"
divisor = 1024**3
divisor = 1024 ** 3
def run(self):
cdict = {}

View File

@ -2,7 +2,9 @@ from os.path import join
from i3pystatus import IntervalModule
class File(IntervalModule):
"""
Rip information from text files

View File

@ -1,13 +1,16 @@
from i3pystatus import IntervalModule
class Load(IntervalModule):
"""
Shows system load
"""
format = "{avg1} {avg5}"
settings = (
("format", "format string used for output. {avg1}, {avg5} and {avg15} are the load average of the last one, five and fifteen minutes, respectively. {tasks} is the number of tasks (i.e. 1/285, which indiciates that one out of 285 total tasks is runnable)."),
("format",
"format string used for output. {avg1}, {avg5} and {avg15} are the load average of the last one, five and fifteen minutes, respectively. {tasks} is the number of tasks (i.e. 1/285, which indiciates that one out of 285 total tasks is runnable)."),
)
file = "/proc/loadavg"
@ -17,5 +20,5 @@ class Load(IntervalModule):
avg1, avg5, avg15, tasks, lastpid = f.read().split(" ", 5)
self.output = {
"full_text" : self.format.format(avg1=avg1, avg5=avg5, avg15=avg15, tasks=tasks),
"full_text": self.format.format(avg1=avg1, avg5=avg5, avg15=avg15, tasks=tasks),
}

View File

@ -1,7 +1,9 @@
from i3pystatus import SettingsBase, IntervalModule
class Backend(SettingsBase):
"""Handles the details of checking for mail"""
unread = 0
@ -9,7 +11,9 @@ class Backend(SettingsBase):
You'll probably implement that as a property"""
class Mail(IntervalModule):
"""
Generic mail checker
@ -29,7 +33,7 @@ class Mail(IntervalModule):
required = ("backends",)
color = "#ffffff"
color_unread ="#ff0000"
color_unread = "#ff0000"
format = "{unread} new email"
format_plural = "{unread} new emails"
hide_if_null = True
@ -56,7 +60,7 @@ class Mail(IntervalModule):
format = self.format_plural
self.output = {
"full_text" : format.format(unread=unread),
"urgent" : urgent,
"color" : color,
"full_text": format.format(unread=unread),
"urgent": urgent,
"color": color,
}

View File

@ -3,16 +3,18 @@
import sys
import json
from datetime import datetime,timedelta
from datetime import datetime, timedelta
import imaplib
from i3pystatus.mail import Backend
class IMAP(Backend):
"""
Checks for mail on a IMAP server
"""
settings = (
"host", "port",
"username", "password",
@ -52,7 +54,7 @@ class IMAP(Backend):
def unread(self):
conn = self.get_connection()
if conn:
return len(conn.search(None,"UnSeen")[1][0].split())
return len(conn.search(None, "UnSeen")[1][0].split())
else:
sys.stderr.write("no connection")

View File

@ -8,7 +8,9 @@ import json
from i3pystatus.mail import Backend
class Notmuch(Backend):
"""
This class uses the notmuch python bindings to check for the
number of messages in the notmuch database with the tags "inbox"

View File

@ -18,7 +18,9 @@ from gi.repository import GObject
from i3pystatus.mail import Backend
class Thunderbird(Backend):
"""
This class listens for dbus signals emitted by
the dbus-sender extension for thunderbird.

View File

@ -23,6 +23,7 @@ __Settings:__
{endstring}\n"""
class Module:
name = ""
doc = ""
@ -36,7 +37,8 @@ class Module:
if neighbours == 1:
self.name = module_name
else:
self.name = "{module}.{cls}".format(module=module_name, cls=self.cls.__name__)
self.name = "{module}.{cls}".format(
module=module_name, cls=self.cls.__name__)
self.doc = self.cls.__doc__ or module.__doc__ or ""
@ -61,6 +63,7 @@ class Module:
endstring=self.endstring
)
class Setting:
doc = ""
required = False
@ -95,6 +98,7 @@ class Setting:
return formatted
def get_modules(path):
modules = []
for finder, modname, ispkg in pkgutil.iter_modules(path):
@ -102,11 +106,13 @@ def get_modules(path):
modules.append(get_module(finder, modname))
return modules
def get_module(finder, modname):
fullname = "i3pystatus.{modname}".format(modname=modname)
return (modname, finder.find_loader(fullname)[0].load_module(fullname))
def get_all(module_path, heading, finder=None):
def get_all(module_path, heading, finder=None, ignore=None):
mods = []
if not finder:
finder = ClassFinder(i3pystatus.Module)
@ -117,17 +123,21 @@ def get_all(module_path, heading, finder=None):
for cls in classes:
if cls.__name__ not in found:
found.append(cls.__name__)
mods.append(Module(cls, neighbours=len(classes), module_name=name, module=module, heading=heading))
mods.append(
Module(cls, neighbours=len(classes), module_name=name, module=module, heading=heading))
return sorted(mods, key=lambda module: module.name)
def generate_doc_for_module(module_path, heading="###", finder=None):
return "".join(map(str, get_all(module_path, heading, finder)))
def generate_doc_for_module(module_path, heading="###", finder=None, ignore=None):
return "".join(map(str, get_all(module_path, heading, finder, ignore or [])))
with open("README.tpl.md", "r") as template:
tpl = template.read()
tpl = tpl.replace("!!module_doc!!", generate_doc_for_module(i3pystatus.__path__))
tpl = tpl.replace(
"!!module_doc!!", generate_doc_for_module(i3pystatus.__path__))
finder = ClassFinder(baseclass=i3pystatus.mail.Backend)
tpl = tpl.replace("!!i3pystatus.mail!!", generate_doc_for_module(i3pystatus.mail.__path__, "###", finder).replace("\n", "\n> "))
tpl = tpl.replace("!!i3pystatus.mail!!", generate_doc_for_module(
i3pystatus.mail.__path__, "###", finder, ["Backend"]).replace("\n", "\n> "))
with open("README.md", "w") as output:
output.write(tpl + "\n")

View File

@ -4,7 +4,9 @@ import sys
import json
import time
import threading
import urllib.request, urllib.parse, urllib.error
import urllib.request
import urllib.parse
import urllib.error
import re
import http.cookiejar
import xml.etree.ElementTree as ET
@ -12,14 +14,17 @@ import webbrowser
from i3pystatus import IntervalModule
class ModsDeChecker(IntervalModule):
"""
This class returns i3status parsable output of the number of
unread posts in any bookmark in the mods.de forums.
"""
settings = (
("format", """Use {unread} as the formatter for number of unread posts"""),
("format",
"""Use {unread} as the formatter for number of unread posts"""),
("offset", """subtract number of posts before output"""),
"color", "username", "password"
)
@ -37,7 +42,8 @@ class ModsDeChecker(IntervalModule):
def init(self):
self.cj = http.cookiejar.CookieJar()
self.opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(self.cj))
self.opener = urllib.request.build_opener(
urllib.request.HTTPCookieProcessor(self.cj))
def run(self):
unread = self.get_unread_count()
@ -46,9 +52,9 @@ class ModsDeChecker(IntervalModule):
self.output = None
else:
self.output = {
"full_text" : self.format.format(unread=unread),
"urgent" : "true",
"color" : self.color
"full_text": self.format.format(unread=unread),
"urgent": "true",
"color": self.color
}
def get_unread_count(self):
@ -61,7 +67,8 @@ class ModsDeChecker(IntervalModule):
return int(root.attrib["newposts"]) - self.offset
except Exception:
self.cj.clear()
self.opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(self.cj))
self.opener = urllib.request.build_opener(
urllib.request.HTTPCookieProcessor(self.cj))
self.logged_in = False
def test(self):
@ -92,7 +99,8 @@ class ModsDeChecker(IntervalModule):
for cookie in self.cj:
self.cj.clear
self.logged_in = True
self.opener.addheaders.append(("Cookie", "{}={}".format(cookie.name, cookie.value)))
self.opener.addheaders.append(
("Cookie", "{}={}".format(cookie.name, cookie.value)))
return True
return False

View File

@ -4,10 +4,13 @@ import socket
from i3pystatus import IntervalModule, formatp
from i3pystatus.core.util import TimeWrapper
def format_time(seconds):
return "{}:{:02}".format(*divmod(int(seconds), 60)) if seconds else ""
class MPD(IntervalModule):
"""
Displays various information from MPD (the music player daemon)
@ -71,7 +74,7 @@ class MPD(IntervalModule):
currentsong = self._mpd_command(s, "currentsong")
fdict = {
"pos": int(status.get("song", 0))+1,
"pos": int(status.get("song", 0)) + 1,
"len": int(status["playlistlength"]),
"status": self.status[status["state"]],
"volume": int(status["volume"]),
@ -92,13 +95,14 @@ class MPD(IntervalModule):
def on_leftclick(self):
with socket.create_connection(("localhost", self.port)) as s:
s.recv(8192)
self._mpd_command(s, "pause %i" % (0 if self._mpd_command(s, "status")["state"] == "pause" else 1))
self._mpd_command(s, "pause %i" %
(0 if self._mpd_command(s, "status")["state"] == "pause" else 1))
def on_rightclick(self):
with socket.create_connection(("localhost", self.port)) as s:
s.recv(8192)
vol = int(self._mpd_command(s, "status")["volume"])
if vol == 0:
self._mpd_command(s, "setvol %i" % self.vol)

View File

@ -7,6 +7,8 @@ import netifaces
from i3pystatus import IntervalModule
# Remainder: if we raise minimum Python version to 3.3, use ipaddress module
def count_bits(integer):
bits = 0
while(integer):
@ -14,30 +16,38 @@ def count_bits(integer):
bits += 1
return bits
def v6_to_int(v6):
return int(v6.replace(":", ""), 16)
def prefix6(mask):
return count_bits(v6_to_int(mask))
def cidr6(addr, mask):
return "{addr}/{bits}".format(addr=addr, bits=prefix6(mask))
def v4_to_int(v4):
sum = 0
mul = 1
for part in reversed(v4.split(".")):
sum += int(part) * mul
mul *= 2**8
mul *= 2 ** 8
return sum
def prefix4(mask):
return count_bits(v4_to_int(mask))
def cidr4(addr, mask):
return "{addr}/{bits}".format(addr=addr, bits=prefix4(mask))
class Network(IntervalModule):
"""
Display network information about a interface.
@ -72,7 +82,8 @@ class Network(IntervalModule):
def init(self):
if self.interface not in netifaces.interfaces():
raise RuntimeError("Unknown interface {iface}!".format(iface=self.interface))
raise RuntimeError(
"Unknown interface {iface}!".format(iface=self.interface))
self.baseinfo = {
"interface": self.interface,
@ -83,7 +94,8 @@ class Network(IntervalModule):
def collect(self):
info = netifaces.ifaddresses(self.interface)
up = netifaces.AF_INET in info or netifaces.AF_INET6 in info
fdict = dict(zip_longest(["v4", "v4mask", "v4cidr", "v6", "v6mask", "v6cidr"], [], fillvalue=""))
fdict = dict(
zip_longest(["v4", "v4mask", "v4cidr", "v6", "v6mask", "v6cidr"], [], fillvalue=""))
fdict.update(self.baseinfo)
if up:

View File

@ -7,15 +7,18 @@ from lxml.cssselect import CSSSelector
from i3pystatus import IntervalModule
class TrackerAPI:
def __init__(self, idcode):
pass
def status(self):
return {}
class DHL(TrackerAPI):
URL="http://nolp.dhl.de/nextt-online-public/set_identcodes.do?lang=en&idc={idcode}"
URL = "http://nolp.dhl.de/nextt-online-public/set_identcodes.do?lang=en&idc={idcode}"
def __init__(self, idcode):
self.idcode = idcode
@ -23,7 +26,8 @@ class DHL(TrackerAPI):
error_selector = CSSSelector("#set_identcodes .error")
self.error = lambda page: len(error_selector(page)) >= 1
self.progress_selector = CSSSelector(".greyprogressbar > span, .greenprogressbar > span")
self.progress_selector = CSSSelector(
".greyprogressbar > span, .greenprogressbar > span")
self.last_status_selector = CSSSelector(".events .eventList tr")
self.intrarow_status_selector = CSSSelector("td.status div")
@ -36,14 +40,16 @@ class DHL(TrackerAPI):
else:
ret["progress"] = self.progress_selector(page)[0].text.strip()
last_row = self.last_status_selector(page)[-1]
ret["status"] = self.intrarow_status_selector(last_row)[0].text.strip()
ret["status"] = self.intrarow_status_selector(
last_row)[0].text.strip()
return ret
def get_url(self):
return self.url
class UPS(TrackerAPI):
URL="http://wwwapps.ups.com/WebTracking/processRequest?HTMLVersion=5.0&Requester=NES&AgreeToTermsAndConditions=yes&loc=en_US&tracknum={idcode}"
URL = "http://wwwapps.ups.com/WebTracking/processRequest?HTMLVersion=5.0&Requester=NES&AgreeToTermsAndConditions=yes&loc=en_US&tracknum={idcode}"
def __init__(self, idcode):
self.idcode = idcode
@ -62,13 +68,15 @@ class UPS(TrackerAPI):
ret["progress"] = ret["status"] = "n/a"
else:
ret["status"] = self.status_selector(page)[0].text.strip()
progress_cls = int(int(self.progress_selector(page)[0].get("class").strip("staus")) / 5 * 100)
ret["progress"] = progress_cls
progress_cls = int(
int(self.progress_selector(page)[0].get("class").strip("staus")) / 5 * 100)
ret["progress"] = progress_cls
return ret
def get_url(self):
return self.url
class ParcelTracker(IntervalModule):
interval = 20

View File

@ -2,7 +2,9 @@ from .pulse import *
from i3pystatus import Module
class PulseAudio(Module):
"""
Shows volume of default PulseAudio sink (output).
@ -22,10 +24,11 @@ class PulseAudio(Module):
"""Creates context, when context is ready context_notify_cb is called"""
# Wrap callback methods in appropriate ctypefunc instances so
# that the Pulseaudio C API can call them
self._context_notify_cb = pa_context_notify_cb_t(self.context_notify_cb)
self._context_notify_cb = pa_context_notify_cb_t(
self.context_notify_cb)
self._sink_info_cb = pa_sink_info_cb_t(self.sink_info_cb)
self._update_cb = pa_context_subscribe_cb_t(self.update_cb)
self._success_cb = pa_context_success_cb_t (self.success_cb)
self._success_cb = pa_context_success_cb_t(self.success_cb)
self._server_info_cb = pa_server_info_cb_t(self.server_info_cb)
# Create the mainloop thread and set our context_notify_cb
@ -41,7 +44,8 @@ class PulseAudio(Module):
def request_update(self, context):
"""Requests a sink info update (sink_info_cb is called)"""
pa_operation_unref(pa_context_get_sink_info_by_name(context, self.sink, self._sink_info_cb, None))
pa_operation_unref(pa_context_get_sink_info_by_name(
context, self.sink, self._sink_info_cb, None))
def success_cb(self, context, success, userdata):
pass
@ -63,11 +67,13 @@ class PulseAudio(Module):
state = pa_context_get_state(context)
if state == PA_CONTEXT_READY:
pa_operation_unref(pa_context_get_server_info(context, self._server_info_cb, None))
pa_operation_unref(
pa_context_get_server_info(context, self._server_info_cb, None))
pa_context_set_subscribe_callback(context, self._update_cb, None)
pa_operation_unref(pa_context_subscribe(context, PA_SUBSCRIPTION_EVENT_CHANGE|PA_SUBSCRIPTION_MASK_SINK, self._success_cb, None))
pa_operation_unref(pa_context_subscribe(
context, PA_SUBSCRIPTION_EVENT_CHANGE | PA_SUBSCRIPTION_MASK_SINK, self._success_cb, None))
def update_cb(self, context, t, idx, userdata):
"""A sink property changed, calls request_update"""
@ -77,7 +83,7 @@ class PulseAudio(Module):
"""Updates self.output"""
if sink_info_p:
sink_info = sink_info_p.contents
volume_percent = int(100 * sink_info.volume.values[0]/0x10000)
volume_percent = int(100 * sink_info.volume.values[0] / 0x10000)
volume_db = pa_sw_volume_to_dB(sink_info.volume.values[0])
if volume_db == float('-Infinity'):
volume_db = "-∞"

View File

@ -20,48 +20,64 @@ PA_OPERATION_RUNNING = 0
PA_SUBSCRIPTION_EVENT_CHANGE = 16
PA_SUBSCRIPTION_MASK_SINK = 1
class pa_sink_port_info(Structure):
pass
class pa_format_info(Structure):
pass
class pa_context(Structure):
pass
pa_context._fields_ = [
]
pa_context_notify_cb_t = CFUNCTYPE(None, POINTER(pa_context), c_void_p)
pa_context_success_cb_t = CFUNCTYPE(None, POINTER(pa_context), c_int, c_void_p)
class pa_proplist(Structure):
pass
pa_context_event_cb_t = CFUNCTYPE(None, POINTER(pa_context), STRING, POINTER(pa_proplist), c_void_p)
pa_context_event_cb_t = CFUNCTYPE(
None, POINTER(pa_context), STRING, POINTER(pa_proplist), c_void_p)
class pa_mainloop_api(Structure):
pass
pa_context_new = _libraries['libpulse.so.0'].pa_context_new
pa_context_new.restype = POINTER(pa_context)
pa_context_new.argtypes = [POINTER(pa_mainloop_api), STRING]
pa_context_new_with_proplist = _libraries['libpulse.so.0'].pa_context_new_with_proplist
pa_context_new_with_proplist = _libraries[
'libpulse.so.0'].pa_context_new_with_proplist
pa_context_new_with_proplist.restype = POINTER(pa_context)
pa_context_new_with_proplist.argtypes = [POINTER(pa_mainloop_api), STRING, POINTER(pa_proplist)]
pa_context_new_with_proplist.argtypes = [
POINTER(pa_mainloop_api), STRING, POINTER(pa_proplist)]
pa_context_unref = _libraries['libpulse.so.0'].pa_context_unref
pa_context_unref.restype = None
pa_context_unref.argtypes = [POINTER(pa_context)]
pa_context_ref = _libraries['libpulse.so.0'].pa_context_ref
pa_context_ref.restype = POINTER(pa_context)
pa_context_ref.argtypes = [POINTER(pa_context)]
pa_context_set_state_callback = _libraries['libpulse.so.0'].pa_context_set_state_callback
pa_context_set_state_callback = _libraries[
'libpulse.so.0'].pa_context_set_state_callback
pa_context_set_state_callback.restype = None
pa_context_set_state_callback.argtypes = [POINTER(pa_context), pa_context_notify_cb_t, c_void_p]
pa_context_set_state_callback.argtypes = [
POINTER(pa_context), pa_context_notify_cb_t, c_void_p]
# values for enumeration 'pa_context_state'
pa_context_state = c_int # enum
pa_context_state = c_int # enum
pa_context_state_t = pa_context_state
pa_context_get_state = _libraries['libpulse.so.0'].pa_context_get_state
pa_context_get_state.restype = pa_context_state_t
pa_context_get_state.argtypes = [POINTER(pa_context)]
# values for enumeration 'pa_context_flags'
pa_context_flags = c_int # enum
pa_context_flags = c_int # enum
pa_context_flags_t = pa_context_flags
class pa_spawn_api(Structure):
_fields_ = [
('prefork', CFUNCTYPE(None)),
@ -71,13 +87,17 @@ class pa_spawn_api(Structure):
pa_context_connect = _libraries['libpulse.so.0'].pa_context_connect
pa_context_connect.restype = c_int
pa_context_connect.argtypes = [POINTER(pa_context), STRING, pa_context_flags_t, POINTER(pa_spawn_api)]
pa_context_connect.argtypes = [
POINTER(pa_context), STRING, pa_context_flags_t, POINTER(pa_spawn_api)]
pa_context_disconnect = _libraries['libpulse.so.0'].pa_context_disconnect
pa_context_disconnect.restype = None
pa_context_disconnect.argtypes = [POINTER(pa_context)]
class pa_operation(Structure):
pass
class pa_sample_spec(Structure):
_fields_ = [
('format', c_int),
@ -86,27 +106,31 @@ class pa_sample_spec(Structure):
]
# values for enumeration 'pa_subscription_mask'
pa_subscription_mask = c_int # enum
pa_subscription_mask = c_int # enum
pa_subscription_mask_t = pa_subscription_mask
# values for enumeration 'pa_subscription_event_type'
pa_subscription_event_type = c_int # enum
pa_subscription_event_type = c_int # enum
pa_subscription_event_type_t = pa_subscription_event_type
pa_context_subscribe_cb_t = CFUNCTYPE(None, POINTER(pa_context), pa_subscription_event_type_t, c_uint32, c_void_p)
pa_context_subscribe_cb_t = CFUNCTYPE(
None, POINTER(pa_context), pa_subscription_event_type_t, c_uint32, c_void_p)
pa_context_subscribe = _libraries['libpulse.so.0'].pa_context_subscribe
pa_context_subscribe.restype = POINTER(pa_operation)
pa_context_subscribe.argtypes = [POINTER(pa_context), pa_subscription_mask_t, pa_context_success_cb_t, c_void_p]
pa_context_set_subscribe_callback = _libraries['libpulse.so.0'].pa_context_set_subscribe_callback
pa_context_subscribe.argtypes = [
POINTER(pa_context), pa_subscription_mask_t, pa_context_success_cb_t, c_void_p]
pa_context_set_subscribe_callback = _libraries[
'libpulse.so.0'].pa_context_set_subscribe_callback
pa_context_set_subscribe_callback.restype = None
pa_context_set_subscribe_callback.argtypes = [POINTER(pa_context), pa_context_subscribe_cb_t, c_void_p]
pa_context_set_subscribe_callback.argtypes = [
POINTER(pa_context), pa_context_subscribe_cb_t, c_void_p]
# values for enumeration 'pa_sink_flags'
pa_sink_flags = c_int # enum
pa_sink_flags = c_int # enum
pa_sink_flags_t = pa_sink_flags
# values for enumeration 'pa_sink_state'
pa_sink_state = c_int # enum
pa_sink_state = c_int # enum
pa_sink_state_t = pa_sink_state
pa_free_cb_t = CFUNCTYPE(None, c_void_p)
@ -114,14 +138,18 @@ pa_strerror = _libraries['libpulse.so.0'].pa_strerror
pa_strerror.restype = STRING
pa_strerror.argtypes = [c_int]
class pa_sink_info(Structure):
pass
class pa_cvolume(Structure):
_fields_ = [
('channels', c_uint8),
('values', pa_volume_t * 32),
]
class pa_channel_map(Structure):
_fields_ = [
('channels', c_uint8),
@ -153,16 +181,24 @@ pa_sink_info._fields_ = [
('n_formats', c_uint8),
('formats', POINTER(POINTER(pa_format_info))),
]
pa_sink_info_cb_t = CFUNCTYPE(None, POINTER(pa_context), POINTER(pa_sink_info), c_int, c_void_p)
pa_context_get_sink_info_by_name = _libraries['libpulse.so.0'].pa_context_get_sink_info_by_name
pa_sink_info_cb_t = CFUNCTYPE(
None, POINTER(pa_context), POINTER(pa_sink_info), c_int, c_void_p)
pa_context_get_sink_info_by_name = _libraries[
'libpulse.so.0'].pa_context_get_sink_info_by_name
pa_context_get_sink_info_by_name.restype = POINTER(pa_operation)
pa_context_get_sink_info_by_name.argtypes = [POINTER(pa_context), STRING, pa_sink_info_cb_t, c_void_p]
pa_context_get_sink_info_by_index = _libraries['libpulse.so.0'].pa_context_get_sink_info_by_index
pa_context_get_sink_info_by_name.argtypes = [
POINTER(pa_context), STRING, pa_sink_info_cb_t, c_void_p]
pa_context_get_sink_info_by_index = _libraries[
'libpulse.so.0'].pa_context_get_sink_info_by_index
pa_context_get_sink_info_by_index.restype = POINTER(pa_operation)
pa_context_get_sink_info_by_index.argtypes = [POINTER(pa_context), c_uint32, pa_sink_info_cb_t, c_void_p]
pa_context_get_sink_info_list = _libraries['libpulse.so.0'].pa_context_get_sink_info_list
pa_context_get_sink_info_by_index.argtypes = [
POINTER(pa_context), c_uint32, pa_sink_info_cb_t, c_void_p]
pa_context_get_sink_info_list = _libraries[
'libpulse.so.0'].pa_context_get_sink_info_list
pa_context_get_sink_info_list.restype = POINTER(pa_operation)
pa_context_get_sink_info_list.argtypes = [POINTER(pa_context), pa_sink_info_cb_t, c_void_p]
pa_context_get_sink_info_list.argtypes = [
POINTER(pa_context), pa_sink_info_cb_t, c_void_p]
class pa_server_info(Structure):
pass
@ -177,10 +213,14 @@ pa_server_info._fields_ = [
('cookie', c_uint32),
('channel_map', pa_channel_map),
]
pa_server_info_cb_t = CFUNCTYPE(None, POINTER(pa_context), POINTER(pa_server_info), c_void_p)
pa_context_get_server_info = _libraries['libpulse.so.0'].pa_context_get_server_info
pa_server_info_cb_t = CFUNCTYPE(
None, POINTER(pa_context), POINTER(pa_server_info), c_void_p)
pa_context_get_server_info = _libraries[
'libpulse.so.0'].pa_context_get_server_info
pa_context_get_server_info.restype = POINTER(pa_operation)
pa_context_get_server_info.argtypes = [POINTER(pa_context), pa_server_info_cb_t, c_void_p]
pa_context_get_server_info.argtypes = [
POINTER(pa_context), pa_server_info_cb_t, c_void_p]
class pa_threaded_mainloop(Structure):
pass
@ -189,37 +229,48 @@ pa_threaded_mainloop._fields_ = [
pa_threaded_mainloop_new = _libraries['libpulse.so.0'].pa_threaded_mainloop_new
pa_threaded_mainloop_new.restype = POINTER(pa_threaded_mainloop)
pa_threaded_mainloop_new.argtypes = []
pa_threaded_mainloop_free = _libraries['libpulse.so.0'].pa_threaded_mainloop_free
pa_threaded_mainloop_free = _libraries[
'libpulse.so.0'].pa_threaded_mainloop_free
pa_threaded_mainloop_free.restype = None
pa_threaded_mainloop_free.argtypes = [POINTER(pa_threaded_mainloop)]
pa_threaded_mainloop_start = _libraries['libpulse.so.0'].pa_threaded_mainloop_start
pa_threaded_mainloop_start = _libraries[
'libpulse.so.0'].pa_threaded_mainloop_start
pa_threaded_mainloop_start.restype = c_int
pa_threaded_mainloop_start.argtypes = [POINTER(pa_threaded_mainloop)]
pa_threaded_mainloop_stop = _libraries['libpulse.so.0'].pa_threaded_mainloop_stop
pa_threaded_mainloop_stop = _libraries[
'libpulse.so.0'].pa_threaded_mainloop_stop
pa_threaded_mainloop_stop.restype = None
pa_threaded_mainloop_stop.argtypes = [POINTER(pa_threaded_mainloop)]
pa_threaded_mainloop_lock = _libraries['libpulse.so.0'].pa_threaded_mainloop_lock
pa_threaded_mainloop_lock = _libraries[
'libpulse.so.0'].pa_threaded_mainloop_lock
pa_threaded_mainloop_lock.restype = None
pa_threaded_mainloop_lock.argtypes = [POINTER(pa_threaded_mainloop)]
pa_threaded_mainloop_unlock = _libraries['libpulse.so.0'].pa_threaded_mainloop_unlock
pa_threaded_mainloop_unlock = _libraries[
'libpulse.so.0'].pa_threaded_mainloop_unlock
pa_threaded_mainloop_unlock.restype = None
pa_threaded_mainloop_unlock.argtypes = [POINTER(pa_threaded_mainloop)]
pa_threaded_mainloop_wait = _libraries['libpulse.so.0'].pa_threaded_mainloop_wait
pa_threaded_mainloop_wait = _libraries[
'libpulse.so.0'].pa_threaded_mainloop_wait
pa_threaded_mainloop_wait.restype = None
pa_threaded_mainloop_wait.argtypes = [POINTER(pa_threaded_mainloop)]
pa_threaded_mainloop_signal = _libraries['libpulse.so.0'].pa_threaded_mainloop_signal
pa_threaded_mainloop_signal = _libraries[
'libpulse.so.0'].pa_threaded_mainloop_signal
pa_threaded_mainloop_signal.restype = None
pa_threaded_mainloop_signal.argtypes = [POINTER(pa_threaded_mainloop), c_int]
pa_threaded_mainloop_accept = _libraries['libpulse.so.0'].pa_threaded_mainloop_accept
pa_threaded_mainloop_accept = _libraries[
'libpulse.so.0'].pa_threaded_mainloop_accept
pa_threaded_mainloop_accept.restype = None
pa_threaded_mainloop_accept.argtypes = [POINTER(pa_threaded_mainloop)]
pa_threaded_mainloop_get_retval = _libraries['libpulse.so.0'].pa_threaded_mainloop_get_retval
pa_threaded_mainloop_get_retval = _libraries[
'libpulse.so.0'].pa_threaded_mainloop_get_retval
pa_threaded_mainloop_get_retval.restype = c_int
pa_threaded_mainloop_get_retval.argtypes = [POINTER(pa_threaded_mainloop)]
pa_threaded_mainloop_get_api = _libraries['libpulse.so.0'].pa_threaded_mainloop_get_api
pa_threaded_mainloop_get_api = _libraries[
'libpulse.so.0'].pa_threaded_mainloop_get_api
pa_threaded_mainloop_get_api.restype = POINTER(pa_mainloop_api)
pa_threaded_mainloop_get_api.argtypes = [POINTER(pa_threaded_mainloop)]
pa_threaded_mainloop_in_thread = _libraries['libpulse.so.0'].pa_threaded_mainloop_in_thread
pa_threaded_mainloop_in_thread = _libraries[
'libpulse.so.0'].pa_threaded_mainloop_in_thread
pa_threaded_mainloop_in_thread.restype = c_int
pa_threaded_mainloop_in_thread.argtypes = [POINTER(pa_threaded_mainloop)]

View File

@ -1,12 +1,16 @@
import urllib.request, urllib.parse, urllib.error
import urllib.request
import urllib.parse
import urllib.error
import http.cookiejar
import webbrowser
import json
from i3pystatus import IntervalModule
class pyLoad(IntervalModule):
"""
Shows pyLoad status
@ -40,12 +44,13 @@ class pyLoad(IntervalModule):
def _rpc_call(self, method, data=None):
if not data:
data = {}
urlencoded = urllib.parse.urlencode(data).encode("ascii")
urlencoded = urllib.parse.urlencode(data).encode("ascii")
return json.loads(self.opener.open("{address}/api/{method}/".format(address=self.address, method=method), urlencoded).read().decode("utf-8"))
def init(self):
self.cj = http.cookiejar.CookieJar()
self.opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(self.cj))
self.opener = urllib.request.build_opener(
urllib.request.HTTPCookieProcessor(self.cj))
def login(self):
return self._rpc_call("login", {
@ -59,7 +64,8 @@ class pyLoad(IntervalModule):
downloads_status = self._rpc_call("statusDownloads")
if downloads_status:
progress = sum(dl["percent"] for dl in downloads_status) / len(downloads_status) * 100
progress = sum(dl["percent"]
for dl in downloads_status) / len(downloads_status) * 100
else:
progress = 100.0

View File

@ -2,7 +2,9 @@ import re
from i3pystatus import IntervalModule
class Regex(IntervalModule):
"""
Simple regex file watcher
@ -26,5 +28,5 @@ class Regex(IntervalModule):
with open(self.file, "r") as f:
match = self.re.search(f.read())
self.output = {
"full_text" : self.format.format(*match.groups()),
}
"full_text": self.format.format(*match.groups()),
}

View File

@ -3,7 +3,9 @@ import os.path
from i3pystatus import IntervalModule
class RunWatch(IntervalModule):
"""
Expands the given path using glob to a pidfile and checks
if the process ID found inside is valid

View File

@ -3,7 +3,9 @@ import glob
from i3pystatus import IntervalModule
class Temperature(IntervalModule):
"""
Shows CPU temperature of Intel processors
@ -11,7 +13,8 @@ class Temperature(IntervalModule):
"""
settings = (
("format", "format string used for output. {temp} is the temperature in degrees celsius, {critical} and {high} are the trip point temps."),
("format",
"format string used for output. {temp} is the temperature in degrees celsius, {critical} and {high} are the trip point temps."),
"color", "color_critical", "high_factor"
)
format = "{temp} °C"
@ -22,9 +25,11 @@ class Temperature(IntervalModule):
def init(self):
self.base_path = "/sys/devices/platform/coretemp.0"
input = glob.glob("{base_path}/temp*_input".format(base_path=self.base_path))[0]
input = glob.glob(
"{base_path}/temp*_input".format(base_path=self.base_path))[0]
self.input = re.search("temp([0-9]+)_input", input).group(1)
self.base_path = "{base_path}/temp{input}_".format(base_path=self.base_path, input=self.input)
self.base_path = "{base_path}/temp{input}_".format(
base_path=self.base_path, input=self.input)
with open("{base_path}crit".format(base_path=self.base_path), "r") as f:
self.critical = float(f.read().strip()) / 1000
@ -44,7 +49,7 @@ class Temperature(IntervalModule):
color = self.color_high
self.output = {
"full_text" : self.format.format(temp=temp, critical=self.critical, high=self.high),
"full_text": self.format.format(temp=temp, critical=self.critical, high=self.high),
"urgent": urgent,
"color": color,
}

View File

@ -3,7 +3,9 @@ import basiciw
from i3pystatus.network import Network
class Wireless(Network):
"""
Display network information about a interface.

View File

@ -8,20 +8,20 @@ setup(name="i3pystatus",
url="http://github.com/enkore/i3pystatus",
license="MIT",
classifiers=[
"Development Status :: 4 - Beta",
"Environment :: X11 Applications",
"License :: OSI Approved :: MIT License",
"Operating System :: POSIX :: Linux",
"Programming Language :: Python :: 3",
"Topic :: Desktop Environment :: Window Managers",
"Development Status :: 4 - Beta",
"Environment :: X11 Applications",
"License :: OSI Approved :: MIT License",
"Operating System :: POSIX :: Linux",
"Programming Language :: Python :: 3",
"Topic :: Desktop Environment :: Window Managers",
],
packages=[
"i3pystatus",
"i3pystatus.core",
"i3pystatus.mail",
"i3pystatus.pulseaudio",
"i3pystatus",
"i3pystatus.core",
"i3pystatus.mail",
"i3pystatus.pulseaudio",
],
entry_points={
"console_scripts": ["i3pystatus = i3pystatus:main"],
"console_scripts": ["i3pystatus = i3pystatus:main"],
},
)
)

View File

@ -4,6 +4,7 @@ import unittest
from i3pystatus import battery
def factory(path, format, expected):
def test():
bc = battery.BatteryChecker(path=path, format=format)
@ -13,6 +14,7 @@ def factory(path, format, expected):
test.description = path + ":" + format
return test
def basic_test_generator():
cases = [
("test_battery_basic1", "FULL", "0.000", ""),

View File

@ -9,15 +9,18 @@ import types
from i3pystatus.core import util
def get_random_string(length=6, chars=string.printable):
return ''.join(random.choice(chars) for x in range(length))
def lchop(prefix, string):
chopped = util.lchop(string, prefix)
if string.startswith(prefix):
assert len(chopped) == len(string) - len(prefix)
assert not (prefix and chopped.startswith(prefix))
def lchop_test_generator():
cases = [
('\x0b,S0I', "=t5Bk+\x0b'_;duq=9"),
@ -49,17 +52,19 @@ def lchop_test_generator():
("<|(h|P9Wz9d9'u,M", '7d-A\nY{}5"\' !*gHHh`x0!B2Ox?yeKb\x0b'),
('bV?:f\x0b#HDhuwSvys3', ";\r,L![\x0cU7@ne@'?[*&V<dap]+Tq[n1!|PE"),
('T\r~bGV^@JC?P@Pa66.', "9,q>VI,[}pHM\nB65@LfE16VJPw=r'zU\x0bzWj@"),
('^|j7N!mV0o(?*1>p?dy', '\\ZdA&:\t\x0b:8\t|7.Kl,oHw-\x0cS\nwZlND~uC@le`Sm'),
('^|j7N!mV0o(?*1>p?dy',
'\\ZdA&:\t\x0b:8\t|7.Kl,oHw-\x0cS\nwZlND~uC@le`Sm'),
]
for prefix, string in cases:
yield lchop, prefix, prefix+string
yield lchop, prefix, prefix + string
yield lchop, prefix, string
yield lchop, string, string
yield lchop, string, prefix
yield lchop, "", string
yield lchop, prefix, ""
yield lchop, prefix+prefix, prefix+prefix+prefix+string
yield lchop, prefix + prefix, prefix + prefix + prefix + string
def partition(iterable, limit, assrt):
partitions = util.partition(iterable, limit)
@ -70,8 +75,8 @@ def partition(iterable, limit, assrt):
def partition_test_generator():
cases = [
([1, 2, 3, 4], 3, [[1,2], [3], [4]]),
([2, 1, 3, 4], 3, [[1,2], [3], [4]]),
([1, 2, 3, 4], 3, [[1, 2], [3], [4]]),
([2, 1, 3, 4], 3, [[1, 2], [3], [4]]),
([0.33, 0.45, 0.89], 1, [[0.33, 0.45, 0.89]]),
([], 10, []),
]
@ -79,9 +84,11 @@ def partition_test_generator():
for iterable, limit, assrt in cases:
yield partition, iterable, limit, assrt
def popwhile(iterable, predicate, assrt):
assert list(util.popwhile(predicate, iterable)) == assrt
def popwhile_test_generator():
cases = [
([1, 2, 3, 4], lambda x: x < 2, []),
@ -94,25 +101,30 @@ def popwhile_test_generator():
for iterable, predicate, assrt in cases:
yield popwhile, iterable, predicate, assrt
def keyconstraintdict_missing(valid, required, feedkeys, assrt_missing):
kcd = util.KeyConstraintDict(valid_keys=valid, required_keys=required)
kcd.update(dict.fromkeys(feedkeys))
assert kcd.missing() == set(assrt_missing)
def keyconstraintdict_missing_test_generator():
cases = [
# ( valid, required, feed, missing )
(("foo", "bar", "baz"), ("foo",), ("bar",), ("foo",)),
(("foo", "bar", "baz"), ("foo",), tuple(), ("foo",)),
(("foo", "bar", "baz"), ("bar", "baz"), ("bar", "baz"), tuple()),
(("foo", "bar", "baz"), ("bar", "baz"), ("bar", "foo", "baz"), tuple()),
(("foo", "bar", "baz"), ("bar", "baz"),
("bar", "foo", "baz"), tuple()),
]
for valid, required, feed, missing in cases:
yield keyconstraintdict_missing, valid, required, feed, missing
class ModuleListTests(unittest.TestCase):
class ModuleBase:
pass
@ -128,7 +140,8 @@ class ModuleListTests(unittest.TestCase):
module.registered.assert_called_with(self.status_handler)
def _create_module_class(self, name, bases=None):
if not bases: bases = (self.ModuleBase,)
if not bases:
bases = (self.ModuleBase,)
return type(name, bases, {
"registered": MagicMock(),
"__init__": MagicMock(return_value=None),
@ -173,7 +186,9 @@ class ModuleListTests(unittest.TestCase):
cls.__init__.assert_called_with()
cls.registered.assert_called_with(self.status_handler)
class PrefixedKeyDictTests(unittest.TestCase):
def test_no_prefix(self):
dict = util.PrefixedKeyDict("")
dict["foo"] = None
@ -200,46 +215,55 @@ class PrefixedKeyDictTests(unittest.TestCase):
assert realdict["pfx_foo"] == None
assert realdict["pfx_bar"] == 42
class KeyConstraintDictAdvancedTests(unittest.TestCase):
def test_invalid_1(self):
kcd = util.KeyConstraintDict(valid_keys=tuple(), required_keys=tuple())
with self.assertRaises(KeyError):
kcd["invalid"] = True
def test_invalid_2(self):
kcd = util.KeyConstraintDict(valid_keys=("foo", "bar"), required_keys=tuple())
kcd = util.KeyConstraintDict(
valid_keys=("foo", "bar"), required_keys=tuple())
with self.assertRaises(KeyError):
kcd["invalid"] = True
def test_incomplete_iteration(self):
kcd = util.KeyConstraintDict(valid_keys=("foo", "bar"), required_keys=("foo",))
kcd = util.KeyConstraintDict(
valid_keys=("foo", "bar"), required_keys=("foo",))
with self.assertRaises(util.KeyConstraintDict.MissingKeys):
for x in kcd:
pass
def test_completeness(self):
kcd = util.KeyConstraintDict(valid_keys=("foo", "bar"), required_keys=("foo",))
kcd = util.KeyConstraintDict(
valid_keys=("foo", "bar"), required_keys=("foo",))
kcd["foo"] = False
for x in kcd:
pass
assert kcd.missing() == set()
def test_remove_required(self):
kcd = util.KeyConstraintDict(valid_keys=("foo", "bar"), required_keys=("foo",))
kcd = util.KeyConstraintDict(
valid_keys=("foo", "bar"), required_keys=("foo",))
kcd["foo"] = None
assert kcd.missing() == set()
del kcd["foo"]
assert kcd.missing() == set(["foo"])
def test_set_twice(self):
kcd = util.KeyConstraintDict(valid_keys=("foo", "bar"), required_keys=("foo",))
kcd = util.KeyConstraintDict(
valid_keys=("foo", "bar"), required_keys=("foo",))
kcd["foo"] = 1
kcd["foo"] = 2
assert kcd.missing() == set()
del kcd["foo"]
assert kcd.missing() == set(["foo"])
class FormatPTests(unittest.TestCase):
def test_escaping(self):
assert util.formatp("[razamba \[ mabe \]]") == "razamba [ mabe ]"
@ -251,22 +275,28 @@ class FormatPTests(unittest.TestCase):
def test_nesting(self):
s = "[[{artist} - ]{album} - ]{title}"
assert util.formatp(s, title="Black rose") == "Black rose"
assert util.formatp(s, artist="In Flames", title="Gyroscope") == "Gyroscope"
assert util.formatp(s, artist="SOAD", album="Toxicity", title="Science") == "SOAD - Toxicity - Science"
assert util.formatp(s, album="Toxicity", title="Science") == "Toxicity - Science"
assert util.formatp(
s, artist="In Flames", title="Gyroscope") == "Gyroscope"
assert util.formatp(
s, artist="SOAD", album="Toxicity", title="Science") == "SOAD - Toxicity - Science"
assert util.formatp(
s, album="Toxicity", title="Science") == "Toxicity - Science"
def test_bare(self):
assert util.formatp("{foo} blar", foo="bar") == "bar blar"
def test_presuffix(self):
assert util.formatp("ALINA[{title} schnacke]KOMMAHER", title="") == "ALINAKOMMAHER"
assert util.formatp(
"ALINA[{title} schnacke]KOMMAHER", title="") == "ALINAKOMMAHER"
assert util.formatp("grml[{title}]") == "grml"
assert util.formatp("[{t}]grml") == "grml"
def test_side_by_side(self):
s = "{status} [{artist} / [{album} / ]]{title}[ {song_elapsed}/{song_length}]"
assert util.formatp(s, status="", title="Only For The Weak", song_elapsed="1:41", song_length="4:55") == "▷ Only For The Weak 1:41/4:55"
assert util.formatp(s, status="", album="Foo", title="Die, Die, Crucified", song_elapsed="2:52") == " Die, Die, Crucified"
assert util.formatp(s, status="", title="Only For The Weak",
song_elapsed="1:41", song_length="4:55") == "▷ Only For The Weak 1:41/4:55"
assert util.formatp(
s, status="", album="Foo", title="Die, Die, Crucified", song_elapsed="2:52") == " Die, Die, Crucified"
assert util.formatp("[[{a}][{b}]]", b=1) == "1"
def test_complex_field(self):