KV1 pozorování uživatele: ráno baterie na 11 % (min 10), prodává se do sítě — nenadálý odběr/mrak by se kupoval za fixních 6.35. v1 mělo denní rampu (safety_soc_target_wh z R__063: reserve 30 % ráno → reserve+noc večer, 6-19 h, flag planner_daytime_charge_target_enabled) — v2 ji ignoroval. Mechanismus (vzor nočního polštáře): deficit pod rampou platí za KAŽDÝ slot nájem buy×faktor (V091 asset_battery.planner_safety_soc_risk_factor, default 0.05; 0=vypnuto) → ráno se nejdřív doplní rezerva (4 h deficitu 1 kWh při buy 6.35 ≈ 5.1 Kč > sell ~2.5), extrémní sell špička smí deficit racionálně podstoupit. R__039 + db_io + 2 testy (KV1 scénář, spike). Eval fixtures beze změny (sloupec v context_json fixtures není → 0); živá produkce dostane faktor přes fn_planning_site_context. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
277 lines
12 KiB
Python
277 lines
12 KiB
Python
"""solver_v2 (čisté jádro): tvrdá pravidla, režimy, EV deadline, arbitráž (bez DB)."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import unittest
|
||
from datetime import datetime, timedelta, timezone
|
||
from types import SimpleNamespace
|
||
|
||
from services.planning.solver_v2 import solve_dispatch_v2
|
||
from services.planning.types import PlanningSlot
|
||
|
||
|
||
def _slot(
    base: datetime,
    i: int,
    *,
    buy: float,
    sell: float,
    pv_a: int = 0,
    pv_b: int = 0,
    load: int = 1000,
    ev1: bool = False,
) -> PlanningSlot:
    """Build the ``i``-th 15-minute ``PlanningSlot`` counted from ``base``.

    Args:
        base: Start of slot 0.
        i: Slot index; the slot starts ``15 * i`` minutes after ``base``.
        buy: Passed through as ``buy_price``.
        sell: Passed through as ``sell_price``.
        pv_a: Passed through as ``pv_a_forecast_w``.
        pv_b: Passed through as ``pv_b_forecast_w``.
        load: Passed through as ``load_baseline_w``.
        ev1: Passed through as ``ev1_connected``.

    Returns:
        The constructed ``PlanningSlot``; ``ev2_connected`` is always
        ``False``.
    """
    return PlanningSlot(
        interval_start=base + timedelta(minutes=15 * i),
        buy_price=buy,
        sell_price=sell,
        pv_a_forecast_w=pv_a,
        pv_b_forecast_w=pv_b,
        load_baseline_w=load,
        ev1_connected=ev1,
        ev2_connected=False,
    )
|
||
|
||
|
||
def _battery(uc_wh: float = 20_000.0) -> SimpleNamespace:
    """Return a fresh battery-configuration stub for the solver.

    Args:
        uc_wh: Usable capacity; the SoC thresholds below are derived from it
            as fixed fractions.

    Returns:
        A new ``SimpleNamespace`` on every call, so a test may attach extra
        attributes to it without affecting other tests.
    """
    return SimpleNamespace(
        usable_capacity_wh=uc_wh,
        # SoC thresholds as fractions of the usable capacity.
        min_soc_wh=0.12 * uc_wh,
        arb_floor_wh=0.20 * uc_wh,
        reserve_soc_wh=0.20 * uc_wh,
        soc_max_wh=0.95 * uc_wh,
        charge_efficiency=0.95,
        discharge_efficiency=0.95,
        degradation_cost_czk_kwh=0.5,
        max_charge_power_w=8000,
        max_discharge_power_w=8000,
        planner_terminal_soc_value_factor=0.8,
    )
|
||
|
||
|
||
def _grid(block_neg: bool = False, gen_cutoff: bool = False) -> SimpleNamespace:
    """Return a fresh grid-connection configuration stub for the solver.

    Args:
        block_neg: Stored as ``block_export_on_negative_sell``.
        gen_cutoff: Stored as ``deye_gen_microinverter_cutoff_enabled``.

    Returns:
        A new ``SimpleNamespace`` with fixed import/export power limits and
        the two flags above.
    """
    return SimpleNamespace(
        max_import_power_w=17_000,
        max_export_power_w=13_500,
        block_export_on_negative_sell=block_neg,
        deye_gen_microinverter_cutoff_enabled=gen_cutoff,
    )
|
||
|
||
|
||
# Heat-pump stub shared by every solve; rated heating power is zero.
_HP = SimpleNamespace(rated_heating_power_w=0, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0)

# Two vehicle stubs, index-aligned with the two EV session entries passed to
# the solver. The second one has zero charge power.
# NOTE(review): presumably a placeholder for an unused second charger - confirm.
_VEHICLES = [
    SimpleNamespace(max_charge_power_w=11_000, battery_capacity_kwh=60.0, default_target_soc_pct=80.0),
    SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
]

# Timezone-aware (UTC) start of slot 0 for every scenario in this module.
_BASE = datetime(2026, 6, 10, 0, 0, tzinfo=timezone.utc)
|
||
|
||
|
||
def _solve(slots, *, battery=None, grid=None, ev_sessions=(None, None), soc0=None, mode="AUTO"):
    """Run ``solve_dispatch_v2`` with this module's default fixtures.

    Args:
        slots: Planning slots forming the horizon.
        battery: Battery stub; a fresh ``_battery()`` when omitted.
        grid: Grid stub; a fresh ``_grid()`` when omitted.
        ev_sessions: Iterable of EV session objects (or ``None``); converted
            to a list before being handed to the solver.
        soc0: Initial battery energy; defaults to 50 % of the battery's
            ``usable_capacity_wh``.
        mode: Forwarded as ``operating_mode``.

    Returns:
        Whatever ``solve_dispatch_v2`` returns; the tests in this module
        unpack it as a 3-tuple whose first item is the per-slot results and
        whose last item is a snapshot mapping.
    """
    bat = battery or _battery()
    return solve_dispatch_v2(
        slots,
        bat,
        _HP,
        grid or _grid(),
        list(ev_sessions),
        _VEHICLES,
        soc0 if soc0 is not None else 0.5 * bat.usable_capacity_wh,
        # NOTE(review): positional constant; presumably an initial
        # temperature for the heat-pump model - confirm against the
        # solve_dispatch_v2 signature.
        50.0,
        operating_mode=mode,
    )
|
||
|
||
|
||
class HardRulesTests(unittest.TestCase):
    """Hard dispatch rules: export blocking, arbitrage floor, PV curtailment."""

    def test_negative_buy_blocks_export(self) -> None:
        # Negative buy price with PV surplus: no slot may have a negative
        # (exporting) grid setpoint.
        slots = [_slot(_BASE, i, buy=-2.0, sell=1.5, pv_a=6000, load=500) for i in range(8)]
        results, _, _ = _solve(slots)
        for r in results:
            self.assertGreaterEqual(r.grid_setpoint_w, 0, "buy<0 → žádný export (pumpa)")

    def test_block_export_on_negative_sell(self) -> None:
        # With block_export_on_negative_sell enabled, a negative sell price
        # must keep every grid setpoint non-negative.
        slots = [_slot(_BASE, i, buy=2.0, sell=-0.5, pv_a=8000, load=500) for i in range(8)]
        results, _, _ = _solve(slots, grid=_grid(block_neg=True))
        for r in results:
            self.assertGreaterEqual(r.grid_setpoint_w, 0, "KV1: sell<0 → ge=0")

    def test_negative_sell_prefers_charge_or_curtail_over_paid_export(self) -> None:
        # Same situation without the hard block: the total of all export
        # setpoints must still be zero.
        slots = [_slot(_BASE, i, buy=2.0, sell=-1.0, pv_a=8000, load=500) for i in range(8)]
        results, _, _ = _solve(slots)
        paid_export = sum(-r.grid_setpoint_w for r in results if r.grid_setpoint_w < 0)
        self.assertEqual(paid_export, 0, "spot: za export při sell<0 se platí → ekonomika ho vyloučí")

    def test_battery_export_requires_arb_floor(self) -> None:
        bat = _battery()
        slots = [_slot(_BASE, i, buy=1.0, sell=8.0, load=500) for i in range(8)]
        results, _, _ = _solve(slots, battery=bat, soc0=0.5 * bat.usable_capacity_wh)
        for r in results:
            # Only slots that export while the battery discharges are checked;
            # the SoC target (percent) is converted back to Wh, with 1 Wh of
            # tolerance.
            if r.grid_setpoint_w < 0 and r.battery_setpoint_w < 0:
                self.assertGreaterEqual(
                    r.battery_soc_target / 100.0 * bat.usable_capacity_wh,
                    bat.arb_floor_wh - 1.0,
                    "export z baterie nesmí podlézt arb floor",
                )

    def test_curtailment_only_pv_a(self) -> None:
        # Extremely negative sell price without block_export: array B cannot
        # be curtailed, array A can.
        slots = [_slot(_BASE, i, buy=2.0, sell=-3.0, pv_a=5000, pv_b=4000, load=300) for i in range(8)]
        bat = _battery(uc_wh=2000.0)  # small battery so the surplus does not fit
        results, _, _ = _solve(slots, battery=bat, soc0=0.9 * bat.usable_capacity_wh)
        self.assertTrue(any(r.pv_a_curtailed_w > 0 for r in results), "A se curtailuje")
        for r in results:
            # Curtailment can never exceed the 5000 W forecast of array A.
            self.assertLessEqual(r.pv_a_curtailed_w, 5000, "curtail max = výroba A")
|
||
|
||
|
||
class ArbitrageTests(unittest.TestCase):
    """Price arbitrage between a cheap and an expensive block of slots."""

    def test_cheap_night_charge_expensive_evening_discharge(self) -> None:
        # 16 cheap slots followed by 16 expensive ones, constant load, no PV.
        slots = [_slot(_BASE, i, buy=1.0, sell=0.5, load=1000) for i in range(16)]
        slots += [_slot(_BASE, 16 + i, buy=8.0, sell=7.0, load=1000) for i in range(16)]
        results, _, _ = _solve(slots)
        # Positive battery setpoints are counted as charging, negative ones
        # as discharging.
        charged = sum(r.battery_setpoint_w for r in results[:16] if r.battery_setpoint_w > 0)
        discharged = sum(-r.battery_setpoint_w for r in results[16:] if r.battery_setpoint_w < 0)
        self.assertGreater(charged, 0, "levná noc → nabíjet")
        self.assertGreater(discharged, 0, "drahý večer → vybíjet")
|
||
|
||
|
||
class OperatingModeTests(unittest.TestCase):
    """Constraints imposed by the non-default ``operating_mode`` values."""

    def _slots(self):
        # Shared 8-slot scenario: cheap buy, high sell, PV above load.
        return [_slot(_BASE, i, buy=1.0, sell=6.0, pv_a=3000, load=1000) for i in range(8)]

    def test_preserve_locks_battery(self) -> None:
        # PRESERVE: the battery setpoint must be exactly zero in every slot.
        results, _, _ = _solve(self._slots(), mode="PRESERVE")
        for r in results:
            self.assertEqual(r.battery_setpoint_w, 0)

    def test_charge_cheap_no_export_no_discharge(self) -> None:
        # CHARGE_CHEAP: neither the grid nor the battery setpoint may go
        # negative.
        results, _, _ = _solve(self._slots(), mode="CHARGE_CHEAP")
        for r in results:
            self.assertGreaterEqual(r.grid_setpoint_w, 0)
            self.assertGreaterEqual(r.battery_setpoint_w, 0)

    def test_self_sustain_import_capped_to_load(self) -> None:
        # SELF_SUSTAIN: the grid setpoint may not exceed the 1000 W baseline
        # load used by the shared scenario.
        results, _, _ = _solve(self._slots(), mode="SELF_SUSTAIN")
        for r in results:
            self.assertLessEqual(r.grid_setpoint_w, 1000, "import ≤ baseline load")
|
||
|
||
|
||
class NightReserveTests(unittest.TestCase):
    """Night base-load buffer set per slot via ``night_baseload_buffer_wh``."""

    def test_night_discharge_respects_buffer(self) -> None:
        # Night: high sell price, no PV. With a 2000 Wh buffer above min the
        # plan must not count on discharging below min + buffer
        # (sell < buy => paying for the slack is not worth it).
        bat = _battery()
        slots = []
        for i in range(16):
            s = _slot(_BASE, i, buy=6.0, sell=4.5, load=800)
            s.night_baseload_buffer_wh = 2000.0
            slots.append(s)
        results, _, _ = _solve(slots, battery=bat, soc0=0.6 * bat.usable_capacity_wh)
        floor_pct = (bat.min_soc_wh + 2000.0) / bat.usable_capacity_wh * 100.0
        for r in results:
            # 0.6 percentage points of tolerance on the SoC target.
            self.assertGreaterEqual(r.battery_soc_target, floor_pct - 0.6)

    def test_extreme_sell_spike_may_sell_reserve(self) -> None:
        # Sell well above buy => selling the cushion is rational (paid slack).
        bat = _battery()
        slots = []
        for i in range(16):
            s = _slot(_BASE, i, buy=2.0, sell=12.0, load=300)
            s.night_baseload_buffer_wh = 2000.0
            slots.append(s)
        results, _, _ = _solve(slots, battery=bat, soc0=0.6 * bat.usable_capacity_wh)
        min_soc_pct = min(r.battery_soc_target for r in results)
        floor_pct = (bat.min_soc_wh + 2000.0) / bat.usable_capacity_wh * 100.0
        self.assertLess(min_soc_pct, floor_pct - 1.0, "spike má polštář vyprodat")

    def test_start_below_buffer_is_feasible(self) -> None:
        # Starting 500 Wh above min, i.e. below min + 3000 Wh buffer, must
        # still yield one result per slot.
        bat = _battery()
        slots = []
        for i in range(8):
            s = _slot(_BASE, i, buy=6.0, sell=1.0, load=1500)
            s.night_baseload_buffer_wh = 3000.0
            slots.append(s)
        results, _, _ = _solve(slots, battery=bat, soc0=bat.min_soc_wh + 500.0)
        self.assertEqual(len(results), 8)
|
||
|
||
|
||
class DaytimeSafetyRampTests(unittest.TestCase):
    """Daytime safety SoC ramp set per slot via ``safety_soc_target_wh``."""

    def test_morning_tops_up_reserve_before_selling(self) -> None:
        # KV1 scenario: in the morning the battery is near the bottom, fixed
        # buy 6.35 >> sell 2.5, PV is producing; with the ramp (target 30 % of
        # usable capacity) the reserve must be topped up first, not sold.
        bat = _battery()
        bat.planner_safety_soc_risk_factor = 0.05
        target_wh = 0.30 * bat.usable_capacity_wh
        slots = []
        for i in range(16):
            s = _slot(_BASE, i, buy=6.35, sell=2.5, pv_a=6000, load=800)
            s.safety_soc_target_wh = target_wh
            slots.append(s)
        results, _, _ = _solve(slots, battery=bat, soc0=0.11 * bat.usable_capacity_wh)
        soc_pct = [r.battery_soc_target for r in results]
        # Index of the first slot whose SoC target is within 0.5 percentage
        # points of the 30 % ramp target.
        first_reach = next((i for i, v in enumerate(soc_pct) if v >= 29.5), None)
        self.assertIsNotNone(first_reach, "rampa má dotáhnout na rezervu")
        # Sum of the export setpoints over the slots before the target is
        # reached; the bound below allows less than 500 per slot on average.
        exported_before = sum(
            -r.grid_setpoint_w for r in results[:first_reach] if r.grid_setpoint_w < 0
        )
        self.assertLess(
            exported_before, 500 * max(1, first_reach),
            "před dosažením rezervy se nemá významně prodávat",
        )

    def test_sell_spike_beats_ramp(self) -> None:
        # Extreme sell price above buy => accepting the deficit is rational.
        bat = _battery()
        bat.planner_safety_soc_risk_factor = 0.05
        slots = []
        for i in range(16):
            s = _slot(_BASE, i, buy=2.0, sell=14.0, pv_a=2000, load=300)
            s.safety_soc_target_wh = 0.5 * bat.usable_capacity_wh
            slots.append(s)
        # Start at 45 %, i.e. already below the 50 % ramp target.
        results, _, _ = _solve(slots, battery=bat, soc0=0.45 * bat.usable_capacity_wh)
        total_export = sum(-r.grid_setpoint_w for r in results if r.grid_setpoint_w < 0)
        self.assertGreater(total_export, 5000, "spike má vyprodat i pod target")
|
||
|
||
|
||
class PvRiskFrontloadTests(unittest.TestCase):
    """Front-loading premium set via ``planner_pv_risk_frontload_czk_kwh``."""

    def test_neg_window_charges_asap(self) -> None:
        # sell < 0 window, PV >> load, fairly empty battery: with the
        # front-load premium, charging must run at full rate from the start
        # (not be postponed to the end of the window).
        bat = _battery()
        bat.planner_pv_risk_frontload_czk_kwh = 0.05
        slots = [_slot(_BASE, i, buy=2.0, sell=-0.5, pv_a=12000, load=500) for i in range(12)]
        results, _, _ = _solve(slots, battery=bat, soc0=0.2 * bat.usable_capacity_wh)
        # Max rate: 8 kW x 0.25 h x 0.95 eff = 1.9 kWh/slot = 9.5 percentage
        # points on 20 kWh; the threshold uses 9.0 per slot to leave margin.
        soc_mid = results[3].battery_soc_target
        self.assertGreaterEqual(
            soc_mid, 20.0 + 4 * 9.0,
            "frontload: prvni 4 sloty maji nabijet plnym vykonem",
        )
|
||
|
||
|
||
class EvDeadlineTests(unittest.TestCase):
    """EV charging deadlines: energy delivered on time, or reported as unmet."""

    def test_ev_energy_delivered_before_deadline(self) -> None:
        # EV 1 is connected for all 16 slots; the first 8 are cheap (buy 2.0),
        # the remaining 8 expensive (buy 6.0).
        slots = [_slot(_BASE, i, buy=2.0 if i < 8 else 6.0, sell=1.0, ev1=True) for i in range(16)]
        session = SimpleNamespace(
            target_deadline=_BASE + timedelta(hours=4),  # slot 16 -> everything until the end
            energy_needed_wh=8000.0,
        )
        results, _, snap = _solve(slots, ev_sessions=(session, None))
        # Setpoint x 0.25 (15-minute slots) gives the energy per slot.
        delivered = sum((r.ev1_setpoint_w or 0) * 0.25 for r in results)
        self.assertGreaterEqual(delivered, 8000.0 - 1.0)
        # Exactly one unmet entry is expected. It is compared with a tiny
        # tolerance rather than exact float equality, so solver round-off
        # cannot fail the test.
        unmet = snap["objective_terms"]["ev_unmet_wh"]
        self.assertEqual(len(unmet), 1)
        self.assertAlmostEqual(unmet[0], 0.0, delta=1e-6)
        # The cheap slots (0-7) should deliver most of the energy.
        cheap = sum((r.ev1_setpoint_w or 0) * 0.25 for r in results[:8])
        self.assertGreater(cheap, 4000.0, "EV nabíjí přednostně v levných slotech")

    def test_ev_unreachable_deadline_uses_paid_slack(self) -> None:
        # EV 1 is connected only in slot 0 and the deadline is the end of
        # that slot.
        slots = [_slot(_BASE, i, buy=2.0, sell=1.0, ev1=(i == 0)) for i in range(8)]
        session = SimpleNamespace(
            target_deadline=_BASE + timedelta(minutes=15),
            energy_needed_wh=50_000.0,  # cannot be delivered within 1 slot
        )
        _, _, snap = _solve(slots, ev_sessions=(session, None))
        self.assertGreater(snap["objective_terms"]["ev_unmet_wh"][0], 0.0, "slack místo infeasible")
|
||
|
||
|
||
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
|