"""EMS FastAPI – health, provozní režimy, PostgREST doplňky.""" from __future__ import annotations import asyncio import logging import os from contextlib import asynccontextmanager from datetime import date, datetime, timedelta, timezone from typing import Annotated, Any, Literal from zoneinfo import ZoneInfo import asyncpg import httpx from apscheduler.schedulers.asyncio import AsyncIOScheduler from app.db_json import record_to_dict from app.deps import set_pg_pool from app.routers.economics import router as economics_router from app.routers.energy_flows import router as energy_flows_router from app.routers.ev import router as ev_router from app.routers.full_status import router as full_status_router from app.routers.plan import router as plan_router from app.routers.site_configuration import router as site_configuration_router from app.ws_log_handler import WSLogHandler from app.ws_manager import manager from fastapi import ( APIRouter, Depends, FastAPI, HTTPException, Query, Request, WebSocket, WebSocketDisconnect, ) from fastapi.middleware.cors import CORSMiddleware from services.audit_filler import fill_audit_for_completed_intervals from services.control_exporter import ( export_setpoints, read_deye_registers_live, verify_modbus_commands, ) from services.heartbeat_service import send_heartbeat from services.forecast_service import fetch_pv_forecast from services.notification_service import ( notify_operating_mode_changed, run_fn_set_mode_with_discord, ) from services.price_importer import import_ote_prices from services.telemetry_collector import run_telemetry_loop_wrapper from pydantic import BaseModel, Field logger = logging.getLogger(__name__) def _dsn() -> str: host = os.getenv("DB_HOST", "localhost") port = os.getenv("DB_PORT", "5432") name = os.getenv("DB_NAME", "ems") user = os.getenv("DB_USER", "ems_user") password = os.getenv("DB_PASSWORD", "") return f"postgresql://{user}:{password}@{host}:{port}/{name}" pool: asyncpg.Pool | None = None async def get_pool() -> asyncpg.Pool: if pool is None: raise HTTPException(status_code=503, detail="Database pool not ready") return pool # Cron hodiny/minuty = Europe/Prague (import OTE 13:30 / 14:00, denní plán 15:00, …) scheduler = AsyncIOScheduler(timezone=ZoneInfo("Europe/Prague")) @asynccontextmanager async def lifespan(app: FastAPI): global pool pool = await asyncpg.create_pool(_dsn(), min_size=1, max_size=5) set_pg_pool(pool) app.state.pg_pool = pool app.state.ws_log_handler = WSLogHandler() app.state.ws_log_handler.setLevel(logging.INFO) logging.getLogger().addHandler(app.state.ws_log_handler) from services.planning_engine import run_daily_plan, run_rolling_replan async def scheduled_heartbeat() -> None: async with app.state.pg_pool.acquire() as conn: sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") for site in sites: try: await send_heartbeat(site["id"], conn) except Exception: logger.exception("scheduled_heartbeat site=%s failed", site["id"]) async def scheduled_audit_filler() -> None: async with app.state.pg_pool.acquire() as conn: sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") for site in sites: try: await fill_audit_for_completed_intervals(site["id"], conn) except Exception: logger.exception("scheduled_audit_filler site=%s failed", site["id"]) async def scheduled_forecast_accuracy() -> None: async with app.state.pg_pool.acquire() as conn: sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") for site in sites: try: n = await conn.fetchval( "SELECT ems.fn_fill_forecast_accuracy($1, 48)", site["id"], ) if n: logger.info( "forecast_accuracy filled %s slots for site %s", n, site["id"], ) except Exception: logger.exception( "scheduled_forecast_accuracy site=%s failed", site["id"] ) async def scheduled_expire_modes() -> None: async with app.state.pg_pool.acquire() as conn: try: rows = await conn.fetch("SELECT * FROM ems.fn_expire_modes()") for r in rows: await notify_operating_mode_changed( str(r["site_code"]), str(r["old_mode"]), str(r["new_mode"]), "system:expiry", "Automatické vypršení dočasného režimu", ) except Exception: logger.exception("scheduled_expire_modes failed") async def scheduled_control_export() -> None: async with app.state.pg_pool.acquire() as conn: sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") for site in sites: try: await export_setpoints(site["id"], conn) except Exception as e: logger.exception("scheduled_control_export site=%s: %s", site["id"], e) async def scheduled_verify_modbus() -> None: """ Ověří příkazy ve stavu written z posledních 20 minut. Běží každé 2 minuty, nezávisle na control_exporter (delší okno kvůli zpoždění jobu). """ async with app.state.pg_pool.acquire() as conn: sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") for site in sites: try: cmd_rows = await conn.fetch( """ SELECT id FROM ems.modbus_command WHERE site_id = $1 AND status = 'written' AND written_at >= now() - INTERVAL '20 minutes' ORDER BY written_at """, site["id"], ) if cmd_rows: await verify_modbus_commands( [int(r["id"]) for r in cmd_rows], conn, int(site["id"]), ) except Exception: logger.exception( "scheduled_verify_modbus site=%s failed", site["id"] ) async def scheduled_daily_plan() -> None: async with app.state.pg_pool.acquire() as conn: sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") for site in sites: site_id = int(site["id"]) try: await run_daily_plan(site_id, conn) # Aplikuj nový active run okamžitě, nečekej na další 15min tick exportu. await export_setpoints(site_id, conn) except Exception: logger.exception("scheduled_daily_plan site=%s failed", site_id) async def scheduled_rolling_replan() -> None: async with app.state.pg_pool.acquire() as conn: sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") for site in sites: site_id = int(site["id"]) try: await run_rolling_replan(site_id, conn) # Aplikuj nový active run okamžitě, nečekej na další 15min tick exportu. await export_setpoints(site_id, conn) except Exception: logger.exception("scheduled_rolling_replan site=%s failed", site_id) async def scheduled_baseline_update() -> None: async with app.state.pg_pool.acquire() as conn: sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") for site in sites: try: n = await conn.fetchval( "SELECT ems.fn_update_baseline_stats($1, 30)", site["id"], ) logger.info( "baseline_stats updated %s rows for site %s", n, site["id"], ) except Exception: logger.exception( "scheduled_baseline_update site=%s failed", site["id"] ) async def scheduled_market_price_stats() -> None: async with app.state.pg_pool.acquire() as conn: sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") for site in sites: try: n = await conn.fetchval( "SELECT ems.fn_update_market_price_stats($1, 90)", site["id"], ) logger.info( "market_price_stats updated %s rows site=%s", n, site["id"], ) except Exception: logger.exception( "scheduled_market_price_stats site=%s failed", site["id"] ) async def scheduled_tuv_usage_stats() -> None: async with app.state.pg_pool.acquire() as conn: sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") for site in sites: try: n = await conn.fetchval( "SELECT ems.fn_update_tuv_usage_stats($1, 30)", site["id"], ) logger.info( "tuv_usage_stats updated %s rows site=%s", n, site["id"], ) except Exception: logger.exception( "scheduled_tuv_usage_stats site=%s failed", site["id"] ) async def scheduled_forecast_refresh() -> None: async with app.state.pg_pool.acquire() as conn: sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") for site in sites: site_id = int(site["id"]) try: intervals, pv_arrays = await fetch_pv_forecast(site_id, conn) if intervals >= 0: logger.info( "scheduled_forecast_refresh site=%s intervals=%s arrays=%s", site_id, intervals, pv_arrays, ) await _refresh_negative_price_predictions(conn, site_id) else: logger.warning( "scheduled_forecast_refresh site=%s failed", site_id, ) except Exception: logger.exception("scheduled_forecast_refresh site=%s failed", site_id) async def _count_ote_slots_for_day( conn: asyncpg.Connection, target_day: date ) -> int: return int( await conn.fetchval( """ SELECT COUNT(*)::int FROM ems.market_interval_price WHERE market_source = 'OTE_CZ' AND interval_start::date = $1::date """, target_day, ) or 0 ) async def _refresh_negative_price_predictions_all_active( conn: asyncpg.Connection, ) -> None: sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") for site in sites: await _refresh_negative_price_predictions(conn, int(site["id"])) async def _scheduled_ote_import_global(conn: asyncpg.Connection) -> None: """Jeden OTE fetch na chybějící den; market_interval_price je globální pro všechny site.""" prague_tz = ZoneInfo("Europe/Prague") now_loc = datetime.now(prague_tz) today = now_loc.date() tomorrow = today + timedelta(days=1) any_import_ok = False for day in (today, tomorrow): slots = await _count_ote_slots_for_day(conn, day) if slots >= 96: continue n, imported_day, _, err = await import_ote_prices( conn, site_id=None, target_date=day ) if n < 0: logger.warning( "scheduled_ote_import_global day=%s failed (%s)", day.isoformat(), err, ) continue logger.info( "scheduled_ote_import_global day=%s imported=%s slots", imported_day, n, ) any_import_ok = True if any_import_ok: await _refresh_negative_price_predictions_all_active(conn) async def scheduled_ote_import() -> None: async with app.state.pg_pool.acquire() as conn: try: await _scheduled_ote_import_global(conn) except Exception: logger.exception("scheduled_ote_import_global failed") scheduler.add_job(scheduled_heartbeat, "interval", seconds=60, id="heartbeat") scheduler.add_job( scheduled_audit_filler, "cron", minute="1,16,31,46", second=0, id="audit_filler", ) scheduler.add_job( scheduled_forecast_accuracy, "cron", minute="2,17,32,47", id="forecast_accuracy", replace_existing=True, ) scheduler.add_job(scheduled_expire_modes, "interval", minutes=1, id="expire_modes") scheduler.add_job( scheduled_control_export, "cron", minute="14,29,44,59", second=0, id="control_export", ) scheduler.add_job( scheduled_verify_modbus, "interval", minutes=2, id="verify_modbus", replace_existing=True, ) scheduler.add_job(scheduled_daily_plan, "cron", hour=15, minute=0, id="daily_plan") scheduler.add_job( scheduled_rolling_replan, "cron", minute="*/15", id="rolling_replan", ) scheduler.add_job( scheduled_baseline_update, "cron", hour=0, minute=30, id="baseline_update", replace_existing=True, ) scheduler.add_job( scheduled_market_price_stats, "cron", hour=14, minute=45, id="market_price_stats", replace_existing=True, ) scheduler.add_job( scheduled_tuv_usage_stats, "cron", hour=0, minute=45, id="tuv_usage_stats", replace_existing=True, ) scheduler.add_job( scheduled_ote_import, "cron", hour=13, minute=30, id="ote_import_preopen", replace_existing=True, ) scheduler.add_job( scheduled_ote_import, "cron", hour=14, minute=0, id="ote_import_main", replace_existing=True, ) scheduler.add_job( scheduled_ote_import, "cron", hour=0, minute=5, id="ote_import_backfill", replace_existing=True, ) scheduler.add_job( scheduled_forecast_refresh, "cron", hour="*/2", minute=5, id="forecast_refresh_2h", replace_existing=True, ) async def scheduled_daily_economics_notification() -> None: from services.notification_service import notify_daily_economics async with app.state.pg_pool.acquire() as conn: sites = await conn.fetch("SELECT id, code FROM ems.site WHERE active = true") for site in sites: site_id = int(site["id"]) site_code = site["code"] try: row = await conn.fetchrow( """ SELECT import_kwh, export_kwh, import_cost_czk, export_revenue_czk, green_bonus_czk, total_balance_czk, planned_balance_czk FROM ems.vw_economics_daily WHERE site_id = $1 AND day_local = ( CURRENT_DATE AT TIME ZONE 'Europe/Prague' - INTERVAL '1 day' )::date """, site_id, ) if row is None: continue yesterday = ( datetime.now(ZoneInfo("Europe/Prague")) - timedelta(days=1) ).strftime("%Y-%m-%d") await notify_daily_economics( site_code=site_code, day=yesterday, import_kwh=float(row["import_kwh"] or 0), import_cost=float(row["import_cost_czk"] or 0), export_kwh=float(row["export_kwh"] or 0), export_revenue=float(row["export_revenue_czk"] or 0), green_bonus=float(row["green_bonus_czk"] or 0), total_balance=float(row["total_balance_czk"] or 0), planned_balance=float(row["planned_balance_czk"]) if row["planned_balance_czk"] is not None else None, ) except Exception: logger.exception( "scheduled_daily_economics_notification site=%s failed", site_id, ) scheduler.add_job( scheduled_daily_economics_notification, "cron", hour=7, minute=0, id="daily_economics_notification", replace_existing=True, ) scheduler.start() telemetry_task = asyncio.create_task(run_telemetry_loop_wrapper(app.state.pg_pool)) app.state.telemetry_task = telemetry_task yield ws_h = getattr(app.state, "ws_log_handler", None) if ws_h is not None: logging.getLogger().removeHandler(ws_h) app.state.ws_log_handler = None telemetry_task.cancel() try: await telemetry_task except asyncio.CancelledError: pass scheduler.shutdown(wait=False) set_pg_pool(None) app.state.pg_pool = None if pool: await pool.close() pool = None app = FastAPI(title="EMS Platform", lifespan=lifespan) app.include_router(plan_router, prefix="/api/v1") app.include_router(ev_router, prefix="/api/v1") app.include_router(full_status_router, prefix="/api/v1") app.include_router(site_configuration_router, prefix="/api/v1") app.include_router(economics_router, prefix="/api/v1") app.include_router(energy_flows_router, prefix="/api/v1") sites_router = APIRouter(prefix="/api/v1/sites", tags=["sites"]) def _parse_ymd(s: str) -> date: try: return date.fromisoformat(s) except ValueError: raise HTTPException(status_code=400, detail="Invalid date, expected YYYY-MM-DD") from None @sites_router.get("") async def list_sites(db: Annotated[asyncpg.Pool, Depends(get_pool)]) -> list[dict[str, Any]]: async with db.acquire() as conn: rows = await conn.fetch( """ SELECT id, code, name, timezone, latitude, longitude, active, notes, created_at FROM ems.site ORDER BY id """ ) return [record_to_dict(r) for r in rows] @sites_router.get("/{site_id}/prices") async def get_site_prices( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pool)], date_str: str | None = Query(None, alias="date", description="YYYY-MM-DD, default today"), ) -> list[dict[str, Any]]: if date_str is None: date_str = date.today().isoformat() d = _parse_ymd(date_str) async with db.acquire() as conn: site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") rows = await conn.fetch( """ SELECT * FROM ems.vw_site_effective_price WHERE site_id = $1 AND interval_start::date = $2::date ORDER BY interval_start """, site_id, d, ) return [record_to_dict(r) for r in rows] class PricesImportResponse(BaseModel): slots_imported: int date: str first_price_czk_kwh: float class PricesLatestResponse(BaseModel): latest_date: str slots: int min_price: float max_price: float avg_price: float class ForecastRunResponse(BaseModel): intervals_saved: int pv_arrays: int class ModbusCommandVerifyItem(BaseModel): id: int asset_code: str register_name: str | None value_to_write: int value_verified: int | None status: str class ModbusVerifyResponse(BaseModel): checked: int verified: int mismatch: int commands: list[ModbusCommandVerifyItem] async def _refresh_negative_price_predictions(conn: asyncpg.Connection, site_id: int) -> None: """Po importu cen / forecastu obnoví cache predikce záporných cen.""" try: await conn.fetch("SELECT * FROM ems.fn_predict_negative_price_windows($1, 7)", site_id) except Exception: logger.warning( "fn_predict_negative_price_windows failed for site %s", site_id, exc_info=True, ) @sites_router.post( "/{site_id}/prices/import", response_model=PricesImportResponse, summary="Import OTE cen (globální)", description=( "Zapíše do sdílené tabulky ems.market_interval_price (jedna sada dat pro všechny lokality). " "site_id v cestě slouží ke kontrole existence lokality (kompatibilita s UI); po importu se " "obnoví predikce záporných cen pro všechny aktivní lokality." ), ) async def post_import_site_prices( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pool)], date_str: str | None = Query( None, alias="date", description="YYYY-MM-DD; výchozí = zítřek/dnes dle logiky OTE (Europe/Prague)", ), ) -> PricesImportResponse: target: date | None = _parse_ymd(date_str) if date_str is not None else None import_error: str | None = None async with db.acquire() as conn: site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") n, day, first_price, import_error = await import_ote_prices( conn, site_id=None, target_date=target ) if n >= 0: sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") for site in sites: await _refresh_negative_price_predictions(conn, int(site["id"])) if n < 0: raise HTTPException( status_code=422, detail=f"OTE import selhal ({import_error or 'unknown'})", ) return PricesImportResponse( slots_imported=n, date=day, first_price_czk_kwh=first_price, ) class NegPricePredictionItem(BaseModel): predicted_date: str window_start_hour: int window_end_hour: int probability_pct: float expected_min_price: float | None reason: str class NegativePredictionsResponse(BaseModel): predictions: list[NegPricePredictionItem] insufficient_history: bool @sites_router.get( "/{site_id}/prices/negative-predictions", response_model=NegativePredictionsResponse, ) async def get_site_negative_price_predictions( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pool)], ) -> NegativePredictionsResponse: """Cache predikce záporných cen (per site) + informace, zda je dost historie OTE.""" async with db.acquire() as conn: site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") ndays = await conn.fetchval( """ SELECT COUNT(DISTINCT (interval_start AT TIME ZONE 'Europe/Prague')::date)::int FROM ems.market_interval_price WHERE market_source IN ('OTE_CZ', 'OTE_CZ_DAM') AND interval_start >= now() - INTERVAL '400 days' """ ) rows = await conn.fetch( """ SELECT p.predicted_date, p.window_start_hour, p.window_end_hour, p.probability_pct, p.expected_min_price, p.reason FROM ems.predicted_negative_price_window p WHERE p.site_id = $1 AND p.predicted_date > ( CURRENT_TIMESTAMP AT TIME ZONE COALESCE( NULLIF((SELECT timezone FROM ems.site WHERE id = $1), ''), 'Europe/Prague' ) )::date AND p.predicted_date <= ( CURRENT_TIMESTAMP AT TIME ZONE COALESCE( NULLIF((SELECT timezone FROM ems.site WHERE id = $1), ''), 'Europe/Prague' ) )::date + 7 ORDER BY p.predicted_date, p.window_start_hour """, site_id, ) n_hist = int(ndays or 0) predictions: list[NegPricePredictionItem] = [] for r in rows: em = r["expected_min_price"] pd = r["predicted_date"] predictions.append( NegPricePredictionItem( predicted_date=pd.isoformat() if hasattr(pd, "isoformat") else str(pd), window_start_hour=int(r["window_start_hour"]), window_end_hour=int(r["window_end_hour"]), probability_pct=float(r["probability_pct"]), expected_min_price=float(em) if em is not None else None, reason=r["reason"] if r["reason"] is not None else "", ) ) return NegativePredictionsResponse( predictions=predictions, insufficient_history=n_hist < 28, ) @sites_router.get("/{site_id}/prices/latest", response_model=PricesLatestResponse) async def get_site_prices_latest( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pool)], ) -> PricesLatestResponse: async with db.acquire() as conn: site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") row = await conn.fetchrow( """ SELECT (interval_start AT TIME ZONE 'Europe/Prague')::date AS day, COUNT(*)::int AS slots, MIN(buy_raw_price_czk_kwh)::float AS min_price, MAX(buy_raw_price_czk_kwh)::float AS max_price, AVG(buy_raw_price_czk_kwh)::float AS avg_price FROM ems.market_interval_price WHERE market_source IN ('OTE_CZ', 'OTE_CZ_DAM') GROUP BY day ORDER BY day DESC LIMIT 1 """ ) if row is None or row["day"] is None: raise HTTPException(status_code=404, detail="Žádná tržní data v databázi") return PricesLatestResponse( latest_date=row["day"].isoformat(), slots=int(row["slots"] or 0), min_price=float(row["min_price"] or 0.0), max_price=float(row["max_price"] or 0.0), avg_price=float(row["avg_price"] or 0.0), ) @sites_router.get("/{site_id}/control/verify", response_model=ModbusVerifyResponse) async def get_verify_modbus_commands( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pool)], minutes: int = Query(10, ge=1, le=1440, description="Jak daleko zpět hledat written příkazy"), ) -> ModbusVerifyResponse: """ Ruční ověření Modbus zápisů (written) z posledních N minut. Vhodné hned po manuálním exportu setpointů. """ async with db.acquire() as conn: site_ok = await conn.fetchval( "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id ) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") lookback = timedelta(minutes=minutes) rows = await conn.fetch( """ SELECT id FROM ems.modbus_command WHERE site_id = $1 AND status = 'written' AND written_at >= now() - $2::interval ORDER BY written_at """, site_id, lookback, ) ids = [int(r["id"]) for r in rows] checked = len(ids) if ids: await verify_modbus_commands(ids, conn, site_id) detail_rows = ( await conn.fetch( """ SELECT id, asset_code, register_name, value_to_write, value_verified, status FROM ems.modbus_command WHERE id = ANY($1::int[]) ORDER BY id """, ids, ) if ids else [] ) commands = [ ModbusCommandVerifyItem( id=int(r["id"]), asset_code=r["asset_code"], register_name=r["register_name"], value_to_write=int(r["value_to_write"]), value_verified=int(r["value_verified"]) if r["value_verified"] is not None else None, status=r["status"], ) for r in detail_rows ] verified = sum(1 for c in commands if c.status == "verified") mismatch = sum(1 for c in commands if c.status == "mismatch") return ModbusVerifyResponse( checked=checked, verified=verified, mismatch=mismatch, commands=commands, ) class DeyeRegistersLiveResponse(BaseModel): reg108_charge_a: int reg109_discharge_a: int reg141_energy_mode: int reg142_limit_control: int reg143_export_limit_w: int reg178_peak_shaving_switch: int reg191_peak_shaving_w: int read_at: str @sites_router.get( "/{site_id}/control/registers", response_model=DeyeRegistersLiveResponse, ) async def get_control_registers_live( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pool)], ) -> DeyeRegistersLiveResponse: """Živé hodnoty registrů Deye 108/109/141/142/143/178/191 přes sdílený Modbus klient.""" async with db.acquire() as conn: site_ok = await conn.fetchval( "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id ) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") try: payload = await read_deye_registers_live(site_id, conn) except ValueError: raise HTTPException( status_code=404, detail="No controllable Modbus inverter for this site", ) from None except Exception as e: logger.warning("get_control_registers_live site=%s: %s", site_id, e) raise HTTPException( status_code=503, detail=f"Modbus read failed: {e}", ) from e return DeyeRegistersLiveResponse(**payload) class ModbusJournalCommandRow(BaseModel): id: int register: int register_name: str | None value_to_write: int value_written: int | None value_verified: int | None status: str attempt_count: int created_at: str class ModbusJournalListResponse(BaseModel): commands: list[ModbusJournalCommandRow] @sites_router.get( "/{site_id}/control/journal", response_model=ModbusJournalListResponse, ) async def get_control_command_journal( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pool)], limit: int = Query(50, ge=1, le=100), ) -> ModbusJournalListResponse: async with db.acquire() as conn: site_ok = await conn.fetchval( "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id ) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") rows = await conn.fetch( """ SELECT id, register, register_name, value_to_write, value_written, value_verified, status, attempt_count, created_at FROM ems.modbus_command WHERE site_id = $1 ORDER BY created_at DESC LIMIT $2 """, site_id, limit, ) cmds: list[ModbusJournalCommandRow] = [] for r in rows: d = record_to_dict(r) ca = d["created_at"] cmds.append( ModbusJournalCommandRow( id=int(d["id"]), register=int(d["register"]), register_name=d.get("register_name"), value_to_write=int(d["value_to_write"]), value_written=int(d["value_written"]) if d.get("value_written") is not None else None, value_verified=int(d["value_verified"]) if d.get("value_verified") is not None else None, status=str(d["status"]), attempt_count=int(d["attempt_count"]), created_at=ca if isinstance(ca, str) else str(ca), ) ) return ModbusJournalListResponse(commands=cmds) @sites_router.post("/{site_id}/forecast/run", response_model=ForecastRunResponse) async def post_run_site_forecast( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pool)], ) -> ForecastRunResponse: async with db.acquire() as conn: site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") try: intervals, pv_arrays = await fetch_pv_forecast(site_id, conn) except Exception as e: logger.error("Forecast failed: %s", e, exc_info=True) raise HTTPException(status_code=422, detail=str(e)) from e if intervals >= 0: await _refresh_negative_price_predictions(conn, site_id) if intervals < 0: raise HTTPException( status_code=422, detail="Forecast se nepodařilo stáhnout nebo zpracovat", ) return ForecastRunResponse(intervals_saved=intervals, pv_arrays=pv_arrays) @sites_router.get("/{site_id}/forecast/pv") async def get_site_forecast_pv( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pool)], date_str: str | None = Query(None, alias="date", description="YYYY-MM-DD, default tomorrow"), ) -> dict[str, list[dict[str, Any]]]: if date_str is None: date_str = (date.today() + timedelta(days=1)).isoformat() d = _parse_ymd(date_str) async with db.acquire() as conn: site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") rows = await conn.fetch( """ SELECT run_id, pv_array_id, interval_start, power_w, irradiance_wm2, temp_c, pv_array_code, controllable FROM ( SELECT DISTINCT ON (fpi.interval_start, fpr.pv_array_id) fpi.run_id, fpi.pv_array_id, fpi.interval_start, fpi.power_w, fpi.irradiance_wm2, fpi.temp_c, apa.code AS pv_array_code, apa.controllable FROM ems.forecast_pv_interval fpi JOIN ems.forecast_pv_run fpr ON fpr.id = fpi.run_id JOIN ems.asset_pv_array apa ON apa.id = fpr.pv_array_id AND apa.site_id = fpr.site_id WHERE fpr.site_id = $1 AND ( fpi.interval_start AT TIME ZONE COALESCE( (SELECT timezone FROM ems.site WHERE id = $1), 'Europe/Prague' ) )::date = $2::date AND fpr.status = 'ok' ORDER BY fpi.interval_start, fpr.pv_array_id, fpr.created_at DESC ) latest ORDER BY controllable DESC, pv_array_code, interval_start """, site_id, d, ) # pv_a = řiditelná pole (curtailment / Deye), pv_b = neřízená (GEN, …) — sloučí více orientací pv_a: list[dict[str, Any]] = [] pv_b: list[dict[str, Any]] = [] for r in rows: item = record_to_dict(r) item.pop("controllable", None) if r["controllable"]: pv_a.append(item) else: pv_b.append(item) return {"pv_a": pv_a, "pv_b": pv_b} me_router = APIRouter(prefix="/api/v1/me", tags=["me"]) @me_router.get( "/sites", summary="Lokality přihlášeného uživatele (fáze bez auth)", description="Aktuálně vrací všechny aktivní lokality; po zavedení autentizace se odfiltruje podle oprávnění.", ) async def list_my_sites( db: Annotated[asyncpg.Pool, Depends(get_pool)], ) -> list[dict[str, Any]]: async with db.acquire() as conn: rows = await conn.fetch( """ SELECT id, code, name, timezone, latitude, longitude, active, notes, created_at FROM ems.site WHERE active = true ORDER BY code """ ) return [record_to_dict(r) for r in rows] app.include_router(me_router) app.include_router(sites_router) app.add_middleware( CORSMiddleware, allow_origins=os.getenv("CORS_ORIGINS", "http://localhost:5173,http://127.0.0.1:5173").split(","), allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.websocket("/ws/telemetry") async def ws_telemetry(websocket: WebSocket) -> None: await manager.connect_telemetry(websocket) try: while True: await websocket.receive_text() # keepalive except WebSocketDisconnect: manager.disconnect(websocket) @app.websocket("/ws/logs") async def ws_logs(websocket: WebSocket) -> None: await manager.connect_logs(websocket) try: while True: await websocket.receive_text() except WebSocketDisconnect: manager.disconnect(websocket) async def _health_payload(db: asyncpg.Pool) -> dict[str, Any]: db_status = "error" active_plan_slots = 0 try: async with db.acquire() as conn: await conn.fetchval("SELECT 1") db_status = "ok" active_plan_slots = int( await conn.fetchval( """ SELECT COUNT(*)::bigint FROM ems.planning_interval pi INNER JOIN ems.planning_run pr ON pr.id = pi.run_id WHERE pr.status = 'active' """ ) or 0 ) except Exception as e: logger.warning("health DB check failed: %s", e) db_status = "error" return { "status": "ok" if db_status == "ok" else "degraded", "db": db_status, "timestamp": datetime.now(timezone.utc).isoformat(), "active_plan_slots": active_plan_slots, } @app.get("/health") @app.get("/api/v1/health") async def health(db: Annotated[asyncpg.Pool, Depends(get_pool)]) -> dict[str, Any]: return await _health_payload(db) @app.get("/api/v1/health/detailed") async def health_detailed( request: Request, db: Annotated[asyncpg.Pool, Depends(get_pool)], ) -> dict[str, Any]: db_status: Literal["ok", "error"] = "error" last_telemetry_age_sec = -1 last_plan_age_sec = -1 try: async with db.acquire() as conn: await conn.fetchval("SELECT 1") db_status = "ok" tel = await conn.fetchval( """ SELECT CASE WHEN MAX(measured_at) IS NULL THEN -1 ELSE GREATEST(0, EXTRACT(EPOCH FROM (now() - MAX(measured_at)))::int) END FROM ems.telemetry_inverter """ ) if tel is not None: last_telemetry_age_sec = int(tel) plan_age = await conn.fetchval( """ SELECT CASE WHEN MAX(pr.created_at) IS NULL THEN -1 ELSE GREATEST(0, EXTRACT(EPOCH FROM (now() - MAX(pr.created_at)))::int) END FROM ems.planning_run pr WHERE pr.status = 'active' """ ) if plan_age is not None: last_plan_age_sec = int(plan_age) except Exception as e: logger.warning("health detailed DB check failed: %s", e) db_status = "error" sched_state: Literal["running", "stopped"] = "running" if scheduler.running else "stopped" t_task = getattr(request.app.state, "telemetry_task", None) tel_loop: Literal["running", "stopped"] = ( "running" if t_task is not None and not t_task.done() else "stopped" ) active_jobs: list[dict[str, Any]] = [] for job in scheduler.get_jobs(): nrt = job.next_run_time active_jobs.append( { "id": str(job.id), "next_run_time": nrt.isoformat() if nrt is not None else None, } ) return { "db": db_status, "scheduler": sched_state, "telemetry_loop": tel_loop, "last_telemetry_age_sec": last_telemetry_age_sec, "last_plan_age_sec": last_plan_age_sec, "active_jobs": active_jobs, } class SetSiteModeBody(BaseModel): mode: str = Field(..., min_length=1) notes: str | None = None valid_until: datetime | None = None class SetSiteModeResponse(BaseModel): success: bool mode: str activated_at: datetime @app.post("/api/v1/sites/{site_id}/mode", response_model=SetSiteModeResponse) async def set_site_mode( site_id: int, body: SetSiteModeBody, db: Annotated[asyncpg.Pool, Depends(get_pool)], ) -> SetSiteModeResponse: mode = body.mode.strip().upper() allowed = {"AUTO", "SELF_SUSTAIN", "CHARGE_CHEAP", "PRESERVE", "MANUAL"} if mode not in allowed: raise HTTPException(status_code=400, detail=f"Unsupported mode: {body.mode}") async with db.acquire() as conn: site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") try: await run_fn_set_mode_with_discord( conn, site_id, mode, "user:api", body.valid_until, body.notes, ) except asyncpg.PostgresError as e: logger.warning("fn_set_mode failed: %s", e) raise HTTPException(status_code=400, detail=str(e)) from e row = await conn.fetchrow( """ SELECT m.mode_code, m.activated_at, d.loxone_mode_value FROM ems.site_operating_mode m JOIN ems.operating_mode_def d ON d.code = m.mode_code WHERE m.site_id = $1 """, site_id, ) if row is None: raise HTTPException(status_code=500, detail="Mode row missing after set") ep = await conn.fetchrow( """ SELECT host, port, protocol FROM ems.site_endpoint WHERE site_id = $1 AND endpoint_type = 'loxone_http' AND enabled = true ORDER BY id LIMIT 1 """, site_id, ) activated_at: datetime = row["activated_at"] if activated_at.tzinfo is None: activated_at = activated_at.replace(tzinfo=timezone.utc) loxone_val: int | None = row["loxone_mode_value"] if ep and loxone_val is not None: proto = (ep["protocol"] or "http").lower() if proto not in ("http", "https"): proto = "http" host = ep["host"] port = int(ep["port"] or (443 if proto == "https" else 80)) base = f"{proto}://{host}:{port}" url = f"{base}/dev/sps/io/EMS_Mode/{loxone_val}" user = os.getenv("LOXONE_USER") or "" password = os.getenv("LOXONE_PASSWORD") or "" auth = (user, password) if user else None try: async with httpx.AsyncClient(timeout=10.0) as client: r = await client.get(url, auth=auth) r.raise_for_status() except Exception as e: logger.warning("Loxone EMS_Mode notify failed for site %s: %s", site_id, e) return SetSiteModeResponse(success=True, mode=row["mode_code"], activated_at=activated_at)