Zivy incident home-01 (TeltoCharge .16): zapis 15/19-20 koncil failed s prazdnym error_msg, nebo zustal trvale pending a zablokoval exportni ticky. - _gateway_exclusive: neblokujici flock s deadline (EMS_MODBUS_FLOCK_TIMEOUT_S, default 20 s) -> GatewayLockTimeout misto starvation bez limitu - execute_modbus_commands: invariant written/failed + neprazdny error_msg (str(e) or repr(e)); safety net pres BaseException (CancelledError, chyba DB); journal update mimo retry cyklus zarizeni; force_disconnect bez zamku brany - telemetry poll_ev_chargers: po 3 selhanich backoff 5 min per (host,port,unit) - mrtvy unit_id drzi branu 4x8=32 s z kazde minuty - testy backend/tests/test_modbus_execute_failsafe.py; docs modbus-command-journal.md (sekce Robustnost zapisu + konfigurace) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
235 lines
9.4 KiB
Python
235 lines
9.4 KiB
Python
"""execute_modbus_commands: žádná cesta nesmí nechat příkaz 'pending'.
|
||
|
||
Regrese na živý incident home-01 (TeltoCharge 172.16.1.16): zápisová trojice
|
||
(15, 19–20) buď skončila 'failed' s prázdným error_msg (str(TimeoutError())
|
||
== ''), nebo zůstala trvale 'pending' (export visel bez limitu na flock brány
|
||
obsazené pollingem mrtvého unit_id; výjimka mimo retry cyklus stav neuložila).
|
||
|
||
Testy: (1) error_msg nikdy prázdný; (2) GatewayLockTimeout → failed
|
||
s 'gateway lock timeout'; (3) CancelledError / chyba DB → safety net označí
|
||
zbylé příkazy failed a výjimku propaguje; (4) flock s timeoutem v
|
||
modbus_client; (5) backoff pollingu nedosažitelného wallboxu.
|
||
"""
|
||
|
||
import asyncio
|
||
import fcntl
|
||
import os
|
||
import tempfile
|
||
import unittest
|
||
from unittest.mock import AsyncMock, patch
|
||
|
||
import services.control.modbus_journal as journal
|
||
import services.modbus_client as mc
|
||
import services.telemetry_collector as tc
|
||
from services.control.modbus_journal import (
|
||
_modbus_error_text,
|
||
execute_modbus_commands,
|
||
)
|
||
from services.modbus_client import GatewayLockTimeout
|
||
|
||
|
||
def _cmd_row(cid: int, reg: int, val: int = 0) -> dict:
    """Build one pending journal row targeting the TeltoCharge wallbox.

    Args:
        cid: command id.
        reg: Modbus register to write.
        val: value to write (defaults to 0).
    """
    row = dict(id=cid, register=reg, value_to_write=val)
    # Every row addresses the same device: 172.16.1.16:502, unit 1.
    row.update(
        device_host="172.16.1.16",
        device_port=502,
        device_unit_id=1,
    )
    row["asset_code"] = "ev-charger-1"
    return row
|
||
|
||
|
||
class _JournalDB:
|
||
"""In-memory journal — sleduje status a error_msg per command id."""
|
||
|
||
def __init__(self, rows: list[dict], fail_written_update: bool = False) -> None:
|
||
self.rows = {r["id"]: dict(r) for r in rows}
|
||
self.status = {r["id"]: "pending" for r in rows}
|
||
self.error_msg: dict[int, str | None] = {r["id"]: None for r in rows}
|
||
self.fail_written_update = fail_written_update
|
||
|
||
async def fetchrow(self, query: str, cid: int) -> dict | None:
|
||
return self.rows.get(cid)
|
||
|
||
async def execute(self, query: str, *args: object) -> None:
|
||
if "status='written'" in query:
|
||
if self.fail_written_update:
|
||
raise RuntimeError("db connection lost")
|
||
_val, cid = args
|
||
self.status[int(cid)] = "written" # type: ignore[arg-type]
|
||
self.error_msg[int(cid)] = None # type: ignore[arg-type]
|
||
elif "status='failed'" in query:
|
||
msg, cid = args
|
||
self.status[int(cid)] = "failed" # type: ignore[arg-type]
|
||
self.error_msg[int(cid)] = str(msg) # type: ignore[arg-type]
|
||
else:
|
||
raise AssertionError(f"unexpected execute: {query}")
|
||
|
||
|
||
def _fake_client(write_exc: BaseException | None = None) -> AsyncMock:
|
||
client = AsyncMock()
|
||
if write_exc is not None:
|
||
client.write_registers.side_effect = write_exc
|
||
client.force_disconnect = AsyncMock()
|
||
return client
|
||
|
||
|
||
class ErrorTextTests(unittest.TestCase):
    """_modbus_error_text must produce a readable, non-empty reason."""

    def test_empty_str_exception_falls_back_to_repr(self) -> None:
        # str(TimeoutError()) is "" -- the expected text is the repr instead.
        text = _modbus_error_text(TimeoutError())
        self.assertEqual(text, "TimeoutError()")

    def test_nonempty_str_kept(self) -> None:
        # A meaningful str(e) is passed through unchanged.
        text = _modbus_error_text(OSError("boom"))
        self.assertEqual(text, "boom")
