1372 lines
50 KiB
Python
1372 lines
50 KiB
Python
# backend/services/planning_engine.py
|
||
#
|
||
# EMS Platform – plánovací engine
|
||
# Obsahuje: hlavní denní plán + rolling 15min replan
|
||
#
|
||
# Spouštění (APScheduler v main.py):
|
||
# scheduler.add_job(run_daily_plan, 'cron', hour=15, minute=0)
|
||
# scheduler.add_job(run_rolling_replan, 'cron', minute='*/15')
|
||
|
||
import time
|
||
import logging
|
||
from dataclasses import dataclass, replace
|
||
from datetime import datetime, timezone, timedelta
|
||
from types import SimpleNamespace
|
||
from typing import Optional
|
||
from zoneinfo import ZoneInfo
|
||
|
||
import pulp
|
||
|
||
logger = logging.getLogger(__name__)


# ============================================================
# Constants
# ============================================================

# Horizon of the daily plan in hours (about 36 h of OTE prices plus predicted prices).
HORIZON_HOURS = 96
# Length of one planning slot in hours (15 minutes).
INTERVAL_H = 0.25
# Objective weights by distance from the start of the optimisation window.
SLOT_WEIGHT_FULL = 1.0  # 0–36 h (exact OTE prices)
SLOT_WEIGHT_MEDIUM = 0.7  # 36–72 h
SLOT_WEIGHT_LOW = 0.4  # 72–96 h
# CZK/Wh – small penalty for curtailing PV array A.
CURTAILMENT_PENALTY = 0.001
# Solver time limit in seconds.
SOLVER_TIME_LIMIT = 10
# MILP: any significant export power ge (W) ⇒ end-of-slot soc[t] ≥ arb_base_wh (reserve from DB).
GE_MIN_EXPORT_W = 1.0
# Look-back window (hours) used to compute the forecast correction factor.
CORRECTION_WINDOW_H = 1
# Lower / upper clamp of the correction factor.
CORRECTION_MIN_CLAMP = 0.5
CORRECTION_MAX_CLAMP = 1.5
# Correction decay: the further a slot is from the current time, the less its forecast is corrected.
# After 16 slots (4 h) the correction has faded to zero.
CORRECTION_DECAY_SLOTS = 16
# Dynamic economic floor (MILP w_arb): look-ahead window of PV energy in the following slots.
ARB_LOOKAHEAD_SLOTS = 32  # 8 h at INTERVAL_H=0.25
# Reference energy scale in Wh = this fraction (0..1) of usable_capacity.
ARB_FLOOR_E_REF_FRAC = 0.5

_PRAGUE_TZ = ZoneInfo("Europe/Prague")
|
||
|
||
|
||
def slot_weight(slot_index: int, now_index: int = 0) -> float:
    """Return the objective-function weight of a slot.

    The weight depends on how many hours the slot lies after ``now_index``
    (the start of the optimisation window): full weight up to 36 h, medium
    weight up to 72 h, low weight beyond that.
    """
    lead_hours = (slot_index - now_index) * INTERVAL_H
    for limit_h, weight in ((36, SLOT_WEIGHT_FULL), (72, SLOT_WEIGHT_MEDIUM)):
        if lead_hours <= limit_h:
            return weight
    return SLOT_WEIGHT_LOW
|
||
|
||
|
||
def _pv_scarcity_penalty_multiplier(slots: list["PlanningSlot"], battery) -> float:
    """
    Soft adjustment of the battery-cycle economics by the expected solar yield.

    - little expected PV energy -> lower cycle penalty (supports pre-charging from the grid),
    - plenty of expected PV energy -> standard penalty.

    The expected yield is the 24 h PV coverage ratio (forecast PV energy divided
    by the usable battery capacity, clamped to 0..1) computed by
    ``_pv_coverage_ratio``; delegating to it keeps both helpers in agreement.

    Returns:
        A multiplier between 0.65 (no expected sun) and 1.0 (coverage >= 1, or
        no slots at all).
    """
    # Conservative look-ahead of one day; an empty slot list yields coverage 1.0 -> multiplier 1.0.
    coverage_clamped = _pv_coverage_ratio(slots, battery, hours=24)
    # 0.65 with little sun, 1.0 with plenty of sun.
    return 0.65 + 0.35 * coverage_clamped
|
||
|
||
|
||
def _pv_coverage_ratio(slots: list["PlanningSlot"], battery, hours: int = 24) -> float:
    """Return how much of the battery the forecast PV energy could cover, clamped to 0..1.

    Sums the (non-negative) forecast power of both PV arrays over the first
    ``hours`` of ``slots`` and divides it by the usable battery capacity
    (at least 1 kWh, to avoid dividing by zero). Returns 1.0 when there are
    no slots to look at.
    """
    n_slots = min(len(slots), int(hours / INTERVAL_H))
    if n_slots <= 0:
        return 1.0

    expected_kwh = 0.0
    for slot in slots[:n_slots]:
        combined_w = float(slot.pv_a_forecast_w + slot.pv_b_forecast_w)
        expected_kwh += max(0.0, combined_w) * INTERVAL_H / 1000.0

    capacity_kwh = max(1.0, float(getattr(battery, "usable_capacity_wh", 0.0)) / 1000.0)
    ratio = expected_kwh / capacity_kwh
    return max(0.0, min(1.0, ratio))
|
||
|
||
|
||
def _dynamic_arb_floor_wh_series(
    slots: list["PlanningSlot"],
    min_soc_wh: float,
    arb_base_wh: float,
    usable_wh: float,
) -> list[float]:
    """
    Time-varying economic SoC floor in Wh for the MILP (at or above ``min_soc_wh``).

    Plenty of expected PV energy in the next ARB_LOOKAHEAD_SLOTS slots lowers the
    floor towards ``min_soc_wh``; little sun keeps it at ``arb_base_wh``
    (typically the reserve from the DB).

    Args:
        slots: planning slots; only the PV forecasts are read.
        min_soc_wh: hard lower SoC bound in Wh.
        arb_base_wh: floor used when no PV energy is expected.
        usable_wh: usable battery capacity in Wh; ARB_FLOOR_E_REF_FRAC of it is
            the PV energy at which the floor reaches ``min_soc_wh``.

    Returns:
        One floor value per slot (empty list for empty ``slots``).
    """
    if not slots:
        return []

    # Reference energy is clamped to >= 1 Wh, so the division below is always safe.
    e_ref = max(1.0, ARB_FLOOR_E_REF_FRAC * float(usable_wh))
    floor_wh = float(min_soc_wh)
    spread = max(0.0, float(arb_base_wh) - floor_wh)

    out: list[float] = []
    for t in range(len(slots)):
        # Forecast PV energy (Wh) in the look-ahead window starting at slot t.
        e_pv_wh = 0.0
        for s in slots[t:t + ARB_LOOKAHEAD_SLOTS]:
            e_pv_wh += max(0, s.pv_a_forecast_w + s.pv_b_forecast_w) * INTERVAL_H
        # f = 0 -> floor stays at arb_base_wh, f = 1 -> floor drops to min_soc_wh.
        f = min(1.0, e_pv_wh / e_ref)
        out.append(floor_wh + (1.0 - f) * spread)
    return out
|
||
|
||
|
||
def _soc_security_profile(slots: list["PlanningSlot"], battery) -> tuple[float, float]:
    """
    Compute the SoC safety buffer the solver should hold when little sun is expected.

    - target buffer: reserve + up to 35 % of the usable capacity (scaled by PV
      scarcity over the next 24 h), capped at ``soc_max_wh``,
    - economic penalty for falling short of the buffer, derived from the
      average buy price of the first 24 h.

    Returns:
        ``(target_wh, penalty_czk_kwh)``; the penalty is at least 0.1.
    """
    coverage = _pv_coverage_ratio(slots, battery, hours=24)
    scarcity = 1.0 - coverage

    usable_wh = float(getattr(battery, "usable_capacity_wh", 0.0))
    reserve_wh = float(getattr(battery, "reserve_soc_wh", 0.0))
    soc_max_wh = float(getattr(battery, "soc_max_wh", usable_wh))
    extra_buffer_wh = 0.35 * usable_wh * scarcity
    target_wh = min(soc_max_wh, reserve_wh + extra_buffer_wh)

    first_day = slots[: int(24 / INTERVAL_H)]
    if first_day:
        avg_buy = sum(float(s.buy_price) for s in first_day) / len(first_day)
    else:
        # No price data at all: fall back to a fixed 4.0 buy price.
        avg_buy = 4.0
    penalty_czk_kwh = max(0.1, avg_buy * scarcity)
    return target_wh, penalty_czk_kwh
