386 lines
12 KiB
Python
386 lines
12 KiB
Python
"""MILP dispatch: dvouúrovňové SoC a záporná nákupní cena (bez DB)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import unittest
|
|
from datetime import datetime, timedelta, timezone
|
|
from types import SimpleNamespace
|
|
|
|
from services.planning_engine import (
|
|
PlanningSlot,
|
|
_dynamic_arb_floor_wh_series,
|
|
solve_dispatch,
|
|
)
|
|
|
|
|
|
def _slot(
    *,
    load: int = 2000,
    buy: float = 3.0,
    sell: float = 3.0,
    pv_a: int = 0,
    pv_b: int = 0,
) -> PlanningSlot:
    """Build a ``PlanningSlot`` fixture pinned to a fixed UTC timestamp.

    Args:
        load: Value stored as ``load_baseline_w``.
        buy: Value stored as ``buy_price``.
        sell: Value stored as ``sell_price``.
        pv_a: Value stored as ``pv_a_forecast_w``.
        pv_b: Value stored as ``pv_b_forecast_w``.

    Returns:
        A slot starting 2026-04-03 12:00 UTC with both EV flags and the
        predicted-price flag set to ``False``.
    """
    fixed_start = datetime(2026, 4, 3, 12, 0, tzinfo=timezone.utc)
    fields = {
        "interval_start": fixed_start,
        "buy_price": buy,
        "sell_price": sell,
        "pv_a_forecast_w": pv_a,
        "pv_b_forecast_w": pv_b,
        "load_baseline_w": load,
        # Every fixture slot is a "plain" slot: no EV plugged in, real prices.
        "ev1_connected": False,
        "ev2_connected": False,
        "is_predicted_price": False,
    }
    return PlanningSlot(**fields)
|
|
|
|
|
|
def _battery(
    *,
    uc_wh: float = 100_000.0,
    min_pct: float = 10.0,
    arb_pct: float = 20.0,
    max_pct: float = 95.0,
) -> SimpleNamespace:
    """Build a duck-typed battery configuration for the dispatch solver tests.

    Args:
        uc_wh: Usable capacity, stored as ``usable_capacity_wh``.
        min_pct: Percentage of ``uc_wh`` stored as ``min_soc_wh``.
        arb_pct: Percentage of ``uc_wh`` stored as both ``arb_floor_wh`` and
            ``reserve_soc_wh``.
        max_pct: Percentage of ``uc_wh`` stored as ``soc_max_wh``.

    Returns:
        A ``SimpleNamespace`` that also carries fixed efficiencies (0.95),
        a degradation cost (0.15) and 10 kW charge/discharge power limits.
    """

    def share_of_capacity(pct: float) -> float:
        # Same operation order as ``pct / 100.0 * uc_wh`` so float results match.
        return pct / 100.0 * uc_wh

    floor_wh = share_of_capacity(arb_pct)
    config = SimpleNamespace(
        usable_capacity_wh=uc_wh,
        min_soc_wh=share_of_capacity(min_pct),
        # The arbitrage floor and the reserve are deliberately the same value.
        arb_floor_wh=floor_wh,
        reserve_soc_wh=floor_wh,
        soc_max_wh=share_of_capacity(max_pct),
    )
    config.charge_efficiency = 0.95
    config.discharge_efficiency = 0.95
    config.degradation_cost_czk_kwh = 0.15
    config.max_charge_power_w = 10_000
    config.max_discharge_power_w = 10_000
    return config
|
|
|
|
|
|
class DynamicArbFloorTests(unittest.TestCase):
    """Checks on the per-slot series returned by ``_dynamic_arb_floor_wh_series``."""

    def test_more_pv_ahead_lowers_floor(self) -> None:
        """The more PV in the lookahead, the lower the economic floor in the first slot."""
        floor_min = 1_000.0
        floor_base = 2_000.0
        capacity = 10_000.0
        template = _slot()

        def first_slot_floor(pv_a_w: int) -> float:
            # 40 identical slots that differ from the template only in PV forecast.
            horizon = [replace_slot(template, pv_a=pv_a_w, pv_b=0)] * 40
            series = _dynamic_arb_floor_wh_series(
                horizon, floor_min, floor_base, capacity
            )
            return series[0]

        floor_low_pv = first_slot_floor(100)
        floor_high_pv = first_slot_floor(50_000)

        self.assertLess(floor_high_pv, floor_low_pv)
        # The low-PV floor must stay inside the [min, base] band passed in.
        self.assertGreaterEqual(floor_low_pv, floor_min)
        self.assertLessEqual(floor_low_pv, floor_base)
|
|
|
|
|
|
def replace_slot(
    s: PlanningSlot,
    *,
    pv_a: int | None = None,
    pv_b: int | None = None,
    load: int | None = None,
) -> PlanningSlot:
    """Return a new ``PlanningSlot`` copied from ``s`` with selected overrides.

    Args:
        s: Slot to copy; it is only read, never modified.
        pv_a: Replacement for ``pv_a_forecast_w``; ``None`` keeps the value from ``s``.
        pv_b: Replacement for ``pv_b_forecast_w``; ``None`` keeps the value from ``s``.
        load: Replacement for ``load_baseline_w``; ``None`` keeps the value from ``s``.

    Returns:
        A freshly constructed slot; all other fields are carried over from ``s``.
    """
    # ``None`` means "not overridden"; an explicit 0 is a real override.
    if pv_a is None:
        pv_a = s.pv_a_forecast_w
    if pv_b is None:
        pv_b = s.pv_b_forecast_w
    if load is None:
        load = s.load_baseline_w

    return PlanningSlot(
        interval_start=s.interval_start,
        buy_price=s.buy_price,
        sell_price=s.sell_price,
        pv_a_forecast_w=pv_a,
        pv_b_forecast_w=pv_b,
        load_baseline_w=load,
        ev1_connected=s.ev1_connected,
        ev2_connected=s.ev2_connected,
        is_predicted_price=s.is_predicted_price,
    )
|
|
|
|
|
|
class PlanningDispatchMilpTests(unittest.TestCase):
    """Scenario tests that run ``solve_dispatch`` on small hand-built horizons.

    All collaborators (battery, heat pump, grid, vehicles) are duck-typed
    ``SimpleNamespace`` objects. The fixture builders and the solver call that
    every scenario shares live in the private static helpers below, so each
    test only states what is specific to it.
    """

    @staticmethod
    def _heat_pump(rated_heating_power_w: int = 0) -> SimpleNamespace:
        """Return a heat-pump config with the 45/55 hot-water limits used by every scenario."""
        return SimpleNamespace(
            rated_heating_power_w=rated_heating_power_w,
            tuv_min_temp_c=45.0,
            tuv_target_temp_c=55.0,
        )

    @staticmethod
    def _grid(max_import_power_w: int, max_export_power_w: int) -> SimpleNamespace:
        """Return a grid-connection config with the given import/export limits."""
        return SimpleNamespace(
            max_import_power_w=max_import_power_w,
            max_export_power_w=max_export_power_w,
        )

    @staticmethod
    def _vehicles(
        *,
        max_charge_power_w: int = 0,
        battery_capacity_kwh: float = 1.0,
    ) -> list[SimpleNamespace]:
        """Return two independent, identically configured vehicle configs."""
        # Built in a comprehension so the two entries are distinct objects.
        return [
            SimpleNamespace(
                max_charge_power_w=max_charge_power_w,
                battery_capacity_kwh=battery_capacity_kwh,
                default_target_soc_pct=80.0,
            )
            for _ in range(2)
        ]

    @staticmethod
    def _solve(slots, battery, hp, grid, vehicles, soc0_wh):
        """Call ``solve_dispatch`` with the arguments shared by every scenario.

        Returns whatever ``solve_dispatch`` returns; the tests unpack it as a
        ``(results, elapsed)`` pair.
        """
        # NOTE(review): ``[None, None]`` is presumably the per-vehicle session
        # state and ``50.0`` the current hot-water temperature - confirm against
        # the ``solve_dispatch`` signature.
        return solve_dispatch(
            slots,
            battery,
            hp,
            grid,
            [None, None],
            vehicles,
            soc0_wh,
            50.0,
            tuv_delta_stats=None,
            operating_mode="AUTO",
        )

    def test_two_tier_soc_solves_optimal(self) -> None:
        """A single default slot solves and yields exactly one result."""
        battery = _battery()
        results, ms = self._solve(
            [_slot()],
            battery,
            self._heat_pump(),
            self._grid(15_000, 15_000),
            self._vehicles(),
            0.15 * battery.usable_capacity_wh,
        )
        self.assertGreaterEqual(ms, 0)
        self.assertEqual(len(results), 1)

    def test_deep_discharge_allows_covering_load_only(self) -> None:
        """Two slots starting at 12 % SoC still produce one result per slot."""
        slots = [
            _slot(load=3000, buy=1.0, sell=6.0, pv_a=0, pv_b=0) for _ in range(2)
        ]
        battery = _battery(uc_wh=50_000.0)
        results, _ms = self._solve(
            slots,
            battery,
            self._heat_pump(),
            self._grid(20_000, 20_000),
            self._vehicles(max_charge_power_w=11_000, battery_capacity_kwh=50.0),
            0.12 * battery.usable_capacity_wh,
        )
        self.assertEqual(len(results), 2)

    def test_negative_buy_price_allows_import_for_baseline(self) -> None:
        """With a negative buy price the first slot's grid setpoint is non-negative."""
        battery = _battery()
        results, _ms = self._solve(
            [_slot(load=6000, buy=-0.5, sell=2.0)],
            battery,
            self._heat_pump(rated_heating_power_w=8000),
            self._grid(25_000, 15_000),
            self._vehicles(max_charge_power_w=11_000, battery_capacity_kwh=50.0),
            0.5 * battery.usable_capacity_wh,
        )
        # Guard the index below so an empty plan fails as an assertion,
        # matching the length checks in the sibling tests.
        self.assertEqual(len(results), 1)
        self.assertGreaterEqual(results[0].grid_setpoint_w, 0)

    def test_export_implies_end_soc_at_least_reserve(self) -> None:
        """When ge >= 1 W, the end-of-slot soc[t] must be >= arb_base_wh (reserve from the DB)."""
        # Single source of truth for the reserve level used by fixture and check.
        reserve_pct = 20.0
        slots = [
            _slot(load=500, buy=2.0, sell=8.0, pv_a=0, pv_b=0) for _ in range(2)
        ]
        battery = _battery(uc_wh=100_000.0, min_pct=10.0, arb_pct=reserve_pct)
        results, _ms = self._solve(
            slots,
            battery,
            self._heat_pump(),
            self._grid(50_000, 50_000),
            self._vehicles(),
            0.22 * battery.usable_capacity_wh,
        )
        # NOTE: the check is conditional - if the solver exports in no slot,
        # nothing is asserted here.
        for r in results:
            if r.grid_setpoint_w < 0:
                self.assertGreaterEqual(
                    r.battery_soc_target,
                    reserve_pct - 0.2,  # 0.2 tolerance below the reserve
                    msg="export slot must end at or above reserve SoC",
                )

    def test_grid_import_cap_allows_full_bms_charge_above_breaker(self) -> None:
        """
        Cheap buy, 3.7 kW load, little PV -> the 17 kW breaker limits gi, but bc must
        still be able to reach the full BMS 18 kW (Deye reg 128 + firmware throttling
        protect the breaker physically).
        """
        slots = [
            _slot(load=3700, buy=0.4, sell=-0.3, pv_a=0, pv_b=1500),
            _slot(load=2000, buy=5.0, sell=4.5, pv_a=0, pv_b=0),
            _slot(load=2000, buy=5.0, sell=4.5, pv_a=0, pv_b=0),
        ]
        battery = _battery(uc_wh=64_000.0, min_pct=12.0, arb_pct=20.0)
        # Raise the BMS limits above the 17 kW import cap configured below.
        battery.max_charge_power_w = 18_000
        battery.max_discharge_power_w = 18_000
        results, _ms = self._solve(
            slots,
            battery,
            self._heat_pump(),
            self._grid(17_000, 13_500),
            self._vehicles(),
            0.55 * battery.usable_capacity_wh,
        )
        self.assertEqual(len(results), 3)
        self.assertGreaterEqual(
            results[0].battery_setpoint_w,
            17_500,
            msg="LP must be able to target near-BMS-max charge even when gi would exceed breaker",
        )
