- list poll 5→10 min; vehicle_data JEN při přechodu asleep→online nebo max 1×/15 min (data $0.002/req); wake nikdy (gate na online) - otevřená ev_session = auto u wallboxu → ŽÁDNÉ API cally (při AC nabíjení auto nespí — bez gatu by data tekla celou noc nabíjení) Odhad: </měsíc (online okna mimo wallbox jsou krátká). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
725 lines
28 KiB
Python
725 lines
28 KiB
Python
"""Sběr telemetrie z Modbus: Deye (střídač), Teltonika TeltoCharge (EV); TČ zatím placeholder."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import logging
|
||
from datetime import datetime, timezone
|
||
|
||
import asyncpg
|
||
from app.ws_manager import manager
|
||
|
||
from zoneinfo import ZoneInfo
|
||
|
||
from services.modbus_client import get_modbus_client
|
||
|
||
# Prague time zone handle. NOTE(review): not referenced in the visible part of
# this module — confirm it is still needed.
_PRAGUE_TZ_NOTIFY = ZoneInfo("Europe/Prague")

logger = logging.getLogger(__name__)

#: Connection pool for fire-and-forget actions that run outside the main poll
#: connection (e.g. the replan after an EV arrives). It is assigned by
#: run_telemetry_loop_wrapper and stays None until that task has started.
_BG_POOL: asyncpg.Pool | None = None
|
||
|
||
|
||
async def _on_ev_arrival(site_id: int, charger_code: str) -> None:
    """Replan and export setpoints immediately after an EV plugs in.

    Without this hook the new session would only be picked up by the next
    periodic (*/15) replan. The work runs on a dedicated connection taken from
    ``_BG_POOL``; when the pool is not set yet the hook only logs a warning.

    The planning/export/notify modules are imported at call time: they import
    the control package, which is imported independently, so a module-level
    import here would create an import cycle at startup.

    Args:
        site_id: Site the charger belongs to.
        charger_code: Code of the charger on which the arrival was detected.

    Never raises: every failure is logged and swallowed, because the hook runs
    as a detached background task.
    """
    pool = _BG_POOL
    if pool is None:
        logger.warning("EV arrival: BG pool není k dispozici, čeká se na rolling tick")
        return

    try:
        from services.control_exporter import export_setpoints
        from services.planning_engine import run_rolling_replan

        trigger = f"ev_arrival:{charger_code}"
        async with pool.acquire() as conn:
            # Tesla: put the real SoC into the session BEFORE replanning.
            # A failure here must not block the plan (it then uses defaults).
            try:
                await _patch_session_from_tesla(site_id, charger_code, conn)
            except Exception:
                logger.exception(
                    "Tesla SoC fetch failed (site=%s, %s) — replan jede na defaultech",
                    site_id, charger_code,
                )

            await run_rolling_replan(site_id, conn, triggered_by=trigger)
            await export_setpoints(site_id, conn)

            # The notification is best-effort; it must not fail the replan.
            try:
                from services.ev_notify import send_ev_arrival

                await send_ev_arrival(site_id, charger_code, conn)
            except Exception:
                logger.exception("EV arrival Discord notify failed (%s)", charger_code)

        logger.info(
            "EV arrival replan+export done (site=%s, charger=%s)",
            site_id,
            charger_code,
        )
    except Exception:
        logger.exception(
            "EV arrival replan failed (site=%s, charger=%s)", site_id, charger_code
        )
|
||
|
||
|
||
async def _on_ev_departure(site_id: int, charger_code: str) -> None:
    """Record a 'departure' vehicle observation (odometer + SoC) when an EV unplugs.

    A departure/arrival pair of observations describes one trip (km, kWh) for
    ev_usage_stats. If the vehicle cannot be read (``get_charge_state`` returns
    None, e.g. it is asleep) the observation is skipped silently.

    Args:
        site_id: Site the charger belongs to.
        charger_code: Code of the charger on which the departure was detected.

    Never raises: failures are logged and swallowed. Does nothing when
    ``_BG_POOL`` is not set or the vehicle is not a Tesla-API vehicle.
    """
    pool = _BG_POOL
    if pool is None:
        return

    try:
        from app.db_json import fetch_json
        from services.tesla_client import get_charge_state

        async with pool.acquire() as conn:
            ctx = await fetch_json(
                conn,
                "select ems.fn_tesla_arrival_context($1::int, $2::text)",
                site_id,
                charger_code,
            )
            is_tesla = isinstance(ctx, dict) and ctx.get("api_type") == "tesla"
            if not is_tesla:
                return

            state = await get_charge_state(conn, ctx.get("vin"))
            if state is None:
                return

            vehicle_id = int(ctx["vehicle_id"])
            odometer_km = state.get("odometer_km")
            soc_pct = state["battery_level"]
            await conn.execute(
                "select ems.fn_ev_vehicle_obs_insert($1::int, $2::int, 'departure', $3::numeric, $4::numeric, $5::text)",
                site_id,
                vehicle_id,
                odometer_km,
                float(soc_pct),
                state.get("charging_state"),
            )
            logger.info(
                "EV departure obs (site=%s, %s): soc=%s%%, odo=%s km",
                site_id, charger_code,
                soc_pct, odometer_km,
            )
    except Exception:
        logger.exception(
            "EV departure obs failed (site=%s, %s)", site_id, charger_code
        )
|
||
|
||
|
||
async def _patch_session_from_tesla(
    site_id: int, charger_code: str, conn: asyncpg.Connection
) -> None:
    """After an arrival, copy the real SoC of a Tesla-API vehicle into its session.

    Steps, all on the caller's connection:
      1. Load the arrival context; stop unless ``api_type`` is ``'tesla'`` and
         an open session exists.
      2. Read the charge state; stop if it is unavailable (None).
      3. Insert an 'arrival' vehicle observation (odometer, SoC, charging state).
      4. Store the VIN if the context has none and the state reports one.
      5. Patch the session with ``soc_at_connect_pct`` and, when reported,
         ``target_soc_pct``.

    Energy is deliberately NOT computed here: fn_planning_site_context derives
    it from the two patched percentages (SQL-first).

    Args:
        site_id: Site the charger belongs to.
        charger_code: Code of the charger with the new session.
        conn: Connection used for all queries.

    Raises:
        Whatever the DB or the Tesla client raises; the caller handles it.
    """
    import json as _json

    from app.db_json import fetch_json
    from services.tesla_client import get_charge_state

    ctx = await fetch_json(
        conn,
        "select ems.fn_tesla_arrival_context($1::int, $2::text)",
        site_id,
        charger_code,
    )
    is_tesla = isinstance(ctx, dict) and ctx.get("api_type") == "tesla"
    if not is_tesla:
        return

    session_id = ctx.get("session_id")
    if session_id is None:
        logger.warning("Tesla hook: chybí otevřená session (%s)", charger_code)
        return

    known_vin = ctx.get("vin")
    state = await get_charge_state(conn, known_vin)
    if state is None:
        return

    await conn.execute(
        "select ems.fn_ev_vehicle_obs_insert($1::int, $2::int, 'arrival', $3::numeric, $4::numeric, $5::text)",
        site_id,
        int(ctx["vehicle_id"]),
        state.get("odometer_km"),
        float(state["battery_level"]),
        state.get("charging_state"),
    )

    # Learn the VIN on the first successful read.
    if not ctx.get("vin") and state.get("vin"):
        await conn.execute(
            "select ems.fn_vehicle_set_vin($1::int, $2::text)",
            int(ctx["vehicle_id"]),
            str(state["vin"]),
        )

    patch: dict = {"soc_at_connect_pct": state["battery_level"]}
    charge_limit = state.get("charge_limit_soc")
    if charge_limit:
        patch["target_soc_pct"] = state["charge_limit_soc"]

    await fetch_json(
        conn,
        "select ems.fn_ev_session_apply_patch($1::int, $2::int, $3::jsonb)",
        site_id,
        int(session_id),
        _json.dumps(patch),
    )
    logger.info(
        "Tesla SoC -> session %s: level=%s%%, limit=%s%% (%s)",
        session_id,
        state["battery_level"],
        state.get("charge_limit_soc"),
        charger_code,
    )
|
||
|
||
# Deye SUN holding registers. The values are decimal addresses that are passed
# to read_holding_registers unchanged.

# Status and battery
DEYE_REG_RUN_STATE = 500
DEYE_REG_BATT_CHARGE_TODAY = 514
DEYE_REG_BATT_DISCHARGE_TODAY = 515
DEYE_REG_BATTERY_SOC = 588
DEYE_REG_BATTERY_POWER_FLOW = 590

# Instantaneous power
DEYE_REG_GRID_TOTAL_POWER = 625
DEYE_REG_LOAD_TOTAL_POWER = 653
DEYE_REG_GEN_PORT_POWER = 667
DEYE_REG_PV1_POWER = 672
DEYE_REG_PV2_POWER = 673

# Cumulative grid energy, each counter split into LO/HI 16-bit words
DEYE_REG_GRID_IMPORT_TOTAL_LO = 522
DEYE_REG_GRID_IMPORT_TOTAL_HI = 523
DEYE_REG_GRID_EXPORT_TOTAL_LO = 524
DEYE_REG_GRID_EXPORT_TOTAL_HI = 525

# Solar sell (0 = surplus of the controllable PV must not flow to the grid) and
# GEN/MI cut-off (register 178, bits 0-1 == 3 -> cut-off ON).
# Note: some manuals/UIs call the latter "register 179" (1-based); the Modbus
# address is 178 (0-based). See modbus-registers.md.
DEYE_REG_SOLAR_SELL = 145
DEYE_REG_CONTROL_BOARD_SPECIAL1 = 178
|
||
|
||
# Teltonika TeltoCharge holding registers, per the official "Modbus RTU
# Communication protocol" rev 0.5 (2024-07-23); read with FC 3 through a
# Waveshare RS485->TCP gateway.
#
# Layout of block 0-40:
#   0-2    voltage L1-L3 (V)
#   3-5    current L1-L3 (x10 A)
#   6      EVSE status (DLM)
#   27     charge point state
#   33     IEC 61851 state
#   34/35  warning / error bits
#   38     instantaneous power (W)
#   39     session energy (kWh x100)
#   40     session duration (s)
#
# Writable control registers (not used yet): 15 "Amps to use" (0 = stop, 6-32),
# 16 start/stop.
TELTO_REG_BLOCK_START = 0
TELTO_REG_BLOCK_COUNT = 41
|
||
|
||
#: EVSE status (register 6) -> internal status string. Session detection
#: (fn_ev_session_transition) relies on 'available' versus anything else, so
#: every state with an EV connected must map to a value other than 'available'.
TELTO_STATUS_MAP = {
    # C: charging
    0: "charging",
    # B1: EV connected, waiting for the EV
    1: "preparing",
    # B2: was charging before, not enough power
    2: "preparing",
    # B3: has not charged, not enough power
    3: "preparing",
    # D1: stopped by the vehicle
    4: "suspended_ev",
    # D2: no authorization
    5: "suspended_evse",
    # D3: charging not allowed
    6: "suspended_evse",
    # A: no EV
    7: "available",
    # F: fault
    8: "faulted",
    # E
    9: "unknown",
}
|
||
|
||
|
||
def parse_teltocharge_frame(regs: list[int]) -> dict[str, object]:
    """Decode the TeltoCharge register block 0-40 into a plain dict.

    Pure function (no Modbus access), so it can be unit-tested directly.

    Args:
        regs: Raw register values starting at register 0; at least
            ``TELTO_REG_BLOCK_COUNT`` entries.

    Returns:
        Dict with ``status`` (mapped through ``TELTO_STATUS_MAP``, 'unknown'
        for unmapped codes), ``power_w`` (reg 38), ``session_energy_kwh``
        (reg 39 / 100), ``current_a`` (largest of regs 3-5, / 10),
        ``voltage_v`` (reg 0 only), ``warning_bits`` (reg 34), ``error_bits``
        (reg 35), ``evse_status_raw`` (reg 6) and ``charge_point_state_raw``
        (reg 27).

    Raises:
        ValueError: If ``regs`` is shorter than ``TELTO_REG_BLOCK_COUNT``.
    """
    if len(regs) < TELTO_REG_BLOCK_COUNT:
        raise ValueError(f"TeltoCharge frame too short: {len(regs)}")

    evse_status = int(regs[6])
    # Report the most loaded phase; currents are transmitted in tenths of an amp.
    phase_currents_x10 = [int(regs[idx]) for idx in (3, 4, 5)]

    frame: dict[str, object] = {}
    frame["status"] = TELTO_STATUS_MAP.get(evse_status, "unknown")
    frame["power_w"] = int(regs[38])
    frame["session_energy_kwh"] = int(regs[39]) / 100.0
    frame["current_a"] = max(phase_currents_x10) / 10.0
    frame["voltage_v"] = int(regs[0])
    frame["warning_bits"] = int(regs[34])
    frame["error_bits"] = int(regs[35])
    frame["evse_status_raw"] = evse_status
    frame["charge_point_state_raw"] = int(regs[27])
    return frame
|
||
|
||
|
||
def aggregate_pv_production_w(pv1_w: int, pv2_w: int, gen_port_w: int) -> int:
    """Return the instantaneous PV production in watts for the dashboard / sum audit.

    The inputs come from Deye registers 672/673/667, which are signed 16-bit
    watt values. A negative reading (e.g. in the evening while exporting) is
    not DC production, so each input is clamped to zero before summing.

    Args:
        pv1_w: PV string 1 power (W).
        pv2_w: PV string 2 power (W).
        gen_port_w: GEN port power (W).

    Returns:
        Sum of the three inputs, each converted with ``int`` and floored at 0.
    """
    return sum(max(0, int(power)) for power in (pv1_w, pv2_w, gen_port_w))
|
||
|
||
|
||
def _export_limit_flags_from_deye_regs(reg145: int | None, reg179: int | None) -> tuple[bool | None, int | None]:
|
||
"""Odvoď is_export_limited / pv_derating_flags z přečtených holding registrů (NULL = neznámé)."""
|
||
if reg145 is None and reg179 is None:
|
||
return None, None
|
||
flags = 0
|
||
if reg145 is not None and int(reg145) == 0:
|
||
flags |= 1
|
||
if reg179 is not None and (int(reg179) & 3) == 3:
|
||
flags |= 2
|
||
return (flags != 0), flags
|
||
|
||
|
||
async def poll_inverter(site_id: int, db: asyncpg.Connection) -> None:
    """Read every Deye inverter of a site, store one sample and broadcast it.

    For each row of ems.vw_asset_inverter_modbus_poll the registers are read in
    one Modbus batch, the sample is written through
    ems.fn_telemetry_inverter_sample and a 'telemetry' message is pushed to the
    websocket manager. A failure on one inverter is logged and does not stop
    the remaining inverters.

    Args:
        site_id: Site whose inverters are polled.
        db: Connection used for the asset query and the sample insert.
    """
    inverters = await db.fetch(
        """
        select inverter_id as id, code, host, port, unit_id
        from ems.vw_asset_inverter_modbus_poll
        where site_id = $1
        """,
        site_id,
    )
    # One timestamp is shared by all samples of this pass.
    measured_at = datetime.now(timezone.utc)

    for inv in inverters:
        inv_id = inv["id"]
        code = inv["code"]
        host = inv["host"]
        port = int(inv["port"] or 502)
        unit_id = 1 if inv["unit_id"] is None else int(inv["unit_id"])

        try:
            client = await get_modbus_client(host, port)
            async with client.batch(unit_id) as mb:
                run_state = await mb.read_register(DEYE_REG_RUN_STATE)
                battery_soc = await mb.read_register(DEYE_REG_BATTERY_SOC)
                battery_power = await mb.read_register_signed(DEYE_REG_BATTERY_POWER_FLOW)
                batt_charge_today = await mb.read_register(DEYE_REG_BATT_CHARGE_TODAY)
                batt_discharge_today = await mb.read_register(DEYE_REG_BATT_DISCHARGE_TODAY)
                grid_power = await mb.read_register_signed(DEYE_REG_GRID_TOTAL_POWER)
                load_power = await mb.read_register_signed(DEYE_REG_LOAD_TOTAL_POWER)
                pv1_power = await mb.read_register_signed(DEYE_REG_PV1_POWER)
                pv2_power = await mb.read_register_signed(DEYE_REG_PV2_POWER)
                gen_port_power = await mb.read_register_signed(DEYE_REG_GEN_PORT_POWER)
                # Four consecutive words: import LO, import HI, export LO, export HI.
                energy_words = await mb.read_holding_registers(
                    DEYE_REG_GRID_IMPORT_TOTAL_LO, 4
                )
                reg145 = await mb.read_register(DEYE_REG_SOLAR_SELL)
                reg179 = await mb.read_register(DEYE_REG_CONTROL_BOARD_SPECIAL1)

            pv_power_w = aggregate_pv_production_w(pv1_power, pv2_power, gen_port_power)

            import_lo, import_hi, export_lo, export_hi = (
                energy_words[0], energy_words[1], energy_words[2], energy_words[3],
            )
            # Combine HI/LO words into a 32-bit counter; the result is scaled by 100.
            grid_import_total_wh = ((import_hi << 16) | import_lo) * 100
            grid_export_total_wh = ((export_hi << 16) | export_lo) * 100

            is_export_limited, pv_derating_flags = _export_limit_flags_from_deye_regs(
                reg145, reg179
            )
            battery_soc_pct = float(battery_soc)

            logger.debug("inverter:%s Deye run_state raw=%s", code, run_state)

            await db.execute(
                "select ems.fn_telemetry_inverter_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::int, $6::int, $7::int, $8::float8, $9::int, $10::int, $11::int, $12::int, $13::int, $14::bigint, $15::bigint, $16::int, $17::boolean, $18::int)",
                site_id,
                inv_id,
                measured_at,
                pv_power_w,
                pv1_power,
                pv2_power,
                gen_port_power,
                battery_soc_pct,
                battery_power,
                batt_charge_today,
                batt_discharge_today,
                grid_power,
                load_power,
                grid_import_total_wh,
                grid_export_total_wh,
                run_state,
                is_export_limited,
                pv_derating_flags,
            )

            # The inverter temperature is not read; the key is sent as null.
            inv_temp: float | None = None
            message = {
                "type": "telemetry",
                "site_id": site_id,
                "ts": datetime.now(timezone.utc).isoformat(),
                "pv_power_w": pv_power_w,
                "battery_soc_pct": battery_soc_pct,
                "battery_power_w": battery_power,
                "grid_power_w": grid_power,
                "load_power_w": load_power,
                "gen_port_power_w": gen_port_power,
                "inverter_temp_c": inv_temp,
                "is_export_limited": is_export_limited,
                "pv_derating_flags": pv_derating_flags,
            }
            await manager.broadcast_telemetry(message)
        except Exception as e:
            logger.error("poll_inverter site=%s inverter=%s: %s", site_id, code, e)
|
||
|
||
|
||
#: Strong references to in-flight arrival/departure hook tasks. The event loop
#: keeps only weak references to tasks, so a task whose result is discarded may
#: be garbage-collected before it finishes.
_EV_HOOK_TASKS: set[asyncio.Task] = set()


def _spawn_ev_hook(coro) -> None:
    """Schedule ``coro`` as a background task and keep it referenced until done."""
    task = asyncio.create_task(coro)
    _EV_HOOK_TASKS.add(task)
    task.add_done_callback(_EV_HOOK_TASKS.discard)


async def poll_ev_chargers(site_id: int, db: asyncpg.Connection) -> None:
    """Read every TeltoCharge charger of a site, store a sample and detect plug events.

    For each row of ems.vw_asset_ev_charger_modbus_poll:
      1. Read and parse register block 0-40. On a read/parse failure NOTHING is
         written: a fabricated 'available' sample would falsely close the EV
         session and pollute the baseline with power 0.
      2. Look up the previous status (latest row in ems.telemetry_ev_charger).
      3. Insert the new sample.
      4. If a previous status exists, run ems.fn_ev_session_transition and, on
         an 'available' <-> non-'available' edge, start the arrival or
         departure hook as a background task.

    Args:
        site_id: Site whose chargers are polled.
        db: Connection used for all queries of this pass.

    Raises:
        Database errors propagate to the caller; Modbus errors do not.
    """
    rows = await db.fetch(
        """
        select charger_id as id, code, host, port, unit_id
        from ems.vw_asset_ev_charger_modbus_poll
        where site_id = $1
        """,
        site_id,
    )
    measured_at = datetime.now(timezone.utc)
    connector_id = 1
    for row in rows:
        code = row["code"]
        charger_id = row["id"]
        host = row["host"]
        port = int(row["port"] or 502)
        unit_id = int(row["unit_id"] if row["unit_id"] is not None else 1)

        try:
            client = await get_modbus_client(host, port)
            async with client.batch(unit_id) as mb:
                regs = await mb.read_holding_registers(
                    TELTO_REG_BLOCK_START, TELTO_REG_BLOCK_COUNT
                )
            frame = parse_teltocharge_frame(regs)
        except Exception as e:
            logger.warning("EV charger %s (%s:%s) read failed: %s", code, host, port, e)
            continue

        current_status = str(frame["status"])
        if frame["error_bits"]:
            logger.warning(
                "EV charger %s error bits=0x%04x warning=0x%04x",
                code, frame["error_bits"], frame["warning_bits"],
            )

        # Must be read BEFORE the insert below, otherwise it would return the
        # sample we are about to write.
        previous_status = await db.fetchval(
            """
            select status
            from ems.telemetry_ev_charger
            where charger_id = $1 and connector_id = $2
            order by measured_at desc
            limit 1
            """,
            charger_id,
            connector_id,
        )

        await db.execute(
            "select ems.fn_telemetry_ev_charger_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::text, $6::int, $7::float8, $8::float8)",
            site_id,
            charger_id,
            measured_at,
            connector_id,
            current_status,
            int(frame["power_w"]),
            float(frame["session_energy_kwh"]),
            float(frame["current_a"]),
        )

        # Without a previous sample there is no edge to detect; skipping also
        # avoids a spurious "departure" on the very first sample of a charger.
        if previous_status is None:
            continue

        await db.fetchval(
            "select ems.fn_ev_session_transition($1::int, $2::int, $3::text, $4::text, $5::timestamptz)",
            site_id,
            charger_id,
            str(previous_status),
            current_status,
            measured_at,
        )
        if previous_status == "available" and current_status != "available":
            logger.info("EV arrival detected on charger %s", code)
            _spawn_ev_hook(_on_ev_arrival(site_id, str(code)))
        elif previous_status != "available" and current_status == "available":
            logger.info("EV departure detected on charger %s", code)
            _spawn_ev_hook(_on_ev_departure(site_id, str(code)))
|
||
|
||
|
||
async def poll_heat_pump(site_id: int, db: asyncpg.Connection) -> None:
    """Placeholder heat-pump poll: writes a fixed sample for every heat pump.

    The Modbus registers are not implemented yet, so no device is contacted.
    For each row of ems.vw_asset_heat_pump_modbus_poll a TODO line is logged
    and ems.fn_telemetry_heat_pump_sample is called with constant values
    (0, 10.0, 45.0, 55.0, 'standby').
    NOTE(review): the meaning of the individual columns is defined by the SQL
    function and is not visible here — confirm before relying on these samples.

    Args:
        site_id: Site whose heat pumps are listed.
        db: Connection used for the asset query and the inserts.
    """
    pumps = await db.fetch(
        """
        select heat_pump_id as id, code, host, port, unit_id
        from ems.vw_asset_heat_pump_modbus_poll
        where site_id = $1
        """,
        site_id,
    )
    measured_at = datetime.now(timezone.utc)
    placeholder_sample = (0, 10.0, 45.0, 55.0, "standby")

    for pump in pumps:
        logger.info("TODO: heat pump Modbus registry pending (heat_pump=%s)", pump["code"])
        await db.execute(
            "select ems.fn_telemetry_heat_pump_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::float8, $6::float8, $7::float8, $8::text)",
            site_id,
            pump["id"],
            measured_at,
            *placeholder_sample,
        )
|
||
|
||
|
||
async def poll_loxone_sensors(site_id: int, db: asyncpg.Connection) -> None:
    """Poll Loxone sensors (pool temperature, buffer tank, ...) over HTTP.

    Each enabled sensor is read with ``GET /jdev/sps/io/<name>/state`` on the
    site's enabled ``loxone_http`` endpoint. Basic auth is taken from the
    LOXONE_USER / LOXONE_PASSWORD environment variables; no auth is sent when
    LOXONE_USER is empty. The value is the first number found in ``LL.value``
    (e.g. "23.5°" -> 23.5; a decimal comma is accepted). Sensors that fail to
    read or yield no number are skipped. With no sensors configured this is a
    no-op.

    Args:
        site_id: Site whose sensors are polled.
        db: Connection used for the sensor query and the inserts.
    """
    sensors = await db.fetch(
        """
        select ls.id, ls.loxone_name, se.host, se.port, se.protocol
        from ems.loxone_sensor ls
        join ems.site_endpoint se
          on se.site_id = ls.site_id and se.endpoint_type = 'loxone_http' and se.enabled
        where ls.site_id = $1 and ls.enabled
        """,
        site_id,
    )
    if not sensors:
        return

    # Imported only once there is something to poll.
    import os
    import re as _re

    import httpx

    username = os.getenv("LOXONE_USER") or ""
    auth = (username, os.getenv("LOXONE_PASSWORD") or "") if username else None
    measured_at = datetime.now(timezone.utc)
    number_re = _re.compile(r"-?\d+(?:[.,]\d+)?")

    async with httpx.AsyncClient(timeout=5.0, auth=auth) as client:
        for sensor in sensors:
            scheme = (sensor["protocol"] or "http").lower()
            default_port = 443 if scheme == "https" else 80
            port = int(sensor["port"] or default_port)
            url = f"{scheme}://{sensor['host']}:{port}/jdev/sps/io/{sensor['loxone_name']}/state"

            try:
                resp = await client.get(url)
                resp.raise_for_status()
                payload = resp.json().get("LL") or {}
                raw = str(payload.get("value", ""))
                found = number_re.search(raw)
                value = None if found is None else float(found.group(0).replace(",", "."))
            except Exception as e:
                logger.warning("Loxone sensor %s read failed: %s", sensor["loxone_name"], e)
                continue

            if value is None:
                # No numeric content in the reply: nothing to store.
                continue

            await db.execute(
                """
                insert into ems.telemetry_loxone_sensor (sensor_id, measured_at, value)
                values ($1, $2, $3) on conflict do nothing
                """,
                int(sensor["id"]),
                measured_at,
                value,
            )
|
||
|
||
|
||
#: Presence poll pacing (seconds) and geofence radius (metres).
#: The Fleet API is paid (vehicle_data $0.002/req, wake $0.02 — wake is NEVER
#: used): the vehicle list is polled every 10 min; vehicle_data is fetched only
#: on an asleep->online transition and then at most once per 15 min; while an
#: ev_session is open nothing is polled at all (the car is at the wallbox, i.e.
#: at home, and it does not sleep during AC charging — without this gate the
#: data calls would run all night).
EV_PRESENCE_POLL_S = 600
EV_PRESENCE_DATA_MIN_S = 900
EV_PRESENCE_HOME_RADIUS_M = 150

#: Anti-spam for the "plug the car in" nudge: minimum gap between
#: notifications per vehicle (seconds).
EV_PLUG_NUDGE_COOLDOWN_S = 2 * 3600

# In-process state of poll_tesla_presence; timestamps are event-loop times.
_EV_PRESENCE_LAST_POLL: dict[int, float] = {}   # site_id -> last poll
_EV_PRESENCE_LAST_DATA: dict[int, float] = {}   # vehicle_id -> last data fetch
_EV_PRESENCE_LAST_STATE: dict[int, str] = {}    # vehicle_id -> last API state
_EV_PLUG_NUDGE_LAST: dict[int, float] = {}      # vehicle_id -> last nudge
|
||
|
||
|
||
def ev_presence_transition(prev_at_home: bool | None, new_at_home: bool | None) -> str | None:
    """Classify the change between two at-home observations.

    Pure function, testable without any I/O.

    Args:
        prev_at_home: Previous observation, or None if unknown.
        new_at_home: Current observation, or None if unknown.

    Returns:
        'arrived' for away -> home, 'left' for home -> away, and None when
        nothing changed or either observation is unknown.
    """
    if prev_at_home is None or new_at_home is None:
        return None

    was_home = bool(prev_at_home)
    is_home = bool(new_at_home)
    if was_home == is_home:
        return None
    return "arrived" if is_home else "left"
|
||
|
||
|
||
async def poll_tesla_presence(site_id: int, db: asyncpg.Connection) -> None:
    """Track whether the site's Tesla is at home and nudge the user to plug it in.

    Flow (at most once per ``EV_PRESENCE_POLL_S`` per site):
      1. Pick the first active Tesla-API vehicle of the site; stop if there is
         none or the site has no coordinates.
      2. Stop if the vehicle has an open ev_session: it is at the wallbox, so
         it is at home and no (paid) API call is needed.
      3. Read the vehicle's API state (this call does not wake the car).
      4. Only when the car is online, and it either just came online or the
         last data fetch is at least ``EV_PRESENCE_DATA_MIN_S`` old, fetch its
         data and compute the distance to the site (geofence
         ``EV_PRESENCE_HOME_RADIUS_M``).
      5. Insert a row into ems.ev_presence_obs (always, also with unknown
         position).
      6. On an away -> home transition with ``charging_state == 'Disconnected'``
         send a Discord nudge (including the current export surplus), at most
         once per ``EV_PLUG_NUDGE_COOLDOWN_S`` per vehicle.

    Args:
        site_id: Site to evaluate.
        db: Connection used for all queries and passed to the Tesla client.

    Raises:
        Database and notification errors propagate; Tesla API errors are
        logged and end or degrade the pass.
    """
    loop_now = asyncio.get_running_loop().time()

    # loop.time() is a monotonic clock whose reference point is undefined, so
    # a missing timestamp must be treated as "never" rather than as 0.0:
    # comparing against 0.0 would block polls and nudges on a freshly booted
    # host, where the clock value itself is still smaller than the interval.
    last_poll = _EV_PRESENCE_LAST_POLL.get(site_id)
    if last_poll is not None and loop_now - last_poll < EV_PRESENCE_POLL_S:
        return
    _EV_PRESENCE_LAST_POLL[site_id] = loop_now

    veh = await db.fetchrow(
        """
        select v.id, v.vin, v.name, s.latitude, s.longitude
        from ems.asset_vehicle v join ems.site s on s.id = v.site_id
        where v.site_id = $1 and v.api_type = 'tesla' and v.active
        order by v.id limit 1
        """,
        site_id,
    )
    # Both site coordinates are needed for the geofence distance.
    if veh is None or veh["latitude"] is None or veh["longitude"] is None:
        return
    vehicle_id = int(veh["id"])

    # Car on the wallbox (open session) = at home; no API calls (saves money).
    plugged = await db.fetchval(
        """
        select exists(
            select 1 from ems.ev_session es
            where es.vehicle_id = $1 and es.session_end is null
        )
        """,
        vehicle_id,
    )
    if plugged:
        return

    from services.tesla_client import get_charge_state, get_vehicle_api_state, haversine_m

    try:
        api_state = await get_vehicle_api_state(db, veh["vin"])
    except Exception as e:
        logger.warning("Tesla presence: state poll failed: %s", e)
        return
    if api_state is None:
        return

    prev_state = _EV_PRESENCE_LAST_STATE.get(vehicle_id)
    _EV_PRESENCE_LAST_STATE[vehicle_id] = api_state
    woke_up = api_state == "online" and prev_state != "online"
    last_data = _EV_PRESENCE_LAST_DATA.get(vehicle_id)
    data_due = last_data is None or loop_now - last_data >= EV_PRESENCE_DATA_MIN_S

    at_home = None
    distance_m = None
    charging_state = None
    shift_state = None
    if api_state == "online" and (woke_up or data_due):
        # Stamped before the call so that a failing read is rate-limited too.
        _EV_PRESENCE_LAST_DATA[vehicle_id] = loop_now
        try:
            st = await get_charge_state(db, veh["vin"])
        except Exception as e:
            logger.warning("Tesla presence: data read failed: %s", e)
            st = None
        if (
            st is not None
            and st.get("latitude") is not None
            and st.get("longitude") is not None
        ):
            distance_m = int(
                haversine_m(
                    float(st["latitude"]), float(st["longitude"]),
                    float(veh["latitude"]), float(veh["longitude"]),
                )
            )
            at_home = distance_m <= EV_PRESENCE_HOME_RADIUS_M
            charging_state = st.get("charging_state")
            shift_state = st.get("shift_state")

    # Last KNOWN position state; read before inserting the new observation.
    prev = await db.fetchrow(
        """
        select at_home from ems.ev_presence_obs
        where vehicle_id = $1 and at_home is not null
        order by observed_at desc limit 1
        """,
        vehicle_id,
    )
    await db.execute(
        """
        insert into ems.ev_presence_obs
            (vehicle_id, api_state, at_home, distance_m, charging_state, shift_state)
        values ($1, $2, $3, $4, $5, $6)
        """,
        vehicle_id, api_state, at_home, distance_m, charging_state, shift_state,
    )

    trans = ev_presence_transition(prev["at_home"] if prev else None, at_home)
    if trans != "arrived" or charging_state != "Disconnected":
        return

    last_nudge = _EV_PLUG_NUDGE_LAST.get(vehicle_id)
    if last_nudge is not None and loop_now - last_nudge < EV_PLUG_NUDGE_COOLDOWN_S:
        return
    _EV_PLUG_NUDGE_LAST[vehicle_id] = loop_now

    grid_w = await db.fetchval(
        "select grid_power_w from ems.vw_latest_inverter where site_id = $1 limit 1",
        site_id,
    )
    # Mention the surplus only when more than 500 W is flowing to the grid
    # (negative grid power = export).
    surplus = f" — právě teče {abs(int(grid_w))/1000:.1f} kW do sítě" if grid_w and grid_w < -500 else ""
    from services.notification_service import send_discord

    await send_discord(
        db, site_id,
        f"🚗 **{veh['name'] or 'EV'} je doma a nepíchnuté**{surplus}.\n"
        f"Píchni ho a plán se o zbytek postará (přebytky / levné sloty).",
        level="info",
    )
    logger.info("EV plug nudge sent (site=%s, vehicle=%s)", site_id, veh["id"])
|
||
|
||
|
||
async def run_telemetry_loop(conn: asyncpg.Connection) -> float:
    """Run one telemetry pass over all active sites.

    The pollers run strictly one after another: a single asyncpg connection
    must not serve parallel queries. Each poller is isolated, so a failure in
    one of them (logged with its traceback) no longer prevents the remaining
    pollers of the same site from running.

    Args:
        conn: Connection shared by every poller of this pass.

    Returns:
        Elapsed time of the pass in seconds (the caller uses it to size its
        sleep).

    Raises:
        Only an error of the initial site query propagates.
    """
    loop = asyncio.get_running_loop()
    start = loop.time()
    sites = await conn.fetch(
        "select id from ems.vw_site_directory where active = true"
    )
    # Resolved at call time, so pollers defined further down the module work.
    pollers = (
        poll_inverter,
        poll_ev_chargers,
        poll_heat_pump,
        poll_loxone_sensors,
        poll_tesla_presence,
        poll_pool_pumps,
    )
    for site in sites:
        sid = site["id"]
        for poller in pollers:
            try:
                await poller(sid, conn)
            except Exception:
                # CancelledError is a BaseException and still propagates.
                logger.exception(
                    "Telemetry loop error site %s (%s)", sid, poller.__name__
                )
    return loop.time() - start
|
||
|
||
|
||
async def run_telemetry_loop_wrapper(pool: asyncpg.Pool) -> None:
    """Background task that runs the telemetry pass forever.

    Publishes ``pool`` as ``_BG_POOL`` for the fire-and-forget hooks. Every
    iteration borrows a connection only for the duration of the pass, so the
    pool is not blocked while sleeping. The loop aims at one pass per 60 s; a
    pass longer than 50 s is reported. After a failed pass it retries in 5 s.

    Args:
        pool: Connection pool to take the per-pass connection from.

    Raises:
        asyncio.CancelledError: Re-raised so the task can be cancelled.
    """
    global _BG_POOL
    _BG_POOL = pool

    while True:
        try:
            async with pool.acquire() as conn:
                took_s = await run_telemetry_loop(conn)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.exception("Telemetry wrapper DB error: %s", e)
            await asyncio.sleep(5)
        else:
            if took_s > 50:
                logger.warning("Telemetry loop took %.1fs (>50s)", took_s)
            remaining_s = 60.0 - took_s
            await asyncio.sleep(max(0.0, remaining_s))
|
||
|
||
|
||
async def poll_pool_pumps(site_id: int, db: asyncpg.Connection) -> None:
    """Poll pool pumps through their Shelly relays (Gen2 RPC Switch.GetStatus).

    Shelly keeps no history, so it is built here from the periodic samples,
    like for the other devices. When a read fails NOTHING is written: a
    fabricated zero would pollute the consumption history.

    Args:
        site_id: Site whose pumps are polled.
        db: Connection used for the asset query and the sample inserts.
    """
    # Local import: keeps the module header untouched (concurrent changes on main).
    from services.shelly_client import get_switch_status, shelly_base_url

    pumps = await db.fetch(
        """
        select pump_id as id, code, shelly_switch_id, protocol, host, port
        from ems.vw_asset_pool_pump_http_poll
        where site_id = $1
        """,
        site_id,
    )
    measured_at = datetime.now(timezone.utc)

    for pump in pumps:
        code = pump["code"]
        base = shelly_base_url(pump["protocol"], pump["host"], pump["port"])
        switch_id = int(pump["shelly_switch_id"] or 0)

        try:
            status = await get_switch_status(base, switch_id)
        except Exception as e:
            logger.warning("pool pump %s (%s) read failed: %s", code, base, e)
            continue

        # Missing measurements stay NULL instead of being rounded.
        power_w = None
        if status.apower_w is not None:
            power_w = int(round(status.apower_w))
        energy_wh = None
        if status.aenergy_total_wh is not None:
            energy_wh = int(round(status.aenergy_total_wh))

        await db.execute(
            "select ems.fn_telemetry_pool_pump_sample($1::int, $2::int, $3::timestamptz, $4::boolean, $5::int, $6::bigint)",
            site_id,
            pump["id"],
            measured_at,
            status.output,
            power_w,
            energy_wh,
        )
|