"""Charge-slot pre-selection (anti-micro-cycling) – Python reference.

The production logic lives in the database: ems.fn_load_planning_slots_full.
This file keeps a copy of the algorithm for fast unit tests without
PostgreSQL.

Algorithm (current):
  A) PV-surplus slots (pv_surplus > 0): ranked by sell_price ASC; the
     cheapest ones are selected until the cumulative PV surplus covers the
     charge target (energy_to_fill × charge_buf). The rest → PV to the grid.
  B) Non-PV slots (pv_surplus <= 0): AM/PM budget split 50/50, OTE-first
     priority (is_predicted_price=false before true), then sorted by
     buy_price ASC.
"""
from __future__ import annotations

import unittest
from datetime import datetime, timezone, timedelta
from types import SimpleNamespace
from zoneinfo import ZoneInfo

from services.planning_engine import INTERVAL_H, PlanningSlot

_PRAGUE = ZoneInfo("Europe/Prague")


def _pv_surplus_w(slot: PlanningSlot) -> float:
    """Return the forecast PV surplus of *slot* in watts, floored at zero.

    The surplus is both PV forecasts added together minus the baseline load.
    """
    return max(
        0,
        slot.pv_a_forecast_w + slot.pv_b_forecast_w - slot.load_baseline_w,
    )


def _pick_cheapest_grid_slots(
    slots: list[PlanningSlot],
    candidates: list[int],
    budget_wh: float,
    per_slot_wh: float,
) -> list[int]:
    """Pick grid-charging slots from *candidates* until *budget_wh* is covered.

    Candidates are ranked OTE-first (``is_predicted_price`` false before
    true), then by ``buy_price`` ascending, then by slot index. Every picked
    slot is counted as contributing ``per_slot_wh``. Nothing is picked when
    ``per_slot_wh`` is not positive or the budget is already covered.
    """
    ranked = sorted(
        candidates,
        key=lambda t: (
            int(getattr(slots[t], "is_predicted_price", False)),
            float(slots[t].buy_price),
            t,
        ),
    )
    picked: list[int] = []
    covered_wh = 0.0
    for t in ranked:
        if covered_wh >= budget_wh or per_slot_wh <= 0:
            break
        picked.append(t)
        covered_wh += per_slot_wh
    return picked


def _select_charge_slots(
    slots: list[PlanningSlot],
    battery: SimpleNamespace,
    current_soc_wh: float,
) -> set[int]:
    """Return the indices of the slots in which charging is allowed.

    Copy of the logic in ems.fn_load_planning_slots_full (charge mask).

    Args:
        slots: Planning slots; the returned indices refer to this list.
        battery: Object providing ``soc_max_wh`` and, optionally,
            ``charge_slot_buffer``, ``charge_efficiency`` and
            ``max_charge_power_w`` (missing or falsy values fall back to
            0, 1.0 and 0.0 respectively).
        current_soc_wh: Current state of charge in Wh.

    Returns:
        All slot indices when the buffer is not positive or there is nothing
        to fill; otherwise the union of the selected PV-surplus slots and the
        selected AM/PM grid slots.
    """
    charge_buf = float(getattr(battery, "charge_slot_buffer", 0) or 0)
    if charge_buf <= 0:
        # Pre-selection disabled: every slot may charge.
        return set(range(len(slots)))

    energy_to_fill = float(battery.soc_max_wh) - float(current_soc_wh)
    if energy_to_fill <= 0:
        # Battery already at (or above) its maximum SoC: no restriction.
        return set(range(len(slots)))

    eta = float(getattr(battery, "charge_efficiency", 1.0) or 1.0)
    max_p_w = float(getattr(battery, "max_charge_power_w", 0.0) or 0.0)
    per_slot_full_wh = max_p_w * eta * INTERVAL_H
    # energy_to_fill is strictly positive here, so no extra clamping needed.
    charge_target_wh = energy_to_fill * charge_buf

    # Per-slot values are computed once and reused by all passes below.
    hours = [_prague_hour(s) for s in slots]
    surplus_w = [_pv_surplus_w(s) for s in slots]

    # AM/PM budget: split 50/50, or give everything to the half that exists.
    n_am = sum(1 for h in hours if h < 12)
    n_pm = len(slots) - n_am
    if n_am <= 0:
        chg_am = 0.0
        chg_pm = charge_target_wh
    elif n_pm <= 0:
        chg_am = charge_target_wh
        chg_pm = 0.0
    else:
        chg_am = charge_target_wh / 2.0
        chg_pm = charge_target_wh - chg_am

    selected: set[int] = set()

    # A) PV-surplus: cheapest sell_price first, ties broken by slot index.
    pv_ranked = sorted(
        (t for t, w in enumerate(surplus_w) if w > 0),
        key=lambda t: (float(slots[t].sell_price), t),
    )
    covered_wh = 0.0
    for t in pv_ranked:
        if covered_wh >= charge_target_wh:
            break
        selected.add(t)
        # Usable surplus is capped by the maximum charge power.
        covered_wh += min(float(surplus_w[t]), max_p_w) * eta * INTERVAL_H

    # B) Non-PV: separate AM and PM budgets (OTE-first). Non-PV slots have no
    # surplus, so they can never already be in `selected` from pass A.
    am_candidates = [
        t for t, h in enumerate(hours) if surplus_w[t] <= 0 and h < 12
    ]
    pm_candidates = [
        t for t, h in enumerate(hours) if surplus_w[t] <= 0 and h >= 12
    ]
    selected.update(
        _pick_cheapest_grid_slots(slots, am_candidates, chg_am, per_slot_full_wh)
    )
    selected.update(
        _pick_cheapest_grid_slots(slots, pm_candidates, chg_pm, per_slot_full_wh)
    )

    return selected


