Files
ems/scripts/harness/extract_fixtures.py
Dusan Vojacek 484f1f85fc Fáze 0: ekonomický regresní harness plánovače
- scripts/harness/extract_fixtures.py: extrakce vstupů solveru
  (fn_planning_site_context + fn_load_planning_slots_full) do JSON fixtures
- backend/tests/test_golden_replay.py: golden gate — replay fixtures přes
  solve_dispatch_two_pass, bit-perfektní diff proti snapshotům (GOLDEN_UPDATE=1
  pro vědomou regeneraci); 4 scénáře: home-01 neg-sell extrém / normal, BA81, KV1
- scripts/harness/economics_report.py: actual (audit_interval) vs oracle MILP
  (perfect hindsight, čistá ekonomika bez heuristických penalt), SoC-adjusted

Baseline home-01 2026-05-12..06-09: GAP 2185 Kč / 29 dní (~27 %).
Známý stav: 4/124 testů test_planning_dispatch_milp.py failuje už na main.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-11 10:48:13 +02:00

220 lines
8.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Fáze 0 ekonomický regresní harness: extrakce golden fixtures z EMS DB.
Pro zadanou lokalitu a pražský den stáhne KOMPLETNÍ vstupy plánovače:
- ems.fn_planning_site_context(site_id) → context jsonb (battery, grid, TČ, vozidla, TUV stats)
- ems.fn_load_planning_slots_full(...) → všechny sloupce slotů (ceny, forecast, masky, charge budget)
- SoC na začátku okna z ems.audit_interval (actual_battery_soc_pct)
a uloží je jako JSON fixture do backend/tests/golden/fixtures/. Fixtures jsou
vstupem replay runneru (test_golden_replay.py), který nad nimi spouští
solve_dispatch_two_pass a porovnává výstup s golden snapshotem.
Čtení z DB je read-only (SELECT). DSN: --dsn > EMS_DB_DSN > DB_HOST/DB_PORT/
DB_USER/DB_PASSWORD (default 127.0.0.1:5432/ems jako docker-compose).
Příklady:
python3 scripts/harness/extract_fixtures.py --site-code home-01 --day 2026-06-07 --tag neg_sell_deep
EMS_DB_DSN=postgresql://ems_user:***@10.200.200.1:5432/ems \
python3 scripts/harness/extract_fixtures.py --site-code KV1 --day 2026-06-09 --tag fixed_normal
"""
from __future__ import annotations

import argparse
import asyncio
import json
import os
import sys
from datetime import datetime, timedelta
from decimal import Decimal
from pathlib import Path
from urllib.parse import quote
from zoneinfo import ZoneInfo

import asyncpg

# Time zone used to interpret the --day argument and to stamp the extraction time.
PRAGUE = ZoneInfo("Europe/Prague")
# Directory two levels above the directory that contains this script.
REPO_ROOT = Path(__file__).resolve().parents[2]
# Default target directory for generated JSON fixtures (overridable with --out-dir).
DEFAULT_OUT_DIR = REPO_ROOT / "backend" / "tests" / "golden" / "fixtures"
# Written into every fixture under the "fixture_version" key.
FIXTURE_VERSION = 1
def _build_dsn(args: argparse.Namespace) -> str:
if args.dsn:
return args.dsn
env_dsn = os.environ.get("EMS_DB_DSN")
if env_dsn:
return env_dsn
host = os.environ.get("DB_HOST", "127.0.0.1")
port = os.environ.get("DB_PORT", "5432")
user = os.environ.get("DB_USER", "ems_user")
password = os.environ.get("DB_PASSWORD", "")
name = os.environ.get("DB_NAME", "ems")
return f"postgresql://{user}:{password}@{host}:{port}/{name}"
def _json_default(obj: object) -> str:
if isinstance(obj, datetime):
return obj.isoformat()
return str(obj)
async def _fetch_site_id(conn: asyncpg.Connection, site_code: str) -> int:
row = await conn.fetchrow("select id from ems.site where code = $1", site_code)
if row is None:
raise SystemExit(f"Site code '{site_code}' nenalezen v ems.site")
return int(row["id"])
async def _fetch_context(conn: asyncpg.Connection, site_id: int) -> dict:
raw = await conn.fetchval("select ems.fn_planning_site_context($1::int)", site_id)
ctx = raw if isinstance(raw, dict) else json.loads(raw)
if ctx.get("error") == "unknown_site":
raise SystemExit(f"fn_planning_site_context: unknown_site pro id={site_id}")
return ctx
async def _fetch_soc_at(conn: asyncpg.Connection, site_id: int, at: datetime, usable_wh: float) -> float | None:
"""SoC (Wh) na začátku okna z audit_interval; None pokud audit chybí."""
row = await conn.fetchrow(
"""
select actual_battery_soc_pct
from ems.audit_interval
where site_id = $1 and interval_start = $2
""",
site_id,
at,
)
if row is None or row["actual_battery_soc_pct"] is None:
return None
return float(row["actual_battery_soc_pct"]) / 100.0 * usable_wh
async def _fetch_slots(
conn: asyncpg.Connection, site_id: int, from_dt: datetime, to_dt: datetime, soc_wh: float
) -> list[dict]:
rows = await conn.fetch(
"""
select slot_ord, interval_start, buy_price, sell_price, is_predicted_price,
pv_a_forecast_w, pv_b_forecast_w, load_baseline_w,
ev1_connected, ev2_connected, allow_charge, allow_discharge_export,
night_baseload_target_wh, night_baseload_buffer_wh, safety_soc_target_wh,
future_avoided_buy_czk_kwh, future_sell_opportunity_czk_kwh,
is_daytime_pv_surplus_slot,
charge_acquisition_buy_czk_kwh, charge_acquisition_cutoff_at,
min_buy_before_cutoff_czk_kwh, pv_charge_wh_ahead, neg_buy_wh_ahead,
grid_charge_suppressed_reason,
charge_target_wh, pre_window_wh, in_window_wh,
charge_slot_wh, charge_cum_wh, charge_layer, charge_slot_reason
from ems.fn_load_planning_slots_full(
$1::int, $2::timestamptz, $3::timestamptz, $4::numeric
)
""",
site_id,
from_dt,
to_dt,
soc_wh,
)
out: list[dict] = []
for r in rows:
d = dict(r)
for key, val in list(d.items()):
if isinstance(val, datetime):
d[key] = val.isoformat()
elif val is not None and type(val).__name__ == "Decimal":
d[key] = float(val)
out.append(d)
return out
def _display_path(path: Path) -> Path:
    """Return *path* relative to REPO_ROOT when it lies inside it, otherwise unchanged."""
    try:
        return path.relative_to(REPO_ROOT)
    except ValueError:
        # Relative --out-dir, or one outside the repository: show it as given.
        return path


def _price_range(slot_rows: list[dict], key: str) -> str:
    """Format the min..max of one price column for the summary line, skipping NULLs."""
    prices = [slot[key] for slot in slot_rows if slot[key] is not None]
    if not prices:
        return "n/a"
    return f"{min(prices):.2f}..{max(prices):.2f}"


async def extract(args: argparse.Namespace) -> Path:
    """Extract one golden fixture from the EMS database and write it as JSON.

    Reads the site context, the state of charge at the window start (falling
    back to 50 % of usable capacity when no audit value exists) and the slot
    rows, then writes ``<site_code>_<day>_<tag>.json`` into ``args.out_dir``
    and prints a one-line summary.

    Args:
        args: Parsed CLI arguments; reads ``site_code``, ``day``, ``hours``,
            ``tag``, ``out_dir`` and (via ``_build_dsn``) ``dsn``.

    Returns:
        Path of the written fixture file.

    Raises:
        ValueError: If ``args.day`` is not in ``YYYY-MM-DD`` form.
        SystemExit: If the slot query returns no rows (the lookup helpers
            raise it as well for an unknown site).
    """
    # Parse the day first so a malformed --day fails without a DB round-trip.
    day = datetime.strptime(args.day, "%Y-%m-%d").replace(tzinfo=PRAGUE)
    from_dt = day
    # NOTE(review): adding a timedelta to an aware datetime is wall-clock
    # arithmetic, so a window crossing a DST change covers args.hours -/+ 1
    # elapsed hours — confirm that this matches the planner's own windowing.
    to_dt = day + timedelta(hours=args.hours)

    dsn = _build_dsn(args)
    conn = await asyncpg.connect(dsn)
    try:
        site_id = await _fetch_site_id(conn, args.site_code)
        ctx = await _fetch_context(conn, site_id)
        usable_wh = float(ctx["battery"]["usable_capacity_wh"])
        soc_wh = await _fetch_soc_at(conn, site_id, from_dt, usable_wh)
        soc_source = "audit_interval"
        if soc_wh is None:
            soc_wh = 0.5 * usable_wh
            soc_source = "fallback_50pct"
        slot_rows = await _fetch_slots(conn, site_id, from_dt, to_dt, soc_wh)
    finally:
        # Everything below is local work; release the connection first.
        await conn.close()

    if not slot_rows:
        raise SystemExit(
            f"fn_load_planning_slots_full nevrátila žádné sloty pro {args.site_code} {args.day}"
        )

    # Replay determinism:
    # - the SoC is pinned into the context (overwriting the current value),
    # - EV sessions open at extraction time do not belong to the historical
    #   window, so they are cleared,
    # - operating_mode is fixed to AUTO (full solver, comparable across fixtures).
    ctx["soc_wh"] = soc_wh
    ctx["ev_sessions"] = []
    ctx["operating_mode"] = "AUTO"

    fixture = {
        "fixture_version": FIXTURE_VERSION,
        "meta": {
            "site_id": site_id,
            "site_code": args.site_code,
            "prague_day": args.day,
            "window_from": from_dt.isoformat(),
            "window_to": to_dt.isoformat(),
            "horizon_hours": args.hours,
            "soc_wh": round(soc_wh, 1),
            "soc_source": soc_source,
            "tag": args.tag,
            "extracted_at": datetime.now(tz=PRAGUE).isoformat(),
            # Only the host[:port] part is recorded; credentials precede the last "@".
            "dsn_host": dsn.split("@")[-1].split("/")[0] if "@" in dsn else "?",
            "note": (
                "Vstupy plánovače zmrazené k okamžiku extrakce (context = aktuální konfigurace, "
                "sloty = fn_load_planning_slots_full nad historickými cenami/forecasty). "
                "EV sessions vynulovány, operating_mode=AUTO."
            ),
        },
        "context_json": ctx,
        "slot_rows": slot_rows,
    }

    out_dir = Path(args.out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    # "/" in any component would otherwise be read as a directory separator.
    name = f"{args.site_code}_{args.day}_{args.tag}.json".replace("/", "-")
    out_path = out_dir / name
    out_path.write_text(
        json.dumps(fixture, ensure_ascii=False, indent=1, default=_json_default) + "\n",
        encoding="utf-8",
    )
    print(
        f"OK {_display_path(out_path)}: {len(slot_rows)} slotů, "
        f"soc={soc_wh:.0f} Wh ({soc_source}), "
        f"buy {_price_range(slot_rows, 'buy_price')}, "
        f"sell {_price_range(slot_rows, 'sell_price')} Kč/kWh"
    )
    return out_path
def main() -> None:
    """Parse the command line and run one fixture extraction."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    # Registered in this order so the generated --help output keeps its layout.
    option_specs = (
        ("--site-code", {"required": True, "help": "ems.site.code (home-01, BA81, KV1, …)"}),
        ("--day", {"required": True, "help": "Pražský den YYYY-MM-DD (začátek okna 00:00)"}),
        ("--hours", {"type": int, "default": 36, "help": "Délka okna v hodinách (default 36)"}),
        ("--tag", {"required": True, "help": "Krátký popis scénáře (neg_sell_deep, normal, …)"}),
        ("--dsn", {"default": None, "help": "postgresql:// DSN (jinak EMS_DB_DSN / DB_* env)"}),
        ("--out-dir", {"default": str(DEFAULT_OUT_DIR), "help": "Cílový adresář fixtures"}),
    )
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)
    parsed = parser.parse_args()
    asyncio.run(extract(parsed))
# Script entry point. main() returns None, so sys.exit() reports status 0 on success.
if __name__ == "__main__":
    sys.exit(main())