Files
ems/backend/tests/test_deye_clock.py
Dusan Vojacek ec55285bdd
All checks were successful
deploy / deploy (push) Successful in 4m23s
test / smoke-test (push) Successful in 6s
fix zapisovani casu
2026-04-10 20:17:17 +02:00

146 lines
5.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Deye systémový čas 6264: dekódování, toleranční verify, politika driftu."""
from __future__ import annotations
import unittest
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from services.control_exporter import (
DEYE_CLOCK_DRIFT_OK_SEC,
DEYE_CLOCK_RESYNC_INTERVAL_HOURS,
DEYE_CLOCK_VERIFY_MAX_DELTA_SEC,
InverterConfig,
PRAGUE_TZ,
_deye_clock_registers_verify_match,
_deye_expected_clock_triplet_for_verify,
_deye_registers_to_prague_datetime,
_deye_should_skip_time_sync_after_read,
_deye_system_time_register_rows,
)
def _inv(
    *,
    sync_at: datetime | None = None,
) -> InverterConfig:
    """Build an ``InverterConfig`` fixture for the clock-sync tests.

    Args:
        sync_at: Value stored in ``deye_last_system_time_sync_at``. ``None``
            (the default) is passed through unchanged.

    Returns:
        An ``InverterConfig`` whose remaining fields are fixed placeholder
        values; only ``deye_last_system_time_sync_at`` varies between calls.
    """
    identity = {"id": 1, "code": "deye-main"}
    connection = {"host": "127.0.0.1", "port": 502, "unit_id": 1}
    grid_limits = {
        "max_export_power_w": 13_500,
        "max_import_power_w": 13_500,
        "no_export": False,
    }
    battery_limits = {
        "max_battery_charge_w": 10_000,
        "max_battery_discharge_w": 10_000,
        "min_soc_percent": 12,
        "reserve_soc_percent": 20,
        "max_soc_percent": 95,
        "usable_capacity_wh": 64_000,
        "max_charge_a": 100,
        "max_discharge_a": 100,
    }
    return InverterConfig(
        **identity,
        **connection,
        **grid_limits,
        **battery_limits,
        deye_last_system_time_sync_at=sync_at,
    )
class DeyeClockDecodeTests(unittest.TestCase):
    """Register decoding and tolerance-based verification of the clock triplet."""

    @staticmethod
    def _pack(high: int, low: int) -> int:
        """Return ``(high << 8) | low``, i.e. two fields packed into one word."""
        return (high << 8) | low

    def test_roundtrip_encode_decode(self) -> None:
        """Rows from the encoder decode to the timestamp returned with them."""
        now_cet, rows = _deye_system_time_register_rows()
        words = [
            next(value for address, _, value in rows if address == wanted)
            for wanted in (62, 63, 64)
        ]
        decoded = _deye_registers_to_prague_datetime(*words)
        self.assertIsNotNone(decoded)
        assert decoded is not None  # narrows the optional for type checkers
        self.assertEqual(decoded.tzinfo, PRAGUE_TZ)
        for field in ("year", "month", "day", "hour", "minute"):
            self.assertEqual(getattr(decoded, field), getattr(now_cet, field))
        self.assertEqual(decoded.second, 0)

    def test_verify_same_triplet(self) -> None:
        """An identical triplet on both sides verifies as a match."""
        # 2006-10-03 15:45:00 Prague time (valid month/day/hour/min/sec)
        triplet = (0x060A, 0x030F, 0x2D00)
        self.assertTrue(_deye_clock_registers_verify_match(*triplet, *triplet))

    def test_verify_seconds_within_tolerance(self) -> None:
        """A 40 s difference in register 64 alone still verifies."""
        year_month = self._pack(2026 - 2000, 4)
        day_hour = self._pack(3, 14)
        first_min_sec = self._pack(30, 10)
        second_min_sec = self._pack(30, 50)
        matched = _deye_clock_registers_verify_match(
            year_month, day_hour, first_min_sec,
            year_month, day_hour, second_min_sec,
        )
        self.assertTrue(matched)
        self.assertLessEqual(40, DEYE_CLOCK_VERIFY_MAX_DELTA_SEC)

    def test_verify_fails_when_minute_differs_beyond_tolerance(self) -> None:
        """A three-minute difference in register 64 does not verify."""
        year_month = self._pack(2026 - 2000, 4)
        day_hour = self._pack(3, 14)
        first_min_sec = self._pack(30, 0)
        second_min_sec = self._pack(33, 0)
        matched = _deye_clock_registers_verify_match(
            year_month, day_hour, first_min_sec,
            year_month, day_hour, second_min_sec,
        )
        self.assertFalse(matched)
class DeyeSkipTimeSyncPolicyTests(unittest.TestCase):
    """Policy checks for ``_deye_should_skip_time_sync_after_read``.

    Each test builds the three clock register words from a wall-clock
    timestamp and asserts whether the sync would be skipped for a given
    ``deye_last_system_time_sync_at`` value.
    """

    @staticmethod
    def _clock_registers(moment: datetime) -> tuple[int, int, int]:
        """Pack *moment* into the register 62/63/64 words used by these tests.

        Layout: 62 = (year - 2000, month), 63 = (day, hour),
        64 = (minute, second); the first field sits in the high byte.
        """
        return (
            ((moment.year - 2000) << 8) | moment.month,
            (moment.day << 8) | moment.hour,
            (moment.minute << 8) | moment.second,
        )

    def test_no_skip_when_never_written_even_if_drift_ok(self) -> None:
        """Without a recorded sync the write is never skipped."""
        regs = self._clock_registers(datetime.now(PRAGUE_TZ))
        self.assertFalse(
            _deye_should_skip_time_sync_after_read(_inv(sync_at=None), *regs)
        )

    def test_no_skip_when_drift_large(self) -> None:
        """A clock far outside the drift tolerance forces a sync."""
        now_utc = datetime.now(timezone.utc)
        # A recent sync isolates the drift condition: with sync_at=None the
        # function returns False regardless of drift, so the drift branch
        # would never be exercised.
        recent = now_utc - timedelta(hours=1)
        # Stay at least three minutes beyond the tolerance; timedelta handles
        # minute/hour/day rollover, unlike manual modulo on the minute field.
        lag = timedelta(seconds=DEYE_CLOCK_DRIFT_OK_SEC + 180)
        stale = (now_utc - lag).astimezone(PRAGUE_TZ)
        regs = self._clock_registers(stale)
        self.assertFalse(
            _deye_should_skip_time_sync_after_read(_inv(sync_at=recent), *regs)
        )

    def test_no_skip_after_24h_even_if_drift_small(self) -> None:
        """Once the resync interval has elapsed the write is not skipped."""
        regs = self._clock_registers(datetime.now(PRAGUE_TZ))
        old = (
            datetime.now(timezone.utc)
            - timedelta(hours=DEYE_CLOCK_RESYNC_INTERVAL_HOURS)
            - timedelta(minutes=1)
        )
        self.assertFalse(
            _deye_should_skip_time_sync_after_read(_inv(sync_at=old), *regs)
        )

    def test_skip_within_24h_and_small_drift(self) -> None:
        """A recent sync plus a drift of at most 5 s skips the write."""
        # Precondition first: the 5 s offset below must be inside the tolerance,
        # otherwise the main assertion would fail with a misleading message.
        self.assertGreater(DEYE_CLOCK_DRIFT_OK_SEC, 5)
        wall = datetime.now(PRAGUE_TZ)
        # Clamp at 59 so the seconds field stays valid without a minute rollover.
        ahead = wall.replace(second=min(59, wall.second + 5))
        regs = self._clock_registers(ahead)
        recent = datetime.now(timezone.utc) - timedelta(hours=1)
        self.assertTrue(
            _deye_should_skip_time_sync_after_read(_inv(sync_at=recent), *regs)
        )
class DeyeClockTripletForVerifyTests(unittest.TestCase):
    """Checks for ``_deye_expected_clock_triplet_for_verify``."""

    def test_orphan_reg64_fills_w62_w63_from_device_read(self) -> None:
        """With only register 64 in the bundle, 62 and 63 equal the read values."""
        read_62 = (2026 - 2000) << 8 | 4
        read_63 = 10 << 8 | 12
        read_64 = 45 << 8 | 30
        pending = [SimpleNamespace(register=64, value_to_write=(45 << 8) | 0)]
        w62, w63, w64 = _deye_expected_clock_triplet_for_verify(
            pending, {}, read_62, read_63, read_64
        )
        self.assertEqual((w62, w63, w64), (read_62, read_63, 45 << 8))

    def test_last_verified_used_when_not_in_bundle(self) -> None:
        """With an empty bundle the last-verified mapping supplies all three."""
        last_verified = {62: 1, 63: 2, 64: 3}
        w62, w63, w64 = _deye_expected_clock_triplet_for_verify(
            [], last_verified, 9, 8, 7
        )
        self.assertEqual(w62, 1)
        self.assertEqual(w63, 2)
        self.assertEqual(w64, 3)
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()