Files
ems/backend/services/telemetry_collector.py
2026-06-12 00:12:43 +02:00

464 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Sběr telemetrie z Modbus: Deye (střídač), Teltonika TeltoCharge (EV); TČ zatím placeholder."""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime, timezone
import asyncpg
from app.ws_manager import manager
from services.modbus_client import get_modbus_client
logger = logging.getLogger(__name__)
#: Pool for fire-and-forget work outside the main poll connection (e.g. the
#: replan after an EV arrival). Set by run_telemetry_loop_wrapper.
_BG_POOL: asyncpg.Pool | None = None
async def _on_ev_arrival(site_id: int, charger_code: str) -> None:
    """Run an immediate replan and setpoint export after an EV plugs in.

    Without this hook the new session would only be picked up by the next
    rolling (*/15) replan.

    planning_engine and control_exporter are imported lazily: they import the
    control package, which is imported independently, so a module-level import
    here would create an import cycle at startup.

    Every failure is logged; nothing propagates to the caller.
    """
    pool = _BG_POOL
    if pool is None:
        logger.warning("EV arrival: BG pool není k dispozici, čeká se na rolling tick")
        return

    try:
        from services.control_exporter import export_setpoints
        from services.planning_engine import run_rolling_replan

        async with pool.acquire() as bg_conn:
            # Tesla: write the real SoC into the session BEFORE replanning.
            # A failure here must not block the plan, hence the inner try.
            try:
                await _patch_session_from_tesla(site_id, charger_code, bg_conn)
            except Exception:
                logger.exception(
                    "Tesla SoC fetch failed (site=%s, %s) — replan jede na defaultech",
                    site_id,
                    charger_code,
                )

            await run_rolling_replan(
                site_id, bg_conn, triggered_by=f"ev_arrival:{charger_code}"
            )
            await export_setpoints(site_id, bg_conn)

        logger.info(
            "EV arrival replan+export done (site=%s, charger=%s)",
            site_id,
            charger_code,
        )
    except Exception:
        logger.exception(
            "EV arrival replan failed (site=%s, charger=%s)", site_id, charger_code
        )
async def _patch_session_from_tesla(
    site_id: int, charger_code: str, conn: asyncpg.Connection
) -> None:
    """After an arrival, copy the real SoC of a Tesla vehicle into its session.

    Only acts when ems.fn_tesla_arrival_context reports api_type == 'tesla'
    and an open session exists. Energy is NOT computed here:
    soc_at_connect_pct and target_soc_pct are picked up by
    fn_planning_site_context (SQL-first). The VIN is learned on the first
    successful fetch if it was not known yet.

    Exceptions from the DB or the Tesla client propagate to the caller.
    """
    import json

    from app.db_json import fetch_json
    from services.tesla_client import get_charge_state

    arrival = await fetch_json(
        conn,
        "select ems.fn_tesla_arrival_context($1::int, $2::text)",
        site_id,
        charger_code,
    )
    is_tesla = isinstance(arrival, dict) and arrival.get("api_type") == "tesla"
    if not is_tesla:
        return

    session_id = arrival.get("session_id")
    if session_id is None:
        logger.warning("Tesla hook: chybí otevřená session (%s)", charger_code)
        return

    known_vin = arrival.get("vin")
    charge_state = await get_charge_state(conn, known_vin)
    if charge_state is None:
        return

    # Persist the VIN the first time the API returns one for this vehicle.
    if not known_vin and charge_state.get("vin"):
        await conn.execute(
            "select ems.fn_vehicle_set_vin($1::int, $2::text)",
            int(arrival["vehicle_id"]),
            str(charge_state["vin"]),
        )

    battery_level = charge_state["battery_level"]
    charge_limit = charge_state.get("charge_limit_soc")

    session_patch: dict = {"soc_at_connect_pct": battery_level}
    # A missing or falsy charge limit leaves the session's target untouched.
    if charge_limit:
        session_patch["target_soc_pct"] = charge_limit

    await fetch_json(
        conn,
        "select ems.fn_ev_session_apply_patch($1::int, $2::int, $3::jsonb)",
        site_id,
        int(session_id),
        json.dumps(session_patch),
    )
    logger.info(
        "Tesla SoC -> session %s: level=%s%%, limit=%s%% (%s)",
        session_id,
        battery_level,
        charge_limit,
        charger_code,
    )
# Deye SUN holding registers (decimal addresses, passed as-is to read_holding_registers)
DEYE_REG_RUN_STATE = 500
DEYE_REG_BATT_CHARGE_TODAY = 514
DEYE_REG_BATT_DISCHARGE_TODAY = 515
DEYE_REG_BATTERY_SOC = 588
DEYE_REG_BATTERY_POWER_FLOW = 590
DEYE_REG_GRID_TOTAL_POWER = 625
DEYE_REG_GEN_PORT_POWER = 667
DEYE_REG_LOAD_TOTAL_POWER = 653
DEYE_REG_GRID_IMPORT_TOTAL_LO = 522
DEYE_REG_GRID_IMPORT_TOTAL_HI = 523
DEYE_REG_GRID_EXPORT_TOTAL_LO = 524
DEYE_REG_GRID_EXPORT_TOTAL_HI = 525
DEYE_REG_PV1_POWER = 672
DEYE_REG_PV2_POWER = 673
# Solar sell (0 = surplus of the controllable PV must not go to the grid) and
# GEN/MI cut-off (reg 178 bits 0-1 == 3 → cut-off ON).
# Note: some manuals/UIs call it "register 179" (1-based), but the Modbus address is 178 (0-based).
# See modbus-registers.md.
DEYE_REG_SOLAR_SELL = 145
DEYE_REG_CONTROL_BOARD_SPECIAL1 = 178
# Teltonika TeltoCharge holding registers (official "Modbus RTU Communication
# protocol" rev 0.5, 2024-07-23; via Waveshare RS485→TCP, FC 3 reads).
# Block 0-40: 0-2 voltage L1-L3 (V), 3-5 current L1-L3 (×10 A), 6 EVSE status (DLM),
# 27 charge point state, 33 IEC61851, 34/35 warning/error bits,
# 38 instantaneous power (W), 39 session energy (kWh×100), 40 session duration (s).
# Writable (control, not used yet): 15 Amps to use (0=stop, 6-32), 16 start/stop.
TELTO_REG_BLOCK_START = 0
TELTO_REG_BLOCK_COUNT = 41
#: EVSE status (reg 6) → internal state; session detection relies on 'available' vs ≠ 'available'
#: (fn_ev_session_transition), so every state with an EV connected must be ≠ 'available'.
TELTO_STATUS_MAP = {
    0: "charging",  # C: charging
    1: "preparing",  # B1: EV connected, waiting for the EV
    2: "preparing",  # B2: was charging, insufficient power
    3: "preparing",  # B3: was not charging, insufficient power
    4: "suspended_ev",  # D1: stopped by the vehicle
    5: "suspended_evse",  # D2: not authorized
    6: "suspended_evse",  # D3: charging not allowed
    7: "available",  # A: no EV
    8: "faulted",  # F: fault
    9: "unknown",  # E
}
def parse_teltocharge_frame(regs: list[int]) -> dict[str, object]:
    """Decode one TeltoCharge holding-register block (registers 0-40) into a dict.

    Pure function with no Modbus I/O, so it can be tested with a plain list.

    Raises:
        ValueError: if fewer than TELTO_REG_BLOCK_COUNT registers are supplied.
    """
    n_regs = len(regs)
    if n_regs < TELTO_REG_BLOCK_COUNT:
        raise ValueError(f"TeltoCharge frame too short: {n_regs}")

    evse_status_raw = int(regs[6])
    # Registers 3-5 are the per-phase currents; report the largest one,
    # divided by 10 to get amps.
    phase_currents = [int(regs[idx]) for idx in (3, 4, 5)]

    return {
        # Codes missing from the map are reported as "unknown".
        "status": TELTO_STATUS_MAP.get(evse_status_raw, "unknown"),
        "power_w": int(regs[38]),
        "session_energy_kwh": int(regs[39]) / 100.0,
        "current_a": max(phase_currents) / 10.0,
        # Only register 0 is used for the voltage.
        "voltage_v": int(regs[0]),
        "warning_bits": int(regs[34]),
        "error_bits": int(regs[35]),
        "evse_status_raw": evse_status_raw,
        "charge_point_state_raw": int(regs[27]),
    }
def aggregate_pv_production_w(pv1_w: int, pv2_w: int, gen_port_w: int) -> int:
    """Return the instantaneous PV production in watts for the dashboard / sum audit.

    The inputs come from Deye registers 672/673/667, which are int16 watts.
    Negative readings (e.g. in the evening during export) are not DC
    production, so each source is clamped at zero before summing.
    """
    sources = (pv1_w, pv2_w, gen_port_w)
    return sum(max(0, int(watts)) for watts in sources)
def _export_limit_flags_from_deye_regs(reg145: int | None, reg179: int | None) -> tuple[bool | None, int | None]:
"""Odvoď is_export_limited / pv_derating_flags z přečtených holding registrů (NULL = neznámé)."""
if reg145 is None and reg179 is None:
return None, None
flags = 0
if reg145 is not None and int(reg145) == 0:
flags |= 1
if reg179 is not None and (int(reg179) & 3) == 3:
flags |= 2
return (flags != 0), flags
async def poll_inverter(site_id: int, db: asyncpg.Connection) -> None:
    """Poll each inverter of a site over Modbus, store one sample and broadcast it.

    For every row of ems.vw_asset_inverter_modbus_poll the Deye registers are
    read, the sample is written through ems.fn_telemetry_inverter_sample and
    a "telemetry" message is pushed via the WebSocket manager.

    Any exception while handling one inverter (Modbus, DB or broadcast) is
    logged and the loop moves on to the next inverter.
    """
    rows = await db.fetch(
        """
        select inverter_id as id, code, host, port, unit_id
        from ems.vw_asset_inverter_modbus_poll
        where site_id = $1
        """,
        site_id,
    )
    # One timestamp is shared by all inverters sampled in this pass.
    measured_at = datetime.now(timezone.utc)
    for row in rows:
        inv_id = row["id"]
        code = row["code"]
        host = row["host"]
        # Fall back to port 502 and unit id 1 when not configured.
        port = int(row["port"] or 502)
        unit_id = int(row["unit_id"] if row["unit_id"] is not None else 1)
        try:
            client = await get_modbus_client(host, port)
            async with client.batch(unit_id) as mb:
                run_state = await mb.read_register(DEYE_REG_RUN_STATE)
                battery_soc = await mb.read_register(DEYE_REG_BATTERY_SOC)
                battery_power = await mb.read_register_signed(DEYE_REG_BATTERY_POWER_FLOW)
                batt_charge_today = await mb.read_register(DEYE_REG_BATT_CHARGE_TODAY)
                batt_discharge_today = await mb.read_register(DEYE_REG_BATT_DISCHARGE_TODAY)
                grid_power = await mb.read_register_signed(DEYE_REG_GRID_TOTAL_POWER)
                load_power = await mb.read_register_signed(DEYE_REG_LOAD_TOTAL_POWER)
                pv1_power = await mb.read_register_signed(DEYE_REG_PV1_POWER)
                pv2_power = await mb.read_register_signed(DEYE_REG_PV2_POWER)
                gen_port_power = await mb.read_register_signed(DEYE_REG_GEN_PORT_POWER)
                # Four consecutive registers starting at 522:
                # [import lo, import hi, export lo, export hi].
                grid_energy_regs = await mb.read_holding_registers(
                    DEYE_REG_GRID_IMPORT_TOTAL_LO, 4
                )
                # reg179 holds the value read from Modbus address 178
                # (DEYE_REG_CONTROL_BOARD_SPECIAL1).
                reg145 = await mb.read_register(DEYE_REG_SOLAR_SELL)
                reg179 = await mb.read_register(DEYE_REG_CONTROL_BOARD_SPECIAL1)
            pv_power_w = aggregate_pv_production_w(pv1_power, pv2_power, gen_port_power)
            # Combine hi/lo words into one 32-bit counter, then scale by 100.
            # NOTE(review): the factor presumably converts 0.1 kWh counts to
            # Wh; confirm against the register map.
            grid_import_total_wh = (grid_energy_regs[1] << 16 | grid_energy_regs[0]) * 100
            grid_export_total_wh = (grid_energy_regs[3] << 16 | grid_energy_regs[2]) * 100
            is_export_limited, pv_derating_flags = _export_limit_flags_from_deye_regs(reg145, reg179)
            logger.debug("inverter:%s Deye run_state raw=%s", code, run_state)
            await db.execute(
                "select ems.fn_telemetry_inverter_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::int, $6::int, $7::int, $8::float8, $9::int, $10::int, $11::int, $12::int, $13::int, $14::bigint, $15::bigint, $16::int, $17::boolean, $18::int)",
                site_id,
                inv_id,
                measured_at,
                pv_power_w,
                pv1_power,
                pv2_power,
                gen_port_power,
                float(battery_soc),
                battery_power,
                batt_charge_today,
                batt_discharge_today,
                grid_power,
                load_power,
                grid_import_total_wh,
                grid_export_total_wh,
                run_state,
                is_export_limited,
                pv_derating_flags,
            )
            # No temperature register is read here, so the broadcast field
            # is always None.
            inv_temp: float | None = None
            await manager.broadcast_telemetry(
                {
                    "type": "telemetry",
                    "site_id": site_id,
                    # Broadcast timestamp is taken at send time, not measured_at.
                    "ts": datetime.now(timezone.utc).isoformat(),
                    "pv_power_w": pv_power_w,
                    "battery_soc_pct": float(battery_soc),
                    "battery_power_w": battery_power,
                    "grid_power_w": grid_power,
                    "load_power_w": load_power,
                    "gen_port_power_w": gen_port_power,
                    "inverter_temp_c": inv_temp,
                    "is_export_limited": is_export_limited,
                    "pv_derating_flags": pv_derating_flags,
                }
            )
        except Exception as e:
            # Logged without traceback; the remaining inverters are still polled.
            logger.error("poll_inverter site=%s inverter=%s: %s", site_id, code, e)
#: Strong references to in-flight EV-arrival tasks. The event loop keeps only
#: weak references to tasks, so a fire-and-forget task that nobody references
#: may be garbage-collected before it finishes.
_EV_ARRIVAL_TASKS: set[asyncio.Task] = set()


async def poll_ev_chargers(site_id: int, db: asyncpg.Connection) -> None:
    """Poll each EV charger of a site, store one sample and track session transitions.

    For every row of ems.vw_asset_ev_charger_modbus_poll the TeltoCharge
    register block is read and parsed, the sample is written through
    ems.fn_telemetry_ev_charger_sample, and the status change against the
    previously stored sample is passed to ems.fn_ev_session_transition.
    A change from 'available' to any other status schedules _on_ev_arrival
    as a background task.

    A failed Modbus read or parse skips that charger without writing
    anything. Database errors are not caught here and propagate to the
    caller.
    """
    rows = await db.fetch(
        """
        select charger_id as id, code, host, port, unit_id
        from ems.vw_asset_ev_charger_modbus_poll
        where site_id = $1
        """,
        site_id,
    )
    measured_at = datetime.now(timezone.utc)
    # The connector id is hard-coded to 1 for every charger.
    connector_id = 1
    for row in rows:
        code = row["code"]
        charger_id = row["id"]
        host = row["host"]
        port = int(row["port"] or 502)
        unit_id = int(row["unit_id"] if row["unit_id"] is not None else 1)
        try:
            client = await get_modbus_client(host, port)
            async with client.batch(unit_id) as mb:
                regs = await mb.read_holding_registers(
                    TELTO_REG_BLOCK_START, TELTO_REG_BLOCK_COUNT
                )
            frame = parse_teltocharge_frame(regs)
        except Exception as e:
            # On a failed read write NOTHING: a fabricated 'available' would
            # falsely end the EV session and pollute the baseline (power 0).
            logger.warning("EV charger %s (%s:%s) read failed: %s", code, host, port, e)
            continue

        current_status = str(frame["status"])
        if frame["error_bits"]:
            logger.warning(
                "EV charger %s error bits=0x%04x warning=0x%04x",
                code, frame["error_bits"], frame["warning_bits"],
            )

        # Read the previous status BEFORE inserting the new sample, otherwise
        # the query would return the sample written just below.
        previous_status = await db.fetchval(
            """
            select status
            from ems.telemetry_ev_charger
            where charger_id = $1 and connector_id = $2
            order by measured_at desc
            limit 1
            """,
            charger_id,
            connector_id,
        )
        await db.execute(
            "select ems.fn_telemetry_ev_charger_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::text, $6::int, $7::float8, $8::float8)",
            site_id,
            charger_id,
            measured_at,
            connector_id,
            current_status,
            int(frame["power_w"]),
            float(frame["session_energy_kwh"]),
            float(frame["current_a"]),
        )

        # The very first sample of a charger has no predecessor, so there is
        # no transition to evaluate.
        if previous_status is not None:
            await db.fetchval(
                "select ems.fn_ev_session_transition($1::int, $2::int, $3::text, $4::text, $5::timestamptz)",
                site_id,
                charger_id,
                str(previous_status),
                current_status,
                measured_at,
            )
            if previous_status == "available" and current_status != "available":
                logger.info("EV arrival detected on charger %s", code)
                # Keep a strong reference until the task completes; the done
                # callback removes it again so the set does not grow.
                task = asyncio.create_task(_on_ev_arrival(site_id, str(code)))
                _EV_ARRIVAL_TASKS.add(task)
                task.add_done_callback(_EV_ARRIVAL_TASKS.discard)
            elif previous_status != "available" and current_status == "available":
                logger.info("EV departure detected on charger %s", code)
async def poll_heat_pump(site_id: int, db: asyncpg.Connection) -> None:
    """Write a placeholder telemetry sample for each heat pump of the site.

    The heat-pump Modbus register map is not implemented yet: no device is
    read, and every pump listed in ems.vw_asset_heat_pump_modbus_poll gets the
    same fixed values through ems.fn_telemetry_heat_pump_sample.
    """
    pumps = await db.fetch(
        """
        select heat_pump_id as id, code, host, port, unit_id
        from ems.vw_asset_heat_pump_modbus_poll
        where site_id = $1
        """,
        site_id,
    )
    sampled_at = datetime.now(timezone.utc)
    # Fixed stand-in arguments for $4..$8 of fn_telemetry_heat_pump_sample.
    placeholder_values = (0, 10.0, 45.0, 55.0, "standby")
    for pump in pumps:
        logger.info(
            "TODO: heat pump Modbus registry pending (heat_pump=%s)", pump["code"]
        )
        await db.execute(
            "select ems.fn_telemetry_heat_pump_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::float8, $6::float8, $7::float8, $8::text)",
            site_id,
            pump["id"],
            sampled_at,
            *placeholder_values,
        )
async def run_telemetry_loop(conn: asyncpg.Connection) -> float:
    """Run one pass over all active sites and return the elapsed seconds.

    The caller uses the return value to size its sleep. Polling is strictly
    sequential: a single asyncpg connection must not serve parallel queries.

    An exception raised while polling a site is logged, the remaining pollers
    for that site are skipped, and the pass continues with the next site.
    """
    event_loop = asyncio.get_running_loop()
    started = event_loop.time()

    active_sites = await conn.fetch(
        "select id from ems.vw_site_directory where active = true"
    )
    pollers = (poll_inverter, poll_ev_chargers, poll_heat_pump, poll_pool_pumps)
    for record in active_sites:
        sid = record["id"]
        try:
            for poll in pollers:
                await poll(sid, conn)
        except Exception as exc:
            logger.error("Telemetry loop error site %s: %s", sid, exc)

    return event_loop.time() - started
async def run_telemetry_loop_wrapper(pool: asyncpg.Pool) -> None:
    """Background task that runs the telemetry pass forever.

    Each iteration borrows a connection from the pool only for the duration of
    the pass, so the pool is not blocked while sleeping. The pool is also
    published in _BG_POOL for background work such as the EV-arrival replan.

    A pass is followed by a sleep of 60 s minus its duration (never negative).
    If the pass raises, the error is logged and the next attempt follows
    after 5 s. Cancellation is re-raised.
    """
    global _BG_POOL
    _BG_POOL = pool

    while True:
        try:
            async with pool.acquire() as conn:
                elapsed = await run_telemetry_loop(conn)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.exception("Telemetry wrapper DB error: %s", e)
            await asyncio.sleep(5)
        else:
            if elapsed > 50:
                logger.warning("Telemetry loop took %.1fs (>50s)", elapsed)
            await asyncio.sleep(max(0.0, 60.0 - elapsed))
async def poll_pool_pumps(site_id: int, db: asyncpg.Connection) -> None:
    """Sample each pool pump of a site through its Shelly relay (Gen2 RPC Switch.GetStatus), 60 s.

    Shelly keeps no history, so the history is built from 1-minute samples,
    as for the other devices. A pump whose status read fails is skipped and
    nothing is written for it.
    """
    # Local import: minimal touch of the file header (concurrent changes on main).
    from services.shelly_client import get_switch_status, shelly_base_url

    def _rounded(value):
        """Round a reading to the nearest int, passing None through."""
        return None if value is None else int(round(value))

    pumps = await db.fetch(
        """
        select pump_id as id, code, shelly_switch_id, protocol, host, port
        from ems.vw_asset_pool_pump_http_poll
        where site_id = $1
        """,
        site_id,
    )
    sampled_at = datetime.now(timezone.utc)
    for pump in pumps:
        code = pump["code"]
        base_url = shelly_base_url(pump["protocol"], pump["host"], pump["port"])
        try:
            # A missing switch id falls back to 0.
            status = await get_switch_status(
                base_url, int(pump["shelly_switch_id"] or 0)
            )
        except Exception as exc:
            # On a failed read write NOTHING: a fabricated zero would pollute
            # the consumption history (same principle as for EV chargers).
            logger.warning("pool pump %s (%s) read failed: %s", code, base_url, exc)
            continue

        is_on = status.output
        power_w = _rounded(status.apower_w)
        energy_total_wh = _rounded(status.aenergy_total_wh)
        await db.execute(
            "select ems.fn_telemetry_pool_pump_sample($1::int, $2::int, $3::timestamptz, $4::boolean, $5::int, $6::bigint)",
            site_id,
            pump["id"],
            sampled_at,
            is_on,
            power_w,
            energy_total_wh,
        )