"""Modbus command journal helpers pro control export.""" from __future__ import annotations import asyncio import json import logging from collections import defaultdict import asyncpg from services.control.deye_helpers import DEYE_REGISTER_NAMES, _deye_reg178_verify_match from services.modbus_client import get_modbus_client logger = logging.getLogger(__name__) async def _fetch_written_deye_clock_commands( site_id: int, asset_id: int, host: str, port: int, unit_id: int, db: asyncpg.Connection, ) -> list[asyncpg.Record]: """Všechny řádky journalu 62-64 ve stavu written pro daný invertor/endpoint.""" rows = await db.fetch( """ SELECT * FROM ems.modbus_command WHERE site_id = $1 AND asset_type = 'inverter' AND asset_id = $2 AND device_host = $3 AND device_port = $4 AND device_unit_id = $5 AND register IN (62, 63, 64) AND status = 'written' ORDER BY register """, site_id, asset_id, host, port, unit_id, ) return list(rows) async def _fetch_last_verified_registers( site_id: int, asset_id: int, db: asyncpg.Connection, *, asset_type: str = "inverter", ) -> dict[int, int]: """ Poslední hodnota na zařízení podle journalu (jen status verified). Slouží k přeskočení duplicitního zápisu stejné hodnoty. """ raw = await db.fetchval( """ select ems.fn_modbus_last_verified_map($1::int, $2::int, $3::text) """, site_id, asset_id, asset_type, ) data = raw if isinstance(raw, dict) else json.loads(raw) return {int(k): int(v) for k, v in data.items()} async def _fetch_device_state_registers( site_id: int, asset_id: int, db: asyncpg.Connection, *, asset_type: str, ) -> dict[int, int]: """ Poslední známá hodnota na zařízení podle journalu — NEJNOVĚJŠÍ řádek per registr, hodnota jen pro status 'verified' nebo 'written' (zápis prošel, verify ještě nemusel doběhnout). Novější failed/mismatch => registr chybí => volající zapíše znovu (obnova konfigurace po výpadku zařízení). Pro write-on-change u EV wallboxů (EEPROM wear): na rozdíl od _fetch_last_verified_registers nevyžaduje úspěšný verify, takže se zápis neopakuje každý export tick, když verify čtení zaostává nebo selhává. """ raw = await db.fetchval( """ select ems.fn_modbus_device_state_map($1::int, $2::int, $3::text) """, site_id, asset_id, asset_type, ) data = raw if isinstance(raw, dict) else json.loads(raw) return {int(k): int(v) for k, v in data.items()} async def _fetch_last_verified_inverter_registers( site_id: int, inverter_asset_id: int, db: asyncpg.Connection ) -> dict[int, int]: """Zpětně kompatibilní alias (Deye cesty).""" return await _fetch_last_verified_registers( site_id, inverter_asset_id, db, asset_type="inverter" ) def _drop_registers_matching_last_verified( registers: list[tuple[int, str, int]], last_verified: dict[int, int], ) -> tuple[list[tuple[int, str, int]], list[int]]: """Vynechá položky s hodnotou shodnou s posledním ověřeným stavem.""" out: list[tuple[int, str, int]] = [] skipped: list[int] = [] for reg, meta, val in registers: lv = last_verified.get(int(reg)) if lv is not None: if int(reg) == 178 and _deye_reg178_verify_match(int(val), int(lv)): skipped.append(int(reg)) continue if int(lv) == int(val): skipped.append(int(reg)) continue out.append((reg, meta, val)) return out, skipped async def create_modbus_commands( site_id: int, planning_run_id: int | None, asset_type: str, asset_id: int, asset_code: str, host: str, port: int, unit_id: int, registers: list[tuple[int, str, int]], db: asyncpg.Connection, deye_physical_mode: str | None = None, ) -> list[int]: """ Vytvoří záznamy v modbus_command pro sadu zápisů. Vrátí list command IDs. """ ids: list[int] = [] for reg, given_name, val in registers: # Deye registry mají kanonická jména; pro ostatní zařízení (Teltonika…) # platí jméno dodané volajícím. register_name = ( DEYE_REGISTER_NAMES.get(reg) if asset_type == "inverter" else None ) or given_name or f"reg_{reg}" cmd_id = await db.fetchval( """ INSERT INTO ems.modbus_command (site_id, asset_type, asset_id, asset_code, device_host, device_port, device_unit_id, register, register_name, value_to_write, planning_run_id, status, deye_physical_mode) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,'pending',$12) RETURNING id """, site_id, asset_type, asset_id, asset_code, host, port, unit_id, reg, register_name, val, planning_run_id, deye_physical_mode, ) if cmd_id is not None: ids.append(int(cmd_id)) return ids def _modbus_command_contiguous_runs(cmds: list[asyncpg.Record]) -> list[list[asyncpg.Record]]: """Seřadí podle adresy registru a rozdělí na souvislé úseky pro FC 0x10 / FC 3.""" if not cmds: return [] sorted_cmds = sorted(cmds, key=lambda c: int(c["register"])) runs: list[list[asyncpg.Record]] = [] cur: list[asyncpg.Record] = [sorted_cmds[0]] for c in sorted_cmds[1:]: if int(c["register"]) == int(cur[-1]["register"]) + 1: cur.append(c) else: runs.append(cur) cur = [c] runs.append(cur) return runs def _modbus_error_text(e: BaseException) -> str: """Text chyby pro error_msg — nikdy prázdný (TimeoutError() apod. má str '').""" return str(e).strip() or repr(e) async def _mark_commands_failed( db: asyncpg.Connection, cmd_ids: list[int], error_msg: str ) -> None: for cid in cmd_ids: await db.execute( """ UPDATE ems.modbus_command SET status='failed', error_msg=$1, attempt_count=attempt_count+1 WHERE id=$2 """, error_msg, cid, ) async def execute_modbus_commands( command_ids: list[int], db: asyncpg.Connection, ) -> bool: """ Zapíše příkazy z modbus_command do zařízení (FC 0x10 po souvislých blocích). Aktualizuje status na 'written' nebo 'failed'. Invariant: žádný z předaných příkazů nesmí zůstat 'pending' — i při CancelledError / GatewayLockTimeout / chybě DB se zbylé řádky označí failed s neprázdným error_msg (safety net níže) a výjimka se propaguje. """ max_retries = 3 retry_delay = 0.5 rows: list[asyncpg.Record] = [] for cmd_id in command_ids: cmd = await db.fetchrow( "SELECT * FROM ems.modbus_command WHERE id=$1", cmd_id ) if cmd is not None: rows.append(cmd) if not rows: return True by_gw: dict[tuple[str, int, int], list[asyncpg.Record]] = defaultdict(list) for cmd in rows: by_gw[ (cmd["device_host"], int(cmd["device_port"]), int(cmd["device_unit_id"])) ].append(cmd) #: Ještě nerozhodnuté příkazy (pro safety net při výjimce mimo retry cyklus). unresolved: set[int] = {int(c["id"]) for c in rows} all_ok = True try: for (host, port, unit), group in by_gw.items(): client = await get_modbus_client(host, port) for run in _modbus_command_contiguous_runs(group): start_reg = int(run[0]["register"]) values = [int(c["value_to_write"]) for c in run] write_err: Exception | None = None attempts_used = 0 for attempt in range(max_retries): attempts_used = attempt + 1 try: await client.write_registers(start_reg, values, unit) write_err = None break except Exception as e: write_err = e if attempt < max_retries - 1: logger.warning( "Modbus batch write 0x%04X count=%s attempt %s failed: %s, retrying...", start_reg, len(values), attempt + 1, _modbus_error_text(e), ) await asyncio.sleep(retry_delay) try: await client.force_disconnect() except Exception as de: logger.warning( "Modbus force_disconnect %s:%s failed: %s", host, port, _modbus_error_text(de), ) if write_err is not None: err = _modbus_error_text(write_err) await _mark_commands_failed(db, [int(c["id"]) for c in run], err) for c in run: unresolved.discard(int(c["id"])) logger.error( "Modbus batch 0x%04X count=%s all %s attempts failed: %s", start_reg, len(values), max_retries, err, ) all_ok = False continue # Journal update mimo retry cyklus — chyba DB nesmí vyvolat # další zápis do zařízení; spadne do safety netu níže. for cmd, val in zip(run, values): cid = int(cmd["id"]) await db.execute( """ UPDATE ems.modbus_command SET status='written', value_written=$1, written_at=now(), attempt_count=attempt_count+1, error_msg=NULL WHERE id=$2 """, val, cid, ) unresolved.discard(cid) logger.info( "[cmd %s] %s 0x%04X=%s OK batch@%s (attempt %s)", cid, cmd["asset_code"], int(cmd["register"]), val, start_reg, attempts_used, ) except BaseException as e: # Safety net: CancelledError (shutdown / zrušený task), GatewayLockTimeout # propadlý mimo retry cyklus, chyba DB v success větvi, … — nic nesmí # zůstat 'pending'. Best effort: označit a výjimku propagovat dál. err = f"execute aborted: {_modbus_error_text(e)}" try: await _mark_commands_failed(db, sorted(unresolved), err) except Exception as me: logger.error( "Modbus journal: nelze označit %s příkazů failed (%s): %s", len(unresolved), err, _modbus_error_text(me), ) logger.error("execute_modbus_commands aborted: %s", err) raise return all_ok