"""Discord a další notifikace pro provoz EMS.""" from __future__ import annotations import asyncio import json import logging from datetime import datetime, timezone import asyncpg import httpx from app.config import get_settings logger = logging.getLogger(__name__) _WEBHOOK_CACHE: dict[tuple[int, str], str] = {} _OTE_IMPORT_ALERT_CACHE: dict[tuple[str, str], float] = {} _OTE_IMPORT_OK_CACHE: dict[str, float] = {} async def _get_site_webhook_url( conn: asyncpg.Connection | None, site_id: int | None, kind: str, ) -> str: """ kind: 'daily' | 'error' Fallback: settings.discord_webhook_url """ settings = get_settings() if site_id is None: return settings.discord_webhook_url cache_key = (int(site_id), str(kind)) cached = _WEBHOOK_CACHE.get(cache_key) if cached is not None: return cached if conn is None: return settings.discord_webhook_url col = "discord_webhook_daily_url" if kind == "daily" else "discord_webhook_error_url" try: url = await conn.fetchval( f"select {col} from ems.site where id = $1::int", int(site_id), ) except Exception: logger.exception("Failed to load site webhook url site_id=%s kind=%s", site_id, kind) url = None final = str(url or settings.discord_webhook_url or "") _WEBHOOK_CACHE[cache_key] = final return final def _discord_level_for_mode_change(activated_by: str) -> str: if activated_by == "system:mismatch": return "critical" if activated_by.startswith("system:"): return "warning" return "info" async def notify_operating_mode_changed( conn: asyncpg.Connection | None, site_id: int | None, site_code: str, previous_mode: str, new_mode: str, activated_by: str, notes: str | None, *, level: str | None = None, ) -> None: lvl = level or _discord_level_for_mode_change(activated_by) note_line = f"\nPoznámka: {notes}" if notes else "" msg = ( f"Přepnutí provozního režimu – lokalita `{site_code}`\n" f"**{previous_mode}** → **{new_mode}**\n" f"Aktivoval: `{activated_by}`{note_line}" ) await send_discord(conn, site_id, msg, level=lvl) async def _auto_rolling_replan_after_self_sustain_exit(site_id: int) -> None: """Po návratu z SELF_SUSTAIN do AUTO přepočítat rolling plán (nové DB spojení).""" try: from app.deps import get_pg_pool from services.planning_engine import run_plan_api pool = await get_pg_pool() except Exception as e: logger.warning("Auto replan after SELF_SUSTAIN→AUTO: pool unavailable: %s", e) return try: async with pool.acquire() as replan_conn: await run_plan_api( site_id, "rolling", replan_conn, triggered_by="mode:self_sustain_exit", ) except Exception as e: logger.warning( "Auto rolling replan after SELF_SUSTAIN→AUTO failed: %s", e, exc_info=True, ) async def run_fn_set_mode_with_discord( conn: asyncpg.Connection, site_id: int, mode_code: str, activated_by: str, valid_until: datetime | None, notes: str | None, *, notify_level: str | None = None, ) -> str: """ Zavolá ems.fn_set_mode_with_context. Při skutečné změně režimu pošle Discord (pokud je webhook). Vrátí aktuální mode_code z DB po volání. """ raw = await conn.fetchval( """ select ems.fn_set_mode_with_context($1::int, $2::text, $3::text, $4::timestamptz, $5::text) """, site_id, mode_code, activated_by, valid_until, notes, ) ctx = raw if isinstance(raw, dict) else json.loads(raw) prev = ctx.get("previous_mode") new = ctx.get("new_mode") if new is None: new = mode_code site_code = ctx.get("site_code") if prev is not None and prev != new: await notify_operating_mode_changed( conn, site_id, site_code or str(site_id), str(prev), str(new), activated_by, notes, level=notify_level, ) prev_u = str(prev).upper() new_u = str(new).upper() if prev_u == "SELF_SUSTAIN" and new_u == "AUTO": try: asyncio.get_running_loop().create_task( _auto_rolling_replan_after_self_sustain_exit(site_id) ) except RuntimeError: logger.debug("No event loop; skip auto rolling replan") return str(new) async def notify_plan_vs_actual_fatal( conn: asyncpg.Connection | None, site_id: int | None, site_code: str, slot_label: str, interval_start_utc: datetime, plan_grid_w: int, actual_grid_w: int, deviation_grid_w: int, reason_code: str, detail: str, ) -> None: """Discord po fatální odchylce plán vs. audit (síť) pro uzavřený 15min slot.""" utc_label = interval_start_utc.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") msg = ( f"**Fatální odchylka plán vs. realita (síť)** – `{site_code}`\n" f"Slot: **{slot_label}** (`{utc_label}`)\n" f"**{reason_code}**: {detail}\n" f"Plán grid: **{plan_grid_w}** W | Skutečnost: **{actual_grid_w}** W | Δ (act−plan): **{deviation_grid_w}** W" ) await send_discord(conn, site_id, msg, level="critical") async def send_discord( conn: asyncpg.Connection | None, site_id: int | None, message: str, level: str = "info", ) -> bool: """ Pošle notifikaci na Discord webhook. level: 'info', 'warning', 'error', 'critical' Vrátí True při úspěchu. """ kind = "daily" if level == "info" else "error" webhook_url = await _get_site_webhook_url(conn, site_id, kind) if not webhook_url: logger.debug("Discord webhook not configured, skipping notification") return False emoji = {"info": "ℹ️", "warning": "⚠️", "error": "❌", "critical": "🚨"}.get(level, "ℹ️") try: async with httpx.AsyncClient(timeout=10) as client: resp = await client.post( webhook_url, json={ "content": f"{emoji} **EMS Alert** [{level.upper()}]\n{message}", }, ) resp.raise_for_status() return True except Exception as e: logger.warning("Discord notification failed: %s", e) return False def _should_send_ote_alert(date_str: str, signature: str, *, cooldown_s: float) -> bool: now = datetime.now(timezone.utc).timestamp() key = (str(date_str), str(signature)) last = _OTE_IMPORT_ALERT_CACHE.get(key) if last is not None and (now - last) < cooldown_s: return False _OTE_IMPORT_ALERT_CACHE[key] = now return True async def notify_ote_import_format_changed( conn: asyncpg.Connection | None, *, report_date: str, error_detail: str, url: str, ) -> None: """ Discord alert pro situaci, kdy OTE změnilo formát chart-data a import selže na parseru v DB. Dedup: stejný report_date + stejná chyba se pošle max 1× za cooldown. """ signature = (error_detail or "").strip().splitlines()[0][:160] if not _should_send_ote_alert(report_date, signature, cooldown_s=6 * 3600): return detail = (error_detail or "").strip() if len(detail) > 1600: detail = detail[:1600] + "…" msg = ( f"**OTE import selhal – pravděpodobná změna formátu dat**\n" f"Report date: `{report_date}`\n" f"URL: `{url}`\n" f"Chyba: {detail}\n" f"Doporučení: zkontrolovat `ems.fn_ote_parse_15m_price_json` (tooltipy / struktura payloadu) " f"a upravit parser." ) await send_discord(conn, site_id=None, message=msg, level="critical") def _should_send_ote_ok(report_date: str, *, cooldown_s: float) -> bool: now = datetime.now(timezone.utc).timestamp() key = str(report_date) last = _OTE_IMPORT_OK_CACHE.get(key) if last is not None and (now - last) < cooldown_s: return False _OTE_IMPORT_OK_CACHE[key] = now return True async def notify_ote_import_ok_brief( conn: asyncpg.Connection | None, *, report_date: str, brief: dict, url: str, ) -> None: """ Info notifikace po úspěšném importu kompletního dne OTE (stručná analýza "co čekat zítra"). Dedup: 1× za cooldown na report_date. """ if not _should_send_ote_ok(report_date, cooldown_s=20 * 3600): return def _f(x, default: float = 0.0) -> float: try: if x is None: return default return float(x) except Exception: return default min_p = _f(brief.get("min_price")) max_p = _f(brief.get("max_price")) raw_signals = brief.get("signals") or [] signals: list[str] = [] if isinstance(raw_signals, list): for s in raw_signals[:6]: if not isinstance(s, dict): continue title = str(s.get("title") or s.get("code") or "").strip() detail = str(s.get("detail") or "").strip() if title and detail: signals.append(f"{title} ({detail})") elif title: signals.append(title) if not signals: signals.append("běžný den (bez extrémů)") msg = ( f"OTE ceny staženy – `{report_date}`\n" f"URL: `{url}`\n" f"Min: **{min_p:.3f}** | Max: **{max_p:.3f}** Kč/kWh\n" f"Signály: " + "; ".join(f"**{s}**" for s in signals) ) await send_discord(conn, site_id=None, message=msg, level="info") async def notify_modbus_mismatch( conn: asyncpg.Connection | None, site_id: int | None, asset_code: str, register: int, register_name: str, value_written: int, value_verified: int, attempt: int, ) -> None: msg = ( f"Modbus mismatch na **{asset_code}**\n" f"Registr: `0x{register:04X}` ({register_name})\n" f"Zapsáno: `{value_written}` | Přečteno: `{value_verified}`\n" f"Pokus č. {attempt}" ) await send_discord(conn, site_id, msg, level="error") async def notify_self_sustain_activated( conn: asyncpg.Connection | None, site_id: int | None, site_code: str, reason: str, ) -> None: msg = ( f"Přepnutí na **SELF_SUSTAIN** – lokalita `{site_code}`\n" f"Důvod: {reason}" ) await send_discord(conn, site_id, msg, level="critical") async def notify_modbus_clock_verify_exhausted( conn: asyncpg.Connection | None, site_id: int | None, site_code: str, asset_code: str, written: tuple[int, int, int], actual: tuple[int, int, int], ) -> None: msg = ( f"Modbus **systémový čas 62–64** – po 3 neúspěšných ověřeních **bez** přepnutí režimu.\n" f"Lokalita `{site_code}`, zařízení `{asset_code}`\n" f"Zapsáno: `{written}` | Přečteno: `{actual}`\n" f"Doporučení: zkontrolovat firmware/RS485; režim EMS se nemění automaticky." ) await send_discord(conn, site_id, msg, level="critical") async def notify_daily_economics( conn: asyncpg.Connection | None, site_id: int | None, site_code: str, day: str, import_kwh: float, import_cost: float, export_kwh: float, export_revenue: float, green_bonus: float, total_balance: float, planned_balance: float | None, ) -> None: lines = [ f"Ekonomika **{site_code}** {day}:", f" Import: {import_kwh:.1f} kWh = {import_cost:.2f} Kč", f" Export: {export_kwh:.1f} kWh = {export_revenue:.2f} Kč", ] if green_bonus > 0: lines.append(f" Zelený bonus: {green_bonus:.2f} Kč") sign = "+" if total_balance >= 0 else "" lines.append(f" **BILANCE: {sign}{total_balance:.2f} Kč**") if planned_balance is not None: dev = total_balance - planned_balance dev_sign = "+" if dev >= 0 else "" lines.append( f" Plán předpokládal: {planned_balance:+.2f} Kč " f"(odchylka {dev_sign}{dev:.2f} Kč)" ) await send_discord(conn, site_id, "\n".join(lines), level="info")