"""Pre-selection nabíjecích a exportních slotů – referenční Python. Logika je v DB: ems.fn_load_planning_slots_full. Kopie algoritmu pro unit testy bez PG. Charge mask: A) PV-surplus: sell_price ASC, dokud PV nepokryje charge target. B) Non-PV: AM/PM 50/50, OTE-first, buy_price ASC. Discharge-export mask: ref_buy = min(buy) mezi allow_charge sloty (arbitráž mezi sloty, ne sell vs buy ve stejném). Top sloty dle sell_price desc kde sell > ref_buy + degradation. """ from __future__ import annotations import unittest from datetime import datetime, timezone, timedelta from types import SimpleNamespace from zoneinfo import ZoneInfo from services.planning_engine import INTERVAL_H, PlanningSlot _PRAGUE = ZoneInfo("Europe/Prague") def _select_charge_slots( slots: list[PlanningSlot], battery: SimpleNamespace, current_soc_wh: float, *, purchase_pricing_mode: str = "spot", ) -> set[int]: """Kopie logiky z ems.fn_load_planning_slots_full (charge mask).""" charge_buf = float(getattr(battery, "charge_slot_buffer", 0) or 0) if charge_buf <= 0: return set(range(len(slots))) energy_to_fill = float(battery.soc_max_wh) - float(current_soc_wh) if energy_to_fill <= 0: return set(range(len(slots))) eta = float(getattr(battery, "charge_efficiency", 1.0) or 1.0) max_p_w = float(getattr(battery, "max_charge_power_w", 0.0) or 0.0) per_slot_full_wh = max_p_w * eta * INTERVAL_H charge_target_wh = max(energy_to_fill, 0) * charge_buf # AM/PM budget n_am = sum(1 for s in slots if _prague_hour(s) < 12) n_pm = len(slots) - n_am if n_am <= 0: chg_am = 0.0 chg_pm = charge_target_wh elif n_pm <= 0: chg_am = charge_target_wh chg_pm = 0.0 else: chg_am = charge_target_wh / 2.0 chg_pm = charge_target_wh - chg_am selected: set[int] = set() # A) PV-surplus: cheapest sell_price first pv_candidates: list[tuple[int, float, float]] = [] for t, s in enumerate(slots): pv_surplus_w = max(0, s.pv_a_forecast_w + s.pv_b_forecast_w - s.load_baseline_w) if pv_surplus_w > 0: pv_candidates.append((t, float(s.sell_price), float(pv_surplus_w))) pv_candidates.sort(key=lambda x: (x[1], x[0])) cum = 0.0 for t, _sell, pv_surplus_w in pv_candidates: if cum >= charge_target_wh: break selected.add(t) cum += min(pv_surplus_w, max_p_w) * eta * INTERVAL_H # B) Non-PV grid charge — jen spot nákup (u fixed je buy všude stejný → jen FVE) if purchase_pricing_mode == "fixed": return selected # B) Non-PV: AM budget (OTE-first) am_candidates = [ (t, getattr(slots[t], "is_predicted_price", False), float(slots[t].buy_price)) for t in range(len(slots)) if t not in selected and max(0, slots[t].pv_a_forecast_w + slots[t].pv_b_forecast_w - slots[t].load_baseline_w) <= 0 and _prague_hour(slots[t]) < 12 ] am_candidates.sort(key=lambda x: (int(x[1]), x[2], x[0])) cum = 0.0 for t, _pred, _price in am_candidates: if cum >= chg_am or per_slot_full_wh <= 0: break selected.add(t) cum += per_slot_full_wh # B) Non-PV: PM budget (OTE-first) pm_candidates = [ (t, getattr(slots[t], "is_predicted_price", False), float(slots[t].buy_price)) for t in range(len(slots)) if t not in selected and max(0, slots[t].pv_a_forecast_w + slots[t].pv_b_forecast_w - slots[t].load_baseline_w) <= 0 and _prague_hour(slots[t]) >= 12 ] pm_candidates.sort(key=lambda x: (int(x[1]), x[2], x[0])) cum = 0.0 for t, _pred, _price in pm_candidates: if cum >= chg_pm or per_slot_full_wh <= 0: break selected.add(t) cum += per_slot_full_wh return selected def _select_discharge_export_slots( slots: list[PlanningSlot], battery: SimpleNamespace, current_soc_wh: float, charge_slots: set[int] | None = None, *, purchase_pricing_mode: str = "spot", ) -> set[int]: """Kopie logiky z ems.fn_load_planning_slots_full (discharge-export mask).""" discharge_buf = float(getattr(battery, "discharge_slot_buffer", 0) or 0) if discharge_buf <= 0: return set(range(len(slots))) min_soc_wh = float(getattr(battery, "min_soc_wh", 0) or 0) soc_max_wh = float(battery.soc_max_wh) exportable_wh = soc_max_wh - min_soc_wh if exportable_wh <= 0: return set() degrad = float(getattr(battery, "degradation_cost_czk_kwh", 0.15) or 0.15) eta = float(getattr(battery, "discharge_efficiency", 1.0) or 1.0) max_p_w = float(getattr(battery, "max_discharge_power_w", 0.0) or 0.0) per_slot_wh = max_p_w * eta * INTERVAL_H discharge_target_wh = exportable_wh * discharge_buf if charge_slots is None: charge_slots = _select_charge_slots(slots, battery, current_soc_wh) ref_buy = min( (float(slots[t].buy_price) for t in charge_slots), default=min(float(s.buy_price) for s in slots), ) if purchase_pricing_mode == "fixed": sell_min = degrad else: sell_min = ref_buy + degrad candidates = [ (t, float(slots[t].sell_price)) for t in range(len(slots)) if float(slots[t].sell_price) > sell_min ] candidates.sort(key=lambda x: (-x[1], -x[0])) selected: set[int] = set() cum = 0.0 for t, _sell in candidates: if cum >= discharge_target_wh or per_slot_wh <= 0: break selected.add(t) cum += per_slot_wh return selected def _prague_hour(s: PlanningSlot) -> int: dt = s.interval_start if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(_PRAGUE).hour def _slot( *, buy: float, sell: float = 1.0, pv: int = 0, load: int = 2_000, hour_utc: int = 12, predicted: bool = False, ) -> PlanningSlot: return PlanningSlot( interval_start=datetime(2026, 5, 19, hour_utc, 0, tzinfo=timezone.utc), buy_price=buy, sell_price=sell, pv_a_forecast_w=0, pv_b_forecast_w=pv, load_baseline_w=load, ev1_connected=False, ev2_connected=False, is_predicted_price=predicted, ) def _battery( *, charge_buf: float = 1.3, discharge_buf: float = 1.5, uc_wh: float = 64_000.0, soc_max_pct: float = 95.0, min_soc_pct: float = 10.0, max_charge_w: float = 18_000.0, max_discharge_w: float = 18_000.0, charge_eff: float = 0.95, discharge_eff: float = 0.95, degrad: float = 0.15, ) -> SimpleNamespace: uc = uc_wh return SimpleNamespace( usable_capacity_wh=uc, min_soc_wh=min_soc_pct / 100.0 * uc, soc_max_wh=soc_max_pct / 100.0 * uc, max_charge_power_w=max_charge_w, max_discharge_power_w=max_discharge_w, charge_efficiency=charge_eff, discharge_efficiency=discharge_eff, charge_slot_buffer=charge_buf, discharge_slot_buffer=discharge_buf, degradation_cost_czk_kwh=degrad, ) class SelectChargeSlotsTests(unittest.TestCase): def test_buffer_zero_returns_all_slots(self) -> None: slots = [_slot(buy=3.0) for _ in range(4)] battery = _battery(charge_buf=0.0) out = _select_charge_slots(slots, battery, current_soc_wh=0.0) self.assertEqual(out, set(range(4))) def test_returns_all_when_battery_is_full(self) -> None: slots = [_slot(buy=0.1) for _ in range(3)] battery = _battery() out = _select_charge_slots( slots, battery, current_soc_wh=battery.soc_max_wh + 1.0 ) self.assertEqual(out, set(range(3))) def test_pv_surplus_cheapest_sell_price_selected(self) -> None: """PV-surplus sloty s nejnižší sell_price se vybírají přednostně.""" slots = [ _slot(buy=1.0, sell=2.0, pv=8_000, load=2_000), _slot(buy=1.0, sell=5.0, pv=8_000, load=2_000), _slot(buy=1.0, sell=3.0, pv=8_000, load=2_000), ] battery = _battery( charge_buf=1.3, uc_wh=1_000.0, soc_max_pct=100.0, max_charge_w=6_000.0 ) out = _select_charge_slots(slots, battery, current_soc_wh=0.0) self.assertIn(0, out, "Cheapest sell_price PV slot must be selected") self.assertNotIn(1, out, "Expensive sell_price PV slot should be excluded") def test_non_pv_slots_selected_with_am_pm_budget(self) -> None: """Non-PV sloty se vybírají dle buy_price v rámci AM/PM rozpočtu.""" slots = [ _slot(buy=0.5, hour_utc=4), # AM slot, cheap _slot(buy=3.0, hour_utc=5), # AM slot, expensive _slot(buy=0.4, hour_utc=14), # PM slot, cheap _slot(buy=9.9, hour_utc=15), # PM slot, expensive ] battery = _battery( charge_buf=1.3, uc_wh=5_000.0, soc_max_pct=100.0, max_charge_w=18_000.0 ) out = _select_charge_slots(slots, battery, current_soc_wh=0.0) self.assertIn(0, out, "Cheapest AM slot must be selected") self.assertIn(2, out, "Cheapest PM slot must be selected") def test_ote_slots_prioritized_over_predicted(self) -> None: """OTE sloty (is_predicted_price=false) mají přednost před predikovanými.""" slots = [ _slot(buy=3.56, hour_utc=13, predicted=False), # OTE, dražší _slot(buy=2.00, hour_utc=13, predicted=True), # predicted, levnější ] battery = _battery( charge_buf=1.3, uc_wh=3_000.0, soc_max_pct=100.0, max_charge_w=18_000.0 ) out = _select_charge_slots(slots, battery, current_soc_wh=0.0) self.assertIn(0, out, "OTE slot must be selected even if pricier than predicted") def test_does_not_exclude_slot_just_because_pv_below_load(self) -> None: """Regrese: sloty bez PV-surplus se vybírají přes AM/PM grid budget.""" slots = [ _slot(buy=0.4, pv=3_320, load=3_747, hour_utc=13), _slot(buy=0.42, pv=2_116, load=3_747, hour_utc=13), _slot(buy=0.44, pv=1_649, load=3_747, hour_utc=13), _slot(buy=0.47, pv=1_276, load=3_747, hour_utc=13), ] battery = _battery() out = _select_charge_slots(slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh) for idx in (0, 1, 2, 3): self.assertIn(idx, out) def test_long_horizon_pv_surplus_does_not_exhaust_grid_budget(self) -> None: """Regrese: v 96h horizontu nesmí PV-surplus sloty „vyžrat" grid rozpočet.""" cheap_grid = [_slot(buy=0.4 + 0.01 * i, pv=0, load=2_000) for i in range(40)] pv_days = [_slot(buy=1.5, sell=1.5, pv=10_000, load=2_000) for _ in range(100)] slots = cheap_grid + pv_days battery = _battery( charge_buf=1.3, uc_wh=64_000.0, soc_max_pct=95.0, max_charge_w=18_000.0 ) out = _select_charge_slots(slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh) grid_selected = sum(1 for i in range(len(cheap_grid)) if i in out) self.assertGreaterEqual( grid_selected, 5, "V dlouhém horizontu s mnoha PV-surplus sloty musí zůstat dostatek " "grid slotů povolených pro nabíjení z levného importu.", ) class SelectDischargeExportSlotsTests(unittest.TestCase): def test_evening_sell_allowed_when_cheaper_than_ref_charge_buy(self) -> None: """Regrese home-01: večer sell 3.3 > ref_buy 0.5 + degrad i když buy ve slotu je 5.6.""" slots = [ _slot(buy=0.50, sell=-0.30, hour_utc=6), _slot(buy=0.51, sell=-0.29, hour_utc=7), _slot(buy=5.60, sell=3.30, hour_utc=16), _slot(buy=5.98, sell=3.37, hour_utc=17), ] battery = _battery(uc_wh=64_000.0) charge = _select_charge_slots(slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh) discharge = _select_discharge_export_slots( slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh, charge_slots=charge ) self.assertIn(0, charge) self.assertIn(2, discharge, "Evening sell must qualify vs ref buy, not same-slot buy") self.assertIn(3, discharge) def test_export_excluded_when_sell_below_ref_buy_plus_degradation(self) -> None: slots = [ _slot(buy=0.40, sell=0.10, hour_utc=8), _slot(buy=4.00, sell=0.50, hour_utc=18), ] battery = _battery(uc_wh=10_000.0, discharge_buf=2.0) charge = _select_charge_slots(slots, battery, current_soc_wh=0.0) discharge = _select_discharge_export_slots( slots, battery, current_soc_wh=0.0, charge_slots=charge ) self.assertNotIn(1, discharge, "sell 0.5 < ref 0.4 + 0.15") class FixedPurchasePricingTests(unittest.TestCase): """purchase_pricing_mode=fixed: žádné grid CHARGE, export dle sell.""" def test_fixed_skips_non_pv_grid_charge_slots(self) -> None: slots = [ _slot(buy=6.35, sell=2.0, hour_utc=14, load=500), _slot(buy=6.35, sell=3.5, hour_utc=18, load=500), ] battery = _battery(charge_buf=1.3, uc_wh=12_500.0) out = _select_charge_slots( slots, battery, current_soc_wh=0.4 * battery.usable_capacity_wh, purchase_pricing_mode="fixed", ) self.assertEqual(out, set(), "fixed buy must not enable non-PV grid charge") def test_fixed_allows_discharge_on_high_sell(self) -> None: slots = [ _slot(buy=6.35, sell=1.0, hour_utc=10), _slot(buy=6.35, sell=3.8, hour_utc=18), _slot(buy=6.35, sell=3.2, hour_utc=19), ] battery = _battery(uc_wh=12_500.0, discharge_buf=2.0, degrad=0.3) discharge = _select_discharge_export_slots( slots, battery, current_soc_wh=0.5 * battery.usable_capacity_wh, purchase_pricing_mode="fixed", ) self.assertIn(1, discharge) self.assertIn(2, discharge) if __name__ == "__main__": unittest.main()