Files
ems/backend/services/telemetry_collector.py
Dusan Vojacek 2122fa2035
All checks were successful
CI and deploy / migration-check (push) Successful in 47s
CI and deploy / deploy (push) Has been skipped
Tesla presence watcher: geofence, ev_presence_obs, 'píchni auto' pobídka
- V095 ems.ev_presence_obs (state/at_home/distance/charging/shift per ~5 min)
- tesla_client: get_vehicle_api_state (jen /vehicles — nebudí), haversine_m
- collector poll_tesla_presence: online → poloha → geofence 150 m vs GPS site;
  přechod pryč→doma + Disconnected → Discord pobídka s aktuálním přebytkem
  (cooldown 2 h); vše logováno pro budoucí dostupnostní statistiku
- 6 testů (haversine, přechody); docs: zákopy reauth procesu (6 bodů)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 14:14:48 +02:00

696 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Sběr telemetrie z Modbus: Deye (střídač), Teltonika TeltoCharge (EV); TČ zatím placeholder."""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime, timezone
import asyncpg
from app.ws_manager import manager
from zoneinfo import ZoneInfo
from services.modbus_client import get_modbus_client
# Europe/Prague zone object. It is not referenced in the part of the file
# visible here (NOTE(review): presumably for notification timestamps — confirm).
_PRAGUE_TZ_NOTIFY = ZoneInfo("Europe/Prague")
logger = logging.getLogger(__name__)
#: Pool for fire-and-forget actions that run outside the main poll connection
#: (e.g. the replan after an EV arrival). Set by run_telemetry_loop_wrapper.
_BG_POOL: asyncpg.Pool | None = None
async def _on_ev_arrival(site_id: int, charger_code: str) -> None:
    """Replan and export setpoints right after an EV arrives at a charger.

    Without this hook the arrival would only be picked up by the next rolling
    tick (the original note mentions a */15 schedule).

    The planning, export and notification modules are imported lazily inside
    the function; per the original note this avoids an import cycle at startup.

    Sequence, all on one connection taken from the background pool:
      1. best-effort Tesla SoC patch of the session (a failure is only logged,
         the replan then runs on defaults),
      2. rolling replan tagged ``ev_arrival:<charger_code>``,
      3. setpoint export,
      4. best-effort arrival notification.

    Nothing is raised to the caller: every failure is logged and swallowed.
    """
    pool = _BG_POOL
    if pool is None:
        logger.warning("EV arrival: BG pool není k dispozici, čeká se na rolling tick")
        return

    try:
        from services.control_exporter import export_setpoints
        from services.planning_engine import run_rolling_replan

        async with pool.acquire() as conn:
            # Real SoC goes into the session BEFORE the replan; a failure here
            # must not block planning.
            try:
                await _patch_session_from_tesla(site_id, charger_code, conn)
            except Exception:
                logger.exception(
                    "Tesla SoC fetch failed (site=%s, %s) — replan jede na defaultech",
                    site_id,
                    charger_code,
                )

            trigger = f"ev_arrival:{charger_code}"
            await run_rolling_replan(site_id, conn, triggered_by=trigger)
            await export_setpoints(site_id, conn)

            # The notification is optional; it must never fail the hook.
            try:
                from services.ev_notify import send_ev_arrival

                await send_ev_arrival(site_id, charger_code, conn)
            except Exception:
                logger.exception("EV arrival Discord notify failed (%s)", charger_code)

            logger.info(
                "EV arrival replan+export done (site=%s, charger=%s)",
                site_id,
                charger_code,
            )
    except Exception:
        logger.exception(
            "EV arrival replan failed (site=%s, charger=%s)", site_id, charger_code
        )
async def _on_ev_departure(site_id: int, charger_code: str) -> None:
    """Record a 'departure' vehicle observation (odometer + SoC) after unplugging.

    Per the original note, a departure/arrival pair of observations yields one
    trip (km, kWh) for ev_usage_stats, and the car is expected to be awake
    because it is just driving off.

    The hook silently does nothing when the background pool is not set, when
    the charger context is not a Tesla vehicle, or when no charge state could
    be read (the original note mentions a sleeping car / HTTP 408). Any
    exception is logged and swallowed.
    """
    pool = _BG_POOL
    if pool is None:
        return

    try:
        from app.db_json import fetch_json
        from services.tesla_client import get_charge_state

        async with pool.acquire() as conn:
            ctx = await fetch_json(
                conn,
                "select ems.fn_tesla_arrival_context($1::int, $2::text)",
                site_id,
                charger_code,
            )
            is_tesla = isinstance(ctx, dict) and ctx.get("api_type") == "tesla"
            if not is_tesla:
                return

            state = await get_charge_state(conn, ctx.get("vin"))
            if state is None:
                return

            await conn.execute(
                "select ems.fn_ev_vehicle_obs_insert($1::int, $2::int, 'departure', $3::numeric, $4::numeric, $5::text)",
                site_id,
                int(ctx["vehicle_id"]),
                state.get("odometer_km"),
                float(state["battery_level"]),
                state.get("charging_state"),
            )
            logger.info(
                "EV departure obs (site=%s, %s): soc=%s%%, odo=%s km",
                site_id,
                charger_code,
                state["battery_level"],
                state.get("odometer_km"),
            )
    except Exception:
        logger.exception(
            "EV departure obs failed (site=%s, %s)", site_id, charger_code
        )
async def _patch_session_from_tesla(
    site_id: int, charger_code: str, conn: asyncpg.Connection
) -> None:
    """After an arrival, copy the real SoC of a Tesla vehicle into its session.

    No energy is computed here. Per the original note, ``soc_at_connect_pct``
    and ``target_soc_pct`` are consumed by fn_planning_site_context on the SQL
    side. The VIN is stored the first time a charge state reports one while
    the context has none.

    Returns without doing anything when the charger context is not a Tesla
    vehicle, when there is no session id in the context, or when no charge
    state is available. Exceptions propagate to the caller.
    """
    import json as _json

    from app.db_json import fetch_json
    from services.tesla_client import get_charge_state

    ctx = await fetch_json(
        conn,
        "select ems.fn_tesla_arrival_context($1::int, $2::text)",
        site_id,
        charger_code,
    )
    is_tesla = isinstance(ctx, dict) and ctx.get("api_type") == "tesla"
    if not is_tesla:
        return

    session_id = ctx.get("session_id")
    if session_id is None:
        logger.warning("Tesla hook: chybí otevřená session (%s)", charger_code)
        return

    state = await get_charge_state(conn, ctx.get("vin"))
    if state is None:
        return

    # Arrival observation (odometer, SoC, charging state).
    await conn.execute(
        "select ems.fn_ev_vehicle_obs_insert($1::int, $2::int, 'arrival', $3::numeric, $4::numeric, $5::text)",
        site_id,
        int(ctx["vehicle_id"]),
        state.get("odometer_km"),
        float(state["battery_level"]),
        state.get("charging_state"),
    )

    # Learn the VIN only when the context has none and the state carries one.
    vin_unknown = not ctx.get("vin")
    if vin_unknown and state.get("vin"):
        await conn.execute(
            "select ems.fn_vehicle_set_vin($1::int, $2::text)",
            int(ctx["vehicle_id"]),
            str(state["vin"]),
        )

    # The target SoC is patched only when the vehicle reports a truthy limit.
    patch: dict = {"soc_at_connect_pct": state["battery_level"]}
    limit = state.get("charge_limit_soc")
    if limit:
        patch["target_soc_pct"] = state["charge_limit_soc"]

    await fetch_json(
        conn,
        "select ems.fn_ev_session_apply_patch($1::int, $2::int, $3::jsonb)",
        site_id,
        int(session_id),
        _json.dumps(patch),
    )
    logger.info(
        "Tesla SoC -> session %s: level=%s%%, limit=%s%% (%s)",
        session_id,
        state["battery_level"],
        state.get("charge_limit_soc"),
        charger_code,
    )
# Deye SUN holding registers (decimal address = passed as-is to the Modbus
# read calls).
DEYE_REG_RUN_STATE = 500
DEYE_REG_BATT_CHARGE_TODAY = 514
DEYE_REG_BATT_DISCHARGE_TODAY = 515
DEYE_REG_BATTERY_SOC = 588
DEYE_REG_BATTERY_POWER_FLOW = 590
DEYE_REG_GRID_TOTAL_POWER = 625
DEYE_REG_GEN_PORT_POWER = 667
DEYE_REG_LOAD_TOTAL_POWER = 653
DEYE_REG_GRID_IMPORT_TOTAL_LO = 522
DEYE_REG_GRID_IMPORT_TOTAL_HI = 523
DEYE_REG_GRID_EXPORT_TOTAL_LO = 524
DEYE_REG_GRID_EXPORT_TOTAL_HI = 525
DEYE_REG_PV1_POWER = 672
DEYE_REG_PV2_POWER = 673
# Solar sell (0 = surplus of the controllable PV must not flow to the grid) and
# GEN/MI cut-off (reg178 bits 0-1 == 3 -> cut-off ON).
# Note: some manuals/UIs call it "register 179" (1-based), but the Modbus
# address is 178 (0-based).
# See modbus-registers.md.
DEYE_REG_SOLAR_SELL = 145
DEYE_REG_CONTROL_BOARD_SPECIAL1 = 178
# Teltonika TeltoCharge holding registers (official "Modbus RTU Communication
# protocol" rev 0.5, 2024-07-23; via Waveshare RS485->TCP, FC 3 reads).
# Block 0-40: 0-2 voltage L1-L3 (V), 3-5 current L1-L3 (x10 A), 6 EVSE status
# (DLM), 27 charge point state, 33 IEC61851, 34/35 warning/error bits,
# 38 instantaneous power (W), 39 session energy (kWh x100), 40 session
# duration (s).
# Write registers (control, not used yet): 15 Amps to use (0=stop, 6-32),
# 16 start/stop.
TELTO_REG_BLOCK_START = 0
TELTO_REG_BLOCK_COUNT = 41
#: EVSE status (reg 6) -> internal state. Session detection relies on
#: 'available' vs. anything else (fn_ev_session_transition), so every state
#: with an EV connected must map to something other than 'available'.
TELTO_STATUS_MAP = {
    0: "charging",  # C: charging
    1: "preparing",  # B1: EV connected, waiting for the EV
    2: "preparing",  # B2: charged earlier, insufficient power
    3: "preparing",  # B3: has not charged, insufficient power
    4: "suspended_ev",  # D1: stopped by the vehicle
    5: "suspended_evse",  # D2: not authorised
    6: "suspended_evse",  # D3: charging not allowed
    7: "available",  # A: no EV
    8: "faulted",  # F: fault
    9: "unknown",  # E
}


def parse_teltocharge_frame(regs: list[int]) -> dict[str, object]:
    """Decode one TeltoCharge register block (registers 0-40) into a dict.

    Pure function, usable without any Modbus connection.

    Args:
        regs: Register values starting at address 0; at least
            ``TELTO_REG_BLOCK_COUNT`` items.

    Returns:
        Dict with ``status`` (mapped through ``TELTO_STATUS_MAP``, unmapped
        codes become ``"unknown"``), ``power_w``, ``session_energy_kwh``
        (register 39 / 100), ``current_a`` (largest of registers 3-5, / 10),
        ``voltage_v`` (register 0 only), ``warning_bits``, ``error_bits``,
        ``evse_status_raw`` and ``charge_point_state_raw``.

    Raises:
        ValueError: If the frame is shorter than ``TELTO_REG_BLOCK_COUNT``.
    """
    if len(regs) < TELTO_REG_BLOCK_COUNT:
        raise ValueError(f"TeltoCharge frame too short: {len(regs)}")

    def reg(index: int) -> int:
        return int(regs[index])

    evse_raw = reg(6)
    phase_currents = (reg(3), reg(4), reg(5))

    decoded: dict[str, object] = {}
    decoded["status"] = TELTO_STATUS_MAP.get(evse_raw, "unknown")
    decoded["power_w"] = reg(38)
    decoded["session_energy_kwh"] = reg(39) / 100.0
    decoded["current_a"] = max(phase_currents) / 10.0
    decoded["voltage_v"] = reg(0)
    decoded["warning_bits"] = reg(34)
    decoded["error_bits"] = reg(35)
    decoded["evse_status_raw"] = evse_raw
    decoded["charge_point_state_raw"] = reg(27)
    return decoded
def aggregate_pv_production_w(pv1_w: int, pv2_w: int, gen_port_w: int) -> int:
    """Return the instantaneous PV production in watts for dashboard/audit sums.

    Each of the three inputs is converted with ``int()`` and clamped at zero
    before summing. Per the original note the sources are the Deye registers
    672/673/667 (int16 W), where negative readings (e.g. in the evening during
    export) are not DC production.
    """
    return sum(max(0, int(watts)) for watts in (pv1_w, pv2_w, gen_port_w))
def _export_limit_flags_from_deye_regs(reg145: int | None, reg179: int | None) -> tuple[bool | None, int | None]:
"""Odvoď is_export_limited / pv_derating_flags z přečtených holding registrů (NULL = neznámé)."""
if reg145 is None and reg179 is None:
return None, None
flags = 0
if reg145 is not None and int(reg145) == 0:
flags |= 1
if reg179 is not None and (int(reg179) & 3) == 3:
flags |= 2
return (flags != 0), flags
async def poll_inverter(site_id: int, db: asyncpg.Connection) -> None:
    """Read every inverter of a site over Modbus, store and broadcast a sample.

    For each row of ems.vw_asset_inverter_modbus_poll the function reads the
    Deye registers, stores one sample through ems.fn_telemetry_inverter_sample
    and pushes a ``telemetry`` message to the websocket manager.

    A failure for one inverter is logged at error level and the loop moves on
    to the next one; nothing is written for the failed inverter.
    """
    inverters = await db.fetch(
        """