|
||
|
||
|
||
def _prague_dow_hour(interval_start: datetime) -> tuple[int, int]:
    """Return (day of week, hour) of ``interval_start`` in Europe/Prague local time.

    The day of week follows the PostgreSQL EXTRACT(DOW) convention:
    0 = Sunday … 6 = Saturday. Naive datetimes are interpreted as UTC.
    """
    if interval_start.tzinfo is None:
        aware = interval_start.replace(tzinfo=timezone.utc)
    else:
        aware = interval_start
    local = aware.astimezone(_PRAGUE_TZ)
    # isoweekday() is Mon=1 … Sun=7; modulo 7 maps Sunday to 0.
    return local.isoweekday() % 7, local.hour
|
||
|
||
|
||
# ============================================================
|
||
# Slot pre-selection (anti-micro-cycling)
|
||
# ============================================================
|
||
|
||
def _select_charge_slots(
    slots: list["PlanningSlot"],
    battery,
    current_soc_wh: float,
) -> set[int]:
    """
    Pre-select which slots are eligible for battery charging.

    Only slots with a forecast PV surplus are candidates; of those, the ones
    with the cheapest sell price are selected until their cumulative chargeable
    energy reaches the energy needed to fill the battery multiplied by
    ``battery.charge_slot_buffer``.

    Returns:
        The set of eligible slot indices.

        - ``charge_slot_buffer`` missing or <= 0: every slot index (no restriction).
        - Battery already at ``soc_max_wh``: the empty set, i.e. NO slot is
          eligible (an empty set does not mean "no restriction").
        - If the selected slots cannot deliver the energy to fill the battery,
          all PV-surplus slots are returned.
    """
    charge_buf = float(getattr(battery, "charge_slot_buffer", 0) or 0)
    if charge_buf <= 0:
        return set(range(len(slots)))

    energy_to_fill = float(battery.soc_max_wh) - float(current_soc_wh)
    if energy_to_fill <= 0:
        return set()

    max_charge_w = float(battery.max_charge_power_w)
    charge_eff = float(battery.charge_efficiency)

    # (slot index, sell price, energy in Wh that can be stored in this slot)
    candidates: list[tuple[int, float, float]] = []
    for t, s in enumerate(slots):
        pv_surplus = max(0, s.pv_a_forecast_w + s.pv_b_forecast_w - s.load_baseline_w)
        if pv_surplus <= 0:
            continue
        charge_wh = min(max_charge_w, float(pv_surplus)) * charge_eff * INTERVAL_H
        candidates.append((t, float(s.sell_price), charge_wh))

    # Cheapest sell price first: charging there forgoes the least export revenue.
    candidates.sort(key=lambda c: c[1])

    target = energy_to_fill * charge_buf
    selected: set[int] = set()
    cumulative = 0.0
    for t, _price, wh in candidates:
        if cumulative >= target:
            break
        selected.add(t)
        cumulative += wh

    # Selection cannot fill the battery (possible with a buffer < 1): allow every candidate.
    if cumulative < energy_to_fill:
        selected = {c[0] for c in candidates}

    return selected
|
||
|
||
|
||
def _select_discharge_export_slots(
    slots: list["PlanningSlot"],
    battery,
) -> set[int]:
    """
    Pre-select which slots may use battery energy for grid export.

    The slots with the highest sell price are selected until their cumulative
    per-slot energy reaches the exportable part of the battery
    (``soc_max_wh`` minus the minimum SoC) multiplied by
    ``battery.discharge_slot_buffer``.

    Returns:
        The set of eligible slot indices.

        - ``discharge_slot_buffer`` missing or <= 0: every slot index (no restriction).
        - No exportable energy: the empty set, i.e. NO slot is eligible
          (an empty set does not mean "no restriction").
    """
    discharge_buf = float(getattr(battery, "discharge_slot_buffer", 0) or 0)
    if discharge_buf <= 0:
        return set(range(len(slots)))

    # Same fallback as solve_dispatch: use the reserve when no explicit minimum SoC is set.
    min_soc_wh = getattr(battery, "min_soc_wh", None)
    if min_soc_wh is None:
        min_soc_wh = battery.reserve_soc_wh
    exportable = float(battery.soc_max_wh) - float(min_soc_wh)
    if exportable <= 0:
        return set()

    # Most expensive sell price first; the sort is stable, so ties keep time order.
    candidates = [(t, float(s.sell_price)) for t, s in enumerate(slots)]
    candidates.sort(key=lambda c: c[1], reverse=True)

    # NOTE(review): the SoC equation in solve_dispatch drains bd / discharge_efficiency
    # per slot, while this estimate multiplies by the efficiency, so slightly more slots
    # than strictly needed get selected — confirm whether that extra margin is intended.
    energy_per_slot = (
        float(battery.max_discharge_power_w)
        * float(battery.discharge_efficiency)
        * INTERVAL_H
    )
    target = exportable * discharge_buf

    selected: set[int] = set()
    cumulative = 0.0
    for t, _price in candidates:
        if cumulative >= target:
            break
        selected.add(t)
        cumulative += energy_per_slot

    return selected
|
||
|
||
|
||
# ============================================================
|
||
# Datové třídy (lze nahradit pydantic modely)
|
||
# ============================================================
|
||
|
||
@dataclass
class PlanningSlot:
    """Input data of one 15-minute planning slot (prices, forecasts, EV presence)."""

    # Start of the slot.
    interval_start: datetime
    # Buy price in CZK/kWh.
    buy_price: float
    # Sell price in CZK/kWh.
    sell_price: float
    # Forecast power of PV array A in W (controllable array).
    pv_a_forecast_w: int
    # Forecast power of PV array B in W (green-bonus array, fixed).
    pv_b_forecast_w: int
    # Predicted baseline consumption in W.
    load_baseline_w: int
    # Whether EV 1 / EV 2 is connected during the slot.
    ev1_connected: bool
    ev2_connected: bool
    # True when the slot's price is a prediction rather than a published price.
    is_predicted_price: bool = False
|
||
|
||
|
||
@dataclass
class DispatchResult:
    """Solver output for one planning slot (setpoints, expected cost, prices used)."""

    # Start of the slot this result applies to.
    interval_start: datetime
    # Battery setpoint in W: positive = charging, negative = discharging.
    battery_setpoint_w: int
    # State of charge in % at the end of the interval.
    battery_soc_target: float
    # Grid setpoint in W: positive = import, negative = export.
    grid_setpoint_w: int
    # Total EV charging setpoints in W.
    ev1_setpoint_w: Optional[int]
    ev2_setpoint_w: Optional[int]
    # Part of the EV charging power routed via the battery, in W.
    ev1_via_bat_w: int
    ev2_via_bat_w: int
    # Heat pump on/off decision and its power setpoint in W.
    heat_pump_enabled: bool
    heat_pump_setpoint_w: int
    # Curtailed power of PV array A in W.
    pv_a_curtailed_w: int
    # Expected cost of the slot in CZK.
    expected_cost_czk: float
    # Buy / sell price the solver used for this slot.
    effective_buy_price: float
    effective_sell_price: float
    # Same flag as on PlanningSlot (the effective price lacks OTE data → fn_get_predicted_price).
    is_predicted_price: bool
