Branch 5: dynamický terminal SoC factor při future neg buy
Some checks failed
CI and deploy / migration-check (push) Failing after 15s
CI and deploy / deploy (push) Has been skipped

This commit is contained in:
Dusan Vojacek
2026-06-06 22:38:05 +02:00
parent 0f7dc6ed94
commit 36cb06b9d0
4 changed files with 179 additions and 10 deletions

View File

@@ -91,8 +91,11 @@ NEG_EVENING_PREP_DISCHARGE_SHORTFALL_PENALTY_CZK_KWH = 120.0
# Kotva: SoC na konci večera D1 a těsně před 1. sell<0 ráno D ≤ reserve_soc.
NEG_EVENING_RESERVE_SOC_MAX_SLACK_WH = 400.0
NEG_EVENING_RESERVE_SOC_SLACK_PENALTY_CZK_PER_WH = 55.0
# Terminal SoC shadow price: při blízkém buy<0 nesmí LP „šetřit“ baterii ve večerní špičce.
FUTURE_NEG_BUY_TERMINAL_SOC_FACTOR_MULT = 0.1
# Terminal SoC shadow price: effective_factor = base × (1 w_neg); w_neg roste s blízkostí a záporností buy<0.
TERMINAL_NEG_BUY_WEIGHT_HORIZON_SLOTS = int(36 / INTERVAL_H)
TERMINAL_NEG_BUY_MAGNITUDE_REF_CZK = 1.0
TERMINAL_NEG_BUY_MAGNITUDE_FLOOR = 0.25
TERMINAL_NEG_BUY_WEIGHT_CAP = 0.95
# Před prvním sell<0: export FVE jen pokud predikce v sell<0 okně pokryje dobítí na prep cíl.
PRE_NEG_PV_EXPORT_FORECAST_MARGIN = 1.15
PRE_NEG_PV_EXPORT_MIN_NEEDED_WH = 2500.0
@@ -1388,6 +1391,38 @@ def _first_neg_sell_idx_on_prague_day(
return None
def _terminal_neg_buy_weight(
slots: list[PlanningSlot],
*,
first_neg_buy_idx: int | None,
) -> float:
"""
w_neg ∈ [0, TERMINAL_NEG_BUY_WEIGHT_CAP]: snížení terminal SoC shadow price při buy<0 v horizontu.
Blížší a zápornější okno → vyšší váha; effective_factor = planner_terminal_soc_value_factor × (1 w_neg).
"""
if first_neg_buy_idx is None or first_neg_buy_idx <= 0:
return 0.0
slots_ahead = first_neg_buy_idx
prox = max(
0.0,
1.0 - slots_ahead / TERMINAL_NEG_BUY_WEIGHT_HORIZON_SLOTS,
)
if prox <= 0.0:
return 0.0
window_end = min(len(slots), first_neg_buy_idx + int(24 / INTERVAL_H))
neg_buys = [
float(slots[t].buy_price)
for t in range(first_neg_buy_idx, window_end)
if float(slots[t].buy_price) < 0.0
]
if not neg_buys:
return 0.0
min_neg_buy = min(neg_buys)
mag_raw = min(1.0, abs(min_neg_buy) / TERMINAL_NEG_BUY_MAGNITUDE_REF_CZK)
mag = TERMINAL_NEG_BUY_MAGNITUDE_FLOOR + (1.0 - TERMINAL_NEG_BUY_MAGNITUDE_FLOOR) * mag_raw
return min(TERMINAL_NEG_BUY_WEIGHT_CAP, prox * mag)
def _future_neg_buy_discharge_enabled(
slots: list[PlanningSlot],
battery: Any,
@@ -2802,9 +2837,11 @@ def solve_dispatch(
neg_sell_soc_target_by_t if neg_sell_phases_en else None
),
)
terminal_factor = terminal_factor_base
if future_neg_buy_discharge_en:
terminal_factor *= FUTURE_NEG_BUY_TERMINAL_SOC_FACTOR_MULT
terminal_neg_buy_weight = _terminal_neg_buy_weight(
slots,
first_neg_buy_idx=first_neg_buy_idx,
)
terminal_factor = terminal_factor_base * (1.0 - terminal_neg_buy_weight)
# Kč/Wh: ocenění energie ponechané v baterii na konci horizontu (receding horizon kotva).
terminal_soc_kcz_per_wh = avg_buy_terminal * terminal_factor / 1000.0
@@ -4903,6 +4940,7 @@ def solve_dispatch(
),
"evening_push_hard_suppressed": bool(evening_push_hard_suppressed),
"future_neg_buy_discharge": bool(future_neg_buy_discharge_en),
"terminal_neg_buy_weight": float(terminal_neg_buy_weight),
"terminal_soc_factor_effective": float(terminal_factor),
"pos_sell_pre_neg_buy_ge_exempt_slots": [
slots[i].interval_start.isoformat()

View File

@@ -42,6 +42,7 @@ from services.planning_engine import (
_slots_until_buy_le_threshold,
_slots_until_sell_lt,
_soc_panel_min_wh_series,
_terminal_neg_buy_weight,
solve_dispatch,
solve_dispatch_two_pass,
)
@@ -2717,6 +2718,113 @@ class TerminalSocShadowTests(unittest.TestCase):
msg="terminal SoC shadow price should keep end-of-horizon SoC above bare minimum",
)
def test_terminal_neg_buy_weight_zero_without_future_neg_buy(self) -> None:
base = datetime(2026, 4, 3, 12, 0, tzinfo=timezone.utc)
slots = [
PlanningSlot(
interval_start=base + timedelta(minutes=15 * i),
buy_price=2.0,
sell_price=3.0,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=600,
ev1_connected=False,
ev2_connected=False,
)
for i in range(8)
]
self.assertEqual(_terminal_neg_buy_weight(slots, first_neg_buy_idx=None), 0.0)
self.assertEqual(_terminal_neg_buy_weight(slots, first_neg_buy_idx=0), 0.0)
def test_terminal_neg_buy_weight_scales_with_proximity_and_magnitude(self) -> None:
base = datetime(2026, 6, 6, 19, 0, tzinfo=timezone.utc)
slots_far: list[PlanningSlot] = []
for i in range(96):
buy = -0.9 if i >= 64 else 3.0
slots_far.append(
PlanningSlot(
interval_start=base + timedelta(minutes=15 * i),
buy_price=buy,
sell_price=5.0 if buy > 0 else -0.2,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=500,
ev1_connected=False,
ev2_connected=False,
)
)
slots_near: list[PlanningSlot] = []
for i in range(32):
buy = -1.0 if i >= 8 else 3.0
slots_near.append(
PlanningSlot(
interval_start=base + timedelta(minutes=15 * i),
buy_price=buy,
sell_price=5.0 if buy > 0 else -0.2,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=500,
ev1_connected=False,
ev2_connected=False,
)
)
w_far = _terminal_neg_buy_weight(slots_far, first_neg_buy_idx=64)
w_near = _terminal_neg_buy_weight(slots_near, first_neg_buy_idx=8)
self.assertGreater(w_far, 0.2)
self.assertLess(w_far, 0.75)
self.assertGreater(w_near, w_far)
self.assertGreater(w_near, 0.8)
def test_terminal_soc_factor_reduced_when_future_neg_buy(self) -> None:
prague = ZoneInfo("Europe/Prague")
base = datetime(2026, 6, 6, 19, 0, tzinfo=prague).astimezone(timezone.utc)
slots: list[PlanningSlot] = []
for i in range(96):
local = (base + timedelta(minutes=15 * i)).astimezone(prague)
d, h = local.day, local.hour
if d == 6:
buy, sell = 3.0, 5.3
else:
buy = -0.89 if 11 <= h < 14 else 2.0
sell = -0.2 if 5 <= h < 15 else 2.5
slots.append(
PlanningSlot(
interval_start=base + timedelta(minutes=15 * i),
buy_price=buy,
sell_price=sell,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=500,
ev1_connected=False,
ev2_connected=False,
allow_charge=True,
allow_discharge_export=True,
)
)
battery = _battery(uc_wh=64_000.0, terminal_soc_value_factor=0.9)
hp = SimpleNamespace(rated_heating_power_w=0, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0)
grid = SimpleNamespace(max_import_power_w=17_000, max_export_power_w=13_500)
vehicles = [
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
]
_results, _ms, snap = solve_dispatch(
slots,
battery,
hp,
grid,
[None, None],
vehicles,
0.5 * battery.soc_max_wh,
50.0,
operating_mode="AUTO",
)
w_neg = float(snap["inputs"]["terminal_neg_buy_weight"])
eff = float(snap["inputs"]["terminal_soc_factor_effective"])
self.assertGreater(w_neg, 0.3)
self.assertLess(eff, 0.6)
self.assertAlmostEqual(eff, 0.9 * (1.0 - w_neg), places=6)
class SpreadGuardHome01EconomicsTests(unittest.TestCase):
"""Regrese: sell≪buy (VT) nesmí vést k PV exportu + masivnímu grid importu ve stejném slotu."""