select inverter_id as id, code, host, port, unit_id
from ems.vw_asset_inverter_modbus_poll
where site_id = $1
""",
        site_id,
    )
    # One timestamp is shared by all samples written in this pass.
    measured_at = datetime.now(timezone.utc)

    for inv in inverters:
        inv_id = inv["id"]
        code = inv["code"]
        host = inv["host"]
        port = int(inv["port"] or 502)
        unit_id = 1 if inv["unit_id"] is None else int(inv["unit_id"])
        try:
            client = await get_modbus_client(host, port)
            async with client.batch(unit_id) as mb:
                run_state = await mb.read_register(DEYE_REG_RUN_STATE)
                soc_raw = await mb.read_register(DEYE_REG_BATTERY_SOC)
                battery_w = await mb.read_register_signed(DEYE_REG_BATTERY_POWER_FLOW)
                charge_today = await mb.read_register(DEYE_REG_BATT_CHARGE_TODAY)
                discharge_today = await mb.read_register(DEYE_REG_BATT_DISCHARGE_TODAY)
                grid_w = await mb.read_register_signed(DEYE_REG_GRID_TOTAL_POWER)
                load_w = await mb.read_register_signed(DEYE_REG_LOAD_TOTAL_POWER)
                pv1_w = await mb.read_register_signed(DEYE_REG_PV1_POWER)
                pv2_w = await mb.read_register_signed(DEYE_REG_PV2_POWER)
                gen_w = await mb.read_register_signed(DEYE_REG_GEN_PORT_POWER)
                # Four consecutive words: import LO/HI, export LO/HI.
                energy_words = await mb.read_holding_registers(
                    DEYE_REG_GRID_IMPORT_TOTAL_LO, 4
                )
                reg145 = await mb.read_register(DEYE_REG_SOLAR_SELL)
                reg179 = await mb.read_register(DEYE_REG_CONTROL_BOARD_SPECIAL1)

            pv_total_w = aggregate_pv_production_w(pv1_w, pv2_w, gen_w)
            # 32-bit counters assembled from HI/LO words, then scaled by 100.
            import_total_wh = ((energy_words[1] << 16) | energy_words[0]) * 100
            export_total_wh = ((energy_words[3] << 16) | energy_words[2]) * 100
            export_limited, derating_flags = _export_limit_flags_from_deye_regs(
                reg145, reg179
            )
            logger.debug("inverter:%s Deye run_state raw=%s", code, run_state)

            await db.execute(
                "select ems.fn_telemetry_inverter_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::int, $6::int, $7::int, $8::float8, $9::int, $10::int, $11::int, $12::int, $13::int, $14::bigint, $15::bigint, $16::int, $17::boolean, $18::int)",
                site_id,
                inv_id,
                measured_at,
                pv_total_w,
                pv1_w,
                pv2_w,
                gen_w,
                float(soc_raw),
                battery_w,
                charge_today,
                discharge_today,
                grid_w,
                load_w,
                import_total_wh,
                export_total_wh,
                run_state,
                export_limited,
                derating_flags,
            )

            message = {
                "type": "telemetry",
                "site_id": site_id,
                "ts": datetime.now(timezone.utc).isoformat(),
                "pv_power_w": pv_total_w,
                "battery_soc_pct": float(soc_raw),
                "battery_power_w": battery_w,
                "grid_power_w": grid_w,
                "load_power_w": load_w,
                "gen_port_power_w": gen_w,
                # No temperature is read here; the field is always sent as None.
                "inverter_temp_c": None,
                "is_export_limited": export_limited,
                "pv_derating_flags": derating_flags,
            }
            await manager.broadcast_telemetry(message)
        except Exception as e:
            logger.error("poll_inverter site=%s inverter=%s: %s", site_id, code, e)
#: Strong references to the detached arrival/departure hook tasks. The event
#: loop itself keeps only weak references to tasks, so a task whose handle is
#: dropped may be garbage-collected before it finishes.
_EV_HOOK_TASKS: set = set()


def _spawn_ev_hook(coro) -> asyncio.Task:
    """Start ``coro`` as a detached task and keep it referenced until done."""
    task = asyncio.create_task(coro)
    _EV_HOOK_TASKS.add(task)
    task.add_done_callback(_EV_HOOK_TASKS.discard)
    return task


async def poll_ev_chargers(site_id: int, db: asyncpg.Connection) -> None:
    """Poll all Modbus EV chargers of a site, store samples, detect sessions.

    For each row of ems.vw_asset_ev_charger_modbus_poll:
      1. read and parse the TeltoCharge register block; on any read/parse
         failure a warning is logged and nothing is written for that charger,
      2. look up the previously stored status of connector 1,
      3. store the new sample via ems.fn_telemetry_ev_charger_sample,
      4. when a previous status exists, feed the transition to
         ems.fn_ev_session_transition and start the arrival or departure hook
         as a detached task on a change from/to ``'available'``.

    Database errors are not caught here and propagate to the caller.
    """
    rows = await db.fetch(
        """
