From dcbb5de98c54fc58d416e435497757c7217ef580 Mon Sep 17 00:00:00 2001 From: Dusan Vojacek Date: Thu, 11 Jun 2026 12:39:55 +0200 Subject: [PATCH] =?UTF-8?q?F=C3=A1ze=201.3+1.4:=20extrakce=20forecast=20ko?= =?UTF-8?q?rekce=20a=20DB=20vrstvy=20pl=C3=A1nova=C4=8De?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - services/planning/forecast.py: compute_correction_factor, apply_forecast_correction - services/planning/db_io.py: _ev_session_from_json, _load_site_context, _load_previous_plan_charge_commitment_prev_w, _load_slots, _build_slot_inputs, _save_planning_run, _save_failed_planning_run - .claude/settings.json: projektový allowlist (autonomní běhy bez promptů) Fasáda beze změny chování; golden 5/5, baseline faily beze změny. planning_engine.py: 6345 → 5717 řádků. Co-Authored-By: Claude Opus 4.8 (1M context) --- .claude/settings.json | 63 ++++ backend/services/planning/db_io.py | 450 +++++++++++++++++++++++ backend/services/planning/forecast.py | 97 +++++ backend/services/planning_engine.py | 505 +------------------------- 4 files changed, 623 insertions(+), 492 deletions(-) create mode 100644 .claude/settings.json create mode 100644 backend/services/planning/db_io.py create mode 100644 backend/services/planning/forecast.py diff --git a/.claude/settings.json b/.claude/settings.json new file mode 100644 index 0000000..f8ea1b9 --- /dev/null +++ b/.claude/settings.json @@ -0,0 +1,63 @@ +{ + "permissions": { + "defaultMode": "acceptEdits", + "allow": [ + "Read", + "Edit", + "Write", + "Glob", + "Grep", + "mcp__postgres-ems__query", + "Skill(update-config)", + "Bash(claude mcp *)", + "Bash(python3 *)", + "Bash(python *)", + "Bash(pytest *)", + "Bash(EMS_DB_DSN=*)", + "Bash(GOLDEN_UPDATE=*)", + "Bash(git *)", + "Bash(ls *)", + "Bash(ls)", + "Bash(cat *)", + "Bash(cat)", + "Bash(grep *)", + "Bash(rg *)", + "Bash(find *)", + "Bash(sed *)", + "Bash(awk *)", + "Bash(head *)", + "Bash(tail *)", + "Bash(wc *)", + "Bash(sort *)", + "Bash(uniq *)", + "Bash(diff *)", + "Bash(du *)", + "Bash(mkdir *)", + "Bash(cp *)", + "Bash(mv *)", + "Bash(touch *)", + "Bash(echo *)", + "Bash(which *)", + "Bash(pwd)", + "Bash(cd *)", + "Bash(export *)", + "Bash(env *)", + "Bash(docker ps*)", + "Bash(docker logs *)", + "Bash(jq *)", + "Bash(curl http://localhost*)", + "Bash(curl http://127.0.0.1*)" + ], + "ask": [ + "Bash(git push*)", + "Bash(docker compose down*)", + "Bash(docker compose rm*)" + ], + "deny": [ + "Bash(rm -rf /*)", + "Bash(rm -rf ~*)", + "Bash(git reset --hard*)", + "Bash(git clean*)" + ] + } +} diff --git a/backend/services/planning/db_io.py b/backend/services/planning/db_io.py new file mode 100644 index 0000000..5d86f19 --- /dev/null +++ b/backend/services/planning/db_io.py @@ -0,0 +1,450 @@ +# backend/services/planning/db_io.py +# +# EMS plánovač – DB vrstva: načtení site contextu a slotů, uložení běhu +# (Fáze 1 dekompozice, čistý přesun z planning_engine.py). +# Jediné SQL: select ems.fn_* (SQL-first pravidlo CLAUDE.md). + +import json +import logging +from datetime import datetime +from types import SimpleNamespace +from typing import Any, Optional + +from services.planning.constants import ( + DEFAULT_PLANNER_DISCHARGE_RELAX_PREWINDOW_SLOTS, + PLANNER_BUILD_TAG, +) +from services.planning.types import ( + PlannerSolverError, + PlanningSlot, + _parse_json_dt, + _slot_float_nullable, +) + +logger = logging.getLogger(__name__) + + +def _ev_session_from_json(obj: object) -> Optional[SimpleNamespace]: + if obj is None or obj == []: + return None + if isinstance(obj, str): + obj = json.loads(obj) + if not isinstance(obj, dict): + return None + td = _parse_json_dt(obj.get("target_deadline")) + if td is None: + return None + return SimpleNamespace( + target_deadline=td, + energy_needed_wh=float(obj["energy_needed_wh"]), + ) + +async def _load_site_context(site_id: int, db): + """ + Načte baterii, TČ, síť, 2× vozidlo, otevřené EV session, SoC, TUV, režim a TUV statistiky (SQL). + """ + raw = await db.fetchval( + "select ems.fn_planning_site_context($1::int)", + site_id, + ) + ctx = raw if isinstance(raw, dict) else json.loads(raw) + if ctx.get("error") == "unknown_site": + raise RuntimeError(f"Site not found: {site_id}") + + b = ctx["battery"] + ec_i = int(b["max_charge_power_w"]) + ed_i = int(b["max_discharge_power_w"]) + planner_soc_max = float(b.get("planner_soc_max_wh", b["soc_max_wh"])) + floor_pct = b.get("planner_discharge_floor_percent") + buy_thr = b.get("planner_extreme_buy_threshold_czk_kwh") + relax_prewin = b.get("planner_discharge_relax_prewindow_slots") + battery = SimpleNamespace( + usable_capacity_wh=float(b["usable_capacity_wh"]), + min_soc_wh=float(b["min_soc_wh"]), + arb_floor_wh=float(b["arb_floor_wh"]), + reserve_soc_wh=float(b["reserve_soc_wh"]), + soc_max_wh=planner_soc_max, + charge_efficiency=float(b["charge_efficiency"]), + discharge_efficiency=float(b["discharge_efficiency"]), + degradation_cost_czk_kwh=float(b["degradation_cost_czk_kwh"]), + max_charge_power_w=ec_i, + max_discharge_power_w=ed_i, + charge_slot_buffer=float(b["charge_slot_buffer"]) + if b.get("charge_slot_buffer") is not None + else 0, + discharge_slot_buffer=float(b["discharge_slot_buffer"]) + if b.get("discharge_slot_buffer") is not None + else 0, + planner_extreme_buy_threshold_czk_kwh=float(buy_thr) if buy_thr is not None else -5.0, + planner_discharge_floor_percent=float(floor_pct) if floor_pct is not None else None, + planner_discharge_relax_prewindow_slots=int(relax_prewin) + if relax_prewin is not None + else DEFAULT_PLANNER_DISCHARGE_RELAX_PREWINDOW_SLOTS, + planner_terminal_soc_value_factor=float(b["planner_terminal_soc_value_factor"]), + planner_daytime_charge_target_enabled=bool( + b.get("planner_daytime_charge_target_enabled", True) + ), + planner_night_baseload_buffer_percent=float( + b.get("planner_night_baseload_buffer_percent") or 20.0 + ), + planner_daytime_charge_price_quantile=float( + b.get("planner_daytime_charge_price_quantile") or 0.70 + ), + planner_charge_commitment_penalty_czk_kwh=float( + b.get("planner_charge_commitment_penalty_czk_kwh") or 0.20 + ), + planner_neg_sell_prep_soc_percent=float( + b.get("planner_neg_sell_prep_soc_percent") or 80.0 + ), + planner_neg_sell_full_soc_tail_slots=int( + b.get("planner_neg_sell_full_soc_tail_slots") or 4 + ), + planner_neg_sell_vent_min_sell_czk_kwh=( + float(b["planner_neg_sell_vent_min_sell_czk_kwh"]) + if b.get("planner_neg_sell_vent_min_sell_czk_kwh") is not None + else None + ), + ) + + hpj = ctx["heat_pump"] + heat_pump = SimpleNamespace( + rated_heating_power_w=int(hpj["rated_heating_power_w"]), + tuv_min_temp_c=float(hpj["tuv_min_temp_c"]), + tuv_target_temp_c=float(hpj["tuv_target_temp_c"]), + ) + + g = ctx["grid"] + m = ctx.get("market") or {} + grid = SimpleNamespace( + max_import_power_w=int(g["max_import_power_w"]), + max_export_power_w=int(g["max_export_power_w"]), + block_export_on_negative_sell=bool(g.get("block_export_on_negative_sell") or False), + deye_gen_microinverter_cutoff_enabled=bool(g.get("deye_gen_microinverter_cutoff_enabled") or False), + purchase_pricing_mode=str(m.get("purchase_pricing_mode") or "spot").strip().lower(), + sale_pricing_mode=str(m.get("sale_pricing_mode") or "spot").strip().lower(), + ) + + vehicles: list[SimpleNamespace] = [] + for v in ctx.get("vehicles") or []: + vehicles.append( + SimpleNamespace( + max_charge_power_w=int(v["max_charge_power_w"]), + battery_capacity_kwh=float(v["battery_capacity_kwh"]), + default_target_soc_pct=float(v["default_target_soc_pct"]), + ) + ) + while len(vehicles) < 2: + vehicles.append( + SimpleNamespace( + max_charge_power_w=0, + battery_capacity_kwh=1.0, + default_target_soc_pct=80.0, + ) + ) + + ev_raw = ctx.get("ev_sessions") or [] + ev_sessions = [ + _ev_session_from_json(ev_raw[0]) if len(ev_raw) > 0 else None, + _ev_session_from_json(ev_raw[1]) if len(ev_raw) > 1 else None, + ] + + soc_wh = float(ctx["soc_wh"]) + tuv_temp = float(ctx["tuv_temp"]) + operating_mode = ctx.get("operating_mode") + + tuv_stats: dict[tuple[int, int], float] = {} + for row in ctx.get("tuv_delta_stats") or []: + tuv_stats[(int(row["dow"]), int(row["hour"]))] = float(row["delta"]) + + return ( + battery, + heat_pump, + grid, + vehicles, + ev_sessions, + soc_wh, + tuv_temp, + operating_mode, + tuv_stats, + ) + +async def _load_previous_plan_charge_commitment_prev_w( + site_id: int, + slots: list[PlanningSlot], + db, +) -> list[Optional[float]]: + """ + Pro rolling replan: z aktivního plánu načte battery_setpoint_w pro shodné sloty. + Kotva měkkého commitmentu jen když předchozí plán chtěl nabíjet z PV přebytku (viz podmínky). + """ + if not slots: + return [] + rows = await db.fetch( + """ + select pi.interval_start, + pi.battery_setpoint_w, + pi.grid_setpoint_w, + coalesce(pi.pv_a_forecast_solver_w, 0) as pva, + coalesce(pi.pv_b_forecast_solver_w, 0) as pvb, + coalesce(pi.load_baseline_w, 0) as lb + from ems.planning_interval pi + inner join ems.planning_run pr on pr.id = pi.run_id + where pr.site_id = $1::int + and pr.status = 'active' + """, + site_id, + ) + by_start = {r["interval_start"]: r for r in rows} + out: list[Optional[float]] = [] + for s in slots: + r = by_start.get(s.interval_start) + if r is None: + out.append(None) + continue + bw = int(r["battery_setpoint_w"] or 0) + gw = int(r["grid_setpoint_w"] or 0) + pva = int(r["pva"] or 0) + pvb = int(r["pvb"] or 0) + lb = int(r["lb"] or 0) + # Commitment má kotvit jen „nabíjení z PV přebytku“, ne situace kdy plán současně + # výrazně exportuje do sítě (typicky charge while exporting). To by stabilizovalo špatný cyklus. + if bw > 500 and (pva + pvb) > lb and gw <= 0 and gw >= -500: + out.append(float(bw)) + else: + out.append(None) + return out + +async def _load_slots( + site_id: int, + from_dt: datetime, + to_dt: datetime, + db, + *, + soc_wh: float, +) -> list[PlanningSlot]: + """15min sloty z ems.fn_load_planning_slots_full.""" + rows = await db.fetch( + """ + select slot_ord, interval_start, buy_price, sell_price, is_predicted_price, + pv_a_forecast_w, pv_b_forecast_w, load_baseline_w, + ev1_connected, ev2_connected, allow_charge, allow_discharge_export, + night_baseload_target_wh, night_baseload_buffer_wh, safety_soc_target_wh, + future_avoided_buy_czk_kwh, future_sell_opportunity_czk_kwh, + is_daytime_pv_surplus_slot, + charge_acquisition_buy_czk_kwh, charge_acquisition_cutoff_at, + min_buy_before_cutoff_czk_kwh, pv_charge_wh_ahead, neg_buy_wh_ahead, + grid_charge_suppressed_reason, + charge_target_wh, pre_window_wh, in_window_wh, + charge_slot_wh, charge_cum_wh, charge_layer, charge_slot_reason + from ems.fn_load_planning_slots_full( + $1::int, $2::timestamptz, $3::timestamptz, $4::numeric + ) + """, + site_id, + from_dt, + to_dt, + soc_wh, + ) + out: list[PlanningSlot] = [] + for r in rows: + d = dict(r) + out.append( + PlanningSlot( + interval_start=d["interval_start"], + buy_price=float(d["buy_price"]), + sell_price=float(d["sell_price"]), + pv_a_forecast_w=int(d["pv_a_forecast_w"] or 0), + pv_b_forecast_w=int(d["pv_b_forecast_w"] or 0), + load_baseline_w=int(d["load_baseline_w"] or 0), + ev1_connected=bool(d["ev1_connected"]), + ev2_connected=bool(d["ev2_connected"]), + is_predicted_price=bool(d.get("is_predicted_price")), + allow_charge=bool(d.get("allow_charge", True)), + allow_discharge_export=bool(d.get("allow_discharge_export", True)), + night_baseload_target_wh=_slot_float_nullable(d, "night_baseload_target_wh"), + night_baseload_buffer_wh=_slot_float_nullable(d, "night_baseload_buffer_wh"), + safety_soc_target_wh=_slot_float_nullable(d, "safety_soc_target_wh"), + future_avoided_buy_czk_kwh=_slot_float_nullable(d, "future_avoided_buy_czk_kwh"), + future_sell_opportunity_czk_kwh=_slot_float_nullable( + d, "future_sell_opportunity_czk_kwh" + ), + is_daytime_pv_surplus_slot=bool(d.get("is_daytime_pv_surplus_slot", False)), + charge_acquisition_buy_czk_kwh=_slot_float_nullable( + d, "charge_acquisition_buy_czk_kwh" + ), + charge_acquisition_cutoff_at=d.get("charge_acquisition_cutoff_at"), + min_buy_before_cutoff_czk_kwh=_slot_float_nullable( + d, "min_buy_before_cutoff_czk_kwh" + ), + pv_charge_wh_ahead=_slot_float_nullable(d, "pv_charge_wh_ahead"), + neg_buy_wh_ahead=_slot_float_nullable(d, "neg_buy_wh_ahead"), + grid_charge_suppressed_reason=d.get("grid_charge_suppressed_reason"), + charge_target_wh=_slot_float_nullable(d, "charge_target_wh"), + pre_window_wh=_slot_float_nullable(d, "pre_window_wh"), + in_window_wh=_slot_float_nullable(d, "in_window_wh"), + charge_slot_wh=_slot_float_nullable(d, "charge_slot_wh"), + charge_cum_wh=_slot_float_nullable(d, "charge_cum_wh"), + charge_layer=d.get("charge_layer"), + charge_slot_reason=d.get("charge_slot_reason"), + ) + ) + if not out: + raise RuntimeError( + "No planning slots available – check market prices and horizon settings" + ) + if any(s.is_predicted_price for s in out): + logger.warning( + "[site=%s] Unexpected predicted-price slots in planning horizon", + site_id, + ) + return out + +def _build_slot_inputs( + slots_raw_pv: list[PlanningSlot], + slots_solver: list[PlanningSlot], +) -> list[tuple[int, int, int, int, int]]: + """(load_baseline_w, pv_a_raw, pv_b_raw, pv_a_solver, pv_b_solver) pro každý slot.""" + if len(slots_raw_pv) != len(slots_solver): + raise ValueError("slots_raw_pv and slots_solver length mismatch") + out: list[tuple[int, int, int, int, int]] = [] + for raw, sol in zip(slots_raw_pv, slots_solver): + out.append( + ( + int(raw.load_baseline_w), + int(raw.pv_a_forecast_w), + int(raw.pv_b_forecast_w), + int(sol.pv_a_forecast_w), + int(sol.pv_b_forecast_w), + ) + ) + return out + +async def _save_planning_run( + site_id, results, horizon_from, horizon_to, + run_type, triggered_by, replan_from, + soc_wh, duration_ms, correction, db, + slot_inputs: Optional[list[tuple[int, int, int, int, int]]] = None, + *, + activate_run: bool = True, + solver_snapshot: Optional[dict[str, Any]] = None, +) -> int: + """Uloží výsledky solveru přes ems.fn_planning_run_commit.""" + if slot_inputs is not None and len(slot_inputs) != len(results): + raise ValueError("slot_inputs and results length mismatch") + run_meta: dict[str, Any] = { + "run_type": run_type, + "triggered_by": triggered_by, + "replan_from": replan_from.isoformat() if replan_from else None, + "soc_at_replan_wh": soc_wh, + "solver_duration_ms": duration_ms, + "forecast_correction_factor": correction, + } + if solver_snapshot is not None: + run_meta["solver_params"] = solver_snapshot + intervals: list[dict] = [] + for i, r in enumerate(results): + row: dict = { + "interval_start": r.interval_start.isoformat() + if hasattr(r.interval_start, "isoformat") + else r.interval_start, + "battery_setpoint_w": r.battery_setpoint_w, + "battery_soc_target_pct": r.battery_soc_target, + "grid_setpoint_w": r.grid_setpoint_w, + "export_limit_w": r.export_limit_w, + "export_mode": r.export_mode, + "deye_physical_mode": r.deye_physical_mode, + "deye_gen_cutoff_enabled": r.deye_gen_cutoff_enabled, + "ev1_setpoint_w": r.ev1_setpoint_w, + "ev2_setpoint_w": r.ev2_setpoint_w, + "ev1_via_bat_w": r.ev1_via_bat_w, + "ev2_via_bat_w": r.ev2_via_bat_w, + "heat_pump_enabled": r.heat_pump_enabled, + "heat_pump_setpoint_w": r.heat_pump_setpoint_w, + "pv_a_curtailed_w": r.pv_a_curtailed_w, + "expected_cost_czk": float(r.expected_cost_czk), + "cashflow_czk": float(r.cashflow_czk), + "battery_arbitrage_czk": float(r.battery_arbitrage_czk), + "penalty_czk": float(r.penalty_czk), + "green_bonus_czk": float(r.green_bonus_czk), + "effective_buy_price": float(r.effective_buy_price), + "effective_sell_price": float(r.effective_sell_price), + "is_predicted_price": r.is_predicted_price, + } + if slot_inputs is not None: + si = slot_inputs[i] + row["load_baseline_w"] = si[0] + row["pv_a_forecast_raw_w"] = si[1] + row["pv_b_forecast_raw_w"] = si[2] + row["pv_a_forecast_solver_w"] = si[3] + row["pv_b_forecast_solver_w"] = si[4] + intervals.append(row) + + return int( + await db.fetchval( + """ + select ems.fn_planning_run_commit( + $1::int, $2::timestamptz, $3::timestamptz, + $4::jsonb, $5::jsonb, $6::boolean + ) + """, + site_id, + horizon_from, + horizon_to, + json.dumps(run_meta, default=str), + json.dumps(intervals, default=str), + activate_run, + ) + ) + +async def _save_failed_planning_run( + site_id: int, + horizon_from: datetime, + horizon_to: datetime, + *, + run_type: str, + triggered_by: str, + replan_from: datetime | None, + soc_wh: float, + correction: float, + db, + error: PlannerSolverError, + slot_count: int | None = None, +) -> int: + """Uloží neúspěšný běh plánovače (status=failed); aktivní plán nemění.""" + run_meta: dict[str, Any] = { + "run_type": run_type, + "triggered_by": triggered_by, + "replan_from": replan_from.isoformat() if replan_from else None, + "soc_at_replan_wh": soc_wh, + "solver_duration_ms": 0, + "forecast_correction_factor": correction, + "error_text": str(error), + "solver_params": { + "status": "failed", + "planner_build_tag": PLANNER_BUILD_TAG, + "solver_status": error.solver_status, + "relax_chain": error.relax_chain, + "slot_count": slot_count, + }, + } + run_id = int( + await db.fetchval( + """ + select ems.fn_planning_run_fail( + $1::int, $2::timestamptz, $3::timestamptz, $4::jsonb + ) + """, + site_id, + horizon_from, + horizon_to, + json.dumps(run_meta, default=str), + ) + ) + logger.error( + "[site=%s] Planning solver failed run_id=%s: %s relax_chain=%s", + site_id, + run_id, + error, + error.relax_chain, + ) + return run_id diff --git a/backend/services/planning/forecast.py b/backend/services/planning/forecast.py new file mode 100644 index 0000000..f28a89e --- /dev/null +++ b/backend/services/planning/forecast.py @@ -0,0 +1,97 @@ +# backend/services/planning/forecast.py +# +# EMS plánovač – korekce FVE forecastu podle skutečné výroby +# (Fáze 1 dekompozice, čistý přesun z planning_engine.py). + +import json +import logging +from dataclasses import replace +from datetime import datetime, timedelta +from typing import Optional + +from services.planning.constants import ( + CORRECTION_DECAY_SLOTS, + CORRECTION_MAX_CLAMP, + CORRECTION_MIN_CLAMP, + CORRECTION_WINDOW_H, +) +from services.planning.types import PlanningSlot, _parse_json_dt + +logger = logging.getLogger(__name__) + + +async def compute_correction_factor( + site_id: int, + now: datetime, + db, + window_h: float = CORRECTION_WINDOW_H, +) -> tuple[float, dict]: + """ + Spočítá korekční faktor FVE forecastu z posledních window_h hodin. + + Vrátí (factor, log_data) kde factor je v rozsahu [CORRECTION_MIN_CLAMP, CORRECTION_MAX_CLAMP]. + factor = 1.0 pokud není dostatek dat nebo je rozdíl zanedbatelný. + """ + window_start = now - timedelta(hours=window_h) + raw = await db.fetchval( + """ + select ems.fn_pv_forecast_correction_factor( + $1::int, $2::timestamptz, $3::timestamptz, + $4::numeric, $5::numeric + ) + """, + site_id, + window_start, + now, + CORRECTION_MIN_CLAMP, + CORRECTION_MAX_CLAMP, + ) + j = raw if isinstance(raw, dict) else json.loads(raw) + factor = float(j.get("correction_factor", 1.0)) + # JSON z DB má často ISO řetězce; asyncpg u $2/$3 vyžaduje datetime + ws = _parse_json_dt(j.get("window_start")) or window_start + we = _parse_json_dt(j.get("window_end")) or now + log_data = { + "window_start": ws, + "window_end": we, + "actual_pv_wh": j.get("actual_pv_wh"), + "forecast_pv_wh": j.get("forecast_pv_wh"), + "correction_factor": factor, + "reason": j.get("reason", "ok"), + } + if j.get("raw_factor") is not None: + log_data["raw_factor"] = j["raw_factor"] + return factor, log_data + +def apply_forecast_correction( + slots: list[PlanningSlot], + now: datetime, + factor: float, + decay_slots: int = CORRECTION_DECAY_SLOTS, +) -> list[PlanningSlot]: + """ + Aplikuje korekční faktor na FVE forecast zbývajících slotů. + Korekce se lineárně utlumuje: na 1. slotu plná korekce, + na decay_slots-tém slotu žádná korekce. + + Příklad: factor=0.85, slot 0 → pv_a *= 0.85, slot 8 → pv_a *= 0.925, slot 16+ → žádná korekce + """ + corrected = [] + for i, slot in enumerate(slots): + if factor == 1.0 or i >= decay_slots: + corrected.append(slot) + continue + + # Lineární útlum: weight klesá od 1.0 (slot 0) do 0.0 (slot decay_slots) + weight = 1.0 - (i / decay_slots) + effective_factor = 1.0 + (factor - 1.0) * weight + + corrected.append( + replace( + slot, + pv_a_forecast_w=max(0, int(slot.pv_a_forecast_w * effective_factor)), + pv_b_forecast_w=max(0, int(slot.pv_b_forecast_w * effective_factor)), + ) + ) + + return corrected diff --git a/backend/services/planning_engine.py b/backend/services/planning_engine.py index f0cdb6c..f617415 100644 --- a/backend/services/planning_engine.py +++ b/backend/services/planning_engine.py @@ -37,6 +37,19 @@ from services.planning.types import ( _parse_json_dt, _current_slot_start, ) +from services.planning.forecast import ( + compute_correction_factor, + apply_forecast_correction, +) +from services.planning.db_io import ( + _ev_session_from_json, + _load_site_context, + _load_previous_plan_charge_commitment_prev_w, + _load_slots, + _build_slot_inputs, + _save_planning_run, + _save_failed_planning_run, +) from services.planning.constants import ( ACQUISITION_TWO_PASS_EPS_KWH, SOLVER_RELAX_STEPS, @@ -506,82 +519,8 @@ def _soc_panel_min_wh_series( # Korekce forecastu na základě skutečné výroby # ============================================================ -async def compute_correction_factor( - site_id: int, - now: datetime, - db, - window_h: float = CORRECTION_WINDOW_H, -) -> tuple[float, dict]: - """ - Spočítá korekční faktor FVE forecastu z posledních window_h hodin. - - Vrátí (factor, log_data) kde factor je v rozsahu [CORRECTION_MIN_CLAMP, CORRECTION_MAX_CLAMP]. - factor = 1.0 pokud není dostatek dat nebo je rozdíl zanedbatelný. - """ - window_start = now - timedelta(hours=window_h) - raw = await db.fetchval( - """ - select ems.fn_pv_forecast_correction_factor( - $1::int, $2::timestamptz, $3::timestamptz, - $4::numeric, $5::numeric - ) - """, - site_id, - window_start, - now, - CORRECTION_MIN_CLAMP, - CORRECTION_MAX_CLAMP, - ) - j = raw if isinstance(raw, dict) else json.loads(raw) - factor = float(j.get("correction_factor", 1.0)) - # JSON z DB má často ISO řetězce; asyncpg u $2/$3 vyžaduje datetime - ws = _parse_json_dt(j.get("window_start")) or window_start - we = _parse_json_dt(j.get("window_end")) or now - log_data = { - "window_start": ws, - "window_end": we, - "actual_pv_wh": j.get("actual_pv_wh"), - "forecast_pv_wh": j.get("forecast_pv_wh"), - "correction_factor": factor, - "reason": j.get("reason", "ok"), - } - if j.get("raw_factor") is not None: - log_data["raw_factor"] = j["raw_factor"] - return factor, log_data -def apply_forecast_correction( - slots: list[PlanningSlot], - now: datetime, - factor: float, - decay_slots: int = CORRECTION_DECAY_SLOTS, -) -> list[PlanningSlot]: - """ - Aplikuje korekční faktor na FVE forecast zbývajících slotů. - Korekce se lineárně utlumuje: na 1. slotu plná korekce, - na decay_slots-tém slotu žádná korekce. - - Příklad: factor=0.85, slot 0 → pv_a *= 0.85, slot 8 → pv_a *= 0.925, slot 16+ → žádná korekce - """ - corrected = [] - for i, slot in enumerate(slots): - if factor == 1.0 or i >= decay_slots: - corrected.append(slot) - continue - - # Lineární útlum: weight klesá od 1.0 (slot 0) do 0.0 (slot decay_slots) - weight = 1.0 - (i / decay_slots) - effective_factor = 1.0 + (factor - 1.0) * weight - - corrected.append( - replace( - slot, - pv_a_forecast_w=max(0, int(slot.pv_a_forecast_w * effective_factor)), - pv_b_forecast_w=max(0, int(slot.pv_b_forecast_w * effective_factor)), - ) - ) - - return corrected # ============================================================ @@ -5699,150 +5638,8 @@ async def run_plan_api( -def _ev_session_from_json(obj: object) -> Optional[SimpleNamespace]: - if obj is None or obj == []: - return None - if isinstance(obj, str): - obj = json.loads(obj) - if not isinstance(obj, dict): - return None - td = _parse_json_dt(obj.get("target_deadline")) - if td is None: - return None - return SimpleNamespace( - target_deadline=td, - energy_needed_wh=float(obj["energy_needed_wh"]), - ) -async def _load_site_context(site_id: int, db): - """ - Načte baterii, TČ, síť, 2× vozidlo, otevřené EV session, SoC, TUV, režim a TUV statistiky (SQL). - """ - raw = await db.fetchval( - "select ems.fn_planning_site_context($1::int)", - site_id, - ) - ctx = raw if isinstance(raw, dict) else json.loads(raw) - if ctx.get("error") == "unknown_site": - raise RuntimeError(f"Site not found: {site_id}") - - b = ctx["battery"] - ec_i = int(b["max_charge_power_w"]) - ed_i = int(b["max_discharge_power_w"]) - planner_soc_max = float(b.get("planner_soc_max_wh", b["soc_max_wh"])) - floor_pct = b.get("planner_discharge_floor_percent") - buy_thr = b.get("planner_extreme_buy_threshold_czk_kwh") - relax_prewin = b.get("planner_discharge_relax_prewindow_slots") - battery = SimpleNamespace( - usable_capacity_wh=float(b["usable_capacity_wh"]), - min_soc_wh=float(b["min_soc_wh"]), - arb_floor_wh=float(b["arb_floor_wh"]), - reserve_soc_wh=float(b["reserve_soc_wh"]), - soc_max_wh=planner_soc_max, - charge_efficiency=float(b["charge_efficiency"]), - discharge_efficiency=float(b["discharge_efficiency"]), - degradation_cost_czk_kwh=float(b["degradation_cost_czk_kwh"]), - max_charge_power_w=ec_i, - max_discharge_power_w=ed_i, - charge_slot_buffer=float(b["charge_slot_buffer"]) - if b.get("charge_slot_buffer") is not None - else 0, - discharge_slot_buffer=float(b["discharge_slot_buffer"]) - if b.get("discharge_slot_buffer") is not None - else 0, - planner_extreme_buy_threshold_czk_kwh=float(buy_thr) if buy_thr is not None else -5.0, - planner_discharge_floor_percent=float(floor_pct) if floor_pct is not None else None, - planner_discharge_relax_prewindow_slots=int(relax_prewin) - if relax_prewin is not None - else DEFAULT_PLANNER_DISCHARGE_RELAX_PREWINDOW_SLOTS, - planner_terminal_soc_value_factor=float(b["planner_terminal_soc_value_factor"]), - planner_daytime_charge_target_enabled=bool( - b.get("planner_daytime_charge_target_enabled", True) - ), - planner_night_baseload_buffer_percent=float( - b.get("planner_night_baseload_buffer_percent") or 20.0 - ), - planner_daytime_charge_price_quantile=float( - b.get("planner_daytime_charge_price_quantile") or 0.70 - ), - planner_charge_commitment_penalty_czk_kwh=float( - b.get("planner_charge_commitment_penalty_czk_kwh") or 0.20 - ), - planner_neg_sell_prep_soc_percent=float( - b.get("planner_neg_sell_prep_soc_percent") or 80.0 - ), - planner_neg_sell_full_soc_tail_slots=int( - b.get("planner_neg_sell_full_soc_tail_slots") or 4 - ), - planner_neg_sell_vent_min_sell_czk_kwh=( - float(b["planner_neg_sell_vent_min_sell_czk_kwh"]) - if b.get("planner_neg_sell_vent_min_sell_czk_kwh") is not None - else None - ), - ) - - hpj = ctx["heat_pump"] - heat_pump = SimpleNamespace( - rated_heating_power_w=int(hpj["rated_heating_power_w"]), - tuv_min_temp_c=float(hpj["tuv_min_temp_c"]), - tuv_target_temp_c=float(hpj["tuv_target_temp_c"]), - ) - - g = ctx["grid"] - m = ctx.get("market") or {} - grid = SimpleNamespace( - max_import_power_w=int(g["max_import_power_w"]), - max_export_power_w=int(g["max_export_power_w"]), - block_export_on_negative_sell=bool(g.get("block_export_on_negative_sell") or False), - deye_gen_microinverter_cutoff_enabled=bool(g.get("deye_gen_microinverter_cutoff_enabled") or False), - purchase_pricing_mode=str(m.get("purchase_pricing_mode") or "spot").strip().lower(), - sale_pricing_mode=str(m.get("sale_pricing_mode") or "spot").strip().lower(), - ) - - vehicles: list[SimpleNamespace] = [] - for v in ctx.get("vehicles") or []: - vehicles.append( - SimpleNamespace( - max_charge_power_w=int(v["max_charge_power_w"]), - battery_capacity_kwh=float(v["battery_capacity_kwh"]), - default_target_soc_pct=float(v["default_target_soc_pct"]), - ) - ) - while len(vehicles) < 2: - vehicles.append( - SimpleNamespace( - max_charge_power_w=0, - battery_capacity_kwh=1.0, - default_target_soc_pct=80.0, - ) - ) - - ev_raw = ctx.get("ev_sessions") or [] - ev_sessions = [ - _ev_session_from_json(ev_raw[0]) if len(ev_raw) > 0 else None, - _ev_session_from_json(ev_raw[1]) if len(ev_raw) > 1 else None, - ] - - soc_wh = float(ctx["soc_wh"]) - tuv_temp = float(ctx["tuv_temp"]) - operating_mode = ctx.get("operating_mode") - - tuv_stats: dict[tuple[int, int], float] = {} - for row in ctx.get("tuv_delta_stats") or []: - tuv_stats[(int(row["dow"]), int(row["hour"]))] = float(row["delta"]) - - return ( - battery, - heat_pump, - grid, - vehicles, - ev_sessions, - soc_wh, - tuv_temp, - operating_mode, - tuv_stats, - ) async def _rolling_evening_push_override( @@ -5910,287 +5707,11 @@ async def _rolling_evening_push_override( return prev_push -async def _load_previous_plan_charge_commitment_prev_w( - site_id: int, - slots: list[PlanningSlot], - db, -) -> list[Optional[float]]: - """ - Pro rolling replan: z aktivního plánu načte battery_setpoint_w pro shodné sloty. - Kotva měkkého commitmentu jen když předchozí plán chtěl nabíjet z PV přebytku (viz podmínky). - """ - if not slots: - return [] - rows = await db.fetch( - """ - select pi.interval_start, - pi.battery_setpoint_w, - pi.grid_setpoint_w, - coalesce(pi.pv_a_forecast_solver_w, 0) as pva, - coalesce(pi.pv_b_forecast_solver_w, 0) as pvb, - coalesce(pi.load_baseline_w, 0) as lb - from ems.planning_interval pi - inner join ems.planning_run pr on pr.id = pi.run_id - where pr.site_id = $1::int - and pr.status = 'active' - """, - site_id, - ) - by_start = {r["interval_start"]: r for r in rows} - out: list[Optional[float]] = [] - for s in slots: - r = by_start.get(s.interval_start) - if r is None: - out.append(None) - continue - bw = int(r["battery_setpoint_w"] or 0) - gw = int(r["grid_setpoint_w"] or 0) - pva = int(r["pva"] or 0) - pvb = int(r["pvb"] or 0) - lb = int(r["lb"] or 0) - # Commitment má kotvit jen „nabíjení z PV přebytku“, ne situace kdy plán současně - # výrazně exportuje do sítě (typicky charge while exporting). To by stabilizovalo špatný cyklus. - if bw > 500 and (pva + pvb) > lb and gw <= 0 and gw >= -500: - out.append(float(bw)) - else: - out.append(None) - return out -async def _load_slots( - site_id: int, - from_dt: datetime, - to_dt: datetime, - db, - *, - soc_wh: float, -) -> list[PlanningSlot]: - """15min sloty z ems.fn_load_planning_slots_full.""" - rows = await db.fetch( - """ - select slot_ord, interval_start, buy_price, sell_price, is_predicted_price, - pv_a_forecast_w, pv_b_forecast_w, load_baseline_w, - ev1_connected, ev2_connected, allow_charge, allow_discharge_export, - night_baseload_target_wh, night_baseload_buffer_wh, safety_soc_target_wh, - future_avoided_buy_czk_kwh, future_sell_opportunity_czk_kwh, - is_daytime_pv_surplus_slot, - charge_acquisition_buy_czk_kwh, charge_acquisition_cutoff_at, - min_buy_before_cutoff_czk_kwh, pv_charge_wh_ahead, neg_buy_wh_ahead, - grid_charge_suppressed_reason, - charge_target_wh, pre_window_wh, in_window_wh, - charge_slot_wh, charge_cum_wh, charge_layer, charge_slot_reason - from ems.fn_load_planning_slots_full( - $1::int, $2::timestamptz, $3::timestamptz, $4::numeric - ) - """, - site_id, - from_dt, - to_dt, - soc_wh, - ) - out: list[PlanningSlot] = [] - for r in rows: - d = dict(r) - out.append( - PlanningSlot( - interval_start=d["interval_start"], - buy_price=float(d["buy_price"]), - sell_price=float(d["sell_price"]), - pv_a_forecast_w=int(d["pv_a_forecast_w"] or 0), - pv_b_forecast_w=int(d["pv_b_forecast_w"] or 0), - load_baseline_w=int(d["load_baseline_w"] or 0), - ev1_connected=bool(d["ev1_connected"]), - ev2_connected=bool(d["ev2_connected"]), - is_predicted_price=bool(d.get("is_predicted_price")), - allow_charge=bool(d.get("allow_charge", True)), - allow_discharge_export=bool(d.get("allow_discharge_export", True)), - night_baseload_target_wh=_slot_float_nullable(d, "night_baseload_target_wh"), - night_baseload_buffer_wh=_slot_float_nullable(d, "night_baseload_buffer_wh"), - safety_soc_target_wh=_slot_float_nullable(d, "safety_soc_target_wh"), - future_avoided_buy_czk_kwh=_slot_float_nullable(d, "future_avoided_buy_czk_kwh"), - future_sell_opportunity_czk_kwh=_slot_float_nullable( - d, "future_sell_opportunity_czk_kwh" - ), - is_daytime_pv_surplus_slot=bool(d.get("is_daytime_pv_surplus_slot", False)), - charge_acquisition_buy_czk_kwh=_slot_float_nullable( - d, "charge_acquisition_buy_czk_kwh" - ), - charge_acquisition_cutoff_at=d.get("charge_acquisition_cutoff_at"), - min_buy_before_cutoff_czk_kwh=_slot_float_nullable( - d, "min_buy_before_cutoff_czk_kwh" - ), - pv_charge_wh_ahead=_slot_float_nullable(d, "pv_charge_wh_ahead"), - neg_buy_wh_ahead=_slot_float_nullable(d, "neg_buy_wh_ahead"), - grid_charge_suppressed_reason=d.get("grid_charge_suppressed_reason"), - charge_target_wh=_slot_float_nullable(d, "charge_target_wh"), - pre_window_wh=_slot_float_nullable(d, "pre_window_wh"), - in_window_wh=_slot_float_nullable(d, "in_window_wh"), - charge_slot_wh=_slot_float_nullable(d, "charge_slot_wh"), - charge_cum_wh=_slot_float_nullable(d, "charge_cum_wh"), - charge_layer=d.get("charge_layer"), - charge_slot_reason=d.get("charge_slot_reason"), - ) - ) - if not out: - raise RuntimeError( - "No planning slots available – check market prices and horizon settings" - ) - if any(s.is_predicted_price for s in out): - logger.warning( - "[site=%s] Unexpected predicted-price slots in planning horizon", - site_id, - ) - return out -def _build_slot_inputs( - slots_raw_pv: list[PlanningSlot], - slots_solver: list[PlanningSlot], -) -> list[tuple[int, int, int, int, int]]: - """(load_baseline_w, pv_a_raw, pv_b_raw, pv_a_solver, pv_b_solver) pro každý slot.""" - if len(slots_raw_pv) != len(slots_solver): - raise ValueError("slots_raw_pv and slots_solver length mismatch") - out: list[tuple[int, int, int, int, int]] = [] - for raw, sol in zip(slots_raw_pv, slots_solver): - out.append( - ( - int(raw.load_baseline_w), - int(raw.pv_a_forecast_w), - int(raw.pv_b_forecast_w), - int(sol.pv_a_forecast_w), - int(sol.pv_b_forecast_w), - ) - ) - return out -async def _save_planning_run( - site_id, results, horizon_from, horizon_to, - run_type, triggered_by, replan_from, - soc_wh, duration_ms, correction, db, - slot_inputs: Optional[list[tuple[int, int, int, int, int]]] = None, - *, - activate_run: bool = True, - solver_snapshot: Optional[dict[str, Any]] = None, -) -> int: - """Uloží výsledky solveru přes ems.fn_planning_run_commit.""" - if slot_inputs is not None and len(slot_inputs) != len(results): - raise ValueError("slot_inputs and results length mismatch") - run_meta: dict[str, Any] = { - "run_type": run_type, - "triggered_by": triggered_by, - "replan_from": replan_from.isoformat() if replan_from else None, - "soc_at_replan_wh": soc_wh, - "solver_duration_ms": duration_ms, - "forecast_correction_factor": correction, - } - if solver_snapshot is not None: - run_meta["solver_params"] = solver_snapshot - intervals: list[dict] = [] - for i, r in enumerate(results): - row: dict = { - "interval_start": r.interval_start.isoformat() - if hasattr(r.interval_start, "isoformat") - else r.interval_start, - "battery_setpoint_w": r.battery_setpoint_w, - "battery_soc_target_pct": r.battery_soc_target, - "grid_setpoint_w": r.grid_setpoint_w, - "export_limit_w": r.export_limit_w, - "export_mode": r.export_mode, - "deye_physical_mode": r.deye_physical_mode, - "deye_gen_cutoff_enabled": r.deye_gen_cutoff_enabled, - "ev1_setpoint_w": r.ev1_setpoint_w, - "ev2_setpoint_w": r.ev2_setpoint_w, - "ev1_via_bat_w": r.ev1_via_bat_w, - "ev2_via_bat_w": r.ev2_via_bat_w, - "heat_pump_enabled": r.heat_pump_enabled, - "heat_pump_setpoint_w": r.heat_pump_setpoint_w, - "pv_a_curtailed_w": r.pv_a_curtailed_w, - "expected_cost_czk": float(r.expected_cost_czk), - "cashflow_czk": float(r.cashflow_czk), - "battery_arbitrage_czk": float(r.battery_arbitrage_czk), - "penalty_czk": float(r.penalty_czk), - "green_bonus_czk": float(r.green_bonus_czk), - "effective_buy_price": float(r.effective_buy_price), - "effective_sell_price": float(r.effective_sell_price), - "is_predicted_price": r.is_predicted_price, - } - if slot_inputs is not None: - si = slot_inputs[i] - row["load_baseline_w"] = si[0] - row["pv_a_forecast_raw_w"] = si[1] - row["pv_b_forecast_raw_w"] = si[2] - row["pv_a_forecast_solver_w"] = si[3] - row["pv_b_forecast_solver_w"] = si[4] - intervals.append(row) - - return int( - await db.fetchval( - """ - select ems.fn_planning_run_commit( - $1::int, $2::timestamptz, $3::timestamptz, - $4::jsonb, $5::jsonb, $6::boolean - ) - """, - site_id, - horizon_from, - horizon_to, - json.dumps(run_meta, default=str), - json.dumps(intervals, default=str), - activate_run, - ) - ) -async def _save_failed_planning_run( - site_id: int, - horizon_from: datetime, - horizon_to: datetime, - *, - run_type: str, - triggered_by: str, - replan_from: datetime | None, - soc_wh: float, - correction: float, - db, - error: PlannerSolverError, - slot_count: int | None = None, -) -> int: - """Uloží neúspěšný běh plánovače (status=failed); aktivní plán nemění.""" - run_meta: dict[str, Any] = { - "run_type": run_type, - "triggered_by": triggered_by, - "replan_from": replan_from.isoformat() if replan_from else None, - "soc_at_replan_wh": soc_wh, - "solver_duration_ms": 0, - "forecast_correction_factor": correction, - "error_text": str(error), - "solver_params": { - "status": "failed", - "planner_build_tag": PLANNER_BUILD_TAG, - "solver_status": error.solver_status, - "relax_chain": error.relax_chain, - "slot_count": slot_count, - }, - } - run_id = int( - await db.fetchval( - """ - select ems.fn_planning_run_fail( - $1::int, $2::timestamptz, $3::timestamptz, $4::jsonb - ) - """, - site_id, - horizon_from, - horizon_to, - json.dumps(run_meta, default=str), - ) - ) - logger.error( - "[site=%s] Planning solver failed run_id=%s: %s relax_chain=%s", - site_id, - run_id, - error, - error.relax_chain, - ) - return run_id