diff --git a/backend/services/price_importer.py b/backend/services/price_importer.py index 2ce5e3d..9117dae 100644 --- a/backend/services/price_importer.py +++ b/backend/services/price_importer.py @@ -4,6 +4,7 @@ from __future__ import annotations import asyncio import json import logging +from dataclasses import dataclass, field from datetime import date, datetime, timedelta from zoneinfo import ZoneInfo @@ -13,6 +14,8 @@ from app.config import get_settings logger = logging.getLogger(__name__) +OTE_EXPECTED_SLOTS = 96 + OTE_URL = ( "https://www.ote-cr.cz/cs/kratkodobe-trhy/elektrina/denni-trh/" "@@chart-data?report_date={date}&time_resolution=PT15M" @@ -93,6 +96,135 @@ async def _fetch_ote_json(date_str: str) -> tuple[dict | None, str | None]: OTE_TZ = ZoneInfo("Europe/Prague") +async def _apply_ote_json_to_db(conn, payload: dict) -> int: + """Zapíše JSON z OTE přes ems.fn_ote_import_from_json; vrátí ROW_COUNT z funkce.""" + settings = get_settings() + eur_czk = float(settings.eur_czk_rate) + n = await conn.fetchval( + "SELECT ems.fn_ote_import_from_json($1::jsonb, $2)", + json.dumps(payload), + eur_czk, + ) + return int(n) + + +async def count_ote_slots_prague_day(conn, target_day: date) -> int: + """Počet řádků OTE_CZ pro kalendářní den v Europe/Prague (96 očekáváno).""" + return int( + await conn.fetchval( + """ + SELECT COUNT(*)::int + FROM ems.market_interval_price + WHERE market_source = 'OTE_CZ' + AND (interval_start AT TIME ZONE 'Europe/Prague')::date = $1::date + """, + target_day, + ) + or 0 + ) + + +async def import_ote_prices_for_day( + conn, + target_day: date, +) -> tuple[int, str, float, str | None]: + """ + Stáhne OTE pro jeden konkrétní report_date a uloží přes fn_ote_import_from_json. + Stejný význam návratové hodnoty jako import_ote_prices(). + """ + day_str = target_day.isoformat() + payload, fetch_error = await _fetch_ote_json(day_str) + if payload is None: + return -1, day_str, 0.0, fetch_error or "fetch_failed" + try: + n = await _apply_ote_json_to_db(conn, payload) + first_price = await conn.fetchval( + """ + SELECT buy_raw_price_czk_kwh + FROM ems.market_interval_price + WHERE market_source = 'OTE_CZ' + AND (interval_start AT TIME ZONE 'Europe/Prague')::date = $1::date + ORDER BY interval_start + LIMIT 1 + """, + target_day, + ) + n_imported = await count_ote_slots_prague_day(conn, target_day) + if n_imported < OTE_EXPECTED_SLOTS: + logger.warning( + "OTE: jen %s/%s slotů pro %s (Europe/Prague)", + n_imported, + OTE_EXPECTED_SLOTS, + day_str, + ) + logger.info( + "OTE import OK: %s slotů (upsert) pro %s, první cena %.4f Kč/kWh", + n, + day_str, + float(first_price or 0), + ) + return n, day_str, float(first_price or 0.0), None + except Exception as e: + detail = str(e).strip() or e.__class__.__name__ + logger.error("OTE import DB error pro %s: %s", day_str, detail, exc_info=True) + short = detail[:200] if len(detail) > 200 else detail + return -1, day_str, 0.0, f"db_import:{e.__class__.__name__}: {short}" + + +@dataclass +class OteBackfillStats: + start_date: date + end_date: date + days_checked: int = 0 + days_imported: int = 0 + days_skipped_complete: int = 0 + days_skipped_future: int = 0 + days_failed: int = 0 + failures: list[tuple[str, str]] = field(default_factory=list) + + +async def backfill_ote_prices( + conn, + *, + start_date: date, + end_date: date, + only_missing: bool = True, + pause_between_days_s: float = 0.35, + max_failures_logged: int = 80, +) -> OteBackfillStats: + """ + Projde rozsah [start_date, end_date] (kalendář Prague) a doplní chybějící dny z OTE. + + only_missing: přeskočí dny, kde už je count >= OTE_EXPECTED_SLOTS. + pause_between_days_s: krátká pauza mezi HTTP požadavky (ohleduplnost k OTE). + """ + stats = OteBackfillStats(start_date=start_date, end_date=end_date) + today_prague = datetime.now(OTE_TZ).date() + d = start_date + while d <= end_date: + stats.days_checked += 1 + if d > today_prague: + stats.days_skipped_future += 1 + d += timedelta(days=1) + continue + slots = await count_ote_slots_prague_day(conn, d) + if only_missing and slots >= OTE_EXPECTED_SLOTS: + stats.days_skipped_complete += 1 + d += timedelta(days=1) + continue + n, day_str, _, err = await import_ote_prices_for_day(conn, d) + if n < 0: + stats.days_failed += 1 + if len(stats.failures) < max_failures_logged: + stats.failures.append((day_str, err or "unknown")) + else: + stats.days_imported += 1 + if pause_between_days_s > 0: + await asyncio.sleep(pause_between_days_s) + d += timedelta(days=1) + return stats + + async def import_ote_prices( db, site_id: int | None = None, @@ -105,8 +237,6 @@ async def import_ote_prices( Returns: (počet_slotů, datum_str, první_cena_kč_kwh, error_code) (-1, datum_str, 0.0, error_code) při chybě """ - settings = get_settings() - if site_id is not None: row = await db.fetchrow( "SELECT timezone FROM ems.site WHERE id = $1", site_id @@ -149,14 +279,8 @@ async def import_ote_prices( date_str = target_day.isoformat() - # Vše ostatní řeší PostgreSQL funkce - eur_czk = float(settings.eur_czk_rate) try: - n = await db.fetchval( - "SELECT ems.fn_ote_import_from_json($1::jsonb, $2)", - json.dumps(payload), - eur_czk, - ) + n = await _apply_ote_json_to_db(db, payload) first_price = await db.fetchval( """ SELECT buy_raw_price_czk_kwh @@ -177,7 +301,7 @@ async def import_ote_prices( """, target_day, ) - incomplete = (n_imported or 0) < 96 + incomplete = (n_imported or 0) < OTE_EXPECTED_SLOTS if incomplete: now_p = datetime.now(ZoneInfo("Europe/Prague")) tomorrow_p = (now_p + timedelta(days=1)).date() @@ -186,10 +310,17 @@ async def import_ote_prices( target_day == tomorrow_p and (now_p.hour, now_p.minute) < (14, 30) ): - logger.warning("OTE: jen %s/96 slotů pro %s", n_imported, date_str) + logger.warning( + "OTE: jen %s/%s slotů pro %s", + n_imported, + OTE_EXPECTED_SLOTS, + date_str, + ) logger.info( "OTE import OK: %s slotů pro %s, první cena %.4f Kč/kWh", - n, date_str, float(first_price or 0), + n, + date_str, + float(first_price or 0), ) return int(n), date_str, float(first_price or 0.0), None except Exception as e: diff --git a/scripts/backfill_ote_prices.py b/scripts/backfill_ote_prices.py new file mode 100644 index 0000000..566f688 --- /dev/null +++ b/scripts/backfill_ote_prices.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python3 +""" +Doplnění ems.market_interval_price z veřejného OTE JSON endpointu (stejný jako price_importer). + +Spuštění z kořene repozitáře (načte .env z kořene): + + cd /path/to/ems-cursor + PYTHONPATH=backend python scripts/backfill_ote_prices.py + +Volby: + --days 730 posledních N kalendářních dní (Europe/Prague), výchozí 730 ≈ 2 roky + --from-date / --to-date pevný rozsah YYYY-MM-DD (má přednost před --days u konce rozsahu) + --force stáhnout znovu i dny, kde už je 96 slotů + --dry-run jen vypsat chybějící dny, bez HTTP + --delay SEC pauza mezi dny (výchozí 0.35) + --refresh-predictions po skončení zavolat fn_predict_negative_price_windows pro aktivní site +""" + +from __future__ import annotations + +import argparse +import asyncio +import logging +import os +import sys +from datetime import date, datetime, timedelta +from pathlib import Path +from zoneinfo import ZoneInfo + +# Kořen repa = rodič scripts/ +_REPO_ROOT = Path(__file__).resolve().parent.parent +_BACKEND = _REPO_ROOT / "backend" +if str(_BACKEND) not in sys.path: + sys.path.insert(0, str(_BACKEND)) + +os.chdir(_REPO_ROOT) + +import asyncpg # noqa: E402 + +from app.config import get_settings # noqa: E402 +from services.price_importer import ( # noqa: E402 + OTE_EXPECTED_SLOTS, + backfill_ote_prices, + count_ote_slots_prague_day, +) + +PRAGUE = ZoneInfo("Europe/Prague") + + +def _parse_ymd(s: str) -> date: + y, m, d = (int(p) for p in s.split("-", 2)) + return date(y, m, d) + + +async def _dry_run_missing( + conn: asyncpg.Connection, + start: date, + end: date, + today_prague: date, +) -> list[date]: + out: list[date] = [] + d = start + while d <= end: + if d > today_prague: + break + n = await count_ote_slots_prague_day(conn, d) + if n < OTE_EXPECTED_SLOTS: + out.append(d) + d += timedelta(days=1) + return out + + +async def _refresh_predictions_all(conn: asyncpg.Connection) -> None: + sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") + for row in sites: + sid = int(row["id"]) + try: + await conn.fetch("SELECT * FROM ems.fn_predict_negative_price_windows($1, 7)", sid) + logging.info("Predikce záporných cen obnovena pro site_id=%s", sid) + except Exception: + logging.exception("fn_predict_negative_price_windows selhalo pro site_id=%s", sid) + + +async def main_async(args: argparse.Namespace) -> int: + settings = get_settings() + pool = await asyncpg.create_pool( + host=settings.db_host, + port=settings.db_port, + user=settings.db_user, + password=settings.db_password, + database=settings.db_name, + min_size=1, + max_size=3, + ) + try: + today_prague = datetime.now(PRAGUE).date() + if args.to_date: + end = _parse_ymd(args.to_date) + else: + end = today_prague + if args.from_date: + start = _parse_ymd(args.from_date) + else: + start = end - timedelta(days=max(0, int(args.days) - 1)) + + if start > end: + logging.error("--from-date je po --to-date") + return 2 + + logging.info( + "Rozsah backfillu: %s … %s (kurz EUR/CZK z .env = %s)", + start.isoformat(), + end.isoformat(), + settings.eur_czk_rate, + ) + + async with pool.acquire() as conn: + if args.dry_run: + missing = await _dry_run_missing(conn, start, end, today_prague) + logging.info( + "Dry-run: %s chybějících nebo neúplných dní (< %s slotů)", + len(missing), + OTE_EXPECTED_SLOTS, + ) + for md in missing[:50]: + n = await count_ote_slots_prague_day(conn, md) + logging.info(" %s (%s slotů)", md.isoformat(), n) + if len(missing) > 50: + logging.info(" … a dalších %s dní", len(missing) - 50) + return 0 + + stats = await backfill_ote_prices( + conn, + start_date=start, + end_date=end, + only_missing=not args.force, + pause_between_days_s=float(args.delay), + ) + logging.info( + "Hotovo: zkontrolováno %s dní, importováno %s, přeskočeno (kompletní) %s, " + "přeskočeno (budoucnost) %s, selhalo %s", + stats.days_checked, + stats.days_imported, + stats.days_skipped_complete, + stats.days_skipped_future, + stats.days_failed, + ) + for day_str, err in stats.failures[:20]: + logging.warning(" %s: %s", day_str, err) + if len(stats.failures) > 20: + logging.warning(" … dalších %s chyb v seznamu", len(stats.failures) - 20) + + if args.refresh_predictions and stats.days_imported > 0: + await _refresh_predictions_all(conn) + + return 1 if stats.days_failed else 0 + finally: + await pool.close() + + +def main() -> None: + logging.basicConfig( + level=logging.INFO, + format="%(levelname)s %(message)s", + ) + parser = argparse.ArgumentParser(description="Backfill OTE cen do ems.market_interval_price") + parser.add_argument( + "--days", + type=int, + default=730, + help="Počet dní zpět od --to-date (výchozí 730)", + ) + parser.add_argument("--from-date", type=str, default=None, help="YYYY-MM-DD začátek rozsahu") + parser.add_argument( + "--to-date", + type=str, + default=None, + help="YYYY-MM-DD konec rozsahu (výchozí dnes Europe/Prague)", + ) + parser.add_argument( + "--force", + action="store_true", + help="Stáhnout znovu i dny s plnými %s sloty" % OTE_EXPECTED_SLOTS, + ) + parser.add_argument("--dry-run", action="store_true", help="Jen vypsat chybějící dny") + parser.add_argument( + "--delay", + type=float, + default=0.35, + help="Sekundy pauzy mezi dny (výchozí 0.35)", + ) + parser.add_argument( + "--refresh-predictions", + action="store_true", + help="Po importu obnovit fn_predict_negative_price_windows pro aktivní lokality", + ) + ns = parser.parse_args() + raise SystemExit(asyncio.run(main_async(ns))) + + +if __name__ == "__main__": + main()