diff --git a/backend/services/control/exporter_monolith.py b/backend/services/control/exporter_monolith.py index 41ed1c4..fe379d0 100644 --- a/backend/services/control/exporter_monolith.py +++ b/backend/services/control/exporter_monolith.py @@ -2,8 +2,6 @@ from __future__ import annotations -import asyncio -import json import logging import os from collections import defaultdict @@ -21,7 +19,7 @@ from services.control.deye_helpers import ( DEYE_CLOCK_RESYNC_INTERVAL_HOURS, DEYE_CLOCK_VERIFY_MAX_DELTA_SEC, # noqa: F401 - re-export for compatibility DEYE_CRITICAL_REGS_SELF_SUSTAIN, # noqa: F401 - re-export for compatibility - DEYE_REGISTER_NAMES, + DEYE_REGISTER_NAMES, # noqa: F401 - re-export for compatibility DEYE_TOU_INACTIVE_HHMM, DEYE_TOU_POWER_REGS, PRAGUE_TZ, @@ -49,6 +47,14 @@ from services.control.deye_helpers import ( watts_to_amps, ) from services.control.models import ControlSetpoints, InverterConfig, OperatingModeInfo +from services.control.modbus_journal import ( + _drop_registers_matching_last_verified, + _fetch_last_verified_inverter_registers, + _fetch_written_deye_clock_commands, + _modbus_command_contiguous_runs, + create_modbus_commands, + execute_modbus_commands, +) from services.control.repository import ( _fetch_max_charge_power_w, _fetch_operating_mode, @@ -77,238 +83,6 @@ from services.signal_service import enqueue_site_signals logger = logging.getLogger(__name__) -async def _fetch_written_deye_clock_commands( - site_id: int, - asset_id: int, - host: str, - port: int, - unit_id: int, - db: asyncpg.Connection, -) -> list[asyncpg.Record]: - """Všechny řádky journalu 62–64 ve stavu written pro daný invertor/endpoint.""" - rows = await db.fetch( - """ - SELECT * FROM ems.modbus_command - WHERE site_id = $1 - AND asset_type = 'inverter' - AND asset_id = $2 - AND device_host = $3 - AND device_port = $4 - AND device_unit_id = $5 - AND register IN (62, 63, 64) - AND status = 'written' - ORDER BY register - """, - site_id, - asset_id, - host, - port, - unit_id, - ) - return list(rows) - - -async def _fetch_last_verified_inverter_registers( - site_id: int, inverter_asset_id: int, db: asyncpg.Connection -) -> dict[int, int]: - """ - Poslední hodnota na zařízení podle journalu (jen status verified). - Slouží k přeskočení duplicitního zápisu stejné hodnoty. - """ - raw = await db.fetchval( - """ - select ems.fn_modbus_last_verified_map($1::int, $2::int) - """, - site_id, - inverter_asset_id, - ) - data = raw if isinstance(raw, dict) else json.loads(raw) - return {int(k): int(v) for k, v in data.items()} - - -def _drop_registers_matching_last_verified( - registers: list[tuple[int, str, int]], - last_verified: dict[int, int], -) -> tuple[list[tuple[int, str, int]], list[int]]: - """Vynechá položky s hodnotou shodnou s posledním ověřeným stavem; vrátí (nový seznam, vynechané reg).""" - out: list[tuple[int, str, int]] = [] - skipped: list[int] = [] - for reg, meta, val in registers: - lv = last_verified.get(int(reg)) - if lv is not None: - # reg178: porovnáváme jen masku bitů 4–5 (Deye si v dalších bitech drží vlastní stav). - if int(reg) == 178 and _deye_reg178_verify_match(int(val), int(lv)): - skipped.append(int(reg)) - continue - if int(lv) == int(val): - skipped.append(int(reg)) - continue - out.append((reg, meta, val)) - return out, skipped - - -async def create_modbus_commands( - site_id: int, - planning_run_id: int | None, - asset_type: str, - asset_id: int, - asset_code: str, - host: str, - port: int, - unit_id: int, - registers: list[tuple[int, str, int]], - db: asyncpg.Connection, - deye_physical_mode: str | None = None, -) -> list[int]: - """ - Vytvoří záznamy v modbus_command pro sadu zápisů. - Vrátí list command IDs. - Pro Deye se jméno registru bere z DEYE_REGISTER_NAMES (prostřední položka tuplu se ignoruje). - """ - ids: list[int] = [] - for reg, _ignored_name, val in registers: - register_name = DEYE_REGISTER_NAMES.get(reg, f"reg_{reg}") - cmd_id = await db.fetchval( - """ - INSERT INTO ems.modbus_command - (site_id, asset_type, asset_id, asset_code, - device_host, device_port, device_unit_id, - register, register_name, value_to_write, - planning_run_id, status, deye_physical_mode) - VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,'pending',$12) - RETURNING id - """, - site_id, - asset_type, - asset_id, - asset_code, - host, - port, - unit_id, - reg, - register_name, - val, - planning_run_id, - deye_physical_mode, - ) - if cmd_id is not None: - ids.append(int(cmd_id)) - return ids - - -def _modbus_command_contiguous_runs(cmds: list[asyncpg.Record]) -> list[list[asyncpg.Record]]: - """Seřadí podle adresy registru a rozdělí na souvislé úseky pro FC 0x10 / FC 3.""" - if not cmds: - return [] - sorted_cmds = sorted(cmds, key=lambda c: int(c["register"])) - runs: list[list[asyncpg.Record]] = [] - cur: list[asyncpg.Record] = [sorted_cmds[0]] - for c in sorted_cmds[1:]: - if int(c["register"]) == int(cur[-1]["register"]) + 1: - cur.append(c) - else: - runs.append(cur) - cur = [c] - runs.append(cur) - return runs - - -async def execute_modbus_commands( - command_ids: list[int], - db: asyncpg.Connection, -) -> bool: - """ - Zapíše příkazy z modbus_command do zařízení (FC 0x10 po souvislých blocích). - Aktualizuje status na 'written' nebo 'failed'. - Vrátí True pokud všechny příkazy uspěly. - """ - MAX_RETRIES = 3 - RETRY_DELAY = 0.5 - - rows: list[asyncpg.Record] = [] - for cmd_id in command_ids: - cmd = await db.fetchrow( - "SELECT * FROM ems.modbus_command WHERE id=$1", cmd_id - ) - if cmd is not None: - rows.append(cmd) - - if not rows: - return True - - by_gw: dict[tuple[str, int, int], list[asyncpg.Record]] = defaultdict(list) - for cmd in rows: - by_gw[ - (cmd["device_host"], int(cmd["device_port"]), int(cmd["device_unit_id"])) - ].append(cmd) - - all_ok = True - for (host, port, unit), group in by_gw.items(): - client = await get_modbus_client(host, port) - for run in _modbus_command_contiguous_runs(group): - start_reg = int(run[0]["register"]) - values = [int(c["value_to_write"]) for c in run] - ids_run = [int(c["id"]) for c in run] - for attempt in range(MAX_RETRIES): - try: - await client.write_registers(start_reg, values, unit) - for cmd, val in zip(run, values): - cid = int(cmd["id"]) - await db.execute( - """ - UPDATE ems.modbus_command - SET status='written', value_written=$1, written_at=now(), - attempt_count=attempt_count+1, error_msg=NULL - WHERE id=$2 - """, - val, - cid, - ) - logger.info( - "[cmd %s] %s 0x%04X=%s OK batch@%s (attempt %s)", - cid, - cmd["asset_code"], - int(cmd["register"]), - val, - start_reg, - attempt + 1, - ) - break - except Exception as e: - if attempt < MAX_RETRIES - 1: - logger.warning( - "Modbus batch write 0x%04X count=%s attempt %s failed: %s, retrying...", - start_reg, - len(values), - attempt + 1, - e, - ) - await asyncio.sleep(RETRY_DELAY) - await client.force_disconnect() - else: - for cmd in run: - await db.execute( - """ - UPDATE ems.modbus_command - SET status='failed', error_msg=$1, - attempt_count=attempt_count+1 - WHERE id=$2 - """, - str(e), - int(cmd["id"]), - ) - logger.error( - "Modbus batch 0x%04X count=%s all %s attempts failed: %s", - start_reg, - len(values), - MAX_RETRIES, - e, - ) - all_ok = False - - return all_ok - - async def _switch_to_self_sustain(site_id: int, db: asyncpg.Connection, reason: str) -> None: """Přepne lokalitu na SELF_SUSTAIN, zaloguje důvod a při změně pošle Discord.""" from services.notification_service import run_fn_set_mode_with_discord diff --git a/backend/services/control/modbus_journal.py b/backend/services/control/modbus_journal.py new file mode 100644 index 0000000..683aa23 --- /dev/null +++ b/backend/services/control/modbus_journal.py @@ -0,0 +1,243 @@ +"""Modbus command journal helpers pro control export.""" + +from __future__ import annotations + +import asyncio +import json +import logging +from collections import defaultdict + +import asyncpg + +from services.control.deye_helpers import DEYE_REGISTER_NAMES, _deye_reg178_verify_match +from services.modbus_client import get_modbus_client + +logger = logging.getLogger(__name__) + + +async def _fetch_written_deye_clock_commands( + site_id: int, + asset_id: int, + host: str, + port: int, + unit_id: int, + db: asyncpg.Connection, +) -> list[asyncpg.Record]: + """Všechny řádky journalu 62-64 ve stavu written pro daný invertor/endpoint.""" + rows = await db.fetch( + """ + SELECT * FROM ems.modbus_command + WHERE site_id = $1 + AND asset_type = 'inverter' + AND asset_id = $2 + AND device_host = $3 + AND device_port = $4 + AND device_unit_id = $5 + AND register IN (62, 63, 64) + AND status = 'written' + ORDER BY register + """, + site_id, + asset_id, + host, + port, + unit_id, + ) + return list(rows) + + +async def _fetch_last_verified_inverter_registers( + site_id: int, inverter_asset_id: int, db: asyncpg.Connection +) -> dict[int, int]: + """ + Poslední hodnota na zařízení podle journalu (jen status verified). + Slouží k přeskočení duplicitního zápisu stejné hodnoty. + """ + raw = await db.fetchval( + """ + select ems.fn_modbus_last_verified_map($1::int, $2::int) + """, + site_id, + inverter_asset_id, + ) + data = raw if isinstance(raw, dict) else json.loads(raw) + return {int(k): int(v) for k, v in data.items()} + + +def _drop_registers_matching_last_verified( + registers: list[tuple[int, str, int]], + last_verified: dict[int, int], +) -> tuple[list[tuple[int, str, int]], list[int]]: + """Vynechá položky s hodnotou shodnou s posledním ověřeným stavem.""" + out: list[tuple[int, str, int]] = [] + skipped: list[int] = [] + for reg, meta, val in registers: + lv = last_verified.get(int(reg)) + if lv is not None: + if int(reg) == 178 and _deye_reg178_verify_match(int(val), int(lv)): + skipped.append(int(reg)) + continue + if int(lv) == int(val): + skipped.append(int(reg)) + continue + out.append((reg, meta, val)) + return out, skipped + + +async def create_modbus_commands( + site_id: int, + planning_run_id: int | None, + asset_type: str, + asset_id: int, + asset_code: str, + host: str, + port: int, + unit_id: int, + registers: list[tuple[int, str, int]], + db: asyncpg.Connection, + deye_physical_mode: str | None = None, +) -> list[int]: + """ + Vytvoří záznamy v modbus_command pro sadu zápisů. + Vrátí list command IDs. + """ + ids: list[int] = [] + for reg, _ignored_name, val in registers: + register_name = DEYE_REGISTER_NAMES.get(reg, f"reg_{reg}") + cmd_id = await db.fetchval( + """ + INSERT INTO ems.modbus_command + (site_id, asset_type, asset_id, asset_code, + device_host, device_port, device_unit_id, + register, register_name, value_to_write, + planning_run_id, status, deye_physical_mode) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,'pending',$12) + RETURNING id + """, + site_id, + asset_type, + asset_id, + asset_code, + host, + port, + unit_id, + reg, + register_name, + val, + planning_run_id, + deye_physical_mode, + ) + if cmd_id is not None: + ids.append(int(cmd_id)) + return ids + + +def _modbus_command_contiguous_runs(cmds: list[asyncpg.Record]) -> list[list[asyncpg.Record]]: + """Seřadí podle adresy registru a rozdělí na souvislé úseky pro FC 0x10 / FC 3.""" + if not cmds: + return [] + sorted_cmds = sorted(cmds, key=lambda c: int(c["register"])) + runs: list[list[asyncpg.Record]] = [] + cur: list[asyncpg.Record] = [sorted_cmds[0]] + for c in sorted_cmds[1:]: + if int(c["register"]) == int(cur[-1]["register"]) + 1: + cur.append(c) + else: + runs.append(cur) + cur = [c] + runs.append(cur) + return runs + + +async def execute_modbus_commands( + command_ids: list[int], + db: asyncpg.Connection, +) -> bool: + """ + Zapíše příkazy z modbus_command do zařízení (FC 0x10 po souvislých blocích). + Aktualizuje status na 'written' nebo 'failed'. + """ + max_retries = 3 + retry_delay = 0.5 + + rows: list[asyncpg.Record] = [] + for cmd_id in command_ids: + cmd = await db.fetchrow( + "SELECT * FROM ems.modbus_command WHERE id=$1", cmd_id + ) + if cmd is not None: + rows.append(cmd) + + if not rows: + return True + + by_gw: dict[tuple[str, int, int], list[asyncpg.Record]] = defaultdict(list) + for cmd in rows: + by_gw[ + (cmd["device_host"], int(cmd["device_port"]), int(cmd["device_unit_id"])) + ].append(cmd) + + all_ok = True + for (host, port, unit), group in by_gw.items(): + client = await get_modbus_client(host, port) + for run in _modbus_command_contiguous_runs(group): + start_reg = int(run[0]["register"]) + values = [int(c["value_to_write"]) for c in run] + for attempt in range(max_retries): + try: + await client.write_registers(start_reg, values, unit) + for cmd, val in zip(run, values): + cid = int(cmd["id"]) + await db.execute( + """ + UPDATE ems.modbus_command + SET status='written', value_written=$1, written_at=now(), + attempt_count=attempt_count+1, error_msg=NULL + WHERE id=$2 + """, + val, + cid, + ) + logger.info( + "[cmd %s] %s 0x%04X=%s OK batch@%s (attempt %s)", + cid, + cmd["asset_code"], + int(cmd["register"]), + val, + start_reg, + attempt + 1, + ) + break + except Exception as e: + if attempt < max_retries - 1: + logger.warning( + "Modbus batch write 0x%04X count=%s attempt %s failed: %s, retrying...", + start_reg, + len(values), + attempt + 1, + e, + ) + await asyncio.sleep(retry_delay) + await client.force_disconnect() + else: + for cmd in run: + await db.execute( + """ + UPDATE ems.modbus_command + SET status='failed', error_msg=$1, + attempt_count=attempt_count+1 + WHERE id=$2 + """, + str(e), + int(cmd["id"]), + ) + logger.error( + "Modbus batch 0x%04X count=%s all %s attempts failed: %s", + start_reg, + len(values), + max_retries, + e, + ) + all_ok = False + + return all_ok