feat(ev): event-driven replan i na odjezd EV (ne jen příjezd)
Odjezd auta (≠available → available) teď spouští okamžitý rolling replan + export, symetricky k příjezdu — místo čekání na */15 tick. Řeší stale plán spočítaný těsně před odjezdem, který držel fantomovou EV alokaci (~4–11 kW do už odjetého auta). Session už zavřela fn_ev_session_transition synchronně v poll smyčce, takže replan vidí 'žádná session' a alokaci shodí. Replan i pozorování jízdy každý ve vlastním try+conn (pád solveru ani spící auto se navzájem neshodí). +2 regresní testy, +docs (changelog, ev-charging). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -185,5 +185,79 @@ class EvArrivalSurvivesIdleSkipTests(unittest.IsolatedAsyncioTestCase):
|
||||
self.assertTrue(self.arrival_called)
|
||||
|
||||
|
||||
class _FakeConn:
|
||||
async def execute(self, *args: object, **kwargs: object) -> None:
|
||||
return None
|
||||
|
||||
async def fetchval(self, *args: object, **kwargs: object) -> object:
|
||||
return None
|
||||
|
||||
|
||||
class _FakeAcquireCtx:
|
||||
def __init__(self, conn: _FakeConn) -> None:
|
||||
self._conn = conn
|
||||
|
||||
async def __aenter__(self) -> _FakeConn:
|
||||
return self._conn
|
||||
|
||||
async def __aexit__(self, *exc: object) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
class _FakePool:
|
||||
def __init__(self) -> None:
|
||||
self.conn = _FakeConn()
|
||||
|
||||
def acquire(self) -> _FakeAcquireCtx:
|
||||
return _FakeAcquireCtx(self.conn)
|
||||
|
||||
|
||||
class EvDepartureTriggersReplanTests(unittest.IsolatedAsyncioTestCase):
|
||||
"""Odjezd EV musí okamžitě přeplánovat (ne čekat na */15) — symetrie k příjezdu."""
|
||||
|
||||
async def test_departure_triggers_replan_and_export(self) -> None:
|
||||
import app.db_json as dbj
|
||||
import services.control_exporter as ce
|
||||
import services.planning_engine as pe
|
||||
|
||||
replan = AsyncMock()
|
||||
export = AsyncMock()
|
||||
# OBS část: non-tesla ctx → krátí se před voláním Tesla API.
|
||||
fake_fetch = AsyncMock(return_value={"api_type": "loxone"})
|
||||
with (
|
||||
patch.object(tc, "_BG_POOL", _FakePool()),
|
||||
patch.object(pe, "run_rolling_replan", replan),
|
||||
patch.object(ce, "export_setpoints", export),
|
||||
patch.object(dbj, "fetch_json", fake_fetch),
|
||||
):
|
||||
await tc._on_ev_departure(2, "vt-ev-charger-1")
|
||||
|
||||
replan.assert_awaited_once()
|
||||
_, kwargs = replan.await_args
|
||||
self.assertEqual(kwargs.get("triggered_by"), "ev_departure:vt-ev-charger-1")
|
||||
export.assert_awaited_once()
|
||||
|
||||
async def test_departure_replan_failure_does_not_block_obs(self) -> None:
|
||||
# Replan spadne → OBS část (jiný conn/try) musí proběhnout dál bez výjimky.
|
||||
import app.db_json as dbj
|
||||
import services.control_exporter as ce
|
||||
import services.planning_engine as pe
|
||||
|
||||
replan = AsyncMock(side_effect=RuntimeError("solver down"))
|
||||
export = AsyncMock()
|
||||
fake_fetch = AsyncMock(return_value={"api_type": "loxone"})
|
||||
with (
|
||||
patch.object(tc, "_BG_POOL", _FakePool()),
|
||||
patch.object(pe, "run_rolling_replan", replan),
|
||||
patch.object(ce, "export_setpoints", export),
|
||||
patch.object(dbj, "fetch_json", fake_fetch),
|
||||
):
|
||||
await tc._on_ev_departure(2, "vt-ev-charger-1") # nesmí vyhodit
|
||||
|
||||
replan.assert_awaited_once()
|
||||
export.assert_not_awaited() # export se po pádu replanu nevolá
|
||||
fake_fetch.assert_awaited() # OBS část přesto běžela
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user