Files
ems/backend/tests/test_planning_charge_slot_selection.py
Dusan Vojacek 0f922c91f5
Some checks failed
CI and deploy / migration-check (push) Failing after 22s
CI and deploy / deploy (push) Has been skipped
dalsi oprava
2026-05-23 22:30:46 +02:00

755 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Pre-selection of charge and export slots - Python reference implementation.

The production logic lives in the database (ems.fn_load_planning_slots_full);
this file is a copy of the algorithm so unit tests can run without PostgreSQL.

Charge mask:
  B) Grid AM/PM: cheapest slots up to the Wh budget
     (plan day -> before the export window -> buy ASC).
  A) PV surplus: store_score DESC; only if sell >= future_sell - degrad.

Discharge-export mask:
  ref_buy = min(buy) over the whole horizon.
  Top slots by sell_price DESC where sell > ref_buy + degradation.
"""
from __future__ import annotations
import unittest
from datetime import date, datetime, timezone, timedelta
from types import SimpleNamespace
from zoneinfo import ZoneInfo
from services.planning_engine import INTERVAL_H, PlanningSlot
# Time zone used to convert slot start times to local Prague dates and hours.
_PRAGUE = ZoneInfo("Europe/Prague")
# Default number of following slots inspected by _buy_min_next_n.
_LOOKAHEAD_SLOTS = 4
# NOTE(review): not referenced in the visible part of this file - confirm it is still needed.
_BUY_LOOKAHEAD_EPS = 0.05
# NOTE(review): not referenced in the visible part of this file - confirm it is still needed.
_BUY_CHARGE_BAND = 0.40
# Upper bound on the number of grid-charge slots picked per half-day (AM / PM).
_MAX_GRID_CHARGE_CAP = 24
def _prague_date(s: PlanningSlot) -> date:
    """Return the Europe/Prague calendar date on which the slot starts.

    A naive ``interval_start`` is interpreted as UTC, exactly as
    ``_prague_hour`` does.  Without this, ``datetime.astimezone`` would treat
    a naive value as system-local time, so the date and the hour of the same
    slot could be derived from two different instants.
    """
    start = s.interval_start
    if start.tzinfo is None:
        start = start.replace(tzinfo=timezone.utc)
    return start.astimezone(_PRAGUE).date()
def _export_window_start(slots: list[PlanningSlot], degrad: float) -> datetime | None:
    """Return the earliest start of a slot that qualifies for export, or None.

    Copy of R__063: a slot qualifies when its sell price is strictly greater
    than the minimum buy price of the same Prague calendar day plus ``degrad``.

    Args:
        slots: planning slots of the horizon.
        degrad: degradation cost added to the day's minimum buy price.

    Returns:
        The smallest ``interval_start`` among qualifying slots, or ``None``
        when no slot qualifies (including an empty ``slots`` list).
    """
    days = [_prague_date(s) for s in slots]

    # One pass to find each day's cheapest buy price instead of rescanning the
    # whole horizon for every slot.
    day_min: dict[date, float] = {}
    for day, s in zip(days, slots):
        buy = float(s.buy_price)
        if day not in day_min or buy < day_min[day]:
            day_min[day] = buy

    qualifying_starts = [
        s.interval_start
        for day, s in zip(days, slots)
        if float(s.sell_price) > day_min[day] + degrad
    ]
    return min(qualifying_starts, default=None)
def _future_sell(slots: list[PlanningSlot], t: int) -> float:
    """Return the highest sell price of any slot after index ``t``.

    When no slot follows ``t``, the sell price of slot ``t`` itself is returned.
    """
    best: float | None = None
    for idx in range(t + 1, len(slots)):
        price = float(slots[idx].sell_price)
        if best is None or price > best:
            best = price
    if best is None:
        return float(slots[t].sell_price)
    return best
def _buy_min_next_n(
    slots: list[PlanningSlot],
    t: int,
    n: int = _LOOKAHEAD_SLOTS,
    *,
    export_window_start: datetime | None = None,
) -> float | None:
    """Return the lowest buy price among up to ``n`` slots following ``t``.

    When ``export_window_start`` is given, only slots starting strictly before
    it are considered.  Returns ``None`` when no slot remains.
    """
    lowest: float | None = None
    last = min(t + 1 + n, len(slots))
    for idx in range(t + 1, last):
        following = slots[idx]
        if export_window_start is not None and not (
            following.interval_start < export_window_start
        ):
            continue
        price = float(following.buy_price)
        if lowest is None or price < lowest:
            lowest = price
    return lowest
def _store_score(slots: list[PlanningSlot], t: int) -> float:
    """Score slot ``t`` for storing energy instead of exporting it now.

    The score is the best future sell price minus the current sell price,
    reduced by the buy-over-sell spread of the slot when that spread is positive.
    """
    slot = slots[t]
    buy_now = float(slot.buy_price)
    sell_now = float(slot.sell_price)
    best_later_sell = _future_sell(slots, t)
    spread = buy_now - sell_now
    penalty = spread if spread > 0.0 else 0.0
    return (best_later_sell - sell_now) - penalty
def _select_charge_slots(
    slots: list[PlanningSlot],
    battery: SimpleNamespace,
    current_soc_wh: float,
    *,
    purchase_pricing_mode: str = "spot",
) -> set[int]:
    """Return indices of slots in which charging is allowed (charge mask).

    Copy of the charge-mask logic from ems.fn_load_planning_slots_full.

    Two layers are combined:

    * Grid layer (B): the Wh target is split between AM (Prague hour < 12) and
      PM slots.  Within each half the slots are ordered by plan day first,
      then slots before the export window, then non-predicted prices, then
      buy price (ignored in ``"fixed"`` mode), then index; slots are taken
      until the half's Wh budget or slot cap is reached.  Budget not consumed
      by AM is handed over to PM.  In ``"fixed"`` mode the grid layer runs only
      when at least one slot has ``sell > buy + degradation``.  In any other
      mode every slot with a negative buy price is added as well.
    * PV layer (A): slots with PV surplus, ordered by ``_store_score``
      descending, fill whatever part of the target the grid layer left.

    Args:
        slots: planning slots of the horizon.
        battery: object exposing ``soc_max_wh`` and, optionally,
            ``charge_slot_buffer``, ``degradation_cost_czk_kwh``,
            ``charge_efficiency`` and ``max_charge_power_w``.
        current_soc_wh: current state of charge in Wh.
        purchase_pricing_mode: ``"fixed"`` selects the fixed-price rules; any
            other value is handled as spot pricing.

    Returns:
        Set of selected slot indices.  All indices are returned when the
        buffer is not positive or the battery has nothing left to fill; an
        empty set is returned for an empty ``slots`` list.
    """
    charge_buf = float(getattr(battery, "charge_slot_buffer", 0) or 0)
    if charge_buf <= 0:
        return set(range(len(slots)))
    energy_to_fill = float(battery.soc_max_wh) - float(current_soc_wh)
    if energy_to_fill <= 0:
        return set(range(len(slots)))
    if not slots:
        # Nothing to choose from; the code below reads slots[0].
        return set()

    degrad = float(getattr(battery, "degradation_cost_czk_kwh", 0.15) or 0.15)
    eta = float(getattr(battery, "charge_efficiency", 1.0) or 1.0)
    max_p_w = float(getattr(battery, "max_charge_power_w", 0.0) or 0.0)
    per_slot_full_wh = max_p_w * eta * INTERVAL_H
    export_window_start = _export_window_start(slots, degrad)
    plan_day = _prague_date(slots[0])

    # The buffered demand is capped by the energy the battery can still take.
    charge_target_wh = min(energy_to_fill * charge_buf, energy_to_fill)

    hours = [_prague_hour(s) for s in slots]
    am_idx = [t for t, h in enumerate(hours) if h < 12]
    pm_idx = [t for t, h in enumerate(hours) if h >= 12]
    if not am_idx:
        chg_am, chg_pm = 0.0, charge_target_wh
    elif not pm_idx:
        chg_am, chg_pm = charge_target_wh, 0.0
    else:
        chg_am = charge_target_wh / 2.0
        chg_pm = charge_target_wh - chg_am

    def _slot_cap(budget_wh: float) -> int:
        """Maximum number of grid slots allowed for a Wh budget."""
        if per_slot_full_wh > 0:
            wanted = int(budget_wh / per_slot_full_wh * charge_buf) + 1
            return max(1, min(_MAX_GRID_CHARGE_CAP, wanted))
        return 6

    use_price = purchase_pricing_mode != "fixed"

    def _grid_sort_key(t: int) -> tuple[int, int, int, float, int]:
        """Ordering of grid candidates; lower tuples are picked first."""
        slot = slots[t]
        today_first = 0 if _prague_date(slot) == plan_day else 1
        before_export = (
            0
            if export_window_start is not None
            and slot.interval_start < export_window_start
            else 1
        )
        predicted = int(getattr(slot, "is_predicted_price", False))
        # With a fixed tariff the price does not discriminate between slots.
        price = float(slot.buy_price) if use_price else 0.0
        return (today_first, before_export, predicted, price, t)

    selected: set[int] = set()

    def _fill(candidates: list[int], budget_wh: float, cap: int) -> float:
        """Select candidates in priority order; return the Wh they can deliver."""
        filled_wh = 0.0
        taken = 0
        for t in sorted(candidates, key=_grid_sort_key):
            if filled_wh >= budget_wh or per_slot_full_wh <= 0 or taken >= cap:
                break
            selected.add(t)
            filled_wh += per_slot_full_wh
            taken += 1
        return filled_wh

    if use_price:
        grid_enabled = True
    else:
        # Fixed tariff: grid charging pays off only if some slot sells above
        # its buy price plus degradation.
        grid_enabled = any(
            float(s.sell_price) > float(s.buy_price) + degrad for s in slots
        )

    grid_filled_wh = 0.0
    if grid_enabled:
        cap_am = _slot_cap(chg_am)
        cap_pm = _slot_cap(chg_pm)
        grid_filled_wh += _fill(am_idx, chg_am, cap_am)
        # Hand the part of the target AM did not cover over to PM.
        chg_pm = max(chg_pm, charge_target_wh - grid_filled_wh)
        cap_pm = max(cap_pm, _slot_cap(chg_pm))
        grid_filled_wh += _fill(pm_idx, chg_pm, cap_pm)
        if use_price:
            # Negative buy prices are always allowed to charge.
            selected.update(
                t for t, s in enumerate(slots) if float(s.buy_price) < 0
            )

    # PV layer covers only what the grid layer left of the target.
    pv_layer_cap = max(charge_target_wh - grid_filled_wh, 0.0)
    pv_candidates: list[tuple[int, float, float]] = []
    for t, s in enumerate(slots):
        pv_surplus_w = max(
            0, s.pv_a_forecast_w + s.pv_b_forecast_w - s.load_baseline_w
        )
        if not pv_surplus_w > 0:
            continue
        sell = float(s.sell_price)
        if sell >= float(s.buy_price) - degrad and (
            sell < 0 or sell >= _future_sell(slots, t) - degrad
        ):
            pv_candidates.append((t, _store_score(slots, t), float(pv_surplus_w)))
    pv_candidates.sort(key=lambda c: (-c[1], c[0]))

    pv_filled_wh = 0.0
    for t, _score, surplus_w in pv_candidates:
        if pv_filled_wh >= pv_layer_cap:
            break
        selected.add(t)
        pv_filled_wh += min(surplus_w, max_p_w) * eta * INTERVAL_H
    return selected
def _select_discharge_export_slots(
    slots: list[PlanningSlot],
    battery: SimpleNamespace,
    current_soc_wh: float,
    charge_slots: set[int] | None = None,
    *,
    purchase_pricing_mode: str = "spot",
) -> set[int]:
    """Return indices of slots in which discharge-to-export is allowed.

    Copy of the discharge-export mask from ems.fn_load_planning_slots_full.

    Steps:

    1. Candidates are slots whose sell price exceeds a threshold: the
       horizon's minimum buy price plus degradation, or in ``"fixed"`` mode
       the slot's own buy price plus degradation.
    2. If some slot has a negative sell price, candidates before the first
       such slot on the same Prague day are dropped when a later slot before
       it, on that day, sells for more than ``sell + degradation``.
    3. Candidates are taken by sell price descending (ties: later slot first)
       until the buffered exportable energy is covered.
    4. If a negative sell price exists: evening slots (hour >= 17) within
       ``degradation`` of their day's evening peak are added when they pass
       the threshold; and, with enough charge, slots of hours 5-11 before the
       first negative slot near that zone's peak are added while slots of
       hours 5-16 below it are removed.

    Args:
        slots: planning slots of the horizon.
        battery: object exposing ``soc_max_wh`` and, optionally,
            ``discharge_slot_buffer``, ``min_soc_wh``,
            ``degradation_cost_czk_kwh``, ``discharge_efficiency`` and
            ``max_discharge_power_w``.
        current_soc_wh: current state of charge in Wh.
        charge_slots: accepted for interface compatibility; not used here.
        purchase_pricing_mode: ``"fixed"`` selects the per-slot threshold; any
            other value uses the horizon-wide threshold.

    Returns:
        Set of selected slot indices.  All indices are returned when the
        buffer is not positive; an empty set when nothing is exportable or
        ``slots`` is empty.
    """
    discharge_buf = float(getattr(battery, "discharge_slot_buffer", 0) or 0)
    if discharge_buf <= 0:
        return set(range(len(slots)))
    min_soc_wh = float(getattr(battery, "min_soc_wh", 0) or 0)
    soc_max_wh = float(battery.soc_max_wh)
    exportable_wh = soc_max_wh - min_soc_wh
    if exportable_wh <= 0:
        return set()
    if not slots:
        # min() over the horizon below would raise on an empty list.
        return set()

    degrad = float(getattr(battery, "degradation_cost_czk_kwh", 0.15) or 0.15)
    eta = float(getattr(battery, "discharge_efficiency", 1.0) or 1.0)
    max_p_w = float(getattr(battery, "max_discharge_power_w", 0.0) or 0.0)
    per_slot_wh = max_p_w * eta * INTERVAL_H
    discharge_target_wh = exportable_wh * discharge_buf

    # None means "compare against the slot's own buy price" (fixed tariff).
    sell_min: float | None = None
    if purchase_pricing_mode != "fixed":
        sell_min = min(float(s.buy_price) for s in slots) + degrad

    def _profitable(t: int) -> bool:
        """True when exporting in slot ``t`` beats the applicable threshold."""
        sell = float(slots[t].sell_price)
        if sell_min is None:
            return sell > float(slots[t].buy_price) + degrad
        return sell > sell_min

    candidates = [
        (t, float(slots[t].sell_price)) for t in range(len(slots)) if _profitable(t)
    ]
    candidates.sort(key=lambda c: (-c[1], -c[0]))

    first_neg = next(
        (i for i, s in enumerate(slots) if float(s.sell_price) < 0),
        None,
    )

    days: list[date] = []
    hours: list[int] = []
    neg_day: date | None = None
    if first_neg is not None:
        days = [_prague_date(s) for s in slots]
        hours = [_prague_hour(s) for s in slots]
        neg_day = days[first_neg]

        # Before the first negative slot (same day) keep a candidate only if
        # no later pre-negative slot of that day sells clearly better.
        kept: list[tuple[int, float]] = []
        for t, sell in candidates:
            if t < first_neg and days[t] == neg_day:
                has_better_later = any(
                    days[t2] == neg_day
                    and float(slots[t2].sell_price) > sell + degrad
                    for t2 in range(t + 1, first_neg)
                )
                if has_better_later:
                    continue
            kept.append((t, sell))
        candidates = kept

    selected: set[int] = set()
    covered_wh = 0.0
    for t, _sell in candidates:
        if covered_wh >= discharge_target_wh or per_slot_wh <= 0:
            break
        selected.add(t)
        covered_wh += per_slot_wh

    if first_neg is None:
        return selected

    # Evening peaks: allow slots close to the best evening sell of their day.
    evening_peak: dict[date, float] = {}
    for t, s in enumerate(slots):
        if hours[t] >= 17:
            evening_peak[days[t]] = max(
                evening_peak.get(days[t], 0.0), float(s.sell_price)
            )
    for t, s in enumerate(slots):
        peak = evening_peak.get(days[t], 0.0)
        if (
            peak > 0
            and hours[t] >= 17
            and float(s.sell_price) >= peak - degrad
            and _profitable(t)
        ):
            selected.add(t)

    # Morning zone before the first negative slot: needs some charge above
    # the minimum SoC to be worth exporting.
    preneg_min_soc = min_soc_wh + max(per_slot_wh, 1000.0)
    if first_neg > 0 and current_soc_wh >= preneg_min_soc:
        morning = [
            i
            for i in range(first_neg)
            if days[i] == neg_day and 5 <= hours[i] <= 11
        ]
        morning_sells = [
            float(slots[i].sell_price)
            for i in morning
            if float(slots[i].sell_price) >= 0
        ]
        if morning_sells:
            zone_peak = max(morning_sells)
            for i in morning:
                if float(slots[i].sell_price) >= zone_peak - degrad:
                    selected.add(i)
            for i in range(first_neg):
                if (
                    days[i] == neg_day
                    and 5 <= hours[i] < 17
                    and float(slots[i].sell_price) < zone_peak - degrad
                ):
                    selected.discard(i)
    return selected
def _prague_hour(s: PlanningSlot) -> int:
    """Return the Europe/Prague wall-clock hour at which the slot starts.

    A naive ``interval_start`` is taken to be UTC.
    """
    start = s.interval_start
    aware = start if start.tzinfo is not None else start.replace(tzinfo=timezone.utc)
    return aware.astimezone(_PRAGUE).hour
def _slot(
    *,
    buy: float,
    sell: float = 1.0,
    pv: int = 0,
    load: int = 2_000,
    hour_utc: int = 12,
    predicted: bool = False,
    interval_start: datetime | None = None,
) -> PlanningSlot:
    """Build a PlanningSlot fixture.

    ``hour_utc`` is used only when ``interval_start`` is not given; the slot
    then starts on 2026-05-19 at that UTC hour.  The PV forecast is put on
    string B, string A is always zero, and no EV is connected.
    """
    start = (
        interval_start
        if interval_start is not None
        else datetime(2026, 5, 19, hour_utc, 0, tzinfo=timezone.utc)
    )
    fields = dict(
        interval_start=start,
        buy_price=buy,
        sell_price=sell,
        pv_a_forecast_w=0,
        pv_b_forecast_w=pv,
        load_baseline_w=load,
        ev1_connected=False,
        ev2_connected=False,
        is_predicted_price=predicted,
    )
    return PlanningSlot(**fields)
def _battery(
    *,
    charge_buf: float = 1.3,
    discharge_buf: float = 1.5,
    uc_wh: float = 64_000.0,
    soc_max_pct: float = 95.0,
    min_soc_pct: float = 10.0,
    reserve_soc_pct: float = 20.0,
    max_charge_w: float = 18_000.0,
    max_discharge_w: float = 18_000.0,
    charge_eff: float = 0.95,
    discharge_eff: float = 0.95,
    degrad: float = 0.15,
) -> SimpleNamespace:
    """Build a battery fixture; SoC limits are percentages of ``uc_wh`` turned into Wh."""

    def _share_of_capacity(pct: float) -> float:
        return pct / 100.0 * uc_wh

    cfg = SimpleNamespace()
    cfg.usable_capacity_wh = uc_wh
    cfg.min_soc_wh = _share_of_capacity(min_soc_pct)
    cfg.reserve_soc_wh = _share_of_capacity(reserve_soc_pct)
    cfg.soc_max_wh = _share_of_capacity(soc_max_pct)
    cfg.max_charge_power_w = max_charge_w
    cfg.max_discharge_power_w = max_discharge_w
    cfg.charge_efficiency = charge_eff
    cfg.discharge_efficiency = discharge_eff
    cfg.charge_slot_buffer = charge_buf
    cfg.discharge_slot_buffer = discharge_buf
    cfg.degradation_cost_czk_kwh = degrad
    return cfg
class SelectChargeSlotsTests(unittest.TestCase):
    """Scenarios for the charge mask produced by _select_charge_slots."""

    def test_buffer_zero_returns_all_slots(self) -> None:
        """A zero charge buffer yields every slot index."""
        horizon = []
        for _ in range(4):
            horizon.append(_slot(buy=3.0))
        chosen = _select_charge_slots(
            horizon, _battery(charge_buf=0.0), current_soc_wh=0.0
        )
        self.assertEqual(chosen, {0, 1, 2, 3})

    def test_returns_all_when_battery_is_full(self) -> None:
        """A battery above its maximum SoC yields every slot index."""
        horizon = []
        for _ in range(3):
            horizon.append(_slot(buy=0.1))
        pack = _battery()
        above_max_wh = pack.soc_max_wh + 1.0
        chosen = _select_charge_slots(horizon, pack, current_soc_wh=above_max_wh)
        self.assertEqual(chosen, {0, 1, 2})

    def test_pv_surplus_high_store_score_selected(self) -> None:
        """A slot with a higher store_score (storing beats exporting) takes precedence."""
        rows = [(1.5, 0.01, 8), (1.5, 0.50, 9), (0.5, 0.40, 10)]
        horizon = [
            _slot(buy=buy, sell=sell, pv=8_000, load=2_000, hour_utc=hour)
            for buy, sell, hour in rows
        ]
        pack = _battery(
            charge_buf=1.3, uc_wh=1_000.0, soc_max_pct=100.0, max_charge_w=6_000.0
        )
        chosen = _select_charge_slots(horizon, pack, current_soc_wh=0.0)
        self.assertIn(2, chosen, "Nejlevnější buy (grid B) má být vybrán")
        self.assertNotIn(
            1, chosen, "Dražší AM slot (buy 1.5) nemá přednost před levným buy 0.5"
        )

    def test_non_pv_slots_selected_with_am_pm_budget(self) -> None:
        """Cheap PM slot; an AM slot pricier than the lookahead minimum may be skipped."""
        rows = [(0.5, 4), (3.0, 5), (0.4, 14), (9.9, 15)]
        horizon = [_slot(buy=buy, hour_utc=hour) for buy, hour in rows]
        pack = _battery(
            charge_buf=1.3, uc_wh=5_000.0, soc_max_pct=100.0, max_charge_w=18_000.0
        )
        chosen = _select_charge_slots(horizon, pack, current_soc_wh=0.0)
        self.assertIn(2, chosen, "Nejlevnější buy v horizontu (PM) musí být vybrán")

    def test_pm_grid_prefers_today_before_export_over_tomorrow_cheaper(self) -> None:
        """Today's cheap PM slots before the evening export outrank tomorrow's min(buy)."""
        t0 = datetime(2026, 5, 21, 10, 0, tzinfo=timezone.utc)
        rows = [
            (0.72, -0.1, 500, 3000, timedelta()),
            (0.68, -0.15, 500, 3000, timedelta(minutes=15)),
            (5.5, 3.8, 0, 2500, timedelta(hours=7, minutes=30)),
            (0.50, -0.3, 2000, 5000, timedelta(hours=26)),
        ]
        horizon = [
            _slot(buy=buy, sell=sell, pv=pv, load=load, interval_start=t0 + offset)
            for buy, sell, pv, load, offset in rows
        ]
        pack = _battery(uc_wh=64_000.0)
        soc_wh = 0.46 * pack.usable_capacity_wh
        chosen = _select_charge_slots(horizon, pack, current_soc_wh=soc_wh)
        self.assertIn(0, chosen)
        self.assertIn(1, chosen)
        self.assertEqual(
            min(chosen),
            0,
            "Před exportním oknem musí být vybrány dnešní levné PM sloty dřív než zítřejší min(buy)",
        )

    def test_cheap_pm_with_pv_surplus_gets_grid_charge(self) -> None:
        """Regression home-01: cheap PM VT (~0.8) with PV must still pass grid mask B."""
        t0 = datetime(2026, 5, 21, 10, 0, tzinfo=timezone.utc)
        rows = [
            (0.80, -0.08, 2_500, 3_400, timedelta()),
            (0.72, -0.13, 500, 3_400, timedelta(minutes=15)),
            (2.50, 1.40, 2_000, 3_800, timedelta(hours=5)),
            (5.50, 3.80, 100, 2_900, timedelta(hours=9)),
        ]
        horizon = [
            _slot(buy=buy, sell=sell, pv=pv, load=load, interval_start=t0 + offset)
            for buy, sell, pv, load, offset in rows
        ]
        pack = _battery(uc_wh=64_000.0, charge_buf=1.05)
        soc_wh = 0.88 * pack.usable_capacity_wh
        chosen = _select_charge_slots(horizon, pack, current_soc_wh=soc_wh)
        self.assertIn(1, chosen, "Levnější PM slot má allow_charge i s FVE")
        self.assertIn(0, chosen)
        self.assertLessEqual(
            len(chosen), 2, "malý Wh rozpočet → jen nejlevnější PM sloty"
        )

    def test_vt_before_nt_skips_expensive_pm_slot(self) -> None:
        """Regression home-01: 12:45 VT is pricey, NT 15 min later is cheap; no PM grid charge at 12:45."""
        t0 = datetime(2026, 5, 21, 10, 45, tzinfo=timezone.utc)
        rows = [
            (1.49, -0.04, timedelta()),
            (0.86, 0.01, timedelta(minutes=15)),
            (0.86, 0.01, timedelta(minutes=30)),
        ]
        horizon = [
            _slot(buy=buy, sell=sell, pv=0, load=3_500, interval_start=t0 + offset)
            for buy, sell, offset in rows
        ]
        pack = _battery(uc_wh=64_000.0, charge_buf=1.0)
        soc_wh = 0.92 * pack.usable_capacity_wh
        chosen = _select_charge_slots(horizon, pack, current_soc_wh=soc_wh)
        self.assertNotIn(
            0, chosen, "Při malém rozpočtu má přednost levnější NT, ne VT 1.49"
        )
        self.assertTrue({1, 2} & chosen, "NT slot(y) mohou být vybrány")

    def test_pm_grid_gets_unused_am_wh_budget(self) -> None:
        """Unused AM budget lets cheap afternoon PM slots receive allow_charge."""
        t0 = datetime(2026, 5, 22, 6, 0, tzinfo=timezone.utc)
        # hour_utc is omitted: _slot ignores it whenever interval_start is given.
        rows = [
            (0.55, -0.2, 0),
            (0.58, -0.2, 1),
            (0.52, -0.25, 8),
            (0.50, -0.25, 9),
            (5.5, 3.8, 14),
        ]
        horizon = [
            _slot(buy=buy, sell=sell, interval_start=t0 + timedelta(hours=after_h))
            for buy, sell, after_h in rows
        ]
        pack = _battery(charge_buf=1.3, uc_wh=64_000.0)
        soc_wh = 0.12 * pack.usable_capacity_wh
        chosen = _select_charge_slots(horizon, pack, current_soc_wh=soc_wh)
        cheap_pm = {2, 3}
        self.assertTrue(
            cheap_pm & chosen,
            "po levném AM má PM dostat grid charge z nevyčerpaného rozpočtu",
        )

    def test_ote_slots_prioritized_over_predicted(self) -> None:
        """At equal price an OTE slot (is_predicted=false) outranks a predicted one."""
        horizon = [
            _slot(buy=2.00, sell=2.0, hour_utc=13, predicted=flag)
            for flag in (False, True)
        ]
        pack = _battery(
            charge_buf=1.3, uc_wh=3_000.0, soc_max_pct=100.0, max_charge_w=18_000.0
        )
        chosen = _select_charge_slots(horizon, pack, current_soc_wh=0.0)
        self.assertIn(0, chosen)
        self.assertNotIn(1, chosen)
class SelectDischargeExportSlotsTests(unittest.TestCase):
    """Scenarios for the mask produced by _select_discharge_export_slots."""

    def test_evening_sell_allowed_when_cheaper_than_ref_charge_buy(self) -> None:
        """Evening slots selling far above the cheapest buy price are exportable."""
        rows = [(0.50, -0.30, 6), (0.51, -0.29, 7), (5.60, 3.30, 16), (5.98, 3.37, 17)]
        horizon = [_slot(buy=buy, sell=sell, hour_utc=hour) for buy, sell, hour in rows]
        pack = _battery(uc_wh=64_000.0)
        charge_mask = _select_charge_slots(
            horizon, pack, current_soc_wh=0.2 * pack.usable_capacity_wh
        )
        export_mask = _select_discharge_export_slots(
            horizon,
            pack,
            current_soc_wh=0.2 * pack.usable_capacity_wh,
            charge_slots=charge_mask,
        )
        self.assertIn(2, export_mask)
        self.assertIn(3, export_mask)

    def test_export_excluded_when_sell_below_ref_buy_plus_degradation(self) -> None:
        """A slot selling at or below min(buy) + degradation is not exportable."""
        morning = _slot(buy=0.40, sell=0.10, hour_utc=8)
        evening = _slot(buy=4.00, sell=0.50, hour_utc=18)
        pack = _battery(uc_wh=10_000.0, discharge_buf=2.0)
        export_mask = _select_discharge_export_slots(
            [morning, evening], pack, current_soc_wh=0.0
        )
        self.assertNotIn(1, export_mask)
class FixedPurchasePricingTests(unittest.TestCase):
    """Scenarios for purchase_pricing_mode="fixed" in both masks."""

    def test_fixed_skips_grid_charge_when_no_sell_arbitrage(self) -> None:
        """Fixed buy with no sell above buy + degradation means no grid charging."""
        horizon = [
            _slot(buy=6.35, sell=sell, hour_utc=hour, load=500)
            for sell, hour in ((2.0, 14), (3.5, 18))
        ]
        pack = _battery(charge_buf=1.3, uc_wh=12_500.0)
        soc_wh = 0.4 * pack.usable_capacity_wh
        chosen = _select_charge_slots(
            horizon, pack, current_soc_wh=soc_wh, purchase_pricing_mode="fixed"
        )
        self.assertEqual(chosen, set())

    def test_fixed_grid_charge_before_evening_export(self) -> None:
        """BA81: constant buy and evening sell > buy + degradation lead to NT/AM grid slots."""
        midnight = datetime(2026, 5, 24, 0, 0, tzinfo=_PRAGUE)
        local_starts = [midnight + timedelta(minutes=15 * i) for i in range(96)]
        horizon: list[PlanningSlot] = [
            _slot(
                buy=3.088,
                sell=3.75 if local.hour >= 18 else 2.8,
                load=200,
                interval_start=local.astimezone(timezone.utc),
            )
            for local in local_starts
        ]
        pack = _battery(charge_buf=1.3, uc_wh=12_500.0)
        soc_wh = 0.33 * pack.usable_capacity_wh
        chosen = _select_charge_slots(
            horizon, pack, current_soc_wh=soc_wh, purchase_pricing_mode="fixed"
        )
        night = {idx for idx in chosen if _prague_hour(horizon[idx]) < 8}
        self.assertGreater(
            len(night), 0, "očekáváno grid nabíjení v noci před večerním výkupem"
        )

    def test_fixed_allows_discharge_on_high_sell(self) -> None:
        """Fixed mode allows export in every slot whose sell exceeds buy + degradation."""
        horizon = [
            _slot(buy=3.09, sell=sell, hour_utc=hour)
            for sell, hour in ((1.0, 10), (3.8, 18), (3.5, 19))
        ]
        pack = _battery(uc_wh=12_500.0, discharge_buf=2.0, degrad=0.3)
        soc_wh = 0.5 * pack.usable_capacity_wh
        export_mask = _select_discharge_export_slots(
            horizon, pack, current_soc_wh=soc_wh, purchase_pricing_mode="fixed"
        )
        self.assertIn(1, export_mask)
        self.assertIn(2, export_mask, "oba sloty sell > buy + degrad")
# Run the test cases of this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()