fix zapisovani casu
All checks were successful
deploy / deploy (push) Successful in 4m23s
test / smoke-test (push) Successful in 6s

This commit is contained in:
Dusan Vojacek
2026-04-10 20:17:17 +02:00
parent abd6f484c6
commit ec55285bdd
5 changed files with 200 additions and 48 deletions

View File

@@ -49,6 +49,9 @@ _DEYE_INACTIVE_TOU_REGISTERS: frozenset[int] = frozenset(
]
)
# Systémový čas Deye — vždy toleranční verify jako celek 6264 (reg 64 sám nesmí do striktní větve).
DEYE_CLOCK_REGS: frozenset[int] = frozenset({62, 63, 64})
DEYE_REGISTER_NAMES: dict[int, str] = {
108: "max_charge_a (max nabíjecí proud baterie)",
109: "max_discharge_a (max vybíjecí proud baterie)",
@@ -178,9 +181,9 @@ def _deye_should_skip_time_sync_after_read(
r64: int,
) -> bool:
"""
True = nezařazovat zápis 6264: drift je malý a od posledního úspěšného ověření času
(status verified v journalu 6264) neuplynul 24h — sloupec deye_last_system_time_sync_at
se doplňuje jen po tolerančním ověření v _verify_deye_clock_command_run.
True = nezařazovat zápis 6264: drift je malý a od posledního úspěšného zápisu (FC 0x10 ACK)
nebo tolerančního ověření neuplynulo 24h — sloupec deye_last_system_time_sync_at doplňuje
write_inverter_setpoints po úspěšném zápisu batche obsahujícího 6264 a znovu po úspěšném verify.
"""
dev = _deye_registers_to_prague_datetime(r62, r63, r64)
if dev is None:
@@ -202,11 +205,35 @@ def _deye_should_skip_time_sync_after_read(
return True
def _is_deye_contiguous_clock_run(run: list[asyncpg.Record]) -> bool:
if len(run) != 3:
return False
regs = sorted(int(c["register"]) for c in run)
return regs == [62, 63, 64]
async def _fetch_written_deye_clock_commands(
site_id: int,
asset_id: int,
host: str,
port: int,
unit_id: int,
db: asyncpg.Connection,
) -> list[asyncpg.Record]:
"""Všechny řádky journalu 6264 ve stavu written pro daný invertor/endpoint."""
rows = await db.fetch(
"""
SELECT * FROM ems.modbus_command
WHERE site_id = $1
AND asset_type = 'inverter'
AND asset_id = $2
AND device_host = $3
AND device_port = $4
AND device_unit_id = $5
AND register IN (62, 63, 64)
AND status = 'written'
ORDER BY register
""",
site_id,
asset_id,
host,
port,
unit_id,
)
return list(rows)
async def _fetch_last_verified_inverter_registers(
@@ -453,30 +480,72 @@ async def _switch_to_self_sustain(site_id: int, db: asyncpg.Connection, reason:
logger.critical("Site %s switched to SELF_SUSTAIN: %s", site_id, reason)
async def _verify_deye_clock_command_run(
run: list[asyncpg.Record],
values: list[int],
db: asyncpg.Connection,
def _modbus_cmd_register(cmd: Any) -> int:
"""asyncpg.Record má __getitem__; objekty s atributem .register též (testy)."""
try:
return int(cmd["register"])
except (KeyError, TypeError):
return int(cmd.register)
def _deye_expected_clock_triplet_for_verify(
bundle: list[asyncpg.Record],
last_verified: dict[int, int],
a62: int,
a63: int,
a64: int,
) -> tuple[int, int, int]:
"""
Sestaví očekávané (w62,w63,w64) pro toleranční verify.
Chybějící registry doplní poslední verified nebo aktuálním přečtením ze zařízení
(aby osiřelý zápis např. jen 64 nešel do striktního porovnání reg64).
"""
by_reg = {_modbus_cmd_register(c): c for c in bundle}
def _vtw(c: Any) -> int:
try:
return int(c["value_to_write"])
except (KeyError, TypeError):
return int(c.value_to_write)
w62 = _vtw(by_reg[62]) if 62 in by_reg else last_verified.get(62, a62)
w63 = _vtw(by_reg[63]) if 63 in by_reg else last_verified.get(63, a63)
w64 = _vtw(by_reg[64]) if 64 in by_reg else last_verified.get(64, a64)
return (int(w62), int(w63), int(w64))
async def _verify_deye_clock_written_bundle(
site_id: int,
bundle: list[asyncpg.Record],
a62: int,
a63: int,
a64: int,
db: asyncpg.Connection,
) -> bool:
"""
Ověření souvislého bloku 6264: porovnání času z trojice registrů s tolerancí (sekundy na Deye běží).
Při mismatch retry všech tří řádků journalu společně.
Toleranční ověření pro jeden až tři řádky journalu 6264 ve stavu written.
Při mismatch retry společně; bez přepnutí do SELF_SUSTAIN po 3 pokusech.
"""
from services.notification_service import (
notify_modbus_clock_verify_exhausted,
notify_modbus_mismatch,
)
run_s = sorted(run, key=lambda c: int(c["register"]))
w62 = int(run_s[0]["value_to_write"])
w63 = int(run_s[1]["value_to_write"])
w64 = int(run_s[2]["value_to_write"])
a62, a63, a64 = (int(values[0]), int(values[1]), int(values[2]))
cmds_s = sorted(bundle, key=_modbus_cmd_register)
try:
asset_id = int(cmds_s[0]["asset_id"])
except (KeyError, TypeError):
asset_id = int(cmds_s[0].asset_id)
last_v = await _fetch_last_verified_inverter_registers(site_id, asset_id, db)
w62, w63, w64 = _deye_expected_clock_triplet_for_verify(bundle, last_v, a62, a63, a64)
clock_ok = _deye_clock_registers_verify_match(w62, w63, w64, a62, a63, a64)
actual_by_reg = {62: a62, 63: a63, 64: a64}
for cmd, actual in zip(run_s, values):
cid = int(cmd["id"])
for cmd in cmds_s:
try:
cid = int(cmd["id"])
except (KeyError, TypeError):
cid = int(cmd.id)
r = _modbus_cmd_register(cmd)
await db.execute(
"""
UPDATE ems.modbus_command
@@ -484,13 +553,12 @@ async def _verify_deye_clock_command_run(
status=CASE WHEN $2::boolean THEN 'verified' ELSE 'mismatch' END
WHERE id=$3::int
""",
int(actual),
actual_by_reg[r],
clock_ok,
cid,
)
if clock_ok:
inv_asset_id = int(run_s[0]["asset_id"])
await db.execute(
"""
UPDATE ems.asset_inverter
@@ -499,22 +567,35 @@ async def _verify_deye_clock_command_run(
WHERE id = $2
""",
_prague_minute_start_utc(),
inv_asset_id,
asset_id,
)
for cmd, actual in zip(run_s, values):
for cmd in cmds_s:
try:
cid_l = int(cmd["id"])
except (KeyError, TypeError):
cid_l = int(cmd.id)
try:
code_l = str(cmd["asset_code"])
except (KeyError, TypeError):
code_l = str(cmd.asset_code)
rr = _modbus_cmd_register(cmd)
logger.info(
"[cmd %s] verified OK (clock tolerant): %s 0x%04X=%s",
int(cmd["id"]),
cmd["asset_code"],
int(cmd["register"]),
int(actual),
cid_l,
code_l,
rr,
actual_by_reg[rr],
)
return True
cmd0 = run_s[0]
cmd0 = cmds_s[0]
try:
ac0 = str(cmd0["asset_code"])
except (KeyError, TypeError):
ac0 = str(cmd0.asset_code)
logger.error(
"[cmd clock] MISMATCH %s 6264: written=(%s,%s,%s) actual=(%s,%s,%s)",
cmd0["asset_code"],
ac0,
w62,
w63,
w64,
@@ -524,15 +605,19 @@ async def _verify_deye_clock_command_run(
)
attempts = 0
for cmd in run_s:
for cmd in cmds_s:
try:
cid_q = int(cmd["id"])
except (KeyError, TypeError):
cid_q = int(cmd.id)
row_ac = await db.fetchrow(
"SELECT attempt_count FROM ems.modbus_command WHERE id=$1", int(cmd["id"])
"SELECT attempt_count FROM ems.modbus_command WHERE id=$1", cid_q
)
ac = int(row_ac["attempt_count"] or 0) if row_ac else 0
attempts = max(attempts, ac)
await notify_modbus_mismatch(
str(cmd0["asset_code"]),
ac0,
62,
"system_time_62_64",
w62,
@@ -540,7 +625,12 @@ async def _verify_deye_clock_command_run(
attempts,
)
ids_ordered = [int(c["id"]) for c in run_s]
ids_ordered = []
for c in cmds_s:
try:
ids_ordered.append(int(c["id"]))
except (KeyError, TypeError):
ids_ordered.append(int(c.id))
if attempts < 3:
for cid in ids_ordered:
await db.execute(
@@ -556,7 +646,7 @@ async def _verify_deye_clock_command_run(
site = await db.fetchrow("SELECT code FROM ems.site WHERE id=$1", site_id)
await notify_modbus_clock_verify_exhausted(
site["code"] if site else str(site_id),
str(cmd0["asset_code"]),
ac0,
(w62, w63, w64),
(a62, a63, a64),
)
@@ -663,7 +753,40 @@ async def verify_modbus_commands(
all_ok = True
for (host, port, unit), group in by_gw.items():
client = await get_modbus_client(host, port)
for run in _modbus_command_contiguous_runs(group):
clock_cmds = [c for c in group if int(c["register"]) in DEYE_CLOCK_REGS]
rest = [c for c in group if int(c["register"]) not in DEYE_CLOCK_REGS]
if clock_cmds:
asset_id = int(clock_cmds[0]["asset_id"])
bundle = await _fetch_written_deye_clock_commands(
site_id, asset_id, host, port, unit, db
)
if not bundle:
bundle = clock_cmds
try:
cvals = await client.read_holding_registers(62, 3, unit)
except Exception as e:
logger.error("verify clock read 6264 failed: %s", e)
all_ok = False
else:
if len(cvals) != 3:
logger.error(
"verify clock read: expected 3 regs, got %s", len(cvals)
)
all_ok = False
else:
matched = await _verify_deye_clock_written_bundle(
site_id,
bundle,
int(cvals[0]),
int(cvals[1]),
int(cvals[2]),
db,
)
if not matched:
all_ok = False
for run in _modbus_command_contiguous_runs(rest):
start_reg = int(run[0]["register"])
n = len(run)
try:
@@ -683,11 +806,6 @@ async def verify_modbus_commands(
)
all_ok = False
continue
if _is_deye_contiguous_clock_run(run):
matched = await _verify_deye_clock_command_run(run, values, db, site_id)
if not matched:
all_ok = False
continue
for cmd, actual in zip(run, values):
matched = await _apply_verify_result(cmd, int(actual))
if not matched:
@@ -1172,7 +1290,7 @@ async def write_inverter_setpoints(
if skip_time:
logger.info(
"Deye clock 6264 skipped (drift ≤ %ss, last write < %sh ago): %s CET",
"Deye clock 6264 skipped (drift ≤ %ss, last sync < %sh ago): %s CET",
DEYE_CLOCK_DRIFT_OK_SEC,
DEYE_CLOCK_RESYNC_INTERVAL_HOURS,
now_cet.strftime("%Y-%m-%d %H:%M:%S"),
@@ -1290,6 +1408,19 @@ async def write_inverter_setpoints(
return f"FAIL inverter: {inv.code}: Modbus write failed (see modbus_command)"
logger.info("[control] Inverter %s journal write OK", inv.code)
will_write_time = any(int(r) in DEYE_CLOCK_REGS for r, _, _ in registers)
if will_write_time:
await db.execute(
"""
UPDATE ems.asset_inverter
SET deye_last_system_time_sync_minute = $1,
deye_last_system_time_sync_at = now()
WHERE id = $2
""",
_prague_minute_start_utc(),
inv.id,
)
if need_inactive_tou or will_write_inactive:
await db.execute(
"""

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
import unittest
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from services.control_exporter import (
DEYE_CLOCK_DRIFT_OK_SEC,
@@ -12,6 +13,7 @@ from services.control_exporter import (
InverterConfig,
PRAGUE_TZ,
_deye_clock_registers_verify_match,
_deye_expected_clock_triplet_for_verify,
_deye_registers_to_prague_datetime,
_deye_should_skip_time_sync_after_read,
_deye_system_time_register_rows,
@@ -121,5 +123,23 @@ class DeyeSkipTimeSyncPolicyTests(unittest.TestCase):
self.assertGreater(DEYE_CLOCK_DRIFT_OK_SEC, 5)
class DeyeClockTripletForVerifyTests(unittest.TestCase):
def test_orphan_reg64_fills_w62_w63_from_device_read(self) -> None:
a62 = (2026 - 2000) << 8 | 4
a63 = 10 << 8 | 12
a64 = 45 << 8 | 30
bundle = [SimpleNamespace(register=64, value_to_write=(45 << 8) | 0)]
w62, w63, w64 = _deye_expected_clock_triplet_for_verify(bundle, {}, a62, a63, a64)
self.assertEqual(w62, a62)
self.assertEqual(w63, a63)
self.assertEqual(w64, 45 << 8)
def test_last_verified_used_when_not_in_bundle(self) -> None:
bundle: list[SimpleNamespace] = []
last = {62: 1, 63: 2, 64: 3}
w62, w63, w64 = _deye_expected_clock_triplet_for_verify(bundle, last, 9, 8, 7)
self.assertEqual((w62, w63, w64), (1, 2, 3))
if __name__ == "__main__":
unittest.main()