Files
monitoring/sysinfo.py
2026-04-27 11:35:05 +02:00

245 lines
7.9 KiB
Python

import psutil
import platform
from datetime import datetime
import subprocess
import json
import socket
def getBoard():
    """Return the system product name reported by ``dmidecode -t 1``.

    Returns:
        The text after the colon on the first output line containing
        ``Product Name``, with surrounding whitespace removed.  An empty
        string is returned when no such line exists or when ``dmidecode``
        cannot be executed (not installed, not executable).
    """
    try:
        proc = subprocess.run(['dmidecode', '-t', '1'], stdout=subprocess.PIPE)
    except OSError:
        # A missing binary is treated like "no product name found" so the
        # remaining collectors keep working on hosts without dmidecode.
        return ""

    for line in proc.stdout.decode('utf-8', errors='replace').splitlines():
        if "Product Name" in line:
            # partition() keeps everything after the FIRST colon, so product
            # names that themselves contain ':' are not truncated, and a
            # line without a colon yields "" instead of raising.
            _, _, value = line.partition(":")
            return value.strip()
    return ""
def getSysInfo():
    """Collect basic identification data about the host.

    Returns:
        dict with ``hostname``, ``kernel`` and ``version`` taken from
        ``platform.uname()`` (node, release, version) and ``board`` as
        returned by ``getBoard()``.
    """
    host = platform.uname()
    return {
        "hostname": host.node,
        "kernel": host.release,
        "version": host.version,
        "board": getBoard(),
    }
def getDisk():
    """Return per-disk I/O counters from psutil, keyed by device name.

    A device is reported when its name either contains ``nvme`` but no
    ``p`` (partition names such as ``nvme0n1p1`` are skipped), or contains
    no digit at all (e.g. ``sda`` is kept, ``sda1`` is not).

    Returns:
        dict mapping device name to a dict with the keys ``read``,
        ``write``, ``io_read``, ``io_write``, ``io_read_time``,
        ``io_write_time``, ``io_read_merged``, ``io_write_merged`` and
        ``busy``.
    """
    io_stats = psutil.disk_io_counters(perdisk=True)
    selected = {}
    for name in io_stats:
        whole_nvme = "nvme" in name and "p" not in name
        digit_free = not any(ch.isdigit() for ch in name)
        if not (whole_nvme or digit_free):
            continue
        stats = io_stats[name]
        selected[name] = {
            "read": stats.read_bytes,
            "write": stats.write_bytes,
            "io_read": stats.read_count,
            "io_write": stats.write_count,
            "io_read_time": stats.read_time,
            "io_write_time": stats.write_time,
            "io_read_merged": stats.read_merged_count,
            "io_write_merged": stats.write_merged_count,
            "busy": stats.busy_time,
        }
    return selected
def getCPU():
    """Return CPU utilisation, per-core frequency and core count.

    Returns:
        dict with:
          * ``time_percent`` - the ``psutil.cpu_times_percent()`` fields
            user, nice, system, idle, iowait, irq, softirq, steal, guest
            and guest_nice;
          * ``frequency`` - current frequency of each entry returned by
            ``psutil.cpu_freq(percpu=True)``, keyed by its 0-based position;
          * ``usage`` - ``psutil.cpu_percent()``;
          * ``cpu_count`` - ``psutil.cpu_count()``.
    """
    time_fields = (
        "user", "nice", "system", "idle", "iowait",
        "irq", "softirq", "steal", "guest", "guest_nice",
    )

    # The psutil calls are issued in this exact order on purpose; the
    # percentage calls are stateful between invocations.
    times = psutil.cpu_times_percent()
    frequency = {
        position: core.current
        for position, core in enumerate(psutil.cpu_freq(percpu=True))
    }
    usage = psutil.cpu_percent()
    core_total = psutil.cpu_count()

    return {
        "time_percent": {field: getattr(times, field) for field in time_fields},
        "frequency": frequency,
        "usage": usage,
        "cpu_count": core_total,
    }
def getMemory():
    """Return RAM and swap statistics as one flat dict.

    Returns:
        dict holding the ``psutil.virtual_memory()`` fields total,
        available, percent, used, free, active, inactive, buffers, cached,
        shared and slab under their own names, followed by the
        ``psutil.swap_memory()`` fields under ``swap_total``, ``swap_used``,
        ``swap_free``, ``swap_percent``, ``swap_in`` (sin) and ``swap_out``
        (sout).
    """
    ram_fields = (
        "total", "available", "percent", "used", "free", "active",
        "inactive", "buffers", "cached", "shared", "slab",
    )
    # (output key, psutil attribute) pairs for the swap counters.
    swap_fields = (
        ("swap_total", "total"),
        ("swap_used", "used"),
        ("swap_free", "free"),
        ("swap_percent", "percent"),
        ("swap_in", "sin"),
        ("swap_out", "sout"),
    )

    ram = psutil.virtual_memory()
    swap = psutil.swap_memory()

    report = {field: getattr(ram, field) for field in ram_fields}
    for key, attribute in swap_fields:
        report[key] = getattr(swap, attribute)
    return report
