Files
ems/backend/services/notification_service.py
Dusan Vojacek e3776226a4
Some checks failed
CI and deploy / migration-check (push) Failing after 17s
CI and deploy / deploy (push) Has been skipped
implmentace plan guardu
2026-04-19 23:10:25 +02:00

240 lines
7.3 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Discord a další notifikace pro provoz EMS."""
from __future__ import annotations
import asyncio
import json
import logging
from datetime import datetime, timezone
import asyncpg
import httpx
from app.config import get_settings
logger = logging.getLogger(__name__)
def _discord_level_for_mode_change(activated_by: str) -> str:
if activated_by == "system:mismatch":
return "critical"
if activated_by.startswith("system:"):
return "warning"
return "info"
async def notify_operating_mode_changed(
site_code: str,
previous_mode: str,
new_mode: str,
activated_by: str,
notes: str | None,
*,
level: str | None = None,
) -> None:
lvl = level or _discord_level_for_mode_change(activated_by)
note_line = f"\nPoznámka: {notes}" if notes else ""
msg = (
f"Přepnutí provozního režimu lokalita `{site_code}`\n"
f"**{previous_mode}** → **{new_mode}**\n"
f"Aktivoval: `{activated_by}`{note_line}"
)
await send_discord(msg, level=lvl)
async def _auto_rolling_replan_after_self_sustain_exit(site_id: int) -> None:
"""Po návratu z SELF_SUSTAIN do AUTO přepočítat rolling plán (nové DB spojení)."""
try:
from app.deps import get_pg_pool
from services.planning_engine import run_plan_api
pool = await get_pg_pool()
except Exception as e:
logger.warning("Auto replan after SELF_SUSTAIN→AUTO: pool unavailable: %s", e)
return
try:
async with pool.acquire() as replan_conn:
await run_plan_api(
site_id,
"rolling",
replan_conn,
triggered_by="mode:self_sustain_exit",
)
except Exception as e:
logger.warning(
"Auto rolling replan after SELF_SUSTAIN→AUTO failed: %s",
e,
exc_info=True,
)
async def run_fn_set_mode_with_discord(
conn: asyncpg.Connection,
site_id: int,
mode_code: str,
activated_by: str,
valid_until: datetime | None,
notes: str | None,
*,
notify_level: str | None = None,
) -> str:
"""
Zavolá ems.fn_set_mode_with_context. Při skutečné změně režimu pošle Discord (pokud je webhook).
Vrátí aktuální mode_code z DB po volání.
"""
raw = await conn.fetchval(
"""
select ems.fn_set_mode_with_context($1::int, $2::text, $3::text, $4::timestamptz, $5::text)
""",
site_id,
mode_code,
activated_by,
valid_until,
notes,
)
ctx = raw if isinstance(raw, dict) else json.loads(raw)
prev = ctx.get("previous_mode")
new = ctx.get("new_mode")
if new is None:
new = mode_code
site_code = ctx.get("site_code")
if prev is not None and prev != new:
await notify_operating_mode_changed(
site_code or str(site_id),
str(prev),
str(new),
activated_by,
notes,
level=notify_level,
)
prev_u = str(prev).upper()
new_u = str(new).upper()
if prev_u == "SELF_SUSTAIN" and new_u == "AUTO":
try:
asyncio.get_running_loop().create_task(
_auto_rolling_replan_after_self_sustain_exit(site_id)
)
except RuntimeError:
logger.debug("No event loop; skip auto rolling replan")
return str(new)
async def notify_plan_vs_actual_fatal(
site_code: str,
slot_label: str,
interval_start_utc: datetime,
plan_grid_w: int,
actual_grid_w: int,
deviation_grid_w: int,
reason_code: str,
detail: str,
) -> None:
"""Discord po fatální odchylce plán vs. audit (síť) pro uzavřený 15min slot."""
utc_label = interval_start_utc.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
msg = (
f"**Fatální odchylka plán vs. realita (síť)** `{site_code}`\n"
f"Slot: **{slot_label}** (`{utc_label}`)\n"
f"**{reason_code}**: {detail}\n"
f"Plán grid: **{plan_grid_w}** W | Skutečnost: **{actual_grid_w}** W | Δ (actplan): **{deviation_grid_w}** W"
)
await send_discord(msg, level="critical")
async def send_discord(message: str, level: str = "info") -> bool:
"""
Pošle notifikaci na Discord webhook.
level: 'info', 'warning', 'error', 'critical'
Vrátí True při úspěchu.
"""
settings = get_settings()
webhook_url = settings.discord_webhook_url
if not webhook_url:
logger.debug("Discord webhook not configured, skipping notification")
return False
emoji = {"info": "", "warning": "⚠️", "error": "", "critical": "🚨"}.get(level, "")
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
webhook_url,
json={
"content": f"{emoji} **EMS Alert** [{level.upper()}]\n{message}",
},
)
resp.raise_for_status()
return True
except Exception as e:
logger.warning("Discord notification failed: %s", e)
return False
async def notify_modbus_mismatch(
asset_code: str,
register: int,
register_name: str,
value_written: int,
value_verified: int,
attempt: int,
) -> None:
msg = (
f"Modbus mismatch na **{asset_code}**\n"
f"Registr: `0x{register:04X}` ({register_name})\n"
f"Zapsáno: `{value_written}` | Přečteno: `{value_verified}`\n"
f"Pokus č. {attempt}"
)
await send_discord(msg, level="error")
async def notify_self_sustain_activated(site_code: str, reason: str) -> None:
msg = (
f"Přepnutí na **SELF_SUSTAIN** lokalita `{site_code}`\n"
f"Důvod: {reason}"
)
await send_discord(msg, level="critical")
async def notify_modbus_clock_verify_exhausted(
site_code: str,
asset_code: str,
written: tuple[int, int, int],
actual: tuple[int, int, int],
) -> None:
msg = (
f"Modbus **systémový čas 6264** po 3 neúspěšných ověřeních **bez** přepnutí režimu.\n"
f"Lokalita `{site_code}`, zařízení `{asset_code}`\n"
f"Zapsáno: `{written}` | Přečteno: `{actual}`\n"
f"Doporučení: zkontrolovat firmware/RS485; režim EMS se nemění automaticky."
)
await send_discord(msg, level="critical")
async def notify_daily_economics(
site_code: str,
day: str,
import_kwh: float,
import_cost: float,
export_kwh: float,
export_revenue: float,
green_bonus: float,
total_balance: float,
planned_balance: float | None,
) -> None:
lines = [
f"Ekonomika **{site_code}** {day}:",
f" Import: {import_kwh:.1f} kWh = {import_cost:.2f}",
f" Export: {export_kwh:.1f} kWh = {export_revenue:.2f}",
]
if green_bonus > 0:
lines.append(f" Zelený bonus: {green_bonus:.2f}")
sign = "+" if total_balance >= 0 else ""
lines.append(f" **BILANCE: {sign}{total_balance:.2f} Kč**")
if planned_balance is not None:
dev = total_balance - planned_balance
dev_sign = "+" if dev >= 0 else ""
lines.append(
f" Plán předpokládal: {planned_balance:+.2f}"
f"(odchylka {dev_sign}{dev:.2f} Kč)"
)
await send_discord("\n".join(lines), level="info")