From 1b6498883b8d78f182125752843b1435273bc5ef Mon Sep 17 00:00:00 2001 From: enkore Date: Sun, 24 Feb 2013 18:43:00 +0100 Subject: [PATCH] Divided __init__ into some submodules This didn't change the public "API" used by modules. .core.io contains the IO classes, namely IOHandler, StandaloneIO and JSONIO .core.util contains SettingsBase and ClassFinder .core.exceptions contains all custom exceptions --- i3pystatus/__init__.py | 268 ++-------------------------------- i3pystatus/core/exceptions.py | 24 +++ i3pystatus/core/io.py | 104 +++++++++++++ i3pystatus/core/util.py | 134 +++++++++++++++++ i3pystatus/mkdocs.py | 2 +- 5 files changed, 274 insertions(+), 258 deletions(-) create mode 100644 i3pystatus/core/exceptions.py create mode 100644 i3pystatus/core/io.py create mode 100644 i3pystatus/core/util.py diff --git a/i3pystatus/__init__.py b/i3pystatus/__init__.py index 0345129..57c8480 100644 --- a/i3pystatus/__init__.py +++ b/i3pystatus/__init__.py @@ -1,133 +1,22 @@ #!/usr/bin/env python import sys -import json +import types from threading import Thread import time -from contextlib import contextmanager -import types -import inspect import functools import collections +from .core import io +from .core.util import * + __all__ = [ - "SettingsBase", "ClassFinder", "ModuleFinder", - "ConfigError", "ConfigKeyError", "ConfigMissingError", "ConfigAmbigiousClassesError", "ConfigInvalidModuleError", + "SettingsBase", + "ClassFinder", "ModuleFinder", "Module", "AsyncModule", "IntervalModule", "i3pystatus", "I3statusHandler", ] -class ConfigError(Exception): - """ABC for configuration exceptions""" - def __init__(self, module, *args, **kwargs): - message = "Module '{0}': {1}".format(module, self.format(*args, **kwargs)) - - super().__init__(message) - -class ConfigKeyError(ConfigError, KeyError): - def format(self, key): - return "invalid option '{0}'".format(key) - -class ConfigMissingError(ConfigError): - def format(self, missing): - return "missing required options: {0}".format(missing) - super().__init__(module) - -class ConfigAmbigiousClassesError(ConfigError): - def format(self, ambigious_classes): - return "ambigious module specification, found multiple classes: {0}".format(ambigious_classes) - -class ConfigInvalidModuleError(ConfigError): - def format(self): - return "no class found" - -class KeyConstraintDict(collections.UserDict): - class MissingKeys(Exception): - def __init__(self, keys): - self.keys = keys - - def __init__(self, valid_keys, required_keys): - super().__init__() - - self.valid_keys = valid_keys - self.required_keys = set(required_keys) - self.seen_keys = set() - - def __setitem__(self, key, value): - if key in self.valid_keys: - self.seen_keys.add(key) - self.data[key] = value - else: - raise KeyError(key) - - def missing(self): - return self.required_keys - (self.seen_keys & self.required_keys) - - def __iter__(self): - if self.missing(): - raise self.MissingKeys(self.missing()) - - return self.data.__iter__() - -class SettingsBase: - """ - Support class for providing a nice and flexible settings interface - - Classes inherit from this class and define what settings they provide and - which are required. - - The constructor is either passed a dictionary containing these settings, or - keyword arguments specifying the same. - - Settings are stored as attributes of self - """ - - settings = tuple() - """settings should be tuple containing two types of elements: - * bare strings, which must be valid identifiers. - * two-tuples, the first element being a identifier (as above) and the second - a docstring for the particular setting""" - - required = tuple() - """required can list settings which are required""" - - def __init__(self, *args, **kwargs): - def flatten_setting(setting): - return setting[0] if isinstance(setting, tuple) else setting - def flatten_settings(settings): - return tuple(flatten_setting(setting) for setting in settings) - - def get_argument_dict(args, kwargs): - if len(args) == 1 and not kwargs: - # User can also pass in a dict for their settings - # Note: you could do that anyway, with the ** syntax - return args[0] - return kwargs - - self.settings = flatten_settings(self.settings) - - sm = KeyConstraintDict(self.settings, self.required) - settings_source = get_argument_dict(args, kwargs) - - try: - sm.update(settings_source) - except KeyError as exc: - raise ConfigKeyError(type(self).__name__, key=exc.args[0]) from exc - - try: - self.__dict__.update(sm) - except KeyConstraintDict.MissingKeys as exc: - raise ConfigMissingError(type(self).__name__, missing=exc.keys) from exc - - self.__name__ = "{}.{}".format(self.__module__, self.__class__.__name__) - - self.init() - - def init(self): - """Convenience method which is called after all settings are set - - In case you don't want to type that super()…blabla :-)""" - class Module(SettingsBase): output = None position = 0 @@ -161,151 +50,14 @@ class IntervalModule(AsyncModule): self.run() time.sleep(self.interval) -class ClassFinder: - """Support class to find classes of specific bases in a module""" - - def __init__(self, baseclass, exclude=[]): - self.baseclass = baseclass - self.exclude = exclude - - def predicate_factory(self, module): - def predicate(obj): - return ( - inspect.isclass(obj) and - issubclass(obj, self.baseclass) and - obj not in self.exclude and - obj.__module__ == module.__name__ - ) - return predicate - - def search_module(self, module): - # Neat trick: [(x,y),(u,v)] becomes [(x,u),(y,v)] - return list(zip(*inspect.getmembers(module, self.predicate_factory(module))))[1] - - def get_class(self, module): - classes = self.search_module(module) - - if len(classes) > 1: - # If there are multiple Module clases bundled in one module, - # well, we can't decide for the user. - raise ConfigAmbigiousClassesError(module.__name__, classes) - elif not classes: - raise ConfigInvalidModuleError(module.__name__) - - return classes[0] - - def instanciate_class_from_module(self, module, *args, **kwargs): - return self.get_class(module)(*args, **kwargs) - -ModuleFinder = functools.partial(ClassFinder, baseclass=Module, exclude=[Module, IntervalModule, AsyncModule]) - -class IOHandler: - def __init__(self, inp=sys.stdin, out=sys.stdout): - self.inp = inp - self.out = out - - def write_line(self, message): - """Unbuffered printing to stdout.""" - - self.out.write(message + "\n") - self.out.flush() - - def read(self): - """Iterate over all input lines (Generator)""" - - while True: - try: - yield self.read_line() - except EOFError: - return - - def read_line(self): - """Interrupted respecting reader for stdin. - - Raises EOFError if the end of stream has been reached""" - - try: - line = self.inp.readline().strip() - except KeyboardInterrupt: - raise EOFError() - - # i3status sends EOF, or an empty line - if not line: - raise EOFError() - return line - -class StandaloneIO(IOHandler): - """ - I/O handler for standalone usage of i3pystatus (w/o i3status) - - Writing works as usual, but reading will always return a empty JSON array, - and the i3bar protocol header - """ - - n = -1 - proto = ( - '{"version":1}', - "[", - "[]", - ",[]", - ) - - def __init__(self, interval=1): - super().__init__() - self.interval = interval - - def read(self): - while True: - try: - time.sleep(self.interval) - except KeyboardInterrupt: - return - yield self.read_line() - - def read_line(self): - self.n += 1 - - return self.proto[min(self.n, len(self.proto)-1)] - -class JSONIO: - def __init__(self, io): - self.io = io - self.io.write_line(self.io.read_line()) - self.io.write_line(self.io.read_line()) - - def read(self): - """Iterate over all JSON input (Generator)""" - - for line in self.io.read(): - with self.parse_line(line) as j: - yield j - - @contextmanager - def parse_line(self, line): - """ - Parse a single line of JSON and write modified JSON back. - - Usage is quite simple using the usual with-Syntax. - """ - - prefix = "" - - # ignore comma at start of lines - if line.startswith(","): - line, prefix = line[1:], "," - - j = json.loads(line) - yield j - self.io.write_line(prefix + json.dumps(j)) - class i3pystatus: modules = [] def __init__(self, standalone=False, interval=1, input_stream=sys.stdin): if standalone: - self.io = StandaloneIO(interval) + self.io = core.io.StandaloneIO(interval) else: - self.io = IOHandler(input_stream) + self.io = core.io.IOHandler(input_stream) self.finder = ModuleFinder() @@ -338,7 +90,9 @@ class i3pystatus: module.registered(self) def run(self): - for j in JSONIO(self.io).read(): + for j in core.io.JSONIO(self.io).read(): for module in self.modules: module.inject(j) I3statusHandler = i3pystatus + +ModuleFinder = functools.partial(ClassFinder, baseclass=Module, exclude=[Module, IntervalModule, AsyncModule]) diff --git a/i3pystatus/core/exceptions.py b/i3pystatus/core/exceptions.py new file mode 100644 index 0000000..4d5256b --- /dev/null +++ b/i3pystatus/core/exceptions.py @@ -0,0 +1,24 @@ + +class ConfigError(Exception): + """ABC for configuration exceptions""" + def __init__(self, module, *args, **kwargs): + message = "Module '{0}': {1}".format(module, self.format(*args, **kwargs)) + + super().__init__(message) + +class ConfigKeyError(ConfigError, KeyError): + def format(self, key): + return "invalid option '{0}'".format(key) + +class ConfigMissingError(ConfigError): + def format(self, missing): + return "missing required options: {0}".format(missing) + super().__init__(module) + +class ConfigAmbigiousClassesError(ConfigError): + def format(self, ambigious_classes): + return "ambigious module specification, found multiple classes: {0}".format(ambigious_classes) + +class ConfigInvalidModuleError(ConfigError): + def format(self): + return "no class found" diff --git a/i3pystatus/core/io.py b/i3pystatus/core/io.py new file mode 100644 index 0000000..ef311b1 --- /dev/null +++ b/i3pystatus/core/io.py @@ -0,0 +1,104 @@ + +import time +import json +import sys +from contextlib import contextmanager + +class IOHandler: + def __init__(self, inp=sys.stdin, out=sys.stdout): + self.inp = inp + self.out = out + + def write_line(self, message): + """Unbuffered printing to stdout.""" + + self.out.write(message + "\n") + self.out.flush() + + def read(self): + """Iterate over all input lines (Generator)""" + + while True: + try: + yield self.read_line() + except EOFError: + return + + def read_line(self): + """Interrupted respecting reader for stdin. + + Raises EOFError if the end of stream has been reached""" + + try: + line = self.inp.readline().strip() + except KeyboardInterrupt: + raise EOFError() + + # i3status sends EOF, or an empty line + if not line: + raise EOFError() + return line + +class StandaloneIO(IOHandler): + """ + I/O handler for standalone usage of i3pystatus (w/o i3status) + + Writing works as usual, but reading will always return a empty JSON array, + and the i3bar protocol header + """ + + n = -1 + proto = ( + '{"version":1}', + "[", + "[]", + ",[]", + ) + + def __init__(self, interval=1): + super().__init__() + self.interval = interval + + def read(self): + while True: + try: + time.sleep(self.interval) + except KeyboardInterrupt: + return + yield self.read_line() + + def read_line(self): + self.n += 1 + + return self.proto[min(self.n, len(self.proto)-1)] + +class JSONIO: + def __init__(self, io): + self.io = io + self.io.write_line(self.io.read_line()) + self.io.write_line(self.io.read_line()) + + def read(self): + """Iterate over all JSON input (Generator)""" + + for line in self.io.read(): + with self.parse_line(line) as j: + yield j + + @contextmanager + def parse_line(self, line): + """ + Parse a single line of JSON and write modified JSON back. + + Usage is quite simple using the usual with-Syntax. + """ + + prefix = "" + + # ignore comma at start of lines + if line.startswith(","): + line, prefix = line[1:], "," + + j = json.loads(line) + yield j + self.io.write_line(prefix + json.dumps(j)) \ No newline at end of file diff --git a/i3pystatus/core/util.py b/i3pystatus/core/util.py new file mode 100644 index 0000000..5268bd1 --- /dev/null +++ b/i3pystatus/core/util.py @@ -0,0 +1,134 @@ + +import inspect +import types +import collections + +from .exceptions import * + +__all__ = [ + "SettingsBase", + "ClassFinder", +] + +class KeyConstraintDict(collections.UserDict): + class MissingKeys(Exception): + def __init__(self, keys): + self.keys = keys + + def __init__(self, valid_keys, required_keys): + super().__init__() + + self.valid_keys = valid_keys + self.required_keys = set(required_keys) + self.seen_keys = set() + + def __setitem__(self, key, value): + if key in self.valid_keys: + self.seen_keys.add(key) + self.data[key] = value + else: + raise KeyError(key) + + def missing(self): + return self.required_keys - (self.seen_keys & self.required_keys) + + def __iter__(self): + if self.missing(): + raise self.MissingKeys(self.missing()) + + return self.data.__iter__() + +class SettingsBase: + """ + Support class for providing a nice and flexible settings interface + + Classes inherit from this class and define what settings they provide and + which are required. + + The constructor is either passed a dictionary containing these settings, or + keyword arguments specifying the same. + + Settings are stored as attributes of self + """ + + settings = tuple() + """settings should be tuple containing two types of elements: + * bare strings, which must be valid identifiers. + * two-tuples, the first element being a identifier (as above) and the second + a docstring for the particular setting""" + + required = tuple() + """required can list settings which are required""" + + def __init__(self, *args, **kwargs): + def flatten_setting(setting): + return setting[0] if isinstance(setting, tuple) else setting + def flatten_settings(settings): + return tuple(flatten_setting(setting) for setting in settings) + + def get_argument_dict(args, kwargs): + if len(args) == 1 and not kwargs: + # User can also pass in a dict for their settings + # Note: you could do that anyway, with the ** syntax + return args[0] + return kwargs + + self.settings = flatten_settings(self.settings) + + sm = KeyConstraintDict(self.settings, self.required) + settings_source = get_argument_dict(args, kwargs) + + try: + sm.update(settings_source) + except KeyError as exc: + raise ConfigKeyError(type(self).__name__, key=exc.args[0]) from exc + + try: + self.__dict__.update(sm) + except KeyConstraintDict.MissingKeys as exc: + raise ConfigMissingError(type(self).__name__, missing=exc.keys) from exc + + self.__name__ = "{}.{}".format(self.__module__, self.__class__.__name__) + + self.init() + + def init(self): + """Convenience method which is called after all settings are set + + In case you don't want to type that super()…blabla :-)""" + +class ClassFinder: + """Support class to find classes of specific bases in a module""" + + def __init__(self, baseclass, exclude=[]): + self.baseclass = baseclass + self.exclude = exclude + + def predicate_factory(self, module): + def predicate(obj): + return ( + inspect.isclass(obj) and + issubclass(obj, self.baseclass) and + obj not in self.exclude and + obj.__module__ == module.__name__ + ) + return predicate + + def search_module(self, module): + # Neat trick: [(x,y),(u,v)] becomes [(x,u),(y,v)] + return list(zip(*inspect.getmembers(module, self.predicate_factory(module))))[1] + + def get_class(self, module): + classes = self.search_module(module) + + if len(classes) > 1: + # If there are multiple Module clases bundled in one module, + # well, we can't decide for the user. + raise ConfigAmbigiousClassesError(module.__name__, classes) + elif not classes: + raise ConfigInvalidModuleError(module.__name__) + + return classes[0] + + def instanciate_class_from_module(self, module, *args, **kwargs): + return self.get_class(module)(*args, **kwargs) diff --git a/i3pystatus/mkdocs.py b/i3pystatus/mkdocs.py index 96b1504..7eccca8 100755 --- a/i3pystatus/mkdocs.py +++ b/i3pystatus/mkdocs.py @@ -135,4 +135,4 @@ with open("template.md", "r") as template: finder = i3pystatus.ClassFinder(baseclass=i3pystatus.mail.Backend, exclude=[i3pystatus.mail.Backend]) tpl = tpl.replace("!!i3pystatus.mail!!", generate_doc_for_module(i3pystatus.mail.__path__, "###", finder).replace("\n", "\n> ")) - print(tpl) \ No newline at end of file + print(tpl)