Add export plan guard to block Deye export against plan.
Some checks failed
CI and deploy / migration-check (push) Failing after 39s
CI and deploy / deploy (push) Has been skipped

Force PASSIVE/no-export when sell is negative or export_mode is NONE,
and alert NEG_SELL_EXPORT in plan_actual_slot_guard when export still occurs.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Dusan Vojacek
2026-05-29 00:14:52 +02:00
parent 620a557a89
commit a7dff75e58
8 changed files with 230 additions and 4 deletions

View File

@@ -59,6 +59,7 @@ from services.control.repository import (
)
from services.control.setpoints import (
_DictRecord,
_apply_export_plan_guard,
_apply_price_failsafe_guard,
_build_setpoints,
_clamp_deye_tou_soc_pct,

View File

@@ -19,7 +19,11 @@ from services.control.repository import (
_fetch_plan_row_for_slot_offset,
_load_inverter_config,
)
from services.control.setpoints import _apply_price_failsafe_guard, _build_setpoints
from services.control.setpoints import (
_apply_export_plan_guard,
_apply_price_failsafe_guard,
_build_setpoints,
)
from services.signal_service import enqueue_site_signals
logger = logging.getLogger(__name__)
@@ -94,8 +98,10 @@ async def export_setpoints(site_id: int, db: asyncpg.Connection) -> None:
)
sp_next = sp_now
else:
sp_now = _apply_export_plan_guard(site_id, mode, pi_now, sp_now)
sp_now = _apply_price_failsafe_guard(site_id, mode, pi_now, sp_now)
if sp_next is not None:
sp_next = _apply_export_plan_guard(site_id, mode, pi_next, sp_next)
sp_next = _apply_price_failsafe_guard(site_id, mode, pi_next, sp_next)
planning_run_id = await db.fetchval(

View File

@@ -223,6 +223,71 @@ def _build_setpoints(
return None
def _passive_no_export_guard(sp: ControlSetpoints) -> ControlSetpoints:
"""PASSIVE, žádný vývoz do sítě; vybíjení baterie do sítě vynulováno (reg 109 přes export_ban)."""
bat = int(sp.battery_w or 0)
if bat < 0:
bat = 0
return ControlSetpoints(
battery_w=bat,
grid_export_limit=0,
ev1_current_a=sp.ev1_current_a,
ev2_current_a=sp.ev2_current_a,
heat_pump_enable=sp.heat_pump_enable,
grid_setpoint_w=max(0, int(sp.grid_setpoint_w or 0)),
ev1_power_w=sp.ev1_power_w,
ev2_power_w=sp.ev2_power_w,
target_soc_pct=sp.target_soc_pct,
deye_physical_mode="PASSIVE",
export_mode="NONE",
export_ban=True,
deye_gen_cutoff_enabled=sp.deye_gen_cutoff_enabled,
effective_sell_price_czk_kwh=sp.effective_sell_price_czk_kwh,
pv_a_allowed_w=sp.pv_a_allowed_w,
lock_battery=sp.lock_battery,
self_sustain_local_use=sp.self_sustain_local_use,
)
def _apply_export_plan_guard(
site_id: int,
mode: OperatingModeInfo,
pi: Any | None,
sp: ControlSetpoints,
) -> ControlSetpoints:
"""
Exekuční pojistka: plán zakazuje vývoz (záporná vykupní nebo export_mode NONE),
ale Deye může zůstat v SELL — vynutit PASSIVE a export_ban před zápisem Modbus.
"""
if mode.mode_code != "AUTO" or pi is None:
return sp
sell_raw = pi.get("effective_sell_price")
sell_f: float | None = (
float(sell_raw) if sell_raw is not None else sp.effective_sell_price_czk_kwh
)
export_mode_raw = pi.get("export_mode")
export_mode = (
str(export_mode_raw).strip().upper()
if export_mode_raw is not None
else (sp.export_mode or "")
)
grid_sp = int(pi.get("grid_setpoint_w") or sp.grid_setpoint_w or 0)
neg_sell = sell_f is not None and float(sell_f) < 0
plan_no_export = export_mode == "NONE" and grid_sp >= 0
if not neg_sell and not plan_no_export:
return sp
reason = "neg_sell" if neg_sell else "export_mode_none"
logger.warning(
"control export site=%s: AUTO export plan guard (%s) -> PASSIVE no-export",
site_id,
reason,
)
return _passive_no_export_guard(sp)
def _apply_price_failsafe_guard(
site_id: int,
mode: OperatingModeInfo,

View File

@@ -0,0 +1,106 @@
"""Exekuční pojistka exportu podle plánu (Plan 3)."""
from __future__ import annotations
import unittest
from services.control.exporter_monolith import (
ControlSetpoints,
_apply_export_plan_guard,
get_deye_mode,
)
from services.control.models import OperatingModeInfo
from services.control.setpoints import _DictRecord
def _auto_mode() -> OperatingModeInfo:
return OperatingModeInfo(
mode_code="AUTO",
battery_mode="AUTO",
grid_mode="AUTO",
ev_enabled=True,
heat_pump_enabled_def=True,
loxone_mode_value=0,
)
def _sp(**kwargs: object) -> ControlSetpoints:
base = dict(
battery_w=0,
grid_export_limit=8000,
ev1_current_a=0,
ev2_current_a=0,
heat_pump_enable=False,
grid_setpoint_w=-8000,
ev1_power_w=0,
ev2_power_w=0,
target_soc_pct=50,
deye_physical_mode="SELL",
export_mode="BATTERY_SELL",
export_ban=False,
)
base.update(kwargs)
return ControlSetpoints(**base) # type: ignore[arg-type]
class ExportPlanGuardTests(unittest.TestCase):
def test_neg_sell_forces_passive_no_export(self) -> None:
sp = _sp()
pi = _DictRecord(
{
"grid_setpoint_w": -8000,
"effective_sell_price": -0.5,
"export_mode": "NONE",
}
)
out = _apply_export_plan_guard(1, _auto_mode(), pi, sp)
self.assertEqual(get_deye_mode(out), "PASSIVE")
self.assertTrue(out.export_ban)
self.assertEqual(out.grid_export_limit, 0)
self.assertGreaterEqual(out.grid_setpoint_w, 0)
self.assertEqual(out.export_mode, "NONE")
def test_export_mode_none_with_non_negative_grid(self) -> None:
sp = _sp(grid_setpoint_w=0, battery_w=-5000, export_mode="BATTERY_SELL")
pi = _DictRecord(
{
"grid_setpoint_w": 0,
"effective_sell_price": 2.5,
"export_mode": "NONE",
}
)
out = _apply_export_plan_guard(1, _auto_mode(), pi, sp)
self.assertEqual(get_deye_mode(out), "PASSIVE")
self.assertEqual(out.battery_w, 0)
self.assertTrue(out.export_ban)
def test_profitable_export_unchanged(self) -> None:
sp = _sp()
pi = _DictRecord(
{
"grid_setpoint_w": -8000,
"effective_sell_price": 9.5,
"export_mode": "BATTERY_SELL",
}
)
out = _apply_export_plan_guard(1, _auto_mode(), pi, sp)
self.assertIs(out, sp)
self.assertEqual(get_deye_mode(out), "SELL")
def test_non_auto_mode_skipped(self) -> None:
sp = _sp()
pi = _DictRecord({"effective_sell_price": -1.0, "export_mode": "NONE"})
mode = OperatingModeInfo(
mode_code="SELF_SUSTAIN",
battery_mode="PASSIVE",
grid_mode="PASSIVE",
ev_enabled=False,
heat_pump_enabled_def=False,
loxone_mode_value=1,
)
out = _apply_export_plan_guard(1, mode, pi, sp)
self.assertIs(out, sp)
if __name__ == "__main__":
unittest.main()