"""Idle-skip zápisů telemetrie: _idle_skip + detekce příjezdu/odjezdu EV přes skip.""" from __future__ import annotations import asyncio import unittest from unittest.mock import AsyncMock, patch import services.telemetry_collector as tc from services.telemetry_collector import ( IDLE_SKIP_MAX_GAP_S, TELTO_REG_BLOCK_COUNT, _idle_skip, _sig_round, ) class IdleSkipTests(unittest.TestCase): KEY = ("telemetry_test", 1) def setUp(self) -> None: tc._IDLE_SKIP_STATE.clear() def test_first_sample_after_start_is_stored(self) -> None: self.assertFalse(_idle_skip(self.KEY, ("a", 0), False, 1000.0)) def test_unchanged_idle_sample_is_skipped(self) -> None: self.assertFalse(_idle_skip(self.KEY, ("a", 0), False, 1000.0)) self.assertTrue(_idle_skip(self.KEY, ("a", 0), False, 1060.0)) self.assertTrue(_idle_skip(self.KEY, ("a", 0), False, 1120.0)) def test_signature_change_is_stored(self) -> None: self.assertFalse(_idle_skip(self.KEY, ("a", 0), False, 1000.0)) self.assertFalse(_idle_skip(self.KEY, ("a", 100), False, 1060.0)) def test_active_device_is_always_stored(self) -> None: self.assertFalse(_idle_skip(self.KEY, ("a", 0), True, 1000.0)) self.assertFalse(_idle_skip(self.KEY, ("a", 0), True, 1060.0)) def test_heartbeat_after_max_gap(self) -> None: self.assertFalse(_idle_skip(self.KEY, ("a", 0), False, 1000.0)) # přesně na hranici se ještě přeskakuje (> max_gap_s, ne >=) self.assertTrue(_idle_skip(self.KEY, ("a", 0), False, 1000.0 + IDLE_SKIP_MAX_GAP_S)) self.assertFalse( _idle_skip(self.KEY, ("a", 0), False, 1000.0 + IDLE_SKIP_MAX_GAP_S + 1.0) ) # heartbeat resetuje last_stored_at → další idle vzorek se zase přeskočí self.assertTrue( _idle_skip(self.KEY, ("a", 0), False, 1000.0 + IDLE_SKIP_MAX_GAP_S + 61.0) ) def test_keys_are_independent(self) -> None: other = ("telemetry_test", 2) self.assertFalse(_idle_skip(self.KEY, ("a", 0), False, 1000.0)) self.assertFalse(_idle_skip(other, ("a", 0), False, 1060.0)) self.assertTrue(_idle_skip(self.KEY, ("a", 0), False, 1060.0)) def test_sig_round(self) -> None: self.assertIsNone(_sig_round(None, 0.2)) self.assertEqual(_sig_round(47.31, 0.2), 47.4) self.assertEqual(_sig_round(47.29, 0.2), 47.2) self.assertEqual(_sig_round(-3.1, 0.2), -3.2) def _frame_regs(status_raw: int, power_w: int = 0) -> list[int]: regs = [0] * TELTO_REG_BLOCK_COUNT regs[0] = 230 regs[6] = status_raw # 7 = available, 0 = charging regs[38] = power_w return regs class _FakeBatch: def __init__(self, regs: list[int]) -> None: self._regs = regs async def __aenter__(self) -> "_FakeBatch": return self async def __aexit__(self, *args: object) -> bool: return False async def read_holding_registers(self, start: int, count: int) -> list[int]: return self._regs class _FakeModbusClient: def __init__(self) -> None: self.regs: list[int] = _frame_regs(7) def batch(self, unit_id: int) -> _FakeBatch: return _FakeBatch(self.regs) class _FakeDB: """Min. asyncpg.Connection náhrada pro poll_ev_chargers.""" def __init__(self, latest_status: str | None) -> None: self.latest_status = latest_status self.inserts: list[tuple] = [] self.transitions: list[tuple[str, str]] = [] async def fetch(self, query: str, *args: object) -> list[dict]: return [{"id": 7, "code": "ev-charger-1", "host": "h", "port": 502, "unit_id": 1}] async def fetchval(self, query: str, *args: object): if "vw_latest_ev_charger" in query: return self.latest_status if "fn_ev_session_transition" in query: self.transitions.append((str(args[2]), str(args[3]))) return None raise AssertionError(f"unexpected fetchval: {query}") async def execute(self, query: str, *args: object) -> None: assert "fn_telemetry_ev_charger_sample" in query self.inserts.append(args) class EvArrivalSurvivesIdleSkipTests(unittest.IsolatedAsyncioTestCase): def setUp(self) -> None: tc._IDLE_SKIP_STATE.clear() tc._EV_LAST_STATUS.clear() async def _poll(self, db: _FakeDB, client: _FakeModbusClient) -> None: with ( patch.object(tc, "get_modbus_client", AsyncMock(return_value=client)), patch.object(tc, "_on_ev_arrival", AsyncMock()) as arrival, patch.object(tc, "_on_ev_departure", AsyncMock()) as departure, ): await tc.poll_ev_chargers(1, db) # type: ignore[arg-type] await asyncio.sleep(0) # nechat doběhnout create_task self.arrival_called = arrival.await_count > 0 self.departure_called = departure.await_count > 0 async def test_arrival_detected_after_skipped_idle_samples(self) -> None: db = _FakeDB(latest_status=None) client = _FakeModbusClient() # 1. tick po startu: available → uloží se (prázdný stav), žádný příjezd await self._poll(db, client) self.assertEqual(len(db.inserts), 1) self.assertFalse(self.arrival_called) self.assertEqual(db.transitions, []) # 2.–3. tick: idle beze změny → řádky se přeskočí await self._poll(db, client) await self._poll(db, client) self.assertEqual(len(db.inserts), 1) # 4. tick: EV se připojí (charging) → insert + transition + arrival hook client.regs = _frame_regs(0, power_w=11000) await self._poll(db, client) self.assertEqual(len(db.inserts), 2) self.assertEqual(db.transitions, [("available", "charging")]) self.assertTrue(self.arrival_called) # 5. tick: nabíjí dál (aktivní) → ukládá se každou minutu, bez transition await self._poll(db, client) self.assertEqual(len(db.inserts), 3) self.assertEqual(len(db.transitions), 1) # 6. tick: odpojení → departure client.regs = _frame_regs(7) await self._poll(db, client) self.assertEqual(db.transitions[-1], ("charging", "available")) self.assertTrue(self.departure_called) async def test_no_false_arrival_after_restart(self) -> None: # restart procesu: in-memory stav prázdný, DB má poslední řádek 'charging', # nabíječka stále nabíjí → žádný falešný příjezd db = _FakeDB(latest_status="charging") client = _FakeModbusClient() client.regs = _frame_regs(0, power_w=11000) await self._poll(db, client) self.assertFalse(self.arrival_called) self.assertEqual(db.transitions, []) async def test_transition_across_restart_detected(self) -> None: # během výpadku backendu EV přijelo: DB 'available', teď 'charging' db = _FakeDB(latest_status="available") client = _FakeModbusClient() client.regs = _frame_regs(0, power_w=11000) await self._poll(db, client) self.assertEqual(db.transitions, [("available", "charging")]) self.assertTrue(self.arrival_called) class _FakeConn: async def execute(self, *args: object, **kwargs: object) -> None: return None async def fetchval(self, *args: object, **kwargs: object) -> object: return None class _FakeAcquireCtx: def __init__(self, conn: _FakeConn) -> None: self._conn = conn async def __aenter__(self) -> _FakeConn: return self._conn async def __aexit__(self, *exc: object) -> bool: return False class _FakePool: def __init__(self) -> None: self.conn = _FakeConn() def acquire(self) -> _FakeAcquireCtx: return _FakeAcquireCtx(self.conn) class EvDepartureTriggersReplanTests(unittest.IsolatedAsyncioTestCase): """Odjezd EV musí okamžitě přeplánovat (ne čekat na */15) — symetrie k příjezdu.""" async def test_departure_triggers_replan_and_export(self) -> None: import app.db_json as dbj import services.control_exporter as ce import services.planning_engine as pe replan = AsyncMock() export = AsyncMock() # OBS část: non-tesla ctx → krátí se před voláním Tesla API. fake_fetch = AsyncMock(return_value={"api_type": "loxone"}) with ( patch.object(tc, "_BG_POOL", _FakePool()), patch.object(pe, "run_rolling_replan", replan), patch.object(ce, "export_setpoints", export), patch.object(dbj, "fetch_json", fake_fetch), ): await tc._on_ev_departure(2, "vt-ev-charger-1") replan.assert_awaited_once() _, kwargs = replan.await_args self.assertEqual(kwargs.get("triggered_by"), "ev_departure:vt-ev-charger-1") export.assert_awaited_once() async def test_departure_replan_failure_does_not_block_obs(self) -> None: # Replan spadne → OBS část (jiný conn/try) musí proběhnout dál bez výjimky. import app.db_json as dbj import services.control_exporter as ce import services.planning_engine as pe replan = AsyncMock(side_effect=RuntimeError("solver down")) export = AsyncMock() fake_fetch = AsyncMock(return_value={"api_type": "loxone"}) with ( patch.object(tc, "_BG_POOL", _FakePool()), patch.object(pe, "run_rolling_replan", replan), patch.object(ce, "export_setpoints", export), patch.object(dbj, "fetch_json", fake_fetch), ): await tc._on_ev_departure(2, "vt-ev-charger-1") # nesmí vyhodit replan.assert_awaited_once() export.assert_not_awaited() # export se po pádu replanu nevolá fake_fetch.assert_awaited() # OBS část přesto běžela if __name__ == "__main__": unittest.main()