Files
ems/backend/services/control/outputs.py
2026-05-02 19:47:12 +02:00

150 lines
4.5 KiB
Python

"""Non-Deye output writers for control export."""
from __future__ import annotations
import logging
import os
import asyncpg
import httpx
from app.config import get_settings
from services.control.models import ControlSetpoints, OperatingModeInfo
logger = logging.getLogger(__name__)
def _current_limit_for_charger(charger_code: str, sp: ControlSetpoints) -> int:
c = (charger_code or "").strip().lower()
if c == "ev-charger-1":
a = sp.ev1_current_a
elif c == "ev-charger-2":
a = sp.ev2_current_a
elif c.endswith("-1") or c == "ev1":
a = sp.ev1_current_a
elif c.endswith("-2") or c == "ev2":
a = sp.ev2_current_a
else:
a = 0
if a < 6:
a = 0
return a
async def write_ev_setpoints(
site_id: int, setpoints: ControlSetpoints, db: asyncpg.Connection
) -> str:
rows = await db.fetch(
"""
SELECT ec.code, se.host, se.port, se.unit_id
FROM ems.asset_ev_charger ec
JOIN ems.site_endpoint se ON se.id = ec.endpoint_id
WHERE ec.site_id = $1
AND ec.schedulable = true
AND se.enabled = true
AND se.endpoint_type = 'modbus_tcp'
ORDER BY ec.code
""",
site_id,
)
if not rows:
return "OK EV: no schedulable chargers"
for row in rows:
code = row["code"]
current_a = _current_limit_for_charger(code, setpoints)
logger.info(
"EV setpoint [%s]: %sA (TODO: Modbus registers)",
code,
current_a,
)
return f"OK EV: logged {len(rows)} charger(s) (Modbus TODO)"
async def write_heat_pump_setpoint(
site_id: int, setpoints: ControlSetpoints, db: asyncpg.Connection
) -> str:
rows = await db.fetch(
"""
SELECT hp.code, se.host, se.port, se.unit_id
FROM ems.asset_heat_pump hp
JOIN ems.site_endpoint se ON se.id = hp.endpoint_id
WHERE hp.site_id = $1
AND hp.schedulable = true
AND se.enabled = true
AND se.endpoint_type = 'modbus_tcp'
""",
site_id,
)
if not rows:
return "OK heat pump: no schedulable unit"
for row in rows:
logger.info(
"HP setpoint [%s]: enable=%s (TODO: Modbus registers)",
row["code"],
setpoints.heat_pump_enable,
)
return "OK heat pump: logged (Modbus TODO)"
async def send_loxone_setpoints(
site_id: int,
setpoints: ControlSetpoints,
mode: OperatingModeInfo,
db: asyncpg.Connection,
) -> str:
endpoint = await db.fetchrow(
"""
SELECT host, port, protocol
FROM ems.site_endpoint
WHERE site_id = $1 AND endpoint_type = 'loxone_http' AND enabled = true
ORDER BY id
LIMIT 1
""",
site_id,
)
if not endpoint:
return "OK Loxone: no endpoint, skipped"
proto = (endpoint["protocol"] or "http").lower()
if proto not in ("http", "https"):
proto = "http"
host = endpoint["host"]
port = int(endpoint["port"] or (443 if proto == "https" else 80))
base = f"{proto}://{host}:{port}/dev/sps/io"
settings = get_settings()
user = settings.loxone_user or os.getenv("LOXONE_USER") or ""
password = settings.loxone_password or os.getenv("LOXONE_PASSWORD") or ""
auth = (user, password) if user else None
batt_display = 0 if setpoints.battery_w is None else int(setpoints.battery_w)
paths: list[tuple[str, int]] = [
(f"{base}/EMS_Mode/{mode.loxone_mode_value}", mode.loxone_mode_value),
(f"{base}/EMS_Battery_Setpoint_W/{batt_display}", batt_display),
(f"{base}/EMS_Grid_Setpoint_W/{setpoints.grid_setpoint_w}", setpoints.grid_setpoint_w),
(f"{base}/EMS_EV1_Power_W/{setpoints.ev1_power_w}", setpoints.ev1_power_w),
(f"{base}/EMS_EV2_Power_W/{setpoints.ev2_power_w}", setpoints.ev2_power_w),
(
f"{base}/EMS_HeatPump_Enable/{1 if setpoints.heat_pump_enable else 0}",
1 if setpoints.heat_pump_enable else 0,
),
]
errs: list[str] = []
try:
async with httpx.AsyncClient(timeout=5.0) as client:
for url, _ in paths:
try:
r = await client.get(url, auth=auth)
r.raise_for_status()
except Exception as e:
errs.append(f"{url!s}: {e}")
except Exception as e:
return f"FAIL Loxone: client {e}"
if errs:
return "FAIL Loxone: " + "; ".join(errs[:3])
return "OK Loxone: all virtual inputs updated"