select charger_id as id, code, host, port, unit_id
from ems.vw_asset_ev_charger_modbus_poll
where site_id = $1
""",
        site_id,
    )
    measured_at = datetime.now(timezone.utc)
    connector_id = 1
    for row in rows:
        code = row["code"]
        charger_id = row["id"]
        host = row["host"]
        port = int(row["port"] or 502)
        unit_id = int(row["unit_id"] if row["unit_id"] is not None else 1)
        try:
            client = await get_modbus_client(host, port)
            async with client.batch(unit_id) as mb:
                regs = await mb.read_holding_registers(
                    TELTO_REG_BLOCK_START, TELTO_REG_BLOCK_COUNT
                )
            frame = parse_teltocharge_frame(regs)
        except Exception as e:
            # On a failed read write NOTHING: a fabricated 'available' would
            # falsely close the EV session and pollute the baseline (power 0).
            logger.warning("EV charger %s (%s:%s) read failed: %s", code, host, port, e)
            continue

        current_status = str(frame["status"])
        if frame["error_bits"]:
            logger.warning(
                "EV charger %s error bits=0x%04x warning=0x%04x",
                code, frame["error_bits"], frame["warning_bits"],
            )

        # The previous status must be read BEFORE the new sample is inserted.
        previous_status = await db.fetchval(
            """
