- R__097: vw_latest_pool_pump + vw_pool_pump_day_energy (denní kWh z delty čítače, minuty běhu) + ems_anon granty - PoolCard na Dashboardu: stav/W/dnešní kWh+hodiny/7denní mini sloupce - _notify_ev_arrival_plan: po příjezdu EV Discord souhrn (SoC auta → cíl, deadline, nabíjecí okna shlukovaná ze slotů aktivního plánu, ø cena) - docs/discord-ev-interaction.md: fáze B (bot s tlačítky přes gateway — žádný veřejný endpoint; čeká na DISCORD_BOT_TOKEN od uživatele) - docs: pool-shelly + ev-charging aktualizovány (pravidlo docs 1:1) První commit na dev větvi (nová kadence: deploy až s milníkovým merge). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
606 lines
23 KiB
Python
606 lines
23 KiB
Python
"""Sběr telemetrie z Modbus: Deye (střídač), Teltonika TeltoCharge (EV); TČ zatím placeholder."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import logging
|
||
from datetime import datetime, timezone
|
||
|
||
import asyncpg
|
||
from app.ws_manager import manager
|
||
|
||
from zoneinfo import ZoneInfo
|
||
|
||
from services.modbus_client import get_modbus_client
|
||
|
||
_PRAGUE_TZ_NOTIFY = ZoneInfo("Europe/Prague")
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
#: Pool pro fire-and-forget akce mimo hlavní poll spojení (např. replan po
|
||
#: příjezdu EV). Nastavuje run_telemetry_loop_wrapper.
|
||
_BG_POOL: asyncpg.Pool | None = None
|
||
|
||
|
||
async def _on_ev_arrival(site_id: int, charger_code: str) -> None:
|
||
"""Okamžitý replan + export po příjezdu EV (jinak by se čekalo až na */15).
|
||
|
||
Deferred importy: planning_engine/control_exporter importují control balík,
|
||
který se importuje nezávisle — vyhýbáme se importnímu cyklu při startu.
|
||
"""
|
||
if _BG_POOL is None:
|
||
logger.warning("EV arrival: BG pool není k dispozici, čeká se na rolling tick")
|
||
return
|
||
try:
|
||
from services.control_exporter import export_setpoints
|
||
from services.planning_engine import run_rolling_replan
|
||
|
||
async with _BG_POOL.acquire() as conn:
|
||
# Tesla: skutečné SoC do session PŘED replanem (selhání nesmí blokovat plán)
|
||
try:
|
||
await _patch_session_from_tesla(site_id, charger_code, conn)
|
||
except Exception:
|
||
logger.exception(
|
||
"Tesla SoC fetch failed (site=%s, %s) — replan jede na defaultech",
|
||
site_id, charger_code,
|
||
)
|
||
await run_rolling_replan(
|
||
site_id, conn, triggered_by=f"ev_arrival:{charger_code}"
|
||
)
|
||
await export_setpoints(site_id, conn)
|
||
try:
|
||
await _notify_ev_arrival_plan(site_id, charger_code, conn)
|
||
except Exception:
|
||
logger.exception("EV arrival Discord notify failed (%s)", charger_code)
|
||
logger.info(
|
||
"EV arrival replan+export done (site=%s, charger=%s)",
|
||
site_id,
|
||
charger_code,
|
||
)
|
||
except Exception:
|
||
logger.exception(
|
||
"EV arrival replan failed (site=%s, charger=%s)", site_id, charger_code
|
||
)
|
||
|
||
|
||
async def _on_ev_departure(site_id: int, charger_code: str) -> None:
|
||
"""Odjezd: zapsat pozorování (odometer+SoC) — auto právě jede, je vzhůru.
|
||
|
||
Pár odjezd→příjezd dává jízdu (km, kWh) pro ev_usage_stats. Spící/nečitelné
|
||
auto (408) = tiché přeskočení, jízda se dopočítá z příštích pozorování.
|
||
"""
|
||
if _BG_POOL is None:
|
||
return
|
||
try:
|
||
from app.db_json import fetch_json
|
||
from services.tesla_client import get_charge_state
|
||
|
||
async with _BG_POOL.acquire() as conn:
|
||
ctx = await fetch_json(
|
||
conn,
|
||
"select ems.fn_tesla_arrival_context($1::int, $2::text)",
|
||
site_id,
|
||
charger_code,
|
||
)
|
||
if not isinstance(ctx, dict) or ctx.get("api_type") != "tesla":
|
||
return
|
||
state = await get_charge_state(conn, ctx.get("vin"))
|
||
if state is None:
|
||
return
|
||
await conn.execute(
|
||
"select ems.fn_ev_vehicle_obs_insert($1::int, $2::int, 'departure', $3::numeric, $4::numeric, $5::text)",
|
||
site_id,
|
||
int(ctx["vehicle_id"]),
|
||
state.get("odometer_km"),
|
||
float(state["battery_level"]),
|
||
state.get("charging_state"),
|
||
)
|
||
logger.info(
|
||
"EV departure obs (site=%s, %s): soc=%s%%, odo=%s km",
|
||
site_id, charger_code,
|
||
state["battery_level"], state.get("odometer_km"),
|
||
)
|
||
except Exception:
|
||
logger.exception(
|
||
"EV departure obs failed (site=%s, %s)", site_id, charger_code
|
||
)
|
||
|
||
|
||
async def _notify_ev_arrival_plan(
|
||
site_id: int, charger_code: str, conn: asyncpg.Connection
|
||
) -> None:
|
||
"""Discord souhrn po příjezdu EV: stav baterie auta + kdy se bude nabíjet.
|
||
|
||
Čte čerstvý aktivní plán (ev sloty s výkonem > 0) a otevřenou session;
|
||
sloty shlukuje do souvislých oken. Fáze B (interakce „odjíždím za 2h"
|
||
tlačítkem) = Discord bot, viz docs/discord-ev-interaction.md.
|
||
"""
|
||
from services.notification_service import send_discord
|
||
|
||
row = await conn.fetchrow(
|
||
"""
|
||
select es.soc_at_connect_pct, es.target_soc_pct, es.target_deadline,
|
||
v.battery_capacity_kwh, v.name as vehicle_name
|
||
from ems.ev_session es
|
||
join ems.asset_ev_charger c on c.id = es.charger_id
|
||
left join ems.asset_vehicle v on v.id = es.vehicle_id
|
||
where es.site_id = $1 and c.code = $2 and es.session_end is null
|
||
order by es.id desc limit 1
|
||
""",
|
||
site_id,
|
||
charger_code,
|
||
)
|
||
if row is None:
|
||
return
|
||
ev_col = "ev1_setpoint_w" if charger_code.endswith("1") else "ev2_setpoint_w"
|
||
slots = await conn.fetch(
|
||
f"""
|
||
select pi.interval_start, pi.{ev_col} as w, pi.effective_buy_price
|
||
from ems.planning_interval pi
|
||
join ems.planning_run pr on pr.id = pi.run_id
|
||
where pr.site_id = $1 and pr.status = 'active'
|
||
and coalesce(pi.{ev_col}, 0) > 0
|
||
order by pi.interval_start
|
||
""",
|
||
site_id,
|
||
)
|
||
|
||
def _fmt(dt) -> str:
|
||
return dt.astimezone(_PRAGUE_TZ_NOTIFY).strftime("%H:%M")
|
||
|
||
windows: list[str] = []
|
||
if slots:
|
||
start = prev = slots[0]["interval_start"]
|
||
kwh = 0.0
|
||
prices: list[float] = []
|
||
for r in slots:
|
||
ts = r["interval_start"]
|
||
if (ts - prev).total_seconds() > 900:
|
||
windows.append(f"{_fmt(start)}–{_fmt(prev)} (+15m)")
|
||
start = ts
|
||
prev = ts
|
||
kwh += float(r["w"]) * 0.25 / 1000.0
|
||
prices.append(float(r["effective_buy_price"] or 0))
|
||
windows.append(f"{_fmt(start)}–{_fmt(prev)} (+15m)")
|
||
avg_p = sum(prices) / max(1, len(prices))
|
||
else:
|
||
kwh, avg_p = 0.0, 0.0
|
||
|
||
soc = row["soc_at_connect_pct"]
|
||
tgt = row["target_soc_pct"]
|
||
cap = float(row["battery_capacity_kwh"] or 0)
|
||
need = max(0.0, (float(tgt or 0) - float(soc or 0)) / 100.0 * cap)
|
||
dl = row["target_deadline"]
|
||
lines = [
|
||
f"🔌 **{row['vehicle_name'] or charger_code} připojeno**",
|
||
f"Baterie auta: **{soc or '?'} %** → cíl {tgt or '?'} %"
|
||
+ (f" (~{need:.0f} kWh)" if need else ""),
|
||
]
|
||
if dl is not None:
|
||
lines.append(f"Deadline: {dl.astimezone(_PRAGUE_TZ_NOTIFY).strftime('%a %d.%m. %H:%M')}")
|
||
if windows:
|
||
lines.append(
|
||
f"Plán nabíjení: {'; '.join(windows[:4])} — {kwh:.1f} kWh, ø {avg_p:.2f} Kč/kWh"
|
||
)
|
||
else:
|
||
lines.append("Plán nabíjení: zatím žádné sloty (čeká na levné okno / PV)")
|
||
await send_discord(conn, site_id, "\n".join(lines), level="info")
|
||
|
||
|
||
async def _patch_session_from_tesla(
|
||
site_id: int, charger_code: str, conn: asyncpg.Connection
|
||
) -> None:
|
||
"""Po příjezdu: pro vozidlo s api_type='tesla' doplnit reálné SoC do session.
|
||
|
||
Energie se NEpočítá tady — soc_at_connect_pct + target_soc_pct si přebere
|
||
fn_planning_site_context (SQL-first). VIN se při prvním úspěchu naučí.
|
||
"""
|
||
from app.db_json import fetch_json
|
||
from services.tesla_client import get_charge_state
|
||
|
||
ctx = await fetch_json(
|
||
conn,
|
||
"select ems.fn_tesla_arrival_context($1::int, $2::text)",
|
||
site_id,
|
||
charger_code,
|
||
)
|
||
if not isinstance(ctx, dict) or ctx.get("api_type") != "tesla":
|
||
return
|
||
session_id = ctx.get("session_id")
|
||
if session_id is None:
|
||
logger.warning("Tesla hook: chybí otevřená session (%s)", charger_code)
|
||
return
|
||
|
||
state = await get_charge_state(conn, ctx.get("vin"))
|
||
if state is None:
|
||
return
|
||
|
||
await conn.execute(
|
||
"select ems.fn_ev_vehicle_obs_insert($1::int, $2::int, 'arrival', $3::numeric, $4::numeric, $5::text)",
|
||
site_id,
|
||
int(ctx["vehicle_id"]),
|
||
state.get("odometer_km"),
|
||
float(state["battery_level"]),
|
||
state.get("charging_state"),
|
||
)
|
||
|
||
if not ctx.get("vin") and state.get("vin"):
|
||
await conn.execute(
|
||
"select ems.fn_vehicle_set_vin($1::int, $2::text)",
|
||
int(ctx["vehicle_id"]),
|
||
str(state["vin"]),
|
||
)
|
||
|
||
patch: dict = {"soc_at_connect_pct": state["battery_level"]}
|
||
if state.get("charge_limit_soc"):
|
||
patch["target_soc_pct"] = state["charge_limit_soc"]
|
||
import json as _json
|
||
await fetch_json(
|
||
conn,
|
||
"select ems.fn_ev_session_apply_patch($1::int, $2::int, $3::jsonb)",
|
||
site_id,
|
||
int(session_id),
|
||
_json.dumps(patch),
|
||
)
|
||
logger.info(
|
||
"Tesla SoC -> session %s: level=%s%%, limit=%s%% (%s)",
|
||
session_id,
|
||
state["battery_level"],
|
||
state.get("charge_limit_soc"),
|
||
charger_code,
|
||
)
|
||
|
||
# Deye SUN – holding registry (decimal adresa = přímo pro read_holding_registers)
|
||
DEYE_REG_RUN_STATE = 500
|
||
DEYE_REG_BATT_CHARGE_TODAY = 514
|
||
DEYE_REG_BATT_DISCHARGE_TODAY = 515
|
||
DEYE_REG_BATTERY_SOC = 588
|
||
DEYE_REG_BATTERY_POWER_FLOW = 590
|
||
DEYE_REG_GRID_TOTAL_POWER = 625
|
||
DEYE_REG_GEN_PORT_POWER = 667
|
||
DEYE_REG_LOAD_TOTAL_POWER = 653
|
||
DEYE_REG_GRID_IMPORT_TOTAL_LO = 522
|
||
DEYE_REG_GRID_IMPORT_TOTAL_HI = 523
|
||
DEYE_REG_GRID_EXPORT_TOTAL_LO = 524
|
||
DEYE_REG_GRID_EXPORT_TOTAL_HI = 525
|
||
DEYE_REG_PV1_POWER = 672
|
||
DEYE_REG_PV2_POWER = 673
|
||
# Solar sell (0 = přebytek řiditelné FVE nesmí do sítě) a GEN/MI cut-off (reg178 bits0–1 == 3 → cut-off ON).
|
||
# Pozn.: v některých manuálech/UI se uvádí "register 179" (1-based), ale Modbus adresa je 178 (0-based).
|
||
# Viz modbus-registers.md.
|
||
DEYE_REG_SOLAR_SELL = 145
|
||
DEYE_REG_CONTROL_BOARD_SPECIAL1 = 178
|
||
|
||
# Teltonika TeltoCharge – holding registry (oficiální „Modbus RTU Communication
|
||
# protocol" rev 0.5, 2024-07-23; přes Waveshare RS485→TCP, FC 3 čtení).
|
||
# Blok 0–40: 0–2 napětí L1–L3 (V), 3–5 proud L1–L3 (×10 A), 6 EVSE status (DLM),
|
||
# 27 charge point state, 33 IEC61851, 34/35 warning/error bity,
|
||
# 38 okamžitý výkon (W), 39 energie session (kWh×100), 40 trvání session (s).
|
||
# Zápisové (řízení, zatím nepoužité): 15 Amps to use (0=stop, 6–32), 16 start/stop.
|
||
TELTO_REG_BLOCK_START = 0
|
||
TELTO_REG_BLOCK_COUNT = 41
|
||
|
||
#: EVSE status (reg 6) → interní stav; session detekce stojí na 'available' vs ≠'available'
|
||
#: (fn_ev_session_transition), proto každý stav s připojeným EV musí být ≠ 'available'.
|
||
TELTO_STATUS_MAP = {
|
||
0: "charging", # C – nabíjí
|
||
1: "preparing", # B1 – EV připojeno, čeká na EV
|
||
2: "preparing", # B2 – dříve nabíjelo, nedostatek výkonu
|
||
3: "preparing", # B3 – nenabíjelo, nedostatek výkonu
|
||
4: "suspended_ev", # D1 – zastaveno vozidlem
|
||
5: "suspended_evse", # D2 – bez autorizace
|
||
6: "suspended_evse", # D3 – nabíjení nepovoleno
|
||
7: "available", # A – bez EV
|
||
8: "faulted", # F – chyba
|
||
9: "unknown", # E
|
||
}
|
||
|
||
|
||
def parse_teltocharge_frame(regs: list[int]) -> dict[str, object]:
|
||
"""Čistý parser bloku registrů 0–40 TeltoCharge (testovatelné bez Modbus)."""
|
||
if len(regs) < TELTO_REG_BLOCK_COUNT:
|
||
raise ValueError(f"TeltoCharge frame too short: {len(regs)}")
|
||
status = TELTO_STATUS_MAP.get(int(regs[6]), "unknown")
|
||
current_a = max(int(regs[3]), int(regs[4]), int(regs[5])) / 10.0
|
||
return {
|
||
"status": status,
|
||
"power_w": int(regs[38]),
|
||
"session_energy_kwh": int(regs[39]) / 100.0,
|
||
"current_a": current_a,
|
||
"voltage_v": int(regs[0]),
|
||
"warning_bits": int(regs[34]),
|
||
"error_bits": int(regs[35]),
|
||
"evse_status_raw": int(regs[6]),
|
||
"charge_point_state_raw": int(regs[27]),
|
||
}
|
||
|
||
|
||
def aggregate_pv_production_w(pv1_w: int, pv2_w: int, gen_port_w: int) -> int:
|
||
"""
|
||
Okamžitá „výroba FVE“ pro dashboard / audit součtu: Deye registry 672/673/667
|
||
jsou int16 W; záporné hodnoty (např. večer při exportu) nejsou DC výroba.
|
||
"""
|
||
return max(0, int(pv1_w)) + max(0, int(pv2_w)) + max(0, int(gen_port_w))
|
||
|
||
|
||
def _export_limit_flags_from_deye_regs(reg145: int | None, reg179: int | None) -> tuple[bool | None, int | None]:
|
||
"""Odvoď is_export_limited / pv_derating_flags z přečtených holding registrů (NULL = neznámé)."""
|
||
if reg145 is None and reg179 is None:
|
||
return None, None
|
||
flags = 0
|
||
if reg145 is not None and int(reg145) == 0:
|
||
flags |= 1
|
||
if reg179 is not None and (int(reg179) & 3) == 3:
|
||
flags |= 2
|
||
return (flags != 0), flags
|
||
|
||
|
||
async def poll_inverter(site_id: int, db: asyncpg.Connection) -> None:
|
||
rows = await db.fetch(
|
||
"""
|
||
select inverter_id as id, code, host, port, unit_id
|
||
from ems.vw_asset_inverter_modbus_poll
|
||
where site_id = $1
|
||
""",
|
||
site_id,
|
||
)
|
||
measured_at = datetime.now(timezone.utc)
|
||
for row in rows:
|
||
inv_id = row["id"]
|
||
code = row["code"]
|
||
host = row["host"]
|
||
port = int(row["port"] or 502)
|
||
unit_id = int(row["unit_id"] if row["unit_id"] is not None else 1)
|
||
try:
|
||
client = await get_modbus_client(host, port)
|
||
async with client.batch(unit_id) as mb:
|
||
run_state = await mb.read_register(DEYE_REG_RUN_STATE)
|
||
battery_soc = await mb.read_register(DEYE_REG_BATTERY_SOC)
|
||
battery_power = await mb.read_register_signed(DEYE_REG_BATTERY_POWER_FLOW)
|
||
batt_charge_today = await mb.read_register(DEYE_REG_BATT_CHARGE_TODAY)
|
||
batt_discharge_today = await mb.read_register(DEYE_REG_BATT_DISCHARGE_TODAY)
|
||
grid_power = await mb.read_register_signed(DEYE_REG_GRID_TOTAL_POWER)
|
||
load_power = await mb.read_register_signed(DEYE_REG_LOAD_TOTAL_POWER)
|
||
pv1_power = await mb.read_register_signed(DEYE_REG_PV1_POWER)
|
||
pv2_power = await mb.read_register_signed(DEYE_REG_PV2_POWER)
|
||
gen_port_power = await mb.read_register_signed(DEYE_REG_GEN_PORT_POWER)
|
||
grid_energy_regs = await mb.read_holding_registers(
|
||
DEYE_REG_GRID_IMPORT_TOTAL_LO, 4
|
||
)
|
||
reg145 = await mb.read_register(DEYE_REG_SOLAR_SELL)
|
||
reg179 = await mb.read_register(DEYE_REG_CONTROL_BOARD_SPECIAL1)
|
||
pv_power_w = aggregate_pv_production_w(pv1_power, pv2_power, gen_port_power)
|
||
grid_import_total_wh = (grid_energy_regs[1] << 16 | grid_energy_regs[0]) * 100
|
||
grid_export_total_wh = (grid_energy_regs[3] << 16 | grid_energy_regs[2]) * 100
|
||
is_export_limited, pv_derating_flags = _export_limit_flags_from_deye_regs(reg145, reg179)
|
||
|
||
logger.debug("inverter:%s Deye run_state raw=%s", code, run_state)
|
||
|
||
await db.execute(
|
||
"select ems.fn_telemetry_inverter_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::int, $6::int, $7::int, $8::float8, $9::int, $10::int, $11::int, $12::int, $13::int, $14::bigint, $15::bigint, $16::int, $17::boolean, $18::int)",
|
||
site_id,
|
||
inv_id,
|
||
measured_at,
|
||
pv_power_w,
|
||
pv1_power,
|
||
pv2_power,
|
||
gen_port_power,
|
||
float(battery_soc),
|
||
battery_power,
|
||
batt_charge_today,
|
||
batt_discharge_today,
|
||
grid_power,
|
||
load_power,
|
||
grid_import_total_wh,
|
||
grid_export_total_wh,
|
||
run_state,
|
||
is_export_limited,
|
||
pv_derating_flags,
|
||
)
|
||
inv_temp: float | None = None
|
||
await manager.broadcast_telemetry(
|
||
{
|
||
"type": "telemetry",
|
||
"site_id": site_id,
|
||
"ts": datetime.now(timezone.utc).isoformat(),
|
||
"pv_power_w": pv_power_w,
|
||
"battery_soc_pct": float(battery_soc),
|
||
"battery_power_w": battery_power,
|
||
"grid_power_w": grid_power,
|
||
"load_power_w": load_power,
|
||
"gen_port_power_w": gen_port_power,
|
||
"inverter_temp_c": inv_temp,
|
||
"is_export_limited": is_export_limited,
|
||
"pv_derating_flags": pv_derating_flags,
|
||
}
|
||
)
|
||
except Exception as e:
|
||
logger.error("poll_inverter site=%s inverter=%s: %s", site_id, code, e)
|
||
|
||
|
||
async def poll_ev_chargers(site_id: int, db: asyncpg.Connection) -> None:
|
||
rows = await db.fetch(
|
||
"""
|
||
select charger_id as id, code, host, port, unit_id
|
||
from ems.vw_asset_ev_charger_modbus_poll
|
||
where site_id = $1
|
||
""",
|
||
site_id,
|
||
)
|
||
measured_at = datetime.now(timezone.utc)
|
||
connector_id = 1
|
||
for row in rows:
|
||
code = row["code"]
|
||
charger_id = row["id"]
|
||
host = row["host"]
|
||
port = int(row["port"] or 502)
|
||
unit_id = int(row["unit_id"] if row["unit_id"] is not None else 1)
|
||
|
||
try:
|
||
client = await get_modbus_client(host, port)
|
||
async with client.batch(unit_id) as mb:
|
||
regs = await mb.read_holding_registers(
|
||
TELTO_REG_BLOCK_START, TELTO_REG_BLOCK_COUNT
|
||
)
|
||
frame = parse_teltocharge_frame(regs)
|
||
except Exception as e:
|
||
# Při výpadku čtení NIC nezapisovat — fabrikovaný 'available' by
|
||
# falešně ukončoval EV session a špinil bazál (power 0).
|
||
logger.warning("EV charger %s (%s:%s) read failed: %s", code, host, port, e)
|
||
continue
|
||
|
||
current_status = str(frame["status"])
|
||
if frame["error_bits"]:
|
||
logger.warning(
|
||
"EV charger %s error bits=0x%04x warning=0x%04x",
|
||
code, frame["error_bits"], frame["warning_bits"],
|
||
)
|
||
|
||
previous_status = await db.fetchval(
|
||
"""
|
||
select status
|
||
from ems.telemetry_ev_charger
|
||
where charger_id = $1 and connector_id = $2
|
||
order by measured_at desc
|
||
limit 1
|
||
""",
|
||
charger_id,
|
||
connector_id,
|
||
)
|
||
|
||
await db.execute(
|
||
"select ems.fn_telemetry_ev_charger_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::text, $6::int, $7::float8, $8::float8)",
|
||
site_id,
|
||
charger_id,
|
||
measured_at,
|
||
connector_id,
|
||
current_status,
|
||
int(frame["power_w"]),
|
||
float(frame["session_energy_kwh"]),
|
||
float(frame["current_a"]),
|
||
)
|
||
|
||
if previous_status is not None:
|
||
await db.fetchval(
|
||
"select ems.fn_ev_session_transition($1::int, $2::int, $3::text, $4::text, $5::timestamptz)",
|
||
site_id,
|
||
charger_id,
|
||
str(previous_status),
|
||
current_status,
|
||
measured_at,
|
||
)
|
||
if previous_status == "available" and current_status != "available":
|
||
logger.info("EV arrival detected on charger %s", code)
|
||
asyncio.create_task(_on_ev_arrival(site_id, str(code)))
|
||
elif previous_status != "available" and current_status == "available":
|
||
logger.info("EV departure detected on charger %s", code)
|
||
asyncio.create_task(_on_ev_departure(site_id, str(code)))
|
||
|
||
|
||
async def poll_heat_pump(site_id: int, db: asyncpg.Connection) -> None:
|
||
rows = await db.fetch(
|
||
"""
|
||
select heat_pump_id as id, code, host, port, unit_id
|
||
from ems.vw_asset_heat_pump_modbus_poll
|
||
where site_id = $1
|
||
""",
|
||
site_id,
|
||
)
|
||
measured_at = datetime.now(timezone.utc)
|
||
for row in rows:
|
||
code = row["code"]
|
||
logger.info("TODO: heat pump Modbus registry pending (heat_pump=%s)", code)
|
||
await db.execute(
|
||
"select ems.fn_telemetry_heat_pump_sample($1::int, $2::int, $3::timestamptz, $4::int, $5::float8, $6::float8, $7::float8, $8::text)",
|
||
site_id,
|
||
row["id"],
|
||
measured_at,
|
||
0,
|
||
10.0,
|
||
45.0,
|
||
55.0,
|
||
"standby",
|
||
)
|
||
|
||
|
||
async def run_telemetry_loop(conn: asyncpg.Connection) -> float:
|
||
"""Jeden průchod smyčky; vrátí uplynulý čas v sekundách (pro sleep).
|
||
|
||
Poll probíhá sekvenčně — jedno asyncpg spojení nesmí obsluhovat paralelní dotazy.
|
||
"""
|
||
loop = asyncio.get_running_loop()
|
||
start = loop.time()
|
||
sites = await conn.fetch(
|
||
"select id from ems.vw_site_directory where active = true"
|
||
)
|
||
for site in sites:
|
||
sid = site["id"]
|
||
try:
|
||
await poll_inverter(sid, conn)
|
||
await poll_ev_chargers(sid, conn)
|
||
await poll_heat_pump(sid, conn)
|
||
await poll_pool_pumps(sid, conn)
|
||
except Exception as e:
|
||
logger.error("Telemetry loop error site %s: %s", sid, e)
|
||
return loop.time() - start
|
||
|
||
|
||
async def run_telemetry_loop_wrapper(pool: asyncpg.Pool) -> None:
|
||
"""Background task: každá iterace získá spojení z poolu; neblokuje pool během sleep."""
|
||
global _BG_POOL
|
||
_BG_POOL = pool
|
||
while True:
|
||
try:
|
||
async with pool.acquire() as conn:
|
||
elapsed = await run_telemetry_loop(conn)
|
||
except asyncio.CancelledError:
|
||
raise
|
||
except Exception as e:
|
||
logger.exception("Telemetry wrapper DB error: %s", e)
|
||
elapsed = 0.0
|
||
await asyncio.sleep(5)
|
||
continue
|
||
|
||
if elapsed > 50:
|
||
logger.warning("Telemetry loop took %.1fs (>50s)", elapsed)
|
||
await asyncio.sleep(max(0.0, 60.0 - elapsed))
|
||
|
||
|
||
async def poll_pool_pumps(site_id: int, db: asyncpg.Connection) -> None:
|
||
"""Poll bazénových čerpadel přes Shelly relé (Gen2 RPC Switch.GetStatus), 60 s.
|
||
|
||
Shelly nedrží historii — stavíme ji 1min vzorky jako u ostatních zařízení.
|
||
"""
|
||
# Lokální import: minimální dotyk hlavičky souboru (souběžné změny na main).
|
||
from services.shelly_client import get_switch_status, shelly_base_url
|
||
|
||
rows = await db.fetch(
|
||
"""
|
||
select pump_id as id, code, shelly_switch_id, protocol, host, port
|
||
from ems.vw_asset_pool_pump_http_poll
|
||
where site_id = $1
|
||
""",
|
||
site_id,
|
||
)
|
||
measured_at = datetime.now(timezone.utc)
|
||
for row in rows:
|
||
code = row["code"]
|
||
base = shelly_base_url(row["protocol"], row["host"], row["port"])
|
||
try:
|
||
status = await get_switch_status(base, int(row["shelly_switch_id"] or 0))
|
||
except Exception as e:
|
||
# Při výpadku čtení NIC nezapisovat — fabrikovaná nula by špinila
|
||
# historii spotřeby (stejný princip jako u EV nabíječek výše).
|
||
logger.warning("pool pump %s (%s) read failed: %s", code, base, e)
|
||
continue
|
||
|
||
await db.execute(
|
||
"select ems.fn_telemetry_pool_pump_sample($1::int, $2::int, $3::timestamptz, $4::boolean, $5::int, $6::bigint)",
|
||
site_id,
|
||
row["id"],
|
||
measured_at,
|
||
status.output,
|
||
int(round(status.apower_w)) if status.apower_w is not None else None,
|
||
int(round(status.aenergy_total_wh)) if status.aenergy_total_wh is not None else None,
|
||
)
|