Files
ems/backend/services/planning/db_io.py
Dusan Vojacek 3b5f07b66e feat(planner): EV účtování v2 — headroom fix, deadline boundary, min. výkon WB, via-bat reporting
Hloubková diagnóza EV potvrdila: oportunitní ekonomika via-baterie je v LP
správně, ale okraje lhaly nebo byly nevykonatelné:

- V099 + R__039: ems.ev_session.opportunistic_value_czk_kwh (NULL = zdědit
  z asset_vehicle, 0 = vypnout pro session); headroom_wh z max(target_soc,
  soc_at_connect) — „nenabíjet" (nízký target) už paradoxně NEzvětšuje
  oportunistickou vrstvu; vehicles JSON nese min_power_w wallboxu.
- R__015: patch klíč opportunistic_value_czk_kwh (validace >= 0).
- solver_v2: (a) deadline suma range(t_dl) — slot začínající v deadline už
  nepatří „do deadline"; (b) Σ ev_direct <= gi + PV (fyzikální split);
  (c) binárka ev_on → setpoint ∈ {0} ∪ [min_power_w, max] (konec 400–900 W
  nevykonatelných setpointů); (d) bez session EV == 0 (stop-session i golden
  fixtures — žádné pumpování při buy<0); dekompozice total == needed − unmet
  + opp i pro needed = 0; (e) battery_arbitrage_czk = via_bat kWh × oportunitní
  cena (min sell exportního slotu téhož pražského dne, jinak terminal value)
  místo konstantní 0. Oportunismus PO deadline zůstává POVOLENÝ (rozhodnutí:
  auto často doma, odjezd řeší rolling replan).
- R__033: fn_plan_current_bundle.intervals + ev1/ev2_via_bat_w (UI nemá cenit
  EV kWh z baterie slotovým buy).

Golden gate beze změny snapshotů (v1 nedotčen, fixtures bez EV sessions);
solver_v2_eval před/po identický (CELKEM −1283.5 Kč, Δ −221.9 vs v1);
tests/test_solver_v2.py +7 testů; plná sada 310 passed / 4 xfailed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 19:31:56 +02:00

