Files
ems/backend/services/control/outputs.py
Dusan Vojacek 7decfebdbd TeltoCharge write-on-change: zápis jen při změně hodnoty (EEPROM wear)
Wallbox dostával zápisy 15/19/20 každý export tick (~8x/hod: control_export
:14,:29,:44,:59 + rolling replan */15 s exportem), protože drop-unchanged
stál na fn_modbus_last_verified_map — dokud verify čtení nedoběhlo/selhalo,
mapa byla prázdná a celá trojice se psala pořád dokola. write_ev_arrival_hold
navíc psal trojici nepodmíněně při každém píchnutí kabelu (docstring lhal).

- nová ems.fn_modbus_device_state_map (R__100): nejnovější řádek journalu
  per registr, hodnota jen pro written/verified; failed/mismatch => registr
  chybí => po výpadku se konfigurace obnoví jedním zápisem
- write_ev_setpoints + write_ev_arrival_hold filtrují přes tuto mapu:
  reg 15 jen při změně plánu, watchdog 19/20 jednou po startu/po výpadku
- verify job EV chargery ověřuje už dnes (fn_modbus_written_command_ids bez
  filtru asset_type); registry 15/19/20 jsou dle oficiálního protokolu R/W
- watchdog Telto sytí jakákoli validní komunikace vč. FC3 čtení telemetrie
  (60 s << 300 s) — periodické zápisy k udržení spojení nejsou potřeba,
  failsafe 8 A nastane jen při skutečném výpadku EMS
- testy: tests/test_ev_write_on_change.py (drop, setpoints, arrival hold)
- docs: modbus-registers-teltocharge.md (sekce Zápis už není "NEimplementováno",
  R/W tabulka, watchdog sémantika), modbus-command-journal.md (sekce EV
  wallbox), CLAUDE.md (fn_modbus_device_state_map)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 22:21:59 +02:00

