# backend/services/planning_engine.py # # EMS Platform – plánovací engine # Obsahuje: hlavní denní plán + rolling 15min replan # # Spouštění (APScheduler v lifespan.py): # scheduler.add_job(run_daily_plan, 'cron', hour=15, minute=0) # scheduler.add_job(run_rolling_replan, 'cron', minute='*/15') # Horizont: ems.fn_planning_horizon_end (OTE + strop/min v SQL). import json import logging import time from dataclasses import dataclass, replace from datetime import datetime, timezone, timedelta from types import SimpleNamespace from typing import Any, Optional from zoneinfo import ZoneInfo import pulp from app.config import get_settings logger = logging.getLogger(__name__) # ============================================================ # Konstanty # ============================================================ # Když DB vrátí NULL (skoro žádná OTE data), denní plán použije krátký fallback (soulad s min hodinami ve fn_planning_horizon_end). _DAILY_FALLBACK_HORIZON_HOURS = 1.0 # Shadow cena zbytkové energie na konci horizontu: - (avg_buy * FACTOR / 1000) * soc[T-1] (Kč; soc v Wh). INTERVAL_H = 0.25 # 15 minut v hodinách CURTAILMENT_PENALTY = 0.001 # Kč/Wh – malá penalizace za omezení FVE pole A SOLVER_TIME_LIMIT = 10 # sekund # MILP: významný export ge (W) ⇒ koncové soc[t] ≥ podlaha; mimo arbitrážní relax je to arb_base_wh # (rezerva z DB). Při relaxaci spodku před extrémně záporným buy je podlaha soc_panel_min[t] # (planner floor), jinak by šlo jen do zátěže a nešlo by „vypustit do sítě“ před levným nákupem. GE_MIN_EXPORT_W = 1.0 # Dvouprůchodové solve: stop když acquisition z pass1 vs pass2 se liší méně než (Kč/kWh). ACQUISITION_TWO_PASS_EPS_KWH = 0.05 # Load-first (Deye): PV nejdřív pokryje load+EV+TČ; bc_pv/ge_pv jen z pv_sp (přebytek). LOAD_FIRST_INCENTIVE_CZK_KWH = 0.05 # Dokud je kotva pro hluboký dump (první sell < 0 v horizontu, jinak první extrémní buy) dál než # tento počet 15min slotů, držíme plánovací spodek na rezervě (arb_base_wh) místo planner floor — # priorita: beze „ztráty na prodeji“ (sell >= 0) držet buffer, hluboký vývoz až těsně před záporným prodejem. DEFAULT_PLANNER_DISCHARGE_RELAX_PREWINDOW_SLOTS = 8 # Měkká kotva: chceme být u planner floor už v posledním slotu před prvním sell < 0. # Penalizace je v Kč/Wh (např. 0.20 = 200 Kč/kWh). Musí být dost velká, aby přebila # bezpečnostní SoC buffer + terminal shadow cenu a solver skutečně „dovylil“ před sell<0. PRENEG_SELL_SOC_ANCHOR_SLACK_PENALTY_CZK_PER_WH = 0.20 PEAK_EXPORT_SHORTFALL_PENALTY_CZK_KWH = 80.0 # Měkký tlak: v okně sell<0 + block_export využít PV přebytek do baterie (ne curtail). PV_CHARGE_SHORTFALL_PENALTY_CZK_KWH = 120.0 # Curtailment při sell<0 + allow_charge: nesmí být téměř zdarma oproti nabíjení (BA81). NEG_SELL_CURTAIL_PENALTY_CZK_KWH = 1.0 # Odměna v objective za FVE→baterie při sell<0 (doplňuje shortfall; BA81 fixed tarif). NEG_SELL_PV_CHARGE_REWARD_CZK_KWH = 0.8 # Měkký tlak: v okně sell<0 dobít na soc_max (ne zastavit na ~94 % kvůli curtail). NEG_SELL_SOC_UNDERFILL_PENALTY_CZK_PER_WH = 0.35 # Jen ventil nekontrolovatelného pole B při plné baterii a sell<0 (spot); ne celý PV přebytek. NEG_SELL_PV_B_VENT_PENALTY_CZK_KWH = 4.0 # Výboj baterie při sell<0 jen těsně před extrémně záporným buy (round-trip arbitráž). EXTREME_BUY_DUMP_PREWINDOW_SLOTS = 12 NEG_SELL_BAT_DUMP_SHORTFALL_PENALTY_CZK_KWH = 80.0 PLANNER_BUILD_TAG = "2026-05-26-neg-sell-bat-dump-extreme-buy-v11" CORRECTION_WINDOW_H = 1 # hodina zpět pro výpočet korekčního faktoru CORRECTION_MIN_CLAMP = 0.5 # spodní limit korekčního faktoru CORRECTION_MAX_CLAMP = 1.5 # horní limit korekčního faktoru # Útlum korekce: čím dál od aktuálního času, tím méně korigujeme forecast CORRECTION_DECAY_SLOTS = 16 # po 16 slotech (4h) klesne korekce na 0 # Dynamická ekonomická podlaha (MILP w_arb): lookahead FVE energie v dalších slotech ARB_LOOKAHEAD_SLOTS = 32 # 8 h při INTERVAL_H=0.25 ARB_FLOOR_E_REF_FRAC = 0.5 # má scale Wh = tato frakce usable_capacity (0..1) _PRAGUE_TZ = ZoneInfo("Europe/Prague") def _timestamptz_from_db(val: object) -> Optional[datetime]: if val is None: return None if isinstance(val, datetime): return val if val.tzinfo else val.replace(tzinfo=timezone.utc) return datetime.fromisoformat(str(val).replace("Z", "+00:00")) def _planner_engine_version(explicit: str | None = None) -> str: if explicit is not None and str(explicit).strip(): return str(explicit).strip().lower() return str(get_settings().planning_engine_version or "v1").strip().lower() def _planner_compare_enabled() -> bool: return bool(get_settings().planning_engine_compare_enabled) def _planner_peer_version(version: str) -> str: v = str(version).strip().lower() if v == "v1": return "v2" if v == "v2": return "v1" return "v1" def _dispatch_result_summary(results: list["DispatchResult"], duration_ms: int, version: str) -> dict[str, Any]: charge_slots = [r.interval_start.isoformat() for r in results if r.battery_setpoint_w > 500] discharge_slots = [r.interval_start.isoformat() for r in results if r.battery_setpoint_w < -500] export_slots = [r.interval_start.isoformat() for r in results if r.grid_setpoint_w < 0] return { "planner_version": version, "solver_duration_ms": int(duration_ms), "total_expected_cost_czk": round(sum(float(r.expected_cost_czk) for r in results), 4), "charge_slots": len(charge_slots), "discharge_slots": len(discharge_slots), "export_slots": len(export_slots), "first_charge_slot": charge_slots[0] if charge_slots else None, "first_discharge_slot": discharge_slots[0] if discharge_slots else None, "first_export_slot": export_slots[0] if export_slots else None, } def _dispatch_result_comparison( active_results: list["DispatchResult"], active_ms: int, active_version: str, peer_results: list["DispatchResult"], peer_ms: int, peer_version: str, ) -> dict[str, Any]: active_summary = _dispatch_result_summary(active_results, active_ms, active_version) peer_summary = _dispatch_result_summary(peer_results, peer_ms, peer_version) slot_rows: list[dict[str, Any]] = [] for a, b in zip(active_results, peer_results): row = { "interval_start": a.interval_start.isoformat(), "active": { "battery_setpoint_w": a.battery_setpoint_w, "grid_setpoint_w": a.grid_setpoint_w, "export_mode": a.export_mode, "deye_physical_mode": a.deye_physical_mode, "deye_gen_cutoff_enabled": a.deye_gen_cutoff_enabled, "pv_a_curtailed_w": a.pv_a_curtailed_w, "battery_soc_target": a.battery_soc_target, "expected_cost_czk": a.expected_cost_czk, }, "peer": { "battery_setpoint_w": b.battery_setpoint_w, "grid_setpoint_w": b.grid_setpoint_w, "export_mode": b.export_mode, "deye_physical_mode": b.deye_physical_mode, "deye_gen_cutoff_enabled": b.deye_gen_cutoff_enabled, "pv_a_curtailed_w": b.pv_a_curtailed_w, "battery_soc_target": b.battery_soc_target, "expected_cost_czk": b.expected_cost_czk, }, } if row["active"] != row["peer"]: slot_rows.append(row) total_cost_diff = round( float(active_summary["total_expected_cost_czk"]) - float(peer_summary["total_expected_cost_czk"]), 4, ) return { "compare_enabled": True, "active": active_summary, "peer": peer_summary, "diff": { "total_expected_cost_czk": total_cost_diff, "absolute_total_expected_cost_czk": round(abs(total_cost_diff), 4), "changed_slots": len(slot_rows), }, "slot_diffs": slot_rows, } def _maybe_add_planner_comparison( *, slots: list["PlanningSlot"], battery, heat_pump, grid, ev_sessions: list, vehicles: list, current_soc_wh: float, current_tuv_temp_c: float, operating_mode: str, tuv_delta_stats: Optional[dict[tuple[int, int], float]], active_version: str, charge_commitment_prev_w: Optional[list[Optional[float]]] = None, ) -> dict[str, Any] | None: if not _planner_compare_enabled(): return None peer_version = _planner_peer_version(active_version) if peer_version == active_version: return None peer_results, peer_ms, peer_snapshot = solve_dispatch( slots, battery, heat_pump, grid, ev_sessions, vehicles, current_soc_wh, current_tuv_temp_c, tuv_delta_stats=tuv_delta_stats, operating_mode=operating_mode, charge_commitment_prev_w=charge_commitment_prev_w, planner_version=peer_version, ) # active_results / active_ms jsou doplněny později v calleru return { "peer_version": peer_version, "peer_results": peer_results, "peer_ms": peer_ms, "peer_snapshot": peer_snapshot, } async def _planning_horizon_end(site_id: int, horizon_from: datetime, db) -> Optional[datetime]: """Konec horizontu z DB (`fn_planning_horizon_end`); NULL = rolling skip / daily fallback.""" raw = await db.fetchval( "select ems.fn_planning_horizon_end($1::int, $2::timestamptz)", site_id, horizon_from, ) return _timestamptz_from_db(raw) def _pv_scarcity_penalty_multiplier(slots: list["PlanningSlot"], battery) -> float: """ Měkká úprava ekonomiky cyklu podle očekávaného slunečního zisku. - málo očekávané FVE energie -> nižší penalizace cyklu (podpora precharge ze sítě), - hodně očekávané FVE energie -> standardní penalizace. """ horizon_slots = min(len(slots), int(24 / INTERVAL_H)) # konzervativní 1 den dopředu if horizon_slots <= 0: return 1.0 pv_kwh = 0.0 for s in slots[:horizon_slots]: pv_kwh += max(0.0, float(s.pv_a_forecast_w + s.pv_b_forecast_w)) * INTERVAL_H / 1000.0 batt_kwh = max(1.0, float(getattr(battery, "usable_capacity_wh", 0.0)) / 1000.0) # coverage = kolikanásobek baterie očekáváme ze slunce v horizontu. coverage = pv_kwh / batt_kwh coverage_clamped = max(0.0, min(1.0, coverage)) # 0.65 při nízkém slunci, 1.0 při vysokém slunci. return 0.65 + 0.35 * coverage_clamped def _pv_coverage_ratio(slots: list["PlanningSlot"], battery, hours: int = 24) -> float: horizon_slots = min(len(slots), int(hours / INTERVAL_H)) if horizon_slots <= 0: return 1.0 pv_kwh = 0.0 for s in slots[:horizon_slots]: pv_kwh += max(0.0, float(s.pv_a_forecast_w + s.pv_b_forecast_w)) * INTERVAL_H / 1000.0 batt_kwh = max(1.0, float(getattr(battery, "usable_capacity_wh", 0.0)) / 1000.0) return max(0.0, min(1.0, pv_kwh / batt_kwh)) def _dynamic_arb_floor_wh_series( slots: list["PlanningSlot"], min_soc_wh: float, arb_base_wh: float, usable_wh: float, ) -> list[float]: """ Časově proměnná ekonomická podlaha Wh pro MILP (nad min_soc_wh). Hodně očekávané FVE energie v dalších ARB_LOOKAHEAD_SLOTS → podlaha klesá k min_soc_wh; málo slunce → zůstává u arb_base_wh (typicky reserve z DB). """ T = len(slots) if T == 0: return [] e_ref = max(1.0, ARB_FLOOR_E_REF_FRAC * float(usable_wh)) spread = max(0.0, float(arb_base_wh) - float(min_soc_wh)) out: list[float] = [] for t in range(T): e_pv_wh = 0.0 for k in range(t, min(T, t + ARB_LOOKAHEAD_SLOTS)): s = slots[k] e_pv_wh += max(0, s.pv_a_forecast_w + s.pv_b_forecast_w) * INTERVAL_H f = min(1.0, e_pv_wh / e_ref) if e_ref > 1e-9 else 1.0 arb_t = float(min_soc_wh) + (1.0 - f) * spread out.append(arb_t) return out def _soc_security_profile(slots: list["PlanningSlot"], battery) -> tuple[float, float]: """ Při nízkém očekávaném slunci drží solver vyšší SoC buffer: - cílový buffer: reserve + až 20 % usable capacity, - ekonomická penalizace deficitu vůči bufferu z průměrné ceny. """ coverage = _pv_coverage_ratio(slots, battery, hours=24) scarcity = 1.0 - coverage usable_wh = float(getattr(battery, "usable_capacity_wh", 0.0)) reserve_wh = float(getattr(battery, "reserve_soc_wh", 0.0)) soc_max_wh = float(getattr(battery, "soc_max_wh", usable_wh)) extra_buffer_wh = 0.35 * usable_wh * scarcity target_wh = min(soc_max_wh, reserve_wh + extra_buffer_wh) h24 = min(len(slots), int(24 / INTERVAL_H)) avg_buy = ( sum(float(s.buy_price) for s in slots[:h24]) / h24 if h24 > 0 else 4.0 ) penalty_czk_kwh = max(0.1, avg_buy * 1.00 * scarcity) return target_wh, penalty_czk_kwh def _slot_float_nullable(d: dict[str, Any], key: str) -> float | None: v = d.get(key) if v is None: return None return float(v) def _prague_dow_hour(interval_start: datetime) -> tuple[int, int]: """DOW v konvenci PostgreSQL EXTRACT(DOW, Europe/Prague): 0=Ne … 6=So.""" dt = interval_start if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) loc = dt.astimezone(_PRAGUE_TZ) return (loc.weekday() + 1) % 7, loc.hour # ============================================================ # Datové třídy (lze nahradit pydantic modely) # ============================================================ @dataclass class PlanningSlot: interval_start: datetime buy_price: float # Kč/kWh sell_price: float # Kč/kWh pv_a_forecast_w: int # W – pole A (řiditelné) pv_b_forecast_w: int # W – pole B (zelený bonus, pevné) load_baseline_w: int # W – predikce bazální spotřeby ev1_connected: bool ev2_connected: bool is_predicted_price: bool = False allow_charge: bool = True allow_discharge_export: bool = True #: Měkké LP vstupy z `ems.fn_load_planning_slots_full` (mimo masky allow_*). night_baseload_target_wh: float | None = None night_baseload_buffer_wh: float | None = None safety_soc_target_wh: float | None = None future_avoided_buy_czk_kwh: float | None = None future_sell_opportunity_czk_kwh: float | None = None is_daytime_pv_surplus_slot: bool = False #: Vážená nákupní / opportunity cena zásoby před prvním exportním oknem (SQL odhad z masek). charge_acquisition_buy_czk_kwh: float | None = None charge_acquisition_cutoff_at: datetime | None = None # Lookahead pro relax spodní meze SoC: až 36 h od indexu slotu (pevné OTE ceny v horizontu). SOC_MIN_RELAX_LOOKAHEAD_SLOTS = 144 def _soc_min_wh_series( slots: list[PlanningSlot], usable_wh: float, base_min_wh: float, buy_extreme_threshold: float, planner_discharge_floor_pct: float | None, ) -> list[float]: """ Spodní mez SoC (Wh) pro každý slot: při extrémně záporném buy v lookahead povolit hlubší vybíjení až na planner_discharge_floor_percent (jinak min_soc z DB). Absolutní minimum 5 % usable. """ t_len = len(slots) abs_min_wh = max(usable_wh * 0.05, 1.0) if planner_discharge_floor_pct is None: relaxed_wh = base_min_wh else: relaxed_wh = max(abs_min_wh, float(planner_discharge_floor_pct) / 100.0 * usable_wh) effective_relaxed = min(base_min_wh, relaxed_wh) out: list[float] = [] for t in range(t_len): j_end = min(t_len, t + SOC_MIN_RELAX_LOOKAHEAD_SLOTS) min_buy_fwd = min(float(slots[k].buy_price) for k in range(t, j_end)) if min_buy_fwd <= buy_extreme_threshold: out.append(float(effective_relaxed)) else: out.append(float(base_min_wh)) return out def _slots_until_buy_le_threshold( slots: list[PlanningSlot], buy_threshold: float ) -> list[int]: """ Pro slot t: kolik slotů (0 = tento slot) do nejbližšího k>=t s buy_price <= buy_threshold. Pokud v [t, T) žádný takový není, vrátí T + 1 (větší než jakýkoli rozumný prewindow). """ t_len = len(slots) sentinel = t_len + 1 next_le = sentinel next_at_or_after: list[int] = [sentinel] * t_len for t in range(t_len - 1, -1, -1): if float(slots[t].buy_price) <= buy_threshold: next_le = t next_at_or_after[t] = next_le out: list[int] = [] for t in range(t_len): nxt = next_at_or_after[t] if nxt >= t_len: out.append(sentinel) else: out.append(nxt - t) return out def _slots_until_sell_lt(slots: list[PlanningSlot], sell_upper: float) -> list[int]: """ Pro slot t: kolik slotů (0 = tento slot) do nejbližšího k>=t s sell_price < sell_upper. Typicky sell_upper=0 (první záporný / „ztrátový“ prodej z pohledu OTE). Pokud v [t, T) žádný takový není, vrátí T + 1. """ t_len = len(slots) sentinel = t_len + 1 next_lt = sentinel next_at_or_after: list[int] = [sentinel] * t_len for t in range(t_len - 1, -1, -1): if float(slots[t].sell_price) < sell_upper: next_lt = t next_at_or_after[t] = next_lt out: list[int] = [] for t in range(t_len): nxt = next_at_or_after[t] if nxt >= t_len: out.append(sentinel) else: out.append(nxt - t) return out def _prewindow_deferral_slots( slots: list[PlanningSlot], buy_extreme_threshold: float, sell_upper: float = 0.0 ) -> list[int]: """ Vzdálenost (v 15min slotech) pro zpoždění hlubokého planner flooru: primárně do prvního sell < sell_upper (poslední „bez ztráty na prodeji“ je k-1), pokud v horizontu není záporný prodej, fallback na první buy <= buy_extreme_threshold. """ t_len = len(slots) sell_d = _slots_until_sell_lt(slots, sell_upper) buy_d = _slots_until_buy_le_threshold(slots, buy_extreme_threshold) sentinel = t_len + 1 out: list[int] = [] for t in range(t_len): if sell_d[t] < sentinel: out.append(sell_d[t]) else: out.append(buy_d[t]) return out def _soc_panel_min_wh_series( soc_min_series: list[float], slots_until_relax_anchor: list[int], min_soc_wh: float, arb_base_wh: float, prewindow_slots: int, ) -> list[float]: """ Zpoždění hluboké relaxace: pokud je lookahead extrémní (soc_min pod min_soc), ale kotva (záporný prodej / fallback extrémní buy) je dál než prewindow_slots, drž spodek na max(relax_wh, arb_base_wh) — prakticky na rezervě. """ t_len = len(soc_min_series) out: list[float] = [] for t in range(t_len): sm = float(soc_min_series[t]) if sm < min_soc_wh - 1e-3 and slots_until_relax_anchor[t] > prewindow_slots: out.append(max(sm, float(arb_base_wh))) else: out.append(sm) return out @dataclass class DispatchResult: interval_start: datetime battery_setpoint_w: int # kladné = nabíjení, záporné = vybíjení battery_soc_target: float # % SoC na konci intervalu grid_setpoint_w: int # kladné = import, záporné = export export_limit_w: int # tvrdý limit exportu do sítě; 0 = bez exportu export_mode: str # NONE / PV_SURPLUS / BATTERY_SELL #: Explicitní fyzický režim Deye pro control exporter (PASSIVE / SELL / CHARGE). #: Cíl: odstranit heuristiky z exporteru a nést záměr přímo v plánu. deye_physical_mode: str #: True = v daném slotu odpojit GEN port (MI export cutoff) přes reg 178 bits0–1 (0-based; v UI často jako "register 179"). #: None = lokalita tuto funkci nemá / nepoužívá. deye_gen_cutoff_enabled: bool | None ev1_setpoint_w: Optional[int] ev2_setpoint_w: Optional[int] ev1_via_bat_w: int ev2_via_bat_w: int heat_pump_enabled: bool heat_pump_setpoint_w: int pv_a_curtailed_w: int expected_cost_czk: float effective_buy_price: float effective_sell_price: float is_predicted_price: bool # shodné s PlanningSlot (chybí OTE v efektivní ceně → fn_get_predicted_price) # ============================================================ # Korekce forecastu na základě skutečné výroby # ============================================================ async def compute_correction_factor( site_id: int, now: datetime, db, window_h: float = CORRECTION_WINDOW_H, ) -> tuple[float, dict]: """ Spočítá korekční faktor FVE forecastu z posledních window_h hodin. Vrátí (factor, log_data) kde factor je v rozsahu [CORRECTION_MIN_CLAMP, CORRECTION_MAX_CLAMP]. factor = 1.0 pokud není dostatek dat nebo je rozdíl zanedbatelný. """ window_start = now - timedelta(hours=window_h) raw = await db.fetchval( """ select ems.fn_pv_forecast_correction_factor( $1::int, $2::timestamptz, $3::timestamptz, $4::numeric, $5::numeric ) """, site_id, window_start, now, CORRECTION_MIN_CLAMP, CORRECTION_MAX_CLAMP, ) j = raw if isinstance(raw, dict) else json.loads(raw) factor = float(j.get("correction_factor", 1.0)) # JSON z DB má často ISO řetězce; asyncpg u $2/$3 vyžaduje datetime ws = _parse_json_dt(j.get("window_start")) or window_start we = _parse_json_dt(j.get("window_end")) or now log_data = { "window_start": ws, "window_end": we, "actual_pv_wh": j.get("actual_pv_wh"), "forecast_pv_wh": j.get("forecast_pv_wh"), "correction_factor": factor, "reason": j.get("reason", "ok"), } if j.get("raw_factor") is not None: log_data["raw_factor"] = j["raw_factor"] return factor, log_data def apply_forecast_correction( slots: list[PlanningSlot], now: datetime, factor: float, decay_slots: int = CORRECTION_DECAY_SLOTS, ) -> list[PlanningSlot]: """ Aplikuje korekční faktor na FVE forecast zbývajících slotů. Korekce se lineárně utlumuje: na 1. slotu plná korekce, na decay_slots-tém slotu žádná korekce. Příklad: factor=0.85, slot 0 → pv_a *= 0.85, slot 8 → pv_a *= 0.925, slot 16+ → žádná korekce """ corrected = [] for i, slot in enumerate(slots): if factor == 1.0 or i >= decay_slots: corrected.append(slot) continue # Lineární útlum: weight klesá od 1.0 (slot 0) do 0.0 (slot decay_slots) weight = 1.0 - (i / decay_slots) effective_factor = 1.0 + (factor - 1.0) * weight corrected.append( replace( slot, pv_a_forecast_w=max(0, int(slot.pv_a_forecast_w * effective_factor)), pv_b_forecast_w=max(0, int(slot.pv_b_forecast_w * effective_factor)), ) ) return corrected # ============================================================ # LP Solver # ============================================================ def _recompute_charge_acquisition_from_results( slots: list[PlanningSlot], results: list["DispatchResult"], battery, ) -> float: """Vážený buy z nabíjecích slotů (grid import + bat charge) z prvního solve.""" wh_total = 0.0 cost = 0.0 for s, r in zip(slots, results): if not s.allow_charge: continue gi_w = max(0, int(r.grid_setpoint_w or 0)) bc_w = max(0, int(r.battery_setpoint_w or 0)) wh = (gi_w + bc_w) * INTERVAL_H if wh <= 0: continue wh_total += wh cost += float(s.buy_price) * wh if wh_total <= 0: raw = getattr(slots[0], "charge_acquisition_buy_czk_kwh", None) if raw is not None: return float(raw) return min(float(s.buy_price) for s in slots) return cost / wh_total def _slots_with_charge_acquisition( slots: list[PlanningSlot], acquisition_czk_kwh: float, ) -> list[PlanningSlot]: return [ replace(s, charge_acquisition_buy_czk_kwh=acquisition_czk_kwh) for s in slots ] def _pv_store_value_czk_kwh(slot: PlanningSlot, min_spread: float) -> float: """ Minimální sell [Kč/kWh], pod kterým je FVE→síť horší než uložení na večerní peak. Používá jen future_sell_opportunity (ne charge_acquisition — u fixního tarifu KV1 by jinak blokoval export i při kladném sell 2 Kč). """ future = float( slot.future_sell_opportunity_czk_kwh if slot.future_sell_opportunity_czk_kwh is not None else slot.sell_price ) return future - min_spread def _slot_profitable_battery_export( slot: PlanningSlot, *, charge_acquisition_czk_kwh: float, min_spread: float, fixed_tariff: bool, ) -> bool: """ Export z baterie do sítě má kladnou marži. Spot: sell > charge_acquisition + spread (energie ze sítě / vážený nákup). Fixní tarif (BA81/KV1): stejně jako R__063 discharge maska — sell > buy + spread; acquisition může být nafouknutá grid nabíjením a blokovat večerní špičku (3,7 < 3,9). """ sell_t = float(slot.sell_price) acq = float(charge_acquisition_czk_kwh) if fixed_tariff: buy_t = float(slot.buy_price) if buy_t >= 0.0: return sell_t > buy_t + min_spread return sell_t > acq + min_spread def _purchase_pricing_fixed(grid: Any) -> bool: """Režim nákupu z DB (`site_market_config.purchase_pricing_mode`), ne odhad z rozptylu buy.""" return ( str(getattr(grid, "purchase_pricing_mode", "spot") or "spot").strip().lower() == "fixed" ) def _horizon_fixed_tariff_like(slots: list[PlanningSlot]) -> bool: """ Heuristika pro drahý import / charge_acquisition: buy v horizontu je prakticky konstantní. U spotu (home-01) nesmí expensive_import používat charge_acquisition — jinak buy > ~1 Kč označí téměř všechny sloty jako drahé (gi=0 pro dům) → Infeasible. BA81 má fixní nákup v DB, ale NT/VT → buy skáče; proto neg-sell export řídí _purchase_pricing_fixed. """ buys = [float(s.buy_price) for s in slots if float(s.buy_price) >= 0.0] if not buys: return False if len(buys) == 1: return True return max(buys) - min(buys) < 0.25 def _future_extreme_buy_from( slots: list[PlanningSlot], buy_thr: float, ) -> list[bool]: """True v t, pokud v některém budoucím slotu buy <= buy_thr.""" t_len = len(slots) out = [False] * t_len seen = False for i in range(t_len - 1, -1, -1): if float(slots[i].buy_price) <= buy_thr: seen = True out[i] = seen return out def _neg_sell_bat_dump_slots( slots: list[PlanningSlot], *, operating_mode: str, purchase_fixed: bool, grid: Any, buy_extreme_thr: float, degrad_czk_kwh: float, ) -> set[int]: """Sloty, kde smí ge_bat>0 při sell<0 (výboj před extrémně záporným buy).""" if operating_mode != "AUTO" or purchase_fixed: return set() if bool(getattr(grid, "block_export_on_negative_sell", False)): return set() t_len = len(slots) future_extreme = _future_extreme_buy_from(slots, buy_extreme_thr) dist = _slots_until_buy_le(slots, buy_extreme_thr) out: set[int] = set() for t, s in enumerate(slots): if float(s.sell_price) >= 0.0: continue future_min = min( (float(slots[j].buy_price) for j in range(t + 1, t_len)), default=float(s.buy_price), ) if ( future_extreme[t] and 0 < dist[t] <= EXTREME_BUY_DUMP_PREWINDOW_SLOTS and future_min < float(s.sell_price) - degrad_czk_kwh ): out.add(t) return out def _slots_until_buy_le( slots: list[PlanningSlot], buy_thr: float, ) -> list[int]: """Počet slotů do nejbližšího buy <= thr (0 = v tomto slotu, T = nikdy).""" t_len = len(slots) dist = [t_len] * t_len next_idx = t_len for i in range(t_len - 1, -1, -1): if float(slots[i].buy_price) <= buy_thr: next_idx = i dist[i] = (next_idx - i) if next_idx < t_len else t_len return dist def _pre_negative_sell_export_window( slots: list[PlanningSlot], ) -> tuple[int | None, int | None]: """Index prvního sell<0 a posledního slotu před ním (pro strategii „vyvézt dřív“).""" first_neg = next( (i for i, s in enumerate(slots) if float(s.sell_price) < 0), None, ) if first_neg is None or first_neg <= 0: return first_neg, None return first_neg, first_neg - 1 def _prague_calendar_date(slot: PlanningSlot): dt = slot.interval_start if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(ZoneInfo("Europe/Prague")).date() MORNING_PRENEG_START_HOUR = 5 MORNING_PRENEG_END_HOUR = 11 PRENEG_MORNING_EXPORT_MIN_W = 8_000.0 EVENING_BATTERY_EXPORT_MIN_W = 8_000.0 def _prague_hour(slot: PlanningSlot) -> int: dt = slot.interval_start if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(ZoneInfo("Europe/Prague")).hour def _morning_pre_neg_zone_peak_sell( slots: list[PlanningSlot], first_neg_sell_idx: int | None, ) -> float | None: """Max kladný sell v pásmu 5–11 Prague před prvním sell<0 (shodně s R__063).""" if first_neg_sell_idx is None or first_neg_sell_idx <= 0: return None neg_day = _prague_calendar_date(slots[first_neg_sell_idx]) sells = [ float(slots[i].sell_price) for i in range(first_neg_sell_idx) if float(slots[i].sell_price) >= 0.0 and _prague_calendar_date(slots[i]) == neg_day and MORNING_PRENEG_START_HOUR <= _prague_hour(slots[i]) <= MORNING_PRENEG_END_HOUR ] if not sells: return None return max(sells) def _pre_neg_peak_sell_idx( slots: list[PlanningSlot], first_neg_sell_idx: int | None, ) -> int | None: """Nejvyšší kladný sell v ranním pásmu před prvním sell<0 (ne půlnoc celého dne).""" if first_neg_sell_idx is None or first_neg_sell_idx <= 0: return None zone_peak = _morning_pre_neg_zone_peak_sell(slots, first_neg_sell_idx) if zone_peak is None: return None neg_day = _prague_calendar_date(slots[first_neg_sell_idx]) positive = [ (i, float(slots[i].sell_price)) for i in range(first_neg_sell_idx) if float(slots[i].sell_price) >= 0.0 and _prague_calendar_date(slots[i]) == neg_day and MORNING_PRENEG_START_HOUR <= _prague_hour(slots[i]) <= MORNING_PRENEG_END_HOUR ] if not positive: return None return max(positive, key=lambda x: (x[1], x[0]))[0] def _morning_pre_neg_export_indices( slots: list[PlanningSlot], first_neg_sell_idx: int | None, *, degrad_czk_kwh: float, ) -> list[int]: """Všechny ranní peak sloty (sell ≥ zónový max − degrad) před prvním sell<0.""" zone_peak = _morning_pre_neg_zone_peak_sell(slots, first_neg_sell_idx) if zone_peak is None or first_neg_sell_idx is None or first_neg_sell_idx <= 0: return [] neg_day = _prague_calendar_date(slots[first_neg_sell_idx]) out: list[int] = [] for i in range(first_neg_sell_idx): if ( float(slots[i].sell_price) >= zone_peak - degrad_czk_kwh and float(slots[i].sell_price) >= 0.0 and _prague_calendar_date(slots[i]) == neg_day and MORNING_PRENEG_START_HOUR <= _prague_hour(slots[i]) <= MORNING_PRENEG_END_HOUR ): out.append(i) return out def _evening_peak_export_indices( slots: list[PlanningSlot], *, degrad_czk_kwh: float, evening_start_hour: int = 17, ) -> list[int]: """Večerní špičky per den (shodně s R__063, hour >= 17 Prague).""" peak_by_day: dict = {} for s in slots: if _prague_hour(s) < evening_start_hour: continue d = _prague_calendar_date(s) peak_by_day[d] = max(peak_by_day.get(d, 0.0), float(s.sell_price)) out: list[int] = [] for t, s in enumerate(slots): if _prague_hour(s) < evening_start_hour: continue d = _prague_calendar_date(s) peak = peak_by_day.get(d, 0.0) if peak > 0 and float(s.sell_price) >= peak - degrad_czk_kwh: out.append(t) return out def _evening_battery_export_push_indices( slots: list[PlanningSlot], *, profitable_export_ts: set[int], degrad_czk_kwh: float, evening_start_hour: int = 17, max_slots_per_day: int = 3, ) -> list[int]: """ Tvrdý push ge_bat jen u několika nejlepších večerních slotů/den (profitable ∩ peak). Jinak součet ge_bat × z_export přes celý peak pásmo může překročit dostupné SoC → Infeasible. """ peak_ts = _evening_peak_export_indices( slots, degrad_czk_kwh=degrad_czk_kwh, evening_start_hour=evening_start_hour, ) by_day: dict = {} for t in peak_ts: if t not in profitable_export_ts: continue d = _prague_calendar_date(slots[t]) by_day.setdefault(d, []).append(t) out: list[int] = [] for d in sorted(by_day.keys()): ranked = sorted( by_day[d], key=lambda i: float(slots[i].sell_price), reverse=True, ) out.extend(ranked[:max_slots_per_day]) return sorted(out) def _planner_soc_for_solver( current_soc_wh: float, battery, ) -> tuple[float, float | None]: """ SoC pro MILP. Při telemetrii na soc_max a dlouhém sell<0 s vysokou FVE bez rezervy pod stropem je model neřešitelný (nelze nabít / odvést přebytek). Necháme min. ~650 Wh pod soc_max. """ soc_max = float(battery.soc_max_wh) soc_min = float(battery.min_soc_wh) soc = max(soc_min, min(float(current_soc_wh), soc_max)) charge_slot_wh = ( float(battery.max_charge_power_w) * INTERVAL_H / max(float(battery.charge_efficiency), 1e-6) ) headroom = max(650.0, 0.382 * charge_slot_wh) if soc > soc_max - headroom: return max(soc_min, soc_max - headroom), headroom return soc, None def _pv_forced_vent_export_allowed( t: int, *, current_soc_wh: float, battery, soc_headroom_wh: float, pv_surplus_w: float, ) -> bool: """Přebytek FVE do sítě jen když baterie na konci předchozího slotu nemá kapacitu.""" if pv_surplus_w <= 0: return False if t == 0: return current_soc_wh >= float(battery.soc_max_wh) - soc_headroom_wh return False def solve_dispatch_two_pass( slots: list[PlanningSlot], battery, heat_pump, grid, ev_sessions: list, vehicles: list, current_soc_wh: float, current_tuv_temp_c: float, *, tuv_delta_stats: Optional[dict[tuple[int, int], float]] = None, operating_mode: str = "AUTO", charge_commitment_prev_w: Optional[list[Optional[float]]] = None, planner_version: str | None = None, ) -> tuple[list["DispatchResult"], int, dict[str, Any]]: """ Dva průchody solve_dispatch: pass2 používá acquisition z váženého buy nabíjení v pass1. """ results1, ms1, snap1 = solve_dispatch( slots, battery, heat_pump, grid, ev_sessions, vehicles, current_soc_wh, current_tuv_temp_c, tuv_delta_stats=tuv_delta_stats, operating_mode=operating_mode, charge_commitment_prev_w=charge_commitment_prev_w, planner_version=planner_version, ) acq1 = float( snap1.get("inputs", {}).get("charge_acquisition_buy_czk_kwh") or getattr(slots[0], "charge_acquisition_buy_czk_kwh", None) or min(float(s.buy_price) for s in slots) ) acq2 = _recompute_charge_acquisition_from_results(slots, results1, battery) converged = abs(acq2 - acq1) < ACQUISITION_TWO_PASS_EPS_KWH if converged: if isinstance(snap1.get("inputs"), dict): snap1["inputs"]["acquisition_pass1_czk_kwh"] = round(acq1, 6) snap1["inputs"]["acquisition_pass2_czk_kwh"] = round(acq2, 6) snap1["inputs"]["two_pass_enabled"] = True snap1["inputs"]["two_pass_converged"] = True return results1, ms1, snap1 slots2 = _slots_with_charge_acquisition(slots, acq2) results2, ms2, snap2 = solve_dispatch( slots2, battery, heat_pump, grid, ev_sessions, vehicles, current_soc_wh, current_tuv_temp_c, tuv_delta_stats=tuv_delta_stats, operating_mode=operating_mode, charge_commitment_prev_w=charge_commitment_prev_w, planner_version=planner_version, ) if isinstance(snap2.get("inputs"), dict): snap2["inputs"]["acquisition_pass1_czk_kwh"] = round(acq1, 6) snap2["inputs"]["acquisition_pass2_czk_kwh"] = round(acq2, 6) snap2["inputs"]["two_pass_enabled"] = True snap2["inputs"]["two_pass_converged"] = False snap2["inputs"]["solver_duration_ms_pass1"] = ms1 return results2, ms1 + ms2, snap2 def solve_dispatch( slots: list[PlanningSlot], battery, heat_pump, grid, ev_sessions: list, # aktivní EV sessions [ev1_session, ev2_session] vehicles: list, # [vehicle1, vehicle2] current_soc_wh: float, current_tuv_temp_c: float, *, tuv_delta_stats: Optional[dict[tuple[int, int], float]] = None, operating_mode: str = "AUTO", charge_commitment_prev_w: Optional[list[Optional[float]]] = None, planner_version: str | None = None, relaxed_expensive_import: bool = False, ) -> tuple[list[DispatchResult], int, dict[str, Any]]: """ LP solver pro dispatch optimalizaci. Vrátí (výsledky, solver_duration_ms, solver_debug_snapshot). relaxed_expensive_import: nouzový režim po Infeasible — síť smí krmit baseload v drahých slotech. """ T = len(slots) if T < 1: raise RuntimeError("solve_dispatch requires at least one slot") EV = len(vehicles) # počet EV (typicky 2) planner_version_resolved = _planner_engine_version(planner_version) planner_v2 = planner_version_resolved == "v2" EV_ROUNDTRIP_FACTOR = 1.0 / (battery.charge_efficiency * battery.discharge_efficiency) cycle_penalty_mult = _pv_scarcity_penalty_multiplier(slots, battery) degradation_cost_effective = battery.degradation_cost_czk_kwh * cycle_penalty_mult soc_buffer_target_wh, soc_deficit_penalty_czk_kwh = _soc_security_profile(slots, battery) prob = pulp.LpProblem("ems_dispatch", pulp.LpMinimize) # Penalizace překročení breakeru (Kč/kWh importu nad max_import_power_w). # Záměr: breaker je fyzický strop, ale kvůli chybám forecastu a krátkým „extrémním“ oknům # (např. záporná nákupní cena) umožníme solveru nominálně jít nad breaker, ovšem pouze za cenu. IMPORT_OVER_BREAKER_PENALTY_CZK_KWH = 10.0 min_soc_wh = float(getattr(battery, "min_soc_wh", battery.reserve_soc_wh)) buy_extreme_thr = float(getattr(battery, "planner_extreme_buy_threshold_czk_kwh", -5.0)) floor_pct_raw = getattr(battery, "planner_discharge_floor_percent", None) floor_pct = float(floor_pct_raw) if floor_pct_raw is not None else None prewin = max( 0, int( getattr( battery, "planner_discharge_relax_prewindow_slots", DEFAULT_PLANNER_DISCHARGE_RELAX_PREWINDOW_SLOTS, ) ), ) # Planner floor v Wh (nezávisle na lookahead extrémním buy) – použije se pro kotvu před sell<0. abs_min_wh = max(float(battery.usable_capacity_wh) * 0.05, 1.0) planner_floor_wh = ( min_soc_wh if floor_pct is None else max(abs_min_wh, float(floor_pct) / 100.0 * float(battery.usable_capacity_wh)) ) planner_floor_effective_wh = min(min_soc_wh, float(planner_floor_wh)) soc_min_series = _soc_min_wh_series( slots, float(battery.usable_capacity_wh), min_soc_wh, buy_extreme_thr, floor_pct, ) # Pokud se blíží první sell<0, dovol hluboký planner floor i bez extrémního buy. # Záměr: „dovylít“ baterii před záporným prodejem a pak už baterii v sell<0 okně nevybíjet. if floor_pct is not None: dist_to_neg_sell = _slots_until_sell_lt(slots, 0.0) soc_min_series = [ min(float(sm), float(planner_floor_effective_wh)) if dist_to_neg_sell[i] <= prewin else float(sm) for i, sm in enumerate(soc_min_series) ] soc_headroom_applied_wh: float | None = None current_soc_wh, soc_headroom_applied_wh = _planner_soc_for_solver( current_soc_wh, battery ) current_soc_wh = max(soc_min_series[0], min(current_soc_wh, float(battery.soc_max_wh))) arb_base_wh = max( float(getattr(battery, "arb_floor_wh", battery.reserve_soc_wh)), min_soc_wh, ) if getattr(battery, "disable_dynamic_arb_floor", False): arb_floor_series = [arb_base_wh] * T else: arb_floor_series = _dynamic_arb_floor_wh_series( slots, min_soc_wh, arb_base_wh, float(battery.usable_capacity_wh) ) deferral_slots = _prewindow_deferral_slots(slots, buy_extreme_thr) soc_panel_min = _soc_panel_min_wh_series( soc_min_series, deferral_slots, min_soc_wh, arb_base_wh, prewin, ) # --- Proměnné --- # Import ze sítě: tvrdý strop = site breaker (max_import_power_w). gi_upper = float(grid.max_import_power_w) gi = [pulp.LpVariable(f"gi_{t}", 0, gi_upper) for t in range(T)] gi_over = [ pulp.LpVariable(f"gi_over_{t}", 0, max(0.0, gi_upper - float(grid.max_import_power_w))) for t in range(T) ] ge = [pulp.LpVariable(f"ge_{t}", 0, grid.max_export_power_w) for t in range(T)] ge_pv = [pulp.LpVariable(f"ge_pv_{t}", 0, grid.max_export_power_w) for t in range(T)] ge_bat = [pulp.LpVariable(f"ge_bat_{t}", 0, grid.max_export_power_w) for t in range(T)] bc_pv = [pulp.LpVariable(f"bc_pv_{t}", 0, battery.max_charge_power_w) for t in range(T)] bc_gi = [pulp.LpVariable(f"bc_gi_{t}", 0, battery.max_charge_power_w) for t in range(T)] bd = [pulp.LpVariable(f"bd_{t}", 0, battery.max_discharge_power_w) for t in range(T)] pv_ld = [pulp.LpVariable(f"pv_ld_{t}", 0) for t in range(T)] pv_sp = [pulp.LpVariable(f"pv_sp_{t}", 0) for t in range(T)] soc = [ pulp.LpVariable(f"soc_{t}", soc_panel_min[t], battery.soc_max_wh) for t in range(T) ] w_arb = [pulp.LpVariable(f"w_arb_{t}", cat=pulp.LpBinary) for t in range(T)] z_export = [pulp.LpVariable(f"z_export_{t}", cat=pulp.LpBinary) for t in range(T)] ca = [pulp.LpVariable(f"ca_{t}", 0, slots[t].pv_a_forecast_w) for t in range(T)] hp = [pulp.LpVariable(f"hp_{t}", 0, heat_pump.rated_heating_power_w) for t in range(T)] soc_deficit_24h = pulp.LpVariable("soc_deficit_24h", 0, battery.usable_capacity_wh) soc_anchor_slack = None t_anchor = None # GEN port cut-off (BA81): binární proměnná pouze pokud je feature povolená v konfiguraci site/invertoru. gen_cutoff_enabled = bool(getattr(grid, "deye_gen_microinverter_cutoff_enabled", False)) z_gen_cutoff = ( [pulp.LpVariable(f"z_gen_cutoff_{t}", cat=pulp.LpBinary) for t in range(T)] if gen_cutoff_enabled else None ) om = (operating_mode or "AUTO").strip().upper() charge_slots: set[int] = set() discharge_export_slots: set[int] = set() if om == "AUTO": charge_slots = {t for t, s in enumerate(slots) if s.allow_charge} charge_slots |= { t for t, s in enumerate(slots) if float(s.buy_price) < 0.0 } # Stejně jako R__063 (sell<0 + PV přebytek): shortfall/curtail penalizace i bez block_export. charge_slots |= { t for t, s in enumerate(slots) if float(s.sell_price) < 0.0 and max( 0, int(s.pv_a_forecast_w) + int(s.pv_b_forecast_w) - int(s.load_baseline_w), ) > 500 } discharge_export_slots = { t for t, s in enumerate(slots) if s.allow_discharge_export } # SELF_SUSTAIN dřív vynucoval ge[t] == 0, což umí udělat MILP infeasible v okamžiku, kdy: # - baterie je na max SoC (nelze nabíjet), # - PV pole B není curtailable, # - a pv_b_forecast_w > load_baseline_w (typicky po ručním snížení baseline). # Export v SELF_SUSTAIN proto povolíme jako nouzový ventil, ale silně penalizujeme, # aby k němu docházelo jen když už neexistuje jiné fyzikálně možné řešení. SELF_SUSTAIN_EXPORT_PENALTY_CZK_KWH = 100.0 # Penalizace vypnutí GEN portu (mikroinvertory): preferujeme nechat zapnuto a vypnout jen když # by to jinak vedlo k nežádoucímu exportu / infeasible řešení. GEN_CUTOFF_PENALTY_CZK_KWH = 2.0 if planner_v2 else 5.0 # Heuristika: pokud existuje necurtailable PV B a v budoucnu v horizontu nastane buy < 0, # chceme mít motivaci držet baterii „prázdnější“ pro pozdější výhodný import / bonusové PV B okno. # V okně sell < 0 pak preferujeme curtail PV A (místo placeného exportu), a to tak, # že dočasně snížíme penalizaci ca[t] (curtailment) na 0. has_pv_b = any(float(s.pv_b_forecast_w) > 0.0 for s in slots) future_neg_buy_from: list[bool] = [False] * T seen_neg_buy = False for i in range(T - 1, -1, -1): if float(slots[i].buy_price) < 0.0: seen_neg_buy = True future_neg_buy_from[i] = seen_neg_buy future_extreme_buy_from = _future_extreme_buy_from(slots, buy_extreme_thr) dist_to_extreme_buy = _slots_until_buy_le(slots, buy_extreme_thr) # EV proměnné per vozidlo ev_direct = [[pulp.LpVariable(f"evd_{e}_{t}", 0, min(vehicles[e].max_charge_power_w, grid.max_import_power_w)) for t in range(T)] for e in range(EV)] ev_via_bat = [[pulp.LpVariable(f"evb_{e}_{t}", 0, vehicles[e].max_charge_power_w) for t in range(T)] for e in range(EV)] horizon_slots_h24 = min(T, int(24 / INTERVAL_H)) avg_buy_terminal = ( sum(float(slots[t].buy_price) for t in range(horizon_slots_h24)) / horizon_slots_h24 if horizon_slots_h24 > 0 else 4.0 ) terminal_factor = float(battery.planner_terminal_soc_value_factor) # Kč/Wh: ocenění energie ponechané v baterii na konci horizontu (receding horizon kotva). terminal_soc_kcz_per_wh = ( avg_buy_terminal * terminal_factor / 1000.0 ) charge_acq_raw = getattr(slots[0], "charge_acquisition_buy_czk_kwh", None) charge_acquisition_czk_kwh = ( float(charge_acq_raw) if charge_acq_raw is not None else min(float(s.buy_price) for s in slots) ) soc_headroom_wh = max(2000.0, 0.05 * float(battery.soc_max_wh)) # Kotva: poslední slot před prvním sell<0 by měl končit u planner floor (pokud relaxace existuje). # Slack penalizujeme v objective; samotné omezení přidáme až po definici soc. first_neg_sell_idx, pre_neg_export_last_t = _pre_negative_sell_export_window(slots) last_neg_sell_by_prague_date: dict[object, int] = {} for t_ln, st_ln in enumerate(slots): if float(st_ln.sell_price) < 0: last_neg_sell_by_prague_date[_prague_calendar_date(st_ln)] = t_ln t_pre_neg_peak = _pre_neg_peak_sell_idx(slots, first_neg_sell_idx) morning_pre_neg_export_ts = _morning_pre_neg_export_indices( slots, first_neg_sell_idx, degrad_czk_kwh=float(degradation_cost_effective), ) evening_peak_export_ts = _evening_peak_export_indices( slots, degrad_czk_kwh=float(degradation_cost_effective), ) non_negative_buys_pre = [ float(s.buy_price) for s in slots if float(s.buy_price) >= 0.0 ] ref_buy_horizon_pre = ( min(non_negative_buys_pre) if non_negative_buys_pre else min(float(s.buy_price) for s in slots) ) min_spread_pre = float(degradation_cost_effective) purchase_fixed_pre = _purchase_pricing_fixed(grid) fixed_tariff_like_pre = purchase_fixed_pre or _horizon_fixed_tariff_like(slots) neg_sell_bat_dump_slots = _neg_sell_bat_dump_slots( slots, operating_mode=om, purchase_fixed=purchase_fixed_pre, grid=grid, buy_extreme_thr=buy_extreme_thr, degrad_czk_kwh=float(degradation_cost_effective), ) profitable_export_ts_pre: set[int] = set() if om == "AUTO": for _t in range(T): if _t not in discharge_export_slots: continue if _slot_profitable_battery_export( slots[_t], charge_acquisition_czk_kwh=charge_acquisition_czk_kwh, min_spread=min_spread_pre, fixed_tariff=fixed_tariff_like_pre, ): profitable_export_ts_pre.add(_t) if first_neg_sell_idx is not None and first_neg_sell_idx > 0 and floor_pct is not None: # Kotva na ranním peaku (ne na posledním slotu před sell<0) — jinak dump až v 07:30. if ( t_pre_neg_peak is not None and t_pre_neg_peak < first_neg_sell_idx - 1 ): t_anchor = t_pre_neg_peak else: t_anchor = first_neg_sell_idx - 1 soc_anchor_slack = pulp.LpVariable("soc_anchor_slack_wh", 0, float(battery.usable_capacity_wh)) daytime_en = bool(getattr(battery, "planner_daytime_charge_target_enabled", True)) safety_pen_czk_per_wh: list[float] = [] safety_vars: list[Optional[pulp.LpVariable]] = [] safety_active: list[bool] = [] post_neg_pv_topup: list[bool] = [] high_sell_slot: list[bool] = [] for t in range(T): sft = slots[t].safety_soc_target_wh if daytime_en else None # High-sell slot: typicky lokální maximum v SQL lookaheadu (future_sell_opportunity_czk_kwh). # V těchto slotech safety floor nepoužijeme, aby se zachovala arbitráž na špičkách. fso = slots[t].future_sell_opportunity_czk_kwh hs = bool(fso is not None and float(slots[t].sell_price) >= float(fso) - 1e-6) high_sell_slot.append(hs) fb = float(slots[t].future_avoided_buy_czk_kwh or slots[t].buy_price) fs = float(slots[t].future_sell_opportunity_czk_kwh or slots[t].sell_price) bv = max(fb, fs) - float(degradation_cost_effective) bv = max(0.0, min(5.0, bv)) st_d = _prague_calendar_date(slots[t]) ln_neg = last_neg_sell_by_prague_date.get(st_d) pv_topup_after_neg = bool( om == "AUTO" and ln_neg is not None and t > ln_neg and float(slots[t].sell_price) >= 0.0 and bool(slots[t].is_daytime_pv_surplus_slot) and not hs ) post_neg_pv_topup.append(pv_topup_after_neg) # Safety deficit penalizujeme jen v PV surplus slotech, a ne ve high-sell špičce. # Záměr: safety není obecná „nabij co nejdřív“ motivace; je to preference využít přebytek PV. active = bool( ( sft is not None and ( bool(slots[t].is_daytime_pv_surplus_slot) or (planner_v2 and float(slots[t].buy_price) < 0.0) ) and not hs ) or pv_topup_after_neg ) safety_active.append(active) safety_pen_czk_per_wh.append(bv / 1000.0 if active else 0.0) if active: safety_vars.append( pulp.LpVariable(f"safety_def_{t}", 0, float(battery.usable_capacity_wh)) ) else: safety_vars.append(None) commit_pen = float(getattr(battery, "planner_charge_commitment_penalty_czk_kwh", 0.2)) commit_lp: list[tuple[int, pulp.LpVariable, float]] = [] if charge_commitment_prev_w is not None and len(charge_commitment_prev_w) == T: for t in range(T): prev = charge_commitment_prev_w[t] if prev is not None and prev > 500: cap_prev = float(prev) cv = pulp.LpVariable(f"ccommit_{t}", 0, cap_prev) commit_lp.append((t, cv, cap_prev)) peak_export_shortfall: list[tuple[int, pulp.LpVariable, float]] = [] pv_charge_shortfall: list[tuple[int, pulp.LpVariable, float]] = [] neg_sell_bat_dump_shortfall: list[tuple[int, pulp.LpVariable, float]] = [] neg_sell_soc_underfill: list[tuple[int, pulp.LpVariable]] = [] fixed_tariff_like = fixed_tariff_like_pre block_export_neg_sell = bool(getattr(grid, "block_export_on_negative_sell", False)) if om == "AUTO": for t in range(T): if t not in discharge_export_slots: continue if not _slot_profitable_battery_export( slots[t], charge_acquisition_czk_kwh=charge_acquisition_czk_kwh, min_spread=float(degradation_cost_effective), fixed_tariff=fixed_tariff_like, ): continue cap_w = float(min( grid.max_export_power_w, battery.max_discharge_power_w, )) sf = pulp.LpVariable(f"export_shortfall_{t}", 0, cap_w) peak_export_shortfall.append((t, sf, cap_w)) for t in range(T): if float(slots[t].sell_price) >= 0: continue if t not in charge_slots: continue pv_surplus_w = max( 0.0, float(slots[t].pv_a_forecast_w) + float(slots[t].pv_b_forecast_w) - float(slots[t].load_baseline_w), ) if pv_surplus_w <= 500: continue cap_w = float(min(pv_surplus_w, battery.max_charge_power_w)) sf_pv = pulp.LpVariable(f"pv_charge_shortfall_{t}", 0, cap_w) pv_charge_shortfall.append((t, sf_pv, cap_w)) for t in range(T): if not post_neg_pv_topup[t]: continue if float(slots[t].sell_price) < 0: continue pv_surplus_w = max( 0.0, float(slots[t].pv_a_forecast_w) + float(slots[t].pv_b_forecast_w) - float(slots[t].load_baseline_w), ) if pv_surplus_w <= 500: continue cap_w = float(min(pv_surplus_w, battery.max_charge_power_w)) sf_pv = pulp.LpVariable(f"post_neg_pv_shortfall_{t}", 0, cap_w) pv_charge_shortfall.append((t, sf_pv, cap_w)) for t in range(T): if float(slots[t].sell_price) >= 0: continue if t not in charge_slots: continue pv_surplus_w = max( 0.0, float(slots[t].pv_a_forecast_w) + float(slots[t].pv_b_forecast_w) - float(slots[t].load_baseline_w), ) if pv_surplus_w <= 500: continue us = pulp.LpVariable( f"neg_soc_under_{t}", 0, float(battery.usable_capacity_wh), ) neg_sell_soc_underfill.append((t, us)) for t in neg_sell_bat_dump_slots: dump_target_w = min( float(EVENING_BATTERY_EXPORT_MIN_W), float(battery.max_discharge_power_w), float(grid.max_export_power_w), ) sf_dump = pulp.LpVariable(f"neg_bat_dump_shortfall_{t}", 0, dump_target_w) neg_sell_bat_dump_shortfall.append((t, sf_dump, dump_target_w)) # --- Účelová funkce (jen OTE sloty; terminal SoC shadow price na konci horizontu) --- # Kanály: gi×buy, −ge_pv×sell, −ge_bat×sell, +ge_bat×acquisition (export bat. jen v discharge slotách). # Viz docs/04-modules/planning-arbitrage-accounting.md — mezi-slotová arbitráž, ne sell vs buy v jednom slotu. prob += ( pulp.lpSum( gi[t] * slots[t].buy_price * INTERVAL_H / 1000 - ge_pv[t] * slots[t].sell_price * INTERVAL_H / 1000 - ge_bat[t] * slots[t].sell_price * INTERVAL_H / 1000 + ( ge_pv[t] * SELF_SUSTAIN_EXPORT_PENALTY_CZK_KWH * INTERVAL_H / 1000 if om == "SELF_SUSTAIN" else 0 ) + ( (slots[t].pv_b_forecast_w * z_gen_cutoff[t]) * GEN_CUTOFF_PENALTY_CZK_KWH * INTERVAL_H / 1000 if z_gen_cutoff is not None else 0 ) + gi_over[t] * IMPORT_OVER_BREAKER_PENALTY_CZK_KWH * INTERVAL_H / 1000 + 0.5 * (bc_pv[t] + bc_gi[t] + bd[t]) * degradation_cost_effective * INTERVAL_H / 1000 - ( pv_ld[t] * LOAD_FIRST_INCENTIVE_CZK_KWH * INTERVAL_H / 1000 if om == "AUTO" else 0 ) + ( ge_bat[t] * charge_acquisition_czk_kwh * INTERVAL_H / 1000 if om == "AUTO" and t in discharge_export_slots else 0 ) - ( bc_pv[t] * NEG_SELL_PV_CHARGE_REWARD_CZK_KWH * INTERVAL_H / 1000 if ( om == "AUTO" and float(slots[t].sell_price) < 0.0 and t in charge_slots ) else 0 ) + ( ge_pv[t] * NEG_SELL_PV_B_VENT_PENALTY_CZK_KWH * INTERVAL_H / 1000 if ( om == "AUTO" and float(slots[t].sell_price) < 0.0 and not purchase_fixed_pre ) else 0 ) + pulp.lpSum( ev_direct[e][t] * slots[t].buy_price * INTERVAL_H / 1000 + ev_via_bat[e][t] * slots[t].buy_price * EV_ROUNDTRIP_FACTOR * INTERVAL_H / 1000 for e in range(EV) ) + ca[t] * ( NEG_SELL_CURTAIL_PENALTY_CZK_KWH if ( om == "AUTO" and float(slots[t].sell_price) < 0.0 and t in charge_slots ) else ( 0.0 if ( has_pv_b and future_neg_buy_from[t] and float(slots[t].sell_price) < 0.0 ) else CURTAILMENT_PENALTY ) ) for t in range(T) ) + soc_deficit_24h * soc_deficit_penalty_czk_kwh / 1000 - terminal_soc_kcz_per_wh * soc[T - 1] + ( soc_anchor_slack * PRENEG_SELL_SOC_ANCHOR_SLACK_PENALTY_CZK_PER_WH if soc_anchor_slack is not None else 0 ) + pulp.lpSum( safety_vars[t] * safety_pen_czk_per_wh[t] for t in range(T) if safety_vars[t] is not None ) + pulp.lpSum(cv * INTERVAL_H / 1000.0 * commit_pen for _t, cv, _p in commit_lp) + pulp.lpSum( sf * PEAK_EXPORT_SHORTFALL_PENALTY_CZK_KWH * INTERVAL_H / 1000.0 for _t, sf, _cap in peak_export_shortfall ) + pulp.lpSum( sf * PV_CHARGE_SHORTFALL_PENALTY_CZK_KWH * INTERVAL_H / 1000.0 for _t, sf, _cap in pv_charge_shortfall ) + pulp.lpSum( us * NEG_SELL_SOC_UNDERFILL_PENALTY_CZK_PER_WH for _t, us in neg_sell_soc_underfill ) + pulp.lpSum( sf * NEG_SELL_BAT_DUMP_SHORTFALL_PENALTY_CZK_KWH * INTERVAL_H / 1000.0 for _t, sf, _cap in neg_sell_bat_dump_shortfall ) + pulp.lpSum( -25.0 * z_export[t] for t in range(T) if t in discharge_export_slots and t in profitable_export_ts_pre ) ) # --- Omezení --- for t_sf, sf, cap_w in peak_export_shortfall: prob += sf >= cap_w - ge_bat[t_sf] for t_sf, sf, cap_w in pv_charge_shortfall: prob += sf >= cap_w - bc_pv[t_sf] for t_sf, sf, cap_w in neg_sell_bat_dump_shortfall: prob += sf >= cap_w - ge_bat[t_sf] for t_us, us in neg_sell_soc_underfill: prob += us >= float(battery.soc_max_wh) - soc[t_us] preneg_export_min_soc_wh = float(min_soc_wh) + max( float(battery.max_discharge_power_w) * float(battery.discharge_efficiency) * INTERVAL_H, 1000.0, ) if om == "AUTO": profitable_export_ts = profitable_export_ts_pre export_push_w = min( float(EVENING_BATTERY_EXPORT_MIN_W), float(battery.max_discharge_power_w), float(grid.max_export_power_w), ) for t_peak in morning_pre_neg_export_ts: if t_peak in profitable_export_ts: prob += ge_bat[t_peak] >= float(PRENEG_MORNING_EXPORT_MIN_W) * z_export[t_peak] evening_export_push_w = export_push_w evening_push_ts = _evening_battery_export_push_indices( slots, profitable_export_ts=profitable_export_ts, degrad_czk_kwh=float(degradation_cost_effective), ) for t_peak in evening_push_ts: if t_peak not in discharge_export_slots: continue prob += ge_bat[t_peak] >= evening_export_push_w * z_export[t_peak] # Ostatní profitable sloty: jen shortfall penalizace (ne tvrdý push na celý horizont). if t_anchor is not None and soc_anchor_slack is not None: target_floor_wh = float(planner_floor_effective_wh) prob += soc[t_anchor] <= target_floor_wh + soc_anchor_slack for t in range(T): s = slots[t] pv_a_net = s.pv_a_forecast_w - ca[t] ev_total_t = pulp.lpSum(ev_direct[e][t] + ev_via_bat[e][t] for e in range(EV)) # Energetická bilance pv_b_effective = ( float(s.pv_b_forecast_w) * (1 - z_gen_cutoff[t]) if z_gen_cutoff is not None else float(s.pv_b_forecast_w) ) pv_total_ub = float(s.pv_a_forecast_w) + float(s.pv_b_forecast_w) # Součet nabíjení z FVE + ze sítě nesmí překročit max_charge_power_w baterie. prob += bc_pv[t] + bc_gi[t] <= battery.max_charge_power_w # Vybíjení do domu (bd) + export z baterie (ge_bat) sdílí jeden BMS limit. prob += bd[t] + ge_bat[t] <= battery.max_discharge_power_w # Breaker: import ze site je tvrdě omezen (gi_over jen numerická pojistka). prob += gi[t] <= gi_upper if om == "AUTO": load_site_expr = float(s.load_baseline_w) + ev_total_t + hp[t] prob += pv_ld[t] + pv_sp[t] == pv_a_net + pv_b_effective prob += pv_ld[t] <= load_site_expr prob += pv_ld[t] <= pv_a_net + pv_b_effective prob += pv_sp[t] <= pv_total_ub prob += pv_sp[t] >= pv_a_net + pv_b_effective - load_site_expr prob += bc_pv[t] <= pv_sp[t] prob += bc_gi[t] <= gi[t] prob += ge_pv[t] <= pv_sp[t] prob += bc_pv[t] + ge_pv[t] <= pv_sp[t] # Import na deficit po PV→load, nebo na grid-nabíjení (bc_gi). prob += gi[t] <= load_site_expr + bc_gi[t] # Vybíjení do domu až po pv_ld (Deye load-first); v exportních slotech smí bd→síť. if t not in discharge_export_slots: prob += bd[t] <= load_site_expr - pv_ld[t] prob += pv_ld[t] >= load_site_expr - gi[t] - bd[t] # Plná bilance (pv_ld+pv_sp rozpad je ortogonální k tokům přebytku). prob += ( pv_a_net + pv_b_effective + gi[t] + bd[t] == float(s.load_baseline_w) + ev_total_t + hp[t] + bc_pv[t] + bc_gi[t] + ge[t] ) else: prob += pv_ld[t] == 0 prob += pv_sp[t] == pv_a_net + pv_b_effective prob += bc_pv[t] <= pv_sp[t] prob += bc_gi[t] <= gi[t] prob += ( pv_a_net + pv_b_effective + gi[t] + bd[t] == s.load_baseline_w + ev_total_t + hp[t] + bc_pv[t] + bc_gi[t] + ge[t] ) prob += ge[t] == ge_pv[t] + ge_bat[t] # Baterie nesmí „přestrojit“ FVE export: jen z pv_sp (po load-first). if om == "AUTO": prob += ge_bat[t] >= ge[t] - pv_sp[t] else: prob += ge_bat[t] >= ge[t] - (pv_a_net + pv_b_effective) # Měkký breaker cap: gi_over[t] >= max(0, gi[t] - breaker). prob += gi_over[t] >= gi[t] - float(grid.max_import_power_w) # SoC kontinuita (bd do domu i ge_bat do sítě vybíjí baterii) soc_prev = current_soc_wh if t == 0 else soc[t - 1] prob += soc[t] == ( soc_prev + (bc_pv[t] + bc_gi[t]) * battery.charge_efficiency * INTERVAL_H - (bd[t] + ge_bat[t]) / battery.discharge_efficiency * INTERVAL_H ) sv = safety_vars[t] tgt_s = slots[t].safety_soc_target_wh if daytime_en else None if sv is not None: eff_tgt_s = float(tgt_s) if tgt_s is not None else float(min_soc_wh) if ( om == "AUTO" and float(s.sell_price) < 0.0 and t in charge_slots ): # Záporný výkup: dobít na planner soc_max (typicky 95–100 %), ne jen SQL safety ~50 %. eff_tgt_s = max(eff_tgt_s, float(battery.soc_max_wh)) elif post_neg_pv_topup[t]: # Po konci sell<0: dobit z FVE na plno, pak teprve export (kladný sell, ne večerní peak). eff_tgt_s = max(eff_tgt_s, float(battery.soc_max_wh)) prob += sv >= eff_tgt_s - soc[t] # ev_via_bat kryto z discharge prob += pulp.lpSum(ev_via_bat[e][t] for e in range(EV)) <= bd[t] # GEN port cut-off chceme vůbec připustit jen v režimech/politikách, kde má smysl: # - SELF_SUSTAIN (no-export intent; typicky ge=0, takže cut-off je bezpečnostní ventil), # - BLOCK_EXPORT okna (v projektu reprezentované sloty se sell_price < 0), # - případně explicitní no_export politika (pokud bude v kontextu dostupná). allow_gen_cutoff = ( om == "SELF_SUSTAIN" or float(s.sell_price) < 0 or bool(getattr(grid, "no_export", False)) ) if z_gen_cutoff is not None and not allow_gen_cutoff: prob += z_gen_cutoff[t] == 0 # Záporná nákupní cena → import jen na load + nabíjení + EV + TČ (stále ≤ breaker). if s.buy_price < 0: prob += gi[t] <= min( gi_upper, float(s.load_baseline_w) + battery.max_charge_power_w + sum(v.max_charge_power_w for v in vehicles) + heat_pump.rated_heating_power_w, ) # Záporný prodej (sell < 0): výboj baterie jen před extrémně záporným buy (v11). # Export FVE při sell<0: spot = nabíjení/curtail A; ventil jen pole B při plné baterii. if s.sell_price < 0: prob += w_arb[t] == 0 prob += bd[t] <= pulp.lpSum(ev_via_bat[e][t] for e in range(EV)) block_neg_sell_export_t = bool( getattr(grid, "block_export_on_negative_sell", False) ) if t not in neg_sell_bat_dump_slots: prob += ge_bat[t] == 0 ev_cap_neg = sum( float(vehicles[e].max_charge_power_w) for e in range(EV) if (e == 0 and s.ev1_connected) or (e == 1 and s.ev2_connected) ) load_neg = ( float(s.load_baseline_w) + ev_cap_neg + float(heat_pump.rated_heating_power_w) ) pv_surplus_neg_w = max( 0.0, float(s.pv_a_forecast_w) + float(s.pv_b_forecast_w) - load_neg, ) # FVE→síť při záporném výkupu: u KV1 (block_export) jen bc/curtail A; # u home-01 s polem B musí přebytek jít do sítě (ge_pv), jinak infeasible. block_pv_export_neg_sell = bool( getattr(grid, "block_export_on_negative_sell", False) ) or ( float(s.pv_b_forecast_w) <= 0 and not _pv_forced_vent_export_allowed( t, current_soc_wh=current_soc_wh, battery=battery, soc_headroom_wh=soc_headroom_wh, pv_surplus_w=pv_surplus_neg_w, ) ) if block_pv_export_neg_sell: prob += ge_pv[t] == 0 # Tvrdý zákaz vývozu jen při block_export_on_negative_sell (KV1). if block_neg_sell_export_t: prob += ge[t] == 0 prob += ge_pv[t] == 0 prob += ge_bat[t] == 0 elif purchase_fixed_pre: # Fixní nákup + spot výkup (BA81, KV1 bez block_export): sell<0 = platíš za vývoz. prob += ge[t] == 0 prob += ge_pv[t] == 0 elif not purchase_fixed_pre: # Spot (home-01): ge_pv=0 dokud není plná baterie; pak jen ventil pole B (ne celý surplus). soc_prev_neg = current_soc_wh if t == 0 else soc[t - 1] w_pv_b_vent = pulp.LpVariable(f"w_pv_b_vent_neg_{t}", cat=pulp.LpBinary) m_soc_neg = float(battery.soc_max_wh) prob += soc_prev_neg >= ( m_soc_neg - soc_headroom_wh - m_soc_neg * (1 - w_pv_b_vent) ) prob += ge_pv[t] <= float(s.pv_b_forecast_w) * w_pv_b_vent soc_prev_expr = current_soc_wh if t == 0 else soc[t - 1] arb_t = arb_floor_series[t] soc_low_t = soc_panel_min[t] # Při relaxovaném dnu (soc_low pod DB min_soc Wh) nesmí větev w_arb=1 znovu vynutit arb_t # (typicky ~rezerva 20 %) — jinak nejde „vypustit“ baterku k planner floor 5 %. if soc_low_t < min_soc_wh - 1e-3: arb_cap_t = min(arb_t, soc_low_t) else: arb_cap_t = arb_t if om == "AUTO" and t in discharge_export_slots: prob += soc_prev_expr >= ( arb_cap_t - (arb_cap_t - soc_low_t) * (1 - w_arb[t]) ) prob += bd[t] <= ( battery.max_discharge_power_w * w_arb[t] + pulp.lpSum(ev_via_bat[e][t] for e in range(EV)) ) elif om == "AUTO": # PASSIVE: vlastní spotřeba (bd); export baterie jen ge_bat (ge_bat=0 níže). prob += soc_prev_expr >= ( arb_cap_t - (arb_cap_t - soc_low_t) * (1 - w_arb[t]) ) prob += bd[t] <= ( s.load_baseline_w + ev_total_t + hp[t] + bc_pv[t] + bc_gi[t] ) else: prob += soc_prev_expr >= ( arb_cap_t - (arb_cap_t - soc_low_t) * (1 - w_arb[t]) ) prob += bd[t] <= ( s.load_baseline_w + ev_total_t + hp[t] + bc_pv[t] + bc_gi[t] + battery.max_discharge_power_w * w_arb[t] ) # Významný export z baterie ⇒ koncové SoC ≥ podlaha (FVE export ge_pv bez této podlahy). m_ge = float(grid.max_export_power_w) m_soc_bigm = float(battery.usable_capacity_wh) if t in neg_sell_bat_dump_slots: prob += ge_bat[t] <= m_ge else: prob += ge_bat[t] <= m_ge * z_export[t] prob += ge_bat[t] >= GE_MIN_EXPORT_W * z_export[t] # Bez hluboké relaxace: export končí ≥ rezerva. Při hluboké relaxaci (soc_panel_min pod min_soc) # sladit s LP spodkem — jinak z_export vynutil arb_base a blokoval vývoz k planner floor. if ( om == "AUTO" and first_neg_sell_idx is not None and t < first_neg_sell_idx and floor_pct is not None ): export_soc_floor_t = float(planner_floor_effective_wh) elif ( om == "AUTO" and t in morning_pre_neg_export_ts and floor_pct is not None ): export_soc_floor_t = float(planner_floor_effective_wh) elif soc_panel_min[t] < min_soc_wh - 1e-3: export_soc_floor_t = float(soc_panel_min[t]) else: export_soc_floor_t = float(arb_base_wh) # Večerní exportní slot: podlaha jen min_soc (ne safety ramp), aby šlo vybít při z_export=1. if ( om == "AUTO" and t in discharge_export_slots and t in evening_peak_export_ts ): export_soc_floor_t = float(min_soc_wh) # Safety export floor: v běžných (ne high-sell) slotech nevybít exportem energii potřebnou pro # robustnost/noční baseload. Použije se pouze pokud je safety target v SQL vyplněný. tgt_s = slots[t].safety_soc_target_wh if daytime_en else None if ( tgt_s is not None and not high_sell_slot[t] and t not in profitable_export_ts_pre and not ( om == "AUTO" and t in discharge_export_slots and t in evening_peak_export_ts ) ): export_soc_floor_t = max( export_soc_floor_t, min( float(battery.soc_max_wh), max(min_soc_wh, float(tgt_s)), ), ) prob += soc[t] >= export_soc_floor_t - m_soc_bigm * (1 - z_export[t]) # EV – limity a připojení for e in range(EV): connected = ( (e == 0 and s.ev1_connected) or (e == 1 and s.ev2_connected) ) if not connected: prob += ev_direct[e][t] == 0 prob += ev_via_bat[e][t] == 0 else: prob += ev_direct[e][t] + ev_via_bat[e][t] <= vehicles[e].max_charge_power_w for tt, cv, prev in commit_lp: prob += cv >= prev - (bc_pv[tt] + bc_gi[tt]) if om == "SELF_SUSTAIN": for t in range(T): prob += gi[t] <= slots[t].load_baseline_w elif om == "PRESERVE": for t in range(T): prob += bc_pv[t] == 0 prob += bc_gi[t] == 0 prob += bd[t] == 0 elif om == "CHARGE_CHEAP": for t in range(T): prob += ge[t] == 0 prob += ge_pv[t] == 0 prob += ge_bat[t] == 0 prob += bd[t] == 0 # Slot pre-selection (z DB fn_load_planning_slots_full → allow_*) if om == "AUTO": for t in range(T): if t not in charge_slots: s = slots[t] pv_surplus_w = max( 0, int(s.pv_a_forecast_w) + int(s.pv_b_forecast_w) - int(s.load_baseline_w), ) # Mimo grid-charge masku: jen PV přebytek; výjimka záporný buy (spot arbitráž). if float(s.buy_price) >= 0.0: prob += bc_gi[t] == 0 if pv_surplus_w <= 0: prob += bc_pv[t] == 0 else: prob += bc_pv[t] <= float(pv_surplus_w) if t not in discharge_export_slots and t not in neg_sell_bat_dump_slots: prob += ge_bat[t] == 0 prob += z_export[t] == 0 # Ekonomické guardy: ceny v objective nestačí proti maskám / terminal SoC. # Referenční buy jen z ne-záporných slotů: jinak jeden buy<0 v horizontu označí # téměř všechny sloty jako „drahé“ (gi=0 pro dům) → Infeasible (home-01). non_negative_buys = [ float(s.buy_price) for s in slots if float(s.buy_price) >= 0.0 ] ref_buy_horizon = ( min(non_negative_buys) if non_negative_buys else min(float(s.buy_price) for s in slots) ) min_spread = float(degradation_cost_effective) for t in range(T): s = slots[t] buy_t = float(s.buy_price) sell_t = float(s.sell_price) load_t = float(s.load_baseline_w) ev_cap_t = sum( float(vehicles[e].max_charge_power_w) for e in range(EV) if (e == 0 and s.ev1_connected) or (e == 1 and s.ev2_connected) ) pv_surplus_w = max( 0.0, float(s.pv_a_forecast_w) + float(s.pv_b_forecast_w) - load_t, ) # FVE export: před prvním sell<0 smí jít přebytek do sítě (kladný sell), pak nabít # v záporném okně z PV. Jinak držet energii na future_sell peak. allow_pre_neg_pv_export = ( first_neg_sell_idx is not None and pre_neg_export_last_t is not None and t <= pre_neg_export_last_t and sell_t >= 0 ) pv_store_val = _pv_store_value_czk_kwh(s, min_spread) skip_pv_store_block = ( float(s.pv_b_forecast_w) > 0 and not getattr(grid, "block_export_on_negative_sell", False) and sell_t < 0 and not purchase_fixed_pre ) or ( # KV1: plná baterie + kladný sell — neblokovat ge_pv==0 (jinak masivní curtail). getattr(grid, "block_export_on_negative_sell", False) and sell_t >= 0 and pv_surplus_w > 500 ) # BA81: export pole B jen při kladném sell (po sell<0 jinak ge==0 výše). fixed_pv_b_export_cap = ( purchase_fixed_pre and float(s.pv_b_forecast_w) > 0 and not getattr(grid, "block_export_on_negative_sell", False) and sell_t >= 0 ) if fixed_pv_b_export_cap: if z_gen_cutoff is not None: prob += ge_pv[t] <= float(s.pv_b_forecast_w) * (1 - z_gen_cutoff[t]) else: prob += ge_pv[t] <= max(0.0, float(s.pv_b_forecast_w)) if ( not allow_pre_neg_pv_export and not skip_pv_store_block and not fixed_pv_b_export_cap and sell_t < pv_store_val and not _pv_forced_vent_export_allowed( t, current_soc_wh=current_soc_wh, battery=battery, soc_headroom_wh=soc_headroom_wh, pv_surplus_w=pv_surplus_w, ) ): prob += ge_pv[t] == 0 # Drahý nákup: dům + TČ z baterie (ne import ze sítě); síť jen EV (+ případně TČ). # Spot (home-01): buy > min ne-záporného buy v horizontu. # Fixní tarif (KV1): navíc buy > charge_acquisition (konstantní buy ≈ ref). expensive_import_slot = buy_t > ref_buy_horizon + min_spread if fixed_tariff_like_pre: expensive_import_slot = expensive_import_slot or ( buy_t > charge_acquisition_czk_kwh + min_spread ) if expensive_import_slot and t not in charge_slots and buy_t >= 0.0: # Strict: síť jen EV+TČ; baseload z baterie/FVE. Relaxed: síť smí krmit baseload (nouzový režim). prob += gi[t] <= ev_cap_t + hp[t] + ( float(s.load_baseline_w) if relaxed_expensive_import else 0.0 ) if not relaxed_expensive_import and om == "AUTO": prob += ( bd[t] + pv_ld[t] >= float(s.load_baseline_w) + hp[t] ) # Anti souběžný vývoz FVE + významný import (mikrocyklus). if buy_t > sell_t + min_spread and pv_surplus_w > 0: prob += ge_pv[t] <= pv_surplus_w # Deadline constraints pro EV for e, session in enumerate(ev_sessions): if session and session.target_deadline and session.energy_needed_wh > 0: t_dl = next( (t for t, s in enumerate(slots) if s.interval_start >= session.target_deadline), T - 1 ) prob += pulp.lpSum( (ev_direct[e][t] + ev_via_bat[e][t]) * INTERVAL_H for t in range(t_dl + 1) if (e == 0 and slots[t].ev1_connected) or (e == 1 and slots[t].ev2_connected) ) >= session.energy_needed_wh # TUV look-ahead podle tuv_usage_stats (DOW+hodina, konvence jako v DB) if ( tuv_delta_stats and heat_pump.rated_heating_power_w > 0 and getattr(heat_pump, "tuv_min_temp_c", 0) is not None ): tuv_pred = float(current_tuv_temp_c) tgt = float(getattr(heat_pump, "tuv_target_temp_c", 55.0) or 55.0) thr = float(heat_pump.tuv_min_temp_c) + 5.0 for t in range(T): dow, hour = _prague_dow_hour(slots[t].interval_start) delta = tuv_delta_stats.get((dow, hour), -0.1) tuv_pred += float(delta) * INTERVAL_H if tuv_pred < thr: prob += ( pulp.lpSum(hp[s] for s in range(max(0, t - 8), t + 1)) >= heat_pump.rated_heating_power_w * 0.5 ) tuv_pred = tgt # Nouzový ohřev TUV if current_tuv_temp_c < heat_pump.tuv_min_temp_c: prob += hp[0] >= heat_pump.rated_heating_power_w * 0.8 # SoC bezpečnostní buffer vyhodnocený až na konci 24h horizontu eod_idx = min(T - 1, int(24 / INTERVAL_H) - 1) prob += soc_deficit_24h >= soc_buffer_target_wh - soc[eod_idx] # --- Řešení (HiGHS přes highspy / PuLP API; bez externí binárky HiGHS_CMD) --- t_start = time.monotonic() try: solver = pulp.getSolver( "HiGHS", msg=False, timeLimit=SOLVER_TIME_LIMIT ) except Exception: logger.warning("HiGHS nedostupný, používám CBC fallback") solver = pulp.PULP_CBC_CMD(msg=False, timeLimit=SOLVER_TIME_LIMIT) status = prob.solve(solver) duration_ms = int((time.monotonic() - t_start) * 1000) if pulp.LpStatus[status] != "Optimal": if not relaxed_expensive_import: logger.warning( "solve_dispatch Infeasible, retry with relaxed_expensive_import " "(grid may supply baseload in expensive slots)" ) return solve_dispatch( slots, battery, heat_pump, grid, ev_sessions, vehicles, current_soc_wh, current_tuv_temp_c, tuv_delta_stats=tuv_delta_stats, operating_mode=operating_mode, charge_commitment_prev_w=charge_commitment_prev_w, planner_version=planner_version, relaxed_expensive_import=True, ) raise RuntimeError(f"Solver: {pulp.LpStatus[status]}") # --- Post-processing --- results = [] for t in range(T): hp_raw = pulp.value(hp[t]) hp_on = hp_raw > heat_pump.rated_heating_power_w * 0.3 bc_tot = float(pulp.value(bc_pv[t]) or 0) + float(pulp.value(bc_gi[t]) or 0) batt_w = round(bc_tot - float(pulp.value(bd[t]) or 0)) grid_w = round(pulp.value(gi[t]) - pulp.value(ge[t])) soc_pct = round(pulp.value(soc[t]) / battery.usable_capacity_wh * 100, 1) export_limit_w = int(grid.max_export_power_w) if grid_w < 0 else 0 ge_bat_w = round(float(pulp.value(ge_bat[t]) or 0)) export_mode = "NONE" if grid_w < 0: export_mode = ( "BATTERY_SELL" if ge_bat_w >= GE_MIN_EXPORT_W else "PV_SURPLUS" ) # Deye: default PASSIVE (střídač pokryje load). CHARGE/SELL jen v maskovaných AUTO slotech. deye_mode = "PASSIVE" if om == "AUTO": if ( slots[t].allow_discharge_export and batt_w < 0 and grid_w < 0 ): deye_mode = "SELL" elif slots[t].allow_charge and batt_w > 0 and grid_w > 0: deye_mode = "CHARGE" elif batt_w < 0 and grid_w < 0: deye_mode = "SELL" elif batt_w > 0 and grid_w > 0: deye_mode = "CHARGE" deye_gen_cutoff = None if z_gen_cutoff is not None: deye_gen_cutoff = bool(round(float(pulp.value(z_gen_cutoff[t]) or 0))) cost = ( pulp.value(gi[t]) * slots[t].buy_price * INTERVAL_H / 1000 - pulp.value(ge[t]) * slots[t].sell_price * INTERVAL_H / 1000 ) results.append(DispatchResult( interval_start = slots[t].interval_start, battery_setpoint_w = batt_w, battery_soc_target = soc_pct, grid_setpoint_w = grid_w, export_limit_w = export_limit_w, export_mode = export_mode, deye_physical_mode = deye_mode, deye_gen_cutoff_enabled = deye_gen_cutoff, ev1_setpoint_w = round(pulp.value(ev_direct[0][t]) + pulp.value(ev_via_bat[0][t])) if slots[t].ev1_connected else None, ev2_setpoint_w = round(pulp.value(ev_direct[1][t]) + pulp.value(ev_via_bat[1][t])) if slots[t].ev2_connected else None, ev1_via_bat_w = round(pulp.value(ev_via_bat[0][t])), ev2_via_bat_w = round(pulp.value(ev_via_bat[1][t])), heat_pump_enabled = hp_on, heat_pump_setpoint_w = heat_pump.rated_heating_power_w if hp_on else 0, pv_a_curtailed_w = round(pulp.value(ca[t])), expected_cost_czk = round(cost, 4), effective_buy_price = slots[t].buy_price, effective_sell_price = slots[t].sell_price, is_predicted_price = bool(slots[t].is_predicted_price), )) sell_rank = sorted(range(T), key=lambda i: float(slots[i].sell_price), reverse=True)[: min(3, T)] charge_commit_snapshot = [ { "slot": slots[tt].interval_start.isoformat(), "previous_charge_w": prev, "shortfall_w": float(pulp.value(cv) or 0.0), } for tt, cv, prev in commit_lp ] masks_snap: list[dict[str, Any]] = [] soc_bounds_snap: list[dict[str, Any]] = [] objective_terms_snap: list[dict[str, Any]] = [] for t in range(T): st = slots[t] masks_snap.append( { "slot": st.interval_start.isoformat(), "allow_charge": bool(st.allow_charge), "allow_discharge_export": bool(st.allow_discharge_export), } ) tgt_s = st.safety_soc_target_wh if daytime_en else None # Export floor pro debug snapshot (kopie logiky z constraintů výše). if soc_panel_min[t] < min_soc_wh - 1e-3: export_floor_wh = float(soc_panel_min[t]) export_floor_reason = "deep_relax" else: export_floor_wh = float(arb_base_wh) export_floor_reason = "arb_base" if tgt_s is not None and not high_sell_slot[t]: export_floor_wh = max( export_floor_wh, min( float(battery.soc_max_wh), max(min_soc_wh, float(tgt_s)), ), ) export_floor_reason = "safety_export_floor" soc_bounds_snap.append( { "slot": st.interval_start.isoformat(), "soc_min_wh": float(soc_panel_min[t]), "arb_floor_wh": float(arb_floor_series[t]), "soc_panel_min_wh": float(soc_panel_min[t]), "safety_soc_target_wh": float(tgt_s) if tgt_s is not None else None, "export_soc_floor_wh": float(export_floor_wh), "export_floor_reason": export_floor_reason, "high_sell_slot": bool(high_sell_slot[t]), } ) fb = float(st.future_avoided_buy_czk_kwh or st.buy_price) fs = float(st.future_sell_opportunity_czk_kwh or st.sell_price) bv = max(fb, fs) - float(degradation_cost_effective) bv = max(0.0, min(5.0, bv)) pen_wh = bv / 1000.0 if tgt_s is not None else 0.0 sv = safety_vars[t] sdv = float(pulp.value(sv) or 0.0) if sv is not None else None cshort = next((float(pulp.value(cv) or 0.0) for tt, cv, _p in commit_lp if tt == t), None) objective_terms_snap.append( { "slot": st.interval_start.isoformat(), "buy_price": float(st.buy_price), "sell_price": float(st.sell_price), "future_avoided_buy_czk_kwh": float(st.future_avoided_buy_czk_kwh or st.buy_price), "future_sell_opportunity_czk_kwh": float( st.future_sell_opportunity_czk_kwh or st.sell_price ), "battery_value_czk_kwh": float(bv), "safety_deficit_penalty_czk_per_wh": float(pen_wh) if safety_active[t] else 0.0, "safety_penalty_active": bool(safety_active[t]), "safety_deficit_wh": sdv, "commitment_shortfall_w": cshort, "commitment_penalty_czk_kwh": float(commit_pen) if cshort is not None else None, } ) night0 = slots[0] solver_snapshot: dict[str, Any] = { "version": 1, "planner_build_tag": PLANNER_BUILD_TAG, "inputs": { "current_soc_wh": float(current_soc_wh), "soc_headroom_applied_wh": soc_headroom_applied_wh, "operating_mode": operating_mode, "planner_version": planner_version_resolved, "battery": { "usable_capacity_wh": float(battery.usable_capacity_wh), "min_soc_wh": float(battery.min_soc_wh), "reserve_soc_wh": float(getattr(battery, "reserve_soc_wh", 0.0)), "degradation_cost_czk_kwh": float(battery.degradation_cost_czk_kwh), "planner_terminal_soc_value_factor": float(battery.planner_terminal_soc_value_factor), "planner_daytime_charge_target_enabled": daytime_en, "planner_charge_commitment_penalty_czk_kwh": float(commit_pen), }, "load_first_enabled": om == "AUTO", "relaxed_expensive_import": relaxed_expensive_import, "charge_acquisition_buy_czk_kwh": charge_acquisition_czk_kwh, "charge_acquisition_cutoff_at": ( slots[0].charge_acquisition_cutoff_at.isoformat() if slots[0].charge_acquisition_cutoff_at is not None else None ), }, "masks": masks_snap, "soc_bounds": soc_bounds_snap, "objective_terms": objective_terms_snap, "chosen_slots": { "charge_commitment": charge_commit_snapshot, "high_sell_windows": [slots[i].interval_start.isoformat() for i in sell_rank], "night_window": { "definition": "Europe/Prague 20:00–06:00 projected baseload Wh (fn_load_planning_slots_full)", "target_wh": night0.night_baseload_target_wh, "buffer_wh": night0.night_baseload_buffer_wh, }, }, } return results, duration_ms, solver_snapshot # ============================================================ # Denní plán (15:00) # ============================================================ async def run_daily_plan( site_id: int, db, triggered_by: str = "scheduler:daily", *, planner_version: str | None = None, ) -> tuple[int, int]: """ Hlavní denní plánování. Spouštět v 15:00 po importu cen (14:00) a aktualizaci forecastu (14:30). Horizont: `ems.fn_planning_horizon_end` (OTE, strop a práh v SQL). """ now = datetime.now(timezone.utc) horizon_from = _current_slot_start(now) horizon_to = await _planning_horizon_end(site_id, horizon_from, db) if horizon_to is None: horizon_to = horizon_from + timedelta(hours=_DAILY_FALLBACK_HORIZON_HOURS) logger.warning( "[site=%s] Daily plan: fn_planning_horizon_end NULL, fallback %.1fh", site_id, _DAILY_FALLBACK_HORIZON_HOURS, ) logger.info(f"[site={site_id}] Daily plan: {horizon_from} → {horizon_to}") battery, hp, grid, vehicles, ev_sessions, soc_wh, tuv_temp, operating_mode, tuv_stats = ( await _load_site_context(site_id, db) ) planner_version_resolved = _planner_engine_version(planner_version) slots = await _load_slots(site_id, horizon_from, horizon_to, db, soc_wh=soc_wh) om = operating_mode or "AUTO" if om == "AUTO": results, duration_ms, solver_snapshot = solve_dispatch_two_pass( slots, battery, hp, grid, ev_sessions, vehicles, soc_wh, tuv_temp, tuv_delta_stats=tuv_stats, operating_mode=om, planner_version=planner_version_resolved, ) else: results, duration_ms, solver_snapshot = solve_dispatch( slots, battery, hp, grid, ev_sessions, vehicles, soc_wh, tuv_temp, tuv_delta_stats=tuv_stats, operating_mode=om, planner_version=planner_version_resolved, ) comparison_ctx = _maybe_add_planner_comparison( slots=slots, battery=battery, heat_pump=hp, grid=grid, ev_sessions=ev_sessions, vehicles=vehicles, current_soc_wh=soc_wh, current_tuv_temp_c=tuv_temp, operating_mode=om, tuv_delta_stats=tuv_stats, active_version=planner_version_resolved, ) if comparison_ctx is not None: peer_results = comparison_ctx["peer_results"] peer_ms = comparison_ctx["peer_ms"] peer_snapshot = comparison_ctx["peer_snapshot"] solver_snapshot["comparison"] = _dispatch_result_comparison( results, duration_ms, planner_version_resolved, peer_results, peer_ms, comparison_ctx["peer_version"], ) slot_inputs = _build_slot_inputs(slots, slots) run_id = await _save_planning_run( site_id, results, horizon_from, horizon_to, run_type="daily", triggered_by=triggered_by, replan_from=None, soc_wh=soc_wh, duration_ms=duration_ms, correction=1.0, db=db, slot_inputs=slot_inputs, solver_snapshot=solver_snapshot, ) if comparison_ctx is not None: compare_snapshot = dict(peer_snapshot) compare_snapshot["comparison_of_run_id"] = run_id compare_snapshot["compare_peer_version"] = comparison_ctx["peer_version"] await _save_planning_run( site_id, comparison_ctx["peer_results"], horizon_from, horizon_to, run_type="daily", triggered_by=f"{triggered_by}:compare", replan_from=None, soc_wh=soc_wh, duration_ms=comparison_ctx["peer_ms"], correction=1.0, db=db, slot_inputs=slot_inputs, activate_run=False, solver_snapshot=compare_snapshot, ) logger.info(f"[site={site_id}] Daily plan done in {duration_ms} ms") return run_id, duration_ms # ============================================================ # Rolling replan (každých 15min) # ============================================================ async def run_rolling_replan( site_id: int, db, *, triggered_by: str = "scheduler:rolling", allow_skip: bool = True, planner_version: str | None = None, ) -> tuple[Optional[int], Optional[int]]: """ Rolling replan každých 15 minut. 1. Zjistí aktuální SoC baterie z telemetrie 2. Spočítá korekční faktor FVE forecastu z poslední hodiny 3. Aplikuje korekci na forecast zbytku dne (s útlumem) 4. Spustí solver pro zbývající horizont aktivního plánu 5. Uloží jako nový planning_run (aktivní plán se stane superseded) Pokud allow_skip=True (scheduler) a horizont je vyčerpaný → vrátí (None, None). Pokud allow_skip=False (API) → spustí denní plán jako náhradu. """ now = datetime.now(timezone.utc) replan_from = _current_slot_start(now) planner_version_resolved = _planner_engine_version(planner_version) ar_raw = await db.fetchval( "select ems.fn_planning_active_run($1::int)", site_id, ) ar = ar_raw if isinstance(ar_raw, dict) else json.loads(ar_raw) if ar.get("error") == "no_active_plan": logger.warning(f"[site={site_id}] Rolling replan: no active plan, triggering daily plan") return await run_daily_plan( site_id, db, triggered_by=triggered_by, planner_version=planner_version_resolved, ) horizon_to = await _planning_horizon_end(site_id, replan_from, db) if horizon_to is None: if allow_skip: logger.info( "[site=%s] Rolling replan: fn_planning_horizon_end NULL (krátký OTE horizont), skipping", site_id, ) return None, None logger.warning( "[site=%s] Rolling replan: fn_planning_horizon_end NULL, running daily plan", site_id, ) return await run_daily_plan( site_id, db, triggered_by=triggered_by, planner_version=planner_version_resolved, ) if (horizon_to - replan_from).total_seconds() < 1800: if allow_skip: logger.info(f"[site={site_id}] Rolling replan: horizon almost exhausted, skipping") return None, None logger.info(f"[site={site_id}] Rolling replan: horizon exhausted, running daily plan") return await run_daily_plan( site_id, db, triggered_by=triggered_by, planner_version=planner_version_resolved, ) logger.info(f"[site={site_id}] Rolling replan from {replan_from} → {horizon_to}") battery, hp, grid, vehicles, ev_sessions, soc_wh, tuv_temp, operating_mode, tuv_stats = ( await _load_site_context(site_id, db) ) slots = await _load_slots(site_id, replan_from, horizon_to, db, soc_wh=soc_wh) # PV forecast korekce je kanonicky v DB (delta + rolling faktor + decay) a do LP vstupuje přes # ems.fn_load_planning_slots_full. Pro audit/debug ale chceme ukládat i RAW (bez korekcí). correction_factor, correction_log = 1.0, { "window_start": None, "window_end": None, "actual_pv_wh": None, "forecast_pv_wh": None, "correction_factor": None, "reason": "canonical_db", } # RAW PV pro slot_inputs: přímý součet nejnovějších forecast_pv_interval per array/slot (bez delta/rolling). raw_pv_rows = await db.fetchval( "select ems.fn_forecast_pv_slots_range_raw_ab($1::int, $2::timestamptz, $3::timestamptz)", site_id, replan_from, horizon_to, ) raw_pv = raw_pv_rows if isinstance(raw_pv_rows, list) else json.loads(raw_pv_rows) raw_by_ts: dict[str, tuple[int, int]] = {} if isinstance(raw_pv, list): for r in raw_pv: if not isinstance(r, dict): continue ts = r.get("interval_start") if isinstance(ts, str): raw_by_ts[ts] = ( int(r.get("pv_a_forecast_raw_w") or 0), int(r.get("pv_b_forecast_raw_w") or 0), ) slots_raw_pv: list[PlanningSlot] = [] for s in slots: key = s.interval_start.isoformat() pva, pvb = raw_by_ts.get(key, (s.pv_a_forecast_w, s.pv_b_forecast_w)) slots_raw_pv.append(replace(s, pv_a_forecast_w=pva, pv_b_forecast_w=pvb)) commitment_prev = await _load_previous_plan_charge_commitment_prev_w(site_id, slots, db) om = operating_mode or "AUTO" if om == "AUTO": results, duration_ms, solver_snapshot = solve_dispatch_two_pass( slots, battery, hp, grid, ev_sessions, vehicles, soc_wh, tuv_temp, tuv_delta_stats=tuv_stats, operating_mode=om, charge_commitment_prev_w=commitment_prev, planner_version=planner_version_resolved, ) else: results, duration_ms, solver_snapshot = solve_dispatch( slots, battery, hp, grid, ev_sessions, vehicles, soc_wh, tuv_temp, tuv_delta_stats=tuv_stats, operating_mode=om, charge_commitment_prev_w=commitment_prev, planner_version=planner_version_resolved, ) comparison_ctx = _maybe_add_planner_comparison( slots=slots, battery=battery, heat_pump=hp, grid=grid, ev_sessions=ev_sessions, vehicles=vehicles, current_soc_wh=soc_wh, current_tuv_temp_c=tuv_temp, operating_mode=om, tuv_delta_stats=tuv_stats, active_version=planner_version_resolved, charge_commitment_prev_w=commitment_prev, ) if comparison_ctx is not None: peer_results = comparison_ctx["peer_results"] peer_ms = comparison_ctx["peer_ms"] solver_snapshot["comparison"] = _dispatch_result_comparison( results, duration_ms, planner_version_resolved, peer_results, peer_ms, comparison_ctx["peer_version"], ) slot_inputs = _build_slot_inputs(slots_raw_pv, slots) run_id = await _save_planning_run( site_id, results, replan_from, horizon_to, run_type="rolling", triggered_by=triggered_by, replan_from=replan_from, soc_wh=soc_wh, duration_ms=duration_ms, correction=correction_factor, db=db, slot_inputs=slot_inputs, solver_snapshot=solver_snapshot, ) if comparison_ctx is not None: compare_snapshot = dict(comparison_ctx["peer_snapshot"]) compare_snapshot["comparison_of_run_id"] = run_id compare_snapshot["compare_peer_version"] = comparison_ctx["peer_version"] await _save_planning_run( site_id, comparison_ctx["peer_results"], replan_from, horizon_to, run_type="rolling", triggered_by=f"{triggered_by}:compare", replan_from=replan_from, soc_wh=soc_wh, duration_ms=comparison_ctx["peer_ms"], correction=correction_factor, db=db, slot_inputs=slot_inputs, activate_run=False, solver_snapshot=compare_snapshot, ) # Historický log rolling korekce: dřív se psal z Pythonu. Nově se rolling faktor počítá v DB # v kanonické PV řadě; log se případně přesune do DB (todo). logger.info(f"[site={site_id}] Rolling replan done in {duration_ms} ms (pv=canonical_db)") return run_id, duration_ms async def run_plan_api( site_id: int, plan_type: str, db, *, triggered_by: str = "api", planner_version: str | None = None, ) -> tuple[int, int]: """Ruční / UI spuštění plánu. Vždy vrátí (run_id, solver_duration_ms).""" pt = plan_type.lower().strip() planner_version_resolved = _planner_engine_version(planner_version) if pt == "daily": return await run_daily_plan( site_id, db, triggered_by=triggered_by, planner_version=planner_version_resolved, ) if pt == "rolling": rid, ms = await run_rolling_replan( site_id, db, triggered_by=triggered_by, allow_skip=False, planner_version=planner_version_resolved, ) if rid is None or ms is None: raise RuntimeError("Rolling replan did not return a run") return rid, ms raise ValueError(f"Unknown plan_type: {plan_type!r} (use daily or rolling)") # ============================================================ # Pomocné funkce # ============================================================ def _current_slot_start(dt: datetime) -> datetime: """Zaokrouhlí čas dolů na začátek aktuálního 15min slotu.""" minute = (dt.minute // 15) * 15 return dt.replace(minute=minute, second=0, microsecond=0) def _parse_json_dt(val: object) -> Optional[datetime]: if val is None: return None if isinstance(val, datetime): return val if val.tzinfo else val.replace(tzinfo=timezone.utc) return datetime.fromisoformat(str(val).replace("Z", "+00:00")) def _ev_session_from_json(obj: object) -> Optional[SimpleNamespace]: if obj is None or obj == []: return None if isinstance(obj, str): obj = json.loads(obj) if not isinstance(obj, dict): return None td = _parse_json_dt(obj.get("target_deadline")) if td is None: return None return SimpleNamespace( target_deadline=td, energy_needed_wh=float(obj["energy_needed_wh"]), ) async def _load_site_context(site_id: int, db): """ Načte baterii, TČ, síť, 2× vozidlo, otevřené EV session, SoC, TUV, režim a TUV statistiky (SQL). """ raw = await db.fetchval( "select ems.fn_planning_site_context($1::int)", site_id, ) ctx = raw if isinstance(raw, dict) else json.loads(raw) if ctx.get("error") == "unknown_site": raise RuntimeError(f"Site not found: {site_id}") b = ctx["battery"] ec_i = int(b["max_charge_power_w"]) ed_i = int(b["max_discharge_power_w"]) planner_soc_max = float(b.get("planner_soc_max_wh", b["soc_max_wh"])) floor_pct = b.get("planner_discharge_floor_percent") buy_thr = b.get("planner_extreme_buy_threshold_czk_kwh") relax_prewin = b.get("planner_discharge_relax_prewindow_slots") battery = SimpleNamespace( usable_capacity_wh=float(b["usable_capacity_wh"]), min_soc_wh=float(b["min_soc_wh"]), arb_floor_wh=float(b["arb_floor_wh"]), reserve_soc_wh=float(b["reserve_soc_wh"]), soc_max_wh=planner_soc_max, charge_efficiency=float(b["charge_efficiency"]), discharge_efficiency=float(b["discharge_efficiency"]), degradation_cost_czk_kwh=float(b["degradation_cost_czk_kwh"]), max_charge_power_w=ec_i, max_discharge_power_w=ed_i, charge_slot_buffer=float(b["charge_slot_buffer"]) if b.get("charge_slot_buffer") is not None else 0, discharge_slot_buffer=float(b["discharge_slot_buffer"]) if b.get("discharge_slot_buffer") is not None else 0, planner_extreme_buy_threshold_czk_kwh=float(buy_thr) if buy_thr is not None else -5.0, planner_discharge_floor_percent=float(floor_pct) if floor_pct is not None else None, planner_discharge_relax_prewindow_slots=int(relax_prewin) if relax_prewin is not None else DEFAULT_PLANNER_DISCHARGE_RELAX_PREWINDOW_SLOTS, planner_terminal_soc_value_factor=float(b["planner_terminal_soc_value_factor"]), planner_daytime_charge_target_enabled=bool( b.get("planner_daytime_charge_target_enabled", True) ), planner_night_baseload_buffer_percent=float( b.get("planner_night_baseload_buffer_percent") or 20.0 ), planner_daytime_charge_price_quantile=float( b.get("planner_daytime_charge_price_quantile") or 0.70 ), planner_charge_commitment_penalty_czk_kwh=float( b.get("planner_charge_commitment_penalty_czk_kwh") or 0.20 ), ) hpj = ctx["heat_pump"] heat_pump = SimpleNamespace( rated_heating_power_w=int(hpj["rated_heating_power_w"]), tuv_min_temp_c=float(hpj["tuv_min_temp_c"]), tuv_target_temp_c=float(hpj["tuv_target_temp_c"]), ) g = ctx["grid"] m = ctx.get("market") or {} grid = SimpleNamespace( max_import_power_w=int(g["max_import_power_w"]), max_export_power_w=int(g["max_export_power_w"]), block_export_on_negative_sell=bool(g.get("block_export_on_negative_sell") or False), deye_gen_microinverter_cutoff_enabled=bool(g.get("deye_gen_microinverter_cutoff_enabled") or False), purchase_pricing_mode=str(m.get("purchase_pricing_mode") or "spot").strip().lower(), sale_pricing_mode=str(m.get("sale_pricing_mode") or "spot").strip().lower(), ) vehicles: list[SimpleNamespace] = [] for v in ctx.get("vehicles") or []: vehicles.append( SimpleNamespace( max_charge_power_w=int(v["max_charge_power_w"]), battery_capacity_kwh=float(v["battery_capacity_kwh"]), default_target_soc_pct=float(v["default_target_soc_pct"]), ) ) while len(vehicles) < 2: vehicles.append( SimpleNamespace( max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0, ) ) ev_raw = ctx.get("ev_sessions") or [] ev_sessions = [ _ev_session_from_json(ev_raw[0]) if len(ev_raw) > 0 else None, _ev_session_from_json(ev_raw[1]) if len(ev_raw) > 1 else None, ] soc_wh = float(ctx["soc_wh"]) tuv_temp = float(ctx["tuv_temp"]) operating_mode = ctx.get("operating_mode") tuv_stats: dict[tuple[int, int], float] = {} for row in ctx.get("tuv_delta_stats") or []: tuv_stats[(int(row["dow"]), int(row["hour"]))] = float(row["delta"]) return ( battery, heat_pump, grid, vehicles, ev_sessions, soc_wh, tuv_temp, operating_mode, tuv_stats, ) async def _load_previous_plan_charge_commitment_prev_w( site_id: int, slots: list[PlanningSlot], db, ) -> list[Optional[float]]: """ Pro rolling replan: z aktivního plánu načte battery_setpoint_w pro shodné sloty. Kotva měkkého commitmentu jen když předchozí plán chtěl nabíjet z PV přebytku (viz podmínky). """ if not slots: return [] rows = await db.fetch( """ select pi.interval_start, pi.battery_setpoint_w, pi.grid_setpoint_w, coalesce(pi.pv_a_forecast_solver_w, 0) as pva, coalesce(pi.pv_b_forecast_solver_w, 0) as pvb, coalesce(pi.load_baseline_w, 0) as lb from ems.planning_interval pi inner join ems.planning_run pr on pr.id = pi.run_id where pr.site_id = $1::int and pr.status = 'active' """, site_id, ) by_start = {r["interval_start"]: r for r in rows} out: list[Optional[float]] = [] for s in slots: r = by_start.get(s.interval_start) if r is None: out.append(None) continue bw = int(r["battery_setpoint_w"] or 0) gw = int(r["grid_setpoint_w"] or 0) pva = int(r["pva"] or 0) pvb = int(r["pvb"] or 0) lb = int(r["lb"] or 0) # Commitment má kotvit jen „nabíjení z PV přebytku“, ne situace kdy plán současně # výrazně exportuje do sítě (typicky charge while exporting). To by stabilizovalo špatný cyklus. if bw > 500 and (pva + pvb) > lb and gw <= 0 and gw >= -500: out.append(float(bw)) else: out.append(None) return out async def _load_slots( site_id: int, from_dt: datetime, to_dt: datetime, db, *, soc_wh: float, ) -> list[PlanningSlot]: """15min sloty z ems.fn_load_planning_slots_full.""" rows = await db.fetch( """ select slot_ord, interval_start, buy_price, sell_price, is_predicted_price, pv_a_forecast_w, pv_b_forecast_w, load_baseline_w, ev1_connected, ev2_connected, allow_charge, allow_discharge_export, night_baseload_target_wh, night_baseload_buffer_wh, safety_soc_target_wh, future_avoided_buy_czk_kwh, future_sell_opportunity_czk_kwh, is_daytime_pv_surplus_slot, charge_acquisition_buy_czk_kwh, charge_acquisition_cutoff_at from ems.fn_load_planning_slots_full( $1::int, $2::timestamptz, $3::timestamptz, $4::numeric ) """, site_id, from_dt, to_dt, soc_wh, ) out: list[PlanningSlot] = [] for r in rows: d = dict(r) out.append( PlanningSlot( interval_start=d["interval_start"], buy_price=float(d["buy_price"]), sell_price=float(d["sell_price"]), pv_a_forecast_w=int(d["pv_a_forecast_w"] or 0), pv_b_forecast_w=int(d["pv_b_forecast_w"] or 0), load_baseline_w=int(d["load_baseline_w"] or 0), ev1_connected=bool(d["ev1_connected"]), ev2_connected=bool(d["ev2_connected"]), is_predicted_price=bool(d.get("is_predicted_price")), allow_charge=bool(d.get("allow_charge", True)), allow_discharge_export=bool(d.get("allow_discharge_export", True)), night_baseload_target_wh=_slot_float_nullable(d, "night_baseload_target_wh"), night_baseload_buffer_wh=_slot_float_nullable(d, "night_baseload_buffer_wh"), safety_soc_target_wh=_slot_float_nullable(d, "safety_soc_target_wh"), future_avoided_buy_czk_kwh=_slot_float_nullable(d, "future_avoided_buy_czk_kwh"), future_sell_opportunity_czk_kwh=_slot_float_nullable( d, "future_sell_opportunity_czk_kwh" ), is_daytime_pv_surplus_slot=bool(d.get("is_daytime_pv_surplus_slot", False)), charge_acquisition_buy_czk_kwh=_slot_float_nullable( d, "charge_acquisition_buy_czk_kwh" ), charge_acquisition_cutoff_at=d.get("charge_acquisition_cutoff_at"), ) ) if not out: raise RuntimeError( "No planning slots available – check market prices and horizon settings" ) if any(s.is_predicted_price for s in out): logger.warning( "[site=%s] Unexpected predicted-price slots in planning horizon", site_id, ) return out def _build_slot_inputs( slots_raw_pv: list[PlanningSlot], slots_solver: list[PlanningSlot], ) -> list[tuple[int, int, int, int, int]]: """(load_baseline_w, pv_a_raw, pv_b_raw, pv_a_solver, pv_b_solver) pro každý slot.""" if len(slots_raw_pv) != len(slots_solver): raise ValueError("slots_raw_pv and slots_solver length mismatch") out: list[tuple[int, int, int, int, int]] = [] for raw, sol in zip(slots_raw_pv, slots_solver): out.append( ( int(raw.load_baseline_w), int(raw.pv_a_forecast_w), int(raw.pv_b_forecast_w), int(sol.pv_a_forecast_w), int(sol.pv_b_forecast_w), ) ) return out async def _save_planning_run( site_id, results, horizon_from, horizon_to, run_type, triggered_by, replan_from, soc_wh, duration_ms, correction, db, slot_inputs: Optional[list[tuple[int, int, int, int, int]]] = None, *, activate_run: bool = True, solver_snapshot: Optional[dict[str, Any]] = None, ) -> int: """Uloží výsledky solveru přes ems.fn_planning_run_commit.""" if slot_inputs is not None and len(slot_inputs) != len(results): raise ValueError("slot_inputs and results length mismatch") run_meta: dict[str, Any] = { "run_type": run_type, "triggered_by": triggered_by, "replan_from": replan_from.isoformat() if replan_from else None, "soc_at_replan_wh": soc_wh, "solver_duration_ms": duration_ms, "forecast_correction_factor": correction, } if solver_snapshot is not None: run_meta["solver_params"] = solver_snapshot intervals: list[dict] = [] for i, r in enumerate(results): row: dict = { "interval_start": r.interval_start.isoformat() if hasattr(r.interval_start, "isoformat") else r.interval_start, "battery_setpoint_w": r.battery_setpoint_w, "battery_soc_target_pct": r.battery_soc_target, "grid_setpoint_w": r.grid_setpoint_w, "export_limit_w": r.export_limit_w, "export_mode": r.export_mode, "deye_physical_mode": r.deye_physical_mode, "deye_gen_cutoff_enabled": r.deye_gen_cutoff_enabled, "ev1_setpoint_w": r.ev1_setpoint_w, "ev2_setpoint_w": r.ev2_setpoint_w, "ev1_via_bat_w": r.ev1_via_bat_w, "ev2_via_bat_w": r.ev2_via_bat_w, "heat_pump_enabled": r.heat_pump_enabled, "heat_pump_setpoint_w": r.heat_pump_setpoint_w, "pv_a_curtailed_w": r.pv_a_curtailed_w, "expected_cost_czk": float(r.expected_cost_czk), "effective_buy_price": float(r.effective_buy_price), "effective_sell_price": float(r.effective_sell_price), "is_predicted_price": r.is_predicted_price, } if slot_inputs is not None: si = slot_inputs[i] row["load_baseline_w"] = si[0] row["pv_a_forecast_raw_w"] = si[1] row["pv_b_forecast_raw_w"] = si[2] row["pv_a_forecast_solver_w"] = si[3] row["pv_b_forecast_solver_w"] = si[4] intervals.append(row) return int( await db.fetchval( """ select ems.fn_planning_run_commit( $1::int, $2::timestamptz, $3::timestamptz, $4::jsonb, $5::jsonb, $6::boolean ) """, site_id, horizon_from, horizon_to, json.dumps(run_meta, default=str), json.dumps(intervals, default=str), activate_run, ) )