select status
from ems.telemetry_ev_charger
where charger_id = $1 and connector_id = $2
order by measured_at desc
limit 1
""",
            charger_id,
            connector_id,
        )
        await db.execute(
            "select ems.fn_telemetry_ev_charger_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::text, $6::int, $7::float8, $8::float8)",
            site_id,
            charger_id,
            measured_at,
            connector_id,
            current_status,
            int(frame["power_w"]),
            float(frame["session_energy_kwh"]),
            float(frame["current_a"]),
        )

        # The very first sample of a charger has no predecessor, so there is
        # no transition to evaluate.
        if previous_status is None:
            continue

        await db.fetchval(
            "select ems.fn_ev_session_transition($1::int, $2::int, $3::text, $4::text, $5::timestamptz)",
            site_id,
            charger_id,
            str(previous_status),
            current_status,
            measured_at,
        )
        if previous_status == "available" and current_status != "available":
            logger.info("EV arrival detected on charger %s", code)
            _spawn_ev_hook(_on_ev_arrival(site_id, str(code)))
        elif previous_status != "available" and current_status == "available":
            logger.info("EV departure detected on charger %s", code)
            _spawn_ev_hook(_on_ev_departure(site_id, str(code)))
async def poll_heat_pump(site_id: int, db: asyncpg.Connection) -> None:
    """Placeholder heat-pump poll: writes one fixed sample per heat pump.

    No Modbus read happens yet. For every row of
    ems.vw_asset_heat_pump_modbus_poll a TODO line is logged and a constant
    sample (0, 10.0, 45.0, 55.0, 'standby') is stored through
    ems.fn_telemetry_heat_pump_sample.
    NOTE(review): the meaning of the individual constants is defined by the SQL
    function and is not visible in this file.
    """
    heat_pumps = await db.fetch(
        """
