Files
ems/backend/services/telemetry_collector.py
Dusan Vojacek 342483b885
Some checks failed
CI and deploy / migration-check (push) Failing after 14s
CI and deploy / deploy (push) Has been skipped
invert logic cutoff register
2026-04-29 13:24:28 +02:00

258 lines
9.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Sběr telemetrie z Modbus (Deye) a placeholder záznamy pro EV / TČ."""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime, timezone
import asyncpg
from app.ws_manager import manager
from services.modbus_client import get_modbus_client
logger = logging.getLogger(__name__)
# Deye SUN holding registry (decimal adresa = přímo pro read_holding_registers)
DEYE_REG_RUN_STATE = 500
DEYE_REG_BATT_CHARGE_TODAY = 514
DEYE_REG_BATT_DISCHARGE_TODAY = 515
DEYE_REG_BATTERY_SOC = 588
DEYE_REG_BATTERY_POWER_FLOW = 590
DEYE_REG_GRID_TOTAL_POWER = 625
DEYE_REG_GEN_PORT_POWER = 667
DEYE_REG_LOAD_TOTAL_POWER = 653
DEYE_REG_GRID_IMPORT_TOTAL_LO = 522
DEYE_REG_GRID_IMPORT_TOTAL_HI = 523
DEYE_REG_GRID_EXPORT_TOTAL_LO = 524
DEYE_REG_GRID_EXPORT_TOTAL_HI = 525
DEYE_REG_PV1_POWER = 672
DEYE_REG_PV2_POWER = 673
# Solar sell (0 = přebytek řiditelné FVE nesmí do sítě) a GEN/MI cut-off (bits01 == 3 → cut-off ON); viz modbus-registers.md
DEYE_REG_SOLAR_SELL = 145
DEYE_REG_CONTROL_BOARD_SPECIAL1 = 179
def aggregate_pv_production_w(pv1_w: int, pv2_w: int, gen_port_w: int) -> int:
"""
Okamžitá „výroba FVE“ pro dashboard / audit součtu: Deye registry 672/673/667
jsou int16 W; záporné hodnoty (např. večer při exportu) nejsou DC výroba.
"""
return max(0, int(pv1_w)) + max(0, int(pv2_w)) + max(0, int(gen_port_w))
def _export_limit_flags_from_deye_regs(reg145: int | None, reg179: int | None) -> tuple[bool | None, int | None]:
"""Odvoď is_export_limited / pv_derating_flags z přečtených holding registrů (NULL = neznámé)."""
if reg145 is None and reg179 is None:
return None, None
flags = 0
if reg145 is not None and int(reg145) == 0:
flags |= 1
if reg179 is not None and (int(reg179) & 3) == 3:
flags |= 2
return (flags != 0), flags
async def poll_inverter(site_id: int, db: asyncpg.Connection) -> None:
rows = await db.fetch(
"""
select inverter_id as id, code, host, port, unit_id
from ems.vw_asset_inverter_modbus_poll
where site_id = $1
""",
site_id,
)
measured_at = datetime.now(timezone.utc)
for row in rows:
inv_id = row["id"]
code = row["code"]
host = row["host"]
port = int(row["port"] or 502)
unit_id = int(row["unit_id"] if row["unit_id"] is not None else 1)
try:
client = await get_modbus_client(host, port)
async with client.batch(unit_id) as mb:
run_state = await mb.read_register(DEYE_REG_RUN_STATE)
battery_soc = await mb.read_register(DEYE_REG_BATTERY_SOC)
battery_power = await mb.read_register_signed(DEYE_REG_BATTERY_POWER_FLOW)
batt_charge_today = await mb.read_register(DEYE_REG_BATT_CHARGE_TODAY)
batt_discharge_today = await mb.read_register(DEYE_REG_BATT_DISCHARGE_TODAY)
grid_power = await mb.read_register_signed(DEYE_REG_GRID_TOTAL_POWER)
load_power = await mb.read_register_signed(DEYE_REG_LOAD_TOTAL_POWER)
pv1_power = await mb.read_register_signed(DEYE_REG_PV1_POWER)
pv2_power = await mb.read_register_signed(DEYE_REG_PV2_POWER)
gen_port_power = await mb.read_register_signed(DEYE_REG_GEN_PORT_POWER)
grid_energy_regs = await mb.read_holding_registers(
DEYE_REG_GRID_IMPORT_TOTAL_LO, 4
)
reg145 = await mb.read_register(DEYE_REG_SOLAR_SELL)
reg179 = await mb.read_register(DEYE_REG_CONTROL_BOARD_SPECIAL1)
pv_power_w = aggregate_pv_production_w(pv1_power, pv2_power, gen_port_power)
grid_import_total_wh = (grid_energy_regs[1] << 16 | grid_energy_regs[0]) * 100
grid_export_total_wh = (grid_energy_regs[3] << 16 | grid_energy_regs[2]) * 100
is_export_limited, pv_derating_flags = _export_limit_flags_from_deye_regs(reg145, reg179)
logger.debug("inverter:%s Deye run_state raw=%s", code, run_state)
await db.execute(
"select ems.fn_telemetry_inverter_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::int, $6::int, $7::int, $8::float8, $9::int, $10::int, $11::int, $12::int, $13::int, $14::bigint, $15::bigint, $16::int, $17::boolean, $18::int)",
site_id,
inv_id,
measured_at,
pv_power_w,
pv1_power,
pv2_power,
gen_port_power,
float(battery_soc),
battery_power,
batt_charge_today,
batt_discharge_today,
grid_power,
load_power,
grid_import_total_wh,
grid_export_total_wh,
run_state,
is_export_limited,
pv_derating_flags,
)
inv_temp: float | None = None
await manager.broadcast_telemetry(
{
"type": "telemetry",
"site_id": site_id,
"ts": datetime.now(timezone.utc).isoformat(),
"pv_power_w": pv_power_w,
"battery_soc_pct": float(battery_soc),
"battery_power_w": battery_power,
"grid_power_w": grid_power,
"load_power_w": load_power,
"gen_port_power_w": gen_port_power,
"inverter_temp_c": inv_temp,
"is_export_limited": is_export_limited,
"pv_derating_flags": pv_derating_flags,
}
)
except Exception as e:
logger.error("poll_inverter site=%s inverter=%s: %s", site_id, code, e)
async def poll_ev_chargers(site_id: int, db: asyncpg.Connection) -> None:
rows = await db.fetch(
"""
select charger_id as id, code, host, port, unit_id
from ems.vw_asset_ev_charger_modbus_poll
where site_id = $1
""",
site_id,
)
measured_at = datetime.now(timezone.utc)
connector_id = 1
for row in rows:
code = row["code"]
charger_id = row["id"]
logger.info("TODO: EV charger Modbus registry pending | %s", code)
current_status = "available"
previous_status = await db.fetchval(
"""
select status
from ems.telemetry_ev_charger
where charger_id = $1 and connector_id = $2
order by measured_at desc
limit 1
""",
charger_id,
connector_id,
)
await db.execute(
"select ems.fn_telemetry_ev_charger_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::text, $6::int, $7::float8)",
site_id,
charger_id,
measured_at,
connector_id,
current_status,
0,
0.0,
)
if previous_status is not None:
await db.fetchval(
"select ems.fn_ev_session_transition($1::int, $2::int, $3::text, $4::text, $5::timestamptz)",
site_id,
charger_id,
str(previous_status),
current_status,
measured_at,
)
if previous_status == "available" and current_status != "available":
logger.info("EV arrival detected on charger %s", code)
elif previous_status != "available" and current_status == "available":
logger.info("EV departure detected on charger %s", code)
async def poll_heat_pump(site_id: int, db: asyncpg.Connection) -> None:
rows = await db.fetch(
"""
select heat_pump_id as id, code, host, port, unit_id
from ems.vw_asset_heat_pump_modbus_poll
where site_id = $1
""",
site_id,
)
measured_at = datetime.now(timezone.utc)
for row in rows:
code = row["code"]
logger.info("TODO: heat pump Modbus registry pending (heat_pump=%s)", code)
await db.execute(
"select ems.fn_telemetry_heat_pump_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::float8, $6::float8, $7::float8, $8::text)",
site_id,
row["id"],
measured_at,
0,
10.0,
45.0,
55.0,
"standby",
)
async def run_telemetry_loop(conn: asyncpg.Connection) -> float:
"""Jeden průchod smyčky; vrátí uplynulý čas v sekundách (pro sleep).
Poll probíhá sekvenčně — jedno asyncpg spojení nesmí obsluhovat paralelní dotazy.
"""
loop = asyncio.get_running_loop()
start = loop.time()
sites = await conn.fetch(
"select id from ems.vw_site_directory where active = true"
)
for site in sites:
sid = site["id"]
try:
await poll_inverter(sid, conn)
await poll_ev_chargers(sid, conn)
await poll_heat_pump(sid, conn)
except Exception as e:
logger.error("Telemetry loop error site %s: %s", sid, e)
return loop.time() - start
async def run_telemetry_loop_wrapper(pool: asyncpg.Pool) -> None:
"""Background task: každá iterace získá spojení z poolu; neblokuje pool během sleep."""
while True:
try:
async with pool.acquire() as conn:
elapsed = await run_telemetry_loop(conn)
except asyncio.CancelledError:
raise
except Exception as e:
logger.exception("Telemetry wrapper DB error: %s", e)
elapsed = 0.0
await asyncio.sleep(5)
continue
if elapsed > 50:
logger.warning("Telemetry loop took %.1fs (>50s)", elapsed)
await asyncio.sleep(max(0.0, 60.0 - elapsed))