Fix SoC balance on battery export and improve evening push (v39).
Some checks failed
CI and deploy / migration-check (push) Failing after 38s
CI and deploy / deploy (push) Has been skipped

SoC continuity now deducts only bd (ge_bat was double-counted via energy
balance), which stopped the plan from draining ~2× faster than BMS during
evening BATTERY_SELL. Also ships dynamic evening push budget + rolling
hysteresis (v38), drops unused fn_soc_tracking_bundle, and adds tests/docs.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Dusan Vojacek
2026-05-29 00:04:48 +02:00
parent 52e4b68789
commit ba0b55bf10
5 changed files with 432 additions and 58 deletions

View File

@@ -71,7 +71,7 @@ NEG_BUY_CHARGE_SHORTFALL_PENALTY_CZK_KWH = 100.0
PRE_NEG_CHARGE_PENALTY_CZK_KWH = 400.0
PRE_NEG_BATT_EXPORT_SHORTFALL_PENALTY_CZK_KWH = 80.0
PRE_NEG_BATT_EXPORT_MIN_SELL_CZK_KWH = 1.0
PLANNER_BUILD_TAG = "2026-05-28-neg-prep-window-v36g"
PLANNER_BUILD_TAG = "2026-05-28-evening-export-soc-balance-v39"
# Po t_detach v prep: necpát PV do bat (měkké; tvrdý hold přes soc_target z rampy).
NEG_SELL_POST_DETACH_BCPV_DISCOURAGE_CZK_KWH = 250.0
# Večer před neg dnem: výboj do sítě (měkký shortfall na ge_bat).
@@ -88,6 +88,9 @@ POS_SELL_PRE_NEG_SOC_SHORTFALL_PENALTY_CZK_PER_WH = 0.30
PRE_NEG_BUY_SOC_CEILING_SLACK_PENALTY_CZK_PER_WH = 0.25
PRE_NEG_BUY_EMPTY_EXPORT_SHORTFALL_PENALTY_CZK_KWH = 80.0
EVENING_PEAK_SELL_EPS_CZK_KWH = 0.05
# Rolling replan: držet evening_push_ts při malé změně peak sell / SoC.
EVENING_PUSH_HYSTERESIS_SELL_PEAK_DELTA_CZK_KWH = 0.5
EVENING_PUSH_HYSTERESIS_SOC_PCT = 5.0
# Noční výprodej baterie: večer (≥17h) + ráno do východu FVE (05h Prague), jedna špička přes půlnoc.
NIGHT_EXPORT_EVENING_START_HOUR = 17
NIGHT_EXPORT_MORNING_END_HOUR = 5
@@ -1581,26 +1584,22 @@ def _evening_battery_export_push_indices(
evening_start_hour: int = 17,
) -> list[int]:
"""
Noční push: plný ge_bat na top sell sloty v nočním okně (≥17h + 05h do východu FVE).
Noční push: plný ge_bat v tolika nejdražších profitable slotech, kolik unese Wh rozpočet.
Ne jeden slot — kolik slotů unese Wh rozpočet, seřazených sell desc.
Peak sell je max v celém nočním úseku (přes půlnoc), ne per kalendářní den.
Kandidáti = profitable ∩ noční okno (≥17h + 05h do východu FVE). Řazení sell desc;
přidávat sloty dokud kumulované Wh ≤ push_budget (R__063: discharge_slot_buffer × SoC).
per_slot_discharge_wh = max_discharge × účinnost × 0,25 h; volající předává
min(discharge, export_cap × účinnost × 0,25 h) — home-01 export 13,5 kW ≈ 3,4 kWh/slot.
"""
_ = degrad_czk_kwh, evening_start_hour # kompatibilita volání
if per_slot_discharge_wh <= 0.0:
return []
peak_ts = _evening_peak_export_indices(
slots,
degrad_czk_kwh=degrad_czk_kwh,
evening_start_hour=evening_start_hour,
)
candidates = [t for t in peak_ts if t in profitable_export_ts]
if not candidates:
return []
max_sell = max(float(slots[t].sell_price) for t in candidates)
candidates = [
t
for t in candidates
if float(slots[t].sell_price) >= max_sell - EVENING_PEAK_SELL_EPS_CZK_KWH
for t, s in enumerate(slots)
if _in_night_battery_export_window(s)
and t in profitable_export_ts
and float(s.sell_price) >= 0.0
]
if not candidates:
return []
@@ -1627,6 +1626,73 @@ def _evening_battery_export_push_indices(
return sorted(out)
def _evening_night_peak_sell_czk(slots: list[PlanningSlot]) -> float:
sells = [
float(s.sell_price)
for s in slots
if _in_night_battery_export_window(s) and float(s.sell_price) >= 0.0
]
return max(sells) if sells else 0.0
def _evening_push_peak_sell_czk(slots: list[PlanningSlot], push_ts: set[int]) -> float:
if not push_ts:
return 0.0
return max(float(slots[t].sell_price) for t in push_ts)
def _evening_push_ts_from_iso(slots: list[PlanningSlot], iso_slots: list[str]) -> set[int]:
by_iso = {s.interval_start.isoformat(): t for t, s in enumerate(slots)}
return {by_iso[iso] for iso in iso_slots if iso in by_iso}
def _evening_push_hysteresis_active(
*,
prev_peak_sell_czk: float | None,
new_peak_sell_czk: float,
prev_soc_wh: float | None,
current_soc_wh: float,
usable_capacity_wh: float,
) -> bool:
if prev_peak_sell_czk is None:
return False
if abs(new_peak_sell_czk - float(prev_peak_sell_czk)) >= (
EVENING_PUSH_HYSTERESIS_SELL_PEAK_DELTA_CZK_KWH
):
return False
if prev_soc_wh is not None and usable_capacity_wh > 1e-6:
delta_pct = (
abs(float(current_soc_wh) - float(prev_soc_wh))
/ float(usable_capacity_wh)
* 100.0
)
if delta_pct >= EVENING_PUSH_HYSTERESIS_SOC_PCT:
return False
return True
def _evening_early_export_penalty_indices(
slots: list[PlanningSlot],
*,
profitable_export_ts: set[int],
discharge_export_slots: set[int],
evening_push_ts: set[int],
) -> set[int]:
"""ge_bat=0 pro profitable noční sloty pod peakeps mimo evening_push (v38: i po prvním push)."""
out: set[int] = set()
for t_ev, s_ev in enumerate(slots):
if not _in_night_battery_export_window(s_ev):
continue
if t_ev not in profitable_export_ts or t_ev not in discharge_export_slots:
continue
if t_ev in evening_push_ts:
continue
peak_sell = _night_peak_sell_czk_kwh(slots, t_ev)
if float(s_ev.sell_price) < peak_sell - EVENING_PEAK_SELL_EPS_CZK_KWH:
out.add(t_ev)
return out
def _last_non_negative_sell_before_neg_buy(
slots: list[PlanningSlot],
first_neg_buy_idx: int | None,
@@ -1772,6 +1838,7 @@ def solve_dispatch_two_pass(
operating_mode: str = "AUTO",
charge_commitment_prev_w: Optional[list[Optional[float]]] = None,
planner_version: str | None = None,
evening_push_ts_override: Optional[set[int]] = None,
) -> tuple[list["DispatchResult"], int, dict[str, Any]]:
"""
Dva průchody solve_dispatch: pass2 používá acquisition z váženého buy nabíjení v pass1.
@@ -1789,6 +1856,7 @@ def solve_dispatch_two_pass(
operating_mode=operating_mode,
charge_commitment_prev_w=charge_commitment_prev_w,
planner_version=planner_version,
evening_push_ts_override=evening_push_ts_override,
)
acq1 = float(
snap1.get("inputs", {}).get("charge_acquisition_buy_czk_kwh")
@@ -1820,6 +1888,7 @@ def solve_dispatch_two_pass(
operating_mode=operating_mode,
charge_commitment_prev_w=charge_commitment_prev_w,
planner_version=planner_version,
evening_push_ts_override=evening_push_ts_override,
)
if isinstance(snap2.get("inputs"), dict):
snap2["inputs"]["acquisition_pass1_czk_kwh"] = round(acq1, 6)
@@ -1847,6 +1916,7 @@ def solve_dispatch(
planner_version: str | None = None,
relaxed_expensive_import: bool = False,
relaxed_neg_buy_charge: bool = False,
evening_push_ts_override: Optional[set[int]] = None,
) -> tuple[list[DispatchResult], int, dict[str, Any]]:
"""
LP solver pro dispatch optimalizaci.
@@ -2203,6 +2273,7 @@ def solve_dispatch(
profitable_export_ts_pre.add(_t)
evening_push_ts: set[int] = set()
evening_early_export_penalty_ts: set[int] = set()
evening_push_hysteresis_retained = False
if om == "AUTO":
per_slot_discharge_wh_pre = max(
float(battery.max_discharge_power_w)
@@ -2210,8 +2281,13 @@ def solve_dispatch(
* INTERVAL_H,
0.0,
)
export_cap_push_w = _battery_export_cap_w(battery, grid)
per_slot_push_wh_pre = min(
per_slot_discharge_wh_pre,
export_cap_push_w * float(battery.discharge_efficiency) * INTERVAL_H,
)
discharge_buf_pre = float(getattr(battery, "discharge_slot_buffer", 0) or 0)
evening_push_ts = set(
computed_evening_push_ts = set(
_evening_battery_export_push_indices(
slots,
profitable_export_ts=profitable_export_ts_pre,
@@ -2219,26 +2295,21 @@ def solve_dispatch(
current_soc_wh=float(current_soc_wh),
min_soc_wh=float(min_soc_wh),
soc_max_wh=float(battery.soc_max_wh),
per_slot_discharge_wh=per_slot_discharge_wh_pre,
per_slot_discharge_wh=per_slot_push_wh_pre,
discharge_slot_buffer=discharge_buf_pre,
)
)
# Zákaz ge_bat jen *před* prvním push slotem (ne po něm — jinak terminal SoC + load
# drží energii pro 1921 h bez prodeje, home-01 téměř neexportuje).
first_evening_push_t = min(evening_push_ts) if evening_push_ts else None
if first_evening_push_t is not None:
for t_ev, s_ev in enumerate(slots):
if not _in_night_battery_export_window(s_ev):
continue
if t_ev >= first_evening_push_t:
continue
if t_ev not in profitable_export_ts_pre or t_ev not in discharge_export_slots:
continue
if t_ev in evening_push_ts:
continue
peak_sell = _night_peak_sell_czk_kwh(slots, t_ev)
if float(s_ev.sell_price) < peak_sell - EVENING_PEAK_SELL_EPS_CZK_KWH:
evening_early_export_penalty_ts.add(t_ev)
if evening_push_ts_override is not None:
evening_push_ts = set(evening_push_ts_override)
evening_push_hysteresis_retained = True
else:
evening_push_ts = computed_evening_push_ts
evening_early_export_penalty_ts = _evening_early_export_penalty_indices(
slots,
profitable_export_ts=profitable_export_ts_pre,
discharge_export_slots=discharge_export_slots,
evening_push_ts=evening_push_ts,
)
last_pos_sell_pre_neg_buy = _last_non_negative_sell_before_neg_buy(
slots, first_neg_buy_idx
)
@@ -2964,12 +3035,13 @@ def solve_dispatch(
# Měkký breaker cap: gi_over[t] >= max(0, gi[t] - breaker).
prob += gi_over[t] >= gi[t] - float(grid.max_import_power_w)
# SoC kontinuita (bd do domu i ge_bat do sítě vybíjí baterii)
# SoC kontinuita: bd je v bilanci zdroj na AC sběrnici; při exportu z baterie už
# obsahuje load + ge_bat (ge = ge_pv + ge_bat). ge_bat znovu neodečítat.
soc_prev = current_soc_wh if t == 0 else soc[t - 1]
prob += soc[t] == (
soc_prev
+ (bc_pv[t] + bc_gi[t]) * battery.charge_efficiency * INTERVAL_H
- (bd[t] + ge_bat[t]) / battery.discharge_efficiency * INTERVAL_H
- bd[t] / battery.discharge_efficiency * INTERVAL_H
)
sv = safety_vars[t]
@@ -3675,6 +3747,12 @@ def solve_dispatch(
if neg_sell_phases_en
else None
),
"evening_push": (
t in evening_push_ts if om == "AUTO" else None
),
"evening_early_export_ban": (
t in evening_early_export_penalty_ts if om == "AUTO" else None
),
}
)
tgt_s = st.safety_soc_target_wh if daytime_en else None
@@ -3821,6 +3899,15 @@ def solve_dispatch(
if slots[0].charge_acquisition_cutoff_at is not None
else None
),
"evening_push_ts": [
slots[i].interval_start.isoformat() for i in sorted(evening_push_ts)
],
"evening_push_peak_sell_czk_kwh": (
_evening_push_peak_sell_czk(slots, evening_push_ts)
if evening_push_ts
else _evening_night_peak_sell_czk(slots)
),
"evening_push_hysteresis_retained": bool(evening_push_hysteresis_retained),
},
"masks": masks_snap,
"soc_bounds": soc_bounds_snap,
@@ -4070,6 +4157,9 @@ async def run_rolling_replan(
slots_raw_pv.append(replace(s, pv_a_forecast_w=pva, pv_b_forecast_w=pvb))
commitment_prev = await _load_previous_plan_charge_commitment_prev_w(site_id, slots, db)
evening_push_override = await _rolling_evening_push_override(
site_id, slots, battery, soc_wh, db
)
om = operating_mode or "AUTO"
if om == "AUTO":
@@ -4079,6 +4169,7 @@ async def run_rolling_replan(
operating_mode=om,
charge_commitment_prev_w=commitment_prev,
planner_version=planner_version_resolved,
evening_push_ts_override=evening_push_override,
)
else:
results, duration_ms, solver_snapshot = solve_dispatch(
@@ -4354,6 +4445,62 @@ async def _load_site_context(site_id: int, db):
)
async def _rolling_evening_push_override(
site_id: int,
slots: list[PlanningSlot],
battery,
current_soc_wh: float,
db,
) -> set[int] | None:
"""Rolling: držet evening_push_ts z aktivního runu při malé změně peak sell / SoC."""
if not slots:
return None
row = await db.fetchrow(
"""
select solver_params
from ems.planning_run
where site_id = $1::int
and status = 'active'
limit 1
""",
site_id,
)
if row is None or row["solver_params"] is None:
return None
sp = row["solver_params"]
if isinstance(sp, str):
sp = json.loads(sp)
if not isinstance(sp, dict):
return None
inputs = sp.get("inputs")
if not isinstance(inputs, dict):
return None
prev_iso = inputs.get("evening_push_ts")
if not isinstance(prev_iso, list) or not prev_iso:
return None
prev_push = _evening_push_ts_from_iso(slots, [str(x) for x in prev_iso])
if not prev_push:
return None
prev_peak = inputs.get("evening_push_peak_sell_czk_kwh")
prev_soc = inputs.get("current_soc_wh")
new_peak = _evening_night_peak_sell_czk(slots)
if not _evening_push_hysteresis_active(
prev_peak_sell_czk=float(prev_peak) if prev_peak is not None else None,
new_peak_sell_czk=new_peak,
prev_soc_wh=float(prev_soc) if prev_soc is not None else None,
current_soc_wh=float(current_soc_wh),
usable_capacity_wh=float(battery.usable_capacity_wh),
):
return None
logger.info(
"[site=%s] evening_push hysteresis: retaining %d slot(s), peak_sell=%.3f",
site_id,
len(prev_push),
new_peak,
)
return prev_push
async def _load_previous_plan_charge_commitment_prev_w(
site_id: int,
slots: list[PlanningSlot],

View File

@@ -122,7 +122,7 @@ class PreNegBuySocPhaseTests(unittest.TestCase):
class EveningPushBudgetTests(unittest.TestCase):
"""Večerní tvrdý push: počet slotů z rozpočtu Wh (ne pevné top-3)."""
"""Večerní tvrdý push: všechny profitable peak-band sloty (v38), rozpočet jen brána."""
@staticmethod
def _evening_slots(n: int = 8) -> list[PlanningSlot]:
@@ -161,7 +161,7 @@ class EveningPushBudgetTests(unittest.TestCase):
per_slot_discharge_wh=per_slot,
discharge_slot_buffer=1.5,
)
self.assertGreater(len(push_hi), 3)
self.assertGreaterEqual(len(push_hi), 3)
soc_low = bat.min_soc_wh + 100.0
push_lo = _evening_battery_export_push_indices(
slots,
@@ -241,8 +241,8 @@ class EveningPushBudgetTests(unittest.TestCase):
per_slot_discharge_wh=per_slot,
discharge_slot_buffer=1.5,
)
self.assertIn(2, push, "nejvyšší sell 00:00 má být v push před 23:30")
self.assertEqual(push[0], 2)
self.assertIn(2, push, "nejvyšší sell 00:00 má být v push (top-3 v nočním úseku)")
self.assertEqual(max(float(slots[t].sell_price) for t in push), 3.586)
def test_evening_push_budget_matches_r063_formula(self) -> None:
bat = _battery(uc_wh=64_000.0, min_pct=10.0, max_pct=95.0)
@@ -257,6 +257,67 @@ class EveningPushBudgetTests(unittest.TestCase):
available = soc - bat.min_soc_wh
self.assertAlmostEqual(budget, min(available, exportable_full * 1.5))
def test_push_slot_count_follows_wh_budget_not_fixed_top_n(self) -> None:
"""v38: počet push slotů = floor(rozpočet Wh / per_slot), sell desc — ne pevné top-3."""
prague = ZoneInfo("Europe/Prague")
sells = [10.0, 9.5, 9.0, 5.0, 4.0, 3.0]
base = datetime(2026, 5, 25, 18, 0, tzinfo=prague)
slots = [
PlanningSlot(
interval_start=base + timedelta(minutes=15 * i),
buy_price=2.0,
sell_price=sells[i],
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=800,
ev1_connected=False,
ev2_connected=False,
allow_discharge_export=True,
charge_acquisition_buy_czk_kwh=0.5,
)
for i in range(6)
]
bat = _battery(uc_wh=64_000.0, min_pct=10.0, max_pct=95.0)
per_slot = 17_000 * 0.95 * 0.25
profitable = set(range(len(slots)))
# Rozpočet na ~3 plné sloty (ne celá baterie — jinak by šlo až 6 slotů).
soc_three_slots = bat.min_soc_wh + 3.2 * per_slot
budget = _evening_push_discharge_budget_wh(
current_soc_wh=soc_three_slots,
min_soc_wh=bat.min_soc_wh,
soc_max_wh=bat.soc_max_wh,
discharge_slot_buffer=1.5,
)
expected_n = min(
len(slots),
max(0, int(budget // per_slot)),
)
push = _evening_battery_export_push_indices(
slots,
profitable_export_ts=profitable,
degrad_czk_kwh=0.15,
current_soc_wh=soc_three_slots,
min_soc_wh=bat.min_soc_wh,
soc_max_wh=bat.soc_max_wh,
per_slot_discharge_wh=per_slot,
discharge_slot_buffer=1.5,
)
self.assertEqual(len(push), expected_n)
self.assertEqual(push, [0, 1, 2], "nejdražší sloty první, ne jeden slot")
self.assertNotIn(3, push)
# Více SoC → více push slotů (dynamicky, ne strop 3).
push_hi = _evening_battery_export_push_indices(
slots,
profitable_export_ts=profitable,
degrad_czk_kwh=0.15,
current_soc_wh=0.9 * bat.soc_max_wh,
min_soc_wh=bat.min_soc_wh,
soc_max_wh=bat.soc_max_wh,
per_slot_discharge_wh=per_slot,
discharge_slot_buffer=1.5,
)
self.assertGreater(len(push_hi), len(push))
class SlotsUntilSellNegativeTests(unittest.TestCase):
def test_slots_until_first_negative_sell(self) -> None:
@@ -2322,14 +2383,17 @@ class ChargeAcquisitionArbitrageTests(unittest.TestCase):
peak = results[peak_idx]
self.assertIn(peak.export_mode, ("BATTERY_SELL", "PV_SURPLUS"))
self.assertGreater(abs(peak.grid_setpoint_w), 5000)
# v27: ge_bat=0 jen před prvním push slotem, ne u všech sell < peak0.05.
# v38: sloty mimo push s sell pod peakeps nesmí BATTERY_SELL (evening_early).
push_iso = set(snap["inputs"].get("evening_push_ts") or [])
for i, r in enumerate(results):
if i >= peak_idx:
if slots[i].interval_start.isoformat() in push_iso:
continue
if float(sells[i]) >= 4.04 - 0.05:
continue
self.assertNotEqual(
r.export_mode,
"BATTERY_SELL",
msg=f"slot {i} sell={sells[i]} must not battery-export before first push",
msg=f"slot {i} sell={sells[i]} must not battery-export when not in push",
)
def test_midnight_higher_sell_gets_battery_export(self) -> None:
@@ -2502,6 +2566,64 @@ class ChargeAcquisitionArbitrageTests(unittest.TestCase):
)
self.assertLess(evening.battery_setpoint_w, -500)
def test_evening_export_in_all_top_three_peak_slots_not_only_last(self) -> None:
"""MILP v38: export v každém z top-3 večerních sell slotů, ne až v posledním."""
prague = ZoneInfo("Europe/Prague")
sells = [10.0, 9.5, 9.0, 5.0, 4.0, 3.0]
base = datetime(2026, 5, 25, 18, 0, tzinfo=prague)
slots = [
PlanningSlot(
interval_start=base + timedelta(minutes=15 * i),
buy_price=2.0,
sell_price=sells[i],
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=800,
ev1_connected=False,
ev2_connected=False,
allow_discharge_export=True,
charge_acquisition_buy_czk_kwh=0.5,
)
for i in range(6)
]
battery = _battery(uc_wh=64_000.0, terminal_soc_value_factor=0.0)
battery.max_discharge_power_w = 17_000
hp = SimpleNamespace(rated_heating_power_w=0, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0)
grid = SimpleNamespace(max_import_power_w=20_000, max_export_power_w=20_000)
vehicles = [
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
]
soc0 = 0.85 * battery.soc_max_wh
results, _ms, snap = solve_dispatch(
slots,
battery,
hp,
grid,
[None, None],
vehicles,
soc0,
50.0,
operating_mode="AUTO",
)
self.assertEqual(snap["planner_build_tag"], PLANNER_BUILD_TAG)
push_iso = snap["inputs"].get("evening_push_ts") or []
self.assertGreaterEqual(len(push_iso), 3)
for i in range(3):
self.assertIn(
slots[i].interval_start.isoformat(),
push_iso,
msg=f"slot {i} sell={sells[i]} must be in evening_push_ts",
)
for i in range(3):
r = results[i]
self.assertLess(
r.grid_setpoint_w,
-500,
msg=f"slot {i} sell={sells[i]} should export, not defer to cheaper later slot",
)
self.assertEqual(r.export_mode, "BATTERY_SELL")
def test_no_pv_export_at_low_sell_when_evening_peak_much_higher(self) -> None:
"""Odpolední sell ~1,4 a večer ~5,5 — PV do baterie, ne FVE→síť za haléř."""
base = datetime(2026, 5, 21, 12, 0, tzinfo=timezone.utc)
@@ -4338,7 +4460,7 @@ class NegSellPrepWindowV36Tests(unittest.TestCase):
50.0,
operating_mode="AUTO",
)
self.assertEqual(snap.get("planner_build_tag"), "2026-05-28-neg-prep-window-v36g")
self.assertEqual(snap.get("planner_build_tag"), PLANNER_BUILD_TAG)
anchors = snap["inputs"].get("neg_evening_reserve_soc_anchors") or []
self.assertGreaterEqual(len(anchors), 1)
anchor_iso = anchors[-1]["slot"]
@@ -4352,5 +4474,68 @@ class NegSellPrepWindowV36Tests(unittest.TestCase):
self.assertGreater(len(eve_slots), 8)
class SocBalanceDischargeTests(unittest.TestCase):
"""SoC bilance: při exportu z baterie stačí bd (ge_bat je v bilanci už započtený)."""
def test_export_slot_soc_drop_not_double_ge_bat(self) -> None:
base = datetime(2026, 5, 28, 18, 0, tzinfo=timezone.utc)
slots = [
PlanningSlot(
interval_start=base,
buy_price=2.0,
sell_price=9.5,
pv_a_forecast_w=0,
pv_b_forecast_w=0,
load_baseline_w=800,
ev1_connected=False,
ev2_connected=False,
allow_charge=False,
allow_discharge_export=True,
)
]
bat = _battery(uc_wh=64_000.0, max_pct=95.0, arb_pct=20.0)
hp = SimpleNamespace(rated_heating_power_w=0, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0)
grid = SimpleNamespace(
max_import_power_w=20_000,
max_export_power_w=13_500,
block_export_on_negative_sell=False,
)
vehicles = [
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
]
start_soc_wh = 0.75 * bat.soc_max_wh
results, _, _ = solve_dispatch(
slots,
bat,
hp,
grid,
[None, None],
vehicles,
start_soc_wh,
50.0,
operating_mode="AUTO",
)
end_soc_wh = results[0].battery_soc_target / 100.0 * bat.usable_capacity_wh
drop_wh = start_soc_wh - end_soc_wh
export_w = max(0, -results[0].grid_setpoint_w)
self.assertGreater(export_w, 2000, "solver should export from battery in peak slot")
load_w = 800
eff = bat.discharge_efficiency
expected_drop_wh = (load_w + export_w) * 0.25 / eff
double_count_drop_wh = (load_w + 2 * export_w) * 0.25 / eff
self.assertLess(
drop_wh,
double_count_drop_wh * 0.92,
"SoC must not drop as if ge_bat were counted twice",
)
self.assertAlmostEqual(
drop_wh,
expected_drop_wh,
delta=expected_drop_wh * 0.12,
msg="SoC drop should match bd ≈ load + export from balance",
)
if __name__ == "__main__":
unittest.main()