Files
ems/backend/app/routers/plan.py
Dusan Vojacek d984716f69
Some checks failed
CI and deploy / migration-check (push) Failing after 12s
CI and deploy / deploy (push) Has been skipped
speedup zalozka planning
2026-05-21 10:37:32 +02:00

270 lines
9.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""REST API aktivní plán a ruční přepočet."""
import json
import logging
from datetime import datetime, timezone
from typing import Annotated, Any, Literal
import asyncpg
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, ConfigDict, Field
from app.db_json import fetch_json
from app.deps import get_pg_pool
from services.control_exporter import export_setpoints
from services.planning_engine import run_plan_api
router = APIRouter(prefix="/sites/{site_id}/plan", tags=["plan"])
logger = logging.getLogger(__name__)
class RunPlanResponse(BaseModel):
run_id: int
solver_duration_ms: int
horizon_start: datetime
horizon_end: datetime
class PlanningIntervalDto(BaseModel):
"""Řádek `ems.planning_interval` v odpovědi aktivního plánu."""
model_config = ConfigDict(extra="allow")
interval_start: str
is_predicted_price: bool = Field(
default=False,
description=(
"True pokud solver pro slot použil predikovanou cenu (market_price_stats), "
"nikoli přesný řádek z vw_site_effective_price / OTE."
),
)
class PlanningBundleDto(BaseModel):
run: dict[str, Any]
intervals: list[PlanningIntervalDto]
summary: dict[str, Any]
class CurrentPlanResponseModel(PlanningBundleDto):
pass
class ComparisonSlotDiffDto(BaseModel):
interval_start: str
active: dict[str, Any]
comparison: dict[str, Any]
class PlanningCompareResponseModel(BaseModel):
active: PlanningBundleDto
comparison: PlanningBundleDto
diff: dict[str, Any]
slot_diffs: list[ComparisonSlotDiffDto]
def _bundle_from_payload(payload: dict[str, Any], *, run_key: str) -> PlanningBundleDto:
run_raw = payload.get(run_key) or {}
if not isinstance(run_raw, dict):
run_raw = {}
intervals_raw = payload.get("intervals") or []
if not isinstance(intervals_raw, list):
intervals_raw = []
intervals = [PlanningIntervalDto.model_validate(d) for d in intervals_raw if isinstance(d, dict)]
summary = payload.get("summary") or {}
if not isinstance(summary, dict):
summary = {}
return PlanningBundleDto(run=run_raw, intervals=intervals, summary=summary)
def _bundle_from_current(payload: dict[str, Any]) -> PlanningBundleDto:
return _bundle_from_payload(payload, run_key="run")
def _bundle_from_debug(payload: dict[str, Any]) -> PlanningBundleDto:
return _bundle_from_payload(payload, run_key="planning_run")
def _build_plan_diff(
active: PlanningBundleDto,
comparison: PlanningBundleDto,
) -> tuple[dict[str, Any], list[ComparisonSlotDiffDto]]:
active_by_ts = {i.interval_start: i for i in active.intervals}
compare_by_ts = {i.interval_start: i for i in comparison.intervals}
diffs: list[ComparisonSlotDiffDto] = []
interesting_keys = (
"battery_setpoint_w",
"battery_soc_target_pct",
"grid_setpoint_w",
"export_limit_w",
"export_mode",
"deye_physical_mode",
"deye_gen_cutoff_enabled",
"pv_a_curtailed_w",
"expected_cost_czk",
)
for ts, a in active_by_ts.items():
b = compare_by_ts.get(ts)
if b is None:
continue
active_payload = a.model_dump()
comparison_payload = b.model_dump()
if any(active_payload.get(k) != comparison_payload.get(k) for k in interesting_keys):
diffs.append(
ComparisonSlotDiffDto(
interval_start=ts,
active={k: active_payload.get(k) for k in interesting_keys},
comparison={k: comparison_payload.get(k) for k in interesting_keys},
)
)
def _summary_num(bundle: PlanningBundleDto, key: str) -> float:
raw = bundle.summary.get(key)
try:
return float(raw) if raw is not None else 0.0
except (TypeError, ValueError):
return 0.0
active_cost = _summary_num(active, "total_expected_cost_czk")
compare_cost = _summary_num(comparison, "total_expected_cost_czk")
diff = {
"active_total_expected_cost_czk": active_cost,
"comparison_total_expected_cost_czk": compare_cost,
"total_expected_cost_czk": round(active_cost - compare_cost, 4),
"absolute_total_expected_cost_czk": round(abs(active_cost - compare_cost), 4),
"active_charge_slots": int(_summary_num(active, "charge_slots")),
"comparison_charge_slots": int(_summary_num(comparison, "charge_slots")),
"active_discharge_slots": int(_summary_num(active, "discharge_slots")),
"comparison_discharge_slots": int(_summary_num(comparison, "discharge_slots")),
"active_export_slots": int(_summary_num(active, "export_slots")),
"comparison_export_slots": int(_summary_num(comparison, "export_slots")),
"changed_slots": len(diffs),
}
return diff, diffs
@router.get("/current", response_model=CurrentPlanResponseModel)
async def get_current_plan(
site_id: int,
pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> CurrentPlanResponseModel:
async with pool.acquire() as conn:
site_ok = await conn.fetchval(
"SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id
)
if not site_ok:
raise HTTPException(status_code=404, detail="Site not found")
bundle = await fetch_json(
conn,
"select ems.fn_plan_current_bundle($1::int)",
site_id,
)
if not isinstance(bundle, dict):
bundle = json.loads(bundle)
if bundle.get("error") == "no_active_plan":
raise HTTPException(status_code=404, detail="No active plan")
plan = _bundle_from_current(bundle)
return CurrentPlanResponseModel(
run=plan.run,
intervals=plan.intervals,
summary=plan.summary,
)
@router.get("/compare", response_model=PlanningCompareResponseModel)
async def get_plan_compare(
site_id: int,
pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> PlanningCompareResponseModel:
async with pool.acquire() as conn:
site_ok = await conn.fetchval(
"SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id
)
if not site_ok:
raise HTTPException(status_code=404, detail="Site not found")
payload = await fetch_json(
conn,
"select ems.fn_plan_compare_bundle($1::int)",
site_id,
)
if not isinstance(payload, dict):
payload = json.loads(payload)
err = payload.get("error")
if err == "no_active_plan":
raise HTTPException(status_code=404, detail="No active plan")
if err == "no_comparison_plan":
raise HTTPException(status_code=404, detail="No comparison plan")
active_raw = payload.get("active") or {}
compare_raw = payload.get("comparison")
if not isinstance(active_raw, dict):
active_raw = {}
if not isinstance(compare_raw, dict):
raise HTTPException(status_code=404, detail="No comparison plan")
active = _bundle_from_current(active_raw)
diff, slot_diffs = _build_plan_diff(active, comparison)
return PlanningCompareResponseModel(
active=active,
comparison=comparison,
diff=diff,
slot_diffs=slot_diffs,
)
@router.post("/run", response_model=RunPlanResponse)
async def post_run_plan(
site_id: int,
pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
plan_type: Literal["daily", "rolling"] = Query("rolling", alias="type"),
) -> RunPlanResponse:
async with pool.acquire() as conn:
site_ok = await conn.fetchval(
"SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id
)
if not site_ok:
raise HTTPException(status_code=404, detail="Site not found")
days_with_prices = await conn.fetchval(
"select ems.fn_planning_future_price_days()",
)
if (days_with_prices or 0) < 1:
raise HTTPException(
status_code=422,
detail="Nejsou dostupné tržní ceny",
)
try:
run_id, solver_duration_ms = await run_plan_api(
site_id, plan_type, conn, triggered_by="api"
)
await export_setpoints(site_id, conn)
row = await fetch_json(
conn,
"select ems.fn_planning_run_horizon($1::int)",
run_id,
)
except HTTPException:
raise
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
except RuntimeError as e:
raise HTTPException(status_code=422, detail=str(e)) from e
except Exception as e:
logger.error("Plan run failed: %s", e, exc_info=True)
raise HTTPException(status_code=422, detail=str(e)) from e
if not isinstance(row, dict) or row.get("horizon_start") is None:
raise HTTPException(status_code=500, detail="Planning run row missing after insert")
return RunPlanResponse(
run_id=run_id,
solver_duration_ms=solver_duration_ms,
horizon_start=row["horizon_start"],
horizon_end=row["horizon_end"],
)