"""OTE CZ price import – Python only performs the HTTP fetch; the logic lives in PostgreSQL."""

from __future__ import annotations

import asyncio
import json
import logging
from dataclasses import dataclass, field
from datetime import date, datetime, timedelta
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError

import httpx

from app.config import get_settings
from app.db_json import fetch_json
from services.notification_service import notify_ote_import_format_changed

logger = logging.getLogger(__name__)

# A regular calendar day on the day-ahead market (DAM) has 96 quarter-hour
# slots; 92 on the switch to summer time, 100 on the switch to winter time.
OTE_TYPICAL_SLOTS = 96
OTE_FULL_DAY_SLOT_COUNTS: frozenset[int] = frozenset({92, 96, 100})
# Backward-compatible alias kept for older imports.
OTE_EXPECTED_SLOTS = OTE_TYPICAL_SLOTS

OTE_TZ = ZoneInfo("Europe/Prague")

OTE_URL = (
    "https://www.ote-cr.cz/cs/kratkodobe-trhy/elektrina/denni-trh/"
    "@@chart-data?report_date={date}&time_resolution=PT15M"
)

# HTTP statuses that _fetch_ote_json treats as transient and retries.
_RETRYABLE_HTTP_STATUSES: frozenset[int] = frozenset({408, 425, 429, 500, 502, 503, 504})

# Transport-level failures worth retrying. httpx.ConnectTimeout derives from
# TimeoutException (not ConnectError), and ReadError / RemoteProtocolError cover
# dropped connections, so the base classes are listed instead of leaf classes.
_RETRYABLE_TRANSPORT_ERRORS: tuple[type[Exception], ...] = (
    httpx.TimeoutException,
    httpx.NetworkError,
    httpx.RemoteProtocolError,
)

# Substrings of DB import errors that indicate the OTE JSON structure changed;
# matching errors trigger notify_ote_import_format_changed.
_FORMAT_CHANGE_MARKERS: tuple[str, ...] = (
    "OTE price dataLine not found",
    "OTE price series:",
    "cannot parse date from graph.title",
)


def ote_prague_day_slots_look_complete(slot_count: int) -> bool:
    """Return True if *slot_count* equals a full OTE trading day (92/96/100, DST-aware)."""
    return slot_count in OTE_FULL_DAY_SLOT_COUNTS


def _is_retryable_status(status_code: int) -> bool:
    """Return True for HTTP statuses that _fetch_ote_json retries with backoff."""
    return status_code in _RETRYABLE_HTTP_STATUSES