def _prague_hour(s: PlanningSlot) -> int:
    """Return the hour of ``s.interval_start`` in the Europe/Prague zone.

    A naive ``interval_start`` is interpreted as UTC.
    """
    start = s.interval_start
    if start.tzinfo is None:
        start = start.replace(tzinfo=timezone.utc)
    return start.astimezone(_PRAGUE).hour


def _slot(
    *,
    buy: float,
    sell: float = 1.0,
    pv: int = 0,
    load: int = 2_000,
    hour_utc: int = 12,
    predicted: bool = False,
) -> PlanningSlot:
    """Build a test slot on 2026-05-19 starting at ``hour_utc``:00 UTC.

    ``pv`` is assigned to ``pv_b_forecast_w``; ``pv_a_forecast_w`` is 0 and
    both EVs are disconnected.
    """
    return PlanningSlot(
        interval_start=datetime(2026, 5, 19, hour_utc, 0, tzinfo=timezone.utc),
        buy_price=buy,
        sell_price=sell,
        pv_a_forecast_w=0,
        pv_b_forecast_w=pv,
        load_baseline_w=load,
        ev1_connected=False,
        ev2_connected=False,
        is_predicted_price=predicted,
    )


def _battery(
    *,
    charge_buf: float = 1.3,
    uc_wh: float = 64_000.0,
    soc_max_pct: float = 95.0,
    max_charge_w: float = 18_000.0,
    charge_eff: float = 0.95,
) -> SimpleNamespace:
    """Build a test battery; ``soc_max_wh`` is ``soc_max_pct`` % of ``uc_wh``."""
    return SimpleNamespace(
        usable_capacity_wh=uc_wh,
        soc_max_wh=soc_max_pct / 100.0 * uc_wh,
        max_charge_power_w=max_charge_w,
        charge_efficiency=charge_eff,
        charge_slot_buffer=charge_buf,
    )


