# Source viewer header (not part of the program): 287 lines, 9.3 KiB, Python
"""Soft safety SoC and rolling charge commitment in solve_dispatch."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import unittest
|
|
from datetime import datetime, timedelta, timezone
|
|
from types import SimpleNamespace
|
|
|
|
from services.planning_engine import PlanningSlot, solve_dispatch
|
|
|
|
|
|
def _bat(**kwargs: object) -> SimpleNamespace:
|
|
base = dict(
|
|
usable_capacity_wh=20_000.0,
|
|
min_soc_wh=2000.0,
|
|
arb_floor_wh=4000.0,
|
|
reserve_soc_wh=4000.0,
|
|
soc_max_wh=19_000.0,
|
|
charge_efficiency=0.95,
|
|
discharge_efficiency=0.95,
|
|
degradation_cost_czk_kwh=0.1,
|
|
max_charge_power_w=5000,
|
|
max_discharge_power_w=5000,
|
|
planner_terminal_soc_value_factor=0.2,
|
|
planner_extreme_buy_threshold_czk_kwh=-5.0,
|
|
planner_discharge_floor_percent=None,
|
|
planner_discharge_relax_prewindow_slots=8,
|
|
planner_daytime_charge_target_enabled=True,
|
|
planner_charge_commitment_penalty_czk_kwh=0.5,
|
|
)
|
|
base.update(kwargs)
|
|
return SimpleNamespace(**base)
|
|
|
|
|
|
def _grid() -> SimpleNamespace:
|
|
return SimpleNamespace(
|
|
max_import_power_w=11_000,
|
|
max_export_power_w=11_000,
|
|
block_export_on_negative_sell=False,
|
|
deye_gen_microinverter_cutoff_enabled=False,
|
|
)
|
|
|
|
|
|
def _hp() -> SimpleNamespace:
|
|
return SimpleNamespace(rated_heating_power_w=0, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0)
|
|
|
|
|
|
def _slot(
    t0: datetime,
    idx: int,
    *,
    buy: float = 3.0,
    sell: float = 2.5,
    pv_a: int = 0,
    load: int = 1500,
    safety: float | None = None,
    fut_buy: float | None = None,
    fut_sell: float | None = None,
    daytime_pv_surplus: bool = False,
) -> PlanningSlot:
    """Build one ``PlanningSlot`` for the solver tests.

    The slot starts ``idx`` quarter-hours after ``t0``. Both EVs are marked as
    disconnected, the second PV forecast is zero, and charging as well as
    discharge-to-export are allowed.

    Args:
        t0: Start of the first slot of the horizon.
        idx: Zero-based slot index; shifts ``interval_start`` by 15 minutes each.
        buy: Forwarded as ``buy_price``.
        sell: Forwarded as ``sell_price``.
        pv_a: Forwarded as ``pv_a_forecast_w``.
        load: Forwarded as ``load_baseline_w``.
        safety: Forwarded as ``safety_soc_target_wh``.
        fut_buy: Forwarded as ``future_avoided_buy_czk_kwh``.
        fut_sell: Forwarded as ``future_sell_opportunity_czk_kwh``.
        daytime_pv_surplus: Forwarded as ``is_daytime_pv_surplus_slot``.

    Returns:
        The constructed ``PlanningSlot``.
    """
    return PlanningSlot(
        interval_start=t0 + timedelta(minutes=15 * idx),
        buy_price=buy,
        sell_price=sell,
        pv_a_forecast_w=pv_a,
        pv_b_forecast_w=0,
        load_baseline_w=load,
        ev1_connected=False,
        ev2_connected=False,
        allow_charge=True,
        allow_discharge_export=True,
        safety_soc_target_wh=safety,
        future_avoided_buy_czk_kwh=fut_buy,
        future_sell_opportunity_czk_kwh=fut_sell,
        is_daytime_pv_surplus_slot=daytime_pv_surplus,
    )
|
|
|
|
|
|
class PlanningSafetyCommitmentTests(unittest.TestCase):
    """Checks of the diagnostic snapshot returned by ``solve_dispatch``.

    Every test runs the solver in ``"AUTO"`` mode with a TUV temperature of
    50.0 and inspects the third element of the returned triple (the snapshot).
    """

    @staticmethod
    def _vehicles() -> list[SimpleNamespace]:
        """Return two EV stubs with zero charge power.

        Distinct instances are created on every call so that neither the two
        list positions nor consecutive solver runs share one mutable object.
        """
        return [
            SimpleNamespace(
                max_charge_power_w=0,
                battery_capacity_kwh=1.0,
                default_target_soc_pct=80.0,
            )
            for _ in range(2)
        ]

    def _solve(
        self,
        slots: list[PlanningSlot],
        *,
        current_soc_wh: float,
        battery: SimpleNamespace | None = None,
        **extra: object,
    ):
        """Call ``solve_dispatch`` with the fixture arguments shared by all tests.

        Args:
            slots: Planning horizon passed as the first positional argument.
            current_soc_wh: Forwarded as ``current_soc_wh``.
            battery: Battery stub; defaults to ``_bat()``.
            **extra: Additional keyword arguments forwarded unchanged
                (e.g. ``charge_commitment_prev_w``).

        Returns:
            Whatever ``solve_dispatch`` returns; the tests unpack it into
            three values with the snapshot last.
        """
        return solve_dispatch(
            slots,
            _bat() if battery is None else battery,
            _hp(),
            _grid(),
            [None, None],
            self._vehicles(),
            current_soc_wh=current_soc_wh,
            current_tuv_temp_c=50.0,
            operating_mode="AUTO",
            **extra,
        )

    def test_solver_snapshot_has_version_and_masks(self) -> None:
        """The snapshot carries version 1 and one mask entry per slot."""
        t0 = datetime(2026, 5, 4, 8, 0, tzinfo=timezone.utc)
        slots = [_slot(t0, i, buy=2.0, sell=2.0, pv_a=6000, load=1500) for i in range(8)]
        res, _ms, snap = self._solve(slots, current_soc_wh=5000.0)
        self.assertEqual(len(res), 8)
        self.assertEqual(snap.get("version"), 1)
        self.assertIn("masks", snap)
        self.assertEqual(len(snap["masks"]), 8)

    def test_charge_commitment_snapshot_populated(self) -> None:
        """Rolling anchor: with previous PV charging, the commitment is written into the snapshot."""
        t0 = datetime(2026, 5, 4, 10, 0, tzinfo=timezone.utc)
        slots = [_slot(t0, i, buy=1.5, sell=1.2, pv_a=8000, load=1000) for i in range(12)]
        # Only the first slot carries a previous charge commitment.
        prev = [None] * 12
        prev[0] = 4000.0
        _res1, _, snap1 = self._solve(
            slots,
            current_soc_wh=4000.0,
            charge_commitment_prev_w=prev,
        )
        self.assertTrue(snap1["chosen_slots"]["charge_commitment"])
        # Without a previous plan the commitment list must be empty.
        _res2, _, snap2 = self._solve(
            slots,
            current_soc_wh=4000.0,
            charge_commitment_prev_w=None,
        )
        self.assertEqual(snap2["chosen_slots"]["charge_commitment"], [])

    def test_export_floor_uses_safety_target_in_non_high_sell_slot(self) -> None:
        """Regression: the safety target must not act only via the objective, it must also protect the export floor."""
        t0 = datetime(2026, 5, 4, 8, 0, tzinfo=timezone.utc)
        # Slot 0 is not high-sell (the future max sell is higher), but the safety target is above arb_base.
        slots = [
            _slot(
                t0,
                0,
                buy=3.0,
                sell=2.0,
                pv_a=8000,
                load=500,
                safety=12_000.0,
                fut_sell=6.0,  # high-sell somewhere later, not this slot
                daytime_pv_surplus=True,
            ),
            _slot(
                t0,
                1,
                buy=3.0,
                sell=6.0,
                pv_a=0,
                load=500,
                safety=12_000.0,
                fut_sell=6.0,
                daytime_pv_surplus=False,
            ),
        ]
        _res, _ms, snap = self._solve(
            slots,
            battery=_bat(arb_floor_wh=4000.0, reserve_soc_wh=4000.0, min_soc_wh=2000.0, soc_max_wh=19_000.0),
            current_soc_wh=8000.0,
        )
        b0 = snap["soc_bounds"][0]
        self.assertEqual(b0["export_floor_reason"], "safety_export_floor")
        self.assertEqual(float(b0["export_soc_floor_wh"]), 12_000.0)
        self.assertFalse(bool(b0["high_sell_slot"]))

    def test_export_floor_keeps_arb_base_in_high_sell_slot(self) -> None:
        """High-sell exception: in the peak slot the safety floor must not block arbitrage."""
        t0 = datetime(2026, 5, 4, 8, 0, tzinfo=timezone.utc)
        # Slot 0 is high-sell (sell == future max); the safety target is above arb_base but must not be applied.
        slots = [
            _slot(
                t0,
                0,
                buy=3.0,
                sell=6.0,
                pv_a=0,
                load=500,
                safety=12_000.0,
                fut_sell=6.0,
                daytime_pv_surplus=False,
            ),
            _slot(
                t0,
                1,
                buy=3.0,
                sell=2.0,
                pv_a=0,
                load=500,
                safety=12_000.0,
                fut_sell=6.0,
                daytime_pv_surplus=False,
            ),
        ]
        _res, _ms, snap = self._solve(
            slots,
            battery=_bat(arb_floor_wh=4000.0, reserve_soc_wh=4000.0, min_soc_wh=2000.0, soc_max_wh=19_000.0),
            current_soc_wh=8000.0,
        )
        b0 = snap["soc_bounds"][0]
        self.assertTrue(bool(b0["high_sell_slot"]))
        self.assertEqual(b0["export_floor_reason"], "arb_base")
        self.assertEqual(float(b0["export_soc_floor_wh"]), 4000.0)

    def test_safety_penalty_only_active_in_daytime_pv_surplus_slots(self) -> None:
        """The safety-deficit penalty is reported only for the daytime PV-surplus slot."""
        t0 = datetime(2026, 5, 4, 8, 0, tzinfo=timezone.utc)
        slots = [
            _slot(
                t0,
                0,
                buy=3.0,
                sell=2.0,
                pv_a=8000,
                load=500,
                safety=12_000.0,
                fut_sell=6.0,
                daytime_pv_surplus=True,
            ),
            _slot(
                t0,
                1,
                buy=3.0,
                sell=2.0,
                pv_a=0,
                load=500,
                safety=12_000.0,
                fut_sell=6.0,
                daytime_pv_surplus=False,
            ),
        ]
        _res, _ms, snap = self._solve(slots, current_soc_wh=8000.0)
        t0o = snap["objective_terms"][0]
        t1o = snap["objective_terms"][1]
        self.assertTrue(bool(t0o["safety_penalty_active"]))
        self.assertGreater(float(t0o["safety_deficit_penalty_czk_per_wh"]), 0.0)
        self.assertFalse(bool(t1o["safety_penalty_active"]))
        self.assertEqual(float(t1o["safety_deficit_penalty_czk_per_wh"]), 0.0)
|
|
|
|
|
|
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
|