Some internal code butchering again.

This commit is contained in:
enkore 2013-03-05 17:26:10 +01:00
parent 3cac448f6e
commit 03d96ad0ea
7 changed files with 191 additions and 189 deletions

View File

@ -0,0 +1,44 @@
import inspect
import types
class ClassFinder:
"""Support class to find classes of specific bases in a module"""
def __init__(self, baseclass):
self.baseclass = baseclass
def predicate_factory(self, module):
def predicate(obj):
return (
inspect.isclass(obj) and
issubclass(obj, self.baseclass) and
obj.__module__ == module.__name__
)
return predicate
def search_module(self, module):
return list(zip(*inspect.getmembers(module, self.predicate_factory(module))))[1]
def get_class(self, module):
classes = self.search_module(module)
if len(classes) > 1:
# If there are multiple Module clases bundled in one module,
# well, we can't decide for the user.
raise ConfigAmbigiousClassesError(module.__name__, classes)
elif not classes:
raise ConfigInvalidModuleError(module.__name__)
return classes[0]
def get_module(self, module):
return getattr(__import__("i3pystatus.{module}".format(module=module), globals(), {}, []), module)
def instanciate_class_from_module(self, module, *args, **kwargs):
if isinstance(module, types.ModuleType):
return self.get_class(module)(*args, **kwargs)
elif isinstance(module, str):
return self.instanciate_class_from_module(self.get_module(module), *args, **kwargs)
elif args or kwargs:
raise ValueError("Additional arguments are invalid if 'module' is already an object")
return module

View File

