Slabý server: dict (tabulka, asset_id) → (signature, last_stored_at); _idle_skip ukládá vždy při změně signature, aktivitě, po startu procesu a heartbeat po > 840 s (každý 15min bucket má ≥ 1 řádek). - telemetry_ev_charger: aktivní = status != 'available' nebo power > 50 W; signature (status, výkon na 100 W) - telemetry_pool_pump: aktivní = is_on nebo power > 5 W (ON řádky 1/min kvůli on_minutes); signature (is_on, výkon na 10 W) - telemetry_loxone_sensor: jen změna hodnoty ≥ 0.1 / heartbeat - telemetry_heat_pump: aktivní = mode != 'off' nebo defrost; signature (mode, teploty na 0.2 °C) - telemetry_inverter: beze změny — NIKDY se nepřeskakuje (audit Wh split, baseline, SoC plánovače) Detekce příjezdu/odjezdu EV: previous_status přesunut z posledního řádku DB do in-memory _EV_LAST_STATUS (po startu seed z vw_latest_ev_charger — přechod během výpadku se pozná, prázdná DB nevystřelí falešný příjezd); fn_ev_session_transition se volá jen při změně statusu. PoolCard: staleness práh 5 → 16 min (> heartbeat 840 s). Docs: telemetry.md sekce „Idle-skip zápisů" (pravidla pro nové čtecí dotazy: sumy/gapfill, ne avg přes řádky), planning-changelog (TUV °C/min). Testy: tests/test_telemetry_idle_skip.py — _idle_skip jednotkově + EV arrival/departure přežije skip i restart procesu (303 passed). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
190 lines
7.2 KiB
Python
190 lines
7.2 KiB
Python
"""Idle-skip zápisů telemetrie: _idle_skip + detekce příjezdu/odjezdu EV přes skip."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import unittest
|
||
from unittest.mock import AsyncMock, patch
|
||
|
||
import services.telemetry_collector as tc
|
||
from services.telemetry_collector import (
|
||
IDLE_SKIP_MAX_GAP_S,
|
||
TELTO_REG_BLOCK_COUNT,
|
||
_idle_skip,
|
||
_sig_round,
|
||
)
|
||
|
||
|
||
class IdleSkipTests(unittest.TestCase):
|
||
KEY = ("telemetry_test", 1)
|
||
|
||
def setUp(self) -> None:
|
||
tc._IDLE_SKIP_STATE.clear()
|
||
|
||
def test_first_sample_after_start_is_stored(self) -> None:
|
||
self.assertFalse(_idle_skip(self.KEY, ("a", 0), False, 1000.0))
|
||
|
||
def test_unchanged_idle_sample_is_skipped(self) -> None:
|
||
self.assertFalse(_idle_skip(self.KEY, ("a", 0), False, 1000.0))
|
||
self.assertTrue(_idle_skip(self.KEY, ("a", 0), False, 1060.0))
|
||
self.assertTrue(_idle_skip(self.KEY, ("a", 0), False, 1120.0))
|
||
|
||
def test_signature_change_is_stored(self) -> None:
|
||
self.assertFalse(_idle_skip(self.KEY, ("a", 0), False, 1000.0))
|
||
self.assertFalse(_idle_skip(self.KEY, ("a", 100), False, 1060.0))
|
||
|
||
def test_active_device_is_always_stored(self) -> None:
|
||
self.assertFalse(_idle_skip(self.KEY, ("a", 0), True, 1000.0))
|
||
self.assertFalse(_idle_skip(self.KEY, ("a", 0), True, 1060.0))
|
||
|
||
def test_heartbeat_after_max_gap(self) -> None:
|
||
self.assertFalse(_idle_skip(self.KEY, ("a", 0), False, 1000.0))
|
||
# přesně na hranici se ještě přeskakuje (> max_gap_s, ne >=)
|
||
self.assertTrue(_idle_skip(self.KEY, ("a", 0), False, 1000.0 + IDLE_SKIP_MAX_GAP_S))
|
||
self.assertFalse(
|
||
_idle_skip(self.KEY, ("a", 0), False, 1000.0 + IDLE_SKIP_MAX_GAP_S + 1.0)
|
||
)
|
||
# heartbeat resetuje last_stored_at → další idle vzorek se zase přeskočí
|
||
self.assertTrue(
|
||
_idle_skip(self.KEY, ("a", 0), False, 1000.0 + IDLE_SKIP_MAX_GAP_S + 61.0)
|
||
)
|
||
|
||
def test_keys_are_independent(self) -> None:
|
||
other = ("telemetry_test", 2)
|
||
self.assertFalse(_idle_skip(self.KEY, ("a", 0), False, 1000.0))
|
||
self.assertFalse(_idle_skip(other, ("a", 0), False, 1060.0))
|
||
self.assertTrue(_idle_skip(self.KEY, ("a", 0), False, 1060.0))
|
||
|
||
def test_sig_round(self) -> None:
|
||
self.assertIsNone(_sig_round(None, 0.2))
|
||
self.assertEqual(_sig_round(47.31, 0.2), 47.4)
|
||
self.assertEqual(_sig_round(47.29, 0.2), 47.2)
|
||
self.assertEqual(_sig_round(-3.1, 0.2), -3.2)
|
||
|
||
|
||
def _frame_regs(status_raw: int, power_w: int = 0) -> list[int]:
|
||
regs = [0] * TELTO_REG_BLOCK_COUNT
|
||
regs[0] = 230
|
||
regs[6] = status_raw # 7 = available, 0 = charging
|
||
regs[38] = power_w
|
||
return regs
|
||
|
||
|
||
class _FakeBatch:
|
||
def __init__(self, regs: list[int]) -> None:
|
||
self._regs = regs
|
||
|
||
async def __aenter__(self) -> "_FakeBatch":
|
||
return self
|
||
|
||
async def __aexit__(self, *args: object) -> bool:
|
||
return False
|
||
|
||
async def read_holding_registers(self, start: int, count: int) -> list[int]:
|
||
return self._regs
|
||
|
||
|
||
class _FakeModbusClient:
|
||
def __init__(self) -> None:
|
||
self.regs: list[int] = _frame_regs(7)
|
||
|
||
def batch(self, unit_id: int) -> _FakeBatch:
|
||
return _FakeBatch(self.regs)
|
||
|
||
|
||
class _FakeDB:
|
||
"""Min. asyncpg.Connection náhrada pro poll_ev_chargers."""
|
||
|
||
def __init__(self, latest_status: str | None) -> None:
|
||
self.latest_status = latest_status
|
||
self.inserts: list[tuple] = []
|
||
self.transitions: list[tuple[str, str]] = []
|
||
|
||
async def fetch(self, query: str, *args: object) -> list[dict]:
|
||
return [{"id": 7, "code": "ev-charger-1", "host": "h", "port": 502, "unit_id": 1}]
|
||
|
||
async def fetchval(self, query: str, *args: object):
|
||
if "vw_latest_ev_charger" in query:
|
||
return self.latest_status
|
||
if "fn_ev_session_transition" in query:
|
||
self.transitions.append((str(args[2]), str(args[3])))
|
||
return None
|
||
raise AssertionError(f"unexpected fetchval: {query}")
|
||
|
||
async def execute(self, query: str, *args: object) -> None:
|
||
assert "fn_telemetry_ev_charger_sample" in query
|
||
self.inserts.append(args)
|
||
|
||
|
||
class EvArrivalSurvivesIdleSkipTests(unittest.IsolatedAsyncioTestCase):
|
||
def setUp(self) -> None:
|
||
tc._IDLE_SKIP_STATE.clear()
|
||
tc._EV_LAST_STATUS.clear()
|
||
|
||
async def _poll(self, db: _FakeDB, client: _FakeModbusClient) -> None:
|
||
with (
|
||
patch.object(tc, "get_modbus_client", AsyncMock(return_value=client)),
|
||
patch.object(tc, "_on_ev_arrival", AsyncMock()) as arrival,
|
||
patch.object(tc, "_on_ev_departure", AsyncMock()) as departure,
|
||
):
|
||
await tc.poll_ev_chargers(1, db) # type: ignore[arg-type]
|
||
await asyncio.sleep(0) # nechat doběhnout create_task
|
||
self.arrival_called = arrival.await_count > 0
|
||
self.departure_called = departure.await_count > 0
|
||
|
||
async def test_arrival_detected_after_skipped_idle_samples(self) -> None:
|
||
db = _FakeDB(latest_status=None)
|
||
client = _FakeModbusClient()
|
||
|
||
# 1. tick po startu: available → uloží se (prázdný stav), žádný příjezd
|
||
await self._poll(db, client)
|
||
self.assertEqual(len(db.inserts), 1)
|
||
self.assertFalse(self.arrival_called)
|
||
self.assertEqual(db.transitions, [])
|
||
|
||
# 2.–3. tick: idle beze změny → řádky se přeskočí
|
||
await self._poll(db, client)
|
||
await self._poll(db, client)
|
||
self.assertEqual(len(db.inserts), 1)
|
||
|
||
# 4. tick: EV se připojí (charging) → insert + transition + arrival hook
|
||
client.regs = _frame_regs(0, power_w=11000)
|
||
await self._poll(db, client)
|
||
self.assertEqual(len(db.inserts), 2)
|
||
self.assertEqual(db.transitions, [("available", "charging")])
|
||
self.assertTrue(self.arrival_called)
|
||
|
||
# 5. tick: nabíjí dál (aktivní) → ukládá se každou minutu, bez transition
|
||
await self._poll(db, client)
|
||
self.assertEqual(len(db.inserts), 3)
|
||
self.assertEqual(len(db.transitions), 1)
|
||
|
||
# 6. tick: odpojení → departure
|
||
client.regs = _frame_regs(7)
|
||
await self._poll(db, client)
|
||
self.assertEqual(db.transitions[-1], ("charging", "available"))
|
||
self.assertTrue(self.departure_called)
|
||
|
||
async def test_no_false_arrival_after_restart(self) -> None:
|
||
# restart procesu: in-memory stav prázdný, DB má poslední řádek 'charging',
|
||
# nabíječka stále nabíjí → žádný falešný příjezd
|
||
db = _FakeDB(latest_status="charging")
|
||
client = _FakeModbusClient()
|
||
client.regs = _frame_regs(0, power_w=11000)
|
||
await self._poll(db, client)
|
||
self.assertFalse(self.arrival_called)
|
||
self.assertEqual(db.transitions, [])
|
||
|
||
async def test_transition_across_restart_detected(self) -> None:
|
||
# během výpadku backendu EV přijelo: DB 'available', teď 'charging'
|
||
db = _FakeDB(latest_status="available")
|
||
client = _FakeModbusClient()
|
||
client.regs = _frame_regs(0, power_w=11000)
|
||
await self._poll(db, client)
|
||
self.assertEqual(db.transitions, [("available", "charging")])
|
||
self.assertTrue(self.arrival_called)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
unittest.main()
|