Files
ems/backend/services/planning_engine.py
Dusan Vojacek 8b4af663d8 Initial commit
Made-with: Cursor
2026-03-20 13:27:44 +01:00

818 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# backend/services/planning_engine.py
#
# EMS Platform plánovací engine
# Obsahuje: hlavní denní plán + rolling 15min replan
#
# Spouštění (APScheduler v main.py):
# scheduler.add_job(run_daily_plan, 'cron', hour=15, minute=0)
# scheduler.add_job(run_rolling_replan, 'cron', minute='*/15')
import time
import logging
from dataclasses import dataclass, replace
from datetime import datetime, timezone, timedelta
from types import SimpleNamespace
from typing import Optional
import pulp
from pulp import HiGHS_CMD
logger = logging.getLogger(__name__)
# ============================================================
# Konstanty
# ============================================================
HORIZON_HOURS = 36 # horizont denního plánu
INTERVAL_H = 0.25 # 15 minut v hodinách
CURTAILMENT_PENALTY = 0.001 # Kč/Wh malá penalizace za omezení FVE pole A
SOLVER_TIME_LIMIT = 10 # sekund
CORRECTION_WINDOW_H = 1 # hodina zpět pro výpočet korekčního faktoru
CORRECTION_MIN_CLAMP = 0.5 # spodní limit korekčního faktoru
CORRECTION_MAX_CLAMP = 1.5 # horní limit korekčního faktoru
# Útlum korekce: čím dál od aktuálního času, tím méně korigujeme forecast
CORRECTION_DECAY_SLOTS = 16 # po 16 slotech (4h) klesne korekce na 0
# ============================================================
# Datové třídy (lze nahradit pydantic modely)
# ============================================================
@dataclass
class PlanningSlot:
interval_start: datetime
buy_price: float # Kč/kWh
sell_price: float # Kč/kWh
pv_a_forecast_w: int # W pole A (řiditelné)
pv_b_forecast_w: int # W pole B (zelený bonus, pevné)
load_baseline_w: int # W predikce bazální spotřeby
ev1_connected: bool
ev2_connected: bool
@dataclass
class DispatchResult:
interval_start: datetime
battery_setpoint_w: int # kladné = nabíjení, záporné = vybíjení
battery_soc_target: float # % SoC na konci intervalu
grid_setpoint_w: int # kladné = import, záporné = export
ev1_setpoint_w: Optional[int]
ev2_setpoint_w: Optional[int]
ev1_via_bat_w: int
ev2_via_bat_w: int
heat_pump_enabled: bool
heat_pump_setpoint_w: int
pv_a_curtailed_w: int
expected_cost_czk: float
effective_buy_price: float
effective_sell_price: float
# ============================================================
# Korekce forecastu na základě skutečné výroby
# ============================================================
async def compute_correction_factor(
site_id: int,
now: datetime,
db,
window_h: float = CORRECTION_WINDOW_H,
) -> tuple[float, dict]:
"""
Spočítá korekční faktor FVE forecastu z posledních window_h hodin.
Vrátí (factor, log_data) kde factor je v rozsahu [CORRECTION_MIN_CLAMP, CORRECTION_MAX_CLAMP].
factor = 1.0 pokud není dostatek dat nebo je rozdíl zanedbatelný.
"""
window_start = now - timedelta(hours=window_h)
# Skutečná výroba za okno (z telemetrie)
actual = await db.fetchval("""
SELECT COALESCE(SUM(pv_power_w) * 0.25 / 1000.0, 0) -- kWh
FROM ems.telemetry_inverter
WHERE site_id = $1
AND measured_at >= $2 AND measured_at < $3
""", site_id, window_start, now)
# Předpovídaná výroba za stejné okno (z nejnovějšího forecastu který platil tehdy)
forecast = await db.fetchval("""
SELECT COALESCE(SUM(fpi.power_w) * 0.25 / 1000.0, 0)
FROM ems.forecast_pv_interval fpi
JOIN ems.forecast_pv_run fpr ON fpr.id = fpi.run_id
WHERE fpr.site_id = $1
AND fpi.interval_start >= $2 AND fpi.interval_start < $3
AND fpr.status = 'ok'
AND fpr.created_at = (
SELECT MAX(fpr2.created_at)
FROM ems.forecast_pv_run fpr2
WHERE fpr2.site_id = $1 AND fpr2.status = 'ok'
AND fpr2.created_at <= $2
)
""", site_id, window_start, now)
log_data = {
"window_start": window_start,
"window_end": now,
"actual_pv_wh": actual * 1000,
"forecast_pv_wh": forecast * 1000,
}
# Pokud forecast nebo actual jsou příliš malé (noc, <0.1 kWh) → žádná korekce
if forecast < 0.1 or actual < 0.05:
log_data["correction_factor"] = 1.0
log_data["reason"] = "insufficient_data"
return 1.0, log_data
raw_factor = actual / forecast
factor = max(CORRECTION_MIN_CLAMP, min(CORRECTION_MAX_CLAMP, raw_factor))
log_data["correction_factor"] = factor
log_data["raw_factor"] = raw_factor
return factor, log_data
def apply_forecast_correction(
slots: list[PlanningSlot],
now: datetime,
factor: float,
decay_slots: int = CORRECTION_DECAY_SLOTS,
) -> list[PlanningSlot]:
"""
Aplikuje korekční faktor na FVE forecast zbývajících slotů.
Korekce se lineárně utlumuje: na 1. slotu plná korekce,
na decay_slots-tém slotu žádná korekce.
Příklad: factor=0.85, slot 0 → pv_a *= 0.85, slot 8 → pv_a *= 0.925, slot 16+ → žádná korekce
"""
corrected = []
for i, slot in enumerate(slots):
if factor == 1.0 or i >= decay_slots:
corrected.append(slot)
continue
# Lineární útlum: weight klesá od 1.0 (slot 0) do 0.0 (slot decay_slots)
weight = 1.0 - (i / decay_slots)
effective_factor = 1.0 + (factor - 1.0) * weight
corrected.append(
replace(
slot,
pv_a_forecast_w=max(0, int(slot.pv_a_forecast_w * effective_factor)),
pv_b_forecast_w=max(0, int(slot.pv_b_forecast_w * effective_factor)),
)
)
return corrected
# ============================================================
# LP Solver
# ============================================================
def solve_dispatch(
slots: list[PlanningSlot],
battery,
heat_pump,
grid,
ev_sessions: list, # aktivní EV sessions [ev1_session, ev2_session]
vehicles: list, # [vehicle1, vehicle2]
current_soc_wh: float,
current_tuv_temp_c: float,
) -> tuple[list[DispatchResult], int]:
"""
LP solver pro dispatch optimalizaci.
Vrátí (výsledky, solver_duration_ms).
"""
T = len(slots)
EV = len(vehicles) # počet EV (typicky 2)
EV_ROUNDTRIP_FACTOR = 1.0 / (battery.charge_efficiency * battery.discharge_efficiency)
prob = pulp.LpProblem("ems_dispatch", pulp.LpMinimize)
# --- Proměnné ---
gi = [pulp.LpVariable(f"gi_{t}", 0, grid.max_import_power_w) for t in range(T)]
ge = [pulp.LpVariable(f"ge_{t}", 0, grid.max_export_power_w) for t in range(T)]
bc = [pulp.LpVariable(f"bc_{t}", 0, battery.max_charge_power_w) for t in range(T)]
bd = [pulp.LpVariable(f"bd_{t}", 0, battery.max_discharge_power_w) for t in range(T)]
soc = [pulp.LpVariable(f"soc_{t}", battery.reserve_soc_wh, battery.soc_max_wh) for t in range(T)]
ca = [pulp.LpVariable(f"ca_{t}", 0, slots[t].pv_a_forecast_w) for t in range(T)]
hp = [pulp.LpVariable(f"hp_{t}", 0, heat_pump.rated_heating_power_w) for t in range(T)]
# EV proměnné per vozidlo
ev_direct = [[pulp.LpVariable(f"evd_{e}_{t}", 0,
min(vehicles[e].max_charge_power_w, grid.max_import_power_w))
for t in range(T)] for e in range(EV)]
ev_via_bat = [[pulp.LpVariable(f"evb_{e}_{t}", 0,
vehicles[e].max_charge_power_w)
for t in range(T)] for e in range(EV)]
# --- Účelová funkce ---
prob += pulp.lpSum(
gi[t] * slots[t].buy_price * INTERVAL_H / 1000
- ge[t] * slots[t].sell_price * INTERVAL_H / 1000
+ (bc[t] + bd[t]) * battery.degradation_cost_czk_kwh * INTERVAL_H / 1000
+ pulp.lpSum(
ev_direct[e][t] * slots[t].buy_price * INTERVAL_H / 1000
+ ev_via_bat[e][t] * slots[t].buy_price * EV_ROUNDTRIP_FACTOR * INTERVAL_H / 1000
for e in range(EV)
)
+ ca[t] * CURTAILMENT_PENALTY
for t in range(T)
)
# --- Omezení ---
for t in range(T):
s = slots[t]
pv_a_net = s.pv_a_forecast_w - ca[t]
ev_total_t = pulp.lpSum(ev_direct[e][t] + ev_via_bat[e][t] for e in range(EV))
# Energetická bilance
prob += (
pv_a_net + s.pv_b_forecast_w + gi[t] + bd[t]
== s.load_baseline_w + ev_total_t + hp[t] + bc[t] + ge[t]
)
# SoC kontinuita
soc_prev = current_soc_wh if t == 0 else soc[t - 1]
prob += soc[t] == (
soc_prev
+ bc[t] * battery.charge_efficiency * INTERVAL_H
- bd[t] / battery.discharge_efficiency * INTERVAL_H
)
# ev_via_bat kryto z discharge
prob += pulp.lpSum(ev_via_bat[e][t] for e in range(EV)) <= bd[t]
# Záporná prodejní cena → zakázat export
if s.sell_price < 0:
prob += ge[t] == 0
# Záporná nákupní cena → cap import na reálnou spotřebu
if s.buy_price < 0:
prob += gi[t] <= (
battery.max_charge_power_w
+ sum(v.max_charge_power_w for v in vehicles)
+ heat_pump.rated_heating_power_w
)
# EV limity a připojení
for e in range(EV):
connected = (
(e == 0 and s.ev1_connected) or
(e == 1 and s.ev2_connected)
)
if not connected:
prob += ev_direct[e][t] == 0
prob += ev_via_bat[e][t] == 0
else:
prob += ev_direct[e][t] + ev_via_bat[e][t] <= vehicles[e].max_charge_power_w
# Deadline constraints pro EV
for e, session in enumerate(ev_sessions):
if session and session.target_deadline and session.energy_needed_wh > 0:
t_dl = next(
(t for t, s in enumerate(slots) if s.interval_start >= session.target_deadline),
T - 1
)
prob += pulp.lpSum(
(ev_direct[e][t] + ev_via_bat[e][t]) * INTERVAL_H
for t in range(t_dl + 1)
if (e == 0 and slots[t].ev1_connected) or (e == 1 and slots[t].ev2_connected)
) >= session.energy_needed_wh
# Nouzový ohřev TUV
if current_tuv_temp_c < heat_pump.tuv_min_temp_c:
prob += hp[0] >= heat_pump.rated_heating_power_w * 0.8
# --- Řešení ---
t_start = time.monotonic()
solver = HiGHS_CMD(msg=False, timeLimit=SOLVER_TIME_LIMIT)
status = prob.solve(solver)
duration_ms = int((time.monotonic() - t_start) * 1000)
if pulp.LpStatus[status] != 'Optimal':
raise RuntimeError(f"Solver: {pulp.LpStatus[status]}")
# --- Post-processing ---
results = []
for t in range(T):
hp_raw = pulp.value(hp[t])
hp_on = hp_raw > heat_pump.rated_heating_power_w * 0.3
batt_w = round(pulp.value(bc[t]) - pulp.value(bd[t]))
grid_w = round(pulp.value(gi[t]) - pulp.value(ge[t]))
soc_pct = round(pulp.value(soc[t]) / battery.usable_capacity_wh * 100, 1)
cost = (
pulp.value(gi[t]) * slots[t].buy_price * INTERVAL_H / 1000
- pulp.value(ge[t]) * slots[t].sell_price * INTERVAL_H / 1000
)
results.append(DispatchResult(
interval_start = slots[t].interval_start,
battery_setpoint_w = batt_w,
battery_soc_target = soc_pct,
grid_setpoint_w = grid_w,
ev1_setpoint_w = round(pulp.value(ev_direct[0][t]) + pulp.value(ev_via_bat[0][t]))
if slots[t].ev1_connected else None,
ev2_setpoint_w = round(pulp.value(ev_direct[1][t]) + pulp.value(ev_via_bat[1][t]))
if slots[t].ev2_connected else None,
ev1_via_bat_w = round(pulp.value(ev_via_bat[0][t])),
ev2_via_bat_w = round(pulp.value(ev_via_bat[1][t])),
heat_pump_enabled = hp_on,
heat_pump_setpoint_w = heat_pump.rated_heating_power_w if hp_on else 0,
pv_a_curtailed_w = round(pulp.value(ca[t])),
expected_cost_czk = round(cost, 4),
effective_buy_price = slots[t].buy_price,
effective_sell_price = slots[t].sell_price,
))
return results, duration_ms
# ============================================================
# Denní plán (15:00)
# ============================================================
async def run_daily_plan(site_id: int, db, triggered_by: str = "scheduler:daily") -> tuple[int, int]:
"""
Hlavní denní plánování. Spouštět v 15:00 po importu cen (14:00)
a aktualizaci forecastu (14:30).
Horizont: od začátku aktuálního 15min slotu do +36h.
"""
now = datetime.now(timezone.utc)
horizon_from = _current_slot_start(now)
horizon_to = horizon_from + timedelta(hours=HORIZON_HOURS)
logger.info(f"[site={site_id}] Daily plan: {horizon_from}{horizon_to}")
slots = await _load_slots(site_id, horizon_from, horizon_to, db)
if not slots:
raise RuntimeError(f"No planning slots for site_id={site_id} (prices/forecast horizon?)")
battery, hp, grid, vehicles, ev_sessions, soc_wh, tuv_temp = await _load_site_context(
site_id, db
)
results, duration_ms = solve_dispatch(
slots, battery, hp, grid, ev_sessions, vehicles, soc_wh, tuv_temp
)
run_id = await _save_planning_run(
site_id,
results,
horizon_from,
horizon_to,
run_type="daily",
triggered_by=triggered_by,
replan_from=None,
soc_wh=soc_wh,
duration_ms=duration_ms,
correction=1.0,
db=db,
)
logger.info(f"[site={site_id}] Daily plan done in {duration_ms} ms")
return run_id, duration_ms
# ============================================================
# Rolling replan (každých 15min)
# ============================================================
async def run_rolling_replan(
site_id: int,
db,
*,
triggered_by: str = "scheduler:rolling",
allow_skip: bool = True,
) -> tuple[Optional[int], Optional[int]]:
"""
Rolling replan každých 15 minut.
1. Zjistí aktuální SoC baterie z telemetrie
2. Spočítá korekční faktor FVE forecastu z poslední hodiny
3. Aplikuje korekci na forecast zbytku dne (s útlumem)
4. Spustí solver pro zbývající horizont aktivního plánu
5. Uloží jako nový planning_run (aktivní plán se stane superseded)
Pokud allow_skip=True (scheduler) a horizont je vyčerpaný → vrátí (None, None).
Pokud allow_skip=False (API) → spustí denní plán jako náhradu.
"""
now = datetime.now(timezone.utc)
replan_from = _current_slot_start(now)
active_run = await db.fetchrow("""
SELECT id, horizon_end FROM ems.planning_run
WHERE site_id = $1 AND status = 'active'
ORDER BY created_at DESC LIMIT 1
""", site_id)
if not active_run:
logger.warning(f"[site={site_id}] Rolling replan: no active plan, triggering daily plan")
return await run_daily_plan(site_id, db, triggered_by=triggered_by)
horizon_to = active_run["horizon_end"]
if (horizon_to - replan_from).total_seconds() < 1800:
if allow_skip:
logger.info(f"[site={site_id}] Rolling replan: horizon almost exhausted, skipping")
return None, None
logger.info(f"[site={site_id}] Rolling replan: horizon exhausted, running daily plan")
return await run_daily_plan(site_id, db, triggered_by=triggered_by)
logger.info(f"[site={site_id}] Rolling replan from {replan_from}{horizon_to}")
battery, hp, grid, vehicles, ev_sessions, soc_wh, tuv_temp = await _load_site_context(
site_id, db
)
correction_factor, correction_log = await compute_correction_factor(site_id, now, db)
slots = await _load_slots(site_id, replan_from, horizon_to, db)
if not slots:
logger.warning(f"[site={site_id}] Rolling replan: no slots, running daily plan")
return await run_daily_plan(site_id, db, triggered_by=triggered_by)
slots = apply_forecast_correction(slots, now, correction_factor)
results, duration_ms = solve_dispatch(
slots, battery, hp, grid, ev_sessions, vehicles, soc_wh, tuv_temp
)
run_id = await _save_planning_run(
site_id,
results,
replan_from,
horizon_to,
run_type="rolling",
triggered_by=triggered_by,
replan_from=replan_from,
soc_wh=soc_wh,
duration_ms=duration_ms,
correction=correction_factor,
db=db,
)
await db.execute(
"""
INSERT INTO ems.forecast_correction_log
(site_id, window_start, window_end, actual_pv_wh, forecast_pv_wh,
correction_factor, applied_to_run_id)
VALUES ($1,$2,$3,$4,$5,$6,$7)
""",
site_id,
correction_log["window_start"],
correction_log["window_end"],
correction_log.get("actual_pv_wh"),
correction_log.get("forecast_pv_wh"),
correction_factor,
run_id,
)
logger.info(
f"[site={site_id}] Rolling replan done in {duration_ms} ms "
f"(correction={correction_factor:.3f})"
)
return run_id, duration_ms
async def run_plan_api(site_id: int, db, plan_type: str, triggered_by: str = "api") -> tuple[int, int]:
"""Ruční / UI spuštění plánu. Vždy vrátí (run_id, solver_duration_ms)."""
pt = plan_type.lower().strip()
if pt == "daily":
return await run_daily_plan(site_id, db, triggered_by=triggered_by)
if pt == "rolling":
rid, ms = await run_rolling_replan(
site_id, db, triggered_by=triggered_by, allow_skip=False
)
if rid is None or ms is None:
raise RuntimeError("Rolling replan did not return a run")
return rid, ms
raise ValueError(f"Unknown plan_type: {plan_type!r} (use daily or rolling)")
# ============================================================
# Pomocné funkce
# ============================================================
def _current_slot_start(dt: datetime) -> datetime:
"""Zaokrouhlí čas dolů na začátek aktuálního 15min slotu."""
minute = (dt.minute // 15) * 15
return dt.replace(minute=minute, second=0, microsecond=0)
def _ev_session_ctx(row) -> Optional[SimpleNamespace]:
"""Kontext deadline constraintu pro jedno EV (nebo None)."""
if row is None or row["target_deadline"] is None:
return None
cap_kwh = row["veh_cap_kwh"]
if cap_kwh is None:
return None
cap_wh = float(cap_kwh) * 1000.0
tgt = row["target_soc_pct"]
if tgt is None:
tgt = row["default_target_soc_pct"]
if tgt is None:
return None
tgt_f = float(tgt)
soc0 = row["soc_at_connect_pct"]
if soc0 is None:
return None
needed_wh = (tgt_f - float(soc0)) / 100.0 * cap_wh
delivered = float(row["energy_delivered_wh"] or 0)
remaining = max(0.0, needed_wh - delivered)
if remaining <= 0:
return None
return SimpleNamespace(
target_deadline=row["target_deadline"],
energy_needed_wh=remaining,
)
async def _load_site_context(site_id: int, db):
"""
Načte baterii, TČ, síť, 2× vozidlo, otevřené EV session, SoC a TUV pro solver.
"""
brow = await db.fetchrow(
"""
SELECT bat.usable_capacity_wh,
bat.reserve_soc_percent,
bat.max_soc_percent,
bat.charge_efficiency,
bat.discharge_efficiency,
bat.degradation_cost_czk_kwh,
inv.max_charge_power_w,
inv.max_discharge_power_w
FROM ems.asset_battery bat
JOIN ems.asset_inverter inv ON inv.id = bat.inverter_id AND inv.site_id = bat.site_id
WHERE bat.site_id = $1
ORDER BY bat.id
LIMIT 1
""",
site_id,
)
if brow is None:
raise RuntimeError(f"No asset_battery for site_id={site_id}")
uc = float(brow["usable_capacity_wh"])
reserve_wh = float(brow["reserve_soc_percent"]) / 100.0 * uc
soc_max_wh = float(brow["max_soc_percent"]) / 100.0 * uc
battery = SimpleNamespace(
usable_capacity_wh=uc,
reserve_soc_wh=reserve_wh,
soc_max_wh=soc_max_wh,
charge_efficiency=float(brow["charge_efficiency"]),
discharge_efficiency=float(brow["discharge_efficiency"]),
degradation_cost_czk_kwh=float(brow["degradation_cost_czk_kwh"]),
max_charge_power_w=int(brow["max_charge_power_w"]),
max_discharge_power_w=int(brow["max_discharge_power_w"]),
)
hrow = await db.fetchrow(
"""
SELECT COALESCE(rated_heating_power_w, 8000) AS rated_heating_power_w,
COALESCE(tuv_min_temp_c, 45) AS tuv_min_temp_c
FROM ems.asset_heat_pump
WHERE site_id = $1
ORDER BY id
LIMIT 1
""",
site_id,
)
if hrow is None:
heat_pump = SimpleNamespace(rated_heating_power_w=0, tuv_min_temp_c=0.0)
else:
hp_w = int(hrow["rated_heating_power_w"])
heat_pump = SimpleNamespace(
rated_heating_power_w=max(hp_w, 0),
tuv_min_temp_c=float(hrow["tuv_min_temp_c"]),
)
grow = await db.fetchrow(
"""
SELECT max_import_power_w, max_export_power_w
FROM ems.site_grid_connection
WHERE site_id = $1
ORDER BY id
LIMIT 1
""",
site_id,
)
if grow is None:
raise RuntimeError(f"No site_grid_connection for site_id={site_id}")
grid = SimpleNamespace(
max_import_power_w=int(grow["max_import_power_w"]),
max_export_power_w=int(grow["max_export_power_w"]),
)
vrows = await db.fetch(
"""
SELECT v.battery_capacity_kwh,
v.max_charge_power_w,
v.default_target_soc_pct,
ch.code AS charger_code
FROM ems.asset_vehicle v
JOIN ems.asset_ev_charger ch ON ch.id = v.default_charger_id
WHERE v.site_id = $1
AND ch.code IN ('ev-charger-1', 'ev-charger-2')
ORDER BY ch.code
""",
site_id,
)
vehicles: list[SimpleNamespace] = [
SimpleNamespace(
max_charge_power_w=int(r["max_charge_power_w"]),
battery_capacity_kwh=float(r["battery_capacity_kwh"]),
default_target_soc_pct=float(r["default_target_soc_pct"]),
)
for r in vrows
]
while len(vehicles) < 2:
vehicles.append(
SimpleNamespace(
max_charge_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
)
)
srows = await db.fetch(
"""
SELECT es.target_deadline,
es.target_soc_pct,
es.soc_at_connect_pct,
es.energy_delivered_wh,
ch.code AS charger_code,
v.battery_capacity_kwh AS veh_cap_kwh,
v.default_target_soc_pct
FROM ems.ev_session es
JOIN ems.asset_ev_charger ch ON ch.id = es.charger_id
LEFT JOIN ems.asset_vehicle v ON v.id = es.vehicle_id
WHERE es.site_id = $1
AND es.session_end IS NULL
""",
site_id,
)
by_charger = {r["charger_code"]: r for r in srows}
ev_sessions = [
_ev_session_ctx(by_charger.get("ev-charger-1")),
_ev_session_ctx(by_charger.get("ev-charger-2")),
]
soc_pct = await db.fetchval(
"""
SELECT battery_soc_percent
FROM ems.telemetry_inverter
WHERE site_id = $1
ORDER BY measured_at DESC
LIMIT 1
""",
site_id,
)
if soc_pct is None:
soc_wh = reserve_wh
else:
soc_wh = float(soc_pct) / 100.0 * uc
soc_wh = max(reserve_wh, min(soc_wh, soc_max_wh))
tuv = await db.fetchval(
"""
SELECT tuv_tank_temp_c
FROM ems.telemetry_heat_pump
WHERE site_id = $1
ORDER BY measured_at DESC
LIMIT 1
""",
site_id,
)
tuv_temp = float(tuv) if tuv is not None else 50.0
return battery, heat_pump, grid, vehicles, ev_sessions, soc_wh, tuv_temp
async def _load_slots(site_id, from_dt, to_dt, db) -> list[PlanningSlot]:
"""Načte 15min sloty s cenami, forecasty a stavem EV z DB."""
rows = await db.fetch("""
SELECT
ep.interval_start,
ep.effective_buy_price_czk_kwh AS buy_price,
ep.effective_sell_price_czk_kwh AS sell_price,
COALESCE(fpi_a.power_w, 0) AS pv_a_forecast_w,
COALESCE(fpi_b.power_w, 0) AS pv_b_forecast_w,
COALESCE(cbi.power_w, 500) AS load_baseline_w,
-- EV připojení z aktuálního stavu nabíječek
(ev1.status NOT IN ('available', 'unavailable')) AS ev1_connected,
(ev2.status NOT IN ('available', 'unavailable')) AS ev2_connected
FROM ems.vw_site_effective_price ep
-- FVE pole A forecast
LEFT JOIN LATERAL (
SELECT fpi.power_w FROM ems.forecast_pv_interval fpi
JOIN ems.forecast_pv_run fpr ON fpr.id = fpi.run_id
JOIN ems.asset_pv_array apa ON apa.id = fpi.pv_array_id AND apa.site_id = fpr.site_id
WHERE fpr.site_id = $1 AND apa.code = 'pv-a'
AND fpi.interval_start = ep.interval_start AND fpr.status = 'ok'
ORDER BY fpr.created_at DESC LIMIT 1
) fpi_a ON true
-- FVE pole B forecast
LEFT JOIN LATERAL (
SELECT fpi.power_w FROM ems.forecast_pv_interval fpi
JOIN ems.forecast_pv_run fpr ON fpr.id = fpi.run_id
JOIN ems.asset_pv_array apa ON apa.id = fpi.pv_array_id AND apa.site_id = fpr.site_id
WHERE fpr.site_id = $1 AND apa.code = 'pv-b'
AND fpi.interval_start = ep.interval_start AND fpr.status = 'ok'
ORDER BY fpr.created_at DESC LIMIT 1
) fpi_b ON true
-- Bazální spotřeba
LEFT JOIN ems.consumption_baseline_interval cbi
ON cbi.site_id = $1 AND cbi.interval_start = ep.interval_start
AND cbi.data_type = 'forecast'
-- Stav EV nabíječek (aktuální, pro celý horizont stejný)
LEFT JOIN LATERAL (
SELECT t.status
FROM ems.telemetry_ev_charger t
JOIN ems.asset_ev_charger ch ON ch.id = t.charger_id
WHERE t.site_id = $1 AND ch.code = 'ev-charger-1'
ORDER BY t.measured_at DESC LIMIT 1
) ev1 ON true
LEFT JOIN LATERAL (
SELECT t.status
FROM ems.telemetry_ev_charger t
JOIN ems.asset_ev_charger ch ON ch.id = t.charger_id
WHERE t.site_id = $1 AND ch.code = 'ev-charger-2'
ORDER BY t.measured_at DESC LIMIT 1
) ev2 ON true
WHERE ep.site_id = $1
AND ep.interval_start >= $2 AND ep.interval_start < $3
ORDER BY ep.interval_start
""", site_id, from_dt, to_dt)
out: list[PlanningSlot] = []
for r in rows:
d = dict(r)
out.append(
PlanningSlot(
interval_start=d["interval_start"],
buy_price=float(d["buy_price"]),
sell_price=float(d["sell_price"]),
pv_a_forecast_w=int(d["pv_a_forecast_w"] or 0),
pv_b_forecast_w=int(d["pv_b_forecast_w"] or 0),
load_baseline_w=int(d["load_baseline_w"] or 0),
ev1_connected=bool(d["ev1_connected"]),
ev2_connected=bool(d["ev2_connected"]),
)
)
return out
async def _save_planning_run(
site_id, results, horizon_from, horizon_to,
run_type, triggered_by, replan_from,
soc_wh, duration_ms, correction, db
) -> int:
"""Uloží výsledky solveru jako nový planning_run, deaktivuje předchozí."""
run_id = await db.fetchval("""
INSERT INTO ems.planning_run
(site_id, horizon_start, horizon_end, status,
run_type, triggered_by, replan_from,
soc_at_replan_wh, solver_duration_ms, forecast_correction_factor)
VALUES ($1,$2,$3,'draft',$4,$5,$6,$7,$8,$9)
RETURNING id
""", site_id, horizon_from, horizon_to,
run_type, triggered_by, replan_from,
soc_wh, duration_ms, correction)
# Bulk insert výsledků
await db.executemany("""
INSERT INTO ems.planning_interval
(run_id, interval_start,
battery_setpoint_w, battery_soc_target_pct,
grid_setpoint_w,
ev1_setpoint_w, ev2_setpoint_w, ev1_via_bat_w, ev2_via_bat_w,
heat_pump_enabled, heat_pump_setpoint_w,
pv_a_curtailed_w, expected_cost_czk,
effective_buy_price, effective_sell_price)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15)
""", [
(run_id, r.interval_start,
r.battery_setpoint_w, r.battery_soc_target,
r.grid_setpoint_w,
r.ev1_setpoint_w, r.ev2_setpoint_w, r.ev1_via_bat_w, r.ev2_via_bat_w,
r.heat_pump_enabled, r.heat_pump_setpoint_w,
r.pv_a_curtailed_w, r.expected_cost_czk,
r.effective_buy_price, r.effective_sell_price)
for r in results
])
# Aktivovat nový plán, supersede předchozí
await db.execute("""
UPDATE ems.planning_run SET status = 'superseded'
WHERE site_id = $1 AND status = 'active' AND id <> $2
""", site_id, run_id)
await db.execute(
"UPDATE ems.planning_run SET status = 'active' WHERE id = $1", run_id
)
return run_id