prepsani s opusem dle planu
Some checks failed
CI and deploy / migration-check (push) Failing after 13s
CI and deploy / deploy (push) Has been skipped

This commit is contained in:
Dusan Vojacek
2026-05-24 22:44:21 +02:00
parent 2d021b15c3
commit 8bef1c6da6
11 changed files with 720 additions and 16 deletions

View File

@@ -73,6 +73,108 @@ def _buy_min_next_n(
return min(tail) if tail else None
def _pv_surplus_w(s: PlanningSlot) -> int:
return max(0, int(s.pv_a_forecast_w) + int(s.pv_b_forecast_w) - int(s.load_baseline_w))
def _first_neg_sell_ord(slots: list[PlanningSlot]) -> int | None:
for i, s in enumerate(slots):
if float(s.sell_price) < 0:
return i
return None
def _apply_dynamic_grid_filter(
slots: list[PlanningSlot],
battery: SimpleNamespace,
current_soc_wh: float,
grid_slots: set[int],
) -> set[int]:
"""Self-konzistentni filtr vrstvy B (kopie R__063 A2)."""
if not grid_slots:
return grid_slots
degrad = float(getattr(battery, "degradation_cost_czk_kwh", 0.15) or 0.15)
first_neg = _first_neg_sell_ord(slots)
eta = float(getattr(battery, "charge_efficiency", 1.0) or 1.0)
max_p_w = float(getattr(battery, "max_charge_power_w", 0.0) or 0.0)
per_slot_wh = max_p_w * eta * INTERVAL_H
soc_max = float(battery.soc_max_wh)
deficit = max(0.0, soc_max - float(current_soc_wh))
threshold = deficit * 0.6
t_len = len(slots)
pv_ahead = [0.0] * t_len
neg_ahead = [0.0] * t_len
for t in range(t_len):
pv_sum = 0.0
neg_sum = 0.0
for w2 in range(t, t_len):
if first_neg is not None and w2 >= first_neg:
break
s2 = slots[w2]
pv_s = _pv_surplus_w(s2)
if pv_s > 0 and (float(s2.sell_price) < 0 or float(s2.buy_price) < 0):
pv_sum += min(pv_s, max_p_w) * eta * INTERVAL_H
if float(s2.buy_price) < 0:
neg_sum += per_slot_wh
pv_ahead[t] = min(pv_sum, deficit)
neg_ahead[t] = neg_sum
slots[t].pv_charge_wh_ahead = pv_ahead[t]
slots[t].neg_buy_wh_ahead = neg_ahead[t]
remaining = set(grid_slots)
acq_prev = -999.0
for _ in range(5):
if not remaining:
break
total_wh = len(remaining) * per_slot_wh
if total_wh <= 0:
acq = min(float(s.buy_price) for s in slots)
else:
acq = sum(float(slots[t].buy_price) * per_slot_wh for t in remaining) / total_wh
if abs(acq - acq_prev) < 0.05:
break
acq_prev = acq
to_remove: set[int] = set()
for t in list(remaining):
s = slots[t]
if float(s.buy_price) < 0:
continue
if float(s.buy_price) > acq - degrad and pv_ahead[t] + neg_ahead[t] >= threshold:
to_remove.add(t)
slots[t].grid_charge_suppressed_reason = (
"cheaper_pv_ahead"
if pv_ahead[t] >= neg_ahead[t]
else "cheaper_neg_buy_ahead"
)
if not to_remove:
break
remaining -= to_remove
cum_allowed = len(remaining) * per_slot_wh
pv0 = pv_ahead[0] if t_len else 0.0
target_deficit = deficit - pv0
if cum_allowed < target_deficit * 0.6:
suppressed = sorted(
[
t
for t in grid_slots
if t not in remaining and slots[t].grid_charge_suppressed_reason
],
key=lambda t: (float(slots[t].buy_price), t),
)
for t in suppressed:
if float(slots[t].buy_price) >= 2 * acq_prev:
break
remaining.add(t)
slots[t].grid_charge_suppressed_reason = "safety_failsafe_unlock"
cum_allowed += per_slot_wh
if cum_allowed >= target_deficit * 0.6:
break
return remaining
def _store_score(slots: list[PlanningSlot], t: int) -> float:
s = slots[t]
buy = float(s.buy_price)
@@ -87,6 +189,7 @@ def _select_charge_slots(
current_soc_wh: float,
*,
purchase_pricing_mode: str = "spot",
apply_dynamic_grid_filter: bool = True,
) -> set[int]:
"""Kopie logiky z ems.fn_load_planning_slots_full (charge mask)."""
charge_buf = float(getattr(battery, "charge_slot_buffer", 0) or 0)
@@ -141,6 +244,7 @@ def _select_charge_slots(
chg_pm = charge_target_wh - chg_am
selected: set[int] = set()
grid_selected: set[int] = set()
grid_filled_wh = 0.0
buf_mult = charge_buf if charge_buf > 0 else 1.0
@@ -185,6 +289,7 @@ def _select_charge_slots(
if cum >= chg_am or per_slot_full_wh <= 0 or grid_am >= cap_am:
break
selected.add(t)
grid_selected.add(t)
cum += per_slot_full_wh
grid_am += 1
grid_filled_wh += cum
@@ -210,6 +315,7 @@ def _select_charge_slots(
if cum >= chg_pm or per_slot_full_wh <= 0 or grid_pm >= cap_pm:
break
selected.add(t)
grid_selected.add(t)
cum += per_slot_full_wh
grid_pm += 1
grid_filled_wh += cum
@@ -217,6 +323,14 @@ def _select_charge_slots(
for t, s in enumerate(slots):
if float(s.buy_price) < 0:
selected.add(t)
grid_selected.add(t)
if apply_dynamic_grid_filter:
filtered_grid = _apply_dynamic_grid_filter(
slots, battery, current_soc_wh, grid_selected
)
for t in grid_selected - filtered_grid:
selected.discard(t)
elif purchase_pricing_mode == "fixed" and any(
float(s.sell_price) > float(s.buy_price) + degrad for s in slots
@@ -695,6 +809,115 @@ class SelectDischargeExportSlotsTests(unittest.TestCase):
self.assertNotIn(1, discharge)
class DynamicGridFilterTests(unittest.TestCase):
def _range(self, start_h: int, end_h: int, **kwargs) -> list[PlanningSlot]:
base = datetime(2026, 5, 24, 0, 0, tzinfo=_PRAGUE)
out: list[PlanningSlot] = []
for h in range(start_h, end_h):
for minute in (0, 15, 30, 45):
t = base.replace(hour=h, minute=minute).astimezone(timezone.utc)
out.append(
_slot(
interval_start=t,
hour_utc=t.hour,
buy=kwargs.get("buy", 4.5),
sell=kwargs.get("sell", 2.0),
pv=kwargs.get("pv_b", 0),
load=kwargs.get("load", 500),
)
)
if "pv_a" in kwargs:
out[-1] = PlanningSlot(
interval_start=t,
buy_price=float(kwargs.get("buy", 4.5)),
sell_price=float(kwargs.get("sell", 2.0)),
pv_a_forecast_w=int(kwargs["pv_a"]),
pv_b_forecast_w=int(kwargs.get("pv_b", 0)),
load_baseline_w=int(kwargs.get("load", 500)),
ev1_connected=False,
ev2_connected=False,
is_predicted_price=False,
)
return out
def _home01_battery(self) -> SimpleNamespace:
return _battery(charge_buf=1.3, uc_wh=64_000.0, soc_max_pct=95.0)
def _uniform_buy_slots(self, buy: float, n: int = 96) -> list[PlanningSlot]:
base = datetime(2026, 5, 24, 0, 0, tzinfo=_PRAGUE)
return [
_slot(
buy=buy,
sell=2.0,
load=500,
interval_start=(base + timedelta(minutes=15 * i)).astimezone(timezone.utc),
)
for i in range(n)
]
def test_home01_night_charge_before_neg_sell_pv_day(self) -> None:
slots = [
*self._range(0, 5, buy=4.7, sell=2.9),
*self._range(5, 7, buy=5.0, sell=3.0, pv_b=400),
*self._range(7, 11, buy=4.5, sell=2.8, pv_a=3000, pv_b=2000),
*self._range(11, 14, buy=0.5, sell=-0.4, pv_a=6000, pv_b=5000),
*self._range(14, 17, buy=1.0, sell=-0.3, pv_a=5000, pv_b=4000),
*self._range(17, 19, buy=4.5, sell=3.0),
*self._range(19, 22, buy=6.5, sell=4.0),
*self._range(22, 24, buy=4.8, sell=3.0),
]
selected = _select_charge_slots(
slots, self._home01_battery(), current_soc_wh=30_000.0
)
for i in range(88, 96):
self.assertNotIn(i, selected, f"slot {i} (noc) nema byt allow_grid_charge")
self.assertTrue(any(i in selected for i in range(44, 56)))
def test_cloudy_day_no_pv_grid_unlock(self) -> None:
slots = self._uniform_buy_slots(4.5 + 0.1)
selected = _select_charge_slots(
slots, self._home01_battery(), current_soc_wh=30_000.0
)
self.assertGreater(len(selected), 4, "failsafe musel uvolnit nejake sloty")
def test_ba81_fixed_tariff_mask_unchanged(self) -> None:
battery = _battery(charge_buf=1.3, uc_wh=12_500.0)
slots_fixed = self._uniform_buy_slots(buy=3.5)
selected_v1 = _select_charge_slots(
slots_fixed,
battery,
5000.0,
purchase_pricing_mode="fixed",
apply_dynamic_grid_filter=False,
)
selected_v2 = _select_charge_slots(
slots_fixed,
battery,
5000.0,
purchase_pricing_mode="fixed",
)
self.assertEqual(selected_v1, selected_v2)
def test_kv1_block_export_unchanged(self) -> None:
"""KV1: filtr vrstvy B beze zmeny u fixed tarifu (stejne jako BA81)."""
battery = _battery(charge_buf=1.3, uc_wh=12_500.0)
slots_fixed = self._uniform_buy_slots(buy=3.5)
selected_v1 = _select_charge_slots(
slots_fixed,
battery,
5000.0,
purchase_pricing_mode="fixed",
apply_dynamic_grid_filter=False,
)
selected_v2 = _select_charge_slots(
slots_fixed,
battery,
5000.0,
purchase_pricing_mode="fixed",
)
self.assertEqual(selected_v1, selected_v2)
class FixedPurchasePricingTests(unittest.TestCase):
def test_fixed_skips_grid_charge_when_no_sell_arbitrage(self) -> None:
"""Fixní buy bez výkupu nad buy+degrad → žádné grid nabíjení."""