commit
9c94a8d812
@ -32,7 +32,7 @@ MOCK_MODULES = [
|
||||
"bs4",
|
||||
"dota2py",
|
||||
"novaclient",
|
||||
"speedtest_cli",
|
||||
"speedtest",
|
||||
"pyzabbix",
|
||||
"vk",
|
||||
"google-api-python-client",
|
||||
|
@ -1,5 +1,5 @@
|
||||
from i3pystatus import IntervalModule
|
||||
import speedtest_cli
|
||||
import speedtest
|
||||
import requests
|
||||
import time
|
||||
import os
|
||||
@ -12,23 +12,58 @@ from io import StringIO
|
||||
class NetSpeed(IntervalModule):
|
||||
"""
|
||||
Attempts to provide an estimation of internet speeds.
|
||||
Requires: speedtest_cli
|
||||
Requires: speedtest-cli/modularize-2
|
||||
speedtest-cli/modularize-2 can be installed using pip:
|
||||
`pip install git+https://github.com/sivel/speedtest-cli.git@modularize-2`
|
||||
"""
|
||||
|
||||
settings = (
|
||||
("url", "Target URL to download a file from. Uses speedtest_cli to "
|
||||
"find the 'best' server if none is supplied."),
|
||||
("units", "Valid values are B, b, bytes, or bits"),
|
||||
"format"
|
||||
)
|
||||
color = "#FFFFFF"
|
||||
interval = 300
|
||||
url = None
|
||||
units = 'bits'
|
||||
format = "{speed} ({hosting_provider})"
|
||||
format = "↓{speed_down:.1f}{down_units} ↑{speed_up:.1f}{up_units} ({hosting_provider})"
|
||||
|
||||
def form_b(self, n: float)->tuple:
|
||||
"""
|
||||
formats a bps as bps/kbps/mbps/gbps etc
|
||||
handles whether its meant to be in bytes
|
||||
:param n: input float
|
||||
:rtype tuple:
|
||||
:return: tuple of float-number of mbps etc, str-units
|
||||
"""
|
||||
unit = 'bps'
|
||||
kilo = 1000
|
||||
mega = 1000000
|
||||
giga = 1000000000
|
||||
bps = 0
|
||||
|
||||
if self.units == 'bytes' or self.units == 'B':
|
||||
unit = 'Bps'
|
||||
kilo = 8000
|
||||
mega = 8000000
|
||||
giga = 8000000000
|
||||
|
||||
if n < kilo:
|
||||
bps = float(n)
|
||||
|
||||
if n >= kilo and n < mega:
|
||||
unit = "K" + unit
|
||||
bps = float(n / 1024.0)
|
||||
|
||||
if n >= mega and n < giga:
|
||||
unit = "M" + unit
|
||||
bps = float(n / (1024.0 * 1024.0))
|
||||
|
||||
if n >= giga:
|
||||
unit = "G" + unit
|
||||
bps = float(n / (1024.0 * 1024.0 * 1024.0))
|
||||
|
||||
return bps, unit
|
||||
|
||||
def run(self):
|
||||
|
||||
# since speedtest_cli likes to print crap, we need to squelch it
|
||||
@contextlib.contextmanager
|
||||
def nostdout():
|
||||
@ -37,73 +72,44 @@ class NetSpeed(IntervalModule):
|
||||
yield
|
||||
sys.stdout = save_stdout
|
||||
|
||||
if not self.url:
|
||||
with nostdout():
|
||||
cdict = {
|
||||
"speed_up": 0.0,
|
||||
"speed_down": 0.0,
|
||||
"down_units": "",
|
||||
"up_units": "",
|
||||
"hosting_provider": 'null'
|
||||
}
|
||||
st = None
|
||||
with nostdout():
|
||||
try:
|
||||
# this is now the canonical way to use speedtest_cli as a module.
|
||||
st = speedtest.Speedtest()
|
||||
except speedtest.ConfigRetrievalError:
|
||||
# log('Cannot retrieve speedtest configuration')
|
||||
self.output = {}
|
||||
if st:
|
||||
try:
|
||||
config = speedtest_cli.getConfig()
|
||||
servers = speedtest_cli.closestServers(config['client'])
|
||||
best = speedtest_cli.getBestServer(servers)
|
||||
# 1500x1500 is about 4.3MB, which seems like a reasonable place to
|
||||
# start, i guess...
|
||||
url = '%s/random1500x1500.jpg' % os.path.dirname(best['url'])
|
||||
except KeyError:
|
||||
url = None
|
||||
# get the servers
|
||||
st.get_servers()
|
||||
st.get_best_server()
|
||||
|
||||
if not url:
|
||||
cdict = {
|
||||
"speed": 0,
|
||||
"hosting_provider": 'null',
|
||||
}
|
||||
else:
|
||||
with open('/dev/null', 'wb') as devnull:
|
||||
start = time.time()
|
||||
req = requests.get(url, stream=True)
|
||||
devnull.write(req.content)
|
||||
end = time.time()
|
||||
total_length = int(req.headers.get('content-length'))
|
||||
devnull.close()
|
||||
except speedtest.ServersRetrievalError:
|
||||
# log this somehow
|
||||
# log('Cannot retrieve speedtest server list')
|
||||
pass
|
||||
results = st.results
|
||||
|
||||
# chop off the float after the 4th decimal point
|
||||
# note: not rounding, simply cutting
|
||||
# note: dl_time is in seconds
|
||||
dl_time = float(end - start)
|
||||
down, up = st.download(), st.upload()
|
||||
speed_down, down_units = self.form_b(down)
|
||||
speed_up, up_units = self.form_b(up)
|
||||
|
||||
if self.units == 'bits' or self.units == 'b':
|
||||
unit = 'bps'
|
||||
kilo = 1000
|
||||
mega = 1000000
|
||||
giga = 1000000000
|
||||
factor = 8
|
||||
elif self.units == 'bytes' or self.units == 'B':
|
||||
unit = 'Bps'
|
||||
kilo = 8000
|
||||
mega = 8000000
|
||||
giga = 8000000000
|
||||
factor = 1
|
||||
|
||||
if total_length < kilo:
|
||||
bps = float(total_length / dl_time)
|
||||
|
||||
if total_length >= kilo and total_length < mega:
|
||||
unit = "K" + unit
|
||||
bps = float((total_length / 1024.0) / dl_time)
|
||||
|
||||
if total_length >= mega and total_length < giga:
|
||||
unit = "M" + unit
|
||||
bps = float((total_length / (1024.0 * 1024.0)) / dl_time)
|
||||
|
||||
if total_length >= giga:
|
||||
unit = "G" + unit
|
||||
bps = float((total_length / (1024.0 * 1024.0 * 1024.0)) / dl_time)
|
||||
|
||||
bps = "%.2f" % (bps * factor)
|
||||
speed = "%s %s" % (bps, unit)
|
||||
hosting_provider = '.'.join(urlparse(url).hostname.split('.')[-2:])
|
||||
|
||||
cdict = {
|
||||
"speed": speed,
|
||||
"hosting_provider": hosting_provider,
|
||||
}
|
||||
cdict = {
|
||||
"speed_down": speed_down,
|
||||
"speed_up": speed_up,
|
||||
"up_units": up_units,
|
||||
"down_units": down_units,
|
||||
"hosting_provider": results.server.get("sponsor", "Unknown Provider")
|
||||
}
|
||||
|
||||
self.output = {
|
||||
"full_text": self.format.format(**cdict),
|
||||
|
Loading…
Reference in New Issue
Block a user