Files
ems/backend/services/planning/db_io.py
Dusan Vojacek d81a150014 fix(planner): EV session viditelna i bez deadline / nad targetem (BUG2)
Zivy incident home-01: aktivni plan mel ev_sessions:0, ac session bezela
(target 70 %). Planovac neviděl ~6 kW zatez auta a spatne rozvrhl baterii
(zbytecny vecerni import).

Root cause (dve pasti):
- fn_planning_site_context vracela session jako null, kdyz needed_wh=0
  (auto nad targetem) i kdyz target_deadline is null.
- _ev_session_from_json (Python) zahazovala session bez deadline.

Fix:
- R__038 fn_ev_session_planning_json: session se vyradi (null) JEN bez tvrdych
  dat (kapacita vozidla / soc_at_connect). target_deadline smi byt NULL --
  solver hard deadline constraint aplikuje jen pri needed>0; oportunisticka
  vrstva bezi i bez deadline. Auto nad targetem zustava v planu jako znama
  zatez i s headroomem k levnemu doplneni. R__039 vola helper (deduplikace
  dvou inline poddotazu, SQL-first).
- _ev_session_from_json si NULL deadline ponecha (energy_needed_wh default 0).
- testy test_ev_session_parse.py; docs ev-charging + planning-changelog;
  CLAUDE.md funkce.

Navrh agresivnejsiho oportunistickeho algoritmu (P50 levnych oken z
market_price_stats misto konstanty 1 Kc/kWh) -- NEnasazeno, k rozhodnuti,
sepsano v docs/04-modules/planning.md (EV oportunismus); riziko regrese
golden ekonomiky, nutny EV fixture + eval.

Overeni: pytest -q 362 passed; golden replay gate 7 passed; solver_v2_eval
beze zmeny (fixtures bez EV session).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-13 22:03:27 +02:00

