#!/usr/bin/env python3 """ Fáze 0 – ekonomický regresní harness: extrakce golden fixtures z EMS DB. Pro zadanou lokalitu a pražský den stáhne KOMPLETNÍ vstupy plánovače: - ems.fn_planning_site_context(site_id) → context jsonb (battery, grid, TČ, vozidla, TUV stats) - ems.fn_load_planning_slots_full(...) → všechny sloupce slotů (ceny, forecast, masky, charge budget) - SoC na začátku okna z ems.audit_interval (actual_battery_soc_pct) a uloží je jako JSON fixture do backend/tests/golden/fixtures/. Fixtures jsou vstupem replay runneru (test_golden_replay.py), který nad nimi spouští solve_dispatch_two_pass a porovnává výstup s golden snapshotem. Čtení z DB je read-only (SELECT). DSN: --dsn > EMS_DB_DSN > DB_HOST/DB_PORT/ DB_USER/DB_PASSWORD (default 127.0.0.1:5432/ems jako docker-compose). Příklady: python3 scripts/harness/extract_fixtures.py --site-code home-01 --day 2026-06-07 --tag neg_sell_deep EMS_DB_DSN=postgresql://ems_user:***@10.200.200.1:5432/ems \ python3 scripts/harness/extract_fixtures.py --site-code KV1 --day 2026-06-09 --tag fixed_normal """ from __future__ import annotations import argparse import asyncio import json import os import sys from datetime import datetime, timedelta from pathlib import Path from zoneinfo import ZoneInfo import asyncpg PRAGUE = ZoneInfo("Europe/Prague") REPO_ROOT = Path(__file__).resolve().parents[2] DEFAULT_OUT_DIR = REPO_ROOT / "backend" / "tests" / "golden" / "fixtures" FIXTURE_VERSION = 1 def _build_dsn(args: argparse.Namespace) -> str: if args.dsn: return args.dsn env_dsn = os.environ.get("EMS_DB_DSN") if env_dsn: return env_dsn host = os.environ.get("DB_HOST", "127.0.0.1") port = os.environ.get("DB_PORT", "5432") user = os.environ.get("DB_USER", "ems_user") password = os.environ.get("DB_PASSWORD", "") name = os.environ.get("DB_NAME", "ems") return f"postgresql://{user}:{password}@{host}:{port}/{name}" def _json_default(obj: object) -> str: if isinstance(obj, datetime): return obj.isoformat() return str(obj) async def _fetch_site_id(conn: asyncpg.Connection, site_code: str) -> int: row = await conn.fetchrow("select id from ems.site where code = $1", site_code) if row is None: raise SystemExit(f"Site code '{site_code}' nenalezen v ems.site") return int(row["id"]) async def _fetch_context(conn: asyncpg.Connection, site_id: int) -> dict: raw = await conn.fetchval("select ems.fn_planning_site_context($1::int)", site_id) ctx = raw if isinstance(raw, dict) else json.loads(raw) if ctx.get("error") == "unknown_site": raise SystemExit(f"fn_planning_site_context: unknown_site pro id={site_id}") return ctx async def _fetch_soc_at(conn: asyncpg.Connection, site_id: int, at: datetime, usable_wh: float) -> float | None: """SoC (Wh) na začátku okna z audit_interval; None pokud audit chybí.""" row = await conn.fetchrow( """ select actual_battery_soc_pct from ems.audit_interval where site_id = $1 and interval_start = $2 """, site_id, at, ) if row is None or row["actual_battery_soc_pct"] is None: return None return float(row["actual_battery_soc_pct"]) / 100.0 * usable_wh async def _fetch_slots( conn: asyncpg.Connection, site_id: int, from_dt: datetime, to_dt: datetime, soc_wh: float ) -> list[dict]: rows = await conn.fetch( """ select slot_ord, interval_start, buy_price, sell_price, is_predicted_price, pv_a_forecast_w, pv_b_forecast_w, load_baseline_w, ev1_connected, ev2_connected, allow_charge, allow_discharge_export, night_baseload_target_wh, night_baseload_buffer_wh, safety_soc_target_wh, future_avoided_buy_czk_kwh, future_sell_opportunity_czk_kwh, is_daytime_pv_surplus_slot, charge_acquisition_buy_czk_kwh, charge_acquisition_cutoff_at, min_buy_before_cutoff_czk_kwh, pv_charge_wh_ahead, neg_buy_wh_ahead, grid_charge_suppressed_reason, charge_target_wh, pre_window_wh, in_window_wh, charge_slot_wh, charge_cum_wh, charge_layer, charge_slot_reason from ems.fn_load_planning_slots_full( $1::int, $2::timestamptz, $3::timestamptz, $4::numeric ) """, site_id, from_dt, to_dt, soc_wh, ) out: list[dict] = [] for r in rows: d = dict(r) for key, val in list(d.items()): if isinstance(val, datetime): d[key] = val.isoformat() elif val is not None and type(val).__name__ == "Decimal": d[key] = float(val) out.append(d) return out async def extract(args: argparse.Namespace) -> Path: dsn = _build_dsn(args) conn = await asyncpg.connect(dsn) try: site_id = await _fetch_site_id(conn, args.site_code) ctx = await _fetch_context(conn, site_id) day = datetime.strptime(args.day, "%Y-%m-%d").replace(tzinfo=PRAGUE) from_dt = day to_dt = day + timedelta(hours=args.hours) usable_wh = float(ctx["battery"]["usable_capacity_wh"]) soc_wh = await _fetch_soc_at(conn, site_id, from_dt, usable_wh) soc_source = "audit_interval" if soc_wh is None: soc_wh = 0.5 * usable_wh soc_source = "fallback_50pct" slot_rows = await _fetch_slots(conn, site_id, from_dt, to_dt, soc_wh) if not slot_rows: raise SystemExit( f"fn_load_planning_slots_full nevrátila žádné sloty pro {args.site_code} {args.day}" ) # Determinismus replay: # - SoC/TUV fixujeme do contextu (přepis aktuálních hodnot historickými / extrakčními), # - otevřené EV sessions z doby extrakce nepatří k historickému oknu → vynulovat, # - operating_mode fixně AUTO (plný solver, srovnatelnost napříč fixtures). ctx["soc_wh"] = soc_wh ctx["ev_sessions"] = [] ctx["operating_mode"] = "AUTO" fixture = { "fixture_version": FIXTURE_VERSION, "meta": { "site_id": site_id, "site_code": args.site_code, "prague_day": args.day, "window_from": from_dt.isoformat(), "window_to": to_dt.isoformat(), "horizon_hours": args.hours, "soc_wh": round(soc_wh, 1), "soc_source": soc_source, "tag": args.tag, "extracted_at": datetime.now(tz=PRAGUE).isoformat(), "dsn_host": dsn.split("@")[-1].split("/")[0] if "@" in dsn else "?", "note": ( "Vstupy plánovače zmrazené k okamžiku extrakce (context = aktuální konfigurace, " "sloty = fn_load_planning_slots_full nad historickými cenami/forecasty). " "EV sessions vynulovány, operating_mode=AUTO." ), }, "context_json": ctx, "slot_rows": slot_rows, } out_dir = Path(args.out_dir) out_dir.mkdir(parents=True, exist_ok=True) name = f"{args.site_code}_{args.day}_{args.tag}.json".replace("/", "-") out_path = out_dir / name out_path.write_text( json.dumps(fixture, ensure_ascii=False, indent=1, default=_json_default) + "\n", encoding="utf-8", ) print( f"OK {out_path.relative_to(REPO_ROOT)}: {len(slot_rows)} slotů, " f"soc={soc_wh:.0f} Wh ({soc_source}), " f"buy {min(s['buy_price'] for s in slot_rows):.2f}..{max(s['buy_price'] for s in slot_rows):.2f}, " f"sell {min(s['sell_price'] for s in slot_rows):.2f}..{max(s['sell_price'] for s in slot_rows):.2f} Kč/kWh" ) return out_path finally: await conn.close() def main() -> None: p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) p.add_argument("--site-code", required=True, help="ems.site.code (home-01, BA81, KV1, …)") p.add_argument("--day", required=True, help="Pražský den YYYY-MM-DD (začátek okna 00:00)") p.add_argument("--hours", type=int, default=36, help="Délka okna v hodinách (default 36)") p.add_argument("--tag", required=True, help="Krátký popis scénáře (neg_sell_deep, normal, …)") p.add_argument("--dsn", default=None, help="postgresql:// DSN (jinak EMS_DB_DSN / DB_* env)") p.add_argument("--out-dir", default=str(DEFAULT_OUT_DIR), help="Cílový adresář fixtures") args = p.parse_args() asyncio.run(extract(args)) if __name__ == "__main__": sys.exit(main())