# Files
# ems/backend/services/telemetry_collector.py
# Dusan Vojacek 897b95f728 x
# 2026-03-20 14:30:03 +01:00
#
# 322 lines
# 12 KiB
# Python

"""Sběr telemetrie z Modbus (Deye) a placeholder záznamy pro EV / TČ."""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime, timezone
import asyncpg
from pymodbus.client import AsyncModbusTcpClient
from pymodbus.exceptions import ConnectionException, ModbusIOException
logger = logging.getLogger(__name__)
def _to_signed_i16(value: int) -> int:
v = value & 0xFFFF
if v >= 0x8000:
return v - 0x10000
return v
class ModbusDevice:
    """Async Modbus TCP wrapper for a single device with failure accounting.

    The underlying ``AsyncModbusTcpClient`` is created lazily on first use.
    Every communication failure increments a consecutive-failure counter;
    the counter is reset by the next successful read or write. Log severity
    escalates with the counter, and the client is torn down and rebuilt each
    time the counter reaches a multiple of ``_RECONNECT_INTERVAL``.

    The read/write helpers do not raise for the communication errors handled
    here (``OSError``, ``ModbusIOException``, ``ConnectionException``); they
    return ``0`` / ``False`` instead.
    """

    # From this many consecutive failures on, each failure is additionally
    # logged at ERROR level.
    _ERROR_LOG_THRESHOLD = 3
    # A reconnect is attempted whenever the consecutive-failure counter is a
    # non-zero multiple of this value (10, 20, 30, ...).
    _RECONNECT_INTERVAL = 10

    def __init__(self, host: str, port: int, unit_id: int, device_name: str) -> None:
        """Store connection parameters; no network activity happens here.

        Args:
            host: Host name or address of the Modbus TCP endpoint.
            port: TCP port; any falsy value (``None``, ``0``) selects 502.
            unit_id: Modbus unit id; ``None`` selects 1 (``0`` is kept as 0).
            device_name: Label used as a prefix in log messages.
        """
        self._host = host
        self._port = int(port) if port else 502
        self._unit_id = int(unit_id) if unit_id is not None else 1
        self._device_name = device_name
        self._client: AsyncModbusTcpClient | None = None
        self._error_count = 0

    def _log_prefix(self) -> str:
        """Return the bracketed device label used at the start of log lines."""
        return f"[{self._device_name}]"

    def _reconnect_due(self) -> bool:
        """Return True when the failure counter sits on a reconnect boundary."""
        return (
            self._error_count >= self._RECONNECT_INTERVAL
            and self._error_count % self._RECONNECT_INTERVAL == 0
        )

    def _note_communication_failure(self, exc: BaseException | None) -> None:
        """Count one failure and log it with severity based on the counter.

        Args:
            exc: The error that caused the failure, or ``None`` if unknown.
        """
        self._error_count += 1
        if isinstance(exc, ConnectionError):
            logger.warning("%s ConnectionError: %s", self._log_prefix(), exc)
        else:
            logger.warning(
                "%s komunikace selhala: %s",
                self._log_prefix(),
                exc if exc is not None else "neznámá chyba",
            )
        if self._error_count >= self._ERROR_LOG_THRESHOLD:
            logger.error("%s Opakované chyby komunikace", self._log_prefix())
        if self._reconnect_due():
            logger.critical(
                "%s Opakované chyby komunikace, pokus o reconnect",
                self._log_prefix(),
            )

    def _reset_error_count(self) -> None:
        """Clear the consecutive-failure counter after a successful operation."""
        self._error_count = 0

    async def _ensure_connected(self) -> bool:
        """Create the client if needed and connect it when not connected.

        Returns:
            True when the client is connected; False when connecting failed
            (the failure has then already been counted and logged).
        """
        if self._client is None:
            self._client = AsyncModbusTcpClient(self._host, port=self._port)
        if not self._client.connected:
            try:
                ok = await self._client.connect()
            except OSError as e:
                # ConnectionError is a subclass of OSError, so one clause
                # covers both; the logger still tells the two apart.
                self._note_communication_failure(e)
                return False
            if not ok:
                self._note_communication_failure(
                    ConnectionError("Modbus connect() returned False")
                )
                return False
        return True

    async def _reconnect(self) -> None:
        """Drop the current client and try to connect a fresh one.

        A failed connect attempt is only logged; the new (unconnected)
        client stays in place so the next operation retries the connect.
        """
        if self._client is not None:
            self._client.close()
            self._client = None
        self._client = AsyncModbusTcpClient(self._host, port=self._port)
        try:
            await self._client.connect()
        except (ConnectionError, OSError) as e:
            logger.warning("%s reconnect selhal: %s", self._log_prefix(), e)

    async def _reconnect_if_due(self) -> None:
        """Rebuild the client when the failure counter is on a boundary."""
        if self._reconnect_due():
            await self._reconnect()

    async def _handle_failure(self, exc: BaseException | None) -> None:
        """Count and log one failure, then reconnect if the counter asks for it."""
        self._note_communication_failure(exc)
        await self._reconnect_if_due()

    async def read_register(self, address: int) -> int:
        """Read one holding register.

        Args:
            address: Register address passed to ``read_holding_registers``.

        Returns:
            The register value as ``int``, or ``0`` on any handled error.
            A returned ``0`` is therefore indistinguishable from a genuine
            zero reading.
        """
        try:
            if not await self._ensure_connected():
                # The failure was already counted by _ensure_connected().
                await self._reconnect_if_due()
                return 0
            assert self._client is not None  # narrowed by _ensure_connected()
            resp = await self._client.read_holding_registers(
                address, count=1, device_id=self._unit_id
            )
            if resp.isError() or not getattr(resp, "registers", None):
                await self._handle_failure(
                    ConnectionException(f"read_holding_registers@{address:#x}: {resp!r}")
                )
                return 0
            self._reset_error_count()
            return int(resp.registers[0])
        except (OSError, ModbusIOException, ConnectionException) as e:
            await self._handle_failure(e)
            return 0

    async def read_register_signed(self, address: int) -> int:
        """Read one holding register as signed int16 (for power values that may be negative).

        Returns ``0`` when the underlying read fails.
        """
        u = await self.read_register(address)
        return _to_signed_i16(u)

    async def write_register(self, address: int, value: int) -> bool:
        """Write one holding register.

        Args:
            address: Register address passed to ``write_register``.
            value: Value to write, passed through unchanged.

        Returns:
            True on success, False on any handled error.
        """
        try:
            if not await self._ensure_connected():
                # The failure was already counted by _ensure_connected().
                await self._reconnect_if_due()
                return False
            assert self._client is not None  # narrowed by _ensure_connected()
            resp = await self._client.write_register(
                address, value, device_id=self._unit_id
            )
            if resp.isError():
                await self._handle_failure(
                    ConnectionException(f"write_register@{address:#x}: {resp!r}")
                )
                return False
            self._reset_error_count()
            return True
        except (OSError, ModbusIOException, ConnectionException) as e:
            await self._handle_failure(e)
            return False

    async def close(self) -> None:
        """Close and discard the client, if one exists."""
        if self._client is not None:
            self._client.close()
            self._client = None