|
||
|
||
|
||
class ExecuteFailsafeTests(unittest.IsolatedAsyncioTestCase):
    """No path through execute_modbus_commands may leave a command 'pending'.

    Every command must end 'written', or 'failed' with a non-empty error_msg.
    """

    async def _run(
        self,
        db: _JournalDB,
        client: AsyncMock,
        ids: list[int],
    ) -> bool:
        """Run ``execute_modbus_commands(ids, db)`` with *client* injected.

        ``journal.asyncio.sleep`` is replaced by an AsyncMock so any sleeps in
        the code under test return immediately. NOTE: ``journal.asyncio`` is
        presumably the asyncio module itself, so this patches ``asyncio.sleep``
        process-wide for the duration of the call.
        """
        with (
            patch.object(journal, "get_modbus_client", AsyncMock(return_value=client)),
            patch.object(journal.asyncio, "sleep", AsyncMock()),
        ):
            return await execute_modbus_commands(ids, db)  # type: ignore[arg-type]

    def _assert_all_msgs_nonempty(self, db: _JournalDB) -> None:
        """Fail if any command's error_msg is NULL or empty."""
        for cid, msg in db.error_msg.items():
            self.assertTrue(msg, f"command {cid}: error_msg is NULL/empty")

    async def test_timeout_with_empty_str_marks_failed_with_nonempty_msg(self) -> None:
        # str(TimeoutError()) == "" -- the journal must still get a readable reason.
        db = _JournalDB([_cmd_row(1, 15), _cmd_row(2, 19), _cmd_row(3, 20)])
        ok = await self._run(db, _fake_client(TimeoutError()), [1, 2, 3])
        self.assertFalse(ok)
        self.assertEqual(set(db.status.values()), {"failed"})
        self._assert_all_msgs_nonempty(db)

    async def test_gateway_lock_timeout_marks_failed_with_reason(self) -> None:
        db = _JournalDB([_cmd_row(1, 15)])
        exc = GatewayLockTimeout("gateway lock timeout 172.16.1.16:502 after 20s")
        ok = await self._run(db, _fake_client(exc), [1])
        self.assertFalse(ok)
        self.assertEqual(db.status[1], "failed")
        self.assertIn("gateway lock timeout", db.error_msg[1] or "")

    async def test_cancelled_error_marks_failed_and_reraises(self) -> None:
        # The safety net must mark the whole batch failed, then propagate.
        db = _JournalDB([_cmd_row(1, 15), _cmd_row(2, 19), _cmd_row(3, 20)])
        with self.assertRaises(asyncio.CancelledError):
            await self._run(db, _fake_client(asyncio.CancelledError()), [1, 2, 3])
        self.assertEqual(set(db.status.values()), {"failed"})
        for msg in db.error_msg.values():
            self.assertIn("execute aborted", msg or "")

    async def test_db_failure_in_written_update_marks_rest_failed(self) -> None:
        # The 'written' UPDATE itself fails; every command -- not just the
        # first -- must still end 'failed' with a reason, and the error re-raises.
        db = _JournalDB([_cmd_row(1, 15), _cmd_row(2, 19)], fail_written_update=True)
        with self.assertRaises(RuntimeError):
            await self._run(db, _fake_client(), [1, 2])
        self.assertEqual(set(db.status.values()), {"failed"})
        self.assertIn("db connection lost", db.error_msg[1] or "")
        self._assert_all_msgs_nonempty(db)

    async def test_force_disconnect_failure_does_not_leave_pending(self) -> None:
        # A failing force_disconnect must not mask the original write error.
        db = _JournalDB([_cmd_row(1, 15)])
        client = _fake_client(OSError("write boom"))
        client.force_disconnect.side_effect = OSError("disconnect boom")
        ok = await self._run(db, client, [1])
        self.assertFalse(ok)
        self.assertEqual(db.status[1], "failed")
        self.assertIn("write boom", db.error_msg[1] or "")

    async def test_success_path_still_written(self) -> None:
        db = _JournalDB([_cmd_row(1, 15), _cmd_row(2, 19), _cmd_row(3, 20)])
        ok = await self._run(db, _fake_client(), [1, 2, 3])
        self.assertTrue(ok)
        self.assertEqual(set(db.status.values()), {"written"})
        # A successful write must not leave a stale failure reason behind.
        self.assertEqual(set(db.error_msg.values()), {None})
