Files
ems/backend/services/control/setpoints.py
Dusan Vojacek bcb05d4896 tuning palnneru
2026-05-04 19:04:48 +02:00

331 lines
11 KiB
Python

"""Výpočet control setpointů a Deye TOU parametrů."""
from __future__ import annotations
import logging
from datetime import datetime
from typing import Any
from services.control.deye_helpers import (
BATT_VOLTAGE_V,
PRAGUE_TZ,
battery_watts_to_amps,
compute_pv_a_reg340_max_solar_w,
watts_to_amps,
)
from services.control.models import ControlSetpoints, InverterConfig, OperatingModeInfo
logger = logging.getLogger(__name__)
def _deye_system_time_register_rows() -> tuple[datetime, list[tuple[int, str, int]]]:
"""Hodnoty pro reg 62-64 (Europe/Prague); sekundy v reg 64 = 0 (stabilnější zápis)."""
now = datetime.now(PRAGUE_TZ).replace(second=0, microsecond=0)
reg62 = ((now.year - 2000) << 8) | now.month
reg63 = (now.day << 8) | now.hour
reg64 = (now.minute << 8) | 0
rows = [
(62, "", reg62),
(63, "", reg63),
(64, "", reg64),
]
return now, rows
def _deye_time_point_rows(
slot_index: int,
time_hhmm: int,
power_w: int,
soc_pct: int,
grid_charge: bool,
) -> list[tuple[int, str, int]]:
g = 1 if grid_charge else 0
return [
(148 + slot_index, "", time_hhmm),
(154 + slot_index, "", power_w),
(166 + slot_index, "", soc_pct),
(172 + slot_index, "", g),
]
class _DictRecord:
"""Minimální asyncpg Record kompatibilita pro dict z jsonb."""
__slots__ = ("_d",)
def __init__(self, d: dict[str, Any]) -> None:
self._d = d
def __getitem__(self, k: str) -> Any:
return self._d[k]
def get(self, k: str, default: Any = None) -> Any:
return self._d.get(k, default)
def __contains__(self, k: str) -> bool:
return k in self._d
def _build_setpoints(
mode: OperatingModeInfo,
pi: Any | None,
*,
pv_a_cap_w: int = 0,
reg340_pv_a_control_enabled: bool = False,
) -> ControlSetpoints | None:
code = mode.mode_code
if code == "MANUAL":
return None
if code == "AUTO":
if pi is None:
return None
grid_sp = int(pi["grid_setpoint_w"] or 0)
export_limit_raw = pi.get("export_limit_w")
export_limit = int(export_limit_raw) if export_limit_raw is not None else abs(min(grid_sp, 0))
ev1_w = int(pi["ev1_setpoint_w"] or 0) if "ev1_setpoint_w" in pi else 0
ev2_w = int(pi["ev2_setpoint_w"] or 0) if "ev2_setpoint_w" in pi else 0
hp_en = bool(pi["heat_pump_enabled"])
tgt = pi["battery_soc_target_pct"]
target_soc = int(round(float(tgt))) if tgt is not None else None
pm_raw = pi.get("deye_physical_mode")
pm: str | None = str(pm_raw).strip().upper() if pm_raw is not None else None
sell_raw = pi.get("effective_sell_price")
sell_f: float | None = float(sell_raw) if sell_raw is not None else None
export_mode_raw = pi.get("export_mode")
export_mode = str(export_mode_raw).strip().upper() if export_mode_raw is not None else None
if export_mode == "NONE":
export_limit = 0
# Záporný výkup sám o sobě neblokuje export, pokud plán export explicitně žádá.
export_ban = sell_f is not None and float(sell_f) < 0 and grid_sp >= 0
gen_cutoff_raw = pi.get("deye_gen_cutoff_enabled")
gen_cutoff = bool(gen_cutoff_raw) if gen_cutoff_raw is not None else False
pv_a_allowed: int | None = None
if bool(reg340_pv_a_control_enabled) and int(pv_a_cap_w) > 0:
forecast = int(pi.get("pv_a_forecast_solver_w") or 0)
curtail = int(pi.get("pv_a_curtailed_w") or 0)
pv_a_allowed = compute_pv_a_reg340_max_solar_w(int(pv_a_cap_w), forecast, curtail)
buy_raw = pi.get("effective_buy_price")
buy_f: float | None = float(buy_raw) if buy_raw is not None else None
pv_b = int(pi.get("pv_b_forecast_solver_w") or 0)
if (
buy_f is not None
and sell_f is not None
and float(buy_f) < 0.0
and float(sell_f) < 0.0
and pv_b > 0
):
pv_a_allowed = 0
return ControlSetpoints(
battery_w=int(pi["battery_setpoint_w"] or 0),
grid_export_limit=max(0, export_limit),
ev1_current_a=watts_to_amps(ev1_w, phases=3),
ev2_current_a=watts_to_amps(ev2_w, phases=1),
heat_pump_enable=hp_en,
grid_setpoint_w=grid_sp,
ev1_power_w=ev1_w,
ev2_power_w=ev2_w,
target_soc_pct=target_soc,
deye_physical_mode=pm,
export_ban=bool(export_ban),
deye_gen_cutoff_enabled=bool(gen_cutoff),
effective_sell_price_czk_kwh=sell_f,
pv_a_allowed_w=pv_a_allowed,
)
if code == "SELF_SUSTAIN":
return ControlSetpoints(
battery_w=None,
grid_export_limit=0,
ev1_current_a=0,
ev2_current_a=0,
heat_pump_enable=False,
grid_setpoint_w=0,
ev1_power_w=0,
ev2_power_w=0,
target_soc_pct=None,
self_sustain_local_use=True,
)
if code == "CHARGE_CHEAP":
return ControlSetpoints(
battery_w=0,
grid_export_limit=0,
ev1_current_a=0,
ev2_current_a=0,
heat_pump_enable=False,
grid_setpoint_w=0,
ev1_power_w=0,
ev2_power_w=0,
target_soc_pct=None,
)
if code == "PRESERVE":
return ControlSetpoints(
battery_w=0,
grid_export_limit=0,
ev1_current_a=0,
ev2_current_a=0,
heat_pump_enable=False,
grid_setpoint_w=0,
ev1_power_w=0,
ev2_power_w=0,
target_soc_pct=None,
lock_battery=True,
)
logger.warning("Unknown mode_code %s for site export, skipping", code)
return None
def _apply_price_failsafe_guard(
site_id: int,
mode: OperatingModeInfo,
pi: Any | None,
sp: ControlSetpoints,
) -> ControlSetpoints:
if mode.mode_code != "AUTO" or pi is None:
return sp
if "is_predicted_price" not in pi or not bool(pi["is_predicted_price"]):
return sp
logger.warning(
"control export site=%s: AUTO slot uses predicted price -> forcing PASSIVE no-export guard",
site_id,
)
return ControlSetpoints(
battery_w=0,
grid_export_limit=0,
ev1_current_a=sp.ev1_current_a,
ev2_current_a=sp.ev2_current_a,
heat_pump_enable=sp.heat_pump_enable,
grid_setpoint_w=max(0, int(sp.grid_setpoint_w or 0)),
ev1_power_w=sp.ev1_power_w,
ev2_power_w=sp.ev2_power_w,
target_soc_pct=sp.target_soc_pct,
effective_sell_price_czk_kwh=sp.effective_sell_price_czk_kwh,
pv_a_allowed_w=sp.pv_a_allowed_w,
)
def _deye_reg143_export_w(no_export: bool, max_export_power_w: int | None) -> int:
"""Reg 143 - max export W z DB (např. SUN-20K / home-01 = 13 500 W)."""
if no_export:
return 0
return max(0, int(max_export_power_w or 0))
def _clamp_deye_tou_soc_pct(pct: int) -> int:
return max(5, min(95, pct))
def _deye_tou_min_soc_pct(inv: InverterConfig) -> int:
if inv.min_soc_percent is not None:
return _clamp_deye_tou_soc_pct(int(inv.min_soc_percent))
return 10
def _deye_tou_reserve_soc_pct(inv: InverterConfig) -> int:
if inv.reserve_soc_percent is not None:
return _clamp_deye_tou_soc_pct(int(inv.reserve_soc_percent))
return 20
def _deye_passive_tou_battery_soc_pct(
inv: InverterConfig, _setpoints: ControlSetpoints
) -> int:
"""Hodnota SOC u Deye TOU řádku (reg 166+) ve fyzickém PASSIVE."""
return _deye_tou_min_soc_pct(inv)
def _deye_zero_export_amps_for_passive(
grid_w: int,
bat_w: int,
max_charge_a: int,
max_discharge_a: int,
) -> tuple[int, int]:
"""
PASSIVE (zero export k CT/zátěži): výchozí plné 108/109.
Export v plánu bez vybíjení baterie vypne charge A; import bez nabíjení vypne discharge A.
"""
if grid_w < 0 and bat_w >= 0:
return 0, max_discharge_a
if grid_w > 0 and bat_w <= 0:
return max_charge_a, 0
return max_charge_a, max_discharge_a
def deye_battery_charge_discharge_amps(
*,
lock_battery: bool,
deye_mode: str,
self_sustain_local_use: bool,
bat_w: int,
grid_w: int,
max_charge_a: int,
max_discharge_a: int,
) -> tuple[int, int]:
"""
Proud nabíjení / vybíjení (reg 108 / 109) pro zápis Deye.
PASSIVE + plán chce nabíjet z PV přebytku i při exportu do sítě: nenulový charge, discharge 0.
"""
if lock_battery:
return 0, 0
if deye_mode == "CHARGE":
return battery_watts_to_amps(bat_w, max_charge_a), 0
if deye_mode == "SELL":
return 0, int(max_discharge_a)
if self_sustain_local_use:
return int(max_charge_a), int(max_discharge_a)
if bat_w > 0:
return battery_watts_to_amps(bat_w, max_charge_a), 0
return _deye_zero_export_amps_for_passive(
grid_w, bat_w, int(max_charge_a), int(max_discharge_a)
)
def get_deye_mode(setpoints: ControlSetpoints) -> str:
"""Fyzický režim Deye: SELL | CHARGE | PASSIVE."""
pm = (setpoints.deye_physical_mode or "").strip().upper()
if pm in {"PASSIVE", "SELL", "CHARGE"}:
return pm
grid_w = int(setpoints.grid_setpoint_w or 0)
bat_w = 0 if setpoints.battery_w is None else int(setpoints.battery_w)
if bat_w > 0 and grid_w > 0:
return "CHARGE"
if grid_w < 0 and bat_w < 0:
return "SELL"
return "PASSIVE"
def _deye_tou_params(
setpoints: ControlSetpoints,
inv: InverterConfig,
) -> tuple[int, int, bool]:
"""Parametry jednoho Deye time pointu: výkon W, SOC % (TOU reg 166+), grid_charge."""
max_batt_w_discharge = int(inv.max_discharge_a * BATT_VOLTAGE_V)
tp_discharge_w = 0 if setpoints.lock_battery else max_batt_w_discharge
tou_min = _deye_tou_min_soc_pct(inv)
tou_reserve = _deye_tou_reserve_soc_pct(inv)
if setpoints.lock_battery:
return tp_discharge_w, tou_min, False
deye_mode = get_deye_mode(setpoints)
if deye_mode == "CHARGE":
raw_bat = setpoints.battery_w
battery_w = int(raw_bat) if raw_bat is not None else 0
cap = int(inv.max_soc_percent) if inv.max_soc_percent is not None else 95
target_soc = max(10, min(100, cap))
tp_charge_w = (
battery_watts_to_amps(battery_w, int(inv.max_charge_a)) * int(BATT_VOLTAGE_V)
)
return tp_charge_w, target_soc, True
if deye_mode == "SELL":
return tp_discharge_w, tou_reserve, False
tou_soc = _deye_passive_tou_battery_soc_pct(inv, setpoints)
return tp_discharge_w, tou_soc, False