"""Sběr telemetrie z Modbus (Deye) a placeholder záznamy pro EV / TČ.""" from __future__ import annotations import asyncio import logging from datetime import datetime, timezone import asyncpg from app.ws_manager import manager from services.modbus_client import get_modbus_client logger = logging.getLogger(__name__) # Deye SUN – holding registry (decimal adresa = přímo pro read_holding_registers) DEYE_REG_RUN_STATE = 500 DEYE_REG_BATT_CHARGE_TODAY = 514 DEYE_REG_BATT_DISCHARGE_TODAY = 515 DEYE_REG_BATTERY_SOC = 588 DEYE_REG_BATTERY_POWER_FLOW = 590 DEYE_REG_GRID_TOTAL_POWER = 625 DEYE_REG_GEN_PORT_POWER = 667 DEYE_REG_LOAD_TOTAL_POWER = 653 DEYE_REG_PV1_POWER = 672 DEYE_REG_PV2_POWER = 673 def aggregate_pv_production_w(pv1_w: int, pv2_w: int, gen_port_w: int) -> int: """ Okamžitá „výroba FVE“ pro dashboard / audit součtu: Deye registry 672/673/667 jsou int16 W; záporné hodnoty (např. večer při exportu) nejsou DC výroba. """ return max(0, int(pv1_w)) + max(0, int(pv2_w)) + max(0, int(gen_port_w)) async def poll_inverter(site_id: int, db: asyncpg.Connection) -> None: rows = await db.fetch( """ SELECT ai.id, ai.code, se.host, se.port, se.unit_id FROM ems.asset_inverter ai JOIN ems.site_endpoint se ON se.id = ai.endpoint_id WHERE ai.site_id = $1 AND ai.active = true AND se.enabled = true AND se.endpoint_type = 'modbus_tcp' """, site_id, ) measured_at = datetime.now(timezone.utc) for row in rows: inv_id = row["id"] code = row["code"] host = row["host"] port = int(row["port"] or 502) unit_id = int(row["unit_id"] if row["unit_id"] is not None else 1) try: client = await get_modbus_client(host, port) async with client.batch(unit_id) as mb: run_state = await mb.read_register(DEYE_REG_RUN_STATE) battery_soc = await mb.read_register(DEYE_REG_BATTERY_SOC) battery_power = await mb.read_register_signed(DEYE_REG_BATTERY_POWER_FLOW) batt_charge_today = await mb.read_register(DEYE_REG_BATT_CHARGE_TODAY) batt_discharge_today = await mb.read_register(DEYE_REG_BATT_DISCHARGE_TODAY) grid_power = await mb.read_register_signed(DEYE_REG_GRID_TOTAL_POWER) load_power = await mb.read_register(DEYE_REG_LOAD_TOTAL_POWER) pv1_power = await mb.read_register_signed(DEYE_REG_PV1_POWER) pv2_power = await mb.read_register_signed(DEYE_REG_PV2_POWER) gen_port_power = await mb.read_register_signed(DEYE_REG_GEN_PORT_POWER) pv_power_w = aggregate_pv_production_w(pv1_power, pv2_power, gen_port_power) logger.debug("inverter:%s Deye run_state raw=%s", code, run_state) await db.execute( """ INSERT INTO ems.telemetry_inverter ( site_id, inverter_id, measured_at, pv_power_w, pv1_power_w, pv2_power_w, gen_port_power_w, battery_soc_percent, battery_power_w, batt_charge_today_wh, batt_discharge_today_wh, grid_power_w, load_power_w, run_state ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14 ) ON CONFLICT (inverter_id, measured_at) DO NOTHING """, site_id, inv_id, measured_at, pv_power_w, pv1_power, pv2_power, gen_port_power, float(battery_soc), battery_power, batt_charge_today, batt_discharge_today, grid_power, load_power, run_state, ) inv_temp: float | None = None await manager.broadcast_telemetry( { "type": "telemetry", "site_id": site_id, "ts": datetime.now(timezone.utc).isoformat(), "pv_power_w": pv_power_w, "battery_soc_pct": float(battery_soc), "battery_power_w": battery_power, "grid_power_w": grid_power, "load_power_w": load_power, "gen_port_power_w": gen_port_power, "inverter_temp_c": inv_temp, } ) except Exception as e: logger.error("poll_inverter site=%s inverter=%s: %s", site_id, code, e) async def poll_ev_chargers(site_id: int, db: asyncpg.Connection) -> None: rows = await db.fetch( """ SELECT ec.id, ec.code, se.host, se.port, se.unit_id FROM ems.asset_ev_charger ec JOIN ems.site_endpoint se ON se.id = ec.endpoint_id WHERE ec.site_id = $1 AND se.enabled = true AND se.endpoint_type = 'modbus_tcp' """, site_id, ) measured_at = datetime.now(timezone.utc) connector_id = 1 for row in rows: code = row["code"] charger_id = row["id"] logger.info("TODO: EV charger Modbus registry pending | %s", code) # Placeholder až do mapování Modbus: status zůstává available (bez falešných přechodů). current_status = "available" previous_status = await db.fetchval( """ SELECT status FROM ems.telemetry_ev_charger WHERE charger_id = $1 AND connector_id = $2 ORDER BY measured_at DESC LIMIT 1 """, charger_id, connector_id, ) await db.execute( """ INSERT INTO ems.telemetry_ev_charger ( site_id, charger_id, measured_at, connector_id, status, power_w, energy_kwh ) VALUES ($1, $2, $3, $4, $5, 0, 0) ON CONFLICT (charger_id, connector_id, measured_at) DO NOTHING """, site_id, charger_id, measured_at, connector_id, current_status, ) if previous_status is not None: if previous_status == "available" and current_status != "available": vehicle_id = await db.fetchval( """ SELECT av.id FROM ems.asset_vehicle av WHERE av.site_id = $1 AND av.default_charger_id = $2 AND av.active = true ORDER BY av.id LIMIT 1 """, site_id, charger_id, ) await db.execute( "SELECT ems.fn_update_ev_arrival_stats($1, $2, $3, $4)", site_id, charger_id, vehicle_id, measured_at, ) logger.info("EV arrival detected on charger %s", code) await db.execute( """ INSERT INTO ems.ev_session ( site_id, charger_id, vehicle_id, session_start, target_soc_pct, target_deadline ) SELECT ac.site_id, ac.id, av.id, now(), av.default_target_soc_pct, CASE WHEN av.default_deadline_hour IS NOT NULL THEN ( (timezone('Europe/Prague', now()))::date + interval '1 day' + make_interval(hours => av.default_deadline_hour) )::timestamp AT TIME ZONE 'Europe/Prague' END FROM ems.asset_ev_charger ac LEFT JOIN LATERAL ( SELECT v.id, v.default_target_soc_pct, v.default_deadline_hour FROM ems.asset_vehicle v WHERE v.default_charger_id = ac.id AND v.site_id = ac.site_id AND v.active = true ORDER BY v.id LIMIT 1 ) av ON true WHERE ac.id = $1 AND ac.site_id = $2 ON CONFLICT (charger_id) WHERE session_end IS NULL DO NOTHING """, charger_id, site_id, ) if previous_status != "available" and current_status == "available": await db.execute( """ UPDATE ems.ev_session SET session_end = now() WHERE charger_id = $1 AND session_end IS NULL """, charger_id, ) logger.info("EV departure detected on charger %s", code) async def poll_heat_pump(site_id: int, db: asyncpg.Connection) -> None: rows = await db.fetch( """ SELECT hp.id, hp.code, se.host, se.port, se.unit_id FROM ems.asset_heat_pump hp JOIN ems.site_endpoint se ON se.id = hp.endpoint_id WHERE hp.site_id = $1 AND se.enabled = true AND se.endpoint_type = 'modbus_tcp' """, site_id, ) measured_at = datetime.now(timezone.utc) for row in rows: code = row["code"] logger.info("TODO: heat pump Modbus registry pending (heat_pump=%s)", code) await db.execute( """ INSERT INTO ems.telemetry_heat_pump ( site_id, heat_pump_id, measured_at, power_w, outdoor_temp_c, water_outlet_temp_c, tuv_tank_temp_c, operating_mode ) VALUES ($1, $2, $3, 0, 10.0, 45.0, 55.0, 'standby') ON CONFLICT (heat_pump_id, measured_at) DO NOTHING """, site_id, row["id"], measured_at, ) async def run_telemetry_loop(conn: asyncpg.Connection) -> float: """Jeden průchod smyčky; vrátí uplynulý čas v sekundách (pro sleep). Poll probíhá sekvenčně — jedno asyncpg spojení nesmí obsluhovat paralelní dotazy. """ loop = asyncio.get_running_loop() start = loop.time() sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") for site in sites: sid = site["id"] try: await poll_inverter(sid, conn) await poll_ev_chargers(sid, conn) await poll_heat_pump(sid, conn) except Exception as e: logger.error("Telemetry loop error site %s: %s", sid, e) return loop.time() - start async def run_telemetry_loop_wrapper(pool: asyncpg.Pool) -> None: """Background task: každá iterace získá spojení z poolu; neblokuje pool během sleep.""" while True: try: async with pool.acquire() as conn: elapsed = await run_telemetry_loop(conn) except asyncio.CancelledError: raise except Exception as e: logger.exception("Telemetry wrapper DB error: %s", e) elapsed = 0.0 await asyncio.sleep(5) continue if elapsed > 50: logger.warning("Telemetry loop took %.1fs (>50s)", elapsed) await asyncio.sleep(max(0.0, 60.0 - elapsed))