uz sem zaoufalej
Some checks failed
CI and deploy / migration-check (push) Failing after 11s
CI and deploy / deploy (push) Has been skipped

This commit is contained in:
Dusan Vojacek
2026-05-25 02:13:41 +02:00
parent 161b463367
commit 4beb8cf99f
7 changed files with 153 additions and 198 deletions

View File

@@ -64,11 +64,7 @@ NEG_SELL_PV_B_VENT_PENALTY_CZK_KWH = 4.0
# Výboj baterie při sell<0 jen těsně před extrémně záporným buy (round-trip arbitráž).
EXTREME_BUY_DUMP_PREWINDOW_SLOTS = 12
NEG_SELL_BAT_DUMP_SHORTFALL_PENALTY_CZK_KWH = 80.0
# Měkký tlak: v buy<0 slotech max import ze sítě do baterie (zisk z OTE záporného nákupu).
NEG_BUY_GRID_CHARGE_SHORTFALL_PENALTY_CZK_KWH = 120.0
# Měkký tlak: v buy<0 okně dobít na soc_max (ne zastavit na ~94 %).
NEG_BUY_SOC_UNDERFILL_PENALTY_CZK_PER_WH = 0.45
PLANNER_BUILD_TAG = "2026-05-27-pre-neg-buy-strategy-v19c"
PLANNER_BUILD_TAG = "2026-05-28-revert-v19-hard-v20"
CORRECTION_WINDOW_H = 1 # hodina zpět pro výpočet korekčního faktoru
CORRECTION_MIN_CLAMP = 0.5 # spodní limit korekčního faktoru
CORRECTION_MAX_CLAMP = 1.5 # horní limit korekčního faktoru
@@ -937,41 +933,6 @@ def _evening_battery_export_push_indices(
return sorted(out)
def _negative_buy_slot_indices(slots: list[PlanningSlot]) -> list[int]:
return [t for t, s in enumerate(slots) if float(s.buy_price) < 0.0]
def _neg_buy_soc_pressure_slots(slots: list[PlanningSlot]) -> list[int]:
"""Tlak na soc_max jen na konci buy<0 okna (ne každý 15min slot zvlášť)."""
neg = _negative_buy_slot_indices(slots)
return [neg[-1]] if neg else []
def _pre_neg_buy_discharge_push_indices(
slots: list[PlanningSlot],
pre_neg_buy_discharge_ts: set[int],
*,
max_slots_per_day: int = 8,
) -> list[int]:
"""
Tvrdý push ge_bat jen u nejlepších sell slotů před buy<0.
Jinak součet max výbojů přes celou noc může překročit SoC → Infeasible.
"""
by_day: dict = {}
for t in pre_neg_buy_discharge_ts:
d = _prague_calendar_date(slots[t])
by_day.setdefault(d, []).append(t)
out: list[int] = []
for d in sorted(by_day.keys()):
ranked = sorted(
by_day[d],
key=lambda i: float(slots[i].sell_price),
reverse=True,
)
out.extend(ranked[:max_slots_per_day])
return sorted(out)
def _planner_soc_for_solver(
current_soc_wh: float,
battery,
@@ -1098,13 +1059,11 @@ def solve_dispatch(
charge_commitment_prev_w: Optional[list[Optional[float]]] = None,
planner_version: str | None = None,
relaxed_expensive_import: bool = False,
relaxed_neg_buy_pressure: bool = False,
) -> tuple[list[DispatchResult], int, dict[str, Any]]:
"""
LP solver pro dispatch optimalizaci.
Vrátí (výsledky, solver_duration_ms, solver_debug_snapshot).
relaxed_expensive_import: nouzový režim po Infeasible — síť smí krmit baseload v drahých slotech.
relaxed_neg_buy_pressure: vypne měkké v19 shortfall (pre-neg export, neg-buy soc/grid).
"""
T = len(slots)
if T < 1:
@@ -1226,7 +1185,6 @@ def solve_dispatch(
om = (operating_mode or "AUTO").strip().upper()
charge_slots: set[int] = set()
discharge_export_slots: set[int] = set()
pre_neg_buy_discharge_ts: set[int] = set()
if om == "AUTO":
charge_slots = {t for t, s in enumerate(slots) if s.allow_charge}
charge_slots |= {
@@ -1248,26 +1206,6 @@ def solve_dispatch(
discharge_export_slots = {
t for t, s in enumerate(slots) if s.allow_discharge_export
}
# Vybití baterie před `buy<0` oknem: pokud je v horizontu buy<0, můžeme baterii
# vybít teď za `sell` a v buy<0 okně ji nabít za záporný buy (= příjem).
_neg_buy_for_disch = next(
(t for t, s in enumerate(slots) if float(s.buy_price) < 0), None
)
if _neg_buy_for_disch is not None and _neg_buy_for_disch > 0:
_neg_buy_prices = [
float(slots[t].buy_price)
for t in range(_neg_buy_for_disch, T)
if float(slots[t].buy_price) < 0
]
_avg_neg_buy = sum(_neg_buy_prices) / len(_neg_buy_prices) if _neg_buy_prices else 0.0
_disch_sell_thr = max(_avg_neg_buy + float(degradation_cost_effective), 0.1)
for t in range(_neg_buy_for_disch):
st = slots[t]
if float(st.sell_price) < _disch_sell_thr:
continue
# Jen kladný sell (sell<0 + z_export by vynutilo ge_bat≥1 zároveň s ge_bat=0).
if float(st.sell_price) >= 1.0:
pre_neg_buy_discharge_ts.add(t)
# SELF_SUSTAIN dřív vynucoval ge[t] == 0, což umí udělat MILP infeasible v okamžiku, kdy:
# - baterie je na max SoC (nelze nabíjet),
# - PV pole B není curtailable,
@@ -1445,9 +1383,6 @@ def solve_dispatch(
pv_charge_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
neg_sell_bat_dump_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
neg_sell_soc_underfill: list[tuple[int, pulp.LpVariable]] = []
neg_buy_soc_underfill: list[tuple[int, pulp.LpVariable]] = []
neg_buy_grid_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
pre_neg_buy_export_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
fixed_tariff_like = fixed_tariff_like_pre
block_export_neg_sell = bool(getattr(grid, "block_export_on_negative_sell", False))
if om == "AUTO":
@@ -1499,17 +1434,11 @@ def solve_dispatch(
cap_w = float(min(pv_surplus_w, battery.max_charge_power_w))
sf_pv = pulp.LpVariable(f"post_neg_pv_shortfall_{t}", 0, cap_w)
pv_charge_shortfall.append((t, sf_pv, cap_w))
_neg_buy_idx_soc = next(
(t for t, s in enumerate(slots) if float(s.buy_price) < 0), None
)
for t in range(T):
if float(slots[t].sell_price) >= 0:
continue
if t not in charge_slots:
continue
# Před buy<0 nehonit soc_max — kapacitu šetříme na záporný nákup.
if _neg_buy_idx_soc is not None and t < _neg_buy_idx_soc:
continue
pv_surplus_w = max(
0.0,
float(slots[t].pv_a_forecast_w)
@@ -1524,35 +1453,6 @@ def solve_dispatch(
float(battery.usable_capacity_wh),
)
neg_sell_soc_underfill.append((t, us))
if not relaxed_neg_buy_pressure:
for t in _neg_buy_soc_pressure_slots(slots):
us_buy = pulp.LpVariable(
f"neg_buy_soc_under_{t}",
0,
float(battery.usable_capacity_wh),
)
neg_buy_soc_underfill.append((t, us_buy))
for t in range(T):
if float(slots[t].buy_price) >= 0:
continue
headroom_wh = float(battery.soc_max_wh) - float(soc_panel_min[t])
if headroom_wh < 500.0:
continue
cap_gi = float(
min(
battery.max_charge_power_w,
grid.max_import_power_w,
headroom_wh / max(INTERVAL_H * battery.charge_efficiency, 1e-6),
)
)
if cap_gi < 500.0:
continue
sf_gi = pulp.LpVariable(f"neg_buy_gi_shortfall_{t}", 0, cap_gi)
neg_buy_grid_shortfall.append((t, sf_gi, cap_gi))
for t in pre_neg_buy_discharge_ts:
cap_pre = float(_battery_export_cap_w(battery, grid))
sf_pre = pulp.LpVariable(f"preneg_buy_disch_sf_{t}", 0, cap_pre)
pre_neg_buy_export_shortfall.append((t, sf_pre, cap_pre))
for t in neg_sell_bat_dump_slots:
dump_target_w = _battery_export_cap_w(battery, grid)
sf_dump = pulp.LpVariable(f"neg_bat_dump_shortfall_{t}", 0, dump_target_w)
@@ -1662,18 +1562,6 @@ def solve_dispatch(
us * NEG_SELL_SOC_UNDERFILL_PENALTY_CZK_PER_WH
for _t, us in neg_sell_soc_underfill
)
+ pulp.lpSum(
us * NEG_BUY_SOC_UNDERFILL_PENALTY_CZK_PER_WH
for _t, us in neg_buy_soc_underfill
)
+ pulp.lpSum(
sf * NEG_BUY_GRID_CHARGE_SHORTFALL_PENALTY_CZK_KWH * INTERVAL_H / 1000.0
for _t, sf, _cap in neg_buy_grid_shortfall
)
+ pulp.lpSum(
sf * PEAK_EXPORT_SHORTFALL_PENALTY_CZK_KWH * INTERVAL_H / 1000.0
for _t, sf, _cap in pre_neg_buy_export_shortfall
)
+ pulp.lpSum(
sf * NEG_SELL_BAT_DUMP_SHORTFALL_PENALTY_CZK_KWH * INTERVAL_H / 1000.0
for _t, sf, _cap in neg_sell_bat_dump_shortfall
@@ -1683,10 +1571,6 @@ def solve_dispatch(
for t in range(T)
if t in discharge_export_slots and t in profitable_export_ts_pre
)
+ pulp.lpSum(
-35.0 * z_export[t]
for t in pre_neg_buy_discharge_ts
)
)
# --- Omezení ---
@@ -1698,12 +1582,6 @@ def solve_dispatch(
prob += sf >= cap_w - ge_bat[t_sf]
for t_us, us in neg_sell_soc_underfill:
prob += us >= float(battery.soc_max_wh) - soc[t_us]
for t_us, us in neg_buy_soc_underfill:
prob += us >= float(battery.soc_max_wh) - soc[t_us]
for t_sf, sf, cap_w in neg_buy_grid_shortfall:
prob += sf >= cap_w - bc_gi[t_sf]
for t_sf, sf, cap_w in pre_neg_buy_export_shortfall:
prob += sf >= cap_w - ge_bat[t_sf]
preneg_export_min_soc_wh = float(min_soc_wh) + max(
float(battery.max_discharge_power_w)
* float(battery.discharge_efficiency)
@@ -1727,7 +1605,6 @@ def solve_dispatch(
if t_peak not in discharge_export_slots:
continue
prob += ge_bat[t_peak] >= export_push_w * z_export[t_peak]
# Pre-neg-buy: jen shortfall (tvrdý push by kolidoval s kotvou SoC v krátkých testech).
# Ostatní profitable sloty: shortfall penalizace (ne tvrdý push na celý horizont).
if t_anchor is not None and soc_anchor_slack is not None:
target_floor_wh = float(planner_floor_effective_wh)
@@ -1853,7 +1730,7 @@ def solve_dispatch(
block_neg_sell_export_t = bool(
getattr(grid, "block_export_on_negative_sell", False)
)
if t not in neg_sell_bat_dump_slots and t not in pre_neg_buy_discharge_ts:
if t not in neg_sell_bat_dump_slots:
prob += ge_bat[t] == 0
ev_cap_neg = sum(
float(vehicles[e].max_charge_power_w)
@@ -1959,12 +1836,6 @@ def solve_dispatch(
# Bez hluboké relaxace: export končí ≥ rezerva. Při hluboké relaxaci (soc_panel_min pod min_soc)
# sladit s LP spodkem — jinak z_export vynutil arb_base a blokoval vývoz k planner floor.
if (
om == "AUTO"
and t in pre_neg_buy_discharge_ts
and floor_pct is not None
):
export_soc_floor_t = float(planner_floor_effective_wh)
elif (
om == "AUTO"
and first_neg_sell_idx is not None
and t < first_neg_sell_idx
@@ -1995,7 +1866,6 @@ def solve_dispatch(
tgt_s is not None
and not high_sell_slot[t]
and t not in profitable_export_ts_pre
and t not in pre_neg_buy_discharge_ts
and not (
om == "AUTO"
and t in discharge_export_slots
@@ -2042,16 +1912,6 @@ def solve_dispatch(
prob += bd[t] == 0
# Slot pre-selection (z DB fn_load_planning_slots_full → allow_*)
# PŘED prvním buy<0 slotem v horizontu (= rezervační okno):
# - sell ≥ 0 → bc_pv = 0 (PV poteče do gridu / curtail, baterka se nenabíjí z PV
# protože v buy<0 okně bude akvizice levnější — záporná).
# - sell < 0 → slot je v charge_slots (R__063), bc_pv ≤ pv_surplus (= nemůžeme
# pole A vyhodit do mínusu, raději nabít baterii).
# JINDY (po buy<0 okně, nebo žádné buy<0 v horizontu): původní permissive
# bc_pv ≤ pv_surplus aby nedošlo k regresi normálních dnů.
_neg_buy_idx_main = next(
(t for t, s in enumerate(slots) if float(s.buy_price) < 0), None
)
if om == "AUTO":
for t in range(T):
s = slots[t]
@@ -2062,12 +1922,6 @@ def solve_dispatch(
+ int(s.pv_b_forecast_w)
- int(s.load_baseline_w),
)
# Grid→bat (bc_gi): R__063 dává allow_charge=true ze dvou různých důvodů:
# (a) ekonomicky výhodný grid charge slot (nízký buy, výhodná arbitráž),
# (b) sell<0 + pv_surplus (= "povolit PV nabíjení aby pole A nešlo do mínusu").
# V druhém případě bc_gi NESMÍ být povoleno (home-01 run 16652: 09:1509:45
# nabíjelo 18 kW ze sítě za buy 1,11,2 Kč jen proto, že sell=0,2).
# Druhý případ poznáme přes `sell<0 + pv_surplus>0`.
if (
t in charge_slots
and sell_t_pre < 0
@@ -2075,34 +1929,20 @@ def solve_dispatch(
and float(s.buy_price) >= 0.0
):
prob += bc_gi[t] == 0
pv_surplus_w = max(
0,
int(s.pv_a_forecast_w)
+ int(s.pv_b_forecast_w)
- int(s.load_baseline_w),
)
in_pre_neg_buy_window = (
_neg_buy_idx_main is not None and t < _neg_buy_idx_main
)
if (
in_pre_neg_buy_window
and t in charge_slots
and float(s.sell_price) < 0.0
):
# sell<0+PV charge_slots před buy<0: neplnit z PV A (kapacita pro import).
prob += bc_pv[t] == 0
elif t not in charge_slots:
if t not in charge_slots:
pv_surplus_w = max(
0,
int(s.pv_a_forecast_w)
+ int(s.pv_b_forecast_w)
- int(s.load_baseline_w),
)
if float(s.buy_price) >= 0.0:
prob += bc_gi[t] == 0
if pv_surplus_w <= 0:
prob += bc_pv[t] == 0
else:
prob += bc_pv[t] <= float(pv_surplus_w)
if (
t not in discharge_export_slots
and t not in neg_sell_bat_dump_slots
and t not in pre_neg_buy_discharge_ts
):
if t not in discharge_export_slots and t not in neg_sell_bat_dump_slots:
prob += ge_bat[t] == 0
prob += z_export[t] == 0
@@ -2285,28 +2125,6 @@ def solve_dispatch(
charge_commitment_prev_w=charge_commitment_prev_w,
planner_version=planner_version,
relaxed_expensive_import=True,
relaxed_neg_buy_pressure=relaxed_neg_buy_pressure,
)
if not relaxed_neg_buy_pressure:
logger.warning(
"solve_dispatch Infeasible, retry with relaxed_neg_buy_pressure "
"(skip v19 soft shortfalls)"
)
return solve_dispatch(
slots,
battery,
heat_pump,
grid,
ev_sessions,
vehicles,
current_soc_wh,
current_tuv_temp_c,
tuv_delta_stats=tuv_delta_stats,
operating_mode=operating_mode,
charge_commitment_prev_w=charge_commitment_prev_w,
planner_version=planner_version,
relaxed_expensive_import=True,
relaxed_neg_buy_pressure=True,
)
raise RuntimeError(f"Solver: {pulp.LpStatus[status]}")
@@ -2538,7 +2356,6 @@ def solve_dispatch(
},
"load_first_enabled": om == "AUTO",
"relaxed_expensive_import": relaxed_expensive_import,
"relaxed_neg_buy_pressure": relaxed_neg_buy_pressure,
"charge_acquisition_buy_czk_kwh": charge_acquisition_czk_kwh,
"charge_acquisition_cutoff_at": (
slots[0].charge_acquisition_cutoff_at.isoformat()