1009 lines
35 KiB
Python
1009 lines
35 KiB
Python
"""MILP dispatch: dvouúrovňové SoC a záporná nákupní cena (bez DB)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import unittest
|
|
from datetime import datetime, timedelta, timezone
|
|
from types import SimpleNamespace
|
|
|
|
from services.planning_engine import (
|
|
PlanningSlot,
|
|
_dynamic_arb_floor_wh_series,
|
|
_prewindow_deferral_slots,
|
|
_slots_until_buy_le_threshold,
|
|
_slots_until_sell_lt,
|
|
_soc_panel_min_wh_series,
|
|
solve_dispatch,
|
|
)
|
|
|
|
|
|
def _slot(
|
|
*,
|
|
load: int = 2000,
|
|
buy: float = 3.0,
|
|
sell: float = 3.0,
|
|
pv_a: int = 0,
|
|
pv_b: int = 0,
|
|
) -> PlanningSlot:
|
|
return PlanningSlot(
|
|
interval_start=datetime(2026, 4, 3, 12, 0, tzinfo=timezone.utc),
|
|
buy_price=buy,
|
|
sell_price=sell,
|
|
pv_a_forecast_w=pv_a,
|
|
pv_b_forecast_w=pv_b,
|
|
load_baseline_w=load,
|
|
ev1_connected=False,
|
|
ev2_connected=False,
|
|
is_predicted_price=False,
|
|
)
|
|
|
|
|
|
def _battery(
|
|
*,
|
|
uc_wh: float = 100_000.0,
|
|
min_pct: float = 10.0,
|
|
arb_pct: float = 20.0,
|
|
max_pct: float = 95.0,
|
|
terminal_soc_value_factor: float = 0.9,
|
|
) -> SimpleNamespace:
|
|
uc = uc_wh
|
|
min_wh = min_pct / 100.0 * uc
|
|
arb_wh = arb_pct / 100.0 * uc
|
|
return SimpleNamespace(
|
|
usable_capacity_wh=uc,
|
|
min_soc_wh=min_wh,
|
|
arb_floor_wh=arb_wh,
|
|
reserve_soc_wh=arb_wh,
|
|
soc_max_wh=max_pct / 100.0 * uc,
|
|
charge_efficiency=0.95,
|
|
discharge_efficiency=0.95,
|
|
degradation_cost_czk_kwh=0.15,
|
|
max_charge_power_w=10_000,
|
|
max_discharge_power_w=10_000,
|
|
planner_terminal_soc_value_factor=terminal_soc_value_factor,
|
|
)
|
|
|
|
|
|
class SlotsUntilSellNegativeTests(unittest.TestCase):
|
|
def test_slots_until_first_negative_sell(self) -> None:
|
|
base = datetime(2026, 4, 3, 0, 0, tzinfo=timezone.utc)
|
|
slots: list[PlanningSlot] = []
|
|
for i in range(10):
|
|
slots.append(
|
|
PlanningSlot(
|
|
interval_start=base + timedelta(minutes=15 * i),
|
|
buy_price=1.0,
|
|
sell_price=2.0 if i < 4 else -0.5,
|
|
pv_a_forecast_w=0,
|
|
pv_b_forecast_w=0,
|
|
load_baseline_w=500,
|
|
ev1_connected=False,
|
|
ev2_connected=False,
|
|
)
|
|
)
|
|
dist = _slots_until_sell_lt(slots, 0.0)
|
|
self.assertEqual(dist[0], 4)
|
|
self.assertEqual(dist[3], 1)
|
|
self.assertEqual(dist[4], 0)
|
|
|
|
def test_prewindow_deferral_prefers_sell_anchor(self) -> None:
|
|
"""Když existuje záporný prodej, kotva je vzdálenost k němu, ne k extrémnímu buy."""
|
|
base = datetime(2026, 4, 3, 0, 0, tzinfo=timezone.utc)
|
|
slots: list[PlanningSlot] = []
|
|
for i in range(8):
|
|
slots.append(
|
|
PlanningSlot(
|
|
interval_start=base + timedelta(minutes=15 * i),
|
|
buy_price=-50.0,
|
|
sell_price=1.0 if i < 2 else -0.1,
|
|
pv_a_forecast_w=0,
|
|
pv_b_forecast_w=0,
|
|
load_baseline_w=500,
|
|
ev1_connected=False,
|
|
ev2_connected=False,
|
|
)
|
|
)
|
|
adv = _prewindow_deferral_slots(slots, -2.0)
|
|
self.assertEqual(adv[0], 2)
|
|
|
|
def test_prewindow_deferral_falls_back_to_buy_when_no_negative_sell(self) -> None:
|
|
base = datetime(2026, 4, 3, 0, 0, tzinfo=timezone.utc)
|
|
slots: list[PlanningSlot] = []
|
|
for i in range(10):
|
|
slots.append(
|
|
PlanningSlot(
|
|
interval_start=base + timedelta(minutes=15 * i),
|
|
buy_price=3.0 if i < 7 else -10.0,
|
|
sell_price=2.0,
|
|
pv_a_forecast_w=0,
|
|
pv_b_forecast_w=0,
|
|
load_baseline_w=500,
|
|
ev1_connected=False,
|
|
ev2_connected=False,
|
|
)
|
|
)
|
|
adv = _prewindow_deferral_slots(slots, -2.0)
|
|
self.assertEqual(adv[0], 7)
|
|
|
|
|
|
class SlotsUntilBuyExtremeTests(unittest.TestCase):
|
|
def test_slots_until_first_extreme(self) -> None:
|
|
base = datetime(2026, 4, 3, 0, 0, tzinfo=timezone.utc)
|
|
slots: list[PlanningSlot] = []
|
|
for i in range(10):
|
|
slots.append(
|
|
PlanningSlot(
|
|
interval_start=base + timedelta(minutes=15 * i),
|
|
buy_price=1.0,
|
|
sell_price=1.0,
|
|
pv_a_forecast_w=0,
|
|
pv_b_forecast_w=0,
|
|
load_baseline_w=500,
|
|
ev1_connected=False,
|
|
ev2_connected=False,
|
|
)
|
|
)
|
|
slots[-1] = PlanningSlot(
|
|
interval_start=slots[-1].interval_start,
|
|
buy_price=-10.0,
|
|
sell_price=0.0,
|
|
pv_a_forecast_w=0,
|
|
pv_b_forecast_w=0,
|
|
load_baseline_w=500,
|
|
ev1_connected=False,
|
|
ev2_connected=False,
|
|
)
|
|
dist = _slots_until_buy_le_threshold(slots, -2.0)
|
|
self.assertEqual(dist[0], 9)
|
|
self.assertEqual(dist[8], 1)
|
|
self.assertEqual(dist[9], 0)
|
|
|
|
def test_prewindow_clamps_relaxed_floor_until_close(self) -> None:
|
|
sm = [5000.0] * 10
|
|
dist = [9, 8, 7, 6, 5, 4, 3, 2, 1, 0] # obecná kotva (sell nebo buy)
|
|
panel = _soc_panel_min_wh_series(sm, dist, 10_000.0, 20_000.0, 2)
|
|
self.assertEqual(panel[0], 20_000.0)
|
|
self.assertEqual(panel[6], 20_000.0)
|
|
self.assertEqual(panel[7], 5000.0)
|
|
self.assertEqual(panel[9], 5000.0)
|
|
|
|
|
|
class DynamicArbFloorTests(unittest.TestCase):
|
|
def test_more_pv_ahead_lowers_floor(self) -> None:
|
|
"""Čím víc FVE ve lookahead, tím nižší ekonomická podlaha v prvním slotu."""
|
|
min_w = 1_000.0
|
|
base_w = 2_000.0
|
|
uc = 10_000.0
|
|
s0 = _slot()
|
|
s_low_pv = replace_slot(s0, pv_a=100, pv_b=0)
|
|
s_high_pv = replace_slot(s0, pv_a=50_000, pv_b=0)
|
|
ser_low = _dynamic_arb_floor_wh_series([s_low_pv] * 40, min_w, base_w, uc)
|
|
ser_high = _dynamic_arb_floor_wh_series([s_high_pv] * 40, min_w, base_w, uc)
|
|
self.assertLess(ser_high[0], ser_low[0])
|
|
self.assertGreaterEqual(ser_low[0], min_w)
|
|
self.assertLessEqual(ser_low[0], base_w)
|
|
|
|
|
|
def replace_slot(
|
|
s: PlanningSlot,
|
|
*,
|
|
pv_a: int | None = None,
|
|
pv_b: int | None = None,
|
|
load: int | None = None,
|
|
) -> PlanningSlot:
|
|
return PlanningSlot(
|
|
interval_start=s.interval_start,
|
|
buy_price=s.buy_price,
|
|
sell_price=s.sell_price,
|
|
pv_a_forecast_w=pv_a if pv_a is not None else s.pv_a_forecast_w,
|
|
pv_b_forecast_w=pv_b if pv_b is not None else s.pv_b_forecast_w,
|
|
load_baseline_w=load if load is not None else s.load_baseline_w,
|
|
ev1_connected=s.ev1_connected,
|
|
ev2_connected=s.ev2_connected,
|
|
is_predicted_price=s.is_predicted_price,
|
|
)
|
|
|
|
|
|
class PlanningDispatchMilpTests(unittest.TestCase):
|
|
def test_neg_sell_with_future_neg_buy_prefers_curtail_pv_a_over_export(self) -> None:
|
|
"""
|
|
Když:
|
|
- aktuální slot má sell < 0 (export je náklad),
|
|
- v horizontu existuje budoucí buy < 0,
|
|
- a zároveň existuje PV B (necurtailable) někde v horizontu,
|
|
solver preferuje curtail PV A (ca) místo placeného exportu ge.
|
|
"""
|
|
slots = [
|
|
_slot(load=0, buy=3.0, sell=-0.1, pv_a=5000, pv_b=0),
|
|
_slot(load=0, buy=-10.0, sell=1.0, pv_a=0, pv_b=5000),
|
|
]
|
|
battery = _battery(uc_wh=50_000.0)
|
|
hp = SimpleNamespace(
|
|
rated_heating_power_w=0,
|
|
tuv_min_temp_c=45.0,
|
|
tuv_target_temp_c=55.0,
|
|
)
|
|
grid = SimpleNamespace(max_import_power_w=20_000, max_export_power_w=20_000)
|
|
vehicles = [
|
|
SimpleNamespace(
|
|
max_charge_power_w=0,
|
|
battery_capacity_kwh=1.0,
|
|
default_target_soc_pct=80.0,
|
|
),
|
|
SimpleNamespace(
|
|
max_charge_power_w=0,
|
|
battery_capacity_kwh=1.0,
|
|
default_target_soc_pct=80.0,
|
|
),
|
|
]
|
|
soc0 = 0.50 * battery.usable_capacity_wh
|
|
results, _ms, _ = solve_dispatch(
|
|
slots,
|
|
battery,
|
|
hp,
|
|
grid,
|
|
[None, None],
|
|
vehicles,
|
|
soc0,
|
|
50.0,
|
|
tuv_delta_stats=None,
|
|
operating_mode="AUTO",
|
|
)
|
|
self.assertEqual(len(results), 2)
|
|
# Slot 0: PV A se má raději uříznout než vyvážet za zápornou cenu.
|
|
self.assertEqual(int(results[0].pv_a_curtailed_w), 5000)
|
|
self.assertGreaterEqual(int(results[0].grid_setpoint_w), 0)
|
|
|
|
def test_pv_surplus_export_uses_hard_export_cap(self) -> None:
|
|
slots = [
|
|
_slot(load=0, buy=3.0, sell=2.5, pv_a=20_000, pv_b=0),
|
|
]
|
|
battery = _battery()
|
|
hp = SimpleNamespace(
|
|
rated_heating_power_w=0,
|
|
tuv_min_temp_c=45.0,
|
|
tuv_target_temp_c=55.0,
|
|
)
|
|
grid = SimpleNamespace(max_import_power_w=20_000, max_export_power_w=13_500)
|
|
vehicles = [
|
|
SimpleNamespace(
|
|
max_charge_power_w=0,
|
|
battery_capacity_kwh=1.0,
|
|
default_target_soc_pct=80.0,
|
|
),
|
|
SimpleNamespace(
|
|
max_charge_power_w=0,
|
|
battery_capacity_kwh=1.0,
|
|
default_target_soc_pct=80.0,
|
|
),
|
|
]
|
|
soc0 = battery.soc_max_wh
|
|
results, _ms, _ = solve_dispatch(
|
|
slots,
|
|
battery,
|
|
hp,
|
|
grid,
|
|
[None, None],
|
|
vehicles,
|
|
soc0,
|
|
50.0,
|
|
tuv_delta_stats=None,
|
|
operating_mode="AUTO",
|
|
)
|
|
self.assertEqual(len(results), 1)
|
|
self.assertEqual(results[0].export_mode, "PV_SURPLUS")
|
|
self.assertEqual(results[0].export_limit_w, 13_500)
|
|
self.assertGreater(results[0].pv_a_curtailed_w, 0)
|
|
|
|
def test_two_tier_soc_solves_optimal(self) -> None:
|
|
slots = [_slot()]
|
|
battery = _battery()
|
|
hp = SimpleNamespace(
|
|
rated_heating_power_w=0,
|
|
tuv_min_temp_c=45.0,
|
|
tuv_target_temp_c=55.0,
|
|
)
|
|
grid = SimpleNamespace(max_import_power_w=15_000, max_export_power_w=15_000)
|
|
vehicles = [
|
|
SimpleNamespace(
|
|
max_charge_power_w=0,
|
|
battery_capacity_kwh=1.0,
|
|
default_target_soc_pct=80.0,
|
|
),
|
|
SimpleNamespace(
|
|
max_charge_power_w=0,
|
|
battery_capacity_kwh=1.0,
|
|
default_target_soc_pct=80.0,
|
|
),
|
|
]
|
|
soc0 = 0.15 * battery.usable_capacity_wh
|
|
results, ms, _ = solve_dispatch(
|
|
slots,
|
|
battery,
|
|
hp,
|
|
grid,
|
|
[None, None],
|
|
vehicles,
|
|
soc0,
|
|
50.0,
|
|
tuv_delta_stats=None,
|
|
operating_mode="AUTO",
|
|
)
|
|
self.assertGreaterEqual(ms, 0)
|
|
self.assertEqual(len(results), 1)
|
|
|
|
def test_deep_discharge_allows_covering_load_only(self) -> None:
|
|
slots = [
|
|
_slot(load=3000, buy=1.0, sell=6.0, pv_a=0, pv_b=0),
|
|
_slot(load=3000, buy=1.0, sell=6.0, pv_a=0, pv_b=0),
|
|
]
|
|
battery = _battery(uc_wh=50_000.0)
|
|
hp = SimpleNamespace(
|
|
rated_heating_power_w=0,
|
|
tuv_min_temp_c=45.0,
|
|
tuv_target_temp_c=55.0,
|
|
)
|
|
grid = SimpleNamespace(max_import_power_w=20_000, max_export_power_w=20_000)
|
|
vehicles = [
|
|
SimpleNamespace(
|
|
max_charge_power_w=11_000,
|
|
battery_capacity_kwh=50.0,
|
|
default_target_soc_pct=80.0,
|
|
),
|
|
SimpleNamespace(
|
|
max_charge_power_w=11_000,
|
|
battery_capacity_kwh=50.0,
|
|
default_target_soc_pct=80.0,
|
|
),
|
|
]
|
|
soc0 = 0.12 * battery.usable_capacity_wh
|
|
results, _ms, _ = solve_dispatch(
|
|
slots,
|
|
battery,
|
|
hp,
|
|
grid,
|
|
[None, None],
|
|
vehicles,
|
|
soc0,
|
|
50.0,
|
|
tuv_delta_stats=None,
|
|
operating_mode="AUTO",
|
|
)
|
|
self.assertEqual(len(results), 2)
|
|
|
|
def test_negative_buy_price_allows_import_for_baseline(self) -> None:
|
|
slots = [_slot(load=6000, buy=-0.5, sell=2.0)]
|
|
battery = _battery()
|
|
hp = SimpleNamespace(
|
|
rated_heating_power_w=8000,
|
|
tuv_min_temp_c=45.0,
|
|
tuv_target_temp_c=55.0,
|
|
)
|
|
grid = SimpleNamespace(max_import_power_w=25_000, max_export_power_w=15_000)
|
|
vehicles = [
|
|
SimpleNamespace(
|
|
max_charge_power_w=11_000,
|
|
battery_capacity_kwh=50.0,
|
|
default_target_soc_pct=80.0,
|
|
),
|
|
SimpleNamespace(
|
|
max_charge_power_w=11_000,
|
|
battery_capacity_kwh=50.0,
|
|
default_target_soc_pct=80.0,
|
|
),
|
|
]
|
|
soc0 = 0.5 * battery.usable_capacity_wh
|
|
results, _ms, _ = solve_dispatch(
|
|
slots,
|
|
battery,
|
|
hp,
|
|
grid,
|
|
[None, None],
|
|
vehicles,
|
|
soc0,
|
|
50.0,
|
|
tuv_delta_stats=None,
|
|
operating_mode="AUTO",
|
|
)
|
|
self.assertGreaterEqual(results[0].grid_setpoint_w, 0)
|
|
|
|
def test_export_implies_end_soc_at_least_reserve(self) -> None:
|
|
"""Bez arbitrážní relaxace: při ge >= 1 W musí koncové soc[t] >= arb_base_wh (rezerva z DB)."""
|
|
slots = [
|
|
_slot(load=500, buy=2.0, sell=8.0, pv_a=0, pv_b=0),
|
|
_slot(load=500, buy=2.0, sell=8.0, pv_a=0, pv_b=0),
|
|
]
|
|
battery = _battery(uc_wh=100_000.0, min_pct=10.0, arb_pct=20.0)
|
|
hp = SimpleNamespace(
|
|
rated_heating_power_w=0,
|
|
tuv_min_temp_c=45.0,
|
|
tuv_target_temp_c=55.0,
|
|
)
|
|
grid = SimpleNamespace(max_import_power_w=50_000, max_export_power_w=50_000)
|
|
vehicles = [
|
|
SimpleNamespace(
|
|
max_charge_power_w=0,
|
|
battery_capacity_kwh=1.0,
|
|
default_target_soc_pct=80.0,
|
|
),
|
|
SimpleNamespace(
|
|
max_charge_power_w=0,
|
|
battery_capacity_kwh=1.0,
|
|
default_target_soc_pct=80.0,
|
|
),
|
|
]
|
|
soc0 = 0.22 * battery.usable_capacity_wh
|
|
results, _ms, _ = solve_dispatch(
|
|
slots,
|
|
battery,
|
|
hp,
|
|
grid,
|
|
[None, None],
|
|
vehicles,
|
|
soc0,
|
|
50.0,
|
|
tuv_delta_stats=None,
|
|
operating_mode="AUTO",
|
|
)
|
|
reserve_pct = 20.0
|
|
for r in results:
|
|
if r.grid_setpoint_w < 0:
|
|
self.assertGreaterEqual(
|
|
r.battery_soc_target,
|
|
reserve_pct - 0.2,
|
|
msg="export slot must end at or above reserve SoC",
|
|
)
|
|
|
|
def test_export_before_extreme_negative_buy_can_end_below_reserve(self) -> None:
|
|
"""
|
|
Při relaxovaném soc_min (záporný buy v lookahead) smí významný export skončit u planner floor,
|
|
ne u provozní rezervy — jinak nejde ráno vypustit do sítě a nachystat kapacitu před levným nákupem.
|
|
"""
|
|
base = datetime(2026, 4, 3, 6, 0, tzinfo=timezone.utc)
|
|
s0 = PlanningSlot(
|
|
interval_start=base,
|
|
buy_price=2.5,
|
|
sell_price=2.0,
|
|
pv_a_forecast_w=0,
|
|
pv_b_forecast_w=0,
|
|
load_baseline_w=400,
|
|
ev1_connected=False,
|
|
ev2_connected=False,
|
|
is_predicted_price=False,
|
|
allow_charge=True,
|
|
allow_discharge_export=True,
|
|
)
|
|
s1 = PlanningSlot(
|
|
interval_start=base + timedelta(minutes=15),
|
|
buy_price=-12.0,
|
|
sell_price=-0.5,
|
|
pv_a_forecast_w=0,
|
|
pv_b_forecast_w=0,
|
|
load_baseline_w=400,
|
|
ev1_connected=False,
|
|
ev2_connected=False,
|
|
is_predicted_price=False,
|
|
allow_charge=True,
|
|
allow_discharge_export=True,
|
|
)
|
|
slots = [s0, s1]
|
|
battery = _battery(uc_wh=10_000.0, min_pct=10.0, arb_pct=20.0)
|
|
battery.planner_extreme_buy_threshold_czk_kwh = -2.0
|
|
battery.planner_discharge_floor_percent = 5.0
|
|
battery.max_charge_power_w = 50_000
|
|
battery.max_discharge_power_w = 50_000
|
|
hp = SimpleNamespace(
|
|
rated_heating_power_w=0,
|
|
tuv_min_temp_c=45.0,
|
|
tuv_target_temp_c=55.0,
|
|
)
|
|
grid = SimpleNamespace(max_import_power_w=50_000, max_export_power_w=50_000)
|
|
vehicles = [
|
|
SimpleNamespace(
|
|
max_charge_power_w=0,
|
|
battery_capacity_kwh=1.0,
|
|
default_target_soc_pct=80.0,
|
|
),
|
|
SimpleNamespace(
|
|
max_charge_power_w=0,
|
|
battery_capacity_kwh=1.0,
|
|
default_target_soc_pct=80.0,
|
|
),
|
|
]
|
|
soc0 = 0.88 * battery.usable_capacity_wh
|
|
results, _ms, _ = solve_dispatch(
|
|
slots,
|
|
battery,
|
|
hp,
|
|
grid,
|
|
[None, None],
|
|
vehicles,
|
|
soc0,
|
|
50.0,
|
|
tuv_delta_stats=None,
|
|
operating_mode="AUTO",
|
|
)
|
|
self.assertEqual(len(results), 2)
|
|
if results[0].grid_setpoint_w < 0:
|
|
self.assertLess(
|
|
results[0].battery_soc_target,
|
|
19.0,
|
|
msg="with relaxed soc_min, first-slot export should be able to finish below reserve %",
|
|
)
|
|
|
|
def test_negative_sell_forbids_battery_export_arbitrage(self) -> None:
|
|
"""
|
|
Pokud sell < 0, solver nesmí vybíjet baterii do sítě pro arbitráž (dump musí proběhnout předtím).
|
|
V okně sell<0 smí export vzniknout jen z přebytku FVE; zde ale FVE=0, takže očekáváme grid_setpoint>=0.
|
|
"""
|
|
base = datetime(2026, 4, 3, 6, 0, tzinfo=timezone.utc)
|
|
s0 = PlanningSlot(
|
|
interval_start=base,
|
|
buy_price=2.0,
|
|
sell_price=2.0,
|
|
pv_a_forecast_w=0,
|
|
pv_b_forecast_w=0,
|
|
load_baseline_w=0,
|
|
ev1_connected=False,
|
|
ev2_connected=False,
|
|
is_predicted_price=False,
|
|
allow_charge=True,
|
|
allow_discharge_export=True,
|
|
)
|
|
s1 = PlanningSlot(
|
|
interval_start=base + timedelta(minutes=15),
|
|
buy_price=2.0,
|
|
sell_price=-0.5,
|
|
pv_a_forecast_w=0,
|
|
pv_b_forecast_w=0,
|
|
load_baseline_w=0,
|
|
ev1_connected=False,
|
|
ev2_connected=False,
|
|
is_predicted_price=False,
|
|
allow_charge=True,
|
|
allow_discharge_export=True,
|
|
)
|
|
s2 = PlanningSlot(
|
|
interval_start=base + timedelta(minutes=30),
|
|
buy_price=-15.0,
|
|
sell_price=-1.0,
|
|
pv_a_forecast_w=0,
|
|
pv_b_forecast_w=0,
|
|
load_baseline_w=0,
|
|
ev1_connected=False,
|
|
ev2_connected=False,
|
|
is_predicted_price=False,
|
|
allow_charge=True,
|
|
allow_discharge_export=True,
|
|
)
|
|
slots = [s0, s1, s2]
|
|
battery = _battery(uc_wh=10_000.0, min_pct=10.0, arb_pct=20.0)
|
|
battery.planner_extreme_buy_threshold_czk_kwh = -2.0
|
|
battery.planner_discharge_floor_percent = 5.0
|
|
battery.max_charge_power_w = 50_000
|
|
battery.max_discharge_power_w = 50_000
|
|
hp = SimpleNamespace(
|
|
rated_heating_power_w=0,
|
|
tuv_min_temp_c=45.0,
|
|
tuv_target_temp_c=55.0,
|
|
)
|
|
grid = SimpleNamespace(max_import_power_w=50_000, max_export_power_w=50_000)
|
|
vehicles = [
|
|
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
|
|
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
|
|
]
|
|
soc0 = 0.9 * battery.usable_capacity_wh
|
|
results, _ms, _ = solve_dispatch(
|
|
slots,
|
|
battery,
|
|
hp,
|
|
grid,
|
|
[None, None],
|
|
vehicles,
|
|
soc0,
|
|
50.0,
|
|
tuv_delta_stats=None,
|
|
operating_mode="AUTO",
|
|
)
|
|
self.assertEqual(len(results), 3)
|
|
# V sell<0 slotu bez FVE a bez zátěže nesmí být export (to by muselo být z baterie).
|
|
self.assertGreaterEqual(results[1].grid_setpoint_w, 0)
|
|
# A zároveň nesmí být baterie ve výboji (dump musí proběhnout předtím).
|
|
self.assertGreaterEqual(results[1].battery_setpoint_w, 0)
|
|
|
|
def test_anchor_hits_floor_before_first_negative_sell(self) -> None:
|
|
"""
|
|
Pokud se v horizontu objeví první sell<0 a současně existuje planner floor (relaxace),
|
|
solver má skončit už v předchozím slotu u planner floor (cca 5 %), ne na ~15 %.
|
|
"""
|
|
base = datetime(2026, 4, 3, 6, 0, tzinfo=timezone.utc)
|
|
# Slot 0-1: sell >= 0; slot 2: první sell < 0; slot 3: extrémně záporný buy (motivace k bufferu).
|
|
slots = [
|
|
PlanningSlot(
|
|
interval_start=base,
|
|
buy_price=3.0,
|
|
sell_price=1.0,
|
|
pv_a_forecast_w=0,
|
|
pv_b_forecast_w=0,
|
|
load_baseline_w=0,
|
|
ev1_connected=False,
|
|
ev2_connected=False,
|
|
allow_charge=True,
|
|
allow_discharge_export=True,
|
|
),
|
|
PlanningSlot(
|
|
interval_start=base + timedelta(minutes=15),
|
|
buy_price=3.0,
|
|
sell_price=0.5,
|
|
pv_a_forecast_w=0,
|
|
pv_b_forecast_w=0,
|
|
load_baseline_w=0,
|
|
ev1_connected=False,
|
|
ev2_connected=False,
|
|
allow_charge=True,
|
|
allow_discharge_export=True,
|
|
),
|
|
PlanningSlot(
|
|
interval_start=base + timedelta(minutes=30),
|
|
buy_price=3.0,
|
|
sell_price=-0.2,
|
|
pv_a_forecast_w=0,
|
|
pv_b_forecast_w=0,
|
|
load_baseline_w=0,
|
|
ev1_connected=False,
|
|
ev2_connected=False,
|
|
allow_charge=True,
|
|
allow_discharge_export=True,
|
|
),
|
|
PlanningSlot(
|
|
interval_start=base + timedelta(minutes=45),
|
|
buy_price=-20.0,
|
|
sell_price=-1.0,
|
|
pv_a_forecast_w=0,
|
|
pv_b_forecast_w=0,
|
|
load_baseline_w=0,
|
|
ev1_connected=False,
|
|
ev2_connected=False,
|
|
allow_charge=True,
|
|
allow_discharge_export=True,
|
|
),
|
|
]
|
|
battery = _battery(uc_wh=20_000.0, min_pct=10.0, arb_pct=20.0)
|
|
battery.planner_extreme_buy_threshold_czk_kwh = -2.0
|
|
battery.planner_discharge_floor_percent = 5.0
|
|
battery.max_charge_power_w = 50_000
|
|
battery.max_discharge_power_w = 50_000
|
|
hp = SimpleNamespace(rated_heating_power_w=0, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0)
|
|
grid = SimpleNamespace(max_import_power_w=50_000, max_export_power_w=50_000)
|
|
vehicles = [
|
|
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
|
|
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
|
|
]
|
|
soc0 = 0.9 * battery.usable_capacity_wh
|
|
results, _ms, _ = solve_dispatch(
|
|
slots,
|
|
battery,
|
|
hp,
|
|
grid,
|
|
[None, None],
|
|
vehicles,
|
|
soc0,
|
|
50.0,
|
|
tuv_delta_stats=None,
|
|
operating_mode="AUTO",
|
|
)
|
|
# Slot index 1 je poslední před prvním sell<0 (index 2).
|
|
self.assertLessEqual(
|
|
results[1].battery_soc_target,
|
|
6.0,
|
|
msg="anchor should drive SoC close to planner floor before first negative sell",
|
|
)
|
|
|
|
def test_anchor_uses_planner_floor_even_without_extreme_buy(self) -> None:
|
|
"""
|
|
Regrese: pokud v horizontu není buy <= threshold (soc_min_series by se nerelaxovala),
|
|
kotva před sell<0 má stejně mířit na planner floor (5 %), ne na base min SoC.
|
|
"""
|
|
base = datetime(2026, 4, 3, 6, 0, tzinfo=timezone.utc)
|
|
slots = [
|
|
PlanningSlot(
|
|
interval_start=base,
|
|
buy_price=3.0,
|
|
sell_price=1.0,
|
|
pv_a_forecast_w=0,
|
|
pv_b_forecast_w=0,
|
|
load_baseline_w=0,
|
|
ev1_connected=False,
|
|
ev2_connected=False,
|
|
allow_charge=True,
|
|
allow_discharge_export=True,
|
|
),
|
|
PlanningSlot(
|
|
interval_start=base + timedelta(minutes=15),
|
|
buy_price=3.0,
|
|
sell_price=0.5,
|
|
pv_a_forecast_w=0,
|
|
pv_b_forecast_w=0,
|
|
load_baseline_w=0,
|
|
ev1_connected=False,
|
|
ev2_connected=False,
|
|
allow_charge=True,
|
|
allow_discharge_export=True,
|
|
),
|
|
PlanningSlot(
|
|
interval_start=base + timedelta(minutes=30),
|
|
buy_price=3.0,
|
|
sell_price=-0.2,
|
|
pv_a_forecast_w=0,
|
|
pv_b_forecast_w=0,
|
|
load_baseline_w=0,
|
|
ev1_connected=False,
|
|
ev2_connected=False,
|
|
allow_charge=True,
|
|
allow_discharge_export=True,
|
|
),
|
|
]
|
|
battery = _battery(uc_wh=20_000.0, min_pct=12.0, arb_pct=20.0)
|
|
battery.planner_extreme_buy_threshold_czk_kwh = -2.0
|
|
battery.planner_discharge_floor_percent = 5.0
|
|
battery.max_charge_power_w = 50_000
|
|
battery.max_discharge_power_w = 50_000
|
|
hp = SimpleNamespace(rated_heating_power_w=0, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0)
|
|
grid = SimpleNamespace(max_import_power_w=50_000, max_export_power_w=50_000)
|
|
vehicles = [
|
|
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
|
|
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
|
|
]
|
|
soc0 = 0.9 * battery.usable_capacity_wh
|
|
results, _ms, _ = solve_dispatch(
|
|
slots,
|
|
battery,
|
|
hp,
|
|
grid,
|
|
[None, None],
|
|
vehicles,
|
|
soc0,
|
|
50.0,
|
|
tuv_delta_stats=None,
|
|
operating_mode="AUTO",
|
|
)
|
|
# Slot index 1 je poslední před prvním sell<0 (index 2).
|
|
self.assertLessEqual(results[1].battery_soc_target, 6.0)
|
|
|
|
def test_grid_import_soft_cap_penalizes_breaker_overdraw(self) -> None:
|
|
"""
|
|
Soft cap: solver může nominálně překročit breaker, ale jen pokud se to vyplatí.
|
|
Při běžné (nezáporné) nákupní ceně by měl držet import <= breaker.
|
|
"""
|
|
slots = [_slot(load=3700, buy=0.4, sell=-0.3, pv_a=0, pv_b=1500)]
|
|
battery = _battery(uc_wh=64_000.0, min_pct=12.0, arb_pct=20.0)
|
|
battery.max_charge_power_w = 18_000
|
|
battery.max_discharge_power_w = 18_000
|
|
hp = SimpleNamespace(
|
|
rated_heating_power_w=0,
|
|
tuv_min_temp_c=45.0,
|
|
tuv_target_temp_c=55.0,
|
|
)
|
|
grid = SimpleNamespace(max_import_power_w=17_000, max_export_power_w=13_500)
|
|
vehicles = [
|
|
SimpleNamespace(
|
|
max_charge_power_w=0,
|
|
battery_capacity_kwh=1.0,
|
|
default_target_soc_pct=80.0,
|
|
),
|
|
SimpleNamespace(
|
|
max_charge_power_w=0,
|
|
battery_capacity_kwh=1.0,
|
|
default_target_soc_pct=80.0,
|
|
),
|
|
]
|
|
soc0 = 0.55 * battery.usable_capacity_wh
|
|
results, _ms, _ = solve_dispatch(
|
|
slots,
|
|
battery,
|
|
hp,
|
|
grid,
|
|
[None, None],
|
|
vehicles,
|
|
soc0,
|
|
50.0,
|
|
tuv_delta_stats=None,
|
|
operating_mode="AUTO",
|
|
)
|
|
self.assertEqual(len(results), 1)
|
|
self.assertLessEqual(
|
|
results[0].grid_setpoint_w,
|
|
grid.max_import_power_w,
|
|
msg="soft cap: for normal buy price, planned grid import should not exceed breaker",
|
|
)
|
|
|
|
def test_grid_import_soft_cap_allows_overdraw_when_extremely_negative(self) -> None:
|
|
"""
|
|
Regrese: při extrémně záporné nákupní ceně může solver překročit breaker (za cenu penalizace),
|
|
aby stihl krátké okno nabíjení. Překročení nesmí být 'zadarmo' (kontrolujeme alespoň, že existuje).
|
|
"""
|
|
# Dvouslotový scénář: v 1. slotu extrémně záporná cena, ve 2. slotu drahá.
|
|
# Terminal SoC kotva pak nepenalizuje držení energie (průměrná buy je ~0) a solver má motivaci
|
|
# v 1. slotu nabít na max, i kdyby to znamenalo malé překročení breakeru.
|
|
s0 = _slot(load=0, buy=-20.0, sell=-0.3, pv_a=0, pv_b=0)
|
|
s1 = replace_slot(s0, load=0)
|
|
s1 = PlanningSlot(
|
|
interval_start=s0.interval_start + timedelta(minutes=15),
|
|
buy_price=20.0,
|
|
sell_price=-0.3,
|
|
pv_a_forecast_w=0,
|
|
pv_b_forecast_w=0,
|
|
load_baseline_w=0,
|
|
ev1_connected=False,
|
|
ev2_connected=False,
|
|
is_predicted_price=False,
|
|
)
|
|
slots = [s0, s1]
|
|
battery = _battery(uc_wh=64_000.0, min_pct=12.0, arb_pct=20.0)
|
|
battery.max_charge_power_w = 18_000
|
|
battery.max_discharge_power_w = 18_000
|
|
hp = SimpleNamespace(
|
|
rated_heating_power_w=0,
|
|
tuv_min_temp_c=45.0,
|
|
tuv_target_temp_c=55.0,
|
|
)
|
|
grid = SimpleNamespace(max_import_power_w=17_000, max_export_power_w=13_500)
|
|
vehicles = [
|
|
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
|
|
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
|
|
]
|
|
soc0 = 0.55 * battery.usable_capacity_wh
|
|
results, _ms, _ = solve_dispatch(
|
|
slots,
|
|
battery,
|
|
hp,
|
|
grid,
|
|
[None, None],
|
|
vehicles,
|
|
soc0,
|
|
50.0,
|
|
tuv_delta_stats=None,
|
|
operating_mode="AUTO",
|
|
)
|
|
self.assertEqual(len(results), 2)
|
|
self.assertGreater(
|
|
results[0].grid_setpoint_w,
|
|
grid.max_import_power_w,
|
|
msg="with very negative buy price, solver may choose to exceed breaker (soft cap)",
|
|
)
|
|
|
|
def test_block_export_on_negative_sell_no_grid_export_pv_surplus(self) -> None:
|
|
"""site_grid_connection.block_export_on_negative_sell → ge=0 při sell<0."""
|
|
slots = [
|
|
PlanningSlot(
|
|
interval_start=datetime(2026, 4, 3, 12, 0, tzinfo=timezone.utc),
|
|
buy_price=5.25,
|
|
sell_price=-0.5,
|
|
pv_a_forecast_w=7000,
|
|
pv_b_forecast_w=0,
|
|
load_baseline_w=500,
|
|
ev1_connected=False,
|
|
ev2_connected=False,
|
|
is_predicted_price=False,
|
|
allow_charge=True,
|
|
allow_discharge_export=False,
|
|
)
|
|
]
|
|
battery = _battery(uc_wh=20_000.0, arb_pct=15.0, max_pct=95.0)
|
|
hp = SimpleNamespace(
|
|
rated_heating_power_w=0,
|
|
tuv_min_temp_c=45.0,
|
|
tuv_target_temp_c=55.0,
|
|
)
|
|
grid = SimpleNamespace(
|
|
max_import_power_w=17_000,
|
|
max_export_power_w=8000,
|
|
block_export_on_negative_sell=True,
|
|
)
|
|
vehicles = [
|
|
SimpleNamespace(
|
|
max_charge_power_w=0,
|
|
battery_capacity_kwh=1.0,
|
|
default_target_soc_pct=80.0,
|
|
),
|
|
SimpleNamespace(
|
|
max_charge_power_w=0,
|
|
battery_capacity_kwh=1.0,
|
|
default_target_soc_pct=80.0,
|
|
),
|
|
]
|
|
soc0 = 0.34 * battery.usable_capacity_wh
|
|
results, _ms, _ = solve_dispatch(
|
|
slots,
|
|
battery,
|
|
hp,
|
|
grid,
|
|
[None, None],
|
|
vehicles,
|
|
soc0,
|
|
50.0,
|
|
tuv_delta_stats=None,
|
|
operating_mode="AUTO",
|
|
)
|
|
self.assertEqual(len(results), 1)
|
|
self.assertGreaterEqual(results[0].grid_setpoint_w, 0, "no grid export")
|
|
self.assertGreater(results[0].battery_setpoint_w, 0, "surplus PV should charge")
|
|
|
|
|
|
class TerminalSocShadowTests(unittest.TestCase):
|
|
"""Terminal SoC shadow price v objective drží konec horizontu nad holým minimem."""
|
|
|
|
def test_terminal_soc_shadow_price_prevents_drain(self) -> None:
|
|
base = datetime(2026, 4, 3, 12, 0, tzinfo=timezone.utc)
|
|
slots = []
|
|
for i in range(3):
|
|
slots.append(
|
|
PlanningSlot(
|
|
interval_start=base + timedelta(minutes=15 * i),
|
|
buy_price=2.0,
|
|
sell_price=0.6,
|
|
pv_a_forecast_w=0,
|
|
pv_b_forecast_w=0,
|
|
load_baseline_w=600,
|
|
ev1_connected=False,
|
|
ev2_connected=False,
|
|
is_predicted_price=False,
|
|
)
|
|
)
|
|
slots.append(
|
|
PlanningSlot(
|
|
interval_start=base + timedelta(minutes=45),
|
|
buy_price=2.0,
|
|
sell_price=14.0,
|
|
pv_a_forecast_w=0,
|
|
pv_b_forecast_w=0,
|
|
load_baseline_w=600,
|
|
ev1_connected=False,
|
|
ev2_connected=False,
|
|
is_predicted_price=False,
|
|
)
|
|
)
|
|
battery = _battery(uc_wh=12_000.0, min_pct=12.0, arb_pct=20.0)
|
|
hp = SimpleNamespace(
|
|
rated_heating_power_w=0,
|
|
tuv_min_temp_c=45.0,
|
|
tuv_target_temp_c=55.0,
|
|
)
|
|
grid = SimpleNamespace(max_import_power_w=20_000, max_export_power_w=20_000)
|
|
vehicles = [
|
|
SimpleNamespace(
|
|
max_charge_power_w=0,
|
|
battery_capacity_kwh=1.0,
|
|
default_target_soc_pct=80.0,
|
|
),
|
|
SimpleNamespace(
|
|
max_charge_power_w=0,
|
|
battery_capacity_kwh=1.0,
|
|
default_target_soc_pct=80.0,
|
|
),
|
|
]
|
|
soc0 = 0.5 * battery.usable_capacity_wh
|
|
results, _ms, _ = solve_dispatch(
|
|
slots,
|
|
battery,
|
|
hp,
|
|
grid,
|
|
[None, None],
|
|
vehicles,
|
|
soc0,
|
|
50.0,
|
|
tuv_delta_stats=None,
|
|
operating_mode="AUTO",
|
|
)
|
|
self.assertEqual(len(results), 4)
|
|
# Bez shadow price by solver mohl končit u min SoC; kotva drží znatelnou rezervu.
|
|
self.assertGreaterEqual(
|
|
results[-1].battery_soc_target,
|
|
15.0,
|
|
msg="terminal SoC shadow price should keep end-of-horizon SoC above bare minimum",
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|