"""REST API – analýza energetických toků (modelované toky z audit_interval).""" from __future__ import annotations import json from datetime import date from typing import Annotated, Any import asyncpg from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel from app.db_json import fetch_json from app.deps import get_pg_pool router = APIRouter( prefix="/sites/{site_id}/energy-flows", tags=["energy-flows"], ) class DailyEnergyFlows(BaseModel): day: date interval_count: int pv_production_kwh: float grid_import_kwh: float grid_export_kwh: float batt_charge_kwh: float batt_discharge_kwh: float load_kwh: float pv_to_load_kwh: float pv_to_batt_kwh: float pv_to_grid_kwh: float batt_to_load_kwh: float batt_to_grid_kwh: float grid_to_load_kwh: float grid_to_batt_kwh: float grid_import_cashflow_czk: float grid_export_revenue_czk: float grid_to_load_cost_czk: float grid_to_batt_cost_czk: float class DailyEnergyFlowsResponse(BaseModel): days: list[DailyEnergyFlows] class IntervalEnergyFlows(BaseModel): interval_start: str pv_production_kwh: float | None grid_import_kwh: float | None grid_export_kwh: float | None batt_charge_kwh: float | None batt_discharge_kwh: float | None load_kwh: float | None pv_to_load_kwh: float | None pv_to_batt_kwh: float | None pv_to_grid_kwh: float | None batt_to_load_kwh: float | None batt_to_grid_kwh: float | None grid_to_load_kwh: float | None grid_to_batt_kwh: float | None def _num(val: Any) -> float: if val is None: return 0.0 return float(val) async def _check_site(conn: asyncpg.Connection, site_id: int) -> None: ok = await conn.fetchval( "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id ) if not ok: raise HTTPException(status_code=404, detail="Site not found") def _parse_day(val: Any) -> date: from datetime import datetime as _dt if isinstance(val, _dt): return val.date() if isinstance(val, date): return val if isinstance(val, str): return date.fromisoformat(val[:10]) raise ValueError(val) 
# Float-valued ``DailyEnergyFlows`` fields; each is read from the DB row under
# the key of the same name.
_DAILY_FLOW_FIELDS: tuple[str, ...] = (
    "pv_production_kwh",
    "grid_import_kwh",
    "grid_export_kwh",
    "batt_charge_kwh",
    "batt_discharge_kwh",
    "load_kwh",
    "pv_to_load_kwh",
    "pv_to_batt_kwh",
    "pv_to_grid_kwh",
    "batt_to_load_kwh",
    "batt_to_grid_kwh",
    "grid_to_load_kwh",
    "grid_to_batt_kwh",
    "grid_import_cashflow_czk",
    "grid_export_revenue_czk",
    "grid_to_load_cost_czk",
    "grid_to_batt_cost_czk",
)


def _month_bounds(month: str) -> tuple[date, date]:
    """Return (first day of *month*, first day of the following month).

    *month* is a ``YYYY-MM`` string. ``ValueError`` is raised when it cannot
    be split into two integers or does not denote a representable month.
    """
    year_text, month_text = month.split("-")
    year, month_no = int(year_text), int(month_text)
    start = date(year, month_no, 1)
    if month_no == 12:
        return start, date(year + 1, 1, 1)
    return start, date(year, month_no + 1, 1)


def _daily_rows(raw: Any) -> list[Any]:
    """Extract the ``days`` list from whatever ``fetch_json`` returned.

    Encoded JSON (``str``/``bytes``) is decoded first. Anything that does not
    end up as a dict holding a list under ``"days"`` (for example a NULL
    result) yields an empty list instead of failing. Malformed JSON text
    still raises, since that indicates a real server-side problem.
    """
    if isinstance(raw, (str, bytes, bytearray)):
        raw = json.loads(raw)
    if not isinstance(raw, dict):
        return []
    rows = raw.get("days")
    return rows if isinstance(rows, list) else []


@router.get("/daily", response_model=DailyEnergyFlowsResponse)
async def get_energy_flows_daily(
    site_id: int,
    db: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
    month: str = Query(
        ...,
        description="YYYY-MM",
        pattern=r"^\d{4}-\d{2}$",
    ),
) -> DailyEnergyFlowsResponse:
    """Return the modelled energy flows of one site, aggregated per day.

    Responds with 400 when ``month`` is not a valid ``YYYY-MM`` month and with
    404 when the site does not exist. Values missing from a day are reported
    as 0; a month without data yields an empty ``days`` list.
    """
    try:
        month_start, month_end = _month_bounds(month)
    except ValueError as exc:
        raise HTTPException(
            status_code=400, detail="Invalid month, expected YYYY-MM"
        ) from exc

    async with db.acquire() as conn:
        await _check_site(conn, site_id)
        raw = await fetch_json(
            conn,
            "select ems.fn_energy_flows_daily_month($1::int, $2::date, $3::date)",
            site_id,
            month_start,
            month_end,
        )

    days = [
        DailyEnergyFlows(
            day=_parse_day(row.get("day")),
            interval_count=int(row.get("interval_count") or 0),
            **{name: _num(row.get(name)) for name in _DAILY_FLOW_FIELDS},
        )
        for row in _daily_rows(raw)
        if isinstance(row, dict)  # ignore malformed entries
    ]
    return DailyEnergyFlowsResponse(days=days)
# Nullable float fields of ``IntervalEnergyFlows``; each is read from the DB
# row under the key of the same name.
_INTERVAL_FLOW_FIELDS: tuple[str, ...] = (
    "pv_production_kwh",
    "grid_import_kwh",
    "grid_export_kwh",
    "batt_charge_kwh",
    "batt_discharge_kwh",
    "load_kwh",
    "pv_to_load_kwh",
    "pv_to_batt_kwh",
    "pv_to_grid_kwh",
    "batt_to_load_kwh",
    "batt_to_grid_kwh",
    "grid_to_load_kwh",
    "grid_to_batt_kwh",
)


def _interval_rows(raw: Any) -> list[Any]:
    """Normalise the ``fetch_json`` result to a list of rows.

    Encoded JSON (``str``/``bytes``) is decoded first. Any value that is not
    a list afterwards (for example ``None`` or a decoded JSON ``null``)
    yields an empty list instead of failing during iteration. Malformed JSON
    text still raises.
    """
    if isinstance(raw, (str, bytes, bytearray)):
        raw = json.loads(raw)
    return raw if isinstance(raw, list) else []


@router.get("/daily/{day}/intervals", response_model=list[IntervalEnergyFlows])
async def get_energy_flows_intervals(
    site_id: int,
    day: date,
    db: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> list[IntervalEnergyFlows]:
    """Return the modelled energy flows of one site for each interval of a day.

    Responds with 404 when the site does not exist. Flow values missing from
    an interval are reported as null; a day without data yields an empty list.
    """
    async with db.acquire() as conn:
        await _check_site(conn, site_id)
        raw = await fetch_json(
            conn,
            "select ems.fn_energy_flows_intervals_day($1::int, $2::date)",
            site_id,
            day,
        )

    return [
        IntervalEnergyFlows(
            # ``str()`` returns a string unchanged and stringifies anything else.
            # NOTE(review): a missing interval_start therefore becomes "None",
            # as before - confirm whether such rows can occur.
            interval_start=str(row.get("interval_start")),
            **{name: row.get(name) for name in _INTERVAL_FLOW_FIELDS},
        )
        for row in _interval_rows(raw)
        if isinstance(row, dict)  # ignore malformed entries
    ]