254 lines
8.4 KiB
Python
254 lines
8.4 KiB
Python
"""EMS FastAPI – routers, health checks, site mode, WebSockets."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import os
|
||
from datetime import datetime, timezone
|
||
from typing import Annotated, Any, Literal
|
||
|
||
import asyncpg
|
||
import httpx
|
||
from app.db_json import fetch_json
|
||
from app.deps import get_pg_pool
|
||
from app.lifespan import lifespan, scheduler
|
||
from app.routers.economics import router as economics_router
|
||
from app.routers.energy_flows import router as energy_flows_router
|
||
from app.routers.ev import router as ev_router
|
||
from app.routers.full_status import router as full_status_router
|
||
from app.routers.me import router as me_router
|
||
from app.routers.plan import router as plan_router
|
||
from app.routers.site_configuration import router as site_configuration_router
|
||
from app.routers.sites import router as sites_router
|
||
from app.ws_manager import manager
|
||
from fastapi import Depends, FastAPI, HTTPException, Request, WebSocket, WebSocketDisconnect
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from pydantic import BaseModel, Field
|
||
from services.notification_service import run_fn_set_mode_with_discord
|
||
|
||
logger = logging.getLogger(__name__)


def _cors_origins_from_env() -> list[str]:
    """Return the allowed CORS origins configured via ``CORS_ORIGINS``.

    The variable is a comma-separated list. Whitespace around each entry is
    trimmed and empty entries are dropped, so a value such as
    ``"http://a, http://b,"`` yields two clean origins instead of an origin
    with a leading space (which would never match a browser's ``Origin``
    header) and an empty string.
    """
    raw = os.getenv("CORS_ORIGINS", "http://localhost:5173,http://127.0.0.1:5173")
    return [origin.strip() for origin in raw.split(",") if origin.strip()]


app = FastAPI(title="EMS Platform", lifespan=lifespan)

# Routers mounted under the versioned API prefix.
app.include_router(plan_router, prefix="/api/v1")
app.include_router(ev_router, prefix="/api/v1")
app.include_router(full_status_router, prefix="/api/v1")
app.include_router(site_configuration_router, prefix="/api/v1")
app.include_router(economics_router, prefix="/api/v1")
app.include_router(energy_flows_router, prefix="/api/v1")
# These two routers are included without an extra prefix here.
app.include_router(me_router)
app.include_router(sites_router)

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins_from_env(),
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
||
|
||
|
||
@app.websocket("/ws/telemetry")
async def ws_telemetry(websocket: WebSocket) -> None:
    """Keep a telemetry WebSocket registered with the manager until it ends.

    The socket is handed to ``manager.connect_telemetry`` and the handler
    then only drains incoming text frames; their content is ignored.
    """
    await manager.connect_telemetry(websocket)
    try:
        while True:
            # Inbound frames are discarded; awaiting them is how the handler
            # learns that the client disconnected.
            await websocket.receive_text()
    except WebSocketDisconnect:
        # Regular client disconnect - nothing to report.
        pass
    finally:
        # Unregister on every exit path (disconnect, unexpected error, task
        # cancellation) so the manager does not keep a dead socket around.
        manager.disconnect(websocket)
|
||
|
||
|
||
@app.websocket("/ws/logs")
async def ws_logs(websocket: WebSocket) -> None:
    """Keep a log-stream WebSocket registered with the manager until it ends.

    The socket is handed to ``manager.connect_logs`` and the handler then
    only drains incoming text frames; their content is ignored.
    """
    await manager.connect_logs(websocket)
    try:
        while True:
            # Inbound frames are discarded; awaiting them is how the handler
            # learns that the client disconnected.
            await websocket.receive_text()
    except WebSocketDisconnect:
        # Regular client disconnect - nothing to report.
        pass
    finally:
        # Unregister on every exit path (disconnect, unexpected error, task
        # cancellation) so the manager does not keep a dead socket around.
        manager.disconnect(websocket)
|
||
|
||
|
||
def _iso_utc_from_json_value(v: Any) -> str:
    """Return *v* as a timezone-aware ISO 8601 string.

    Args:
        v: A ``datetime``, an ISO 8601 string (``Z`` is accepted as the UTC
            designator), or any other value.

    Returns:
        The ISO 8601 form of *v*. Naive datetimes and naive strings are
        interpreted as UTC; aware values keep their own offset. ``None``,
        unparseable strings and unsupported types fall back to the current
        UTC time.
    """
    if isinstance(v, str):
        try:
            # Spell "Z" as an explicit offset so fromisoformat() accepts it
            # on interpreters that do not understand the "Z" designator.
            v = datetime.fromisoformat(v.replace("Z", "+00:00"))
        except ValueError:
            v = None  # handled by the "now" fallback below
    if isinstance(v, datetime):
        # Parsed strings and datetimes share one normalisation path so both
        # always carry an offset.
        if v.tzinfo is None:
            v = v.replace(tzinfo=timezone.utc)
        return v.isoformat()
    return datetime.now(timezone.utc).isoformat()
|
||
|
||
|
||
async def _health_payload(db: asyncpg.Pool) -> dict[str, Any]:
    """Build the basic health response from ``ems.fn_health_summary()``.

    Args:
        db: Connection pool used to run the summary query.

    Returns:
        A dict with ``status``, ``db``, ``timestamp`` and
        ``active_plan_slots``. Any failure while querying or shaping the
        result is logged and reported as a ``"degraded"`` payload instead of
        being raised.
    """
    try:
        async with db.acquire() as conn:
            summary = await fetch_json(conn, "select ems.fn_health_summary()")
        # Anything other than a JSON object is treated as "no data".
        summary = summary if isinstance(summary, dict) else {}
        payload: dict[str, Any] = {
            "status": "ok",
            "db": str(summary.get("db") or "ok"),
            "timestamp": _iso_utc_from_json_value(summary.get("timestamp")),
            "active_plan_slots": int(summary.get("active_plan_slots") or 0),
        }
    except Exception as exc:
        logger.warning("health DB check failed: %s", exc)
        payload = {
            "status": "degraded",
            "db": "error",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "active_plan_slots": 0,
        }
    return payload
|
||
|
||
|
||
@app.get("/health")
@app.get("/api/v1/health")
async def health(
    db: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> dict[str, Any]:
    """Serve the basic health payload on ``/health`` and ``/api/v1/health``."""
    payload = await _health_payload(db)
    return payload
|
||
|
||
|
||
def _age_sec_or_unknown(raw: Any) -> int:
    """Return *raw* as whole seconds, or ``-1`` when no value was reported."""
    return -1 if raw is None else int(raw)


@app.get("/api/v1/health/detailed")
async def health_detailed(
    request: Request,
    db: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> dict[str, Any]:
    """Report DB, scheduler and telemetry-loop state in one payload.

    Args:
        request: Used to read ``app.state.telemetry_task``.
        db: Connection pool used to run ``ems.fn_health_detailed_db()``.

    Returns:
        A dict with ``db`` (``"ok"``/``"error"``), ``scheduler`` and
        ``telemetry_loop`` (``"running"``/``"stopped"``),
        ``last_telemetry_age_sec`` and ``last_plan_age_sec`` (``-1`` when
        unknown) and ``active_jobs`` (job id plus next run time).
    """
    db_status: Literal["ok", "error"] = "error"
    last_telemetry_age_sec = -1
    last_plan_age_sec = -1
    try:
        async with db.acquire() as conn:
            metrics = await fetch_json(conn, "select ems.fn_health_detailed_db()")
        if not isinstance(metrics, dict):
            metrics = {}
        # A null age (key present, value None) means "no data yet", not a DB
        # failure, so it maps to -1 instead of tripping the except below.
        last_telemetry_age_sec = _age_sec_or_unknown(
            metrics.get("last_telemetry_age_sec")
        )
        last_plan_age_sec = _age_sec_or_unknown(metrics.get("last_plan_age_sec"))
        db_status = "ok"
    except Exception as e:
        logger.warning("health detailed DB check failed: %s", e)
        db_status = "error"

    sched_state: Literal["running", "stopped"] = (
        "running" if scheduler.running else "stopped"
    )
    # The telemetry loop counts as running only while its task exists and
    # has not finished.
    t_task = getattr(request.app.state, "telemetry_task", None)
    tel_loop: Literal["running", "stopped"] = (
        "running" if t_task is not None and not t_task.done() else "stopped"
    )

    active_jobs: list[dict[str, Any]] = []
    for job in scheduler.get_jobs():
        # NOTE(review): read defensively - presumably an APScheduler-style
        # job, which may not expose next_run_time before the scheduler has
        # started; confirm against the scheduler in app.lifespan.
        nrt = getattr(job, "next_run_time", None)
        active_jobs.append(
            {
                "id": str(job.id),
                "next_run_time": nrt.isoformat() if nrt is not None else None,
            }
        )

    return {
        "db": db_status,
        "scheduler": sched_state,
        "telemetry_loop": tel_loop,
        "last_telemetry_age_sec": last_telemetry_age_sec,
        "last_plan_age_sec": last_plan_age_sec,
        "active_jobs": active_jobs,
    }
|
||
|
||
|
||
class SetSiteModeBody(BaseModel):
    """Request body for ``POST /api/v1/sites/{site_id}/mode``."""

    # Requested mode code; must be non-empty. The handler strips and
    # upper-cases it before checking it against the supported modes.
    mode: str = Field(..., min_length=1)
    # Optional free-text note forwarded with the mode change.
    notes: str | None = None
    # Optional timestamp forwarded with the mode change.
    # NOTE(review): presumably the moment the mode stops applying - confirm
    # against the DB function behind run_fn_set_mode_with_discord.
    valid_until: datetime | None = None
|
||
|
||
|
||
class SetSiteModeResponse(BaseModel):
    """Response model of ``POST /api/v1/sites/{site_id}/mode``."""

    # The handler sets this to True whenever it returns this model.
    success: bool
    # ``mode_code`` read back from the DB after the change.
    mode: str
    # ``activated_at`` read back from the DB after the change.
    activated_at: datetime
|
||
|
||
|
||
def _parse_activated_at(raw: Any) -> datetime:
    """Convert the ``activated_at`` value read from the DB to an aware datetime.

    Args:
        raw: A ``datetime`` or an ISO 8601 string (``Z`` is accepted as the
            UTC designator).

    Returns:
        A timezone-aware ``datetime``. Naive datetimes and naive strings are
        interpreted as UTC; aware values keep their own offset.

    Raises:
        HTTPException: Status 500 when *raw* is neither a datetime nor a
            parseable ISO 8601 string.
    """
    parsed: datetime | None = None
    if isinstance(raw, datetime):
        parsed = raw
    elif isinstance(raw, str):
        try:
            parsed = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        except ValueError:
            # Reported below through the same 500 as any other bad value,
            # rather than escaping as a bare ValueError.
            parsed = None
    if parsed is None:
        raise HTTPException(status_code=500, detail="Invalid activated_at from DB")
    # Both input kinds share the naive -> UTC normalisation.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
|
||
|
||
|
||
# Mode codes accepted by the set-mode endpoint (compared after upper-casing).
_ALLOWED_SITE_MODES = frozenset(
    {"AUTO", "SELF_SUSTAIN", "CHARGE_CHEAP", "PRESERVE", "MANUAL"}
)


async def _notify_loxone_mode(bundle: dict[str, Any], site_id: int) -> None:
    """Best-effort push of the new mode value to the Loxone host in *bundle*.

    Does nothing when the bundle has no ``loxone_host`` or no
    ``loxone_mode_value``. Any failure - including a malformed port or
    protocol in the bundle - is logged as a warning and never raised, because
    the mode change itself has already been applied at this point.
    """
    host = bundle.get("loxone_host")
    loxone_val = bundle.get("loxone_mode_value")
    if not host or loxone_val is None:
        return
    try:
        proto = (bundle.get("loxone_protocol") or "http").lower()
        if proto not in ("http", "https"):
            proto = "http"
        # Fall back to the protocol's default port when none is configured.
        port = int(bundle.get("loxone_port") or (443 if proto == "https" else 80))
        url = f"{proto}://{host}:{port}/dev/sps/io/EMS_Mode/{loxone_val}"
        user = os.getenv("LOXONE_USER") or ""
        password = os.getenv("LOXONE_PASSWORD") or ""
        # Credentials are sent only when a user name is configured.
        auth = (user, password) if user else None
        async with httpx.AsyncClient(timeout=10.0) as client:
            r = await client.get(url, auth=auth)
            r.raise_for_status()
    except Exception as e:
        logger.warning("Loxone EMS_Mode notify failed for site %s: %s", site_id, e)


@app.post("/api/v1/sites/{site_id}/mode", response_model=SetSiteModeResponse)
async def set_site_mode(
    site_id: int,
    body: SetSiteModeBody,
    db: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> SetSiteModeResponse:
    """Switch a site to a new mode and notify Loxone on a best-effort basis.

    Args:
        site_id: Id looked up in ``ems.site``.
        body: Requested mode plus optional notes and ``valid_until``.
        db: Connection pool for the DB calls.

    Returns:
        The mode code and activation time read back from the DB.

    Raises:
        HTTPException: 400 for an unsupported mode or when the mode-setting
            call raises a Postgres error; 404 when the site does not exist;
            500 when the mode row or its ``activated_at`` cannot be read
            back.
    """
    mode = body.mode.strip().upper()
    if mode not in _ALLOWED_SITE_MODES:
        raise HTTPException(status_code=400, detail=f"Unsupported mode: {body.mode}")

    async with db.acquire() as conn:
        site_ok = await conn.fetchval(
            "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id
        )
        if not site_ok:
            raise HTTPException(status_code=404, detail="Site not found")

        try:
            await run_fn_set_mode_with_discord(
                conn,
                site_id,
                mode,
                "user:api",
                body.valid_until,
                body.notes,
            )
        except asyncpg.PostgresError as e:
            logger.warning("fn_set_mode failed: %s", e)
            raise HTTPException(status_code=400, detail=str(e)) from e

        bundle = await fetch_json(
            conn, "select ems.fn_site_mode_loxone_bundle($1::int)", site_id
        )

    # Everything below needs no DB access, so the pooled connection is
    # already released before the (up to 10 s) Loxone HTTP call.
    if not isinstance(bundle, dict) or bundle.get("mode_code") is None:
        raise HTTPException(status_code=500, detail="Mode row missing after set")

    activated_at = _parse_activated_at(bundle.get("activated_at"))
    mode_code = str(bundle["mode_code"])

    await _notify_loxone_mode(bundle, site_id)

    return SetSiteModeResponse(
        success=True, mode=mode_code, activated_at=activated_at
    )
|