Fáze 0: ekonomický regresní harness plánovače

- scripts/harness/extract_fixtures.py: extrakce vstupů solveru
  (fn_planning_site_context + fn_load_planning_slots_full) do JSON fixtures
- backend/tests/test_golden_replay.py: golden gate — replay fixtures přes
  solve_dispatch_two_pass, bit-perfektní diff proti snapshotům (GOLDEN_UPDATE=1
  pro vědomou regeneraci); 4 scénáře: home-01 neg-sell extrém / normal, BA81, KV1
- scripts/harness/economics_report.py: actual (audit_interval) vs oracle MILP
  (perfect hindsight, čistá ekonomika bez heuristických penalt), SoC-adjusted

Baseline home-01 2026-05-12..06-09: GAP 2185 Kč / 29 dní (~27 %).
Známý stav: 4/124 testů test_planning_dispatch_milp.py failuje už na main.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dusan Vojacek
2026-06-11 10:48:13 +02:00
parent edc8ae9774
commit 484f1f85fc
12 changed files with 35461 additions and 0 deletions

78
scripts/harness/README.md Normal file
View File

@@ -0,0 +1,78 @@
# Ekonomický regresní harness (Fáze 0 — „Čistý plánovač“)
Nástroje pro objektivní měření ekonomiky plánovače a regresní bránu pro jeho
dekompozici. Kontext a fáze: viz strategie refaktoru (Fáze 04) v paměti
projektu / plánu „Čistý plánovač“.
## Komponenty
| Soubor | Účel |
|--------|------|
| `extract_fixtures.py` | Stáhne z EMS DB kompletní vstupy plánovače (context + sloty `fn_load_planning_slots_full`) pro zadanou site a pražský den → JSON fixture do `backend/tests/golden/fixtures/`. |
| `economics_report.py` | Pro rozsah dní spočítá skutečný cashflow (audit_interval) vs. **oracle LP** (perfect hindsight, čistý model bez heuristických penalt) → tabulka GAP per den. |
| `../../backend/tests/test_golden_replay.py` | Pytest gate: replay fixtures přes `solve_dispatch_two_pass`, porovnání s golden snapshoty v `backend/tests/golden/snapshots/`. |
## Připojení k DB
Všechny skripty čtou DSN v pořadí `--dsn` > `EMS_DB_DSN` > `DB_HOST`/`DB_PORT`/
`DB_USER`/`DB_PASSWORD`/`DB_NAME` (default lokální docker `127.0.0.1:5432/ems`).
Čtou pouze (SELECT) — bezpečné proti produkci.
```bash
export EMS_DB_DSN="postgresql://ems_user:***@10.200.200.1:5432/ems"
```
## Golden replay gate (regresní brána dekompozice)
```bash
cd backend
python3 -m pytest tests/test_golden_replay.py -q # ověření (identity refactor → musí projít)
GOLDEN_UPDATE=1 python3 -m pytest tests/test_golden_replay.py -q # vědomá změna chování → regenerace
```
Pravidla:
- **Fáze 1 (dekompozice)**: snapshoty se NIKDY neregenerují — výstup musí být bitově shodný
(floaty zaokrouhleny na 4 d.m.).
- **Fáze 2/3 (změny ekonomiky)**: regenerace snapshotů je povolená pouze s odůvodněním
v commit message a se zlepšeným/nezhoršeným GAPem v `economics_report.py`.
- Fixtures jsou zmrazené vstupy z reálné DB (konfigurace k datu extrakce, EV sessions
vynulovány, `operating_mode=AUTO`) — deterministické, bez DB při běhu testu.
### Přidání fixture
```bash
python3 scripts/harness/extract_fixtures.py --site-code home-01 --day 2026-06-07 --tag neg_sell_deep
cd backend && GOLDEN_UPDATE=1 python3 -m pytest tests/test_golden_replay.py -q
```
Pokryté scénáře (v4 fixtures): home-01 hluboký neg-sell (sell 1.57, buy 0.89),
home-01 běžný spot den, BA81 běžný den, KV1 fixní nákup. Při změnách heuristik
přidávej scénář, který změnu pokrývá.
## Ekonomický report (metrika „kolik necháváme na stole“)
```bash
python3 scripts/harness/economics_report.py --site-code home-01 --from 2026-05-12 --to 2026-06-09
```
- **actual** = skutečný cashflow dne z auditu (import × eff. buy export × eff. sell),
- **oracle** = dolní mez: čistý MILP se skutečnou PV/spotřebou/cenami (perfect foresight),
- obojí **SoC-adjusted**: koncový SoC oceněn průměrnou denní nákupní cenou, aby den
„nevyhrával“ vybitím baterie,
- **gap = actual oracle** = chyba forecastu + neefektivita dispatche. Oracle je
nedosažitelná dolní mez; sleduj TREND gapu (před/po změně plánovače), ne absolutní nulu.
Vyloučeno z obou stran: zelený bonus PV B (nezávislý na dispatch rozhodnutích),
přesouvání EV/TČ zátěže (spotřeba je brána jako fixní).
## Známý stav k 2026-06-11 (baseline)
- `tests/test_planning_dispatch_milp.py`: **4 ze 124 testů failují už na main**
(`test_future_neg_buy_evening_export_at_high_soc_relaxed_prep`,
`test_grid_charge_respects_import_and_battery_caps`,
`test_morning_exports_pv_when_cushion_ok`,
`test_evening_reserve_soc_near_reserve_after_discharge`) — všechny z neg-sell/prep
heuristik. Nezakrývat regenerací; vyřešit ve Fázi 1/2.
- Golden snapshot home-01 neg-sell dne: `penalty_czk = 2119 Kč` při cashflow 163 Kč —
heuristické penalty v objective 13× převažují reálné peníze. To je kvantifikace
problému, který Fáze 2/3 odstraňují.