|
||
|
||
|
||
# ============================================================
|
||
# Korekce forecastu na základě skutečné výroby
|
||
# ============================================================
|
||
|
||
async def compute_correction_factor(
    site_id: int,
    now: datetime,
    db,
    window_h: float = CORRECTION_WINDOW_H,
) -> tuple[float, dict]:
    """
    Compute the PV forecast correction factor from the last ``window_h`` hours.

    Compares the measured PV energy (inverter telemetry) with the energy that
    the latest successful forecast run created at or before the window start
    predicted for the same window.

    Args:
        site_id: site to evaluate.
        now: end of the evaluation window.
        db: connection object providing an awaitable ``fetchval(query, *args)``.
        window_h: length of the look-back window in hours.

    Returns:
        ``(factor, log_data)`` where ``factor`` lies within
        [CORRECTION_MIN_CLAMP, CORRECTION_MAX_CLAMP]. ``factor`` is 1.0 when the
        forecast is below 0.1 kWh or the measured energy below 0.05 kWh.
        ``log_data`` holds the window bounds, both energies in Wh, the factor,
        and either ``reason`` (no correction) or ``raw_factor`` (unclamped).
    """
    window_start = now - timedelta(hours=window_h)

    # Measured production over the window (from telemetry).
    # NOTE(review): "* 0.25" presumes one telemetry row per 15 minutes — confirm.
    actual_raw = await db.fetchval("""
        SELECT COALESCE(SUM(pv_power_w) * 0.25 / 1000.0, 0)   -- kWh
        FROM ems.telemetry_inverter
        WHERE site_id = $1
          AND measured_at >= $2 AND measured_at < $3
    """, site_id, window_start, now)

    # Forecast production over the same window (from the newest forecast valid at that time).
    forecast_raw = await db.fetchval("""
        SELECT COALESCE(SUM(fpi.power_w) * 0.25 / 1000.0, 0)
        FROM ems.forecast_pv_interval fpi
        JOIN ems.forecast_pv_run fpr ON fpr.id = fpi.run_id
        WHERE fpr.site_id = $1
          AND fpi.interval_start >= $2 AND fpi.interval_start < $3
          AND fpr.status = 'ok'
          AND fpr.created_at = (
              SELECT MAX(fpr2.created_at)
              FROM ems.forecast_pv_run fpr2
              WHERE fpr2.site_id = $1 AND fpr2.status = 'ok'
                AND fpr2.created_at <= $2
          )
    """, site_id, window_start, now)

    # The driver may hand back decimal.Decimal (or None); normalise to float so the
    # factor can safely be combined with floats by callers.
    actual = float(actual_raw or 0.0)
    forecast = float(forecast_raw or 0.0)

    log_data = {
        "window_start": window_start,
        "window_end": now,
        "actual_pv_wh": actual * 1000,
        "forecast_pv_wh": forecast * 1000,
    }

    # Forecast or measurement too small (night, < 0.1 kWh) → no correction.
    if forecast < 0.1 or actual < 0.05:
        log_data["correction_factor"] = 1.0
        log_data["reason"] = "insufficient_data"
        return 1.0, log_data

    raw_factor = actual / forecast
    factor = max(CORRECTION_MIN_CLAMP, min(CORRECTION_MAX_CLAMP, raw_factor))

    log_data["correction_factor"] = factor
    log_data["raw_factor"] = raw_factor
    return factor, log_data
|
||
|
||
|
||
def apply_forecast_correction(
    slots: list[PlanningSlot],
    now: datetime,
    factor: float,
    decay_slots: int = CORRECTION_DECAY_SLOTS,
) -> list[PlanningSlot]:
    """
    Apply the correction factor to the PV forecast of the given slots.

    The correction fades linearly: full correction on the first slot, none from
    the ``decay_slots``-th slot onwards. Both PV arrays are scaled and the
    result is truncated to a non-negative integer. ``now`` is not used by the
    computation.

    Example: factor=0.85 → slot 0 scaled by 0.85, slot 8 by 0.925, slot 16+ untouched.

    Returns:
        A new list; uncorrected slots are the original objects, corrected ones
        are copies.
    """
    if factor == 1.0:
        # Nothing to correct: hand back the same slot objects in a fresh list.
        return list(slots)

    adjusted = []
    for idx, slot in enumerate(slots):
        if idx >= decay_slots:
            adjusted.append(slot)
            continue

        # Linear fade: 1.0 at slot 0 down to 0.0 at slot `decay_slots`.
        fade = 1.0 - (idx / decay_slots)
        scale = 1.0 + (factor - 1.0) * fade
        scaled_a = max(0, int(slot.pv_a_forecast_w * scale))
        scaled_b = max(0, int(slot.pv_b_forecast_w * scale))
        adjusted.append(replace(slot, pv_a_forecast_w=scaled_a, pv_b_forecast_w=scaled_b))

    return adjusted
