diff --git a/backend/services/control/exporter_monolith.py b/backend/services/control/exporter_monolith.py index bff405e..d8c3a61 100644 --- a/backend/services/control/exporter_monolith.py +++ b/backend/services/control/exporter_monolith.py @@ -3,9 +3,8 @@ from __future__ import annotations import logging -from collections import defaultdict -from typing import Any from datetime import datetime, timezone +from typing import Any import asyncpg @@ -47,8 +46,6 @@ from services.control.models import ControlSetpoints, InverterConfig, OperatingM from services.control.modbus_journal import ( _drop_registers_matching_last_verified, _fetch_last_verified_inverter_registers, - _fetch_written_deye_clock_commands, - _modbus_command_contiguous_runs, create_modbus_commands, execute_modbus_commands, ) @@ -80,469 +77,19 @@ from services.control.setpoints import ( _deye_zero_export_amps_for_passive, get_deye_mode, ) +from services.control.verify import ( + _deye_expected_clock_triplet_for_verify, + _modbus_cmd_register, + _switch_to_self_sustain, + _verify_deye_clock_written_bundle, + verify_modbus_commands, +) from services.modbus_client import get_modbus_client from services.signal_service import enqueue_site_signals logger = logging.getLogger(__name__) -async def _switch_to_self_sustain(site_id: int, db: asyncpg.Connection, reason: str) -> None: - """Přepne lokalitu na SELF_SUSTAIN, zaloguje důvod a při změně pošle Discord.""" - from services.notification_service import run_fn_set_mode_with_discord - - await run_fn_set_mode_with_discord( - db, - site_id, - "SELF_SUSTAIN", - "system:mismatch", - None, - reason, - ) - logger.critical("Site %s switched to SELF_SUSTAIN: %s", site_id, reason) - - -def _modbus_cmd_register(cmd: Any) -> int: - """asyncpg.Record má __getitem__; objekty s atributem .register též (testy).""" - try: - return int(cmd["register"]) - except (KeyError, TypeError): - return int(cmd.register) - - -def _deye_expected_clock_triplet_for_verify( - bundle: list[asyncpg.Record], - last_verified: dict[int, int], - a62: int, - a63: int, - a64: int, -) -> tuple[int, int, int]: - """ - Sestaví očekávané (w62,w63,w64) pro toleranční verify. - Chybějící registry doplní poslední verified nebo aktuálním přečtením ze zařízení - (aby osiřelý zápis např. jen 64 nešel do striktního porovnání reg64). - """ - by_reg = {_modbus_cmd_register(c): c for c in bundle} - def _vtw(c: Any) -> int: - try: - return int(c["value_to_write"]) - except (KeyError, TypeError): - return int(c.value_to_write) - - w62 = _vtw(by_reg[62]) if 62 in by_reg else last_verified.get(62, a62) - w63 = _vtw(by_reg[63]) if 63 in by_reg else last_verified.get(63, a63) - w64 = _vtw(by_reg[64]) if 64 in by_reg else last_verified.get(64, a64) - return (int(w62), int(w63), int(w64)) - - -async def _verify_deye_clock_written_bundle( - site_id: int, - bundle: list[asyncpg.Record], - a62: int, - a63: int, - a64: int, - db: asyncpg.Connection, -) -> bool: - """ - Toleranční ověření pro jeden až tři řádky journalu 62–64 ve stavu written. - Při mismatch retry společně; bez přepnutí do SELF_SUSTAIN po 3 pokusech. - """ - from services.notification_service import ( - notify_modbus_clock_verify_exhausted, - notify_modbus_mismatch, - ) - - cmds_s = sorted(bundle, key=_modbus_cmd_register) - try: - asset_id = int(cmds_s[0]["asset_id"]) - except (KeyError, TypeError): - asset_id = int(cmds_s[0].asset_id) - last_v = await _fetch_last_verified_inverter_registers(site_id, asset_id, db) - w62, w63, w64 = _deye_expected_clock_triplet_for_verify(bundle, last_v, a62, a63, a64) - clock_ok = _deye_clock_registers_verify_match(w62, w63, w64, a62, a63, a64) - actual_by_reg = {62: a62, 63: a63, 64: a64} - - for cmd in cmds_s: - try: - cid = int(cmd["id"]) - except (KeyError, TypeError): - cid = int(cmd.id) - r = _modbus_cmd_register(cmd) - await db.execute( - """ - UPDATE ems.modbus_command - SET value_verified=$1::int, verified_at=now(), - status=CASE WHEN $2::boolean THEN 'verified' ELSE 'mismatch' END - WHERE id=$3::int - """, - actual_by_reg[r], - clock_ok, - cid, - ) - - if clock_ok: - await db.execute( - """ - UPDATE ems.asset_inverter - SET deye_last_system_time_sync_minute = $1, - deye_last_system_time_sync_at = now() - WHERE id = $2 - """, - _prague_minute_start_utc(), - asset_id, - ) - for cmd in cmds_s: - try: - cid_l = int(cmd["id"]) - except (KeyError, TypeError): - cid_l = int(cmd.id) - try: - code_l = str(cmd["asset_code"]) - except (KeyError, TypeError): - code_l = str(cmd.asset_code) - rr = _modbus_cmd_register(cmd) - logger.info( - "[cmd %s] verified OK (clock tolerant): %s 0x%04X=%s", - cid_l, - code_l, - rr, - actual_by_reg[rr], - ) - return True - - cmd0 = cmds_s[0] - try: - ac0 = str(cmd0["asset_code"]) - except (KeyError, TypeError): - ac0 = str(cmd0.asset_code) - logger.error( - "[cmd clock] MISMATCH %s 62–64: written=(%s,%s,%s) actual=(%s,%s,%s)", - ac0, - w62, - w63, - w64, - a62, - a63, - a64, - ) - - attempts = 0 - for cmd in cmds_s: - try: - cid_q = int(cmd["id"]) - except (KeyError, TypeError): - cid_q = int(cmd.id) - row_ac = await db.fetchrow( - "SELECT attempt_count FROM ems.modbus_command WHERE id=$1", cid_q - ) - ac = int(row_ac["attempt_count"] or 0) if row_ac else 0 - attempts = max(attempts, ac) - - await notify_modbus_mismatch( - db, - site_id, - ac0, - 62, - "system_time_62_64", - w62, - a62, - attempts, - ) - - ids_ordered = [] - for c in cmds_s: - try: - ids_ordered.append(int(c["id"])) - except (KeyError, TypeError): - ids_ordered.append(int(c.id)) - if attempts < 3: - for cid in ids_ordered: - await db.execute( - "UPDATE ems.modbus_command SET status='retrying' WHERE id=$1", - cid, - ) - await execute_modbus_commands(ids_ordered, db) - await verify_modbus_commands(ids_ordered, db, site_id) - else: - logger.critical( - "[cmd clock] 3 failed verify attempts (62–64); režim se nemění automaticky" - ) - site = await db.fetchrow("SELECT code FROM ems.site WHERE id=$1", site_id) - await notify_modbus_clock_verify_exhausted( - db, - site_id, - site["code"] if site else str(site_id), - ac0, - (w62, w63, w64), - (a62, a63, a64), - ) - return False - - -async def verify_modbus_commands( - command_ids: list[int], - db: asyncpg.Connection, - site_id: int, -) -> bool: - """ - Přečte registry zpět (FC 3 po souvislých blocích) a porovná s value_to_write. - Při mismatch: retry (až 3×). Po vyčerpání pokusů u kritických registrů (108, 109, 142, 143, 145) - → SELF_SUSTAIN + Discord; u „soft“ (178, TOU power W) jen log + Discord, režim se nemění. - """ - from services.notification_service import notify_modbus_mismatch - - inv_cfg = await _load_inverter_config(site_id, db) - - async def _apply_verify_result( - cmd: asyncpg.Record, - actual_i: int, - *, - client: Any, - unit: int, - ) -> bool: - """Vrátí True při shodě, False při mismatch (a obslouží retry / SELF_SUSTAIN).""" - reg = int(cmd["register"]) - cmd_id = int(cmd["id"]) - - if reg in DEYE_CLOCK_REGS: - asset_id = int(cmd["asset_id"]) - host = str(cmd["device_host"]) - port_i = int(cmd["device_port"]) - uid = int(cmd["device_unit_id"]) - bundle = await _fetch_written_deye_clock_commands( - site_id, asset_id, host, port_i, uid, db - ) - if not bundle: - bundle = [cmd] - try: - cvals = await client.read_holding_registers(62, 3, uid) - except Exception as e: - logger.error( - "verify clock guard read 62–64 failed (reg 0x%04X): %s", reg, e - ) - return False - if len(cvals) != 3: - logger.error( - "verify clock guard: expected 3 regs, got %s", len(cvals) - ) - return False - logger.warning( - "Clock register 0x%04X reached strict verify path; using tolerant 62–64 bundle", - reg, - ) - return await _verify_deye_clock_written_bundle( - site_id, - bundle, - int(cvals[0]), - int(cvals[1]), - int(cvals[2]), - db, - ) - - expected_i = int(cmd["value_to_write"]) - matches = actual_i == expected_i - if reg == 178: - first_178 = int(actual_i) - second_178: int | None = None - if not _deye_reg178_verify_match(expected_i, first_178): - try: - r178 = await client.read_holding_registers(178, 1, unit) - if r178 and len(r178) >= 1: - second_178 = int(r178[0]) - except Exception as e: - logger.warning( - "[cmd %s] reg178 double-read failed: %s", cmd_id, e - ) - matches, actual_i = _deye_reg178_verify_with_double_read( - expected_i, first_178, second_178 - ) - if ( - matches - and second_178 is not None - and not _deye_reg178_verify_match(expected_i, first_178) - ): - logger.info( - "[cmd %s] reg178 double-read recovered: first=%s second=%s", - cmd_id, - first_178, - second_178, - ) - if not matches and reg in DEYE_TOU_POWER_REGS and inv_cfg is not None: - matches = _deye_tou_power_verify_match(expected_i, actual_i, inv_cfg) - - await db.execute( - """ - UPDATE ems.modbus_command - SET value_verified=$1::int, verified_at=now(), - status=CASE WHEN $2::boolean THEN 'verified' ELSE 'mismatch' END - WHERE id=$3::int - """, - actual_i, - matches, - cmd_id, - ) - - if not matches: - logger.error( - "[cmd %s] MISMATCH %s 0x%04X: expected=%s actual=%s%s", - cmd_id, - cmd["asset_code"], - reg, - expected_i, - actual_i, - ( - " (reg178 mask 0x%04X)" % REG178_VERIFY_MASK - if reg == 178 - else "" - ), - ) - row_ac = await db.fetchrow( - "SELECT attempt_count FROM ems.modbus_command WHERE id=$1", cmd_id - ) - attempts = int(row_ac["attempt_count"] or 0) if row_ac else 0 - await notify_modbus_mismatch( - db, - site_id, - cmd["asset_code"], - reg, - cmd["register_name"] or "", - expected_i, - actual_i, - attempts, - ) - - if attempts < 3: - await db.execute( - "UPDATE ems.modbus_command SET status='retrying' WHERE id=$1", - cmd_id, - ) - await execute_modbus_commands([cmd_id], db) - await verify_modbus_commands([cmd_id], db, site_id) - else: - if deye_reg_triggers_self_sustain_after_verify_exhaust(reg): - logger.critical( - "[cmd %s] 3 failed attempts, switching to SELF_SUSTAIN", - cmd_id, - ) - await _switch_to_self_sustain( - site_id, - db, - reason=( - f"Modbus mismatch po 3 pokusech: {cmd['asset_code']} " - f"reg 0x{reg:04X}" - ), - ) - else: - logger.warning( - "[cmd %s] 3 failed verify attempts on non-critical reg 0x%04X " - "(no mode change): %s", - cmd_id, - reg, - cmd["asset_code"], - ) - return False - - if reg == 178 and actual_i != expected_i: - logger.info( - "[cmd %s] verified OK (reg178 masked): %s 0x%04X value_to_write=%s actual=%s", - cmd_id, - cmd["asset_code"], - reg, - expected_i, - actual_i, - ) - else: - logger.info( - "[cmd %s] verified OK: %s 0x%04X=%s", - cmd_id, - cmd["asset_code"], - reg, - actual_i, - ) - return True - - cmds: list[asyncpg.Record] = [] - for cmd_id in command_ids: - cmd = await db.fetchrow( - "SELECT * FROM ems.modbus_command WHERE id=$1", cmd_id - ) - if cmd is not None and cmd["status"] == "written": - cmds.append(cmd) - - if not cmds: - return True - - by_gw: dict[tuple[str, int, int], list[asyncpg.Record]] = defaultdict(list) - for cmd in cmds: - by_gw[ - (cmd["device_host"], int(cmd["device_port"]), int(cmd["device_unit_id"])) - ].append(cmd) - - all_ok = True - for (host, port, unit), group in by_gw.items(): - client = await get_modbus_client(host, port) - clock_cmds = [c for c in group if int(c["register"]) in DEYE_CLOCK_REGS] - rest = [c for c in group if int(c["register"]) not in DEYE_CLOCK_REGS] - - if clock_cmds: - asset_id = int(clock_cmds[0]["asset_id"]) - bundle = await _fetch_written_deye_clock_commands( - site_id, asset_id, host, port, unit, db - ) - if not bundle: - bundle = clock_cmds - try: - cvals = await client.read_holding_registers(62, 3, unit) - except Exception as e: - logger.error("verify clock read 62–64 failed: %s", e) - all_ok = False - else: - if len(cvals) != 3: - logger.error( - "verify clock read: expected 3 regs, got %s", len(cvals) - ) - all_ok = False - else: - matched = await _verify_deye_clock_written_bundle( - site_id, - bundle, - int(cvals[0]), - int(cvals[1]), - int(cvals[2]), - db, - ) - if not matched: - all_ok = False - - for run in _modbus_command_contiguous_runs(rest): - start_reg = int(run[0]["register"]) - n = len(run) - try: - values = await client.read_holding_registers(start_reg, n, unit) - except Exception as e: - logger.error( - "verify batch read 0x%04X count=%s failed: %s", start_reg, n, e - ) - all_ok = False - continue - if len(values) != n: - logger.error( - "verify read 0x%04X: expected %s regs, got %s", - start_reg, - n, - len(values), - ) - all_ok = False - continue - for cmd, actual in zip(run, values): - matched = await _apply_verify_result( - cmd, int(actual), client=client, unit=unit - ) - if not matched: - all_ok = False - - return all_ok - - async def write_inverter_setpoints( site_id: int, setpoints_now: ControlSetpoints, diff --git a/backend/services/control/verify.py b/backend/services/control/verify.py new file mode 100644 index 0000000..9591525 --- /dev/null +++ b/backend/services/control/verify.py @@ -0,0 +1,476 @@ +"""Modbus verify workflow pro control export.""" + +from __future__ import annotations + +import logging +from collections import defaultdict +from typing import Any + +import asyncpg + +from services.control.deye_helpers import ( + DEYE_CLOCK_REGS, + DEYE_TOU_POWER_REGS, + REG178_VERIFY_MASK, + _deye_clock_registers_verify_match, + _deye_reg178_verify_match, + _deye_reg178_verify_with_double_read, + _deye_tou_power_verify_match, + _prague_minute_start_utc, + deye_reg_triggers_self_sustain_after_verify_exhaust, +) +from services.control.modbus_journal import ( + _fetch_last_verified_inverter_registers, + _fetch_written_deye_clock_commands, + _modbus_command_contiguous_runs, + execute_modbus_commands, +) +from services.control.repository import _load_inverter_config +from services.modbus_client import get_modbus_client + +logger = logging.getLogger(__name__) + + +async def _switch_to_self_sustain(site_id: int, db: asyncpg.Connection, reason: str) -> None: + """Přepne lokalitu na SELF_SUSTAIN, zaloguje důvod a při změně pošle Discord.""" + from services.notification_service import run_fn_set_mode_with_discord + + await run_fn_set_mode_with_discord( + db, + site_id, + "SELF_SUSTAIN", + "system:mismatch", + None, + reason, + ) + logger.critical("Site %s switched to SELF_SUSTAIN: %s", site_id, reason) + + +def _modbus_cmd_register(cmd: Any) -> int: + """asyncpg.Record má __getitem__; objekty s atributem .register též (testy).""" + try: + return int(cmd["register"]) + except (KeyError, TypeError): + return int(cmd.register) + + +def _deye_expected_clock_triplet_for_verify( + bundle: list[asyncpg.Record], + last_verified: dict[int, int], + a62: int, + a63: int, + a64: int, +) -> tuple[int, int, int]: + """ + Sestaví očekávané (w62,w63,w64) pro toleranční verify. + Chybějící registry doplní poslední verified nebo aktuálním přečtením ze zařízení. + """ + by_reg = {_modbus_cmd_register(c): c for c in bundle} + + def _vtw(c: Any) -> int: + try: + return int(c["value_to_write"]) + except (KeyError, TypeError): + return int(c.value_to_write) + + w62 = _vtw(by_reg[62]) if 62 in by_reg else last_verified.get(62, a62) + w63 = _vtw(by_reg[63]) if 63 in by_reg else last_verified.get(63, a63) + w64 = _vtw(by_reg[64]) if 64 in by_reg else last_verified.get(64, a64) + return (int(w62), int(w63), int(w64)) + + +async def _verify_deye_clock_written_bundle( + site_id: int, + bundle: list[asyncpg.Record], + a62: int, + a63: int, + a64: int, + db: asyncpg.Connection, +) -> bool: + """ + Toleranční ověření pro jeden až tři řádky journalu 62-64 ve stavu written. + Při mismatch retry společně; bez přepnutí do SELF_SUSTAIN po 3 pokusech. + """ + from services.notification_service import ( + notify_modbus_clock_verify_exhausted, + notify_modbus_mismatch, + ) + + cmds_s = sorted(bundle, key=_modbus_cmd_register) + try: + asset_id = int(cmds_s[0]["asset_id"]) + except (KeyError, TypeError): + asset_id = int(cmds_s[0].asset_id) + last_v = await _fetch_last_verified_inverter_registers(site_id, asset_id, db) + w62, w63, w64 = _deye_expected_clock_triplet_for_verify(bundle, last_v, a62, a63, a64) + clock_ok = _deye_clock_registers_verify_match(w62, w63, w64, a62, a63, a64) + actual_by_reg = {62: a62, 63: a63, 64: a64} + + for cmd in cmds_s: + try: + cid = int(cmd["id"]) + except (KeyError, TypeError): + cid = int(cmd.id) + r = _modbus_cmd_register(cmd) + await db.execute( + """ + UPDATE ems.modbus_command + SET value_verified=$1::int, verified_at=now(), + status=CASE WHEN $2::boolean THEN 'verified' ELSE 'mismatch' END + WHERE id=$3::int + """, + actual_by_reg[r], + clock_ok, + cid, + ) + + if clock_ok: + await db.execute( + """ + UPDATE ems.asset_inverter + SET deye_last_system_time_sync_minute = $1, + deye_last_system_time_sync_at = now() + WHERE id = $2 + """, + _prague_minute_start_utc(), + asset_id, + ) + for cmd in cmds_s: + try: + cid_l = int(cmd["id"]) + except (KeyError, TypeError): + cid_l = int(cmd.id) + try: + code_l = str(cmd["asset_code"]) + except (KeyError, TypeError): + code_l = str(cmd.asset_code) + rr = _modbus_cmd_register(cmd) + logger.info( + "[cmd %s] verified OK (clock tolerant): %s 0x%04X=%s", + cid_l, + code_l, + rr, + actual_by_reg[rr], + ) + return True + + cmd0 = cmds_s[0] + try: + ac0 = str(cmd0["asset_code"]) + except (KeyError, TypeError): + ac0 = str(cmd0.asset_code) + logger.error( + "[cmd clock] MISMATCH %s 62-64: written=(%s,%s,%s) actual=(%s,%s,%s)", + ac0, + w62, + w63, + w64, + a62, + a63, + a64, + ) + + attempts = 0 + for cmd in cmds_s: + try: + cid_q = int(cmd["id"]) + except (KeyError, TypeError): + cid_q = int(cmd.id) + row_ac = await db.fetchrow( + "SELECT attempt_count FROM ems.modbus_command WHERE id=$1", cid_q + ) + ac = int(row_ac["attempt_count"] or 0) if row_ac else 0 + attempts = max(attempts, ac) + + await notify_modbus_mismatch( + db, + site_id, + ac0, + 62, + "system_time_62_64", + w62, + a62, + attempts, + ) + + ids_ordered = [] + for c in cmds_s: + try: + ids_ordered.append(int(c["id"])) + except (KeyError, TypeError): + ids_ordered.append(int(c.id)) + if attempts < 3: + for cid in ids_ordered: + await db.execute( + "UPDATE ems.modbus_command SET status='retrying' WHERE id=$1", + cid, + ) + await execute_modbus_commands(ids_ordered, db) + await verify_modbus_commands(ids_ordered, db, site_id) + else: + logger.critical( + "[cmd clock] 3 failed verify attempts (62-64); režim se nemění automaticky" + ) + site = await db.fetchrow("SELECT code FROM ems.site WHERE id=$1", site_id) + await notify_modbus_clock_verify_exhausted( + db, + site_id, + site["code"] if site else str(site_id), + ac0, + (w62, w63, w64), + (a62, a63, a64), + ) + return False + + +async def verify_modbus_commands( + command_ids: list[int], + db: asyncpg.Connection, + site_id: int, +) -> bool: + """ + Přečte registry zpět (FC 3 po souvislých blocích) a porovná s value_to_write. + Při mismatch řeší retry a po vyčerpání kritických registrů SELF_SUSTAIN. + """ + from services.notification_service import notify_modbus_mismatch + + inv_cfg = await _load_inverter_config(site_id, db) + + async def _apply_verify_result( + cmd: asyncpg.Record, + actual_i: int, + *, + client: Any, + unit: int, + ) -> bool: + reg = int(cmd["register"]) + cmd_id = int(cmd["id"]) + + if reg in DEYE_CLOCK_REGS: + asset_id = int(cmd["asset_id"]) + host = str(cmd["device_host"]) + port_i = int(cmd["device_port"]) + uid = int(cmd["device_unit_id"]) + bundle = await _fetch_written_deye_clock_commands( + site_id, asset_id, host, port_i, uid, db + ) + if not bundle: + bundle = [cmd] + try: + cvals = await client.read_holding_registers(62, 3, uid) + except Exception as e: + logger.error( + "verify clock guard read 62-64 failed (reg 0x%04X): %s", reg, e + ) + return False + if len(cvals) != 3: + logger.error("verify clock guard: expected 3 regs, got %s", len(cvals)) + return False + logger.warning( + "Clock register 0x%04X reached strict verify path; using tolerant 62-64 bundle", + reg, + ) + return await _verify_deye_clock_written_bundle( + site_id, + bundle, + int(cvals[0]), + int(cvals[1]), + int(cvals[2]), + db, + ) + + expected_i = int(cmd["value_to_write"]) + matches = actual_i == expected_i + if reg == 178: + first_178 = int(actual_i) + second_178: int | None = None + if not _deye_reg178_verify_match(expected_i, first_178): + try: + r178 = await client.read_holding_registers(178, 1, unit) + if r178 and len(r178) >= 1: + second_178 = int(r178[0]) + except Exception as e: + logger.warning("[cmd %s] reg178 double-read failed: %s", cmd_id, e) + matches, actual_i = _deye_reg178_verify_with_double_read( + expected_i, first_178, second_178 + ) + if ( + matches + and second_178 is not None + and not _deye_reg178_verify_match(expected_i, first_178) + ): + logger.info( + "[cmd %s] reg178 double-read recovered: first=%s second=%s", + cmd_id, + first_178, + second_178, + ) + if not matches and reg in DEYE_TOU_POWER_REGS and inv_cfg is not None: + matches = _deye_tou_power_verify_match(expected_i, actual_i, inv_cfg) + + await db.execute( + """ + UPDATE ems.modbus_command + SET value_verified=$1::int, verified_at=now(), + status=CASE WHEN $2::boolean THEN 'verified' ELSE 'mismatch' END + WHERE id=$3::int + """, + actual_i, + matches, + cmd_id, + ) + + if not matches: + logger.error( + "[cmd %s] MISMATCH %s 0x%04X: expected=%s actual=%s%s", + cmd_id, + cmd["asset_code"], + reg, + expected_i, + actual_i, + " (reg178 mask 0x%04X)" % REG178_VERIFY_MASK if reg == 178 else "", + ) + row_ac = await db.fetchrow( + "SELECT attempt_count FROM ems.modbus_command WHERE id=$1", cmd_id + ) + attempts = int(row_ac["attempt_count"] or 0) if row_ac else 0 + await notify_modbus_mismatch( + db, + site_id, + cmd["asset_code"], + reg, + cmd["register_name"] or "", + expected_i, + actual_i, + attempts, + ) + + if attempts < 3: + await db.execute( + "UPDATE ems.modbus_command SET status='retrying' WHERE id=$1", + cmd_id, + ) + await execute_modbus_commands([cmd_id], db) + await verify_modbus_commands([cmd_id], db, site_id) + else: + if deye_reg_triggers_self_sustain_after_verify_exhaust(reg): + logger.critical( + "[cmd %s] 3 failed attempts, switching to SELF_SUSTAIN", + cmd_id, + ) + await _switch_to_self_sustain( + site_id, + db, + reason=( + f"Modbus mismatch po 3 pokusech: {cmd['asset_code']} " + f"reg 0x{reg:04X}" + ), + ) + else: + logger.warning( + "[cmd %s] 3 failed verify attempts on non-critical reg 0x%04X " + "(no mode change): %s", + cmd_id, + reg, + cmd["asset_code"], + ) + return False + + if reg == 178 and actual_i != expected_i: + logger.info( + "[cmd %s] verified OK (reg178 masked): %s 0x%04X value_to_write=%s actual=%s", + cmd_id, + cmd["asset_code"], + reg, + expected_i, + actual_i, + ) + else: + logger.info( + "[cmd %s] verified OK: %s 0x%04X=%s", + cmd_id, + cmd["asset_code"], + reg, + actual_i, + ) + return True + + cmds: list[asyncpg.Record] = [] + for cmd_id in command_ids: + cmd = await db.fetchrow( + "SELECT * FROM ems.modbus_command WHERE id=$1", cmd_id + ) + if cmd is not None and cmd["status"] == "written": + cmds.append(cmd) + + if not cmds: + return True + + by_gw: dict[tuple[str, int, int], list[asyncpg.Record]] = defaultdict(list) + for cmd in cmds: + by_gw[ + (cmd["device_host"], int(cmd["device_port"]), int(cmd["device_unit_id"])) + ].append(cmd) + + all_ok = True + for (host, port, unit), group in by_gw.items(): + client = await get_modbus_client(host, port) + clock_cmds = [c for c in group if int(c["register"]) in DEYE_CLOCK_REGS] + rest = [c for c in group if int(c["register"]) not in DEYE_CLOCK_REGS] + + if clock_cmds: + asset_id = int(clock_cmds[0]["asset_id"]) + bundle = await _fetch_written_deye_clock_commands( + site_id, asset_id, host, port, unit, db + ) + if not bundle: + bundle = clock_cmds + try: + cvals = await client.read_holding_registers(62, 3, unit) + except Exception as e: + logger.error("verify clock read 62-64 failed: %s", e) + all_ok = False + else: + if len(cvals) != 3: + logger.error("verify clock read: expected 3 regs, got %s", len(cvals)) + all_ok = False + else: + matched = await _verify_deye_clock_written_bundle( + site_id, + bundle, + int(cvals[0]), + int(cvals[1]), + int(cvals[2]), + db, + ) + if not matched: + all_ok = False + + for run in _modbus_command_contiguous_runs(rest): + start_reg = int(run[0]["register"]) + n = len(run) + try: + values = await client.read_holding_registers(start_reg, n, unit) + except Exception as e: + logger.error( + "verify batch read 0x%04X count=%s failed: %s", start_reg, n, e + ) + all_ok = False + continue + if len(values) != n: + logger.error( + "verify read 0x%04X: expected %s regs, got %s", + start_reg, + n, + len(values), + ) + all_ok = False + continue + for cmd, actual in zip(run, values): + matched = await _apply_verify_result( + cmd, int(actual), client=client, unit=unit + ) + if not matched: + all_ok = False + + return all_ok