From 9ff7c96c227cd96395e4bc623c6f4485225e3317 Mon Sep 17 00:00:00 2001 From: Dusan Vojacek Date: Sun, 12 Apr 2026 21:38:57 +0200 Subject: [PATCH] fix backfill --- backend/scripts/backfill_ote_prices.py | 220 +++++++++++++++++++++++++ deploy/run_backfill_ote_prices.sh | 17 ++ scripts/backfill_ote_prices.py | 195 ++-------------------- 3 files changed, 247 insertions(+), 185 deletions(-) create mode 100644 backend/scripts/backfill_ote_prices.py create mode 100755 deploy/run_backfill_ote_prices.sh diff --git a/backend/scripts/backfill_ote_prices.py b/backend/scripts/backfill_ote_prices.py new file mode 100644 index 0000000..b07e4ed --- /dev/null +++ b/backend/scripts/backfill_ote_prices.py @@ -0,0 +1,220 @@ +#!/usr/bin/env python3 +""" +Doplnění ems.market_interval_price z veřejného OTE JSON endpointu (stejný jako price_importer). + +Produkce (Docker – závislosti v image backendu), z adresáře kde leží docker-compose.yml: + + cd /opt/ems-deploy + docker compose exec -T backend python3 scripts/backfill_ote_prices.py --dry-run + +Nebo z kořene stacku: bash app/deploy/run_backfill_ote_prices.sh --dry-run + +Lokálně (venv s backend/requirements.txt): + + cd /path/to/ems-cursor + PYTHONPATH=backend python3 backend/scripts/backfill_ote_prices.py --dry-run + +Volby: + --days 730 posledních N kalendářních dní (Europe/Prague), výchozí 730 ≈ 2 roky + --from-date / --to-date pevný rozsah YYYY-MM-DD (má přednost před --days u konce rozsahu) + --force stáhnout znovu i dny, kde už je 96 slotů + --dry-run jen vypsat chybějící dny, bez HTTP + --delay SEC pauza mezi dny (výchozí 0.35) + --refresh-predictions po skončení zavolat fn_predict_negative_price_windows pro aktivní site +""" + +from __future__ import annotations + +import argparse +import asyncio +import logging +import os +import sys +from datetime import date, datetime, timedelta +from pathlib import Path +from zoneinfo import ZoneInfo + +_BACKEND_ROOT = Path(__file__).resolve().parent.parent +if str(_BACKEND_ROOT) not in sys.path: + sys.path.insert(0, str(_BACKEND_ROOT)) + +os.chdir(_BACKEND_ROOT) + +try: + import asyncpg +except ModuleNotFoundError as e: + print( + "Chybí modul 'asyncpg' (závislost backendu).\n" + "\n" + "Na serveru s Docker stackem EMS spusťte skript uvnitř kontejneru backendu, např.:\n" + " cd /opt/ems-deploy\n" + " docker compose exec -T backend python3 scripts/backfill_ote_prices.py --dry-run\n" + "\n" + "Lokálně nainstalujte závislosti: pip install -r backend/requirements.txt\n", + file=sys.stderr, + ) + raise SystemExit(1) from e + +from app.config import get_settings # noqa: E402 +from services.price_importer import ( # noqa: E402 + OTE_EXPECTED_SLOTS, + backfill_ote_prices, + count_ote_slots_prague_day, +) + +PRAGUE = ZoneInfo("Europe/Prague") + + +def _parse_ymd(s: str) -> date: + y, m, d = (int(p) for p in s.split("-", 2)) + return date(y, m, d) + + +async def _dry_run_missing( + conn: asyncpg.Connection, + start: date, + end: date, + today_prague: date, +) -> list[date]: + out: list[date] = [] + d = start + while d <= end: + if d > today_prague: + break + n = await count_ote_slots_prague_day(conn, d) + if n < OTE_EXPECTED_SLOTS: + out.append(d) + d += timedelta(days=1) + return out + + +async def _refresh_predictions_all(conn: asyncpg.Connection) -> None: + sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") + for row in sites: + sid = int(row["id"]) + try: + await conn.fetch("SELECT * FROM ems.fn_predict_negative_price_windows($1, 7)", sid) + logging.info("Predikce záporných cen obnovena pro site_id=%s", sid) + except Exception: + logging.exception("fn_predict_negative_price_windows selhalo pro site_id=%s", sid) + + +async def main_async(args: argparse.Namespace) -> int: + settings = get_settings() + pool = await asyncpg.create_pool( + host=settings.db_host, + port=settings.db_port, + user=settings.db_user, + password=settings.db_password, + database=settings.db_name, + min_size=1, + max_size=3, + ) + try: + today_prague = datetime.now(PRAGUE).date() + if args.to_date: + end = _parse_ymd(args.to_date) + else: + end = today_prague + if args.from_date: + start = _parse_ymd(args.from_date) + else: + start = end - timedelta(days=max(0, int(args.days) - 1)) + + if start > end: + logging.error("--from-date je po --to-date") + return 2 + + logging.info( + "Rozsah backfillu: %s … %s (kurz EUR/CZK z .env = %s)", + start.isoformat(), + end.isoformat(), + settings.eur_czk_rate, + ) + + async with pool.acquire() as conn: + if args.dry_run: + missing = await _dry_run_missing(conn, start, end, today_prague) + logging.info( + "Dry-run: %s chybějících nebo neúplných dní (< %s slotů)", + len(missing), + OTE_EXPECTED_SLOTS, + ) + for md in missing[:50]: + n = await count_ote_slots_prague_day(conn, md) + logging.info(" %s (%s slotů)", md.isoformat(), n) + if len(missing) > 50: + logging.info(" … a dalších %s dní", len(missing) - 50) + return 0 + + stats = await backfill_ote_prices( + conn, + start_date=start, + end_date=end, + only_missing=not args.force, + pause_between_days_s=float(args.delay), + ) + logging.info( + "Hotovo: zkontrolováno %s dní, importováno %s, přeskočeno (kompletní) %s, " + "přeskočeno (budoucnost) %s, selhalo %s", + stats.days_checked, + stats.days_imported, + stats.days_skipped_complete, + stats.days_skipped_future, + stats.days_failed, + ) + for day_str, err in stats.failures[:20]: + logging.warning(" %s: %s", day_str, err) + if len(stats.failures) > 20: + logging.warning(" … dalších %s chyb v seznamu", len(stats.failures) - 20) + + if args.refresh_predictions and stats.days_imported > 0: + await _refresh_predictions_all(conn) + + return 1 if stats.days_failed else 0 + finally: + await pool.close() + + +def main() -> None: + logging.basicConfig( + level=logging.INFO, + format="%(levelname)s %(message)s", + ) + parser = argparse.ArgumentParser(description="Backfill OTE cen do ems.market_interval_price") + parser.add_argument( + "--days", + type=int, + default=730, + help="Počet dní zpět od --to-date (výchozí 730)", + ) + parser.add_argument("--from-date", type=str, default=None, help="YYYY-MM-DD začátek rozsahu") + parser.add_argument( + "--to-date", + type=str, + default=None, + help="YYYY-MM-DD konec rozsahu (výchozí dnes Europe/Prague)", + ) + parser.add_argument( + "--force", + action="store_true", + help="Stáhnout znovu i dny s plnými %s sloty" % OTE_EXPECTED_SLOTS, + ) + parser.add_argument("--dry-run", action="store_true", help="Jen vypsat chybějící dny") + parser.add_argument( + "--delay", + type=float, + default=0.35, + help="Sekundy pauzy mezi dny (výchozí 0.35)", + ) + parser.add_argument( + "--refresh-predictions", + action="store_true", + help="Po importu obnovit fn_predict_negative_price_windows pro aktivní lokality", + ) + ns = parser.parse_args() + raise SystemExit(asyncio.run(main_async(ns))) + + +if __name__ == "__main__": + main() diff --git a/deploy/run_backfill_ote_prices.sh b/deploy/run_backfill_ote_prices.sh new file mode 100755 index 0000000..d38087c --- /dev/null +++ b/deploy/run_backfill_ote_prices.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash +# Spustí backfill OTE cen uvnitř kontejneru backend (asyncpg tam už je v image). +set -euo pipefail +HERE="$(cd "$(dirname "$0")" && pwd)" +if [[ -f "$HERE/../docker-compose.yml" ]]; then + # Git checkout: repo/deploy/run_*.sh → compose v kořeni repa + COMPOSE_DIR="$(cd "$HERE/.." && pwd)" +elif [[ -f "$HERE/../../docker-compose.yml" ]]; then + # Produkce: /opt/ems-deploy/app/deploy/run_*.sh → compose v /opt/ems-deploy + COMPOSE_DIR="$(cd "$HERE/../.." && pwd)" +else + echo "Nenašel jsem docker-compose.yml (nadřazený adresář deploy/ nebo app/)." >&2 + echo "Spusťte ručně z kořene stacku: docker compose exec -T backend python3 scripts/backfill_ote_prices.py …" >&2 + exit 1 +fi +cd "$COMPOSE_DIR" +exec docker compose exec -T backend python3 scripts/backfill_ote_prices.py "$@" diff --git a/scripts/backfill_ote_prices.py b/scripts/backfill_ote_prices.py index 566f688..42b44f3 100644 --- a/scripts/backfill_ote_prices.py +++ b/scripts/backfill_ote_prices.py @@ -1,201 +1,26 @@ #!/usr/bin/env python3 """ -Doplnění ems.market_interval_price z veřejného OTE JSON endpointu (stejný jako price_importer). +Skript byl přesunut do backend/scripts/ (je součástí Docker image backendu). -Spuštění z kořene repozitáře (načte .env z kořene): +Lokálně: + PYTHONPATH=backend python3 backend/scripts/backfill_ote_prices.py --dry-run - cd /path/to/ems-cursor - PYTHONPATH=backend python scripts/backfill_ote_prices.py - -Volby: - --days 730 posledních N kalendářních dní (Europe/Prague), výchozí 730 ≈ 2 roky - --from-date / --to-date pevný rozsah YYYY-MM-DD (má přednost před --days u konce rozsahu) - --force stáhnout znovu i dny, kde už je 96 slotů - --dry-run jen vypsat chybějící dny, bez HTTP - --delay SEC pauza mezi dny (výchozí 0.35) - --refresh-predictions po skončení zavolat fn_predict_negative_price_windows pro aktivní site +Produkce (Docker): + cd /opt/ems-deploy && docker compose exec -T backend python3 scripts/backfill_ote_prices.py --dry-run """ from __future__ import annotations -import argparse -import asyncio -import logging -import os import sys -from datetime import date, datetime, timedelta -from pathlib import Path -from zoneinfo import ZoneInfo - -# Kořen repa = rodič scripts/ -_REPO_ROOT = Path(__file__).resolve().parent.parent -_BACKEND = _REPO_ROOT / "backend" -if str(_BACKEND) not in sys.path: - sys.path.insert(0, str(_BACKEND)) - -os.chdir(_REPO_ROOT) - -import asyncpg # noqa: E402 - -from app.config import get_settings # noqa: E402 -from services.price_importer import ( # noqa: E402 - OTE_EXPECTED_SLOTS, - backfill_ote_prices, - count_ote_slots_prague_day, -) - -PRAGUE = ZoneInfo("Europe/Prague") - - -def _parse_ymd(s: str) -> date: - y, m, d = (int(p) for p in s.split("-", 2)) - return date(y, m, d) - - -async def _dry_run_missing( - conn: asyncpg.Connection, - start: date, - end: date, - today_prague: date, -) -> list[date]: - out: list[date] = [] - d = start - while d <= end: - if d > today_prague: - break - n = await count_ote_slots_prague_day(conn, d) - if n < OTE_EXPECTED_SLOTS: - out.append(d) - d += timedelta(days=1) - return out - - -async def _refresh_predictions_all(conn: asyncpg.Connection) -> None: - sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") - for row in sites: - sid = int(row["id"]) - try: - await conn.fetch("SELECT * FROM ems.fn_predict_negative_price_windows($1, 7)", sid) - logging.info("Predikce záporných cen obnovena pro site_id=%s", sid) - except Exception: - logging.exception("fn_predict_negative_price_windows selhalo pro site_id=%s", sid) - - -async def main_async(args: argparse.Namespace) -> int: - settings = get_settings() - pool = await asyncpg.create_pool( - host=settings.db_host, - port=settings.db_port, - user=settings.db_user, - password=settings.db_password, - database=settings.db_name, - min_size=1, - max_size=3, - ) - try: - today_prague = datetime.now(PRAGUE).date() - if args.to_date: - end = _parse_ymd(args.to_date) - else: - end = today_prague - if args.from_date: - start = _parse_ymd(args.from_date) - else: - start = end - timedelta(days=max(0, int(args.days) - 1)) - - if start > end: - logging.error("--from-date je po --to-date") - return 2 - - logging.info( - "Rozsah backfillu: %s … %s (kurz EUR/CZK z .env = %s)", - start.isoformat(), - end.isoformat(), - settings.eur_czk_rate, - ) - - async with pool.acquire() as conn: - if args.dry_run: - missing = await _dry_run_missing(conn, start, end, today_prague) - logging.info( - "Dry-run: %s chybějících nebo neúplných dní (< %s slotů)", - len(missing), - OTE_EXPECTED_SLOTS, - ) - for md in missing[:50]: - n = await count_ote_slots_prague_day(conn, md) - logging.info(" %s (%s slotů)", md.isoformat(), n) - if len(missing) > 50: - logging.info(" … a dalších %s dní", len(missing) - 50) - return 0 - - stats = await backfill_ote_prices( - conn, - start_date=start, - end_date=end, - only_missing=not args.force, - pause_between_days_s=float(args.delay), - ) - logging.info( - "Hotovo: zkontrolováno %s dní, importováno %s, přeskočeno (kompletní) %s, " - "přeskočeno (budoucnost) %s, selhalo %s", - stats.days_checked, - stats.days_imported, - stats.days_skipped_complete, - stats.days_skipped_future, - stats.days_failed, - ) - for day_str, err in stats.failures[:20]: - logging.warning(" %s: %s", day_str, err) - if len(stats.failures) > 20: - logging.warning(" … dalších %s chyb v seznamu", len(stats.failures) - 20) - - if args.refresh_predictions and stats.days_imported > 0: - await _refresh_predictions_all(conn) - - return 1 if stats.days_failed else 0 - finally: - await pool.close() def main() -> None: - logging.basicConfig( - level=logging.INFO, - format="%(levelname)s %(message)s", + print( + "Tento soubor už se nepoužívá — spusťte backend/scripts/backfill_ote_prices.py\n" + "(viz docstring v tomto souboru nebo deploy/run_backfill_ote_prices.sh).", + file=sys.stderr, ) - parser = argparse.ArgumentParser(description="Backfill OTE cen do ems.market_interval_price") - parser.add_argument( - "--days", - type=int, - default=730, - help="Počet dní zpět od --to-date (výchozí 730)", - ) - parser.add_argument("--from-date", type=str, default=None, help="YYYY-MM-DD začátek rozsahu") - parser.add_argument( - "--to-date", - type=str, - default=None, - help="YYYY-MM-DD konec rozsahu (výchozí dnes Europe/Prague)", - ) - parser.add_argument( - "--force", - action="store_true", - help="Stáhnout znovu i dny s plnými %s sloty" % OTE_EXPECTED_SLOTS, - ) - parser.add_argument("--dry-run", action="store_true", help="Jen vypsat chybějící dny") - parser.add_argument( - "--delay", - type=float, - default=0.35, - help="Sekundy pauzy mezi dny (výchozí 0.35)", - ) - parser.add_argument( - "--refresh-predictions", - action="store_true", - help="Po importu obnovit fn_predict_negative_price_windows pro aktivní lokality", - ) - ns = parser.parse_args() - raise SystemExit(asyncio.run(main_async(ns))) + raise SystemExit(1) if __name__ == "__main__":