Klíčová oprava (postřeh uživatele): při sell<0 lokality škrtí výrobu (reg 340 / GEN cutoff) — telemetrie ukázala 357 kWh, predikce 1879 kWh (96 % minut v derating). Studie nyní používají max(skutečnost, kanonický forecast per pole) v sell<0 slotech. Nové výsledky (horní meze): BA81 32 kWh +35/+46 Kč/den (výkon 6.25/12 kW); KV1 25 kWh +20/+22 Kč/den (stará smlouva); HU1 fixní: 75 Kč/den bez sdílení, 149 Kč/den s EDC sdílením @1.5 Kč distribuce (sdílitelných ~49 kWh/den!); HU1 spot: 372 Kč/den, sdílení +0. + docs/onboarding-wallbox-tc-2026-06.md. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
287 lines
12 KiB
Python
287 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Studie navýšení kapacity baterie (BA81 → 32 kWh, KV1 → 25 kWh) nad REÁLNÝMI daty.
|
||
|
||
Metoda: perfect-hindsight MILP (rozšířený oracle z economics_report) nad skutečnou
|
||
PV výrobou, spotřebou a efektivními cenami z ems.audit_interval — po TÝDENNÍCH
|
||
oknech s navazujícím SoC mezi okny (zachytí vícedenní arbitráž). Pro každou
|
||
lokalitu tři konfigurace:
|
||
1. current — stávající baterie z DB,
|
||
2. upgrade/keepP — cílová kapacita, výkon beze změny (střídačem limitované),
|
||
3. upgrade/0.5C — cílová kapacita, výkon 0.5C jako dnes (BMS limitované).
|
||
|
||
Δ oracle cashflow = HORNÍ MEZ přínosu (dokonalá předpověď). Reálný plánovač
|
||
zachytí část — viz capture ratio v reportu. Extrapolace na rok je poctivě
|
||
označená (data jaro/léto: vysoká PV, záporné ceny → zima přinese méně).
|
||
|
||
BA81 specifikum: pole B (GEN mikroinvertory) lze fyzicky odpojit (gen cutoff)
|
||
→ model dovoluje shed PV B (jinak by nucený export při sell<0 zkresloval).
|
||
|
||
Spouštět odkudkoli: EMS_DB_DSN=… python3 scripts/harness/battery_upgrade_study.py
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import os
|
||
import sys
|
||
from dataclasses import dataclass, replace
|
||
from datetime import datetime, timedelta
|
||
from pathlib import Path
|
||
from zoneinfo import ZoneInfo
|
||
|
||
import asyncpg
|
||
import pulp
|
||
|
||
PRAGUE = ZoneInfo("Europe/Prague")
|
||
INTERVAL_H = 0.25
|
||
|
||
STUDY = [
|
||
# (site_code, target_usable_kwh)
|
||
("BA81", 32.0),
|
||
("KV1", 25.0),
|
||
]
|
||
WINDOW_DAYS = 7
|
||
|
||
|
||
@dataclass
|
||
class Bat:
|
||
usable_wh: float
|
||
min_pct: float
|
||
max_pct: float
|
||
eff_c: float
|
||
eff_d: float
|
||
deg_czk_kwh: float
|
||
max_charge_w: float
|
||
max_discharge_w: float
|
||
|
||
@property
|
||
def min_wh(self) -> float:
|
||
return self.min_pct / 100.0 * self.usable_wh
|
||
|
||
@property
|
||
def max_wh(self) -> float:
|
||
return self.max_pct / 100.0 * self.usable_wh
|
||
|
||
|
||
@dataclass
|
||
class Slot:
|
||
ts: datetime
|
||
buy: float
|
||
sell: float
|
||
pv_a_wh: float
|
||
pv_b_wh: float
|
||
load_wh: float
|
||
|
||
|
||
def _dsn() -> str:
|
||
return os.environ.get(
|
||
"EMS_DB_DSN",
|
||
"postgresql://ems_user:dev_password@10.200.200.1:5432/ems",
|
||
)
|
||
|
||
|
||
async def _load_site(conn: asyncpg.Connection, code: str):
|
||
row = await conn.fetchrow(
|
||
"""
|
||
select s.id as site_id, b.usable_capacity_wh, b.min_soc_percent, b.max_soc_percent,
|
||
b.charge_efficiency, b.discharge_efficiency, b.degradation_cost_czk_kwh,
|
||
b.max_charge_c_rate, b.bms_max_charge_w, b.bms_max_discharge_w,
|
||
g.max_import_power_w, g.max_export_power_w, g.block_export_on_negative_sell,
|
||
coalesce(i.deye_gen_microinverter_cutoff_enabled, false) as gen_cutoff,
|
||
i.max_ac_output_w, i.max_battery_charge_w as inv_bat_charge_w,
|
||
i.max_battery_discharge_w as inv_bat_discharge_w
|
||
from ems.asset_battery b
|
||
join ems.site s on s.id = b.site_id
|
||
join ems.asset_inverter i on i.id = b.inverter_id
|
||
left join ems.site_grid_connection g on g.site_id = s.id
|
||
where s.code = $1
|
||
""",
|
||
code,
|
||
)
|
||
if row is None:
|
||
raise SystemExit(f"site {code} nenalezen")
|
||
inv_chg = float(row["inv_bat_charge_w"] or 10**9)
|
||
inv_dis = float(row["inv_bat_discharge_w"] or 10**9)
|
||
bat = Bat(
|
||
usable_wh=float(row["usable_capacity_wh"]),
|
||
min_pct=float(row["min_soc_percent"]),
|
||
max_pct=float(row["max_soc_percent"]),
|
||
eff_c=float(row["charge_efficiency"]),
|
||
eff_d=float(row["discharge_efficiency"]),
|
||
deg_czk_kwh=float(row["degradation_cost_czk_kwh"]),
|
||
max_charge_w=min(float(row["bms_max_charge_w"]), inv_chg),
|
||
max_discharge_w=min(float(row["bms_max_discharge_w"]), inv_dis),
|
||
)
|
||
grid = {
|
||
"imp_w": float(row["max_import_power_w"]),
|
||
"exp_w": float(row["max_export_power_w"]),
|
||
"block_neg": bool(row["block_export_on_negative_sell"]),
|
||
"pv_b_shed": bool(row["gen_cutoff"]),
|
||
"c_rate": float(row["max_charge_c_rate"] or 0.5),
|
||
"ac_w": float(row["max_ac_output_w"] or 10**9), # strop AC výstupu hybridu
|
||
"inv_bat_w": min(inv_chg, inv_dis), # strop bateriové cesty střídače
|
||
}
|
||
return int(row["site_id"]), bat, grid
|
||
|
||
|
||
async def _load_slots(conn: asyncpg.Connection, site_id: int) -> list[Slot]:
|
||
rows = await conn.fetch(
|
||
"""
|
||
select a.interval_start,
|
||
p.effective_buy_price_czk_kwh as buy,
|
||
p.effective_sell_price_czk_kwh as sell,
|
||
-- POTENCIÁL: při sell<0 lokalita škrtí výrobu (reg 340 / GEN cutoff),
|
||
-- telemetrie ji nevidí → použij max(skutečnost, predikce) per pole.
|
||
case when p.effective_sell_price_czk_kwh < 0
|
||
then greatest(coalesce(a.actual_pv_production_wh,0) - coalesce(a.pv_b_production_wh,0),
|
||
coalesce(fc.fc_a_wh, 0))
|
||
else greatest(0, coalesce(a.actual_pv_production_wh,0) - coalesce(a.pv_b_production_wh,0))
|
||
end as pv_a,
|
||
case when p.effective_sell_price_czk_kwh < 0
|
||
then greatest(coalesce(a.pv_b_production_wh,0), coalesce(fc.fc_b_wh, 0))
|
||
else coalesce(a.pv_b_production_wh,0)
|
||
end as pv_b,
|
||
coalesce(a.actual_load_consumption_wh,0) as load
|
||
from ems.audit_interval a
|
||
join ems.vw_site_effective_price p
|
||
on p.site_id = a.site_id and p.interval_start = a.interval_start
|
||
left join lateral (
|
||
select
|
||
sum(power_w) filter (where pa.controllable) * 0.25 as fc_a_wh,
|
||
sum(power_w) filter (where not pa.controllable) * 0.25 as fc_b_wh
|
||
from (
|
||
select distinct on (fpr.pv_array_id) fpi2.power_w, fpr.pv_array_id
|
||
from ems.forecast_pv_interval fpi2
|
||
join ems.forecast_pv_run fpr on fpr.id = fpi2.run_id
|
||
where fpi2.interval_start = a.interval_start
|
||
order by fpr.pv_array_id, fpr.created_at desc
|
||
) x
|
||
join ems.asset_pv_array pa on pa.id = x.pv_array_id and pa.site_id = a.site_id
|
||
) fc on true
|
||
where a.site_id = $1 and a.actual_load_consumption_wh is not null
|
||
order by a.interval_start
|
||
""",
|
||
site_id,
|
||
)
|
||
return [
|
||
Slot(r["interval_start"], float(r["buy"]), float(r["sell"]),
|
||
float(r["pv_a"]), float(r["pv_b"]), float(r["load"]))
|
||
for r in rows
|
||
]
|
||
|
||
|
||
def solve_window(slots: list[Slot], bat: Bat, grid: dict, soc0: float) -> tuple[float, float]:
|
||
"""Vrátí (cash+deg náklad okna v Kč, koncový SoC Wh). Min = lepší."""
|
||
n = len(slots)
|
||
prob = pulp.LpProblem("w", pulp.LpMinimize)
|
||
mc = bat.max_charge_w * INTERVAL_H
|
||
md = bat.max_discharge_w * INTERVAL_H
|
||
mi = grid["imp_w"] * INTERVAL_H
|
||
me = grid["exp_w"] * INTERVAL_H
|
||
|
||
gi = [pulp.LpVariable(f"gi{t}", 0, mi) for t in range(n)]
|
||
ge = [pulp.LpVariable(f"ge{t}", 0, me) for t in range(n)]
|
||
bc = [pulp.LpVariable(f"bc{t}", 0, mc) for t in range(n)]
|
||
bd = [pulp.LpVariable(f"bd{t}", 0, md) for t in range(n)]
|
||
pa = [pulp.LpVariable(f"pa{t}", 0, slots[t].pv_a_wh) for t in range(n)]
|
||
pb = [pulp.LpVariable(f"pb{t}", 0, slots[t].pv_b_wh) for t in range(n)]
|
||
soc = [pulp.LpVariable(f"s{t}", bat.min_wh, bat.max_wh) for t in range(n)]
|
||
y = [pulp.LpVariable(f"y{t}", cat="Binary") for t in range(n)]
|
||
|
||
for t in range(n):
|
||
s = slots[t]
|
||
pb_used = pb[t] if grid["pv_b_shed"] else s.pv_b_wh
|
||
prob += pa[t] + pb_used + gi[t] + bd[t] == s.load_wh + bc[t] + ge[t]
|
||
prev = soc0 if t == 0 else soc[t - 1]
|
||
prob += soc[t] == prev + bc[t] * bat.eff_c - bd[t] / bat.eff_d
|
||
prob += gi[t] <= mi * y[t]
|
||
prob += ge[t] <= me * (1 - y[t])
|
||
# AC strop hybridního střídače: export jde přes Deye kromě pole B (GEN)
|
||
prob += ge[t] - (pb[t] if grid["pv_b_shed"] else s.pv_b_wh) <= grid["ac_w"] * INTERVAL_H
|
||
if s.sell < 0 and grid["block_neg"]:
|
||
prob += ge[t] == 0
|
||
if s.buy < 0:
|
||
prob += ge[t] == 0
|
||
|
||
avg_buy = sum(s.buy for s in slots) / n
|
||
cash = pulp.lpSum(gi[t] / 1000 * slots[t].buy - ge[t] / 1000 * slots[t].sell for t in range(n))
|
||
deg = pulp.lpSum(0.5 * (bc[t] + bd[t]) / 1000 * bat.deg_czk_kwh for t in range(n))
|
||
prob += cash + deg - soc[n - 1] / 1000 * max(0.0, avg_buy)
|
||
|
||
solver = pulp.HiGHS_CMD(msg=False, timeLimit=30) if pulp.HiGHS_CMD().available() else pulp.PULP_CBC_CMD(msg=False)
|
||
prob.solve(solver)
|
||
if pulp.LpStatus[prob.status] != "Optimal":
|
||
raise RuntimeError(pulp.LpStatus[prob.status])
|
||
val = float(pulp.value(cash)) + float(pulp.value(deg))
|
||
return val, float(soc[n - 1].value())
|
||
|
||
|
||
def run_config(slots: list[Slot], bat: Bat, grid: dict) -> tuple[float, int]:
|
||
"""Sekvenčně po oknech, navazující SoC. Vrátí (Σ náklad Kč, počet dní)."""
|
||
total = 0.0
|
||
soc = bat.min_wh + 0.3 * (bat.max_wh - bat.min_wh)
|
||
days = 0
|
||
i = 0
|
||
while i < len(slots):
|
||
win = slots[i : i + WINDOW_DAYS * 96]
|
||
if len(win) < 96: # neúplný zbytek
|
||
break
|
||
cost, soc = solve_window(win, bat, grid, soc)
|
||
total += cost
|
||
days += len(win) // 96
|
||
i += len(win)
|
||
return total, days
|
||
|
||
|
||
async def main() -> None:
|
||
conn = await asyncpg.connect(_dsn())
|
||
try:
|
||
print("# Studie navýšení baterie — perfect-hindsight nad reálnými daty (audit_interval)")
|
||
print(f"# Okna {WINDOW_DAYS} dní s navazujícím SoC; Δ = horní mez ročního přínosu\n")
|
||
for code, target_kwh in STUDY:
|
||
site_id, bat_cur, grid = await _load_site(conn, code)
|
||
slots = await _load_slots(conn, site_id)
|
||
if not slots:
|
||
print(f"{code}: žádná audit data, přeskočeno")
|
||
continue
|
||
d0, d1 = slots[0].ts.date(), slots[-1].ts.date()
|
||
|
||
bat_up_keep = replace(bat_cur, usable_wh=target_kwh * 1000.0)
|
||
p_c = min(target_kwh * 1000.0 * grid["c_rate"], grid["ac_w"])
|
||
bat_up_c = replace(
|
||
bat_cur,
|
||
usable_wh=target_kwh * 1000.0,
|
||
max_charge_w=p_c,
|
||
max_discharge_w=p_c,
|
||
)
|
||
configs = [
|
||
(f"current {bat_cur.usable_wh/1000:.1f} kWh / {bat_cur.max_charge_w/1000:.2f} kW", bat_cur),
|
||
(f"upgrade {target_kwh:.0f} kWh / {bat_up_keep.max_charge_w/1000:.2f} kW (výkon beze změny)", bat_up_keep),
|
||
(f"upgrade {target_kwh:.0f} kWh / {bat_up_c.max_charge_w/1000:.2f} kW (0.5C, cap AC stridace)", bat_up_c),
|
||
]
|
||
print(f"## {code} ({d0} … {d1}; block_neg={grid['block_neg']}, pv_b_shed={grid['pv_b_shed']}, export cap {grid['exp_w']/1000:.0f} kW)")
|
||
base_cost = None
|
||
base_days = None
|
||
for label, bat in configs:
|
||
cost, days = run_config(slots, bat, grid)
|
||
if base_cost is None:
|
||
base_cost, base_days = cost, days
|
||
print(f" {label:<55} {cost:>10.0f} Kč / {days} dní")
|
||
else:
|
||
d = base_cost - cost
|
||
per_day = d / max(1, days)
|
||
print(
|
||
f" {label:<55} {cost:>10.0f} Kč Δ {d:+.0f} Kč "
|
||
f"({per_day:+.2f} Kč/den; rok ~{per_day*365*0.6:.0f}–{per_day*365:.0f} Kč)"
|
||
)
|
||
print()
|
||
print("Pozn.: rok = Kč/den × 365; dolní odhad ×0.6 (zima: méně PV, menší spready).")
|
||
print("Horní mez (dokonalá předpověď) — reálný plánovač zachytí typicky 70–90 %.")
|
||
finally:
|
||
await conn.close()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
sys.exit(asyncio.run(main()))
|