Files
ems/backend/tests/test_planning_charge_slot_selection.py
Dusan Vojacek a17c22d475
Some checks failed
CI and deploy / migration-check (push) Failing after 12s
CI and deploy / deploy (push) Has been skipped
dalsi fix - chtel drzet baterii prakticky porad a neprodat ani nejeet passive mode
2026-05-16 15:52:14 +02:00

335 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Pre-selection nabíjecích a exportních slotů referenční Python.
Logika je v DB: ems.fn_load_planning_slots_full. Kopie algoritmu pro unit testy bez PG.
Charge mask:
A) PV-surplus: sell_price ASC, dokud PV nepokryje charge target.
B) Non-PV: AM/PM 50/50, OTE-first, buy_price ASC.
Discharge-export mask:
ref_buy = min(buy) mezi allow_charge sloty (arbitráž mezi sloty, ne sell vs buy ve stejném).
Top sloty dle sell_price desc kde sell > ref_buy + degradation.
"""
from __future__ import annotations
import unittest
from datetime import datetime, timezone, timedelta
from types import SimpleNamespace
from zoneinfo import ZoneInfo
from services.planning_engine import INTERVAL_H, PlanningSlot
_PRAGUE = ZoneInfo("Europe/Prague")
def _select_charge_slots(
    slots: list[PlanningSlot],
    battery: SimpleNamespace,
    current_soc_wh: float,
) -> set[int]:
    """Return the indices of the slots in which charging is allowed.

    Reference copy of the charge-mask logic in
    ems.fn_load_planning_slots_full.

    Every index is returned (no restriction) when the battery's
    ``charge_slot_buffer`` is missing, falsy or non-positive, or when
    ``current_soc_wh`` is already at or above ``battery.soc_max_wh``.
    Otherwise the charge target is the missing energy times the buffer and
    slots are picked in two phases:

    A) PV surplus: slots whose combined PV forecast exceeds the load
       baseline, ordered by ascending ``sell_price`` (index breaks ties),
       until their power-capped, efficiency-adjusted energy reaches the
       target.
    B) Grid: slots without PV surplus, processed separately for Prague-local
       hours before 12 and from 12 on. Each half gets half of the target, or
       the whole target when the other half has no slots at all. Inside a
       half, slots whose ``is_predicted_price`` is false come first, then
       ascending ``buy_price``, then index; each picked slot counts as one
       full-power slot. The grid budgets are not reduced by phase A.
    """
    every_index = set(range(len(slots)))

    buffer_factor = float(getattr(battery, "charge_slot_buffer", 0) or 0)
    if buffer_factor <= 0:
        return every_index

    missing_wh = float(battery.soc_max_wh) - float(current_soc_wh)
    if missing_wh <= 0:
        return every_index

    efficiency = float(getattr(battery, "charge_efficiency", 1.0) or 1.0)
    power_cap_w = float(getattr(battery, "max_charge_power_w", 0.0) or 0.0)
    grid_slot_wh = power_cap_w * efficiency * INTERVAL_H
    target_wh = missing_wh * buffer_factor

    # Split the target between the morning and afternoon halves of the horizon.
    local_hours = [_prague_hour(slot) for slot in slots]
    morning_total = sum(hour < 12 for hour in local_hours)
    afternoon_total = len(slots) - morning_total
    if morning_total <= 0:
        morning_budget_wh, afternoon_budget_wh = 0.0, target_wh
    elif afternoon_total <= 0:
        morning_budget_wh, afternoon_budget_wh = target_wh, 0.0
    else:
        morning_budget_wh = target_wh / 2.0
        afternoon_budget_wh = target_wh - morning_budget_wh

    # Forecast PV power left over after the baseline load, floored at zero.
    surplus_w = [
        max(0, slot.pv_a_forecast_w + slot.pv_b_forecast_w - slot.load_baseline_w)
        for slot in slots
    ]

    chosen: set[int] = set()

    # Phase A: PV-surplus slots, lowest sell price first.
    solar = [
        (float(slots[idx].sell_price), idx, float(watts))
        for idx, watts in enumerate(surplus_w)
        if watts > 0
    ]
    solar.sort(key=lambda item: item[:2])
    harvested_wh = 0.0
    for _sell, idx, watts in solar:
        if harvested_wh >= target_wh:
            break
        chosen.add(idx)
        harvested_wh += min(watts, power_cap_w) * efficiency * INTERVAL_H

    def _take_grid_slots(budget_wh: float, morning: bool) -> None:
        """Add non-PV slots of one half-day to ``chosen`` until the budget is met."""
        ranked = sorted(
            (
                int(getattr(slots[idx], "is_predicted_price", False)),
                float(slots[idx].buy_price),
                idx,
            )
            for idx, hour in enumerate(local_hours)
            if idx not in chosen
            and surplus_w[idx] <= 0
            and (hour < 12) == morning
        )
        used_wh = 0.0
        for _predicted, _buy, idx in ranked:
            if used_wh >= budget_wh or grid_slot_wh <= 0:
                break
            chosen.add(idx)
            used_wh += grid_slot_wh

    # Phase B: grid slots, morning budget first, then afternoon budget.
    _take_grid_slots(morning_budget_wh, morning=True)
    _take_grid_slots(afternoon_budget_wh, morning=False)
    return chosen
def _select_discharge_export_slots(
    slots: list[PlanningSlot],
    battery: SimpleNamespace,
    current_soc_wh: float,
    charge_slots: set[int] | None = None,
) -> set[int]:
    """Return the indices of the slots in which battery export is allowed.

    Reference copy of the discharge-export mask logic in
    ems.fn_load_planning_slots_full.

    Args:
        slots: Planning horizon; each slot provides ``buy_price`` and
            ``sell_price``.
        battery: Battery configuration; ``soc_max_wh`` is required, the
            remaining attributes fall back to defaults when missing or falsy.
        current_soc_wh: Only forwarded to ``_select_charge_slots`` when
            ``charge_slots`` is not supplied.
        charge_slots: Indices already allowed for charging; computed with
            ``_select_charge_slots`` when ``None``.

    Returns:
        An empty set for an empty horizon or when ``soc_max_wh`` does not
        exceed ``min_soc_wh``; every index when ``discharge_slot_buffer`` is
        missing, falsy or non-positive; otherwise the best-priced slots whose
        ``sell_price`` beats the reference buy price plus degradation cost,
        taken until their full-power energy reaches the discharge target.
    """
    if not slots:
        # Empty horizon: nothing to select. Without this guard the fallback
        # reference price below would call min() on an empty sequence and
        # raise ValueError.
        return set()

    discharge_buf = float(getattr(battery, "discharge_slot_buffer", 0) or 0)
    if discharge_buf <= 0:
        return set(range(len(slots)))

    min_soc_wh = float(getattr(battery, "min_soc_wh", 0) or 0)
    soc_max_wh = float(battery.soc_max_wh)
    exportable_wh = soc_max_wh - min_soc_wh
    if exportable_wh <= 0:
        return set()

    # NOTE: a falsy degradation cost (including an explicit 0) becomes 0.15.
    degrad = float(getattr(battery, "degradation_cost_czk_kwh", 0.15) or 0.15)
    eta = float(getattr(battery, "discharge_efficiency", 1.0) or 1.0)
    max_p_w = float(getattr(battery, "max_discharge_power_w", 0.0) or 0.0)
    per_slot_wh = max_p_w * eta * INTERVAL_H
    discharge_target_wh = exportable_wh * discharge_buf

    if charge_slots is None:
        charge_slots = _select_charge_slots(slots, battery, current_soc_wh)

    # Reference price: the cheapest buy price among the charge slots. The
    # horizon-wide minimum is only computed when there are no charge slots.
    ref_buy = min((float(slots[t].buy_price) for t in charge_slots), default=None)
    if ref_buy is None:
        ref_buy = min(float(s.buy_price) for s in slots)

    threshold = ref_buy + degrad
    candidates = [
        (t, float(s.sell_price))
        for t, s in enumerate(slots)
        if float(s.sell_price) > threshold
    ]
    # Highest sell price first; on equal prices the later slot wins.
    candidates.sort(key=lambda x: (-x[1], -x[0]))

    selected: set[int] = set()
    cum = 0.0
    for t, _sell in candidates:
        if cum >= discharge_target_wh or per_slot_wh <= 0:
            break
        selected.add(t)
        cum += per_slot_wh
    return selected
def _prague_hour(s: PlanningSlot) -> int:
    """Return the Europe/Prague wall-clock hour at which the slot starts.

    A naive ``interval_start`` is interpreted as UTC before conversion.
    """
    start = s.interval_start
    aware = start if start.tzinfo is not None else start.replace(tzinfo=timezone.utc)
    return aware.astimezone(_PRAGUE).hour
def _slot(
    *,
    buy: float,
    sell: float = 1.0,
    pv: int = 0,
    load: int = 2_000,
    hour_utc: int = 12,
    predicted: bool = False,
) -> PlanningSlot:
    """Build a PlanningSlot fixture starting on 2026-05-19 at ``hour_utc`` (UTC).

    The whole PV forecast ``pv`` is assigned to array B (array A stays 0),
    both EVs are reported as disconnected, and ``predicted`` is stored as
    ``is_predicted_price``.
    """
    start = datetime(2026, 5, 19, hour_utc, 0, tzinfo=timezone.utc)
    fields = dict(
        interval_start=start,
        buy_price=buy,
        sell_price=sell,
        pv_a_forecast_w=0,
        pv_b_forecast_w=pv,
        load_baseline_w=load,
        ev1_connected=False,
        ev2_connected=False,
        is_predicted_price=predicted,
    )
    return PlanningSlot(**fields)
def _battery(
    *,
    charge_buf: float = 1.3,
    discharge_buf: float = 1.5,
    uc_wh: float = 64_000.0,
    soc_max_pct: float = 95.0,
    min_soc_pct: float = 10.0,
    max_charge_w: float = 18_000.0,
    max_discharge_w: float = 18_000.0,
    charge_eff: float = 0.95,
    discharge_eff: float = 0.95,
    degrad: float = 0.15,
) -> SimpleNamespace:
    """Build a stand-in battery configuration for the slot-selection helpers.

    ``min_soc_pct`` and ``soc_max_pct`` are percentages of the usable
    capacity ``uc_wh``; they are converted to the absolute ``min_soc_wh`` and
    ``soc_max_wh`` attributes. All other arguments are stored unchanged under
    the attribute names the selection functions read.
    """

    def _share_wh(percent: float) -> float:
        """Convert a percentage of the usable capacity to watt-hours."""
        return percent / 100.0 * uc_wh

    fields = {
        "usable_capacity_wh": uc_wh,
        "min_soc_wh": _share_wh(min_soc_pct),
        "soc_max_wh": _share_wh(soc_max_pct),
        "max_charge_power_w": max_charge_w,
        "max_discharge_power_w": max_discharge_w,
        "charge_efficiency": charge_eff,
        "discharge_efficiency": discharge_eff,
        "charge_slot_buffer": charge_buf,
        "discharge_slot_buffer": discharge_buf,
        "degradation_cost_czk_kwh": degrad,
    }
    return SimpleNamespace(**fields)
class SelectChargeSlotsTests(unittest.TestCase):
    """Tests for the charge-mask reference implementation."""

    def test_buffer_zero_returns_all_slots(self) -> None:
        horizon = [_slot(buy=3.0) for _ in range(4)]
        picked = _select_charge_slots(
            horizon, _battery(charge_buf=0.0), current_soc_wh=0.0
        )
        self.assertEqual(picked, {0, 1, 2, 3})

    def test_returns_all_when_battery_is_full(self) -> None:
        pack = _battery()
        horizon = [_slot(buy=0.1) for _ in range(3)]
        picked = _select_charge_slots(
            horizon, pack, current_soc_wh=pack.soc_max_wh + 1.0
        )
        self.assertEqual(picked, {0, 1, 2})

    def test_pv_surplus_cheapest_sell_price_selected(self) -> None:
        """PV-surplus slots with the lowest sell_price are picked first."""
        horizon = [
            _slot(buy=1.0, sell=sell, pv=8_000, load=2_000)
            for sell in (2.0, 5.0, 3.0)
        ]
        pack = _battery(
            charge_buf=1.3, uc_wh=1_000.0, soc_max_pct=100.0, max_charge_w=6_000.0
        )
        picked = _select_charge_slots(horizon, pack, current_soc_wh=0.0)
        self.assertIn(0, picked, "Cheapest sell_price PV slot must be selected")
        self.assertNotIn(1, picked, "Expensive sell_price PV slot should be excluded")

    def test_non_pv_slots_selected_with_am_pm_budget(self) -> None:
        """Non-PV slots are picked by buy_price within the AM/PM budget."""
        horizon = [
            _slot(buy=0.5, hour_utc=4),   # AM slot, cheap
            _slot(buy=3.0, hour_utc=5),   # AM slot, expensive
            _slot(buy=0.4, hour_utc=14),  # PM slot, cheap
            _slot(buy=9.9, hour_utc=15),  # PM slot, expensive
        ]
        pack = _battery(
            charge_buf=1.3, uc_wh=5_000.0, soc_max_pct=100.0, max_charge_w=18_000.0
        )
        picked = _select_charge_slots(horizon, pack, current_soc_wh=0.0)
        self.assertIn(0, picked, "Cheapest AM slot must be selected")
        self.assertIn(2, picked, "Cheapest PM slot must be selected")

    def test_ote_slots_prioritized_over_predicted(self) -> None:
        """OTE slots (is_predicted_price=false) take priority over predicted ones."""
        horizon = [
            _slot(buy=3.56, hour_utc=13, predicted=False),  # OTE, more expensive
            _slot(buy=2.00, hour_utc=13, predicted=True),   # predicted, cheaper
        ]
        pack = _battery(
            charge_buf=1.3, uc_wh=3_000.0, soc_max_pct=100.0, max_charge_w=18_000.0
        )
        picked = _select_charge_slots(horizon, pack, current_soc_wh=0.0)
        self.assertIn(0, picked, "OTE slot must be selected even if pricier than predicted")

    def test_does_not_exclude_slot_just_because_pv_below_load(self) -> None:
        """Regression: slots without PV surplus are selected via the AM/PM grid budget."""
        buy_and_pv = ((0.4, 3_320), (0.42, 2_116), (0.44, 1_649), (0.47, 1_276))
        horizon = [
            _slot(buy=buy, pv=pv, load=3_747, hour_utc=13) for buy, pv in buy_and_pv
        ]
        pack = _battery()
        soc_now = 0.2 * pack.usable_capacity_wh
        picked = _select_charge_slots(horizon, pack, current_soc_wh=soc_now)
        for idx in range(4):
            self.assertIn(idx, picked)

    def test_long_horizon_pv_surplus_does_not_exhaust_grid_budget(self) -> None:
        """Regression: in a 96 h horizon PV-surplus slots must not eat up the grid budget."""
        cheap_grid = [_slot(buy=0.4 + 0.01 * i, pv=0, load=2_000) for i in range(40)]
        pv_days = [_slot(buy=1.5, sell=1.5, pv=10_000, load=2_000) for _ in range(100)]
        horizon = cheap_grid + pv_days
        pack = _battery(
            charge_buf=1.3, uc_wh=64_000.0, soc_max_pct=95.0, max_charge_w=18_000.0
        )
        soc_now = 0.2 * pack.usable_capacity_wh
        picked = _select_charge_slots(horizon, pack, current_soc_wh=soc_now)
        # The cheap grid slots occupy the first len(cheap_grid) indices.
        grid_selected = len(picked & set(range(len(cheap_grid))))
        self.assertGreaterEqual(
            grid_selected,
            5,
            "V dlouhém horizontu s mnoha PV-surplus sloty musí zůstat dostatek "
            "grid slotů povolených pro nabíjení z levného importu.",
        )
class SelectDischargeExportSlotsTests(unittest.TestCase):
    """Tests for the discharge-export mask reference implementation."""

    def test_evening_sell_allowed_when_cheaper_than_ref_charge_buy(self) -> None:
        """Regression (home-01): evening sell 3.3 > ref_buy 0.5 + degradation,
        even though the buy price in that same slot is 5.6."""
        horizon = [
            _slot(buy=0.50, sell=-0.30, hour_utc=6),
            _slot(buy=0.51, sell=-0.29, hour_utc=7),
            _slot(buy=5.60, sell=3.30, hour_utc=16),
            _slot(buy=5.98, sell=3.37, hour_utc=17),
        ]
        pack = _battery(uc_wh=64_000.0)
        soc_now = 0.2 * pack.usable_capacity_wh
        charge_mask = _select_charge_slots(horizon, pack, current_soc_wh=soc_now)
        export_mask = _select_discharge_export_slots(
            horizon, pack, current_soc_wh=soc_now, charge_slots=charge_mask
        )
        self.assertIn(0, charge_mask)
        self.assertIn(
            2, export_mask, "Evening sell must qualify vs ref buy, not same-slot buy"
        )
        self.assertIn(3, export_mask)

    def test_export_excluded_when_sell_below_ref_buy_plus_degradation(self) -> None:
        horizon = [
            _slot(buy=0.40, sell=0.10, hour_utc=8),
            _slot(buy=4.00, sell=0.50, hour_utc=18),
        ]
        pack = _battery(uc_wh=10_000.0, discharge_buf=2.0)
        charge_mask = _select_charge_slots(horizon, pack, current_soc_wh=0.0)
        export_mask = _select_discharge_export_slots(
            horizon, pack, current_soc_wh=0.0, charge_slots=charge_mask
        )
        self.assertNotIn(1, export_mask, "sell 0.5 < ref 0.4 + 0.15")
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()