LP first zjednoduseni
This commit is contained in:
@@ -38,6 +38,8 @@ SOLVER_TIME_LIMIT = 10 # sekund
|
||||
# (rezerva z DB). Při relaxaci spodku před extrémně záporným buy je podlaha soc_panel_min[t]
|
||||
# (planner floor), jinak by šlo jen do zátěže a nešlo by „vypustit do sítě“ před levným nákupem.
|
||||
GE_MIN_EXPORT_W = 1.0
|
||||
# Dvouprůchodové solve: stop když acquisition z pass1 vs pass2 se liší méně než (Kč/kWh).
|
||||
ACQUISITION_TWO_PASS_EPS_KWH = 0.05
|
||||
# Dokud je kotva pro hluboký dump (první sell < 0 v horizontu, jinak první extrémní buy) dál než
|
||||
# tento počet 15min slotů, držíme plánovací spodek na rezervě (arb_base_wh) místo planner floor —
|
||||
# priorita: beze „ztráty na prodeji“ (sell >= 0) držet buffer, hluboký vývoz až těsně před záporným prodejem.
|
||||
@@ -579,6 +581,113 @@ def apply_forecast_correction(
|
||||
# LP Solver
|
||||
# ============================================================
|
||||
|
||||
def _recompute_charge_acquisition_from_results(
|
||||
slots: list[PlanningSlot],
|
||||
results: list["DispatchResult"],
|
||||
battery,
|
||||
) -> float:
|
||||
"""Vážený buy z nabíjecích slotů (grid import + bat charge) z prvního solve."""
|
||||
wh_total = 0.0
|
||||
cost = 0.0
|
||||
for s, r in zip(slots, results):
|
||||
if not s.allow_charge:
|
||||
continue
|
||||
gi_w = max(0, int(r.grid_setpoint_w or 0))
|
||||
bc_w = max(0, int(r.battery_setpoint_w or 0))
|
||||
wh = (gi_w + bc_w) * INTERVAL_H
|
||||
if wh <= 0:
|
||||
continue
|
||||
wh_total += wh
|
||||
cost += float(s.buy_price) * wh
|
||||
if wh_total <= 0:
|
||||
raw = getattr(slots[0], "charge_acquisition_buy_czk_kwh", None)
|
||||
if raw is not None:
|
||||
return float(raw)
|
||||
return min(float(s.buy_price) for s in slots)
|
||||
return cost / wh_total
|
||||
|
||||
|
||||
def _slots_with_charge_acquisition(
|
||||
slots: list[PlanningSlot],
|
||||
acquisition_czk_kwh: float,
|
||||
) -> list[PlanningSlot]:
|
||||
return [
|
||||
replace(s, charge_acquisition_buy_czk_kwh=acquisition_czk_kwh)
|
||||
for s in slots
|
||||
]
|
||||
|
||||
|
||||
def solve_dispatch_two_pass(
|
||||
slots: list[PlanningSlot],
|
||||
battery,
|
||||
heat_pump,
|
||||
grid,
|
||||
ev_sessions: list,
|
||||
vehicles: list,
|
||||
current_soc_wh: float,
|
||||
current_tuv_temp_c: float,
|
||||
*,
|
||||
tuv_delta_stats: Optional[dict[tuple[int, int], float]] = None,
|
||||
operating_mode: str = "AUTO",
|
||||
charge_commitment_prev_w: Optional[list[Optional[float]]] = None,
|
||||
planner_version: str | None = None,
|
||||
) -> tuple[list["DispatchResult"], int, dict[str, Any]]:
|
||||
"""
|
||||
Dva průchody solve_dispatch: pass2 používá acquisition z váženého buy nabíjení v pass1.
|
||||
"""
|
||||
results1, ms1, snap1 = solve_dispatch(
|
||||
slots,
|
||||
battery,
|
||||
heat_pump,
|
||||
grid,
|
||||
ev_sessions,
|
||||
vehicles,
|
||||
current_soc_wh,
|
||||
current_tuv_temp_c,
|
||||
tuv_delta_stats=tuv_delta_stats,
|
||||
operating_mode=operating_mode,
|
||||
charge_commitment_prev_w=charge_commitment_prev_w,
|
||||
planner_version=planner_version,
|
||||
)
|
||||
acq1 = float(
|
||||
snap1.get("inputs", {}).get("charge_acquisition_buy_czk_kwh")
|
||||
or getattr(slots[0], "charge_acquisition_buy_czk_kwh", None)
|
||||
or min(float(s.buy_price) for s in slots)
|
||||
)
|
||||
acq2 = _recompute_charge_acquisition_from_results(slots, results1, battery)
|
||||
converged = abs(acq2 - acq1) < ACQUISITION_TWO_PASS_EPS_KWH
|
||||
if converged:
|
||||
if isinstance(snap1.get("inputs"), dict):
|
||||
snap1["inputs"]["acquisition_pass1_czk_kwh"] = round(acq1, 6)
|
||||
snap1["inputs"]["acquisition_pass2_czk_kwh"] = round(acq2, 6)
|
||||
snap1["inputs"]["two_pass_enabled"] = True
|
||||
snap1["inputs"]["two_pass_converged"] = True
|
||||
return results1, ms1, snap1
|
||||
|
||||
slots2 = _slots_with_charge_acquisition(slots, acq2)
|
||||
results2, ms2, snap2 = solve_dispatch(
|
||||
slots2,
|
||||
battery,
|
||||
heat_pump,
|
||||
grid,
|
||||
ev_sessions,
|
||||
vehicles,
|
||||
current_soc_wh,
|
||||
current_tuv_temp_c,
|
||||
tuv_delta_stats=tuv_delta_stats,
|
||||
operating_mode=operating_mode,
|
||||
charge_commitment_prev_w=charge_commitment_prev_w,
|
||||
planner_version=planner_version,
|
||||
)
|
||||
if isinstance(snap2.get("inputs"), dict):
|
||||
snap2["inputs"]["acquisition_pass1_czk_kwh"] = round(acq1, 6)
|
||||
snap2["inputs"]["acquisition_pass2_czk_kwh"] = round(acq2, 6)
|
||||
snap2["inputs"]["two_pass_enabled"] = True
|
||||
snap2["inputs"]["two_pass_converged"] = False
|
||||
snap2["inputs"]["solver_duration_ms_pass1"] = ms1
|
||||
return results2, ms1 + ms2, snap2
|
||||
|
||||
|
||||
def solve_dispatch(
|
||||
slots: list[PlanningSlot],
|
||||
battery,
|
||||
@@ -823,14 +932,15 @@ def solve_dispatch(
|
||||
commit_lp.append((t, cv, cap_prev))
|
||||
|
||||
# --- Účelová funkce (jen OTE sloty; terminal SoC shadow price na konci horizontu) ---
|
||||
# Arbitráž baterie: ge_bat v exportních slotech + charge_acquisition (SQL, před 1. exportem).
|
||||
# Viz docs/04-modules/planning-arbitrage-accounting.md — ne stejnoslotové buy/sell.
|
||||
# Kanály: gi×buy, −ge_pv×sell, −ge_bat×sell, +ge_bat×acquisition (export bat. jen v discharge slotách).
|
||||
# Viz docs/04-modules/planning-arbitrage-accounting.md — mezi-slotová arbitráž, ne sell vs buy v jednom slotu.
|
||||
prob += (
|
||||
pulp.lpSum(
|
||||
gi[t] * slots[t].buy_price * INTERVAL_H / 1000
|
||||
- ge[t] * slots[t].sell_price * INTERVAL_H / 1000
|
||||
- ge_pv[t] * slots[t].sell_price * INTERVAL_H / 1000
|
||||
- ge_bat[t] * slots[t].sell_price * INTERVAL_H / 1000
|
||||
+ (
|
||||
ge[t] * SELF_SUSTAIN_EXPORT_PENALTY_CZK_KWH * INTERVAL_H / 1000
|
||||
ge_pv[t] * SELF_SUSTAIN_EXPORT_PENALTY_CZK_KWH * INTERVAL_H / 1000
|
||||
if om == "SELF_SUSTAIN"
|
||||
else 0
|
||||
)
|
||||
@@ -1095,41 +1205,13 @@ def solve_dispatch(
|
||||
0.0,
|
||||
float(s.pv_a_forecast_w) + float(s.pv_b_forecast_w) - load_t,
|
||||
)
|
||||
# Mezi-slotová FVE arbitráž: export jen když (prodat teď − levný nákup později)
|
||||
# ≥ (večerní špička − acquisition). Jinak drž PV v baterii na peak sell.
|
||||
fso_t = float(
|
||||
s.future_sell_opportunity_czk_kwh
|
||||
if s.future_sell_opportunity_czk_kwh is not None
|
||||
else sell_t
|
||||
)
|
||||
future_chg_buys = [
|
||||
float(slots[ts].buy_price)
|
||||
for ts in range(t + 1, T)
|
||||
if ts in charge_slots
|
||||
]
|
||||
min_future_chg_buy = (
|
||||
min(future_chg_buys)
|
||||
if future_chg_buys
|
||||
else charge_acquisition_czk_kwh
|
||||
)
|
||||
export_refill_net = sell_t - min_future_chg_buy
|
||||
store_peak_net = fso_t - charge_acquisition_czk_kwh
|
||||
cross_slot_pv_export = (
|
||||
t not in charge_slots
|
||||
and pv_surplus_w > 0
|
||||
and future_chg_buys
|
||||
and export_refill_net >= store_peak_net + min_spread
|
||||
)
|
||||
# Ztrátový export FVE (sell ≪ buy): zakázat jen pokud jde energii do baterie.
|
||||
# Výjimky: plná baterie (ventil), neriťitelné pv_b s přebytkem, cross-slot výše.
|
||||
if sell_t < buy_t - min_spread:
|
||||
# FVE export: zakázat jen okamžitě ztrátový výkup vs plánovaná zásoba (ne sell < buy ve slotu).
|
||||
if sell_t < charge_acquisition_czk_kwh - min_spread:
|
||||
block_loss_pv_export = not (
|
||||
float(s.pv_b_forecast_w) > 0 and pv_surplus_w > 0
|
||||
)
|
||||
if t == 0 and current_soc_wh >= float(battery.soc_max_wh) - soc_headroom_wh:
|
||||
block_loss_pv_export = False
|
||||
if cross_slot_pv_export:
|
||||
block_loss_pv_export = False
|
||||
if block_loss_pv_export:
|
||||
prob += ge_pv[t] == 0
|
||||
# Drahý nákup oproti horizontu: import jen na load + EV + TČ, ne na grid-nabíjení.
|
||||
@@ -1411,12 +1493,21 @@ async def run_daily_plan(
|
||||
planner_version_resolved = _planner_engine_version(planner_version)
|
||||
slots = await _load_slots(site_id, horizon_from, horizon_to, db, soc_wh=soc_wh)
|
||||
|
||||
results, duration_ms, solver_snapshot = solve_dispatch(
|
||||
slots, battery, hp, grid, ev_sessions, vehicles, soc_wh, tuv_temp,
|
||||
tuv_delta_stats=tuv_stats,
|
||||
operating_mode=operating_mode or "AUTO",
|
||||
planner_version=planner_version_resolved,
|
||||
)
|
||||
om = operating_mode or "AUTO"
|
||||
if om == "AUTO":
|
||||
results, duration_ms, solver_snapshot = solve_dispatch_two_pass(
|
||||
slots, battery, hp, grid, ev_sessions, vehicles, soc_wh, tuv_temp,
|
||||
tuv_delta_stats=tuv_stats,
|
||||
operating_mode=om,
|
||||
planner_version=planner_version_resolved,
|
||||
)
|
||||
else:
|
||||
results, duration_ms, solver_snapshot = solve_dispatch(
|
||||
slots, battery, hp, grid, ev_sessions, vehicles, soc_wh, tuv_temp,
|
||||
tuv_delta_stats=tuv_stats,
|
||||
operating_mode=om,
|
||||
planner_version=planner_version_resolved,
|
||||
)
|
||||
comparison_ctx = _maybe_add_planner_comparison(
|
||||
slots=slots,
|
||||
battery=battery,
|
||||
@@ -1426,7 +1517,7 @@ async def run_daily_plan(
|
||||
vehicles=vehicles,
|
||||
current_soc_wh=soc_wh,
|
||||
current_tuv_temp_c=tuv_temp,
|
||||
operating_mode=operating_mode or "AUTO",
|
||||
operating_mode=om,
|
||||
tuv_delta_stats=tuv_stats,
|
||||
active_version=planner_version_resolved,
|
||||
)
|
||||
@@ -1600,13 +1691,23 @@ async def run_rolling_replan(
|
||||
|
||||
commitment_prev = await _load_previous_plan_charge_commitment_prev_w(site_id, slots, db)
|
||||
|
||||
results, duration_ms, solver_snapshot = solve_dispatch(
|
||||
slots, battery, hp, grid, ev_sessions, vehicles, soc_wh, tuv_temp,
|
||||
tuv_delta_stats=tuv_stats,
|
||||
operating_mode=operating_mode or "AUTO",
|
||||
charge_commitment_prev_w=commitment_prev,
|
||||
planner_version=planner_version_resolved,
|
||||
)
|
||||
om = operating_mode or "AUTO"
|
||||
if om == "AUTO":
|
||||
results, duration_ms, solver_snapshot = solve_dispatch_two_pass(
|
||||
slots, battery, hp, grid, ev_sessions, vehicles, soc_wh, tuv_temp,
|
||||
tuv_delta_stats=tuv_stats,
|
||||
operating_mode=om,
|
||||
charge_commitment_prev_w=commitment_prev,
|
||||
planner_version=planner_version_resolved,
|
||||
)
|
||||
else:
|
||||
results, duration_ms, solver_snapshot = solve_dispatch(
|
||||
slots, battery, hp, grid, ev_sessions, vehicles, soc_wh, tuv_temp,
|
||||
tuv_delta_stats=tuv_stats,
|
||||
operating_mode=om,
|
||||
charge_commitment_prev_w=commitment_prev,
|
||||
planner_version=planner_version_resolved,
|
||||
)
|
||||
comparison_ctx = _maybe_add_planner_comparison(
|
||||
slots=slots,
|
||||
battery=battery,
|
||||
@@ -1616,7 +1717,7 @@ async def run_rolling_replan(
|
||||
vehicles=vehicles,
|
||||
current_soc_wh=soc_wh,
|
||||
current_tuv_temp_c=tuv_temp,
|
||||
operating_mode=operating_mode or "AUTO",
|
||||
operating_mode=om,
|
||||
tuv_delta_stats=tuv_stats,
|
||||
active_version=planner_version_resolved,
|
||||
charge_commitment_prev_w=commitment_prev,
|
||||
|
||||
@@ -3,8 +3,8 @@
|
||||
Logika je v DB: ems.fn_load_planning_slots_full. Kopie algoritmu pro unit testy bez PG.
|
||||
|
||||
Charge mask:
|
||||
B) Grid ze sítě první: AM/PM 50/50 Wh, buy≤min(buy v pásmu)+band, i s FVE.
|
||||
A) PV-surplus: store_score DESC, doplní zbytek po vrstvě B.
|
||||
B) Grid AM/PM: nejlevnější sloty do Wh rozpočtu (den plánu → před exportním oknem → buy ASC).
|
||||
A) PV-surplus: store_score DESC; jen pokud sell ≥ future_sell − degrad.
|
||||
|
||||
Discharge-export mask:
|
||||
ref_buy = min(buy) celého horizontu.
|
||||
@@ -140,16 +140,6 @@ def _select_charge_slots(
|
||||
else 6
|
||||
)
|
||||
|
||||
def _grid_b_ok(t: int, ref_buy_seg: float) -> bool:
|
||||
s = slots[t]
|
||||
buy = float(s.buy_price)
|
||||
if buy > ref_buy_seg + _BUY_CHARGE_BAND:
|
||||
return False
|
||||
nxt = _buy_min_next_n(slots, t, export_window_start=export_window_start)
|
||||
if nxt is not None and buy > nxt + _BUY_LOOKAHEAD_EPS:
|
||||
return False
|
||||
return True
|
||||
|
||||
def _grid_sort_key(t: int, pred: bool, price: float) -> tuple[int, int, int, float, int]:
|
||||
today_first = 0 if _prague_date(slots[t]) == plan_day else 1
|
||||
before_export = (
|
||||
@@ -164,7 +154,7 @@ def _select_charge_slots(
|
||||
am_candidates = [
|
||||
(t, getattr(slots[t], "is_predicted_price", False), float(slots[t].buy_price))
|
||||
for t in range(len(slots))
|
||||
if _grid_b_ok(t, ref_buy_am) and _prague_hour(slots[t]) < 12
|
||||
if _prague_hour(slots[t]) < 12
|
||||
]
|
||||
am_candidates.sort(key=lambda x: _grid_sort_key(x[0], x[1], x[2]))
|
||||
cum = 0.0
|
||||
@@ -180,7 +170,7 @@ def _select_charge_slots(
|
||||
pm_candidates = [
|
||||
(t, getattr(slots[t], "is_predicted_price", False), float(slots[t].buy_price))
|
||||
for t in range(len(slots))
|
||||
if _grid_b_ok(t, ref_buy_pm) and _prague_hour(slots[t]) >= 12
|
||||
if _prague_hour(slots[t]) >= 12
|
||||
]
|
||||
pm_candidates.sort(key=lambda x: _grid_sort_key(x[0], x[1], x[2]))
|
||||
cum = 0.0
|
||||
@@ -197,7 +187,12 @@ def _select_charge_slots(
|
||||
pv_candidates: list[tuple[int, float, float]] = []
|
||||
for t, s in enumerate(slots):
|
||||
pv_surplus_w = max(0, s.pv_a_forecast_w + s.pv_b_forecast_w - s.load_baseline_w)
|
||||
if pv_surplus_w > 0 and float(s.sell_price) >= float(s.buy_price) - degrad:
|
||||
fso = _future_sell(slots, t)
|
||||
if (
|
||||
pv_surplus_w > 0
|
||||
and float(s.sell_price) >= float(s.buy_price) - degrad
|
||||
and float(s.sell_price) >= fso - degrad
|
||||
):
|
||||
pv_candidates.append((t, _store_score(slots, t), float(pv_surplus_w)))
|
||||
|
||||
pv_candidates.sort(key=lambda x: (-x[1], x[0]))
|
||||
@@ -347,8 +342,8 @@ class SelectChargeSlotsTests(unittest.TestCase):
|
||||
charge_buf=1.3, uc_wh=1_000.0, soc_max_pct=100.0, max_charge_w=6_000.0
|
||||
)
|
||||
out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
|
||||
self.assertIn(2, out, "Slot s lepší marží (nižší buy) má být vybrán")
|
||||
self.assertNotIn(0, out, "Ztrátový sell≪buy slot nemá grid charge z masky A")
|
||||
self.assertIn(2, out, "Nejlevnější buy (grid B) má být vybrán")
|
||||
self.assertNotIn(1, out, "Dražší AM slot (buy 1.5) nemá přednost před levným buy 0.5")
|
||||
|
||||
def test_non_pv_slots_selected_with_am_pm_budget(self) -> None:
|
||||
"""Levný PM slot; AM s dražším buy než min v lookahead může být vynechán."""
|
||||
@@ -428,15 +423,12 @@ class SelectChargeSlotsTests(unittest.TestCase):
|
||||
interval_start=base + timedelta(hours=9),
|
||||
),
|
||||
]
|
||||
battery = _battery(uc_wh=64_000.0)
|
||||
soc = 0.46 * battery.usable_capacity_wh
|
||||
battery = _battery(uc_wh=64_000.0, charge_buf=1.05)
|
||||
soc = 0.88 * battery.usable_capacity_wh
|
||||
out = _select_charge_slots(slots, battery, current_soc_wh=soc)
|
||||
self.assertIn(1, out, "Levnější PM slot (lookahead) má allow_charge i s FVE")
|
||||
self.assertNotIn(
|
||||
2,
|
||||
out,
|
||||
"Drahý odpolední slot nemá být v grid maskě B jen kvůli globálnímu min",
|
||||
)
|
||||
self.assertIn(1, out, "Levnější PM slot má allow_charge i s FVE")
|
||||
self.assertIn(0, out)
|
||||
self.assertLessEqual(len(out), 2, "malý Wh rozpočet → jen nejlevnější PM sloty")
|
||||
|
||||
def test_vt_before_nt_skips_expensive_pm_slot(self) -> None:
|
||||
"""Regrese home-01: 12:45 VT drahý, za 15 min NT levný → PM grid charge ne v 12:45."""
|
||||
@@ -464,11 +456,11 @@ class SelectChargeSlotsTests(unittest.TestCase):
|
||||
interval_start=base + timedelta(minutes=30),
|
||||
),
|
||||
]
|
||||
battery = _battery(uc_wh=64_000.0)
|
||||
soc = 0.31 * battery.usable_capacity_wh
|
||||
battery = _battery(uc_wh=64_000.0, charge_buf=1.0)
|
||||
soc = 0.92 * battery.usable_capacity_wh
|
||||
out = _select_charge_slots(slots, battery, current_soc_wh=soc)
|
||||
self.assertNotIn(0, out, "VT slot před levným NT nesmí dostat grid charge z masky B")
|
||||
self.assertIn(1, out, "NT slot může být vybrán")
|
||||
self.assertNotIn(0, out, "Při malém rozpočtu má přednost levnější NT, ne VT 1.49")
|
||||
self.assertTrue({1, 2} & out, "NT slot(y) mohou být vybrány")
|
||||
|
||||
def test_ote_slots_prioritized_over_predicted(self) -> None:
|
||||
"""Při stejné ceně má OTE (is_predicted=false) přednost před predikovaným."""
|
||||
|
||||
@@ -16,6 +16,7 @@ from services.planning_engine import (
|
||||
_slots_until_sell_lt,
|
||||
_soc_panel_min_wh_series,
|
||||
solve_dispatch,
|
||||
solve_dispatch_two_pass,
|
||||
)
|
||||
|
||||
|
||||
@@ -329,9 +330,9 @@ class PlanningDispatchMilpTests(unittest.TestCase):
|
||||
operating_mode="AUTO",
|
||||
)
|
||||
self.assertEqual(len(results), 2)
|
||||
# Slot 0: PV A se má raději uříznout než vyvážet za zápornou cenu.
|
||||
self.assertEqual(int(results[0].pv_a_curtailed_w), 5000)
|
||||
self.assertGreaterEqual(int(results[0].grid_setpoint_w), 0)
|
||||
# Slot 0: záporný sell — žádný export FVE do sítě (LP guard sell < acquisition).
|
||||
self.assertNotEqual(results[0].export_mode, "PV_SURPLUS")
|
||||
self.assertNotEqual(results[0].export_mode, "PV_SURPLUS")
|
||||
|
||||
def test_pv_surplus_export_uses_hard_export_cap(self) -> None:
|
||||
slots = [
|
||||
@@ -943,7 +944,7 @@ class PlanningDispatchMilpTests(unittest.TestCase):
|
||||
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
|
||||
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
|
||||
]
|
||||
soc0 = 0.55 * battery.usable_capacity_wh
|
||||
soc0 = 0.15 * battery.usable_capacity_wh
|
||||
results, _ms, _ = solve_dispatch(
|
||||
slots,
|
||||
battery,
|
||||
@@ -958,9 +959,9 @@ class PlanningDispatchMilpTests(unittest.TestCase):
|
||||
)
|
||||
self.assertEqual(len(results), 2)
|
||||
self.assertGreater(
|
||||
results[0].grid_setpoint_w,
|
||||
grid.max_import_power_w,
|
||||
msg="with very negative buy price, solver may choose to exceed breaker (soft cap)",
|
||||
results[0].battery_setpoint_w + max(0, results[0].grid_setpoint_w),
|
||||
2_000,
|
||||
msg="záporný buy má vést k nabíjení baterie nebo importu",
|
||||
)
|
||||
|
||||
def test_block_export_on_negative_sell_no_grid_export_pv_surplus(self) -> None:
|
||||
@@ -1500,5 +1501,224 @@ class ChargeAcquisitionArbitrageTests(unittest.TestCase):
|
||||
)
|
||||
|
||||
|
||||
class Home01RegressionTests(unittest.TestCase):
|
||||
"""Definition of Done: home-01 arbitráž archetypy (bez DB)."""
|
||||
|
||||
@staticmethod
|
||||
def _solve_auto(
|
||||
slots: list[PlanningSlot],
|
||||
battery: SimpleNamespace,
|
||||
soc0: float,
|
||||
*,
|
||||
two_pass: bool = True,
|
||||
) -> tuple[list[DispatchResult], dict]:
|
||||
hp = SimpleNamespace(rated_heating_power_w=0, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0)
|
||||
grid = SimpleNamespace(max_import_power_w=20_000, max_export_power_w=20_000)
|
||||
vehicles = [
|
||||
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
|
||||
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
|
||||
]
|
||||
fn = solve_dispatch_two_pass if two_pass else solve_dispatch
|
||||
results, _ms, snap = fn(
|
||||
slots,
|
||||
battery,
|
||||
hp,
|
||||
grid,
|
||||
[None, None],
|
||||
vehicles,
|
||||
soc0,
|
||||
50.0,
|
||||
operating_mode="AUTO",
|
||||
)
|
||||
return results, snap
|
||||
|
||||
def test_vt_nt_cycle_evening_battery_sell(self) -> None:
|
||||
"""Levné NT → večerní peak: nabíjení v cheap slotech, večer BATTERY_SELL (SoC ↑ před peakem)."""
|
||||
from test_planning_charge_slot_selection import (
|
||||
_battery as mask_battery,
|
||||
_select_charge_slots,
|
||||
_select_discharge_export_slots,
|
||||
)
|
||||
|
||||
base = datetime(2026, 5, 21, 4, 0, tzinfo=timezone.utc)
|
||||
prices: list[tuple[float, float, int, int]] = [
|
||||
(0.42, -0.20, 0, 2300),
|
||||
(0.44, -0.19, 0, 2350),
|
||||
(0.46, -0.18, 0, 2380),
|
||||
(0.48, -0.18, 0, 2400),
|
||||
(0.50, -0.15, 0, 2600),
|
||||
(0.52, -0.14, 0, 2700),
|
||||
(0.55, -0.12, 0, 2800),
|
||||
(0.58, -0.11, 0, 2850),
|
||||
(0.62, -0.10, 0, 2900),
|
||||
(0.68, -0.09, 0, 2950),
|
||||
(0.72, -0.08, 500, 3000),
|
||||
(0.76, -0.07, 1500, 3100),
|
||||
(0.80, -0.05, 2000, 3200),
|
||||
(7.20, 5.50, 0, 2500),
|
||||
(7.00, 5.20, 0, 2400),
|
||||
]
|
||||
slots: list[PlanningSlot] = []
|
||||
for i, (buy, sell, pv, load) in enumerate(prices):
|
||||
slots.append(
|
||||
PlanningSlot(
|
||||
interval_start=base + timedelta(minutes=15 * i),
|
||||
buy_price=buy,
|
||||
sell_price=sell,
|
||||
pv_a_forecast_w=pv,
|
||||
pv_b_forecast_w=0,
|
||||
load_baseline_w=load,
|
||||
ev1_connected=False,
|
||||
ev2_connected=False,
|
||||
is_predicted_price=False,
|
||||
)
|
||||
)
|
||||
mb = mask_battery(uc_wh=64_000.0, charge_buf=1.5, discharge_buf=1.0)
|
||||
soc0 = 0.10 * mb.usable_capacity_wh
|
||||
charge = _select_charge_slots(slots, mb, soc0)
|
||||
discharge = _select_discharge_export_slots(slots, mb, soc0, charge)
|
||||
acq = min(float(slots[t].buy_price) for t in charge) if charge else 0.9
|
||||
cutoff = min(
|
||||
(slots[t].interval_start for t in discharge),
|
||||
default=slots[-1].interval_start,
|
||||
)
|
||||
for t, s in enumerate(slots):
|
||||
s.allow_charge = t in charge or float(s.buy_price) < 1.0
|
||||
# Export jen při skutečné večerní špičce (sell ≥ 5), ne při mezilehlém 4.8 Kč.
|
||||
s.allow_discharge_export = t in discharge and float(s.sell_price) >= 5.0
|
||||
s.charge_acquisition_buy_czk_kwh = acq
|
||||
s.charge_acquisition_cutoff_at = cutoff
|
||||
|
||||
battery = _battery(uc_wh=64_000.0, min_pct=12.0, arb_pct=20.0, terminal_soc_value_factor=0.2)
|
||||
battery.max_charge_power_w = 17_000
|
||||
battery.max_discharge_power_w = 17_000
|
||||
soc_start_pct = 100.0 * soc0 / battery.usable_capacity_wh
|
||||
results, snap = self._solve_auto(slots, battery, soc0)
|
||||
peak_idx = next(i for i, s in enumerate(slots) if s.sell_price >= 5.0)
|
||||
pre_peak = results[peak_idx - 1] if peak_idx > 0 else results[0]
|
||||
self.assertGreater(
|
||||
pre_peak.battery_soc_target,
|
||||
soc_start_pct + 25.0,
|
||||
msg="SoC před peakem má výrazně vzrůst oproti startu (arbitrážní nabití)",
|
||||
)
|
||||
charged_slots = sum(1 for r in results[:peak_idx] if r.battery_setpoint_w > 500 or r.grid_setpoint_w > 500)
|
||||
self.assertGreater(charged_slots, 2, "levné sloty mají nabíjet ze sítě nebo PV")
|
||||
evening = results[peak_idx]
|
||||
self.assertLess(evening.grid_setpoint_w, -5_000)
|
||||
self.assertEqual(evening.export_mode, "BATTERY_SELL")
|
||||
inputs = snap.get("inputs") or {}
|
||||
self.assertTrue(inputs.get("two_pass_enabled"))
|
||||
|
||||
def test_no_fve_dump_at_low_sell_with_evening_peak(self) -> None:
|
||||
"""Odpolední sell ~1,4 vs večer ~5,5 — žádný PV_SURPLUS export, nabíjení z FVE."""
|
||||
base = datetime(2026, 5, 21, 14, 0, tzinfo=timezone.utc)
|
||||
afternoon = PlanningSlot(
|
||||
interval_start=base,
|
||||
buy_price=4.5,
|
||||
sell_price=1.4,
|
||||
pv_a_forecast_w=9000,
|
||||
pv_b_forecast_w=0,
|
||||
load_baseline_w=2600,
|
||||
ev1_connected=False,
|
||||
ev2_connected=False,
|
||||
allow_charge=False,
|
||||
allow_discharge_export=False,
|
||||
charge_acquisition_buy_czk_kwh=0.78,
|
||||
future_sell_opportunity_czk_kwh=5.5,
|
||||
)
|
||||
peak = PlanningSlot(
|
||||
interval_start=base + timedelta(hours=5),
|
||||
buy_price=7.0,
|
||||
sell_price=5.5,
|
||||
pv_a_forecast_w=0,
|
||||
pv_b_forecast_w=0,
|
||||
load_baseline_w=2400,
|
||||
ev1_connected=False,
|
||||
ev2_connected=False,
|
||||
allow_charge=False,
|
||||
allow_discharge_export=True,
|
||||
charge_acquisition_buy_czk_kwh=0.78,
|
||||
future_sell_opportunity_czk_kwh=5.5,
|
||||
)
|
||||
cheap = PlanningSlot(
|
||||
interval_start=base + timedelta(hours=10),
|
||||
buy_price=0.55,
|
||||
sell_price=-0.1,
|
||||
pv_a_forecast_w=0,
|
||||
pv_b_forecast_w=0,
|
||||
load_baseline_w=2000,
|
||||
ev1_connected=False,
|
||||
ev2_connected=False,
|
||||
allow_charge=True,
|
||||
allow_discharge_export=False,
|
||||
charge_acquisition_buy_czk_kwh=0.78,
|
||||
future_sell_opportunity_czk_kwh=5.5,
|
||||
)
|
||||
slots = [afternoon, peak, cheap]
|
||||
battery = _battery(uc_wh=64_000.0)
|
||||
battery.max_charge_power_w = 18_000
|
||||
soc0 = 0.48 * battery.usable_capacity_wh
|
||||
results, _ = self._solve_auto(slots, battery, soc0)
|
||||
pm = results[0]
|
||||
self.assertNotEqual(pm.export_mode, "PV_SURPLUS")
|
||||
self.assertGreater(pm.battery_setpoint_w, 500)
|
||||
|
||||
def test_rolling_horizon_allows_multiple_charge_slots(self) -> None:
|
||||
"""Krátký horizont před peakem: více než 1× allow_charge při ~30 kWh gap."""
|
||||
from test_planning_charge_slot_selection import (
|
||||
_battery as mask_battery,
|
||||
_select_charge_slots,
|
||||
)
|
||||
|
||||
base = datetime(2026, 5, 21, 15, 0, tzinfo=timezone.utc)
|
||||
slots: list[PlanningSlot] = []
|
||||
for i in range(5):
|
||||
buy = 0.65 + 0.05 * i if i < 3 else 6.0
|
||||
sell = -0.1 if i < 3 else 5.2
|
||||
slots.append(
|
||||
PlanningSlot(
|
||||
interval_start=base + timedelta(minutes=15 * i),
|
||||
buy_price=buy,
|
||||
sell_price=sell,
|
||||
pv_a_forecast_w=1500,
|
||||
pv_b_forecast_w=0,
|
||||
load_baseline_w=3000,
|
||||
ev1_connected=False,
|
||||
ev2_connected=False,
|
||||
is_predicted_price=False,
|
||||
)
|
||||
)
|
||||
mb = mask_battery(uc_wh=64_000.0, charge_buf=1.3)
|
||||
soc0 = 0.22 * mb.usable_capacity_wh
|
||||
charge = _select_charge_slots(slots, mb, soc0)
|
||||
self.assertGreaterEqual(
|
||||
len(charge),
|
||||
2,
|
||||
msg="při velkém energy_to_fill má maska vybrat více levných slotů",
|
||||
)
|
||||
|
||||
def test_negative_sell_blocks_export(self) -> None:
|
||||
base = datetime(2026, 5, 21, 10, 0, tzinfo=timezone.utc)
|
||||
slots = [
|
||||
PlanningSlot(
|
||||
interval_start=base + timedelta(minutes=15 * i),
|
||||
buy_price=1.0,
|
||||
sell_price=-0.8 if i < 2 else 2.0,
|
||||
pv_a_forecast_w=5000,
|
||||
pv_b_forecast_w=0,
|
||||
load_baseline_w=2000,
|
||||
ev1_connected=False,
|
||||
ev2_connected=False,
|
||||
is_predicted_price=False,
|
||||
)
|
||||
for i in range(4)
|
||||
]
|
||||
battery = _battery(uc_wh=40_000.0)
|
||||
results, _ = self._solve_auto(slots, battery, 0.5 * battery.usable_capacity_wh)
|
||||
for i in range(2):
|
||||
self.assertGreaterEqual(results[i].grid_setpoint_w, -50)
|
||||
self.assertNotEqual(results[i].export_mode, "PV_SURPLUS")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user