select heat_pump_id as id, code, host, port, unit_id
from ems.vw_asset_heat_pump_modbus_poll
where site_id = $1
""",
        site_id,
    )
    measured_at = datetime.now(timezone.utc)
    placeholder_sample = (0, 10.0, 45.0, 55.0, "standby")
    for hp in heat_pumps:
        code = hp["code"]
        logger.info("TODO: heat pump Modbus registry pending (heat_pump=%s)", code)
        await db.execute(
            "select ems.fn_telemetry_heat_pump_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::float8, $6::float8, $7::float8, $8::text)",
            site_id,
            hp["id"],
            measured_at,
            *placeholder_sample,
        )
async def poll_loxone_sensors(site_id: int, db: asyncpg.Connection) -> None:
    """Read Loxone sensors over HTTP and store one value per sensor.

    Each enabled sensor of the site is fetched with
    ``GET /jdev/sps/io/<name>/state`` from the site's enabled ``loxone_http``
    endpoint. Basic-auth credentials come from the LOXONE_USER /
    LOXONE_PASSWORD environment variables (no auth when the user is empty).
    The first decimal number found in ``LL.value`` is stored (e.g. "23.5°"
    gives 23.5; a decimal comma is accepted).

    Without any configured sensor the function is a no-op. A failed request is
    logged as a warning and the sensor is skipped; a response without a number
    is skipped silently.
    """
    sensors = await db.fetch(
        """