461 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# backend/services/planning/db_io.py
#
# EMS plánovač DB vrstva: načtení site contextu a slotů, uložení běhu
# (Fáze 1 dekompozice, čistý přesun z planning_engine.py).
# Jediné SQL: select ems.fn_* (SQL-first pravidlo CLAUDE.md).
import json
import logging
from datetime import datetime
from types import SimpleNamespace
from typing import Any, Optional
from services.planning.constants import (
DEFAULT_PLANNER_DISCHARGE_RELAX_PREWINDOW_SLOTS,
PLANNER_BUILD_TAG,
)
from services.planning.types import (
PlannerSolverError,
PlanningSlot,
_parse_json_dt,
_slot_float_nullable,
)
logger = logging.getLogger(__name__)
def _ev_session_from_json(obj: object) -> Optional[SimpleNamespace]:
if obj is None or obj == []:
return None
if isinstance(obj, str):
obj = json.loads(obj)
if not isinstance(obj, dict):
return None
td = _parse_json_dt(obj.get("target_deadline"))
if td is None:
return None
return SimpleNamespace(
target_deadline=td,
energy_needed_wh=float(obj["energy_needed_wh"]),
headroom_wh=float(obj.get("headroom_wh") or 0.0),
opportunistic_value_czk_kwh=float(obj.get("opportunistic_value_czk_kwh") or 0.0),
)
async def _load_site_context(site_id: int, db):
"""
Načte baterii, TČ, síť, 2× vozidlo, otevřené EV session, SoC, TUV, režim a TUV statistiky (SQL).
"""
raw = await db.fetchval(
"select ems.fn_planning_site_context($1::int)",
site_id,
)
ctx = raw if isinstance(raw, dict) else json.loads(raw)
if ctx.get("error") == "unknown_site":
raise RuntimeError(f"Site not found: {site_id}")
b = ctx["battery"]
ec_i = int(b["max_charge_power_w"])
ed_i = int(b["max_discharge_power_w"])
planner_soc_max = float(b.get("planner_soc_max_wh", b["soc_max_wh"]))
floor_pct = b.get("planner_discharge_floor_percent")
buy_thr = b.get("planner_extreme_buy_threshold_czk_kwh")
relax_prewin = b.get("planner_discharge_relax_prewindow_slots")
battery = SimpleNamespace(
usable_capacity_wh=float(b["usable_capacity_wh"]),
min_soc_wh=float(b["min_soc_wh"]),
arb_floor_wh=float(b["arb_floor_wh"]),
reserve_soc_wh=float(b["reserve_soc_wh"]),
soc_max_wh=planner_soc_max,
charge_efficiency=float(b["charge_efficiency"]),
discharge_efficiency=float(b["discharge_efficiency"]),
degradation_cost_czk_kwh=float(b["degradation_cost_czk_kwh"]),
max_charge_power_w=ec_i,
max_discharge_power_w=ed_i,
charge_slot_buffer=float(b["charge_slot_buffer"])
if b.get("charge_slot_buffer") is not None
else 0,
discharge_slot_buffer=float(b["discharge_slot_buffer"])
if b.get("discharge_slot_buffer") is not None
else 0,
planner_extreme_buy_threshold_czk_kwh=float(buy_thr) if buy_thr is not None else -5.0,
planner_discharge_floor_percent=float(floor_pct) if floor_pct is not None else None,
planner_discharge_relax_prewindow_slots=int(relax_prewin)
if relax_prewin is not None
else DEFAULT_PLANNER_DISCHARGE_RELAX_PREWINDOW_SLOTS,
planner_terminal_soc_value_factor=float(b["planner_terminal_soc_value_factor"]),
planner_daytime_charge_target_enabled=bool(
b.get("planner_daytime_charge_target_enabled", True)
),
planner_night_baseload_buffer_percent=float(
b.get("planner_night_baseload_buffer_percent") or 20.0
),
planner_daytime_charge_price_quantile=float(
b.get("planner_daytime_charge_price_quantile") or 0.70
),
planner_charge_commitment_penalty_czk_kwh=float(
b.get("planner_charge_commitment_penalty_czk_kwh") or 0.20
),
planner_neg_sell_prep_soc_percent=float(
b.get("planner_neg_sell_prep_soc_percent") or 80.0
),
planner_neg_sell_full_soc_tail_slots=int(
b.get("planner_neg_sell_full_soc_tail_slots") or 4
),
planner_safety_soc_risk_factor=float(
b.get("planner_safety_soc_risk_factor") or 0.0
),
planner_pv_risk_frontload_czk_kwh=float(
b.get("planner_pv_risk_frontload_czk_kwh") or 0.0
),
planner_neg_sell_vent_min_sell_czk_kwh=(
float(b["planner_neg_sell_vent_min_sell_czk_kwh"])
if b.get("planner_neg_sell_vent_min_sell_czk_kwh") is not None
else None
),
)
hpj = ctx["heat_pump"]
heat_pump = SimpleNamespace(
rated_heating_power_w=int(hpj["rated_heating_power_w"]),
tuv_min_temp_c=float(hpj["tuv_min_temp_c"]),
tuv_target_temp_c=float(hpj["tuv_target_temp_c"]),
)
g = ctx["grid"]
m = ctx.get("market") or {}
grid = SimpleNamespace(
max_import_power_w=int(g["max_import_power_w"]),
max_export_power_w=int(g["max_export_power_w"]),
block_export_on_negative_sell=bool(g.get("block_export_on_negative_sell") or False),
deye_gen_microinverter_cutoff_enabled=bool(g.get("deye_gen_microinverter_cutoff_enabled") or False),
purchase_pricing_mode=str(m.get("purchase_pricing_mode") or "spot").strip().lower(),
sale_pricing_mode=str(m.get("sale_pricing_mode") or "spot").strip().lower(),
)
vehicles: list[SimpleNamespace] = []
for v in ctx.get("vehicles") or []:
vehicles.append(
SimpleNamespace(
max_charge_power_w=int(v["max_charge_power_w"]),
min_power_w=int(v.get("min_power_w") or 0),
battery_capacity_kwh=float(v["battery_capacity_kwh"]),
default_target_soc_pct=float(v["default_target_soc_pct"]),
)
)
while len(vehicles) < 2:
vehicles.append(
SimpleNamespace(
max_charge_power_w=0,
min_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
)
)
ev_raw = ctx.get("ev_sessions") or []
ev_sessions = [
_ev_session_from_json(ev_raw[0]) if len(ev_raw) > 0 else None,
_ev_session_from_json(ev_raw[1]) if len(ev_raw) > 1 else None,
]
soc_wh = float(ctx["soc_wh"])
tuv_temp = float(ctx["tuv_temp"])
operating_mode = ctx.get("operating_mode")
tuv_stats: dict[tuple[int, int], float] = {}
for row in ctx.get("tuv_delta_stats") or []:
tuv_stats[(int(row["dow"]), int(row["hour"]))] = float(row["delta"])
return (
battery,
heat_pump,
grid,
vehicles,
ev_sessions,
soc_wh,
tuv_temp,
operating_mode,
tuv_stats,
)
async def _load_previous_plan_charge_commitment_prev_w(
site_id: int,
slots: list[PlanningSlot],
db,
) -> list[Optional[float]]:
"""
Pro rolling replan: z aktivního plánu načte battery_setpoint_w pro shodné sloty.
Kotva měkkého commitmentu jen když předchozí plán chtěl nabíjet z PV přebytku (viz podmínky).
"""
if not slots:
return []
rows = await db.fetch(
"""
select pi.interval_start,
pi.battery_setpoint_w,
pi.grid_setpoint_w,
coalesce(pi.pv_a_forecast_solver_w, 0) as pva,
coalesce(pi.pv_b_forecast_solver_w, 0) as pvb,
coalesce(pi.load_baseline_w, 0) as lb
from ems.planning_interval pi
inner join ems.planning_run pr on pr.id = pi.run_id
where pr.site_id = $1::int
and pr.status = 'active'
""",
site_id,
)
by_start = {r["interval_start"]: r for r in rows}
out: list[Optional[float]] = []
for s in slots:
r = by_start.get(s.interval_start)
if r is None:
out.append(None)
continue
bw = int(r["battery_setpoint_w"] or 0)
gw = int(r["grid_setpoint_w"] or 0)
pva = int(r["pva"] or 0)
pvb = int(r["pvb"] or 0)
lb = int(r["lb"] or 0)
# Commitment má kotvit jen „nabíjení z PV přebytku“, ne situace kdy plán současně
# výrazně exportuje do sítě (typicky charge while exporting). To by stabilizovalo špatný cyklus.
if bw > 500 and (pva + pvb) > lb and gw <= 0 and gw >= -500:
out.append(float(bw))
else:
out.append(None)
return out
async def _load_slots(
    site_id: int,
    from_dt: datetime,
    to_dt: datetime,
    db,
    *,
    soc_wh: float,
) -> list[PlanningSlot]:
    """Load planner slots from ``ems.fn_load_planning_slots_full``.

    Each returned row becomes one ``PlanningSlot``.  Prices, PV forecasts,
    load baseline and EV-connected flags are read as required columns; the
    remaining columns are optional and passed through as nullable floats or
    raw values.

    Args:
        site_id: Site identifier.
        from_dt: Horizon start passed to the SQL function.
        to_dt: Horizon end passed to the SQL function.
        db: Connection-like object exposing an awaitable ``fetch``.
        soc_wh: Current state of charge passed to the SQL function.

    Returns:
        Slots in the order returned by the query.

    Raises:
        RuntimeError: The query returned no rows.
    """
    query = (
        "\n"
        "select slot_ord, interval_start, buy_price, sell_price, is_predicted_price,\n"
        "pv_a_forecast_w, pv_b_forecast_w, load_baseline_w,\n"
        "ev1_connected, ev2_connected, allow_charge, allow_discharge_export,\n"
        "night_baseload_target_wh, night_baseload_buffer_wh, safety_soc_target_wh,\n"
        "future_avoided_buy_czk_kwh, future_sell_opportunity_czk_kwh,\n"
        "is_daytime_pv_surplus_slot,\n"
        "charge_acquisition_buy_czk_kwh, charge_acquisition_cutoff_at,\n"
        "min_buy_before_cutoff_czk_kwh, pv_charge_wh_ahead, neg_buy_wh_ahead,\n"
        "grid_charge_suppressed_reason,\n"
        "charge_target_wh, pre_window_wh, in_window_wh,\n"
        "charge_slot_wh, charge_cum_wh, charge_layer, charge_slot_reason\n"
        "from ems.fn_load_planning_slots_full(\n"
        "$1::int, $2::timestamptz, $3::timestamptz, $4::numeric\n"
        ")\n"
    )
    records = await db.fetch(query, site_id, from_dt, to_dt, soc_wh)

    # Columns converted through _slot_float_nullable.
    nullable_float_columns = (
        "night_baseload_target_wh",
        "night_baseload_buffer_wh",
        "safety_soc_target_wh",
        "future_avoided_buy_czk_kwh",
        "future_sell_opportunity_czk_kwh",
        "charge_acquisition_buy_czk_kwh",
        "min_buy_before_cutoff_czk_kwh",
        "pv_charge_wh_ahead",
        "neg_buy_wh_ahead",
        "charge_target_wh",
        "pre_window_wh",
        "in_window_wh",
        "charge_slot_wh",
        "charge_cum_wh",
    )
    # Columns handed to PlanningSlot without conversion.
    passthrough_columns = (
        "charge_acquisition_cutoff_at",
        "grid_charge_suppressed_reason",
        "charge_layer",
        "charge_slot_reason",
    )

    slots: list[PlanningSlot] = []
    for record in records:
        d = dict(record)
        # Required columns are converted first so a bad row fails on them
        # before any optional column is looked at.
        fields: dict[str, Any] = {
            "interval_start": d["interval_start"],
            "buy_price": float(d["buy_price"]),
            "sell_price": float(d["sell_price"]),
            "pv_a_forecast_w": int(d["pv_a_forecast_w"] or 0),
            "pv_b_forecast_w": int(d["pv_b_forecast_w"] or 0),
            "load_baseline_w": int(d["load_baseline_w"] or 0),
            "ev1_connected": bool(d["ev1_connected"]),
            "ev2_connected": bool(d["ev2_connected"]),
            "is_predicted_price": bool(d.get("is_predicted_price")),
            "allow_charge": bool(d.get("allow_charge", True)),
            "allow_discharge_export": bool(d.get("allow_discharge_export", True)),
            "is_daytime_pv_surplus_slot": bool(d.get("is_daytime_pv_surplus_slot", False)),
        }
        fields.update(
            (column, _slot_float_nullable(d, column))
            for column in nullable_float_columns
        )
        fields.update((column, d.get(column)) for column in passthrough_columns)
        slots.append(PlanningSlot(**fields))

    if not slots:
        raise RuntimeError(
            "No planning slots available check market prices and horizon settings"
        )

    # Predicted prices are not expected in the planning horizon; warn but continue.
    if any(slot.is_predicted_price for slot in slots):
        logger.warning(
            "[site=%s] Unexpected predicted-price slots in planning horizon",
            site_id,
        )
    return slots
