Files
ems/backend/services/control/deye_helpers.py
Dusan Vojacek 0f7dc6ed94
Some checks failed
CI and deploy / migration-check (push) Failing after 14s
CI and deploy / deploy (push) Has been skipped
Branch 4: BA81 GEN cutoff audit + exekuce při sell<0
2026-06-06 22:36:27 +02:00

265 lines
8.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Čisté Deye konstanty a helpery pro control export."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo
from services.control.models import InverterConfig
PRAGUE_TZ = ZoneInfo("Europe/Prague")
# Hodiny Deye 62-64: po zápisu sekundy na zařízení dál běží, verify musí být toleranční.
DEYE_CLOCK_VERIFY_MAX_DELTA_SEC = 120
# Řidší zápis: bez zápisu, pokud čas na invertoru neodbočí od Prahy víc než o tolik sekund.
DEYE_CLOCK_DRIFT_OK_SEC = 60
# A zároveň neuplynul tento interval od posledního syncu / potvrzení driftu.
DEYE_CLOCK_RESYNC_INTERVAL_HOURS = 24
# Deye LV baterie: převod výkon -> proud pro registry 108/109.
BATT_VOLTAGE_V = 51.2
# Reg 178 - bitové pole: bity 4-5 (peak shaving switch) a bity 0-1 (MI export cutoff).
REG178_SELL = 0b00100000
REG178_PASSIVE = 0b00110000
REG178_VERIFY_MASK = 0x0030
REG178_MI_EXPORT_MASK = 0x0003
REG178_MI_EXPORT_DISABLE = 0b10
REG178_MI_EXPORT_ENABLE = 0b11
REG178_VERIFY_MASK_COMBINED = REG178_VERIFY_MASK | REG178_MI_EXPORT_MASK
DEYE_CRITICAL_REGS_SELF_SUSTAIN = frozenset({108, 109, 142, 143, 145})
DEYE_TOU_POWER_REGS = frozenset(range(154, 160))
DEYE_LV_BATTERY_MAX_CHARGE_DISCHARGE_A = 350
# Neaktivní TOU bloky (3-6): Deye často 23:59 (2359) neuloží, 23:55 je stabilní.
DEYE_TOU_INACTIVE_HHMM = 2355
_DEYE_INACTIVE_TOU_REGISTERS: frozenset[int] = frozenset(
[
150,
151,
152,
153,
156,
157,
158,
159,
168,
169,
170,
171,
174,
175,
176,
177,
]
)
DEYE_CLOCK_REGS: frozenset[int] = frozenset({62, 63, 64})
DEYE_REGISTER_NAMES: dict[int, str] = {
108: "max_charge_a (max nabíjecí proud baterie)",
109: "max_discharge_a (max vybíjecí proud baterie)",
141: "energy_mode (0, EMS nemění)",
142: "limit_control (0=selling first, 1=zero export to load, 2=zero export to CT)",
143: "export_limit_w (max export do sítě)",
145: "solar_sell (0=disabled, 1=enabled)",
340: "max_solar_power_w (strop DC PV A v W; cap z fn_inverter_pv_a_max_w / deye_reg340_max_solar_w)",
178: "control_board_special_1 (bits0-1: MI export cutoff disable=2 enable=3; bits4-5 peak shaving 32/48)",
148: "time_point_1_time",
149: "time_point_2_time",
154: "time_point_1_power_w",
155: "time_point_2_power_w",
166: "time_point_1_soc_min_pct",
167: "time_point_2_soc_min_pct",
172: "time_point_1_grid_charge",
173: "time_point_2_grid_charge",
62: "system_time_year_month",
63: "system_time_day_hour",
64: "system_time_min_sec",
}
for _tp_i in range(6):
_n = _tp_i + 1
DEYE_REGISTER_NAMES.setdefault(148 + _tp_i, f"time_point_{_n}_time")
DEYE_REGISTER_NAMES.setdefault(154 + _tp_i, f"time_point_{_n}_power_w")
DEYE_REGISTER_NAMES.setdefault(166 + _tp_i, f"time_point_{_n}_soc_min_pct")
DEYE_REGISTER_NAMES.setdefault(172 + _tp_i, f"time_point_{_n}_grid_charge")
def _deye_reg178_verify_match(expected_i: int, actual_i: int) -> bool:
return (int(expected_i) & REG178_VERIFY_MASK_COMBINED) == (
int(actual_i) & REG178_VERIFY_MASK_COMBINED
)
def deye_mi_export_cutoff_want_enabled(
*,
gen_microinverter_cutoff_enabled: bool,
deye_gen_cutoff_enabled: bool,
export_ban: bool,
deye_mode: str,
) -> bool:
"""
True = zapnout MI export cut-off (reg 178 bits 01 = 11b).
Plán může mít z_gen_cutoff=0 (PV B jen do domu v LP), ale bez cut-off na GEN portu
mikroinvertory fyzicky exportují do sítě — při export_ban (záporná vykupní, grid≥0)
cut-off vynutit i bez solver flagu.
"""
if not gen_microinverter_cutoff_enabled:
return False
if deye_mode == "SELL":
return False
return bool(deye_gen_cutoff_enabled) or bool(export_ban)
def deye_reg_triggers_self_sustain_after_verify_exhaust(reg: int) -> bool:
"""True = po 3x mismatch přepnout lokalitu do SELF_SUSTAIN (kritický registr)."""
return int(reg) in DEYE_CRITICAL_REGS_SELF_SUSTAIN
def _deye_tou_power_verify_match(
expected_i: int, actual_i: int, inv: InverterConfig
) -> bool:
"""Firmware často clampne TOU power W na max z reg. 108/109 x 51.2 V."""
if int(actual_i) == int(expected_i):
return True
max_w_charge = int(inv.max_charge_a * BATT_VOLTAGE_V)
max_w_discharge = int(inv.max_discharge_a * BATT_VOLTAGE_V)
a = int(actual_i)
return a == max_w_charge or a == max_w_discharge
def _deye_reg178_verify_with_double_read(
expected_i: int, actual_first: int, actual_second: int | None
) -> tuple[bool, int]:
"""
Vrátí (shoda, hodnota_pro_journal).
Druhé čtení použít jen když první neprojde maskou (RS485 / glitch).
"""
if _deye_reg178_verify_match(expected_i, actual_first):
return True, actual_first
if actual_second is not None and _deye_reg178_verify_match(expected_i, actual_second):
return True, int(actual_second)
return False, actual_first
def watts_to_amps(power_w: int | None, phases: int = 3, voltage: int = 230) -> int:
if not power_w or power_w <= 0:
return 0
return min(32, max(0, int(power_w / (phases * voltage))))
def battery_watts_to_amps(power_w: int, max_amps: int) -> int:
"""Proud z |výkonu| baterie; max_amps z DB."""
derived = int(abs(power_w) / BATT_VOLTAGE_V)
return min(max(0, max_amps), max(0, derived))
def current_slot_hhmm() -> int:
"""Začátek probíhajícího 15min slotu v Europe/Prague, formát HHMM."""
now = datetime.now(PRAGUE_TZ)
slot_min = (now.minute // 15) * 15
return now.hour * 100 + slot_min
def next_slot_hhmm() -> int:
"""Začátek příštího 15min slotu v Europe/Prague, formát HHMM."""
now = datetime.now(PRAGUE_TZ)
minutes = now.minute
slot_minutes = ((minutes // 15) + 1) * 15
if slot_minutes >= 60:
next_hour = (now.hour + 1) % 24
next_min = 0
else:
next_hour = now.hour
next_min = slot_minutes
return next_hour * 100 + next_min
def compute_pv_a_reg340_max_solar_w(
cap_w: int,
forecast_w: int,
curtail_w: int,
*,
min_w: int = 0,
) -> int:
"""Hodnota pro Deye reg 340 (max solar power, W) z capu a plánovaného curtailmentu pole A."""
if curtail_w <= 0:
raw = int(cap_w)
else:
raw = max(0, min(int(cap_w), int(forecast_w) - int(curtail_w)))
if raw > 0 and int(min_w) > 0:
return max(int(min_w), raw)
return raw
def _prague_minute_start_utc() -> datetime:
"""UTC okamžik odpovídající začátku aktuální kalendářní minuty v Europe/Prague."""
p = datetime.now(PRAGUE_TZ).replace(second=0, microsecond=0)
return p.astimezone(timezone.utc)
def _deye_registers_to_prague_datetime(r62: int, r63: int, r64: int) -> datetime | None:
"""Dekódování reg 62-64 (Deye system time v Europe/Prague)."""
try:
year = (int(r62) >> 8) + 2000
month = int(r62) & 0xFF
day = int(r63) >> 8
hour = int(r63) & 0xFF
minute = int(r64) >> 8
second = int(r64) & 0xFF
if not (1 <= month <= 12 and 1 <= day <= 31 and 0 <= hour <= 23):
return None
if not (0 <= minute <= 59 and 0 <= second <= 59):
return None
return datetime(year, month, day, hour, minute, second, tzinfo=PRAGUE_TZ)
except (ValueError, OverflowError):
return None
def _deye_clock_registers_verify_match(
w62: int,
w63: int,
w64: int,
a62: int,
a63: int,
a64: int,
) -> bool:
w_dt = _deye_registers_to_prague_datetime(w62, w63, w64)
a_dt = _deye_registers_to_prague_datetime(a62, a63, a64)
if w_dt is None or a_dt is None:
return False
return abs((a_dt - w_dt).total_seconds()) <= DEYE_CLOCK_VERIFY_MAX_DELTA_SEC
def _deye_should_skip_time_sync_after_read(
inv: InverterConfig,
r62: int,
r63: int,
r64: int,
) -> bool:
"""
True = nezařazovat zápis 62-64: drift je malý a od posledního úspěšného zápisu
nebo tolerančního ověření neuplynulo 24h.
"""
dev = _deye_registers_to_prague_datetime(r62, r63, r64)
if dev is None:
return False
wall = datetime.now(PRAGUE_TZ)
drift = abs((wall - dev).total_seconds())
if drift > DEYE_CLOCK_DRIFT_OK_SEC:
return False
last_write = inv.deye_last_system_time_sync_at
if last_write is None:
return False
if last_write.tzinfo is None:
last_write = last_write.replace(tzinfo=timezone.utc)
else:
last_write = last_write.astimezone(timezone.utc)
age = datetime.now(timezone.utc) - last_write
if age >= timedelta(hours=DEYE_CLOCK_RESYNC_INTERVAL_HOURS):
return False
return True