async def _fetch_ote_json(date_str: str) -> tuple[dict | None, str | None]:
    """Download the OTE day-ahead chart JSON for one report date.

    Up to 4 attempts; transient HTTP statuses and transport errors are retried
    with exponential backoff starting at 1 s.

    Args:
        date_str: ISO date (YYYY-MM-DD) inserted into ``OTE_URL``.

    Returns:
        ``(payload, None)`` on success, otherwise ``(None, error_code)`` where
        error_code is ``http_status:<code>``, ``timeout_or_connect:<Exc>``,
        ``invalid_json:<Exc>`` or ``unexpected:<Exc>``.
    """
    url = OTE_URL.format(date=date_str)
    timeout = httpx.Timeout(connect=10.0, read=45.0, write=10.0, pool=10.0)
    headers = {
        "User-Agent": "Mozilla/5.0 (compatible; EMS/1.0; +https://www.ote-cr.cz)",
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "cs-CZ,cs;q=0.9,en;q=0.8",
        "Connection": "keep-alive",
    }
    max_attempts = 4
    backoff_s = 1.0
    last_err: str | None = None

    async with httpx.AsyncClient(
        timeout=timeout,
        headers=headers,
        follow_redirects=True,
    ) as client:
        for attempt in range(1, max_attempts + 1):
            try:
                logger.info("OTE fetch %s attempt %s/%s", date_str, attempt, max_attempts)
                resp = await client.get(url)
                # On the last attempt a retryable status falls through to
                # raise_for_status() and is reported as http_status:<code>.
                if _is_retryable_status(resp.status_code) and attempt < max_attempts:
                    last_err = f"http_status:{resp.status_code}"
                    logger.warning(
                        "OTE temporary HTTP %s for %s (attempt %s/%s), retrying",
                        resp.status_code,
                        date_str,
                        attempt,
                        max_attempts,
                    )
                    await asyncio.sleep(backoff_s)
                    backoff_s *= 2.0
                    continue
                resp.raise_for_status()
                return resp.json(), None
            except _RETRYABLE_TRANSPORT_ERRORS as e:
                last_err = f"timeout_or_connect:{e.__class__.__name__}"
                if attempt < max_attempts:
                    logger.warning(
                        "OTE request failed for %s (%s), retrying %s/%s",
                        date_str,
                        e.__class__.__name__,
                        attempt,
                        max_attempts,
                    )
                    await asyncio.sleep(backoff_s)
                    backoff_s *= 2.0
                    continue
                logger.error("OTE fetch failed for %s after retries: %s", date_str, e)
            except httpx.HTTPStatusError as e:
                code = e.response.status_code if e.response is not None else "unknown"
                last_err = f"http_status:{code}"
                logger.error("OTE HTTP error for %s: %s", date_str, code)
                break
            except json.JSONDecodeError as e:
                last_err = f"invalid_json:{e.__class__.__name__}"
                logger.error("OTE invalid JSON for %s: %s", date_str, e)
                break
            except Exception as e:
                last_err = f"unexpected:{e.__class__.__name__}"
                logger.error("OTE fetch unexpected error for %s: %s", date_str, e)
                break
    return None, last_err


async def _apply_ote_json_to_db(conn, payload: dict) -> int:
    """Write an OTE payload via ems.fn_ote_import_from_json.

    The EUR→CZK rate from settings is passed as the second SQL argument.
    Returns the ROW_COUNT reported by the SQL function.
    """
    settings = get_settings()
    eur_czk = float(settings.eur_czk_rate)
    n = await conn.fetchval(
        "SELECT ems.fn_ote_import_from_json($1::jsonb, $2)",
        json.dumps(payload),
        eur_czk,
    )
    return int(n)


async def _fetch_day_slot_stats(conn, target_day: date) -> dict:
    """Return ems.fn_ote_day_slot_stats_prague(target_day) as a dict.

    A NULL / non-object result is normalised to ``{}`` so callers can use
    ``.get("count")`` / ``.get("first_price")`` unconditionally.
    """
    stats = await fetch_json(
        conn,
        "select ems.fn_ote_day_slot_stats_prague($1::date)",
        target_day,
    )
    if stats is None:
        return {}
    if not isinstance(stats, dict):
        stats = json.loads(stats)
    return stats if isinstance(stats, dict) else {}


async def count_ote_slots_prague_day(conn, target_day: date) -> int:
    """Number of OTE_CZ rows for a Europe/Prague calendar day (a full day is 92/96/100)."""
    stats = await _fetch_day_slot_stats(conn, target_day)
    return int(stats.get("count") or 0)


def _exception_detail(exc: BaseException) -> str:
    """Non-empty, stripped message for *exc* (falls back to the class name)."""
    return str(exc).strip() or exc.__class__.__name__


def _db_import_error_code(exc: BaseException, detail: str) -> str:
    """Error code returned to callers for a DB-side import failure (detail capped at 200 chars)."""
    return f"db_import:{exc.__class__.__name__}: {detail[:200]}"


async def _notify_if_format_changed(conn, day_str: str, detail: str) -> None:
    """Send the format-change notification when *detail* matches a known marker.

    Failures of the notification itself are logged and swallowed: this runs
    inside an error handler, and an escaping exception would break the
    ``(-1, date, 0.0, error)`` return contract of the import functions.
    """
    if not any(marker in detail for marker in _FORMAT_CHANGE_MARKERS):
        return
    try:
        await notify_ote_import_format_changed(
            conn,
            report_date=day_str,
            error_detail=detail,
            url=OTE_URL.format(date=day_str),
        )
    except Exception:
        logger.exception("OTE format-change notification failed for %s", day_str)