async def poll_inverter(site_id: int, db: asyncpg.Connection) -> None:
    """Read one telemetry sample from every active Modbus-TCP inverter of a site.

    For each inverter row returned by the query a short-lived
    ``ModbusDevice`` is created, ten holding registers are read one by one
    and a row is inserted into ``ems.telemetry_inverter``. All inverters of
    one call share the same ``measured_at`` timestamp, taken right after
    the inverter list was fetched.

    An inverter whose endpoint cannot be connected is skipped: the register
    helpers return 0 on failure, so polling it anyway would store an
    all-zero sample that looks like a real measurement.

    Args:
        site_id: Site whose inverters are polled.
        db: Connection used for both the lookup and the inserts.

    Errors raised while handling one inverter are logged and do not stop
    the remaining inverters; an error in the initial lookup propagates.
    """
    select_sql = (
        "\n"
        "SELECT ai.id, ai.code, se.host, se.port, se.unit_id\n"
        "FROM ems.asset_inverter ai\n"
        "JOIN ems.site_endpoint se ON se.id = ai.endpoint_id\n"
        "WHERE ai.site_id = $1\n"
        "AND ai.active = true\n"
        "AND se.enabled = true\n"
        "AND se.endpoint_type = 'modbus_tcp'\n"
    )
    insert_sql = (
        "\n"
        "INSERT INTO ems.telemetry_inverter (\n"
        "site_id, inverter_id, measured_at,\n"
        "pv_power_w, battery_soc_percent, battery_power_w, battery_voltage_v,\n"
        "grid_power_w, grid_voltage_v, load_power_w,\n"
        "inverter_temp_c, operating_mode, fault_code\n"
        ")\n"
        "VALUES (\n"
        "$1, $2, $3,\n"
        "$4, $5, $6, $7,\n"
        "$8, $9, $10,\n"
        "$11, $12, $13\n"
        ")\n"
        "ON CONFLICT (inverter_id, measured_at) DO NOTHING\n"
    )

    rows = await db.fetch(select_sql, site_id)
    measured_at = datetime.now(timezone.utc)
    for row in rows:
        inv_id = row["id"]
        code = row["code"]
        host = row["host"]
        port = row["port"] or 502
        unit_id = row["unit_id"] if row["unit_id"] is not None else 1
        dev = ModbusDevice(host, port, unit_id, f"inverter:{code}")
        try:
            # Connect once up front. Without this check an offline inverter
            # would go through ten failing reads and then be stored as a
            # sample consisting only of zeros.
            if not await dev._ensure_connected():
                logger.warning(
                    "poll_inverter site=%s inverter=%s: device unreachable, sample skipped",
                    site_id,
                    code,
                )
                continue
            # NOTE(review): a single read that fails after the connection
            # check still yields 0 and is stored as such; telling these
            # apart needs an error signal from ModbusDevice.read_register.
            pv_power_w = await dev.read_register(0x0215)
            battery_soc = await dev.read_register(0x0103)
            battery_power = await dev.read_register_signed(0x0105)
            # Raw register values below are divided by 10 before storing.
            battery_voltage = (await dev.read_register(0x0101)) / 10.0
            grid_power = await dev.read_register_signed(0x0169)
            grid_voltage = (await dev.read_register(0x016F)) / 10.0
            load_power = await dev.read_register(0x0213)
            inv_temp = (await dev.read_register(0x0220)) / 10.0
            op_mode = await dev.read_register(0x0168)
            fault_code = await dev.read_register(0x0180)
            await db.execute(
                insert_sql,
                site_id,
                inv_id,
                measured_at,
                pv_power_w,
                battery_soc,
                battery_power,
                battery_voltage,
                grid_power,
                grid_voltage,
                load_power,
                inv_temp,
                str(op_mode),  # the operating mode is passed to the query as text
                fault_code,
            )
        except Exception as e:
            # Per-inverter boundary: one failing device must not abort the rest.
            logger.error("poll_inverter site=%s inverter=%s: %s", site_id, code, e)
        finally:
            await dev.close()