|
||
|
||
|
||
# ============================================================
|
||
# LP Solver
|
||
# ============================================================
|
||
|
||
def solve_dispatch(
    slots: list[PlanningSlot],
    battery,
    heat_pump,
    grid,
    ev_sessions: list,  # active EV sessions [ev1_session, ev2_session]
    vehicles: list,  # [vehicle1, vehicle2]
    current_soc_wh: float,
    current_tuv_temp_c: float,
    *,
    tuv_delta_stats: Optional[dict[tuple[int, int], float]] = None,
    now_slot_index: int = 0,
    operating_mode: str = "AUTO",
    price_failsafe_active: bool = False,
) -> tuple[list[DispatchResult], int]:
    """
    Build and solve the dispatch MILP for the given planning slots.

    Per slot the model decides grid import ``gi`` / export ``ge``, battery
    charge ``bc`` / discharge ``bd`` and end-of-slot ``soc``, curtailment ``ca``
    of PV array A, heat-pump power ``hp`` and, per vehicle, direct and
    via-battery EV charging power. Two binaries per slot gate battery
    arbitrage (``w_arb``) and significant export (``z_export``).

    Args:
        slots: planning slots (prices, forecasts, EV presence); must not be empty.
        battery, heat_pump, grid: asset objects providing the limits read below.
        ev_sessions: deadline contexts per vehicle (entries may be None);
            sessions beyond the number of vehicles are ignored.
        vehicles: vehicle objects; any number is accepted, the result only
            reports the first two.
        current_soc_wh: battery state of charge at the start of the first slot.
        current_tuv_temp_c: current hot-water (TUV) temperature.
        tuv_delta_stats: optional temperature change per hour keyed by
            (day of week, hour) as returned by ``_prague_dow_hour``.
        now_slot_index: reference index for the objective slot weights.
        operating_mode: "AUTO", "SELF_SUSTAIN", "PRESERVE" or "CHARGE_CHEAP".
        price_failsafe_active: forbid export in slots with predicted prices.

    Returns:
        ``(results, solver_duration_ms)`` with one DispatchResult per slot.

    Raises:
        RuntimeError: when ``slots`` is empty or the solver status is not "Optimal".
    """
    T = len(slots)
    EV = len(vehicles)  # number of EVs (typically 2)

    if T == 0:
        # Without slots there is no model to build (end-of-day SoC index, hp[0], …).
        raise RuntimeError("Solver: no planning slots to optimise")

    # Energy routed to an EV through the battery pays the round-trip losses.
    EV_ROUNDTRIP_FACTOR = 1.0 / (battery.charge_efficiency * battery.discharge_efficiency)
    cycle_penalty_mult = _pv_scarcity_penalty_multiplier(slots, battery)
    degradation_cost_effective = battery.degradation_cost_czk_kwh * cycle_penalty_mult
    soc_buffer_target_wh, soc_deficit_penalty_czk_kwh = _soc_security_profile(slots, battery)

    prob = pulp.LpProblem("ems_dispatch", pulp.LpMinimize)

    min_soc_wh = float(getattr(battery, "min_soc_wh", battery.reserve_soc_wh))
    arb_base_wh = max(
        float(getattr(battery, "arb_floor_wh", battery.reserve_soc_wh)),
        min_soc_wh,
    )
    if getattr(battery, "disable_dynamic_arb_floor", False):
        arb_floor_series = [arb_base_wh] * T
    else:
        arb_floor_series = _dynamic_arb_floor_wh_series(
            slots, min_soc_wh, arb_base_wh, float(battery.usable_capacity_wh)
        )

    # --- Variables ---
    gi = [pulp.LpVariable(f"gi_{t}", 0, grid.max_import_power_w) for t in range(T)]
    ge = [pulp.LpVariable(f"ge_{t}", 0, grid.max_export_power_w) for t in range(T)]
    bc = [pulp.LpVariable(f"bc_{t}", 0, battery.max_charge_power_w) for t in range(T)]
    bd = [pulp.LpVariable(f"bd_{t}", 0, battery.max_discharge_power_w) for t in range(T)]
    soc = [pulp.LpVariable(f"soc_{t}", min_soc_wh, battery.soc_max_wh) for t in range(T)]
    w_arb = [pulp.LpVariable(f"w_arb_{t}", cat=pulp.LpBinary) for t in range(T)]
    z_export = [pulp.LpVariable(f"z_export_{t}", cat=pulp.LpBinary) for t in range(T)]
    ca = [pulp.LpVariable(f"ca_{t}", 0, slots[t].pv_a_forecast_w) for t in range(T)]
    hp = [pulp.LpVariable(f"hp_{t}", 0, heat_pump.rated_heating_power_w) for t in range(T)]
    soc_deficit_24h = pulp.LpVariable("soc_deficit_24h", 0, battery.usable_capacity_wh)

    # EV variables per vehicle
    ev_direct = [
        [
            pulp.LpVariable(
                f"evd_{e}_{t}", 0,
                min(vehicles[e].max_charge_power_w, grid.max_import_power_w),
            )
            for t in range(T)
        ]
        for e in range(EV)
    ]
    ev_via_bat = [
        [
            pulp.LpVariable(f"evb_{e}_{t}", 0, vehicles[e].max_charge_power_w)
            for t in range(T)
        ]
        for e in range(EV)
    ]

    # --- Objective (slot weights reflect price uncertainty beyond the OTE horizon) ---
    prob += pulp.lpSum(
        slot_weight(t, now_slot_index) * (
            gi[t] * slots[t].buy_price * INTERVAL_H / 1000
            - ge[t] * slots[t].sell_price * INTERVAL_H / 1000
            # The degradation cost is split symmetrically between charge and discharge
            # (0.5 + 0.5) so that a round trip is not penalised twice.
            + 0.5 * (bc[t] + bd[t]) * degradation_cost_effective * INTERVAL_H / 1000
            + pulp.lpSum(
                ev_direct[e][t] * slots[t].buy_price * INTERVAL_H / 1000
                + ev_via_bat[e][t] * slots[t].buy_price * EV_ROUNDTRIP_FACTOR * INTERVAL_H / 1000
                for e in range(EV)
            )
            + ca[t] * CURTAILMENT_PENALTY
        )
        for t in range(T)
    ) + soc_deficit_24h * soc_deficit_penalty_czk_kwh / 1000

    # Big-M constants of the export/SoC coupling (loop-invariant).
    m_ge = float(grid.max_export_power_w)
    m_soc_bigm = float(battery.usable_capacity_wh)

    # --- Constraints ---
    for t in range(T):
        s = slots[t]
        pv_a_net = s.pv_a_forecast_w - ca[t]

        ev_total_t = pulp.lpSum(ev_direct[e][t] + ev_via_bat[e][t] for e in range(EV))

        # Power balance: sources == sinks
        prob += (
            pv_a_net + s.pv_b_forecast_w + gi[t] + bd[t]
            == s.load_baseline_w + ev_total_t + hp[t] + bc[t] + ge[t]
        )

        # SoC continuity
        soc_prev = current_soc_wh if t == 0 else soc[t - 1]
        prob += soc[t] == (
            soc_prev
            + bc[t] * battery.charge_efficiency * INTERVAL_H
            - bd[t] / battery.discharge_efficiency * INTERVAL_H
        )

        # ev_via_bat must be covered by battery discharge
        prob += pulp.lpSum(ev_via_bat[e][t] for e in range(EV)) <= bd[t]

        # Negative sell price → forbid export
        if s.sell_price < 0:
            prob += ge[t] == 0

        # Negative buy price → cap import (house baseline + storage + controlled loads)
        if s.buy_price < 0:
            prob += gi[t] <= (
                s.load_baseline_w
                + battery.max_charge_power_w
                + sum(v.max_charge_power_w for v in vehicles)
                + heat_pump.rated_heating_power_w
            )

        # Arbitrage gate: with w_arb = 1 the SoC at slot start must be at least the
        # (dynamic) economic floor; with w_arb = 0 only min_soc_wh is required, but
        # discharge is then limited to covering local consumption.
        arb_t = arb_floor_series[t]
        prob += soc_prev >= (arb_t - (arb_t - min_soc_wh) * (1 - w_arb[t]))
        prob += bd[t] <= (
            s.load_baseline_w
            + ev_total_t
            + hp[t]
            + bc[t]
            + battery.max_discharge_power_w * w_arb[t]
        )

        # Significant export ⇒ end-of-slot SoC ≥ economic reserve (arb_base_wh),
        # not the dynamic arb_floor_series
        prob += ge[t] <= m_ge * z_export[t]
        prob += ge[t] >= GE_MIN_EXPORT_W * z_export[t]
        prob += soc[t] >= arb_base_wh - m_soc_bigm * (1 - z_export[t])

        # EV – limits and connection state
        for e in range(EV):
            connected = (
                (e == 0 and s.ev1_connected) or
                (e == 1 and s.ev2_connected)
            )
            if not connected:
                prob += ev_direct[e][t] == 0
                prob += ev_via_bat[e][t] == 0
            else:
                prob += ev_direct[e][t] + ev_via_bat[e][t] <= vehicles[e].max_charge_power_w

    om = (operating_mode or "AUTO").strip().upper()
    if om == "SELF_SUSTAIN":
        for t in range(T):
            prob += ge[t] == 0
            prob += gi[t] <= slots[t].load_baseline_w
    elif om == "PRESERVE":
        for t in range(T):
            prob += bc[t] == 0
            prob += bd[t] == 0
    elif om == "CHARGE_CHEAP":
        for t in range(T):
            prob += ge[t] == 0
            prob += bd[t] == 0

    if price_failsafe_active:
        for t in range(T):
            if slots[t].is_predicted_price:
                prob += ge[t] == 0

    # Slot pre-selection: restrict charging and discharge-for-export to selected slots
    if om == "AUTO":
        charge_slots = _select_charge_slots(slots, battery, current_soc_wh)
        discharge_export_slots = _select_discharge_export_slots(slots, battery)
        for t in range(T):
            if t not in charge_slots:
                prob += bc[t] == 0

            if t not in discharge_export_slots:
                s = slots[t]
                ev_total_t = pulp.lpSum(
                    ev_direct[e][t] + ev_via_bat[e][t] for e in range(EV)
                )
                prob += bd[t] <= s.load_baseline_w + ev_total_t + hp[t]

    # Deadline constraints for EVs (sessions without a matching vehicle are ignored)
    for e, session in enumerate(ev_sessions[:EV]):
        if session and session.target_deadline and session.energy_needed_wh > 0:
            # NOTE(review): the slot found here STARTS at or after the deadline and is
            # still counted by range(t_dl + 1) — confirm whether it should be excluded.
            t_dl = next(
                (t for t, s in enumerate(slots) if s.interval_start >= session.target_deadline),
                T - 1
            )
            prob += pulp.lpSum(
                (ev_direct[e][t] + ev_via_bat[e][t]) * INTERVAL_H
                for t in range(t_dl + 1)
                if (e == 0 and slots[t].ev1_connected) or (e == 1 and slots[t].ev2_connected)
            ) >= session.energy_needed_wh

    # TUV look-ahead based on tuv_usage_stats (DOW + hour, same convention as the DB)
    if (
        tuv_delta_stats
        and heat_pump.rated_heating_power_w > 0
        and getattr(heat_pump, "tuv_min_temp_c", 0) is not None
    ):
        tuv_pred = float(current_tuv_temp_c)
        tgt = float(getattr(heat_pump, "tuv_target_temp_c", 55.0) or 55.0)
        thr = float(heat_pump.tuv_min_temp_c) + 5.0
        for t in range(T):
            dow, hour = _prague_dow_hour(slots[t].interval_start)
            delta = tuv_delta_stats.get((dow, hour), -0.1)
            tuv_pred += float(delta) * INTERVAL_H
            if tuv_pred < thr:
                # Require heating within the last 9 slots up to t, then assume the
                # tank is back at its target temperature.
                prob += (
                    pulp.lpSum(hp[k] for k in range(max(0, t - 8), t + 1))
                    >= heat_pump.rated_heating_power_w * 0.5
                )
                tuv_pred = tgt

    # Emergency TUV heating (skipped when no minimum temperature is configured)
    tuv_min_temp_c = heat_pump.tuv_min_temp_c
    if tuv_min_temp_c is not None and current_tuv_temp_c < tuv_min_temp_c:
        prob += hp[0] >= heat_pump.rated_heating_power_w * 0.8

    # SoC safety buffer, evaluated only at the end of the 24 h horizon
    eod_idx = min(T - 1, int(24 / INTERVAL_H) - 1)
    prob += soc_deficit_24h >= soc_buffer_target_wh - soc[eod_idx]

    # --- Solve (HiGHS via highspy / PuLP API; no external HiGHS_CMD binary) ---
    t_start = time.monotonic()
    try:
        solver = pulp.getSolver(
            "HiGHS", msg=False, timeLimit=SOLVER_TIME_LIMIT
        )
    except Exception:
        logger.warning("HiGHS nedostupný, používám CBC fallback")
        solver = pulp.PULP_CBC_CMD(msg=False, timeLimit=SOLVER_TIME_LIMIT)
    status = prob.solve(solver)
    duration_ms = int((time.monotonic() - t_start) * 1000)

    if pulp.LpStatus[status] != 'Optimal':
        raise RuntimeError(f"Solver: {pulp.LpStatus[status]}")

    def _ev_w(series, e: int, t: int) -> float:
        """Solved EV power of vehicle ``e`` in slot ``t``; 0 when that vehicle does not exist."""
        return pulp.value(series[e][t]) if e < EV else 0.0

    # --- Post-processing ---
    results = []
    for t in range(T):
        hp_raw = pulp.value(hp[t])
        # The heat pump is on/off only: switch it on above 30 % of rated power.
        hp_on = hp_raw > heat_pump.rated_heating_power_w * 0.3
        gi_w = pulp.value(gi[t])
        ge_w = pulp.value(ge[t])
        batt_w = round(pulp.value(bc[t]) - pulp.value(bd[t]))
        grid_w = round(gi_w - ge_w)
        soc_pct = round(pulp.value(soc[t]) / battery.usable_capacity_wh * 100, 1)

        cost = (
            gi_w * slots[t].buy_price * INTERVAL_H / 1000
            - ge_w * slots[t].sell_price * INTERVAL_H / 1000
        )

        ev1_via = _ev_w(ev_via_bat, 0, t)
        ev2_via = _ev_w(ev_via_bat, 1, t)

        results.append(DispatchResult(
            interval_start=slots[t].interval_start,
            battery_setpoint_w=batt_w,
            battery_soc_target=soc_pct,
            grid_setpoint_w=grid_w,
            ev1_setpoint_w=(
                round(_ev_w(ev_direct, 0, t) + ev1_via) if slots[t].ev1_connected else None
            ),
            ev2_setpoint_w=(
                round(_ev_w(ev_direct, 1, t) + ev2_via) if slots[t].ev2_connected else None
            ),
            ev1_via_bat_w=round(ev1_via),
            ev2_via_bat_w=round(ev2_via),
            heat_pump_enabled=hp_on,
            heat_pump_setpoint_w=heat_pump.rated_heating_power_w if hp_on else 0,
            pv_a_curtailed_w=round(pulp.value(ca[t])),
            expected_cost_czk=round(cost, 4),
            effective_buy_price=slots[t].buy_price,
            effective_sell_price=slots[t].sell_price,
            is_predicted_price=bool(slots[t].is_predicted_price),
        ))

    return results, duration_ms
