refactor control exporter helpers

This commit is contained in:
Dusan Vojacek
2026-05-02 19:35:41 +02:00
parent e2f77eda14
commit 6d6341cde8
3 changed files with 345 additions and 302 deletions

View File

@@ -0,0 +1,236 @@
"""Čisté Deye konstanty a helpery pro control export."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo
from services.control.models import InverterConfig
PRAGUE_TZ = ZoneInfo("Europe/Prague")
# Hodiny Deye 62-64: po zápisu sekundy na zařízení dál běží, verify musí být toleranční.
DEYE_CLOCK_VERIFY_MAX_DELTA_SEC = 120
# Řidší zápis: bez zápisu, pokud čas na invertoru neodbočí od Prahy víc než o tolik sekund.
DEYE_CLOCK_DRIFT_OK_SEC = 60
# A zároveň neuplynul tento interval od posledního syncu / potvrzení driftu.
DEYE_CLOCK_RESYNC_INTERVAL_HOURS = 24
# Deye LV baterie: převod výkon -> proud pro registry 108/109.
BATT_VOLTAGE_V = 51.2
# Reg 143 ve SELL: min(|grid_setpoint_w|, ...) nesmí klesnout pod tuto podlahu (W).
REG143_SELL_CAP_MIN_W = 200
# Reg 178 - bitové pole: bity 4-5 (peak shaving switch) a bity 0-1 (MI export cutoff).
REG178_SELL = 0b00100000
REG178_PASSIVE = 0b00110000
REG178_VERIFY_MASK = 0x0030
REG178_MI_EXPORT_MASK = 0x0003
REG178_MI_EXPORT_DISABLE = 0b10
REG178_MI_EXPORT_ENABLE = 0b11
REG178_VERIFY_MASK_COMBINED = REG178_VERIFY_MASK | REG178_MI_EXPORT_MASK
DEYE_CRITICAL_REGS_SELF_SUSTAIN = frozenset({108, 109, 142, 143, 145})
DEYE_TOU_POWER_REGS = frozenset(range(154, 160))
DEYE_LV_BATTERY_MAX_CHARGE_DISCHARGE_A = 350
# Neaktivní TOU bloky (3-6): Deye často 23:59 (2359) neuloží, 23:55 je stabilní.
DEYE_TOU_INACTIVE_HHMM = 2355
_DEYE_INACTIVE_TOU_REGISTERS: frozenset[int] = frozenset(
[
150,
151,
152,
153,
156,
157,
158,
159,
168,
169,
170,
171,
174,
175,
176,
177,
]
)
DEYE_CLOCK_REGS: frozenset[int] = frozenset({62, 63, 64})
DEYE_REGISTER_NAMES: dict[int, str] = {
108: "max_charge_a (max nabíjecí proud baterie)",
109: "max_discharge_a (max vybíjecí proud baterie)",
141: "energy_mode (0, EMS nemění)",
142: "limit_control (0=selling first, 1=zero export to load, 2=zero export to CT)",
143: "export_limit_w (max export do sítě)",
145: "solar_sell (0=disabled, 1=enabled)",
340: "max_solar_power_w (strop DC PV A v W; součet nominal_power_wp řiditelných polí)",
178: "control_board_special_1 (bits0-1: MI export cutoff disable=2 enable=3; bits4-5 peak shaving 32/48)",
148: "time_point_1_time",
149: "time_point_2_time",
154: "time_point_1_power_w",
155: "time_point_2_power_w",
166: "time_point_1_soc_min_pct",
167: "time_point_2_soc_min_pct",
172: "time_point_1_grid_charge",
173: "time_point_2_grid_charge",
62: "system_time_year_month",
63: "system_time_day_hour",
64: "system_time_min_sec",
}
for _tp_i in range(6):
_n = _tp_i + 1
DEYE_REGISTER_NAMES.setdefault(148 + _tp_i, f"time_point_{_n}_time")
DEYE_REGISTER_NAMES.setdefault(154 + _tp_i, f"time_point_{_n}_power_w")
DEYE_REGISTER_NAMES.setdefault(166 + _tp_i, f"time_point_{_n}_soc_min_pct")
DEYE_REGISTER_NAMES.setdefault(172 + _tp_i, f"time_point_{_n}_grid_charge")
def _deye_reg178_verify_match(expected_i: int, actual_i: int) -> bool:
return (int(expected_i) & REG178_VERIFY_MASK_COMBINED) == (
int(actual_i) & REG178_VERIFY_MASK_COMBINED
)
def deye_reg_triggers_self_sustain_after_verify_exhaust(reg: int) -> bool:
"""True = po 3x mismatch přepnout lokalitu do SELF_SUSTAIN (kritický registr)."""
return int(reg) in DEYE_CRITICAL_REGS_SELF_SUSTAIN
def _deye_tou_power_verify_match(
expected_i: int, actual_i: int, inv: InverterConfig
) -> bool:
"""Firmware často clampne TOU power W na max z reg. 108/109 x 51.2 V."""
if int(actual_i) == int(expected_i):
return True
max_w_charge = int(inv.max_charge_a * BATT_VOLTAGE_V)
max_w_discharge = int(inv.max_discharge_a * BATT_VOLTAGE_V)
a = int(actual_i)
return a == max_w_charge or a == max_w_discharge
def _deye_reg178_verify_with_double_read(
expected_i: int, actual_first: int, actual_second: int | None
) -> tuple[bool, int]:
"""
Vrátí (shoda, hodnota_pro_journal).
Druhé čtení použít jen když první neprojde maskou (RS485 / glitch).
"""
if _deye_reg178_verify_match(expected_i, actual_first):
return True, actual_first
if actual_second is not None and _deye_reg178_verify_match(expected_i, actual_second):
return True, int(actual_second)
return False, actual_first
def watts_to_amps(power_w: int | None, phases: int = 3, voltage: int = 230) -> int:
if not power_w or power_w <= 0:
return 0
return min(32, max(0, int(power_w / (phases * voltage))))
def battery_watts_to_amps(power_w: int, max_amps: int) -> int:
"""Proud z |výkonu| baterie; max_amps z DB."""
derived = int(abs(power_w) / BATT_VOLTAGE_V)
return min(max(0, max_amps), max(0, derived))
def current_slot_hhmm() -> int:
"""Začátek probíhajícího 15min slotu v Europe/Prague, formát HHMM."""
now = datetime.now(PRAGUE_TZ)
slot_min = (now.minute // 15) * 15
return now.hour * 100 + slot_min
def next_slot_hhmm() -> int:
"""Začátek příštího 15min slotu v Europe/Prague, formát HHMM."""
now = datetime.now(PRAGUE_TZ)
minutes = now.minute
slot_minutes = ((minutes // 15) + 1) * 15
if slot_minutes >= 60:
next_hour = (now.hour + 1) % 24
next_min = 0
else:
next_hour = now.hour
next_min = slot_minutes
return next_hour * 100 + next_min
def compute_pv_a_reg340_max_solar_w(cap_w: int, forecast_w: int, curtail_w: int) -> int:
"""Hodnota pro Deye reg 340 (max solar power, W) z capu a plánovaného curtailmentu pole A."""
if curtail_w <= 0:
return int(cap_w)
return max(0, min(int(cap_w), int(forecast_w) - int(curtail_w)))
def _prague_minute_start_utc() -> datetime:
"""UTC okamžik odpovídající začátku aktuální kalendářní minuty v Europe/Prague."""
p = datetime.now(PRAGUE_TZ).replace(second=0, microsecond=0)
return p.astimezone(timezone.utc)
def _deye_registers_to_prague_datetime(r62: int, r63: int, r64: int) -> datetime | None:
"""Dekódování reg 62-64 (Deye system time v Europe/Prague)."""
try:
year = (int(r62) >> 8) + 2000
month = int(r62) & 0xFF
day = int(r63) >> 8
hour = int(r63) & 0xFF
minute = int(r64) >> 8
second = int(r64) & 0xFF
if not (1 <= month <= 12 and 1 <= day <= 31 and 0 <= hour <= 23):
return None
if not (0 <= minute <= 59 and 0 <= second <= 59):
return None
return datetime(year, month, day, hour, minute, second, tzinfo=PRAGUE_TZ)
except (ValueError, OverflowError):
return None
def _deye_clock_registers_verify_match(
w62: int,
w63: int,
w64: int,
a62: int,
a63: int,
a64: int,
) -> bool:
w_dt = _deye_registers_to_prague_datetime(w62, w63, w64)
a_dt = _deye_registers_to_prague_datetime(a62, a63, a64)
if w_dt is None or a_dt is None:
return False
return abs((a_dt - w_dt).total_seconds()) <= DEYE_CLOCK_VERIFY_MAX_DELTA_SEC
def _deye_should_skip_time_sync_after_read(
inv: InverterConfig,
r62: int,
r63: int,
r64: int,
) -> bool:
"""
True = nezařazovat zápis 62-64: drift je malý a od posledního úspěšného zápisu
nebo tolerančního ověření neuplynulo 24h.
"""
dev = _deye_registers_to_prague_datetime(r62, r63, r64)
if dev is None:
return False
wall = datetime.now(PRAGUE_TZ)
drift = abs((wall - dev).total_seconds())
if drift > DEYE_CLOCK_DRIFT_OK_SEC:
return False
last_write = inv.deye_last_system_time_sync_at
if last_write is None:
return False
if last_write.tzinfo is None:
last_write = last_write.replace(tzinfo=timezone.utc)
else:
last_write = last_write.astimezone(timezone.utc)
age = datetime.now(timezone.utc) - last_write
if age >= timedelta(hours=DEYE_CLOCK_RESYNC_INTERVAL_HOURS):
return False
return True