Files
ems/backend/tests/test_planning_charge_slot_selection.py
Dusan Vojacek 9ba65ea6bb
Some checks failed
CI and deploy / migration-check (push) Failing after 14s
CI and deploy / deploy (push) Has been skipped
a dalsi fix
2026-05-25 00:10:58 +02:00

1135 lines
41 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Pre-selection nabíjecích a exportních slotů referenční Python.
Logika je v DB: ems.fn_load_planning_slots_full. Kopie algoritmu pro unit testy bez PG.
Charge mask:
B) Grid AM/PM: nejlevnější sloty do Wh rozpočtu (den plánu → před exportním oknem → buy ASC).
A) PV-surplus: store_score DESC; jen pokud sell ≥ future_sell degrad.
Discharge-export mask:
ref_buy = min(buy) celého horizontu.
Top sloty dle sell_price desc kde sell > ref_buy + degradation.
"""
from __future__ import annotations
import unittest
from datetime import date, datetime, timezone, timedelta
from types import SimpleNamespace
from zoneinfo import ZoneInfo
from services.planning_engine import INTERVAL_H, PlanningSlot
_PRAGUE = ZoneInfo("Europe/Prague")
_LOOKAHEAD_SLOTS = 4
_BUY_LOOKAHEAD_EPS = 0.05
_BUY_CHARGE_BAND = 0.40
_MAX_GRID_CHARGE_CAP = 24
def _prague_date(s: PlanningSlot) -> date:
return s.interval_start.astimezone(_PRAGUE).date()
def _export_window_start_by_day(
slots: list[PlanningSlot], degrad: float
) -> dict[date, datetime]:
"""Kopie R__063: první sell > min(buy) téhož kalendářního dne (Prague) + degrad."""
out: dict[date, datetime] = {}
for s in slots:
day = _prague_date(s)
day_min = min(float(x.buy_price) for x in slots if _prague_date(x) == day)
if float(s.sell_price) > day_min + degrad:
prev = out.get(day)
if prev is None or s.interval_start < prev:
out[day] = s.interval_start
return out
def _before_day_export(
slots: list[PlanningSlot], t: int, export_by_day: dict[date, datetime]
) -> bool:
start = export_by_day.get(_prague_date(slots[t]))
return start is not None and slots[t].interval_start < start
def _future_sell(slots: list[PlanningSlot], t: int) -> float:
tail = [float(slots[i].sell_price) for i in range(t + 1, len(slots))]
return max(tail) if tail else float(slots[t].sell_price)
def _buy_min_next_n(
slots: list[PlanningSlot],
t: int,
n: int = _LOOKAHEAD_SLOTS,
*,
export_by_day: dict[date, datetime] | None = None,
) -> float | None:
tail: list[float] = []
for i in range(t + 1, min(t + 1 + n, len(slots))):
day_start = export_by_day.get(_prague_date(slots[i])) if export_by_day else None
if day_start is None or slots[i].interval_start < day_start:
tail.append(float(slots[i].buy_price))
return min(tail) if tail else None
def _pv_surplus_w(s: PlanningSlot) -> int:
return max(0, int(s.pv_a_forecast_w) + int(s.pv_b_forecast_w) - int(s.load_baseline_w))
def _first_neg_sell_ord(slots: list[PlanningSlot]) -> int | None:
for i, s in enumerate(slots):
if float(s.sell_price) < 0:
return i
return None
def _apply_dynamic_grid_filter(
slots: list[PlanningSlot],
battery: SimpleNamespace,
current_soc_wh: float,
grid_slots: set[int],
) -> set[int]:
"""Self-konzistentni filtr vrstvy B (kopie R__063 A2)."""
if not grid_slots:
return grid_slots
degrad = float(getattr(battery, "degradation_cost_czk_kwh", 0.15) or 0.15)
first_neg = _first_neg_sell_ord(slots)
eta = float(getattr(battery, "charge_efficiency", 1.0) or 1.0)
max_p_w = float(getattr(battery, "max_charge_power_w", 0.0) or 0.0)
per_slot_wh = max_p_w * eta * INTERVAL_H
soc_max = float(battery.soc_max_wh)
deficit = max(0.0, soc_max - float(current_soc_wh))
threshold = deficit * 0.6
t_len = len(slots)
pv_ahead = [0.0] * t_len
neg_ahead = [0.0] * t_len
for t in range(t_len):
pv_sum = 0.0
neg_sum = 0.0
for w2 in range(t, t_len):
if first_neg is not None and w2 >= first_neg:
break
s2 = slots[w2]
pv_s = _pv_surplus_w(s2)
if pv_s > 0 and (float(s2.sell_price) < 0 or float(s2.buy_price) < 0):
pv_sum += min(pv_s, max_p_w) * eta * INTERVAL_H
if float(s2.buy_price) < 0:
neg_sum += per_slot_wh
pv_ahead[t] = min(pv_sum, deficit)
neg_ahead[t] = neg_sum
slots[t].pv_charge_wh_ahead = pv_ahead[t]
slots[t].neg_buy_wh_ahead = neg_ahead[t]
remaining = set(grid_slots)
acq_prev = -999.0
for _ in range(5):
if not remaining:
break
total_wh = len(remaining) * per_slot_wh
pos_remaining = [t for t in remaining if float(slots[t].buy_price) >= 0]
total_wh_pos = len(pos_remaining) * per_slot_wh
if total_wh_pos <= 0:
acq = min(float(s.buy_price) for s in slots if float(s.buy_price) >= 0)
else:
acq = sum(float(slots[t].buy_price) * per_slot_wh for t in pos_remaining) / total_wh_pos
if abs(acq - acq_prev) < 0.05:
break
acq_prev = acq
to_remove: set[int] = set()
for t in list(remaining):
s = slots[t]
if float(s.buy_price) < 0:
continue
if float(s.buy_price) > acq - degrad and pv_ahead[t] + neg_ahead[t] >= threshold:
to_remove.add(t)
slots[t].grid_charge_suppressed_reason = (
"cheaper_pv_ahead"
if pv_ahead[t] >= neg_ahead[t]
else "cheaper_neg_buy_ahead"
)
if not to_remove:
break
remaining -= to_remove
cum_allowed = len(remaining) * per_slot_wh
pv0 = pv_ahead[0] if t_len else 0.0
target_deficit = deficit - pv0
if cum_allowed < target_deficit * 0.6:
suppressed = sorted(
[
t
for t in grid_slots
if t not in remaining and slots[t].grid_charge_suppressed_reason
],
key=lambda t: (float(slots[t].buy_price), t),
)
for t in suppressed:
if float(slots[t].buy_price) >= 2 * acq_prev:
break
remaining.add(t)
slots[t].grid_charge_suppressed_reason = "safety_failsafe_unlock"
cum_allowed += per_slot_wh
if cum_allowed >= target_deficit * 0.6:
break
return remaining
def _store_score(slots: list[PlanningSlot], t: int) -> float:
s = slots[t]
buy = float(s.buy_price)
sell = float(s.sell_price)
fso = _future_sell(slots, t)
return fso - sell - max(0.0, buy - sell)
def _select_charge_slots(
slots: list[PlanningSlot],
battery: SimpleNamespace,
current_soc_wh: float,
*,
purchase_pricing_mode: str = "spot",
apply_dynamic_grid_filter: bool = True,
) -> set[int]:
"""Kopie logiky z ems.fn_load_planning_slots_full (charge mask)."""
charge_buf = float(getattr(battery, "charge_slot_buffer", 0) or 0)
if charge_buf <= 0:
return set(range(len(slots)))
energy_to_fill = float(battery.soc_max_wh) - float(current_soc_wh)
if energy_to_fill <= 0:
return set(range(len(slots)))
reserve_wh = float(getattr(battery, "reserve_soc_wh", 0) or 0)
degrad = float(getattr(battery, "degradation_cost_czk_kwh", 0.15) or 0.15)
ref_buy_am = min(
(float(s.buy_price) for s in slots if _prague_hour(s) < 12),
default=min(float(s.buy_price) for s in slots),
)
ref_buy_pm = min(
(float(s.buy_price) for s in slots if _prague_hour(s) >= 12),
default=min(float(s.buy_price) for s in slots),
)
ref_buy_global = min(float(s.buy_price) for s in slots)
export_by_day = _export_window_start_by_day(slots, degrad)
plan_day = _prague_date(slots[0])
eta = float(getattr(battery, "charge_efficiency", 1.0) or 1.0)
max_p_w = float(getattr(battery, "max_charge_power_w", 0.0) or 0.0)
per_slot_full_wh = max_p_w * eta * INTERVAL_H
soc_max_wh = float(getattr(battery, "soc_max_wh", 0) or 0)
if charge_buf > 0:
charge_target_wh = min(
max(energy_to_fill, 0.0) * charge_buf,
max(soc_max_wh - float(current_soc_wh), 0.0),
)
elif current_soc_wh >= reserve_wh:
charge_target_wh = max(energy_to_fill, 0.0)
else:
charge_target_wh = min(
max(energy_to_fill, 0.0) * charge_buf,
max(energy_to_fill, 0.0),
)
n_am = sum(1 for s in slots if _prague_hour(s) < 12)
n_pm = len(slots) - n_am
if n_am <= 0:
chg_am = 0.0
chg_pm = charge_target_wh
elif n_pm <= 0:
chg_am = charge_target_wh
chg_pm = 0.0
else:
chg_am = charge_target_wh / 2.0
chg_pm = charge_target_wh - chg_am
selected: set[int] = set()
grid_selected: set[int] = set()
grid_filled_wh = 0.0
buf_mult = charge_buf if charge_buf > 0 else 1.0
cap_am = (
max(
1,
min(
_MAX_GRID_CHARGE_CAP,
int(chg_am / per_slot_full_wh * buf_mult) + 1,
),
)
if per_slot_full_wh > 0
else 6
)
cap_pm = (
max(
1,
min(
_MAX_GRID_CHARGE_CAP,
int(chg_pm / per_slot_full_wh * buf_mult) + 1,
),
)
if per_slot_full_wh > 0
else 6
)
def _grid_sort_key(t: int, pred: bool, price: float) -> tuple[int, int, int, float, int]:
today_first = 0 if _prague_date(slots[t]) == plan_day else 1
before_export = 0 if _before_day_export(slots, t, export_by_day) else 1
return (today_first, before_export, int(pred), price, t)
if purchase_pricing_mode != "fixed":
am_candidates = [
(t, getattr(slots[t], "is_predicted_price", False), float(slots[t].buy_price))
for t in range(len(slots))
if _prague_hour(slots[t]) < 12
]
am_candidates.sort(key=lambda x: _grid_sort_key(x[0], x[1], x[2]))
cum = 0.0
grid_am = 0
for t, _pred, _price in am_candidates:
if cum >= chg_am or per_slot_full_wh <= 0 or grid_am >= cap_am:
break
selected.add(t)
grid_selected.add(t)
cum += per_slot_full_wh
grid_am += 1
grid_filled_wh += cum
chg_pm = max(chg_pm, charge_target_wh - grid_filled_wh)
if per_slot_full_wh > 0:
cap_pm = max(
cap_pm,
min(
_MAX_GRID_CHARGE_CAP,
int(chg_pm / per_slot_full_wh * buf_mult) + 1,
),
)
pm_candidates = [
(t, getattr(slots[t], "is_predicted_price", False), float(slots[t].buy_price))
for t in range(len(slots))
if _prague_hour(slots[t]) >= 12
]
pm_candidates.sort(key=lambda x: _grid_sort_key(x[0], x[1], x[2]))
cum = 0.0
grid_pm = 0
for t, _pred, _price in pm_candidates:
if cum >= chg_pm or per_slot_full_wh <= 0 or grid_pm >= cap_pm:
break
selected.add(t)
grid_selected.add(t)
cum += per_slot_full_wh
grid_pm += 1
grid_filled_wh += cum
for t, s in enumerate(slots):
if float(s.buy_price) < 0:
selected.add(t)
grid_selected.add(t)
if apply_dynamic_grid_filter:
filtered_grid = _apply_dynamic_grid_filter(
slots, battery, current_soc_wh, grid_selected
)
for t in grid_selected - filtered_grid:
selected.discard(t)
elif purchase_pricing_mode == "fixed" and any(
float(s.sell_price) > float(s.buy_price) + degrad for s in slots
):
am_candidates = [
(t, getattr(slots[t], "is_predicted_price", False))
for t in range(len(slots))
if _prague_hour(slots[t]) < 12
]
am_candidates.sort(
key=lambda x: (
_grid_sort_key(x[0], x[1], 0.0)[0],
_grid_sort_key(x[0], x[1], 0.0)[1],
_grid_sort_key(x[0], x[1], 0.0)[2],
x[0],
)
)
cum = 0.0
grid_am = 0
for t, _pred in am_candidates:
if cum >= chg_am or per_slot_full_wh <= 0 or grid_am >= cap_am:
break
selected.add(t)
cum += per_slot_full_wh
grid_am += 1
grid_filled_wh += cum
chg_pm = max(chg_pm, charge_target_wh - grid_filled_wh)
if per_slot_full_wh > 0:
cap_pm = max(
cap_pm,
min(
_MAX_GRID_CHARGE_CAP,
int(chg_pm / per_slot_full_wh * buf_mult) + 1,
),
)
pm_candidates = [
(t, getattr(slots[t], "is_predicted_price", False))
for t in range(len(slots))
if _prague_hour(slots[t]) >= 12
]
pm_candidates.sort(
key=lambda x: (
_grid_sort_key(x[0], x[1], 0.0)[0],
_grid_sort_key(x[0], x[1], 0.0)[1],
_grid_sort_key(x[0], x[1], 0.0)[2],
x[0],
)
)
cum = 0.0
grid_pm = 0
for t, _pred in pm_candidates:
if cum >= chg_pm or per_slot_full_wh <= 0 or grid_pm >= cap_pm:
break
selected.add(t)
cum += per_slot_full_wh
grid_pm += 1
grid_filled_wh += cum
pv_layer_cap = max(charge_target_wh - grid_filled_wh, 0.0)
pv_candidates: list[tuple[int, float, float]] = []
for t, s in enumerate(slots):
pv_surplus_w = max(0, s.pv_a_forecast_w + s.pv_b_forecast_w - s.load_baseline_w)
fso = _future_sell(slots, t)
if (
pv_surplus_w > 0
and float(s.sell_price) >= float(s.buy_price) - degrad
and (
float(s.sell_price) < 0
or float(s.sell_price) >= fso - degrad
)
):
pv_candidates.append((t, _store_score(slots, t), float(pv_surplus_w)))
pv_candidates.sort(key=lambda x: (-x[1], x[0]))
cum = 0.0
for t, _score, pv_surplus_w in pv_candidates:
if cum >= pv_layer_cap:
break
selected.add(t)
cum += min(pv_surplus_w, max_p_w) * eta * INTERVAL_H
return selected
def _select_discharge_export_slots(
slots: list[PlanningSlot],
battery: SimpleNamespace,
current_soc_wh: float,
charge_slots: set[int] | None = None,
*,
purchase_pricing_mode: str = "spot",
) -> set[int]:
"""Kopie logiky z ems.fn_load_planning_slots_full (discharge-export mask)."""
discharge_buf = float(getattr(battery, "discharge_slot_buffer", 0) or 0)
if discharge_buf <= 0:
return set(range(len(slots)))
min_soc_wh = float(getattr(battery, "min_soc_wh", 0) or 0)
soc_max_wh = float(battery.soc_max_wh)
exportable_wh = soc_max_wh - min_soc_wh
if exportable_wh <= 0:
return set()
degrad = float(getattr(battery, "degradation_cost_czk_kwh", 0.15) or 0.15)
eta = float(getattr(battery, "discharge_efficiency", 1.0) or 1.0)
max_p_w = float(getattr(battery, "max_discharge_power_w", 0.0) or 0.0)
per_slot_wh = max_p_w * eta * INTERVAL_H
discharge_target_wh = exportable_wh * discharge_buf
ref_buy = min(float(s.buy_price) for s in slots)
if purchase_pricing_mode == "fixed":
sell_min = None # per-slot buy + degrad below
else:
sell_min = ref_buy + degrad
candidates = [
(t, float(slots[t].sell_price))
for t in range(len(slots))
if (
float(slots[t].sell_price) > float(slots[t].buy_price) + degrad
if purchase_pricing_mode == "fixed"
else float(slots[t].sell_price) > sell_min
)
]
candidates.sort(key=lambda x: (-x[1], -x[0]))
first_neg = next(
(i for i, s in enumerate(slots) if float(s.sell_price) < 0),
None,
)
neg_day = _prague_date(slots[first_neg]) if first_neg is not None else None
if first_neg is not None and neg_day is not None:
filtered: list[tuple[int, float]] = []
for t, sell in candidates:
if t >= first_neg:
filtered.append((t, sell))
continue
if _prague_date(slots[t]) != neg_day:
filtered.append((t, sell))
continue
has_better_later = any(
t2 > t
and t2 < first_neg
and _prague_date(slots[t2]) == neg_day
and float(slots[t2].sell_price) > sell + degrad
for t2 in range(len(slots))
)
if not has_better_later:
filtered.append((t, sell))
candidates = filtered
selected: set[int] = set()
cum = 0.0
for t, _sell in candidates:
if cum >= discharge_target_wh or per_slot_wh <= 0:
break
selected.add(t)
cum += per_slot_wh
if first_neg is not None and neg_day is not None:
evening_by_day: dict = {}
for t, s in enumerate(slots):
d = _prague_date(s)
if _prague_hour(s) < 17:
continue
evening_by_day[d] = max(evening_by_day.get(d, 0.0), float(s.sell_price))
for t, s in enumerate(slots):
d = _prague_date(s)
peak = evening_by_day.get(d, 0.0)
if peak > 0 and _prague_hour(s) >= 17 and float(s.sell_price) >= peak - degrad:
if purchase_pricing_mode == "fixed":
if float(s.sell_price) > float(s.buy_price) + degrad:
selected.add(t)
elif float(s.sell_price) > sell_min:
selected.add(t)
preneg_min_soc = min_soc_wh + max(per_slot_wh, 1000.0)
if (
first_neg is not None
and first_neg > 0
and current_soc_wh >= preneg_min_soc
and neg_day is not None
):
morning_sells = [
float(slots[i].sell_price)
for i in range(first_neg)
if float(slots[i].sell_price) >= 0
and _prague_date(slots[i]) == neg_day
and 5 <= _prague_hour(slots[i]) <= 11
]
if morning_sells:
zone_peak = max(morning_sells)
for i in range(first_neg):
if (
_prague_date(slots[i]) == neg_day
and 5 <= _prague_hour(slots[i]) <= 11
and float(slots[i].sell_price) >= zone_peak - degrad
):
selected.add(i)
for i in range(first_neg):
if _prague_date(slots[i]) != neg_day:
continue
h = _prague_hour(slots[i])
if 5 <= h < 17 and float(slots[i].sell_price) < zone_peak - degrad:
selected.discard(i)
return selected
def _prague_hour(s: PlanningSlot) -> int:
dt = s.interval_start
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(_PRAGUE).hour
def _slot(
*,
buy: float,
sell: float = 1.0,
pv: int = 0,
load: int = 2_000,
hour_utc: int = 12,
predicted: bool = False,
interval_start: datetime | None = None,
) -> PlanningSlot:
if interval_start is None:
interval_start = datetime(2026, 5, 19, hour_utc, 0, tzinfo=timezone.utc)
return PlanningSlot(
interval_start=interval_start,
buy_price=buy,
sell_price=sell,
pv_a_forecast_w=0,
pv_b_forecast_w=pv,
load_baseline_w=load,
ev1_connected=False,
ev2_connected=False,
is_predicted_price=predicted,
)
def _battery(
*,
charge_buf: float = 1.3,
discharge_buf: float = 1.5,
uc_wh: float = 64_000.0,
soc_max_pct: float = 95.0,
min_soc_pct: float = 10.0,
reserve_soc_pct: float = 20.0,
max_charge_w: float = 18_000.0,
max_discharge_w: float = 18_000.0,
charge_eff: float = 0.95,
discharge_eff: float = 0.95,
degrad: float = 0.15,
) -> SimpleNamespace:
uc = uc_wh
return SimpleNamespace(
usable_capacity_wh=uc,
min_soc_wh=min_soc_pct / 100.0 * uc,
reserve_soc_wh=reserve_soc_pct / 100.0 * uc,
soc_max_wh=soc_max_pct / 100.0 * uc,
max_charge_power_w=max_charge_w,
max_discharge_power_w=max_discharge_w,
charge_efficiency=charge_eff,
discharge_efficiency=discharge_eff,
charge_slot_buffer=charge_buf,
discharge_slot_buffer=discharge_buf,
degradation_cost_czk_kwh=degrad,
)
class SelectChargeSlotsTests(unittest.TestCase):
def test_buffer_zero_returns_all_slots(self) -> None:
slots = [_slot(buy=3.0) for _ in range(4)]
battery = _battery(charge_buf=0.0)
out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
self.assertEqual(out, set(range(4)))
def test_returns_all_when_battery_is_full(self) -> None:
slots = [_slot(buy=0.1) for _ in range(3)]
battery = _battery()
out = _select_charge_slots(
slots, battery, current_soc_wh=battery.soc_max_wh + 1.0
)
self.assertEqual(out, set(range(3)))
def test_pv_surplus_high_store_score_selected(self) -> None:
"""Slot s vyšším store_score (lepší uložení vs export) má přednost."""
slots = [
_slot(buy=1.5, sell=0.01, pv=8_000, load=2_000, hour_utc=8),
_slot(buy=1.5, sell=0.50, pv=8_000, load=2_000, hour_utc=9),
_slot(buy=0.5, sell=0.40, pv=8_000, load=2_000, hour_utc=10),
]
battery = _battery(
charge_buf=1.3, uc_wh=1_000.0, soc_max_pct=100.0, max_charge_w=6_000.0
)
out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
self.assertIn(2, out, "Nejlevnější buy (grid B) má být vybrán")
self.assertNotIn(1, out, "Dražší AM slot (buy 1.5) nemá přednost před levným buy 0.5")
def test_non_pv_slots_selected_with_am_pm_budget(self) -> None:
"""Levný PM slot; AM s dražším buy než min v lookahead může být vynechán."""
slots = [
_slot(buy=0.5, hour_utc=4),
_slot(buy=3.0, hour_utc=5),
_slot(buy=0.4, hour_utc=14),
_slot(buy=9.9, hour_utc=15),
]
battery = _battery(
charge_buf=1.3, uc_wh=5_000.0, soc_max_pct=100.0, max_charge_w=18_000.0
)
out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
self.assertIn(2, out, "Nejlevnější buy v horizontu (PM) musí být vybrán")
def test_pm_grid_prefers_today_before_export_over_tomorrow_cheaper(self) -> None:
"""Dnes PM levné před večerním exportem má prioritu před zítřejším min(buy)."""
base = datetime(2026, 5, 21, 10, 0, tzinfo=timezone.utc)
slots = [
_slot(buy=0.72, sell=-0.1, pv=500, load=3000, interval_start=base),
_slot(buy=0.68, sell=-0.15, pv=500, load=3000, interval_start=base + timedelta(minutes=15)),
_slot(
buy=5.5,
sell=3.8,
pv=0,
load=2500,
interval_start=base + timedelta(hours=7, minutes=30),
),
_slot(
buy=0.50,
sell=-0.3,
pv=2000,
load=5000,
interval_start=base + timedelta(hours=26),
),
]
battery = _battery(uc_wh=64_000.0)
out = _select_charge_slots(slots, battery, current_soc_wh=0.46 * battery.usable_capacity_wh)
self.assertIn(0, out)
self.assertIn(1, out)
self.assertEqual(
min(out),
0,
"Před exportním oknem musí být vybrány dnešní levné PM sloty dřív než zítřejší min(buy)",
)
def test_cheap_pm_with_pv_surplus_gets_grid_charge(self) -> None:
"""Regrese home-01: levné PM VT (~0,8) i s FVE musí projít grid maskou B."""
base = datetime(2026, 5, 21, 10, 0, tzinfo=timezone.utc)
slots = [
_slot(
buy=0.80,
sell=-0.08,
pv=2_500,
load=3_400,
interval_start=base,
),
_slot(
buy=0.72,
sell=-0.13,
pv=500,
load=3_400,
interval_start=base + timedelta(minutes=15),
),
_slot(
buy=2.50,
sell=1.40,
pv=2_000,
load=3_800,
interval_start=base + timedelta(hours=5),
),
_slot(
buy=5.50,
sell=3.80,
pv=100,
load=2_900,
interval_start=base + timedelta(hours=9),
),
]
battery = _battery(uc_wh=64_000.0, charge_buf=1.05)
soc = 0.88 * battery.usable_capacity_wh
out = _select_charge_slots(slots, battery, current_soc_wh=soc)
self.assertIn(1, out, "Levnější PM slot má allow_charge i s FVE")
self.assertIn(0, out)
self.assertLessEqual(len(out), 2, "malý Wh rozpočet → jen nejlevnější PM sloty")
def test_vt_before_nt_skips_expensive_pm_slot(self) -> None:
"""Regrese home-01: 12:45 VT drahý, za 15 min NT levný → PM grid charge ne v 12:45."""
base = datetime(2026, 5, 21, 10, 45, tzinfo=timezone.utc)
slots = [
_slot(
buy=1.49,
sell=-0.04,
pv=0,
load=3_500,
interval_start=base,
),
_slot(
buy=0.86,
sell=0.01,
pv=0,
load=3_500,
interval_start=base + timedelta(minutes=15),
),
_slot(
buy=0.86,
sell=0.01,
pv=0,
load=3_500,
interval_start=base + timedelta(minutes=30),
),
]
battery = _battery(uc_wh=64_000.0, charge_buf=1.0)
soc = 0.92 * battery.usable_capacity_wh
out = _select_charge_slots(slots, battery, current_soc_wh=soc)
self.assertNotIn(0, out, "Při malém rozpočtu má přednost levnější NT, ne VT 1.49")
self.assertTrue({1, 2} & out, "NT slot(y) mohou být vybrány")
def test_pm_grid_gets_unused_am_wh_budget(self) -> None:
"""Nečerpaný AM rozpočet → odpolední levné PM sloty mohou dostat allow_charge."""
base = datetime(2026, 5, 22, 6, 0, tzinfo=timezone.utc)
slots = [
_slot(buy=0.55, sell=-0.2, hour_utc=6, interval_start=base),
_slot(buy=0.58, sell=-0.2, hour_utc=7, interval_start=base + timedelta(hours=1)),
_slot(buy=0.52, sell=-0.25, hour_utc=14, interval_start=base + timedelta(hours=8)),
_slot(buy=0.50, sell=-0.25, hour_utc=15, interval_start=base + timedelta(hours=9)),
_slot(buy=5.5, sell=3.8, hour_utc=20, interval_start=base + timedelta(hours=14)),
]
battery = _battery(charge_buf=1.3, uc_wh=64_000.0)
out = _select_charge_slots(slots, battery, current_soc_wh=0.12 * battery.usable_capacity_wh)
pm_cheap = {2, 3}
self.assertTrue(
pm_cheap & out,
"po levném AM má PM dostat grid charge z nevyčerpaného rozpočtu",
)
def test_ote_slots_prioritized_over_predicted(self) -> None:
"""Při stejné ceně má OTE (is_predicted=false) přednost před predikovaným."""
slots = [
_slot(buy=2.00, sell=2.0, hour_utc=13, predicted=False),
_slot(buy=2.00, sell=2.0, hour_utc=13, predicted=True),
]
battery = _battery(
charge_buf=1.3, uc_wh=3_000.0, soc_max_pct=100.0, max_charge_w=18_000.0
)
out = _select_charge_slots(slots, battery, current_soc_wh=0.0)
self.assertIn(0, out)
self.assertNotIn(1, out)
class SelectDischargeExportSlotsTests(unittest.TestCase):
def test_evening_sell_allowed_when_cheaper_than_ref_charge_buy(self) -> None:
slots = [
_slot(buy=0.50, sell=-0.30, hour_utc=6),
_slot(buy=0.51, sell=-0.29, hour_utc=7),
_slot(buy=5.60, sell=3.30, hour_utc=16),
_slot(buy=5.98, sell=3.37, hour_utc=17),
]
battery = _battery(uc_wh=64_000.0)
charge = _select_charge_slots(slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh)
discharge = _select_discharge_export_slots(
slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh, charge_slots=charge
)
self.assertIn(2, discharge)
self.assertIn(3, discharge)
def test_export_excluded_when_sell_below_ref_buy_plus_degradation(self) -> None:
slots = [
_slot(buy=0.40, sell=0.10, hour_utc=8),
_slot(buy=4.00, sell=0.50, hour_utc=18),
]
battery = _battery(uc_wh=10_000.0, discharge_buf=2.0)
discharge = _select_discharge_export_slots(
slots, battery, current_soc_wh=0.0
)
self.assertNotIn(1, discharge)
class DynamicGridFilterTests(unittest.TestCase):
def _range(self, start_h: int, end_h: int, **kwargs) -> list[PlanningSlot]:
base = datetime(2026, 5, 24, 0, 0, tzinfo=_PRAGUE)
out: list[PlanningSlot] = []
for h in range(start_h, end_h):
for minute in (0, 15, 30, 45):
t = base.replace(hour=h, minute=minute).astimezone(timezone.utc)
out.append(
_slot(
interval_start=t,
hour_utc=t.hour,
buy=kwargs.get("buy", 4.5),
sell=kwargs.get("sell", 2.0),
pv=kwargs.get("pv_b", 0),
load=kwargs.get("load", 500),
)
)
if "pv_a" in kwargs:
out[-1] = PlanningSlot(
interval_start=t,
buy_price=float(kwargs.get("buy", 4.5)),
sell_price=float(kwargs.get("sell", 2.0)),
pv_a_forecast_w=int(kwargs["pv_a"]),
pv_b_forecast_w=int(kwargs.get("pv_b", 0)),
load_baseline_w=int(kwargs.get("load", 500)),
ev1_connected=False,
ev2_connected=False,
is_predicted_price=False,
)
return out
def _home01_battery(self) -> SimpleNamespace:
return _battery(charge_buf=1.3, uc_wh=64_000.0, soc_max_pct=95.0)
def _uniform_buy_slots(self, buy: float, n: int = 96) -> list[PlanningSlot]:
base = datetime(2026, 5, 24, 0, 0, tzinfo=_PRAGUE)
return [
_slot(
buy=buy,
sell=2.0,
load=500,
interval_start=(base + timedelta(minutes=15 * i)).astimezone(timezone.utc),
)
for i in range(n)
]
def test_home01_night_charge_before_neg_sell_pv_day(self) -> None:
slots = [
*self._range(0, 5, buy=4.7, sell=2.9),
*self._range(5, 7, buy=5.0, sell=3.0, pv_b=400),
*self._range(7, 11, buy=4.5, sell=2.8, pv_a=3000, pv_b=2000),
*self._range(11, 14, buy=0.5, sell=-0.4, pv_a=6000, pv_b=5000),
*self._range(14, 17, buy=1.0, sell=-0.3, pv_a=5000, pv_b=4000),
*self._range(17, 19, buy=4.5, sell=3.0),
*self._range(19, 22, buy=6.5, sell=4.0),
*self._range(22, 24, buy=4.8, sell=3.0),
]
selected = _select_charge_slots(
slots, self._home01_battery(), current_soc_wh=30_000.0
)
for i in range(88, 96):
self.assertNotIn(i, selected, f"slot {i} (noc) nema byt allow_grid_charge")
self.assertTrue(any(i in selected for i in range(44, 56)))
def test_cloudy_day_no_pv_grid_unlock(self) -> None:
slots = self._uniform_buy_slots(4.5 + 0.1)
selected = _select_charge_slots(
slots, self._home01_battery(), current_soc_wh=30_000.0
)
self.assertGreater(len(selected), 4, "failsafe musel uvolnit nejake sloty")
def test_ba81_fixed_tariff_mask_unchanged(self) -> None:
battery = _battery(charge_buf=1.3, uc_wh=12_500.0)
slots_fixed = self._uniform_buy_slots(buy=3.5)
selected_v1 = _select_charge_slots(
slots_fixed,
battery,
5000.0,
purchase_pricing_mode="fixed",
apply_dynamic_grid_filter=False,
)
selected_v2 = _select_charge_slots(
slots_fixed,
battery,
5000.0,
purchase_pricing_mode="fixed",
)
self.assertEqual(selected_v1, selected_v2)
def test_kv1_block_export_unchanged(self) -> None:
"""KV1: filtr vrstvy B beze zmeny u fixed tarifu (stejne jako BA81)."""
battery = _battery(charge_buf=1.3, uc_wh=12_500.0)
slots_fixed = self._uniform_buy_slots(buy=3.5)
selected_v1 = _select_charge_slots(
slots_fixed,
battery,
5000.0,
purchase_pricing_mode="fixed",
apply_dynamic_grid_filter=False,
)
selected_v2 = _select_charge_slots(
slots_fixed,
battery,
5000.0,
purchase_pricing_mode="fixed",
)
self.assertEqual(selected_v1, selected_v2)
class FixedPurchasePricingTests(unittest.TestCase):
def test_fixed_skips_grid_charge_when_no_sell_arbitrage(self) -> None:
"""Fixní buy bez výkupu nad buy+degrad → žádné grid nabíjení."""
slots = [
_slot(buy=6.35, sell=2.0, hour_utc=14, load=500),
_slot(buy=6.35, sell=3.5, hour_utc=18, load=500),
]
battery = _battery(charge_buf=1.3, uc_wh=12_500.0)
out = _select_charge_slots(
slots,
battery,
current_soc_wh=0.4 * battery.usable_capacity_wh,
purchase_pricing_mode="fixed",
)
self.assertEqual(out, set())
def test_fixed_grid_charge_before_evening_export(self) -> None:
"""BA81: konstantní buy, večerní sell > buy+degrad → NT/AM grid sloty."""
base = datetime(2026, 5, 24, 0, 0, tzinfo=_PRAGUE)
slots: list[PlanningSlot] = []
for i in range(96):
t = base + timedelta(minutes=15 * i)
sell = 3.75 if t.hour >= 18 else 2.8
slots.append(
_slot(
buy=3.088,
sell=sell,
load=200,
interval_start=t.astimezone(timezone.utc),
)
)
battery = _battery(charge_buf=1.3, uc_wh=12_500.0)
out = _select_charge_slots(
slots,
battery,
current_soc_wh=0.33 * battery.usable_capacity_wh,
purchase_pricing_mode="fixed",
)
night = {t for t in out if _prague_hour(slots[t]) < 8}
self.assertGreater(len(night), 0, "očekáváno grid nabíjení v noci před večerním výkupem")
def test_fixed_nt_charge_after_yesterday_evening_peak(self) -> None:
"""Včerejší večerní peak nesmí zablokovat dnešní 0006 grid (per-day export okno)."""
base = datetime(2026, 5, 23, 20, 0, tzinfo=_PRAGUE)
slots: list[PlanningSlot] = []
for i in range(64):
t = base + timedelta(minutes=15 * i)
sell = 3.8 if t.hour >= 20 and t.date() == date(2026, 5, 23) else 2.9
if t.date() == date(2026, 5, 24) and t.hour >= 19:
sell = 3.7
slots.append(
_slot(
buy=3.088,
sell=sell,
load=200,
interval_start=t.astimezone(timezone.utc),
)
)
battery = _battery(charge_buf=1.3, uc_wh=12_500.0)
out = _select_charge_slots(
slots,
battery,
current_soc_wh=0.33 * battery.usable_capacity_wh,
purchase_pricing_mode="fixed",
)
may24_night = {
t
for t in out
if _prague_date(slots[t]) == date(2026, 5, 24)
and _prague_hour(slots[t]) < 6
}
self.assertGreater(len(may24_night), 0)
def test_fixed_allows_discharge_on_high_sell(self) -> None:
slots = [
_slot(buy=3.09, sell=1.0, hour_utc=10),
_slot(buy=3.09, sell=3.8, hour_utc=18),
_slot(buy=3.09, sell=3.5, hour_utc=19),
]
battery = _battery(uc_wh=12_500.0, discharge_buf=2.0, degrad=0.3)
discharge = _select_discharge_export_slots(
slots,
battery,
current_soc_wh=0.5 * battery.usable_capacity_wh,
purchase_pricing_mode="fixed",
)
self.assertIn(1, discharge)
self.assertIn(2, discharge, "oba sloty sell > buy + degrad")
class NegSellReservationTests(unittest.TestCase):
"""Mirror SQL `R__063` PV vrstva A — rezervace `v_pv_layer_cap_wh` pro `sell<0` okno.
Cíl: do prvního `sell<0` slotu dorazit s SoC = soc_max min(neg_window_pv, available_storage),
aby `sell<0` PV mohlo doplnit zbytek bez exportu / curtail pole A.
"""
@staticmethod
def _pv_layer_cap_with_reservation(
slots, # list of dict { sell, pv_surplus_w }
energy_to_fill_wh: float,
grid_filled_wh: float,
max_charge_w: float,
charge_eff: float,
) -> float:
cap = max(energy_to_fill_wh - grid_filled_wh, 0.0)
neg_window = sum(
min(float(s["pv_surplus_w"]), max_charge_w) * charge_eff * 0.25
for s in slots
if float(s["sell"]) < 0 and float(s["pv_surplus_w"]) > 0
)
return max(cap - neg_window, 0.0)
def test_neg_sell_window_reduces_pv_layer_cap(self) -> None:
# 4 ranní PV sloty (sell>=0), 2 odpolední neg-sell PV sloty
slots = [
{"sell": 1.2, "pv_surplus_w": 6000}, # 10:00 cap candidate
{"sell": 1.0, "pv_surplus_w": 7000}, # 11:00
{"sell": 0.8, "pv_surplus_w": 8000}, # 12:00
{"sell": -0.5, "pv_surplus_w": 9000}, # 13:00 neg-sell
{"sell": -1.0, "pv_surplus_w": 9000}, # 13:30 neg-sell
]
cap_before = 20_000.0 # 20 kWh deficit
cap_after = self._pv_layer_cap_with_reservation(
slots,
energy_to_fill_wh=cap_before,
grid_filled_wh=0.0,
max_charge_w=10_000.0,
charge_eff=0.95,
)
# neg-window = 2 × min(9000, 10000) × 0.95 × 0.25 = 4 275
self.assertAlmostEqual(cap_after, cap_before - 4275.0, places=1)
self.assertLess(cap_after, cap_before)
def test_neg_sell_pv_covers_full_deficit(self) -> None:
slots = [
{"sell": 1.0, "pv_surplus_w": 5000}, # ranní
{"sell": -0.4, "pv_surplus_w": 9000},
{"sell": -1.1, "pv_surplus_w": 9000},
{"sell": -0.8, "pv_surplus_w": 9000},
]
# neg-window = 3 × 9000 × 0.95 × 0.25 = 6 412.5 Wh
cap_after = self._pv_layer_cap_with_reservation(
slots,
energy_to_fill_wh=5_000.0,
grid_filled_wh=0.0,
max_charge_w=10_000.0,
charge_eff=0.95,
)
# cap (5000) - neg_window (6412.5) → clamp na 0 → žádné ranní PV nabíjení
self.assertEqual(cap_after, 0.0)
def test_no_neg_sell_window_no_change(self) -> None:
slots = [
{"sell": 1.2, "pv_surplus_w": 6000},
{"sell": 0.8, "pv_surplus_w": 7000},
]
cap_before = 8_000.0
cap_after = self._pv_layer_cap_with_reservation(
slots,
energy_to_fill_wh=cap_before,
grid_filled_wh=0.0,
max_charge_w=10_000.0,
charge_eff=0.95,
)
self.assertEqual(cap_after, cap_before)
class FallbackAcquisitionTests(unittest.TestCase):
"""Mirror SQL `R__063` fallback when `v_est_grid_wh = 0` and no positive buy grid slot."""
@staticmethod
def _fallback_acq(slots, est_grid_wh: float, est_grid_cost: float) -> float:
if est_grid_wh > 0:
return est_grid_cost / est_grid_wh
positive_buys = [float(s["buy"]) for s in slots if float(s["buy"]) >= 0]
v = min(positive_buys) if positive_buys else 0.0
return max(v, 0.0)
def test_no_grid_charging_but_horizon_has_positive_buys(self) -> None:
# PM má negativní buy (13:0014:00), ale jinde v horizontu pozitivní min
slots = [
{"buy": 4.5, "hour": 23},
{"buy": 0.3, "hour": 7},
{"buy": 0.7, "hour": 10},
{"buy": -0.36, "hour": 13},
{"buy": -0.43, "hour": 14},
]
acq = self._fallback_acq(slots, est_grid_wh=0.0, est_grid_cost=0.0)
self.assertAlmostEqual(acq, 0.3, places=4)
self.assertGreaterEqual(acq, 0.0)
def test_all_negative_buys_clamps_to_zero(self) -> None:
slots = [
{"buy": -0.4, "hour": 13},
{"buy": -0.5, "hour": 14},
]
acq = self._fallback_acq(slots, est_grid_wh=0.0, est_grid_cost=0.0)
self.assertEqual(acq, 0.0)
def test_positive_weighted_mean_unchanged(self) -> None:
# est_grid_wh > 0 — vrátit weighted mean, ne fallback
acq = self._fallback_acq(
[{"buy": 0.7, "hour": 10}],
est_grid_wh=4000.0,
est_grid_cost=4000.0 * 0.7,
)
self.assertAlmostEqual(acq, 0.7, places=4)
if __name__ == "__main__":
unittest.main()