"""TeltoCharge zápis: reg 15 (amps) VŽDY, watchdog 19/20 write-on-change. Export tick běží ~8x/hod (control_export :14,:29,:44,:59 + rolling replan */15 s exportem). **Reg 15 (amps to use) se zapisuje VŽDY** — TeltoCharge ho po výpadku komunikace sám přepíše na failsafe (reg 20) bez journal řádku, a kdyby byl write-on-change, EMS by tichý drift 0 → 8 A nikdy nezahlédlo (verify čte zpět jen `written`). **Reg 19/20 (watchdog config, EEPROM wear) zůstávají write-on-change** proti fn_modbus_device_state_map (nejnovější written/verified řádek per registr): zapíší se jednou po startu / po výpadku; sytí je i FC3 čtení telemetrie (60 s), periodické zápisy netřeba. """ import unittest from unittest.mock import AsyncMock, patch import services.control.modbus_journal as journal from services.control.modbus_journal import _drop_registers_matching_last_verified from services.control.models import ControlSetpoints from services.control.outputs import ( TELTO_REG_AMPS_TO_USE, TELTO_REG_COMM_TIMEOUT_S, TELTO_REG_FAILSAFE_CURRENT_A, TELTO_WATCHDOG_FAILSAFE_A, TELTO_WATCHDOG_TIMEOUT_S, _split_amps_and_watchdog, _telto_setpoint_registers, write_ev_arrival_hold, write_ev_setpoints, ) #: Stav zařízení po prvním úspěšném exportu s 0 A (klid, auto nepřipojené). 
# Device state after the first successful export with 0 A (idle, car not
# connected): reg 15 at zero, watchdog registers at the TELTO_WATCHDOG_* values.
# Treat as read-only; tests hand out copies so one test cannot corrupt another.
_STEADY_STATE_0A = {
    TELTO_REG_AMPS_TO_USE: 0,
    TELTO_REG_COMM_TIMEOUT_S: TELTO_WATCHDOG_TIMEOUT_S,
    TELTO_REG_FAILSAFE_CURRENT_A: TELTO_WATCHDOG_FAILSAFE_A,
}

# Position of the register list among the positional arguments the code under
# test passes to create_modbus_commands.
# NOTE(review): derived from how these tests index the call; confirm against
# the create_modbus_commands signature if that signature ever changes.
_REGISTERS_ARG_INDEX = 8


def _setpoints(ev1_a: int = 0) -> ControlSetpoints:
    """Build ControlSetpoints where only the EV1 current varies."""
    return ControlSetpoints(
        battery_w=None,
        grid_export_limit=0,
        ev1_current_a=ev1_a,
        ev2_current_a=0,
        heat_pump_enable=False,
        grid_setpoint_w=0,
        ev1_power_w=0,
        ev2_power_w=0,
    )


def _patched_journal(
    device_state: dict[int, int], create: AsyncMock, execute: AsyncMock
):
    """Return a context manager that swaps the three journal entry points.

    ``_fetch_device_state_registers`` answers with a *copy* of
    ``device_state`` so that in-place changes made by the code under test
    never reach the caller's dict (typically the shared ``_STEADY_STATE_0A``).
    """
    return patch.multiple(
        journal,
        _fetch_device_state_registers=AsyncMock(return_value=dict(device_state)),
        create_modbus_commands=create,
        execute_modbus_commands=execute,
    )


def _journaled_registers(create: AsyncMock) -> list[tuple[int, int]]:
    """Return (register, value) pairs from the last create_modbus_commands call.

    Fails with a regular assertion error when the mock was never awaited,
    instead of an ``AttributeError`` on ``await_args`` being ``None``.
    """
    create.assert_awaited()
    registers = create.await_args.args[_REGISTERS_ARG_INDEX]
    return [(reg, value) for reg, _, value in registers]


class TeltoSetpointRegistersTests(unittest.TestCase):
    """Register triples produced by ``_telto_setpoint_registers``."""

    def test_triple_for_zero_amps(self) -> None:
        regs = _telto_setpoint_registers(0)
        self.assertEqual(
            [(r, v) for r, _, v in regs],
            [(15, 0), (19, 300), (20, 8)],
        )

    def test_amps_below_six_coerced_to_zero_and_clamped_to_32(self) -> None:
        self.assertEqual(_telto_setpoint_registers(5)[0][2], 0)
        self.assertEqual(_telto_setpoint_registers(6)[0][2], 6)
        self.assertEqual(_telto_setpoint_registers(40)[0][2], 32)

    def test_per_charger_failsafe_and_timeout(self) -> None:
        regs = _telto_setpoint_registers(0, comm_timeout_s=120, failsafe_a=6)
        self.assertEqual(
            [(r, v) for r, _, v in regs], [(15, 0), (19, 120), (20, 6)]
        )

    def test_failsafe_clamped_to_0_32(self) -> None:
        self.assertEqual(_telto_setpoint_registers(0, failsafe_a=99)[2][2], 32)
        self.assertEqual(_telto_setpoint_registers(0, failsafe_a=-5)[2][2], 0)

    def test_split_separates_amps_from_watchdog(self) -> None:
        amps, watchdog = _split_amps_and_watchdog(_telto_setpoint_registers(0))
        self.assertEqual([r for r, _, _ in amps], [15])
        self.assertEqual([r for r, _, _ in watchdog], [19, 20])