|
||
|
||
|
||
# ============================================================
|
||
# Denní plán (15:00)
|
||
# ============================================================
|
||
|
||
async def run_daily_plan(site_id: int, db, triggered_by: str = "scheduler:daily") -> tuple[int, int]:
    """
    Main daily planning run.

    Meant to run at 15:00, after the price import (14:00) and the forecast
    update (14:30). The horizon spans from the start of the current 15-minute
    slot to +HORIZON_HOURS (96 h; OTE prices plus predicted prices).

    Export in predicted-price slots is blocked (price fail-safe) when any of the
    first 36 h of slots lacks a published price.

    Returns:
        ``(run_id, solver_duration_ms)`` of the saved planning run.
    """
    horizon_from = _current_slot_start(datetime.now(timezone.utc))
    horizon_to = horizon_from + timedelta(hours=HORIZON_HOURS)

    logger.info(f"[site={site_id}] Daily plan: {horizon_from} → {horizon_to}")

    slots = await _load_slots(site_id, horizon_from, horizon_to, db)

    # Count predicted-price slots within the first 36 h of the horizon.
    first_36h = slots[: int(36 / INTERVAL_H)]
    missing_ote_count = len([s for s in first_36h if s.is_predicted_price])
    price_failsafe_active = missing_ote_count > 0
    if price_failsafe_active:
        logger.warning(
            "[site=%s] Price fail-safe active (daily): missing OTE slots in first 36h = %s",
            site_id,
            missing_ote_count,
        )

    (
        battery, hp, grid, vehicles, ev_sessions, soc_wh, tuv_temp, operating_mode,
    ) = await _load_site_context(site_id, db)
    tuv_stats = await _load_tuv_usage_stats(site_id, db)

    results, duration_ms = solve_dispatch(
        slots,
        battery,
        hp,
        grid,
        ev_sessions,
        vehicles,
        soc_wh,
        tuv_temp,
        tuv_delta_stats=tuv_stats,
        operating_mode=operating_mode or "AUTO",
        price_failsafe_active=price_failsafe_active,
    )

    # The daily plan applies no PV correction, so "before" and "after" slots are the same.
    slot_inputs = _build_slot_inputs(slots, slots)
    run_id = await _save_planning_run(
        site_id,
        results,
        horizon_from,
        horizon_to,
        run_type="daily",
        triggered_by=triggered_by,
        replan_from=None,
        soc_wh=soc_wh,
        duration_ms=duration_ms,
        correction=1.0,
        db=db,
        slot_inputs=slot_inputs,
    )
    logger.info(f"[site={site_id}] Daily plan done in {duration_ms} ms")
    return run_id, duration_ms
|
||
|
||
|
||
# ============================================================
|
||
# Rolling replan (každých 15min)
|
||
# ============================================================
|
||
|
||
async def run_rolling_replan(
    site_id: int,
    db,
    *,
    triggered_by: str = "scheduler:rolling",
    allow_skip: bool = True,
) -> tuple[Optional[int], Optional[int]]:
    """
    Rolling replan, run every 15 minutes.

    1. Loads the site context (including the current battery SoC).
    2. Computes the PV forecast correction factor from the last hour.
    3. Applies the (decaying) correction to the forecast of the remaining slots.
    4. Runs the solver for the remaining horizon of the active plan.
    5. Saves the result as a new planning run and logs the correction.

    Without an active plan the daily plan is run instead. When less than
    30 minutes of horizon remain: with ``allow_skip=True`` (scheduler) the
    function returns ``(None, None)``; with ``allow_skip=False`` (API) it runs
    the daily plan as a replacement.

    Returns:
        ``(run_id, solver_duration_ms)``, or ``(None, None)`` when skipped.
    """
    now = datetime.now(timezone.utc)
    replan_from = _current_slot_start(now)

    active_run = await db.fetchrow("""
        SELECT id, horizon_end FROM ems.planning_run
        WHERE site_id = $1 AND status = 'active'
        ORDER BY created_at DESC LIMIT 1
    """, site_id)

    if not active_run:
        logger.warning(f"[site={site_id}] Rolling replan: no active plan, triggering daily plan")
        return await run_daily_plan(site_id, db, triggered_by=triggered_by)

    horizon_to = active_run["horizon_end"]
    remaining_s = (horizon_to - replan_from).total_seconds()

    if remaining_s < 1800 and allow_skip:
        logger.info(f"[site={site_id}] Rolling replan: horizon almost exhausted, skipping")
        return None, None
    if remaining_s < 1800:
        logger.info(f"[site={site_id}] Rolling replan: horizon exhausted, running daily plan")
        return await run_daily_plan(site_id, db, triggered_by=triggered_by)

    logger.info(f"[site={site_id}] Rolling replan from {replan_from} → {horizon_to}")

    (
        battery, hp, grid, vehicles, ev_sessions, soc_wh, tuv_temp, operating_mode,
    ) = await _load_site_context(site_id, db)

    correction_factor, correction_log = await compute_correction_factor(site_id, now, db)

    raw_slots = await _load_slots(site_id, replan_from, horizon_to, db)
    # Keep the uncorrected slots so the saved inputs show both forecasts.
    slots_before_pv_correction = list(raw_slots)

    # Count predicted-price slots within the first 36 h of the remaining horizon.
    first_36h = raw_slots[: int(36 / INTERVAL_H)]
    missing_ote_count = len([s for s in first_36h if s.is_predicted_price])
    price_failsafe_active = missing_ote_count > 0
    if price_failsafe_active:
        logger.warning(
            "[site=%s] Price fail-safe active (rolling): missing OTE slots in first 36h = %s",
            site_id,
            missing_ote_count,
        )

    slots = apply_forecast_correction(raw_slots, now, correction_factor)

    tuv_stats = await _load_tuv_usage_stats(site_id, db)

    results, duration_ms = solve_dispatch(
        slots,
        battery,
        hp,
        grid,
        ev_sessions,
        vehicles,
        soc_wh,
        tuv_temp,
        tuv_delta_stats=tuv_stats,
        operating_mode=operating_mode or "AUTO",
        price_failsafe_active=price_failsafe_active,
    )

    slot_inputs = _build_slot_inputs(slots_before_pv_correction, slots)
    run_id = await _save_planning_run(
        site_id,
        results,
        replan_from,
        horizon_to,
        run_type="rolling",
        triggered_by=triggered_by,
        replan_from=replan_from,
        soc_wh=soc_wh,
        duration_ms=duration_ms,
        correction=correction_factor,
        db=db,
        slot_inputs=slot_inputs,
    )

    # Record which correction was applied to this run.
    await db.execute(
        """
        INSERT INTO ems.forecast_correction_log
            (site_id, window_start, window_end, actual_pv_wh, forecast_pv_wh,
             correction_factor, applied_to_run_id)
        VALUES ($1,$2,$3,$4,$5,$6,$7)
        """,
        site_id,
        correction_log["window_start"],
        correction_log["window_end"],
        correction_log.get("actual_pv_wh"),
        correction_log.get("forecast_pv_wh"),
        correction_factor,
        run_id,
    )

    logger.info(
        f"[site={site_id}] Rolling replan done in {duration_ms} ms "
        f"(correction={correction_factor:.3f})"
    )
    return run_id, duration_ms
