From 0ed6f18e1a81731d3ae671d218c6192b5519d54d Mon Sep 17 00:00:00 2001 From: Dusan Vojacek Date: Thu, 11 Jun 2026 22:51:38 +0200 Subject: [PATCH] =?UTF-8?q?EV=20=C5=99=C3=ADzen=C3=AD:=20z=C3=A1pis=20Amps?= =?UTF-8?q?-to-use=20p=C5=99es=20journal=20+=20watchdog=20+=20okam=C5=BEit?= =?UTF-8?q?=C3=BD=20replan=20po=20p=C5=99=C3=ADjezdu?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bod 1 — write_ev_setpoints reálně (konec TODO stubu): - reg 15 (0=stop, 6–32 A) z plánu přes _current_limit_for_charger; plná journal pipeline (create_modbus_commands → execute, verify job 2 min generic) - watchdog reg 19=300 s + reg 20=8 A: výpadek EMS → wallbox po 5 min failsafe 8 A (auto se přes noc nabije); drop-unchanged → zapisuje se jen při změně - fn_modbus_last_verified_map: + p_asset_type (drop 2-arg; dosud hardcoded 'inverter' — pro chargery vracela {}) - verify: SELF_SUSTAIN fallback explicitně jen pro asset_type='inverter' — mismatch wallboxu nesmí degradovat režim celé site - journal register_name: mimo inverter platí jméno od volajícího Bod 2 — telemetry_collector: přechod available→connected spustí fire-and-forget run_rolling_replan(triggered_by=ev_arrival:) + export_setpoints přes BG pool — reakce na příjezd ~60 s místo až 15 min. Bod 3 (Tesla API SoC) čeká na developer credentials. Co-Authored-By: Claude Opus 4.8 (1M context) --- backend/services/control/modbus_journal.py | 32 ++++++-- backend/services/control/outputs.py | 73 ++++++++++++++++++- backend/services/control/verify.py | 7 +- backend/services/telemetry_collector.py | 36 +++++++++ .../R__002_fn_modbus_last_verified_map.sql | 12 ++- 5 files changed, 148 insertions(+), 12 deletions(-) diff --git a/backend/services/control/modbus_journal.py b/backend/services/control/modbus_journal.py index 683aa23..8407243 100644 --- a/backend/services/control/modbus_journal.py +++ b/backend/services/control/modbus_journal.py @@ -46,8 +46,12 @@ async def _fetch_written_deye_clock_commands( return list(rows) -async def _fetch_last_verified_inverter_registers( - site_id: int, inverter_asset_id: int, db: asyncpg.Connection +async def _fetch_last_verified_registers( + site_id: int, + asset_id: int, + db: asyncpg.Connection, + *, + asset_type: str = "inverter", ) -> dict[int, int]: """ Poslední hodnota na zařízení podle journalu (jen status verified). @@ -55,15 +59,25 @@ async def _fetch_last_verified_inverter_registers( """ raw = await db.fetchval( """ - select ems.fn_modbus_last_verified_map($1::int, $2::int) + select ems.fn_modbus_last_verified_map($1::int, $2::int, $3::text) """, site_id, - inverter_asset_id, + asset_id, + asset_type, ) data = raw if isinstance(raw, dict) else json.loads(raw) return {int(k): int(v) for k, v in data.items()} +async def _fetch_last_verified_inverter_registers( + site_id: int, inverter_asset_id: int, db: asyncpg.Connection +) -> dict[int, int]: + """Zpětně kompatibilní alias (Deye cesty).""" + return await _fetch_last_verified_registers( + site_id, inverter_asset_id, db, asset_type="inverter" + ) + + def _drop_registers_matching_last_verified( registers: list[tuple[int, str, int]], last_verified: dict[int, int], @@ -102,8 +116,14 @@ async def create_modbus_commands( Vrátí list command IDs. """ ids: list[int] = [] - for reg, _ignored_name, val in registers: - register_name = DEYE_REGISTER_NAMES.get(reg, f"reg_{reg}") + for reg, given_name, val in registers: + # Deye registry mají kanonická jména; pro ostatní zařízení (Teltonika…) + # platí jméno dodané volajícím. + register_name = ( + DEYE_REGISTER_NAMES.get(reg) + if asset_type == "inverter" + else None + ) or given_name or f"reg_{reg}" cmd_id = await db.fetchval( """ INSERT INTO ems.modbus_command diff --git a/backend/services/control/outputs.py b/backend/services/control/outputs.py index 9a5b130..a203736 100644 --- a/backend/services/control/outputs.py +++ b/backend/services/control/outputs.py @@ -13,6 +13,32 @@ from services.control.models import ControlSetpoints, OperatingModeInfo logger = logging.getLogger(__name__) +# Teltonika TeltoCharge – zápisové registry (oficiální protokol rev 0.5; +# docs/04-modules/modbus-registers-teltocharge.md). FC 16 přes journal. +TELTO_REG_AMPS_TO_USE = 15 # 0 = stop, 6–32 A +TELTO_REG_COMM_TIMEOUT_S = 19 # watchdog: bez komunikace → failsafe +TELTO_REG_FAILSAFE_CURRENT_A = 20 +#: Výpadek EMS: po 5 min bez zápisu wallbox přejde na failsafe proud — +#: auto se přes noc nabije i bez EMS (pomalu), místo aby stálo na 0 A. +TELTO_WATCHDOG_TIMEOUT_S = 300 +TELTO_WATCHDOG_FAILSAFE_A = 8 + + +def _telto_setpoint_registers(current_a: int) -> list[tuple[int, str, int]]: + """Registry pro jeden export tick: limit proudu + watchdog konfigurace. + + Watchdog (19/20) se posílá s každým tickem, ale journal drop-unchanged ho + po prvním verified zápisu přeskakuje — reálně se zapíše jednou. + """ + a = int(current_a) + if a < 6: + a = 0 + return [ + (TELTO_REG_AMPS_TO_USE, "telto_amps_to_use", min(a, 32)), + (TELTO_REG_COMM_TIMEOUT_S, "telto_comm_timeout_s", TELTO_WATCHDOG_TIMEOUT_S), + (TELTO_REG_FAILSAFE_CURRENT_A, "telto_failsafe_a", TELTO_WATCHDOG_FAILSAFE_A), + ] + def _current_limit_for_charger(charger_code: str, sp: ControlSetpoints) -> int: c = (charger_code or "").strip().lower() @@ -34,9 +60,16 @@ def _current_limit_for_charger(charger_code: str, sp: ControlSetpoints) -> int: async def write_ev_setpoints( site_id: int, setpoints: ControlSetpoints, db: asyncpg.Connection ) -> str: + from services.control.modbus_journal import ( + _drop_registers_matching_last_verified, + _fetch_last_verified_registers, + create_modbus_commands, + execute_modbus_commands, + ) + rows = await db.fetch( """ - SELECT ec.code, se.host, se.port, se.unit_id + SELECT ec.id AS asset_id, ec.code, se.host, se.port, se.unit_id FROM ems.asset_ev_charger ec JOIN ems.site_endpoint se ON se.id = ec.endpoint_id WHERE ec.site_id = $1 @@ -50,15 +83,49 @@ async def write_ev_setpoints( if not rows: return "OK EV: no schedulable chargers" + written = 0 for row in rows: code = row["code"] + asset_id = int(row["asset_id"]) + host = str(row["host"]) + port = int(row["port"] or 502) + unit_id = int(row["unit_id"] if row["unit_id"] is not None else 1) current_a = _current_limit_for_charger(code, setpoints) + + registers = _telto_setpoint_registers(current_a) + last_verified = await _fetch_last_verified_registers( + site_id, asset_id, db, asset_type="ev_charger" + ) + registers, skipped = _drop_registers_matching_last_verified( + registers, last_verified + ) + if not registers: + logger.debug("EV setpoint [%s]: beze změny (%s A)", code, current_a) + continue + + cmd_ids = await create_modbus_commands( + site_id, + None, + "ev_charger", + asset_id, + code, + host, + port, + unit_id, + registers, + db, + ) + ok = await execute_modbus_commands(cmd_ids, db) + written += 1 logger.info( - "EV setpoint [%s]: %sA (TODO: Modbus registers)", + "EV setpoint [%s]: %s A (regs %s%s) -> %s", code, current_a, + [r for r, _, _ in registers], + f", skip {skipped}" if skipped else "", + "written" if ok else "FAILED", ) - return f"OK EV: logged {len(rows)} charger(s) (Modbus TODO)" + return f"OK EV: {written}/{len(rows)} charger(s) written" async def write_heat_pump_setpoint( diff --git a/backend/services/control/verify.py b/backend/services/control/verify.py index 9591525..1944a2c 100644 --- a/backend/services/control/verify.py +++ b/backend/services/control/verify.py @@ -353,7 +353,12 @@ async def verify_modbus_commands( await execute_modbus_commands([cmd_id], db) await verify_modbus_commands([cmd_id], db, site_id) else: - if deye_reg_triggers_self_sustain_after_verify_exhaust(reg): + # SELF_SUSTAIN fallback je Deye politika — mismatch na jiném + # zařízení (EV wallbox…) nesmí degradovat režim celé lokality. + if ( + str(cmd["asset_type"]) == "inverter" + and deye_reg_triggers_self_sustain_after_verify_exhaust(reg) + ): logger.critical( "[cmd %s] 3 failed attempts, switching to SELF_SUSTAIN", cmd_id, diff --git a/backend/services/telemetry_collector.py b/backend/services/telemetry_collector.py index 588d180..c02e52f 100644 --- a/backend/services/telemetry_collector.py +++ b/backend/services/telemetry_collector.py @@ -13,6 +13,39 @@ from services.modbus_client import get_modbus_client logger = logging.getLogger(__name__) +#: Pool pro fire-and-forget akce mimo hlavní poll spojení (např. replan po +#: příjezdu EV). Nastavuje run_telemetry_loop_wrapper. +_BG_POOL: asyncpg.Pool | None = None + + +async def _on_ev_arrival(site_id: int, charger_code: str) -> None: + """Okamžitý replan + export po příjezdu EV (jinak by se čekalo až na */15). + + Deferred importy: planning_engine/control_exporter importují control balík, + který se importuje nezávisle — vyhýbáme se importnímu cyklu při startu. + """ + if _BG_POOL is None: + logger.warning("EV arrival: BG pool není k dispozici, čeká se na rolling tick") + return + try: + from services.control_exporter import export_setpoints + from services.planning_engine import run_rolling_replan + + async with _BG_POOL.acquire() as conn: + await run_rolling_replan( + site_id, conn, triggered_by=f"ev_arrival:{charger_code}" + ) + await export_setpoints(site_id, conn) + logger.info( + "EV arrival replan+export done (site=%s, charger=%s)", + site_id, + charger_code, + ) + except Exception: + logger.exception( + "EV arrival replan failed (site=%s, charger=%s)", site_id, charger_code + ) + # Deye SUN – holding registry (decimal adresa = přímo pro read_holding_registers) DEYE_REG_RUN_STATE = 500 DEYE_REG_BATT_CHARGE_TODAY = 514 @@ -254,6 +287,7 @@ async def poll_ev_chargers(site_id: int, db: asyncpg.Connection) -> None: ) if previous_status == "available" and current_status != "available": logger.info("EV arrival detected on charger %s", code) + asyncio.create_task(_on_ev_arrival(site_id, str(code))) elif previous_status != "available" and current_status == "available": logger.info("EV departure detected on charger %s", code) @@ -307,6 +341,8 @@ async def run_telemetry_loop(conn: asyncpg.Connection) -> float: async def run_telemetry_loop_wrapper(pool: asyncpg.Pool) -> None: """Background task: každá iterace získá spojení z poolu; neblokuje pool během sleep.""" + global _BG_POOL + _BG_POOL = pool while True: try: async with pool.acquire() as conn: diff --git a/db/routines/R__002_fn_modbus_last_verified_map.sql b/db/routines/R__002_fn_modbus_last_verified_map.sql index 3546fc4..81081ae 100644 --- a/db/routines/R__002_fn_modbus_last_verified_map.sql +++ b/db/routines/R__002_fn_modbus_last_verified_map.sql @@ -1,8 +1,13 @@ -- map register -> value_verified z modbus_command (poslední verified řádek per register) +-- Od 2026-06-11 generické přes p_asset_type (inverter / ev_charger / …); +-- starou 2-arg signaturu dropnout (default by způsobil ambiguitu). + +drop function if exists ems.fn_modbus_last_verified_map(int, int); create or replace function ems.fn_modbus_last_verified_map( p_site_id int, - p_asset_id int + p_asset_id int, + p_asset_type text default 'inverter' ) returns jsonb language sql @@ -18,7 +23,10 @@ as $fn$ v.value_verified from ems.vw_modbus_last_verified v where v.site_id = p_site_id - and v.asset_type = 'inverter' + and v.asset_type = p_asset_type and v.asset_id = p_asset_id ) t; $fn$; + +comment on function ems.fn_modbus_last_verified_map is +'Mapa register -> poslední verified hodnota z modbus_command pro dané zařízení (site, asset_type, asset_id). Slouží k drop-unchanged v control exporteru.';