"""Výpočet control setpointů a Deye TOU parametrů.""" from __future__ import annotations import logging from datetime import datetime from typing import Any from services.control.deye_helpers import ( BATT_VOLTAGE_V, PRAGUE_TZ, battery_watts_to_amps, compute_pv_a_reg340_max_solar_w, watts_to_amps, ) from services.control.models import ControlSetpoints, InverterConfig, OperatingModeInfo logger = logging.getLogger(__name__) def _deye_system_time_register_rows() -> tuple[datetime, list[tuple[int, str, int]]]: """Hodnoty pro reg 62-64 (Europe/Prague); sekundy v reg 64 = 0 (stabilnější zápis).""" now = datetime.now(PRAGUE_TZ).replace(second=0, microsecond=0) reg62 = ((now.year - 2000) << 8) | now.month reg63 = (now.day << 8) | now.hour reg64 = (now.minute << 8) | 0 rows = [ (62, "", reg62), (63, "", reg63), (64, "", reg64), ] return now, rows def _deye_time_point_rows( slot_index: int, time_hhmm: int, power_w: int, soc_pct: int, grid_charge: bool, ) -> list[tuple[int, str, int]]: g = 1 if grid_charge else 0 return [ (148 + slot_index, "", time_hhmm), (154 + slot_index, "", power_w), (166 + slot_index, "", soc_pct), (172 + slot_index, "", g), ] class _DictRecord: """Minimální asyncpg Record kompatibilita pro dict z jsonb.""" __slots__ = ("_d",) def __init__(self, d: dict[str, Any]) -> None: self._d = d def __getitem__(self, k: str) -> Any: return self._d[k] def get(self, k: str, default: Any = None) -> Any: return self._d.get(k, default) def __contains__(self, k: str) -> bool: return k in self._d def _build_setpoints( mode: OperatingModeInfo, pi: Any | None, *, pv_a_cap_w: int = 0, reg340_pv_a_control_enabled: bool = False, ) -> ControlSetpoints | None: code = mode.mode_code if code == "MANUAL": return None if code == "AUTO": if pi is None: return None grid_sp = int(pi["grid_setpoint_w"] or 0) export_limit_raw = pi.get("export_limit_w") export_limit = int(export_limit_raw) if export_limit_raw is not None else abs(min(grid_sp, 0)) ev1_w = int(pi["ev1_setpoint_w"] or 0) if "ev1_setpoint_w" in pi else 0 ev2_w = int(pi["ev2_setpoint_w"] or 0) if "ev2_setpoint_w" in pi else 0 hp_en = bool(pi["heat_pump_enabled"]) tgt = pi["battery_soc_target_pct"] target_soc = int(round(float(tgt))) if tgt is not None else None pm_raw = pi.get("deye_physical_mode") pm: str | None = str(pm_raw).strip().upper() if pm_raw is not None else None sell_raw = pi.get("effective_sell_price") sell_f: float | None = float(sell_raw) if sell_raw is not None else None export_mode_raw = pi.get("export_mode") export_mode = str(export_mode_raw).strip().upper() if export_mode_raw is not None else None if export_mode == "NONE": export_limit = 0 # Záporný výkup sám o sobě neblokuje export, pokud plán export explicitně žádá. export_ban = sell_f is not None and float(sell_f) < 0 and grid_sp >= 0 gen_cutoff_raw = pi.get("deye_gen_cutoff_enabled") gen_cutoff = bool(gen_cutoff_raw) if gen_cutoff_raw is not None else False pv_a_allowed: int | None = None if bool(reg340_pv_a_control_enabled) and int(pv_a_cap_w) > 0: forecast = int(pi.get("pv_a_forecast_solver_w") or 0) curtail = int(pi.get("pv_a_curtailed_w") or 0) pv_a_allowed = compute_pv_a_reg340_max_solar_w(int(pv_a_cap_w), forecast, curtail) buy_raw = pi.get("effective_buy_price") buy_f: float | None = float(buy_raw) if buy_raw is not None else None pv_b = int(pi.get("pv_b_forecast_solver_w") or 0) if ( buy_f is not None and sell_f is not None and float(buy_f) < 0.0 and float(sell_f) < 0.0 and pv_b > 0 ): pv_a_allowed = 0 return ControlSetpoints( battery_w=int(pi["battery_setpoint_w"] or 0), grid_export_limit=max(0, export_limit), ev1_current_a=watts_to_amps(ev1_w, phases=3), ev2_current_a=watts_to_amps(ev2_w, phases=1), heat_pump_enable=hp_en, grid_setpoint_w=grid_sp, ev1_power_w=ev1_w, ev2_power_w=ev2_w, target_soc_pct=target_soc, deye_physical_mode=pm, export_ban=bool(export_ban), deye_gen_cutoff_enabled=bool(gen_cutoff), effective_sell_price_czk_kwh=sell_f, pv_a_allowed_w=pv_a_allowed, ) if code == "SELF_SUSTAIN": return ControlSetpoints( battery_w=None, grid_export_limit=0, ev1_current_a=0, ev2_current_a=0, heat_pump_enable=False, grid_setpoint_w=0, ev1_power_w=0, ev2_power_w=0, target_soc_pct=None, self_sustain_local_use=True, ) if code == "CHARGE_CHEAP": return ControlSetpoints( battery_w=0, grid_export_limit=0, ev1_current_a=0, ev2_current_a=0, heat_pump_enable=False, grid_setpoint_w=0, ev1_power_w=0, ev2_power_w=0, target_soc_pct=None, ) if code == "PRESERVE": return ControlSetpoints( battery_w=0, grid_export_limit=0, ev1_current_a=0, ev2_current_a=0, heat_pump_enable=False, grid_setpoint_w=0, ev1_power_w=0, ev2_power_w=0, target_soc_pct=None, lock_battery=True, ) logger.warning("Unknown mode_code %s for site export, skipping", code) return None def _apply_price_failsafe_guard( site_id: int, mode: OperatingModeInfo, pi: Any | None, sp: ControlSetpoints, ) -> ControlSetpoints: if mode.mode_code != "AUTO" or pi is None: return sp if "is_predicted_price" not in pi or not bool(pi["is_predicted_price"]): return sp logger.warning( "control export site=%s: AUTO slot uses predicted price -> forcing PASSIVE no-export guard", site_id, ) return ControlSetpoints( battery_w=0, grid_export_limit=0, ev1_current_a=sp.ev1_current_a, ev2_current_a=sp.ev2_current_a, heat_pump_enable=sp.heat_pump_enable, grid_setpoint_w=max(0, int(sp.grid_setpoint_w or 0)), ev1_power_w=sp.ev1_power_w, ev2_power_w=sp.ev2_power_w, target_soc_pct=sp.target_soc_pct, effective_sell_price_czk_kwh=sp.effective_sell_price_czk_kwh, pv_a_allowed_w=sp.pv_a_allowed_w, ) def _deye_reg143_export_w(no_export: bool, max_export_power_w: int | None) -> int: """Reg 143 - max export W z DB (např. SUN-20K / home-01 = 13 500 W).""" if no_export: return 0 return max(0, int(max_export_power_w or 0)) def _clamp_deye_tou_soc_pct(pct: int) -> int: return max(5, min(95, pct)) def _deye_tou_min_soc_pct(inv: InverterConfig) -> int: if inv.min_soc_percent is not None: return _clamp_deye_tou_soc_pct(int(inv.min_soc_percent)) return 10 def _deye_tou_reserve_soc_pct(inv: InverterConfig) -> int: if inv.reserve_soc_percent is not None: return _clamp_deye_tou_soc_pct(int(inv.reserve_soc_percent)) return 20 def _deye_passive_tou_battery_soc_pct( inv: InverterConfig, _setpoints: ControlSetpoints ) -> int: """Hodnota SOC u Deye TOU řádku (reg 166+) ve fyzickém PASSIVE.""" return _deye_tou_min_soc_pct(inv) def _deye_zero_export_amps_for_passive( grid_w: int, bat_w: int, max_charge_a: int, max_discharge_a: int, ) -> tuple[int, int]: """ PASSIVE (zero export k CT/zátěži): výchozí plné 108/109. Export v plánu bez vybíjení baterie vypne charge A; import bez nabíjení vypne discharge A. """ if grid_w < 0 and bat_w >= 0: return 0, max_discharge_a if grid_w > 0 and bat_w <= 0: return max_charge_a, 0 return max_charge_a, max_discharge_a def deye_battery_charge_discharge_amps( *, lock_battery: bool, deye_mode: str, self_sustain_local_use: bool, bat_w: int, grid_w: int, max_charge_a: int, max_discharge_a: int, ) -> tuple[int, int]: """ Proud nabíjení / vybíjení (reg 108 / 109) pro zápis Deye. PASSIVE + plán chce nabíjet z PV přebytku i při exportu do sítě: nenulový charge, discharge 0. """ if lock_battery: return 0, 0 if deye_mode == "CHARGE": return battery_watts_to_amps(bat_w, max_charge_a), 0 if deye_mode == "SELL": return 0, int(max_discharge_a) if self_sustain_local_use: return int(max_charge_a), int(max_discharge_a) if bat_w > 0: return battery_watts_to_amps(bat_w, max_charge_a), 0 return _deye_zero_export_amps_for_passive( grid_w, bat_w, int(max_charge_a), int(max_discharge_a) ) def get_deye_mode(setpoints: ControlSetpoints) -> str: """Fyzický režim Deye: SELL | CHARGE | PASSIVE.""" pm = (setpoints.deye_physical_mode or "").strip().upper() if pm in {"PASSIVE", "SELL", "CHARGE"}: return pm grid_w = int(setpoints.grid_setpoint_w or 0) bat_w = 0 if setpoints.battery_w is None else int(setpoints.battery_w) if bat_w > 0 and grid_w > 0: return "CHARGE" if grid_w < 0 and bat_w < 0: return "SELL" return "PASSIVE" def _deye_tou_params( setpoints: ControlSetpoints, inv: InverterConfig, ) -> tuple[int, int, bool]: """Parametry jednoho Deye time pointu: výkon W, SOC % (TOU reg 166+), grid_charge.""" max_batt_w_discharge = int(inv.max_discharge_a * BATT_VOLTAGE_V) tp_discharge_w = 0 if setpoints.lock_battery else max_batt_w_discharge tou_min = _deye_tou_min_soc_pct(inv) tou_reserve = _deye_tou_reserve_soc_pct(inv) if setpoints.lock_battery: return tp_discharge_w, tou_min, False deye_mode = get_deye_mode(setpoints) if deye_mode == "CHARGE": raw_bat = setpoints.battery_w battery_w = int(raw_bat) if raw_bat is not None else 0 cap = int(inv.max_soc_percent) if inv.max_soc_percent is not None else 95 target_soc = max(10, min(100, cap)) tp_charge_w = ( battery_watts_to_amps(battery_w, int(inv.max_charge_a)) * int(BATT_VOLTAGE_V) ) return tp_charge_w, target_soc, True if deye_mode == "SELL": return tp_discharge_w, tou_reserve, False tou_soc = _deye_passive_tou_battery_soc_pct(inv, setpoints) return tp_discharge_w, tou_soc, False