From 53288d130aa2410033fed100315c3be1e62f09c3 Mon Sep 17 00:00:00 2001 From: Dusan Vojacek Date: Sat, 2 May 2026 19:47:12 +0200 Subject: [PATCH] refactor control output writers --- backend/services/control/exporter_monolith.py | 137 +--------------- backend/services/control/outputs.py | 149 ++++++++++++++++++ 2 files changed, 155 insertions(+), 131 deletions(-) create mode 100644 backend/services/control/outputs.py diff --git a/backend/services/control/exporter_monolith.py b/backend/services/control/exporter_monolith.py index fe379d0..bff405e 100644 --- a/backend/services/control/exporter_monolith.py +++ b/backend/services/control/exporter_monolith.py @@ -3,15 +3,12 @@ from __future__ import annotations import logging -import os from collections import defaultdict from typing import Any from datetime import datetime, timezone import asyncpg -import httpx -from app.config import get_settings from services.control.deye_helpers import ( BATT_VOLTAGE_V, DEYE_CLOCK_DRIFT_OK_SEC, @@ -55,6 +52,12 @@ from services.control.modbus_journal import ( create_modbus_commands, execute_modbus_commands, ) +from services.control.outputs import ( + _current_limit_for_charger, + send_loxone_setpoints, + write_ev_setpoints, + write_heat_pump_setpoint, +) from services.control.repository import ( _fetch_max_charge_power_w, _fetch_operating_mode, @@ -877,134 +880,6 @@ async def read_deye_registers_live(site_id: int, db: asyncpg.Connection) -> dict } -def _current_limit_for_charger(charger_code: str, sp: ControlSetpoints) -> int: - c = (charger_code or "").strip().lower() - if c == "ev-charger-1": - a = sp.ev1_current_a - elif c == "ev-charger-2": - a = sp.ev2_current_a - elif c.endswith("-1") or c == "ev1": - a = sp.ev1_current_a - elif c.endswith("-2") or c == "ev2": - a = sp.ev2_current_a - else: - a = 0 - if a < 6: - a = 0 - return a - - -async def write_ev_setpoints(site_id: int, setpoints: ControlSetpoints, db: asyncpg.Connection) -> str: - rows = await db.fetch( - """ - SELECT ec.code, se.host, se.port, se.unit_id - FROM ems.asset_ev_charger ec - JOIN ems.site_endpoint se ON se.id = ec.endpoint_id - WHERE ec.site_id = $1 - AND ec.schedulable = true - AND se.enabled = true - AND se.endpoint_type = 'modbus_tcp' - ORDER BY ec.code - """, - site_id, - ) - if not rows: - return "OK EV: no schedulable chargers" - - for row in rows: - code = row["code"] - current_a = _current_limit_for_charger(code, setpoints) - logger.info( - "EV setpoint [%s]: %sA (TODO: Modbus registers)", - code, - current_a, - ) - return f"OK EV: logged {len(rows)} charger(s) (Modbus TODO)" - - -async def write_heat_pump_setpoint(site_id: int, setpoints: ControlSetpoints, db: asyncpg.Connection) -> str: - rows = await db.fetch( - """ - SELECT hp.code, se.host, se.port, se.unit_id - FROM ems.asset_heat_pump hp - JOIN ems.site_endpoint se ON se.id = hp.endpoint_id - WHERE hp.site_id = $1 - AND hp.schedulable = true - AND se.enabled = true - AND se.endpoint_type = 'modbus_tcp' - """, - site_id, - ) - if not rows: - return "OK heat pump: no schedulable unit" - for row in rows: - logger.info( - "HP setpoint [%s]: enable=%s (TODO: Modbus registers)", - row["code"], - setpoints.heat_pump_enable, - ) - return "OK heat pump: logged (Modbus TODO)" - - -async def send_loxone_setpoints( - site_id: int, - setpoints: ControlSetpoints, - mode: OperatingModeInfo, - db: asyncpg.Connection, -) -> str: - endpoint = await db.fetchrow( - """ - SELECT host, port, protocol - FROM ems.site_endpoint - WHERE site_id = $1 AND endpoint_type = 'loxone_http' AND enabled = true - ORDER BY id - LIMIT 1 - """, - site_id, - ) - if not endpoint: - return "OK Loxone: no endpoint, skipped" - - proto = (endpoint["protocol"] or "http").lower() - if proto not in ("http", "https"): - proto = "http" - host = endpoint["host"] - port = int(endpoint["port"] or (443 if proto == "https" else 80)) - base = f"{proto}://{host}:{port}/dev/sps/io" - - settings = get_settings() - user = settings.loxone_user or os.getenv("LOXONE_USER") or "" - password = settings.loxone_password or os.getenv("LOXONE_PASSWORD") or "" - auth = (user, password) if user else None - - batt_display = 0 if setpoints.battery_w is None else int(setpoints.battery_w) - - paths: list[tuple[str, int]] = [ - (f"{base}/EMS_Mode/{mode.loxone_mode_value}", mode.loxone_mode_value), - (f"{base}/EMS_Battery_Setpoint_W/{batt_display}", batt_display), - (f"{base}/EMS_Grid_Setpoint_W/{setpoints.grid_setpoint_w}", setpoints.grid_setpoint_w), - (f"{base}/EMS_EV1_Power_W/{setpoints.ev1_power_w}", setpoints.ev1_power_w), - (f"{base}/EMS_EV2_Power_W/{setpoints.ev2_power_w}", setpoints.ev2_power_w), - (f"{base}/EMS_HeatPump_Enable/{1 if setpoints.heat_pump_enable else 0}", 1 if setpoints.heat_pump_enable else 0), - ] - - errs: list[str] = [] - try: - async with httpx.AsyncClient(timeout=5.0) as client: - for url, _ in paths: - try: - r = await client.get(url, auth=auth) - r.raise_for_status() - except Exception as e: - errs.append(f"{url!s}: {e}") - except Exception as e: - return f"FAIL Loxone: client {e}" - - if errs: - return "FAIL Loxone: " + "; ".join(errs[:3]) - return "OK Loxone: all virtual inputs updated" - - async def export_setpoints(site_id: int, db: asyncpg.Connection) -> None: mode = await _fetch_operating_mode(site_id, db) if mode is None: diff --git a/backend/services/control/outputs.py b/backend/services/control/outputs.py new file mode 100644 index 0000000..9a5b130 --- /dev/null +++ b/backend/services/control/outputs.py @@ -0,0 +1,149 @@ +"""Non-Deye output writers for control export.""" + +from __future__ import annotations + +import logging +import os + +import asyncpg +import httpx + +from app.config import get_settings +from services.control.models import ControlSetpoints, OperatingModeInfo + +logger = logging.getLogger(__name__) + + +def _current_limit_for_charger(charger_code: str, sp: ControlSetpoints) -> int: + c = (charger_code or "").strip().lower() + if c == "ev-charger-1": + a = sp.ev1_current_a + elif c == "ev-charger-2": + a = sp.ev2_current_a + elif c.endswith("-1") or c == "ev1": + a = sp.ev1_current_a + elif c.endswith("-2") or c == "ev2": + a = sp.ev2_current_a + else: + a = 0 + if a < 6: + a = 0 + return a + + +async def write_ev_setpoints( + site_id: int, setpoints: ControlSetpoints, db: asyncpg.Connection +) -> str: + rows = await db.fetch( + """ + SELECT ec.code, se.host, se.port, se.unit_id + FROM ems.asset_ev_charger ec + JOIN ems.site_endpoint se ON se.id = ec.endpoint_id + WHERE ec.site_id = $1 + AND ec.schedulable = true + AND se.enabled = true + AND se.endpoint_type = 'modbus_tcp' + ORDER BY ec.code + """, + site_id, + ) + if not rows: + return "OK EV: no schedulable chargers" + + for row in rows: + code = row["code"] + current_a = _current_limit_for_charger(code, setpoints) + logger.info( + "EV setpoint [%s]: %sA (TODO: Modbus registers)", + code, + current_a, + ) + return f"OK EV: logged {len(rows)} charger(s) (Modbus TODO)" + + +async def write_heat_pump_setpoint( + site_id: int, setpoints: ControlSetpoints, db: asyncpg.Connection +) -> str: + rows = await db.fetch( + """ + SELECT hp.code, se.host, se.port, se.unit_id + FROM ems.asset_heat_pump hp + JOIN ems.site_endpoint se ON se.id = hp.endpoint_id + WHERE hp.site_id = $1 + AND hp.schedulable = true + AND se.enabled = true + AND se.endpoint_type = 'modbus_tcp' + """, + site_id, + ) + if not rows: + return "OK heat pump: no schedulable unit" + for row in rows: + logger.info( + "HP setpoint [%s]: enable=%s (TODO: Modbus registers)", + row["code"], + setpoints.heat_pump_enable, + ) + return "OK heat pump: logged (Modbus TODO)" + + +async def send_loxone_setpoints( + site_id: int, + setpoints: ControlSetpoints, + mode: OperatingModeInfo, + db: asyncpg.Connection, +) -> str: + endpoint = await db.fetchrow( + """ + SELECT host, port, protocol + FROM ems.site_endpoint + WHERE site_id = $1 AND endpoint_type = 'loxone_http' AND enabled = true + ORDER BY id + LIMIT 1 + """, + site_id, + ) + if not endpoint: + return "OK Loxone: no endpoint, skipped" + + proto = (endpoint["protocol"] or "http").lower() + if proto not in ("http", "https"): + proto = "http" + host = endpoint["host"] + port = int(endpoint["port"] or (443 if proto == "https" else 80)) + base = f"{proto}://{host}:{port}/dev/sps/io" + + settings = get_settings() + user = settings.loxone_user or os.getenv("LOXONE_USER") or "" + password = settings.loxone_password or os.getenv("LOXONE_PASSWORD") or "" + auth = (user, password) if user else None + + batt_display = 0 if setpoints.battery_w is None else int(setpoints.battery_w) + + paths: list[tuple[str, int]] = [ + (f"{base}/EMS_Mode/{mode.loxone_mode_value}", mode.loxone_mode_value), + (f"{base}/EMS_Battery_Setpoint_W/{batt_display}", batt_display), + (f"{base}/EMS_Grid_Setpoint_W/{setpoints.grid_setpoint_w}", setpoints.grid_setpoint_w), + (f"{base}/EMS_EV1_Power_W/{setpoints.ev1_power_w}", setpoints.ev1_power_w), + (f"{base}/EMS_EV2_Power_W/{setpoints.ev2_power_w}", setpoints.ev2_power_w), + ( + f"{base}/EMS_HeatPump_Enable/{1 if setpoints.heat_pump_enable else 0}", + 1 if setpoints.heat_pump_enable else 0, + ), + ] + + errs: list[str] = [] + try: + async with httpx.AsyncClient(timeout=5.0) as client: + for url, _ in paths: + try: + r = await client.get(url, auth=auth) + r.raise_for_status() + except Exception as e: + errs.append(f"{url!s}: {e}") + except Exception as e: + return f"FAIL Loxone: client {e}" + + if errs: + return "FAIL Loxone: " + "; ".join(errs[:3]) + return "OK Loxone: all virtual inputs updated"