oprava ranniho nabijeni a oprava bodu T
Some checks failed
CI and deploy / migration-check (push) Failing after 13s
CI and deploy / deploy (push) Has been skipped

This commit is contained in:
Dusan Vojacek
2026-05-26 14:09:19 +02:00
parent 58b0a2f882
commit d1ba864fc0
6 changed files with 415 additions and 76 deletions

View File

@@ -71,9 +71,11 @@ NEG_BUY_CHARGE_SHORTFALL_PENALTY_CZK_KWH = 100.0
PRE_NEG_CHARGE_PENALTY_CZK_KWH = 400.0
PRE_NEG_BATT_EXPORT_SHORTFALL_PENALTY_CZK_KWH = 80.0
PRE_NEG_BATT_EXPORT_MIN_SELL_CZK_KWH = 1.0
PLANNER_BUILD_TAG = "2026-05-28-neg-sell-b-ramp-v35"
PLANNER_BUILD_TAG = "2026-05-28-neg-prep-window-v36"
# Po t_detach v prep: necpát PV do bat (měkké; tvrdý hold přes soc_target z rampy).
NEG_SELL_POST_DETACH_BCPV_DISCOURAGE_CZK_KWH = 250.0
# Večer před neg dnem: výboj směrem k soc_need na začátku zítřejšího sell<0 okna.
NEG_EVENING_PREP_DISCHARGE_SHORTFALL_PENALTY_CZK_KWH = 70.0
# Před prvním sell<0: export FVE jen pokud predikce v sell<0 okně pokryje dobítí na prep cíl.
PRE_NEG_PV_EXPORT_FORECAST_MARGIN = 1.15
PRE_NEG_PV_EXPORT_MIN_NEEDED_WH = 2500.0
@@ -823,6 +825,59 @@ def _neg_sell_phases_enabled(battery: Any) -> bool:
return prep_pct < 100.0 - 1e-6 and tail_slots > 0
def _neg_sell_indices_by_prague_day(
slots: list[PlanningSlot],
) -> dict[object, list[int]]:
by_day: dict[object, list[int]] = {}
for t, st in enumerate(slots):
if float(st.sell_price) < 0.0:
by_day.setdefault(_prague_calendar_date(st), []).append(t)
for day in by_day:
by_day[day].sort()
return by_day
def _neg_sell_t_detach_index(
indices: list[int],
charge_b: dict[int, float],
soc_need: dict[int, float],
tail_start: int,
soc_max: float,
*,
margin: float = 1.05,
min_gap_wh: float = 500.0,
detach_soc_frac: float = 0.85,
) -> int:
"""
Bod T: první prep slot, kde (1) soc_need[t] ≥ detach_soc_frac × soc_max a
(2) zbývající B-nabití od t do konce pokryje mezeru do 100 %.
Dřívější chyba: soc_need[t] ≤ soc_need[tail_start] platilo hned na začátku okna.
"""
if not indices:
return 0
suffix_from: dict[int, float] = {}
run = 0.0
for t in reversed(indices):
run += float(charge_b.get(t, 0.0))
suffix_from[t] = run
thresh_wh = max(
soc_max * detach_soc_frac,
float(soc_need.get(tail_start, soc_max)) * 0.92,
)
for t in indices:
if t >= tail_start:
continue
need_t = float(soc_need.get(t, soc_max))
if need_t < thresh_wh:
continue
gap_rem = soc_max - need_t
if gap_rem <= min_gap_wh:
return t
if suffix_from.get(t, 0.0) >= gap_rem * margin:
return t
return tail_start
def _neg_sell_pv_b_charge_wh(slot: PlanningSlot, battery: Any) -> float:
"""Odhad Wh nabitelné jen z PV B v jednom sell<0 slotu (surplus nad load, cap výkonu)."""
pv_surplus_b = max(0.0, float(slot.pv_b_forecast_w) - float(slot.load_baseline_w))
@@ -888,7 +943,8 @@ def _neg_sell_day_phases(
) -> tuple[list[str], list[Optional[float]], list[float], dict[str, Any]]:
"""
Per slot: phase (none|prep|tail), soc_target_wh (rampa z PV B, ne fixní %), shortfall váha.
V35: zpětná projekce soc_need z B od tail; t_detach = první prep kde soc_need ≤ soc_need[tail_start].
V35: zpětná projekce soc_need z B od tail.
V36: t_detach = první prep slot kde suffix B-nabití pokryje (soc_max soc_need[t]).
"""
t_len = len(slots)
phases: list[str] = ["none"] * t_len
@@ -918,14 +974,14 @@ def _neg_sell_day_phases(
t_prev = indices[i - 1]
soc_need[t_prev] = max(min_soc, soc_need[t_cur] - charge_b[t_cur])
soc_detach_wh = float(soc_need.get(tail_start, soc_max))
t_detach = tail_start
for t in indices:
if t >= tail_start:
continue
if soc_need[t] <= soc_detach_wh + 1e-3:
t_detach = t
break
t_detach = _neg_sell_t_detach_index(
indices,
charge_b,
soc_need,
tail_start,
soc_max,
)
soc_detach_wh = float(soc_need.get(t_detach, soc_max))
e_surplus = _neg_sell_e_surplus_after_t_wh(slots, t_detach, last_t, battery)
@@ -969,6 +1025,7 @@ def _neg_sell_day_phases(
meta: dict[str, Any] = {
"neg_sell_b_ramp_v35": True,
"neg_sell_prep_window_v36": True,
"days": day_meta,
"post_detach_prep_ts": sorted(post_detach_prep_ts),
}
@@ -1010,6 +1067,43 @@ def _neg_sell_day_pv_usable_wh(
return total_wh
def _pre_neg_pv_export_forecast_cushion_ok_for_day(
slots: list[PlanningSlot],
battery: Any,
first_neg_t: int,
soc_at_day_start_wh: float,
*,
neg_sell_phases_en: bool,
soc_target_by_t: list[Optional[float]] | None = None,
) -> bool:
"""
Cushion pro jeden pražský den: usable A+B v sell<0 okně pokryje dobítí na soc_need[first_neg].
"""
if first_neg_t < 0 or first_neg_t >= len(slots):
return False
if neg_sell_phases_en and soc_target_by_t is not None:
tgt = soc_target_by_t[first_neg_t]
target_wh = float(tgt) if tgt is not None else float(battery.soc_max_wh)
usable_wh = _neg_sell_day_pv_usable_wh(
slots,
first_neg_t,
max_charge_power_w=float(battery.max_charge_power_w),
charge_efficiency=float(battery.charge_efficiency),
)
else:
target_wh = float(battery.soc_max_wh)
usable_wh = _neg_sell_day_pv_usable_wh(
slots,
first_neg_t,
max_charge_power_w=float(battery.max_charge_power_w),
charge_efficiency=float(battery.charge_efficiency),
)
needed_wh = max(0.0, target_wh - float(soc_at_day_start_wh))
if needed_wh < PRE_NEG_PV_EXPORT_MIN_NEEDED_WH:
return True
return usable_wh >= needed_wh * PRE_NEG_PV_EXPORT_FORECAST_MARGIN
def _pre_neg_pv_export_forecast_cushion_ok(
slots: list[PlanningSlot],
battery: Any,
@@ -1017,30 +1111,90 @@ def _pre_neg_pv_export_forecast_cushion_ok(
first_neg_sell_idx: int | None,
*,
neg_sell_phases_en: bool,
soc_target_by_t: list[Optional[float]] | None = None,
) -> bool:
"""
Export FVE před sell<0 jen pokud forecast B v sell<0 okně pokryje dobítí na soc_need z rampy.
Jinak raději nabíjet teď — riziko deště / podhodnocené FVE v sell<0.
"""
"""Zpětná kompatibilita: cushion pro první sell<0 v horizontu."""
if first_neg_sell_idx is None or first_neg_sell_idx <= 0:
return False
if neg_sell_phases_en:
targets = soc_target_by_t
if neg_sell_phases_en and targets is None:
_ph, targets, _w, _meta = _neg_sell_day_phases(slots, battery)
tgt = targets[first_neg_sell_idx]
target_wh = float(tgt) if tgt is not None else float(battery.soc_max_wh)
usable_wh = _neg_sell_day_pv_b_usable_wh(slots, first_neg_sell_idx, battery)
else:
target_wh = float(battery.soc_max_wh)
usable_wh = _neg_sell_day_pv_usable_wh(
return _pre_neg_pv_export_forecast_cushion_ok_for_day(
slots,
battery,
first_neg_sell_idx,
current_soc_wh,
neg_sell_phases_en=neg_sell_phases_en,
soc_target_by_t=targets,
)
def _pre_neg_pv_export_slot_indices_for_day(
slots: list[PlanningSlot],
first_neg_t: int,
first_neg_buy_idx: int | None,
) -> set[int]:
"""Kladný sell téhož dne před prvním sell<0, PV přebytek."""
if first_neg_t <= 0:
return set()
neg_day = _prague_calendar_date(slots[first_neg_t])
out: set[int] = set()
for t in range(first_neg_t):
if _prague_calendar_date(slots[t]) != neg_day:
continue
if float(slots[t].sell_price) < 0.0:
continue
if first_neg_buy_idx is not None and t >= first_neg_buy_idx:
continue
if _slot_pv_surplus_w(slots[t]) <= NIGHT_EXPORT_PV_SUNRISE_SURPLUS_W:
continue
out.add(t)
return out
def _pre_neg_pv_export_bundle(
slots: list[PlanningSlot],
battery: Any,
current_soc_wh: float,
first_neg_buy_idx: int | None,
*,
neg_sell_phases_en: bool,
soc_target_by_t: list[Optional[float]] | None = None,
) -> tuple[set[int], dict[str, bool]]:
"""
v36: pre-neg export per pražský den s vlastním cushion (A+B v neg okně dne).
"""
by_day = _neg_sell_indices_by_prague_day(slots)
export_ts: set[int] = set()
cushion_by_day: dict[str, bool] = {}
soc_est = float(current_soc_wh)
for day in sorted(by_day.keys()):
indices = by_day[day]
if not indices:
continue
first_t = indices[0]
ok = _pre_neg_pv_export_forecast_cushion_ok_for_day(
slots,
first_neg_sell_idx,
max_charge_power_w=float(battery.max_charge_power_w),
charge_efficiency=float(battery.charge_efficiency),
battery,
first_t,
soc_est,
neg_sell_phases_en=neg_sell_phases_en,
soc_target_by_t=soc_target_by_t,
)
needed_wh = max(0.0, target_wh - float(current_soc_wh))
if needed_wh < PRE_NEG_PV_EXPORT_MIN_NEEDED_WH:
return True
return usable_wh >= needed_wh * PRE_NEG_PV_EXPORT_FORECAST_MARGIN
cushion_by_day[str(day)] = ok
if ok:
export_ts |= _pre_neg_pv_export_slot_indices_for_day(
slots,
first_t,
first_neg_buy_idx,
)
tgt0 = (
float(soc_target_by_t[first_t])
if soc_target_by_t and soc_target_by_t[first_t] is not None
else float(battery.soc_max_wh)
)
soc_est = max(float(battery.min_soc_wh), min(float(battery.soc_max_wh), tgt0))
return export_ts, cushion_by_day
def _pre_neg_pv_export_slot_indices(
@@ -1049,7 +1203,7 @@ def _pre_neg_pv_export_slot_indices(
pre_neg_export_last_t: int | None,
first_neg_buy_idx: int | None,
) -> set[int]:
"""Sloty s kladným sell před prvním sell<0 (a před buy<0), PV přebytek)."""
"""Legacy: jen před globálním prvním sell<0 (v36 preferuj _pre_neg_pv_export_bundle)."""
if first_neg_sell_idx is None or pre_neg_export_last_t is None:
return set()
out: set[int] = set()
@@ -1064,6 +1218,36 @@ def _pre_neg_pv_export_slot_indices(
return out
def _evening_discharge_before_neg_day_ts(
slots: list[PlanningSlot],
neg_sell_day_meta: dict[str, Any],
) -> set[int]:
"""
Večer/noc kalendářního dne D1 před pražským dnem D s sell<0: příprava headroomu.
"""
from datetime import timedelta
out: set[int] = set()
for day_info in neg_sell_day_meta.get("days") or []:
first_neg = int(day_info.get("first_neg_idx", -1))
if first_neg < 0 or first_neg >= len(slots):
continue
neg_date = _prague_calendar_date(slots[first_neg])
prev_date = neg_date - timedelta(days=1)
for t, st in enumerate(slots):
if _prague_calendar_date(st) != prev_date:
continue
if float(st.sell_price) < 0.0:
continue
h = _prague_hour(st)
if not (17 <= h <= 23 or _in_night_battery_export_window(st)):
continue
if float(st.sell_price) < PRE_NEG_BATT_EXPORT_MIN_SELL_CZK_KWH:
continue
out.add(t)
return out
MORNING_PRENEG_START_HOUR = 5
MORNING_PRENEG_END_HOUR = 11
@@ -1856,29 +2040,42 @@ def solve_dispatch(
prep_hold_bcpv_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
prep_hold_curtail_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
prep_hold_met_binary: dict[int, pulp.LpVariable] = {}
pre_neg_pv_export_forecast_ok = bool(
om == "AUTO"
and not purchase_fixed_pre
and first_neg_sell_idx is not None
and pre_neg_export_last_t is not None
and _pre_neg_pv_export_forecast_cushion_ok(
pre_neg_cushion_by_day: dict[str, bool] = {}
pre_neg_pv_export_ts: set[int] = set()
neg_evening_before_neg_ts: set[int] = set()
if om == "AUTO" and not purchase_fixed_pre and neg_sell_phases_en:
pre_neg_pv_export_ts, pre_neg_cushion_by_day = _pre_neg_pv_export_bundle(
slots,
battery,
current_soc_wh,
first_neg_sell_idx,
neg_sell_phases_en=neg_sell_phases_en,
)
)
pre_neg_pv_export_ts = (
_pre_neg_pv_export_slot_indices(
slots,
first_neg_sell_idx,
pre_neg_export_last_t,
first_neg_buy_idx,
neg_sell_phases_en=True,
soc_target_by_t=neg_sell_soc_target_by_t,
)
if pre_neg_pv_export_forecast_ok
else set()
)
neg_evening_before_neg_ts = _evening_discharge_before_neg_day_ts(
slots,
neg_sell_day_meta,
)
elif om == "AUTO" and not purchase_fixed_pre:
legacy_ok = bool(
first_neg_sell_idx is not None
and pre_neg_export_last_t is not None
and _pre_neg_pv_export_forecast_cushion_ok(
slots,
battery,
current_soc_wh,
first_neg_sell_idx,
neg_sell_phases_en=False,
)
)
if legacy_ok:
pre_neg_pv_export_ts = _pre_neg_pv_export_slot_indices(
slots,
first_neg_sell_idx,
pre_neg_export_last_t,
first_neg_buy_idx,
)
pre_neg_pv_export_forecast_ok = bool(pre_neg_pv_export_ts)
pre_neg_buy_discharge_ts: set[int] = set()
if om == "AUTO" and first_neg_buy_idx is not None and first_neg_buy_idx > 0:
pre_neg_buy_discharge_ts = _pre_neg_buy_discharge_indices(
@@ -2049,6 +2246,7 @@ def solve_dispatch(
pv_charge_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
pre_neg_pv_charge_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
pre_neg_pv_export_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
neg_evening_before_neg_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
neg_sell_bat_dump_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
neg_sell_soc_underfill: list[tuple[int, pulp.LpVariable, float]] = []
neg_buy_charge_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
@@ -2138,25 +2336,41 @@ def solve_dispatch(
cap_ns = float(min(pv_surplus_ns, battery.max_charge_power_w))
sf_ns = pulp.LpVariable(f"neg_phase_pv_charge_{t_ns}", 0, cap_ns)
pv_charge_shortfall.append((t_ns, sf_ns, cap_ns))
if pre_neg_pv_export_forecast_ok:
for t_pe in sorted(pre_neg_pv_export_ts):
s_pe = slots[t_pe]
pv_surplus_pe = max(
0.0,
float(s_pe.pv_a_forecast_w)
+ float(s_pe.pv_b_forecast_w)
- float(s_pe.load_baseline_w),
for t_pe in sorted(pre_neg_pv_export_ts):
s_pe = slots[t_pe]
pv_surplus_pe = max(
0.0,
float(s_pe.pv_a_forecast_w)
+ float(s_pe.pv_b_forecast_w)
- float(s_pe.load_baseline_w),
)
cap_pe = float(
min(
pv_surplus_pe,
float(grid.max_export_power_w),
)
cap_pe = float(
min(
pv_surplus_pe,
float(grid.max_export_power_w),
)
)
if cap_pe <= 500.0:
continue
sf_pe = pulp.LpVariable(f"pre_neg_pv_export_sf_{t_pe}", 0, cap_pe)
pre_neg_pv_export_shortfall.append((t_pe, sf_pe, cap_pe))
)
if cap_pe <= 500.0:
continue
sf_pe = pulp.LpVariable(f"pre_neg_pv_export_sf_{t_pe}", 0, cap_pe)
pre_neg_pv_export_shortfall.append((t_pe, sf_pe, cap_pe))
export_cap_evening = _battery_export_cap_w(battery, grid)
for t_ev in sorted(neg_evening_before_neg_ts):
if t_ev not in discharge_export_slots:
continue
if not _slot_profitable_battery_export(
slots[t_ev],
charge_acquisition_czk_kwh=charge_acquisition_czk_kwh,
min_spread=float(degradation_cost_effective),
fixed_tariff=fixed_tariff_like,
):
continue
sf_ev = pulp.LpVariable(
f"neg_eve_prep_discharge_{t_ev}",
0,
export_cap_evening,
)
neg_evening_before_neg_shortfall.append((t_ev, sf_ev, export_cap_evening))
for t in range(T):
if not post_neg_pv_topup[t]:
continue
@@ -2379,6 +2593,12 @@ def solve_dispatch(
sf * PRE_NEG_PV_EXPORT_SHORTFALL_PENALTY_CZK_KWH * INTERVAL_H / 1000.0
for _t, sf, _cap in pre_neg_pv_export_shortfall
)
+ pulp.lpSum(
sf * NEG_EVENING_PREP_DISCHARGE_SHORTFALL_PENALTY_CZK_KWH
* INTERVAL_H
/ 1000.0
for _t, sf, _cap in neg_evening_before_neg_shortfall
)
+ pulp.lpSum(
bc_pv[t]
* PRE_NEG_PV_BCPV_DISCOURAGE_CZK_KWH
@@ -2479,6 +2699,8 @@ def solve_dispatch(
prob += sf >= cap_w - bc_pv[t_sf]
for t_sf, sf, cap_w in pre_neg_pv_export_shortfall:
prob += sf >= cap_w - ge_pv[t_sf]
for t_sf, sf, cap_w in neg_evening_before_neg_shortfall:
prob += sf >= cap_w - ge_bat[t_sf]
preneg_export_min_soc_wh = float(min_soc_wh) + max(
float(battery.max_discharge_power_w)
* float(battery.discharge_efficiency)
@@ -3315,6 +3537,12 @@ def solve_dispatch(
"neg_sell_post_detach_prep": (
t in neg_sell_post_detach_prep_ts if neg_sell_phases_en else None
),
"pre_neg_pv_export": (
t in pre_neg_pv_export_ts if neg_sell_phases_en else None
),
"neg_evening_before_neg": (
t in neg_evening_before_neg_ts if neg_sell_phases_en else None
),
}
)
tgt_s = st.safety_soc_target_wh if daytime_en else None
@@ -3427,9 +3655,15 @@ def solve_dispatch(
else None
),
"pre_neg_pv_export_forecast_ok": bool(pre_neg_pv_export_forecast_ok),
"pre_neg_cushion_by_day": pre_neg_cushion_by_day or None,
"pre_neg_pv_export_slots": [
slots[i].interval_start.isoformat() for i in sorted(pre_neg_pv_export_ts)
],
"neg_evening_before_neg_slots": [
slots[i].interval_start.isoformat()
for i in sorted(neg_evening_before_neg_ts)
],
"neg_sell_prep_window_v36": bool(neg_sell_phases_en),
"neg_sell_day_pv_usable_wh": (
_neg_sell_day_pv_usable_wh(
slots,

View File

@@ -19,6 +19,7 @@ from services.planning_engine import (
_in_night_battery_export_window,
_neg_sell_day_phases,
_neg_sell_phases_enabled,
_pre_neg_pv_export_bundle,
_pre_neg_buy_soc_ceiling_wh,
_pre_neg_peak_sell_idx,
_pre_neg_pv_export_forecast_cushion_ok,
@@ -3771,6 +3772,18 @@ class NegSellSocPhaseTests(unittest.TestCase):
self.assertGreater(float(meta.get("e_surplus_after_t_wh") or 0), 0.0)
self.assertIn("post_detach_prep_ts", meta)
def test_t_detach_not_first_neg_on_long_sunny_day(self) -> None:
"""Bod T až po nabití rampy (~85 % soc_max), ne na prvním sell<0 slotu."""
slots = self._neg_sell_slots(24, pv_b=7000, pv_a=5000)
bat = self._phase_battery(tail_slots=4)
_ph, _tg, _w, meta = _neg_sell_day_phases(slots, bat)
day = meta["days"][0]
self.assertGreater(
int(day["t_detach_idx"]),
int(day["first_neg_idx"]),
"t_detach must be after first neg slot on long window",
)
def test_prep_reaches_soc_by_mid_window(self) -> None:
slots = self._neg_sell_slots(12)
bat = self._phase_battery()
@@ -4019,5 +4032,67 @@ class PreNegPvExportForecastTests(unittest.TestCase):
self.assertGreater(results[2].battery_setpoint_w, 2000)
class NegSellPrepWindowV36Tests(unittest.TestCase):
"""v36: pre-neg per den, opravený bod T, večerní výboj před neg dnem."""
def test_pre_neg_bundle_second_calendar_day(self) -> None:
# Dva pražské dny: den 1 odpoledne neg, den 2 ráno před neg.
base = datetime(2026, 6, 10, 10, 0, tzinfo=ZoneInfo("Europe/Prague")).astimezone(
timezone.utc
)
slots: list[PlanningSlot] = []
for i in range(120):
local = (base + timedelta(minutes=15 * i)).astimezone(
ZoneInfo("Europe/Prague")
)
h = local.hour + local.minute / 60.0
if local.date().day == 10:
sell = -0.2 if h >= 14 else 2.5
elif local.date().day == 11:
sell = -0.2 if 9 <= h < 15 else 2.8
else:
sell = 2.5
slots.append(
PlanningSlot(
interval_start=base + timedelta(minutes=15 * i),
buy_price=2.0,
sell_price=sell,
pv_a_forecast_w=7000,
pv_b_forecast_w=9000,
load_baseline_w=500,
ev1_connected=False,
ev2_connected=False,
allow_charge=True,
allow_discharge_export=True,
)
)
bat = _battery(uc_wh=64_000.0, max_pct=95.0)
bat.planner_neg_sell_prep_soc_percent = 80.0
bat.planner_neg_sell_full_soc_tail_slots = 4
_ph, tg, _w, meta = _neg_sell_day_phases(slots, bat)
export_ts, cushion = _pre_neg_pv_export_bundle(
slots,
bat,
0.35 * bat.soc_max_wh,
None,
neg_sell_phases_en=True,
soc_target_by_t=tg,
)
self.assertGreaterEqual(len(cushion), 2)
self.assertGreater(len(export_ts), 0)
if len(meta.get("days", [])) >= 2:
second_first = int(meta["days"][1]["first_neg_idx"])
second_morning = [
t
for t in export_ts
if t < second_first and float(slots[t].sell_price) >= 0.0
]
self.assertGreater(
len(second_morning),
0,
"morning before 2nd neg day should allow pre-neg export",
)
if __name__ == "__main__":
unittest.main()