""" Kontrola plán vs. skutečnost po uzavření 15min slotu. Pravidla a dedup INSERT drží ems.fn_plan_actual_slot_guard_site / fn_plan_actual_slot_guard_all_active (repeatable R__076). Python jen zavolá funkci a pošle Discord podle vrácených alertů. """ from __future__ import annotations import logging from datetime import datetime, timezone from typing import Any import asyncpg from zoneinfo import ZoneInfo from app.db_json import fetch_json from services.notification_service import notify_plan_vs_actual_fatal logger = logging.getLogger(__name__) _PRAGUE = ZoneInfo("Europe/Prague") def _interval_start_utc(value: Any) -> datetime: if isinstance(value, datetime): if value.tzinfo is None: return value.replace(tzinfo=timezone.utc) return value.astimezone(timezone.utc) if isinstance(value, str): s = value.replace("Z", "+00:00") dt = datetime.fromisoformat(s) if dt.tzinfo is None: return dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc) raise TypeError(f"expected datetime or str for interval_start, got {type(value)!r}") def _slot_label_prague(interval_start: datetime) -> str: loc = interval_start.astimezone(_PRAGUE) return loc.strftime("%Y-%m-%d %H:%M") + " Europe/Prague" async def _dispatch_site_result(site_payload: dict[str, Any]) -> None: if site_payload.get("error") == "unknown_site": logger.warning("plan_actual_slot_guard: unknown site_id=%s", site_payload.get("site_id")) return site_code = str(site_payload.get("site_code") or site_payload.get("site_id") or "") alerts = site_payload.get("alerts") if not isinstance(alerts, list): return for alert in alerts: if not isinstance(alert, dict): continue if not alert.get("notify"): continue interval_start = _interval_start_utc(alert["interval_start"]) reason_code = str(alert.get("reason_code") or "") detail = str(alert.get("detail") or "") plan_grid_w = int(alert.get("plan_grid_w") or 0) actual_grid_w = int(alert.get("actual_grid_w") or 0) deviation_grid_w = int(alert.get("deviation_grid_w") or 0) slot_label = _slot_label_prague(interval_start) await notify_plan_vs_actual_fatal( site_code=site_code, slot_label=slot_label, interval_start_utc=interval_start, plan_grid_w=plan_grid_w, actual_grid_w=actual_grid_w, deviation_grid_w=deviation_grid_w, reason_code=reason_code, detail=detail, ) logger.warning( "[site=%s] plan_actual fatal %s slot=%s: %s", site_payload.get("site_id"), reason_code, interval_start.isoformat(), detail, ) async def run_plan_actual_slot_guard_for_all_active_sites( pool: asyncpg.Pool, *, now: datetime | None = None, ) -> None: """Scheduler: jeden dotaz přes aktivní lokality (SQL dedup + klasifikace).""" async with pool.acquire() as conn: try: if now is not None: raw = await fetch_json( conn, "SELECT ems.fn_plan_actual_slot_guard_all_active($1::timestamptz)", now, ) else: raw = await fetch_json(conn, "SELECT ems.fn_plan_actual_slot_guard_all_active()") except Exception: logger.exception("plan_actual_slot_guard fn_plan_actual_slot_guard_all_active failed") return if raw is None: return if not isinstance(raw, list): logger.warning("plan_actual_slot_guard: unexpected payload type %s", type(raw)) return for site_payload in raw: if not isinstance(site_payload, dict): continue try: await _dispatch_site_result(site_payload) except Exception: logger.exception( "plan_actual_slot_guard site=%s failed", site_payload.get("site_id"), )