implmentace plan guardu
Some checks failed
CI and deploy / migration-check (push) Failing after 17s
CI and deploy / deploy (push) Has been skipped

This commit is contained in:
Dusan Vojacek
2026-04-19 23:10:25 +02:00
parent d8221e3169
commit e3776226a4
9 changed files with 369 additions and 7 deletions

View File

@@ -68,6 +68,8 @@ Multi-site Energy Management System: optimalizuje FVE, baterii a flexibilní zá
Projekt je **SQL-first**: doménová logika, agregace, joiny mezi tabulkami a stabilní čtecí rozhraní patří do **PostgreSQL** (`ems.fn_*`, případně **`ems.vw_*`**). Python (FastAPI, joby) volá DB; neskladá vlastní dotazy nad schématem mimo výjimky níže.
**Formát SQL v repu (`db/migration`, `db/routines`, `db/views`):** odsazení **2 mezery** na úroveň vnoření; **rezervovaná klíčová slova PostgreSQL vždy malými písmeny** (`create table`, `select`, `where`, `references`, …). Identifikátory (`ems.*`, sloupce) **`snake_case`**; typy v deklaracích též malými (`int`, `text`, `timestamptz`, `jsonb`). Nový / upravený SQL v tomto stylu — nesmí se objevovat verzované migrace psané „ALL CAPS keywords“.
- **Preferuj:** novou nebo rozšířenou **`ems.fn_*(…)`** s jasnými parametry; potřebuješ často stejné sloupce z více tabulek → **`ems.vw_*`** (view zapouzdřuje joiny a strukturu DB; z Pythonu je `SELECT … FROM ems.vw_*` v pořádku).
- **Nechtěné:** skládání dotazů v Pythonu (**vlastní JOIN / WITH / poddotazy** nad `ems.*` tabulkami). Místo toho funkce nebo view v `db/routines/` / `db/views/` + jedno volání z aplikace.
- **Jediné SQL v `backend/services/*.py` a `backend/app/routers/*.py`:** `SELECT 1` / `EXISTS`; **`select ems.fn_*(…)`**; **`SELECT … FROM ems.vw_*`** (read přes view); žádné jiné ad-hoc **`SELECT`/`INSERT`/`UPDATE`**. IO (Modbus, HTTP); **PuLP**; orchestrace scheduleru.
@@ -197,7 +199,7 @@ Specifikace z `docs/02-architecture.md`, modulových docs a komentářů v `plan
## Konvence (krátce)
- Python: `snake_case`, type hints, Pydantic pro API modely.
- SQL: `snake_case`, explicitní FK; Flyway pořadí `V###__` / repeatable `R__NNN_*.sql` (třímístný prefix = pořadí závislostí mezi fn/vw).
- SQL: viz také odstavec **Formát SQL** u sekce SQL-first výše — **2 mezery** odsazení, **klíčová slova malými písmeny**, `snake_case` identifikátory, explicitní FK; Flyway pořadí `V###__` / repeatable `R__NNN_*.sql` (třímístný prefix = pořadí závislostí mezi fn/vw).
- Timescale **continuous aggregate** (CA): komentář k objektu CA je **`COMMENT ON VIEW`**, ne `COMMENT ON MATERIALIZED VIEW` (PG hlásí 42809). Viz `.cursor/rules/timescale-continuous-aggregate.mdc`.
- Výkon **W**, energie **Wh**, ceny **Kč/kWh**; čas v DB **`TIMESTAMPTZ` (UTC)**.
- NIKDY neupravuj existující V__ migrační soubory po jejich aplikaci na DB.

View File

