Files
ems/backend/app/lifespan.py
Dusan Vojacek f48a7aad61
Some checks failed
CI and deploy / migration-check (push) Failing after 13s
CI and deploy / deploy (push) Has been skipped
zkraceni intervalu planneru na max 35h
2026-04-19 21:09:48 +02:00

478 lines
17 KiB
Python

"""FastAPI lifespan: DB pool, APScheduler jobs, telemetry."""
from __future__ import annotations
import asyncio
import logging
import os
from contextlib import asynccontextmanager
from datetime import date, datetime, timedelta, timezone
from typing import Any
import asyncpg
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI
from zoneinfo import ZoneInfo
from app.db_json import fetch_json
from app.deps import set_pg_pool
from app.refresh_negative_prices import refresh_negative_price_predictions
from app.ws_log_handler import WSLogHandler
from services.audit_filler import fill_audit_for_completed_intervals
from services.control_exporter import export_setpoints, verify_modbus_commands
from services.forecast_service import fetch_pv_forecast
from services.heartbeat_service import send_heartbeat
from services.notification_service import notify_operating_mode_changed
from services.price_importer import import_ote_prices, ote_prague_day_slots_look_complete
from services.telemetry_collector import run_telemetry_loop_wrapper
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Shared asyncio scheduler; its default time zone is Europe/Prague.
scheduler = AsyncIOScheduler(timezone=ZoneInfo("Europe/Prague"))
def _dsn() -> str:
    """Build the PostgreSQL connection URI from the ``DB_*`` environment variables.

    Reads ``DB_HOST``, ``DB_PORT``, ``DB_NAME``, ``DB_USER`` and ``DB_PASSWORD``,
    falling back to ``localhost``, ``5432``, ``ems``, ``ems_user`` and an empty
    password respectively.

    The user, password and database name are percent-encoded so that reserved
    URI characters in them (``@``, ``:``, ``/``, ``#``, ...) cannot break the
    URI structure. Values made only of unreserved characters yield exactly the
    same string as plain interpolation. The environment values are treated as
    raw (not already percent-encoded) text.

    Returns:
        A ``postgresql://user:password@host:port/dbname`` URI.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote

    host = os.getenv("DB_HOST", "localhost")
    port = os.getenv("DB_PORT", "5432")
    name = quote(os.getenv("DB_NAME", "ems"), safe="")
    user = quote(os.getenv("DB_USER", "ems_user"), safe="")
    password = quote(os.getenv("DB_PASSWORD", ""), safe="")
    return f"postgresql://{user}:{password}@{host}:{port}/{name}"
async def _active_site_rows(conn: asyncpg.Connection) -> list[dict[str, Any]]:
    """Return the dict items produced by ``ems.fn_vw_site_directory_active()``.

    The JSON payload is fetched through ``fetch_json``. A payload that is not
    a list yields an empty list; list items that are not dicts are dropped.
    """
    payload = await fetch_json(conn, "select ems.fn_vw_site_directory_active()")
    if isinstance(payload, list):
        return [row for row in payload if isinstance(row, dict)]
    return []
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan context: set up shared resources, run the app, tear down.

    Startup part: creates the asyncpg pool (1 to 5 connections) from ``_dsn()``,
    publishes it through ``set_pg_pool`` and ``app.state.pg_pool``, and attaches
    a ``WSLogHandler`` at INFO level to the root logger.
    """
    pg_pool = await asyncpg.create_pool(_dsn(), min_size=1, max_size=5)
    app.state.pg_pool = pg_pool
    set_pg_pool(pg_pool)

    # Configure the handler first, then publish it on app.state and the root logger.
    ws_handler = WSLogHandler()
    ws_handler.setLevel(logging.INFO)
    app.state.ws_log_handler = ws_handler
    logging.getLogger().addHandler(ws_handler)

    # Imported inside the function rather than at module top
    # (presumably to avoid an import cycle; TODO confirm).
    from services.planning_engine import run_daily_plan, run_rolling_replan
async def scheduled_heartbeat() -> None:
    """Send a heartbeat for every active site over one pooled connection.

    A failure for one site is logged with its traceback and does not stop the
    remaining sites.
    """
    async with app.state.pg_pool.acquire() as db:
        active_sites = await _active_site_rows(db)
        for row in active_sites:
            try:
                await send_heartbeat(int(row["id"]), db)
            except Exception:
                logger.exception("scheduled_heartbeat site=%s failed", row["id"])