def getPartitions():
    """Return size, used and free bytes for every mounted partition.

    Partitions are keyed by the last path component of their device
    (``/dev/sda1`` -> ``sda1``); devices whose name contains ``loop`` are
    skipped.  If one device is mounted more than once, the last mountpoint
    listed by psutil wins.

    Returns:
        dict mapping device name to ``{"size": ..., "used": ..., "free": ...}``
        as reported by ``psutil.disk_usage()`` for the mountpoint.
    """
    part_dict = {}
    for part in psutil.disk_partitions():
        name = part.device.split('/')[-1]
        if "loop" in name:
            continue
        # Query the filesystem once so that all three numbers come from the
        # same snapshot (and to avoid two redundant statvfs round trips).
        usage = psutil.disk_usage(part.mountpoint)
        part_dict[name] = {
            "size": usage.total,
            "used": usage.used,
            "free": usage.free,
        }
    return part_dict
def getZFS():
zfs_dict = {}
result = subprocess.run(['zpool', 'list', '-jHp'], stdout=subprocess.PIPE)
result = result.stdout.decode('utf-8')
zfs = json.loads(result)
for pool in zfs["pools"]:
zfs_dict[pool] = {}
match zfs["pools"][pool]["state"]:
case "ONLINE":
zfs_dict[pool]["state"] = 2
case "DEGRADED":
zfs_dict[pool]["state"] = 1
case _:
zfs_dict[pool]["state"] = 0
zfs_dict[pool]["size"] = int(zfs["pools"][pool]["properties"]["size"]["value"])
zfs_dict[pool]["used"] = int(zfs["pools"][pool]["properties"]["allocated"]["value"])
zfs_dict[pool]["free"] = int(zfs["pools"][pool]["properties"]["free"]["value"])
zfs_dict[pool]["fragmentation"] = int(zfs["pools"][pool]["properties"]["fragmentation"]["value"])
zfs_dict[pool]["dedup"] = float(zfs["pools"][pool]["properties"]["dedupratio"]["value"])
return zfs_dict
def getUptime():
    """Return the number of seconds elapsed since the system booted.

    Computed as the current wall-clock timestamp minus
    ``psutil.boot_time()``.
    """
    booted_at = psutil.boot_time()
    now = datetime.now().timestamp()
    return now - booted_at
def getNetwork():
    """Return traffic counters for each reported network interface.

    Interfaces whose name contains ``fw``, ``lo`` or ``br`` anywhere are
    left out.
    NOTE(review): this is a substring match, so a name like ``wlo1`` is
    skipped as well - confirm that is intended.

    Returns:
        dict mapping interface name to a dict with the keys ``rx``, ``tx``
        (bytes), ``err_rx``, ``err_tx``, ``drop_rx``, ``drop_tx``,
        ``packet_tx`` and ``packet_rx``.
    """
    skipped_markers = ("fw", "lo", "br")
    nic_stats = psutil.net_io_counters(pernic=True)

    report = {}
    for nic in nic_stats:
        if any(marker in nic for marker in skipped_markers):
            continue
        counters = nic_stats[nic]
        report[nic] = {
            "rx": counters.bytes_recv,
            "tx": counters.bytes_sent,
            "err_rx": counters.errin,
            "err_tx": counters.errout,
            "drop_rx": counters.dropin,
            "drop_tx": counters.dropout,
            "packet_tx": counters.packets_sent,
            "packet_rx": counters.packets_recv,
        }
    return report
def getIP():
    """Return the IPv4 address of each network interface.

    Returns:
        dict mapping interface name to its ``AF_INET`` address.  When an
        interface carries several IPv4 addresses, the last one listed by
        ``psutil.net_if_addrs()`` is kept; interfaces without an IPv4
        address are absent.
    """
    nic_addresses = psutil.net_if_addrs()
    ipv4_by_nic = {}
    for nic in nic_addresses:
        for entry in nic_addresses[nic]:
            if entry.family != socket.AF_INET:
                continue
            ipv4_by_nic[nic] = entry.address
    return ipv4_by_nic
def users():
result = subprocess.run(['w'], stdout=subprocess.PIPE)
result = result.stdout.decode('utf-8').split('\n')
if "FROM" not in result[1]:
result = subprocess.run(['w', '-f'], stdout=subprocess.PIPE)
result = result.stdout.decode('utf-8').split('\n')
result.pop(-1)
header = []
users = {}
if len(result) > 2:
for i in range(1,len(result)):
a = result[i].split(" ")
user = []
for item in a:
if item != "":
if i == 1:
header.append(item)
else:
user.append(item)
active_user = ""
for item in range(0,len(header)):
try:
if "USER" in header[item]:
if user[item] in users:
users[user[item]]["sessions"] = users[user[item]]["sessions"] + 1
else:
users[user[item]] = {}
users[user[item]]["sessions"] = 1
users[user[item]]["session"] = {}
active_user = user[item]
if "FROM" in header[item]:
users[active_user]["session"][users[active_user]["sessions"]] = {}
if user[item].count(".") == 3:
users[active_user]["session"][users[active_user]["sessions"]]["from"] = user[item]
else:
if user[item] == "-":
users[active_user]["session"][users[active_user]["sessions"]]["from"] = user[item]
else:
users[active_user]["session"][users[active_user]["sessions"]]["from"] = user[item-1]
except:
pass
return users
if __name__ == "__main__":
    # Manual smoke test: print the disk and partition collectors.  The other
    # collectors (getSysInfo, users, getCPU, getMemory, getZFS, getUptime,
    # getNetwork) are not run here; add them to the tuple to inspect them.
    for collector in (getDisk, getPartitions):
        print(collector())