i3pystatus/i3pystatus/network.py
2015-01-20 08:22:31 +08:00

371 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import netifaces
from i3pystatus import IntervalModule
from i3pystatus.core.color import ColorRangeModule
from i3pystatus.core.util import make_graph, round_dict, make_bar
def count_bits(integer):
    """
    Count the bits set to 1 in a non-negative integer.

    :param integer: non-negative integer (e.g. a netmask converted to an int)
    :returns: number of set bits
    :raises ValueError: if ``integer`` is negative
    """
    if integer < 0:
        # Python ints are unbounded: clearing the lowest set bit of a negative
        # number never reaches zero, so the loop below would never terminate.
        raise ValueError("count_bits() requires a non-negative integer")
    bits = 0
    while integer:
        integer &= integer - 1  # drops the lowest set bit
        bits += 1
    return bits
def v6_to_int(v6):
    """
    Turn the hex digits of an IPv6 address or netmask string into an integer.

    The colons are simply dropped, so for compressed notation (``::``) the
    result is not the numeric value of the address. The number of set bits is
    preserved, which is all that prefix-length computation needs.

    :param v6: IPv6 string such as ``"ffff:ffff:ffff:ffff::"``; a trailing
        ``/prefix`` or ``%zone`` suffix is ignored
    :returns: integer built from the remaining hex digits, 0 if none remain
    :raises ValueError: if the remaining characters are not hexadecimal
    """
    # NOTE(review): some netifaces releases appear to report IPv6 netmasks
    # with a "/<prefix length>" suffix - confirm against the installed version.
    digits = v6.split("/", 1)[0].split("%", 1)[0].replace(":", "")
    # "::" (all-zero mask) leaves no digits, and int("", 16) would raise.
    return int(digits, 16) if digits else 0
def prefix6(mask):
    """Return the number of set bits in an IPv6 netmask string."""
    mask_value = v6_to_int(mask)
    return count_bits(mask_value)
def cidr6(addr, mask):
    """Format an IPv6 address and its netmask as ``<addr>/<bits>``."""
    bits = prefix6(mask)
    return "{addr}/{bits}".format(addr=addr, bits=bits)
def v4_to_int(v4):
    """
    Convert a dotted IPv4 string to an integer.

    Each dot-separated part is treated as one base-256 digit, with the
    leftmost part being the most significant.
    """
    value = 0
    for octet in v4.split("."):
        value = value * 2 ** 8 + int(octet)
    return value
def prefix4(mask):
    """Return the number of set bits in a dotted IPv4 netmask string."""
    mask_value = v4_to_int(mask)
    return count_bits(mask_value)
def cidr4(addr, mask):
    """Format an IPv4 address and its netmask as ``<addr>/<bits>``."""
    bits = prefix4(mask)
    return "{addr}/{bits}".format(addr=addr, bits=bits)
def get_bonded_slaves():
    """
    Map every bonded slave interface to its bonding master.

    Reads ``/sys/class/net/bonding_masters`` and then each listed master's
    ``bonding/slaves`` file.

    :returns: dict ``{slave_name: master_name}``; empty when the
        ``bonding_masters`` file does not exist
    """
    try:
        with open("/sys/class/net/bonding_masters") as f:
            masters = f.read().split()
    except FileNotFoundError:
        return {}

    slaves = {}
    for master in masters:
        try:
            with open("/sys/class/net/{}/bonding/slaves".format(master)) as f:
                members = f.read().split()
        except FileNotFoundError:
            # The master was listed but its slaves file is gone (e.g. the bond
            # was removed between the two reads); skip it instead of failing.
            continue
        for slave in members:
            slaves[slave] = master
    return slaves
def sysfs_interface_up(interface, unknown_up=False):
    """
    Tell whether an interface counts as up according to its sysfs operstate.

    :param interface: interface name, looked up under ``/sys/class/net``
    :param unknown_up: when truthy, an operstate of ``unknown`` counts as up
    :returns: ``False`` if the operstate file does not exist, ``True`` if the
        state is ``up``, otherwise the result of the ``unknown`` check
    """
    operstate_path = "/sys/class/net/{}/operstate".format(interface)
    try:
        with open(operstate_path) as state_file:
            state = state_file.read().strip()
    except FileNotFoundError:
        # No such interface
        return False
    if state == "up":
        return True
    return unknown_up and state == "unknown"
