"""GET /sites/{site_id}/configuration – read-only souhrn konfigurace lokality.""" from __future__ import annotations from datetime import datetime, timezone from typing import Annotated, Any import asyncpg from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel, Field from app.db_json import record_to_dict from app.deps import get_pg_pool router = APIRouter(prefix="/sites/{site_id}", tags=["sites"]) class InverterModbusCurrentCapsBody(BaseModel): """Tvrdý strop proudu pro zápis Deye reg 108/109 (A); NULL ve JSONu = smaž strop v DB.""" deye_register_max_charge_a: int | None = Field( default=None, ge=0, le=640, description="None při vynechání klíče = nezměnit; explicitní null = smazat strop" ) deye_register_max_discharge_a: int | None = Field( default=None, ge=0, le=640, description="Jako u nabíjení" ) _DEYE_KEYS = frozenset( { "deye_last_system_time_sync_at", "deye_last_system_time_sync_minute", "deye_last_tou_inactive_write_prague_date", "deye_tou_inactive_signature", } ) def _mask_secret_reference(raw: str | None) -> str | None: if raw is None: return None s = str(raw).strip() if not s: return None if len(s) <= 4: return "nastaveno" return f"…{s[-2:]}" def _iso_utc(dt: datetime | None) -> str | None: if dt is None: return None if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc).isoformat() @router.get("/configuration") async def get_site_configuration( site_id: int, pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)], ) -> dict[str, Any]: async with pool.acquire() as conn: site_row = await conn.fetchrow( """ SELECT id, code, name, timezone, latitude, longitude, active, notes, created_at FROM ems.site WHERE id = $1 """, site_id, ) if site_row is None: raise HTTPException(status_code=404, detail="Site not found") grid_row = await conn.fetchrow( "SELECT * FROM ems.site_grid_connection WHERE site_id = $1", site_id, ) market_row = await conn.fetchrow( """ SELECT * FROM ems.site_market_config WHERE site_id = $1 AND valid_from <= now() AND (valid_to IS NULL OR valid_to > now()) ORDER BY valid_from DESC LIMIT 1 """, site_id, ) endpoint_rows = await conn.fetch( """ SELECT id, site_id, endpoint_type, host, port, protocol, unit_id, auth_reference, enabled, notes FROM ems.site_endpoint WHERE site_id = $1 ORDER BY id """, site_id, ) endpoints: list[dict[str, Any]] = [] for er in endpoint_rows: d = record_to_dict(er) d["auth_reference"] = _mask_secret_reference(er["auth_reference"]) endpoints.append(d) inv_rows = await conn.fetch( """ SELECT ai.*, (SELECT ep.host || CASE WHEN ep.port IS NOT NULL THEN ':' || ep.port::text ELSE '' END FROM ems.site_endpoint ep WHERE ep.id = ai.endpoint_id) AS endpoint_connection FROM ems.asset_inverter ai WHERE ai.site_id = $1 ORDER BY ai.id """, site_id, ) inverters: list[dict[str, Any]] = [] for ir in inv_rows: full = record_to_dict(ir) ep_label = full.pop("endpoint_connection", None) core = {k: v for k, v in full.items() if k not in _DEYE_KEYS} deye_meta = {k: full[k] for k in _DEYE_KEYS if full.get(k) is not None} core["endpoint_connection"] = ep_label core["deye_meta"] = deye_meta if deye_meta else None inverters.append(core) bat_rows = await conn.fetch( "SELECT * FROM ems.asset_battery WHERE site_id = $1 ORDER BY id", site_id, ) pv_rows = await conn.fetch( "SELECT * FROM ems.asset_pv_array WHERE site_id = $1 ORDER BY id", site_id, ) ev_rows = await conn.fetch( """ SELECT ec.*, se.host || CASE WHEN se.port IS NOT NULL THEN ':' || se.port::text ELSE '' END AS endpoint_connection FROM ems.asset_ev_charger ec LEFT JOIN ems.site_endpoint se ON se.id = ec.endpoint_id WHERE ec.site_id = $1 ORDER BY ec.id """, site_id, ) ev_chargers = [record_to_dict(r) for r in ev_rows] veh_rows = await conn.fetch( """ SELECT id, site_id, code, name, make, model, battery_capacity_kwh, max_charge_power_w, default_charger_id, api_type, api_reference, default_target_soc_pct, default_deadline_hour, active FROM ems.asset_vehicle WHERE site_id = $1 ORDER BY code """, site_id, ) vehicles: list[dict[str, Any]] = [] for vr in veh_rows: d = record_to_dict(vr) d["api_reference"] = _mask_secret_reference(vr["api_reference"]) vehicles.append(d) hp_rows = await conn.fetch( """ SELECT hp.*, se.host || CASE WHEN se.port IS NOT NULL THEN ':' || se.port::text ELSE '' END AS endpoint_connection FROM ems.asset_heat_pump hp LEFT JOIN ems.site_endpoint se ON se.id = hp.endpoint_id WHERE hp.site_id = $1 ORDER BY hp.id """, site_id, ) heat_pumps = [record_to_dict(r) for r in hp_rows] mode_row = await conn.fetchrow( """ SELECT m.mode_code, m.activated_at, m.activated_by, m.valid_until, m.previous_mode, m.notes, d.name AS mode_name, d.description AS mode_description, d.loxone_mode_value, d.ev_enabled, d.heat_pump_enabled, d.battery_mode, d.grid_mode, d.is_autonomous FROM ems.site_operating_mode m JOIN ems.operating_mode_def d ON d.code = m.mode_code WHERE m.site_id = $1 """, site_id, ) override_rows = await conn.fetch( """ SELECT id, override_type, value_json, valid_from, valid_to, reason, created_by, created_at FROM ems.site_override WHERE site_id = $1 AND valid_from <= now() AND (valid_to IS NULL OR valid_to > now()) ORDER BY valid_from DESC LIMIT 50 """, site_id, ) hb_row = await conn.fetchrow( "SELECT last_seen, status FROM ems.site_heartbeat WHERE site_id = $1", site_id, ) run_row = await conn.fetchrow( """ SELECT id, created_at FROM ems.planning_run WHERE site_id = $1 AND status = 'active' ORDER BY created_at DESC LIMIT 1 """, site_id, ) site = record_to_dict(site_row) lat = site_row["latitude"] lon = site_row["longitude"] site["latitude"] = float(lat) if lat is not None else None site["longitude"] = float(lon) if lon is not None else None operating_mode = record_to_dict(mode_row) if mode_row else None return { "site": site, "grid_connection": record_to_dict(grid_row) if grid_row else None, "market_config": record_to_dict(market_row) if market_row else None, "market_config_note": ( "Zelený bonus za výrobu je u FVE polí (asset_pv_array), ne v obchodní konfiguraci." ), "endpoints": endpoints, "inverters": inverters, "batteries": [record_to_dict(r) for r in bat_rows], "pv_arrays": [record_to_dict(r) for r in pv_rows], "ev_chargers": ev_chargers, "vehicles": vehicles, "heat_pumps": heat_pumps, "operating_mode": operating_mode, "active_overrides": [record_to_dict(r) for r in override_rows], "operational": { "heartbeat_last_seen": _iso_utc(hb_row["last_seen"]) if hb_row else None, "heartbeat_status": hb_row["status"] if hb_row else None, "has_active_plan": run_row is not None, "active_plan_created_at": _iso_utc(run_row["created_at"]) if run_row else None, }, } @router.patch("/inverters/{inverter_id}/modbus-current-caps") async def patch_inverter_modbus_current_caps( site_id: int, inverter_id: int, body: InverterModbusCurrentCapsBody, pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)], ) -> dict[str, Any]: """ Nastavení `deye_register_max_charge_a` / `deye_register_max_discharge_a` na `ems.asset_inverter`. Hodnoty se uplatní v dotazu `_load_inverter_config` jako `COALESCE(strop_A, FLOOR(…z_kW))` pro reg 108/109. """ updates = body.model_dump(exclude_unset=True) if not updates: raise HTTPException( status_code=400, detail="Send at least one of: deye_register_max_charge_a, deye_register_max_discharge_a", ) async with pool.acquire() as conn: owner = await conn.fetchval( """ SELECT id FROM ems.asset_inverter WHERE id = $1 AND site_id = $2 """, inverter_id, site_id, ) if owner is None: raise HTTPException(status_code=404, detail="Inverter not found for this site") sets: list[str] = [] args: list[Any] = [] n = 1 if "deye_register_max_charge_a" in updates: sets.append(f"deye_register_max_charge_a = ${n}") args.append(updates["deye_register_max_charge_a"]) n += 1 if "deye_register_max_discharge_a" in updates: sets.append(f"deye_register_max_discharge_a = ${n}") args.append(updates["deye_register_max_discharge_a"]) n += 1 args.extend([inverter_id, site_id]) await conn.execute( f""" UPDATE ems.asset_inverter SET {", ".join(sets)} WHERE id = ${n} AND site_id = ${n + 1} """, *args, ) row = await conn.fetchrow( """ SELECT id, code, deye_register_max_charge_a, deye_register_max_discharge_a FROM ems.asset_inverter WHERE id = $1 AND site_id = $2 """, inverter_id, site_id, ) assert row is not None return { "inverter_id": int(row["id"]), "code": row["code"], "deye_register_max_charge_a": row["deye_register_max_charge_a"], "deye_register_max_discharge_a": row["deye_register_max_discharge_a"], }