|
||
|
||
|
||
async def run_plan_api(
    site_id: int,
    plan_type: str,
    db,
    *,
    triggered_by: str = "api",
) -> tuple[int, int]:
    """Run a plan on manual / UI request; always returns (run_id, solver_duration_ms).

    ``plan_type`` is matched case-insensitively, ignoring surrounding whitespace.

    Raises:
        ValueError: when ``plan_type`` is neither "daily" nor "rolling".
        RuntimeError: when the rolling replan yields no run.
    """
    kind = plan_type.lower().strip()
    if kind not in ("daily", "rolling"):
        raise ValueError(f"Unknown plan_type: {plan_type!r} (use daily or rolling)")

    if kind == "daily":
        return await run_daily_plan(site_id, db, triggered_by=triggered_by)

    # allow_skip=False: an exhausted horizon triggers a daily plan instead of a skip.
    run_id, solver_ms = await run_rolling_replan(
        site_id, db, triggered_by=triggered_by, allow_skip=False
    )
    if run_id is None or solver_ms is None:
        raise RuntimeError("Rolling replan did not return a run")
    return run_id, solver_ms
|
||
|
||
|
||
# ============================================================
|
||
# Pomocné funkce
|
||
# ============================================================
|
||
|
||
def _current_slot_start(dt: datetime) -> datetime:
    """Round ``dt`` down to the start of its 15-minute slot."""
    floored_minute = dt.minute - dt.minute % 15
    return dt.replace(minute=floored_minute, second=0, microsecond=0)
|
||
|
||
|
||
def _ev_session_ctx(row) -> Optional[SimpleNamespace]:
    """Build the deadline-constraint context for one EV, or return None.

    ``row`` is a mapping with the session / vehicle columns read below. None is
    returned when the row, its deadline, the vehicle capacity, the target SoC
    (session value, else the default) or the SoC at connect is missing, or
    when no energy remains to be delivered.

    Returns:
        A namespace with ``target_deadline`` and ``energy_needed_wh`` (remaining Wh).
    """
    if row is None:
        return None
    deadline = row["target_deadline"]
    if deadline is None:
        return None

    capacity_kwh = row["veh_cap_kwh"]
    if capacity_kwh is None:
        return None
    capacity_wh = float(capacity_kwh) * 1000.0

    # The session target wins; fall back to the default target.
    target_pct = row["target_soc_pct"]
    if target_pct is None:
        target_pct = row["default_target_soc_pct"]
    if target_pct is None:
        return None
    target_pct = float(target_pct)

    start_pct = row["soc_at_connect_pct"]
    if start_pct is None:
        return None

    needed_wh = (target_pct - float(start_pct)) / 100.0 * capacity_wh
    delivered_wh = float(row["energy_delivered_wh"] or 0)
    remaining_wh = max(0.0, needed_wh - delivered_wh)
    if remaining_wh <= 0:
        return None

    return SimpleNamespace(
        target_deadline=deadline,
        energy_needed_wh=remaining_wh,
    )
