"""Export plánovaných setpointů na Modbus (Deye, EV, TČ) a HTTP do Loxone.""" from __future__ import annotations import asyncio import json import logging import os from collections import defaultdict from dataclasses import dataclass from typing import Any from datetime import date, datetime, timedelta, timezone from zoneinfo import ZoneInfo import asyncpg import httpx from app.config import get_settings from services.modbus_client import get_modbus_client from services.signal_service import enqueue_site_signals logger = logging.getLogger(__name__) PRAGUE_TZ = ZoneInfo("Europe/Prague") # Hodiny Deye 62–64: po zápisu sekundy na zařízení dál běží → verify musí být toleranční. DEYE_CLOCK_VERIFY_MAX_DELTA_SEC = 120 # Řidší zápis: bez zápisu, pokud čas na invertoru neodbočí od Prahy víc než o tolik sekund… DEYE_CLOCK_DRIFT_OK_SEC = 60 # …a zároveň neuplynul tento interval od posledního syncu / potvrzení driftu. DEYE_CLOCK_RESYNC_INTERVAL_HOURS = 24 # Deye LV baterie: převod výkon → proud pro registry 108/109 (viz docs/04-modules/modbus-registers.md) BATT_VOLTAGE_V = 51.2 # Reg 143 ve SELL: min(|grid_setpoint_w|, …) nesmí klesnout pod tuto podlahu (W) — kvůli chování firmware, ne mapování režimu. REG143_SELL_CAP_MIN_W = 200 # Reg 178 – bitové pole: používáme bity 4–5 (peak shaving switch) a bity 0–1 (MI export cutoff). # Ostatní bity zachovat → read-modify-write. REG178_SELL = 0b00100000 # 32, grid peak shaving disable REG178_PASSIVE = 0b00110000 # 48, grid peak shaving enable (PASSIVE i CHARGE) # TOU reg 166+ ve PASSIVE při prioritě baterie: signál střídači „využij celý dostupný rozsah“, # ne provozní strop z DB (ten je pro LP / Wh – viz asset_battery.max_soc_percent). DEYE_TOU_SOC_PASSIVE_BATTERY_PRIORITY_PCT = 100 # Verify: jen bity 4–5 (horní byte layout v dokumentaci); ostatní bity mohou mít firmware / Loxone REG178_VERIFY_MASK = 0x0030 # Reg 178 bits 0–1: MI export cutoff (AC coupling / GEN). REG178_MI_EXPORT_MASK = 0x0003 REG178_MI_EXPORT_DISABLE = 0b10 REG178_MI_EXPORT_ENABLE = 0b11 REG178_VERIFY_MASK_COMBINED = REG178_VERIFY_MASK | REG178_MI_EXPORT_MASK # Po 3 neúspěšných verify pokusech → SELF_SUSTAIN jen u těchto registrech (bezpečnost / export). # 62–64 řeší toleranční bundle (nemění režim). 178 a TOU power W jsou „soft“ — jen log + Discord. DEYE_CRITICAL_REGS_SELF_SUSTAIN = frozenset({108, 109, 142, 143, 145}) # Výkonové řádky TOU (154 + slot_index 0…5) — firmware často přepíše na max W z max_charge/max_discharge A. DEYE_TOU_POWER_REGS = frozenset(range(154, 160)) # Deye LV: firmware často odmítne 351 A a drží 350 — horní strop pro zápis z DB. DEYE_LV_BATTERY_MAX_CHARGE_DISCHARGE_A = 350 def _deye_reg178_verify_match(expected_i: int, actual_i: int) -> bool: return (int(expected_i) & REG178_VERIFY_MASK_COMBINED) == ( int(actual_i) & REG178_VERIFY_MASK_COMBINED ) def deye_reg_triggers_self_sustain_after_verify_exhaust(reg: int) -> bool: """True = po 3× mismatch přepnout lokalitu do SELF_SUSTAIN (kritický registr).""" return int(reg) in DEYE_CRITICAL_REGS_SELF_SUSTAIN def _deye_tou_power_verify_match( expected_i: int, actual_i: int, inv: InverterConfig ) -> bool: """Firmware často clampne TOU power W na max z reg. 108/109 × 51.2 V — akceptovat jako OK.""" if int(actual_i) == int(expected_i): return True # 51.2 V — nesmí int(BATT_VOLTAGE_V)==51 (off-by-one vs. firmware 17920 W @ 350 A) max_w_charge = int(inv.max_charge_a * BATT_VOLTAGE_V) max_w_discharge = int(inv.max_discharge_a * BATT_VOLTAGE_V) a = int(actual_i) return a == max_w_charge or a == max_w_discharge def _deye_reg178_verify_with_double_read( expected_i: int, actual_first: int, actual_second: int | None ) -> tuple[bool, int]: """ Vrátí (shoda, hodnota_pro_journal). Druhé čtení použít jen když první neprojde maskou (RS485 / glitch). """ if _deye_reg178_verify_match(expected_i, actual_first): return True, actual_first if actual_second is not None and _deye_reg178_verify_match(expected_i, actual_second): return True, int(actual_second) return False, actual_first # Neaktivní TOU bloky (3–6): „konec dne“ — Deye často 23:59 (2359) neuloží a vrátí např. 2355, # verify pak hlásí mismatch. 23:55 je na zařízeních stabilní (viz HHMM jako desítkové číslo). DEYE_TOU_INACTIVE_HHMM = 2355 # Registry TOU řádků 3–6 (slot index 2…5): 150–153, 156–159, … — pro detekci skutečného zápisu po filtru „unchanged“. _DEYE_INACTIVE_TOU_REGISTERS: frozenset[int] = frozenset( [ 150, 151, 152, 153, 156, 157, 158, 159, 168, 169, 170, 171, 174, 175, 176, 177, ] ) # Systémový čas Deye — vždy toleranční verify jako celek 62–64 (reg 64 sám nesmí do striktní větve). DEYE_CLOCK_REGS: frozenset[int] = frozenset({62, 63, 64}) DEYE_REGISTER_NAMES: dict[int, str] = { 108: "max_charge_a (max nabíjecí proud baterie)", 109: "max_discharge_a (max vybíjecí proud baterie)", 141: "energy_mode (0, EMS nemění)", 142: "limit_control (0=selling first, 1=zero export to load, 2=zero export to CT)", 143: "export_limit_w (max export do sítě)", 145: "solar_sell (0=disabled, 1=enabled)", 340: "max_solar_power_w (strop DC PV A v W; součet nominal_power_wp řiditelných polí)", 178: "control_board_special_1 (bits0-1: MI export cutoff disable=2 enable=3; bits4-5 peak shaving 32/48)", 148: "time_point_1_time", 149: "time_point_2_time", 154: "time_point_1_power_w", 155: "time_point_2_power_w", 166: "time_point_1_soc_min_pct", 167: "time_point_2_soc_min_pct", 172: "time_point_1_grid_charge", 173: "time_point_2_grid_charge", 62: "system_time_year_month", 63: "system_time_day_hour", 64: "system_time_min_sec", } for _tp_i in range(6): _n = _tp_i + 1 DEYE_REGISTER_NAMES.setdefault(148 + _tp_i, f"time_point_{_n}_time") DEYE_REGISTER_NAMES.setdefault(154 + _tp_i, f"time_point_{_n}_power_w") DEYE_REGISTER_NAMES.setdefault(166 + _tp_i, f"time_point_{_n}_soc_min_pct") DEYE_REGISTER_NAMES.setdefault(172 + _tp_i, f"time_point_{_n}_grid_charge") def watts_to_amps(power_w: int | None, phases: int = 3, voltage: int = 230) -> int: if not power_w or power_w <= 0: return 0 return min(32, max(0, int(power_w / (phases * voltage)))) def battery_watts_to_amps(power_w: int, max_amps: int) -> int: """Proud z |výkonu| baterie; max_amps z DB (už COALESCE se stropy v SQL). int(|W|/51.2) — u kladných hodnot stejné jako floor bez importu math. """ derived = int(abs(power_w) / BATT_VOLTAGE_V) return min(max(0, max_amps), max(0, derived)) def current_slot_hhmm() -> int: """Začátek probíhajícího 15min slotu v Europe/Prague, formát HHMM (např. 1415).""" now = datetime.now(ZoneInfo("Europe/Prague")) slot_min = (now.minute // 15) * 15 return now.hour * 100 + slot_min def next_slot_hhmm() -> int: """Začátek příštího 15min slotu v Europe/Prague, formát HHMM (např. 1430).""" now = datetime.now(ZoneInfo("Europe/Prague")) minutes = now.minute slot_minutes = ((minutes // 15) + 1) * 15 if slot_minutes >= 60: next_hour = (now.hour + 1) % 24 next_min = 0 else: next_hour = now.hour next_min = slot_minutes return next_hour * 100 + next_min @dataclass class InverterConfig: id: int code: str host: str port: int unit_id: int max_export_power_w: int | None max_import_power_w: int | None no_export: bool max_battery_charge_w: int | None max_battery_discharge_w: int | None min_soc_percent: int | None reserve_soc_percent: int | None max_soc_percent: int | None usable_capacity_wh: int | None max_charge_a: int max_discharge_a: int deye_last_system_time_sync_minute: datetime | None = None deye_last_system_time_sync_at: datetime | None = None deye_last_tou_inactive_write_prague_date: date | None = None deye_tou_inactive_signature: str | None = None deye_zero_export_mode: int = 1 deye_gen_microinverter_cutoff_enabled: bool = False manufacturer: str = "" #: Součet nominal_power_wp controllable PV na invertoru; 0 = EMS nezapisuje reg 340. pv_a_cap_w: int = 0 def compute_pv_a_reg340_max_solar_w(cap_w: int, forecast_w: int, curtail_w: int) -> int: """Hodnota pro Deye reg 340 (max solar power, W) z capu a plánovaného curtailmentu pole A.""" if curtail_w <= 0: return int(cap_w) return max(0, min(int(cap_w), int(forecast_w) - int(curtail_w))) def _prague_minute_start_utc() -> datetime: """UTC okamžik odpovídající začátku aktuální kalendářní minuty v Europe/Prague.""" p = datetime.now(PRAGUE_TZ).replace(second=0, microsecond=0) return p.astimezone(timezone.utc) def _deye_registers_to_prague_datetime(r62: int, r63: int, r64: int) -> datetime | None: """Dekódování reg 62–64 (Deye system time v Europe/Prague).""" try: year = (int(r62) >> 8) + 2000 month = int(r62) & 0xFF day = int(r63) >> 8 hour = int(r63) & 0xFF minute = int(r64) >> 8 second = int(r64) & 0xFF if not (1 <= month <= 12 and 1 <= day <= 31 and 0 <= hour <= 23): return None if not (0 <= minute <= 59 and 0 <= second <= 59): return None return datetime(year, month, day, hour, minute, second, tzinfo=PRAGUE_TZ) except (ValueError, OverflowError): return None def _deye_clock_registers_verify_match( w62: int, w63: int, w64: int, a62: int, a63: int, a64: int, ) -> bool: w_dt = _deye_registers_to_prague_datetime(w62, w63, w64) a_dt = _deye_registers_to_prague_datetime(a62, a63, a64) if w_dt is None or a_dt is None: return False return abs((a_dt - w_dt).total_seconds()) <= DEYE_CLOCK_VERIFY_MAX_DELTA_SEC def _deye_should_skip_time_sync_after_read( inv: InverterConfig, r62: int, r63: int, r64: int, ) -> bool: """ True = nezařazovat zápis 62–64: drift je malý a od posledního úspěšného zápisu (FC 0x10 ACK) nebo tolerančního ověření neuplynulo 24h — sloupec deye_last_system_time_sync_at doplňuje write_inverter_setpoints po úspěšném zápisu batche obsahujícího 62–64 a znovu po úspěšném verify. """ dev = _deye_registers_to_prague_datetime(r62, r63, r64) if dev is None: return False wall = datetime.now(PRAGUE_TZ) drift = abs((wall - dev).total_seconds()) if drift > DEYE_CLOCK_DRIFT_OK_SEC: return False last_write = inv.deye_last_system_time_sync_at if last_write is None: return False if last_write.tzinfo is None: last_write = last_write.replace(tzinfo=timezone.utc) else: last_write = last_write.astimezone(timezone.utc) age = datetime.now(timezone.utc) - last_write if age >= timedelta(hours=DEYE_CLOCK_RESYNC_INTERVAL_HOURS): return False return True async def _fetch_written_deye_clock_commands( site_id: int, asset_id: int, host: str, port: int, unit_id: int, db: asyncpg.Connection, ) -> list[asyncpg.Record]: """Všechny řádky journalu 62–64 ve stavu written pro daný invertor/endpoint.""" rows = await db.fetch( """ SELECT * FROM ems.modbus_command WHERE site_id = $1 AND asset_type = 'inverter' AND asset_id = $2 AND device_host = $3 AND device_port = $4 AND device_unit_id = $5 AND register IN (62, 63, 64) AND status = 'written' ORDER BY register """, site_id, asset_id, host, port, unit_id, ) return list(rows) async def _fetch_last_verified_inverter_registers( site_id: int, inverter_asset_id: int, db: asyncpg.Connection ) -> dict[int, int]: """ Poslední hodnota na zařízení podle journalu (jen status verified). Slouží k přeskočení duplicitního zápisu stejné hodnoty. """ raw = await db.fetchval( """ select ems.fn_modbus_last_verified_map($1::int, $2::int) """, site_id, inverter_asset_id, ) data = raw if isinstance(raw, dict) else json.loads(raw) return {int(k): int(v) for k, v in data.items()} def _drop_registers_matching_last_verified( registers: list[tuple[int, str, int]], last_verified: dict[int, int], ) -> tuple[list[tuple[int, str, int]], list[int]]: """Vynechá položky s hodnotou shodnou s posledním ověřeným stavem; vrátí (nový seznam, vynechané reg).""" out: list[tuple[int, str, int]] = [] skipped: list[int] = [] for reg, meta, val in registers: lv = last_verified.get(int(reg)) if lv is not None: # reg178: porovnáváme jen masku bitů 4–5 (Deye si v dalších bitech drží vlastní stav). if int(reg) == 178 and _deye_reg178_verify_match(int(val), int(lv)): skipped.append(int(reg)) continue if int(lv) == int(val): skipped.append(int(reg)) continue out.append((reg, meta, val)) return out, skipped @dataclass class ControlSetpoints: battery_w: int | None grid_export_limit: int ev1_current_a: int ev2_current_a: int heat_pump_enable: bool grid_setpoint_w: int ev1_power_w: int ev2_power_w: int target_soc_pct: int | None = None #: Explicitní fyzický režim z plánu (PASSIVE/SELL/CHARGE). Pokud je vyplněn, má přednost před detekcí ze znamének. deye_physical_mode: str | None = None #: True = zákaz exportu (BLOCK_EXPORT) pro daný slot: např. při efektivní vykupní ceně < 0. export_ban: bool = False #: True = odpojit GEN port (MI export cutoff) v tomto slotu dle plánu (reg 178 bits0-1, 0-based). #: None/False = neodpojovat. deye_gen_cutoff_enabled: bool = False #: Efektivní vykupní cena slotu (Kč/kWh z plánu); pro TOU řízení priorit baterie vs. přetok effective_sell_price_czk_kwh: float | None = None #: True = reg 108/109 na 0 (PRESERVE – Deye baterii nepoužívá) lock_battery: bool = False #: Režim SELF_SUSTAIN: plný rozsah nabíjení/vybíjení na invertoru + zero-export (reg 142) a nízké TOU %. self_sustain_local_use: bool = False #: Deye reg 340 (max solar power, W). None = EMS reg 340 v tomto ticku neřeší (PRESERVE/SELF_SUSTAIN/CHARGE_CHEAP/…). pv_a_allowed_w: int | None = None @dataclass class OperatingModeInfo: mode_code: str battery_mode: str grid_mode: str ev_enabled: bool heat_pump_enabled_def: bool loxone_mode_value: int async def create_modbus_commands( site_id: int, planning_run_id: int | None, asset_type: str, asset_id: int, asset_code: str, host: str, port: int, unit_id: int, registers: list[tuple[int, str, int]], db: asyncpg.Connection, deye_physical_mode: str | None = None, ) -> list[int]: """ Vytvoří záznamy v modbus_command pro sadu zápisů. Vrátí list command IDs. Pro Deye se jméno registru bere z DEYE_REGISTER_NAMES (prostřední položka tuplu se ignoruje). """ ids: list[int] = [] for reg, _ignored_name, val in registers: register_name = DEYE_REGISTER_NAMES.get(reg, f"reg_{reg}") cmd_id = await db.fetchval( """ INSERT INTO ems.modbus_command (site_id, asset_type, asset_id, asset_code, device_host, device_port, device_unit_id, register, register_name, value_to_write, planning_run_id, status, deye_physical_mode) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,'pending',$12) RETURNING id """, site_id, asset_type, asset_id, asset_code, host, port, unit_id, reg, register_name, val, planning_run_id, deye_physical_mode, ) if cmd_id is not None: ids.append(int(cmd_id)) return ids def _modbus_command_contiguous_runs(cmds: list[asyncpg.Record]) -> list[list[asyncpg.Record]]: """Seřadí podle adresy registru a rozdělí na souvislé úseky pro FC 0x10 / FC 3.""" if not cmds: return [] sorted_cmds = sorted(cmds, key=lambda c: int(c["register"])) runs: list[list[asyncpg.Record]] = [] cur: list[asyncpg.Record] = [sorted_cmds[0]] for c in sorted_cmds[1:]: if int(c["register"]) == int(cur[-1]["register"]) + 1: cur.append(c) else: runs.append(cur) cur = [c] runs.append(cur) return runs async def execute_modbus_commands( command_ids: list[int], db: asyncpg.Connection, ) -> bool: """ Zapíše příkazy z modbus_command do zařízení (FC 0x10 po souvislých blocích). Aktualizuje status na 'written' nebo 'failed'. Vrátí True pokud všechny příkazy uspěly. """ MAX_RETRIES = 3 RETRY_DELAY = 0.5 rows: list[asyncpg.Record] = [] for cmd_id in command_ids: cmd = await db.fetchrow( "SELECT * FROM ems.modbus_command WHERE id=$1", cmd_id ) if cmd is not None: rows.append(cmd) if not rows: return True by_gw: dict[tuple[str, int, int], list[asyncpg.Record]] = defaultdict(list) for cmd in rows: by_gw[ (cmd["device_host"], int(cmd["device_port"]), int(cmd["device_unit_id"])) ].append(cmd) all_ok = True for (host, port, unit), group in by_gw.items(): client = await get_modbus_client(host, port) for run in _modbus_command_contiguous_runs(group): start_reg = int(run[0]["register"]) values = [int(c["value_to_write"]) for c in run] ids_run = [int(c["id"]) for c in run] for attempt in range(MAX_RETRIES): try: await client.write_registers(start_reg, values, unit) for cmd, val in zip(run, values): cid = int(cmd["id"]) await db.execute( """ UPDATE ems.modbus_command SET status='written', value_written=$1, written_at=now(), attempt_count=attempt_count+1, error_msg=NULL WHERE id=$2 """, val, cid, ) logger.info( "[cmd %s] %s 0x%04X=%s OK batch@%s (attempt %s)", cid, cmd["asset_code"], int(cmd["register"]), val, start_reg, attempt + 1, ) break except Exception as e: if attempt < MAX_RETRIES - 1: logger.warning( "Modbus batch write 0x%04X count=%s attempt %s failed: %s, retrying...", start_reg, len(values), attempt + 1, e, ) await asyncio.sleep(RETRY_DELAY) await client.force_disconnect() else: for cmd in run: await db.execute( """ UPDATE ems.modbus_command SET status='failed', error_msg=$1, attempt_count=attempt_count+1 WHERE id=$2 """, str(e), int(cmd["id"]), ) logger.error( "Modbus batch 0x%04X count=%s all %s attempts failed: %s", start_reg, len(values), MAX_RETRIES, e, ) all_ok = False return all_ok async def _switch_to_self_sustain(site_id: int, db: asyncpg.Connection, reason: str) -> None: """Přepne lokalitu na SELF_SUSTAIN, zaloguje důvod a při změně pošle Discord.""" from services.notification_service import run_fn_set_mode_with_discord await run_fn_set_mode_with_discord( db, site_id, "SELF_SUSTAIN", "system:mismatch", None, reason, ) logger.critical("Site %s switched to SELF_SUSTAIN: %s", site_id, reason) def _modbus_cmd_register(cmd: Any) -> int: """asyncpg.Record má __getitem__; objekty s atributem .register též (testy).""" try: return int(cmd["register"]) except (KeyError, TypeError): return int(cmd.register) def _deye_expected_clock_triplet_for_verify( bundle: list[asyncpg.Record], last_verified: dict[int, int], a62: int, a63: int, a64: int, ) -> tuple[int, int, int]: """ Sestaví očekávané (w62,w63,w64) pro toleranční verify. Chybějící registry doplní poslední verified nebo aktuálním přečtením ze zařízení (aby osiřelý zápis např. jen 64 nešel do striktního porovnání reg64). """ by_reg = {_modbus_cmd_register(c): c for c in bundle} def _vtw(c: Any) -> int: try: return int(c["value_to_write"]) except (KeyError, TypeError): return int(c.value_to_write) w62 = _vtw(by_reg[62]) if 62 in by_reg else last_verified.get(62, a62) w63 = _vtw(by_reg[63]) if 63 in by_reg else last_verified.get(63, a63) w64 = _vtw(by_reg[64]) if 64 in by_reg else last_verified.get(64, a64) return (int(w62), int(w63), int(w64)) async def _verify_deye_clock_written_bundle( site_id: int, bundle: list[asyncpg.Record], a62: int, a63: int, a64: int, db: asyncpg.Connection, ) -> bool: """ Toleranční ověření pro jeden až tři řádky journalu 62–64 ve stavu written. Při mismatch retry společně; bez přepnutí do SELF_SUSTAIN po 3 pokusech. """ from services.notification_service import ( notify_modbus_clock_verify_exhausted, notify_modbus_mismatch, ) cmds_s = sorted(bundle, key=_modbus_cmd_register) try: asset_id = int(cmds_s[0]["asset_id"]) except (KeyError, TypeError): asset_id = int(cmds_s[0].asset_id) last_v = await _fetch_last_verified_inverter_registers(site_id, asset_id, db) w62, w63, w64 = _deye_expected_clock_triplet_for_verify(bundle, last_v, a62, a63, a64) clock_ok = _deye_clock_registers_verify_match(w62, w63, w64, a62, a63, a64) actual_by_reg = {62: a62, 63: a63, 64: a64} for cmd in cmds_s: try: cid = int(cmd["id"]) except (KeyError, TypeError): cid = int(cmd.id) r = _modbus_cmd_register(cmd) await db.execute( """ UPDATE ems.modbus_command SET value_verified=$1::int, verified_at=now(), status=CASE WHEN $2::boolean THEN 'verified' ELSE 'mismatch' END WHERE id=$3::int """, actual_by_reg[r], clock_ok, cid, ) if clock_ok: await db.execute( """ UPDATE ems.asset_inverter SET deye_last_system_time_sync_minute = $1, deye_last_system_time_sync_at = now() WHERE id = $2 """, _prague_minute_start_utc(), asset_id, ) for cmd in cmds_s: try: cid_l = int(cmd["id"]) except (KeyError, TypeError): cid_l = int(cmd.id) try: code_l = str(cmd["asset_code"]) except (KeyError, TypeError): code_l = str(cmd.asset_code) rr = _modbus_cmd_register(cmd) logger.info( "[cmd %s] verified OK (clock tolerant): %s 0x%04X=%s", cid_l, code_l, rr, actual_by_reg[rr], ) return True cmd0 = cmds_s[0] try: ac0 = str(cmd0["asset_code"]) except (KeyError, TypeError): ac0 = str(cmd0.asset_code) logger.error( "[cmd clock] MISMATCH %s 62–64: written=(%s,%s,%s) actual=(%s,%s,%s)", ac0, w62, w63, w64, a62, a63, a64, ) attempts = 0 for cmd in cmds_s: try: cid_q = int(cmd["id"]) except (KeyError, TypeError): cid_q = int(cmd.id) row_ac = await db.fetchrow( "SELECT attempt_count FROM ems.modbus_command WHERE id=$1", cid_q ) ac = int(row_ac["attempt_count"] or 0) if row_ac else 0 attempts = max(attempts, ac) await notify_modbus_mismatch( db, site_id, ac0, 62, "system_time_62_64", w62, a62, attempts, ) ids_ordered = [] for c in cmds_s: try: ids_ordered.append(int(c["id"])) except (KeyError, TypeError): ids_ordered.append(int(c.id)) if attempts < 3: for cid in ids_ordered: await db.execute( "UPDATE ems.modbus_command SET status='retrying' WHERE id=$1", cid, ) await execute_modbus_commands(ids_ordered, db) await verify_modbus_commands(ids_ordered, db, site_id) else: logger.critical( "[cmd clock] 3 failed verify attempts (62–64); režim se nemění automaticky" ) site = await db.fetchrow("SELECT code FROM ems.site WHERE id=$1", site_id) await notify_modbus_clock_verify_exhausted( db, site_id, site["code"] if site else str(site_id), ac0, (w62, w63, w64), (a62, a63, a64), ) return False async def verify_modbus_commands( command_ids: list[int], db: asyncpg.Connection, site_id: int, ) -> bool: """ Přečte registry zpět (FC 3 po souvislých blocích) a porovná s value_to_write. Při mismatch: retry (až 3×). Po vyčerpání pokusů u kritických registrů (108, 109, 142, 143, 145) → SELF_SUSTAIN + Discord; u „soft“ (178, TOU power W) jen log + Discord, režim se nemění. """ from services.notification_service import notify_modbus_mismatch inv_cfg = await _load_inverter_config(site_id, db) async def _apply_verify_result( cmd: asyncpg.Record, actual_i: int, *, client: Any, unit: int, ) -> bool: """Vrátí True při shodě, False při mismatch (a obslouží retry / SELF_SUSTAIN).""" reg = int(cmd["register"]) cmd_id = int(cmd["id"]) if reg in DEYE_CLOCK_REGS: asset_id = int(cmd["asset_id"]) host = str(cmd["device_host"]) port_i = int(cmd["device_port"]) uid = int(cmd["device_unit_id"]) bundle = await _fetch_written_deye_clock_commands( site_id, asset_id, host, port_i, uid, db ) if not bundle: bundle = [cmd] try: cvals = await client.read_holding_registers(62, 3, uid) except Exception as e: logger.error( "verify clock guard read 62–64 failed (reg 0x%04X): %s", reg, e ) return False if len(cvals) != 3: logger.error( "verify clock guard: expected 3 regs, got %s", len(cvals) ) return False logger.warning( "Clock register 0x%04X reached strict verify path; using tolerant 62–64 bundle", reg, ) return await _verify_deye_clock_written_bundle( site_id, bundle, int(cvals[0]), int(cvals[1]), int(cvals[2]), db, ) expected_i = int(cmd["value_to_write"]) matches = actual_i == expected_i if reg == 178: first_178 = int(actual_i) second_178: int | None = None if not _deye_reg178_verify_match(expected_i, first_178): try: r178 = await client.read_holding_registers(178, 1, unit) if r178 and len(r178) >= 1: second_178 = int(r178[0]) except Exception as e: logger.warning( "[cmd %s] reg178 double-read failed: %s", cmd_id, e ) matches, actual_i = _deye_reg178_verify_with_double_read( expected_i, first_178, second_178 ) if ( matches and second_178 is not None and not _deye_reg178_verify_match(expected_i, first_178) ): logger.info( "[cmd %s] reg178 double-read recovered: first=%s second=%s", cmd_id, first_178, second_178, ) if not matches and reg in DEYE_TOU_POWER_REGS and inv_cfg is not None: matches = _deye_tou_power_verify_match(expected_i, actual_i, inv_cfg) await db.execute( """ UPDATE ems.modbus_command SET value_verified=$1::int, verified_at=now(), status=CASE WHEN $2::boolean THEN 'verified' ELSE 'mismatch' END WHERE id=$3::int """, actual_i, matches, cmd_id, ) if not matches: logger.error( "[cmd %s] MISMATCH %s 0x%04X: expected=%s actual=%s%s", cmd_id, cmd["asset_code"], reg, expected_i, actual_i, ( " (reg178 mask 0x%04X)" % REG178_VERIFY_MASK if reg == 178 else "" ), ) row_ac = await db.fetchrow( "SELECT attempt_count FROM ems.modbus_command WHERE id=$1", cmd_id ) attempts = int(row_ac["attempt_count"] or 0) if row_ac else 0 await notify_modbus_mismatch( db, site_id, cmd["asset_code"], reg, cmd["register_name"] or "", expected_i, actual_i, attempts, ) if attempts < 3: await db.execute( "UPDATE ems.modbus_command SET status='retrying' WHERE id=$1", cmd_id, ) await execute_modbus_commands([cmd_id], db) await verify_modbus_commands([cmd_id], db, site_id) else: if deye_reg_triggers_self_sustain_after_verify_exhaust(reg): logger.critical( "[cmd %s] 3 failed attempts, switching to SELF_SUSTAIN", cmd_id, ) await _switch_to_self_sustain( site_id, db, reason=( f"Modbus mismatch po 3 pokusech: {cmd['asset_code']} " f"reg 0x{reg:04X}" ), ) else: logger.warning( "[cmd %s] 3 failed verify attempts on non-critical reg 0x%04X " "(no mode change): %s", cmd_id, reg, cmd["asset_code"], ) return False if reg == 178 and actual_i != expected_i: logger.info( "[cmd %s] verified OK (reg178 masked): %s 0x%04X value_to_write=%s actual=%s", cmd_id, cmd["asset_code"], reg, expected_i, actual_i, ) else: logger.info( "[cmd %s] verified OK: %s 0x%04X=%s", cmd_id, cmd["asset_code"], reg, actual_i, ) return True cmds: list[asyncpg.Record] = [] for cmd_id in command_ids: cmd = await db.fetchrow( "SELECT * FROM ems.modbus_command WHERE id=$1", cmd_id ) if cmd is not None and cmd["status"] == "written": cmds.append(cmd) if not cmds: return True by_gw: dict[tuple[str, int, int], list[asyncpg.Record]] = defaultdict(list) for cmd in cmds: by_gw[ (cmd["device_host"], int(cmd["device_port"]), int(cmd["device_unit_id"])) ].append(cmd) all_ok = True for (host, port, unit), group in by_gw.items(): client = await get_modbus_client(host, port) clock_cmds = [c for c in group if int(c["register"]) in DEYE_CLOCK_REGS] rest = [c for c in group if int(c["register"]) not in DEYE_CLOCK_REGS] if clock_cmds: asset_id = int(clock_cmds[0]["asset_id"]) bundle = await _fetch_written_deye_clock_commands( site_id, asset_id, host, port, unit, db ) if not bundle: bundle = clock_cmds try: cvals = await client.read_holding_registers(62, 3, unit) except Exception as e: logger.error("verify clock read 62–64 failed: %s", e) all_ok = False else: if len(cvals) != 3: logger.error( "verify clock read: expected 3 regs, got %s", len(cvals) ) all_ok = False else: matched = await _verify_deye_clock_written_bundle( site_id, bundle, int(cvals[0]), int(cvals[1]), int(cvals[2]), db, ) if not matched: all_ok = False for run in _modbus_command_contiguous_runs(rest): start_reg = int(run[0]["register"]) n = len(run) try: values = await client.read_holding_registers(start_reg, n, unit) except Exception as e: logger.error( "verify batch read 0x%04X count=%s failed: %s", start_reg, n, e ) all_ok = False continue if len(values) != n: logger.error( "verify read 0x%04X: expected %s regs, got %s", start_reg, n, len(values), ) all_ok = False continue for cmd, actual in zip(run, values): matched = await _apply_verify_result( cmd, int(actual), client=client, unit=unit ) if not matched: all_ok = False return all_ok async def _fetch_operating_mode(site_id: int, db: asyncpg.Connection) -> OperatingModeInfo | None: sql = """ SELECT som.mode_code, omd.battery_mode, omd.grid_mode, omd.ev_enabled, omd.heat_pump_enabled, omd.loxone_mode_value, som.valid_until FROM ems.site_operating_mode som JOIN ems.operating_mode_def omd ON omd.code = som.mode_code WHERE som.site_id = $1 """ row = await db.fetchrow(sql, site_id) if row is None: return None vu = row["valid_until"] if vu is not None: now_utc = datetime.now(timezone.utc) if vu.tzinfo is None: vu = vu.replace(tzinfo=timezone.utc) if vu <= now_utc: exp_rows = await db.fetch("SELECT * FROM ems.fn_expire_modes()") from services.notification_service import notify_operating_mode_changed for er in exp_rows: await notify_operating_mode_changed( str(er["site_code"]), str(er["old_mode"]), str(er["new_mode"]), "system:expiry", "Automatické vypršení dočasného režimu", ) row = await db.fetchrow(sql, site_id) if row is None: return None return OperatingModeInfo( mode_code=row["mode_code"], battery_mode=row["battery_mode"], grid_mode=row["grid_mode"], ev_enabled=bool(row["ev_enabled"]), heat_pump_enabled_def=bool(row["heat_pump_enabled"]), loxone_mode_value=int(row["loxone_mode_value"]), ) async def _get_current_soc(site_id: int, db: asyncpg.Connection) -> int: soc = await db.fetchval( """ SELECT battery_soc_percent FROM ems.telemetry_inverter WHERE site_id = $1 AND battery_soc_percent IS NOT NULL ORDER BY measured_at DESC LIMIT 1 """, site_id, ) return int(soc) if soc is not None else 50 async def _load_inverter_config( site_id: int, db: asyncpg.Connection ) -> InverterConfig | None: row = await db.fetchrow( """ SELECT ai.id, ai.code, coalesce(ai.manufacturer, '') AS manufacturer, coalesce(ems.fn_inverter_pv_a_max_w(ai.id), 0) AS pv_a_cap_w, se.host, se.port, se.unit_id, sgc.max_export_power_w, sgc.max_import_power_w, sgc.no_export, ai.max_battery_charge_w, ai.max_battery_discharge_w, ab.min_soc_percent, ab.reserve_soc_percent, ab.max_soc_percent, ab.usable_capacity_wh, ai.deye_last_system_time_sync_minute, ai.deye_last_system_time_sync_at, ai.deye_last_tou_inactive_write_prague_date, ai.deye_tou_inactive_signature, COALESCE(ai.deye_zero_export_mode, 1) AS deye_zero_export_mode, COALESCE(ai.deye_gen_microinverter_cutoff_enabled, false) AS deye_gen_microinverter_cutoff_enabled, COALESCE( ai.deye_register_max_charge_a, FLOOR( LEAST( COALESCE(ab.bms_max_charge_w, ai.max_battery_charge_w), ai.max_battery_charge_w )::numeric / 51.2 )::int ) AS max_charge_a, COALESCE( ai.deye_register_max_discharge_a, FLOOR( LEAST( COALESCE(ab.bms_max_discharge_w, ai.max_battery_discharge_w), ai.max_battery_discharge_w )::numeric / 51.2 )::int ) AS max_discharge_a FROM ems.asset_inverter ai JOIN ems.site_endpoint se ON se.id = ai.endpoint_id JOIN ems.asset_battery ab ON ab.inverter_id = ai.id LEFT JOIN ems.site_grid_connection sgc ON sgc.site_id = ai.site_id WHERE ai.site_id = $1 AND ai.active = true AND ai.controllable = true AND se.enabled = true AND se.endpoint_type = 'modbus_tcp' ORDER BY ai.id LIMIT 1 """, site_id, ) if row is None: return None mc = row["max_charge_a"] md = row["max_discharge_a"] max_charge_a = int(mc) if mc is not None else 0 max_discharge_a = int(md) if md is not None else 0 # Firmware Deye často drží max 350 A — vyšší hodnota z DB → mismatch 351 vs 350. max_charge_a = min(max_charge_a, DEYE_LV_BATTERY_MAX_CHARGE_DISCHARGE_A) max_discharge_a = min(max_discharge_a, DEYE_LV_BATTERY_MAX_CHARGE_DISCHARGE_A) port = int(row["port"] or 502) uid = int(row["unit_id"] if row["unit_id"] is not None else 1) return InverterConfig( id=int(row["id"]), code=row["code"], host=row["host"], port=port, unit_id=uid, max_export_power_w=int(row["max_export_power_w"]) if row["max_export_power_w"] is not None else None, max_import_power_w=int(row["max_import_power_w"]) if row["max_import_power_w"] is not None else None, no_export=bool(row["no_export"] or False), max_battery_charge_w=int(row["max_battery_charge_w"]) if row["max_battery_charge_w"] is not None else None, max_battery_discharge_w=int(row["max_battery_discharge_w"]) if row["max_battery_discharge_w"] is not None else None, min_soc_percent=int(round(float(row["min_soc_percent"]))) if row["min_soc_percent"] is not None else None, reserve_soc_percent=int(row["reserve_soc_percent"]) if row["reserve_soc_percent"] is not None else None, max_soc_percent=int(row["max_soc_percent"]) if row["max_soc_percent"] is not None else None, usable_capacity_wh=int(row["usable_capacity_wh"]) if row["usable_capacity_wh"] is not None else None, max_charge_a=max_charge_a, max_discharge_a=max_discharge_a, deye_last_system_time_sync_minute=row["deye_last_system_time_sync_minute"], deye_last_system_time_sync_at=row["deye_last_system_time_sync_at"], deye_last_tou_inactive_write_prague_date=row[ "deye_last_tou_inactive_write_prague_date" ], deye_tou_inactive_signature=row["deye_tou_inactive_signature"], deye_zero_export_mode=int(row["deye_zero_export_mode"]), deye_gen_microinverter_cutoff_enabled=bool(row["deye_gen_microinverter_cutoff_enabled"] or False), manufacturer=str(row["manufacturer"] or ""), pv_a_cap_w=int(row["pv_a_cap_w"] or 0), ) def _deye_system_time_register_rows() -> tuple[datetime, list[tuple[int, str, int]]]: """Hodnoty pro reg 62–64 (Europe/Prague); sekundy v reg 64 = 0 (stabilnější zápis).""" now = datetime.now(PRAGUE_TZ).replace(second=0, microsecond=0) reg62 = ((now.year - 2000) << 8) | now.month reg63 = (now.day << 8) | now.hour reg64 = (now.minute << 8) | 0 rows = [ (62, "", reg62), (63, "", reg63), (64, "", reg64), ] return now, rows def _deye_time_point_rows( slot_index: int, time_hhmm: int, power_w: int, soc_pct: int, grid_charge: bool, ) -> list[tuple[int, str, int]]: g = 1 if grid_charge else 0 return [ (148 + slot_index, "", time_hhmm), (154 + slot_index, "", power_w), (166 + slot_index, "", soc_pct), (172 + slot_index, "", g), ] async def _fetch_plan_row_for_slot_offset( site_id: int, db: asyncpg.Connection, slot_offset: int ) -> asyncpg.Record | None: """Řádek plánu pro slot z ems.fn_planning_interval_at_offset (jsonb → Record-like dict).""" raw = await db.fetchval( """ select ems.fn_planning_interval_at_offset($1::int, $2::int) """, site_id, slot_offset, ) if raw is None: return None data = raw if isinstance(raw, dict) else json.loads(raw) if not data: return None return _DictRecord(data) async def _fetch_max_charge_power_w(site_id: int, db: asyncpg.Connection) -> int: v = await db.fetchval( "select ems.fn_planning_max_effective_charge_w($1::int)", site_id, ) return int(v or 0) class _DictRecord: """Minimální asyncpg Record kompatibilita pro dict z jsonb.""" __slots__ = ("_d",) def __init__(self, d: dict[str, Any]) -> None: self._d = d def __getitem__(self, k: str) -> Any: return self._d[k] def get(self, k: str, default: Any = None) -> Any: return self._d.get(k, default) def __contains__(self, k: str) -> bool: return k in self._d def _build_setpoints( mode: OperatingModeInfo, pi: asyncpg.Record | None, *, pv_a_cap_w: int = 0, inverter_manufacturer: str = "", ) -> ControlSetpoints | None: code = mode.mode_code if code == "MANUAL": return None if code == "AUTO": if pi is None: return None grid_sp = int(pi["grid_setpoint_w"] or 0) ev1_w = int(pi["ev1_setpoint_w"] or 0) if "ev1_setpoint_w" in pi else 0 ev2_w = int(pi["ev2_setpoint_w"] or 0) if "ev2_setpoint_w" in pi else 0 hp_en = bool(pi["heat_pump_enabled"]) tgt = pi["battery_soc_target_pct"] target_soc = int(round(float(tgt))) if tgt is not None else None pm_raw = pi.get("deye_physical_mode") pm: str | None = str(pm_raw).strip().upper() if pm_raw is not None else None sell_raw = pi.get("effective_sell_price") sell_f: float | None = float(sell_raw) if sell_raw is not None else None # Záporný výkup sám o sobě neblokuje export, pokud plán export explicitně žádá (soulad s LP). export_ban = sell_f is not None and float(sell_f) < 0 and grid_sp >= 0 gen_cutoff_raw = pi.get("deye_gen_cutoff_enabled") gen_cutoff = bool(gen_cutoff_raw) if gen_cutoff_raw is not None else False pv_a_allowed: int | None = None if (inverter_manufacturer or "").strip().lower() == "deye" and int(pv_a_cap_w) > 0: forecast = int(pi.get("pv_a_forecast_solver_w") or 0) curtail = int(pi.get("pv_a_curtailed_w") or 0) pv_a_allowed = compute_pv_a_reg340_max_solar_w(int(pv_a_cap_w), forecast, curtail) # Home-01 strategie: pokud jsou zároveň buy<0 i sell<0 a PV B vyrábí (necurtailable), # chceme držet baterii „prázdnější“ pro PV B / další záporný nákup a PV A raději odstavit # i když forecast PV A je nulový (predikce/telemetrie může být odpojená). buy_raw = pi.get("effective_buy_price") buy_f: float | None = float(buy_raw) if buy_raw is not None else None pv_b = int(pi.get("pv_b_forecast_solver_w") or 0) if ( buy_f is not None and sell_f is not None and float(buy_f) < 0.0 and float(sell_f) < 0.0 and pv_b > 0 ): pv_a_allowed = 0 return ControlSetpoints( battery_w=int(pi["battery_setpoint_w"] or 0), grid_export_limit=abs(min(grid_sp, 0)), ev1_current_a=watts_to_amps(ev1_w, phases=3), ev2_current_a=watts_to_amps(ev2_w, phases=1), heat_pump_enable=hp_en, grid_setpoint_w=grid_sp, ev1_power_w=ev1_w, ev2_power_w=ev2_w, target_soc_pct=target_soc, deye_physical_mode=pm, export_ban=bool(export_ban), deye_gen_cutoff_enabled=bool(gen_cutoff), effective_sell_price_czk_kwh=sell_f, pv_a_allowed_w=pv_a_allowed, ) if code == "SELF_SUSTAIN": return ControlSetpoints( battery_w=None, grid_export_limit=0, ev1_current_a=0, ev2_current_a=0, heat_pump_enable=False, grid_setpoint_w=0, ev1_power_w=0, ev2_power_w=0, target_soc_pct=None, self_sustain_local_use=True, ) if code == "CHARGE_CHEAP": # max_charge doplníme v export_setpoints z DB return ControlSetpoints( battery_w=0, grid_export_limit=0, ev1_current_a=0, ev2_current_a=0, heat_pump_enable=False, grid_setpoint_w=0, ev1_power_w=0, ev2_power_w=0, target_soc_pct=None, ) if code == "PRESERVE": return ControlSetpoints( battery_w=0, grid_export_limit=0, ev1_current_a=0, ev2_current_a=0, heat_pump_enable=False, grid_setpoint_w=0, ev1_power_w=0, ev2_power_w=0, target_soc_pct=None, lock_battery=True, ) logger.warning("Unknown mode_code %s for site export, skipping", code) return None def _apply_price_failsafe_guard( site_id: int, mode: OperatingModeInfo, pi: asyncpg.Record | None, sp: ControlSetpoints, ) -> ControlSetpoints: if mode.mode_code != "AUTO" or pi is None: return sp if "is_predicted_price" not in pi or not bool(pi["is_predicted_price"]): return sp logger.warning( "control export site=%s: AUTO slot uses predicted price -> forcing PASSIVE no-export guard", site_id, ) return ControlSetpoints( battery_w=0, grid_export_limit=0, ev1_current_a=sp.ev1_current_a, ev2_current_a=sp.ev2_current_a, heat_pump_enable=sp.heat_pump_enable, grid_setpoint_w=max(0, int(sp.grid_setpoint_w or 0)), ev1_power_w=sp.ev1_power_w, ev2_power_w=sp.ev2_power_w, target_soc_pct=sp.target_soc_pct, effective_sell_price_czk_kwh=sp.effective_sell_price_czk_kwh, pv_a_allowed_w=sp.pv_a_allowed_w, ) def _deye_reg143_export_w(no_export: bool, max_export_power_w: int | None) -> int: """Reg 143 – max export W z DB (např. SUN-20K / home-01 = 13 500 W).""" if no_export: return 0 return max(0, int(max_export_power_w or 0)) def _clamp_deye_tou_soc_pct(pct: int) -> int: return max(5, min(95, pct)) def _clamp_deye_tou_soc_pct_hi(pct: int, hi: int) -> int: """Stejné dolní omezení 5 % jako u TOU; horní mez z parametru (např. 100 u priority baterie).""" return max(5, min(int(hi), int(pct))) def _deye_tou_min_soc_pct(inv: InverterConfig) -> int: if inv.min_soc_percent is not None: return _clamp_deye_tou_soc_pct(int(inv.min_soc_percent)) return 10 def _deye_tou_reserve_soc_pct(inv: InverterConfig) -> int: if inv.reserve_soc_percent is not None: return _clamp_deye_tou_soc_pct(int(inv.reserve_soc_percent)) return 20 def _deye_passive_tou_battery_soc_pct( inv: InverterConfig, setpoints: ControlSetpoints, ) -> int: """ Hodnota SOC u Deye TOU řádku (reg 166+) ve fyzickém PASSIVE. Na home-01 Deye interpretuje TOU % jako „kam má směřovat využití baterie“: je-li zapsané procento **nižší než skutečný SoC**, přebytek FVE míří spíš do sítě. Při **záporné vykupní** nebo **plánovaném nabíjení** (kladný ``battery_w``) EMS zapíše **100 %** do TOU (signál střídači „ber přebytek do baterie v celém rozsahu“). **``max_soc_percent`` v DB** je odděleně: horní limit pro **plánovač / Wh bilance** (denní provoz, viz komentář sloupce), **nikoli** časové „do kdy“. Jinak zůstane provozní podlaha ``min_soc_percent`` (typicky nízká % → přetok do sítě možný dle chování Deye). Režim **SELF_SUSTAIN** (``self_sustain_local_use``): vždy ``min_soc_percent`` — nízké TOU drží prioritu „baterie jako buffer“ při plném reg. 108/109 a reg. 142 zero-export; neaplikuje se sem logika 100 % podle ceny (LP se v SELF_SUSTAIN nepoužívá). """ mn = _deye_tou_min_soc_pct(inv) if setpoints.self_sustain_local_use: return mn bat_w = 0 if setpoints.battery_w is None else int(setpoints.battery_w) sell = setpoints.effective_sell_price_czk_kwh want_battery_priority = bat_w > 0 or (sell is not None and float(sell) < 0) if not want_battery_priority: return mn return _clamp_deye_tou_soc_pct_hi(DEYE_TOU_SOC_PASSIVE_BATTERY_PRIORITY_PCT, hi=100) def _deye_zero_export_amps_for_passive( grid_w: int, bat_w: int, max_charge_a: int, max_discharge_a: int, ) -> tuple[int, int]: """ PASSIVE (zero export k CT/zátěži, reg. 142 dle DB): výchozí plné 108/109. - Export v plánu (grid_w < 0) a žádné plánované vybíjení (bat_w >= 0): **108 = 0** — nepřebírat přebytek FVE do baterie, ať může jít přetok do sítě. - Import v plánu (grid_w > 0) a žádné plánované nabíjení (bat_w <= 0): **109 = 0** — nevybíjet baterii, odběr ze sítě. """ if grid_w < 0 and bat_w >= 0: return 0, max_discharge_a if grid_w > 0 and bat_w <= 0: return max_charge_a, 0 return max_charge_a, max_discharge_a def get_deye_mode(setpoints: ControlSetpoints) -> str: """ Fyzický režim Deye: SELL | CHARGE | PASSIVE. Primárně explicitně z plánu (`setpoints.deye_physical_mode`), fallback jen ze znamének (viz ``docs/04-modules/operating-modes.md``): - **CHARGE** — ``battery_w`` > 0 **a** ``grid_setpoint_w`` > 0 (nabíjení ze sítě + import v plánu). - **SELL** — ``grid_setpoint_w`` < 0 **a** ``battery_w`` < 0 (export + vybíjení baterie v plánu). - **PASSIVE** (ZERO) — vše ostatní; reg. **108/109** dle ``_deye_zero_export_amps_for_passive``. ``battery_w=None`` (SELF_SUSTAIN) → bat_w 0 → typicky PASSIVE; v ``write_inverter_setpoints`` má SELF_SUSTAIN vlastní větev (108/109 max). """ pm = (setpoints.deye_physical_mode or "").strip().upper() if pm in {"PASSIVE", "SELL", "CHARGE"}: return pm grid_w = int(setpoints.grid_setpoint_w or 0) bat_w = 0 if setpoints.battery_w is None else int(setpoints.battery_w) if bat_w > 0 and grid_w > 0: return "CHARGE" if grid_w < 0 and bat_w < 0: return "SELL" return "PASSIVE" def _deye_tou_params( setpoints: ControlSetpoints, inv: InverterConfig, ) -> tuple[int, int, bool]: """ Parametry jednoho Deye time pointu: výkon W, SOC % (TOU reg 166+), grid_charge. Ve PASSIVE viz _deye_passive_tou_battery_soc_pct (min vs. plný max z DB). """ max_batt_w_discharge = int(inv.max_discharge_a * BATT_VOLTAGE_V) tp_discharge_w = 0 if setpoints.lock_battery else max_batt_w_discharge tou_min = _deye_tou_min_soc_pct(inv) tou_reserve = _deye_tou_reserve_soc_pct(inv) if setpoints.lock_battery: return tp_discharge_w, tou_min, False deye_mode = get_deye_mode(setpoints) if deye_mode == "CHARGE": raw_bat = setpoints.battery_w battery_w = int(raw_bat) if raw_bat is not None else 0 cap = int(inv.max_soc_percent) if inv.max_soc_percent is not None else 95 target_soc = max(10, min(95, cap)) tp_charge_w = ( battery_watts_to_amps(battery_w, int(inv.max_charge_a)) * int(BATT_VOLTAGE_V) ) return tp_charge_w, target_soc, True if deye_mode == "SELL": return tp_discharge_w, tou_reserve, False tou_soc = _deye_passive_tou_battery_soc_pct(inv, setpoints) return tp_discharge_w, tou_soc, False async def write_inverter_setpoints( site_id: int, setpoints_now: ControlSetpoints, setpoints_next: ControlSetpoints | None, db: asyncpg.Connection, planning_run_id: int | None = None, ) -> str: inv = await _load_inverter_config(site_id, db) if inv is None: return "FAIL inverter: no controllable Modbus endpoint" unit_id = int(inv.unit_id if inv.unit_id is not None else 1) raw_bat = setpoints_now.battery_w grid_w = int(setpoints_now.grid_setpoint_w or 0) no_export = inv.no_export export_lim = _deye_reg143_export_w(no_export, inv.max_export_power_w) max_batt_w_discharge = int(inv.max_discharge_a * BATT_VOLTAGE_V) tp_discharge_w = 0 if setpoints_now.lock_battery else max_batt_w_discharge tou_min_pct = _deye_tou_min_soc_pct(inv) tou_reserve_pct = _deye_tou_reserve_soc_pct(inv) try: soc_telemetry = await _get_current_soc(site_id, db) deye_mode = get_deye_mode(setpoints_now) bat_w = int(raw_bat) if raw_bat is not None else 0 if setpoints_now.lock_battery: charge_a = 0 discharge_a = 0 elif deye_mode == "CHARGE": charge_a = battery_watts_to_amps(bat_w, inv.max_charge_a) discharge_a = 0 elif deye_mode == "SELL": # Záměrný výdej baterie do sítě: plný vybíjecí proud; export strop dle plánu níže. charge_a = 0 discharge_a = int(inv.max_discharge_a) elif setpoints_now.self_sustain_local_use: # SELF_SUSTAIN: plný nabíjecí i vybíjecí proud invertoru — přebytek FVE jde do baterie, # reg. 142 = zero export to load/CT (viz selling_mode níže), ne reg. 108 = 0. charge_a = int(inv.max_charge_a) discharge_a = int(inv.max_discharge_a) else: # PASSIVE (ZERO): výchozí plné 108/109; u přetoku FVE do sítě nebo importu bez baterie viz helper. charge_a, discharge_a = _deye_zero_export_amps_for_passive( grid_w, bat_w, int(inv.max_charge_a), int(inv.max_discharge_a), ) zero_exp_mode = int(inv.deye_zero_export_mode or 1) selling_mode = 0 if deye_mode == "SELL" else zero_exp_mode solar_sell = 0 if (setpoints_now.export_ban and deye_mode != "SELL") else 1 export_limit = export_lim if deye_mode == "SELL" and grid_w < 0: export_limit = min(export_lim, max(REG143_SELL_CAP_MIN_W, abs(grid_w))) reg178_val = REG178_SELL if deye_mode == "SELL" else REG178_PASSIVE logger.info( f"[control] site={site_id} fyzický režim Deye: {deye_mode} | " f"battery_w={raw_bat!r} grid_w={grid_w} | " f"charge_a={charge_a} discharge_a={discharge_a} | " f"reg142={selling_mode} reg145={solar_sell} reg143={export_limit}W reg178={reg178_val}" ) now_cet, time_rows = _deye_system_time_register_rows() skip_time = False try: mb_clock = await get_modbus_client(inv.host, inv.port) tvals = await mb_clock.read_holding_registers( 62, 3, int(inv.unit_id if inv.unit_id is not None else 1) ) if len(tvals) == 3: skip_time = _deye_should_skip_time_sync_after_read( inv, int(tvals[0]), int(tvals[1]), int(tvals[2]) ) else: logger.warning( "Deye clock read: expected 3 registers, got %s; will sync 62–64", len(tvals), ) except Exception as e: logger.warning("Deye clock read failed (will sync 62–64): %s", e) if skip_time: logger.info( "Deye clock 62–64 skipped (drift ≤ %ss, last sync < %sh ago): %s CET", DEYE_CLOCK_DRIFT_OK_SEC, DEYE_CLOCK_RESYNC_INTERVAL_HOURS, now_cet.strftime("%Y-%m-%d %H:%M:%S"), ) else: logger.info("Deye time will sync: %s CET", now_cet.strftime("%Y-%m-%d %H:%M:%S")) registers: list[tuple[int, str, int]] = [] if skip_time else list(time_rows) sp_tp2 = setpoints_next if setpoints_next is not None else setpoints_now hh_cur = current_slot_hhmm() hh_nxt = next_slot_hhmm() p1, s1, g1 = _deye_tou_params(setpoints_now, inv) p2, s2, g2 = _deye_tou_params(sp_tp2, inv) registers.extend(_deye_time_point_rows(0, hh_cur, p1, s1, g1)) registers.extend(_deye_time_point_rows(1, hh_nxt, p2, s2, g2)) prague_date = datetime.now(PRAGUE_TZ).date() inactive_sig = ( f"{DEYE_TOU_INACTIVE_HHMM}|{tou_min_pct}|{tou_reserve_pct}|{tp_discharge_w}" ) need_inactive_tou = ( inv.deye_last_tou_inactive_write_prague_date != prague_date or inv.deye_tou_inactive_signature != inactive_sig ) if need_inactive_tou: for idx in range(2, 6): registers.extend( _deye_time_point_rows( idx, DEYE_TOU_INACTIVE_HHMM, tp_discharge_w, tou_min_pct, False ) ) else: logger.debug( "Deye TOU rows 3–6 skipped (already written today, signature unchanged)" ) registers.extend( [ (108, "", charge_a), (109, "", discharge_a), (141, "energy_mode (0)", 0), (142, "limit_control", selling_mode), (143, "", export_limit), (145, "solar_sell", solar_sell), ] ) mfr = (inv.manufacturer or "").strip().lower() if ( mfr == "deye" and int(inv.pv_a_cap_w) > 0 and setpoints_now.pv_a_allowed_w is not None ): registers.append((340, "max_solar_power_w", int(setpoints_now.pv_a_allowed_w))) # Reg 178: bitové pole. Nastavujeme bits4–5 (peak shaving) vždy; bits0–1 (MI export cutoff) jen pokud feature. # Ostatní bity musí zůstat zachované → read-modify-write. try: mb178 = await get_modbus_client(inv.host, inv.port) r178 = await mb178.read_holding_registers(178, 1, unit_id) if not r178 or len(r178) < 1: raise OSError(f"reg178 read returned {len(r178) if r178 is not None else None} values") current_178 = int(r178[0]) peak_bits = int(reg178_val) & int(REG178_VERIFY_MASK) if inv.deye_gen_microinverter_cutoff_enabled: want_cutoff = bool(setpoints_now.deye_gen_cutoff_enabled) and deye_mode != "SELL" mi_bits = ( REG178_MI_EXPORT_ENABLE if want_cutoff else REG178_MI_EXPORT_DISABLE ) else: mi_bits = int(current_178) & int(REG178_MI_EXPORT_MASK) new_178 = ( (int(current_178) & ~int(REG178_VERIFY_MASK_COMBINED)) | int(peak_bits) | int(mi_bits) ) registers.append((178, "control_board_special_1", int(new_178))) logger.info( "[control] %s: reg178 (control_board_special_1) old=%s new=%s peak_bits=0x%04X mi_bits=%s", inv.code, current_178, new_178, int(peak_bits), int(mi_bits), ) except Exception as e: logger.warning("[control] %s: reg178 RMW failed (skip reg178 write): %s", inv.code, e) logger.info( "[control] %s: deye_mode=%s charge=%sA discharge=%sA " "reg142=%s reg145=%s export=%sW " "tp1=%s tp2=%s soc=%s%% (batt=%r grid=%sW)", inv.code, deye_mode, charge_a, discharge_a, selling_mode, solar_sell, export_limit, hh_cur, hh_nxt, soc_telemetry, raw_bat, grid_w, ) last_verified = await _fetch_last_verified_inverter_registers(site_id, inv.id, db) registers, skipped_unchanged = _drop_registers_matching_last_verified( registers, last_verified ) if skipped_unchanged: logger.info( "[control] %s: skip %s registers (value equals last verified): %s", inv.code, len(skipped_unchanged), skipped_unchanged[:24], ) if not registers: logger.info( "[control] %s: all Deye holding regs match last verified, no Modbus write", inv.code, ) if need_inactive_tou: await db.execute( """ UPDATE ems.asset_inverter SET deye_last_tou_inactive_write_prague_date = $1, deye_tou_inactive_signature = $2 WHERE id = $3 """, prague_date, inactive_sig, inv.id, ) return ( f"OK inverter: batt_w={raw_bat!r} (no changes vs last verified Modbus snapshot)" ) will_write_inactive = any( int(r) in _DEYE_INACTIVE_TOU_REGISTERS for r, _, _ in registers ) cmd_ids = await create_modbus_commands( site_id, planning_run_id, "inverter", inv.id, inv.code, inv.host, inv.port, inv.unit_id, registers, db, deye_physical_mode=deye_mode, ) if not await execute_modbus_commands(cmd_ids, db): return f"FAIL inverter: {inv.code}: Modbus write failed (see modbus_command)" logger.info("[control] Inverter %s journal write OK", inv.code) will_write_time = any(int(r) in DEYE_CLOCK_REGS for r, _, _ in registers) if will_write_time: await db.execute( """ UPDATE ems.asset_inverter SET deye_last_system_time_sync_minute = $1, deye_last_system_time_sync_at = now() WHERE id = $2 """, _prague_minute_start_utc(), inv.id, ) if need_inactive_tou or will_write_inactive: await db.execute( """ UPDATE ems.asset_inverter SET deye_last_tou_inactive_write_prague_date = $1, deye_tou_inactive_signature = $2 WHERE id = $3 """, prague_date, inactive_sig, inv.id, ) except Exception as e: return f"FAIL inverter: {inv.code}: {e}" return ( f"OK inverter: batt_w={raw_bat!r} " f"(time points + FC 0x10: 108/109/141/142/178/143/145/340 dle plánu)" ) async def read_deye_registers_live(site_id: int, db: asyncpg.Connection) -> dict[str, Any]: """ Živé čtení holding registrů Deye 108, 109, 141–145, 178, 191, 340 (stejné TCP spojení jako telemetrie/export). Vše pod jedním mutexem + sdružené FC3 bloky — mezi jednotlivými read_register dřív telemetrie střídavě brala lock a RS485 brány házely cizí transaction_id / I/O timeouty. """ inv = await _load_inverter_config(site_id, db) if inv is None: raise ValueError("no controllable Modbus inverter for site") uid = int(inv.unit_id) client = await get_modbus_client(inv.host, inv.port) read_at = datetime.now(timezone.utc) try: async with client.batch(uid) as mb: b108 = await mb.read_holding_registers(108, 2) b141 = await mb.read_holding_registers(141, 5) r178 = await mb.read_holding_registers(178, 1) r191 = await mb.read_holding_registers(191, 1) r340 = await mb.read_holding_registers(340, 1) r108, r109 = b108[0], b108[1] r141, r142, r143 = b141[0], b141[1], b141[2] r145 = b141[4] r178 = r178[0] r191 = r191[0] r340v = int(r340[0]) if r340 and len(r340) >= 1 else 0 except Exception: logger.exception("read_deye_registers_live site=%s failed", site_id) raise return { "reg108_charge_a": int(r108), "reg109_discharge_a": int(r109), "reg141_energy_mode": int(r141), "reg142_limit_control": int(r142), "reg143_export_limit_w": int(r143), "reg145_solar_sell": int(r145), "reg178_peak_shaving_switch": int(r178), "reg178_control_board_special_1": int(r178), "reg178_mi_export_cutoff_bits": int(r178) & int(REG178_MI_EXPORT_MASK), "reg178_mi_export_cutoff_is_on": (int(r178) & int(REG178_MI_EXPORT_MASK)) == int(REG178_MI_EXPORT_ENABLE), "reg191_peak_shaving_w": int(r191), "reg340_max_solar_power_w": int(r340v), "read_at": read_at.isoformat(), } def _current_limit_for_charger(charger_code: str, sp: ControlSetpoints) -> int: c = (charger_code or "").strip().lower() if c == "ev-charger-1": a = sp.ev1_current_a elif c == "ev-charger-2": a = sp.ev2_current_a elif c.endswith("-1") or c == "ev1": a = sp.ev1_current_a elif c.endswith("-2") or c == "ev2": a = sp.ev2_current_a else: a = 0 if a < 6: a = 0 return a async def write_ev_setpoints(site_id: int, setpoints: ControlSetpoints, db: asyncpg.Connection) -> str: rows = await db.fetch( """ SELECT ec.code, se.host, se.port, se.unit_id FROM ems.asset_ev_charger ec JOIN ems.site_endpoint se ON se.id = ec.endpoint_id WHERE ec.site_id = $1 AND ec.schedulable = true AND se.enabled = true AND se.endpoint_type = 'modbus_tcp' ORDER BY ec.code """, site_id, ) if not rows: return "OK EV: no schedulable chargers" for row in rows: code = row["code"] current_a = _current_limit_for_charger(code, setpoints) logger.info( "EV setpoint [%s]: %sA (TODO: Modbus registers)", code, current_a, ) return f"OK EV: logged {len(rows)} charger(s) (Modbus TODO)" async def write_heat_pump_setpoint(site_id: int, setpoints: ControlSetpoints, db: asyncpg.Connection) -> str: rows = await db.fetch( """ SELECT hp.code, se.host, se.port, se.unit_id FROM ems.asset_heat_pump hp JOIN ems.site_endpoint se ON se.id = hp.endpoint_id WHERE hp.site_id = $1 AND hp.schedulable = true AND se.enabled = true AND se.endpoint_type = 'modbus_tcp' """, site_id, ) if not rows: return "OK heat pump: no schedulable unit" for row in rows: logger.info( "HP setpoint [%s]: enable=%s (TODO: Modbus registers)", row["code"], setpoints.heat_pump_enable, ) return "OK heat pump: logged (Modbus TODO)" async def send_loxone_setpoints( site_id: int, setpoints: ControlSetpoints, mode: OperatingModeInfo, db: asyncpg.Connection, ) -> str: endpoint = await db.fetchrow( """ SELECT host, port, protocol FROM ems.site_endpoint WHERE site_id = $1 AND endpoint_type = 'loxone_http' AND enabled = true ORDER BY id LIMIT 1 """, site_id, ) if not endpoint: return "OK Loxone: no endpoint, skipped" proto = (endpoint["protocol"] or "http").lower() if proto not in ("http", "https"): proto = "http" host = endpoint["host"] port = int(endpoint["port"] or (443 if proto == "https" else 80)) base = f"{proto}://{host}:{port}/dev/sps/io" settings = get_settings() user = settings.loxone_user or os.getenv("LOXONE_USER") or "" password = settings.loxone_password or os.getenv("LOXONE_PASSWORD") or "" auth = (user, password) if user else None batt_display = 0 if setpoints.battery_w is None else int(setpoints.battery_w) paths: list[tuple[str, int]] = [ (f"{base}/EMS_Mode/{mode.loxone_mode_value}", mode.loxone_mode_value), (f"{base}/EMS_Battery_Setpoint_W/{batt_display}", batt_display), (f"{base}/EMS_Grid_Setpoint_W/{setpoints.grid_setpoint_w}", setpoints.grid_setpoint_w), (f"{base}/EMS_EV1_Power_W/{setpoints.ev1_power_w}", setpoints.ev1_power_w), (f"{base}/EMS_EV2_Power_W/{setpoints.ev2_power_w}", setpoints.ev2_power_w), (f"{base}/EMS_HeatPump_Enable/{1 if setpoints.heat_pump_enable else 0}", 1 if setpoints.heat_pump_enable else 0), ] errs: list[str] = [] try: async with httpx.AsyncClient(timeout=5.0) as client: for url, _ in paths: try: r = await client.get(url, auth=auth) r.raise_for_status() except Exception as e: errs.append(f"{url!s}: {e}") except Exception as e: return f"FAIL Loxone: client {e}" if errs: return "FAIL Loxone: " + "; ".join(errs[:3]) return "OK Loxone: all virtual inputs updated" async def export_setpoints(site_id: int, db: asyncpg.Connection) -> None: mode = await _fetch_operating_mode(site_id, db) if mode is None: logger.warning("control export site=%s: no operating mode row", site_id) return if mode.mode_code == "MANUAL": logger.info("control export site=%s: MANUAL, skip writes", site_id) return try: inv_for_pv = await _load_inverter_config(site_id, db) cap_pv = int(inv_for_pv.pv_a_cap_w) if inv_for_pv is not None else 0 mfr_pv = (inv_for_pv.manufacturer or "") if inv_for_pv is not None else "" pi_now = await _fetch_plan_row_for_slot_offset(site_id, db, 0) pi_next = await _fetch_plan_row_for_slot_offset(site_id, db, 1) sp_now = _build_setpoints( mode, pi_now, pv_a_cap_w=cap_pv, inverter_manufacturer=mfr_pv ) sp_next = _build_setpoints( mode, pi_next, pv_a_cap_w=cap_pv, inverter_manufacturer=mfr_pv ) if mode.mode_code == "AUTO" and sp_now is None: if pi_now is None: logger.warning( "control export site=%s: AUTO but no planning_interval for current slot, skip", site_id, ) return if sp_now is None: logger.warning( "control export site=%s: no setpoints for mode %s, skip", site_id, mode.mode_code, ) return if mode.mode_code == "CHARGE_CHEAP": max_ch = await _fetch_max_charge_power_w(site_id, db) # Oba setpointy kladné → get_deye_mode CHARGE; min. 1 W, aby režim nebyl PASSIVE při nulové DB. pw = max(1, int(max_ch)) sp_now = ControlSetpoints( battery_w=pw, grid_export_limit=0, ev1_current_a=0, ev2_current_a=0, heat_pump_enable=False, grid_setpoint_w=pw, ev1_power_w=0, ev2_power_w=0, target_soc_pct=None, effective_sell_price_czk_kwh=None, ) sp_next = sp_now else: sp_now = _apply_price_failsafe_guard(site_id, mode, pi_now, sp_now) if sp_next is not None: sp_next = _apply_price_failsafe_guard(site_id, mode, pi_next, sp_next) planning_run_id = await db.fetchval( """ SELECT id FROM ems.planning_run WHERE site_id = $1 AND status = 'active' ORDER BY created_at DESC LIMIT 1 """, site_id, ) if planning_run_id is not None: planning_run_id = int(planning_run_id) try: inv_res = await write_inverter_setpoints( site_id, sp_now, sp_next, db, planning_run_id=planning_run_id ) except Exception as e: logger.error("inverter write failed: %s", e) inv_res = f"FAIL inverter: {e}" try: ev_res = await write_ev_setpoints(site_id, sp_now, db) except Exception as e: logger.error("ev write failed: %s", e) ev_res = f"FAIL ev: {e}" try: hp_res = await write_heat_pump_setpoint(site_id, sp_now, db) except Exception as e: logger.error("hp write failed: %s", e) hp_res = f"FAIL heat pump: {e}" try: lox_res = await send_loxone_setpoints(site_id, sp_now, mode, db) except Exception as e: logger.error("loxone write failed: %s", e) lox_res = f"FAIL Loxone: {e}" results = list( zip( ("inverter", "ev", "heat_pump", "loxone"), (inv_res, ev_res, hp_res, lox_res), ) ) for name, res in results: if isinstance(res, Exception): logger.error("control export site=%s %s: FAIL %s", site_id, name, res) elif isinstance(res, str) and res.startswith("FAIL"): logger.error("control export site=%s %s: %s", site_id, name, res) else: logger.info("control export site=%s %s: %s", site_id, name, res) finally: try: await enqueue_site_signals(site_id, db) except Exception as e: logger.warning( "control export site=%s: signal enqueue failed: %s", site_id, e )