@ -1,7 +1,7 @@
from threading import Thread
import time
from .util import SettingsBase
from .settings import SettingsBase
from .threads import AutomagicManager
__all__ = [

View File

@ -0,0 +1,61 @@
from .util import KeyConstraintDict
from .exceptions import ConfigKeyError, ConfigMissingError
class SettingsBase:
"""
Support class for providing a nice and flexible settings interface
Classes inherit from this class and define what settings they provide and
which are required.
The constructor is either passed a dictionary containing these settings, or
keyword arguments specifying the same.
Settings are stored as attributes of self
"""
settings = tuple()
"""settings should be tuple containing two types of elements:
* bare strings, which must be valid identifiers.
* two-tuples, the first element being a identifier (as above) and the second
a docstring for the particular setting"""
required = tuple()
"""required can list settings which are required"""
def __init__(self, *args, **kwargs):
def flatten_setting(setting):
return setting[0] if isinstance(setting, tuple) else setting
def flatten_settings(settings):
return tuple(flatten_setting(setting) for setting in settings)
def get_argument_dict(args, kwargs):
if len(args) == 1 and not kwargs:
# User can also pass in a dict for their settings
# Note: you could do that anyway, with the ** syntax
return args[0]
return kwargs
self.settings = flatten_settings(self.settings)
sm = KeyConstraintDict(self.settings, self.required)
settings_source = get_argument_dict(args, kwargs)
try:
sm.update(settings_source)
except KeyError as exc:
raise ConfigKeyError(type(self).__name__, key=exc.args[0]) from exc
try:
self.__dict__.update(sm)
except KeyConstraintDict.MissingKeys as exc:
raise ConfigMissingError(type(self).__name__, missing=exc.keys) from exc
self.__name__ = "{}.{}".format(self.__module__, self.__class__.__name__)
self.init()
def init(self):
"""Convenience method which is called after all settings are set
In case you don't want to type that super()…blabla :-)"""

View File

@ -1,9 +1,9 @@
import sys
import threading
import time
import traceback
import collections
from .util import partition
try:
from setproctitle import setproctitle
@ -11,74 +11,76 @@ except ImportError:
def setproctitle(title):
pass
if hasattr(time, "perf_counter"):
timer = time.perf_counter
else:
timer = time.clock
timer = time.perf_counter if hasattr(time, "perf_counter") else time.clock
class ExceptionWrapper:
class Wrapper:
def __init__(self, workload):
self.workload = workload
def __call__(self):
try:
self.workload()
except Exception as exc:
traceback.print_exception(*sys.exc_info(), file=sys.stderr)
def __repr__(self):
return repr(self.workload)
class WorkloadWrapper:
def __init__(self, workload):
self.workload = workload
self.time = 0.0
class ExceptionWrapper(Wrapper):
def __call__(self):
try:
self.workload()
except Exception as exc:
sys.stderr.write("Exception in {thread}".format(thread=threading.current_thread().name))
traceback.print_exception(*sys.exc_info(), file=sys.stderr)
sys.stderr.flush()
class WorkloadWrapper(Wrapper):
time = 0.0
def __call__(self):
tp1 = timer()
self.workload()
self.time = timer() - tp1
def __repr__(self):
return repr(self.workload)
class Thread(threading.Thread):
def __init__(self, target_interval, workloads=None, start_barrier=1):
super().__init__()
self.workloads = workloads if workloads is not None else []
self.workloads = workloads or []
self.target_interval = target_interval
self.start_barrier = start_barrier
self.daemon = True
def __iter__(self):
return iter(self.workloads)
def __len__(self):
return len(self.workloads)
def pop(self):
return self.workloads.pop()
def append(self, workload):
self.workloads.append(workload)
def run(self):
while len(self) <= self.start_barrier:
time.sleep(0.3)
setproctitle("i3pystatus: {name}/{workloads}".format(name=self.name, workloads=list(map(repr, self.workloads))))
while self:
for workload in self:
workload()
self.workloads.sort(key=lambda workload: workload.time)
filltime = self.target_interval - self.time
if filltime > 0:
time.sleep(filltime)
@property
def time(self):
return sum(map(lambda workload: workload.time, self))
def wait_for_start_barrier(self):
while len(self) <= self.start_barrier:
time.sleep(0.4)
def setproctitle(self):
setproctitle("i3pystatus {name}: {workloads}".format(name=self.name, workloads=list(map(repr, self.workloads))))
def execute_workloads(self):
for workload in self:
workload()
self.workloads.sort(key=lambda workload: workload.time)
def run(self):
self.setproctitle()
while self:
self.execute_workloads()
filltime = self.target_interval - self.time
if filltime > 0:
time.sleep(filltime)
class AutomagicManager:
def __init__(self, target_interval):
self.target_interval = target_interval
@ -91,33 +93,27 @@ class AutomagicManager:
def __call__(self):
separate = []
for thread in self.threads:
separate.extend(self.optimize(thread, thread.time))
separate.extend(self.branch(thread, thread.time))
self.create_threads(self.partition(separate))
def __repr__(self):
return "Manager"
def wrap(self, workload):
return WorkloadWrapper(ExceptionWrapper(workload))
def optimize(self, thread, vtime):
# def calculate_sparse_times():
# return ((self.lower_bound - thread.time, thread) for thread in self.threads)
def branch(self, thread, vtime):
if len(thread) > 1 and vtime > self.upper_bound:
remove = thread.pop()
return [remove] + self.optimize(thread, vtime - remove.time)
return [remove] + self.branch(thread, vtime - remove.time)
return []
def partition(self, workloads):
timesum = 0.0
new_threads = []
current_thread = []
for workload in workloads:
current_thread.append(workload)
timesum += workload.time
if timesum > self.lower_bound:
new_threads.append(current_thread)
current_thread = []
timesum = 0
return new_threads
return partition(workloads, self.lower_bound, lambda workload: workload.time)
def create_threads(self, threads):
for workloads in threads:

View File

@ -1,17 +1,31 @@
import inspect
import types
import collections
from .exceptions import *
from .imputil import ClassFinder
__all__ = [
"SettingsBase",
"ClassFinder",
"ModuleList",
"KeyConstraintDict", "PrefixedKeyDict",
"ModuleList", "KeyConstraintDict", "PrefixedKeyDict",
]
def popwhile(predicate, iterable):
while iterable:
item = iterable.pop()
if predicate(item):
yield item
else:
break
def partition(iterable, limit, key=None):
key = key or (lambda x: x)
partitions = []
while iterable:
sum = 0.0
partitions.append(list(
popwhile(lambda x: sum + key(x) or sum < limit, iterable)
))
return partitions
def round_dict(dic, places):
for key, value in dic.items():
dic[key] = round(value, places)
@ -63,104 +77,3 @@ class KeyConstraintDict(collections.UserDict):
def missing(self):
return self.required_keys - (self.seen_keys & self.required_keys)
class SettingsBase:
"""
Support class for providing a nice and flexible settings interface
Classes inherit from this class and define what settings they provide and
which are required.
The constructor is either passed a dictionary containing these settings, or
keyword arguments specifying the same.
Settings are stored as attributes of self
"""
settings = tuple()
"""settings should be tuple containing two types of elements:
* bare strings, which must be valid identifiers.
* two-tuples, the first element being a identifier (as above) and the second
a docstring for the particular setting"""
required = tuple()
"""required can list settings which are required"""
def __init__(self, *args, **kwargs):
def flatten_setting(setting):
return setting[0] if isinstance(setting, tuple) else setting
def flatten_settings(settings):
return tuple(flatten_setting(setting) for setting in settings)
def get_argument_dict(args, kwargs):
if len(args) == 1 and not kwargs:
# User can also pass in a dict for their settings
# Note: you could do that anyway, with the ** syntax
return args[0]
return kwargs
self.settings = flatten_settings(self.settings)
sm = KeyConstraintDict(self.settings, self.required)
settings_source = get_argument_dict(args, kwargs)
try:
sm.update(settings_source)
except KeyError as exc:
raise ConfigKeyError(type(self).__name__, key=exc.args[0]) from exc
try:
self.__dict__.update(sm)
except KeyConstraintDict.MissingKeys as exc:
raise ConfigMissingError(type(self).__name__, missing=exc.keys) from exc
self.__name__ = "{}.{}".format(self.__module__, self.__class__.__name__)
self.init()
def init(self):
"""Convenience method which is called after all settings are set
In case you don't want to type that super()…blabla :-)"""
class ClassFinder:
"""Support class to find classes of specific bases in a module"""
def __init__(self, baseclass):
self.baseclass = baseclass
def predicate_factory(self, module):
def predicate(obj):
return (
inspect.isclass(obj) and
issubclass(obj, self.baseclass) and
obj.__module__ == module.__name__
)
return predicate
def search_module(self, module):
return list(zip(*inspect.getmembers(module, self.predicate_factory(module))))[1]
def get_class(self, module):
classes = self.search_module(module)
if len(classes) > 1:
# If there are multiple Module clases bundled in one module,
# well, we can't decide for the user.
raise ConfigAmbigiousClassesError(module.__name__, classes)
elif not classes:
raise ConfigInvalidModuleError(module.__name__)
return classes[0]
def get_module(self, module):
return getattr(__import__("i3pystatus.{module}".format(module=module), globals(), {}, []), module)
def instanciate_class_from_module(self, module, *args, **kwargs):
if isinstance(module, types.ModuleType):
return self.get_class(module)(*args, **kwargs)
elif isinstance(module, str):
return self.instanciate_class_from_module(self.get_module(module), *args, **kwargs)
elif args or kwargs:
raise ValueError("Additional arguments are invalid if 'module' is already an object")
return module

View File

@ -9,7 +9,7 @@ import textwrap
import i3pystatus
import i3pystatus.mail
from .core.util import ClassFinder
from .core.imputil import ClassFinder
IGNORE = ("__main__", "mkdocs")
MODULE_FORMAT = """
@ -36,21 +36,16 @@ class Module:
else:
self.name = "{module}.{cls}".format(module=module_name, cls=self.cls.__name__)
if self.cls.__doc__ is not None:
self.doc = self.cls.__doc__
elif module.__doc__ is not None:
self.doc = module.__doc__
else:
self.doc = ""
self.doc = self.cls.__doc__ or module.__doc__ or ""
if hasattr(self.cls, "_endstring"):
self.endstring = self.cls._endstring
self.get_settings()
self.read_settings()
def get_settings(self):
def read_settings(self):
for setting in self.cls.settings:
self.settings.append(Setting(self, setting))
self.settings.append(Setting(self.cls, setting))
def format_settings(self):
return "\n".join(map(str, self.settings))
@ -65,22 +60,21 @@ class Module:
)
class Setting:
name = ""
doc = ""
required = False
default = sentinel = object()
def __init__(self, mod, setting):
def __init__(self, cls, setting):
if isinstance(setting, tuple):
self.name = setting[0]
self.doc = setting[1]
else:
self.name = setting
if setting in mod.cls.required:
if setting in cls.required:
self.required = True
elif hasattr(mod.cls, self.name):
self.default = getattr(mod.cls, self.name)
elif hasattr(cls, self.name):
self.default = getattr(cls, self.name)
def __str__(self):
attrs = []
@ -99,15 +93,11 @@ class Setting:
return formatted
def get_modules(path=None):
if path is None:
path = i3pystatus.get_path()
def get_modules(path):
modules = []
for finder, modname, ispkg in pkgutil.iter_modules(path):
if modname not in IGNORE:
modules.append(get_module(finder, modname))
return modules
def get_module(finder, modname):
@ -116,12 +106,11 @@ def get_module(finder, modname):
def get_all(module_path, heading, finder=None):
mods = []
if finder is None:
finder = i3pystatus.ClassFinder(i3pystatus.Module)
if not finder:
finder = ClassFinder(i3pystatus.Module)
for name, module in get_modules(module_path):
classes = finder.search_module(module)
for cls in classes:
mods.append(Module(cls, neighbours=len(classes), module_name=name, module=module, heading=heading))
@ -131,10 +120,9 @@ def generate_doc_for_module(module_path, heading="###", finder=None):
return "".join(map(str, get_all(module_path, heading, finder)))
with open("README.tpl.md", "r") as template:
tpl = template.read()
tpl = tpl.replace("!!module_doc!!", generate_doc_for_module(i3pystatus.__path__))
finder = i3pystatus.ClassFinder(baseclass=i3pystatus.mail.Backend)
finder = ClassFinder(baseclass=i3pystatus.mail.Backend)
tpl = tpl.replace("!!i3pystatus.mail!!", generate_doc_for_module(i3pystatus.mail.__path__, "###", finder).replace("\n", "\n> "))
print(tpl)
with open("README.md", "w") as output:
output.write(tpl + "\n")

View File

@ -1,4 +1,4 @@
#!/bin/sh
python -m i3pystatus.mkdocs > README.md
python -m i3pystatus.mkdocs