|
||
|
||
|
||
async def _load_site_context(site_id: int, db):
|
||
"""
|
||
Načte baterii, TČ, síť, 2× vozidlo, otevřené EV session, SoC, TUV a provozní režim pro solver.
|
||
"""
|
||
operating_mode = await db.fetchval(
|
||
"SELECT mode_code FROM ems.site_operating_mode WHERE site_id = $1",
|
||
site_id,
|
||
)
|
||
|
||
brow = await db.fetchrow(
|
||
"""
|
||
SELECT ab.usable_capacity_wh,
|
||
ab.min_soc_percent,
|
||
ab.reserve_soc_percent,
|
||
ab.max_soc_percent,
|
||
ab.charge_efficiency,
|
||
ab.discharge_efficiency,
|
||
ab.degradation_cost_czk_kwh,
|
||
ab.charge_slot_buffer,
|
||
ab.discharge_slot_buffer,
|
||
LEAST(
|
||
COALESCE(ai.max_battery_charge_w, ai.max_charge_power_w),
|
||
COALESCE(
|
||
ab.bms_max_charge_w,
|
||
CASE WHEN ab.max_charge_c_rate IS NOT NULL
|
||
THEN (ab.max_charge_c_rate * ab.usable_capacity_wh)::bigint
|
||
END,
|
||
COALESCE(ai.max_battery_charge_w, ai.max_charge_power_w)
|
||
)
|
||
) AS effective_charge_w,
|
||
LEAST(
|
||
COALESCE(ai.max_battery_discharge_w, ai.max_discharge_power_w),
|
||
COALESCE(
|
||
ab.bms_max_discharge_w,
|
||
CASE WHEN ab.max_discharge_c_rate IS NOT NULL
|
||
THEN (ab.max_discharge_c_rate * ab.usable_capacity_wh)::bigint
|
||
END,
|
||
COALESCE(ai.max_battery_discharge_w, ai.max_discharge_power_w)
|
||
)
|
||
) AS effective_discharge_w
|
||
FROM ems.asset_battery ab
|
||
JOIN ems.asset_inverter ai ON ai.id = ab.inverter_id AND ai.site_id = ab.site_id
|
||
WHERE ab.site_id = $1
|
||
ORDER BY ab.id
|
||
LIMIT 1
|
||
""",
|
||
site_id,
|
||
)
|
||
if brow is None:
|
||
raise RuntimeError(f"No asset_battery for site_id={site_id}")
|
||
|
||
ec_w = brow["effective_charge_w"]
|
||
ed_w = brow["effective_discharge_w"]
|
||
if ec_w is None or ed_w is None:
|
||
raise RuntimeError(
|
||
f"Battery effective power limits missing for site_id={site_id} "
|
||
"(need max_battery_charge_w/max_discharge or legacy max_charge_power_w / max_discharge_power_w)"
|
||
)
|
||
ec_i = int(ec_w)
|
||
ed_i = int(ed_w)
|
||
if ec_i <= 0 or ed_i <= 0:
|
||
raise RuntimeError(
|
||
f"Invalid battery effective limits for site_id={site_id}: "
|
||
f"charge={ec_i}W discharge={ed_i}W"
|
||
)
|
||
|
||
uc = float(brow["usable_capacity_wh"])
|
||
min_soc_wh = float(brow["min_soc_percent"]) / 100.0 * uc
|
||
arb_floor_wh = float(brow["reserve_soc_percent"]) / 100.0 * uc
|
||
soc_max_wh = float(brow["max_soc_percent"]) / 100.0 * uc
|
||
battery = SimpleNamespace(
|
||
usable_capacity_wh=uc,
|
||
min_soc_wh=min_soc_wh,
|
||
arb_floor_wh=arb_floor_wh,
|
||
reserve_soc_wh=arb_floor_wh,
|
||
soc_max_wh=soc_max_wh,
|
||
charge_efficiency=float(brow["charge_efficiency"]),
|
||
discharge_efficiency=float(brow["discharge_efficiency"]),
|
||
degradation_cost_czk_kwh=float(brow["degradation_cost_czk_kwh"]),
|
||
max_charge_power_w=ec_i,
|
||
max_discharge_power_w=ed_i,
|
||
charge_slot_buffer=float(brow["charge_slot_buffer"]) if brow["charge_slot_buffer"] is not None else 0,
|
||
discharge_slot_buffer=float(brow["discharge_slot_buffer"]) if brow["discharge_slot_buffer"] is not None else 0,
|
||
)
|
||
|
||
hrow = await db.fetchrow(
|
||
"""
|
||
SELECT COALESCE(rated_heating_power_w, 8000) AS rated_heating_power_w,
|
||
COALESCE(tuv_min_temp_c, 45) AS tuv_min_temp_c,
|
||
COALESCE(tuv_target_temp_c, 55) AS tuv_target_temp_c
|
||
FROM ems.asset_heat_pump
|
||
WHERE site_id = $1
|
||
ORDER BY id
|
||
LIMIT 1
|
||
""",
|
||
site_id,
|
||
)
|
||
if hrow is None:
|
||
heat_pump = SimpleNamespace(
|
||
rated_heating_power_w=0,
|
||
tuv_min_temp_c=0.0,
|
||
tuv_target_temp_c=55.0,
|
||
)
|
||
else:
|
||
hp_w = int(hrow["rated_heating_power_w"])
|
||
heat_pump = SimpleNamespace(
|
||
rated_heating_power_w=max(hp_w, 0),
|
||
tuv_min_temp_c=float(hrow["tuv_min_temp_c"]),
|
||
tuv_target_temp_c=float(hrow["tuv_target_temp_c"]),
|
||
)
|
||
|
||
grow = await db.fetchrow(
|
||
"""
|
||
SELECT max_import_power_w, max_export_power_w
|
||
FROM ems.site_grid_connection
|
||
WHERE site_id = $1
|
||
ORDER BY id
|
||
LIMIT 1
|
||
""",
|
||
site_id,
|
||
)
|
||
if grow is None:
|
||
raise RuntimeError(f"No site_grid_connection for site_id={site_id}")
|
||
grid = SimpleNamespace(
|
||
max_import_power_w=int(grow["max_import_power_w"]),
|
||
max_export_power_w=int(grow["max_export_power_w"]),
|
||
)
|
||
|
||
vrows = await db.fetch(
|
||
"""
|
||
SELECT v.battery_capacity_kwh,
|
||
v.max_charge_power_w,
|
||
v.default_target_soc_pct,
|
||
ch.code AS charger_code
|
||
FROM ems.asset_vehicle v
|
||
JOIN ems.asset_ev_charger ch ON ch.id = v.default_charger_id
|
||
WHERE v.site_id = $1
|
||
AND ch.code IN ('ev-charger-1', 'ev-charger-2')
|
||
ORDER BY ch.code
|
||
""",
|
||
site_id,
|
||
)
|
||
vehicles: list[SimpleNamespace] = [
|
||
SimpleNamespace(
|
||
max_charge_power_w=int(r["max_charge_power_w"]),
|
||
battery_capacity_kwh=float(r["battery_capacity_kwh"]),
|
||
default_target_soc_pct=float(r["default_target_soc_pct"]),
|
||
)
|
||
for r in vrows
|
||
]
|
||
while len(vehicles) < 2:
|
||
vehicles.append(
|
||
SimpleNamespace(
|
||
max_charge_power_w=0,
|
||
battery_capacity_kwh=1.0,
|
||
default_target_soc_pct=80.0,
|
||
)
|
||
)
|
||
|
||
srows = await db.fetch(
|
||
"""
|
||
SELECT es.target_deadline,
|
||
es.target_soc_pct,
|
||
es.soc_at_connect_pct,
|
||
es.energy_delivered_wh,
|
||
ch.code AS charger_code,
|
||
v.battery_capacity_kwh AS veh_cap_kwh,
|
||
v.default_target_soc_pct
|
||
FROM ems.ev_session es
|
||
JOIN ems.asset_ev_charger ch ON ch.id = es.charger_id
|
||
LEFT JOIN ems.asset_vehicle v ON v.id = es.vehicle_id
|
||
WHERE es.site_id = $1
|
||
AND es.session_end IS NULL
|
||
""",
|
||
site_id,
|
||
)
|
||
by_charger = {r["charger_code"]: r for r in srows}
|
||
ev_sessions = [
|
||
_ev_session_ctx(by_charger.get("ev-charger-1")),
|
||
_ev_session_ctx(by_charger.get("ev-charger-2")),
|
||
]
|
||
|
||
soc_pct = await db.fetchval(
|
||
"""
|
||
SELECT battery_soc_percent
|
||
FROM ems.telemetry_inverter
|
||
WHERE site_id = $1
|
||
ORDER BY measured_at DESC
|
||
LIMIT 1
|
||
""",
|
||
site_id,
|
||
)
|
||
if soc_pct is None:
|
||
soc_wh = uc * 0.5
|
||
else:
|
||
soc_wh = float(soc_pct) / 100.0 * uc
|
||
soc_wh = max(min_soc_wh, min(soc_wh, soc_max_wh))
|
||
|
||
tuv = await db.fetchval(
|
||
"""
|
||
SELECT tuv_tank_temp_c
|
||
FROM ems.telemetry_heat_pump
|
||
WHERE site_id = $1
|
||
ORDER BY measured_at DESC
|
||
LIMIT 1
|
||
""",
|
||
site_id,
|
||
)
|
||
tuv_temp = float(tuv) if tuv is not None else 50.0
|
||
|
||
return (
|
||
battery,
|
||
heat_pump,
|
||
grid,
|
||
vehicles,
|
||
ev_sessions,
|
||
soc_wh,
|
||
tuv_temp,
|
||
operating_mode,
|
||
)
|
||
|
||
|
||
async def _load_tuv_usage_stats(site_id: int, db) -> dict[tuple[int, int], float]:
|
||
"""Průměrná změna teploty TUV zásobníku per (DOW, hodina) v konvenci DB EXTRACT(DOW)."""
|
||
rows = await db.fetch(
|
||
"""
|
||
SELECT day_of_week, hour_of_day, avg_temp_delta_c
|
||
FROM ems.tuv_usage_stats
|
||
WHERE site_id = $1
|
||
""",
|
||
site_id,
|
||
)
|
||
return {
|
||
(int(r["day_of_week"]), int(r["hour_of_day"])): float(r["avg_temp_delta_c"])
|
||
for r in rows
|
||
}
|
||
|
||
|
||
async def _load_slots(site_id, from_dt, to_dt, db) -> list[PlanningSlot]:
    """Load the 15-minute planning slots for ``[from_dt, to_dt)`` from the DB.

    One row is produced per 15-minute step of the generated slot spine and
    enriched with:

    * buy / sell price – the site's effective price when present, otherwise
      ``ems.fn_get_predicted_price`` (sell falls back to 0.85 × predicted);
      ``is_predicted_price`` is true when no effective buy price exists,
    * PV forecasts – per array the newest ``ok`` forecast run, summed over
      controllable arrays (``pv_a``) and non-controllable arrays (``pv_b``),
      0 when missing,
    * baseline load – average power for the slot's day-of-week / hour in
      Europe/Prague local time, 500 when no statistic exists,
    * EV connection flags – derived from the latest charger telemetry status
      (anything other than ``available`` / ``unavailable`` counts as
      connected); the same value is attached to every slot.

    Raises:
        RuntimeError: the query produced no slots.
    """
    records = await db.fetch("""
        WITH slot_spine AS (
            SELECT gs AS interval_start
            FROM generate_series(
                $2::timestamptz,
                ($3::timestamptz - interval '15 minutes')::timestamptz,
                interval '15 minutes'
            ) AS gs
        )
        SELECT
            s.interval_start,
            COALESCE(
                ep.effective_buy_price_czk_kwh,
                ems.fn_get_predicted_price($1, s.interval_start)
            ) AS buy_price,
            COALESCE(
                ep.effective_sell_price_czk_kwh,
                ems.fn_get_predicted_price($1, s.interval_start) * 0.85
            ) AS sell_price,
            (ep.effective_buy_price_czk_kwh IS NULL) AS is_predicted_price,
            COALESCE(fpi_a.power_w, 0) AS pv_a_forecast_w,
            COALESCE(fpi_b.power_w, 0) AS pv_b_forecast_w,
            COALESCE(
                (SELECT bs.avg_power_w
                 FROM ems.consumption_baseline_stats bs
                 WHERE bs.site_id = $1
                   AND bs.day_of_week = EXTRACT(DOW FROM s.interval_start
                                        AT TIME ZONE 'Europe/Prague')::INT
                   AND bs.hour_of_day = EXTRACT(HOUR FROM s.interval_start
                                        AT TIME ZONE 'Europe/Prague')::INT
                 LIMIT 1),
                500
            ) AS load_baseline_w,
            (COALESCE(ev1.status, 'available') NOT IN ('available', 'unavailable')) AS ev1_connected,
            (COALESCE(ev2.status, 'available') NOT IN ('available', 'unavailable')) AS ev2_connected
        FROM slot_spine s
        LEFT JOIN ems.vw_site_effective_price ep
            ON ep.site_id = $1 AND ep.interval_start = s.interval_start
        LEFT JOIN LATERAL (
            SELECT COALESCE(SUM(u.power_w), 0)::INT AS power_w
            FROM (
                SELECT DISTINCT ON (apa.id)
                    fpi.power_w
                FROM ems.asset_pv_array apa
                JOIN ems.forecast_pv_run fpr
                    ON fpr.pv_array_id = apa.id
                    AND fpr.site_id = apa.site_id
                    AND fpr.status = 'ok'
                JOIN ems.forecast_pv_interval fpi
                    ON fpi.run_id = fpr.id
                    AND fpi.pv_array_id = apa.id
                    AND fpi.interval_start = s.interval_start
                WHERE apa.site_id = $1
                  AND apa.controllable IS TRUE
                ORDER BY apa.id, fpr.created_at DESC
            ) u
        ) fpi_a ON true
        LEFT JOIN LATERAL (
            SELECT COALESCE(SUM(u.power_w), 0)::INT AS power_w
            FROM (
                SELECT DISTINCT ON (apa.id)
                    fpi.power_w
                FROM ems.asset_pv_array apa
                JOIN ems.forecast_pv_run fpr
                    ON fpr.pv_array_id = apa.id
                    AND fpr.site_id = apa.site_id
                    AND fpr.status = 'ok'
                JOIN ems.forecast_pv_interval fpi
                    ON fpi.run_id = fpr.id
                    AND fpi.pv_array_id = apa.id
                    AND fpi.interval_start = s.interval_start
                WHERE apa.site_id = $1
                  AND apa.controllable IS FALSE
                ORDER BY apa.id, fpr.created_at DESC
            ) u
        ) fpi_b ON true
        LEFT JOIN LATERAL (
            SELECT t.status
            FROM ems.telemetry_ev_charger t
            JOIN ems.asset_ev_charger ch ON ch.id = t.charger_id
            WHERE t.site_id = $1 AND ch.code = 'ev-charger-1'
            ORDER BY t.measured_at DESC LIMIT 1
        ) ev1 ON true
        LEFT JOIN LATERAL (
            SELECT t.status
            FROM ems.telemetry_ev_charger t
            JOIN ems.asset_ev_charger ch ON ch.id = t.charger_id
            WHERE t.site_id = $1 AND ch.code = 'ev-charger-2'
            ORDER BY t.measured_at DESC LIMIT 1
        ) ev2 ON true
        ORDER BY s.interval_start
    """, site_id, from_dt, to_dt)

    def _as_slot(fields: dict) -> PlanningSlot:
        # Coerce DB values into the plain Python types PlanningSlot carries;
        # falsy power values (None / 0) become 0.
        return PlanningSlot(
            interval_start=fields["interval_start"],
            buy_price=float(fields["buy_price"]),
            sell_price=float(fields["sell_price"]),
            pv_a_forecast_w=int(fields["pv_a_forecast_w"] or 0),
            pv_b_forecast_w=int(fields["pv_b_forecast_w"] or 0),
            load_baseline_w=int(fields["load_baseline_w"] or 0),
            ev1_connected=bool(fields["ev1_connected"]),
            ev2_connected=bool(fields["ev2_connected"]),
            is_predicted_price=bool(fields.get("is_predicted_price")),
        )

    slots: list[PlanningSlot] = [_as_slot(dict(rec)) for rec in records]
    if len(slots) == 0:
        raise RuntimeError(
            "No planning slots available – check market prices and horizon settings"
        )
    return slots
