"""Pre-selection nabíjecích a exportních slotů – referenční Python. Logika je v DB: ems.fn_load_planning_slots_full. Kopie algoritmu pro unit testy bez PG. Charge mask: A) PV-surplus: store_score DESC, dokud PV nepokryje charge target. B) Non-PV: AM/PM, OTE-first, buy≤ref+degrad, lookahead, cap 6 slotů. Discharge-export mask: ref_buy = min(buy) celého horizontu. Top sloty dle sell_price desc kde sell > ref_buy + degradation. """ from __future__ import annotations import unittest from datetime import datetime, timezone, timedelta from types import SimpleNamespace from zoneinfo import ZoneInfo from services.planning_engine import INTERVAL_H, PlanningSlot _PRAGUE = ZoneInfo("Europe/Prague") _LOOKAHEAD_SLOTS = 4 _GRID_CHARGE_CAP_AM = 6 _GRID_CHARGE_CAP_PM = 6 _BUY_LOOKAHEAD_EPS = 0.05 def _future_sell(slots: list[PlanningSlot], t: int) -> float: tail = [float(slots[i].sell_price) for i in range(t + 1, len(slots))] return max(tail) if tail else float(slots[t].sell_price) def _buy_min_next_n(slots: list[PlanningSlot], t: int, n: int = _LOOKAHEAD_SLOTS) -> float | None: tail = [ float(slots[i].buy_price) for i in range(t + 1, min(t + 1 + n, len(slots))) ] return min(tail) if tail else None def _store_score(slots: list[PlanningSlot], t: int) -> float: s = slots[t] buy = float(s.buy_price) sell = float(s.sell_price) fso = _future_sell(slots, t) return fso - sell - max(0.0, buy - sell) def _select_charge_slots( slots: list[PlanningSlot], battery: SimpleNamespace, current_soc_wh: float, *, purchase_pricing_mode: str = "spot", ) -> set[int]: """Kopie logiky z ems.fn_load_planning_slots_full (charge mask).""" charge_buf = float(getattr(battery, "charge_slot_buffer", 0) or 0) if charge_buf <= 0: return set(range(len(slots))) energy_to_fill = float(battery.soc_max_wh) - float(current_soc_wh) if energy_to_fill <= 0: return set(range(len(slots))) reserve_wh = float(getattr(battery, "reserve_soc_wh", 0) or 0) degrad = float(getattr(battery, "degradation_cost_czk_kwh", 0.15) or 0.15) ref_buy = min(float(s.buy_price) for s in slots) eta = float(getattr(battery, "charge_efficiency", 1.0) or 1.0) max_p_w = float(getattr(battery, "max_charge_power_w", 0.0) or 0.0) per_slot_full_wh = max_p_w * eta * INTERVAL_H if current_soc_wh >= reserve_wh: charge_target_wh = max(energy_to_fill, 0.0) else: charge_target_wh = min( max(energy_to_fill, 0.0) * charge_buf, max(energy_to_fill, 0.0), ) n_am = sum(1 for s in slots if _prague_hour(s) < 12) n_pm = len(slots) - n_am if n_am <= 0: chg_am = 0.0 chg_pm = charge_target_wh elif n_pm <= 0: chg_am = charge_target_wh chg_pm = 0.0 else: chg_am = charge_target_wh / 2.0 chg_pm = charge_target_wh - chg_am selected: set[int] = set() # A) PV-surplus: highest store_score first pv_candidates: list[tuple[int, float, float]] = [] for t, s in enumerate(slots): pv_surplus_w = max(0, s.pv_a_forecast_w + s.pv_b_forecast_w - s.load_baseline_w) if pv_surplus_w > 0 and float(s.sell_price) >= float(s.buy_price) - degrad: pv_candidates.append((t, _store_score(slots, t), float(pv_surplus_w))) pv_candidates.sort(key=lambda x: (-x[1], x[0])) cum = 0.0 for t, _score, pv_surplus_w in pv_candidates: if cum >= charge_target_wh: break selected.add(t) cum += min(pv_surplus_w, max_p_w) * eta * INTERVAL_H if purchase_pricing_mode == "fixed": return selected def _grid_b_ok(t: int) -> bool: s = slots[t] if max(0, s.pv_a_forecast_w + s.pv_b_forecast_w - s.load_baseline_w) > 0: return False buy = float(s.buy_price) sell = float(s.sell_price) if buy > ref_buy + degrad: return False nxt = _buy_min_next_n(slots, t) if nxt is not None and buy > nxt + _BUY_LOOKAHEAD_EPS: return False return True # B) AM am_candidates = [ (t, getattr(slots[t], "is_predicted_price", False), float(slots[t].buy_price)) for t in range(len(slots)) if t not in selected and _grid_b_ok(t) and _prague_hour(slots[t]) < 12 ] am_candidates.sort(key=lambda x: (int(x[1]), x[2], x[0])) cum = 0.0 grid_am = 0 for t, _pred, _price in am_candidates: if cum >= chg_am or per_slot_full_wh <= 0 or grid_am >= _GRID_CHARGE_CAP_AM: break selected.add(t) cum += per_slot_full_wh grid_am += 1 pm_candidates = [ (t, getattr(slots[t], "is_predicted_price", False), float(slots[t].buy_price)) for t in range(len(slots)) if t not in selected and _grid_b_ok(t) and _prague_hour(slots[t]) >= 12 ] pm_candidates.sort(key=lambda x: (int(x[1]), x[2], x[0])) cum = 0.0 grid_pm = 0 for t, _pred, _price in pm_candidates: if cum >= chg_pm or per_slot_full_wh <= 0 or grid_pm >= _GRID_CHARGE_CAP_PM: break selected.add(t) cum += per_slot_full_wh grid_pm += 1 return selected def _select_discharge_export_slots( slots: list[PlanningSlot], battery: SimpleNamespace, current_soc_wh: float, charge_slots: set[int] | None = None, *, purchase_pricing_mode: str = "spot", ) -> set[int]: """Kopie logiky z ems.fn_load_planning_slots_full (discharge-export mask).""" discharge_buf = float(getattr(battery, "discharge_slot_buffer", 0) or 0) if discharge_buf <= 0: return set(range(len(slots))) min_soc_wh = float(getattr(battery, "min_soc_wh", 0) or 0) soc_max_wh = float(battery.soc_max_wh) exportable_wh = soc_max_wh - min_soc_wh if exportable_wh <= 0: return set() degrad = float(getattr(battery, "degradation_cost_czk_kwh", 0.15) or 0.15) eta = float(getattr(battery, "discharge_efficiency", 1.0) or 1.0) max_p_w = float(getattr(battery, "max_discharge_power_w", 0.0) or 0.0) per_slot_wh = max_p_w * eta * INTERVAL_H discharge_target_wh = exportable_wh * discharge_buf ref_buy = min(float(s.buy_price) for s in slots) if purchase_pricing_mode == "fixed": sell_min = degrad else: sell_min = ref_buy + degrad candidates = [ (t, float(slots[t].sell_price)) for t in range(len(slots)) if float(slots[t].sell_price) > sell_min ] candidates.sort(key=lambda x: (-x[1], -x[0])) selected: set[int] = set() cum = 0.0 for t, _sell in candidates: if cum >= discharge_target_wh or per_slot_wh <= 0: break selected.add(t) cum += per_slot_wh return selected def _prague_hour(s: PlanningSlot) -> int: dt = s.interval_start if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(_PRAGUE).hour def _slot( *, buy: float, sell: float = 1.0, pv: int = 0, load: int = 2_000, hour_utc: int = 12, predicted: bool = False, interval_start: datetime | None = None, ) -> PlanningSlot: if interval_start is None: interval_start = datetime(2026, 5, 19, hour_utc, 0, tzinfo=timezone.utc) return PlanningSlot( interval_start=interval_start, buy_price=buy, sell_price=sell, pv_a_forecast_w=0, pv_b_forecast_w=pv, load_baseline_w=load, ev1_connected=False, ev2_connected=False, is_predicted_price=predicted, ) def _battery( *, charge_buf: float = 1.3, discharge_buf: float = 1.5, uc_wh: float = 64_000.0, soc_max_pct: float = 95.0, min_soc_pct: float = 10.0, reserve_soc_pct: float = 20.0, max_charge_w: float = 18_000.0, max_discharge_w: float = 18_000.0, charge_eff: float = 0.95, discharge_eff: float = 0.95, degrad: float = 0.15, ) -> SimpleNamespace: uc = uc_wh return SimpleNamespace( usable_capacity_wh=uc, min_soc_wh=min_soc_pct / 100.0 * uc, reserve_soc_wh=reserve_soc_pct / 100.0 * uc, soc_max_wh=soc_max_pct / 100.0 * uc, max_charge_power_w=max_charge_w, max_discharge_power_w=max_discharge_w, charge_efficiency=charge_eff, discharge_efficiency=discharge_eff, charge_slot_buffer=charge_buf, discharge_slot_buffer=discharge_buf, degradation_cost_czk_kwh=degrad, ) class SelectChargeSlotsTests(unittest.TestCase): def test_buffer_zero_returns_all_slots(self) -> None: slots = [_slot(buy=3.0) for _ in range(4)] battery = _battery(charge_buf=0.0) out = _select_charge_slots(slots, battery, current_soc_wh=0.0) self.assertEqual(out, set(range(4))) def test_returns_all_when_battery_is_full(self) -> None: slots = [_slot(buy=0.1) for _ in range(3)] battery = _battery() out = _select_charge_slots( slots, battery, current_soc_wh=battery.soc_max_wh + 1.0 ) self.assertEqual(out, set(range(3))) def test_pv_surplus_high_store_score_selected(self) -> None: """Slot s vyšším store_score (lepší uložení vs export) má přednost.""" slots = [ _slot(buy=1.5, sell=0.01, pv=8_000, load=2_000, hour_utc=8), _slot(buy=1.5, sell=0.50, pv=8_000, load=2_000, hour_utc=9), _slot(buy=0.5, sell=0.40, pv=8_000, load=2_000, hour_utc=10), ] battery = _battery( charge_buf=1.3, uc_wh=1_000.0, soc_max_pct=100.0, max_charge_w=6_000.0 ) out = _select_charge_slots(slots, battery, current_soc_wh=0.0) self.assertIn(2, out, "Slot s lepší marží (nižší buy) má být vybrán") self.assertNotIn(0, out, "Ztrátový sell≪buy slot nemá grid charge z masky A") def test_non_pv_slots_selected_with_am_pm_budget(self) -> None: """Levný PM slot; AM s dražším buy než min v lookahead může být vynechán.""" slots = [ _slot(buy=0.5, hour_utc=4), _slot(buy=3.0, hour_utc=5), _slot(buy=0.4, hour_utc=14), _slot(buy=9.9, hour_utc=15), ] battery = _battery( charge_buf=1.3, uc_wh=5_000.0, soc_max_pct=100.0, max_charge_w=18_000.0 ) out = _select_charge_slots(slots, battery, current_soc_wh=0.0) self.assertIn(2, out, "Nejlevnější buy v horizontu (PM) musí být vybrán") def test_vt_before_nt_skips_expensive_pm_slot(self) -> None: """Regrese home-01: 12:45 VT drahý, za 15 min NT levný → PM grid charge ne v 12:45.""" base = datetime(2026, 5, 21, 10, 45, tzinfo=timezone.utc) slots = [ _slot( buy=1.49, sell=-0.04, pv=0, load=3_500, interval_start=base, ), _slot( buy=0.86, sell=0.01, pv=0, load=3_500, interval_start=base + timedelta(minutes=15), ), _slot( buy=0.86, sell=0.01, pv=0, load=3_500, interval_start=base + timedelta(minutes=30), ), ] battery = _battery(uc_wh=64_000.0) soc = 0.31 * battery.usable_capacity_wh out = _select_charge_slots(slots, battery, current_soc_wh=soc) self.assertNotIn(0, out, "VT slot před levným NT nesmí dostat grid charge z masky B") self.assertIn(1, out, "NT slot může být vybrán") def test_ote_slots_prioritized_over_predicted(self) -> None: """Při stejné ceně má OTE (is_predicted=false) přednost před predikovaným.""" slots = [ _slot(buy=2.00, sell=2.0, hour_utc=13, predicted=False), _slot(buy=2.00, sell=2.0, hour_utc=13, predicted=True), ] battery = _battery( charge_buf=1.3, uc_wh=3_000.0, soc_max_pct=100.0, max_charge_w=18_000.0 ) out = _select_charge_slots(slots, battery, current_soc_wh=0.0) self.assertIn(0, out) self.assertNotIn(1, out) class SelectDischargeExportSlotsTests(unittest.TestCase): def test_evening_sell_allowed_when_cheaper_than_ref_charge_buy(self) -> None: slots = [ _slot(buy=0.50, sell=-0.30, hour_utc=6), _slot(buy=0.51, sell=-0.29, hour_utc=7), _slot(buy=5.60, sell=3.30, hour_utc=16), _slot(buy=5.98, sell=3.37, hour_utc=17), ] battery = _battery(uc_wh=64_000.0) charge = _select_charge_slots(slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh) discharge = _select_discharge_export_slots( slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh, charge_slots=charge ) self.assertIn(2, discharge) self.assertIn(3, discharge) def test_export_excluded_when_sell_below_ref_buy_plus_degradation(self) -> None: slots = [ _slot(buy=0.40, sell=0.10, hour_utc=8), _slot(buy=4.00, sell=0.50, hour_utc=18), ] battery = _battery(uc_wh=10_000.0, discharge_buf=2.0) discharge = _select_discharge_export_slots( slots, battery, current_soc_wh=0.0 ) self.assertNotIn(1, discharge) class FixedPurchasePricingTests(unittest.TestCase): def test_fixed_skips_non_pv_grid_charge_slots(self) -> None: slots = [ _slot(buy=6.35, sell=2.0, hour_utc=14, load=500), _slot(buy=6.35, sell=3.5, hour_utc=18, load=500), ] battery = _battery(charge_buf=1.3, uc_wh=12_500.0) out = _select_charge_slots( slots, battery, current_soc_wh=0.4 * battery.usable_capacity_wh, purchase_pricing_mode="fixed", ) self.assertEqual(out, set()) def test_fixed_allows_discharge_on_high_sell(self) -> None: slots = [ _slot(buy=6.35, sell=1.0, hour_utc=10), _slot(buy=6.35, sell=3.8, hour_utc=18), _slot(buy=6.35, sell=3.2, hour_utc=19), ] battery = _battery(uc_wh=12_500.0, discharge_buf=2.0, degrad=0.3) discharge = _select_discharge_export_slots( slots, battery, current_soc_wh=0.5 * battery.usable_capacity_wh, purchase_pricing_mode="fixed", ) self.assertIn(1, discharge) self.assertIn(2, discharge) if __name__ == "__main__": unittest.main()