feat(telemetry): idle-skip zápisů — neukládat 1min řádky idle zařízení
Some checks failed
CI and deploy / migration-check (push) Successful in 28s
CI and deploy / deploy (push) Failing after 17m56s

Slabý server: dict (tabulka, asset_id) → (signature, last_stored_at);
_idle_skip ukládá vždy při změně signature, aktivitě, po startu procesu
a heartbeat po > 840 s (každý 15min bucket má ≥ 1 řádek).

- telemetry_ev_charger: aktivní = status != 'available' nebo power > 50 W;
  signature (status, výkon na 100 W)
- telemetry_pool_pump: aktivní = is_on nebo power > 5 W (ON řádky 1/min
  kvůli on_minutes); signature (is_on, výkon na 10 W)
- telemetry_loxone_sensor: jen změna hodnoty ≥ 0.1 / heartbeat
- telemetry_heat_pump: aktivní = mode != 'off' nebo defrost; signature
  (mode, teploty na 0.2 °C)
- telemetry_inverter: beze změny — NIKDY se nepřeskakuje (audit Wh split,
  baseline, SoC plánovače)

Detekce příjezdu/odjezdu EV: previous_status přesunut z posledního řádku DB
do in-memory _EV_LAST_STATUS (po startu seed z vw_latest_ev_charger —
přechod během výpadku se pozná, prázdná DB nevystřelí falešný příjezd);
fn_ev_session_transition se volá jen při změně statusu.

PoolCard: staleness práh 5 → 16 min (> heartbeat 840 s).
Docs: telemetry.md sekce „Idle-skip zápisů" (pravidla pro nové čtecí dotazy:
sumy/gapfill, ne avg přes řádky), planning-changelog (TUV °C/min).
Testy: tests/test_telemetry_idle_skip.py — _idle_skip jednotkově + EV
arrival/departure přežije skip i restart procesu (303 passed).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dusan Vojacek
2026-06-12 19:06:41 +02:00
parent f71bc944b4
commit 815a233049
5 changed files with 405 additions and 30 deletions

View File

