244 lines
8.0 KiB
Python
244 lines
8.0 KiB
Python
"""Čisté Deye konstanty a helpery pro control export."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from services.control.models import InverterConfig
|
|
|
|
PRAGUE_TZ = ZoneInfo("Europe/Prague")
|
|
|
|
# Hodiny Deye 62-64: po zápisu sekundy na zařízení dál běží, verify musí být toleranční.
|
|
DEYE_CLOCK_VERIFY_MAX_DELTA_SEC = 120
|
|
# Řidší zápis: bez zápisu, pokud čas na invertoru neodbočí od Prahy víc než o tolik sekund.
|
|
DEYE_CLOCK_DRIFT_OK_SEC = 60
|
|
# A zároveň neuplynul tento interval od posledního syncu / potvrzení driftu.
|
|
DEYE_CLOCK_RESYNC_INTERVAL_HOURS = 24
|
|
|
|
# Deye LV baterie: převod výkon -> proud pro registry 108/109.
|
|
BATT_VOLTAGE_V = 51.2
|
|
|
|
# Reg 178 - bitové pole: bity 4-5 (peak shaving switch) a bity 0-1 (MI export cutoff).
|
|
REG178_SELL = 0b00100000
|
|
REG178_PASSIVE = 0b00110000
|
|
REG178_VERIFY_MASK = 0x0030
|
|
REG178_MI_EXPORT_MASK = 0x0003
|
|
REG178_MI_EXPORT_DISABLE = 0b10
|
|
REG178_MI_EXPORT_ENABLE = 0b11
|
|
REG178_VERIFY_MASK_COMBINED = REG178_VERIFY_MASK | REG178_MI_EXPORT_MASK
|
|
|
|
DEYE_CRITICAL_REGS_SELF_SUSTAIN = frozenset({108, 109, 142, 143, 145})
|
|
DEYE_TOU_POWER_REGS = frozenset(range(154, 160))
|
|
DEYE_LV_BATTERY_MAX_CHARGE_DISCHARGE_A = 350
|
|
|
|
# Neaktivní TOU bloky (3-6): Deye často 23:59 (2359) neuloží, 23:55 je stabilní.
|
|
DEYE_TOU_INACTIVE_HHMM = 2355
|
|
|
|
_DEYE_INACTIVE_TOU_REGISTERS: frozenset[int] = frozenset(
|
|
[
|
|
150,
|
|
151,
|
|
152,
|
|
153,
|
|
156,
|
|
157,
|
|
158,
|
|
159,
|
|
168,
|
|
169,
|
|
170,
|
|
171,
|
|
174,
|
|
175,
|
|
176,
|
|
177,
|
|
]
|
|
)
|
|
|
|
DEYE_CLOCK_REGS: frozenset[int] = frozenset({62, 63, 64})
|
|
|
|
DEYE_REGISTER_NAMES: dict[int, str] = {
|
|
108: "max_charge_a (max nabíjecí proud baterie)",
|
|
109: "max_discharge_a (max vybíjecí proud baterie)",
|
|
141: "energy_mode (0, EMS nemění)",
|
|
142: "limit_control (0=selling first, 1=zero export to load, 2=zero export to CT)",
|
|
143: "export_limit_w (max export do sítě)",
|
|
145: "solar_sell (0=disabled, 1=enabled)",
|
|
340: "max_solar_power_w (strop DC PV A v W; cap z fn_inverter_pv_a_max_w / deye_reg340_max_solar_w)",
|
|
178: "control_board_special_1 (bits0-1: MI export cutoff disable=2 enable=3; bits4-5 peak shaving 32/48)",
|
|
148: "time_point_1_time",
|
|
149: "time_point_2_time",
|
|
154: "time_point_1_power_w",
|
|
155: "time_point_2_power_w",
|
|
166: "time_point_1_soc_min_pct",
|
|
167: "time_point_2_soc_min_pct",
|
|
172: "time_point_1_grid_charge",
|
|
173: "time_point_2_grid_charge",
|
|
62: "system_time_year_month",
|
|
63: "system_time_day_hour",
|
|
64: "system_time_min_sec",
|
|
}
|
|
for _tp_i in range(6):
|
|
_n = _tp_i + 1
|
|
DEYE_REGISTER_NAMES.setdefault(148 + _tp_i, f"time_point_{_n}_time")
|
|
DEYE_REGISTER_NAMES.setdefault(154 + _tp_i, f"time_point_{_n}_power_w")
|
|
DEYE_REGISTER_NAMES.setdefault(166 + _tp_i, f"time_point_{_n}_soc_min_pct")
|
|
DEYE_REGISTER_NAMES.setdefault(172 + _tp_i, f"time_point_{_n}_grid_charge")
|
|
|
|
|
|
def _deye_reg178_verify_match(expected_i: int, actual_i: int) -> bool:
|
|
return (int(expected_i) & REG178_VERIFY_MASK_COMBINED) == (
|
|
int(actual_i) & REG178_VERIFY_MASK_COMBINED
|
|
)
|
|
|
|
|
|
def deye_reg_triggers_self_sustain_after_verify_exhaust(reg: int) -> bool:
|
|
"""True = po 3x mismatch přepnout lokalitu do SELF_SUSTAIN (kritický registr)."""
|
|
return int(reg) in DEYE_CRITICAL_REGS_SELF_SUSTAIN
|
|
|
|
|
|
def _deye_tou_power_verify_match(
|
|
expected_i: int, actual_i: int, inv: InverterConfig
|
|
) -> bool:
|
|
"""Firmware často clampne TOU power W na max z reg. 108/109 x 51.2 V."""
|
|
if int(actual_i) == int(expected_i):
|
|
return True
|
|
max_w_charge = int(inv.max_charge_a * BATT_VOLTAGE_V)
|
|
max_w_discharge = int(inv.max_discharge_a * BATT_VOLTAGE_V)
|
|
a = int(actual_i)
|
|
return a == max_w_charge or a == max_w_discharge
|
|
|
|
|
|
def _deye_reg178_verify_with_double_read(
|
|
expected_i: int, actual_first: int, actual_second: int | None
|
|
) -> tuple[bool, int]:
|
|
"""
|
|
Vrátí (shoda, hodnota_pro_journal).
|
|
Druhé čtení použít jen když první neprojde maskou (RS485 / glitch).
|
|
"""
|
|
if _deye_reg178_verify_match(expected_i, actual_first):
|
|
return True, actual_first
|
|
if actual_second is not None and _deye_reg178_verify_match(expected_i, actual_second):
|
|
return True, int(actual_second)
|
|
return False, actual_first
|
|
|
|
|
|
def watts_to_amps(power_w: int | None, phases: int = 3, voltage: int = 230) -> int:
|
|
if not power_w or power_w <= 0:
|
|
return 0
|
|
return min(32, max(0, int(power_w / (phases * voltage))))
|
|
|
|
|
|
def battery_watts_to_amps(power_w: int, max_amps: int) -> int:
|
|
"""Proud z |výkonu| baterie; max_amps z DB."""
|
|
derived = int(abs(power_w) / BATT_VOLTAGE_V)
|
|
return min(max(0, max_amps), max(0, derived))
|
|
|
|
|
|
def current_slot_hhmm() -> int:
|
|
"""Začátek probíhajícího 15min slotu v Europe/Prague, formát HHMM."""
|
|
now = datetime.now(PRAGUE_TZ)
|
|
slot_min = (now.minute // 15) * 15
|
|
return now.hour * 100 + slot_min
|
|
|
|
|
|
def next_slot_hhmm() -> int:
|
|
"""Začátek příštího 15min slotu v Europe/Prague, formát HHMM."""
|
|
now = datetime.now(PRAGUE_TZ)
|
|
minutes = now.minute
|
|
slot_minutes = ((minutes // 15) + 1) * 15
|
|
if slot_minutes >= 60:
|
|
next_hour = (now.hour + 1) % 24
|
|
next_min = 0
|
|
else:
|
|
next_hour = now.hour
|
|
next_min = slot_minutes
|
|
return next_hour * 100 + next_min
|
|
|
|
|
|
def compute_pv_a_reg340_max_solar_w(
|
|
cap_w: int,
|
|
forecast_w: int,
|
|
curtail_w: int,
|
|
*,
|
|
min_w: int = 0,
|
|
) -> int:
|
|
"""Hodnota pro Deye reg 340 (max solar power, W) z capu a plánovaného curtailmentu pole A."""
|
|
if curtail_w <= 0:
|
|
raw = int(cap_w)
|
|
else:
|
|
raw = max(0, min(int(cap_w), int(forecast_w) - int(curtail_w)))
|
|
if raw > 0 and int(min_w) > 0:
|
|
return max(int(min_w), raw)
|
|
return raw
|
|
|
|
|
|
def _prague_minute_start_utc() -> datetime:
|
|
"""UTC okamžik odpovídající začátku aktuální kalendářní minuty v Europe/Prague."""
|
|
p = datetime.now(PRAGUE_TZ).replace(second=0, microsecond=0)
|
|
return p.astimezone(timezone.utc)
|
|
|
|
|
|
def _deye_registers_to_prague_datetime(r62: int, r63: int, r64: int) -> datetime | None:
|
|
"""Dekódování reg 62-64 (Deye system time v Europe/Prague)."""
|
|
try:
|
|
year = (int(r62) >> 8) + 2000
|
|
month = int(r62) & 0xFF
|
|
day = int(r63) >> 8
|
|
hour = int(r63) & 0xFF
|
|
minute = int(r64) >> 8
|
|
second = int(r64) & 0xFF
|
|
if not (1 <= month <= 12 and 1 <= day <= 31 and 0 <= hour <= 23):
|
|
return None
|
|
if not (0 <= minute <= 59 and 0 <= second <= 59):
|
|
return None
|
|
return datetime(year, month, day, hour, minute, second, tzinfo=PRAGUE_TZ)
|
|
except (ValueError, OverflowError):
|
|
return None
|
|
|
|
|
|
def _deye_clock_registers_verify_match(
|
|
w62: int,
|
|
w63: int,
|
|
w64: int,
|
|
a62: int,
|
|
a63: int,
|
|
a64: int,
|
|
) -> bool:
|
|
w_dt = _deye_registers_to_prague_datetime(w62, w63, w64)
|
|
a_dt = _deye_registers_to_prague_datetime(a62, a63, a64)
|
|
if w_dt is None or a_dt is None:
|
|
return False
|
|
return abs((a_dt - w_dt).total_seconds()) <= DEYE_CLOCK_VERIFY_MAX_DELTA_SEC
|
|
|
|
|
|
def _deye_should_skip_time_sync_after_read(
|
|
inv: InverterConfig,
|
|
r62: int,
|
|
r63: int,
|
|
r64: int,
|
|
) -> bool:
|
|
"""
|
|
True = nezařazovat zápis 62-64: drift je malý a od posledního úspěšného zápisu
|
|
nebo tolerančního ověření neuplynulo 24h.
|
|
"""
|
|
dev = _deye_registers_to_prague_datetime(r62, r63, r64)
|
|
if dev is None:
|
|
return False
|
|
wall = datetime.now(PRAGUE_TZ)
|
|
drift = abs((wall - dev).total_seconds())
|
|
if drift > DEYE_CLOCK_DRIFT_OK_SEC:
|
|
return False
|
|
last_write = inv.deye_last_system_time_sync_at
|
|
if last_write is None:
|
|
return False
|
|
if last_write.tzinfo is None:
|
|
last_write = last_write.replace(tzinfo=timezone.utc)
|
|
else:
|
|
last_write = last_write.astimezone(timezone.utc)
|
|
age = datetime.now(timezone.utc) - last_write
|
|
if age >= timedelta(hours=DEYE_CLOCK_RESYNC_INTERVAL_HOURS):
|
|
return False
|
|
return True
|