Files
ems/backend/tests/test_solver_v2.py
Dusan Vojacek 85dff7f13e
All checks were successful
CI and deploy / migration-check (push) Successful in 44s
CI and deploy / deploy (push) Has been skipped
v2: měkký EV cíl — oportunistické nabíjení nad target (+ strop energie)
Uživatel: 'potřebuju do X % (tvrdý), ale klidně dobij na 100 % když je to
skoro zadarmo; při záporných cenách radši do auta než nechat na střeše'.

- V094 asset_vehicle.opportunistic_value_czk_kwh (default 1.0; = hodnota
  ušetřeného BUDOUCÍHO nabíjení — auto neumí zpět, žádný noční prodej)
- R__039 ev_sessions: + headroom_wh ((100−target) % kapacity) + opp value;
  session se nenuluje po dosažení targetu, dokud má headroom
- solver_v2: dekompozice Σ(EV) == needed − unmet + opp, opp ∈ [0, headroom],
  odměna opp×value; zároveň FIX latentního bugu — při buy<0 chyběl strop
  celkové energie do auta (model mohl pumpovat bez limitu)
- 3 testy (neg ceny sají nad target po strop; běžné ceny ne; cap při opp=0);
  eval fixtures beze změny (sessions null)

Víkend (pátek nízký tvrdý cíl + víkendová negativa → samo doplní do 100 %)
vyplývá z mechanismu, žádná speciální logika.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 12:17:59 +02:00

