Files
ems/scripts/harness/battery_upgrade_study.py
Dusan Vojacek 53e9afb513 Investiční studie v2: POTENCIÁLNÍ výroba místo telemetrie (škrcení 81 %!)
Klíčová oprava (postřeh uživatele): při sell<0 lokality škrtí výrobu
(reg 340 / GEN cutoff) — telemetrie ukázala 357 kWh, predikce 1879 kWh
(96 % minut v derating). Studie nyní používají max(skutečnost, kanonický
forecast per pole) v sell<0 slotech.

Nové výsledky (horní meze): BA81 32 kWh +35/+46 Kč/den (výkon 6.25/12 kW);
KV1 25 kWh +20/+22 Kč/den (stará smlouva); HU1 fixní: 75 Kč/den bez sdílení,
149 Kč/den s EDC sdílením @1.5 Kč distribuce (sdílitelných ~49 kWh/den!);
HU1 spot: 372 Kč/den, sdílení +0. + docs/onboarding-wallbox-tc-2026-06.md.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-11 17:24:49 +02:00

287 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Studie navýšení kapacity baterie (BA81 → 32 kWh, KV1 → 25 kWh) nad REÁLNÝMI daty.
Metoda: perfect-hindsight MILP (rozšířený oracle z economics_report) nad skutečnou
PV výrobou, spotřebou a efektivními cenami z ems.audit_interval — po TÝDENNÍCH
oknech s navazujícím SoC mezi okny (zachytí vícedenní arbitráž). Pro každou
lokalitu tři konfigurace:
1. current — stávající baterie z DB,
2. upgrade/keepP — cílová kapacita, výkon beze změny (střídačem limitované),
3. upgrade/0.5C — cílová kapacita, výkon 0.5C jako dnes (BMS limitované).
Δ oracle cashflow = HORNÍ MEZ přínosu (dokonalá předpověď). Reálný plánovač
zachytí část — viz capture ratio v reportu. Extrapolace na rok je poctivě
označená (data jaro/léto: vysoká PV, záporné ceny → zima přinese méně).
BA81 specifikum: pole B (GEN mikroinvertory) lze fyzicky odpojit (gen cutoff)
→ model dovoluje shed PV B (jinak by nucený export při sell<0 zkresloval).
Spouštět odkudkoli: EMS_DB_DSN=… python3 scripts/harness/battery_upgrade_study.py
"""
from __future__ import annotations
import asyncio
import os
import sys
from dataclasses import dataclass, replace
from datetime import datetime, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo
import asyncpg
import pulp
PRAGUE = ZoneInfo("Europe/Prague")
INTERVAL_H = 0.25
STUDY = [
# (site_code, target_usable_kwh)
("BA81", 32.0),
("KV1", 25.0),
]
WINDOW_DAYS = 7
@dataclass
class Bat:
usable_wh: float
min_pct: float
max_pct: float
eff_c: float
eff_d: float
deg_czk_kwh: float
max_charge_w: float
max_discharge_w: float
@property
def min_wh(self) -> float:
return self.min_pct / 100.0 * self.usable_wh
@property
def max_wh(self) -> float:
return self.max_pct / 100.0 * self.usable_wh
@dataclass
class Slot:
ts: datetime
buy: float
sell: float
pv_a_wh: float
pv_b_wh: float
load_wh: float
def _dsn() -> str:
return os.environ.get(
"EMS_DB_DSN",
"postgresql://ems_user:dev_password@10.200.200.1:5432/ems",
)
async def _load_site(conn: asyncpg.Connection, code: str):
row = await conn.fetchrow(
"""
select s.id as site_id, b.usable_capacity_wh, b.min_soc_percent, b.max_soc_percent,
b.charge_efficiency, b.discharge_efficiency, b.degradation_cost_czk_kwh,
b.max_charge_c_rate, b.bms_max_charge_w, b.bms_max_discharge_w,
g.max_import_power_w, g.max_export_power_w, g.block_export_on_negative_sell,
coalesce(i.deye_gen_microinverter_cutoff_enabled, false) as gen_cutoff,
i.max_ac_output_w, i.max_battery_charge_w as inv_bat_charge_w,
i.max_battery_discharge_w as inv_bat_discharge_w
from ems.asset_battery b
join ems.site s on s.id = b.site_id
join ems.asset_inverter i on i.id = b.inverter_id
left join ems.site_grid_connection g on g.site_id = s.id
where s.code = $1
""",
code,
)
if row is None:
raise SystemExit(f"site {code} nenalezen")
inv_chg = float(row["inv_bat_charge_w"] or 10**9)
inv_dis = float(row["inv_bat_discharge_w"] or 10**9)
bat = Bat(
usable_wh=float(row["usable_capacity_wh"]),
min_pct=float(row["min_soc_percent"]),
max_pct=float(row["max_soc_percent"]),
eff_c=float(row["charge_efficiency"]),
eff_d=float(row["discharge_efficiency"]),
deg_czk_kwh=float(row["degradation_cost_czk_kwh"]),
max_charge_w=min(float(row["bms_max_charge_w"]), inv_chg),
max_discharge_w=min(float(row["bms_max_discharge_w"]), inv_dis),
)
grid = {
"imp_w": float(row["max_import_power_w"]),
"exp_w": float(row["max_export_power_w"]),
"block_neg": bool(row["block_export_on_negative_sell"]),
"pv_b_shed": bool(row["gen_cutoff"]),
"c_rate": float(row["max_charge_c_rate"] or 0.5),
"ac_w": float(row["max_ac_output_w"] or 10**9), # strop AC výstupu hybridu
"inv_bat_w": min(inv_chg, inv_dis), # strop bateriové cesty střídače
}
return int(row["site_id"]), bat, grid
async def _load_slots(conn: asyncpg.Connection, site_id: int) -> list[Slot]:
rows = await conn.fetch(
"""
select a.interval_start,
p.effective_buy_price_czk_kwh as buy,
p.effective_sell_price_czk_kwh as sell,
-- POTENCIÁL: při sell<0 lokalita škrtí výrobu (reg 340 / GEN cutoff),
-- telemetrie ji nevidí → použij max(skutečnost, predikce) per pole.
case when p.effective_sell_price_czk_kwh < 0
then greatest(coalesce(a.actual_pv_production_wh,0) - coalesce(a.pv_b_production_wh,0),
coalesce(fc.fc_a_wh, 0))
else greatest(0, coalesce(a.actual_pv_production_wh,0) - coalesce(a.pv_b_production_wh,0))
end as pv_a,
case when p.effective_sell_price_czk_kwh < 0
then greatest(coalesce(a.pv_b_production_wh,0), coalesce(fc.fc_b_wh, 0))
else coalesce(a.pv_b_production_wh,0)
end as pv_b,
coalesce(a.actual_load_consumption_wh,0) as load
from ems.audit_interval a
join ems.vw_site_effective_price p
on p.site_id = a.site_id and p.interval_start = a.interval_start
left join lateral (
select
sum(power_w) filter (where pa.controllable) * 0.25 as fc_a_wh,
sum(power_w) filter (where not pa.controllable) * 0.25 as fc_b_wh
from (
select distinct on (fpr.pv_array_id) fpi2.power_w, fpr.pv_array_id
from ems.forecast_pv_interval fpi2
join ems.forecast_pv_run fpr on fpr.id = fpi2.run_id
where fpi2.interval_start = a.interval_start
order by fpr.pv_array_id, fpr.created_at desc
) x
join ems.asset_pv_array pa on pa.id = x.pv_array_id and pa.site_id = a.site_id
) fc on true
where a.site_id = $1 and a.actual_load_consumption_wh is not null
order by a.interval_start
""",
site_id,
)
return [
Slot(r["interval_start"], float(r["buy"]), float(r["sell"]),
float(r["pv_a"]), float(r["pv_b"]), float(r["load"]))
for r in rows
]
def solve_window(slots: list[Slot], bat: Bat, grid: dict, soc0: float) -> tuple[float, float]:
"""Vrátí (cash+deg náklad okna v Kč, koncový SoC Wh). Min = lepší."""
n = len(slots)
prob = pulp.LpProblem("w", pulp.LpMinimize)
mc = bat.max_charge_w * INTERVAL_H
md = bat.max_discharge_w * INTERVAL_H
mi = grid["imp_w"] * INTERVAL_H
me = grid["exp_w"] * INTERVAL_H
gi = [pulp.LpVariable(f"gi{t}", 0, mi) for t in range(n)]
ge = [pulp.LpVariable(f"ge{t}", 0, me) for t in range(n)]
bc = [pulp.LpVariable(f"bc{t}", 0, mc) for t in range(n)]
bd = [pulp.LpVariable(f"bd{t}", 0, md) for t in range(n)]
pa = [pulp.LpVariable(f"pa{t}", 0, slots[t].pv_a_wh) for t in range(n)]
pb = [pulp.LpVariable(f"pb{t}", 0, slots[t].pv_b_wh) for t in range(n)]
soc = [pulp.LpVariable(f"s{t}", bat.min_wh, bat.max_wh) for t in range(n)]
y = [pulp.LpVariable(f"y{t}", cat="Binary") for t in range(n)]
for t in range(n):
s = slots[t]
pb_used = pb[t] if grid["pv_b_shed"] else s.pv_b_wh
prob += pa[t] + pb_used + gi[t] + bd[t] == s.load_wh + bc[t] + ge[t]
prev = soc0 if t == 0 else soc[t - 1]
prob += soc[t] == prev + bc[t] * bat.eff_c - bd[t] / bat.eff_d
prob += gi[t] <= mi * y[t]
prob += ge[t] <= me * (1 - y[t])
# AC strop hybridního střídače: export jde přes Deye kromě pole B (GEN)
prob += ge[t] - (pb[t] if grid["pv_b_shed"] else s.pv_b_wh) <= grid["ac_w"] * INTERVAL_H
if s.sell < 0 and grid["block_neg"]:
prob += ge[t] == 0
if s.buy < 0:
prob += ge[t] == 0
avg_buy = sum(s.buy for s in slots) / n
cash = pulp.lpSum(gi[t] / 1000 * slots[t].buy - ge[t] / 1000 * slots[t].sell for t in range(n))
deg = pulp.lpSum(0.5 * (bc[t] + bd[t]) / 1000 * bat.deg_czk_kwh for t in range(n))
prob += cash + deg - soc[n - 1] / 1000 * max(0.0, avg_buy)
solver = pulp.HiGHS_CMD(msg=False, timeLimit=30) if pulp.HiGHS_CMD().available() else pulp.PULP_CBC_CMD(msg=False)
prob.solve(solver)
if pulp.LpStatus[prob.status] != "Optimal":
raise RuntimeError(pulp.LpStatus[prob.status])
val = float(pulp.value(cash)) + float(pulp.value(deg))
return val, float(soc[n - 1].value())
def run_config(slots: list[Slot], bat: Bat, grid: dict) -> tuple[float, int]:
"""Sekvenčně po oknech, navazující SoC. Vrátí (Σ náklad Kč, počet dní)."""
total = 0.0
soc = bat.min_wh + 0.3 * (bat.max_wh - bat.min_wh)
days = 0
i = 0
while i < len(slots):
win = slots[i : i + WINDOW_DAYS * 96]
if len(win) < 96: # neúplný zbytek
break
cost, soc = solve_window(win, bat, grid, soc)
total += cost
days += len(win) // 96
i += len(win)
return total, days
async def main() -> None:
conn = await asyncpg.connect(_dsn())
try:
print("# Studie navýšení baterie — perfect-hindsight nad reálnými daty (audit_interval)")
print(f"# Okna {WINDOW_DAYS} dní s navazujícím SoC; Δ = horní mez ročního přínosu\n")
for code, target_kwh in STUDY:
site_id, bat_cur, grid = await _load_site(conn, code)
slots = await _load_slots(conn, site_id)
if not slots:
print(f"{code}: žádná audit data, přeskočeno")
continue
d0, d1 = slots[0].ts.date(), slots[-1].ts.date()
bat_up_keep = replace(bat_cur, usable_wh=target_kwh * 1000.0)
p_c = min(target_kwh * 1000.0 * grid["c_rate"], grid["ac_w"])
bat_up_c = replace(
bat_cur,
usable_wh=target_kwh * 1000.0,
max_charge_w=p_c,
max_discharge_w=p_c,
)
configs = [
(f"current {bat_cur.usable_wh/1000:.1f} kWh / {bat_cur.max_charge_w/1000:.2f} kW", bat_cur),
(f"upgrade {target_kwh:.0f} kWh / {bat_up_keep.max_charge_w/1000:.2f} kW (výkon beze změny)", bat_up_keep),
(f"upgrade {target_kwh:.0f} kWh / {bat_up_c.max_charge_w/1000:.2f} kW (0.5C, cap AC stridace)", bat_up_c),
]
print(f"## {code} ({d0}{d1}; block_neg={grid['block_neg']}, pv_b_shed={grid['pv_b_shed']}, export cap {grid['exp_w']/1000:.0f} kW)")
base_cost = None
base_days = None
for label, bat in configs:
cost, days = run_config(slots, bat, grid)
if base_cost is None:
base_cost, base_days = cost, days
print(f" {label:<55} {cost:>10.0f} Kč / {days} dní")
else:
d = base_cost - cost
per_day = d / max(1, days)
print(
f" {label:<55} {cost:>10.0f} Kč Δ {d:+.0f}"
f"({per_day:+.2f} Kč/den; rok ~{per_day*365*0.6:.0f}{per_day*365:.0f} Kč)"
)
print()
print("Pozn.: rok = Kč/den × 365; dolní odhad ×0.6 (zima: méně PV, menší spready).")
print("Horní mez (dokonalá předpověď) — reálný plánovač zachytí typicky 7090 %.")
finally:
await conn.close()
if __name__ == "__main__":
sys.exit(asyncio.run(main()))