Files
ems/backend/services/telemetry_collector.py
Dusan Vojacek d63a85a2ea
Some checks failed
CI and deploy / migration-check (push) Failing after 7m29s
CI and deploy / deploy (push) Has been skipped
TČ Samsung přes MIM-B19N: endpoint 172.16.1.17, plný poll, registry doc
- V096: endpoint home-01 TČ z placeholderu 192.168.1.103 na reálný Waveshare
  RS485 TO POE ETH (B) 172.16.1.17:502; telemetry_heat_pump.room_temp_c.
- R__048: fn_telemetry_heat_pump_sample rozšířena (water_inlet, room_temp,
  defrost, alarm_code) — drop/comment bez parametrů dle konvence.
- poll_heat_pump: místo TODO stubu (zapisoval dummy 45/55 °C!) skutečné čtení
  MIM bloku 50-75 + defrost reg 2; gate na comm_status ready (jinak skip);
  operating_mode off/heat/cool/auto/dhw/error; power_w NULL (MIM příkon nemá).
- docs/04-modules/modbus-registers-mim-b19n.md (mapa, 9600 8E1, DIP adresa,
  troubleshooting E6xx) + heat-pump.md odkaz.

Živý stav: TCP :502 OK, Modbus bez odpovědi (čeká na protokol převodníku /
paritu EVEN / polaritu A-B — checklist v docu).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 18:24:10 +02:00

