refactor modbus verify workflow

This commit is contained in:
Dusan Vojacek
2026-05-02 19:51:41 +02:00
parent 53288d130a
commit 44cd7f986a
2 changed files with 484 additions and 461 deletions

View File

@@ -3,9 +3,8 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
from collections import defaultdict
from typing import Any
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Any
import asyncpg import asyncpg
@@ -47,8 +46,6 @@ from services.control.models import ControlSetpoints, InverterConfig, OperatingM
from services.control.modbus_journal import ( from services.control.modbus_journal import (
_drop_registers_matching_last_verified, _drop_registers_matching_last_verified,
_fetch_last_verified_inverter_registers, _fetch_last_verified_inverter_registers,
_fetch_written_deye_clock_commands,
_modbus_command_contiguous_runs,
create_modbus_commands, create_modbus_commands,
execute_modbus_commands, execute_modbus_commands,
) )
@@ -80,469 +77,19 @@ from services.control.setpoints import (
_deye_zero_export_amps_for_passive, _deye_zero_export_amps_for_passive,
get_deye_mode, get_deye_mode,
) )
from services.control.verify import (
_deye_expected_clock_triplet_for_verify,
_modbus_cmd_register,
_switch_to_self_sustain,
_verify_deye_clock_written_bundle,
verify_modbus_commands,
)
from services.modbus_client import get_modbus_client from services.modbus_client import get_modbus_client
from services.signal_service import enqueue_site_signals from services.signal_service import enqueue_site_signals
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def _switch_to_self_sustain(site_id: int, db: asyncpg.Connection, reason: str) -> None:
"""Přepne lokalitu na SELF_SUSTAIN, zaloguje důvod a při změně pošle Discord."""
from services.notification_service import run_fn_set_mode_with_discord
await run_fn_set_mode_with_discord(
db,
site_id,
"SELF_SUSTAIN",
"system:mismatch",
None,
reason,
)
logger.critical("Site %s switched to SELF_SUSTAIN: %s", site_id, reason)
def _modbus_cmd_register(cmd: Any) -> int:
"""asyncpg.Record má __getitem__; objekty s atributem .register též (testy)."""
try:
return int(cmd["register"])
except (KeyError, TypeError):
return int(cmd.register)
def _deye_expected_clock_triplet_for_verify(
bundle: list[asyncpg.Record],
last_verified: dict[int, int],
a62: int,
a63: int,
a64: int,
) -> tuple[int, int, int]:
"""
Sestaví očekávané (w62,w63,w64) pro toleranční verify.
Chybějící registry doplní poslední verified nebo aktuálním přečtením ze zařízení
(aby osiřelý zápis např. jen 64 nešel do striktního porovnání reg64).
"""
by_reg = {_modbus_cmd_register(c): c for c in bundle}
def _vtw(c: Any) -> int:
try:
return int(c["value_to_write"])
except (KeyError, TypeError):
return int(c.value_to_write)
w62 = _vtw(by_reg[62]) if 62 in by_reg else last_verified.get(62, a62)
w63 = _vtw(by_reg[63]) if 63 in by_reg else last_verified.get(63, a63)
w64 = _vtw(by_reg[64]) if 64 in by_reg else last_verified.get(64, a64)
return (int(w62), int(w63), int(w64))
async def _verify_deye_clock_written_bundle(
site_id: int,
bundle: list[asyncpg.Record],
a62: int,
a63: int,
a64: int,
db: asyncpg.Connection,
) -> bool:
"""
Toleranční ověření pro jeden až tři řádky journalu 6264 ve stavu written.
Při mismatch retry společně; bez přepnutí do SELF_SUSTAIN po 3 pokusech.
"""
from services.notification_service import (
notify_modbus_clock_verify_exhausted,
notify_modbus_mismatch,
)
cmds_s = sorted(bundle, key=_modbus_cmd_register)
try:
asset_id = int(cmds_s[0]["asset_id"])
except (KeyError, TypeError):
asset_id = int(cmds_s[0].asset_id)
last_v = await _fetch_last_verified_inverter_registers(site_id, asset_id, db)
w62, w63, w64 = _deye_expected_clock_triplet_for_verify(bundle, last_v, a62, a63, a64)
clock_ok = _deye_clock_registers_verify_match(w62, w63, w64, a62, a63, a64)
actual_by_reg = {62: a62, 63: a63, 64: a64}
for cmd in cmds_s:
try:
cid = int(cmd["id"])
except (KeyError, TypeError):
cid = int(cmd.id)
r = _modbus_cmd_register(cmd)
await db.execute(
"""
UPDATE ems.modbus_command
SET value_verified=$1::int, verified_at=now(),
status=CASE WHEN $2::boolean THEN 'verified' ELSE 'mismatch' END
WHERE id=$3::int
""",
actual_by_reg[r],
clock_ok,
cid,
)
if clock_ok:
await db.execute(
"""
UPDATE ems.asset_inverter
SET deye_last_system_time_sync_minute = $1,
deye_last_system_time_sync_at = now()
WHERE id = $2
""",
_prague_minute_start_utc(),
asset_id,
)
for cmd in cmds_s:
try:
cid_l = int(cmd["id"])
except (KeyError, TypeError):
cid_l = int(cmd.id)
try:
code_l = str(cmd["asset_code"])
except (KeyError, TypeError):
code_l = str(cmd.asset_code)
rr = _modbus_cmd_register(cmd)
logger.info(
"[cmd %s] verified OK (clock tolerant): %s 0x%04X=%s",
cid_l,
code_l,
rr,
actual_by_reg[rr],
)
return True
cmd0 = cmds_s[0]
try:
ac0 = str(cmd0["asset_code"])
except (KeyError, TypeError):
ac0 = str(cmd0.asset_code)
logger.error(
"[cmd clock] MISMATCH %s 6264: written=(%s,%s,%s) actual=(%s,%s,%s)",
ac0,
w62,
w63,
w64,
a62,
a63,
a64,
)
attempts = 0
for cmd in cmds_s:
try:
cid_q = int(cmd["id"])
except (KeyError, TypeError):
cid_q = int(cmd.id)
row_ac = await db.fetchrow(
"SELECT attempt_count FROM ems.modbus_command WHERE id=$1", cid_q
)
ac = int(row_ac["attempt_count"] or 0) if row_ac else 0
attempts = max(attempts, ac)
await notify_modbus_mismatch(
db,
site_id,
ac0,
62,
"system_time_62_64",
w62,
a62,
attempts,
)
ids_ordered = []
for c in cmds_s:
try:
ids_ordered.append(int(c["id"]))
except (KeyError, TypeError):
ids_ordered.append(int(c.id))
if attempts < 3:
for cid in ids_ordered:
await db.execute(
"UPDATE ems.modbus_command SET status='retrying' WHERE id=$1",
cid,
)
await execute_modbus_commands(ids_ordered, db)
await verify_modbus_commands(ids_ordered, db, site_id)
else:
logger.critical(
"[cmd clock] 3 failed verify attempts (6264); režim se nemění automaticky"
)
site = await db.fetchrow("SELECT code FROM ems.site WHERE id=$1", site_id)
await notify_modbus_clock_verify_exhausted(
db,
site_id,
site["code"] if site else str(site_id),
ac0,
(w62, w63, w64),
(a62, a63, a64),
)
return False
async def verify_modbus_commands(
command_ids: list[int],
db: asyncpg.Connection,
site_id: int,
) -> bool:
"""
Přečte registry zpět (FC 3 po souvislých blocích) a porovná s value_to_write.
Při mismatch: retry (až 3×). Po vyčerpání pokusů u kritických registrů (108, 109, 142, 143, 145)
→ SELF_SUSTAIN + Discord; u „soft“ (178, TOU power W) jen log + Discord, režim se nemění.
"""
from services.notification_service import notify_modbus_mismatch
inv_cfg = await _load_inverter_config(site_id, db)
async def _apply_verify_result(
cmd: asyncpg.Record,
actual_i: int,
*,
client: Any,
unit: int,
) -> bool:
"""Vrátí True při shodě, False při mismatch (a obslouží retry / SELF_SUSTAIN)."""
reg = int(cmd["register"])
cmd_id = int(cmd["id"])
if reg in DEYE_CLOCK_REGS:
asset_id = int(cmd["asset_id"])
host = str(cmd["device_host"])
port_i = int(cmd["device_port"])
uid = int(cmd["device_unit_id"])
bundle = await _fetch_written_deye_clock_commands(
site_id, asset_id, host, port_i, uid, db
)
if not bundle:
bundle = [cmd]
try:
cvals = await client.read_holding_registers(62, 3, uid)
except Exception as e:
logger.error(
"verify clock guard read 6264 failed (reg 0x%04X): %s", reg, e
)
return False
if len(cvals) != 3:
logger.error(
"verify clock guard: expected 3 regs, got %s", len(cvals)
)
return False
logger.warning(
"Clock register 0x%04X reached strict verify path; using tolerant 6264 bundle",
reg,
)
return await _verify_deye_clock_written_bundle(
site_id,
bundle,
int(cvals[0]),
int(cvals[1]),
int(cvals[2]),
db,
)
expected_i = int(cmd["value_to_write"])
matches = actual_i == expected_i
if reg == 178:
first_178 = int(actual_i)
second_178: int | None = None
if not _deye_reg178_verify_match(expected_i, first_178):
try:
r178 = await client.read_holding_registers(178, 1, unit)
if r178 and len(r178) >= 1:
second_178 = int(r178[0])
except Exception as e:
logger.warning(
"[cmd %s] reg178 double-read failed: %s", cmd_id, e
)
matches, actual_i = _deye_reg178_verify_with_double_read(
expected_i, first_178, second_178
)
if (
matches
and second_178 is not None
and not _deye_reg178_verify_match(expected_i, first_178)
):
logger.info(
"[cmd %s] reg178 double-read recovered: first=%s second=%s",
cmd_id,
first_178,
second_178,
)
if not matches and reg in DEYE_TOU_POWER_REGS and inv_cfg is not None:
matches = _deye_tou_power_verify_match(expected_i, actual_i, inv_cfg)
await db.execute(
"""
UPDATE ems.modbus_command
SET value_verified=$1::int, verified_at=now(),
status=CASE WHEN $2::boolean THEN 'verified' ELSE 'mismatch' END
WHERE id=$3::int
""",
actual_i,
matches,
cmd_id,
)
if not matches:
logger.error(
"[cmd %s] MISMATCH %s 0x%04X: expected=%s actual=%s%s",
cmd_id,
cmd["asset_code"],
reg,
expected_i,
actual_i,
(
" (reg178 mask 0x%04X)" % REG178_VERIFY_MASK
if reg == 178
else ""
),
)
row_ac = await db.fetchrow(
"SELECT attempt_count FROM ems.modbus_command WHERE id=$1", cmd_id
)
attempts = int(row_ac["attempt_count"] or 0) if row_ac else 0
await notify_modbus_mismatch(
db,
site_id,
cmd["asset_code"],
reg,
cmd["register_name"] or "",
expected_i,
actual_i,
attempts,
)
if attempts < 3:
await db.execute(
"UPDATE ems.modbus_command SET status='retrying' WHERE id=$1",
cmd_id,
)
await execute_modbus_commands([cmd_id], db)
await verify_modbus_commands([cmd_id], db, site_id)
else:
if deye_reg_triggers_self_sustain_after_verify_exhaust(reg):
logger.critical(
"[cmd %s] 3 failed attempts, switching to SELF_SUSTAIN",
cmd_id,
)
await _switch_to_self_sustain(
site_id,
db,
reason=(
f"Modbus mismatch po 3 pokusech: {cmd['asset_code']} "
f"reg 0x{reg:04X}"
),
)
else:
logger.warning(
"[cmd %s] 3 failed verify attempts on non-critical reg 0x%04X "
"(no mode change): %s",
cmd_id,
reg,
cmd["asset_code"],
)
return False
if reg == 178 and actual_i != expected_i:
logger.info(
"[cmd %s] verified OK (reg178 masked): %s 0x%04X value_to_write=%s actual=%s",
cmd_id,
cmd["asset_code"],
reg,
expected_i,
actual_i,
)
else:
logger.info(
"[cmd %s] verified OK: %s 0x%04X=%s",
cmd_id,
cmd["asset_code"],
reg,
actual_i,
)
return True
cmds: list[asyncpg.Record] = []
for cmd_id in command_ids:
cmd = await db.fetchrow(
"SELECT * FROM ems.modbus_command WHERE id=$1", cmd_id
)
if cmd is not None and cmd["status"] == "written":
cmds.append(cmd)
if not cmds:
return True
by_gw: dict[tuple[str, int, int], list[asyncpg.Record]] = defaultdict(list)
for cmd in cmds:
by_gw[
(cmd["device_host"], int(cmd["device_port"]), int(cmd["device_unit_id"]))
].append(cmd)
all_ok = True
for (host, port, unit), group in by_gw.items():
client = await get_modbus_client(host, port)
clock_cmds = [c for c in group if int(c["register"]) in DEYE_CLOCK_REGS]
rest = [c for c in group if int(c["register"]) not in DEYE_CLOCK_REGS]
if clock_cmds:
asset_id = int(clock_cmds[0]["asset_id"])
bundle = await _fetch_written_deye_clock_commands(
site_id, asset_id, host, port, unit, db
)
if not bundle:
bundle = clock_cmds
try:
cvals = await client.read_holding_registers(62, 3, unit)
except Exception as e:
logger.error("verify clock read 6264 failed: %s", e)
all_ok = False
else:
if len(cvals) != 3:
logger.error(
"verify clock read: expected 3 regs, got %s", len(cvals)
)
all_ok = False
else:
matched = await _verify_deye_clock_written_bundle(
site_id,
bundle,
int(cvals[0]),
int(cvals[1]),
int(cvals[2]),
db,
)
if not matched:
all_ok = False
for run in _modbus_command_contiguous_runs(rest):
start_reg = int(run[0]["register"])
n = len(run)
try:
values = await client.read_holding_registers(start_reg, n, unit)
except Exception as e:
logger.error(
"verify batch read 0x%04X count=%s failed: %s", start_reg, n, e
)
all_ok = False
continue
if len(values) != n:
logger.error(
"verify read 0x%04X: expected %s regs, got %s",
start_reg,
n,
len(values),
)
all_ok = False
continue
for cmd, actual in zip(run, values):
matched = await _apply_verify_result(
cmd, int(actual), client=client, unit=unit
)
if not matched:
all_ok = False
return all_ok
async def write_inverter_setpoints( async def write_inverter_setpoints(
site_id: int, site_id: int,
setpoints_now: ControlSetpoints, setpoints_now: ControlSetpoints,

View File

@@ -0,0 +1,476 @@
"""Modbus verify workflow pro control export."""
from __future__ import annotations
import logging
from collections import defaultdict
from typing import Any
import asyncpg
from services.control.deye_helpers import (
DEYE_CLOCK_REGS,
DEYE_TOU_POWER_REGS,
REG178_VERIFY_MASK,
_deye_clock_registers_verify_match,
_deye_reg178_verify_match,
_deye_reg178_verify_with_double_read,
_deye_tou_power_verify_match,
_prague_minute_start_utc,
deye_reg_triggers_self_sustain_after_verify_exhaust,
)
from services.control.modbus_journal import (
_fetch_last_verified_inverter_registers,
_fetch_written_deye_clock_commands,
_modbus_command_contiguous_runs,
execute_modbus_commands,
)
from services.control.repository import _load_inverter_config
from services.modbus_client import get_modbus_client
logger = logging.getLogger(__name__)
async def _switch_to_self_sustain(site_id: int, db: asyncpg.Connection, reason: str) -> None:
"""Přepne lokalitu na SELF_SUSTAIN, zaloguje důvod a při změně pošle Discord."""
from services.notification_service import run_fn_set_mode_with_discord
await run_fn_set_mode_with_discord(
db,
site_id,
"SELF_SUSTAIN",
"system:mismatch",
None,
reason,
)
logger.critical("Site %s switched to SELF_SUSTAIN: %s", site_id, reason)
def _modbus_cmd_register(cmd: Any) -> int:
"""asyncpg.Record má __getitem__; objekty s atributem .register též (testy)."""
try:
return int(cmd["register"])
except (KeyError, TypeError):
return int(cmd.register)
def _deye_expected_clock_triplet_for_verify(
bundle: list[asyncpg.Record],
last_verified: dict[int, int],
a62: int,
a63: int,
a64: int,
) -> tuple[int, int, int]:
"""
Sestaví očekávané (w62,w63,w64) pro toleranční verify.
Chybějící registry doplní poslední verified nebo aktuálním přečtením ze zařízení.
"""
by_reg = {_modbus_cmd_register(c): c for c in bundle}
def _vtw(c: Any) -> int:
try:
return int(c["value_to_write"])
except (KeyError, TypeError):
return int(c.value_to_write)
w62 = _vtw(by_reg[62]) if 62 in by_reg else last_verified.get(62, a62)
w63 = _vtw(by_reg[63]) if 63 in by_reg else last_verified.get(63, a63)
w64 = _vtw(by_reg[64]) if 64 in by_reg else last_verified.get(64, a64)
return (int(w62), int(w63), int(w64))
async def _verify_deye_clock_written_bundle(
site_id: int,
bundle: list[asyncpg.Record],
a62: int,
a63: int,
a64: int,
db: asyncpg.Connection,
) -> bool:
"""
Toleranční ověření pro jeden až tři řádky journalu 62-64 ve stavu written.
Při mismatch retry společně; bez přepnutí do SELF_SUSTAIN po 3 pokusech.
"""
from services.notification_service import (
notify_modbus_clock_verify_exhausted,
notify_modbus_mismatch,
)
cmds_s = sorted(bundle, key=_modbus_cmd_register)
try:
asset_id = int(cmds_s[0]["asset_id"])
except (KeyError, TypeError):
asset_id = int(cmds_s[0].asset_id)
last_v = await _fetch_last_verified_inverter_registers(site_id, asset_id, db)
w62, w63, w64 = _deye_expected_clock_triplet_for_verify(bundle, last_v, a62, a63, a64)
clock_ok = _deye_clock_registers_verify_match(w62, w63, w64, a62, a63, a64)
actual_by_reg = {62: a62, 63: a63, 64: a64}
for cmd in cmds_s:
try:
cid = int(cmd["id"])
except (KeyError, TypeError):
cid = int(cmd.id)
r = _modbus_cmd_register(cmd)
await db.execute(
"""
UPDATE ems.modbus_command
SET value_verified=$1::int, verified_at=now(),
status=CASE WHEN $2::boolean THEN 'verified' ELSE 'mismatch' END
WHERE id=$3::int
""",
actual_by_reg[r],
clock_ok,
cid,
)
if clock_ok:
await db.execute(
"""
UPDATE ems.asset_inverter
SET deye_last_system_time_sync_minute = $1,
deye_last_system_time_sync_at = now()
WHERE id = $2
""",
_prague_minute_start_utc(),
asset_id,
)
for cmd in cmds_s:
try:
cid_l = int(cmd["id"])
except (KeyError, TypeError):
cid_l = int(cmd.id)
try:
code_l = str(cmd["asset_code"])
except (KeyError, TypeError):
code_l = str(cmd.asset_code)
rr = _modbus_cmd_register(cmd)
logger.info(
"[cmd %s] verified OK (clock tolerant): %s 0x%04X=%s",
cid_l,
code_l,
rr,
actual_by_reg[rr],
)
return True
cmd0 = cmds_s[0]
try:
ac0 = str(cmd0["asset_code"])
except (KeyError, TypeError):
ac0 = str(cmd0.asset_code)
logger.error(
"[cmd clock] MISMATCH %s 62-64: written=(%s,%s,%s) actual=(%s,%s,%s)",
ac0,
w62,
w63,
w64,
a62,
a63,
a64,
)
attempts = 0
for cmd in cmds_s:
try:
cid_q = int(cmd["id"])
except (KeyError, TypeError):
cid_q = int(cmd.id)
row_ac = await db.fetchrow(
"SELECT attempt_count FROM ems.modbus_command WHERE id=$1", cid_q
)
ac = int(row_ac["attempt_count"] or 0) if row_ac else 0
attempts = max(attempts, ac)
await notify_modbus_mismatch(
db,
site_id,
ac0,
62,
"system_time_62_64",
w62,
a62,
attempts,
)
ids_ordered = []
for c in cmds_s:
try:
ids_ordered.append(int(c["id"]))
except (KeyError, TypeError):
ids_ordered.append(int(c.id))
if attempts < 3:
for cid in ids_ordered:
await db.execute(
"UPDATE ems.modbus_command SET status='retrying' WHERE id=$1",
cid,
)
await execute_modbus_commands(ids_ordered, db)
await verify_modbus_commands(ids_ordered, db, site_id)
else:
logger.critical(
"[cmd clock] 3 failed verify attempts (62-64); režim se nemění automaticky"
)
site = await db.fetchrow("SELECT code FROM ems.site WHERE id=$1", site_id)
await notify_modbus_clock_verify_exhausted(
db,
site_id,
site["code"] if site else str(site_id),
ac0,
(w62, w63, w64),
(a62, a63, a64),
)
return False
async def verify_modbus_commands(
command_ids: list[int],
db: asyncpg.Connection,
site_id: int,
) -> bool:
"""
Přečte registry zpět (FC 3 po souvislých blocích) a porovná s value_to_write.
Při mismatch řeší retry a po vyčerpání kritických registrů SELF_SUSTAIN.
"""
from services.notification_service import notify_modbus_mismatch
inv_cfg = await _load_inverter_config(site_id, db)
async def _apply_verify_result(
cmd: asyncpg.Record,
actual_i: int,
*,
client: Any,
unit: int,
) -> bool:
reg = int(cmd["register"])
cmd_id = int(cmd["id"])
if reg in DEYE_CLOCK_REGS:
asset_id = int(cmd["asset_id"])
host = str(cmd["device_host"])
port_i = int(cmd["device_port"])
uid = int(cmd["device_unit_id"])
bundle = await _fetch_written_deye_clock_commands(
site_id, asset_id, host, port_i, uid, db
)
if not bundle:
bundle = [cmd]
try:
cvals = await client.read_holding_registers(62, 3, uid)
except Exception as e:
logger.error(
"verify clock guard read 62-64 failed (reg 0x%04X): %s", reg, e
)
return False
if len(cvals) != 3:
logger.error("verify clock guard: expected 3 regs, got %s", len(cvals))
return False
logger.warning(
"Clock register 0x%04X reached strict verify path; using tolerant 62-64 bundle",
reg,
)
return await _verify_deye_clock_written_bundle(
site_id,
bundle,
int(cvals[0]),
int(cvals[1]),
int(cvals[2]),
db,
)
expected_i = int(cmd["value_to_write"])
matches = actual_i == expected_i
if reg == 178:
first_178 = int(actual_i)
second_178: int | None = None
if not _deye_reg178_verify_match(expected_i, first_178):
try:
r178 = await client.read_holding_registers(178, 1, unit)
if r178 and len(r178) >= 1:
second_178 = int(r178[0])
except Exception as e:
logger.warning("[cmd %s] reg178 double-read failed: %s", cmd_id, e)
matches, actual_i = _deye_reg178_verify_with_double_read(
expected_i, first_178, second_178
)
if (
matches
and second_178 is not None
and not _deye_reg178_verify_match(expected_i, first_178)
):
logger.info(
"[cmd %s] reg178 double-read recovered: first=%s second=%s",
cmd_id,
first_178,
second_178,
)
if not matches and reg in DEYE_TOU_POWER_REGS and inv_cfg is not None:
matches = _deye_tou_power_verify_match(expected_i, actual_i, inv_cfg)
await db.execute(
"""
UPDATE ems.modbus_command
SET value_verified=$1::int, verified_at=now(),
status=CASE WHEN $2::boolean THEN 'verified' ELSE 'mismatch' END
WHERE id=$3::int
""",
actual_i,
matches,
cmd_id,
)
if not matches:
logger.error(
"[cmd %s] MISMATCH %s 0x%04X: expected=%s actual=%s%s",
cmd_id,
cmd["asset_code"],
reg,
expected_i,
actual_i,
" (reg178 mask 0x%04X)" % REG178_VERIFY_MASK if reg == 178 else "",
)
row_ac = await db.fetchrow(
"SELECT attempt_count FROM ems.modbus_command WHERE id=$1", cmd_id
)
attempts = int(row_ac["attempt_count"] or 0) if row_ac else 0
await notify_modbus_mismatch(
db,
site_id,
cmd["asset_code"],
reg,
cmd["register_name"] or "",
expected_i,
actual_i,
attempts,
)
if attempts < 3:
await db.execute(
"UPDATE ems.modbus_command SET status='retrying' WHERE id=$1",
cmd_id,
)
await execute_modbus_commands([cmd_id], db)
await verify_modbus_commands([cmd_id], db, site_id)
else:
if deye_reg_triggers_self_sustain_after_verify_exhaust(reg):
logger.critical(
"[cmd %s] 3 failed attempts, switching to SELF_SUSTAIN",
cmd_id,
)
await _switch_to_self_sustain(
site_id,
db,
reason=(
f"Modbus mismatch po 3 pokusech: {cmd['asset_code']} "
f"reg 0x{reg:04X}"
),
)
else:
logger.warning(
"[cmd %s] 3 failed verify attempts on non-critical reg 0x%04X "
"(no mode change): %s",
cmd_id,
reg,
cmd["asset_code"],
)
return False
if reg == 178 and actual_i != expected_i:
logger.info(
"[cmd %s] verified OK (reg178 masked): %s 0x%04X value_to_write=%s actual=%s",
cmd_id,
cmd["asset_code"],
reg,
expected_i,
actual_i,
)
else:
logger.info(
"[cmd %s] verified OK: %s 0x%04X=%s",
cmd_id,
cmd["asset_code"],
reg,
actual_i,
)
return True
cmds: list[asyncpg.Record] = []
for cmd_id in command_ids:
cmd = await db.fetchrow(
"SELECT * FROM ems.modbus_command WHERE id=$1", cmd_id
)
if cmd is not None and cmd["status"] == "written":
cmds.append(cmd)
if not cmds:
return True
by_gw: dict[tuple[str, int, int], list[asyncpg.Record]] = defaultdict(list)
for cmd in cmds:
by_gw[
(cmd["device_host"], int(cmd["device_port"]), int(cmd["device_unit_id"]))
].append(cmd)
all_ok = True
for (host, port, unit), group in by_gw.items():
client = await get_modbus_client(host, port)
clock_cmds = [c for c in group if int(c["register"]) in DEYE_CLOCK_REGS]
rest = [c for c in group if int(c["register"]) not in DEYE_CLOCK_REGS]
if clock_cmds:
asset_id = int(clock_cmds[0]["asset_id"])
bundle = await _fetch_written_deye_clock_commands(
site_id, asset_id, host, port, unit, db
)
if not bundle:
bundle = clock_cmds
try:
cvals = await client.read_holding_registers(62, 3, unit)
except Exception as e:
logger.error("verify clock read 62-64 failed: %s", e)
all_ok = False
else:
if len(cvals) != 3:
logger.error("verify clock read: expected 3 regs, got %s", len(cvals))
all_ok = False
else:
matched = await _verify_deye_clock_written_bundle(
site_id,
bundle,
int(cvals[0]),
int(cvals[1]),
int(cvals[2]),
db,
)
if not matched:
all_ok = False
for run in _modbus_command_contiguous_runs(rest):
start_reg = int(run[0]["register"])
n = len(run)
try:
values = await client.read_holding_registers(start_reg, n, unit)
except Exception as e:
logger.error(
"verify batch read 0x%04X count=%s failed: %s", start_reg, n, e
)
all_ok = False
continue
if len(values) != n:
logger.error(
"verify read 0x%04X: expected %s regs, got %s",
start_reg,
n,
len(values),
)
all_ok = False
continue
for cmd, actual in zip(run, values):
matched = await _apply_verify_result(
cmd, int(actual), client=client, unit=unit
)
if not matched:
all_ok = False
return all_ok