Files
ems/backend/services/notification_service.py
Dusan Vojacek f8e1eed127
All checks were successful
CI and deploy / migration-check (push) Successful in 6s
CI and deploy / deploy (push) Successful in 29s
fix rs485 s eror self_sustain
2026-04-19 15:29:58 +02:00

223 lines
6.6 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Discord a další notifikace pro provoz EMS."""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime
import asyncpg
import httpx
from app.config import get_settings
logger = logging.getLogger(__name__)
def _discord_level_for_mode_change(activated_by: str) -> str:
if activated_by == "system:mismatch":
return "critical"
if activated_by.startswith("system:"):
return "warning"
return "info"
async def notify_operating_mode_changed(
    site_code: str,
    previous_mode: str,
    new_mode: str,
    activated_by: str,
    notes: str | None,
    *,
    level: str | None = None,
) -> None:
    """Send a Discord message describing an operating-mode switch.

    Args:
        site_code: Code of the site whose mode changed.
        previous_mode: Mode code before the switch.
        new_mode: Mode code after the switch.
        activated_by: Identifier of whoever triggered the switch.
        notes: Optional free-text note; appended as an extra line when truthy.
        level: Explicit severity. When falsy, the severity is derived from
            ``activated_by`` via ``_discord_level_for_mode_change``.
    """
    severity = level if level else _discord_level_for_mode_change(activated_by)

    parts = [
        f"Přepnutí provozního režimu lokalita `{site_code}`",
        f"**{previous_mode}** → **{new_mode}**",
        f"Aktivoval: `{activated_by}`",
    ]
    if notes:
        parts.append(f"Poznámka: {notes}")

    await send_discord("\n".join(parts), level=severity)
async def _auto_rolling_replan_after_self_sustain_exit(site_id: int) -> None:
    """Recompute the rolling plan after a SELF_SUSTAIN -> AUTO transition.

    A connection is acquired from the pool for the replan rather than reusing
    a caller's connection. Every failure is logged as a warning and swallowed,
    so this coroutine never raises an ``Exception`` subclass to its caller.

    Args:
        site_id: Identifier of the site to replan.
    """
    # Imports are resolved at call time; a failing import is treated the same
    # way as an unavailable pool.
    try:
        from app.deps import get_pg_pool
        from services.planning_engine import run_plan_api

        pg_pool = await get_pg_pool()
    except Exception as exc:
        logger.warning("Auto replan after SELF_SUSTAIN→AUTO: pool unavailable: %s", exc)
        return

    try:
        async with pg_pool.acquire() as connection:
            await run_plan_api(
                site_id, "rolling", connection, triggered_by="mode:self_sustain_exit"
            )
    except Exception as exc:
        # exc_info=True keeps the traceback in the log at warning level.
        logger.warning(
            "Auto rolling replan after SELF_SUSTAIN→AUTO failed: %s", exc, exc_info=True
        )
# Strong references to fire-and-forget tasks. The event loop keeps only weak
# references to tasks, so a task nobody else references may be
# garbage-collected before it has finished running.
_BACKGROUND_TASKS: set[asyncio.Task] = set()


async def run_fn_set_mode_with_discord(
    conn: asyncpg.Connection,
    site_id: int,
    mode_code: str,
    activated_by: str,
    valid_until: datetime | None,
    notes: str | None,
    *,
    notify_level: str | None = None,
) -> str:
    """Call ``ems.fn_set_mode`` and announce a real mode change on Discord.

    The mode is read before and after the call. When a previous mode existed
    and differs from the new one, ``notify_operating_mode_changed`` is awaited.
    When the transition is SELF_SUSTAIN -> AUTO (compared case-insensitively),
    a rolling replan is additionally scheduled as a background task.

    Args:
        conn: Connection used for the mode queries and the ``fn_set_mode`` call.
        site_id: Identifier of the site whose mode is being set.
        mode_code: Requested mode code.
        activated_by: Identifier of whoever requested the change.
        valid_until: Optional expiry passed through to ``ems.fn_set_mode``.
        notes: Optional note passed to ``ems.fn_set_mode`` and the notification.
        notify_level: Optional explicit severity for the notification.

    Returns:
        The mode code read back from the database after the call, or
        ``mode_code`` when no row could be read back.
    """
    mode_query = "SELECT mode_code FROM ems.site_operating_mode WHERE site_id = $1"

    prev = await conn.fetchval(mode_query, site_id)
    await conn.execute(
        "SELECT ems.fn_set_mode($1, $2, $3, $4, $5)",
        site_id,
        mode_code,
        activated_by,
        valid_until,
        notes,
    )
    new = await conn.fetchval(mode_query, site_id)
    if new is None:
        # No row to read back: report the requested mode instead.
        new = mode_code

    # Nothing to announce when there was no previous mode or it did not change.
    if prev is None or prev == new:
        return str(new)

    site_code = await conn.fetchval("SELECT code FROM ems.site WHERE id = $1", site_id)
    await notify_operating_mode_changed(
        site_code or str(site_id),
        str(prev),
        str(new),
        activated_by,
        notes,
        level=notify_level,
    )

    if str(prev).upper() == "SELF_SUSTAIN" and str(new).upper() == "AUTO":
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            logger.debug("No event loop; skip auto rolling replan")
        else:
            replan_task = loop.create_task(
                _auto_rolling_replan_after_self_sustain_exit(site_id)
            )
            # Hold the task until it completes, then drop the reference.
            _BACKGROUND_TASKS.add(replan_task)
            replan_task.add_done_callback(_BACKGROUND_TASKS.discard)

    return str(new)
async def send_discord(message: str, level: str = "info") -> bool:
    """Post a notification to the configured Discord webhook.

    Args:
        message: Message text placed below the alert header line.
        level: One of ``'info'``, ``'warning'``, ``'error'``, ``'critical'``.
            Other values are accepted and simply get no emoji prefix.

    Returns:
        ``True`` when the webhook request succeeded; ``False`` when no webhook
        URL is configured or when sending failed (the failure is logged as a
        warning and not re-raised).
    """
    webhook_url = get_settings().discord_webhook_url
    if not webhook_url:
        logger.debug("Discord webhook not configured, skipping notification")
        return False

    level_emoji = {"info": "", "warning": "⚠️", "error": "", "critical": "🚨"}
    emoji = level_emoji.get(level, "")

    try:
        async with httpx.AsyncClient(timeout=10) as client:
            payload = {
                "content": f"{emoji} **EMS Alert** [{level.upper()}]\n{message}",
            }
            response = await client.post(webhook_url, json=payload)
            # Non-2xx responses are turned into exceptions and handled below.
            response.raise_for_status()
            return True
    except Exception as exc:
        logger.warning("Discord notification failed: %s", exc)
        return False
async def notify_modbus_mismatch(
    asset_code: str,
    register: int,
    register_name: str,
    value_written: int,
    value_verified: int,
    attempt: int,
) -> None:
    """Report a Modbus write/read-back mismatch to Discord at ``error`` level.

    Args:
        asset_code: Code of the affected asset.
        register: Register address; rendered as zero-padded 4-digit hex.
        register_name: Human-readable register name.
        value_written: Value that was written.
        value_verified: Value that was read back.
        attempt: Number of the attempt being reported.
    """
    report = "\n".join(
        [
            f"Modbus mismatch na **{asset_code}**",
            f"Registr: `0x{register:04X}` ({register_name})",
            f"Zapsáno: `{value_written}` | Přečteno: `{value_verified}`",
            f"Pokus č. {attempt}",
        ]
    )
    await send_discord(report, level="error")
async def notify_self_sustain_activated(site_code: str, reason: str) -> None:
    """Report a switch to SELF_SUSTAIN to Discord at ``critical`` level.

    Args:
        site_code: Code of the site that switched.
        reason: Reason text included in the message.
    """
    headline = f"Přepnutí na **SELF_SUSTAIN** lokalita `{site_code}`"
    cause = f"Důvod: {reason}"
    await send_discord(f"{headline}\n{cause}", level="critical")
async def notify_modbus_clock_verify_exhausted(
    site_code: str,
    asset_code: str,
    written: tuple[int, int, int],
    actual: tuple[int, int, int],
) -> None:
    """Report exhausted system-clock verification to Discord at ``critical`` level.

    The message states that the operating mode was not switched.

    Args:
        site_code: Code of the affected site.
        asset_code: Code of the affected device.
        written: Triple of values that was written.
        actual: Triple of values that was read back.
    """
    report_lines = (
        "Modbus **systémový čas 6264** po 3 neúspěšných ověřeních **bez** přepnutí režimu.",
        f"Lokalita `{site_code}`, zařízení `{asset_code}`",
        f"Zapsáno: `{written}` | Přečteno: `{actual}`",
        "Doporučení: zkontrolovat firmware/RS485; režim EMS se nemění automaticky.",
    )
    await send_discord("\n".join(report_lines), level="critical")
async def notify_daily_economics(
    site_code: str,
    day: str,
    import_kwh: float,
    import_cost: float,
    export_kwh: float,
    export_revenue: float,
    green_bonus: float,
    total_balance: float,
    planned_balance: float | None,
) -> None:
    """Send the daily economics summary of one site to Discord at ``info`` level.

    Args:
        site_code: Code of the site being reported.
        day: Label of the reported day, inserted into the header as-is.
        import_kwh: Imported energy, shown with one decimal place.
        import_cost: Cost of the imported energy.
        export_kwh: Exported energy, shown with one decimal place.
        export_revenue: Revenue from the exported energy.
        green_bonus: Green bonus; its line is included only when positive.
        total_balance: Overall balance, shown with an explicit sign.
        planned_balance: Planned balance; when not ``None`` an extra line with
            the plan and the deviation (``total_balance - planned_balance``)
            is appended.
    """
    lines = [
        f"Ekonomika **{site_code}** {day}:",
        f" Import: {import_kwh:.1f} kWh = {import_cost:.2f}",
        f" Export: {export_kwh:.1f} kWh = {export_revenue:.2f}",
    ]
    if green_bonus > 0:
        lines.append(f" Zelený bonus: {green_bonus:.2f}")

    # The "+" format flag always emits a sign, so no manual prefix is needed
    # (a manual prefix renders negative zero as "+-0.00").
    lines.append(f" **BILANCE: {total_balance:+.2f} Kč**")

    if planned_balance is not None:
        deviation = total_balance - planned_balance
        # Adjacent f-strings concatenate with no separator, so the space
        # before the parenthesis has to be written explicitly.
        lines.append(
            f" Plán předpokládal: {planned_balance:+.2f} "
            f"(odchylka {deviation:+.2f} Kč)"
        )

    await send_discord("\n".join(lines), level="info")