async def import_ote_prices_for_day(
    conn,
    target_day: date,
) -> tuple[int, str, float, str | None]:
    """
    Download OTE data for one specific report_date and store it via fn_ote_import_from_json.

    Same return-value meaning as import_ote_prices():
        (slot_count, date_str, first_price_czk_kwh, None) on success,
        (-1, date_str, 0.0, error_code) on failure.
    """
    day_str = target_day.isoformat()
    payload, fetch_error = await _fetch_ote_json(day_str)
    if payload is None:
        return -1, day_str, 0.0, fetch_error or "fetch_failed"

    try:
        n = await _apply_ote_json_to_db(conn, payload)
        stats_after = await _fetch_day_slot_stats(conn, target_day)
        first_price = float(stats_after.get("first_price") or 0.0)
        n_imported = int(stats_after.get("count") or 0)
        if not ote_prague_day_slots_look_complete(n_imported):
            logger.warning(
                "OTE: %s slotů pro %s (plný den = jedna z %s; jinak neúplná data)",
                n_imported,
                day_str,
                sorted(OTE_FULL_DAY_SLOT_COUNTS),
            )
        logger.info(
            "OTE import OK: %s slotů (upsert) pro %s, první cena %.4f Kč/kWh",
            n,
            day_str,
            first_price,
        )
        return n, day_str, first_price, None
    except Exception as e:
        detail = _exception_detail(e)
        logger.error("OTE import DB error pro %s: %s", day_str, detail, exc_info=True)
        await _notify_if_format_changed(conn, day_str, detail)
        return -1, day_str, 0.0, _db_import_error_code(e, detail)


@dataclass
class OteBackfillStats:
    """Counters collected by backfill_ote_prices over [start_date, end_date]."""

    start_date: date
    end_date: date
    days_checked: int = 0
    days_imported: int = 0
    days_skipped_complete: int = 0
    days_skipped_future: int = 0
    days_failed: int = 0
    # (date_str, error_code) pairs, capped at max_failures_logged entries.
    failures: list[tuple[str, str]] = field(default_factory=list)


async def backfill_ote_prices(
    conn,
    *,
    start_date: date,
    end_date: date,
    only_missing: bool = True,
    pause_between_days_s: float = 0.35,
    max_failures_logged: int = 80,
) -> OteBackfillStats:
    """
    Walk [start_date, end_date] (Prague calendar) and fill in missing days from OTE.

    only_missing: skip days that already have a "full" slot count (92/96/100 per OTE).
    pause_between_days_s: short pause after each HTTP import (to be considerate to OTE).
    max_failures_logged: cap on entries stored in ``stats.failures``.
    """
    stats = OteBackfillStats(start_date=start_date, end_date=end_date)
    today_prague = datetime.now(OTE_TZ).date()

    d = start_date
    while d <= end_date:
        if d > today_prague:
            # Every remaining day is also after today: account for all of them
            # at once instead of stepping to end_date (which may be far away,
            # and stepping past date.max would overflow).
            remaining = (end_date - d).days + 1
            stats.days_checked += remaining
            stats.days_skipped_future += remaining
            break

        stats.days_checked += 1
        if only_missing and ote_prague_day_slots_look_complete(
            await count_ote_slots_prague_day(conn, d)
        ):
            stats.days_skipped_complete += 1
        else:
            n, day_str, _, err = await import_ote_prices_for_day(conn, d)
            if n < 0:
                stats.days_failed += 1
                if len(stats.failures) < max_failures_logged:
                    stats.failures.append((day_str, err or "unknown"))
            else:
                stats.days_imported += 1
            if pause_between_days_s > 0:
                await asyncio.sleep(pause_between_days_s)
        d += timedelta(days=1)
    return stats


def _site_zoneinfo(tz_name: str | None, site_id: int) -> ZoneInfo:
    """ZoneInfo for a site's timezone column; falls back to Europe/Prague if empty or invalid."""
    name = tz_name or "Europe/Prague"
    try:
        return ZoneInfo(name)
    except (ZoneInfoNotFoundError, ValueError):
        logger.warning(
            "OTE import: invalid timezone %r for site id=%s, using Europe/Prague",
            name,
            site_id,
        )
        return OTE_TZ


