Files
ems/backend/tests/test_planning_dispatch_milp.py

264 lines
8.0 KiB
Python

"""MILP dispatch: dvouúrovňové SoC a záporná nákupní cena (bez DB)."""
from __future__ import annotations
import unittest
from datetime import datetime, timezone
from types import SimpleNamespace
from services.planning_engine import (
PlanningSlot,
_dynamic_arb_floor_wh_series,
solve_dispatch,
)
def _slot(
*,
load: int = 2000,
buy: float = 3.0,
sell: float = 3.0,
pv_a: int = 0,
pv_b: int = 0,
) -> PlanningSlot:
return PlanningSlot(
interval_start=datetime(2026, 4, 3, 12, 0, tzinfo=timezone.utc),
buy_price=buy,
sell_price=sell,
pv_a_forecast_w=pv_a,
pv_b_forecast_w=pv_b,
load_baseline_w=load,
ev1_connected=False,
ev2_connected=False,
is_predicted_price=False,
)
def _battery(
*,
uc_wh: float = 100_000.0,
min_pct: float = 10.0,
arb_pct: float = 20.0,
max_pct: float = 95.0,
) -> SimpleNamespace:
uc = uc_wh
min_wh = min_pct / 100.0 * uc
arb_wh = arb_pct / 100.0 * uc
return SimpleNamespace(
usable_capacity_wh=uc,
min_soc_wh=min_wh,
arb_floor_wh=arb_wh,
reserve_soc_wh=arb_wh,
soc_max_wh=max_pct / 100.0 * uc,
charge_efficiency=0.95,
discharge_efficiency=0.95,
degradation_cost_czk_kwh=0.15,
max_charge_power_w=10_000,
max_discharge_power_w=10_000,
)
class DynamicArbFloorTests(unittest.TestCase):
def test_more_pv_ahead_lowers_floor(self) -> None:
"""Čím víc FVE ve lookahead, tím nižší ekonomická podlaha v prvním slotu."""
min_w = 1_000.0
base_w = 2_000.0
uc = 10_000.0
s0 = _slot()
s_low_pv = replace_slot(s0, pv_a=100, pv_b=0)
s_high_pv = replace_slot(s0, pv_a=50_000, pv_b=0)
ser_low = _dynamic_arb_floor_wh_series([s_low_pv] * 40, min_w, base_w, uc)
ser_high = _dynamic_arb_floor_wh_series([s_high_pv] * 40, min_w, base_w, uc)
self.assertLess(ser_high[0], ser_low[0])
self.assertGreaterEqual(ser_low[0], min_w)
self.assertLessEqual(ser_low[0], base_w)
def replace_slot(
s: PlanningSlot,
*,
pv_a: int | None = None,
pv_b: int | None = None,
load: int | None = None,
) -> PlanningSlot:
return PlanningSlot(
interval_start=s.interval_start,
buy_price=s.buy_price,
sell_price=s.sell_price,
pv_a_forecast_w=pv_a if pv_a is not None else s.pv_a_forecast_w,
pv_b_forecast_w=pv_b if pv_b is not None else s.pv_b_forecast_w,
load_baseline_w=load if load is not None else s.load_baseline_w,
ev1_connected=s.ev1_connected,
ev2_connected=s.ev2_connected,
is_predicted_price=s.is_predicted_price,
)
class PlanningDispatchMilpTests(unittest.TestCase):
def test_two_tier_soc_solves_optimal(self) -> None:
slots = [_slot()]
battery = _battery()
hp = SimpleNamespace(
rated_heating_power_w=0,
tuv_min_temp_c=45.0,
tuv_target_temp_c=55.0,
)
grid = SimpleNamespace(max_import_power_w=15_000, max_export_power_w=15_000)
vehicles = [
SimpleNamespace(
max_charge_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
),
SimpleNamespace(
max_charge_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
),
]
soc0 = 0.15 * battery.usable_capacity_wh
results, ms = solve_dispatch(
slots,
battery,
hp,
grid,
[None, None],
vehicles,
soc0,
50.0,
tuv_delta_stats=None,
operating_mode="AUTO",
price_failsafe_active=False,
)
self.assertGreaterEqual(ms, 0)
self.assertEqual(len(results), 1)
def test_deep_discharge_allows_covering_load_only(self) -> None:
slots = [
_slot(load=3000, buy=1.0, sell=6.0, pv_a=0, pv_b=0),
_slot(load=3000, buy=1.0, sell=6.0, pv_a=0, pv_b=0),
]
battery = _battery(uc_wh=50_000.0)
hp = SimpleNamespace(
rated_heating_power_w=0,
tuv_min_temp_c=45.0,
tuv_target_temp_c=55.0,
)
grid = SimpleNamespace(max_import_power_w=20_000, max_export_power_w=20_000)
vehicles = [
SimpleNamespace(
max_charge_power_w=11_000,
battery_capacity_kwh=50.0,
default_target_soc_pct=80.0,
),
SimpleNamespace(
max_charge_power_w=11_000,
battery_capacity_kwh=50.0,
default_target_soc_pct=80.0,
),
]
soc0 = 0.12 * battery.usable_capacity_wh
results, _ms = solve_dispatch(
slots,
battery,
hp,
grid,
[None, None],
vehicles,
soc0,
50.0,
tuv_delta_stats=None,
operating_mode="AUTO",
price_failsafe_active=False,
)
self.assertEqual(len(results), 2)
def test_negative_buy_price_allows_import_for_baseline(self) -> None:
slots = [_slot(load=6000, buy=-0.5, sell=2.0)]
battery = _battery()
hp = SimpleNamespace(
rated_heating_power_w=8000,
tuv_min_temp_c=45.0,
tuv_target_temp_c=55.0,
)
grid = SimpleNamespace(max_import_power_w=25_000, max_export_power_w=15_000)
vehicles = [
SimpleNamespace(
max_charge_power_w=11_000,
battery_capacity_kwh=50.0,
default_target_soc_pct=80.0,
),
SimpleNamespace(
max_charge_power_w=11_000,
battery_capacity_kwh=50.0,
default_target_soc_pct=80.0,
),
]
soc0 = 0.5 * battery.usable_capacity_wh
results, _ms = solve_dispatch(
slots,
battery,
hp,
grid,
[None, None],
vehicles,
soc0,
50.0,
tuv_delta_stats=None,
operating_mode="AUTO",
price_failsafe_active=False,
)
self.assertGreaterEqual(results[0].grid_setpoint_w, 0)
def test_export_implies_end_soc_at_least_reserve(self) -> None:
"""Při ge >= 1 W musí koncové soc[t] >= arb_base_wh (rezerva z DB)."""
slots = [
_slot(load=500, buy=2.0, sell=8.0, pv_a=0, pv_b=0),
_slot(load=500, buy=2.0, sell=8.0, pv_a=0, pv_b=0),
]
battery = _battery(uc_wh=100_000.0, min_pct=10.0, arb_pct=20.0)
hp = SimpleNamespace(
rated_heating_power_w=0,
tuv_min_temp_c=45.0,
tuv_target_temp_c=55.0,
)
grid = SimpleNamespace(max_import_power_w=50_000, max_export_power_w=50_000)
vehicles = [
SimpleNamespace(
max_charge_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
),
SimpleNamespace(
max_charge_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
),
]
soc0 = 0.22 * battery.usable_capacity_wh
results, _ms = solve_dispatch(
slots,
battery,
hp,
grid,
[None, None],
vehicles,
soc0,
50.0,
tuv_delta_stats=None,
operating_mode="AUTO",
price_failsafe_active=False,
)
reserve_pct = 20.0
for r in results:
if r.grid_setpoint_w < 0:
self.assertGreaterEqual(
r.battery_soc_target,
reserve_pct - 0.2,
msg="export slot must end at or above reserve SoC",
)
if __name__ == "__main__":
unittest.main()