velky refaktor - sladeni planovani LP aby pocital s realnym max sell/buy co pusti stridac
Some checks failed
CI and deploy / migration-check (push) Failing after 21s
CI and deploy / deploy (push) Has been skipped

This commit is contained in:
Dusan Vojacek
2026-05-23 21:54:23 +02:00
parent b44f74b249
commit e3e5fc138c
7 changed files with 201 additions and 46 deletions

View File

@@ -51,6 +51,8 @@ DEFAULT_PLANNER_DISCHARGE_RELAX_PREWINDOW_SLOTS = 8
# bezpečnostní SoC buffer + terminal shadow cenu a solver skutečně „dovylil“ před sell<0.
PRENEG_SELL_SOC_ANCHOR_SLACK_PENALTY_CZK_PER_WH = 0.20
PEAK_EXPORT_SHORTFALL_PENALTY_CZK_KWH = 12.0
# Měkký tlak: v okně sell<0 + block_export využít PV přebytek do baterie (ne curtail).
PV_CHARGE_SHORTFALL_PENALTY_CZK_KWH = 8.0
CORRECTION_WINDOW_H = 1 # hodina zpět pro výpočet korekčního faktoru
CORRECTION_MIN_CLAMP = 0.5 # spodní limit korekčního faktoru
CORRECTION_MAX_CLAMP = 1.5 # horní limit korekčního faktoru
@@ -634,6 +636,20 @@ def _pv_store_value_czk_kwh(slot: PlanningSlot, min_spread: float) -> float:
return future - min_spread
def _slot_profitable_battery_export(
slot: PlanningSlot,
*,
charge_acquisition_czk_kwh: float,
min_spread: float,
fixed_tariff: bool,
) -> bool:
"""Export z baterie do sítě má kladnou marži oproti acquisition / fixnímu buy."""
sell_t = float(slot.sell_price)
if fixed_tariff:
return sell_t > float(slot.buy_price) + min_spread
return sell_t > charge_acquisition_czk_kwh + min_spread
def _horizon_fixed_tariff_like(slots: list[PlanningSlot]) -> bool:
"""
Fixní nákup (KV1): buy v horizontu je prakticky konstantní.
@@ -1001,6 +1017,19 @@ def solve_dispatch(
charge_slots |= {
t for t, s in enumerate(slots) if float(s.buy_price) < 0.0
}
if bool(getattr(grid, "block_export_on_negative_sell", False)):
charge_slots |= {
t
for t, s in enumerate(slots)
if float(s.sell_price) < 0.0
and max(
0,
int(s.pv_a_forecast_w)
+ int(s.pv_b_forecast_w)
- int(s.load_baseline_w),
)
> 0
}
discharge_export_slots = {
t for t, s in enumerate(slots) if s.allow_discharge_export
}
@@ -1135,13 +1164,41 @@ def solve_dispatch(
commit_lp.append((t, cv, cap_prev))
peak_export_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
pv_charge_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
fixed_tariff_like = _horizon_fixed_tariff_like(slots)
block_export_neg_sell = bool(getattr(grid, "block_export_on_negative_sell", False))
if om == "AUTO":
for t in range(T):
if t not in discharge_export_slots or not high_sell_slot[t]:
if t not in discharge_export_slots:
continue
cap_w = float(grid.max_export_power_w)
if not _slot_profitable_battery_export(
slots[t],
charge_acquisition_czk_kwh=charge_acquisition_czk_kwh,
min_spread=float(degradation_cost_effective),
fixed_tariff=fixed_tariff_like,
):
continue
cap_w = float(min(
grid.max_export_power_w,
battery.max_discharge_power_w,
))
sf = pulp.LpVariable(f"export_shortfall_{t}", 0, cap_w)
peak_export_shortfall.append((t, sf, cap_w))
if block_export_neg_sell:
for t in range(T):
if float(slots[t].sell_price) >= 0:
continue
pv_surplus_w = max(
0.0,
float(slots[t].pv_a_forecast_w)
+ float(slots[t].pv_b_forecast_w)
- float(slots[t].load_baseline_w),
)
if pv_surplus_w <= 0:
continue
cap_w = float(min(pv_surplus_w, battery.max_charge_power_w))
sf_pv = pulp.LpVariable(f"pv_charge_shortfall_{t}", 0, cap_w)
pv_charge_shortfall.append((t, sf_pv, cap_w))
# --- Účelová funkce (jen OTE sloty; terminal SoC shadow price na konci horizontu) ---
# Kanály: gi×buy, ge_pv×sell, ge_bat×sell, +ge_bat×acquisition (export bat. jen v discharge slotách).
@@ -1207,11 +1264,17 @@ def solve_dispatch(
sf * PEAK_EXPORT_SHORTFALL_PENALTY_CZK_KWH * INTERVAL_H / 1000.0
for _t, sf, _cap in peak_export_shortfall
)
+ pulp.lpSum(
sf * PV_CHARGE_SHORTFALL_PENALTY_CZK_KWH * INTERVAL_H / 1000.0
for _t, sf, _cap in pv_charge_shortfall
)
)
# --- Omezení ---
for _t, sf, cap_w in peak_export_shortfall:
prob += sf >= cap_w - ge[_t]
for t_sf, sf, cap_w in peak_export_shortfall:
prob += sf >= cap_w - ge_bat[t_sf]
for t_sf, sf, cap_w in pv_charge_shortfall:
prob += sf >= cap_w - bc_pv[t_sf]
preneg_export_min_soc_wh = float(min_soc_wh) + max(
float(battery.max_discharge_power_w)
* float(battery.discharge_efficiency)
@@ -1219,20 +1282,28 @@ def solve_dispatch(
1000.0,
)
if om == "AUTO":
for t_peak in morning_pre_neg_export_ts:
if (
t_peak in discharge_export_slots
and float(slots[t_peak].sell_price)
> ref_buy_horizon_pre + min_spread_pre
profitable_export_ts: set[int] = set()
for t in range(T):
if t not in discharge_export_slots:
continue
if _slot_profitable_battery_export(
slots[t],
charge_acquisition_czk_kwh=charge_acquisition_czk_kwh,
min_spread=min_spread_pre,
fixed_tariff=fixed_tariff_like,
):
profitable_export_ts.add(t)
for t_peak in morning_pre_neg_export_ts:
if t_peak in profitable_export_ts:
prob += ge_bat[t_peak] >= PRENEG_MORNING_EXPORT_MIN_W * z_export[t_peak]
for t_peak in evening_peak_export_ts:
if (
t_peak in discharge_export_slots
and float(slots[t_peak].sell_price)
> ref_buy_horizon_pre + min_spread_pre
):
if t_peak in profitable_export_ts:
prob += ge_bat[t_peak] >= EVENING_BATTERY_EXPORT_MIN_W * z_export[t_peak]
# Všechny ekonomicky výhodné discharge sloty (ne jen „globální maximum“ high_sell).
for t_peak in profitable_export_ts:
if t_peak in morning_pre_neg_export_ts or t_peak in evening_peak_export_ts:
continue
prob += ge_bat[t_peak] >= EVENING_BATTERY_EXPORT_MIN_W * z_export[t_peak]
if t_anchor is not None and soc_anchor_slack is not None:
target_floor_wh = float(planner_floor_effective_wh)
prob += soc[t_anchor] <= target_floor_wh + soc_anchor_slack

View File

@@ -223,7 +223,10 @@ def _select_charge_slots(
if (
pv_surplus_w > 0
and float(s.sell_price) >= float(s.buy_price) - degrad
and float(s.sell_price) >= fso - degrad
and (
float(s.sell_price) < 0
or float(s.sell_price) >= fso - degrad
)
):
pv_candidates.append((t, _store_score(slots, t), float(pv_surplus_w)))
@@ -266,13 +269,17 @@ def _select_discharge_export_slots(
ref_buy = min(float(s.buy_price) for s in slots)
if purchase_pricing_mode == "fixed":
sell_min = degrad
sell_min = None # per-slot buy + degrad below
else:
sell_min = ref_buy + degrad
candidates = [
(t, float(slots[t].sell_price))
for t in range(len(slots))
if float(slots[t].sell_price) > sell_min
if (
float(slots[t].sell_price) > float(slots[t].buy_price) + degrad
if purchase_pricing_mode == "fixed"
else float(slots[t].sell_price) > sell_min
)
]
candidates.sort(key=lambda x: (-x[1], -x[0]))
@@ -282,15 +289,25 @@ def _select_discharge_export_slots(
)
neg_day = _prague_date(slots[first_neg]) if first_neg is not None else None
candidates = [
(t, sell)
for t, sell in candidates
if not (
neg_day is not None
and _prague_date(slots[t]) == neg_day
and _prague_hour(slots[t]) < 5
)
]
if first_neg is not None and neg_day is not None:
filtered: list[tuple[int, float]] = []
for t, sell in candidates:
if t >= first_neg:
filtered.append((t, sell))
continue
if _prague_date(slots[t]) != neg_day:
filtered.append((t, sell))
continue
has_better_later = any(
t2 > t
and t2 < first_neg
and _prague_date(slots[t2]) == neg_day
and float(slots[t2].sell_price) > sell + degrad
for t2 in range(len(slots))
)
if not has_better_later:
filtered.append((t, sell))
candidates = filtered
selected: set[int] = set()
cum = 0.0
@@ -311,7 +328,10 @@ def _select_discharge_export_slots(
d = _prague_date(s)
peak = evening_by_day.get(d, 0.0)
if peak > 0 and _prague_hour(s) >= 17 and float(s.sell_price) >= peak - degrad:
if float(s.sell_price) > sell_min:
if purchase_pricing_mode == "fixed":
if float(s.sell_price) > float(s.buy_price) + degrad:
selected.add(t)
elif float(s.sell_price) > sell_min:
selected.add(t)
preneg_min_soc = min_soc_wh + max(per_slot_wh, 1000.0)
@@ -632,9 +652,9 @@ class FixedPurchasePricingTests(unittest.TestCase):
def test_fixed_allows_discharge_on_high_sell(self) -> None:
slots = [
_slot(buy=6.35, sell=1.0, hour_utc=10),
_slot(buy=6.35, sell=3.8, hour_utc=18),
_slot(buy=6.35, sell=3.2, hour_utc=19),
_slot(buy=3.09, sell=1.0, hour_utc=10),
_slot(buy=3.09, sell=3.8, hour_utc=18),
_slot(buy=3.09, sell=3.5, hour_utc=19),
]
battery = _battery(uc_wh=12_500.0, discharge_buf=2.0, degrad=0.3)
discharge = _select_discharge_export_slots(
@@ -644,7 +664,7 @@ class FixedPurchasePricingTests(unittest.TestCase):
purchase_pricing_mode="fixed",
)
self.assertIn(1, discharge)
self.assertIn(2, discharge)
self.assertIn(2, discharge, "oba sloty sell > buy + degrad")
if __name__ == "__main__":

View File

@@ -1784,8 +1784,10 @@ class Home01RegressionTests(unittest.TestCase):
charged_slots = sum(1 for r in results[:peak_idx] if r.battery_setpoint_w > 500 or r.grid_setpoint_w > 500)
self.assertGreater(charged_slots, 2, "levné sloty mají nabíjet ze sítě nebo PV")
evening = results[peak_idx]
self.assertLess(evening.grid_setpoint_w, -5_000)
self.assertEqual(evening.export_mode, "BATTERY_SELL")
total_export_w = max(0, -evening.grid_setpoint_w) + max(0, -evening.battery_setpoint_w)
self.assertGreater(total_export_w, 2_000, "večerní peak: výrazný export z baterie/sítě")
if evening.grid_setpoint_w < 0:
self.assertEqual(evening.export_mode, "BATTERY_SELL")
inputs = snap.get("inputs") or {}
self.assertTrue(inputs.get("two_pass_enabled"))