"""GET /sites/{site_id}/status/full – monitoring snapshot + alert pravidla.""" from __future__ import annotations from datetime import date, datetime, timedelta, timezone from typing import Annotated, Any, Literal from zoneinfo import ZoneInfo import asyncpg from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel, Field from app.db_json import record_to_dict from app.deps import get_pg_pool from app.notifications_logic import ( EvSessionRow, NegWindowRow, PriceSlot, build_smart_notifications, ) router = APIRouter(prefix="/sites/{site_id}", tags=["sites"]) class SiteNotificationItem(BaseModel): id: str level: Literal["success", "info", "warning", "error"] title: str body: str eta_minutes: int | None = None action: Literal["connect_ev", "replan", "import_prices", "switch_auto"] | None = None class SiteNotificationsResponse(BaseModel): notifications: list[SiteNotificationItem] = Field(default_factory=list) INV_STALE_SEC = 300 HEARTBEAT_STALE_SEC = 300 EXPECTED_TOMORROW_PRICE_SLOTS = 90 def _iso_utc(dt: datetime | None) -> str | None: if dt is None: return None if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc).isoformat() def _age_seconds(at: datetime | None) -> int | None: if at is None: return None if at.tzinfo is None: at = at.replace(tzinfo=timezone.utc) return max(0, int((datetime.now(timezone.utc) - at).total_seconds())) def _next_plan_interval( intervals: list[dict[str, Any]], now_utc: datetime ) -> tuple[str | None, int | None]: """Nejbližší 15min slot od aktuálního času včetně probíhajícího.""" slot_ms = 15 * 60 * 1000 boundary_ms = (int(now_utc.timestamp() * 1000) // slot_ms) * slot_ms boundary = datetime.fromtimestamp(boundary_ms / 1000, tz=timezone.utc) for row in sorted(intervals, key=lambda r: r["interval_start"]): istart = row["interval_start"] if isinstance(istart, str): istart = datetime.fromisoformat(istart.replace("Z", "+00:00")) if istart.tzinfo is None: istart = istart.replace(tzinfo=timezone.utc) if istart >= boundary - timedelta(milliseconds=1): bat = row.get("battery_setpoint_w") bi = int(bat) if bat is not None else None return _iso_utc(istart), bi return None, None @router.get("/status/full") async def get_site_status_full( site_id: int, pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)], ) -> dict[str, Any]: async with pool.acquire() as conn: site = await conn.fetchrow( """ SELECT id, code, name, timezone FROM ems.site WHERE id = $1 """, site_id, ) if site is None: raise HTTPException(status_code=404, detail="Site not found") tz = site["timezone"] or "Europe/Prague" mode_row = await conn.fetchrow( """ SELECT m.mode_code, d.name AS mode_name, m.activated_at, m.activated_by FROM ems.site_operating_mode m JOIN ems.operating_mode_def d ON d.code = m.mode_code WHERE m.site_id = $1 """, site_id, ) hb_row = await conn.fetchrow( """ SELECT last_seen, status FROM ems.site_heartbeat WHERE site_id = $1 """, site_id, ) inv_row = await conn.fetchrow( """ SELECT pv_power_w, battery_soc_percent, grid_power_w, measured_at FROM ems.vw_latest_inverter WHERE site_id = $1 ORDER BY measured_at DESC NULLS LAST LIMIT 1 """, site_id, ) ev_rows = await conn.fetch( """ SELECT DISTINCT ON (charger_id) charger_code AS code, status, power_w, measured_at FROM ems.vw_latest_ev_charger WHERE site_id = $1 ORDER BY charger_id, measured_at DESC NULLS LAST """, site_id, ) hp_row = await conn.fetchrow( """ SELECT power_w, tuv_tank_temp_c, measured_at FROM ems.vw_latest_heat_pump WHERE site_id = $1 ORDER BY measured_at DESC NULLS LAST LIMIT 1 """, site_id, ) reserve_row = await conn.fetchrow( """ SELECT MIN(reserve_soc_percent)::float AS reserve_soc, MIN(min_soc_percent)::float AS min_soc FROM ems.asset_battery WHERE site_id = $1 """, site_id, ) run_row = await conn.fetchrow( """ SELECT id, created_at FROM ems.planning_run WHERE site_id = $1 AND status = 'active' ORDER BY created_at DESC LIMIT 1 """, site_id, ) intervals: list[dict[str, Any]] = [] if run_row: int_rows = await conn.fetch( """ SELECT interval_start, battery_setpoint_w, load_baseline_w, pv_a_forecast_raw_w, pv_b_forecast_raw_w, pv_a_forecast_solver_w, pv_b_forecast_solver_w FROM ems.planning_interval WHERE run_id = $1 ORDER BY interval_start """, run_row["id"], ) intervals = [record_to_dict(r) for r in int_rows] tomorrow_slots = await conn.fetchval( """ SELECT COUNT(*)::int FROM ems.vw_site_effective_price v WHERE v.site_id = $1 AND (v.interval_start AT TIME ZONE $2)::date = ((CURRENT_TIMESTAMP AT TIME ZONE $2)::date + INTERVAL '1 day')::date """, site_id, tz, ) tomorrow_slots = int(tomorrow_slots or 0) now_utc = datetime.now(timezone.utc) hb_last = hb_row["last_seen"] if hb_row else None hb_age = _age_seconds(hb_last) inv_measured = inv_row["measured_at"] if inv_row else None inv_age = _age_seconds(inv_measured) next_start, next_bat = _next_plan_interval(intervals, now_utc) ev_list: list[dict[str, Any]] = [] for r in ev_rows: ev_list.append( { "code": r["code"], "status": r["status"], "power_w": int(r["power_w"]) if r["power_w"] is not None else None, } ) telemetry: dict[str, Any] = { "inverter": { "pv_power_w": int(inv_row["pv_power_w"]) if inv_row and inv_row["pv_power_w"] is not None else None, "battery_soc_pct": float(inv_row["battery_soc_percent"]) if inv_row and inv_row["battery_soc_percent"] is not None else None, "grid_power_w": int(inv_row["grid_power_w"]) if inv_row and inv_row["grid_power_w"] is not None else None, "measured_at": _iso_utc(inv_measured), "age_seconds": inv_age, }, "ev_chargers": ev_list, "heat_pump": { "power_w": int(hp_row["power_w"]) if hp_row and hp_row["power_w"] is not None else None, "tank_temp_c": float(hp_row["tuv_tank_temp_c"]) if hp_row and hp_row["tuv_tank_temp_c"] is not None else None, "measured_at": _iso_utc(hp_row["measured_at"]) if hp_row else None, }, } has_plan = run_row is not None planning = { "has_active_plan": has_plan, "plan_created_at": _iso_utc(run_row["created_at"]) if run_row else None, "next_interval_start": next_start, "next_battery_setpoint_w": next_bat, } mode_code = (mode_row["mode_code"] if mode_row else None) or "" reserve_soc = float(reserve_row["reserve_soc"]) if reserve_row and reserve_row["reserve_soc"] is not None else None min_soc = float(reserve_row["min_soc"]) if reserve_row and reserve_row["min_soc"] is not None else None soc = float(inv_row["battery_soc_percent"]) if inv_row and inv_row["battery_soc_percent"] is not None else None alerts: list[dict[str, str]] = [] def add_alert(level: Literal["warn", "error"], message: str) -> None: alerts.append({"level": level, "message": message}) if inv_age is None or inv_age > INV_STALE_SEC: add_alert("error", "Telemetrie střídače nedostupná") if not has_plan: add_alert("warn", "Není aktivní plán – EMS neoptimalizuje") # OTE D+1 typicky až po ~14:30 Europe/Prague – před tím nevarovat now_prague = datetime.now(ZoneInfo("Europe/Prague")) prices_expected = (now_prague.hour, now_prague.minute) >= (14, 30) if tomorrow_slots < EXPECTED_TOMORROW_PRICE_SLOTS and prices_expected: add_alert("warn", "Chybí spotové ceny pro zítřek") if mode_code.upper() == "MANUAL": add_alert("warn", "Systém v manuálním režimu") if min_soc is not None and soc is not None and soc < min_soc: add_alert("error", "SoC baterie pod minimálním limitem") elif reserve_soc is not None and soc is not None and soc < reserve_soc: add_alert("warn", "SoC pod ekonomickou rezervou (arbitrážní podlaha)") if hb_age is None or hb_age > HEARTBEAT_STALE_SEC: add_alert("error", "EMS heartbeat výpadek") alerts.sort(key=lambda a: (0 if a["level"] == "error" else 1, a["message"])) return { "site": {"id": site["id"], "code": site["code"], "name": site["name"]}, "operating_mode": { "mode_code": mode_row["mode_code"] if mode_row else None, "mode_name": mode_row["mode_name"] if mode_row else None, "activated_at": _iso_utc(mode_row["activated_at"]) if mode_row else None, "activated_by": mode_row["activated_by"] if mode_row else None, }, "heartbeat": { "last_seen": _iso_utc(hb_last), "age_seconds": hb_age, "status": hb_row["status"] if hb_row else None, }, "telemetry": telemetry, "planning": planning, "alerts": alerts, } _NOTIF_LEVEL_PRIORITY = {"error": 0, "success": 1, "warning": 2, "info": 3} def _infrastructure_notification_items( *, has_plan: bool, tomorrow_slots: int, mode_code: str, min_soc: float | None, reserve_soc: float | None, soc: float | None, inv_age: int | None, hb_age: int | None, ) -> list[SiteNotificationItem]: """Kritické / provozní notifikace (telemetrie, plán, ceny, režim, heartbeat).""" items: list[SiteNotificationItem] = [] def push( nid: str, level: Literal["success", "info", "warning", "error"], title: str, body: str, *, eta_minutes: int | None = None, action: Literal["connect_ev", "replan", "import_prices", "switch_auto"] | None = None, ) -> None: items.append( SiteNotificationItem( id=nid, level=level, title=title, body=body, eta_minutes=eta_minutes, action=action, ) ) if inv_age is None or inv_age > INV_STALE_SEC: push("telemetry_inverter", "error", "Telemetrie střídače", "Data ze střídače nejsou aktuální.") if not has_plan: push( "no_active_plan", "warning", "Chybí aktivní plán", "EMS zatím neoptimalizuje provoz – spusťte plánování.", action="replan", ) now_prague = datetime.now(ZoneInfo("Europe/Prague")) prices_expected = (now_prague.hour, now_prague.minute) >= (14, 30) if tomorrow_slots < EXPECTED_TOMORROW_PRICE_SLOTS and prices_expected: push( "prices_tomorrow", "warning", "Ceny na zítřek", "Nejsou kompletní spotové ceny OTE pro následující den.", action="import_prices", ) if mode_code.upper() == "MANUAL": push("mode_manual", "info", "Manuální režim", "Automatická optimalizace je vypnutá.") if min_soc is not None and soc is not None and soc < min_soc: push( "soc_min", "error", "SoC pod minimem", "SoC je pod absolutním minimem z konfigurace baterie.", ) elif reserve_soc is not None and soc is not None and soc < reserve_soc: push( "soc_reserve", "warning", "SoC pod ekonomickou rezervou", "SoC je pod arbitrážní podlahou – plánovač může v tomto pásmu omezovat export.", ) if hb_age is None or hb_age > HEARTBEAT_STALE_SEC: push("heartbeat", "error", "EMS heartbeat", "Služba EMS nehlásí pravidelný heartbeat.") return items def _float_or_none(v: Any) -> float | None: if v is None: return None return float(v) @router.get("/notifications", response_model=SiteNotificationsResponse) async def get_site_notifications( site_id: int, pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)], ) -> SiteNotificationsResponse: async with pool.acquire() as conn: site = await conn.fetchrow( "SELECT id, timezone FROM ems.site WHERE id = $1", site_id, ) if site is None: raise HTTPException(status_code=404, detail="Site not found") tz = site["timezone"] or "Europe/Prague" mode_row = await conn.fetchrow( """ SELECT m.mode_code FROM ems.site_operating_mode m WHERE m.site_id = $1 """, site_id, ) run_row = await conn.fetchrow( """ SELECT id FROM ems.planning_run WHERE site_id = $1 AND status = 'active' ORDER BY created_at DESC LIMIT 1 """, site_id, ) reserve_row = await conn.fetchrow( """ SELECT MIN(reserve_soc_percent)::float AS reserve_soc, MIN(min_soc_percent)::float AS min_soc FROM ems.asset_battery WHERE site_id = $1 """, site_id, ) inv_row = await conn.fetchrow( """ SELECT battery_soc_percent, measured_at FROM ems.vw_latest_inverter WHERE site_id = $1 ORDER BY measured_at DESC NULLS LAST LIMIT 1 """, site_id, ) hb_row = await conn.fetchrow( "SELECT last_seen FROM ems.site_heartbeat WHERE site_id = $1", site_id, ) tomorrow_slots = await conn.fetchval( """ SELECT COUNT(*)::int FROM ems.vw_site_effective_price v WHERE v.site_id = $1 AND (v.interval_start AT TIME ZONE $2)::date = ((CURRENT_TIMESTAMP AT TIME ZONE $2)::date + INTERVAL '1 day')::date """, site_id, tz, ) price_rows = await conn.fetch( """ SELECT interval_start, effective_buy_price_czk_kwh, effective_sell_price_czk_kwh FROM ems.vw_site_effective_price WHERE site_id = $1 AND interval_start >= now() AND interval_start < now() + INTERVAL '48 hours' ORDER BY interval_start """, site_id, ) avg_row = await conn.fetchrow( """ SELECT AVG(effective_buy_price_czk_kwh)::float AS avg_buy FROM ems.vw_site_effective_price WHERE site_id = $1 AND interval_start::date IN (CURRENT_DATE, CURRENT_DATE + INTERVAL '1 day') """, site_id, ) bat_row = await conn.fetchrow( """ SELECT COALESCE(SUM(ab.usable_capacity_wh), 0)::float AS usable_wh FROM ems.asset_battery ab JOIN ems.asset_inverter ai ON ai.id = ab.inverter_id WHERE ai.site_id = $1 """, site_id, ) ev_rows = await conn.fetch( """ SELECT DISTINCT ON (es.id) es.id, es.charger_id, es.energy_delivered_wh, es.target_soc_pct, es.session_start, es.soc_at_connect_pct, COALESCE(av_id.battery_capacity_kwh, av_def.battery_capacity_kwh) AS battery_capacity_kwh, COALESCE(av_id.make, av_def.make) AS make, COALESCE(av_id.model, av_def.model) AS model, COALESCE(av_id.default_target_soc_pct, av_def.default_target_soc_pct) AS default_target_soc_pct, ac.code AS charger_code FROM ems.ev_session es JOIN ems.asset_ev_charger ac ON ac.id = es.charger_id LEFT JOIN ems.asset_vehicle av_id ON av_id.id = es.vehicle_id LEFT JOIN ems.asset_vehicle av_def ON av_def.default_charger_id = ac.id AND es.vehicle_id IS NULL WHERE es.site_id = $1 AND es.session_end IS NULL ORDER BY es.id, av_def.id NULLS LAST """, site_id, ) neg_rows = await conn.fetch( """ SELECT predicted_date, window_start_hour, window_end_hour, probability_pct FROM ems.predicted_negative_price_window WHERE site_id = $1 AND predicted_date BETWEEN CURRENT_DATE AND CURRENT_DATE + 2 AND probability_pct >= 50 ORDER BY predicted_date, window_start_hour """, site_id, ) has_plan = run_row is not None mode_code = (mode_row["mode_code"] if mode_row else None) or "" reserve_soc = ( float(reserve_row["reserve_soc"]) if reserve_row and reserve_row["reserve_soc"] is not None else None ) min_soc = ( float(reserve_row["min_soc"]) if reserve_row and reserve_row["min_soc"] is not None else None ) soc = ( float(inv_row["battery_soc_percent"]) if inv_row and inv_row["battery_soc_percent"] is not None else None ) inv_age = _age_seconds(inv_row["measured_at"] if inv_row else None) hb_age = _age_seconds(hb_row["last_seen"] if hb_row else None) infra = _infrastructure_notification_items( has_plan=has_plan, tomorrow_slots=int(tomorrow_slots or 0), mode_code=mode_code, min_soc=min_soc, reserve_soc=reserve_soc, soc=soc, inv_age=inv_age, hb_age=hb_age, ) prices: list[PriceSlot] = [] for r in price_rows: buy = _float_or_none(r["effective_buy_price_czk_kwh"]) if buy is None: continue sell_v = _float_or_none(r["effective_sell_price_czk_kwh"]) istart = r["interval_start"] prices.append( PriceSlot( interval_start=istart, buy=buy, sell=sell_v if sell_v is not None else buy, ) ) avg_buy = _float_or_none(avg_row["avg_buy"]) if avg_row else None usable_wh = _float_or_none(bat_row["usable_wh"]) if bat_row else None battery_kwh = (usable_wh / 1000.0) if usable_wh is not None else None ev_sessions: list[EvSessionRow] = [] for er in ev_rows: ev_sessions.append( EvSessionRow( id=int(er["id"]), charger_id=int(er["charger_id"]), energy_delivered_wh=float(er["energy_delivered_wh"] or 0), target_soc_pct=_float_or_none(er["target_soc_pct"]), session_start=er["session_start"], battery_capacity_kwh=_float_or_none(er["battery_capacity_kwh"]), make=er["make"], model=er["model"], default_target_soc_pct=_float_or_none(er["default_target_soc_pct"]), charger_code=str(er["charger_code"] or ""), soc_at_connect_pct=_float_or_none(er["soc_at_connect_pct"]), ) ) neg_windows: list[NegWindowRow] = [] for nr in neg_rows: dr = nr["predicted_date"] if isinstance(dr, datetime): d_conv = dr.date() elif isinstance(dr, date): d_conv = dr else: d_conv = date.today() neg_windows.append( NegWindowRow( predicted_date=d_conv, window_start_hour=int(nr["window_start_hour"]), window_end_hour=int(nr["window_end_hour"]), probability_pct=int(nr["probability_pct"]), ) ) sell_now = prices[0].sell if prices else None smart_raw = build_smart_notifications( prices=prices, avg_buy=avg_buy, soc_pct=soc, battery_kwh=battery_kwh, ev_sessions=ev_sessions, neg_windows=neg_windows, mode=mode_code, sell_price_now=sell_now, ) smart_items = [ SiteNotificationItem( id=d["id"], level=d["level"], title=d["title"], body=d["body"], eta_minutes=d.get("eta_minutes"), action=d.get("action"), ) for d in smart_raw ] merged = infra + smart_items merged.sort(key=lambda x: _NOTIF_LEVEL_PRIORITY.get(x.level, 9)) return SiteNotificationsResponse(notifications=merged[:5])