240 lines
7.3 KiB
Python
240 lines
7.3 KiB
Python
"""Discord a další notifikace pro provoz EMS."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
from datetime import datetime, timezone
|
||
|
||
import asyncpg
|
||
import httpx
|
||
|
||
from app.config import get_settings
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def _discord_level_for_mode_change(activated_by: str) -> str:
|
||
if activated_by == "system:mismatch":
|
||
return "critical"
|
||
if activated_by.startswith("system:"):
|
||
return "warning"
|
||
return "info"
|
||
|
||
|
||
async def notify_operating_mode_changed(
    site_code: str,
    previous_mode: str,
    new_mode: str,
    activated_by: str,
    notes: str | None,
    *,
    level: str | None = None,
) -> None:
    """Send a Discord message announcing an operating-mode switch.

    Args:
        site_code: Site identifier shown in the message.
        previous_mode: Mode that was active before the switch.
        new_mode: Mode that is active after the switch.
        activated_by: Originator of the switch; also used to derive the
            severity when ``level`` is not given.
        notes: Optional free-text note appended as an extra line when truthy.
        level: Explicit severity; when falsy it is derived from
            ``activated_by`` via ``_discord_level_for_mode_change``.
    """
    severity = level if level else _discord_level_for_mode_change(activated_by)
    rows = [
        f"Přepnutí provozního režimu – lokalita `{site_code}`",
        f"**{previous_mode}** → **{new_mode}**",
        f"Aktivoval: `{activated_by}`",
    ]
    text = "\n".join(rows)
    if notes:
        text += f"\nPoznámka: {notes}"
    await send_discord(text, level=severity)
|
||
|
||
|
||
async def _auto_rolling_replan_after_self_sustain_exit(site_id: int) -> None:
    """Recompute the rolling plan after a SELF_SUSTAIN -> AUTO transition.

    Acquires its own connection from the pool instead of reusing the caller's.
    The function is best-effort: every failure is logged as a warning and
    swallowed, so it never raises.

    Args:
        site_id: Site whose rolling plan should be recomputed.
    """
    try:
        # Imported at call time rather than at module import.
        # NOTE(review): presumably to avoid an import cycle — confirm.
        from app.deps import get_pg_pool
        from services.planning_engine import run_plan_api

        pg_pool = await get_pg_pool()
    except Exception as exc:
        logger.warning("Auto replan after SELF_SUSTAIN→AUTO: pool unavailable: %s", exc)
        return

    try:
        async with pg_pool.acquire() as connection:
            await run_plan_api(
                site_id,
                "rolling",
                connection,
                triggered_by="mode:self_sustain_exit",
            )
    except Exception as exc:
        logger.warning(
            "Auto rolling replan after SELF_SUSTAIN→AUTO failed: %s",
            exc,
            exc_info=True,
        )
|
||
|
||
|
||
# Strong references to fire-and-forget tasks. The event loop itself keeps only
# weak references to tasks, so a task nobody else references may be
# garbage-collected before it finishes.
_BACKGROUND_TASKS: set[asyncio.Task] = set()


async def run_fn_set_mode_with_discord(
    conn: asyncpg.Connection,
    site_id: int,
    mode_code: str,
    activated_by: str,
    valid_until: datetime | None,
    notes: str | None,
    *,
    notify_level: str | None = None,
) -> str:
    """Call ``ems.fn_set_mode_with_context`` and announce a real mode change.

    The database function's result is read as a mapping with the keys
    ``previous_mode``, ``new_mode`` and ``site_code``. When the mode actually
    changed, a Discord notification is sent. When the change is
    SELF_SUSTAIN -> AUTO, a rolling replan is additionally scheduled as a
    background task.

    Args:
        conn: Connection used to run the database function.
        site_id: Site whose mode is being set.
        mode_code: Requested mode code.
        activated_by: Originator of the change (forwarded to the DB function
            and to the notification).
        valid_until: Optional expiry forwarded to the DB function.
        notes: Optional note forwarded to the DB function and the notification.
        notify_level: Optional explicit Discord severity for the notification.

    Returns:
        The ``new_mode`` reported by the database as a string, or ``mode_code``
        when the database did not report one.
    """
    raw = await conn.fetchval(
        """
        select ems.fn_set_mode_with_context($1::int, $2::text, $3::text, $4::timestamptz, $5::text)
        """,
        site_id,
        mode_code,
        activated_by,
        valid_until,
        notes,
    )
    if raw is None:
        # A NULL result used to crash in json.loads(None); treat it as "no
        # context", which takes the same fallback path as a missing new_mode.
        logger.warning(
            "fn_set_mode_with_context returned NULL for site_id=%s, mode=%s",
            site_id,
            mode_code,
        )
        ctx = {}
    elif isinstance(raw, dict):
        ctx = raw
    else:
        # The result may arrive as JSON text rather than a decoded dict.
        ctx = json.loads(raw)

    prev = ctx.get("previous_mode")
    new = ctx.get("new_mode")
    if new is None:
        new = mode_code
    site_code = ctx.get("site_code")

    if prev is not None and prev != new:
        await notify_operating_mode_changed(
            site_code or str(site_id),
            str(prev),
            str(new),
            activated_by,
            notes,
            level=notify_level,
        )
        if str(prev).upper() == "SELF_SUSTAIN" and str(new).upper() == "AUTO":
            replan = _auto_rolling_replan_after_self_sustain_exit(site_id)
            try:
                task = asyncio.get_running_loop().create_task(replan)
            except RuntimeError:
                # Close the coroutine so it does not trigger a
                # "coroutine was never awaited" warning.
                replan.close()
                logger.debug("No event loop; skip auto rolling replan")
            else:
                # Keep the task alive until it completes, then drop the reference.
                _BACKGROUND_TASKS.add(task)
                task.add_done_callback(_BACKGROUND_TASKS.discard)
    return str(new)
|
||
|
||
|
||
async def notify_plan_vs_actual_fatal(
    site_code: str,
    slot_label: str,
    interval_start_utc: datetime,
    plan_grid_w: int,
    actual_grid_w: int,
    deviation_grid_w: int,
    reason_code: str,
    detail: str,
) -> None:
    """Send a critical Discord alert about a fatal plan-vs-actual grid deviation.

    Args:
        site_code: Site identifier shown in the message.
        slot_label: Human-readable label of the affected slot.
        interval_start_utc: Start of the slot; converted to UTC for display.
        plan_grid_w: Planned grid power in watts.
        actual_grid_w: Measured grid power in watts.
        deviation_grid_w: Difference shown as "act - plan" in watts.
        reason_code: Short code of the deviation reason.
        detail: Free-text explanation appended after the reason code.
    """
    # NOTE(review): astimezone() interprets a naive datetime as local time —
    # confirm that callers always pass a timezone-aware value.
    start_in_utc = interval_start_utc.astimezone(timezone.utc)
    utc_label = start_in_utc.strftime("%Y-%m-%d %H:%M UTC")
    rows = (
        f"**Fatální odchylka plán vs. realita (síť)** – `{site_code}`",
        f"Slot: **{slot_label}** (`{utc_label}`)",
        f"**{reason_code}**: {detail}",
        f"Plán grid: **{plan_grid_w}** W | Skutečnost: **{actual_grid_w}** W | Δ (act−plan): **{deviation_grid_w}** W",
    )
    await send_discord("\n".join(rows), level="critical")
|
||
|
||
|
||
# Discord documents a 2000-character cap on webhook ``content``. Stay slightly
# below it. NOTE(review): how Discord counts characters outside the BMP (e.g.
# the emoji prefix) is not verified here, hence the small safety margin.
_DISCORD_CONTENT_BUDGET = 1990


async def send_discord(message: str, level: str = "info") -> bool:
    """Post a notification to the configured Discord webhook.

    Content that would exceed Discord's size limit is truncated and ends with
    an ellipsis, so that an oversized note or detail does not cause the whole
    alert to be rejected.

    Args:
        message: Message body; it is prefixed with an emoji and the level.
        level: One of ``'info'``, ``'warning'``, ``'error'``, ``'critical'``.
            Unknown levels get the info emoji.

    Returns:
        ``True`` when the webhook accepted the message; ``False`` when no
        webhook is configured or the request failed.
    """
    settings = get_settings()
    webhook_url = settings.discord_webhook_url
    if not webhook_url:
        logger.debug("Discord webhook not configured, skipping notification")
        return False

    emoji = {"info": "ℹ️", "warning": "⚠️", "error": "❌", "critical": "🚨"}.get(level, "ℹ️")
    content = f"{emoji} **EMS Alert** [{level.upper()}]\n{message}"
    if len(content) > _DISCORD_CONTENT_BUDGET:
        content = content[: _DISCORD_CONTENT_BUDGET - 1] + "…"

    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.post(webhook_url, json={"content": content})
            resp.raise_for_status()
    except Exception as e:
        # Deliberately broad: a failed notification must never break the caller.
        logger.warning("Discord notification failed: %s", e)
        return False
    return True
|
||
|
||
|
||
async def notify_modbus_mismatch(
    asset_code: str,
    register: int,
    register_name: str,
    value_written: int,
    value_verified: int,
    attempt: int,
) -> None:
    """Send an error-level Discord alert about a Modbus write/verify mismatch.

    Args:
        asset_code: Device identifier shown in the message.
        register: Register address; rendered as 4-digit uppercase hex.
        register_name: Human-readable register name.
        value_written: Value that was written.
        value_verified: Value that was read back.
        attempt: Number of the attempt being reported.
    """
    summary = "\n".join(
        (
            f"Modbus mismatch na **{asset_code}**",
            f"Registr: `0x{register:04X}` ({register_name})",
            f"Zapsáno: `{value_written}` | Přečteno: `{value_verified}`",
            f"Pokus č. {attempt}",
        )
    )
    await send_discord(summary, level="error")
|
||
|
||
|
||
async def notify_self_sustain_activated(site_code: str, reason: str) -> None:
    """Send a critical Discord alert that a site switched to SELF_SUSTAIN.

    Args:
        site_code: Site identifier shown in the message.
        reason: Explanation shown on the second line of the message.
    """
    headline = f"Přepnutí na **SELF_SUSTAIN** – lokalita `{site_code}`"
    await send_discord(f"{headline}\nDůvod: {reason}", level="critical")
|
||
|
||
|
||
async def notify_modbus_clock_verify_exhausted(
    site_code: str,
    asset_code: str,
    written: tuple[int, int, int],
    actual: tuple[int, int, int],
) -> None:
    """Send a critical Discord alert that system-clock verification gave up.

    The message states that the operating mode was not switched as a result.

    Args:
        site_code: Site identifier shown in the message.
        asset_code: Device identifier shown in the message.
        written: The three values that were written.
        actual: The three values that were read back.
    """
    rows = [
        "Modbus **systémový čas 62–64** – po 3 neúspěšných ověřeních **bez** přepnutí režimu.",
        f"Lokalita `{site_code}`, zařízení `{asset_code}`",
        f"Zapsáno: `{written}` | Přečteno: `{actual}`",
        "Doporučení: zkontrolovat firmware/RS485; režim EMS se nemění automaticky.",
    ]
    await send_discord("\n".join(rows), level="critical")
|
||
|
||
|
||
async def notify_daily_economics(
    site_code: str,
    day: str,
    import_kwh: float,
    import_cost: float,
    export_kwh: float,
    export_revenue: float,
    green_bonus: float,
    total_balance: float,
    planned_balance: float | None,
) -> None:
    """Send an info-level Discord summary of one day's energy economics.

    Args:
        site_code: Site identifier shown in the header.
        day: Label of the day, inserted verbatim.
        import_kwh: Imported energy in kWh.
        import_cost: Cost of the imported energy.
        export_kwh: Exported energy in kWh.
        export_revenue: Revenue from the exported energy.
        green_bonus: Bonus amount; its line is included only when positive.
        total_balance: Overall balance for the day.
        planned_balance: Balance the plan expected; when not ``None`` a line
            with the plan and the deviation from it is appended.
    """

    def _signed(amount: float) -> str:
        # Two decimals, with an explicit "+" for values that compare >= 0.
        prefix = "+" if amount >= 0 else ""
        return f"{prefix}{amount:.2f}"

    report = [
        f"Ekonomika **{site_code}** {day}:",
        f" Import: {import_kwh:.1f} kWh = {import_cost:.2f} Kč",
        f" Export: {export_kwh:.1f} kWh = {export_revenue:.2f} Kč",
    ]
    if green_bonus > 0:
        report.append(f" Zelený bonus: {green_bonus:.2f} Kč")
    report.append(f" **BILANCE: {_signed(total_balance)} Kč**")
    if planned_balance is not None:
        deviation = total_balance - planned_balance
        report.append(
            f" Plán předpokládal: {planned_balance:+.2f} Kč "
            f"(odchylka {_signed(deviation)} Kč)"
        )
    await send_discord("\n".join(report), level="info")
|