Files
ems/backend/services/planning_engine.py
Dusan Vojacek 37a525cb4f
Some checks failed
CI and deploy / migration-check (push) Failing after 20s
CI and deploy / deploy (push) Has been skipped
predvybiti baterky
2026-05-25 03:09:33 +02:00

3258 lines
129 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# backend/services/planning_engine.py
#
# EMS Platform plánovací engine
# Obsahuje: hlavní denní plán + rolling 15min replan
#
# Spouštění (APScheduler v lifespan.py):
# scheduler.add_job(run_daily_plan, 'cron', hour=15, minute=0)
# scheduler.add_job(run_rolling_replan, 'cron', minute='*/15')
# Horizont: ems.fn_planning_horizon_end (OTE + strop/min v SQL).
import json
import logging
import time
from dataclasses import dataclass, replace
from datetime import datetime, timezone, timedelta
from types import SimpleNamespace
from typing import Any, Optional
from zoneinfo import ZoneInfo
import pulp
from app.config import get_settings
logger = logging.getLogger(__name__)
# ============================================================
# Konstanty
# ============================================================
# Když DB vrátí NULL (skoro žádná OTE data), denní plán použije krátký fallback (soulad s min hodinami ve fn_planning_horizon_end).
_DAILY_FALLBACK_HORIZON_HOURS = 1.0
# Shadow cena zbytkové energie na konci horizontu: - (avg_buy * FACTOR / 1000) * soc[T-1] (Kč; soc v Wh).
INTERVAL_H = 0.25 # 15 minut v hodinách
CURTAILMENT_PENALTY = 0.001 # Kč/Wh malá penalizace za omezení FVE pole A
SOLVER_TIME_LIMIT = 10 # sekund
# MILP: významný export ge (W) ⇒ koncové soc[t] ≥ podlaha; mimo arbitrážní relax je to arb_base_wh
# (rezerva z DB). Při relaxaci spodku před extrémně záporným buy je podlaha soc_panel_min[t]
# (planner floor), jinak by šlo jen do zátěže a nešlo by „vypustit do sítě“ před levným nákupem.
GE_MIN_EXPORT_W = 1.0
# Dvouprůchodové solve: stop když acquisition z pass1 vs pass2 se liší méně než (Kč/kWh).
ACQUISITION_TWO_PASS_EPS_KWH = 0.05
# Load-first (Deye): PV nejdřív pokryje load+EV+TČ; bc_pv/ge_pv jen z pv_sp (přebytek).
LOAD_FIRST_INCENTIVE_CZK_KWH = 0.05
# Dokud je kotva pro hluboký dump (první sell < 0 v horizontu, jinak první extrémní buy) dál než
# tento počet 15min slotů, držíme plánovací spodek na rezervě (arb_base_wh) místo planner floor —
# priorita: beze „ztráty na prodeji“ (sell >= 0) držet buffer, hluboký vývoz až těsně před záporným prodejem.
DEFAULT_PLANNER_DISCHARGE_RELAX_PREWINDOW_SLOTS = 8
# Měkká kotva: chceme být u planner floor už v posledním slotu před prvním sell < 0.
# Penalizace je v Kč/Wh (např. 0.20 = 200 Kč/kWh). Musí být dost velká, aby přebila
# bezpečnostní SoC buffer + terminal shadow cenu a solver skutečně „dovylil“ před sell<0.
PRENEG_SELL_SOC_ANCHOR_SLACK_PENALTY_CZK_PER_WH = 0.20
PEAK_EXPORT_SHORTFALL_PENALTY_CZK_KWH = 80.0
# Měkký tlak: v okně sell<0 + block_export využít PV přebytek do baterie (ne curtail).
PV_CHARGE_SHORTFALL_PENALTY_CZK_KWH = 120.0
# Curtailment při sell<0 + allow_charge: nesmí být téměř zdarma oproti nabíjení (BA81).
NEG_SELL_CURTAIL_PENALTY_CZK_KWH = 1.0
# Odměna v objective za FVE→baterie při sell<0 (doplňuje shortfall; BA81 fixed tarif).
NEG_SELL_PV_CHARGE_REWARD_CZK_KWH = 0.8
# Měkký tlak: v okně sell<0 dobít na soc_max (ne zastavit na ~94 % kvůli curtail).
NEG_SELL_SOC_UNDERFILL_PENALTY_CZK_PER_WH = 0.35
# Jen ventil nekontrolovatelného pole B při plné baterii a sell<0 (spot); ne celý PV přebytek.
NEG_SELL_PV_B_VENT_PENALTY_CZK_KWH = 4.0
# Výboj baterie při sell<0 jen těsně před extrémně záporným buy (round-trip arbitráž).
EXTREME_BUY_DUMP_PREWINDOW_SLOTS = 12
NEG_SELL_BAT_DUMP_SHORTFALL_PENALTY_CZK_KWH = 80.0
NEG_BUY_CHARGE_SHORTFALL_PENALTY_CZK_KWH = 100.0
PRE_NEG_CHARGE_PENALTY_CZK_KWH = 400.0
PRE_NEG_BATT_EXPORT_SHORTFALL_PENALTY_CZK_KWH = 80.0
PRE_NEG_BATT_EXPORT_MIN_SELL_CZK_KWH = 1.0
PLANNER_BUILD_TAG = "2026-05-28-pre-neg-batt-discharge-v23"
CORRECTION_WINDOW_H = 1 # hodina zpět pro výpočet korekčního faktoru
CORRECTION_MIN_CLAMP = 0.5 # spodní limit korekčního faktoru
CORRECTION_MAX_CLAMP = 1.5 # horní limit korekčního faktoru
# Útlum korekce: čím dál od aktuálního času, tím méně korigujeme forecast
CORRECTION_DECAY_SLOTS = 16 # po 16 slotech (4h) klesne korekce na 0
# Dynamická ekonomická podlaha (MILP w_arb): lookahead FVE energie v dalších slotech
ARB_LOOKAHEAD_SLOTS = 32 # 8 h při INTERVAL_H=0.25
ARB_FLOOR_E_REF_FRAC = 0.5 # má scale Wh = tato frakce usable_capacity (0..1)
_PRAGUE_TZ = ZoneInfo("Europe/Prague")
def _timestamptz_from_db(val: object) -> Optional[datetime]:
if val is None:
return None
if isinstance(val, datetime):
return val if val.tzinfo else val.replace(tzinfo=timezone.utc)
return datetime.fromisoformat(str(val).replace("Z", "+00:00"))
def _planner_engine_version(explicit: str | None = None) -> str:
if explicit is not None and str(explicit).strip():
return str(explicit).strip().lower()
return str(get_settings().planning_engine_version or "v1").strip().lower()
def _planner_compare_enabled() -> bool:
return bool(get_settings().planning_engine_compare_enabled)
def _planner_peer_version(version: str) -> str:
v = str(version).strip().lower()
if v == "v1":
return "v2"
if v == "v2":
return "v1"
return "v1"
def _dispatch_result_summary(results: list["DispatchResult"], duration_ms: int, version: str) -> dict[str, Any]:
charge_slots = [r.interval_start.isoformat() for r in results if r.battery_setpoint_w > 500]
discharge_slots = [r.interval_start.isoformat() for r in results if r.battery_setpoint_w < -500]
export_slots = [r.interval_start.isoformat() for r in results if r.grid_setpoint_w < 0]
return {
"planner_version": version,
"solver_duration_ms": int(duration_ms),
"total_expected_cost_czk": round(sum(float(r.expected_cost_czk) for r in results), 4),
"charge_slots": len(charge_slots),
"discharge_slots": len(discharge_slots),
"export_slots": len(export_slots),
"first_charge_slot": charge_slots[0] if charge_slots else None,
"first_discharge_slot": discharge_slots[0] if discharge_slots else None,
"first_export_slot": export_slots[0] if export_slots else None,
}
def _dispatch_result_comparison(
active_results: list["DispatchResult"],
active_ms: int,
active_version: str,
peer_results: list["DispatchResult"],
peer_ms: int,
peer_version: str,
) -> dict[str, Any]:
active_summary = _dispatch_result_summary(active_results, active_ms, active_version)
peer_summary = _dispatch_result_summary(peer_results, peer_ms, peer_version)
slot_rows: list[dict[str, Any]] = []
for a, b in zip(active_results, peer_results):
row = {
"interval_start": a.interval_start.isoformat(),
"active": {
"battery_setpoint_w": a.battery_setpoint_w,
"grid_setpoint_w": a.grid_setpoint_w,
"export_mode": a.export_mode,
"deye_physical_mode": a.deye_physical_mode,
"deye_gen_cutoff_enabled": a.deye_gen_cutoff_enabled,
"pv_a_curtailed_w": a.pv_a_curtailed_w,
"battery_soc_target": a.battery_soc_target,
"expected_cost_czk": a.expected_cost_czk,
},
"peer": {
"battery_setpoint_w": b.battery_setpoint_w,
"grid_setpoint_w": b.grid_setpoint_w,
"export_mode": b.export_mode,
"deye_physical_mode": b.deye_physical_mode,
"deye_gen_cutoff_enabled": b.deye_gen_cutoff_enabled,
"pv_a_curtailed_w": b.pv_a_curtailed_w,
"battery_soc_target": b.battery_soc_target,
"expected_cost_czk": b.expected_cost_czk,
},
}
if row["active"] != row["peer"]:
slot_rows.append(row)
total_cost_diff = round(
float(active_summary["total_expected_cost_czk"]) - float(peer_summary["total_expected_cost_czk"]),
4,
)
return {
"compare_enabled": True,
"active": active_summary,
"peer": peer_summary,
"diff": {
"total_expected_cost_czk": total_cost_diff,
"absolute_total_expected_cost_czk": round(abs(total_cost_diff), 4),
"changed_slots": len(slot_rows),
},
"slot_diffs": slot_rows,
}
def _maybe_add_planner_comparison(
*,
slots: list["PlanningSlot"],
battery,
heat_pump,
grid,
ev_sessions: list,
vehicles: list,
current_soc_wh: float,
current_tuv_temp_c: float,
operating_mode: str,
tuv_delta_stats: Optional[dict[tuple[int, int], float]],
active_version: str,
charge_commitment_prev_w: Optional[list[Optional[float]]] = None,
) -> dict[str, Any] | None:
if not _planner_compare_enabled():
return None
peer_version = _planner_peer_version(active_version)
if peer_version == active_version:
return None
peer_results, peer_ms, peer_snapshot = solve_dispatch(
slots,
battery,
heat_pump,
grid,
ev_sessions,
vehicles,
current_soc_wh,
current_tuv_temp_c,
tuv_delta_stats=tuv_delta_stats,
operating_mode=operating_mode,
charge_commitment_prev_w=charge_commitment_prev_w,
planner_version=peer_version,
)
# active_results / active_ms jsou doplněny později v calleru
return {
"peer_version": peer_version,
"peer_results": peer_results,
"peer_ms": peer_ms,
"peer_snapshot": peer_snapshot,
}
async def _planning_horizon_end(site_id: int, horizon_from: datetime, db) -> Optional[datetime]:
"""Konec horizontu z DB (`fn_planning_horizon_end`); NULL = rolling skip / daily fallback."""
raw = await db.fetchval(
"select ems.fn_planning_horizon_end($1::int, $2::timestamptz)",
site_id,
horizon_from,
)
return _timestamptz_from_db(raw)
def _pv_scarcity_penalty_multiplier(slots: list["PlanningSlot"], battery) -> float:
"""
Měkká úprava ekonomiky cyklu podle očekávaného slunečního zisku.
- málo očekávané FVE energie -> nižší penalizace cyklu (podpora precharge ze sítě),
- hodně očekávané FVE energie -> standardní penalizace.
"""
horizon_slots = min(len(slots), int(24 / INTERVAL_H)) # konzervativní 1 den dopředu
if horizon_slots <= 0:
return 1.0
pv_kwh = 0.0
for s in slots[:horizon_slots]:
pv_kwh += max(0.0, float(s.pv_a_forecast_w + s.pv_b_forecast_w)) * INTERVAL_H / 1000.0
batt_kwh = max(1.0, float(getattr(battery, "usable_capacity_wh", 0.0)) / 1000.0)
# coverage = kolikanásobek baterie očekáváme ze slunce v horizontu.
coverage = pv_kwh / batt_kwh
coverage_clamped = max(0.0, min(1.0, coverage))
# 0.65 při nízkém slunci, 1.0 při vysokém slunci.
return 0.65 + 0.35 * coverage_clamped
def _pv_coverage_ratio(slots: list["PlanningSlot"], battery, hours: int = 24) -> float:
horizon_slots = min(len(slots), int(hours / INTERVAL_H))
if horizon_slots <= 0:
return 1.0
pv_kwh = 0.0
for s in slots[:horizon_slots]:
pv_kwh += max(0.0, float(s.pv_a_forecast_w + s.pv_b_forecast_w)) * INTERVAL_H / 1000.0
batt_kwh = max(1.0, float(getattr(battery, "usable_capacity_wh", 0.0)) / 1000.0)
return max(0.0, min(1.0, pv_kwh / batt_kwh))
def _dynamic_arb_floor_wh_series(
slots: list["PlanningSlot"],
min_soc_wh: float,
arb_base_wh: float,
usable_wh: float,
) -> list[float]:
"""
Časově proměnná ekonomická podlaha Wh pro MILP (nad min_soc_wh).
Hodně očekávané FVE energie v dalších ARB_LOOKAHEAD_SLOTS → podlaha klesá k min_soc_wh;
málo slunce → zůstává u arb_base_wh (typicky reserve z DB).
"""
T = len(slots)
if T == 0:
return []
e_ref = max(1.0, ARB_FLOOR_E_REF_FRAC * float(usable_wh))
spread = max(0.0, float(arb_base_wh) - float(min_soc_wh))
out: list[float] = []
for t in range(T):
e_pv_wh = 0.0
for k in range(t, min(T, t + ARB_LOOKAHEAD_SLOTS)):
s = slots[k]
e_pv_wh += max(0, s.pv_a_forecast_w + s.pv_b_forecast_w) * INTERVAL_H
f = min(1.0, e_pv_wh / e_ref) if e_ref > 1e-9 else 1.0
arb_t = float(min_soc_wh) + (1.0 - f) * spread
out.append(arb_t)
return out
def _soc_security_profile(slots: list["PlanningSlot"], battery) -> tuple[float, float]:
"""
Při nízkém očekávaném slunci drží solver vyšší SoC buffer:
- cílový buffer: reserve + až 20 % usable capacity,
- ekonomická penalizace deficitu vůči bufferu z průměrné ceny.
"""
coverage = _pv_coverage_ratio(slots, battery, hours=24)
scarcity = 1.0 - coverage
usable_wh = float(getattr(battery, "usable_capacity_wh", 0.0))
reserve_wh = float(getattr(battery, "reserve_soc_wh", 0.0))
soc_max_wh = float(getattr(battery, "soc_max_wh", usable_wh))
extra_buffer_wh = 0.35 * usable_wh * scarcity
target_wh = min(soc_max_wh, reserve_wh + extra_buffer_wh)
h24 = min(len(slots), int(24 / INTERVAL_H))
avg_buy = (
sum(float(s.buy_price) for s in slots[:h24]) / h24
if h24 > 0
else 4.0
)
penalty_czk_kwh = max(0.1, avg_buy * 1.00 * scarcity)
return target_wh, penalty_czk_kwh
def _slot_float_nullable(d: dict[str, Any], key: str) -> float | None:
v = d.get(key)
if v is None:
return None
return float(v)
def _prague_dow_hour(interval_start: datetime) -> tuple[int, int]:
"""DOW v konvenci PostgreSQL EXTRACT(DOW, Europe/Prague): 0=Ne … 6=So."""
dt = interval_start
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
loc = dt.astimezone(_PRAGUE_TZ)
return (loc.weekday() + 1) % 7, loc.hour
# ============================================================
# Datové třídy (lze nahradit pydantic modely)
# ============================================================
@dataclass
class PlanningSlot:
interval_start: datetime
buy_price: float # Kč/kWh
sell_price: float # Kč/kWh
pv_a_forecast_w: int # W pole A (řiditelné)
pv_b_forecast_w: int # W pole B (zelený bonus, pevné)
load_baseline_w: int # W predikce bazální spotřeby
ev1_connected: bool
ev2_connected: bool
is_predicted_price: bool = False
allow_charge: bool = True
allow_discharge_export: bool = True
#: Měkké LP vstupy z `ems.fn_load_planning_slots_full` (mimo masky allow_*).
night_baseload_target_wh: float | None = None
night_baseload_buffer_wh: float | None = None
safety_soc_target_wh: float | None = None
future_avoided_buy_czk_kwh: float | None = None
future_sell_opportunity_czk_kwh: float | None = None
is_daytime_pv_surplus_slot: bool = False
#: Vážená nákupní / opportunity cena zásoby před prvním exportním oknem (SQL odhad z masek).
charge_acquisition_buy_czk_kwh: float | None = None
charge_acquisition_cutoff_at: datetime | None = None
min_buy_before_cutoff_czk_kwh: float | None = None
pv_charge_wh_ahead: float | None = None
neg_buy_wh_ahead: float | None = None
grid_charge_suppressed_reason: str | None = None
#: Pomocny atribut pro green_bonus v planning_interval (Kc/slot); lite default 0.
green_bonus_czk_per_slot: float = 0.0
# Lookahead pro relax spodní meze SoC: až 36 h od indexu slotu (pevné OTE ceny v horizontu).
SOC_MIN_RELAX_LOOKAHEAD_SLOTS = 144
def _soc_min_wh_series(
slots: list[PlanningSlot],
usable_wh: float,
base_min_wh: float,
buy_extreme_threshold: float,
planner_discharge_floor_pct: float | None,
) -> list[float]:
"""
Spodní mez SoC (Wh) pro každý slot: při extrémně záporném buy v lookahead povolit hlubší vybíjení
až na planner_discharge_floor_percent (jinak min_soc z DB). Absolutní minimum 5 % usable.
"""
t_len = len(slots)
abs_min_wh = max(usable_wh * 0.05, 1.0)
if planner_discharge_floor_pct is None:
relaxed_wh = base_min_wh
else:
relaxed_wh = max(abs_min_wh, float(planner_discharge_floor_pct) / 100.0 * usable_wh)
effective_relaxed = min(base_min_wh, relaxed_wh)
out: list[float] = []
for t in range(t_len):
j_end = min(t_len, t + SOC_MIN_RELAX_LOOKAHEAD_SLOTS)
min_buy_fwd = min(float(slots[k].buy_price) for k in range(t, j_end))
if min_buy_fwd <= buy_extreme_threshold:
out.append(float(effective_relaxed))
else:
out.append(float(base_min_wh))
return out
def _slots_until_buy_le_threshold(
slots: list[PlanningSlot], buy_threshold: float
) -> list[int]:
"""
Pro slot t: kolik slotů (0 = tento slot) do nejbližšího k>=t s buy_price <= buy_threshold.
Pokud v [t, T) žádný takový není, vrátí T + 1 (větší než jakýkoli rozumný prewindow).
"""
t_len = len(slots)
sentinel = t_len + 1
next_le = sentinel
next_at_or_after: list[int] = [sentinel] * t_len
for t in range(t_len - 1, -1, -1):
if float(slots[t].buy_price) <= buy_threshold:
next_le = t
next_at_or_after[t] = next_le
out: list[int] = []
for t in range(t_len):
nxt = next_at_or_after[t]
if nxt >= t_len:
out.append(sentinel)
else:
out.append(nxt - t)
return out
def _slots_until_sell_lt(slots: list[PlanningSlot], sell_upper: float) -> list[int]:
"""
Pro slot t: kolik slotů (0 = tento slot) do nejbližšího k>=t s sell_price < sell_upper.
Typicky sell_upper=0 (první záporný / „ztrátový“ prodej z pohledu OTE).
Pokud v [t, T) žádný takový není, vrátí T + 1.
"""
t_len = len(slots)
sentinel = t_len + 1
next_lt = sentinel
next_at_or_after: list[int] = [sentinel] * t_len
for t in range(t_len - 1, -1, -1):
if float(slots[t].sell_price) < sell_upper:
next_lt = t
next_at_or_after[t] = next_lt
out: list[int] = []
for t in range(t_len):
nxt = next_at_or_after[t]
if nxt >= t_len:
out.append(sentinel)
else:
out.append(nxt - t)
return out
def _prewindow_deferral_slots(
slots: list[PlanningSlot], buy_extreme_threshold: float, sell_upper: float = 0.0
) -> list[int]:
"""
Vzdálenost (v 15min slotech) pro zpoždění hlubokého planner flooru:
primárně do prvního sell < sell_upper (poslední „bez ztráty na prodeji“ je k-1),
pokud v horizontu není záporný prodej, fallback na první buy <= buy_extreme_threshold.
"""
t_len = len(slots)
sell_d = _slots_until_sell_lt(slots, sell_upper)
buy_d = _slots_until_buy_le_threshold(slots, buy_extreme_threshold)
sentinel = t_len + 1
out: list[int] = []
for t in range(t_len):
if sell_d[t] < sentinel:
out.append(sell_d[t])
else:
out.append(buy_d[t])
return out
def _soc_panel_min_wh_series(
soc_min_series: list[float],
slots_until_relax_anchor: list[int],
min_soc_wh: float,
arb_base_wh: float,
prewindow_slots: int,
) -> list[float]:
"""
Zpoždění hluboké relaxace: pokud je lookahead extrémní (soc_min pod min_soc), ale kotva
(záporný prodej / fallback extrémní buy) je dál než prewindow_slots, drž spodek na
max(relax_wh, arb_base_wh) — prakticky na rezervě.
"""
t_len = len(soc_min_series)
out: list[float] = []
for t in range(t_len):
sm = float(soc_min_series[t])
if sm < min_soc_wh - 1e-3 and slots_until_relax_anchor[t] > prewindow_slots:
out.append(max(sm, float(arb_base_wh)))
else:
out.append(sm)
return out
@dataclass
class DispatchResult:
interval_start: datetime
battery_setpoint_w: int # kladné = nabíjení, záporné = vybíjení
battery_soc_target: float # % SoC na konci intervalu
grid_setpoint_w: int # kladné = import, záporné = export
export_limit_w: int # tvrdý limit exportu do sítě; 0 = bez exportu
export_mode: str # NONE / PV_SURPLUS / BATTERY_SELL
#: Explicitní fyzický režim Deye pro control exporter (PASSIVE / SELL / CHARGE).
#: Cíl: odstranit heuristiky z exporteru a nést záměr přímo v plánu.
deye_physical_mode: str
#: True = v daném slotu odpojit GEN port (MI export cutoff) přes reg 178 bits01 (0-based; v UI často jako "register 179").
#: None = lokalita tuto funkci nemá / nepoužívá.
deye_gen_cutoff_enabled: bool | None
ev1_setpoint_w: Optional[int]
ev2_setpoint_w: Optional[int]
ev1_via_bat_w: int
ev2_via_bat_w: int
heat_pump_enabled: bool
heat_pump_setpoint_w: int
pv_a_curtailed_w: int
expected_cost_czk: float
effective_buy_price: float
effective_sell_price: float
is_predicted_price: bool # shodné s PlanningSlot (chybí OTE v efektivní ceně → fn_get_predicted_price)
cashflow_czk: float
battery_arbitrage_czk: float
penalty_czk: float
green_bonus_czk: float
# ============================================================
# Korekce forecastu na základě skutečné výroby
# ============================================================
async def compute_correction_factor(
site_id: int,
now: datetime,
db,
window_h: float = CORRECTION_WINDOW_H,
) -> tuple[float, dict]:
"""
Spočítá korekční faktor FVE forecastu z posledních window_h hodin.
Vrátí (factor, log_data) kde factor je v rozsahu [CORRECTION_MIN_CLAMP, CORRECTION_MAX_CLAMP].
factor = 1.0 pokud není dostatek dat nebo je rozdíl zanedbatelný.
"""
window_start = now - timedelta(hours=window_h)
raw = await db.fetchval(
"""
select ems.fn_pv_forecast_correction_factor(
$1::int, $2::timestamptz, $3::timestamptz,
$4::numeric, $5::numeric
)
""",
site_id,
window_start,
now,
CORRECTION_MIN_CLAMP,
CORRECTION_MAX_CLAMP,
)
j = raw if isinstance(raw, dict) else json.loads(raw)
factor = float(j.get("correction_factor", 1.0))
# JSON z DB má často ISO řetězce; asyncpg u $2/$3 vyžaduje datetime
ws = _parse_json_dt(j.get("window_start")) or window_start
we = _parse_json_dt(j.get("window_end")) or now
log_data = {
"window_start": ws,
"window_end": we,
"actual_pv_wh": j.get("actual_pv_wh"),
"forecast_pv_wh": j.get("forecast_pv_wh"),
"correction_factor": factor,
"reason": j.get("reason", "ok"),
}
if j.get("raw_factor") is not None:
log_data["raw_factor"] = j["raw_factor"]
return factor, log_data
def apply_forecast_correction(
slots: list[PlanningSlot],
now: datetime,
factor: float,
decay_slots: int = CORRECTION_DECAY_SLOTS,
) -> list[PlanningSlot]:
"""
Aplikuje korekční faktor na FVE forecast zbývajících slotů.
Korekce se lineárně utlumuje: na 1. slotu plná korekce,
na decay_slots-tém slotu žádná korekce.
Příklad: factor=0.85, slot 0 → pv_a *= 0.85, slot 8 → pv_a *= 0.925, slot 16+ → žádná korekce
"""
corrected = []
for i, slot in enumerate(slots):
if factor == 1.0 or i >= decay_slots:
corrected.append(slot)
continue
# Lineární útlum: weight klesá od 1.0 (slot 0) do 0.0 (slot decay_slots)
weight = 1.0 - (i / decay_slots)
effective_factor = 1.0 + (factor - 1.0) * weight
corrected.append(
replace(
slot,
pv_a_forecast_w=max(0, int(slot.pv_a_forecast_w * effective_factor)),
pv_b_forecast_w=max(0, int(slot.pv_b_forecast_w * effective_factor)),
)
)
return corrected
# ============================================================
# LP Solver
# ============================================================
def _recompute_charge_acquisition_from_results(
slots: list[PlanningSlot],
results: list["DispatchResult"],
battery,
) -> float:
"""Vážený buy z nabíjecích slotů (grid import + bat charge) z prvního solve."""
wh_total = 0.0
cost = 0.0
for s, r in zip(slots, results):
if not s.allow_charge:
continue
# Zaporne buy sloty (OTE) nejsou grid acquisition pro arbitraz exportu baterie.
if float(s.buy_price) < 0:
continue
gi_w = max(0, int(r.grid_setpoint_w or 0))
bc_w = max(0, int(r.battery_setpoint_w or 0))
wh = (gi_w + bc_w) * INTERVAL_H
if wh <= 0:
continue
wh_total += wh
cost += float(s.buy_price) * wh
if wh_total <= 0:
raw = getattr(slots[0], "charge_acquisition_buy_czk_kwh", None)
if raw is not None:
return float(raw)
return min(float(s.buy_price) for s in slots)
return cost / wh_total
def _slots_with_charge_acquisition(
slots: list[PlanningSlot],
acquisition_czk_kwh: float,
) -> list[PlanningSlot]:
return [
replace(s, charge_acquisition_buy_czk_kwh=acquisition_czk_kwh)
for s in slots
]
def _pv_store_value_czk_kwh(slot: PlanningSlot, min_spread: float) -> float:
"""
Minimální sell [Kč/kWh], pod kterým je FVE→síť horší než uložení na večerní peak.
Používá jen future_sell_opportunity (ne charge_acquisition — u fixního tarifu KV1
by jinak blokoval export i při kladném sell 2 Kč).
"""
future = float(
slot.future_sell_opportunity_czk_kwh
if slot.future_sell_opportunity_czk_kwh is not None
else slot.sell_price
)
return future - min_spread
def _slot_profitable_battery_export(
slot: PlanningSlot,
*,
charge_acquisition_czk_kwh: float,
min_spread: float,
fixed_tariff: bool,
) -> bool:
"""
Export z baterie do sítě má kladnou marži.
Spot: sell > charge_acquisition + spread (energie ze sítě / vážený nákup).
Fixní tarif (BA81/KV1): stejně jako R__063 discharge maska — sell > buy + spread;
acquisition může být nafouknutá grid nabíjením a blokovat večerní špičku (3,7 < 3,9).
"""
sell_t = float(slot.sell_price)
acq = float(charge_acquisition_czk_kwh)
if fixed_tariff:
buy_t = float(slot.buy_price)
if buy_t >= 0.0:
return sell_t > buy_t + min_spread
return sell_t > acq + min_spread
def _purchase_pricing_fixed(grid: Any) -> bool:
"""Režim nákupu z DB (`site_market_config.purchase_pricing_mode`), ne odhad z rozptylu buy."""
return (
str(getattr(grid, "purchase_pricing_mode", "spot") or "spot").strip().lower()
== "fixed"
)
def _horizon_fixed_tariff_like(slots: list[PlanningSlot]) -> bool:
"""
Heuristika pro drahý import / charge_acquisition: buy v horizontu je prakticky konstantní.
U spotu (home-01) nesmí expensive_import používat charge_acquisition — jinak
buy > ~1 Kč označí téměř všechny sloty jako drahé (gi=0 pro dům) → Infeasible.
BA81 má fixní nákup v DB, ale NT/VT → buy skáče; proto neg-sell export řídí _purchase_pricing_fixed.
"""
buys = [float(s.buy_price) for s in slots if float(s.buy_price) >= 0.0]
if not buys:
return False
if len(buys) == 1:
return True
return max(buys) - min(buys) < 0.25
def _future_extreme_buy_from(
slots: list[PlanningSlot],
buy_thr: float,
) -> list[bool]:
"""True v t, pokud v některém budoucím slotu buy <= buy_thr."""
t_len = len(slots)
out = [False] * t_len
seen = False
for i in range(t_len - 1, -1, -1):
if float(slots[i].buy_price) <= buy_thr:
seen = True
out[i] = seen
return out
def _neg_sell_bat_dump_slots(
slots: list[PlanningSlot],
*,
operating_mode: str,
purchase_fixed: bool,
grid: Any,
buy_extreme_thr: float,
degrad_czk_kwh: float,
) -> set[int]:
"""Sloty, kde smí ge_bat>0 při sell<0 (výboj před extrémně záporným buy)."""
if operating_mode != "AUTO" or purchase_fixed:
return set()
if bool(getattr(grid, "block_export_on_negative_sell", False)):
return set()
t_len = len(slots)
future_extreme = _future_extreme_buy_from(slots, buy_extreme_thr)
dist = _slots_until_buy_le(slots, buy_extreme_thr)
out: set[int] = set()
for t, s in enumerate(slots):
if float(s.sell_price) >= 0.0:
continue
future_min = min(
(float(slots[j].buy_price) for j in range(t + 1, t_len)),
default=float(s.buy_price),
)
if (
future_extreme[t]
and 0 < dist[t] <= EXTREME_BUY_DUMP_PREWINDOW_SLOTS
and future_min < float(s.sell_price) - degrad_czk_kwh
):
out.add(t)
return out
def _slots_until_buy_le(
slots: list[PlanningSlot],
buy_thr: float,
) -> list[int]:
"""Počet slotů do nejbližšího buy <= thr (0 = v tomto slotu, T = nikdy)."""
t_len = len(slots)
dist = [t_len] * t_len
next_idx = t_len
for i in range(t_len - 1, -1, -1):
if float(slots[i].buy_price) <= buy_thr:
next_idx = i
dist[i] = (next_idx - i) if next_idx < t_len else t_len
return dist
def _pre_negative_sell_export_window(
slots: list[PlanningSlot],
) -> tuple[int | None, int | None]:
"""Index prvního sell<0 a posledního slotu před ním (pro strategii „vyvézt dřív“)."""
first_neg = next(
(i for i, s in enumerate(slots) if float(s.sell_price) < 0),
None,
)
if first_neg is None or first_neg <= 0:
return first_neg, None
return first_neg, first_neg - 1
def _prague_calendar_date(slot: PlanningSlot):
dt = slot.interval_start
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(ZoneInfo("Europe/Prague")).date()
MORNING_PRENEG_START_HOUR = 5
MORNING_PRENEG_END_HOUR = 11
def _battery_export_cap_w(battery: Any, grid: Any) -> float:
"""Max výkon vývozu baterie do sítě [W] — z DB, ne hardcoded konstanta."""
return min(
float(battery.max_discharge_power_w),
float(grid.max_export_power_w),
)
def _prague_hour(slot: PlanningSlot) -> int:
dt = slot.interval_start
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(ZoneInfo("Europe/Prague")).hour
def _morning_pre_neg_zone_peak_sell(
slots: list[PlanningSlot],
first_neg_sell_idx: int | None,
) -> float | None:
"""Max kladný sell v pásmu 511 Prague před prvním sell<0 (shodně s R__063)."""
if first_neg_sell_idx is None or first_neg_sell_idx <= 0:
return None
neg_day = _prague_calendar_date(slots[first_neg_sell_idx])
sells = [
float(slots[i].sell_price)
for i in range(first_neg_sell_idx)
if float(slots[i].sell_price) >= 0.0
and _prague_calendar_date(slots[i]) == neg_day
and MORNING_PRENEG_START_HOUR <= _prague_hour(slots[i]) <= MORNING_PRENEG_END_HOUR
]
if not sells:
return None
return max(sells)
def _pre_neg_peak_sell_idx(
slots: list[PlanningSlot],
first_neg_sell_idx: int | None,
) -> int | None:
"""Nejvyšší kladný sell v ranním pásmu před prvním sell<0 (ne půlnoc celého dne)."""
if first_neg_sell_idx is None or first_neg_sell_idx <= 0:
return None
zone_peak = _morning_pre_neg_zone_peak_sell(slots, first_neg_sell_idx)
if zone_peak is None:
return None
neg_day = _prague_calendar_date(slots[first_neg_sell_idx])
positive = [
(i, float(slots[i].sell_price))
for i in range(first_neg_sell_idx)
if float(slots[i].sell_price) >= 0.0
and _prague_calendar_date(slots[i]) == neg_day
and MORNING_PRENEG_START_HOUR <= _prague_hour(slots[i]) <= MORNING_PRENEG_END_HOUR
]
if not positive:
return None
return max(positive, key=lambda x: (x[1], x[0]))[0]
def _morning_pre_neg_export_indices(
slots: list[PlanningSlot],
first_neg_sell_idx: int | None,
*,
degrad_czk_kwh: float,
) -> list[int]:
"""Všechny ranní peak sloty (sell ≥ zónový max degrad) před prvním sell<0."""
zone_peak = _morning_pre_neg_zone_peak_sell(slots, first_neg_sell_idx)
if zone_peak is None or first_neg_sell_idx is None or first_neg_sell_idx <= 0:
return []
neg_day = _prague_calendar_date(slots[first_neg_sell_idx])
out: list[int] = []
for i in range(first_neg_sell_idx):
if (
float(slots[i].sell_price) >= zone_peak - degrad_czk_kwh
and float(slots[i].sell_price) >= 0.0
and _prague_calendar_date(slots[i]) == neg_day
and MORNING_PRENEG_START_HOUR <= _prague_hour(slots[i]) <= MORNING_PRENEG_END_HOUR
):
out.append(i)
return out
def _pre_neg_buy_discharge_indices(
slots: list[PlanningSlot],
first_neg_buy_idx: int | None,
*,
charge_acquisition_czk_kwh: float,
min_spread: float,
fixed_tariff: bool,
) -> set[int]:
"""
Sloty před prvním buy<0: výboj baterie do sítě při kladném sell (včetně noci).
Bez rozšíření discharge_export_slots (v19b — jinak w_arb → Infeasible).
"""
if first_neg_buy_idx is None or first_neg_buy_idx <= 0:
return set()
out: set[int] = set()
for i in range(first_neg_buy_idx):
s = slots[i]
if float(s.buy_price) < 0.0:
continue
if float(s.sell_price) < PRE_NEG_BATT_EXPORT_MIN_SELL_CZK_KWH:
continue
if not _slot_profitable_battery_export(
s,
charge_acquisition_czk_kwh=charge_acquisition_czk_kwh,
min_spread=min_spread,
fixed_tariff=fixed_tariff,
):
continue
out.add(i)
return out
def _evening_peak_export_indices(
slots: list[PlanningSlot],
*,
degrad_czk_kwh: float,
evening_start_hour: int = 17,
) -> list[int]:
"""Večerní špičky per den (shodně s R__063, hour >= 17 Prague)."""
peak_by_day: dict = {}
for s in slots:
if _prague_hour(s) < evening_start_hour:
continue
d = _prague_calendar_date(s)
peak_by_day[d] = max(peak_by_day.get(d, 0.0), float(s.sell_price))
out: list[int] = []
for t, s in enumerate(slots):
if _prague_hour(s) < evening_start_hour:
continue
d = _prague_calendar_date(s)
peak = peak_by_day.get(d, 0.0)
if peak > 0 and float(s.sell_price) >= peak - degrad_czk_kwh:
out.append(t)
return out
def _evening_battery_export_push_indices(
slots: list[PlanningSlot],
*,
profitable_export_ts: set[int],
degrad_czk_kwh: float,
evening_start_hour: int = 17,
max_slots_per_day: int = 3,
) -> list[int]:
"""
Tvrdý push ge_bat jen u několika nejlepších večerních slotů/den (profitable ∩ peak).
Jinak součet ge_bat × z_export přes celý peak pásmo může překročit dostupné SoC → Infeasible.
"""
peak_ts = _evening_peak_export_indices(
slots,
degrad_czk_kwh=degrad_czk_kwh,
evening_start_hour=evening_start_hour,
)
by_day: dict = {}
for t in peak_ts:
if t not in profitable_export_ts:
continue
d = _prague_calendar_date(slots[t])
by_day.setdefault(d, []).append(t)
out: list[int] = []
for d in sorted(by_day.keys()):
ranked = sorted(
by_day[d],
key=lambda i: float(slots[i].sell_price),
reverse=True,
)
out.extend(ranked[:max_slots_per_day])
return sorted(out)
def _planner_soc_for_solver(
current_soc_wh: float,
battery,
) -> tuple[float, float | None]:
"""
SoC pro MILP. Při telemetrii na soc_max a dlouhém sell<0 s vysokou FVE bez rezervy pod stropem
je model neřešitelný (nelze nabít / odvést přebytek). Necháme min. ~650 Wh pod soc_max.
"""
soc_max = float(battery.soc_max_wh)
soc_min = float(battery.min_soc_wh)
soc = max(soc_min, min(float(current_soc_wh), soc_max))
charge_slot_wh = (
float(battery.max_charge_power_w)
* INTERVAL_H
/ max(float(battery.charge_efficiency), 1e-6)
)
headroom = max(650.0, 0.382 * charge_slot_wh)
if soc > soc_max - headroom:
return max(soc_min, soc_max - headroom), headroom
return soc, None
def _pv_forced_vent_export_allowed(
t: int,
*,
current_soc_wh: float,
battery,
soc_headroom_wh: float,
pv_surplus_w: float,
) -> bool:
"""Přebytek FVE do sítě jen když baterie na konci předchozího slotu nemá kapacitu."""
if pv_surplus_w <= 0:
return False
if t == 0:
return current_soc_wh >= float(battery.soc_max_wh) - soc_headroom_wh
return False
def solve_dispatch_two_pass(
slots: list[PlanningSlot],
battery,
heat_pump,
grid,
ev_sessions: list,
vehicles: list,
current_soc_wh: float,
current_tuv_temp_c: float,
*,
tuv_delta_stats: Optional[dict[tuple[int, int], float]] = None,
operating_mode: str = "AUTO",
charge_commitment_prev_w: Optional[list[Optional[float]]] = None,
planner_version: str | None = None,
) -> tuple[list["DispatchResult"], int, dict[str, Any]]:
"""
Dva průchody solve_dispatch: pass2 používá acquisition z váženého buy nabíjení v pass1.
"""
results1, ms1, snap1 = solve_dispatch(
slots,
battery,
heat_pump,
grid,
ev_sessions,
vehicles,
current_soc_wh,
current_tuv_temp_c,
tuv_delta_stats=tuv_delta_stats,
operating_mode=operating_mode,
charge_commitment_prev_w=charge_commitment_prev_w,
planner_version=planner_version,
)
acq1 = float(
snap1.get("inputs", {}).get("charge_acquisition_buy_czk_kwh")
or getattr(slots[0], "charge_acquisition_buy_czk_kwh", None)
or min(float(s.buy_price) for s in slots)
)
acq2 = _recompute_charge_acquisition_from_results(slots, results1, battery)
converged = abs(acq2 - acq1) < ACQUISITION_TWO_PASS_EPS_KWH
if converged:
if isinstance(snap1.get("inputs"), dict):
snap1["inputs"]["acquisition_pass1_czk_kwh"] = round(acq1, 6)
snap1["inputs"]["acquisition_pass2_czk_kwh"] = round(acq2, 6)
snap1["inputs"]["two_pass_enabled"] = True
snap1["inputs"]["two_pass_converged"] = True
snap1["inputs"]["two_pass_skipped"] = False
return results1, ms1, snap1
slots2 = _slots_with_charge_acquisition(slots, acq2)
results2, ms2, snap2 = solve_dispatch(
slots2,
battery,
heat_pump,
grid,
ev_sessions,
vehicles,
current_soc_wh,
current_tuv_temp_c,
tuv_delta_stats=tuv_delta_stats,
operating_mode=operating_mode,
charge_commitment_prev_w=charge_commitment_prev_w,
planner_version=planner_version,
)
if isinstance(snap2.get("inputs"), dict):
snap2["inputs"]["acquisition_pass1_czk_kwh"] = round(acq1, 6)
snap2["inputs"]["acquisition_pass2_czk_kwh"] = round(acq2, 6)
snap2["inputs"]["two_pass_enabled"] = True
snap2["inputs"]["two_pass_converged"] = False
snap2["inputs"]["two_pass_skipped"] = False
snap2["inputs"]["solver_duration_ms_pass1"] = ms1
return results2, ms1 + ms2, snap2
def solve_dispatch(
slots: list[PlanningSlot],
battery,
heat_pump,
grid,
ev_sessions: list, # aktivní EV sessions [ev1_session, ev2_session]
vehicles: list, # [vehicle1, vehicle2]
current_soc_wh: float,
current_tuv_temp_c: float,
*,
tuv_delta_stats: Optional[dict[tuple[int, int], float]] = None,
operating_mode: str = "AUTO",
charge_commitment_prev_w: Optional[list[Optional[float]]] = None,
planner_version: str | None = None,
relaxed_expensive_import: bool = False,
relaxed_neg_buy_charge: bool = False,
) -> tuple[list[DispatchResult], int, dict[str, Any]]:
"""
LP solver pro dispatch optimalizaci.
Vrátí (výsledky, solver_duration_ms, solver_debug_snapshot).
relaxed_expensive_import: nouzový režim po Infeasible — síť smí krmit baseload v drahých slotech.
relaxed_neg_buy_charge: druhý nouzový retry bez neg_buy charge shortfall.
"""
T = len(slots)
if T < 1:
raise RuntimeError("solve_dispatch requires at least one slot")
EV = len(vehicles) # počet EV (typicky 2)
planner_version_resolved = _planner_engine_version(planner_version)
planner_v2 = planner_version_resolved == "v2"
EV_ROUNDTRIP_FACTOR = 1.0 / (battery.charge_efficiency * battery.discharge_efficiency)
cycle_penalty_mult = _pv_scarcity_penalty_multiplier(slots, battery)
degradation_cost_effective = battery.degradation_cost_czk_kwh * cycle_penalty_mult
soc_buffer_target_wh, soc_deficit_penalty_czk_kwh = _soc_security_profile(slots, battery)
prob = pulp.LpProblem("ems_dispatch", pulp.LpMinimize)
# Penalizace překročení breakeru (Kč/kWh importu nad max_import_power_w).
# Záměr: breaker je fyzický strop, ale kvůli chybám forecastu a krátkým „extrémním“ oknům
# (např. záporná nákupní cena) umožníme solveru nominálně jít nad breaker, ovšem pouze za cenu.
IMPORT_OVER_BREAKER_PENALTY_CZK_KWH = 10.0
min_soc_wh = float(getattr(battery, "min_soc_wh", battery.reserve_soc_wh))
buy_extreme_thr = float(getattr(battery, "planner_extreme_buy_threshold_czk_kwh", -5.0))
floor_pct_raw = getattr(battery, "planner_discharge_floor_percent", None)
floor_pct = float(floor_pct_raw) if floor_pct_raw is not None else None
prewin = max(
0,
int(
getattr(
battery,
"planner_discharge_relax_prewindow_slots",
DEFAULT_PLANNER_DISCHARGE_RELAX_PREWINDOW_SLOTS,
)
),
)
# Planner floor v Wh (nezávisle na lookahead extrémním buy) použije se pro kotvu před sell<0.
abs_min_wh = max(float(battery.usable_capacity_wh) * 0.05, 1.0)
planner_floor_wh = (
min_soc_wh
if floor_pct is None
else max(abs_min_wh, float(floor_pct) / 100.0 * float(battery.usable_capacity_wh))
)
planner_floor_effective_wh = min(min_soc_wh, float(planner_floor_wh))
soc_min_series = _soc_min_wh_series(
slots,
float(battery.usable_capacity_wh),
min_soc_wh,
buy_extreme_thr,
floor_pct,
)
# Pokud se blíží první sell<0, dovol hluboký planner floor i bez extrémního buy.
# Záměr: „dovylít“ baterii před záporným prodejem a pak už baterii v sell<0 okně nevybíjet.
if floor_pct is not None:
dist_to_neg_sell = _slots_until_sell_lt(slots, 0.0)
soc_min_series = [
min(float(sm), float(planner_floor_effective_wh))
if dist_to_neg_sell[i] <= prewin
else float(sm)
for i, sm in enumerate(soc_min_series)
]
soc_headroom_applied_wh: float | None = None
current_soc_wh, soc_headroom_applied_wh = _planner_soc_for_solver(
current_soc_wh, battery
)
current_soc_wh = max(soc_min_series[0], min(current_soc_wh, float(battery.soc_max_wh)))
arb_base_wh = max(
float(getattr(battery, "arb_floor_wh", battery.reserve_soc_wh)),
min_soc_wh,
)
if getattr(battery, "disable_dynamic_arb_floor", False):
arb_floor_series = [arb_base_wh] * T
else:
arb_floor_series = _dynamic_arb_floor_wh_series(
slots, min_soc_wh, arb_base_wh, float(battery.usable_capacity_wh)
)
deferral_slots = _prewindow_deferral_slots(slots, buy_extreme_thr)
soc_panel_min = _soc_panel_min_wh_series(
soc_min_series,
deferral_slots,
min_soc_wh,
arb_base_wh,
prewin,
)
# --- Proměnné ---
# Import ze sítě: tvrdý strop = site breaker (max_import_power_w).
gi_upper = float(grid.max_import_power_w)
gi = [pulp.LpVariable(f"gi_{t}", 0, gi_upper) for t in range(T)]
gi_over = [
pulp.LpVariable(f"gi_over_{t}", 0, max(0.0, gi_upper - float(grid.max_import_power_w)))
for t in range(T)
]
ge = [pulp.LpVariable(f"ge_{t}", 0, grid.max_export_power_w) for t in range(T)]
ge_pv = [pulp.LpVariable(f"ge_pv_{t}", 0, grid.max_export_power_w) for t in range(T)]
ge_bat = [pulp.LpVariable(f"ge_bat_{t}", 0, grid.max_export_power_w) for t in range(T)]
bc_pv = [pulp.LpVariable(f"bc_pv_{t}", 0, battery.max_charge_power_w) for t in range(T)]
bc_gi = [pulp.LpVariable(f"bc_gi_{t}", 0, battery.max_charge_power_w) for t in range(T)]
bd = [pulp.LpVariable(f"bd_{t}", 0, battery.max_discharge_power_w) for t in range(T)]
pv_ld = [pulp.LpVariable(f"pv_ld_{t}", 0) for t in range(T)]
pv_sp = [pulp.LpVariable(f"pv_sp_{t}", 0) for t in range(T)]
soc = [
pulp.LpVariable(f"soc_{t}", soc_panel_min[t], battery.soc_max_wh) for t in range(T)
]
w_arb = [pulp.LpVariable(f"w_arb_{t}", cat=pulp.LpBinary) for t in range(T)]
z_export = [pulp.LpVariable(f"z_export_{t}", cat=pulp.LpBinary) for t in range(T)]
ca = [pulp.LpVariable(f"ca_{t}", 0, slots[t].pv_a_forecast_w) for t in range(T)]
hp = [pulp.LpVariable(f"hp_{t}", 0, heat_pump.rated_heating_power_w) for t in range(T)]
soc_deficit_24h = pulp.LpVariable("soc_deficit_24h", 0, battery.usable_capacity_wh)
soc_anchor_slack = None
t_anchor = None
# GEN port cut-off (BA81): binární proměnná pouze pokud je feature povolená v konfiguraci site/invertoru.
gen_cutoff_enabled = bool(getattr(grid, "deye_gen_microinverter_cutoff_enabled", False))
z_gen_cutoff = (
[pulp.LpVariable(f"z_gen_cutoff_{t}", cat=pulp.LpBinary) for t in range(T)]
if gen_cutoff_enabled
else None
)
om = (operating_mode or "AUTO").strip().upper()
charge_slots: set[int] = set()
discharge_export_slots: set[int] = set()
if om == "AUTO":
charge_slots = {t for t, s in enumerate(slots) if s.allow_charge}
charge_slots |= {
t for t, s in enumerate(slots) if float(s.buy_price) < 0.0
}
# Stejně jako R__063 (sell<0 + PV přebytek): shortfall/curtail penalizace i bez block_export.
charge_slots |= {
t
for t, s in enumerate(slots)
if float(s.sell_price) < 0.0
and max(
0,
int(s.pv_a_forecast_w)
+ int(s.pv_b_forecast_w)
- int(s.load_baseline_w),
)
> 500
}
discharge_export_slots = {
t for t, s in enumerate(slots) if s.allow_discharge_export
}
# SELF_SUSTAIN dřív vynucoval ge[t] == 0, což umí udělat MILP infeasible v okamžiku, kdy:
# - baterie je na max SoC (nelze nabíjet),
# - PV pole B není curtailable,
# - a pv_b_forecast_w > load_baseline_w (typicky po ručním snížení baseline).
# Export v SELF_SUSTAIN proto povolíme jako nouzový ventil, ale silně penalizujeme,
# aby k němu docházelo jen když už neexistuje jiné fyzikálně možné řešení.
SELF_SUSTAIN_EXPORT_PENALTY_CZK_KWH = 100.0
# Penalizace vypnutí GEN portu (mikroinvertory): preferujeme nechat zapnuto a vypnout jen když
# by to jinak vedlo k nežádoucímu exportu / infeasible řešení.
GEN_CUTOFF_PENALTY_CZK_KWH = 2.0 if planner_v2 else 5.0
# Heuristika: pokud existuje necurtailable PV B a v budoucnu v horizontu nastane buy < 0,
# chceme mít motivaci držet baterii „prázdnější“ pro pozdější výhodný import / bonusové PV B okno.
# V okně sell < 0 pak preferujeme curtail PV A (místo placeného exportu), a to tak,
# že dočasně snížíme penalizaci ca[t] (curtailment) na 0.
has_pv_b = any(float(s.pv_b_forecast_w) > 0.0 for s in slots)
future_neg_buy_from: list[bool] = [False] * T
seen_neg_buy = False
for i in range(T - 1, -1, -1):
if float(slots[i].buy_price) < 0.0:
seen_neg_buy = True
future_neg_buy_from[i] = seen_neg_buy
future_extreme_buy_from = _future_extreme_buy_from(slots, buy_extreme_thr)
dist_to_extreme_buy = _slots_until_buy_le(slots, buy_extreme_thr)
# EV proměnné per vozidlo
ev_direct = [[pulp.LpVariable(f"evd_{e}_{t}", 0,
min(vehicles[e].max_charge_power_w, grid.max_import_power_w))
for t in range(T)] for e in range(EV)]
ev_via_bat = [[pulp.LpVariable(f"evb_{e}_{t}", 0,
vehicles[e].max_charge_power_w)
for t in range(T)] for e in range(EV)]
horizon_slots_h24 = min(T, int(24 / INTERVAL_H))
avg_buy_terminal = (
sum(float(slots[t].buy_price) for t in range(horizon_slots_h24)) / horizon_slots_h24
if horizon_slots_h24 > 0
else 4.0
)
terminal_factor = float(battery.planner_terminal_soc_value_factor)
# Kč/Wh: ocenění energie ponechané v baterii na konci horizontu (receding horizon kotva).
terminal_soc_kcz_per_wh = (
avg_buy_terminal * terminal_factor / 1000.0
)
charge_acq_raw = getattr(slots[0], "charge_acquisition_buy_czk_kwh", None)
charge_acquisition_czk_kwh = (
float(charge_acq_raw)
if charge_acq_raw is not None
else min(float(s.buy_price) for s in slots)
)
soc_headroom_wh = max(2000.0, 0.05 * float(battery.soc_max_wh))
# Kotva: poslední slot před prvním sell<0 by měl končit u planner floor (pokud relaxace existuje).
# Slack penalizujeme v objective; samotné omezení přidáme až po definici soc.
first_neg_sell_idx, pre_neg_export_last_t = _pre_negative_sell_export_window(slots)
first_neg_buy_idx = next(
(t for t, s in enumerate(slots) if float(s.buy_price) < 0.0),
None,
)
last_neg_sell_by_prague_date: dict[object, int] = {}
for t_ln, st_ln in enumerate(slots):
if float(st_ln.sell_price) < 0:
last_neg_sell_by_prague_date[_prague_calendar_date(st_ln)] = t_ln
t_pre_neg_peak = _pre_neg_peak_sell_idx(slots, first_neg_sell_idx)
morning_pre_neg_export_ts = _morning_pre_neg_export_indices(
slots,
first_neg_sell_idx,
degrad_czk_kwh=float(degradation_cost_effective),
)
evening_peak_export_ts = _evening_peak_export_indices(
slots,
degrad_czk_kwh=float(degradation_cost_effective),
)
non_negative_buys_pre = [
float(s.buy_price) for s in slots if float(s.buy_price) >= 0.0
]
ref_buy_horizon_pre = (
min(non_negative_buys_pre)
if non_negative_buys_pre
else min(float(s.buy_price) for s in slots)
)
min_spread_pre = float(degradation_cost_effective)
purchase_fixed_pre = _purchase_pricing_fixed(grid)
fixed_tariff_like_pre = purchase_fixed_pre or _horizon_fixed_tariff_like(slots)
pre_neg_buy_discharge_ts: set[int] = set()
if om == "AUTO" and first_neg_buy_idx is not None and first_neg_buy_idx > 0:
pre_neg_buy_discharge_ts = _pre_neg_buy_discharge_indices(
slots,
first_neg_buy_idx,
charge_acquisition_czk_kwh=charge_acquisition_czk_kwh,
min_spread=min_spread_pre,
fixed_tariff=fixed_tariff_like_pre,
)
neg_sell_bat_dump_slots = _neg_sell_bat_dump_slots(
slots,
operating_mode=om,
purchase_fixed=purchase_fixed_pre,
grid=grid,
buy_extreme_thr=buy_extreme_thr,
degrad_czk_kwh=float(degradation_cost_effective),
)
profitable_export_ts_pre: set[int] = set()
if om == "AUTO":
for _t in range(T):
if _t not in discharge_export_slots:
continue
if _slot_profitable_battery_export(
slots[_t],
charge_acquisition_czk_kwh=charge_acquisition_czk_kwh,
min_spread=min_spread_pre,
fixed_tariff=fixed_tariff_like_pre,
):
profitable_export_ts_pre.add(_t)
if first_neg_sell_idx is not None and first_neg_sell_idx > 0 and floor_pct is not None:
# Kotva na ranním peaku (ne na posledním slotu před sell<0) — jinak dump až v 07:30.
if (
t_pre_neg_peak is not None
and t_pre_neg_peak < first_neg_sell_idx - 1
):
t_anchor = t_pre_neg_peak
else:
t_anchor = first_neg_sell_idx - 1
soc_anchor_slack = pulp.LpVariable("soc_anchor_slack_wh", 0, float(battery.usable_capacity_wh))
daytime_en = bool(getattr(battery, "planner_daytime_charge_target_enabled", True))
safety_pen_czk_per_wh: list[float] = []
safety_vars: list[Optional[pulp.LpVariable]] = []
safety_active: list[bool] = []
post_neg_pv_topup: list[bool] = []
high_sell_slot: list[bool] = []
for t in range(T):
sft = slots[t].safety_soc_target_wh if daytime_en else None
# High-sell slot: typicky lokální maximum v SQL lookaheadu (future_sell_opportunity_czk_kwh).
# V těchto slotech safety floor nepoužijeme, aby se zachovala arbitráž na špičkách.
fso = slots[t].future_sell_opportunity_czk_kwh
hs = bool(fso is not None and float(slots[t].sell_price) >= float(fso) - 1e-6)
high_sell_slot.append(hs)
fb = float(slots[t].future_avoided_buy_czk_kwh or slots[t].buy_price)
fs = float(slots[t].future_sell_opportunity_czk_kwh or slots[t].sell_price)
bv = max(fb, fs) - float(degradation_cost_effective)
bv = max(0.0, min(5.0, bv))
st_d = _prague_calendar_date(slots[t])
ln_neg = last_neg_sell_by_prague_date.get(st_d)
pv_topup_after_neg = bool(
om == "AUTO"
and ln_neg is not None
and t > ln_neg
and float(slots[t].sell_price) >= 0.0
and bool(slots[t].is_daytime_pv_surplus_slot)
and not hs
)
post_neg_pv_topup.append(pv_topup_after_neg)
# Safety deficit penalizujeme jen v PV surplus slotech, a ne ve high-sell špičce.
# Záměr: safety není obecná „nabij co nejdřív“ motivace; je to preference využít přebytek PV.
active = bool(
(
sft is not None
and (
bool(slots[t].is_daytime_pv_surplus_slot)
or (planner_v2 and float(slots[t].buy_price) < 0.0)
)
and not hs
)
or pv_topup_after_neg
)
safety_active.append(active)
safety_pen_czk_per_wh.append(bv / 1000.0 if active else 0.0)
if active:
safety_vars.append(
pulp.LpVariable(f"safety_def_{t}", 0, float(battery.usable_capacity_wh))
)
else:
safety_vars.append(None)
commit_pen = float(getattr(battery, "planner_charge_commitment_penalty_czk_kwh", 0.2))
commit_lp: list[tuple[int, pulp.LpVariable, float]] = []
if charge_commitment_prev_w is not None and len(charge_commitment_prev_w) == T:
for t in range(T):
prev = charge_commitment_prev_w[t]
if prev is not None and prev > 500:
cap_prev = float(prev)
cv = pulp.LpVariable(f"ccommit_{t}", 0, cap_prev)
commit_lp.append((t, cv, cap_prev))
peak_export_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
pv_charge_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
neg_sell_bat_dump_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
neg_sell_soc_underfill: list[tuple[int, pulp.LpVariable]] = []
neg_buy_charge_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
pre_neg_batt_export_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
fixed_tariff_like = fixed_tariff_like_pre
block_export_neg_sell = bool(getattr(grid, "block_export_on_negative_sell", False))
if om == "AUTO":
for t in range(T):
if t not in discharge_export_slots:
continue
if not _slot_profitable_battery_export(
slots[t],
charge_acquisition_czk_kwh=charge_acquisition_czk_kwh,
min_spread=float(degradation_cost_effective),
fixed_tariff=fixed_tariff_like,
):
continue
cap_w = float(min(
grid.max_export_power_w,
battery.max_discharge_power_w,
))
sf = pulp.LpVariable(f"export_shortfall_{t}", 0, cap_w)
peak_export_shortfall.append((t, sf, cap_w))
export_cap_w = _battery_export_cap_w(battery, grid)
for t_pnd in sorted(pre_neg_buy_discharge_ts):
sf_pnd = pulp.LpVariable(f"pre_neg_bat_export_sf_{t_pnd}", 0, export_cap_w)
pre_neg_batt_export_shortfall.append((t_pnd, sf_pnd, export_cap_w))
if not relaxed_neg_buy_charge:
neg_buy_slot_indices = [
t for t, s in enumerate(slots) if float(s.buy_price) < 0.0
]
if neg_buy_slot_indices:
t_nb_last = max(neg_buy_slot_indices)
cap_w = float(battery.max_charge_power_w)
sf_nb = pulp.LpVariable(f"neg_buy_charge_sf_{t_nb_last}", 0, cap_w)
neg_buy_charge_shortfall.append((t_nb_last, sf_nb, cap_w))
for t in range(T):
if float(slots[t].sell_price) >= 0:
continue
if float(slots[t].buy_price) < 0.0:
continue
if t not in charge_slots:
continue
# Před buy<0: nepenalizovat / netlačit PV→bat (jinak 98 % v 09:15 a export v sell<0).
if first_neg_buy_idx is not None and t < first_neg_buy_idx:
continue
pv_surplus_w = max(
0.0,
float(slots[t].pv_a_forecast_w)
+ float(slots[t].pv_b_forecast_w)
- float(slots[t].load_baseline_w),
)
if pv_surplus_w <= 500:
continue
cap_w = float(min(pv_surplus_w, battery.max_charge_power_w))
sf_pv = pulp.LpVariable(f"pv_charge_shortfall_{t}", 0, cap_w)
pv_charge_shortfall.append((t, sf_pv, cap_w))
for t in range(T):
if not post_neg_pv_topup[t]:
continue
if float(slots[t].sell_price) < 0:
continue
pv_surplus_w = max(
0.0,
float(slots[t].pv_a_forecast_w)
+ float(slots[t].pv_b_forecast_w)
- float(slots[t].load_baseline_w),
)
if pv_surplus_w <= 500:
continue
cap_w = float(min(pv_surplus_w, battery.max_charge_power_w))
sf_pv = pulp.LpVariable(f"post_neg_pv_shortfall_{t}", 0, cap_w)
pv_charge_shortfall.append((t, sf_pv, cap_w))
for t in range(T):
if float(slots[t].sell_price) >= 0:
continue
if float(slots[t].buy_price) < 0.0:
continue
if t not in charge_slots:
continue
if first_neg_buy_idx is not None and t < first_neg_buy_idx:
continue
pv_surplus_w = max(
0.0,
float(slots[t].pv_a_forecast_w)
+ float(slots[t].pv_b_forecast_w)
- float(slots[t].load_baseline_w),
)
if pv_surplus_w <= 500:
continue
us = pulp.LpVariable(
f"neg_soc_under_{t}",
0,
float(battery.usable_capacity_wh),
)
neg_sell_soc_underfill.append((t, us))
for t in neg_sell_bat_dump_slots:
dump_target_w = _battery_export_cap_w(battery, grid)
sf_dump = pulp.LpVariable(f"neg_bat_dump_shortfall_{t}", 0, dump_target_w)
neg_sell_bat_dump_shortfall.append((t, sf_dump, dump_target_w))
# --- Účelová funkce (jen OTE sloty; terminal SoC shadow price na konci horizontu) ---
# Kanály: gi×buy, ge_pv×sell, ge_bat×sell, +ge_bat×acquisition (export bat. jen v discharge slotách).
# Viz docs/04-modules/planning-arbitrage-accounting.md — mezi-slotová arbitráž, ne sell vs buy v jednom slotu.
prob += (
pulp.lpSum(
gi[t] * slots[t].buy_price * INTERVAL_H / 1000
- ge_pv[t] * slots[t].sell_price * INTERVAL_H / 1000
- ge_bat[t] * slots[t].sell_price * INTERVAL_H / 1000
+ (
ge_pv[t] * SELF_SUSTAIN_EXPORT_PENALTY_CZK_KWH * INTERVAL_H / 1000
if om == "SELF_SUSTAIN"
else 0
)
+ (
(slots[t].pv_b_forecast_w * z_gen_cutoff[t]) * GEN_CUTOFF_PENALTY_CZK_KWH * INTERVAL_H / 1000
if z_gen_cutoff is not None
else 0
)
+ gi_over[t] * IMPORT_OVER_BREAKER_PENALTY_CZK_KWH * INTERVAL_H / 1000
+ 0.5 * (bc_pv[t] + bc_gi[t] + bd[t]) * degradation_cost_effective * INTERVAL_H / 1000
- (
pv_ld[t] * LOAD_FIRST_INCENTIVE_CZK_KWH * INTERVAL_H / 1000
if om == "AUTO"
else 0
)
+ (
ge_bat[t] * charge_acquisition_czk_kwh * INTERVAL_H / 1000
if om == "AUTO" and t in discharge_export_slots
else 0
)
- (
bc_pv[t]
* NEG_SELL_PV_CHARGE_REWARD_CZK_KWH
* INTERVAL_H
/ 1000
if (
om == "AUTO"
and float(slots[t].sell_price) < 0.0
and t in charge_slots
)
else 0
)
+ (
ge_pv[t]
* NEG_SELL_PV_B_VENT_PENALTY_CZK_KWH
* INTERVAL_H
/ 1000
if (
om == "AUTO"
and float(slots[t].sell_price) < 0.0
and not purchase_fixed_pre
)
else 0
)
+ pulp.lpSum(
ev_direct[e][t] * slots[t].buy_price * INTERVAL_H / 1000
+ ev_via_bat[e][t] * slots[t].buy_price * EV_ROUNDTRIP_FACTOR * INTERVAL_H / 1000
for e in range(EV)
)
+ ca[t]
* (
NEG_SELL_CURTAIL_PENALTY_CZK_KWH
if (
om == "AUTO"
and float(slots[t].sell_price) < 0.0
and t in charge_slots
)
else (
0.0
if (
has_pv_b
and future_neg_buy_from[t]
and float(slots[t].sell_price) < 0.0
)
else CURTAILMENT_PENALTY
)
)
for t in range(T)
)
+ soc_deficit_24h * soc_deficit_penalty_czk_kwh / 1000
- terminal_soc_kcz_per_wh * soc[T - 1]
+ (
soc_anchor_slack * PRENEG_SELL_SOC_ANCHOR_SLACK_PENALTY_CZK_PER_WH
if soc_anchor_slack is not None
else 0
)
+ pulp.lpSum(
safety_vars[t] * safety_pen_czk_per_wh[t]
for t in range(T)
if safety_vars[t] is not None
)
+ pulp.lpSum(cv * INTERVAL_H / 1000.0 * commit_pen for _t, cv, _p in commit_lp)
+ pulp.lpSum(
sf * PEAK_EXPORT_SHORTFALL_PENALTY_CZK_KWH * INTERVAL_H / 1000.0
for _t, sf, _cap in peak_export_shortfall
)
+ pulp.lpSum(
sf * PV_CHARGE_SHORTFALL_PENALTY_CZK_KWH * INTERVAL_H / 1000.0
for _t, sf, _cap in pv_charge_shortfall
)
+ pulp.lpSum(
us * NEG_SELL_SOC_UNDERFILL_PENALTY_CZK_PER_WH
for _t, us in neg_sell_soc_underfill
)
+ pulp.lpSum(
sf * NEG_SELL_BAT_DUMP_SHORTFALL_PENALTY_CZK_KWH * INTERVAL_H / 1000.0
for _t, sf, _cap in neg_sell_bat_dump_shortfall
)
+ pulp.lpSum(
sf * NEG_BUY_CHARGE_SHORTFALL_PENALTY_CZK_KWH * INTERVAL_H / 1000.0
for _t, sf, _cap in neg_buy_charge_shortfall
)
+ pulp.lpSum(
sf * PRE_NEG_BATT_EXPORT_SHORTFALL_PENALTY_CZK_KWH * INTERVAL_H / 1000.0
for _t, sf, _cap in pre_neg_batt_export_shortfall
)
+ pulp.lpSum(
(bc_pv[t] + bc_gi[t])
* PRE_NEG_CHARGE_PENALTY_CZK_KWH
* INTERVAL_H
/ 1000.0
for t in range(T)
if (
first_neg_buy_idx is not None
and t < first_neg_buy_idx
and float(slots[t].buy_price) >= 0.0
)
)
+ pulp.lpSum(
-25.0 * z_export[t]
for t in range(T)
if t in discharge_export_slots and t in profitable_export_ts_pre
)
)
# --- Omezení ---
for t_sf, sf, cap_w in peak_export_shortfall:
prob += sf >= cap_w - ge_bat[t_sf]
for t_sf, sf, cap_w in pv_charge_shortfall:
prob += sf >= cap_w - bc_pv[t_sf]
for t_sf, sf, cap_w in neg_sell_bat_dump_shortfall:
prob += sf >= cap_w - ge_bat[t_sf]
for t_us, us in neg_sell_soc_underfill:
prob += us >= float(battery.soc_max_wh) - soc[t_us]
for t_sf, sf, cap_w in neg_buy_charge_shortfall:
prob += sf >= cap_w - (bc_gi[t_sf] + bc_pv[t_sf])
for t_sf, sf, cap_w in pre_neg_batt_export_shortfall:
prob += sf >= cap_w - ge_bat[t_sf]
preneg_export_min_soc_wh = float(min_soc_wh) + max(
float(battery.max_discharge_power_w)
* float(battery.discharge_efficiency)
* INTERVAL_H,
1000.0,
)
if om == "AUTO":
profitable_export_ts = profitable_export_ts_pre
export_push_w = _battery_export_cap_w(battery, grid)
for t_peak in morning_pre_neg_export_ts:
if t_peak in profitable_export_ts:
prob += ge_bat[t_peak] >= export_push_w * z_export[t_peak]
for t_pnd in pre_neg_buy_discharge_ts:
prob += ge_bat[t_pnd] >= export_push_w * z_export[t_pnd]
evening_push_ts = _evening_battery_export_push_indices(
slots,
profitable_export_ts=profitable_export_ts,
degrad_czk_kwh=float(degradation_cost_effective),
)
# Push jen při reálném večerním okně (≥2 sloty); 1-slot regresní testy bez tvrdého push.
if len(evening_push_ts) >= 2:
for t_peak in evening_push_ts:
if t_peak not in discharge_export_slots:
continue
prob += ge_bat[t_peak] >= export_push_w * z_export[t_peak]
# Ostatní profitable sloty: shortfall penalizace (ne tvrdý push na celý horizont).
if t_anchor is not None and soc_anchor_slack is not None:
target_floor_wh = float(planner_floor_effective_wh)
prob += soc[t_anchor] <= target_floor_wh + soc_anchor_slack
for t in range(T):
s = slots[t]
pv_a_net = s.pv_a_forecast_w - ca[t]
ev_total_t = pulp.lpSum(ev_direct[e][t] + ev_via_bat[e][t] for e in range(EV))
# Energetická bilance
pv_b_effective = (
float(s.pv_b_forecast_w) * (1 - z_gen_cutoff[t])
if z_gen_cutoff is not None
else float(s.pv_b_forecast_w)
)
pv_total_ub = float(s.pv_a_forecast_w) + float(s.pv_b_forecast_w)
# Součet nabíjení z FVE + ze sítě nesmí překročit max_charge_power_w baterie.
prob += bc_pv[t] + bc_gi[t] <= battery.max_charge_power_w
# Vybíjení do domu (bd) + export z baterie (ge_bat) sdílí jeden BMS limit.
prob += bd[t] + ge_bat[t] <= battery.max_discharge_power_w
# Breaker: import ze site je tvrdě omezen (gi_over jen numerická pojistka).
prob += gi[t] <= gi_upper
if om == "AUTO":
load_site_expr = float(s.load_baseline_w) + ev_total_t + hp[t]
prob += pv_ld[t] + pv_sp[t] == pv_a_net + pv_b_effective
prob += pv_ld[t] <= load_site_expr
prob += pv_ld[t] <= pv_a_net + pv_b_effective
prob += pv_sp[t] <= pv_total_ub
prob += pv_sp[t] >= pv_a_net + pv_b_effective - load_site_expr
prob += bc_pv[t] <= pv_sp[t]
prob += bc_gi[t] <= gi[t]
prob += ge_pv[t] <= pv_sp[t]
prob += bc_pv[t] + ge_pv[t] <= pv_sp[t]
# Import na deficit po PV→load, nebo na grid-nabíjení (bc_gi).
prob += gi[t] <= load_site_expr + bc_gi[t]
# Vybíjení do domu až po pv_ld (Deye load-first); v exportních slotech smí bd→síť.
if t not in discharge_export_slots:
prob += bd[t] <= load_site_expr - pv_ld[t]
prob += pv_ld[t] >= load_site_expr - gi[t] - bd[t]
# Plná bilance (pv_ld+pv_sp rozpad je ortogonální k tokům přebytku).
prob += (
pv_a_net + pv_b_effective + gi[t] + bd[t]
== float(s.load_baseline_w) + ev_total_t + hp[t] + bc_pv[t] + bc_gi[t] + ge[t]
)
else:
prob += pv_ld[t] == 0
prob += pv_sp[t] == pv_a_net + pv_b_effective
prob += bc_pv[t] <= pv_sp[t]
prob += bc_gi[t] <= gi[t]
prob += (
pv_a_net + pv_b_effective + gi[t] + bd[t]
== s.load_baseline_w + ev_total_t + hp[t] + bc_pv[t] + bc_gi[t] + ge[t]
)
prob += ge[t] == ge_pv[t] + ge_bat[t]
# Baterie nesmí „přestrojit“ FVE export: jen z pv_sp (po load-first).
if om == "AUTO":
prob += ge_bat[t] >= ge[t] - pv_sp[t]
else:
prob += ge_bat[t] >= ge[t] - (pv_a_net + pv_b_effective)
# Měkký breaker cap: gi_over[t] >= max(0, gi[t] - breaker).
prob += gi_over[t] >= gi[t] - float(grid.max_import_power_w)
# SoC kontinuita (bd do domu i ge_bat do sítě vybíjí baterii)
soc_prev = current_soc_wh if t == 0 else soc[t - 1]
prob += soc[t] == (
soc_prev
+ (bc_pv[t] + bc_gi[t]) * battery.charge_efficiency * INTERVAL_H
- (bd[t] + ge_bat[t]) / battery.discharge_efficiency * INTERVAL_H
)
sv = safety_vars[t]
tgt_s = slots[t].safety_soc_target_wh if daytime_en else None
if sv is not None:
eff_tgt_s = float(tgt_s) if tgt_s is not None else float(min_soc_wh)
if (
om == "AUTO"
and float(s.sell_price) < 0.0
and t in charge_slots
and (first_neg_buy_idx is None or t >= first_neg_buy_idx)
):
# Záporný výkup: dobít na planner soc_max (typicky 95100 %), ne jen SQL safety ~50 %.
eff_tgt_s = max(eff_tgt_s, float(battery.soc_max_wh))
elif post_neg_pv_topup[t]:
# Po konci sell<0: dobit z FVE na plno, pak teprve export (kladný sell, ne večerní peak).
eff_tgt_s = max(eff_tgt_s, float(battery.soc_max_wh))
prob += sv >= eff_tgt_s - soc[t]
# ev_via_bat kryto z discharge
prob += pulp.lpSum(ev_via_bat[e][t] for e in range(EV)) <= bd[t]
# GEN port cut-off chceme vůbec připustit jen v režimech/politikách, kde má smysl:
# - SELF_SUSTAIN (no-export intent; typicky ge=0, takže cut-off je bezpečnostní ventil),
# - BLOCK_EXPORT okna (v projektu reprezentované sloty se sell_price < 0),
# - případně explicitní no_export politika (pokud bude v kontextu dostupná).
allow_gen_cutoff = (
om == "SELF_SUSTAIN"
or float(s.sell_price) < 0
or bool(getattr(grid, "no_export", False))
)
if z_gen_cutoff is not None and not allow_gen_cutoff:
prob += z_gen_cutoff[t] == 0
# Záporná nákupní cena → import jen na load + nabíjení + EV + TČ (stále ≤ breaker).
if s.buy_price < 0:
prob += gi[t] <= min(
gi_upper,
float(s.load_baseline_w)
+ battery.max_charge_power_w
+ sum(v.max_charge_power_w for v in vehicles)
+ heat_pump.rated_heating_power_w,
)
prob += ge[t] == 0
prob += ge_pv[t] == 0
prob += ge_bat[t] == 0
# Záporný prodej (sell < 0): výboj baterie jen před extrémně záporným buy (v11).
# Export FVE při sell<0: spot = nabíjení/curtail A; ventil jen pole B při plné baterii.
if s.sell_price < 0:
prob += w_arb[t] == 0
prob += bd[t] <= pulp.lpSum(ev_via_bat[e][t] for e in range(EV))
# buy<0: export už zakázán výše; neaplikovat sell<0 ventil (bilance / infeasible).
if float(s.buy_price) < 0.0:
continue
block_neg_sell_export_t = bool(
getattr(grid, "block_export_on_negative_sell", False)
)
if t not in neg_sell_bat_dump_slots:
prob += ge_bat[t] == 0
ev_cap_neg = sum(
float(vehicles[e].max_charge_power_w)
for e in range(EV)
if (e == 0 and s.ev1_connected) or (e == 1 and s.ev2_connected)
)
load_neg = (
float(s.load_baseline_w)
+ ev_cap_neg
+ float(heat_pump.rated_heating_power_w)
)
pv_surplus_neg_w = max(
0.0,
float(s.pv_a_forecast_w) + float(s.pv_b_forecast_w) - load_neg,
)
# FVE→síť při záporném výkupu: u KV1 (block_export) jen bc/curtail A;
# u home-01 s polem B musí přebytek jít do sítě (ge_pv), jinak infeasible.
block_pv_export_neg_sell = bool(
getattr(grid, "block_export_on_negative_sell", False)
) or (
float(s.pv_b_forecast_w) <= 0
and not _pv_forced_vent_export_allowed(
t,
current_soc_wh=current_soc_wh,
battery=battery,
soc_headroom_wh=soc_headroom_wh,
pv_surplus_w=pv_surplus_neg_w,
)
)
if block_pv_export_neg_sell:
prob += ge_pv[t] == 0
# Tvrdý zákaz vývozu jen při block_export_on_negative_sell (KV1).
if block_neg_sell_export_t:
prob += ge[t] == 0
prob += ge_pv[t] == 0
prob += ge_bat[t] == 0
elif purchase_fixed_pre:
# Fixní nákup + spot výkup (BA81, KV1 bez block_export): sell<0 = platíš za vývoz.
prob += ge[t] == 0
prob += ge_pv[t] == 0
elif not purchase_fixed_pre:
# Spot (home-01): ge_pv=0 dokud není plná baterie; pak jen ventil pole B (ne celý surplus).
# Před buy<0 + bc_pv=0: přebytek pole B musí jít do sítě (ge_pv≤pv_b), jinak Infeasible.
before_first_neg_buy = (
first_neg_buy_idx is not None and t < first_neg_buy_idx
)
if before_first_neg_buy and float(s.pv_b_forecast_w) > 0:
prob += ge_pv[t] <= float(s.pv_b_forecast_w)
else:
soc_prev_neg = current_soc_wh if t == 0 else soc[t - 1]
w_pv_b_vent = pulp.LpVariable(
f"w_pv_b_vent_neg_{t}", cat=pulp.LpBinary
)
m_soc_neg = float(battery.soc_max_wh)
prob += soc_prev_neg >= (
m_soc_neg
- soc_headroom_wh
- m_soc_neg * (1 - w_pv_b_vent)
)
prob += ge_pv[t] <= float(s.pv_b_forecast_w) * w_pv_b_vent
soc_prev_expr = current_soc_wh if t == 0 else soc[t - 1]
arb_t = arb_floor_series[t]
soc_low_t = soc_panel_min[t]
# Při relaxovaném dnu (soc_low pod DB min_soc Wh) nesmí větev w_arb=1 znovu vynutit arb_t
# (typicky ~rezerva 20 %) — jinak nejde „vypustit“ baterku k planner floor 5 %.
if soc_low_t < min_soc_wh - 1e-3:
arb_cap_t = min(arb_t, soc_low_t)
else:
arb_cap_t = arb_t
if om == "AUTO" and t in discharge_export_slots:
prob += soc_prev_expr >= (
arb_cap_t - (arb_cap_t - soc_low_t) * (1 - w_arb[t])
)
prob += bd[t] <= (
battery.max_discharge_power_w * w_arb[t]
+ pulp.lpSum(ev_via_bat[e][t] for e in range(EV))
)
elif om == "AUTO":
# PASSIVE: vlastní spotřeba (bd); export baterie jen ge_bat (ge_bat=0 níže).
prob += soc_prev_expr >= (
arb_cap_t - (arb_cap_t - soc_low_t) * (1 - w_arb[t])
)
prob += bd[t] <= (
s.load_baseline_w
+ ev_total_t
+ hp[t]
+ bc_pv[t]
+ bc_gi[t]
)
else:
prob += soc_prev_expr >= (
arb_cap_t - (arb_cap_t - soc_low_t) * (1 - w_arb[t])
)
prob += bd[t] <= (
s.load_baseline_w
+ ev_total_t
+ hp[t]
+ bc_pv[t]
+ bc_gi[t]
+ battery.max_discharge_power_w * w_arb[t]
)
# Významný export z baterie ⇒ koncové SoC ≥ podlaha (FVE export ge_pv bez této podlahy).
m_ge = float(grid.max_export_power_w)
m_soc_bigm = float(battery.usable_capacity_wh)
if t in neg_sell_bat_dump_slots:
prob += ge_bat[t] <= m_ge
else:
prob += ge_bat[t] <= m_ge * z_export[t]
prob += ge_bat[t] >= GE_MIN_EXPORT_W * z_export[t]
# Bez hluboké relaxace: export končí ≥ rezerva. Při hluboké relaxaci (soc_panel_min pod min_soc)
# sladit s LP spodkem — jinak z_export vynutil arb_base a blokoval vývoz k planner floor.
if (
om == "AUTO"
and first_neg_sell_idx is not None
and t < first_neg_sell_idx
and floor_pct is not None
):
export_soc_floor_t = float(planner_floor_effective_wh)
elif om == "AUTO" and t in pre_neg_buy_discharge_ts:
export_soc_floor_t = float(min_soc_wh)
elif (
om == "AUTO"
and t in morning_pre_neg_export_ts
and floor_pct is not None
):
export_soc_floor_t = float(planner_floor_effective_wh)
elif soc_panel_min[t] < min_soc_wh - 1e-3:
export_soc_floor_t = float(soc_panel_min[t])
else:
export_soc_floor_t = float(arb_base_wh)
# Večerní exportní slot: podlaha jen min_soc (ne safety ramp), aby šlo vybít při z_export=1.
if (
om == "AUTO"
and t in discharge_export_slots
and t in evening_peak_export_ts
):
export_soc_floor_t = float(min_soc_wh)
# Safety export floor: v běžných (ne high-sell) slotech nevybít exportem energii potřebnou pro
# robustnost/noční baseload. Použije se pouze pokud je safety target v SQL vyplněný.
tgt_s = slots[t].safety_soc_target_wh if daytime_en else None
if (
tgt_s is not None
and not high_sell_slot[t]
and t not in profitable_export_ts_pre
and not (
om == "AUTO"
and t in discharge_export_slots
and t in evening_peak_export_ts
)
):
export_soc_floor_t = max(
export_soc_floor_t,
min(
float(battery.soc_max_wh),
max(min_soc_wh, float(tgt_s)),
),
)
prob += soc[t] >= export_soc_floor_t - m_soc_bigm * (1 - z_export[t])
# EV limity a připojení
for e in range(EV):
connected = (
(e == 0 and s.ev1_connected) or
(e == 1 and s.ev2_connected)
)
if not connected:
prob += ev_direct[e][t] == 0
prob += ev_via_bat[e][t] == 0
else:
prob += ev_direct[e][t] + ev_via_bat[e][t] <= vehicles[e].max_charge_power_w
for tt, cv, prev in commit_lp:
prob += cv >= prev - (bc_pv[tt] + bc_gi[tt])
if om == "SELF_SUSTAIN":
for t in range(T):
prob += gi[t] <= slots[t].load_baseline_w
elif om == "PRESERVE":
for t in range(T):
prob += bc_pv[t] == 0
prob += bc_gi[t] == 0
prob += bd[t] == 0
elif om == "CHARGE_CHEAP":
for t in range(T):
prob += ge[t] == 0
prob += ge_pv[t] == 0
prob += ge_bat[t] == 0
prob += bd[t] == 0
# Slot pre-selection (z DB fn_load_planning_slots_full → allow_*)
if om == "AUTO":
for t in range(T):
s = slots[t]
sell_t_pre = float(s.sell_price)
pv_surplus_w = max(
0,
int(s.pv_a_forecast_w)
+ int(s.pv_b_forecast_w)
- int(s.load_baseline_w),
)
pv_surplus_for_gi = pv_surplus_w
if (
t in charge_slots
and sell_t_pre < 0
and pv_surplus_for_gi > 0
and float(s.buy_price) >= 0.0
):
prob += bc_gi[t] == 0
before_neg_buy = (
first_neg_buy_idx is not None and t < first_neg_buy_idx
)
if before_neg_buy and sell_t_pre < 0.0 and pv_surplus_w > 0:
# Ranní sell<0 před buy<0: PV do sítě/curtail, ne do baterie (kapacita na import).
prob += bc_pv[t] == 0
elif t not in charge_slots:
if float(s.buy_price) >= 0.0:
prob += bc_gi[t] == 0
if pv_surplus_w <= 0:
prob += bc_pv[t] == 0
else:
prob += bc_pv[t] <= float(pv_surplus_w)
if (
t not in discharge_export_slots
and t not in neg_sell_bat_dump_slots
and t not in pre_neg_buy_discharge_ts
):
prob += ge_bat[t] == 0
prob += z_export[t] == 0
# Ekonomické guardy: ceny v objective nestačí proti maskám / terminal SoC.
# Referenční buy jen z ne-záporných slotů: jinak jeden buy<0 v horizontu označí
# téměř všechny sloty jako „drahé“ (gi=0 pro dům) → Infeasible (home-01).
non_negative_buys = [
float(s.buy_price) for s in slots if float(s.buy_price) >= 0.0
]
ref_buy_horizon = (
min(non_negative_buys)
if non_negative_buys
else min(float(s.buy_price) for s in slots)
)
min_spread = float(degradation_cost_effective)
for t in range(T):
s = slots[t]
buy_t = float(s.buy_price)
sell_t = float(s.sell_price)
load_t = float(s.load_baseline_w)
ev_cap_t = sum(
float(vehicles[e].max_charge_power_w)
for e in range(EV)
if (e == 0 and s.ev1_connected) or (e == 1 and s.ev2_connected)
)
pv_surplus_w = max(
0.0,
float(s.pv_a_forecast_w) + float(s.pv_b_forecast_w) - load_t,
)
# FVE export: před prvním sell<0 smí jít přebytek do sítě (kladný sell), pak nabít
# v záporném okně z PV. Jinak držet energii na future_sell peak.
allow_pre_neg_pv_export = (
first_neg_sell_idx is not None
and pre_neg_export_last_t is not None
and t <= pre_neg_export_last_t
and sell_t >= 0
and (
first_neg_buy_idx is None
or t < first_neg_buy_idx
)
)
pv_store_val = _pv_store_value_czk_kwh(s, min_spread)
skip_pv_store_block = (
float(s.pv_b_forecast_w) > 0
and not getattr(grid, "block_export_on_negative_sell", False)
and sell_t < 0
and buy_t >= 0.0
and not purchase_fixed_pre
) or (
# KV1: plná baterie + kladný sell — neblokovat ge_pv==0 (jinak masivní curtail).
getattr(grid, "block_export_on_negative_sell", False)
and sell_t >= 0
and pv_surplus_w > 500
)
# BA81: export pole B jen při kladném sell (po sell<0 jinak ge==0 výše).
fixed_pv_b_export_cap = (
purchase_fixed_pre
and float(s.pv_b_forecast_w) > 0
and not getattr(grid, "block_export_on_negative_sell", False)
and sell_t >= 0
)
if fixed_pv_b_export_cap:
if z_gen_cutoff is not None:
prob += ge_pv[t] <= float(s.pv_b_forecast_w) * (1 - z_gen_cutoff[t])
else:
prob += ge_pv[t] <= max(0.0, float(s.pv_b_forecast_w))
if (
not allow_pre_neg_pv_export
and not skip_pv_store_block
and not fixed_pv_b_export_cap
and sell_t < pv_store_val
and not _pv_forced_vent_export_allowed(
t,
current_soc_wh=current_soc_wh,
battery=battery,
soc_headroom_wh=soc_headroom_wh,
pv_surplus_w=pv_surplus_w,
)
):
prob += ge_pv[t] == 0
# Při `sell < 0` exportovat MAX pole B (má green bonus 7+ Kč/kWh → čistá hodnota
# i při sell=-1 = +6 Kč). Pole A green bonus nemá → export A za sell<0 je čistá ztráta.
# Constraint: ge_pv ≤ pv_b_forecast_w (pole A jde do baterie / curtail).
# Aplikuje se jen u sites bez block_export_on_negative_sell (home-01 áno; KV1 ne)
# A jen pokud reálně existuje pole B (pv_b_forecast_w > 0 — jinak by ge_pv ≤ 0
# zablokovalo legitimní pre-neg-pv export pole A z testů).
if (
sell_t < 0
and buy_t >= 0.0
and float(s.pv_b_forecast_w) > 0
and not getattr(grid, "block_export_on_negative_sell", False)
):
prob += ge_pv[t] <= float(s.pv_b_forecast_w)
# Drahý nákup: dům + TČ z baterie (ne import ze sítě); síť jen EV (+ případně TČ).
# Spot (home-01): buy > min ne-záporného buy v horizontu.
# Fixní tarif (KV1): navíc buy > charge_acquisition (konstantní buy ≈ ref).
expensive_import_slot = buy_t > ref_buy_horizon + min_spread
if fixed_tariff_like_pre:
expensive_import_slot = expensive_import_slot or (
buy_t > charge_acquisition_czk_kwh + min_spread
)
if expensive_import_slot and t not in charge_slots and buy_t >= 0.0:
# Strict: síť jen EV+TČ; baseload z baterie/FVE. Relaxed: síť smí krmit baseload (nouzový režim).
prob += gi[t] <= ev_cap_t + hp[t] + (
float(s.load_baseline_w) if relaxed_expensive_import else 0.0
)
if not relaxed_expensive_import and om == "AUTO":
prob += (
bd[t] + pv_ld[t]
>= float(s.load_baseline_w) + hp[t]
)
# Anti souběžný vývoz FVE + významný import (mikrocyklus).
if buy_t > sell_t + min_spread and pv_surplus_w > 0:
prob += ge_pv[t] <= pv_surplus_w
# Deadline constraints pro EV
for e, session in enumerate(ev_sessions):
if session and session.target_deadline and session.energy_needed_wh > 0:
t_dl = next(
(t for t, s in enumerate(slots) if s.interval_start >= session.target_deadline),
T - 1
)
prob += pulp.lpSum(
(ev_direct[e][t] + ev_via_bat[e][t]) * INTERVAL_H
for t in range(t_dl + 1)
if (e == 0 and slots[t].ev1_connected) or (e == 1 and slots[t].ev2_connected)
) >= session.energy_needed_wh
# TUV look-ahead podle tuv_usage_stats (DOW+hodina, konvence jako v DB)
if (
tuv_delta_stats
and heat_pump.rated_heating_power_w > 0
and getattr(heat_pump, "tuv_min_temp_c", 0) is not None
):
tuv_pred = float(current_tuv_temp_c)
tgt = float(getattr(heat_pump, "tuv_target_temp_c", 55.0) or 55.0)
thr = float(heat_pump.tuv_min_temp_c) + 5.0
for t in range(T):
dow, hour = _prague_dow_hour(slots[t].interval_start)
delta = tuv_delta_stats.get((dow, hour), -0.1)
tuv_pred += float(delta) * INTERVAL_H
if tuv_pred < thr:
prob += (
pulp.lpSum(hp[s] for s in range(max(0, t - 8), t + 1))
>= heat_pump.rated_heating_power_w * 0.5
)
tuv_pred = tgt
# Nouzový ohřev TUV
if current_tuv_temp_c < heat_pump.tuv_min_temp_c:
prob += hp[0] >= heat_pump.rated_heating_power_w * 0.8
# SoC bezpečnostní buffer vyhodnocený až na konci 24h horizontu
eod_idx = min(T - 1, int(24 / INTERVAL_H) - 1)
prob += soc_deficit_24h >= soc_buffer_target_wh - soc[eod_idx]
# --- Řešení (HiGHS přes highspy / PuLP API; bez externí binárky HiGHS_CMD) ---
t_start = time.monotonic()
try:
solver = pulp.getSolver(
"HiGHS", msg=False, timeLimit=SOLVER_TIME_LIMIT
)
except Exception:
logger.warning("HiGHS nedostupný, používám CBC fallback")
solver = pulp.PULP_CBC_CMD(msg=False, timeLimit=SOLVER_TIME_LIMIT)
status = prob.solve(solver)
duration_ms = int((time.monotonic() - t_start) * 1000)
if pulp.LpStatus[status] != "Optimal":
if not relaxed_expensive_import:
logger.warning(
"solve_dispatch Infeasible, retry with relaxed_expensive_import "
"(grid may supply baseload in expensive slots)"
)
return solve_dispatch(
slots,
battery,
heat_pump,
grid,
ev_sessions,
vehicles,
current_soc_wh,
current_tuv_temp_c,
tuv_delta_stats=tuv_delta_stats,
operating_mode=operating_mode,
charge_commitment_prev_w=charge_commitment_prev_w,
planner_version=planner_version,
relaxed_expensive_import=True,
)
if not relaxed_neg_buy_charge:
logger.warning(
"solve_dispatch still Infeasible, retry without neg_buy_charge_shortfall"
)
return solve_dispatch(
slots,
battery,
heat_pump,
grid,
ev_sessions,
vehicles,
current_soc_wh,
current_tuv_temp_c,
tuv_delta_stats=tuv_delta_stats,
operating_mode=operating_mode,
charge_commitment_prev_w=charge_commitment_prev_w,
planner_version=planner_version,
relaxed_expensive_import=True,
relaxed_neg_buy_charge=True,
)
raise RuntimeError(f"Solver: {pulp.LpStatus[status]}")
# --- Post-processing ---
results = []
for t in range(T):
hp_raw = pulp.value(hp[t])
hp_on = hp_raw > heat_pump.rated_heating_power_w * 0.3
bc_tot = float(pulp.value(bc_pv[t]) or 0) + float(pulp.value(bc_gi[t]) or 0)
batt_w = round(bc_tot - float(pulp.value(bd[t]) or 0))
grid_w = round(pulp.value(gi[t]) - pulp.value(ge[t]))
soc_pct = round(pulp.value(soc[t]) / battery.usable_capacity_wh * 100, 1)
export_limit_w = int(grid.max_export_power_w) if grid_w < 0 else 0
ge_bat_w = round(float(pulp.value(ge_bat[t]) or 0))
export_mode = "NONE"
if grid_w < 0:
export_mode = (
"BATTERY_SELL"
if ge_bat_w >= GE_MIN_EXPORT_W
else "PV_SURPLUS"
)
# Deye: default PASSIVE (střídač pokryje load). CHARGE/SELL jen v maskovaných AUTO slotech.
deye_mode = "PASSIVE"
if om == "AUTO":
if (
slots[t].allow_discharge_export
and batt_w < 0
and grid_w < 0
):
deye_mode = "SELL"
elif slots[t].allow_charge and batt_w > 0 and grid_w > 0:
deye_mode = "CHARGE"
elif batt_w < 0 and grid_w < 0:
deye_mode = "SELL"
elif batt_w > 0 and grid_w > 0:
deye_mode = "CHARGE"
deye_gen_cutoff = None
if z_gen_cutoff is not None:
deye_gen_cutoff = bool(round(float(pulp.value(z_gen_cutoff[t]) or 0)))
cashflow_czk_t = (
pulp.value(gi[t]) * slots[t].buy_price * INTERVAL_H / 1000
- pulp.value(ge[t]) * slots[t].sell_price * INTERVAL_H / 1000
)
ge_bat_value = float(pulp.value(ge_bat[t]) or 0)
battery_arbitrage_czk_t = (
ge_bat_value
* (float(slots[t].sell_price) - float(charge_acquisition_czk_kwh))
* INTERVAL_H
/ 1000.0
)
penalty_terms_t = 0.0
for _tt, _sf, _cap in peak_export_shortfall:
if _tt == t:
penalty_terms_t += (
float(pulp.value(_sf) or 0.0)
* PEAK_EXPORT_SHORTFALL_PENALTY_CZK_KWH
* INTERVAL_H
/ 1000.0
)
for _tt, _sf, _cap in pv_charge_shortfall:
if _tt == t:
penalty_terms_t += (
float(pulp.value(_sf) or 0.0)
* PV_CHARGE_SHORTFALL_PENALTY_CZK_KWH
* INTERVAL_H
/ 1000.0
)
for _tt, _sf, _cap in neg_sell_bat_dump_shortfall:
if _tt == t:
penalty_terms_t += (
float(pulp.value(_sf) or 0.0)
* NEG_SELL_BAT_DUMP_SHORTFALL_PENALTY_CZK_KWH
* INTERVAL_H
/ 1000.0
)
for _tt, _us in neg_sell_soc_underfill:
if _tt == t:
penalty_terms_t += (
float(pulp.value(_us) or 0.0)
* NEG_SELL_SOC_UNDERFILL_PENALTY_CZK_PER_WH
)
sv_t = safety_vars[t]
if sv_t is not None:
penalty_terms_t += float(pulp.value(sv_t) or 0.0) * safety_pen_czk_per_wh[t]
for _tt, _cv, _prev in commit_lp:
if _tt == t:
penalty_terms_t += float(pulp.value(_cv) or 0.0) * INTERVAL_H / 1000.0 * commit_pen
penalty_terms_t += float(pulp.value(ca[t]) or 0.0) * CURTAILMENT_PENALTY
green_bonus_czk_t = float(
getattr(slots[t], "green_bonus_czk_per_slot", 0.0) or 0.0
)
cost = cashflow_czk_t
results.append(DispatchResult(
interval_start = slots[t].interval_start,
battery_setpoint_w = batt_w,
battery_soc_target = soc_pct,
grid_setpoint_w = grid_w,
export_limit_w = export_limit_w,
export_mode = export_mode,
deye_physical_mode = deye_mode,
deye_gen_cutoff_enabled = deye_gen_cutoff,
ev1_setpoint_w = round(pulp.value(ev_direct[0][t]) + pulp.value(ev_via_bat[0][t]))
if slots[t].ev1_connected else None,
ev2_setpoint_w = round(pulp.value(ev_direct[1][t]) + pulp.value(ev_via_bat[1][t]))
if slots[t].ev2_connected else None,
ev1_via_bat_w = round(pulp.value(ev_via_bat[0][t])),
ev2_via_bat_w = round(pulp.value(ev_via_bat[1][t])),
heat_pump_enabled = hp_on,
heat_pump_setpoint_w = heat_pump.rated_heating_power_w if hp_on else 0,
pv_a_curtailed_w = round(pulp.value(ca[t])),
expected_cost_czk = round(cost, 4),
effective_buy_price = slots[t].buy_price,
effective_sell_price = slots[t].sell_price,
is_predicted_price = bool(slots[t].is_predicted_price),
cashflow_czk = round(cashflow_czk_t, 4),
battery_arbitrage_czk = round(battery_arbitrage_czk_t, 4),
penalty_czk = round(penalty_terms_t, 4),
green_bonus_czk = round(green_bonus_czk_t, 4),
))
sell_rank = sorted(range(T), key=lambda i: float(slots[i].sell_price), reverse=True)[: min(3, T)]
charge_commit_snapshot = [
{
"slot": slots[tt].interval_start.isoformat(),
"previous_charge_w": prev,
"shortfall_w": float(pulp.value(cv) or 0.0),
}
for tt, cv, prev in commit_lp
]
masks_snap: list[dict[str, Any]] = []
soc_bounds_snap: list[dict[str, Any]] = []
objective_terms_snap: list[dict[str, Any]] = []
for t in range(T):
st = slots[t]
masks_snap.append(
{
"slot": st.interval_start.isoformat(),
"allow_charge": bool(st.allow_charge),
"allow_discharge_export": bool(st.allow_discharge_export),
}
)
tgt_s = st.safety_soc_target_wh if daytime_en else None
# Export floor pro debug snapshot (kopie logiky z constraintů výše).
if soc_panel_min[t] < min_soc_wh - 1e-3:
export_floor_wh = float(soc_panel_min[t])
export_floor_reason = "deep_relax"
else:
export_floor_wh = float(arb_base_wh)
export_floor_reason = "arb_base"
if tgt_s is not None and not high_sell_slot[t]:
export_floor_wh = max(
export_floor_wh,
min(
float(battery.soc_max_wh),
max(min_soc_wh, float(tgt_s)),
),
)
export_floor_reason = "safety_export_floor"
soc_bounds_snap.append(
{
"slot": st.interval_start.isoformat(),
"soc_min_wh": float(soc_panel_min[t]),
"arb_floor_wh": float(arb_floor_series[t]),
"soc_panel_min_wh": float(soc_panel_min[t]),
"safety_soc_target_wh": float(tgt_s) if tgt_s is not None else None,
"export_soc_floor_wh": float(export_floor_wh),
"export_floor_reason": export_floor_reason,
"high_sell_slot": bool(high_sell_slot[t]),
}
)
fb = float(st.future_avoided_buy_czk_kwh or st.buy_price)
fs = float(st.future_sell_opportunity_czk_kwh or st.sell_price)
bv = max(fb, fs) - float(degradation_cost_effective)
bv = max(0.0, min(5.0, bv))
pen_wh = bv / 1000.0 if tgt_s is not None else 0.0
sv = safety_vars[t]
sdv = float(pulp.value(sv) or 0.0) if sv is not None else None
cshort = next((float(pulp.value(cv) or 0.0) for tt, cv, _p in commit_lp if tt == t), None)
objective_terms_snap.append(
{
"slot": st.interval_start.isoformat(),
"buy_price": float(st.buy_price),
"sell_price": float(st.sell_price),
"future_avoided_buy_czk_kwh": float(st.future_avoided_buy_czk_kwh or st.buy_price),
"future_sell_opportunity_czk_kwh": float(
st.future_sell_opportunity_czk_kwh or st.sell_price
),
"battery_value_czk_kwh": float(bv),
"safety_deficit_penalty_czk_per_wh": float(pen_wh) if safety_active[t] else 0.0,
"safety_penalty_active": bool(safety_active[t]),
"safety_deficit_wh": sdv,
"commitment_shortfall_w": cshort,
"commitment_penalty_czk_kwh": float(commit_pen) if cshort is not None else None,
"acquisition_used_czk_kwh": float(charge_acquisition_czk_kwh),
"grid_charge_suppressed_reason": getattr(
st, "grid_charge_suppressed_reason", None
),
"pv_charge_wh_ahead": float(
getattr(st, "pv_charge_wh_ahead", 0.0) or 0.0
),
"min_buy_before_cutoff_czk_kwh": (
float(st.min_buy_before_cutoff_czk_kwh)
if getattr(st, "min_buy_before_cutoff_czk_kwh", None) is not None
else None
),
}
)
night0 = slots[0]
solver_snapshot: dict[str, Any] = {
"version": 1,
"planner_build_tag": PLANNER_BUILD_TAG,
"inputs": {
"current_soc_wh": float(current_soc_wh),
"soc_headroom_applied_wh": soc_headroom_applied_wh,
"operating_mode": operating_mode,
"planner_version": planner_version_resolved,
"battery": {
"usable_capacity_wh": float(battery.usable_capacity_wh),
"min_soc_wh": float(battery.min_soc_wh),
"reserve_soc_wh": float(getattr(battery, "reserve_soc_wh", 0.0)),
"degradation_cost_czk_kwh": float(battery.degradation_cost_czk_kwh),
"planner_terminal_soc_value_factor": float(battery.planner_terminal_soc_value_factor),
"planner_daytime_charge_target_enabled": daytime_en,
"planner_charge_commitment_penalty_czk_kwh": float(commit_pen),
},
"load_first_enabled": om == "AUTO",
"relaxed_expensive_import": relaxed_expensive_import,
"charge_acquisition_buy_czk_kwh": charge_acquisition_czk_kwh,
"charge_acquisition_cutoff_at": (
slots[0].charge_acquisition_cutoff_at.isoformat()
if slots[0].charge_acquisition_cutoff_at is not None
else None
),
},
"masks": masks_snap,
"soc_bounds": soc_bounds_snap,
"objective_terms": objective_terms_snap,
"chosen_slots": {
"charge_commitment": charge_commit_snapshot,
"high_sell_windows": [slots[i].interval_start.isoformat() for i in sell_rank],
"night_window": {
"definition": "Europe/Prague 20:0006:00 projected baseload Wh (fn_load_planning_slots_full)",
"target_wh": night0.night_baseload_target_wh,
"buffer_wh": night0.night_baseload_buffer_wh,
},
},
}
return results, duration_ms, solver_snapshot
# ============================================================
# Denní plán (15:00)
# ============================================================
async def run_daily_plan(
site_id: int,
db,
triggered_by: str = "scheduler:daily",
*,
planner_version: str | None = None,
) -> tuple[int, int]:
"""
Hlavní denní plánování. Spouštět v 15:00 po importu cen (14:00)
a aktualizaci forecastu (14:30).
Horizont: `ems.fn_planning_horizon_end` (OTE, strop a práh v SQL).
"""
now = datetime.now(timezone.utc)
horizon_from = _current_slot_start(now)
horizon_to = await _planning_horizon_end(site_id, horizon_from, db)
if horizon_to is None:
horizon_to = horizon_from + timedelta(hours=_DAILY_FALLBACK_HORIZON_HOURS)
logger.warning(
"[site=%s] Daily plan: fn_planning_horizon_end NULL, fallback %.1fh",
site_id,
_DAILY_FALLBACK_HORIZON_HOURS,
)
logger.info(f"[site={site_id}] Daily plan: {horizon_from}{horizon_to}")
battery, hp, grid, vehicles, ev_sessions, soc_wh, tuv_temp, operating_mode, tuv_stats = (
await _load_site_context(site_id, db)
)
planner_version_resolved = _planner_engine_version(planner_version)
slots = await _load_slots(site_id, horizon_from, horizon_to, db, soc_wh=soc_wh)
om = operating_mode or "AUTO"
if om == "AUTO":
results, duration_ms, solver_snapshot = solve_dispatch_two_pass(
slots, battery, hp, grid, ev_sessions, vehicles, soc_wh, tuv_temp,
tuv_delta_stats=tuv_stats,
operating_mode=om,
planner_version=planner_version_resolved,
)
else:
results, duration_ms, solver_snapshot = solve_dispatch(
slots, battery, hp, grid, ev_sessions, vehicles, soc_wh, tuv_temp,
tuv_delta_stats=tuv_stats,
operating_mode=om,
planner_version=planner_version_resolved,
)
comparison_ctx = _maybe_add_planner_comparison(
slots=slots,
battery=battery,
heat_pump=hp,
grid=grid,
ev_sessions=ev_sessions,
vehicles=vehicles,
current_soc_wh=soc_wh,
current_tuv_temp_c=tuv_temp,
operating_mode=om,
tuv_delta_stats=tuv_stats,
active_version=planner_version_resolved,
)
if comparison_ctx is not None:
peer_results = comparison_ctx["peer_results"]
peer_ms = comparison_ctx["peer_ms"]
peer_snapshot = comparison_ctx["peer_snapshot"]
solver_snapshot["comparison"] = _dispatch_result_comparison(
results,
duration_ms,
planner_version_resolved,
peer_results,
peer_ms,
comparison_ctx["peer_version"],
)
slot_inputs = _build_slot_inputs(slots, slots)
run_id = await _save_planning_run(
site_id,
results,
horizon_from,
horizon_to,
run_type="daily",
triggered_by=triggered_by,
replan_from=None,
soc_wh=soc_wh,
duration_ms=duration_ms,
correction=1.0,
db=db,
slot_inputs=slot_inputs,
solver_snapshot=solver_snapshot,
)
if comparison_ctx is not None:
compare_snapshot = dict(peer_snapshot)
compare_snapshot["comparison_of_run_id"] = run_id
compare_snapshot["compare_peer_version"] = comparison_ctx["peer_version"]
await _save_planning_run(
site_id,
comparison_ctx["peer_results"],
horizon_from,
horizon_to,
run_type="daily",
triggered_by=f"{triggered_by}:compare",
replan_from=None,
soc_wh=soc_wh,
duration_ms=comparison_ctx["peer_ms"],
correction=1.0,
db=db,
slot_inputs=slot_inputs,
activate_run=False,
solver_snapshot=compare_snapshot,
)
logger.info(f"[site={site_id}] Daily plan done in {duration_ms} ms")
return run_id, duration_ms
# ============================================================
# Rolling replan (každých 15min)
# ============================================================
async def run_rolling_replan(
site_id: int,
db,
*,
triggered_by: str = "scheduler:rolling",
allow_skip: bool = True,
planner_version: str | None = None,
) -> tuple[Optional[int], Optional[int]]:
"""
Rolling replan každých 15 minut.
1. Zjistí aktuální SoC baterie z telemetrie
2. Spočítá korekční faktor FVE forecastu z poslední hodiny
3. Aplikuje korekci na forecast zbytku dne (s útlumem)
4. Spustí solver pro zbývající horizont aktivního plánu
5. Uloží jako nový planning_run (aktivní plán se stane superseded)
Pokud allow_skip=True (scheduler) a horizont je vyčerpaný → vrátí (None, None).
Pokud allow_skip=False (API) → spustí denní plán jako náhradu.
"""
now = datetime.now(timezone.utc)
replan_from = _current_slot_start(now)
planner_version_resolved = _planner_engine_version(planner_version)
ar_raw = await db.fetchval(
"select ems.fn_planning_active_run($1::int)",
site_id,
)
ar = ar_raw if isinstance(ar_raw, dict) else json.loads(ar_raw)
if ar.get("error") == "no_active_plan":
logger.warning(f"[site={site_id}] Rolling replan: no active plan, triggering daily plan")
return await run_daily_plan(
site_id,
db,
triggered_by=triggered_by,
planner_version=planner_version_resolved,
)
horizon_to = await _planning_horizon_end(site_id, replan_from, db)
if horizon_to is None:
if allow_skip:
logger.info(
"[site=%s] Rolling replan: fn_planning_horizon_end NULL (krátký OTE horizont), skipping",
site_id,
)
return None, None
logger.warning(
"[site=%s] Rolling replan: fn_planning_horizon_end NULL, running daily plan",
site_id,
)
return await run_daily_plan(
site_id,
db,
triggered_by=triggered_by,
planner_version=planner_version_resolved,
)
if (horizon_to - replan_from).total_seconds() < 1800:
if allow_skip:
logger.info(f"[site={site_id}] Rolling replan: horizon almost exhausted, skipping")
return None, None
logger.info(f"[site={site_id}] Rolling replan: horizon exhausted, running daily plan")
return await run_daily_plan(
site_id,
db,
triggered_by=triggered_by,
planner_version=planner_version_resolved,
)
logger.info(f"[site={site_id}] Rolling replan from {replan_from}{horizon_to}")
battery, hp, grid, vehicles, ev_sessions, soc_wh, tuv_temp, operating_mode, tuv_stats = (
await _load_site_context(site_id, db)
)
slots = await _load_slots(site_id, replan_from, horizon_to, db, soc_wh=soc_wh)
# PV forecast korekce je kanonicky v DB (delta + rolling faktor + decay) a do LP vstupuje přes
# ems.fn_load_planning_slots_full. Pro audit/debug ale chceme ukládat i RAW (bez korekcí).
correction_factor, correction_log = 1.0, {
"window_start": None,
"window_end": None,
"actual_pv_wh": None,
"forecast_pv_wh": None,
"correction_factor": None,
"reason": "canonical_db",
}
# RAW PV pro slot_inputs: přímý součet nejnovějších forecast_pv_interval per array/slot (bez delta/rolling).
raw_pv_rows = await db.fetchval(
"select ems.fn_forecast_pv_slots_range_raw_ab($1::int, $2::timestamptz, $3::timestamptz)",
site_id,
replan_from,
horizon_to,
)
raw_pv = raw_pv_rows if isinstance(raw_pv_rows, list) else json.loads(raw_pv_rows)
raw_by_ts: dict[str, tuple[int, int]] = {}
if isinstance(raw_pv, list):
for r in raw_pv:
if not isinstance(r, dict):
continue
ts = r.get("interval_start")
if isinstance(ts, str):
raw_by_ts[ts] = (
int(r.get("pv_a_forecast_raw_w") or 0),
int(r.get("pv_b_forecast_raw_w") or 0),
)
slots_raw_pv: list[PlanningSlot] = []
for s in slots:
key = s.interval_start.isoformat()
pva, pvb = raw_by_ts.get(key, (s.pv_a_forecast_w, s.pv_b_forecast_w))
slots_raw_pv.append(replace(s, pv_a_forecast_w=pva, pv_b_forecast_w=pvb))
commitment_prev = await _load_previous_plan_charge_commitment_prev_w(site_id, slots, db)
om = operating_mode or "AUTO"
if om == "AUTO":
results, duration_ms, solver_snapshot = solve_dispatch_two_pass(
slots, battery, hp, grid, ev_sessions, vehicles, soc_wh, tuv_temp,
tuv_delta_stats=tuv_stats,
operating_mode=om,
charge_commitment_prev_w=commitment_prev,
planner_version=planner_version_resolved,
)
else:
results, duration_ms, solver_snapshot = solve_dispatch(
slots, battery, hp, grid, ev_sessions, vehicles, soc_wh, tuv_temp,
tuv_delta_stats=tuv_stats,
operating_mode=om,
charge_commitment_prev_w=commitment_prev,
planner_version=planner_version_resolved,
)
comparison_ctx = _maybe_add_planner_comparison(
slots=slots,
battery=battery,
heat_pump=hp,
grid=grid,
ev_sessions=ev_sessions,
vehicles=vehicles,
current_soc_wh=soc_wh,
current_tuv_temp_c=tuv_temp,
operating_mode=om,
tuv_delta_stats=tuv_stats,
active_version=planner_version_resolved,
charge_commitment_prev_w=commitment_prev,
)
if comparison_ctx is not None:
peer_results = comparison_ctx["peer_results"]
peer_ms = comparison_ctx["peer_ms"]
solver_snapshot["comparison"] = _dispatch_result_comparison(
results,
duration_ms,
planner_version_resolved,
peer_results,
peer_ms,
comparison_ctx["peer_version"],
)
slot_inputs = _build_slot_inputs(slots_raw_pv, slots)
run_id = await _save_planning_run(
site_id,
results,
replan_from,
horizon_to,
run_type="rolling",
triggered_by=triggered_by,
replan_from=replan_from,
soc_wh=soc_wh,
duration_ms=duration_ms,
correction=correction_factor,
db=db,
slot_inputs=slot_inputs,
solver_snapshot=solver_snapshot,
)
if comparison_ctx is not None:
compare_snapshot = dict(comparison_ctx["peer_snapshot"])
compare_snapshot["comparison_of_run_id"] = run_id
compare_snapshot["compare_peer_version"] = comparison_ctx["peer_version"]
await _save_planning_run(
site_id,
comparison_ctx["peer_results"],
replan_from,
horizon_to,
run_type="rolling",
triggered_by=f"{triggered_by}:compare",
replan_from=replan_from,
soc_wh=soc_wh,
duration_ms=comparison_ctx["peer_ms"],
correction=correction_factor,
db=db,
slot_inputs=slot_inputs,
activate_run=False,
solver_snapshot=compare_snapshot,
)
# Historický log rolling korekce: dřív se psal z Pythonu. Nově se rolling faktor počítá v DB
# v kanonické PV řadě; log se případně přesune do DB (todo).
logger.info(f"[site={site_id}] Rolling replan done in {duration_ms} ms (pv=canonical_db)")
return run_id, duration_ms
async def run_plan_api(
site_id: int,
plan_type: str,
db,
*,
triggered_by: str = "api",
planner_version: str | None = None,
) -> tuple[int, int]:
"""Ruční / UI spuštění plánu. Vždy vrátí (run_id, solver_duration_ms)."""
pt = plan_type.lower().strip()
planner_version_resolved = _planner_engine_version(planner_version)
if pt == "daily":
return await run_daily_plan(
site_id,
db,
triggered_by=triggered_by,
planner_version=planner_version_resolved,
)
if pt == "rolling":
rid, ms = await run_rolling_replan(
site_id,
db,
triggered_by=triggered_by,
allow_skip=False,
planner_version=planner_version_resolved,
)
if rid is None or ms is None:
raise RuntimeError("Rolling replan did not return a run")
return rid, ms
raise ValueError(f"Unknown plan_type: {plan_type!r} (use daily or rolling)")
# ============================================================
# Pomocné funkce
# ============================================================
def _current_slot_start(dt: datetime) -> datetime:
"""Zaokrouhlí čas dolů na začátek aktuálního 15min slotu."""
minute = (dt.minute // 15) * 15
return dt.replace(minute=minute, second=0, microsecond=0)
def _parse_json_dt(val: object) -> Optional[datetime]:
if val is None:
return None
if isinstance(val, datetime):
return val if val.tzinfo else val.replace(tzinfo=timezone.utc)
return datetime.fromisoformat(str(val).replace("Z", "+00:00"))
def _ev_session_from_json(obj: object) -> Optional[SimpleNamespace]:
if obj is None or obj == []:
return None
if isinstance(obj, str):
obj = json.loads(obj)
if not isinstance(obj, dict):
return None
td = _parse_json_dt(obj.get("target_deadline"))
if td is None:
return None
return SimpleNamespace(
target_deadline=td,
energy_needed_wh=float(obj["energy_needed_wh"]),
)
async def _load_site_context(site_id: int, db):
"""
Načte baterii, TČ, síť, 2× vozidlo, otevřené EV session, SoC, TUV, režim a TUV statistiky (SQL).
"""
raw = await db.fetchval(
"select ems.fn_planning_site_context($1::int)",
site_id,
)
ctx = raw if isinstance(raw, dict) else json.loads(raw)
if ctx.get("error") == "unknown_site":
raise RuntimeError(f"Site not found: {site_id}")
b = ctx["battery"]
ec_i = int(b["max_charge_power_w"])
ed_i = int(b["max_discharge_power_w"])
planner_soc_max = float(b.get("planner_soc_max_wh", b["soc_max_wh"]))
floor_pct = b.get("planner_discharge_floor_percent")
buy_thr = b.get("planner_extreme_buy_threshold_czk_kwh")
relax_prewin = b.get("planner_discharge_relax_prewindow_slots")
battery = SimpleNamespace(
usable_capacity_wh=float(b["usable_capacity_wh"]),
min_soc_wh=float(b["min_soc_wh"]),
arb_floor_wh=float(b["arb_floor_wh"]),
reserve_soc_wh=float(b["reserve_soc_wh"]),
soc_max_wh=planner_soc_max,
charge_efficiency=float(b["charge_efficiency"]),
discharge_efficiency=float(b["discharge_efficiency"]),
degradation_cost_czk_kwh=float(b["degradation_cost_czk_kwh"]),
max_charge_power_w=ec_i,
max_discharge_power_w=ed_i,
charge_slot_buffer=float(b["charge_slot_buffer"])
if b.get("charge_slot_buffer") is not None
else 0,
discharge_slot_buffer=float(b["discharge_slot_buffer"])
if b.get("discharge_slot_buffer") is not None
else 0,
planner_extreme_buy_threshold_czk_kwh=float(buy_thr) if buy_thr is not None else -5.0,
planner_discharge_floor_percent=float(floor_pct) if floor_pct is not None else None,
planner_discharge_relax_prewindow_slots=int(relax_prewin)
if relax_prewin is not None
else DEFAULT_PLANNER_DISCHARGE_RELAX_PREWINDOW_SLOTS,
planner_terminal_soc_value_factor=float(b["planner_terminal_soc_value_factor"]),
planner_daytime_charge_target_enabled=bool(
b.get("planner_daytime_charge_target_enabled", True)
),
planner_night_baseload_buffer_percent=float(
b.get("planner_night_baseload_buffer_percent") or 20.0
),
planner_daytime_charge_price_quantile=float(
b.get("planner_daytime_charge_price_quantile") or 0.70
),
planner_charge_commitment_penalty_czk_kwh=float(
b.get("planner_charge_commitment_penalty_czk_kwh") or 0.20
),
)
hpj = ctx["heat_pump"]
heat_pump = SimpleNamespace(
rated_heating_power_w=int(hpj["rated_heating_power_w"]),
tuv_min_temp_c=float(hpj["tuv_min_temp_c"]),
tuv_target_temp_c=float(hpj["tuv_target_temp_c"]),
)
g = ctx["grid"]
m = ctx.get("market") or {}
grid = SimpleNamespace(
max_import_power_w=int(g["max_import_power_w"]),
max_export_power_w=int(g["max_export_power_w"]),
block_export_on_negative_sell=bool(g.get("block_export_on_negative_sell") or False),
deye_gen_microinverter_cutoff_enabled=bool(g.get("deye_gen_microinverter_cutoff_enabled") or False),
purchase_pricing_mode=str(m.get("purchase_pricing_mode") or "spot").strip().lower(),
sale_pricing_mode=str(m.get("sale_pricing_mode") or "spot").strip().lower(),
)
vehicles: list[SimpleNamespace] = []
for v in ctx.get("vehicles") or []:
vehicles.append(
SimpleNamespace(
max_charge_power_w=int(v["max_charge_power_w"]),
battery_capacity_kwh=float(v["battery_capacity_kwh"]),
default_target_soc_pct=float(v["default_target_soc_pct"]),
)
)
while len(vehicles) < 2:
vehicles.append(
SimpleNamespace(
max_charge_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
)
)
ev_raw = ctx.get("ev_sessions") or []
ev_sessions = [
_ev_session_from_json(ev_raw[0]) if len(ev_raw) > 0 else None,
_ev_session_from_json(ev_raw[1]) if len(ev_raw) > 1 else None,
]
soc_wh = float(ctx["soc_wh"])
tuv_temp = float(ctx["tuv_temp"])
operating_mode = ctx.get("operating_mode")
tuv_stats: dict[tuple[int, int], float] = {}
for row in ctx.get("tuv_delta_stats") or []:
tuv_stats[(int(row["dow"]), int(row["hour"]))] = float(row["delta"])
return (
battery,
heat_pump,
grid,
vehicles,
ev_sessions,
soc_wh,
tuv_temp,
operating_mode,
tuv_stats,
)
async def _load_previous_plan_charge_commitment_prev_w(
site_id: int,
slots: list[PlanningSlot],
db,
) -> list[Optional[float]]:
"""
Pro rolling replan: z aktivního plánu načte battery_setpoint_w pro shodné sloty.
Kotva měkkého commitmentu jen když předchozí plán chtěl nabíjet z PV přebytku (viz podmínky).
"""
if not slots:
return []
rows = await db.fetch(
"""
select pi.interval_start,
pi.battery_setpoint_w,
pi.grid_setpoint_w,
coalesce(pi.pv_a_forecast_solver_w, 0) as pva,
coalesce(pi.pv_b_forecast_solver_w, 0) as pvb,
coalesce(pi.load_baseline_w, 0) as lb
from ems.planning_interval pi
inner join ems.planning_run pr on pr.id = pi.run_id
where pr.site_id = $1::int
and pr.status = 'active'
""",
site_id,
)
by_start = {r["interval_start"]: r for r in rows}
out: list[Optional[float]] = []
for s in slots:
r = by_start.get(s.interval_start)
if r is None:
out.append(None)
continue
bw = int(r["battery_setpoint_w"] or 0)
gw = int(r["grid_setpoint_w"] or 0)
pva = int(r["pva"] or 0)
pvb = int(r["pvb"] or 0)
lb = int(r["lb"] or 0)
# Commitment má kotvit jen „nabíjení z PV přebytku“, ne situace kdy plán současně
# výrazně exportuje do sítě (typicky charge while exporting). To by stabilizovalo špatný cyklus.
if bw > 500 and (pva + pvb) > lb and gw <= 0 and gw >= -500:
out.append(float(bw))
else:
out.append(None)
return out
async def _load_slots(
site_id: int,
from_dt: datetime,
to_dt: datetime,
db,
*,
soc_wh: float,
) -> list[PlanningSlot]:
"""15min sloty z ems.fn_load_planning_slots_full."""
rows = await db.fetch(
"""
select slot_ord, interval_start, buy_price, sell_price, is_predicted_price,
pv_a_forecast_w, pv_b_forecast_w, load_baseline_w,
ev1_connected, ev2_connected, allow_charge, allow_discharge_export,
night_baseload_target_wh, night_baseload_buffer_wh, safety_soc_target_wh,
future_avoided_buy_czk_kwh, future_sell_opportunity_czk_kwh,
is_daytime_pv_surplus_slot,
charge_acquisition_buy_czk_kwh, charge_acquisition_cutoff_at,
min_buy_before_cutoff_czk_kwh, pv_charge_wh_ahead, neg_buy_wh_ahead,
grid_charge_suppressed_reason
from ems.fn_load_planning_slots_full(
$1::int, $2::timestamptz, $3::timestamptz, $4::numeric
)
""",
site_id,
from_dt,
to_dt,
soc_wh,
)
out: list[PlanningSlot] = []
for r in rows:
d = dict(r)
out.append(
PlanningSlot(
interval_start=d["interval_start"],
buy_price=float(d["buy_price"]),
sell_price=float(d["sell_price"]),
pv_a_forecast_w=int(d["pv_a_forecast_w"] or 0),
pv_b_forecast_w=int(d["pv_b_forecast_w"] or 0),
load_baseline_w=int(d["load_baseline_w"] or 0),
ev1_connected=bool(d["ev1_connected"]),
ev2_connected=bool(d["ev2_connected"]),
is_predicted_price=bool(d.get("is_predicted_price")),
allow_charge=bool(d.get("allow_charge", True)),
allow_discharge_export=bool(d.get("allow_discharge_export", True)),
night_baseload_target_wh=_slot_float_nullable(d, "night_baseload_target_wh"),
night_baseload_buffer_wh=_slot_float_nullable(d, "night_baseload_buffer_wh"),
safety_soc_target_wh=_slot_float_nullable(d, "safety_soc_target_wh"),
future_avoided_buy_czk_kwh=_slot_float_nullable(d, "future_avoided_buy_czk_kwh"),
future_sell_opportunity_czk_kwh=_slot_float_nullable(
d, "future_sell_opportunity_czk_kwh"
),
is_daytime_pv_surplus_slot=bool(d.get("is_daytime_pv_surplus_slot", False)),
charge_acquisition_buy_czk_kwh=_slot_float_nullable(
d, "charge_acquisition_buy_czk_kwh"
),
charge_acquisition_cutoff_at=d.get("charge_acquisition_cutoff_at"),
min_buy_before_cutoff_czk_kwh=_slot_float_nullable(
d, "min_buy_before_cutoff_czk_kwh"
),
pv_charge_wh_ahead=_slot_float_nullable(d, "pv_charge_wh_ahead"),
neg_buy_wh_ahead=_slot_float_nullable(d, "neg_buy_wh_ahead"),
grid_charge_suppressed_reason=d.get("grid_charge_suppressed_reason"),
)
)
if not out:
raise RuntimeError(
"No planning slots available check market prices and horizon settings"
)
if any(s.is_predicted_price for s in out):
logger.warning(
"[site=%s] Unexpected predicted-price slots in planning horizon",
site_id,
)
return out
def _build_slot_inputs(
slots_raw_pv: list[PlanningSlot],
slots_solver: list[PlanningSlot],
) -> list[tuple[int, int, int, int, int]]:
"""(load_baseline_w, pv_a_raw, pv_b_raw, pv_a_solver, pv_b_solver) pro každý slot."""
if len(slots_raw_pv) != len(slots_solver):
raise ValueError("slots_raw_pv and slots_solver length mismatch")
out: list[tuple[int, int, int, int, int]] = []
for raw, sol in zip(slots_raw_pv, slots_solver):
out.append(
(
int(raw.load_baseline_w),
int(raw.pv_a_forecast_w),
int(raw.pv_b_forecast_w),
int(sol.pv_a_forecast_w),
int(sol.pv_b_forecast_w),
)
)
return out
async def _save_planning_run(
site_id, results, horizon_from, horizon_to,
run_type, triggered_by, replan_from,
soc_wh, duration_ms, correction, db,
slot_inputs: Optional[list[tuple[int, int, int, int, int]]] = None,
*,
activate_run: bool = True,
solver_snapshot: Optional[dict[str, Any]] = None,
) -> int:
"""Uloží výsledky solveru přes ems.fn_planning_run_commit."""
if slot_inputs is not None and len(slot_inputs) != len(results):
raise ValueError("slot_inputs and results length mismatch")
run_meta: dict[str, Any] = {
"run_type": run_type,
"triggered_by": triggered_by,
"replan_from": replan_from.isoformat() if replan_from else None,
"soc_at_replan_wh": soc_wh,
"solver_duration_ms": duration_ms,
"forecast_correction_factor": correction,
}
if solver_snapshot is not None:
run_meta["solver_params"] = solver_snapshot
intervals: list[dict] = []
for i, r in enumerate(results):
row: dict = {
"interval_start": r.interval_start.isoformat()
if hasattr(r.interval_start, "isoformat")
else r.interval_start,
"battery_setpoint_w": r.battery_setpoint_w,
"battery_soc_target_pct": r.battery_soc_target,
"grid_setpoint_w": r.grid_setpoint_w,
"export_limit_w": r.export_limit_w,
"export_mode": r.export_mode,
"deye_physical_mode": r.deye_physical_mode,
"deye_gen_cutoff_enabled": r.deye_gen_cutoff_enabled,
"ev1_setpoint_w": r.ev1_setpoint_w,
"ev2_setpoint_w": r.ev2_setpoint_w,
"ev1_via_bat_w": r.ev1_via_bat_w,
"ev2_via_bat_w": r.ev2_via_bat_w,
"heat_pump_enabled": r.heat_pump_enabled,
"heat_pump_setpoint_w": r.heat_pump_setpoint_w,
"pv_a_curtailed_w": r.pv_a_curtailed_w,
"expected_cost_czk": float(r.expected_cost_czk),
"cashflow_czk": float(r.cashflow_czk),
"battery_arbitrage_czk": float(r.battery_arbitrage_czk),
"penalty_czk": float(r.penalty_czk),
"green_bonus_czk": float(r.green_bonus_czk),
"effective_buy_price": float(r.effective_buy_price),
"effective_sell_price": float(r.effective_sell_price),
"is_predicted_price": r.is_predicted_price,
}
if slot_inputs is not None:
si = slot_inputs[i]
row["load_baseline_w"] = si[0]
row["pv_a_forecast_raw_w"] = si[1]
row["pv_b_forecast_raw_w"] = si[2]
row["pv_a_forecast_solver_w"] = si[3]
row["pv_b_forecast_solver_w"] = si[4]
intervals.append(row)
return int(
await db.fetchval(
"""
select ems.fn_planning_run_commit(
$1::int, $2::timestamptz, $3::timestamptz,
$4::jsonb, $5::jsonb, $6::boolean
)
""",
site_id,
horizon_from,
horizon_to,
json.dumps(run_meta, default=str),
json.dumps(intervals, default=str),
activate_run,
)
)