296 lines
9.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Non-Deye output writers for control export."""
from __future__ import annotations
import logging
import os
import asyncpg
import httpx
from app.config import get_settings
from services.control.models import ControlSetpoints, OperatingModeInfo
logger = logging.getLogger(__name__)
# Teltonika TeltoCharge zápisové registry (oficiální protokol rev 0.5;
# docs/04-modules/modbus-registers-teltocharge.md). FC 16 přes journal.
TELTO_REG_AMPS_TO_USE = 15 # 0 = stop, 632 A
TELTO_REG_COMM_TIMEOUT_S = 19 # watchdog: bez komunikace → failsafe
TELTO_REG_FAILSAFE_CURRENT_A = 20
#: Výpadek EMS: po 5 min bez zápisu wallbox přejde na failsafe proud —
#: auto se přes noc nabije i bez EMS (pomalu), místo aby stálo na 0 A.
TELTO_WATCHDOG_TIMEOUT_S = 300
TELTO_WATCHDOG_FAILSAFE_A = 8
def _telto_setpoint_registers(current_a: int) -> list[tuple[int, str, int]]:
"""Registry pro jeden export tick: limit proudu + watchdog konfigurace.
Write-on-change: volající VŽDY filtruje přes drop-unchanged proti
fn_modbus_device_state_map (poslední written/verified per registr) —
watchdog 19/20 se reálně zapíše jen po startu / po výpadku zařízení,
amps (15) jen při změně plánu. Watchdog timer TeltoCharge sytí jakákoli
validní Modbus komunikace (i FC3 čtení telemetrie každých 60 s), takže
periodické zápisy k udržení spojení NEJSOU potřeba (oficiální protokol,
docs/04-modules/modbus-registers-teltocharge.md).
"""
a = int(current_a)
if a < 6:
a = 0
return [
(TELTO_REG_AMPS_TO_USE, "telto_amps_to_use", min(a, 32)),
(TELTO_REG_COMM_TIMEOUT_S, "telto_comm_timeout_s", TELTO_WATCHDOG_TIMEOUT_S),
(TELTO_REG_FAILSAFE_CURRENT_A, "telto_failsafe_a", TELTO_WATCHDOG_FAILSAFE_A),
]
def _current_limit_for_charger(charger_code: str, sp: ControlSetpoints) -> int:
c = (charger_code or "").strip().lower()
if c == "ev-charger-1":
a = sp.ev1_current_a
elif c == "ev-charger-2":
a = sp.ev2_current_a
elif c.endswith("-1") or c == "ev1":
a = sp.ev1_current_a
elif c.endswith("-2") or c == "ev2":
a = sp.ev2_current_a
else:
a = 0
if a < 6:
a = 0
return a
async def write_ev_setpoints(
site_id: int, setpoints: ControlSetpoints, db: asyncpg.Connection
) -> str:
from services.control.modbus_journal import (
_drop_registers_matching_last_verified,
_fetch_device_state_registers,
create_modbus_commands,
execute_modbus_commands,
)
rows = await db.fetch(
"""
SELECT ec.id AS asset_id, ec.code, se.host, se.port, se.unit_id
FROM ems.asset_ev_charger ec
JOIN ems.site_endpoint se ON se.id = ec.endpoint_id
WHERE ec.site_id = $1
AND ec.schedulable = true
AND se.enabled = true
AND se.endpoint_type = 'modbus_tcp'
ORDER BY ec.code
""",
site_id,
)
if not rows:
return "OK EV: no schedulable chargers"
written = 0
for row in rows:
code = row["code"]
asset_id = int(row["asset_id"])
host = str(row["host"])
port = int(row["port"] or 502)
unit_id = int(row["unit_id"] if row["unit_id"] is not None else 1)
current_a = _current_limit_for_charger(code, setpoints)
registers = _telto_setpoint_registers(current_a)
# Write-on-change: poslední written/verified stav (ne jen verified) —
# zápis se nesmí opakovat každý tick, když verify čtení zaostává.
device_state = await _fetch_device_state_registers(
site_id, asset_id, db, asset_type="ev_charger"
)
registers, skipped = _drop_registers_matching_last_verified(
registers, device_state
)
if not registers:
logger.debug("EV setpoint [%s]: beze změny (%s A)", code, current_a)
continue
cmd_ids = await create_modbus_commands(
site_id,
None,
"ev_charger",
asset_id,
code,
host,
port,
unit_id,
registers,
db,
)
ok = await execute_modbus_commands(cmd_ids, db)
written += 1
logger.info(
"EV setpoint [%s]: %s A (regs %s%s) -> %s",
code,
current_a,
[r for r, _, _ in registers],
f", skip {skipped}" if skipped else "",
"written" if ok else "FAILED",
)
return f"OK EV: {written}/{len(rows)} charger(s) written"
async def write_ev_arrival_hold(
site_id: int, charger_code: str, db: asyncpg.Connection
) -> bool:
"""Okamžitě po DETEKCI příjezdu zapsat 0 A na daný wallbox (přes journal).
TeltoCharge po připojení kabelu sám rozjede nabíjení svým defaultem —
nabíjet smí až PLÁN (replan + export běží hned poté v _on_ev_arrival,
takže držení trvá sekundy až ~1 min). Write-on-change: registry shodné
s posledním written/verified stavem (typicky watchdog 19/20, často
i 15=0) se přeskočí — žádný zbytečný zápis při každém píchnutí kabelu.
"""
from services.control.modbus_journal import (
_drop_registers_matching_last_verified,
_fetch_device_state_registers,
create_modbus_commands,
execute_modbus_commands,
)
row = await db.fetchrow(
"""
SELECT ec.id AS asset_id, ec.code, se.host, se.port, se.unit_id
FROM ems.asset_ev_charger ec
JOIN ems.site_endpoint se ON se.id = ec.endpoint_id
WHERE ec.site_id = $1
AND ec.code = $2
AND ec.schedulable = true
AND se.enabled = true
AND se.endpoint_type = 'modbus_tcp'
""",
site_id,
charger_code,
)
if row is None:
return False
asset_id = int(row["asset_id"])
registers = _telto_setpoint_registers(0)
device_state = await _fetch_device_state_registers(
site_id, asset_id, db, asset_type="ev_charger"
)
registers, skipped = _drop_registers_matching_last_verified(
registers, device_state
)
if not registers:
logger.info(
"EV arrival hold [%s]: 0 A už na zařízení (skip %s)",
charger_code,
skipped,
)
return True
cmd_ids = await create_modbus_commands(
site_id,
None,
"ev_charger",
asset_id,
str(row["code"]),
str(row["host"]),
int(row["port"] or 502),
int(row["unit_id"] if row["unit_id"] is not None else 1),
registers,
db,
)
ok = await execute_modbus_commands(cmd_ids, db)
logger.info(
"EV arrival hold [%s]: 0 A (regs %s%s) %s",
charger_code,
[r for r, _, _ in registers],
f", skip {skipped}" if skipped else "",
"written" if ok else "FAILED",
)
return bool(ok)
async def write_heat_pump_setpoint(
site_id: int, setpoints: ControlSetpoints, db: asyncpg.Connection
) -> str:
rows = await db.fetch(
"""
SELECT hp.code, se.host, se.port, se.unit_id
FROM ems.asset_heat_pump hp
JOIN ems.site_endpoint se ON se.id = hp.endpoint_id
WHERE hp.site_id = $1
AND hp.schedulable = true
AND se.enabled = true
AND se.endpoint_type = 'modbus_tcp'
""",
site_id,
)
if not rows:
return "OK heat pump: no schedulable unit"
for row in rows:
logger.info(
"HP setpoint [%s]: enable=%s (TODO: Modbus registers)",
row["code"],
setpoints.heat_pump_enable,
)
return "OK heat pump: logged (Modbus TODO)"
async def send_loxone_setpoints(
site_id: int,
setpoints: ControlSetpoints,
mode: OperatingModeInfo,
db: asyncpg.Connection,
) -> str:
endpoint = await db.fetchrow(
"""
SELECT host, port, protocol
FROM ems.site_endpoint
WHERE site_id = $1 AND endpoint_type = 'loxone_http' AND enabled = true
ORDER BY id
LIMIT 1
""",
site_id,
)
if not endpoint:
return "OK Loxone: no endpoint, skipped"
proto = (endpoint["protocol"] or "http").lower()
if proto not in ("http", "https"):
proto = "http"
host = endpoint["host"]
port = int(endpoint["port"] or (443 if proto == "https" else 80))
base = f"{proto}://{host}:{port}/dev/sps/io"
settings = get_settings()
user = settings.loxone_user or os.getenv("LOXONE_USER") or ""
password = settings.loxone_password or os.getenv("LOXONE_PASSWORD") or ""
auth = (user, password) if user else None
batt_display = 0 if setpoints.battery_w is None else int(setpoints.battery_w)
paths: list[tuple[str, int]] = [
(f"{base}/EMS_Mode/{mode.loxone_mode_value}", mode.loxone_mode_value),
(f"{base}/EMS_Battery_Setpoint_W/{batt_display}", batt_display),
(f"{base}/EMS_Grid_Setpoint_W/{setpoints.grid_setpoint_w}", setpoints.grid_setpoint_w),
(f"{base}/EMS_EV1_Power_W/{setpoints.ev1_power_w}", setpoints.ev1_power_w),
(f"{base}/EMS_EV2_Power_W/{setpoints.ev2_power_w}", setpoints.ev2_power_w),
(
f"{base}/EMS_HeatPump_Enable/{1 if setpoints.heat_pump_enable else 0}",
1 if setpoints.heat_pump_enable else 0,
),
]
errs: list[str] = []
try:
async with httpx.AsyncClient(timeout=5.0) as client:
for url, _ in paths:
try:
r = await client.get(url, auth=auth)
r.raise_for_status()
except Exception as e:
errs.append(f"{url!s}: {e}")
except Exception as e:
return f"FAIL Loxone: client {e}"
if errs:
return "FAIL Loxone: " + "; ".join(errs[:3])
return "OK Loxone: all virtual inputs updated"