class DropAgainstDeviceStateTests(unittest.TestCase):
    """Write-on-change filtering of the watchdog registers (19/20)."""

    def test_watchdog_steady_state_drops_19_20(self) -> None:
        _, watchdog = _split_amps_and_watchdog(_telto_setpoint_registers(0))
        # Pass a copy: the shared steady-state map must stay pristine.
        out, skipped = _drop_registers_matching_last_verified(
            watchdog, dict(_STEADY_STATE_0A)
        )
        self.assertEqual(out, [])
        self.assertEqual(skipped, [19, 20])

    def test_empty_state_after_outage_keeps_19_20(self) -> None:
        _, watchdog = _split_amps_and_watchdog(_telto_setpoint_registers(0))
        out, skipped = _drop_registers_matching_last_verified(watchdog, {})
        self.assertEqual([r for r, _, _ in out], [19, 20])
        self.assertEqual(skipped, [])


class _FakeDB:
    """Only charger rows; journal functions are patched in modbus_journal."""

    def __init__(self, failsafe_a: int = 8, comm_timeout_s: int = 300) -> None:
        self.row = {
            "asset_id": 7,
            "code": "ev-charger-1",
            "host": "172.16.1.16",
            "port": 502,
            "unit_id": 1,
            "watchdog_failsafe_a": failsafe_a,
            "watchdog_comm_timeout_s": comm_timeout_s,
        }

    async def fetch(self, query: str, *args: object) -> list[dict]:
        # Every query returns the single configured charger row.
        return [self.row]

    async def fetchrow(self, query: str, *args: object) -> dict:
        return self.row

    async def fetchval(self, query: str, *args: object) -> None:
        # Always raises: the code paths under test must not call fetchval.
        raise AssertionError(f"unexpected fetchval: {query}")


class WriteEvSetpointsTests(unittest.IsolatedAsyncioTestCase):
    """``write_ev_setpoints``: reg 15 always, watchdog 19/20 on change."""

    async def _run(
        self, device_state: dict[int, int], ev1_a: int, db: _FakeDB | None = None
    ) -> tuple[AsyncMock, AsyncMock]:
        """Run one export with a patched journal; return (create, execute)."""
        create = AsyncMock(return_value=[1, 2, 3])
        execute = AsyncMock(return_value=True)
        with _patched_journal(device_state, create, execute):
            await write_ev_setpoints(1, _setpoints(ev1_a), db or _FakeDB())  # type: ignore[arg-type]
        return create, execute

    async def test_steady_state_still_reasserts_reg_15(self) -> None:
        # Reg 15 is ALWAYS written (re-asserted against silent failsafe
        # drift), even when the device-state map matches. Watchdog 19/20
        # is skipped.
        create, execute = await self._run(_STEADY_STATE_0A, ev1_a=0)
        create.assert_awaited_once()
        self.assertEqual(_journaled_registers(create), [(15, 0)])
        execute.assert_awaited_once()

    async def test_plan_change_writes_only_amps(self) -> None:
        create, execute = await self._run(_STEADY_STATE_0A, ev1_a=16)
        create.assert_awaited_once()
        self.assertEqual(_journaled_registers(create), [(15, 16)])
        execute.assert_awaited_once()

    async def test_after_outage_writes_amps_then_watchdog(self) -> None:
        create, execute = await self._run({}, ev1_a=0)
        self.assertEqual(
            [reg for reg, _ in _journaled_registers(create)], [15, 19, 20]
        )
        execute.assert_awaited_once()

    async def test_per_charger_failsafe_from_db(self) -> None:
        # Failsafe 6 A from the DB -> after an outage reg 20 = 6 is written
        # (empty device-state map).
        create, _ = await self._run(
            {}, ev1_a=0, db=_FakeDB(failsafe_a=6, comm_timeout_s=120)
        )
        self.assertEqual(
            _journaled_registers(create), [(15, 0), (19, 120), (20, 6)]
        )


class WriteEvArrivalHoldTests(unittest.IsolatedAsyncioTestCase):
    """``write_ev_arrival_hold``: reg 15 = 0 is written unconditionally."""

    async def _run(
        self, device_state: dict[int, int]
    ) -> tuple[bool, AsyncMock, AsyncMock]:
        """Run one arrival hold with a patched journal."""
        create = AsyncMock(return_value=[1])
        execute = AsyncMock(return_value=True)
        with _patched_journal(device_state, create, execute):
            ok = await write_ev_arrival_hold(1, "ev-charger-1", _FakeDB())  # type: ignore[arg-type]
        return ok, create, execute

    async def test_hold_always_writes_reg_15_even_if_device_at_zero(self) -> None:
        # Hard stop after the cable is plugged in — reg 15 = 0 is ALWAYS
        # written.
        ok, create, execute = await self._run(_STEADY_STATE_0A)
        self.assertTrue(ok)
        self.assertEqual(_journaled_registers(create), [(15, 0)])
        execute.assert_awaited_once()

    async def test_hold_writes_amps_and_watchdog_when_device_drifted(self) -> None:
        # NOTE(review): despite the name, only reg 15 is expected here — the
        # watchdog registers in the device state match and are skipped.
        ok, create, execute = await self._run(
            {
                TELTO_REG_AMPS_TO_USE: 16,
                TELTO_REG_COMM_TIMEOUT_S: TELTO_WATCHDOG_TIMEOUT_S,
                TELTO_REG_FAILSAFE_CURRENT_A: TELTO_WATCHDOG_FAILSAFE_A,
            }
        )
        self.assertTrue(ok)
        # Reg 15 = 0 written (even though the device reports 16); 19/20
        # match -> skip.
        self.assertEqual(_journaled_registers(create), [(15, 0)])
        execute.assert_awaited_once()


if __name__ == "__main__":
    unittest.main()