async def scheduled_audit_filler() -> None:
    """Call ``fill_audit_for_completed_intervals`` for every active site.

    All sites share one pooled connection; a per-site failure is logged and
    the loop continues with the next site.
    """
    async with app.state.pg_pool.acquire() as db:
        active_sites = await _active_site_rows(db)
        for row in active_sites:
            try:
                await fill_audit_for_completed_intervals(int(row["id"]), db)
            except Exception:
                logger.exception("scheduled_audit_filler site=%s failed", row["id"])
async def scheduled_forecast_accuracy() -> None:
    """Run ``ems.fn_fill_forecast_accuracy`` for every active site.

    The value returned by the SQL function is logged only when it is truthy.
    Per-site failures are logged and do not stop the other sites.
    """
    async with app.state.pg_pool.acquire() as db:
        for row in await _active_site_rows(db):
            try:
                # Second argument (48) is passed through to the SQL function;
                # presumably a look-back window, confirm against its definition.
                filled = await db.fetchval(
                    "SELECT ems.fn_fill_forecast_accuracy($1, 48)",
                    row["id"],
                )
                if not filled:
                    continue
                logger.info(
                    "forecast_accuracy filled %s slots for site %s",
                    filled,
                    row["id"],
                )
            except Exception:
                logger.exception(
                    "scheduled_forecast_accuracy site=%s failed", row["id"]
                )
async def scheduled_expire_modes() -> None:
    """Run ``ems.fn_expire_modes()`` and notify about every row it returns.

    Each returned row is expected to provide ``site_code``, ``old_mode`` and
    ``new_mode``; they are forwarded as strings to
    ``notify_operating_mode_changed`` together with the fixed actor
    ``system:expiry`` and a fixed reason text.

    Notifications are isolated per row: when one fails, the error is logged
    and the remaining rows are still notified. With a single try around the
    whole loop, the first failure would drop every later notification even
    though the SQL call had already completed.
    """
    async with app.state.pg_pool.acquire() as conn:
        try:
            rows = await conn.fetch("SELECT * FROM ems.fn_expire_modes()")
        except Exception:
            logger.exception("scheduled_expire_modes failed")
            return
        for r in rows:
            # Resolved inside the try so a malformed row is also isolated;
            # kept in a local so the except handler cannot raise itself.
            site_code = None
            try:
                site_code = str(r["site_code"])
                await notify_operating_mode_changed(
                    site_code,
                    str(r["old_mode"]),
                    str(r["new_mode"]),
                    "system:expiry",
                    "Automatické vypršení dočasného režimu",
                )
            except Exception:
                logger.exception(
                    "scheduled_expire_modes notification failed site_code=%s",
                    site_code,
                )
async def scheduled_control_export() -> None:
    """Call ``export_setpoints`` for every active site on one pooled connection.

    A failure for one site is logged (message plus traceback) and the loop
    moves on to the next site.
    """
    async with app.state.pg_pool.acquire() as db:
        active_sites = await _active_site_rows(db)
        for row in active_sites:
            try:
                await export_setpoints(int(row["id"]), db)
            except Exception as exc:
                logger.exception(
                    "scheduled_control_export site=%s: %s", row["id"], exc
                )
async def scheduled_verify_modbus() -> None:
    """
    Verify commands in the ``written`` state from the last 20 minutes.

    Runs every 2 minutes, independently of control_exporter (the window is
    longer than the interval to tolerate delayed job runs).
    """
    async with app.state.pg_pool.acquire() as db:
        for row in await _active_site_rows(db):
            site_id = int(row["id"])
            try:
                payload = await fetch_json(
                    db,
                    "select ems.fn_modbus_written_command_ids($1::int, interval '20 minutes')",
                    site_id,
                )
                # Anything other than a list is treated as "no commands".
                command_ids = (
                    [int(raw_id) for raw_id in payload]
                    if isinstance(payload, list)
                    else []
                )
                if command_ids:
                    await verify_modbus_commands(command_ids, db, site_id)
            except Exception:
                logger.exception("scheduled_verify_modbus site=%s failed", site_id)
