Merge branch 'fix/ev-teltocharge-reg15-and-session-visibility' into dev
# Conflicts: # docs/planning-changelog.md
This commit is contained in:
@@ -18,33 +18,52 @@ logger = logging.getLogger(__name__)
|
||||
TELTO_REG_AMPS_TO_USE = 15 # 0 = stop, 6–32 A
|
||||
TELTO_REG_COMM_TIMEOUT_S = 19 # watchdog: bez komunikace → failsafe
|
||||
TELTO_REG_FAILSAFE_CURRENT_A = 20
|
||||
#: Výpadek EMS: po 5 min bez zápisu wallbox přejde na failsafe proud —
|
||||
#: auto se přes noc nabije i bez EMS (pomalu), místo aby stálo na 0 A.
|
||||
#: Výpadek EMS: po watchdog_comm_timeout_s bez komunikace wallbox přejde na
|
||||
#: failsafe proud — auto se přes noc nabije i bez EMS (pomalu), místo aby
|
||||
#: stálo na 0 A. Defaulty (fallback, když řádek chargeru nemá vlastní hodnoty).
|
||||
TELTO_WATCHDOG_TIMEOUT_S = 300
|
||||
TELTO_WATCHDOG_FAILSAFE_A = 8
|
||||
|
||||
|
||||
def _telto_setpoint_registers(current_a: int) -> list[tuple[int, str, int]]:
|
||||
def _telto_setpoint_registers(
|
||||
current_a: int,
|
||||
*,
|
||||
comm_timeout_s: int = TELTO_WATCHDOG_TIMEOUT_S,
|
||||
failsafe_a: int = TELTO_WATCHDOG_FAILSAFE_A,
|
||||
) -> list[tuple[int, str, int]]:
|
||||
"""Registry pro jeden export tick: limit proudu + watchdog konfigurace.
|
||||
|
||||
Write-on-change: volající VŽDY filtruje přes drop-unchanged proti
|
||||
fn_modbus_device_state_map (poslední written/verified per registr) —
|
||||
watchdog 19/20 se reálně zapíše jen po startu / po výpadku zařízení,
|
||||
amps (15) jen při změně plánu. Watchdog timer TeltoCharge sytí jakákoli
|
||||
validní Modbus komunikace (i FC3 čtení telemetrie každých 60 s), takže
|
||||
periodické zápisy k udržení spojení NEJSOU potřeba (oficiální protokol,
|
||||
docs/04-modules/modbus-registers-teltocharge.md).
|
||||
**Reg 15 (amps to use) NENÍ write-on-change** — viz `_assert_amps_register`.
|
||||
Je to volatilní řídicí registr: TeltoCharge ho po výpadku komunikace sám
|
||||
přepíše na failsafe (reg 20), aniž by o tom vznikl journal řádek. Kdyby se
|
||||
reg 15 dropoval proti journalu (poslední „0 verified"), EMS by tichý drift
|
||||
0 → 8 A NIKDY nezahlédlo (verify čte zpět jen `written` řádky) a nikdy ho
|
||||
neopravilo. Proto se reg 15 re-asertuje KAŽDÝ export tick (≤ 8×/hod) —
|
||||
EEPROM wear se týká jen konfiguračních 19/20, které write-on-change zůstávají.
|
||||
|
||||
Watchdog timer TeltoCharge sytí jakákoli validní Modbus komunikace (i FC3
|
||||
čtení telemetrie každých 60 s), takže periodické zápisy k udržení spojení
|
||||
NEJSOU potřeba; failsafe/timeout (19/20) per charger z DB.
|
||||
"""
|
||||
a = int(current_a)
|
||||
if a < 6:
|
||||
a = 0
|
||||
return [
|
||||
(TELTO_REG_AMPS_TO_USE, "telto_amps_to_use", min(a, 32)),
|
||||
(TELTO_REG_COMM_TIMEOUT_S, "telto_comm_timeout_s", TELTO_WATCHDOG_TIMEOUT_S),
|
||||
(TELTO_REG_FAILSAFE_CURRENT_A, "telto_failsafe_a", TELTO_WATCHDOG_FAILSAFE_A),
|
||||
(TELTO_REG_COMM_TIMEOUT_S, "telto_comm_timeout_s", int(comm_timeout_s)),
|
||||
(TELTO_REG_FAILSAFE_CURRENT_A, "telto_failsafe_a", max(0, min(int(failsafe_a), 32))),
|
||||
]
|
||||
|
||||
|
||||
def _split_amps_and_watchdog(
|
||||
registers: list[tuple[int, str, int]],
|
||||
) -> tuple[list[tuple[int, str, int]], list[tuple[int, str, int]]]:
|
||||
"""Rozdělí registry na (reg 15 = vždy zapsat) a (19/20 = write-on-change)."""
|
||||
amps = [r for r in registers if r[0] == TELTO_REG_AMPS_TO_USE]
|
||||
watchdog = [r for r in registers if r[0] != TELTO_REG_AMPS_TO_USE]
|
||||
return amps, watchdog
|
||||
|
||||
|
||||
def _current_limit_for_charger(charger_code: str, sp: ControlSetpoints) -> int:
|
||||
c = (charger_code or "").strip().lower()
|
||||
if c == "ev-charger-1":
|
||||
@@ -74,7 +93,8 @@ async def write_ev_setpoints(
|
||||
|
||||
rows = await db.fetch(
|
||||
"""
|
||||
SELECT ec.id AS asset_id, ec.code, se.host, se.port, se.unit_id
|
||||
SELECT ec.id AS asset_id, ec.code, se.host, se.port, se.unit_id,
|
||||
ec.watchdog_failsafe_a, ec.watchdog_comm_timeout_s
|
||||
FROM ems.asset_ev_charger ec
|
||||
JOIN ems.site_endpoint se ON se.id = ec.endpoint_id
|
||||
WHERE ec.site_id = $1
|
||||
@@ -97,16 +117,31 @@ async def write_ev_setpoints(
|
||||
unit_id = int(row["unit_id"] if row["unit_id"] is not None else 1)
|
||||
current_a = _current_limit_for_charger(code, setpoints)
|
||||
|
||||
registers = _telto_setpoint_registers(current_a)
|
||||
# Write-on-change: poslední written/verified stav (ne jen verified) —
|
||||
# zápis se nesmí opakovat každý tick, když verify čtení zaostává.
|
||||
registers = _telto_setpoint_registers(
|
||||
current_a,
|
||||
comm_timeout_s=int(
|
||||
row["watchdog_comm_timeout_s"]
|
||||
if row["watchdog_comm_timeout_s"] is not None
|
||||
else TELTO_WATCHDOG_TIMEOUT_S
|
||||
),
|
||||
failsafe_a=int(
|
||||
row["watchdog_failsafe_a"]
|
||||
if row["watchdog_failsafe_a"] is not None
|
||||
else TELTO_WATCHDOG_FAILSAFE_A
|
||||
),
|
||||
)
|
||||
amps_regs, watchdog_regs = _split_amps_and_watchdog(registers)
|
||||
# Reg 15 = vždy (re-asert proti tichému watchdog failsafe driftu na
|
||||
# zařízení, který nemá journal řádek). Reg 19/20 = write-on-change
|
||||
# proti fn_modbus_device_state_map (poslední written/verified stav).
|
||||
device_state = await _fetch_device_state_registers(
|
||||
site_id, asset_id, db, asset_type="ev_charger"
|
||||
)
|
||||
registers, skipped = _drop_registers_matching_last_verified(
|
||||
registers, device_state
|
||||
watchdog_regs, skipped = _drop_registers_matching_last_verified(
|
||||
watchdog_regs, device_state
|
||||
)
|
||||
if not registers:
|
||||
to_write = amps_regs + watchdog_regs
|
||||
if not to_write:
|
||||
logger.debug("EV setpoint [%s]: beze změny (%s A)", code, current_a)
|
||||
continue
|
||||
|
||||
@@ -119,7 +154,7 @@ async def write_ev_setpoints(
|
||||
host,
|
||||
port,
|
||||
unit_id,
|
||||
registers,
|
||||
to_write,
|
||||
db,
|
||||
)
|
||||
ok = await execute_modbus_commands(cmd_ids, db)
|
||||
@@ -128,7 +163,7 @@ async def write_ev_setpoints(
|
||||
"EV setpoint [%s]: %s A (regs %s%s) -> %s",
|
||||
code,
|
||||
current_a,
|
||||
[r for r, _, _ in registers],
|
||||
[r for r, _, _ in to_write],
|
||||
f", skip {skipped}" if skipped else "",
|
||||
"written" if ok else "FAILED",
|
||||
)
|
||||
@@ -155,7 +190,8 @@ async def write_ev_arrival_hold(
|
||||
|
||||
row = await db.fetchrow(
|
||||
"""
|
||||
SELECT ec.id AS asset_id, ec.code, se.host, se.port, se.unit_id
|
||||
SELECT ec.id AS asset_id, ec.code, se.host, se.port, se.unit_id,
|
||||
ec.watchdog_failsafe_a, ec.watchdog_comm_timeout_s
|
||||
FROM ems.asset_ev_charger ec
|
||||
JOIN ems.site_endpoint se ON se.id = ec.endpoint_id
|
||||
WHERE ec.site_id = $1
|
||||
@@ -170,20 +206,29 @@ async def write_ev_arrival_hold(
|
||||
if row is None:
|
||||
return False
|
||||
asset_id = int(row["asset_id"])
|
||||
registers = _telto_setpoint_registers(0)
|
||||
registers = _telto_setpoint_registers(
|
||||
0,
|
||||
comm_timeout_s=int(
|
||||
row["watchdog_comm_timeout_s"]
|
||||
if row["watchdog_comm_timeout_s"] is not None
|
||||
else TELTO_WATCHDOG_TIMEOUT_S
|
||||
),
|
||||
failsafe_a=int(
|
||||
row["watchdog_failsafe_a"]
|
||||
if row["watchdog_failsafe_a"] is not None
|
||||
else TELTO_WATCHDOG_FAILSAFE_A
|
||||
),
|
||||
)
|
||||
amps_regs, watchdog_regs = _split_amps_and_watchdog(registers)
|
||||
# Reg 15 = 0 A se zapíše VŽDY (tvrdé zastavení po píchnutí kabelu; wallbox
|
||||
# po připojení sám rozjíždí nabíjení defaultem). Reg 19/20 write-on-change.
|
||||
device_state = await _fetch_device_state_registers(
|
||||
site_id, asset_id, db, asset_type="ev_charger"
|
||||
)
|
||||
registers, skipped = _drop_registers_matching_last_verified(
|
||||
registers, device_state
|
||||
watchdog_regs, skipped = _drop_registers_matching_last_verified(
|
||||
watchdog_regs, device_state
|
||||
)
|
||||
if not registers:
|
||||
logger.info(
|
||||
"EV arrival hold [%s]: 0 A už na zařízení (skip %s)",
|
||||
charger_code,
|
||||
skipped,
|
||||
)
|
||||
return True
|
||||
to_write = amps_regs + watchdog_regs
|
||||
cmd_ids = await create_modbus_commands(
|
||||
site_id,
|
||||
None,
|
||||
@@ -193,14 +238,14 @@ async def write_ev_arrival_hold(
|
||||
str(row["host"]),
|
||||
int(row["port"] or 502),
|
||||
int(row["unit_id"] if row["unit_id"] is not None else 1),
|
||||
registers,
|
||||
to_write,
|
||||
db,
|
||||
)
|
||||
ok = await execute_modbus_commands(cmd_ids, db)
|
||||
logger.info(
|
||||
"EV arrival hold [%s]: 0 A (regs %s%s) %s",
|
||||
charger_code,
|
||||
[r for r, _, _ in registers],
|
||||
[r for r, _, _ in to_write],
|
||||
f", skip {skipped}" if skipped else "",
|
||||
"written" if ok else "FAILED",
|
||||
)
|
||||
|
||||
@@ -31,12 +31,15 @@ def _ev_session_from_json(obj: object) -> Optional[SimpleNamespace]:
|
||||
obj = json.loads(obj)
|
||||
if not isinstance(obj, dict):
|
||||
return None
|
||||
# target_deadline SMÍ být None: oportunistická session (auto nad targetem,
|
||||
# nebo bez nastaveného cíle) zůstává v plánu kvůli headroomu i jako známá
|
||||
# zátěž. Tvrdý deadline constraint se aplikuje jen při energy_needed_wh > 0
|
||||
# (a needed > 0 nastane jen s deadlinem). Dřív se taková session zahazovala
|
||||
# (None) a plánovač pak neviděl zátěž auta — bug 2026-06-13.
|
||||
td = _parse_json_dt(obj.get("target_deadline"))
|
||||
if td is None:
|
||||
return None
|
||||
return SimpleNamespace(
|
||||
target_deadline=td,
|
||||
energy_needed_wh=float(obj["energy_needed_wh"]),
|
||||
energy_needed_wh=float(obj.get("energy_needed_wh") or 0.0),
|
||||
headroom_wh=float(obj.get("headroom_wh") or 0.0),
|
||||
opportunistic_value_czk_kwh=float(obj.get("opportunistic_value_czk_kwh") or 0.0),
|
||||
)
|
||||
|
||||
66
backend/tests/test_ev_session_parse.py
Normal file
66
backend/tests/test_ev_session_parse.py
Normal file
@@ -0,0 +1,66 @@
|
||||
"""Parser EV session z fn_planning_site_context (_ev_session_from_json).
|
||||
|
||||
Bug 2026-06-13: session BEZ deadline (auto nad targetem / bez cíle) se v
|
||||
parseru zahazovala (None), takže plánovač neviděl zátěž auta ani oportunismus.
|
||||
Oprava: session bez deadline zůstává objektem s energy_needed_wh=0 a headroom.
|
||||
"""
|
||||
|
||||
import unittest
|
||||
|
||||
from services.planning.db_io import _ev_session_from_json
|
||||
|
||||
|
||||
class EvSessionParseTests(unittest.TestCase):
|
||||
def test_none_and_empty_return_none(self) -> None:
|
||||
self.assertIsNone(_ev_session_from_json(None))
|
||||
self.assertIsNone(_ev_session_from_json([]))
|
||||
self.assertIsNone(_ev_session_from_json(123))
|
||||
|
||||
def test_session_without_deadline_kept_for_opportunism(self) -> None:
|
||||
sess = _ev_session_from_json(
|
||||
{
|
||||
"target_deadline": None,
|
||||
"energy_needed_wh": 0,
|
||||
"headroom_wh": 18000.0,
|
||||
"opportunistic_value_czk_kwh": 1.0,
|
||||
}
|
||||
)
|
||||
self.assertIsNotNone(sess)
|
||||
assert sess is not None
|
||||
self.assertIsNone(sess.target_deadline)
|
||||
self.assertEqual(sess.energy_needed_wh, 0.0)
|
||||
self.assertEqual(sess.headroom_wh, 18000.0)
|
||||
self.assertEqual(sess.opportunistic_value_czk_kwh, 1.0)
|
||||
|
||||
def test_session_with_deadline_and_need(self) -> None:
|
||||
sess = _ev_session_from_json(
|
||||
{
|
||||
"target_deadline": "2026-06-14T05:00:00+00:00",
|
||||
"energy_needed_wh": 12000.0,
|
||||
"headroom_wh": 6000.0,
|
||||
"opportunistic_value_czk_kwh": 1.0,
|
||||
}
|
||||
)
|
||||
assert sess is not None
|
||||
self.assertIsNotNone(sess.target_deadline)
|
||||
self.assertEqual(sess.energy_needed_wh, 12000.0)
|
||||
|
||||
def test_missing_needed_defaults_zero(self) -> None:
|
||||
sess = _ev_session_from_json(
|
||||
{"target_deadline": None, "headroom_wh": 1000.0}
|
||||
)
|
||||
assert sess is not None
|
||||
self.assertEqual(sess.energy_needed_wh, 0.0)
|
||||
self.assertEqual(sess.opportunistic_value_czk_kwh, 0.0)
|
||||
|
||||
def test_json_string_payload(self) -> None:
|
||||
sess = _ev_session_from_json(
|
||||
'{"target_deadline": null, "energy_needed_wh": 0, '
|
||||
'"headroom_wh": 5000, "opportunistic_value_czk_kwh": 1.0}'
|
||||
)
|
||||
assert sess is not None
|
||||
self.assertEqual(sess.headroom_wh, 5000.0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -1,11 +1,13 @@
|
||||
"""Write-on-change pro TeltoCharge: zápis JEN při skutečné změně hodnoty.
|
||||
"""TeltoCharge zápis: reg 15 (amps) VŽDY, watchdog 19/20 write-on-change.
|
||||
|
||||
Regrese na opakované zápisy do wallboxu (EEPROM wear): export tick běží
|
||||
~8x/hod (control_export :14,:29,:44,:59 + rolling replan */15 s exportem),
|
||||
ale reg 15/19/20 se smí zapsat jen při změně proti poslednímu known stavu
|
||||
zařízení (fn_modbus_device_state_map: nejnovější written/verified řádek
|
||||
journalu per registr). Watchdog (19/20) se zapíše jednou po startu / po
|
||||
výpadku; sytí ho i FC3 čtení telemetrie (60 s), periodické zápisy netřeba.
|
||||
Export tick běží ~8x/hod (control_export :14,:29,:44,:59 + rolling replan
|
||||
*/15 s exportem). **Reg 15 (amps to use) se zapisuje VŽDY** — TeltoCharge ho
|
||||
po výpadku komunikace sám přepíše na failsafe (reg 20) bez journal řádku, a
|
||||
kdyby byl write-on-change, EMS by tichý drift 0 → 8 A nikdy nezahlédlo
|
||||
(verify čte zpět jen `written`). **Reg 19/20 (watchdog config, EEPROM wear)
|
||||
zůstávají write-on-change** proti fn_modbus_device_state_map (nejnovější
|
||||
written/verified řádek per registr): zapíší se jednou po startu / po výpadku;
|
||||
sytí je i FC3 čtení telemetrie (60 s), periodické zápisy netřeba.
|
||||
"""
|
||||
|
||||
import unittest
|
||||
@@ -20,6 +22,7 @@ from services.control.outputs import (
|
||||
TELTO_REG_FAILSAFE_CURRENT_A,
|
||||
TELTO_WATCHDOG_FAILSAFE_A,
|
||||
TELTO_WATCHDOG_TIMEOUT_S,
|
||||
_split_amps_and_watchdog,
|
||||
_telto_setpoint_registers,
|
||||
write_ev_arrival_hold,
|
||||
write_ev_setpoints,
|
||||
@@ -59,40 +62,48 @@ class TeltoSetpointRegistersTests(unittest.TestCase):
|
||||
self.assertEqual(_telto_setpoint_registers(6)[0][2], 6)
|
||||
self.assertEqual(_telto_setpoint_registers(40)[0][2], 32)
|
||||
|
||||
def test_per_charger_failsafe_and_timeout(self) -> None:
|
||||
regs = _telto_setpoint_registers(0, comm_timeout_s=120, failsafe_a=6)
|
||||
self.assertEqual([(r, v) for r, _, v in regs], [(15, 0), (19, 120), (20, 6)])
|
||||
|
||||
def test_failsafe_clamped_to_0_32(self) -> None:
|
||||
self.assertEqual(_telto_setpoint_registers(0, failsafe_a=99)[2][2], 32)
|
||||
self.assertEqual(_telto_setpoint_registers(0, failsafe_a=-5)[2][2], 0)
|
||||
|
||||
def test_split_separates_amps_from_watchdog(self) -> None:
|
||||
amps, watchdog = _split_amps_and_watchdog(_telto_setpoint_registers(0))
|
||||
self.assertEqual([r for r, _, _ in amps], [15])
|
||||
self.assertEqual([r for r, _, _ in watchdog], [19, 20])
|
||||
|
||||
|
||||
class DropAgainstDeviceStateTests(unittest.TestCase):
|
||||
def test_steady_state_drops_everything(self) -> None:
|
||||
def test_watchdog_steady_state_drops_19_20(self) -> None:
|
||||
_, watchdog = _split_amps_and_watchdog(_telto_setpoint_registers(0))
|
||||
out, skipped = _drop_registers_matching_last_verified(
|
||||
_telto_setpoint_registers(0), _STEADY_STATE_0A
|
||||
watchdog, _STEADY_STATE_0A
|
||||
)
|
||||
self.assertEqual(out, [])
|
||||
self.assertEqual(skipped, [15, 19, 20])
|
||||
|
||||
def test_amps_change_writes_only_reg_15(self) -> None:
|
||||
out, skipped = _drop_registers_matching_last_verified(
|
||||
_telto_setpoint_registers(16), _STEADY_STATE_0A
|
||||
)
|
||||
self.assertEqual([(r, v) for r, _, v in out], [(15, 16)])
|
||||
self.assertEqual(skipped, [19, 20])
|
||||
|
||||
def test_empty_state_after_outage_writes_full_triple(self) -> None:
|
||||
out, skipped = _drop_registers_matching_last_verified(
|
||||
_telto_setpoint_registers(0), {}
|
||||
)
|
||||
self.assertEqual([r for r, _, _ in out], [15, 19, 20])
|
||||
def test_empty_state_after_outage_keeps_19_20(self) -> None:
|
||||
_, watchdog = _split_amps_and_watchdog(_telto_setpoint_registers(0))
|
||||
out, skipped = _drop_registers_matching_last_verified(watchdog, {})
|
||||
self.assertEqual([r for r, _, _ in out], [19, 20])
|
||||
self.assertEqual(skipped, [])
|
||||
|
||||
|
||||
class _FakeDB:
|
||||
"""Jen řádky chargeru; journal funkce se patchují v modbus_journal."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
def __init__(self, failsafe_a: int = 8, comm_timeout_s: int = 300) -> None:
|
||||
self.row = {
|
||||
"asset_id": 7,
|
||||
"code": "ev-charger-1",
|
||||
"host": "172.16.1.16",
|
||||
"port": 502,
|
||||
"unit_id": 1,
|
||||
"watchdog_failsafe_a": failsafe_a,
|
||||
"watchdog_comm_timeout_s": comm_timeout_s,
|
||||
}
|
||||
|
||||
async def fetch(self, query: str, *args: object) -> list[dict]:
|
||||
@@ -105,9 +116,9 @@ class _FakeDB:
|
||||
raise AssertionError(f"unexpected fetchval: {query}")
|
||||
|
||||
|
||||
class WriteEvSetpointsOnChangeTests(unittest.IsolatedAsyncioTestCase):
|
||||
class WriteEvSetpointsTests(unittest.IsolatedAsyncioTestCase):
|
||||
async def _run(
|
||||
self, device_state: dict[int, int], ev1_a: int
|
||||
self, device_state: dict[int, int], ev1_a: int, db: _FakeDB | None = None
|
||||
) -> tuple[AsyncMock, AsyncMock]:
|
||||
create = AsyncMock(return_value=[1, 2, 3])
|
||||
execute = AsyncMock(return_value=True)
|
||||
@@ -120,13 +131,17 @@ class WriteEvSetpointsOnChangeTests(unittest.IsolatedAsyncioTestCase):
|
||||
patch.object(journal, "create_modbus_commands", create),
|
||||
patch.object(journal, "execute_modbus_commands", execute),
|
||||
):
|
||||
await write_ev_setpoints(1, _setpoints(ev1_a), _FakeDB()) # type: ignore[arg-type]
|
||||
await write_ev_setpoints(1, _setpoints(ev1_a), db or _FakeDB()) # type: ignore[arg-type]
|
||||
return create, execute
|
||||
|
||||
async def test_steady_state_no_write_at_all(self) -> None:
|
||||
async def test_steady_state_still_reasserts_reg_15(self) -> None:
|
||||
# Reg 15 se zapisuje VŽDY (re-asert proti tichému failsafe driftu),
|
||||
# i když je device-state mapa shodná. Watchdog 19/20 se přeskočí.
|
||||
create, execute = await self._run(_STEADY_STATE_0A, ev1_a=0)
|
||||
create.assert_not_awaited()
|
||||
execute.assert_not_awaited()
|
||||
create.assert_awaited_once()
|
||||
registers = create.await_args.args[8]
|
||||
self.assertEqual([(r, v) for r, _, v in registers], [(15, 0)])
|
||||
execute.assert_awaited_once()
|
||||
|
||||
async def test_plan_change_writes_only_amps(self) -> None:
|
||||
create, execute = await self._run(_STEADY_STATE_0A, ev1_a=16)
|
||||
@@ -135,14 +150,24 @@ class WriteEvSetpointsOnChangeTests(unittest.IsolatedAsyncioTestCase):
|
||||
self.assertEqual([(r, v) for r, _, v in registers], [(15, 16)])
|
||||
execute.assert_awaited_once()
|
||||
|
||||
async def test_after_outage_writes_full_triple(self) -> None:
|
||||
async def test_after_outage_writes_amps_then_watchdog(self) -> None:
|
||||
create, execute = await self._run({}, ev1_a=0)
|
||||
registers = create.await_args.args[8]
|
||||
self.assertEqual([r for r, _, _ in registers], [15, 19, 20])
|
||||
execute.assert_awaited_once()
|
||||
|
||||
async def test_per_charger_failsafe_from_db(self) -> None:
|
||||
# Failsafe 6 A z DB → po výpadku se zapíše reg 20 = 6 (prázdná mapa).
|
||||
create, _ = await self._run(
|
||||
{}, ev1_a=0, db=_FakeDB(failsafe_a=6, comm_timeout_s=120)
|
||||
)
|
||||
registers = create.await_args.args[8]
|
||||
self.assertEqual(
|
||||
[(r, v) for r, _, v in registers], [(15, 0), (19, 120), (20, 6)]
|
||||
)
|
||||
|
||||
class WriteEvArrivalHoldOnChangeTests(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
class WriteEvArrivalHoldTests(unittest.IsolatedAsyncioTestCase):
|
||||
async def _run(
|
||||
self, device_state: dict[int, int]
|
||||
) -> tuple[bool, AsyncMock, AsyncMock]:
|
||||
@@ -160,13 +185,15 @@ class WriteEvArrivalHoldOnChangeTests(unittest.IsolatedAsyncioTestCase):
|
||||
ok = await write_ev_arrival_hold(1, "ev-charger-1", _FakeDB()) # type: ignore[arg-type]
|
||||
return ok, create, execute
|
||||
|
||||
async def test_hold_skips_when_device_already_at_zero(self) -> None:
|
||||
async def test_hold_always_writes_reg_15_even_if_device_at_zero(self) -> None:
|
||||
# Tvrdé zastavení po píchnutí kabelu — reg 15 = 0 se zapíše VŽDY.
|
||||
ok, create, execute = await self._run(_STEADY_STATE_0A)
|
||||
self.assertTrue(ok)
|
||||
create.assert_not_awaited()
|
||||
execute.assert_not_awaited()
|
||||
registers = create.await_args.args[8]
|
||||
self.assertEqual([(r, v) for r, _, v in registers], [(15, 0)])
|
||||
execute.assert_awaited_once()
|
||||
|
||||
async def test_hold_writes_only_amps_when_watchdog_already_set(self) -> None:
|
||||
async def test_hold_writes_amps_and_watchdog_when_device_drifted(self) -> None:
|
||||
ok, create, execute = await self._run(
|
||||
{
|
||||
TELTO_REG_AMPS_TO_USE: 16,
|
||||
@@ -176,6 +203,7 @@ class WriteEvArrivalHoldOnChangeTests(unittest.IsolatedAsyncioTestCase):
|
||||
)
|
||||
self.assertTrue(ok)
|
||||
registers = create.await_args.args[8]
|
||||
# Reg 15 = 0 zapsán (i když device hlásí 16); 19/20 shodné → skip.
|
||||
self.assertEqual([(r, v) for r, _, v in registers], [(15, 0)])
|
||||
execute.assert_awaited_once()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user