import argparse
import logging
import os.path
from datetime import datetime
from itertools import count
from struct import unpack
from time import sleep
from warnings import warn

logger = logging.getLogger("intel-master")


class IntelPower:
    """Estimate CPU power draw from Intel RAPL energy counters.

    Energy counters are read through the Linux ``msr`` driver
    (``/dev/cpu/<n>/msr``), which normally requires root privileges and the
    ``msr`` kernel module.  Power is derived from the difference between two
    successive counter samples divided by the elapsed wall-clock time.
    """

    # Model-specific register addresses used by this class.
    RAPL_MSR_POWER_UNIT = 0x606
    RAPL_MSR_ENERGY = 0x611
    RAPL_MSR_PP0_ENERGY = 0x639

    # The RAPL energy status counters are 32 bits wide and wrap around; the
    # upper half of the 64-bit register is reserved.
    _ENERGY_COUNTER_MODULUS = 1 << 32

    def __init__(self):
        """Read the energy unit and CPU topology and reset the sample state."""
        self._energy_unit = self._get_power_units()
        self._package_topology = self._detect_physical_package_topology()
        self._cores = sorted(self._package_topology)
        # State of the previous sample; -1 means "no sample taken yet".
        self.timestamp = 0
        self.package_energy = -1
        self.core_energy = -1

    def _read_msr(self, cpu_id, offset):
        """Return the 64-bit value of MSR ``offset`` on logical CPU ``cpu_id``.

        Raises:
            PermissionError: the msr device cannot be opened by this user.
            FileNotFoundError: the msr device node does not exist.
        """
        msr_file = "/dev/cpu/{}/msr".format(cpu_id)
        try:
            # Unbuffered on purpose: a buffered reader would request a large
            # chunk from the device, and the msr driver serves any transfer
            # longer than 8 bytes by re-reading the same register repeatedly.
            with open(msr_file, "rb", buffering=0) as f:
                f.seek(offset)
                return self._decode_int64(f.read(8))
        except PermissionError as err:
            raise PermissionError(
                "root privilege is required to read model-specific registers"
            ) from err
        except FileNotFoundError as err:
            raise FileNotFoundError(
                'msr driver is not loaded, try "sudo modprobe msr" to load msr module'
            ) from err

    @staticmethod
    def _decode_int64(buffer):
        """Decode 8 raw bytes into an unsigned 64-bit integer.

        MSR contents are bit patterns rather than signed quantities, so the
        value is decoded as unsigned.  The byte order is little-endian, which
        is the native order of the x86 CPUs that expose these registers.
        """
        return unpack("<Q", buffer)[0]

    @staticmethod
    def _detect_physical_package_topology():
        """Return a ``{cpu_id: package_id}`` mapping read from sysfs.

        CPU ids are probed upwards from 0; the scan stops at the first id
        that has no ``physical_package_id`` file.
        """
        cpu_package_mapping = {}
        for cpu_id in count():
            filename = (
                "/sys/devices/system/cpu/cpu{}/topology/physical_package_id"
                .format(cpu_id)
            )
            if not os.path.isfile(filename):
                return cpu_package_mapping
            with open(filename, "r") as f:
                package_id = int(f.read())
            logger.debug("detected cpu %s in socket %s", cpu_id, package_id)
            cpu_package_mapping[cpu_id] = package_id

    def _get_power_units(self):
        """Return the energy represented by one counter increment.

        The exponent is taken from bits 12:8 of the power-unit MSR read on
        CPU 0; the unit is ``1 / 2**exponent``.
        """
        raw_register = self._read_msr(0, self.RAPL_MSR_POWER_UNIT)
        raw_unit = (raw_register >> 8) & 0x1F
        logger.debug("CPU energy unit is 1/2^%s", raw_unit)
        return 0.5 ** raw_unit

    def _read_package_energy(self, cpu_id):
        """Return the raw package energy counter as seen from ``cpu_id``."""
        energy = self._read_msr(cpu_id, self.RAPL_MSR_ENERGY)
        # Keep only the 32-bit counter field.
        energy &= self._ENERGY_COUNTER_MODULUS - 1
        logger.debug(
            "CPU %s current package energy %s J",
            cpu_id, energy * self._energy_unit,
        )
        return energy

    def _read_core_energy(self, cpu_id):
        """Return the raw PP0 (core) energy counter as seen from ``cpu_id``."""
        energy = self._read_msr(cpu_id, self.RAPL_MSR_PP0_ENERGY)
        # Keep only the 32-bit counter field.
        energy &= self._ENERGY_COUNTER_MODULUS - 1
        logger.debug(
            "CPU %s current core energy %s J",
            cpu_id, energy * self._energy_unit,
        )
        return energy

    def _calc_power_wtime(self, before, after, duration):
        """Convert two raw counter samples into average power.

        Args:
            before: raw counter value at the start of the interval.
            after: raw counter value at the end of the interval.
            duration: interval length in seconds.

        Returns:
            Energy difference scaled by the energy unit and divided by
            ``duration``; ``0.0`` when ``duration`` is not positive.
        """
        if duration <= 0:
            # Two samples in the same clock tick (or a clock step backwards)
            # carry no usable rate information.
            return 0.0
        # Modular difference so a single counter wrap-around between the
        # samples still yields a positive energy delta.
        delta = (after - before) % self._ENERGY_COUNTER_MODULUS
        return delta * self._energy_unit / duration

    def measure_nonblocking(self):
        """Sample the counters and return power since the previous call.

        Returns:
            ``(package_power, core_power)``, each averaged over every
            detected logical CPU.  The first call has no earlier sample to
            compare against and returns ``(0.0, 0.0)``.
        """
        timestamp = datetime.now()
        package_energy = {c: self._read_package_energy(c) for c in self._cores}
        core_energy = {c: self._read_core_energy(c) for c in self._cores}

        if self.package_energy != -1:
            time_delta = (timestamp - self.timestamp).total_seconds()
            package_power = {
                c: self._calc_power_wtime(
                    self.package_energy[c], package_energy[c], time_delta)
                for c in self._cores
            }
            core_power = {
                c: self._calc_power_wtime(
                    self.core_energy[c], core_energy[c], time_delta)
                for c in self._cores
            }
        else:
            package_power = dict.fromkeys(self._cores, 0)
            core_power = dict.fromkeys(self._cores, 0)

        # Remember this sample as the baseline for the next call.
        self.timestamp = timestamp
        self.package_energy = package_energy
        self.core_energy = core_energy

        if not self._cores:
            # Nothing was detected, so there is nothing to average.
            return 0.0, 0.0

        cpu_count = len(self._cores)
        avg_pp = sum(package_power[c] for c in self._cores) / cpu_count
        avg_cp = sum(core_power[c] for c in self._cores) / cpu_count
        return avg_pp, avg_cp

    def read(self, cpu_id=1):
        """Return the raw power-unit MSR of ``cpu_id`` (CPU 1 by default)."""
        return self._read_msr(cpu_id, self.RAPL_MSR_POWER_UNIT)


if __name__ == "__main__":
    ip = IntelPower()
    print(ip.read())
    print(ip._detect_physical_package_topology())
    pp, cp = ip.measure_nonblocking()
    print(pp, cp)
    sleep(1)
    pp, cp = ip.measure_nonblocking()
    print(pp, cp)