"""EMS FastAPI – routery, health, režim, WebSockety.""" from __future__ import annotations import logging import os from datetime import datetime, timezone from typing import Annotated, Any, Literal import asyncpg import httpx from app.db_json import fetch_json from app.deps import get_pg_pool from app.lifespan import lifespan, scheduler from app.routers.economics import router as economics_router from app.routers.energy_flows import router as energy_flows_router from app.routers.ev import router as ev_router from app.routers.full_status import router as full_status_router from app.routers.me import router as me_router from app.routers.plan import router as plan_router from app.routers.site_configuration import router as site_configuration_router from app.routers.sites import router as sites_router from app.ws_manager import manager from fastapi import Depends, FastAPI, HTTPException, Request, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field from services.notification_service import run_fn_set_mode_with_discord logger = logging.getLogger(__name__) app = FastAPI(title="EMS Platform", lifespan=lifespan) app.include_router(plan_router, prefix="/api/v1") app.include_router(ev_router, prefix="/api/v1") app.include_router(full_status_router, prefix="/api/v1") app.include_router(site_configuration_router, prefix="/api/v1") app.include_router(economics_router, prefix="/api/v1") app.include_router(energy_flows_router, prefix="/api/v1") app.include_router(me_router) app.include_router(sites_router) app.add_middleware( CORSMiddleware, allow_origins=os.getenv( "CORS_ORIGINS", "http://localhost:5173,http://127.0.0.1:5173" ).split(","), allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.websocket("/ws/telemetry") async def ws_telemetry(websocket: WebSocket) -> None: await manager.connect_telemetry(websocket) try: while True: await websocket.receive_text() except WebSocketDisconnect: manager.disconnect(websocket) @app.websocket("/ws/logs") async def ws_logs(websocket: WebSocket) -> None: await manager.connect_logs(websocket) try: while True: await websocket.receive_text() except WebSocketDisconnect: manager.disconnect(websocket) def _iso_utc_from_json_value(v: Any) -> str: if v is None: return datetime.now(timezone.utc).isoformat() if isinstance(v, datetime): if v.tzinfo is None: v = v.replace(tzinfo=timezone.utc) return v.isoformat() if isinstance(v, str): s = v.replace("Z", "+00:00") try: return datetime.fromisoformat(s).isoformat() except ValueError: return datetime.now(timezone.utc).isoformat() return datetime.now(timezone.utc).isoformat() async def _health_payload(db: asyncpg.Pool) -> dict[str, Any]: try: async with db.acquire() as conn: data = await fetch_json(conn, "select ems.fn_health_summary()") if not isinstance(data, dict): data = {} return { "status": "ok", "db": str(data.get("db") or "ok"), "timestamp": _iso_utc_from_json_value(data.get("timestamp")), "active_plan_slots": int(data.get("active_plan_slots") or 0), } except Exception as e: logger.warning("health DB check failed: %s", e) return { "status": "degraded", "db": "error", "timestamp": datetime.now(timezone.utc).isoformat(), "active_plan_slots": 0, } @app.get("/health") @app.get("/api/v1/health") async def health( db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], ) -> dict[str, Any]: return await _health_payload(db) @app.get("/api/v1/health/detailed") async def health_detailed( request: Request, db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], ) -> dict[str, Any]: db_status: Literal["ok", "error"] = "error" last_telemetry_age_sec = -1 last_plan_age_sec = -1 try: async with db.acquire() as conn: metrics = await fetch_json(conn, "select ems.fn_health_detailed_db()") if not isinstance(metrics, dict): metrics = {} db_status = "ok" last_telemetry_age_sec = int(metrics.get("last_telemetry_age_sec", -1)) last_plan_age_sec = int(metrics.get("last_plan_age_sec", -1)) except Exception as e: logger.warning("health detailed DB check failed: %s", e) db_status = "error" sched_state: Literal["running", "stopped"] = ( "running" if scheduler.running else "stopped" ) t_task = getattr(request.app.state, "telemetry_task", None) tel_loop: Literal["running", "stopped"] = ( "running" if t_task is not None and not t_task.done() else "stopped" ) active_jobs: list[dict[str, Any]] = [] for job in scheduler.get_jobs(): nrt = job.next_run_time active_jobs.append( { "id": str(job.id), "next_run_time": nrt.isoformat() if nrt is not None else None, } ) return { "db": db_status, "scheduler": sched_state, "telemetry_loop": tel_loop, "last_telemetry_age_sec": last_telemetry_age_sec, "last_plan_age_sec": last_plan_age_sec, "active_jobs": active_jobs, } class SetSiteModeBody(BaseModel): mode: str = Field(..., min_length=1) notes: str | None = None valid_until: datetime | None = None class SetSiteModeResponse(BaseModel): success: bool mode: str activated_at: datetime def _parse_activated_at(raw: Any) -> datetime: if isinstance(raw, datetime): at = raw if at.tzinfo is None: at = at.replace(tzinfo=timezone.utc) return at if isinstance(raw, str): s = raw.replace("Z", "+00:00") return datetime.fromisoformat(s) raise HTTPException(status_code=500, detail="Invalid activated_at from DB") @app.post("/api/v1/sites/{site_id}/mode", response_model=SetSiteModeResponse) async def set_site_mode( site_id: int, body: SetSiteModeBody, db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], ) -> SetSiteModeResponse: mode = body.mode.strip().upper() allowed = {"AUTO", "SELF_SUSTAIN", "CHARGE_CHEAP", "PRESERVE", "MANUAL"} if mode not in allowed: raise HTTPException(status_code=400, detail=f"Unsupported mode: {body.mode}") async with db.acquire() as conn: site_ok = await conn.fetchval( "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id ) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") try: await run_fn_set_mode_with_discord( conn, site_id, mode, "user:api", body.valid_until, body.notes, ) except asyncpg.PostgresError as e: logger.warning("fn_set_mode failed: %s", e) raise HTTPException(status_code=400, detail=str(e)) from e bundle = await fetch_json( conn, "select ems.fn_site_mode_loxone_bundle($1::int)", site_id ) if not isinstance(bundle, dict) or bundle.get("mode_code") is None: raise HTTPException(status_code=500, detail="Mode row missing after set") activated_at = _parse_activated_at(bundle.get("activated_at")) mode_code = str(bundle["mode_code"]) loxone_val = bundle.get("loxone_mode_value") host = bundle.get("loxone_host") if host and loxone_val is not None: proto = (bundle.get("loxone_protocol") or "http").lower() if proto not in ("http", "https"): proto = "http" port_raw = bundle.get("loxone_port") port = int(port_raw or (443 if proto == "https" else 80)) base = f"{proto}://{host}:{port}" url = f"{base}/dev/sps/io/EMS_Mode/{loxone_val}" user = os.getenv("LOXONE_USER") or "" password = os.getenv("LOXONE_PASSWORD") or "" auth = (user, password) if user else None try: async with httpx.AsyncClient(timeout=10.0) as client: r = await client.get(url, auth=auth) r.raise_for_status() except Exception as e: logger.warning("Loxone EMS_Mode notify failed for site %s: %s", site_id, e) return SetSiteModeResponse( success=True, mode=mode_code, activated_at=activated_at )