Files
ems/backend/services/price_importer.py
Dusan Vojacek 93f883f5e0
Some checks failed
CI and deploy / migration-check (push) Successful in 5s
CI and deploy / deploy (push) Failing after 20s
sql first refactor
2026-04-19 20:02:20 +02:00

323 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""OTE CZ price import: Python only performs the HTTP fetch; the import logic lives in PostgreSQL."""
from __future__ import annotations
import asyncio
import json
import logging
from dataclasses import dataclass, field
from datetime import date, datetime, timedelta
from zoneinfo import ZoneInfo
import httpx
from app.config import get_settings
from app.db_json import fetch_json
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# A regular calendar day on the day-ahead market (DAM) has 96 quarter-hours;
# 92 on the switch to summer time, 100 on the switch to winter time.
OTE_TYPICAL_SLOTS = 96
OTE_FULL_DAY_SLOT_COUNTS: frozenset[int] = frozenset({92, 96, 100})
# Backward compatibility for older imports of this name.
OTE_EXPECTED_SLOTS = OTE_TYPICAL_SLOTS
def ote_prague_day_slots_look_complete(slot_count: int) -> bool:
    """Return True when the row count matches a full OTE trading day.

    A full day is any count listed in ``OTE_FULL_DAY_SLOT_COUNTS``, which
    also covers the shorter/longer days around DST switches.
    """
    is_full_day = slot_count in OTE_FULL_DAY_SLOT_COUNTS
    return is_full_day
# URL template for the OTE day-ahead market chart data. The ``{date}``
# placeholder is filled in with str.format; the time resolution is fixed
# to PT15M in the query string.
OTE_URL = (
    "https://www.ote-cr.cz/cs/kratkodobe-trhy/elektrina/denni-trh/"
    "@@chart-data?report_date={date}&time_resolution=PT15M"
)
def _is_retryable_status(status_code: int) -> bool:
return status_code in {408, 425, 429, 500, 502, 503, 504}
async def _fetch_ote_json(date_str: str) -> tuple[dict | None, str | None]:
    """Download the OTE chart JSON for one report date, with retries.

    Transient HTTP statuses (see ``_is_retryable_status``) and transport-level
    failures are retried up to four attempts with exponential backoff
    (1 s, 2 s, 4 s). Any other failure aborts immediately.

    Args:
        date_str: Report date substituted into ``OTE_URL``.

    Returns:
        ``(payload, None)`` on success, otherwise ``(None, error_code)`` where
        ``error_code`` is one of ``http_status:<code>``,
        ``timeout_or_connect:<ExceptionName>``, ``invalid_json:<ExceptionName>``
        or ``unexpected:<ExceptionName>``.
    """
    url = OTE_URL.format(date=date_str)
    timeout = httpx.Timeout(connect=10.0, read=45.0, write=10.0, pool=10.0)
    headers = {
        "User-Agent": "Mozilla/5.0 (compatible; EMS/1.0; +https://www.ote-cr.cz)",
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "cs-CZ,cs;q=0.9,en;q=0.8",
        "Connection": "keep-alive",
    }
    max_attempts = 4
    backoff_s = 1.0
    last_err: str | None = None

    async with httpx.AsyncClient(
        timeout=timeout,
        headers=headers,
        follow_redirects=True,
    ) as client:
        for attempt in range(1, max_attempts + 1):
            try:
                logger.info("OTE fetch %s attempt %s/%s", date_str, attempt, max_attempts)
                resp = await client.get(url)
                if _is_retryable_status(resp.status_code) and attempt < max_attempts:
                    last_err = f"http_status:{resp.status_code}"
                    logger.warning(
                        "OTE temporary HTTP %s for %s (attempt %s/%s), retrying",
                        resp.status_code,
                        date_str,
                        attempt,
                        max_attempts,
                    )
                    await asyncio.sleep(backoff_s)
                    backoff_s *= 2.0
                    continue
                # On the last attempt a retryable status falls through here and
                # is reported as a regular HTTP error.
                resp.raise_for_status()
                return resp.json(), None
            except (
                # TimeoutException covers ConnectTimeout as well as the read/
                # write/pool timeouts; NetworkError covers ConnectError and
                # ReadError; RemoteProtocolError covers a server that drops the
                # connection mid-response. All of these are transient.
                httpx.TimeoutException,
                httpx.NetworkError,
                httpx.RemoteProtocolError,
            ) as e:
                last_err = f"timeout_or_connect:{e.__class__.__name__}"
                if attempt < max_attempts:
                    logger.warning(
                        "OTE request failed for %s (%s), retrying %s/%s",
                        date_str,
                        e.__class__.__name__,
                        attempt,
                        max_attempts,
                    )
                    await asyncio.sleep(backoff_s)
                    backoff_s *= 2.0
                    continue
                # Final attempt: the loop ends naturally after this log line.
                logger.error("OTE fetch failed for %s after retries: %s", date_str, e)
            except httpx.HTTPStatusError as e:
                code = e.response.status_code if e.response is not None else "unknown"
                last_err = f"http_status:{code}"
                logger.error("OTE HTTP error for %s: %s", date_str, code)
                break
            except json.JSONDecodeError as e:
                last_err = f"invalid_json:{e.__class__.__name__}"
                logger.error("OTE invalid JSON for %s: %s", date_str, e)
                break
            except Exception as e:
                # Boundary handler: callers expect an error code, not an exception.
                last_err = f"unexpected:{e.__class__.__name__}"
                logger.error("OTE fetch unexpected error for %s: %s", date_str, e)
                break
    return None, last_err
# Default time zone (Europe/Prague) used when working out the current date for OTE imports.
OTE_TZ = ZoneInfo("Europe/Prague")
async def _apply_ote_json_to_db(conn, payload: dict) -> int:
    """Write an OTE JSON payload through ``ems.fn_ote_import_from_json``.

    The payload is serialized to a JSON string and passed together with the
    configured EUR/CZK rate; the value returned by the SQL function is
    converted to ``int`` and returned.
    """
    rate = float(get_settings().eur_czk_rate)
    serialized = json.dumps(payload)
    affected = await conn.fetchval(
        "SELECT ems.fn_ote_import_from_json($1::jsonb, $2)",
        serialized,
        rate,
    )
    return int(affected)
async def count_ote_slots_prague_day(conn, target_day: date) -> int:
    """Return the slot count reported by ``ems.fn_ote_day_slot_stats_prague``.

    Args:
        conn: Database connection handed to ``fetch_json``.
        target_day: Calendar day passed to the SQL function as ``$1::date``.

    Returns:
        The ``count`` field of the stats object, or 0 when the field is
        missing/empty or when no stats object is returned at all.
    """
    stats = await fetch_json(
        conn,
        "select ems.fn_ote_day_slot_stats_prague($1::date)",
        target_day,
    )
    if stats is None:
        # No result: treat as "nothing stored" rather than crashing in
        # json.loads(None), which would abort a whole backfill run.
        return 0
    if not isinstance(stats, dict):
        stats = json.loads(stats)
    if not isinstance(stats, dict):
        # JSON null or a non-object value carries no count.
        return 0
    return int(stats.get("count") or 0)
async def import_ote_prices_for_day(
    conn,
    target_day: date,
) -> tuple[int, str, float, str | None]:
    """Fetch OTE data for one specific report date and store it.

    The payload is stored through ``_apply_ote_json_to_db``; afterwards the
    per-day stats are read back to obtain the first price and slot count.

    Returns:
        ``(n, day_str, first_price, None)`` on success, where ``n`` is the
        value returned by the import function, or
        ``(-1, day_str, 0.0, error_code)`` when the fetch or the DB step fails.
    """
    day_str = target_day.isoformat()
    payload, fetch_error = await _fetch_ote_json(day_str)
    if payload is None:
        return -1, day_str, 0.0, fetch_error or "fetch_failed"

    try:
        upserted = await _apply_ote_json_to_db(conn, payload)
        day_stats = await fetch_json(
            conn,
            "select ems.fn_ote_day_slot_stats_prague($1::date)",
            target_day,
        )
        if not isinstance(day_stats, dict):
            day_stats = json.loads(day_stats)
        first_price = day_stats.get("first_price")
        slot_count = int(day_stats.get("count") or 0)

        # An unexpected slot count is only logged; the import still succeeds.
        if not ote_prague_day_slots_look_complete(slot_count):
            logger.warning(
                "OTE: %s slotů pro %s (plný den = jedna z %s; jinak neúplná data)",
                slot_count,
                day_str,
                sorted(OTE_FULL_DAY_SLOT_COUNTS),
            )

        price = float(first_price or 0.0)
        logger.info(
            "OTE import OK: %s slotů (upsert) pro %s, první cena %.4f Kč/kWh",
            upserted,
            day_str,
            price,
        )
        return upserted, day_str, price, None
    except Exception as e:
        exc_name = type(e).__name__
        detail = str(e).strip() or exc_name
        logger.error("OTE import DB error pro %s: %s", day_str, detail, exc_info=True)
        # Slicing already leaves strings shorter than 200 characters untouched.
        return -1, day_str, 0.0, f"db_import:{exc_name}: {detail[:200]}"
@dataclass
class OteBackfillStats:
    """Counters collected while backfilling a date range of OTE prices."""

    # Inclusive bounds of the requested range.
    start_date: date
    end_date: date

    # Every day visited by the loop, including skipped ones.
    days_checked: int = 0
    # Days for which the import call did not report an error.
    days_imported: int = 0
    # Days skipped because a full set of slots was already stored.
    days_skipped_complete: int = 0
    # Days skipped because they lie after the current day.
    days_skipped_future: int = 0
    # Days for which the import call reported an error.
    days_failed: int = 0

    # (day, error) pairs for failed days.
    failures: list[tuple[str, str]] = field(default_factory=list)
async def backfill_ote_prices(
    conn,
    *,
    start_date: date,
    end_date: date,
    only_missing: bool = True,
    pause_between_days_s: float = 0.35,
    max_failures_logged: int = 80,
) -> OteBackfillStats:
    """Walk the inclusive range [start_date, end_date] and import days from OTE.

    Args:
        conn: Database connection used for the slot check and the import.
        start_date: First day of the range.
        end_date: Last day of the range; an empty range does nothing.
        only_missing: Skip days that already hold a full set of slots
            (see ``ote_prague_day_slots_look_complete``).
        pause_between_days_s: Pause after each import attempt, to be
            considerate towards the OTE server; values <= 0 disable it.
        max_failures_logged: Upper bound on entries kept in ``failures``
            (``days_failed`` still counts every failure).

    Returns:
        An ``OteBackfillStats`` with the per-category day counters.
    """
    stats = OteBackfillStats(start_date=start_date, end_date=end_date)
    today_prague = datetime.now(OTE_TZ).date()

    # A single loop over day offsets replaces the per-branch manual increments.
    total_days = (end_date - start_date).days + 1
    for offset in range(total_days):
        d = start_date + timedelta(days=offset)
        stats.days_checked += 1

        if d > today_prague:
            stats.days_skipped_future += 1
            continue

        # The slot count is only needed to decide whether to skip the day, so
        # the query is not issued when only_missing is False.
        if only_missing:
            slots = await count_ote_slots_prague_day(conn, d)
            if ote_prague_day_slots_look_complete(slots):
                stats.days_skipped_complete += 1
                continue

        n, day_str, _, err = await import_ote_prices_for_day(conn, d)
        if n < 0:
            stats.days_failed += 1
            if len(stats.failures) < max_failures_logged:
                stats.failures.append((day_str, err or "unknown"))
        else:
            stats.days_imported += 1

        if pause_between_days_s > 0:
            await asyncio.sleep(pause_between_days_s)

    return stats
async def import_ote_prices(
    db,
    site_id: int | None = None,
    target_date: date | None = None,
) -> tuple[int, str, float, str | None]:
    """Fetch OTE JSON and hand it to the PostgreSQL import function.

    The data are global (market_interval_price). ``site_id`` is optional and
    only influences which days count as "today"/"tomorrow" when
    ``target_date`` is None (site time zone instead of the default
    Europe/Prague). Without ``target_date`` tomorrow is tried first and today
    is used when the fetch for tomorrow fails.

    Args:
        db: Database connection.
        site_id: Optional site whose time zone selects the candidate days.
        target_date: Explicit report date; disables the tomorrow/today fallback.

    Returns:
        ``(slot_count, date_str, first_price_czk_kwh, None)`` on success, or
        ``(-1, date_str, 0.0, error_code)`` on failure (``date_str`` is empty
        when the site does not exist).
    """
    site_tz = OTE_TZ
    if site_id is not None:
        row = await db.fetchrow(
            "select timezone from ems.vw_site_directory where id = $1", site_id
        )
        if row is None:
            logger.error("OTE import: site id=%s nenalezen", site_id)
            return -1, "", 0.0, "site_not_found"
        tz_name = row["timezone"] or "Europe/Prague"
        try:
            site_tz = ZoneInfo(tz_name)
        except (KeyError, ValueError):
            # ZoneInfoNotFoundError subclasses KeyError; malformed keys raise
            # ValueError. The zone only selects today/tomorrow, so fall back
            # to the default instead of letting the exception escape.
            logger.warning(
                "OTE import: invalid timezone %r for site id=%s, using %s",
                tz_name,
                site_id,
                OTE_TZ.key,
            )
            site_tz = OTE_TZ

    today_site = datetime.now(site_tz).date()
    tomorrow_site = today_site + timedelta(days=1)
    candidate_days = [target_date] if target_date is not None else [tomorrow_site, today_site]

    payload: dict | None = None
    fetch_error: str | None = None
    target_day = candidate_days[0]

    # Warn before 13:30 Prague time when implicitly importing tomorrow.
    if target_date is None:
        now_cet = datetime.now(OTE_TZ)
        if (now_cet.hour, now_cet.minute) < (13, 30):
            logger.warning(
                "OTE: ceny pro %s nemusí být dostupné (před 13:30 CET), použiji fallback na dnešek",
                tomorrow_site.isoformat(),
            )

    for day in candidate_days:
        day_str = day.isoformat()
        payload, fetch_error = await _fetch_ote_json(day_str)
        if payload is not None:
            target_day = day
            break
        logger.warning("OTE fetch selhal pro %s (err=%s)", day_str, fetch_error)

    if payload is None:
        return -1, candidate_days[0].isoformat(), 0.0, fetch_error or "fetch_failed"

    date_str = target_day.isoformat()
    try:
        n = await _apply_ote_json_to_db(db, payload)
        stats_after = await fetch_json(
            db,
            "select ems.fn_ote_day_slot_stats_prague($1::date)",
            target_day,
        )
        if not isinstance(stats_after, dict):
            stats_after = json.loads(stats_after)
        first_price = stats_after.get("first_price")
        n_imported = int(stats_after.get("count") or 0)

        if not ote_prague_day_slots_look_complete(n_imported):
            now_p = datetime.now(OTE_TZ)
            tomorrow_p = now_p.date() + timedelta(days=1)
            # Same logic as the dashboard: an incomplete D+1 before 14:30 is
            # expected, so the warning is suppressed in that case.
            expected_gap = (
                target_day == tomorrow_p
                and (now_p.hour, now_p.minute) < (14, 30)
            )
            if not expected_gap:
                logger.warning(
                    "OTE: %s slotů pro %s (plný den = jedna z %s)",
                    n_imported,
                    date_str,
                    sorted(OTE_FULL_DAY_SLOT_COUNTS),
                )

        logger.info(
            "OTE import OK: %s slotů pro %s, první cena %.4f Kč/kWh",
            n,
            date_str,
            float(first_price or 0),
        )
        return int(n), date_str, float(first_price or 0.0), None
    except Exception as e:
        # Boundary handler: callers expect an error tuple, not an exception.
        detail = str(e).strip() or e.__class__.__name__
        logger.error("OTE import DB error: %s", detail, exc_info=True)
        short = detail[:200]
        return -1, date_str, 0.0, f"db_import:{e.__class__.__name__}: {short}"