Files
ems/backend/tests/test_planning_charge_slot_selection.py
Dusan Vojacek e295e55770
Some checks failed
CI and deploy / migration-check (push) Failing after 22s
CI and deploy / deploy (push) Has been skipped
doladeni odpoledniho dobiti
2026-05-21 16:18:30 +02:00

587 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Pre-selection nabíjecích a exportních slotů referenční Python.
Logika je v DB: ems.fn_load_planning_slots_full. Kopie algoritmu pro unit testy bez PG.
Charge mask:
B) Grid AM/PM: nejlevnější sloty do Wh rozpočtu (den plánu → před exportním oknem → buy ASC).
A) PV-surplus: store_score DESC; jen pokud sell ≥ future_sell degrad.
Discharge-export mask:
ref_buy = min(buy) celého horizontu.
Top sloty dle sell_price desc kde sell > ref_buy + degradation.
"""
from __future__ import annotations
import unittest
from datetime import date, datetime, timezone, timedelta
from types import SimpleNamespace
from zoneinfo import ZoneInfo
from services.planning_engine import INTERVAL_H, PlanningSlot
_PRAGUE = ZoneInfo("Europe/Prague")
_LOOKAHEAD_SLOTS = 4
_BUY_LOOKAHEAD_EPS = 0.05
_BUY_CHARGE_BAND = 0.40
_MAX_GRID_CHARGE_CAP = 24
def _prague_date(s: PlanningSlot) -> date:
return s.interval_start.astimezone(_PRAGUE).date()
def _export_window_start(slots: list[PlanningSlot], degrad: float) -> datetime | None:
"""Kopie R__063: sell > min(buy) téhož kalendářního dne (Prague) + degrad."""
result: datetime | None = None
for s in slots:
day = _prague_date(s)
day_min = min(
float(x.buy_price) for x in slots if _prague_date(x) == day
)
if float(s.sell_price) > day_min + degrad:
if result is None or s.interval_start < result:
result = s.interval_start
return result
def _future_sell(slots: list[PlanningSlot], t: int) -> float:
tail = [float(slots[i].sell_price) for i in range(t + 1, len(slots))]
return max(tail) if tail else float(slots[t].sell_price)
def _buy_min_next_n(
slots: list[PlanningSlot],
t: int,
n: int = _LOOKAHEAD_SLOTS,
*,
export_window_start: datetime | None = None,
) -> float | None:
tail = [
float(slots[i].buy_price)
for i in range(t + 1, min(t + 1 + n, len(slots)))
if export_window_start is None or slots[i].interval_start < export_window_start
]
return min(tail) if tail else None
def _store_score(slots: list[PlanningSlot], t: int) -> float:
s = slots[t]
buy = float(s.buy_price)
sell = float(s.sell_price)
fso = _future_sell(slots, t)
return fso - sell - max(0.0, buy - sell)
def _select_charge_slots(
slots: list[PlanningSlot],
battery: SimpleNamespace,
current_soc_wh: float,
*,
purchase_pricing_mode: str = "spot",
) -> set[int]:
"""Kopie logiky z ems.fn_load_planning_slots_full (charge mask)."""
charge_buf = float(getattr(battery, "charge_slot_buffer", 0) or 0)
if charge_buf <= 0:
return set(range(len(slots)))
energy_to_fill = float(battery.soc_max_wh) - float(current_soc_wh)
if energy_to_fill <= 0:
return set(range(len(slots)))
reserve_wh = float(getattr(battery, "reserve_soc_wh", 0) or 0)
degrad = float(getattr(battery, "degradation_cost_czk_kwh", 0.15) or 0.15)
ref_buy_am = min(
(float(s.buy_price) for s in slots if _prague_hour(s) < 12),
default=min(float(s.buy_price) for s in slots),
)
ref_buy_pm = min(
(float(s.buy_price) for s in slots if _prague_hour(s) >= 12),
default=min(float(s.buy_price) for s in slots),
)
ref_buy_global = min(float(s.buy_price) for s in slots)
export_window_start = _export_window_start(slots, degrad)
plan_day = _prague_date(slots[0])
eta = float(getattr(battery, "charge_efficiency", 1.0) or 1.0)
max_p_w = float(getattr(battery, "max_charge_power_w", 0.0) or 0.0)
per_slot_full_wh = max_p_w * eta * INTERVAL_H
soc_max_wh = float(getattr(battery, "soc_max_wh", 0) or 0)
if charge_buf > 0:
charge_target_wh = min(
max(energy_to_fill, 0.0) * charge_buf,
max(soc_max_wh - float(current_soc_wh), 0.0),
)
elif current_soc_wh >= reserve_wh:
charge_target_wh = max(energy_to_fill, 0.0)
else:
charge_target_wh = min(
max(energy_to_fill, 0.0) * charge_buf,
max(energy_to_fill, 0.0),
)
n_am = sum(1 for s in slots if _prague_hour(s) < 12)
n_pm = len(slots) - n_am
if n_am <= 0:
chg_am = 0.0
chg_pm = charge_target_wh
elif n_pm <= 0:
chg_am = charge_target_wh
chg_pm = 0.0
else:
chg_am = charge_target_wh / 2.0
chg_pm = charge_target_wh - chg_am
selected: set[int] = set()
grid_filled_wh = 0.0
buf_mult = charge_buf if charge_buf > 0 else 1.0
cap_am = (
max(
1,
min(
_MAX_GRID_CHARGE_CAP,
int(chg_am / per_slot_full_wh * buf_mult) + 1,
),
)
if per_slot_full_wh > 0
else 6
)
cap_pm = (
max(
1,
min(
_MAX_GRID_CHARGE_CAP,
int(chg_pm / per_slot_full_wh * buf_mult) + 1,
),
)
if per_slot_full_wh > 0
else 6
)
def _grid_sort_key(t: int, pred: bool, price: float) -> tuple[int, int, int, float, int]:
today_first = 0 if _prague_date(slots[t]) == plan_day else 1
before_export = (
0
if export_window_start is not None
and slots[t].interval_start < export_window_start
else 1
)
return (today_first, before_export, int(pred), price, t)
if purchase_pricing_mode != "fixed":
am_candidates = [
(t, getattr(slots[t], "is_predicted_price", False), float(slots[t].buy_price))
for t in range(len(slots))
if _prague_hour(slots[t]) < 12
]
am_candidates.sort(key=lambda x: _grid_sort_key(x[0], x[1], x[2]))
cum = 0.0
grid_am = 0
for t, _pred, _price in am_candidates:
if cum >= chg_am or per_slot_full_wh <= 0 or grid_am >= cap_am:
break
selected.add(t)
cum += per_slot_full_wh
grid_am += 1
grid_filled_wh += cum
chg_pm = max(chg_pm, charge_target_wh - grid_filled_wh)
if per_slot_full_wh > 0:
cap_pm = max(
cap_pm,
min(
_MAX_GRID_CHARGE_CAP,
int(chg_pm / per_slot_full_wh * buf_mult) + 1,
),
)
pm_candidates = [
(t, getattr(slots[t], "is_predicted_price", False), float(slots[t].buy_price))
for t in range(len(slots))
if _prague_hour(slots[t]) >= 12
]
pm_candidates.sort(key=lambda x: _grid_sort_key(x[0], x[1], x[2]))
cum = 0.0
grid_pm = 0
for t, _pred, _price in pm_candidates:
if cum >= chg_pm or per_slot_full_wh <= 0 or grid_pm >= cap_pm:
break
selected.add(t)
cum += per_slot_full_wh
grid_pm += 1
grid_filled_wh += cum
pv_layer_cap = max(charge_target_wh - grid_filled_wh, 0.0)
pv_candidates: list[tuple[int, float, float]] = []
for t, s in enumerate(slots):
pv_surplus_w = max(0, s.pv_a_forecast_w + s.pv_b_forecast_w - s.load_baseline_w)
fso = _future_sell(slots, t)
if (
pv_surplus_w > 0
and float(s.sell_price) >= float(s.buy_price) - degrad
and float(s.sell_price) >= fso - degrad
):
pv_candidates.append((t, _store_score(slots, t), float(pv_surplus_w)))
pv_candidates.sort(key=lambda x: (-x[1], x[0]))
cum = 0.0
for t, _score, pv_surplus_w in pv_candidates:
if cum >= pv_layer_cap:
break
selected.add(t)
cum += min(pv_surplus_w, max_p_w) * eta * INTERVAL_H
return selected
def _select_discharge_export_slots(
slots: list[PlanningSlot],
battery: SimpleNamespace,
current_soc_wh: float,
charge_slots: set[int] | None = None,
*,
purchase_pricing_mode: str = "spot",
) -> set[int]:
"""Kopie logiky z ems.fn_load_planning_slots_full (discharge-export mask)."""
discharge_buf = float(getattr(battery, "discharge_slot_buffer", 0) or 0)
if discharge_buf <= 0:
return set(range(len(slots)))
min_soc_wh = float(getattr(battery, "min_soc_wh", 0) or 0)
soc_max_wh = float(battery.soc_max_wh)
exportable_wh = soc_max_wh - min_soc_wh
if exportable_wh <= 0:
return set()
degrad = float(getattr(battery, "degradation_cost_czk_kwh", 0.15) or 0.15)
eta = float(getattr(battery, "discharge_efficiency", 1.0) or 1.0)
max_p_w = float(getattr(battery, "max_discharge_power_w", 0.0) or 0.0)
per_slot_wh = max_p_w * eta * INTERVAL_H
discharge_target_wh = exportable_wh * discharge_buf
ref_buy = min(float(s.buy_price) for s in slots)
if purchase_pricing_mode == "fixed":
sell_min = degrad
else:
sell_min = ref_buy + degrad
candidates = [
(t, float(slots[t].sell_price))
for t in range(len(slots))
if float(slots[t].sell_price) > sell_min
]
candidates.sort(key=lambda x: (-x[1], -x[0]))
selected: set[int] = set()
cum = 0.0
for t, _sell in candidates:
if cum >= discharge_target_wh or per_slot_wh <= 0:
break
selected.add(t)
cum += per_slot_wh
return selected
def _prague_hour(s: PlanningSlot) -> int:
dt = s.interval_start
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(_PRAGUE).hour
def _slot(
*,
buy: float,
sell: float = 1.0,
pv: int = 0,
load: int = 2_000,
hour_utc: int = 12,
predicted: bool = False,
interval_start: datetime | None = None,
) -> PlanningSlot:
if interval_start is None:
interval_start = datetime(2026, 5, 19, hour_utc, 0, tzinfo=timezone.utc)
return PlanningSlot(
interval_start=interval_start,
buy_price=buy,
sell_price=sell,
pv_a_forecast_w=0,
pv_b_forecast_w=pv,
load_baseline_w=load,
ev1_connected=False,
ev2_connected=False,
is_predicted_price=predicted,
)
def _battery(
*,
charge_buf: float = 1.3,
discharge_buf: float = 1.5,
uc_wh: float = 64_000.0,
soc_max_pct: float = 95.0,
min_soc_pct: float = 10.0,
reserve_soc_pct: float = 20.0,
max_charge_w: float = 18_000.0,
max_discharge_w: float = 18_000.0,
charge_eff: float = 0.95,
discharge_eff: float = 0.95,
degrad: float = 0.15,
) -> SimpleNamespace:
uc = uc_wh
return SimpleNamespace(
usable_capacity_wh=uc,
min_soc_wh=min_soc_pct / 100.0 * uc,
reserve_soc_wh=reserve_soc_pct / 100.0 * uc,
soc_max_wh=soc_max_pct / 100.0 * uc,
max_charge_power_w=max_charge_w,
max_discharge_power_w=max_discharge_w,
charge_efficiency=charge_eff,
discharge_efficiency=discharge_eff,
charge_slot_buffer=charge_buf,
discharge_slot_buffer=discharge_buf,
degradation_cost_czk_kwh=degrad,
)
class SelectChargeSlotsTests(unittest.TestCase):
def test_buffer_zero_returns_all_slots(self) -> None:
slots = [_slot(buy=3.0) for _ in range(4)]
battery = _battery(charge_buf=0.0)
out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
self.assertEqual(out, set(range(4)))
def test_returns_all_when_battery_is_full(self) -> None:
slots = [_slot(buy=0.1) for _ in range(3)]
battery = _battery()
out = _select_charge_slots(
slots, battery, current_soc_wh=battery.soc_max_wh + 1.0
)
self.assertEqual(out, set(range(3)))
def test_pv_surplus_high_store_score_selected(self) -> None:
"""Slot s vyšším store_score (lepší uložení vs export) má přednost."""
slots = [
_slot(buy=1.5, sell=0.01, pv=8_000, load=2_000, hour_utc=8),
_slot(buy=1.5, sell=0.50, pv=8_000, load=2_000, hour_utc=9),
_slot(buy=0.5, sell=0.40, pv=8_000, load=2_000, hour_utc=10),
]
battery = _battery(
charge_buf=1.3, uc_wh=1_000.0, soc_max_pct=100.0, max_charge_w=6_000.0
)
out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
self.assertIn(2, out, "Nejlevnější buy (grid B) má být vybrán")
self.assertNotIn(1, out, "Dražší AM slot (buy 1.5) nemá přednost před levným buy 0.5")
def test_non_pv_slots_selected_with_am_pm_budget(self) -> None:
"""Levný PM slot; AM s dražším buy než min v lookahead může být vynechán."""
slots = [
_slot(buy=0.5, hour_utc=4),
_slot(buy=3.0, hour_utc=5),
_slot(buy=0.4, hour_utc=14),
_slot(buy=9.9, hour_utc=15),
]
battery = _battery(
charge_buf=1.3, uc_wh=5_000.0, soc_max_pct=100.0, max_charge_w=18_000.0
)
out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
self.assertIn(2, out, "Nejlevnější buy v horizontu (PM) musí být vybrán")
def test_pm_grid_prefers_today_before_export_over_tomorrow_cheaper(self) -> None:
"""Dnes PM levné před večerním exportem má prioritu před zítřejším min(buy)."""
base = datetime(2026, 5, 21, 10, 0, tzinfo=timezone.utc)
slots = [
_slot(buy=0.72, sell=-0.1, pv=500, load=3000, interval_start=base),
_slot(buy=0.68, sell=-0.15, pv=500, load=3000, interval_start=base + timedelta(minutes=15)),
_slot(
buy=5.5,
sell=3.8,
pv=0,
load=2500,
interval_start=base + timedelta(hours=7, minutes=30),
),
_slot(
buy=0.50,
sell=-0.3,
pv=2000,
load=5000,
interval_start=base + timedelta(hours=26),
),
]
battery = _battery(uc_wh=64_000.0)
out = _select_charge_slots(slots, battery, current_soc_wh=0.46 * battery.usable_capacity_wh)
self.assertIn(0, out)
self.assertIn(1, out)
self.assertEqual(
min(out),
0,
"Před exportním oknem musí být vybrány dnešní levné PM sloty dřív než zítřejší min(buy)",
)
def test_cheap_pm_with_pv_surplus_gets_grid_charge(self) -> None:
"""Regrese home-01: levné PM VT (~0,8) i s FVE musí projít grid maskou B."""
base = datetime(2026, 5, 21, 10, 0, tzinfo=timezone.utc)
slots = [
_slot(
buy=0.80,
sell=-0.08,
pv=2_500,
load=3_400,
interval_start=base,
),
_slot(
buy=0.72,
sell=-0.13,
pv=500,
load=3_400,
interval_start=base + timedelta(minutes=15),
),
_slot(
buy=2.50,
sell=1.40,
pv=2_000,
load=3_800,
interval_start=base + timedelta(hours=5),
),
_slot(
buy=5.50,
sell=3.80,
pv=100,
load=2_900,
interval_start=base + timedelta(hours=9),
),
]
battery = _battery(uc_wh=64_000.0, charge_buf=1.05)
soc = 0.88 * battery.usable_capacity_wh
out = _select_charge_slots(slots, battery, current_soc_wh=soc)
self.assertIn(1, out, "Levnější PM slot má allow_charge i s FVE")
self.assertIn(0, out)
self.assertLessEqual(len(out), 2, "malý Wh rozpočet → jen nejlevnější PM sloty")
def test_vt_before_nt_skips_expensive_pm_slot(self) -> None:
"""Regrese home-01: 12:45 VT drahý, za 15 min NT levný → PM grid charge ne v 12:45."""
base = datetime(2026, 5, 21, 10, 45, tzinfo=timezone.utc)
slots = [
_slot(
buy=1.49,
sell=-0.04,
pv=0,
load=3_500,
interval_start=base,
),
_slot(
buy=0.86,
sell=0.01,
pv=0,
load=3_500,
interval_start=base + timedelta(minutes=15),
),
_slot(
buy=0.86,
sell=0.01,
pv=0,
load=3_500,
interval_start=base + timedelta(minutes=30),
),
]
battery = _battery(uc_wh=64_000.0, charge_buf=1.0)
soc = 0.92 * battery.usable_capacity_wh
out = _select_charge_slots(slots, battery, current_soc_wh=soc)
self.assertNotIn(0, out, "Při malém rozpočtu má přednost levnější NT, ne VT 1.49")
self.assertTrue({1, 2} & out, "NT slot(y) mohou být vybrány")
def test_pm_grid_gets_unused_am_wh_budget(self) -> None:
"""Nečerpaný AM rozpočet → odpolední levné PM sloty mohou dostat allow_charge."""
base = datetime(2026, 5, 22, 6, 0, tzinfo=timezone.utc)
slots = [
_slot(buy=0.55, sell=-0.2, hour_utc=6, interval_start=base),
_slot(buy=0.58, sell=-0.2, hour_utc=7, interval_start=base + timedelta(hours=1)),
_slot(buy=0.52, sell=-0.25, hour_utc=14, interval_start=base + timedelta(hours=8)),
_slot(buy=0.50, sell=-0.25, hour_utc=15, interval_start=base + timedelta(hours=9)),
_slot(buy=5.5, sell=3.8, hour_utc=20, interval_start=base + timedelta(hours=14)),
]
battery = _battery(charge_buf=1.3, uc_wh=64_000.0)
out = _select_charge_slots(slots, battery, current_soc_wh=0.12 * battery.usable_capacity_wh)
pm_cheap = {2, 3}
self.assertTrue(
pm_cheap & out,
"po levném AM má PM dostat grid charge z nevyčerpaného rozpočtu",
)
def test_ote_slots_prioritized_over_predicted(self) -> None:
"""Při stejné ceně má OTE (is_predicted=false) přednost před predikovaným."""
slots = [
_slot(buy=2.00, sell=2.0, hour_utc=13, predicted=False),
_slot(buy=2.00, sell=2.0, hour_utc=13, predicted=True),
]
battery = _battery(
charge_buf=1.3, uc_wh=3_000.0, soc_max_pct=100.0, max_charge_w=18_000.0
)
out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
self.assertIn(0, out)
self.assertNotIn(1, out)
class SelectDischargeExportSlotsTests(unittest.TestCase):
def test_evening_sell_allowed_when_cheaper_than_ref_charge_buy(self) -> None:
slots = [
_slot(buy=0.50, sell=-0.30, hour_utc=6),
_slot(buy=0.51, sell=-0.29, hour_utc=7),
_slot(buy=5.60, sell=3.30, hour_utc=16),
_slot(buy=5.98, sell=3.37, hour_utc=17),
]
battery = _battery(uc_wh=64_000.0)
charge = _select_charge_slots(slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh)
discharge = _select_discharge_export_slots(
slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh, charge_slots=charge
)
self.assertIn(2, discharge)
self.assertIn(3, discharge)
def test_export_excluded_when_sell_below_ref_buy_plus_degradation(self) -> None:
slots = [
_slot(buy=0.40, sell=0.10, hour_utc=8),
_slot(buy=4.00, sell=0.50, hour_utc=18),
]
battery = _battery(uc_wh=10_000.0, discharge_buf=2.0)
discharge = _select_discharge_export_slots(
slots, battery, current_soc_wh=0.0
)
self.assertNotIn(1, discharge)
class FixedPurchasePricingTests(unittest.TestCase):
def test_fixed_skips_non_pv_grid_charge_slots(self) -> None:
slots = [
_slot(buy=6.35, sell=2.0, hour_utc=14, load=500),
_slot(buy=6.35, sell=3.5, hour_utc=18, load=500),
]
battery = _battery(charge_buf=1.3, uc_wh=12_500.0)
out = _select_charge_slots(
slots,
battery,
current_soc_wh=0.4 * battery.usable_capacity_wh,
purchase_pricing_mode="fixed",
)
self.assertEqual(out, set())
def test_fixed_allows_discharge_on_high_sell(self) -> None:
slots = [
_slot(buy=6.35, sell=1.0, hour_utc=10),
_slot(buy=6.35, sell=3.8, hour_utc=18),
_slot(buy=6.35, sell=3.2, hour_utc=19),
]
battery = _battery(uc_wh=12_500.0, discharge_buf=2.0, degrad=0.3)
discharge = _select_discharge_export_slots(
slots,
battery,
current_soc_wh=0.5 * battery.usable_capacity_wh,
purchase_pricing_mode="fixed",
)
self.assertIn(1, discharge)
self.assertIn(2, discharge)
if __name__ == "__main__":
unittest.main()