464 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# backend/services/planning/db_io.py
#
# EMS plánovač DB vrstva: načtení site contextu a slotů, uložení běhu
# (Fáze 1 dekompozice, čistý přesun z planning_engine.py).
# Jediné SQL: select ems.fn_* (SQL-first pravidlo CLAUDE.md).
import json
import logging
from datetime import datetime
from types import SimpleNamespace
from typing import Any, Optional
from services.planning.constants import (
DEFAULT_PLANNER_DISCHARGE_RELAX_PREWINDOW_SLOTS,
PLANNER_BUILD_TAG,
)
from services.planning.types import (
PlannerSolverError,
PlanningSlot,
_parse_json_dt,
_slot_float_nullable,
)
logger = logging.getLogger(__name__)
def _ev_session_from_json(obj: object) -> Optional[SimpleNamespace]:
if obj is None or obj == []:
return None
if isinstance(obj, str):
obj = json.loads(obj)
if not isinstance(obj, dict):
return None
# target_deadline SMÍ být None: oportunistická session (auto nad targetem,
# nebo bez nastaveného cíle) zůstává v plánu kvůli headroomu i jako známá
# zátěž. Tvrdý deadline constraint se aplikuje jen při energy_needed_wh > 0
# (a needed > 0 nastane jen s deadlinem). Dřív se taková session zahazovala
# (None) a plánovač pak neviděl zátěž auta — bug 2026-06-13.
td = _parse_json_dt(obj.get("target_deadline"))
return SimpleNamespace(
target_deadline=td,
energy_needed_wh=float(obj.get("energy_needed_wh") or 0.0),
headroom_wh=float(obj.get("headroom_wh") or 0.0),
opportunistic_value_czk_kwh=float(obj.get("opportunistic_value_czk_kwh") or 0.0),
)
async def _load_site_context(site_id: int, db):
"""
Načte baterii, TČ, síť, 2× vozidlo, otevřené EV session, SoC, TUV, režim a TUV statistiky (SQL).
"""
raw = await db.fetchval(
"select ems.fn_planning_site_context($1::int)",
site_id,
)
ctx = raw if isinstance(raw, dict) else json.loads(raw)
if ctx.get("error") == "unknown_site":
raise RuntimeError(f"Site not found: {site_id}")
b = ctx["battery"]
ec_i = int(b["max_charge_power_w"])
ed_i = int(b["max_discharge_power_w"])
planner_soc_max = float(b.get("planner_soc_max_wh", b["soc_max_wh"]))
floor_pct = b.get("planner_discharge_floor_percent")
buy_thr = b.get("planner_extreme_buy_threshold_czk_kwh")
relax_prewin = b.get("planner_discharge_relax_prewindow_slots")
battery = SimpleNamespace(
usable_capacity_wh=float(b["usable_capacity_wh"]),
min_soc_wh=float(b["min_soc_wh"]),
arb_floor_wh=float(b["arb_floor_wh"]),
reserve_soc_wh=float(b["reserve_soc_wh"]),
soc_max_wh=planner_soc_max,
charge_efficiency=float(b["charge_efficiency"]),
discharge_efficiency=float(b["discharge_efficiency"]),
degradation_cost_czk_kwh=float(b["degradation_cost_czk_kwh"]),
max_charge_power_w=ec_i,
max_discharge_power_w=ed_i,
charge_slot_buffer=float(b["charge_slot_buffer"])
if b.get("charge_slot_buffer") is not None
else 0,
discharge_slot_buffer=float(b["discharge_slot_buffer"])
if b.get("discharge_slot_buffer") is not None
else 0,
planner_extreme_buy_threshold_czk_kwh=float(buy_thr) if buy_thr is not None else -5.0,
planner_discharge_floor_percent=float(floor_pct) if floor_pct is not None else None,
planner_discharge_relax_prewindow_slots=int(relax_prewin)
if relax_prewin is not None
else DEFAULT_PLANNER_DISCHARGE_RELAX_PREWINDOW_SLOTS,
planner_terminal_soc_value_factor=float(b["planner_terminal_soc_value_factor"]),
planner_daytime_charge_target_enabled=bool(
b.get("planner_daytime_charge_target_enabled", True)
),
planner_night_baseload_buffer_percent=float(
b.get("planner_night_baseload_buffer_percent") or 20.0
),
planner_daytime_charge_price_quantile=float(
b.get("planner_daytime_charge_price_quantile") or 0.70
),
planner_charge_commitment_penalty_czk_kwh=float(
b.get("planner_charge_commitment_penalty_czk_kwh") or 0.20
),
planner_neg_sell_prep_soc_percent=float(
b.get("planner_neg_sell_prep_soc_percent") or 80.0
),
planner_neg_sell_full_soc_tail_slots=int(
b.get("planner_neg_sell_full_soc_tail_slots") or 4
),
planner_safety_soc_risk_factor=float(
b.get("planner_safety_soc_risk_factor") or 0.0
),
planner_pv_risk_frontload_czk_kwh=float(
b.get("planner_pv_risk_frontload_czk_kwh") or 0.0
),
planner_neg_sell_vent_min_sell_czk_kwh=(
float(b["planner_neg_sell_vent_min_sell_czk_kwh"])
if b.get("planner_neg_sell_vent_min_sell_czk_kwh") is not None
else None
),
)
hpj = ctx["heat_pump"]
heat_pump = SimpleNamespace(
rated_heating_power_w=int(hpj["rated_heating_power_w"]),
tuv_min_temp_c=float(hpj["tuv_min_temp_c"]),
tuv_target_temp_c=float(hpj["tuv_target_temp_c"]),
)
g = ctx["grid"]
m = ctx.get("market") or {}
grid = SimpleNamespace(
max_import_power_w=int(g["max_import_power_w"]),
max_export_power_w=int(g["max_export_power_w"]),
block_export_on_negative_sell=bool(g.get("block_export_on_negative_sell") or False),
deye_gen_microinverter_cutoff_enabled=bool(g.get("deye_gen_microinverter_cutoff_enabled") or False),
purchase_pricing_mode=str(m.get("purchase_pricing_mode") or "spot").strip().lower(),
sale_pricing_mode=str(m.get("sale_pricing_mode") or "spot").strip().lower(),
)
vehicles: list[SimpleNamespace] = []
for v in ctx.get("vehicles") or []:
vehicles.append(
SimpleNamespace(
max_charge_power_w=int(v["max_charge_power_w"]),
min_power_w=int(v.get("min_power_w") or 0),
battery_capacity_kwh=float(v["battery_capacity_kwh"]),
default_target_soc_pct=float(v["default_target_soc_pct"]),
)
)
while len(vehicles) < 2:
vehicles.append(
SimpleNamespace(
max_charge_power_w=0,
min_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
)
)
ev_raw = ctx.get("ev_sessions") or []
ev_sessions = [
_ev_session_from_json(ev_raw[0]) if len(ev_raw) > 0 else None,
_ev_session_from_json(ev_raw[1]) if len(ev_raw) > 1 else None,
]
soc_wh = float(ctx["soc_wh"])
tuv_temp = float(ctx["tuv_temp"])
operating_mode = ctx.get("operating_mode")
tuv_stats: dict[tuple[int, int], float] = {}
for row in ctx.get("tuv_delta_stats") or []:
tuv_stats[(int(row["dow"]), int(row["hour"]))] = float(row["delta"])
return (
battery,
heat_pump,
grid,
vehicles,
ev_sessions,
soc_wh,
tuv_temp,
operating_mode,
tuv_stats,
)
async def _load_previous_plan_charge_commitment_prev_w(
site_id: int,
slots: list[PlanningSlot],
db,
) -> list[Optional[float]]:
"""
Pro rolling replan: z aktivního plánu načte battery_setpoint_w pro shodné sloty.
Kotva měkkého commitmentu jen když předchozí plán chtěl nabíjet z PV přebytku (viz podmínky).
"""
if not slots:
return []
rows = await db.fetch(
"""
select pi.interval_start,
pi.battery_setpoint_w,
pi.grid_setpoint_w,
coalesce(pi.pv_a_forecast_solver_w, 0) as pva,
coalesce(pi.pv_b_forecast_solver_w, 0) as pvb,
coalesce(pi.load_baseline_w, 0) as lb
from ems.planning_interval pi
inner join ems.planning_run pr on pr.id = pi.run_id
where pr.site_id = $1::int
and pr.status = 'active'
""",
site_id,
)
by_start = {r["interval_start"]: r for r in rows}
out: list[Optional[float]] = []
for s in slots:
r = by_start.get(s.interval_start)
if r is None:
out.append(None)
continue
bw = int(r["battery_setpoint_w"] or 0)
gw = int(r["grid_setpoint_w"] or 0)
pva = int(r["pva"] or 0)
pvb = int(r["pvb"] or 0)
lb = int(r["lb"] or 0)
# Commitment má kotvit jen „nabíjení z PV přebytku“, ne situace kdy plán současně
# výrazně exportuje do sítě (typicky charge while exporting). To by stabilizovalo špatný cyklus.
if bw > 500 and (pva + pvb) > lb and gw <= 0 and gw >= -500:
out.append(float(bw))
else:
out.append(None)
return out
async def _load_slots(
site_id: int,
from_dt: datetime,
to_dt: datetime,
db,
*,
soc_wh: float,
) -> list[PlanningSlot]:
"""15min sloty z ems.fn_load_planning_slots_full."""
rows = await db.fetch(
"""
select slot_ord, interval_start, buy_price, sell_price, is_predicted_price,
pv_a_forecast_w, pv_b_forecast_w, load_baseline_w,
ev1_connected, ev2_connected, allow_charge, allow_discharge_export,
night_baseload_target_wh, night_baseload_buffer_wh, safety_soc_target_wh,
future_avoided_buy_czk_kwh, future_sell_opportunity_czk_kwh,
is_daytime_pv_surplus_slot,
charge_acquisition_buy_czk_kwh, charge_acquisition_cutoff_at,
min_buy_before_cutoff_czk_kwh, pv_charge_wh_ahead, neg_buy_wh_ahead,
grid_charge_suppressed_reason,
charge_target_wh, pre_window_wh, in_window_wh,
charge_slot_wh, charge_cum_wh, charge_layer, charge_slot_reason
from ems.fn_load_planning_slots_full(
$1::int, $2::timestamptz, $3::timestamptz, $4::numeric
)
""",
site_id,
from_dt,
to_dt,
soc_wh,
)
out: list[PlanningSlot] = []
for r in rows:
d = dict(r)
out.append(
PlanningSlot(
interval_start=d["interval_start"],
buy_price=float(d["buy_price"]),
sell_price=float(d["sell_price"]),
pv_a_forecast_w=int(d["pv_a_forecast_w"] or 0),
pv_b_forecast_w=int(d["pv_b_forecast_w"] or 0),
load_baseline_w=int(d["load_baseline_w"] or 0),
ev1_connected=bool(d["ev1_connected"]),
ev2_connected=bool(d["ev2_connected"]),
is_predicted_price=bool(d.get("is_predicted_price")),
allow_charge=bool(d.get("allow_charge", True)),
allow_discharge_export=bool(d.get("allow_discharge_export", True)),
night_baseload_target_wh=_slot_float_nullable(d, "night_baseload_target_wh"),
night_baseload_buffer_wh=_slot_float_nullable(d, "night_baseload_buffer_wh"),
safety_soc_target_wh=_slot_float_nullable(d, "safety_soc_target_wh"),
future_avoided_buy_czk_kwh=_slot_float_nullable(d, "future_avoided_buy_czk_kwh"),
future_sell_opportunity_czk_kwh=_slot_float_nullable(
d, "future_sell_opportunity_czk_kwh"
),
is_daytime_pv_surplus_slot=bool(d.get("is_daytime_pv_surplus_slot", False)),
charge_acquisition_buy_czk_kwh=_slot_float_nullable(
d, "charge_acquisition_buy_czk_kwh"
),
charge_acquisition_cutoff_at=d.get("charge_acquisition_cutoff_at"),
min_buy_before_cutoff_czk_kwh=_slot_float_nullable(
d, "min_buy_before_cutoff_czk_kwh"
),
pv_charge_wh_ahead=_slot_float_nullable(d, "pv_charge_wh_ahead"),
neg_buy_wh_ahead=_slot_float_nullable(d, "neg_buy_wh_ahead"),
grid_charge_suppressed_reason=d.get("grid_charge_suppressed_reason"),
charge_target_wh=_slot_float_nullable(d, "charge_target_wh"),
pre_window_wh=_slot_float_nullable(d, "pre_window_wh"),
in_window_wh=_slot_float_nullable(d, "in_window_wh"),
charge_slot_wh=_slot_float_nullable(d, "charge_slot_wh"),
charge_cum_wh=_slot_float_nullable(d, "charge_cum_wh"),
charge_layer=d.get("charge_layer"),
charge_slot_reason=d.get("charge_slot_reason"),
)
)
if not out:
raise RuntimeError(
"No planning slots available check market prices and horizon settings"
)
if any(s.is_predicted_price for s in out):
logger.warning(
"[site=%s] Unexpected predicted-price slots in planning horizon",
site_id,
)
return out
def _build_slot_inputs(
slots_raw_pv: list[PlanningSlot],
slots_solver: list[PlanningSlot],
) -> list[tuple[int, int, int, int, int]]:
"""(load_baseline_w, pv_a_raw, pv_b_raw, pv_a_solver, pv_b_solver) pro každý slot."""
if len(slots_raw_pv) != len(slots_solver):
raise ValueError("slots_raw_pv and slots_solver length mismatch")
out: list[tuple[int, int, int, int, int]] = []
for raw, sol in zip(slots_raw_pv, slots_solver):
out.append(
(
int(raw.load_baseline_w),
int(raw.pv_a_forecast_w),
int(raw.pv_b_forecast_w),
int(sol.pv_a_forecast_w),
int(sol.pv_b_forecast_w),
)
)
return out
async def _save_planning_run(
site_id, results, horizon_from, horizon_to,
run_type, triggered_by, replan_from,
soc_wh, duration_ms, correction, db,
slot_inputs: Optional[list[tuple[int, int, int, int, int]]] = None,
*,
activate_run: bool = True,
solver_snapshot: Optional[dict[str, Any]] = None,
) -> int:
"""Uloží výsledky solveru přes ems.fn_planning_run_commit."""
if slot_inputs is not None and len(slot_inputs) != len(results):
raise ValueError("slot_inputs and results length mismatch")
run_meta: dict[str, Any] = {
"run_type": run_type,
"triggered_by": triggered_by,
"replan_from": replan_from.isoformat() if replan_from else None,
"soc_at_replan_wh": soc_wh,
"solver_duration_ms": duration_ms,
"forecast_correction_factor": correction,
}
if solver_snapshot is not None:
run_meta["solver_params"] = solver_snapshot
intervals: list[dict] = []
for i, r in enumerate(results):
row: dict = {
"interval_start": r.interval_start.isoformat()
if hasattr(r.interval_start, "isoformat")
else r.interval_start,
"battery_setpoint_w": r.battery_setpoint_w,
"battery_soc_target_pct": r.battery_soc_target,
"grid_setpoint_w": r.grid_setpoint_w,
"export_limit_w": r.export_limit_w,
"export_mode": r.export_mode,
"deye_physical_mode": r.deye_physical_mode,
"deye_gen_cutoff_enabled": r.deye_gen_cutoff_enabled,
"ev1_setpoint_w": r.ev1_setpoint_w,
"ev2_setpoint_w": r.ev2_setpoint_w,
"ev1_via_bat_w": r.ev1_via_bat_w,
"ev2_via_bat_w": r.ev2_via_bat_w,
"heat_pump_enabled": r.heat_pump_enabled,
"heat_pump_setpoint_w": r.heat_pump_setpoint_w,
"pv_a_curtailed_w": r.pv_a_curtailed_w,
"expected_cost_czk": float(r.expected_cost_czk),
"cashflow_czk": float(r.cashflow_czk),
"battery_arbitrage_czk": float(r.battery_arbitrage_czk),
"penalty_czk": float(r.penalty_czk),
"green_bonus_czk": float(r.green_bonus_czk),
"effective_buy_price": float(r.effective_buy_price),
"effective_sell_price": float(r.effective_sell_price),
"is_predicted_price": r.is_predicted_price,
}
if slot_inputs is not None:
si = slot_inputs[i]
row["load_baseline_w"] = si[0]
row["pv_a_forecast_raw_w"] = si[1]
row["pv_b_forecast_raw_w"] = si[2]
row["pv_a_forecast_solver_w"] = si[3]
row["pv_b_forecast_solver_w"] = si[4]
intervals.append(row)
return int(
await db.fetchval(
"""
select ems.fn_planning_run_commit(
$1::int, $2::timestamptz, $3::timestamptz,
$4::jsonb, $5::jsonb, $6::boolean
)
""",
site_id,
horizon_from,
horizon_to,
json.dumps(run_meta, default=str),
json.dumps(intervals, default=str),
activate_run,
)
)
async def _save_failed_planning_run(
site_id: int,
horizon_from: datetime,
horizon_to: datetime,
*,
run_type: str,
triggered_by: str,
replan_from: datetime | None,
soc_wh: float,
correction: float,
db,
error: PlannerSolverError,
slot_count: int | None = None,
) -> int:
"""Uloží neúspěšný běh plánovače (status=failed); aktivní plán nemění."""
run_meta: dict[str, Any] = {
"run_type": run_type,
"triggered_by": triggered_by,
"replan_from": replan_from.isoformat() if replan_from else None,
"soc_at_replan_wh": soc_wh,
"solver_duration_ms": 0,
"forecast_correction_factor": correction,
"error_text": str(error),
"solver_params": {
"status": "failed",
"planner_build_tag": PLANNER_BUILD_TAG,
"solver_status": error.solver_status,
"relax_chain": error.relax_chain,
"slot_count": slot_count,
},
}
run_id = int(
await db.fetchval(
"""
select ems.fn_planning_run_fail(
$1::int, $2::timestamptz, $3::timestamptz, $4::jsonb
)
""",
site_id,
horizon_from,
horizon_to,
json.dumps(run_meta, default=str),
)
)
logger.error(
"[site=%s] Planning solver failed run_id=%s: %s relax_chain=%s",
site_id,
run_id,
error,
error.relax_chain,
)
return run_id