diff --git a/scripts/harness/battery_upgrade_study.py b/scripts/harness/battery_upgrade_study.py new file mode 100644 index 0000000..fafb32e --- /dev/null +++ b/scripts/harness/battery_upgrade_study.py @@ -0,0 +1,264 @@ +#!/usr/bin/env python3 +""" +Studie navýšení kapacity baterie (BA81 → 32 kWh, KV1 → 25 kWh) nad REÁLNÝMI daty. + +Metoda: perfect-hindsight MILP (rozšířený oracle z economics_report) nad skutečnou +PV výrobou, spotřebou a efektivními cenami z ems.audit_interval — po TÝDENNÍCH +oknech s navazujícím SoC mezi okny (zachytí vícedenní arbitráž). Pro každou +lokalitu tři konfigurace: + 1. current — stávající baterie z DB, + 2. upgrade/keepP — cílová kapacita, výkon beze změny (střídačem limitované), + 3. upgrade/0.5C — cílová kapacita, výkon 0.5C jako dnes (BMS limitované). + +Δ oracle cashflow = HORNÍ MEZ přínosu (dokonalá předpověď). Reálný plánovač +zachytí část — viz capture ratio v reportu. Extrapolace na rok je poctivě +označená (data jaro/léto: vysoká PV, záporné ceny → zima přinese méně). + +BA81 specifikum: pole B (GEN mikroinvertory) lze fyzicky odpojit (gen cutoff) +→ model dovoluje shed PV B (jinak by nucený export při sell<0 zkresloval). + +Spouštět odkudkoli: EMS_DB_DSN=… python3 scripts/harness/battery_upgrade_study.py +""" + +from __future__ import annotations + +import asyncio +import os +import sys +from dataclasses import dataclass, replace +from datetime import datetime, timedelta +from pathlib import Path +from zoneinfo import ZoneInfo + +import asyncpg +import pulp + +PRAGUE = ZoneInfo("Europe/Prague") +INTERVAL_H = 0.25 + +STUDY = [ + # (site_code, target_usable_kwh) + ("BA81", 32.0), + ("KV1", 25.0), +] +WINDOW_DAYS = 7 + + +@dataclass +class Bat: + usable_wh: float + min_pct: float + max_pct: float + eff_c: float + eff_d: float + deg_czk_kwh: float + max_charge_w: float + max_discharge_w: float + + @property + def min_wh(self) -> float: + return self.min_pct / 100.0 * self.usable_wh + + @property + def max_wh(self) -> float: + return self.max_pct / 100.0 * self.usable_wh + + +@dataclass +class Slot: + ts: datetime + buy: float + sell: float + pv_a_wh: float + pv_b_wh: float + load_wh: float + + +def _dsn() -> str: + return os.environ.get( + "EMS_DB_DSN", + "postgresql://ems_user:dev_password@10.200.200.1:5432/ems", + ) + + +async def _load_site(conn: asyncpg.Connection, code: str): + row = await conn.fetchrow( + """ + select s.id as site_id, b.usable_capacity_wh, b.min_soc_percent, b.max_soc_percent, + b.charge_efficiency, b.discharge_efficiency, b.degradation_cost_czk_kwh, + b.max_charge_c_rate, b.bms_max_charge_w, b.bms_max_discharge_w, + g.max_import_power_w, g.max_export_power_w, g.block_export_on_negative_sell, + coalesce(i.deye_gen_microinverter_cutoff_enabled, false) as gen_cutoff, + i.max_ac_output_w, i.max_battery_charge_w as inv_bat_charge_w, + i.max_battery_discharge_w as inv_bat_discharge_w + from ems.asset_battery b + join ems.site s on s.id = b.site_id + join ems.asset_inverter i on i.id = b.inverter_id + left join ems.site_grid_connection g on g.site_id = s.id + where s.code = $1 + """, + code, + ) + if row is None: + raise SystemExit(f"site {code} nenalezen") + inv_chg = float(row["inv_bat_charge_w"] or 10**9) + inv_dis = float(row["inv_bat_discharge_w"] or 10**9) + bat = Bat( + usable_wh=float(row["usable_capacity_wh"]), + min_pct=float(row["min_soc_percent"]), + max_pct=float(row["max_soc_percent"]), + eff_c=float(row["charge_efficiency"]), + eff_d=float(row["discharge_efficiency"]), + deg_czk_kwh=float(row["degradation_cost_czk_kwh"]), + max_charge_w=min(float(row["bms_max_charge_w"]), inv_chg), + max_discharge_w=min(float(row["bms_max_discharge_w"]), inv_dis), + ) + grid = { + "imp_w": float(row["max_import_power_w"]), + "exp_w": float(row["max_export_power_w"]), + "block_neg": bool(row["block_export_on_negative_sell"]), + "pv_b_shed": bool(row["gen_cutoff"]), + "c_rate": float(row["max_charge_c_rate"] or 0.5), + "ac_w": float(row["max_ac_output_w"] or 10**9), # strop AC výstupu hybridu + "inv_bat_w": min(inv_chg, inv_dis), # strop bateriové cesty střídače + } + return int(row["site_id"]), bat, grid + + +async def _load_slots(conn: asyncpg.Connection, site_id: int) -> list[Slot]: + rows = await conn.fetch( + """ + select a.interval_start, + p.effective_buy_price_czk_kwh as buy, + p.effective_sell_price_czk_kwh as sell, + greatest(0, coalesce(a.actual_pv_production_wh,0) - coalesce(a.pv_b_production_wh,0)) as pv_a, + coalesce(a.pv_b_production_wh,0) as pv_b, + coalesce(a.actual_load_consumption_wh,0) as load + from ems.audit_interval a + join ems.vw_site_effective_price p + on p.site_id = a.site_id and p.interval_start = a.interval_start + where a.site_id = $1 and a.actual_load_consumption_wh is not null + order by a.interval_start + """, + site_id, + ) + return [ + Slot(r["interval_start"], float(r["buy"]), float(r["sell"]), + float(r["pv_a"]), float(r["pv_b"]), float(r["load"])) + for r in rows + ] + + +def solve_window(slots: list[Slot], bat: Bat, grid: dict, soc0: float) -> tuple[float, float]: + """Vrátí (cash+deg náklad okna v Kč, koncový SoC Wh). Min = lepší.""" + n = len(slots) + prob = pulp.LpProblem("w", pulp.LpMinimize) + mc = bat.max_charge_w * INTERVAL_H + md = bat.max_discharge_w * INTERVAL_H + mi = grid["imp_w"] * INTERVAL_H + me = grid["exp_w"] * INTERVAL_H + + gi = [pulp.LpVariable(f"gi{t}", 0, mi) for t in range(n)] + ge = [pulp.LpVariable(f"ge{t}", 0, me) for t in range(n)] + bc = [pulp.LpVariable(f"bc{t}", 0, mc) for t in range(n)] + bd = [pulp.LpVariable(f"bd{t}", 0, md) for t in range(n)] + pa = [pulp.LpVariable(f"pa{t}", 0, slots[t].pv_a_wh) for t in range(n)] + pb = [pulp.LpVariable(f"pb{t}", 0, slots[t].pv_b_wh) for t in range(n)] + soc = [pulp.LpVariable(f"s{t}", bat.min_wh, bat.max_wh) for t in range(n)] + y = [pulp.LpVariable(f"y{t}", cat="Binary") for t in range(n)] + + for t in range(n): + s = slots[t] + pb_used = pb[t] if grid["pv_b_shed"] else s.pv_b_wh + prob += pa[t] + pb_used + gi[t] + bd[t] == s.load_wh + bc[t] + ge[t] + prev = soc0 if t == 0 else soc[t - 1] + prob += soc[t] == prev + bc[t] * bat.eff_c - bd[t] / bat.eff_d + prob += gi[t] <= mi * y[t] + prob += ge[t] <= me * (1 - y[t]) + # AC strop hybridního střídače: export jde přes Deye kromě pole B (GEN) + prob += ge[t] - (pb[t] if grid["pv_b_shed"] else s.pv_b_wh) <= grid["ac_w"] * INTERVAL_H + if s.sell < 0 and grid["block_neg"]: + prob += ge[t] == 0 + if s.buy < 0: + prob += ge[t] == 0 + + avg_buy = sum(s.buy for s in slots) / n + cash = pulp.lpSum(gi[t] / 1000 * slots[t].buy - ge[t] / 1000 * slots[t].sell for t in range(n)) + deg = pulp.lpSum(0.5 * (bc[t] + bd[t]) / 1000 * bat.deg_czk_kwh for t in range(n)) + prob += cash + deg - soc[n - 1] / 1000 * max(0.0, avg_buy) + + solver = pulp.HiGHS_CMD(msg=False, timeLimit=30) if pulp.HiGHS_CMD().available() else pulp.PULP_CBC_CMD(msg=False) + prob.solve(solver) + if pulp.LpStatus[prob.status] != "Optimal": + raise RuntimeError(pulp.LpStatus[prob.status]) + val = float(pulp.value(cash)) + float(pulp.value(deg)) + return val, float(soc[n - 1].value()) + + +def run_config(slots: list[Slot], bat: Bat, grid: dict) -> tuple[float, int]: + """Sekvenčně po oknech, navazující SoC. Vrátí (Σ náklad Kč, počet dní).""" + total = 0.0 + soc = bat.min_wh + 0.3 * (bat.max_wh - bat.min_wh) + days = 0 + i = 0 + while i < len(slots): + win = slots[i : i + WINDOW_DAYS * 96] + if len(win) < 96: # neúplný zbytek + break + cost, soc = solve_window(win, bat, grid, soc) + total += cost + days += len(win) // 96 + i += len(win) + return total, days + + +async def main() -> None: + conn = await asyncpg.connect(_dsn()) + try: + print("# Studie navýšení baterie — perfect-hindsight nad reálnými daty (audit_interval)") + print(f"# Okna {WINDOW_DAYS} dní s navazujícím SoC; Δ = horní mez ročního přínosu\n") + for code, target_kwh in STUDY: + site_id, bat_cur, grid = await _load_site(conn, code) + slots = await _load_slots(conn, site_id) + if not slots: + print(f"{code}: žádná audit data, přeskočeno") + continue + d0, d1 = slots[0].ts.date(), slots[-1].ts.date() + + bat_up_keep = replace(bat_cur, usable_wh=target_kwh * 1000.0) + p_c = min(target_kwh * 1000.0 * grid["c_rate"], grid["ac_w"]) + bat_up_c = replace( + bat_cur, + usable_wh=target_kwh * 1000.0, + max_charge_w=p_c, + max_discharge_w=p_c, + ) + configs = [ + (f"current {bat_cur.usable_wh/1000:.1f} kWh / {bat_cur.max_charge_w/1000:.2f} kW", bat_cur), + (f"upgrade {target_kwh:.0f} kWh / {bat_up_keep.max_charge_w/1000:.2f} kW (výkon beze změny)", bat_up_keep), + (f"upgrade {target_kwh:.0f} kWh / {bat_up_c.max_charge_w/1000:.2f} kW (0.5C, cap AC stridace)", bat_up_c), + ] + print(f"## {code} ({d0} … {d1}; block_neg={grid['block_neg']}, pv_b_shed={grid['pv_b_shed']}, export cap {grid['exp_w']/1000:.0f} kW)") + base_cost = None + base_days = None + for label, bat in configs: + cost, days = run_config(slots, bat, grid) + if base_cost is None: + base_cost, base_days = cost, days + print(f" {label:<55} {cost:>10.0f} Kč / {days} dní") + else: + d = base_cost - cost + per_day = d / max(1, days) + print( + f" {label:<55} {cost:>10.0f} Kč Δ {d:+.0f} Kč " + f"({per_day:+.2f} Kč/den; rok ~{per_day*365*0.6:.0f}–{per_day*365:.0f} Kč)" + ) + print() + print("Pozn.: rok = Kč/den × 365; dolní odhad ×0.6 (zima: méně PV, menší spready).") + print("Horní mez (dokonalá předpověď) — reálný plánovač zachytí typicky 70–90 %.") + finally: + await conn.close() + + +if __name__ == "__main__": + sys.exit(asyncio.run(main())) diff --git a/scripts/harness/hu1_bess_study.py b/scripts/harness/hu1_bess_study.py new file mode 100644 index 0000000..f89c11b --- /dev/null +++ b/scripts/harness/hu1_bess_study.py @@ -0,0 +1,163 @@ +#!/usr/bin/env python3 +""" +Studie HU1 (Hulín BESS): 128 kWh / 36 kW baterie, 2×20 kW Deye (AC 40 kW). + +Ekonomika čistého BESS (bez PV a spotřeby) nad reálným obdobím BA81 auditu: + - nákup FIXNÍ jako BA81 (efektivní buy site 3 — dle záměru smlouvy), + - prodej SPOT (efektivní sell site 3 jako proxy; HU1 marže dle budoucí smlouvy), + - limity: baterie 36 kW (BMS/střídač), AC 40 kW, import 43 kW, export 42 kW, + block_export_on_negative_sell = true (konfigurace site 5 v DB), + - volitelný EDC sdílecí kanál z BA81: v slotech, kdy má BA81 přebytek při + sell<0, lze nabíjet za pouhou distribuci (parametr; citlivost 1.0/1.5/2.0 + Kč/kWh). Množství = skutečný přebytek BA81 (pv − load) z auditu. + +Výstup: Kč/den bez sdílení a se sdílením — horní mez (perfect hindsight). +POZOR: EDC sdílení zatím v EMS NENÍ implementováno — viz závěr studie. +""" + +from __future__ import annotations + +import asyncio +import os +import sys +from dataclasses import dataclass + +import asyncpg +import pulp + +INTERVAL_H = 0.25 +WINDOW_DAYS = 7 + +BAT_USABLE_WH = 128_000.0 +BAT_POWER_W = 36_000.0 +AC_W = 40_000.0 +IMP_W = 43_000.0 +EXP_W = 42_000.0 +MIN_PCT, MAX_PCT = 10.0, 100.0 +EFF = 0.95 +DEG_CZK_KWH = 0.30 +BLOCK_NEG = True +DIST_SENSITIVITY = [None, 2.0, 1.5, 1.0] # None = bez sdílení + + +@dataclass +class Slot: + buy: float + sell: float + share_wh: float # sdílitelný přebytek BA81 (jen sloty sell<0, jinak 0) + + +def _dsn() -> str: + return os.environ.get( + "EMS_DB_DSN", "postgresql://ems_user:dev_password@10.200.200.1:5432/ems" + ) + + +async def _load(conn: asyncpg.Connection, price_site: int = 3) -> list[Slot]: + rows = await conn.fetch( + """ + select case when $1 = 5 then p2.effective_buy_price_czk_kwh + else p.effective_buy_price_czk_kwh end as buy, + case when $1 = 5 then p2.effective_sell_price_czk_kwh + else p.effective_sell_price_czk_kwh end as sell, + case when p.effective_sell_price_czk_kwh < 0 + then greatest(0, coalesce(a.actual_pv_production_wh,0) + - coalesce(a.actual_load_consumption_wh,0)) + else 0 end as share_wh + from ems.audit_interval a + join ems.vw_site_effective_price p + on p.site_id = a.site_id and p.interval_start = a.interval_start + left join ems.vw_site_effective_price p2 + on p2.site_id = 5 and p2.interval_start = a.interval_start + where a.site_id = 3 and a.actual_load_consumption_wh is not null + and p2.effective_buy_price_czk_kwh is not null + order by a.interval_start + """, + price_site, + ) + return [Slot(float(r["buy"]), float(r["sell"]), float(r["share_wh"])) for r in rows] + + +def solve_window(slots: list[Slot], soc0: float, dist: float | None) -> tuple[float, float]: + n = len(slots) + prob = pulp.LpProblem("hu1", pulp.LpMinimize) + mc = min(BAT_POWER_W, AC_W) * INTERVAL_H + mi = IMP_W * INTERVAL_H + me = min(EXP_W, AC_W) * INTERVAL_H + smin = MIN_PCT / 100 * BAT_USABLE_WH + smax = MAX_PCT / 100 * BAT_USABLE_WH + + gi = [pulp.LpVariable(f"gi{t}", 0, mi) for t in range(n)] + ge = [pulp.LpVariable(f"ge{t}", 0, me) for t in range(n)] + bc = [pulp.LpVariable(f"bc{t}", 0, mc) for t in range(n)] + bd = [pulp.LpVariable(f"bd{t}", 0, mc) for t in range(n)] + sh = [ + pulp.LpVariable(f"sh{t}", 0, slots[t].share_wh if dist is not None else 0) + for t in range(n) + ] + soc = [pulp.LpVariable(f"s{t}", smin, smax) for t in range(n)] + y = [pulp.LpVariable(f"y{t}", cat="Binary") for t in range(n)] + + for t in range(n): + s = slots[t] + # BESS bez lokální spotřeby/PV: nabíjení ze sítě/sdílení, vybíjení do exportu + prob += gi[t] + sh[t] + bd[t] == bc[t] + ge[t] + prev = soc0 if t == 0 else soc[t - 1] + prob += soc[t] == prev + bc[t] * EFF - bd[t] / EFF + prob += gi[t] + sh[t] <= mi * y[t] + prob += ge[t] <= me * (1 - y[t]) + if s.sell < 0 and BLOCK_NEG: + prob += ge[t] == 0 + if s.buy < 0: + prob += ge[t] == 0 + + avg_buy = sum(s.buy for s in slots) / n + cash = pulp.lpSum( + gi[t] / 1000 * slots[t].buy + + (sh[t] / 1000 * dist if dist is not None else 0) + - ge[t] / 1000 * slots[t].sell + for t in range(n) + ) + deg = pulp.lpSum(0.5 * (bc[t] + bd[t]) / 1000 * DEG_CZK_KWH for t in range(n)) + prob += cash + deg - soc[n - 1] / 1000 * max(0.0, avg_buy) + + solver = pulp.HiGHS_CMD(msg=False, timeLimit=30) if pulp.HiGHS_CMD().available() else pulp.PULP_CBC_CMD(msg=False) + prob.solve(solver) + if pulp.LpStatus[prob.status] != "Optimal": + raise RuntimeError(pulp.LpStatus[prob.status]) + return float(pulp.value(cash)) + float(pulp.value(deg)), float(soc[n - 1].value()) + + +async def main() -> None: + conn = await asyncpg.connect(_dsn()) + try: + import os as _os + price_site = int(_os.environ.get('HU1_PRICE_SITE', '3')) + slots = await _load(conn, price_site) + finally: + await conn.close() + days_total = len(slots) // 96 + share_total = sum(s.share_wh for s in slots) / 1000 + print(f"# HU1 BESS studie — 128 kWh / 36 kW / AC 40 kW; ceny site {price_site} ({'fixní nákup BA81' if price_site==3 else 'SPOT nákup i prodej (site 5)'})") + print(f"# Období: {days_total} dní (BA81 audit); sdílitelný přebytek BA81 při sell<0: {share_total:.0f} kWh\n") + for dist in DIST_SENSITIVITY: + total = 0.0 + soc = 0.3 * BAT_USABLE_WH + days = 0 + i = 0 + while i + 96 <= len(slots): + win = slots[i : i + WINDOW_DAYS * 96] + if len(win) < 96: + break + cost, soc = solve_window(win, soc, dist) + total += cost + days += len(win) // 96 + i += len(win) + per_day = -total / max(1, days) + label = "bez EDC sdílení" if dist is None else f"EDC sdílení, distribuce {dist:.1f} Kč/kWh" + print(f" {label:<42} výnos {-total:>8.0f} Kč = {per_day:>7.2f} Kč/den (rok ~{per_day*365*0.6:.0f}–{per_day*365:.0f} Kč)") + print("\nPozn.: horní mez (perfect hindsight); jaro = nejsilnější sezóna pro spot spready.") + + +if __name__ == "__main__": + sys.exit(asyncio.run(main()))