def _incomplete_tomorrow_is_expected(target_day: date) -> bool:
    """True when target_day is Prague-tomorrow and it is before 14:30 Prague time.

    Same rule as the dashboard: an incomplete D+1 before 14:30 is expected.
    """
    now_p = datetime.now(OTE_TZ)
    tomorrow_p = (now_p + timedelta(days=1)).date()
    return target_day == tomorrow_p and (now_p.hour, now_p.minute) < (14, 30)


async def import_ote_prices(
    db,
    site_id: int | None = None,
    target_date: date | None = None,
) -> tuple[int, str, float, str | None]:
    """
    Download OTE JSON and pass it to the PostgreSQL function ems.fn_ote_import_from_json.

    The data are global (market_interval_price); site_id is optional and only
    selects "today/tomorrow" when target_date=None (the site's timezone instead
    of the default Europe/Prague).

    Returns:
        (slot_count, date_str, first_price_czk_kwh, error_code)
        (-1, date_str, 0.0, error_code) on error
    """
    if site_id is not None:
        row = await db.fetchrow(
            "select timezone from ems.vw_site_directory where id = $1", site_id
        )
        if row is None:
            logger.error("OTE import: site id=%s nenalezen", site_id)
            return -1, "", 0.0, "site_not_found"
        site_tz = _site_zoneinfo(row["timezone"], site_id)
    else:
        site_tz = OTE_TZ

    today_site = datetime.now(site_tz).date()
    tomorrow_site = today_site + timedelta(days=1)
    # Explicit date → only that day; otherwise try tomorrow first, then today.
    candidate_days = [target_date] if target_date is not None else [tomorrow_site, today_site]

    # Warn before 13:30 Prague time on an implicit (tomorrow) import.
    # NOTE(review): the fallback to today only happens when the tomorrow fetch
    # fails outright, not when OTE answers with a payload lacking data.
    if target_date is None:
        now_prague = datetime.now(OTE_TZ)
        if (now_prague.hour, now_prague.minute) < (13, 30):
            logger.warning(
                "OTE: ceny pro %s nemusí být dostupné (před 13:30 CET), použiji fallback na dnešek",
                tomorrow_site.isoformat(),
            )

    payload: dict | None = None
    fetch_error: str | None = None
    target_day = candidate_days[0]
    for day in candidate_days:
        day_str = day.isoformat()
        payload, fetch_error = await _fetch_ote_json(day_str)
        if payload is not None:
            target_day = day
            break
        logger.warning("OTE fetch selhal pro %s (err=%s)", day_str, fetch_error)

    if payload is None:
        # Reported under the first candidate date with the last fetch error.
        return -1, candidate_days[0].isoformat(), 0.0, fetch_error or "fetch_failed"

    date_str = target_day.isoformat()
    try:
        n = await _apply_ote_json_to_db(db, payload)
        stats_after = await _fetch_day_slot_stats(db, target_day)
        first_price = float(stats_after.get("first_price") or 0.0)
        n_imported = int(stats_after.get("count") or 0)
        if not ote_prague_day_slots_look_complete(n_imported) and not _incomplete_tomorrow_is_expected(
            target_day
        ):
            logger.warning(
                "OTE: %s slotů pro %s (plný den = jedna z %s)",
                n_imported,
                date_str,
                sorted(OTE_FULL_DAY_SLOT_COUNTS),
            )
        logger.info(
            "OTE import OK: %s slotů pro %s, první cena %.4f Kč/kWh",
            n,
            date_str,
            first_price,
        )
        return int(n), date_str, first_price, None
    except Exception as e:
        detail = _exception_detail(e)
        logger.error("OTE import DB error: %s", detail, exc_info=True)
        await _notify_if_format_changed(db, date_str, detail)
        return -1, date_str, 0.0, _db_import_error_code(e, detail)