"""Čisté Deye konstanty a helpery pro control export.""" from __future__ import annotations from datetime import datetime, timedelta, timezone from zoneinfo import ZoneInfo from services.control.models import InverterConfig PRAGUE_TZ = ZoneInfo("Europe/Prague") # Hodiny Deye 62-64: po zápisu sekundy na zařízení dál běží, verify musí být toleranční. DEYE_CLOCK_VERIFY_MAX_DELTA_SEC = 120 # Řidší zápis: bez zápisu, pokud čas na invertoru neodbočí od Prahy víc než o tolik sekund. DEYE_CLOCK_DRIFT_OK_SEC = 60 # A zároveň neuplynul tento interval od posledního syncu / potvrzení driftu. DEYE_CLOCK_RESYNC_INTERVAL_HOURS = 24 # Deye LV baterie: převod výkon -> proud pro registry 108/109. BATT_VOLTAGE_V = 51.2 # Reg 178 - bitové pole: bity 4-5 (peak shaving switch) a bity 0-1 (MI export cutoff). REG178_SELL = 0b00100000 REG178_PASSIVE = 0b00110000 REG178_VERIFY_MASK = 0x0030 REG178_MI_EXPORT_MASK = 0x0003 REG178_MI_EXPORT_DISABLE = 0b10 REG178_MI_EXPORT_ENABLE = 0b11 REG178_VERIFY_MASK_COMBINED = REG178_VERIFY_MASK | REG178_MI_EXPORT_MASK DEYE_CRITICAL_REGS_SELF_SUSTAIN = frozenset({108, 109, 142, 143, 145}) DEYE_TOU_POWER_REGS = frozenset(range(154, 160)) DEYE_LV_BATTERY_MAX_CHARGE_DISCHARGE_A = 350 # Neaktivní TOU bloky (3-6): Deye často 23:59 (2359) neuloží, 23:55 je stabilní. DEYE_TOU_INACTIVE_HHMM = 2355 _DEYE_INACTIVE_TOU_REGISTERS: frozenset[int] = frozenset( [ 150, 151, 152, 153, 156, 157, 158, 159, 168, 169, 170, 171, 174, 175, 176, 177, ] ) DEYE_CLOCK_REGS: frozenset[int] = frozenset({62, 63, 64}) DEYE_REGISTER_NAMES: dict[int, str] = { 108: "max_charge_a (max nabíjecí proud baterie)", 109: "max_discharge_a (max vybíjecí proud baterie)", 141: "energy_mode (0, EMS nemění)", 142: "limit_control (0=selling first, 1=zero export to load, 2=zero export to CT)", 143: "export_limit_w (max export do sítě)", 145: "solar_sell (0=disabled, 1=enabled)", 340: "max_solar_power_w (strop DC PV A v W; cap z fn_inverter_pv_a_max_w / deye_reg340_max_solar_w)", 178: "control_board_special_1 (bits0-1: MI export cutoff disable=2 enable=3; bits4-5 peak shaving 32/48)", 148: "time_point_1_time", 149: "time_point_2_time", 154: "time_point_1_power_w", 155: "time_point_2_power_w", 166: "time_point_1_soc_min_pct", 167: "time_point_2_soc_min_pct", 172: "time_point_1_grid_charge", 173: "time_point_2_grid_charge", 62: "system_time_year_month", 63: "system_time_day_hour", 64: "system_time_min_sec", } for _tp_i in range(6): _n = _tp_i + 1 DEYE_REGISTER_NAMES.setdefault(148 + _tp_i, f"time_point_{_n}_time") DEYE_REGISTER_NAMES.setdefault(154 + _tp_i, f"time_point_{_n}_power_w") DEYE_REGISTER_NAMES.setdefault(166 + _tp_i, f"time_point_{_n}_soc_min_pct") DEYE_REGISTER_NAMES.setdefault(172 + _tp_i, f"time_point_{_n}_grid_charge") def _deye_reg178_verify_match(expected_i: int, actual_i: int) -> bool: return (int(expected_i) & REG178_VERIFY_MASK_COMBINED) == ( int(actual_i) & REG178_VERIFY_MASK_COMBINED ) def deye_reg_triggers_self_sustain_after_verify_exhaust(reg: int) -> bool: """True = po 3x mismatch přepnout lokalitu do SELF_SUSTAIN (kritický registr).""" return int(reg) in DEYE_CRITICAL_REGS_SELF_SUSTAIN def _deye_tou_power_verify_match( expected_i: int, actual_i: int, inv: InverterConfig ) -> bool: """Firmware často clampne TOU power W na max z reg. 108/109 x 51.2 V.""" if int(actual_i) == int(expected_i): return True max_w_charge = int(inv.max_charge_a * BATT_VOLTAGE_V) max_w_discharge = int(inv.max_discharge_a * BATT_VOLTAGE_V) a = int(actual_i) return a == max_w_charge or a == max_w_discharge def _deye_reg178_verify_with_double_read( expected_i: int, actual_first: int, actual_second: int | None ) -> tuple[bool, int]: """ Vrátí (shoda, hodnota_pro_journal). Druhé čtení použít jen když první neprojde maskou (RS485 / glitch). """ if _deye_reg178_verify_match(expected_i, actual_first): return True, actual_first if actual_second is not None and _deye_reg178_verify_match(expected_i, actual_second): return True, int(actual_second) return False, actual_first def watts_to_amps(power_w: int | None, phases: int = 3, voltage: int = 230) -> int: if not power_w or power_w <= 0: return 0 return min(32, max(0, int(power_w / (phases * voltage)))) def battery_watts_to_amps(power_w: int, max_amps: int) -> int: """Proud z |výkonu| baterie; max_amps z DB.""" derived = int(abs(power_w) / BATT_VOLTAGE_V) return min(max(0, max_amps), max(0, derived)) def current_slot_hhmm() -> int: """Začátek probíhajícího 15min slotu v Europe/Prague, formát HHMM.""" now = datetime.now(PRAGUE_TZ) slot_min = (now.minute // 15) * 15 return now.hour * 100 + slot_min def next_slot_hhmm() -> int: """Začátek příštího 15min slotu v Europe/Prague, formát HHMM.""" now = datetime.now(PRAGUE_TZ) minutes = now.minute slot_minutes = ((minutes // 15) + 1) * 15 if slot_minutes >= 60: next_hour = (now.hour + 1) % 24 next_min = 0 else: next_hour = now.hour next_min = slot_minutes return next_hour * 100 + next_min def compute_pv_a_reg340_max_solar_w( cap_w: int, forecast_w: int, curtail_w: int, *, min_w: int = 0, ) -> int: """Hodnota pro Deye reg 340 (max solar power, W) z capu a plánovaného curtailmentu pole A.""" if curtail_w <= 0: raw = int(cap_w) else: raw = max(0, min(int(cap_w), int(forecast_w) - int(curtail_w))) if raw > 0 and int(min_w) > 0: return max(int(min_w), raw) return raw def _prague_minute_start_utc() -> datetime: """UTC okamžik odpovídající začátku aktuální kalendářní minuty v Europe/Prague.""" p = datetime.now(PRAGUE_TZ).replace(second=0, microsecond=0) return p.astimezone(timezone.utc) def _deye_registers_to_prague_datetime(r62: int, r63: int, r64: int) -> datetime | None: """Dekódování reg 62-64 (Deye system time v Europe/Prague).""" try: year = (int(r62) >> 8) + 2000 month = int(r62) & 0xFF day = int(r63) >> 8 hour = int(r63) & 0xFF minute = int(r64) >> 8 second = int(r64) & 0xFF if not (1 <= month <= 12 and 1 <= day <= 31 and 0 <= hour <= 23): return None if not (0 <= minute <= 59 and 0 <= second <= 59): return None return datetime(year, month, day, hour, minute, second, tzinfo=PRAGUE_TZ) except (ValueError, OverflowError): return None def _deye_clock_registers_verify_match( w62: int, w63: int, w64: int, a62: int, a63: int, a64: int, ) -> bool: w_dt = _deye_registers_to_prague_datetime(w62, w63, w64) a_dt = _deye_registers_to_prague_datetime(a62, a63, a64) if w_dt is None or a_dt is None: return False return abs((a_dt - w_dt).total_seconds()) <= DEYE_CLOCK_VERIFY_MAX_DELTA_SEC def _deye_should_skip_time_sync_after_read( inv: InverterConfig, r62: int, r63: int, r64: int, ) -> bool: """ True = nezařazovat zápis 62-64: drift je malý a od posledního úspěšného zápisu nebo tolerančního ověření neuplynulo 24h. """ dev = _deye_registers_to_prague_datetime(r62, r63, r64) if dev is None: return False wall = datetime.now(PRAGUE_TZ) drift = abs((wall - dev).total_seconds()) if drift > DEYE_CLOCK_DRIFT_OK_SEC: return False last_write = inv.deye_last_system_time_sync_at if last_write is None: return False if last_write.tzinfo is None: last_write = last_write.replace(tzinfo=timezone.utc) else: last_write = last_write.astimezone(timezone.utc) age = datetime.now(timezone.utc) - last_write if age >= timedelta(hours=DEYE_CLOCK_RESYNC_INTERVAL_HOURS): return False return True