New self-organizing/self-optimizing threading model (no API changes)

Uses a simple, hysteresis-based approach: in this first version the
optimizer recursively moves modules out of a thread until a certain
timing target is reached; the modules that were moved out are then
partitioned into new threads according to their latest time profiles.

Very simple, but very effective.
TODO: Implement merging of threads whose workloads are (again) well-behaved.
This commit is contained in:
enkore 2013-02-28 00:14:51 +01:00
parent 604a4f8e29
commit 901dd4319f
2 changed files with 148 additions and 6 deletions

View File

@ -2,6 +2,7 @@ from threading import Thread
import time import time
from .util import SettingsBase from .util import SettingsBase
from .threads import AutomagicManager
__all__ = [ __all__ = [
"Module", "AsyncModule", "IntervalModule", "Module", "AsyncModule", "IntervalModule",
@ -28,13 +29,23 @@ class AsyncModule(Module):
def mainloop(self): def mainloop(self):
"""This is run in a separate daemon-thread""" """This is run in a separate daemon-thread"""
class IntervalModule(AsyncModule): class IntervalModule(Module):
interval = 5 # seconds interval = 5 # seconds
managers = {}
def registered(self, status_handler):
if self.interval in IntervalModule.managers:
IntervalModule.managers[self.interval].add(self)
else:
am = AutomagicManager(self.interval)
am.add(self)
IntervalModule.managers[self.interval] = am
def __call__(self):
self.run()
def run(self): def run(self):
"""Called every self.interval seconds""" """Called approximately every self.interval seconds
def mainloop(self): Do not rely on this being called from the same thread at all times.
while True: If you need to always have the same thread context, subclass AsyncModule."""
self.run()
time.sleep(self.interval)

131
i3pystatus/core/threads.py Normal file
View File

@ -0,0 +1,131 @@
import sys
import threading
import time
import traceback
# Prefer the monotonic, high-resolution clock (time.perf_counter,
# Python 3.3+); older interpreters fall back to time.clock.
try:
    timer = time.perf_counter
except AttributeError:
    timer = time.clock
class StderrExcDelegate:
    """Exception delegate that prints the traceback of a caught
    exception to standard error."""

    def __call__(self, exc):
        # Bug fix: the original ignored ``exc`` and printed
        # sys.exc_info() instead, which is only populated while an
        # ``except`` block is active — called outside one it printed
        # "NoneType: None". Print the exception we were given.
        traceback.print_exception(type(exc), exc, exc.__traceback__,
                                  file=sys.stderr)
class ExceptionWrapper:
    """Callable wrapper that forwards any exception raised by the
    wrapped workload to an exception delegate instead of letting it
    propagate up into the scheduling thread."""

    def __init__(self, workload, exc_delegate):
        self.workload = workload
        self.exc_delegate = exc_delegate

    def __repr__(self):
        # Delegate to the wrapped workload for readable diagnostics.
        return repr(self.workload)

    def __call__(self):
        try:
            self.workload()
        except Exception as exception:
            self.exc_delegate(exception)
class WorkloadWrapper:
    """Callable wrapper that measures how long each invocation of the
    wrapped workload takes; the duration of the latest call is exposed
    as the ``time`` attribute (seconds)."""

    def __init__(self, workload):
        self.workload = workload
        # No call has been timed yet.
        self.time = 0.0

    def __repr__(self):
        return repr(self.workload)

    def __call__(self):
        started = timer()
        self.workload()
        self.time = timer() - started
class Thread(threading.Thread):
    """Daemon thread that repeatedly runs a list of workloads, aiming to
    complete one full pass every ``target_interval`` seconds.

    The workload list is kept sorted cheapest-first after every pass, so
    :meth:`pop` always removes the most expensive workload.
    """

    def __init__(self, target_interval, workloads=None):
        super().__init__()
        self.workloads = [] if workloads is None else workloads
        self.target_interval = target_interval
        self.daemon = True

    def __iter__(self):
        return iter(self.workloads)

    def __len__(self):
        return len(self.workloads)

    def pop(self):
        # Removes the slowest workload (list is sorted ascending by time).
        return self.workloads.pop()

    def push(self, workload):
        self.workloads.append(workload)

    @property
    def time(self):
        """Total measured run time of all workloads, in seconds."""
        return sum(job.time for job in self.workloads)

    def run(self):
        # The loop ends once every workload has been moved elsewhere.
        while self.workloads:
            for job in self.workloads:
                job()
            # Keep cheapest-first order so pop() yields the slowest one.
            self.workloads.sort(key=lambda job: job.time)
            remaining = self.target_interval - self.time
            if remaining > 0:
                time.sleep(remaining)
class AutomagicManager:
    """Self-optimizing scheduler distributing workloads over threads.

    New workloads start in an initial thread; the manager itself runs as
    a workload of that thread, so it is re-invoked roughly every
    ``target_interval`` seconds. On each invocation it rebalances:
    threads whose measured pass time exceeds ``upper_bound`` have their
    slowest workloads moved out, and the moved workloads are partitioned
    into new threads filled up to roughly ``lower_bound`` seconds each.
    """

    def __init__(self, target_interval):
        self.target_interval = target_interval
        # Hysteresis band: split only above upper_bound and fill new
        # threads only up to lower_bound, to avoid oscillation.
        self.upper_bound = target_interval * 1.1
        self.lower_bound = target_interval * 0.7

        initial_thread = Thread(target_interval, [self.wrap(self)])
        self.threads = [initial_thread]
        initial_thread.start()

    def __call__(self):
        """One rebalancing pass, invoked periodically by the initial thread."""
        overloaded = []
        for thread in self.threads:
            overloaded.extend(self.optimize(thread, thread.time))
        self.create_threads(self.partition(overloaded))

    def wrap(self, workload):
        """Wrap *workload* for timing and stderr exception reporting."""
        return WorkloadWrapper(
            ExceptionWrapper(workload, exc_delegate=StderrExcDelegate()))

    def optimize(self, thread, vtime):
        """Recursively remove the slowest workloads from *thread* until
        its estimated pass time *vtime* drops to ``upper_bound`` or
        below, always leaving at least one workload behind.

        Returns the list of removed workloads."""
        if len(thread) > 1 and vtime > self.upper_bound:
            # pop() yields the slowest workload (list sorted ascending).
            removed = thread.pop()
            return [removed] + self.optimize(thread, vtime - removed.time)
        return []

    def partition(self, workloads):
        """Group *workloads* into lists whose summed run time just
        exceeds ``lower_bound``; returns the list of groups.

        Bug fix: the original dropped a trailing group whose total time
        never exceeded ``lower_bound``, silently losing those workloads
        (they had already been removed from their old thread). The
        remainder is now kept as a final, possibly underfull, group."""
        groups = []
        current = []
        timesum = 0.0
        for workload in workloads:
            current.append(workload)
            timesum += workload.time
            if timesum > self.lower_bound:
                groups.append(current)
                current = []
                timesum = 0.0
        if current:
            groups.append(current)
        return groups

    def create_threads(self, threads):
        """Start one new thread per workload group."""
        for workloads in threads:
            self.create_thread(workloads)

    def create_thread(self, workloads):
        thread = Thread(self.target_interval, workloads)
        thread.start()
        self.threads.append(thread)

    def add(self, workload):
        """Register a new workload; it starts out in the initial thread."""
        self.threads[0].push(self.wrap(workload))