Merge branch 'worktree-agent-a53f3277d55fecfcb' into dev
All checks were successful
CI and deploy / migration-check (push) Successful in 19s
CI and deploy / deploy (push) Successful in 1m14s

This commit is contained in:
Dusan Vojacek
2026-06-12 19:40:50 +02:00
12 changed files with 425 additions and 57 deletions

View File

@@ -137,6 +137,7 @@ async def _load_site_context(site_id: int, db):
vehicles.append(
SimpleNamespace(
max_charge_power_w=int(v["max_charge_power_w"]),
min_power_w=int(v.get("min_power_w") or 0),
battery_capacity_kwh=float(v["battery_capacity_kwh"]),
default_target_soc_pct=float(v["default_target_soc_pct"]),
)
@@ -145,6 +146,7 @@ async def _load_site_context(site_id: int, db):
vehicles.append(
SimpleNamespace(
max_charge_power_w=0,
min_power_w=0,
battery_capacity_kwh=1.0,
default_target_soc_pct=80.0,
)

View File

@@ -28,9 +28,17 @@
# vede k "nabít plným výkonem hned, pak řezat A" — emergentně, bez rampy.
# - oportunistické EV („měkký cíl"): nad tvrdý target smí auto vzít až
# headroom_wh (do 100 %), oceněno opportunistic_value_czk_kwh (= budoucí
# ušetřené nabíjení, DB) — kupuje jen velmi levnou/zápornou energii.
# Dekompozice Σ(EV energie) == needed unmet + opp zároveň stropuje
# celkovou energii do auta (dřív při buy<0 bez stropu).
# ušetřené nabíjení, session override → vozidlo, DB) — kupuje jen velmi
# levnou/zápornou energii. Dekompozice Σ(EV energie) == needed unmet + opp
# zároveň stropuje celkovou energii do auta (dřív při buy<0 bez stropu);
# opp vrstva NENÍ vázaná deadline (auto bývá doma dál, odjezd řeší rolling
# replan); bez session je EV == 0 (stop-session). Deadline suma jde po
# slot PŘED deadline (slot začínající v deadline už nepatří „do deadline").
# - min. výkon wallboxu (asset_ev_charger.min_power_w, 6 A ≈ 1380 W):
# binárka ev_on → setpoint ∈ {0} [min_power_w, max]; ev_direct ≤ gi + PV
# (fyzikální split direct/via_bat). Reporting: kWh přes ev_via_bat plní
# battery_arbitrage_czk oportunitní cenou (min sell exportního slotu dne,
# jinak terminal value) — slotový buy pro ně neplatí.
# - denní SoC rampa: deficit pod slot.safety_soc_target_wh (R__063: reserve →
# reserve+noc, 619 h) platí za slot nájem buy×faktor (DB
# planner_safety_soc_risk_factor) — ráno se nejdřív dotáhne rezerva
@@ -58,13 +66,14 @@ from services.planning.constants import (
from services.planning.types import (
DispatchResult,
PlanningSlot,
_prague_calendar_date,
_prague_dow_hour,
)
from services.planning.heuristics import _dispatch_grid_setpoint_w
logger = logging.getLogger(__name__)
V2_BUILD_TAG = "v2-clean-2026-06-11"
V2_BUILD_TAG = "v2-ev-accounting-2026-06-12"
# Cena za vypnutí GEN portu (mikroinvertory pole B): reálné riziko/opotřebení
# cyklování stykače — drobná, ale nenulová, aby cutoff platil jen při sell < 0.
@@ -166,6 +175,10 @@ def solve_dispatch_v2(
]
ev_unmet: list = [] # slack Wh per session (cena V2_EV_UNMET_CZK_KWH)
ev_opp: list = [] # (var, value_czk_kwh) — energie nad target (měkký cíl)
# min. výkon wallboxu (IEC 61851: 6 A ≈ 1380 W) — setpoint ∈ {0} [min, max]
ev_min_w = [
max(0.0, float(getattr(vehicles[e], "min_power_w", 0) or 0)) for e in range(EV)
]
nb_buffer_wh = [max(0.0, float(s.night_baseload_buffer_wh or 0.0)) for s in slots]
safety_risk = float(getattr(battery, "planner_safety_soc_risk_factor", 0.0) or 0.0)
safety_tgt_wh = [
@@ -222,6 +235,11 @@ def solve_dispatch_v2(
prob += bc_gi[t] <= gi[t], f"bcgi_src_{t}"
# vybíjení kryje dům + EV-via-bat + export z baterie
prob += ge_bat[t] + pulp.lpSum(ev_via_bat[e][t] for e in range(EV)) <= bd[t], f"bd_split_{t}"
# ev_direct fyzicky jen ze sítě + PV (ne z baterie) — split direct/via_bat
# není arbitrární, ekonomiku nemění (bilance platí stejně)
prob += (
pulp.lpSum(ev_direct[e][t] for e in range(EV)) <= gi[t] + pv_a_net + pv_b_eff
), f"evd_src_{t}"
# zákaz současného importu a exportu
prob += gi[t] <= max_imp * y_imp[t], f"imp_excl_{t}"
@@ -245,15 +263,20 @@ def solve_dispatch_v2(
if float(s.sell_price) < 0.0 and block_neg_sell:
prob += ge_pv[t] + ge_bat[t] == 0, f"neg_sell_block_{t}"
# EV dostupnost
# EV dostupnost + min. výkon wallboxu (binárka jen kde je min > 0)
for e in range(EV):
if not _connected(e, t):
prob += ev_direct[e][t] == 0
prob += ev_via_bat[e][t] == 0
else:
prob += ev_direct[e][t] + ev_via_bat[e][t] <= float(
vehicles[e].max_charge_power_w
)
ev_max_w = float(vehicles[e].max_charge_power_w)
ev_total = ev_direct[e][t] + ev_via_bat[e][t]
if 0 < ev_min_w[e] <= ev_max_w:
on = pulp.LpVariable(f"evon_{e}_{t}", cat=pulp.LpBinary)
prob += ev_total >= ev_min_w[e] * on, f"ev_min_{e}_{t}"
prob += ev_total <= ev_max_w * on, f"ev_max_{e}_{t}"
else:
prob += ev_total <= ev_max_w
# provozní režimy (tvrdé constraints dle operating-modes.md)
if om == "SELF_SUSTAIN":
@@ -266,28 +289,40 @@ def solve_dispatch_v2(
prob += ge_pv[t] + ge_bat[t] == 0
prob += bd[t] == 0
# EV deadline (s placeným slackem místo infeasibility)
# EV deadline (s placeným slackem místo infeasibility) + měkký cíl.
# Bez session není mandát nabíjet: připojené auto bez session (stop-session,
# golden fixtures s vynulovanými sessions) nesmí při buy<0 „pumpovat" energii.
for e in range(EV):
sess = ev_sessions[e] if e < len(ev_sessions) else None
if sess is None or not getattr(sess, "energy_needed_wh", 0):
if sess is None:
for t in range(T):
if _connected(e, t):
prob += ev_direct[e][t] == 0, f"ev_nosess_d_{e}_{t}"
prob += ev_via_bat[e][t] == 0, f"ev_nosess_b_{e}_{t}"
continue
t_dl = next(
(t for t in range(T) if slots[t].interval_start >= sess.target_deadline),
T - 1,
)
unmet = pulp.LpVariable(f"ev_unmet_{e}", 0, float(sess.energy_needed_wh))
needed = max(0.0, float(getattr(sess, "energy_needed_wh", 0.0) or 0.0))
unmet = pulp.LpVariable(f"ev_unmet_{e}", 0, needed)
ev_unmet.append(unmet)
prob += (
pulp.lpSum(
(ev_direct[e][t] + ev_via_bat[e][t]) * INTERVAL_H
for t in range(t_dl + 1)
if _connected(e, t)
if needed > 0:
# první slot s interval_start >= deadline už do deadline NEPATŘÍ
# (slot [deadline, deadline+15min) dodává energii až po odjezdu)
t_dl = next(
(t for t in range(T) if slots[t].interval_start >= sess.target_deadline),
T,
)
+ unmet
>= float(sess.energy_needed_wh)
), f"ev_deadline_{e}"
prob += (
pulp.lpSum(
(ev_direct[e][t] + ev_via_bat[e][t]) * INTERVAL_H
for t in range(t_dl)
if _connected(e, t)
)
+ unmet
>= needed
), f"ev_deadline_{e}"
# měkký cíl: dekompozice celkové energie == needed unmet + opp
# měkký cíl: dekompozice celkové energie == needed unmet + opp.
# Oportunistická vrstva NENÍ omezená deadline — auto bývá doma dál,
# odjezd řeší rolling replan (rozhodnutí 2026-06-12).
headroom = max(0.0, float(getattr(sess, "headroom_wh", 0.0) or 0.0))
opp_val = float(getattr(sess, "opportunistic_value_czk_kwh", 0.0) or 0.0)
opp = pulp.LpVariable(f"ev_opp_{e}", 0, headroom if opp_val > 0 else 0.0)
@@ -298,7 +333,7 @@ def solve_dispatch_v2(
for t in range(T)
if _connected(e, t)
)
== float(sess.energy_needed_wh) - unmet + opp
== needed - unmet + opp
), f"ev_total_{e}"
# TUV look-ahead (převzato z v1 — komfortní constraint, ne heuristika)
@@ -384,9 +419,32 @@ def solve_dispatch_v2(
v = pulp.value(var)
return float(v) if v is not None else 0.0
# Reporting EV-via-bat: kWh do auta z baterie neplatí slotový buy (jdou
# z baterie), ale ušlou příležitost. Aproximace oportunitní ceny: nejnižší
# sell slotu, kde plán exportuje, v témže pražském dni; bez exportu ten den
# terminal value (Kč/kWh). Plní battery_arbitrage_czk (dřív konstantní 0).
day_min_export_sell: dict[Any, float] = {}
for t in range(T):
if _val(ge_pv[t]) + _val(ge_bat[t]) >= 1.0:
d_key = _prague_calendar_date(slots[t])
sp = float(slots[t].sell_price)
if d_key not in day_min_export_sell or sp < day_min_export_sell[d_key]:
day_min_export_sell[d_key] = sp
results: list[DispatchResult] = []
for t in range(T):
s = slots[t]
via1_w = _val(ev_via_bat[0][t]) if EV > 0 else 0.0
via2_w = _val(ev_via_bat[1][t]) if EV > 1 else 0.0
via_kwh = (via1_w + via2_w) * wh
if via_kwh > 1e-9:
opp_price = max(
0.0,
day_min_export_sell.get(_prague_calendar_date(s), terminal * 1000.0),
)
arb_czk = via_kwh * opp_price
else:
arb_czk = 0.0
bc_tot = _val(bc_pv[t]) + _val(bc_gi[t])
bd_v = _val(bd[t])
batt_w = round(bc_tot - bd_v)
@@ -434,8 +492,8 @@ def solve_dispatch_v2(
if EV > 1 and s.ev2_connected
else None
),
ev1_via_bat_w=round(_val(ev_via_bat[0][t])) if EV > 0 else 0,
ev2_via_bat_w=round(_val(ev_via_bat[1][t])) if EV > 1 else 0,
ev1_via_bat_w=round(via1_w),
ev2_via_bat_w=round(via2_w),
heat_pump_enabled=hp_on,
heat_pump_setpoint_w=int(rated_hp) if hp_on else 0,
pv_a_curtailed_w=round(_val(ca[t])),
@@ -444,7 +502,7 @@ def solve_dispatch_v2(
effective_sell_price=float(s.sell_price),
is_predicted_price=bool(s.is_predicted_price),
cashflow_czk=round(cash_t, 4),
battery_arbitrage_czk=0.0,
battery_arbitrage_czk=round(arb_czk, 4),
penalty_czk=round(pen_t, 4),
green_bonus_czk=float(getattr(s, "green_bonus_czk_per_slot", 0.0) or 0.0),
)
@@ -462,6 +520,7 @@ def solve_dispatch_v2(
"gen_cutoff_available": gen_cutoff_avail,
"slot_count": T,
"ev_sessions": sum(1 for x in ev_sessions if x is not None),
"ev_min_power_w": ev_min_w,
"masks_ignored": True,
"night_buffer_slots": sum(1 for b in nb_buffer_wh if b > 0),
"pv_risk_frontload_czk_kwh": frontload if neg_idx else 0.0,

View File

@@ -66,7 +66,16 @@ _VEHICLES = [
_BASE = datetime(2026, 6, 10, 0, 0, tzinfo=timezone.utc)
def _solve(slots, *, battery=None, grid=None, ev_sessions=(None, None), soc0=None, mode="AUTO"):
def _solve(
slots,
*,
battery=None,
grid=None,
ev_sessions=(None, None),
soc0=None,
mode="AUTO",
vehicles=None,
):
bat = battery or _battery()
return solve_dispatch_v2(
slots,
@@ -74,7 +83,7 @@ def _solve(slots, *, battery=None, grid=None, ev_sessions=(None, None), soc0=Non
_HP,
grid or _grid(),
list(ev_sessions),
_VEHICLES,
vehicles if vehicles is not None else _VEHICLES,
soc0 if soc0 is not None else 0.5 * bat.usable_capacity_wh,
50.0,
operating_mode=mode,
@@ -296,6 +305,144 @@ class EvOpportunisticTests(unittest.TestCase):
self.assertLessEqual(delivered, 3000.0 + 1.0)
class EvAccountingTests(unittest.TestCase):
"""EV účtování 2026-06-12: deadline boundary, stop-session, fyzikální split,
min. výkon wallboxu, opp po deadline, battery_arbitrage_czk reporting."""
def test_deadline_boundary_slot_excluded(self) -> None:
# slot začínající přesně v deadline (slot 4) už do deadline nepatří;
# levné sloty 4..7 nesmí krýt tvrdý cíl (dřív off-by-one t_dl+1)
slots = [
_slot(_BASE, i, buy=5.0 if i < 4 else 0.5, sell=0.2, ev1=True)
for i in range(8)
]
session = SimpleNamespace(
target_deadline=_BASE + timedelta(hours=1), # = start slotu 4
energy_needed_wh=4000.0,
headroom_wh=0.0,
opportunistic_value_czk_kwh=0.0,
)
results, _, snap = _solve(slots, ev_sessions=(session, None))
before = sum((r.ev1_setpoint_w or 0) * 0.25 for r in results[:4])
after = sum((r.ev1_setpoint_w or 0) * 0.25 for r in results[4:])
self.assertGreaterEqual(before, 4000.0 - 1.0, "tvrdý cíl jen sloty PŘED deadline")
self.assertLessEqual(after, 1.0, "slot v deadline a dál nekryje tvrdý cíl")
self.assertEqual(snap["objective_terms"]["ev_unmet_wh"], [0.0])
def test_stop_session_zero_everywhere(self) -> None:
# needed 0 + opp 0 (stop-session) → EV nula i při záporných cenách
slots = [_slot(_BASE, i, buy=-2.0, sell=-1.0, ev1=True, load=300) for i in range(8)]
session = SimpleNamespace(
target_deadline=_BASE + timedelta(hours=2),
energy_needed_wh=0.0,
headroom_wh=0.0,
opportunistic_value_czk_kwh=0.0,
)
results, _, _ = _solve(slots, ev_sessions=(session, None))
for r in results:
self.assertEqual(r.ev1_setpoint_w or 0, 0)
def test_no_session_zero_even_at_negative_buy(self) -> None:
# připojené auto BEZ session nemá mandát nabíjet (golden fixtures)
slots = [_slot(_BASE, i, buy=-2.0, sell=-1.0, ev1=True, load=300) for i in range(8)]
results, _, _ = _solve(slots, ev_sessions=(None, None))
for r in results:
self.assertEqual(r.ev1_setpoint_w or 0, 0)
def test_ev_direct_within_grid_plus_pv(self) -> None:
# fyzikální split: direct (= setpoint via_bat) nesmí překročit gi + PV
slots = [
_slot(_BASE, i, buy=2.0, sell=1.0, pv_a=(3000 if i < 4 else 0), ev1=True)
for i in range(12)
]
bat = _battery()
session = SimpleNamespace(
target_deadline=_BASE + timedelta(hours=3),
energy_needed_wh=10000.0,
headroom_wh=0.0,
opportunistic_value_czk_kwh=0.0,
)
results, _, _ = _solve(
slots, battery=bat, soc0=0.9 * bat.usable_capacity_wh,
ev_sessions=(session, None),
)
for i, r in enumerate(results):
direct = (r.ev1_setpoint_w or 0) - r.ev1_via_bat_w
gi_w = max(0, r.grid_setpoint_w)
pv_w = slots[i].pv_a_forecast_w + slots[i].pv_b_forecast_w
self.assertLessEqual(direct, gi_w + pv_w + 2, f"slot {i}: direct > gi+pv")
def test_min_power_setpoints_zero_or_above_min(self) -> None:
# wallbox min 1380 W (6 A): setpoint ∈ {0} [1380, max] — žádné 400900 W
vehicles = [
SimpleNamespace(
max_charge_power_w=11_000, min_power_w=1380,
battery_capacity_kwh=60.0, default_target_soc_pct=80.0,
),
_VEHICLES[1],
]
# ceny nutí rozprostřít malé množství energie → bez binárky by vyšlo ~86 W/slot
slots = [_slot(_BASE, i, buy=2.0 + 0.01 * i, sell=1.0, ev1=True) for i in range(8)]
session = SimpleNamespace(
target_deadline=_BASE + timedelta(hours=2),
energy_needed_wh=690.0, # 2 sloty × 1380 W × 0.25 h
headroom_wh=0.0,
opportunistic_value_czk_kwh=0.0,
)
results, _, _ = _solve(slots, ev_sessions=(session, None), vehicles=vehicles)
delivered = sum((r.ev1_setpoint_w or 0) * 0.25 for r in results)
self.assertGreaterEqual(delivered, 690.0 - 1.0)
for i, r in enumerate(results):
sp = r.ev1_setpoint_w or 0
self.assertTrue(
sp == 0 or sp >= 1379,
f"slot {i}: setpoint {sp} W je pod minimem wallboxu",
)
def test_opportunistic_after_deadline_allowed(self) -> None:
# ROZHODNUTO 2026-06-12: opp vrstva NENÍ omezená deadline — záporné ceny
# po deadline smí téct do auta (odjezd řeší rolling replan)
slots = [
_slot(_BASE, i, buy=(3.0 if i < 4 else -1.5), sell=(1.0 if i < 4 else -0.5),
ev1=True, load=300)
for i in range(16)
]
session = SimpleNamespace(
target_deadline=_BASE + timedelta(hours=1), # slot 4
energy_needed_wh=2000.0,
headroom_wh=20000.0,
opportunistic_value_czk_kwh=1.0,
)
results, _, snap = _solve(slots, ev_sessions=(session, None))
after_deadline = sum((r.ev1_setpoint_w or 0) * 0.25 for r in results[4:])
total = sum((r.ev1_setpoint_w or 0) * 0.25 for r in results)
self.assertGreater(after_deadline, 0.0, "opp po deadline musí zůstat povolené")
self.assertLessEqual(total, 2000.0 + 20000.0 + 1.0, "strop needed + headroom")
self.assertGreater(snap["objective_terms"]["ev_opp_wh"][0], 0.0)
def test_battery_arbitrage_reported_for_via_bat(self) -> None:
# EV kryté z baterie (noc, drahý buy, plná baterie) → via_bat > 0 a
# battery_arbitrage_czk nese oportunitní cenu (ne konstantní 0)
bat = _battery()
slots = [_slot(_BASE, i, buy=8.0, sell=1.0, ev1=True, load=300) for i in range(8)]
session = SimpleNamespace(
target_deadline=_BASE + timedelta(hours=2),
energy_needed_wh=6000.0,
headroom_wh=0.0,
opportunistic_value_czk_kwh=0.0,
)
results, _, _ = _solve(
slots, battery=bat, soc0=bat.soc_max_wh, ev_sessions=(session, None)
)
via = sum(r.ev1_via_bat_w for r in results)
self.assertGreater(via, 0, "drahý buy + plná baterie → EV z baterie")
arb = sum(r.battery_arbitrage_czk for r in results)
self.assertGreater(arb, 0.0, "via_bat sloty musí reportovat oportunitní Kč")
for r in results:
if r.ev1_via_bat_w == 0:
self.assertEqual(r.battery_arbitrage_czk, 0.0)
class EvDeadlineTests(unittest.TestCase):
def test_ev_energy_delivered_before_deadline(self) -> None:
slots = [_slot(_BASE, i, buy=2.0 if i < 8 else 6.0, sell=1.0, ev1=True) for i in range(16)]