"""Non-Deye output writers for control export.""" from __future__ import annotations import logging import os import asyncpg import httpx from app.config import get_settings from services.control.models import ControlSetpoints, OperatingModeInfo logger = logging.getLogger(__name__) # Teltonika TeltoCharge – zápisové registry (oficiální protokol rev 0.5; # docs/04-modules/modbus-registers-teltocharge.md). FC 16 přes journal. TELTO_REG_AMPS_TO_USE = 15 # 0 = stop, 6–32 A TELTO_REG_COMM_TIMEOUT_S = 19 # watchdog: bez komunikace → failsafe TELTO_REG_FAILSAFE_CURRENT_A = 20 #: Výpadek EMS: po 5 min bez zápisu wallbox přejde na failsafe proud — #: auto se přes noc nabije i bez EMS (pomalu), místo aby stálo na 0 A. TELTO_WATCHDOG_TIMEOUT_S = 300 TELTO_WATCHDOG_FAILSAFE_A = 8 def _telto_setpoint_registers(current_a: int) -> list[tuple[int, str, int]]: """Registry pro jeden export tick: limit proudu + watchdog konfigurace. Watchdog (19/20) se posílá s každým tickem, ale journal drop-unchanged ho po prvním verified zápisu přeskakuje — reálně se zapíše jednou. """ a = int(current_a) if a < 6: a = 0 return [ (TELTO_REG_AMPS_TO_USE, "telto_amps_to_use", min(a, 32)), (TELTO_REG_COMM_TIMEOUT_S, "telto_comm_timeout_s", TELTO_WATCHDOG_TIMEOUT_S), (TELTO_REG_FAILSAFE_CURRENT_A, "telto_failsafe_a", TELTO_WATCHDOG_FAILSAFE_A), ] def _current_limit_for_charger(charger_code: str, sp: ControlSetpoints) -> int: c = (charger_code or "").strip().lower() if c == "ev-charger-1": a = sp.ev1_current_a elif c == "ev-charger-2": a = sp.ev2_current_a elif c.endswith("-1") or c == "ev1": a = sp.ev1_current_a elif c.endswith("-2") or c == "ev2": a = sp.ev2_current_a else: a = 0 if a < 6: a = 0 return a async def write_ev_setpoints( site_id: int, setpoints: ControlSetpoints, db: asyncpg.Connection ) -> str: from services.control.modbus_journal import ( _drop_registers_matching_last_verified, _fetch_last_verified_registers, create_modbus_commands, execute_modbus_commands, ) rows = await db.fetch( """ SELECT ec.id AS asset_id, ec.code, se.host, se.port, se.unit_id FROM ems.asset_ev_charger ec JOIN ems.site_endpoint se ON se.id = ec.endpoint_id WHERE ec.site_id = $1 AND ec.schedulable = true AND se.enabled = true AND se.endpoint_type = 'modbus_tcp' ORDER BY ec.code """, site_id, ) if not rows: return "OK EV: no schedulable chargers" written = 0 for row in rows: code = row["code"] asset_id = int(row["asset_id"]) host = str(row["host"]) port = int(row["port"] or 502) unit_id = int(row["unit_id"] if row["unit_id"] is not None else 1) current_a = _current_limit_for_charger(code, setpoints) registers = _telto_setpoint_registers(current_a) last_verified = await _fetch_last_verified_registers( site_id, asset_id, db, asset_type="ev_charger" ) registers, skipped = _drop_registers_matching_last_verified( registers, last_verified ) if not registers: logger.debug("EV setpoint [%s]: beze změny (%s A)", code, current_a) continue cmd_ids = await create_modbus_commands( site_id, None, "ev_charger", asset_id, code, host, port, unit_id, registers, db, ) ok = await execute_modbus_commands(cmd_ids, db) written += 1 logger.info( "EV setpoint [%s]: %s A (regs %s%s) -> %s", code, current_a, [r for r, _, _ in registers], f", skip {skipped}" if skipped else "", "written" if ok else "FAILED", ) return f"OK EV: {written}/{len(rows)} charger(s) written" async def write_ev_arrival_hold( site_id: int, charger_code: str, db: asyncpg.Connection ) -> bool: """Okamžitě po DETEKCI příjezdu zapsat 0 A na daný wallbox (přes journal). TeltoCharge po připojení kabelu sám rozjede nabíjení svým defaultem — nabíjet smí až PLÁN (replan + export běží hned poté v _on_ev_arrival, takže držení trvá sekundy až ~1 min). Watchdog registry se zapíší spolu s 0 A (drop-unchanged je po prvním verified stejně přeskočí). """ from services.control.modbus_journal import ( create_modbus_commands, execute_modbus_commands, ) row = await db.fetchrow( """ SELECT ec.id AS asset_id, ec.code, se.host, se.port, se.unit_id FROM ems.asset_ev_charger ec JOIN ems.site_endpoint se ON se.id = ec.endpoint_id WHERE ec.site_id = $1 AND ec.code = $2 AND ec.schedulable = true AND se.enabled = true AND se.endpoint_type = 'modbus_tcp' """, site_id, charger_code, ) if row is None: return False cmd_ids = await create_modbus_commands( site_id, None, "ev_charger", int(row["asset_id"]), str(row["code"]), str(row["host"]), int(row["port"] or 502), int(row["unit_id"] if row["unit_id"] is not None else 1), _telto_setpoint_registers(0), db, ) ok = await execute_modbus_commands(cmd_ids, db) logger.info( "EV arrival hold [%s]: 0 A %s", charger_code, "written" if ok else "FAILED" ) return bool(ok) async def write_heat_pump_setpoint( site_id: int, setpoints: ControlSetpoints, db: asyncpg.Connection ) -> str: rows = await db.fetch( """ SELECT hp.code, se.host, se.port, se.unit_id FROM ems.asset_heat_pump hp JOIN ems.site_endpoint se ON se.id = hp.endpoint_id WHERE hp.site_id = $1 AND hp.schedulable = true AND se.enabled = true AND se.endpoint_type = 'modbus_tcp' """, site_id, ) if not rows: return "OK heat pump: no schedulable unit" for row in rows: logger.info( "HP setpoint [%s]: enable=%s (TODO: Modbus registers)", row["code"], setpoints.heat_pump_enable, ) return "OK heat pump: logged (Modbus TODO)" async def send_loxone_setpoints( site_id: int, setpoints: ControlSetpoints, mode: OperatingModeInfo, db: asyncpg.Connection, ) -> str: endpoint = await db.fetchrow( """ SELECT host, port, protocol FROM ems.site_endpoint WHERE site_id = $1 AND endpoint_type = 'loxone_http' AND enabled = true ORDER BY id LIMIT 1 """, site_id, ) if not endpoint: return "OK Loxone: no endpoint, skipped" proto = (endpoint["protocol"] or "http").lower() if proto not in ("http", "https"): proto = "http" host = endpoint["host"] port = int(endpoint["port"] or (443 if proto == "https" else 80)) base = f"{proto}://{host}:{port}/dev/sps/io" settings = get_settings() user = settings.loxone_user or os.getenv("LOXONE_USER") or "" password = settings.loxone_password or os.getenv("LOXONE_PASSWORD") or "" auth = (user, password) if user else None batt_display = 0 if setpoints.battery_w is None else int(setpoints.battery_w) paths: list[tuple[str, int]] = [ (f"{base}/EMS_Mode/{mode.loxone_mode_value}", mode.loxone_mode_value), (f"{base}/EMS_Battery_Setpoint_W/{batt_display}", batt_display), (f"{base}/EMS_Grid_Setpoint_W/{setpoints.grid_setpoint_w}", setpoints.grid_setpoint_w), (f"{base}/EMS_EV1_Power_W/{setpoints.ev1_power_w}", setpoints.ev1_power_w), (f"{base}/EMS_EV2_Power_W/{setpoints.ev2_power_w}", setpoints.ev2_power_w), ( f"{base}/EMS_HeatPump_Enable/{1 if setpoints.heat_pump_enable else 0}", 1 if setpoints.heat_pump_enable else 0, ), ] errs: list[str] = [] try: async with httpx.AsyncClient(timeout=5.0) as client: for url, _ in paths: try: r = await client.get(url, auth=auth) r.raise_for_status() except Exception as e: errs.append(f"{url!s}: {e}") except Exception as e: return f"FAIL Loxone: client {e}" if errs: return "FAIL Loxone: " + "; ".join(errs[:3]) return "OK Loxone: all virtual inputs updated"