270 lines
9.1 KiB
Python
270 lines
9.1 KiB
Python
"""REST API – aktivní plán a ruční přepočet."""
|
||
|
||
import json
|
||
import logging
|
||
from datetime import datetime, timezone
|
||
from typing import Annotated, Any, Literal
|
||
|
||
import asyncpg
|
||
from fastapi import APIRouter, Depends, HTTPException, Query
|
||
from pydantic import BaseModel, ConfigDict, Field
|
||
|
||
from app.db_json import fetch_json
|
||
from app.deps import get_pg_pool
|
||
from services.control_exporter import export_setpoints
|
||
from services.planning_engine import run_plan_api
|
||
|
||
# Router for the per-site plan endpoints; every route declared on it is served
# under /sites/{site_id}/plan and tagged "plan".
router = APIRouter(prefix="/sites/{site_id}/plan", tags=["plan"])
|
||
|
||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
# Response body of the POST /run endpoint (used as its response_model).
# Documented with comments instead of a docstring so the schema generated from
# this model stays exactly as it was.
class RunPlanResponse(BaseModel):
    # Identifier of the planning run that was just created.
    run_id: int
    # Solver duration in milliseconds, as reported by the planning engine call.
    solver_duration_ms: int
    # Start and end of the horizon covered by the run.
    horizon_start: datetime
    horizon_end: datetime
|
||
|
||
|
||
# One interval of a plan bundle. Only the two fields declared below are typed;
# extra="allow" keeps every other key of the source row on the model.
#
# The class docstring and the Field description are runtime text (pydantic
# emits them into the generated schema), so both are kept verbatim. In English:
#   docstring   - "Row of `ems.planning_interval` in the active-plan response."
#   description - "True when the solver used a predicted price
#                  (market_price_stats) for the slot, not the exact row from
#                  vw_site_effective_price / OTE."
class PlanningIntervalDto(BaseModel):
    """Řádek `ems.planning_interval` v odpovědi aktivního plánu."""

    model_config = ConfigDict(extra="allow")

    # Start of the interval as a string; also used as the join key when two
    # plans are compared slot by slot.
    interval_start: str
    is_predicted_price: bool = Field(
        default=False,
        description=(
            "True pokud solver pro slot použil predikovanou cenu (market_price_stats), "
            "nikoli přesný řádek z vw_site_effective_price / OTE."
        ),
    )
|
||
|
||
|
||
# A complete plan: run metadata, its intervals and an aggregated summary.
# (Comments rather than a docstring, so the generated schema is unchanged.)
class PlanningBundleDto(BaseModel):
    # Free-form metadata of the planning run.
    run: dict[str, Any]
    # Planned intervals belonging to the run.
    intervals: list[PlanningIntervalDto]
    # Free-form aggregate values; the plan diff reads numeric totals from it.
    summary: dict[str, Any]
|
||
|
||
|
||
# Response model of GET /current. It adds no fields of its own; it only gives
# the plan bundle a distinct name for that endpoint.
class CurrentPlanResponseModel(PlanningBundleDto):
    pass
|
||
|
||
|
||
# One slot whose compared values differ between the active and the comparison
# plan. (Comments rather than a docstring, so the generated schema is unchanged.)
class ComparisonSlotDiffDto(BaseModel):
    # Interval start shared by both plans for this slot.
    interval_start: str
    # Compared values of the slot as found in the active plan.
    active: dict[str, Any]
    # The same values as found in the comparison plan.
    comparison: dict[str, Any]
|
||
|
||
|
||
# Response body of GET /compare: both plans plus their differences.
# (Comments rather than a docstring, so the generated schema is unchanged.)
class PlanningCompareResponseModel(BaseModel):
    # The currently active plan.
    active: PlanningBundleDto
    # The plan it is compared against.
    comparison: PlanningBundleDto
    # Aggregate differences (cost totals, slot counts, number of changed slots).
    diff: dict[str, Any]
    # Per-slot differences, one entry per changed interval.
    slot_diffs: list[ComparisonSlotDiffDto]
