"""FastAPI lifespan: DB pool, APScheduler joby, telemetrie.""" from __future__ import annotations import asyncio import logging import os from contextlib import asynccontextmanager from datetime import date, datetime, timedelta, timezone from typing import Any import asyncpg from apscheduler.schedulers.asyncio import AsyncIOScheduler from fastapi import FastAPI from zoneinfo import ZoneInfo from app.db_json import fetch_json from app.deps import set_pg_pool from app.refresh_negative_prices import refresh_negative_price_predictions from app.ws_log_handler import WSLogHandler from services.audit_filler import fill_audit_for_completed_intervals from services.control_exporter import export_setpoints, verify_modbus_commands from services.forecast_service import fetch_pv_forecast from services.heartbeat_service import send_heartbeat from services.notification_service import notify_operating_mode_changed from services.price_importer import import_ote_prices, ote_prague_day_slots_look_complete from services.telemetry_collector import run_telemetry_loop_wrapper logger = logging.getLogger(__name__) scheduler = AsyncIOScheduler(timezone=ZoneInfo("Europe/Prague")) def _dsn() -> str: host = os.getenv("DB_HOST", "localhost") port = os.getenv("DB_PORT", "5432") name = os.getenv("DB_NAME", "ems") user = os.getenv("DB_USER", "ems_user") password = os.getenv("DB_PASSWORD", "") return f"postgresql://{user}:{password}@{host}:{port}/{name}" async def _active_site_rows(conn: asyncpg.Connection) -> list[dict[str, Any]]: raw = await fetch_json(conn, "select ems.fn_vw_site_directory_active()") if not isinstance(raw, list): return [] return [x for x in raw if isinstance(x, dict)] @asynccontextmanager async def lifespan(app: FastAPI): pg_pool = await asyncpg.create_pool(_dsn(), min_size=1, max_size=5) set_pg_pool(pg_pool) app.state.pg_pool = pg_pool app.state.ws_log_handler = WSLogHandler() app.state.ws_log_handler.setLevel(logging.INFO) logging.getLogger().addHandler(app.state.ws_log_handler) from services.planning_engine import run_daily_plan, run_rolling_replan async def scheduled_heartbeat() -> None: async with app.state.pg_pool.acquire() as conn: for site in await _active_site_rows(conn): try: await send_heartbeat(int(site["id"]), conn) except Exception: logger.exception("scheduled_heartbeat site=%s failed", site["id"]) async def scheduled_audit_filler() -> None: async with app.state.pg_pool.acquire() as conn: for site in await _active_site_rows(conn): try: await fill_audit_for_completed_intervals(int(site["id"]), conn) except Exception: logger.exception("scheduled_audit_filler site=%s failed", site["id"]) async def scheduled_forecast_accuracy() -> None: async with app.state.pg_pool.acquire() as conn: for site in await _active_site_rows(conn): try: n = await conn.fetchval( "SELECT ems.fn_fill_forecast_accuracy($1, 48)", site["id"], ) if n: logger.info( "forecast_accuracy filled %s slots for site %s", n, site["id"], ) except Exception: logger.exception( "scheduled_forecast_accuracy site=%s failed", site["id"] ) async def scheduled_expire_modes() -> None: async with app.state.pg_pool.acquire() as conn: try: rows = await conn.fetch("SELECT * FROM ems.fn_expire_modes()") for r in rows: await notify_operating_mode_changed( str(r["site_code"]), str(r["old_mode"]), str(r["new_mode"]), "system:expiry", "Automatické vypršení dočasného režimu", ) except Exception: logger.exception("scheduled_expire_modes failed") async def scheduled_control_export() -> None: async with app.state.pg_pool.acquire() as conn: for site in await _active_site_rows(conn): try: await export_setpoints(int(site["id"]), conn) except Exception as e: logger.exception( "scheduled_control_export site=%s: %s", site["id"], e ) async def scheduled_verify_modbus() -> None: """ Ověří příkazy ve stavu written z posledních 20 minut. Běží každé 2 minuty, nezávisle na control_exporter (delší okno kvůli zpoždění jobu). """ async with app.state.pg_pool.acquire() as conn: for site in await _active_site_rows(conn): site_id = int(site["id"]) try: id_json = await fetch_json( conn, "select ems.fn_modbus_written_command_ids($1::int, interval '20 minutes')", site_id, ) if not isinstance(id_json, list): id_json = [] ids = [int(x) for x in id_json] if ids: await verify_modbus_commands(ids, conn, site_id) except Exception: logger.exception("scheduled_verify_modbus site=%s failed", site_id) async def scheduled_daily_plan() -> None: async with app.state.pg_pool.acquire() as conn: for site in await _active_site_rows(conn): site_id = int(site["id"]) try: await run_daily_plan(site_id, conn) await export_setpoints(site_id, conn) except Exception: logger.exception("scheduled_daily_plan site=%s failed", site_id) async def scheduled_rolling_replan() -> None: async with app.state.pg_pool.acquire() as conn: for site in await _active_site_rows(conn): site_id = int(site["id"]) try: await run_rolling_replan(site_id, conn) await export_setpoints(site_id, conn) except Exception: logger.exception("scheduled_rolling_replan site=%s failed", site_id) async def scheduled_baseline_update() -> None: async with app.state.pg_pool.acquire() as conn: for site in await _active_site_rows(conn): try: n = await conn.fetchval( "SELECT ems.fn_update_baseline_stats($1, 30)", site["id"], ) logger.info( "baseline_stats updated %s rows for site %s", n, site["id"], ) except Exception: logger.exception( "scheduled_baseline_update site=%s failed", site["id"] ) async def scheduled_market_price_stats() -> None: async with app.state.pg_pool.acquire() as conn: for site in await _active_site_rows(conn): try: n = await conn.fetchval( "SELECT ems.fn_update_market_price_stats($1, 90)", site["id"], ) logger.info( "market_price_stats updated %s rows site=%s", n, site["id"], ) except Exception: logger.exception( "scheduled_market_price_stats site=%s failed", site["id"] ) async def scheduled_tuv_usage_stats() -> None: async with app.state.pg_pool.acquire() as conn: for site in await _active_site_rows(conn): try: n = await conn.fetchval( "SELECT ems.fn_update_tuv_usage_stats($1, 30)", site["id"], ) logger.info( "tuv_usage_stats updated %s rows site=%s", n, site["id"], ) except Exception: logger.exception( "scheduled_tuv_usage_stats site=%s failed", site["id"] ) async def scheduled_forecast_refresh() -> None: async with app.state.pg_pool.acquire() as conn: for site in await _active_site_rows(conn): site_id = int(site["id"]) try: intervals, pv_arrays = await fetch_pv_forecast(site_id, conn) if intervals >= 0: logger.info( "scheduled_forecast_refresh site=%s intervals=%s arrays=%s", site_id, intervals, pv_arrays, ) await refresh_negative_price_predictions(conn, site_id) else: logger.warning( "scheduled_forecast_refresh site=%s failed", site_id, ) except Exception: logger.exception("scheduled_forecast_refresh site=%s failed", site_id) async def _count_ote_slots_for_day( conn: asyncpg.Connection, target_day: date ) -> int: return int( await conn.fetchval( """ SELECT COUNT(*)::int FROM ems.market_interval_price WHERE market_source = 'OTE_CZ' AND interval_start::date = $1::date """, target_day, ) or 0 ) async def _refresh_negative_price_predictions_all_active( conn: asyncpg.Connection, ) -> None: for site in await _active_site_rows(conn): await refresh_negative_price_predictions(conn, int(site["id"])) async def _scheduled_ote_import_global(conn: asyncpg.Connection) -> None: """Jeden OTE fetch na chybějící den; market_interval_price je globální pro všechny site.""" prague_tz = ZoneInfo("Europe/Prague") now_loc = datetime.now(prague_tz) today = now_loc.date() tomorrow = today + timedelta(days=1) any_import_ok = False for day in (today, tomorrow): slots = await _count_ote_slots_for_day(conn, day) if ote_prague_day_slots_look_complete(slots): continue n, imported_day, _, err = await import_ote_prices( conn, site_id=None, target_date=day ) if n < 0: logger.warning( "scheduled_ote_import_global day=%s failed (%s)", day.isoformat(), err, ) continue logger.info( "scheduled_ote_import_global day=%s imported=%s slots", imported_day, n, ) any_import_ok = True if any_import_ok: await _refresh_negative_price_predictions_all_active(conn) async def scheduled_ote_import() -> None: async with app.state.pg_pool.acquire() as conn: try: await _scheduled_ote_import_global(conn) except Exception: logger.exception("scheduled_ote_import_global failed") scheduler.add_job(scheduled_heartbeat, "interval", seconds=60, id="heartbeat") scheduler.add_job( scheduled_audit_filler, "cron", minute="1,16,31,46", second=0, id="audit_filler", ) scheduler.add_job( scheduled_forecast_accuracy, "cron", minute="2,17,32,47", id="forecast_accuracy", replace_existing=True, ) scheduler.add_job(scheduled_expire_modes, "interval", minutes=1, id="expire_modes") scheduler.add_job( scheduled_control_export, "cron", minute="14,29,44,59", second=0, id="control_export", ) scheduler.add_job( scheduled_verify_modbus, "interval", minutes=2, id="verify_modbus", replace_existing=True, ) scheduler.add_job(scheduled_daily_plan, "cron", hour=15, minute=0, id="daily_plan") scheduler.add_job( scheduled_rolling_replan, "cron", minute="*/15", id="rolling_replan", ) scheduler.add_job( scheduled_baseline_update, "cron", hour=0, minute=30, id="baseline_update", replace_existing=True, ) scheduler.add_job( scheduled_market_price_stats, "cron", hour=14, minute=45, id="market_price_stats", replace_existing=True, ) scheduler.add_job( scheduled_tuv_usage_stats, "cron", hour=0, minute=45, id="tuv_usage_stats", replace_existing=True, ) scheduler.add_job( scheduled_ote_import, "cron", hour=13, minute=30, id="ote_import_preopen", replace_existing=True, ) scheduler.add_job( scheduled_ote_import, "cron", hour=14, minute=0, id="ote_import_main", replace_existing=True, ) scheduler.add_job( scheduled_ote_import, "cron", hour=0, minute=5, id="ote_import_backfill", replace_existing=True, ) scheduler.add_job( scheduled_forecast_refresh, "cron", hour="*/2", minute=5, id="forecast_refresh_2h", replace_existing=True, ) async def scheduled_daily_economics_notification() -> None: from services.notification_service import notify_daily_economics async with app.state.pg_pool.acquire() as conn: for site in await _active_site_rows(conn): site_id = int(site["id"]) site_code = str(site["code"]) try: row = await fetch_json( conn, "select ems.fn_site_economics_yesterday_notification($1::int)", site_id, ) if row is None or not isinstance(row, dict) or not row: continue yesterday = ( datetime.now(ZoneInfo("Europe/Prague")) - timedelta(days=1) ).strftime("%Y-%m-%d") await notify_daily_economics( site_code=site_code, day=yesterday, import_kwh=float(row.get("import_kwh") or 0), import_cost=float(row.get("import_cost_czk") or 0), export_kwh=float(row.get("export_kwh") or 0), export_revenue=float(row.get("export_revenue_czk") or 0), green_bonus=float(row.get("green_bonus_czk") or 0), total_balance=float(row.get("total_balance_czk") or 0), planned_balance=float(row["planned_balance_czk"]) if row.get("planned_balance_czk") is not None else None, ) except Exception: logger.exception( "scheduled_daily_economics_notification site=%s failed", site_id, ) scheduler.add_job( scheduled_daily_economics_notification, "cron", hour=7, minute=0, id="daily_economics_notification", replace_existing=True, ) scheduler.start() telemetry_task = asyncio.create_task(run_telemetry_loop_wrapper(app.state.pg_pool)) app.state.telemetry_task = telemetry_task yield ws_h = getattr(app.state, "ws_log_handler", None) if ws_h is not None: logging.getLogger().removeHandler(ws_h) app.state.ws_log_handler = None telemetry_task.cancel() try: await telemetry_task except asyncio.CancelledError: pass scheduler.shutdown(wait=False) set_pg_pool(None) app.state.pg_pool = None await pg_pool.close()