"""Export plánovaných setpointů na Modbus (Deye, EV, TČ) a HTTP do Loxone.""" from __future__ import annotations import logging from collections import defaultdict from typing import Any from datetime import datetime, timezone import asyncpg from services.control.deye_helpers import ( BATT_VOLTAGE_V, DEYE_CLOCK_DRIFT_OK_SEC, DEYE_CLOCK_REGS, DEYE_CLOCK_RESYNC_INTERVAL_HOURS, DEYE_CLOCK_VERIFY_MAX_DELTA_SEC, # noqa: F401 - re-export for compatibility DEYE_CRITICAL_REGS_SELF_SUSTAIN, # noqa: F401 - re-export for compatibility DEYE_REGISTER_NAMES, # noqa: F401 - re-export for compatibility DEYE_TOU_INACTIVE_HHMM, DEYE_TOU_POWER_REGS, PRAGUE_TZ, REG143_SELL_CAP_MIN_W, REG178_MI_EXPORT_DISABLE, REG178_MI_EXPORT_ENABLE, REG178_MI_EXPORT_MASK, REG178_PASSIVE, REG178_SELL, REG178_VERIFY_MASK, REG178_VERIFY_MASK_COMBINED, _DEYE_INACTIVE_TOU_REGISTERS, _deye_clock_registers_verify_match, _deye_reg178_verify_match, _deye_reg178_verify_with_double_read, _deye_registers_to_prague_datetime, # noqa: F401 - re-export for compatibility _deye_should_skip_time_sync_after_read, _deye_tou_power_verify_match, _prague_minute_start_utc, battery_watts_to_amps, compute_pv_a_reg340_max_solar_w, current_slot_hhmm, deye_reg_triggers_self_sustain_after_verify_exhaust, # noqa: F401 - re-export next_slot_hhmm, watts_to_amps, ) from services.control.models import ControlSetpoints, InverterConfig, OperatingModeInfo from services.control.modbus_journal import ( _drop_registers_matching_last_verified, _fetch_last_verified_inverter_registers, _fetch_written_deye_clock_commands, _modbus_command_contiguous_runs, create_modbus_commands, execute_modbus_commands, ) from services.control.outputs import ( _current_limit_for_charger, send_loxone_setpoints, write_ev_setpoints, write_heat_pump_setpoint, ) from services.control.repository import ( _fetch_max_charge_power_w, _fetch_operating_mode, _fetch_plan_row_for_slot_offset, _get_current_soc, _load_inverter_config, ) from services.control.setpoints import ( _DictRecord, _apply_price_failsafe_guard, _build_setpoints, _clamp_deye_tou_soc_pct, _deye_passive_tou_battery_soc_pct, _deye_reg143_export_w, _deye_system_time_register_rows, _deye_time_point_rows, _deye_tou_min_soc_pct, _deye_tou_params, _deye_tou_reserve_soc_pct, _deye_zero_export_amps_for_passive, get_deye_mode, ) from services.modbus_client import get_modbus_client from services.signal_service import enqueue_site_signals logger = logging.getLogger(__name__) async def _switch_to_self_sustain(site_id: int, db: asyncpg.Connection, reason: str) -> None: """Přepne lokalitu na SELF_SUSTAIN, zaloguje důvod a při změně pošle Discord.""" from services.notification_service import run_fn_set_mode_with_discord await run_fn_set_mode_with_discord( db, site_id, "SELF_SUSTAIN", "system:mismatch", None, reason, ) logger.critical("Site %s switched to SELF_SUSTAIN: %s", site_id, reason) def _modbus_cmd_register(cmd: Any) -> int: """asyncpg.Record má __getitem__; objekty s atributem .register též (testy).""" try: return int(cmd["register"]) except (KeyError, TypeError): return int(cmd.register) def _deye_expected_clock_triplet_for_verify( bundle: list[asyncpg.Record], last_verified: dict[int, int], a62: int, a63: int, a64: int, ) -> tuple[int, int, int]: """ Sestaví očekávané (w62,w63,w64) pro toleranční verify. Chybějící registry doplní poslední verified nebo aktuálním přečtením ze zařízení (aby osiřelý zápis např. jen 64 nešel do striktního porovnání reg64). """ by_reg = {_modbus_cmd_register(c): c for c in bundle} def _vtw(c: Any) -> int: try: return int(c["value_to_write"]) except (KeyError, TypeError): return int(c.value_to_write) w62 = _vtw(by_reg[62]) if 62 in by_reg else last_verified.get(62, a62) w63 = _vtw(by_reg[63]) if 63 in by_reg else last_verified.get(63, a63) w64 = _vtw(by_reg[64]) if 64 in by_reg else last_verified.get(64, a64) return (int(w62), int(w63), int(w64)) async def _verify_deye_clock_written_bundle( site_id: int, bundle: list[asyncpg.Record], a62: int, a63: int, a64: int, db: asyncpg.Connection, ) -> bool: """ Toleranční ověření pro jeden až tři řádky journalu 62–64 ve stavu written. Při mismatch retry společně; bez přepnutí do SELF_SUSTAIN po 3 pokusech. """ from services.notification_service import ( notify_modbus_clock_verify_exhausted, notify_modbus_mismatch, ) cmds_s = sorted(bundle, key=_modbus_cmd_register) try: asset_id = int(cmds_s[0]["asset_id"]) except (KeyError, TypeError): asset_id = int(cmds_s[0].asset_id) last_v = await _fetch_last_verified_inverter_registers(site_id, asset_id, db) w62, w63, w64 = _deye_expected_clock_triplet_for_verify(bundle, last_v, a62, a63, a64) clock_ok = _deye_clock_registers_verify_match(w62, w63, w64, a62, a63, a64) actual_by_reg = {62: a62, 63: a63, 64: a64} for cmd in cmds_s: try: cid = int(cmd["id"]) except (KeyError, TypeError): cid = int(cmd.id) r = _modbus_cmd_register(cmd) await db.execute( """ UPDATE ems.modbus_command SET value_verified=$1::int, verified_at=now(), status=CASE WHEN $2::boolean THEN 'verified' ELSE 'mismatch' END WHERE id=$3::int """, actual_by_reg[r], clock_ok, cid, ) if clock_ok: await db.execute( """ UPDATE ems.asset_inverter SET deye_last_system_time_sync_minute = $1, deye_last_system_time_sync_at = now() WHERE id = $2 """, _prague_minute_start_utc(), asset_id, ) for cmd in cmds_s: try: cid_l = int(cmd["id"]) except (KeyError, TypeError): cid_l = int(cmd.id) try: code_l = str(cmd["asset_code"]) except (KeyError, TypeError): code_l = str(cmd.asset_code) rr = _modbus_cmd_register(cmd) logger.info( "[cmd %s] verified OK (clock tolerant): %s 0x%04X=%s", cid_l, code_l, rr, actual_by_reg[rr], ) return True cmd0 = cmds_s[0] try: ac0 = str(cmd0["asset_code"]) except (KeyError, TypeError): ac0 = str(cmd0.asset_code) logger.error( "[cmd clock] MISMATCH %s 62–64: written=(%s,%s,%s) actual=(%s,%s,%s)", ac0, w62, w63, w64, a62, a63, a64, ) attempts = 0 for cmd in cmds_s: try: cid_q = int(cmd["id"]) except (KeyError, TypeError): cid_q = int(cmd.id) row_ac = await db.fetchrow( "SELECT attempt_count FROM ems.modbus_command WHERE id=$1", cid_q ) ac = int(row_ac["attempt_count"] or 0) if row_ac else 0 attempts = max(attempts, ac) await notify_modbus_mismatch( db, site_id, ac0, 62, "system_time_62_64", w62, a62, attempts, ) ids_ordered = [] for c in cmds_s: try: ids_ordered.append(int(c["id"])) except (KeyError, TypeError): ids_ordered.append(int(c.id)) if attempts < 3: for cid in ids_ordered: await db.execute( "UPDATE ems.modbus_command SET status='retrying' WHERE id=$1", cid, ) await execute_modbus_commands(ids_ordered, db) await verify_modbus_commands(ids_ordered, db, site_id) else: logger.critical( "[cmd clock] 3 failed verify attempts (62–64); režim se nemění automaticky" ) site = await db.fetchrow("SELECT code FROM ems.site WHERE id=$1", site_id) await notify_modbus_clock_verify_exhausted( db, site_id, site["code"] if site else str(site_id), ac0, (w62, w63, w64), (a62, a63, a64), ) return False async def verify_modbus_commands( command_ids: list[int], db: asyncpg.Connection, site_id: int, ) -> bool: """ Přečte registry zpět (FC 3 po souvislých blocích) a porovná s value_to_write. Při mismatch: retry (až 3×). Po vyčerpání pokusů u kritických registrů (108, 109, 142, 143, 145) → SELF_SUSTAIN + Discord; u „soft“ (178, TOU power W) jen log + Discord, režim se nemění. """ from services.notification_service import notify_modbus_mismatch inv_cfg = await _load_inverter_config(site_id, db) async def _apply_verify_result( cmd: asyncpg.Record, actual_i: int, *, client: Any, unit: int, ) -> bool: """Vrátí True při shodě, False při mismatch (a obslouží retry / SELF_SUSTAIN).""" reg = int(cmd["register"]) cmd_id = int(cmd["id"]) if reg in DEYE_CLOCK_REGS: asset_id = int(cmd["asset_id"]) host = str(cmd["device_host"]) port_i = int(cmd["device_port"]) uid = int(cmd["device_unit_id"]) bundle = await _fetch_written_deye_clock_commands( site_id, asset_id, host, port_i, uid, db ) if not bundle: bundle = [cmd] try: cvals = await client.read_holding_registers(62, 3, uid) except Exception as e: logger.error( "verify clock guard read 62–64 failed (reg 0x%04X): %s", reg, e ) return False if len(cvals) != 3: logger.error( "verify clock guard: expected 3 regs, got %s", len(cvals) ) return False logger.warning( "Clock register 0x%04X reached strict verify path; using tolerant 62–64 bundle", reg, ) return await _verify_deye_clock_written_bundle( site_id, bundle, int(cvals[0]), int(cvals[1]), int(cvals[2]), db, ) expected_i = int(cmd["value_to_write"]) matches = actual_i == expected_i if reg == 178: first_178 = int(actual_i) second_178: int | None = None if not _deye_reg178_verify_match(expected_i, first_178): try: r178 = await client.read_holding_registers(178, 1, unit) if r178 and len(r178) >= 1: second_178 = int(r178[0]) except Exception as e: logger.warning( "[cmd %s] reg178 double-read failed: %s", cmd_id, e ) matches, actual_i = _deye_reg178_verify_with_double_read( expected_i, first_178, second_178 ) if ( matches and second_178 is not None and not _deye_reg178_verify_match(expected_i, first_178) ): logger.info( "[cmd %s] reg178 double-read recovered: first=%s second=%s", cmd_id, first_178, second_178, ) if not matches and reg in DEYE_TOU_POWER_REGS and inv_cfg is not None: matches = _deye_tou_power_verify_match(expected_i, actual_i, inv_cfg) await db.execute( """ UPDATE ems.modbus_command SET value_verified=$1::int, verified_at=now(), status=CASE WHEN $2::boolean THEN 'verified' ELSE 'mismatch' END WHERE id=$3::int """, actual_i, matches, cmd_id, ) if not matches: logger.error( "[cmd %s] MISMATCH %s 0x%04X: expected=%s actual=%s%s", cmd_id, cmd["asset_code"], reg, expected_i, actual_i, ( " (reg178 mask 0x%04X)" % REG178_VERIFY_MASK if reg == 178 else "" ), ) row_ac = await db.fetchrow( "SELECT attempt_count FROM ems.modbus_command WHERE id=$1", cmd_id ) attempts = int(row_ac["attempt_count"] or 0) if row_ac else 0 await notify_modbus_mismatch( db, site_id, cmd["asset_code"], reg, cmd["register_name"] or "", expected_i, actual_i, attempts, ) if attempts < 3: await db.execute( "UPDATE ems.modbus_command SET status='retrying' WHERE id=$1", cmd_id, ) await execute_modbus_commands([cmd_id], db) await verify_modbus_commands([cmd_id], db, site_id) else: if deye_reg_triggers_self_sustain_after_verify_exhaust(reg): logger.critical( "[cmd %s] 3 failed attempts, switching to SELF_SUSTAIN", cmd_id, ) await _switch_to_self_sustain( site_id, db, reason=( f"Modbus mismatch po 3 pokusech: {cmd['asset_code']} " f"reg 0x{reg:04X}" ), ) else: logger.warning( "[cmd %s] 3 failed verify attempts on non-critical reg 0x%04X " "(no mode change): %s", cmd_id, reg, cmd["asset_code"], ) return False if reg == 178 and actual_i != expected_i: logger.info( "[cmd %s] verified OK (reg178 masked): %s 0x%04X value_to_write=%s actual=%s", cmd_id, cmd["asset_code"], reg, expected_i, actual_i, ) else: logger.info( "[cmd %s] verified OK: %s 0x%04X=%s", cmd_id, cmd["asset_code"], reg, actual_i, ) return True cmds: list[asyncpg.Record] = [] for cmd_id in command_ids: cmd = await db.fetchrow( "SELECT * FROM ems.modbus_command WHERE id=$1", cmd_id ) if cmd is not None and cmd["status"] == "written": cmds.append(cmd) if not cmds: return True by_gw: dict[tuple[str, int, int], list[asyncpg.Record]] = defaultdict(list) for cmd in cmds: by_gw[ (cmd["device_host"], int(cmd["device_port"]), int(cmd["device_unit_id"])) ].append(cmd) all_ok = True for (host, port, unit), group in by_gw.items(): client = await get_modbus_client(host, port) clock_cmds = [c for c in group if int(c["register"]) in DEYE_CLOCK_REGS] rest = [c for c in group if int(c["register"]) not in DEYE_CLOCK_REGS] if clock_cmds: asset_id = int(clock_cmds[0]["asset_id"]) bundle = await _fetch_written_deye_clock_commands( site_id, asset_id, host, port, unit, db ) if not bundle: bundle = clock_cmds try: cvals = await client.read_holding_registers(62, 3, unit) except Exception as e: logger.error("verify clock read 62–64 failed: %s", e) all_ok = False else: if len(cvals) != 3: logger.error( "verify clock read: expected 3 regs, got %s", len(cvals) ) all_ok = False else: matched = await _verify_deye_clock_written_bundle( site_id, bundle, int(cvals[0]), int(cvals[1]), int(cvals[2]), db, ) if not matched: all_ok = False for run in _modbus_command_contiguous_runs(rest): start_reg = int(run[0]["register"]) n = len(run) try: values = await client.read_holding_registers(start_reg, n, unit) except Exception as e: logger.error( "verify batch read 0x%04X count=%s failed: %s", start_reg, n, e ) all_ok = False continue if len(values) != n: logger.error( "verify read 0x%04X: expected %s regs, got %s", start_reg, n, len(values), ) all_ok = False continue for cmd, actual in zip(run, values): matched = await _apply_verify_result( cmd, int(actual), client=client, unit=unit ) if not matched: all_ok = False return all_ok async def write_inverter_setpoints( site_id: int, setpoints_now: ControlSetpoints, setpoints_next: ControlSetpoints | None, db: asyncpg.Connection, planning_run_id: int | None = None, ) -> str: inv = await _load_inverter_config(site_id, db) if inv is None: return "FAIL inverter: no controllable Modbus endpoint" unit_id = int(inv.unit_id if inv.unit_id is not None else 1) raw_bat = setpoints_now.battery_w grid_w = int(setpoints_now.grid_setpoint_w or 0) no_export = inv.no_export export_lim = _deye_reg143_export_w(no_export, inv.max_export_power_w) max_batt_w_discharge = int(inv.max_discharge_a * BATT_VOLTAGE_V) tp_discharge_w = 0 if setpoints_now.lock_battery else max_batt_w_discharge tou_min_pct = _deye_tou_min_soc_pct(inv) tou_reserve_pct = _deye_tou_reserve_soc_pct(inv) try: soc_telemetry = await _get_current_soc(site_id, db) deye_mode = get_deye_mode(setpoints_now) bat_w = int(raw_bat) if raw_bat is not None else 0 if setpoints_now.lock_battery: charge_a = 0 discharge_a = 0 elif deye_mode == "CHARGE": charge_a = battery_watts_to_amps(bat_w, inv.max_charge_a) discharge_a = 0 elif deye_mode == "SELL": # Záměrný výdej baterie do sítě: plný vybíjecí proud; export strop dle plánu níže. charge_a = 0 discharge_a = int(inv.max_discharge_a) elif setpoints_now.self_sustain_local_use: # SELF_SUSTAIN: plný nabíjecí i vybíjecí proud invertoru — přebytek FVE jde do baterie, # reg. 142 = zero export to load/CT (viz selling_mode níže), ne reg. 108 = 0. charge_a = int(inv.max_charge_a) discharge_a = int(inv.max_discharge_a) else: # PASSIVE (ZERO): výchozí plné 108/109; u přetoku FVE do sítě nebo importu bez baterie viz helper. charge_a, discharge_a = _deye_zero_export_amps_for_passive( grid_w, bat_w, int(inv.max_charge_a), int(inv.max_discharge_a), ) zero_exp_mode = int(inv.deye_zero_export_mode or 1) selling_mode = 0 if deye_mode == "SELL" else zero_exp_mode solar_sell = 0 if (setpoints_now.export_ban and deye_mode != "SELL") else 1 export_limit = export_lim if deye_mode == "SELL" and grid_w < 0: export_limit = min(export_lim, max(REG143_SELL_CAP_MIN_W, abs(grid_w))) reg178_val = REG178_SELL if deye_mode == "SELL" else REG178_PASSIVE logger.info( f"[control] site={site_id} fyzický režim Deye: {deye_mode} | " f"battery_w={raw_bat!r} grid_w={grid_w} | " f"charge_a={charge_a} discharge_a={discharge_a} | " f"reg142={selling_mode} reg145={solar_sell} reg143={export_limit}W reg178={reg178_val}" ) now_cet, time_rows = _deye_system_time_register_rows() skip_time = False try: mb_clock = await get_modbus_client(inv.host, inv.port) tvals = await mb_clock.read_holding_registers( 62, 3, int(inv.unit_id if inv.unit_id is not None else 1) ) if len(tvals) == 3: skip_time = _deye_should_skip_time_sync_after_read( inv, int(tvals[0]), int(tvals[1]), int(tvals[2]) ) else: logger.warning( "Deye clock read: expected 3 registers, got %s; will sync 62–64", len(tvals), ) except Exception as e: logger.warning("Deye clock read failed (will sync 62–64): %s", e) if skip_time: logger.info( "Deye clock 62–64 skipped (drift ≤ %ss, last sync < %sh ago): %s CET", DEYE_CLOCK_DRIFT_OK_SEC, DEYE_CLOCK_RESYNC_INTERVAL_HOURS, now_cet.strftime("%Y-%m-%d %H:%M:%S"), ) else: logger.info("Deye time will sync: %s CET", now_cet.strftime("%Y-%m-%d %H:%M:%S")) registers: list[tuple[int, str, int]] = [] if skip_time else list(time_rows) sp_tp2 = setpoints_next if setpoints_next is not None else setpoints_now hh_cur = current_slot_hhmm() hh_nxt = next_slot_hhmm() p1, s1, g1 = _deye_tou_params(setpoints_now, inv) p2, s2, g2 = _deye_tou_params(sp_tp2, inv) registers.extend(_deye_time_point_rows(0, hh_cur, p1, s1, g1)) registers.extend(_deye_time_point_rows(1, hh_nxt, p2, s2, g2)) prague_date = datetime.now(PRAGUE_TZ).date() inactive_sig = ( f"{DEYE_TOU_INACTIVE_HHMM}|{tou_min_pct}|{tou_reserve_pct}|{tp_discharge_w}" ) need_inactive_tou = ( inv.deye_last_tou_inactive_write_prague_date != prague_date or inv.deye_tou_inactive_signature != inactive_sig ) if need_inactive_tou: for idx in range(2, 6): registers.extend( _deye_time_point_rows( idx, DEYE_TOU_INACTIVE_HHMM, tp_discharge_w, tou_min_pct, False ) ) else: logger.debug( "Deye TOU rows 3–6 skipped (already written today, signature unchanged)" ) registers.extend( [ (108, "", charge_a), (109, "", discharge_a), (141, "energy_mode (0)", 0), (142, "limit_control", selling_mode), (143, "", export_limit), (145, "solar_sell", solar_sell), ] ) if ( bool(inv.deye_reg340_pv_a_control_enabled) and int(inv.pv_a_cap_w) > 0 and setpoints_now.pv_a_allowed_w is not None ): registers.append((340, "max_solar_power_w", int(setpoints_now.pv_a_allowed_w))) # Reg 178: bitové pole. Nastavujeme bits4–5 (peak shaving) vždy; bits0–1 (MI export cutoff) jen pokud feature. # Ostatní bity musí zůstat zachované → read-modify-write. try: mb178 = await get_modbus_client(inv.host, inv.port) r178 = await mb178.read_holding_registers(178, 1, unit_id) if not r178 or len(r178) < 1: raise OSError(f"reg178 read returned {len(r178) if r178 is not None else None} values") current_178 = int(r178[0]) peak_bits = int(reg178_val) & int(REG178_VERIFY_MASK) if inv.deye_gen_microinverter_cutoff_enabled: want_cutoff = bool(setpoints_now.deye_gen_cutoff_enabled) and deye_mode != "SELL" mi_bits = ( REG178_MI_EXPORT_ENABLE if want_cutoff else REG178_MI_EXPORT_DISABLE ) else: mi_bits = int(current_178) & int(REG178_MI_EXPORT_MASK) new_178 = ( (int(current_178) & ~int(REG178_VERIFY_MASK_COMBINED)) | int(peak_bits) | int(mi_bits) ) registers.append((178, "control_board_special_1", int(new_178))) logger.info( "[control] %s: reg178 (control_board_special_1) old=%s new=%s peak_bits=0x%04X mi_bits=%s", inv.code, current_178, new_178, int(peak_bits), int(mi_bits), ) except Exception as e: logger.warning("[control] %s: reg178 RMW failed (skip reg178 write): %s", inv.code, e) logger.info( "[control] %s: deye_mode=%s charge=%sA discharge=%sA " "reg142=%s reg145=%s export=%sW " "tp1=%s tp2=%s soc=%s%% (batt=%r grid=%sW)", inv.code, deye_mode, charge_a, discharge_a, selling_mode, solar_sell, export_limit, hh_cur, hh_nxt, soc_telemetry, raw_bat, grid_w, ) last_verified = await _fetch_last_verified_inverter_registers(site_id, inv.id, db) registers, skipped_unchanged = _drop_registers_matching_last_verified( registers, last_verified ) if skipped_unchanged: logger.info( "[control] %s: skip %s registers (value equals last verified): %s", inv.code, len(skipped_unchanged), skipped_unchanged[:24], ) if not registers: logger.info( "[control] %s: all Deye holding regs match last verified, no Modbus write", inv.code, ) if need_inactive_tou: await db.execute( """ UPDATE ems.asset_inverter SET deye_last_tou_inactive_write_prague_date = $1, deye_tou_inactive_signature = $2 WHERE id = $3 """, prague_date, inactive_sig, inv.id, ) return ( f"OK inverter: batt_w={raw_bat!r} (no changes vs last verified Modbus snapshot)" ) will_write_inactive = any( int(r) in _DEYE_INACTIVE_TOU_REGISTERS for r, _, _ in registers ) cmd_ids = await create_modbus_commands( site_id, planning_run_id, "inverter", inv.id, inv.code, inv.host, inv.port, inv.unit_id, registers, db, deye_physical_mode=deye_mode, ) if not await execute_modbus_commands(cmd_ids, db): return f"FAIL inverter: {inv.code}: Modbus write failed (see modbus_command)" logger.info("[control] Inverter %s journal write OK", inv.code) will_write_time = any(int(r) in DEYE_CLOCK_REGS for r, _, _ in registers) if will_write_time: await db.execute( """ UPDATE ems.asset_inverter SET deye_last_system_time_sync_minute = $1, deye_last_system_time_sync_at = now() WHERE id = $2 """, _prague_minute_start_utc(), inv.id, ) if need_inactive_tou or will_write_inactive: await db.execute( """ UPDATE ems.asset_inverter SET deye_last_tou_inactive_write_prague_date = $1, deye_tou_inactive_signature = $2 WHERE id = $3 """, prague_date, inactive_sig, inv.id, ) except Exception as e: return f"FAIL inverter: {inv.code}: {e}" return ( f"OK inverter: batt_w={raw_bat!r} " f"(time points + FC 0x10: 108/109/141/142/178/143/145/340 dle plánu)" ) async def read_deye_registers_live(site_id: int, db: asyncpg.Connection) -> dict[str, Any]: """ Živé čtení holding registrů Deye 108, 109, 141–145, 178, 191 a volitelně 340 (jen pokud `deye_reg340_pv_a_control_enabled`, jinak `reg340_max_solar_power_w` = null). Vše pod jedním mutexem + sdružené FC3 bloky — mezi jednotlivými read_register dřív telemetrie střídavě brala lock a RS485 brány házely cizí transaction_id / I/O timeouty. """ inv = await _load_inverter_config(site_id, db) if inv is None: raise ValueError("no controllable Modbus inverter for site") uid = int(inv.unit_id) client = await get_modbus_client(inv.host, inv.port) read_at = datetime.now(timezone.utc) try: async with client.batch(uid) as mb: b108 = await mb.read_holding_registers(108, 2) b141 = await mb.read_holding_registers(141, 5) r178 = await mb.read_holding_registers(178, 1) r191 = await mb.read_holding_registers(191, 1) if inv.deye_reg340_pv_a_control_enabled: r340 = await mb.read_holding_registers(340, 1) else: r340 = None r108, r109 = b108[0], b108[1] r141, r142, r143 = b141[0], b141[1], b141[2] r145 = b141[4] r178 = r178[0] r191 = r191[0] r340v = ( int(r340[0]) if r340 is not None and len(r340) >= 1 else None ) except Exception: logger.exception("read_deye_registers_live site=%s failed", site_id) raise return { "reg108_charge_a": int(r108), "reg109_discharge_a": int(r109), "reg141_energy_mode": int(r141), "reg142_limit_control": int(r142), "reg143_export_limit_w": int(r143), "reg145_solar_sell": int(r145), "reg178_peak_shaving_switch": int(r178), "reg178_control_board_special_1": int(r178), "reg178_mi_export_cutoff_bits": int(r178) & int(REG178_MI_EXPORT_MASK), "reg178_mi_export_cutoff_is_on": (int(r178) & int(REG178_MI_EXPORT_MASK)) == int(REG178_MI_EXPORT_ENABLE), "reg191_peak_shaving_w": int(r191), "reg340_max_solar_power_w": r340v, "read_at": read_at.isoformat(), } async def export_setpoints(site_id: int, db: asyncpg.Connection) -> None: mode = await _fetch_operating_mode(site_id, db) if mode is None: logger.warning("control export site=%s: no operating mode row", site_id) return if mode.mode_code == "MANUAL": logger.info("control export site=%s: MANUAL, skip writes", site_id) return try: inv_for_pv = await _load_inverter_config(site_id, db) cap_pv = int(inv_for_pv.pv_a_cap_w) if inv_for_pv is not None else 0 reg340_en = ( bool(inv_for_pv.deye_reg340_pv_a_control_enabled) if inv_for_pv is not None else False ) pi_now = await _fetch_plan_row_for_slot_offset(site_id, db, 0) pi_next = await _fetch_plan_row_for_slot_offset(site_id, db, 1) sp_now = _build_setpoints( mode, pi_now, pv_a_cap_w=cap_pv, reg340_pv_a_control_enabled=reg340_en, ) sp_next = _build_setpoints( mode, pi_next, pv_a_cap_w=cap_pv, reg340_pv_a_control_enabled=reg340_en, ) if mode.mode_code == "AUTO" and sp_now is None: if pi_now is None: logger.warning( "control export site=%s: AUTO but no planning_interval for current slot, skip", site_id, ) return if sp_now is None: logger.warning( "control export site=%s: no setpoints for mode %s, skip", site_id, mode.mode_code, ) return if mode.mode_code == "CHARGE_CHEAP": max_ch = await _fetch_max_charge_power_w(site_id, db) # Oba setpointy kladné → get_deye_mode CHARGE; min. 1 W, aby režim nebyl PASSIVE při nulové DB. pw = max(1, int(max_ch)) sp_now = ControlSetpoints( battery_w=pw, grid_export_limit=0, ev1_current_a=0, ev2_current_a=0, heat_pump_enable=False, grid_setpoint_w=pw, ev1_power_w=0, ev2_power_w=0, target_soc_pct=None, effective_sell_price_czk_kwh=None, ) sp_next = sp_now else: sp_now = _apply_price_failsafe_guard(site_id, mode, pi_now, sp_now) if sp_next is not None: sp_next = _apply_price_failsafe_guard(site_id, mode, pi_next, sp_next) planning_run_id = await db.fetchval( """ SELECT id FROM ems.planning_run WHERE site_id = $1 AND status = 'active' ORDER BY created_at DESC LIMIT 1 """, site_id, ) if planning_run_id is not None: planning_run_id = int(planning_run_id) try: inv_res = await write_inverter_setpoints( site_id, sp_now, sp_next, db, planning_run_id=planning_run_id ) except Exception as e: logger.error("inverter write failed: %s", e) inv_res = f"FAIL inverter: {e}" try: ev_res = await write_ev_setpoints(site_id, sp_now, db) except Exception as e: logger.error("ev write failed: %s", e) ev_res = f"FAIL ev: {e}" try: hp_res = await write_heat_pump_setpoint(site_id, sp_now, db) except Exception as e: logger.error("hp write failed: %s", e) hp_res = f"FAIL heat pump: {e}" try: lox_res = await send_loxone_setpoints(site_id, sp_now, mode, db) except Exception as e: logger.error("loxone write failed: %s", e) lox_res = f"FAIL Loxone: {e}" results = list( zip( ("inverter", "ev", "heat_pump", "loxone"), (inv_res, ev_res, hp_res, lox_res), ) ) for name, res in results: if isinstance(res, Exception): logger.error("control export site=%s %s: FAIL %s", site_id, name, res) elif isinstance(res, str) and res.startswith("FAIL"): logger.error("control export site=%s %s: %s", site_id, name, res) else: logger.info("control export site=%s %s: %s", site_id, name, res) finally: try: await enqueue_site_signals(site_id, db) except Exception as e: logger.warning( "control export site=%s: signal enqueue failed: %s", site_id, e )