implementace dynamickeho bodu T (kde se rodpojuje PV A)
Some checks failed
CI and deploy / migration-check (push) Failing after 15s
CI and deploy / deploy (push) Has been skipped

This commit is contained in:
Dusan Vojacek
2026-05-26 13:28:31 +02:00
parent a53bcd0b81
commit 58b0a2f882
6 changed files with 310 additions and 73 deletions

View File

@@ -71,7 +71,9 @@ NEG_BUY_CHARGE_SHORTFALL_PENALTY_CZK_KWH = 100.0
PRE_NEG_CHARGE_PENALTY_CZK_KWH = 400.0
PRE_NEG_BATT_EXPORT_SHORTFALL_PENALTY_CZK_KWH = 80.0
PRE_NEG_BATT_EXPORT_MIN_SELL_CZK_KWH = 1.0
PLANNER_BUILD_TAG = "2026-05-28-load-first-hard-v34"
PLANNER_BUILD_TAG = "2026-05-28-neg-sell-b-ramp-v35"
# Po t_detach v prep: necpát PV do bat (měkké; tvrdý hold přes soc_target z rampy).
NEG_SELL_POST_DETACH_BCPV_DISCOURAGE_CZK_KWH = 250.0
# Před prvním sell<0: export FVE jen pokud predikce v sell<0 okně pokryje dobítí na prep cíl.
PRE_NEG_PV_EXPORT_FORECAST_MARGIN = 1.15
PRE_NEG_PV_EXPORT_MIN_NEEDED_WH = 2500.0
@@ -821,34 +823,112 @@ def _neg_sell_phases_enabled(battery: Any) -> bool:
return prep_pct < 100.0 - 1e-6 and tail_slots > 0
def _neg_sell_pv_b_charge_wh(slot: PlanningSlot, battery: Any) -> float:
"""Odhad Wh nabitelné jen z PV B v jednom sell<0 slotu (surplus nad load, cap výkonu)."""
pv_surplus_b = max(0.0, float(slot.pv_b_forecast_w) - float(slot.load_baseline_w))
if pv_surplus_b <= 500.0:
return 0.0
cap_w = min(pv_surplus_b, float(battery.max_charge_power_w))
return cap_w * INTERVAL_H * float(battery.charge_efficiency)
def _neg_sell_day_pv_b_usable_wh(
slots: list[PlanningSlot],
first_neg_sell_idx: int | None,
battery: Any,
) -> float:
"""Součet B-nabíjení ve všech sell<0 slotech téhož pražského dne."""
if first_neg_sell_idx is None:
return 0.0
neg_day = _prague_calendar_date(slots[first_neg_sell_idx])
total = 0.0
for s in slots:
if _prague_calendar_date(s) != neg_day:
continue
if float(s.sell_price) >= 0.0:
continue
total += _neg_sell_pv_b_charge_wh(s, battery)
return total
def _neg_sell_e_surplus_after_t_wh(
slots: list[PlanningSlot],
t_detach: int,
last_neg: int,
battery: Any,
) -> float:
"""Integrál přebytku FVE nad load+bat cap od t_detach do last_neg (Wh)."""
total = 0.0
for t in range(t_detach, last_neg + 1):
if t < 0 or t >= len(slots):
continue
st = slots[t]
if float(st.sell_price) >= 0.0:
continue
pv_surplus = max(
0.0,
float(st.pv_a_forecast_w)
+ float(st.pv_b_forecast_w)
- float(st.load_baseline_w),
)
if pv_surplus <= 500.0:
continue
cap_charge_wh = (
min(pv_surplus, float(battery.max_charge_power_w))
* INTERVAL_H
* float(battery.charge_efficiency)
)
total += max(0.0, pv_surplus * INTERVAL_H - cap_charge_wh)
return total
def _neg_sell_day_phases(
slots: list[PlanningSlot],
battery: Any,
) -> tuple[list[str], list[Optional[float]], list[float]]:
) -> tuple[list[str], list[Optional[float]], list[float], dict[str, Any]]:
"""
Per slot: phase (none|prep|tail), soc_target_wh (None mimo sell<0 fáze), prep shortfall váha.
Fáze po kalendářním dni v Europe/Prague.
Per slot: phase (none|prep|tail), soc_target_wh (rampa z PV B, ne fixní %), shortfall váha.
V35: zpětná projekce soc_need z B od tail; t_detach = první prep kde soc_need ≤ soc_need[tail_start].
"""
t_len = len(slots)
phases: list[str] = ["none"] * t_len
soc_targets: list[Optional[float]] = [None] * t_len
shortfall_weights: list[float] = [0.0] * t_len
prep_pct = float(getattr(battery, "planner_neg_sell_prep_soc_percent", 100.0))
tail_n = int(getattr(battery, "planner_neg_sell_full_soc_tail_slots", 0))
prep_wh = prep_pct / 100.0 * float(battery.soc_max_wh)
soc_max = float(battery.soc_max_wh)
min_soc = float(battery.min_soc_wh)
post_detach_prep_ts: set[int] = set()
day_meta: list[dict[str, Any]] = []
by_day: dict[object, list[int]] = {}
for t, st in enumerate(slots):
if float(st.sell_price) < 0.0:
by_day.setdefault(_prague_calendar_date(st), []).append(t)
for _day, indices in by_day.items():
for day, indices in by_day.items():
if not indices:
continue
indices.sort()
last_t = indices[-1]
tail_start = max(indices[0], last_t - tail_n + 1)
tail_start = max(indices[0], last_t - tail_n + 1) if tail_n > 0 else last_t + 1
charge_b = {t: _neg_sell_pv_b_charge_wh(slots[t], battery) for t in indices}
soc_need: dict[int, float] = {last_t: soc_max}
for i in range(len(indices) - 1, 0, -1):
t_cur = indices[i]
t_prev = indices[i - 1]
soc_need[t_prev] = max(min_soc, soc_need[t_cur] - charge_b[t_cur])
soc_detach_wh = float(soc_need.get(tail_start, soc_max))
t_detach = tail_start
for t in indices:
if t >= tail_start:
continue
if soc_need[t] <= soc_detach_wh + 1e-3:
t_detach = t
break
e_surplus = _neg_sell_e_surplus_after_t_wh(slots, t_detach, last_t, battery)
for t in indices:
if t >= tail_start:
phases[t] = "tail"
@@ -857,12 +937,45 @@ def _neg_sell_day_phases(
else:
pos = t - tail_start
frac = pos / float(max(1, tail_n - 1))
soc_targets[t] = prep_wh + frac * (soc_max - prep_wh)
lo = float(soc_need.get(tail_start, soc_max))
soc_targets[t] = lo + frac * (soc_max - lo)
else:
phases[t] = "prep"
soc_targets[t] = prep_wh
soc_targets[t] = float(soc_need[t])
if t >= t_detach:
post_detach_prep_ts.add(t)
shortfall_weights[t] = float(last_t - t + 1) / float(len(indices))
return phases, soc_targets, shortfall_weights
day_meta.append(
{
"prague_date": str(day),
"first_neg_idx": indices[0],
"last_neg_idx": last_t,
"tail_start_idx": tail_start,
"t_detach_idx": t_detach,
"soc_detach_wh": soc_detach_wh,
"e_surplus_after_t_wh": e_surplus,
"soc_ramp_wh": [
{
"slot": slots[t].interval_start.isoformat(),
"soc_need_wh": float(soc_need[t]),
"phase": phases[t],
"soc_target_wh": float(soc_targets[t] or 0.0),
}
for t in indices
],
}
)
meta: dict[str, Any] = {
"neg_sell_b_ramp_v35": True,
"days": day_meta,
"post_detach_prep_ts": sorted(post_detach_prep_ts),
}
if day_meta:
meta["t_detach_idx"] = day_meta[0]["t_detach_idx"]
meta["e_surplus_after_t_wh"] = day_meta[0]["e_surplus_after_t_wh"]
return phases, soc_targets, shortfall_weights, meta
def _neg_sell_day_pv_usable_wh(
@@ -906,25 +1019,27 @@ def _pre_neg_pv_export_forecast_cushion_ok(
neg_sell_phases_en: bool,
) -> bool:
"""
Export FVE před sell<0 jen pokud forecast v záporném okně pokryje dobítí na cíl (typ. 80 %).
Export FVE před sell<0 jen pokud forecast B v sell<0 okně pokryje dobítí na soc_need z rampy.
Jinak raději nabíjet teď — riziko deště / podhodnocené FVE v sell<0.
"""
if first_neg_sell_idx is None or first_neg_sell_idx <= 0:
return False
prep_pct = float(getattr(battery, "planner_neg_sell_prep_soc_percent", 100.0))
if neg_sell_phases_en and prep_pct < 100.0 - 1e-6:
target_wh = prep_pct / 100.0 * float(battery.soc_max_wh)
if neg_sell_phases_en:
_ph, targets, _w, _meta = _neg_sell_day_phases(slots, battery)
tgt = targets[first_neg_sell_idx]
target_wh = float(tgt) if tgt is not None else float(battery.soc_max_wh)
usable_wh = _neg_sell_day_pv_b_usable_wh(slots, first_neg_sell_idx, battery)
else:
target_wh = float(battery.soc_max_wh)
usable_wh = _neg_sell_day_pv_usable_wh(
slots,
first_neg_sell_idx,
max_charge_power_w=float(battery.max_charge_power_w),
charge_efficiency=float(battery.charge_efficiency),
)
needed_wh = max(0.0, target_wh - float(current_soc_wh))
if needed_wh < PRE_NEG_PV_EXPORT_MIN_NEEDED_WH:
return True
usable_wh = _neg_sell_day_pv_usable_wh(
slots,
first_neg_sell_idx,
max_charge_power_w=float(battery.max_charge_power_w),
charge_efficiency=float(battery.charge_efficiency),
)
return usable_wh >= needed_wh * PRE_NEG_PV_EXPORT_FORECAST_MARGIN
@@ -1725,12 +1840,18 @@ def solve_dispatch(
neg_sell_phase_by_t: list[str] = ["none"] * T
neg_sell_soc_target_by_t: list[Optional[float]] = [None] * T
neg_sell_shortfall_weight_by_t: list[float] = [0.0] * T
neg_sell_day_meta: dict[str, Any] = {}
neg_sell_post_detach_prep_ts: set[int] = set()
if neg_sell_phases_en:
(
neg_sell_phase_by_t,
neg_sell_soc_target_by_t,
neg_sell_shortfall_weight_by_t,
neg_sell_day_meta,
) = _neg_sell_day_phases(slots, battery)
neg_sell_post_detach_prep_ts = set(
neg_sell_day_meta.get("post_detach_prep_ts") or []
)
prep_soc_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
prep_hold_bcpv_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
prep_hold_curtail_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
@@ -2265,6 +2386,13 @@ def solve_dispatch(
/ 1000.0
for t in pre_neg_pv_export_ts
)
+ pulp.lpSum(
bc_pv[t]
* NEG_SELL_POST_DETACH_BCPV_DISCOURAGE_CZK_KWH
* INTERVAL_H
/ 1000.0
for t in neg_sell_post_detach_prep_ts
)
+ pulp.lpSum(
sf * NEG_SELL_BAT_DUMP_SHORTFALL_PENALTY_CZK_KWH * INTERVAL_H / 1000.0
for _t, sf, _cap in neg_sell_bat_dump_shortfall
@@ -2329,18 +2457,13 @@ def solve_dispatch(
prob += us >= float(tgt_prep) - soc[t_us]
for t_us, us, tgt_wh in neg_sell_soc_underfill:
prob += us >= float(tgt_wh) - soc[t_us]
prep_wh_phases = (
float(getattr(battery, "planner_neg_sell_prep_soc_percent", 80.0))
/ 100.0
* float(battery.soc_max_wh)
if neg_sell_phases_en
else 0.0
)
m_hold_soc = float(battery.soc_max_wh)
for t_h, sf_h, cap_h in prep_hold_bcpv_shortfall:
w_h = prep_hold_met_binary[t_h]
soc_prev_h = current_soc_wh if t_h == 0 else soc[t_h - 1]
prob += soc_prev_h >= prep_wh_phases - m_hold_soc * (1 - w_h)
tgt_hold = neg_sell_soc_target_by_t[t_h]
hold_thr = float(tgt_hold) if tgt_hold is not None else float(battery.soc_max_wh)
prob += soc_prev_h >= hold_thr - m_hold_soc * (1 - w_h)
prob += sf_h >= bc_pv[t_h] - cap_h * w_h
for t_c, sf_c, cap_c in prep_hold_curtail_shortfall:
w_c = prep_hold_met_binary[t_c]
@@ -3189,6 +3312,9 @@ def solve_dispatch(
if neg_sell_soc_target_by_t[t] is not None
else None
),
"neg_sell_post_detach_prep": (
t in neg_sell_post_detach_prep_ts if neg_sell_phases_en else None
),
}
)
tgt_s = st.safety_soc_target_wh if daytime_en else None
@@ -3285,6 +3411,21 @@ def solve_dispatch(
),
},
"neg_sell_phases_enabled": bool(neg_sell_phases_en),
"neg_sell_b_ramp_v35": bool(neg_sell_phases_en),
"neg_sell_day_meta": neg_sell_day_meta if neg_sell_phases_en else None,
"t_detach_idx": (
neg_sell_day_meta.get("t_detach_idx") if neg_sell_phases_en else None
),
"e_surplus_after_t_wh": (
neg_sell_day_meta.get("e_surplus_after_t_wh")
if neg_sell_phases_en
else None
),
"neg_sell_day_pv_b_usable_wh": (
_neg_sell_day_pv_b_usable_wh(slots, first_neg_sell_idx, battery)
if first_neg_sell_idx is not None and neg_sell_phases_en
else None
),
"pre_neg_pv_export_forecast_ok": bool(pre_neg_pv_export_forecast_ok),
"pre_neg_pv_export_slots": [
slots[i].interval_start.isoformat() for i in sorted(pre_neg_pv_export_ts)