zasadni uprava LP planneru
This commit is contained in:
@@ -1057,6 +1057,44 @@ def solve_dispatch(
|
||||
prob += ge_bat[t] == 0
|
||||
prob += z_export[t] == 0
|
||||
|
||||
# Ekonomické guardy: ceny v objective nestačí proti maskám / terminal SoC.
|
||||
ref_buy_horizon = min(float(s.buy_price) for s in slots)
|
||||
min_spread = float(degradation_cost_effective)
|
||||
hp_rated_w = float(heat_pump.rated_heating_power_w)
|
||||
soc_headroom_wh = max(
|
||||
2000.0, 0.05 * float(battery.soc_max_wh)
|
||||
)
|
||||
for t in range(T):
|
||||
s = slots[t]
|
||||
buy_t = float(s.buy_price)
|
||||
sell_t = float(s.sell_price)
|
||||
load_t = float(s.load_baseline_w)
|
||||
ev_cap_t = sum(
|
||||
float(vehicles[e].max_charge_power_w)
|
||||
for e in range(EV)
|
||||
if (e == 0 and s.ev1_connected) or (e == 1 and s.ev2_connected)
|
||||
)
|
||||
pv_surplus_w = max(
|
||||
0.0,
|
||||
float(s.pv_a_forecast_w) + float(s.pv_b_forecast_w) - load_t,
|
||||
)
|
||||
# Ztrátový export FVE (sell ≪ buy): zakázat jen pokud jde energii do baterie.
|
||||
# Výjimky: plná baterie (ventil), neriťitelné pv_b s přebytkem.
|
||||
if sell_t < buy_t - min_spread:
|
||||
block_loss_pv_export = not (
|
||||
float(s.pv_b_forecast_w) > 0 and pv_surplus_w > 0
|
||||
)
|
||||
if t == 0 and current_soc_wh >= float(battery.soc_max_wh) - soc_headroom_wh:
|
||||
block_loss_pv_export = False
|
||||
if block_loss_pv_export:
|
||||
prob += ge_pv[t] == 0
|
||||
# Drahý nákup oproti horizontu: import jen na load + EV + TČ, ne na grid-nabíjení.
|
||||
if buy_t >= 0 and buy_t > ref_buy_horizon + min_spread:
|
||||
prob += gi[t] <= load_t + ev_cap_t + hp_rated_w
|
||||
# Anti souběžný vývoz FVE + významný import (mikrocyklus).
|
||||
if buy_t > sell_t + min_spread and pv_surplus_w > 0:
|
||||
prob += ge_pv[t] <= pv_surplus_w
|
||||
|
||||
# Deadline constraints pro EV
|
||||
for e, session in enumerate(ev_sessions):
|
||||
if session and session.target_deadline and session.energy_needed_wh > 0:
|
||||
|
||||
@@ -3,11 +3,11 @@
|
||||
Logika je v DB: ems.fn_load_planning_slots_full. Kopie algoritmu pro unit testy bez PG.
|
||||
|
||||
Charge mask:
|
||||
A) PV-surplus: sell_price ASC, dokud PV nepokryje charge target.
|
||||
B) Non-PV: AM/PM 50/50, OTE-first, buy_price ASC.
|
||||
A) PV-surplus: store_score DESC, dokud PV nepokryje charge target.
|
||||
B) Non-PV: AM/PM, OTE-first, buy≤ref+degrad, lookahead, cap 6 slotů.
|
||||
|
||||
Discharge-export mask:
|
||||
ref_buy = min(buy) mezi allow_charge sloty (arbitráž mezi sloty, ne sell vs buy ve stejném).
|
||||
ref_buy = min(buy) celého horizontu.
|
||||
Top sloty dle sell_price desc kde sell > ref_buy + degradation.
|
||||
"""
|
||||
|
||||
@@ -21,6 +21,31 @@ from zoneinfo import ZoneInfo
|
||||
from services.planning_engine import INTERVAL_H, PlanningSlot
|
||||
|
||||
_PRAGUE = ZoneInfo("Europe/Prague")
|
||||
_LOOKAHEAD_SLOTS = 4
|
||||
_GRID_CHARGE_CAP_AM = 6
|
||||
_GRID_CHARGE_CAP_PM = 6
|
||||
_BUY_LOOKAHEAD_EPS = 0.05
|
||||
|
||||
|
||||
def _future_sell(slots: list[PlanningSlot], t: int) -> float:
|
||||
tail = [float(slots[i].sell_price) for i in range(t + 1, len(slots))]
|
||||
return max(tail) if tail else float(slots[t].sell_price)
|
||||
|
||||
|
||||
def _buy_min_next_n(slots: list[PlanningSlot], t: int, n: int = _LOOKAHEAD_SLOTS) -> float | None:
|
||||
tail = [
|
||||
float(slots[i].buy_price)
|
||||
for i in range(t + 1, min(t + 1 + n, len(slots)))
|
||||
]
|
||||
return min(tail) if tail else None
|
||||
|
||||
|
||||
def _store_score(slots: list[PlanningSlot], t: int) -> float:
|
||||
s = slots[t]
|
||||
buy = float(s.buy_price)
|
||||
sell = float(s.sell_price)
|
||||
fso = _future_sell(slots, t)
|
||||
return fso - sell - max(0.0, buy - sell)
|
||||
|
||||
|
||||
def _select_charge_slots(
|
||||
@@ -39,12 +64,21 @@ def _select_charge_slots(
|
||||
if energy_to_fill <= 0:
|
||||
return set(range(len(slots)))
|
||||
|
||||
reserve_wh = float(getattr(battery, "reserve_soc_wh", 0) or 0)
|
||||
degrad = float(getattr(battery, "degradation_cost_czk_kwh", 0.15) or 0.15)
|
||||
ref_buy = min(float(s.buy_price) for s in slots)
|
||||
|
||||
eta = float(getattr(battery, "charge_efficiency", 1.0) or 1.0)
|
||||
max_p_w = float(getattr(battery, "max_charge_power_w", 0.0) or 0.0)
|
||||
per_slot_full_wh = max_p_w * eta * INTERVAL_H
|
||||
charge_target_wh = max(energy_to_fill, 0) * charge_buf
|
||||
if current_soc_wh >= reserve_wh:
|
||||
charge_target_wh = max(energy_to_fill, 0.0)
|
||||
else:
|
||||
charge_target_wh = min(
|
||||
max(energy_to_fill, 0.0) * charge_buf,
|
||||
max(energy_to_fill, 0.0),
|
||||
)
|
||||
|
||||
# AM/PM budget
|
||||
n_am = sum(1 for s in slots if _prague_hour(s) < 12)
|
||||
n_pm = len(slots) - n_am
|
||||
if n_am <= 0:
|
||||
@@ -59,56 +93,67 @@ def _select_charge_slots(
|
||||
|
||||
selected: set[int] = set()
|
||||
|
||||
# A) PV-surplus: cheapest sell_price first
|
||||
# A) PV-surplus: highest store_score first
|
||||
pv_candidates: list[tuple[int, float, float]] = []
|
||||
for t, s in enumerate(slots):
|
||||
pv_surplus_w = max(0, s.pv_a_forecast_w + s.pv_b_forecast_w - s.load_baseline_w)
|
||||
if pv_surplus_w > 0:
|
||||
pv_candidates.append((t, float(s.sell_price), float(pv_surplus_w)))
|
||||
if pv_surplus_w > 0 and float(s.sell_price) >= float(s.buy_price) - degrad:
|
||||
pv_candidates.append((t, _store_score(slots, t), float(pv_surplus_w)))
|
||||
|
||||
pv_candidates.sort(key=lambda x: (x[1], x[0]))
|
||||
pv_candidates.sort(key=lambda x: (-x[1], x[0]))
|
||||
cum = 0.0
|
||||
for t, _sell, pv_surplus_w in pv_candidates:
|
||||
for t, _score, pv_surplus_w in pv_candidates:
|
||||
if cum >= charge_target_wh:
|
||||
break
|
||||
selected.add(t)
|
||||
cum += min(pv_surplus_w, max_p_w) * eta * INTERVAL_H
|
||||
|
||||
# B) Non-PV grid charge — jen spot nákup (u fixed je buy všude stejný → jen FVE)
|
||||
if purchase_pricing_mode == "fixed":
|
||||
return selected
|
||||
|
||||
# B) Non-PV: AM budget (OTE-first)
|
||||
def _grid_b_ok(t: int) -> bool:
|
||||
s = slots[t]
|
||||
if max(0, s.pv_a_forecast_w + s.pv_b_forecast_w - s.load_baseline_w) > 0:
|
||||
return False
|
||||
buy = float(s.buy_price)
|
||||
sell = float(s.sell_price)
|
||||
if buy > ref_buy + degrad:
|
||||
return False
|
||||
nxt = _buy_min_next_n(slots, t)
|
||||
if nxt is not None and buy > nxt + _BUY_LOOKAHEAD_EPS:
|
||||
return False
|
||||
return True
|
||||
|
||||
# B) AM
|
||||
am_candidates = [
|
||||
(t, getattr(slots[t], "is_predicted_price", False), float(slots[t].buy_price))
|
||||
for t in range(len(slots))
|
||||
if t not in selected
|
||||
and max(0, slots[t].pv_a_forecast_w + slots[t].pv_b_forecast_w - slots[t].load_baseline_w) <= 0
|
||||
and _prague_hour(slots[t]) < 12
|
||||
if t not in selected and _grid_b_ok(t) and _prague_hour(slots[t]) < 12
|
||||
]
|
||||
am_candidates.sort(key=lambda x: (int(x[1]), x[2], x[0]))
|
||||
cum = 0.0
|
||||
grid_am = 0
|
||||
for t, _pred, _price in am_candidates:
|
||||
if cum >= chg_am or per_slot_full_wh <= 0:
|
||||
if cum >= chg_am or per_slot_full_wh <= 0 or grid_am >= _GRID_CHARGE_CAP_AM:
|
||||
break
|
||||
selected.add(t)
|
||||
cum += per_slot_full_wh
|
||||
grid_am += 1
|
||||
|
||||
# B) Non-PV: PM budget (OTE-first)
|
||||
pm_candidates = [
|
||||
(t, getattr(slots[t], "is_predicted_price", False), float(slots[t].buy_price))
|
||||
for t in range(len(slots))
|
||||
if t not in selected
|
||||
and max(0, slots[t].pv_a_forecast_w + slots[t].pv_b_forecast_w - slots[t].load_baseline_w) <= 0
|
||||
and _prague_hour(slots[t]) >= 12
|
||||
if t not in selected and _grid_b_ok(t) and _prague_hour(slots[t]) >= 12
|
||||
]
|
||||
pm_candidates.sort(key=lambda x: (int(x[1]), x[2], x[0]))
|
||||
cum = 0.0
|
||||
grid_pm = 0
|
||||
for t, _pred, _price in pm_candidates:
|
||||
if cum >= chg_pm or per_slot_full_wh <= 0:
|
||||
if cum >= chg_pm or per_slot_full_wh <= 0 or grid_pm >= _GRID_CHARGE_CAP_PM:
|
||||
break
|
||||
selected.add(t)
|
||||
cum += per_slot_full_wh
|
||||
grid_pm += 1
|
||||
|
||||
return selected
|
||||
|
||||
@@ -138,13 +183,7 @@ def _select_discharge_export_slots(
|
||||
per_slot_wh = max_p_w * eta * INTERVAL_H
|
||||
discharge_target_wh = exportable_wh * discharge_buf
|
||||
|
||||
if charge_slots is None:
|
||||
charge_slots = _select_charge_slots(slots, battery, current_soc_wh)
|
||||
|
||||
ref_buy = min(
|
||||
(float(slots[t].buy_price) for t in charge_slots),
|
||||
default=min(float(s.buy_price) for s in slots),
|
||||
)
|
||||
ref_buy = min(float(s.buy_price) for s in slots)
|
||||
|
||||
if purchase_pricing_mode == "fixed":
|
||||
sell_min = degrad
|
||||
@@ -182,9 +221,12 @@ def _slot(
|
||||
load: int = 2_000,
|
||||
hour_utc: int = 12,
|
||||
predicted: bool = False,
|
||||
interval_start: datetime | None = None,
|
||||
) -> PlanningSlot:
|
||||
if interval_start is None:
|
||||
interval_start = datetime(2026, 5, 19, hour_utc, 0, tzinfo=timezone.utc)
|
||||
return PlanningSlot(
|
||||
interval_start=datetime(2026, 5, 19, hour_utc, 0, tzinfo=timezone.utc),
|
||||
interval_start=interval_start,
|
||||
buy_price=buy,
|
||||
sell_price=sell,
|
||||
pv_a_forecast_w=0,
|
||||
@@ -203,6 +245,7 @@ def _battery(
|
||||
uc_wh: float = 64_000.0,
|
||||
soc_max_pct: float = 95.0,
|
||||
min_soc_pct: float = 10.0,
|
||||
reserve_soc_pct: float = 20.0,
|
||||
max_charge_w: float = 18_000.0,
|
||||
max_discharge_w: float = 18_000.0,
|
||||
charge_eff: float = 0.95,
|
||||
@@ -213,6 +256,7 @@ def _battery(
|
||||
return SimpleNamespace(
|
||||
usable_capacity_wh=uc,
|
||||
min_soc_wh=min_soc_pct / 100.0 * uc,
|
||||
reserve_soc_wh=reserve_soc_pct / 100.0 * uc,
|
||||
soc_max_wh=soc_max_pct / 100.0 * uc,
|
||||
max_charge_power_w=max_charge_w,
|
||||
max_discharge_power_w=max_discharge_w,
|
||||
@@ -239,81 +283,82 @@ class SelectChargeSlotsTests(unittest.TestCase):
|
||||
)
|
||||
self.assertEqual(out, set(range(3)))
|
||||
|
||||
def test_pv_surplus_cheapest_sell_price_selected(self) -> None:
|
||||
"""PV-surplus sloty s nejnižší sell_price se vybírají přednostně."""
|
||||
def test_pv_surplus_high_store_score_selected(self) -> None:
|
||||
"""Slot s vyšším store_score (lepší uložení vs export) má přednost."""
|
||||
slots = [
|
||||
_slot(buy=1.0, sell=2.0, pv=8_000, load=2_000),
|
||||
_slot(buy=1.0, sell=5.0, pv=8_000, load=2_000),
|
||||
_slot(buy=1.0, sell=3.0, pv=8_000, load=2_000),
|
||||
_slot(buy=1.5, sell=0.01, pv=8_000, load=2_000, hour_utc=8),
|
||||
_slot(buy=1.5, sell=0.50, pv=8_000, load=2_000, hour_utc=9),
|
||||
_slot(buy=0.5, sell=0.40, pv=8_000, load=2_000, hour_utc=10),
|
||||
]
|
||||
battery = _battery(
|
||||
charge_buf=1.3, uc_wh=1_000.0, soc_max_pct=100.0, max_charge_w=6_000.0
|
||||
)
|
||||
out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
|
||||
self.assertIn(0, out, "Cheapest sell_price PV slot must be selected")
|
||||
self.assertNotIn(1, out, "Expensive sell_price PV slot should be excluded")
|
||||
self.assertIn(2, out, "Slot s lepší marží (nižší buy) má být vybrán")
|
||||
self.assertNotIn(0, out, "Ztrátový sell≪buy slot nemá grid charge z masky A")
|
||||
|
||||
def test_non_pv_slots_selected_with_am_pm_budget(self) -> None:
|
||||
"""Non-PV sloty se vybírají dle buy_price v rámci AM/PM rozpočtu."""
|
||||
"""Levný PM slot; AM s dražším buy než min v lookahead může být vynechán."""
|
||||
slots = [
|
||||
_slot(buy=0.5, hour_utc=4), # AM slot, cheap
|
||||
_slot(buy=3.0, hour_utc=5), # AM slot, expensive
|
||||
_slot(buy=0.4, hour_utc=14), # PM slot, cheap
|
||||
_slot(buy=9.9, hour_utc=15), # PM slot, expensive
|
||||
_slot(buy=0.5, hour_utc=4),
|
||||
_slot(buy=3.0, hour_utc=5),
|
||||
_slot(buy=0.4, hour_utc=14),
|
||||
_slot(buy=9.9, hour_utc=15),
|
||||
]
|
||||
battery = _battery(
|
||||
charge_buf=1.3, uc_wh=5_000.0, soc_max_pct=100.0, max_charge_w=18_000.0
|
||||
)
|
||||
out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
|
||||
self.assertIn(0, out, "Cheapest AM slot must be selected")
|
||||
self.assertIn(2, out, "Cheapest PM slot must be selected")
|
||||
self.assertIn(2, out, "Nejlevnější buy v horizontu (PM) musí být vybrán")
|
||||
|
||||
def test_vt_before_nt_skips_expensive_pm_slot(self) -> None:
|
||||
"""Regrese home-01: 12:45 VT drahý, za 15 min NT levný → PM grid charge ne v 12:45."""
|
||||
base = datetime(2026, 5, 21, 10, 45, tzinfo=timezone.utc)
|
||||
slots = [
|
||||
_slot(
|
||||
buy=1.49,
|
||||
sell=-0.04,
|
||||
pv=0,
|
||||
load=3_500,
|
||||
interval_start=base,
|
||||
),
|
||||
_slot(
|
||||
buy=0.86,
|
||||
sell=0.01,
|
||||
pv=0,
|
||||
load=3_500,
|
||||
interval_start=base + timedelta(minutes=15),
|
||||
),
|
||||
_slot(
|
||||
buy=0.86,
|
||||
sell=0.01,
|
||||
pv=0,
|
||||
load=3_500,
|
||||
interval_start=base + timedelta(minutes=30),
|
||||
),
|
||||
]
|
||||
battery = _battery(uc_wh=64_000.0)
|
||||
soc = 0.31 * battery.usable_capacity_wh
|
||||
out = _select_charge_slots(slots, battery, current_soc_wh=soc)
|
||||
self.assertNotIn(0, out, "VT slot před levným NT nesmí dostat grid charge z masky B")
|
||||
self.assertIn(1, out, "NT slot může být vybrán")
|
||||
|
||||
def test_ote_slots_prioritized_over_predicted(self) -> None:
|
||||
"""OTE sloty (is_predicted_price=false) mají přednost před predikovanými."""
|
||||
"""Při stejné ceně má OTE (is_predicted=false) přednost před predikovaným."""
|
||||
slots = [
|
||||
_slot(buy=3.56, hour_utc=13, predicted=False), # OTE, dražší
|
||||
_slot(buy=2.00, hour_utc=13, predicted=True), # predicted, levnější
|
||||
_slot(buy=2.00, sell=2.0, hour_utc=13, predicted=False),
|
||||
_slot(buy=2.00, sell=2.0, hour_utc=13, predicted=True),
|
||||
]
|
||||
battery = _battery(
|
||||
charge_buf=1.3, uc_wh=3_000.0, soc_max_pct=100.0, max_charge_w=18_000.0
|
||||
)
|
||||
out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
|
||||
self.assertIn(0, out, "OTE slot must be selected even if pricier than predicted")
|
||||
|
||||
def test_does_not_exclude_slot_just_because_pv_below_load(self) -> None:
|
||||
"""Regrese: sloty bez PV-surplus se vybírají přes AM/PM grid budget."""
|
||||
slots = [
|
||||
_slot(buy=0.4, pv=3_320, load=3_747, hour_utc=13),
|
||||
_slot(buy=0.42, pv=2_116, load=3_747, hour_utc=13),
|
||||
_slot(buy=0.44, pv=1_649, load=3_747, hour_utc=13),
|
||||
_slot(buy=0.47, pv=1_276, load=3_747, hour_utc=13),
|
||||
]
|
||||
battery = _battery()
|
||||
out = _select_charge_slots(slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh)
|
||||
for idx in (0, 1, 2, 3):
|
||||
self.assertIn(idx, out)
|
||||
|
||||
def test_long_horizon_pv_surplus_does_not_exhaust_grid_budget(self) -> None:
|
||||
"""Regrese: v 96h horizontu nesmí PV-surplus sloty „vyžrat" grid rozpočet."""
|
||||
cheap_grid = [_slot(buy=0.4 + 0.01 * i, pv=0, load=2_000) for i in range(40)]
|
||||
pv_days = [_slot(buy=1.5, sell=1.5, pv=10_000, load=2_000) for _ in range(100)]
|
||||
slots = cheap_grid + pv_days
|
||||
battery = _battery(
|
||||
charge_buf=1.3, uc_wh=64_000.0, soc_max_pct=95.0, max_charge_w=18_000.0
|
||||
)
|
||||
out = _select_charge_slots(slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh)
|
||||
grid_selected = sum(1 for i in range(len(cheap_grid)) if i in out)
|
||||
self.assertGreaterEqual(
|
||||
grid_selected,
|
||||
5,
|
||||
"V dlouhém horizontu s mnoha PV-surplus sloty musí zůstat dostatek "
|
||||
"grid slotů povolených pro nabíjení z levného importu.",
|
||||
)
|
||||
self.assertIn(0, out)
|
||||
self.assertNotIn(1, out)
|
||||
|
||||
|
||||
class SelectDischargeExportSlotsTests(unittest.TestCase):
|
||||
def test_evening_sell_allowed_when_cheaper_than_ref_charge_buy(self) -> None:
|
||||
"""Regrese home-01: večer sell 3.3 > ref_buy 0.5 + degrad i když buy ve slotu je 5.6."""
|
||||
slots = [
|
||||
_slot(buy=0.50, sell=-0.30, hour_utc=6),
|
||||
_slot(buy=0.51, sell=-0.29, hour_utc=7),
|
||||
@@ -325,8 +370,7 @@ class SelectDischargeExportSlotsTests(unittest.TestCase):
|
||||
discharge = _select_discharge_export_slots(
|
||||
slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh, charge_slots=charge
|
||||
)
|
||||
self.assertIn(0, charge)
|
||||
self.assertIn(2, discharge, "Evening sell must qualify vs ref buy, not same-slot buy")
|
||||
self.assertIn(2, discharge)
|
||||
self.assertIn(3, discharge)
|
||||
|
||||
def test_export_excluded_when_sell_below_ref_buy_plus_degradation(self) -> None:
|
||||
@@ -335,16 +379,13 @@ class SelectDischargeExportSlotsTests(unittest.TestCase):
|
||||
_slot(buy=4.00, sell=0.50, hour_utc=18),
|
||||
]
|
||||
battery = _battery(uc_wh=10_000.0, discharge_buf=2.0)
|
||||
charge = _select_charge_slots(slots, battery, current_soc_wh=0.0)
|
||||
discharge = _select_discharge_export_slots(
|
||||
slots, battery, current_soc_wh=0.0, charge_slots=charge
|
||||
slots, battery, current_soc_wh=0.0
|
||||
)
|
||||
self.assertNotIn(1, discharge, "sell 0.5 < ref 0.4 + 0.15")
|
||||
self.assertNotIn(1, discharge)
|
||||
|
||||
|
||||
class FixedPurchasePricingTests(unittest.TestCase):
|
||||
"""purchase_pricing_mode=fixed: žádné grid CHARGE, export dle sell."""
|
||||
|
||||
def test_fixed_skips_non_pv_grid_charge_slots(self) -> None:
|
||||
slots = [
|
||||
_slot(buy=6.35, sell=2.0, hour_utc=14, load=500),
|
||||
@@ -357,7 +398,7 @@ class FixedPurchasePricingTests(unittest.TestCase):
|
||||
current_soc_wh=0.4 * battery.usable_capacity_wh,
|
||||
purchase_pricing_mode="fixed",
|
||||
)
|
||||
self.assertEqual(out, set(), "fixed buy must not enable non-PV grid charge")
|
||||
self.assertEqual(out, set())
|
||||
|
||||
def test_fixed_allows_discharge_on_high_sell(self) -> None:
|
||||
slots = [
|
||||
|
||||
@@ -1290,5 +1290,74 @@ class TerminalSocShadowTests(unittest.TestCase):
|
||||
)
|
||||
|
||||
|
||||
class SpreadGuardHome01EconomicsTests(unittest.TestCase):
|
||||
"""Regrese: sell≪buy (VT) nesmí vést k PV exportu + masivnímu grid importu ve stejném slotu."""
|
||||
|
||||
def test_loss_making_morning_and_vt_slot_avoid_export_and_grid_charge(self) -> None:
|
||||
from test_planning_charge_slot_selection import (
|
||||
_battery as mask_battery,
|
||||
_select_charge_slots,
|
||||
_select_discharge_export_slots,
|
||||
)
|
||||
|
||||
base = datetime(2026, 5, 21, 8, 0, tzinfo=timezone.utc)
|
||||
raw: list[tuple[float, float, int, int]] = [
|
||||
(1.55, 0.01, 6_000, 2_000),
|
||||
(1.55, 0.01, 6_500, 2_000),
|
||||
(1.49, -0.04, 0, 3_500),
|
||||
(0.86, 0.01, 0, 3_500),
|
||||
(0.86, 0.01, 0, 3_500),
|
||||
(0.86, 0.01, 5_000, 2_000),
|
||||
]
|
||||
slots: list[PlanningSlot] = []
|
||||
for i, (buy, sell, pv, load) in enumerate(raw):
|
||||
slots.append(
|
||||
PlanningSlot(
|
||||
interval_start=base + timedelta(minutes=15 * i),
|
||||
buy_price=buy,
|
||||
sell_price=sell,
|
||||
pv_a_forecast_w=0,
|
||||
pv_b_forecast_w=pv,
|
||||
load_baseline_w=load,
|
||||
ev1_connected=False,
|
||||
ev2_connected=False,
|
||||
is_predicted_price=False,
|
||||
)
|
||||
)
|
||||
mb = mask_battery(uc_wh=64_000.0)
|
||||
soc0 = 0.31 * mb.usable_capacity_wh
|
||||
charge = _select_charge_slots(slots, mb, soc0)
|
||||
discharge = _select_discharge_export_slots(slots, mb, soc0)
|
||||
for t, s in enumerate(slots):
|
||||
s.allow_charge = t in charge
|
||||
s.allow_discharge_export = t in discharge
|
||||
|
||||
battery = _battery(uc_wh=64_000.0, terminal_soc_value_factor=0.9)
|
||||
hp = SimpleNamespace(rated_heating_power_w=0, tuv_min_temp_c=45.0, tuv_target_temp_c=55.0)
|
||||
grid = SimpleNamespace(max_import_power_w=20_000, max_export_power_w=20_000)
|
||||
vehicles = [
|
||||
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
|
||||
SimpleNamespace(max_charge_power_w=0, battery_capacity_kwh=1.0, default_target_soc_pct=80.0),
|
||||
]
|
||||
results, _ms, _ = solve_dispatch(
|
||||
slots,
|
||||
battery,
|
||||
hp,
|
||||
grid,
|
||||
[None, None],
|
||||
vehicles,
|
||||
soc0,
|
||||
50.0,
|
||||
operating_mode="AUTO",
|
||||
)
|
||||
self.assertEqual(len(results), len(slots))
|
||||
morning = results[0]
|
||||
vt_before_nt = results[2]
|
||||
self.assertLessEqual(morning.grid_setpoint_w, slots[0].load_baseline_w + 500)
|
||||
self.assertNotEqual(morning.export_mode, "PV_SURPLUS")
|
||||
self.assertLessEqual(vt_before_nt.grid_setpoint_w, 4_000)
|
||||
self.assertLessEqual(vt_before_nt.battery_setpoint_w, 2_000)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user