"""Pre-selection nabíjecích a exportních slotů – referenční Python. Logika je v DB: ems.fn_load_planning_slots_full. Kopie algoritmu pro unit testy bez PG. Charge mask: B) Grid AM/PM: nejlevnější sloty do Wh rozpočtu (den plánu → před exportním oknem → buy ASC). A) PV-surplus: store_score DESC; jen pokud sell ≥ future_sell − degrad. Discharge-export mask: ref_buy = min(buy) celého horizontu. Top sloty dle sell_price desc kde sell > ref_buy + degradation. """ from __future__ import annotations import unittest from datetime import date, datetime, timezone, timedelta from types import SimpleNamespace from zoneinfo import ZoneInfo from services.planning_engine import INTERVAL_H, PlanningSlot _PRAGUE = ZoneInfo("Europe/Prague") _LOOKAHEAD_SLOTS = 4 _BUY_LOOKAHEAD_EPS = 0.05 _BUY_CHARGE_BAND = 0.40 _MAX_GRID_CHARGE_CAP = 24 def _prague_date(s: PlanningSlot) -> date: return s.interval_start.astimezone(_PRAGUE).date() def _export_window_start_by_day( slots: list[PlanningSlot], degrad: float ) -> dict[date, datetime]: """Kopie R__063: první sell > min(buy) téhož kalendářního dne (Prague) + degrad.""" out: dict[date, datetime] = {} for s in slots: day = _prague_date(s) day_min = min(float(x.buy_price) for x in slots if _prague_date(x) == day) if float(s.sell_price) > day_min + degrad: prev = out.get(day) if prev is None or s.interval_start < prev: out[day] = s.interval_start return out def _before_day_export( slots: list[PlanningSlot], t: int, export_by_day: dict[date, datetime] ) -> bool: start = export_by_day.get(_prague_date(slots[t])) return start is not None and slots[t].interval_start < start def _future_sell(slots: list[PlanningSlot], t: int) -> float: tail = [float(slots[i].sell_price) for i in range(t + 1, len(slots))] return max(tail) if tail else float(slots[t].sell_price) def _buy_min_next_n( slots: list[PlanningSlot], t: int, n: int = _LOOKAHEAD_SLOTS, *, export_by_day: dict[date, datetime] | None = None, ) -> float | None: tail: list[float] = [] for i in range(t + 1, min(t + 1 + n, len(slots))): day_start = export_by_day.get(_prague_date(slots[i])) if export_by_day else None if day_start is None or slots[i].interval_start < day_start: tail.append(float(slots[i].buy_price)) return min(tail) if tail else None def _pv_surplus_w(s: PlanningSlot) -> int: return max(0, int(s.pv_a_forecast_w) + int(s.pv_b_forecast_w) - int(s.load_baseline_w)) def _first_neg_sell_ord(slots: list[PlanningSlot]) -> int | None: for i, s in enumerate(slots): if float(s.sell_price) < 0: return i return None def _apply_dynamic_grid_filter( slots: list[PlanningSlot], battery: SimpleNamespace, current_soc_wh: float, grid_slots: set[int], ) -> set[int]: """Self-konzistentni filtr vrstvy B (kopie R__063 A2).""" if not grid_slots: return grid_slots degrad = float(getattr(battery, "degradation_cost_czk_kwh", 0.15) or 0.15) first_neg = _first_neg_sell_ord(slots) eta = float(getattr(battery, "charge_efficiency", 1.0) or 1.0) max_p_w = float(getattr(battery, "max_charge_power_w", 0.0) or 0.0) per_slot_wh = max_p_w * eta * INTERVAL_H soc_max = float(battery.soc_max_wh) deficit = max(0.0, soc_max - float(current_soc_wh)) threshold = deficit * 0.6 t_len = len(slots) pv_ahead = [0.0] * t_len neg_ahead = [0.0] * t_len for t in range(t_len): pv_sum = 0.0 neg_sum = 0.0 for w2 in range(t, t_len): if first_neg is not None and w2 >= first_neg: break s2 = slots[w2] pv_s = _pv_surplus_w(s2) if pv_s > 0 and (float(s2.sell_price) < 0 or float(s2.buy_price) < 0): pv_sum += min(pv_s, max_p_w) * eta * INTERVAL_H if float(s2.buy_price) < 0: neg_sum += per_slot_wh pv_ahead[t] = min(pv_sum, deficit) neg_ahead[t] = neg_sum slots[t].pv_charge_wh_ahead = pv_ahead[t] slots[t].neg_buy_wh_ahead = neg_ahead[t] remaining = set(grid_slots) acq_prev = -999.0 for _ in range(5): if not remaining: break total_wh = len(remaining) * per_slot_wh pos_remaining = [t for t in remaining if float(slots[t].buy_price) >= 0] total_wh_pos = len(pos_remaining) * per_slot_wh if total_wh_pos <= 0: acq = min(float(s.buy_price) for s in slots if float(s.buy_price) >= 0) else: acq = sum(float(slots[t].buy_price) * per_slot_wh for t in pos_remaining) / total_wh_pos if abs(acq - acq_prev) < 0.05: break acq_prev = acq to_remove: set[int] = set() for t in list(remaining): s = slots[t] if float(s.buy_price) < 0: continue if float(s.buy_price) > acq - degrad and pv_ahead[t] + neg_ahead[t] >= threshold: to_remove.add(t) slots[t].grid_charge_suppressed_reason = ( "cheaper_pv_ahead" if pv_ahead[t] >= neg_ahead[t] else "cheaper_neg_buy_ahead" ) if not to_remove: break remaining -= to_remove cum_allowed = len(remaining) * per_slot_wh pv0 = pv_ahead[0] if t_len else 0.0 target_deficit = deficit - pv0 if cum_allowed < target_deficit * 0.6: suppressed = sorted( [ t for t in grid_slots if t not in remaining and slots[t].grid_charge_suppressed_reason ], key=lambda t: (float(slots[t].buy_price), t), ) for t in suppressed: if float(slots[t].buy_price) >= 2 * acq_prev: break remaining.add(t) slots[t].grid_charge_suppressed_reason = "safety_failsafe_unlock" cum_allowed += per_slot_wh if cum_allowed >= target_deficit * 0.6: break return remaining def _store_score(slots: list[PlanningSlot], t: int) -> float: s = slots[t] buy = float(s.buy_price) sell = float(s.sell_price) fso = _future_sell(slots, t) return fso - sell - max(0.0, buy - sell) def _select_charge_slots( slots: list[PlanningSlot], battery: SimpleNamespace, current_soc_wh: float, *, purchase_pricing_mode: str = "spot", apply_dynamic_grid_filter: bool = True, ) -> set[int]: """Kopie logiky z ems.fn_load_planning_slots_full (charge mask).""" charge_buf = float(getattr(battery, "charge_slot_buffer", 0) or 0) if charge_buf <= 0: return set(range(len(slots))) energy_to_fill = float(battery.soc_max_wh) - float(current_soc_wh) if energy_to_fill <= 0: return set(range(len(slots))) reserve_wh = float(getattr(battery, "reserve_soc_wh", 0) or 0) degrad = float(getattr(battery, "degradation_cost_czk_kwh", 0.15) or 0.15) ref_buy_am = min( (float(s.buy_price) for s in slots if _prague_hour(s) < 12), default=min(float(s.buy_price) for s in slots), ) ref_buy_pm = min( (float(s.buy_price) for s in slots if _prague_hour(s) >= 12), default=min(float(s.buy_price) for s in slots), ) ref_buy_global = min(float(s.buy_price) for s in slots) export_by_day = _export_window_start_by_day(slots, degrad) plan_day = _prague_date(slots[0]) eta = float(getattr(battery, "charge_efficiency", 1.0) or 1.0) max_p_w = float(getattr(battery, "max_charge_power_w", 0.0) or 0.0) per_slot_full_wh = max_p_w * eta * INTERVAL_H soc_max_wh = float(getattr(battery, "soc_max_wh", 0) or 0) if charge_buf > 0: charge_target_wh = min( max(energy_to_fill, 0.0) * charge_buf, max(soc_max_wh - float(current_soc_wh), 0.0), ) elif current_soc_wh >= reserve_wh: charge_target_wh = max(energy_to_fill, 0.0) else: charge_target_wh = min( max(energy_to_fill, 0.0) * charge_buf, max(energy_to_fill, 0.0), ) n_am = sum(1 for s in slots if _prague_hour(s) < 12) n_pm = len(slots) - n_am if n_am <= 0: chg_am = 0.0 chg_pm = charge_target_wh elif n_pm <= 0: chg_am = charge_target_wh chg_pm = 0.0 else: chg_am = charge_target_wh / 2.0 chg_pm = charge_target_wh - chg_am selected: set[int] = set() grid_selected: set[int] = set() grid_filled_wh = 0.0 buf_mult = charge_buf if charge_buf > 0 else 1.0 cap_am = ( max( 1, min( _MAX_GRID_CHARGE_CAP, int(chg_am / per_slot_full_wh * buf_mult) + 1, ), ) if per_slot_full_wh > 0 else 6 ) cap_pm = ( max( 1, min( _MAX_GRID_CHARGE_CAP, int(chg_pm / per_slot_full_wh * buf_mult) + 1, ), ) if per_slot_full_wh > 0 else 6 ) def _grid_sort_key(t: int, pred: bool, price: float) -> tuple[int, int, int, float, int]: today_first = 0 if _prague_date(slots[t]) == plan_day else 1 before_export = 0 if _before_day_export(slots, t, export_by_day) else 1 return (today_first, before_export, int(pred), price, t) if purchase_pricing_mode != "fixed": am_candidates = [ (t, getattr(slots[t], "is_predicted_price", False), float(slots[t].buy_price)) for t in range(len(slots)) if _prague_hour(slots[t]) < 12 ] am_candidates.sort(key=lambda x: _grid_sort_key(x[0], x[1], x[2])) cum = 0.0 grid_am = 0 for t, _pred, _price in am_candidates: if cum >= chg_am or per_slot_full_wh <= 0 or grid_am >= cap_am: break selected.add(t) grid_selected.add(t) cum += per_slot_full_wh grid_am += 1 grid_filled_wh += cum chg_pm = max(chg_pm, charge_target_wh - grid_filled_wh) if per_slot_full_wh > 0: cap_pm = max( cap_pm, min( _MAX_GRID_CHARGE_CAP, int(chg_pm / per_slot_full_wh * buf_mult) + 1, ), ) pm_candidates = [ (t, getattr(slots[t], "is_predicted_price", False), float(slots[t].buy_price)) for t in range(len(slots)) if _prague_hour(slots[t]) >= 12 ] pm_candidates.sort(key=lambda x: _grid_sort_key(x[0], x[1], x[2])) cum = 0.0 grid_pm = 0 for t, _pred, _price in pm_candidates: if cum >= chg_pm or per_slot_full_wh <= 0 or grid_pm >= cap_pm: break selected.add(t) grid_selected.add(t) cum += per_slot_full_wh grid_pm += 1 grid_filled_wh += cum for t, s in enumerate(slots): if float(s.buy_price) < 0: selected.add(t) grid_selected.add(t) if apply_dynamic_grid_filter: filtered_grid = _apply_dynamic_grid_filter( slots, battery, current_soc_wh, grid_selected ) for t in grid_selected - filtered_grid: selected.discard(t) elif purchase_pricing_mode == "fixed" and any( float(s.sell_price) > float(s.buy_price) + degrad for s in slots ): am_candidates = [ (t, getattr(slots[t], "is_predicted_price", False)) for t in range(len(slots)) if _prague_hour(slots[t]) < 12 ] am_candidates.sort( key=lambda x: ( _grid_sort_key(x[0], x[1], 0.0)[0], _grid_sort_key(x[0], x[1], 0.0)[1], _grid_sort_key(x[0], x[1], 0.0)[2], x[0], ) ) cum = 0.0 grid_am = 0 for t, _pred in am_candidates: if cum >= chg_am or per_slot_full_wh <= 0 or grid_am >= cap_am: break selected.add(t) cum += per_slot_full_wh grid_am += 1 grid_filled_wh += cum chg_pm = max(chg_pm, charge_target_wh - grid_filled_wh) if per_slot_full_wh > 0: cap_pm = max( cap_pm, min( _MAX_GRID_CHARGE_CAP, int(chg_pm / per_slot_full_wh * buf_mult) + 1, ), ) pm_candidates = [ (t, getattr(slots[t], "is_predicted_price", False)) for t in range(len(slots)) if _prague_hour(slots[t]) >= 12 ] pm_candidates.sort( key=lambda x: ( _grid_sort_key(x[0], x[1], 0.0)[0], _grid_sort_key(x[0], x[1], 0.0)[1], _grid_sort_key(x[0], x[1], 0.0)[2], x[0], ) ) cum = 0.0 grid_pm = 0 for t, _pred in pm_candidates: if cum >= chg_pm or per_slot_full_wh <= 0 or grid_pm >= cap_pm: break selected.add(t) cum += per_slot_full_wh grid_pm += 1 grid_filled_wh += cum pv_layer_cap = max(charge_target_wh - grid_filled_wh, 0.0) pv_candidates: list[tuple[int, float, float]] = [] for t, s in enumerate(slots): pv_surplus_w = max(0, s.pv_a_forecast_w + s.pv_b_forecast_w - s.load_baseline_w) fso = _future_sell(slots, t) if ( pv_surplus_w > 0 and float(s.sell_price) >= float(s.buy_price) - degrad and ( float(s.sell_price) < 0 or float(s.sell_price) >= fso - degrad ) ): pv_candidates.append((t, _store_score(slots, t), float(pv_surplus_w))) pv_candidates.sort(key=lambda x: (-x[1], x[0])) cum = 0.0 for t, _score, pv_surplus_w in pv_candidates: if cum >= pv_layer_cap: break selected.add(t) cum += min(pv_surplus_w, max_p_w) * eta * INTERVAL_H return selected def _select_discharge_export_slots( slots: list[PlanningSlot], battery: SimpleNamespace, current_soc_wh: float, charge_slots: set[int] | None = None, *, purchase_pricing_mode: str = "spot", ) -> set[int]: """Kopie logiky z ems.fn_load_planning_slots_full (discharge-export mask).""" discharge_buf = float(getattr(battery, "discharge_slot_buffer", 0) or 0) if discharge_buf <= 0: return set(range(len(slots))) min_soc_wh = float(getattr(battery, "min_soc_wh", 0) or 0) soc_max_wh = float(battery.soc_max_wh) exportable_wh = soc_max_wh - min_soc_wh if exportable_wh <= 0: return set() degrad = float(getattr(battery, "degradation_cost_czk_kwh", 0.15) or 0.15) eta = float(getattr(battery, "discharge_efficiency", 1.0) or 1.0) max_p_w = float(getattr(battery, "max_discharge_power_w", 0.0) or 0.0) per_slot_wh = max_p_w * eta * INTERVAL_H discharge_target_wh = exportable_wh * discharge_buf ref_buy = min(float(s.buy_price) for s in slots) if purchase_pricing_mode == "fixed": sell_min = None # per-slot buy + degrad below else: sell_min = ref_buy + degrad candidates = [ (t, float(slots[t].sell_price)) for t in range(len(slots)) if ( float(slots[t].sell_price) > float(slots[t].buy_price) + degrad if purchase_pricing_mode == "fixed" else float(slots[t].sell_price) > sell_min ) ] candidates.sort(key=lambda x: (-x[1], -x[0])) first_neg = next( (i for i, s in enumerate(slots) if float(s.sell_price) < 0), None, ) neg_day = _prague_date(slots[first_neg]) if first_neg is not None else None if first_neg is not None and neg_day is not None: filtered: list[tuple[int, float]] = [] for t, sell in candidates: if t >= first_neg: filtered.append((t, sell)) continue if _prague_date(slots[t]) != neg_day: filtered.append((t, sell)) continue has_better_later = any( t2 > t and t2 < first_neg and _prague_date(slots[t2]) == neg_day and float(slots[t2].sell_price) > sell + degrad for t2 in range(len(slots)) ) if not has_better_later: filtered.append((t, sell)) candidates = filtered selected: set[int] = set() cum = 0.0 for t, _sell in candidates: if cum >= discharge_target_wh or per_slot_wh <= 0: break selected.add(t) cum += per_slot_wh if first_neg is not None and neg_day is not None: evening_by_day: dict = {} for t, s in enumerate(slots): d = _prague_date(s) if _prague_hour(s) < 17: continue evening_by_day[d] = max(evening_by_day.get(d, 0.0), float(s.sell_price)) for t, s in enumerate(slots): d = _prague_date(s) peak = evening_by_day.get(d, 0.0) if peak > 0 and _prague_hour(s) >= 17 and float(s.sell_price) >= peak - degrad: if purchase_pricing_mode == "fixed": if float(s.sell_price) > float(s.buy_price) + degrad: selected.add(t) elif float(s.sell_price) > sell_min: selected.add(t) preneg_min_soc = min_soc_wh + max(per_slot_wh, 1000.0) if ( first_neg is not None and first_neg > 0 and current_soc_wh >= preneg_min_soc and neg_day is not None ): morning_sells = [ float(slots[i].sell_price) for i in range(first_neg) if float(slots[i].sell_price) >= 0 and _prague_date(slots[i]) == neg_day and 5 <= _prague_hour(slots[i]) <= 11 ] if morning_sells: zone_peak = max(morning_sells) for i in range(first_neg): if ( _prague_date(slots[i]) == neg_day and 5 <= _prague_hour(slots[i]) <= 11 and float(slots[i].sell_price) >= zone_peak - degrad ): selected.add(i) for i in range(first_neg): if _prague_date(slots[i]) != neg_day: continue h = _prague_hour(slots[i]) if 5 <= h < 17 and float(slots[i].sell_price) < zone_peak - degrad: selected.discard(i) return selected def _prague_hour(s: PlanningSlot) -> int: dt = s.interval_start if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(_PRAGUE).hour def _slot( *, buy: float, sell: float = 1.0, pv: int = 0, load: int = 2_000, hour_utc: int = 12, predicted: bool = False, interval_start: datetime | None = None, ) -> PlanningSlot: if interval_start is None: interval_start = datetime(2026, 5, 19, hour_utc, 0, tzinfo=timezone.utc) return PlanningSlot( interval_start=interval_start, buy_price=buy, sell_price=sell, pv_a_forecast_w=0, pv_b_forecast_w=pv, load_baseline_w=load, ev1_connected=False, ev2_connected=False, is_predicted_price=predicted, ) def _battery( *, charge_buf: float = 1.3, discharge_buf: float = 1.5, uc_wh: float = 64_000.0, soc_max_pct: float = 95.0, min_soc_pct: float = 10.0, reserve_soc_pct: float = 20.0, max_charge_w: float = 18_000.0, max_discharge_w: float = 18_000.0, charge_eff: float = 0.95, discharge_eff: float = 0.95, degrad: float = 0.15, ) -> SimpleNamespace: uc = uc_wh return SimpleNamespace( usable_capacity_wh=uc, min_soc_wh=min_soc_pct / 100.0 * uc, reserve_soc_wh=reserve_soc_pct / 100.0 * uc, soc_max_wh=soc_max_pct / 100.0 * uc, max_charge_power_w=max_charge_w, max_discharge_power_w=max_discharge_w, charge_efficiency=charge_eff, discharge_efficiency=discharge_eff, charge_slot_buffer=charge_buf, discharge_slot_buffer=discharge_buf, degradation_cost_czk_kwh=degrad, ) class SelectChargeSlotsTests(unittest.TestCase): def test_buffer_zero_returns_all_slots(self) -> None: slots = [_slot(buy=3.0) for _ in range(4)] battery = _battery(charge_buf=0.0) out = _select_charge_slots(slots, battery, current_soc_wh=0.0) self.assertEqual(out, set(range(4))) def test_returns_all_when_battery_is_full(self) -> None: slots = [_slot(buy=0.1) for _ in range(3)] battery = _battery() out = _select_charge_slots( slots, battery, current_soc_wh=battery.soc_max_wh + 1.0 ) self.assertEqual(out, set(range(3))) def test_pv_surplus_high_store_score_selected(self) -> None: """Slot s vyšším store_score (lepší uložení vs export) má přednost.""" slots = [ _slot(buy=1.5, sell=0.01, pv=8_000, load=2_000, hour_utc=8), _slot(buy=1.5, sell=0.50, pv=8_000, load=2_000, hour_utc=9), _slot(buy=0.5, sell=0.40, pv=8_000, load=2_000, hour_utc=10), ] battery = _battery( charge_buf=1.3, uc_wh=1_000.0, soc_max_pct=100.0, max_charge_w=6_000.0 ) out = _select_charge_slots(slots, battery, current_soc_wh=0.0) self.assertIn(2, out, "Nejlevnější buy (grid B) má být vybrán") self.assertNotIn(1, out, "Dražší AM slot (buy 1.5) nemá přednost před levným buy 0.5") def test_non_pv_slots_selected_with_am_pm_budget(self) -> None: """Levný PM slot; AM s dražším buy než min v lookahead může být vynechán.""" slots = [ _slot(buy=0.5, hour_utc=4), _slot(buy=3.0, hour_utc=5), _slot(buy=0.4, hour_utc=14), _slot(buy=9.9, hour_utc=15), ] battery = _battery( charge_buf=1.3, uc_wh=5_000.0, soc_max_pct=100.0, max_charge_w=18_000.0 ) out = _select_charge_slots(slots, battery, current_soc_wh=0.0) self.assertIn(2, out, "Nejlevnější buy v horizontu (PM) musí být vybrán") def test_pm_grid_prefers_today_before_export_over_tomorrow_cheaper(self) -> None: """Dnes PM levné před večerním exportem má prioritu před zítřejším min(buy).""" base = datetime(2026, 5, 21, 10, 0, tzinfo=timezone.utc) slots = [ _slot(buy=0.72, sell=-0.1, pv=500, load=3000, interval_start=base), _slot(buy=0.68, sell=-0.15, pv=500, load=3000, interval_start=base + timedelta(minutes=15)), _slot( buy=5.5, sell=3.8, pv=0, load=2500, interval_start=base + timedelta(hours=7, minutes=30), ), _slot( buy=0.50, sell=-0.3, pv=2000, load=5000, interval_start=base + timedelta(hours=26), ), ] battery = _battery(uc_wh=64_000.0) out = _select_charge_slots(slots, battery, current_soc_wh=0.46 * battery.usable_capacity_wh) self.assertIn(0, out) self.assertIn(1, out) self.assertEqual( min(out), 0, "Před exportním oknem musí být vybrány dnešní levné PM sloty dřív než zítřejší min(buy)", ) def test_cheap_pm_with_pv_surplus_gets_grid_charge(self) -> None: """Regrese home-01: levné PM VT (~0,8) i s FVE musí projít grid maskou B.""" base = datetime(2026, 5, 21, 10, 0, tzinfo=timezone.utc) slots = [ _slot( buy=0.80, sell=-0.08, pv=2_500, load=3_400, interval_start=base, ), _slot( buy=0.72, sell=-0.13, pv=500, load=3_400, interval_start=base + timedelta(minutes=15), ), _slot( buy=2.50, sell=1.40, pv=2_000, load=3_800, interval_start=base + timedelta(hours=5), ), _slot( buy=5.50, sell=3.80, pv=100, load=2_900, interval_start=base + timedelta(hours=9), ), ] battery = _battery(uc_wh=64_000.0, charge_buf=1.05) soc = 0.88 * battery.usable_capacity_wh out = _select_charge_slots(slots, battery, current_soc_wh=soc) self.assertIn(1, out, "Levnější PM slot má allow_charge i s FVE") self.assertIn(0, out) self.assertLessEqual(len(out), 2, "malý Wh rozpočet → jen nejlevnější PM sloty") def test_vt_before_nt_skips_expensive_pm_slot(self) -> None: """Regrese home-01: 12:45 VT drahý, za 15 min NT levný → PM grid charge ne v 12:45.""" base = datetime(2026, 5, 21, 10, 45, tzinfo=timezone.utc) slots = [ _slot( buy=1.49, sell=-0.04, pv=0, load=3_500, interval_start=base, ), _slot( buy=0.86, sell=0.01, pv=0, load=3_500, interval_start=base + timedelta(minutes=15), ), _slot( buy=0.86, sell=0.01, pv=0, load=3_500, interval_start=base + timedelta(minutes=30), ), ] battery = _battery(uc_wh=64_000.0, charge_buf=1.0) soc = 0.92 * battery.usable_capacity_wh out = _select_charge_slots(slots, battery, current_soc_wh=soc) self.assertNotIn(0, out, "Při malém rozpočtu má přednost levnější NT, ne VT 1.49") self.assertTrue({1, 2} & out, "NT slot(y) mohou být vybrány") def test_pm_grid_gets_unused_am_wh_budget(self) -> None: """Nečerpaný AM rozpočet → odpolední levné PM sloty mohou dostat allow_charge.""" base = datetime(2026, 5, 22, 6, 0, tzinfo=timezone.utc) slots = [ _slot(buy=0.55, sell=-0.2, hour_utc=6, interval_start=base), _slot(buy=0.58, sell=-0.2, hour_utc=7, interval_start=base + timedelta(hours=1)), _slot(buy=0.52, sell=-0.25, hour_utc=14, interval_start=base + timedelta(hours=8)), _slot(buy=0.50, sell=-0.25, hour_utc=15, interval_start=base + timedelta(hours=9)), _slot(buy=5.5, sell=3.8, hour_utc=20, interval_start=base + timedelta(hours=14)), ] battery = _battery(charge_buf=1.3, uc_wh=64_000.0) out = _select_charge_slots(slots, battery, current_soc_wh=0.12 * battery.usable_capacity_wh) pm_cheap = {2, 3} self.assertTrue( pm_cheap & out, "po levném AM má PM dostat grid charge z nevyčerpaného rozpočtu", ) def test_ote_slots_prioritized_over_predicted(self) -> None: """Při stejné ceně má OTE (is_predicted=false) přednost před predikovaným.""" slots = [ _slot(buy=2.00, sell=2.0, hour_utc=13, predicted=False), _slot(buy=2.00, sell=2.0, hour_utc=13, predicted=True), ] battery = _battery( charge_buf=1.3, uc_wh=3_000.0, soc_max_pct=100.0, max_charge_w=18_000.0 ) out = _select_charge_slots(slots, battery, current_soc_wh=0.0) self.assertIn(0, out) self.assertNotIn(1, out) class SelectDischargeExportSlotsTests(unittest.TestCase): def test_evening_sell_allowed_when_cheaper_than_ref_charge_buy(self) -> None: slots = [ _slot(buy=0.50, sell=-0.30, hour_utc=6), _slot(buy=0.51, sell=-0.29, hour_utc=7), _slot(buy=5.60, sell=3.30, hour_utc=16), _slot(buy=5.98, sell=3.37, hour_utc=17), ] battery = _battery(uc_wh=64_000.0) charge = _select_charge_slots(slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh) discharge = _select_discharge_export_slots( slots, battery, current_soc_wh=0.2 * battery.usable_capacity_wh, charge_slots=charge ) self.assertIn(2, discharge) self.assertIn(3, discharge) def test_export_excluded_when_sell_below_ref_buy_plus_degradation(self) -> None: slots = [ _slot(buy=0.40, sell=0.10, hour_utc=8), _slot(buy=4.00, sell=0.50, hour_utc=18), ] battery = _battery(uc_wh=10_000.0, discharge_buf=2.0) discharge = _select_discharge_export_slots( slots, battery, current_soc_wh=0.0 ) self.assertNotIn(1, discharge) class DynamicGridFilterTests(unittest.TestCase): def _range(self, start_h: int, end_h: int, **kwargs) -> list[PlanningSlot]: base = datetime(2026, 5, 24, 0, 0, tzinfo=_PRAGUE) out: list[PlanningSlot] = [] for h in range(start_h, end_h): for minute in (0, 15, 30, 45): t = base.replace(hour=h, minute=minute).astimezone(timezone.utc) out.append( _slot( interval_start=t, hour_utc=t.hour, buy=kwargs.get("buy", 4.5), sell=kwargs.get("sell", 2.0), pv=kwargs.get("pv_b", 0), load=kwargs.get("load", 500), ) ) if "pv_a" in kwargs: out[-1] = PlanningSlot( interval_start=t, buy_price=float(kwargs.get("buy", 4.5)), sell_price=float(kwargs.get("sell", 2.0)), pv_a_forecast_w=int(kwargs["pv_a"]), pv_b_forecast_w=int(kwargs.get("pv_b", 0)), load_baseline_w=int(kwargs.get("load", 500)), ev1_connected=False, ev2_connected=False, is_predicted_price=False, ) return out def _home01_battery(self) -> SimpleNamespace: return _battery(charge_buf=1.3, uc_wh=64_000.0, soc_max_pct=95.0) def _uniform_buy_slots(self, buy: float, n: int = 96) -> list[PlanningSlot]: base = datetime(2026, 5, 24, 0, 0, tzinfo=_PRAGUE) return [ _slot( buy=buy, sell=2.0, load=500, interval_start=(base + timedelta(minutes=15 * i)).astimezone(timezone.utc), ) for i in range(n) ] def test_home01_night_charge_before_neg_sell_pv_day(self) -> None: slots = [ *self._range(0, 5, buy=4.7, sell=2.9), *self._range(5, 7, buy=5.0, sell=3.0, pv_b=400), *self._range(7, 11, buy=4.5, sell=2.8, pv_a=3000, pv_b=2000), *self._range(11, 14, buy=0.5, sell=-0.4, pv_a=6000, pv_b=5000), *self._range(14, 17, buy=1.0, sell=-0.3, pv_a=5000, pv_b=4000), *self._range(17, 19, buy=4.5, sell=3.0), *self._range(19, 22, buy=6.5, sell=4.0), *self._range(22, 24, buy=4.8, sell=3.0), ] selected = _select_charge_slots( slots, self._home01_battery(), current_soc_wh=30_000.0 ) for i in range(88, 96): self.assertNotIn(i, selected, f"slot {i} (noc) nema byt allow_grid_charge") self.assertTrue(any(i in selected for i in range(44, 56))) def test_cloudy_day_no_pv_grid_unlock(self) -> None: slots = self._uniform_buy_slots(4.5 + 0.1) selected = _select_charge_slots( slots, self._home01_battery(), current_soc_wh=30_000.0 ) self.assertGreater(len(selected), 4, "failsafe musel uvolnit nejake sloty") def test_ba81_fixed_tariff_mask_unchanged(self) -> None: battery = _battery(charge_buf=1.3, uc_wh=12_500.0) slots_fixed = self._uniform_buy_slots(buy=3.5) selected_v1 = _select_charge_slots( slots_fixed, battery, 5000.0, purchase_pricing_mode="fixed", apply_dynamic_grid_filter=False, ) selected_v2 = _select_charge_slots( slots_fixed, battery, 5000.0, purchase_pricing_mode="fixed", ) self.assertEqual(selected_v1, selected_v2) def test_kv1_block_export_unchanged(self) -> None: """KV1: filtr vrstvy B beze zmeny u fixed tarifu (stejne jako BA81).""" battery = _battery(charge_buf=1.3, uc_wh=12_500.0) slots_fixed = self._uniform_buy_slots(buy=3.5) selected_v1 = _select_charge_slots( slots_fixed, battery, 5000.0, purchase_pricing_mode="fixed", apply_dynamic_grid_filter=False, ) selected_v2 = _select_charge_slots( slots_fixed, battery, 5000.0, purchase_pricing_mode="fixed", ) self.assertEqual(selected_v1, selected_v2) class FixedPurchasePricingTests(unittest.TestCase): def test_fixed_skips_grid_charge_when_no_sell_arbitrage(self) -> None: """Fixní buy bez výkupu nad buy+degrad → žádné grid nabíjení.""" slots = [ _slot(buy=6.35, sell=2.0, hour_utc=14, load=500), _slot(buy=6.35, sell=3.5, hour_utc=18, load=500), ] battery = _battery(charge_buf=1.3, uc_wh=12_500.0) out = _select_charge_slots( slots, battery, current_soc_wh=0.4 * battery.usable_capacity_wh, purchase_pricing_mode="fixed", ) self.assertEqual(out, set()) def test_fixed_grid_charge_before_evening_export(self) -> None: """BA81: konstantní buy, večerní sell > buy+degrad → NT/AM grid sloty.""" base = datetime(2026, 5, 24, 0, 0, tzinfo=_PRAGUE) slots: list[PlanningSlot] = [] for i in range(96): t = base + timedelta(minutes=15 * i) sell = 3.75 if t.hour >= 18 else 2.8 slots.append( _slot( buy=3.088, sell=sell, load=200, interval_start=t.astimezone(timezone.utc), ) ) battery = _battery(charge_buf=1.3, uc_wh=12_500.0) out = _select_charge_slots( slots, battery, current_soc_wh=0.33 * battery.usable_capacity_wh, purchase_pricing_mode="fixed", ) night = {t for t in out if _prague_hour(slots[t]) < 8} self.assertGreater(len(night), 0, "očekáváno grid nabíjení v noci před večerním výkupem") def test_fixed_nt_charge_after_yesterday_evening_peak(self) -> None: """Včerejší večerní peak nesmí zablokovat dnešní 00–06 grid (per-day export okno).""" base = datetime(2026, 5, 23, 20, 0, tzinfo=_PRAGUE) slots: list[PlanningSlot] = [] for i in range(64): t = base + timedelta(minutes=15 * i) sell = 3.8 if t.hour >= 20 and t.date() == date(2026, 5, 23) else 2.9 if t.date() == date(2026, 5, 24) and t.hour >= 19: sell = 3.7 slots.append( _slot( buy=3.088, sell=sell, load=200, interval_start=t.astimezone(timezone.utc), ) ) battery = _battery(charge_buf=1.3, uc_wh=12_500.0) out = _select_charge_slots( slots, battery, current_soc_wh=0.33 * battery.usable_capacity_wh, purchase_pricing_mode="fixed", ) may24_night = { t for t in out if _prague_date(slots[t]) == date(2026, 5, 24) and _prague_hour(slots[t]) < 6 } self.assertGreater(len(may24_night), 0) def test_fixed_allows_discharge_on_high_sell(self) -> None: slots = [ _slot(buy=3.09, sell=1.0, hour_utc=10), _slot(buy=3.09, sell=3.8, hour_utc=18), _slot(buy=3.09, sell=3.5, hour_utc=19), ] battery = _battery(uc_wh=12_500.0, discharge_buf=2.0, degrad=0.3) discharge = _select_discharge_export_slots( slots, battery, current_soc_wh=0.5 * battery.usable_capacity_wh, purchase_pricing_mode="fixed", ) self.assertIn(1, discharge) self.assertIn(2, discharge, "oba sloty sell > buy + degrad") class NegSellReservationTests(unittest.TestCase): """Mirror SQL `R__063` PV vrstva A — rezervace `v_pv_layer_cap_wh` pro `sell<0` okno. Cíl: do prvního `sell<0` slotu dorazit s SoC = soc_max − min(neg_window_pv, available_storage), aby `sell<0` PV mohlo doplnit zbytek bez exportu / curtail pole A. """ @staticmethod def _pv_layer_cap_with_reservation( slots, # list of dict { sell, pv_surplus_w } energy_to_fill_wh: float, grid_filled_wh: float, max_charge_w: float, charge_eff: float, ) -> float: cap = max(energy_to_fill_wh - grid_filled_wh, 0.0) neg_window = sum( min(float(s["pv_surplus_w"]), max_charge_w) * charge_eff * 0.25 for s in slots if float(s["sell"]) < 0 and float(s["pv_surplus_w"]) > 0 ) return max(cap - neg_window, 0.0) def test_neg_sell_window_reduces_pv_layer_cap(self) -> None: # 4 ranní PV sloty (sell>=0), 2 odpolední neg-sell PV sloty slots = [ {"sell": 1.2, "pv_surplus_w": 6000}, # 10:00 cap candidate {"sell": 1.0, "pv_surplus_w": 7000}, # 11:00 {"sell": 0.8, "pv_surplus_w": 8000}, # 12:00 {"sell": -0.5, "pv_surplus_w": 9000}, # 13:00 neg-sell {"sell": -1.0, "pv_surplus_w": 9000}, # 13:30 neg-sell ] cap_before = 20_000.0 # 20 kWh deficit cap_after = self._pv_layer_cap_with_reservation( slots, energy_to_fill_wh=cap_before, grid_filled_wh=0.0, max_charge_w=10_000.0, charge_eff=0.95, ) # neg-window = 2 × min(9000, 10000) × 0.95 × 0.25 = 4 275 self.assertAlmostEqual(cap_after, cap_before - 4275.0, places=1) self.assertLess(cap_after, cap_before) def test_neg_sell_pv_covers_full_deficit(self) -> None: slots = [ {"sell": 1.0, "pv_surplus_w": 5000}, # ranní {"sell": -0.4, "pv_surplus_w": 9000}, {"sell": -1.1, "pv_surplus_w": 9000}, {"sell": -0.8, "pv_surplus_w": 9000}, ] # neg-window = 3 × 9000 × 0.95 × 0.25 = 6 412.5 Wh cap_after = self._pv_layer_cap_with_reservation( slots, energy_to_fill_wh=5_000.0, grid_filled_wh=0.0, max_charge_w=10_000.0, charge_eff=0.95, ) # cap (5000) - neg_window (6412.5) → clamp na 0 → žádné ranní PV nabíjení self.assertEqual(cap_after, 0.0) def test_no_neg_sell_window_no_change(self) -> None: slots = [ {"sell": 1.2, "pv_surplus_w": 6000}, {"sell": 0.8, "pv_surplus_w": 7000}, ] cap_before = 8_000.0 cap_after = self._pv_layer_cap_with_reservation( slots, energy_to_fill_wh=cap_before, grid_filled_wh=0.0, max_charge_w=10_000.0, charge_eff=0.95, ) self.assertEqual(cap_after, cap_before) class FallbackAcquisitionTests(unittest.TestCase): """Mirror SQL `R__063` fallback when `v_est_grid_wh = 0` and no positive buy grid slot.""" @staticmethod def _fallback_acq(slots, est_grid_wh: float, est_grid_cost: float) -> float: if est_grid_wh > 0: return est_grid_cost / est_grid_wh positive_buys = [float(s["buy"]) for s in slots if float(s["buy"]) >= 0] v = min(positive_buys) if positive_buys else 0.0 return max(v, 0.0) def test_no_grid_charging_but_horizon_has_positive_buys(self) -> None: # PM má negativní buy (13:00–14:00), ale jinde v horizontu pozitivní min slots = [ {"buy": 4.5, "hour": 23}, {"buy": 0.3, "hour": 7}, {"buy": 0.7, "hour": 10}, {"buy": -0.36, "hour": 13}, {"buy": -0.43, "hour": 14}, ] acq = self._fallback_acq(slots, est_grid_wh=0.0, est_grid_cost=0.0) self.assertAlmostEqual(acq, 0.3, places=4) self.assertGreaterEqual(acq, 0.0) def test_all_negative_buys_clamps_to_zero(self) -> None: slots = [ {"buy": -0.4, "hour": 13}, {"buy": -0.5, "hour": 14}, ] acq = self._fallback_acq(slots, est_grid_wh=0.0, est_grid_cost=0.0) self.assertEqual(acq, 0.0) def test_positive_weighted_mean_unchanged(self) -> None: # est_grid_wh > 0 — vrátit weighted mean, ne fallback acq = self._fallback_acq( [{"buy": 0.7, "hour": 10}], est_grid_wh=4000.0, est_grid_cost=4000.0 * 0.7, ) self.assertAlmostEqual(acq, 0.7, places=4) if __name__ == "__main__": unittest.main()