"""REST API – aktivní plán a ruční přepočet.""" import logging from datetime import datetime, timezone from typing import Annotated, Any, Literal import asyncpg from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel, ConfigDict, Field from app.db_json import record_to_dict from app.deps import get_pg_pool from services.control_exporter import export_setpoints from services.planning_engine import run_plan_api router = APIRouter(prefix="/sites/{site_id}/plan", tags=["plan"]) logger = logging.getLogger(__name__) class RunPlanResponse(BaseModel): run_id: int solver_duration_ms: int horizon_start: datetime horizon_end: datetime class PlanningIntervalDto(BaseModel): """Řádek `ems.planning_interval` v odpovědi aktivního plánu.""" model_config = ConfigDict(extra="allow") interval_start: str is_predicted_price: bool = Field( default=False, description=( "True pokud solver pro slot použil predikovanou cenu (market_price_stats), " "nikoli přesný řádek z vw_site_effective_price / OTE." ), ) class CurrentPlanResponseModel(BaseModel): run: dict[str, Any] intervals: list[PlanningIntervalDto] summary: dict[str, Any] def _build_summary(intervals: list[dict[str, Any]]) -> dict[str, Any]: total_cost = 0.0 total_curtailed_kwh = 0.0 charge_slots = 0 discharge_slots = 0 export_slots = 0 for row in intervals: ec = row.get("expected_cost_czk") if ec is not None: total_cost += float(ec) c = row.get("pv_a_curtailed_w") or 0 total_curtailed_kwh += int(c) * 0.25 / 1000.0 b = row.get("battery_setpoint_w") if b is not None: if int(b) > 0: charge_slots += 1 elif int(b) < 0: discharge_slots += 1 g = row.get("grid_setpoint_w") if g is not None and int(g) < 0: export_slots += 1 return { "total_expected_cost_czk": round(total_cost, 4), "total_pv_curtailed_kwh": round(total_curtailed_kwh, 6), "charge_slots": charge_slots, "discharge_slots": discharge_slots, "export_slots": export_slots, } def _pv_scarcity_factor_from_intervals( intervals: list[dict[str, Any]], battery_usable_wh: float | None ) -> float: """Stejná logika jako v solveru: 0.65..1.0 podle očekávané FVE energie na ~24h.""" if not intervals: return 1.0 batt_kwh = max(1.0, float(battery_usable_wh or 0.0) / 1000.0) horizon_slots = min(len(intervals), int(24 / 0.25)) pv_kwh = 0.0 for row in intervals[:horizon_slots]: pv = row.get("pv_forecast_total_w") if pv is not None: pv_kwh += max(0.0, float(pv)) * 0.25 / 1000.0 coverage = pv_kwh / batt_kwh coverage_clamped = max(0.0, min(1.0, coverage)) return round(0.65 + 0.35 * coverage_clamped, 4) @router.get("/current", response_model=CurrentPlanResponseModel) async def get_current_plan( site_id: int, pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)], ) -> CurrentPlanResponseModel: async with pool.acquire() as conn: site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") run_row = await conn.fetchrow( """ SELECT pr.* FROM ems.planning_run pr WHERE pr.site_id = $1 AND pr.status = 'active' ORDER BY pr.created_at DESC LIMIT 1 """, site_id, ) if not run_row: raise HTTPException(status_code=404, detail="No active plan") run_id = run_row["id"] int_rows = await conn.fetch( """ WITH fc_slot AS ( SELECT interval_start, COALESCE(SUM(power_w), 0)::BIGINT AS pv_forecast_total_w FROM ( SELECT DISTINCT ON (fpi.interval_start, fpr.pv_array_id) fpi.interval_start, fpi.power_w FROM ems.forecast_pv_interval fpi JOIN ems.forecast_pv_run fpr ON fpr.id = fpi.run_id JOIN ems.asset_pv_array apa ON apa.id = fpr.pv_array_id AND apa.site_id = fpr.site_id WHERE fpr.site_id = $2 AND fpr.status = 'ok' ORDER BY fpi.interval_start, fpr.pv_array_id, fpr.created_at DESC ) latest_per_array GROUP BY interval_start ) SELECT pi.*, ai.actual_pv_power_w AS pv_power_w, fs.pv_forecast_total_w AS pv_forecast_total_w FROM ems.planning_interval pi LEFT JOIN ems.audit_interval ai ON ai.site_id = $2 AND ai.interval_start = pi.interval_start LEFT JOIN fc_slot fs ON fs.interval_start = pi.interval_start WHERE pi.run_id = $1 ORDER BY pi.interval_start """, run_id, site_id, ) battery_usable_wh = await conn.fetchval( """ SELECT COALESCE(SUM(ab.usable_capacity_wh), 0)::float FROM ems.asset_battery ab WHERE ab.site_id = $1 """, site_id, ) intervals_raw = [record_to_dict(r) for r in int_rows] summary = _build_summary(intervals_raw) summary["pv_scarcity_factor"] = _pv_scarcity_factor_from_intervals( intervals_raw, float(battery_usable_wh or 0.0) ) intervals = [PlanningIntervalDto.model_validate(d) for d in intervals_raw] return CurrentPlanResponseModel( run=record_to_dict(run_row), intervals=intervals, summary=summary, ) @router.post("/run", response_model=RunPlanResponse) async def post_run_plan( site_id: int, pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)], plan_type: Literal["daily", "rolling"] = Query("rolling", alias="type"), ) -> RunPlanResponse: async with pool.acquire() as conn: site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") days_with_prices = await conn.fetchval( """ SELECT COUNT(DISTINCT interval_start::date)::int AS days_with_prices FROM ems.market_interval_price WHERE market_source IN ('OTE_CZ', 'OTE_CZ_DAM') AND interval_start >= now() AND interval_start < now() + INTERVAL '48 hours' """ ) if (days_with_prices or 0) < 1: raise HTTPException( status_code=422, detail="Nejsou dostupné tržní ceny", ) try: run_id, solver_duration_ms = await run_plan_api( site_id, plan_type, conn, triggered_by="api" ) # Nový active run aplikuj hned; nečekej na periodický control_export job. await export_setpoints(site_id, conn) row = await conn.fetchrow( """ SELECT horizon_start, horizon_end FROM ems.planning_run WHERE id = $1 """, run_id, ) except HTTPException: raise except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) from e except RuntimeError as e: raise HTTPException(status_code=422, detail=str(e)) from e except Exception as e: logger.error("Plan run failed: %s", e, exc_info=True) raise HTTPException(status_code=422, detail=str(e)) from e if row is None: raise HTTPException(status_code=500, detail="Planning run row missing after insert") return RunPlanResponse( run_id=run_id, solver_duration_ms=solver_duration_ms, horizon_start=row["horizon_start"], horizon_end=row["horizon_end"], )