Files
ems/scripts/test_modbus_deye.py
Dusan Vojacek 920d9ff40c
All checks were successful
deploy / deploy (push) Successful in 35s
test / smoke-test (push) Successful in 7s
fix pv vyroby (unsinged)
2026-04-10 20:30:03 +02:00

70 lines
2.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Rychlý test Modbus TCP na Deye přes Waveshare (registry dle aktuální mapy)."""
from __future__ import annotations

import asyncio
import sys

from pymodbus.client import AsyncModbusTcpClient
from pymodbus.exceptions import ModbusException
# Modbus TCP endpoint that test_deye() connects to.
HOST = "172.16.1.10"
PORT = 502
UNIT_ID = 1  # per the DIP switch setting; passed as device_id on every read
# Registers polled by test_deye(), keyed by signal name (printed in the first
# output column). Each value is a tuple of:
#   (address, type, register count, unit, description)
# - type: "sint" makes read_reg() reinterpret the 16-bit word as signed;
#   any other value (here "uint") is returned as read.
# - register count: unpacked but not used by test_deye(); read_reg() always
#   requests a single register.
# - unit / description: printed verbatim next to the value.
REGISTERS: dict[str, tuple[int, str, int, str, str]] = {
    "run_state": (500, "uint", 1, "", "enum provozní stav střídače (raw)"),
    "battery_soc_%": (588, "uint", 1, "%", "SoC baterie"),
    "battery_power_w": (590, "sint", 1, "W", "+ vybíjení / nabíjení"),
    "batt_charge_today_wh": (514, "uint", 1, "Wh", "dnešní nabití baterie"),
    "batt_discharge_today_wh": (515, "uint", 1, "Wh", "dnešní vybití baterie"),
    "gen_port_power_w": (667, "sint", 1, "W", "GEN port FVE pole B (signed)"),
    "grid_total_power_w": (625, "sint", 1, "W", "+ import ze sítě / export"),
    "load_total_power_w": (653, "uint", 1, "W", "celková spotřeba"),
    "pv1_power_w": (672, "sint", 1, "W", "výkon PV1 (signed)"),
    "pv2_power_w": (673, "sint", 1, "W", "výkon PV2 (signed)"),
}
async def read_reg(client: AsyncModbusTcpClient, address: int, reg_type: str) -> int | None:
    """Read one holding register and return its value, or ``None`` on failure.

    Args:
        client: Connected Modbus TCP client used to issue the request.
        address: Holding-register address to read.
        reg_type: ``"sint"`` to decode the 16-bit word as two's-complement
            signed; any other value returns the raw unsigned word.

    Returns:
        The register value, or ``None`` when the device answers with an
        error response, the request raises a ``ModbusException`` (for
        example a timeout or a dropped connection), or the response carries
        no registers.
    """
    try:
        result = await client.read_holding_registers(
            address, count=1, device_id=UNIT_ID
        )
    except ModbusException:
        # A failed request must not abort the whole scan: the caller reports
        # None as a per-register error row and moves on to the next one.
        return None
    if result.isError() or not result.registers:
        return None
    raw = int(result.registers[0])
    # Values above 0x7FFF are negative numbers in 16-bit two's complement.
    if reg_type == "sint" and raw > 32767:
        raw -= 65536
    return raw
async def test_deye() -> None:
    """Connect to HOST:PORT, read every entry in REGISTERS and print a table.

    Prints an error line and returns early when the connection cannot be
    established. Registers for which read_reg() yields ``None`` are shown
    with ``CHYBA`` in the value column. The client is closed on every exit
    path.
    """
    print(f"Připojuji se na {HOST}:{PORT} device_id={UNIT_ID}...")
    client = AsyncModbusTcpClient(HOST, port=PORT)
    try:
        connected = await client.connect()
        # Require both a truthy connect() result and the client's own flag.
        if not (connected and client.connected):
            print("CHYBA: Nelze se připojit na Waveshare")
            return

        print("Připojeno OK\n")
        print(f"{'Signál':<28} {'Hodnota':>10} {'Jedn.':<6} Reg(dec) Popis")
        print("-" * 85)

        for signal, spec in REGISTERS.items():
            address, reg_type, _count, unit, description = spec
            value = await read_reg(client, address, reg_type)
            reg_label = f"{address} (0x{address:04X})"
            # A failed read shows the error marker and an empty unit column.
            if value is None:
                shown, unit_text = "CHYBA", ""
            else:
                shown, unit_text = value, unit or ""
            print(f" {signal:<26} {shown:>10} {unit_text:<6} {reg_label:<12} {description}")
    finally:
        client.close()
if __name__ == "__main__":
    # Script entry point: run the probe once; Ctrl-C ends the process with
    # exit status 130 instead of a KeyboardInterrupt traceback.
    interrupted = False
    try:
        asyncio.run(test_deye())
    except KeyboardInterrupt:
        interrupted = True
    if interrupted:
        sys.exit(130)