async def scheduled_daily_plan() -> None:
    """Run the daily plan and then export setpoints for every active site.

    Both steps share one try block, so the export is skipped for a site whose
    planning raised; the error is logged and the next site is processed.
    """
    async with app.state.pg_pool.acquire() as db:
        active_sites = await _active_site_rows(db)
        for row in active_sites:
            site_id = int(row["id"])
            try:
                await run_daily_plan(site_id, db)
                await export_setpoints(site_id, db)
            except Exception:
                logger.exception("scheduled_daily_plan site=%s failed", site_id)
async def scheduled_rolling_replan() -> None:
    """Run the rolling replan and then export setpoints for every active site.

    Both steps share one try block, so the export is skipped for a site whose
    replan raised; the error is logged and the next site is processed.
    """
    async with app.state.pg_pool.acquire() as db:
        active_sites = await _active_site_rows(db)
        for row in active_sites:
            site_id = int(row["id"])
            try:
                await run_rolling_replan(site_id, db)
                await export_setpoints(site_id, db)
            except Exception:
                logger.exception("scheduled_rolling_replan site=%s failed", site_id)
async def scheduled_baseline_update() -> None:
    """Run ``ems.fn_update_baseline_stats`` for every active site and log its result.

    Per-site failures are logged and do not stop the other sites.
    """
    async with app.state.pg_pool.acquire() as db:
        for row in await _active_site_rows(db):
            try:
                # Second argument (30) is passed through to the SQL function;
                # presumably a window in days, confirm against its definition.
                updated = await db.fetchval(
                    "SELECT ems.fn_update_baseline_stats($1, 30)",
                    row["id"],
                )
                logger.info(
                    "baseline_stats updated %s rows for site %s",
                    updated,
                    row["id"],
                )
            except Exception:
                logger.exception(
                    "scheduled_baseline_update site=%s failed", row["id"]
                )
async def scheduled_market_price_stats() -> None:
    """Run ``ems.fn_update_market_price_stats`` for every active site and log its result.

    Per-site failures are logged and do not stop the other sites.
    """
    async with app.state.pg_pool.acquire() as db:
        for row in await _active_site_rows(db):
            try:
                # Second argument (90) is passed through to the SQL function;
                # presumably a window in days, confirm against its definition.
                updated = await db.fetchval(
                    "SELECT ems.fn_update_market_price_stats($1, 90)",
                    row["id"],
                )
                logger.info(
                    "market_price_stats updated %s rows site=%s",
                    updated,
                    row["id"],
                )
            except Exception:
                logger.exception(
                    "scheduled_market_price_stats site=%s failed", row["id"]
                )
async def scheduled_tuv_usage_stats() -> None:
    """Run ``ems.fn_update_tuv_usage_stats`` for every active site and log its result.

    Per-site failures are logged and do not stop the other sites.
    """
    async with app.state.pg_pool.acquire() as db:
        for row in await _active_site_rows(db):
            try:
                # Second argument (30) is passed through to the SQL function;
                # presumably a window in days, confirm against its definition.
                updated = await db.fetchval(
                    "SELECT ems.fn_update_tuv_usage_stats($1, 30)",
                    row["id"],
                )
                logger.info(
                    "tuv_usage_stats updated %s rows site=%s",
                    updated,
                    row["id"],
                )
            except Exception:
                logger.exception(
                    "scheduled_tuv_usage_stats site=%s failed", row["id"]
                )
async def scheduled_forecast_refresh() -> None:
    """Refresh the PV forecast for every active site.

    ``fetch_pv_forecast`` returns a pair whose first element is treated as a
    status: a non-negative value is logged as success and followed by a refresh
    of the negative-price predictions; a negative value is logged as a warning.
    Exceptions are logged per site and do not stop the other sites.
    """
    async with app.state.pg_pool.acquire() as db:
        active_sites = await _active_site_rows(db)
        for row in active_sites:
            site_id = int(row["id"])
            try:
                interval_count, array_count = await fetch_pv_forecast(site_id, db)
                if interval_count >= 0:
                    logger.info(
                        "scheduled_forecast_refresh site=%s intervals=%s arrays=%s",
                        site_id,
                        interval_count,
                        array_count,
                    )
                    await refresh_negative_price_predictions(db, site_id)
                else:
                    logger.warning(
                        "scheduled_forecast_refresh site=%s failed",
                        site_id,
                    )
            except Exception:
                logger.exception("scheduled_forecast_refresh site=%s failed", site_id)
