Files
ems/backend/services/price_importer.py
Dusan Vojacek 0e5227eb5b
All checks were successful
deploy / deploy (push) Successful in 24s
test / smoke-test (push) Successful in 6s
OTE backfill
2026-04-12 21:32:27 +02:00

331 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""OTE CZ price import: Python only performs the HTTP fetch; the import logic lives in PostgreSQL."""
from __future__ import annotations
import asyncio
import json
import logging
from dataclasses import dataclass, field
from datetime import date, datetime, timedelta
from zoneinfo import ZoneInfo
import httpx
from app.config import get_settings
# Module-level logger shared by every function in this file.
logger = logging.getLogger(__name__)

# Rows expected per day at the 15-minute resolution requested in OTE_URL (24 h x 4).
OTE_EXPECTED_SLOTS = 96

# OTE daily-market chart-data endpoint; ``{date}`` is replaced with the report date.
OTE_URL = (
    "https://www.ote-cr.cz/cs/kratkodobe-trhy/elektrina/denni-trh/"
    + "@@chart-data?report_date={date}&time_resolution=PT15M"
)
def _is_retryable_status(status_code: int) -> bool:
    """Tell whether an HTTP status code is treated as a transient failure.

    Args:
        status_code: HTTP response status code.

    Returns:
        True for 408, 425, 429, 500, 502, 503 and 504; False for anything else.
    """
    transient_codes = frozenset((408, 425, 429, 500, 502, 503, 504))
    return status_code in transient_codes
async def _fetch_ote_json(date_str: str) -> tuple[dict | None, str | None]:
    """Download the OTE chart-data JSON for one report date, retrying transient failures.

    At most four attempts are made, sleeping 1 s, 2 s and 4 s between them.
    A retry happens when the response status is accepted by
    ``_is_retryable_status`` or when the request fails with a connection error
    or any httpx timeout.  Every other failure aborts immediately.

    Args:
        date_str: Report date substituted into ``OTE_URL``.

    Returns:
        ``(payload, None)`` on success, otherwise ``(None, error_code)`` where
        ``error_code`` is ``http_status:<code>``, ``timeout_or_connect:<ExcName>``,
        ``invalid_json:<ExcName>`` or ``unexpected:<ExcName>``.
    """
    url = OTE_URL.format(date=date_str)
    timeout = httpx.Timeout(connect=10.0, read=45.0, write=10.0, pool=10.0)
    headers = {
        "User-Agent": "Mozilla/5.0 (compatible; EMS/1.0; +https://www.ote-cr.cz)",
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "cs-CZ,cs;q=0.9,en;q=0.8",
        "Connection": "keep-alive",
    }
    max_attempts = 4
    backoff_s = 1.0
    last_err: str | None = None

    async with httpx.AsyncClient(
        timeout=timeout,
        headers=headers,
        follow_redirects=True,
    ) as client:
        for attempt in range(1, max_attempts + 1):
            try:
                logger.info("OTE fetch %s attempt %s/%s", date_str, attempt, max_attempts)
                resp = await client.get(url)
                if _is_retryable_status(resp.status_code) and attempt < max_attempts:
                    last_err = f"http_status:{resp.status_code}"
                    logger.warning(
                        "OTE temporary HTTP %s for %s (attempt %s/%s), retrying",
                        resp.status_code,
                        date_str,
                        attempt,
                        max_attempts,
                    )
                    await asyncio.sleep(backoff_s)
                    backoff_s *= 2.0
                    continue
                # On the last attempt a retryable status falls through to here and
                # is reported through the HTTPStatusError handler below.
                resp.raise_for_status()
                return resp.json(), None
            except (httpx.ConnectError, httpx.TimeoutException) as e:
                # TimeoutException is the common base of ConnectTimeout, ReadTimeout,
                # WriteTimeout and PoolTimeout.  ConnectTimeout is NOT a ConnectError,
                # so it has to be covered here to be retried like the other timeouts.
                last_err = f"timeout_or_connect:{e.__class__.__name__}"
                if attempt < max_attempts:
                    logger.warning(
                        "OTE request failed for %s (%s), retrying %s/%s",
                        date_str,
                        e.__class__.__name__,
                        attempt,
                        max_attempts,
                    )
                    await asyncio.sleep(backoff_s)
                    backoff_s *= 2.0
                    continue
                logger.error("OTE fetch failed for %s after retries: %s", date_str, e)
            except httpx.HTTPStatusError as e:
                code = e.response.status_code if e.response is not None else "unknown"
                last_err = f"http_status:{code}"
                logger.error("OTE HTTP error for %s: %s", date_str, code)
                break
            except json.JSONDecodeError as e:
                last_err = f"invalid_json:{e.__class__.__name__}"
                logger.error("OTE invalid JSON for %s: %s", date_str, e)
                break
            except Exception as e:
                # Boundary handler: callers expect an error code, never an exception.
                last_err = f"unexpected:{e.__class__.__name__}"
                logger.error("OTE fetch unexpected error for %s: %s", date_str, e)
                break
    return None, last_err
# Time zone used for "today" in the backfill and as the default when no site is given.
OTE_TZ = ZoneInfo("Europe/Prague")
async def _apply_ote_json_to_db(conn, payload: dict) -> int:
    """Store an OTE JSON payload by calling ``ems.fn_ote_import_from_json``.

    The payload is serialised to a JSON string and handed to the PostgreSQL
    function together with the configured EUR/CZK rate.

    Args:
        conn: Database connection exposing an awaitable ``fetchval``.
        payload: Decoded OTE JSON document.

    Returns:
        The value returned by the PostgreSQL function, converted to ``int``.
    """
    rate = float(get_settings().eur_czk_rate)
    serialized = json.dumps(payload)
    result = await conn.fetchval(
        "SELECT ems.fn_ote_import_from_json($1::jsonb, $2)",
        serialized,
        rate,
    )
    return int(result)
async def count_ote_slots_prague_day(conn, target_day: date) -> int:
    """Count stored ``OTE_CZ`` rows whose interval starts on *target_day* in Europe/Prague.

    Args:
        conn: Database connection exposing an awaitable ``fetchval``.
        target_day: Calendar day to count rows for.

    Returns:
        The row count, or 0 when the query yields no value.
    """
    query = (
        "\n"
        "SELECT COUNT(*)::int\n"
        "FROM ems.market_interval_price\n"
        "WHERE market_source = 'OTE_CZ'\n"
        "AND (interval_start AT TIME ZONE 'Europe/Prague')::date = $1::date\n"
    )
    found = await conn.fetchval(query, target_day)
    if not found:
        return 0
    return int(found)
async def import_ote_prices_for_day(
    conn,
    target_day: date,
) -> tuple[int, str, float, str | None]:
    """Fetch OTE prices for exactly one report date and store them in the database.

    Args:
        conn: Database connection exposing awaitable ``fetchval``.
        target_day: Report date to download and import.

    Returns:
        ``(imported, day_str, first_price, None)`` on success, where ``imported``
        is the value returned by ``_apply_ote_json_to_db`` and ``first_price`` is
        the earliest stored price of that day (0.0 when none is found).
        ``(-1, day_str, 0.0, error_code)`` when the fetch or the database step fails.
    """
    day_str = target_day.isoformat()

    payload, fetch_error = await _fetch_ote_json(day_str)
    if payload is None:
        return -1, day_str, 0.0, fetch_error or "fetch_failed"

    first_price_sql = (
        "\n"
        "SELECT buy_raw_price_czk_kwh\n"
        "FROM ems.market_interval_price\n"
        "WHERE market_source = 'OTE_CZ'\n"
        "AND (interval_start AT TIME ZONE 'Europe/Prague')::date = $1::date\n"
        "ORDER BY interval_start\n"
        "LIMIT 1\n"
    )
    try:
        imported = await _apply_ote_json_to_db(conn, payload)
        raw_price = await conn.fetchval(first_price_sql, target_day)
        stored_slots = await count_ote_slots_prague_day(conn, target_day)
        if stored_slots < OTE_EXPECTED_SLOTS:
            logger.warning(
                "OTE: jen %s/%s slotů pro %s (Europe/Prague)",
                stored_slots,
                OTE_EXPECTED_SLOTS,
                day_str,
            )
        price = float(raw_price or 0.0)
        logger.info(
            "OTE import OK: %s slotů (upsert) pro %s, první cena %.4f Kč/kWh",
            imported,
            day_str,
            price,
        )
        return imported, day_str, price, None
    except Exception as exc:
        # Callers rely on the error tuple, so any database failure is reported
        # as a code (message capped at 200 characters) instead of propagating.
        exc_name = exc.__class__.__name__
        detail = str(exc).strip() or exc_name
        logger.error("OTE import DB error pro %s: %s", day_str, detail, exc_info=True)
        return -1, day_str, 0.0, f"db_import:{exc_name}: {detail[:200]}"
@dataclass
class OteBackfillStats:
    """Counters describing the outcome of one ``backfill_ote_prices`` run."""

    # Bounds of the requested date range, as passed to the backfill.
    start_date: date
    end_date: date

    # Number of days visited in the range, regardless of outcome.
    days_checked: int = 0
    # Days for which the import returned a non-negative count.
    days_imported: int = 0
    # Days skipped because they already had the expected number of slots.
    days_skipped_complete: int = 0
    # Days skipped because they lie after today's date.
    days_skipped_future: int = 0
    # Days for which the import reported an error.
    days_failed: int = 0
    # (day, error_code) pairs for failed days; the backfill caps the list length.
    failures: list[tuple[str, str]] = field(default_factory=list)
async def backfill_ote_prices(
    conn,
    *,
    start_date: date,
    end_date: date,
    only_missing: bool = True,
    pause_between_days_s: float = 0.35,
    max_failures_logged: int = 80,
) -> OteBackfillStats:
    """Walk the inclusive range [start_date, end_date] and import OTE prices day by day.

    Args:
        conn: Database connection passed on to the per-day helpers.
        start_date: First day of the range.
        end_date: Last day of the range; an end before the start visits no days.
        only_missing: Skip days that already hold at least ``OTE_EXPECTED_SLOTS`` rows.
        pause_between_days_s: Sleep after each attempted import, to go easy on OTE.
        max_failures_logged: Maximum number of entries kept in ``stats.failures``.

    Returns:
        An ``OteBackfillStats`` with the counters for this run.

    NOTE(review): completeness is judged against a fixed 96 slots; days with a
    DST change may hold a different number of slots. TODO confirm.
    """
    stats = OteBackfillStats(start_date=start_date, end_date=end_date)
    today_prague = datetime.now(OTE_TZ).date()

    span_days = (end_date - start_date).days + 1
    for offset in range(span_days):
        day = start_date + timedelta(days=offset)
        stats.days_checked += 1

        # Days after today's Prague date are counted but never fetched.
        if day > today_prague:
            stats.days_skipped_future += 1
            continue

        existing = await count_ote_slots_prague_day(conn, day)
        if only_missing and existing >= OTE_EXPECTED_SLOTS:
            stats.days_skipped_complete += 1
            continue

        imported, day_str, _price, error = await import_ote_prices_for_day(conn, day)
        if imported >= 0:
            stats.days_imported += 1
        else:
            stats.days_failed += 1
            if len(stats.failures) < max_failures_logged:
                stats.failures.append((day_str, error or "unknown"))

        if pause_between_days_s > 0:
            await asyncio.sleep(pause_between_days_s)

    return stats
async def import_ote_prices(
    db,
    site_id: int | None = None,
    target_date: date | None = None,
) -> tuple[int, str, float, str | None]:
    """Download the OTE JSON and hand it to ``ems.fn_ote_import_from_json``.

    The stored prices are global (``ems.market_interval_price``).  ``site_id`` only
    influences which calendar days count as "today"/"tomorrow" when
    ``target_date`` is None: the site's time zone is used instead of the
    default Europe/Prague.

    Without ``target_date`` the function tries tomorrow first and falls back to
    today when the fetch for tomorrow fails.

    Args:
        db: Database connection exposing awaitable ``fetchrow`` / ``fetchval``.
        site_id: Optional site whose ``timezone`` column selects the local day.
        target_date: Explicit report date; disables the tomorrow/today fallback.

    Returns:
        ``(slot_count, date_str, first_price_czk_kwh, None)`` on success, or
        ``(-1, date_str, 0.0, error_code)`` on failure (``date_str`` is empty
        when the site does not exist).
    """
    if site_id is not None:
        row = await db.fetchrow(
            "SELECT timezone FROM ems.site WHERE id = $1", site_id
        )
        if row is None:
            logger.error("OTE import: site id=%s nenalezen", site_id)
            return -1, "", 0.0, "site_not_found"
        site_tz = ZoneInfo(row["timezone"] or "Europe/Prague")
    else:
        site_tz = OTE_TZ

    today_site = datetime.now(site_tz).date()
    tomorrow_site = today_site + timedelta(days=1)
    candidate_days = [target_date] if target_date is not None else [tomorrow_site, today_site]

    payload: dict | None = None
    fetch_error: str | None = None
    target_day = candidate_days[0]

    # Warn when an implicit (tomorrow) import runs before 13:30 Prague time.
    if target_date is None:
        now_cet = datetime.now(OTE_TZ)
        if (now_cet.hour, now_cet.minute) < (13, 30):
            logger.warning(
                "OTE: ceny pro %s nemusí být dostupné (před 13:30 CET), použiji fallback na dnešek",
                tomorrow_site.isoformat(),
            )

    for day in candidate_days:
        day_str = day.isoformat()
        payload, fetch_error = await _fetch_ote_json(day_str)
        if payload is not None:
            target_day = day
            break
        logger.warning("OTE fetch selhal pro %s (err=%s)", day_str, fetch_error)

    if payload is None:
        return -1, candidate_days[0].isoformat(), 0.0, fetch_error or "fetch_failed"

    date_str = target_day.isoformat()
    try:
        n = await _apply_ote_json_to_db(db, payload)
        # Filter on the Europe/Prague calendar day, exactly like
        # import_ote_prices_for_day and count_ote_slots_prague_day.  A plain
        # ``interval_start::date`` cast would follow the DB session time zone
        # (assuming interval_start is timestamptz; TODO confirm) and could
        # select or count rows of the neighbouring day.
        first_price = await db.fetchval(
            """
            SELECT buy_raw_price_czk_kwh
            FROM ems.market_interval_price
            WHERE market_source = 'OTE_CZ'
            AND (interval_start AT TIME ZONE 'Europe/Prague')::date = $1::date
            ORDER BY interval_start
            LIMIT 1
            """,
            target_day,
        )
        n_imported = await count_ote_slots_prague_day(db, target_day)
        if n_imported < OTE_EXPECTED_SLOTS:
            now_p = datetime.now(OTE_TZ)
            tomorrow_p = (now_p + timedelta(days=1)).date()
            # Same rule as the dashboard: an incomplete D+1 before 14:30 is expected,
            # so the warning is only emitted outside that window.
            expected_gap = (
                target_day == tomorrow_p
                and (now_p.hour, now_p.minute) < (14, 30)
            )
            if not expected_gap:
                logger.warning(
                    "OTE: jen %s/%s slotů pro %s",
                    n_imported,
                    OTE_EXPECTED_SLOTS,
                    date_str,
                )
        logger.info(
            "OTE import OK: %s slotů pro %s, první cena %.4f Kč/kWh",
            n,
            date_str,
            float(first_price or 0),
        )
        return int(n), date_str, float(first_price or 0.0), None
    except Exception as e:
        # Callers rely on the error tuple, so database failures are reported as a
        # code (message capped at 200 characters) instead of propagating.
        detail = str(e).strip() or e.__class__.__name__
        logger.error("OTE import DB error: %s", detail, exc_info=True)
        return -1, date_str, 0.0, f"db_import:{e.__class__.__name__}: {detail[:200]}"