posun dovybijejiciho okna tesne pred zapornou cenu
Some checks failed
CI and deploy / migration-check (push) Failing after 8s
CI and deploy / deploy (push) Has been skipped

This commit is contained in:
Dusan Vojacek
2026-04-26 01:39:48 +02:00
parent 0edf9226cb
commit c6ca68b263
4 changed files with 128 additions and 12 deletions

View File

@@ -37,9 +37,9 @@ SOLVER_TIME_LIMIT = 10 # sekund
# (rezerva z DB). Při relaxaci spodku před extrémně záporným buy je podlaha soc_panel_min[t]
# (planner floor), jinak by šlo jen do zátěže a nešlo by „vypustit do sítě“ před levným nákupem.
GE_MIN_EXPORT_W = 1.0
# Dokud je první „extrémní“ buy dál než tento počet 15min slotů, držíme plánovací spodek na rezervě
# (arb_base_wh) místo hlubokého planner floor — aby šlo nejdřív vybíjet „standardně“ a hluboký
# dump až těsně před oknem záporných cen (operativní buffer).
# Dokud je kotva pro hluboký dump (první sell < 0 v horizontu, jinak první extrémní buy) dál než
# tento počet 15min slotů, držíme plánovací spodek na rezervě (arb_base_wh) místo planner floor —
# priorita: beze „ztráty na prodeji“ (sell >= 0) držet buffer, hluboký vývoz až těsně před záporným prodejem.
DEFAULT_PLANNER_DISCHARGE_RELAX_PREWINDOW_SLOTS = 8
CORRECTION_WINDOW_H = 1 # hodina zpět pro výpočet korekčního faktoru
CORRECTION_MIN_CLAMP = 0.5 # spodní limit korekčního faktoru
@@ -242,22 +242,68 @@ def _slots_until_buy_le_threshold(
return out
def _slots_until_sell_lt(slots: list[PlanningSlot], sell_upper: float) -> list[int]:
"""
Pro slot t: kolik slotů (0 = tento slot) do nejbližšího k>=t s sell_price < sell_upper.
Typicky sell_upper=0 (první záporný / „ztrátový“ prodej z pohledu OTE).
Pokud v [t, T) žádný takový není, vrátí T + 1.
"""
t_len = len(slots)
sentinel = t_len + 1
next_lt = sentinel
next_at_or_after: list[int] = [sentinel] * t_len
for t in range(t_len - 1, -1, -1):
if float(slots[t].sell_price) < sell_upper:
next_lt = t
next_at_or_after[t] = next_lt
out: list[int] = []
for t in range(t_len):
nxt = next_at_or_after[t]
if nxt >= t_len:
out.append(sentinel)
else:
out.append(nxt - t)
return out
def _prewindow_deferral_slots(
slots: list[PlanningSlot], buy_extreme_threshold: float, sell_upper: float = 0.0
) -> list[int]:
"""
Vzdálenost (v 15min slotech) pro zpoždění hlubokého planner flooru:
primárně do prvního sell < sell_upper (poslední „bez ztráty na prodeji“ je k-1),
pokud v horizontu není záporný prodej, fallback na první buy <= buy_extreme_threshold.
"""
t_len = len(slots)
sell_d = _slots_until_sell_lt(slots, sell_upper)
buy_d = _slots_until_buy_le_threshold(slots, buy_extreme_threshold)
sentinel = t_len + 1
out: list[int] = []
for t in range(t_len):
if sell_d[t] < sentinel:
out.append(sell_d[t])
else:
out.append(buy_d[t])
return out
def _soc_panel_min_wh_series(
soc_min_series: list[float],
slots_until_buy_extreme: list[int],
slots_until_relax_anchor: list[int],
min_soc_wh: float,
arb_base_wh: float,
prewindow_slots: int,
) -> list[float]:
"""
Zpoždění hluboké relaxace: pokud je lookahead extrémní, ale první extrémní buy je dál než
prewindow_slots, drž spodek na max(relax_wh, arb_base_wh) — prakticky na rezervě.
Zpoždění hluboké relaxace: pokud je lookahead extrémní (soc_min pod min_soc), ale kotva
(záporný prodej / fallback extrémní buy) je dál než prewindow_slots, drž spodek na
max(relax_wh, arb_base_wh) — prakticky na rezervě.
"""
t_len = len(soc_min_series)
out: list[float] = []
for t in range(t_len):
sm = float(soc_min_series[t])
if sm < min_soc_wh - 1e-3 and slots_until_buy_extreme[t] > prewindow_slots:
if sm < min_soc_wh - 1e-3 and slots_until_relax_anchor[t] > prewindow_slots:
out.append(max(sm, float(arb_base_wh)))
else:
out.append(sm)
@@ -443,10 +489,10 @@ def solve_dispatch(
)
),
)
slots_until_buy_extreme = _slots_until_buy_le_threshold(slots, buy_extreme_thr)
deferral_slots = _prewindow_deferral_slots(slots, buy_extreme_thr)
soc_panel_min = _soc_panel_min_wh_series(
soc_min_series,
slots_until_buy_extreme,
deferral_slots,
min_soc_wh,
arb_base_wh,
prewin,

View File

@@ -9,7 +9,9 @@ from types import SimpleNamespace
from services.planning_engine import (
PlanningSlot,
_dynamic_arb_floor_wh_series,
_prewindow_deferral_slots,
_slots_until_buy_le_threshold,
_slots_until_sell_lt,
_soc_panel_min_wh_series,
solve_dispatch,
)
@@ -60,6 +62,68 @@ def _battery(
)
class SlotsUntilSellNegativeTests(unittest.TestCase):
def test_slots_until_first_negative_sell(self) -> None:
base = datetime(2026, 4, 3, 0, 0, tzinfo=timezone.utc)
slots: list[PlanningSlot] = []
for i in range(10):
slots.append(
PlanningSlot(
interval_start=base + timedelta(minutes=15 * i),
buy_price=1.0,
sell_price=2.0 if i < 4 else -0.5,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=500,
ev1_connected=False,
ev2_connected=False,
)
)
dist = _slots_until_sell_lt(slots, 0.0)
self.assertEqual(dist[0], 4)
self.assertEqual(dist[3], 1)
self.assertEqual(dist[4], 0)
def test_prewindow_deferral_prefers_sell_anchor(self) -> None:
"""Když existuje záporný prodej, kotva je vzdálenost k němu, ne k extrémnímu buy."""
base = datetime(2026, 4, 3, 0, 0, tzinfo=timezone.utc)
slots: list[PlanningSlot] = []
for i in range(8):
slots.append(
PlanningSlot(
interval_start=base + timedelta(minutes=15 * i),
buy_price=-50.0,
sell_price=1.0 if i < 2 else -0.1,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=500,
ev1_connected=False,
ev2_connected=False,
)
)
adv = _prewindow_deferral_slots(slots, -2.0)
self.assertEqual(adv[0], 2)
def test_prewindow_deferral_falls_back_to_buy_when_no_negative_sell(self) -> None:
base = datetime(2026, 4, 3, 0, 0, tzinfo=timezone.utc)
slots: list[PlanningSlot] = []
for i in range(10):
slots.append(
PlanningSlot(
interval_start=base + timedelta(minutes=15 * i),
buy_price=3.0 if i < 7 else -10.0,
sell_price=2.0,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=500,
ev1_connected=False,
ev2_connected=False,
)
)
adv = _prewindow_deferral_slots(slots, -2.0)
self.assertEqual(adv[0], 7)
class SlotsUntilBuyExtremeTests(unittest.TestCase):
def test_slots_until_first_extreme(self) -> None:
base = datetime(2026, 4, 3, 0, 0, tzinfo=timezone.utc)
@@ -94,7 +158,7 @@ class SlotsUntilBuyExtremeTests(unittest.TestCase):
def test_prewindow_clamps_relaxed_floor_until_close(self) -> None:
sm = [5000.0] * 10
dist = [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
dist = [9, 8, 7, 6, 5, 4, 3, 2, 1, 0] # obecná kotva (sell nebo buy)
panel = _soc_panel_min_wh_series(sm, dist, 10_000.0, 20_000.0, 2)
self.assertEqual(panel[0], 20_000.0)
self.assertEqual(panel[6], 20_000.0)