async def poll_ev_chargers(site_id: int, db: asyncpg.Connection) -> None:
    """Insert a placeholder telemetry row for each Modbus-TCP EV charger of a site.

    No device is contacted: the register map for chargers is still pending,
    so every charger found gets a fixed row (connector 1, status
    ``'available'``, zero power and energy) and an info-level TODO log line.

    Args:
        site_id: Site whose chargers are processed.
        db: Connection used for the lookup and the inserts.
    """
    select_sql = (
        "\n"
        "SELECT ec.id, ec.code, se.host, se.port, se.unit_id\n"
        "FROM ems.asset_ev_charger ec\n"
        "JOIN ems.site_endpoint se ON se.id = ec.endpoint_id\n"
        "WHERE ec.site_id = $1\n"
        "AND se.enabled = true\n"
        "AND se.endpoint_type = 'modbus_tcp'\n"
    )
    insert_sql = (
        "\n"
        "INSERT INTO ems.telemetry_ev_charger (\n"
        "site_id, charger_id, measured_at, connector_id,\n"
        "status, power_w, energy_kwh\n"
        ")\n"
        "VALUES ($1, $2, $3, 1, 'available', 0, 0)\n"
        "ON CONFLICT (charger_id, connector_id, measured_at) DO NOTHING\n"
    )

    chargers = await db.fetch(select_sql, site_id)
    # One shared timestamp for every charger handled in this call.
    timestamp = datetime.now(timezone.utc)
    for charger in chargers:
        logger.info("TODO: EV charger Modbus registry pending | %s", charger["code"])
        await db.execute(insert_sql, site_id, charger["id"], timestamp)
