Files
ems/scripts/analysis/battery_sizing_screen.py
Dusan Vojacek fd06811753
All checks were successful
deploy / deploy (push) Successful in 25s
test / smoke-test (push) Successful in 6s
tune microcycling
2026-04-13 00:49:36 +02:00

588 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Ekonomický screening velikosti baterie (15min, jednobusová energie).
Typicky BA81: fixní nákup + prodej spot, limity výkonu z baterie min(0,5C, střídač),
export/import podle připojení. Načte OTE z Postgres (stejná DB jako EMS) nebo z CSV.
Připojení k DB (deploy / Docker):
- Postgres v compose poslouchá na ``EMS_DB_BIND:5432`` (výchozí 127.0.0.1). ``connection refused``
= služba ``db`` neběží, nebo je port vázaný jen na jinou IP (WireGuard) → nastav stejný host.
- Skript načte první nalezené ``.env`` z ``…/ems-deploy/.env`` nebo ``…/app/.env`` (není-li
``--no-auto-env``) a doplní ``PGUSER``/``PGPASSWORD`` z ``DB_USER``/``DB_PASSWORD``.
- Nebo ``DATABASE_URL`` / ``postgresql://USER:PASS@HOST:5432/ems`` (na hostu HOST=127.0.0.1
nebo EMS_DB_BIND, ne ``db`` — to je jen uvnitř Docker sítě).
Příklad (syntetická FVE, flat nákup):
python3 scripts/analysis/battery_sizing_screen.py --db \\
--date-from 2024-04-01 --date-to 2026-04-01 \\
--battery-kwh 12.5 32 48 --load-kw 1.2 \\
--pv-daily-kwh-summer 55 --pv-daily-kwh-winter 12 \\
--sell-margin-fixed -0.02 --buy-vat-kwh 4.443 --capex-per-kwh 9000
Příklad (PVGIS měsíční E_d + NT/VT):
python3 scripts/analysis/battery_sizing_screen.py --db \\
--pvgis-csv pole_A.csv --pvgis-csv pole_B.csv \\
--buy-nt-kwh 5.25 --buy-vt-surcharge-kwh 2.0 --nt-from-hour 22 --nt-to-hour 6 \\
... (ostatní jako výše)
Vyžaduje: pip install pulp (volitelně psycopg2 pro --db).
Omezení modelu: FVE buď syntetický denní tvar (--pv-daily-kwh-*), nebo součet měsíčních
E_d z PVGIS CSV (--pvgis-csv, opakovat pro více orientací); denní energie = E_d měsíce
× normalizovaný tvar (stejný profil každý den v měsíci). Nákup: buď flat (--buy-vat-kwh),
nebo NT/VT podle hodin Europe/Prague: --buy-nt-kwh, VT = NT + --buy-vt-surcharge-kwh,
okno NT --nt-from-hour až --nt-to-hour (přes půlnoc, pokud from > to). Mikroinvertory / GEN
nejsou; zelený bonus není v účelové funkci. Výsledek = screening, ne nabídka.
"""
from __future__ import annotations
import argparse
import csv
import math
import os
import sys
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Iterable, Sequence, Mapping
try:
import pulp
except ImportError:
print("Instaluj PuLP: pip install pulp", file=sys.stderr)
raise
DT_H = 0.25 # 15 min
SLOTS_PER_DAY = 96
@dataclass
class SiteLimits:
max_export_w: float = 16_000.0
max_import_w: float = 17_000.0
inv_batt_max_w: float = 12_000.0 # strop střídače z baterie / nabíjení
c_rate: float = 0.5 # P_batt = min(c_rate * E_kWh * 1000, inv_batt_max_w)
eta_charge: float = 0.95
eta_discharge: float = 0.95
soc_min_frac: float = 0.10
soc_max_frac: float = 0.95
def batt_power_cap_w(usable_kwh: float, site: SiteLimits) -> float:
return min(site.c_rate * usable_kwh * 1000.0, site.inv_batt_max_w)
def summer_day(d: date) -> bool:
m = d.month
return m >= 4 and m <= 9
def pv_shape_96() -> list[float]:
"""Nenormalizovaný denní tvar (96 slotů), plocha = 1 po normalizaci."""
w = [0.0] * SLOTS_PER_DAY
for t in range(SLOTS_PER_DAY):
h = t / 4.0 # hodiny od půlnoci
if 5.5 <= h <= 20.5:
w[t] = max(0.0, math.sin(math.pi * (h - 5.5) / 15.0)) ** 1.2
else:
w[t] = 0.0
s = sum(w)
if s <= 0:
return [1.0 / SLOTS_PER_DAY] * SLOTS_PER_DAY
return [x / s for x in w]
def daily_pv_wh(d: date, summer_kwh: float, winter_kwh: float, shape: Sequence[float]) -> list[float]:
base = summer_kwh if summer_day(d) else winter_kwh
return [base * 1000.0 * sh for sh in shape]
def load_pvgis_monthly_ed_kwh(path: Path) -> dict[int, float]:
"""Z PVGIS CSV (Fixed angle) načte E_d [kWh/d] pro měsíce 112."""
text = path.read_text(encoding="utf-8", errors="replace").splitlines()
start: int | None = None
for i, line in enumerate(text):
if line.strip().startswith("Fixed angle"):
start = i + 2
break
if start is None:
raise ValueError(f"PVGIS: řádek 'Fixed angle' nenalezen: {path}")
out: dict[int, float] = {}
for line in text[start:]:
cells = [c.strip() for c in line.split("\t") if c.strip() != ""]
if not cells:
continue
if cells[0] == "Year":
break
try:
month = int(cells[0])
except ValueError:
continue
if not (1 <= month <= 12):
continue
out[month] = float(cells[1].replace(",", "."))
if len(out) != 12:
raise ValueError(f"PVGIS: očekáváno 12 měsíců E_d v {path}, mám {sorted(out.keys())}")
return out
def merge_pvgis_monthly_ed_kwh(paths: Sequence[Path]) -> dict[int, float]:
"""Sečte E_d jednotlivých polí (např. dvě orientace)."""
total = {m: 0.0 for m in range(1, 13)}
for p in paths:
part = load_pvgis_monthly_ed_kwh(Path(p))
for m in range(1, 13):
total[m] += part[m]
return total
def daily_pv_wh_monthly(d: date, monthly_ed_kwh: Mapping[int, float], shape: Sequence[float]) -> list[float]:
kwh = float(monthly_ed_kwh[d.month])
return [kwh * 1000.0 * sh for sh in shape]
def buy_prices_96_nt_vt(
nt_kwh: float,
vt_kwh: float,
nt_from_hour: int,
nt_to_hour: int,
) -> list[float]:
"""
96 cen nákupu [Kč/kWh] podle začátku 15min slotu (hodina 023, Europe/Prague).
Pokud nt_from_hour > nt_to_hour: NT pro hodiny >= from nebo < to (přes půlnoc).
Jinak NT pro from <= h < to.
"""
out: list[float] = []
for t in range(SLOTS_PER_DAY):
h = t // 4
if nt_from_hour > nt_to_hour:
is_nt = h >= nt_from_hour or h < nt_to_hour
else:
is_nt = nt_from_hour <= h < nt_to_hour
out.append(nt_kwh if is_nt else vt_kwh)
return out
def daily_load_wh(load_kw: float) -> list[float]:
e_per_slot = load_kw * 1000.0 * DT_H
return [e_per_slot] * SLOTS_PER_DAY
def effective_sell_kc_kwh(raw_ote: float, margin_fixed: float, margin_pct: float) -> float:
return raw_ote + margin_fixed + (raw_ote * margin_pct / 100.0)
def load_env_file(path: Path) -> None:
if not path.is_file():
return
for line in path.read_text(encoding="utf-8", errors="replace").splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, _, v = line.partition("=")
k, v = k.strip(), v.strip().strip('"').strip("'")
if not k:
continue
if k not in os.environ or os.environ.get(k, "") == "":
os.environ[k] = v
def apply_auto_env_files() -> None:
"""Na produkci: /opt/ems-deploy/.env (když tam je docker-compose), pak app/.env nebo kořen repa."""
script = Path(__file__).resolve()
if len(script.parents) >= 3:
deploy_root = script.parents[2]
if (deploy_root / "docker-compose.yml").is_file():
load_env_file(deploy_root / ".env")
if len(script.parents) >= 2:
load_env_file(script.parents[1] / ".env")
def sync_pg_env_from_db_vars() -> None:
if not os.environ.get("PGUSER") and os.environ.get("DB_USER"):
os.environ["PGUSER"] = os.environ["DB_USER"]
if not os.environ.get("PGPASSWORD") and os.environ.get("DB_PASSWORD"):
os.environ["PGPASSWORD"] = os.environ["DB_PASSWORD"]
if not os.environ.get("PGDATABASE"):
os.environ["PGDATABASE"] = "ems"
def load_prices_csv(path: str) -> list[tuple[datetime, float]]:
out: list[tuple[datetime, float]] = []
with open(path, newline="", encoding="utf-8") as f:
r = csv.DictReader(f)
for row in r:
ts = datetime.fromisoformat(row["interval_start"].replace("Z", "+00:00"))
px = float(row["sell_raw_price_czk_kwh"])
out.append((ts, px))
out.sort(key=lambda x: x[0])
return out
def load_prices_db(date_from: date, date_to: date) -> list[tuple[datetime, float]]:
from datetime import timezone
try:
import psycopg2
except ImportError as e:
raise SystemExit("Pro --db instaluj psycopg2-binary nebo použij --price-csv") from e
from zoneinfo import ZoneInfo
prg = ZoneInfo("Europe/Prague")
t0 = datetime.combine(date_from, datetime.min.time(), tzinfo=prg).astimezone(timezone.utc)
t1 = datetime.combine(date_to, datetime.min.time(), tzinfo=prg).astimezone(timezone.utc)
dsn = (
os.environ.get("DATABASE_URL")
or os.environ.get("EMS_DATABASE_URL")
or os.environ.get("POSTGRES_URL")
)
if dsn:
if dsn.startswith("postgres://"):
dsn = "postgresql://" + dsn[len("postgres://") :]
conn = psycopg2.connect(dsn)
else:
conn = psycopg2.connect(
host=os.environ.get("PGHOST", "127.0.0.1"),
port=int(os.environ.get("PGPORT", "5432")),
dbname=os.environ.get("PGDATABASE", "ems"),
user=os.environ.get("PGUSER", os.environ.get("DB_USER", "ems_user")),
password=os.environ.get("PGPASSWORD", os.environ.get("DB_PASSWORD", "")),
)
cur = conn.cursor()
cur.execute(
"""
SELECT interval_start, sell_raw_price_czk_kwh::float
FROM ems.market_interval_price
WHERE market_source = 'OTE_CZ'
AND interval_start >= %s
AND interval_start < %s
ORDER BY interval_start
""",
(t0, t1),
)
rows = cur.fetchall()
conn.close()
return [(r[0], float(r[1])) for r in rows]
def prices_by_calendar_day(
series: list[tuple[datetime, float]],
) -> dict[date, list[float]]:
"""96 hodnot Kč/kWh (raw OTE) na kalendářní den Europe/Prague."""
from zoneinfo import ZoneInfo
prg = ZoneInfo("Europe/Prague")
buckets: dict[date, dict[int, float]] = {}
for ts, px in series:
local = ts.astimezone(prg)
d = local.date()
slot = local.hour * 4 + local.minute // 15
buckets.setdefault(d, {})[slot] = px
out: dict[date, list[float]] = {}
for d, mp in buckets.items():
if len(mp) < SLOTS_PER_DAY:
continue
out[d] = [mp[i] for i in range(SLOTS_PER_DAY)]
return out
def solve_one_day(
pv_wh: Sequence[float],
load_wh: Sequence[float],
p_sell: Sequence[float],
p_buy: Sequence[float],
e_usable_wh: float,
p_batt_w: float,
site: SiteLimits,
soc_start_wh: float,
) -> tuple[float, float, float, float]:
"""
Vrátí (cash_kc, soc_end_wh, curtailed_wh, discharged_wh_sum).
cash = příjem z exportu nákup z DS (jen energie, Kč).
"""
e_min = site.soc_min_frac * e_usable_wh
e_max = site.soc_max_frac * e_usable_wh
max_ch = p_batt_w * DT_H
max_dis = p_batt_w * DT_H
max_exp = site.max_export_w * DT_H
max_imp = site.max_import_w * DT_H
prob = pulp.LpProblem("ems_day", pulp.LpMaximize)
soc = pulp.LpVariable.dicts("soc", range(SLOTS_PER_DAY + 1), lowBound=e_min, upBound=e_max)
ch = pulp.LpVariable.dicts("ch", range(SLOTS_PER_DAY), lowBound=0)
dis = pulp.LpVariable.dicts("dis", range(SLOTS_PER_DAY), lowBound=0)
gexp = pulp.LpVariable.dicts("gexp", range(SLOTS_PER_DAY), lowBound=0)
gimp = pulp.LpVariable.dicts("gimp", range(SLOTS_PER_DAY), lowBound=0)
curt = pulp.LpVariable.dicts("curt", range(SLOTS_PER_DAY), lowBound=0)
prob += soc[0] == soc_start_wh
obj = []
for t in range(SLOTS_PER_DAY):
prob += ch[t] <= max_ch
prob += dis[t] <= max_dis
prob += gexp[t] <= max_exp
prob += gimp[t] <= max_imp
prob += curt[t] <= pv_wh[t]
prob += (
pv_wh[t] - curt[t] + dis[t] + gimp[t] == load_wh[t] + ch[t] + gexp[t]
), f"balance_{t}"
prob += (
soc[t + 1]
== soc[t] + site.eta_charge * ch[t] - dis[t] / site.eta_discharge
), f"socdyn_{t}"
obj.append(p_sell[t] * gexp[t] / 1000.0 - p_buy[t] * gimp[t] / 1000.0)
prob += pulp.lpSum(obj)
solver = pulp.PULP_CBC_CMD(msg=False, timeLimit=60)
prob.solve(solver)
if prob.status != pulp.LpStatusOptimal:
raise RuntimeError(f"LP status {pulp.LpStatus[prob.status]}")
cash = float(pulp.value(prob.objective))
soc_end = float(pulp.value(soc[SLOTS_PER_DAY]))
curt_total = sum(float(pulp.value(curt[t])) for t in range(SLOTS_PER_DAY))
dis_total = sum(float(pulp.value(dis[t])) for t in range(SLOTS_PER_DAY))
return cash, soc_end, curt_total, dis_total
def simulate_year(
days: Iterable[date],
px_day: dict[date, list[float]],
usable_kwh: float,
site: SiteLimits,
sell_margin_fixed: float,
sell_margin_pct: float,
buy_flat_kwh: float,
buy_prices_96: Sequence[float] | None,
summer_kwh: float,
winter_kwh: float,
load_kw: float,
shape: Sequence[float],
monthly_ed_kwh: Mapping[int, float] | None,
) -> dict[str, float]:
e_wh = usable_kwh * 1000.0
p_batt = batt_power_cap_w(usable_kwh, site)
load_wh = daily_load_wh(load_kw)
if buy_prices_96 is not None:
if len(buy_prices_96) != SLOTS_PER_DAY:
raise ValueError("buy_prices_96 musí mít 96 hodnot")
p_buy_day: Sequence[float] = buy_prices_96
else:
p_buy_day = [buy_flat_kwh] * SLOTS_PER_DAY
cash_total = 0.0
curt_total = 0.0
dis_total = 0.0
soc_state = 0.5 * (site.soc_min_frac + site.soc_max_frac) * e_wh
n_days = 0
for d in days:
if d not in px_day:
continue
raw = px_day[d]
p_sell = [effective_sell_kc_kwh(x, sell_margin_fixed, sell_margin_pct) for x in raw]
if monthly_ed_kwh is not None:
pv_wh = daily_pv_wh_monthly(d, monthly_ed_kwh, shape)
else:
pv_wh = daily_pv_wh(d, summer_kwh, winter_kwh, shape)
cash, soc_state, curt, dis = solve_one_day(
pv_wh, load_wh, p_sell, p_buy_day, e_wh, p_batt, site, soc_state
)
cash_total += cash
curt_total += curt
dis_total += dis
n_days += 1
feq = (dis_total / e_wh / n_days) if n_days and e_wh > 0 else 0.0
return {
"cash_kc": cash_total,
"days": float(n_days),
"curt_wh": curt_total,
"dis_wh": dis_total,
"feq_cycles_per_day": feq,
}
def main() -> None:
ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
ap.add_argument("--db", action="store_true", help="Načti OTE z Postgres (env PG* / DB_* / DATABASE_URL)")
ap.add_argument(
"--no-auto-env",
action="store_true",
help="Nenačítej automaticky .env z ems-deploy/ ani app/",
)
ap.add_argument("--env-file", type=str, default="", help="Dodatečný soubor .env (po auto-env)")
ap.add_argument(
"--pg-host",
type=str,
default="",
help="Přepíše PGHOST (např. stejná IP jako EMS_DB_BIND ve compose)",
)
ap.add_argument("--price-csv", type=str, default="", help="CSV: interval_start, sell_raw_price_czk_kwh")
ap.add_argument("--date-from", type=str, required=True)
ap.add_argument("--date-to", type=str, required=True)
ap.add_argument("--battery-kwh", type=float, nargs="+", required=True, help="Užitkové kWh (např. 12.5 32 48)")
ap.add_argument("--load-kw", type=float, default=1.0, help="Průměrný odběr (konstanta přes den)")
ap.add_argument("--pv-daily-kwh-summer", type=float, default=50.0)
ap.add_argument("--pv-daily-kwh-winter", type=float, default=10.0)
ap.add_argument("--sell-margin-fixed", type=float, default=-0.02)
ap.add_argument("--sell-margin-pct", type=float, default=0.0)
ap.add_argument(
"--buy-vat-kwh",
type=float,
default=4.443,
help="Flat nákup Kč/kWh (když není --buy-nt-kwh)",
)
ap.add_argument(
"--buy-nt-kwh",
type=float,
default=None,
help="NT cena Kč/kWh; VT = NT + --buy-vt-surcharge-kwh; okno --nt-from-hour / --nt-to-hour (Europe/Prague)",
)
ap.add_argument(
"--buy-vt-surcharge-kwh",
type=float,
default=0.0,
help="Příplatek VT oproti NT (jako buy_fixed_vt_surcharge v EMS)",
)
ap.add_argument("--nt-from-hour", type=int, default=22, help="Začátek NT (hodina 023)")
ap.add_argument("--nt-to-hour", type=int, default=6, help="Konec NT: první hodina VT (023); přes půlnoc pokud from > to")
ap.add_argument(
"--pvgis-csv",
action="append",
default=[],
metavar="PATH",
help="PVGIS měsíční E_d (Fixed angle); opakovat pro více polí/orientací, energie se sečte",
)
ap.add_argument("--max-export-w", type=float, default=16_000.0)
ap.add_argument("--max-import-w", type=float, default=17_000.0)
ap.add_argument("--inv-batt-max-w", type=float, default=12_000.0)
ap.add_argument("--c-rate", type=float, default=0.5)
ap.add_argument("--capex-per-kwh", type=float, default=0.0, help="CAPEX za 1 kWh rozšíření; vypíše jednoduchou návratnost vs. nejmenší baterie")
args = ap.parse_args()
d0 = date.fromisoformat(args.date_from)
d1 = date.fromisoformat(args.date_to)
if args.db:
if not args.no_auto_env:
apply_auto_env_files()
if args.env_file:
load_env_file(Path(args.env_file))
sync_pg_env_from_db_vars()
if args.pg_host:
os.environ["PGHOST"] = args.pg_host
series = load_prices_db(d0, d1)
elif args.price_csv:
series = load_prices_csv(args.price_csv)
else:
ap.error("Zadej --db nebo --price-csv")
px_day = prices_by_calendar_day(series)
shape = pv_shape_96()
site = SiteLimits(
max_export_w=args.max_export_w,
max_import_w=args.max_import_w,
inv_batt_max_w=args.inv_batt_max_w,
c_rate=args.c_rate,
)
monthly_ed: dict[int, float] | None = None
if args.pvgis_csv:
monthly_ed = merge_pvgis_monthly_ed_kwh([Path(p) for p in args.pvgis_csv])
if args.buy_nt_kwh is not None:
vt = args.buy_nt_kwh + args.buy_vt_surcharge_kwh
buy_prices_96 = buy_prices_96_nt_vt(
args.buy_nt_kwh,
vt,
args.nt_from_hour,
args.nt_to_hour,
)
buy_flat = args.buy_vat_kwh
else:
buy_prices_96 = None
buy_flat = args.buy_vat_kwh
day_list = [d0 + timedelta(days=i) for i in range((d1 - d0).days)]
results = []
for kwh in sorted(args.battery_kwh):
r = simulate_year(
day_list,
px_day,
kwh,
site,
args.sell_margin_fixed,
args.sell_margin_pct,
buy_flat,
buy_prices_96,
args.pv_daily_kwh_summer,
args.pv_daily_kwh_winter,
args.load_kw,
shape,
monthly_ed,
)
results.append((kwh, r))
baseline_kwh = min(args.battery_kwh)
base = dict(results)[baseline_kwh]
print("Parametry: prodej = OTE + sell_margin_fixed (+ %)")
if buy_prices_96 is not None:
vt_show = args.buy_nt_kwh + args.buy_vt_surcharge_kwh
print(
f" Nákup = NT/VT: NT {args.buy_nt_kwh} Kč/kWh, VT {vt_show} Kč/kWh "
f"(okno NT {args.nt_from_hour:02d}{args.nt_to_hour:02d} h lokální)"
)
else:
print(f" Nákup = flat {args.buy_vat_kwh} Kč/kWh")
if monthly_ed is not None:
edv = [monthly_ed[m] for m in range(1, 13)]
print(
f" FVE = PVGIS měsíční E_d (součet {len(args.pvgis_csv)} souborů), "
f"rozsah {min(edv):.1f}{max(edv):.1f} kWh/d, denní tvar = syntetika"
)
else:
print(
f" FVE = syntetický den, léto {args.pv_daily_kwh_summer} kWh/d, "
f"zima {args.pv_daily_kwh_winter} kWh/d"
)
print(f" Load (konstanta) {args.load_kw} kW")
print(f" Limity: export {args.max_export_w} W, import {args.max_import_w} W, P_batt = min({args.c_rate}*E_kWh, {args.inv_batt_max_w} W)")
print()
print(f"{'kWh':>8} {'P_batt_kW':>10} {'cash_kc/rok':>14} {'Δ vs min':>12} {'curt_MWh/y':>12} {'Feq/den':>8}")
for kwh, r in results:
pkw = batt_power_cap_w(kwh, site) / 1000.0
days = max(int(r["days"]), 1)
cash_y = r["cash_kc"] * (365.0 / days)
curt_mwh = r["curt_wh"] / 1e6 * (365.0 / days)
delta = cash_y - base["cash_kc"] * (365.0 / days) if kwh != baseline_kwh else 0.0
print(
f"{kwh:8.1f} {pkw:10.2f} {cash_y:14.0f} {delta:12.0f} {curt_mwh:12.2f} {r['feq_cycles_per_day']:8.2f}"
)
if args.capex_per_kwh > 0:
print()
base_cash = base["cash_kc"] * (365.0 / max(int(base["days"]), 1))
for kwh, r in results:
if kwh <= baseline_kwh:
continue
cash_y = r["cash_kc"] * (365.0 / max(int(r["days"]), 1))
delta = cash_y - base_cash
extra_kwh = kwh - baseline_kwh
capex = extra_kwh * args.capex_per_kwh
if delta > 0:
years = capex / delta
print(
f"vs {baseline_kwh} kWh → +{extra_kwh:.0f} kWh CAPEX ~{capex:,.0f} Kč, "
f"odhad +{delta:,.0f} Kč/rok → návratnost ~{years:.1f} r"
)
else:
print(f"vs {baseline_kwh} kWh → +{extra_kwh:.0f} kWh: model neukazuje vyšší roční cash ({delta:,.0f} Kč/rok)")
if __name__ == "__main__":
main()