diff --git a/CLAUDE.md b/CLAUDE.md index 8d17aaf..3af4875 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -67,6 +67,7 @@ Multi-site Energy Management System: optimalizuje FVE, baterii a flexibilní zá ### SQL vs Python (read-model) - **Žádné ad-hoc `SELECT`/`INSERT`/`UPDATE` v `backend/services/*.py` a `backend/app/routers/*.py`** kromě: existence `SELECT 1` / `EXISTS`, volání `select ems.fn_*(…)`, a čtení z **`ems.vw_*`**. IO (Modbus, HTTP), PuLP solver a orchestrace zůstávají v Pythonu. +- **Health a Loxone po změně režimu:** `fn_health_summary`, `fn_health_detailed_db`, `fn_vw_site_directory_active`, `fn_site_economics_yesterday_notification`, `fn_site_mode_loxone_bundle` v repeatable `db/routines/R__073_fn_health_site_jobs_mode_bundle.sql`; FastAPI je v [`app/main.py`](backend/app/main.py) + joby v [`app/lifespan.py`](backend/app/lifespan.py). ### Provozní režimy (operating_mode) diff --git a/backend/app/lifespan.py b/backend/app/lifespan.py new file mode 100644 index 0000000..b740425 --- /dev/null +++ b/backend/app/lifespan.py @@ -0,0 +1,461 @@ +"""FastAPI lifespan: DB pool, APScheduler joby, telemetrie.""" + +from __future__ import annotations + +import asyncio +import logging +import os +from contextlib import asynccontextmanager +from datetime import date, datetime, timedelta, timezone +from typing import Any + +import asyncpg +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from fastapi import FastAPI +from zoneinfo import ZoneInfo + +from app.db_json import fetch_json +from app.deps import set_pg_pool +from app.refresh_negative_prices import refresh_negative_price_predictions +from app.ws_log_handler import WSLogHandler +from services.audit_filler import fill_audit_for_completed_intervals +from services.control_exporter import export_setpoints, verify_modbus_commands +from services.forecast_service import fetch_pv_forecast +from services.heartbeat_service import send_heartbeat +from services.notification_service import notify_operating_mode_changed +from services.price_importer import import_ote_prices, ote_prague_day_slots_look_complete +from services.telemetry_collector import run_telemetry_loop_wrapper + +logger = logging.getLogger(__name__) + +scheduler = AsyncIOScheduler(timezone=ZoneInfo("Europe/Prague")) + + +def _dsn() -> str: + host = os.getenv("DB_HOST", "localhost") + port = os.getenv("DB_PORT", "5432") + name = os.getenv("DB_NAME", "ems") + user = os.getenv("DB_USER", "ems_user") + password = os.getenv("DB_PASSWORD", "") + return f"postgresql://{user}:{password}@{host}:{port}/{name}" + + +async def _active_site_rows(conn: asyncpg.Connection) -> list[dict[str, Any]]: + raw = await fetch_json(conn, "select ems.fn_vw_site_directory_active()") + if not isinstance(raw, list): + return [] + return [x for x in raw if isinstance(x, dict)] + + +@asynccontextmanager +async def lifespan(app: FastAPI): + pg_pool = await asyncpg.create_pool(_dsn(), min_size=1, max_size=5) + set_pg_pool(pg_pool) + app.state.pg_pool = pg_pool + + app.state.ws_log_handler = WSLogHandler() + app.state.ws_log_handler.setLevel(logging.INFO) + logging.getLogger().addHandler(app.state.ws_log_handler) + + from services.planning_engine import run_daily_plan, run_rolling_replan + + async def scheduled_heartbeat() -> None: + async with app.state.pg_pool.acquire() as conn: + for site in await _active_site_rows(conn): + try: + await send_heartbeat(int(site["id"]), conn) + except Exception: + logger.exception("scheduled_heartbeat site=%s failed", site["id"]) + + async def scheduled_audit_filler() -> None: + async with app.state.pg_pool.acquire() as conn: + for site in await _active_site_rows(conn): + try: + await fill_audit_for_completed_intervals(int(site["id"]), conn) + except Exception: + logger.exception("scheduled_audit_filler site=%s failed", site["id"]) + + async def scheduled_forecast_accuracy() -> None: + async with app.state.pg_pool.acquire() as conn: + for site in await _active_site_rows(conn): + try: + n = await conn.fetchval( + "SELECT ems.fn_fill_forecast_accuracy($1, 48)", + site["id"], + ) + if n: + logger.info( + "forecast_accuracy filled %s slots for site %s", + n, + site["id"], + ) + except Exception: + logger.exception( + "scheduled_forecast_accuracy site=%s failed", site["id"] + ) + + async def scheduled_expire_modes() -> None: + async with app.state.pg_pool.acquire() as conn: + try: + rows = await conn.fetch("SELECT * FROM ems.fn_expire_modes()") + for r in rows: + await notify_operating_mode_changed( + str(r["site_code"]), + str(r["old_mode"]), + str(r["new_mode"]), + "system:expiry", + "Automatické vypršení dočasného režimu", + ) + except Exception: + logger.exception("scheduled_expire_modes failed") + + async def scheduled_control_export() -> None: + async with app.state.pg_pool.acquire() as conn: + for site in await _active_site_rows(conn): + try: + await export_setpoints(int(site["id"]), conn) + except Exception as e: + logger.exception( + "scheduled_control_export site=%s: %s", site["id"], e + ) + + async def scheduled_verify_modbus() -> None: + """ + Ověří příkazy ve stavu written z posledních 20 minut. + Běží každé 2 minuty, nezávisle na control_exporter (delší okno kvůli zpoždění jobu). + """ + async with app.state.pg_pool.acquire() as conn: + for site in await _active_site_rows(conn): + site_id = int(site["id"]) + try: + id_json = await fetch_json( + conn, + "select ems.fn_modbus_written_command_ids($1::int, interval '20 minutes')", + site_id, + ) + if not isinstance(id_json, list): + id_json = [] + ids = [int(x) for x in id_json] + if ids: + await verify_modbus_commands(ids, conn, site_id) + except Exception: + logger.exception("scheduled_verify_modbus site=%s failed", site_id) + + async def scheduled_daily_plan() -> None: + async with app.state.pg_pool.acquire() as conn: + for site in await _active_site_rows(conn): + site_id = int(site["id"]) + try: + await run_daily_plan(site_id, conn) + await export_setpoints(site_id, conn) + except Exception: + logger.exception("scheduled_daily_plan site=%s failed", site_id) + + async def scheduled_rolling_replan() -> None: + async with app.state.pg_pool.acquire() as conn: + for site in await _active_site_rows(conn): + site_id = int(site["id"]) + try: + await run_rolling_replan(site_id, conn) + await export_setpoints(site_id, conn) + except Exception: + logger.exception("scheduled_rolling_replan site=%s failed", site_id) + + async def scheduled_baseline_update() -> None: + async with app.state.pg_pool.acquire() as conn: + for site in await _active_site_rows(conn): + try: + n = await conn.fetchval( + "SELECT ems.fn_update_baseline_stats($1, 30)", + site["id"], + ) + logger.info( + "baseline_stats updated %s rows for site %s", + n, + site["id"], + ) + except Exception: + logger.exception( + "scheduled_baseline_update site=%s failed", site["id"] + ) + + async def scheduled_market_price_stats() -> None: + async with app.state.pg_pool.acquire() as conn: + for site in await _active_site_rows(conn): + try: + n = await conn.fetchval( + "SELECT ems.fn_update_market_price_stats($1, 90)", + site["id"], + ) + logger.info( + "market_price_stats updated %s rows site=%s", + n, + site["id"], + ) + except Exception: + logger.exception( + "scheduled_market_price_stats site=%s failed", site["id"] + ) + + async def scheduled_tuv_usage_stats() -> None: + async with app.state.pg_pool.acquire() as conn: + for site in await _active_site_rows(conn): + try: + n = await conn.fetchval( + "SELECT ems.fn_update_tuv_usage_stats($1, 30)", + site["id"], + ) + logger.info( + "tuv_usage_stats updated %s rows site=%s", + n, + site["id"], + ) + except Exception: + logger.exception( + "scheduled_tuv_usage_stats site=%s failed", site["id"] + ) + + async def scheduled_forecast_refresh() -> None: + async with app.state.pg_pool.acquire() as conn: + for site in await _active_site_rows(conn): + site_id = int(site["id"]) + try: + intervals, pv_arrays = await fetch_pv_forecast(site_id, conn) + if intervals >= 0: + logger.info( + "scheduled_forecast_refresh site=%s intervals=%s arrays=%s", + site_id, + intervals, + pv_arrays, + ) + await refresh_negative_price_predictions(conn, site_id) + else: + logger.warning( + "scheduled_forecast_refresh site=%s failed", + site_id, + ) + except Exception: + logger.exception("scheduled_forecast_refresh site=%s failed", site_id) + + async def _count_ote_slots_for_day( + conn: asyncpg.Connection, target_day: date + ) -> int: + return int( + await conn.fetchval( + """ + SELECT COUNT(*)::int + FROM ems.market_interval_price + WHERE market_source = 'OTE_CZ' + AND interval_start::date = $1::date + """, + target_day, + ) + or 0 + ) + + async def _refresh_negative_price_predictions_all_active( + conn: asyncpg.Connection, + ) -> None: + for site in await _active_site_rows(conn): + await refresh_negative_price_predictions(conn, int(site["id"])) + + async def _scheduled_ote_import_global(conn: asyncpg.Connection) -> None: + """Jeden OTE fetch na chybějící den; market_interval_price je globální pro všechny site.""" + prague_tz = ZoneInfo("Europe/Prague") + now_loc = datetime.now(prague_tz) + today = now_loc.date() + tomorrow = today + timedelta(days=1) + any_import_ok = False + + for day in (today, tomorrow): + slots = await _count_ote_slots_for_day(conn, day) + if ote_prague_day_slots_look_complete(slots): + continue + n, imported_day, _, err = await import_ote_prices( + conn, site_id=None, target_date=day + ) + if n < 0: + logger.warning( + "scheduled_ote_import_global day=%s failed (%s)", + day.isoformat(), + err, + ) + continue + logger.info( + "scheduled_ote_import_global day=%s imported=%s slots", + imported_day, + n, + ) + any_import_ok = True + + if any_import_ok: + await _refresh_negative_price_predictions_all_active(conn) + + async def scheduled_ote_import() -> None: + async with app.state.pg_pool.acquire() as conn: + try: + await _scheduled_ote_import_global(conn) + except Exception: + logger.exception("scheduled_ote_import_global failed") + + scheduler.add_job(scheduled_heartbeat, "interval", seconds=60, id="heartbeat") + scheduler.add_job( + scheduled_audit_filler, + "cron", + minute="1,16,31,46", + second=0, + id="audit_filler", + ) + scheduler.add_job( + scheduled_forecast_accuracy, + "cron", + minute="2,17,32,47", + id="forecast_accuracy", + replace_existing=True, + ) + scheduler.add_job(scheduled_expire_modes, "interval", minutes=1, id="expire_modes") + scheduler.add_job( + scheduled_control_export, + "cron", + minute="14,29,44,59", + second=0, + id="control_export", + ) + scheduler.add_job( + scheduled_verify_modbus, + "interval", + minutes=2, + id="verify_modbus", + replace_existing=True, + ) + scheduler.add_job(scheduled_daily_plan, "cron", hour=15, minute=0, id="daily_plan") + scheduler.add_job( + scheduled_rolling_replan, + "cron", + minute="*/15", + id="rolling_replan", + ) + scheduler.add_job( + scheduled_baseline_update, + "cron", + hour=0, + minute=30, + id="baseline_update", + replace_existing=True, + ) + scheduler.add_job( + scheduled_market_price_stats, + "cron", + hour=14, + minute=45, + id="market_price_stats", + replace_existing=True, + ) + scheduler.add_job( + scheduled_tuv_usage_stats, + "cron", + hour=0, + minute=45, + id="tuv_usage_stats", + replace_existing=True, + ) + scheduler.add_job( + scheduled_ote_import, + "cron", + hour=13, + minute=30, + id="ote_import_preopen", + replace_existing=True, + ) + scheduler.add_job( + scheduled_ote_import, + "cron", + hour=14, + minute=0, + id="ote_import_main", + replace_existing=True, + ) + scheduler.add_job( + scheduled_ote_import, + "cron", + hour=0, + minute=5, + id="ote_import_backfill", + replace_existing=True, + ) + scheduler.add_job( + scheduled_forecast_refresh, + "cron", + hour="*/2", + minute=5, + id="forecast_refresh_2h", + replace_existing=True, + ) + + async def scheduled_daily_economics_notification() -> None: + from services.notification_service import notify_daily_economics + + async with app.state.pg_pool.acquire() as conn: + for site in await _active_site_rows(conn): + site_id = int(site["id"]) + site_code = str(site["code"]) + try: + row = await fetch_json( + conn, + "select ems.fn_site_economics_yesterday_notification($1::int)", + site_id, + ) + if row is None or not isinstance(row, dict) or not row: + continue + yesterday = ( + datetime.now(ZoneInfo("Europe/Prague")) - timedelta(days=1) + ).strftime("%Y-%m-%d") + await notify_daily_economics( + site_code=site_code, + day=yesterday, + import_kwh=float(row.get("import_kwh") or 0), + import_cost=float(row.get("import_cost_czk") or 0), + export_kwh=float(row.get("export_kwh") or 0), + export_revenue=float(row.get("export_revenue_czk") or 0), + green_bonus=float(row.get("green_bonus_czk") or 0), + total_balance=float(row.get("total_balance_czk") or 0), + planned_balance=float(row["planned_balance_czk"]) + if row.get("planned_balance_czk") is not None + else None, + ) + except Exception: + logger.exception( + "scheduled_daily_economics_notification site=%s failed", + site_id, + ) + + scheduler.add_job( + scheduled_daily_economics_notification, + "cron", + hour=7, + minute=0, + id="daily_economics_notification", + replace_existing=True, + ) + scheduler.start() + + telemetry_task = asyncio.create_task(run_telemetry_loop_wrapper(app.state.pg_pool)) + app.state.telemetry_task = telemetry_task + + yield + + ws_h = getattr(app.state, "ws_log_handler", None) + if ws_h is not None: + logging.getLogger().removeHandler(ws_h) + app.state.ws_log_handler = None + + telemetry_task.cancel() + try: + await telemetry_task + except asyncio.CancelledError: + pass + + scheduler.shutdown(wait=False) + set_pg_pool(None) + app.state.pg_pool = None + await pg_pool.close() diff --git a/backend/app/main.py b/backend/app/main.py index 610b530..d5400a1 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -1,528 +1,33 @@ -"""EMS FastAPI – health, provozní režimy, PostgREST doplňky.""" +"""EMS FastAPI – routery, health, režim, WebSockety.""" from __future__ import annotations -import asyncio -import json import logging import os -from contextlib import asynccontextmanager -from datetime import date, datetime, timedelta, timezone +from datetime import datetime, timezone from typing import Annotated, Any, Literal -from zoneinfo import ZoneInfo import asyncpg import httpx -from apscheduler.schedulers.asyncio import AsyncIOScheduler -from app.db_json import fetch_json, record_to_dict -from app.deps import set_pg_pool +from app.db_json import fetch_json +from app.deps import get_pg_pool +from app.lifespan import lifespan, scheduler from app.routers.economics import router as economics_router from app.routers.energy_flows import router as energy_flows_router from app.routers.ev import router as ev_router from app.routers.full_status import router as full_status_router +from app.routers.me import router as me_router from app.routers.plan import router as plan_router from app.routers.site_configuration import router as site_configuration_router -from app.ws_log_handler import WSLogHandler +from app.routers.sites import router as sites_router from app.ws_manager import manager -from fastapi import ( - APIRouter, - Depends, - FastAPI, - HTTPException, - Query, - Request, - WebSocket, - WebSocketDisconnect, -) +from fastapi import Depends, FastAPI, HTTPException, Request, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware -from services.audit_filler import fill_audit_for_completed_intervals -from services.control_exporter import ( - export_setpoints, - read_deye_registers_live, - verify_modbus_commands, -) -from services.heartbeat_service import send_heartbeat -from services.forecast_service import fetch_pv_forecast -from services.notification_service import ( - notify_operating_mode_changed, - run_fn_set_mode_with_discord, -) -from services.price_importer import import_ote_prices, ote_prague_day_slots_look_complete -from services.telemetry_collector import run_telemetry_loop_wrapper from pydantic import BaseModel, Field +from services.notification_service import run_fn_set_mode_with_discord logger = logging.getLogger(__name__) - -def _dsn() -> str: - host = os.getenv("DB_HOST", "localhost") - port = os.getenv("DB_PORT", "5432") - name = os.getenv("DB_NAME", "ems") - user = os.getenv("DB_USER", "ems_user") - password = os.getenv("DB_PASSWORD", "") - return f"postgresql://{user}:{password}@{host}:{port}/{name}" - - -pool: asyncpg.Pool | None = None - - -async def get_pool() -> asyncpg.Pool: - if pool is None: - raise HTTPException(status_code=503, detail="Database pool not ready") - return pool - - -# Cron hodiny/minuty = Europe/Prague (import OTE 13:30 / 14:00, denní plán 15:00, …) -scheduler = AsyncIOScheduler(timezone=ZoneInfo("Europe/Prague")) - - -@asynccontextmanager -async def lifespan(app: FastAPI): - global pool - pool = await asyncpg.create_pool(_dsn(), min_size=1, max_size=5) - set_pg_pool(pool) - app.state.pg_pool = pool - - app.state.ws_log_handler = WSLogHandler() - app.state.ws_log_handler.setLevel(logging.INFO) - logging.getLogger().addHandler(app.state.ws_log_handler) - - from services.planning_engine import run_daily_plan, run_rolling_replan - - async def scheduled_heartbeat() -> None: - async with app.state.pg_pool.acquire() as conn: - sites = await conn.fetch("select id from ems.vw_site_directory where active = true") - for site in sites: - try: - await send_heartbeat(site["id"], conn) - except Exception: - logger.exception("scheduled_heartbeat site=%s failed", site["id"]) - - async def scheduled_audit_filler() -> None: - async with app.state.pg_pool.acquire() as conn: - sites = await conn.fetch("select id from ems.vw_site_directory where active = true") - for site in sites: - try: - await fill_audit_for_completed_intervals(site["id"], conn) - except Exception: - logger.exception("scheduled_audit_filler site=%s failed", site["id"]) - - async def scheduled_forecast_accuracy() -> None: - async with app.state.pg_pool.acquire() as conn: - sites = await conn.fetch("select id from ems.vw_site_directory where active = true") - for site in sites: - try: - n = await conn.fetchval( - "SELECT ems.fn_fill_forecast_accuracy($1, 48)", - site["id"], - ) - if n: - logger.info( - "forecast_accuracy filled %s slots for site %s", - n, - site["id"], - ) - except Exception: - logger.exception( - "scheduled_forecast_accuracy site=%s failed", site["id"] - ) - - async def scheduled_expire_modes() -> None: - async with app.state.pg_pool.acquire() as conn: - try: - rows = await conn.fetch("SELECT * FROM ems.fn_expire_modes()") - for r in rows: - await notify_operating_mode_changed( - str(r["site_code"]), - str(r["old_mode"]), - str(r["new_mode"]), - "system:expiry", - "Automatické vypršení dočasného režimu", - ) - except Exception: - logger.exception("scheduled_expire_modes failed") - - async def scheduled_control_export() -> None: - async with app.state.pg_pool.acquire() as conn: - sites = await conn.fetch("select id from ems.vw_site_directory where active = true") - for site in sites: - try: - await export_setpoints(site["id"], conn) - except Exception as e: - logger.exception("scheduled_control_export site=%s: %s", site["id"], e) - - async def scheduled_verify_modbus() -> None: - """ - Ověří příkazy ve stavu written z posledních 20 minut. - Běží každé 2 minuty, nezávisle na control_exporter (delší okno kvůli zpoždění jobu). - """ - async with app.state.pg_pool.acquire() as conn: - sites = await conn.fetch("select id from ems.vw_site_directory where active = true") - for site in sites: - try: - cmd_rows = await conn.fetch( - """ - SELECT id FROM ems.modbus_command - WHERE site_id = $1 - AND status = 'written' - AND written_at >= now() - INTERVAL '20 minutes' - ORDER BY written_at - """, - site["id"], - ) - if cmd_rows: - await verify_modbus_commands( - [int(r["id"]) for r in cmd_rows], - conn, - int(site["id"]), - ) - except Exception: - logger.exception( - "scheduled_verify_modbus site=%s failed", site["id"] - ) - - async def scheduled_daily_plan() -> None: - async with app.state.pg_pool.acquire() as conn: - sites = await conn.fetch("select id from ems.vw_site_directory where active = true") - for site in sites: - site_id = int(site["id"]) - try: - await run_daily_plan(site_id, conn) - # Aplikuj nový active run okamžitě, nečekej na další 15min tick exportu. - await export_setpoints(site_id, conn) - except Exception: - logger.exception("scheduled_daily_plan site=%s failed", site_id) - - async def scheduled_rolling_replan() -> None: - async with app.state.pg_pool.acquire() as conn: - sites = await conn.fetch("select id from ems.vw_site_directory where active = true") - for site in sites: - site_id = int(site["id"]) - try: - await run_rolling_replan(site_id, conn) - # Aplikuj nový active run okamžitě, nečekej na další 15min tick exportu. - await export_setpoints(site_id, conn) - except Exception: - logger.exception("scheduled_rolling_replan site=%s failed", site_id) - - async def scheduled_baseline_update() -> None: - async with app.state.pg_pool.acquire() as conn: - sites = await conn.fetch("select id from ems.vw_site_directory where active = true") - for site in sites: - try: - n = await conn.fetchval( - "SELECT ems.fn_update_baseline_stats($1, 30)", - site["id"], - ) - logger.info( - "baseline_stats updated %s rows for site %s", - n, - site["id"], - ) - except Exception: - logger.exception( - "scheduled_baseline_update site=%s failed", site["id"] - ) - - async def scheduled_market_price_stats() -> None: - async with app.state.pg_pool.acquire() as conn: - sites = await conn.fetch("select id from ems.vw_site_directory where active = true") - for site in sites: - try: - n = await conn.fetchval( - "SELECT ems.fn_update_market_price_stats($1, 90)", - site["id"], - ) - logger.info( - "market_price_stats updated %s rows site=%s", - n, - site["id"], - ) - except Exception: - logger.exception( - "scheduled_market_price_stats site=%s failed", site["id"] - ) - - async def scheduled_tuv_usage_stats() -> None: - async with app.state.pg_pool.acquire() as conn: - sites = await conn.fetch("select id from ems.vw_site_directory where active = true") - for site in sites: - try: - n = await conn.fetchval( - "SELECT ems.fn_update_tuv_usage_stats($1, 30)", - site["id"], - ) - logger.info( - "tuv_usage_stats updated %s rows site=%s", - n, - site["id"], - ) - except Exception: - logger.exception( - "scheduled_tuv_usage_stats site=%s failed", site["id"] - ) - - async def scheduled_forecast_refresh() -> None: - async with app.state.pg_pool.acquire() as conn: - sites = await conn.fetch("select id from ems.vw_site_directory where active = true") - for site in sites: - site_id = int(site["id"]) - try: - intervals, pv_arrays = await fetch_pv_forecast(site_id, conn) - if intervals >= 0: - logger.info( - "scheduled_forecast_refresh site=%s intervals=%s arrays=%s", - site_id, - intervals, - pv_arrays, - ) - await _refresh_negative_price_predictions(conn, site_id) - else: - logger.warning( - "scheduled_forecast_refresh site=%s failed", - site_id, - ) - except Exception: - logger.exception("scheduled_forecast_refresh site=%s failed", site_id) - - async def _count_ote_slots_for_day( - conn: asyncpg.Connection, target_day: date - ) -> int: - return int( - await conn.fetchval( - """ - SELECT COUNT(*)::int - FROM ems.market_interval_price - WHERE market_source = 'OTE_CZ' - AND interval_start::date = $1::date - """, - target_day, - ) - or 0 - ) - - async def _refresh_negative_price_predictions_all_active( - conn: asyncpg.Connection, - ) -> None: - sites = await conn.fetch("select id from ems.vw_site_directory where active = true") - for site in sites: - await _refresh_negative_price_predictions(conn, int(site["id"])) - - async def _scheduled_ote_import_global(conn: asyncpg.Connection) -> None: - """Jeden OTE fetch na chybějící den; market_interval_price je globální pro všechny site.""" - prague_tz = ZoneInfo("Europe/Prague") - now_loc = datetime.now(prague_tz) - today = now_loc.date() - tomorrow = today + timedelta(days=1) - any_import_ok = False - - for day in (today, tomorrow): - slots = await _count_ote_slots_for_day(conn, day) - if ote_prague_day_slots_look_complete(slots): - continue - n, imported_day, _, err = await import_ote_prices( - conn, site_id=None, target_date=day - ) - if n < 0: - logger.warning( - "scheduled_ote_import_global day=%s failed (%s)", - day.isoformat(), - err, - ) - continue - logger.info( - "scheduled_ote_import_global day=%s imported=%s slots", - imported_day, - n, - ) - any_import_ok = True - - if any_import_ok: - await _refresh_negative_price_predictions_all_active(conn) - - async def scheduled_ote_import() -> None: - async with app.state.pg_pool.acquire() as conn: - try: - await _scheduled_ote_import_global(conn) - except Exception: - logger.exception("scheduled_ote_import_global failed") - - scheduler.add_job(scheduled_heartbeat, "interval", seconds=60, id="heartbeat") - scheduler.add_job( - scheduled_audit_filler, - "cron", - minute="1,16,31,46", - second=0, - id="audit_filler", - ) - scheduler.add_job( - scheduled_forecast_accuracy, - "cron", - minute="2,17,32,47", - id="forecast_accuracy", - replace_existing=True, - ) - scheduler.add_job(scheduled_expire_modes, "interval", minutes=1, id="expire_modes") - scheduler.add_job( - scheduled_control_export, - "cron", - minute="14,29,44,59", - second=0, - id="control_export", - ) - scheduler.add_job( - scheduled_verify_modbus, - "interval", - minutes=2, - id="verify_modbus", - replace_existing=True, - ) - scheduler.add_job(scheduled_daily_plan, "cron", hour=15, minute=0, id="daily_plan") - scheduler.add_job( - scheduled_rolling_replan, - "cron", - minute="*/15", - id="rolling_replan", - ) - scheduler.add_job( - scheduled_baseline_update, - "cron", - hour=0, - minute=30, - id="baseline_update", - replace_existing=True, - ) - scheduler.add_job( - scheduled_market_price_stats, - "cron", - hour=14, - minute=45, - id="market_price_stats", - replace_existing=True, - ) - scheduler.add_job( - scheduled_tuv_usage_stats, - "cron", - hour=0, - minute=45, - id="tuv_usage_stats", - replace_existing=True, - ) - scheduler.add_job( - scheduled_ote_import, - "cron", - hour=13, - minute=30, - id="ote_import_preopen", - replace_existing=True, - ) - scheduler.add_job( - scheduled_ote_import, - "cron", - hour=14, - minute=0, - id="ote_import_main", - replace_existing=True, - ) - scheduler.add_job( - scheduled_ote_import, - "cron", - hour=0, - minute=5, - id="ote_import_backfill", - replace_existing=True, - ) - scheduler.add_job( - scheduled_forecast_refresh, - "cron", - hour="*/2", - minute=5, - id="forecast_refresh_2h", - replace_existing=True, - ) - - async def scheduled_daily_economics_notification() -> None: - from services.notification_service import notify_daily_economics - - async with app.state.pg_pool.acquire() as conn: - sites = await conn.fetch("select id, code from ems.vw_site_directory where active = true") - for site in sites: - site_id = int(site["id"]) - site_code = site["code"] - try: - row = await conn.fetchrow( - """ - SELECT import_kwh, export_kwh, - import_cost_czk, export_revenue_czk, - green_bonus_czk, total_balance_czk, - planned_balance_czk - FROM ems.vw_economics_daily - WHERE site_id = $1 - AND day_local = ( - CURRENT_DATE AT TIME ZONE 'Europe/Prague' - INTERVAL '1 day' - )::date - """, - site_id, - ) - if row is None: - continue - yesterday = ( - datetime.now(ZoneInfo("Europe/Prague")) - - timedelta(days=1) - ).strftime("%Y-%m-%d") - await notify_daily_economics( - site_code=site_code, - day=yesterday, - import_kwh=float(row["import_kwh"] or 0), - import_cost=float(row["import_cost_czk"] or 0), - export_kwh=float(row["export_kwh"] or 0), - export_revenue=float(row["export_revenue_czk"] or 0), - green_bonus=float(row["green_bonus_czk"] or 0), - total_balance=float(row["total_balance_czk"] or 0), - planned_balance=float(row["planned_balance_czk"]) - if row["planned_balance_czk"] is not None - else None, - ) - except Exception: - logger.exception( - "scheduled_daily_economics_notification site=%s failed", - site_id, - ) - - scheduler.add_job( - scheduled_daily_economics_notification, - "cron", - hour=7, - minute=0, - id="daily_economics_notification", - replace_existing=True, - ) - scheduler.start() - - telemetry_task = asyncio.create_task(run_telemetry_loop_wrapper(app.state.pg_pool)) - app.state.telemetry_task = telemetry_task - - yield - - ws_h = getattr(app.state, "ws_log_handler", None) - if ws_h is not None: - logging.getLogger().removeHandler(ws_h) - app.state.ws_log_handler = None - - telemetry_task.cancel() - try: - await telemetry_task - except asyncio.CancelledError: - pass - - scheduler.shutdown(wait=False) - set_pg_pool(None) - app.state.pg_pool = None - if pool: - await pool.close() - pool = None - - app = FastAPI(title="EMS Platform", lifespan=lifespan) app.include_router(plan_router, prefix="/api/v1") @@ -531,485 +36,14 @@ app.include_router(full_status_router, prefix="/api/v1") app.include_router(site_configuration_router, prefix="/api/v1") app.include_router(economics_router, prefix="/api/v1") app.include_router(energy_flows_router, prefix="/api/v1") - -sites_router = APIRouter(prefix="/api/v1/sites", tags=["sites"]) - - -def _parse_ymd(s: str) -> date: - try: - return date.fromisoformat(s) - except ValueError: - raise HTTPException(status_code=400, detail="Invalid date, expected YYYY-MM-DD") from None - - -@sites_router.get("") -async def list_sites(db: Annotated[asyncpg.Pool, Depends(get_pool)]) -> list[dict[str, Any]]: - async with db.acquire() as conn: - rows = await conn.fetch( - """ - select id, code, name, timezone, latitude, longitude, active, notes, created_at - from ems.vw_site_directory - order by id - """ - ) - return [record_to_dict(r) for r in rows] - - -@sites_router.get("/{site_id}/prices") -async def get_site_prices( - site_id: int, - db: Annotated[asyncpg.Pool, Depends(get_pool)], - date_str: str | None = Query(None, alias="date", description="YYYY-MM-DD, default today"), -) -> list[dict[str, Any]]: - if date_str is None: - date_str = date.today().isoformat() - d = _parse_ymd(date_str) - async with db.acquire() as conn: - site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) - if not site_ok: - raise HTTPException(status_code=404, detail="Site not found") - rows = await fetch_json( - conn, - "select ems.fn_site_effective_prices_day_prague($1::int, $2::date)", - site_id, - d, - ) - if not isinstance(rows, list): - rows = json.loads(rows) if isinstance(rows, str) else [] - return [r for r in rows if isinstance(r, dict)] - - -class PricesImportResponse(BaseModel): - slots_imported: int - date: str - first_price_czk_kwh: float - - -class PricesLatestResponse(BaseModel): - latest_date: str - slots: int - min_price: float - max_price: float - avg_price: float - - -class ForecastRunResponse(BaseModel): - intervals_saved: int - pv_arrays: int - - -class ModbusCommandVerifyItem(BaseModel): - id: int - asset_code: str - register_name: str | None - value_to_write: int - value_verified: int | None - status: str - - -class ModbusVerifyResponse(BaseModel): - checked: int - verified: int - mismatch: int - commands: list[ModbusCommandVerifyItem] - - -async def _refresh_negative_price_predictions(conn: asyncpg.Connection, site_id: int) -> None: - """Po importu cen / forecastu obnoví cache predikce záporných cen.""" - try: - await conn.fetch("SELECT * FROM ems.fn_predict_negative_price_windows($1, 7)", site_id) - except Exception: - logger.warning( - "fn_predict_negative_price_windows failed for site %s", - site_id, - exc_info=True, - ) - - -@sites_router.post( - "/{site_id}/prices/import", - response_model=PricesImportResponse, - summary="Import OTE cen (globální)", - description=( - "Zapíše do sdílené tabulky ems.market_interval_price (jedna sada dat pro všechny lokality). " - "site_id v cestě slouží ke kontrole existence lokality (kompatibilita s UI); po importu se " - "obnoví predikce záporných cen pro všechny aktivní lokality." - ), -) -async def post_import_site_prices( - site_id: int, - db: Annotated[asyncpg.Pool, Depends(get_pool)], - date_str: str | None = Query( - None, - alias="date", - description="YYYY-MM-DD; výchozí = zítřek/dnes dle logiky OTE (Europe/Prague)", - ), -) -> PricesImportResponse: - target: date | None = _parse_ymd(date_str) if date_str is not None else None - import_error: str | None = None - async with db.acquire() as conn: - site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) - if not site_ok: - raise HTTPException(status_code=404, detail="Site not found") - n, day, first_price, import_error = await import_ote_prices( - conn, site_id=None, target_date=target - ) - if n >= 0: - sites = await conn.fetch("select id from ems.vw_site_directory where active = true") - for site in sites: - await _refresh_negative_price_predictions(conn, int(site["id"])) - if n < 0: - raise HTTPException( - status_code=422, - detail=f"OTE import selhal ({import_error or 'unknown'})", - ) - return PricesImportResponse( - slots_imported=n, - date=day, - first_price_czk_kwh=first_price, - ) - - -class NegPricePredictionItem(BaseModel): - predicted_date: str - window_start_hour: int - window_end_hour: int - probability_pct: float - expected_min_price: float | None - reason: str - - -class NegativePredictionsResponse(BaseModel): - predictions: list[NegPricePredictionItem] - insufficient_history: bool - - -@sites_router.get( - "/{site_id}/prices/negative-predictions", - response_model=NegativePredictionsResponse, -) -async def get_site_negative_price_predictions( - site_id: int, - db: Annotated[asyncpg.Pool, Depends(get_pool)], -) -> NegativePredictionsResponse: - """Cache predikce záporných cen (per site) + informace, zda je dost historie OTE.""" - async with db.acquire() as conn: - site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) - if not site_ok: - raise HTTPException(status_code=404, detail="Site not found") - bundle = await fetch_json( - conn, - "select ems.fn_negative_price_predictions($1::int)", - site_id, - ) - if not isinstance(bundle, dict): - bundle = json.loads(bundle) - rows = bundle.get("predictions") or [] - if not isinstance(rows, list): - rows = [] - predictions: list[NegPricePredictionItem] = [] - for r in rows: - if not isinstance(r, dict): - continue - em = r.get("expected_min_price") - pd = r.get("predicted_date") - predictions.append( - NegPricePredictionItem( - predicted_date=pd.isoformat() if hasattr(pd, "isoformat") else str(pd), - window_start_hour=int(r.get("window_start_hour") or 0), - window_end_hour=int(r.get("window_end_hour") or 0), - probability_pct=float(r.get("probability_pct") or 0), - expected_min_price=float(em) if em is not None else None, - reason=str(r.get("reason") or ""), - ) - ) - return NegativePredictionsResponse( - predictions=predictions, - insufficient_history=bool(bundle.get("insufficient_history")), - ) - - -@sites_router.get("/{site_id}/prices/latest", response_model=PricesLatestResponse) -async def get_site_prices_latest( - site_id: int, - db: Annotated[asyncpg.Pool, Depends(get_pool)], -) -> PricesLatestResponse: - async with db.acquire() as conn: - site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) - if not site_ok: - raise HTTPException(status_code=404, detail="Site not found") - row = await fetch_json(conn, "select ems.fn_latest_ote_day_stats()") - if not isinstance(row, dict): - row = json.loads(row) - day = row.get("latest_date") - if day is None: - raise HTTPException(status_code=404, detail="Žádná tržní data v databázi") - latest_date = day.isoformat() if hasattr(day, "isoformat") else str(day)[:10] - return PricesLatestResponse( - latest_date=latest_date, - slots=int(row.get("slots") or 0), - min_price=float(row.get("min_price") or 0.0), - max_price=float(row.get("max_price") or 0.0), - avg_price=float(row.get("avg_price") or 0.0), - ) - - -@sites_router.get("/{site_id}/control/verify", response_model=ModbusVerifyResponse) -async def get_verify_modbus_commands( - site_id: int, - db: Annotated[asyncpg.Pool, Depends(get_pool)], - minutes: int = Query(10, ge=1, le=1440, description="Jak daleko zpět hledat written příkazy"), -) -> ModbusVerifyResponse: - """ - Ruční ověření Modbus zápisů (written) z posledních N minut. - Vhodné hned po manuálním exportu setpointů. - """ - async with db.acquire() as conn: - site_ok = await conn.fetchval( - "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id - ) - if not site_ok: - raise HTTPException(status_code=404, detail="Site not found") - - lookback = timedelta(minutes=minutes) - id_json = await fetch_json( - conn, - "select ems.fn_modbus_written_command_ids($1::int, $2::interval)", - site_id, - lookback, - ) - if not isinstance(id_json, list): - id_json = json.loads(id_json) if isinstance(id_json, str) else [] - ids = [int(x) for x in id_json] - checked = len(ids) - if ids: - await verify_modbus_commands(ids, conn, site_id) - - detail_json = ( - await fetch_json( - conn, - "select ems.fn_modbus_commands_by_ids($1::int[])", - ids, - ) - if ids - else [] - ) - if ids and not isinstance(detail_json, list): - detail_json = json.loads(detail_json) if isinstance(detail_json, str) else [] - detail_rows = detail_json if ids else [] - - commands = [ - ModbusCommandVerifyItem( - id=int(r["id"]), - asset_code=str(r.get("asset_code") or ""), - register_name=r.get("register_name"), - value_to_write=int(r["value_to_write"]), - value_verified=int(r["value_verified"]) - if r.get("value_verified") is not None - else None, - status=str(r.get("status") or ""), - ) - for r in detail_rows - if isinstance(r, dict) - ] - verified = sum(1 for c in commands if c.status == "verified") - mismatch = sum(1 for c in commands if c.status == "mismatch") - return ModbusVerifyResponse( - checked=checked, - verified=verified, - mismatch=mismatch, - commands=commands, - ) - - -class DeyeRegistersLiveResponse(BaseModel): - reg108_charge_a: int - reg109_discharge_a: int - reg141_energy_mode: int - reg142_limit_control: int - reg143_export_limit_w: int - reg178_peak_shaving_switch: int - reg191_peak_shaving_w: int - read_at: str - - -@sites_router.get( - "/{site_id}/control/registers", - response_model=DeyeRegistersLiveResponse, -) -async def get_control_registers_live( - site_id: int, - db: Annotated[asyncpg.Pool, Depends(get_pool)], -) -> DeyeRegistersLiveResponse: - """Živé hodnoty registrů Deye 108/109/141/142/143/178/191 přes sdílený Modbus klient.""" - async with db.acquire() as conn: - site_ok = await conn.fetchval( - "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id - ) - if not site_ok: - raise HTTPException(status_code=404, detail="Site not found") - try: - payload = await read_deye_registers_live(site_id, conn) - except ValueError: - raise HTTPException( - status_code=404, - detail="No controllable Modbus inverter for this site", - ) from None - except Exception as e: - logger.warning("get_control_registers_live site=%s: %s", site_id, e) - raise HTTPException( - status_code=503, - detail=f"Modbus read failed: {e}", - ) from e - return DeyeRegistersLiveResponse(**payload) - - -class ModbusJournalCommandRow(BaseModel): - id: int - register: int - register_name: str | None - value_to_write: int - value_written: int | None - value_verified: int | None - status: str - attempt_count: int - created_at: str - - -class ModbusJournalListResponse(BaseModel): - commands: list[ModbusJournalCommandRow] - - -@sites_router.get( - "/{site_id}/control/journal", - response_model=ModbusJournalListResponse, -) -async def get_control_command_journal( - site_id: int, - db: Annotated[asyncpg.Pool, Depends(get_pool)], - limit: int = Query(50, ge=1, le=100), -) -> ModbusJournalListResponse: - async with db.acquire() as conn: - site_ok = await conn.fetchval( - "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id - ) - if not site_ok: - raise HTTPException(status_code=404, detail="Site not found") - rows = await fetch_json( - conn, - "select ems.fn_modbus_journal_list($1::int, $2::int)", - site_id, - limit, - ) - if not isinstance(rows, list): - rows = json.loads(rows) if isinstance(rows, str) else [] - cmds: list[ModbusJournalCommandRow] = [] - for r in rows: - d = r if isinstance(r, dict) else {} - ca = d["created_at"] - cmds.append( - ModbusJournalCommandRow( - id=int(d["id"]), - register=int(d["register"]), - register_name=d.get("register_name"), - value_to_write=int(d["value_to_write"]), - value_written=int(d["value_written"]) - if d.get("value_written") is not None - else None, - value_verified=int(d["value_verified"]) - if d.get("value_verified") is not None - else None, - status=str(d["status"]), - attempt_count=int(d["attempt_count"]), - created_at=ca if isinstance(ca, str) else str(ca), - ) - ) - return ModbusJournalListResponse(commands=cmds) - - -@sites_router.post("/{site_id}/forecast/run", response_model=ForecastRunResponse) -async def post_run_site_forecast( - site_id: int, - db: Annotated[asyncpg.Pool, Depends(get_pool)], -) -> ForecastRunResponse: - async with db.acquire() as conn: - site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) - if not site_ok: - raise HTTPException(status_code=404, detail="Site not found") - try: - intervals, pv_arrays = await fetch_pv_forecast(site_id, conn) - except Exception as e: - logger.error("Forecast failed: %s", e, exc_info=True) - raise HTTPException(status_code=422, detail=str(e)) from e - if intervals >= 0: - await _refresh_negative_price_predictions(conn, site_id) - if intervals < 0: - raise HTTPException( - status_code=422, - detail="Forecast se nepodařilo stáhnout nebo zpracovat", - ) - return ForecastRunResponse(intervals_saved=intervals, pv_arrays=pv_arrays) - - -@sites_router.get("/{site_id}/forecast/pv") -async def get_site_forecast_pv( - site_id: int, - db: Annotated[asyncpg.Pool, Depends(get_pool)], - date_str: str | None = Query(None, alias="date", description="YYYY-MM-DD, default tomorrow"), -) -> dict[str, list[dict[str, Any]]]: - if date_str is None: - date_str = (date.today() + timedelta(days=1)).isoformat() - d = _parse_ymd(date_str) - async with db.acquire() as conn: - site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) - if not site_ok: - raise HTTPException(status_code=404, detail="Site not found") - split = await fetch_json( - conn, - "select ems.fn_forecast_pv_split($1::int, $2::date)", - site_id, - d, - ) - if not isinstance(split, dict): - split = json.loads(split) if isinstance(split, str) else {} - pv_a = split.get("pv_a") or [] - pv_b = split.get("pv_b") or [] - if not isinstance(pv_a, list): - pv_a = [] - if not isinstance(pv_b, list): - pv_b = [] - return {"pv_a": pv_a, "pv_b": pv_b} - - -me_router = APIRouter(prefix="/api/v1/me", tags=["me"]) - - -@me_router.get( - "/sites", - summary="Lokality přihlášeného uživatele (fáze bez auth)", - description="Aktuálně vrací všechny aktivní lokality; po zavedení autentizace se odfiltruje podle oprávnění.", -) -async def list_my_sites( - db: Annotated[asyncpg.Pool, Depends(get_pool)], -) -> list[dict[str, Any]]: - async with db.acquire() as conn: - rows = await conn.fetch( - """ - SELECT id, code, name, timezone, latitude, longitude, active, notes, created_at - FROM ems.site - WHERE active = true - ORDER BY code - """ - ) - return [record_to_dict(r) for r in rows] - - app.include_router(me_router) app.include_router(sites_router) app.add_middleware( CORSMiddleware, - allow_origins=os.getenv("CORS_ORIGINS", "http://localhost:5173,http://127.0.0.1:5173").split(","), + allow_origins=os.getenv( + "CORS_ORIGINS", "http://localhost:5173,http://127.0.0.1:5173" + ).split(","), allow_credentials=True, allow_methods=["*"], allow_headers=["*"], @@ -1021,7 +55,7 @@ async def ws_telemetry(websocket: WebSocket) -> None: await manager.connect_telemetry(websocket) try: while True: - await websocket.receive_text() # keepalive + await websocket.receive_text() except WebSocketDisconnect: manager.disconnect(websocket) @@ -1036,82 +70,75 @@ async def ws_logs(websocket: WebSocket) -> None: manager.disconnect(websocket) +def _iso_utc_from_json_value(v: Any) -> str: + if v is None: + return datetime.now(timezone.utc).isoformat() + if isinstance(v, datetime): + if v.tzinfo is None: + v = v.replace(tzinfo=timezone.utc) + return v.isoformat() + if isinstance(v, str): + s = v.replace("Z", "+00:00") + try: + return datetime.fromisoformat(s).isoformat() + except ValueError: + return datetime.now(timezone.utc).isoformat() + return datetime.now(timezone.utc).isoformat() + + async def _health_payload(db: asyncpg.Pool) -> dict[str, Any]: - db_status = "error" - active_plan_slots = 0 try: async with db.acquire() as conn: - await conn.fetchval("SELECT 1") - db_status = "ok" - active_plan_slots = int( - await conn.fetchval( - """ - SELECT COUNT(*)::bigint - FROM ems.planning_interval pi - INNER JOIN ems.planning_run pr ON pr.id = pi.run_id - WHERE pr.status = 'active' - """ - ) - or 0 - ) + data = await fetch_json(conn, "select ems.fn_health_summary()") + if not isinstance(data, dict): + data = {} + return { + "status": "ok", + "db": str(data.get("db") or "ok"), + "timestamp": _iso_utc_from_json_value(data.get("timestamp")), + "active_plan_slots": int(data.get("active_plan_slots") or 0), + } except Exception as e: logger.warning("health DB check failed: %s", e) - db_status = "error" - - return { - "status": "ok" if db_status == "ok" else "degraded", - "db": db_status, - "timestamp": datetime.now(timezone.utc).isoformat(), - "active_plan_slots": active_plan_slots, - } + return { + "status": "degraded", + "db": "error", + "timestamp": datetime.now(timezone.utc).isoformat(), + "active_plan_slots": 0, + } @app.get("/health") @app.get("/api/v1/health") -async def health(db: Annotated[asyncpg.Pool, Depends(get_pool)]) -> dict[str, Any]: +async def health( + db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], +) -> dict[str, Any]: return await _health_payload(db) @app.get("/api/v1/health/detailed") async def health_detailed( request: Request, - db: Annotated[asyncpg.Pool, Depends(get_pool)], + db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], ) -> dict[str, Any]: db_status: Literal["ok", "error"] = "error" last_telemetry_age_sec = -1 last_plan_age_sec = -1 try: async with db.acquire() as conn: - await conn.fetchval("SELECT 1") - db_status = "ok" - tel = await conn.fetchval( - """ - SELECT CASE - WHEN MAX(measured_at) IS NULL THEN -1 - ELSE GREATEST(0, EXTRACT(EPOCH FROM (now() - MAX(measured_at)))::int) - END - FROM ems.telemetry_inverter - """ - ) - if tel is not None: - last_telemetry_age_sec = int(tel) - plan_age = await conn.fetchval( - """ - SELECT CASE - WHEN MAX(pr.created_at) IS NULL THEN -1 - ELSE GREATEST(0, EXTRACT(EPOCH FROM (now() - MAX(pr.created_at)))::int) - END - FROM ems.planning_run pr - WHERE pr.status = 'active' - """ - ) - if plan_age is not None: - last_plan_age_sec = int(plan_age) + metrics = await fetch_json(conn, "select ems.fn_health_detailed_db()") + if not isinstance(metrics, dict): + metrics = {} + db_status = "ok" + last_telemetry_age_sec = int(metrics.get("last_telemetry_age_sec", -1)) + last_plan_age_sec = int(metrics.get("last_plan_age_sec", -1)) except Exception as e: logger.warning("health detailed DB check failed: %s", e) db_status = "error" - sched_state: Literal["running", "stopped"] = "running" if scheduler.running else "stopped" + sched_state: Literal["running", "stopped"] = ( + "running" if scheduler.running else "stopped" + ) t_task = getattr(request.app.state, "telemetry_task", None) tel_loop: Literal["running", "stopped"] = ( "running" if t_task is not None and not t_task.done() else "stopped" @@ -1149,11 +176,23 @@ class SetSiteModeResponse(BaseModel): activated_at: datetime +def _parse_activated_at(raw: Any) -> datetime: + if isinstance(raw, datetime): + at = raw + if at.tzinfo is None: + at = at.replace(tzinfo=timezone.utc) + return at + if isinstance(raw, str): + s = raw.replace("Z", "+00:00") + return datetime.fromisoformat(s) + raise HTTPException(status_code=500, detail="Invalid activated_at from DB") + + @app.post("/api/v1/sites/{site_id}/mode", response_model=SetSiteModeResponse) async def set_site_mode( site_id: int, body: SetSiteModeBody, - db: Annotated[asyncpg.Pool, Depends(get_pool)], + db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], ) -> SetSiteModeResponse: mode = body.mode.strip().upper() allowed = {"AUTO", "SELF_SUSTAIN", "CHARGE_CHEAP", "PRESERVE", "MANUAL"} @@ -1161,7 +200,9 @@ async def set_site_mode( raise HTTPException(status_code=400, detail=f"Unsupported mode: {body.mode}") async with db.acquire() as conn: - site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) + site_ok = await conn.fetchval( + "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id + ) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") @@ -1178,40 +219,23 @@ async def set_site_mode( logger.warning("fn_set_mode failed: %s", e) raise HTTPException(status_code=400, detail=str(e)) from e - row = await conn.fetchrow( - """ - SELECT m.mode_code, m.activated_at, d.loxone_mode_value - FROM ems.site_operating_mode m - JOIN ems.operating_mode_def d ON d.code = m.mode_code - WHERE m.site_id = $1 - """, - site_id, - ) - if row is None: - raise HTTPException(status_code=500, detail="Mode row missing after set") - - ep = await conn.fetchrow( - """ - SELECT host, port, protocol - FROM ems.site_endpoint - WHERE site_id = $1 AND endpoint_type = 'loxone_http' AND enabled = true - ORDER BY id - LIMIT 1 - """, - site_id, + bundle = await fetch_json( + conn, "select ems.fn_site_mode_loxone_bundle($1::int)", site_id ) - activated_at: datetime = row["activated_at"] - if activated_at.tzinfo is None: - activated_at = activated_at.replace(tzinfo=timezone.utc) + if not isinstance(bundle, dict) or bundle.get("mode_code") is None: + raise HTTPException(status_code=500, detail="Mode row missing after set") - loxone_val: int | None = row["loxone_mode_value"] - if ep and loxone_val is not None: - proto = (ep["protocol"] or "http").lower() + activated_at = _parse_activated_at(bundle.get("activated_at")) + mode_code = str(bundle["mode_code"]) + loxone_val = bundle.get("loxone_mode_value") + host = bundle.get("loxone_host") + if host and loxone_val is not None: + proto = (bundle.get("loxone_protocol") or "http").lower() if proto not in ("http", "https"): proto = "http" - host = ep["host"] - port = int(ep["port"] or (443 if proto == "https" else 80)) + port_raw = bundle.get("loxone_port") + port = int(port_raw or (443 if proto == "https" else 80)) base = f"{proto}://{host}:{port}" url = f"{base}/dev/sps/io/EMS_Mode/{loxone_val}" user = os.getenv("LOXONE_USER") or "" @@ -1224,4 +248,6 @@ async def set_site_mode( except Exception as e: logger.warning("Loxone EMS_Mode notify failed for site %s: %s", site_id, e) - return SetSiteModeResponse(success=True, mode=row["mode_code"], activated_at=activated_at) + return SetSiteModeResponse( + success=True, mode=mode_code, activated_at=activated_at + ) diff --git a/backend/app/refresh_negative_prices.py b/backend/app/refresh_negative_prices.py new file mode 100644 index 0000000..d7e7ef1 --- /dev/null +++ b/backend/app/refresh_negative_prices.py @@ -0,0 +1,22 @@ +"""Sdílený hook po importu cen / forecastu – obnova cache predikce záporných cen.""" + +from __future__ import annotations + +import logging + +import asyncpg + +logger = logging.getLogger(__name__) + + +async def refresh_negative_price_predictions(conn: asyncpg.Connection, site_id: int) -> None: + try: + await conn.fetch( + "SELECT * FROM ems.fn_predict_negative_price_windows($1, 7)", site_id + ) + except Exception: + logger.warning( + "fn_predict_negative_price_windows failed for site %s", + site_id, + exc_info=True, + ) diff --git a/backend/app/routers/me.py b/backend/app/routers/me.py new file mode 100644 index 0000000..9be86c6 --- /dev/null +++ b/backend/app/routers/me.py @@ -0,0 +1,33 @@ +"""REST API – /me (fáze bez auth).""" + +from __future__ import annotations + +from typing import Annotated, Any + +import asyncpg +from fastapi import APIRouter, Depends + +from app.db_json import record_to_dict +from app.deps import get_pg_pool + +router = APIRouter(prefix="/api/v1/me", tags=["me"]) + + +@router.get( + "/sites", + summary="Lokality přihlášeného uživatele (fáze bez auth)", + description="Aktuálně vrací všechny aktivní lokality z vw_site_directory; po zavedení autentizace se odfiltruje podle oprávnění.", +) +async def list_my_sites( + db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], +) -> list[dict[str, Any]]: + async with db.acquire() as conn: + rows = await conn.fetch( + """ + SELECT id, code, name, timezone, latitude, longitude, active, notes, created_at + FROM ems.vw_site_directory + WHERE active = true + ORDER BY code + """ + ) + return [record_to_dict(r) for r in rows] diff --git a/backend/app/routers/sites.py b/backend/app/routers/sites.py new file mode 100644 index 0000000..093c5c1 --- /dev/null +++ b/backend/app/routers/sites.py @@ -0,0 +1,483 @@ +"""REST API – lokality: ceny OTE, forecast, Modbus journal/verify.""" + +from __future__ import annotations + +import json +import logging +from datetime import date, datetime, timedelta +from typing import Annotated, Any + +import asyncpg +from fastapi import APIRouter, Depends, HTTPException, Query +from pydantic import BaseModel + +from app.db_json import fetch_json, record_to_dict +from app.deps import get_pg_pool +from app.refresh_negative_prices import refresh_negative_price_predictions +from services.control_exporter import read_deye_registers_live, verify_modbus_commands +from services.forecast_service import fetch_pv_forecast +from services.price_importer import import_ote_prices + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/v1/sites", tags=["sites"]) + + +def _parse_ymd(s: str) -> date: + try: + return date.fromisoformat(s) + except ValueError: + raise HTTPException( + status_code=400, detail="Invalid date, expected YYYY-MM-DD" + ) from None + + +@router.get("") +async def list_sites( + db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], +) -> list[dict[str, Any]]: + async with db.acquire() as conn: + rows = await conn.fetch( + """ + select id, code, name, timezone, latitude, longitude, active, notes, created_at + from ems.vw_site_directory + order by id + """ + ) + return [record_to_dict(r) for r in rows] + + +@router.get("/{site_id}/prices") +async def get_site_prices( + site_id: int, + db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], + date_str: str | None = Query( + None, alias="date", description="YYYY-MM-DD, default today" + ), +) -> list[dict[str, Any]]: + if date_str is None: + date_str = date.today().isoformat() + d = _parse_ymd(date_str) + async with db.acquire() as conn: + site_ok = await conn.fetchval( + "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id + ) + if not site_ok: + raise HTTPException(status_code=404, detail="Site not found") + rows = await fetch_json( + conn, + "select ems.fn_site_effective_prices_day_prague($1::int, $2::date)", + site_id, + d, + ) + if not isinstance(rows, list): + rows = json.loads(rows) if isinstance(rows, str) else [] + return [r for r in rows if isinstance(r, dict)] + + +class PricesImportResponse(BaseModel): + slots_imported: int + date: str + first_price_czk_kwh: float + + +class PricesLatestResponse(BaseModel): + latest_date: str + slots: int + min_price: float + max_price: float + avg_price: float + + +class ForecastRunResponse(BaseModel): + intervals_saved: int + pv_arrays: int + + +class ModbusCommandVerifyItem(BaseModel): + id: int + asset_code: str + register_name: str | None + value_to_write: int + value_verified: int | None + status: str + + +class ModbusVerifyResponse(BaseModel): + checked: int + verified: int + mismatch: int + commands: list[ModbusCommandVerifyItem] + + +@router.post( + "/{site_id}/prices/import", + response_model=PricesImportResponse, + summary="Import OTE cen (globální)", + description=( + "Zapíše do sdílené tabulky ems.market_interval_price (jedna sada dat pro všechny lokality). " + "site_id v cestě slouží ke kontrole existence lokality (kompatibilita s UI); po importu se " + "obnoví predikce záporných cen pro všechny aktivní lokality." + ), +) +async def post_import_site_prices( + site_id: int, + db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], + date_str: str | None = Query( + None, + alias="date", + description="YYYY-MM-DD; výchozí = zítřek/dnes dle logiky OTE (Europe/Prague)", + ), +) -> PricesImportResponse: + target: date | None = _parse_ymd(date_str) if date_str is not None else None + import_error: str | None = None + async with db.acquire() as conn: + site_ok = await conn.fetchval( + "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id + ) + if not site_ok: + raise HTTPException(status_code=404, detail="Site not found") + n, day, first_price, import_error = await import_ote_prices( + conn, site_id=None, target_date=target + ) + if n >= 0: + sites_raw = await fetch_json( + conn, "select ems.fn_vw_site_directory_active()" + ) + sites_list = sites_raw if isinstance(sites_raw, list) else [] + for site in sites_list: + if isinstance(site, dict): + await refresh_negative_price_predictions(conn, int(site["id"])) + if n < 0: + raise HTTPException( + status_code=422, + detail=f"OTE import selhal ({import_error or 'unknown'})", + ) + return PricesImportResponse( + slots_imported=n, + date=day, + first_price_czk_kwh=first_price, + ) + + +class NegPricePredictionItem(BaseModel): + predicted_date: str + window_start_hour: int + window_end_hour: int + probability_pct: float + expected_min_price: float | None + reason: str + + +class NegativePredictionsResponse(BaseModel): + predictions: list[NegPricePredictionItem] + insufficient_history: bool + + +@router.get( + "/{site_id}/prices/negative-predictions", + response_model=NegativePredictionsResponse, +) +async def get_site_negative_price_predictions( + site_id: int, + db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], +) -> NegativePredictionsResponse: + """Cache predikce záporných cen (per site) + informace, zda je dost historie OTE.""" + async with db.acquire() as conn: + site_ok = await conn.fetchval( + "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id + ) + if not site_ok: + raise HTTPException(status_code=404, detail="Site not found") + bundle = await fetch_json( + conn, + "select ems.fn_negative_price_predictions($1::int)", + site_id, + ) + if not isinstance(bundle, dict): + bundle = json.loads(bundle) + rows = bundle.get("predictions") or [] + if not isinstance(rows, list): + rows = [] + predictions: list[NegPricePredictionItem] = [] + for r in rows: + if not isinstance(r, dict): + continue + em = r.get("expected_min_price") + pd = r.get("predicted_date") + predictions.append( + NegPricePredictionItem( + predicted_date=pd.isoformat() + if hasattr(pd, "isoformat") + else str(pd), + window_start_hour=int(r.get("window_start_hour") or 0), + window_end_hour=int(r.get("window_end_hour") or 0), + probability_pct=float(r.get("probability_pct") or 0), + expected_min_price=float(em) if em is not None else None, + reason=str(r.get("reason") or ""), + ) + ) + return NegativePredictionsResponse( + predictions=predictions, + insufficient_history=bool(bundle.get("insufficient_history")), + ) + + +@router.get("/{site_id}/prices/latest", response_model=PricesLatestResponse) +async def get_site_prices_latest( + site_id: int, + db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], +) -> PricesLatestResponse: + async with db.acquire() as conn: + site_ok = await conn.fetchval( + "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id + ) + if not site_ok: + raise HTTPException(status_code=404, detail="Site not found") + row = await fetch_json(conn, "select ems.fn_latest_ote_day_stats()") + if not isinstance(row, dict): + row = json.loads(row) + day = row.get("latest_date") + if day is None: + raise HTTPException(status_code=404, detail="Žádná tržní data v databázi") + latest_date = day.isoformat() if hasattr(day, "isoformat") else str(day)[:10] + return PricesLatestResponse( + latest_date=latest_date, + slots=int(row.get("slots") or 0), + min_price=float(row.get("min_price") or 0.0), + max_price=float(row.get("max_price") or 0.0), + avg_price=float(row.get("avg_price") or 0.0), + ) + + +@router.get("/{site_id}/control/verify", response_model=ModbusVerifyResponse) +async def get_verify_modbus_commands( + site_id: int, + db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], + minutes: int = Query(10, ge=1, le=1440, description="Jak daleko zpět hledat written příkazy"), +) -> ModbusVerifyResponse: + """ + Ruční ověření Modbus zápisů (written) z posledních N minut. + Vhodné hned po manuálním exportu setpointů. + """ + async with db.acquire() as conn: + site_ok = await conn.fetchval( + "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id + ) + if not site_ok: + raise HTTPException(status_code=404, detail="Site not found") + + lookback = timedelta(minutes=minutes) + id_json = await fetch_json( + conn, + "select ems.fn_modbus_written_command_ids($1::int, $2::interval)", + site_id, + lookback, + ) + if not isinstance(id_json, list): + id_json = json.loads(id_json) if isinstance(id_json, str) else [] + ids = [int(x) for x in id_json] + checked = len(ids) + if ids: + await verify_modbus_commands(ids, conn, site_id) + + detail_json = ( + await fetch_json( + conn, + "select ems.fn_modbus_commands_by_ids($1::int[])", + ids, + ) + if ids + else [] + ) + if ids and not isinstance(detail_json, list): + detail_json = json.loads(detail_json) if isinstance(detail_json, str) else [] + detail_rows = detail_json if ids else [] + + commands = [ + ModbusCommandVerifyItem( + id=int(r["id"]), + asset_code=str(r.get("asset_code") or ""), + register_name=r.get("register_name"), + value_to_write=int(r["value_to_write"]), + value_verified=int(r["value_verified"]) + if r.get("value_verified") is not None + else None, + status=str(r.get("status") or ""), + ) + for r in detail_rows + if isinstance(r, dict) + ] + verified = sum(1 for c in commands if c.status == "verified") + mismatch = sum(1 for c in commands if c.status == "mismatch") + return ModbusVerifyResponse( + checked=checked, + verified=verified, + mismatch=mismatch, + commands=commands, + ) + + +class DeyeRegistersLiveResponse(BaseModel): + reg108_charge_a: int + reg109_discharge_a: int + reg141_energy_mode: int + reg142_limit_control: int + reg143_export_limit_w: int + reg178_peak_shaving_switch: int + reg191_peak_shaving_w: int + read_at: str + + +@router.get( + "/{site_id}/control/registers", + response_model=DeyeRegistersLiveResponse, +) +async def get_control_registers_live( + site_id: int, + db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], +) -> DeyeRegistersLiveResponse: + """Živé hodnoty registrů Deye 108/109/141/142/143/178/191 přes sdílený Modbus klient.""" + async with db.acquire() as conn: + site_ok = await conn.fetchval( + "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id + ) + if not site_ok: + raise HTTPException(status_code=404, detail="Site not found") + try: + payload = await read_deye_registers_live(site_id, conn) + except ValueError: + raise HTTPException( + status_code=404, + detail="No controllable Modbus inverter for this site", + ) from None + except Exception as e: + logger.warning("get_control_registers_live site=%s: %s", site_id, e) + raise HTTPException( + status_code=503, + detail=f"Modbus read failed: {e}", + ) from e + return DeyeRegistersLiveResponse(**payload) + + +class ModbusJournalCommandRow(BaseModel): + id: int + register: int + register_name: str | None + value_to_write: int + value_written: int | None + value_verified: int | None + status: str + attempt_count: int + created_at: str + + +class ModbusJournalListResponse(BaseModel): + commands: list[ModbusJournalCommandRow] + + +@router.get( + "/{site_id}/control/journal", + response_model=ModbusJournalListResponse, +) +async def get_control_command_journal( + site_id: int, + db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], + limit: int = Query(50, ge=1, le=100), +) -> ModbusJournalListResponse: + async with db.acquire() as conn: + site_ok = await conn.fetchval( + "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id + ) + if not site_ok: + raise HTTPException(status_code=404, detail="Site not found") + rows = await fetch_json( + conn, + "select ems.fn_modbus_journal_list($1::int, $2::int)", + site_id, + limit, + ) + if not isinstance(rows, list): + rows = json.loads(rows) if isinstance(rows, str) else [] + cmds: list[ModbusJournalCommandRow] = [] + for r in rows: + d = r if isinstance(r, dict) else {} + ca = d["created_at"] + cmds.append( + ModbusJournalCommandRow( + id=int(d["id"]), + register=int(d["register"]), + register_name=d.get("register_name"), + value_to_write=int(d["value_to_write"]), + value_written=int(d["value_written"]) + if d.get("value_written") is not None + else None, + value_verified=int(d["value_verified"]) + if d.get("value_verified") is not None + else None, + status=str(d["status"]), + attempt_count=int(d["attempt_count"]), + created_at=ca if isinstance(ca, str) else str(ca), + ) + ) + return ModbusJournalListResponse(commands=cmds) + + +@router.post("/{site_id}/forecast/run", response_model=ForecastRunResponse) +async def post_run_site_forecast( + site_id: int, + db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], +) -> ForecastRunResponse: + async with db.acquire() as conn: + site_ok = await conn.fetchval( + "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id + ) + if not site_ok: + raise HTTPException(status_code=404, detail="Site not found") + try: + intervals, pv_arrays = await fetch_pv_forecast(site_id, conn) + except Exception as e: + logger.error("Forecast failed: %s", e, exc_info=True) + raise HTTPException(status_code=422, detail=str(e)) from e + if intervals >= 0: + await refresh_negative_price_predictions(conn, site_id) + if intervals < 0: + raise HTTPException( + status_code=422, + detail="Forecast se nepodařilo stáhnout nebo zpracovat", + ) + return ForecastRunResponse(intervals_saved=intervals, pv_arrays=pv_arrays) + + +@router.get("/{site_id}/forecast/pv") +async def get_site_forecast_pv( + site_id: int, + db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], + date_str: str | None = Query( + None, alias="date", description="YYYY-MM-DD, default tomorrow" + ), +) -> dict[str, list[dict[str, Any]]]: + if date_str is None: + date_str = (date.today() + timedelta(days=1)).isoformat() + d = _parse_ymd(date_str) + async with db.acquire() as conn: + site_ok = await conn.fetchval( + "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id + ) + if not site_ok: + raise HTTPException(status_code=404, detail="Site not found") + split = await fetch_json( + conn, + "select ems.fn_forecast_pv_split($1::int, $2::date)", + site_id, + d, + ) + if not isinstance(split, dict): + split = json.loads(split) if isinstance(split, str) else {} + pv_a = split.get("pv_a") or [] + pv_b = split.get("pv_b") or [] + if not isinstance(pv_a, list): + pv_a = [] + if not isinstance(pv_b, list): + pv_b = [] + return {"pv_a": pv_a, "pv_b": pv_b} diff --git a/db/routines/R__073_fn_health_site_jobs_mode_bundle.sql b/db/routines/R__073_fn_health_site_jobs_mode_bundle.sql new file mode 100644 index 0000000..9e24f63 --- /dev/null +++ b/db/routines/R__073_fn_health_site_jobs_mode_bundle.sql @@ -0,0 +1,141 @@ +-- Read-modely: health, aktivní lokality pro joby, včerejší ekonomika (Discord), Loxone po změně režimu. + +create or replace function ems.fn_health_summary() +returns jsonb +language sql +stable +as $fn$ + select jsonb_build_object( + 'db', 'ok', + 'active_plan_slots', ( + select count(*)::bigint + from ems.planning_interval pi + inner join ems.planning_run pr on pr.id = pi.run_id + where pr.status = 'active' + ), + 'timestamp', to_jsonb(now() at time zone 'utc') + ); +$fn$; + +comment on function ems.fn_health_summary() is + 'Lehký health payload (COUNT aktivních intervalů + čas UTC).'; + +create or replace function ems.fn_health_detailed_db() +returns jsonb +language sql +stable +as $fn$ + select jsonb_build_object( + 'last_telemetry_age_sec', ( + select case + when max(ti.measured_at) is null then -1 + else greatest( + 0, + extract(epoch from (now() - max(ti.measured_at)))::int + ) + end + from ems.telemetry_inverter ti + ), + 'last_plan_age_sec', ( + select case + when max(pr.created_at) is null then -1 + else greatest( + 0, + extract(epoch from (now() - max(pr.created_at)))::int + ) + end + from ems.planning_run pr + where pr.status = 'active' + ) + ); +$fn$; + +comment on function ems.fn_health_detailed_db() is + 'Stáří poslední telemetrie (globální max) a posledního aktivního planning_run.'; + +create or replace function ems.fn_vw_site_directory_active() +returns jsonb +language sql +stable +as $fn$ + select coalesce( + jsonb_agg( + jsonb_build_object( + 'id', sd.id, + 'code', sd.code, + 'name', sd.name, + 'timezone', sd.timezone, + 'latitude', sd.latitude, + 'longitude', sd.longitude, + 'active', sd.active, + 'notes', sd.notes, + 'created_at', sd.created_at + ) + order by sd.id + ), + '[]'::jsonb + ) + from ems.vw_site_directory sd + where sd.active is true; +$fn$; + +comment on function ems.fn_vw_site_directory_active() is + 'Řádky vw_site_directory pro active=true (joby po lokalitách).'; + +create or replace function ems.fn_site_economics_yesterday_notification(p_site_id int) +returns jsonb +language sql +stable +as $fn$ + select to_jsonb(d) + from ( + select + ed.import_kwh, + ed.export_kwh, + ed.import_cost_czk, + ed.export_revenue_czk, + ed.green_bonus_czk, + ed.total_balance_czk, + ed.planned_balance_czk + from ems.vw_economics_daily ed + where ed.site_id = p_site_id + and ed.day_local = ( + (current_timestamp at time zone 'Europe/Prague')::date - 1 + ) + limit 1 + ) d; +$fn$; + +comment on function ems.fn_site_economics_yesterday_notification(int) is + 'Včerejší řádek vw_economics_daily (Europe/Prague) pro denní Discord souhrn.'; + +create or replace function ems.fn_site_mode_loxone_bundle(p_site_id int) +returns jsonb +language sql +stable +as $fn$ + select jsonb_build_object( + 'mode_code', m.mode_code, + 'activated_at', m.activated_at, + 'loxone_mode_value', d.loxone_mode_value, + 'loxone_host', ep.host, + 'loxone_port', ep.port, + 'loxone_protocol', ep.protocol + ) + from ems.site_operating_mode m + join ems.operating_mode_def d on d.code = m.mode_code + left join lateral ( + select se.host, se.port, se.protocol + from ems.site_endpoint se + where se.site_id = p_site_id + and se.endpoint_type = 'loxone_http' + and se.enabled is true + order by se.id + limit 1 + ) ep on true + where m.site_id = p_site_id + limit 1; +$fn$; + +comment on function ems.fn_site_mode_loxone_bundle(int) is + 'Režim + Loxone endpoint po úspěšném zápisu režimu (API odpověď / push).';