View File

@@ -0,0 +1,306 @@
#!/usr/bin/env python3
"""
Fáze 0 ekonomický report: skutečný provoz vs. oracle LP (perfect hindsight).
Pro každý pražský den v zadaném rozsahu spočítá:
1. ACTUAL skutečný cashflow z ems.audit_interval (import × eff. buy export × eff. sell),
2. ORACLE dolní mez nákladů: malý ČISTÝ MILP (jen reálné peníze: nákup prodej
degradace, žádné heuristické penalty) nad SKUTEČNOU PV výrobou,
SKUTEČNOU spotřebou a skutečnými cenami dne (perfect foresight),
3. GAP actual oracle, férově očištěno o rozdíl koncového SoC
(koncová energie oceněna průměrnou denní nákupní cenou).
GAP = forecast error + neefektivita dispatche. Oracle nelze v reálu dosáhnout
(zná budoucnost), ale trend GAPu je objektivní metrika „neekonomického provozu“
a regresní metr pro Fázi 2/3 (čistý plánovač). Oracle LP je zároveň zárodek
čistého jádra solveru.
Model oracle (15min sloty, Wh):
pv_a_used + pv_b + gi + bd = load + bc + ge (bilance sběrnice)
pv_a_used ≤ pv_a_actual (curtailment jen pole A)
soc[t] = soc[t-1] + bc·η_c bd/η_d (SoC dynamika)
min_soc ≤ soc ≤ soc_max; výkonové stropy baterie i sítě
sell < 0 ∧ block_export_on_negative_sell → ge = 0 (hard pravidlo KV1)
binárka: zákaz současného importu a exportu
objective: Σ gi·buy ge·sell + ½(bc+bd)·degradace soc[T]·avg_buy
Zjednodušení (dokumentovaná): spotřeba je fixní (EV/TČ se nepřesouvá),
zelený bonus PV B vyloučen z obou stran, konfigurace baterie = aktuální stav DB.
Použití:
EMS_DB_DSN=postgresql://ems_user:***@10.200.200.1:5432/ems \
python3 scripts/harness/economics_report.py --site-code home-01 --from 2026-05-12 --to 2026-06-09
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
import asyncpg
import pulp
PRAGUE = ZoneInfo("Europe/Prague")
INTERVAL_H = 0.25
SLOT_WH_TO_W = 4 # Wh za 15 min → W
def _build_dsn(args: argparse.Namespace) -> str:
if args.dsn:
return args.dsn
env_dsn = os.environ.get("EMS_DB_DSN")
if env_dsn:
return env_dsn
host = os.environ.get("DB_HOST", "127.0.0.1")
port = os.environ.get("DB_PORT", "5432")
user = os.environ.get("DB_USER", "ems_user")
password = os.environ.get("DB_PASSWORD", "")
name = os.environ.get("DB_NAME", "ems")
return f"postgresql://{user}:{password}@{host}:{port}/{name}"
@dataclass
class DaySlot:
interval_start: datetime
buy: float # Kč/kWh efektivní nákup
sell: float # Kč/kWh efektivní prodej
pv_a_wh: float # skutečná výroba pole A (curtailable)
pv_b_wh: float # skutečná výroba pole B (fixní)
load_wh: float # skutečná spotřeba (vč. EV/TČ)
grid_import_wh: float
grid_export_wh: float
soc_pct: float | None
@dataclass
class BatteryParams:
usable_wh: float
min_soc_wh: float
soc_max_wh: float
charge_eff: float
discharge_eff: float
max_charge_w: float
max_discharge_w: float
degradation_czk_kwh: float
async def _load_battery_and_grid(conn: asyncpg.Connection, site_id: int) -> tuple[BatteryParams, dict]:
raw = await conn.fetchval("select ems.fn_planning_site_context($1::int)", site_id)
ctx = raw if isinstance(raw, dict) else json.loads(raw)
b = ctx["battery"]
bat = BatteryParams(
usable_wh=float(b["usable_capacity_wh"]),
min_soc_wh=float(b["min_soc_wh"]),
soc_max_wh=float(b.get("planner_soc_max_wh", b["soc_max_wh"])),
charge_eff=float(b["charge_efficiency"]),
discharge_eff=float(b["discharge_efficiency"]),
max_charge_w=float(b["max_charge_power_w"]),
max_discharge_w=float(b["max_discharge_power_w"]),
degradation_czk_kwh=float(b["degradation_cost_czk_kwh"]),
)
g = ctx["grid"]
grid = {
"max_import_w": float(g["max_import_power_w"]),
"max_export_w": float(g["max_export_power_w"]),
"block_export_on_negative_sell": bool(g.get("block_export_on_negative_sell") or False),
}
return bat, grid
async def _load_day(
conn: asyncpg.Connection, site_id: int, day_start: datetime
) -> list[DaySlot]:
day_end = day_start + timedelta(days=1)
rows = await conn.fetch(
"""
select a.interval_start,
p.effective_buy_price_czk_kwh as buy,
p.effective_sell_price_czk_kwh as sell,
coalesce(a.actual_pv_production_wh, 0) as pv_wh,
coalesce(a.pv_b_production_wh, 0) as pv_b_wh,
coalesce(a.actual_load_consumption_wh, 0) as load_wh,
coalesce(a.actual_grid_import_wh, 0) as gi_wh,
coalesce(a.actual_grid_export_wh, 0) as ge_wh,
a.actual_battery_soc_pct as soc_pct
from ems.audit_interval a
join ems.vw_site_effective_price p
on p.site_id = a.site_id and p.interval_start = a.interval_start
where a.site_id = $1
and a.interval_start >= $2
and a.interval_start < $3
order by a.interval_start
""",
site_id,
day_start,
day_end,
)
return [
DaySlot(
interval_start=r["interval_start"],
buy=float(r["buy"]),
sell=float(r["sell"]),
pv_a_wh=max(0.0, float(r["pv_wh"]) - float(r["pv_b_wh"])),
pv_b_wh=float(r["pv_b_wh"]),
load_wh=float(r["load_wh"]),
grid_import_wh=float(r["gi_wh"]),
grid_export_wh=float(r["ge_wh"]),
soc_pct=float(r["soc_pct"]) if r["soc_pct"] is not None else None,
)
for r in rows
]
def _actual_cashflow_czk(slots: list[DaySlot]) -> float:
return sum(
s.grid_import_wh / 1000.0 * s.buy - s.grid_export_wh / 1000.0 * s.sell
for s in slots
)
def solve_oracle(
slots: list[DaySlot],
bat: BatteryParams,
grid: dict,
soc_start_wh: float,
terminal_value_czk_kwh: float,
) -> tuple[float, float]:
"""Vrátí (cash_czk, soc_end_wh) optimálního dispatche s perfektní znalostí dne."""
n = len(slots)
prob = pulp.LpProblem("oracle_day", pulp.LpMinimize)
max_chg_wh = bat.max_charge_w * INTERVAL_H
max_dis_wh = bat.max_discharge_w * INTERVAL_H
max_gi_wh = grid["max_import_w"] * INTERVAL_H
max_ge_wh = grid["max_export_w"] * INTERVAL_H
gi = [pulp.LpVariable(f"gi_{t}", 0, max_gi_wh) for t in range(n)]
ge = [pulp.LpVariable(f"ge_{t}", 0, max_ge_wh) for t in range(n)]
bc = [pulp.LpVariable(f"bc_{t}", 0, max_chg_wh) for t in range(n)]
bd = [pulp.LpVariable(f"bd_{t}", 0, max_dis_wh) for t in range(n)]
pa = [pulp.LpVariable(f"pa_{t}", 0, slots[t].pv_a_wh) for t in range(n)]
soc = [pulp.LpVariable(f"soc_{t}", bat.min_soc_wh, bat.soc_max_wh) for t in range(n)]
z_imp = [pulp.LpVariable(f"zi_{t}", cat="Binary") for t in range(n)]
for t in range(n):
s = slots[t]
# bilance sběrnice (Wh ve slotu)
prob += pa[t] + s.pv_b_wh + gi[t] + bd[t] == s.load_wh + bc[t] + ge[t]
# SoC dynamika
prev = soc_start_wh if t == 0 else soc[t - 1]
prob += soc[t] == prev + bc[t] * bat.charge_eff - bd[t] / bat.discharge_eff
# zákaz současného importu a exportu
prob += gi[t] <= max_gi_wh * z_imp[t]
prob += ge[t] <= max_ge_wh * (1 - z_imp[t])
# tvrdé pravidlo: záporný sell → žádný export (kde konfigurováno)
if s.sell < 0 and grid["block_export_on_negative_sell"]:
prob += ge[t] == 0
cash = pulp.lpSum(
gi[t] / 1000.0 * slots[t].buy - ge[t] / 1000.0 * slots[t].sell for t in range(n)
)
degradation = pulp.lpSum(
0.5 * (bc[t] + bd[t]) / 1000.0 * bat.degradation_czk_kwh for t in range(n)
)
terminal = soc[n - 1] / 1000.0 * terminal_value_czk_kwh
prob += cash + degradation - terminal
solver = pulp.HiGHS_CMD(msg=False) if pulp.HiGHS_CMD().available() else pulp.PULP_CBC_CMD(msg=False)
prob.solve(solver)
if pulp.LpStatus[prob.status] != "Optimal":
raise RuntimeError(f"Oracle LP není Optimal: {pulp.LpStatus[prob.status]}")
cash_val = sum(
gi[t].value() / 1000.0 * slots[t].buy - ge[t].value() / 1000.0 * slots[t].sell
for t in range(n)
)
return cash_val, float(soc[n - 1].value())
async def run_report(args: argparse.Namespace) -> None:
dsn = _build_dsn(args)
conn = await asyncpg.connect(dsn)
try:
site_row = await conn.fetchrow("select id from ems.site where code = $1", args.site_code)
if site_row is None:
raise SystemExit(f"Site code '{args.site_code}' nenalezen")
site_id = int(site_row["id"])
bat, grid = await _load_battery_and_grid(conn, site_id)
d_from = datetime.strptime(getattr(args, "from"), "%Y-%m-%d").replace(tzinfo=PRAGUE)
d_to = datetime.strptime(args.to, "%Y-%m-%d").replace(tzinfo=PRAGUE)
print(f"# Ekonomický report — {args.site_code} (site_id={site_id})")
print(f"# Okno: {getattr(args, 'from')}{args.to} (Prague dny), baterie {bat.usable_wh/1000:.1f} kWh")
print()
header = (
f"{'den':<11} {'actual':>9} {'oracle':>9} {'gap':>8} {'gap%':>6} "
f"{'soc0%':>5} {'socT_a%':>7} {'socT_o%':>7} {'avg_buy':>7}"
)
print(header)
print("-" * len(header))
tot_actual = tot_oracle = tot_gap = 0.0
days_ok = 0
day = d_from
while day <= d_to:
slots = await _load_day(conn, site_id, day)
if len(slots) < 90 or all(s.soc_pct is None for s in slots):
print(f"{day.date()!s:<11} — přeskočeno (slotů: {len(slots)})")
day += timedelta(days=1)
continue
soc0_pct = next(s.soc_pct for s in slots if s.soc_pct is not None)
socT_pct = next(s.soc_pct for s in reversed(slots) if s.soc_pct is not None)
soc_start_wh = soc0_pct / 100.0 * bat.usable_wh
soc_end_actual_wh = socT_pct / 100.0 * bat.usable_wh
avg_buy = sum(s.buy for s in slots) / len(slots)
actual_cash = _actual_cashflow_czk(slots)
oracle_cash, soc_end_oracle_wh = solve_oracle(slots, bat, grid, soc_start_wh, avg_buy)
# férové očištění: koncový SoC obou stran oceněn avg_buy dne
actual_adj = actual_cash - soc_end_actual_wh / 1000.0 * avg_buy
oracle_adj = oracle_cash - soc_end_oracle_wh / 1000.0 * avg_buy
gap = actual_adj - oracle_adj
gap_pct = (gap / abs(oracle_adj) * 100.0) if abs(oracle_adj) > 1e-6 else float("nan")
print(
f"{day.date()!s:<11} {actual_adj:>9.1f} {oracle_adj:>9.1f} {gap:>8.1f} {gap_pct:>5.0f}% "
f"{soc0_pct:>5.0f} {socT_pct:>7.0f} {soc_end_oracle_wh / bat.usable_wh * 100:>7.0f} {avg_buy:>7.2f}"
)
tot_actual += actual_adj
tot_oracle += oracle_adj
tot_gap += gap
days_ok += 1
day += timedelta(days=1)
print("-" * len(header))
if days_ok:
print(
f"{'CELKEM':<11} {tot_actual:>9.1f} {tot_oracle:>9.1f} {tot_gap:>8.1f}"
f" ({days_ok} dní; Kč, SoC-adjusted; gap = forecast error + neefektivita dispatche)"
)
else:
print("Žádný den s kompletním auditem.")
finally:
await conn.close()
def main() -> None:
p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
p.add_argument("--site-code", required=True)
p.add_argument("--from", required=True, help="YYYY-MM-DD (Prague)")
p.add_argument("--to", required=True, help="YYYY-MM-DD (Prague), včetně")
p.add_argument("--dsn", default=None)
args = p.parse_args()
asyncio.run(run_report(args))
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,219 @@
#!/usr/bin/env python3
"""
Fáze 0 ekonomický regresní harness: extrakce golden fixtures z EMS DB.
Pro zadanou lokalitu a pražský den stáhne KOMPLETNÍ vstupy plánovače:
- ems.fn_planning_site_context(site_id) → context jsonb (battery, grid, TČ, vozidla, TUV stats)
- ems.fn_load_planning_slots_full(...) → všechny sloupce slotů (ceny, forecast, masky, charge budget)
- SoC na začátku okna z ems.audit_interval (actual_battery_soc_pct)
a uloží je jako JSON fixture do backend/tests/golden/fixtures/. Fixtures jsou
vstupem replay runneru (test_golden_replay.py), který nad nimi spouští
solve_dispatch_two_pass a porovnává výstup s golden snapshotem.
Čtení z DB je read-only (SELECT). DSN: --dsn > EMS_DB_DSN > DB_HOST/DB_PORT/
DB_USER/DB_PASSWORD (default 127.0.0.1:5432/ems jako docker-compose).
Příklady:
python3 scripts/harness/extract_fixtures.py --site-code home-01 --day 2026-06-07 --tag neg_sell_deep
EMS_DB_DSN=postgresql://ems_user:***@10.200.200.1:5432/ems \
python3 scripts/harness/extract_fixtures.py --site-code KV1 --day 2026-06-09 --tag fixed_normal
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo
import asyncpg
PRAGUE = ZoneInfo("Europe/Prague")
REPO_ROOT = Path(__file__).resolve().parents[2]
DEFAULT_OUT_DIR = REPO_ROOT / "backend" / "tests" / "golden" / "fixtures"
FIXTURE_VERSION = 1
def _build_dsn(args: argparse.Namespace) -> str:
if args.dsn:
return args.dsn
env_dsn = os.environ.get("EMS_DB_DSN")
if env_dsn:
return env_dsn
host = os.environ.get("DB_HOST", "127.0.0.1")
port = os.environ.get("DB_PORT", "5432")
user = os.environ.get("DB_USER", "ems_user")
password = os.environ.get("DB_PASSWORD", "")
name = os.environ.get("DB_NAME", "ems")
return f"postgresql://{user}:{password}@{host}:{port}/{name}"
def _json_default(obj: object) -> str:
if isinstance(obj, datetime):
return obj.isoformat()
return str(obj)
async def _fetch_site_id(conn: asyncpg.Connection, site_code: str) -> int:
row = await conn.fetchrow("select id from ems.site where code = $1", site_code)
if row is None:
raise SystemExit(f"Site code '{site_code}' nenalezen v ems.site")
return int(row["id"])
async def _fetch_context(conn: asyncpg.Connection, site_id: int) -> dict:
raw = await conn.fetchval("select ems.fn_planning_site_context($1::int)", site_id)
ctx = raw if isinstance(raw, dict) else json.loads(raw)
if ctx.get("error") == "unknown_site":
raise SystemExit(f"fn_planning_site_context: unknown_site pro id={site_id}")
return ctx
async def _fetch_soc_at(conn: asyncpg.Connection, site_id: int, at: datetime, usable_wh: float) -> float | None:
"""SoC (Wh) na začátku okna z audit_interval; None pokud audit chybí."""
row = await conn.fetchrow(
"""
select actual_battery_soc_pct
from ems.audit_interval
where site_id = $1 and interval_start = $2
""",
site_id,
at,
)
if row is None or row["actual_battery_soc_pct"] is None:
return None
return float(row["actual_battery_soc_pct"]) / 100.0 * usable_wh
async def _fetch_slots(
conn: asyncpg.Connection, site_id: int, from_dt: datetime, to_dt: datetime, soc_wh: float
) -> list[dict]:
rows = await conn.fetch(
"""
select slot_ord, interval_start, buy_price, sell_price, is_predicted_price,
pv_a_forecast_w, pv_b_forecast_w, load_baseline_w,
ev1_connected, ev2_connected, allow_charge, allow_discharge_export,
night_baseload_target_wh, night_baseload_buffer_wh, safety_soc_target_wh,
future_avoided_buy_czk_kwh, future_sell_opportunity_czk_kwh,
is_daytime_pv_surplus_slot,
charge_acquisition_buy_czk_kwh, charge_acquisition_cutoff_at,
min_buy_before_cutoff_czk_kwh, pv_charge_wh_ahead, neg_buy_wh_ahead,
grid_charge_suppressed_reason,
charge_target_wh, pre_window_wh, in_window_wh,
charge_slot_wh, charge_cum_wh, charge_layer, charge_slot_reason
from ems.fn_load_planning_slots_full(
$1::int, $2::timestamptz, $3::timestamptz, $4::numeric
)
""",
site_id,
from_dt,
to_dt,
soc_wh,
)
out: list[dict] = []
for r in rows:
d = dict(r)
for key, val in list(d.items()):
if isinstance(val, datetime):
d[key] = val.isoformat()
elif val is not None and type(val).__name__ == "Decimal":
d[key] = float(val)
out.append(d)
return out
async def extract(args: argparse.Namespace) -> Path:
dsn = _build_dsn(args)
conn = await asyncpg.connect(dsn)
try:
site_id = await _fetch_site_id(conn, args.site_code)
ctx = await _fetch_context(conn, site_id)
day = datetime.strptime(args.day, "%Y-%m-%d").replace(tzinfo=PRAGUE)
from_dt = day
to_dt = day + timedelta(hours=args.hours)
usable_wh = float(ctx["battery"]["usable_capacity_wh"])
soc_wh = await _fetch_soc_at(conn, site_id, from_dt, usable_wh)
soc_source = "audit_interval"
if soc_wh is None:
soc_wh = 0.5 * usable_wh
soc_source = "fallback_50pct"
slot_rows = await _fetch_slots(conn, site_id, from_dt, to_dt, soc_wh)
if not slot_rows:
raise SystemExit(
f"fn_load_planning_slots_full nevrátila žádné sloty pro {args.site_code} {args.day}"
)
# Determinismus replay:
# - SoC/TUV fixujeme do contextu (přepis aktuálních hodnot historickými / extrakčními),
# - otevřené EV sessions z doby extrakce nepatří k historickému oknu → vynulovat,
# - operating_mode fixně AUTO (plný solver, srovnatelnost napříč fixtures).
ctx["soc_wh"] = soc_wh
ctx["ev_sessions"] = []
ctx["operating_mode"] = "AUTO"
fixture = {
"fixture_version": FIXTURE_VERSION,
"meta": {
"site_id": site_id,
"site_code": args.site_code,
"prague_day": args.day,
"window_from": from_dt.isoformat(),
"window_to": to_dt.isoformat(),
"horizon_hours": args.hours,
"soc_wh": round(soc_wh, 1),
"soc_source": soc_source,
"tag": args.tag,
"extracted_at": datetime.now(tz=PRAGUE).isoformat(),
"dsn_host": dsn.split("@")[-1].split("/")[0] if "@" in dsn else "?",
"note": (
"Vstupy plánovače zmrazené k okamžiku extrakce (context = aktuální konfigurace, "
"sloty = fn_load_planning_slots_full nad historickými cenami/forecasty). "
"EV sessions vynulovány, operating_mode=AUTO."
),
},
"context_json": ctx,
"slot_rows": slot_rows,
}
out_dir = Path(args.out_dir)
out_dir.mkdir(parents=True, exist_ok=True)
name = f"{args.site_code}_{args.day}_{args.tag}.json".replace("/", "-")
out_path = out_dir / name
out_path.write_text(
json.dumps(fixture, ensure_ascii=False, indent=1, default=_json_default) + "\n",
encoding="utf-8",
)
print(
f"OK {out_path.relative_to(REPO_ROOT)}: {len(slot_rows)} slotů, "
f"soc={soc_wh:.0f} Wh ({soc_source}), "
f"buy {min(s['buy_price'] for s in slot_rows):.2f}..{max(s['buy_price'] for s in slot_rows):.2f}, "
f"sell {min(s['sell_price'] for s in slot_rows):.2f}..{max(s['sell_price'] for s in slot_rows):.2f} Kč/kWh"
)
return out_path
finally:
await conn.close()
def main() -> None:
p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
p.add_argument("--site-code", required=True, help="ems.site.code (home-01, BA81, KV1, …)")
p.add_argument("--day", required=True, help="Pražský den YYYY-MM-DD (začátek okna 00:00)")
p.add_argument("--hours", type=int, default=36, help="Délka okna v hodinách (default 36)")
p.add_argument("--tag", required=True, help="Krátký popis scénáře (neg_sell_deep, normal, …)")
p.add_argument("--dsn", default=None, help="postgresql:// DSN (jinak EMS_DB_DSN / DB_* env)")
p.add_argument("--out-dir", default=str(DEFAULT_OUT_DIR), help="Cílový adresář fixtures")
args = p.parse_args()
asyncio.run(extract(args))
if __name__ == "__main__":
sys.exit(main())