312 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""solver_v2 (čisté jádro): tvrdá pravidla, režimy, EV deadline, arbitráž (bez DB)."""
from __future__ import annotations
import unittest
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from services.planning.solver_v2 import solve_dispatch_v2
from services.planning.types import PlanningSlot
def _slot(
    base: datetime,
    i: int,
    *,
    buy: float,
    sell: float,
    pv_a: int = 0,
    pv_b: int = 0,
    load: int = 1000,
    ev1: bool = False,
) -> PlanningSlot:
    """Build the ``i``-th 15-minute planning slot counted from ``base``.

    Args:
        base: Start of slot 0.
        i: Slot index; the slot starts ``15 * i`` minutes after ``base``.
        buy: Value stored as ``buy_price``.
        sell: Value stored as ``sell_price``.
        pv_a: Value stored as ``pv_a_forecast_w``.
        pv_b: Value stored as ``pv_b_forecast_w``.
        load: Value stored as ``load_baseline_w``.
        ev1: Whether the first EV is marked connected; the second EV is
            always marked disconnected.

    Returns:
        The populated ``PlanningSlot``.
    """
    slot_fields = {
        "interval_start": base + timedelta(minutes=15 * i),
        "buy_price": buy,
        "sell_price": sell,
        "pv_a_forecast_w": pv_a,
        "pv_b_forecast_w": pv_b,
        "load_baseline_w": load,
        "ev1_connected": ev1,
        "ev2_connected": False,
    }
    return PlanningSlot(**slot_fields)
def _battery(uc_wh: float = 20_000.0) -> SimpleNamespace:
    """Return a battery stub whose SoC thresholds scale with ``uc_wh``.

    Args:
        uc_wh: Value stored as ``usable_capacity_wh``; the four SoC
            thresholds are fixed fractions of it.

    Returns:
        A ``SimpleNamespace`` carrying the battery attributes set below.
    """
    bat = SimpleNamespace(usable_capacity_wh=uc_wh)

    # State-of-charge thresholds, each a fixed fraction of usable capacity.
    bat.min_soc_wh = 0.12 * uc_wh
    bat.arb_floor_wh = 0.20 * uc_wh
    bat.reserve_soc_wh = 0.20 * uc_wh
    bat.soc_max_wh = 0.95 * uc_wh

    # Efficiencies and degradation cost.
    bat.charge_efficiency = 0.95
    bat.discharge_efficiency = 0.95
    bat.degradation_cost_czk_kwh = 0.5

    # Power limits.
    bat.max_charge_power_w = 8000
    bat.max_discharge_power_w = 8000

    bat.planner_terminal_soc_value_factor = 0.8
    return bat
def _grid(block_neg: bool = False, gen_cutoff: bool = False) -> SimpleNamespace:
    """Return a grid-connection stub with fixed import/export power limits.

    Args:
        block_neg: Value stored as ``block_export_on_negative_sell``.
        gen_cutoff: Value stored as
            ``deye_gen_microinverter_cutoff_enabled``.

    Returns:
        A ``SimpleNamespace`` with the two power limits and the two flags.
    """
    power_limits = {"max_import_power_w": 17_000, "max_export_power_w": 13_500}
    switches = {
        "block_export_on_negative_sell": block_neg,
        "deye_gen_microinverter_cutoff_enabled": gen_cutoff,
    }
    return SimpleNamespace(**power_limits, **switches)
# Heat-pump stub shared by every solve; its rated heating power is zero.
_HP = SimpleNamespace(
    rated_heating_power_w=0,
    tuv_min_temp_c=45.0,
    tuv_target_temp_c=55.0,
)

# Two vehicle stubs; the second one has max_charge_power_w == 0.
_VEHICLES = [
    SimpleNamespace(
        max_charge_power_w=power_w,
        battery_capacity_kwh=capacity_kwh,
        default_target_soc_pct=80.0,
    )
    for power_w, capacity_kwh in ((11_000, 60.0), (0, 1.0))
]

# Start of slot 0 for every scenario (timezone-aware, UTC).
_BASE = datetime(2026, 6, 10, tzinfo=timezone.utc)
def _solve(slots, *, battery=None, grid=None, ev_sessions=(None, None), soc0=None, mode="AUTO"):
    """Call ``solve_dispatch_v2`` with this module's default fixtures filled in.

    Args:
        slots: Planning slots passed through unchanged.
        battery: Battery stub; a fresh ``_battery()`` is used when omitted.
        grid: Grid stub; a fresh ``_grid()`` is used when omitted.
        ev_sessions: Per-vehicle sessions; converted to a list for the solver.
        soc0: Initial state of charge; defaults to half of the battery's
            ``usable_capacity_wh`` when ``None``.
        mode: Forwarded as ``operating_mode``.

    Returns:
        Whatever ``solve_dispatch_v2`` returns (the tests unpack three values).
    """
    bat = battery if battery else _battery()
    grid_cfg = grid if grid else _grid()
    if soc0 is None:
        start_soc = 0.5 * bat.usable_capacity_wh
    else:
        start_soc = soc0
    # NOTE(review): the meaning of the positional 50.0 is defined by
    # solve_dispatch_v2's signature, which is not visible here — confirm.
    return solve_dispatch_v2(
        slots,
        bat,
        _HP,
        grid_cfg,
        list(ev_sessions),
        _VEHICLES,
        start_soc,
        50.0,
        operating_mode=mode,
    )
class HardRulesTests(unittest.TestCase):
    """Hard dispatch rules: export blocking, the arbitrage floor and PV curtailment."""

    def test_negative_buy_blocks_export(self) -> None:
        """With a negative buy price no slot may have a negative grid setpoint."""
        horizon = [
            _slot(_BASE, k, buy=-2.0, sell=1.5, pv_a=6000, load=500) for k in range(8)
        ]
        plan, _, _ = _solve(horizon)
        for step in plan:
            self.assertGreaterEqual(step.grid_setpoint_w, 0, "buy<0 → žádný export (pumpa)")

    def test_block_export_on_negative_sell(self) -> None:
        """With the block flag set, a negative sell price must yield no export."""
        horizon = [
            _slot(_BASE, k, buy=2.0, sell=-0.5, pv_a=8000, load=500) for k in range(8)
        ]
        blocking_grid = _grid(block_neg=True)
        plan, _, _ = _solve(horizon, grid=blocking_grid)
        for step in plan:
            self.assertGreaterEqual(step.grid_setpoint_w, 0, "KV1: sell<0 → ge=0")

    def test_negative_sell_prefers_charge_or_curtail_over_paid_export(self) -> None:
        """Without the block flag, the summed export at a negative sell price is zero."""
        horizon = [
            _slot(_BASE, k, buy=2.0, sell=-1.0, pv_a=8000, load=500) for k in range(8)
        ]
        plan, _, _ = _solve(horizon)
        grid_setpoints = (step.grid_setpoint_w for step in plan)
        paid_export = sum(-w for w in grid_setpoints if w < 0)
        self.assertEqual(paid_export, 0, "spot: za export při sell<0 se platí → ekonomika ho vyloučí")

    def test_battery_export_requires_arb_floor(self) -> None:
        """Slots that export while discharging must keep SoC at or above the arb floor."""
        bat = _battery()
        horizon = [_slot(_BASE, k, buy=1.0, sell=8.0, load=500) for k in range(8)]
        plan, _, _ = _solve(horizon, battery=bat, soc0=0.5 * bat.usable_capacity_wh)
        for step in plan:
            exporting_from_battery = step.grid_setpoint_w < 0 and step.battery_setpoint_w < 0
            if not exporting_from_battery:
                continue
            self.assertGreaterEqual(
                # SoC target is a percentage; convert it to Wh before comparing.
                step.battery_soc_target / 100.0 * bat.usable_capacity_wh,
                bat.arb_floor_wh - 1.0,
                "export z baterie nesmí podlézt arb floor",
            )

    def test_curtailment_only_pv_a(self) -> None:
        """PV array A is curtailed, and never by more than its own forecast."""
        # Extremely negative sell price without block_export: array B cannot
        # be curtailed, array A can.
        pv_a_forecast_w = 5000
        horizon = [
            _slot(_BASE, k, buy=2.0, sell=-3.0, pv_a=pv_a_forecast_w, pv_b=4000, load=300)
            for k in range(8)
        ]
        capacity_wh = 2000.0  # small battery so the surplus does not fit
        bat = _battery(uc_wh=capacity_wh)
        plan, _, _ = _solve(horizon, battery=bat, soc0=0.9 * capacity_wh)
        curtailed_somewhere = any(step.pv_a_curtailed_w > 0 for step in plan)
        self.assertTrue(curtailed_somewhere, "A se curtailuje")
        for step in plan:
            self.assertLessEqual(step.pv_a_curtailed_w, pv_a_forecast_w, "curtail max = výroba A")
class ArbitrageTests(unittest.TestCase):
    """Price arbitrage between a cheap block and an expensive block of slots."""

    def test_cheap_night_charge_expensive_evening_discharge(self) -> None:
        """Charge during the 16 cheap slots, discharge during the 16 expensive ones."""
        cheap_count = 16
        horizon = [
            _slot(
                _BASE,
                k,
                buy=1.0 if k < cheap_count else 8.0,
                sell=0.5 if k < cheap_count else 7.0,
                load=1000,
            )
            for k in range(2 * cheap_count)
        ]
        plan, _, _ = _solve(horizon)
        cheap_part = plan[:cheap_count]
        expensive_part = plan[cheap_count:]
        # A positive battery setpoint is counted as charging, a negative one as discharging.
        charged = sum(s.battery_setpoint_w for s in cheap_part if s.battery_setpoint_w > 0)
        discharged = sum(-s.battery_setpoint_w for s in expensive_part if s.battery_setpoint_w < 0)
        self.assertGreater(charged, 0, "levná noc → nabíjet")
        self.assertGreater(discharged, 0, "drahý večer → vybíjet")
class OperatingModeTests(unittest.TestCase):
    """Constraints imposed by the non-default operating modes."""

    def _slots(self):
        """Return eight identical slots with PV output above the baseline load."""
        horizon = []
        for k in range(8):
            horizon.append(_slot(_BASE, k, buy=1.0, sell=6.0, pv_a=3000, load=1000))
        return horizon

    def _plan_for(self, mode):
        """Solve the shared scenario in ``mode`` and return the per-slot results."""
        plan, _, _ = _solve(self._slots(), mode=mode)
        return plan

    def test_preserve_locks_battery(self) -> None:
        """PRESERVE keeps the battery setpoint at zero in every slot."""
        for step in self._plan_for("PRESERVE"):
            self.assertEqual(step.battery_setpoint_w, 0)

    def test_charge_cheap_no_export_no_discharge(self) -> None:
        """CHARGE_CHEAP yields no negative grid or battery setpoints."""
        for step in self._plan_for("CHARGE_CHEAP"):
            self.assertGreaterEqual(step.grid_setpoint_w, 0)
            self.assertGreaterEqual(step.battery_setpoint_w, 0)

    def test_self_sustain_import_capped_to_load(self) -> None:
        """SELF_SUSTAIN never imports more than the 1000 W baseline load."""
        for step in self._plan_for("SELF_SUSTAIN"):
            self.assertLessEqual(step.grid_setpoint_w, 1000, "import ≤ baseline load")
class NightReserveTests(unittest.TestCase):
    """Behaviour of the night baseload buffer kept above the minimum SoC."""

    @staticmethod
    def _buffered_slots(count, buffer_wh, *, buy, sell, load):
        """Build ``count`` PV-less slots, each tagged with ``night_baseload_buffer_wh``."""
        horizon = []
        for k in range(count):
            slot = _slot(_BASE, k, buy=buy, sell=sell, load=load)
            slot.night_baseload_buffer_wh = buffer_wh
            horizon.append(slot)
        return horizon

    def test_night_discharge_respects_buffer(self) -> None:
        """SoC stays at or above min + buffer (0.6 pp tolerance) when sell < buy."""
        # Night: high sell price, no PV; with a 2000 Wh buffer above the
        # minimum the plan must not count on discharging below min + buffer
        # (sell < buy, so paying for slack is not worth it).
        bat = _battery()
        buffer_wh = 2000.0
        horizon = self._buffered_slots(16, buffer_wh, buy=6.0, sell=4.5, load=800)
        plan, _, _ = _solve(horizon, battery=bat, soc0=0.6 * bat.usable_capacity_wh)
        floor_pct = (bat.min_soc_wh + buffer_wh) / bat.usable_capacity_wh * 100.0
        for step in plan:
            self.assertGreaterEqual(step.battery_soc_target, floor_pct - 0.6)

    def test_extreme_sell_spike_may_sell_reserve(self) -> None:
        """A sell price far above buy lets the plan dip below min + buffer."""
        # Sell price well above buy: selling the cushion is rational (paid slack).
        bat = _battery()
        buffer_wh = 2000.0
        horizon = self._buffered_slots(16, buffer_wh, buy=2.0, sell=12.0, load=300)
        plan, _, _ = _solve(horizon, battery=bat, soc0=0.6 * bat.usable_capacity_wh)
        min_soc_pct = min(step.battery_soc_target for step in plan)
        floor_pct = (bat.min_soc_wh + buffer_wh) / bat.usable_capacity_wh * 100.0
        self.assertLess(min_soc_pct, floor_pct - 1.0, "spike má polštář vyprodat")

    def test_start_below_buffer_is_feasible(self) -> None:
        """Starting below min + buffer still produces a result for every slot."""
        bat = _battery()
        slot_count = 8
        horizon = self._buffered_slots(slot_count, 3000.0, buy=6.0, sell=1.0, load=1500)
        plan, _, _ = _solve(horizon, battery=bat, soc0=bat.min_soc_wh + 500.0)
        self.assertEqual(len(plan), slot_count)
class DaytimeSafetyRampTests(unittest.TestCase):
    """Daytime safety-SoC ramp: refill the reserve first unless prices say otherwise."""

    @staticmethod
    def _ramp_slots(target_wh, *, buy, sell, pv_a, load):
        """Build 16 slots, each tagged with ``safety_soc_target_wh``."""
        horizon = []
        for k in range(16):
            slot = _slot(_BASE, k, buy=buy, sell=sell, pv_a=pv_a, load=load)
            slot.safety_soc_target_wh = target_wh
            horizon.append(slot)
        return horizon

    def test_morning_tops_up_reserve_before_selling(self) -> None:
        """The reserve is reached, with little export before that point."""
        # KV1 scenario: battery near the bottom in the morning, fixed buy
        # 6.35 >> sell 2.5, PV running; with the ramp (target 30 % of usable
        # capacity) the plan must refill the reserve first rather than sell.
        bat = _battery()
        bat.planner_safety_soc_risk_factor = 0.05
        horizon = self._ramp_slots(
            0.30 * bat.usable_capacity_wh, buy=6.35, sell=2.5, pv_a=6000, load=800
        )
        plan, _, _ = _solve(horizon, battery=bat, soc0=0.11 * bat.usable_capacity_wh)

        # Index of the first slot whose SoC target is at least 29.5 %.
        first_reach = None
        for idx, step in enumerate(plan):
            if step.battery_soc_target >= 29.5:
                first_reach = idx
                break
        self.assertIsNotNone(first_reach, "rampa má dotáhnout na rezervu")

        before_reach = plan[:first_reach]
        exported_before = sum(-s.grid_setpoint_w for s in before_reach if s.grid_setpoint_w < 0)
        export_allowance = 500 * max(1, first_reach)
        self.assertLess(
            exported_before,
            export_allowance,
            "před dosažením rezervy se nemá významně prodávat",
        )

    def test_sell_spike_beats_ramp(self) -> None:
        """An extreme sell price makes the plan export despite the ramp target."""
        # Extreme sell price above buy: accepting the deficit is rational.
        bat = _battery()
        bat.planner_safety_soc_risk_factor = 0.05
        horizon = self._ramp_slots(
            0.5 * bat.usable_capacity_wh, buy=2.0, sell=14.0, pv_a=2000, load=300
        )
        plan, _, _ = _solve(horizon, battery=bat, soc0=0.45 * bat.usable_capacity_wh)
        total_export = sum(-s.grid_setpoint_w for s in plan if s.grid_setpoint_w < 0)
        self.assertGreater(total_export, 5000, "spike má vyprodat i pod target")
class PvRiskFrontloadTests(unittest.TestCase):
    """Front-loading of battery charging inside a negative-sell-price window."""

    def test_neg_window_charges_asap(self) -> None:
        """Charging must run near full rate over the first four slots."""
        # sell < 0 window, PV >> load, fairly empty battery: with the
        # frontload premium, charging has to run at full rate from the start
        # (not be deferred to the end of the window).
        bat = _battery()
        bat.planner_pv_risk_frontload_czk_kwh = 0.05
        horizon = [
            _slot(_BASE, k, buy=2.0, sell=-0.5, pv_a=12000, load=500) for k in range(12)
        ]
        plan, _, _ = _solve(horizon, battery=bat, soc0=0.2 * bat.usable_capacity_wh)
        # Max rate: 8 kW × 0.25 h × 0.95 eff = 1.9 kWh/slot = 9.5 pp of 20 kWh;
        # the threshold asks for 9.0 pp per slot on top of the 20 % start.
        required_pct = 20.0 + 4 * 9.0
        soc_after_four = plan[3].battery_soc_target
        self.assertGreaterEqual(
            soc_after_four,
            required_pct,
            "frontload: prvni 4 sloty maji nabijet plnym vykonem",
        )
class EvOpportunisticTests(unittest.TestCase):
    """Soft EV target: opportunistic charging above the hard target, with a cap."""

    def _session(self, needed=4000.0, headroom=20000.0, opp=1.0):
        """Return an EV session stub whose deadline is two hours after ``_BASE``."""
        session = SimpleNamespace(target_deadline=_BASE + timedelta(hours=2))
        session.energy_needed_wh = needed
        session.headroom_wh = headroom
        session.opportunistic_value_czk_kwh = opp
        return session

    @staticmethod
    def _delivered_wh(plan):
        """Sum EV1 energy over 15-minute slots; a falsy setpoint counts as zero."""
        return sum((step.ev1_setpoint_w or 0) * 0.25 for step in plan)

    def test_negative_prices_fill_beyond_target(self) -> None:
        """Negative prices pull energy beyond the hard target, up to the headroom."""
        # buy < 0 for the whole window: charging above the target pays off
        # (value 1 CZK/kWh plus the grid pays for the import).
        horizon = [
            _slot(_BASE, k, buy=-1.0, sell=-0.5, ev1=True, load=300) for k in range(16)
        ]
        plan, _, snap = _solve(horizon, ev_sessions=(self._session(), None))
        delivered = self._delivered_wh(plan)
        self.assertGreater(delivered, 4000.0 + 2000.0, "měkký cíl má nasávat")
        self.assertLessEqual(delivered, 4000.0 + 20000.0 + 1.0, "strop headroom")
        self.assertGreater(snap["objective_terms"]["ev_opp_wh"][0], 0)

    def test_normal_prices_no_opportunistic(self) -> None:
        """At ordinary prices only the hard target is delivered."""
        # Ordinary prices (buy 3) exceed the 1 CZK/kWh value: hard target only.
        horizon = [
            _slot(_BASE, k, buy=3.0, sell=2.0, ev1=True, load=300) for k in range(16)
        ]
        plan, _, snap = _solve(horizon, ev_sessions=(self._session(), None))
        delivered = self._delivered_wh(plan)
        self.assertLess(delivered, 4000.0 + 200.0)
        self.assertLess(snap["objective_terms"]["ev_opp_wh"][0], 100.0)

    def test_total_energy_capped_even_at_negative_buy(self) -> None:
        """With zero headroom, a negative buy price must not exceed the needed energy."""
        # Regression for a latent bug: without headroom (opp=0) a negative
        # buy price must not pump energy beyond ``needed``.
        horizon = [
            _slot(_BASE, k, buy=-2.0, sell=-1.0, ev1=True, load=300) for k in range(16)
        ]
        capped_session = self._session(needed=3000.0, headroom=0.0, opp=0.0)
        plan, _, _ = _solve(horizon, ev_sessions=(capped_session, None))
        delivered = self._delivered_wh(plan)
        self.assertLessEqual(delivered, 3000.0 + 1.0)
class EvDeadlineTests(unittest.TestCase):
    """EV hard target: energy delivered by the deadline, or reported as unmet."""

    @staticmethod
    def _delivered_wh(plan):
        """Sum EV1 energy over 15-minute slots; a falsy setpoint counts as zero."""
        return sum((step.ev1_setpoint_w or 0) * 0.25 for step in plan)

    def test_ev_energy_delivered_before_deadline(self) -> None:
        """The needed energy arrives in full, mostly during the cheap slots."""
        cheap_count = 8
        slots = [
            _slot(_BASE, i, buy=2.0 if i < cheap_count else 6.0, sell=1.0, ev1=True)
            for i in range(16)
        ]
        session = SimpleNamespace(
            target_deadline=_BASE + timedelta(hours=4),  # slot 16 → whole horizon counts
            energy_needed_wh=8000.0,
        )
        results, _, snap = _solve(slots, ev_sessions=(session, None))
        delivered = self._delivered_wh(results)
        self.assertGreaterEqual(delivered, 8000.0 - 1.0)
        self.assertEqual(snap["objective_terms"]["ev_unmet_wh"], [0.0])
        # The cheap slots (0–7) should deliver most of the energy.
        cheap = self._delivered_wh(results[:cheap_count])
        self.assertGreater(cheap, 4000.0, "EV nabíjí přednostně v levných slotech")

    def test_ev_unreachable_deadline_uses_paid_slack(self) -> None:
        """An unreachable target shows up as unmet energy in the snapshot."""
        slots = [_slot(_BASE, i, buy=2.0, sell=1.0, ev1=(i == 0)) for i in range(8)]
        session = SimpleNamespace(
            target_deadline=_BASE + timedelta(minutes=15),
            energy_needed_wh=50_000.0,  # cannot be met within a single slot
        )
        # Only the snapshot is inspected; the per-slot results are not needed.
        _, _, snap = _solve(slots, ev_sessions=(session, None))
        self.assertGreater(snap["objective_terms"]["ev_unmet_wh"][0], 0.0, "slack místo infeasible")
# Run the whole suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()