EV řízení: zápis Amps-to-use přes journal + watchdog + okamžitý replan po příjezdu
Bod 1 — write_ev_setpoints reálně (konec TODO stubu):
- reg 15 (0=stop, 6–32 A) z plánu přes _current_limit_for_charger; plná
journal pipeline (create_modbus_commands → execute, verify job 2 min generic)
- watchdog reg 19=300 s + reg 20=8 A: výpadek EMS → wallbox po 5 min failsafe
8 A (auto se přes noc nabije); drop-unchanged → zapisuje se jen při změně
- fn_modbus_last_verified_map: + p_asset_type (drop 2-arg; dosud hardcoded
'inverter' — pro chargery vracela {})
- verify: SELF_SUSTAIN fallback explicitně jen pro asset_type='inverter' —
mismatch wallboxu nesmí degradovat režim celé site
- journal register_name: mimo inverter platí jméno od volajícího
Bod 2 — telemetry_collector: přechod available→connected spustí fire-and-forget
run_rolling_replan(triggered_by=ev_arrival:<code>) + export_setpoints přes BG
pool — reakce na příjezd ~60 s místo až 15 min.
Bod 3 (Tesla API SoC) čeká na developer credentials.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -46,8 +46,12 @@ async def _fetch_written_deye_clock_commands(
|
|||||||
return list(rows)
|
return list(rows)
|
||||||
|
|
||||||
|
|
||||||
async def _fetch_last_verified_inverter_registers(
|
async def _fetch_last_verified_registers(
|
||||||
site_id: int, inverter_asset_id: int, db: asyncpg.Connection
|
site_id: int,
|
||||||
|
asset_id: int,
|
||||||
|
db: asyncpg.Connection,
|
||||||
|
*,
|
||||||
|
asset_type: str = "inverter",
|
||||||
) -> dict[int, int]:
|
) -> dict[int, int]:
|
||||||
"""
|
"""
|
||||||
Poslední hodnota na zařízení podle journalu (jen status verified).
|
Poslední hodnota na zařízení podle journalu (jen status verified).
|
||||||
@@ -55,15 +59,25 @@ async def _fetch_last_verified_inverter_registers(
|
|||||||
"""
|
"""
|
||||||
raw = await db.fetchval(
|
raw = await db.fetchval(
|
||||||
"""
|
"""
|
||||||
select ems.fn_modbus_last_verified_map($1::int, $2::int)
|
select ems.fn_modbus_last_verified_map($1::int, $2::int, $3::text)
|
||||||
""",
|
""",
|
||||||
site_id,
|
site_id,
|
||||||
inverter_asset_id,
|
asset_id,
|
||||||
|
asset_type,
|
||||||
)
|
)
|
||||||
data = raw if isinstance(raw, dict) else json.loads(raw)
|
data = raw if isinstance(raw, dict) else json.loads(raw)
|
||||||
return {int(k): int(v) for k, v in data.items()}
|
return {int(k): int(v) for k, v in data.items()}
|
||||||
|
|
||||||
|
|
||||||
|
async def _fetch_last_verified_inverter_registers(
|
||||||
|
site_id: int, inverter_asset_id: int, db: asyncpg.Connection
|
||||||
|
) -> dict[int, int]:
|
||||||
|
"""Zpětně kompatibilní alias (Deye cesty)."""
|
||||||
|
return await _fetch_last_verified_registers(
|
||||||
|
site_id, inverter_asset_id, db, asset_type="inverter"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _drop_registers_matching_last_verified(
|
def _drop_registers_matching_last_verified(
|
||||||
registers: list[tuple[int, str, int]],
|
registers: list[tuple[int, str, int]],
|
||||||
last_verified: dict[int, int],
|
last_verified: dict[int, int],
|
||||||
@@ -102,8 +116,14 @@ async def create_modbus_commands(
|
|||||||
Vrátí list command IDs.
|
Vrátí list command IDs.
|
||||||
"""
|
"""
|
||||||
ids: list[int] = []
|
ids: list[int] = []
|
||||||
for reg, _ignored_name, val in registers:
|
for reg, given_name, val in registers:
|
||||||
register_name = DEYE_REGISTER_NAMES.get(reg, f"reg_{reg}")
|
# Deye registry mají kanonická jména; pro ostatní zařízení (Teltonika…)
|
||||||
|
# platí jméno dodané volajícím.
|
||||||
|
register_name = (
|
||||||
|
DEYE_REGISTER_NAMES.get(reg)
|
||||||
|
if asset_type == "inverter"
|
||||||
|
else None
|
||||||
|
) or given_name or f"reg_{reg}"
|
||||||
cmd_id = await db.fetchval(
|
cmd_id = await db.fetchval(
|
||||||
"""
|
"""
|
||||||
INSERT INTO ems.modbus_command
|
INSERT INTO ems.modbus_command
|
||||||
|
|||||||
@@ -13,6 +13,32 @@ from services.control.models import ControlSetpoints, OperatingModeInfo
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Teltonika TeltoCharge – zápisové registry (oficiální protokol rev 0.5;
|
||||||
|
# docs/04-modules/modbus-registers-teltocharge.md). FC 16 přes journal.
|
||||||
|
TELTO_REG_AMPS_TO_USE = 15 # 0 = stop, 6–32 A
|
||||||
|
TELTO_REG_COMM_TIMEOUT_S = 19 # watchdog: bez komunikace → failsafe
|
||||||
|
TELTO_REG_FAILSAFE_CURRENT_A = 20
|
||||||
|
#: Výpadek EMS: po 5 min bez zápisu wallbox přejde na failsafe proud —
|
||||||
|
#: auto se přes noc nabije i bez EMS (pomalu), místo aby stálo na 0 A.
|
||||||
|
TELTO_WATCHDOG_TIMEOUT_S = 300
|
||||||
|
TELTO_WATCHDOG_FAILSAFE_A = 8
|
||||||
|
|
||||||
|
|
||||||
|
def _telto_setpoint_registers(current_a: int) -> list[tuple[int, str, int]]:
|
||||||
|
"""Registry pro jeden export tick: limit proudu + watchdog konfigurace.
|
||||||
|
|
||||||
|
Watchdog (19/20) se posílá s každým tickem, ale journal drop-unchanged ho
|
||||||
|
po prvním verified zápisu přeskakuje — reálně se zapíše jednou.
|
||||||
|
"""
|
||||||
|
a = int(current_a)
|
||||||
|
if a < 6:
|
||||||
|
a = 0
|
||||||
|
return [
|
||||||
|
(TELTO_REG_AMPS_TO_USE, "telto_amps_to_use", min(a, 32)),
|
||||||
|
(TELTO_REG_COMM_TIMEOUT_S, "telto_comm_timeout_s", TELTO_WATCHDOG_TIMEOUT_S),
|
||||||
|
(TELTO_REG_FAILSAFE_CURRENT_A, "telto_failsafe_a", TELTO_WATCHDOG_FAILSAFE_A),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
def _current_limit_for_charger(charger_code: str, sp: ControlSetpoints) -> int:
|
def _current_limit_for_charger(charger_code: str, sp: ControlSetpoints) -> int:
|
||||||
c = (charger_code or "").strip().lower()
|
c = (charger_code or "").strip().lower()
|
||||||
@@ -34,9 +60,16 @@ def _current_limit_for_charger(charger_code: str, sp: ControlSetpoints) -> int:
|
|||||||
async def write_ev_setpoints(
|
async def write_ev_setpoints(
|
||||||
site_id: int, setpoints: ControlSetpoints, db: asyncpg.Connection
|
site_id: int, setpoints: ControlSetpoints, db: asyncpg.Connection
|
||||||
) -> str:
|
) -> str:
|
||||||
|
from services.control.modbus_journal import (
|
||||||
|
_drop_registers_matching_last_verified,
|
||||||
|
_fetch_last_verified_registers,
|
||||||
|
create_modbus_commands,
|
||||||
|
execute_modbus_commands,
|
||||||
|
)
|
||||||
|
|
||||||
rows = await db.fetch(
|
rows = await db.fetch(
|
||||||
"""
|
"""
|
||||||
SELECT ec.code, se.host, se.port, se.unit_id
|
SELECT ec.id AS asset_id, ec.code, se.host, se.port, se.unit_id
|
||||||
FROM ems.asset_ev_charger ec
|
FROM ems.asset_ev_charger ec
|
||||||
JOIN ems.site_endpoint se ON se.id = ec.endpoint_id
|
JOIN ems.site_endpoint se ON se.id = ec.endpoint_id
|
||||||
WHERE ec.site_id = $1
|
WHERE ec.site_id = $1
|
||||||
@@ -50,15 +83,49 @@ async def write_ev_setpoints(
|
|||||||
if not rows:
|
if not rows:
|
||||||
return "OK EV: no schedulable chargers"
|
return "OK EV: no schedulable chargers"
|
||||||
|
|
||||||
|
written = 0
|
||||||
for row in rows:
|
for row in rows:
|
||||||
code = row["code"]
|
code = row["code"]
|
||||||
|
asset_id = int(row["asset_id"])
|
||||||
|
host = str(row["host"])
|
||||||
|
port = int(row["port"] or 502)
|
||||||
|
unit_id = int(row["unit_id"] if row["unit_id"] is not None else 1)
|
||||||
current_a = _current_limit_for_charger(code, setpoints)
|
current_a = _current_limit_for_charger(code, setpoints)
|
||||||
|
|
||||||
|
registers = _telto_setpoint_registers(current_a)
|
||||||
|
last_verified = await _fetch_last_verified_registers(
|
||||||
|
site_id, asset_id, db, asset_type="ev_charger"
|
||||||
|
)
|
||||||
|
registers, skipped = _drop_registers_matching_last_verified(
|
||||||
|
registers, last_verified
|
||||||
|
)
|
||||||
|
if not registers:
|
||||||
|
logger.debug("EV setpoint [%s]: beze změny (%s A)", code, current_a)
|
||||||
|
continue
|
||||||
|
|
||||||
|
cmd_ids = await create_modbus_commands(
|
||||||
|
site_id,
|
||||||
|
None,
|
||||||
|
"ev_charger",
|
||||||
|
asset_id,
|
||||||
|
code,
|
||||||
|
host,
|
||||||
|
port,
|
||||||
|
unit_id,
|
||||||
|
registers,
|
||||||
|
db,
|
||||||
|
)
|
||||||
|
ok = await execute_modbus_commands(cmd_ids, db)
|
||||||
|
written += 1
|
||||||
logger.info(
|
logger.info(
|
||||||
"EV setpoint [%s]: %sA (TODO: Modbus registers)",
|
"EV setpoint [%s]: %s A (regs %s%s) -> %s",
|
||||||
code,
|
code,
|
||||||
current_a,
|
current_a,
|
||||||
|
[r for r, _, _ in registers],
|
||||||
|
f", skip {skipped}" if skipped else "",
|
||||||
|
"written" if ok else "FAILED",
|
||||||
)
|
)
|
||||||
return f"OK EV: logged {len(rows)} charger(s) (Modbus TODO)"
|
return f"OK EV: {written}/{len(rows)} charger(s) written"
|
||||||
|
|
||||||
|
|
||||||
async def write_heat_pump_setpoint(
|
async def write_heat_pump_setpoint(
|
||||||
|
|||||||
@@ -353,7 +353,12 @@ async def verify_modbus_commands(
|
|||||||
await execute_modbus_commands([cmd_id], db)
|
await execute_modbus_commands([cmd_id], db)
|
||||||
await verify_modbus_commands([cmd_id], db, site_id)
|
await verify_modbus_commands([cmd_id], db, site_id)
|
||||||
else:
|
else:
|
||||||
if deye_reg_triggers_self_sustain_after_verify_exhaust(reg):
|
# SELF_SUSTAIN fallback je Deye politika — mismatch na jiném
|
||||||
|
# zařízení (EV wallbox…) nesmí degradovat režim celé lokality.
|
||||||
|
if (
|
||||||
|
str(cmd["asset_type"]) == "inverter"
|
||||||
|
and deye_reg_triggers_self_sustain_after_verify_exhaust(reg)
|
||||||
|
):
|
||||||
logger.critical(
|
logger.critical(
|
||||||
"[cmd %s] 3 failed attempts, switching to SELF_SUSTAIN",
|
"[cmd %s] 3 failed attempts, switching to SELF_SUSTAIN",
|
||||||
cmd_id,
|
cmd_id,
|
||||||
|
|||||||
@@ -13,6 +13,39 @@ from services.modbus_client import get_modbus_client
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
#: Pool pro fire-and-forget akce mimo hlavní poll spojení (např. replan po
|
||||||
|
#: příjezdu EV). Nastavuje run_telemetry_loop_wrapper.
|
||||||
|
_BG_POOL: asyncpg.Pool | None = None
|
||||||
|
|
||||||
|
|
||||||
|
async def _on_ev_arrival(site_id: int, charger_code: str) -> None:
|
||||||
|
"""Okamžitý replan + export po příjezdu EV (jinak by se čekalo až na */15).
|
||||||
|
|
||||||
|
Deferred importy: planning_engine/control_exporter importují control balík,
|
||||||
|
který se importuje nezávisle — vyhýbáme se importnímu cyklu při startu.
|
||||||
|
"""
|
||||||
|
if _BG_POOL is None:
|
||||||
|
logger.warning("EV arrival: BG pool není k dispozici, čeká se na rolling tick")
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
from services.control_exporter import export_setpoints
|
||||||
|
from services.planning_engine import run_rolling_replan
|
||||||
|
|
||||||
|
async with _BG_POOL.acquire() as conn:
|
||||||
|
await run_rolling_replan(
|
||||||
|
site_id, conn, triggered_by=f"ev_arrival:{charger_code}"
|
||||||
|
)
|
||||||
|
await export_setpoints(site_id, conn)
|
||||||
|
logger.info(
|
||||||
|
"EV arrival replan+export done (site=%s, charger=%s)",
|
||||||
|
site_id,
|
||||||
|
charger_code,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
logger.exception(
|
||||||
|
"EV arrival replan failed (site=%s, charger=%s)", site_id, charger_code
|
||||||
|
)
|
||||||
|
|
||||||
# Deye SUN – holding registry (decimal adresa = přímo pro read_holding_registers)
|
# Deye SUN – holding registry (decimal adresa = přímo pro read_holding_registers)
|
||||||
DEYE_REG_RUN_STATE = 500
|
DEYE_REG_RUN_STATE = 500
|
||||||
DEYE_REG_BATT_CHARGE_TODAY = 514
|
DEYE_REG_BATT_CHARGE_TODAY = 514
|
||||||
@@ -254,6 +287,7 @@ async def poll_ev_chargers(site_id: int, db: asyncpg.Connection) -> None:
|
|||||||
)
|
)
|
||||||
if previous_status == "available" and current_status != "available":
|
if previous_status == "available" and current_status != "available":
|
||||||
logger.info("EV arrival detected on charger %s", code)
|
logger.info("EV arrival detected on charger %s", code)
|
||||||
|
asyncio.create_task(_on_ev_arrival(site_id, str(code)))
|
||||||
elif previous_status != "available" and current_status == "available":
|
elif previous_status != "available" and current_status == "available":
|
||||||
logger.info("EV departure detected on charger %s", code)
|
logger.info("EV departure detected on charger %s", code)
|
||||||
|
|
||||||
@@ -307,6 +341,8 @@ async def run_telemetry_loop(conn: asyncpg.Connection) -> float:
|
|||||||
|
|
||||||
async def run_telemetry_loop_wrapper(pool: asyncpg.Pool) -> None:
|
async def run_telemetry_loop_wrapper(pool: asyncpg.Pool) -> None:
|
||||||
"""Background task: každá iterace získá spojení z poolu; neblokuje pool během sleep."""
|
"""Background task: každá iterace získá spojení z poolu; neblokuje pool během sleep."""
|
||||||
|
global _BG_POOL
|
||||||
|
_BG_POOL = pool
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
async with pool.acquire() as conn:
|
async with pool.acquire() as conn:
|
||||||
|
|||||||
@@ -1,8 +1,13 @@
|
|||||||
-- map register -> value_verified z modbus_command (poslední verified řádek per register)
|
-- map register -> value_verified z modbus_command (poslední verified řádek per register)
|
||||||
|
-- Od 2026-06-11 generické přes p_asset_type (inverter / ev_charger / …);
|
||||||
|
-- starou 2-arg signaturu dropnout (default by způsobil ambiguitu).
|
||||||
|
|
||||||
|
drop function if exists ems.fn_modbus_last_verified_map(int, int);
|
||||||
|
|
||||||
create or replace function ems.fn_modbus_last_verified_map(
|
create or replace function ems.fn_modbus_last_verified_map(
|
||||||
p_site_id int,
|
p_site_id int,
|
||||||
p_asset_id int
|
p_asset_id int,
|
||||||
|
p_asset_type text default 'inverter'
|
||||||
)
|
)
|
||||||
returns jsonb
|
returns jsonb
|
||||||
language sql
|
language sql
|
||||||
@@ -18,7 +23,10 @@ as $fn$
|
|||||||
v.value_verified
|
v.value_verified
|
||||||
from ems.vw_modbus_last_verified v
|
from ems.vw_modbus_last_verified v
|
||||||
where v.site_id = p_site_id
|
where v.site_id = p_site_id
|
||||||
and v.asset_type = 'inverter'
|
and v.asset_type = p_asset_type
|
||||||
and v.asset_id = p_asset_id
|
and v.asset_id = p_asset_id
|
||||||
) t;
|
) t;
|
||||||
$fn$;
|
$fn$;
|
||||||
|
|
||||||
|
comment on function ems.fn_modbus_last_verified_map is
|
||||||
|
'Mapa register -> poslední verified hodnota z modbus_command pro dané zařízení (site, asset_type, asset_id). Slouží k drop-unchanged v control exporteru.';
|
||||||
|
|||||||
Reference in New Issue
Block a user