Files
ems/backend/services/planning/solver_v2.py
Dusan Vojacek a32839bf67 feat(planner): EV anti-fragmentace + 3f power floor (Fix B)
3f floor (phases>=3 → 6A×fáze×230 ≈4140W, ruší 1f trickle) + block-start penalta
(asset_ev_charger.planner_ev_start_penalty_czk V108, default 0=no-op). Golden gate
zelená (363 passed). Postaveno paralelním worktree agentem, zvalidováno sériově.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-14 22:55:17 +02:00

624 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# backend/services/planning/solver_v2.py
#
# EMS plánovač v2 — ČISTÉ ekonomické jádro (Fáze 3).
#
# Filozofie: objective = reálné peníze (nákup prodej + degradace terminal
# hodnota energie). Žádné heuristické penalty z constants.py, žádné pre-solver
# fáze/okna/kotvy. Chování (neg-sell příprava, evening export, arbitráž) má
# VYPLYNOUT z cen a fyziky, ne z ručně laděných vah.
#
# Co zůstává (tvrdá pravidla — fyzika, HW, CLAUDE.md):
# - bilance sběrnice, SoC dynamika s účinnostmi, výkonové stropy
# - curtailment jen pole A (pravidlo 5); GEN cutoff binárka pole B (pravidlo 6)
# - block_export_on_negative_sell → ge == 0 při sell < 0 (pravidlo 6, KV1)
# - buy < 0 → ge == 0 (žádná pumpa importexport přes jeden elektroměr; import
# je omezen breakerem — pravidlo 7)
# - export z BATERIE ⇒ koncové SoC ≥ arb floor (pravidlo 19; PV export floor nevynucuje)
# - zákaz současného importu a exportu (binárka)
# - load-first Deye: bc_pv + ge_pv jen z PV přebytku nad zátěží
# - EV deadline, TUV look-ahead, provozní režimy (legitimní constraints)
# - noční SoC polštář: plán nesmí kalkulovat s vybitím až na min_soc — chyba
# predikce noční spotřeby by znamenala neplánovaný noční nákup. Velikost
# z DB (planner_night_baseload_buffer_percent → slot.night_baseload_buffer_wh,
# klesá k 0 do rána); porušení je PLACENÉ cenou buy daného slotu (riziko
# zpětného nákupu), takže extrémní sell špička ho smí racionálně prodat.
# - PV-risk front-load: v okně sell<0 je nabíjení z PV zdarma kdykoliv →
# indiference v čase; odložení ale spoléhá na predikci (večerní mrak).
# Malá prémie za držení energie dřív (DB planner_pv_risk_frontload_czk_kwh)
# vede k "nabít plným výkonem hned, pak řezat A" — emergentně, bez rampy.
# - oportunistické EV („měkký cíl"): nad tvrdý target smí auto vzít až
# headroom_wh (do 100 %), oceněno opportunistic_value_czk_kwh (= budoucí
# ušetřené nabíjení, session override → vozidlo, DB) — kupuje jen velmi
# levnou/zápornou energii. Dekompozice Σ(EV energie) == needed unmet + opp
# zároveň stropuje celkovou energii do auta (dřív při buy<0 bez stropu);
# opp vrstva NENÍ vázaná deadline (auto bývá doma dál, odjezd řeší rolling
# replan); bez session je EV == 0 (stop-session). Deadline suma jde po
# slot PŘED deadline (slot začínající v deadline už nepatří „do deadline").
# - min. výkon wallboxu (asset_ev_charger.min_power_w, 6 A ≈ 1380 W):
# binárka ev_on → setpoint ∈ {0} [min_power_w, max]; ev_direct ≤ gi + PV
# (fyzikální split direct/via_bat). Reporting: kWh přes ev_via_bat plní
# battery_arbitrage_czk oportunitní cenou (min sell exportního slotu dne,
# jinak terminal value) — slotový buy pro ně neplatí. U TŘÍFÁZOVÉHO wallboxu
# (asset_ev_charger.phases ≥ 3) je floor zvednut na 6 A × fáze × 230 V (≈ 4140
# W pro 3f) místo 1f ~1380 W → ruší sub-6A 1f trickle drobky (cap = max výkon
# vozidla). Fáze/min jdou z DB přes vehicle kontext (R__039).
# - anti-fragmentace EV (Fix B): per-slot binárka ev_on (vždy při floor NEBO
# start penaltě) + hrana ev_start[t] ≥ ev_on[t] ev_on[t1]; objektiv +=
# Σ ev_start × asset_ev_charger.planner_ev_start_penalty_czk (Kč). Drobná
# penalta (filozofie v2: nejistota/opotřebení = cena, ne tvrdá priorita) →
# souvislá dávka místo rozsekání. Default 0 = no-op (golden-safe).
# - denní SoC rampa: deficit pod slot.safety_soc_target_wh (R__063: reserve →
# reserve+noc, 619 h) platí za slot nájem buy×faktor (DB
# planner_safety_soc_risk_factor) — ráno se nejdřív dotáhne rezerva
# (nenadálý odběr by se kupoval draho), pak se prodává.
#
# Vědomé odchylky od v1 (změří harness):
# - SQL masky allow_charge / allow_discharge_export se IGNORUJÍ (jsou to
# výstupy charge-slot-budget heuristik, ne fyzika)
# - EV náklady jen přes bilanci (v1 je účtuje navíc v objective — dvojí započtení)
# - import breaker je tvrdý strop (v1 měkký s 10 Kč/kWh)
# - nedodaná EV energie má explicitní cenu místo infeasibility
from __future__ import annotations
import logging
import time
from typing import Any, Optional
import pulp
from services.planning.constants import (
EV_MIN_CHARGE_CURRENT_A,
EV_MULTIPHASE_FLOOR_MIN_PHASES,
EV_PHASE_VOLTAGE_V,
INTERVAL_H,
SOLVER_TIME_LIMIT,
)
from services.planning.types import (
DispatchResult,
PlanningSlot,
_prague_calendar_date,
_prague_dow_hour,
)
from services.planning.heuristics import _dispatch_grid_setpoint_w
logger = logging.getLogger(__name__)
V2_BUILD_TAG = "v2-ev-accounting-2026-06-12"
# Cena za vypnutí GEN portu (mikroinvertory pole B): reálné riziko/opotřebení
# cyklování stykače — drobná, ale nenulová, aby cutoff platil jen při sell < 0.
V2_GEN_CUTOFF_CZK_KWH = 2.0
# SELF_SUSTAIN: export je nežádoucí, ale tvrdé ge=0 by s neřiditelným polem B
# a plnou baterií bylo infeasible — vysoká cena funguje jako ventil.
V2_SELF_SUSTAIN_EXPORT_CZK_KWH = 100.0
# Cena nedodané EV energie do deadline (Kč/kWh) — místo tvrdé infeasibility.
V2_EV_UNMET_CZK_KWH = 50.0
# Nepatrný tie-break proti zbytečnému curtailu při cenové indiferenci (Kč/kWh).
V2_CURTAIL_TIEBREAK_CZK_KWH = 0.001
def _terminal_value_czk_per_wh(slots: list[PlanningSlot], battery: Any) -> float:
"""Shadow cena zbytkové energie: průměrný buy prvních 24 h × DB faktor (pravidlo 16)."""
n24 = min(len(slots), int(24 / INTERVAL_H))
avg_buy = sum(float(s.buy_price) for s in slots[:n24]) / max(1, n24)
factor = float(getattr(battery, "planner_terminal_soc_value_factor", 1.0) or 1.0)
return max(0.0, avg_buy) * factor / 1000.0
def _arb_floor_wh(battery: Any) -> float:
"""Podlaha SoC pro export z baterie (pravidlo 19): ekonomická rezerva z DB."""
floor = getattr(battery, "arb_floor_wh", None)
if floor is None:
floor = getattr(battery, "reserve_soc_wh", None)
return max(float(floor or 0.0), float(battery.min_soc_wh))
def solve_dispatch_v2(
slots: list[PlanningSlot],
battery: Any,
heat_pump: Any,
grid: Any,
ev_sessions: list,
vehicles: list,
current_soc_wh: float,
current_tuv_temp_c: float,
*,
tuv_delta_stats: Optional[dict[tuple[int, int], float]] = None,
operating_mode: str = "AUTO",
planner_version: str | None = None,
) -> tuple[list[DispatchResult], int, dict[str, Any]]:
"""Čistý ekonomický MILP; rozhraní kompatibilní se solve_dispatch (v1)."""
if not slots:
raise RuntimeError("solve_dispatch_v2 requires at least one slot")
t0 = time.monotonic()
T = len(slots)
om = (operating_mode or "AUTO").upper()
EV = min(len(vehicles), 2)
max_imp = float(grid.max_import_power_w)
max_exp = float(grid.max_export_power_w)
max_chg = float(battery.max_charge_power_w)
max_dis = float(battery.max_discharge_power_w)
eff_c = float(battery.charge_efficiency)
eff_d = float(battery.discharge_efficiency)
deg = float(battery.degradation_cost_czk_kwh)
soc_min = float(battery.min_soc_wh)
soc_max = float(battery.soc_max_wh)
usable = float(battery.usable_capacity_wh)
arb_floor = _arb_floor_wh(battery)
terminal = _terminal_value_czk_per_wh(slots, battery)
block_neg_sell = bool(getattr(grid, "block_export_on_negative_sell", False))
gen_cutoff_avail = bool(getattr(grid, "deye_gen_microinverter_cutoff_enabled", False))
soc0 = min(max(float(current_soc_wh), soc_min), soc_max)
prob = pulp.LpProblem("dispatch_v2", pulp.LpMinimize)
gi = [pulp.LpVariable(f"gi_{t}", 0, max_imp) for t in range(T)]
ge_pv = [pulp.LpVariable(f"gepv_{t}", 0, max_exp) for t in range(T)]
ge_bat = [pulp.LpVariable(f"gebat_{t}", 0, max_exp) for t in range(T)]
bc_pv = [pulp.LpVariable(f"bcpv_{t}", 0, max_chg) for t in range(T)]
bc_gi = [pulp.LpVariable(f"bcgi_{t}", 0, max_chg) for t in range(T)]
bd = [pulp.LpVariable(f"bd_{t}", 0, max_dis) for t in range(T)]
ca = [pulp.LpVariable(f"ca_{t}", 0, max(0, int(slots[t].pv_a_forecast_w))) for t in range(T)]
soc = [pulp.LpVariable(f"soc_{t}", soc_min, soc_max) for t in range(T)]
hp = [pulp.LpVariable(f"hp_{t}", 0, float(heat_pump.rated_heating_power_w)) for t in range(T)]
y_imp = [pulp.LpVariable(f"yimp_{t}", cat=pulp.LpBinary) for t in range(T)]
z_exp = [pulp.LpVariable(f"zexp_{t}", cat=pulp.LpBinary) for t in range(T)]
z_gen = (
[pulp.LpVariable(f"zgen_{t}", cat=pulp.LpBinary) for t in range(T)]
if gen_cutoff_avail
else None
)
ev_direct = [
[
pulp.LpVariable(f"evd_{e}_{t}", 0, min(float(vehicles[e].max_charge_power_w), max_imp))
for t in range(T)
]
for e in range(EV)
]
ev_via_bat = [
[
pulp.LpVariable(f"evb_{e}_{t}", 0, float(vehicles[e].max_charge_power_w))
for t in range(T)
]
for e in range(EV)
]
ev_unmet: list = [] # slack Wh per session (cena V2_EV_UNMET_CZK_KWH)
ev_opp: list = [] # (var, value_czk_kwh) — energie nad target (měkký cíl)
ev_start_terms: list = [] # (ev_start var, penalta Kč) — anti-fragmentace (Fix B)
def _ev_min_power_w(e: int) -> float:
"""Dolní mez nabíjecí dávky (W): u 3f wallboxu fyzikální 6 A × fáze × napětí
(≈ 4140 W) místo 1f ~1380 W → zruší sub-6A 1f trickle. Stropuje se max
výkonem vozidla (jinak by připojený slot byl infeasible). Bez spolehlivého
počtu fází padá zpět na min_power_w z DB."""
veh = vehicles[e]
base_min = max(0.0, float(getattr(veh, "min_power_w", 0) or 0))
phases = int(getattr(veh, "phases", 0) or 0)
ev_max = float(veh.max_charge_power_w)
if phases >= EV_MULTIPHASE_FLOOR_MIN_PHASES:
floor = EV_MIN_CHARGE_CURRENT_A * phases * EV_PHASE_VOLTAGE_V
base_min = max(base_min, floor)
# strop max výkonem vozidla — floor nesmí překročit, co auto/wallbox umí
if ev_max > 0:
base_min = min(base_min, ev_max)
return base_min
def _ev_start_penalty_czk(e: int) -> float:
return max(0.0, float(getattr(vehicles[e], "planner_ev_start_penalty_czk", 0.0) or 0.0))
ev_min_w = [_ev_min_power_w(e) for e in range(EV)]
ev_start_pen = [_ev_start_penalty_czk(e) for e in range(EV)]
# ev_on[e][t]: zapnutost wallboxu v slotu. Vždy potřeba, pokud platí min-power
# floor (gate) NEBO start penalta (anti-fragmentace). ev_start[e][t]: náběžná
# hrana ev_on (start nové dávky) — jen když je start penalta > 0 (jinak žádný
# extra MILP balast a default 0 = no-op, golden-safe).
ev_needs_on = [(ev_min_w[e] > 0.0) or (ev_start_pen[e] > 0.0) for e in range(EV)]
ev_on = [
[
pulp.LpVariable(f"evon_{e}_{t}", cat=pulp.LpBinary)
for t in range(T)
]
if ev_needs_on[e]
else None
for e in range(EV)
]
ev_start = [
[
pulp.LpVariable(f"evstart_{e}_{t}", 0, 1)
for t in range(T)
]
if ev_start_pen[e] > 0.0
else None
for e in range(EV)
]
nb_buffer_wh = [max(0.0, float(s.night_baseload_buffer_wh or 0.0)) for s in slots]
safety_risk = float(getattr(battery, "planner_safety_soc_risk_factor", 0.0) or 0.0)
safety_tgt_wh = [
min(soc_max, max(0.0, float(s.safety_soc_target_wh or 0.0)))
if safety_risk > 0 else 0.0
for s in slots
]
ds_slack = [
pulp.LpVariable(f"dss_{t}", 0, soc_max) if safety_tgt_wh[t] > 0 else None
for t in range(T)
]
nb_slack = [
pulp.LpVariable(f"nbs_{t}", 0, nb_buffer_wh[t]) if nb_buffer_wh[t] > 0 else None
for t in range(T)
]
def _connected(e: int, t: int) -> bool:
return bool(slots[t].ev1_connected if e == 0 else slots[t].ev2_connected)
for t in range(T):
s = slots[t]
pv_a = max(0.0, float(s.pv_a_forecast_w))
pv_b = max(0.0, float(s.pv_b_forecast_w))
pv_a_net = pv_a - ca[t]
pv_b_eff = pv_b - (pv_b * z_gen[t] if z_gen is not None else 0.0)
ev_total_t = pulp.lpSum(
ev_direct[e][t] + ev_via_bat[e][t] for e in range(EV)
)
load_site = float(s.load_baseline_w) + ev_total_t + hp[t]
# bilance sběrnice (W)
prob += (
pv_a_net + pv_b_eff + gi[t] + bd[t]
== load_site + bc_pv[t] + bc_gi[t] + ge_pv[t] + ge_bat[t]
), f"balance_{t}"
# SoC dynamika (Wh)
prev = soc0 if t == 0 else soc[t - 1]
prob += (
soc[t]
== prev
+ (bc_pv[t] + bc_gi[t]) * eff_c * INTERVAL_H
- bd[t] / eff_d * INTERVAL_H
), f"soc_{t}"
# výkonové stropy
prob += bc_pv[t] + bc_gi[t] <= max_chg, f"chg_cap_{t}"
prob += ge_pv[t] + ge_bat[t] <= max_exp, f"exp_cap_{t}"
# PV cesty omezené dostupnou výrobou (load-first vynucuje HW; bilance účtuje energii)
prob += bc_pv[t] + ge_pv[t] <= pv_a_net + pv_b_eff, f"pv_src_{t}"
# bc_gi jen ze sítě:
prob += bc_gi[t] <= gi[t], f"bcgi_src_{t}"
# vybíjení kryje dům + EV-via-bat + export z baterie
prob += ge_bat[t] + pulp.lpSum(ev_via_bat[e][t] for e in range(EV)) <= bd[t], f"bd_split_{t}"
# ev_direct fyzicky jen ze sítě + PV (ne z baterie) — split direct/via_bat
# není arbitrární, ekonomiku nemění (bilance platí stejně)
prob += (
pulp.lpSum(ev_direct[e][t] for e in range(EV)) <= gi[t] + pv_a_net + pv_b_eff
), f"evd_src_{t}"
# zákaz současného importu a exportu
prob += gi[t] <= max_imp * y_imp[t], f"imp_excl_{t}"
prob += ge_pv[t] + ge_bat[t] <= max_exp * (1 - y_imp[t]), f"exp_excl_{t}"
# pravidlo 19: export z baterie ⇒ SoC ≥ arb floor
prob += ge_bat[t] <= max_exp * z_exp[t], f"zexp_link_{t}"
prob += soc[t] >= arb_floor - (soc_max - soc_min) * (1 - z_exp[t]), f"zexp_floor_{t}"
# noční SoC polštář (viz hlavička): soft floor nad min_soc
if nb_slack[t] is not None:
prob += soc[t] >= soc_min + nb_buffer_wh[t] - nb_slack[t], f"night_buf_{t}"
# denní SoC rampa (viz hlavička): soft floor k safety targetu
if ds_slack[t] is not None:
prob += soc[t] >= safety_tgt_wh[t] - ds_slack[t], f"day_safety_{t}"
# tvrdá cenová pravidla
if float(s.buy_price) < 0.0:
prob += ge_pv[t] + ge_bat[t] == 0, f"neg_buy_noexp_{t}"
if float(s.sell_price) < 0.0 and block_neg_sell:
prob += ge_pv[t] + ge_bat[t] == 0, f"neg_sell_block_{t}"
# EV dostupnost + min. výkon wallboxu (binárka ev_on) + start hrana.
# ev_on existuje, když platí min-power floor NEBO start penalta.
for e in range(EV):
on_t = ev_on[e][t] if ev_on[e] is not None else None
if not _connected(e, t):
prob += ev_direct[e][t] == 0
prob += ev_via_bat[e][t] == 0
if on_t is not None:
prob += on_t == 0, f"ev_off_{e}_{t}"
else:
ev_max_w = float(vehicles[e].max_charge_power_w)
ev_total = ev_direct[e][t] + ev_via_bat[e][t]
if on_t is not None and ev_max_w > 0:
# on=1 nutné kdykoli ev_total > 0 (start penalta i floor to potřebují)
prob += ev_total <= ev_max_w * on_t, f"ev_max_{e}_{t}"
if 0 < ev_min_w[e] <= ev_max_w:
prob += ev_total >= ev_min_w[e] * on_t, f"ev_min_{e}_{t}"
else:
prob += ev_total <= ev_max_w
# start = náběžná hrana ev_on (≥ on[t] on[t1]); slot 0 startuje vždy,
# když je on (žádný předchozí stav v horizontu).
if ev_start[e] is not None and on_t is not None:
prev_on = ev_on[e][t - 1] if t > 0 else 0
prob += ev_start[e][t] >= on_t - prev_on, f"ev_start_{e}_{t}"
# provozní režimy (tvrdé constraints dle operating-modes.md)
if om == "SELF_SUSTAIN":
prob += gi[t] <= float(s.load_baseline_w), f"ss_gi_{t}"
elif om == "PRESERVE":
prob += bc_pv[t] == 0
prob += bc_gi[t] == 0
prob += bd[t] == 0
elif om == "CHARGE_CHEAP":
prob += ge_pv[t] + ge_bat[t] == 0
prob += bd[t] == 0
# EV deadline (s placeným slackem místo infeasibility) + měkký cíl.
# Bez session není mandát nabíjet: připojené auto bez session (stop-session,
# golden fixtures s vynulovanými sessions) nesmí při buy<0 „pumpovat" energii.
for e in range(EV):
sess = ev_sessions[e] if e < len(ev_sessions) else None
if sess is None:
for t in range(T):
if _connected(e, t):
prob += ev_direct[e][t] == 0, f"ev_nosess_d_{e}_{t}"
prob += ev_via_bat[e][t] == 0, f"ev_nosess_b_{e}_{t}"
continue
needed = max(0.0, float(getattr(sess, "energy_needed_wh", 0.0) or 0.0))
unmet = pulp.LpVariable(f"ev_unmet_{e}", 0, needed)
ev_unmet.append(unmet)
if needed > 0:
# první slot s interval_start >= deadline už do deadline NEPATŘÍ
# (slot [deadline, deadline+15min) dodává energii až po odjezdu)
t_dl = next(
(t for t in range(T) if slots[t].interval_start >= sess.target_deadline),
T,
)
prob += (
pulp.lpSum(
(ev_direct[e][t] + ev_via_bat[e][t]) * INTERVAL_H
for t in range(t_dl)
if _connected(e, t)
)
+ unmet
>= needed
), f"ev_deadline_{e}"
# měkký cíl: dekompozice celkové energie == needed unmet + opp.
# Oportunistická vrstva NENÍ omezená deadline — auto bývá doma dál,
# odjezd řeší rolling replan (rozhodnutí 2026-06-12).
headroom = max(0.0, float(getattr(sess, "headroom_wh", 0.0) or 0.0))
opp_val = float(getattr(sess, "opportunistic_value_czk_kwh", 0.0) or 0.0)
opp = pulp.LpVariable(f"ev_opp_{e}", 0, headroom if opp_val > 0 else 0.0)
ev_opp.append((opp, opp_val))
prob += (
pulp.lpSum(
(ev_direct[e][t] + ev_via_bat[e][t]) * INTERVAL_H
for t in range(T)
if _connected(e, t)
)
== needed - unmet + opp
), f"ev_total_{e}"
# TUV look-ahead (převzato z v1 — komfortní constraint, ne heuristika)
rated_hp = float(heat_pump.rated_heating_power_w)
if tuv_delta_stats and rated_hp > 0 and getattr(heat_pump, "tuv_min_temp_c", None):
tuv_pred = float(current_tuv_temp_c)
tgt = float(getattr(heat_pump, "tuv_target_temp_c", 55.0) or 55.0)
thr = float(heat_pump.tuv_min_temp_c) + 5.0
for t in range(T):
dow, hour = _prague_dow_hour(slots[t].interval_start)
delta = tuv_delta_stats.get((dow, hour), -0.1)
tuv_pred += float(delta) * INTERVAL_H
if tuv_pred < thr:
prob += (
pulp.lpSum(hp[s_] for s_ in range(max(0, t - 8), t + 1))
>= rated_hp * 0.5
), f"tuv_heat_{t}"
tuv_pred = tgt
if float(current_tuv_temp_c) < float(heat_pump.tuv_min_temp_c):
prob += hp[0] >= rated_hp * 0.8, "tuv_emergency"
# ---------------- objective: jen reálné peníze ----------------
wh = INTERVAL_H / 1000.0 # W → kWh za slot
cash = pulp.lpSum(
gi[t] * float(slots[t].buy_price) * wh
- (ge_pv[t] + ge_bat[t]) * float(slots[t].sell_price) * wh
for t in range(T)
)
degradation = pulp.lpSum(
0.5 * (bc_pv[t] + bc_gi[t] + bd[t]) * deg * wh for t in range(T)
)
extras = pulp.lpSum(ca[t] * V2_CURTAIL_TIEBREAK_CZK_KWH * wh for t in range(T))
if z_gen is not None:
extras += pulp.lpSum(
max(0.0, float(slots[t].pv_b_forecast_w)) * z_gen[t] * V2_GEN_CUTOFF_CZK_KWH * wh
for t in range(T)
)
if om == "SELF_SUSTAIN":
extras += pulp.lpSum(
(ge_pv[t] + ge_bat[t]) * V2_SELF_SUSTAIN_EXPORT_CZK_KWH * wh for t in range(T)
)
if ev_unmet:
extras += pulp.lpSum(u * V2_EV_UNMET_CZK_KWH / 1000.0 for u in ev_unmet)
if ev_opp:
extras -= pulp.lpSum(o / 1000.0 * val for o, val in ev_opp if val > 0)
# anti-fragmentace EV (Fix B): Σ ev_start × start_penalta (Kč). Default 0 → no-op.
ev_start_terms = [
ev_start[e][t] * ev_start_pen[e]
for e in range(EV)
if ev_start[e] is not None and ev_start_pen[e] > 0.0
for t in range(T)
]
if ev_start_terms:
extras += pulp.lpSum(ev_start_terms)
nb_terms = [
nb_slack[t] / 1000.0 * max(0.0, float(slots[t].buy_price))
for t in range(T)
if nb_slack[t] is not None
]
if nb_terms:
extras += pulp.lpSum(nb_terms)
ds_terms = [
ds_slack[t] / 1000.0 * max(0.0, float(slots[t].buy_price)) * safety_risk
for t in range(T)
if ds_slack[t] is not None
]
if ds_terms:
extras += pulp.lpSum(ds_terms)
frontload = float(getattr(battery, "planner_pv_risk_frontload_czk_kwh", 0.0) or 0.0)
neg_idx = [t for t in range(T) if float(slots[t].sell_price) < 0.0]
if frontload > 0 and neg_idx:
# odměna za soc[t] v neg slotech = dřívější nabití vyhrává při indiferenci
extras -= pulp.lpSum(soc[t] / 1000.0 * frontload for t in neg_idx)
prob += cash + degradation + extras - terminal * soc[T - 1]
solver = (
pulp.HiGHS_CMD(msg=False, timeLimit=SOLVER_TIME_LIMIT)
if pulp.HiGHS_CMD().available()
else pulp.PULP_CBC_CMD(msg=False, timeLimit=SOLVER_TIME_LIMIT)
)
status = prob.solve(solver)
duration_ms = int((time.monotonic() - t0) * 1000)
status_str = pulp.LpStatus[status]
if status_str != "Optimal":
# v2 nemá relax řetězec — model je navržen tak, aby byl feasible
# (placené slacky místo tvrdých kotev). Ne-Optimal je skutečná chyba.
raise RuntimeError(f"solver_v2: {status_str}")
# ---------------- DispatchResult assembly (parita s v1) ----------------
def _val(var) -> float:
v = pulp.value(var)
return float(v) if v is not None else 0.0
# Reporting EV-via-bat: kWh do auta z baterie neplatí slotový buy (jdou
# z baterie), ale ušlou příležitost. Aproximace oportunitní ceny: nejnižší
# sell slotu, kde plán exportuje, v témže pražském dni; bez exportu ten den
# terminal value (Kč/kWh). Plní battery_arbitrage_czk (dřív konstantní 0).
day_min_export_sell: dict[Any, float] = {}
for t in range(T):
if _val(ge_pv[t]) + _val(ge_bat[t]) >= 1.0:
d_key = _prague_calendar_date(slots[t])
sp = float(slots[t].sell_price)
if d_key not in day_min_export_sell or sp < day_min_export_sell[d_key]:
day_min_export_sell[d_key] = sp
results: list[DispatchResult] = []
for t in range(T):
s = slots[t]
via1_w = _val(ev_via_bat[0][t]) if EV > 0 else 0.0
via2_w = _val(ev_via_bat[1][t]) if EV > 1 else 0.0
via_kwh = (via1_w + via2_w) * wh
if via_kwh > 1e-9:
opp_price = max(
0.0,
day_min_export_sell.get(_prague_calendar_date(s), terminal * 1000.0),
)
arb_czk = via_kwh * opp_price
else:
arb_czk = 0.0
bc_tot = _val(bc_pv[t]) + _val(bc_gi[t])
bd_v = _val(bd[t])
batt_w = round(bc_tot - bd_v)
ge_pv_w = round(_val(ge_pv[t]))
ge_bat_w = round(_val(ge_bat[t]))
gi_w = _val(gi[t])
ge_w = float(ge_pv_w + ge_bat_w)
grid_w, export_mode = _dispatch_grid_setpoint_w(
gi_w=gi_w,
ge_w=ge_w,
ge_bat_w=float(ge_bat_w),
ge_pv_w=float(ge_pv_w),
max_export_power_w=int(max_exp),
)
if batt_w < 0 and grid_w < 0:
deye_mode = "SELL"
elif batt_w > 0 and grid_w > 0:
deye_mode = "CHARGE"
else:
deye_mode = "PASSIVE"
gen_cut = bool(round(_val(z_gen[t]))) if z_gen is not None else None
hp_v = _val(hp[t])
hp_on = hp_v > rated_hp * 0.5 if rated_hp > 0 else False
cash_t = gi_w * float(s.buy_price) * wh - ge_w * float(s.sell_price) * wh
pen_t = 0.0
if gen_cut:
pen_t += max(0.0, float(s.pv_b_forecast_w)) * V2_GEN_CUTOFF_CZK_KWH * wh
results.append(
DispatchResult(
interval_start=s.interval_start,
battery_setpoint_w=batt_w,
battery_soc_target=round(_val(soc[t]) / usable * 100.0, 2),
grid_setpoint_w=grid_w,
export_limit_w=int(max_exp) if grid_w < 0 else 0,
export_mode=export_mode,
deye_physical_mode=deye_mode,
deye_gen_cutoff_enabled=gen_cut,
ev1_setpoint_w=(
round(_val(ev_direct[0][t]) + _val(ev_via_bat[0][t]))
if EV > 0 and s.ev1_connected
else None
),
ev2_setpoint_w=(
round(_val(ev_direct[1][t]) + _val(ev_via_bat[1][t]))
if EV > 1 and s.ev2_connected
else None
),
ev1_via_bat_w=round(via1_w),
ev2_via_bat_w=round(via2_w),
heat_pump_enabled=hp_on,
heat_pump_setpoint_w=int(rated_hp) if hp_on else 0,
pv_a_curtailed_w=round(_val(ca[t])),
expected_cost_czk=round(cash_t, 4),
effective_buy_price=float(s.buy_price),
effective_sell_price=float(s.sell_price),
is_predicted_price=bool(s.is_predicted_price),
cashflow_czk=round(cash_t, 4),
battery_arbitrage_czk=round(arb_czk, 4),
penalty_czk=round(pen_t, 4),
green_bonus_czk=float(getattr(s, "green_bonus_czk_per_slot", 0.0) or 0.0),
)
)
snapshot: dict[str, Any] = {
"version": planner_version or "v2-clean",
"planner_build_tag": V2_BUILD_TAG,
"inputs": {
"operating_mode": om,
"current_soc_wh": soc0,
"terminal_czk_per_wh": round(terminal, 8),
"arb_floor_wh": arb_floor,
"block_export_on_negative_sell": block_neg_sell,
"gen_cutoff_available": gen_cutoff_avail,
"slot_count": T,
"ev_sessions": sum(1 for x in ev_sessions if x is not None),
"ev_min_power_w": ev_min_w,
"ev_phases": [int(getattr(vehicles[e], "phases", 0) or 0) for e in range(EV)],
"ev_start_penalty_czk": ev_start_pen,
"masks_ignored": True,
"night_buffer_slots": sum(1 for b in nb_buffer_wh if b > 0),
"pv_risk_frontload_czk_kwh": frontload if neg_idx else 0.0,
"safety_soc_risk_factor": safety_risk,
"safety_soc_slots": sum(1 for x in safety_tgt_wh if x > 0),
"night_buffer_max_wh": round(max(nb_buffer_wh), 1) if nb_buffer_wh else 0,
},
"objective_terms": {
"cash_czk": round(float(pulp.value(cash)), 3),
"degradation_czk": round(float(pulp.value(degradation)), 3),
"extras_czk": round(float(pulp.value(extras)), 3) if not isinstance(extras, float) else 0.0,
"terminal_value_czk": round(terminal * _val(soc[T - 1]), 3),
"ev_unmet_wh": [round(_val(u), 1) for u in ev_unmet],
"ev_opp_wh": [round(_val(o), 1) for o, _v in ev_opp],
"ev_starts": [
int(round(sum(_val(ev_start[e][t]) for t in range(T))))
if ev_start[e] is not None
else 0
for e in range(EV)
],
},
"solver_duration_ms": duration_ms,
"solver_status": status_str,
}
return results, duration_ms, snapshot