async def _count_ote_slots_for_day(
    conn: asyncpg.Connection, target_day: date
) -> int:
    """Count ``OTE_CZ`` rows in ``ems.market_interval_price`` whose start falls on *target_day*.

    Returns 0 when the query yields a falsy value.
    """
    # NOTE(review): if ``interval_start`` is timestamptz, the ``::date`` cast
    # follows the session time zone; confirm it matches the day the caller means.
    # The SQL text, including its line layout, is kept verbatim.
    slot_count = await conn.fetchval(
        """
SELECT COUNT(*)::int
FROM ems.market_interval_price
WHERE market_source = 'OTE_CZ'
AND interval_start::date = $1::date
""",
        target_day,
    )
    return int(slot_count or 0)
async def _refresh_negative_price_predictions_all_active(
    conn: asyncpg.Connection,
) -> None:
    """Refresh negative-price predictions for each active site, one after another.

    No per-site error handling: the first exception propagates to the caller.
    """
    active_sites = await _active_site_rows(conn)
    for row in active_sites:
        site_id = int(row["id"])
        await refresh_negative_price_predictions(conn, site_id)
async def _scheduled_ote_import_global(conn: asyncpg.Connection) -> None:
    """One OTE fetch per missing day; market_interval_price is global for all sites.

    Checks today and tomorrow (Europe/Prague calendar dates). A day whose
    stored slot count already looks complete is skipped; otherwise prices are
    imported with ``site_id=None``. A negative count from the importer is
    logged as a failure. If at least one day was imported, the negative-price
    predictions of all active sites are refreshed.
    """
    today = datetime.now(ZoneInfo("Europe/Prague")).date()
    tomorrow = today + timedelta(days=1)
    imported_any = False
    for day in (today, tomorrow):
        existing_slots = await _count_ote_slots_for_day(conn, day)
        if ote_prague_day_slots_look_complete(existing_slots):
            continue
        n, imported_day, _, err = await import_ote_prices(
            conn, site_id=None, target_date=day
        )
        if n < 0:
            logger.warning(
                "scheduled_ote_import_global day=%s failed (%s)",
                day.isoformat(),
                err,
            )
        else:
            logger.info(
                "scheduled_ote_import_global day=%s imported=%s slots",
                imported_day,
                n,
            )
            imported_any = True
    if imported_any:
        await _refresh_negative_price_predictions_all_active(conn)
async def scheduled_ote_import() -> None:
    """Scheduler entry point for the global OTE price import.

    Acquires a pooled connection and delegates to
    ``_scheduled_ote_import_global``; any exception is logged, not re-raised.
    """
    async with app.state.pg_pool.acquire() as db:
        try:
            await _scheduled_ote_import_global(db)
        except Exception:
            logger.exception("scheduled_ote_import_global failed")
