feat(ev): geofence arrival trigger (default-off)

ev_vehicle_obs.trigger += 'geofence_arrival' (V109); presence cesta zapíše příjezd
i bez píchnutí (za flagem EV_GEOFENCE_ARRIVAL_OBS_ENABLED, default OFF); fn_ev_build_trips
páruje. Constraint name ověřen živě. Worktree agent.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dusan Vojacek
2026-06-14 22:55:17 +02:00
parent a32839bf67
commit fc6d9833a7
3 changed files with 115 additions and 1 deletions

View File

@@ -760,6 +760,27 @@ _EV_PRESENCE_LAST_DATA: dict[int, float] = {}
_EV_PRESENCE_LAST_STATE: dict[int, str] = {}
_EV_PLUG_NUDGE_LAST: dict[int, float] = {}
#: Geofence arrival obs (trigger='geofence_arrival') — příjezd domů BEZ píchnutí
#: do wallboxu. DEFAULT VYPNUTO (env EV_GEOFENCE_ARRIVAL_OBS_ENABLED=true zapne);
#: vypnuté = funkce běží jako dřív, jen se nový obs nezapisuje (golden gate /
#: plánovač beze změny). Debounce: vyžaduje N po sobě jdoucích čtení at_home=true
#: (GPS jitter u 150m hranice nesmí jeden flip brát jako příjezd). Dedup: emituje
#: jen jednou na epizodu (po emitu se "odzbrojí", znovu se "nabije" až po odjezdu);
#: a vůbec neběží, když je auto na wallboxu (plug-in cesta je autoritativní —
#: poll_tesla_presence se při otevřené session vrací dřív, viz `plugged`).
EV_GEOFENCE_ARRIVAL_CONFIRM_SAMPLES = 2
_EV_GEOFENCE_HOME_STREAK: dict[int, int] = {}
_EV_GEOFENCE_ARMED: dict[int, bool] = {}
def _ev_geofence_obs_enabled() -> bool:
"""Feature flag: zápis geofence_arrival obs (default false → inertní)."""
import os
return (os.getenv("EV_GEOFENCE_ARRIVAL_OBS_ENABLED") or "").strip().lower() in (
"1", "true", "yes", "on",
)
def ev_presence_transition(prev_at_home: bool | None, new_at_home: bool | None) -> str | None:
"""Čistá detekce přechodu: 'arrived' / 'left' / None (testovatelné)."""
@@ -772,6 +793,41 @@ def ev_presence_transition(prev_at_home: bool | None, new_at_home: bool | None)
return None
def ev_geofence_arrival_decision(
vehicle_id: int,
at_home: bool | None,
confirm_samples: int = EV_GEOFENCE_ARRIVAL_CONFIRM_SAMPLES,
) -> bool:
"""Debounce + dedup geofence příjezdu (čistá, testovatelná funkce nad stavem).
Vstup `at_home` je výsledek aktuálního geofence čtení (None = poloha neznámá,
např. auto spí → stav se NEMĚNÍ). Vrací True právě jednou za epizodu příjezdu,
a to až po `confirm_samples` po sobě jdoucích čteních at_home=true:
- at_home is None → neznámé, streak ani armed se nemění (žádné rozhodnutí).
- at_home is False → auto je pryč: vynuluj streak, "nabij" (armed=True), aby
příští potvrzený příjezd mohl emitovat.
- at_home is True → inkrementuj streak; pokud streak dosáhl prahu a jsme
armed, "odzbroj" (armed=False) a vrať True (emituj jednou).
Tím se jeden GPS flip u hranice nepočítá jako příjezd a opakovaná at_home=true
čtení během stání doma negenerují duplicitní obs.
"""
if at_home is None:
return False
if at_home is False:
_EV_GEOFENCE_HOME_STREAK[vehicle_id] = 0
_EV_GEOFENCE_ARMED[vehicle_id] = True
return False
# at_home is True
streak = _EV_GEOFENCE_HOME_STREAK.get(vehicle_id, 0) + 1
_EV_GEOFENCE_HOME_STREAK[vehicle_id] = streak
if streak >= confirm_samples and _EV_GEOFENCE_ARMED.get(vehicle_id, False):
_EV_GEOFENCE_ARMED[vehicle_id] = False
return True
return False
async def poll_tesla_presence(site_id: int, db: asyncpg.Connection) -> None:
"""Přítomnost vozidla: /vehicles state (nebudí) + při online poloha → geofence.
@@ -830,6 +886,7 @@ async def poll_tesla_presence(site_id: int, db: asyncpg.Connection) -> None:
distance_m = None
charging_state = None
shift_state = None
st = None
if api_state == "online" and (woke_up or data_due):
_EV_PRESENCE_LAST_DATA[int(veh["id"])] = loop_now
try:
@@ -865,6 +922,34 @@ async def poll_tesla_presence(site_id: int, db: asyncpg.Connection) -> None:
int(veh["id"]), api_state, at_home, distance_m, charging_state, shift_state,
)
# Geofence příjezd (auto přijelo domů, NEpíchnuté — sem se dostaneme jen když
# NENÍ otevřená session, viz `plugged` výše: wallbox je autoritativní). Debounce
# + dedup řeší ev_geofence_arrival_decision; zápis je za feature flagem (default
# off → inertní). Zapisuje se z presence readu (st), proto jen když máme st se
# SoC i odometrem, ať jízda (km z odometru) dostane platný arrival.
if _ev_geofence_obs_enabled():
emit = ev_geofence_arrival_decision(int(veh["id"]), at_home)
if emit and st is not None and st.get("battery_level") is not None:
try:
await db.execute(
"select ems.fn_ev_vehicle_obs_insert($1::int, $2::int, 'geofence_arrival', $3::numeric, $4::numeric, $5::text)",
site_id,
int(veh["id"]),
st.get("odometer_km"),
float(st["battery_level"]),
st.get("charging_state"),
)
logger.info(
"EV geofence arrival obs (site=%s, vehicle=%s): soc=%s%%, odo=%s km",
site_id, veh["id"],
st["battery_level"], st.get("odometer_km"),
)
except Exception:
logger.exception(
"EV geofence arrival obs failed (site=%s, vehicle=%s)",
site_id, veh["id"],
)
trans = ev_presence_transition(prev["at_home"] if prev else None, at_home)
if trans == "arrived" and charging_state == "Disconnected":
if loop_now - _EV_PLUG_NUDGE_LAST.get(int(veh["id"]), 0.0) < EV_PLUG_NUDGE_COOLDOWN_S: