Files
ems/backend/app/routers/economics.py
Dusan Vojacek 93f883f5e0
Some checks failed
CI and deploy / migration-check (push) Successful in 5s
CI and deploy / deploy (push) Failing after 20s
sql first refactor
2026-04-19 20:02:20 +02:00

321 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""REST API denní ekonomické vyhodnocení provozu."""
from __future__ import annotations
import json
import logging
from datetime import date, datetime
from typing import Annotated, Any
import asyncpg
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from app.db_json import fetch_json
from app.deps import get_pg_pool
router = APIRouter(
prefix="/sites/{site_id}/economics",
tags=["economics"],
)
logger = logging.getLogger(__name__)
class DailyEconomics(BaseModel):
day: date
interval_count: int
import_kwh: float
export_kwh: float
pv_kwh: float
load_kwh: float
pv_self_consumption_kwh: float
ev_kwh: float
hp_kwh: float
import_cost_czk: float
export_revenue_czk: float
grid_import_cashflow_czk: float
grid_export_revenue_czk: float
net_cost_czk: float
green_bonus_czk: float
total_balance_czk: float
planned_balance_czk: float | None
deviation_cost_czk: float | None
is_locked: bool
class DailyEconomicsResponse(BaseModel):
days: list[DailyEconomics]
has_green_bonus: bool
class IntervalEconomics(BaseModel):
interval_start: str
import_kwh: float
export_kwh: float
dynamic_cost_czk: float | None
grid_import_cashflow_czk: float | None
grid_export_revenue_czk: float | None
stored_cost_czk: float | None
green_bonus_czk: float | None
planned_cost_czk: float | None
planned_grid_w: int | None
actual_grid_power_w: int | None
effective_buy_price: float | None
effective_sell_price: float | None
planned_buy_price: float | None
planned_sell_price: float | None
actual_pv_power_w: int | None
actual_load_power_w: int | None
actual_battery_power_w: int | None
actual_battery_soc_pct: float | None
class ChartDayPoint(BaseModel):
day: date
daily_balance_czk: float
daily_grid_balance_czk: float
daily_green_bonus_czk: float
daily_import_cost_czk: float
daily_export_revenue_czk: float
cumulative_balance_czk: float
cumulative_grid_balance_czk: float
class LockResponse(BaseModel):
locked: bool
day: date
def _num(val: Any) -> float:
if val is None:
return 0.0
return float(val)
def _opt(val: Any) -> float | None:
if val is None:
return None
return float(val)
async def _check_site(conn: asyncpg.Connection, site_id: int) -> None:
ok = await conn.fetchval(
"SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id
)
if not ok:
raise HTTPException(status_code=404, detail="Site not found")
def _parse_day(val: Any) -> date:
if isinstance(val, datetime):
return val.date()
if isinstance(val, date):
return val
if isinstance(val, str):
return date.fromisoformat(val[:10])
raise ValueError(val)
@router.get("/daily", response_model=DailyEconomicsResponse)
async def get_economics_daily(
site_id: int,
db: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
month: str = Query(
...,
description="YYYY-MM (měsíc pro zobrazení)",
pattern=r"^\d{4}-\d{2}$",
),
) -> DailyEconomicsResponse:
try:
year, mon = month.split("-")
month_start = date(int(year), int(mon), 1)
if int(mon) == 12:
month_end = date(int(year) + 1, 1, 1)
else:
month_end = date(int(year), int(mon) + 1, 1)
except (ValueError, IndexError):
raise HTTPException(status_code=400, detail="Invalid month, expected YYYY-MM")
async with db.acquire() as conn:
await _check_site(conn, site_id)
raw = await fetch_json(
conn,
"select ems.fn_economics_daily_month($1::int, $2::date, $3::date)",
site_id,
month_start,
month_end,
)
if not isinstance(raw, dict):
raw = json.loads(raw)
days_in: list[Any] = list(raw.get("days") or [])
days: list[DailyEconomics] = []
for d in days_in:
if not isinstance(d, dict):
continue
days.append(
DailyEconomics(
day=_parse_day(d.get("day")),
interval_count=int(d.get("interval_count") or 0),
import_kwh=_num(d.get("import_kwh")),
export_kwh=_num(d.get("export_kwh")),
pv_kwh=_num(d.get("pv_kwh")),
load_kwh=_num(d.get("load_kwh")),
pv_self_consumption_kwh=_num(d.get("pv_self_consumption_kwh")),
ev_kwh=_num(d.get("ev_kwh")),
hp_kwh=_num(d.get("hp_kwh")),
import_cost_czk=_num(d.get("import_cost_czk")),
export_revenue_czk=_num(d.get("export_revenue_czk")),
grid_import_cashflow_czk=_num(d.get("grid_import_cashflow_czk")),
grid_export_revenue_czk=_num(d.get("grid_export_revenue_czk")),
net_cost_czk=_num(d.get("net_cost_czk")),
green_bonus_czk=_num(d.get("green_bonus_czk")),
total_balance_czk=_num(d.get("total_balance_czk")),
planned_balance_czk=_opt(d.get("planned_balance_czk")),
deviation_cost_czk=_opt(d.get("deviation_cost_czk")),
is_locked=bool(d.get("is_locked")),
)
)
return DailyEconomicsResponse(
days=days,
has_green_bonus=bool(raw.get("has_green_bonus")),
)
@router.get("/daily/{day}/intervals", response_model=list[IntervalEconomics])
async def get_economics_intervals(
site_id: int,
day: date,
db: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> list[IntervalEconomics]:
async with db.acquire() as conn:
await _check_site(conn, site_id)
rows = await conn.fetch(
"""
SELECT *
FROM ems.vw_economics_interval
WHERE site_id = $1
AND date_trunc('day', interval_start AT TIME ZONE 'Europe/Prague')::date = $2
ORDER BY interval_start
""",
site_id,
day,
)
return [
IntervalEconomics(
interval_start=r["interval_start"].isoformat(),
import_kwh=_num(r["import_kwh"]),
export_kwh=_num(r["export_kwh"]),
dynamic_cost_czk=_opt(r["dynamic_cost_czk"]),
grid_import_cashflow_czk=_opt(r["grid_import_cashflow_czk"]),
grid_export_revenue_czk=_opt(r["grid_export_revenue_czk"]),
stored_cost_czk=_opt(r["stored_cost_czk"]),
green_bonus_czk=_opt(r["green_bonus_czk"]),
planned_cost_czk=_opt(r["planned_cost_czk"]),
planned_grid_w=int(r["planned_grid_w"]) if r["planned_grid_w"] is not None else None,
actual_grid_power_w=int(r["actual_grid_power_w"]) if r["actual_grid_power_w"] is not None else None,
effective_buy_price=_opt(r["effective_buy_price_czk_kwh"]),
effective_sell_price=_opt(r["effective_sell_price_czk_kwh"]),
planned_buy_price=_opt(r["planned_buy_price"]),
planned_sell_price=_opt(r["planned_sell_price"]),
actual_pv_power_w=int(r["actual_pv_power_w"]) if r["actual_pv_power_w"] is not None else None,
actual_load_power_w=int(r["actual_load_power_w"]) if r["actual_load_power_w"] is not None else None,
actual_battery_power_w=int(r["actual_battery_power_w"]) if r["actual_battery_power_w"] is not None else None,
actual_battery_soc_pct=_opt(r["actual_battery_soc_pct"]),
)
for r in rows
]
@router.post("/daily/{day}/lock", response_model=LockResponse)
async def lock_day(
site_id: int,
day: date,
db: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> LockResponse:
async with db.acquire() as conn:
await _check_site(conn, site_id)
raw = await fetch_json(
conn,
"select ems.fn_economics_lock_day($1::int, $2::date)",
site_id,
day,
)
if not isinstance(raw, dict):
raw = json.loads(raw)
if raw.get("locked") is not True:
raise HTTPException(
status_code=404,
detail=f"No economics data for {day.isoformat()}",
)
return LockResponse(locked=True, day=day)
@router.delete("/daily/{day}/lock", response_model=LockResponse)
async def unlock_day(
site_id: int,
day: date,
db: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> LockResponse:
async with db.acquire() as conn:
await _check_site(conn, site_id)
await fetch_json(
conn,
"select ems.fn_economics_unlock_day($1::int, $2::date)",
site_id,
day,
)
return LockResponse(locked=False, day=day)
@router.get("/monthly-chart", response_model=list[ChartDayPoint])
async def get_monthly_chart(
site_id: int,
db: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
month: str = Query(
...,
description="YYYY-MM",
pattern=r"^\d{4}-\d{2}$",
),
) -> list[ChartDayPoint]:
try:
year, mon = month.split("-")
month_start = date(int(year), int(mon), 1)
if int(mon) == 12:
month_end = date(int(year) + 1, 1, 1)
else:
month_end = date(int(year), int(mon) + 1, 1)
except (ValueError, IndexError):
raise HTTPException(status_code=400, detail="Invalid month, expected YYYY-MM")
async with db.acquire() as conn:
await _check_site(conn, site_id)
arr = await fetch_json(
conn,
"select ems.fn_economics_monthly_chart($1::int, $2::date, $3::date)",
site_id,
month_start,
month_end,
)
if not isinstance(arr, list):
arr = json.loads(arr) if isinstance(arr, str) else []
points: list[ChartDayPoint] = []
for r in arr:
if not isinstance(r, dict):
continue
points.append(
ChartDayPoint(
day=_parse_day(r.get("day")),
daily_balance_czk=float(r.get("daily_balance_czk") or 0),
daily_grid_balance_czk=float(r.get("daily_grid_balance_czk") or 0),
daily_green_bonus_czk=float(r.get("daily_green_bonus_czk") or 0),
daily_import_cost_czk=float(r.get("daily_import_cost_czk") or 0),
daily_export_revenue_czk=float(r.get("daily_export_revenue_czk") or 0),
cumulative_balance_czk=float(r.get("cumulative_balance_czk") or 0),
cumulative_grid_balance_czk=float(r.get("cumulative_grid_balance_czk") or 0),
)
)
return points