Files
ems/backend/app/main.py
Dusan Vojacek 014c6f193b
Some checks failed
CI and deploy / migration-check (push) Failing after 17s
CI and deploy / deploy (push) Has been skipped
refactor main.py
2026-04-19 20:42:53 +02:00

254 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""EMS FastAPI routery, health, režim, WebSockety."""
from __future__ import annotations
import logging
import os
from datetime import datetime, timezone
from typing import Annotated, Any, Literal
import asyncpg
import httpx
from app.db_json import fetch_json
from app.deps import get_pg_pool
from app.lifespan import lifespan, scheduler
from app.routers.economics import router as economics_router
from app.routers.energy_flows import router as energy_flows_router
from app.routers.ev import router as ev_router
from app.routers.full_status import router as full_status_router
from app.routers.me import router as me_router
from app.routers.plan import router as plan_router
from app.routers.site_configuration import router as site_configuration_router
from app.routers.sites import router as sites_router
from app.ws_manager import manager
from fastapi import Depends, FastAPI, HTTPException, Request, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from services.notification_service import run_fn_set_mode_with_discord
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# The ASGI application, constructed with the lifespan imported from app.lifespan.
app = FastAPI(title="EMS Platform", lifespan=lifespan)

# Routers mounted under the versioned API prefix, in registration order.
for _v1_router in (
    plan_router,
    ev_router,
    full_status_router,
    site_configuration_router,
    economics_router,
    energy_flows_router,
):
    app.include_router(_v1_router, prefix="/api/v1")
del _v1_router

# Routers mounted without an additional prefix.
for _unprefixed_router in (me_router, sites_router):
    app.include_router(_unprefixed_router)
del _unprefixed_router
# CORS: allowed origins are read from the comma-separated CORS_ORIGINS
# environment variable, falling back to the two localhost origins below.
# Each entry is stripped and empty entries are dropped, so a value such as
# "https://a.example, https://b.example" or one with a trailing comma does not
# produce origins (" https://b.example", "") that can never match a request's
# Origin header.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        origin.strip()
        for origin in os.getenv(
            "CORS_ORIGINS", "http://localhost:5173,http://127.0.0.1:5173"
        ).split(",")
        if origin.strip()
    ],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.websocket("/ws/telemetry")
async def ws_telemetry(websocket: WebSocket) -> None:
    """Keep a telemetry WebSocket registered with the manager until it ends.

    The socket is handed to ``manager.connect_telemetry``; afterwards incoming
    text frames are read and discarded so that a client disconnect is noticed.

    The socket is unregistered whenever the receive loop ends, not only on
    ``WebSocketDisconnect``, so an unexpected error while receiving or a task
    cancellation cannot leave a dead connection registered. Errors other than
    ``WebSocketDisconnect`` still propagate to the caller.
    """
    await manager.connect_telemetry(websocket)
    try:
        while True:
            # The payload is ignored; reading only serves to detect the disconnect.
            await websocket.receive_text()
    except WebSocketDisconnect:
        # Regular client disconnect; cleanup happens in ``finally``.
        pass
    finally:
        manager.disconnect(websocket)
@app.websocket("/ws/logs")
async def ws_logs(websocket: WebSocket) -> None:
    """Keep a log-stream WebSocket registered with the manager until it ends.

    The socket is handed to ``manager.connect_logs``; afterwards incoming text
    frames are read and discarded so that a client disconnect is noticed.

    The socket is unregistered whenever the receive loop ends, not only on
    ``WebSocketDisconnect``, so an unexpected error while receiving or a task
    cancellation cannot leave a dead connection registered. Errors other than
    ``WebSocketDisconnect`` still propagate to the caller.
    """
    await manager.connect_logs(websocket)
    try:
        while True:
            # The payload is ignored; reading only serves to detect the disconnect.
            await websocket.receive_text()
    except WebSocketDisconnect:
        # Regular client disconnect; cleanup happens in ``finally``.
        pass
    finally:
        manager.disconnect(websocket)
def _iso_utc_from_json_value(v: Any) -> str:
    """Return *v* as a timezone-aware ISO 8601 timestamp string.

    Args:
        v: A ``datetime``, an ISO 8601 string (a trailing ``Z`` is accepted),
            or any other value.

    Returns:
        The ISO representation of *v*. Naive values are taken to be UTC,
        for parsed strings as well as for ``datetime`` objects, so the result
        always carries an offset. An explicit non-UTC offset is kept as given.
        ``None``, unparsable strings and unsupported types fall back to the
        current UTC time.
    """
    if isinstance(v, str):
        try:
            # ``Z`` is rewritten because older ``fromisoformat`` versions reject it.
            v = datetime.fromisoformat(v.replace("Z", "+00:00"))
        except ValueError:
            v = None  # fall through to the "now" fallback below
    if isinstance(v, datetime):
        if v.tzinfo is None:
            v = v.replace(tzinfo=timezone.utc)
        return v.isoformat()
    return datetime.now(timezone.utc).isoformat()
async def _health_payload(db: asyncpg.Pool) -> dict[str, Any]:
    """Build the basic health response from ``ems.fn_health_summary()``.

    Args:
        db: Connection pool used to run the summary query.

    Returns:
        A dict with the keys ``status``, ``db``, ``timestamp`` and
        ``active_plan_slots``. When the query or the conversion of its result
        raises, the failure is logged and a ``"degraded"`` payload with
        ``db="error"``, the current UTC time and zero slots is returned
        instead; this function does not raise for such failures.
    """
    try:
        async with db.acquire() as connection:
            summary = await fetch_json(connection, "select ems.fn_health_summary()")
        # Anything that is not a JSON object is treated as an empty summary.
        fields: dict[str, Any] = summary if isinstance(summary, dict) else {}
        healthy = {
            "status": "ok",
            "db": str(fields.get("db") or "ok"),
            "timestamp": _iso_utc_from_json_value(fields.get("timestamp")),
            "active_plan_slots": int(fields.get("active_plan_slots") or 0),
        }
    except Exception as exc:
        logger.warning("health DB check failed: %s", exc)
        return {
            "status": "degraded",
            "db": "error",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "active_plan_slots": 0,
        }
    return healthy
@app.get("/health")
@app.get("/api/v1/health")
async def health(
    db: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> dict[str, Any]:
    """Serve the basic health payload on ``/health`` and ``/api/v1/health``."""
    payload = await _health_payload(db)
    return payload
def _age_seconds(raw: Any) -> int:
    """Coerce an age metric from the DB to whole seconds; ``-1`` means unknown."""
    if raw is None:
        return -1
    try:
        return int(raw)
    except (TypeError, ValueError, OverflowError):
        return -1


@app.get("/api/v1/health/detailed")
async def health_detailed(
    request: Request,
    db: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> dict[str, Any]:
    """Report DB, scheduler and telemetry-loop state plus scheduled jobs.

    Returns:
        A dict with:

        * ``db``: ``"ok"`` when ``ems.fn_health_detailed_db()`` could be
          queried, otherwise ``"error"``.
        * ``scheduler``: ``"running"`` or ``"stopped"`` from ``scheduler.running``.
        * ``telemetry_loop``: ``"running"`` while ``app.state.telemetry_task``
          exists and is not done, otherwise ``"stopped"``.
        * ``last_telemetry_age_sec`` / ``last_plan_age_sec``: integers from the
          DB result, ``-1`` when missing, null or not convertible.
        * ``active_jobs``: ``id`` and ISO ``next_run_time`` (or ``None``) for
          every job returned by ``scheduler.get_jobs()``.

    A null or malformed age metric no longer marks the database as ``"error"``:
    only a failure of the query itself does.
    """
    db_status: Literal["ok", "error"] = "error"
    metrics: Any = {}
    try:
        async with db.acquire() as conn:
            metrics = await fetch_json(conn, "select ems.fn_health_detailed_db()")
        db_status = "ok"
    except Exception as e:
        logger.warning("health detailed DB check failed: %s", e)
    if not isinstance(metrics, dict):
        metrics = {}
    last_telemetry_age_sec = _age_seconds(metrics.get("last_telemetry_age_sec"))
    last_plan_age_sec = _age_seconds(metrics.get("last_plan_age_sec"))

    sched_state: Literal["running", "stopped"] = (
        "running" if scheduler.running else "stopped"
    )
    t_task = getattr(request.app.state, "telemetry_task", None)
    tel_loop: Literal["running", "stopped"] = (
        "running" if t_task is not None and not t_task.done() else "stopped"
    )

    active_jobs: list[dict[str, Any]] = []
    for job in scheduler.get_jobs():
        # NOTE(review): assuming an APScheduler-style scheduler, a job that has
        # not been scheduled yet may lack ``next_run_time`` entirely; read it
        # defensively so the health endpoint cannot fail on it. Confirm.
        nrt = getattr(job, "next_run_time", None)
        active_jobs.append(
            {
                "id": str(job.id),
                "next_run_time": nrt.isoformat() if nrt is not None else None,
            }
        )
    return {
        "db": db_status,
        "scheduler": sched_state,
        "telemetry_loop": tel_loop,
        "last_telemetry_age_sec": last_telemetry_age_sec,
        "last_plan_age_sec": last_plan_age_sec,
        "active_jobs": active_jobs,
    }
class SetSiteModeBody(BaseModel):
    """Request body accepted by ``POST /api/v1/sites/{site_id}/mode``."""

    # Requested mode name; required and at least one character long.
    # The handler strips and upper-cases it before checking it against the
    # supported modes.
    mode: str = Field(..., min_length=1)
    # Optional free-text note, forwarded unchanged to the set-mode call.
    notes: str | None = None
    # Optional timestamp, forwarded unchanged to the set-mode call.
    # NOTE(review): presumably the moment the mode expires; confirm against
    # the DB function.
    valid_until: datetime | None = None
class SetSiteModeResponse(BaseModel):
    """Response returned by ``POST /api/v1/sites/{site_id}/mode``."""

    # The handler sets this to True whenever it returns a response.
    success: bool
    # Mode code read back from the DB after the change (``mode_code``).
    mode: str
    # Activation timestamp read back from the DB (``activated_at``).
    activated_at: datetime
def _parse_activated_at(raw: Any) -> datetime:
    """Convert the ``activated_at`` value read from the DB to an aware datetime.

    Args:
        raw: A ``datetime`` or an ISO 8601 string (a trailing ``Z`` is accepted).

    Returns:
        The timestamp as a ``datetime``. Naive values are taken to be UTC, for
        parsed strings as well as for ``datetime`` objects.

    Raises:
        HTTPException: 500 when *raw* is neither a ``datetime`` nor a parsable
            ISO 8601 string.
    """
    if isinstance(raw, str):
        try:
            # ``Z`` is rewritten because older ``fromisoformat`` versions reject it.
            raw = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        except ValueError as exc:
            # Report a malformed string the same way as an unsupported type.
            raise HTTPException(
                status_code=500, detail="Invalid activated_at from DB"
            ) from exc
    if isinstance(raw, datetime):
        if raw.tzinfo is None:
            return raw.replace(tzinfo=timezone.utc)
        return raw
    raise HTTPException(status_code=500, detail="Invalid activated_at from DB")
# Modes accepted by the set-mode endpoint (compared after strip + upper-casing).
_ALLOWED_SITE_MODES = frozenset(
    {"AUTO", "SELF_SUSTAIN", "CHARGE_CHEAP", "PRESERVE", "MANUAL"}
)


async def _notify_loxone_mode(site_id: int, bundle: dict[str, Any]) -> None:
    """Best-effort push of the new mode value to the site's Loxone host.

    Does nothing unless *bundle* carries both ``loxone_host`` and
    ``loxone_mode_value``. Every failure is logged as a warning and swallowed:
    malformed protocol/port settings as well as HTTP or network errors. The
    notification must never fail a request whose mode change has already
    been stored.
    """
    host = bundle.get("loxone_host")
    loxone_val = bundle.get("loxone_mode_value")
    if not host or loxone_val is None:
        return
    try:
        proto = (bundle.get("loxone_protocol") or "http").lower()
        if proto not in ("http", "https"):
            proto = "http"
        # Fall back to the protocol's default port when none is configured.
        port = int(bundle.get("loxone_port") or (443 if proto == "https" else 80))
        url = f"{proto}://{host}:{port}/dev/sps/io/EMS_Mode/{loxone_val}"
        user = os.getenv("LOXONE_USER") or ""
        password = os.getenv("LOXONE_PASSWORD") or ""
        # Credentials are only sent when a user name is configured.
        auth = (user, password) if user else None
        async with httpx.AsyncClient(timeout=10.0) as client:
            r = await client.get(url, auth=auth)
            r.raise_for_status()
    except Exception as e:
        logger.warning("Loxone EMS_Mode notify failed for site %s: %s", site_id, e)


@app.post("/api/v1/sites/{site_id}/mode", response_model=SetSiteModeResponse)
async def set_site_mode(
    site_id: int,
    body: SetSiteModeBody,
    db: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> SetSiteModeResponse:
    """Switch a site to a new operating mode and notify its Loxone host.

    The mode is normalised (stripped, upper-cased) and validated, the site is
    checked for existence, the change is applied through
    ``run_fn_set_mode_with_discord`` and the resulting state is read back with
    ``ems.fn_site_mode_loxone_bundle``. The pooled connection is released
    before the Loxone call so that a slow host (10 s timeout) does not keep a
    DB connection busy.

    Raises:
        HTTPException: 400 for an unsupported mode or when the set-mode call
            raises ``asyncpg.PostgresError``; 404 when the site does not
            exist; 500 when the mode row or its ``activated_at`` cannot be
            read back.
    """
    mode = body.mode.strip().upper()
    if mode not in _ALLOWED_SITE_MODES:
        raise HTTPException(status_code=400, detail=f"Unsupported mode: {body.mode}")

    async with db.acquire() as conn:
        site_ok = await conn.fetchval(
            "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id
        )
        if not site_ok:
            raise HTTPException(status_code=404, detail="Site not found")
        try:
            await run_fn_set_mode_with_discord(
                conn,
                site_id,
                mode,
                "user:api",
                body.valid_until,
                body.notes,
            )
        except asyncpg.PostgresError as e:
            logger.warning("fn_set_mode failed: %s", e)
            raise HTTPException(status_code=400, detail=str(e)) from e
        bundle = await fetch_json(
            conn, "select ems.fn_site_mode_loxone_bundle($1::int)", site_id
        )

    if not isinstance(bundle, dict) or bundle.get("mode_code") is None:
        raise HTTPException(status_code=500, detail="Mode row missing after set")
    activated_at = _parse_activated_at(bundle.get("activated_at"))
    mode_code = str(bundle["mode_code"])

    # The mode is already stored at this point; the notification is best-effort.
    await _notify_loxone_mode(site_id, bundle)

    return SetSiteModeResponse(
        success=True, mode=mode_code, activated_at=activated_at
    )