"""Collect host metrics: system identity, CPU, memory, disks, ZFS, network, logins.

Every public function returns plain ``dict``/``str``/``float`` values built
from ``psutil`` readings or from the output of an external command
(``dmidecode``, ``zpool``, ``w``).
"""

import json
import platform
import socket
import subprocess
from datetime import datetime

import psutil

# Numeric codes reported for a ZFS pool state; any state not listed maps to 0.
_ZFS_STATE_CODES = {"ONLINE": 2, "DEGRADED": 1}

# An interface is skipped when its name CONTAINS any of these fragments.
# NOTE(review): this is substring matching, so a name such as "wlo1" is
# excluded as well because it contains "lo" - confirm that this is intended.
_IGNORED_NIC_FRAGMENTS = ("fw", "lo", "br")


def getBoard() -> str:
    """Return the product name reported by ``dmidecode -t 1``.

    Returns:
        The text following the first ``:`` on the first output line that
        contains ``Product Name``, with surrounding whitespace removed, or
        an empty string when no such line exists.

    Raises:
        OSError: propagated from ``subprocess.run`` when ``dmidecode``
            cannot be executed.
    """
    completed = subprocess.run(["dmidecode", "-t", "1"], stdout=subprocess.PIPE)
    output = completed.stdout.decode("utf-8", errors="replace")
    for line in output.splitlines():
        if "Product Name" in line:
            # partition() splits on the first colon only, so a product name
            # that itself contains ":" is returned whole.
            return line.partition(":")[2].strip()
    return ""


def getSysInfo() -> dict[str, str]:
    """Return hostname, kernel release, kernel version and board name."""
    uname = platform.uname()
    return {
        "hostname": uname.node,
        "kernel": uname.release,
        "version": uname.version,
        "board": getBoard(),
    }


def _is_whole_disk(name: str) -> bool:
    """Return True when *name* passes the whole-device filter of getDisk()."""
    # NVMe names always contain digits; the ones without a "p" are reported.
    if "nvme" in name and "p" not in name:
        return True
    # Any other device is reported only when its name has no digit at all.
    return not any(char.isdigit() for char in name)


def getDisk() -> dict[str, dict]:
    """Return cumulative I/O counters for each whole disk.

    Devices are taken from ``psutil.disk_io_counters(perdisk=True)`` and
    filtered by name: an ``nvme`` device is kept when its name has no ``p``,
    any other device is kept when its name contains no digit.

    Returns:
        Mapping of device name to a dict with the keys ``read``, ``write``,
        ``io_read``, ``io_write``, ``io_read_time``, ``io_write_time``,
        ``io_read_merged``, ``io_write_merged`` and ``busy``.
    """
    disk_io_dict = {}
    for disk, counters in psutil.disk_io_counters(perdisk=True).items():
        if not _is_whole_disk(disk):
            continue
        disk_io_dict[disk] = {
            "read": counters.read_bytes,
            "write": counters.write_bytes,
            "io_read": counters.read_count,
            "io_write": counters.write_count,
            "io_read_time": counters.read_time,
            "io_write_time": counters.write_time,
            "io_read_merged": counters.read_merged_count,
            "io_write_merged": counters.write_merged_count,
            "busy": counters.busy_time,
        }
    return disk_io_dict


