Files
ems/backend/app/routers/site_configuration.py
Dusan Vojacek 1dfab8c7a1
Some checks failed
CI and deploy / migration-check (push) Failing after 17s
CI and deploy / deploy (push) Has been skipped
dalsi uprava vypoctu delty (ignorujeme orezane vyroby)
2026-04-22 22:42:12 +02:00

206 lines
7.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""GET /sites/{site_id}/configuration read-only souhrn konfigurace lokality."""
from __future__ import annotations
import json
from datetime import datetime, timezone
from typing import Annotated, Any
import asyncpg
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, ConfigDict, Field
from app.db_json import fetch_json
from app.deps import get_pg_pool
router = APIRouter(prefix="/sites/{site_id}", tags=["sites"])
class PvForecastCalibrationPatch(BaseModel):
"""Částečná úprava `ems.site_pv_forecast_calibration`. Vynechané klíče = beze změny."""
model_config = ConfigDict(extra="forbid")
delta_learn_min_ts: datetime | None = None
pv_curtailment_policy_effective_from: datetime | None = None
top_n_days: int | None = Field(default=None, ge=0, le=31)
non_top_day_factor: float | None = Field(default=None, ge=0, le=1)
day_weight_gamma: float | None = Field(default=None, ge=0.25, le=8)
half_life_days: float | None = Field(default=None, ge=1, le=90)
threshold_w: int | None = Field(default=None, ge=0, le=10_000)
class InverterModbusCurrentCapsBody(BaseModel):
"""Tvrdý strop proudu pro zápis Deye reg 108/109 (A); NULL ve JSONu = smaž strop v DB."""
deye_register_max_charge_a: int | None = Field(
default=None,
ge=0,
le=640,
description="None při vynechání klíče = nezměnit; explicitní null = smazat strop",
)
deye_register_max_discharge_a: int | None = Field(
default=None,
ge=0,
le=640,
description="Jako u nabíjení",
)
def _iso_utc_from_cfg(val: Any) -> str | None:
if val is None:
return None
if isinstance(val, str):
return val
if isinstance(val, datetime):
dt = val
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc).isoformat()
return str(val)
@router.get("/configuration")
async def get_site_configuration(
site_id: int,
pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> dict[str, Any]:
async with pool.acquire() as conn:
raw = await fetch_json(
conn,
"select ems.fn_site_configuration($1::int)",
site_id,
)
if raw is None:
raise HTTPException(status_code=404, detail="Site not found")
if not isinstance(raw, dict):
raw = json.loads(raw)
op = raw.get("operational")
if isinstance(op, dict):
op = dict(op)
op["heartbeat_last_seen"] = _iso_utc_from_cfg(op.get("heartbeat_last_seen"))
op["active_plan_created_at"] = _iso_utc_from_cfg(op.get("active_plan_created_at"))
raw["operational"] = op
lat = raw.get("site", {}).get("latitude") if isinstance(raw.get("site"), dict) else None
lon = raw.get("site", {}).get("longitude") if isinstance(raw.get("site"), dict) else None
if isinstance(raw.get("site"), dict):
site = dict(raw["site"])
site["latitude"] = float(lat) if lat is not None else None
site["longitude"] = float(lon) if lon is not None else None
raw["site"] = site
return raw
@router.patch("/configuration/pv-forecast-calibration")
async def patch_pv_forecast_calibration(
site_id: int,
body: PvForecastCalibrationPatch,
pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> dict[str, Any]:
"""Aktualizace kalibrace PV delty (`ems.site_pv_forecast_calibration`)."""
updates = body.model_dump(exclude_unset=True)
if not updates:
raise HTTPException(status_code=400, detail="No fields to update")
if updates.get("delta_learn_min_ts") is None and "delta_learn_min_ts" in updates:
raise HTTPException(
status_code=422,
detail="delta_learn_min_ts cannot be null (column is NOT NULL)",
)
allowed = {
"delta_learn_min_ts",
"pv_curtailment_policy_effective_from",
"top_n_days",
"non_top_day_factor",
"day_weight_gamma",
"half_life_days",
"threshold_w",
}
bad = set(updates) - allowed
if bad:
raise HTTPException(status_code=400, detail=f"Unknown fields: {sorted(bad)}")
cols = list(updates.keys())
set_parts: list[str] = []
args: list[Any] = [site_id]
for i, col in enumerate(cols, start=2):
set_parts.append(f"{col} = ${i}")
args.append(updates[col])
set_sql = ", ".join(set_parts) + ", updated_at = now()"
async with pool.acquire() as conn:
site_ok = await conn.fetchval(
"SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id
)
if not site_ok:
raise HTTPException(status_code=404, detail="Site not found")
n = await conn.execute(
f"""
UPDATE ems.site_pv_forecast_calibration
SET {set_sql}
WHERE site_id = $1
""",
*args,
)
if n == "UPDATE 0":
raise HTTPException(
status_code=404,
detail="PV forecast calibration row missing; run migration V057",
)
row = await conn.fetchrow(
"""
SELECT to_jsonb(c.*) AS j
FROM ems.site_pv_forecast_calibration c
WHERE c.site_id = $1
""",
site_id,
)
raw = row["j"] if row else {}
if not isinstance(raw, dict):
raw = json.loads(raw)
return raw
@router.patch("/inverters/{inverter_id}/modbus-current-caps")
async def patch_inverter_modbus_current_caps(
site_id: int,
inverter_id: int,
body: InverterModbusCurrentCapsBody,
pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> dict[str, Any]:
"""
Nastavení `deye_register_max_charge_a` / `deye_register_max_discharge_a` na `ems.asset_inverter`.
"""
updates = body.model_dump(exclude_unset=True)
if not updates:
raise HTTPException(
status_code=400,
detail="Send at least one of: deye_register_max_charge_a, deye_register_max_discharge_a",
)
patch: dict[str, Any] = {}
if "deye_register_max_charge_a" in updates:
patch["deye_register_max_charge_a"] = updates["deye_register_max_charge_a"]
if "deye_register_max_discharge_a" in updates:
patch["deye_register_max_discharge_a"] = updates["deye_register_max_discharge_a"]
async with pool.acquire() as conn:
raw = await fetch_json(
conn,
"select ems.fn_inverter_modbus_caps_patch($1::int, $2::int, $3::jsonb)",
site_id,
inverter_id,
json.dumps(patch),
)
if not isinstance(raw, dict):
raw = json.loads(raw)
if not raw.get("ok"):
if raw.get("error") == "not_found":
raise HTTPException(status_code=404, detail="Inverter not found for this site")
raise HTTPException(status_code=400, detail=raw.get("error", "patch_failed"))
return {
"inverter_id": int(raw["inverter_id"]),
"code": raw["code"],
"deye_register_max_charge_a": raw.get("deye_register_max_charge_a"),
"deye_register_max_discharge_a": raw.get("deye_register_max_discharge_a"),
}