70 lines
2.4 KiB
Python
70 lines
2.4 KiB
Python
"""Rychlý test Modbus TCP na Deye přes Waveshare (registry dle aktuální mapy)."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import sys
|
||
|
||
from pymodbus.client import AsyncModbusTcpClient
|
||
|
||
# Connection parameters used by the test below.
HOST: str = "172.16.1.10"
PORT: int = 502
# Modbus device/unit id; it is selected by the DIP switch.
UNIT_ID: int = 1
|
||
|
||
# Register map keyed by signal name.
# Each value is (address, type, register count, unit, description), where
# type "sint" marks a signed 16-bit value and "uint" an unsigned one.
# Insertion order is the order in which the rows are printed.
REGISTERS: dict[str, tuple[int, str, int, str, str]] = dict(
    [
        ("run_state", (500, "uint", 1, "", "enum provozní stav střídače (raw)")),
        ("battery_soc_%", (588, "uint", 1, "%", "SoC baterie")),
        ("battery_power_w", (590, "sint", 1, "W", "+ vybíjení / − nabíjení")),
        ("batt_charge_today_wh", (514, "uint", 1, "Wh", "dnešní nabití baterie")),
        ("batt_discharge_today_wh", (515, "uint", 1, "Wh", "dnešní vybití baterie")),
        ("gen_port_power_w", (667, "sint", 1, "W", "GEN port – FVE pole B (signed)")),
        ("grid_total_power_w", (625, "sint", 1, "W", "+ import ze sítě / − export")),
        ("load_total_power_w", (653, "uint", 1, "W", "celková spotřeba")),
        ("pv1_power_w", (672, "sint", 1, "W", "výkon PV1 (signed)")),
        ("pv2_power_w", (673, "sint", 1, "W", "výkon PV2 (signed)")),
    ]
)
|
||
|
||
|
||
async def read_reg(client: AsyncModbusTcpClient, address: int, reg_type: str) -> int | None:
    """Read a single holding register and return it as an integer.

    Args:
        client: pymodbus async TCP client used for the request.
        address: Register address passed to ``read_holding_registers``.
        reg_type: ``"sint"`` reinterprets the raw 16-bit word as a signed
            two's-complement value; any other value returns the word as read.

    Returns:
        The decoded value, or ``None`` when the read failed: the client raised
        a Modbus exception, the response is an error response, or the response
        carries no registers.
    """
    # Function-scope import keeps this block self-contained; pymodbus is
    # already a dependency of this file.
    from pymodbus.exceptions import ModbusException

    try:
        result = await client.read_holding_registers(
            address, count=1, device_id=UNIT_ID
        )
    except ModbusException:
        # The client may raise instead of returning an error response (for
        # example on a timeout or a dropped connection). Report that as a
        # failed read so the caller can continue with the next register.
        return None

    if result.isError() or not result.registers:
        return None

    raw = int(result.registers[0])
    # Values above 0x7FFF have the sign bit set in a 16-bit word.
    if reg_type == "sint" and raw > 0x7FFF:
        raw -= 0x10000
    return raw
|
||
|
||
|
||
async def test_deye() -> None:
    """Connect, read every entry of ``REGISTERS`` once and print a table.

    Prints a connection banner, then one row per register holding the signal
    name, the value (or ``CHYBA`` when ``read_reg`` returned ``None``), the
    unit, the address in decimal and hex, and the description. The client is
    closed on every exit path.
    """
    print(f"Připojuji se na {HOST}:{PORT} device_id={UNIT_ID}...")
    client = AsyncModbusTcpClient(HOST, port=PORT)
    try:
        connected = await client.connect()
        if not (connected and client.connected):
            print("CHYBA: Nelze se připojit na Waveshare")
            return

        print("Připojeno OK\n")
        header = f"{'Signál':<28} {'Hodnota':>10} {'Jedn.':<6} Reg(dec) Popis"
        print(header)
        print("-" * 85)

        for name, spec in REGISTERS.items():
            # The register count stored in the map is not used by the read.
            addr, rtype, _, unit, desc = spec
            val = await read_reg(client, addr, rtype)
            reg_h = f"{addr} (0x{addr:04X})"

            if val is not None:
                # An empty unit is shown as a dash placeholder.
                u = unit if unit else "—"
                print(f" {name:<26} {val:>10} {u:<6} {reg_h:<12} {desc}")
                continue

            print(f" {name:<26} {'CHYBA':>10} {'':6} {reg_h:<12} {desc}")
    finally:
        client.close()
|
||
|
||
|
||
if __name__ == "__main__":
    # Run the probe; Ctrl-C ends the process with exit status 130.
    try:
        asyncio.run(test_deye())
    except KeyboardInterrupt:
        raise SystemExit(130)
|