|
||
|
||
|
||
def _bundle_from_payload(payload: dict[str, Any], *, run_key: str) -> PlanningBundleDto:
    """Build a ``PlanningBundleDto`` from a raw bundle dictionary.

    Args:
        payload: Raw bundle; read for ``run_key``, ``"intervals"`` and
            ``"summary"``.
        run_key: Key under which the run metadata is stored in ``payload``.

    Returns:
        The bundle. A section that is missing, falsy or of the wrong container
        type is replaced by an empty dict/list, and interval entries that are
        not dicts are dropped.

    Raises:
        Whatever ``PlanningIntervalDto.model_validate`` raises for an interval
        dict that does not fit the model.
    """
    run_section = payload.get(run_key)
    if not (run_section and isinstance(run_section, dict)):
        run_section = {}

    interval_rows = payload.get("intervals")
    if not (interval_rows and isinstance(interval_rows, list)):
        interval_rows = []

    parsed_intervals = []
    for row in interval_rows:
        if isinstance(row, dict):
            parsed_intervals.append(PlanningIntervalDto.model_validate(row))

    summary_section = payload.get("summary")
    if not (summary_section and isinstance(summary_section, dict)):
        summary_section = {}

    return PlanningBundleDto(
        run=run_section,
        intervals=parsed_intervals,
        summary=summary_section,
    )
|
||
|
||
|
||
def _bundle_from_current(payload: dict[str, Any]) -> PlanningBundleDto:
    """Build a bundle from a payload that keeps its run metadata under ``"run"``."""
    return _bundle_from_payload(payload, run_key="run")
|
||
|
||
|
||
def _bundle_from_debug(payload: dict[str, Any]) -> PlanningBundleDto:
    """Build a bundle from a payload that keeps its run metadata under ``"planning_run"``."""
    return _bundle_from_payload(payload, run_key="planning_run")
|
||
|
||
|
||
def _build_plan_diff(
    active: PlanningBundleDto,
    comparison: PlanningBundleDto,
) -> tuple[dict[str, Any], list[ComparisonSlotDiffDto]]:
    """Compare two plan bundles slot by slot and on their summary totals.

    Slots are matched on ``interval_start``. A slot present in only one of the
    plans is ignored. A matched slot is reported when at least one of the
    compared keys differs between the two plans.

    Args:
        active: The active plan.
        comparison: The plan to compare it with.

    Returns:
        ``(diff, slot_diffs)``. ``diff`` holds both cost totals, their signed
        and absolute difference (rounded to 4 places), the charge / discharge /
        export slot counts of both plans and ``changed_slots``. ``slot_diffs``
        has one entry per changed slot with the compared keys of both sides.
    """
    # Function-scope import: only the finiteness guard below needs it.
    import math

    interesting_keys = (
        "battery_setpoint_w",
        "battery_soc_target_pct",
        "grid_setpoint_w",
        "export_limit_w",
        "export_mode",
        "deye_physical_mode",
        "deye_gen_cutoff_enabled",
        "pv_a_curtailed_w",
        "expected_cost_czk",
    )

    def _summary_num(bundle: PlanningBundleDto, key: str) -> float:
        # Missing, unconvertible or non-finite values count as 0.0. Without the
        # finiteness check a NaN/inf summary value (e.g. the string "NaN")
        # would make int(...) below raise and would leak NaN into the totals.
        raw = bundle.summary.get(key)
        if raw is None:
            return 0.0
        try:
            value = float(raw)
        except (TypeError, ValueError, OverflowError):
            return 0.0
        return value if math.isfinite(value) else 0.0

    # Going through dicts keeps the original behavior for duplicate
    # interval_start values: the last interval with a given start wins.
    active_by_ts = {i.interval_start: i for i in active.intervals}
    compare_by_ts = {i.interval_start: i for i in comparison.intervals}

    diffs: list[ComparisonSlotDiffDto] = []
    for ts, a in active_by_ts.items():
        b = compare_by_ts.get(ts)
        if b is None:
            continue
        active_payload = a.model_dump()
        comparison_payload = b.model_dump()
        active_view = {k: active_payload.get(k) for k in interesting_keys}
        comparison_view = {k: comparison_payload.get(k) for k in interesting_keys}
        if any(active_view[k] != comparison_view[k] for k in interesting_keys):
            diffs.append(
                ComparisonSlotDiffDto(
                    interval_start=ts,
                    active=active_view,
                    comparison=comparison_view,
                )
            )

    active_cost = _summary_num(active, "total_expected_cost_czk")
    compare_cost = _summary_num(comparison, "total_expected_cost_czk")
    diff = {
        "active_total_expected_cost_czk": active_cost,
        "comparison_total_expected_cost_czk": compare_cost,
        "total_expected_cost_czk": round(active_cost - compare_cost, 4),
        "absolute_total_expected_cost_czk": round(abs(active_cost - compare_cost), 4),
        "active_charge_slots": int(_summary_num(active, "charge_slots")),
        "comparison_charge_slots": int(_summary_num(comparison, "charge_slots")),
        "active_discharge_slots": int(_summary_num(active, "discharge_slots")),
        "comparison_discharge_slots": int(_summary_num(comparison, "discharge_slots")),
        "active_export_slots": int(_summary_num(active, "export_slots")),
        "comparison_export_slots": int(_summary_num(comparison, "export_slots")),
        "changed_slots": len(diffs),
    }
    return diff, diffs
