diff --git a/backend/services/control/modbus_journal.py b/backend/services/control/modbus_journal.py index ab3aecf..558ec6a 100644 --- a/backend/services/control/modbus_journal.py +++ b/backend/services/control/modbus_journal.py @@ -198,6 +198,27 @@ def _modbus_command_contiguous_runs(cmds: list[asyncpg.Record]) -> list[list[asy return runs +def _modbus_error_text(e: BaseException) -> str: + """Text chyby pro error_msg — nikdy prázdný (TimeoutError() apod. má str '').""" + return str(e).strip() or repr(e) + + +async def _mark_commands_failed( + db: asyncpg.Connection, cmd_ids: list[int], error_msg: str +) -> None: + for cid in cmd_ids: + await db.execute( + """ + UPDATE ems.modbus_command + SET status='failed', error_msg=$1, + attempt_count=attempt_count+1 + WHERE id=$2 + """, + error_msg, + cid, + ) + + async def execute_modbus_commands( command_ids: list[int], db: asyncpg.Connection, @@ -205,6 +226,10 @@ async def execute_modbus_commands( """ Zapíše příkazy z modbus_command do zařízení (FC 0x10 po souvislých blocích). Aktualizuje status na 'written' nebo 'failed'. + + Invariant: žádný z předaných příkazů nesmí zůstat 'pending' — i při + CancelledError / GatewayLockTimeout / chybě DB se zbylé řádky označí + failed s neprázdným error_msg (safety net níže) a výjimka se propaguje. """ max_retries = 3 retry_delay = 0.5 @@ -226,67 +251,99 @@ async def execute_modbus_commands( (cmd["device_host"], int(cmd["device_port"]), int(cmd["device_unit_id"])) ].append(cmd) + #: Ještě nerozhodnuté příkazy (pro safety net při výjimce mimo retry cyklus). + unresolved: set[int] = {int(c["id"]) for c in rows} + all_ok = True - for (host, port, unit), group in by_gw.items(): - client = await get_modbus_client(host, port) - for run in _modbus_command_contiguous_runs(group): - start_reg = int(run[0]["register"]) - values = [int(c["value_to_write"]) for c in run] - for attempt in range(max_retries): - try: - await client.write_registers(start_reg, values, unit) - for cmd, val in zip(run, values): - cid = int(cmd["id"]) - await db.execute( - """ - UPDATE ems.modbus_command - SET status='written', value_written=$1, written_at=now(), - attempt_count=attempt_count+1, error_msg=NULL - WHERE id=$2 - """, - val, - cid, - ) - logger.info( - "[cmd %s] %s 0x%04X=%s OK batch@%s (attempt %s)", - cid, - cmd["asset_code"], - int(cmd["register"]), - val, - start_reg, - attempt + 1, - ) - break - except Exception as e: - if attempt < max_retries - 1: - logger.warning( - "Modbus batch write 0x%04X count=%s attempt %s failed: %s, retrying...", - start_reg, - len(values), - attempt + 1, - e, - ) - await asyncio.sleep(retry_delay) - await client.force_disconnect() - else: - for cmd in run: - await db.execute( - """ - UPDATE ems.modbus_command - SET status='failed', error_msg=$1, - attempt_count=attempt_count+1 - WHERE id=$2 - """, - str(e), - int(cmd["id"]), + try: + for (host, port, unit), group in by_gw.items(): + client = await get_modbus_client(host, port) + for run in _modbus_command_contiguous_runs(group): + start_reg = int(run[0]["register"]) + values = [int(c["value_to_write"]) for c in run] + write_err: Exception | None = None + attempts_used = 0 + for attempt in range(max_retries): + attempts_used = attempt + 1 + try: + await client.write_registers(start_reg, values, unit) + write_err = None + break + except Exception as e: + write_err = e + if attempt < max_retries - 1: + logger.warning( + "Modbus batch write 0x%04X count=%s attempt %s failed: %s, retrying...", + start_reg, + len(values), + attempt + 1, + _modbus_error_text(e), ) - logger.error( - "Modbus batch 0x%04X count=%s all %s attempts failed: %s", - start_reg, - len(values), - max_retries, - e, - ) - all_ok = False + await asyncio.sleep(retry_delay) + try: + await client.force_disconnect() + except Exception as de: + logger.warning( + "Modbus force_disconnect %s:%s failed: %s", + host, + port, + _modbus_error_text(de), + ) + + if write_err is not None: + err = _modbus_error_text(write_err) + await _mark_commands_failed(db, [int(c["id"]) for c in run], err) + for c in run: + unresolved.discard(int(c["id"])) + logger.error( + "Modbus batch 0x%04X count=%s all %s attempts failed: %s", + start_reg, + len(values), + max_retries, + err, + ) + all_ok = False + continue + + # Journal update mimo retry cyklus — chyba DB nesmí vyvolat + # další zápis do zařízení; spadne do safety netu níže. + for cmd, val in zip(run, values): + cid = int(cmd["id"]) + await db.execute( + """ + UPDATE ems.modbus_command + SET status='written', value_written=$1, written_at=now(), + attempt_count=attempt_count+1, error_msg=NULL + WHERE id=$2 + """, + val, + cid, + ) + unresolved.discard(cid) + logger.info( + "[cmd %s] %s 0x%04X=%s OK batch@%s (attempt %s)", + cid, + cmd["asset_code"], + int(cmd["register"]), + val, + start_reg, + attempts_used, + ) + except BaseException as e: + # Safety net: CancelledError (shutdown / zrušený task), GatewayLockTimeout + # propadlý mimo retry cyklus, chyba DB v success větvi, … — nic nesmí + # zůstat 'pending'. Best effort: označit a výjimku propagovat dál. + err = f"execute aborted: {_modbus_error_text(e)}" + try: + await _mark_commands_failed(db, sorted(unresolved), err) + except Exception as me: + logger.error( + "Modbus journal: nelze označit %s příkazů failed (%s): %s", + len(unresolved), + err, + _modbus_error_text(me), + ) + logger.error("execute_modbus_commands aborted: %s", err) + raise return all_ok diff --git a/backend/services/modbus_client.py b/backend/services/modbus_client.py index d049de3..93e0c82 100644 --- a/backend/services/modbus_client.py +++ b/backend/services/modbus_client.py @@ -25,9 +25,27 @@ logger = logging.getLogger(__name__) _flock_warned = False +class GatewayLockTimeout(TimeoutError): + """Brána je držena jiným tahem (telemetrie / druhý proces) déle než timeout.""" + + _BACKEND_ROOT = Path(__file__).resolve().parent.parent _DEFAULT_LOCK_DIR = _BACKEND_ROOT / ".ems-modbus-locks" +#: Maximální čekání na exkluzivní zámek brány. Dřív se čekalo blokovaně bez +#: limitu — exporter pak mohl na bráně obsazené pollingem mrtvého unit_id +#: viset donekonečna (journal řádky trvale 'pending'). Po timeoutu se vyhodí +#: GatewayLockTimeout a volající označí příkaz failed ('gateway lock timeout'). +_FLOCK_TIMEOUT_DEFAULT_S = 20.0 +_FLOCK_POLL_INTERVAL_S = 0.25 + + +def _flock_timeout_s() -> float: + try: + return float(os.getenv("EMS_MODBUS_FLOCK_TIMEOUT_S", _FLOCK_TIMEOUT_DEFAULT_S)) + except ValueError: + return _FLOCK_TIMEOUT_DEFAULT_S + def _gateway_lock_path(host: str, port: int) -> Path: # Výchozí = backend/.ems-modbus-locks (v Dockeru /app → mount ./backend), aby flock sdílel @@ -65,14 +83,32 @@ async def _gateway_exclusive(host: str, port: int): path = _gateway_lock_path(host_s, port_i) path.parent.mkdir(parents=True, exist_ok=True) f = open(path, "a+b") # noqa: SIM115 + locked = False try: - await asyncio.to_thread(fcntl.flock, f.fileno(), fcntl.LOCK_EX) + # Neblokující pokusy s deadline místo flock(LOCK_EX) bez limitu: + # blokované čekání v to_thread nejde zrušit a při bráně obsazené + # pollingem mrtvého unit_id (32 s z každé minuty) hrozí starvation. + timeout_s = _flock_timeout_s() + deadline = asyncio.get_running_loop().time() + timeout_s + while True: + try: + fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + locked = True + break + except OSError: + if asyncio.get_running_loop().time() >= deadline: + raise GatewayLockTimeout( + f"gateway lock timeout {host_s}:{port_i} " + f"after {timeout_s:.0f}s" + ) from None + await asyncio.sleep(_FLOCK_POLL_INTERVAL_S) yield finally: - try: - await asyncio.to_thread(fcntl.flock, f.fileno(), fcntl.LOCK_UN) - except OSError: - pass + if locked: + try: + fcntl.flock(f.fileno(), fcntl.LOCK_UN) + except OSError: + pass f.close() @@ -260,12 +296,17 @@ class PersistentModbusClient: return await self._write_registers_locked(address, values, device_id) async def force_disconnect(self) -> None: - """Uzavře socket pod lockem (např. před retry po chybě).""" - async with _gateway_exclusive(self.host, self.port): - async with self._lock: - if self._client is not None: - self._client.close() - self._client = None + """Uzavře socket pod lockem (např. před retry po chybě). + + Záměrně BEZ _gateway_exclusive: zavření vlastního TCP socketu není + transakce na RS485 sběrnici a čekání na zámek brány tady umělo + protáhnout / shodit retry cestu exporteru (GatewayLockTimeout + uvnitř except větve execute_modbus_commands). + """ + async with self._lock: + if self._client is not None: + self._client.close() + self._client = None @asynccontextmanager async def batch(self, device_id: int = 1) -> AsyncIterator[ModbusBatch]: diff --git a/backend/services/telemetry_collector.py b/backend/services/telemetry_collector.py index 7f75721..12e638d 100644 --- a/backend/services/telemetry_collector.py +++ b/backend/services/telemetry_collector.py @@ -4,6 +4,7 @@ from __future__ import annotations import asyncio import logging +import time from datetime import datetime, timezone import asyncpg @@ -264,6 +265,35 @@ DEYE_REG_CONTROL_BOARD_SPECIAL1 = 178 TELTO_REG_BLOCK_START = 0 TELTO_REG_BLOCK_COUNT = 41 +#: Backoff pro nedosažitelný wallbox: čtení mrtvého unit_id drží exkluzivní +#: zámek brány až (retries+1)×timeout = 4×8 = 32 s (pymodbus) — každou minutu. +#: Po EV_POLL_FAIL_THRESHOLD selháních v řadě se poll daného (host,port,unit) +#: zkouší jen 1× za EV_POLL_BACKOFF_S; úspěšné čtení backoff resetuje. +EV_POLL_FAIL_THRESHOLD = 3 +EV_POLL_BACKOFF_S = 300.0 +_EV_POLL_FAIL_STREAK: dict[tuple[str, int, int], int] = {} +_EV_POLL_NEXT_ATTEMPT: dict[tuple[str, int, int], float] = {} + + +def _ev_poll_should_skip(key: tuple[str, int, int], now_mono: float) -> bool: + return ( + _EV_POLL_FAIL_STREAK.get(key, 0) >= EV_POLL_FAIL_THRESHOLD + and now_mono < _EV_POLL_NEXT_ATTEMPT.get(key, 0.0) + ) + + +def _ev_poll_record_failure(key: tuple[str, int, int], now_mono: float) -> int: + streak = _EV_POLL_FAIL_STREAK.get(key, 0) + 1 + _EV_POLL_FAIL_STREAK[key] = streak + if streak >= EV_POLL_FAIL_THRESHOLD: + _EV_POLL_NEXT_ATTEMPT[key] = now_mono + EV_POLL_BACKOFF_S + return streak + + +def _ev_poll_record_success(key: tuple[str, int, int]) -> None: + _EV_POLL_FAIL_STREAK.pop(key, None) + _EV_POLL_NEXT_ATTEMPT.pop(key, None) + #: EVSE status (reg 6) → interní stav; session detekce stojí na 'available' vs ≠'available' #: (fn_ev_session_transition), proto každý stav s připojeným EV musí být ≠ 'available'. TELTO_STATUS_MAP = { @@ -430,6 +460,15 @@ async def poll_ev_chargers(site_id: int, db: asyncpg.Connection) -> None: port = int(row["port"] or 502) unit_id = int(row["unit_id"] if row["unit_id"] is not None else 1) + poll_key = (str(host), port, unit_id) + now_mono = time.monotonic() + if _ev_poll_should_skip(poll_key, now_mono): + logger.debug( + "EV charger %s (%s:%s u%s) in backoff, poll skipped", + code, host, port, unit_id, + ) + continue + try: client = await get_modbus_client(host, port) async with client.batch(unit_id) as mb: @@ -440,8 +479,19 @@ async def poll_ev_chargers(site_id: int, db: asyncpg.Connection) -> None: except Exception as e: # Při výpadku čtení NIC nezapisovat — fabrikovaný 'available' by # falešně ukončoval EV session a špinil bazál (power 0). - logger.warning("EV charger %s (%s:%s) read failed: %s", code, host, port, e) + streak = _ev_poll_record_failure(poll_key, time.monotonic()) + backoff = ( + f" (streak {streak} >= {EV_POLL_FAIL_THRESHOLD}, " + f"backoff {EV_POLL_BACKOFF_S:.0f}s — neblokovat bránu)" + if streak >= EV_POLL_FAIL_THRESHOLD + else "" + ) + logger.warning( + "EV charger %s (%s:%s u%s) read failed: %s%s", + code, host, port, unit_id, e, backoff, + ) continue + _ev_poll_record_success(poll_key) current_status = str(frame["status"]) if frame["error_bits"]: diff --git a/backend/tests/test_modbus_execute_failsafe.py b/backend/tests/test_modbus_execute_failsafe.py new file mode 100644 index 0000000..f461f96 --- /dev/null +++ b/backend/tests/test_modbus_execute_failsafe.py @@ -0,0 +1,234 @@ +"""execute_modbus_commands: žádná cesta nesmí nechat příkaz 'pending'. + +Regrese na živý incident home-01 (TeltoCharge 172.16.1.16): zápisová trojice +(15, 19–20) buď skončila 'failed' s prázdným error_msg (str(TimeoutError()) +== ''), nebo zůstala trvale 'pending' (export visel bez limitu na flock brány +obsazené pollingem mrtvého unit_id; výjimka mimo retry cyklus stav neuložila). + +Testy: (1) error_msg nikdy prázdný; (2) GatewayLockTimeout → failed +s 'gateway lock timeout'; (3) CancelledError / chyba DB → safety net označí +zbylé příkazy failed a výjimku propaguje; (4) flock s timeoutem v +modbus_client; (5) backoff pollingu nedosažitelného wallboxu. +""" + +import asyncio +import fcntl +import os +import tempfile +import unittest +from unittest.mock import AsyncMock, patch + +import services.control.modbus_journal as journal +import services.modbus_client as mc +import services.telemetry_collector as tc +from services.control.modbus_journal import ( + _modbus_error_text, + execute_modbus_commands, +) +from services.modbus_client import GatewayLockTimeout + + +def _cmd_row(cid: int, reg: int, val: int = 0) -> dict: + return { + "id": cid, + "register": reg, + "value_to_write": val, + "device_host": "172.16.1.16", + "device_port": 502, + "device_unit_id": 1, + "asset_code": "ev-charger-1", + } + + +class _JournalDB: + """In-memory journal — sleduje status a error_msg per command id.""" + + def __init__(self, rows: list[dict], fail_written_update: bool = False) -> None: + self.rows = {r["id"]: dict(r) for r in rows} + self.status = {r["id"]: "pending" for r in rows} + self.error_msg: dict[int, str | None] = {r["id"]: None for r in rows} + self.fail_written_update = fail_written_update + + async def fetchrow(self, query: str, cid: int) -> dict | None: + return self.rows.get(cid) + + async def execute(self, query: str, *args: object) -> None: + if "status='written'" in query: + if self.fail_written_update: + raise RuntimeError("db connection lost") + _val, cid = args + self.status[int(cid)] = "written" # type: ignore[arg-type] + self.error_msg[int(cid)] = None # type: ignore[arg-type] + elif "status='failed'" in query: + msg, cid = args + self.status[int(cid)] = "failed" # type: ignore[arg-type] + self.error_msg[int(cid)] = str(msg) # type: ignore[arg-type] + else: + raise AssertionError(f"unexpected execute: {query}") + + +def _fake_client(write_exc: BaseException | None = None) -> AsyncMock: + client = AsyncMock() + if write_exc is not None: + client.write_registers.side_effect = write_exc + client.force_disconnect = AsyncMock() + return client + + +class ErrorTextTests(unittest.TestCase): + def test_empty_str_exception_falls_back_to_repr(self) -> None: + self.assertEqual(_modbus_error_text(TimeoutError()), "TimeoutError()") + + def test_nonempty_str_kept(self) -> None: + self.assertEqual(_modbus_error_text(OSError("boom")), "boom") + + +class ExecuteFailsafeTests(unittest.IsolatedAsyncioTestCase): + async def _run( + self, + db: _JournalDB, + client: AsyncMock, + ids: list[int], + ) -> bool: + with ( + patch.object(journal, "get_modbus_client", AsyncMock(return_value=client)), + patch.object(journal.asyncio, "sleep", AsyncMock()), + ): + return await execute_modbus_commands(ids, db) # type: ignore[arg-type] + + async def test_timeout_with_empty_str_marks_failed_with_nonempty_msg(self) -> None: + db = _JournalDB([_cmd_row(1, 15), _cmd_row(2, 19), _cmd_row(3, 20)]) + ok = await self._run(db, _fake_client(TimeoutError()), [1, 2, 3]) + self.assertFalse(ok) + self.assertEqual(set(db.status.values()), {"failed"}) + for msg in db.error_msg.values(): + self.assertTrue(msg) # nikdy NULL/prázdný + + async def test_gateway_lock_timeout_marks_failed_with_reason(self) -> None: + db = _JournalDB([_cmd_row(1, 15)]) + exc = GatewayLockTimeout("gateway lock timeout 172.16.1.16:502 after 20s") + ok = await self._run(db, _fake_client(exc), [1]) + self.assertFalse(ok) + self.assertEqual(db.status[1], "failed") + self.assertIn("gateway lock timeout", db.error_msg[1] or "") + + async def test_cancelled_error_marks_failed_and_reraises(self) -> None: + db = _JournalDB([_cmd_row(1, 15), _cmd_row(2, 19), _cmd_row(3, 20)]) + with self.assertRaises(asyncio.CancelledError): + await self._run(db, _fake_client(asyncio.CancelledError()), [1, 2, 3]) + self.assertEqual(set(db.status.values()), {"failed"}) + for msg in db.error_msg.values(): + self.assertIn("execute aborted", msg or "") + + async def test_db_failure_in_written_update_marks_rest_failed(self) -> None: + db = _JournalDB([_cmd_row(1, 15), _cmd_row(2, 19)], fail_written_update=True) + with self.assertRaises(RuntimeError): + await self._run(db, _fake_client(), [1, 2]) + self.assertEqual(set(db.status.values()), {"failed"}) + self.assertIn("db connection lost", db.error_msg[1] or "") + + async def test_force_disconnect_failure_does_not_leave_pending(self) -> None: + db = _JournalDB([_cmd_row(1, 15)]) + client = _fake_client(OSError("write boom")) + client.force_disconnect.side_effect = OSError("disconnect boom") + ok = await self._run(db, client, [1]) + self.assertFalse(ok) + self.assertEqual(db.status[1], "failed") + self.assertIn("write boom", db.error_msg[1] or "") + + async def test_success_path_still_written(self) -> None: + db = _JournalDB([_cmd_row(1, 15), _cmd_row(2, 19), _cmd_row(3, 20)]) + ok = await self._run(db, _fake_client(), [1, 2, 3]) + self.assertTrue(ok) + self.assertEqual(set(db.status.values()), {"written"}) + + +class GatewayFlockTimeoutTests(unittest.IsolatedAsyncioTestCase): + async def test_lock_timeout_raises_gateway_lock_timeout(self) -> None: + with tempfile.TemporaryDirectory() as d, patch.dict( + os.environ, + {"EMS_MODBUS_LOCK_DIR": d, "EMS_MODBUS_FLOCK_TIMEOUT_S": "0.3"}, + ): + path = mc._gateway_lock_path("10.99.99.99", 502) + path.parent.mkdir(parents=True, exist_ok=True) + holder = open(path, "a+b") # noqa: SIM115 + fcntl.flock(holder.fileno(), fcntl.LOCK_EX) + try: + with self.assertRaises(GatewayLockTimeout) as ctx: + async with mc._gateway_exclusive("10.99.99.99", 502): + pass + self.assertIn("gateway lock timeout", str(ctx.exception)) + finally: + fcntl.flock(holder.fileno(), fcntl.LOCK_UN) + holder.close() + + async def test_lock_acquired_when_free(self) -> None: + with tempfile.TemporaryDirectory() as d, patch.dict( + os.environ, {"EMS_MODBUS_LOCK_DIR": d} + ): + async with mc._gateway_exclusive("10.99.99.98", 502): + pass # bez výjimky + + +class EvPollBackoffTests(unittest.TestCase): + KEY = ("172.16.1.16", 502, 2) + + def setUp(self) -> None: + tc._EV_POLL_FAIL_STREAK.clear() + tc._EV_POLL_NEXT_ATTEMPT.clear() + + def test_below_threshold_never_skips(self) -> None: + tc._ev_poll_record_failure(self.KEY, 100.0) + tc._ev_poll_record_failure(self.KEY, 160.0) + self.assertFalse(tc._ev_poll_should_skip(self.KEY, 220.0)) + + def test_skips_after_threshold_until_backoff_elapses(self) -> None: + for t in (100.0, 160.0, 220.0): + tc._ev_poll_record_failure(self.KEY, t) + self.assertTrue(tc._ev_poll_should_skip(self.KEY, 221.0)) + self.assertTrue( + tc._ev_poll_should_skip(self.KEY, 220.0 + tc.EV_POLL_BACKOFF_S - 1) + ) + self.assertFalse( + tc._ev_poll_should_skip(self.KEY, 220.0 + tc.EV_POLL_BACKOFF_S + 1) + ) + + def test_success_resets_streak(self) -> None: + for t in (100.0, 160.0, 220.0): + tc._ev_poll_record_failure(self.KEY, t) + tc._ev_poll_record_success(self.KEY) + self.assertFalse(tc._ev_poll_should_skip(self.KEY, 221.0)) + + +class _PollDB: + """Jen řádek chargeru pro poll_ev_chargers (failure path se dál nedotkne DB).""" + + def __init__(self) -> None: + self.row = { + "id": 7, + "code": "ev-charger-2", + "host": "172.16.1.16", + "port": 502, + "unit_id": 2, + } + + async def fetch(self, query: str, *args: object) -> list[dict]: + return [self.row] + + +class PollEvChargersBackoffIntegrationTests(unittest.IsolatedAsyncioTestCase): + async def test_dead_unit_stops_hitting_gateway_after_threshold(self) -> None: + tc._EV_POLL_FAIL_STREAK.clear() + tc._EV_POLL_NEXT_ATTEMPT.clear() + get_client = AsyncMock(side_effect=OSError("unit 2 unreachable")) + with patch.object(tc, "get_modbus_client", get_client): + for _ in range(tc.EV_POLL_FAIL_THRESHOLD): + await tc.poll_ev_chargers(1, _PollDB()) # type: ignore[arg-type] + self.assertEqual(get_client.await_count, tc.EV_POLL_FAIL_THRESHOLD) + # další tick uvnitř backoff okna už na bránu nesahá + await tc.poll_ev_chargers(1, _PollDB()) # type: ignore[arg-type] + self.assertEqual(get_client.await_count, tc.EV_POLL_FAIL_THRESHOLD) + + +if __name__ == "__main__": + unittest.main() diff --git a/docs/04-modules/modbus-command-journal.md b/docs/04-modules/modbus-command-journal.md index a11cae6..873d5c9 100644 --- a/docs/04-modules/modbus-command-journal.md +++ b/docs/04-modules/modbus-command-journal.md @@ -79,6 +79,46 @@ pro **reg 178** (spolu s peak shaving bity 4–5). **Dávky:** `execute_modbus_commands` slučuje souvislé adresy do jednoho **`write_registers`** (FC **0x10**). `verify_modbus_commands` čte zpět po souvislých blocích (`read_holding_registers`, FC 0x03). Detail režimů: `modbus-registers.md`. +## Robustnost zápisu — žádné věčné `pending` + +Invariant `execute_modbus_commands`: **každý předaný příkaz skončí `written` +nebo `failed` s neprázdným `error_msg`** — žádná cesta nesmí nechat řádek +trvale `pending` (živý incident home-01 2026-06-12: TeltoCharge trojice +15/19–20 zůstala `pending` > 13 min, jiný běh skončil `failed` bez chyby). + +1. **Zámek brány s timeoutem.** `_gateway_exclusive` (flock per `host:port`, + `services/modbus_client.py`) už nečeká blokovaně bez limitu, ale + neblokujícími pokusy do **`EMS_MODBUS_FLOCK_TIMEOUT_S`** (default **20 s**). + Po vypršení vyhodí `GatewayLockTimeout` („gateway lock timeout host:port…“) + → retry cyklus zápisu příkaz označí **`failed`** s touto zprávou. Dřív mohl + exporter na bráně obsazené pollingem mrtvého unit_id viset donekonečna + (flock nemá FIFO — starvation) a protože APScheduler drží + `max_instances=1`, zablokoval i všechny další exportní ticky. +2. **`error_msg` nikdy prázdný.** `_modbus_error_text`: `str(e)` nebo `repr(e)` + (`TimeoutError()` / `ConnectionResetError()` mají prázdný `str` → dřív + vypadalo jako `failed` „bez chyby“). +3. **Safety net.** Celý průchod je v `try/except BaseException`: i při + `CancelledError` (shutdown / zrušený task), chybě DB nebo bugu se zbylé + nerozhodnuté příkazy best-effort označí `failed` + (`execute aborted: …`) a výjimka se propaguje dál. +4. **Journal update mimo retry cyklus zařízení.** Chyba DB při UPDATE na + `written` nevyvolá další (duplicitní) zápis do zařízení — spadne do safety + netu. +5. **`force_disconnect` bez zámku brány** — zavření vlastního TCP socketu není + transakce na RS485; čekání na flock v retry větvi by jinak mohlo samo + timeoutovat. + +**Backoff telemetrie pro nedosažitelný wallbox** (`telemetry_collector.poll_ev_chargers`): +čtení mrtvého unit_id drží exkluzivní zámek brány až **(retries+1) × timeout += 4 × 8 = 32 s** (pymodbus) — při poll smyčce 60 s je brána obsazená ~53 % +času a zápisy exportu se na ní dusí. Po **3** selháních v řadě se poll daného +`(host, port, unit_id)` zkouší jen **1× za 5 min** (`EV_POLL_FAIL_THRESHOLD`, +`EV_POLL_BACKOFF_S`); úspěšné čtení backoff resetuje. Při výpadku čtení se +nadále nic nezapisuje do telemetrie (žádný fabrikovaný `available`). + +Testy: `backend/tests/test_modbus_execute_failsafe.py` (prázdný `str(e)`, +gateway lock timeout, CancelledError, chyba DB, backoff pollingu). + ## APScheduler | Job | Frekvence | Popis | @@ -104,6 +144,11 @@ Poznámka: **GEN port cut-off na BA81** se aktuálně provádí přímo přes De ## Konfigurace - `.env`: `DISCORD_WEBHOOK_URL` — prázdné = notifikace vypnuté (jen log). +- `.env`: `EMS_MODBUS_FLOCK_TIMEOUT_S` — max. čekání na exkluzivní zámek brány + (default 20 s); po vypršení `GatewayLockTimeout` → příkaz `failed` + s `error_msg = gateway lock timeout …`. +- `.env`: `EMS_MODBUS_LOCK_DIR`, `EMS_MODBUS_DISABLE_FLOCK` — umístění / + vypnutí flock souborů (`services/modbus_client.py`). ## Související soubory