"""Modbus telemetry collection.

Deye (inverter) and Teltonika TeltoCharge (EV charger); the heat pump is a placeholder for now.
"""

from __future__ import annotations

import asyncio
import logging
from datetime import datetime, timezone

import asyncpg

from app.ws_manager import manager
from services.modbus_client import get_modbus_client

logger = logging.getLogger(__name__)

# Deye SUN – holding registers (decimal address, passed directly to read_holding_registers)
DEYE_REG_RUN_STATE = 500
DEYE_REG_BATT_CHARGE_TODAY = 514
DEYE_REG_BATT_DISCHARGE_TODAY = 515
DEYE_REG_BATTERY_SOC = 588
DEYE_REG_BATTERY_POWER_FLOW = 590
DEYE_REG_GRID_TOTAL_POWER = 625
DEYE_REG_GEN_PORT_POWER = 667
DEYE_REG_LOAD_TOTAL_POWER = 653
DEYE_REG_GRID_IMPORT_TOTAL_LO = 522
DEYE_REG_GRID_IMPORT_TOTAL_HI = 523
DEYE_REG_GRID_EXPORT_TOTAL_LO = 524
DEYE_REG_GRID_EXPORT_TOTAL_HI = 525
DEYE_REG_PV1_POWER = 672
DEYE_REG_PV2_POWER = 673
# Solar sell (0 = surplus from the controllable PV must not be exported to the grid)
# and GEN/MI cut-off (reg 178 bits 0–1 == 3 → cut-off ON).
# Note: some manuals/UIs refer to "register 179" (1-based), but the Modbus address is 178 (0-based).
# See modbus-registers.md.
DEYE_REG_SOLAR_SELL = 145
DEYE_REG_CONTROL_BOARD_SPECIAL1 = 178

# Teltonika TeltoCharge – holding registers (official "Modbus RTU Communication
# protocol" rev 0.5, 2024-07-23; via Waveshare RS485→TCP, FC 3 reads).
# Block 0–40: 0–2 voltage L1–L3 (V), 3–5 current L1–L3 (A×10), 6 EVSE status (DLM),
# 27 charge point state, 33 IEC 61851, 34/35 warning/error bits,
# 38 instantaneous power (W), 39 session energy (kWh×100), 40 session duration (s).
# Writable (control, not used yet): 15 Amps to use (0 = stop, 6–32), 16 start/stop.
TELTO_REG_BLOCK_START = 0
TELTO_REG_BLOCK_COUNT = 41

#: EVSE status (reg 6) → internal state. Session detection relies on 'available' vs. not
#: 'available' (fn_ev_session_transition), so every state with an EV connected must map
#: to something other than 'available'.
TELTO_STATUS_MAP = {
    0: "charging",  # C – charging
    1: "preparing",  # B1 – EV connected, waiting for EV
    2: "preparing",  # B2 – was charging earlier, insufficient power
    3: "preparing",  # B3 – has not charged, insufficient power
    4: "suspended_ev",  # D1 – stopped by the vehicle
    5: "suspended_evse",  # D2 – not authorized
    6: "suspended_evse",  # D3 – charging not allowed
    7: "available",  # A – no EV
    8: "faulted",  # F – fault
    9: "unknown",  # E
}


def parse_teltocharge_frame(regs: list[int]) -> dict[str, object]:
    """Pure parser for the TeltoCharge register block 0–40 (testable without Modbus)."""
    if len(regs) < TELTO_REG_BLOCK_COUNT:
        raise ValueError(f"TeltoCharge frame too short: {len(regs)}")
    status = TELTO_STATUS_MAP.get(int(regs[6]), "unknown")
    # Registers 3–5 hold the phase currents in tenths of an ampere; report the highest phase.
    current_a = max(int(regs[3]), int(regs[4]), int(regs[5])) / 10.0
    return {
        "status": status,
        "power_w": int(regs[38]),
        "session_energy_kwh": int(regs[39]) / 100.0,
        "current_a": current_a,
        "voltage_v": int(regs[0]),  # L1 only
        "warning_bits": int(regs[34]),
        "error_bits": int(regs[35]),
        "evse_status_raw": int(regs[6]),
        "charge_point_state_raw": int(regs[27]),
    }


def aggregate_pv_production_w(pv1_w: int, pv2_w: int, gen_port_w: int) -> int:
    """
    Instantaneous "PV production" for the dashboard / sum audit: Deye registers 672/673/667
    are int16 W; negative values (e.g. in the evening during export) are not DC production.
    """
    return max(0, int(pv1_w)) + max(0, int(pv2_w)) + max(0, int(gen_port_w))


def _export_limit_flags_from_deye_regs(reg145: int | None, reg179: int | None) -> tuple[bool | None, int | None]:
    """Derive is_export_limited / pv_derating_flags from the holding registers read (NULL = unknown).

    `reg179` is the value read from Modbus address 178 (DEYE_REG_CONTROL_BOARD_SPECIAL1).
    Flag bit 0 (value 1): solar sell is 0. Flag bit 1 (value 2): GEN/MI cut-off is ON.
    """
    if reg145 is None and reg179 is None:
        return None, None
    flags = 0
    if reg145 is not None and int(reg145) == 0:
        flags |= 1
    if reg179 is not None and (int(reg179) & 3) == 3:
        flags |= 2
    return (flags != 0), flags


async def poll_inverter(site_id: int, db: asyncpg.Connection) -> None:
    rows = await db.fetch(
        """
        select inverter_id as id, code, host, port, unit_id
        from ems.vw_asset_inverter_modbus_poll
        where site_id = $1
        """,
        site_id,
    )
    measured_at = datetime.now(timezone.utc)
    for row in rows:
        inv_id = row["id"]
        code = row["code"]
        host = row["host"]
        port = int(row["port"] or 502)
        unit_id = int(row["unit_id"] if row["unit_id"] is not None else 1)
        try:
            client = await get_modbus_client(host, port)
            async with client.batch(unit_id) as mb:
                run_state = await mb.read_register(DEYE_REG_RUN_STATE)
                battery_soc = await mb.read_register(DEYE_REG_BATTERY_SOC)
                battery_power = await mb.read_register_signed(DEYE_REG_BATTERY_POWER_FLOW)
                batt_charge_today = await mb.read_register(DEYE_REG_BATT_CHARGE_TODAY)
                batt_discharge_today = await mb.read_register(DEYE_REG_BATT_DISCHARGE_TODAY)
                grid_power = await mb.read_register_signed(DEYE_REG_GRID_TOTAL_POWER)
                load_power = await mb.read_register_signed(DEYE_REG_LOAD_TOTAL_POWER)
                pv1_power = await mb.read_register_signed(DEYE_REG_PV1_POWER)
                pv2_power = await mb.read_register_signed(DEYE_REG_PV2_POWER)
                gen_port_power = await mb.read_register_signed(DEYE_REG_GEN_PORT_POWER)
                # One read covers registers 522–525: import LO/HI, then export LO/HI.
                grid_energy_regs = await mb.read_holding_registers(
                    DEYE_REG_GRID_IMPORT_TOTAL_LO, 4
                )
                reg145 = await mb.read_register(DEYE_REG_SOLAR_SELL)
                reg179 = await mb.read_register(DEYE_REG_CONTROL_BOARD_SPECIAL1)

            pv_power_w = aggregate_pv_production_w(pv1_power, pv2_power, gen_port_power)
            # 32-bit counters: (HI << 16) | LO, scaled by 100 to Wh.
            grid_import_total_wh = ((grid_energy_regs[1] << 16) | grid_energy_regs[0]) * 100
            grid_export_total_wh = ((grid_energy_regs[3] << 16) | grid_energy_regs[2]) * 100
            is_export_limited, pv_derating_flags = _export_limit_flags_from_deye_regs(reg145, reg179)

            logger.debug("inverter:%s Deye run_state raw=%s", code, run_state)

            await db.execute(
                "select ems.fn_telemetry_inverter_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::int, $6::int, $7::int, $8::float8, $9::int, $10::int, $11::int, $12::int, $13::int, $14::bigint, $15::bigint, $16::int, $17::boolean, $18::int)",
                site_id,
                inv_id,
                measured_at,
                pv_power_w,
                pv1_power,
                pv2_power,
                gen_port_power,
                float(battery_soc),
                battery_power,
                batt_charge_today,
                batt_discharge_today,
                grid_power,
                load_power,
                grid_import_total_wh,
                grid_export_total_wh,
                run_state,
                is_export_limited,
                pv_derating_flags,
            )

            # The inverter temperature is not read yet; broadcast it as null.
            inv_temp: float | None = None
            await manager.broadcast_telemetry(
                {
                    "type": "telemetry",
                    "site_id": site_id,
                    "ts": datetime.now(timezone.utc).isoformat(),
                    "pv_power_w": pv_power_w,
                    "battery_soc_pct": float(battery_soc),
                    "battery_power_w": battery_power,
                    "grid_power_w": grid_power,
                    "load_power_w": load_power,
                    "gen_port_power_w": gen_port_power,
                    "inverter_temp_c": inv_temp,
                    "is_export_limited": is_export_limited,
                    "pv_derating_flags": pv_derating_flags,
                }
            )
        except Exception as e:
            logger.error("poll_inverter site=%s inverter=%s: %s", site_id, code, e)


async def poll_ev_chargers(site_id: int, db: asyncpg.Connection) -> None:
    rows = await db.fetch(
        """
        select charger_id as id, code, host, port, unit_id
        from ems.vw_asset_ev_charger_modbus_poll
        where site_id = $1
        """,
        site_id,
    )
    measured_at = datetime.now(timezone.utc)
    connector_id = 1
    for row in rows:
        code = row["code"]
        charger_id = row["id"]
        host = row["host"]
        port = int(row["port"] or 502)
        unit_id = int(row["unit_id"] if row["unit_id"] is not None else 1)
        try:
            client = await get_modbus_client(host, port)
            async with client.batch(unit_id) as mb:
                regs = await mb.read_holding_registers(
                    TELTO_REG_BLOCK_START, TELTO_REG_BLOCK_COUNT
                )
            frame = parse_teltocharge_frame(regs)
        except Exception as e:
            # On a read failure write NOTHING: a fabricated 'available' would
            # falsely end the EV session and pollute the baseline (power 0).
            logger.warning("EV charger %s (%s:%s) read failed: %s", code, host, port, e)
            continue

        current_status = str(frame["status"])
        if frame["error_bits"]:
            logger.warning(
                "EV charger %s error bits=0x%04x warning=0x%04x",
                code,
                frame["error_bits"],
                frame["warning_bits"],
            )

        # Read the previous status before inserting the new sample.
        previous_status = await db.fetchval(
            """
            select status
            from ems.telemetry_ev_charger
            where charger_id = $1 and connector_id = $2
            order by measured_at desc
            limit 1
            """,
            charger_id,
            connector_id,
        )

        await db.execute(
            "select ems.fn_telemetry_ev_charger_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::text, $6::int, $7::float8, $8::float8)",
            site_id,
            charger_id,
            measured_at,
            connector_id,
            current_status,
            int(frame["power_w"]),
            float(frame["session_energy_kwh"]),
            float(frame["current_a"]),
        )

        if previous_status is not None:
            await db.fetchval(
                "select ems.fn_ev_session_transition($1::int, $2::int, $3::text, $4::text, $5::timestamptz)",
                site_id,
                charger_id,
                str(previous_status),
                current_status,
                measured_at,
            )
            if previous_status == "available" and current_status != "available":
                logger.info("EV arrival detected on charger %s", code)
            elif previous_status != "available" and current_status == "available":
                logger.info("EV departure detected on charger %s", code)


async def poll_heat_pump(site_id: int, db: asyncpg.Connection) -> None:
    rows = await db.fetch(
        """
        select heat_pump_id as id, code, host, port, unit_id
        from ems.vw_asset_heat_pump_modbus_poll
        where site_id = $1
        """,
        site_id,
    )
    measured_at = datetime.now(timezone.utc)
    for row in rows:
        code = row["code"]
        logger.info("TODO: heat pump Modbus registers pending (heat_pump=%s)", code)
        # Placeholder sample: nothing is read from the heat pump yet, so fixed values are stored.
        await db.execute(
            "select ems.fn_telemetry_heat_pump_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::float8, $6::float8, $7::float8, $8::text)",
            site_id,
            row["id"],
            measured_at,
            0,
            10.0,
            45.0,
            55.0,
            "standby",
        )


async def run_telemetry_loop(conn: asyncpg.Connection) -> float:
    """One pass of the loop; returns the elapsed time in seconds (used to compute the sleep).

    Polling runs sequentially: a single asyncpg connection must not serve concurrent queries.
    """
    loop = asyncio.get_running_loop()
    start = loop.time()

    sites = await conn.fetch(
        "select id from ems.vw_site_directory where active = true"
    )
    for site in sites:
        sid = site["id"]
        try:
            await poll_inverter(sid, conn)
            await poll_ev_chargers(sid, conn)
            await poll_heat_pump(sid, conn)
        except Exception as e:
            logger.error("Telemetry loop error site %s: %s", sid, e)

    return loop.time() - start


async def run_telemetry_loop_wrapper(pool: asyncpg.Pool) -> None:
    """Background task: each iteration acquires a connection from the pool and releases it
    before sleeping, so the pool is not blocked during the sleep."""
    while True:
        try:
            async with pool.acquire() as conn:
                elapsed = await run_telemetry_loop(conn)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.exception("Telemetry wrapper DB error: %s", e)
            elapsed = 0.0
            # Back off briefly after a DB error, then retry without the regular 60 s wait.
            await asyncio.sleep(5)
            continue
        if elapsed > 50:
            logger.warning("Telemetry loop took %.1fs (>50s)", elapsed)
        # Aim for one pass per 60 s, measured from the start of the pass.
        await asyncio.sleep(max(0.0, 60.0 - elapsed))
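

# --- Illustrative sketch, not part of the original module ---------------------
# poll_inverter() builds the 32-bit grid import/export counters from two 16-bit
# holding registers (LO word at the lower address, HI word next) and multiplies
# the result by 100 to get Wh. The helper below isolates that arithmetic so it
# could be unit-tested without a Modbus connection, in the same spirit as
# parse_teltocharge_frame(). The name `deye_u32_counter_wh` and the
# `wh_per_count` parameter are hypothetical; the factor 100 is taken from the
# code in this module, not from a verified Deye specification.
def deye_u32_counter_wh(lo: int, hi: int, wh_per_count: int = 100) -> int:
    """Combine LO/HI 16-bit words into an unsigned 32-bit count and scale it to Wh."""
    if not (0 <= lo <= 0xFFFF and 0 <= hi <= 0xFFFF):
        raise ValueError(f"register words out of range: lo={lo}, hi={hi}")
    # Shift the HI word into the upper 16 bits, then OR in the LO word.
    return ((hi << 16) | lo) * wh_per_count


# Example: LO=0x0001, HI=0x0002 gives 0x00020001 = 131073 counts, i.e. 13_107_300 Wh.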