Files
ems/backend/tests/test_planning_safety_commitment.py
Dusan Vojacek 64327af8e0
Some checks failed
CI and deploy / migration-check (push) Failing after 17s
CI and deploy / deploy (push) Has been skipped
fix KV1/BA81 cyklovani
2026-05-06 12:50:05 +02:00

287 lines
9.3 KiB
Python

"""Měkké safety SoC a rolling charge commitment v solve_dispatch."""
from __future__ import annotations
import unittest
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from services.planning_engine import PlanningSlot, solve_dispatch
def _bat(**kwargs: object) -> SimpleNamespace:
base = dict(
usable_capacity_wh=20_000.0,
min_soc_wh=2000.0,
arb_floor_wh=4000.0,
reserve_soc_wh=4000.0,
soc_max_wh=19_000.0,
charge_efficiency=0.95,
discharge_efficiency=0.95,
degradation_cost_czk_kwh=0.1,
max_charge_power_w=5000,
max_discharge_power_w=5000,
planner_terminal_soc_value_factor=0.2,
planner_extreme_buy_threshold_czk_kwh=-5.0,
planner_discharge_floor_percent=None,
planner_discharge_relax_prewindow_slots=8,
planner_daytime_charge_target_enabled=True,
planner_charge_commitment_penalty_czk_kwh=0.5,
)
base.update(kwargs)
return SimpleNamespace(**base)
def _grid() -> SimpleNamespace:
return SimpleNamespace(
max_import_power_w=11_000,
max_export_power_w=11_000,
block_export_on_negative_sell=False,
deye_gen_microinverter_cutoff_enabled=False,
)
def _hp() -> SimpleNamespace:
return SimpleNamespace(rated_heating_power_w=0, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0)
def _slot(
t0: datetime,
idx: int,
*,
buy: float = 3.0,
sell: float = 2.5,
pv_a: int = 0,
load: int = 1500,
safety: float | None = None,
fut_buy: float | None = None,
fut_sell: float | None = None,
daytime_pv_surplus: bool = False,
) -> PlanningSlot:
return PlanningSlot(
interval_start=t0 + timedelta(minutes=15 * idx),
buy_price=buy,
sell_price=sell,
pv_a_forecast_w=pv_a,
pv_b_forecast_w=0,
load_baseline_w=load,
ev1_connected=False,
ev2_connected=False,
allow_charge=True,
allow_discharge_export=True,
safety_soc_target_wh=safety,
future_avoided_buy_czk_kwh=fut_buy,
future_sell_opportunity_czk_kwh=fut_sell,
is_daytime_pv_surplus_slot=daytime_pv_surplus,
)
class PlanningSafetyCommitmentTests(unittest.TestCase):
def test_solver_snapshot_has_version_and_masks(self) -> None:
t0 = datetime(2026, 5, 4, 8, 0, tzinfo=timezone.utc)
slots = [_slot(t0, i, buy=2.0, sell=2.0, pv_a=6000, load=1500) for i in range(8)]
hp, grid = _hp(), _grid()
vehicles = [
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0)
] * 2
res, _ms, snap = solve_dispatch(
slots,
_bat(),
hp,
grid,
[None, None],
vehicles,
current_soc_wh=5000.0,
current_tuv_temp_c=50.0,
operating_mode="AUTO",
)
self.assertEqual(len(res), 8)
self.assertEqual(snap.get("version"), 1)
self.assertIn("masks", snap)
self.assertEqual(len(snap["masks"]), 8)
def test_charge_commitment_snapshot_populated(self) -> None:
"""Rolling kotva: při předchozím nabíjení z PV se do snapshotu zapíše commitment."""
t0 = datetime(2026, 5, 4, 10, 0, tzinfo=timezone.utc)
slots = [_slot(t0, i, buy=1.5, sell=1.2, pv_a=8000, load=1000) for i in range(12)]
hp, grid = _hp(), _grid()
vehicles = [
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0)
] * 2
prev = [None] * 12
prev[0] = 4000.0
_res1, _, snap1 = solve_dispatch(
slots,
_bat(),
hp,
grid,
[None, None],
vehicles,
current_soc_wh=4000.0,
current_tuv_temp_c=50.0,
operating_mode="AUTO",
charge_commitment_prev_w=prev,
)
self.assertTrue(snap1["chosen_slots"]["charge_commitment"])
_res2, _, snap2 = solve_dispatch(
slots,
_bat(),
hp,
grid,
[None, None],
vehicles,
current_soc_wh=4000.0,
current_tuv_temp_c=50.0,
operating_mode="AUTO",
charge_commitment_prev_w=None,
)
self.assertEqual(snap2["chosen_slots"]["charge_commitment"], [])
def test_export_floor_uses_safety_target_in_non_high_sell_slot(self) -> None:
"""Regrese: safety target nemá tlačit jen přes objective, ale chránit export floor."""
t0 = datetime(2026, 5, 4, 8, 0, tzinfo=timezone.utc)
# Slot 0 není high-sell (future max sell je vyšší), ale safety target je nad arb_base.
slots = [
_slot(
t0,
0,
buy=3.0,
sell=2.0,
pv_a=8000,
load=500,
safety=12_000.0,
fut_sell=6.0, # high-sell somewhere later, not this slot
daytime_pv_surplus=True,
),
_slot(
t0,
1,
buy=3.0,
sell=6.0,
pv_a=0,
load=500,
safety=12_000.0,
fut_sell=6.0,
daytime_pv_surplus=False,
),
]
hp, grid = _hp(), _grid()
vehicles = [
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0)
] * 2
_res, _ms, snap = solve_dispatch(
slots,
_bat(arb_floor_wh=4000.0, reserve_soc_wh=4000.0, min_soc_wh=2000.0, soc_max_wh=19_000.0),
hp,
grid,
[None, None],
vehicles,
current_soc_wh=8000.0,
current_tuv_temp_c=50.0,
operating_mode="AUTO",
)
b0 = snap["soc_bounds"][0]
self.assertEqual(b0["export_floor_reason"], "safety_export_floor")
self.assertEqual(float(b0["export_soc_floor_wh"]), 12_000.0)
self.assertFalse(bool(b0["high_sell_slot"]))
def test_export_floor_keeps_arb_base_in_high_sell_slot(self) -> None:
"""High-sell výjimka: v peak slotu nesmí safety floor blokovat arbitráž."""
t0 = datetime(2026, 5, 4, 8, 0, tzinfo=timezone.utc)
# Slot 0 je high-sell (sell == future max), safety target je nad arb_base, ale nemá se aplikovat.
slots = [
_slot(
t0,
0,
buy=3.0,
sell=6.0,
pv_a=0,
load=500,
safety=12_000.0,
fut_sell=6.0,
daytime_pv_surplus=False,
),
_slot(
t0,
1,
buy=3.0,
sell=2.0,
pv_a=0,
load=500,
safety=12_000.0,
fut_sell=6.0,
daytime_pv_surplus=False,
),
]
hp, grid = _hp(), _grid()
vehicles = [
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0)
] * 2
_res, _ms, snap = solve_dispatch(
slots,
_bat(arb_floor_wh=4000.0, reserve_soc_wh=4000.0, min_soc_wh=2000.0, soc_max_wh=19_000.0),
hp,
grid,
[None, None],
vehicles,
current_soc_wh=8000.0,
current_tuv_temp_c=50.0,
operating_mode="AUTO",
)
b0 = snap["soc_bounds"][0]
self.assertTrue(bool(b0["high_sell_slot"]))
self.assertEqual(b0["export_floor_reason"], "arb_base")
self.assertEqual(float(b0["export_soc_floor_wh"]), 4000.0)
def test_safety_penalty_only_active_in_daytime_pv_surplus_slots(self) -> None:
t0 = datetime(2026, 5, 4, 8, 0, tzinfo=timezone.utc)
slots = [
_slot(
t0,
0,
buy=3.0,
sell=2.0,
pv_a=8000,
load=500,
safety=12_000.0,
fut_sell=6.0,
daytime_pv_surplus=True,
),
_slot(
t0,
1,
buy=3.0,
sell=2.0,
pv_a=0,
load=500,
safety=12_000.0,
fut_sell=6.0,
daytime_pv_surplus=False,
),
]
hp, grid = _hp(), _grid()
vehicles = [
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0)
] * 2
_res, _ms, snap = solve_dispatch(
slots,
_bat(),
hp,
grid,
[None, None],
vehicles,
current_soc_wh=8000.0,
current_tuv_temp_c=50.0,
operating_mode="AUTO",
)
t0o = snap["objective_terms"][0]
t1o = snap["objective_terms"][1]
self.assertTrue(bool(t0o["safety_penalty_active"]))
self.assertGreater(float(t0o["safety_deficit_penalty_czk_per_wh"]), 0.0)
self.assertFalse(bool(t1o["safety_penalty_active"]))
self.assertEqual(float(t1o["safety_deficit_penalty_czk_per_wh"]), 0.0)
if __name__ == "__main__":
unittest.main()