uprava PV omeznovani
Some checks failed
CI and deploy / migration-check (push) Failing after 13s
CI and deploy / deploy (push) Has been skipped

This commit is contained in:
Dusan Vojacek
2026-05-25 11:08:01 +02:00
parent f1a4dbd7e7
commit e06f76b9ff
8 changed files with 439 additions and 91 deletions

View File

@@ -68,7 +68,12 @@ NEG_BUY_CHARGE_SHORTFALL_PENALTY_CZK_KWH = 100.0
PRE_NEG_CHARGE_PENALTY_CZK_KWH = 400.0
PRE_NEG_BATT_EXPORT_SHORTFALL_PENALTY_CZK_KWH = 80.0
PRE_NEG_BATT_EXPORT_MIN_SELL_CZK_KWH = 1.0
PLANNER_BUILD_TAG = "2026-05-28-evening-push-dynamic-budget-v24"
PLANNER_BUILD_TAG = "2026-05-28-pre-neg-buy-soc-phases-v25"
POS_SELL_PRE_NEG_SOC_SHORTFALL_PENALTY_CZK_PER_WH = 0.30
PRE_NEG_BUY_SOC_CEILING_SLACK_PENALTY_CZK_PER_WH = 0.25
PRE_NEG_BUY_EMPTY_EXPORT_SHORTFALL_PENALTY_CZK_KWH = 80.0
# buy<0: preferovat import před PV A→bat (měkké; tvrdé bc_pv=0 láme bilanci s polem B).
PRE_NEG_BUY_PV_CHARGE_PENALTY_CZK_KWH = 250.0
CORRECTION_WINDOW_H = 1 # hodina zpět pro výpočet korekčního faktoru
CORRECTION_MIN_CLAMP = 0.5 # spodní limit korekčního faktoru
CORRECTION_MAX_CLAMP = 1.5 # horní limit korekčního faktoru
@@ -1003,6 +1008,99 @@ def _evening_battery_export_push_indices(
return sorted(out)
def _last_non_negative_sell_before_neg_buy(
slots: list[PlanningSlot],
first_neg_buy_idx: int | None,
) -> int | None:
if first_neg_buy_idx is None or first_neg_buy_idx <= 0:
return None
candidates = [
i for i in range(first_neg_buy_idx) if float(slots[i].sell_price) >= 0.0
]
return max(candidates) if candidates else None
def _positive_sell_pre_neg_buy_indices(
slots: list[PlanningSlot],
first_neg_buy_idx: int | None,
) -> list[int]:
if first_neg_buy_idx is None or first_neg_buy_idx <= 0:
return []
return [
t
for t in range(first_neg_buy_idx)
if float(slots[t].sell_price) >= 0.0
]
def _pre_neg_buy_empty_discharge_indices(
slots: list[PlanningSlot],
first_neg_buy_idx: int | None,
last_pos_sell_idx: int | None,
) -> list[int]:
"""Sloty mezi posledním sell≥0 a prvním buy<0 — vyprázdnit před levným importem."""
if first_neg_buy_idx is None or first_neg_buy_idx <= 0:
return []
if last_pos_sell_idx is None:
return []
start = last_pos_sell_idx + 1
end = first_neg_buy_idx - 1
if start > end:
return []
return list(range(start, end + 1))
def _pre_neg_buy_soc_ceiling_wh(
slots: list[PlanningSlot],
*,
first_neg_buy_idx: int | None,
min_soc_wh: float,
soc_max_wh: float,
max_charge_w: float,
charge_eff: float,
evening_start_hour: int = 17,
) -> float | None:
"""
Horní SoC těsně před prvním buy<0: pod soc_max musí vejít import v buy<0,
PV B v tom okně a rezerva na odpolední sell<0 (stejný den, před večerem).
"""
if first_neg_buy_idx is None or first_neg_buy_idx <= 0:
return None
per_slot_chg = max(0.0, float(max_charge_w) * float(charge_eff) * INTERVAL_H)
neg_buy_ts = [t for t, s in enumerate(slots) if float(s.buy_price) < 0.0]
if not neg_buy_ts:
return None
last_neg_buy = max(neg_buy_ts)
neg_day = _prague_calendar_date(slots[first_neg_buy_idx])
grid_wh = len(neg_buy_ts) * per_slot_chg
pv_b_wh = 0.0
for t in neg_buy_ts:
s = slots[t]
sur = max(
0.0,
float(s.pv_a_forecast_w) + float(s.pv_b_forecast_w) - float(s.load_baseline_w),
)
pv_b_wh += min(sur, float(max_charge_w)) * float(charge_eff) * INTERVAL_H
post_wh = 0.0
for t in range(last_neg_buy + 1, len(slots)):
s = slots[t]
if _prague_calendar_date(s) != neg_day:
continue
if float(s.buy_price) < 0.0:
continue
if float(s.sell_price) >= 0.0:
break
if _prague_hour(s) >= evening_start_hour:
break
sur = max(0.0, float(s.pv_b_forecast_w) - float(s.load_baseline_w) * 0.25)
post_wh += min(sur, float(max_charge_w)) * float(charge_eff) * INTERVAL_H
buffer_wh = max(per_slot_chg * 2.0, 3000.0)
needed = grid_wh + pv_b_wh + post_wh + buffer_wh
ceiling = float(soc_max_wh) - needed
floor = float(min_soc_wh) + max(per_slot_chg, 1000.0)
return max(floor, min(float(soc_max_wh) - per_slot_chg, ceiling))
def _planner_soc_for_solver(
current_soc_wh: float,
battery,
@@ -1243,8 +1341,6 @@ def solve_dispatch(
ca = [pulp.LpVariable(f"ca_{t}", 0, slots[t].pv_a_forecast_w) for t in range(T)]
hp = [pulp.LpVariable(f"hp_{t}", 0, heat_pump.rated_heating_power_w) for t in range(T)]
soc_deficit_24h = pulp.LpVariable("soc_deficit_24h", 0, battery.usable_capacity_wh)
soc_anchor_slack = None
t_anchor = None
# GEN port cut-off (BA81): binární proměnná pouze pokud je feature povolená v konfiguraci site/invertoru.
gen_cutoff_enabled = bool(getattr(grid, "deye_gen_microinverter_cutoff_enabled", False))
@@ -1338,6 +1434,9 @@ def solve_dispatch(
(t for t, s in enumerate(slots) if float(s.buy_price) < 0.0),
None,
)
neg_buy_slot_indices_pre = [
t for t, s in enumerate(slots) if float(s.buy_price) < 0.0
]
last_neg_sell_by_prague_date: dict[object, int] = {}
for t_ln, st_ln in enumerate(slots):
if float(st_ln.sell_price) < 0:
@@ -1392,16 +1491,43 @@ def solve_dispatch(
fixed_tariff=fixed_tariff_like_pre,
):
profitable_export_ts_pre.add(_t)
if first_neg_sell_idx is not None and first_neg_sell_idx > 0 and floor_pct is not None:
# Kotva na ranním peaku (ne na posledním slotu před sell<0) — jinak dump až v 07:30.
if (
t_pre_neg_peak is not None
and t_pre_neg_peak < first_neg_sell_idx - 1
):
t_anchor = t_pre_neg_peak
else:
t_anchor = first_neg_sell_idx - 1
soc_anchor_slack = pulp.LpVariable("soc_anchor_slack_wh", 0, float(battery.usable_capacity_wh))
last_pos_sell_pre_neg_buy = _last_non_negative_sell_before_neg_buy(
slots, first_neg_buy_idx
)
pos_sell_pre_neg_buy_ts = _positive_sell_pre_neg_buy_indices(
slots, first_neg_buy_idx
)
pre_neg_buy_empty_ts = _pre_neg_buy_empty_discharge_indices(
slots, first_neg_buy_idx, last_pos_sell_pre_neg_buy
)
pre_neg_buy_soc_ceiling_wh = _pre_neg_buy_soc_ceiling_wh(
slots,
first_neg_buy_idx=first_neg_buy_idx,
min_soc_wh=float(min_soc_wh),
soc_max_wh=float(battery.soc_max_wh),
max_charge_w=float(battery.max_charge_power_w),
charge_eff=float(battery.charge_efficiency),
)
t_pre_neg_buy_anchor: int | None = (
first_neg_buy_idx - 1 if first_neg_buy_idx is not None and first_neg_buy_idx > 0 else None
)
soc_pre_neg_buy_ceiling_slack: pulp.LpVariable | None = None
if (
t_pre_neg_buy_anchor is not None
and pre_neg_buy_soc_ceiling_wh is not None
):
soc_pre_neg_buy_ceiling_slack = pulp.LpVariable(
"soc_pre_neg_buy_ceiling_slack_wh",
0,
float(battery.usable_capacity_wh),
)
pos_sell_soc_shortfall: pulp.LpVariable | None = None
if last_pos_sell_pre_neg_buy is not None:
pos_sell_soc_shortfall = pulp.LpVariable(
"pos_sell_pre_neg_soc_shortfall_wh",
0,
float(battery.usable_capacity_wh),
)
daytime_en = bool(getattr(battery, "planner_daytime_charge_target_enabled", True))
safety_pen_czk_per_wh: list[float] = []
@@ -1466,10 +1592,12 @@ def solve_dispatch(
peak_export_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
pv_charge_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
pre_neg_pv_charge_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
neg_sell_bat_dump_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
neg_sell_soc_underfill: list[tuple[int, pulp.LpVariable]] = []
neg_buy_charge_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
pre_neg_batt_export_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
pre_neg_buy_empty_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
fixed_tariff_like = fixed_tariff_like_pre
block_export_neg_sell = bool(getattr(grid, "block_export_on_negative_sell", False))
if om == "AUTO":
@@ -1493,12 +1621,15 @@ def solve_dispatch(
for t_pnd in sorted(pre_neg_buy_discharge_ts):
sf_pnd = pulp.LpVariable(f"pre_neg_bat_export_sf_{t_pnd}", 0, export_cap_w)
pre_neg_batt_export_shortfall.append((t_pnd, sf_pnd, export_cap_w))
for t_empty in pre_neg_buy_empty_ts:
sf_e = pulp.LpVariable(f"pre_neg_buy_empty_sf_{t_empty}", 0, export_cap_w)
pre_neg_buy_empty_shortfall.append((t_empty, sf_e, export_cap_w))
if not relaxed_neg_buy_charge:
neg_buy_slot_indices = [
t for t, s in enumerate(slots) if float(s.buy_price) < 0.0
]
if neg_buy_slot_indices:
t_nb_last = max(neg_buy_slot_indices)
t_nb_last = max(neg_buy_slot_indices_pre)
cap_w = float(battery.max_charge_power_w)
sf_nb = pulp.LpVariable(f"neg_buy_charge_sf_{t_nb_last}", 0, cap_w)
neg_buy_charge_shortfall.append((t_nb_last, sf_nb, cap_w))
@@ -1539,15 +1670,22 @@ def solve_dispatch(
cap_w = float(min(pv_surplus_w, battery.max_charge_power_w))
sf_pv = pulp.LpVariable(f"post_neg_pv_shortfall_{t}", 0, cap_w)
pv_charge_shortfall.append((t, sf_pv, cap_w))
if len(neg_buy_slot_indices_pre) >= 2:
t_nb_last = max(neg_buy_slot_indices_pre)
if t_nb_last in charge_slots or relaxed_neg_buy_charge:
us = pulp.LpVariable(
f"neg_buy_soc_under_{t_nb_last}",
0,
float(battery.usable_capacity_wh),
)
neg_sell_soc_underfill.append((t_nb_last, us))
for t in range(T):
if float(slots[t].sell_price) >= 0:
if first_neg_buy_idx is None or t >= first_neg_buy_idx:
continue
if float(slots[t].sell_price) >= 0.0:
continue
if float(slots[t].buy_price) < 0.0:
continue
if t not in charge_slots:
continue
if first_neg_buy_idx is not None and t < first_neg_buy_idx:
continue
pv_surplus_w = max(
0.0,
float(slots[t].pv_a_forecast_w)
@@ -1556,12 +1694,9 @@ def solve_dispatch(
)
if pv_surplus_w <= 500:
continue
us = pulp.LpVariable(
f"neg_soc_under_{t}",
0,
float(battery.usable_capacity_wh),
)
neg_sell_soc_underfill.append((t, us))
cap_w = float(min(pv_surplus_w, battery.max_charge_power_w))
sf_m = pulp.LpVariable(f"pre_neg_pv_charge_sf_{t}", 0, cap_w)
pre_neg_pv_charge_shortfall.append((t, sf_m, cap_w))
for t in neg_sell_bat_dump_slots:
dump_target_w = _battery_export_cap_w(battery, grid)
sf_dump = pulp.LpVariable(f"neg_bat_dump_shortfall_{t}", 0, dump_target_w)
@@ -1631,26 +1766,24 @@ def solve_dispatch(
NEG_SELL_CURTAIL_PENALTY_CZK_KWH
if (
om == "AUTO"
and float(slots[t].sell_price) < 0.0
and float(slots[t].buy_price) < 0.0
and t in charge_slots
)
else (
0.0
if (
has_pv_b
and future_neg_buy_from[t]
and float(slots[t].sell_price) < 0.0
)
else CURTAILMENT_PENALTY
)
else CURTAILMENT_PENALTY
)
for t in range(T)
)
+ soc_deficit_24h * soc_deficit_penalty_czk_kwh / 1000
- terminal_soc_kcz_per_wh * soc[T - 1]
+ (
soc_anchor_slack * PRENEG_SELL_SOC_ANCHOR_SLACK_PENALTY_CZK_PER_WH
if soc_anchor_slack is not None
pos_sell_soc_shortfall * POS_SELL_PRE_NEG_SOC_SHORTFALL_PENALTY_CZK_PER_WH
if pos_sell_soc_shortfall is not None
else 0
)
+ (
soc_pre_neg_buy_ceiling_slack
* PRE_NEG_BUY_SOC_CEILING_SLACK_PENALTY_CZK_PER_WH
if soc_pre_neg_buy_ceiling_slack is not None
else 0
)
+ pulp.lpSum(
@@ -1684,7 +1817,7 @@ def solve_dispatch(
for _t, sf, _cap in pre_neg_batt_export_shortfall
)
+ pulp.lpSum(
(bc_pv[t] + bc_gi[t])
bc_gi[t]
* PRE_NEG_CHARGE_PENALTY_CZK_KWH
* INTERVAL_H
/ 1000.0
@@ -1695,6 +1828,22 @@ def solve_dispatch(
and float(slots[t].buy_price) >= 0.0
)
)
+ pulp.lpSum(
bc_pv[t]
* PRE_NEG_BUY_PV_CHARGE_PENALTY_CZK_KWH
* INTERVAL_H
/ 1000.0
for t in range(T)
if float(slots[t].buy_price) < 0.0
)
+ pulp.lpSum(
sf * PV_CHARGE_SHORTFALL_PENALTY_CZK_KWH * INTERVAL_H / 1000.0
for _t, sf, _cap in pre_neg_pv_charge_shortfall
)
+ pulp.lpSum(
sf * PRE_NEG_BUY_EMPTY_EXPORT_SHORTFALL_PENALTY_CZK_KWH * INTERVAL_H / 1000.0
for _t, sf, _cap in pre_neg_buy_empty_shortfall
)
+ pulp.lpSum(
-25.0 * z_export[t]
for t in range(T)
@@ -1712,9 +1861,14 @@ def solve_dispatch(
for t_us, us in neg_sell_soc_underfill:
prob += us >= float(battery.soc_max_wh) - soc[t_us]
for t_sf, sf, cap_w in neg_buy_charge_shortfall:
prob += sf >= cap_w - (bc_gi[t_sf] + bc_pv[t_sf])
# buy<0: bc_pv=0 (import arbitráž); shortfall jen na grid→bat.
prob += sf >= cap_w - bc_gi[t_sf]
for t_sf, sf, cap_w in pre_neg_batt_export_shortfall:
prob += sf >= cap_w - ge_bat[t_sf]
for t_sf, sf, cap_w in pre_neg_buy_empty_shortfall:
prob += sf >= cap_w - ge_bat[t_sf]
for t_sf, sf, cap_w in pre_neg_pv_charge_shortfall:
prob += sf >= cap_w - bc_pv[t_sf]
preneg_export_min_soc_wh = float(min_soc_wh) + max(
float(battery.max_discharge_power_w)
* float(battery.discharge_efficiency)
@@ -1735,6 +1889,9 @@ def solve_dispatch(
prob += ge_bat[t_peak] >= export_push_w * z_export[t_peak]
for t_pnd in pre_neg_buy_discharge_ts:
prob += ge_bat[t_pnd] >= export_push_w * z_export[t_pnd]
for t_empty in pre_neg_buy_empty_ts:
if t_empty in discharge_export_slots:
prob += ge_bat[t_empty] >= export_push_w * z_export[t_empty]
discharge_buf = float(getattr(battery, "discharge_slot_buffer", 0) or 0)
evening_push_ts = _evening_battery_export_push_indices(
slots,
@@ -1751,9 +1908,24 @@ def solve_dispatch(
continue
prob += ge_bat[t_peak] >= export_push_w * z_export[t_peak]
# Ostatní profitable sloty: shortfall penalizace (ne tvrdý push na celý horizont).
if t_anchor is not None and soc_anchor_slack is not None:
target_floor_wh = float(planner_floor_effective_wh)
prob += soc[t_anchor] <= target_floor_wh + soc_anchor_slack
if (
last_pos_sell_pre_neg_buy is not None
and pos_sell_soc_shortfall is not None
):
prob += (
soc[last_pos_sell_pre_neg_buy]
>= float(battery.soc_max_wh) - pos_sell_soc_shortfall
)
if (
t_pre_neg_buy_anchor is not None
and pre_neg_buy_soc_ceiling_wh is not None
and soc_pre_neg_buy_ceiling_slack is not None
and last_pos_sell_pre_neg_buy is not None
):
prob += (
soc[t_pre_neg_buy_anchor]
<= float(pre_neg_buy_soc_ceiling_wh) + soc_pre_neg_buy_ceiling_slack
)
for t in range(T):
s = slots[t]
@@ -1832,11 +2004,11 @@ def solve_dispatch(
eff_tgt_s = float(tgt_s) if tgt_s is not None else float(min_soc_wh)
if (
om == "AUTO"
and float(s.sell_price) < 0.0
and float(s.buy_price) < 0.0
and t in charge_slots
and (first_neg_buy_idx is None or t >= first_neg_buy_idx)
and len(neg_buy_slot_indices_pre) >= 2
):
# Záporný výkup: dobít na planner soc_max (typicky 95100 %), ne jen SQL safety ~50 %.
# buy<0: cíl soc_max jen při víceslotovém okně (jinak fyzicky neřešitelné).
eff_tgt_s = max(eff_tgt_s, float(battery.soc_max_wh))
elif post_neg_pv_topup[t]:
# Po konci sell<0: dobit z FVE na plno, pak teprve export (kladný sell, ne večerní peak).
@@ -1870,6 +2042,8 @@ def solve_dispatch(
prob += ge[t] == 0
prob += ge_pv[t] == 0
prob += ge_bat[t] == 0
# PV A: měkký tlak curtail (NEG_SELL_CURTAIL při buy<0), ne tvrdé bc_pv=0
# (s polem B a bilancí může být bc_pv=0 nutné pro řešitelnost krátkých okének).
# Záporný prodej (sell < 0): výboj baterie jen před extrémně záporným buy (v11).
# Export FVE při sell<0: spot = nabíjení/curtail A; ventil jen pole B při plné baterii.
@@ -1924,13 +2098,14 @@ def solve_dispatch(
prob += ge[t] == 0
prob += ge_pv[t] == 0
elif not purchase_fixed_pre:
# Spot (home-01): ge_pv=0 dokud není plná baterie; pak jen ventil pole B (ne celý surplus).
# Před buy<0 + bc_pv=0: přebytek pole B musí jít do sítě (ge_pv≤pv_b), jinak Infeasible.
# Spot: sell<0 před buy<0 — PV (A) do baterie, B může jít do sítě (ge_pv≤pv_b).
# Po buy<0 / mimo ranní pásmo: ventil B jen při plné baterii.
before_first_neg_buy = (
first_neg_buy_idx is not None and t < first_neg_buy_idx
)
if before_first_neg_buy and float(s.pv_b_forecast_w) > 0:
prob += ge_pv[t] <= float(s.pv_b_forecast_w)
if before_first_neg_buy:
if float(s.pv_b_forecast_w) > 0:
prob += ge_pv[t] <= float(s.pv_b_forecast_w)
else:
soc_prev_neg = current_soc_wh if t == 0 else soc[t - 1]
w_pv_b_vent = pulp.LpVariable(
@@ -2005,6 +2180,8 @@ def solve_dispatch(
export_soc_floor_t = float(planner_floor_effective_wh)
elif om == "AUTO" and t in pre_neg_buy_discharge_ts:
export_soc_floor_t = float(min_soc_wh)
elif om == "AUTO" and t in pre_neg_buy_empty_ts:
export_soc_floor_t = float(min_soc_wh)
elif (
om == "AUTO"
and t in morning_pre_neg_export_ts
@@ -2093,23 +2270,29 @@ def solve_dispatch(
and float(s.buy_price) >= 0.0
):
prob += bc_gi[t] == 0
before_neg_buy = (
first_neg_buy_idx is not None and t < first_neg_buy_idx
)
if before_neg_buy and sell_t_pre < 0.0 and pv_surplus_w > 0:
# Ranní sell<0 před buy<0: PV do sítě/curtail, ne do baterie (kapacita na import).
prob += bc_pv[t] == 0
if float(s.buy_price) < 0.0:
pass
elif (
first_neg_buy_idx is not None
and first_neg_buy_idx > 0
and t in pos_sell_pre_neg_buy_ts
):
prob += ge[t] == 0
prob += ge_pv[t] == 0
prob += ge_bat[t] == 0
elif t not in charge_slots:
if float(s.buy_price) >= 0.0:
prob += bc_gi[t] == 0
if pv_surplus_w <= 0:
prob += bc_pv[t] == 0
else:
prob += bc_pv[t] <= float(pv_surplus_w)
if float(s.buy_price) >= 0.0:
if pv_surplus_w <= 0:
prob += bc_pv[t] == 0
else:
prob += bc_pv[t] <= float(pv_surplus_w)
if (
t not in discharge_export_slots
and t not in neg_sell_bat_dump_slots
and t not in pre_neg_buy_discharge_ts
and t not in pre_neg_buy_empty_ts
):
prob += ge_bat[t] == 0
prob += z_export[t] == 0
@@ -2159,6 +2342,10 @@ def solve_dispatch(
and sell_t < 0
and buy_t >= 0.0
and not purchase_fixed_pre
and (
first_neg_buy_idx is None
or t < first_neg_buy_idx
)
) or (
# KV1: plná baterie + kladný sell — neblokovat ge_pv==0 (jinak masivní curtail).
getattr(grid, "block_export_on_negative_sell", False)