implementace co nejdrive dosazeni SOC na home-01 a umozneni plneho socu n slotu ped koncem sell < 0
Some checks failed
CI and deploy / migration-check (push) Failing after 13s
CI and deploy / deploy (push) Has been skipped

This commit is contained in:
Dusan Vojacek
2026-05-26 08:07:00 +02:00
parent 8494ea26de
commit 91a9bef3d7
10 changed files with 566 additions and 25 deletions

View File

@@ -61,6 +61,9 @@ NEG_SELL_PV_CHARGE_REWARD_CZK_KWH = 0.8
NEG_SELL_SOC_UNDERFILL_PENALTY_CZK_PER_WH = 0.35
# Jen ventil nekontrolovatelného pole B při plné baterii a sell<0 (spot); ne celý PV přebytek.
NEG_SELL_PV_B_VENT_PENALTY_CZK_KWH = 4.0
# Fáze sell<0 (v32): ASAP na prep_soc %, tail rampa na soc_max.
NEG_SELL_PREP_SOC_SHORTFALL_PENALTY_CZK_PER_WH = 0.85
NEG_SELL_PREP_HOLD_BCPV_PENALTY_CZK_KWH = 60.0
# Výboj baterie při sell<0 jen těsně před extrémně záporným buy (round-trip arbitráž).
EXTREME_BUY_DUMP_PREWINDOW_SLOTS = 12
NEG_SELL_BAT_DUMP_SHORTFALL_PENALTY_CZK_KWH = 80.0
@@ -68,7 +71,7 @@ NEG_BUY_CHARGE_SHORTFALL_PENALTY_CZK_KWH = 100.0
PRE_NEG_CHARGE_PENALTY_CZK_KWH = 400.0
PRE_NEG_BATT_EXPORT_SHORTFALL_PENALTY_CZK_KWH = 80.0
PRE_NEG_BATT_EXPORT_MIN_SELL_CZK_KWH = 1.0
PLANNER_BUILD_TAG = "2026-05-28-morning-pv-export-priority-v31"
PLANNER_BUILD_TAG = "2026-05-28-neg-sell-soc-phases-v32"
POS_SELL_PRE_NEG_SOC_SHORTFALL_PENALTY_CZK_PER_WH = 0.30
PRE_NEG_BUY_SOC_CEILING_SLACK_PENALTY_CZK_PER_WH = 0.25
PRE_NEG_BUY_EMPTY_EXPORT_SHORTFALL_PENALTY_CZK_KWH = 80.0
@@ -806,6 +809,57 @@ def _prague_calendar_date(slot: PlanningSlot):
return dt.astimezone(ZoneInfo("Europe/Prague")).date()
def _neg_sell_phases_enabled(battery: Any) -> bool:
# Bez atributů z DB (unit testy) = legacy; z DB default 80 % / 4 sloty (V083).
prep_pct = float(getattr(battery, "planner_neg_sell_prep_soc_percent", 100.0))
tail_slots = int(getattr(battery, "planner_neg_sell_full_soc_tail_slots", 0))
return prep_pct < 100.0 - 1e-6 and tail_slots > 0
def _neg_sell_day_phases(
slots: list[PlanningSlot],
battery: Any,
) -> tuple[list[str], list[Optional[float]], list[float]]:
"""
Per slot: phase (none|prep|tail), soc_target_wh (None mimo sell<0 fáze), prep shortfall váha.
Fáze po kalendářním dni v Europe/Prague.
"""
t_len = len(slots)
phases: list[str] = ["none"] * t_len
soc_targets: list[Optional[float]] = [None] * t_len
shortfall_weights: list[float] = [0.0] * t_len
prep_pct = float(getattr(battery, "planner_neg_sell_prep_soc_percent", 100.0))
tail_n = int(getattr(battery, "planner_neg_sell_full_soc_tail_slots", 0))
prep_wh = prep_pct / 100.0 * float(battery.soc_max_wh)
soc_max = float(battery.soc_max_wh)
by_day: dict[object, list[int]] = {}
for t, st in enumerate(slots):
if float(st.sell_price) < 0.0:
by_day.setdefault(_prague_calendar_date(st), []).append(t)
for _day, indices in by_day.items():
if not indices:
continue
indices.sort()
last_t = indices[-1]
tail_start = max(indices[0], last_t - tail_n + 1)
for t in indices:
if t >= tail_start:
phases[t] = "tail"
if tail_n <= 1:
soc_targets[t] = soc_max
else:
pos = t - tail_start
frac = pos / float(max(1, tail_n - 1))
soc_targets[t] = prep_wh + frac * (soc_max - prep_wh)
else:
phases[t] = "prep"
soc_targets[t] = prep_wh
shortfall_weights[t] = float(last_t - t + 1) / float(len(indices))
return phases, soc_targets, shortfall_weights
MORNING_PRENEG_START_HOUR = 5
MORNING_PRENEG_END_HOUR = 11
@@ -1574,6 +1628,24 @@ def solve_dispatch(
min_spread_pre = float(degradation_cost_effective)
purchase_fixed_pre = _purchase_pricing_fixed(grid)
fixed_tariff_like_pre = purchase_fixed_pre or _horizon_fixed_tariff_like(slots)
neg_sell_phases_en = (
om == "AUTO"
and not purchase_fixed_pre
and _neg_sell_phases_enabled(battery)
)
neg_sell_phase_by_t: list[str] = ["none"] * T
neg_sell_soc_target_by_t: list[Optional[float]] = [None] * T
neg_sell_shortfall_weight_by_t: list[float] = [0.0] * T
if neg_sell_phases_en:
(
neg_sell_phase_by_t,
neg_sell_soc_target_by_t,
neg_sell_shortfall_weight_by_t,
) = _neg_sell_day_phases(slots, battery)
prep_soc_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
prep_hold_bcpv_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
prep_hold_curtail_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
prep_hold_met_binary: dict[int, pulp.LpVariable] = {}
pre_neg_buy_discharge_ts: set[int] = set()
if om == "AUTO" and first_neg_buy_idx is not None and first_neg_buy_idx > 0:
pre_neg_buy_discharge_ts = _pre_neg_buy_discharge_indices(
@@ -1744,7 +1816,7 @@ def solve_dispatch(
pv_charge_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
pre_neg_pv_charge_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
neg_sell_bat_dump_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
neg_sell_soc_underfill: list[tuple[int, pulp.LpVariable]] = []
neg_sell_soc_underfill: list[tuple[int, pulp.LpVariable, float]] = []
neg_buy_charge_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
pre_neg_batt_export_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
pre_neg_buy_empty_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
@@ -1812,6 +1884,26 @@ def solve_dispatch(
cap_w = float(min(pv_surplus_w, battery.max_charge_power_w))
sf_pv = pulp.LpVariable(f"pv_charge_shortfall_{t}", 0, cap_w)
pv_charge_shortfall.append((t, sf_pv, cap_w))
if neg_sell_phases_en:
pv_charge_taken = {t_sf for t_sf, _sf, _c in pv_charge_shortfall}
for t_ns in range(T):
if neg_sell_phase_by_t[t_ns] not in ("prep", "tail"):
continue
if t_ns in pv_charge_taken:
continue
if float(slots[t_ns].sell_price) >= 0.0:
continue
pv_surplus_ns = max(
0.0,
float(slots[t_ns].pv_a_forecast_w)
+ float(slots[t_ns].pv_b_forecast_w)
- float(slots[t_ns].load_baseline_w),
)
if pv_surplus_ns <= 500:
continue
cap_ns = float(min(pv_surplus_ns, battery.max_charge_power_w))
sf_ns = pulp.LpVariable(f"neg_phase_pv_charge_{t_ns}", 0, cap_ns)
pv_charge_shortfall.append((t_ns, sf_ns, cap_ns))
for t in range(T):
if not post_neg_pv_topup[t]:
continue
@@ -1828,7 +1920,48 @@ def solve_dispatch(
cap_w = float(min(pv_surplus_w, battery.max_charge_power_w))
sf_pv = pulp.LpVariable(f"post_neg_pv_shortfall_{t}", 0, cap_w)
pv_charge_shortfall.append((t, sf_pv, cap_w))
if len(neg_buy_slot_indices_pre) >= 2:
if neg_sell_phases_en:
for t_ns in range(T):
phase_ns = neg_sell_phase_by_t[t_ns]
tgt_ns = neg_sell_soc_target_by_t[t_ns]
if phase_ns == "none" or tgt_ns is None:
continue
us_prep = pulp.LpVariable(
f"neg_sell_prep_soc_{t_ns}",
0,
float(battery.usable_capacity_wh),
)
w_sf = float(neg_sell_shortfall_weight_by_t[t_ns])
prep_soc_shortfall.append((t_ns, us_prep, w_sf))
tail_last_by_day: dict[object, int] = {}
for t_ln, st_ln in enumerate(slots):
if neg_sell_phase_by_t[t_ln] != "tail":
continue
tail_last_by_day[_prague_calendar_date(st_ln)] = t_ln
for t_tail_last in tail_last_by_day.values():
if t_tail_last in charge_slots or relaxed_neg_buy_charge:
us_tail = pulp.LpVariable(
f"neg_sell_tail_soc_{t_tail_last}",
0,
float(battery.usable_capacity_wh),
)
neg_sell_soc_underfill.append(
(t_tail_last, us_tail, float(battery.soc_max_wh))
)
for t_ph in range(T):
if neg_sell_phase_by_t[t_ph] != "prep":
continue
cap_bc = float(battery.max_charge_power_w)
prep_hold_met_binary[t_ph] = pulp.LpVariable(
f"prep_hold_met_{t_ph}",
cat=pulp.LpBinary,
)
sf_hold = pulp.LpVariable(f"prep_hold_bcpv_{t_ph}", 0, cap_bc)
prep_hold_bcpv_shortfall.append((t_ph, sf_hold, cap_bc))
cap_ca = float(max(0, slots[t_ph].pv_a_forecast_w))
sf_ca = pulp.LpVariable(f"prep_hold_curtail_{t_ph}", 0, cap_ca)
prep_hold_curtail_shortfall.append((t_ph, sf_ca, cap_ca))
elif len(neg_buy_slot_indices_pre) >= 2:
t_nb_last = max(neg_buy_slot_indices_pre)
if t_nb_last in charge_slots or relaxed_neg_buy_charge:
us = pulp.LpVariable(
@@ -1836,7 +1969,9 @@ def solve_dispatch(
0,
float(battery.usable_capacity_wh),
)
neg_sell_soc_underfill.append((t_nb_last, us))
neg_sell_soc_underfill.append(
(t_nb_last, us, float(battery.soc_max_wh))
)
for t in range(T):
if first_neg_buy_idx is None or t >= first_neg_buy_idx:
continue
@@ -1904,7 +2039,17 @@ def solve_dispatch(
)
+ (
ge_pv[t]
* NEG_SELL_PV_B_VENT_PENALTY_CZK_KWH
* (
max(
0.05,
-float(slots[t].sell_price),
)
if (
neg_sell_phases_en
and neg_sell_phase_by_t[t] == "tail"
)
else NEG_SELL_PV_B_VENT_PENALTY_CZK_KWH
)
* INTERVAL_H
/ 1000
if (
@@ -1926,6 +2071,9 @@ def solve_dispatch(
om == "AUTO"
and float(slots[t].buy_price) < 0.0
and t in charge_slots
and not (
neg_sell_phases_en and neg_sell_phase_by_t[t] == "prep"
)
)
else CURTAILMENT_PENALTY
)
@@ -1960,7 +2108,19 @@ def solve_dispatch(
)
+ pulp.lpSum(
us * NEG_SELL_SOC_UNDERFILL_PENALTY_CZK_PER_WH
for _t, us in neg_sell_soc_underfill
for _t, us, _tgt in neg_sell_soc_underfill
)
+ pulp.lpSum(
us * w_sf * NEG_SELL_PREP_SOC_SHORTFALL_PENALTY_CZK_PER_WH
for _t, us, w_sf in prep_soc_shortfall
)
+ pulp.lpSum(
sf * NEG_SELL_PREP_HOLD_BCPV_PENALTY_CZK_KWH * INTERVAL_H / 1000.0
for _t, sf, _cap in prep_hold_bcpv_shortfall
)
+ pulp.lpSum(
sf * NEG_SELL_CURTAIL_PENALTY_CZK_KWH * INTERVAL_H / 1000.0
for _t, sf, _cap in prep_hold_curtail_shortfall
)
+ pulp.lpSum(
sf * NEG_SELL_BAT_DUMP_SHORTFALL_PENALTY_CZK_KWH * INTERVAL_H / 1000.0
@@ -2020,8 +2180,28 @@ def solve_dispatch(
prob += sf >= cap_w - bc_pv[t_sf]
for t_sf, sf, cap_w in neg_sell_bat_dump_shortfall:
prob += sf >= cap_w - ge_bat[t_sf]
for t_us, us in neg_sell_soc_underfill:
prob += us >= float(battery.soc_max_wh) - soc[t_us]
for t_us, us, _w_sf in prep_soc_shortfall:
tgt_prep = neg_sell_soc_target_by_t[t_us]
if tgt_prep is not None:
prob += us >= float(tgt_prep) - soc[t_us]
for t_us, us, tgt_wh in neg_sell_soc_underfill:
prob += us >= float(tgt_wh) - soc[t_us]
prep_wh_phases = (
float(getattr(battery, "planner_neg_sell_prep_soc_percent", 80.0))
/ 100.0
* float(battery.soc_max_wh)
if neg_sell_phases_en
else 0.0
)
m_hold_soc = float(battery.soc_max_wh)
for t_h, sf_h, cap_h in prep_hold_bcpv_shortfall:
w_h = prep_hold_met_binary[t_h]
soc_prev_h = current_soc_wh if t_h == 0 else soc[t_h - 1]
prob += soc_prev_h >= prep_wh_phases - m_hold_soc * (1 - w_h)
prob += sf_h >= bc_pv[t_h] - cap_h * w_h
for t_c, sf_c, cap_c in prep_hold_curtail_shortfall:
w_c = prep_hold_met_binary[t_c]
prob += sf_c >= ca[t_c] - cap_c * (1 - w_c)
for t_sf, sf, cap_w in neg_buy_charge_shortfall:
# buy<0: bc_pv=0 (import arbitráž); shortfall jen na grid→bat.
prob += sf >= cap_w - bc_gi[t_sf]
@@ -2171,10 +2351,17 @@ def solve_dispatch(
if sv is not None:
eff_tgt_s = float(tgt_s) if tgt_s is not None else float(min_soc_wh)
if (
neg_sell_phases_en
and float(s.sell_price) < 0.0
and neg_sell_soc_target_by_t[t] is not None
):
eff_tgt_s = max(eff_tgt_s, float(neg_sell_soc_target_by_t[t]))
elif (
om == "AUTO"
and float(s.buy_price) < 0.0
and t in charge_slots
and len(neg_buy_slot_indices_pre) >= 2
and not neg_sell_phases_en
):
# buy<0: cíl soc_max jen při víceslotovém okně (jinak fyzicky neřešitelné).
eff_tgt_s = max(eff_tgt_s, float(battery.soc_max_wh))
@@ -2267,11 +2454,22 @@ def solve_dispatch(
prob += ge_pv[t] == 0
elif not purchase_fixed_pre:
# Spot: sell<0 před buy<0 — PV (A) do baterie, B může jít do sítě (ge_pv≤pv_b).
# Po buy<0 / mimo ranní pásmo: ventil B jen při plné baterii.
# Po buy<0 / mimo ranní pásmo: ventil B jen při plné baterii (nebo tail + sell práh).
before_first_neg_buy = (
first_neg_buy_idx is not None and t < first_neg_buy_idx
)
if before_first_neg_buy:
vent_min_sell = getattr(
battery, "planner_neg_sell_vent_min_sell_czk_kwh", None
)
tail_free_vent = bool(
neg_sell_phases_en
and neg_sell_phase_by_t[t] == "tail"
and vent_min_sell is not None
and float(s.sell_price) >= float(vent_min_sell)
)
if tail_free_vent and float(s.pv_b_forecast_w) > 0:
prob += ge_pv[t] <= float(s.pv_b_forecast_w)
elif before_first_neg_buy:
if float(s.pv_b_forecast_w) > 0:
prob += ge_pv[t] <= float(s.pv_b_forecast_w)
else:
@@ -2752,12 +2950,19 @@ def solve_dispatch(
* INTERVAL_H
/ 1000.0
)
for _tt, _us in neg_sell_soc_underfill:
for _tt, _us, _tgt in neg_sell_soc_underfill:
if _tt == t:
penalty_terms_t += (
float(pulp.value(_us) or 0.0)
* NEG_SELL_SOC_UNDERFILL_PENALTY_CZK_PER_WH
)
for _tt, _us, _w in prep_soc_shortfall:
if _tt == t:
penalty_terms_t += (
float(pulp.value(_us) or 0.0)
* float(_w)
* NEG_SELL_PREP_SOC_SHORTFALL_PENALTY_CZK_PER_WH
)
sv_t = safety_vars[t]
if sv_t is not None:
penalty_terms_t += float(pulp.value(sv_t) or 0.0) * safety_pen_czk_per_wh[t]
@@ -2817,6 +3022,12 @@ def solve_dispatch(
"slot": st.interval_start.isoformat(),
"allow_charge": bool(st.allow_charge),
"allow_discharge_export": bool(st.allow_discharge_export),
"neg_sell_phase": neg_sell_phase_by_t[t] if neg_sell_phases_en else None,
"neg_sell_soc_target_wh": (
float(neg_sell_soc_target_by_t[t])
if neg_sell_soc_target_by_t[t] is not None
else None
),
}
)
tgt_s = st.safety_soc_target_wh if daytime_en else None
@@ -2902,7 +3113,17 @@ def solve_dispatch(
"planner_terminal_soc_value_factor": float(battery.planner_terminal_soc_value_factor),
"planner_daytime_charge_target_enabled": daytime_en,
"planner_charge_commitment_penalty_czk_kwh": float(commit_pen),
"planner_neg_sell_prep_soc_percent": float(
getattr(battery, "planner_neg_sell_prep_soc_percent", 80.0)
),
"planner_neg_sell_full_soc_tail_slots": int(
getattr(battery, "planner_neg_sell_full_soc_tail_slots", 4)
),
"planner_neg_sell_vent_min_sell_czk_kwh": getattr(
battery, "planner_neg_sell_vent_min_sell_czk_kwh", None
),
},
"neg_sell_phases_enabled": bool(neg_sell_phases_en),
"load_first_enabled": om == "AUTO",
"relaxed_expensive_import": relaxed_expensive_import,
"charge_acquisition_buy_czk_kwh": charge_acquisition_czk_kwh,
@@ -3368,6 +3589,17 @@ async def _load_site_context(site_id: int, db):
planner_charge_commitment_penalty_czk_kwh=float(
b.get("planner_charge_commitment_penalty_czk_kwh") or 0.20
),
planner_neg_sell_prep_soc_percent=float(
b.get("planner_neg_sell_prep_soc_percent") or 80.0
),
planner_neg_sell_full_soc_tail_slots=int(
b.get("planner_neg_sell_full_soc_tail_slots") or 4
),
planner_neg_sell_vent_min_sell_czk_kwh=(
float(b["planner_neg_sell_vent_min_sell_czk_kwh"])
if b.get("planner_neg_sell_vent_min_sell_czk_kwh") is not None
else None
),
)
hpj = ctx["heat_pump"]