diff --git a/backend/services/planning_engine.py b/backend/services/planning_engine.py index 4aa6f53..3cd1316 100644 --- a/backend/services/planning_engine.py +++ b/backend/services/planning_engine.py @@ -159,10 +159,19 @@ def _select_charge_slots( current_soc_wh: float, ) -> set[int]: """ - Pre-select which slots are eligible for battery charging. - Only the X cheapest sell-price PV-surplus slots are selected, - enough to fill the battery with a configurable buffer. - Returns set of slot indices. Empty set = no restriction. + Pre-select which slots are eligible for battery charging (anti-micro-cycling). + + Logika: + 1) Sloty s PV-surplus (pv_a + pv_b > load_baseline) jsou vždy zahrnuty – + jde o „zdarma“ nabíjení z FVE, nemá smysl ho zakazovat. + 2) Zbývající energetický rozpočet (cíl = charge_buf × (soc_max − current_soc), + snížený o očekávaný přínos z PV-surplus slotů) se doplní nejlevnějšími sloty + podle buy_price (nákupní cena ze sítě). + 3) Per-slot kapacita přírůstku SoC = max_charge_power × η × 15 min (plný výkon, + ne limitovaný aktuálním PV-surplus výkonem). + + Vrací množinu indexů povolených pro `bc[t] > 0` v MILP. Prázdná množina = žádné + restrikce. `charge_slot_buffer <= 0` v DB ⇒ všechny sloty povoleny. """ charge_buf = float(getattr(battery, "charge_slot_buffer", 0) or 0) if charge_buf <= 0: @@ -172,28 +181,35 @@ def _select_charge_slots( if energy_to_fill <= 0: return set() - candidates: list[tuple[int, float, float]] = [] - for t, s in enumerate(slots): - pv_surplus = max(0, s.pv_a_forecast_w + s.pv_b_forecast_w - s.load_baseline_w) - if pv_surplus <= 0: - continue - charge_w = min(float(battery.max_charge_power_w), float(pv_surplus)) - charge_wh = charge_w * float(battery.charge_efficiency) * INTERVAL_H - candidates.append((t, float(s.sell_price), charge_wh)) - - candidates.sort(key=lambda x: x[1]) + eta = float(getattr(battery, "charge_efficiency", 1.0) or 1.0) + max_p_w = float(getattr(battery, "max_charge_power_w", 0.0) or 0.0) + per_slot_full_wh = max_p_w * eta * INTERVAL_H selected: set[int] = set() + pv_budget_wh = 0.0 + for t, s in enumerate(slots): + pv_surplus_w = max(0, s.pv_a_forecast_w + s.pv_b_forecast_w - s.load_baseline_w) + if pv_surplus_w <= 0: + continue + selected.add(t) + pv_budget_wh += min(float(pv_surplus_w), max_p_w) * eta * INTERVAL_H + + target_wh = energy_to_fill * charge_buf + remaining_wh = max(0.0, target_wh - pv_budget_wh) + if remaining_wh <= 0 or per_slot_full_wh <= 0: + return selected + + grid_candidates = [ + (t, float(s.buy_price)) for t, s in enumerate(slots) if t not in selected + ] + grid_candidates.sort(key=lambda x: x[1]) + cumulative = 0.0 - target = energy_to_fill * charge_buf - for t, _price, wh in candidates: - if cumulative >= target: + for t, _price in grid_candidates: + if cumulative >= remaining_wh: break selected.add(t) - cumulative += wh - - if cumulative < energy_to_fill: - selected = set(c[0] for c in candidates) + cumulative += per_slot_full_wh return selected diff --git a/backend/tests/test_planning_charge_slot_selection.py b/backend/tests/test_planning_charge_slot_selection.py new file mode 100644 index 0000000..0166343 --- /dev/null +++ b/backend/tests/test_planning_charge_slot_selection.py @@ -0,0 +1,128 @@ +"""`_select_charge_slots`: pre-selection nabíjecích slotů (anti-micro-cycling). + +Ověřuje novou logiku podle varianty B: + - PV-surplus sloty jsou vždy zahrnuty. + - Zbytek rozpočtu doplnit nejlevnějšími sloty podle `buy_price` (ne `sell_price`). + - Žádné sloty nesmí být vyloučeny kvůli tomu, že nemají PV-surplus, když + `charge_slot_buffer` > 0 a ještě chybí energie do `soc_max`. +""" + +from __future__ import annotations + +import unittest +from datetime import datetime, timezone +from types import SimpleNamespace + +from services.planning_engine import INTERVAL_H, PlanningSlot, _select_charge_slots + + +def _slot(*, buy: float, sell: float = 1.0, pv: int = 0, load: int = 2_000) -> PlanningSlot: + return PlanningSlot( + interval_start=datetime(2026, 4, 19, 12, 0, tzinfo=timezone.utc), + buy_price=buy, + sell_price=sell, + pv_a_forecast_w=0, + pv_b_forecast_w=pv, + load_baseline_w=load, + ev1_connected=False, + ev2_connected=False, + ) + + +def _battery( + *, + charge_buf: float = 1.3, + uc_wh: float = 64_000.0, + soc_max_pct: float = 95.0, + max_charge_w: float = 18_000.0, + charge_eff: float = 0.95, +) -> SimpleNamespace: + return SimpleNamespace( + usable_capacity_wh=uc_wh, + soc_max_wh=soc_max_pct / 100.0 * uc_wh, + max_charge_power_w=max_charge_w, + charge_efficiency=charge_eff, + charge_slot_buffer=charge_buf, + ) + + +class SelectChargeSlotsTests(unittest.TestCase): + def test_buffer_zero_returns_all_slots(self) -> None: + slots = [_slot(buy=3.0) for _ in range(4)] + battery = _battery(charge_buf=0.0) + out = _select_charge_slots(slots, battery, current_soc_wh=0.0) + self.assertEqual(out, set(range(4))) + + def test_pv_surplus_slot_always_selected_regardless_of_buy_price(self) -> None: + """Slot s PV-surplus má být in, i když má nejvyšší buy_price.""" + slots = [ + _slot(buy=0.5, pv=0, load=2_000), # bez PV, levný grid + _slot(buy=9.9, pv=8_000, load=2_000), # velký PV-surplus, drahý grid + ] + battery = _battery() + out = _select_charge_slots(slots, battery, current_soc_wh=0.0) + self.assertIn(1, out) + + def test_grid_filler_prefers_lowest_buy_price(self) -> None: + """Mezi neprvními sloty se vybere ten s nejnižší buy_price.""" + slots = [ + _slot(buy=3.0, pv=0, load=2_000, sell=0.1), + _slot(buy=0.4, pv=0, load=2_000, sell=0.3), + _slot(buy=1.2, pv=0, load=2_000, sell=0.2), + ] + battery = _battery( + charge_buf=1.3, uc_wh=64_000.0, soc_max_pct=95.0, max_charge_w=18_000.0 + ) + out = _select_charge_slots(slots, battery, current_soc_wh=0.0) + self.assertIn(1, out) + + def test_does_not_exclude_slot_just_because_pv_below_load(self) -> None: + """Regrese: dřívější logika vyřazovala sloty bez PV-surplus úplně.""" + slots = [ + _slot(buy=0.4, pv=3_320, load=3_747), + _slot(buy=0.42, pv=2_116, load=3_747), + _slot(buy=0.44, pv=1_649, load=3_747), + _slot(buy=0.47, pv=1_276, load=3_747), + _slot(buy=1.13, pv=1_286, load=523), + _slot(buy=1.60, pv=1_020, load=523), + ] + battery = _battery() + out = _select_charge_slots(slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh) + for idx in (0, 1, 2, 3): + self.assertIn( + idx, + out, + msg=( + f"Slot {idx} (levný grid nákup ~0.4 Kč) musí být povolen pro " + "nabíjení i bez PV-surplus, jinak optimizer skončí s dražším " + "nákupem v pozdějších slotech (nelogická ekonomika)." + ), + ) + + def test_energy_budget_is_charge_buf_times_headroom(self) -> None: + """Součet uvolněné energie se pohybuje okolo charge_buf × (soc_max − current_soc).""" + slots = [_slot(buy=float(i + 1), pv=0, load=2_000) for i in range(40)] + battery = _battery( + charge_buf=1.3, uc_wh=64_000.0, soc_max_pct=95.0, max_charge_w=18_000.0 + ) + current_soc_wh = 0.2 * battery.usable_capacity_wh + target = battery.charge_slot_buffer * (battery.soc_max_wh - current_soc_wh) + per_slot_wh = ( + battery.max_charge_power_w * battery.charge_efficiency * INTERVAL_H + ) + out = _select_charge_slots(slots, battery, current_soc_wh=current_soc_wh) + slots_picked = len(out) + self.assertLessEqual((slots_picked - 1) * per_slot_wh, target) + self.assertGreaterEqual(slots_picked * per_slot_wh, target) + + def test_returns_empty_when_battery_is_full(self) -> None: + slots = [_slot(buy=0.1) for _ in range(3)] + battery = _battery() + out = _select_charge_slots( + slots, battery, current_soc_wh=battery.soc_max_wh + 1.0 + ) + self.assertEqual(out, set()) + + +if __name__ == "__main__": + unittest.main() diff --git a/docs/04-modules/modbus-registers.md b/docs/04-modules/modbus-registers.md index 12d51b9..0ef4253 100644 --- a/docs/04-modules/modbus-registers.md +++ b/docs/04-modules/modbus-registers.md @@ -60,7 +60,7 @@ Režim **CHARGE_CHEAP** v EMS nastaví `grid_setpoint_w` tak, aby platila podmí ### Čtyři typy slotů a mapování na registry -Solver předvybírá sloty pro nabíjení a export-vybíjení (`_select_charge_slots`, `_select_discharge_export_slots`). Výsledné setpointy pak určují typ slotu: +Solver předvybírá sloty pro nabíjení a export-vybíjení (`_select_charge_slots`, `_select_discharge_export_slots`). Nabíjení: vždy povoleno v slotech s PV-surplus; zbytek rozpočtu (`charge_slot_buffer × (soc_max − current_soc) − PV přínos`) doplněn nejlevnějšími sloty podle **`buy_price`** (nákupní cena ze sítě). Export-vybíjení: top-N slotů podle nejvyšší **`sell_price`**. Výsledné setpointy pak určují typ slotu: | | **Charge** | **Pass-through** | **Discharge-export** | **Self-consumption** | |---|---|---|---|---|