Files
ems/backend/services/notification_service.py
Dusan Vojacek 6074535d96
Some checks failed
CI and deploy / migration-check (push) Failing after 25s
CI and deploy / deploy (push) Has been skipped
OTE informatin discord
2026-04-29 14:17:24 +02:00

397 lines
12 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Discord a další notifikace pro provoz EMS."""
from __future__ import annotations
import asyncio
import json
import logging
from datetime import datetime, timezone
import asyncpg
import httpx
from app.config import get_settings
logger = logging.getLogger(__name__)
_WEBHOOK_CACHE: dict[tuple[int, str], str] = {}
_OTE_IMPORT_ALERT_CACHE: dict[tuple[str, str], float] = {}
_OTE_IMPORT_OK_CACHE: dict[str, float] = {}
async def _get_site_webhook_url(
conn: asyncpg.Connection | None,
site_id: int | None,
kind: str,
) -> str:
"""
kind: 'daily' | 'error'
Fallback: settings.discord_webhook_url
"""
settings = get_settings()
if site_id is None:
return settings.discord_webhook_url
cache_key = (int(site_id), str(kind))
cached = _WEBHOOK_CACHE.get(cache_key)
if cached is not None:
return cached
if conn is None:
return settings.discord_webhook_url
col = "discord_webhook_daily_url" if kind == "daily" else "discord_webhook_error_url"
try:
url = await conn.fetchval(
f"select {col} from ems.site where id = $1::int",
int(site_id),
)
except Exception:
logger.exception("Failed to load site webhook url site_id=%s kind=%s", site_id, kind)
url = None
final = str(url or settings.discord_webhook_url or "")
_WEBHOOK_CACHE[cache_key] = final
return final
def _discord_level_for_mode_change(activated_by: str) -> str:
if activated_by == "system:mismatch":
return "critical"
if activated_by.startswith("system:"):
return "warning"
return "info"
async def notify_operating_mode_changed(
conn: asyncpg.Connection | None,
site_id: int | None,
site_code: str,
previous_mode: str,
new_mode: str,
activated_by: str,
notes: str | None,
*,
level: str | None = None,
) -> None:
lvl = level or _discord_level_for_mode_change(activated_by)
note_line = f"\nPoznámka: {notes}" if notes else ""
msg = (
f"Přepnutí provozního režimu lokalita `{site_code}`\n"
f"**{previous_mode}** → **{new_mode}**\n"
f"Aktivoval: `{activated_by}`{note_line}"
)
await send_discord(conn, site_id, msg, level=lvl)
async def _auto_rolling_replan_after_self_sustain_exit(site_id: int) -> None:
"""Po návratu z SELF_SUSTAIN do AUTO přepočítat rolling plán (nové DB spojení)."""
try:
from app.deps import get_pg_pool
from services.planning_engine import run_plan_api
pool = await get_pg_pool()
except Exception as e:
logger.warning("Auto replan after SELF_SUSTAIN→AUTO: pool unavailable: %s", e)
return
try:
async with pool.acquire() as replan_conn:
await run_plan_api(
site_id,
"rolling",
replan_conn,
triggered_by="mode:self_sustain_exit",
)
except Exception as e:
logger.warning(
"Auto rolling replan after SELF_SUSTAIN→AUTO failed: %s",
e,
exc_info=True,
)
async def run_fn_set_mode_with_discord(
conn: asyncpg.Connection,
site_id: int,
mode_code: str,
activated_by: str,
valid_until: datetime | None,
notes: str | None,
*,
notify_level: str | None = None,
) -> str:
"""
Zavolá ems.fn_set_mode_with_context. Při skutečné změně režimu pošle Discord (pokud je webhook).
Vrátí aktuální mode_code z DB po volání.
"""
raw = await conn.fetchval(
"""
select ems.fn_set_mode_with_context($1::int, $2::text, $3::text, $4::timestamptz, $5::text)
""",
site_id,
mode_code,
activated_by,
valid_until,
notes,
)
ctx = raw if isinstance(raw, dict) else json.loads(raw)
prev = ctx.get("previous_mode")
new = ctx.get("new_mode")
if new is None:
new = mode_code
site_code = ctx.get("site_code")
if prev is not None and prev != new:
await notify_operating_mode_changed(
conn,
site_id,
site_code or str(site_id),
str(prev),
str(new),
activated_by,
notes,
level=notify_level,
)
prev_u = str(prev).upper()
new_u = str(new).upper()
if prev_u == "SELF_SUSTAIN" and new_u == "AUTO":
try:
asyncio.get_running_loop().create_task(
_auto_rolling_replan_after_self_sustain_exit(site_id)
)
except RuntimeError:
logger.debug("No event loop; skip auto rolling replan")
return str(new)
async def notify_plan_vs_actual_fatal(
conn: asyncpg.Connection | None,
site_id: int | None,
site_code: str,
slot_label: str,
interval_start_utc: datetime,
plan_grid_w: int,
actual_grid_w: int,
deviation_grid_w: int,
reason_code: str,
detail: str,
) -> None:
"""Discord po fatální odchylce plán vs. audit (síť) pro uzavřený 15min slot."""
utc_label = interval_start_utc.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
msg = (
f"**Fatální odchylka plán vs. realita (síť)** `{site_code}`\n"
f"Slot: **{slot_label}** (`{utc_label}`)\n"
f"**{reason_code}**: {detail}\n"
f"Plán grid: **{plan_grid_w}** W | Skutečnost: **{actual_grid_w}** W | Δ (actplan): **{deviation_grid_w}** W"
)
await send_discord(conn, site_id, msg, level="critical")
async def send_discord(
conn: asyncpg.Connection | None,
site_id: int | None,
message: str,
level: str = "info",
) -> bool:
"""
Pošle notifikaci na Discord webhook.
level: 'info', 'warning', 'error', 'critical'
Vrátí True při úspěchu.
"""
kind = "daily" if level == "info" else "error"
webhook_url = await _get_site_webhook_url(conn, site_id, kind)
if not webhook_url:
logger.debug("Discord webhook not configured, skipping notification")
return False
emoji = {"info": "", "warning": "⚠️", "error": "", "critical": "🚨"}.get(level, "")
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
webhook_url,
json={
"content": f"{emoji} **EMS Alert** [{level.upper()}]\n{message}",
},
)
resp.raise_for_status()
return True
except Exception as e:
logger.warning("Discord notification failed: %s", e)
return False
def _should_send_ote_alert(date_str: str, signature: str, *, cooldown_s: float) -> bool:
now = datetime.now(timezone.utc).timestamp()
key = (str(date_str), str(signature))
last = _OTE_IMPORT_ALERT_CACHE.get(key)
if last is not None and (now - last) < cooldown_s:
return False
_OTE_IMPORT_ALERT_CACHE[key] = now
return True
async def notify_ote_import_format_changed(
conn: asyncpg.Connection | None,
*,
report_date: str,
error_detail: str,
url: str,
) -> None:
"""
Discord alert pro situaci, kdy OTE změnilo formát chart-data a import selže na parseru v DB.
Dedup: stejný report_date + stejná chyba se pošle max 1× za cooldown.
"""
signature = (error_detail or "").strip().splitlines()[0][:160]
if not _should_send_ote_alert(report_date, signature, cooldown_s=6 * 3600):
return
detail = (error_detail or "").strip()
if len(detail) > 1600:
detail = detail[:1600] + ""
msg = (
f"**OTE import selhal pravděpodobná změna formátu dat**\n"
f"Report date: `{report_date}`\n"
f"URL: `{url}`\n"
f"Chyba: {detail}\n"
f"Doporučení: zkontrolovat `ems.fn_ote_parse_15m_price_json` (tooltipy / struktura payloadu) "
f"a upravit parser."
)
await send_discord(conn, site_id=None, message=msg, level="critical")
def _should_send_ote_ok(report_date: str, *, cooldown_s: float) -> bool:
now = datetime.now(timezone.utc).timestamp()
key = str(report_date)
last = _OTE_IMPORT_OK_CACHE.get(key)
if last is not None and (now - last) < cooldown_s:
return False
_OTE_IMPORT_OK_CACHE[key] = now
return True
async def notify_ote_import_ok_brief(
conn: asyncpg.Connection | None,
*,
report_date: str,
brief: dict,
url: str,
) -> None:
"""
Info notifikace po úspěšném importu kompletního dne OTE (stručná analýza "co čekat zítra").
Dedup: 1× za cooldown na report_date.
"""
if not _should_send_ote_ok(report_date, cooldown_s=20 * 3600):
return
def _f(x, default: float = 0.0) -> float:
try:
if x is None:
return default
return float(x)
except Exception:
return default
min_p = _f(brief.get("min_price"))
max_p = _f(brief.get("max_price"))
raw_signals = brief.get("signals") or []
signals: list[str] = []
if isinstance(raw_signals, list):
for s in raw_signals[:6]:
if not isinstance(s, dict):
continue
title = str(s.get("title") or s.get("code") or "").strip()
detail = str(s.get("detail") or "").strip()
if title and detail:
signals.append(f"{title} ({detail})")
elif title:
signals.append(title)
if not signals:
signals.append("běžný den (bez extrémů)")
msg = (
f"OTE ceny staženy `{report_date}`\n"
f"URL: `{url}`\n"
f"Min: **{min_p:.3f}** | Max: **{max_p:.3f}** Kč/kWh\n"
f"Signály: " + "; ".join(f"**{s}**" for s in signals)
)
await send_discord(conn, site_id=None, message=msg, level="info")
async def notify_modbus_mismatch(
conn: asyncpg.Connection | None,
site_id: int | None,
asset_code: str,
register: int,
register_name: str,
value_written: int,
value_verified: int,
attempt: int,
) -> None:
msg = (
f"Modbus mismatch na **{asset_code}**\n"
f"Registr: `0x{register:04X}` ({register_name})\n"
f"Zapsáno: `{value_written}` | Přečteno: `{value_verified}`\n"
f"Pokus č. {attempt}"
)
await send_discord(conn, site_id, msg, level="error")
async def notify_self_sustain_activated(
conn: asyncpg.Connection | None,
site_id: int | None,
site_code: str,
reason: str,
) -> None:
msg = (
f"Přepnutí na **SELF_SUSTAIN** lokalita `{site_code}`\n"
f"Důvod: {reason}"
)
await send_discord(conn, site_id, msg, level="critical")
async def notify_modbus_clock_verify_exhausted(
conn: asyncpg.Connection | None,
site_id: int | None,
site_code: str,
asset_code: str,
written: tuple[int, int, int],
actual: tuple[int, int, int],
) -> None:
msg = (
f"Modbus **systémový čas 6264** po 3 neúspěšných ověřeních **bez** přepnutí režimu.\n"
f"Lokalita `{site_code}`, zařízení `{asset_code}`\n"
f"Zapsáno: `{written}` | Přečteno: `{actual}`\n"
f"Doporučení: zkontrolovat firmware/RS485; režim EMS se nemění automaticky."
)
await send_discord(conn, site_id, msg, level="critical")
async def notify_daily_economics(
conn: asyncpg.Connection | None,
site_id: int | None,
site_code: str,
day: str,
import_kwh: float,
import_cost: float,
export_kwh: float,
export_revenue: float,
green_bonus: float,
total_balance: float,
planned_balance: float | None,
) -> None:
lines = [
f"Ekonomika **{site_code}** {day}:",
f" Import: {import_kwh:.1f} kWh = {import_cost:.2f}",
f" Export: {export_kwh:.1f} kWh = {export_revenue:.2f}",
]
if green_bonus > 0:
lines.append(f" Zelený bonus: {green_bonus:.2f}")
sign = "+" if total_balance >= 0 else ""
lines.append(f" **BILANCE: {sign}{total_balance:.2f} Kč**")
if planned_balance is not None:
dev = total_balance - planned_balance
dev_sign = "+" if dev >= 0 else ""
lines.append(
f" Plán předpokládal: {planned_balance:+.2f}"
f"(odchylka {dev_sign}{dev:.2f} Kč)"
)
await send_discord(conn, site_id, "\n".join(lines), level="info")