"""Deye systémový čas 62–64: dekódování, toleranční verify, politika driftu.""" from __future__ import annotations import unittest from datetime import datetime, timedelta, timezone from services.control_exporter import ( DEYE_CLOCK_DRIFT_OK_SEC, DEYE_CLOCK_RESYNC_INTERVAL_HOURS, DEYE_CLOCK_VERIFY_MAX_DELTA_SEC, InverterConfig, PRAGUE_TZ, _deye_clock_registers_verify_match, _deye_registers_to_prague_datetime, _deye_should_skip_time_sync_after_read, _deye_system_time_register_rows, ) def _inv( *, sync_at: datetime | None = None, ) -> InverterConfig: return InverterConfig( id=1, code="deye-main", host="127.0.0.1", port=502, unit_id=1, max_export_power_w=13_500, max_import_power_w=13_500, no_export=False, max_battery_charge_w=10_000, max_battery_discharge_w=10_000, min_soc_percent=12, reserve_soc_percent=20, max_soc_percent=95, usable_capacity_wh=64_000, max_charge_a=100, max_discharge_a=100, deye_last_system_time_sync_at=sync_at, ) class DeyeClockDecodeTests(unittest.TestCase): def test_roundtrip_encode_decode(self) -> None: now_cet, rows = _deye_system_time_register_rows() r62 = next(v for a, _, v in rows if a == 62) r63 = next(v for a, _, v in rows if a == 63) r64 = next(v for a, _, v in rows if a == 64) dt = _deye_registers_to_prague_datetime(r62, r63, r64) self.assertIsNotNone(dt) assert dt is not None self.assertEqual(dt.tzinfo, PRAGUE_TZ) self.assertEqual(dt.year, now_cet.year) self.assertEqual(dt.month, now_cet.month) self.assertEqual(dt.day, now_cet.day) self.assertEqual(dt.hour, now_cet.hour) self.assertEqual(dt.minute, now_cet.minute) self.assertEqual(dt.second, 0) def test_verify_same_triplet(self) -> None: # 2006-10-03 15:45:00 Praha (platné měsíc/den/hod/min/sec) r62, r63, r64 = 0x060A, 0x030F, 0x2D00 self.assertTrue(_deye_clock_registers_verify_match(r62, r63, r64, r62, r63, r64)) def test_verify_seconds_within_tolerance(self) -> None: w62 = (2026 - 2000) << 8 | 4 w63 = 3 << 8 | 14 w64 = 30 << 8 | 10 a64 = 30 << 8 | 50 self.assertTrue(_deye_clock_registers_verify_match(w62, w63, w64, w62, w63, a64)) self.assertLessEqual(40, DEYE_CLOCK_VERIFY_MAX_DELTA_SEC) def test_verify_fails_when_minute_differs_beyond_tolerance(self) -> None: w62 = (2026 - 2000) << 8 | 4 w63 = 3 << 8 | 14 w64 = 30 << 8 | 0 a64 = 33 << 8 | 0 self.assertFalse(_deye_clock_registers_verify_match(w62, w63, w64, w62, w63, a64)) class DeyeSkipTimeSyncPolicyTests(unittest.TestCase): def test_no_skip_when_never_written_even_if_drift_ok(self) -> None: wall = datetime.now(PRAGUE_TZ) r62 = ((wall.year - 2000) << 8) | wall.month r63 = (wall.day << 8) | wall.hour r64 = (wall.minute << 8) | wall.second self.assertFalse( _deye_should_skip_time_sync_after_read(_inv(sync_at=None), r62, r63, r64) ) def test_no_skip_when_drift_large(self) -> None: wall = datetime.now(PRAGUE_TZ) wrong_min = (wall.minute - 3) % 60 r62 = ((wall.year - 2000) << 8) | wall.month r63 = (wall.day << 8) | wall.hour r64 = (wrong_min << 8) | wall.second self.assertFalse(_deye_should_skip_time_sync_after_read(_inv(sync_at=None), r62, r63, r64)) def test_no_skip_after_24h_even_if_drift_small(self) -> None: wall = datetime.now(PRAGUE_TZ) r62 = ((wall.year - 2000) << 8) | wall.month r63 = (wall.day << 8) | wall.hour r64 = (wall.minute << 8) | wall.second old = datetime.now(timezone.utc) - timedelta(hours=DEYE_CLOCK_RESYNC_INTERVAL_HOURS) - timedelta(minutes=1) self.assertFalse( _deye_should_skip_time_sync_after_read(_inv(sync_at=old), r62, r63, r64) ) def test_skip_within_24h_and_small_drift(self) -> None: wall = datetime.now(PRAGUE_TZ) r62 = ((wall.year - 2000) << 8) | wall.month r63 = (wall.day << 8) | wall.hour r64 = (wall.minute << 8) | min(59, wall.second + 5) recent = datetime.now(timezone.utc) - timedelta(hours=1) self.assertTrue( _deye_should_skip_time_sync_after_read(_inv(sync_at=recent), r62, r63, r64) ) self.assertGreater(DEYE_CLOCK_DRIFT_OK_SEC, 5) if __name__ == "__main__": unittest.main()