Files
monitoring/intelPower.py
2026-04-27 11:35:05 +02:00

116 lines
4.1 KiB
Python

import logging
import os.path
import argparse
from itertools import count
from struct import unpack
from time import sleep
from warnings import warn
from datetime import datetime
logger = logging.getLogger("intel-master")


class IntelPower:
    """Derive CPU power figures from Intel RAPL energy counters.

    The counters are read as model-specific registers through the Linux
    ``/dev/cpu/<n>/msr`` device files, which requires the ``msr`` kernel
    module and root privileges.  Power is obtained by differencing two
    successive counter samples, see :meth:`measure_nonblocking`.

    Public attributes (all updated by :meth:`measure_nonblocking`):
        timestamp: ``datetime`` of the latest sample, ``0`` before the first.
        package_energy: ``{cpu_id: raw counter}`` of the latest sample,
            ``-1`` before the first.
        core_energy: same as ``package_energy`` for the PP0 counter.
    """

    # MSR addresses; see the RAPL chapter of the Intel SDM.
    RAPL_MSR_POWER_UNIT = 0x606
    RAPL_MSR_ENERGY = 0x611
    RAPL_MSR_PP0_ENERGY = 0x639

    # The energy status counters are 32 bits wide (Intel SDM); the delta of
    # two samples is taken modulo 2**32 so a wrap-around stays positive.
    _ENERGY_COUNTER_MASK = 0xFFFFFFFF

    def __init__(self):
        """Read the energy unit and enumerate the CPUs to sample.

        Raises:
            PermissionError: the MSR device cannot be opened (not root).
            FileNotFoundError: the MSR device file does not exist.
        """
        self._energy_unit = self._get_power_units()
        self._package_topology = self._detect_physical_package_topology()
        self._cores = sorted(self._package_topology)
        self.timestamp = 0
        self.package_energy = -1
        self.core_energy = -1

    def _read_msr(self, cpu_id, offset):
        """Return the 64-bit register at address *offset* of CPU *cpu_id*.

        Raises:
            PermissionError: the device file is not readable by this user.
            FileNotFoundError: the device file for *cpu_id* does not exist.
        """
        msr_file = "/dev/cpu/{}/msr".format(cpu_id)
        try:
            # Unbuffered on purpose: a buffered reader would request a whole
            # buffer from the device instead of the single 8-byte register.
            with open(msr_file, "rb", buffering=0) as f:
                f.seek(offset)
                raw = f.read(8)
        except PermissionError as err:
            raise PermissionError("root privilege is required to read model-specific registers") from err
        except FileNotFoundError as err:
            raise FileNotFoundError("msr driver is not loaded, try \"sudo modprobe msr\" to load msr module") from err
        return self._decode_int64(raw)

    @staticmethod
    def _decode_int64(buffer):
        """Decode 8 little-endian bytes as an unsigned 64-bit integer."""
        # Registers are bit fields, not signed numbers, hence "Q" not "q".
        return unpack("<Q", buffer)[0]

    @staticmethod
    def _detect_physical_package_topology():
        """Return ``{cpu_id: physical_package_id}`` as reported by sysfs.

        CPU ids are probed upwards from 0 and enumeration stops at the first
        id without a ``physical_package_id`` file, so any CPU after such a
        gap is not reported.
        """
        cpu_package_mapping = {}
        for cpu_id in count():
            filename = "/sys/devices/system/cpu/cpu{}/topology/physical_package_id".format(cpu_id)
            if not os.path.isfile(filename):
                break
            with open(filename, "r") as f:
                package_id = int(f.read())
            logger.debug("detected cpu %s in socket %s", cpu_id, package_id)
            cpu_package_mapping[cpu_id] = package_id
        return cpu_package_mapping

    def _get_power_units(self):
        """Return the energy represented by one counter increment.

        The exponent is taken from bits 12:8 of the power-unit register of
        CPU 0; the unit is ``1 / 2**exponent``.
        """
        register = self._read_msr(0, self.RAPL_MSR_POWER_UNIT)
        raw_unit = (register >> 8) & 0x1F
        logger.debug("CPU energy unit is 1/2^%s", raw_unit)
        return 0.5 ** raw_unit

    def _read_package_energy(self, cpu_id):
        """Return the raw (unscaled) package energy counter seen by *cpu_id*."""
        energy = self._read_msr(cpu_id, self.RAPL_MSR_ENERGY)
        # Only the log line is scaled to joules; callers get the raw counter.
        logger.debug("CPU %s current package energy %s J", cpu_id, energy * self._energy_unit)
        return energy

    def _read_core_energy(self, cpu_id):
        """Return the raw (unscaled) PP0 energy counter seen by *cpu_id*."""
        energy = self._read_msr(cpu_id, self.RAPL_MSR_PP0_ENERGY)
        logger.debug("CPU %s current core energy %s J", cpu_id, energy * self._energy_unit)
        return energy

    def _calc_power_wtime(self, before, after, duration):
        """Return the mean power between two raw counter samples.

        Args:
            before: raw counter value of the earlier sample.
            after: raw counter value of the later sample.
            duration: seconds elapsed between the two samples.

        Returns:
            Counter delta scaled by the energy unit and divided by
            *duration*; ``0.0`` when *duration* is not positive.
        """
        if duration <= 0:
            # No time elapsed (or the clock stepped back): no meaningful rate.
            return 0.0
        delta = (after - before) & self._ENERGY_COUNTER_MASK
        return delta * self._energy_unit / duration

    def measure_nonblocking(self):
        """Sample all counters and return power since the previous call.

        The first call only records a baseline and reports zero power.

        Returns:
            ``(package_power, core_power)``, each averaged over all sampled
            CPUs; ``(0.0, 0.0)`` when no CPU was detected.
        """
        now = datetime.now()
        package_energy = {c: self._read_package_energy(c) for c in self._cores}
        core_energy = {c: self._read_core_energy(c) for c in self._cores}

        if self.package_energy != -1:
            elapsed = (now - self.timestamp).total_seconds()
            package_power = {
                c: self._calc_power_wtime(self.package_energy[c], package_energy[c], elapsed)
                for c in self._cores
            }
            core_power = {
                c: self._calc_power_wtime(self.core_energy[c], core_energy[c], elapsed)
                for c in self._cores
            }
        else:
            # First sample: nothing to difference against yet.
            package_power = dict.fromkeys(self._cores, 0)
            core_power = dict.fromkeys(self._cores, 0)

        self.timestamp = now
        self.package_energy = package_energy
        self.core_energy = core_energy

        n_cores = len(self._cores)
        if n_cores == 0:
            return 0.0, 0.0
        avg_pp = sum(package_power.values()) / n_cores
        avg_cp = sum(core_power.values()) / n_cores
        return avg_pp, avg_cp

    def read(self):
        """Return the raw power-unit register.

        Reads CPU 0, the same CPU ``_get_power_units`` uses, so the call
        also works on machines that expose a single CPU.
        """
        return self._read_msr(0, self.RAPL_MSR_POWER_UNIT)
if __name__ == "__main__":
    # Manual smoke test: dump the raw power-unit register and the detected
    # CPU-to-package mapping, then take two samples one second apart.
    # A single instance is reused so the registers and sysfs are not
    # re-read once per printed line.
    ip = IntelPower()
    print(ip.read())
    print(IntelPower._detect_physical_package_topology())
    # The first sample only records a baseline and reports zero power.
    pp, cp = ip.measure_nonblocking()
    print(pp, cp)
    sleep(1)
    pp, cp = ip.measure_nonblocking()
    print(pp, cp)