"""MILP dispatch: dvouúrovňové SoC a záporná nákupní cena (bez DB).""" from __future__ import annotations import unittest from datetime import datetime, timedelta, timezone from types import SimpleNamespace from services.planning_engine import ( PlanningSlot, _dynamic_arb_floor_wh_series, solve_dispatch, ) def _slot( *, load: int = 2000, buy: float = 3.0, sell: float = 3.0, pv_a: int = 0, pv_b: int = 0, ) -> PlanningSlot: return PlanningSlot( interval_start=datetime(2026, 4, 3, 12, 0, tzinfo=timezone.utc), buy_price=buy, sell_price=sell, pv_a_forecast_w=pv_a, pv_b_forecast_w=pv_b, load_baseline_w=load, ev1_connected=False, ev2_connected=False, is_predicted_price=False, ) def _battery( *, uc_wh: float = 100_000.0, min_pct: float = 10.0, arb_pct: float = 20.0, max_pct: float = 95.0, ) -> SimpleNamespace: uc = uc_wh min_wh = min_pct / 100.0 * uc arb_wh = arb_pct / 100.0 * uc return SimpleNamespace( usable_capacity_wh=uc, min_soc_wh=min_wh, arb_floor_wh=arb_wh, reserve_soc_wh=arb_wh, soc_max_wh=max_pct / 100.0 * uc, charge_efficiency=0.95, discharge_efficiency=0.95, degradation_cost_czk_kwh=0.15, max_charge_power_w=10_000, max_discharge_power_w=10_000, ) class DynamicArbFloorTests(unittest.TestCase): def test_more_pv_ahead_lowers_floor(self) -> None: """Čím víc FVE ve lookahead, tím nižší ekonomická podlaha v prvním slotu.""" min_w = 1_000.0 base_w = 2_000.0 uc = 10_000.0 s0 = _slot() s_low_pv = replace_slot(s0, pv_a=100, pv_b=0) s_high_pv = replace_slot(s0, pv_a=50_000, pv_b=0) ser_low = _dynamic_arb_floor_wh_series([s_low_pv] * 40, min_w, base_w, uc) ser_high = _dynamic_arb_floor_wh_series([s_high_pv] * 40, min_w, base_w, uc) self.assertLess(ser_high[0], ser_low[0]) self.assertGreaterEqual(ser_low[0], min_w) self.assertLessEqual(ser_low[0], base_w) def replace_slot( s: PlanningSlot, *, pv_a: int | None = None, pv_b: int | None = None, load: int | None = None, ) -> PlanningSlot: return PlanningSlot( interval_start=s.interval_start, buy_price=s.buy_price, sell_price=s.sell_price, pv_a_forecast_w=pv_a if pv_a is not None else s.pv_a_forecast_w, pv_b_forecast_w=pv_b if pv_b is not None else s.pv_b_forecast_w, load_baseline_w=load if load is not None else s.load_baseline_w, ev1_connected=s.ev1_connected, ev2_connected=s.ev2_connected, is_predicted_price=s.is_predicted_price, ) class PlanningDispatchMilpTests(unittest.TestCase): def test_two_tier_soc_solves_optimal(self) -> None: slots = [_slot()] battery = _battery() hp = SimpleNamespace( rated_heating_power_w=0, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0, ) grid = SimpleNamespace(max_import_power_w=15_000, max_export_power_w=15_000) vehicles = [ SimpleNamespace( max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0, ), SimpleNamespace( max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0, ), ] soc0 = 0.15 * battery.usable_capacity_wh results, ms = solve_dispatch( slots, battery, hp, grid, [None, None], vehicles, soc0, 50.0, tuv_delta_stats=None, operating_mode="AUTO", ) self.assertGreaterEqual(ms, 0) self.assertEqual(len(results), 1) def test_deep_discharge_allows_covering_load_only(self) -> None: slots = [ _slot(load=3000, buy=1.0, sell=6.0, pv_a=0, pv_b=0), _slot(load=3000, buy=1.0, sell=6.0, pv_a=0, pv_b=0), ] battery = _battery(uc_wh=50_000.0) hp = SimpleNamespace( rated_heating_power_w=0, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0, ) grid = SimpleNamespace(max_import_power_w=20_000, max_export_power_w=20_000) vehicles = [ SimpleNamespace( max_charge_power_w=11_000, battery_capacity_kwh=50.0, default_target_soc_pct=80.0, ), SimpleNamespace( max_charge_power_w=11_000, battery_capacity_kwh=50.0, default_target_soc_pct=80.0, ), ] soc0 = 0.12 * battery.usable_capacity_wh results, _ms = solve_dispatch( slots, battery, hp, grid, [None, None], vehicles, soc0, 50.0, tuv_delta_stats=None, operating_mode="AUTO", ) self.assertEqual(len(results), 2) def test_negative_buy_price_allows_import_for_baseline(self) -> None: slots = [_slot(load=6000, buy=-0.5, sell=2.0)] battery = _battery() hp = SimpleNamespace( rated_heating_power_w=8000, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0, ) grid = SimpleNamespace(max_import_power_w=25_000, max_export_power_w=15_000) vehicles = [ SimpleNamespace( max_charge_power_w=11_000, battery_capacity_kwh=50.0, default_target_soc_pct=80.0, ), SimpleNamespace( max_charge_power_w=11_000, battery_capacity_kwh=50.0, default_target_soc_pct=80.0, ), ] soc0 = 0.5 * battery.usable_capacity_wh results, _ms = solve_dispatch( slots, battery, hp, grid, [None, None], vehicles, soc0, 50.0, tuv_delta_stats=None, operating_mode="AUTO", ) self.assertGreaterEqual(results[0].grid_setpoint_w, 0) def test_export_implies_end_soc_at_least_reserve(self) -> None: """Při ge >= 1 W musí koncové soc[t] >= arb_base_wh (rezerva z DB).""" slots = [ _slot(load=500, buy=2.0, sell=8.0, pv_a=0, pv_b=0), _slot(load=500, buy=2.0, sell=8.0, pv_a=0, pv_b=0), ] battery = _battery(uc_wh=100_000.0, min_pct=10.0, arb_pct=20.0) hp = SimpleNamespace( rated_heating_power_w=0, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0, ) grid = SimpleNamespace(max_import_power_w=50_000, max_export_power_w=50_000) vehicles = [ SimpleNamespace( max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0, ), SimpleNamespace( max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0, ), ] soc0 = 0.22 * battery.usable_capacity_wh results, _ms = solve_dispatch( slots, battery, hp, grid, [None, None], vehicles, soc0, 50.0, tuv_delta_stats=None, operating_mode="AUTO", ) reserve_pct = 20.0 for r in results: if r.grid_setpoint_w < 0: self.assertGreaterEqual( r.battery_soc_target, reserve_pct - 0.2, msg="export slot must end at or above reserve SoC", ) def test_grid_import_soft_cap_penalizes_breaker_overdraw(self) -> None: """ Soft cap: solver může nominálně překročit breaker, ale jen pokud se to vyplatí. Při běžné (nezáporné) nákupní ceně by měl držet import <= breaker. """ slots = [_slot(load=3700, buy=0.4, sell=-0.3, pv_a=0, pv_b=1500)] battery = _battery(uc_wh=64_000.0, min_pct=12.0, arb_pct=20.0) battery.max_charge_power_w = 18_000 battery.max_discharge_power_w = 18_000 hp = SimpleNamespace( rated_heating_power_w=0, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0, ) grid = SimpleNamespace(max_import_power_w=17_000, max_export_power_w=13_500) vehicles = [ SimpleNamespace( max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0, ), SimpleNamespace( max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0, ), ] soc0 = 0.55 * battery.usable_capacity_wh results, _ms = solve_dispatch( slots, battery, hp, grid, [None, None], vehicles, soc0, 50.0, tuv_delta_stats=None, operating_mode="AUTO", ) self.assertEqual(len(results), 1) self.assertLessEqual( results[0].grid_setpoint_w, grid.max_import_power_w, msg="soft cap: for normal buy price, planned grid import should not exceed breaker", ) def test_grid_import_soft_cap_allows_overdraw_when_extremely_negative(self) -> None: """ Regrese: při extrémně záporné nákupní ceně může solver překročit breaker (za cenu penalizace), aby stihl krátké okno nabíjení. Překročení nesmí být 'zadarmo' (kontrolujeme alespoň, že existuje). """ # Dvouslotový scénář: v 1. slotu extrémně záporná cena, ve 2. slotu drahá. # Terminal SoC kotva pak nepenalizuje držení energie (průměrná buy je ~0) a solver má motivaci # v 1. slotu nabít na max, i kdyby to znamenalo malé překročení breakeru. s0 = _slot(load=0, buy=-20.0, sell=-0.3, pv_a=0, pv_b=0) s1 = replace_slot(s0, load=0) s1 = PlanningSlot( interval_start=s0.interval_start + timedelta(minutes=15), buy_price=20.0, sell_price=-0.3, pv_a_forecast_w=0, pv_b_forecast_w=0, load_baseline_w=0, ev1_connected=False, ev2_connected=False, is_predicted_price=False, ) slots = [s0, s1] battery = _battery(uc_wh=64_000.0, min_pct=12.0, arb_pct=20.0) battery.max_charge_power_w = 18_000 battery.max_discharge_power_w = 18_000 hp = SimpleNamespace( rated_heating_power_w=0, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0, ) grid = SimpleNamespace(max_import_power_w=17_000, max_export_power_w=13_500) vehicles = [ SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0), SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0), ] soc0 = 0.55 * battery.usable_capacity_wh results, _ms = solve_dispatch( slots, battery, hp, grid, [None, None], vehicles, soc0, 50.0, tuv_delta_stats=None, operating_mode="AUTO", ) self.assertEqual(len(results), 2) self.assertGreater( results[0].grid_setpoint_w, grid.max_import_power_w, msg="with very negative buy price, solver may choose to exceed breaker (soft cap)", ) class TerminalSocShadowTests(unittest.TestCase): """Terminal SoC shadow price v objective drží konec horizontu nad holým minimem.""" def test_terminal_soc_shadow_price_prevents_drain(self) -> None: base = datetime(2026, 4, 3, 12, 0, tzinfo=timezone.utc) slots = [] for i in range(3): slots.append( PlanningSlot( interval_start=base + timedelta(minutes=15 * i), buy_price=2.0, sell_price=0.6, pv_a_forecast_w=0, pv_b_forecast_w=0, load_baseline_w=600, ev1_connected=False, ev2_connected=False, is_predicted_price=False, ) ) slots.append( PlanningSlot( interval_start=base + timedelta(minutes=45), buy_price=2.0, sell_price=14.0, pv_a_forecast_w=0, pv_b_forecast_w=0, load_baseline_w=600, ev1_connected=False, ev2_connected=False, is_predicted_price=False, ) ) battery = _battery(uc_wh=12_000.0, min_pct=12.0, arb_pct=20.0) hp = SimpleNamespace( rated_heating_power_w=0, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0, ) grid = SimpleNamespace(max_import_power_w=20_000, max_export_power_w=20_000) vehicles = [ SimpleNamespace( max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0, ), SimpleNamespace( max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0, ), ] soc0 = 0.5 * battery.usable_capacity_wh results, _ms = solve_dispatch( slots, battery, hp, grid, [None, None], vehicles, soc0, 50.0, tuv_delta_stats=None, operating_mode="AUTO", ) self.assertEqual(len(results), 4) # Bez shadow price by solver mohl končit u min SoC; kotva drží znatelnou rezervu. self.assertGreaterEqual( results[-1].battery_soc_target, 15.0, msg="terminal SoC shadow price should keep end-of-horizon SoC above bare minimum", ) if __name__ == "__main__": unittest.main()