Files
ems/backend/services/control/exporter_monolith.py
2026-05-02 19:40:16 +02:00

1558 lines
54 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Export plánovaných setpointů na Modbus (Deye, EV, TČ) a HTTP do Loxone."""
from __future__ import annotations
import asyncio
import json
import logging
import os
from collections import defaultdict
from typing import Any
from datetime import datetime, timezone
import asyncpg
import httpx
from app.config import get_settings
from services.control.deye_helpers import (
BATT_VOLTAGE_V,
DEYE_CLOCK_DRIFT_OK_SEC,
DEYE_CLOCK_REGS,
DEYE_CLOCK_RESYNC_INTERVAL_HOURS,
DEYE_CLOCK_VERIFY_MAX_DELTA_SEC, # noqa: F401 - re-export for compatibility
DEYE_CRITICAL_REGS_SELF_SUSTAIN, # noqa: F401 - re-export for compatibility
DEYE_LV_BATTERY_MAX_CHARGE_DISCHARGE_A,
DEYE_REGISTER_NAMES,
DEYE_TOU_INACTIVE_HHMM,
DEYE_TOU_POWER_REGS,
PRAGUE_TZ,
REG143_SELL_CAP_MIN_W,
REG178_MI_EXPORT_DISABLE,
REG178_MI_EXPORT_ENABLE,
REG178_MI_EXPORT_MASK,
REG178_PASSIVE,
REG178_SELL,
REG178_VERIFY_MASK,
REG178_VERIFY_MASK_COMBINED,
_DEYE_INACTIVE_TOU_REGISTERS,
_deye_clock_registers_verify_match,
_deye_reg178_verify_match,
_deye_reg178_verify_with_double_read,
_deye_registers_to_prague_datetime, # noqa: F401 - re-export for compatibility
_deye_should_skip_time_sync_after_read,
_deye_tou_power_verify_match,
_prague_minute_start_utc,
battery_watts_to_amps,
compute_pv_a_reg340_max_solar_w,
current_slot_hhmm,
deye_reg_triggers_self_sustain_after_verify_exhaust, # noqa: F401 - re-export
next_slot_hhmm,
watts_to_amps,
)
from services.control.models import ControlSetpoints, InverterConfig, OperatingModeInfo
from services.control.setpoints import (
_DictRecord,
_apply_price_failsafe_guard,
_build_setpoints,
_clamp_deye_tou_soc_pct,
_deye_passive_tou_battery_soc_pct,
_deye_reg143_export_w,
_deye_system_time_register_rows,
_deye_time_point_rows,
_deye_tou_min_soc_pct,
_deye_tou_params,
_deye_tou_reserve_soc_pct,
_deye_zero_export_amps_for_passive,
get_deye_mode,
)
from services.modbus_client import get_modbus_client
from services.signal_service import enqueue_site_signals
logger = logging.getLogger(__name__)
async def _fetch_written_deye_clock_commands(
site_id: int,
asset_id: int,
host: str,
port: int,
unit_id: int,
db: asyncpg.Connection,
) -> list[asyncpg.Record]:
"""Všechny řádky journalu 6264 ve stavu written pro daný invertor/endpoint."""
rows = await db.fetch(
"""
SELECT * FROM ems.modbus_command
WHERE site_id = $1
AND asset_type = 'inverter'
AND asset_id = $2
AND device_host = $3
AND device_port = $4
AND device_unit_id = $5
AND register IN (62, 63, 64)
AND status = 'written'
ORDER BY register
""",
site_id,
asset_id,
host,
port,
unit_id,
)
return list(rows)
async def _fetch_last_verified_inverter_registers(
site_id: int, inverter_asset_id: int, db: asyncpg.Connection
) -> dict[int, int]:
"""
Poslední hodnota na zařízení podle journalu (jen status verified).
Slouží k přeskočení duplicitního zápisu stejné hodnoty.
"""
raw = await db.fetchval(
"""
select ems.fn_modbus_last_verified_map($1::int, $2::int)
""",
site_id,
inverter_asset_id,
)
data = raw if isinstance(raw, dict) else json.loads(raw)
return {int(k): int(v) for k, v in data.items()}
def _drop_registers_matching_last_verified(
registers: list[tuple[int, str, int]],
last_verified: dict[int, int],
) -> tuple[list[tuple[int, str, int]], list[int]]:
"""Vynechá položky s hodnotou shodnou s posledním ověřeným stavem; vrátí (nový seznam, vynechané reg)."""
out: list[tuple[int, str, int]] = []
skipped: list[int] = []
for reg, meta, val in registers:
lv = last_verified.get(int(reg))
if lv is not None:
# reg178: porovnáváme jen masku bitů 45 (Deye si v dalších bitech drží vlastní stav).
if int(reg) == 178 and _deye_reg178_verify_match(int(val), int(lv)):
skipped.append(int(reg))
continue
if int(lv) == int(val):
skipped.append(int(reg))
continue
out.append((reg, meta, val))
return out, skipped
async def create_modbus_commands(
site_id: int,
planning_run_id: int | None,
asset_type: str,
asset_id: int,
asset_code: str,
host: str,
port: int,
unit_id: int,
registers: list[tuple[int, str, int]],
db: asyncpg.Connection,
deye_physical_mode: str | None = None,
) -> list[int]:
"""
Vytvoří záznamy v modbus_command pro sadu zápisů.
Vrátí list command IDs.
Pro Deye se jméno registru bere z DEYE_REGISTER_NAMES (prostřední položka tuplu se ignoruje).
"""
ids: list[int] = []
for reg, _ignored_name, val in registers:
register_name = DEYE_REGISTER_NAMES.get(reg, f"reg_{reg}")
cmd_id = await db.fetchval(
"""
INSERT INTO ems.modbus_command
(site_id, asset_type, asset_id, asset_code,
device_host, device_port, device_unit_id,
register, register_name, value_to_write,
planning_run_id, status, deye_physical_mode)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,'pending',$12)
RETURNING id
""",
site_id,
asset_type,
asset_id,
asset_code,
host,
port,
unit_id,
reg,
register_name,
val,
planning_run_id,
deye_physical_mode,
)
if cmd_id is not None:
ids.append(int(cmd_id))
return ids
def _modbus_command_contiguous_runs(cmds: list[asyncpg.Record]) -> list[list[asyncpg.Record]]:
"""Seřadí podle adresy registru a rozdělí na souvislé úseky pro FC 0x10 / FC 3."""
if not cmds:
return []
sorted_cmds = sorted(cmds, key=lambda c: int(c["register"]))
runs: list[list[asyncpg.Record]] = []
cur: list[asyncpg.Record] = [sorted_cmds[0]]
for c in sorted_cmds[1:]:
if int(c["register"]) == int(cur[-1]["register"]) + 1:
cur.append(c)
else:
runs.append(cur)
cur = [c]
runs.append(cur)
return runs
async def execute_modbus_commands(
command_ids: list[int],
db: asyncpg.Connection,
) -> bool:
"""
Zapíše příkazy z modbus_command do zařízení (FC 0x10 po souvislých blocích).
Aktualizuje status na 'written' nebo 'failed'.
Vrátí True pokud všechny příkazy uspěly.
"""
MAX_RETRIES = 3
RETRY_DELAY = 0.5
rows: list[asyncpg.Record] = []
for cmd_id in command_ids:
cmd = await db.fetchrow(
"SELECT * FROM ems.modbus_command WHERE id=$1", cmd_id
)
if cmd is not None:
rows.append(cmd)
if not rows:
return True
by_gw: dict[tuple[str, int, int], list[asyncpg.Record]] = defaultdict(list)
for cmd in rows:
by_gw[
(cmd["device_host"], int(cmd["device_port"]), int(cmd["device_unit_id"]))
].append(cmd)
all_ok = True
for (host, port, unit), group in by_gw.items():
client = await get_modbus_client(host, port)
for run in _modbus_command_contiguous_runs(group):
start_reg = int(run[0]["register"])
values = [int(c["value_to_write"]) for c in run]
ids_run = [int(c["id"]) for c in run]
for attempt in range(MAX_RETRIES):
try:
await client.write_registers(start_reg, values, unit)
for cmd, val in zip(run, values):
cid = int(cmd["id"])
await db.execute(
"""
UPDATE ems.modbus_command
SET status='written', value_written=$1, written_at=now(),
attempt_count=attempt_count+1, error_msg=NULL
WHERE id=$2
""",
val,
cid,
)
logger.info(
"[cmd %s] %s 0x%04X=%s OK batch@%s (attempt %s)",
cid,
cmd["asset_code"],
int(cmd["register"]),
val,
start_reg,
attempt + 1,
)
break
except Exception as e:
if attempt < MAX_RETRIES - 1:
logger.warning(
"Modbus batch write 0x%04X count=%s attempt %s failed: %s, retrying...",
start_reg,
len(values),
attempt + 1,
e,
)
await asyncio.sleep(RETRY_DELAY)
await client.force_disconnect()
else:
for cmd in run:
await db.execute(
"""
UPDATE ems.modbus_command
SET status='failed', error_msg=$1,
attempt_count=attempt_count+1
WHERE id=$2
""",
str(e),
int(cmd["id"]),
)
logger.error(
"Modbus batch 0x%04X count=%s all %s attempts failed: %s",
start_reg,
len(values),
MAX_RETRIES,
e,
)
all_ok = False
return all_ok
async def _switch_to_self_sustain(site_id: int, db: asyncpg.Connection, reason: str) -> None:
"""Přepne lokalitu na SELF_SUSTAIN, zaloguje důvod a při změně pošle Discord."""
from services.notification_service import run_fn_set_mode_with_discord
await run_fn_set_mode_with_discord(
db,
site_id,
"SELF_SUSTAIN",
"system:mismatch",
None,
reason,
)
logger.critical("Site %s switched to SELF_SUSTAIN: %s", site_id, reason)
def _modbus_cmd_register(cmd: Any) -> int:
"""asyncpg.Record má __getitem__; objekty s atributem .register též (testy)."""
try:
return int(cmd["register"])
except (KeyError, TypeError):
return int(cmd.register)
def _deye_expected_clock_triplet_for_verify(
bundle: list[asyncpg.Record],
last_verified: dict[int, int],
a62: int,
a63: int,
a64: int,
) -> tuple[int, int, int]:
"""
Sestaví očekávané (w62,w63,w64) pro toleranční verify.
Chybějící registry doplní poslední verified nebo aktuálním přečtením ze zařízení
(aby osiřelý zápis např. jen 64 nešel do striktního porovnání reg64).
"""
by_reg = {_modbus_cmd_register(c): c for c in bundle}
def _vtw(c: Any) -> int:
try:
return int(c["value_to_write"])
except (KeyError, TypeError):
return int(c.value_to_write)
w62 = _vtw(by_reg[62]) if 62 in by_reg else last_verified.get(62, a62)
w63 = _vtw(by_reg[63]) if 63 in by_reg else last_verified.get(63, a63)
w64 = _vtw(by_reg[64]) if 64 in by_reg else last_verified.get(64, a64)
return (int(w62), int(w63), int(w64))
async def _verify_deye_clock_written_bundle(
site_id: int,
bundle: list[asyncpg.Record],
a62: int,
a63: int,
a64: int,
db: asyncpg.Connection,
) -> bool:
"""
Toleranční ověření pro jeden až tři řádky journalu 6264 ve stavu written.
Při mismatch retry společně; bez přepnutí do SELF_SUSTAIN po 3 pokusech.
"""
from services.notification_service import (
notify_modbus_clock_verify_exhausted,
notify_modbus_mismatch,
)
cmds_s = sorted(bundle, key=_modbus_cmd_register)
try:
asset_id = int(cmds_s[0]["asset_id"])
except (KeyError, TypeError):
asset_id = int(cmds_s[0].asset_id)
last_v = await _fetch_last_verified_inverter_registers(site_id, asset_id, db)
w62, w63, w64 = _deye_expected_clock_triplet_for_verify(bundle, last_v, a62, a63, a64)
clock_ok = _deye_clock_registers_verify_match(w62, w63, w64, a62, a63, a64)
actual_by_reg = {62: a62, 63: a63, 64: a64}
for cmd in cmds_s:
try:
cid = int(cmd["id"])
except (KeyError, TypeError):
cid = int(cmd.id)
r = _modbus_cmd_register(cmd)
await db.execute(
"""
UPDATE ems.modbus_command
SET value_verified=$1::int, verified_at=now(),
status=CASE WHEN $2::boolean THEN 'verified' ELSE 'mismatch' END
WHERE id=$3::int
""",
actual_by_reg[r],
clock_ok,
cid,
)
if clock_ok:
await db.execute(
"""
UPDATE ems.asset_inverter
SET deye_last_system_time_sync_minute = $1,
deye_last_system_time_sync_at = now()
WHERE id = $2
""",
_prague_minute_start_utc(),
asset_id,
)
for cmd in cmds_s:
try:
cid_l = int(cmd["id"])
except (KeyError, TypeError):
cid_l = int(cmd.id)
try:
code_l = str(cmd["asset_code"])
except (KeyError, TypeError):
code_l = str(cmd.asset_code)
rr = _modbus_cmd_register(cmd)
logger.info(
"[cmd %s] verified OK (clock tolerant): %s 0x%04X=%s",
cid_l,
code_l,
rr,
actual_by_reg[rr],
)
return True
cmd0 = cmds_s[0]
try:
ac0 = str(cmd0["asset_code"])
except (KeyError, TypeError):
ac0 = str(cmd0.asset_code)
logger.error(
"[cmd clock] MISMATCH %s 6264: written=(%s,%s,%s) actual=(%s,%s,%s)",
ac0,
w62,
w63,
w64,
a62,
a63,
a64,
)
attempts = 0
for cmd in cmds_s:
try:
cid_q = int(cmd["id"])
except (KeyError, TypeError):
cid_q = int(cmd.id)
row_ac = await db.fetchrow(
"SELECT attempt_count FROM ems.modbus_command WHERE id=$1", cid_q
)
ac = int(row_ac["attempt_count"] or 0) if row_ac else 0
attempts = max(attempts, ac)
await notify_modbus_mismatch(
db,
site_id,
ac0,
62,
"system_time_62_64",
w62,
a62,
attempts,
)
ids_ordered = []
for c in cmds_s:
try:
ids_ordered.append(int(c["id"]))
except (KeyError, TypeError):
ids_ordered.append(int(c.id))
if attempts < 3:
for cid in ids_ordered:
await db.execute(
"UPDATE ems.modbus_command SET status='retrying' WHERE id=$1",
cid,
)
await execute_modbus_commands(ids_ordered, db)
await verify_modbus_commands(ids_ordered, db, site_id)
else:
logger.critical(
"[cmd clock] 3 failed verify attempts (6264); režim se nemění automaticky"
)
site = await db.fetchrow("SELECT code FROM ems.site WHERE id=$1", site_id)
await notify_modbus_clock_verify_exhausted(
db,
site_id,
site["code"] if site else str(site_id),
ac0,
(w62, w63, w64),
(a62, a63, a64),
)
return False
async def verify_modbus_commands(
command_ids: list[int],
db: asyncpg.Connection,
site_id: int,
) -> bool:
"""
Přečte registry zpět (FC 3 po souvislých blocích) a porovná s value_to_write.
Při mismatch: retry (až 3×). Po vyčerpání pokusů u kritických registrů (108, 109, 142, 143, 145)
→ SELF_SUSTAIN + Discord; u „soft“ (178, TOU power W) jen log + Discord, režim se nemění.
"""
from services.notification_service import notify_modbus_mismatch
inv_cfg = await _load_inverter_config(site_id, db)
async def _apply_verify_result(
cmd: asyncpg.Record,
actual_i: int,
*,
client: Any,
unit: int,
) -> bool:
"""Vrátí True při shodě, False při mismatch (a obslouží retry / SELF_SUSTAIN)."""
reg = int(cmd["register"])
cmd_id = int(cmd["id"])
if reg in DEYE_CLOCK_REGS:
asset_id = int(cmd["asset_id"])
host = str(cmd["device_host"])
port_i = int(cmd["device_port"])
uid = int(cmd["device_unit_id"])
bundle = await _fetch_written_deye_clock_commands(
site_id, asset_id, host, port_i, uid, db
)
if not bundle:
bundle = [cmd]
try:
cvals = await client.read_holding_registers(62, 3, uid)
except Exception as e:
logger.error(
"verify clock guard read 6264 failed (reg 0x%04X): %s", reg, e
)
return False
if len(cvals) != 3:
logger.error(
"verify clock guard: expected 3 regs, got %s", len(cvals)
)
return False
logger.warning(
"Clock register 0x%04X reached strict verify path; using tolerant 6264 bundle",
reg,
)
return await _verify_deye_clock_written_bundle(
site_id,
bundle,
int(cvals[0]),
int(cvals[1]),
int(cvals[2]),
db,
)
expected_i = int(cmd["value_to_write"])
matches = actual_i == expected_i
if reg == 178:
first_178 = int(actual_i)
second_178: int | None = None
if not _deye_reg178_verify_match(expected_i, first_178):
try:
r178 = await client.read_holding_registers(178, 1, unit)
if r178 and len(r178) >= 1:
second_178 = int(r178[0])
except Exception as e:
logger.warning(
"[cmd %s] reg178 double-read failed: %s", cmd_id, e
)
matches, actual_i = _deye_reg178_verify_with_double_read(
expected_i, first_178, second_178
)
if (
matches
and second_178 is not None
and not _deye_reg178_verify_match(expected_i, first_178)
):
logger.info(
"[cmd %s] reg178 double-read recovered: first=%s second=%s",
cmd_id,
first_178,
second_178,
)
if not matches and reg in DEYE_TOU_POWER_REGS and inv_cfg is not None:
matches = _deye_tou_power_verify_match(expected_i, actual_i, inv_cfg)
await db.execute(
"""
UPDATE ems.modbus_command
SET value_verified=$1::int, verified_at=now(),
status=CASE WHEN $2::boolean THEN 'verified' ELSE 'mismatch' END
WHERE id=$3::int
""",
actual_i,
matches,
cmd_id,
)
if not matches:
logger.error(
"[cmd %s] MISMATCH %s 0x%04X: expected=%s actual=%s%s",
cmd_id,
cmd["asset_code"],
reg,
expected_i,
actual_i,
(
" (reg178 mask 0x%04X)" % REG178_VERIFY_MASK
if reg == 178
else ""
),
)
row_ac = await db.fetchrow(
"SELECT attempt_count FROM ems.modbus_command WHERE id=$1", cmd_id
)
attempts = int(row_ac["attempt_count"] or 0) if row_ac else 0
await notify_modbus_mismatch(
db,
site_id,
cmd["asset_code"],
reg,
cmd["register_name"] or "",
expected_i,
actual_i,
attempts,
)
if attempts < 3:
await db.execute(
"UPDATE ems.modbus_command SET status='retrying' WHERE id=$1",
cmd_id,
)
await execute_modbus_commands([cmd_id], db)
await verify_modbus_commands([cmd_id], db, site_id)
else:
if deye_reg_triggers_self_sustain_after_verify_exhaust(reg):
logger.critical(
"[cmd %s] 3 failed attempts, switching to SELF_SUSTAIN",
cmd_id,
)
await _switch_to_self_sustain(
site_id,
db,
reason=(
f"Modbus mismatch po 3 pokusech: {cmd['asset_code']} "
f"reg 0x{reg:04X}"
),
)
else:
logger.warning(
"[cmd %s] 3 failed verify attempts on non-critical reg 0x%04X "
"(no mode change): %s",
cmd_id,
reg,
cmd["asset_code"],
)
return False
if reg == 178 and actual_i != expected_i:
logger.info(
"[cmd %s] verified OK (reg178 masked): %s 0x%04X value_to_write=%s actual=%s",
cmd_id,
cmd["asset_code"],
reg,
expected_i,
actual_i,
)
else:
logger.info(
"[cmd %s] verified OK: %s 0x%04X=%s",
cmd_id,
cmd["asset_code"],
reg,
actual_i,
)
return True
cmds: list[asyncpg.Record] = []
for cmd_id in command_ids:
cmd = await db.fetchrow(
"SELECT * FROM ems.modbus_command WHERE id=$1", cmd_id
)
if cmd is not None and cmd["status"] == "written":
cmds.append(cmd)
if not cmds:
return True
by_gw: dict[tuple[str, int, int], list[asyncpg.Record]] = defaultdict(list)
for cmd in cmds:
by_gw[
(cmd["device_host"], int(cmd["device_port"]), int(cmd["device_unit_id"]))
].append(cmd)
all_ok = True
for (host, port, unit), group in by_gw.items():
client = await get_modbus_client(host, port)
clock_cmds = [c for c in group if int(c["register"]) in DEYE_CLOCK_REGS]
rest = [c for c in group if int(c["register"]) not in DEYE_CLOCK_REGS]
if clock_cmds:
asset_id = int(clock_cmds[0]["asset_id"])
bundle = await _fetch_written_deye_clock_commands(
site_id, asset_id, host, port, unit, db
)
if not bundle:
bundle = clock_cmds
try:
cvals = await client.read_holding_registers(62, 3, unit)
except Exception as e:
logger.error("verify clock read 6264 failed: %s", e)
all_ok = False
else:
if len(cvals) != 3:
logger.error(
"verify clock read: expected 3 regs, got %s", len(cvals)
)
all_ok = False
else:
matched = await _verify_deye_clock_written_bundle(
site_id,
bundle,
int(cvals[0]),
int(cvals[1]),
int(cvals[2]),
db,
)
if not matched:
all_ok = False
for run in _modbus_command_contiguous_runs(rest):
start_reg = int(run[0]["register"])
n = len(run)
try:
values = await client.read_holding_registers(start_reg, n, unit)
except Exception as e:
logger.error(
"verify batch read 0x%04X count=%s failed: %s", start_reg, n, e
)
all_ok = False
continue
if len(values) != n:
logger.error(
"verify read 0x%04X: expected %s regs, got %s",
start_reg,
n,
len(values),
)
all_ok = False
continue
for cmd, actual in zip(run, values):
matched = await _apply_verify_result(
cmd, int(actual), client=client, unit=unit
)
if not matched:
all_ok = False
return all_ok
async def _fetch_operating_mode(site_id: int, db: asyncpg.Connection) -> OperatingModeInfo | None:
sql = """
SELECT som.mode_code, omd.battery_mode, omd.grid_mode,
omd.ev_enabled, omd.heat_pump_enabled, omd.loxone_mode_value,
som.valid_until
FROM ems.site_operating_mode som
JOIN ems.operating_mode_def omd ON omd.code = som.mode_code
WHERE som.site_id = $1
"""
row = await db.fetchrow(sql, site_id)
if row is None:
return None
vu = row["valid_until"]
if vu is not None:
now_utc = datetime.now(timezone.utc)
if vu.tzinfo is None:
vu = vu.replace(tzinfo=timezone.utc)
if vu <= now_utc:
exp_rows = await db.fetch("SELECT * FROM ems.fn_expire_modes()")
from services.notification_service import notify_operating_mode_changed
for er in exp_rows:
await notify_operating_mode_changed(
str(er["site_code"]),
str(er["old_mode"]),
str(er["new_mode"]),
"system:expiry",
"Automatické vypršení dočasného režimu",
)
row = await db.fetchrow(sql, site_id)
if row is None:
return None
return OperatingModeInfo(
mode_code=row["mode_code"],
battery_mode=row["battery_mode"],
grid_mode=row["grid_mode"],
ev_enabled=bool(row["ev_enabled"]),
heat_pump_enabled_def=bool(row["heat_pump_enabled"]),
loxone_mode_value=int(row["loxone_mode_value"]),
)
async def _get_current_soc(site_id: int, db: asyncpg.Connection) -> int:
soc = await db.fetchval(
"""
SELECT battery_soc_percent
FROM ems.telemetry_inverter
WHERE site_id = $1 AND battery_soc_percent IS NOT NULL
ORDER BY measured_at DESC
LIMIT 1
""",
site_id,
)
return int(soc) if soc is not None else 50
async def _load_inverter_config(
site_id: int, db: asyncpg.Connection
) -> InverterConfig | None:
row = await db.fetchrow(
"""
SELECT
ai.id, ai.code,
coalesce(ems.fn_inverter_pv_a_max_w(ai.id), 0) AS pv_a_cap_w,
se.host, se.port, se.unit_id,
sgc.max_export_power_w,
sgc.max_import_power_w,
sgc.no_export,
ai.max_battery_charge_w,
ai.max_battery_discharge_w,
ab.min_soc_percent,
ab.reserve_soc_percent,
ab.max_soc_percent,
ab.usable_capacity_wh,
ai.deye_last_system_time_sync_minute,
ai.deye_last_system_time_sync_at,
ai.deye_last_tou_inactive_write_prague_date,
ai.deye_tou_inactive_signature,
COALESCE(ai.deye_zero_export_mode, 1) AS deye_zero_export_mode,
COALESCE(ai.deye_gen_microinverter_cutoff_enabled, false) AS deye_gen_microinverter_cutoff_enabled,
coalesce(ems.fn_site_has_active_green_bonus_pv(ai.site_id), false)
AS deye_reg340_pv_a_control_enabled,
COALESCE(
ai.deye_register_max_charge_a,
FLOOR(
LEAST(
COALESCE(ab.bms_max_charge_w, ai.max_battery_charge_w),
ai.max_battery_charge_w
)::numeric / 51.2
)::int
) AS max_charge_a,
COALESCE(
ai.deye_register_max_discharge_a,
FLOOR(
LEAST(
COALESCE(ab.bms_max_discharge_w, ai.max_battery_discharge_w),
ai.max_battery_discharge_w
)::numeric / 51.2
)::int
) AS max_discharge_a
FROM ems.asset_inverter ai
JOIN ems.site_endpoint se ON se.id = ai.endpoint_id
JOIN ems.asset_battery ab ON ab.inverter_id = ai.id
LEFT JOIN ems.site_grid_connection sgc ON sgc.site_id = ai.site_id
WHERE ai.site_id = $1
AND ai.active = true
AND ai.controllable = true
AND se.enabled = true
AND se.endpoint_type = 'modbus_tcp'
ORDER BY ai.id
LIMIT 1
""",
site_id,
)
if row is None:
return None
mc = row["max_charge_a"]
md = row["max_discharge_a"]
max_charge_a = int(mc) if mc is not None else 0
max_discharge_a = int(md) if md is not None else 0
# Firmware Deye často drží max 350 A — vyšší hodnota z DB → mismatch 351 vs 350.
max_charge_a = min(max_charge_a, DEYE_LV_BATTERY_MAX_CHARGE_DISCHARGE_A)
max_discharge_a = min(max_discharge_a, DEYE_LV_BATTERY_MAX_CHARGE_DISCHARGE_A)
port = int(row["port"] or 502)
uid = int(row["unit_id"] if row["unit_id"] is not None else 1)
return InverterConfig(
id=int(row["id"]),
code=row["code"],
host=row["host"],
port=port,
unit_id=uid,
max_export_power_w=int(row["max_export_power_w"])
if row["max_export_power_w"] is not None
else None,
max_import_power_w=int(row["max_import_power_w"])
if row["max_import_power_w"] is not None
else None,
no_export=bool(row["no_export"] or False),
max_battery_charge_w=int(row["max_battery_charge_w"])
if row["max_battery_charge_w"] is not None
else None,
max_battery_discharge_w=int(row["max_battery_discharge_w"])
if row["max_battery_discharge_w"] is not None
else None,
min_soc_percent=int(round(float(row["min_soc_percent"])))
if row["min_soc_percent"] is not None
else None,
reserve_soc_percent=int(row["reserve_soc_percent"])
if row["reserve_soc_percent"] is not None
else None,
max_soc_percent=int(row["max_soc_percent"])
if row["max_soc_percent"] is not None
else None,
usable_capacity_wh=int(row["usable_capacity_wh"])
if row["usable_capacity_wh"] is not None
else None,
max_charge_a=max_charge_a,
max_discharge_a=max_discharge_a,
deye_last_system_time_sync_minute=row["deye_last_system_time_sync_minute"],
deye_last_system_time_sync_at=row["deye_last_system_time_sync_at"],
deye_last_tou_inactive_write_prague_date=row[
"deye_last_tou_inactive_write_prague_date"
],
deye_tou_inactive_signature=row["deye_tou_inactive_signature"],
deye_zero_export_mode=int(row["deye_zero_export_mode"]),
deye_gen_microinverter_cutoff_enabled=bool(row["deye_gen_microinverter_cutoff_enabled"] or False),
pv_a_cap_w=int(row["pv_a_cap_w"] or 0),
deye_reg340_pv_a_control_enabled=bool(
row["deye_reg340_pv_a_control_enabled"] or False
),
)
async def _fetch_plan_row_for_slot_offset(
site_id: int, db: asyncpg.Connection, slot_offset: int
) -> asyncpg.Record | None:
"""Řádek plánu pro slot z ems.fn_planning_interval_at_offset (jsonb → Record-like dict)."""
raw = await db.fetchval(
"""
select ems.fn_planning_interval_at_offset($1::int, $2::int)
""",
site_id,
slot_offset,
)
if raw is None:
return None
data = raw if isinstance(raw, dict) else json.loads(raw)
if not data:
return None
return _DictRecord(data)
async def _fetch_max_charge_power_w(site_id: int, db: asyncpg.Connection) -> int:
v = await db.fetchval(
"select ems.fn_planning_max_effective_charge_w($1::int)",
site_id,
)
return int(v or 0)
async def write_inverter_setpoints(
site_id: int,
setpoints_now: ControlSetpoints,
setpoints_next: ControlSetpoints | None,
db: asyncpg.Connection,
planning_run_id: int | None = None,
) -> str:
inv = await _load_inverter_config(site_id, db)
if inv is None:
return "FAIL inverter: no controllable Modbus endpoint"
unit_id = int(inv.unit_id if inv.unit_id is not None else 1)
raw_bat = setpoints_now.battery_w
grid_w = int(setpoints_now.grid_setpoint_w or 0)
no_export = inv.no_export
export_lim = _deye_reg143_export_w(no_export, inv.max_export_power_w)
max_batt_w_discharge = int(inv.max_discharge_a * BATT_VOLTAGE_V)
tp_discharge_w = 0 if setpoints_now.lock_battery else max_batt_w_discharge
tou_min_pct = _deye_tou_min_soc_pct(inv)
tou_reserve_pct = _deye_tou_reserve_soc_pct(inv)
try:
soc_telemetry = await _get_current_soc(site_id, db)
deye_mode = get_deye_mode(setpoints_now)
bat_w = int(raw_bat) if raw_bat is not None else 0
if setpoints_now.lock_battery:
charge_a = 0
discharge_a = 0
elif deye_mode == "CHARGE":
charge_a = battery_watts_to_amps(bat_w, inv.max_charge_a)
discharge_a = 0
elif deye_mode == "SELL":
# Záměrný výdej baterie do sítě: plný vybíjecí proud; export strop dle plánu níže.
charge_a = 0
discharge_a = int(inv.max_discharge_a)
elif setpoints_now.self_sustain_local_use:
# SELF_SUSTAIN: plný nabíjecí i vybíjecí proud invertoru — přebytek FVE jde do baterie,
# reg. 142 = zero export to load/CT (viz selling_mode níže), ne reg. 108 = 0.
charge_a = int(inv.max_charge_a)
discharge_a = int(inv.max_discharge_a)
else:
# PASSIVE (ZERO): výchozí plné 108/109; u přetoku FVE do sítě nebo importu bez baterie viz helper.
charge_a, discharge_a = _deye_zero_export_amps_for_passive(
grid_w,
bat_w,
int(inv.max_charge_a),
int(inv.max_discharge_a),
)
zero_exp_mode = int(inv.deye_zero_export_mode or 1)
selling_mode = 0 if deye_mode == "SELL" else zero_exp_mode
solar_sell = 0 if (setpoints_now.export_ban and deye_mode != "SELL") else 1
export_limit = export_lim
if deye_mode == "SELL" and grid_w < 0:
export_limit = min(export_lim, max(REG143_SELL_CAP_MIN_W, abs(grid_w)))
reg178_val = REG178_SELL if deye_mode == "SELL" else REG178_PASSIVE
logger.info(
f"[control] site={site_id} fyzický režim Deye: {deye_mode} | "
f"battery_w={raw_bat!r} grid_w={grid_w} | "
f"charge_a={charge_a} discharge_a={discharge_a} | "
f"reg142={selling_mode} reg145={solar_sell} reg143={export_limit}W reg178={reg178_val}"
)
now_cet, time_rows = _deye_system_time_register_rows()
skip_time = False
try:
mb_clock = await get_modbus_client(inv.host, inv.port)
tvals = await mb_clock.read_holding_registers(
62, 3, int(inv.unit_id if inv.unit_id is not None else 1)
)
if len(tvals) == 3:
skip_time = _deye_should_skip_time_sync_after_read(
inv, int(tvals[0]), int(tvals[1]), int(tvals[2])
)
else:
logger.warning(
"Deye clock read: expected 3 registers, got %s; will sync 6264",
len(tvals),
)
except Exception as e:
logger.warning("Deye clock read failed (will sync 6264): %s", e)
if skip_time:
logger.info(
"Deye clock 6264 skipped (drift ≤ %ss, last sync < %sh ago): %s CET",
DEYE_CLOCK_DRIFT_OK_SEC,
DEYE_CLOCK_RESYNC_INTERVAL_HOURS,
now_cet.strftime("%Y-%m-%d %H:%M:%S"),
)
else:
logger.info("Deye time will sync: %s CET", now_cet.strftime("%Y-%m-%d %H:%M:%S"))
registers: list[tuple[int, str, int]] = [] if skip_time else list(time_rows)
sp_tp2 = setpoints_next if setpoints_next is not None else setpoints_now
hh_cur = current_slot_hhmm()
hh_nxt = next_slot_hhmm()
p1, s1, g1 = _deye_tou_params(setpoints_now, inv)
p2, s2, g2 = _deye_tou_params(sp_tp2, inv)
registers.extend(_deye_time_point_rows(0, hh_cur, p1, s1, g1))
registers.extend(_deye_time_point_rows(1, hh_nxt, p2, s2, g2))
prague_date = datetime.now(PRAGUE_TZ).date()
inactive_sig = (
f"{DEYE_TOU_INACTIVE_HHMM}|{tou_min_pct}|{tou_reserve_pct}|{tp_discharge_w}"
)
need_inactive_tou = (
inv.deye_last_tou_inactive_write_prague_date != prague_date
or inv.deye_tou_inactive_signature != inactive_sig
)
if need_inactive_tou:
for idx in range(2, 6):
registers.extend(
_deye_time_point_rows(
idx, DEYE_TOU_INACTIVE_HHMM, tp_discharge_w, tou_min_pct, False
)
)
else:
logger.debug(
"Deye TOU rows 36 skipped (already written today, signature unchanged)"
)
registers.extend(
[
(108, "", charge_a),
(109, "", discharge_a),
(141, "energy_mode (0)", 0),
(142, "limit_control", selling_mode),
(143, "", export_limit),
(145, "solar_sell", solar_sell),
]
)
if (
bool(inv.deye_reg340_pv_a_control_enabled)
and int(inv.pv_a_cap_w) > 0
and setpoints_now.pv_a_allowed_w is not None
):
registers.append((340, "max_solar_power_w", int(setpoints_now.pv_a_allowed_w)))
# Reg 178: bitové pole. Nastavujeme bits45 (peak shaving) vždy; bits01 (MI export cutoff) jen pokud feature.
# Ostatní bity musí zůstat zachované → read-modify-write.
try:
mb178 = await get_modbus_client(inv.host, inv.port)
r178 = await mb178.read_holding_registers(178, 1, unit_id)
if not r178 or len(r178) < 1:
raise OSError(f"reg178 read returned {len(r178) if r178 is not None else None} values")
current_178 = int(r178[0])
peak_bits = int(reg178_val) & int(REG178_VERIFY_MASK)
if inv.deye_gen_microinverter_cutoff_enabled:
want_cutoff = bool(setpoints_now.deye_gen_cutoff_enabled) and deye_mode != "SELL"
mi_bits = (
REG178_MI_EXPORT_ENABLE if want_cutoff else REG178_MI_EXPORT_DISABLE
)
else:
mi_bits = int(current_178) & int(REG178_MI_EXPORT_MASK)
new_178 = (
(int(current_178) & ~int(REG178_VERIFY_MASK_COMBINED))
| int(peak_bits)
| int(mi_bits)
)
registers.append((178, "control_board_special_1", int(new_178)))
logger.info(
"[control] %s: reg178 (control_board_special_1) old=%s new=%s peak_bits=0x%04X mi_bits=%s",
inv.code,
current_178,
new_178,
int(peak_bits),
int(mi_bits),
)
except Exception as e:
logger.warning("[control] %s: reg178 RMW failed (skip reg178 write): %s", inv.code, e)
logger.info(
"[control] %s: deye_mode=%s charge=%sA discharge=%sA "
"reg142=%s reg145=%s export=%sW "
"tp1=%s tp2=%s soc=%s%% (batt=%r grid=%sW)",
inv.code,
deye_mode,
charge_a,
discharge_a,
selling_mode,
solar_sell,
export_limit,
hh_cur,
hh_nxt,
soc_telemetry,
raw_bat,
grid_w,
)
last_verified = await _fetch_last_verified_inverter_registers(site_id, inv.id, db)
registers, skipped_unchanged = _drop_registers_matching_last_verified(
registers, last_verified
)
if skipped_unchanged:
logger.info(
"[control] %s: skip %s registers (value equals last verified): %s",
inv.code,
len(skipped_unchanged),
skipped_unchanged[:24],
)
if not registers:
logger.info(
"[control] %s: all Deye holding regs match last verified, no Modbus write",
inv.code,
)
if need_inactive_tou:
await db.execute(
"""
UPDATE ems.asset_inverter
SET deye_last_tou_inactive_write_prague_date = $1,
deye_tou_inactive_signature = $2
WHERE id = $3
""",
prague_date,
inactive_sig,
inv.id,
)
return (
f"OK inverter: batt_w={raw_bat!r} (no changes vs last verified Modbus snapshot)"
)
will_write_inactive = any(
int(r) in _DEYE_INACTIVE_TOU_REGISTERS for r, _, _ in registers
)
cmd_ids = await create_modbus_commands(
site_id,
planning_run_id,
"inverter",
inv.id,
inv.code,
inv.host,
inv.port,
inv.unit_id,
registers,
db,
deye_physical_mode=deye_mode,
)
if not await execute_modbus_commands(cmd_ids, db):
return f"FAIL inverter: {inv.code}: Modbus write failed (see modbus_command)"
logger.info("[control] Inverter %s journal write OK", inv.code)
will_write_time = any(int(r) in DEYE_CLOCK_REGS for r, _, _ in registers)
if will_write_time:
await db.execute(
"""
UPDATE ems.asset_inverter
SET deye_last_system_time_sync_minute = $1,
deye_last_system_time_sync_at = now()
WHERE id = $2
""",
_prague_minute_start_utc(),
inv.id,
)
if need_inactive_tou or will_write_inactive:
await db.execute(
"""
UPDATE ems.asset_inverter
SET deye_last_tou_inactive_write_prague_date = $1,
deye_tou_inactive_signature = $2
WHERE id = $3
""",
prague_date,
inactive_sig,
inv.id,
)
except Exception as e:
return f"FAIL inverter: {inv.code}: {e}"
return (
f"OK inverter: batt_w={raw_bat!r} "
f"(time points + FC 0x10: 108/109/141/142/178/143/145/340 dle plánu)"
)
async def read_deye_registers_live(site_id: int, db: asyncpg.Connection) -> dict[str, Any]:
"""
Živé čtení holding registrů Deye 108, 109, 141145, 178, 191 a volitelně 340
(jen pokud `deye_reg340_pv_a_control_enabled`, jinak `reg340_max_solar_power_w` = null).
Vše pod jedním mutexem + sdružené FC3 bloky — mezi jednotlivými read_register dřív telemetrie
střídavě brala lock a RS485 brány házely cizí transaction_id / I/O timeouty.
"""
inv = await _load_inverter_config(site_id, db)
if inv is None:
raise ValueError("no controllable Modbus inverter for site")
uid = int(inv.unit_id)
client = await get_modbus_client(inv.host, inv.port)
read_at = datetime.now(timezone.utc)
try:
async with client.batch(uid) as mb:
b108 = await mb.read_holding_registers(108, 2)
b141 = await mb.read_holding_registers(141, 5)
r178 = await mb.read_holding_registers(178, 1)
r191 = await mb.read_holding_registers(191, 1)
if inv.deye_reg340_pv_a_control_enabled:
r340 = await mb.read_holding_registers(340, 1)
else:
r340 = None
r108, r109 = b108[0], b108[1]
r141, r142, r143 = b141[0], b141[1], b141[2]
r145 = b141[4]
r178 = r178[0]
r191 = r191[0]
r340v = (
int(r340[0])
if r340 is not None and len(r340) >= 1
else None
)
except Exception:
logger.exception("read_deye_registers_live site=%s failed", site_id)
raise
return {
"reg108_charge_a": int(r108),
"reg109_discharge_a": int(r109),
"reg141_energy_mode": int(r141),
"reg142_limit_control": int(r142),
"reg143_export_limit_w": int(r143),
"reg145_solar_sell": int(r145),
"reg178_peak_shaving_switch": int(r178),
"reg178_control_board_special_1": int(r178),
"reg178_mi_export_cutoff_bits": int(r178) & int(REG178_MI_EXPORT_MASK),
"reg178_mi_export_cutoff_is_on": (int(r178) & int(REG178_MI_EXPORT_MASK)) == int(REG178_MI_EXPORT_ENABLE),
"reg191_peak_shaving_w": int(r191),
"reg340_max_solar_power_w": r340v,
"read_at": read_at.isoformat(),
}
def _current_limit_for_charger(charger_code: str, sp: ControlSetpoints) -> int:
c = (charger_code or "").strip().lower()
if c == "ev-charger-1":
a = sp.ev1_current_a
elif c == "ev-charger-2":
a = sp.ev2_current_a
elif c.endswith("-1") or c == "ev1":
a = sp.ev1_current_a
elif c.endswith("-2") or c == "ev2":
a = sp.ev2_current_a
else:
a = 0
if a < 6:
a = 0
return a
async def write_ev_setpoints(site_id: int, setpoints: ControlSetpoints, db: asyncpg.Connection) -> str:
rows = await db.fetch(
"""
SELECT ec.code, se.host, se.port, se.unit_id
FROM ems.asset_ev_charger ec
JOIN ems.site_endpoint se ON se.id = ec.endpoint_id
WHERE ec.site_id = $1
AND ec.schedulable = true
AND se.enabled = true
AND se.endpoint_type = 'modbus_tcp'
ORDER BY ec.code
""",
site_id,
)
if not rows:
return "OK EV: no schedulable chargers"
for row in rows:
code = row["code"]
current_a = _current_limit_for_charger(code, setpoints)
logger.info(
"EV setpoint [%s]: %sA (TODO: Modbus registers)",
code,
current_a,
)
return f"OK EV: logged {len(rows)} charger(s) (Modbus TODO)"
async def write_heat_pump_setpoint(site_id: int, setpoints: ControlSetpoints, db: asyncpg.Connection) -> str:
rows = await db.fetch(
"""
SELECT hp.code, se.host, se.port, se.unit_id
FROM ems.asset_heat_pump hp
JOIN ems.site_endpoint se ON se.id = hp.endpoint_id
WHERE hp.site_id = $1
AND hp.schedulable = true
AND se.enabled = true
AND se.endpoint_type = 'modbus_tcp'
""",
site_id,
)
if not rows:
return "OK heat pump: no schedulable unit"
for row in rows:
logger.info(
"HP setpoint [%s]: enable=%s (TODO: Modbus registers)",
row["code"],
setpoints.heat_pump_enable,
)
return "OK heat pump: logged (Modbus TODO)"
async def send_loxone_setpoints(
site_id: int,
setpoints: ControlSetpoints,
mode: OperatingModeInfo,
db: asyncpg.Connection,
) -> str:
endpoint = await db.fetchrow(
"""
SELECT host, port, protocol
FROM ems.site_endpoint
WHERE site_id = $1 AND endpoint_type = 'loxone_http' AND enabled = true
ORDER BY id
LIMIT 1
""",
site_id,
)
if not endpoint:
return "OK Loxone: no endpoint, skipped"
proto = (endpoint["protocol"] or "http").lower()
if proto not in ("http", "https"):
proto = "http"
host = endpoint["host"]
port = int(endpoint["port"] or (443 if proto == "https" else 80))
base = f"{proto}://{host}:{port}/dev/sps/io"
settings = get_settings()
user = settings.loxone_user or os.getenv("LOXONE_USER") or ""
password = settings.loxone_password or os.getenv("LOXONE_PASSWORD") or ""
auth = (user, password) if user else None
batt_display = 0 if setpoints.battery_w is None else int(setpoints.battery_w)
paths: list[tuple[str, int]] = [
(f"{base}/EMS_Mode/{mode.loxone_mode_value}", mode.loxone_mode_value),
(f"{base}/EMS_Battery_Setpoint_W/{batt_display}", batt_display),
(f"{base}/EMS_Grid_Setpoint_W/{setpoints.grid_setpoint_w}", setpoints.grid_setpoint_w),
(f"{base}/EMS_EV1_Power_W/{setpoints.ev1_power_w}", setpoints.ev1_power_w),
(f"{base}/EMS_EV2_Power_W/{setpoints.ev2_power_w}", setpoints.ev2_power_w),
(f"{base}/EMS_HeatPump_Enable/{1 if setpoints.heat_pump_enable else 0}", 1 if setpoints.heat_pump_enable else 0),
]
errs: list[str] = []
try:
async with httpx.AsyncClient(timeout=5.0) as client:
for url, _ in paths:
try:
r = await client.get(url, auth=auth)
r.raise_for_status()
except Exception as e:
errs.append(f"{url!s}: {e}")
except Exception as e:
return f"FAIL Loxone: client {e}"
if errs:
return "FAIL Loxone: " + "; ".join(errs[:3])
return "OK Loxone: all virtual inputs updated"
async def export_setpoints(site_id: int, db: asyncpg.Connection) -> None:
mode = await _fetch_operating_mode(site_id, db)
if mode is None:
logger.warning("control export site=%s: no operating mode row", site_id)
return
if mode.mode_code == "MANUAL":
logger.info("control export site=%s: MANUAL, skip writes", site_id)
return
try:
inv_for_pv = await _load_inverter_config(site_id, db)
cap_pv = int(inv_for_pv.pv_a_cap_w) if inv_for_pv is not None else 0
reg340_en = (
bool(inv_for_pv.deye_reg340_pv_a_control_enabled)
if inv_for_pv is not None
else False
)
pi_now = await _fetch_plan_row_for_slot_offset(site_id, db, 0)
pi_next = await _fetch_plan_row_for_slot_offset(site_id, db, 1)
sp_now = _build_setpoints(
mode,
pi_now,
pv_a_cap_w=cap_pv,
reg340_pv_a_control_enabled=reg340_en,
)
sp_next = _build_setpoints(
mode,
pi_next,
pv_a_cap_w=cap_pv,
reg340_pv_a_control_enabled=reg340_en,
)
if mode.mode_code == "AUTO" and sp_now is None:
if pi_now is None:
logger.warning(
"control export site=%s: AUTO but no planning_interval for current slot, skip",
site_id,
)
return
if sp_now is None:
logger.warning(
"control export site=%s: no setpoints for mode %s, skip",
site_id,
mode.mode_code,
)
return
if mode.mode_code == "CHARGE_CHEAP":
max_ch = await _fetch_max_charge_power_w(site_id, db)
# Oba setpointy kladné → get_deye_mode CHARGE; min. 1 W, aby režim nebyl PASSIVE při nulové DB.
pw = max(1, int(max_ch))
sp_now = ControlSetpoints(
battery_w=pw,
grid_export_limit=0,
ev1_current_a=0,
ev2_current_a=0,
heat_pump_enable=False,
grid_setpoint_w=pw,
ev1_power_w=0,
ev2_power_w=0,
target_soc_pct=None,
effective_sell_price_czk_kwh=None,
)
sp_next = sp_now
else:
sp_now = _apply_price_failsafe_guard(site_id, mode, pi_now, sp_now)
if sp_next is not None:
sp_next = _apply_price_failsafe_guard(site_id, mode, pi_next, sp_next)
planning_run_id = await db.fetchval(
"""
SELECT id FROM ems.planning_run
WHERE site_id = $1 AND status = 'active'
ORDER BY created_at DESC
LIMIT 1
""",
site_id,
)
if planning_run_id is not None:
planning_run_id = int(planning_run_id)
try:
inv_res = await write_inverter_setpoints(
site_id, sp_now, sp_next, db, planning_run_id=planning_run_id
)
except Exception as e:
logger.error("inverter write failed: %s", e)
inv_res = f"FAIL inverter: {e}"
try:
ev_res = await write_ev_setpoints(site_id, sp_now, db)
except Exception as e:
logger.error("ev write failed: %s", e)
ev_res = f"FAIL ev: {e}"
try:
hp_res = await write_heat_pump_setpoint(site_id, sp_now, db)
except Exception as e:
logger.error("hp write failed: %s", e)
hp_res = f"FAIL heat pump: {e}"
try:
lox_res = await send_loxone_setpoints(site_id, sp_now, mode, db)
except Exception as e:
logger.error("loxone write failed: %s", e)
lox_res = f"FAIL Loxone: {e}"
results = list(
zip(
("inverter", "ev", "heat_pump", "loxone"),
(inv_res, ev_res, hp_res, lox_res),
)
)
for name, res in results:
if isinstance(res, Exception):
logger.error("control export site=%s %s: FAIL %s", site_id, name, res)
elif isinstance(res, str) and res.startswith("FAIL"):
logger.error("control export site=%s %s: %s", site_id, name, res)
else:
logger.info("control export site=%s %s: %s", site_id, name, res)
finally:
try:
await enqueue_site_signals(site_id, db)
except Exception as e:
logger.warning(
"control export site=%s: signal enqueue failed: %s", site_id, e
)