790 lines
28 KiB
Python
790 lines
28 KiB
Python
"""Pre-selection nabíjecích a exportních slotů – referenční Python.
|
||
|
||
Logika je v DB: ems.fn_load_planning_slots_full. Kopie algoritmu pro unit testy bez PG.
|
||
|
||
Charge mask:
|
||
B) Grid AM/PM: nejlevnější sloty do Wh rozpočtu (den plánu → před exportním oknem → buy ASC).
|
||
A) PV-surplus: store_score DESC; jen pokud sell ≥ future_sell − degrad.
|
||
|
||
Discharge-export mask:
|
||
ref_buy = min(buy) celého horizontu.
|
||
Top sloty dle sell_price desc kde sell > ref_buy + degradation.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import unittest
|
||
from datetime import date, datetime, timezone, timedelta
|
||
from types import SimpleNamespace
|
||
from zoneinfo import ZoneInfo
|
||
|
||
from services.planning_engine import INTERVAL_H, PlanningSlot
|
||
|
||
_PRAGUE = ZoneInfo("Europe/Prague")
|
||
_LOOKAHEAD_SLOTS = 4
|
||
_BUY_LOOKAHEAD_EPS = 0.05
|
||
_BUY_CHARGE_BAND = 0.40
|
||
_MAX_GRID_CHARGE_CAP = 24
|
||
|
||
|
||
def _prague_date(s: PlanningSlot) -> date:
|
||
return s.interval_start.astimezone(_PRAGUE).date()
|
||
|
||
|
||
def _export_window_start_by_day(
|
||
slots: list[PlanningSlot], degrad: float
|
||
) -> dict[date, datetime]:
|
||
"""Kopie R__063: první sell > min(buy) téhož kalendářního dne (Prague) + degrad."""
|
||
out: dict[date, datetime] = {}
|
||
for s in slots:
|
||
day = _prague_date(s)
|
||
day_min = min(float(x.buy_price) for x in slots if _prague_date(x) == day)
|
||
if float(s.sell_price) > day_min + degrad:
|
||
prev = out.get(day)
|
||
if prev is None or s.interval_start < prev:
|
||
out[day] = s.interval_start
|
||
return out
|
||
|
||
|
||
def _before_day_export(
|
||
slots: list[PlanningSlot], t: int, export_by_day: dict[date, datetime]
|
||
) -> bool:
|
||
start = export_by_day.get(_prague_date(slots[t]))
|
||
return start is not None and slots[t].interval_start < start
|
||
|
||
|
||
def _future_sell(slots: list[PlanningSlot], t: int) -> float:
|
||
tail = [float(slots[i].sell_price) for i in range(t + 1, len(slots))]
|
||
return max(tail) if tail else float(slots[t].sell_price)
|
||
|
||
|
||
def _buy_min_next_n(
|
||
slots: list[PlanningSlot],
|
||
t: int,
|
||
n: int = _LOOKAHEAD_SLOTS,
|
||
*,
|
||
export_by_day: dict[date, datetime] | None = None,
|
||
) -> float | None:
|
||
tail: list[float] = []
|
||
for i in range(t + 1, min(t + 1 + n, len(slots))):
|
||
day_start = export_by_day.get(_prague_date(slots[i])) if export_by_day else None
|
||
if day_start is None or slots[i].interval_start < day_start:
|
||
tail.append(float(slots[i].buy_price))
|
||
return min(tail) if tail else None
|
||
|
||
|
||
def _store_score(slots: list[PlanningSlot], t: int) -> float:
|
||
s = slots[t]
|
||
buy = float(s.buy_price)
|
||
sell = float(s.sell_price)
|
||
fso = _future_sell(slots, t)
|
||
return fso - sell - max(0.0, buy - sell)
|
||
|
||
|
||
def _select_charge_slots(
|
||
slots: list[PlanningSlot],
|
||
battery: SimpleNamespace,
|
||
current_soc_wh: float,
|
||
*,
|
||
purchase_pricing_mode: str = "spot",
|
||
) -> set[int]:
|
||
"""Kopie logiky z ems.fn_load_planning_slots_full (charge mask)."""
|
||
charge_buf = float(getattr(battery, "charge_slot_buffer", 0) or 0)
|
||
if charge_buf <= 0:
|
||
return set(range(len(slots)))
|
||
|
||
energy_to_fill = float(battery.soc_max_wh) - float(current_soc_wh)
|
||
if energy_to_fill <= 0:
|
||
return set(range(len(slots)))
|
||
|
||
reserve_wh = float(getattr(battery, "reserve_soc_wh", 0) or 0)
|
||
degrad = float(getattr(battery, "degradation_cost_czk_kwh", 0.15) or 0.15)
|
||
ref_buy_am = min(
|
||
(float(s.buy_price) for s in slots if _prague_hour(s) < 12),
|
||
default=min(float(s.buy_price) for s in slots),
|
||
)
|
||
ref_buy_pm = min(
|
||
(float(s.buy_price) for s in slots if _prague_hour(s) >= 12),
|
||
default=min(float(s.buy_price) for s in slots),
|
||
)
|
||
ref_buy_global = min(float(s.buy_price) for s in slots)
|
||
export_by_day = _export_window_start_by_day(slots, degrad)
|
||
plan_day = _prague_date(slots[0])
|
||
|
||
eta = float(getattr(battery, "charge_efficiency", 1.0) or 1.0)
|
||
max_p_w = float(getattr(battery, "max_charge_power_w", 0.0) or 0.0)
|
||
per_slot_full_wh = max_p_w * eta * INTERVAL_H
|
||
soc_max_wh = float(getattr(battery, "soc_max_wh", 0) or 0)
|
||
if charge_buf > 0:
|
||
charge_target_wh = min(
|
||
max(energy_to_fill, 0.0) * charge_buf,
|
||
max(soc_max_wh - float(current_soc_wh), 0.0),
|
||
)
|
||
elif current_soc_wh >= reserve_wh:
|
||
charge_target_wh = max(energy_to_fill, 0.0)
|
||
else:
|
||
charge_target_wh = min(
|
||
max(energy_to_fill, 0.0) * charge_buf,
|
||
max(energy_to_fill, 0.0),
|
||
)
|
||
|
||
n_am = sum(1 for s in slots if _prague_hour(s) < 12)
|
||
n_pm = len(slots) - n_am
|
||
if n_am <= 0:
|
||
chg_am = 0.0
|
||
chg_pm = charge_target_wh
|
||
elif n_pm <= 0:
|
||
chg_am = charge_target_wh
|
||
chg_pm = 0.0
|
||
else:
|
||
chg_am = charge_target_wh / 2.0
|
||
chg_pm = charge_target_wh - chg_am
|
||
|
||
selected: set[int] = set()
|
||
grid_filled_wh = 0.0
|
||
|
||
buf_mult = charge_buf if charge_buf > 0 else 1.0
|
||
cap_am = (
|
||
max(
|
||
1,
|
||
min(
|
||
_MAX_GRID_CHARGE_CAP,
|
||
int(chg_am / per_slot_full_wh * buf_mult) + 1,
|
||
),
|
||
)
|
||
if per_slot_full_wh > 0
|
||
else 6
|
||
)
|
||
cap_pm = (
|
||
max(
|
||
1,
|
||
min(
|
||
_MAX_GRID_CHARGE_CAP,
|
||
int(chg_pm / per_slot_full_wh * buf_mult) + 1,
|
||
),
|
||
)
|
||
if per_slot_full_wh > 0
|
||
else 6
|
||
)
|
||
|
||
def _grid_sort_key(t: int, pred: bool, price: float) -> tuple[int, int, int, float, int]:
|
||
today_first = 0 if _prague_date(slots[t]) == plan_day else 1
|
||
before_export = 0 if _before_day_export(slots, t, export_by_day) else 1
|
||
return (today_first, before_export, int(pred), price, t)
|
||
|
||
if purchase_pricing_mode != "fixed":
|
||
am_candidates = [
|
||
(t, getattr(slots[t], "is_predicted_price", False), float(slots[t].buy_price))
|
||
for t in range(len(slots))
|
||
if _prague_hour(slots[t]) < 12
|
||
]
|
||
am_candidates.sort(key=lambda x: _grid_sort_key(x[0], x[1], x[2]))
|
||
cum = 0.0
|
||
grid_am = 0
|
||
for t, _pred, _price in am_candidates:
|
||
if cum >= chg_am or per_slot_full_wh <= 0 or grid_am >= cap_am:
|
||
break
|
||
selected.add(t)
|
||
cum += per_slot_full_wh
|
||
grid_am += 1
|
||
grid_filled_wh += cum
|
||
chg_pm = max(chg_pm, charge_target_wh - grid_filled_wh)
|
||
if per_slot_full_wh > 0:
|
||
cap_pm = max(
|
||
cap_pm,
|
||
min(
|
||
_MAX_GRID_CHARGE_CAP,
|
||
int(chg_pm / per_slot_full_wh * buf_mult) + 1,
|
||
),
|
||
)
|
||
|
||
pm_candidates = [
|
||
(t, getattr(slots[t], "is_predicted_price", False), float(slots[t].buy_price))
|
||
for t in range(len(slots))
|
||
if _prague_hour(slots[t]) >= 12
|
||
]
|
||
pm_candidates.sort(key=lambda x: _grid_sort_key(x[0], x[1], x[2]))
|
||
cum = 0.0
|
||
grid_pm = 0
|
||
for t, _pred, _price in pm_candidates:
|
||
if cum >= chg_pm or per_slot_full_wh <= 0 or grid_pm >= cap_pm:
|
||
break
|
||
selected.add(t)
|
||
cum += per_slot_full_wh
|
||
grid_pm += 1
|
||
grid_filled_wh += cum
|
||
|
||
for t, s in enumerate(slots):
|
||
if float(s.buy_price) < 0:
|
||
selected.add(t)
|
||
|
||
elif purchase_pricing_mode == "fixed" and any(
|
||
float(s.sell_price) > float(s.buy_price) + degrad for s in slots
|
||
):
|
||
am_candidates = [
|
||
(t, getattr(slots[t], "is_predicted_price", False))
|
||
for t in range(len(slots))
|
||
if _prague_hour(slots[t]) < 12
|
||
]
|
||
am_candidates.sort(
|
||
key=lambda x: (
|
||
_grid_sort_key(x[0], x[1], 0.0)[0],
|
||
_grid_sort_key(x[0], x[1], 0.0)[1],
|
||
_grid_sort_key(x[0], x[1], 0.0)[2],
|
||
x[0],
|
||
)
|
||
)
|
||
cum = 0.0
|
||
grid_am = 0
|
||
for t, _pred in am_candidates:
|
||
if cum >= chg_am or per_slot_full_wh <= 0 or grid_am >= cap_am:
|
||
break
|
||
selected.add(t)
|
||
cum += per_slot_full_wh
|
||
grid_am += 1
|
||
grid_filled_wh += cum
|
||
chg_pm = max(chg_pm, charge_target_wh - grid_filled_wh)
|
||
if per_slot_full_wh > 0:
|
||
cap_pm = max(
|
||
cap_pm,
|
||
min(
|
||
_MAX_GRID_CHARGE_CAP,
|
||
int(chg_pm / per_slot_full_wh * buf_mult) + 1,
|
||
),
|
||
)
|
||
pm_candidates = [
|
||
(t, getattr(slots[t], "is_predicted_price", False))
|
||
for t in range(len(slots))
|
||
if _prague_hour(slots[t]) >= 12
|
||
]
|
||
pm_candidates.sort(
|
||
key=lambda x: (
|
||
_grid_sort_key(x[0], x[1], 0.0)[0],
|
||
_grid_sort_key(x[0], x[1], 0.0)[1],
|
||
_grid_sort_key(x[0], x[1], 0.0)[2],
|
||
x[0],
|
||
)
|
||
)
|
||
cum = 0.0
|
||
grid_pm = 0
|
||
for t, _pred in pm_candidates:
|
||
if cum >= chg_pm or per_slot_full_wh <= 0 or grid_pm >= cap_pm:
|
||
break
|
||
selected.add(t)
|
||
cum += per_slot_full_wh
|
||
grid_pm += 1
|
||
grid_filled_wh += cum
|
||
|
||
pv_layer_cap = max(charge_target_wh - grid_filled_wh, 0.0)
|
||
pv_candidates: list[tuple[int, float, float]] = []
|
||
for t, s in enumerate(slots):
|
||
pv_surplus_w = max(0, s.pv_a_forecast_w + s.pv_b_forecast_w - s.load_baseline_w)
|
||
fso = _future_sell(slots, t)
|
||
if (
|
||
pv_surplus_w > 0
|
||
and float(s.sell_price) >= float(s.buy_price) - degrad
|
||
and (
|
||
float(s.sell_price) < 0
|
||
or float(s.sell_price) >= fso - degrad
|
||
)
|
||
):
|
||
pv_candidates.append((t, _store_score(slots, t), float(pv_surplus_w)))
|
||
|
||
pv_candidates.sort(key=lambda x: (-x[1], x[0]))
|
||
cum = 0.0
|
||
for t, _score, pv_surplus_w in pv_candidates:
|
||
if cum >= pv_layer_cap:
|
||
break
|
||
selected.add(t)
|
||
cum += min(pv_surplus_w, max_p_w) * eta * INTERVAL_H
|
||
|
||
return selected
|
||
|
||
|
||
def _select_discharge_export_slots(
|
||
slots: list[PlanningSlot],
|
||
battery: SimpleNamespace,
|
||
current_soc_wh: float,
|
||
charge_slots: set[int] | None = None,
|
||
*,
|
||
purchase_pricing_mode: str = "spot",
|
||
) -> set[int]:
|
||
"""Kopie logiky z ems.fn_load_planning_slots_full (discharge-export mask)."""
|
||
discharge_buf = float(getattr(battery, "discharge_slot_buffer", 0) or 0)
|
||
if discharge_buf <= 0:
|
||
return set(range(len(slots)))
|
||
|
||
min_soc_wh = float(getattr(battery, "min_soc_wh", 0) or 0)
|
||
soc_max_wh = float(battery.soc_max_wh)
|
||
exportable_wh = soc_max_wh - min_soc_wh
|
||
if exportable_wh <= 0:
|
||
return set()
|
||
|
||
degrad = float(getattr(battery, "degradation_cost_czk_kwh", 0.15) or 0.15)
|
||
eta = float(getattr(battery, "discharge_efficiency", 1.0) or 1.0)
|
||
max_p_w = float(getattr(battery, "max_discharge_power_w", 0.0) or 0.0)
|
||
per_slot_wh = max_p_w * eta * INTERVAL_H
|
||
discharge_target_wh = exportable_wh * discharge_buf
|
||
|
||
ref_buy = min(float(s.buy_price) for s in slots)
|
||
|
||
if purchase_pricing_mode == "fixed":
|
||
sell_min = None # per-slot buy + degrad below
|
||
else:
|
||
sell_min = ref_buy + degrad
|
||
candidates = [
|
||
(t, float(slots[t].sell_price))
|
||
for t in range(len(slots))
|
||
if (
|
||
float(slots[t].sell_price) > float(slots[t].buy_price) + degrad
|
||
if purchase_pricing_mode == "fixed"
|
||
else float(slots[t].sell_price) > sell_min
|
||
)
|
||
]
|
||
candidates.sort(key=lambda x: (-x[1], -x[0]))
|
||
|
||
first_neg = next(
|
||
(i for i, s in enumerate(slots) if float(s.sell_price) < 0),
|
||
None,
|
||
)
|
||
neg_day = _prague_date(slots[first_neg]) if first_neg is not None else None
|
||
|
||
if first_neg is not None and neg_day is not None:
|
||
filtered: list[tuple[int, float]] = []
|
||
for t, sell in candidates:
|
||
if t >= first_neg:
|
||
filtered.append((t, sell))
|
||
continue
|
||
if _prague_date(slots[t]) != neg_day:
|
||
filtered.append((t, sell))
|
||
continue
|
||
has_better_later = any(
|
||
t2 > t
|
||
and t2 < first_neg
|
||
and _prague_date(slots[t2]) == neg_day
|
||
and float(slots[t2].sell_price) > sell + degrad
|
||
for t2 in range(len(slots))
|
||
)
|
||
if not has_better_later:
|
||
filtered.append((t, sell))
|
||
candidates = filtered
|
||
|
||
selected: set[int] = set()
|
||
cum = 0.0
|
||
for t, _sell in candidates:
|
||
if cum >= discharge_target_wh or per_slot_wh <= 0:
|
||
break
|
||
selected.add(t)
|
||
cum += per_slot_wh
|
||
|
||
if first_neg is not None and neg_day is not None:
|
||
evening_by_day: dict = {}
|
||
for t, s in enumerate(slots):
|
||
d = _prague_date(s)
|
||
if _prague_hour(s) < 17:
|
||
continue
|
||
evening_by_day[d] = max(evening_by_day.get(d, 0.0), float(s.sell_price))
|
||
for t, s in enumerate(slots):
|
||
d = _prague_date(s)
|
||
peak = evening_by_day.get(d, 0.0)
|
||
if peak > 0 and _prague_hour(s) >= 17 and float(s.sell_price) >= peak - degrad:
|
||
if purchase_pricing_mode == "fixed":
|
||
if float(s.sell_price) > float(s.buy_price) + degrad:
|
||
selected.add(t)
|
||
elif float(s.sell_price) > sell_min:
|
||
selected.add(t)
|
||
|
||
preneg_min_soc = min_soc_wh + max(per_slot_wh, 1000.0)
|
||
if (
|
||
first_neg is not None
|
||
and first_neg > 0
|
||
and current_soc_wh >= preneg_min_soc
|
||
and neg_day is not None
|
||
):
|
||
morning_sells = [
|
||
float(slots[i].sell_price)
|
||
for i in range(first_neg)
|
||
if float(slots[i].sell_price) >= 0
|
||
and _prague_date(slots[i]) == neg_day
|
||
and 5 <= _prague_hour(slots[i]) <= 11
|
||
]
|
||
if morning_sells:
|
||
zone_peak = max(morning_sells)
|
||
for i in range(first_neg):
|
||
if (
|
||
_prague_date(slots[i]) == neg_day
|
||
and 5 <= _prague_hour(slots[i]) <= 11
|
||
and float(slots[i].sell_price) >= zone_peak - degrad
|
||
):
|
||
selected.add(i)
|
||
for i in range(first_neg):
|
||
if _prague_date(slots[i]) != neg_day:
|
||
continue
|
||
h = _prague_hour(slots[i])
|
||
if 5 <= h < 17 and float(slots[i].sell_price) < zone_peak - degrad:
|
||
selected.discard(i)
|
||
|
||
return selected
|
||
|
||
|
||
def _prague_hour(s: PlanningSlot) -> int:
|
||
dt = s.interval_start
|
||
if dt.tzinfo is None:
|
||
dt = dt.replace(tzinfo=timezone.utc)
|
||
return dt.astimezone(_PRAGUE).hour
|
||
|
||
|
||
def _slot(
|
||
*,
|
||
buy: float,
|
||
sell: float = 1.0,
|
||
pv: int = 0,
|
||
load: int = 2_000,
|
||
hour_utc: int = 12,
|
||
predicted: bool = False,
|
||
interval_start: datetime | None = None,
|
||
) -> PlanningSlot:
|
||
if interval_start is None:
|
||
interval_start = datetime(2026, 5, 19, hour_utc, 0, tzinfo=timezone.utc)
|
||
return PlanningSlot(
|
||
interval_start=interval_start,
|
||
buy_price=buy,
|
||
sell_price=sell,
|
||
pv_a_forecast_w=0,
|
||
pv_b_forecast_w=pv,
|
||
load_baseline_w=load,
|
||
ev1_connected=False,
|
||
ev2_connected=False,
|
||
is_predicted_price=predicted,
|
||
)
|
||
|
||
|
||
def _battery(
|
||
*,
|
||
charge_buf: float = 1.3,
|
||
discharge_buf: float = 1.5,
|
||
uc_wh: float = 64_000.0,
|
||
soc_max_pct: float = 95.0,
|
||
min_soc_pct: float = 10.0,
|
||
reserve_soc_pct: float = 20.0,
|
||
max_charge_w: float = 18_000.0,
|
||
max_discharge_w: float = 18_000.0,
|
||
charge_eff: float = 0.95,
|
||
discharge_eff: float = 0.95,
|
||
degrad: float = 0.15,
|
||
) -> SimpleNamespace:
|
||
uc = uc_wh
|
||
return SimpleNamespace(
|
||
usable_capacity_wh=uc,
|
||
min_soc_wh=min_soc_pct / 100.0 * uc,
|
||
reserve_soc_wh=reserve_soc_pct / 100.0 * uc,
|
||
soc_max_wh=soc_max_pct / 100.0 * uc,
|
||
max_charge_power_w=max_charge_w,
|
||
max_discharge_power_w=max_discharge_w,
|
||
charge_efficiency=charge_eff,
|
||
discharge_efficiency=discharge_eff,
|
||
charge_slot_buffer=charge_buf,
|
||
discharge_slot_buffer=discharge_buf,
|
||
degradation_cost_czk_kwh=degrad,
|
||
)
|
||
|
||
|
||
class SelectChargeSlotsTests(unittest.TestCase):
|
||
def test_buffer_zero_returns_all_slots(self) -> None:
|
||
slots = [_slot(buy=3.0) for _ in range(4)]
|
||
battery = _battery(charge_buf=0.0)
|
||
out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
|
||
self.assertEqual(out, set(range(4)))
|
||
|
||
def test_returns_all_when_battery_is_full(self) -> None:
|
||
slots = [_slot(buy=0.1) for _ in range(3)]
|
||
battery = _battery()
|
||
out = _select_charge_slots(
|
||
slots, battery, current_soc_wh=battery.soc_max_wh + 1.0
|
||
)
|
||
self.assertEqual(out, set(range(3)))
|
||
|
||
def test_pv_surplus_high_store_score_selected(self) -> None:
|
||
"""Slot s vyšším store_score (lepší uložení vs export) má přednost."""
|
||
slots = [
|
||
_slot(buy=1.5, sell=0.01, pv=8_000, load=2_000, hour_utc=8),
|
||
_slot(buy=1.5, sell=0.50, pv=8_000, load=2_000, hour_utc=9),
|
||
_slot(buy=0.5, sell=0.40, pv=8_000, load=2_000, hour_utc=10),
|
||
]
|
||
battery = _battery(
|
||
charge_buf=1.3, uc_wh=1_000.0, soc_max_pct=100.0, max_charge_w=6_000.0
|
||
)
|
||
out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
|
||
self.assertIn(2, out, "Nejlevnější buy (grid B) má být vybrán")
|
||
self.assertNotIn(1, out, "Dražší AM slot (buy 1.5) nemá přednost před levným buy 0.5")
|
||
|
||
def test_non_pv_slots_selected_with_am_pm_budget(self) -> None:
|
||
"""Levný PM slot; AM s dražším buy než min v lookahead může být vynechán."""
|
||
slots = [
|
||
_slot(buy=0.5, hour_utc=4),
|
||
_slot(buy=3.0, hour_utc=5),
|
||
_slot(buy=0.4, hour_utc=14),
|
||
_slot(buy=9.9, hour_utc=15),
|
||
]
|
||
battery = _battery(
|
||
charge_buf=1.3, uc_wh=5_000.0, soc_max_pct=100.0, max_charge_w=18_000.0
|
||
)
|
||
out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
|
||
self.assertIn(2, out, "Nejlevnější buy v horizontu (PM) musí být vybrán")
|
||
|
||
def test_pm_grid_prefers_today_before_export_over_tomorrow_cheaper(self) -> None:
|
||
"""Dnes PM levné před večerním exportem má prioritu před zítřejším min(buy)."""
|
||
base = datetime(2026, 5, 21, 10, 0, tzinfo=timezone.utc)
|
||
slots = [
|
||
_slot(buy=0.72, sell=-0.1, pv=500, load=3000, interval_start=base),
|
||
_slot(buy=0.68, sell=-0.15, pv=500, load=3000, interval_start=base + timedelta(minutes=15)),
|
||
_slot(
|
||
buy=5.5,
|
||
sell=3.8,
|
||
pv=0,
|
||
load=2500,
|
||
interval_start=base + timedelta(hours=7, minutes=30),
|
||
),
|
||
_slot(
|
||
buy=0.50,
|
||
sell=-0.3,
|
||
pv=2000,
|
||
load=5000,
|
||
interval_start=base + timedelta(hours=26),
|
||
),
|
||
]
|
||
battery = _battery(uc_wh=64_000.0)
|
||
out = _select_charge_slots(slots, battery, current_soc_wh=0.46 * battery.usable_capacity_wh)
|
||
self.assertIn(0, out)
|
||
self.assertIn(1, out)
|
||
self.assertEqual(
|
||
min(out),
|
||
0,
|
||
"Před exportním oknem musí být vybrány dnešní levné PM sloty dřív než zítřejší min(buy)",
|
||
)
|
||
|
||
def test_cheap_pm_with_pv_surplus_gets_grid_charge(self) -> None:
|
||
"""Regrese home-01: levné PM VT (~0,8) i s FVE musí projít grid maskou B."""
|
||
base = datetime(2026, 5, 21, 10, 0, tzinfo=timezone.utc)
|
||
slots = [
|
||
_slot(
|
||
buy=0.80,
|
||
sell=-0.08,
|
||
pv=2_500,
|
||
load=3_400,
|
||
interval_start=base,
|
||
),
|
||
_slot(
|
||
buy=0.72,
|
||
sell=-0.13,
|
||
pv=500,
|
||
load=3_400,
|
||
interval_start=base + timedelta(minutes=15),
|
||
),
|
||
_slot(
|
||
buy=2.50,
|
||
sell=1.40,
|
||
pv=2_000,
|
||
load=3_800,
|
||
interval_start=base + timedelta(hours=5),
|
||
),
|
||
_slot(
|
||
buy=5.50,
|
||
sell=3.80,
|
||
pv=100,
|
||
load=2_900,
|
||
interval_start=base + timedelta(hours=9),
|
||
),
|
||
]
|
||
battery = _battery(uc_wh=64_000.0, charge_buf=1.05)
|
||
soc = 0.88 * battery.usable_capacity_wh
|
||
out = _select_charge_slots(slots, battery, current_soc_wh=soc)
|
||
self.assertIn(1, out, "Levnější PM slot má allow_charge i s FVE")
|
||
self.assertIn(0, out)
|
||
self.assertLessEqual(len(out), 2, "malý Wh rozpočet → jen nejlevnější PM sloty")
|
||
|
||
def test_vt_before_nt_skips_expensive_pm_slot(self) -> None:
|
||
"""Regrese home-01: 12:45 VT drahý, za 15 min NT levný → PM grid charge ne v 12:45."""
|
||
base = datetime(2026, 5, 21, 10, 45, tzinfo=timezone.utc)
|
||
slots = [
|
||
_slot(
|
||
buy=1.49,
|
||
sell=-0.04,
|
||
pv=0,
|
||
load=3_500,
|
||
interval_start=base,
|
||
),
|
||
_slot(
|
||
buy=0.86,
|
||
sell=0.01,
|
||
pv=0,
|
||
load=3_500,
|
||
interval_start=base + timedelta(minutes=15),
|
||
),
|
||
_slot(
|
||
buy=0.86,
|
||
sell=0.01,
|
||
pv=0,
|
||
load=3_500,
|
||
interval_start=base + timedelta(minutes=30),
|
||
),
|
||
]
|
||
battery = _battery(uc_wh=64_000.0, charge_buf=1.0)
|
||
soc = 0.92 * battery.usable_capacity_wh
|
||
out = _select_charge_slots(slots, battery, current_soc_wh=soc)
|
||
self.assertNotIn(0, out, "Při malém rozpočtu má přednost levnější NT, ne VT 1.49")
|
||
self.assertTrue({1, 2} & out, "NT slot(y) mohou být vybrány")
|
||
|
||
def test_pm_grid_gets_unused_am_wh_budget(self) -> None:
|
||
"""Nečerpaný AM rozpočet → odpolední levné PM sloty mohou dostat allow_charge."""
|
||
base = datetime(2026, 5, 22, 6, 0, tzinfo=timezone.utc)
|
||
slots = [
|
||
_slot(buy=0.55, sell=-0.2, hour_utc=6, interval_start=base),
|
||
_slot(buy=0.58, sell=-0.2, hour_utc=7, interval_start=base + timedelta(hours=1)),
|
||
_slot(buy=0.52, sell=-0.25, hour_utc=14, interval_start=base + timedelta(hours=8)),
|
||
_slot(buy=0.50, sell=-0.25, hour_utc=15, interval_start=base + timedelta(hours=9)),
|
||
_slot(buy=5.5, sell=3.8, hour_utc=20, interval_start=base + timedelta(hours=14)),
|
||
]
|
||
battery = _battery(charge_buf=1.3, uc_wh=64_000.0)
|
||
out = _select_charge_slots(slots, battery, current_soc_wh=0.12 * battery.usable_capacity_wh)
|
||
pm_cheap = {2, 3}
|
||
self.assertTrue(
|
||
pm_cheap & out,
|
||
"po levném AM má PM dostat grid charge z nevyčerpaného rozpočtu",
|
||
)
|
||
|
||
def test_ote_slots_prioritized_over_predicted(self) -> None:
|
||
"""Při stejné ceně má OTE (is_predicted=false) přednost před predikovaným."""
|
||
slots = [
|
||
_slot(buy=2.00, sell=2.0, hour_utc=13, predicted=False),
|
||
_slot(buy=2.00, sell=2.0, hour_utc=13, predicted=True),
|
||
]
|
||
battery = _battery(
|
||
charge_buf=1.3, uc_wh=3_000.0, soc_max_pct=100.0, max_charge_w=18_000.0
|
||
)
|
||
out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
|
||
self.assertIn(0, out)
|
||
self.assertNotIn(1, out)
|
||
|
||
|
||
class SelectDischargeExportSlotsTests(unittest.TestCase):
|
||
def test_evening_sell_allowed_when_cheaper_than_ref_charge_buy(self) -> None:
|
||
slots = [
|
||
_slot(buy=0.50, sell=-0.30, hour_utc=6),
|
||
_slot(buy=0.51, sell=-0.29, hour_utc=7),
|
||
_slot(buy=5.60, sell=3.30, hour_utc=16),
|
||
_slot(buy=5.98, sell=3.37, hour_utc=17),
|
||
]
|
||
battery = _battery(uc_wh=64_000.0)
|
||
charge = _select_charge_slots(slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh)
|
||
discharge = _select_discharge_export_slots(
|
||
slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh, charge_slots=charge
|
||
)
|
||
self.assertIn(2, discharge)
|
||
self.assertIn(3, discharge)
|
||
|
||
def test_export_excluded_when_sell_below_ref_buy_plus_degradation(self) -> None:
|
||
slots = [
|
||
_slot(buy=0.40, sell=0.10, hour_utc=8),
|
||
_slot(buy=4.00, sell=0.50, hour_utc=18),
|
||
]
|
||
battery = _battery(uc_wh=10_000.0, discharge_buf=2.0)
|
||
discharge = _select_discharge_export_slots(
|
||
slots, battery, current_soc_wh=0.0
|
||
)
|
||
self.assertNotIn(1, discharge)
|
||
|
||
|
||
class FixedPurchasePricingTests(unittest.TestCase):
|
||
def test_fixed_skips_grid_charge_when_no_sell_arbitrage(self) -> None:
|
||
"""Fixní buy bez výkupu nad buy+degrad → žádné grid nabíjení."""
|
||
slots = [
|
||
_slot(buy=6.35, sell=2.0, hour_utc=14, load=500),
|
||
_slot(buy=6.35, sell=3.5, hour_utc=18, load=500),
|
||
]
|
||
battery = _battery(charge_buf=1.3, uc_wh=12_500.0)
|
||
out = _select_charge_slots(
|
||
slots,
|
||
battery,
|
||
current_soc_wh=0.4 * battery.usable_capacity_wh,
|
||
purchase_pricing_mode="fixed",
|
||
)
|
||
self.assertEqual(out, set())
|
||
|
||
def test_fixed_grid_charge_before_evening_export(self) -> None:
|
||
"""BA81: konstantní buy, večerní sell > buy+degrad → NT/AM grid sloty."""
|
||
base = datetime(2026, 5, 24, 0, 0, tzinfo=_PRAGUE)
|
||
slots: list[PlanningSlot] = []
|
||
for i in range(96):
|
||
t = base + timedelta(minutes=15 * i)
|
||
sell = 3.75 if t.hour >= 18 else 2.8
|
||
slots.append(
|
||
_slot(
|
||
buy=3.088,
|
||
sell=sell,
|
||
load=200,
|
||
interval_start=t.astimezone(timezone.utc),
|
||
)
|
||
)
|
||
battery = _battery(charge_buf=1.3, uc_wh=12_500.0)
|
||
out = _select_charge_slots(
|
||
slots,
|
||
battery,
|
||
current_soc_wh=0.33 * battery.usable_capacity_wh,
|
||
purchase_pricing_mode="fixed",
|
||
)
|
||
night = {t for t in out if _prague_hour(slots[t]) < 8}
|
||
self.assertGreater(len(night), 0, "očekáváno grid nabíjení v noci před večerním výkupem")
|
||
|
||
def test_fixed_nt_charge_after_yesterday_evening_peak(self) -> None:
|
||
"""Včerejší večerní peak nesmí zablokovat dnešní 00–06 grid (per-day export okno)."""
|
||
base = datetime(2026, 5, 23, 20, 0, tzinfo=_PRAGUE)
|
||
slots: list[PlanningSlot] = []
|
||
for i in range(64):
|
||
t = base + timedelta(minutes=15 * i)
|
||
sell = 3.8 if t.hour >= 20 and t.date() == date(2026, 5, 23) else 2.9
|
||
if t.date() == date(2026, 5, 24) and t.hour >= 19:
|
||
sell = 3.7
|
||
slots.append(
|
||
_slot(
|
||
buy=3.088,
|
||
sell=sell,
|
||
load=200,
|
||
interval_start=t.astimezone(timezone.utc),
|
||
)
|
||
)
|
||
battery = _battery(charge_buf=1.3, uc_wh=12_500.0)
|
||
out = _select_charge_slots(
|
||
slots,
|
||
battery,
|
||
current_soc_wh=0.33 * battery.usable_capacity_wh,
|
||
purchase_pricing_mode="fixed",
|
||
)
|
||
may24_night = {
|
||
t
|
||
for t in out
|
||
if _prague_date(slots[t]) == date(2026, 5, 24)
|
||
and _prague_hour(slots[t]) < 6
|
||
}
|
||
self.assertGreater(len(may24_night), 0)
|
||
|
||
def test_fixed_allows_discharge_on_high_sell(self) -> None:
|
||
slots = [
|
||
_slot(buy=3.09, sell=1.0, hour_utc=10),
|
||
_slot(buy=3.09, sell=3.8, hour_utc=18),
|
||
_slot(buy=3.09, sell=3.5, hour_utc=19),
|
||
]
|
||
battery = _battery(uc_wh=12_500.0, discharge_buf=2.0, degrad=0.3)
|
||
discharge = _select_discharge_export_slots(
|
||
slots,
|
||
battery,
|
||
current_soc_wh=0.5 * battery.usable_capacity_wh,
|
||
purchase_pricing_mode="fixed",
|
||
)
|
||
self.assertIn(1, discharge)
|
||
self.assertIn(2, discharge, "oba sloty sell > buy + degrad")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
unittest.main()
|