class NetworkInfo():
    """
    Collect address and wireless details for a network interface.
    """

    def __init__(self, interface, ignore_interfaces, detached_down, unknown_up, get_wifi_info=False):
        """
        :param interface: interface name to validate against netifaces
        :param ignore_interfaces: stored as-is on the instance
        :param detached_down: when truthy, a missing interface is tolerated
        :param unknown_up: forwarded to ``sysfs_interface_up``
        :param get_wifi_info: whether ``extract_wireless_info`` queries basiciw
        :raises RuntimeError: if the interface is unknown and
            ``detached_down`` is falsy
        """
        missing = interface not in netifaces.interfaces()
        if missing and not detached_down:
            raise RuntimeError(
                "Unknown interface {iface}!".format(iface=interface))

        self.ignore_interfaces = ignore_interfaces
        self.detached_down = detached_down
        self.unknown_up = unknown_up
        self.get_wifi_info = get_wifi_info

    def get_info(self, interface):
        """
        Build the formatter values describing ``interface``.

        When the interface is not up only the empty address fields are
        returned; otherwise MAC, address and wireless fields are added.
        """
        result = {"v4": "", "v4mask": "", "v4cidr": "", "v6": "", "v6mask": "", "v6cidr": ""}
        if not sysfs_interface_up(interface, self.unknown_up):
            return result

        addresses = netifaces.ifaddresses(interface)

        # A bonded slave takes the IPv4/IPv6 entries of its master.
        master = get_bonded_slaves().get(interface)
        if master is not None and sysfs_interface_up(interface, self.unknown_up):
            master_addresses = netifaces.ifaddresses(master)
            for family in (netifaces.AF_INET, netifaces.AF_INET6):
                if family in master_addresses:
                    addresses[family] = master_addresses[family]

        try:
            result['mac'] = addresses[netifaces.AF_PACKET][0]["addr"]
        except KeyError:
            result['mac'] = "NONE"

        result.update(self.extract_network_info(addresses))
        result.update(self.extract_wireless_info(interface))
        return result

    @staticmethod
    def extract_network_info(network_info):
        """
        Pull the IPv4 and IPv6 formatter values out of an ``ifaddresses`` dict.

        Keys are only present for address families found in ``network_info``.
        """
        extracted = {}

        if netifaces.AF_INET in network_info:
            first_v4 = network_info[netifaces.AF_INET][0]
            address, netmask = first_v4["addr"], first_v4["netmask"]
            extracted["v4"] = address
            extracted["v4mask"] = netmask
            extracted["v4cidr"] = cidr4(address, netmask)

        if netifaces.AF_INET6 in network_info:
            for entry in network_info[netifaces.AF_INET6]:
                address, netmask = entry["addr"], entry["netmask"]
                extracted["v6"] = address
                extracted["v6mask"] = netmask
                extracted["v6cidr"] = cidr6(address, netmask)
                # Stop at the first address that is not link-local; if all
                # are link-local the last one wins.
                if not address.startswith("fe80::"):
                    break

        return extracted

    def extract_wireless_info(self, interface):
        """
        Return essid, frequency and link quality for ``interface``.

        Empty defaults are returned when wifi lookup is disabled or when
        basiciw cannot query the interface.
        """
        wireless = {"essid": "", "freq": "", "quality": 0.0, "quality_bar": ""}

        # Wifi functionality not requested: hand back the empty values
        if not self.get_wifi_info:
            return wireless

        import basiciw

        try:
            iwi = basiciw.iwinfo(interface)
        except Exception:
            # Interface is not wireless
            return wireless

        wireless["essid"] = iwi["essid"]
        wireless["freq"] = iwi["freq"]

        link = iwi["quality"]
        scale = link["quality_max"]
        level = link["quality"] / scale if scale > 0 else link["quality"]
        wireless["quality"] = level * 100
        wireless["quality_bar"] = make_bar(wireless["quality"])
        return wireless