798 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Sběr telemetrie: Modbus Deye (střídač), Teltonika TeltoCharge (EV), Samsung MIM-B19N (TČ); dále čidla Loxone, přítomnost Tesly a bazénová čerpadla přes Shelly."""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime, timezone
import asyncpg
from app.ws_manager import manager
from zoneinfo import ZoneInfo
from services.modbus_client import get_modbus_client
_PRAGUE_TZ_NOTIFY = ZoneInfo("Europe/Prague")
logger = logging.getLogger(__name__)
#: Pool for fire-and-forget actions that run outside the main poll connection
#: (e.g. a replan after an EV arrival). Set by run_telemetry_loop_wrapper.
_BG_POOL: asyncpg.Pool | None = None
async def _on_ev_arrival(site_id: int, charger_code: str) -> None:
    """Replan and export setpoints right after an EV plugs in.

    Without this hook the plan would only refresh on the next rolling tick.
    The planner, exporter and notifier are imported lazily inside the function;
    per the original author's note this avoids an import cycle at startup.

    Nothing is raised to the caller: each stage logs its own failure, so the
    fire-and-forget task always finishes quietly.

    Args:
        site_id: Site the charger belongs to.
        charger_code: Code of the charger where the arrival was detected.
    """
    pool = _BG_POOL
    if pool is None:
        logger.warning("EV arrival: BG pool není k dispozici, čeká se na rolling tick")
        return

    trigger = f"ev_arrival:{charger_code}"
    try:
        from services.control_exporter import export_setpoints
        from services.planning_engine import run_rolling_replan

        async with pool.acquire() as conn:
            # Tesla: push the real SoC into the session BEFORE replanning.
            # A failure here must not block the plan.
            try:
                await _patch_session_from_tesla(site_id, charger_code, conn)
            except Exception:
                logger.exception(
                    "Tesla SoC fetch failed (site=%s, %s) — replan jede na defaultech",
                    site_id,
                    charger_code,
                )

            await run_rolling_replan(site_id, conn, triggered_by=trigger)
            await export_setpoints(site_id, conn)

            # The notification is best-effort; the replan already succeeded.
            try:
                from services.ev_notify import send_ev_arrival

                await send_ev_arrival(site_id, charger_code, conn)
            except Exception:
                logger.exception("EV arrival Discord notify failed (%s)", charger_code)

            logger.info(
                "EV arrival replan+export done (site=%s, charger=%s)",
                site_id,
                charger_code,
            )
    except Exception:
        logger.exception(
            "EV arrival replan failed (site=%s, charger=%s)", site_id, charger_code
        )
async def _on_ev_departure(site_id: int, charger_code: str) -> None:
    """Record a departure observation (odometer + SoC) for the leaving vehicle.

    The original author's note: a departure/arrival pair yields one trip
    (km, kWh) for ev_usage_stats, and a sleeping or unreadable car is skipped
    silently because the trip can be derived from later observations.

    Does nothing when the background pool is missing, when the charger's
    vehicle is not a Tesla, or when no charge state could be read. Any
    exception is logged and swallowed.

    Args:
        site_id: Site the charger belongs to.
        charger_code: Code of the charger the vehicle left.
    """
    pool = _BG_POOL
    if pool is None:
        return

    try:
        from app.db_json import fetch_json
        from services.tesla_client import get_charge_state

        async with pool.acquire() as conn:
            ctx = await fetch_json(
                conn,
                "select ems.fn_tesla_arrival_context($1::int, $2::text)",
                site_id,
                charger_code,
            )
            is_tesla = isinstance(ctx, dict) and ctx.get("api_type") == "tesla"
            if not is_tesla:
                return

            state = await get_charge_state(conn, ctx.get("vin"))
            if state is None:
                return

            odometer_km = state.get("odometer_km")
            await conn.execute(
                "select ems.fn_ev_vehicle_obs_insert($1::int, $2::int, 'departure', $3::numeric, $4::numeric, $5::text)",
                site_id,
                int(ctx["vehicle_id"]),
                odometer_km,
                float(state["battery_level"]),
                state.get("charging_state"),
            )
            logger.info(
                "EV departure obs (site=%s, %s): soc=%s%%, odo=%s km",
                site_id,
                charger_code,
                state["battery_level"],
                odometer_km,
            )
    except Exception:
        logger.exception(
            "EV departure obs failed (site=%s, %s)", site_id, charger_code
        )
async def _patch_session_from_tesla(
    site_id: int, charger_code: str, conn: asyncpg.Connection
) -> None:
    """After an arrival, copy the real SoC of a Tesla vehicle into its session.

    No energy is computed here; per the original author's note the planning
    context function picks up soc_at_connect_pct / target_soc_pct itself.
    The VIN is stored the first time the API reports one for a vehicle that
    has none yet.

    Returns early when the vehicle is not a Tesla, when there is no open
    session, or when no charge state is available. Exceptions propagate to
    the caller.

    Args:
        site_id: Site the charger belongs to.
        charger_code: Code of the charger where the vehicle arrived.
        conn: Database connection used for all queries.
    """
    import json as _json

    from app.db_json import fetch_json
    from services.tesla_client import get_charge_state

    ctx = await fetch_json(
        conn,
        "select ems.fn_tesla_arrival_context($1::int, $2::text)",
        site_id,
        charger_code,
    )
    is_tesla = isinstance(ctx, dict) and ctx.get("api_type") == "tesla"
    if not is_tesla:
        return

    session_id = ctx.get("session_id")
    if session_id is None:
        logger.warning("Tesla hook: chybí otevřená session (%s)", charger_code)
        return

    known_vin = ctx.get("vin")
    state = await get_charge_state(conn, known_vin)
    if state is None:
        return

    # Arrival observation (odometer + SoC + charging state).
    await conn.execute(
        "select ems.fn_ev_vehicle_obs_insert($1::int, $2::int, 'arrival', $3::numeric, $4::numeric, $5::text)",
        site_id,
        int(ctx["vehicle_id"]),
        state.get("odometer_km"),
        float(state["battery_level"]),
        state.get("charging_state"),
    )

    # Learn the VIN on the first successful read.
    if not known_vin and state.get("vin"):
        await conn.execute(
            "select ems.fn_vehicle_set_vin($1::int, $2::text)",
            int(ctx["vehicle_id"]),
            str(state["vin"]),
        )

    # The target SoC is only patched when the API reports a truthy limit.
    charge_limit = state.get("charge_limit_soc")
    patch: dict = {"soc_at_connect_pct": state["battery_level"]}
    if charge_limit:
        patch["target_soc_pct"] = charge_limit

    await fetch_json(
        conn,
        "select ems.fn_ev_session_apply_patch($1::int, $2::int, $3::jsonb)",
        site_id,
        int(session_id),
        _json.dumps(patch),
    )
    logger.info(
        "Tesla SoC -> session %s: level=%s%%, limit=%s%% (%s)",
        session_id,
        state["battery_level"],
        charge_limit,
        charger_code,
    )
# Deye SUN holding registers (decimal address, used directly with
# read_holding_registers).
DEYE_REG_RUN_STATE = 500
DEYE_REG_BATT_CHARGE_TODAY = 514
DEYE_REG_BATT_DISCHARGE_TODAY = 515
DEYE_REG_BATTERY_SOC = 588
DEYE_REG_BATTERY_POWER_FLOW = 590
DEYE_REG_GRID_TOTAL_POWER = 625
DEYE_REG_GEN_PORT_POWER = 667
DEYE_REG_LOAD_TOTAL_POWER = 653
DEYE_REG_GRID_IMPORT_TOTAL_LO = 522
DEYE_REG_GRID_IMPORT_TOTAL_HI = 523
DEYE_REG_GRID_EXPORT_TOTAL_LO = 524
DEYE_REG_GRID_EXPORT_TOTAL_HI = 525
DEYE_REG_PV1_POWER = 672
DEYE_REG_PV2_POWER = 673
# Solar sell (0 = surplus of controllable PV must not flow to the grid) and
# GEN/MI cut-off (reg 178, bits 0-1 == 3 → cut-off ON).
# Note: some manuals/UIs call it "register 179" (1-based), but the Modbus
# address is 178 (0-based). See modbus-registers.md.
DEYE_REG_SOLAR_SELL = 145
DEYE_REG_CONTROL_BOARD_SPECIAL1 = 178
# Teltonika TeltoCharge holding registers (official "Modbus RTU Communication
# protocol" rev 0.5, 2024-07-23; via Waveshare RS485→TCP, FC 3 reads).
# Block 0-40: 0-2 voltage L1-L3 (V), 3-5 current L1-L3 (×10 A),
# 6 EVSE status (DLM), 27 charge point state, 33 IEC61851,
# 34/35 warning/error bits, 38 instantaneous power (W),
# 39 session energy (kWh×100), 40 session duration (s).
# Write registers (control, not used yet): 15 Amps to use (0=stop; range
# presumably 6-32 — TODO confirm against the protocol document),
# 16 start/stop.
TELTO_REG_BLOCK_START = 0
TELTO_REG_BLOCK_COUNT = 41
#: EVSE status (reg 6) → internal state. Session detection relies on
#: 'available' vs ≠ 'available' (fn_ev_session_transition), so every state with
#: an EV connected must map to something other than 'available'.
TELTO_STATUS_MAP = {
    0: "charging", # C: charging
    1: "preparing", # B1: EV connected, waiting for the EV
    2: "preparing", # B2: was charging before, insufficient power
    3: "preparing", # B3: was not charging, insufficient power
    4: "suspended_ev", # D1: stopped by the vehicle
    5: "suspended_evse", # D2: no authorization
    6: "suspended_evse", # D3: charging not allowed
    7: "available", # A: no EV
    8: "faulted", # F: fault
    9: "unknown", # E
}
def parse_teltocharge_frame(regs: list[int]) -> dict[str, object]:
    """Decode one TeltoCharge register block (registers 0-40) into a dict.

    Pure function, so it can be tested without a Modbus connection.

    Args:
        regs: Raw holding-register values starting at register 0; at least
            TELTO_REG_BLOCK_COUNT entries are required.

    Returns:
        Mapping with ``status`` (internal name, ``"unknown"`` for unmapped
        codes), ``power_w``, ``session_energy_kwh`` (register 39 / 100),
        ``current_a`` (highest of registers 3-5, / 10), ``voltage_v``
        (register 0), ``warning_bits``, ``error_bits``, ``evse_status_raw``
        and ``charge_point_state_raw``.

    Raises:
        ValueError: If the frame has fewer than TELTO_REG_BLOCK_COUNT items.
    """
    if len(regs) < TELTO_REG_BLOCK_COUNT:
        raise ValueError(f"TeltoCharge frame too short: {len(regs)}")

    evse_status_raw = int(regs[6])
    # Report the highest of the three phase currents.
    phase_currents = (int(regs[3]), int(regs[4]), int(regs[5]))

    frame: dict[str, object] = {}
    frame["status"] = TELTO_STATUS_MAP.get(evse_status_raw, "unknown")
    frame["power_w"] = int(regs[38])
    frame["session_energy_kwh"] = int(regs[39]) / 100.0
    frame["current_a"] = max(phase_currents) / 10.0
    frame["voltage_v"] = int(regs[0])
    frame["warning_bits"] = int(regs[34])
    frame["error_bits"] = int(regs[35])
    frame["evse_status_raw"] = evse_status_raw
    frame["charge_point_state_raw"] = int(regs[27])
    return frame
def aggregate_pv_production_w(pv1_w: int, pv2_w: int, gen_port_w: int) -> int:
    """Return the instantaneous PV production in watts.

    Each of the three inputs (PV1, PV2 and the GEN port) is clamped at zero
    before summing, so a negative reading never reduces the total. The
    original author notes that negative values on these registers are not DC
    production.

    Args:
        pv1_w: PV string 1 power in W.
        pv2_w: PV string 2 power in W.
        gen_port_w: GEN port power in W.

    Returns:
        Sum of the non-negative parts of the three inputs.
    """
    return sum(max(0, int(watts)) for watts in (pv1_w, pv2_w, gen_port_w))
def _export_limit_flags_from_deye_regs(reg145: int | None, reg179: int | None) -> tuple[bool | None, int | None]:
    """Derive ``(is_export_limited, pv_derating_flags)`` from two registers.

    Args:
        reg145: Solar-sell register value, or None when it was not read.
        reg179: Control-board register value, or None when it was not read.

    Returns:
        ``(None, None)`` when both inputs are None (state unknown). Otherwise
        a bit mask where bit 0 (value 1) is set if ``reg145`` equals 0 and
        bit 1 (value 2) is set if the two lowest bits of ``reg179`` are both
        set, together with a bool telling whether the mask is non-zero.
    """
    if reg145 is None and reg179 is None:
        return None, None

    solar_sell_off = reg145 is not None and int(reg145) == 0
    cutoff_on = reg179 is not None and (int(reg179) & 3) == 3

    flags = (1 if solar_sell_off else 0) | (2 if cutoff_on else 0)
    return flags != 0, flags
async def poll_inverter(site_id: int, db: asyncpg.Connection) -> None:
    """Read every Deye inverter of a site, store a sample and broadcast it.

    For each inverter the registers are read in one Modbus batch, the values
    are written through ``ems.fn_telemetry_inverter_sample`` and a
    ``telemetry`` message is pushed to the websocket manager. A failure for
    one inverter is logged and does not stop the remaining ones.

    Args:
        site_id: Site whose inverters are polled.
        db: Database connection used for the lookup and the insert.
    """
    inverters = await db.fetch(
        """
        select inverter_id as id, code, host, port, unit_id
        from ems.vw_asset_inverter_modbus_poll
        where site_id = $1
        """,
        site_id,
    )
    measured_at = datetime.now(timezone.utc)

    for inv in inverters:
        inv_id = inv["id"]
        code = inv["code"]
        host = inv["host"]
        port = int(inv["port"] or 502)
        unit_id = 1 if inv["unit_id"] is None else int(inv["unit_id"])

        try:
            client = await get_modbus_client(host, port)
            async with client.batch(unit_id) as mb:
                run_state = await mb.read_register(DEYE_REG_RUN_STATE)
                battery_soc = await mb.read_register(DEYE_REG_BATTERY_SOC)
                battery_power = await mb.read_register_signed(DEYE_REG_BATTERY_POWER_FLOW)
                batt_charge_today = await mb.read_register(DEYE_REG_BATT_CHARGE_TODAY)
                batt_discharge_today = await mb.read_register(DEYE_REG_BATT_DISCHARGE_TODAY)
                grid_power = await mb.read_register_signed(DEYE_REG_GRID_TOTAL_POWER)
                load_power = await mb.read_register_signed(DEYE_REG_LOAD_TOTAL_POWER)
                pv1_power = await mb.read_register_signed(DEYE_REG_PV1_POWER)
                pv2_power = await mb.read_register_signed(DEYE_REG_PV2_POWER)
                gen_port_power = await mb.read_register_signed(DEYE_REG_GEN_PORT_POWER)
                # Four consecutive registers: import LO/HI, export LO/HI.
                energy_regs = await mb.read_holding_registers(
                    DEYE_REG_GRID_IMPORT_TOTAL_LO, 4
                )
                reg145 = await mb.read_register(DEYE_REG_SOLAR_SELL)
                reg179 = await mb.read_register(DEYE_REG_CONTROL_BOARD_SPECIAL1)

            pv_power_w = aggregate_pv_production_w(pv1_power, pv2_power, gen_port_power)

            # 32-bit counters assembled from HI/LO words, then scaled by 100.
            import_lo, import_hi, export_lo, export_hi = (
                energy_regs[0],
                energy_regs[1],
                energy_regs[2],
                energy_regs[3],
            )
            grid_import_total_wh = ((import_hi << 16) | import_lo) * 100
            grid_export_total_wh = ((export_hi << 16) | export_lo) * 100

            is_export_limited, pv_derating_flags = _export_limit_flags_from_deye_regs(
                reg145, reg179
            )
            soc_pct = float(battery_soc)
            logger.debug("inverter:%s Deye run_state raw=%s", code, run_state)

            await db.execute(
                "select ems.fn_telemetry_inverter_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::int, $6::int, $7::int, $8::float8, $9::int, $10::int, $11::int, $12::int, $13::int, $14::bigint, $15::bigint, $16::int, $17::boolean, $18::int)",
                site_id,
                inv_id,
                measured_at,
                pv_power_w,
                pv1_power,
                pv2_power,
                gen_port_power,
                soc_pct,
                battery_power,
                batt_charge_today,
                batt_discharge_today,
                grid_power,
                load_power,
                grid_import_total_wh,
                grid_export_total_wh,
                run_state,
                is_export_limited,
                pv_derating_flags,
            )

            # The inverter temperature is not read here; the key is still sent
            # with a null value.
            inv_temp: float | None = None
            message = {
                "type": "telemetry",
                "site_id": site_id,
                "ts": datetime.now(timezone.utc).isoformat(),
                "pv_power_w": pv_power_w,
                "battery_soc_pct": soc_pct,
                "battery_power_w": battery_power,
                "grid_power_w": grid_power,
                "load_power_w": load_power,
                "gen_port_power_w": gen_port_power,
                "inverter_temp_c": inv_temp,
                "is_export_limited": is_export_limited,
                "pv_derating_flags": pv_derating_flags,
            }
            await manager.broadcast_telemetry(message)
        except Exception as e:
            logger.error("poll_inverter site=%s inverter=%s: %s", site_id, code, e)
#: Strong references to in-flight arrival/departure hooks. The event loop only
#: keeps weak references to tasks, so a task nobody else references could be
#: garbage-collected before it finishes.
_EV_HOOK_TASKS: set[asyncio.Task] = set()


def _spawn_ev_hook(coro) -> None:
    """Schedule ``coro`` as a background task and keep it referenced until done."""
    task = asyncio.create_task(coro)
    _EV_HOOK_TASKS.add(task)
    task.add_done_callback(_EV_HOOK_TASKS.discard)


async def poll_ev_chargers(site_id: int, db: asyncpg.Connection) -> None:
    """Read every TeltoCharge charger of a site and store one sample each.

    For each charger the register block is read and parsed, the sample is
    written through ``ems.fn_telemetry_ev_charger_sample`` and, when an earlier
    sample exists, the status change is passed to
    ``ems.fn_ev_session_transition``. A change from or to ``'available'``
    additionally schedules the arrival or departure hook as a background task.

    When the read or the parse fails nothing is written for that charger.

    Args:
        site_id: Site whose chargers are polled.
        db: Database connection used for lookups and inserts.
    """
    rows = await db.fetch(
        """
        select charger_id as id, code, host, port, unit_id
        from ems.vw_asset_ev_charger_modbus_poll
        where site_id = $1
        """,
        site_id,
    )
    measured_at = datetime.now(timezone.utc)
    connector_id = 1
    for row in rows:
        code = row["code"]
        charger_id = row["id"]
        host = row["host"]
        port = int(row["port"] or 502)
        unit_id = int(row["unit_id"] if row["unit_id"] is not None else 1)
        try:
            client = await get_modbus_client(host, port)
            async with client.batch(unit_id) as mb:
                regs = await mb.read_holding_registers(
                    TELTO_REG_BLOCK_START, TELTO_REG_BLOCK_COUNT
                )
            frame = parse_teltocharge_frame(regs)
        except Exception as e:
            # On a failed read write NOTHING: a fabricated 'available' would
            # falsely close the EV session and pollute the baseline (power 0).
            logger.warning("EV charger %s (%s:%s) read failed: %s", code, host, port, e)
            continue

        current_status = str(frame["status"])
        if frame["error_bits"]:
            logger.warning(
                "EV charger %s error bits=0x%04x warning=0x%04x",
                code, frame["error_bits"], frame["warning_bits"],
            )

        # Must be read BEFORE the new sample is inserted, otherwise the query
        # would return the sample written below.
        previous_status = await db.fetchval(
            """
            select status
            from ems.telemetry_ev_charger
            where charger_id = $1 and connector_id = $2
            order by measured_at desc
            limit 1
            """,
            charger_id,
            connector_id,
        )
        await db.execute(
            "select ems.fn_telemetry_ev_charger_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::text, $6::int, $7::float8, $8::float8)",
            site_id,
            charger_id,
            measured_at,
            connector_id,
            current_status,
            int(frame["power_w"]),
            float(frame["session_energy_kwh"]),
            float(frame["current_a"]),
        )

        # The very first sample of a charger has nothing to compare against.
        if previous_status is None:
            continue

        await db.fetchval(
            "select ems.fn_ev_session_transition($1::int, $2::int, $3::text, $4::text, $5::timestamptz)",
            site_id,
            charger_id,
            str(previous_status),
            current_status,
            measured_at,
        )
        if previous_status == "available" and current_status != "available":
            logger.info("EV arrival detected on charger %s", code)
            _spawn_ev_hook(_on_ev_arrival(site_id, str(code)))
        elif previous_status != "available" and current_status == "available":
            logger.info("EV departure detected on charger %s", code)
            _spawn_ev_hook(_on_ev_departure(site_id, str(code)))
# Samsung MIM-B19N(T) — Modbus RTU slave behind an RS485→TCP converter
# (9600 8E1!).
# Addressing: an indoor unit IU has its block at base = 50 + IU*50; here
# IU 0 → 50..99.
# Full description: docs/04-modules/modbus-registers-mim-b19n.md
MIM_IU_BASE = 50 # block of indoor unit 0
MIM_OFF_COMM_STATUS = 0 # b0 exist, b1 type OK, b2 ready, b3 comm error
MIM_OFF_UNIT_TYPE = 1 # lower byte: 110=HE, 115-117=EHS, 120=HT
MIM_OFF_ONOFF = 2
MIM_OFF_MODE = 3 # 0 auto, 1 cool, 4 heat
MIM_OFF_ROOM_TEMP = 9 # °C×10 signed
MIM_OFF_ERROR_CODE = 13 # 0 = OK, 100-999 = error code
MIM_OFF_WATER_IN = 15 # °C×10 signed
MIM_OFF_WATER_OUT = 16 # °C×10 signed
MIM_OFF_DHW_ONOFF = 22
MIM_OFF_DHW_TEMP = 25 # °C×10 (DHW tank)
MIM_REG_DEFROST = 2 # module-level register: 0=off, anything else = defrost
MIM_MODE_NAMES = {0: "auto", 1: "cool", 2: "dry", 3: "fan", 4: "heat"}
def _mim_temp_c(raw: int) -> float | None:
    """Convert a raw MIM temperature register (°C×10, signed 16-bit) to °C.

    Values above 32767 are read as negative two's-complement numbers. The
    result is rounded to one decimal place. Per the original author's note the
    MIM holds 0 until the unit supplies a value; this function passes that 0
    through as 0.0.

    Args:
        raw: Unsigned register value.

    Returns:
        Temperature in °C.
    """
    signed = raw if raw <= 32767 else raw - 65536
    return round(signed / 10.0, 1)
def mim_operating_mode(on: int, mode: int, dhw_on: int, comm_ready: bool, error: int) -> str:
    """Build the textual operating mode of the heat pump.

    Args:
        on: Unit on/off register; 1 means on.
        mode: Mode register, looked up in MIM_MODE_NAMES.
        dhw_on: DHW on/off register; 1 means on.
        comm_ready: Whether the unit is ready on the bus.
        error: Error code; any non-zero value means an error.

    Returns:
        ``"offline"`` when the unit is not ready, ``"error"`` when an error
        code is present, otherwise the mode name and/or ``"dhw"`` joined by
        ``"+"``; ``"off"`` when neither is on. An unmapped mode is rendered as
        ``"mode<value>"``.
    """
    if not comm_ready:
        return "offline"
    if error:
        return "error"

    labels: list[str] = []
    if int(on) == 1:
        labels.append(MIM_MODE_NAMES.get(int(mode), f"mode{mode}"))
    if int(dhw_on) == 1:
        labels.append("dhw")

    if not labels:
        return "off"
    return "+".join(labels)
async def poll_heat_pump(site_id: int, db: asyncpg.Connection) -> None:
    """Read every Samsung MIM heat pump of a site and store one sample each.

    The indoor-unit block and the defrost register are read in one Modbus
    batch. A sample is written through ``ems.fn_telemetry_heat_pump_sample``
    only when the unit reports exist + type OK + ready (bits 0-2 of the comm
    status). A failed or truncated read is logged and that pump is skipped,
    so one bad device cannot abort the remaining polls of the site.

    Args:
        site_id: Site whose heat pumps are polled.
        db: Database connection used for the lookup and the insert.
    """
    rows = await db.fetch(
        """
        select heat_pump_id as id, code, host, port, unit_id
        from ems.vw_asset_heat_pump_modbus_poll
        where site_id = $1
        """,
        site_id,
    )
    measured_at = datetime.now(timezone.utc)
    # MIM_OFF_DHW_TEMP is the highest offset indexed below, so the block must
    # contain that many registers plus one (26).
    block_len = MIM_OFF_DHW_TEMP + 1
    for row in rows:
        code = row["code"]
        host = row["host"]
        port = int(row["port"] or 502)
        unit_id = int(row["unit_id"] if row["unit_id"] is not None else 1)
        try:
            client = await get_modbus_client(host, port)
            async with client.batch(unit_id) as mb:
                iu = await mb.read_holding_registers(MIM_IU_BASE, block_len)
                defrost_raw = await mb.read_register(MIM_REG_DEFROST)
            # Validate here so a short frame is reported as a failed poll
            # instead of raising IndexError further down.
            if len(iu) < block_len:
                raise ValueError(f"MIM frame too short: {len(iu)}")
        except Exception as e:
            logger.warning("heat_pump %s: Modbus poll failed (%s)", code, e)
            continue

        comm = int(iu[MIM_OFF_COMM_STATUS])
        comm_ready = (comm & 0b111) == 0b111
        if not comm_ready:
            # The MIM answers but the unit is not tracked (b0-b2): do not
            # write telemetry (all zeros), only log. A permanent state points
            # to a wrong IU address or SEG5 "Use of central control" being off.
            logger.warning(
                "heat_pump %s: jednotka není ready (comm_status=%s) — vzorek přeskočen",
                code, bin(comm),
            )
            continue

        error_code = int(iu[MIM_OFF_ERROR_CODE])
        mode_txt = mim_operating_mode(
            iu[MIM_OFF_ONOFF], iu[MIM_OFF_MODE], iu[MIM_OFF_DHW_ONOFF],
            comm_ready, error_code,
        )
        await db.execute(
            "select ems.fn_telemetry_heat_pump_sample("
            "$1::int, $2::int, $3::timestamptz, $4::int, $5::float8, $6::float8,"
            " $7::float8, $8::text, $9::float8, $10::float8, $11::boolean, $12::int)",
            site_id,
            row["id"],
            measured_at,
            None,  # power: the MIM does not measure it
            None,  # outdoor temperature: not in the MIM map
            _mim_temp_c(iu[MIM_OFF_WATER_OUT]),
            _mim_temp_c(iu[MIM_OFF_DHW_TEMP]),
            mode_txt,
            _mim_temp_c(iu[MIM_OFF_WATER_IN]),
            _mim_temp_c(iu[MIM_OFF_ROOM_TEMP]),
            bool(defrost_raw),
            error_code,
        )
        if error_code:
            logger.warning("heat_pump %s: error code %s", code, error_code)
async def poll_loxone_sensors(site_id: int, db: asyncpg.Connection) -> None:
    """Read Loxone sensor values over HTTP and store them.

    Each enabled sensor is fetched with ``GET /jdev/sps/io/<name>/state`` from
    the site's enabled ``loxone_http`` endpoint. Basic credentials come from
    the ``LOXONE_USER`` / ``LOXONE_PASSWORD`` environment variables when a
    user is set. The first number found in ``LL.value`` is stored (a decimal
    comma is accepted), e.g. ``"23.5°"`` → 23.5. A response without a number
    is skipped silently; a failed request is logged and skipped. With no
    sensors configured the function does nothing.

    Args:
        site_id: Site whose sensors are polled.
        db: Database connection used for the lookup and the inserts.
    """
    sensors = await db.fetch(
        """
        select ls.id, ls.loxone_name, se.host, se.port, se.protocol
        from ems.loxone_sensor ls
        join ems.site_endpoint se
        on se.site_id = ls.site_id and se.endpoint_type = 'loxone_http' and se.enabled
        where ls.site_id = $1 and ls.enabled
        """,
        site_id,
    )
    if not sensors:
        return

    import os
    import re as _re
    import httpx

    user = os.getenv("LOXONE_USER") or ""
    auth = (user, os.getenv("LOXONE_PASSWORD") or "") if user else None

    number_re = _re.compile(r"-?\d+(?:[.,]\d+)?")
    measured_at = datetime.now(timezone.utc)

    async with httpx.AsyncClient(timeout=5.0, auth=auth) as client:
        for sensor in sensors:
            proto = (sensor["protocol"] or "http").lower()
            default_port = 443 if proto == "https" else 80
            port = int(sensor["port"] or default_port)
            url = f"{proto}://{sensor['host']}:{port}/jdev/sps/io/{sensor['loxone_name']}/state"
            try:
                resp = await client.get(url)
                resp.raise_for_status()
                payload = resp.json().get("LL") or {}
                raw = str(payload.get("value", ""))
                match = number_re.search(raw)
                if match is None:
                    continue
                value = float(match.group(0).replace(",", "."))
            except Exception as e:
                logger.warning("Loxone sensor %s read failed: %s", sensor["loxone_name"], e)
                continue

            await db.execute(
                """
                insert into ems.telemetry_loxone_sensor (sensor_id, measured_at, value)
                values ($1, $2, $3) on conflict do nothing
                """,
                int(sensor["id"]),
                measured_at,
                value,
            )
#: Presence poll pacing (seconds) and geofence radius (m).
#: The Fleet API is paid (vehicle_data $0.002/req, wake $0.02 — NEVER wake):
#: the vehicle list is polled every 10 min; vehicle_data only on an
#: asleep→online transition and then at most once per 15 min; with an open
#: ev_session nothing is polled at all (the car is at the wallbox = at home,
#: and it does not sleep during AC charging — without this gate the data
#: calls would keep flowing all night).
EV_PRESENCE_POLL_S = 600
EV_PRESENCE_DATA_MIN_S = 900
EV_PRESENCE_HOME_RADIUS_M = 150
#: Anti-spam for the "plug in the car" nudge: minimum spacing between
#: notifications per vehicle (s).
EV_PLUG_NUDGE_COOLDOWN_S = 2 * 3600
# In-memory pacing state, keyed by site id (last poll) or vehicle id (the rest).
_EV_PRESENCE_LAST_POLL: dict[int, float] = {}
_EV_PRESENCE_LAST_DATA: dict[int, float] = {}
_EV_PRESENCE_LAST_STATE: dict[int, str] = {}
_EV_PLUG_NUDGE_LAST: dict[int, float] = {}
def ev_presence_transition(prev_at_home: bool | None, new_at_home: bool | None) -> str | None:
    """Classify the change between two at-home observations.

    Pure function, so it can be tested in isolation.

    Args:
        prev_at_home: Previous observation, or None when unknown.
        new_at_home: Current observation, or None when unknown.

    Returns:
        ``"arrived"`` for away → home, ``"left"`` for home → away, and None
        when nothing changed or either observation is unknown.
    """
    if prev_at_home is None or new_at_home is None:
        return None

    was_home = bool(prev_at_home)
    is_home = bool(new_at_home)
    if was_home == is_home:
        return None
    return "arrived" if is_home else "left"
def _ev_interval_elapsed(last: float | None, now: float, min_gap_s: float) -> bool:
    """Return True when nothing was recorded yet or ``min_gap_s`` has passed.

    ``None`` stands for "never happened". A numeric default such as 0.0 is not
    safe for that purpose: the event-loop clock is monotonic with an arbitrary
    origin, so ``now - 0.0`` can be smaller than the gap and would wrongly
    suppress the very first action.
    """
    return last is None or now - last >= min_gap_s


async def poll_tesla_presence(site_id: int, db: asyncpg.Connection) -> None:
    """Track whether the site's Tesla is at home and nudge the owner to plug in.

    Paced by EV_PRESENCE_POLL_S per site. Handles the first active Tesla of
    the site and does nothing when the site has no coordinates or the vehicle
    has an open charging session. The vehicle API state is read on every
    paced run; position data is read only while the car is online and either
    just came online or EV_PRESENCE_DATA_MIN_S has passed. Every run that
    reaches the API is stored in ``ems.ev_presence_obs``.

    When the car changes from away to home and reports ``Disconnected``, a
    Discord message is sent, at most once per EV_PLUG_NUDGE_COOLDOWN_S per
    vehicle.

    Args:
        site_id: Site whose vehicle is tracked.
        db: Database connection used for all queries.
    """
    loop_now = asyncio.get_running_loop().time()
    if not _ev_interval_elapsed(
        _EV_PRESENCE_LAST_POLL.get(site_id), loop_now, EV_PRESENCE_POLL_S
    ):
        return
    _EV_PRESENCE_LAST_POLL[site_id] = loop_now

    veh = await db.fetchrow(
        """
        select v.id, v.vin, v.name, s.latitude, s.longitude
        from ems.asset_vehicle v join ems.site s on s.id = v.site_id
        where v.site_id = $1 and v.api_type = 'tesla' and v.active
        order by v.id limit 1
        """,
        site_id,
    )
    # Both coordinates are needed for the geofence distance.
    if veh is None or veh["latitude"] is None or veh["longitude"] is None:
        return
    vehicle_id = int(veh["id"])

    # A car at the wallbox (open session) is at home; skip all API calls.
    plugged = await db.fetchval(
        """
        select exists(
        select 1 from ems.ev_session es
        where es.vehicle_id = $1 and es.session_end is null
        )
        """,
        vehicle_id,
    )
    if plugged:
        return

    from services.tesla_client import get_charge_state, get_vehicle_api_state, haversine_m

    try:
        api_state = await get_vehicle_api_state(db, veh["vin"])
    except Exception as e:
        logger.warning("Tesla presence: state poll failed: %s", e)
        return
    if api_state is None:
        return

    prev_state = _EV_PRESENCE_LAST_STATE.get(vehicle_id)
    _EV_PRESENCE_LAST_STATE[vehicle_id] = api_state
    woke_up = api_state == "online" and prev_state != "online"
    data_due = _ev_interval_elapsed(
        _EV_PRESENCE_LAST_DATA.get(vehicle_id), loop_now, EV_PRESENCE_DATA_MIN_S
    )

    at_home = None
    distance_m = None
    charging_state = None
    shift_state = None
    if api_state == "online" and (woke_up or data_due):
        _EV_PRESENCE_LAST_DATA[vehicle_id] = loop_now
        try:
            st = await get_charge_state(db, veh["vin"])
        except Exception as e:
            logger.warning("Tesla presence: data read failed: %s", e)
            st = None
        if (
            st is not None
            and st.get("latitude") is not None
            and st.get("longitude") is not None
        ):
            distance_m = int(
                haversine_m(
                    float(st["latitude"]), float(st["longitude"]),
                    float(veh["latitude"]), float(veh["longitude"]),
                )
            )
            at_home = distance_m <= EV_PRESENCE_HOME_RADIUS_M
            charging_state = st.get("charging_state")
            shift_state = st.get("shift_state")

    # Read the last known at_home BEFORE inserting the new observation.
    prev = await db.fetchrow(
        """
        select at_home from ems.ev_presence_obs
        where vehicle_id = $1 and at_home is not null
        order by observed_at desc limit 1
        """,
        vehicle_id,
    )
    await db.execute(
        """
        insert into ems.ev_presence_obs
        (vehicle_id, api_state, at_home, distance_m, charging_state, shift_state)
        values ($1, $2, $3, $4, $5, $6)
        """,
        vehicle_id, api_state, at_home, distance_m, charging_state, shift_state,
    )

    trans = ev_presence_transition(prev["at_home"] if prev else None, at_home)
    if trans != "arrived" or charging_state != "Disconnected":
        return
    if not _ev_interval_elapsed(
        _EV_PLUG_NUDGE_LAST.get(vehicle_id), loop_now, EV_PLUG_NUDGE_COOLDOWN_S
    ):
        return
    _EV_PLUG_NUDGE_LAST[vehicle_id] = loop_now

    grid_w = await db.fetchval(
        "select grid_power_w from ems.vw_latest_inverter where site_id = $1 limit 1",
        site_id,
    )
    # The surplus is only mentioned when grid power is below -500 W.
    surplus = f" — právě teče {abs(int(grid_w))/1000:.1f} kW do sítě" if grid_w and grid_w < -500 else ""

    from services.notification_service import send_discord

    await send_discord(
        db, site_id,
        f"🚗 **{veh['name'] or 'EV'} je doma a nepíchnuté**{surplus}.\n"
        f"Píchni ho a plán se o zbytek postará (přebytky / levné sloty).",
        level="info",
    )
    logger.info("EV plug nudge sent (site=%s, vehicle=%s)", site_id, veh["id"])
async def run_telemetry_loop(conn: asyncpg.Connection) -> float:
    """Run one pass over all active sites and return the elapsed seconds.

    The pollers run strictly one after another because a single asyncpg
    connection cannot serve parallel queries. Each poller is isolated: a
    failure is logged with its traceback and the remaining pollers of the
    same site still run.

    Args:
        conn: Database connection shared by all pollers of this pass.

    Returns:
        Time spent in this pass, in seconds (used by the caller for sleeping).
    """
    loop = asyncio.get_running_loop()
    start = loop.time()
    sites = await conn.fetch(
        "select id from ems.vw_site_directory where active = true"
    )
    pollers = (
        poll_inverter,
        poll_ev_chargers,
        poll_heat_pump,
        poll_loxone_sensors,
        poll_tesla_presence,
        poll_pool_pumps,
    )
    for site in sites:
        sid = site["id"]
        for poller in pollers:
            try:
                await poller(sid, conn)
            except Exception:
                logger.exception(
                    "Telemetry loop error site %s (%s)", sid, poller.__name__
                )
    return loop.time() - start
async def run_telemetry_loop_wrapper(pool: asyncpg.Pool) -> None:
    """Background task that runs the telemetry pass roughly once a minute.

    Each iteration takes a connection from the pool only for the duration of
    the pass, so the pool is not held while sleeping. The pool is also
    published in ``_BG_POOL`` for the fire-and-forget hooks. After an error
    the loop waits 5 seconds and retries; cancellation is passed through.

    Args:
        pool: Connection pool used for every pass.
    """
    global _BG_POOL
    _BG_POOL = pool

    while True:
        try:
            async with pool.acquire() as conn:
                elapsed = await run_telemetry_loop(conn)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.exception("Telemetry wrapper DB error: %s", e)
            await asyncio.sleep(5)
        else:
            if elapsed > 50:
                logger.warning("Telemetry loop took %.1fs (>50s)", elapsed)
            # Sleep for whatever is left of the 60 s period.
            await asyncio.sleep(max(0.0, 60.0 - elapsed))
async def poll_pool_pumps(site_id: int, db: asyncpg.Connection) -> None:
    """Read the status of every pool pump's Shelly switch and store a sample.

    Per the original author's note the Shelly keeps no history, so it is
    built from these periodic samples like for the other devices. When a read
    fails nothing is written for that pump.

    Args:
        site_id: Site whose pool pumps are polled.
        db: Database connection used for the lookup and the inserts.
    """
    # Imported locally on purpose by the original author, to keep the file
    # header untouched.
    from services.shelly_client import get_switch_status, shelly_base_url

    pumps = await db.fetch(
        """
        select pump_id as id, code, shelly_switch_id, protocol, host, port
        from ems.vw_asset_pool_pump_http_poll
        where site_id = $1
        """,
        site_id,
    )
    measured_at = datetime.now(timezone.utc)

    for pump in pumps:
        code = pump["code"]
        base = shelly_base_url(pump["protocol"], pump["host"], pump["port"])
        switch_id = int(pump["shelly_switch_id"] or 0)
        try:
            status = await get_switch_status(base, switch_id)
        except Exception as e:
            # On a failed read write NOTHING: a fabricated zero would pollute
            # the consumption history.
            logger.warning("pool pump %s (%s) read failed: %s", code, base, e)
            continue

        power_w = None
        if status.apower_w is not None:
            power_w = int(round(status.apower_w))
        energy_wh = None
        if status.aenergy_total_wh is not None:
            energy_wh = int(round(status.aenergy_total_wh))

        await db.execute(
            "select ems.fn_telemetry_pool_pump_sample($1::int, $2::int, $3::timestamptz, $4::boolean, $5::int, $6::bigint)",
            site_id,
            pump["id"],
            measured_at,
            status.output,
            power_w,
            energy_wh,
        )