From e3776226a496477223fbad11a30f364959a6ea86 Mon Sep 17 00:00:00 2001 From: Dusan Vojacek Date: Sun, 19 Apr 2026 23:10:25 +0200 Subject: [PATCH] implementace plan guardu --- CLAUDE.md | 4 +- backend/app/lifespan.py | 16 ++ backend/services/notification_service.py | 23 ++- backend/services/plan_actual_slot_guard.py | 116 ++++++++++++ .../V052__plan_fatal_deviation_sent.sql | 18 ++ ...__040_fn_planning_slot_boundary_prague.sql | 18 +- .../R__076_fn_plan_actual_slot_guard.sql | 177 ++++++++++++++++++ docs/02-architecture.md | 1 + docs/04-modules/modbus-command-journal.md | 3 + 9 files changed, 369 insertions(+), 7 deletions(-) create mode 100644 backend/services/plan_actual_slot_guard.py create mode 100644 db/migration/V052__plan_fatal_deviation_sent.sql create mode 100644 db/routines/R__076_fn_plan_actual_slot_guard.sql diff --git a/CLAUDE.md b/CLAUDE.md index 88fa861..c3e7228 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -68,6 +68,8 @@ Multi-site Energy Management System: optimalizuje FVE, baterii a flexibilní zá Projekt je **SQL-first**: doménová logika, agregace, joiny mezi tabulkami a stabilní čtecí rozhraní patří do **PostgreSQL** (`ems.fn_*`, případně **`ems.vw_*`**). Python (FastAPI, joby) volá DB; neskladá vlastní dotazy nad schématem mimo výjimky níže. +**Formát SQL v repu (`db/migration`, `db/routines`, `db/views`):** odsazení **2 mezery** na úroveň vnoření; **rezervovaná klíčová slova PostgreSQL vždy malými písmeny** (`create table`, `select`, `where`, `references`, …). Identifikátory (`ems.*`, sloupce) **`snake_case`**; typy v deklaracích též malými (`int`, `text`, `timestamptz`, `jsonb`). Nový / upravený SQL v tomto stylu — nesmí se objevovat verzované migrace psané „ALL CAPS keywords“. + - **Preferuj:** novou nebo rozšířenou **`ems.fn_*(…)`** s jasnými parametry; potřebuješ často stejné sloupce z více tabulek → **`ems.vw_*`** (view zapouzdřuje joiny a strukturu DB; z Pythonu je `SELECT … FROM ems.vw_*` v pořádku). 
- **Nechtěné:** skládání dotazů v Pythonu (**vlastní JOIN / WITH / poddotazy** nad `ems.*` tabulkami). Místo toho funkce nebo view v `db/routines/` / `db/views/` + jedno volání z aplikace. - **Jediné SQL v `backend/services/*.py` a `backend/app/routers/*.py`:** `SELECT 1` / `EXISTS`; **`select ems.fn_*(…)`**; **`SELECT … FROM ems.vw_*`** (read přes view); žádné jiné ad-hoc **`SELECT`/`INSERT`/`UPDATE`**. IO (Modbus, HTTP); **PuLP**; orchestrace scheduleru. @@ -197,7 +199,7 @@ Specifikace z `docs/02-architecture.md`, modulových docs a komentářů v `plan ## Konvence (krátce) - Python: `snake_case`, type hints, Pydantic pro API modely. -- SQL: `snake_case`, explicitní FK; Flyway pořadí `V###__` / repeatable `R__NNN_*.sql` (třímístný prefix = pořadí závislostí mezi fn/vw). +- SQL: viz také odstavec **Formát SQL** u sekce SQL-first výše — **2 mezery** odsazení, **klíčová slova malými písmeny**, `snake_case` identifikátory, explicitní FK; Flyway pořadí `V###__` / repeatable `R__NNN_*.sql` (třímístný prefix = pořadí závislostí mezi fn/vw). - Timescale **continuous aggregate** (CA): komentář k objektu CA je **`COMMENT ON VIEW`**, ne `COMMENT ON MATERIALIZED VIEW` (PG hlásí 42809). Viz `.cursor/rules/timescale-continuous-aggregate.mdc`. - Výkon **W**, energie **Wh**, ceny **Kč/kWh**; čas v DB **`TIMESTAMPTZ` (UTC)**. - NIKDY neupravuj existující V__ migrační soubory po jejich aplikaci na DB. 
diff --git a/backend/app/lifespan.py b/backend/app/lifespan.py index b3f9391..85f0dc0 100644 --- a/backend/app/lifespan.py +++ b/backend/app/lifespan.py @@ -19,6 +19,7 @@ from app.deps import set_pg_pool from app.refresh_negative_prices import refresh_negative_price_predictions from app.ws_log_handler import WSLogHandler from services.audit_filler import fill_audit_for_completed_intervals +from services.plan_actual_slot_guard import run_plan_actual_slot_guard_for_all_active_sites from services.control_exporter import export_setpoints, verify_modbus_commands from services.forecast_service import fetch_pv_forecast from services.heartbeat_service import send_heartbeat @@ -75,6 +76,13 @@ async def lifespan(app: FastAPI): except Exception: logger.exception("scheduled_audit_filler site=%s failed", site["id"]) + async def scheduled_plan_actual_slot_guard() -> None: + """Po audit filleru: fatální odchylka plán vs. skutečnost (síť) → Discord (dedup v DB).""" + try: + await run_plan_actual_slot_guard_for_all_active_sites(app.state.pg_pool) + except Exception: + logger.exception("scheduled_plan_actual_slot_guard failed") + async def scheduled_forecast_accuracy() -> None: async with app.state.pg_pool.acquire() as conn: for site in await _active_site_rows(conn): @@ -306,6 +314,14 @@ async def lifespan(app: FastAPI): second=0, id="audit_filler", ) + scheduler.add_job( + scheduled_plan_actual_slot_guard, + "cron", + minute="5,20,35,50", + second=0, + id="plan_actual_slot_guard", + replace_existing=True, + ) scheduler.add_job( scheduled_forecast_accuracy, "cron", diff --git a/backend/services/notification_service.py b/backend/services/notification_service.py index 5e50dbe..eb514da 100644 --- a/backend/services/notification_service.py +++ b/backend/services/notification_service.py @@ -5,7 +5,7 @@ from __future__ import annotations import asyncio import json import logging -from datetime import datetime +from datetime import datetime, timezone import asyncpg import httpx @@ -119,6 
+119,27 @@ async def run_fn_set_mode_with_discord( return str(new) +async def notify_plan_vs_actual_fatal( + site_code: str, + slot_label: str, + interval_start_utc: datetime, + plan_grid_w: int, + actual_grid_w: int, + deviation_grid_w: int, + reason_code: str, + detail: str, +) -> None: + """Discord po fatální odchylce plán vs. audit (síť) pro uzavřený 15min slot.""" + utc_label = interval_start_utc.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + msg = ( + f"**Fatální odchylka plán vs. realita (síť)** – `{site_code}`\n" + f"Slot: **{slot_label}** (`{utc_label}`)\n" + f"**{reason_code}**: {detail}\n" + f"Plán grid: **{plan_grid_w}** W | Skutečnost: **{actual_grid_w}** W | Δ (act−plan): **{deviation_grid_w}** W" + ) + await send_discord(msg, level="critical") + + async def send_discord(message: str, level: str = "info") -> bool: """ Pošle notifikaci na Discord webhook. diff --git a/backend/services/plan_actual_slot_guard.py b/backend/services/plan_actual_slot_guard.py new file mode 100644 index 0000000..44bd44c --- /dev/null +++ b/backend/services/plan_actual_slot_guard.py @@ -0,0 +1,116 @@ +""" +Kontrola plán vs. skutečnost po uzavření 15min slotu. + +Pravidla a dedup INSERT drží ems.fn_plan_actual_slot_guard_site / fn_plan_actual_slot_guard_all_active +(repeatable R__076). Python jen zavolá funkci a pošle Discord podle vrácených alertů. 
+""" + +from __future__ import annotations + +import logging +from datetime import datetime, timezone +from typing import Any + +import asyncpg +from zoneinfo import ZoneInfo + +from app.db_json import fetch_json +from services.notification_service import notify_plan_vs_actual_fatal + +logger = logging.getLogger(__name__) + +_PRAGUE = ZoneInfo("Europe/Prague") + + +def _interval_start_utc(value: Any) -> datetime: + if isinstance(value, datetime): + if value.tzinfo is None: + return value.replace(tzinfo=timezone.utc) + return value.astimezone(timezone.utc) + if isinstance(value, str): + s = value.replace("Z", "+00:00") + dt = datetime.fromisoformat(s) + if dt.tzinfo is None: + return dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc) + raise TypeError(f"expected datetime or str for interval_start, got {type(value)!r}") + + +def _slot_label_prague(interval_start: datetime) -> str: + loc = interval_start.astimezone(_PRAGUE) + return loc.strftime("%Y-%m-%d %H:%M") + " Europe/Prague" + + +async def _dispatch_site_result(site_payload: dict[str, Any]) -> None: + if site_payload.get("error") == "unknown_site": + logger.warning("plan_actual_slot_guard: unknown site_id=%s", site_payload.get("site_id")) + return + site_code = str(site_payload.get("site_code") or site_payload.get("site_id") or "") + alerts = site_payload.get("alerts") + if not isinstance(alerts, list): + return + for alert in alerts: + if not isinstance(alert, dict): + continue + if not alert.get("notify"): + continue + interval_start = _interval_start_utc(alert["interval_start"]) + reason_code = str(alert.get("reason_code") or "") + detail = str(alert.get("detail") or "") + plan_grid_w = int(alert.get("plan_grid_w") or 0) + actual_grid_w = int(alert.get("actual_grid_w") or 0) + deviation_grid_w = int(alert.get("deviation_grid_w") or 0) + slot_label = _slot_label_prague(interval_start) + await notify_plan_vs_actual_fatal( + site_code=site_code, + slot_label=slot_label, + 
interval_start_utc=interval_start, + plan_grid_w=plan_grid_w, + actual_grid_w=actual_grid_w, + deviation_grid_w=deviation_grid_w, + reason_code=reason_code, + detail=detail, + ) + logger.warning( + "[site=%s] plan_actual fatal %s slot=%s: %s", + site_payload.get("site_id"), + reason_code, + interval_start.isoformat(), + detail, + ) + + +async def run_plan_actual_slot_guard_for_all_active_sites( + pool: asyncpg.Pool, + *, + now: datetime | None = None, +) -> None: + """Scheduler: jeden dotaz přes aktivní lokality (SQL dedup + klasifikace).""" + async with pool.acquire() as conn: + try: + if now is not None: + raw = await fetch_json( + conn, + "SELECT ems.fn_plan_actual_slot_guard_all_active($1::timestamptz)", + now, + ) + else: + raw = await fetch_json(conn, "SELECT ems.fn_plan_actual_slot_guard_all_active()") + except Exception: + logger.exception("plan_actual_slot_guard fn_plan_actual_slot_guard_all_active failed") + return + if raw is None: + return + if not isinstance(raw, list): + logger.warning("plan_actual_slot_guard: unexpected payload type %s", type(raw)) + return + for site_payload in raw: + if not isinstance(site_payload, dict): + continue + try: + await _dispatch_site_result(site_payload) + except Exception: + logger.exception( + "plan_actual_slot_guard site=%s failed", + site_payload.get("site_id"), + ) diff --git a/db/migration/V052__plan_fatal_deviation_sent.sql b/db/migration/V052__plan_fatal_deviation_sent.sql new file mode 100644 index 0000000..4d7107e --- /dev/null +++ b/db/migration/V052__plan_fatal_deviation_sent.sql @@ -0,0 +1,18 @@ +-- Jednorázové potvrzení odeslání fatálního Discord alertu plán vs. skutečnost (deduplikace po slotu). 
+ +create table ems.plan_fatal_deviation_sent ( + site_id int not null references ems.site (id), + interval_start timestamptz not null, + reason_code text not null, + sent_at timestamptz not null default now(), + primary key (site_id, interval_start) +); + +create index idx_plan_fatal_deviation_sent_sent_at + on ems.plan_fatal_deviation_sent (sent_at desc); + +comment on table ems.plan_fatal_deviation_sent is +'Backend job po uzavření 15min slotu: při fatální odchylce grid plán vs. audit jednou pošle Discord a zapíše řádek (PK site_id + interval_start).'; + +comment on column ems.plan_fatal_deviation_sent.reason_code is +'Kód z ems.fn_plan_actual_slot_guard_site (např. GRID_SIGN_MISMATCH, GRID_EXPORT_SPIKE).'; diff --git a/db/routines/R__040_fn_planning_slot_boundary_prague.sql b/db/routines/R__040_fn_planning_slot_boundary_prague.sql index a68dfc9..cd4e95f 100644 --- a/db/routines/R__040_fn_planning_slot_boundary_prague.sql +++ b/db/routines/R__040_fn_planning_slot_boundary_prague.sql @@ -1,6 +1,12 @@ --- začátek aktuálního (+offset) 15min slotu v Europe/Prague jako timestamptz (UTC instants) +-- Začátek aktuálního (+offset) 15min slotu v Europe/Prague jako timestamptz (UTC instants). +-- Volitelné p_at (např. job po uzavření slotu); null = now(). 
-create or replace function ems.fn_planning_slot_boundary_prague(p_offset_slots int default 0) +drop function if exists ems.fn_planning_slot_boundary_prague(int); + +create or replace function ems.fn_planning_slot_boundary_prague( + p_offset_slots int default 0, + p_at timestamptz default null +) returns timestamptz language sql stable @@ -13,8 +19,10 @@ as $fn$ ) )::timestamp at time zone 'Europe/Prague' ) + make_interval(mins => coalesce(p_offset_slots, 0) * 15) - from (select now() at time zone 'Europe/Prague' as ts) loc; + from ( + select coalesce(p_at, now()) at time zone 'Europe/Prague' as ts + ) loc; $fn$; -comment on function ems.fn_planning_slot_boundary_prague(int) is - 'Začátek 15min slotu v časové zóně site provozu (Europe/Prague floor); offset v násobcích 15 min.'; +comment on function ems.fn_planning_slot_boundary_prague(int, timestamptz) is +'Začátek 15min slotu (Europe/Prague floor); offset v násobcích 15 min; p_at volitelně místo now().'; diff --git a/db/routines/R__076_fn_plan_actual_slot_guard.sql b/db/routines/R__076_fn_plan_actual_slot_guard.sql new file mode 100644 index 0000000..ef3e609 --- /dev/null +++ b/db/routines/R__076_fn_plan_actual_slot_guard.sql @@ -0,0 +1,177 @@ +-- Kontrola plán vs. audit (síť) po uzavření slotu: pravidla v DB, dedup insert, výstup pro Discord z Pythonu. 
+ +create or replace function ems.fn_plan_actual_slot_guard_site( + p_site_id int, + p_now timestamptz default now() +) +returns jsonb +language sql +volatile +as $fn$ + with v_code as ( + select s.code as site_code + from ems.site s + where s.id = p_site_id + ), + slots as ( + select distinct unnest(array[ + ems.fn_planning_slot_boundary_prague(-1, p_now), + ems.fn_planning_slot_boundary_prague(-2, p_now) + ]) as interval_start + ), + base as ( + select + s.interval_start, + ai.actual_grid_power_w, + ai.deviation_grid_w, + pi.grid_setpoint_w as plan_grid_w + from slots s + inner join ems.audit_interval ai + on ai.site_id = p_site_id + and ai.interval_start = s.interval_start + left join ems.planning_interval pi + on pi.run_id = ai.planning_run_id + and pi.interval_start = ai.interval_start + ), + cls as ( + select + b.interval_start, + b.plan_grid_w, + coalesce(b.actual_grid_power_w, 0) as actual_grid_w, + b.deviation_grid_w, + case + when b.plan_grid_w is null or b.deviation_grid_w is null then null::text + when b.plan_grid_w < -2000 and coalesce(b.actual_grid_power_w, 0) > 2500 + then 'GRID_IMPORT_VS_EXPORT_PLAN' + when b.plan_grid_w <> 0 + and coalesce(b.actual_grid_power_w, 0) <> 0 + and (b.plan_grid_w > 0) + <> (coalesce(b.actual_grid_power_w, 0) > 0) + and least( + abs(b.plan_grid_w), + abs(coalesce(b.actual_grid_power_w, 0)) + ) >= 400 + then 'GRID_SIGN_MISMATCH' + when b.plan_grid_w > -1000 and coalesce(b.actual_grid_power_w, 0) < -4000 + then 'GRID_EXPORT_SPIKE' + when abs(b.deviation_grid_w) >= 10000 and abs(b.plan_grid_w) <= 2500 + then 'GRID_LARGE_DEVIATION' + else null::text + end as reason_code, + case + when b.plan_grid_w is null or b.deviation_grid_w is null then null::text + when b.plan_grid_w < -2000 and coalesce(b.actual_grid_power_w, 0) > 2500 + then format( + 'plán síť %s W vs skutečnost %s W (plán vývoz, skutečnost silný odběr)', + b.plan_grid_w, + coalesce(b.actual_grid_power_w, 0) + ) + when b.plan_grid_w <> 0 + and 
coalesce(b.actual_grid_power_w, 0) <> 0 + and (b.plan_grid_w > 0) + <> (coalesce(b.actual_grid_power_w, 0) > 0) + and least( + abs(b.plan_grid_w), + abs(coalesce(b.actual_grid_power_w, 0)) + ) >= 400 + then format( + 'plán síť %s W vs skutečnost %s W (opačný směr import/export)', + b.plan_grid_w, + coalesce(b.actual_grid_power_w, 0) + ) + when b.plan_grid_w > -1000 and coalesce(b.actual_grid_power_w, 0) < -4000 + then format( + 'plán síť %s W vs skutečnost %s W (neočekávaný silný vývoz)', + b.plan_grid_w, + coalesce(b.actual_grid_power_w, 0) + ) + when abs(b.deviation_grid_w) >= 10000 and abs(b.plan_grid_w) <= 2500 + then format( + 'odchylka výkonu sítě %s W (plán %s W, skutečnost %s W)', + b.deviation_grid_w, + b.plan_grid_w, + coalesce(b.actual_grid_power_w, 0) + ) + else null::text + end as detail_cs + from base b + ), + ins as ( + insert into ems.plan_fatal_deviation_sent (site_id, interval_start, reason_code) + select + p_site_id, + c.interval_start, + c.reason_code + from cls c + where c.reason_code is not null + on conflict (site_id, interval_start) do nothing + returning interval_start, reason_code + ), + notified as ( + select + c.interval_start, + c.reason_code, + c.detail_cs, + c.plan_grid_w, + c.actual_grid_w, + c.deviation_grid_w + from cls c + inner join ins i + on i.interval_start = c.interval_start + and i.reason_code = c.reason_code + ) + select + case + when not exists (select 1 from v_code) then + jsonb_build_object('error', 'unknown_site', 'site_id', p_site_id) + else + jsonb_build_object( + 'site_id', p_site_id, + 'site_code', (select vc.site_code from v_code vc), + 'alerts', coalesce( + ( + select coalesce( + jsonb_agg( + jsonb_build_object( + 'interval_start', n.interval_start, + 'reason_code', n.reason_code, + 'detail', n.detail_cs, + 'plan_grid_w', n.plan_grid_w, + 'actual_grid_w', n.actual_grid_w, + 'deviation_grid_w', n.deviation_grid_w, + 'notify', true + ) + order by n.interval_start + ), + '[]'::jsonb + ) + from notified n + ), + 
'[]'::jsonb + ) + ) + end; +$fn$; + +comment on function ems.fn_plan_actual_slot_guard_site(int, timestamptz) is +'Poslední 2 uzavřené 15min sloty: fatální odchylka síť plán vs. audit → insert plan_fatal_deviation_sent (dedup); vrátí JSON s alerts k odeslání na Discord.'; + +create or replace function ems.fn_plan_actual_slot_guard_all_active( + p_now timestamptz default now() +) +returns jsonb +language sql +volatile +as $fn$ + select coalesce( + jsonb_agg( + ems.fn_plan_actual_slot_guard_site((elem->>'id')::int, p_now) + order by (elem->>'id')::int + ), + '[]'::jsonb + ) + from jsonb_array_elements(ems.fn_vw_site_directory_active()) as t(elem); +$fn$; + +comment on function ems.fn_plan_actual_slot_guard_all_active(timestamptz) is +'Projde aktivní lokality (fn_vw_site_directory_active) a zavolá fn_plan_actual_slot_guard_site; pole výsledků pro scheduler.'; diff --git a/docs/02-architecture.md b/docs/02-architecture.md index cce8544..cce852d 100644 --- a/docs/02-architecture.md +++ b/docs/02-architecture.md @@ -31,6 +31,7 @@ │ – planning_engine (denně 15:00) │ │ – control_exporter (každých 15min) │ │ – audit_filler (každých 15min) │ +│ – plan_actual_slot_guard (:05,:20,:35,:50) │ │ – verify_modbus (každé 2 min) │ └──────┬──────────────────────────┬────────────┘ │ Modbus TCP │ HTTP diff --git a/docs/04-modules/modbus-command-journal.md b/docs/04-modules/modbus-command-journal.md index b91d94c..35cf42b 100644 --- a/docs/04-modules/modbus-command-journal.md +++ b/docs/04-modules/modbus-command-journal.md @@ -53,6 +53,9 @@ Implementace: `services/control_exporter.py` — `verify_modbus_commands`, `_ver | Job | Frekvence | Popis | |-----|-----------|--------| | `verify_modbus` | každé **2 min** | Pro každou aktivní site vybere `written` příkazy s `written_at` v posledních **20 min** a zavolá `verify_modbus_commands`. 
| +| `plan_actual_slot_guard` | **:05, :20, :35, :50** (po `audit_filler`) | `ems.fn_plan_actual_slot_guard_all_active` (+ `plan_actual_slot_guard.py` jen Discord): poslední 2 uzavřené 15min sloty — fatální odchylka **plán vs. audit síť** → **Discord** (`critical`), dedup přes `ems.plan_fatal_deviation_sent`. | + +Plná tabulka jobů je v [`lifespan.py`](../../backend/app/lifespan.py). ## Ruční API