select ls.id, ls.loxone_name, se.host, se.port, se.protocol
from ems.loxone_sensor ls
join ems.site_endpoint se
on se.site_id = ls.site_id and se.endpoint_type = 'loxone_http' and se.enabled
where ls.site_id = $1 and ls.enabled
""",
        site_id,
    )
    if not sensors:
        return

    import os
    import re as _re

    import httpx

    user = os.getenv("LOXONE_USER") or ""
    auth = (user, os.getenv("LOXONE_PASSWORD") or "") if user else None
    number_re = _re.compile(r"-?\d+(?:[.,]\d+)?")
    measured_at = datetime.now(timezone.utc)

    async with httpx.AsyncClient(timeout=5.0, auth=auth) as client:
        for sensor in sensors:
            proto = (sensor["protocol"] or "http").lower()
            default_port = 443 if proto == "https" else 80
            port = int(sensor["port"] or default_port)
            url = f"{proto}://{sensor['host']}:{port}/jdev/sps/io/{sensor['loxone_name']}/state"
            try:
                resp = await client.get(url)
                resp.raise_for_status()
                payload = resp.json().get("LL") or {}
                raw = str(payload.get("value", ""))
                match = number_re.search(raw)
                if match is None:
                    continue
                value = float(match.group(0).replace(",", "."))
            except Exception as e:
                logger.warning("Loxone sensor %s read failed: %s", sensor["loxone_name"], e)
                continue

            await db.execute(
                """
insert into ems.telemetry_loxone_sensor (sensor_id, measured_at, value)
values ($1, $2, $3) on conflict do nothing
""",
                int(sensor["id"]),
                measured_at,
                value,
            )
#: Presence poll pacing (seconds) and geofence radius (metres).
EV_PRESENCE_POLL_S = 300
EV_PRESENCE_HOME_RADIUS_M = 150
#: Anti-spam for the "plug the car in" nudge: minimum gap between
#: notifications per vehicle (seconds).
EV_PLUG_NUDGE_COOLDOWN_S = 2 * 3600
# Event-loop time of the last presence poll, keyed by site id.
_EV_PRESENCE_LAST_POLL: dict[int, float] = {}
# Event-loop time of the last nudge sent, keyed by vehicle id.
_EV_PLUG_NUDGE_LAST: dict[int, float] = {}
def ev_presence_transition(prev_at_home: bool | None, new_at_home: bool | None) -> str | None:
    """Classify a change of the at-home flag between two observations.

    Pure function. Returns ``"arrived"`` for away -> home, ``"left"`` for
    home -> away, and ``None`` when nothing changed or when either value is
    unknown (``None``).
    """
    if prev_at_home is None or new_at_home is None:
        return None

    was_home = bool(prev_at_home)
    is_home = bool(new_at_home)
    if was_home == is_home:
        return None
    return "arrived" if is_home else "left"
async def poll_tesla_presence(site_id: int, db: asyncpg.Connection) -> None:
    """Track whether the site's Tesla is at home and nudge to plug it in.

    Runs at most once per ``EV_PRESENCE_POLL_S`` seconds per site:
      1. pick the first active vehicle with ``api_type = 'tesla'`` together
         with the site coordinates; stop when there is no vehicle or the site
         has no complete coordinates,
      2. read the vehicle API state (per the original note this call does not
         wake the car); stop on failure or an empty answer,
      3. only when the state is ``"online"``, read the charge state and, when
         it carries a complete position, compute the distance to the site and
         the at-home flag (within ``EV_PRESENCE_HOME_RADIUS_M``),
      4. insert one row into ems.ev_presence_obs (always, also for offline
         states, with NULLs for the unknown columns),
      5. on an away -> home transition with charging state ``"Disconnected"``
         send a Discord nudge, at most once per ``EV_PLUG_NUDGE_COOLDOWN_S``
         per vehicle, mentioning the current export when the grid power is
         below -500 W.

    Database and notification errors propagate to the caller.
    """
    loop_now = asyncio.get_running_loop().time()
    # The loop clock is monotonic with an arbitrary origin, so "never polled"
    # must be an explicit None. A 0.0 default would block the first poll
    # whenever the clock itself still reads less than the poll period.
    last_poll = _EV_PRESENCE_LAST_POLL.get(site_id)
    if last_poll is not None and loop_now - last_poll < EV_PRESENCE_POLL_S:
        return
    _EV_PRESENCE_LAST_POLL[site_id] = loop_now

    veh = await db.fetchrow(
        """
