# backend/services/planning/solver_v2.py # # EMS plánovač v2 — ČISTÉ ekonomické jádro (Fáze 3). # # Filozofie: objective = reálné peníze (nákup − prodej + degradace − terminal # hodnota energie). Žádné heuristické penalty z constants.py, žádné pre-solver # fáze/okna/kotvy. Chování (neg-sell příprava, evening export, arbitráž) má # VYPLYNOUT z cen a fyziky, ne z ručně laděných vah. # # Co zůstává (tvrdá pravidla — fyzika, HW, CLAUDE.md): # - bilance sběrnice, SoC dynamika s účinnostmi, výkonové stropy # - curtailment jen pole A (pravidlo 5); GEN cutoff binárka pole B (pravidlo 6) # - block_export_on_negative_sell → ge == 0 při sell < 0 (pravidlo 6, KV1) # - buy < 0 → ge == 0 (žádná pumpa import−export přes jeden elektroměr; import # je omezen breakerem — pravidlo 7) # - export z BATERIE ⇒ koncové SoC ≥ arb floor (pravidlo 19; PV export floor nevynucuje) # - zákaz současného importu a exportu (binárka) # - load-first Deye: bc_pv + ge_pv jen z PV přebytku nad zátěží # - EV deadline, TUV look-ahead, provozní režimy (legitimní constraints) # - noční SoC polštář: plán nesmí kalkulovat s vybitím až na min_soc — chyba # predikce noční spotřeby by znamenala neplánovaný noční nákup. Velikost # z DB (planner_night_baseload_buffer_percent → slot.night_baseload_buffer_wh, # klesá k 0 do rána); porušení je PLACENÉ cenou buy daného slotu (riziko # zpětného nákupu), takže extrémní sell špička ho smí racionálně prodat. # - PV-risk front-load: v okně sell<0 je nabíjení z PV zdarma kdykoliv → # indiference v čase; odložení ale spoléhá na predikci (večerní mrak). # Malá prémie za držení energie dřív (DB planner_pv_risk_frontload_czk_kwh) # vede k "nabít plným výkonem hned, pak řezat A" — emergentně, bez rampy. # - oportunistické EV („měkký cíl"): nad tvrdý target smí auto vzít až # headroom_wh (do 100 %), oceněno opportunistic_value_czk_kwh (= budoucí # ušetřené nabíjení, session override → vozidlo, DB) — kupuje jen velmi # levnou/zápornou energii. Dekompozice Σ(EV energie) == needed − unmet + opp # zároveň stropuje celkovou energii do auta (dřív při buy<0 bez stropu); # opp vrstva NENÍ vázaná deadline (auto bývá doma dál, odjezd řeší rolling # replan); bez session je EV == 0 (stop-session). Deadline suma jde po # slot PŘED deadline (slot začínající v deadline už nepatří „do deadline"). # - min. výkon wallboxu (asset_ev_charger.min_power_w, 6 A ≈ 1380 W): # binárka ev_on → setpoint ∈ {0} ∪ [min_power_w, max]; ev_direct ≤ gi + PV # (fyzikální split direct/via_bat). Reporting: kWh přes ev_via_bat plní # battery_arbitrage_czk oportunitní cenou (min sell exportního slotu dne, # jinak terminal value) — slotový buy pro ně neplatí. U TŘÍFÁZOVÉHO wallboxu # (asset_ev_charger.phases ≥ 3) je floor zvednut na 6 A × fáze × 230 V (≈ 4140 # W pro 3f) místo 1f ~1380 W → ruší sub-6A 1f trickle drobky (cap = max výkon # vozidla). Fáze/min jdou z DB přes vehicle kontext (R__039). # - anti-fragmentace EV (Fix B): per-slot binárka ev_on (vždy při floor NEBO # start penaltě) + hrana ev_start[t] ≥ ev_on[t] − ev_on[t−1]; objektiv += # Σ ev_start × asset_ev_charger.planner_ev_start_penalty_czk (Kč). Drobná # penalta (filozofie v2: nejistota/opotřebení = cena, ne tvrdá priorita) → # souvislá dávka místo rozsekání. Default 0 = no-op (golden-safe). # - denní SoC rampa: deficit pod slot.safety_soc_target_wh (R__063: reserve → # reserve+noc, 6–19 h) platí za slot nájem buy×faktor (DB # planner_safety_soc_risk_factor) — ráno se nejdřív dotáhne rezerva # (nenadálý odběr by se kupoval draho), pak se prodává. # # Vědomé odchylky od v1 (změří harness): # - SQL masky allow_charge / allow_discharge_export se IGNORUJÍ (jsou to # výstupy charge-slot-budget heuristik, ne fyzika) # - EV náklady jen přes bilanci (v1 je účtuje navíc v objective — dvojí započtení) # - import breaker je tvrdý strop (v1 měkký s 10 Kč/kWh) # - nedodaná EV energie má explicitní cenu místo infeasibility from __future__ import annotations import logging import time from typing import Any, Optional import pulp from services.planning.constants import ( EV_MIN_CHARGE_CURRENT_A, EV_MULTIPHASE_FLOOR_MIN_PHASES, EV_PHASE_VOLTAGE_V, INTERVAL_H, SOLVER_TIME_LIMIT, ) from services.planning.types import ( DispatchResult, PlanningSlot, _prague_calendar_date, _prague_dow_hour, ) from services.planning.heuristics import _dispatch_grid_setpoint_w logger = logging.getLogger(__name__) V2_BUILD_TAG = "v2-ev-accounting-2026-06-12" # Cena za vypnutí GEN portu (mikroinvertory pole B): reálné riziko/opotřebení # cyklování stykače — drobná, ale nenulová, aby cutoff platil jen při sell < 0. V2_GEN_CUTOFF_CZK_KWH = 2.0 # SELF_SUSTAIN: export je nežádoucí, ale tvrdé ge=0 by s neřiditelným polem B # a plnou baterií bylo infeasible — vysoká cena funguje jako ventil. V2_SELF_SUSTAIN_EXPORT_CZK_KWH = 100.0 # Cena nedodané EV energie do deadline (Kč/kWh) — místo tvrdé infeasibility. V2_EV_UNMET_CZK_KWH = 50.0 # Nepatrný tie-break proti zbytečnému curtailu při cenové indiferenci (Kč/kWh). V2_CURTAIL_TIEBREAK_CZK_KWH = 0.001 def _terminal_value_czk_per_wh(slots: list[PlanningSlot], battery: Any) -> float: """Shadow cena zbytkové energie: průměrný buy prvních 24 h × DB faktor (pravidlo 16).""" n24 = min(len(slots), int(24 / INTERVAL_H)) avg_buy = sum(float(s.buy_price) for s in slots[:n24]) / max(1, n24) factor = float(getattr(battery, "planner_terminal_soc_value_factor", 1.0) or 1.0) return max(0.0, avg_buy) * factor / 1000.0 def _arb_floor_wh(battery: Any) -> float: """Podlaha SoC pro export z baterie (pravidlo 19): ekonomická rezerva z DB.""" floor = getattr(battery, "arb_floor_wh", None) if floor is None: floor = getattr(battery, "reserve_soc_wh", None) return max(float(floor or 0.0), float(battery.min_soc_wh)) def solve_dispatch_v2( slots: list[PlanningSlot], battery: Any, heat_pump: Any, grid: Any, ev_sessions: list, vehicles: list, current_soc_wh: float, current_tuv_temp_c: float, *, tuv_delta_stats: Optional[dict[tuple[int, int], float]] = None, operating_mode: str = "AUTO", planner_version: str | None = None, ) -> tuple[list[DispatchResult], int, dict[str, Any]]: """Čistý ekonomický MILP; rozhraní kompatibilní se solve_dispatch (v1).""" if not slots: raise RuntimeError("solve_dispatch_v2 requires at least one slot") t0 = time.monotonic() T = len(slots) om = (operating_mode or "AUTO").upper() EV = min(len(vehicles), 2) max_imp = float(grid.max_import_power_w) max_exp = float(grid.max_export_power_w) max_chg = float(battery.max_charge_power_w) max_dis = float(battery.max_discharge_power_w) eff_c = float(battery.charge_efficiency) eff_d = float(battery.discharge_efficiency) deg = float(battery.degradation_cost_czk_kwh) soc_min = float(battery.min_soc_wh) soc_max = float(battery.soc_max_wh) usable = float(battery.usable_capacity_wh) arb_floor = _arb_floor_wh(battery) terminal = _terminal_value_czk_per_wh(slots, battery) block_neg_sell = bool(getattr(grid, "block_export_on_negative_sell", False)) gen_cutoff_avail = bool(getattr(grid, "deye_gen_microinverter_cutoff_enabled", False)) soc0 = min(max(float(current_soc_wh), soc_min), soc_max) prob = pulp.LpProblem("dispatch_v2", pulp.LpMinimize) gi = [pulp.LpVariable(f"gi_{t}", 0, max_imp) for t in range(T)] ge_pv = [pulp.LpVariable(f"gepv_{t}", 0, max_exp) for t in range(T)] ge_bat = [pulp.LpVariable(f"gebat_{t}", 0, max_exp) for t in range(T)] bc_pv = [pulp.LpVariable(f"bcpv_{t}", 0, max_chg) for t in range(T)] bc_gi = [pulp.LpVariable(f"bcgi_{t}", 0, max_chg) for t in range(T)] bd = [pulp.LpVariable(f"bd_{t}", 0, max_dis) for t in range(T)] ca = [pulp.LpVariable(f"ca_{t}", 0, max(0, int(slots[t].pv_a_forecast_w))) for t in range(T)] soc = [pulp.LpVariable(f"soc_{t}", soc_min, soc_max) for t in range(T)] hp = [pulp.LpVariable(f"hp_{t}", 0, float(heat_pump.rated_heating_power_w)) for t in range(T)] y_imp = [pulp.LpVariable(f"yimp_{t}", cat=pulp.LpBinary) for t in range(T)] z_exp = [pulp.LpVariable(f"zexp_{t}", cat=pulp.LpBinary) for t in range(T)] z_gen = ( [pulp.LpVariable(f"zgen_{t}", cat=pulp.LpBinary) for t in range(T)] if gen_cutoff_avail else None ) ev_direct = [ [ pulp.LpVariable(f"evd_{e}_{t}", 0, min(float(vehicles[e].max_charge_power_w), max_imp)) for t in range(T) ] for e in range(EV) ] ev_via_bat = [ [ pulp.LpVariable(f"evb_{e}_{t}", 0, float(vehicles[e].max_charge_power_w)) for t in range(T) ] for e in range(EV) ] ev_unmet: list = [] # slack Wh per session (cena V2_EV_UNMET_CZK_KWH) ev_opp: list = [] # (var, value_czk_kwh) — energie nad target (měkký cíl) ev_start_terms: list = [] # (ev_start var, penalta Kč) — anti-fragmentace (Fix B) def _ev_min_power_w(e: int) -> float: """Dolní mez nabíjecí dávky (W): u 3f wallboxu fyzikální 6 A × fáze × napětí (≈ 4140 W) místo 1f ~1380 W → zruší sub-6A 1f trickle. Stropuje se max výkonem vozidla (jinak by připojený slot byl infeasible). Bez spolehlivého počtu fází padá zpět na min_power_w z DB.""" veh = vehicles[e] base_min = max(0.0, float(getattr(veh, "min_power_w", 0) or 0)) phases = int(getattr(veh, "phases", 0) or 0) ev_max = float(veh.max_charge_power_w) if phases >= EV_MULTIPHASE_FLOOR_MIN_PHASES: floor = EV_MIN_CHARGE_CURRENT_A * phases * EV_PHASE_VOLTAGE_V base_min = max(base_min, floor) # strop max výkonem vozidla — floor nesmí překročit, co auto/wallbox umí if ev_max > 0: base_min = min(base_min, ev_max) return base_min def _ev_start_penalty_czk(e: int) -> float: return max(0.0, float(getattr(vehicles[e], "planner_ev_start_penalty_czk", 0.0) or 0.0)) ev_min_w = [_ev_min_power_w(e) for e in range(EV)] ev_start_pen = [_ev_start_penalty_czk(e) for e in range(EV)] # ev_on[e][t]: zapnutost wallboxu v slotu. Vždy potřeba, pokud platí min-power # floor (gate) NEBO start penalta (anti-fragmentace). ev_start[e][t]: náběžná # hrana ev_on (start nové dávky) — jen když je start penalta > 0 (jinak žádný # extra MILP balast a default 0 = no-op, golden-safe). ev_needs_on = [(ev_min_w[e] > 0.0) or (ev_start_pen[e] > 0.0) for e in range(EV)] ev_on = [ [ pulp.LpVariable(f"evon_{e}_{t}", cat=pulp.LpBinary) for t in range(T) ] if ev_needs_on[e] else None for e in range(EV) ] ev_start = [ [ pulp.LpVariable(f"evstart_{e}_{t}", 0, 1) for t in range(T) ] if ev_start_pen[e] > 0.0 else None for e in range(EV) ] nb_buffer_wh = [max(0.0, float(s.night_baseload_buffer_wh or 0.0)) for s in slots] safety_risk = float(getattr(battery, "planner_safety_soc_risk_factor", 0.0) or 0.0) safety_tgt_wh = [ min(soc_max, max(0.0, float(s.safety_soc_target_wh or 0.0))) if safety_risk > 0 else 0.0 for s in slots ] ds_slack = [ pulp.LpVariable(f"dss_{t}", 0, soc_max) if safety_tgt_wh[t] > 0 else None for t in range(T) ] nb_slack = [ pulp.LpVariable(f"nbs_{t}", 0, nb_buffer_wh[t]) if nb_buffer_wh[t] > 0 else None for t in range(T) ] def _connected(e: int, t: int) -> bool: return bool(slots[t].ev1_connected if e == 0 else slots[t].ev2_connected) for t in range(T): s = slots[t] pv_a = max(0.0, float(s.pv_a_forecast_w)) pv_b = max(0.0, float(s.pv_b_forecast_w)) pv_a_net = pv_a - ca[t] pv_b_eff = pv_b - (pv_b * z_gen[t] if z_gen is not None else 0.0) ev_total_t = pulp.lpSum( ev_direct[e][t] + ev_via_bat[e][t] for e in range(EV) ) load_site = float(s.load_baseline_w) + ev_total_t + hp[t] # bilance sběrnice (W) prob += ( pv_a_net + pv_b_eff + gi[t] + bd[t] == load_site + bc_pv[t] + bc_gi[t] + ge_pv[t] + ge_bat[t] ), f"balance_{t}" # SoC dynamika (Wh) prev = soc0 if t == 0 else soc[t - 1] prob += ( soc[t] == prev + (bc_pv[t] + bc_gi[t]) * eff_c * INTERVAL_H - bd[t] / eff_d * INTERVAL_H ), f"soc_{t}" # výkonové stropy prob += bc_pv[t] + bc_gi[t] <= max_chg, f"chg_cap_{t}" prob += ge_pv[t] + ge_bat[t] <= max_exp, f"exp_cap_{t}" # PV cesty omezené dostupnou výrobou (load-first vynucuje HW; bilance účtuje energii) prob += bc_pv[t] + ge_pv[t] <= pv_a_net + pv_b_eff, f"pv_src_{t}" # bc_gi jen ze sítě: prob += bc_gi[t] <= gi[t], f"bcgi_src_{t}" # vybíjení kryje dům + EV-via-bat + export z baterie prob += ge_bat[t] + pulp.lpSum(ev_via_bat[e][t] for e in range(EV)) <= bd[t], f"bd_split_{t}" # ev_direct fyzicky jen ze sítě + PV (ne z baterie) — split direct/via_bat # není arbitrární, ekonomiku nemění (bilance platí stejně) prob += ( pulp.lpSum(ev_direct[e][t] for e in range(EV)) <= gi[t] + pv_a_net + pv_b_eff ), f"evd_src_{t}" # zákaz současného importu a exportu prob += gi[t] <= max_imp * y_imp[t], f"imp_excl_{t}" prob += ge_pv[t] + ge_bat[t] <= max_exp * (1 - y_imp[t]), f"exp_excl_{t}" # pravidlo 19: export z baterie ⇒ SoC ≥ arb floor prob += ge_bat[t] <= max_exp * z_exp[t], f"zexp_link_{t}" prob += soc[t] >= arb_floor - (soc_max - soc_min) * (1 - z_exp[t]), f"zexp_floor_{t}" # noční SoC polštář (viz hlavička): soft floor nad min_soc if nb_slack[t] is not None: prob += soc[t] >= soc_min + nb_buffer_wh[t] - nb_slack[t], f"night_buf_{t}" # denní SoC rampa (viz hlavička): soft floor k safety targetu if ds_slack[t] is not None: prob += soc[t] >= safety_tgt_wh[t] - ds_slack[t], f"day_safety_{t}" # tvrdá cenová pravidla if float(s.buy_price) < 0.0: prob += ge_pv[t] + ge_bat[t] == 0, f"neg_buy_noexp_{t}" if float(s.sell_price) < 0.0 and block_neg_sell: prob += ge_pv[t] + ge_bat[t] == 0, f"neg_sell_block_{t}" # EV dostupnost + min. výkon wallboxu (binárka ev_on) + start hrana. # ev_on existuje, když platí min-power floor NEBO start penalta. for e in range(EV): on_t = ev_on[e][t] if ev_on[e] is not None else None if not _connected(e, t): prob += ev_direct[e][t] == 0 prob += ev_via_bat[e][t] == 0 if on_t is not None: prob += on_t == 0, f"ev_off_{e}_{t}" else: ev_max_w = float(vehicles[e].max_charge_power_w) ev_total = ev_direct[e][t] + ev_via_bat[e][t] if on_t is not None and ev_max_w > 0: # on=1 nutné kdykoli ev_total > 0 (start penalta i floor to potřebují) prob += ev_total <= ev_max_w * on_t, f"ev_max_{e}_{t}" if 0 < ev_min_w[e] <= ev_max_w: prob += ev_total >= ev_min_w[e] * on_t, f"ev_min_{e}_{t}" else: prob += ev_total <= ev_max_w # start = náběžná hrana ev_on (≥ on[t] − on[t−1]); slot 0 startuje vždy, # když je on (žádný předchozí stav v horizontu). if ev_start[e] is not None and on_t is not None: prev_on = ev_on[e][t - 1] if t > 0 else 0 prob += ev_start[e][t] >= on_t - prev_on, f"ev_start_{e}_{t}" # provozní režimy (tvrdé constraints dle operating-modes.md) if om == "SELF_SUSTAIN": prob += gi[t] <= float(s.load_baseline_w), f"ss_gi_{t}" elif om == "PRESERVE": prob += bc_pv[t] == 0 prob += bc_gi[t] == 0 prob += bd[t] == 0 elif om == "CHARGE_CHEAP": prob += ge_pv[t] + ge_bat[t] == 0 prob += bd[t] == 0 # EV deadline (s placeným slackem místo infeasibility) + měkký cíl. # Bez session není mandát nabíjet: připojené auto bez session (stop-session, # golden fixtures s vynulovanými sessions) nesmí při buy<0 „pumpovat" energii. for e in range(EV): sess = ev_sessions[e] if e < len(ev_sessions) else None if sess is None: for t in range(T): if _connected(e, t): prob += ev_direct[e][t] == 0, f"ev_nosess_d_{e}_{t}" prob += ev_via_bat[e][t] == 0, f"ev_nosess_b_{e}_{t}" continue needed = max(0.0, float(getattr(sess, "energy_needed_wh", 0.0) or 0.0)) unmet = pulp.LpVariable(f"ev_unmet_{e}", 0, needed) ev_unmet.append(unmet) if needed > 0: # první slot s interval_start >= deadline už do deadline NEPATŘÍ # (slot [deadline, deadline+15min) dodává energii až po odjezdu) t_dl = next( (t for t in range(T) if slots[t].interval_start >= sess.target_deadline), T, ) prob += ( pulp.lpSum( (ev_direct[e][t] + ev_via_bat[e][t]) * INTERVAL_H for t in range(t_dl) if _connected(e, t) ) + unmet >= needed ), f"ev_deadline_{e}" # měkký cíl: dekompozice celkové energie == needed − unmet + opp. # Oportunistická vrstva NENÍ omezená deadline — auto bývá doma dál, # odjezd řeší rolling replan (rozhodnutí 2026-06-12). headroom = max(0.0, float(getattr(sess, "headroom_wh", 0.0) or 0.0)) opp_val = float(getattr(sess, "opportunistic_value_czk_kwh", 0.0) or 0.0) opp = pulp.LpVariable(f"ev_opp_{e}", 0, headroom if opp_val > 0 else 0.0) ev_opp.append((opp, opp_val)) prob += ( pulp.lpSum( (ev_direct[e][t] + ev_via_bat[e][t]) * INTERVAL_H for t in range(T) if _connected(e, t) ) == needed - unmet + opp ), f"ev_total_{e}" # TUV look-ahead (převzato z v1 — komfortní constraint, ne heuristika) rated_hp = float(heat_pump.rated_heating_power_w) if tuv_delta_stats and rated_hp > 0 and getattr(heat_pump, "tuv_min_temp_c", None): tuv_pred = float(current_tuv_temp_c) tgt = float(getattr(heat_pump, "tuv_target_temp_c", 55.0) or 55.0) thr = float(heat_pump.tuv_min_temp_c) + 5.0 for t in range(T): dow, hour = _prague_dow_hour(slots[t].interval_start) delta = tuv_delta_stats.get((dow, hour), -0.1) tuv_pred += float(delta) * INTERVAL_H if tuv_pred < thr: prob += ( pulp.lpSum(hp[s_] for s_ in range(max(0, t - 8), t + 1)) >= rated_hp * 0.5 ), f"tuv_heat_{t}" tuv_pred = tgt if float(current_tuv_temp_c) < float(heat_pump.tuv_min_temp_c): prob += hp[0] >= rated_hp * 0.8, "tuv_emergency" # ---------------- objective: jen reálné peníze ---------------- wh = INTERVAL_H / 1000.0 # W → kWh za slot cash = pulp.lpSum( gi[t] * float(slots[t].buy_price) * wh - (ge_pv[t] + ge_bat[t]) * float(slots[t].sell_price) * wh for t in range(T) ) degradation = pulp.lpSum( 0.5 * (bc_pv[t] + bc_gi[t] + bd[t]) * deg * wh for t in range(T) ) extras = pulp.lpSum(ca[t] * V2_CURTAIL_TIEBREAK_CZK_KWH * wh for t in range(T)) if z_gen is not None: extras += pulp.lpSum( max(0.0, float(slots[t].pv_b_forecast_w)) * z_gen[t] * V2_GEN_CUTOFF_CZK_KWH * wh for t in range(T) ) if om == "SELF_SUSTAIN": extras += pulp.lpSum( (ge_pv[t] + ge_bat[t]) * V2_SELF_SUSTAIN_EXPORT_CZK_KWH * wh for t in range(T) ) if ev_unmet: extras += pulp.lpSum(u * V2_EV_UNMET_CZK_KWH / 1000.0 for u in ev_unmet) if ev_opp: extras -= pulp.lpSum(o / 1000.0 * val for o, val in ev_opp if val > 0) # anti-fragmentace EV (Fix B): Σ ev_start × start_penalta (Kč). Default 0 → no-op. ev_start_terms = [ ev_start[e][t] * ev_start_pen[e] for e in range(EV) if ev_start[e] is not None and ev_start_pen[e] > 0.0 for t in range(T) ] if ev_start_terms: extras += pulp.lpSum(ev_start_terms) nb_terms = [ nb_slack[t] / 1000.0 * max(0.0, float(slots[t].buy_price)) for t in range(T) if nb_slack[t] is not None ] if nb_terms: extras += pulp.lpSum(nb_terms) ds_terms = [ ds_slack[t] / 1000.0 * max(0.0, float(slots[t].buy_price)) * safety_risk for t in range(T) if ds_slack[t] is not None ] if ds_terms: extras += pulp.lpSum(ds_terms) frontload = float(getattr(battery, "planner_pv_risk_frontload_czk_kwh", 0.0) or 0.0) neg_idx = [t for t in range(T) if float(slots[t].sell_price) < 0.0] if frontload > 0 and neg_idx: # odměna za soc[t] v neg slotech = dřívější nabití vyhrává při indiferenci extras -= pulp.lpSum(soc[t] / 1000.0 * frontload for t in neg_idx) prob += cash + degradation + extras - terminal * soc[T - 1] solver = ( pulp.HiGHS_CMD(msg=False, timeLimit=SOLVER_TIME_LIMIT) if pulp.HiGHS_CMD().available() else pulp.PULP_CBC_CMD(msg=False, timeLimit=SOLVER_TIME_LIMIT) ) status = prob.solve(solver) duration_ms = int((time.monotonic() - t0) * 1000) status_str = pulp.LpStatus[status] if status_str != "Optimal": # v2 nemá relax řetězec — model je navržen tak, aby byl feasible # (placené slacky místo tvrdých kotev). Ne-Optimal je skutečná chyba. raise RuntimeError(f"solver_v2: {status_str}") # ---------------- DispatchResult assembly (parita s v1) ---------------- def _val(var) -> float: v = pulp.value(var) return float(v) if v is not None else 0.0 # Reporting EV-via-bat: kWh do auta z baterie neplatí slotový buy (jdou # z baterie), ale ušlou příležitost. Aproximace oportunitní ceny: nejnižší # sell slotu, kde plán exportuje, v témže pražském dni; bez exportu ten den # terminal value (Kč/kWh). Plní battery_arbitrage_czk (dřív konstantní 0). day_min_export_sell: dict[Any, float] = {} for t in range(T): if _val(ge_pv[t]) + _val(ge_bat[t]) >= 1.0: d_key = _prague_calendar_date(slots[t]) sp = float(slots[t].sell_price) if d_key not in day_min_export_sell or sp < day_min_export_sell[d_key]: day_min_export_sell[d_key] = sp results: list[DispatchResult] = [] for t in range(T): s = slots[t] via1_w = _val(ev_via_bat[0][t]) if EV > 0 else 0.0 via2_w = _val(ev_via_bat[1][t]) if EV > 1 else 0.0 via_kwh = (via1_w + via2_w) * wh if via_kwh > 1e-9: opp_price = max( 0.0, day_min_export_sell.get(_prague_calendar_date(s), terminal * 1000.0), ) arb_czk = via_kwh * opp_price else: arb_czk = 0.0 bc_tot = _val(bc_pv[t]) + _val(bc_gi[t]) bd_v = _val(bd[t]) batt_w = round(bc_tot - bd_v) ge_pv_w = round(_val(ge_pv[t])) ge_bat_w = round(_val(ge_bat[t])) gi_w = _val(gi[t]) ge_w = float(ge_pv_w + ge_bat_w) grid_w, export_mode = _dispatch_grid_setpoint_w( gi_w=gi_w, ge_w=ge_w, ge_bat_w=float(ge_bat_w), ge_pv_w=float(ge_pv_w), max_export_power_w=int(max_exp), ) if batt_w < 0 and grid_w < 0: deye_mode = "SELL" elif batt_w > 0 and grid_w > 0: deye_mode = "CHARGE" else: deye_mode = "PASSIVE" gen_cut = bool(round(_val(z_gen[t]))) if z_gen is not None else None hp_v = _val(hp[t]) hp_on = hp_v > rated_hp * 0.5 if rated_hp > 0 else False cash_t = gi_w * float(s.buy_price) * wh - ge_w * float(s.sell_price) * wh pen_t = 0.0 if gen_cut: pen_t += max(0.0, float(s.pv_b_forecast_w)) * V2_GEN_CUTOFF_CZK_KWH * wh results.append( DispatchResult( interval_start=s.interval_start, battery_setpoint_w=batt_w, battery_soc_target=round(_val(soc[t]) / usable * 100.0, 2), grid_setpoint_w=grid_w, export_limit_w=int(max_exp) if grid_w < 0 else 0, export_mode=export_mode, deye_physical_mode=deye_mode, deye_gen_cutoff_enabled=gen_cut, ev1_setpoint_w=( round(_val(ev_direct[0][t]) + _val(ev_via_bat[0][t])) if EV > 0 and s.ev1_connected else None ), ev2_setpoint_w=( round(_val(ev_direct[1][t]) + _val(ev_via_bat[1][t])) if EV > 1 and s.ev2_connected else None ), ev1_via_bat_w=round(via1_w), ev2_via_bat_w=round(via2_w), heat_pump_enabled=hp_on, heat_pump_setpoint_w=int(rated_hp) if hp_on else 0, pv_a_curtailed_w=round(_val(ca[t])), expected_cost_czk=round(cash_t, 4), effective_buy_price=float(s.buy_price), effective_sell_price=float(s.sell_price), is_predicted_price=bool(s.is_predicted_price), cashflow_czk=round(cash_t, 4), battery_arbitrage_czk=round(arb_czk, 4), penalty_czk=round(pen_t, 4), green_bonus_czk=float(getattr(s, "green_bonus_czk_per_slot", 0.0) or 0.0), ) ) snapshot: dict[str, Any] = { "version": planner_version or "v2-clean", "planner_build_tag": V2_BUILD_TAG, "inputs": { "operating_mode": om, "current_soc_wh": soc0, "terminal_czk_per_wh": round(terminal, 8), "arb_floor_wh": arb_floor, "block_export_on_negative_sell": block_neg_sell, "gen_cutoff_available": gen_cutoff_avail, "slot_count": T, "ev_sessions": sum(1 for x in ev_sessions if x is not None), "ev_min_power_w": ev_min_w, "ev_phases": [int(getattr(vehicles[e], "phases", 0) or 0) for e in range(EV)], "ev_start_penalty_czk": ev_start_pen, "masks_ignored": True, "night_buffer_slots": sum(1 for b in nb_buffer_wh if b > 0), "pv_risk_frontload_czk_kwh": frontload if neg_idx else 0.0, "safety_soc_risk_factor": safety_risk, "safety_soc_slots": sum(1 for x in safety_tgt_wh if x > 0), "night_buffer_max_wh": round(max(nb_buffer_wh), 1) if nb_buffer_wh else 0, }, "objective_terms": { "cash_czk": round(float(pulp.value(cash)), 3), "degradation_czk": round(float(pulp.value(degradation)), 3), "extras_czk": round(float(pulp.value(extras)), 3) if not isinstance(extras, float) else 0.0, "terminal_value_czk": round(terminal * _val(soc[T - 1]), 3), "ev_unmet_wh": [round(_val(u), 1) for u in ev_unmet], "ev_opp_wh": [round(_val(o), 1) for o, _v in ev_opp], "ev_starts": [ int(round(sum(_val(ev_start[e][t]) for t in range(T)))) if ev_start[e] is not None else 0 for e in range(EV) ], }, "solver_duration_ms": duration_ms, "solver_status": status_str, } return results, duration_ms, snapshot