class NetworkTraffic():
    """
    Retrieve network traffic information

    Keeps the two most recent psutil counter samples for an interface and
    reports the difference between them.
    """
    # Latest counter sample, or None if the interface had no counters.
    pnic = None
    # Sample taken on the previous update, or None if there was none.
    pnic_before = None

    def __init__(self, unknown_up, divisor, round_size):
        """
        :param unknown_up: forwarded to ``sysfs_interface_up``
        :param divisor: byte deltas are divided by this value
        :param round_size: passed to ``round_dict`` for the usage values
        """
        self.unknown_up = unknown_up
        self.divisor = divisor
        self.round_size = round_size

    def update_counters(self, interface):
        """Shift the current sample to ``pnic_before`` and take a new one."""
        import psutil

        self.pnic_before = self.pnic
        counters = psutil.net_io_counters(pernic=True)
        self.pnic = counters[interface] if interface in counters else None

    def clear_counters(self):
        """Forget both samples, e.g. after switching to another interface."""
        self.pnic_before = None
        self.pnic = None

    def get_bytes_sent(self):
        """Bytes sent between the two samples, divided by ``divisor``."""
        return (self.pnic.bytes_sent - self.pnic_before.bytes_sent) / self.divisor

    def get_bytes_received(self):
        """Bytes received between the two samples, divided by ``divisor``."""
        return (self.pnic.bytes_recv - self.pnic_before.bytes_recv) / self.divisor

    def get_packets_sent(self):
        """Packets sent between the two samples."""
        return self.pnic.packets_sent - self.pnic_before.packets_sent

    def get_packets_received(self):
        """Packets received between the two samples."""
        return self.pnic.packets_recv - self.pnic_before.packets_recv

    def get_usage(self, interface):
        """
        Sample the counters and return the traffic since the previous sample.

        :returns: dict with ``bytes_sent``, ``bytes_recv``, ``packets_sent``
            and ``packets_recv``; all zero when the interface is down or when
            two samples are not yet available
        """
        self.update_counters(interface)
        usage = dict(bytes_sent=0, bytes_recv=0, packets_sent=0, packets_recv=0)

        # A delta needs both samples. The current one is None when psutil
        # reports no counters for the interface, even if an older sample exists.
        if not self.pnic or not self.pnic_before:
            return usage
        if not sysfs_interface_up(interface, self.unknown_up):
            return usage

        usage["bytes_sent"] = self.get_bytes_sent()
        usage["bytes_recv"] = self.get_bytes_received()
        usage["packets_sent"] = self.get_packets_sent()
        usage["packets_recv"] = self.get_packets_received()
        round_dict(usage, self.round_size)
        return usage