@@ -19,6 +19,7 @@ from app.deps import set_pg_pool
from app.refresh_negative_prices import refresh_negative_price_predictions
from app.ws_log_handler import WSLogHandler
from services.audit_filler import fill_audit_for_completed_intervals
from services.plan_actual_slot_guard import run_plan_actual_slot_guard_for_all_active_sites
from services.control_exporter import export_setpoints, verify_modbus_commands
from services.forecast_service import fetch_pv_forecast
from services.heartbeat_service import send_heartbeat
@@ -75,6 +76,13 @@ async def lifespan(app: FastAPI):
except Exception:
logger.exception("scheduled_audit_filler site=%s failed", site["id"])
async def scheduled_plan_actual_slot_guard() -> None:
"""Po audit filleru: fatální odchylka plán vs. skutečnost (síť) → Discord (dedup v DB)."""
try:
await run_plan_actual_slot_guard_for_all_active_sites(app.state.pg_pool)
except Exception:
logger.exception("scheduled_plan_actual_slot_guard failed")
async def scheduled_forecast_accuracy() -> None:
async with app.state.pg_pool.acquire() as conn:
for site in await _active_site_rows(conn):
@@ -306,6 +314,14 @@ async def lifespan(app: FastAPI):
second=0,
id="audit_filler",
)
scheduler.add_job(
scheduled_plan_actual_slot_guard,
"cron",
minute="5,20,35,50",
second=0,
id="plan_actual_slot_guard",
replace_existing=True,
)
scheduler.add_job(
scheduled_forecast_accuracy,
"cron",

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
import asyncio
import json
import logging
from datetime import datetime
from datetime import datetime, timezone
import asyncpg
import httpx
@@ -119,6 +119,27 @@ async def run_fn_set_mode_with_discord(
return str(new)
async def notify_plan_vs_actual_fatal(
site_code: str,
slot_label: str,
interval_start_utc: datetime,
plan_grid_w: int,
actual_grid_w: int,
deviation_grid_w: int,
reason_code: str,
detail: str,
) -> None:
"""Discord po fatální odchylce plán vs. audit (síť) pro uzavřený 15min slot."""
utc_label = interval_start_utc.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
msg = (
f"**Fatální odchylka plán vs. realita (síť)** `{site_code}`\n"
f"Slot: **{slot_label}** (`{utc_label}`)\n"
f"**{reason_code}**: {detail}\n"
f"Plán grid: **{plan_grid_w}** W | Skutečnost: **{actual_grid_w}** W | Δ (actplan): **{deviation_grid_w}** W"
)
await send_discord(msg, level="critical")
async def send_discord(message: str, level: str = "info") -> bool:
"""
Pošle notifikaci na Discord webhook.

View File

@@ -0,0 +1,116 @@
"""
Kontrola plán vs. skutečnost po uzavření 15min slotu.
Pravidla a dedup INSERT drží ems.fn_plan_actual_slot_guard_site / fn_plan_actual_slot_guard_all_active
(repeatable R__076). Python jen zavolá funkci a pošle Discord podle vrácených alertů.
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from typing import Any
import asyncpg
from zoneinfo import ZoneInfo
from app.db_json import fetch_json
from services.notification_service import notify_plan_vs_actual_fatal
logger = logging.getLogger(__name__)
_PRAGUE = ZoneInfo("Europe/Prague")
def _interval_start_utc(value: Any) -> datetime:
if isinstance(value, datetime):
if value.tzinfo is None:
return value.replace(tzinfo=timezone.utc)
return value.astimezone(timezone.utc)
if isinstance(value, str):
s = value.replace("Z", "+00:00")
dt = datetime.fromisoformat(s)
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
raise TypeError(f"expected datetime or str for interval_start, got {type(value)!r}")
def _slot_label_prague(interval_start: datetime) -> str:
loc = interval_start.astimezone(_PRAGUE)
return loc.strftime("%Y-%m-%d %H:%M") + " Europe/Prague"
async def _dispatch_site_result(site_payload: dict[str, Any]) -> None:
if site_payload.get("error") == "unknown_site":
logger.warning("plan_actual_slot_guard: unknown site_id=%s", site_payload.get("site_id"))
return
site_code = str(site_payload.get("site_code") or site_payload.get("site_id") or "")
alerts = site_payload.get("alerts")
if not isinstance(alerts, list):
return
for alert in alerts:
if not isinstance(alert, dict):
continue
if not alert.get("notify"):
continue
interval_start = _interval_start_utc(alert["interval_start"])
reason_code = str(alert.get("reason_code") or "")
detail = str(alert.get("detail") or "")
plan_grid_w = int(alert.get("plan_grid_w") or 0)
actual_grid_w = int(alert.get("actual_grid_w") or 0)
deviation_grid_w = int(alert.get("deviation_grid_w") or 0)
slot_label = _slot_label_prague(interval_start)
await notify_plan_vs_actual_fatal(
site_code=site_code,
slot_label=slot_label,
interval_start_utc=interval_start,
plan_grid_w=plan_grid_w,
actual_grid_w=actual_grid_w,
deviation_grid_w=deviation_grid_w,
reason_code=reason_code,
detail=detail,
)
logger.warning(
"[site=%s] plan_actual fatal %s slot=%s: %s",
site_payload.get("site_id"),
reason_code,
interval_start.isoformat(),
detail,
)
async def run_plan_actual_slot_guard_for_all_active_sites(
pool: asyncpg.Pool,
*,
now: datetime | None = None,
) -> None:
"""Scheduler: jeden dotaz přes aktivní lokality (SQL dedup + klasifikace)."""
async with pool.acquire() as conn:
try:
if now is not None:
raw = await fetch_json(
conn,
"SELECT ems.fn_plan_actual_slot_guard_all_active($1::timestamptz)",
now,
)
else:
raw = await fetch_json(conn, "SELECT ems.fn_plan_actual_slot_guard_all_active()")
except Exception:
logger.exception("plan_actual_slot_guard fn_plan_actual_slot_guard_all_active failed")
return
if raw is None:
return
if not isinstance(raw, list):
logger.warning("plan_actual_slot_guard: unexpected payload type %s", type(raw))
return
for site_payload in raw:
if not isinstance(site_payload, dict):
continue
try:
await _dispatch_site_result(site_payload)
except Exception:
logger.exception(
"plan_actual_slot_guard site=%s failed",
site_payload.get("site_id"),
)

View File

@@ -0,0 +1,18 @@
-- Jednorázové potvrzení odeslání fatálního Discord alertu plán vs. skutečnost (deduplikace po slotu).
create table ems.plan_fatal_deviation_sent (
site_id int not null references ems.site (id),
interval_start timestamptz not null,
reason_code text not null,
sent_at timestamptz not null default now(),
primary key (site_id, interval_start)
);
create index idx_plan_fatal_deviation_sent_sent_at
on ems.plan_fatal_deviation_sent (sent_at desc);
comment on table ems.plan_fatal_deviation_sent is
'Backend job po uzavření 15min slotu: při fatální odchylce grid plán vs. audit jednou pošle Discord a zapíše řádek (PK site_id + interval_start).';
comment on column ems.plan_fatal_deviation_sent.reason_code is
'Kód z ems.fn_plan_actual_slot_guard_site (např. GRID_SIGN_MISMATCH, GRID_EXPORT_SPIKE).';

View File

@@ -1,6 +1,12 @@
-- začátek aktuálního (+offset) 15min slotu v Europe/Prague jako timestamptz (UTC instants)
-- Začátek aktuálního (+offset) 15min slotu v Europe/Prague jako timestamptz (UTC instants).
-- Volitelné p_at (např. job po uzavření slotu); null = now().
create or replace function ems.fn_planning_slot_boundary_prague(p_offset_slots int default 0)
drop function if exists ems.fn_planning_slot_boundary_prague(int);
create or replace function ems.fn_planning_slot_boundary_prague(
p_offset_slots int default 0,
p_at timestamptz default null
)
returns timestamptz
language sql
stable
@@ -13,8 +19,10 @@ as $fn$
)
)::timestamp at time zone 'Europe/Prague'
) + make_interval(mins => coalesce(p_offset_slots, 0) * 15)
from (select now() at time zone 'Europe/Prague' as ts) loc;
from (
select coalesce(p_at, now()) at time zone 'Europe/Prague' as ts
) loc;
$fn$;
comment on function ems.fn_planning_slot_boundary_prague(int) is
'Začátek 15min slotu v časové zóně site provozu (Europe/Prague floor); offset v násobcích 15 min.';
comment on function ems.fn_planning_slot_boundary_prague(int, timestamptz) is
'Začátek 15min slotu (Europe/Prague floor); offset v násobcích 15 min; p_at volitelně místo now().';

