poll_ev_chargers četl placeholder ('available'/0 W) — EV spotřeba se nikdy
neodečítala z bazálu a session detekce nefungovala. Nyní: blok registrů 0-40
jedním FC 3 (oficiální protokol rev 0.5), parse_teltocharge_frame (status z
reg 6 → available/preparing/charging/..., výkon reg 38, energie session reg 39,
proud max L1-L3 reg 3-5). Při selhání čtení se vzorek NEzapisuje (fabrikovaný
available by falešně ukončoval session).
fn_telemetry_ev_charger_sample: + p_current_a (drop staré 7-arg signatury).
6 nových testů parseru; plná sada beze změny. Docs: modbus-registers-teltocharge.md.
Po deployi: home-01 ev-charger-1/2 začnou posílat reálná data; bazál se začne
čistit od EV (EMA 00:30); rebuild stats má smysl až po ~2 týdnech čisté historie.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
59 lines
2.1 KiB
Python
59 lines
2.1 KiB
Python
"""Parser rámce TeltoCharge (registry 0–40) a mapování stavů na EV session logiku."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import unittest
|
||
|
||
from services.telemetry_collector import (
|
||
TELTO_REG_BLOCK_COUNT,
|
||
TELTO_STATUS_MAP,
|
||
parse_teltocharge_frame,
|
||
)
|
||
|
||
|
||
def _frame(**over: int) -> list[int]:
|
||
regs = [0] * TELTO_REG_BLOCK_COUNT
|
||
regs[0], regs[1], regs[2] = 230, 231, 229 # napětí
|
||
regs[3], regs[4], regs[5] = 160, 158, 0 # proud ×10 A (16.0 A max)
|
||
regs[6] = 7 # A – bez EV
|
||
regs[38] = 0 # výkon W
|
||
regs[39] = 0 # session kWh ×100
|
||
for k, v in over.items():
|
||
regs[int(k.lstrip("r"))] = v
|
||
return regs
|
||
|
||
|
||
class TeltoChargeParseTests(unittest.TestCase):
|
||
def test_charging_frame(self) -> None:
|
||
f = parse_teltocharge_frame(_frame(r6=0, r38=10870, r39=523))
|
||
self.assertEqual(f["status"], "charging")
|
||
self.assertEqual(f["power_w"], 10870)
|
||
self.assertAlmostEqual(f["session_energy_kwh"], 5.23)
|
||
self.assertAlmostEqual(f["current_a"], 16.0)
|
||
|
||
def test_no_ev_is_available(self) -> None:
|
||
self.assertEqual(parse_teltocharge_frame(_frame(r6=7))["status"], "available")
|
||
|
||
def test_all_connected_states_are_not_available(self) -> None:
|
||
# detekce příjezdu (fn_ev_session_transition) stojí na ≠ 'available'
|
||
for raw, mapped in TELTO_STATUS_MAP.items():
|
||
if raw == 7:
|
||
continue
|
||
self.assertNotEqual(mapped, "available", f"EVSE status {raw}")
|
||
|
||
def test_unknown_raw_status(self) -> None:
|
||
self.assertEqual(parse_teltocharge_frame(_frame(r6=42))["status"], "unknown")
|
||
|
||
def test_error_bits_passthrough(self) -> None:
|
||
f = parse_teltocharge_frame(_frame(r6=8, r35=0b10000))
|
||
self.assertEqual(f["status"], "faulted")
|
||
self.assertEqual(f["error_bits"], 16)
|
||
|
||
def test_short_frame_raises(self) -> None:
|
||
with self.assertRaises(ValueError):
|
||
parse_teltocharge_frame([0] * 10)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
unittest.main()
|