Files
ems/backend/tests/test_planning_charge_slot_selection.py
Dusan Vojacek 66834ddfa6
Some checks failed
CI and deploy / migration-check (push) Failing after 13s
CI and deploy / deploy (push) Has been skipped
dalsi rozvolneni at vic jedeme arbitraz
2026-05-21 14:54:46 +02:00

549 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Pre-selection nabíjecích a exportních slotů referenční Python.
Logika je v DB: ems.fn_load_planning_slots_full. Kopie algoritmu pro unit testy bez PG.
Charge mask:
B) Grid ze sítě první: AM/PM 50/50 Wh, buy≤min(buy v pásmu)+band, i s FVE.
A) PV-surplus: store_score DESC, doplní zbytek po vrstvě B.
Discharge-export mask:
ref_buy = min(buy) celého horizontu.
Top sloty dle sell_price desc kde sell > ref_buy + degradation.
"""
from __future__ import annotations
import unittest
from datetime import date, datetime, timezone, timedelta
from types import SimpleNamespace
from zoneinfo import ZoneInfo
from services.planning_engine import INTERVAL_H, PlanningSlot
_PRAGUE = ZoneInfo("Europe/Prague")
_LOOKAHEAD_SLOTS = 4
_BUY_LOOKAHEAD_EPS = 0.05
_BUY_CHARGE_BAND = 0.40
_MAX_GRID_CHARGE_CAP = 24
def _prague_date(s: PlanningSlot) -> date:
return s.interval_start.astimezone(_PRAGUE).date()
def _export_window_start(slots: list[PlanningSlot], degrad: float) -> datetime | None:
"""Kopie R__063: sell > min(buy) téhož kalendářního dne (Prague) + degrad."""
result: datetime | None = None
for s in slots:
day = _prague_date(s)
day_min = min(
float(x.buy_price) for x in slots if _prague_date(x) == day
)
if float(s.sell_price) > day_min + degrad:
if result is None or s.interval_start < result:
result = s.interval_start
return result
def _future_sell(slots: list[PlanningSlot], t: int) -> float:
tail = [float(slots[i].sell_price) for i in range(t + 1, len(slots))]
return max(tail) if tail else float(slots[t].sell_price)
def _buy_min_next_n(
slots: list[PlanningSlot],
t: int,
n: int = _LOOKAHEAD_SLOTS,
*,
export_window_start: datetime | None = None,
) -> float | None:
tail = [
float(slots[i].buy_price)
for i in range(t + 1, min(t + 1 + n, len(slots)))
if export_window_start is None or slots[i].interval_start < export_window_start
]
return min(tail) if tail else None
def _store_score(slots: list[PlanningSlot], t: int) -> float:
s = slots[t]
buy = float(s.buy_price)
sell = float(s.sell_price)
fso = _future_sell(slots, t)
return fso - sell - max(0.0, buy - sell)
def _select_charge_slots(
slots: list[PlanningSlot],
battery: SimpleNamespace,
current_soc_wh: float,
*,
purchase_pricing_mode: str = "spot",
) -> set[int]:
"""Kopie logiky z ems.fn_load_planning_slots_full (charge mask)."""
charge_buf = float(getattr(battery, "charge_slot_buffer", 0) or 0)
if charge_buf <= 0:
return set(range(len(slots)))
energy_to_fill = float(battery.soc_max_wh) - float(current_soc_wh)
if energy_to_fill <= 0:
return set(range(len(slots)))
reserve_wh = float(getattr(battery, "reserve_soc_wh", 0) or 0)
degrad = float(getattr(battery, "degradation_cost_czk_kwh", 0.15) or 0.15)
ref_buy_am = min(
(float(s.buy_price) for s in slots if _prague_hour(s) < 12),
default=min(float(s.buy_price) for s in slots),
)
ref_buy_pm = min(
(float(s.buy_price) for s in slots if _prague_hour(s) >= 12),
default=min(float(s.buy_price) for s in slots),
)
ref_buy_global = min(float(s.buy_price) for s in slots)
export_window_start = _export_window_start(slots, degrad)
plan_day = _prague_date(slots[0])
eta = float(getattr(battery, "charge_efficiency", 1.0) or 1.0)
max_p_w = float(getattr(battery, "max_charge_power_w", 0.0) or 0.0)
per_slot_full_wh = max_p_w * eta * INTERVAL_H
if current_soc_wh >= reserve_wh:
charge_target_wh = max(energy_to_fill, 0.0)
else:
charge_target_wh = min(
max(energy_to_fill, 0.0) * charge_buf,
max(energy_to_fill, 0.0),
)
n_am = sum(1 for s in slots if _prague_hour(s) < 12)
n_pm = len(slots) - n_am
if n_am <= 0:
chg_am = 0.0
chg_pm = charge_target_wh
elif n_pm <= 0:
chg_am = charge_target_wh
chg_pm = 0.0
else:
chg_am = charge_target_wh / 2.0
chg_pm = charge_target_wh - chg_am
selected: set[int] = set()
grid_filled_wh = 0.0
cap_am = (
max(1, min(_MAX_GRID_CHARGE_CAP, int(chg_am / per_slot_full_wh) + 1))
if per_slot_full_wh > 0
else 6
)
cap_pm = (
max(1, min(_MAX_GRID_CHARGE_CAP, int(chg_pm / per_slot_full_wh) + 1))
if per_slot_full_wh > 0
else 6
)
def _grid_b_ok(t: int, ref_buy_seg: float) -> bool:
s = slots[t]
buy = float(s.buy_price)
if buy > ref_buy_seg + _BUY_CHARGE_BAND:
return False
nxt = _buy_min_next_n(slots, t, export_window_start=export_window_start)
if nxt is not None and buy > nxt + _BUY_LOOKAHEAD_EPS:
return False
return True
def _grid_sort_key(t: int, pred: bool, price: float) -> tuple[int, int, int, float, int]:
today_first = 0 if _prague_date(slots[t]) == plan_day else 1
before_export = (
0
if export_window_start is not None
and slots[t].interval_start < export_window_start
else 1
)
return (today_first, before_export, int(pred), price, t)
if purchase_pricing_mode != "fixed":
am_candidates = [
(t, getattr(slots[t], "is_predicted_price", False), float(slots[t].buy_price))
for t in range(len(slots))
if _grid_b_ok(t, ref_buy_am) and _prague_hour(slots[t]) < 12
]
am_candidates.sort(key=lambda x: _grid_sort_key(x[0], x[1], x[2]))
cum = 0.0
grid_am = 0
for t, _pred, _price in am_candidates:
if cum >= chg_am or per_slot_full_wh <= 0 or grid_am >= cap_am:
break
selected.add(t)
cum += per_slot_full_wh
grid_am += 1
grid_filled_wh += cum
pm_candidates = [
(t, getattr(slots[t], "is_predicted_price", False), float(slots[t].buy_price))
for t in range(len(slots))
if _grid_b_ok(t, ref_buy_pm) and _prague_hour(slots[t]) >= 12
]
pm_candidates.sort(key=lambda x: _grid_sort_key(x[0], x[1], x[2]))
cum = 0.0
grid_pm = 0
for t, _pred, _price in pm_candidates:
if cum >= chg_pm or per_slot_full_wh <= 0 or grid_pm >= cap_pm:
break
selected.add(t)
cum += per_slot_full_wh
grid_pm += 1
grid_filled_wh += cum
pv_layer_cap = max(charge_target_wh - grid_filled_wh, 0.0)
pv_candidates: list[tuple[int, float, float]] = []
for t, s in enumerate(slots):
pv_surplus_w = max(0, s.pv_a_forecast_w + s.pv_b_forecast_w - s.load_baseline_w)
if pv_surplus_w > 0 and float(s.sell_price) >= float(s.buy_price) - degrad:
pv_candidates.append((t, _store_score(slots, t), float(pv_surplus_w)))
pv_candidates.sort(key=lambda x: (-x[1], x[0]))
cum = 0.0
for t, _score, pv_surplus_w in pv_candidates:
if cum >= pv_layer_cap:
break
selected.add(t)
cum += min(pv_surplus_w, max_p_w) * eta * INTERVAL_H
return selected
def _select_discharge_export_slots(
slots: list[PlanningSlot],
battery: SimpleNamespace,
current_soc_wh: float,
charge_slots: set[int] | None = None,
*,
purchase_pricing_mode: str = "spot",
) -> set[int]:
"""Kopie logiky z ems.fn_load_planning_slots_full (discharge-export mask)."""
discharge_buf = float(getattr(battery, "discharge_slot_buffer", 0) or 0)
if discharge_buf <= 0:
return set(range(len(slots)))
min_soc_wh = float(getattr(battery, "min_soc_wh", 0) or 0)
soc_max_wh = float(battery.soc_max_wh)
exportable_wh = soc_max_wh - min_soc_wh
if exportable_wh <= 0:
return set()
degrad = float(getattr(battery, "degradation_cost_czk_kwh", 0.15) or 0.15)
eta = float(getattr(battery, "discharge_efficiency", 1.0) or 1.0)
max_p_w = float(getattr(battery, "max_discharge_power_w", 0.0) or 0.0)
per_slot_wh = max_p_w * eta * INTERVAL_H
discharge_target_wh = exportable_wh * discharge_buf
ref_buy = min(float(s.buy_price) for s in slots)
if purchase_pricing_mode == "fixed":
sell_min = degrad
else:
sell_min = ref_buy + degrad
candidates = [
(t, float(slots[t].sell_price))
for t in range(len(slots))
if float(slots[t].sell_price) > sell_min
]
candidates.sort(key=lambda x: (-x[1], -x[0]))
selected: set[int] = set()
cum = 0.0
for t, _sell in candidates:
if cum >= discharge_target_wh or per_slot_wh <= 0:
break
selected.add(t)
cum += per_slot_wh
return selected
def _prague_hour(s: PlanningSlot) -> int:
dt = s.interval_start
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(_PRAGUE).hour
def _slot(
*,
buy: float,
sell: float = 1.0,
pv: int = 0,
load: int = 2_000,
hour_utc: int = 12,
predicted: bool = False,
interval_start: datetime | None = None,
) -> PlanningSlot:
if interval_start is None:
interval_start = datetime(2026, 5, 19, hour_utc, 0, tzinfo=timezone.utc)
return PlanningSlot(
interval_start=interval_start,
buy_price=buy,
sell_price=sell,
pv_a_forecast_w=0,
pv_b_forecast_w=pv,
load_baseline_w=load,
ev1_connected=False,
ev2_connected=False,
is_predicted_price=predicted,
)
def _battery(
*,
charge_buf: float = 1.3,
discharge_buf: float = 1.5,
uc_wh: float = 64_000.0,
soc_max_pct: float = 95.0,
min_soc_pct: float = 10.0,
reserve_soc_pct: float = 20.0,
max_charge_w: float = 18_000.0,
max_discharge_w: float = 18_000.0,
charge_eff: float = 0.95,
discharge_eff: float = 0.95,
degrad: float = 0.15,
) -> SimpleNamespace:
uc = uc_wh
return SimpleNamespace(
usable_capacity_wh=uc,
min_soc_wh=min_soc_pct / 100.0 * uc,
reserve_soc_wh=reserve_soc_pct / 100.0 * uc,
soc_max_wh=soc_max_pct / 100.0 * uc,
max_charge_power_w=max_charge_w,
max_discharge_power_w=max_discharge_w,
charge_efficiency=charge_eff,
discharge_efficiency=discharge_eff,
charge_slot_buffer=charge_buf,
discharge_slot_buffer=discharge_buf,
degradation_cost_czk_kwh=degrad,
)
class SelectChargeSlotsTests(unittest.TestCase):
def test_buffer_zero_returns_all_slots(self) -> None:
slots = [_slot(buy=3.0) for _ in range(4)]
battery = _battery(charge_buf=0.0)
out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
self.assertEqual(out, set(range(4)))
def test_returns_all_when_battery_is_full(self) -> None:
slots = [_slot(buy=0.1) for _ in range(3)]
battery = _battery()
out = _select_charge_slots(
slots, battery, current_soc_wh=battery.soc_max_wh + 1.0
)
self.assertEqual(out, set(range(3)))
def test_pv_surplus_high_store_score_selected(self) -> None:
"""Slot s vyšším store_score (lepší uložení vs export) má přednost."""
slots = [
_slot(buy=1.5, sell=0.01, pv=8_000, load=2_000, hour_utc=8),
_slot(buy=1.5, sell=0.50, pv=8_000, load=2_000, hour_utc=9),
_slot(buy=0.5, sell=0.40, pv=8_000, load=2_000, hour_utc=10),
]
battery = _battery(
charge_buf=1.3, uc_wh=1_000.0, soc_max_pct=100.0, max_charge_w=6_000.0
)
out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
self.assertIn(2, out, "Slot s lepší marží (nižší buy) má být vybrán")
self.assertNotIn(0, out, "Ztrátový sell≪buy slot nemá grid charge z masky A")
def test_non_pv_slots_selected_with_am_pm_budget(self) -> None:
"""Levný PM slot; AM s dražším buy než min v lookahead může být vynechán."""
slots = [
_slot(buy=0.5, hour_utc=4),
_slot(buy=3.0, hour_utc=5),
_slot(buy=0.4, hour_utc=14),
_slot(buy=9.9, hour_utc=15),
]
battery = _battery(
charge_buf=1.3, uc_wh=5_000.0, soc_max_pct=100.0, max_charge_w=18_000.0
)
out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
self.assertIn(2, out, "Nejlevnější buy v horizontu (PM) musí být vybrán")
def test_pm_grid_prefers_today_before_export_over_tomorrow_cheaper(self) -> None:
"""Dnes PM levné před večerním exportem má prioritu před zítřejším min(buy)."""
base = datetime(2026, 5, 21, 10, 0, tzinfo=timezone.utc)
slots = [
_slot(buy=0.72, sell=-0.1, pv=500, load=3000, interval_start=base),
_slot(buy=0.68, sell=-0.15, pv=500, load=3000, interval_start=base + timedelta(minutes=15)),
_slot(
buy=5.5,
sell=3.8,
pv=0,
load=2500,
interval_start=base + timedelta(hours=7, minutes=30),
),
_slot(
buy=0.50,
sell=-0.3,
pv=2000,
load=5000,
interval_start=base + timedelta(hours=26),
),
]
battery = _battery(uc_wh=64_000.0)
out = _select_charge_slots(slots, battery, current_soc_wh=0.46 * battery.usable_capacity_wh)
self.assertIn(0, out)
self.assertIn(1, out)
self.assertEqual(
min(out),
0,
"Před exportním oknem musí být vybrány dnešní levné PM sloty dřív než zítřejší min(buy)",
)
def test_cheap_pm_with_pv_surplus_gets_grid_charge(self) -> None:
"""Regrese home-01: levné PM VT (~0,8) i s FVE musí projít grid maskou B."""
base = datetime(2026, 5, 21, 10, 0, tzinfo=timezone.utc)
slots = [
_slot(
buy=0.80,
sell=-0.08,
pv=2_500,
load=3_400,
interval_start=base,
),
_slot(
buy=0.72,
sell=-0.13,
pv=500,
load=3_400,
interval_start=base + timedelta(minutes=15),
),
_slot(
buy=2.50,
sell=1.40,
pv=2_000,
load=3_800,
interval_start=base + timedelta(hours=5),
),
_slot(
buy=5.50,
sell=3.80,
pv=100,
load=2_900,
interval_start=base + timedelta(hours=9),
),
]
battery = _battery(uc_wh=64_000.0)
soc = 0.46 * battery.usable_capacity_wh
out = _select_charge_slots(slots, battery, current_soc_wh=soc)
self.assertIn(1, out, "Levnější PM slot (lookahead) má allow_charge i s FVE")
self.assertNotIn(
2,
out,
"Drahý odpolední slot nemá být v grid maskě B jen kvůli globálnímu min",
)
def test_vt_before_nt_skips_expensive_pm_slot(self) -> None:
"""Regrese home-01: 12:45 VT drahý, za 15 min NT levný → PM grid charge ne v 12:45."""
base = datetime(2026, 5, 21, 10, 45, tzinfo=timezone.utc)
slots = [
_slot(
buy=1.49,
sell=-0.04,
pv=0,
load=3_500,
interval_start=base,
),
_slot(
buy=0.86,
sell=0.01,
pv=0,
load=3_500,
interval_start=base + timedelta(minutes=15),
),
_slot(
buy=0.86,
sell=0.01,
pv=0,
load=3_500,
interval_start=base + timedelta(minutes=30),
),
]
battery = _battery(uc_wh=64_000.0)
soc = 0.31 * battery.usable_capacity_wh
out = _select_charge_slots(slots, battery, current_soc_wh=soc)
self.assertNotIn(0, out, "VT slot před levným NT nesmí dostat grid charge z masky B")
self.assertIn(1, out, "NT slot může být vybrán")
def test_ote_slots_prioritized_over_predicted(self) -> None:
"""Při stejné ceně má OTE (is_predicted=false) přednost před predikovaným."""
slots = [
_slot(buy=2.00, sell=2.0, hour_utc=13, predicted=False),
_slot(buy=2.00, sell=2.0, hour_utc=13, predicted=True),
]
battery = _battery(
charge_buf=1.3, uc_wh=3_000.0, soc_max_pct=100.0, max_charge_w=18_000.0
)
out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
self.assertIn(0, out)
self.assertNotIn(1, out)
class SelectDischargeExportSlotsTests(unittest.TestCase):
def test_evening_sell_allowed_when_cheaper_than_ref_charge_buy(self) -> None:
slots = [
_slot(buy=0.50, sell=-0.30, hour_utc=6),
_slot(buy=0.51, sell=-0.29, hour_utc=7),
_slot(buy=5.60, sell=3.30, hour_utc=16),
_slot(buy=5.98, sell=3.37, hour_utc=17),
]
battery = _battery(uc_wh=64_000.0)
charge = _select_charge_slots(slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh)
discharge = _select_discharge_export_slots(
slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh, charge_slots=charge
)
self.assertIn(2, discharge)
self.assertIn(3, discharge)
def test_export_excluded_when_sell_below_ref_buy_plus_degradation(self) -> None:
slots = [
_slot(buy=0.40, sell=0.10, hour_utc=8),
_slot(buy=4.00, sell=0.50, hour_utc=18),
]
battery = _battery(uc_wh=10_000.0, discharge_buf=2.0)
discharge = _select_discharge_export_slots(
slots, battery, current_soc_wh=0.0
)
self.assertNotIn(1, discharge)
class FixedPurchasePricingTests(unittest.TestCase):
def test_fixed_skips_non_pv_grid_charge_slots(self) -> None:
slots = [
_slot(buy=6.35, sell=2.0, hour_utc=14, load=500),
_slot(buy=6.35, sell=3.5, hour_utc=18, load=500),
]
battery = _battery(charge_buf=1.3, uc_wh=12_500.0)
out = _select_charge_slots(
slots,
battery,
current_soc_wh=0.4 * battery.usable_capacity_wh,
purchase_pricing_mode="fixed",
)
self.assertEqual(out, set())
def test_fixed_allows_discharge_on_high_sell(self) -> None:
slots = [
_slot(buy=6.35, sell=1.0, hour_utc=10),
_slot(buy=6.35, sell=3.8, hour_utc=18),
_slot(buy=6.35, sell=3.2, hour_utc=19),
]
battery = _battery(uc_wh=12_500.0, discharge_buf=2.0, degrad=0.3)
discharge = _select_discharge_export_slots(
slots,
battery,
current_soc_wh=0.5 * battery.usable_capacity_wh,
purchase_pricing_mode="fixed",
)
self.assertIn(1, discharge)
self.assertIn(2, discharge)
if __name__ == "__main__":
unittest.main()