Files
ems/backend/tests/test_planning_dispatch_milp.py
Dusan Vojacek 7490ac3d70
Some checks failed
CI and deploy / migration-check (push) Failing after 20s
CI and deploy / deploy (push) Has been skipped
planner v2 vc. porovnani
2026-05-15 23:03:32 +02:00

1088 lines
38 KiB
Python

"""MILP dispatch: dvouúrovňové SoC a záporná nákupní cena (bez DB)."""
from __future__ import annotations
import unittest
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from services.planning_engine import (
DispatchResult,
PlanningSlot,
_dynamic_arb_floor_wh_series,
_dispatch_result_comparison,
_prewindow_deferral_slots,
_slots_until_buy_le_threshold,
_slots_until_sell_lt,
_soc_panel_min_wh_series,
solve_dispatch,
)
def _slot(
*,
load: int = 2000,
buy: float = 3.0,
sell: float = 3.0,
pv_a: int = 0,
pv_b: int = 0,
) -> PlanningSlot:
return PlanningSlot(
interval_start=datetime(2026, 4, 3, 12, 0, tzinfo=timezone.utc),
buy_price=buy,
sell_price=sell,
pv_a_forecast_w=pv_a,
pv_b_forecast_w=pv_b,
load_baseline_w=load,
ev1_connected=False,
ev2_connected=False,
is_predicted_price=False,
)
def _battery(
*,
uc_wh: float = 100_000.0,
min_pct: float = 10.0,
arb_pct: float = 20.0,
max_pct: float = 95.0,
terminal_soc_value_factor: float = 0.9,
) -> SimpleNamespace:
uc = uc_wh
min_wh = min_pct / 100.0 * uc
arb_wh = arb_pct / 100.0 * uc
return SimpleNamespace(
usable_capacity_wh=uc,
min_soc_wh=min_wh,
arb_floor_wh=arb_wh,
reserve_soc_wh=arb_wh,
soc_max_wh=max_pct / 100.0 * uc,
charge_efficiency=0.95,
discharge_efficiency=0.95,
degradation_cost_czk_kwh=0.15,
max_charge_power_w=10_000,
max_discharge_power_w=10_000,
planner_terminal_soc_value_factor=terminal_soc_value_factor,
)
class SlotsUntilSellNegativeTests(unittest.TestCase):
def test_slots_until_first_negative_sell(self) -> None:
base = datetime(2026, 4, 3, 0, 0, tzinfo=timezone.utc)
slots: list[PlanningSlot] = []
for i in range(10):
slots.append(
PlanningSlot(
interval_start=base + timedelta(minutes=15 * i),
buy_price=1.0,
sell_price=2.0 if i < 4 else -0.5,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=500,
ev1_connected=False,
ev2_connected=False,
)
)
dist = _slots_until_sell_lt(slots, 0.0)
self.assertEqual(dist[0], 4)
self.assertEqual(dist[3], 1)
self.assertEqual(dist[4], 0)
def test_prewindow_deferral_prefers_sell_anchor(self) -> None:
"""Když existuje záporný prodej, kotva je vzdálenost k němu, ne k extrémnímu buy."""
base = datetime(2026, 4, 3, 0, 0, tzinfo=timezone.utc)
slots: list[PlanningSlot] = []
for i in range(8):
slots.append(
PlanningSlot(
interval_start=base + timedelta(minutes=15 * i),
buy_price=-50.0,
sell_price=1.0 if i < 2 else -0.1,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=500,
ev1_connected=False,
ev2_connected=False,
)
)
adv = _prewindow_deferral_slots(slots, -2.0)
self.assertEqual(adv[0], 2)
def test_prewindow_deferral_falls_back_to_buy_when_no_negative_sell(self) -> None:
base = datetime(2026, 4, 3, 0, 0, tzinfo=timezone.utc)
slots: list[PlanningSlot] = []
for i in range(10):
slots.append(
PlanningSlot(
interval_start=base + timedelta(minutes=15 * i),
buy_price=3.0 if i < 7 else -10.0,
sell_price=2.0,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=500,
ev1_connected=False,
ev2_connected=False,
)
)
adv = _prewindow_deferral_slots(slots, -2.0)
self.assertEqual(adv[0], 7)
class SlotsUntilBuyExtremeTests(unittest.TestCase):
def test_slots_until_first_extreme(self) -> None:
base = datetime(2026, 4, 3, 0, 0, tzinfo=timezone.utc)
slots: list[PlanningSlot] = []
for i in range(10):
slots.append(
PlanningSlot(
interval_start=base + timedelta(minutes=15 * i),
buy_price=1.0,
sell_price=1.0,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=500,
ev1_connected=False,
ev2_connected=False,
)
)
slots[-1] = PlanningSlot(
interval_start=slots[-1].interval_start,
buy_price=-10.0,
sell_price=0.0,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=500,
ev1_connected=False,
ev2_connected=False,
)
dist = _slots_until_buy_le_threshold(slots, -2.0)
self.assertEqual(dist[0], 9)
self.assertEqual(dist[8], 1)
self.assertEqual(dist[9], 0)
def test_prewindow_clamps_relaxed_floor_until_close(self) -> None:
sm = [5000.0] * 10
dist = [9, 8, 7, 6, 5, 4, 3, 2, 1, 0] # obecná kotva (sell nebo buy)
panel = _soc_panel_min_wh_series(sm, dist, 10_000.0, 20_000.0, 2)
self.assertEqual(panel[0], 20_000.0)
self.assertEqual(panel[6], 20_000.0)
self.assertEqual(panel[7], 5000.0)
self.assertEqual(panel[9], 5000.0)
class DynamicArbFloorTests(unittest.TestCase):
def test_more_pv_ahead_lowers_floor(self) -> None:
"""Čím víc FVE ve lookahead, tím nižší ekonomická podlaha v prvním slotu."""
min_w = 1_000.0
base_w = 2_000.0
uc = 10_000.0
s0 = _slot()
s_low_pv = replace_slot(s0, pv_a=100, pv_b=0)
s_high_pv = replace_slot(s0, pv_a=50_000, pv_b=0)
ser_low = _dynamic_arb_floor_wh_series([s_low_pv] * 40, min_w, base_w, uc)
ser_high = _dynamic_arb_floor_wh_series([s_high_pv] * 40, min_w, base_w, uc)
self.assertLess(ser_high[0], ser_low[0])
self.assertGreaterEqual(ser_low[0], min_w)
self.assertLessEqual(ser_low[0], base_w)
def replace_slot(
s: PlanningSlot,
*,
pv_a: int | None = None,
pv_b: int | None = None,
load: int | None = None,
) -> PlanningSlot:
return PlanningSlot(
interval_start=s.interval_start,
buy_price=s.buy_price,
sell_price=s.sell_price,
pv_a_forecast_w=pv_a if pv_a is not None else s.pv_a_forecast_w,
pv_b_forecast_w=pv_b if pv_b is not None else s.pv_b_forecast_w,
load_baseline_w=load if load is not None else s.load_baseline_w,
ev1_connected=s.ev1_connected,
ev2_connected=s.ev2_connected,
is_predicted_price=s.is_predicted_price,
)
class PlanningDispatchMilpTests(unittest.TestCase):
def test_dispatch_result_comparison_marks_changed_slots(self) -> None:
dt = datetime(2026, 4, 3, 12, 0, tzinfo=timezone.utc)
active = [
DispatchResult(
interval_start=dt,
battery_setpoint_w=1000,
battery_soc_target=50.0,
grid_setpoint_w=0,
export_limit_w=0,
export_mode="NONE",
deye_physical_mode="PASSIVE",
deye_gen_cutoff_enabled=False,
ev1_setpoint_w=None,
ev2_setpoint_w=None,
ev1_via_bat_w=0,
ev2_via_bat_w=0,
heat_pump_enabled=False,
heat_pump_setpoint_w=0,
pv_a_curtailed_w=0,
expected_cost_czk=1.0,
effective_buy_price=1.0,
effective_sell_price=1.0,
is_predicted_price=False,
)
]
peer = [
DispatchResult(
interval_start=dt,
battery_setpoint_w=2000,
battery_soc_target=55.0,
grid_setpoint_w=-1000,
export_limit_w=1000,
export_mode="PV_SURPLUS",
deye_physical_mode="SELL",
deye_gen_cutoff_enabled=True,
ev1_setpoint_w=None,
ev2_setpoint_w=None,
ev1_via_bat_w=0,
ev2_via_bat_w=0,
heat_pump_enabled=False,
heat_pump_setpoint_w=0,
pv_a_curtailed_w=200,
expected_cost_czk=2.0,
effective_buy_price=1.0,
effective_sell_price=1.0,
is_predicted_price=False,
)
]
cmp = _dispatch_result_comparison(active, 10, "v1", peer, 12, "v2")
self.assertEqual(cmp["active"]["planner_version"], "v1")
self.assertEqual(cmp["peer"]["planner_version"], "v2")
self.assertEqual(cmp["diff"]["changed_slots"], 1)
self.assertEqual(len(cmp["slot_diffs"]), 1)
def test_planner_version_is_recorded_in_snapshot(self) -> None:
slots = [_slot(load=500, buy=1.0, sell=1.0, pv_a=0, pv_b=0) for _ in range(2)]
battery = _battery()
hp = SimpleNamespace(rated_heating_power_w=0, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0)
grid = SimpleNamespace(max_import_power_w=20_000, max_export_power_w=20_000)
vehicles = [
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
]
results, _ms, snap = solve_dispatch(
slots,
battery,
hp,
grid,
[],
vehicles,
current_soc_wh=0.5 * battery.usable_capacity_wh,
current_tuv_temp_c=50.0,
planner_version="v2",
)
self.assertEqual(len(results), 2)
self.assertEqual(snap["inputs"]["planner_version"], "v2")
def test_neg_sell_with_future_neg_buy_prefers_curtail_pv_a_over_export(self) -> None:
"""
Když:
- aktuální slot má sell < 0 (export je náklad),
- v horizontu existuje budoucí buy < 0,
- a zároveň existuje PV B (necurtailable) někde v horizontu,
solver preferuje curtail PV A (ca) místo placeného exportu ge.
"""
slots = [
_slot(load=0, buy=3.0, sell=-0.1, pv_a=5000, pv_b=0),
_slot(load=0, buy=-10.0, sell=1.0, pv_a=0, pv_b=5000),
]
battery = _battery(uc_wh=50_000.0)
hp = SimpleNamespace(
rated_heating_power_w=0,
tuv_min_temp_c=45.0,
tuv_target_temp_c=55.0,
)
grid = SimpleNamespace(max_import_power_w=20_000, max_export_power_w=20_000)
vehicles = [
SimpleNamespace(
max_charge_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
),
SimpleNamespace(
max_charge_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
),
]
soc0 = 0.50 * battery.usable_capacity_wh
results, _ms, _ = solve_dispatch(
slots,
battery,
hp,
grid,
[None, None],
vehicles,
soc0,
50.0,
tuv_delta_stats=None,
operating_mode="AUTO",
)
self.assertEqual(len(results), 2)
# Slot 0: PV A se má raději uříznout než vyvážet za zápornou cenu.
self.assertEqual(int(results[0].pv_a_curtailed_w), 5000)
self.assertGreaterEqual(int(results[0].grid_setpoint_w), 0)
def test_pv_surplus_export_uses_hard_export_cap(self) -> None:
slots = [
_slot(load=0, buy=3.0, sell=2.5, pv_a=20_000, pv_b=0),
]
battery = _battery()
hp = SimpleNamespace(
rated_heating_power_w=0,
tuv_min_temp_c=45.0,
tuv_target_temp_c=55.0,
)
grid = SimpleNamespace(max_import_power_w=20_000, max_export_power_w=13_500)
vehicles = [
SimpleNamespace(
max_charge_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
),
SimpleNamespace(
max_charge_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
),
]
soc0 = battery.soc_max_wh
results, _ms, _ = solve_dispatch(
slots,
battery,
hp,
grid,
[None, None],
vehicles,
soc0,
50.0,
tuv_delta_stats=None,
operating_mode="AUTO",
)
self.assertEqual(len(results), 1)
self.assertEqual(results[0].export_mode, "PV_SURPLUS")
self.assertEqual(results[0].export_limit_w, 13_500)
self.assertGreater(results[0].pv_a_curtailed_w, 0)
def test_two_tier_soc_solves_optimal(self) -> None:
slots = [_slot()]
battery = _battery()
hp = SimpleNamespace(
rated_heating_power_w=0,
tuv_min_temp_c=45.0,
tuv_target_temp_c=55.0,
)
grid = SimpleNamespace(max_import_power_w=15_000, max_export_power_w=15_000)
vehicles = [
SimpleNamespace(
max_charge_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
),
SimpleNamespace(
max_charge_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
),
]
soc0 = 0.15 * battery.usable_capacity_wh
results, ms, _ = solve_dispatch(
slots,
battery,
hp,
grid,
[None, None],
vehicles,
soc0,
50.0,
tuv_delta_stats=None,
operating_mode="AUTO",
)
self.assertGreaterEqual(ms, 0)
self.assertEqual(len(results), 1)
def test_deep_discharge_allows_covering_load_only(self) -> None:
slots = [
_slot(load=3000, buy=1.0, sell=6.0, pv_a=0, pv_b=0),
_slot(load=3000, buy=1.0, sell=6.0, pv_a=0, pv_b=0),
]
battery = _battery(uc_wh=50_000.0)
hp = SimpleNamespace(
rated_heating_power_w=0,
tuv_min_temp_c=45.0,
tuv_target_temp_c=55.0,
)
grid = SimpleNamespace(max_import_power_w=20_000, max_export_power_w=20_000)
vehicles = [
SimpleNamespace(
max_charge_power_w=11_000,
battery_capacity_kwh=50.0,
default_target_soc_pct=80.0,
),
SimpleNamespace(
max_charge_power_w=11_000,
battery_capacity_kwh=50.0,
default_target_soc_pct=80.0,
),
]
soc0 = 0.12 * battery.usable_capacity_wh
results, _ms, _ = solve_dispatch(
slots,
battery,
hp,
grid,
[None, None],
vehicles,
soc0,
50.0,
tuv_delta_stats=None,
operating_mode="AUTO",
)
self.assertEqual(len(results), 2)
def test_negative_buy_price_allows_import_for_baseline(self) -> None:
slots = [_slot(load=6000, buy=-0.5, sell=2.0)]
battery = _battery()
hp = SimpleNamespace(
rated_heating_power_w=8000,
tuv_min_temp_c=45.0,
tuv_target_temp_c=55.0,
)
grid = SimpleNamespace(max_import_power_w=25_000, max_export_power_w=15_000)
vehicles = [
SimpleNamespace(
max_charge_power_w=11_000,
battery_capacity_kwh=50.0,
default_target_soc_pct=80.0,
),
SimpleNamespace(
max_charge_power_w=11_000,
battery_capacity_kwh=50.0,
default_target_soc_pct=80.0,
),
]
soc0 = 0.5 * battery.usable_capacity_wh
results, _ms, _ = solve_dispatch(
slots,
battery,
hp,
grid,
[None, None],
vehicles,
soc0,
50.0,
tuv_delta_stats=None,
operating_mode="AUTO",
)
self.assertGreaterEqual(results[0].grid_setpoint_w, 0)
def test_export_implies_end_soc_at_least_reserve(self) -> None:
"""Bez arbitrážní relaxace: při ge >= 1 W musí koncové soc[t] >= arb_base_wh (rezerva z DB)."""
slots = [
_slot(load=500, buy=2.0, sell=8.0, pv_a=0, pv_b=0),
_slot(load=500, buy=2.0, sell=8.0, pv_a=0, pv_b=0),
]
battery = _battery(uc_wh=100_000.0, min_pct=10.0, arb_pct=20.0)
hp = SimpleNamespace(
rated_heating_power_w=0,
tuv_min_temp_c=45.0,
tuv_target_temp_c=55.0,
)
grid = SimpleNamespace(max_import_power_w=50_000, max_export_power_w=50_000)
vehicles = [
SimpleNamespace(
max_charge_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
),
SimpleNamespace(
max_charge_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
),
]
soc0 = 0.22 * battery.usable_capacity_wh
results, _ms, _ = solve_dispatch(
slots,
battery,
hp,
grid,
[None, None],
vehicles,
soc0,
50.0,
tuv_delta_stats=None,
operating_mode="AUTO",
)
reserve_pct = 20.0
for r in results:
if r.grid_setpoint_w < 0:
self.assertGreaterEqual(
r.battery_soc_target,
reserve_pct - 0.2,
msg="export slot must end at or above reserve SoC",
)
def test_export_before_extreme_negative_buy_can_end_below_reserve(self) -> None:
"""
Při relaxovaném soc_min (záporný buy v lookahead) smí významný export skončit u planner floor,
ne u provozní rezervy — jinak nejde ráno vypustit do sítě a nachystat kapacitu před levným nákupem.
"""
base = datetime(2026, 4, 3, 6, 0, tzinfo=timezone.utc)
s0 = PlanningSlot(
interval_start=base,
buy_price=2.5,
sell_price=2.0,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=400,
ev1_connected=False,
ev2_connected=False,
is_predicted_price=False,
allow_charge=True,
allow_discharge_export=True,
)
s1 = PlanningSlot(
interval_start=base + timedelta(minutes=15),
buy_price=-12.0,
sell_price=-0.5,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=400,
ev1_connected=False,
ev2_connected=False,
is_predicted_price=False,
allow_charge=True,
allow_discharge_export=True,
)
slots = [s0, s1]
battery = _battery(uc_wh=10_000.0, min_pct=10.0, arb_pct=20.0)
battery.planner_extreme_buy_threshold_czk_kwh = -2.0
battery.planner_discharge_floor_percent = 5.0
battery.max_charge_power_w = 50_000
battery.max_discharge_power_w = 50_000
hp = SimpleNamespace(
rated_heating_power_w=0,
tuv_min_temp_c=45.0,
tuv_target_temp_c=55.0,
)
grid = SimpleNamespace(max_import_power_w=50_000, max_export_power_w=50_000)
vehicles = [
SimpleNamespace(
max_charge_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
),
SimpleNamespace(
max_charge_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
),
]
soc0 = 0.88 * battery.usable_capacity_wh
results, _ms, _ = solve_dispatch(
slots,
battery,
hp,
grid,
[None, None],
vehicles,
soc0,
50.0,
tuv_delta_stats=None,
operating_mode="AUTO",
)
self.assertEqual(len(results), 2)
if results[0].grid_setpoint_w < 0:
self.assertLess(
results[0].battery_soc_target,
19.0,
msg="with relaxed soc_min, first-slot export should be able to finish below reserve %",
)
def test_negative_sell_forbids_battery_export_arbitrage(self) -> None:
"""
Pokud sell < 0, solver nesmí vybíjet baterii do sítě pro arbitráž (dump musí proběhnout předtím).
V okně sell<0 smí export vzniknout jen z přebytku FVE; zde ale FVE=0, takže očekáváme grid_setpoint>=0.
"""
base = datetime(2026, 4, 3, 6, 0, tzinfo=timezone.utc)
s0 = PlanningSlot(
interval_start=base,
buy_price=2.0,
sell_price=2.0,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=0,
ev1_connected=False,
ev2_connected=False,
is_predicted_price=False,
allow_charge=True,
allow_discharge_export=True,
)
s1 = PlanningSlot(
interval_start=base + timedelta(minutes=15),
buy_price=2.0,
sell_price=-0.5,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=0,
ev1_connected=False,
ev2_connected=False,
is_predicted_price=False,
allow_charge=True,
allow_discharge_export=True,
)
s2 = PlanningSlot(
interval_start=base + timedelta(minutes=30),
buy_price=-15.0,
sell_price=-1.0,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=0,
ev1_connected=False,
ev2_connected=False,
is_predicted_price=False,
allow_charge=True,
allow_discharge_export=True,
)
slots = [s0, s1, s2]
battery = _battery(uc_wh=10_000.0, min_pct=10.0, arb_pct=20.0)
battery.planner_extreme_buy_threshold_czk_kwh = -2.0
battery.planner_discharge_floor_percent = 5.0
battery.max_charge_power_w = 50_000
battery.max_discharge_power_w = 50_000
hp = SimpleNamespace(
rated_heating_power_w=0,
tuv_min_temp_c=45.0,
tuv_target_temp_c=55.0,
)
grid = SimpleNamespace(max_import_power_w=50_000, max_export_power_w=50_000)
vehicles = [
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
]
soc0 = 0.9 * battery.usable_capacity_wh
results, _ms, _ = solve_dispatch(
slots,
battery,
hp,
grid,
[None, None],
vehicles,
soc0,
50.0,
tuv_delta_stats=None,
operating_mode="AUTO",
)
self.assertEqual(len(results), 3)
# V sell<0 slotu bez FVE a bez zátěže nesmí být export (to by muselo být z baterie).
self.assertGreaterEqual(results[1].grid_setpoint_w, 0)
# A zároveň nesmí být baterie ve výboji (dump musí proběhnout předtím).
self.assertGreaterEqual(results[1].battery_setpoint_w, 0)
def test_anchor_hits_floor_before_first_negative_sell(self) -> None:
"""
Pokud se v horizontu objeví první sell<0 a současně existuje planner floor (relaxace),
solver má skončit už v předchozím slotu u planner floor (cca 5 %), ne na ~15 %.
"""
base = datetime(2026, 4, 3, 6, 0, tzinfo=timezone.utc)
# Slot 0-1: sell >= 0; slot 2: první sell < 0; slot 3: extrémně záporný buy (motivace k bufferu).
slots = [
PlanningSlot(
interval_start=base,
buy_price=3.0,
sell_price=1.0,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=0,
ev1_connected=False,
ev2_connected=False,
allow_charge=True,
allow_discharge_export=True,
),
PlanningSlot(
interval_start=base + timedelta(minutes=15),
buy_price=3.0,
sell_price=0.5,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=0,
ev1_connected=False,
ev2_connected=False,
allow_charge=True,
allow_discharge_export=True,
),
PlanningSlot(
interval_start=base + timedelta(minutes=30),
buy_price=3.0,
sell_price=-0.2,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=0,
ev1_connected=False,
ev2_connected=False,
allow_charge=True,
allow_discharge_export=True,
),
PlanningSlot(
interval_start=base + timedelta(minutes=45),
buy_price=-20.0,
sell_price=-1.0,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=0,
ev1_connected=False,
ev2_connected=False,
allow_charge=True,
allow_discharge_export=True,
),
]
battery = _battery(uc_wh=20_000.0, min_pct=10.0, arb_pct=20.0)
battery.planner_extreme_buy_threshold_czk_kwh = -2.0
battery.planner_discharge_floor_percent = 5.0
battery.max_charge_power_w = 50_000
battery.max_discharge_power_w = 50_000
hp = SimpleNamespace(rated_heating_power_w=0, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0)
grid = SimpleNamespace(max_import_power_w=50_000, max_export_power_w=50_000)
vehicles = [
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
]
soc0 = 0.9 * battery.usable_capacity_wh
results, _ms, _ = solve_dispatch(
slots,
battery,
hp,
grid,
[None, None],
vehicles,
soc0,
50.0,
tuv_delta_stats=None,
operating_mode="AUTO",
)
# Slot index 1 je poslední před prvním sell<0 (index 2).
self.assertLessEqual(
results[1].battery_soc_target,
6.0,
msg="anchor should drive SoC close to planner floor before first negative sell",
)
def test_anchor_uses_planner_floor_even_without_extreme_buy(self) -> None:
"""
Regrese: pokud v horizontu není buy <= threshold (soc_min_series by se nerelaxovala),
kotva před sell<0 má stejně mířit na planner floor (5 %), ne na base min SoC.
"""
base = datetime(2026, 4, 3, 6, 0, tzinfo=timezone.utc)
slots = [
PlanningSlot(
interval_start=base,
buy_price=3.0,
sell_price=1.0,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=0,
ev1_connected=False,
ev2_connected=False,
allow_charge=True,
allow_discharge_export=True,
),
PlanningSlot(
interval_start=base + timedelta(minutes=15),
buy_price=3.0,
sell_price=0.5,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=0,
ev1_connected=False,
ev2_connected=False,
allow_charge=True,
allow_discharge_export=True,
),
PlanningSlot(
interval_start=base + timedelta(minutes=30),
buy_price=3.0,
sell_price=-0.2,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=0,
ev1_connected=False,
ev2_connected=False,
allow_charge=True,
allow_discharge_export=True,
),
]
battery = _battery(uc_wh=20_000.0, min_pct=12.0, arb_pct=20.0)
battery.planner_extreme_buy_threshold_czk_kwh = -2.0
battery.planner_discharge_floor_percent = 5.0
battery.max_charge_power_w = 50_000
battery.max_discharge_power_w = 50_000
hp = SimpleNamespace(rated_heating_power_w=0, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0)
grid = SimpleNamespace(max_import_power_w=50_000, max_export_power_w=50_000)
vehicles = [
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
]
soc0 = 0.9 * battery.usable_capacity_wh
results, _ms, _ = solve_dispatch(
slots,
battery,
hp,
grid,
[None, None],
vehicles,
soc0,
50.0,
tuv_delta_stats=None,
operating_mode="AUTO",
)
# Slot index 1 je poslední před prvním sell<0 (index 2).
self.assertLessEqual(results[1].battery_soc_target, 6.0)
def test_grid_import_soft_cap_penalizes_breaker_overdraw(self) -> None:
"""
Soft cap: solver může nominálně překročit breaker, ale jen pokud se to vyplatí.
Při běžné (nezáporné) nákupní ceně by měl držet import <= breaker.
"""
slots = [_slot(load=3700, buy=0.4, sell=-0.3, pv_a=0, pv_b=1500)]
battery = _battery(uc_wh=64_000.0, min_pct=12.0, arb_pct=20.0)
battery.max_charge_power_w = 18_000
battery.max_discharge_power_w = 18_000
hp = SimpleNamespace(
rated_heating_power_w=0,
tuv_min_temp_c=45.0,
tuv_target_temp_c=55.0,
)
grid = SimpleNamespace(max_import_power_w=17_000, max_export_power_w=13_500)
vehicles = [
SimpleNamespace(
max_charge_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
),
SimpleNamespace(
max_charge_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
),
]
soc0 = 0.55 * battery.usable_capacity_wh
results, _ms, _ = solve_dispatch(
slots,
battery,
hp,
grid,
[None, None],
vehicles,
soc0,
50.0,
tuv_delta_stats=None,
operating_mode="AUTO",
)
self.assertEqual(len(results), 1)
self.assertLessEqual(
results[0].grid_setpoint_w,
grid.max_import_power_w,
msg="soft cap: for normal buy price, planned grid import should not exceed breaker",
)
def test_grid_import_soft_cap_allows_overdraw_when_extremely_negative(self) -> None:
"""
Regrese: při extrémně záporné nákupní ceně může solver překročit breaker (za cenu penalizace),
aby stihl krátké okno nabíjení. Překročení nesmí být 'zadarmo' (kontrolujeme alespoň, že existuje).
"""
# Dvouslotový scénář: v 1. slotu extrémně záporná cena, ve 2. slotu drahá.
# Terminal SoC kotva pak nepenalizuje držení energie (průměrná buy je ~0) a solver má motivaci
# v 1. slotu nabít na max, i kdyby to znamenalo malé překročení breakeru.
s0 = _slot(load=0, buy=-20.0, sell=-0.3, pv_a=0, pv_b=0)
s1 = replace_slot(s0, load=0)
s1 = PlanningSlot(
interval_start=s0.interval_start + timedelta(minutes=15),
buy_price=20.0,
sell_price=-0.3,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=0,
ev1_connected=False,
ev2_connected=False,
is_predicted_price=False,
)
slots = [s0, s1]
battery = _battery(uc_wh=64_000.0, min_pct=12.0, arb_pct=20.0)
battery.max_charge_power_w = 18_000
battery.max_discharge_power_w = 18_000
hp = SimpleNamespace(
rated_heating_power_w=0,
tuv_min_temp_c=45.0,
tuv_target_temp_c=55.0,
)
grid = SimpleNamespace(max_import_power_w=17_000, max_export_power_w=13_500)
vehicles = [
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
]
soc0 = 0.55 * battery.usable_capacity_wh
results, _ms, _ = solve_dispatch(
slots,
battery,
hp,
grid,
[None, None],
vehicles,
soc0,
50.0,
tuv_delta_stats=None,
operating_mode="AUTO",
)
self.assertEqual(len(results), 2)
self.assertGreater(
results[0].grid_setpoint_w,
grid.max_import_power_w,
msg="with very negative buy price, solver may choose to exceed breaker (soft cap)",
)
def test_block_export_on_negative_sell_no_grid_export_pv_surplus(self) -> None:
"""site_grid_connection.block_export_on_negative_sell → ge=0 při sell<0."""
slots = [
PlanningSlot(
interval_start=datetime(2026, 4, 3, 12, 0, tzinfo=timezone.utc),
buy_price=5.25,
sell_price=-0.5,
pv_a_forecast_w=7000,
pv_b_forecast_w=0,
load_baseline_w=500,
ev1_connected=False,
ev2_connected=False,
is_predicted_price=False,
allow_charge=True,
allow_discharge_export=False,
)
]
battery = _battery(uc_wh=20_000.0, arb_pct=15.0, max_pct=95.0)
hp = SimpleNamespace(
rated_heating_power_w=0,
tuv_min_temp_c=45.0,
tuv_target_temp_c=55.0,
)
grid = SimpleNamespace(
max_import_power_w=17_000,
max_export_power_w=8000,
block_export_on_negative_sell=True,
)
vehicles = [
SimpleNamespace(
max_charge_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
),
SimpleNamespace(
max_charge_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
),
]
soc0 = 0.34 * battery.usable_capacity_wh
results, _ms, _ = solve_dispatch(
slots,
battery,
hp,
grid,
[None, None],
vehicles,
soc0,
50.0,
tuv_delta_stats=None,
operating_mode="AUTO",
)
self.assertEqual(len(results), 1)
self.assertGreaterEqual(results[0].grid_setpoint_w, 0, "no grid export")
self.assertGreater(results[0].battery_setpoint_w, 0, "surplus PV should charge")
class TerminalSocShadowTests(unittest.TestCase):
"""Terminal SoC shadow price v objective drží konec horizontu nad holým minimem."""
def test_terminal_soc_shadow_price_prevents_drain(self) -> None:
base = datetime(2026, 4, 3, 12, 0, tzinfo=timezone.utc)
slots = []
for i in range(3):
slots.append(
PlanningSlot(
interval_start=base + timedelta(minutes=15 * i),
buy_price=2.0,
sell_price=0.6,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=600,
ev1_connected=False,
ev2_connected=False,
is_predicted_price=False,
)
)
slots.append(
PlanningSlot(
interval_start=base + timedelta(minutes=45),
buy_price=2.0,
sell_price=14.0,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=600,
ev1_connected=False,
ev2_connected=False,
is_predicted_price=False,
)
)
battery = _battery(uc_wh=12_000.0, min_pct=12.0, arb_pct=20.0)
hp = SimpleNamespace(
rated_heating_power_w=0,
tuv_min_temp_c=45.0,
tuv_target_temp_c=55.0,
)
grid = SimpleNamespace(max_import_power_w=20_000, max_export_power_w=20_000)
vehicles = [
SimpleNamespace(
max_charge_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
),
SimpleNamespace(
max_charge_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
),
]
soc0 = 0.5 * battery.usable_capacity_wh
results, _ms, _ = solve_dispatch(
slots,
battery,
hp,
grid,
[None, None],
vehicles,
soc0,
50.0,
tuv_delta_stats=None,
operating_mode="AUTO",
)
self.assertEqual(len(results), 4)
# Bez shadow price by solver mohl končit u min SoC; kotva drží znatelnou rezervu.
self.assertGreaterEqual(
results[-1].battery_soc_target,
15.0,
msg="terminal SoC shadow price should keep end-of-horizon SoC above bare minimum",
)
if __name__ == "__main__":
unittest.main()