"""Modbus verify workflow pro control export.""" from __future__ import annotations import logging from collections import defaultdict from typing import Any import asyncpg from services.control.deye_helpers import ( DEYE_CLOCK_REGS, DEYE_TOU_POWER_REGS, REG178_VERIFY_MASK, _deye_clock_registers_verify_match, _deye_reg178_verify_match, _deye_reg178_verify_with_double_read, _deye_tou_power_verify_match, _prague_minute_start_utc, deye_reg_triggers_self_sustain_after_verify_exhaust, ) from services.control.modbus_journal import ( _fetch_last_verified_inverter_registers, _fetch_written_deye_clock_commands, _modbus_command_contiguous_runs, execute_modbus_commands, ) from services.control.repository import _load_inverter_config from services.modbus_client import get_modbus_client logger = logging.getLogger(__name__) async def _switch_to_self_sustain(site_id: int, db: asyncpg.Connection, reason: str) -> None: """Přepne lokalitu na SELF_SUSTAIN, zaloguje důvod a při změně pošle Discord.""" from services.notification_service import run_fn_set_mode_with_discord await run_fn_set_mode_with_discord( db, site_id, "SELF_SUSTAIN", "system:mismatch", None, reason, ) logger.critical("Site %s switched to SELF_SUSTAIN: %s", site_id, reason) def _modbus_cmd_register(cmd: Any) -> int: """asyncpg.Record má __getitem__; objekty s atributem .register též (testy).""" try: return int(cmd["register"]) except (KeyError, TypeError): return int(cmd.register) def _deye_expected_clock_triplet_for_verify( bundle: list[asyncpg.Record], last_verified: dict[int, int], a62: int, a63: int, a64: int, ) -> tuple[int, int, int]: """ Sestaví očekávané (w62,w63,w64) pro toleranční verify. Chybějící registry doplní poslední verified nebo aktuálním přečtením ze zařízení. """ by_reg = {_modbus_cmd_register(c): c for c in bundle} def _vtw(c: Any) -> int: try: return int(c["value_to_write"]) except (KeyError, TypeError): return int(c.value_to_write) w62 = _vtw(by_reg[62]) if 62 in by_reg else last_verified.get(62, a62) w63 = _vtw(by_reg[63]) if 63 in by_reg else last_verified.get(63, a63) w64 = _vtw(by_reg[64]) if 64 in by_reg else last_verified.get(64, a64) return (int(w62), int(w63), int(w64)) async def _verify_deye_clock_written_bundle( site_id: int, bundle: list[asyncpg.Record], a62: int, a63: int, a64: int, db: asyncpg.Connection, ) -> bool: """ Toleranční ověření pro jeden až tři řádky journalu 62-64 ve stavu written. Při mismatch retry společně; bez přepnutí do SELF_SUSTAIN po 3 pokusech. """ from services.notification_service import ( notify_modbus_clock_verify_exhausted, notify_modbus_mismatch, ) cmds_s = sorted(bundle, key=_modbus_cmd_register) try: asset_id = int(cmds_s[0]["asset_id"]) except (KeyError, TypeError): asset_id = int(cmds_s[0].asset_id) last_v = await _fetch_last_verified_inverter_registers(site_id, asset_id, db) w62, w63, w64 = _deye_expected_clock_triplet_for_verify(bundle, last_v, a62, a63, a64) clock_ok = _deye_clock_registers_verify_match(w62, w63, w64, a62, a63, a64) actual_by_reg = {62: a62, 63: a63, 64: a64} for cmd in cmds_s: try: cid = int(cmd["id"]) except (KeyError, TypeError): cid = int(cmd.id) r = _modbus_cmd_register(cmd) await db.execute( """ UPDATE ems.modbus_command SET value_verified=$1::int, verified_at=now(), status=CASE WHEN $2::boolean THEN 'verified' ELSE 'mismatch' END WHERE id=$3::int """, actual_by_reg[r], clock_ok, cid, ) if clock_ok: await db.execute( """ UPDATE ems.asset_inverter SET deye_last_system_time_sync_minute = $1, deye_last_system_time_sync_at = now() WHERE id = $2 """, _prague_minute_start_utc(), asset_id, ) for cmd in cmds_s: try: cid_l = int(cmd["id"]) except (KeyError, TypeError): cid_l = int(cmd.id) try: code_l = str(cmd["asset_code"]) except (KeyError, TypeError): code_l = str(cmd.asset_code) rr = _modbus_cmd_register(cmd) logger.info( "[cmd %s] verified OK (clock tolerant): %s 0x%04X=%s", cid_l, code_l, rr, actual_by_reg[rr], ) return True cmd0 = cmds_s[0] try: ac0 = str(cmd0["asset_code"]) except (KeyError, TypeError): ac0 = str(cmd0.asset_code) logger.error( "[cmd clock] MISMATCH %s 62-64: written=(%s,%s,%s) actual=(%s,%s,%s)", ac0, w62, w63, w64, a62, a63, a64, ) attempts = 0 for cmd in cmds_s: try: cid_q = int(cmd["id"]) except (KeyError, TypeError): cid_q = int(cmd.id) row_ac = await db.fetchrow( "SELECT attempt_count FROM ems.modbus_command WHERE id=$1", cid_q ) ac = int(row_ac["attempt_count"] or 0) if row_ac else 0 attempts = max(attempts, ac) await notify_modbus_mismatch( db, site_id, ac0, 62, "system_time_62_64", w62, a62, attempts, ) ids_ordered = [] for c in cmds_s: try: ids_ordered.append(int(c["id"])) except (KeyError, TypeError): ids_ordered.append(int(c.id)) if attempts < 3: for cid in ids_ordered: await db.execute( "UPDATE ems.modbus_command SET status='retrying' WHERE id=$1", cid, ) await execute_modbus_commands(ids_ordered, db) await verify_modbus_commands(ids_ordered, db, site_id) else: logger.critical( "[cmd clock] 3 failed verify attempts (62-64); režim se nemění automaticky" ) site = await db.fetchrow("SELECT code FROM ems.site WHERE id=$1", site_id) await notify_modbus_clock_verify_exhausted( db, site_id, site["code"] if site else str(site_id), ac0, (w62, w63, w64), (a62, a63, a64), ) return False async def verify_modbus_commands( command_ids: list[int], db: asyncpg.Connection, site_id: int, ) -> bool: """ Přečte registry zpět (FC 3 po souvislých blocích) a porovná s value_to_write. Při mismatch řeší retry a po vyčerpání kritických registrů SELF_SUSTAIN. """ from services.notification_service import notify_modbus_mismatch inv_cfg = await _load_inverter_config(site_id, db) async def _apply_verify_result( cmd: asyncpg.Record, actual_i: int, *, client: Any, unit: int, ) -> bool: reg = int(cmd["register"]) cmd_id = int(cmd["id"]) if reg in DEYE_CLOCK_REGS: asset_id = int(cmd["asset_id"]) host = str(cmd["device_host"]) port_i = int(cmd["device_port"]) uid = int(cmd["device_unit_id"]) bundle = await _fetch_written_deye_clock_commands( site_id, asset_id, host, port_i, uid, db ) if not bundle: bundle = [cmd] try: cvals = await client.read_holding_registers(62, 3, uid) except Exception as e: logger.error( "verify clock guard read 62-64 failed (reg 0x%04X): %s", reg, e ) return False if len(cvals) != 3: logger.error("verify clock guard: expected 3 regs, got %s", len(cvals)) return False logger.warning( "Clock register 0x%04X reached strict verify path; using tolerant 62-64 bundle", reg, ) return await _verify_deye_clock_written_bundle( site_id, bundle, int(cvals[0]), int(cvals[1]), int(cvals[2]), db, ) expected_i = int(cmd["value_to_write"]) matches = actual_i == expected_i if reg == 178: first_178 = int(actual_i) second_178: int | None = None if not _deye_reg178_verify_match(expected_i, first_178): try: r178 = await client.read_holding_registers(178, 1, unit) if r178 and len(r178) >= 1: second_178 = int(r178[0]) except Exception as e: logger.warning("[cmd %s] reg178 double-read failed: %s", cmd_id, e) matches, actual_i = _deye_reg178_verify_with_double_read( expected_i, first_178, second_178 ) if ( matches and second_178 is not None and not _deye_reg178_verify_match(expected_i, first_178) ): logger.info( "[cmd %s] reg178 double-read recovered: first=%s second=%s", cmd_id, first_178, second_178, ) if not matches and reg in DEYE_TOU_POWER_REGS and inv_cfg is not None: matches = _deye_tou_power_verify_match(expected_i, actual_i, inv_cfg) await db.execute( """ UPDATE ems.modbus_command SET value_verified=$1::int, verified_at=now(), status=CASE WHEN $2::boolean THEN 'verified' ELSE 'mismatch' END WHERE id=$3::int """, actual_i, matches, cmd_id, ) if not matches: logger.error( "[cmd %s] MISMATCH %s 0x%04X: expected=%s actual=%s%s", cmd_id, cmd["asset_code"], reg, expected_i, actual_i, " (reg178 mask 0x%04X)" % REG178_VERIFY_MASK if reg == 178 else "", ) row_ac = await db.fetchrow( "SELECT attempt_count FROM ems.modbus_command WHERE id=$1", cmd_id ) attempts = int(row_ac["attempt_count"] or 0) if row_ac else 0 await notify_modbus_mismatch( db, site_id, cmd["asset_code"], reg, cmd["register_name"] or "", expected_i, actual_i, attempts, ) if attempts < 3: await db.execute( "UPDATE ems.modbus_command SET status='retrying' WHERE id=$1", cmd_id, ) await execute_modbus_commands([cmd_id], db) await verify_modbus_commands([cmd_id], db, site_id) else: if deye_reg_triggers_self_sustain_after_verify_exhaust(reg): logger.critical( "[cmd %s] 3 failed attempts, switching to SELF_SUSTAIN", cmd_id, ) await _switch_to_self_sustain( site_id, db, reason=( f"Modbus mismatch po 3 pokusech: {cmd['asset_code']} " f"reg 0x{reg:04X}" ), ) else: logger.warning( "[cmd %s] 3 failed verify attempts on non-critical reg 0x%04X " "(no mode change): %s", cmd_id, reg, cmd["asset_code"], ) return False if reg == 178 and actual_i != expected_i: logger.info( "[cmd %s] verified OK (reg178 masked): %s 0x%04X value_to_write=%s actual=%s", cmd_id, cmd["asset_code"], reg, expected_i, actual_i, ) else: logger.info( "[cmd %s] verified OK: %s 0x%04X=%s", cmd_id, cmd["asset_code"], reg, actual_i, ) return True cmds: list[asyncpg.Record] = [] for cmd_id in command_ids: cmd = await db.fetchrow( "SELECT * FROM ems.modbus_command WHERE id=$1", cmd_id ) if cmd is not None and cmd["status"] == "written": cmds.append(cmd) if not cmds: return True by_gw: dict[tuple[str, int, int], list[asyncpg.Record]] = defaultdict(list) for cmd in cmds: by_gw[ (cmd["device_host"], int(cmd["device_port"]), int(cmd["device_unit_id"])) ].append(cmd) all_ok = True for (host, port, unit), group in by_gw.items(): client = await get_modbus_client(host, port) clock_cmds = [c for c in group if int(c["register"]) in DEYE_CLOCK_REGS] rest = [c for c in group if int(c["register"]) not in DEYE_CLOCK_REGS] if clock_cmds: asset_id = int(clock_cmds[0]["asset_id"]) bundle = await _fetch_written_deye_clock_commands( site_id, asset_id, host, port, unit, db ) if not bundle: bundle = clock_cmds try: cvals = await client.read_holding_registers(62, 3, unit) except Exception as e: logger.error("verify clock read 62-64 failed: %s", e) all_ok = False else: if len(cvals) != 3: logger.error("verify clock read: expected 3 regs, got %s", len(cvals)) all_ok = False else: matched = await _verify_deye_clock_written_bundle( site_id, bundle, int(cvals[0]), int(cvals[1]), int(cvals[2]), db, ) if not matched: all_ok = False for run in _modbus_command_contiguous_runs(rest): start_reg = int(run[0]["register"]) n = len(run) try: values = await client.read_holding_registers(start_reg, n, unit) except Exception as e: logger.error( "verify batch read 0x%04X count=%s failed: %s", start_reg, n, e ) all_ok = False continue if len(values) != n: logger.error( "verify read 0x%04X: expected %s regs, got %s", start_reg, n, len(values), ) all_ok = False continue for cmd, actual in zip(run, values): matched = await _apply_verify_result( cmd, int(actual), client=client, unit=unit ) if not matched: all_ok = False return all_ok