Files
ems/backend/tests/test_deye_clock.py

126 lines
4.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Deye systémový čas 6264: dekódování, toleranční verify, politika driftu."""
from __future__ import annotations
import unittest
from datetime import datetime, timedelta, timezone
from services.control_exporter import (
DEYE_CLOCK_DRIFT_OK_SEC,
DEYE_CLOCK_RESYNC_INTERVAL_HOURS,
DEYE_CLOCK_VERIFY_MAX_DELTA_SEC,
InverterConfig,
PRAGUE_TZ,
_deye_clock_registers_verify_match,
_deye_registers_to_prague_datetime,
_deye_should_skip_time_sync_after_read,
_deye_system_time_register_rows,
)
def _inv(
*,
sync_at: datetime | None = None,
) -> InverterConfig:
return InverterConfig(
id=1,
code="deye-main",
host="127.0.0.1",
port=502,
unit_id=1,
max_export_power_w=13_500,
max_import_power_w=13_500,
no_export=False,
max_battery_charge_w=10_000,
max_battery_discharge_w=10_000,
min_soc_percent=12,
reserve_soc_percent=20,
max_soc_percent=95,
usable_capacity_wh=64_000,
max_charge_a=100,
max_discharge_a=100,
deye_last_system_time_sync_at=sync_at,
)
class DeyeClockDecodeTests(unittest.TestCase):
def test_roundtrip_encode_decode(self) -> None:
now_cet, rows = _deye_system_time_register_rows()
r62 = next(v for a, _, v in rows if a == 62)
r63 = next(v for a, _, v in rows if a == 63)
r64 = next(v for a, _, v in rows if a == 64)
dt = _deye_registers_to_prague_datetime(r62, r63, r64)
self.assertIsNotNone(dt)
assert dt is not None
self.assertEqual(dt.tzinfo, PRAGUE_TZ)
self.assertEqual(dt.year, now_cet.year)
self.assertEqual(dt.month, now_cet.month)
self.assertEqual(dt.day, now_cet.day)
self.assertEqual(dt.hour, now_cet.hour)
self.assertEqual(dt.minute, now_cet.minute)
self.assertEqual(dt.second, 0)
def test_verify_same_triplet(self) -> None:
# 2006-10-03 15:45:00 Praha (platné měsíc/den/hod/min/sec)
r62, r63, r64 = 0x060A, 0x030F, 0x2D00
self.assertTrue(_deye_clock_registers_verify_match(r62, r63, r64, r62, r63, r64))
def test_verify_seconds_within_tolerance(self) -> None:
w62 = (2026 - 2000) << 8 | 4
w63 = 3 << 8 | 14
w64 = 30 << 8 | 10
a64 = 30 << 8 | 50
self.assertTrue(_deye_clock_registers_verify_match(w62, w63, w64, w62, w63, a64))
self.assertLessEqual(40, DEYE_CLOCK_VERIFY_MAX_DELTA_SEC)
def test_verify_fails_when_minute_differs_beyond_tolerance(self) -> None:
w62 = (2026 - 2000) << 8 | 4
w63 = 3 << 8 | 14
w64 = 30 << 8 | 0
a64 = 33 << 8 | 0
self.assertFalse(_deye_clock_registers_verify_match(w62, w63, w64, w62, w63, a64))
class DeyeSkipTimeSyncPolicyTests(unittest.TestCase):
def test_no_skip_when_never_written_even_if_drift_ok(self) -> None:
wall = datetime.now(PRAGUE_TZ)
r62 = ((wall.year - 2000) << 8) | wall.month
r63 = (wall.day << 8) | wall.hour
r64 = (wall.minute << 8) | wall.second
self.assertFalse(
_deye_should_skip_time_sync_after_read(_inv(sync_at=None), r62, r63, r64)
)
def test_no_skip_when_drift_large(self) -> None:
wall = datetime.now(PRAGUE_TZ)
wrong_min = (wall.minute - 3) % 60
r62 = ((wall.year - 2000) << 8) | wall.month
r63 = (wall.day << 8) | wall.hour
r64 = (wrong_min << 8) | wall.second
self.assertFalse(_deye_should_skip_time_sync_after_read(_inv(sync_at=None), r62, r63, r64))
def test_no_skip_after_24h_even_if_drift_small(self) -> None:
wall = datetime.now(PRAGUE_TZ)
r62 = ((wall.year - 2000) << 8) | wall.month
r63 = (wall.day << 8) | wall.hour
r64 = (wall.minute << 8) | wall.second
old = datetime.now(timezone.utc) - timedelta(hours=DEYE_CLOCK_RESYNC_INTERVAL_HOURS) - timedelta(minutes=1)
self.assertFalse(
_deye_should_skip_time_sync_after_read(_inv(sync_at=old), r62, r63, r64)
)
def test_skip_within_24h_and_small_drift(self) -> None:
wall = datetime.now(PRAGUE_TZ)
r62 = ((wall.year - 2000) << 8) | wall.month
r63 = (wall.day << 8) | wall.hour
r64 = (wall.minute << 8) | min(59, wall.second + 5)
recent = datetime.now(timezone.utc) - timedelta(hours=1)
self.assertTrue(
_deye_should_skip_time_sync_after_read(_inv(sync_at=recent), r62, r63, r64)
)
self.assertGreater(DEYE_CLOCK_DRIFT_OK_SEC, 5)
if __name__ == "__main__":
unittest.main()