|
||
|
||
|
||
class GatewayFlockTimeoutTests(unittest.IsolatedAsyncioTestCase):
    """_gateway_exclusive: bounded wait for the gateway flock, release on exit."""

    async def test_lock_timeout_raises_gateway_lock_timeout(self) -> None:
        timeout_s = 0.3
        with tempfile.TemporaryDirectory() as d, patch.dict(
            os.environ,
            {"EMS_MODBUS_LOCK_DIR": d, "EMS_MODBUS_FLOCK_TIMEOUT_S": str(timeout_s)},
        ):
            path = mc._gateway_lock_path("10.99.99.99", 502)
            path.parent.mkdir(parents=True, exist_ok=True)
            loop = asyncio.get_running_loop()
            # A separate open() is a separate open file description, so this
            # flock conflicts with _gateway_exclusive even within one process.
            # Closing the file at the end of the with-block releases the lock.
            with open(path, "a+b") as holder:
                fcntl.flock(holder.fileno(), fcntl.LOCK_EX)
                started = loop.time()
                with self.assertRaises(GatewayLockTimeout) as ctx:
                    async with mc._gateway_exclusive("10.99.99.99", 502):
                        pass
                elapsed = loop.time() - started
        self.assertIn("gateway lock timeout", str(ctx.exception))
        # It must really wait for the lock, but only up to the configured
        # deadline -- never unbounded (the home-01 starvation), and not the
        # 20 s default either.
        self.assertGreaterEqual(elapsed, timeout_s / 2)
        self.assertLess(elapsed, 10.0)

    async def test_lock_acquired_when_free(self) -> None:
        with tempfile.TemporaryDirectory() as d, patch.dict(
            os.environ, {"EMS_MODBUS_LOCK_DIR": d}
        ):
            async with mc._gateway_exclusive("10.99.99.98", 502):
                pass  # must not raise
            self._assert_lock_free("10.99.99.98", 502)

    async def test_lock_released_when_body_raises(self) -> None:
        # An exception inside the gateway section must not leave the flock held.
        with tempfile.TemporaryDirectory() as d, patch.dict(
            os.environ, {"EMS_MODBUS_LOCK_DIR": d}
        ):
            with self.assertRaises(RuntimeError):
                async with mc._gateway_exclusive("10.99.99.97", 502):
                    raise RuntimeError("boom inside gateway section")
            self._assert_lock_free("10.99.99.97", 502)

    def _assert_lock_free(self, host: str, port: int) -> None:
        """Fail unless the gateway flock for host:port is takeable without blocking."""
        path = mc._gateway_lock_path(host, port)
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, "a+b") as probe:
            try:
                fcntl.flock(probe.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
            except BlockingIOError:
                self.fail(f"gateway lock {host}:{port} still held after exit")
|
||
|
||
|
||
class EvPollBackoffTests(unittest.TestCase):
    """Per-(host, port, unit_id) EV charger polling back-off in telemetry_collector."""

    KEY = ("172.16.1.16", 502, 2)
    T0 = 100.0  # timestamp of the first simulated failure
    TICK_S = 60.0  # spacing between simulated polling ticks

    def setUp(self) -> None:
        self._reset_backoff_state()
        # The back-off state is module-global: clear it after each test too,
        # so no failure streak leaks into other tests polling the same key.
        self.addCleanup(self._reset_backoff_state)

    @staticmethod
    def _reset_backoff_state() -> None:
        tc._EV_POLL_FAIL_STREAK.clear()
        tc._EV_POLL_NEXT_ATTEMPT.clear()

    def _record_failures(self, count: int) -> list[float]:
        """Record *count* failures one tick apart starting at T0; return their times."""
        times = [self.T0 + i * self.TICK_S for i in range(count)]
        for t in times:
            tc._ev_poll_record_failure(self.KEY, t)
        return times

    def test_below_threshold_never_skips(self) -> None:
        # Derived from the module constant so the test follows threshold changes.
        n = tc.EV_POLL_FAIL_THRESHOLD - 1
        self._record_failures(n)
        next_tick = self.T0 + n * self.TICK_S
        self.assertFalse(tc._ev_poll_should_skip(self.KEY, next_tick))

    def test_skips_after_threshold_until_backoff_elapses(self) -> None:
        last = self._record_failures(tc.EV_POLL_FAIL_THRESHOLD)[-1]
        self.assertTrue(tc._ev_poll_should_skip(self.KEY, last + 1))
        self.assertTrue(
            tc._ev_poll_should_skip(self.KEY, last + tc.EV_POLL_BACKOFF_S - 1)
        )
        self.assertFalse(
            tc._ev_poll_should_skip(self.KEY, last + tc.EV_POLL_BACKOFF_S + 1)
        )

    def test_success_resets_streak(self) -> None:
        last = self._record_failures(tc.EV_POLL_FAIL_THRESHOLD)[-1]
        tc._ev_poll_record_success(self.KEY)
        self.assertFalse(tc._ev_poll_should_skip(self.KEY, last + 1))
|
||
|
||
|
||
class _PollDB:
    """DB double that hands poll_ev_chargers a single EV charger row.

    Only ``fetch`` exists; per the original author, the failure path under
    test does not touch the DB any further.
    """

    def __init__(self) -> None:
        self.row = dict(
            id=7,
            code="ev-charger-2",
            host="172.16.1.16",
            port=502,
            unit_id=2,
        )

    async def fetch(self, query: str, *args: object) -> list[dict]:
        """Return the charger row regardless of *query* and *args*."""
        single = self.row
        return [single]
|
||
|
||
|
||
class PollEvChargersBackoffIntegrationTests(unittest.IsolatedAsyncioTestCase):
    """poll_ev_chargers stops hitting the gateway for an unreachable unit_id."""

    def setUp(self) -> None:
        self._reset_backoff_state()
        # Module-global state: also clear it afterwards so the failure streak
        # recorded here cannot leak into later tests.
        self.addCleanup(self._reset_backoff_state)

    @staticmethod
    def _reset_backoff_state() -> None:
        tc._EV_POLL_FAIL_STREAK.clear()
        tc._EV_POLL_NEXT_ATTEMPT.clear()

    async def test_dead_unit_stops_hitting_gateway_after_threshold(self) -> None:
        threshold = tc.EV_POLL_FAIL_THRESHOLD
        get_client = AsyncMock(side_effect=OSError("unit 2 unreachable"))
        with patch.object(tc, "get_modbus_client", get_client):
            for _ in range(threshold):
                await tc.poll_ev_chargers(1, _PollDB())  # type: ignore[arg-type]
            self.assertEqual(get_client.await_count, threshold)
            # The next tick falls inside the back-off window: no gateway access.
            await tc.poll_ev_chargers(1, _PollDB())  # type: ignore[arg-type]
            self.assertEqual(get_client.await_count, threshold)
|
||
|
||
|
||
if __name__ == "__main__":
    # Allow running this module directly, outside a pytest/unittest runner.
    unittest.main()
|