"""Sběr telemetrie z Modbus (Deye) a placeholder záznamy pro EV / TČ.""" from __future__ import annotations import asyncio import logging from datetime import datetime, timezone import asyncpg from app.ws_manager import manager from services.modbus_client import get_modbus_client logger = logging.getLogger(__name__) # Deye SUN – holding registry (decimal adresa = přímo pro read_holding_registers) DEYE_REG_RUN_STATE = 500 DEYE_REG_BATT_CHARGE_TODAY = 514 DEYE_REG_BATT_DISCHARGE_TODAY = 515 DEYE_REG_BATTERY_SOC = 588 DEYE_REG_BATTERY_POWER_FLOW = 590 DEYE_REG_GRID_TOTAL_POWER = 625 DEYE_REG_GEN_PORT_POWER = 667 DEYE_REG_LOAD_TOTAL_POWER = 653 DEYE_REG_GRID_IMPORT_TOTAL_LO = 522 DEYE_REG_GRID_IMPORT_TOTAL_HI = 523 DEYE_REG_GRID_EXPORT_TOTAL_LO = 524 DEYE_REG_GRID_EXPORT_TOTAL_HI = 525 DEYE_REG_PV1_POWER = 672 DEYE_REG_PV2_POWER = 673 # Solar sell (0 = přebytek řiditelné FVE nesmí do sítě) a GEN/MI cut-off (reg178 bits0–1 == 3 → cut-off ON). # Pozn.: v některých manuálech/UI se uvádí "register 179" (1-based), ale Modbus adresa je 178 (0-based). # Viz modbus-registers.md. DEYE_REG_SOLAR_SELL = 145 DEYE_REG_CONTROL_BOARD_SPECIAL1 = 178 def aggregate_pv_production_w(pv1_w: int, pv2_w: int, gen_port_w: int) -> int: """ Okamžitá „výroba FVE“ pro dashboard / audit součtu: Deye registry 672/673/667 jsou int16 W; záporné hodnoty (např. večer při exportu) nejsou DC výroba. """ return max(0, int(pv1_w)) + max(0, int(pv2_w)) + max(0, int(gen_port_w)) def _export_limit_flags_from_deye_regs(reg145: int | None, reg179: int | None) -> tuple[bool | None, int | None]: """Odvoď is_export_limited / pv_derating_flags z přečtených holding registrů (NULL = neznámé).""" if reg145 is None and reg179 is None: return None, None flags = 0 if reg145 is not None and int(reg145) == 0: flags |= 1 if reg179 is not None and (int(reg179) & 3) == 3: flags |= 2 return (flags != 0), flags async def poll_inverter(site_id: int, db: asyncpg.Connection) -> None: rows = await db.fetch( """ select inverter_id as id, code, host, port, unit_id from ems.vw_asset_inverter_modbus_poll where site_id = $1 """, site_id, ) measured_at = datetime.now(timezone.utc) for row in rows: inv_id = row["id"] code = row["code"] host = row["host"] port = int(row["port"] or 502) unit_id = int(row["unit_id"] if row["unit_id"] is not None else 1) try: client = await get_modbus_client(host, port) async with client.batch(unit_id) as mb: run_state = await mb.read_register(DEYE_REG_RUN_STATE) battery_soc = await mb.read_register(DEYE_REG_BATTERY_SOC) battery_power = await mb.read_register_signed(DEYE_REG_BATTERY_POWER_FLOW) batt_charge_today = await mb.read_register(DEYE_REG_BATT_CHARGE_TODAY) batt_discharge_today = await mb.read_register(DEYE_REG_BATT_DISCHARGE_TODAY) grid_power = await mb.read_register_signed(DEYE_REG_GRID_TOTAL_POWER) load_power = await mb.read_register_signed(DEYE_REG_LOAD_TOTAL_POWER) pv1_power = await mb.read_register_signed(DEYE_REG_PV1_POWER) pv2_power = await mb.read_register_signed(DEYE_REG_PV2_POWER) gen_port_power = await mb.read_register_signed(DEYE_REG_GEN_PORT_POWER) grid_energy_regs = await mb.read_holding_registers( DEYE_REG_GRID_IMPORT_TOTAL_LO, 4 ) reg145 = await mb.read_register(DEYE_REG_SOLAR_SELL) reg179 = await mb.read_register(DEYE_REG_CONTROL_BOARD_SPECIAL1) pv_power_w = aggregate_pv_production_w(pv1_power, pv2_power, gen_port_power) grid_import_total_wh = (grid_energy_regs[1] << 16 | grid_energy_regs[0]) * 100 grid_export_total_wh = (grid_energy_regs[3] << 16 | grid_energy_regs[2]) * 100 is_export_limited, pv_derating_flags = _export_limit_flags_from_deye_regs(reg145, reg179) logger.debug("inverter:%s Deye run_state raw=%s", code, run_state) await db.execute( "select ems.fn_telemetry_inverter_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::int, $6::int, $7::int, $8::float8, $9::int, $10::int, $11::int, $12::int, $13::int, $14::bigint, $15::bigint, $16::int, $17::boolean, $18::int)", site_id, inv_id, measured_at, pv_power_w, pv1_power, pv2_power, gen_port_power, float(battery_soc), battery_power, batt_charge_today, batt_discharge_today, grid_power, load_power, grid_import_total_wh, grid_export_total_wh, run_state, is_export_limited, pv_derating_flags, ) inv_temp: float | None = None await manager.broadcast_telemetry( { "type": "telemetry", "site_id": site_id, "ts": datetime.now(timezone.utc).isoformat(), "pv_power_w": pv_power_w, "battery_soc_pct": float(battery_soc), "battery_power_w": battery_power, "grid_power_w": grid_power, "load_power_w": load_power, "gen_port_power_w": gen_port_power, "inverter_temp_c": inv_temp, "is_export_limited": is_export_limited, "pv_derating_flags": pv_derating_flags, } ) except Exception as e: logger.error("poll_inverter site=%s inverter=%s: %s", site_id, code, e) async def poll_ev_chargers(site_id: int, db: asyncpg.Connection) -> None: rows = await db.fetch( """ select charger_id as id, code, host, port, unit_id from ems.vw_asset_ev_charger_modbus_poll where site_id = $1 """, site_id, ) measured_at = datetime.now(timezone.utc) connector_id = 1 for row in rows: code = row["code"] charger_id = row["id"] logger.info("TODO: EV charger Modbus registry pending | %s", code) current_status = "available" previous_status = await db.fetchval( """ select status from ems.telemetry_ev_charger where charger_id = $1 and connector_id = $2 order by measured_at desc limit 1 """, charger_id, connector_id, ) await db.execute( "select ems.fn_telemetry_ev_charger_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::text, $6::int, $7::float8)", site_id, charger_id, measured_at, connector_id, current_status, 0, 0.0, ) if previous_status is not None: await db.fetchval( "select ems.fn_ev_session_transition($1::int, $2::int, $3::text, $4::text, $5::timestamptz)", site_id, charger_id, str(previous_status), current_status, measured_at, ) if previous_status == "available" and current_status != "available": logger.info("EV arrival detected on charger %s", code) elif previous_status != "available" and current_status == "available": logger.info("EV departure detected on charger %s", code) async def poll_heat_pump(site_id: int, db: asyncpg.Connection) -> None: rows = await db.fetch( """ select heat_pump_id as id, code, host, port, unit_id from ems.vw_asset_heat_pump_modbus_poll where site_id = $1 """, site_id, ) measured_at = datetime.now(timezone.utc) for row in rows: code = row["code"] logger.info("TODO: heat pump Modbus registry pending (heat_pump=%s)", code) await db.execute( "select ems.fn_telemetry_heat_pump_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::float8, $6::float8, $7::float8, $8::text)", site_id, row["id"], measured_at, 0, 10.0, 45.0, 55.0, "standby", ) async def run_telemetry_loop(conn: asyncpg.Connection) -> float: """Jeden průchod smyčky; vrátí uplynulý čas v sekundách (pro sleep). Poll probíhá sekvenčně — jedno asyncpg spojení nesmí obsluhovat paralelní dotazy. """ loop = asyncio.get_running_loop() start = loop.time() sites = await conn.fetch( "select id from ems.vw_site_directory where active = true" ) for site in sites: sid = site["id"] try: await poll_inverter(sid, conn) await poll_ev_chargers(sid, conn) await poll_heat_pump(sid, conn) except Exception as e: logger.error("Telemetry loop error site %s: %s", sid, e) return loop.time() - start async def run_telemetry_loop_wrapper(pool: asyncpg.Pool) -> None: """Background task: každá iterace získá spojení z poolu; neblokuje pool během sleep.""" while True: try: async with pool.acquire() as conn: elapsed = await run_telemetry_loop(conn) except asyncio.CancelledError: raise except Exception as e: logger.exception("Telemetry wrapper DB error: %s", e) elapsed = 0.0 await asyncio.sleep(5) continue if elapsed > 50: logger.warning("Telemetry loop took %.1fs (>50s)", elapsed) await asyncio.sleep(max(0.0, 60.0 - elapsed))