Files
ems/scripts/analysis/battery_sizing_screen.py
Dusan Vojacek 3da738e7e9
All checks were successful
deploy / deploy (push) Successful in 13s
test / smoke-test (push) Successful in 3s
battery simulator
2026-04-12 22:24:32 +02:00

369 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Ekonomický screening velikosti baterie (15min, jednobusová energie).
Typicky BA81: fixní nákup + prodej spot, limity výkonu z baterie min(0,5C, střídač),
export/import podle připojení. Načte OTE z Postgres (stejná DB jako EMS) nebo z CSV.
Příklad:
python3 scripts/analysis/battery_sizing_screen.py \\
--db \\
--date-from 2024-04-01 --date-to 2026-04-01 \\
--battery-kwh 12.5 32 48 \\
--load-kw 1.2 \\
--pv-daily-kwh-summer 55 --pv-daily-kwh-winter 12 \\
--sell-margin-fixed -0.02 \\
--buy-vat-kwh 4.443 \\
--capex-per-kwh 9000
Vyžaduje: pip install pulp (volitelně psycopg2 pro --db).
Omezení modelu: syntetický denní tvar FVE (kalibruj --pv-daily-kwh-* podle měření);
mikroinvertory / GEN nejsou; zelený bonus není v účelové funkci; nákup je jedna flat
sazba vč. DPH (reálné NT/VT přes HDO přidej později). Výsledek = screening, ne nabídka.
"""
from __future__ import annotations
import argparse
import csv
import math
import os
import sys
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from typing import Iterable, Sequence
try:
import pulp
except ImportError:
print("Instaluj PuLP: pip install pulp", file=sys.stderr)
raise
DT_H = 0.25 # 15 min
SLOTS_PER_DAY = 96
@dataclass
class SiteLimits:
max_export_w: float = 16_000.0
max_import_w: float = 17_000.0
inv_batt_max_w: float = 12_000.0 # strop střídače z baterie / nabíjení
c_rate: float = 0.5 # P_batt = min(c_rate * E_kWh * 1000, inv_batt_max_w)
eta_charge: float = 0.95
eta_discharge: float = 0.95
soc_min_frac: float = 0.10
soc_max_frac: float = 0.95
def batt_power_cap_w(usable_kwh: float, site: SiteLimits) -> float:
return min(site.c_rate * usable_kwh * 1000.0, site.inv_batt_max_w)
def summer_day(d: date) -> bool:
m = d.month
return m >= 4 and m <= 9
def pv_shape_96() -> list[float]:
"""Nenormalizovaný denní tvar (96 slotů), plocha = 1 po normalizaci."""
w = [0.0] * SLOTS_PER_DAY
for t in range(SLOTS_PER_DAY):
h = t / 4.0 # hodiny od půlnoci
if 5.5 <= h <= 20.5:
w[t] = max(0.0, math.sin(math.pi * (h - 5.5) / 15.0)) ** 1.2
else:
w[t] = 0.0
s = sum(w)
if s <= 0:
return [1.0 / SLOTS_PER_DAY] * SLOTS_PER_DAY
return [x / s for x in w]
def daily_pv_wh(d: date, summer_kwh: float, winter_kwh: float, shape: Sequence[float]) -> list[float]:
base = summer_kwh if summer_day(d) else winter_kwh
return [base * 1000.0 * sh for sh in shape]
def daily_load_wh(load_kw: float) -> list[float]:
e_per_slot = load_kw * 1000.0 * DT_H
return [e_per_slot] * SLOTS_PER_DAY
def effective_sell_kc_kwh(raw_ote: float, margin_fixed: float, margin_pct: float) -> float:
return raw_ote + margin_fixed + (raw_ote * margin_pct / 100.0)
def load_prices_csv(path: str) -> list[tuple[datetime, float]]:
out: list[tuple[datetime, float]] = []
with open(path, newline="", encoding="utf-8") as f:
r = csv.DictReader(f)
for row in r:
ts = datetime.fromisoformat(row["interval_start"].replace("Z", "+00:00"))
px = float(row["sell_raw_price_czk_kwh"])
out.append((ts, px))
out.sort(key=lambda x: x[0])
return out
def load_prices_db(date_from: date, date_to: date) -> list[tuple[datetime, float]]:
from datetime import timezone
try:
import psycopg2
except ImportError as e:
raise SystemExit("Pro --db instaluj psycopg2-binary nebo použij --price-csv") from e
from zoneinfo import ZoneInfo
prg = ZoneInfo("Europe/Prague")
t0 = datetime.combine(date_from, datetime.min.time(), tzinfo=prg).astimezone(timezone.utc)
t1 = datetime.combine(date_to, datetime.min.time(), tzinfo=prg).astimezone(timezone.utc)
conn = psycopg2.connect(
host=os.environ.get("PGHOST", "127.0.0.1"),
port=int(os.environ.get("PGPORT", "5432")),
dbname=os.environ.get("PGDATABASE", "ems"),
user=os.environ.get("PGUSER", os.environ.get("DB_USER", "ems_user")),
password=os.environ.get("PGPASSWORD", os.environ.get("DB_PASSWORD", "")),
)
cur = conn.cursor()
cur.execute(
"""
SELECT interval_start, sell_raw_price_czk_kwh::float
FROM ems.market_interval_price
WHERE market_source = 'OTE_CZ'
AND interval_start >= %s
AND interval_start < %s
ORDER BY interval_start
""",
(t0, t1),
)
rows = cur.fetchall()
conn.close()
return [(r[0], float(r[1])) for r in rows]
def prices_by_calendar_day(
series: list[tuple[datetime, float]],
) -> dict[date, list[float]]:
"""96 hodnot Kč/kWh (raw OTE) na kalendářní den Europe/Prague."""
from zoneinfo import ZoneInfo
prg = ZoneInfo("Europe/Prague")
buckets: dict[date, dict[int, float]] = {}
for ts, px in series:
local = ts.astimezone(prg)
d = local.date()
slot = local.hour * 4 + local.minute // 15
buckets.setdefault(d, {})[slot] = px
out: dict[date, list[float]] = {}
for d, mp in buckets.items():
if len(mp) < SLOTS_PER_DAY:
continue
out[d] = [mp[i] for i in range(SLOTS_PER_DAY)]
return out
def solve_one_day(
pv_wh: Sequence[float],
load_wh: Sequence[float],
p_sell: Sequence[float],
p_buy_flat: float,
e_usable_wh: float,
p_batt_w: float,
site: SiteLimits,
soc_start_wh: float,
) -> tuple[float, float, float, float]:
"""
Vrátí (cash_kc, soc_end_wh, curtailed_wh, discharged_wh_sum).
cash = příjem z exportu nákup z DS (jen energie, Kč).
"""
e_min = site.soc_min_frac * e_usable_wh
e_max = site.soc_max_frac * e_usable_wh
max_ch = p_batt_w * DT_H
max_dis = p_batt_w * DT_H
max_exp = site.max_export_w * DT_H
max_imp = site.max_import_w * DT_H
prob = pulp.LpProblem("ems_day", pulp.LpMaximize)
soc = pulp.LpVariable.dicts("soc", range(SLOTS_PER_DAY + 1), lowBound=e_min, upBound=e_max)
ch = pulp.LpVariable.dicts("ch", range(SLOTS_PER_DAY), lowBound=0)
dis = pulp.LpVariable.dicts("dis", range(SLOTS_PER_DAY), lowBound=0)
gexp = pulp.LpVariable.dicts("gexp", range(SLOTS_PER_DAY), lowBound=0)
gimp = pulp.LpVariable.dicts("gimp", range(SLOTS_PER_DAY), lowBound=0)
curt = pulp.LpVariable.dicts("curt", range(SLOTS_PER_DAY), lowBound=0)
prob += soc[0] == soc_start_wh
obj = []
for t in range(SLOTS_PER_DAY):
prob += ch[t] <= max_ch
prob += dis[t] <= max_dis
prob += gexp[t] <= max_exp
prob += gimp[t] <= max_imp
prob += curt[t] <= pv_wh[t]
prob += (
pv_wh[t] - curt[t] + dis[t] + gimp[t] == load_wh[t] + ch[t] + gexp[t]
), f"balance_{t}"
prob += (
soc[t + 1]
== soc[t] + site.eta_charge * ch[t] - dis[t] / site.eta_discharge
), f"socdyn_{t}"
obj.append(p_sell[t] * gexp[t] / 1000.0 - p_buy_flat * gimp[t] / 1000.0)
prob += pulp.lpSum(obj)
solver = pulp.PULP_CBC_CMD(msg=False, timeLimit=60)
prob.solve(solver)
if prob.status != pulp.LpStatusOptimal:
raise RuntimeError(f"LP status {pulp.LpStatus[prob.status]}")
cash = float(pulp.value(prob.objective))
soc_end = float(pulp.value(soc[SLOTS_PER_DAY]))
curt_total = sum(float(pulp.value(curt[t])) for t in range(SLOTS_PER_DAY))
dis_total = sum(float(pulp.value(dis[t])) for t in range(SLOTS_PER_DAY))
return cash, soc_end, curt_total, dis_total
def simulate_year(
days: Iterable[date],
px_day: dict[date, list[float]],
usable_kwh: float,
site: SiteLimits,
sell_margin_fixed: float,
sell_margin_pct: float,
buy_vat_kwh: float,
summer_kwh: float,
winter_kwh: float,
load_kw: float,
shape: Sequence[float],
) -> dict[str, float]:
e_wh = usable_kwh * 1000.0
p_batt = batt_power_cap_w(usable_kwh, site)
load_wh = daily_load_wh(load_kw)
cash_total = 0.0
curt_total = 0.0
dis_total = 0.0
soc_state = 0.5 * (site.soc_min_frac + site.soc_max_frac) * e_wh
n_days = 0
for d in days:
if d not in px_day:
continue
raw = px_day[d]
p_sell = [effective_sell_kc_kwh(x, sell_margin_fixed, sell_margin_pct) for x in raw]
pv_wh = daily_pv_wh(d, summer_kwh, winter_kwh, shape)
cash, soc_state, curt, dis = solve_one_day(
pv_wh, load_wh, p_sell, buy_vat_kwh, e_wh, p_batt, site, soc_state
)
cash_total += cash
curt_total += curt
dis_total += dis
n_days += 1
feq = (dis_total / e_wh / n_days) if n_days and e_wh > 0 else 0.0
return {
"cash_kc": cash_total,
"days": float(n_days),
"curt_wh": curt_total,
"dis_wh": dis_total,
"feq_cycles_per_day": feq,
}
def main() -> None:
ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
ap.add_argument("--db", action="store_true", help="Načti OTE z Postgres (env PG* / DB_*)")
ap.add_argument("--price-csv", type=str, default="", help="CSV: interval_start, sell_raw_price_czk_kwh")
ap.add_argument("--date-from", type=str, required=True)
ap.add_argument("--date-to", type=str, required=True)
ap.add_argument("--battery-kwh", type=float, nargs="+", required=True, help="Užitkové kWh (např. 12.5 32 48)")
ap.add_argument("--load-kw", type=float, default=1.0, help="Průměrný odběr (konstanta přes den)")
ap.add_argument("--pv-daily-kwh-summer", type=float, default=50.0)
ap.add_argument("--pv-daily-kwh-winter", type=float, default=10.0)
ap.add_argument("--sell-margin-fixed", type=float, default=-0.02)
ap.add_argument("--sell-margin-pct", type=float, default=0.0)
ap.add_argument("--buy-vat-kwh", type=float, default=4.443, help="Efektivní nákup Kč/kWh vč. DPH (flat screening)")
ap.add_argument("--max-export-w", type=float, default=16_000.0)
ap.add_argument("--max-import-w", type=float, default=17_000.0)
ap.add_argument("--inv-batt-max-w", type=float, default=12_000.0)
ap.add_argument("--c-rate", type=float, default=0.5)
ap.add_argument("--capex-per-kwh", type=float, default=0.0, help="CAPEX za 1 kWh rozšíření; vypíše jednoduchou návratnost vs. nejmenší baterie")
args = ap.parse_args()
d0 = date.fromisoformat(args.date_from)
d1 = date.fromisoformat(args.date_to)
if args.db:
series = load_prices_db(d0, d1)
elif args.price_csv:
series = load_prices_csv(args.price_csv)
else:
ap.error("Zadej --db nebo --price-csv")
px_day = prices_by_calendar_day(series)
shape = pv_shape_96()
site = SiteLimits(
max_export_w=args.max_export_w,
max_import_w=args.max_import_w,
inv_batt_max_w=args.inv_batt_max_w,
c_rate=args.c_rate,
)
day_list = [d0 + timedelta(days=i) for i in range((d1 - d0).days)]
results = []
for kwh in sorted(args.battery_kwh):
r = simulate_year(
day_list,
px_day,
kwh,
site,
args.sell_margin_fixed,
args.sell_margin_pct,
args.buy_vat_kwh,
args.pv_daily_kwh_summer,
args.pv_daily_kwh_winter,
args.load_kw,
shape,
)
results.append((kwh, r))
baseline_kwh = min(args.battery_kwh)
base = dict(results)[baseline_kwh]
print("Parametry: prodej = OTE + sell_margin_fixed (+ %), nákup = flat buy_vat_kwh")
print(f" FVE tvar = syntetický den, léto {args.pv_daily_kwh_summer} kWh/d, zima {args.pv_daily_kwh_winter} kWh/d, load {args.load_kw} kW")
print(f" Limity: export {args.max_export_w} W, import {args.max_import_w} W, P_batt = min({args.c_rate}*E_kWh, {args.inv_batt_max_w} W)")
print()
print(f"{'kWh':>8} {'P_batt_kW':>10} {'cash_kc/rok':>14} {'Δ vs min':>12} {'curt_MWh/y':>12} {'Feq/den':>8}")
for kwh, r in results:
pkw = batt_power_cap_w(kwh, site) / 1000.0
days = max(int(r["days"]), 1)
cash_y = r["cash_kc"] * (365.0 / days)
curt_mwh = r["curt_wh"] / 1e6 * (365.0 / days)
delta = cash_y - base["cash_kc"] * (365.0 / days) if kwh != baseline_kwh else 0.0
print(
f"{kwh:8.1f} {pkw:10.2f} {cash_y:14.0f} {delta:12.0f} {curt_mwh:12.2f} {r['feq_cycles_per_day']:8.2f}"
)
if args.capex_per_kwh > 0:
print()
base_cash = base["cash_kc"] * (365.0 / max(int(base["days"]), 1))
for kwh, r in results:
if kwh <= baseline_kwh:
continue
cash_y = r["cash_kc"] * (365.0 / max(int(r["days"]), 1))
delta = cash_y - base_cash
extra_kwh = kwh - baseline_kwh
capex = extra_kwh * args.capex_per_kwh
if delta > 0:
years = capex / delta
print(
f"vs {baseline_kwh} kWh → +{extra_kwh:.0f} kWh CAPEX ~{capex:,.0f} Kč, "
f"odhad +{delta:,.0f} Kč/rok → návratnost ~{years:.1f} r"
)
else:
print(f"vs {baseline_kwh} kWh → +{extra_kwh:.0f} kWh: model neukazuje vyšší roční cash ({delta:,.0f} Kč/rok)")
if __name__ == "__main__":
main()