"""EMS FastAPI – health, provozní režimy, PostgREST doplňky.""" from __future__ import annotations import asyncio import logging import os from contextlib import asynccontextmanager from datetime import date, datetime, timedelta, timezone from typing import Annotated, Any, Literal import asyncpg import httpx from apscheduler.schedulers.asyncio import AsyncIOScheduler from app.db_json import record_to_dict from app.deps import set_pg_pool from app.routers.ev import router as ev_router from app.routers.full_status import router as full_status_router from app.routers.plan import router as plan_router from services.forecast_service import fetch_pv_forecast from services.price_importer import import_ote_prices from fastapi import APIRouter, Depends, FastAPI, HTTPException, Query, Request from services.audit_filler import fill_audit_for_completed_intervals from services.heartbeat_service import send_heartbeat from services.telemetry_collector import run_telemetry_loop_wrapper from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field logger = logging.getLogger(__name__) def _dsn() -> str: host = os.getenv("DB_HOST", "localhost") port = os.getenv("DB_PORT", "5432") name = os.getenv("DB_NAME", "ems") user = os.getenv("DB_USER", "ems_user") password = os.getenv("DB_PASSWORD", "") return f"postgresql://{user}:{password}@{host}:{port}/{name}" pool: asyncpg.Pool | None = None async def get_pool() -> asyncpg.Pool: if pool is None: raise HTTPException(status_code=503, detail="Database pool not ready") return pool scheduler = AsyncIOScheduler() @asynccontextmanager async def lifespan(app: FastAPI): global pool pool = await asyncpg.create_pool(_dsn(), min_size=1, max_size=5) set_pg_pool(pool) app.state.pg_pool = pool from services.control_exporter import export_setpoints from services.planning_engine import run_daily_plan, run_rolling_replan async def scheduled_heartbeat() -> None: async with app.state.pg_pool.acquire() as conn: sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") for site in sites: try: await send_heartbeat(site["id"], conn) except Exception: logger.exception("scheduled_heartbeat site=%s failed", site["id"]) async def scheduled_audit_filler() -> None: async with app.state.pg_pool.acquire() as conn: sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") for site in sites: try: await fill_audit_for_completed_intervals(site["id"], conn) except Exception: logger.exception("scheduled_audit_filler site=%s failed", site["id"]) async def scheduled_expire_modes() -> None: async with app.state.pg_pool.acquire() as conn: try: await conn.fetchval("SELECT ems.fn_expire_modes()") except Exception: logger.exception("scheduled_expire_modes failed") async def scheduled_control_export() -> None: async with app.state.pg_pool.acquire() as conn: sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") for site in sites: try: await export_setpoints(site["id"], conn) except Exception as e: logger.exception("scheduled_control_export site=%s: %s", site["id"], e) async def scheduled_daily_plan() -> None: async with app.state.pg_pool.acquire() as conn: sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") for site in sites: try: await run_daily_plan(site["id"], conn) except Exception: logger.exception("scheduled_daily_plan site=%s failed", site["id"]) async def scheduled_rolling_replan() -> None: async with app.state.pg_pool.acquire() as conn: sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") for site in sites: try: await run_rolling_replan(site["id"], conn) except Exception: logger.exception("scheduled_rolling_replan site=%s failed", site["id"]) scheduler.add_job(scheduled_heartbeat, "interval", seconds=60, id="heartbeat") scheduler.add_job( scheduled_audit_filler, "cron", minute="1,16,31,46", second=0, id="audit_filler", ) scheduler.add_job(scheduled_expire_modes, "interval", minutes=1, id="expire_modes") scheduler.add_job( scheduled_control_export, "cron", minute="14,29,44,59", second=0, id="control_export", ) scheduler.add_job(scheduled_daily_plan, "cron", hour=15, minute=0, id="daily_plan") scheduler.add_job( scheduled_rolling_replan, "cron", minute="*/15", id="rolling_replan", ) scheduler.start() telemetry_task = asyncio.create_task(run_telemetry_loop_wrapper(app.state.pg_pool)) app.state.telemetry_task = telemetry_task yield telemetry_task.cancel() try: await telemetry_task except asyncio.CancelledError: pass scheduler.shutdown(wait=False) set_pg_pool(None) app.state.pg_pool = None if pool: await pool.close() pool = None app = FastAPI(title="EMS Platform", lifespan=lifespan) app.include_router(plan_router, prefix="/api/v1") app.include_router(ev_router, prefix="/api/v1") app.include_router(full_status_router, prefix="/api/v1") sites_router = APIRouter(prefix="/api/v1/sites", tags=["sites"]) def _parse_ymd(s: str) -> date: try: return date.fromisoformat(s) except ValueError: raise HTTPException(status_code=400, detail="Invalid date, expected YYYY-MM-DD") from None @sites_router.get("") async def list_sites(db: Annotated[asyncpg.Pool, Depends(get_pool)]) -> list[dict[str, Any]]: async with db.acquire() as conn: rows = await conn.fetch( """ SELECT id, code, name, timezone, latitude, longitude, active, notes, created_at FROM ems.site ORDER BY id """ ) return [record_to_dict(r) for r in rows] @sites_router.get("/{site_id}/prices") async def get_site_prices( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pool)], date_str: str | None = Query(None, alias="date", description="YYYY-MM-DD, default today"), ) -> list[dict[str, Any]]: if date_str is None: date_str = date.today().isoformat() d = _parse_ymd(date_str) async with db.acquire() as conn: site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") rows = await conn.fetch( """ SELECT * FROM ems.vw_site_effective_price WHERE site_id = $1 AND interval_start::date = $2::date ORDER BY interval_start """, site_id, d, ) return [record_to_dict(r) for r in rows] class PricesImportResponse(BaseModel): slots_imported: int date: str first_price_czk_kwh: float class PricesLatestResponse(BaseModel): latest_date: str slots: int min_price: float max_price: float avg_price: float class ForecastRunResponse(BaseModel): intervals_saved: int pv_arrays: int @sites_router.post("/{site_id}/prices/import", response_model=PricesImportResponse) async def post_import_site_prices( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pool)], date_str: str | None = Query( None, alias="date", description="YYYY-MM-DD; výchozí = zítřek v časové zóně lokality", ), ) -> PricesImportResponse: target: date | None = _parse_ymd(date_str) if date_str is not None else None async with db.acquire() as conn: site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") n, day, first_price = await import_ote_prices(site_id, conn, target_date=target) if n < 0: raise HTTPException( status_code=422, detail="OTE API nedostupné nebo nevrátilo data", ) return PricesImportResponse( slots_imported=n, date=day, first_price_czk_kwh=first_price, ) @sites_router.get("/{site_id}/prices/latest", response_model=PricesLatestResponse) async def get_site_prices_latest( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pool)], ) -> PricesLatestResponse: async with db.acquire() as conn: site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") row = await conn.fetchrow( """ SELECT (interval_start AT TIME ZONE 'Europe/Prague')::date AS day, COUNT(*)::int AS slots, MIN(buy_raw_price_czk_kwh)::float AS min_price, MAX(buy_raw_price_czk_kwh)::float AS max_price, AVG(buy_raw_price_czk_kwh)::float AS avg_price FROM ems.market_interval_price WHERE market_source IN ('OTE_CZ', 'OTE_CZ_DAM') GROUP BY day ORDER BY day DESC LIMIT 1 """ ) if row is None or row["day"] is None: raise HTTPException(status_code=404, detail="Žádná tržní data v databázi") return PricesLatestResponse( latest_date=row["day"].isoformat(), slots=int(row["slots"] or 0), min_price=float(row["min_price"] or 0.0), max_price=float(row["max_price"] or 0.0), avg_price=float(row["avg_price"] or 0.0), ) @sites_router.post("/{site_id}/forecast/run", response_model=ForecastRunResponse) async def post_run_site_forecast( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pool)], ) -> ForecastRunResponse: async with db.acquire() as conn: site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") intervals, pv_arrays = await fetch_pv_forecast(site_id, conn) if intervals < 0: raise HTTPException( status_code=422, detail="Forecast se nepodařilo stáhnout nebo zpracovat", ) return ForecastRunResponse(intervals_saved=intervals, pv_arrays=pv_arrays) @sites_router.get("/{site_id}/forecast/pv") async def get_site_forecast_pv( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pool)], date_str: str | None = Query(None, alias="date", description="YYYY-MM-DD, default tomorrow"), ) -> dict[str, list[dict[str, Any]]]: if date_str is None: date_str = (date.today() + timedelta(days=1)).isoformat() d = _parse_ymd(date_str) async with db.acquire() as conn: site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") rows = await conn.fetch( """ SELECT fpi.*, apa.code AS pv_array_code FROM ems.forecast_pv_interval fpi JOIN ems.forecast_pv_run fpr ON fpr.id = fpi.run_id JOIN ems.asset_pv_array apa ON apa.id = fpi.pv_array_id AND apa.site_id = fpr.site_id WHERE fpr.site_id = $1 AND fpi.interval_start::date = $2::date AND fpr.status = 'ok' ORDER BY apa.code, fpi.interval_start """, site_id, d, ) pv_a: list[dict[str, Any]] = [] pv_b: list[dict[str, Any]] = [] for r in rows: item = record_to_dict(r) code = item.get("pv_array_code") if code == "pv-a": pv_a.append(item) elif code == "pv-b": pv_b.append(item) return {"pv_a": pv_a, "pv_b": pv_b} app.include_router(sites_router) app.add_middleware( CORSMiddleware, allow_origins=os.getenv("CORS_ORIGINS", "http://localhost:5173,http://127.0.0.1:5173").split(","), allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) async def _health_payload(db: asyncpg.Pool) -> dict[str, Any]: db_status = "error" active_plan_slots = 0 try: async with db.acquire() as conn: await conn.fetchval("SELECT 1") db_status = "ok" active_plan_slots = int( await conn.fetchval( """ SELECT COUNT(*)::bigint FROM ems.planning_interval pi INNER JOIN ems.planning_run pr ON pr.id = pi.run_id WHERE pr.status = 'active' """ ) or 0 ) except Exception as e: logger.warning("health DB check failed: %s", e) db_status = "error" return { "status": "ok" if db_status == "ok" else "degraded", "db": db_status, "timestamp": datetime.now(timezone.utc).isoformat(), "active_plan_slots": active_plan_slots, } @app.get("/health") @app.get("/api/v1/health") async def health(db: Annotated[asyncpg.Pool, Depends(get_pool)]) -> dict[str, Any]: return await _health_payload(db) @app.get("/api/v1/health/detailed") async def health_detailed( request: Request, db: Annotated[asyncpg.Pool, Depends(get_pool)], ) -> dict[str, Any]: db_status: Literal["ok", "error"] = "error" last_telemetry_age_sec = -1 last_plan_age_sec = -1 try: async with db.acquire() as conn: await conn.fetchval("SELECT 1") db_status = "ok" tel = await conn.fetchval( """ SELECT CASE WHEN MAX(measured_at) IS NULL THEN -1 ELSE GREATEST(0, EXTRACT(EPOCH FROM (now() - MAX(measured_at)))::int) END FROM ems.telemetry_inverter """ ) if tel is not None: last_telemetry_age_sec = int(tel) plan_age = await conn.fetchval( """ SELECT CASE WHEN MAX(pr.created_at) IS NULL THEN -1 ELSE GREATEST(0, EXTRACT(EPOCH FROM (now() - MAX(pr.created_at)))::int) END FROM ems.planning_run pr WHERE pr.status = 'active' """ ) if plan_age is not None: last_plan_age_sec = int(plan_age) except Exception as e: logger.warning("health detailed DB check failed: %s", e) db_status = "error" sched_state: Literal["running", "stopped"] = "running" if scheduler.running else "stopped" t_task = getattr(request.app.state, "telemetry_task", None) tel_loop: Literal["running", "stopped"] = ( "running" if t_task is not None and not t_task.done() else "stopped" ) active_jobs: list[dict[str, Any]] = [] for job in scheduler.get_jobs(): nrt = job.next_run_time active_jobs.append( { "id": str(job.id), "next_run_time": nrt.isoformat() if nrt is not None else None, } ) return { "db": db_status, "scheduler": sched_state, "telemetry_loop": tel_loop, "last_telemetry_age_sec": last_telemetry_age_sec, "last_plan_age_sec": last_plan_age_sec, "active_jobs": active_jobs, } class SetSiteModeBody(BaseModel): mode: str = Field(..., min_length=1) notes: str | None = None valid_until: datetime | None = None class SetSiteModeResponse(BaseModel): success: bool mode: str activated_at: datetime @app.post("/api/v1/sites/{site_id}/mode", response_model=SetSiteModeResponse) async def set_site_mode( site_id: int, body: SetSiteModeBody, db: Annotated[asyncpg.Pool, Depends(get_pool)], ) -> SetSiteModeResponse: mode = body.mode.strip().upper() allowed = {"AUTO", "SELF_SUSTAIN", "CHARGE_CHEAP", "PRESERVE", "MANUAL"} if mode not in allowed: raise HTTPException(status_code=400, detail=f"Unsupported mode: {body.mode}") async with db.acquire() as conn: site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") try: await conn.execute( "SELECT ems.fn_set_mode($1, $2, $3, $4, $5)", site_id, mode, "user:api", body.valid_until, body.notes, ) except asyncpg.PostgresError as e: logger.warning("fn_set_mode failed: %s", e) raise HTTPException(status_code=400, detail=str(e)) from e row = await conn.fetchrow( """ SELECT m.mode_code, m.activated_at, d.loxone_mode_value FROM ems.site_operating_mode m JOIN ems.operating_mode_def d ON d.code = m.mode_code WHERE m.site_id = $1 """, site_id, ) if row is None: raise HTTPException(status_code=500, detail="Mode row missing after set") ep = await conn.fetchrow( """ SELECT host, port, protocol FROM ems.site_endpoint WHERE site_id = $1 AND endpoint_type = 'loxone_http' AND enabled = true ORDER BY id LIMIT 1 """, site_id, ) activated_at: datetime = row["activated_at"] if activated_at.tzinfo is None: activated_at = activated_at.replace(tzinfo=timezone.utc) loxone_val: int | None = row["loxone_mode_value"] if ep and loxone_val is not None: proto = (ep["protocol"] or "http").lower() if proto not in ("http", "https"): proto = "http" host = ep["host"] port = int(ep["port"] or (443 if proto == "https" else 80)) base = f"{proto}://{host}:{port}" url = f"{base}/dev/sps/io/EMS_Mode/{loxone_val}" user = os.getenv("LOXONE_USER") or "" password = os.getenv("LOXONE_PASSWORD") or "" auth = (user, password) if user else None try: async with httpx.AsyncClient(timeout=10.0) as client: r = await client.get(url, auth=auth) r.raise_for_status() except Exception as e: logger.warning("Loxone EMS_Mode notify failed for site %s: %s", site_id, e) return SetSiteModeResponse(success=True, mode=row["mode_code"], activated_at=activated_at)