# Register the periodic jobs on the shared scheduler, in the original order.
# Every job is added with replace_existing=True (previously only some were), so
# re-registering an id on the module-level scheduler never conflicts.
# Cron fields are evaluated in the scheduler's time zone (Europe/Prague).
job_specs = (
    # (callable, trigger, trigger keyword arguments, job id)
    (scheduled_heartbeat, "interval", {"seconds": 60}, "heartbeat"),
    (
        scheduled_audit_filler,
        "cron",
        {"minute": "1,16,31,46", "second": 0},
        "audit_filler",
    ),
    (
        scheduled_forecast_accuracy,
        "cron",
        {"minute": "2,17,32,47"},
        "forecast_accuracy",
    ),
    (scheduled_expire_modes, "interval", {"minutes": 1}, "expire_modes"),
    (
        scheduled_control_export,
        "cron",
        {"minute": "14,29,44,59", "second": 0},
        "control_export",
    ),
    (scheduled_verify_modbus, "interval", {"minutes": 2}, "verify_modbus"),
    (scheduled_daily_plan, "cron", {"hour": 15, "minute": 0}, "daily_plan"),
    (scheduled_rolling_replan, "cron", {"minute": "*/15"}, "rolling_replan"),
    (
        scheduled_baseline_update,
        "cron",
        {"hour": 0, "minute": 30},
        "baseline_update",
    ),
    (
        scheduled_market_price_stats,
        "cron",
        {"hour": 14, "minute": 45},
        "market_price_stats",
    ),
    (
        scheduled_tuv_usage_stats,
        "cron",
        {"hour": 0, "minute": 45},
        "tuv_usage_stats",
    ),
    # The OTE import is scheduled several times: 13:15, 13:30, 13:45, 14:00,
    # 14:15, 14:45, plus a run at 00:05.
    (scheduled_ote_import, "cron", {"hour": 13, "minute": 30}, "ote_import_preopen"),
    (
        scheduled_ote_import,
        "cron",
        {"hour": "13,14", "minute": 15},
        "ote_import_retry_early",
    ),
    (
        scheduled_ote_import,
        "cron",
        {"hour": "13,14", "minute": 45},
        "ote_import_retry_late",
    ),
    (scheduled_ote_import, "cron", {"hour": 14, "minute": 0}, "ote_import_main"),
    (scheduled_ote_import, "cron", {"hour": 0, "minute": 5}, "ote_import_backfill"),
    (
        scheduled_forecast_refresh,
        "cron",
        {"hour": "*/2", "minute": 5},
        "forecast_refresh_2h",
    ),
)
for job_func, trigger, trigger_kwargs, job_id in job_specs:
    scheduler.add_job(
        job_func,
        trigger,
        id=job_id,
        replace_existing=True,
        **trigger_kwargs,
    )
async def scheduled_daily_economics_notification() -> None:
    """Send yesterday's economics summary for every active site.

    For each site the JSON result of
    ``ems.fn_site_economics_yesterday_notification`` is fetched; a result that
    is not a non-empty dict is skipped. Missing or falsy amounts are sent as
    0.0, and ``planned_balance`` is sent as ``None`` when absent. The day label
    is yesterday's date in Europe/Prague, formatted ``YYYY-MM-DD``.
    Failures are logged per site and do not stop the other sites.
    """
    from services.notification_service import notify_daily_economics

    def _amount(payload: dict[str, Any], key: str) -> float:
        # Missing, None and other falsy values all collapse to 0.0.
        return float(payload.get(key) or 0)

    async with app.state.pg_pool.acquire() as db:
        for site in await _active_site_rows(db):
            site_id = int(site["id"])
            site_code = str(site["code"])
            try:
                econ = await fetch_json(
                    db,
                    "select ems.fn_site_economics_yesterday_notification($1::int)",
                    site_id,
                )
                if not (isinstance(econ, dict) and econ):
                    continue
                prague_now = datetime.now(ZoneInfo("Europe/Prague"))
                day_label = (prague_now - timedelta(days=1)).strftime("%Y-%m-%d")
                await notify_daily_economics(
                    site_code=site_code,
                    day=day_label,
                    import_kwh=_amount(econ, "import_kwh"),
                    import_cost=_amount(econ, "import_cost_czk"),
                    export_kwh=_amount(econ, "export_kwh"),
                    export_revenue=_amount(econ, "export_revenue_czk"),
                    green_bonus=_amount(econ, "green_bonus_czk"),
                    total_balance=_amount(econ, "total_balance_czk"),
                    planned_balance=(
                        None
                        if econ.get("planned_balance_czk") is None
                        else float(econ["planned_balance_czk"])
                    ),
                )
            except Exception:
                logger.exception(
                    "scheduled_daily_economics_notification site=%s failed",
                    site_id,
                )
# Daily economics summary: cron job at 07:00 in the scheduler's time zone.
scheduler.add_job(
    scheduled_daily_economics_notification,
    trigger="cron",
    id="daily_economics_notification",
    replace_existing=True,
    hour=7,
    minute=0,
)
scheduler.start()
telemetry_task = asyncio.create_task(run_telemetry_loop_wrapper(app.state.pg_pool))
app.state.telemetry_task = telemetry_task
yield
ws_h = getattr(app.state, "ws_log_handler", None)
if ws_h is not None:
logging.getLogger().removeHandler(ws_h)
app.state.ws_log_handler = None
telemetry_task.cancel()
try:
await telemetry_task
except asyncio.CancelledError:
pass
scheduler.shutdown(wait=False)
set_pg_pool(None)
app.state.pg_pool = None
await pg_pool.close()