|
|
|
|
|
|
class TerminalSocShadowTests(unittest.TestCase):
    """The terminal SoC shadow price in the objective keeps the end of the horizon above the bare minimum."""

    def test_terminal_soc_shadow_price_prevents_drain(self) -> None:
        """Four 15-minute slots, the last with a high sell price; the final SoC target must be >= 15."""
        horizon_start = datetime(2026, 4, 3, 12, 0, tzinfo=timezone.utc)
        # Three slots at sell price 0.6, then one at 14.0 to tempt a full drain.
        sell_prices = (0.6, 0.6, 0.6, 14.0)
        slots = [
            PlanningSlot(
                interval_start=horizon_start + timedelta(minutes=15 * idx),
                buy_price=2.0,
                sell_price=sell,
                pv_a_forecast_w=0,
                pv_b_forecast_w=0,
                load_baseline_w=600,
                ev1_connected=False,
                ev2_connected=False,
                is_predicted_price=False,
            )
            for idx, sell in enumerate(sell_prices)
        ]

        battery = _battery(uc_wh=12_000.0, min_pct=12.0, arb_pct=20.0)
        initial_soc_wh = 0.5 * battery.usable_capacity_wh
        heat_pump = SimpleNamespace(
            rated_heating_power_w=0,
            tuv_min_temp_c=45.0,
            tuv_target_temp_c=55.0,
        )
        grid_limits = SimpleNamespace(
            max_import_power_w=20_000,
            max_export_power_w=20_000,
        )
        # Two distinct vehicle objects that cannot charge (0 W limit).
        cars = [
            SimpleNamespace(
                max_charge_power_w=0,
                battery_capacity_kwh=1.0,
                default_target_soc_pct=80.0,
            )
            for _ in range(2)
        ]

        results, _ms = solve_dispatch(
            slots,
            battery,
            heat_pump,
            grid_limits,
            [None, None],
            cars,
            initial_soc_wh,
            50.0,
            tuv_delta_stats=None,
            operating_mode="AUTO",
        )

        self.assertEqual(len(results), 4)
        # Without the shadow price the solver could finish at min SoC; the anchor
        # keeps a noticeable reserve.
        final_slot = results[-1]
        self.assertGreaterEqual(
            final_slot.battery_soc_target,
            15.0,
            msg="terminal SoC shadow price should keep end-of-horizon SoC above bare minimum",
        )
|
|
|
|
|
|
# Run the whole suite when this file is executed directly.
if __name__ == "__main__":
    unittest.main()
|