OTE backkfill
All checks were successful
deploy / deploy (push) Successful in 24s
test / smoke-test (push) Successful in 6s

This commit is contained in:
Dusan Vojacek
2026-04-12 21:32:27 +02:00
parent 3c9916f2c0
commit 0e5227eb5b
2 changed files with 345 additions and 12 deletions

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
import asyncio import asyncio
import json import json
import logging import logging
from dataclasses import dataclass, field
from datetime import date, datetime, timedelta from datetime import date, datetime, timedelta
from zoneinfo import ZoneInfo from zoneinfo import ZoneInfo
@@ -13,6 +14,8 @@ from app.config import get_settings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
OTE_EXPECTED_SLOTS = 96
OTE_URL = ( OTE_URL = (
"https://www.ote-cr.cz/cs/kratkodobe-trhy/elektrina/denni-trh/" "https://www.ote-cr.cz/cs/kratkodobe-trhy/elektrina/denni-trh/"
"@@chart-data?report_date={date}&time_resolution=PT15M" "@@chart-data?report_date={date}&time_resolution=PT15M"
@@ -93,6 +96,135 @@ async def _fetch_ote_json(date_str: str) -> tuple[dict | None, str | None]:
OTE_TZ = ZoneInfo("Europe/Prague") OTE_TZ = ZoneInfo("Europe/Prague")
async def _apply_ote_json_to_db(conn, payload: dict) -> int:
"""Zapíše JSON z OTE přes ems.fn_ote_import_from_json; vrátí ROW_COUNT z funkce."""
settings = get_settings()
eur_czk = float(settings.eur_czk_rate)
n = await conn.fetchval(
"SELECT ems.fn_ote_import_from_json($1::jsonb, $2)",
json.dumps(payload),
eur_czk,
)
return int(n)
async def count_ote_slots_prague_day(conn, target_day: date) -> int:
"""Počet řádků OTE_CZ pro kalendářní den v Europe/Prague (96 očekáváno)."""
return int(
await conn.fetchval(
"""
SELECT COUNT(*)::int
FROM ems.market_interval_price
WHERE market_source = 'OTE_CZ'
AND (interval_start AT TIME ZONE 'Europe/Prague')::date = $1::date
""",
target_day,
)
or 0
)
async def import_ote_prices_for_day(
conn,
target_day: date,
) -> tuple[int, str, float, str | None]:
"""
Stáhne OTE pro jeden konkrétní report_date a uloží přes fn_ote_import_from_json.
Stejný význam návratové hodnoty jako import_ote_prices().
"""
day_str = target_day.isoformat()
payload, fetch_error = await _fetch_ote_json(day_str)
if payload is None:
return -1, day_str, 0.0, fetch_error or "fetch_failed"
try:
n = await _apply_ote_json_to_db(conn, payload)
first_price = await conn.fetchval(
"""
SELECT buy_raw_price_czk_kwh
FROM ems.market_interval_price
WHERE market_source = 'OTE_CZ'
AND (interval_start AT TIME ZONE 'Europe/Prague')::date = $1::date
ORDER BY interval_start
LIMIT 1
""",
target_day,
)
n_imported = await count_ote_slots_prague_day(conn, target_day)
if n_imported < OTE_EXPECTED_SLOTS:
logger.warning(
"OTE: jen %s/%s slotů pro %s (Europe/Prague)",
n_imported,
OTE_EXPECTED_SLOTS,
day_str,
)
logger.info(
"OTE import OK: %s slotů (upsert) pro %s, první cena %.4f Kč/kWh",
n,
day_str,
float(first_price or 0),
)
return n, day_str, float(first_price or 0.0), None
except Exception as e:
detail = str(e).strip() or e.__class__.__name__
logger.error("OTE import DB error pro %s: %s", day_str, detail, exc_info=True)
short = detail[:200] if len(detail) > 200 else detail
return -1, day_str, 0.0, f"db_import:{e.__class__.__name__}: {short}"
@dataclass
class OteBackfillStats:
start_date: date
end_date: date
days_checked: int = 0
days_imported: int = 0
days_skipped_complete: int = 0
days_skipped_future: int = 0
days_failed: int = 0
failures: list[tuple[str, str]] = field(default_factory=list)
async def backfill_ote_prices(
conn,
*,
start_date: date,
end_date: date,
only_missing: bool = True,
pause_between_days_s: float = 0.35,
max_failures_logged: int = 80,
) -> OteBackfillStats:
"""
Projde rozsah [start_date, end_date] (kalendář Prague) a doplní chybějící dny z OTE.
only_missing: přeskočí dny, kde už je count >= OTE_EXPECTED_SLOTS.
pause_between_days_s: krátká pauza mezi HTTP požadavky (ohleduplnost k OTE).
"""
stats = OteBackfillStats(start_date=start_date, end_date=end_date)
today_prague = datetime.now(OTE_TZ).date()
d = start_date
while d <= end_date:
stats.days_checked += 1
if d > today_prague:
stats.days_skipped_future += 1
d += timedelta(days=1)
continue
slots = await count_ote_slots_prague_day(conn, d)
if only_missing and slots >= OTE_EXPECTED_SLOTS:
stats.days_skipped_complete += 1
d += timedelta(days=1)
continue
n, day_str, _, err = await import_ote_prices_for_day(conn, d)
if n < 0:
stats.days_failed += 1
if len(stats.failures) < max_failures_logged:
stats.failures.append((day_str, err or "unknown"))
else:
stats.days_imported += 1
if pause_between_days_s > 0:
await asyncio.sleep(pause_between_days_s)
d += timedelta(days=1)
return stats
async def import_ote_prices( async def import_ote_prices(
db, db,
site_id: int | None = None, site_id: int | None = None,
@@ -105,8 +237,6 @@ async def import_ote_prices(
Returns: (počet_slotů, datum_str, první_cena_kč_kwh, error_code) Returns: (počet_slotů, datum_str, první_cena_kč_kwh, error_code)
(-1, datum_str, 0.0, error_code) při chybě (-1, datum_str, 0.0, error_code) při chybě
""" """
settings = get_settings()
if site_id is not None: if site_id is not None:
row = await db.fetchrow( row = await db.fetchrow(
"SELECT timezone FROM ems.site WHERE id = $1", site_id "SELECT timezone FROM ems.site WHERE id = $1", site_id
@@ -149,14 +279,8 @@ async def import_ote_prices(
date_str = target_day.isoformat() date_str = target_day.isoformat()
# Vše ostatní řeší PostgreSQL funkce
eur_czk = float(settings.eur_czk_rate)
try: try:
n = await db.fetchval( n = await _apply_ote_json_to_db(db, payload)
"SELECT ems.fn_ote_import_from_json($1::jsonb, $2)",
json.dumps(payload),
eur_czk,
)
first_price = await db.fetchval( first_price = await db.fetchval(
""" """
SELECT buy_raw_price_czk_kwh SELECT buy_raw_price_czk_kwh
@@ -177,7 +301,7 @@ async def import_ote_prices(
""", """,
target_day, target_day,
) )
incomplete = (n_imported or 0) < 96 incomplete = (n_imported or 0) < OTE_EXPECTED_SLOTS
if incomplete: if incomplete:
now_p = datetime.now(ZoneInfo("Europe/Prague")) now_p = datetime.now(ZoneInfo("Europe/Prague"))
tomorrow_p = (now_p + timedelta(days=1)).date() tomorrow_p = (now_p + timedelta(days=1)).date()
@@ -186,10 +310,17 @@ async def import_ote_prices(
target_day == tomorrow_p target_day == tomorrow_p
and (now_p.hour, now_p.minute) < (14, 30) and (now_p.hour, now_p.minute) < (14, 30)
): ):
logger.warning("OTE: jen %s/96 slotů pro %s", n_imported, date_str) logger.warning(
"OTE: jen %s/%s slotů pro %s",
n_imported,
OTE_EXPECTED_SLOTS,
date_str,
)
logger.info( logger.info(
"OTE import OK: %s slotů pro %s, první cena %.4f Kč/kWh", "OTE import OK: %s slotů pro %s, první cena %.4f Kč/kWh",
n, date_str, float(first_price or 0), n,
date_str,
float(first_price or 0),
) )
return int(n), date_str, float(first_price or 0.0), None return int(n), date_str, float(first_price or 0.0), None
except Exception as e: except Exception as e:

View File

@@ -0,0 +1,202 @@
#!/usr/bin/env python3
"""
Doplnění ems.market_interval_price z veřejného OTE JSON endpointu (stejný jako price_importer).
Spuštění z kořene repozitáře (načte .env z kořene):
cd /path/to/ems-cursor
PYTHONPATH=backend python scripts/backfill_ote_prices.py
Volby:
--days 730 posledních N kalendářních dní (Europe/Prague), výchozí 730 ≈ 2 roky
--from-date / --to-date pevný rozsah YYYY-MM-DD (má přednost před --days u konce rozsahu)
--force stáhnout znovu i dny, kde už je 96 slotů
--dry-run jen vypsat chybějící dny, bez HTTP
--delay SEC pauza mezi dny (výchozí 0.35)
--refresh-predictions po skončení zavolat fn_predict_negative_price_windows pro aktivní site
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import os
import sys
from datetime import date, datetime, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo
# Kořen repa = rodič scripts/
_REPO_ROOT = Path(__file__).resolve().parent.parent
_BACKEND = _REPO_ROOT / "backend"
if str(_BACKEND) not in sys.path:
sys.path.insert(0, str(_BACKEND))
os.chdir(_REPO_ROOT)
import asyncpg # noqa: E402
from app.config import get_settings # noqa: E402
from services.price_importer import ( # noqa: E402
OTE_EXPECTED_SLOTS,
backfill_ote_prices,
count_ote_slots_prague_day,
)
PRAGUE = ZoneInfo("Europe/Prague")
def _parse_ymd(s: str) -> date:
y, m, d = (int(p) for p in s.split("-", 2))
return date(y, m, d)
async def _dry_run_missing(
conn: asyncpg.Connection,
start: date,
end: date,
today_prague: date,
) -> list[date]:
out: list[date] = []
d = start
while d <= end:
if d > today_prague:
break
n = await count_ote_slots_prague_day(conn, d)
if n < OTE_EXPECTED_SLOTS:
out.append(d)
d += timedelta(days=1)
return out
async def _refresh_predictions_all(conn: asyncpg.Connection) -> None:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
for row in sites:
sid = int(row["id"])
try:
await conn.fetch("SELECT * FROM ems.fn_predict_negative_price_windows($1, 7)", sid)
logging.info("Predikce záporných cen obnovena pro site_id=%s", sid)
except Exception:
logging.exception("fn_predict_negative_price_windows selhalo pro site_id=%s", sid)
async def main_async(args: argparse.Namespace) -> int:
settings = get_settings()
pool = await asyncpg.create_pool(
host=settings.db_host,
port=settings.db_port,
user=settings.db_user,
password=settings.db_password,
database=settings.db_name,
min_size=1,
max_size=3,
)
try:
today_prague = datetime.now(PRAGUE).date()
if args.to_date:
end = _parse_ymd(args.to_date)
else:
end = today_prague
if args.from_date:
start = _parse_ymd(args.from_date)
else:
start = end - timedelta(days=max(0, int(args.days) - 1))
if start > end:
logging.error("--from-date je po --to-date")
return 2
logging.info(
"Rozsah backfillu: %s%s (kurz EUR/CZK z .env = %s)",
start.isoformat(),
end.isoformat(),
settings.eur_czk_rate,
)
async with pool.acquire() as conn:
if args.dry_run:
missing = await _dry_run_missing(conn, start, end, today_prague)
logging.info(
"Dry-run: %s chybějících nebo neúplných dní (< %s slotů)",
len(missing),
OTE_EXPECTED_SLOTS,
)
for md in missing[:50]:
n = await count_ote_slots_prague_day(conn, md)
logging.info(" %s (%s slotů)", md.isoformat(), n)
if len(missing) > 50:
logging.info(" … a dalších %s dní", len(missing) - 50)
return 0
stats = await backfill_ote_prices(
conn,
start_date=start,
end_date=end,
only_missing=not args.force,
pause_between_days_s=float(args.delay),
)
logging.info(
"Hotovo: zkontrolováno %s dní, importováno %s, přeskočeno (kompletní) %s, "
"přeskočeno (budoucnost) %s, selhalo %s",
stats.days_checked,
stats.days_imported,
stats.days_skipped_complete,
stats.days_skipped_future,
stats.days_failed,
)
for day_str, err in stats.failures[:20]:
logging.warning(" %s: %s", day_str, err)
if len(stats.failures) > 20:
logging.warning(" … dalších %s chyb v seznamu", len(stats.failures) - 20)
if args.refresh_predictions and stats.days_imported > 0:
await _refresh_predictions_all(conn)
return 1 if stats.days_failed else 0
finally:
await pool.close()
def main() -> None:
logging.basicConfig(
level=logging.INFO,
format="%(levelname)s %(message)s",
)
parser = argparse.ArgumentParser(description="Backfill OTE cen do ems.market_interval_price")
parser.add_argument(
"--days",
type=int,
default=730,
help="Počet dní zpět od --to-date (výchozí 730)",
)
parser.add_argument("--from-date", type=str, default=None, help="YYYY-MM-DD začátek rozsahu")
parser.add_argument(
"--to-date",
type=str,
default=None,
help="YYYY-MM-DD konec rozsahu (výchozí dnes Europe/Prague)",
)
parser.add_argument(
"--force",
action="store_true",
help="Stáhnout znovu i dny s plnými %s sloty" % OTE_EXPECTED_SLOTS,
)
parser.add_argument("--dry-run", action="store_true", help="Jen vypsat chybějící dny")
parser.add_argument(
"--delay",
type=float,
default=0.35,
help="Sekundy pauzy mezi dny (výchozí 0.35)",
)
parser.add_argument(
"--refresh-predictions",
action="store_true",
help="Po importu obnovit fn_predict_negative_price_windows pro aktivní lokality",
)
ns = parser.parse_args()
raise SystemExit(asyncio.run(main_async(ns)))
if __name__ == "__main__":
main()