"""Persistentní Modbus TCP klient na sdílené Waveshare / RS485 bráně (jedno spojení + lock).""" from __future__ import annotations import asyncio import hashlib import logging import os from collections.abc import AsyncIterator from contextlib import asynccontextmanager from pathlib import Path from pymodbus.client import AsyncModbusTcpClient try: import fcntl _FCNTL = True except ImportError: fcntl = None # type: ignore[assignment] _FCNTL = False logger = logging.getLogger(__name__) _flock_warned = False _BACKEND_ROOT = Path(__file__).resolve().parent.parent _DEFAULT_LOCK_DIR = _BACKEND_ROOT / ".ems-modbus-locks" def _gateway_lock_path(host: str, port: int) -> Path: # Výchozí = backend/.ems-modbus-locks (v Dockeru /app → mount ./backend), aby flock sdílel # hostitel + kontejner při dvou backend procesech na stejné bráně; přepiš EMS_MODBUS_LOCK_DIR. base = Path(os.getenv("EMS_MODBUS_LOCK_DIR", str(_DEFAULT_LOCK_DIR))) h = hashlib.sha256(f"{host.strip()}:{int(port)}".encode()).hexdigest()[:20] return base / f"{h}.lock" @asynccontextmanager async def _gateway_exclusive(host: str, port: int): """ Jedna RS485 linka přes levné TCP↔serial brány nesmí obsluhovat dva procesy najednou — odpovědi pak mají cizí transaction_id (např. 22 vs 54000). flock serializuje napříč PID. Vypnout: EMS_MODBUS_DISABLE_FLOCK=1 (nebo neexistující fcntl, např. Windows). """ global _flock_warned port_i = int(port) host_s = host.strip() if ( not _FCNTL or os.getenv("EMS_MODBUS_DISABLE_FLOCK", "").lower() in ("1", "true", "yes") ): if not _FCNTL and not _flock_warned: logger.warning( "Modbus: fcntl nedostupný — meziprocesová serializace na bránu %s:%s " "neaktivní (riziko kolizí při dvou masterech)", host_s, port_i, ) _flock_warned = True yield return path = _gateway_lock_path(host_s, port_i) path.parent.mkdir(parents=True, exist_ok=True) f = open(path, "a+b") # noqa: SIM115 try: await asyncio.to_thread(fcntl.flock, f.fileno(), fcntl.LOCK_EX) yield finally: try: await asyncio.to_thread(fcntl.flock, f.fileno(), fcntl.LOCK_UN) except OSError: pass f.close() class ModbusBatch: """Více read/write pod jedním držením locku (žádný jiný task na stejném klientovi mezi nimi).""" def __init__(self, owner: PersistentModbusClient, device_id: int) -> None: self._o = owner self._device_id = device_id async def read_register(self, address: int) -> int: return await self._o._read_register_locked(address, self._device_id) async def read_register_signed(self, address: int) -> int: raw = await self.read_register(address) return raw - 65536 if raw > 32767 else raw async def read_holding_registers(self, address: int, count: int) -> list[int]: """Jedna PDU (FC3) pro po sobě jdoucí registry — méně kolizí na RS485 bránách.""" return await self._o._read_holding_registers_locked( address, count, self._device_id ) async def write_register(self, address: int, value: int) -> bool: return await self._o._write_register_locked(address, value, self._device_id) async def write_registers(self, address: int, values: list[int]) -> bool: return await self._o._write_registers_locked(address, values, self._device_id) class PersistentModbusClient: """ Jedno persistentní TCP spojení na převodník (host:port). Unit ID se předává u každé operace — serial servery často nezvládají paralelní TCP sockety na stejnou bránu (pližící se transaction_id mismatch). Serializuje všechny požadavky přes asyncio.Lock(). """ def __init__(self, host: str, port: int) -> None: self.host = host.strip() self.port = int(port) self._client: AsyncModbusTcpClient | None = None self._lock = asyncio.Lock() async def _ensure_connected(self) -> None: if self._client is not None and self._client.connected: return if self._client is not None: self._client.close() self._client = None logger.info("Modbus connecting %s:%s", self.host, self.port) self._client = AsyncModbusTcpClient( self.host, port=self.port, timeout=8, retries=3, ) await self._client.connect() if not self._client.connected: self._client.close() self._client = None raise ConnectionError(f"Cannot connect Modbus {self.host}:{self.port}") logger.info("Modbus connected %s:%s", self.host, self.port) async def _read_register_locked(self, address: int, device_id: int) -> int: if self._client is None or not self._client.connected: await self._ensure_connected() assert self._client is not None try: r = await self._client.read_holding_registers( address, count=1, device_id=int(device_id) ) if r.isError() or not getattr(r, "registers", None): raise OSError(f"Read error 0x{address:04X}: {r!r}") return int(r.registers[0]) except Exception as e: logger.warning("Modbus read 0x%04X failed: %s", address, e) self._client.close() self._client = None raise async def _read_holding_registers_locked( self, address: int, count: int, device_id: int ) -> list[int]: if count < 1 or count > 125: raise ValueError(f"invalid holding register count: {count}") if self._client is None or not self._client.connected: await self._ensure_connected() assert self._client is not None try: r = await self._client.read_holding_registers( address, count=count, device_id=int(device_id) ) if r.isError() or not getattr(r, "registers", None): raise OSError(f"Read error 0x{address:04X} x{count}: {r!r}") if len(r.registers) != count: raise OSError( f"Read 0x{address:04X}: expected {count} regs, got {len(r.registers)}" ) return [int(x) for x in r.registers] except Exception as e: logger.warning( "Modbus read 0x%04X count=%s failed: %s", address, count, e ) self._client.close() self._client = None raise async def _write_registers_locked( self, address: int, values: list[int], device_id: int ) -> bool: if self._client is None or not self._client.connected: await self._ensure_connected() assert self._client is not None try: clamped = [max(0, min(65535, int(v))) for v in values] r = await self._client.write_registers( address, clamped, device_id=int(device_id) ) if r.isError(): raise OSError(f"Write error 0x{address:04X}={clamped}: {r!r}") return True except Exception as e: logger.warning( "Modbus write_registers 0x%04X failed: %s", address, e ) self._client.close() self._client = None raise async def _write_register_locked( self, address: int, value: int, device_id: int ) -> bool: if self._client is None or not self._client.connected: await self._ensure_connected() assert self._client is not None try: v = max(0, min(65535, int(value))) r = await self._client.write_register( address, v, device_id=int(device_id) ) if r.isError(): raise OSError(f"Write error 0x{address:04X}={v}: {r!r}") return True except Exception as e: logger.warning("Modbus write 0x%04X=%s failed: %s", address, value, e) self._client.close() self._client = None raise async def read_register(self, address: int, device_id: int = 1) -> int: async with _gateway_exclusive(self.host, self.port): async with self._lock: await self._ensure_connected() return await self._read_register_locked(address, device_id) async def read_register_signed(self, address: int, device_id: int = 1) -> int: raw = await self.read_register(address, device_id) return raw - 65536 if raw > 32767 else raw async def write_register(self, address: int, value: int, device_id: int = 1) -> bool: async with _gateway_exclusive(self.host, self.port): async with self._lock: await self._ensure_connected() return await self._write_register_locked(address, value, device_id) async def write_registers( self, address: int, values: list[int], device_id: int = 1 ) -> bool: """FC 0x10 – povinné pro Deye registry 60–499 (jeden i více registrů).""" async with _gateway_exclusive(self.host, self.port): async with self._lock: await self._ensure_connected() return await self._write_registers_locked(address, values, device_id) async def force_disconnect(self) -> None: """Uzavře socket pod lockem (např. před retry po chybě).""" async with _gateway_exclusive(self.host, self.port): async with self._lock: if self._client is not None: self._client.close() self._client = None @asynccontextmanager async def batch(self, device_id: int = 1) -> AsyncIterator[ModbusBatch]: """Drží lock pro více po sobě jdoucích operací (telemetrie vs. control na stejné bráně).""" async with _gateway_exclusive(self.host, self.port): async with self._lock: await self._ensure_connected() yield ModbusBatch(self, int(device_id)) def close(self) -> None: if self._client is not None: self._client.close() self._client = None _clients: dict[str, PersistentModbusClient] = {} _registry_lock = asyncio.Lock() async def get_modbus_client(host: str, port: int) -> PersistentModbusClient: """Jedno TCP spojení na převodník podle host:port (unit ID u jednotlivých volání).""" key = f"{host.strip()}:{int(port)}" async with _registry_lock: if key not in _clients: _clients[key] = PersistentModbusClient(host.strip(), port) return _clients[key]