2024-07-07 10:08:19 +02:00

67 lines
2.4 KiB
Python

"""Simple test script to scan inverter present on local network"""
import asyncio
import binascii
import logging
import sys
import goodwe
from goodwe.exceptions import InverterError
from goodwe.protocol import ProtocolCommand
# Send log records to stderr at INFO level; each record is prefixed with the
# timestamp, the emitting function name and the line number.
logging.basicConfig(
    stream=sys.stderr,
    level=logging.INFO,
    format="%(asctime)-15s %(funcName)s(%(lineno)d) - %(levelname)s: %(message)s",
)
def try_command(command: str, ip: str) -> None:
    """Send one raw request to an inverter and print whether it answered.

    Args:
        command: Request payload encoded as a hexadecimal string.
        ip: Address of the inverter the request is sent to.

    An ``InverterError`` raised while executing the request is reported as
    "No response" instead of propagating, so that later probes still run.
    """
    print(f"Trying command: {command}")
    # The second argument is a callable that accepts every reply
    # (presumably the response validator - confirm against goodwe.protocol).
    request = ProtocolCommand(bytes.fromhex(command), lambda _reply: True)
    try:
        # Target the address passed by the caller, not the module-level
        # search result, so the function works for any host.
        response = asyncio.run(request.execute(ip, timeout=2, retries=0))
    except InverterError:
        print(f"No response to {command} command")
    else:
        print(f"Response to {command} command: {response.hex()}")
def omnik_command(logger_sn):
    """Build an Omnik-style request frame and return it as a hex string.

    Args:
        logger_sn: Logger serial number as a hexadecimal string.

    Returns:
        Lowercase hex string of the frame: header ``680241b1``, the serial
        bytes, command ``0100``, a one-byte checksum and end code ``16``.

    Raises:
        binascii.Error: If ``logger_sn`` has an odd number of digits or
            contains non-hexadecimal characters.
    """
    # Layout noted by the original author:
    # frame = (headCode) + (dataFieldLength) + (contrlCode) + (sn) + (sn) + (command) + (checksum) + (endCode)
    # NOTE(review): that layout lists the serial twice, but only one copy is
    # inserted below. An earlier variant (integer serial, byte-reversed,
    # inserted twice) had been left commented out - confirm which is right.
    header = binascii.unhexlify("680241b1")  # from SolarMan / new Omnik app
    command = binascii.unhexlify("0100")
    checksum_placeholder = binascii.unhexlify("87")  # overwritten below
    end_code = binascii.unhexlify("16")

    frame = bytearray(
        header
        + binascii.unhexlify(logger_sn)
        + command
        + checksum_placeholder
        + end_code
    )
    # Checksum is the low byte of the sum of every byte after the head code,
    # up to but excluding the checksum byte and the end code.
    frame[-2] = sum(frame[1:-2]) & 0xFF
    return frame.hex()
# Search for an inverter; the reply is decoded as UTF-8 and split on commas.
# The fields are used below as IP address, MAC address and name, in that order.
search_reply = asyncio.run(goodwe.search_inverters())
result = search_reply.decode("utf-8").split(",")
inverter_ip = result[0]
inverter_mac = result[1]
inverter_name = result[2]
print(f"Located inverter at IP: {inverter_ip}, mac: {inverter_mac}, name: {inverter_name}")

# Fixed probe requests, tried in order.
for probe in (
    "AA55C07F0102000241",  # EM/ES
    "7F03753100280409",  # DT (SolarGo)
    "197d0001000dff045e50303036564657f6e60d",  # Omnik v5 ?
):
    try_command(probe, inverter_ip)

# Omnik 4 ?
# The serial used for the frame is the name from index 10 onwards, UTF-8 encoded.
sn_bytes = inverter_name[10:].encode("utf-8")
sn = sn_bytes.hex()
try_command(omnik_command(sn), inverter_ip)

# Omnik 4 reversed ?
# Same serial with its byte order reversed.
sn = sn_bytes[::-1].hex()
try_command(omnik_command(sn), inverter_ip)

print(f"\n\nIdentifying inverter at IP: {inverter_ip}, name: {inverter_name} mac: {inverter_mac}")
inverter = asyncio.run(goodwe.discover(inverter_ip))
print(f"Identified inverter model: {inverter.model_name}, serialNr: {inverter.serial_number}")