Files
ems/backend/services/planning/heuristics.py
Dusan Vojacek cb6afbb3fd Fáze 1.5: extrakce 88 pre-solver heuristik do services/planning/heuristics.py
SoC série, neg-sell fáze/okna, evening push, pre-neg logika — čistý přesun,
fasáda v planning_engine.py beze změny chování (golden 5/5, baseline faily
beze změny). Roztroušené konstanty MORNING_PRENEG_* doplněny do constants.py.

planning_engine.py: 6345 → 3925 řádků (zbývá: solver, orchestrace, compare).
heuristics.py nese warning: hlavní kandidáti na prune ve Fázi 2/3.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-11 13:32:47 +02:00

1982 lines
68 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# backend/services/planning/heuristics.py
#
# EMS plánovač pre-solver heuristiky (SoC série, neg-sell fáze, evening push,
# pre-neg okna). Fáze 1 dekompozice: čistý přesun z planning_engine.py.
#
# POZOR (Fáze 2/3): tyto funkce jsou hlavní kandidáti na zjednodušení/odstranění —
# ~35 % ekonomické logiky obchází solver (viz scripts/harness/README.md, baseline
# GAP 27 %). Neměnit chování bez golden gate + economics_report.
import logging
from dataclasses import replace
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from typing import Any, Optional
from services.planning.constants import (
MORNING_PRENEG_START_HOUR,
MORNING_PRENEG_END_HOUR,
ARB_FLOOR_E_REF_FRAC,
ARB_LOOKAHEAD_SLOTS,
EVENING_PUSH_HYSTERESIS_SELL_PEAK_DELTA_CZK_KWH,
EVENING_PUSH_HYSTERESIS_SOC_PCT,
EXTREME_BUY_DUMP_PREWINDOW_SLOTS,
GE_MIN_EXPORT_W,
INTERVAL_H,
NIGHT_EXPORT_EVENING_START_HOUR,
NIGHT_EXPORT_MORNING_END_HOUR,
NIGHT_EXPORT_PV_SUNRISE_SURPLUS_W,
PRE_NEG_BATT_EXPORT_MIN_SELL_CZK_KWH,
PRE_NEG_PV_EXPORT_FORECAST_MARGIN,
PRE_NEG_PV_EXPORT_MIN_NEEDED_WH,
TERMINAL_NEG_BUY_MAGNITUDE_FLOOR,
TERMINAL_NEG_BUY_MAGNITUDE_REF_CZK,
TERMINAL_NEG_BUY_WEIGHT_CAP,
TERMINAL_NEG_BUY_WEIGHT_HORIZON_SLOTS,
)
from services.planning.types import (
PlanningSlot,
DispatchResult,
SOC_MIN_RELAX_LOOKAHEAD_SLOTS,
_prague_calendar_date,
_prague_hour,
)
logger = logging.getLogger(__name__)
def _pv_scarcity_penalty_multiplier(slots: list["PlanningSlot"], battery) -> float:
"""
Měkká úprava ekonomiky cyklu podle očekávaného slunečního zisku.
- málo očekávané FVE energie -> nižší penalizace cyklu (podpora precharge ze sítě),
- hodně očekávané FVE energie -> standardní penalizace.
"""
horizon_slots = min(len(slots), int(24 / INTERVAL_H)) # konzervativní 1 den dopředu
if horizon_slots <= 0:
return 1.0
pv_kwh = 0.0
for s in slots[:horizon_slots]:
pv_kwh += max(0.0, float(s.pv_a_forecast_w + s.pv_b_forecast_w)) * INTERVAL_H / 1000.0
batt_kwh = max(1.0, float(getattr(battery, "usable_capacity_wh", 0.0)) / 1000.0)
# coverage = kolikanásobek baterie očekáváme ze slunce v horizontu.
coverage = pv_kwh / batt_kwh
coverage_clamped = max(0.0, min(1.0, coverage))
# 0.65 při nízkém slunci, 1.0 při vysokém slunci.
return 0.65 + 0.35 * coverage_clamped
def _pv_coverage_ratio(slots: list["PlanningSlot"], battery, hours: int = 24) -> float:
horizon_slots = min(len(slots), int(hours / INTERVAL_H))
if horizon_slots <= 0:
return 1.0
pv_kwh = 0.0
for s in slots[:horizon_slots]:
pv_kwh += max(0.0, float(s.pv_a_forecast_w + s.pv_b_forecast_w)) * INTERVAL_H / 1000.0
batt_kwh = max(1.0, float(getattr(battery, "usable_capacity_wh", 0.0)) / 1000.0)
return max(0.0, min(1.0, pv_kwh / batt_kwh))
def _dynamic_arb_floor_wh_series(
slots: list["PlanningSlot"],
min_soc_wh: float,
arb_base_wh: float,
usable_wh: float,
) -> list[float]:
"""
Časově proměnná ekonomická podlaha Wh pro MILP (nad min_soc_wh).
Hodně očekávané FVE energie v dalších ARB_LOOKAHEAD_SLOTS → podlaha klesá k min_soc_wh;
málo slunce → zůstává u arb_base_wh (typicky reserve z DB).
"""
T = len(slots)
if T == 0:
return []
e_ref = max(1.0, ARB_FLOOR_E_REF_FRAC * float(usable_wh))
spread = max(0.0, float(arb_base_wh) - float(min_soc_wh))
out: list[float] = []
for t in range(T):
e_pv_wh = 0.0
for k in range(t, min(T, t + ARB_LOOKAHEAD_SLOTS)):
s = slots[k]
e_pv_wh += max(0, s.pv_a_forecast_w + s.pv_b_forecast_w) * INTERVAL_H
f = min(1.0, e_pv_wh / e_ref) if e_ref > 1e-9 else 1.0
arb_t = float(min_soc_wh) + (1.0 - f) * spread
out.append(arb_t)
return out
def _soc_security_profile(slots: list["PlanningSlot"], battery) -> tuple[float, float]:
"""
Při nízkém očekávaném slunci drží solver vyšší SoC buffer:
- cílový buffer: reserve + až 20 % usable capacity,
- ekonomická penalizace deficitu vůči bufferu z průměrné ceny.
"""
coverage = _pv_coverage_ratio(slots, battery, hours=24)
scarcity = 1.0 - coverage
usable_wh = float(getattr(battery, "usable_capacity_wh", 0.0))
reserve_wh = float(getattr(battery, "reserve_soc_wh", 0.0))
soc_max_wh = float(getattr(battery, "soc_max_wh", usable_wh))
extra_buffer_wh = 0.35 * usable_wh * scarcity
target_wh = min(soc_max_wh, reserve_wh + extra_buffer_wh)
h24 = min(len(slots), int(24 / INTERVAL_H))
avg_buy = (
sum(float(s.buy_price) for s in slots[:h24]) / h24
if h24 > 0
else 4.0
)
penalty_czk_kwh = max(0.1, avg_buy * 1.00 * scarcity)
return target_wh, penalty_czk_kwh
def _soc_min_wh_series(
slots: list[PlanningSlot],
usable_wh: float,
base_min_wh: float,
buy_extreme_threshold: float,
planner_discharge_floor_pct: float | None,
) -> list[float]:
"""
Spodní mez SoC (Wh) pro každý slot: při extrémně záporném buy v lookahead povolit hlubší vybíjení
až na planner_discharge_floor_percent (jinak min_soc z DB). Absolutní minimum 5 % usable.
"""
t_len = len(slots)
abs_min_wh = max(usable_wh * 0.05, 1.0)
if planner_discharge_floor_pct is None:
relaxed_wh = base_min_wh
else:
relaxed_wh = max(abs_min_wh, float(planner_discharge_floor_pct) / 100.0 * usable_wh)
effective_relaxed = min(base_min_wh, relaxed_wh)
out: list[float] = []
for t in range(t_len):
j_end = min(t_len, t + SOC_MIN_RELAX_LOOKAHEAD_SLOTS)
min_buy_fwd = min(float(slots[k].buy_price) for k in range(t, j_end))
if min_buy_fwd <= buy_extreme_threshold:
out.append(float(effective_relaxed))
else:
out.append(float(base_min_wh))
return out
def _slots_until_buy_le_threshold(
slots: list[PlanningSlot], buy_threshold: float
) -> list[int]:
"""
Pro slot t: kolik slotů (0 = tento slot) do nejbližšího k>=t s buy_price <= buy_threshold.
Pokud v [t, T) žádný takový není, vrátí T + 1 (větší než jakýkoli rozumný prewindow).
"""
t_len = len(slots)
sentinel = t_len + 1
next_le = sentinel
next_at_or_after: list[int] = [sentinel] * t_len
for t in range(t_len - 1, -1, -1):
if float(slots[t].buy_price) <= buy_threshold:
next_le = t
next_at_or_after[t] = next_le
out: list[int] = []
for t in range(t_len):
nxt = next_at_or_after[t]
if nxt >= t_len:
out.append(sentinel)
else:
out.append(nxt - t)
return out
def _slots_until_sell_lt(slots: list[PlanningSlot], sell_upper: float) -> list[int]:
"""
Pro slot t: kolik slotů (0 = tento slot) do nejbližšího k>=t s sell_price < sell_upper.
Typicky sell_upper=0 (první záporný / „ztrátový“ prodej z pohledu OTE).
Pokud v [t, T) žádný takový není, vrátí T + 1.
"""
t_len = len(slots)
sentinel = t_len + 1
next_lt = sentinel
next_at_or_after: list[int] = [sentinel] * t_len
for t in range(t_len - 1, -1, -1):
if float(slots[t].sell_price) < sell_upper:
next_lt = t
next_at_or_after[t] = next_lt
out: list[int] = []
for t in range(t_len):
nxt = next_at_or_after[t]
if nxt >= t_len:
out.append(sentinel)
else:
out.append(nxt - t)
return out
def _prewindow_deferral_slots(
slots: list[PlanningSlot], buy_extreme_threshold: float, sell_upper: float = 0.0
) -> list[int]:
"""
Vzdálenost (v 15min slotech) pro zpoždění hlubokého planner flooru:
primárně do prvního sell < sell_upper (poslední „bez ztráty na prodeji“ je k-1),
pokud v horizontu není záporný prodej, fallback na první buy <= buy_extreme_threshold.
"""
t_len = len(slots)
sell_d = _slots_until_sell_lt(slots, sell_upper)
buy_d = _slots_until_buy_le_threshold(slots, buy_extreme_threshold)
sentinel = t_len + 1
out: list[int] = []
for t in range(t_len):
if sell_d[t] < sentinel:
out.append(sell_d[t])
else:
out.append(buy_d[t])
return out
def _soc_panel_min_wh_series(
soc_min_series: list[float],
slots_until_relax_anchor: list[int],
min_soc_wh: float,
arb_base_wh: float,
prewindow_slots: int,
) -> list[float]:
"""
Zpoždění hluboké relaxace: pokud je lookahead extrémní (soc_min pod min_soc), ale kotva
(záporný prodej / fallback extrémní buy) je dál než prewindow_slots, drž spodek na
max(relax_wh, arb_base_wh) — prakticky na rezervě.
"""
t_len = len(soc_min_series)
out: list[float] = []
for t in range(t_len):
sm = float(soc_min_series[t])
if sm < min_soc_wh - 1e-3 and slots_until_relax_anchor[t] > prewindow_slots:
out.append(max(sm, float(arb_base_wh)))
else:
out.append(sm)
return out
def _recompute_charge_acquisition_from_results(
slots: list[PlanningSlot],
results: list["DispatchResult"],
battery,
) -> float:
"""Vážený buy z nabíjecích slotů (grid import + bat charge) z prvního solve."""
wh_total = 0.0
cost = 0.0
for s, r in zip(slots, results):
if not s.allow_charge:
continue
# Zaporne buy sloty (OTE) nejsou grid acquisition pro arbitraz exportu baterie.
if float(s.buy_price) < 0:
continue
gi_w = max(0, int(r.grid_setpoint_w or 0))
bc_w = max(0, int(r.battery_setpoint_w or 0))
wh = (gi_w + bc_w) * INTERVAL_H
if wh <= 0:
continue
wh_total += wh
cost += float(s.buy_price) * wh
if wh_total <= 0:
raw = getattr(slots[0], "charge_acquisition_buy_czk_kwh", None)
if raw is not None:
return float(raw)
return min(float(s.buy_price) for s in slots)
return cost / wh_total
def _slots_with_charge_acquisition(
slots: list[PlanningSlot],
acquisition_czk_kwh: float,
) -> list[PlanningSlot]:
return [
replace(s, charge_acquisition_buy_czk_kwh=acquisition_czk_kwh)
for s in slots
]
def _pv_store_value_czk_kwh(slot: PlanningSlot, min_spread: float) -> float:
"""
Práh pro tvrdý zákaz ge_pv (sell pod budoucím max sell v horizontu).
U spotu při sell >= 0 se neaplikuje — export vs. nabíjení řeší LP; baterii
na večerní peak drží ge_bat (evening_early / push), ne ge_pv == 0.
"""
future = float(
slot.future_sell_opportunity_czk_kwh
if slot.future_sell_opportunity_czk_kwh is not None
else slot.sell_price
)
return future - min_spread
def _slot_profitable_battery_export(
slot: PlanningSlot,
*,
charge_acquisition_czk_kwh: float,
min_spread: float,
fixed_tariff: bool,
) -> bool:
"""
Export z baterie do sítě má kladnou marži.
Spot: sell > charge_acquisition + spread (energie ze sítě / vážený nákup).
Fixní tarif (BA81/KV1): stejně jako R__063 discharge maska — sell > buy + spread;
acquisition může být nafouknutá grid nabíjením a blokovat večerní špičku (3,7 < 3,9).
"""
sell_t = float(slot.sell_price)
acq = float(charge_acquisition_czk_kwh)
if fixed_tariff:
buy_t = float(slot.buy_price)
if buy_t >= 0.0:
return sell_t > buy_t + min_spread
return sell_t > acq + min_spread
def _purchase_pricing_fixed(grid: Any) -> bool:
"""Režim nákupu z DB (`site_market_config.purchase_pricing_mode`), ne odhad z rozptylu buy."""
return (
str(getattr(grid, "purchase_pricing_mode", "spot") or "spot").strip().lower()
== "fixed"
)
def _horizon_fixed_tariff_like(slots: list[PlanningSlot]) -> bool:
"""
Heuristika pro drahý import / charge_acquisition: buy v horizontu je prakticky konstantní.
U spotu (home-01) nesmí expensive_import používat charge_acquisition — jinak
buy > ~1 Kč označí téměř všechny sloty jako drahé (gi=0 pro dům) → Infeasible.
BA81 má fixní nákup v DB, ale NT/VT → buy skáče; proto neg-sell export řídí _purchase_pricing_fixed.
"""
buys = [float(s.buy_price) for s in slots if float(s.buy_price) >= 0.0]
if not buys:
return False
if len(buys) == 1:
return True
return max(buys) - min(buys) < 0.25
def _future_extreme_buy_from(
slots: list[PlanningSlot],
buy_thr: float,
) -> list[bool]:
"""True v t, pokud v některém budoucím slotu buy <= buy_thr."""
t_len = len(slots)
out = [False] * t_len
seen = False
for i in range(t_len - 1, -1, -1):
if float(slots[i].buy_price) <= buy_thr:
seen = True
out[i] = seen
return out
def _neg_sell_bat_dump_slots(
slots: list[PlanningSlot],
*,
operating_mode: str,
purchase_fixed: bool,
grid: Any,
buy_extreme_thr: float,
degrad_czk_kwh: float,
) -> set[int]:
"""Sloty, kde smí ge_bat>0 při sell<0 (výboj před extrémně záporným buy)."""
if operating_mode != "AUTO" or purchase_fixed:
return set()
if bool(getattr(grid, "block_export_on_negative_sell", False)):
return set()
t_len = len(slots)
future_extreme = _future_extreme_buy_from(slots, buy_extreme_thr)
dist = _slots_until_buy_le(slots, buy_extreme_thr)
out: set[int] = set()
for t, s in enumerate(slots):
if float(s.sell_price) >= 0.0:
continue
future_min = min(
(float(slots[j].buy_price) for j in range(t + 1, t_len)),
default=float(s.buy_price),
)
if (
future_extreme[t]
and 0 < dist[t] <= EXTREME_BUY_DUMP_PREWINDOW_SLOTS
and future_min < float(s.sell_price) - degrad_czk_kwh
):
out.add(t)
return out
def _slots_until_buy_le(
slots: list[PlanningSlot],
buy_thr: float,
) -> list[int]:
"""Počet slotů do nejbližšího buy <= thr (0 = v tomto slotu, T = nikdy)."""
t_len = len(slots)
dist = [t_len] * t_len
next_idx = t_len
for i in range(t_len - 1, -1, -1):
if float(slots[i].buy_price) <= buy_thr:
next_idx = i
dist[i] = (next_idx - i) if next_idx < t_len else t_len
return dist
def _pre_negative_sell_export_window(
slots: list[PlanningSlot],
) -> tuple[int | None, int | None]:
"""Index prvního sell<0 a posledního slotu před ním (pro strategii „vyvézt dřív“)."""
first_neg = next(
(i for i, s in enumerate(slots) if float(s.sell_price) < 0),
None,
)
if first_neg is None or first_neg <= 0:
return first_neg, None
return first_neg, first_neg - 1
def _neg_sell_phases_enabled(battery: Any) -> bool:
# Bez atributů z DB (unit testy) = legacy; z DB default 80 % / 4 sloty (V083).
prep_pct = float(getattr(battery, "planner_neg_sell_prep_soc_percent", 100.0))
tail_slots = int(getattr(battery, "planner_neg_sell_full_soc_tail_slots", 0))
return prep_pct < 100.0 - 1e-6 and tail_slots > 0
def _neg_sell_indices_by_prague_day(
slots: list[PlanningSlot],
) -> dict[object, list[int]]:
by_day: dict[object, list[int]] = {}
for t, st in enumerate(slots):
if float(st.sell_price) < 0.0:
by_day.setdefault(_prague_calendar_date(st), []).append(t)
for day in by_day:
by_day[day].sort()
return by_day
def _neg_sell_t_detach_index(
indices: list[int],
charge_b: dict[int, float],
soc_need: dict[int, float],
tail_start: int,
soc_max: float,
*,
margin: float = 1.05,
min_gap_wh: float = 500.0,
detach_soc_frac: float = 0.85,
) -> int:
"""
Bod T: první prep slot, kde (1) soc_need[t] ≥ detach_soc_frac × soc_max a
(2) zbývající B-nabití od t do konce pokryje mezeru do 100 %.
Dřívější chyba: soc_need[t] ≤ soc_need[tail_start] platilo hned na začátku okna.
"""
if not indices:
return 0
suffix_from: dict[int, float] = {}
run = 0.0
for t in reversed(indices):
run += float(charge_b.get(t, 0.0))
suffix_from[t] = run
thresh_wh = max(
soc_max * detach_soc_frac,
float(soc_need.get(tail_start, soc_max)) * 0.92,
)
for t in indices:
if t >= tail_start:
continue
need_t = float(soc_need.get(t, soc_max))
if need_t < thresh_wh:
continue
gap_rem = soc_max - need_t
if gap_rem <= min_gap_wh:
return t
if suffix_from.get(t, 0.0) >= gap_rem * margin:
return t
return tail_start
def _neg_sell_pv_b_charge_wh(slot: PlanningSlot, battery: Any) -> float:
"""Odhad Wh nabitelné jen z PV B v jednom sell<0 slotu (surplus nad load, cap výkonu)."""
pv_surplus_b = max(0.0, float(slot.pv_b_forecast_w) - float(slot.load_baseline_w))
if pv_surplus_b <= 500.0:
return 0.0
cap_w = min(pv_surplus_b, float(battery.max_charge_power_w))
return cap_w * INTERVAL_H * float(battery.charge_efficiency)
def _neg_sell_pv_forecast_charge_wh(slot: PlanningSlot, battery: Any) -> float:
"""Odhad Wh z FVE A+B v sell<0 slotu pro zpětnou projekci soc_need (v44)."""
pv_surplus = max(
0.0,
float(slot.pv_a_forecast_w)
+ float(slot.pv_b_forecast_w)
- float(slot.load_baseline_w),
)
if pv_surplus <= 500.0:
return 0.0
cap_w = min(pv_surplus, float(battery.max_charge_power_w))
return cap_w * INTERVAL_H * float(battery.charge_efficiency)
def _neg_sell_day_pv_b_usable_wh(
slots: list[PlanningSlot],
first_neg_sell_idx: int | None,
battery: Any,
) -> float:
"""Součet B-nabíjení ve všech sell<0 slotech téhož pražského dne."""
if first_neg_sell_idx is None:
return 0.0
neg_day = _prague_calendar_date(slots[first_neg_sell_idx])
total = 0.0
for s in slots:
if _prague_calendar_date(s) != neg_day:
continue
if float(s.sell_price) >= 0.0:
continue
total += _neg_sell_pv_b_charge_wh(s, battery)
return total
def _neg_sell_e_surplus_after_t_wh(
slots: list[PlanningSlot],
t_detach: int,
last_neg: int,
battery: Any,
) -> float:
"""Integrál přebytku FVE nad load+bat cap od t_detach do last_neg (Wh)."""
total = 0.0
for t in range(t_detach, last_neg + 1):
if t < 0 or t >= len(slots):
continue
st = slots[t]
if float(st.sell_price) >= 0.0:
continue
pv_surplus = max(
0.0,
float(st.pv_a_forecast_w)
+ float(st.pv_b_forecast_w)
- float(st.load_baseline_w),
)
if pv_surplus <= 500.0:
continue
cap_charge_wh = (
min(pv_surplus, float(battery.max_charge_power_w))
* INTERVAL_H
* float(battery.charge_efficiency)
)
total += max(0.0, pv_surplus * INTERVAL_H - cap_charge_wh)
return total
def _neg_sell_day_phases(
slots: list[PlanningSlot],
battery: Any,
) -> tuple[list[str], list[Optional[float]], list[float], dict[str, Any]]:
"""
Per slot: phase (none|prep|tail), soc_target_wh (rampa z PV B, ne fixní %), shortfall váha.
V35: zpětná projekce soc_need z B od tail.
V36: t_detach = první prep slot kde suffix B-nabití pokryje (soc_max soc_need[t]).
"""
t_len = len(slots)
phases: list[str] = ["none"] * t_len
soc_targets: list[Optional[float]] = [None] * t_len
shortfall_weights: list[float] = [0.0] * t_len
tail_n = int(getattr(battery, "planner_neg_sell_full_soc_tail_slots", 0))
soc_max = float(battery.soc_max_wh)
min_soc = float(battery.min_soc_wh)
post_detach_prep_ts: set[int] = set()
day_meta: list[dict[str, Any]] = []
by_day: dict[object, list[int]] = {}
for t, st in enumerate(slots):
if float(st.sell_price) < 0.0:
by_day.setdefault(_prague_calendar_date(st), []).append(t)
for day, indices in by_day.items():
if not indices:
continue
indices.sort()
last_t = indices[-1]
tail_start = max(indices[0], last_t - tail_n + 1) if tail_n > 0 else last_t + 1
charge_b = {
t: _neg_sell_pv_forecast_charge_wh(slots[t], battery) for t in indices
}
soc_need: dict[int, float] = {last_t: soc_max}
for i in range(len(indices) - 1, 0, -1):
t_cur = indices[i]
t_prev = indices[i - 1]
soc_need[t_prev] = max(min_soc, soc_need[t_cur] - charge_b[t_cur])
t_detach = _neg_sell_t_detach_index(
indices,
charge_b,
soc_need,
tail_start,
soc_max,
)
soc_detach_wh = float(soc_need.get(t_detach, soc_max))
e_surplus = _neg_sell_e_surplus_after_t_wh(slots, t_detach, last_t, battery)
for t in indices:
if t >= tail_start:
phases[t] = "tail"
if tail_n <= 1:
soc_targets[t] = soc_max
else:
pos = t - tail_start
frac = pos / float(max(1, tail_n - 1))
lo = float(soc_need.get(tail_start, soc_max))
soc_targets[t] = lo + frac * (soc_max - lo)
else:
phases[t] = "prep"
soc_targets[t] = float(soc_need[t])
if t >= t_detach:
post_detach_prep_ts.add(t)
shortfall_weights[t] = float(last_t - t + 1) / float(len(indices))
day_meta.append(
{
"prague_date": str(day),
"first_neg_idx": indices[0],
"last_neg_idx": last_t,
"tail_start_idx": tail_start,
"t_detach_idx": t_detach,
"soc_detach_wh": soc_detach_wh,
"e_surplus_after_t_wh": e_surplus,
"soc_ramp_wh": [
{
"slot": slots[t].interval_start.isoformat(),
"soc_need_wh": float(soc_need[t]),
"phase": phases[t],
"soc_target_wh": float(soc_targets[t] or 0.0),
}
for t in indices
],
}
)
meta: dict[str, Any] = {
"neg_sell_b_ramp_v35": True,
"neg_sell_prep_window_v36": True,
"days": day_meta,
"post_detach_prep_ts": sorted(post_detach_prep_ts),
}
if day_meta:
meta["t_detach_idx"] = day_meta[0]["t_detach_idx"]
meta["e_surplus_after_t_wh"] = day_meta[0]["e_surplus_after_t_wh"]
return phases, soc_targets, shortfall_weights, meta
def _neg_sell_day_pv_usable_wh(
slots: list[PlanningSlot],
first_neg_sell_idx: int | None,
*,
max_charge_power_w: float,
charge_efficiency: float,
) -> float:
"""
Odhad Wh nabitelné z FVE v sell<0 slotech téhož pražského dne (forecast surplus × cap nabíjení).
"""
if first_neg_sell_idx is None:
return 0.0
neg_day = _prague_calendar_date(slots[first_neg_sell_idx])
total_wh = 0.0
for s in slots:
if _prague_calendar_date(s) != neg_day:
continue
if float(s.sell_price) >= 0.0:
continue
pv_surplus_w = max(
0.0,
float(s.pv_a_forecast_w)
+ float(s.pv_b_forecast_w)
- float(s.load_baseline_w),
)
if pv_surplus_w <= 500.0:
continue
cap_w = min(pv_surplus_w, float(max_charge_power_w))
total_wh += cap_w * INTERVAL_H * float(charge_efficiency)
return total_wh
def _pre_neg_pv_export_forecast_cushion_ok_for_day(
slots: list[PlanningSlot],
battery: Any,
first_neg_t: int,
observed_soc_wh: float,
*,
neg_sell_phases_en: bool,
soc_target_by_t: list[Optional[float]] | None = None,
) -> bool:
"""
Cushion pro jeden pražský den: usable A+B v sell<0 okně pokryje dobítí na soc_need[first_neg].
Vstup SoC = pozorovaná telemetrie (ne trajektorie z předchozího solve).
"""
if first_neg_t < 0 or first_neg_t >= len(slots):
return False
if neg_sell_phases_en and soc_target_by_t is not None:
tgt = soc_target_by_t[first_neg_t]
target_wh = float(tgt) if tgt is not None else float(battery.soc_max_wh)
else:
target_wh = float(battery.soc_max_wh)
soc_obs = max(
float(battery.min_soc_wh),
min(float(observed_soc_wh), float(battery.soc_max_wh)),
)
if soc_obs >= target_wh - 1e-3:
return True
usable_wh = _neg_sell_day_pv_usable_wh(
slots,
first_neg_t,
max_charge_power_w=float(battery.max_charge_power_w),
charge_efficiency=float(battery.charge_efficiency),
)
needed_wh = max(0.0, target_wh - soc_obs)
if needed_wh < PRE_NEG_PV_EXPORT_MIN_NEEDED_WH:
return True
return usable_wh >= needed_wh * PRE_NEG_PV_EXPORT_FORECAST_MARGIN
def _pre_neg_pv_export_forecast_cushion_ok(
slots: list[PlanningSlot],
battery: Any,
observed_soc_wh: float,
first_neg_sell_idx: int | None,
*,
neg_sell_phases_en: bool,
soc_target_by_t: list[Optional[float]] | None = None,
) -> bool:
"""Zpětná kompatibilita: cushion pro první sell<0 v horizontu (pozorované SoC)."""
if first_neg_sell_idx is None or first_neg_sell_idx <= 0:
return False
targets = soc_target_by_t
if neg_sell_phases_en and targets is None:
_ph, targets, _w, _meta = _neg_sell_day_phases(slots, battery)
return _pre_neg_pv_export_forecast_cushion_ok_for_day(
slots,
battery,
first_neg_sell_idx,
observed_soc_wh,
neg_sell_phases_en=neg_sell_phases_en,
soc_target_by_t=targets,
)
def _pre_neg_pv_export_slot_indices_for_day(
slots: list[PlanningSlot],
first_neg_t: int,
first_neg_buy_idx: int | None,
) -> set[int]:
"""Kladný sell téhož dne před prvním sell<0, PV přebytek."""
if first_neg_t <= 0:
return set()
neg_day = _prague_calendar_date(slots[first_neg_t])
out: set[int] = set()
for t in range(first_neg_t):
if _prague_calendar_date(slots[t]) != neg_day:
continue
if float(slots[t].sell_price) < 0.0:
continue
if first_neg_buy_idx is not None and t >= first_neg_buy_idx:
continue
if _slot_pv_surplus_w(slots[t]) <= NIGHT_EXPORT_PV_SUNRISE_SURPLUS_W:
continue
out.add(t)
return out
def _pre_neg_pv_export_bundle(
slots: list[PlanningSlot],
battery: Any,
observed_soc_wh: float,
first_neg_buy_idx: int | None,
*,
neg_sell_phases_en: bool,
soc_target_by_t: list[Optional[float]] | None = None,
) -> tuple[set[int], dict[str, bool]]:
"""
v36: pre-neg export per pražský den s vlastním cushion (A+B v neg okně dne).
v40: cushion vždy z pozorovaného SoC (telemetrie), bez řetězení modelových cílů mezi dny.
"""
by_day = _neg_sell_indices_by_prague_day(slots)
export_ts: set[int] = set()
cushion_by_day: dict[str, bool] = {}
for day in sorted(by_day.keys()):
indices = by_day[day]
if not indices:
continue
first_t = indices[0]
ok = _pre_neg_pv_export_forecast_cushion_ok_for_day(
slots,
battery,
first_t,
observed_soc_wh,
neg_sell_phases_en=neg_sell_phases_en,
soc_target_by_t=soc_target_by_t,
)
cushion_by_day[str(day)] = ok
if ok:
export_ts |= _pre_neg_pv_export_slot_indices_for_day(
slots,
first_t,
first_neg_buy_idx,
)
return export_ts, cushion_by_day
def _pre_neg_pv_export_slot_indices(
slots: list[PlanningSlot],
first_neg_sell_idx: int | None,
pre_neg_export_last_t: int | None,
first_neg_buy_idx: int | None,
) -> set[int]:
"""Legacy: jen před globálním prvním sell<0 (v36 preferuj _pre_neg_pv_export_bundle)."""
if first_neg_sell_idx is None or pre_neg_export_last_t is None:
return set()
out: set[int] = set()
for t in range(pre_neg_export_last_t + 1):
if float(slots[t].sell_price) < 0.0:
continue
if first_neg_buy_idx is not None and t >= first_neg_buy_idx:
continue
if _slot_pv_surplus_w(slots[t]) <= NIGHT_EXPORT_PV_SUNRISE_SURPLUS_W:
continue
out.add(t)
return out
def _discharge_before_first_neg_sell_ts(
slots: list[PlanningSlot],
first_neg_sell_idx: int | None,
) -> set[int]:
"""Všechny kladné-sell sloty před 1. sell<0 (funguje i v rolling bez D1 večera v horizontu)."""
if first_neg_sell_idx is None or first_neg_sell_idx <= 0:
return set()
return {
t
for t in range(first_neg_sell_idx)
if float(slots[t].sell_price) >= 0.0
}
def _evening_discharge_before_neg_day_ts(
slots: list[PlanningSlot],
neg_sell_day_meta: dict[str, Any],
) -> set[int]:
"""
Večer/noc kalendářního dne D1 před pražským dnem D s sell<0: příprava headroomu.
"""
from datetime import timedelta
out: set[int] = set()
for day_info in neg_sell_day_meta.get("days") or []:
first_neg = int(day_info.get("first_neg_idx", -1))
if first_neg < 0 or first_neg >= len(slots):
continue
neg_date = _prague_calendar_date(slots[first_neg])
prev_date = neg_date - timedelta(days=1)
for t, st in enumerate(slots):
if _prague_calendar_date(st) != prev_date:
continue
if float(st.sell_price) < 0.0:
continue
h = _prague_hour(st)
if not (17 <= h <= 23 or _in_night_battery_export_window(st)):
continue
if float(st.sell_price) < 0.0:
continue
out.add(t)
return out
def _night_baseload_buffer_wh_from_slots(
slots: list[PlanningSlot],
battery: Any,
) -> float:
"""Buffer Wh nad reserve pro noc (R__063 nebo % z asset_battery)."""
if not slots:
return 0.0
slot0 = slots[0]
buf = getattr(slot0, "night_baseload_buffer_wh", None)
if buf is not None:
return max(0.0, float(buf))
target = getattr(slot0, "night_baseload_target_wh", None)
if target is not None:
pct = float(getattr(battery, "planner_night_baseload_buffer_percent", 20.0) or 20.0)
return max(0.0, float(target) * pct / 100.0)
return 0.0
def _neg_evening_discharge_budget_wh(
*,
observed_soc_wh: float,
reserve_soc_wh: float,
night_baseload_buffer_wh: float,
) -> float:
"""Wh k výboji nad reserve + noční buffer — z telemetrie, ne z LP trajektorie."""
return max(
0.0,
float(observed_soc_wh) - float(reserve_soc_wh) - float(night_baseload_buffer_wh),
)
def _first_neg_sell_idx_on_prague_day(
slots: list[PlanningSlot],
prague_day: object,
) -> int | None:
for t, st in enumerate(slots):
if _prague_calendar_date(st) != prague_day:
continue
if float(st.sell_price) < 0.0:
return t
return None
def _terminal_neg_buy_weight(
slots: list[PlanningSlot],
*,
first_neg_buy_idx: int | None,
) -> float:
"""
w_neg ∈ [0, TERMINAL_NEG_BUY_WEIGHT_CAP]: snížení terminal SoC shadow price při buy<0 v horizontu.
Blížší a zápornější okno → vyšší váha; effective_factor = planner_terminal_soc_value_factor × (1 w_neg).
"""
if first_neg_buy_idx is None or first_neg_buy_idx <= 0:
return 0.0
slots_ahead = first_neg_buy_idx
prox = max(
0.0,
1.0 - slots_ahead / TERMINAL_NEG_BUY_WEIGHT_HORIZON_SLOTS,
)
if prox <= 0.0:
return 0.0
window_end = min(len(slots), first_neg_buy_idx + int(24 / INTERVAL_H))
neg_buys = [
float(slots[t].buy_price)
for t in range(first_neg_buy_idx, window_end)
if float(slots[t].buy_price) < 0.0
]
if not neg_buys:
return 0.0
min_neg_buy = min(neg_buys)
mag_raw = min(1.0, abs(min_neg_buy) / TERMINAL_NEG_BUY_MAGNITUDE_REF_CZK)
mag = TERMINAL_NEG_BUY_MAGNITUDE_FLOOR + (1.0 - TERMINAL_NEG_BUY_MAGNITUDE_FLOOR) * mag_raw
return min(TERMINAL_NEG_BUY_WEIGHT_CAP, prox * mag)
def _future_neg_buy_discharge_enabled(
slots: list[PlanningSlot],
battery: Any,
*,
first_neg_buy_idx: int,
first_neg_sell_idx: int | None,
observed_soc_wh: float,
neg_sell_phases_en: bool,
neg_sell_soc_target_by_t: list[Optional[float]] | None = None,
) -> bool:
"""
Večerní vývoz k reserve_soc před dnem s buy<0: aktivní i při relaxed_neg_prep_window,
pokud FVE v sell<0 okně pokryje deficit do prep rampy (× PRE_NEG_PV_EXPORT_FORECAST_MARGIN).
"""
if first_neg_buy_idx <= 0:
return False
neg_buy_day = _prague_calendar_date(slots[first_neg_buy_idx])
neg_sell_t = first_neg_sell_idx
if (
neg_sell_t is None
or _prague_calendar_date(slots[neg_sell_t]) != neg_buy_day
):
neg_sell_t = _first_neg_sell_idx_on_prague_day(slots, neg_buy_day)
if neg_sell_t is None:
return False
if neg_sell_phases_en and neg_sell_soc_target_by_t is not None:
tgt = neg_sell_soc_target_by_t[neg_sell_t]
target_wh = float(tgt) if tgt is not None else float(battery.soc_max_wh)
else:
target_wh = float(battery.soc_max_wh)
reserve_wh = float(
getattr(battery, "reserve_soc_wh", getattr(battery, "min_soc_wh", 0.0))
)
soc_obs = max(
float(battery.min_soc_wh),
min(float(observed_soc_wh), float(battery.soc_max_wh)),
)
if soc_obs <= reserve_wh + 1e-3:
return False
if soc_obs >= target_wh - 1e-3:
return True
usable_wh = _neg_sell_day_pv_usable_wh(
slots,
neg_sell_t,
max_charge_power_w=float(battery.max_charge_power_w),
charge_efficiency=float(battery.charge_efficiency),
)
needed_wh = max(0.0, target_wh - soc_obs)
if needed_wh < PRE_NEG_PV_EXPORT_MIN_NEEDED_WH:
return True
return usable_wh >= needed_wh * PRE_NEG_PV_EXPORT_FORECAST_MARGIN
def _pos_sell_pre_neg_buy_evening_export_exempt_ts(
slots: list[PlanningSlot],
pos_sell_pre_neg_buy_ts: list[int],
evening_peak_export_ts: list[int],
*,
charge_acquisition_czk_kwh: float,
min_spread: float,
fixed_tariff: bool,
future_neg_buy_discharge_en: bool,
) -> set[int]:
"""Večerní peak před buy<0: neaplikovat ge=0, pokud je vývoz ekonomicky výhodný."""
if not future_neg_buy_discharge_en:
return set()
evening_peak_set = set(evening_peak_export_ts)
out: set[int] = set()
for t in pos_sell_pre_neg_buy_ts:
if t not in evening_peak_set and not _in_evening_push_hour_window(slots[t]):
continue
if not _slot_profitable_battery_export(
slots[t],
charge_acquisition_czk_kwh=charge_acquisition_czk_kwh,
min_spread=min_spread,
fixed_tariff=fixed_tariff,
):
continue
out.add(t)
return out
def _neg_evening_before_neg_push_indices(
slots: list[PlanningSlot],
candidate_ts: set[int],
*,
export_budget_wh: float,
per_slot_discharge_wh: float,
discharge_export_ok: set[int] | None = None,
) -> set[int]:
"""Nejdražší kladné-sell sloty v kandidátech, dokud budget z pozorovaného SoC."""
if export_budget_wh < per_slot_discharge_wh * 0.5 or not candidate_ts:
return set()
eligible = {
t
for t in candidate_ts
if discharge_export_ok is None or t in discharge_export_ok
}
if not eligible:
return set()
ranked = sorted(
eligible,
key=lambda t: (float(slots[t].sell_price), -t),
reverse=True,
)
out: set[int] = set()
cum_wh = 0.0
for t in ranked:
if float(slots[t].sell_price) < 0.0:
continue
if cum_wh + per_slot_discharge_wh > export_budget_wh + 1e-6:
break
out.add(t)
cum_wh += per_slot_discharge_wh
return out
def _neg_evening_reserve_soc_anchors(
slots: list[PlanningSlot],
neg_sell_day_meta: dict[str, Any],
battery: Any,
) -> list[tuple[int, float]]:
"""
Kotvy SoC ≤ reserve_soc před neg oknem:
- večer D1 (23:45) pokud je v horizontu,
- slot těsně před 1. sell<0 (rolling: ráno bez včerejška v okně).
"""
from datetime import timedelta
reserve_wh = float(
getattr(battery, "reserve_soc_wh", getattr(battery, "min_soc_wh", 0.0))
)
out: list[tuple[int, float]] = []
seen: set[int] = set()
for day_info in neg_sell_day_meta.get("days") or []:
first_neg = int(day_info.get("first_neg_idx", -1))
if first_neg < 0 or first_neg >= len(slots):
continue
neg_date = _prague_calendar_date(slots[first_neg])
prev_date = neg_date - timedelta(days=1)
eve_slots = [
t
for t, st in enumerate(slots)
if _prague_calendar_date(st) == prev_date
and (
17 <= _prague_hour(st) <= 23
or _in_night_battery_export_window(st)
)
]
if eve_slots:
t_eve = max(eve_slots)
if t_eve not in seen:
out.append((t_eve, reserve_wh))
seen.add(t_eve)
if first_neg > 0:
t_pre = first_neg - 1
if (
t_pre not in seen
and float(slots[t_pre].sell_price) >= 0.0
):
out.append((t_pre, reserve_wh))
seen.add(t_pre)
return out
def _battery_export_cap_w(battery: Any, grid: Any) -> float:
"""Max výkon vývozu baterie do sítě [W] — z DB, ne hardcoded konstanta."""
return min(
float(battery.max_discharge_power_w),
float(grid.max_export_power_w),
)
def _evening_push_battery_export_w(
slot: PlanningSlot,
battery: Any,
grid: Any,
) -> float:
"""
Tvrdý push ge_bat: min(site/inverter export cap, BMS load).
Stejná fyzika jako Deye SELL — load pokryje baterie, zbytek výkonu jde do sítě
(ne (maxload)/2 z dvojího započtení bd+ge_bat v LP).
"""
cap = _battery_export_cap_w(battery, grid)
load_w = max(0.0, float(slot.load_baseline_w))
discharge_headroom = max(
0.0,
float(battery.max_discharge_power_w) - load_w,
)
return min(cap, discharge_headroom)
def _dispatch_grid_setpoint_w(
*,
gi_w: float,
ge_w: float,
ge_bat_w: float,
ge_pv_w: float,
max_export_power_w: int,
) -> tuple[int, str]:
"""
grid_setpoint pro export do sítě (záporný W) a export_mode.
gige může být ~0 při load-first, i když ge_bat exportuje — Deye reg 143 potřebuje |grid_setpoint|.
"""
ge_total = max(0.0, float(ge_w))
ge_bat_v = max(0.0, float(ge_bat_w))
cap = float(max_export_power_w)
if ge_bat_v >= GE_MIN_EXPORT_W and ge_bat_v >= max(ge_total * 0.5, 500.0):
export_w = min(cap, max(ge_total, ge_bat_v + max(0.0, float(ge_pv_w))))
return -int(round(export_w)), "BATTERY_SELL"
if ge_total >= GE_MIN_EXPORT_W:
return -int(round(min(cap, ge_total))), "PV_SURPLUS"
return round(float(gi_w) - ge_total), "NONE"
def _morning_pre_neg_zone_peak_sell(
slots: list[PlanningSlot],
first_neg_sell_idx: int | None,
) -> float | None:
"""Max kladný sell v pásmu 511 Prague před prvním sell<0 (shodně s R__063)."""
if first_neg_sell_idx is None or first_neg_sell_idx <= 0:
return None
neg_day = _prague_calendar_date(slots[first_neg_sell_idx])
sells = [
float(slots[i].sell_price)
for i in range(first_neg_sell_idx)
if float(slots[i].sell_price) >= 0.0
and _prague_calendar_date(slots[i]) == neg_day
and MORNING_PRENEG_START_HOUR <= _prague_hour(slots[i]) <= MORNING_PRENEG_END_HOUR
]
if not sells:
return None
return max(sells)
def _pre_neg_peak_sell_idx(
slots: list[PlanningSlot],
first_neg_sell_idx: int | None,
) -> int | None:
"""Nejvyšší kladný sell v ranním pásmu před prvním sell<0 (ne půlnoc celého dne)."""
if first_neg_sell_idx is None or first_neg_sell_idx <= 0:
return None
zone_peak = _morning_pre_neg_zone_peak_sell(slots, first_neg_sell_idx)
if zone_peak is None:
return None
neg_day = _prague_calendar_date(slots[first_neg_sell_idx])
positive = [
(i, float(slots[i].sell_price))
for i in range(first_neg_sell_idx)
if float(slots[i].sell_price) >= 0.0
and _prague_calendar_date(slots[i]) == neg_day
and MORNING_PRENEG_START_HOUR <= _prague_hour(slots[i]) <= MORNING_PRENEG_END_HOUR
]
if not positive:
return None
return max(positive, key=lambda x: (x[1], x[0]))[0]
def _morning_pre_neg_export_indices(
slots: list[PlanningSlot],
first_neg_sell_idx: int | None,
*,
degrad_czk_kwh: float,
) -> list[int]:
"""Všechny ranní peak sloty (sell ≥ zónový max degrad) před prvním sell<0."""
zone_peak = _morning_pre_neg_zone_peak_sell(slots, first_neg_sell_idx)
if zone_peak is None or first_neg_sell_idx is None or first_neg_sell_idx <= 0:
return []
neg_day = _prague_calendar_date(slots[first_neg_sell_idx])
out: list[int] = []
for i in range(first_neg_sell_idx):
if (
float(slots[i].sell_price) >= zone_peak - degrad_czk_kwh
and float(slots[i].sell_price) >= 0.0
and _prague_calendar_date(slots[i]) == neg_day
and MORNING_PRENEG_START_HOUR <= _prague_hour(slots[i]) <= MORNING_PRENEG_END_HOUR
):
out.append(i)
return out
def _pre_neg_buy_discharge_indices(
slots: list[PlanningSlot],
first_neg_buy_idx: int | None,
*,
charge_acquisition_czk_kwh: float,
min_spread: float,
fixed_tariff: bool,
) -> set[int]:
"""
Sloty před prvním buy<0: výboj baterie do sítě při kladném sell (včetně noci).
Bez rozšíření discharge_export_slots (v19b — jinak w_arb → Infeasible).
"""
if first_neg_buy_idx is None or first_neg_buy_idx <= 0:
return set()
out: set[int] = set()
for i in range(first_neg_buy_idx):
s = slots[i]
if float(s.buy_price) < 0.0:
continue
if float(s.sell_price) < PRE_NEG_BATT_EXPORT_MIN_SELL_CZK_KWH:
continue
if not _slot_profitable_battery_export(
s,
charge_acquisition_czk_kwh=charge_acquisition_czk_kwh,
min_spread=min_spread,
fixed_tariff=fixed_tariff,
):
continue
out.add(i)
return out
def _slot_pv_surplus_w(slot: PlanningSlot) -> float:
load_w = float(slot.load_baseline_w)
pv_w = float(slot.pv_a_forecast_w) + float(slot.pv_b_forecast_w)
return max(0.0, pv_w - load_w)
def _battery_export_push_defer_to_pv(slot: PlanningSlot) -> bool:
"""
Při kladném sell a PV přebytku nevnucovat vývoz z baterie (ge_bat / z_export).
Platí pro ranní pre-neg, večerní push i KV1 odpoledne (block_export + fixní tarif):
přetok řeší ge_pv / Deye PASSIVE, ne BATTERY_SELL.
"""
if float(slot.sell_price) < 0.0:
return False
return _slot_pv_surplus_w(slot) > NIGHT_EXPORT_PV_SUNRISE_SURPLUS_W
def _in_night_battery_export_window(slot: PlanningSlot) -> bool:
"""
Noční okno pro večerní push / peak sell: >=17h Prague, nebo 05h (přes půlnoc).
Končí prvním slotem s významným PV přebytkem (východ FVE), ne kalendářním dnem.
"""
if _slot_pv_surplus_w(slot) > NIGHT_EXPORT_PV_SUNRISE_SURPLUS_W:
return False
h = _prague_hour(slot)
if h >= NIGHT_EXPORT_EVENING_START_HOUR:
return True
return h <= NIGHT_EXPORT_MORNING_END_HOUR
def _in_evening_push_hour_window(slot: PlanningSlot) -> bool:
"""Tvrdý večerní push jen ≥17h Prague — ne noční vývoz ve 0206h (sell < buy)."""
return _prague_hour(slot) >= NIGHT_EXPORT_EVENING_START_HOUR
def _night_export_window_segments(slots: list[PlanningSlot]) -> list[list[int]]:
"""Souvislé úseky nočního okna v horizontu (oddělené denní pauzou / východem FVE)."""
segments: list[list[int]] = []
current: list[int] = []
for t, s in enumerate(slots):
if _in_night_battery_export_window(s):
current.append(t)
else:
if current:
segments.append(current)
current = []
if current:
segments.append(current)
return segments
def _night_peak_sell_czk_kwh(slots: list[PlanningSlot], slot_index: int) -> float:
"""Max sell v nočním úseku, do kterého slot patří (pro evening_early)."""
for seg in _night_export_window_segments(slots):
if slot_index in seg:
return max(float(slots[t].sell_price) for t in seg)
return 0.0
def _evening_peak_export_indices(
slots: list[PlanningSlot],
*,
degrad_czk_kwh: float,
evening_start_hour: int = 17,
) -> list[int]:
"""
Noční špičky sell: jeden peak na souvislý úsek (17h → půlnoc → ráno do východu FVE),
ne per kalendářní den (oprava 23:30 vs 00:00).
"""
_ = evening_start_hour # kompatibilita volání; okno řídí NIGHT_EXPORT_* konstanty
out: list[int] = []
for seg in _night_export_window_segments(slots):
if not seg:
continue
peak = max(float(slots[t].sell_price) for t in seg)
if peak <= 0.0:
continue
for t in seg:
if float(slots[t].sell_price) >= peak - degrad_czk_kwh:
out.append(t)
return sorted(out)
def _planner_discharge_floor_wh(battery: Any) -> float:
"""Provozní podlaha vývoje: reserve_soc (domluva), ne jen min_soc."""
return max(
float(getattr(battery, "min_soc_wh", 0.0)),
float(getattr(battery, "reserve_soc_wh", 0.0)),
)
def _evening_push_discharge_budget_wh(
*,
current_soc_wh: float,
discharge_floor_wh: float,
soc_max_wh: float,
discharge_slot_buffer: float,
) -> float:
"""
Rozpočet Wh pro tvrdý večerní push — stejný princip jako R__063 (discharge_slot_buffer).
Podlaha = reserve_soc (typ. 20 %), ne min_soc (10 %).
"""
exportable_full_wh = max(0.0, float(soc_max_wh) - float(discharge_floor_wh))
available_wh = max(0.0, float(current_soc_wh) - float(discharge_floor_wh))
buf = float(discharge_slot_buffer)
if buf <= 0.0:
return available_wh
return min(available_wh, exportable_full_wh * buf)
def _kv1_block_export_fixed_evening_push(
grid: Any,
*,
purchase_fixed: bool,
) -> bool:
"""KV1: fixní buy + block_export — večerní push jiná profitabilita než acq+spread."""
return purchase_fixed and bool(
getattr(grid, "block_export_on_negative_sell", False)
)
def _slot_evening_push_profitable(
slot: PlanningSlot,
*,
charge_acquisition_czk_kwh: float,
min_spread: float,
slots: list[PlanningSlot] | None = None,
first_neg_sell_idx: int | None = None,
kv1_evening_push: bool = False,
purchase_fixed: bool = False,
) -> bool:
"""
Push večerní špičky.
Spot: sell > acq+spread (zásoba z levného nabití).
Fixní tarif (BA81/KV1): sell > buy+spread (stejně jako R__063 discharge maska).
KV1 (fixed + block_export, v52): navíc sell ≥ max sell v pásmu 511 před 1. sell<0 spread.
"""
sell_t = float(slot.sell_price)
if kv1_evening_push:
if sell_t < PRE_NEG_BATT_EXPORT_MIN_SELL_CZK_KWH:
return False
if slots is not None:
zone_peak = _morning_pre_neg_zone_peak_sell(slots, first_neg_sell_idx)
if zone_peak is not None:
return sell_t >= float(zone_peak) - float(min_spread)
return True
if purchase_fixed:
buy_t = float(slot.buy_price)
if buy_t >= 0.0:
return sell_t > buy_t + float(min_spread)
return sell_t > float(charge_acquisition_czk_kwh) + float(min_spread)
def _evening_push_segment_candidates(
slots: list[PlanningSlot],
seg: list[int],
*,
charge_acquisition_czk_kwh: float,
min_spread: float,
discharge_export_ok: set[int] | None = None,
first_neg_sell_idx: int | None = None,
kv1_evening_push: bool = False,
purchase_fixed: bool = False,
) -> list[int]:
"""Profitable sloty v nočním úseku — výběr pořadí a strop dělá rozpočet Wh (sell desc)."""
if not seg:
return []
out: list[int] = []
for t in seg:
if discharge_export_ok is not None and t not in discharge_export_ok:
continue
if not _in_evening_push_hour_window(slots[t]):
continue
if not _slot_evening_push_profitable(
slots[t],
charge_acquisition_czk_kwh=charge_acquisition_czk_kwh,
min_spread=min_spread,
slots=slots,
first_neg_sell_idx=first_neg_sell_idx,
kv1_evening_push=kv1_evening_push,
purchase_fixed=purchase_fixed,
):
continue
out.append(t)
return out
def _strict_late_replan_evening_slot_indices(
slots: list[PlanningSlot],
*,
first_neg_buy_idx: int | None,
observed_soc_wh: float,
reserve_soc_wh: float,
) -> set[int]:
"""
Strict solve: večer D0 (1722h) před dnem s buy<0 — vývoz k reserve, výjimka z pos_sell ge=0.
"""
if first_neg_buy_idx is None or first_neg_buy_idx <= 0:
return set()
if not any(float(s.buy_price) < 0.0 for s in slots):
return set()
if observed_soc_wh <= float(reserve_soc_wh) + 500.0:
return set()
replan_day = _prague_calendar_date(slots[0])
out: set[int] = set()
for t, s in enumerate(slots):
if _prague_calendar_date(s) != replan_day:
continue
h = _prague_hour(s)
if h < NIGHT_EXPORT_EVENING_START_HOUR or h > 22:
continue
if float(s.sell_price) < 0.0:
continue
out.add(t)
return out
def _strict_late_replan_night_self_consume_indices(
slots: list[PlanningSlot],
*,
evening_export_ts: set[int],
) -> set[int]:
"""Strict: noc po 22h — dům z baterie, ne drahý import (mimo večerní export sloty)."""
out: set[int] = set()
for t, s in enumerate(slots):
if t in evening_export_ts:
continue
if not _in_night_battery_export_window(s):
continue
if float(s.load_baseline_w) <= 0:
continue
if float(s.buy_price) < 0.0:
continue
out.add(t)
return out
def _degraded_relaxed_night_self_consume_indices(
slots: list[PlanningSlot],
) -> set[int]:
"""
relaxed_solver_masks: celé noční okno — dům z baterie (až min_soc), ne import za spot buy.
"""
out: set[int] = set()
for t, s in enumerate(slots):
if not _in_night_battery_export_window(s):
continue
if float(s.load_baseline_w) <= 0:
continue
if float(s.buy_price) < 0.0:
continue
out.add(t)
return out
def _degraded_relaxed_evening_export_to_reserve_indices(
slots: list[PlanningSlot],
*,
observed_soc_wh: float,
reserve_soc_wh: float,
first_neg_buy_idx: int | None,
) -> set[int]:
"""
Nouzový solve: večer D0 smí vývoz bat k reserve_soc před dnem s buy<0 (headroom na zítra).
Jen kalendářní večer 1722h — po 22h už noc (dům z baterie, ne držet kvůli exportu).
"""
if first_neg_buy_idx is None or first_neg_buy_idx <= 0:
return set()
if observed_soc_wh <= float(reserve_soc_wh) + 500.0:
return set()
replan_day = _prague_calendar_date(slots[0])
out: set[int] = set()
for t, s in enumerate(slots):
if _prague_calendar_date(s) != replan_day:
continue
h = _prague_hour(s)
if h < NIGHT_EXPORT_EVENING_START_HOUR or h > 22:
continue
if float(s.sell_price) < 0.0:
continue
out.add(t)
return out
def _post_evening_push_night_self_consume_indices(
slots: list[PlanningSlot],
evening_push_ts: set[int],
) -> set[int]:
"""
Po posledním evening_push daného večera až do rána: dům z baterie, ne import za ~5 Kč.
"""
if not evening_push_ts:
return set()
last_push_by_day: dict[object, int] = {}
for t in evening_push_ts:
last_push_by_day[_prague_calendar_date(slots[t])] = max(
last_push_by_day.get(_prague_calendar_date(slots[t]), -1),
t,
)
out: set[int] = set()
for t, s in enumerate(slots):
day = _prague_calendar_date(s)
t_last = last_push_by_day.get(day)
if t_last is None or t <= t_last:
continue
if t in evening_push_ts:
continue
if not _in_night_battery_export_window(s):
continue
if float(s.buy_price) <= 0.0:
continue
if float(s.load_baseline_w) <= 0:
continue
out.add(t)
return out
def _evening_push_calendar_segments(
slots: list[PlanningSlot],
discharge_export_ok: set[int] | None = None,
) -> list[list[int]]:
"""Kalendářní večery (≥17h) v nočním okně — každý den vlastní push rozpočet."""
by_date: dict[object, list[int]] = {}
for t, s in enumerate(slots):
if not _in_evening_push_hour_window(s):
continue
if not _in_night_battery_export_window(s):
continue
if discharge_export_ok is not None and t not in discharge_export_ok:
continue
by_date.setdefault(_prague_calendar_date(s), []).append(t)
return [sorted(v) for v in by_date.values() if v]
def _primary_night_export_segment_indices(slots: list[PlanningSlot]) -> set[int]:
"""
První noční epizoda v horizontu (17h → půlnoc → do východu FVE), která platí pro
rozpočet Wh z aktuální SoC. Další večery v horizontu (po dni FVE / nabíjení) se
plánují až vlastním rolling replanem — nesdílí dnešní baterii.
"""
segs = _night_export_window_segments(slots)
if not segs:
return set()
for seg in segs:
if 0 in seg:
return set(seg)
return set(segs[0])
def _evening_push_soc_budget_calendar_segments(
slots: list[PlanningSlot],
discharge_export_ok: set[int] | None = None,
) -> list[list[int]]:
"""Kalendářní večery jen v primární noční epizodě — vhodné pro push_budget z current_soc."""
primary = _primary_night_export_segment_indices(slots)
if not primary:
return []
return [
seg
for seg in _evening_push_calendar_segments(slots, discharge_export_ok)
if seg and all(t in primary for t in seg)
]
def _night_self_consume_discourage_import_indices(
slots: list[PlanningSlot],
*,
evening_push_ts: set[int],
charge_acquisition_czk_kwh: float,
min_spread: float,
purchase_fixed: bool = False,
) -> set[int]:
"""
Noční sloty mimo evening_push: penalizace importu pro dům (preferovat bd).
v45: celé noční okno, ne jen evening_early_export_ban subset.
KV1: buy ≈ acq (konstantní ~6,35) — jinak prázdná množina, síť místo baterie v noci.
"""
out: set[int] = set()
for t, s in enumerate(slots):
if t in evening_push_ts:
continue
if not _in_night_battery_export_window(s):
continue
if float(s.load_baseline_w) <= 0:
continue
buy_t = float(s.buy_price)
if purchase_fixed:
if buy_t >= 0.0:
out.add(t)
continue
if buy_t <= float(charge_acquisition_czk_kwh) + float(min_spread):
continue
out.add(t)
return out
def _evening_battery_export_push_indices(
slots: list[PlanningSlot],
*,
charge_acquisition_czk_kwh: float,
degrad_czk_kwh: float,
current_soc_wh: float,
min_soc_wh: float,
soc_max_wh: float,
per_slot_discharge_wh: float,
discharge_slot_buffer: float,
discharge_export_ok: set[int] | None = None,
evening_start_hour: int = 17,
first_neg_sell_idx: int | None = None,
kv1_evening_push: bool = False,
purchase_fixed: bool = False,
) -> list[int]:
"""
Večerní push (≥17h): plný ge_bat v nejdražších slotách (sell desc), rozpočet Wh
z aktuální SoC jen pro **primární noční epizodu** (dnešní večer → ráno).
Zítřejší večer v horizontu se nekrade polovinou budgetu (v43 split) — nabije se
přes den / neg okno; push přidá zítřejší rolling replan.
per_slot_discharge_wh: min(BMS, export cap) × účinnost × 0,25 h.
"""
_ = evening_start_hour # kompatibilita volání
if per_slot_discharge_wh <= 0.0:
return []
push_budget_wh = _evening_push_discharge_budget_wh(
current_soc_wh=current_soc_wh,
discharge_floor_wh=min_soc_wh,
soc_max_wh=soc_max_wh,
discharge_slot_buffer=discharge_slot_buffer,
)
if push_budget_wh < per_slot_discharge_wh * 0.5:
return []
evening_segments = _evening_push_soc_budget_calendar_segments(
slots,
discharge_export_ok=discharge_export_ok,
)
if not evening_segments:
return []
candidates: list[int] = []
seen: set[int] = set()
for seg in evening_segments:
for t in _evening_push_segment_candidates(
slots,
seg,
charge_acquisition_czk_kwh=charge_acquisition_czk_kwh,
min_spread=degrad_czk_kwh,
discharge_export_ok=discharge_export_ok,
first_neg_sell_idx=first_neg_sell_idx,
kv1_evening_push=kv1_evening_push,
purchase_fixed=purchase_fixed,
):
if t not in seen:
seen.add(t)
candidates.append(t)
if not candidates:
return []
ranked = sorted(
candidates,
key=lambda i: (float(slots[i].sell_price), -i),
reverse=True,
)
remaining_wh = float(push_budget_wh)
out: list[int] = []
for t in ranked:
if remaining_wh + 1e-6 < per_slot_discharge_wh:
break
out.append(t)
remaining_wh -= per_slot_discharge_wh
return sorted(out)
def _evening_push_peak_fallback_indices(
slots: list[PlanningSlot],
*,
charge_acquisition_czk_kwh: float,
min_spread: float,
discharge_export_ok: set[int] | None,
first_neg_sell_idx: int | None,
kv1_evening_push: bool,
purchase_fixed: bool = False,
) -> set[int]:
"""Alespoň jeden večerní peak slot (sell desc), když rozpočet Wh nevybral žádný push."""
best_t: int | None = None
best_sell = -1.0
for t, s in enumerate(slots):
if discharge_export_ok is not None and t not in discharge_export_ok:
continue
if not _in_evening_push_hour_window(s):
continue
if not _slot_evening_push_profitable(
s,
charge_acquisition_czk_kwh=charge_acquisition_czk_kwh,
min_spread=min_spread,
slots=slots,
first_neg_sell_idx=first_neg_sell_idx,
kv1_evening_push=kv1_evening_push,
purchase_fixed=purchase_fixed,
):
continue
sell_t = float(s.sell_price)
if sell_t > best_sell:
best_sell = sell_t
best_t = t
return {best_t} if best_t is not None else set()
def _evening_night_peak_sell_czk(slots: list[PlanningSlot]) -> float:
sells = [
float(s.sell_price)
for s in slots
if _in_night_battery_export_window(s) and float(s.sell_price) >= 0.0
]
return max(sells) if sells else 0.0
def _evening_push_peak_sell_czk(slots: list[PlanningSlot], push_ts: set[int]) -> float:
if not push_ts:
return 0.0
return max(float(slots[t].sell_price) for t in push_ts)
def _evening_push_ts_from_iso(slots: list[PlanningSlot], iso_slots: list[str]) -> set[int]:
by_iso = {s.interval_start.isoformat(): t for t, s in enumerate(slots)}
return {by_iso[iso] for iso in iso_slots if iso in by_iso}
def _evening_push_hysteresis_active(
*,
prev_peak_sell_czk: float | None,
new_peak_sell_czk: float,
prev_soc_wh: float | None,
current_soc_wh: float,
usable_capacity_wh: float,
) -> bool:
if prev_peak_sell_czk is None:
return False
if abs(new_peak_sell_czk - float(prev_peak_sell_czk)) >= (
EVENING_PUSH_HYSTERESIS_SELL_PEAK_DELTA_CZK_KWH
):
return False
if prev_soc_wh is not None and usable_capacity_wh > 1e-6:
delta_pct = (
abs(float(current_soc_wh) - float(prev_soc_wh))
/ float(usable_capacity_wh)
* 100.0
)
if delta_pct >= EVENING_PUSH_HYSTERESIS_SOC_PCT:
return False
return True
def _evening_early_export_penalty_indices(
slots: list[PlanningSlot],
*,
discharge_export_slots: set[int],
evening_push_ts: set[int],
exempt_ts: set[int] | None = None,
) -> set[int]:
"""
ge_bat=0 v nočním okně mimo tvrdý evening_push (a mimo pre-neg / neg-evening větve).
"""
exempt = exempt_ts or set()
out: set[int] = set()
for t_ev, s_ev in enumerate(slots):
if not _in_night_battery_export_window(s_ev):
continue
if t_ev not in discharge_export_slots:
continue
if t_ev in evening_push_ts or t_ev in exempt:
continue
out.add(t_ev)
return out
def _last_non_negative_sell_before_neg_buy(
slots: list[PlanningSlot],
first_neg_buy_idx: int | None,
) -> int | None:
if first_neg_buy_idx is None or first_neg_buy_idx <= 0:
return None
candidates = [
i for i in range(first_neg_buy_idx) if float(slots[i].sell_price) >= 0.0
]
return max(candidates) if candidates else None
def _positive_sell_pre_neg_buy_indices(
slots: list[PlanningSlot],
first_neg_buy_idx: int | None,
) -> list[int]:
if first_neg_buy_idx is None or first_neg_buy_idx <= 0:
return []
return [
t
for t in range(first_neg_buy_idx)
if float(slots[t].sell_price) >= 0.0
]
def _pre_neg_buy_empty_discharge_indices(
slots: list[PlanningSlot],
first_neg_buy_idx: int | None,
last_pos_sell_idx: int | None,
) -> list[int]:
"""Sloty mezi posledním sell≥0 a prvním buy<0 — vyprázdnit před levným importem."""
if first_neg_buy_idx is None or first_neg_buy_idx <= 0:
return []
if last_pos_sell_idx is None:
return []
start = last_pos_sell_idx + 1
end = first_neg_buy_idx - 1
if start > end:
return []
return list(range(start, end + 1))
def _pre_neg_buy_soc_ceiling_wh(
slots: list[PlanningSlot],
*,
first_neg_buy_idx: int | None,
min_soc_wh: float,
soc_max_wh: float,
max_charge_w: float,
charge_eff: float,
evening_start_hour: int = 17,
) -> float | None:
"""
Horní SoC těsně před prvním buy<0: pod soc_max musí vejít import v buy<0,
PV B v tom okně a rezerva na odpolední sell<0 (stejný den, před večerem).
"""
if first_neg_buy_idx is None or first_neg_buy_idx <= 0:
return None
per_slot_chg = max(0.0, float(max_charge_w) * float(charge_eff) * INTERVAL_H)
neg_buy_ts = [t for t, s in enumerate(slots) if float(s.buy_price) < 0.0]
if not neg_buy_ts:
return None
last_neg_buy = max(neg_buy_ts)
neg_day = _prague_calendar_date(slots[first_neg_buy_idx])
grid_wh = len(neg_buy_ts) * per_slot_chg
pv_b_wh = 0.0
for t in neg_buy_ts:
s = slots[t]
sur = max(
0.0,
float(s.pv_a_forecast_w) + float(s.pv_b_forecast_w) - float(s.load_baseline_w),
)
pv_b_wh += min(sur, float(max_charge_w)) * float(charge_eff) * INTERVAL_H
post_wh = 0.0
for t in range(last_neg_buy + 1, len(slots)):
s = slots[t]
if _prague_calendar_date(s) != neg_day:
continue
if float(s.buy_price) < 0.0:
continue
if float(s.sell_price) >= 0.0:
break
if _prague_hour(s) >= evening_start_hour:
break
sur = max(0.0, float(s.pv_b_forecast_w) - float(s.load_baseline_w) * 0.25)
post_wh += min(sur, float(max_charge_w)) * float(charge_eff) * INTERVAL_H
buffer_wh = max(per_slot_chg * 2.0, 3000.0)
needed = grid_wh + pv_b_wh + post_wh + buffer_wh
ceiling = float(soc_max_wh) - needed
floor = float(min_soc_wh) + max(per_slot_chg, 1000.0)
return max(floor, min(float(soc_max_wh) - per_slot_chg, ceiling))
def _planner_soc_for_solver(
current_soc_wh: float,
battery,
) -> tuple[float, float | None]:
"""
SoC pro MILP. Při telemetrii na soc_max a dlouhém sell<0 s vysokou FVE bez rezervy pod stropem
je model neřešitelný (nelze nabít / odvést přebytek). Necháme min. ~650 Wh pod soc_max.
"""
soc_max = float(battery.soc_max_wh)
soc_min = float(battery.min_soc_wh)
soc = max(soc_min, min(float(current_soc_wh), soc_max))
charge_slot_wh = (
float(battery.max_charge_power_w)
* INTERVAL_H
/ max(float(battery.charge_efficiency), 1e-6)
)
headroom = max(650.0, 0.382 * charge_slot_wh)
if soc > soc_max - headroom:
return max(soc_min, soc_max - headroom), headroom
return soc, None
def _pv_forced_vent_export_allowed(
t: int,
*,
current_soc_wh: float,
battery,
soc_headroom_wh: float,
pv_surplus_w: float,
) -> bool:
"""Přebytek FVE do sítě jen když baterie na konci předchozího slotu nemá kapacitu."""
if pv_surplus_w <= 0:
return False
if t == 0:
return current_soc_wh >= float(battery.soc_max_wh) - soc_headroom_wh
return False
def _relax_solver_slot_masks(slots: list[PlanningSlot]) -> list[PlanningSlot]:
"""Nouzově permissivní allow_* — SQL masky nesmí učinit LP neřešitelným."""
return [
replace(
s,
allow_charge=True,
allow_discharge_export=float(s.sell_price) >= 0.0,
)
for s in slots
]
def _unlock_late_replan_evening_slots(
slots: list[PlanningSlot],
*,
current_soc_wh: float,
reserve_soc_wh: float,
) -> None:
"""Pozdní replan: večer D0 povolit grid import + export (SQL allow_charge často false)."""
if not slots or current_soc_wh <= float(reserve_soc_wh) + 500.0:
return
if not any(float(s.buy_price) < 0.0 for s in slots):
return
replan_day = _prague_calendar_date(slots[0])
unlocked = 0
for i, s in enumerate(slots):
if _prague_calendar_date(s) != replan_day:
continue
if float(s.sell_price) < 0.0:
continue
if not _in_evening_push_hour_window(s):
continue
if s.allow_charge and s.allow_discharge_export:
continue
slots[i] = replace(s, allow_charge=True, allow_discharge_export=True)
unlocked += 1
if unlocked:
logger.info(
"Late replan: unlocked evening slot masks on %d slot(s) (soc=%.0f Wh)",
unlocked,
float(current_soc_wh),
)
def _evening_push_override_for_solve(
evening_push_ts_override: Optional[set[int]],
*,
relaxed_expensive_import: bool,
relaxed_neg_buy_charge: bool,
relaxed_neg_prep_hold_only: bool,
relaxed_neg_prep_window: bool,
neg_sell_phases_fallback: bool,
relaxed_pos_sell_ge_block: bool = False,
relaxed_solver_masks: bool = False,
) -> Optional[set[int]]:
"""Po Infeasible nesmí retry držet hysterézní push z minulého běhu."""
if evening_push_ts_override is None:
return None
if (
relaxed_expensive_import
or relaxed_neg_buy_charge
or relaxed_neg_prep_hold_only
or relaxed_neg_prep_window
or neg_sell_phases_fallback
or relaxed_pos_sell_ge_block
or relaxed_solver_masks
):
return None
return set(evening_push_ts_override)
def _filter_evening_push_override_indices(
slots: list[PlanningSlot],
override_ts: set[int],
*,
battery: Any,
grid: Any,
discharge_export_ok: set[int] | None,
) -> set[int]:
"""Hysterézní push jen na sloty, kde dnes smí a dává smysl tvrdý ge_bat push."""
out: set[int] = set()
for t in override_ts:
if t < 0 or t >= len(slots):
continue
if discharge_export_ok is not None and t not in discharge_export_ok:
continue
if _battery_export_push_defer_to_pv(slots[t]):
continue
push_floor_w = _evening_push_battery_export_w(slots[t], battery, grid)
if push_floor_w < GE_MIN_EXPORT_W:
continue
out.add(t)
return out