Files
ems/scripts/analysis/battery_sizing_screen.py
Dusan Vojacek 3b33594354
All checks were successful
test / smoke-test (push) Successful in 3s
deploy / deploy (push) Successful in 14s
fix enf load
2026-04-12 22:31:18 +02:00

442 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Ekonomický screening velikosti baterie (15min, jednobusová energie).
Typicky BA81: fixní nákup + prodej spot, limity výkonu z baterie min(0,5C, střídač),
export/import podle připojení. Načte OTE z Postgres (stejná DB jako EMS) nebo z CSV.
Připojení k DB (deploy / Docker):
- Postgres v compose poslouchá na ``EMS_DB_BIND:5432`` (výchozí 127.0.0.1). ``connection refused``
= služba ``db`` neběží, nebo je port vázaný jen na jinou IP (WireGuard) → nastav stejný host.
- Skript načte první nalezené ``.env`` z ``…/ems-deploy/.env`` nebo ``…/app/.env`` (není-li
``--no-auto-env``) a doplní ``PGUSER``/``PGPASSWORD`` z ``DB_USER``/``DB_PASSWORD``.
- Nebo ``DATABASE_URL`` / ``postgresql://USER:PASS@HOST:5432/ems`` (na hostu HOST=127.0.0.1
nebo EMS_DB_BIND, ne ``db`` — to je jen uvnitř Docker sítě).
Příklad:
python3 scripts/analysis/battery_sizing_screen.py \\
--db \\
--date-from 2024-04-01 --date-to 2026-04-01 \\
--battery-kwh 12.5 32 48 \\
--load-kw 1.2 \\
--pv-daily-kwh-summer 55 --pv-daily-kwh-winter 12 \\
--sell-margin-fixed -0.02 \\
--buy-vat-kwh 4.443 \\
--capex-per-kwh 9000
Vyžaduje: pip install pulp (volitelně psycopg2 pro --db).
Omezení modelu: syntetický denní tvar FVE (kalibruj --pv-daily-kwh-* podle měření);
mikroinvertory / GEN nejsou; zelený bonus není v účelové funkci; nákup je jedna flat
sazba vč. DPH (reálné NT/VT přes HDO přidej později). Výsledek = screening, ne nabídka.
"""
from __future__ import annotations
import argparse
import csv
import math
import os
import sys
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Iterable, Sequence
try:
import pulp
except ImportError:
print("Instaluj PuLP: pip install pulp", file=sys.stderr)
raise
DT_H = 0.25 # 15 min
SLOTS_PER_DAY = 96
@dataclass
class SiteLimits:
max_export_w: float = 16_000.0
max_import_w: float = 17_000.0
inv_batt_max_w: float = 12_000.0 # strop střídače z baterie / nabíjení
c_rate: float = 0.5 # P_batt = min(c_rate * E_kWh * 1000, inv_batt_max_w)
eta_charge: float = 0.95
eta_discharge: float = 0.95
soc_min_frac: float = 0.10
soc_max_frac: float = 0.95
def batt_power_cap_w(usable_kwh: float, site: SiteLimits) -> float:
return min(site.c_rate * usable_kwh * 1000.0, site.inv_batt_max_w)
def summer_day(d: date) -> bool:
m = d.month
return m >= 4 and m <= 9
def pv_shape_96() -> list[float]:
"""Nenormalizovaný denní tvar (96 slotů), plocha = 1 po normalizaci."""
w = [0.0] * SLOTS_PER_DAY
for t in range(SLOTS_PER_DAY):
h = t / 4.0 # hodiny od půlnoci
if 5.5 <= h <= 20.5:
w[t] = max(0.0, math.sin(math.pi * (h - 5.5) / 15.0)) ** 1.2
else:
w[t] = 0.0
s = sum(w)
if s <= 0:
return [1.0 / SLOTS_PER_DAY] * SLOTS_PER_DAY
return [x / s for x in w]
def daily_pv_wh(d: date, summer_kwh: float, winter_kwh: float, shape: Sequence[float]) -> list[float]:
base = summer_kwh if summer_day(d) else winter_kwh
return [base * 1000.0 * sh for sh in shape]
def daily_load_wh(load_kw: float) -> list[float]:
e_per_slot = load_kw * 1000.0 * DT_H
return [e_per_slot] * SLOTS_PER_DAY
def effective_sell_kc_kwh(raw_ote: float, margin_fixed: float, margin_pct: float) -> float:
return raw_ote + margin_fixed + (raw_ote * margin_pct / 100.0)
def load_env_file(path: Path) -> None:
if not path.is_file():
return
for line in path.read_text(encoding="utf-8", errors="replace").splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, _, v = line.partition("=")
k, v = k.strip(), v.strip().strip('"').strip("'")
if not k:
continue
if k not in os.environ or os.environ.get(k, "") == "":
os.environ[k] = v
def apply_auto_env_files() -> None:
"""Na produkci: /opt/ems-deploy/.env (když tam je docker-compose), pak app/.env nebo kořen repa."""
script = Path(__file__).resolve()
if len(script.parents) >= 3:
deploy_root = script.parents[2]
if (deploy_root / "docker-compose.yml").is_file():
load_env_file(deploy_root / ".env")
if len(script.parents) >= 2:
load_env_file(script.parents[1] / ".env")
def sync_pg_env_from_db_vars() -> None:
if not os.environ.get("PGUSER") and os.environ.get("DB_USER"):
os.environ["PGUSER"] = os.environ["DB_USER"]
if not os.environ.get("PGPASSWORD") and os.environ.get("DB_PASSWORD"):
os.environ["PGPASSWORD"] = os.environ["DB_PASSWORD"]
if not os.environ.get("PGDATABASE"):
os.environ["PGDATABASE"] = "ems"
def load_prices_csv(path: str) -> list[tuple[datetime, float]]:
out: list[tuple[datetime, float]] = []
with open(path, newline="", encoding="utf-8") as f:
r = csv.DictReader(f)
for row in r:
ts = datetime.fromisoformat(row["interval_start"].replace("Z", "+00:00"))
px = float(row["sell_raw_price_czk_kwh"])
out.append((ts, px))
out.sort(key=lambda x: x[0])
return out
def load_prices_db(date_from: date, date_to: date) -> list[tuple[datetime, float]]:
from datetime import timezone
try:
import psycopg2
except ImportError as e:
raise SystemExit("Pro --db instaluj psycopg2-binary nebo použij --price-csv") from e
from zoneinfo import ZoneInfo
prg = ZoneInfo("Europe/Prague")
t0 = datetime.combine(date_from, datetime.min.time(), tzinfo=prg).astimezone(timezone.utc)
t1 = datetime.combine(date_to, datetime.min.time(), tzinfo=prg).astimezone(timezone.utc)
dsn = (
os.environ.get("DATABASE_URL")
or os.environ.get("EMS_DATABASE_URL")
or os.environ.get("POSTGRES_URL")
)
if dsn:
if dsn.startswith("postgres://"):
dsn = "postgresql://" + dsn[len("postgres://") :]
conn = psycopg2.connect(dsn)
else:
conn = psycopg2.connect(
host=os.environ.get("PGHOST", "127.0.0.1"),
port=int(os.environ.get("PGPORT", "5432")),
dbname=os.environ.get("PGDATABASE", "ems"),
user=os.environ.get("PGUSER", os.environ.get("DB_USER", "ems_user")),
password=os.environ.get("PGPASSWORD", os.environ.get("DB_PASSWORD", "")),
)
cur = conn.cursor()
cur.execute(
"""
SELECT interval_start, sell_raw_price_czk_kwh::float
FROM ems.market_interval_price
WHERE market_source = 'OTE_CZ'
AND interval_start >= %s
AND interval_start < %s
ORDER BY interval_start
""",
(t0, t1),
)
rows = cur.fetchall()
conn.close()
return [(r[0], float(r[1])) for r in rows]
def prices_by_calendar_day(
series: list[tuple[datetime, float]],
) -> dict[date, list[float]]:
"""96 hodnot Kč/kWh (raw OTE) na kalendářní den Europe/Prague."""
from zoneinfo import ZoneInfo
prg = ZoneInfo("Europe/Prague")
buckets: dict[date, dict[int, float]] = {}
for ts, px in series:
local = ts.astimezone(prg)
d = local.date()
slot = local.hour * 4 + local.minute // 15
buckets.setdefault(d, {})[slot] = px
out: dict[date, list[float]] = {}
for d, mp in buckets.items():
if len(mp) < SLOTS_PER_DAY:
continue
out[d] = [mp[i] for i in range(SLOTS_PER_DAY)]
return out
def solve_one_day(
pv_wh: Sequence[float],
load_wh: Sequence[float],
p_sell: Sequence[float],
p_buy_flat: float,
e_usable_wh: float,
p_batt_w: float,
site: SiteLimits,
soc_start_wh: float,
) -> tuple[float, float, float, float]:
"""
Vrátí (cash_kc, soc_end_wh, curtailed_wh, discharged_wh_sum).
cash = příjem z exportu nákup z DS (jen energie, Kč).
"""
e_min = site.soc_min_frac * e_usable_wh
e_max = site.soc_max_frac * e_usable_wh
max_ch = p_batt_w * DT_H
max_dis = p_batt_w * DT_H
max_exp = site.max_export_w * DT_H
max_imp = site.max_import_w * DT_H
prob = pulp.LpProblem("ems_day", pulp.LpMaximize)
soc = pulp.LpVariable.dicts("soc", range(SLOTS_PER_DAY + 1), lowBound=e_min, upBound=e_max)
ch = pulp.LpVariable.dicts("ch", range(SLOTS_PER_DAY), lowBound=0)
dis = pulp.LpVariable.dicts("dis", range(SLOTS_PER_DAY), lowBound=0)
gexp = pulp.LpVariable.dicts("gexp", range(SLOTS_PER_DAY), lowBound=0)
gimp = pulp.LpVariable.dicts("gimp", range(SLOTS_PER_DAY), lowBound=0)
curt = pulp.LpVariable.dicts("curt", range(SLOTS_PER_DAY), lowBound=0)
prob += soc[0] == soc_start_wh
obj = []
for t in range(SLOTS_PER_DAY):
prob += ch[t] <= max_ch
prob += dis[t] <= max_dis
prob += gexp[t] <= max_exp
prob += gimp[t] <= max_imp
prob += curt[t] <= pv_wh[t]
prob += (
pv_wh[t] - curt[t] + dis[t] + gimp[t] == load_wh[t] + ch[t] + gexp[t]
), f"balance_{t}"
prob += (
soc[t + 1]
== soc[t] + site.eta_charge * ch[t] - dis[t] / site.eta_discharge
), f"socdyn_{t}"
obj.append(p_sell[t] * gexp[t] / 1000.0 - p_buy_flat * gimp[t] / 1000.0)
prob += pulp.lpSum(obj)
solver = pulp.PULP_CBC_CMD(msg=False, timeLimit=60)
prob.solve(solver)
if prob.status != pulp.LpStatusOptimal:
raise RuntimeError(f"LP status {pulp.LpStatus[prob.status]}")
cash = float(pulp.value(prob.objective))
soc_end = float(pulp.value(soc[SLOTS_PER_DAY]))
curt_total = sum(float(pulp.value(curt[t])) for t in range(SLOTS_PER_DAY))
dis_total = sum(float(pulp.value(dis[t])) for t in range(SLOTS_PER_DAY))
return cash, soc_end, curt_total, dis_total
def simulate_year(
days: Iterable[date],
px_day: dict[date, list[float]],
usable_kwh: float,
site: SiteLimits,
sell_margin_fixed: float,
sell_margin_pct: float,
buy_vat_kwh: float,
summer_kwh: float,
winter_kwh: float,
load_kw: float,
shape: Sequence[float],
) -> dict[str, float]:
e_wh = usable_kwh * 1000.0
p_batt = batt_power_cap_w(usable_kwh, site)
load_wh = daily_load_wh(load_kw)
cash_total = 0.0
curt_total = 0.0
dis_total = 0.0
soc_state = 0.5 * (site.soc_min_frac + site.soc_max_frac) * e_wh
n_days = 0
for d in days:
if d not in px_day:
continue
raw = px_day[d]
p_sell = [effective_sell_kc_kwh(x, sell_margin_fixed, sell_margin_pct) for x in raw]
pv_wh = daily_pv_wh(d, summer_kwh, winter_kwh, shape)
cash, soc_state, curt, dis = solve_one_day(
pv_wh, load_wh, p_sell, buy_vat_kwh, e_wh, p_batt, site, soc_state
)
cash_total += cash
curt_total += curt
dis_total += dis
n_days += 1
feq = (dis_total / e_wh / n_days) if n_days and e_wh > 0 else 0.0
return {
"cash_kc": cash_total,
"days": float(n_days),
"curt_wh": curt_total,
"dis_wh": dis_total,
"feq_cycles_per_day": feq,
}
def main() -> None:
ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
ap.add_argument("--db", action="store_true", help="Načti OTE z Postgres (env PG* / DB_* / DATABASE_URL)")
ap.add_argument(
"--no-auto-env",
action="store_true",
help="Nenačítej automaticky .env z ems-deploy/ ani app/",
)
ap.add_argument("--env-file", type=str, default="", help="Dodatečný soubor .env (po auto-env)")
ap.add_argument(
"--pg-host",
type=str,
default="",
help="Přepíše PGHOST (např. stejná IP jako EMS_DB_BIND ve compose)",
)
ap.add_argument("--price-csv", type=str, default="", help="CSV: interval_start, sell_raw_price_czk_kwh")
ap.add_argument("--date-from", type=str, required=True)
ap.add_argument("--date-to", type=str, required=True)
ap.add_argument("--battery-kwh", type=float, nargs="+", required=True, help="Užitkové kWh (např. 12.5 32 48)")
ap.add_argument("--load-kw", type=float, default=1.0, help="Průměrný odběr (konstanta přes den)")
ap.add_argument("--pv-daily-kwh-summer", type=float, default=50.0)
ap.add_argument("--pv-daily-kwh-winter", type=float, default=10.0)
ap.add_argument("--sell-margin-fixed", type=float, default=-0.02)
ap.add_argument("--sell-margin-pct", type=float, default=0.0)
ap.add_argument("--buy-vat-kwh", type=float, default=4.443, help="Efektivní nákup Kč/kWh vč. DPH (flat screening)")
ap.add_argument("--max-export-w", type=float, default=16_000.0)
ap.add_argument("--max-import-w", type=float, default=17_000.0)
ap.add_argument("--inv-batt-max-w", type=float, default=12_000.0)
ap.add_argument("--c-rate", type=float, default=0.5)
ap.add_argument("--capex-per-kwh", type=float, default=0.0, help="CAPEX za 1 kWh rozšíření; vypíše jednoduchou návratnost vs. nejmenší baterie")
args = ap.parse_args()
d0 = date.fromisoformat(args.date_from)
d1 = date.fromisoformat(args.date_to)
if args.db:
if not args.no_auto_env:
apply_auto_env_files()
if args.env_file:
load_env_file(Path(args.env_file))
sync_pg_env_from_db_vars()
if args.pg_host:
os.environ["PGHOST"] = args.pg_host
series = load_prices_db(d0, d1)
elif args.price_csv:
series = load_prices_csv(args.price_csv)
else:
ap.error("Zadej --db nebo --price-csv")
px_day = prices_by_calendar_day(series)
shape = pv_shape_96()
site = SiteLimits(
max_export_w=args.max_export_w,
max_import_w=args.max_import_w,
inv_batt_max_w=args.inv_batt_max_w,
c_rate=args.c_rate,
)
day_list = [d0 + timedelta(days=i) for i in range((d1 - d0).days)]
results = []
for kwh in sorted(args.battery_kwh):
r = simulate_year(
day_list,
px_day,
kwh,
site,
args.sell_margin_fixed,
args.sell_margin_pct,
args.buy_vat_kwh,
args.pv_daily_kwh_summer,
args.pv_daily_kwh_winter,
args.load_kw,
shape,
)
results.append((kwh, r))
baseline_kwh = min(args.battery_kwh)
base = dict(results)[baseline_kwh]
print("Parametry: prodej = OTE + sell_margin_fixed (+ %), nákup = flat buy_vat_kwh")
print(f" FVE tvar = syntetický den, léto {args.pv_daily_kwh_summer} kWh/d, zima {args.pv_daily_kwh_winter} kWh/d, load {args.load_kw} kW")
print(f" Limity: export {args.max_export_w} W, import {args.max_import_w} W, P_batt = min({args.c_rate}*E_kWh, {args.inv_batt_max_w} W)")
print()
print(f"{'kWh':>8} {'P_batt_kW':>10} {'cash_kc/rok':>14} {'Δ vs min':>12} {'curt_MWh/y':>12} {'Feq/den':>8}")
for kwh, r in results:
pkw = batt_power_cap_w(kwh, site) / 1000.0
days = max(int(r["days"]), 1)
cash_y = r["cash_kc"] * (365.0 / days)
curt_mwh = r["curt_wh"] / 1e6 * (365.0 / days)
delta = cash_y - base["cash_kc"] * (365.0 / days) if kwh != baseline_kwh else 0.0
print(
f"{kwh:8.1f} {pkw:10.2f} {cash_y:14.0f} {delta:12.0f} {curt_mwh:12.2f} {r['feq_cycles_per_day']:8.2f}"
)
if args.capex_per_kwh > 0:
print()
base_cash = base["cash_kc"] * (365.0 / max(int(base["days"]), 1))
for kwh, r in results:
if kwh <= baseline_kwh:
continue
cash_y = r["cash_kc"] * (365.0 / max(int(r["days"]), 1))
delta = cash_y - base_cash
extra_kwh = kwh - baseline_kwh
capex = extra_kwh * args.capex_per_kwh
if delta > 0:
years = capex / delta
print(
f"vs {baseline_kwh} kWh → +{extra_kwh:.0f} kWh CAPEX ~{capex:,.0f} Kč, "
f"odhad +{delta:,.0f} Kč/rok → návratnost ~{years:.1f} r"
)
else:
print(f"vs {baseline_kwh} kWh → +{extra_kwh:.0f} kWh: model neukazuje vyšší roční cash ({delta:,.0f} Kč/rok)")
if __name__ == "__main__":
main()