|
||
|
||
|
||
# GET /current - return the active plan of a site.
#
# Checks that the site exists, loads the bundle produced by
# ems.fn_plan_current_bundle and converts it into the response model.
# Responds 404 when the site does not exist or when the bundle reports
# "no_active_plan".
#
# Kept as comments on purpose: FastAPI would publish a function docstring as
# the operation description in the generated API docs.
@router.get("/current", response_model=CurrentPlanResponseModel)
async def get_current_plan(
    site_id: int,
    pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> CurrentPlanResponseModel:
    async with pool.acquire() as conn:
        site_exists = await conn.fetchval(
            "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id
        )
        if not site_exists:
            raise HTTPException(status_code=404, detail="Site not found")

        raw_bundle = await fetch_json(
            conn,
            "select ems.fn_plan_current_bundle($1::int)",
            site_id,
        )
        # The helper may hand back JSON text instead of a decoded object.
        payload = raw_bundle if isinstance(raw_bundle, dict) else json.loads(raw_bundle)
        if payload.get("error") == "no_active_plan":
            raise HTTPException(status_code=404, detail="No active plan")

        parsed = _bundle_from_current(payload)
        return CurrentPlanResponseModel(
            run=parsed.run,
            intervals=parsed.intervals,
            summary=parsed.summary,
        )
|
||
|
||
|
||
# GET /compare - return the active plan, the comparison plan and their diff.
#
# Checks that the site exists, loads the payload produced by
# ems.fn_plan_compare_bundle, builds both bundles and diffs them with
# _build_plan_diff.
# Responds 404 when the site does not exist, when the payload reports
# "no_active_plan" / "no_comparison_plan", or when the payload carries no
# comparison object.
#
# Kept as comments on purpose: FastAPI would publish a function docstring as
# the operation description in the generated API docs.
@router.get("/compare", response_model=PlanningCompareResponseModel)
async def get_plan_compare(
    site_id: int,
    pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> PlanningCompareResponseModel:
    async with pool.acquire() as conn:
        site_ok = await conn.fetchval(
            "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id
        )
        if not site_ok:
            raise HTTPException(status_code=404, detail="Site not found")

        payload = await fetch_json(
            conn,
            "select ems.fn_plan_compare_bundle($1::int)",
            site_id,
        )
        # The helper may hand back JSON text instead of a decoded object.
        if not isinstance(payload, dict):
            payload = json.loads(payload)
        err = payload.get("error")
        if err == "no_active_plan":
            raise HTTPException(status_code=404, detail="No active plan")
        if err == "no_comparison_plan":
            raise HTTPException(status_code=404, detail="No comparison plan")

        active_raw = payload.get("active") or {}
        compare_raw = payload.get("comparison")
        if not isinstance(active_raw, dict):
            active_raw = {}
        if not isinstance(compare_raw, dict):
            raise HTTPException(status_code=404, detail="No comparison plan")

        active = _bundle_from_current(active_raw)
        # Fix: `comparison` was used below without ever being assigned, so
        # every otherwise successful request ended in a NameError.
        # NOTE(review): the key holding the comparison run metadata is not
        # visible from this module - accept the "planning_run" layout when it
        # is present and fall back to the "run" layout otherwise; confirm
        # against ems.fn_plan_compare_bundle.
        if "planning_run" in compare_raw:
            comparison = _bundle_from_debug(compare_raw)
        else:
            comparison = _bundle_from_current(compare_raw)

        diff, slot_diffs = _build_plan_diff(active, comparison)
        return PlanningCompareResponseModel(
            active=active,
            comparison=comparison,
            diff=diff,
            slot_diffs=slot_diffs,
        )
|
||
|
||
|
||
# POST /run - trigger a planning run for a site and export its setpoints.
#
# Query parameter `type` ("daily" or "rolling", default "rolling") selects the
# plan type. After the site and price-availability checks the handler runs the
# planner, exports setpoints and reads the horizon of the new run via
# ems.fn_planning_run_horizon.
#
# Responses:
#   404 - site does not exist
#   422 - no future market prices, a RuntimeError, or any unexpected exception
#         raised while planning/exporting
#   400 - ValueError raised while planning/exporting
#   500 - the horizon row of the new run is missing or incomplete
#
# Kept as comments on purpose: FastAPI would publish a function docstring as
# the operation description in the generated API docs.
@router.post("/run", response_model=RunPlanResponse)
async def post_run_plan(
    site_id: int,
    pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
    plan_type: Literal["daily", "rolling"] = Query("rolling", alias="type"),
) -> RunPlanResponse:
    async with pool.acquire() as conn:
        site_ok = await conn.fetchval(
            "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id
        )
        if not site_ok:
            raise HTTPException(status_code=404, detail="Site not found")

        days_with_prices = await conn.fetchval(
            "select ems.fn_planning_future_price_days()",
        )
        if (days_with_prices or 0) < 1:
            raise HTTPException(
                status_code=422,
                detail="Nejsou dostupné tržní ceny",
            )

        try:
            run_id, solver_duration_ms = await run_plan_api(
                site_id, plan_type, conn, triggered_by="api"
            )
            await export_setpoints(site_id, conn)
            row = await fetch_json(
                conn,
                "select ems.fn_planning_run_horizon($1::int)",
                run_id,
            )
        except HTTPException:
            raise
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e)) from e
        except RuntimeError as e:
            raise HTTPException(status_code=422, detail=str(e)) from e
        except Exception as e:
            # Boundary handler: log with traceback, keep the existing 422 contract.
            logger.exception("Plan run failed: %s", e)
            raise HTTPException(status_code=422, detail=str(e)) from e

        # The sibling endpoints accept JSON text from fetch_json; do the same
        # here instead of rejecting a valid row with a misleading 500.
        if isinstance(row, (str, bytes, bytearray)):
            try:
                row = json.loads(row)
            except ValueError:
                row = None

        # Both bounds are required by the response model; checking horizon_end
        # too turns a would-be KeyError into the intended 500 response.
        if (
            not isinstance(row, dict)
            or row.get("horizon_start") is None
            or row.get("horizon_end") is None
        ):
            raise HTTPException(status_code=500, detail="Planning run row missing after insert")

        return RunPlanResponse(
            run_id=run_id,
            solver_duration_ms=solver_duration_ms,
            horizon_start=row["horizon_start"],
            horizon_end=row["horizon_end"],
        )
|