def getCPU() -> dict:
    """Return CPU time percentages, per-core frequency, usage and core count.

    Returns:
        Dict with the keys ``time_percent`` (dict of psutil CPU time
        categories), ``frequency`` (core index -> current frequency),
        ``usage`` and ``cpu_count``.
    """
    # Both percentage calls use psutil's default (non-blocking) interval.
    # NOTE(review): psutil documents that such calls compare against the
    # previous call, so the very first reading is not meaningful - confirm
    # the caller polls repeatedly.
    cpu_time = psutil.cpu_times_percent()
    frequencies = psutil.cpu_freq(percpu=True)

    cpu_dict = {
        "time_percent": {},
        "frequency": {
            core_index: core.current for core_index, core in enumerate(frequencies)
        },
    }
    cpu_dict["usage"] = psutil.cpu_percent()
    cpu_dict["cpu_count"] = psutil.cpu_count()
    cpu_dict["time_percent"] = {
        "user": cpu_time.user,
        "nice": cpu_time.nice,
        "system": cpu_time.system,
        "idle": cpu_time.idle,
        "iowait": cpu_time.iowait,
        "irq": cpu_time.irq,
        "softirq": cpu_time.softirq,
        "steal": cpu_time.steal,
        "guest": cpu_time.guest,
        "guest_nice": cpu_time.guest_nice,
    }
    return cpu_dict


def getMemory() -> dict:
    """Return virtual-memory and swap statistics as a flat dict.

    Swap values are stored under keys prefixed with ``swap_``.
    """
    mem = psutil.virtual_memory()
    swap = psutil.swap_memory()
    return {
        "total": mem.total,
        "available": mem.available,
        "percent": mem.percent,
        "used": mem.used,
        "free": mem.free,
        "active": mem.active,
        "inactive": mem.inactive,
        "buffers": mem.buffers,
        "cached": mem.cached,
        "shared": mem.shared,
        "slab": mem.slab,
        "swap_total": swap.total,
        "swap_used": swap.used,
        "swap_free": swap.free,
        "swap_percent": swap.percent,
        "swap_in": swap.sin,
        "swap_out": swap.sout,
    }


def getPartitions() -> dict[str, dict[str, int]]:
    """Return size/used/free for every mounted partition except loop devices.

    Returns:
        Mapping of device base name (last path component of the device) to
        a dict with the keys ``size``, ``used`` and ``free``. When the same
        device is mounted more than once, the last mountpoint wins.
    """
    part_dict = {}
    for part in psutil.disk_partitions():
        name = part.device.split("/")[-1]
        if "loop" in name:
            continue
        # Query the mountpoint once so all three values come from the same
        # snapshot.
        usage = psutil.disk_usage(part.mountpoint)
        part_dict[name] = {
            "size": usage.total,
            "used": usage.used,
            "free": usage.free,
        }
    return part_dict


def getZFS() -> dict[str, dict]:
    """Return state and capacity figures for every pool from ``zpool list -jHp``.

    Returns:
        Mapping of pool name to a dict with the keys ``state`` (2 for
        ``ONLINE``, 1 for ``DEGRADED``, 0 for anything else), ``size``,
        ``used``, ``free``, ``fragmentation`` (ints) and ``dedup`` (float).

    Raises:
        OSError: propagated from ``subprocess.run`` when ``zpool`` cannot
            be executed.
        json.JSONDecodeError: when the command output is not valid JSON.
        KeyError: when an expected key is missing from the JSON document.
    """
    completed = subprocess.run(["zpool", "list", "-jHp"], stdout=subprocess.PIPE)
    report = json.loads(completed.stdout.decode("utf-8"))

    zfs_dict = {}
    for pool_name, pool in report["pools"].items():
        properties = pool["properties"]
        zfs_dict[pool_name] = {
            "state": _ZFS_STATE_CODES.get(pool["state"], 0),
            "size": int(properties["size"]["value"]),
            "used": int(properties["allocated"]["value"]),
            "free": int(properties["free"]["value"]),
            "fragmentation": int(properties["fragmentation"]["value"]),
            "dedup": float(properties["dedupratio"]["value"]),
        }
    return zfs_dict


def getUptime() -> float:
    """Return the time elapsed since ``psutil.boot_time()``, in seconds."""
    return datetime.now().timestamp() - psutil.boot_time()


