Files
ems/backend/services/planning/db_io.py
Dusan Vojacek dcbb5de98c Fáze 1.3+1.4: extrakce forecast korekce a DB vrstvy plánovače
- services/planning/forecast.py: compute_correction_factor, apply_forecast_correction
- services/planning/db_io.py: _ev_session_from_json, _load_site_context,
  _load_previous_plan_charge_commitment_prev_w, _load_slots, _build_slot_inputs,
  _save_planning_run, _save_failed_planning_run
- .claude/settings.json: projektový allowlist (autonomní běhy bez promptů)

Fasáda beze změny chování; golden 5/5, baseline faily beze změny.
planning_engine.py: 6345 → 5717 řádků.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-11 12:39:55 +02:00

451 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# backend/services/planning/db_io.py
#
# EMS plánovač DB vrstva: načtení site contextu a slotů, uložení běhu
# (Fáze 1 dekompozice, čistý přesun z planning_engine.py).
# Jediné SQL: select ems.fn_* (SQL-first pravidlo CLAUDE.md).
import json
import logging
from datetime import datetime
from types import SimpleNamespace
from typing import Any, Optional
from services.planning.constants import (
DEFAULT_PLANNER_DISCHARGE_RELAX_PREWINDOW_SLOTS,
PLANNER_BUILD_TAG,
)
from services.planning.types import (
PlannerSolverError,
PlanningSlot,
_parse_json_dt,
_slot_float_nullable,
)
logger = logging.getLogger(__name__)
def _ev_session_from_json(obj: object) -> Optional[SimpleNamespace]:
if obj is None or obj == []:
return None
if isinstance(obj, str):
obj = json.loads(obj)
if not isinstance(obj, dict):
return None
td = _parse_json_dt(obj.get("target_deadline"))
if td is None:
return None
return SimpleNamespace(
target_deadline=td,
energy_needed_wh=float(obj["energy_needed_wh"]),
)
async def _load_site_context(site_id: int, db):
"""
Načte baterii, TČ, síť, 2× vozidlo, otevřené EV session, SoC, TUV, režim a TUV statistiky (SQL).
"""
raw = await db.fetchval(
"select ems.fn_planning_site_context($1::int)",
site_id,
)
ctx = raw if isinstance(raw, dict) else json.loads(raw)
if ctx.get("error") == "unknown_site":
raise RuntimeError(f"Site not found: {site_id}")
b = ctx["battery"]
ec_i = int(b["max_charge_power_w"])
ed_i = int(b["max_discharge_power_w"])
planner_soc_max = float(b.get("planner_soc_max_wh", b["soc_max_wh"]))
floor_pct = b.get("planner_discharge_floor_percent")
buy_thr = b.get("planner_extreme_buy_threshold_czk_kwh")
relax_prewin = b.get("planner_discharge_relax_prewindow_slots")
battery = SimpleNamespace(
usable_capacity_wh=float(b["usable_capacity_wh"]),
min_soc_wh=float(b["min_soc_wh"]),
arb_floor_wh=float(b["arb_floor_wh"]),
reserve_soc_wh=float(b["reserve_soc_wh"]),
soc_max_wh=planner_soc_max,
charge_efficiency=float(b["charge_efficiency"]),
discharge_efficiency=float(b["discharge_efficiency"]),
degradation_cost_czk_kwh=float(b["degradation_cost_czk_kwh"]),
max_charge_power_w=ec_i,
max_discharge_power_w=ed_i,
charge_slot_buffer=float(b["charge_slot_buffer"])
if b.get("charge_slot_buffer") is not None
else 0,
discharge_slot_buffer=float(b["discharge_slot_buffer"])
if b.get("discharge_slot_buffer") is not None
else 0,
planner_extreme_buy_threshold_czk_kwh=float(buy_thr) if buy_thr is not None else -5.0,
planner_discharge_floor_percent=float(floor_pct) if floor_pct is not None else None,
planner_discharge_relax_prewindow_slots=int(relax_prewin)
if relax_prewin is not None
else DEFAULT_PLANNER_DISCHARGE_RELAX_PREWINDOW_SLOTS,
planner_terminal_soc_value_factor=float(b["planner_terminal_soc_value_factor"]),
planner_daytime_charge_target_enabled=bool(
b.get("planner_daytime_charge_target_enabled", True)
),
planner_night_baseload_buffer_percent=float(
b.get("planner_night_baseload_buffer_percent") or 20.0
),
planner_daytime_charge_price_quantile=float(
b.get("planner_daytime_charge_price_quantile") or 0.70
),
planner_charge_commitment_penalty_czk_kwh=float(
b.get("planner_charge_commitment_penalty_czk_kwh") or 0.20
),
planner_neg_sell_prep_soc_percent=float(
b.get("planner_neg_sell_prep_soc_percent") or 80.0
),
planner_neg_sell_full_soc_tail_slots=int(
b.get("planner_neg_sell_full_soc_tail_slots") or 4
),
planner_neg_sell_vent_min_sell_czk_kwh=(
float(b["planner_neg_sell_vent_min_sell_czk_kwh"])
if b.get("planner_neg_sell_vent_min_sell_czk_kwh") is not None
else None
),
)
hpj = ctx["heat_pump"]
heat_pump = SimpleNamespace(
rated_heating_power_w=int(hpj["rated_heating_power_w"]),
tuv_min_temp_c=float(hpj["tuv_min_temp_c"]),
tuv_target_temp_c=float(hpj["tuv_target_temp_c"]),
)
g = ctx["grid"]
m = ctx.get("market") or {}
grid = SimpleNamespace(
max_import_power_w=int(g["max_import_power_w"]),
max_export_power_w=int(g["max_export_power_w"]),
block_export_on_negative_sell=bool(g.get("block_export_on_negative_sell") or False),
deye_gen_microinverter_cutoff_enabled=bool(g.get("deye_gen_microinverter_cutoff_enabled") or False),
purchase_pricing_mode=str(m.get("purchase_pricing_mode") or "spot").strip().lower(),
sale_pricing_mode=str(m.get("sale_pricing_mode") or "spot").strip().lower(),
)
vehicles: list[SimpleNamespace] = []
for v in ctx.get("vehicles") or []:
vehicles.append(
SimpleNamespace(
max_charge_power_w=int(v["max_charge_power_w"]),
battery_capacity_kwh=float(v["battery_capacity_kwh"]),
default_target_soc_pct=float(v["default_target_soc_pct"]),
)
)
while len(vehicles) < 2:
vehicles.append(
SimpleNamespace(
max_charge_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
)
)
ev_raw = ctx.get("ev_sessions") or []
ev_sessions = [
_ev_session_from_json(ev_raw[0]) if len(ev_raw) > 0 else None,
_ev_session_from_json(ev_raw[1]) if len(ev_raw) > 1 else None,
]
soc_wh = float(ctx["soc_wh"])
tuv_temp = float(ctx["tuv_temp"])
operating_mode = ctx.get("operating_mode")
tuv_stats: dict[tuple[int, int], float] = {}
for row in ctx.get("tuv_delta_stats") or []:
tuv_stats[(int(row["dow"]), int(row["hour"]))] = float(row["delta"])
return (
battery,
heat_pump,
grid,
vehicles,
ev_sessions,
soc_wh,
tuv_temp,
operating_mode,
tuv_stats,
)
async def _load_previous_plan_charge_commitment_prev_w(
site_id: int,
slots: list[PlanningSlot],
db,
) -> list[Optional[float]]:
"""
Pro rolling replan: z aktivního plánu načte battery_setpoint_w pro shodné sloty.
Kotva měkkého commitmentu jen když předchozí plán chtěl nabíjet z PV přebytku (viz podmínky).
"""
if not slots:
return []
rows = await db.fetch(
"""
select pi.interval_start,
pi.battery_setpoint_w,
pi.grid_setpoint_w,
coalesce(pi.pv_a_forecast_solver_w, 0) as pva,
coalesce(pi.pv_b_forecast_solver_w, 0) as pvb,
coalesce(pi.load_baseline_w, 0) as lb
from ems.planning_interval pi
inner join ems.planning_run pr on pr.id = pi.run_id
where pr.site_id = $1::int
and pr.status = 'active'
""",
site_id,
)
by_start = {r["interval_start"]: r for r in rows}
out: list[Optional[float]] = []
for s in slots:
r = by_start.get(s.interval_start)
if r is None:
out.append(None)
continue
bw = int(r["battery_setpoint_w"] or 0)
gw = int(r["grid_setpoint_w"] or 0)
pva = int(r["pva"] or 0)
pvb = int(r["pvb"] or 0)
lb = int(r["lb"] or 0)
# Commitment má kotvit jen „nabíjení z PV přebytku“, ne situace kdy plán současně
# výrazně exportuje do sítě (typicky charge while exporting). To by stabilizovalo špatný cyklus.
if bw > 500 and (pva + pvb) > lb and gw <= 0 and gw >= -500:
out.append(float(bw))
else:
out.append(None)
return out
async def _load_slots(
site_id: int,
from_dt: datetime,
to_dt: datetime,
db,
*,
soc_wh: float,
) -> list[PlanningSlot]:
"""15min sloty z ems.fn_load_planning_slots_full."""
rows = await db.fetch(
"""
select slot_ord, interval_start, buy_price, sell_price, is_predicted_price,
pv_a_forecast_w, pv_b_forecast_w, load_baseline_w,
ev1_connected, ev2_connected, allow_charge, allow_discharge_export,
night_baseload_target_wh, night_baseload_buffer_wh, safety_soc_target_wh,
future_avoided_buy_czk_kwh, future_sell_opportunity_czk_kwh,
is_daytime_pv_surplus_slot,
charge_acquisition_buy_czk_kwh, charge_acquisition_cutoff_at,
min_buy_before_cutoff_czk_kwh, pv_charge_wh_ahead, neg_buy_wh_ahead,
grid_charge_suppressed_reason,
charge_target_wh, pre_window_wh, in_window_wh,
charge_slot_wh, charge_cum_wh, charge_layer, charge_slot_reason
from ems.fn_load_planning_slots_full(
$1::int, $2::timestamptz, $3::timestamptz, $4::numeric
)
""",
site_id,
from_dt,
to_dt,
soc_wh,
)
out: list[PlanningSlot] = []
for r in rows:
d = dict(r)
out.append(
PlanningSlot(
interval_start=d["interval_start"],
buy_price=float(d["buy_price"]),
sell_price=float(d["sell_price"]),
pv_a_forecast_w=int(d["pv_a_forecast_w"] or 0),
pv_b_forecast_w=int(d["pv_b_forecast_w"] or 0),
load_baseline_w=int(d["load_baseline_w"] or 0),
ev1_connected=bool(d["ev1_connected"]),
ev2_connected=bool(d["ev2_connected"]),
is_predicted_price=bool(d.get("is_predicted_price")),
allow_charge=bool(d.get("allow_charge", True)),
allow_discharge_export=bool(d.get("allow_discharge_export", True)),
night_baseload_target_wh=_slot_float_nullable(d, "night_baseload_target_wh"),
night_baseload_buffer_wh=_slot_float_nullable(d, "night_baseload_buffer_wh"),
safety_soc_target_wh=_slot_float_nullable(d, "safety_soc_target_wh"),
future_avoided_buy_czk_kwh=_slot_float_nullable(d, "future_avoided_buy_czk_kwh"),
future_sell_opportunity_czk_kwh=_slot_float_nullable(
d, "future_sell_opportunity_czk_kwh"
),
is_daytime_pv_surplus_slot=bool(d.get("is_daytime_pv_surplus_slot", False)),
charge_acquisition_buy_czk_kwh=_slot_float_nullable(
d, "charge_acquisition_buy_czk_kwh"
),
charge_acquisition_cutoff_at=d.get("charge_acquisition_cutoff_at"),
min_buy_before_cutoff_czk_kwh=_slot_float_nullable(
d, "min_buy_before_cutoff_czk_kwh"
),
pv_charge_wh_ahead=_slot_float_nullable(d, "pv_charge_wh_ahead"),
neg_buy_wh_ahead=_slot_float_nullable(d, "neg_buy_wh_ahead"),
grid_charge_suppressed_reason=d.get("grid_charge_suppressed_reason"),
charge_target_wh=_slot_float_nullable(d, "charge_target_wh"),
pre_window_wh=_slot_float_nullable(d, "pre_window_wh"),
in_window_wh=_slot_float_nullable(d, "in_window_wh"),
charge_slot_wh=_slot_float_nullable(d, "charge_slot_wh"),
charge_cum_wh=_slot_float_nullable(d, "charge_cum_wh"),
charge_layer=d.get("charge_layer"),
charge_slot_reason=d.get("charge_slot_reason"),
)
)
if not out:
raise RuntimeError(
"No planning slots available check market prices and horizon settings"
)
if any(s.is_predicted_price for s in out):
logger.warning(
"[site=%s] Unexpected predicted-price slots in planning horizon",
site_id,
)
return out
def _build_slot_inputs(
slots_raw_pv: list[PlanningSlot],
slots_solver: list[PlanningSlot],
) -> list[tuple[int, int, int, int, int]]:
"""(load_baseline_w, pv_a_raw, pv_b_raw, pv_a_solver, pv_b_solver) pro každý slot."""
if len(slots_raw_pv) != len(slots_solver):
raise ValueError("slots_raw_pv and slots_solver length mismatch")
out: list[tuple[int, int, int, int, int]] = []
for raw, sol in zip(slots_raw_pv, slots_solver):
out.append(
(
int(raw.load_baseline_w),
int(raw.pv_a_forecast_w),
int(raw.pv_b_forecast_w),
int(sol.pv_a_forecast_w),
int(sol.pv_b_forecast_w),
)
)
return out
async def _save_planning_run(
site_id, results, horizon_from, horizon_to,
run_type, triggered_by, replan_from,
soc_wh, duration_ms, correction, db,
slot_inputs: Optional[list[tuple[int, int, int, int, int]]] = None,
*,
activate_run: bool = True,
solver_snapshot: Optional[dict[str, Any]] = None,
) -> int:
"""Uloží výsledky solveru přes ems.fn_planning_run_commit."""
if slot_inputs is not None and len(slot_inputs) != len(results):
raise ValueError("slot_inputs and results length mismatch")
run_meta: dict[str, Any] = {
"run_type": run_type,
"triggered_by": triggered_by,
"replan_from": replan_from.isoformat() if replan_from else None,
"soc_at_replan_wh": soc_wh,
"solver_duration_ms": duration_ms,
"forecast_correction_factor": correction,
}
if solver_snapshot is not None:
run_meta["solver_params"] = solver_snapshot
intervals: list[dict] = []
for i, r in enumerate(results):
row: dict = {
"interval_start": r.interval_start.isoformat()
if hasattr(r.interval_start, "isoformat")
else r.interval_start,
"battery_setpoint_w": r.battery_setpoint_w,
"battery_soc_target_pct": r.battery_soc_target,
"grid_setpoint_w": r.grid_setpoint_w,
"export_limit_w": r.export_limit_w,
"export_mode": r.export_mode,
"deye_physical_mode": r.deye_physical_mode,
"deye_gen_cutoff_enabled": r.deye_gen_cutoff_enabled,
"ev1_setpoint_w": r.ev1_setpoint_w,
"ev2_setpoint_w": r.ev2_setpoint_w,
"ev1_via_bat_w": r.ev1_via_bat_w,
"ev2_via_bat_w": r.ev2_via_bat_w,
"heat_pump_enabled": r.heat_pump_enabled,
"heat_pump_setpoint_w": r.heat_pump_setpoint_w,
"pv_a_curtailed_w": r.pv_a_curtailed_w,
"expected_cost_czk": float(r.expected_cost_czk),
"cashflow_czk": float(r.cashflow_czk),
"battery_arbitrage_czk": float(r.battery_arbitrage_czk),
"penalty_czk": float(r.penalty_czk),
"green_bonus_czk": float(r.green_bonus_czk),
"effective_buy_price": float(r.effective_buy_price),
"effective_sell_price": float(r.effective_sell_price),
"is_predicted_price": r.is_predicted_price,
}
if slot_inputs is not None:
si = slot_inputs[i]
row["load_baseline_w"] = si[0]
row["pv_a_forecast_raw_w"] = si[1]
row["pv_b_forecast_raw_w"] = si[2]
row["pv_a_forecast_solver_w"] = si[3]
row["pv_b_forecast_solver_w"] = si[4]
intervals.append(row)
return int(
await db.fetchval(
"""
select ems.fn_planning_run_commit(
$1::int, $2::timestamptz, $3::timestamptz,
$4::jsonb, $5::jsonb, $6::boolean
)
""",
site_id,
horizon_from,
horizon_to,
json.dumps(run_meta, default=str),
json.dumps(intervals, default=str),
activate_run,
)
)
async def _save_failed_planning_run(
site_id: int,
horizon_from: datetime,
horizon_to: datetime,
*,
run_type: str,
triggered_by: str,
replan_from: datetime | None,
soc_wh: float,
correction: float,
db,
error: PlannerSolverError,
slot_count: int | None = None,
) -> int:
"""Uloží neúspěšný běh plánovače (status=failed); aktivní plán nemění."""
run_meta: dict[str, Any] = {
"run_type": run_type,
"triggered_by": triggered_by,
"replan_from": replan_from.isoformat() if replan_from else None,
"soc_at_replan_wh": soc_wh,
"solver_duration_ms": 0,
"forecast_correction_factor": correction,
"error_text": str(error),
"solver_params": {
"status": "failed",
"planner_build_tag": PLANNER_BUILD_TAG,
"solver_status": error.solver_status,
"relax_chain": error.relax_chain,
"slot_count": slot_count,
},
}
run_id = int(
await db.fetchval(
"""
select ems.fn_planning_run_fail(
$1::int, $2::timestamptz, $3::timestamptz, $4::jsonb
)
""",
site_id,
horizon_from,
horizon_to,
json.dumps(run_meta, default=str),
)
)
logger.error(
"[site=%s] Planning solver failed run_id=%s: %s relax_chain=%s",
site_id,
run_id,
error,
error.relax_chain,
)
return run_id