posun dovybiti tesnep red zapornou cenu
Some checks failed
CI and deploy / migration-check (push) Failing after 11s
CI and deploy / deploy (push) Has been skipped

This commit is contained in:
Dusan Vojacek
2026-04-26 01:30:28 +02:00
parent b1e124416d
commit 0edf9226cb
4 changed files with 138 additions and 7 deletions

View File

@@ -34,9 +34,13 @@ INTERVAL_H = 0.25 # 15 minut v hodinách
CURTAILMENT_PENALTY = 0.001 # Kč/Wh malá penalizace za omezení FVE pole A
SOLVER_TIME_LIMIT = 10 # sekund
# MILP: významný export ge (W) ⇒ koncové soc[t] ≥ podlaha; mimo arbitrážní relax je to arb_base_wh
# (rezerva z DB). Při relaxaci spodku před extrémně záporným buy je podlaha soc_min_series[t]
# (rezerva z DB). Při relaxaci spodku před extrémně záporným buy je podlaha soc_panel_min[t]
# (planner floor), jinak by šlo jen do zátěže a nešlo by „vypustit do sítě“ před levným nákupem.
GE_MIN_EXPORT_W = 1.0
# Dokud je první „extrémní“ buy dál než tento počet 15min slotů, držíme plánovací spodek na rezervě
# (arb_base_wh) místo hlubokého planner floor — aby šlo nejdřív vybíjet „standardně“ a hluboký
# dump až těsně před oknem záporných cen (operativní buffer).
DEFAULT_PLANNER_DISCHARGE_RELAX_PREWINDOW_SLOTS = 8
CORRECTION_WINDOW_H = 1 # hodina zpět pro výpočet korekčního faktoru
CORRECTION_MIN_CLAMP = 0.5 # spodní limit korekčního faktoru
CORRECTION_MAX_CLAMP = 1.5 # horní limit korekčního faktoru
@@ -213,6 +217,53 @@ def _soc_min_wh_series(
return out
def _slots_until_buy_le_threshold(
slots: list[PlanningSlot], buy_threshold: float
) -> list[int]:
"""
Pro slot t: kolik slotů (0 = tento slot) do nejbližšího k>=t s buy_price <= buy_threshold.
Pokud v [t, T) žádný takový není, vrátí T + 1 (větší než jakýkoli rozumný prewindow).
"""
t_len = len(slots)
sentinel = t_len + 1
next_le = sentinel
next_at_or_after: list[int] = [sentinel] * t_len
for t in range(t_len - 1, -1, -1):
if float(slots[t].buy_price) <= buy_threshold:
next_le = t
next_at_or_after[t] = next_le
out: list[int] = []
for t in range(t_len):
nxt = next_at_or_after[t]
if nxt >= t_len:
out.append(sentinel)
else:
out.append(nxt - t)
return out
def _soc_panel_min_wh_series(
soc_min_series: list[float],
slots_until_buy_extreme: list[int],
min_soc_wh: float,
arb_base_wh: float,
prewindow_slots: int,
) -> list[float]:
"""
Zpoždění hluboké relaxace: pokud je lookahead extrémní, ale první extrémní buy je dál než
prewindow_slots, drž spodek na max(relax_wh, arb_base_wh) — prakticky na rezervě.
"""
t_len = len(soc_min_series)
out: list[float] = []
for t in range(t_len):
sm = float(soc_min_series[t])
if sm < min_soc_wh - 1e-3 and slots_until_buy_extreme[t] > prewindow_slots:
out.append(max(sm, float(arb_base_wh)))
else:
out.append(sm)
return out
@dataclass
class DispatchResult:
interval_start: datetime
@@ -382,6 +433,25 @@ def solve_dispatch(
slots, min_soc_wh, arb_base_wh, float(battery.usable_capacity_wh)
)
prewin = max(
0,
int(
getattr(
battery,
"planner_discharge_relax_prewindow_slots",
DEFAULT_PLANNER_DISCHARGE_RELAX_PREWINDOW_SLOTS,
)
),
)
slots_until_buy_extreme = _slots_until_buy_le_threshold(slots, buy_extreme_thr)
soc_panel_min = _soc_panel_min_wh_series(
soc_min_series,
slots_until_buy_extreme,
min_soc_wh,
arb_base_wh,
prewin,
)
# --- Proměnné ---
# gi[t] horní mez: site breaker (max_import_power_w) je fyzický strop.
# Pro robustnost (forecast PV/load nemusí sedět) používáme měkký cap: dovolíme gi nominálně
@@ -396,7 +466,7 @@ def solve_dispatch(
bc = [pulp.LpVariable(f"bc_{t}", 0, battery.max_charge_power_w) for t in range(T)]
bd = [pulp.LpVariable(f"bd_{t}", 0, battery.max_discharge_power_w) for t in range(T)]
soc = [
pulp.LpVariable(f"soc_{t}", soc_min_series[t], battery.soc_max_wh) for t in range(T)
pulp.LpVariable(f"soc_{t}", soc_panel_min[t], battery.soc_max_wh) for t in range(T)
]
w_arb = [pulp.LpVariable(f"w_arb_{t}", cat=pulp.LpBinary) for t in range(T)]
z_export = [pulp.LpVariable(f"z_export_{t}", cat=pulp.LpBinary) for t in range(T)]
@@ -499,7 +569,7 @@ def solve_dispatch(
soc_prev_expr = current_soc_wh if t == 0 else soc[t - 1]
arb_t = arb_floor_series[t]
soc_low_t = soc_min_series[t]
soc_low_t = soc_panel_min[t]
# Při relaxovaném dnu (soc_low pod DB min_soc Wh) nesmí větev w_arb=1 znovu vynutit arb_t
# (typicky ~rezerva 20 %) — jinak nejde „vypustit“ baterku k planner floor 5 %.
if soc_low_t < min_soc_wh - 1e-3:
@@ -515,15 +585,15 @@ def solve_dispatch(
+ battery.max_discharge_power_w * w_arb[t]
)
# Významný export ⇒ koncové SoC ≥ ekonomická rezerva (arb_base_wh), ne dynamická arb_floor_series
# Významný export ⇒ koncové SoC ≥ podlaha (viz soc_panel_min / arb_base).
m_ge = float(grid.max_export_power_w)
m_soc_bigm = float(battery.usable_capacity_wh)
prob += ge[t] <= m_ge * z_export[t]
prob += ge[t] >= GE_MIN_EXPORT_W * z_export[t]
# Bez relaxace: export končí ≥ rezerva (arb_base). Při relaxaci (_soc_min_wh_series pod min_soc)
# Bez hluboké relaxace: export končí ≥ rezerva. Při hluboké relaxaci (soc_panel_min pod min_soc)
# sladit s LP spodkem — jinak z_export vynutil arb_base a blokoval vývoz k planner floor.
if soc_min_series[t] < min_soc_wh - 1e-3:
export_soc_floor_t = float(soc_min_series[t])
if soc_panel_min[t] < min_soc_wh - 1e-3:
export_soc_floor_t = float(soc_panel_min[t])
else:
export_soc_floor_t = float(arb_base_wh)
prob += soc[t] >= export_soc_floor_t - m_soc_bigm * (1 - z_export[t])
@@ -915,6 +985,7 @@ async def _load_site_context(site_id: int, db):
planner_soc_max = float(b.get("planner_soc_max_wh", b["soc_max_wh"]))
floor_pct = b.get("planner_discharge_floor_percent")
buy_thr = b.get("planner_extreme_buy_threshold_czk_kwh")
relax_prewin = b.get("planner_discharge_relax_prewindow_slots")
battery = SimpleNamespace(
usable_capacity_wh=float(b["usable_capacity_wh"]),
min_soc_wh=float(b["min_soc_wh"]),
@@ -934,6 +1005,9 @@ async def _load_site_context(site_id: int, db):
else 0,
planner_extreme_buy_threshold_czk_kwh=float(buy_thr) if buy_thr is not None else -5.0,
planner_discharge_floor_percent=float(floor_pct) if floor_pct is not None else None,
planner_discharge_relax_prewindow_slots=int(relax_prewin)
if relax_prewin is not None
else DEFAULT_PLANNER_DISCHARGE_RELAX_PREWINDOW_SLOTS,
)
hpj = ctx["heat_pump"]

View File

@@ -9,6 +9,8 @@ from types import SimpleNamespace
from services.planning_engine import (
PlanningSlot,
_dynamic_arb_floor_wh_series,
_slots_until_buy_le_threshold,
_soc_panel_min_wh_series,
solve_dispatch,
)
@@ -58,6 +60,48 @@ def _battery(
)
class SlotsUntilBuyExtremeTests(unittest.TestCase):
def test_slots_until_first_extreme(self) -> None:
base = datetime(2026, 4, 3, 0, 0, tzinfo=timezone.utc)
slots: list[PlanningSlot] = []
for i in range(10):
slots.append(
PlanningSlot(
interval_start=base + timedelta(minutes=15 * i),
buy_price=1.0,
sell_price=1.0,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=500,
ev1_connected=False,
ev2_connected=False,
)
)
slots[-1] = PlanningSlot(
interval_start=slots[-1].interval_start,
buy_price=-10.0,
sell_price=0.0,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=500,
ev1_connected=False,
ev2_connected=False,
)
dist = _slots_until_buy_le_threshold(slots, -2.0)
self.assertEqual(dist[0], 9)
self.assertEqual(dist[8], 1)
self.assertEqual(dist[9], 0)
def test_prewindow_clamps_relaxed_floor_until_close(self) -> None:
sm = [5000.0] * 10
dist = [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
panel = _soc_panel_min_wh_series(sm, dist, 10_000.0, 20_000.0, 2)
self.assertEqual(panel[0], 20_000.0)
self.assertEqual(panel[6], 20_000.0)
self.assertEqual(panel[7], 5000.0)
self.assertEqual(panel[9], 5000.0)
class DynamicArbFloorTests(unittest.TestCase):
def test_more_pv_ahead_lowers_floor(self) -> None:
"""Čím víc FVE ve lookahead, tím nižší ekonomická podlaha v prvním slotu."""