def getNetwork() -> dict[str, dict[str, int]]:
    """Return cumulative traffic counters for each non-filtered interface.

    Interfaces whose name contains ``fw``, ``lo`` or ``br`` are skipped.

    Returns:
        Mapping of interface name to a dict with the keys ``rx``, ``tx``,
        ``err_rx``, ``err_tx``, ``drop_rx``, ``drop_tx``, ``packet_tx`` and
        ``packet_rx``.
    """
    net_dict = {}
    for nic, counters in psutil.net_io_counters(pernic=True).items():
        if any(fragment in nic for fragment in _IGNORED_NIC_FRAGMENTS):
            continue
        net_dict[nic] = {
            "rx": counters.bytes_recv,
            "tx": counters.bytes_sent,
            "err_rx": counters.errin,
            "err_tx": counters.errout,
            "drop_rx": counters.dropin,
            "drop_tx": counters.dropout,
            "packet_tx": counters.packets_sent,
            "packet_rx": counters.packets_recv,
        }
    return net_dict


def getIP() -> dict[str, str]:
    """Return the IPv4 address of each interface that has one.

    When an interface carries several ``AF_INET`` addresses, the last one
    listed by ``psutil.net_if_addrs()`` is kept.
    """
    addr = {}
    for interface, entries in psutil.net_if_addrs().items():
        for entry in entries:
            if entry.family == socket.AF_INET:
                addr[interface] = entry.address
    return addr


def _w_output_lines(command: list[str]) -> list[str]:
    """Run *command* and return its standard output split into lines."""
    completed = subprocess.run(command, stdout=subprocess.PIPE)
    # splitlines() leaves no trailing empty element, so nothing has to be
    # popped off afterwards.
    return completed.stdout.decode("utf-8", errors="replace").splitlines()


def _parse_w_output(lines: list[str]) -> dict[str, dict]:
    """Build the per-user session mapping from the lines printed by ``w``.

    ``lines[0]`` is the summary line, ``lines[1]`` the column header and
    every following line describes one login session.
    """
    logins = {}
    if len(lines) < 3:
        # No session rows present.
        return logins

    header = lines[1].split()
    for row in lines[2:]:
        fields = row.split()
        active_user = ""
        for col, title in enumerate(header):
            if "USER" in title and col < len(fields):
                name = fields[col]
                entry = logins.setdefault(name, {"sessions": 0, "session": {}})
                entry["sessions"] += 1
                active_user = name
            if "FROM" in title and active_user:
                entry = logins[active_user]
                session = {}
                # Sessions are keyed by their 1-based ordinal for that user.
                entry["session"][entry["sessions"]] = session
                if col >= len(fields):
                    # Row too short to hold this column: keep the session
                    # entry but leave "from" unset.
                    continue
                token = fields[col]
                if token.count(".") == 3 or token == "-":
                    session["from"] = token
                else:
                    # Any other token is treated as not being an origin and
                    # the preceding column is reported instead.
                    # NOTE(review): this also applies to hostnames and IPv6
                    # addresses - confirm whether that is desired.
                    session["from"] = fields[col - 1]
    return logins


def users() -> dict[str, dict]:
    """Return the logged-in users and where each session comes from.

    Runs ``w``; when its header line has no ``FROM`` column, ``w -f`` is
    used instead.

    Returns:
        Mapping of user name to ``{"sessions": <count>, "session": {n:
        {"from": <origin>}}}`` where ``n`` runs from 1 to ``sessions``.
        ``from`` is the FROM field when it is ``-`` or contains exactly
        three dots; otherwise it is the field of the preceding column.

    Raises:
        OSError: propagated from ``subprocess.run`` when ``w`` cannot be
            executed.
    """
    lines = _w_output_lines(["w"])
    if len(lines) < 2 or "FROM" not in lines[1]:
        lines = _w_output_lines(["w", "-f"])
    return _parse_w_output(lines)


if __name__ == "__main__":
    # print(getSysInfo())
    print(getDisk())
    # print(users())
    # print(getCPU())
    # print(getMemory())
    # print(getZFS())
    print(getPartitions())
    # print(getUptime())
    # print(getNetwork())