|
||
|
||
|
||
def _build_slot_inputs(
    slots_raw_pv: list[PlanningSlot],
    slots_solver: list[PlanningSlot],
) -> list[tuple[int, int, int, int, int]]:
    """Pair raw and solver slots into per-slot input tuples.

    Each tuple is ``(load_baseline_w, pv_a_raw, pv_b_raw, pv_a_solver,
    pv_b_solver)``: the baseline load and the raw PV forecasts come from
    ``slots_raw_pv``, the solver PV forecasts from ``slots_solver``. All
    values are converted to ``int``.

    Raises:
        ValueError: the two slot lists differ in length.
    """
    if len(slots_raw_pv) != len(slots_solver):
        raise ValueError("slots_raw_pv and slots_solver length mismatch")
    return [
        (
            int(raw_slot.load_baseline_w),
            int(raw_slot.pv_a_forecast_w),
            int(raw_slot.pv_b_forecast_w),
            int(solver_slot.pv_a_forecast_w),
            int(solver_slot.pv_b_forecast_w),
        )
        for raw_slot, solver_slot in zip(slots_raw_pv, slots_solver)
    ]
|
||
|
||
|
||
async def _save_planning_run(
|
||
site_id, results, horizon_from, horizon_to,
|
||
run_type, triggered_by, replan_from,
|
||
soc_wh, duration_ms, correction, db,
|
||
slot_inputs: Optional[list[tuple[int, int, int, int, int]]] = None,
|
||
) -> int:
|
||
"""Uloží výsledky solveru jako nový planning_run, deaktivuje předchozí."""
|
||
if slot_inputs is not None and len(slot_inputs) != len(results):
|
||
raise ValueError("slot_inputs and results length mismatch")
|
||
run_id = await db.fetchval("""
|
||
INSERT INTO ems.planning_run
|
||
(site_id, horizon_start, horizon_end, status,
|
||
run_type, triggered_by, replan_from,
|
||
soc_at_replan_wh, solver_duration_ms, forecast_correction_factor)
|
||
VALUES ($1,$2,$3,'draft',$4,$5,$6,$7,$8,$9)
|
||
RETURNING id
|
||
""", site_id, horizon_from, horizon_to,
|
||
run_type, triggered_by, replan_from,
|
||
soc_wh, duration_ms, correction)
|
||
|
||
# Bulk insert výsledků
|
||
if slot_inputs is not None:
|
||
rows_pi = [
|
||
(
|
||
run_id,
|
||
r.interval_start,
|
||
r.battery_setpoint_w,
|
||
r.battery_soc_target,
|
||
r.grid_setpoint_w,
|
||
r.ev1_setpoint_w,
|
||
r.ev2_setpoint_w,
|
||
r.ev1_via_bat_w,
|
||
r.ev2_via_bat_w,
|
||
r.heat_pump_enabled,
|
||
r.heat_pump_setpoint_w,
|
||
r.pv_a_curtailed_w,
|
||
r.expected_cost_czk,
|
||
r.effective_buy_price,
|
||
r.effective_sell_price,
|
||
r.is_predicted_price,
|
||
si[0],
|
||
si[1],
|
||
si[2],
|
||
si[3],
|
||
si[4],
|
||
)
|
||
for r, si in zip(results, slot_inputs)
|
||
]
|
||
await db.executemany(
|
||
"""
|
||
INSERT INTO ems.planning_interval
|
||
(run_id, interval_start,
|
||
battery_setpoint_w, battery_soc_target_pct,
|
||
grid_setpoint_w,
|
||
ev1_setpoint_w, ev2_setpoint_w, ev1_via_bat_w, ev2_via_bat_w,
|
||
heat_pump_enabled, heat_pump_setpoint_w,
|
||
pv_a_curtailed_w, expected_cost_czk,
|
||
effective_buy_price, effective_sell_price,
|
||
is_predicted_price,
|
||
load_baseline_w,
|
||
pv_a_forecast_raw_w, pv_b_forecast_raw_w,
|
||
pv_a_forecast_solver_w, pv_b_forecast_solver_w)
|
||
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,
|
||
$17,$18,$19,$20,$21)
|
||
""",
|
||
rows_pi,
|
||
)
|
||
else:
|
||
await db.executemany(
|
||
"""
|
||
INSERT INTO ems.planning_interval
|
||
(run_id, interval_start,
|
||
battery_setpoint_w, battery_soc_target_pct,
|
||
grid_setpoint_w,
|
||
ev1_setpoint_w, ev2_setpoint_w, ev1_via_bat_w, ev2_via_bat_w,
|
||
heat_pump_enabled, heat_pump_setpoint_w,
|
||
pv_a_curtailed_w, expected_cost_czk,
|
||
effective_buy_price, effective_sell_price,
|
||
is_predicted_price)
|
||
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16)
|
||
""",
|
||
[
|
||
(
|
||
run_id,
|
||
r.interval_start,
|
||
r.battery_setpoint_w,
|
||
r.battery_soc_target,
|
||
r.grid_setpoint_w,
|
||
r.ev1_setpoint_w,
|
||
r.ev2_setpoint_w,
|
||
r.ev1_via_bat_w,
|
||
r.ev2_via_bat_w,
|
||
r.heat_pump_enabled,
|
||
r.heat_pump_setpoint_w,
|
||
r.pv_a_curtailed_w,
|
||
r.expected_cost_czk,
|
||
r.effective_buy_price,
|
||
r.effective_sell_price,
|
||
r.is_predicted_price,
|
||
)
|
||
for r in results
|
||
],
|
||
)
|
||
|
||
# Aktivovat nový plán, supersede předchozí
|
||
await db.execute("""
|
||
UPDATE ems.planning_run SET status = 'superseded'
|
||
WHERE site_id = $1 AND status = 'active' AND id <> $2
|
||
""", site_id, run_id)
|
||
|
||
await db.execute(
|
||
"UPDATE ems.planning_run SET status = 'active' WHERE id = $1", run_id
|
||
)
|
||
|
||
return run_id
|