uprava aby rano prodaval do site pred sell < 0 oknem
Some checks failed
CI and deploy / migration-check (push) Failing after 20s
CI and deploy / deploy (push) Has been skipped

This commit is contained in:
Dusan Vojacek
2026-05-26 08:29:05 +02:00
parent da79eec077
commit b4e5fc5040
4 changed files with 303 additions and 14 deletions

View File

@@ -71,7 +71,12 @@ NEG_BUY_CHARGE_SHORTFALL_PENALTY_CZK_KWH = 100.0
PRE_NEG_CHARGE_PENALTY_CZK_KWH = 400.0
PRE_NEG_BATT_EXPORT_SHORTFALL_PENALTY_CZK_KWH = 80.0
PRE_NEG_BATT_EXPORT_MIN_SELL_CZK_KWH = 1.0
PLANNER_BUILD_TAG = "2026-05-28-neg-sell-soc-phases-v32"
PLANNER_BUILD_TAG = "2026-05-28-pre-neg-pv-export-forecast-v33"
# Před prvním sell<0: export FVE jen pokud predikce v sell<0 okně pokryje dobítí na prep cíl.
PRE_NEG_PV_EXPORT_FORECAST_MARGIN = 1.15
PRE_NEG_PV_EXPORT_MIN_NEEDED_WH = 2500.0
PRE_NEG_PV_EXPORT_SHORTFALL_PENALTY_CZK_KWH = 55.0
PRE_NEG_PV_BCPV_DISCOURAGE_CZK_KWH = 90.0
POS_SELL_PRE_NEG_SOC_SHORTFALL_PENALTY_CZK_PER_WH = 0.30
PRE_NEG_BUY_SOC_CEILING_SLACK_PENALTY_CZK_PER_WH = 0.25
PRE_NEG_BUY_EMPTY_EXPORT_SHORTFALL_PENALTY_CZK_KWH = 80.0
@@ -860,6 +865,90 @@ def _neg_sell_day_phases(
return phases, soc_targets, shortfall_weights
def _neg_sell_day_pv_usable_wh(
slots: list[PlanningSlot],
first_neg_sell_idx: int | None,
*,
max_charge_power_w: float,
charge_efficiency: float,
) -> float:
"""
Odhad Wh nabitelné z FVE v sell<0 slotech téhož pražského dne (forecast surplus × cap nabíjení).
"""
if first_neg_sell_idx is None:
return 0.0
neg_day = _prague_calendar_date(slots[first_neg_sell_idx])
total_wh = 0.0
for s in slots:
if _prague_calendar_date(s) != neg_day:
continue
if float(s.sell_price) >= 0.0:
continue
pv_surplus_w = max(
0.0,
float(s.pv_a_forecast_w)
+ float(s.pv_b_forecast_w)
- float(s.load_baseline_w),
)
if pv_surplus_w <= 500.0:
continue
cap_w = min(pv_surplus_w, float(max_charge_power_w))
total_wh += cap_w * INTERVAL_H * float(charge_efficiency)
return total_wh
def _pre_neg_pv_export_forecast_cushion_ok(
slots: list[PlanningSlot],
battery: Any,
current_soc_wh: float,
first_neg_sell_idx: int | None,
*,
neg_sell_phases_en: bool,
) -> bool:
"""
Export FVE před sell<0 jen pokud forecast v záporném okně pokryje dobítí na cíl (typ. 80 %).
Jinak raději nabíjet teď — riziko deště / podhodnocené FVE v sell<0.
"""
if first_neg_sell_idx is None or first_neg_sell_idx <= 0:
return False
prep_pct = float(getattr(battery, "planner_neg_sell_prep_soc_percent", 100.0))
if neg_sell_phases_en and prep_pct < 100.0 - 1e-6:
target_wh = prep_pct / 100.0 * float(battery.soc_max_wh)
else:
target_wh = float(battery.soc_max_wh)
needed_wh = max(0.0, target_wh - float(current_soc_wh))
if needed_wh < PRE_NEG_PV_EXPORT_MIN_NEEDED_WH:
return True
usable_wh = _neg_sell_day_pv_usable_wh(
slots,
first_neg_sell_idx,
max_charge_power_w=float(battery.max_charge_power_w),
charge_efficiency=float(battery.charge_efficiency),
)
return usable_wh >= needed_wh * PRE_NEG_PV_EXPORT_FORECAST_MARGIN
def _pre_neg_pv_export_slot_indices(
slots: list[PlanningSlot],
first_neg_sell_idx: int | None,
pre_neg_export_last_t: int | None,
first_neg_buy_idx: int | None,
) -> set[int]:
"""Sloty s kladným sell před prvním sell<0 (a před buy<0), PV přebytek)."""
if first_neg_sell_idx is None or pre_neg_export_last_t is None:
return set()
out: set[int] = set()
for t in range(pre_neg_export_last_t + 1):
if float(slots[t].sell_price) < 0.0:
continue
if first_neg_buy_idx is not None and t >= first_neg_buy_idx:
continue
if _slot_pv_surplus_w(slots[t]) <= NIGHT_EXPORT_PV_SUNRISE_SURPLUS_W:
continue
out.add(t)
return out
MORNING_PRENEG_START_HOUR = 5
MORNING_PRENEG_END_HOUR = 11
@@ -1646,6 +1735,29 @@ def solve_dispatch(
prep_hold_bcpv_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
prep_hold_curtail_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
prep_hold_met_binary: dict[int, pulp.LpVariable] = {}
pre_neg_pv_export_forecast_ok = bool(
om == "AUTO"
and not purchase_fixed_pre
and first_neg_sell_idx is not None
and pre_neg_export_last_t is not None
and _pre_neg_pv_export_forecast_cushion_ok(
slots,
battery,
current_soc_wh,
first_neg_sell_idx,
neg_sell_phases_en=neg_sell_phases_en,
)
)
pre_neg_pv_export_ts = (
_pre_neg_pv_export_slot_indices(
slots,
first_neg_sell_idx,
pre_neg_export_last_t,
first_neg_buy_idx,
)
if pre_neg_pv_export_forecast_ok
else set()
)
pre_neg_buy_discharge_ts: set[int] = set()
if om == "AUTO" and first_neg_buy_idx is not None and first_neg_buy_idx > 0:
pre_neg_buy_discharge_ts = _pre_neg_buy_discharge_indices(
@@ -1815,6 +1927,7 @@ def solve_dispatch(
peak_export_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
pv_charge_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
pre_neg_pv_charge_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
pre_neg_pv_export_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
neg_sell_bat_dump_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
neg_sell_soc_underfill: list[tuple[int, pulp.LpVariable, float]] = []
neg_buy_charge_shortfall: list[tuple[int, pulp.LpVariable, float]] = []
@@ -1904,6 +2017,25 @@ def solve_dispatch(
cap_ns = float(min(pv_surplus_ns, battery.max_charge_power_w))
sf_ns = pulp.LpVariable(f"neg_phase_pv_charge_{t_ns}", 0, cap_ns)
pv_charge_shortfall.append((t_ns, sf_ns, cap_ns))
if pre_neg_pv_export_forecast_ok:
for t_pe in sorted(pre_neg_pv_export_ts):
s_pe = slots[t_pe]
pv_surplus_pe = max(
0.0,
float(s_pe.pv_a_forecast_w)
+ float(s_pe.pv_b_forecast_w)
- float(s_pe.load_baseline_w),
)
cap_pe = float(
min(
pv_surplus_pe,
float(grid.max_export_power_w),
)
)
if cap_pe <= 500.0:
continue
sf_pe = pulp.LpVariable(f"pre_neg_pv_export_sf_{t_pe}", 0, cap_pe)
pre_neg_pv_export_shortfall.append((t_pe, sf_pe, cap_pe))
for t in range(T):
if not post_neg_pv_topup[t]:
continue
@@ -2122,6 +2254,17 @@ def solve_dispatch(
sf * NEG_SELL_CURTAIL_PENALTY_CZK_KWH * INTERVAL_H / 1000.0
for _t, sf, _cap in prep_hold_curtail_shortfall
)
+ pulp.lpSum(
sf * PRE_NEG_PV_EXPORT_SHORTFALL_PENALTY_CZK_KWH * INTERVAL_H / 1000.0
for _t, sf, _cap in pre_neg_pv_export_shortfall
)
+ pulp.lpSum(
bc_pv[t]
* PRE_NEG_PV_BCPV_DISCOURAGE_CZK_KWH
* INTERVAL_H
/ 1000.0
for t in pre_neg_pv_export_ts
)
+ pulp.lpSum(
sf * NEG_SELL_BAT_DUMP_SHORTFALL_PENALTY_CZK_KWH * INTERVAL_H / 1000.0
for _t, sf, _cap in neg_sell_bat_dump_shortfall
@@ -2211,6 +2354,8 @@ def solve_dispatch(
prob += sf >= cap_w - ge_bat[t_sf]
for t_sf, sf, cap_w in pre_neg_pv_charge_shortfall:
prob += sf >= cap_w - bc_pv[t_sf]
for t_sf, sf, cap_w in pre_neg_pv_export_shortfall:
prob += sf >= cap_w - ge_pv[t_sf]
preneg_export_min_soc_wh = float(min_soc_wh) + max(
float(battery.max_discharge_power_w)
* float(battery.discharge_efficiency)
@@ -2662,6 +2807,9 @@ def solve_dispatch(
):
prob += ge_bat[t] == 0
prob += z_export[t] == 0
for t_pne in pre_neg_pv_export_ts:
# v33: při dostatečné FVE v sell<0 okně neukládat ranní PV do baterie — export.
prob += bc_pv[t_pne] == 0
# Ekonomické guardy: ceny v objective nestačí proti maskám / terminal SoC.
# Referenční buy jen z ne-záporných slotů: jinak jeden buy<0 v horizontu označí
@@ -2689,18 +2837,8 @@ def solve_dispatch(
0.0,
float(s.pv_a_forecast_w) + float(s.pv_b_forecast_w) - load_t,
)
# FVE export: před prvním sell<0 smí jít přebytek do sítě (kladný sell), pak nabít
# v záporném okně z PV. Jinak držet energii na future_sell peak.
allow_pre_neg_pv_export = (
first_neg_sell_idx is not None
and pre_neg_export_last_t is not None
and t <= pre_neg_export_last_t
and sell_t >= 0
and (
first_neg_buy_idx is None
or t < first_neg_buy_idx
)
)
# FVE export před sell<0 jen pokud forecast v sell<0 okně pokryje dobítí (v33).
allow_pre_neg_pv_export = t in pre_neg_pv_export_ts
pv_store_val = _pv_store_value_czk_kwh(s, min_spread)
skip_pv_store_block = (
float(s.pv_b_forecast_w) > 0
@@ -3124,6 +3262,20 @@ def solve_dispatch(
),
},
"neg_sell_phases_enabled": bool(neg_sell_phases_en),
"pre_neg_pv_export_forecast_ok": bool(pre_neg_pv_export_forecast_ok),
"pre_neg_pv_export_slots": [
slots[i].interval_start.isoformat() for i in sorted(pre_neg_pv_export_ts)
],
"neg_sell_day_pv_usable_wh": (
_neg_sell_day_pv_usable_wh(
slots,
first_neg_sell_idx,
max_charge_power_w=float(battery.max_charge_power_w),
charge_efficiency=float(battery.charge_efficiency),
)
if first_neg_sell_idx is not None
else None
),
"load_first_enabled": om == "AUTO",
"relaxed_expensive_import": relaxed_expensive_import,
"charge_acquisition_buy_czk_kwh": charge_acquisition_czk_kwh,

View File

@@ -21,6 +21,7 @@ from services.planning_engine import (
_neg_sell_phases_enabled,
_pre_neg_buy_soc_ceiling_wh,
_pre_neg_peak_sell_idx,
_pre_neg_pv_export_forecast_cushion_ok,
_prague_hour,
_prewindow_deferral_slots,
_slots_until_buy_le_threshold,
@@ -3845,5 +3846,132 @@ class NegSellSocPhaseTests(unittest.TestCase):
self.assertLessEqual(max(0, -results[-1].grid_setpoint_w), 500)
class PreNegPvExportForecastTests(unittest.TestCase):
"""v33: export FVE před sell<0 jen pokud forecast v sell<0 okně pokryje prep SoC."""
@staticmethod
def _slots_morning_then_neg(n: int = 22, *, neg_pv_scale: float = 1.0) -> list[PlanningSlot]:
base = datetime(2026, 6, 10, 6, 0, tzinfo=timezone.utc)
out: list[PlanningSlot] = []
for i in range(n):
sell = -0.25 if i >= 6 else (2.8 if i < 4 else 1.2)
if i >= 6:
pv_a = (8000 + (i - 6) * 500) * neg_pv_scale
pv_b = 6000.0 * neg_pv_scale
else:
pv_a = 1500 + i * 400
pv_b = 1500.0
future_sell = 6.5 if sell >= 0 else None
out.append(
PlanningSlot(
interval_start=base + timedelta(minutes=15 * i),
buy_price=2.0,
sell_price=sell,
pv_a_forecast_w=pv_a,
pv_b_forecast_w=pv_b,
load_baseline_w=450,
ev1_connected=False,
ev2_connected=False,
allow_charge=True,
allow_discharge_export=False,
future_sell_opportunity_czk_kwh=future_sell,
)
)
return out
def test_cushion_ok_when_neg_window_pv_large(self) -> None:
slots = self._slots_morning_then_neg()
bat = _battery(uc_wh=64_000.0, max_pct=95.0)
bat.planner_neg_sell_prep_soc_percent = 80.0
bat.planner_neg_sell_full_soc_tail_slots = 4
self.assertTrue(
_pre_neg_pv_export_forecast_cushion_ok(
slots,
bat,
0.30 * bat.soc_max_wh,
6,
neg_sell_phases_en=True,
)
)
def test_cushion_fail_when_neg_window_pv_tiny(self) -> None:
slots = self._slots_morning_then_neg(neg_pv_scale=0.05)
bat = _battery(uc_wh=64_000.0, max_pct=95.0)
bat.planner_neg_sell_prep_soc_percent = 80.0
bat.planner_neg_sell_full_soc_tail_slots = 4
self.assertFalse(
_pre_neg_pv_export_forecast_cushion_ok(
slots,
bat,
0.30 * bat.soc_max_wh,
6,
neg_sell_phases_en=True,
)
)
def test_morning_exports_pv_when_cushion_ok(self) -> None:
slots = self._slots_morning_then_neg()
bat = _battery(uc_wh=64_000.0, max_pct=95.0)
bat.planner_neg_sell_prep_soc_percent = 80.0
bat.planner_neg_sell_full_soc_tail_slots = 4
bat.planner_neg_sell_vent_min_sell_czk_kwh = -1.0
hp = SimpleNamespace(rated_heating_power_w=0, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0)
grid = SimpleNamespace(
max_import_power_w=20_000,
max_export_power_w=13_500,
block_export_on_negative_sell=False,
)
vehicles = [
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
]
results, _, snap = solve_dispatch(
slots,
bat,
hp,
grid,
[None, None],
vehicles,
0.30 * bat.soc_max_wh,
50.0,
operating_mode="AUTO",
)
self.assertTrue(snap["inputs"].get("pre_neg_pv_export_forecast_ok"))
self.assertIn(
slots[2].interval_start.isoformat(),
snap["inputs"].get("pre_neg_pv_export_slots") or [],
)
self.assertLess(results[2].grid_setpoint_w, -500)
def test_morning_charges_when_cushion_fail(self) -> None:
slots = self._slots_morning_then_neg(neg_pv_scale=0.05)
bat = _battery(uc_wh=64_000.0, max_pct=95.0)
bat.planner_neg_sell_prep_soc_percent = 80.0
bat.planner_neg_sell_full_soc_tail_slots = 4
hp = SimpleNamespace(rated_heating_power_w=0, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0)
grid = SimpleNamespace(
max_import_power_w=20_000,
max_export_power_w=13_500,
block_export_on_negative_sell=False,
)
vehicles = [
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
]
results, _, snap = solve_dispatch(
slots,
bat,
hp,
grid,
[None, None],
vehicles,
0.30 * bat.soc_max_wh,
50.0,
operating_mode="AUTO",
)
self.assertFalse(snap["inputs"].get("pre_neg_pv_export_forecast_ok"))
self.assertGreater(results[2].battery_setpoint_w, 2000)
if __name__ == "__main__":
unittest.main()