Files
ems/backend/tests/test_solver_v2.py
Dusan Vojacek 2932d48080
All checks were successful
CI and deploy / migration-check (push) Successful in 29s
CI and deploy / deploy (push) Successful in 1m0s
v2: PV-risk front-load — nabít v neg okně co nejdřív (nejistota predikce)
v1 to řešil rampou (plný výkon než se řeže pole A — zelený bonus B, riziko
večerního mraku). v2 byl k načasování v okně sell<0 indiferentní (PV zdarma
kdykoliv) a směl nabíjení odložit — odklad ale spoléhá na predikci.

Mechanismus: malá prémie za držení energie dřív (objective −= soc[t] ×
frontload v neg slotech). Rozbíjí indiferenci směrem k front-loadu, nikdy
nepřebije skutečné ceny. Velikost z DB: asset_battery.
planner_pv_risk_frontload_czk_kwh (V090, default 0.01; 0 = vypnuto),
přes fn_planning_site_context (R__039). Test: 4 sloty plným tempem od startu.
Eval fixtures beze změny (sloupec v nich není → 0).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 09:55:22 +02:00

239 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""solver_v2 (čisté jádro): tvrdá pravidla, režimy, EV deadline, arbitráž (bez DB)."""
from __future__ import annotations
import unittest
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from services.planning.solver_v2 import solve_dispatch_v2
from services.planning.types import PlanningSlot
def _slot(
base: datetime,
i: int,
*,
buy: float,
sell: float,
pv_a: int = 0,
pv_b: int = 0,
load: int = 1000,
ev1: bool = False,
) -> PlanningSlot:
return PlanningSlot(
interval_start=base + timedelta(minutes=15 * i),
buy_price=buy,
sell_price=sell,
pv_a_forecast_w=pv_a,
pv_b_forecast_w=pv_b,
load_baseline_w=load,
ev1_connected=ev1,
ev2_connected=False,
)
def _battery(uc_wh: float = 20_000.0) -> SimpleNamespace:
return SimpleNamespace(
usable_capacity_wh=uc_wh,
min_soc_wh=0.12 * uc_wh,
arb_floor_wh=0.20 * uc_wh,
reserve_soc_wh=0.20 * uc_wh,
soc_max_wh=0.95 * uc_wh,
charge_efficiency=0.95,
discharge_efficiency=0.95,
degradation_cost_czk_kwh=0.5,
max_charge_power_w=8000,
max_discharge_power_w=8000,
planner_terminal_soc_value_factor=0.8,
)
def _grid(block_neg: bool = False, gen_cutoff: bool = False) -> SimpleNamespace:
return SimpleNamespace(
max_import_power_w=17_000,
max_export_power_w=13_500,
block_export_on_negative_sell=block_neg,
deye_gen_microinverter_cutoff_enabled=gen_cutoff,
)
_HP = SimpleNamespace(rated_heating_power_w=0, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0)
_VEHICLES = [
SimpleNamespace(max_charge_power_w=11_000, battery_capacity_kwh=60.0, default_target_soc_pct=80.0),
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
]
_BASE = datetime(2026, 6, 10, 0, 0, tzinfo=timezone.utc)
def _solve(slots, *, battery=None, grid=None, ev_sessions=(None, None), soc0=None, mode="AUTO"):
bat = battery or _battery()
return solve_dispatch_v2(
slots,
bat,
_HP,
grid or _grid(),
list(ev_sessions),
_VEHICLES,
soc0 if soc0 is not None else 0.5 * bat.usable_capacity_wh,
50.0,
operating_mode=mode,
)
class HardRulesTests(unittest.TestCase):
def test_negative_buy_blocks_export(self) -> None:
slots = [_slot(_BASE, i, buy=-2.0, sell=1.5, pv_a=6000, load=500) for i in range(8)]
results, _, _ = _solve(slots)
for r in results:
self.assertGreaterEqual(r.grid_setpoint_w, 0, "buy<0 → žádný export (pumpa)")
def test_block_export_on_negative_sell(self) -> None:
slots = [_slot(_BASE, i, buy=2.0, sell=-0.5, pv_a=8000, load=500) for i in range(8)]
results, _, _ = _solve(slots, grid=_grid(block_neg=True))
for r in results:
self.assertGreaterEqual(r.grid_setpoint_w, 0, "KV1: sell<0 → ge=0")
def test_negative_sell_prefers_charge_or_curtail_over_paid_export(self) -> None:
slots = [_slot(_BASE, i, buy=2.0, sell=-1.0, pv_a=8000, load=500) for i in range(8)]
results, _, _ = _solve(slots)
paid_export = sum(-r.grid_setpoint_w for r in results if r.grid_setpoint_w < 0)
self.assertEqual(paid_export, 0, "spot: za export při sell<0 se platí → ekonomika ho vyloučí")
def test_battery_export_requires_arb_floor(self) -> None:
bat = _battery()
slots = [_slot(_BASE, i, buy=1.0, sell=8.0, load=500) for i in range(8)]
results, _, _ = _solve(slots, battery=bat, soc0=0.5 * bat.usable_capacity_wh)
for r in results:
if r.grid_setpoint_w < 0 and r.battery_setpoint_w < 0:
self.assertGreaterEqual(
r.battery_soc_target / 100.0 * bat.usable_capacity_wh,
bat.arb_floor_wh - 1.0,
"export z baterie nesmí podlézt arb floor",
)
def test_curtailment_only_pv_a(self) -> None:
# extrémně záporný sell bez block_export: pole B nelze omezit, A ano
slots = [_slot(_BASE, i, buy=2.0, sell=-3.0, pv_a=5000, pv_b=4000, load=300) for i in range(8)]
bat = _battery(uc_wh=2000.0) # malá baterie, ať se přebytek nevejde
results, _, _ = _solve(slots, battery=bat, soc0=0.9 * 2000.0)
self.assertTrue(any(r.pv_a_curtailed_w > 0 for r in results), "A se curtailuje")
for r in results:
self.assertLessEqual(r.pv_a_curtailed_w, 5000, "curtail max = výroba A")
class ArbitrageTests(unittest.TestCase):
def test_cheap_night_charge_expensive_evening_discharge(self) -> None:
slots = [_slot(_BASE, i, buy=1.0, sell=0.5, load=1000) for i in range(16)]
slots += [_slot(_BASE, 16 + i, buy=8.0, sell=7.0, load=1000) for i in range(16)]
results, _, _ = _solve(slots)
charged = sum(r.battery_setpoint_w for r in results[:16] if r.battery_setpoint_w > 0)
discharged = sum(-r.battery_setpoint_w for r in results[16:] if r.battery_setpoint_w < 0)
self.assertGreater(charged, 0, "levná noc → nabíjet")
self.assertGreater(discharged, 0, "drahý večer → vybíjet")
class OperatingModeTests(unittest.TestCase):
def _slots(self):
return [_slot(_BASE, i, buy=1.0, sell=6.0, pv_a=3000, load=1000) for i in range(8)]
def test_preserve_locks_battery(self) -> None:
results, _, _ = _solve(self._slots(), mode="PRESERVE")
for r in results:
self.assertEqual(r.battery_setpoint_w, 0)
def test_charge_cheap_no_export_no_discharge(self) -> None:
results, _, _ = _solve(self._slots(), mode="CHARGE_CHEAP")
for r in results:
self.assertGreaterEqual(r.grid_setpoint_w, 0)
self.assertGreaterEqual(r.battery_setpoint_w, 0)
def test_self_sustain_import_capped_to_load(self) -> None:
results, _, _ = _solve(self._slots(), mode="SELF_SUSTAIN")
for r in results:
self.assertLessEqual(r.grid_setpoint_w, 1000, "import ≤ baseline load")
class NightReserveTests(unittest.TestCase):
def test_night_discharge_respects_buffer(self) -> None:
# noc: vysoký sell, žádné PV; buffer 2000 Wh nad min → plán nesmí
# kalkulovat s vybitím pod min+buffer (sell < buy ⇒ slack se nevyplatí)
bat = _battery()
slots = []
for i in range(16):
s = _slot(_BASE, i, buy=6.0, sell=4.5, load=800)
s.night_baseload_buffer_wh = 2000.0
slots.append(s)
results, _, _ = _solve(slots, battery=bat, soc0=0.6 * bat.usable_capacity_wh)
floor_pct = (bat.min_soc_wh + 2000.0) / bat.usable_capacity_wh * 100.0
for r in results:
self.assertGreaterEqual(r.battery_soc_target, floor_pct - 0.6)
def test_extreme_sell_spike_may_sell_reserve(self) -> None:
# sell výrazně nad buy → racionální polštář prodat (placený slack)
bat = _battery()
slots = []
for i in range(16):
s = _slot(_BASE, i, buy=2.0, sell=12.0, load=300)
s.night_baseload_buffer_wh = 2000.0
slots.append(s)
results, _, _ = _solve(slots, battery=bat, soc0=0.6 * bat.usable_capacity_wh)
min_soc_pct = min(r.battery_soc_target for r in results)
floor_pct = (bat.min_soc_wh + 2000.0) / bat.usable_capacity_wh * 100.0
self.assertLess(min_soc_pct, floor_pct - 1.0, "spike má polštář vyprodat")
def test_start_below_buffer_is_feasible(self) -> None:
bat = _battery()
slots = []
for i in range(8):
s = _slot(_BASE, i, buy=6.0, sell=1.0, load=1500)
s.night_baseload_buffer_wh = 3000.0
slots.append(s)
results, _, _ = _solve(slots, battery=bat, soc0=bat.min_soc_wh + 500.0)
self.assertEqual(len(results), 8)
class PvRiskFrontloadTests(unittest.TestCase):
def test_neg_window_charges_asap(self) -> None:
# sell<0 okno, PV >> load, prázdnější baterie: s frontload prémií musí
# nabíjení běžet plným tempem od začátku (ne odložené na konec okna)
bat = _battery()
bat.planner_pv_risk_frontload_czk_kwh = 0.05
slots = [_slot(_BASE, i, buy=2.0, sell=-0.5, pv_a=12000, load=500) for i in range(12)]
results, _, _ = _solve(slots, battery=bat, soc0=0.2 * bat.usable_capacity_wh)
# max tempo: 8 kW × 0.25 h × 0.95 eff = 1.9 kWh/slot = 9.5 p.b. na 20 kWh
soc_mid = results[3].battery_soc_target
self.assertGreaterEqual(
soc_mid, 20.0 + 4 * 9.0,
"frontload: prvni 4 sloty maji nabijet plnym vykonem",
)
class EvDeadlineTests(unittest.TestCase):
def test_ev_energy_delivered_before_deadline(self) -> None:
slots = [_slot(_BASE, i, buy=2.0 if i < 8 else 6.0, sell=1.0, ev1=True) for i in range(16)]
session = SimpleNamespace(
target_deadline=_BASE + timedelta(hours=4), # slot 16 → vše do konce
energy_needed_wh=8000.0,
)
results, _, snap = _solve(slots, ev_sessions=(session, None))
delivered = sum((r.ev1_setpoint_w or 0) * 0.25 for r in results)
self.assertGreaterEqual(delivered, 8000.0 - 1.0)
self.assertEqual(snap["objective_terms"]["ev_unmet_wh"], [0.0])
# levné sloty (07) mají dodat většinu energie
cheap = sum((r.ev1_setpoint_w or 0) * 0.25 for r in results[:8])
self.assertGreater(cheap, 4000.0, "EV nabíjí přednostně v levných slotech")
def test_ev_unreachable_deadline_uses_paid_slack(self) -> None:
slots = [_slot(_BASE, i, buy=2.0, sell=1.0, ev1=(i == 0)) for i in range(8)]
session = SimpleNamespace(
target_deadline=_BASE + timedelta(minutes=15),
energy_needed_wh=50_000.0, # nesplnitelné za 1 slot
)
results, _, snap = _solve(slots, ev_sessions=(session, None))
self.assertGreater(snap["objective_terms"]["ev_unmet_wh"][0], 0.0, "slack místo infeasible")
if __name__ == "__main__":
unittest.main()