select v.id, v.vin, v.name, s.latitude, s.longitude
from ems.asset_vehicle v join ems.site s on s.id = v.site_id
where v.site_id = $1 and v.api_type = 'tesla' and v.active
order by v.id limit 1
""",
        site_id,
    )
    # Both site coordinates are needed for the geofence.
    if veh is None or veh["latitude"] is None or veh["longitude"] is None:
        return
    vehicle_id = int(veh["id"])

    from services.tesla_client import get_charge_state, get_vehicle_api_state, haversine_m

    try:
        api_state = await get_vehicle_api_state(db, veh["vin"])
    except Exception as e:
        logger.warning("Tesla presence: state poll failed: %s", e)
        return
    if api_state is None:
        return

    at_home = None
    distance_m = None
    charging_state = None
    shift_state = None
    if api_state == "online":
        try:
            st = await get_charge_state(db, veh["vin"])
        except Exception as e:
            logger.warning("Tesla presence: data read failed: %s", e)
            st = None
        # A position with only one coordinate is treated as "no position"
        # instead of raising from float(None).
        if (
            st is not None
            and st.get("latitude") is not None
            and st.get("longitude") is not None
        ):
            distance_m = int(
                haversine_m(
                    float(st["latitude"]), float(st["longitude"]),
                    float(veh["latitude"]), float(veh["longitude"]),
                )
            )
            at_home = distance_m <= EV_PRESENCE_HOME_RADIUS_M
            charging_state = st.get("charging_state")
            shift_state = st.get("shift_state")

    # Last observation with a known at_home flag; read BEFORE inserting the
    # new row so the new row cannot be its own predecessor.
    prev = await db.fetchrow(
        """
select at_home from ems.ev_presence_obs
where vehicle_id = $1 and at_home is not null
order by observed_at desc limit 1
""",
        vehicle_id,
    )
    await db.execute(
        """
