2065 lines
74 KiB
Python
2065 lines
74 KiB
Python
"""Export plánovaných setpointů na Modbus (Deye, EV, TČ) a HTTP do Loxone."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import os
|
||
from collections import defaultdict
|
||
from dataclasses import dataclass
|
||
from typing import Any
|
||
from datetime import date, datetime, timedelta, timezone
|
||
from zoneinfo import ZoneInfo
|
||
|
||
import asyncpg
|
||
import httpx
|
||
|
||
from app.config import get_settings
|
||
from services.modbus_client import get_modbus_client
|
||
from services.signal_service import enqueue_site_signals
|
||
|
||
logger = logging.getLogger(__name__)

PRAGUE_TZ = ZoneInfo("Europe/Prague")

# Deye clock registers 62–64: after the write the seconds keep running on the device,
# so verification has to be tolerant.
DEYE_CLOCK_VERIFY_MAX_DELTA_SEC = 120
# Sparser writes: no write while the inverter time deviates from Prague time by no more
# than this many seconds…
DEYE_CLOCK_DRIFT_OK_SEC = 60
# …and this interval has not yet elapsed since the last sync / drift confirmation.
DEYE_CLOCK_RESYNC_INTERVAL_HOURS = 24

# Deye LV battery: power → current conversion for registers 108/109
# (see docs/04-modules/modbus-registers.md)
BATT_VOLTAGE_V = 51.2

# Reg 143 in SELL: min(|grid_setpoint_w|, …) must not drop below this floor (W) —
# because of firmware behaviour, not because of the mode mapping.
REG143_SELL_CAP_MIN_W = 200

# Reg 178 – fixed values (bits 4–5); no read-modify-write (collision with Loxone / transaction ID)
REG178_SELL = 0b00100000  # 32, grid peak shaving disable
REG178_PASSIVE = 0b00110000  # 48, grid peak shaving enable (PASSIVE and CHARGE)
# TOU reg 166+ in PASSIVE with battery priority: tells the inverter to "use the whole available range",
# not the operational ceiling from the DB (that one is for LP / Wh – see asset_battery.max_soc_percent).
DEYE_TOU_SOC_PASSIVE_BATTERY_PRIORITY_PCT = 100
# Verify: only bits 4–5 (upper byte layout in the documentation); the other bits may belong to firmware / Loxone
REG178_VERIFY_MASK = 0x0030

# Reg 179 – Control board special 1: bits 0–1 control the MI export cutoff (AC coupling / GEN).
REG179_MI_EXPORT_MASK = 0x0003
REG179_MI_EXPORT_DISABLE = 0b10
REG179_MI_EXPORT_ENABLE = 0b11
|
||
|
||
def _deye_reg179_verify_match(expected_i: int, actual_i: int) -> bool:
    """Return True when both values agree on the bits selected by REG179_MI_EXPORT_MASK.

    Bits outside the mask are ignored, so only the MI export cutoff field of
    register 179 takes part in the comparison.
    """
    differing_bits = int(expected_i) ^ int(actual_i)
    return (differing_bits & REG179_MI_EXPORT_MASK) == 0
|
||
|
||
# After 3 failed verify attempts → SELF_SUSTAIN only for these registers (safety / export).
# 62–64 are handled by the tolerant bundle (no mode change). 178 and TOU power W are "soft" — log + Discord only.
DEYE_CRITICAL_REGS_SELF_SUSTAIN = frozenset({108, 109, 142, 143, 145})
# TOU power rows (154 + slot_index 0…5) — firmware often overwrites them with the max W derived from max_charge/max_discharge A.
DEYE_TOU_POWER_REGS = frozenset(range(154, 160))
# Deye LV: firmware often rejects 351 A and keeps 350 — upper cap for values written from the DB.
DEYE_LV_BATTERY_MAX_CHARGE_DISCHARGE_A = 350
|
||
|
||
|
||
def _deye_reg178_verify_match(expected_i: int, actual_i: int) -> bool:
    """Return True when both values agree on the bits selected by REG178_VERIFY_MASK.

    All bits outside the mask are ignored by the comparison.
    """
    differing_bits = int(expected_i) ^ int(actual_i)
    return (differing_bits & REG178_VERIFY_MASK) == 0
|
||
|
||
|
||
def deye_reg_triggers_self_sustain_after_verify_exhaust(reg: int) -> bool:
    """Tell whether *reg* is a critical register.

    True means the register is listed in DEYE_CRITICAL_REGS_SELF_SUSTAIN, i.e. the
    site is to be switched to SELF_SUSTAIN once verification mismatched 3 times.
    """
    register_no = int(reg)
    return register_no in DEYE_CRITICAL_REGS_SELF_SUSTAIN
|
||
|
||
|
||
def _deye_tou_power_verify_match(
    expected_i: int, actual_i: int, inv: InverterConfig
) -> bool:
    """Tolerant verify for TOU power rows.

    An exact match is always accepted. Otherwise the read-back value is accepted when
    it equals the wattage derived from ``inv.max_charge_a`` or ``inv.max_discharge_a``
    times BATT_VOLTAGE_V, because the firmware often clamps TOU power to that maximum.
    """
    observed = int(actual_i)
    if observed == int(expected_i):
        return True
    # Multiply by the float voltage first and truncate afterwards; truncating the
    # voltage itself (51) would miss the firmware value 17920 W @ 350 A.
    clamp_candidates = (
        int(inv.max_charge_a * BATT_VOLTAGE_V),
        int(inv.max_discharge_a * BATT_VOLTAGE_V),
    )
    return observed in clamp_candidates
|
||
|
||
|
||
def _deye_reg178_verify_with_double_read(
    expected_i: int, actual_first: int, actual_second: int | None
) -> tuple[bool, int]:
    """Evaluate a reg 178 verify that may have been read twice.

    Returns ``(matched, value_for_journal)``. The second reading is consulted only
    when the first one fails the masked comparison (RS485 / glitch); when it is the
    one that matches, it is also the value returned for the journal. In every other
    case the first reading is returned.
    """
    first_ok = _deye_reg178_verify_match(expected_i, actual_first)
    if not first_ok and actual_second is not None:
        if _deye_reg178_verify_match(expected_i, actual_second):
            return True, int(actual_second)
    return first_ok, actual_first
|
||
|
||
# Inactive TOU blocks (3–6): "end of day" — Deye often does not store 23:59 (2359) and returns e.g. 2355,
# which makes verify report a mismatch. 23:55 is stable on the devices (HHMM as a decimal number).
DEYE_TOU_INACTIVE_HHMM = 2355

# Registers of TOU rows 3–6 (slot index 2…5): 150–153, 156–159, … — used to detect a real write after the "unchanged" filter.
_DEYE_INACTIVE_TOU_REGISTERS: frozenset[int] = frozenset(
    [
        150, 151, 152, 153,
        156, 157, 158, 159,
        168, 169, 170, 171,
        174, 175, 176, 177,
    ]
)

# Deye system time — always verified tolerantly as the whole 62–64 group (reg 64 alone must not reach the strict branch).
DEYE_CLOCK_REGS: frozenset[int] = frozenset({62, 63, 64})

# Human-readable register names stored in the journal column register_name.
DEYE_REGISTER_NAMES: dict[int, str] = {
    108: "max_charge_a (max nabíjecí proud baterie)",
    109: "max_discharge_a (max vybíjecí proud baterie)",
    141: "energy_mode (0, EMS nemění)",
    142: "limit_control (0=selling first, 1=zero export to load, 2=zero export to CT)",
    143: "export_limit_w (max export do sítě)",
    145: "solar_sell (0=disabled, 1=enabled)",
    178: "grid_peak_shaving_switch (SELL=32 bit4-5=10, PASSIVE/CHARGE=48 bit4-5=11)",
    179: "control_board_special_1 (bits0-1: MI export cutoff disable=2 enable=3)",
    148: "time_point_1_time",
    149: "time_point_2_time",
    154: "time_point_1_power_w",
    155: "time_point_2_power_w",
    166: "time_point_1_soc_min_pct",
    167: "time_point_2_soc_min_pct",
    172: "time_point_1_grid_charge",
    173: "time_point_2_grid_charge",
    62: "system_time_year_month",
    63: "system_time_day_hour",
    64: "system_time_min_sec",
}
# Fill in names for all six TOU time points; setdefault leaves the explicit entries above untouched.
for _tp_i in range(6):
    _n = _tp_i + 1
    DEYE_REGISTER_NAMES.setdefault(148 + _tp_i, f"time_point_{_n}_time")
    DEYE_REGISTER_NAMES.setdefault(154 + _tp_i, f"time_point_{_n}_power_w")
    DEYE_REGISTER_NAMES.setdefault(166 + _tp_i, f"time_point_{_n}_soc_min_pct")
    DEYE_REGISTER_NAMES.setdefault(172 + _tp_i, f"time_point_{_n}_grid_charge")
|
||
|
||
|
||
def watts_to_amps(power_w: int | None, phases: int = 3, voltage: int = 230) -> int:
    """Convert a power in watts to whole amps per phase, limited to 0…32 A.

    Missing, zero or negative power yields 0. The quotient
    ``power_w / (phases * voltage)`` is truncated towards zero.
    """
    if power_w is None or power_w <= 0:
        return 0
    amps = int(power_w / (phases * voltage))
    if amps < 0:
        return 0
    return amps if amps < 32 else 32
|
||
|
||
|
||
def battery_watts_to_amps(power_w: int, max_amps: int) -> int:
    """Battery current derived from |power|, capped by *max_amps*.

    The current is ``int(|power_w| / BATT_VOLTAGE_V)``, which for non-negative
    values equals a floor. A negative *max_amps* is treated as a cap of 0.
    """
    ceiling = max_amps if max_amps > 0 else 0
    derived = int(abs(power_w) / BATT_VOLTAGE_V)
    return derived if derived < ceiling else ceiling
|
||
|
||
|
||
def current_slot_hhmm() -> int:
    """Start of the running 15-minute slot in Europe/Prague as an HHMM integer (e.g. 1415)."""
    now = datetime.now(ZoneInfo("Europe/Prague"))
    quarter_start = now.minute - now.minute % 15
    return 100 * now.hour + quarter_start
|
||
|
||
|
||
def next_slot_hhmm() -> int:
    """Start of the upcoming 15-minute slot in Europe/Prague as an HHMM integer (e.g. 1430).

    The last quarter of an hour rolls over to minute 0 of the following hour,
    and hour 23 wraps to 0.
    """
    now = datetime.now(ZoneInfo("Europe/Prague"))
    # divmod yields (1, 0) exactly when the next quarter boundary is the full hour.
    hour_carry, next_min = divmod((now.minute // 15 + 1) * 15, 60)
    next_hour = (now.hour + hour_carry) % 24
    return next_hour * 100 + next_min
|
||
|
||
|
||
@dataclass
class InverterConfig:
    """Inverter/battery configuration of one site as consumed by the Modbus export.

    Field order is part of the positional constructor; the last six fields have defaults.
    """

    id: int
    code: str
    # Modbus endpoint of the device.
    host: str
    port: int
    unit_id: int
    max_export_power_w: int | None
    max_import_power_w: int | None
    no_export: bool
    max_battery_charge_w: int | None
    max_battery_discharge_w: int | None
    min_soc_percent: int | None
    reserve_soc_percent: int | None
    max_soc_percent: int | None
    usable_capacity_wh: int | None
    # Battery current limits in amps; multiplied by BATT_VOLTAGE_V when verifying TOU power rows.
    max_charge_a: int
    max_discharge_a: int
    deye_last_system_time_sync_minute: datetime | None = None
    # Read by _deye_should_skip_time_sync_after_read to decide whether a clock re-sync is due.
    deye_last_system_time_sync_at: datetime | None = None
    deye_last_tou_inactive_write_prague_date: date | None = None
    deye_tou_inactive_signature: str | None = None
    # NOTE(review): presumably the reg 142 value used for zero export — confirm against the writer.
    deye_zero_export_mode: int = 1
    deye_gen_microinverter_cutoff_enabled: bool = False
|
||
|
||
|
||
def _prague_minute_start_utc() -> datetime:
    """UTC instant of the start of the current calendar minute in Europe/Prague."""
    prague_now = datetime.now(PRAGUE_TZ)
    minute_start = prague_now.replace(second=0, microsecond=0)
    return minute_start.astimezone(timezone.utc)
|
||
|
||
|
||
def _deye_registers_to_prague_datetime(r62: int, r63: int, r64: int) -> datetime | None:
    """Decode registers 62–64 (Deye system time, interpreted in PRAGUE_TZ).

    Each register packs two fields as high/low byte: 62 = year-2000/month,
    63 = day/hour, 64 = minute/second. Returns None when a field is out of
    range or the combination is not a valid calendar date.
    """
    try:
        year_month, day_hour, min_sec = int(r62), int(r63), int(r64)
        year, month = (year_month >> 8) + 2000, year_month & 0xFF
        day, hour = day_hour >> 8, day_hour & 0xFF
        minute, second = min_sec >> 8, min_sec & 0xFF
        fields_in_range = (
            1 <= month <= 12
            and 1 <= day <= 31
            and 0 <= hour <= 23
            and 0 <= minute <= 59
            and 0 <= second <= 59
        )
        if not fields_in_range:
            return None
        # datetime() itself rejects impossible dates such as 30 February.
        return datetime(year, month, day, hour, minute, second, tzinfo=PRAGUE_TZ)
    except (ValueError, OverflowError):
        return None
|
||
|
||
|
||
def _deye_clock_registers_verify_match(
    w62: int,
    w63: int,
    w64: int,
    a62: int,
    a63: int,
    a64: int,
) -> bool:
    """Tolerant comparison of a written clock triplet with the one read back.

    Both triplets are decoded to datetimes; the result is True when both decode
    and differ by at most DEYE_CLOCK_VERIFY_MAX_DELTA_SEC seconds.
    """
    written = _deye_registers_to_prague_datetime(w62, w63, w64)
    device = _deye_registers_to_prague_datetime(a62, a63, a64)
    if written is not None and device is not None:
        delta_s = (device - written).total_seconds()
        return abs(delta_s) <= DEYE_CLOCK_VERIFY_MAX_DELTA_SEC
    return False
|
||
|
||
|
||
def _deye_should_skip_time_sync_after_read(
    inv: InverterConfig,
    r62: int,
    r63: int,
    r64: int,
) -> bool:
    """Decide whether the 62–64 clock write can be left out this cycle.

    True is returned only when all of the following hold:
    the device time decodes, it differs from the current Prague wall time by at
    most DEYE_CLOCK_DRIFT_OK_SEC, ``inv.deye_last_system_time_sync_at`` is set, and
    less than DEYE_CLOCK_RESYNC_INTERVAL_HOURS have passed since that timestamp.
    A naive timestamp is taken to be UTC.
    """
    device_time = _deye_registers_to_prague_datetime(r62, r63, r64)
    if device_time is None:
        return False

    drift_s = abs((datetime.now(PRAGUE_TZ) - device_time).total_seconds())
    if drift_s > DEYE_CLOCK_DRIFT_OK_SEC:
        return False

    synced_at = inv.deye_last_system_time_sync_at
    if synced_at is None:
        return False
    synced_utc = (
        synced_at.replace(tzinfo=timezone.utc)
        if synced_at.tzinfo is None
        else synced_at.astimezone(timezone.utc)
    )

    resync_after = timedelta(hours=DEYE_CLOCK_RESYNC_INTERVAL_HOURS)
    return datetime.now(timezone.utc) - synced_utc < resync_after
|
||
|
||
|
||
async def _fetch_written_deye_clock_commands(
    site_id: int,
    asset_id: int,
    host: str,
    port: int,
    unit_id: int,
    db: asyncpg.Connection,
) -> list[asyncpg.Record]:
    """Fetch every journal row for registers 62–64 in status 'written' for one inverter endpoint.

    Rows are filtered by site, inverter asset and the device host/port/unit id,
    and come back ordered by register.
    """
    query = """
        SELECT * FROM ems.modbus_command
        WHERE site_id = $1
          AND asset_type = 'inverter'
          AND asset_id = $2
          AND device_host = $3
          AND device_port = $4
          AND device_unit_id = $5
          AND register IN (62, 63, 64)
          AND status = 'written'
        ORDER BY register
        """
    found = await db.fetch(query, site_id, asset_id, host, port, unit_id)
    return [*found]
|
||
|
||
|
||
async def _fetch_last_verified_inverter_registers(
    site_id: int, inverter_asset_id: int, db: asyncpg.Connection
) -> dict[int, int]:
    """Return the last verified value per register according to the journal.

    The map comes from ``ems.fn_modbus_last_verified_map`` and is used to skip
    re-writing a value the device already holds.

    The database value may arrive as a dict or as JSON text. A NULL result, a JSON
    ``null`` or an empty map all yield ``{}``; entries whose value is null are
    skipped, since they carry no verified value to compare against.
    """
    raw = await db.fetchval(
        """
        select ems.fn_modbus_last_verified_map($1::int, $2::int)
        """,
        site_id,
        inverter_asset_id,
    )
    # No journal rows yet: the SQL function may yield NULL, which json.loads cannot parse.
    if raw is None:
        return {}
    data = raw if isinstance(raw, dict) else json.loads(raw)
    if not data:
        return {}
    return {int(k): int(v) for k, v in data.items() if v is not None}
|
||
|
||
|
||
def _drop_registers_matching_last_verified(
    registers: list[tuple[int, str, int]],
    last_verified: dict[int, int],
) -> tuple[list[tuple[int, str, int]], list[int]]:
    """Filter out writes whose value equals the last verified device state.

    Returns ``(kept_entries, skipped_register_numbers)``. Registers without a known
    verified value are always kept.
    """

    def _is_unchanged(reg_no: int, wanted: int, known: int) -> bool:
        # reg 178: only the bits 4–5 mask is compared (Deye keeps its own state in the other bits).
        if reg_no == 178 and _deye_reg178_verify_match(int(wanted), int(known)):
            return True
        # reg 179: only bits 0–1 (mask 0x0003) are compared; a masked RMW preserves the other bits.
        if reg_no == 179 and _deye_reg179_verify_match(int(wanted), int(known)):
            return True
        return int(known) == int(wanted)

    kept: list[tuple[int, str, int]] = []
    dropped: list[int] = []
    for reg, meta, val in registers:
        reg_no = int(reg)
        known = last_verified.get(reg_no)
        if known is not None and _is_unchanged(reg_no, val, known):
            dropped.append(reg_no)
        else:
            kept.append((reg, meta, val))
    return kept, dropped
|
||
|
||
|
||
@dataclass
class ControlSetpoints:
    """Planned setpoints for one slot, as handed to the Modbus / Loxone export."""

    battery_w: int | None
    grid_export_limit: int
    ev1_current_a: int
    ev2_current_a: int
    heat_pump_enable: bool
    grid_setpoint_w: int
    ev1_power_w: int
    ev2_power_w: int
    target_soc_pct: int | None = None
    #: Explicit physical mode from the plan (PASSIVE/SELL/CHARGE). When set, it takes precedence over detection from signs.
    deye_physical_mode: str | None = None
    #: True = export forbidden (BLOCK_EXPORT) for the slot: e.g. when the effective feed-in price is < 0.
    export_ban: bool = False
    #: True = disconnect the GEN port (MI export cutoff) in this slot according to the plan (reg 179 bits0-1).
    #: None/False = do not disconnect.
    deye_gen_cutoff_enabled: bool = False
    #: Effective feed-in price of the slot (CZK/kWh from the plan); for TOU control of battery priority vs. surplus export
    effective_sell_price_czk_kwh: float | None = None
    #: True = reg 108/109 set to 0 (PRESERVE – Deye does not use the battery)
    lock_battery: bool = False
    #: SELF_SUSTAIN mode: full charge/discharge range on the inverter + zero-export (reg 142) and low TOU %.
    self_sustain_local_use: bool = False
|
||
|
||
|
||
@dataclass
class OperatingModeInfo:
    """Current operating mode of a site joined with its mode definition.

    Built by _fetch_operating_mode from ems.site_operating_mode and ems.operating_mode_def.
    """

    mode_code: str
    battery_mode: str
    grid_mode: str
    ev_enabled: bool
    # Filled from the definition column heat_pump_enabled.
    heat_pump_enabled_def: bool
    loxone_mode_value: int
|
||
|
||
|
||
async def create_modbus_commands(
    site_id: int,
    planning_run_id: int | None,
    asset_type: str,
    asset_id: int,
    asset_code: str,
    host: str,
    port: int,
    unit_id: int,
    registers: list[tuple[int, str, int]],
    db: asyncpg.Connection,
    deye_physical_mode: str | None = None,
) -> list[int]:
    """Insert one 'pending' row into ems.modbus_command per register write.

    Returns the ids of the created rows in input order. The stored register name
    is looked up in DEYE_REGISTER_NAMES (falling back to ``reg_<n>``); the middle
    item of each input tuple is ignored.
    """
    insert_sql = """
        INSERT INTO ems.modbus_command
            (site_id, asset_type, asset_id, asset_code,
             device_host, device_port, device_unit_id,
             register, register_name, value_to_write,
             planning_run_id, status, deye_physical_mode)
        VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,'pending',$12)
        RETURNING id
        """
    created: list[int] = []
    for reg, _label, val in registers:
        new_id = await db.fetchval(
            insert_sql,
            site_id,
            asset_type,
            asset_id,
            asset_code,
            host,
            port,
            unit_id,
            reg,
            DEYE_REGISTER_NAMES.get(reg, f"reg_{reg}"),
            val,
            planning_run_id,
            deye_physical_mode,
        )
        if new_id is None:
            continue
        created.append(int(new_id))
    return created
|
||
|
||
|
||
def _modbus_command_contiguous_runs(cmds: list[asyncpg.Record]) -> list[list[asyncpg.Record]]:
    """Sort commands by register address and split them into contiguous runs.

    A run is a maximal sequence in which every register is exactly the previous
    one plus 1, so each run can be sent as a single FC 0x10 write / FC 3 read.
    A repeated register address starts a new run.
    """
    runs: list[list[asyncpg.Record]] = []
    previous_reg: int | None = None
    for cmd in sorted(cmds, key=lambda c: int(c["register"])):
        reg_no = int(cmd["register"])
        if previous_reg is not None and reg_no == previous_reg + 1:
            runs[-1].append(cmd)
        else:
            runs.append([cmd])
        previous_reg = reg_no
    return runs
|
||
|
||
|
||
async def execute_modbus_commands(
    command_ids: list[int],
    db: asyncpg.Connection,
) -> bool:
    """Write journal commands to the devices and record the outcome.

    Commands are loaded from ems.modbus_command, grouped per device
    (host, port, unit id) and written with ``client.write_registers`` in
    contiguous register runs. Each run is tried up to MAX_RETRIES times; on success
    its rows become 'written', after the last failed attempt they become 'failed'
    with the error text. ``attempt_count`` is incremented once per run outcome.

    Returns True when every run succeeded (also when no command row was found).
    """
    MAX_RETRIES = 3
    RETRY_DELAY = 0.5

    rows: list[asyncpg.Record] = []
    for cmd_id in command_ids:
        cmd = await db.fetchrow(
            "SELECT * FROM ems.modbus_command WHERE id=$1", cmd_id
        )
        if cmd is not None:
            rows.append(cmd)

    if not rows:
        return True

    by_gw: dict[tuple[str, int, int], list[asyncpg.Record]] = defaultdict(list)
    for cmd in rows:
        by_gw[
            (cmd["device_host"], int(cmd["device_port"]), int(cmd["device_unit_id"]))
        ].append(cmd)

    all_ok = True
    for (host, port, unit), group in by_gw.items():
        client = await get_modbus_client(host, port)
        for run in _modbus_command_contiguous_runs(group):
            start_reg = int(run[0]["register"])
            values = [int(c["value_to_write"]) for c in run]
            for attempt in range(MAX_RETRIES):
                try:
                    await client.write_registers(start_reg, values, unit)
                    for cmd, val in zip(run, values):
                        cid = int(cmd["id"])
                        await db.execute(
                            """
                            UPDATE ems.modbus_command
                            SET status='written', value_written=$1, written_at=now(),
                                attempt_count=attempt_count+1, error_msg=NULL
                            WHERE id=$2
                            """,
                            val,
                            cid,
                        )
                        logger.info(
                            "[cmd %s] %s 0x%04X=%s OK batch@%s (attempt %s)",
                            cid,
                            cmd["asset_code"],
                            int(cmd["register"]),
                            val,
                            start_reg,
                            attempt + 1,
                        )
                    break
                except Exception as e:
                    if attempt < MAX_RETRIES - 1:
                        logger.warning(
                            "Modbus batch write 0x%04X count=%s attempt %s failed: %s, retrying...",
                            start_reg,
                            len(values),
                            attempt + 1,
                            e,
                        )
                        await asyncio.sleep(RETRY_DELAY)
                        # Dropping the connection is best-effort: if it raises here, the
                        # exception must not escape the retry loop, otherwise the rows of
                        # this run would be left neither 'written' nor 'failed'.
                        try:
                            await client.force_disconnect()
                        except Exception as disconnect_err:
                            logger.warning(
                                "Modbus force_disconnect before retry failed: %s",
                                disconnect_err,
                            )
                    else:
                        for cmd in run:
                            await db.execute(
                                """
                                UPDATE ems.modbus_command
                                SET status='failed', error_msg=$1,
                                    attempt_count=attempt_count+1
                                WHERE id=$2
                                """,
                                str(e),
                                int(cmd["id"]),
                            )
                        logger.error(
                            "Modbus batch 0x%04X count=%s all %s attempts failed: %s",
                            start_reg,
                            len(values),
                            MAX_RETRIES,
                            e,
                        )
                        all_ok = False

    return all_ok
|
||
|
||
|
||
async def _switch_to_self_sustain(site_id: int, db: asyncpg.Connection, reason: str) -> None:
    """Switch the site to SELF_SUSTAIN and log the reason at CRITICAL level.

    The mode change (and any Discord notification) is delegated to
    ``run_fn_set_mode_with_discord``, called with the actor ``system:mismatch``.
    """
    # Imported lazily, as elsewhere in this module for notification_service.
    from services.notification_service import run_fn_set_mode_with_discord

    target_mode = "SELF_SUSTAIN"
    actor = "system:mismatch"
    await run_fn_set_mode_with_discord(db, site_id, target_mode, actor, None, reason)
    logger.critical("Site %s switched to SELF_SUSTAIN: %s", site_id, reason)
|
||
|
||
|
||
def _modbus_cmd_register(cmd: Any) -> int:
    """Register number of a command given as a mapping-like row or an attribute object.

    ``cmd["register"]`` is tried first (asyncpg.Record supports ``__getitem__``);
    on KeyError/TypeError the ``.register`` attribute is used instead (tests).
    """
    try:
        reg_no = int(cmd["register"])
    except (KeyError, TypeError):
        reg_no = int(cmd.register)
    return reg_no
|
||
|
||
|
||
def _deye_expected_clock_triplet_for_verify(
    bundle: list[asyncpg.Record],
    last_verified: dict[int, int],
    a62: int,
    a63: int,
    a64: int,
) -> tuple[int, int, int]:
    """Build the expected ``(w62, w63, w64)`` for the tolerant clock verify.

    For each register the value comes from the bundle row's ``value_to_write``;
    a register missing from the bundle falls back to its last verified value and
    then to the value just read from the device, so an orphaned write (e.g. only
    reg 64) is still compared as a full triplet.
    """
    written_rows = {_modbus_cmd_register(row): row for row in bundle}

    def _expected(reg_no: int, device_value: int) -> int:
        if reg_no not in written_rows:
            return int(last_verified.get(reg_no, device_value))
        row = written_rows[reg_no]
        try:
            return int(row["value_to_write"])
        except (KeyError, TypeError):
            return int(row.value_to_write)

    return (_expected(62, a62), _expected(63, a63), _expected(64, a64))
|
||
|
||
|
||
async def _verify_deye_clock_written_bundle(
    site_id: int,
    bundle: list[asyncpg.Record],
    a62: int,
    a63: int,
    a64: int,
    db: asyncpg.Connection,
) -> bool:
    """Tolerant verification of one to three 'written' journal rows for registers 62–64.

    All rows of the bundle share one verdict. On success the rows become 'verified'
    and the inverter's last clock-sync columns are updated. On mismatch the rows are
    retried together while the highest attempt_count is below 3; afterwards only a
    notification is sent — the operating mode is never changed here.

    Returns True only when this pass matched; a mismatch returns False regardless
    of the outcome of the retry it triggers.
    """
    from services.notification_service import (
        notify_modbus_clock_verify_exhausted,
        notify_modbus_mismatch,
    )

    def _field(row: Any, key: str, cast: Any) -> Any:
        # Rows are asyncpg.Record-like (subscript) or plain objects (attribute).
        try:
            return cast(row[key])
        except (KeyError, TypeError):
            return cast(getattr(row, key))

    ordered = sorted(bundle, key=_modbus_cmd_register)
    asset_id = _field(ordered[0], "asset_id", int)
    last_verified = await _fetch_last_verified_inverter_registers(site_id, asset_id, db)
    w62, w63, w64 = _deye_expected_clock_triplet_for_verify(
        bundle, last_verified, a62, a63, a64
    )
    clock_ok = _deye_clock_registers_verify_match(w62, w63, w64, a62, a63, a64)
    device_values = {62: a62, 63: a63, 64: a64}

    # Journal the read-back value and the shared verdict on every row.
    for row in ordered:
        row_id = _field(row, "id", int)
        reg_no = _modbus_cmd_register(row)
        await db.execute(
            """
            UPDATE ems.modbus_command
            SET value_verified=$1::int, verified_at=now(),
                status=CASE WHEN $2::boolean THEN 'verified' ELSE 'mismatch' END
            WHERE id=$3::int
            """,
            device_values[reg_no],
            clock_ok,
            row_id,
        )

    if clock_ok:
        await db.execute(
            """
            UPDATE ems.asset_inverter
            SET deye_last_system_time_sync_minute = $1,
                deye_last_system_time_sync_at = now()
            WHERE id = $2
            """,
            _prague_minute_start_utc(),
            asset_id,
        )
        for row in ordered:
            row_id = _field(row, "id", int)
            row_code = _field(row, "asset_code", str)
            reg_no = _modbus_cmd_register(row)
            logger.info(
                "[cmd %s] verified OK (clock tolerant): %s 0x%04X=%s",
                row_id,
                row_code,
                reg_no,
                device_values[reg_no],
            )
        return True

    asset_code = _field(ordered[0], "asset_code", str)
    logger.error(
        "[cmd clock] MISMATCH %s 62–64: written=(%s,%s,%s) actual=(%s,%s,%s)",
        asset_code,
        w62,
        w63,
        w64,
        a62,
        a63,
        a64,
    )

    # The retry budget is governed by the highest attempt_count in the bundle.
    attempts = 0
    for row in ordered:
        counter_row = await db.fetchrow(
            "SELECT attempt_count FROM ems.modbus_command WHERE id=$1",
            _field(row, "id", int),
        )
        row_attempts = int(counter_row["attempt_count"] or 0) if counter_row else 0
        attempts = max(attempts, row_attempts)

    await notify_modbus_mismatch(
        db,
        site_id,
        asset_code,
        62,
        "system_time_62_64",
        w62,
        a62,
        attempts,
    )

    ids_ordered = [_field(row, "id", int) for row in ordered]
    if attempts < 3:
        for row_id in ids_ordered:
            await db.execute(
                "UPDATE ems.modbus_command SET status='retrying' WHERE id=$1",
                row_id,
            )
        await execute_modbus_commands(ids_ordered, db)
        await verify_modbus_commands(ids_ordered, db, site_id)
    else:
        logger.critical(
            "[cmd clock] 3 failed verify attempts (62–64); režim se nemění automaticky"
        )
        site = await db.fetchrow("SELECT code FROM ems.site WHERE id=$1", site_id)
        await notify_modbus_clock_verify_exhausted(
            db,
            site_id,
            site["code"] if site else str(site_id),
            asset_code,
            (w62, w63, w64),
            (a62, a63, a64),
        )
    return False
|
||
|
||
|
||
async def verify_modbus_commands(
    command_ids: list[int],
    db: asyncpg.Connection,
    site_id: int,
) -> bool:
    """
    Read registers back (FC 3 in contiguous blocks) and compare them with value_to_write.

    Only commands currently in status 'written' are verified. On mismatch: retry
    (up to 3×). Once attempts are exhausted on a critical register (108, 109, 142, 143, 145)
    → SELF_SUSTAIN + Discord; for "soft" ones (178, TOU power W) only log + Discord, the
    mode is not changed. Clock registers 62–64 are verified tolerantly as one bundle.

    Returns True when every verified command matched on this pass and every read succeeded.
    """
    from services.notification_service import notify_modbus_mismatch

    # Needed for the tolerant TOU power comparison (max charge/discharge amps).
    inv_cfg = await _load_inverter_config(site_id, db)

    async def _apply_verify_result(
        cmd: asyncpg.Record,
        actual_i: int,
        *,
        client: Any,
        unit: int,
    ) -> bool:
        """Return True on match, False on mismatch (and handle retry / SELF_SUSTAIN)."""
        reg = int(cmd["register"])
        cmd_id = int(cmd["id"])

        # Guard: clock registers are split off by the caller below, so this branch is a
        # safety net that redirects them to the tolerant 62–64 bundle verify.
        if reg in DEYE_CLOCK_REGS:
            asset_id = int(cmd["asset_id"])
            host = str(cmd["device_host"])
            port_i = int(cmd["device_port"])
            uid = int(cmd["device_unit_id"])
            bundle = await _fetch_written_deye_clock_commands(
                site_id, asset_id, host, port_i, uid, db
            )
            if not bundle:
                bundle = [cmd]
            try:
                cvals = await client.read_holding_registers(62, 3, uid)
            except Exception as e:
                logger.error(
                    "verify clock guard read 62–64 failed (reg 0x%04X): %s", reg, e
                )
                return False
            if len(cvals) != 3:
                logger.error(
                    "verify clock guard: expected 3 regs, got %s", len(cvals)
                )
                return False
            logger.warning(
                "Clock register 0x%04X reached strict verify path; using tolerant 62–64 bundle",
                reg,
            )
            return await _verify_deye_clock_written_bundle(
                site_id,
                bundle,
                int(cvals[0]),
                int(cvals[1]),
                int(cvals[2]),
                db,
            )

        expected_i = int(cmd["value_to_write"])
        matches = actual_i == expected_i
        if reg == 178:
            # reg 178 is compared through a bit mask; a failed first reading is re-read once.
            first_178 = int(actual_i)
            second_178: int | None = None
            if not _deye_reg178_verify_match(expected_i, first_178):
                try:
                    r178 = await client.read_holding_registers(178, 1, unit)
                    if r178 and len(r178) >= 1:
                        second_178 = int(r178[0])
                except Exception as e:
                    logger.warning(
                        "[cmd %s] reg178 double-read failed: %s", cmd_id, e
                    )
            # actual_i becomes the value to journal (the second reading if it was the one that matched).
            matches, actual_i = _deye_reg178_verify_with_double_read(
                expected_i, first_178, second_178
            )
            if (
                matches
                and second_178 is not None
                and not _deye_reg178_verify_match(expected_i, first_178)
            ):
                logger.info(
                    "[cmd %s] reg178 double-read recovered: first=%s second=%s",
                    cmd_id,
                    first_178,
                    second_178,
                )
        if reg == 179:
            # reg 179: only the masked MI export bits are compared.
            matches = _deye_reg179_verify_match(expected_i, actual_i)
        if not matches and reg in DEYE_TOU_POWER_REGS and inv_cfg is not None:
            # TOU power rows: also accept the firmware clamp to the max charge/discharge wattage.
            matches = _deye_tou_power_verify_match(expected_i, actual_i, inv_cfg)

        await db.execute(
            """
            UPDATE ems.modbus_command
            SET value_verified=$1::int, verified_at=now(),
                status=CASE WHEN $2::boolean THEN 'verified' ELSE 'mismatch' END
            WHERE id=$3::int
            """,
            actual_i,
            matches,
            cmd_id,
        )

        if not matches:
            logger.error(
                "[cmd %s] MISMATCH %s 0x%04X: expected=%s actual=%s%s",
                cmd_id,
                cmd["asset_code"],
                reg,
                expected_i,
                actual_i,
                (
                    " (reg178 mask 0x%04X)" % REG178_VERIFY_MASK
                    if reg == 178
                    else (" (reg179 mask 0x%04X)" % REG179_MI_EXPORT_MASK if reg == 179 else "")
                ),
            )
            # attempt_count is maintained by execute_modbus_commands (one increment per write outcome).
            row_ac = await db.fetchrow(
                "SELECT attempt_count FROM ems.modbus_command WHERE id=$1", cmd_id
            )
            attempts = int(row_ac["attempt_count"] or 0) if row_ac else 0
            await notify_modbus_mismatch(
                db,
                site_id,
                cmd["asset_code"],
                reg,
                cmd["register_name"] or "",
                expected_i,
                actual_i,
                attempts,
            )

            if attempts < 3:
                # Re-write and re-verify just this command (recursive call).
                await db.execute(
                    "UPDATE ems.modbus_command SET status='retrying' WHERE id=$1",
                    cmd_id,
                )
                await execute_modbus_commands([cmd_id], db)
                await verify_modbus_commands([cmd_id], db, site_id)
            else:
                if deye_reg_triggers_self_sustain_after_verify_exhaust(reg):
                    logger.critical(
                        "[cmd %s] 3 failed attempts, switching to SELF_SUSTAIN",
                        cmd_id,
                    )
                    await _switch_to_self_sustain(
                        site_id,
                        db,
                        reason=(
                            f"Modbus mismatch po 3 pokusech: {cmd['asset_code']} "
                            f"reg 0x{reg:04X}"
                        ),
                    )
                else:
                    logger.warning(
                        "[cmd %s] 3 failed verify attempts on non-critical reg 0x%04X "
                        "(no mode change): %s",
                        cmd_id,
                        reg,
                        cmd["asset_code"],
                    )
            # NOTE(review): False is returned even when the retry above ends up verified —
            # confirm callers expect "first pass mismatched" semantics.
            return False

        if reg == 178 and actual_i != expected_i:
            logger.info(
                "[cmd %s] verified OK (reg178 masked): %s 0x%04X value_to_write=%s actual=%s",
                cmd_id,
                cmd["asset_code"],
                reg,
                expected_i,
                actual_i,
            )
        else:
            logger.info(
                "[cmd %s] verified OK: %s 0x%04X=%s",
                cmd_id,
                cmd["asset_code"],
                reg,
                actual_i,
            )
        return True

    # Only rows still in 'written' are verified; failed or already verified rows are skipped.
    cmds: list[asyncpg.Record] = []
    for cmd_id in command_ids:
        cmd = await db.fetchrow(
            "SELECT * FROM ems.modbus_command WHERE id=$1", cmd_id
        )
        if cmd is not None and cmd["status"] == "written":
            cmds.append(cmd)

    if not cmds:
        return True

    # Group per device endpoint (host, port, unit id).
    by_gw: dict[tuple[str, int, int], list[asyncpg.Record]] = defaultdict(list)
    for cmd in cmds:
        by_gw[
            (cmd["device_host"], int(cmd["device_port"]), int(cmd["device_unit_id"]))
        ].append(cmd)

    all_ok = True
    for (host, port, unit), group in by_gw.items():
        client = await get_modbus_client(host, port)
        clock_cmds = [c for c in group if int(c["register"]) in DEYE_CLOCK_REGS]
        rest = [c for c in group if int(c["register"]) not in DEYE_CLOCK_REGS]

        if clock_cmds:
            # Clock rows: verify every 'written' 62–64 row of this inverter together, tolerantly.
            asset_id = int(clock_cmds[0]["asset_id"])
            bundle = await _fetch_written_deye_clock_commands(
                site_id, asset_id, host, port, unit, db
            )
            if not bundle:
                bundle = clock_cmds
            try:
                cvals = await client.read_holding_registers(62, 3, unit)
            except Exception as e:
                logger.error("verify clock read 62–64 failed: %s", e)
                all_ok = False
            else:
                if len(cvals) != 3:
                    logger.error(
                        "verify clock read: expected 3 regs, got %s", len(cvals)
                    )
                    all_ok = False
                else:
                    matched = await _verify_deye_clock_written_bundle(
                        site_id,
                        bundle,
                        int(cvals[0]),
                        int(cvals[1]),
                        int(cvals[2]),
                        db,
                    )
                    if not matched:
                        all_ok = False

        # Remaining registers: one read per contiguous run, then a per-command verdict.
        for run in _modbus_command_contiguous_runs(rest):
            start_reg = int(run[0]["register"])
            n = len(run)
            try:
                values = await client.read_holding_registers(start_reg, n, unit)
            except Exception as e:
                logger.error(
                    "verify batch read 0x%04X count=%s failed: %s", start_reg, n, e
                )
                all_ok = False
                continue
            if len(values) != n:
                logger.error(
                    "verify read 0x%04X: expected %s regs, got %s",
                    start_reg,
                    n,
                    len(values),
                )
                all_ok = False
                continue
            for cmd, actual in zip(run, values):
                matched = await _apply_verify_result(
                    cmd, int(actual), client=client, unit=unit
                )
                if not matched:
                    all_ok = False

    return all_ok
|
||
|
||
|
||
async def _fetch_operating_mode(site_id: int, db: asyncpg.Connection) -> OperatingModeInfo | None:
    """Load the site's operating mode, expiring a lapsed temporary mode first.

    Reads the site's row from ``ems.site_operating_mode`` joined with its
    definition. If ``valid_until`` is set and not in the future, calls
    ``ems.fn_expire_modes()``, sends one notification for every row that
    function returns, and then re-reads the site's row.

    Returns:
        The mode description, or ``None`` when the site has no mode row.
    """
    query = """
        SELECT som.mode_code, omd.battery_mode, omd.grid_mode,
               omd.ev_enabled, omd.heat_pump_enabled, omd.loxone_mode_value,
               som.valid_until
        FROM ems.site_operating_mode som
        JOIN ems.operating_mode_def omd ON omd.code = som.mode_code
        WHERE som.site_id = $1
    """
    record = await db.fetchrow(query, site_id)
    if record is None:
        return None

    valid_until = record["valid_until"]
    if valid_until is not None:
        current_utc = datetime.now(timezone.utc)
        if valid_until.tzinfo is None:
            # A naive timestamp is interpreted as UTC.
            valid_until = valid_until.replace(tzinfo=timezone.utc)
        if valid_until <= current_utc:
            expired = await db.fetch("SELECT * FROM ems.fn_expire_modes()")
            from services.notification_service import notify_operating_mode_changed

            for change in expired:
                await notify_operating_mode_changed(
                    str(change["site_code"]),
                    str(change["old_mode"]),
                    str(change["new_mode"]),
                    "system:expiry",
                    "Automatické vypršení dočasného režimu",
                )
            # The expiry may have switched the mode, so read the row again.
            record = await db.fetchrow(query, site_id)
            if record is None:
                return None

    return OperatingModeInfo(
        mode_code=record["mode_code"],
        battery_mode=record["battery_mode"],
        grid_mode=record["grid_mode"],
        ev_enabled=bool(record["ev_enabled"]),
        heat_pump_enabled_def=bool(record["heat_pump_enabled"]),
        loxone_mode_value=int(record["loxone_mode_value"]),
    )
|
||
|
||
|
||
async def _get_current_soc(site_id: int, db: asyncpg.Connection) -> int:
    """Return the newest non-null battery SoC (%) from inverter telemetry.

    Falls back to 50 when the site has no telemetry row with a SoC value.
    """
    latest = await db.fetchval(
        """
        SELECT battery_soc_percent
        FROM ems.telemetry_inverter
        WHERE site_id = $1 AND battery_soc_percent IS NOT NULL
        ORDER BY measured_at DESC
        LIMIT 1
        """,
        site_id,
    )
    if latest is None:
        return 50
    return int(latest)
|
||
|
||
|
||
async def _load_inverter_config(
    site_id: int, db: asyncpg.Connection
) -> InverterConfig | None:
    """Load the configuration of the site's first controllable Modbus inverter.

    Selects the lowest-id active, controllable inverter whose endpoint is an
    enabled ``modbus_tcp`` endpoint, together with its battery limits and the
    site's grid-connection limits (when present).

    The charge/discharge current limits come from the explicit register
    override columns when set; otherwise SQL derives them from the lower of
    the BMS and inverter power limits divided by 51.2 V. Both are then capped
    at ``DEYE_LV_BATTERY_MAX_CHARGE_DISCHARGE_A``.

    Returns:
        The inverter configuration, or ``None`` when no row matches.
    """

    def _opt_int(value: Any) -> int | None:
        # NULL columns stay None; everything else is coerced to int.
        return None if value is None else int(value)

    row = await db.fetchrow(
        """
        SELECT
            ai.id, ai.code,
            se.host, se.port, se.unit_id,
            sgc.max_export_power_w,
            sgc.max_import_power_w,
            sgc.no_export,
            ai.max_battery_charge_w,
            ai.max_battery_discharge_w,
            ab.min_soc_percent,
            ab.reserve_soc_percent,
            ab.max_soc_percent,
            ab.usable_capacity_wh,
            ai.deye_last_system_time_sync_minute,
            ai.deye_last_system_time_sync_at,
            ai.deye_last_tou_inactive_write_prague_date,
            ai.deye_tou_inactive_signature,
            COALESCE(ai.deye_zero_export_mode, 1) AS deye_zero_export_mode,
            COALESCE(ai.deye_gen_microinverter_cutoff_enabled, false) AS deye_gen_microinverter_cutoff_enabled,
            COALESCE(
                ai.deye_register_max_charge_a,
                FLOOR(
                    LEAST(
                        COALESCE(ab.bms_max_charge_w, ai.max_battery_charge_w),
                        ai.max_battery_charge_w
                    )::numeric / 51.2
                )::int
            ) AS max_charge_a,
            COALESCE(
                ai.deye_register_max_discharge_a,
                FLOOR(
                    LEAST(
                        COALESCE(ab.bms_max_discharge_w, ai.max_battery_discharge_w),
                        ai.max_battery_discharge_w
                    )::numeric / 51.2
                )::int
            ) AS max_discharge_a
        FROM ems.asset_inverter ai
        JOIN ems.site_endpoint se ON se.id = ai.endpoint_id
        JOIN ems.asset_battery ab ON ab.inverter_id = ai.id
        LEFT JOIN ems.site_grid_connection sgc ON sgc.site_id = ai.site_id
        WHERE ai.site_id = $1
          AND ai.active = true
          AND ai.controllable = true
          AND se.enabled = true
          AND se.endpoint_type = 'modbus_tcp'
        ORDER BY ai.id
        LIMIT 1
        """,
        site_id,
    )
    if row is None:
        return None

    # Deye firmware often holds at most 350 A; a higher value from the DB would
    # produce a verify mismatch (351 vs 350), hence the cap.
    charge_limit_a = min(_opt_int(row["max_charge_a"]) or 0, DEYE_LV_BATTERY_MAX_CHARGE_DISCHARGE_A)
    discharge_limit_a = min(
        _opt_int(row["max_discharge_a"]) or 0, DEYE_LV_BATTERY_MAX_CHARGE_DISCHARGE_A
    )

    tcp_port = int(row["port"] or 502)
    raw_unit = row["unit_id"]
    modbus_unit = int(raw_unit if raw_unit is not None else 1)

    raw_min_soc = row["min_soc_percent"]
    # Unlike the other SoC columns, the minimum SoC is rounded rather than truncated.
    min_soc = None if raw_min_soc is None else int(round(float(raw_min_soc)))

    return InverterConfig(
        id=int(row["id"]),
        code=row["code"],
        host=row["host"],
        port=tcp_port,
        unit_id=modbus_unit,
        max_export_power_w=_opt_int(row["max_export_power_w"]),
        max_import_power_w=_opt_int(row["max_import_power_w"]),
        no_export=bool(row["no_export"]),
        max_battery_charge_w=_opt_int(row["max_battery_charge_w"]),
        max_battery_discharge_w=_opt_int(row["max_battery_discharge_w"]),
        min_soc_percent=min_soc,
        reserve_soc_percent=_opt_int(row["reserve_soc_percent"]),
        max_soc_percent=_opt_int(row["max_soc_percent"]),
        usable_capacity_wh=_opt_int(row["usable_capacity_wh"]),
        max_charge_a=charge_limit_a,
        max_discharge_a=discharge_limit_a,
        deye_last_system_time_sync_minute=row["deye_last_system_time_sync_minute"],
        deye_last_system_time_sync_at=row["deye_last_system_time_sync_at"],
        deye_last_tou_inactive_write_prague_date=row[
            "deye_last_tou_inactive_write_prague_date"
        ],
        deye_tou_inactive_signature=row["deye_tou_inactive_signature"],
        deye_zero_export_mode=int(row["deye_zero_export_mode"]),
        deye_gen_microinverter_cutoff_enabled=bool(row["deye_gen_microinverter_cutoff_enabled"]),
    )
|
||
|
||
|
||
def _deye_system_time_register_rows() -> tuple[datetime, list[tuple[int, str, int]]]:
    """Build the values for registers 62–64 from the current Europe/Prague time.

    Each register packs two fields as ``high_byte << 8 | low_byte``:
    62 = (year - 2000, month), 63 = (day, hour), 64 = (minute, second).
    The seconds byte is always 0 (more stable write).

    Returns:
        The minute-truncated timestamp used and the ``(register, label, value)`` rows.
    """
    stamp = datetime.now(PRAGUE_TZ).replace(second=0, microsecond=0)
    packed_values = (
        ((stamp.year - 2000) << 8) | stamp.month,
        (stamp.day << 8) | stamp.hour,
        stamp.minute << 8,
    )
    return stamp, [(62 + offset, "", value) for offset, value in enumerate(packed_values)]
|
||
|
||
|
||
def _deye_time_point_rows(
|
||
slot_index: int,
|
||
time_hhmm: int,
|
||
power_w: int,
|
||
soc_pct: int,
|
||
grid_charge: bool,
|
||
) -> list[tuple[int, str, int]]:
|
||
g = 1 if grid_charge else 0
|
||
return [
|
||
(148 + slot_index, "", time_hhmm),
|
||
(154 + slot_index, "", power_w),
|
||
(166 + slot_index, "", soc_pct),
|
||
(172 + slot_index, "", g),
|
||
]
|
||
|
||
|
||
async def _fetch_plan_row_for_slot_offset(
    site_id: int, db: asyncpg.Connection, slot_offset: int
) -> asyncpg.Record | None:
    """Fetch the plan row for a slot via ``ems.fn_planning_interval_at_offset``.

    The function's jsonb result is decoded with ``json.loads`` unless it
    already arrives as a dict, and is wrapped in a Record-like ``_DictRecord``.

    Returns:
        The wrapped row, or ``None`` when the result is NULL or empty.
    """
    payload = await db.fetchval(
        """
        select ems.fn_planning_interval_at_offset($1::int, $2::int)
        """,
        site_id,
        slot_offset,
    )
    if payload is None:
        return None
    if not isinstance(payload, dict):
        payload = json.loads(payload)
    return _DictRecord(payload) if payload else None
|
||
|
||
|
||
async def _fetch_max_charge_power_w(site_id: int, db: asyncpg.Connection) -> int:
    """Return ``ems.fn_planning_max_effective_charge_w`` for the site as an int (0 when NULL)."""
    max_charge_w = await db.fetchval(
        "select ems.fn_planning_max_effective_charge_w($1::int)",
        site_id,
    )
    if not max_charge_w:
        return 0
    return int(max_charge_w)
|
||
|
||
|
||
class _DictRecord:
|
||
"""Minimální asyncpg Record kompatibilita pro dict z jsonb."""
|
||
|
||
__slots__ = ("_d",)
|
||
|
||
def __init__(self, d: dict[str, Any]) -> None:
|
||
self._d = d
|
||
|
||
def __getitem__(self, k: str) -> Any:
|
||
return self._d[k]
|
||
|
||
def get(self, k: str, default: Any = None) -> Any:
|
||
return self._d.get(k, default)
|
||
|
||
def __contains__(self, k: str) -> bool:
|
||
return k in self._d
|
||
|
||
|
||
def _build_setpoints(mode: OperatingModeInfo, pi: asyncpg.Record | None) -> ControlSetpoints | None:
|
||
code = mode.mode_code
|
||
if code == "MANUAL":
|
||
return None
|
||
|
||
if code == "AUTO":
|
||
if pi is None:
|
||
return None
|
||
grid_sp = int(pi["grid_setpoint_w"] or 0)
|
||
ev1_w = int(pi["ev1_setpoint_w"] or 0) if "ev1_setpoint_w" in pi else 0
|
||
ev2_w = int(pi["ev2_setpoint_w"] or 0) if "ev2_setpoint_w" in pi else 0
|
||
hp_en = bool(pi["heat_pump_enabled"])
|
||
tgt = pi["battery_soc_target_pct"]
|
||
target_soc = int(round(float(tgt))) if tgt is not None else None
|
||
pm_raw = pi.get("deye_physical_mode")
|
||
pm: str | None = str(pm_raw).strip().upper() if pm_raw is not None else None
|
||
sell_raw = pi.get("effective_sell_price")
|
||
sell_f: float | None = float(sell_raw) if sell_raw is not None else None
|
||
# Záporný výkup sám o sobě neblokuje export, pokud plán export explicitně žádá (soulad s LP).
|
||
export_ban = sell_f is not None and float(sell_f) < 0 and grid_sp >= 0
|
||
gen_cutoff_raw = pi.get("deye_gen_cutoff_enabled")
|
||
gen_cutoff = bool(gen_cutoff_raw) if gen_cutoff_raw is not None else False
|
||
return ControlSetpoints(
|
||
battery_w=int(pi["battery_setpoint_w"] or 0),
|
||
grid_export_limit=abs(min(grid_sp, 0)),
|
||
ev1_current_a=watts_to_amps(ev1_w, phases=3),
|
||
ev2_current_a=watts_to_amps(ev2_w, phases=1),
|
||
heat_pump_enable=hp_en,
|
||
grid_setpoint_w=grid_sp,
|
||
ev1_power_w=ev1_w,
|
||
ev2_power_w=ev2_w,
|
||
target_soc_pct=target_soc,
|
||
deye_physical_mode=pm,
|
||
export_ban=bool(export_ban),
|
||
deye_gen_cutoff_enabled=bool(gen_cutoff),
|
||
effective_sell_price_czk_kwh=sell_f,
|
||
)
|
||
|
||
if code == "SELF_SUSTAIN":
|
||
return ControlSetpoints(
|
||
battery_w=None,
|
||
grid_export_limit=0,
|
||
ev1_current_a=0,
|
||
ev2_current_a=0,
|
||
heat_pump_enable=False,
|
||
grid_setpoint_w=0,
|
||
ev1_power_w=0,
|
||
ev2_power_w=0,
|
||
target_soc_pct=None,
|
||
self_sustain_local_use=True,
|
||
)
|
||
|
||
if code == "CHARGE_CHEAP":
|
||
# max_charge doplníme v export_setpoints z DB
|
||
return ControlSetpoints(
|
||
battery_w=0,
|
||
grid_export_limit=0,
|
||
ev1_current_a=0,
|
||
ev2_current_a=0,
|
||
heat_pump_enable=False,
|
||
grid_setpoint_w=0,
|
||
ev1_power_w=0,
|
||
ev2_power_w=0,
|
||
target_soc_pct=None,
|
||
)
|
||
|
||
if code == "PRESERVE":
|
||
return ControlSetpoints(
|
||
battery_w=0,
|
||
grid_export_limit=0,
|
||
ev1_current_a=0,
|
||
ev2_current_a=0,
|
||
heat_pump_enable=False,
|
||
grid_setpoint_w=0,
|
||
ev1_power_w=0,
|
||
ev2_power_w=0,
|
||
target_soc_pct=None,
|
||
lock_battery=True,
|
||
)
|
||
|
||
logger.warning("Unknown mode_code %s for site export, skipping", code)
|
||
return None
|
||
|
||
|
||
def _apply_price_failsafe_guard(
|
||
site_id: int,
|
||
mode: OperatingModeInfo,
|
||
pi: asyncpg.Record | None,
|
||
sp: ControlSetpoints,
|
||
) -> ControlSetpoints:
|
||
if mode.mode_code != "AUTO" or pi is None:
|
||
return sp
|
||
if "is_predicted_price" not in pi or not bool(pi["is_predicted_price"]):
|
||
return sp
|
||
logger.warning(
|
||
"control export site=%s: AUTO slot uses predicted price -> forcing PASSIVE no-export guard",
|
||
site_id,
|
||
)
|
||
return ControlSetpoints(
|
||
battery_w=0,
|
||
grid_export_limit=0,
|
||
ev1_current_a=sp.ev1_current_a,
|
||
ev2_current_a=sp.ev2_current_a,
|
||
heat_pump_enable=sp.heat_pump_enable,
|
||
grid_setpoint_w=max(0, int(sp.grid_setpoint_w or 0)),
|
||
ev1_power_w=sp.ev1_power_w,
|
||
ev2_power_w=sp.ev2_power_w,
|
||
target_soc_pct=sp.target_soc_pct,
|
||
effective_sell_price_czk_kwh=sp.effective_sell_price_czk_kwh,
|
||
)
|
||
|
||
|
||
def _deye_reg143_export_w(no_export: bool, max_export_power_w: int | None) -> int:
|
||
"""Reg 143 – max export W z DB (např. SUN-20K / home-01 = 13 500 W)."""
|
||
if no_export:
|
||
return 0
|
||
return max(0, int(max_export_power_w or 0))
|
||
|
||
|
||
def _clamp_deye_tou_soc_pct(pct: int) -> int:
|
||
return max(5, min(95, pct))
|
||
|
||
|
||
def _clamp_deye_tou_soc_pct_hi(pct: int, hi: int) -> int:
|
||
"""Stejné dolní omezení 5 % jako u TOU; horní mez z parametru (např. 100 u priority baterie)."""
|
||
return max(5, min(int(hi), int(pct)))
|
||
|
||
|
||
def _deye_tou_min_soc_pct(inv: InverterConfig) -> int:
|
||
if inv.min_soc_percent is not None:
|
||
return _clamp_deye_tou_soc_pct(int(inv.min_soc_percent))
|
||
return 10
|
||
|
||
|
||
def _deye_tou_reserve_soc_pct(inv: InverterConfig) -> int:
|
||
if inv.reserve_soc_percent is not None:
|
||
return _clamp_deye_tou_soc_pct(int(inv.reserve_soc_percent))
|
||
return 20
|
||
|
||
|
||
def _deye_passive_tou_battery_soc_pct(
    inv: InverterConfig,
    setpoints: ControlSetpoints,
) -> int:
    """
    SoC value for a Deye TOU row (reg 166+) in the physical PASSIVE mode.

    On home-01 the Deye interprets the TOU % as "where battery usage should
    head": if the written percentage is **lower than the actual SoC**, PV
    surplus tends to go to the grid.

    With a **negative sell price** or **planned charging** (positive
    ``battery_w``) the EMS writes **100 %** into TOU (a signal to the inverter
    to "take surplus into the battery over the whole range"). **``max_soc_percent``
    in the DB** is separate: it is the upper limit for the **planner / Wh
    balance** (daily operation, see the column comment), **not** a time-based
    "until when".

    Otherwise the operational floor ``min_soc_percent`` is kept (typically a
    low % → spill to the grid is possible depending on Deye behaviour).

    **SELF_SUSTAIN** mode (``self_sustain_local_use``): always
    ``min_soc_percent`` — a low TOU keeps the "battery as buffer" priority with
    full reg. 108/109 and reg. 142 zero-export; the price-driven 100 % logic is
    not applied here (the LP is not used in SELF_SUSTAIN).
    """
    floor_pct = _deye_tou_min_soc_pct(inv)
    if setpoints.self_sustain_local_use:
        return floor_pct

    planned_battery_w = int(setpoints.battery_w) if setpoints.battery_w is not None else 0
    sell_price = setpoints.effective_sell_price_czk_kwh

    if planned_battery_w > 0 or (sell_price is not None and float(sell_price) < 0):
        # Battery priority: charging is planned or selling would cost money.
        return _clamp_deye_tou_soc_pct_hi(DEYE_TOU_SOC_PASSIVE_BATTERY_PRIORITY_PCT, hi=100)

    return floor_pct
|
||
|
||
|
||
def _deye_zero_export_amps_for_passive(
|
||
grid_w: int,
|
||
bat_w: int,
|
||
max_charge_a: int,
|
||
max_discharge_a: int,
|
||
) -> tuple[int, int]:
|
||
"""
|
||
PASSIVE (zero export k CT/zátěži, reg. 142 dle DB): výchozí plné 108/109.
|
||
|
||
- Export v plánu (grid_w < 0) a žádné plánované vybíjení (bat_w >= 0): **108 = 0** — nepřebírat
|
||
přebytek FVE do baterie, ať může jít přetok do sítě.
|
||
- Import v plánu (grid_w > 0) a žádné plánované nabíjení (bat_w <= 0): **109 = 0** — nevybíjet
|
||
baterii, odběr ze sítě.
|
||
"""
|
||
if grid_w < 0 and bat_w >= 0:
|
||
return 0, max_discharge_a
|
||
if grid_w > 0 and bat_w <= 0:
|
||
return max_charge_a, 0
|
||
return max_charge_a, max_discharge_a
|
||
|
||
|
||
def get_deye_mode(setpoints: ControlSetpoints) -> str:
    """
    Physical Deye mode: SELL | CHARGE | PASSIVE.

    Taken primarily from the plan (``setpoints.deye_physical_mode``); the
    fallback derives it from the signs only (see
    ``docs/04-modules/operating-modes.md``):

    - **CHARGE** — ``battery_w`` > 0 **and** ``grid_setpoint_w`` > 0 (charging
      from the grid + import in the plan).
    - **SELL** — ``grid_setpoint_w`` < 0 **and** ``battery_w`` < 0 (export +
      battery discharge in the plan).
    - **PASSIVE** (ZERO) — everything else; reg. **108/109** per
      ``_deye_zero_export_amps_for_passive``.

    ``battery_w=None`` (SELF_SUSTAIN) → bat_w 0 → typically PASSIVE;
    ``write_inverter_setpoints`` has its own SELF_SUSTAIN branch (108/109 max).
    """
    explicit_mode = (setpoints.deye_physical_mode or "").strip().upper()
    if explicit_mode in ("PASSIVE", "SELL", "CHARGE"):
        return explicit_mode

    planned_grid_w = int(setpoints.grid_setpoint_w or 0)
    planned_battery_w = int(setpoints.battery_w) if setpoints.battery_w is not None else 0

    if planned_grid_w > 0 and planned_battery_w > 0:
        return "CHARGE"
    if planned_grid_w < 0 and planned_battery_w < 0:
        return "SELL"
    return "PASSIVE"
|
||
|
||
|
||
def _deye_tou_params(
    setpoints: ControlSetpoints,
    inv: InverterConfig,
) -> tuple[int, int, bool]:
    """
    Parameters of one Deye time point: power W, SoC % (TOU reg 166+), grid_charge.

    - ``lock_battery``: power 0, minimum SoC, no grid charge.
    - CHARGE: charge power derived from the capped charge current, SoC target
      from ``max_soc_percent`` (95 when unset) limited to 10–95, grid charge on.
    - SELL: full discharge power, reserve SoC, no grid charge.
    - PASSIVE: full discharge power, SoC from
      ``_deye_passive_tou_battery_soc_pct``, no grid charge.

    Returns:
        ``(power_w, soc_pct, grid_charge)``.
    """
    max_batt_w_discharge = int(inv.max_discharge_a * BATT_VOLTAGE_V)
    tou_min = _deye_tou_min_soc_pct(inv)
    tou_reserve = _deye_tou_reserve_soc_pct(inv)

    if setpoints.lock_battery:
        return 0, tou_min, False

    deye_mode = get_deye_mode(setpoints)

    if deye_mode == "CHARGE":
        raw_bat = setpoints.battery_w
        battery_w = int(raw_bat) if raw_bat is not None else 0
        cap = int(inv.max_soc_percent) if inv.max_soc_percent is not None else 95
        target_soc = max(10, min(95, cap))
        charge_a = battery_watts_to_amps(battery_w, int(inv.max_charge_a))
        # Multiply by the float voltage and truncate once, exactly as the
        # discharge limit above does. Truncating the voltage to 51 V first
        # would under-report the charge power (100 A -> 5100 W, not 5120 W).
        tp_charge_w = int(charge_a * BATT_VOLTAGE_V)
        return tp_charge_w, target_soc, True

    if deye_mode == "SELL":
        return max_batt_w_discharge, tou_reserve, False

    tou_soc = _deye_passive_tou_battery_soc_pct(inv, setpoints)
    return max_batt_w_discharge, tou_soc, False
|
||
|
||
|
||
async def write_inverter_setpoints(
    site_id: int,
    setpoints_now: ControlSetpoints,
    setpoints_next: ControlSetpoints | None,
    db: asyncpg.Connection,
    planning_run_id: int | None = None,
) -> str:
    """Write the current (and next) slot's setpoints to the Deye inverter.

    Steps, in order:

    1. Load the inverter config; derive the physical Deye mode and the
       charge/discharge currents (reg 108/109), reg 142/143/145/178 values.
    2. Read the inverter clock (reg 62–64) and decide whether a time sync is
       needed; a failed or short read means "sync".
    3. Build TOU time points 1–2 from the current/next setpoints and, once per
       Prague day or when their signature changes, the inactive points 3–6.
    4. Optionally read-modify-write reg 179 (microinverter export cutoff).
    5. Drop registers whose value equals the last verified snapshot, journal
       the rest via ``create_modbus_commands`` and execute them.
    6. Persist the clock-sync and inactive-TOU bookkeeping columns.

    Args:
        site_id: Site whose inverter is written.
        setpoints_now: Setpoints for the current slot.
        setpoints_next: Setpoints for the next slot; ``setpoints_now`` is
            reused for time point 2 when ``None``.
        db: Open database connection.
        planning_run_id: Planning run recorded with the journaled commands.

    Returns:
        A status string starting with ``"OK inverter"`` or ``"FAIL inverter"``.
        Exceptions raised after the config is loaded are logged with their
        traceback and reported as a ``FAIL`` string rather than propagated.
    """
    inv = await _load_inverter_config(site_id, db)
    if inv is None:
        return "FAIL inverter: no controllable Modbus endpoint"

    # Resolved once and used for every Modbus access below.
    unit_id = int(inv.unit_id if inv.unit_id is not None else 1)
    raw_bat = setpoints_now.battery_w
    grid_w = int(setpoints_now.grid_setpoint_w or 0)
    export_lim = _deye_reg143_export_w(inv.no_export, inv.max_export_power_w)
    max_batt_w_discharge = int(inv.max_discharge_a * BATT_VOLTAGE_V)
    tp_discharge_w = 0 if setpoints_now.lock_battery else max_batt_w_discharge
    tou_min_pct = _deye_tou_min_soc_pct(inv)
    tou_reserve_pct = _deye_tou_reserve_soc_pct(inv)

    try:
        soc_telemetry = await _get_current_soc(site_id, db)

        deye_mode = get_deye_mode(setpoints_now)

        bat_w = int(raw_bat) if raw_bat is not None else 0
        if setpoints_now.lock_battery:
            charge_a = 0
            discharge_a = 0
        elif deye_mode == "CHARGE":
            charge_a = battery_watts_to_amps(bat_w, inv.max_charge_a)
            discharge_a = 0
        elif deye_mode == "SELL":
            # Deliberate battery output to the grid: full discharge current;
            # the export cap follows the plan below.
            charge_a = 0
            discharge_a = int(inv.max_discharge_a)
        elif setpoints_now.self_sustain_local_use:
            # SELF_SUSTAIN: full charge and discharge current — PV surplus goes
            # to the battery; reg. 142 = zero export to load/CT (see
            # selling_mode below), not reg. 108 = 0.
            charge_a = int(inv.max_charge_a)
            discharge_a = int(inv.max_discharge_a)
        else:
            # PASSIVE (ZERO): full 108/109 by default; the helper handles PV
            # spill to the grid and import without the battery.
            charge_a, discharge_a = _deye_zero_export_amps_for_passive(
                grid_w,
                bat_w,
                int(inv.max_charge_a),
                int(inv.max_discharge_a),
            )

        zero_exp_mode = int(inv.deye_zero_export_mode or 1)
        selling_mode = 0 if deye_mode == "SELL" else zero_exp_mode
        solar_sell = 0 if (setpoints_now.export_ban and deye_mode != "SELL") else 1
        export_limit = export_lim
        if deye_mode == "SELL" and grid_w < 0:
            # Planned export, but never below the firmware floor nor above the DB cap.
            export_limit = min(export_lim, max(REG143_SELL_CAP_MIN_W, abs(grid_w)))
        reg178_val = REG178_SELL if deye_mode == "SELL" else REG178_PASSIVE

        logger.info(
            "[control] site=%s fyzický režim Deye: %s | "
            "battery_w=%r grid_w=%s | "
            "charge_a=%s discharge_a=%s | "
            "reg142=%s reg145=%s reg143=%sW reg178=%s",
            site_id,
            deye_mode,
            raw_bat,
            grid_w,
            charge_a,
            discharge_a,
            selling_mode,
            solar_sell,
            export_limit,
            reg178_val,
        )

        now_cet, time_rows = _deye_system_time_register_rows()
        skip_time = False
        try:
            mb_clock = await get_modbus_client(inv.host, inv.port)
            tvals = await mb_clock.read_holding_registers(62, 3, unit_id)
            if len(tvals) == 3:
                skip_time = _deye_should_skip_time_sync_after_read(
                    inv, int(tvals[0]), int(tvals[1]), int(tvals[2])
                )
            else:
                logger.warning(
                    "Deye clock read: expected 3 registers, got %s; will sync 62–64",
                    len(tvals),
                )
        except Exception as e:
            # Best effort: an unreadable clock simply means the time is written.
            logger.warning("Deye clock read failed (will sync 62–64): %s", e)

        if skip_time:
            logger.info(
                "Deye clock 62–64 skipped (drift ≤ %ss, last sync < %sh ago): %s CET",
                DEYE_CLOCK_DRIFT_OK_SEC,
                DEYE_CLOCK_RESYNC_INTERVAL_HOURS,
                now_cet.strftime("%Y-%m-%d %H:%M:%S"),
            )
        else:
            logger.info("Deye time will sync: %s CET", now_cet.strftime("%Y-%m-%d %H:%M:%S"))

        registers: list[tuple[int, str, int]] = [] if skip_time else list(time_rows)

        sp_tp2 = setpoints_next if setpoints_next is not None else setpoints_now
        hh_cur = current_slot_hhmm()
        hh_nxt = next_slot_hhmm()
        p1, s1, g1 = _deye_tou_params(setpoints_now, inv)
        p2, s2, g2 = _deye_tou_params(sp_tp2, inv)
        registers.extend(_deye_time_point_rows(0, hh_cur, p1, s1, g1))
        registers.extend(_deye_time_point_rows(1, hh_nxt, p2, s2, g2))

        prague_date = datetime.now(PRAGUE_TZ).date()
        inactive_sig = (
            f"{DEYE_TOU_INACTIVE_HHMM}|{tou_min_pct}|{tou_reserve_pct}|{tp_discharge_w}"
        )

        async def _mark_inactive_tou_written() -> None:
            # Record the day and signature of the inactive TOU rows (3–6).
            await db.execute(
                """
                UPDATE ems.asset_inverter
                SET deye_last_tou_inactive_write_prague_date = $1,
                    deye_tou_inactive_signature = $2
                WHERE id = $3
                """,
                prague_date,
                inactive_sig,
                inv.id,
            )

        need_inactive_tou = (
            inv.deye_last_tou_inactive_write_prague_date != prague_date
            or inv.deye_tou_inactive_signature != inactive_sig
        )
        if need_inactive_tou:
            for idx in range(2, 6):
                registers.extend(
                    _deye_time_point_rows(
                        idx, DEYE_TOU_INACTIVE_HHMM, tp_discharge_w, tou_min_pct, False
                    )
                )
        else:
            logger.debug(
                "Deye TOU rows 3–6 skipped (already written today, signature unchanged)"
            )

        registers.extend(
            [
                (108, "", charge_a),
                (109, "", discharge_a),
                (141, "energy_mode (0)", 0),
                (142, "limit_control", selling_mode),
                (143, "", export_limit),
                (145, "solar_sell", solar_sell),
                (178, "grid_peak_shaving_switch", reg178_val),
            ]
        )

        if inv.deye_gen_microinverter_cutoff_enabled:
            want_cutoff = bool(setpoints_now.deye_gen_cutoff_enabled) and deye_mode != "SELL"
            target_bits = (
                REG179_MI_EXPORT_DISABLE if want_cutoff else REG179_MI_EXPORT_ENABLE
            )
            try:
                mb179 = await get_modbus_client(inv.host, inv.port)
                r179 = await mb179.read_holding_registers(179, 1, unit_id)
                if r179 and len(r179) >= 1:
                    current_179 = int(r179[0])
                    # Read-modify-write: only the masked bits are replaced.
                    new_179 = (current_179 & ~REG179_MI_EXPORT_MASK) | int(target_bits)
                    registers.append((179, "control_board_special_1", new_179))
                    logger.info(
                        "[control] %s: reg179 MI cutoff %s (old=%s new=%s mask=0x%04X)",
                        inv.code,
                        "ON" if want_cutoff else "OFF",
                        current_179,
                        new_179,
                        REG179_MI_EXPORT_MASK,
                    )
                else:
                    logger.warning(
                        "[control] %s: reg179 read returned %s values, skip cutoff write",
                        inv.code,
                        len(r179) if r179 is not None else None,
                    )
            except Exception as e:
                # Best effort: the cutoff is skipped, the other registers are still written.
                logger.warning("[control] %s: reg179 cutoff RMW failed: %s", inv.code, e)

        logger.info(
            "[control] %s: deye_mode=%s charge=%sA discharge=%sA "
            "reg142=%s reg145=%s export=%sW "
            "tp1=%s tp2=%s soc=%s%% (batt=%r grid=%sW)",
            inv.code,
            deye_mode,
            charge_a,
            discharge_a,
            selling_mode,
            solar_sell,
            export_limit,
            hh_cur,
            hh_nxt,
            soc_telemetry,
            raw_bat,
            grid_w,
        )

        last_verified = await _fetch_last_verified_inverter_registers(site_id, inv.id, db)
        registers, skipped_unchanged = _drop_registers_matching_last_verified(
            registers, last_verified
        )
        if skipped_unchanged:
            logger.info(
                "[control] %s: skip %s registers (value equals last verified): %s",
                inv.code,
                len(skipped_unchanged),
                skipped_unchanged[:24],
            )
        if not registers:
            logger.info(
                "[control] %s: all Deye holding regs match last verified, no Modbus write",
                inv.code,
            )
            if need_inactive_tou:
                await _mark_inactive_tou_written()
            return (
                f"OK inverter: batt_w={raw_bat!r} (no changes vs last verified Modbus snapshot)"
            )

        will_write_inactive = any(
            int(r) in _DEYE_INACTIVE_TOU_REGISTERS for r, _, _ in registers
        )

        cmd_ids = await create_modbus_commands(
            site_id,
            planning_run_id,
            "inverter",
            inv.id,
            inv.code,
            inv.host,
            inv.port,
            unit_id,
            registers,
            db,
            deye_physical_mode=deye_mode,
        )
        if not await execute_modbus_commands(cmd_ids, db):
            return f"FAIL inverter: {inv.code}: Modbus write failed (see modbus_command)"
        logger.info("[control] Inverter %s journal write OK", inv.code)

        will_write_time = any(int(r) in DEYE_CLOCK_REGS for r, _, _ in registers)
        if will_write_time:
            await db.execute(
                """
                UPDATE ems.asset_inverter
                SET deye_last_system_time_sync_minute = $1,
                    deye_last_system_time_sync_at = now()
                WHERE id = $2
                """,
                _prague_minute_start_utc(),
                inv.id,
            )

        if need_inactive_tou or will_write_inactive:
            await _mark_inactive_tou_written()
    except Exception as e:
        # Keep the traceback in the log; callers only receive the status string.
        logger.exception("[control] %s: inverter setpoint write failed", inv.code)
        return f"FAIL inverter: {inv.code}: {e}"

    return (
        f"OK inverter: batt_w={raw_bat!r} "
        f"(time points + FC 0x10: 108/109/141/142/178/143)"
    )
|
||
|
||
|
||
async def read_deye_registers_live(site_id: int, db: asyncpg.Connection) -> dict[str, Any]:
    """
    Live read of Deye holding registers 108, 109, 141–145, 178, 179, 191 (same TCP
    connection as telemetry/export).

    Everything runs under one mutex with grouped FC3 blocks — previously
    telemetry took the lock in between the individual read_register calls and
    RS485 gateways returned foreign transaction_ids / I/O timeouts.

    Raises:
        ValueError: The site has no controllable Modbus inverter.
        Exception: Any read or unpacking failure is logged and re-raised.
    """
    inv = await _load_inverter_config(site_id, db)
    if inv is None:
        raise ValueError("no controllable Modbus inverter for site")

    uid = int(inv.unit_id)
    client = await get_modbus_client(inv.host, inv.port)
    read_at = datetime.now(timezone.utc)
    try:
        async with client.batch(uid) as mb:
            block_108 = await mb.read_holding_registers(108, 2)
            block_141 = await mb.read_holding_registers(141, 5)
            block_178 = await mb.read_holding_registers(178, 1)
            block_179 = await mb.read_holding_registers(179, 1)
            block_191 = await mb.read_holding_registers(191, 1)
        charge_a, discharge_a = block_108[0], block_108[1]
        energy_mode, limit_control, export_limit_w = block_141[0], block_141[1], block_141[2]
        # Offset 4 of the block starting at 141 is register 145.
        solar_sell = block_141[4]
        peak_switch = block_178[0]
        special_1 = block_179[0]
        peak_shaving_w = block_191[0]
    except Exception:
        logger.exception("read_deye_registers_live site=%s failed", site_id)
        raise

    cutoff_bits = int(special_1) & int(REG179_MI_EXPORT_MASK)
    return {
        "reg108_charge_a": int(charge_a),
        "reg109_discharge_a": int(discharge_a),
        "reg141_energy_mode": int(energy_mode),
        "reg142_limit_control": int(limit_control),
        "reg143_export_limit_w": int(export_limit_w),
        "reg145_solar_sell": int(solar_sell),
        "reg178_peak_shaving_switch": int(peak_switch),
        "reg179_control_board_special_1": int(special_1),
        "reg179_mi_export_cutoff_bits": cutoff_bits,
        "reg179_mi_export_cutoff_is_on": cutoff_bits == int(REG179_MI_EXPORT_DISABLE),
        "reg191_peak_shaving_w": int(peak_shaving_w),
        "read_at": read_at.isoformat(),
    }
|
||
|
||
|
||
def _current_limit_for_charger(charger_code: str, sp: ControlSetpoints) -> int:
|
||
c = (charger_code or "").strip().lower()
|
||
if c == "ev-charger-1":
|
||
a = sp.ev1_current_a
|
||
elif c == "ev-charger-2":
|
||
a = sp.ev2_current_a
|
||
elif c.endswith("-1") or c == "ev1":
|
||
a = sp.ev1_current_a
|
||
elif c.endswith("-2") or c == "ev2":
|
||
a = sp.ev2_current_a
|
||
else:
|
||
a = 0
|
||
if a < 6:
|
||
a = 0
|
||
return a
|
||
|
||
|
||
async def write_ev_setpoints(site_id: int, setpoints: ControlSetpoints, db: asyncpg.Connection) -> str:
    """Log the current limit for each schedulable EV charger of the site.

    Only chargers on an enabled ``modbus_tcp`` endpoint are considered. No
    Modbus write happens yet (TODO); the per-charger limit is only logged.

    Returns:
        An ``"OK EV: ..."`` status string.
    """
    chargers = await db.fetch(
        """
        SELECT ec.code, se.host, se.port, se.unit_id
        FROM ems.asset_ev_charger ec
        JOIN ems.site_endpoint se ON se.id = ec.endpoint_id
        WHERE ec.site_id = $1
          AND ec.schedulable = true
          AND se.enabled = true
          AND se.endpoint_type = 'modbus_tcp'
        ORDER BY ec.code
        """,
        site_id,
    )
    if not chargers:
        return "OK EV: no schedulable chargers"

    for charger in chargers:
        charger_code = charger["code"]
        logger.info(
            "EV setpoint [%s]: %sA (TODO: Modbus registers)",
            charger_code,
            _current_limit_for_charger(charger_code, setpoints),
        )
    return f"OK EV: logged {len(chargers)} charger(s) (Modbus TODO)"
|
||
|
||
|
||
async def write_heat_pump_setpoint(site_id: int, setpoints: ControlSetpoints, db: asyncpg.Connection) -> str:
    """Log the heat-pump enable flag for each schedulable unit of the site.

    Only units on an enabled ``modbus_tcp`` endpoint are considered. No Modbus
    write happens yet (TODO); the flag is only logged.

    Returns:
        An ``"OK heat pump: ..."`` status string.
    """
    units = await db.fetch(
        """
        SELECT hp.code, se.host, se.port, se.unit_id
        FROM ems.asset_heat_pump hp
        JOIN ems.site_endpoint se ON se.id = hp.endpoint_id
        WHERE hp.site_id = $1
          AND hp.schedulable = true
          AND se.enabled = true
          AND se.endpoint_type = 'modbus_tcp'
        """,
        site_id,
    )
    if not units:
        return "OK heat pump: no schedulable unit"

    for unit in units:
        logger.info(
            "HP setpoint [%s]: enable=%s (TODO: Modbus registers)",
            unit["code"],
            setpoints.heat_pump_enable,
        )
    return "OK heat pump: logged (Modbus TODO)"
|
||
|
||
|
||
async def send_loxone_setpoints(
    site_id: int,
    setpoints: ControlSetpoints,
    mode: OperatingModeInfo,
    db: asyncpg.Connection,
) -> str:
    """Push the mode and setpoints to Loxone virtual inputs over HTTP GET.

    Uses the site's first enabled ``loxone_http`` endpoint. Each value is sent
    as ``<proto>://<host>:<port>/dev/sps/io/<input>/<value>`` with a 5 s
    timeout. Basic-auth credentials come from the settings, falling back to
    the ``LOXONE_USER`` / ``LOXONE_PASSWORD`` environment variables; no auth
    is sent when the user is empty.

    Returns:
        ``"OK Loxone: ..."`` on success or when no endpoint exists, otherwise
        ``"FAIL Loxone: ..."`` listing at most the first three failed requests.
    """
    endpoint = await db.fetchrow(
        """
        SELECT host, port, protocol
        FROM ems.site_endpoint
        WHERE site_id = $1 AND endpoint_type = 'loxone_http' AND enabled = true
        ORDER BY id
        LIMIT 1
        """,
        site_id,
    )
    if not endpoint:
        return "OK Loxone: no endpoint, skipped"

    scheme = (endpoint["protocol"] or "http").lower()
    if scheme not in ("http", "https"):
        # Anything unexpected falls back to plain HTTP.
        scheme = "http"
    default_port = 443 if scheme == "https" else 80
    host = endpoint["host"]
    port = int(endpoint["port"] or default_port)
    base = f"{scheme}://{host}:{port}/dev/sps/io"

    settings = get_settings()
    user = settings.loxone_user or os.getenv("LOXONE_USER") or ""
    password = settings.loxone_password or os.getenv("LOXONE_PASSWORD") or ""
    auth = (user, password) if user else None

    # battery_w is None in SELF_SUSTAIN; Loxone is shown 0 in that case.
    batt_display = 0 if setpoints.battery_w is None else int(setpoints.battery_w)

    virtual_inputs = [
        ("EMS_Mode", mode.loxone_mode_value),
        ("EMS_Battery_Setpoint_W", batt_display),
        ("EMS_Grid_Setpoint_W", setpoints.grid_setpoint_w),
        ("EMS_EV1_Power_W", setpoints.ev1_power_w),
        ("EMS_EV2_Power_W", setpoints.ev2_power_w),
        ("EMS_HeatPump_Enable", 1 if setpoints.heat_pump_enable else 0),
    ]
    urls = [f"{base}/{input_name}/{value}" for input_name, value in virtual_inputs]

    failures: list[str] = []
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            for url in urls:
                try:
                    response = await client.get(url, auth=auth)
                    response.raise_for_status()
                except Exception as e:
                    # One failed input must not stop the remaining ones.
                    failures.append(f"{url!s}: {e}")
    except Exception as e:
        return f"FAIL Loxone: client {e}"

    if failures:
        return "FAIL Loxone: " + "; ".join(failures[:3])
    return "OK Loxone: all virtual inputs updated"
|
||
|
||
|
||
async def export_setpoints(site_id: int, db: asyncpg.Connection) -> None:
    """Export the setpoints of the current planning slot to all control targets.

    Flow:
      1. Load the operating mode; return early (without enqueueing signals)
         when there is no mode row or the mode is ``MANUAL``.
      2. Load the planning rows for slot offsets 0 and 1 and build setpoints
         for both.  When no setpoints can be built for the current slot, log
         the reason and skip all writes.
      3. In ``CHARGE_CHEAP`` replace both slots with a fixed charging
         setpoint; in every other mode pass the setpoints through
         ``_apply_price_failsafe_guard``.
      4. Run the inverter, EV, heat pump and Loxone writers one after another.
         A failure in one writer is logged and recorded as a ``"FAIL ..."``
         result; it never prevents the remaining writers from running.
      5. Log one summary line per target.

    ``enqueue_site_signals`` is always called once step 2 has been entered,
    including on the early "skip" returns and when an exception propagates;
    a failure of the enqueue itself is only logged.

    Args:
        site_id: Site whose setpoints are exported.
        db: Open asyncpg connection used for all queries and passed on to
            the writers.
    """
    mode = await _fetch_operating_mode(site_id, db)
    if mode is None:
        logger.warning("control export site=%s: no operating mode row", site_id)
        return

    if mode.mode_code == "MANUAL":
        logger.info("control export site=%s: MANUAL, skip writes", site_id)
        return

    async def _run_step(log_label: str, fail_label: str, writer: Any, *args: Any, **kwargs: Any) -> Any:
        """Await one writer; on error log it with traceback and return a FAIL string."""
        try:
            return await writer(*args, **kwargs)
        except Exception as e:
            # logger.exception keeps the traceback; the formatted message is
            # "<log_label> write failed: <error>".
            logger.exception("%s write failed: %s", log_label, e)
            return f"FAIL {fail_label}: {e}"

    try:
        pi_now = await _fetch_plan_row_for_slot_offset(site_id, db, 0)
        pi_next = await _fetch_plan_row_for_slot_offset(site_id, db, 1)
        sp_now = _build_setpoints(mode, pi_now)
        sp_next = _build_setpoints(mode, pi_next)

        # Nothing to write for the current slot: always say why before skipping.
        if sp_now is None:
            if mode.mode_code == "AUTO" and pi_now is None:
                logger.warning(
                    "control export site=%s: AUTO but no planning_interval for current slot, skip",
                    site_id,
                )
            else:
                logger.warning(
                    "control export site=%s: no setpoints for mode %s, skip",
                    site_id,
                    mode.mode_code,
                )
            return

        if mode.mode_code == "CHARGE_CHEAP":
            max_ch = await _fetch_max_charge_power_w(site_id, db)
            # Both setpoints positive -> get_deye_mode yields CHARGE; at least
            # 1 W so the mode does not become PASSIVE when the DB value is zero.
            pw = max(1, int(max_ch))
            sp_now = ControlSetpoints(
                battery_w=pw,
                grid_export_limit=0,
                ev1_current_a=0,
                ev2_current_a=0,
                heat_pump_enable=False,
                grid_setpoint_w=pw,
                ev1_power_w=0,
                ev2_power_w=0,
                target_soc_pct=None,
                effective_sell_price_czk_kwh=None,
            )
            # The forced charge applies to the next slot as well.
            sp_next = sp_now
        else:
            sp_now = _apply_price_failsafe_guard(site_id, mode, pi_now, sp_now)
            if sp_next is not None:
                sp_next = _apply_price_failsafe_guard(site_id, mode, pi_next, sp_next)

        planning_run_id = await db.fetchval(
            """
            SELECT id FROM ems.planning_run
            WHERE site_id = $1 AND status = 'active'
            ORDER BY created_at DESC
            LIMIT 1
            """,
            site_id,
        )
        if planning_run_id is not None:
            planning_run_id = int(planning_run_id)

        # Writers run sequentially, in this fixed order, on the shared connection.
        inv_res = await _run_step(
            "inverter",
            "inverter",
            write_inverter_setpoints,
            site_id,
            sp_now,
            sp_next,
            db,
            planning_run_id=planning_run_id,
        )
        ev_res = await _run_step("ev", "ev", write_ev_setpoints, site_id, sp_now, db)
        hp_res = await _run_step(
            "hp", "heat pump", write_heat_pump_setpoint, site_id, sp_now, db
        )
        lox_res = await _run_step(
            "loxone", "Loxone", send_loxone_setpoints, site_id, sp_now, mode, db
        )

        results = (
            ("inverter", inv_res),
            ("ev", ev_res),
            ("heat_pump", hp_res),
            ("loxone", lox_res),
        )
        for name, res in results:
            # Defensive: a writer handing back an exception object is reported
            # like a failure ("FAIL <error>").
            if isinstance(res, Exception):
                res = f"FAIL {res}"
            failed = isinstance(res, str) and res.startswith("FAIL")
            logger.log(
                logging.ERROR if failed else logging.INFO,
                "control export site=%s %s: %s",
                site_id,
                name,
                res,
            )
    finally:
        # Best effort: a failing signal enqueue must not mask the export outcome.
        try:
            await enqueue_site_signals(site_id, db)
        except Exception as e:
            logger.warning(
                "control export site=%s: signal enqueue failed: %s", site_id, e
            )
|