class SelectChargeSlotsTests(unittest.TestCase):
    """Unit tests for the Python reference copy of the charge-slot selection."""

    def test_buffer_zero_returns_all_slots(self) -> None:
        """A zero charge buffer disables pre-selection."""
        slots = [_slot(buy=3.0) for _ in range(4)]
        battery = _battery(charge_buf=0.0)
        out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
        self.assertEqual(out, set(range(4)))

    def test_returns_all_when_battery_is_full(self) -> None:
        """With nothing to fill, every slot is returned."""
        slots = [_slot(buy=0.1) for _ in range(3)]
        battery = _battery()
        out = _select_charge_slots(
            slots, battery, current_soc_wh=battery.soc_max_wh + 1.0
        )
        self.assertEqual(out, set(range(3)))

    def test_pv_surplus_cheapest_sell_price_selected(self) -> None:
        """PV-surplus slots with the lowest sell_price are selected first."""
        slots = [
            _slot(buy=1.0, sell=2.0, pv=8_000, load=2_000),
            _slot(buy=1.0, sell=5.0, pv=8_000, load=2_000),
            _slot(buy=1.0, sell=3.0, pv=8_000, load=2_000),
        ]
        battery = _battery(
            charge_buf=1.3, uc_wh=1_000.0, soc_max_pct=100.0, max_charge_w=6_000.0
        )
        out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
        self.assertIn(0, out, "Cheapest sell_price PV slot must be selected")
        self.assertNotIn(1, out, "Expensive sell_price PV slot should be excluded")

    def test_non_pv_slots_selected_with_am_pm_budget(self) -> None:
        """Non-PV slots are selected by buy_price within the AM/PM budget."""
        slots = [
            _slot(buy=0.5, hour_utc=4),  # AM slot, cheap
            _slot(buy=3.0, hour_utc=5),  # AM slot, expensive
            _slot(buy=0.4, hour_utc=14),  # PM slot, cheap
            _slot(buy=9.9, hour_utc=15),  # PM slot, expensive
        ]
        battery = _battery(
            charge_buf=1.3, uc_wh=5_000.0, soc_max_pct=100.0, max_charge_w=18_000.0
        )
        out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
        self.assertIn(0, out, "Cheapest AM slot must be selected")
        self.assertIn(2, out, "Cheapest PM slot must be selected")

    def test_ote_slots_prioritized_over_predicted(self) -> None:
        """OTE slots (is_predicted_price=false) take priority over predicted ones."""
        slots = [
            _slot(buy=3.56, hour_utc=13, predicted=False),  # OTE, more expensive
            _slot(buy=2.00, hour_utc=13, predicted=True),  # predicted, cheaper
        ]
        battery = _battery(
            charge_buf=1.3, uc_wh=3_000.0, soc_max_pct=100.0, max_charge_w=18_000.0
        )
        out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
        self.assertIn(0, out, "OTE slot must be selected even if pricier than predicted")

    def test_does_not_exclude_slot_just_because_pv_below_load(self) -> None:
        """Regression: slots without PV surplus are selected via the AM/PM grid budget."""
        slots = [
            _slot(buy=0.4, pv=3_320, load=3_747, hour_utc=13),
            _slot(buy=0.42, pv=2_116, load=3_747, hour_utc=13),
            _slot(buy=0.44, pv=1_649, load=3_747, hour_utc=13),
            _slot(buy=0.47, pv=1_276, load=3_747, hour_utc=13),
        ]
        battery = _battery()
        out = _select_charge_slots(
            slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh
        )
        for idx in (0, 1, 2, 3):
            self.assertIn(idx, out)

    def test_long_horizon_pv_surplus_does_not_exhaust_grid_budget(self) -> None:
        """Regression: in a 96h horizon PV-surplus slots must not eat up the grid budget."""
        cheap_grid = [_slot(buy=0.4 + 0.01 * i, pv=0, load=2_000) for i in range(40)]
        pv_days = [_slot(buy=1.5, sell=1.5, pv=10_000, load=2_000) for _ in range(100)]
        slots = cheap_grid + pv_days
        battery = _battery(
            charge_buf=1.3, uc_wh=64_000.0, soc_max_pct=95.0, max_charge_w=18_000.0
        )
        out = _select_charge_slots(
            slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh
        )
        grid_selected = sum(1 for i in range(len(cheap_grid)) if i in out)
        self.assertGreaterEqual(
            grid_selected,
            5,
            "V dlouhém horizontu s mnoha PV-surplus sloty musí zůstat dostatek "
            "grid slotů povolených pro nabíjení z levného importu.",
        )


if __name__ == "__main__":
    unittest.main()