insert into ems.ev_presence_obs
(vehicle_id, api_state, at_home, distance_m, charging_state, shift_state)
values ($1, $2, $3, $4, $5, $6)
""",
        vehicle_id, api_state, at_home, distance_m, charging_state, shift_state,
    )

    trans = ev_presence_transition(prev["at_home"] if prev is not None else None, at_home)
    if trans != "arrived" or charging_state != "Disconnected":
        return

    # Same reasoning as for the poll pacing: no default of 0.0, otherwise the
    # first nudge is suppressed while the loop clock is below the cooldown.
    last_nudge = _EV_PLUG_NUDGE_LAST.get(vehicle_id)
    if last_nudge is not None and loop_now - last_nudge < EV_PLUG_NUDGE_COOLDOWN_S:
        return
    _EV_PLUG_NUDGE_LAST[vehicle_id] = loop_now

    grid_w = await db.fetchval(
        "select grid_power_w from ems.vw_latest_inverter where site_id = $1 limit 1",
        site_id,
    )
    # The export is mentioned only when the grid power is below -500 W.
    surplus = f" — právě teče {abs(int(grid_w))/1000:.1f} kW do sítě" if grid_w and grid_w < -500 else ""

    from services.notification_service import send_discord

    await send_discord(
        db, site_id,
        f"🚗 **{veh['name'] or 'EV'} je doma a nepíchnuté**{surplus}.\n"
        f"Píchni ho a plán se o zbytek postará (přebytky / levné sloty).",
        level="info",
    )
    logger.info("EV plug nudge sent (site=%s, vehicle=%s)", site_id, veh["id"])
async def run_telemetry_loop(conn: asyncpg.Connection) -> float:
    """Run one pass over all active sites and return the elapsed seconds.

    The pollers run strictly one after another: a single asyncpg connection
    must not serve queries in parallel.

    Each poller is isolated in its own ``try``. A failure in one poller is
    logged with the poller name and the remaining pollers of the same site
    still run; previously the first failure skipped every later poller of
    that site.

    Args:
        conn: Connection used for the site list and by every poller.

    Returns:
        Event-loop time spent in this pass, in seconds (used by the caller to
        compute its sleep).
    """
    loop = asyncio.get_running_loop()
    start = loop.time()
    sites = await conn.fetch(
        "select id from ems.vw_site_directory where active = true"
    )
    # Resolved at call time, in the original polling order.
    pollers = (
        poll_inverter,
        poll_ev_chargers,
        poll_heat_pump,
        poll_loxone_sensors,
        poll_tesla_presence,
        poll_pool_pumps,
    )
    for site in sites:
        sid = site["id"]
        for poller in pollers:
            try:
                await poller(sid, conn)
            except Exception as e:
                logger.error(
                    "Telemetry loop error site %s (%s): %s",
                    sid,
                    getattr(poller, "__name__", poller),
                    e,
                )
    return loop.time() - start
async def run_telemetry_loop_wrapper(pool: asyncpg.Pool) -> None:
    """Background task: run the telemetry pass forever on a 60 s cadence.

    The pool is published in the module-level ``_BG_POOL``. Every iteration
    acquires its own connection and releases it before sleeping, so the pool
    is not blocked during the sleep.

    A failed pass is logged with a traceback and retried after 5 seconds.
    Cancellation is re-raised so the task can be stopped.
    """
    global _BG_POOL
    _BG_POOL = pool

    while True:
        try:
            async with pool.acquire() as conn:
                elapsed = await run_telemetry_loop(conn)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.exception("Telemetry wrapper DB error: %s", e)
            await asyncio.sleep(5)
        else:
            if elapsed > 50:
                logger.warning("Telemetry loop took %.1fs (>50s)", elapsed)
            # Sleep only for what is left of the 60 s period.
            remaining = 60.0 - elapsed
            await asyncio.sleep(max(0.0, remaining))
async def poll_pool_pumps(site_id: int, db: asyncpg.Connection) -> None:
    """Poll the pool pumps of a site through their Shelly relays.

    Per the original note the relays are read with the Gen2 RPC
    ``Switch.GetStatus`` call and keep no history of their own, so the history
    is built here from periodic samples, as for the other devices.

    For every row of ems.vw_asset_pool_pump_http_poll one sample (output
    state, active power, total energy) is stored through
    ems.fn_telemetry_pool_pump_sample. A failed read is logged as a warning
    and nothing is written for that pump.
    """
    # Local import, kept from the original to avoid touching the file header.
    from services.shelly_client import get_switch_status, shelly_base_url

    pumps = await db.fetch(
        """
select pump_id as id, code, shelly_switch_id, protocol, host, port
from ems.vw_asset_pool_pump_http_poll
where site_id = $1
""",
        site_id,
    )
    measured_at = datetime.now(timezone.utc)

    for pump in pumps:
        code = pump["code"]
        base = shelly_base_url(pump["protocol"], pump["host"], pump["port"])
        switch_id = int(pump["shelly_switch_id"] or 0)
        try:
            status = await get_switch_status(base, switch_id)
        except Exception as e:
            # On a failed read write NOTHING: a fabricated zero would pollute
            # the consumption history (same principle as for the EV chargers).
            logger.warning("pool pump %s (%s) read failed: %s", code, base, e)
            continue

        power_w = None if status.apower_w is None else int(round(status.apower_w))
        energy_wh = (
            None
            if status.aenergy_total_wh is None
            else int(round(status.aenergy_total_wh))
        )
        await db.execute(
            "select ems.fn_telemetry_pool_pump_sample($1::int, $2::int, $3::timestamptz, $4::boolean, $5::int, $6::bigint)",
            site_id,
            pump["id"],
            measured_at,
            status.output,
            power_w,
            energy_wh,
        )