class Network(IntervalModule, ColorRangeModule):
    """
    Displays network information for an interface.

    Requires the PyPI packages `psutil`, `colour`, `netifaces` and `basiciw`

    .. rubric:: Available formatters

    Network Traffic Formatters:

    * `{interface}` — the configured network interface
    * `{kbs}` — traffic in the `graph_type` direction with one decimal place (KB/s with the default divisor and interval)
    * `{network_graph}` — Unicode graph representing network usage
    * `{bytes_sent}` — bytes sent since the previous update (divided by divisor)
    * `{bytes_recv}` — bytes received since the previous update (divided by divisor)
    * `{packets_sent}` — packets sent since the previous update
    * `{packets_recv}` — packets received since the previous update

    Network Information Formatters:

    * `{interface}` — same as setting
    * `{v4}` — IPv4 address
    * `{v4mask}` — subnet mask
    * `{v4cidr}` — IPv4 address in cidr notation (i.e. 192.168.2.204/24)
    * `{v6}` — IPv6 address
    * `{v6mask}` — subnet mask
    * `{v6cidr}` — IPv6 address in cidr notation
    * `{mac}` — MAC of interface

    Wireless Information Formatters:

    * `{essid}` — ESSID of currently connected wifi
    * `{freq}` — Current frequency
    * `{quality}` — Link quality in percent
    * `{quality_bar}` — Bar graphically representing link quality
    """
    settings = (
        ("format_up", "format string"),
        ("format_down", "format string"),
        "color_up",
        "color_down",
        ("interface", "Interface to watch, eg 'eth0'"),
        ("dynamic_color", "Set color dynamically based on network traffic. Note: this overrides color_up"),
        ("start_color", "Hex or English name for start of color range, eg '#00FF00' or 'green'"),
        ("end_color", "Hex or English name for end of color range, eg '#FF0000' or 'red'"),
        ("graph_width", "Width of the network traffic graph"),
        ("upper_limit",
         "Expected max kb/s. This value controls how the network traffic graph is drawn and in what color"),
        ("graph_type", "Whether to draw the network traffic graph for input or output. "
                       "Allowed values 'input' or 'output'"),
        ("divisor", "divide all byte values by this value"),
        ("ignore_interfaces", "Array of interfaces to ignore when cycling through "
                              "on click, eg, ['lo']"),
        ("round_size", "defines number of digits in round"),
        ("detached_down", "If the interface doesn't exist, display it as if it were down"),
        ("unknown_up", "If the interface is in unknown state, display it as if it were up"),
    )

    interval = 1
    interface = 'eth0'

    format_up = "{interface} {network_graph}{kbs}KB/s"
    format_down = "{interface}: DOWN"
    color_up = "#00FF00"
    color_down = "#FF0000"
    dynamic_color = True
    graph_type = 'input'
    graph_width = 15
    upper_limit = 150.0

    # Network traffic settings
    divisor = 1024
    round_size = None

    # Network info settings
    detached_down = True
    unknown_up = False
    ignore_interfaces = ["lo"]

    on_leftclick = "nm-connection-editor"
    on_rightclick = "cycle_interface"
    on_upscroll = ['cycle_interface', 1]
    on_downscroll = ['cycle_interface', -1]

    def init(self):
        """
        Create the info/traffic helpers required by the configured formats.

        A helper's optional dependency (basiciw, psutil) is only needed when
        one of its formatters appears in ``format_up`` or ``format_down``.
        """
        formats = (self.format_up, self.format_down)

        def uses_any(names):
            # Plain substring test, applied to both format strings.
            return any(name in fmt for fmt in formats for name in names)

        # Don't require importing basiciw unless using the functionality it offers.
        get_wifi_info = uses_any(['essid', 'freq', 'quality', 'quality_bar'])

        self.network_info = NetworkInfo(self.interface, self.ignore_interfaces, self.detached_down, self.unknown_up,
                                        get_wifi_info)

        # Don't require importing psutil unless using the functionality it offers.
        # 'kbs' is listed too: it is only computed from the traffic counters.
        if uses_any(['bytes_sent', 'bytes_recv', 'packets_sent', 'packets_recv', 'network_graph', 'kbs']):
            self.network_traffic = NetworkTraffic(self.unknown_up, self.divisor, self.round_size)
        else:
            self.network_traffic = None

        self.colors = self.get_hex_color_range(self.start_color, self.end_color, int(self.upper_limit))
        self.kbs_arr = [0.0] * self.graph_width

    def cycle_interface(self, increment=1):
        """
        Switch to another interface, skipping ``ignore_interfaces``.

        :param increment: step through the interface list (negative values
            go backwards); the position wraps around at both ends
        """
        interfaces = [i for i in netifaces.interfaces() if i not in self.ignore_interfaces]
        if self.interface in interfaces:
            next_index = (interfaces.index(self.interface) + increment) % len(interfaces)
            self.interface = interfaces[next_index]
        elif interfaces:
            self.interface = interfaces[0]

        # Samples and graph history belong to the previous interface.
        if self.network_traffic:
            self.network_traffic.clear_counters()
            self.kbs_arr = [0.0] * self.graph_width

    def get_network_graph(self, kbs):
        """Record ``kbs`` as the newest sample and render the graph."""
        # Cycle array by inserting at the start and chopping off the last element
        self.kbs_arr.insert(0, kbs)
        self.kbs_arr = self.kbs_arr[:self.graph_width]
        return make_graph(self.kbs_arr, self.upper_limit)

    def run(self):
        """
        Gather traffic and address values and set ``self.output``.

        :raises Exception: if ``graph_type`` is neither 'input' nor 'output'
        """
        format_values = dict(kbs="", network_graph="", bytes_sent="", bytes_recv="", packets_sent="", packets_recv="",
                             interface="", v4="", v4mask="", v4cidr="", v6="", v6mask="", v6cidr="", mac="",
                             essid="", freq="", quality="", quality_bar="")

        color = None
        if self.network_traffic:
            network_usage = self.network_traffic.get_usage(self.interface)
            format_values.update(network_usage)
            if self.graph_type == 'input':
                kbs = network_usage['bytes_recv']
            elif self.graph_type == 'output':
                kbs = network_usage['bytes_sent']
            else:
                raise Exception("graph_type must be either 'input' or 'output'!")

            format_values['network_graph'] = self.get_network_graph(kbs)
            format_values['kbs'] = "{0:.1f}".format(round(kbs, 2)).rjust(6)
            color = self.get_gradient(kbs, self.colors, self.upper_limit)

        network_info = self.network_info.get_info(self.interface)
        format_values.update(network_info)
        format_values['interface'] = self.interface

        if sysfs_interface_up(self.interface, self.unknown_up):
            # Without traffic data there is no gradient color, so fall back to
            # color_up rather than emitting None as the color.
            if not self.dynamic_color or color is None:
                color = self.color_up
            self.output = {
                "full_text": self.format_up.format(**format_values),
                'color': color,
            }
        else:
            color = self.color_down
            self.output = {
                "full_text": self.format_down.format(**format_values),
                'color': color,
            }