async def poll_heat_pump(site_id: int, db: asyncpg.Connection) -> None:
    """Insert a placeholder telemetry row for each Modbus-TCP heat pump of a site.

    No device is contacted: the register map for heat pumps is still
    pending, so every heat pump found gets a fixed row (power 0, outdoor
    10.0, water outlet 45.0, tank 55.0, mode ``'standby'``) and an
    info-level TODO log line.

    Args:
        site_id: Site whose heat pumps are processed.
        db: Connection used for the lookup and the inserts.
    """
    select_sql = (
        "\n"
        "SELECT hp.id, hp.code, se.host, se.port, se.unit_id\n"
        "FROM ems.asset_heat_pump hp\n"
        "JOIN ems.site_endpoint se ON se.id = hp.endpoint_id\n"
        "WHERE hp.site_id = $1\n"
        "AND se.enabled = true\n"
        "AND se.endpoint_type = 'modbus_tcp'\n"
    )
    insert_sql = (
        "\n"
        "INSERT INTO ems.telemetry_heat_pump (\n"
        "site_id, heat_pump_id, measured_at,\n"
        "power_w, outdoor_temp_c, water_outlet_temp_c, tuv_tank_temp_c,\n"
        "operating_mode\n"
        ")\n"
        "VALUES ($1, $2, $3, 0, 10.0, 45.0, 55.0, 'standby')\n"
        "ON CONFLICT (heat_pump_id, measured_at) DO NOTHING\n"
    )

    pumps = await db.fetch(select_sql, site_id)
    # One shared timestamp for every heat pump handled in this call.
    timestamp = datetime.now(timezone.utc)
    for pump in pumps:
        logger.info("TODO: heat pump Modbus registry pending (heat_pump=%s)", pump["code"])
        await db.execute(insert_sql, site_id, pump["id"], timestamp)
async def run_telemetry_loop(conn: asyncpg.Connection) -> float:
    """Run one polling pass over all active sites and report how long it took.

    Polling is strictly sequential because a single asyncpg connection must
    not serve queries in parallel. For each site the inverter, EV charger
    and heat pump pollers run in that order; an exception from any of them
    is logged and ends the work for that site only.

    Args:
        conn: Connection shared by every query of this pass.

    Returns:
        Elapsed time of the pass in seconds (the caller uses it to size its
        sleep).
    """
    clock = asyncio.get_running_loop()
    started_at = clock.time()
    pollers = (poll_inverter, poll_ev_chargers, poll_heat_pump)

    for record in await conn.fetch("SELECT id FROM ems.site WHERE active = true"):
        sid = record["id"]
        try:
            for poll in pollers:
                await poll(sid, conn)
        except Exception as exc:
            logger.error("Telemetry loop error site %s: %s", sid, exc)

    return clock.time() - started_at
async def run_telemetry_loop_wrapper(
    pool: asyncpg.Pool,
    *,
    period_s: float = 60.0,
    slow_warning_s: float = 50.0,
    retry_delay_s: float = 5.0,
) -> None:
    """Background task that runs a telemetry pass once per period, forever.

    Each iteration acquires a connection from the pool only for the duration
    of the pass, so the pool is not blocked while this task sleeps.

    Args:
        pool: Pool from which one connection per pass is acquired.
        period_s: Target time between the starts of two passes; the sleep
            after a pass is ``period_s`` minus the pass duration, never
            negative.
        slow_warning_s: A pass taking longer than this is logged as a warning.
        retry_delay_s: Sleep before retrying after a pass failed with an
            exception.

    The function only returns by being cancelled; ``CancelledError`` is
    re-raised untouched.
    """
    while True:
        try:
            async with pool.acquire() as conn:
                elapsed = await run_telemetry_loop(conn)
        except asyncio.CancelledError:
            # Never swallow cancellation, whatever the interpreter version.
            raise
        except Exception as e:
            logger.exception("Telemetry wrapper DB error: %s", e)
            await asyncio.sleep(retry_delay_s)
            continue
        if elapsed > slow_warning_s:
            # %g prints the default threshold 50.0 as "50".
            logger.warning(
                "Telemetry loop took %.1fs (>%gs)", elapsed, slow_warning_s
            )
        await asyncio.sleep(max(0.0, period_s - elapsed))