View File

@@ -0,0 +1,177 @@
-- Kontrola plán vs. audit (síť) po uzavření slotu: pravidla v DB, dedup insert, výstup pro Discord z Pythonu.
create or replace function ems.fn_plan_actual_slot_guard_site(
p_site_id int,
p_now timestamptz default now()
)
returns jsonb
language sql
volatile
as $fn$
with v_code as (
select s.code as site_code
from ems.site s
where s.id = p_site_id
),
slots as (
select distinct unnest(array[
ems.fn_planning_slot_boundary_prague(-1, p_now),
ems.fn_planning_slot_boundary_prague(-2, p_now)
]) as interval_start
),
base as (
select
s.interval_start,
ai.actual_grid_power_w,
ai.deviation_grid_w,
pi.grid_setpoint_w as plan_grid_w
from slots s
inner join ems.audit_interval ai
on ai.site_id = p_site_id
and ai.interval_start = s.interval_start
left join ems.planning_interval pi
on pi.run_id = ai.planning_run_id
and pi.interval_start = ai.interval_start
),
cls as (
select
b.interval_start,
b.plan_grid_w,
coalesce(b.actual_grid_power_w, 0) as actual_grid_w,
b.deviation_grid_w,
case
when b.plan_grid_w is null or b.deviation_grid_w is null then null::text
when b.plan_grid_w < -2000 and coalesce(b.actual_grid_power_w, 0) > 2500
then 'GRID_IMPORT_VS_EXPORT_PLAN'
when b.plan_grid_w <> 0
and coalesce(b.actual_grid_power_w, 0) <> 0
and (b.plan_grid_w > 0)
<> (coalesce(b.actual_grid_power_w, 0) > 0)
and least(
abs(b.plan_grid_w),
abs(coalesce(b.actual_grid_power_w, 0))
) >= 400
then 'GRID_SIGN_MISMATCH'
when b.plan_grid_w > -1000 and coalesce(b.actual_grid_power_w, 0) < -4000
then 'GRID_EXPORT_SPIKE'
when abs(b.deviation_grid_w) >= 10000 and abs(b.plan_grid_w) <= 2500
then 'GRID_LARGE_DEVIATION'
else null::text
end as reason_code,
case
when b.plan_grid_w is null or b.deviation_grid_w is null then null::text
when b.plan_grid_w < -2000 and coalesce(b.actual_grid_power_w, 0) > 2500
then format(
'plán síť %s W vs skutečnost %s W (plán vývoz, skutečnost silný odběr)',
b.plan_grid_w,
coalesce(b.actual_grid_power_w, 0)
)
when b.plan_grid_w <> 0
and coalesce(b.actual_grid_power_w, 0) <> 0
and (b.plan_grid_w > 0)
<> (coalesce(b.actual_grid_power_w, 0) > 0)
and least(
abs(b.plan_grid_w),
abs(coalesce(b.actual_grid_power_w, 0))
) >= 400
then format(
'plán síť %s W vs skutečnost %s W (opačný směr import/export)',
b.plan_grid_w,
coalesce(b.actual_grid_power_w, 0)
)
when b.plan_grid_w > -1000 and coalesce(b.actual_grid_power_w, 0) < -4000
then format(
'plán síť %s W vs skutečnost %s W (neočekávaný silný vývoz)',
b.plan_grid_w,
coalesce(b.actual_grid_power_w, 0)
)
when abs(b.deviation_grid_w) >= 10000 and abs(b.plan_grid_w) <= 2500
then format(
'odchylka výkonu sítě %s W (plán %s W, skutečnost %s W)',
b.deviation_grid_w,
b.plan_grid_w,
coalesce(b.actual_grid_power_w, 0)
)
else null::text
end as detail_cs
from base b
),
ins as (
insert into ems.plan_fatal_deviation_sent (site_id, interval_start, reason_code)
select
p_site_id,
c.interval_start,
c.reason_code
from cls c
where c.reason_code is not null
on conflict (site_id, interval_start) do nothing
returning interval_start, reason_code
),
notified as (
select
c.interval_start,
c.reason_code,
c.detail_cs,
c.plan_grid_w,
c.actual_grid_w,
c.deviation_grid_w
from cls c
inner join ins i
on i.interval_start = c.interval_start
and i.reason_code = c.reason_code
)
select
case
when not exists (select 1 from v_code) then
jsonb_build_object('error', 'unknown_site', 'site_id', p_site_id)
else
jsonb_build_object(
'site_id', p_site_id,
'site_code', (select vc.site_code from v_code vc),
'alerts', coalesce(
(
select coalesce(
jsonb_agg(
jsonb_build_object(
'interval_start', n.interval_start,
'reason_code', n.reason_code,
'detail', n.detail_cs,
'plan_grid_w', n.plan_grid_w,
'actual_grid_w', n.actual_grid_w,
'deviation_grid_w', n.deviation_grid_w,
'notify', true
)
order by n.interval_start
),
'[]'::jsonb
)
from notified n
),
'[]'::jsonb
)
)
end;
$fn$;
comment on function ems.fn_plan_actual_slot_guard_site(int, timestamptz) is
'Poslední 2 uzavřené 15min sloty: fatální odchylka síť plán vs. audit → insert plan_fatal_deviation_sent (dedup); vrátí JSON s alerts k odeslání na Discord.';
create or replace function ems.fn_plan_actual_slot_guard_all_active(
p_now timestamptz default now()
)
returns jsonb
language sql
volatile
as $fn$
select coalesce(
jsonb_agg(
ems.fn_plan_actual_slot_guard_site((elem->>'id')::int, p_now)
order by (elem->>'id')::int
),
'[]'::jsonb
)
from jsonb_array_elements(ems.fn_vw_site_directory_active()) as t(elem);
$fn$;
comment on function ems.fn_plan_actual_slot_guard_all_active(timestamptz) is
'Projde aktivní lokality (fn_vw_site_directory_active) a zavolá fn_plan_actual_slot_guard_site; pole výsledků pro scheduler.';

View File

@@ -31,6 +31,7 @@
planning_engine (denně 15:00) │
control_exporter (každých 15min) │
audit_filler (každých 15min) │
plan_actual_slot_guard (:05,:20,:35,:50) │
verify_modbus (každé 2 min) │
└──────┬──────────────────────────┬────────────┘
│ Modbus TCP │ HTTP

View File

@@ -53,6 +53,9 @@ Implementace: `services/control_exporter.py` — `verify_modbus_commands`, `_ver
| Job | Frekvence | Popis |
|-----|-----------|--------|
| `verify_modbus` | každé **2 min** | Pro každou aktivní site vybere `written` příkazy s `written_at` v posledních **20 min** a zavolá `verify_modbus_commands`. |
| `plan_actual_slot_guard` | **:05, :20, :35, :50** (po `audit_filler`) | `ems.fn_plan_actual_slot_guard_all_active` (+ `plan_actual_slot_guard.py` jen Discord): poslední 2 uzavřené 15min sloty — fatální odchylka **plán vs. audit síť****Discord** (`critical`), dedup přes `ems.plan_fatal_deviation_sent`. |
Plná tabulka jobů je v [`lifespan.py`](../../backend/app/lifespan.py).
## Ruční API