Use observed SoC for neg-prep cushion and evening drain (v40).
Some checks failed
CI and deploy / migration-check (push) Failing after 26s
CI and deploy / deploy (push) Has been skipped

Pre-neg forecast cushion and evening push before negative-sell days now use telemetry SoC instead of chaining LP targets across days, so the planner does not stop discharging early when BMS is higher than the model.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Dusan Vojacek
2026-05-29 00:20:05 +02:00
parent a7dff75e58
commit 88df09640c
5 changed files with 306 additions and 34 deletions

View File

@@ -19,8 +19,11 @@ from services.planning_engine import (
_in_night_battery_export_window,
_neg_sell_day_phases,
_neg_sell_phases_enabled,
_neg_evening_before_neg_push_indices,
_neg_evening_discharge_budget_wh,
_neg_evening_reserve_soc_anchors,
_pre_neg_pv_export_bundle,
_pre_neg_pv_export_forecast_cushion_ok_for_day,
_prague_calendar_date,
_pre_neg_buy_soc_ceiling_wh,
_pre_neg_peak_sell_idx,
@@ -4472,6 +4475,156 @@ class NegSellPrepWindowV36Tests(unittest.TestCase):
self.assertLessEqual(soc_wh, cap_wh + 800.0)
eve_slots = snap["inputs"].get("neg_evening_before_neg_slots") or []
self.assertGreater(len(eve_slots), 8)
push_slots = snap["inputs"].get("neg_evening_push_slots") or []
self.assertGreater(len(push_slots), 0)
self.assertIn("observed_soc_wh", snap["inputs"])
self.assertIsNotNone(snap["inputs"].get("neg_evening_export_budget_wh"))
class ObservedSocNegPrepTests(unittest.TestCase):
"""v40: neg-prep a večerní výboj z pozorovaného SoC (telemetrie), ne z LP trajektorie."""
def test_cushion_ok_when_observed_above_prep_target(self) -> None:
base = datetime(2026, 6, 11, 6, 0, tzinfo=ZoneInfo("Europe/Prague")).astimezone(
timezone.utc
)
slots: list[PlanningSlot] = []
for i in range(48):
local = (base + timedelta(minutes=15 * i)).astimezone(
ZoneInfo("Europe/Prague")
)
sell = -0.2 if local.hour >= 10 else 3.0
slots.append(
PlanningSlot(
interval_start=base + timedelta(minutes=15 * i),
buy_price=2.0,
sell_price=sell,
pv_a_forecast_w=5000,
pv_b_forecast_w=8000,
load_baseline_w=500,
ev1_connected=False,
ev2_connected=False,
allow_charge=True,
allow_discharge_export=True,
)
)
bat = _battery(uc_wh=64_000.0, max_pct=95.0)
bat.planner_neg_sell_prep_soc_percent = 80.0
bat.planner_neg_sell_full_soc_tail_slots = 4
_ph, tg, _w, _meta = _neg_sell_day_phases(slots, bat)
first_neg = next(i for i, s in enumerate(slots) if float(s.sell_price) < 0)
tgt = float(tg[first_neg] or bat.soc_max_wh)
observed_high = tgt + 5000.0
self.assertTrue(
_pre_neg_pv_export_forecast_cushion_ok_for_day(
slots,
bat,
first_neg,
observed_high,
neg_sell_phases_en=True,
soc_target_by_t=tg,
),
"pozorované SoC nad prep cílem → cushion bez forecastu",
)
def test_pre_neg_bundle_uses_observed_not_model_chain(self) -> None:
base = datetime(2026, 6, 10, 10, 0, tzinfo=ZoneInfo("Europe/Prague")).astimezone(
timezone.utc
)
slots: list[PlanningSlot] = []
for i in range(120):
local = (base + timedelta(minutes=15 * i)).astimezone(
ZoneInfo("Europe/Prague")
)
h = local.hour + local.minute / 60.0
if local.date().day == 10:
sell = -0.2 if h >= 14 else 2.5
elif local.date().day == 11:
sell = -0.2 if 9 <= h < 15 else 2.8
else:
sell = 2.5
slots.append(
PlanningSlot(
interval_start=base + timedelta(minutes=15 * i),
buy_price=2.0,
sell_price=sell,
pv_a_forecast_w=7000,
pv_b_forecast_w=9000,
load_baseline_w=500,
ev1_connected=False,
ev2_connected=False,
allow_charge=True,
allow_discharge_export=True,
)
)
bat = _battery(uc_wh=64_000.0, max_pct=95.0)
bat.planner_neg_sell_prep_soc_percent = 80.0
bat.planner_neg_sell_full_soc_tail_slots = 4
_ph, tg, _w, meta = _neg_sell_day_phases(slots, bat)
observed = 0.72 * bat.soc_max_wh
export_ts, cushion = _pre_neg_pv_export_bundle(
slots,
bat,
observed,
None,
neg_sell_phases_en=True,
soc_target_by_t=tg,
)
self.assertTrue(all(cushion.values()))
if len(meta.get("days", [])) >= 2:
second_first = int(meta["days"][1]["first_neg_idx"])
second_morning = [
t for t in export_ts if t < second_first and float(slots[t].sell_price) >= 0.0
]
self.assertGreater(len(second_morning), 0)
def test_neg_evening_push_scales_with_observed_soc(self) -> None:
base = datetime(2026, 6, 10, 18, 0, tzinfo=ZoneInfo("Europe/Prague")).astimezone(
timezone.utc
)
slots: list[PlanningSlot] = []
for i in range(16):
slots.append(
PlanningSlot(
interval_start=base + timedelta(minutes=15 * i),
buy_price=2.0,
sell_price=3.0 + i * 0.1,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=500,
ev1_connected=False,
ev2_connected=False,
allow_charge=True,
allow_discharge_export=True,
)
)
candidates = set(range(16))
bat = _battery(uc_wh=64_000.0, max_pct=95.0, arb_pct=20.0)
reserve = float(bat.reserve_soc_wh)
per_slot = 3000.0
budget_low = _neg_evening_discharge_budget_wh(
observed_soc_wh=reserve + 2000.0,
reserve_soc_wh=reserve,
night_baseload_buffer_wh=1000.0,
)
budget_high = _neg_evening_discharge_budget_wh(
observed_soc_wh=0.70 * bat.soc_max_wh,
reserve_soc_wh=reserve,
night_baseload_buffer_wh=1000.0,
)
push_low = _neg_evening_before_neg_push_indices(
slots,
candidates,
export_budget_wh=budget_low,
per_slot_discharge_wh=per_slot,
)
push_high = _neg_evening_before_neg_push_indices(
slots,
candidates,
export_budget_wh=budget_high,
per_slot_discharge_wh=per_slot,
)
self.assertLess(len(push_low), len(push_high))
class SocBalanceDischargeTests(unittest.TestCase):