fix hard limit pro nabijeni
All checks were successful
CI and deploy / migration-check (push) Successful in 4s
CI and deploy / deploy (push) Successful in 30s

This commit is contained in:
Dusan Vojacek
2026-04-19 14:23:10 +02:00
parent ee27f4e3fd
commit 0814b1d8e8
3 changed files with 166 additions and 22 deletions

View File

@@ -159,10 +159,19 @@ def _select_charge_slots(
current_soc_wh: float,
) -> set[int]:
"""
Pre-select which slots are eligible for battery charging.
Only the X cheapest sell-price PV-surplus slots are selected,
enough to fill the battery with a configurable buffer.
Returns set of slot indices. Empty set = no restriction.
Pre-select which slots are eligible for battery charging (anti-micro-cycling).
Logika:
1) Sloty s PV-surplus (pv_a + pv_b > load_baseline) jsou vždy zahrnuty
jde o „zdarma“ nabíjení z FVE, nemá smysl ho zakazovat.
2) Zbývající energetický rozpočet (cíl = charge_buf × (soc_max current_soc),
snížený o očekávaný přínos z PV-surplus slotů) se doplní nejlevnějšími sloty
podle buy_price (nákupní cena ze sítě).
3) Per-slot kapacita přírůstku SoC = max_charge_power × η × 15 min (plný výkon,
ne limitovaný aktuálním PV-surplus výkonem).
Vrací množinu indexů povolených pro `bc[t] > 0` v MILP. Prázdná množina = žádné
restrikce. `charge_slot_buffer <= 0` v DB ⇒ všechny sloty povoleny.
"""
charge_buf = float(getattr(battery, "charge_slot_buffer", 0) or 0)
if charge_buf <= 0:
@@ -172,28 +181,35 @@ def _select_charge_slots(
if energy_to_fill <= 0:
return set()
candidates: list[tuple[int, float, float]] = []
for t, s in enumerate(slots):
pv_surplus = max(0, s.pv_a_forecast_w + s.pv_b_forecast_w - s.load_baseline_w)
if pv_surplus <= 0:
continue
charge_w = min(float(battery.max_charge_power_w), float(pv_surplus))
charge_wh = charge_w * float(battery.charge_efficiency) * INTERVAL_H
candidates.append((t, float(s.sell_price), charge_wh))
candidates.sort(key=lambda x: x[1])
eta = float(getattr(battery, "charge_efficiency", 1.0) or 1.0)
max_p_w = float(getattr(battery, "max_charge_power_w", 0.0) or 0.0)
per_slot_full_wh = max_p_w * eta * INTERVAL_H
selected: set[int] = set()
pv_budget_wh = 0.0
for t, s in enumerate(slots):
pv_surplus_w = max(0, s.pv_a_forecast_w + s.pv_b_forecast_w - s.load_baseline_w)
if pv_surplus_w <= 0:
continue
selected.add(t)
pv_budget_wh += min(float(pv_surplus_w), max_p_w) * eta * INTERVAL_H
target_wh = energy_to_fill * charge_buf
remaining_wh = max(0.0, target_wh - pv_budget_wh)
if remaining_wh <= 0 or per_slot_full_wh <= 0:
return selected
grid_candidates = [
(t, float(s.buy_price)) for t, s in enumerate(slots) if t not in selected
]
grid_candidates.sort(key=lambda x: x[1])
cumulative = 0.0
target = energy_to_fill * charge_buf
for t, _price, wh in candidates:
if cumulative >= target:
for t, _price in grid_candidates:
if cumulative >= remaining_wh:
break
selected.add(t)
cumulative += wh
if cumulative < energy_to_fill:
selected = set(c[0] for c in candidates)
cumulative += per_slot_full_wh
return selected

View File

@@ -0,0 +1,128 @@
"""`_select_charge_slots`: pre-selection nabíjecích slotů (anti-micro-cycling).
Ověřuje novou logiku podle varianty B:
- PV-surplus sloty jsou vždy zahrnuty.
- Zbytek rozpočtu doplnit nejlevnějšími sloty podle `buy_price` (ne `sell_price`).
- Žádné sloty nesmí být vyloučeny kvůli tomu, že nemají PV-surplus, když
`charge_slot_buffer` > 0 a ještě chybí energie do `soc_max`.
"""
from __future__ import annotations
import unittest
from datetime import datetime, timezone
from types import SimpleNamespace
from services.planning_engine import INTERVAL_H, PlanningSlot, _select_charge_slots
def _slot(*, buy: float, sell: float = 1.0, pv: int = 0, load: int = 2_000) -> PlanningSlot:
return PlanningSlot(
interval_start=datetime(2026, 4, 19, 12, 0, tzinfo=timezone.utc),
buy_price=buy,
sell_price=sell,
pv_a_forecast_w=0,
pv_b_forecast_w=pv,
load_baseline_w=load,
ev1_connected=False,
ev2_connected=False,
)
def _battery(
*,
charge_buf: float = 1.3,
uc_wh: float = 64_000.0,
soc_max_pct: float = 95.0,
max_charge_w: float = 18_000.0,
charge_eff: float = 0.95,
) -> SimpleNamespace:
return SimpleNamespace(
usable_capacity_wh=uc_wh,
soc_max_wh=soc_max_pct / 100.0 * uc_wh,
max_charge_power_w=max_charge_w,
charge_efficiency=charge_eff,
charge_slot_buffer=charge_buf,
)
class SelectChargeSlotsTests(unittest.TestCase):
def test_buffer_zero_returns_all_slots(self) -> None:
slots = [_slot(buy=3.0) for _ in range(4)]
battery = _battery(charge_buf=0.0)
out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
self.assertEqual(out, set(range(4)))
def test_pv_surplus_slot_always_selected_regardless_of_buy_price(self) -> None:
"""Slot s PV-surplus má být in, i když má nejvyšší buy_price."""
slots = [
_slot(buy=0.5, pv=0, load=2_000), # bez PV, levný grid
_slot(buy=9.9, pv=8_000, load=2_000), # velký PV-surplus, drahý grid
]
battery = _battery()
out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
self.assertIn(1, out)
def test_grid_filler_prefers_lowest_buy_price(self) -> None:
"""Mezi neprvními sloty se vybere ten s nejnižší buy_price."""
slots = [
_slot(buy=3.0, pv=0, load=2_000, sell=0.1),
_slot(buy=0.4, pv=0, load=2_000, sell=0.3),
_slot(buy=1.2, pv=0, load=2_000, sell=0.2),
]
battery = _battery(
charge_buf=1.3, uc_wh=64_000.0, soc_max_pct=95.0, max_charge_w=18_000.0
)
out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
self.assertIn(1, out)
def test_does_not_exclude_slot_just_because_pv_below_load(self) -> None:
"""Regrese: dřívější logika vyřazovala sloty bez PV-surplus úplně."""
slots = [
_slot(buy=0.4, pv=3_320, load=3_747),
_slot(buy=0.42, pv=2_116, load=3_747),
_slot(buy=0.44, pv=1_649, load=3_747),
_slot(buy=0.47, pv=1_276, load=3_747),
_slot(buy=1.13, pv=1_286, load=523),
_slot(buy=1.60, pv=1_020, load=523),
]
battery = _battery()
out = _select_charge_slots(slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh)
for idx in (0, 1, 2, 3):
self.assertIn(
idx,
out,
msg=(
f"Slot {idx} (levný grid nákup ~0.4 Kč) musí být povolen pro "
"nabíjení i bez PV-surplus, jinak optimizer skončí s dražším "
"nákupem v pozdějších slotech (nelogická ekonomika)."
),
)
def test_energy_budget_is_charge_buf_times_headroom(self) -> None:
"""Součet uvolněné energie se pohybuje okolo charge_buf × (soc_max current_soc)."""
slots = [_slot(buy=float(i + 1), pv=0, load=2_000) for i in range(40)]
battery = _battery(
charge_buf=1.3, uc_wh=64_000.0, soc_max_pct=95.0, max_charge_w=18_000.0
)
current_soc_wh = 0.2 * battery.usable_capacity_wh
target = battery.charge_slot_buffer * (battery.soc_max_wh - current_soc_wh)
per_slot_wh = (
battery.max_charge_power_w * battery.charge_efficiency * INTERVAL_H
)
out = _select_charge_slots(slots, battery, current_soc_wh=current_soc_wh)
slots_picked = len(out)
self.assertLessEqual((slots_picked - 1) * per_slot_wh, target)
self.assertGreaterEqual(slots_picked * per_slot_wh, target)
def test_returns_empty_when_battery_is_full(self) -> None:
slots = [_slot(buy=0.1) for _ in range(3)]
battery = _battery()
out = _select_charge_slots(
slots, battery, current_soc_wh=battery.soc_max_wh + 1.0
)
self.assertEqual(out, set())
if __name__ == "__main__":
unittest.main()

View File

@@ -60,7 +60,7 @@ Režim **CHARGE_CHEAP** v EMS nastaví `grid_setpoint_w` tak, aby platila podmí
### Čtyři typy slotů a mapování na registry
Solver předvybírá sloty pro nabíjení a export-vybíjení (`_select_charge_slots`, `_select_discharge_export_slots`). Výsledné setpointy pak určují typ slotu:
Solver předvybírá sloty pro nabíjení a export-vybíjení (`_select_charge_slots`, `_select_discharge_export_slots`). Nabíjení: vždy povoleno v slotech s PV-surplus; zbytek rozpočtu (`charge_slot_buffer × (soc_max current_soc) PV přínos`) doplněn nejlevnějšími sloty podle **`buy_price`** (nákupní cena ze sítě). Export-vybíjení: top-N slotů podle nejvyšší **`sell_price`**. Výsledné setpointy pak určují typ slotu:
| | **Charge** | **Pass-through** | **Discharge-export** | **Self-consumption** |
|---|---|---|---|---|