Files
ems/scripts/harness/battery_upgrade_study.py
Dusan Vojacek d47f5f8b87 Studie investic: navýšení baterií BA81/KV1 + HU1 BESS (perfect hindsight nad reálnými daty)
battery_upgrade_study.py: oracle MILP po týdenních oknech s navazujícím SoC,
plné limity (síť, BMS, bateriová cesta střídače, AC strop hybridu, GEN 5 kW
mimo AC strop, gen-cutoff shed pole B). Výsledky viz docstring/report.

hu1_bess_study.py: čistý BESS 128 kWh / 36 kW / AC 40 kW; fixní (BA81) vs
spot (site 5) ceny; EDC sdílecí kanál z BA81 neg-sell přebytku s citlivostí
na distribuci. Klíčové: spot nákup ~7× výnosnější než fixní; EDC sdílení
přidává málo (fixní) až nic (spot — neg buy levnější než distribuce).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-11 17:07:04 +02:00

265 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Studie navýšení kapacity baterie (BA81 → 32 kWh, KV1 → 25 kWh) nad REÁLNÝMI daty.
Metoda: perfect-hindsight MILP (rozšířený oracle z economics_report) nad skutečnou
PV výrobou, spotřebou a efektivními cenami z ems.audit_interval — po TÝDENNÍCH
oknech s navazujícím SoC mezi okny (zachytí vícedenní arbitráž). Pro každou
lokalitu tři konfigurace:
1. current — stávající baterie z DB,
2. upgrade/keepP — cílová kapacita, výkon beze změny (střídačem limitované),
3. upgrade/0.5C — cílová kapacita, výkon 0.5C jako dnes (BMS limitované).
Δ oracle cashflow = HORNÍ MEZ přínosu (dokonalá předpověď). Reálný plánovač
zachytí část — viz capture ratio v reportu. Extrapolace na rok je poctivě
označená (data jaro/léto: vysoká PV, záporné ceny → zima přinese méně).
BA81 specifikum: pole B (GEN mikroinvertory) lze fyzicky odpojit (gen cutoff)
→ model dovoluje shed PV B (jinak by nucený export při sell<0 zkresloval).
Spouštět odkudkoli: EMS_DB_DSN=… python3 scripts/harness/battery_upgrade_study.py
"""
from __future__ import annotations
import asyncio
import os
import sys
from dataclasses import dataclass, replace
from datetime import datetime, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo
import asyncpg
import pulp
PRAGUE = ZoneInfo("Europe/Prague")
INTERVAL_H = 0.25
STUDY = [
# (site_code, target_usable_kwh)
("BA81", 32.0),
("KV1", 25.0),
]
WINDOW_DAYS = 7
@dataclass
class Bat:
usable_wh: float
min_pct: float
max_pct: float
eff_c: float
eff_d: float
deg_czk_kwh: float
max_charge_w: float
max_discharge_w: float
@property
def min_wh(self) -> float:
return self.min_pct / 100.0 * self.usable_wh
@property
def max_wh(self) -> float:
return self.max_pct / 100.0 * self.usable_wh
@dataclass
class Slot:
ts: datetime
buy: float
sell: float
pv_a_wh: float
pv_b_wh: float
load_wh: float
def _dsn() -> str:
return os.environ.get(
"EMS_DB_DSN",
"postgresql://ems_user:dev_password@10.200.200.1:5432/ems",
)
async def _load_site(conn: asyncpg.Connection, code: str):
row = await conn.fetchrow(
"""
select s.id as site_id, b.usable_capacity_wh, b.min_soc_percent, b.max_soc_percent,
b.charge_efficiency, b.discharge_efficiency, b.degradation_cost_czk_kwh,
b.max_charge_c_rate, b.bms_max_charge_w, b.bms_max_discharge_w,
g.max_import_power_w, g.max_export_power_w, g.block_export_on_negative_sell,
coalesce(i.deye_gen_microinverter_cutoff_enabled, false) as gen_cutoff,
i.max_ac_output_w, i.max_battery_charge_w as inv_bat_charge_w,
i.max_battery_discharge_w as inv_bat_discharge_w
from ems.asset_battery b
join ems.site s on s.id = b.site_id
join ems.asset_inverter i on i.id = b.inverter_id
left join ems.site_grid_connection g on g.site_id = s.id
where s.code = $1
""",
code,
)
if row is None:
raise SystemExit(f"site {code} nenalezen")
inv_chg = float(row["inv_bat_charge_w"] or 10**9)
inv_dis = float(row["inv_bat_discharge_w"] or 10**9)
bat = Bat(
usable_wh=float(row["usable_capacity_wh"]),
min_pct=float(row["min_soc_percent"]),
max_pct=float(row["max_soc_percent"]),
eff_c=float(row["charge_efficiency"]),
eff_d=float(row["discharge_efficiency"]),
deg_czk_kwh=float(row["degradation_cost_czk_kwh"]),
max_charge_w=min(float(row["bms_max_charge_w"]), inv_chg),
max_discharge_w=min(float(row["bms_max_discharge_w"]), inv_dis),
)
grid = {
"imp_w": float(row["max_import_power_w"]),
"exp_w": float(row["max_export_power_w"]),
"block_neg": bool(row["block_export_on_negative_sell"]),
"pv_b_shed": bool(row["gen_cutoff"]),
"c_rate": float(row["max_charge_c_rate"] or 0.5),
"ac_w": float(row["max_ac_output_w"] or 10**9), # strop AC výstupu hybridu
"inv_bat_w": min(inv_chg, inv_dis), # strop bateriové cesty střídače
}
return int(row["site_id"]), bat, grid
async def _load_slots(conn: asyncpg.Connection, site_id: int) -> list[Slot]:
rows = await conn.fetch(
"""
select a.interval_start,
p.effective_buy_price_czk_kwh as buy,
p.effective_sell_price_czk_kwh as sell,
greatest(0, coalesce(a.actual_pv_production_wh,0) - coalesce(a.pv_b_production_wh,0)) as pv_a,
coalesce(a.pv_b_production_wh,0) as pv_b,
coalesce(a.actual_load_consumption_wh,0) as load
from ems.audit_interval a
join ems.vw_site_effective_price p
on p.site_id = a.site_id and p.interval_start = a.interval_start
where a.site_id = $1 and a.actual_load_consumption_wh is not null
order by a.interval_start
""",
site_id,
)
return [
Slot(r["interval_start"], float(r["buy"]), float(r["sell"]),
float(r["pv_a"]), float(r["pv_b"]), float(r["load"]))
for r in rows
]
def solve_window(slots: list[Slot], bat: Bat, grid: dict, soc0: float) -> tuple[float, float]:
"""Vrátí (cash+deg náklad okna v Kč, koncový SoC Wh). Min = lepší."""
n = len(slots)
prob = pulp.LpProblem("w", pulp.LpMinimize)
mc = bat.max_charge_w * INTERVAL_H
md = bat.max_discharge_w * INTERVAL_H
mi = grid["imp_w"] * INTERVAL_H
me = grid["exp_w"] * INTERVAL_H
gi = [pulp.LpVariable(f"gi{t}", 0, mi) for t in range(n)]
ge = [pulp.LpVariable(f"ge{t}", 0, me) for t in range(n)]
bc = [pulp.LpVariable(f"bc{t}", 0, mc) for t in range(n)]
bd = [pulp.LpVariable(f"bd{t}", 0, md) for t in range(n)]
pa = [pulp.LpVariable(f"pa{t}", 0, slots[t].pv_a_wh) for t in range(n)]
pb = [pulp.LpVariable(f"pb{t}", 0, slots[t].pv_b_wh) for t in range(n)]
soc = [pulp.LpVariable(f"s{t}", bat.min_wh, bat.max_wh) for t in range(n)]
y = [pulp.LpVariable(f"y{t}", cat="Binary") for t in range(n)]
for t in range(n):
s = slots[t]
pb_used = pb[t] if grid["pv_b_shed"] else s.pv_b_wh
prob += pa[t] + pb_used + gi[t] + bd[t] == s.load_wh + bc[t] + ge[t]
prev = soc0 if t == 0 else soc[t - 1]
prob += soc[t] == prev + bc[t] * bat.eff_c - bd[t] / bat.eff_d
prob += gi[t] <= mi * y[t]
prob += ge[t] <= me * (1 - y[t])
# AC strop hybridního střídače: export jde přes Deye kromě pole B (GEN)
prob += ge[t] - (pb[t] if grid["pv_b_shed"] else s.pv_b_wh) <= grid["ac_w"] * INTERVAL_H
if s.sell < 0 and grid["block_neg"]:
prob += ge[t] == 0
if s.buy < 0:
prob += ge[t] == 0
avg_buy = sum(s.buy for s in slots) / n
cash = pulp.lpSum(gi[t] / 1000 * slots[t].buy - ge[t] / 1000 * slots[t].sell for t in range(n))
deg = pulp.lpSum(0.5 * (bc[t] + bd[t]) / 1000 * bat.deg_czk_kwh for t in range(n))
prob += cash + deg - soc[n - 1] / 1000 * max(0.0, avg_buy)
solver = pulp.HiGHS_CMD(msg=False, timeLimit=30) if pulp.HiGHS_CMD().available() else pulp.PULP_CBC_CMD(msg=False)
prob.solve(solver)
if pulp.LpStatus[prob.status] != "Optimal":
raise RuntimeError(pulp.LpStatus[prob.status])
val = float(pulp.value(cash)) + float(pulp.value(deg))
return val, float(soc[n - 1].value())
def run_config(slots: list[Slot], bat: Bat, grid: dict) -> tuple[float, int]:
"""Sekvenčně po oknech, navazující SoC. Vrátí (Σ náklad Kč, počet dní)."""
total = 0.0
soc = bat.min_wh + 0.3 * (bat.max_wh - bat.min_wh)
days = 0
i = 0
while i < len(slots):
win = slots[i : i + WINDOW_DAYS * 96]
if len(win) < 96: # neúplný zbytek
break
cost, soc = solve_window(win, bat, grid, soc)
total += cost
days += len(win) // 96
i += len(win)
return total, days
async def main() -> None:
conn = await asyncpg.connect(_dsn())
try:
print("# Studie navýšení baterie — perfect-hindsight nad reálnými daty (audit_interval)")
print(f"# Okna {WINDOW_DAYS} dní s navazujícím SoC; Δ = horní mez ročního přínosu\n")
for code, target_kwh in STUDY:
site_id, bat_cur, grid = await _load_site(conn, code)
slots = await _load_slots(conn, site_id)
if not slots:
print(f"{code}: žádná audit data, přeskočeno")
continue
d0, d1 = slots[0].ts.date(), slots[-1].ts.date()
bat_up_keep = replace(bat_cur, usable_wh=target_kwh * 1000.0)
p_c = min(target_kwh * 1000.0 * grid["c_rate"], grid["ac_w"])
bat_up_c = replace(
bat_cur,
usable_wh=target_kwh * 1000.0,
max_charge_w=p_c,
max_discharge_w=p_c,
)
configs = [
(f"current {bat_cur.usable_wh/1000:.1f} kWh / {bat_cur.max_charge_w/1000:.2f} kW", bat_cur),
(f"upgrade {target_kwh:.0f} kWh / {bat_up_keep.max_charge_w/1000:.2f} kW (výkon beze změny)", bat_up_keep),
(f"upgrade {target_kwh:.0f} kWh / {bat_up_c.max_charge_w/1000:.2f} kW (0.5C, cap AC stridace)", bat_up_c),
]
print(f"## {code} ({d0}{d1}; block_neg={grid['block_neg']}, pv_b_shed={grid['pv_b_shed']}, export cap {grid['exp_w']/1000:.0f} kW)")
base_cost = None
base_days = None
for label, bat in configs:
cost, days = run_config(slots, bat, grid)
if base_cost is None:
base_cost, base_days = cost, days
print(f" {label:<55} {cost:>10.0f} Kč / {days} dní")
else:
d = base_cost - cost
per_day = d / max(1, days)
print(
f" {label:<55} {cost:>10.0f} Kč Δ {d:+.0f}"
f"({per_day:+.2f} Kč/den; rok ~{per_day*365*0.6:.0f}{per_day*365:.0f} Kč)"
)
print()
print("Pozn.: rok = Kč/den × 365; dolní odhad ×0.6 (zima: méně PV, menší spready).")
print("Horní mez (dokonalá předpověď) — reálný plánovač zachytí typicky 7090 %.")
finally:
await conn.close()
if __name__ == "__main__":
sys.exit(asyncio.run(main()))