def _build_slot_inputs(
slots_raw_pv: list[PlanningSlot],
slots_solver: list[PlanningSlot],
) -> list[tuple[int, int, int, int, int]]:
"""(load_baseline_w, pv_a_raw, pv_b_raw, pv_a_solver, pv_b_solver) pro každý slot."""
if len(slots_raw_pv) != len(slots_solver):
raise ValueError("slots_raw_pv and slots_solver length mismatch")
out: list[tuple[int, int, int, int, int]] = []
for raw, sol in zip(slots_raw_pv, slots_solver):
out.append(
(
int(raw.load_baseline_w),
int(raw.pv_a_forecast_w),
int(raw.pv_b_forecast_w),
int(sol.pv_a_forecast_w),
int(sol.pv_b_forecast_w),
)
)
return out
async def _save_planning_run(
site_id, results, horizon_from, horizon_to,
run_type, triggered_by, replan_from,
soc_wh, duration_ms, correction, db,
slot_inputs: Optional[list[tuple[int, int, int, int, int]]] = None,
*,
activate_run: bool = True,
solver_snapshot: Optional[dict[str, Any]] = None,
) -> int:
"""Uloží výsledky solveru přes ems.fn_planning_run_commit."""
if slot_inputs is not None and len(slot_inputs) != len(results):
raise ValueError("slot_inputs and results length mismatch")
run_meta: dict[str, Any] = {
"run_type": run_type,
"triggered_by": triggered_by,
"replan_from": replan_from.isoformat() if replan_from else None,
"soc_at_replan_wh": soc_wh,
"solver_duration_ms": duration_ms,
"forecast_correction_factor": correction,
}
if solver_snapshot is not None:
run_meta["solver_params"] = solver_snapshot
intervals: list[dict] = []
for i, r in enumerate(results):
row: dict = {
"interval_start": r.interval_start.isoformat()
if hasattr(r.interval_start, "isoformat")
else r.interval_start,
"battery_setpoint_w": r.battery_setpoint_w,
"battery_soc_target_pct": r.battery_soc_target,
"grid_setpoint_w": r.grid_setpoint_w,
"export_limit_w": r.export_limit_w,
"export_mode": r.export_mode,
"deye_physical_mode": r.deye_physical_mode,
"deye_gen_cutoff_enabled": r.deye_gen_cutoff_enabled,
"ev1_setpoint_w": r.ev1_setpoint_w,
"ev2_setpoint_w": r.ev2_setpoint_w,
"ev1_via_bat_w": r.ev1_via_bat_w,
"ev2_via_bat_w": r.ev2_via_bat_w,
"heat_pump_enabled": r.heat_pump_enabled,
"heat_pump_setpoint_w": r.heat_pump_setpoint_w,
"pv_a_curtailed_w": r.pv_a_curtailed_w,
"expected_cost_czk": float(r.expected_cost_czk),
"cashflow_czk": float(r.cashflow_czk),
"battery_arbitrage_czk": float(r.battery_arbitrage_czk),
"penalty_czk": float(r.penalty_czk),
"green_bonus_czk": float(r.green_bonus_czk),
"effective_buy_price": float(r.effective_buy_price),
"effective_sell_price": float(r.effective_sell_price),
"is_predicted_price": r.is_predicted_price,
}
if slot_inputs is not None:
si = slot_inputs[i]
row["load_baseline_w"] = si[0]
row["pv_a_forecast_raw_w"] = si[1]
row["pv_b_forecast_raw_w"] = si[2]
row["pv_a_forecast_solver_w"] = si[3]
row["pv_b_forecast_solver_w"] = si[4]
intervals.append(row)
return int(
await db.fetchval(
"""
select ems.fn_planning_run_commit(
$1::int, $2::timestamptz, $3::timestamptz,
$4::jsonb, $5::jsonb, $6::boolean
)
""",
site_id,
horizon_from,
horizon_to,
json.dumps(run_meta, default=str),
json.dumps(intervals, default=str),
activate_run,
)
)
async def _save_failed_planning_run(
    site_id: int,
    horizon_from: datetime,
    horizon_to: datetime,
    *,
    run_type: str,
    triggered_by: str,
    replan_from: datetime | None,
    soc_wh: float,
    correction: float,
    db,
    error: PlannerSolverError,
    slot_count: int | None = None,
) -> int:
    """Record a failed planner run through ``ems.fn_planning_run_fail``.

    The run metadata carries the error text and a ``solver_params`` section
    marked ``status: failed`` with the build tag, solver status, relax chain
    and slot count.  The failure is also logged at error level.

    NOTE(review): the original docstring states the active plan is left
    unchanged; that is handled inside the SQL function and is not visible
    here.

    Returns:
        The run id returned by the SQL function, converted to ``int``.
    """
    run_meta: dict[str, Any] = {
        "run_type": run_type,
        "triggered_by": triggered_by,
        "replan_from": replan_from.isoformat() if replan_from else None,
        "soc_at_replan_wh": soc_wh,
        # No solver duration is recorded for a failed run.
        "solver_duration_ms": 0,
        "forecast_correction_factor": correction,
        "error_text": str(error),
    }
    run_meta["solver_params"] = {
        "status": "failed",
        "planner_build_tag": PLANNER_BUILD_TAG,
        "solver_status": error.solver_status,
        "relax_chain": error.relax_chain,
        "slot_count": slot_count,
    }

    query = (
        "\n"
        "select ems.fn_planning_run_fail(\n"
        "$1::int, $2::timestamptz, $3::timestamptz, $4::jsonb\n"
        ")\n"
    )
    stored_id = await db.fetchval(
        query,
        site_id,
        horizon_from,
        horizon_to,
        json.dumps(run_meta, default=str),
    )
    run_id = int(stored_id)

    logger.error(
        "[site=%s] Planning solver failed run_id=%s: %s relax_chain=%s",
        site_id,
        run_id,
        error,
        error.relax_chain,
    )
    return run_id