"""Pre-selection nabíjecích a exportních slotů – referenční Python. Logika je v DB: ems.fn_load_planning_slots_full. Kopie algoritmu pro unit testy bez PG. Charge mask: B) Grid AM/PM: nejlevnější sloty do Wh rozpočtu (den plánu → před exportním oknem → buy ASC). A) PV-surplus: store_score DESC; jen pokud sell ≥ future_sell − degrad. Discharge-export mask: ref_buy = min(buy) celého horizontu. Top sloty dle sell_price desc kde sell > ref_buy + degradation. """ from __future__ import annotations import unittest from datetime import date, datetime, timezone, timedelta from types import SimpleNamespace from zoneinfo import ZoneInfo from services.planning_engine import INTERVAL_H, PlanningSlot _PRAGUE = ZoneInfo("Europe/Prague") _LOOKAHEAD_SLOTS = 4 _BUY_LOOKAHEAD_EPS = 0.05 _BUY_CHARGE_BAND = 0.40 _MAX_GRID_CHARGE_CAP = 24 def _prague_date(s: PlanningSlot) -> date: return s.interval_start.astimezone(_PRAGUE).date() def _export_window_start(slots: list[PlanningSlot], degrad: float) -> datetime | None: """Kopie R__063: sell > min(buy) téhož kalendářního dne (Prague) + degrad.""" result: datetime | None = None for s in slots: day = _prague_date(s) day_min = min( float(x.buy_price) for x in slots if _prague_date(x) == day ) if float(s.sell_price) > day_min + degrad: if result is None or s.interval_start < result: result = s.interval_start return result def _future_sell(slots: list[PlanningSlot], t: int) -> float: tail = [float(slots[i].sell_price) for i in range(t + 1, len(slots))] return max(tail) if tail else float(slots[t].sell_price) def _buy_min_next_n( slots: list[PlanningSlot], t: int, n: int = _LOOKAHEAD_SLOTS, *, export_window_start: datetime | None = None, ) -> float | None: tail = [ float(slots[i].buy_price) for i in range(t + 1, min(t + 1 + n, len(slots))) if export_window_start is None or slots[i].interval_start < export_window_start ] return min(tail) if tail else None def _store_score(slots: list[PlanningSlot], t: int) -> float: s = slots[t] buy = float(s.buy_price) sell = float(s.sell_price) fso = _future_sell(slots, t) return fso - sell - max(0.0, buy - sell) def _select_charge_slots( slots: list[PlanningSlot], battery: SimpleNamespace, current_soc_wh: float, *, purchase_pricing_mode: str = "spot", ) -> set[int]: """Kopie logiky z ems.fn_load_planning_slots_full (charge mask).""" charge_buf = float(getattr(battery, "charge_slot_buffer", 0) or 0) if charge_buf <= 0: return set(range(len(slots))) energy_to_fill = float(battery.soc_max_wh) - float(current_soc_wh) if energy_to_fill <= 0: return set(range(len(slots))) reserve_wh = float(getattr(battery, "reserve_soc_wh", 0) or 0) degrad = float(getattr(battery, "degradation_cost_czk_kwh", 0.15) or 0.15) ref_buy_am = min( (float(s.buy_price) for s in slots if _prague_hour(s) < 12), default=min(float(s.buy_price) for s in slots), ) ref_buy_pm = min( (float(s.buy_price) for s in slots if _prague_hour(s) >= 12), default=min(float(s.buy_price) for s in slots), ) ref_buy_global = min(float(s.buy_price) for s in slots) export_window_start = _export_window_start(slots, degrad) plan_day = _prague_date(slots[0]) eta = float(getattr(battery, "charge_efficiency", 1.0) or 1.0) max_p_w = float(getattr(battery, "max_charge_power_w", 0.0) or 0.0) per_slot_full_wh = max_p_w * eta * INTERVAL_H soc_max_wh = float(getattr(battery, "soc_max_wh", 0) or 0) if charge_buf > 0: charge_target_wh = min( max(energy_to_fill, 0.0) * charge_buf, max(soc_max_wh - float(current_soc_wh), 0.0), ) elif current_soc_wh >= reserve_wh: charge_target_wh = max(energy_to_fill, 0.0) else: charge_target_wh = min( max(energy_to_fill, 0.0) * charge_buf, max(energy_to_fill, 0.0), ) n_am = sum(1 for s in slots if _prague_hour(s) < 12) n_pm = len(slots) - n_am if n_am <= 0: chg_am = 0.0 chg_pm = charge_target_wh elif n_pm <= 0: chg_am = charge_target_wh chg_pm = 0.0 else: chg_am = charge_target_wh / 2.0 chg_pm = charge_target_wh - chg_am selected: set[int] = set() grid_filled_wh = 0.0 buf_mult = charge_buf if charge_buf > 0 else 1.0 cap_am = ( max( 1, min( _MAX_GRID_CHARGE_CAP, int(chg_am / per_slot_full_wh * buf_mult) + 1, ), ) if per_slot_full_wh > 0 else 6 ) cap_pm = ( max( 1, min( _MAX_GRID_CHARGE_CAP, int(chg_pm / per_slot_full_wh * buf_mult) + 1, ), ) if per_slot_full_wh > 0 else 6 ) def _grid_sort_key(t: int, pred: bool, price: float) -> tuple[int, int, int, float, int]: today_first = 0 if _prague_date(slots[t]) == plan_day else 1 before_export = ( 0 if export_window_start is not None and slots[t].interval_start < export_window_start else 1 ) return (today_first, before_export, int(pred), price, t) if purchase_pricing_mode != "fixed": am_candidates = [ (t, getattr(slots[t], "is_predicted_price", False), float(slots[t].buy_price)) for t in range(len(slots)) if _prague_hour(slots[t]) < 12 ] am_candidates.sort(key=lambda x: _grid_sort_key(x[0], x[1], x[2])) cum = 0.0 grid_am = 0 for t, _pred, _price in am_candidates: if cum >= chg_am or per_slot_full_wh <= 0 or grid_am >= cap_am: break selected.add(t) cum += per_slot_full_wh grid_am += 1 grid_filled_wh += cum chg_pm = max(chg_pm, charge_target_wh - grid_filled_wh) if per_slot_full_wh > 0: cap_pm = max( cap_pm, min( _MAX_GRID_CHARGE_CAP, int(chg_pm / per_slot_full_wh * buf_mult) + 1, ), ) pm_candidates = [ (t, getattr(slots[t], "is_predicted_price", False), float(slots[t].buy_price)) for t in range(len(slots)) if _prague_hour(slots[t]) >= 12 ] pm_candidates.sort(key=lambda x: _grid_sort_key(x[0], x[1], x[2])) cum = 0.0 grid_pm = 0 for t, _pred, _price in pm_candidates: if cum >= chg_pm or per_slot_full_wh <= 0 or grid_pm >= cap_pm: break selected.add(t) cum += per_slot_full_wh grid_pm += 1 grid_filled_wh += cum pv_layer_cap = max(charge_target_wh - grid_filled_wh, 0.0) pv_candidates: list[tuple[int, float, float]] = [] for t, s in enumerate(slots): pv_surplus_w = max(0, s.pv_a_forecast_w + s.pv_b_forecast_w - s.load_baseline_w) fso = _future_sell(slots, t) if ( pv_surplus_w > 0 and float(s.sell_price) >= float(s.buy_price) - degrad and float(s.sell_price) >= fso - degrad ): pv_candidates.append((t, _store_score(slots, t), float(pv_surplus_w))) pv_candidates.sort(key=lambda x: (-x[1], x[0])) cum = 0.0 for t, _score, pv_surplus_w in pv_candidates: if cum >= pv_layer_cap: break selected.add(t) cum += min(pv_surplus_w, max_p_w) * eta * INTERVAL_H return selected def _select_discharge_export_slots( slots: list[PlanningSlot], battery: SimpleNamespace, current_soc_wh: float, charge_slots: set[int] | None = None, *, purchase_pricing_mode: str = "spot", ) -> set[int]: """Kopie logiky z ems.fn_load_planning_slots_full (discharge-export mask).""" discharge_buf = float(getattr(battery, "discharge_slot_buffer", 0) or 0) if discharge_buf <= 0: return set(range(len(slots))) min_soc_wh = float(getattr(battery, "min_soc_wh", 0) or 0) soc_max_wh = float(battery.soc_max_wh) exportable_wh = soc_max_wh - min_soc_wh if exportable_wh <= 0: return set() degrad = float(getattr(battery, "degradation_cost_czk_kwh", 0.15) or 0.15) eta = float(getattr(battery, "discharge_efficiency", 1.0) or 1.0) max_p_w = float(getattr(battery, "max_discharge_power_w", 0.0) or 0.0) per_slot_wh = max_p_w * eta * INTERVAL_H discharge_target_wh = exportable_wh * discharge_buf ref_buy = min(float(s.buy_price) for s in slots) if purchase_pricing_mode == "fixed": sell_min = degrad else: sell_min = ref_buy + degrad candidates = [ (t, float(slots[t].sell_price)) for t in range(len(slots)) if float(slots[t].sell_price) > sell_min ] candidates.sort(key=lambda x: (-x[1], -x[0])) selected: set[int] = set() cum = 0.0 for t, _sell in candidates: if cum >= discharge_target_wh or per_slot_wh <= 0: break selected.add(t) cum += per_slot_wh return selected def _prague_hour(s: PlanningSlot) -> int: dt = s.interval_start if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(_PRAGUE).hour def _slot( *, buy: float, sell: float = 1.0, pv: int = 0, load: int = 2_000, hour_utc: int = 12, predicted: bool = False, interval_start: datetime | None = None, ) -> PlanningSlot: if interval_start is None: interval_start = datetime(2026, 5, 19, hour_utc, 0, tzinfo=timezone.utc) return PlanningSlot( interval_start=interval_start, buy_price=buy, sell_price=sell, pv_a_forecast_w=0, pv_b_forecast_w=pv, load_baseline_w=load, ev1_connected=False, ev2_connected=False, is_predicted_price=predicted, ) def _battery( *, charge_buf: float = 1.3, discharge_buf: float = 1.5, uc_wh: float = 64_000.0, soc_max_pct: float = 95.0, min_soc_pct: float = 10.0, reserve_soc_pct: float = 20.0, max_charge_w: float = 18_000.0, max_discharge_w: float = 18_000.0, charge_eff: float = 0.95, discharge_eff: float = 0.95, degrad: float = 0.15, ) -> SimpleNamespace: uc = uc_wh return SimpleNamespace( usable_capacity_wh=uc, min_soc_wh=min_soc_pct / 100.0 * uc, reserve_soc_wh=reserve_soc_pct / 100.0 * uc, soc_max_wh=soc_max_pct / 100.0 * uc, max_charge_power_w=max_charge_w, max_discharge_power_w=max_discharge_w, charge_efficiency=charge_eff, discharge_efficiency=discharge_eff, charge_slot_buffer=charge_buf, discharge_slot_buffer=discharge_buf, degradation_cost_czk_kwh=degrad, ) class SelectChargeSlotsTests(unittest.TestCase): def test_buffer_zero_returns_all_slots(self) -> None: slots = [_slot(buy=3.0) for _ in range(4)] battery = _battery(charge_buf=0.0) out = _select_charge_slots(slots, battery, current_soc_wh=0.0) self.assertEqual(out, set(range(4))) def test_returns_all_when_battery_is_full(self) -> None: slots = [_slot(buy=0.1) for _ in range(3)] battery = _battery() out = _select_charge_slots( slots, battery, current_soc_wh=battery.soc_max_wh + 1.0 ) self.assertEqual(out, set(range(3))) def test_pv_surplus_high_store_score_selected(self) -> None: """Slot s vyšším store_score (lepší uložení vs export) má přednost.""" slots = [ _slot(buy=1.5, sell=0.01, pv=8_000, load=2_000, hour_utc=8), _slot(buy=1.5, sell=0.50, pv=8_000, load=2_000, hour_utc=9), _slot(buy=0.5, sell=0.40, pv=8_000, load=2_000, hour_utc=10), ] battery = _battery( charge_buf=1.3, uc_wh=1_000.0, soc_max_pct=100.0, max_charge_w=6_000.0 ) out = _select_charge_slots(slots, battery, current_soc_wh=0.0) self.assertIn(2, out, "Nejlevnější buy (grid B) má být vybrán") self.assertNotIn(1, out, "Dražší AM slot (buy 1.5) nemá přednost před levným buy 0.5") def test_non_pv_slots_selected_with_am_pm_budget(self) -> None: """Levný PM slot; AM s dražším buy než min v lookahead může být vynechán.""" slots = [ _slot(buy=0.5, hour_utc=4), _slot(buy=3.0, hour_utc=5), _slot(buy=0.4, hour_utc=14), _slot(buy=9.9, hour_utc=15), ] battery = _battery( charge_buf=1.3, uc_wh=5_000.0, soc_max_pct=100.0, max_charge_w=18_000.0 ) out = _select_charge_slots(slots, battery, current_soc_wh=0.0) self.assertIn(2, out, "Nejlevnější buy v horizontu (PM) musí být vybrán") def test_pm_grid_prefers_today_before_export_over_tomorrow_cheaper(self) -> None: """Dnes PM levné před večerním exportem má prioritu před zítřejším min(buy).""" base = datetime(2026, 5, 21, 10, 0, tzinfo=timezone.utc) slots = [ _slot(buy=0.72, sell=-0.1, pv=500, load=3000, interval_start=base), _slot(buy=0.68, sell=-0.15, pv=500, load=3000, interval_start=base + timedelta(minutes=15)), _slot( buy=5.5, sell=3.8, pv=0, load=2500, interval_start=base + timedelta(hours=7, minutes=30), ), _slot( buy=0.50, sell=-0.3, pv=2000, load=5000, interval_start=base + timedelta(hours=26), ), ] battery = _battery(uc_wh=64_000.0) out = _select_charge_slots(slots, battery, current_soc_wh=0.46 * battery.usable_capacity_wh) self.assertIn(0, out) self.assertIn(1, out) self.assertEqual( min(out), 0, "Před exportním oknem musí být vybrány dnešní levné PM sloty dřív než zítřejší min(buy)", ) def test_cheap_pm_with_pv_surplus_gets_grid_charge(self) -> None: """Regrese home-01: levné PM VT (~0,8) i s FVE musí projít grid maskou B.""" base = datetime(2026, 5, 21, 10, 0, tzinfo=timezone.utc) slots = [ _slot( buy=0.80, sell=-0.08, pv=2_500, load=3_400, interval_start=base, ), _slot( buy=0.72, sell=-0.13, pv=500, load=3_400, interval_start=base + timedelta(minutes=15), ), _slot( buy=2.50, sell=1.40, pv=2_000, load=3_800, interval_start=base + timedelta(hours=5), ), _slot( buy=5.50, sell=3.80, pv=100, load=2_900, interval_start=base + timedelta(hours=9), ), ] battery = _battery(uc_wh=64_000.0, charge_buf=1.05) soc = 0.88 * battery.usable_capacity_wh out = _select_charge_slots(slots, battery, current_soc_wh=soc) self.assertIn(1, out, "Levnější PM slot má allow_charge i s FVE") self.assertIn(0, out) self.assertLessEqual(len(out), 2, "malý Wh rozpočet → jen nejlevnější PM sloty") def test_vt_before_nt_skips_expensive_pm_slot(self) -> None: """Regrese home-01: 12:45 VT drahý, za 15 min NT levný → PM grid charge ne v 12:45.""" base = datetime(2026, 5, 21, 10, 45, tzinfo=timezone.utc) slots = [ _slot( buy=1.49, sell=-0.04, pv=0, load=3_500, interval_start=base, ), _slot( buy=0.86, sell=0.01, pv=0, load=3_500, interval_start=base + timedelta(minutes=15), ), _slot( buy=0.86, sell=0.01, pv=0, load=3_500, interval_start=base + timedelta(minutes=30), ), ] battery = _battery(uc_wh=64_000.0, charge_buf=1.0) soc = 0.92 * battery.usable_capacity_wh out = _select_charge_slots(slots, battery, current_soc_wh=soc) self.assertNotIn(0, out, "Při malém rozpočtu má přednost levnější NT, ne VT 1.49") self.assertTrue({1, 2} & out, "NT slot(y) mohou být vybrány") def test_pm_grid_gets_unused_am_wh_budget(self) -> None: """Nečerpaný AM rozpočet → odpolední levné PM sloty mohou dostat allow_charge.""" base = datetime(2026, 5, 22, 6, 0, tzinfo=timezone.utc) slots = [ _slot(buy=0.55, sell=-0.2, hour_utc=6, interval_start=base), _slot(buy=0.58, sell=-0.2, hour_utc=7, interval_start=base + timedelta(hours=1)), _slot(buy=0.52, sell=-0.25, hour_utc=14, interval_start=base + timedelta(hours=8)), _slot(buy=0.50, sell=-0.25, hour_utc=15, interval_start=base + timedelta(hours=9)), _slot(buy=5.5, sell=3.8, hour_utc=20, interval_start=base + timedelta(hours=14)), ] battery = _battery(charge_buf=1.3, uc_wh=64_000.0) out = _select_charge_slots(slots, battery, current_soc_wh=0.12 * battery.usable_capacity_wh) pm_cheap = {2, 3} self.assertTrue( pm_cheap & out, "po levném AM má PM dostat grid charge z nevyčerpaného rozpočtu", ) def test_ote_slots_prioritized_over_predicted(self) -> None: """Při stejné ceně má OTE (is_predicted=false) přednost před predikovaným.""" slots = [ _slot(buy=2.00, sell=2.0, hour_utc=13, predicted=False), _slot(buy=2.00, sell=2.0, hour_utc=13, predicted=True), ] battery = _battery( charge_buf=1.3, uc_wh=3_000.0, soc_max_pct=100.0, max_charge_w=18_000.0 ) out = _select_charge_slots(slots, battery, current_soc_wh=0.0) self.assertIn(0, out) self.assertNotIn(1, out) class SelectDischargeExportSlotsTests(unittest.TestCase): def test_evening_sell_allowed_when_cheaper_than_ref_charge_buy(self) -> None: slots = [ _slot(buy=0.50, sell=-0.30, hour_utc=6), _slot(buy=0.51, sell=-0.29, hour_utc=7), _slot(buy=5.60, sell=3.30, hour_utc=16), _slot(buy=5.98, sell=3.37, hour_utc=17), ] battery = _battery(uc_wh=64_000.0) charge = _select_charge_slots(slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh) discharge = _select_discharge_export_slots( slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh, charge_slots=charge ) self.assertIn(2, discharge) self.assertIn(3, discharge) def test_export_excluded_when_sell_below_ref_buy_plus_degradation(self) -> None: slots = [ _slot(buy=0.40, sell=0.10, hour_utc=8), _slot(buy=4.00, sell=0.50, hour_utc=18), ] battery = _battery(uc_wh=10_000.0, discharge_buf=2.0) discharge = _select_discharge_export_slots( slots, battery, current_soc_wh=0.0 ) self.assertNotIn(1, discharge) class FixedPurchasePricingTests(unittest.TestCase): def test_fixed_skips_non_pv_grid_charge_slots(self) -> None: slots = [ _slot(buy=6.35, sell=2.0, hour_utc=14, load=500), _slot(buy=6.35, sell=3.5, hour_utc=18, load=500), ] battery = _battery(charge_buf=1.3, uc_wh=12_500.0) out = _select_charge_slots( slots, battery, current_soc_wh=0.4 * battery.usable_capacity_wh, purchase_pricing_mode="fixed", ) self.assertEqual(out, set()) def test_fixed_allows_discharge_on_high_sell(self) -> None: slots = [ _slot(buy=6.35, sell=1.0, hour_utc=10), _slot(buy=6.35, sell=3.8, hour_utc=18), _slot(buy=6.35, sell=3.2, hour_utc=19), ] battery = _battery(uc_wh=12_500.0, discharge_buf=2.0, degrad=0.3) discharge = _select_discharge_export_slots( slots, battery, current_soc_wh=0.5 * battery.usable_capacity_wh, purchase_pricing_mode="fixed", ) self.assertIn(1, discharge) self.assertIn(2, discharge) if __name__ == "__main__": unittest.main()