fix(modbus): zadne vecne pending v journalu + flock timeout + EV poll backoff
Zivy incident home-01 (TeltoCharge .16): zapis 15/19-20 koncil failed s prazdnym error_msg, nebo zustal trvale pending a zablokoval exportni ticky. - _gateway_exclusive: neblokujici flock s deadline (EMS_MODBUS_FLOCK_TIMEOUT_S, default 20 s) -> GatewayLockTimeout misto starvation bez limitu - execute_modbus_commands: invariant written/failed + neprazdny error_msg (str(e) or repr(e)); safety net pres BaseException (CancelledError, chyba DB); journal update mimo retry cyklus zarizeni; force_disconnect bez zamku brany - telemetry poll_ev_chargers: po 3 selhanich backoff 5 min per (host,port,unit) - mrtvy unit_id drzi branu 4x8=32 s z kazde minuty - testy backend/tests/test_modbus_execute_failsafe.py; docs modbus-command-journal.md (sekce Robustnost zapisu + konfigurace) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -198,6 +198,27 @@ def _modbus_command_contiguous_runs(cmds: list[asyncpg.Record]) -> list[list[asy
|
||||
return runs
|
||||
|
||||
|
||||
def _modbus_error_text(e: BaseException) -> str:
|
||||
"""Text chyby pro error_msg — nikdy prázdný (TimeoutError() apod. má str '')."""
|
||||
return str(e).strip() or repr(e)
|
||||
|
||||
|
||||
async def _mark_commands_failed(
|
||||
db: asyncpg.Connection, cmd_ids: list[int], error_msg: str
|
||||
) -> None:
|
||||
for cid in cmd_ids:
|
||||
await db.execute(
|
||||
"""
|
||||
UPDATE ems.modbus_command
|
||||
SET status='failed', error_msg=$1,
|
||||
attempt_count=attempt_count+1
|
||||
WHERE id=$2
|
||||
""",
|
||||
error_msg,
|
||||
cid,
|
||||
)
|
||||
|
||||
|
||||
async def execute_modbus_commands(
|
||||
command_ids: list[int],
|
||||
db: asyncpg.Connection,
|
||||
@@ -205,6 +226,10 @@ async def execute_modbus_commands(
|
||||
"""
|
||||
Zapíše příkazy z modbus_command do zařízení (FC 0x10 po souvislých blocích).
|
||||
Aktualizuje status na 'written' nebo 'failed'.
|
||||
|
||||
Invariant: žádný z předaných příkazů nesmí zůstat 'pending' — i při
|
||||
CancelledError / GatewayLockTimeout / chybě DB se zbylé řádky označí
|
||||
failed s neprázdným error_msg (safety net níže) a výjimka se propaguje.
|
||||
"""
|
||||
max_retries = 3
|
||||
retry_delay = 0.5
|
||||
@@ -226,67 +251,99 @@ async def execute_modbus_commands(
|
||||
(cmd["device_host"], int(cmd["device_port"]), int(cmd["device_unit_id"]))
|
||||
].append(cmd)
|
||||
|
||||
#: Ještě nerozhodnuté příkazy (pro safety net při výjimce mimo retry cyklus).
|
||||
unresolved: set[int] = {int(c["id"]) for c in rows}
|
||||
|
||||
all_ok = True
|
||||
for (host, port, unit), group in by_gw.items():
|
||||
client = await get_modbus_client(host, port)
|
||||
for run in _modbus_command_contiguous_runs(group):
|
||||
start_reg = int(run[0]["register"])
|
||||
values = [int(c["value_to_write"]) for c in run]
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
await client.write_registers(start_reg, values, unit)
|
||||
for cmd, val in zip(run, values):
|
||||
cid = int(cmd["id"])
|
||||
await db.execute(
|
||||
"""
|
||||
UPDATE ems.modbus_command
|
||||
SET status='written', value_written=$1, written_at=now(),
|
||||
attempt_count=attempt_count+1, error_msg=NULL
|
||||
WHERE id=$2
|
||||
""",
|
||||
val,
|
||||
cid,
|
||||
)
|
||||
logger.info(
|
||||
"[cmd %s] %s 0x%04X=%s OK batch@%s (attempt %s)",
|
||||
cid,
|
||||
cmd["asset_code"],
|
||||
int(cmd["register"]),
|
||||
val,
|
||||
start_reg,
|
||||
attempt + 1,
|
||||
)
|
||||
break
|
||||
except Exception as e:
|
||||
if attempt < max_retries - 1:
|
||||
logger.warning(
|
||||
"Modbus batch write 0x%04X count=%s attempt %s failed: %s, retrying...",
|
||||
start_reg,
|
||||
len(values),
|
||||
attempt + 1,
|
||||
e,
|
||||
)
|
||||
await asyncio.sleep(retry_delay)
|
||||
await client.force_disconnect()
|
||||
else:
|
||||
for cmd in run:
|
||||
await db.execute(
|
||||
"""
|
||||
UPDATE ems.modbus_command
|
||||
SET status='failed', error_msg=$1,
|
||||
attempt_count=attempt_count+1
|
||||
WHERE id=$2
|
||||
""",
|
||||
str(e),
|
||||
int(cmd["id"]),
|
||||
try:
|
||||
for (host, port, unit), group in by_gw.items():
|
||||
client = await get_modbus_client(host, port)
|
||||
for run in _modbus_command_contiguous_runs(group):
|
||||
start_reg = int(run[0]["register"])
|
||||
values = [int(c["value_to_write"]) for c in run]
|
||||
write_err: Exception | None = None
|
||||
attempts_used = 0
|
||||
for attempt in range(max_retries):
|
||||
attempts_used = attempt + 1
|
||||
try:
|
||||
await client.write_registers(start_reg, values, unit)
|
||||
write_err = None
|
||||
break
|
||||
except Exception as e:
|
||||
write_err = e
|
||||
if attempt < max_retries - 1:
|
||||
logger.warning(
|
||||
"Modbus batch write 0x%04X count=%s attempt %s failed: %s, retrying...",
|
||||
start_reg,
|
||||
len(values),
|
||||
attempt + 1,
|
||||
_modbus_error_text(e),
|
||||
)
|
||||
logger.error(
|
||||
"Modbus batch 0x%04X count=%s all %s attempts failed: %s",
|
||||
start_reg,
|
||||
len(values),
|
||||
max_retries,
|
||||
e,
|
||||
)
|
||||
all_ok = False
|
||||
await asyncio.sleep(retry_delay)
|
||||
try:
|
||||
await client.force_disconnect()
|
||||
except Exception as de:
|
||||
logger.warning(
|
||||
"Modbus force_disconnect %s:%s failed: %s",
|
||||
host,
|
||||
port,
|
||||
_modbus_error_text(de),
|
||||
)
|
||||
|
||||
if write_err is not None:
|
||||
err = _modbus_error_text(write_err)
|
||||
await _mark_commands_failed(db, [int(c["id"]) for c in run], err)
|
||||
for c in run:
|
||||
unresolved.discard(int(c["id"]))
|
||||
logger.error(
|
||||
"Modbus batch 0x%04X count=%s all %s attempts failed: %s",
|
||||
start_reg,
|
||||
len(values),
|
||||
max_retries,
|
||||
err,
|
||||
)
|
||||
all_ok = False
|
||||
continue
|
||||
|
||||
# Journal update mimo retry cyklus — chyba DB nesmí vyvolat
|
||||
# další zápis do zařízení; spadne do safety netu níže.
|
||||
for cmd, val in zip(run, values):
|
||||
cid = int(cmd["id"])
|
||||
await db.execute(
|
||||
"""
|
||||
UPDATE ems.modbus_command
|
||||
SET status='written', value_written=$1, written_at=now(),
|
||||
attempt_count=attempt_count+1, error_msg=NULL
|
||||
WHERE id=$2
|
||||
""",
|
||||
val,
|
||||
cid,
|
||||
)
|
||||
unresolved.discard(cid)
|
||||
logger.info(
|
||||
"[cmd %s] %s 0x%04X=%s OK batch@%s (attempt %s)",
|
||||
cid,
|
||||
cmd["asset_code"],
|
||||
int(cmd["register"]),
|
||||
val,
|
||||
start_reg,
|
||||
attempts_used,
|
||||
)
|
||||
except BaseException as e:
|
||||
# Safety net: CancelledError (shutdown / zrušený task), GatewayLockTimeout
|
||||
# propadlý mimo retry cyklus, chyba DB v success větvi, … — nic nesmí
|
||||
# zůstat 'pending'. Best effort: označit a výjimku propagovat dál.
|
||||
err = f"execute aborted: {_modbus_error_text(e)}"
|
||||
try:
|
||||
await _mark_commands_failed(db, sorted(unresolved), err)
|
||||
except Exception as me:
|
||||
logger.error(
|
||||
"Modbus journal: nelze označit %s příkazů failed (%s): %s",
|
||||
len(unresolved),
|
||||
err,
|
||||
_modbus_error_text(me),
|
||||
)
|
||||
logger.error("execute_modbus_commands aborted: %s", err)
|
||||
raise
|
||||
|
||||
return all_ok
|
||||
|
||||
@@ -25,9 +25,27 @@ logger = logging.getLogger(__name__)
|
||||
_flock_warned = False
|
||||
|
||||
|
||||
class GatewayLockTimeout(TimeoutError):
|
||||
"""Brána je držena jiným tahem (telemetrie / druhý proces) déle než timeout."""
|
||||
|
||||
|
||||
_BACKEND_ROOT = Path(__file__).resolve().parent.parent
|
||||
_DEFAULT_LOCK_DIR = _BACKEND_ROOT / ".ems-modbus-locks"
|
||||
|
||||
#: Maximální čekání na exkluzivní zámek brány. Dřív se čekalo blokovaně bez
|
||||
#: limitu — exporter pak mohl na bráně obsazené pollingem mrtvého unit_id
|
||||
#: viset donekonečna (journal řádky trvale 'pending'). Po timeoutu se vyhodí
|
||||
#: GatewayLockTimeout a volající označí příkaz failed ('gateway lock timeout').
|
||||
_FLOCK_TIMEOUT_DEFAULT_S = 20.0
|
||||
_FLOCK_POLL_INTERVAL_S = 0.25
|
||||
|
||||
|
||||
def _flock_timeout_s() -> float:
|
||||
try:
|
||||
return float(os.getenv("EMS_MODBUS_FLOCK_TIMEOUT_S", _FLOCK_TIMEOUT_DEFAULT_S))
|
||||
except ValueError:
|
||||
return _FLOCK_TIMEOUT_DEFAULT_S
|
||||
|
||||
|
||||
def _gateway_lock_path(host: str, port: int) -> Path:
|
||||
# Výchozí = backend/.ems-modbus-locks (v Dockeru /app → mount ./backend), aby flock sdílel
|
||||
@@ -65,14 +83,32 @@ async def _gateway_exclusive(host: str, port: int):
|
||||
path = _gateway_lock_path(host_s, port_i)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
f = open(path, "a+b") # noqa: SIM115
|
||||
locked = False
|
||||
try:
|
||||
await asyncio.to_thread(fcntl.flock, f.fileno(), fcntl.LOCK_EX)
|
||||
# Neblokující pokusy s deadline místo flock(LOCK_EX) bez limitu:
|
||||
# blokované čekání v to_thread nejde zrušit a při bráně obsazené
|
||||
# pollingem mrtvého unit_id (32 s z každé minuty) hrozí starvation.
|
||||
timeout_s = _flock_timeout_s()
|
||||
deadline = asyncio.get_running_loop().time() + timeout_s
|
||||
while True:
|
||||
try:
|
||||
fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
locked = True
|
||||
break
|
||||
except OSError:
|
||||
if asyncio.get_running_loop().time() >= deadline:
|
||||
raise GatewayLockTimeout(
|
||||
f"gateway lock timeout {host_s}:{port_i} "
|
||||
f"after {timeout_s:.0f}s"
|
||||
) from None
|
||||
await asyncio.sleep(_FLOCK_POLL_INTERVAL_S)
|
||||
yield
|
||||
finally:
|
||||
try:
|
||||
await asyncio.to_thread(fcntl.flock, f.fileno(), fcntl.LOCK_UN)
|
||||
except OSError:
|
||||
pass
|
||||
if locked:
|
||||
try:
|
||||
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
|
||||
except OSError:
|
||||
pass
|
||||
f.close()
|
||||
|
||||
|
||||
@@ -260,12 +296,17 @@ class PersistentModbusClient:
|
||||
return await self._write_registers_locked(address, values, device_id)
|
||||
|
||||
async def force_disconnect(self) -> None:
|
||||
"""Uzavře socket pod lockem (např. před retry po chybě)."""
|
||||
async with _gateway_exclusive(self.host, self.port):
|
||||
async with self._lock:
|
||||
if self._client is not None:
|
||||
self._client.close()
|
||||
self._client = None
|
||||
"""Uzavře socket pod lockem (např. před retry po chybě).
|
||||
|
||||
Záměrně BEZ _gateway_exclusive: zavření vlastního TCP socketu není
|
||||
transakce na RS485 sběrnici a čekání na zámek brány tady umělo
|
||||
protáhnout / shodit retry cestu exporteru (GatewayLockTimeout
|
||||
uvnitř except větve execute_modbus_commands).
|
||||
"""
|
||||
async with self._lock:
|
||||
if self._client is not None:
|
||||
self._client.close()
|
||||
self._client = None
|
||||
|
||||
@asynccontextmanager
|
||||
async def batch(self, device_id: int = 1) -> AsyncIterator[ModbusBatch]:
|
||||
|
||||
@@ -4,6 +4,7 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import asyncpg
|
||||
@@ -264,6 +265,35 @@ DEYE_REG_CONTROL_BOARD_SPECIAL1 = 178
|
||||
TELTO_REG_BLOCK_START = 0
|
||||
TELTO_REG_BLOCK_COUNT = 41
|
||||
|
||||
#: Backoff pro nedosažitelný wallbox: čtení mrtvého unit_id drží exkluzivní
|
||||
#: zámek brány až (retries+1)×timeout = 4×8 = 32 s (pymodbus) — každou minutu.
|
||||
#: Po EV_POLL_FAIL_THRESHOLD selháních v řadě se poll daného (host,port,unit)
|
||||
#: zkouší jen 1× za EV_POLL_BACKOFF_S; úspěšné čtení backoff resetuje.
|
||||
EV_POLL_FAIL_THRESHOLD = 3
|
||||
EV_POLL_BACKOFF_S = 300.0
|
||||
_EV_POLL_FAIL_STREAK: dict[tuple[str, int, int], int] = {}
|
||||
_EV_POLL_NEXT_ATTEMPT: dict[tuple[str, int, int], float] = {}
|
||||
|
||||
|
||||
def _ev_poll_should_skip(key: tuple[str, int, int], now_mono: float) -> bool:
|
||||
return (
|
||||
_EV_POLL_FAIL_STREAK.get(key, 0) >= EV_POLL_FAIL_THRESHOLD
|
||||
and now_mono < _EV_POLL_NEXT_ATTEMPT.get(key, 0.0)
|
||||
)
|
||||
|
||||
|
||||
def _ev_poll_record_failure(key: tuple[str, int, int], now_mono: float) -> int:
|
||||
streak = _EV_POLL_FAIL_STREAK.get(key, 0) + 1
|
||||
_EV_POLL_FAIL_STREAK[key] = streak
|
||||
if streak >= EV_POLL_FAIL_THRESHOLD:
|
||||
_EV_POLL_NEXT_ATTEMPT[key] = now_mono + EV_POLL_BACKOFF_S
|
||||
return streak
|
||||
|
||||
|
||||
def _ev_poll_record_success(key: tuple[str, int, int]) -> None:
|
||||
_EV_POLL_FAIL_STREAK.pop(key, None)
|
||||
_EV_POLL_NEXT_ATTEMPT.pop(key, None)
|
||||
|
||||
#: EVSE status (reg 6) → interní stav; session detekce stojí na 'available' vs ≠'available'
|
||||
#: (fn_ev_session_transition), proto každý stav s připojeným EV musí být ≠ 'available'.
|
||||
TELTO_STATUS_MAP = {
|
||||
@@ -430,6 +460,15 @@ async def poll_ev_chargers(site_id: int, db: asyncpg.Connection) -> None:
|
||||
port = int(row["port"] or 502)
|
||||
unit_id = int(row["unit_id"] if row["unit_id"] is not None else 1)
|
||||
|
||||
poll_key = (str(host), port, unit_id)
|
||||
now_mono = time.monotonic()
|
||||
if _ev_poll_should_skip(poll_key, now_mono):
|
||||
logger.debug(
|
||||
"EV charger %s (%s:%s u%s) in backoff, poll skipped",
|
||||
code, host, port, unit_id,
|
||||
)
|
||||
continue
|
||||
|
||||
try:
|
||||
client = await get_modbus_client(host, port)
|
||||
async with client.batch(unit_id) as mb:
|
||||
@@ -440,8 +479,19 @@ async def poll_ev_chargers(site_id: int, db: asyncpg.Connection) -> None:
|
||||
except Exception as e:
|
||||
# Při výpadku čtení NIC nezapisovat — fabrikovaný 'available' by
|
||||
# falešně ukončoval EV session a špinil bazál (power 0).
|
||||
logger.warning("EV charger %s (%s:%s) read failed: %s", code, host, port, e)
|
||||
streak = _ev_poll_record_failure(poll_key, time.monotonic())
|
||||
backoff = (
|
||||
f" (streak {streak} >= {EV_POLL_FAIL_THRESHOLD}, "
|
||||
f"backoff {EV_POLL_BACKOFF_S:.0f}s — neblokovat bránu)"
|
||||
if streak >= EV_POLL_FAIL_THRESHOLD
|
||||
else ""
|
||||
)
|
||||
logger.warning(
|
||||
"EV charger %s (%s:%s u%s) read failed: %s%s",
|
||||
code, host, port, unit_id, e, backoff,
|
||||
)
|
||||
continue
|
||||
_ev_poll_record_success(poll_key)
|
||||
|
||||
current_status = str(frame["status"])
|
||||
if frame["error_bits"]:
|
||||
|
||||
234
backend/tests/test_modbus_execute_failsafe.py
Normal file
234
backend/tests/test_modbus_execute_failsafe.py
Normal file
@@ -0,0 +1,234 @@
|
||||
"""execute_modbus_commands: žádná cesta nesmí nechat příkaz 'pending'.
|
||||
|
||||
Regrese na živý incident home-01 (TeltoCharge 172.16.1.16): zápisová trojice
|
||||
(15, 19–20) buď skončila 'failed' s prázdným error_msg (str(TimeoutError())
|
||||
== ''), nebo zůstala trvale 'pending' (export visel bez limitu na flock brány
|
||||
obsazené pollingem mrtvého unit_id; výjimka mimo retry cyklus stav neuložila).
|
||||
|
||||
Testy: (1) error_msg nikdy prázdný; (2) GatewayLockTimeout → failed
|
||||
s 'gateway lock timeout'; (3) CancelledError / chyba DB → safety net označí
|
||||
zbylé příkazy failed a výjimku propaguje; (4) flock s timeoutem v
|
||||
modbus_client; (5) backoff pollingu nedosažitelného wallboxu.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import fcntl
|
||||
import os
|
||||
import tempfile
|
||||
import unittest
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import services.control.modbus_journal as journal
|
||||
import services.modbus_client as mc
|
||||
import services.telemetry_collector as tc
|
||||
from services.control.modbus_journal import (
|
||||
_modbus_error_text,
|
||||
execute_modbus_commands,
|
||||
)
|
||||
from services.modbus_client import GatewayLockTimeout
|
||||
|
||||
|
||||
def _cmd_row(cid: int, reg: int, val: int = 0) -> dict:
|
||||
return {
|
||||
"id": cid,
|
||||
"register": reg,
|
||||
"value_to_write": val,
|
||||
"device_host": "172.16.1.16",
|
||||
"device_port": 502,
|
||||
"device_unit_id": 1,
|
||||
"asset_code": "ev-charger-1",
|
||||
}
|
||||
|
||||
|
||||
class _JournalDB:
|
||||
"""In-memory journal — sleduje status a error_msg per command id."""
|
||||
|
||||
def __init__(self, rows: list[dict], fail_written_update: bool = False) -> None:
|
||||
self.rows = {r["id"]: dict(r) for r in rows}
|
||||
self.status = {r["id"]: "pending" for r in rows}
|
||||
self.error_msg: dict[int, str | None] = {r["id"]: None for r in rows}
|
||||
self.fail_written_update = fail_written_update
|
||||
|
||||
async def fetchrow(self, query: str, cid: int) -> dict | None:
|
||||
return self.rows.get(cid)
|
||||
|
||||
async def execute(self, query: str, *args: object) -> None:
|
||||
if "status='written'" in query:
|
||||
if self.fail_written_update:
|
||||
raise RuntimeError("db connection lost")
|
||||
_val, cid = args
|
||||
self.status[int(cid)] = "written" # type: ignore[arg-type]
|
||||
self.error_msg[int(cid)] = None # type: ignore[arg-type]
|
||||
elif "status='failed'" in query:
|
||||
msg, cid = args
|
||||
self.status[int(cid)] = "failed" # type: ignore[arg-type]
|
||||
self.error_msg[int(cid)] = str(msg) # type: ignore[arg-type]
|
||||
else:
|
||||
raise AssertionError(f"unexpected execute: {query}")
|
||||
|
||||
|
||||
def _fake_client(write_exc: BaseException | None = None) -> AsyncMock:
|
||||
client = AsyncMock()
|
||||
if write_exc is not None:
|
||||
client.write_registers.side_effect = write_exc
|
||||
client.force_disconnect = AsyncMock()
|
||||
return client
|
||||
|
||||
|
||||
class ErrorTextTests(unittest.TestCase):
|
||||
def test_empty_str_exception_falls_back_to_repr(self) -> None:
|
||||
self.assertEqual(_modbus_error_text(TimeoutError()), "TimeoutError()")
|
||||
|
||||
def test_nonempty_str_kept(self) -> None:
|
||||
self.assertEqual(_modbus_error_text(OSError("boom")), "boom")
|
||||
|
||||
|
||||
class ExecuteFailsafeTests(unittest.IsolatedAsyncioTestCase):
|
||||
async def _run(
|
||||
self,
|
||||
db: _JournalDB,
|
||||
client: AsyncMock,
|
||||
ids: list[int],
|
||||
) -> bool:
|
||||
with (
|
||||
patch.object(journal, "get_modbus_client", AsyncMock(return_value=client)),
|
||||
patch.object(journal.asyncio, "sleep", AsyncMock()),
|
||||
):
|
||||
return await execute_modbus_commands(ids, db) # type: ignore[arg-type]
|
||||
|
||||
async def test_timeout_with_empty_str_marks_failed_with_nonempty_msg(self) -> None:
|
||||
db = _JournalDB([_cmd_row(1, 15), _cmd_row(2, 19), _cmd_row(3, 20)])
|
||||
ok = await self._run(db, _fake_client(TimeoutError()), [1, 2, 3])
|
||||
self.assertFalse(ok)
|
||||
self.assertEqual(set(db.status.values()), {"failed"})
|
||||
for msg in db.error_msg.values():
|
||||
self.assertTrue(msg) # nikdy NULL/prázdný
|
||||
|
||||
async def test_gateway_lock_timeout_marks_failed_with_reason(self) -> None:
|
||||
db = _JournalDB([_cmd_row(1, 15)])
|
||||
exc = GatewayLockTimeout("gateway lock timeout 172.16.1.16:502 after 20s")
|
||||
ok = await self._run(db, _fake_client(exc), [1])
|
||||
self.assertFalse(ok)
|
||||
self.assertEqual(db.status[1], "failed")
|
||||
self.assertIn("gateway lock timeout", db.error_msg[1] or "")
|
||||
|
||||
async def test_cancelled_error_marks_failed_and_reraises(self) -> None:
|
||||
db = _JournalDB([_cmd_row(1, 15), _cmd_row(2, 19), _cmd_row(3, 20)])
|
||||
with self.assertRaises(asyncio.CancelledError):
|
||||
await self._run(db, _fake_client(asyncio.CancelledError()), [1, 2, 3])
|
||||
self.assertEqual(set(db.status.values()), {"failed"})
|
||||
for msg in db.error_msg.values():
|
||||
self.assertIn("execute aborted", msg or "")
|
||||
|
||||
async def test_db_failure_in_written_update_marks_rest_failed(self) -> None:
|
||||
db = _JournalDB([_cmd_row(1, 15), _cmd_row(2, 19)], fail_written_update=True)
|
||||
with self.assertRaises(RuntimeError):
|
||||
await self._run(db, _fake_client(), [1, 2])
|
||||
self.assertEqual(set(db.status.values()), {"failed"})
|
||||
self.assertIn("db connection lost", db.error_msg[1] or "")
|
||||
|
||||
async def test_force_disconnect_failure_does_not_leave_pending(self) -> None:
|
||||
db = _JournalDB([_cmd_row(1, 15)])
|
||||
client = _fake_client(OSError("write boom"))
|
||||
client.force_disconnect.side_effect = OSError("disconnect boom")
|
||||
ok = await self._run(db, client, [1])
|
||||
self.assertFalse(ok)
|
||||
self.assertEqual(db.status[1], "failed")
|
||||
self.assertIn("write boom", db.error_msg[1] or "")
|
||||
|
||||
async def test_success_path_still_written(self) -> None:
|
||||
db = _JournalDB([_cmd_row(1, 15), _cmd_row(2, 19), _cmd_row(3, 20)])
|
||||
ok = await self._run(db, _fake_client(), [1, 2, 3])
|
||||
self.assertTrue(ok)
|
||||
self.assertEqual(set(db.status.values()), {"written"})
|
||||
|
||||
|
||||
class GatewayFlockTimeoutTests(unittest.IsolatedAsyncioTestCase):
|
||||
async def test_lock_timeout_raises_gateway_lock_timeout(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as d, patch.dict(
|
||||
os.environ,
|
||||
{"EMS_MODBUS_LOCK_DIR": d, "EMS_MODBUS_FLOCK_TIMEOUT_S": "0.3"},
|
||||
):
|
||||
path = mc._gateway_lock_path("10.99.99.99", 502)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
holder = open(path, "a+b") # noqa: SIM115
|
||||
fcntl.flock(holder.fileno(), fcntl.LOCK_EX)
|
||||
try:
|
||||
with self.assertRaises(GatewayLockTimeout) as ctx:
|
||||
async with mc._gateway_exclusive("10.99.99.99", 502):
|
||||
pass
|
||||
self.assertIn("gateway lock timeout", str(ctx.exception))
|
||||
finally:
|
||||
fcntl.flock(holder.fileno(), fcntl.LOCK_UN)
|
||||
holder.close()
|
||||
|
||||
async def test_lock_acquired_when_free(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as d, patch.dict(
|
||||
os.environ, {"EMS_MODBUS_LOCK_DIR": d}
|
||||
):
|
||||
async with mc._gateway_exclusive("10.99.99.98", 502):
|
||||
pass # bez výjimky
|
||||
|
||||
|
||||
class EvPollBackoffTests(unittest.TestCase):
|
||||
KEY = ("172.16.1.16", 502, 2)
|
||||
|
||||
def setUp(self) -> None:
|
||||
tc._EV_POLL_FAIL_STREAK.clear()
|
||||
tc._EV_POLL_NEXT_ATTEMPT.clear()
|
||||
|
||||
def test_below_threshold_never_skips(self) -> None:
|
||||
tc._ev_poll_record_failure(self.KEY, 100.0)
|
||||
tc._ev_poll_record_failure(self.KEY, 160.0)
|
||||
self.assertFalse(tc._ev_poll_should_skip(self.KEY, 220.0))
|
||||
|
||||
def test_skips_after_threshold_until_backoff_elapses(self) -> None:
|
||||
for t in (100.0, 160.0, 220.0):
|
||||
tc._ev_poll_record_failure(self.KEY, t)
|
||||
self.assertTrue(tc._ev_poll_should_skip(self.KEY, 221.0))
|
||||
self.assertTrue(
|
||||
tc._ev_poll_should_skip(self.KEY, 220.0 + tc.EV_POLL_BACKOFF_S - 1)
|
||||
)
|
||||
self.assertFalse(
|
||||
tc._ev_poll_should_skip(self.KEY, 220.0 + tc.EV_POLL_BACKOFF_S + 1)
|
||||
)
|
||||
|
||||
def test_success_resets_streak(self) -> None:
|
||||
for t in (100.0, 160.0, 220.0):
|
||||
tc._ev_poll_record_failure(self.KEY, t)
|
||||
tc._ev_poll_record_success(self.KEY)
|
||||
self.assertFalse(tc._ev_poll_should_skip(self.KEY, 221.0))
|
||||
|
||||
|
||||
class _PollDB:
|
||||
"""Jen řádek chargeru pro poll_ev_chargers (failure path se dál nedotkne DB)."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.row = {
|
||||
"id": 7,
|
||||
"code": "ev-charger-2",
|
||||
"host": "172.16.1.16",
|
||||
"port": 502,
|
||||
"unit_id": 2,
|
||||
}
|
||||
|
||||
async def fetch(self, query: str, *args: object) -> list[dict]:
|
||||
return [self.row]
|
||||
|
||||
|
||||
class PollEvChargersBackoffIntegrationTests(unittest.IsolatedAsyncioTestCase):
|
||||
async def test_dead_unit_stops_hitting_gateway_after_threshold(self) -> None:
|
||||
tc._EV_POLL_FAIL_STREAK.clear()
|
||||
tc._EV_POLL_NEXT_ATTEMPT.clear()
|
||||
get_client = AsyncMock(side_effect=OSError("unit 2 unreachable"))
|
||||
with patch.object(tc, "get_modbus_client", get_client):
|
||||
for _ in range(tc.EV_POLL_FAIL_THRESHOLD):
|
||||
await tc.poll_ev_chargers(1, _PollDB()) # type: ignore[arg-type]
|
||||
self.assertEqual(get_client.await_count, tc.EV_POLL_FAIL_THRESHOLD)
|
||||
# další tick uvnitř backoff okna už na bránu nesahá
|
||||
await tc.poll_ev_chargers(1, _PollDB()) # type: ignore[arg-type]
|
||||
self.assertEqual(get_client.await_count, tc.EV_POLL_FAIL_THRESHOLD)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -79,6 +79,46 @@ pro **reg 178** (spolu s peak shaving bity 4–5).
|
||||
|
||||
**Dávky:** `execute_modbus_commands` slučuje souvislé adresy do jednoho **`write_registers`** (FC **0x10**). `verify_modbus_commands` čte zpět po souvislých blocích (`read_holding_registers`, FC 0x03). Detail režimů: `modbus-registers.md`.
|
||||
|
||||
## Robustnost zápisu — žádné věčné `pending`
|
||||
|
||||
Invariant `execute_modbus_commands`: **každý předaný příkaz skončí `written`
|
||||
nebo `failed` s neprázdným `error_msg`** — žádná cesta nesmí nechat řádek
|
||||
trvale `pending` (živý incident home-01 2026-06-12: TeltoCharge trojice
|
||||
15/19–20 zůstala `pending` > 13 min, jiný běh skončil `failed` bez chyby).
|
||||
|
||||
1. **Zámek brány s timeoutem.** `_gateway_exclusive` (flock per `host:port`,
|
||||
`services/modbus_client.py`) už nečeká blokovaně bez limitu, ale
|
||||
neblokujícími pokusy do **`EMS_MODBUS_FLOCK_TIMEOUT_S`** (default **20 s**).
|
||||
Po vypršení vyhodí `GatewayLockTimeout` („gateway lock timeout host:port…“)
|
||||
→ retry cyklus zápisu příkaz označí **`failed`** s touto zprávou. Dřív mohl
|
||||
exporter na bráně obsazené pollingem mrtvého unit_id viset donekonečna
|
||||
(flock nemá FIFO — starvation) a protože APScheduler drží
|
||||
`max_instances=1`, zablokoval i všechny další exportní ticky.
|
||||
2. **`error_msg` nikdy prázdný.** `_modbus_error_text`: `str(e)` nebo `repr(e)`
|
||||
(`TimeoutError()` / `ConnectionResetError()` mají prázdný `str` → dřív
|
||||
vypadalo jako `failed` „bez chyby“).
|
||||
3. **Safety net.** Celý průchod je v `try/except BaseException`: i při
|
||||
`CancelledError` (shutdown / zrušený task), chybě DB nebo bugu se zbylé
|
||||
nerozhodnuté příkazy best-effort označí `failed`
|
||||
(`execute aborted: …`) a výjimka se propaguje dál.
|
||||
4. **Journal update mimo retry cyklus zařízení.** Chyba DB při UPDATE na
|
||||
`written` nevyvolá další (duplicitní) zápis do zařízení — spadne do safety
|
||||
netu.
|
||||
5. **`force_disconnect` bez zámku brány** — zavření vlastního TCP socketu není
|
||||
transakce na RS485; čekání na flock v retry větvi by jinak mohlo samo
|
||||
timeoutovat.
|
||||
|
||||
**Backoff telemetrie pro nedosažitelný wallbox** (`telemetry_collector.poll_ev_chargers`):
|
||||
čtení mrtvého unit_id drží exkluzivní zámek brány až **(retries+1) × timeout
|
||||
= 4 × 8 = 32 s** (pymodbus) — při poll smyčce 60 s je brána obsazená ~53 %
|
||||
času a zápisy exportu se na ní dusí. Po **3** selháních v řadě se poll daného
|
||||
`(host, port, unit_id)` zkouší jen **1× za 5 min** (`EV_POLL_FAIL_THRESHOLD`,
|
||||
`EV_POLL_BACKOFF_S`); úspěšné čtení backoff resetuje. Při výpadku čtení se
|
||||
nadále nic nezapisuje do telemetrie (žádný fabrikovaný `available`).
|
||||
|
||||
Testy: `backend/tests/test_modbus_execute_failsafe.py` (prázdný `str(e)`,
|
||||
gateway lock timeout, CancelledError, chyba DB, backoff pollingu).
|
||||
|
||||
## APScheduler
|
||||
|
||||
| Job | Frekvence | Popis |
|
||||
@@ -104,6 +144,11 @@ Poznámka: **GEN port cut-off na BA81** se aktuálně provádí přímo přes De
|
||||
## Konfigurace
|
||||
|
||||
- `.env`: `DISCORD_WEBHOOK_URL` — prázdné = notifikace vypnuté (jen log).
|
||||
- `.env`: `EMS_MODBUS_FLOCK_TIMEOUT_S` — max. čekání na exkluzivní zámek brány
|
||||
(default 20 s); po vypršení `GatewayLockTimeout` → příkaz `failed`
|
||||
s `error_msg = gateway lock timeout …`.
|
||||
- `.env`: `EMS_MODBUS_LOCK_DIR`, `EMS_MODBUS_DISABLE_FLOCK` — umístění /
|
||||
vypnutí flock souborů (`services/modbus_client.py`).
|
||||
|
||||
## Související soubory
|
||||
|
||||
|
||||
Reference in New Issue
Block a user