Files
ems/backend/app/routers/energy_flows.py
Dusan Vojacek 93f883f5e0
Some checks failed
CI and deploy / migration-check (push) Successful in 5s
CI and deploy / deploy (push) Failing after 20s
sql first refactor
2026-04-19 20:02:20 +02:00

193 lines
6.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""REST API analýza energetických toků (modelované toky z audit_interval)."""
from __future__ import annotations
import json
from datetime import date
from typing import Annotated, Any
import asyncpg
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from app.db_json import fetch_json
from app.deps import get_pg_pool
# Router for the energy-flow endpoints; every route is nested under a site id.
router = APIRouter(prefix="/sites/{site_id}/energy-flows", tags=["energy-flows"])
class DailyEnergyFlows(BaseModel):
    """Energy-flow figures for a single day, as served by the ``/daily`` route.

    All numeric fields are required; units are as indicated by the field-name
    suffixes (``_kwh`` / ``_czk``).
    """

    day: date
    interval_count: int  # presumably the number of intervals behind the day - confirm in SQL

    # Per-device totals.
    pv_production_kwh: float
    grid_import_kwh: float
    grid_export_kwh: float
    batt_charge_kwh: float
    batt_discharge_kwh: float
    load_kwh: float

    # Flows named ``<source>_to_<sink>``.
    pv_to_load_kwh: float
    pv_to_batt_kwh: float
    pv_to_grid_kwh: float
    batt_to_load_kwh: float
    batt_to_grid_kwh: float
    grid_to_load_kwh: float
    grid_to_batt_kwh: float

    # Monetary figures.
    grid_import_cashflow_czk: float
    grid_export_revenue_czk: float
    grid_to_load_cost_czk: float
    grid_to_batt_cost_czk: float
class DailyEnergyFlowsResponse(BaseModel):
    """Response envelope of the ``/daily`` route: a list of per-day records."""

    days: list[DailyEnergyFlows]
class IntervalEnergyFlows(BaseModel):
    """Energy-flow figures for a single interval of a day.

    Every numeric field may be ``None``; units are as indicated by the
    ``_kwh`` field-name suffix.
    """

    # Start of the interval, carried as text.
    interval_start: str

    # Per-device totals.
    pv_production_kwh: float | None
    grid_import_kwh: float | None
    grid_export_kwh: float | None
    batt_charge_kwh: float | None
    batt_discharge_kwh: float | None
    load_kwh: float | None

    # Flows named ``<source>_to_<sink>``.
    pv_to_load_kwh: float | None
    pv_to_batt_kwh: float | None
    pv_to_grid_kwh: float | None
    batt_to_load_kwh: float | None
    batt_to_grid_kwh: float | None
    grid_to_load_kwh: float | None
    grid_to_batt_kwh: float | None
def _num(val: Any) -> float:
    """Coerce a possibly-missing numeric value to ``float``.

    ``None`` maps to ``0.0``; any other value is passed to ``float()``, whose
    exceptions propagate unchanged.
    """
    return 0.0 if val is None else float(val)
async def _check_site(conn: asyncpg.Connection, site_id: int) -> None:
    """Verify that ``site_id`` has a row in ``ems.site``.

    Args:
        conn: Open database connection used for the existence query.
        site_id: Identifier looked up in ``ems.site.id``.

    Raises:
        HTTPException: 404 ("Site not found") when no matching row exists.
    """
    site_exists = await conn.fetchval(
        "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)",
        site_id,
    )
    if site_exists:
        return
    raise HTTPException(status_code=404, detail="Site not found")
def _parse_day(val: Any) -> date:
    """Convert a day value from a result row into a ``date``.

    Accepts ISO-formatted text (only the first ten characters are parsed, so
    a trailing time part is ignored), ``datetime`` objects and ``date``
    objects.

    Raises:
        ValueError: for any other type, or for text that is not an ISO date.
    """
    from datetime import datetime as _dt

    if isinstance(val, str):
        return date.fromisoformat(val[:10])
    # ``datetime`` is a subclass of ``date``, so it must be tested first.
    if isinstance(val, _dt):
        return val.date()
    if isinstance(val, date):
        return val
    raise ValueError(val)
# Float-valued keys copied from each result row into ``DailyEnergyFlows``;
# a missing or null value becomes 0.0 via ``_num``.
_DAILY_FLOAT_FIELDS: tuple[str, ...] = (
    "pv_production_kwh",
    "grid_import_kwh",
    "grid_export_kwh",
    "batt_charge_kwh",
    "batt_discharge_kwh",
    "load_kwh",
    "pv_to_load_kwh",
    "pv_to_batt_kwh",
    "pv_to_grid_kwh",
    "batt_to_load_kwh",
    "batt_to_grid_kwh",
    "grid_to_load_kwh",
    "grid_to_batt_kwh",
    "grid_import_cashflow_czk",
    "grid_export_revenue_czk",
    "grid_to_load_cost_czk",
    "grid_to_batt_cost_czk",
)


def _month_bounds(month: str) -> tuple[date, date]:
    """Return (first day of the month, first day of the next month) for ``YYYY-MM``.

    Raises:
        ValueError: if the text does not split into exactly two integers, the
            month is outside 1-12, or a bound is not representable as a ``date``.
    """
    year_text, month_text = month.split("-")
    year, mon = int(year_text), int(month_text)
    start = date(year, mon, 1)
    # December rolls over into January of the following year.
    end = date(year + 1, 1, 1) if mon == 12 else date(year, mon + 1, 1)
    return start, end


@router.get("/daily", response_model=DailyEnergyFlowsResponse)
async def get_energy_flows_daily(
    site_id: int,
    db: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
    month: str = Query(
        ...,
        description="YYYY-MM",
        pattern=r"^\d{4}-\d{2}$",
    ),
) -> DailyEnergyFlowsResponse:
    """Return per-day energy-flow records for one calendar month of a site.

    Args:
        site_id: Site identifier taken from the URL path.
        db: Connection pool injected by FastAPI.
        month: Month to report, formatted ``YYYY-MM``.

    Returns:
        One ``DailyEnergyFlows`` per object found in the ``days`` array
        produced by ``ems.fn_energy_flows_daily_month``. Entries that are not
        objects are skipped, and the list is empty when the function yields
        no usable payload (NULL, or JSON that is not an object).

    Raises:
        HTTPException: 400 when ``month`` is not a valid calendar month;
            404 (via ``_check_site``) when the site does not exist.
        ValueError: propagated from ``_parse_day`` when a row carries no
            usable ``day`` value.
    """
    try:
        month_start, month_end = _month_bounds(month)
    except (ValueError, IndexError):
        # ``from None``: the parsing traceback adds nothing to a client error.
        raise HTTPException(
            status_code=400, detail="Invalid month, expected YYYY-MM"
        ) from None

    # Hold the pooled connection only for the two queries, not while the
    # response models are being built.
    async with db.acquire() as conn:
        await _check_site(conn, site_id)
        # month_end is the first day of the following month - presumably an
        # exclusive upper bound; confirm against the SQL function.
        raw = await fetch_json(
            conn,
            "select ems.fn_energy_flows_daily_month($1::int, $2::date, $3::date)",
            site_id,
            month_start,
            month_end,
        )

    # The payload may arrive already decoded or as JSON text.
    if isinstance(raw, (str, bytes, bytearray)):
        raw = json.loads(raw)
    # A NULL result or a non-object payload means "no data", not a server error.
    rows = raw.get("days") if isinstance(raw, dict) else None
    if not isinstance(rows, list):
        rows = []

    days = [
        DailyEnergyFlows(
            day=_parse_day(row.get("day")),
            interval_count=int(row.get("interval_count") or 0),
            **{name: _num(row.get(name)) for name in _DAILY_FLOAT_FIELDS},
        )
        for row in rows
        if isinstance(row, dict)
    ]
    return DailyEnergyFlowsResponse(days=days)
# Nullable float keys copied verbatim from each result row into
# ``IntervalEnergyFlows``.
_INTERVAL_FLOW_FIELDS: tuple[str, ...] = (
    "pv_production_kwh",
    "grid_import_kwh",
    "grid_export_kwh",
    "batt_charge_kwh",
    "batt_discharge_kwh",
    "load_kwh",
    "pv_to_load_kwh",
    "pv_to_batt_kwh",
    "pv_to_grid_kwh",
    "batt_to_load_kwh",
    "batt_to_grid_kwh",
    "grid_to_load_kwh",
    "grid_to_batt_kwh",
)


@router.get("/daily/{day}/intervals", response_model=list[IntervalEnergyFlows])
async def get_energy_flows_intervals(
    site_id: int,
    day: date,
    db: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> list[IntervalEnergyFlows]:
    """Return the per-interval energy-flow records of one day of a site.

    Args:
        site_id: Site identifier taken from the URL path.
        day: Day to report, taken from the URL path.
        db: Connection pool injected by FastAPI.

    Returns:
        One ``IntervalEnergyFlows`` per object in the array produced by
        ``ems.fn_energy_flows_intervals_day``. Entries that are not objects
        are skipped, and the list is empty when the function yields no usable
        payload (NULL, or JSON that is not an array).

    Raises:
        HTTPException: 404 (via ``_check_site``) when the site does not exist.
    """
    # Hold the pooled connection only for the two queries, not while the
    # response models are being built.
    async with db.acquire() as conn:
        await _check_site(conn, site_id)
        rows = await fetch_json(
            conn,
            "select ems.fn_energy_flows_intervals_day($1::int, $2::date)",
            site_id,
            day,
        )

    # The payload may arrive already decoded or as JSON text.
    if isinstance(rows, (str, bytes, bytearray)):
        rows = json.loads(rows)
    # Re-check after decoding: JSON text such as ``null`` must not reach the loop.
    if not isinstance(rows, list):
        rows = []

    out: list[IntervalEnergyFlows] = []
    for row in rows:
        if not isinstance(row, dict):
            continue
        start = row.get("interval_start")
        out.append(
            IntervalEnergyFlows(
                # Text passes through untouched; any other value is str()-converted.
                interval_start=start if isinstance(start, str) else str(start),
                **{name: row.get(name) for name in _INTERVAL_FLOW_FIELDS},
            )
        )
    return out