Files
ems/backend/tests/test_planning_charge_slot_selection.py
Dusan Vojacek 93f883f5e0
Some checks failed
CI and deploy / migration-check (push) Successful in 5s
CI and deploy / deploy (push) Failing after 20s
sql first refactor
2026-04-19 20:02:20 +02:00

195 lines
7.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Charge-slot pre-selection (anti-micro-cycling) - reference Python implementation.

The production logic lives in the database: ems.fn_load_planning_slots_full.
This file keeps a copy of the algorithm for fast unit tests without PostgreSQL.
"""
from __future__ import annotations
import unittest
from datetime import datetime, timezone
from types import SimpleNamespace
from services.planning_engine import INTERVAL_H, PlanningSlot
def _select_charge_slots(
    slots: list[PlanningSlot],
    battery: SimpleNamespace,
    current_soc_wh: float,
) -> set[int]:
    """Return the indices of slots in which charging is allowed (charge mask).

    Copy of the logic in ems.fn_load_planning_slots_full.

    Rules, applied in order:

    * A missing, falsy or non-positive ``charge_slot_buffer`` disables the
      pre-selection: every slot index is returned.
    * If ``battery.soc_max_wh - current_soc_wh`` is not positive, no slot is
      returned.
    * Every slot whose combined PV forecast exceeds its load baseline is
      always allowed.
    * The remaining slots are added cheapest-first (by ``buy_price``). Each
      added slot counts as one slot of full-power charging, and slots are
      added until the headroom multiplied by the buffer is covered.
    """
    buffer_factor = float(getattr(battery, "charge_slot_buffer", 0) or 0)
    if buffer_factor <= 0:
        return set(range(len(slots)))

    headroom_wh = float(battery.soc_max_wh) - float(current_soc_wh)
    if headroom_wh <= 0:
        return set()

    efficiency = float(getattr(battery, "charge_efficiency", 1.0) or 1.0)
    power_w = float(getattr(battery, "max_charge_power_w", 0.0) or 0.0)
    wh_per_slot = power_w * efficiency * INTERVAL_H

    # PV-surplus slots are allowed unconditionally; they do not consume the
    # grid budget computed below.
    allowed: set[int] = {
        idx
        for idx, slot in enumerate(slots)
        if slot.pv_a_forecast_w + slot.pv_b_forecast_w - slot.load_baseline_w > 0
    }

    budget_wh = headroom_wh * buffer_factor
    if budget_wh <= 0 or wh_per_slot <= 0:
        return allowed

    # Stable sort: slots with equal prices keep their chronological order.
    cheapest_first = sorted(
        (
            (idx, float(slot.buy_price))
            for idx, slot in enumerate(slots)
            if idx not in allowed
        ),
        key=lambda pair: pair[1],
    )

    covered_wh = 0.0
    for idx, _ in cheapest_first:
        if covered_wh >= budget_wh:
            break
        allowed.add(idx)
        covered_wh += wh_per_slot
    return allowed
def _slot(*, buy: float, sell: float = 1.0, pv: int = 0, load: int = 2_000) -> PlanningSlot:
    """Build a PlanningSlot fixture with a fixed start time and no EV connected.

    The whole PV forecast ``pv`` is placed on ``pv_b_forecast_w``, while
    ``pv_a_forecast_w`` is always 0. ``buy`` and ``sell`` become the slot
    prices and ``load`` becomes the load baseline.
    """
    fixed_start = datetime(2026, 4, 19, 12, 0, tzinfo=timezone.utc)
    fields = dict(
        interval_start=fixed_start,
        buy_price=buy,
        sell_price=sell,
        pv_a_forecast_w=0,
        pv_b_forecast_w=pv,
        load_baseline_w=load,
        ev1_connected=False,
        ev2_connected=False,
    )
    return PlanningSlot(**fields)
def _battery(
    *,
    charge_buf: float = 1.3,
    uc_wh: float = 64_000.0,
    soc_max_pct: float = 95.0,
    max_charge_w: float = 18_000.0,
    charge_eff: float = 0.95,
) -> SimpleNamespace:
    """Build a lightweight battery stand-in carrying the attributes the tests read.

    ``soc_max_wh`` is derived as ``soc_max_pct`` percent of ``uc_wh``; the
    other arguments are stored unchanged under the attribute names
    ``usable_capacity_wh``, ``max_charge_power_w``, ``charge_efficiency`` and
    ``charge_slot_buffer``.
    """
    soc_ceiling_wh = soc_max_pct / 100.0 * uc_wh

    stub = SimpleNamespace()
    stub.usable_capacity_wh = uc_wh
    stub.soc_max_wh = soc_ceiling_wh
    stub.max_charge_power_w = max_charge_w
    stub.charge_efficiency = charge_eff
    stub.charge_slot_buffer = charge_buf
    return stub
class SelectChargeSlotsTests(unittest.TestCase):
    """Unit tests for the Python reference copy of the charge-slot pre-selection."""

    def test_buffer_zero_returns_all_slots(self) -> None:
        """A zero charge_slot_buffer disables pre-selection: every slot is allowed."""
        slots = [_slot(buy=3.0) for _ in range(4)]
        battery = _battery(charge_buf=0.0)
        out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
        self.assertEqual(out, set(range(4)))

    def test_pv_surplus_slot_always_selected_regardless_of_buy_price(self) -> None:
        """A slot with PV surplus must be included even with the highest buy_price."""
        slots = [
            _slot(buy=0.5, pv=0, load=2_000),  # no PV, cheap grid
            _slot(buy=9.9, pv=8_000, load=2_000),  # large PV surplus, expensive grid
        ]
        battery = _battery()
        out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
        self.assertIn(1, out)

    def test_grid_filler_prefers_lowest_buy_price(self) -> None:
        """Among slots without PV surplus, the lowest buy_price is picked first."""
        slots = [
            _slot(buy=3.0, pv=0, load=2_000, sell=0.1),
            _slot(buy=0.4, pv=0, load=2_000, sell=0.3),
            _slot(buy=1.2, pv=0, load=2_000, sell=0.2),
        ]
        battery = _battery(
            charge_buf=1.3, uc_wh=64_000.0, soc_max_pct=95.0, max_charge_w=18_000.0
        )
        out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
        self.assertIn(1, out)

        # The empty-battery scenario above cannot tell a correct price ordering
        # from a wrong one, because the grid budget exceeds what three slots can
        # deliver and all of them end up selected. Leave only 1 Wh of headroom
        # so the budget (1.3 Wh) is covered by a single full-power slot
        # (assumes INTERVAL_H is a realistic slot length, so that one slot
        # delivers far more than 1.3 Wh); only the cheapest slot may then be
        # picked.
        nearly_full = _select_charge_slots(
            slots, battery, current_soc_wh=battery.soc_max_wh - 1.0
        )
        self.assertEqual(nearly_full, {1})

    def test_does_not_exclude_slot_just_because_pv_below_load(self) -> None:
        """Regression: earlier logic dropped slots without PV surplus entirely."""
        slots = [
            _slot(buy=0.4, pv=3_320, load=3_747),
            _slot(buy=0.42, pv=2_116, load=3_747),
            _slot(buy=0.44, pv=1_649, load=3_747),
            _slot(buy=0.47, pv=1_276, load=3_747),
            _slot(buy=1.13, pv=1_286, load=523),
            _slot(buy=1.60, pv=1_020, load=523),
        ]
        battery = _battery()
        out = _select_charge_slots(
            slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh
        )
        for idx in (0, 1, 2, 3):
            self.assertIn(
                idx,
                out,
                msg=(
                    f"Slot {idx} (levný grid nákup ~0.4 Kč) musí být povolen pro "
                    "nabíjení i bez PV-surplus, jinak optimizer skončí s dražším "
                    "nákupem v pozdějších slotech (nelogická ekonomika)."
                ),
            )

    def test_long_horizon_pv_surplus_does_not_exhaust_grid_budget(self) -> None:
        """Regression: in a 96h horizon PV-surplus slots must not eat the grid budget.

        In an earlier version the cumulative PV budget was subtracted from
        `charge_buf x headroom`, so in a long horizon with many PV slots nothing
        was left for the grid filler and cheap grid slots were not allowed.
        This test simulates a realistic 96h profile.
        """
        # 40 cheap grid-only slots (simulating night / "inter-peak" hours).
        cheap_grid = [_slot(buy=0.4 + 0.01 * i, pv=0, load=2_000) for i in range(40)]
        # 100 PV-surplus slots with high production (daytime, across several days).
        pv_days = [_slot(buy=1.5, pv=10_000, load=2_000) for _ in range(100)]
        slots = cheap_grid + pv_days
        battery = _battery(
            charge_buf=1.3, uc_wh=64_000.0, soc_max_pct=95.0, max_charge_w=18_000.0
        )
        out = _select_charge_slots(
            slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh
        )
        grid_selected = sum(1 for i in range(len(cheap_grid)) if i in out)
        self.assertGreaterEqual(
            grid_selected,
            5,
            msg=(
                "V dlouhém horizontu s mnoha PV-surplus sloty musí zůstat dostatek "
                "grid slotů povolených pro nabíjení z levného importu."
            ),
        )

    def test_energy_budget_is_charge_buf_times_headroom(self) -> None:
        """The released energy brackets charge_buf x (soc_max - current_soc)."""
        slots = [_slot(buy=float(i + 1), pv=0, load=2_000) for i in range(40)]
        battery = _battery(
            charge_buf=1.3, uc_wh=64_000.0, soc_max_pct=95.0, max_charge_w=18_000.0
        )
        current_soc_wh = 0.2 * battery.usable_capacity_wh
        target = battery.charge_slot_buffer * (battery.soc_max_wh - current_soc_wh)
        per_slot_wh = (
            battery.max_charge_power_w * battery.charge_efficiency * INTERVAL_H
        )
        out = _select_charge_slots(slots, battery, current_soc_wh=current_soc_wh)
        slots_picked = len(out)
        # One slot fewer must not reach the target; the picked count must reach it.
        self.assertLessEqual((slots_picked - 1) * per_slot_wh, target)
        self.assertGreaterEqual(slots_picked * per_slot_wh, target)

    def test_returns_empty_when_battery_is_full(self) -> None:
        """No slot is allowed when the current SoC is already above soc_max_wh."""
        slots = [_slot(buy=0.1) for _ in range(3)]
        battery = _battery()
        out = _select_charge_slots(
            slots, battery, current_soc_wh=battery.soc_max_wh + 1.0
        )
        self.assertEqual(out, set())
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()