From 6074535d96f7ae822c1f31de45f8586ce6b9d218 Mon Sep 17 00:00:00 2001 From: Dusan Vojacek Date: Wed, 29 Apr 2026 14:17:24 +0200 Subject: [PATCH] OTE informatin discord --- backend/services/notification_service.py | 60 +++++++ backend/services/price_importer.py | 37 +++- .../R__081_fn_ote_day_brief_prague.sql | 82 +++++++++ .../R__082_fn_ote_day_signals_prague.sql | 169 ++++++++++++++++++ docs/04-modules/market-prices.md | 1 + 5 files changed, 348 insertions(+), 1 deletion(-) create mode 100644 db/routines/R__081_fn_ote_day_brief_prague.sql create mode 100644 db/routines/R__082_fn_ote_day_signals_prague.sql diff --git a/backend/services/notification_service.py b/backend/services/notification_service.py index 3312823..d2e5f00 100644 --- a/backend/services/notification_service.py +++ b/backend/services/notification_service.py @@ -16,6 +16,7 @@ logger = logging.getLogger(__name__) _WEBHOOK_CACHE: dict[tuple[int, str], str] = {} _OTE_IMPORT_ALERT_CACHE: dict[tuple[str, str], float] = {} +_OTE_IMPORT_OK_CACHE: dict[str, float] = {} async def _get_site_webhook_url( @@ -255,6 +256,65 @@ async def notify_ote_import_format_changed( await send_discord(conn, site_id=None, message=msg, level="critical") +def _should_send_ote_ok(report_date: str, *, cooldown_s: float) -> bool: + now = datetime.now(timezone.utc).timestamp() + key = str(report_date) + last = _OTE_IMPORT_OK_CACHE.get(key) + if last is not None and (now - last) < cooldown_s: + return False + _OTE_IMPORT_OK_CACHE[key] = now + return True + + +async def notify_ote_import_ok_brief( + conn: asyncpg.Connection | None, + *, + report_date: str, + brief: dict, + url: str, +) -> None: + """ + Info notifikace po úspěšném importu kompletního dne OTE (stručná analýza "co čekat zítra"). + Dedup: 1× za cooldown na report_date. + """ + if not _should_send_ote_ok(report_date, cooldown_s=20 * 3600): + return + + def _f(x, default: float = 0.0) -> float: + try: + if x is None: + return default + return float(x) + except Exception: + return default + + min_p = _f(brief.get("min_price")) + max_p = _f(brief.get("max_price")) + + raw_signals = brief.get("signals") or [] + signals: list[str] = [] + if isinstance(raw_signals, list): + for s in raw_signals[:6]: + if not isinstance(s, dict): + continue + title = str(s.get("title") or s.get("code") or "").strip() + detail = str(s.get("detail") or "").strip() + if title and detail: + signals.append(f"{title} ({detail})") + elif title: + signals.append(title) + if not signals: + signals.append("běžný den (bez extrémů)") + + msg = ( + f"OTE ceny staženy – `{report_date}`\n" + f"URL: `{url}`\n" + f"Min: **{min_p:.3f}** | Max: **{max_p:.3f}** Kč/kWh\n" + f"Signály: " + "; ".join(f"**{s}**" for s in signals) + ) + await send_discord(conn, site_id=None, message=msg, level="info") + + async def notify_modbus_mismatch( conn: asyncpg.Connection | None, site_id: int | None, diff --git a/backend/services/price_importer.py b/backend/services/price_importer.py index 3f533dd..7e42a44 100644 --- a/backend/services/price_importer.py +++ b/backend/services/price_importer.py @@ -12,7 +12,10 @@ import httpx from app.config import get_settings from app.db_json import fetch_json -from services.notification_service import notify_ote_import_format_changed +from services.notification_service import ( + notify_ote_import_format_changed, + notify_ote_import_ok_brief, +) logger = logging.getLogger(__name__) @@ -154,6 +157,7 @@ async def import_ote_prices_for_day( stats_after = json.loads(stats_after) first_price = stats_after.get("first_price") n_imported = int(stats_after.get("count") or 0) + is_complete = bool(stats_after.get("is_complete")) if not ote_prague_day_slots_look_complete(n_imported): logger.warning( "OTE: %s slotů pro %s (plný den = jedna z %s; jinak neúplná data)", @@ -161,6 +165,21 @@ async def import_ote_prices_for_day( day_str, sorted(OTE_FULL_DAY_SLOT_COUNTS), ) + if is_complete: + brief = await fetch_json( + conn, + "select ems.fn_ote_day_signals_prague($1::date, $2::int)", + target_day, + 14, + ) + if not isinstance(brief, dict): + brief = json.loads(brief) + await notify_ote_import_ok_brief( + conn, + report_date=day_str, + brief=brief if isinstance(brief, dict) else {}, + url=OTE_URL.format(date=day_str), + ) logger.info( "OTE import OK: %s slotů (upsert) pro %s, první cena %.4f Kč/kWh", n, @@ -305,6 +324,7 @@ async def import_ote_prices( stats_after = json.loads(stats_after) first_price = stats_after.get("first_price") n_imported = int(stats_after.get("count") or 0) + is_complete = bool(stats_after.get("is_complete")) incomplete = not ote_prague_day_slots_look_complete(n_imported or 0) if incomplete: now_p = datetime.now(ZoneInfo("Europe/Prague")) @@ -320,6 +340,21 @@ async def import_ote_prices( date_str, sorted(OTE_FULL_DAY_SLOT_COUNTS), ) + if is_complete: + brief = await fetch_json( + db, + "select ems.fn_ote_day_signals_prague($1::date, $2::int)", + target_day, + 14, + ) + if not isinstance(brief, dict): + brief = json.loads(brief) + await notify_ote_import_ok_brief( + db, + report_date=date_str, + brief=brief if isinstance(brief, dict) else {}, + url=OTE_URL.format(date=date_str), + ) logger.info( "OTE import OK: %s slotů pro %s, první cena %.4f Kč/kWh", n, diff --git a/db/routines/R__081_fn_ote_day_brief_prague.sql b/db/routines/R__081_fn_ote_day_brief_prague.sql new file mode 100644 index 0000000..6dd1e84 --- /dev/null +++ b/db/routines/R__081_fn_ote_day_brief_prague.sql @@ -0,0 +1,82 @@ +-- OTE CZ: stručná analýza cen pro den (TZ Europe/Prague) + +create or replace function ems.fn_ote_day_brief_prague(p_day date) +returns jsonb +language sql +stable +as $fn$ + with slots as ( + select + mip.interval_start, + mip.buy_raw_price_czk_kwh as price, + (mip.interval_start at time zone 'Europe/Prague') as local_ts, + extract(hour from mip.interval_start at time zone 'Europe/Prague')::int as local_hour + from ems.market_interval_price mip + where mip.market_source = 'OTE_CZ' + and (mip.interval_start at time zone 'Europe/Prague')::date = p_day + ), + agg as ( + select + count(*)::int as slot_count, + min(price) as min_price, + max(price) as max_price, + avg(price) as avg_price, + count(*) filter (where price < 0)::int as negative_slots, + count(*) filter (where abs(price) <= 0.25)::int as zeroish_slots + from slots + ), + min_slot as ( + select s.local_ts, s.local_hour, s.price + from slots s + order by s.price asc, s.local_ts asc + limit 1 + ), + max_slot as ( + select s.local_ts, s.local_hour, s.price + from slots s + order by s.price desc, s.local_ts asc + limit 1 + ), + noon as ( + select + min(price) as noon_min_price, + avg(price) as noon_avg_price + from slots + where local_hour between 10 and 14 + ), + morning as ( + select + max(price) as morning_max_price, + avg(price) as morning_avg_price + from slots + where local_hour between 6 and 9 + ), + evening as ( + select + max(price) as evening_max_price, + avg(price) as evening_avg_price + from slots + where local_hour between 17 and 21 + ) + select jsonb_build_object( + 'day', p_day, + 'slot_count', (select slot_count from agg), + 'is_complete', (select slot_count from agg) in (92, 96, 100), + 'min_price', round(coalesce((select min_price from agg), 0)::numeric, 6), + 'max_price', round(coalesce((select max_price from agg), 0)::numeric, 6), + 'avg_price', round(coalesce((select avg_price from agg), 0)::numeric, 6), + 'negative_slots', (select negative_slots from agg), + 'zeroish_slots', (select zeroish_slots from agg), + 'min_slot_local', (select local_ts from min_slot), + 'max_slot_local', (select local_ts from max_slot), + 'noon_min_price', round(coalesce((select noon_min_price from noon), 0)::numeric, 6), + 'noon_avg_price', round(coalesce((select noon_avg_price from noon), 0)::numeric, 6), + 'morning_max_price', round(coalesce((select morning_max_price from morning), 0)::numeric, 6), + 'morning_avg_price', round(coalesce((select morning_avg_price from morning), 0)::numeric, 6), + 'evening_max_price', round(coalesce((select evening_max_price from evening), 0)::numeric, 6) + ); +$fn$; + +comment on function ems.fn_ote_day_brief_prague(date) is + 'Stručná analýza OTE_CZ cen pro den v Europe/Prague (min/max/avg, podíl záporných/okolo nuly, poledne/rano/vecer).'; + diff --git a/db/routines/R__082_fn_ote_day_signals_prague.sql b/db/routines/R__082_fn_ote_day_signals_prague.sql new file mode 100644 index 0000000..1af3c90 --- /dev/null +++ b/db/routines/R__082_fn_ote_day_signals_prague.sql @@ -0,0 +1,169 @@ +-- OTE CZ: signály pro "briefing" dalšího dne (TZ Europe/Prague) +-- +-- Cíl: vrátit pár srozumitelných signálů (negativní/okolo nuly/špička), +-- které se dají posílat do Discordu bez logiky v Pythonu. + +create or replace function ems.fn_ote_day_signals_prague( + p_day date, + p_lookback_days int default 14 +) +returns jsonb +language sql +stable +as $fn$ + with params as ( + select + greatest(1, least(coalesce(p_lookback_days, 14), 60))::int as lookback_days, + 0.25::numeric as zeroish_abs_czk_kwh, + 4::numeric as spike_interesting_czk_kwh, + 6::numeric as spike_extreme_czk_kwh + ), + day_slots as ( + select + mip.interval_start, + (mip.interval_start at time zone 'Europe/Prague') as local_ts, + extract(hour from mip.interval_start at time zone 'Europe/Prague')::int as local_hour, + mip.buy_raw_price_czk_kwh as price + from ems.market_interval_price mip + where mip.market_source = 'OTE_CZ' + and (mip.interval_start at time zone 'Europe/Prague')::date = p_day + ), + day_agg as ( + select + count(*)::int as slot_count, + min(price) as min_price, + max(price) as max_price, + avg(price) as avg_price, + count(*) filter (where price < 0)::int as neg_slots, + count(*) filter (where abs(price) <= (select zeroish_abs_czk_kwh from params))::int as zeroish_slots, + min(price) filter (where local_hour between 10 and 14) as noon_min_price, + avg(price) filter (where local_hour between 10 and 14) as noon_avg_price, + max(price) filter (where local_hour between 6 and 9) as morning_max_price, + max(price) filter (where local_hour between 17 and 21) as evening_max_price, + max(price) filter (where local_hour between 17 and 21) + - avg(price) filter (where local_hour between 10 and 14) as evening_minus_noon + from day_slots + ), + hist_hours as ( + select + (mip.interval_start at time zone 'Europe/Prague')::date as d, + extract(hour from mip.interval_start at time zone 'Europe/Prague')::int as h, + avg(mip.buy_raw_price_czk_kwh) as avg_price + from ems.market_interval_price mip + cross join params p + where mip.market_source = 'OTE_CZ' + and (mip.interval_start at time zone 'Europe/Prague')::date < p_day + and (mip.interval_start at time zone 'Europe/Prague')::date >= p_day - (p.lookback_days || ' days')::interval + group by 1, 2 + ), + hist_windows as ( + select + avg(avg_price) filter (where h between 17 and 21) as avg_evening, + avg(avg_price) filter (where h between 6 and 9) as avg_morning, + avg(avg_price) filter (where h between 10 and 14) as avg_noon + from hist_hours + ), + signals as ( + select jsonb_agg(s order by (s ->> 'severity') desc, (s ->> 'code')) as arr + from ( + -- negativní + select jsonb_build_object( + 'code', 'NEG_EXTREME', + 'severity', 3, + 'title', 'extrémně záporné ceny', + 'detail', format('min %.3f Kč/kWh, záporné sloty %s', (select min_price from day_agg), (select neg_slots from day_agg)) + ) s + where (select min_price from day_agg) <= -0.50 + or (select neg_slots from day_agg) >= 8 + + union all + select jsonb_build_object( + 'code', 'NEG_PRESENT', + 'severity', 2, + 'title', 'záporné ceny', + 'detail', format('min %.3f Kč/kWh, záporné sloty %s', (select min_price from day_agg), (select neg_slots from day_agg)) + ) s + where (select min_price from day_agg) < 0 + and not ( + (select min_price from day_agg) <= -0.50 + or (select neg_slots from day_agg) >= 8 + ) + + -- okolo nuly přes den + union all + select jsonb_build_object( + 'code', 'NOON_ZEROISH', + 'severity', 2, + 'title', 'poledne okolo nuly', + 'detail', format('polední průměr %.3f Kč/kWh (10–14)', (select noon_avg_price from day_agg)) + ) s + where coalesce((select noon_avg_price from day_agg), 999) <= (select zeroish_abs_czk_kwh from params) + + union all + select jsonb_build_object( + 'code', 'MANY_ZEROISH', + 'severity', 1, + 'title', 'hodně slotů okolo nuly', + 'detail', format('okolo nuly slotů %s (|p| ≤ %.2f Kč/kWh)', (select zeroish_slots from day_agg), (select zeroish_abs_czk_kwh from params)) + ) s + where (select zeroish_slots from day_agg) >= 16 + + -- špička večer (hard) + union all + select jsonb_build_object( + 'code', 'EVENING_SPIKE_EXTREME', + 'severity', 3, + 'title', 'večer extrémně drahý', + 'detail', format('max večer %.3f Kč/kWh (17–21)', (select evening_max_price from day_agg)) + ) s + where coalesce((select evening_max_price from day_agg), 0) >= (select spike_extreme_czk_kwh from params) + + union all + select jsonb_build_object( + 'code', 'EVENING_SPIKE_INTERESTING', + 'severity', 2, + 'title', 'večer drahý', + 'detail', format('max večer %.3f Kč/kWh (17–21)', (select evening_max_price from day_agg)) + ) s + where coalesce((select evening_max_price from day_agg), 0) >= (select spike_interesting_czk_kwh from params) + and coalesce((select evening_max_price from day_agg), 0) < (select spike_extreme_czk_kwh from params) + + -- špička večer (relativní vůči posledním N dnům) + union all + select jsonb_build_object( + 'code', 'EVENING_SPIKE_REL', + 'severity', 2, + 'title', 'večer nadprůměrná špička', + 'detail', format( + 'max večer %.3f vs. průměr %.3f (lookback %s dní)', + (select evening_max_price from day_agg), + (select avg_evening from hist_windows), + (select lookback_days from params) + ) + ) s + where (select avg_evening from hist_windows) is not null + and coalesce((select evening_max_price from day_agg), 0) >= (select avg_evening from hist_windows) + 1.5 + ) t + ) + select jsonb_build_object( + 'day', p_day, + 'lookback_days', (select lookback_days from params), + 'metrics', jsonb_build_object( + 'slot_count', (select slot_count from day_agg), + 'min_price', round(coalesce((select min_price from day_agg), 0)::numeric, 6), + 'max_price', round(coalesce((select max_price from day_agg), 0)::numeric, 6), + 'avg_price', round(coalesce((select avg_price from day_agg), 0)::numeric, 6), + 'neg_slots', (select neg_slots from day_agg), + 'zeroish_slots', (select zeroish_slots from day_agg), + 'noon_avg_price', round(coalesce((select noon_avg_price from day_agg), 0)::numeric, 6), + 'evening_max_price', round(coalesce((select evening_max_price from day_agg), 0)::numeric, 6), + 'avg_evening_last_n', round(coalesce((select avg_evening from hist_windows), 0)::numeric, 6), + 'avg_noon_last_n', round(coalesce((select avg_noon from hist_windows), 0)::numeric, 6) + ), + 'signals', coalesce((select arr from signals), '[]'::jsonb) + ); +$fn$; + +comment on function ems.fn_ote_day_signals_prague(date, int) is + 'Signály pro briefing OTE_CZ cen pro p_day (negativní/okolo nuly/špička), včetně relativní špičky proti posledním N dnům.'; + diff --git a/docs/04-modules/market-prices.md b/docs/04-modules/market-prices.md index c6724cf..cbaf340 100644 --- a/docs/04-modules/market-prices.md +++ b/docs/04-modules/market-prices.md @@ -150,6 +150,7 @@ PRICE_IMPORT_RETRY_BACKOFF_SEC=300 - Alert pokud do 16:00 nejsou v DB ceny na zítřek - Discord (CRITICAL) pokud OTE změní formát `@@chart-data` tak, že DB parser (`ems.fn_ote_parse_15m_price_json`) nenajde vhodnou sérii (`dataLine[].tooltip`) nebo narazí na neočekávaný počet bodů; posílá `services.notification_service.notify_ote_import_format_changed`. +- Discord (INFO) po úspěšném importu kompletního dne (92/96/100 slotů) – krátký briefing pro další den (min/max + signály: záporné/okolo nuly/špička) z `ems.fn_ote_day_signals_prague` (read-model) přes `services.notification_service.notify_ote_import_ok_brief` (dedup). - Log každého importu (datum, počet intervalů, zdroj, trvání) - Endpoint `GET /health/prices?date=YYYY-MM-DD` → vrátí počet importovaných intervalů