TeltoCharge write-on-change: zápis jen při změně hodnoty (EEPROM wear)
Wallbox dostával zápisy 15/19/20 každý export tick (~8x/hod: control_export :14,:29,:44,:59 + rolling replan */15 s exportem), protože drop-unchanged stál na fn_modbus_last_verified_map — dokud verify čtení nedoběhlo/selhalo, mapa byla prázdná a celá trojice se psala pořád dokola. write_ev_arrival_hold navíc psal trojici nepodmíněně při každém píchnutí kabelu (docstring lhal). - nová ems.fn_modbus_device_state_map (R__100): nejnovější řádek journalu per registr, hodnota jen pro written/verified; failed/mismatch => registr chybí => po výpadku se konfigurace obnoví jedním zápisem - write_ev_setpoints + write_ev_arrival_hold filtrují přes tuto mapu: reg 15 jen při změně plánu, watchdog 19/20 jednou po startu/po výpadku - verify job EV chargery ověřuje už dnes (fn_modbus_written_command_ids bez filtru asset_type); registry 15/19/20 jsou dle oficiálního protokolu R/W - watchdog Telto sytí jakákoli validní komunikace vč. FC3 čtení telemetrie (60 s << 300 s) — periodické zápisy k udržení spojení nejsou potřeba, failsafe 8 A nastane jen při skutečném výpadku EMS - testy: tests/test_ev_write_on_change.py (drop, setpoints, arrival hold) - docs: modbus-registers-teltocharge.md (sekce Zápis už není "NEimplementováno", R/W tabulka, watchdog sémantika), modbus-command-journal.md (sekce EV wallbox), CLAUDE.md (fn_modbus_device_state_map) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
184
backend/tests/test_ev_write_on_change.py
Normal file
184
backend/tests/test_ev_write_on_change.py
Normal file
@@ -0,0 +1,184 @@
|
||||
"""Write-on-change pro TeltoCharge: zápis JEN při skutečné změně hodnoty.
|
||||
|
||||
Regrese na opakované zápisy do wallboxu (EEPROM wear): export tick běží
|
||||
~8x/hod (control_export :14,:29,:44,:59 + rolling replan */15 s exportem),
|
||||
ale reg 15/19/20 se smí zapsat jen při změně proti poslednímu known stavu
|
||||
zařízení (fn_modbus_device_state_map: nejnovější written/verified řádek
|
||||
journalu per registr). Watchdog (19/20) se zapíše jednou po startu / po
|
||||
výpadku; sytí ho i FC3 čtení telemetrie (60 s), periodické zápisy netřeba.
|
||||
"""
|
||||
|
||||
import unittest
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import services.control.modbus_journal as journal
|
||||
from services.control.modbus_journal import _drop_registers_matching_last_verified
|
||||
from services.control.models import ControlSetpoints
|
||||
from services.control.outputs import (
|
||||
TELTO_REG_AMPS_TO_USE,
|
||||
TELTO_REG_COMM_TIMEOUT_S,
|
||||
TELTO_REG_FAILSAFE_CURRENT_A,
|
||||
TELTO_WATCHDOG_FAILSAFE_A,
|
||||
TELTO_WATCHDOG_TIMEOUT_S,
|
||||
_telto_setpoint_registers,
|
||||
write_ev_arrival_hold,
|
||||
write_ev_setpoints,
|
||||
)
|
||||
|
||||
#: Stav zařízení po prvním úspěšném exportu s 0 A (klid, auto nepřipojené).
|
||||
_STEADY_STATE_0A = {
|
||||
TELTO_REG_AMPS_TO_USE: 0,
|
||||
TELTO_REG_COMM_TIMEOUT_S: TELTO_WATCHDOG_TIMEOUT_S,
|
||||
TELTO_REG_FAILSAFE_CURRENT_A: TELTO_WATCHDOG_FAILSAFE_A,
|
||||
}
|
||||
|
||||
|
||||
def _setpoints(ev1_a: int = 0) -> ControlSetpoints:
|
||||
return ControlSetpoints(
|
||||
battery_w=None,
|
||||
grid_export_limit=0,
|
||||
ev1_current_a=ev1_a,
|
||||
ev2_current_a=0,
|
||||
heat_pump_enable=False,
|
||||
grid_setpoint_w=0,
|
||||
ev1_power_w=0,
|
||||
ev2_power_w=0,
|
||||
)
|
||||
|
||||
|
||||
class TeltoSetpointRegistersTests(unittest.TestCase):
|
||||
def test_triple_for_zero_amps(self) -> None:
|
||||
regs = _telto_setpoint_registers(0)
|
||||
self.assertEqual(
|
||||
[(r, v) for r, _, v in regs],
|
||||
[(15, 0), (19, 300), (20, 8)],
|
||||
)
|
||||
|
||||
def test_amps_below_six_coerced_to_zero_and_clamped_to_32(self) -> None:
|
||||
self.assertEqual(_telto_setpoint_registers(5)[0][2], 0)
|
||||
self.assertEqual(_telto_setpoint_registers(6)[0][2], 6)
|
||||
self.assertEqual(_telto_setpoint_registers(40)[0][2], 32)
|
||||
|
||||
|
||||
class DropAgainstDeviceStateTests(unittest.TestCase):
|
||||
def test_steady_state_drops_everything(self) -> None:
|
||||
out, skipped = _drop_registers_matching_last_verified(
|
||||
_telto_setpoint_registers(0), _STEADY_STATE_0A
|
||||
)
|
||||
self.assertEqual(out, [])
|
||||
self.assertEqual(skipped, [15, 19, 20])
|
||||
|
||||
def test_amps_change_writes_only_reg_15(self) -> None:
|
||||
out, skipped = _drop_registers_matching_last_verified(
|
||||
_telto_setpoint_registers(16), _STEADY_STATE_0A
|
||||
)
|
||||
self.assertEqual([(r, v) for r, _, v in out], [(15, 16)])
|
||||
self.assertEqual(skipped, [19, 20])
|
||||
|
||||
def test_empty_state_after_outage_writes_full_triple(self) -> None:
|
||||
out, skipped = _drop_registers_matching_last_verified(
|
||||
_telto_setpoint_registers(0), {}
|
||||
)
|
||||
self.assertEqual([r for r, _, _ in out], [15, 19, 20])
|
||||
self.assertEqual(skipped, [])
|
||||
|
||||
|
||||
class _FakeDB:
|
||||
"""Jen řádky chargeru; journal funkce se patchují v modbus_journal."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.row = {
|
||||
"asset_id": 7,
|
||||
"code": "ev-charger-1",
|
||||
"host": "172.16.1.16",
|
||||
"port": 502,
|
||||
"unit_id": 1,
|
||||
}
|
||||
|
||||
async def fetch(self, query: str, *args: object) -> list[dict]:
|
||||
return [self.row]
|
||||
|
||||
async def fetchrow(self, query: str, *args: object) -> dict:
|
||||
return self.row
|
||||
|
||||
async def fetchval(self, query: str, *args: object) -> None:
|
||||
raise AssertionError(f"unexpected fetchval: {query}")
|
||||
|
||||
|
||||
class WriteEvSetpointsOnChangeTests(unittest.IsolatedAsyncioTestCase):
|
||||
async def _run(
|
||||
self, device_state: dict[int, int], ev1_a: int
|
||||
) -> tuple[AsyncMock, AsyncMock]:
|
||||
create = AsyncMock(return_value=[1, 2, 3])
|
||||
execute = AsyncMock(return_value=True)
|
||||
with (
|
||||
patch.object(
|
||||
journal,
|
||||
"_fetch_device_state_registers",
|
||||
AsyncMock(return_value=device_state),
|
||||
),
|
||||
patch.object(journal, "create_modbus_commands", create),
|
||||
patch.object(journal, "execute_modbus_commands", execute),
|
||||
):
|
||||
await write_ev_setpoints(1, _setpoints(ev1_a), _FakeDB()) # type: ignore[arg-type]
|
||||
return create, execute
|
||||
|
||||
async def test_steady_state_no_write_at_all(self) -> None:
|
||||
create, execute = await self._run(_STEADY_STATE_0A, ev1_a=0)
|
||||
create.assert_not_awaited()
|
||||
execute.assert_not_awaited()
|
||||
|
||||
async def test_plan_change_writes_only_amps(self) -> None:
|
||||
create, execute = await self._run(_STEADY_STATE_0A, ev1_a=16)
|
||||
create.assert_awaited_once()
|
||||
registers = create.await_args.args[8]
|
||||
self.assertEqual([(r, v) for r, _, v in registers], [(15, 16)])
|
||||
execute.assert_awaited_once()
|
||||
|
||||
async def test_after_outage_writes_full_triple(self) -> None:
|
||||
create, execute = await self._run({}, ev1_a=0)
|
||||
registers = create.await_args.args[8]
|
||||
self.assertEqual([r for r, _, _ in registers], [15, 19, 20])
|
||||
execute.assert_awaited_once()
|
||||
|
||||
|
||||
class WriteEvArrivalHoldOnChangeTests(unittest.IsolatedAsyncioTestCase):
|
||||
async def _run(
|
||||
self, device_state: dict[int, int]
|
||||
) -> tuple[bool, AsyncMock, AsyncMock]:
|
||||
create = AsyncMock(return_value=[1])
|
||||
execute = AsyncMock(return_value=True)
|
||||
with (
|
||||
patch.object(
|
||||
journal,
|
||||
"_fetch_device_state_registers",
|
||||
AsyncMock(return_value=device_state),
|
||||
),
|
||||
patch.object(journal, "create_modbus_commands", create),
|
||||
patch.object(journal, "execute_modbus_commands", execute),
|
||||
):
|
||||
ok = await write_ev_arrival_hold(1, "ev-charger-1", _FakeDB()) # type: ignore[arg-type]
|
||||
return ok, create, execute
|
||||
|
||||
async def test_hold_skips_when_device_already_at_zero(self) -> None:
|
||||
ok, create, execute = await self._run(_STEADY_STATE_0A)
|
||||
self.assertTrue(ok)
|
||||
create.assert_not_awaited()
|
||||
execute.assert_not_awaited()
|
||||
|
||||
async def test_hold_writes_only_amps_when_watchdog_already_set(self) -> None:
|
||||
ok, create, execute = await self._run(
|
||||
{
|
||||
TELTO_REG_AMPS_TO_USE: 16,
|
||||
TELTO_REG_COMM_TIMEOUT_S: TELTO_WATCHDOG_TIMEOUT_S,
|
||||
TELTO_REG_FAILSAFE_CURRENT_A: TELTO_WATCHDOG_FAILSAFE_A,
|
||||
}
|
||||
)
|
||||
self.assertTrue(ok)
|
||||
registers = create.await_args.args[8]
|
||||
self.assertEqual([(r, v) for r, _, v in registers], [(15, 0)])
|
||||
execute.assert_awaited_once()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user