510 lines
18 KiB
Python
510 lines
18 KiB
Python
"""FastAPI lifespan: DB pool, APScheduler joby, telemetrie."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
from contextlib import asynccontextmanager
|
|
from datetime import date, datetime, timedelta, timezone
|
|
from typing import Any
|
|
|
|
import asyncpg
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from fastapi import FastAPI
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from app.db_json import fetch_json
|
|
from app.deps import set_pg_pool
|
|
from app.refresh_negative_prices import refresh_negative_price_predictions
|
|
from app.ws_log_handler import WSLogHandler
|
|
from services.audit_filler import fill_audit_for_completed_intervals
|
|
from services.plan_actual_slot_guard import run_plan_actual_slot_guard_for_all_active_sites
|
|
from services.control_exporter import export_setpoints, verify_modbus_commands
|
|
from services.forecast_service import fetch_pv_forecast
|
|
from services.heartbeat_service import send_heartbeat
|
|
from services.notification_service import notify_operating_mode_changed
|
|
from services.price_importer import import_ote_prices, ote_prague_day_slots_look_complete
|
|
from services.telemetry_collector import run_telemetry_loop_wrapper
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
scheduler = AsyncIOScheduler(timezone=ZoneInfo("Europe/Prague"))
|
|
|
|
|
|
def _dsn() -> str:
|
|
host = os.getenv("DB_HOST", "localhost")
|
|
port = os.getenv("DB_PORT", "5432")
|
|
name = os.getenv("DB_NAME", "ems")
|
|
user = os.getenv("DB_USER", "ems_user")
|
|
password = os.getenv("DB_PASSWORD", "")
|
|
return f"postgresql://{user}:{password}@{host}:{port}/{name}"
|
|
|
|
|
|
async def _active_site_rows(conn: asyncpg.Connection) -> list[dict[str, Any]]:
|
|
raw = await fetch_json(conn, "select ems.fn_vw_site_directory_active()")
|
|
if not isinstance(raw, list):
|
|
return []
|
|
return [x for x in raw if isinstance(x, dict)]
|
|
|
|
|
|
@asynccontextmanager
|
|
async def lifespan(app: FastAPI):
|
|
pg_pool = await asyncpg.create_pool(_dsn(), min_size=1, max_size=5)
|
|
set_pg_pool(pg_pool)
|
|
app.state.pg_pool = pg_pool
|
|
|
|
# Fail fast if Flyway routines are missing (otherwise heartbeat silently goes stale in FE).
|
|
async with pg_pool.acquire() as conn:
|
|
fn_ok = await conn.fetchval(
|
|
"""
|
|
select exists(
|
|
select 1
|
|
from pg_proc p
|
|
join pg_namespace n on n.oid = p.pronamespace
|
|
where n.nspname = 'ems'
|
|
and p.proname = 'fn_update_heartbeat'
|
|
)
|
|
"""
|
|
)
|
|
if not fn_ok:
|
|
raise RuntimeError("Missing DB routine: ems.fn_update_heartbeat")
|
|
|
|
app.state.ws_log_handler = WSLogHandler()
|
|
app.state.ws_log_handler.setLevel(logging.INFO)
|
|
logging.getLogger().addHandler(app.state.ws_log_handler)
|
|
|
|
from services.planning_engine import run_daily_plan, run_rolling_replan
|
|
|
|
async def scheduled_heartbeat() -> None:
|
|
async with app.state.pg_pool.acquire() as conn:
|
|
for site in await _active_site_rows(conn):
|
|
try:
|
|
await send_heartbeat(int(site["id"]), conn)
|
|
except Exception:
|
|
logger.exception("scheduled_heartbeat site=%s failed", site["id"])
|
|
|
|
async def scheduled_audit_filler() -> None:
|
|
async with app.state.pg_pool.acquire() as conn:
|
|
for site in await _active_site_rows(conn):
|
|
try:
|
|
await fill_audit_for_completed_intervals(int(site["id"]), conn)
|
|
except Exception:
|
|
logger.exception("scheduled_audit_filler site=%s failed", site["id"])
|
|
|
|
async def scheduled_plan_actual_slot_guard() -> None:
|
|
"""Po audit filleru: fatální odchylka plán vs. skutečnost (síť) → Discord (dedup v DB)."""
|
|
try:
|
|
await run_plan_actual_slot_guard_for_all_active_sites(app.state.pg_pool)
|
|
except Exception:
|
|
logger.exception("scheduled_plan_actual_slot_guard failed")
|
|
|
|
async def scheduled_forecast_accuracy() -> None:
|
|
async with app.state.pg_pool.acquire() as conn:
|
|
for site in await _active_site_rows(conn):
|
|
try:
|
|
n = await conn.fetchval(
|
|
"SELECT ems.fn_fill_forecast_accuracy($1, 48)",
|
|
site["id"],
|
|
)
|
|
if n:
|
|
logger.info(
|
|
"forecast_accuracy filled %s slots for site %s",
|
|
n,
|
|
site["id"],
|
|
)
|
|
except Exception:
|
|
logger.exception(
|
|
"scheduled_forecast_accuracy site=%s failed", site["id"]
|
|
)
|
|
|
|
async def scheduled_expire_modes() -> None:
|
|
async with app.state.pg_pool.acquire() as conn:
|
|
try:
|
|
rows = await conn.fetch("SELECT * FROM ems.fn_expire_modes()")
|
|
for r in rows:
|
|
await notify_operating_mode_changed(
|
|
str(r["site_code"]),
|
|
str(r["old_mode"]),
|
|
str(r["new_mode"]),
|
|
"system:expiry",
|
|
"Automatické vypršení dočasného režimu",
|
|
)
|
|
except Exception:
|
|
logger.exception("scheduled_expire_modes failed")
|
|
|
|
async def scheduled_control_export() -> None:
|
|
async with app.state.pg_pool.acquire() as conn:
|
|
for site in await _active_site_rows(conn):
|
|
try:
|
|
await export_setpoints(int(site["id"]), conn)
|
|
except Exception as e:
|
|
logger.exception(
|
|
"scheduled_control_export site=%s: %s", site["id"], e
|
|
)
|
|
|
|
async def scheduled_verify_modbus() -> None:
|
|
"""
|
|
Ověří příkazy ve stavu written z posledních 20 minut.
|
|
Běží každé 2 minuty, nezávisle na control_exporter (delší okno kvůli zpoždění jobu).
|
|
"""
|
|
async with app.state.pg_pool.acquire() as conn:
|
|
for site in await _active_site_rows(conn):
|
|
site_id = int(site["id"])
|
|
try:
|
|
id_json = await fetch_json(
|
|
conn,
|
|
"select ems.fn_modbus_written_command_ids($1::int, interval '20 minutes')",
|
|
site_id,
|
|
)
|
|
if not isinstance(id_json, list):
|
|
id_json = []
|
|
ids = [int(x) for x in id_json]
|
|
if ids:
|
|
await verify_modbus_commands(ids, conn, site_id)
|
|
except Exception:
|
|
logger.exception("scheduled_verify_modbus site=%s failed", site_id)
|
|
|
|
async def scheduled_daily_plan() -> None:
|
|
async with app.state.pg_pool.acquire() as conn:
|
|
for site in await _active_site_rows(conn):
|
|
site_id = int(site["id"])
|
|
try:
|
|
await run_daily_plan(site_id, conn)
|
|
await export_setpoints(site_id, conn)
|
|
except Exception:
|
|
logger.exception("scheduled_daily_plan site=%s failed", site_id)
|
|
|
|
async def scheduled_rolling_replan() -> None:
|
|
async with app.state.pg_pool.acquire() as conn:
|
|
for site in await _active_site_rows(conn):
|
|
site_id = int(site["id"])
|
|
try:
|
|
await run_rolling_replan(site_id, conn)
|
|
await export_setpoints(site_id, conn)
|
|
except Exception:
|
|
logger.exception("scheduled_rolling_replan site=%s failed", site_id)
|
|
|
|
async def scheduled_baseline_update() -> None:
|
|
async with app.state.pg_pool.acquire() as conn:
|
|
for site in await _active_site_rows(conn):
|
|
try:
|
|
n = await conn.fetchval(
|
|
"SELECT ems.fn_update_baseline_stats($1, 30)",
|
|
site["id"],
|
|
)
|
|
logger.info(
|
|
"baseline_stats updated %s rows for site %s",
|
|
n,
|
|
site["id"],
|
|
)
|
|
except Exception:
|
|
logger.exception(
|
|
"scheduled_baseline_update site=%s failed", site["id"]
|
|
)
|
|
|
|
async def scheduled_market_price_stats() -> None:
|
|
async with app.state.pg_pool.acquire() as conn:
|
|
for site in await _active_site_rows(conn):
|
|
try:
|
|
n = await conn.fetchval(
|
|
"SELECT ems.fn_update_market_price_stats($1, 90)",
|
|
site["id"],
|
|
)
|
|
logger.info(
|
|
"market_price_stats updated %s rows site=%s",
|
|
n,
|
|
site["id"],
|
|
)
|
|
except Exception:
|
|
logger.exception(
|
|
"scheduled_market_price_stats site=%s failed", site["id"]
|
|
)
|
|
|
|
async def scheduled_tuv_usage_stats() -> None:
|
|
async with app.state.pg_pool.acquire() as conn:
|
|
for site in await _active_site_rows(conn):
|
|
try:
|
|
n = await conn.fetchval(
|
|
"SELECT ems.fn_update_tuv_usage_stats($1, 30)",
|
|
site["id"],
|
|
)
|
|
logger.info(
|
|
"tuv_usage_stats updated %s rows site=%s",
|
|
n,
|
|
site["id"],
|
|
)
|
|
except Exception:
|
|
logger.exception(
|
|
"scheduled_tuv_usage_stats site=%s failed", site["id"]
|
|
)
|
|
|
|
async def scheduled_forecast_refresh() -> None:
|
|
async with app.state.pg_pool.acquire() as conn:
|
|
for site in await _active_site_rows(conn):
|
|
site_id = int(site["id"])
|
|
try:
|
|
intervals, pv_arrays = await fetch_pv_forecast(site_id, conn)
|
|
if intervals >= 0:
|
|
logger.info(
|
|
"scheduled_forecast_refresh site=%s intervals=%s arrays=%s",
|
|
site_id,
|
|
intervals,
|
|
pv_arrays,
|
|
)
|
|
await refresh_negative_price_predictions(conn, site_id)
|
|
else:
|
|
logger.warning(
|
|
"scheduled_forecast_refresh site=%s failed",
|
|
site_id,
|
|
)
|
|
except Exception:
|
|
logger.exception("scheduled_forecast_refresh site=%s failed", site_id)
|
|
|
|
async def _count_ote_slots_for_day(
|
|
conn: asyncpg.Connection, target_day: date
|
|
) -> int:
|
|
return int(
|
|
await conn.fetchval(
|
|
"""
|
|
SELECT COUNT(*)::int
|
|
FROM ems.market_interval_price
|
|
WHERE market_source = 'OTE_CZ'
|
|
AND interval_start::date = $1::date
|
|
""",
|
|
target_day,
|
|
)
|
|
or 0
|
|
)
|
|
|
|
async def _refresh_negative_price_predictions_all_active(
|
|
conn: asyncpg.Connection,
|
|
) -> None:
|
|
for site in await _active_site_rows(conn):
|
|
await refresh_negative_price_predictions(conn, int(site["id"]))
|
|
|
|
async def _scheduled_ote_import_global(conn: asyncpg.Connection) -> None:
|
|
"""Jeden OTE fetch na chybějící den; market_interval_price je globální pro všechny site."""
|
|
prague_tz = ZoneInfo("Europe/Prague")
|
|
now_loc = datetime.now(prague_tz)
|
|
today = now_loc.date()
|
|
tomorrow = today + timedelta(days=1)
|
|
any_import_ok = False
|
|
|
|
for day in (today, tomorrow):
|
|
slots = await _count_ote_slots_for_day(conn, day)
|
|
if ote_prague_day_slots_look_complete(slots):
|
|
continue
|
|
n, imported_day, _, err = await import_ote_prices(
|
|
conn, site_id=None, target_date=day
|
|
)
|
|
if n < 0:
|
|
logger.warning(
|
|
"scheduled_ote_import_global day=%s failed (%s)",
|
|
day.isoformat(),
|
|
err,
|
|
)
|
|
continue
|
|
logger.info(
|
|
"scheduled_ote_import_global day=%s imported=%s slots",
|
|
imported_day,
|
|
n,
|
|
)
|
|
any_import_ok = True
|
|
|
|
if any_import_ok:
|
|
await _refresh_negative_price_predictions_all_active(conn)
|
|
|
|
async def scheduled_ote_import() -> None:
|
|
async with app.state.pg_pool.acquire() as conn:
|
|
try:
|
|
await _scheduled_ote_import_global(conn)
|
|
except Exception:
|
|
logger.exception("scheduled_ote_import_global failed")
|
|
|
|
scheduler.add_job(scheduled_heartbeat, "interval", seconds=60, id="heartbeat")
|
|
scheduler.add_job(
|
|
scheduled_audit_filler,
|
|
"cron",
|
|
minute="1,16,31,46",
|
|
second=0,
|
|
id="audit_filler",
|
|
)
|
|
scheduler.add_job(
|
|
scheduled_plan_actual_slot_guard,
|
|
"cron",
|
|
minute="5,20,35,50",
|
|
second=0,
|
|
id="plan_actual_slot_guard",
|
|
replace_existing=True,
|
|
)
|
|
scheduler.add_job(
|
|
scheduled_forecast_accuracy,
|
|
"cron",
|
|
minute="2,17,32,47",
|
|
id="forecast_accuracy",
|
|
replace_existing=True,
|
|
)
|
|
scheduler.add_job(scheduled_expire_modes, "interval", minutes=1, id="expire_modes")
|
|
scheduler.add_job(
|
|
scheduled_control_export,
|
|
"cron",
|
|
minute="14,29,44,59",
|
|
second=0,
|
|
id="control_export",
|
|
)
|
|
scheduler.add_job(
|
|
scheduled_verify_modbus,
|
|
"interval",
|
|
minutes=2,
|
|
id="verify_modbus",
|
|
replace_existing=True,
|
|
)
|
|
scheduler.add_job(scheduled_daily_plan, "cron", hour=15, minute=0, id="daily_plan")
|
|
scheduler.add_job(
|
|
scheduled_rolling_replan,
|
|
"cron",
|
|
minute="*/15",
|
|
id="rolling_replan",
|
|
)
|
|
scheduler.add_job(
|
|
scheduled_baseline_update,
|
|
"cron",
|
|
hour=0,
|
|
minute=30,
|
|
id="baseline_update",
|
|
replace_existing=True,
|
|
)
|
|
scheduler.add_job(
|
|
scheduled_market_price_stats,
|
|
"cron",
|
|
hour=14,
|
|
minute=45,
|
|
id="market_price_stats",
|
|
replace_existing=True,
|
|
)
|
|
scheduler.add_job(
|
|
scheduled_tuv_usage_stats,
|
|
"cron",
|
|
hour=0,
|
|
minute=45,
|
|
id="tuv_usage_stats",
|
|
replace_existing=True,
|
|
)
|
|
scheduler.add_job(
|
|
scheduled_ote_import,
|
|
"cron",
|
|
hour=13,
|
|
minute=25,
|
|
id="ote_import_preopen",
|
|
replace_existing=True,
|
|
)
|
|
scheduler.add_job(
|
|
scheduled_ote_import,
|
|
"cron",
|
|
hour="13,14",
|
|
minute=12,
|
|
id="ote_import_retry_early",
|
|
replace_existing=True,
|
|
)
|
|
scheduler.add_job(
|
|
scheduled_ote_import,
|
|
"cron",
|
|
hour="13,14",
|
|
minute=45,
|
|
id="ote_import_retry_late",
|
|
replace_existing=True,
|
|
)
|
|
scheduler.add_job(
|
|
scheduled_ote_import,
|
|
"cron",
|
|
hour=14,
|
|
minute=0,
|
|
id="ote_import_main",
|
|
replace_existing=True,
|
|
)
|
|
scheduler.add_job(
|
|
scheduled_ote_import,
|
|
"cron",
|
|
hour=0,
|
|
minute=5,
|
|
id="ote_import_backfill",
|
|
replace_existing=True,
|
|
)
|
|
scheduler.add_job(
|
|
scheduled_forecast_refresh,
|
|
"cron",
|
|
hour="*/2",
|
|
minute=5,
|
|
id="forecast_refresh_2h",
|
|
replace_existing=True,
|
|
)
|
|
|
|
async def scheduled_daily_economics_notification() -> None:
|
|
from services.notification_service import notify_daily_economics
|
|
|
|
async with app.state.pg_pool.acquire() as conn:
|
|
for site in await _active_site_rows(conn):
|
|
site_id = int(site["id"])
|
|
site_code = str(site["code"])
|
|
try:
|
|
row = await fetch_json(
|
|
conn,
|
|
"select ems.fn_site_economics_yesterday_notification($1::int)",
|
|
site_id,
|
|
)
|
|
if row is None or not isinstance(row, dict) or not row:
|
|
continue
|
|
yesterday = (
|
|
datetime.now(ZoneInfo("Europe/Prague")) - timedelta(days=1)
|
|
).strftime("%Y-%m-%d")
|
|
await notify_daily_economics(
|
|
site_code=site_code,
|
|
day=yesterday,
|
|
import_kwh=float(row.get("import_kwh") or 0),
|
|
import_cost=float(row.get("import_cost_czk") or 0),
|
|
export_kwh=float(row.get("export_kwh") or 0),
|
|
export_revenue=float(row.get("export_revenue_czk") or 0),
|
|
green_bonus=float(row.get("green_bonus_czk") or 0),
|
|
total_balance=float(row.get("total_balance_czk") or 0),
|
|
planned_balance=float(row["planned_balance_czk"])
|
|
if row.get("planned_balance_czk") is not None
|
|
else None,
|
|
)
|
|
except Exception:
|
|
logger.exception(
|
|
"scheduled_daily_economics_notification site=%s failed",
|
|
site_id,
|
|
)
|
|
|
|
scheduler.add_job(
|
|
scheduled_daily_economics_notification,
|
|
"cron",
|
|
hour=7,
|
|
minute=0,
|
|
id="daily_economics_notification",
|
|
replace_existing=True,
|
|
)
|
|
scheduler.start()
|
|
|
|
telemetry_task = asyncio.create_task(run_telemetry_loop_wrapper(app.state.pg_pool))
|
|
app.state.telemetry_task = telemetry_task
|
|
|
|
yield
|
|
|
|
ws_h = getattr(app.state, "ws_log_handler", None)
|
|
if ws_h is not None:
|
|
logging.getLogger().removeHandler(ws_h)
|
|
app.state.ws_log_handler = None
|
|
|
|
telemetry_task.cancel()
|
|
try:
|
|
await telemetry_task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
scheduler.shutdown(wait=False)
|
|
set_pg_pool(None)
|
|
app.state.pg_pool = None
|
|
await pg_pool.close()
|