@@ -21,6 +21,57 @@ logger = logging.getLogger(__name__)
#: příjezdu EV). Nastavuje run_telemetry_loop_wrapper.
_BG_POOL: asyncpg.Pool | None = None
# ---------------------------------------------------------------------------
# Idle-skip zápisů telemetrie (slabý server)
# ---------------------------------------------------------------------------
#: Heartbeat: max. rozestup uložených vzorků při idle. 840 s < 900 s → každý
#: 15min bucket má ≥1 řádek (latest view, TUV teplota, teplota bazénu).
IDLE_SKIP_MAX_GAP_S = 840.0
#: klíč (tabulka, asset_id) → (signature, last_stored_at epoch s)
_IDLE_SKIP_STATE: dict[tuple[str, int], tuple[object, float]] = {}
def _idle_skip(
key: tuple[str, int],
signature: object,
is_active: bool,
now: float,
max_gap_s: float = IDLE_SKIP_MAX_GAP_S,
) -> bool:
"""True = vzorek PŘESKOČIT (idle, signature beze změny, heartbeat neuplynul).
Ukládá se vždy když: klíč je po startu procesu neznámý; zařízení je aktivní;
signature se změnila; nebo od posledního uložení uplynulo > max_gap_s.
Čtecí dotazy nad takto řidšími tabulkami musí používat sumy / gapfill,
ne avg přes přítomné řádky — viz docs/04-modules/telemetry.md (Idle-skip).
Střídač (telemetry_inverter) se NIKDY nepřeskakuje.
"""
state = _IDLE_SKIP_STATE.get(key)
if (
state is None
or is_active
or signature != state[0]
or now - state[1] > max_gap_s
):
_IDLE_SKIP_STATE[key] = (signature, now)
return False
return True
def _sig_round(value: float | None, step: float) -> float | None:
"""Kvantizace hodnoty do idle-skip signature (None propouští)."""
if value is None:
return None
return round(round(value / step) * step, 3)
#: In-memory poslední pozorovaný status EV konektoru (charger_id, connector_id).
#: Detekce příjezdu/odjezdu nesmí stát na posledním ŘÁDKU v DB — idle-skip řádky
#: ředí. Po startu procesu se seeduje z vw_latest_ev_charger (přechod během
#: výpadku backendu se pozná; prázdná DB → žádný falešný příjezd).
_EV_LAST_STATUS: dict[tuple[int, int], str] = {}
async def _on_ev_arrival(site_id: int, charger_code: str) -> None:
"""Okamžitý replan + export po příjezdu EV (jinak by se čekalo až na */15).
@@ -377,31 +428,43 @@ async def poll_ev_chargers(site_id: int, db: asyncpg.Connection) -> None:
code, frame["error_bits"], frame["warning_bits"],
)
previous_status = await db.fetchval(
"""
select status
from ems.telemetry_ev_charger
where charger_id = $1 and connector_id = $2
order by measured_at desc
limit 1
""",
charger_id,
connector_id,
)
state_key = (int(charger_id), connector_id)
previous_status = _EV_LAST_STATUS.get(state_key)
if previous_status is None:
# Start procesu: seed z posledního uloženého řádku (stejná sémantika
# jako dřívější čtení z telemetry_ev_charger; read přes view).
previous_status = await db.fetchval(
"""
select status from ems.vw_latest_ev_charger
where charger_id = $1 and connector_id = $2
""",
charger_id,
connector_id,
)
await db.execute(
"select ems.fn_telemetry_ev_charger_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::text, $6::int, $7::float8, $8::float8)",
site_id,
charger_id,
measured_at,
connector_id,
current_status,
int(frame["power_w"]),
float(frame["session_energy_kwh"]),
float(frame["current_a"]),
)
power_w = int(frame["power_w"])
is_active = current_status != "available" or power_w > 50
signature = (current_status, round(power_w / 100.0) * 100)
if not _idle_skip(
("telemetry_ev_charger", int(charger_id)),
signature,
is_active,
measured_at.timestamp(),
):
await db.execute(
"select ems.fn_telemetry_ev_charger_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::text, $6::int, $7::float8, $8::float8)",
site_id,
charger_id,
measured_at,
connector_id,
current_status,
power_w,
float(frame["session_energy_kwh"]),
float(frame["current_a"]),
)
if previous_status is not None:
_EV_LAST_STATUS[state_key] = current_status
if previous_status is not None and str(previous_status) != current_status:
await db.fetchval(
"select ems.fn_ev_session_transition($1::int, $2::int, $3::text, $4::text, $5::timestamptz)",
site_id,
@@ -496,6 +559,30 @@ async def poll_heat_pump(site_id: int, db: asyncpg.Connection) -> None:
)
continue
water_out_c = _mim_temp_c(iu[MIM_OFF_WATER_OUT])
dhw_temp_c = _mim_temp_c(iu[MIM_OFF_DHW_TEMP])
water_in_c = _mim_temp_c(iu[MIM_OFF_WATER_IN])
room_temp_c = _mim_temp_c(iu[MIM_OFF_ROOM_TEMP])
defrost = bool(defrost_raw)
# Idle-skip: TČ vypnuté a teploty (na 0.2 °C) beze změny → heartbeat;
# pomalý drift teplot zachytí heartbeat 840 s (TUV delta se normalizuje
# per minutu ve fn_update_tuv_usage_stats).
is_active = defrost or mode_txt not in ("off",)
signature = (
mode_txt,
_sig_round(water_out_c, 0.2),
_sig_round(dhw_temp_c, 0.2),
_sig_round(water_in_c, 0.2),
_sig_round(room_temp_c, 0.2),
)
if _idle_skip(
("telemetry_heat_pump", int(row["id"])),
signature,
is_active,
measured_at.timestamp(),
):
continue
await db.execute(
"select ems.fn_telemetry_heat_pump_sample("
"$1::int, $2::int, $3::timestamptz, $4::int, $5::float8, $6::float8,"
@@ -505,12 +592,12 @@ async def poll_heat_pump(site_id: int, db: asyncpg.Connection) -> None:
measured_at,
None, # příkon: MIM neměří — doplní elektroměr (Shelly/Chint)
None, # venkovní teplota: v MIM mapě není
_mim_temp_c(iu[MIM_OFF_WATER_OUT]),
_mim_temp_c(iu[MIM_OFF_DHW_TEMP]),
water_out_c,
dhw_temp_c,
mode_txt,
_mim_temp_c(iu[MIM_OFF_WATER_IN]),
_mim_temp_c(iu[MIM_OFF_ROOM_TEMP]),
bool(defrost_raw),
water_in_c,
room_temp_c,
defrost,
error_code,
)
if error_code:
@@ -561,6 +648,14 @@ async def poll_loxone_sensors(site_id: int, db: asyncpg.Connection) -> None:
except Exception as e:
logger.warning("Loxone sensor %s read failed: %s", r["loxone_name"], e)
continue
# Idle-skip: čidlo nemá „aktivitu" — ukládá se změna ≥ 0.1 / heartbeat.
if _idle_skip(
("telemetry_loxone_sensor", int(r["id"])),
round(value, 1),
False,
measured_at.timestamp(),
):
continue
await db.execute(
"""
insert into ems.telemetry_loxone_sensor (sensor_id, measured_at, value)
@@ -786,6 +881,23 @@ async def poll_pool_pumps(site_id: int, db: asyncpg.Connection) -> None:
logger.warning("pool pump %s (%s) read failed: %s", code, base, e)
continue
# Idle-skip: zapnuté čerpadlo (is_on) se ukládá KAŽDOU minutu —
# vw_pool_pump_day_energy.on_minutes počítá ON řádky; vypnuté jen
# při změně / heartbeatu.
power_w = status.apower_w
is_active = bool(status.output) or (power_w is not None and power_w > 5)
signature = (
bool(status.output),
None if power_w is None else int(round(power_w / 10.0)) * 10,
)
if _idle_skip(
("telemetry_pool_pump", int(row["id"])),
signature,
is_active,
measured_at.timestamp(),
):
continue
await db.execute(
"select ems.fn_telemetry_pool_pump_sample($1::int, $2::int, $3::timestamptz, $4::boolean, $5::int, $6::bigint)",
site_id,