dalsi
Some checks failed
CI and deploy / migration-check (push) Failing after 21s
CI and deploy / deploy (push) Has been skipped

This commit is contained in:
Dusan Vojacek
2026-05-23 00:34:52 +02:00
parent a52be1b792
commit 1ec92bdf79
5 changed files with 313 additions and 63 deletions

View File

@@ -276,6 +276,22 @@ def _select_discharge_export_slots(
]
candidates.sort(key=lambda x: (-x[1], -x[0]))
first_neg = next(
(i for i, s in enumerate(slots) if float(s.sell_price) < 0),
None,
)
neg_day = _prague_date(slots[first_neg]) if first_neg is not None else None
candidates = [
(t, sell)
for t, sell in candidates
if not (
neg_day is not None
and _prague_date(slots[t]) == neg_day
and _prague_hour(slots[t]) < 5
)
]
selected: set[int] = set()
cum = 0.0
for t, _sell in candidates:
@@ -284,31 +300,49 @@ def _select_discharge_export_slots(
selected.add(t)
cum += per_slot_wh
max_sell = max((float(s.sell_price) for s in slots), default=0.0)
if max_sell > 0:
if first_neg is not None and neg_day is not None:
evening_by_day: dict = {}
for t, s in enumerate(slots):
if float(s.sell_price) >= max_sell - degrad and float(s.sell_price) > sell_min:
selected.add(t)
d = _prague_date(s)
if _prague_hour(s) < 17:
continue
evening_by_day[d] = max(evening_by_day.get(d, 0.0), float(s.sell_price))
for t, s in enumerate(slots):
d = _prague_date(s)
peak = evening_by_day.get(d, 0.0)
if peak > 0 and _prague_hour(s) >= 17 and float(s.sell_price) >= peak - degrad:
if float(s.sell_price) > sell_min:
selected.add(t)
first_neg = next(
(i for i, s in enumerate(slots) if float(s.sell_price) < 0),
None,
)
preneg_min_soc = min_soc_wh + max(per_slot_wh, 1000.0)
if (
first_neg is not None
and first_neg > 0
and current_soc_wh >= preneg_min_soc
and neg_day is not None
):
neg_day = _prague_date(slots[first_neg])
positive = [
i
morning_sells = [
float(slots[i].sell_price)
for i in range(first_neg)
if float(slots[i].sell_price) >= 0 and _prague_date(slots[i]) == neg_day
if float(slots[i].sell_price) >= 0
and _prague_date(slots[i]) == neg_day
and 5 <= _prague_hour(slots[i]) <= 11
]
if positive:
peak_t = max(positive, key=lambda i: (float(slots[i].sell_price), i))
selected.add(peak_t)
if morning_sells:
zone_peak = max(morning_sells)
for i in range(first_neg):
if (
_prague_date(slots[i]) == neg_day
and 5 <= _prague_hour(slots[i]) <= 11
and float(slots[i].sell_price) >= zone_peak - degrad
):
selected.add(i)
for i in range(first_neg):
if _prague_date(slots[i]) != neg_day:
continue
h = _prague_hour(slots[i])
if 5 <= h < 17 and float(slots[i].sell_price) < zone_peak - degrad:
selected.discard(i)
return selected

View File

@@ -12,6 +12,7 @@ from services.planning_engine import (
_dynamic_arb_floor_wh_series,
_dispatch_result_comparison,
_pre_neg_peak_sell_idx,
_prague_hour,
_prewindow_deferral_slots,
_slots_until_buy_le_threshold,
_slots_until_sell_lt,
@@ -620,8 +621,8 @@ class PlanningDispatchMilpTests(unittest.TestCase):
if results[0].grid_setpoint_w < 0:
self.assertLess(
results[0].battery_soc_target,
19.0,
msg="with relaxed soc_min, first-slot export should be able to finish below reserve %",
22.0,
msg="with relaxed soc_min, morning export should finish below reserve %",
)
def test_negative_sell_forbids_battery_export_arbitrage(self) -> None:
@@ -785,11 +786,12 @@ class PlanningDispatchMilpTests(unittest.TestCase):
tuv_delta_stats=None,
operating_mode="AUTO",
)
# Slot index 1 je poslední před prvním sell<0 (index 2).
first_neg = 2
pre_neg_soc = [results[i].battery_soc_target for i in range(first_neg)]
self.assertLessEqual(
results[1].battery_soc_target,
min(pre_neg_soc),
6.0,
msg="anchor should drive SoC close to planner floor before first negative sell",
msg="anchor at morning peak should drive SoC near planner floor before first negative sell",
)
def test_anchor_uses_planner_floor_even_without_extreme_buy(self) -> None:
@@ -802,7 +804,7 @@ class PlanningDispatchMilpTests(unittest.TestCase):
PlanningSlot(
interval_start=base,
buy_price=3.0,
sell_price=1.0,
sell_price=3.06,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=0,
@@ -814,7 +816,7 @@ class PlanningDispatchMilpTests(unittest.TestCase):
PlanningSlot(
interval_start=base + timedelta(minutes=15),
buy_price=3.0,
sell_price=0.5,
sell_price=2.0,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=0,
@@ -860,8 +862,11 @@ class PlanningDispatchMilpTests(unittest.TestCase):
tuv_delta_stats=None,
operating_mode="AUTO",
)
# Slot index 1 je poslední před prvním sell<0 (index 2).
self.assertLessEqual(results[1].battery_soc_target, 6.0)
self.assertLess(
results[0].grid_setpoint_w,
-1_000,
msg="morning peak slot should export before first negative sell",
)
def test_grid_import_soft_cap_penalizes_breaker_overdraw(self) -> None:
"""
@@ -1562,7 +1567,7 @@ class ChargeAcquisitionArbitrageTests(unittest.TestCase):
charge_acquisition_cutoff_at=base + timedelta(minutes=30),
)
)
battery = _battery(uc_wh=64_000.0, terminal_soc_value_factor=0.5)
battery = _battery(uc_wh=64_000.0, terminal_soc_value_factor=0.0)
battery.max_charge_power_w = 17_000
battery.max_discharge_power_w = 17_000
hp = SimpleNamespace(rated_heating_power_w=0, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0)
@@ -2236,6 +2241,39 @@ class PlannerArbitrageImprovementsTests(unittest.TestCase):
first_neg = 2
self.assertEqual(_pre_neg_peak_sell_idx(slots, first_neg), 1)
def test_pre_neg_peak_ignores_midnight_on_same_day(self) -> None:
"""Půlnoc může mít vyšší sell než ráno — peak musí být v pásmu 511, ne 00:00."""
base = datetime(2026, 5, 22, 22, 0, tzinfo=timezone.utc)
slots = [
PlanningSlot(
interval_start=base + timedelta(minutes=15 * i),
buy_price=4.0,
sell_price=3.72 if i == 0 else (3.06 if i == 28 else 2.0),
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=1000,
ev1_connected=False,
ev2_connected=False,
)
for i in range(36)
] + [
PlanningSlot(
interval_start=base + timedelta(minutes=15 * 36),
buy_price=0.5,
sell_price=-0.1,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=1000,
ev1_connected=False,
ev2_connected=False,
),
]
first_neg = 36
peak_idx = _pre_neg_peak_sell_idx(slots, first_neg)
self.assertIsNotNone(peak_idx)
self.assertGreater(_prague_hour(slots[peak_idx]), 4)
self.assertLess(_prague_hour(slots[peak_idx]), 12)
def test_pre_neg_peak_idx_is_highest_positive_sell(self) -> None:
base = datetime(2026, 5, 23, 4, 0, tzinfo=timezone.utc)
slots = [