Hotfix merge: DB výkon (delta cache throttle) + presence watcher + Tesla zákopy
All checks were successful
CI and deploy / migration-check (push) Successful in 26s
CI and deploy / deploy (push) Has been skipped

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dusan Vojacek
2026-06-12 14:25:32 +02:00
8 changed files with 292 additions and 3 deletions

View File

@@ -108,7 +108,7 @@ async def lifespan(app: FastAPI):
for site in await _active_site_rows(conn):
try:
n = await conn.fetchval(
"SELECT ems.fn_fill_forecast_accuracy($1, 48)",
"SELECT ems.fn_fill_forecast_accuracy($1, 3)",
site["id"],
)
if n:
@@ -257,6 +257,19 @@ async def lifespan(app: FastAPI):
"scheduled_tuv_usage_stats site=%s failed", site["id"]
)
async def scheduled_forecast_accuracy_catchup() -> None:
"""Denní 48h catch-up (pozdní telemetrie) — 15min tick jede jen 3 h okno."""
async with app.state.pg_pool.acquire() as conn:
for site in await _active_site_rows(conn):
try:
await conn.fetchval(
"SELECT ems.fn_fill_forecast_accuracy($1, 48)", site["id"]
)
except Exception:
logger.exception(
"forecast_accuracy catchup site=%s failed", site["id"]
)
async def scheduled_ev_usage_stats() -> None:
async with app.state.pg_pool.acquire() as conn:
try:
@@ -431,6 +444,14 @@ async def lifespan(app: FastAPI):
id="tuv_usage_stats",
replace_existing=True,
)
scheduler.add_job(
scheduled_forecast_accuracy_catchup,
"cron",
hour=5,
minute=50,
id="forecast_accuracy_catchup",
replace_existing=True,
)
scheduler.add_job(
scheduled_ev_usage_stats,
"cron",

View File

@@ -499,6 +499,147 @@ async def poll_loxone_sensors(site_id: int, db: asyncpg.Connection) -> None:
)
#: presence poll pacing (sekundy) a geofence poloměr (m).
#: Fleet API je placené (vehicle_data $0.002/req, wake $0.02 — wake NIKDY):
#: list 10 min; vehicle_data jen při přechodu asleep→online a pak max 1×/15 min;
#: při otevřené ev_session se nepolluje vůbec (auto je u wallboxu = doma,
#: a při AC nabíjení nespí — bez gatu by data cally tekly celou noc).
EV_PRESENCE_POLL_S = 600
EV_PRESENCE_DATA_MIN_S = 900
EV_PRESENCE_HOME_RADIUS_M = 150
#: anti-spam "píchni auto": min. rozestup notifikací per vozidlo (s)
EV_PLUG_NUDGE_COOLDOWN_S = 2 * 3600
_EV_PRESENCE_LAST_POLL: dict[int, float] = {}
_EV_PRESENCE_LAST_DATA: dict[int, float] = {}
_EV_PRESENCE_LAST_STATE: dict[int, str] = {}
_EV_PLUG_NUDGE_LAST: dict[int, float] = {}
def ev_presence_transition(prev_at_home: bool | None, new_at_home: bool | None) -> str | None:
"""Čistá detekce přechodu: 'arrived' / 'left' / None (testovatelné)."""
if new_at_home is None or prev_at_home is None:
return None
if not prev_at_home and new_at_home:
return "arrived"
if prev_at_home and not new_at_home:
return "left"
return None
async def poll_tesla_presence(site_id: int, db: asyncpg.Connection) -> None:
"""Přítomnost vozidla: /vehicles state (nebudí) + při online poloha → geofence.
Přechod pryč→doma + nepíchnuté → Discord pobídka (s aktuálním přebytkem).
Vše se loguje do ev_presence_obs (budoucí dostupnostní statistika).
"""
loop_now = asyncio.get_running_loop().time()
if loop_now - _EV_PRESENCE_LAST_POLL.get(site_id, 0.0) < EV_PRESENCE_POLL_S:
return
_EV_PRESENCE_LAST_POLL[site_id] = loop_now
veh = await db.fetchrow(
"""
select v.id, v.vin, v.name, s.latitude, s.longitude
from ems.asset_vehicle v join ems.site s on s.id = v.site_id
where v.site_id = $1 and v.api_type = 'tesla' and v.active
order by v.id limit 1
""",
site_id,
)
if veh is None or veh["latitude"] is None:
return
# auto na wallboxu (otevřená session) = doma; žádné API cally (šetří $)
plugged = await db.fetchval(
"""
select exists(
select 1 from ems.ev_session es
where es.vehicle_id = $1 and es.session_end is null
)
""",
int(veh["id"]),
)
if plugged:
return
from services.tesla_client import get_charge_state, get_vehicle_api_state, haversine_m
try:
api_state = await get_vehicle_api_state(db, veh["vin"])
except Exception as e:
logger.warning("Tesla presence: state poll failed: %s", e)
return
if api_state is None:
return
prev_state = _EV_PRESENCE_LAST_STATE.get(int(veh["id"]))
_EV_PRESENCE_LAST_STATE[int(veh["id"])] = api_state
woke_up = api_state == "online" and prev_state != "online"
data_due = (
loop_now - _EV_PRESENCE_LAST_DATA.get(int(veh["id"]), 0.0)
>= EV_PRESENCE_DATA_MIN_S
)
at_home = None
distance_m = None
charging_state = None
shift_state = None
if api_state == "online" and (woke_up or data_due):
_EV_PRESENCE_LAST_DATA[int(veh["id"])] = loop_now
try:
st = await get_charge_state(db, veh["vin"])
except Exception as e:
logger.warning("Tesla presence: data read failed: %s", e)
st = None
if st is not None and st.get("latitude") is not None:
distance_m = int(
haversine_m(
float(st["latitude"]), float(st["longitude"]),
float(veh["latitude"]), float(veh["longitude"]),
)
)
at_home = distance_m <= EV_PRESENCE_HOME_RADIUS_M
charging_state = st.get("charging_state")
shift_state = st.get("shift_state")
prev = await db.fetchrow(
"""
select at_home from ems.ev_presence_obs
where vehicle_id = $1 and at_home is not null
order by observed_at desc limit 1
""",
int(veh["id"]),
)
await db.execute(
"""
insert into ems.ev_presence_obs
(vehicle_id, api_state, at_home, distance_m, charging_state, shift_state)
values ($1, $2, $3, $4, $5, $6)
""",
int(veh["id"]), api_state, at_home, distance_m, charging_state, shift_state,
)
trans = ev_presence_transition(prev["at_home"] if prev else None, at_home)
if trans == "arrived" and charging_state == "Disconnected":
if loop_now - _EV_PLUG_NUDGE_LAST.get(int(veh["id"]), 0.0) < EV_PLUG_NUDGE_COOLDOWN_S:
return
_EV_PLUG_NUDGE_LAST[int(veh["id"])] = loop_now
grid_w = await db.fetchval(
"select grid_power_w from ems.vw_latest_inverter where site_id = $1 limit 1",
site_id,
)
surplus = f" — právě teče {abs(int(grid_w))/1000:.1f} kW do sítě" if grid_w and grid_w < -500 else ""
from services.notification_service import send_discord
await send_discord(
db, site_id,
f"🚗 **{veh['name'] or 'EV'} je doma a nepíchnuté**{surplus}.\n"
f"Píchni ho a plán se o zbytek postará (přebytky / levné sloty).",
level="info",
)
logger.info("EV plug nudge sent (site=%s, vehicle=%s)", site_id, veh["id"])
async def run_telemetry_loop(conn: asyncpg.Connection) -> float:
"""Jeden průchod smyčky; vrátí uplynulý čas v sekundách (pro sleep).
@@ -516,6 +657,7 @@ async def run_telemetry_loop(conn: asyncpg.Connection) -> float:
await poll_ev_chargers(sid, conn)
await poll_heat_pump(sid, conn)
await poll_loxone_sensors(sid, conn)
await poll_tesla_presence(sid, conn)
await poll_pool_pumps(sid, conn)
except Exception as e:
logger.error("Telemetry loop error site %s: %s", sid, e)

View File

@@ -166,3 +166,33 @@ async def get_charge_state(
return None
r.raise_for_status()
return parse_charge_state(r.json())
def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Vzdálenost dvou GPS bodů v metrech (čisté, testovatelné)."""
import math
r = 6_371_000.0
p1, p2 = math.radians(lat1), math.radians(lat2)
dp = math.radians(lat2 - lat1)
dl = math.radians(lon2 - lon1)
a = math.sin(dp / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dl / 2) ** 2
return 2 * r * math.asin(math.sqrt(a))
async def get_vehicle_api_state(db: asyncpg.Connection, vin: str | None) -> str | None:
"""Jen state z /vehicles (online/asleep/offline) — NIKDY nebudí auto."""
token = await _get_access_token(db)
if token is None:
return None
async with httpx.AsyncClient(
timeout=HTTP_TIMEOUT_S, headers={"Authorization": f"Bearer {token}"}
) as client:
r = await client.get(f"{API_BASE}/api/1/vehicles")
r.raise_for_status()
vehicles = r.json().get("response") or []
if vin:
v = next((x for x in vehicles if x.get("vin") == vin), None)
else:
v = vehicles[0] if len(vehicles) == 1 else None
return str(v["state"]) if v else None

View File

@@ -0,0 +1,41 @@
"""EV presence — čisté helpery (haversine, přechody)."""
from __future__ import annotations
import unittest
from services.telemetry_collector import ev_presence_transition
from services.tesla_client import haversine_m
class HaversineTests(unittest.TestCase):
def test_zero_distance(self) -> None:
self.assertAlmostEqual(haversine_m(49.2445, 17.4070, 49.2445, 17.4070), 0.0, places=2)
def test_known_distance(self) -> None:
# ~111 km na 1° zeměpisné šířky
d = haversine_m(49.0, 17.0, 50.0, 17.0)
self.assertAlmostEqual(d, 111_195, delta=300)
def test_geofence_scale(self) -> None:
# ~100 m posun (0.0009° lat)
d = haversine_m(49.24457, 17.407054, 49.24547, 17.407054)
self.assertTrue(80 < d < 120, d)
class TransitionTests(unittest.TestCase):
def test_arrived(self) -> None:
self.assertEqual(ev_presence_transition(False, True), "arrived")
def test_left(self) -> None:
self.assertEqual(ev_presence_transition(True, False), "left")
def test_none_cases(self) -> None:
self.assertIsNone(ev_presence_transition(None, True))
self.assertIsNone(ev_presence_transition(True, None))
self.assertIsNone(ev_presence_transition(True, True))
self.assertIsNone(ev_presence_transition(False, False))
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,22 @@
-- Presence vozidla (Tesla location scope): kde auto je, kdy bývá doma.
-- Zdroj: levný poll /vehicles (state, NEbudí) + při online location_data.
-- Účel: (a) notifikace "auto doma a nepíchlé + svítí přebytek → píchni ho",
-- (b) dostupnostní statistika per DOW×hodina pro plánovač (maska ev_connected
-- a zreálnění oportunistické hodnoty) — follow-up nad těmito daty.
create table ems.ev_presence_obs (
id bigserial primary key,
vehicle_id int not null references ems.asset_vehicle (id),
observed_at timestamptz not null default now(),
api_state text, -- online / asleep / offline (z /vehicles, bez buzení)
at_home boolean, -- null = poloha neznámá (asleep)
distance_m int,
charging_state text, -- Disconnected / Stopped / Charging…
shift_state text
);
create index idx_ev_presence_obs_vehicle_time
on ems.ev_presence_obs (vehicle_id, observed_at desc);
comment on table ems.ev_presence_obs is
'Pozorování přítomnosti vozidla (geofence vs GPS site). Poll ~5 min, poloha jen když je auto vzhůru (nebudí). Vstup pro "píchni auto" notifikace a budoucí dostupnostní statistiku.';

View File

@@ -1,13 +1,29 @@
-- Cache delta profilu PV (těžká agregace forecast_accuracy) — refresh po fn_fill_forecast_accuracy.
-- Prefix R__018: musí běžet před R__022 (volá fn_refresh_site_pv_delta_profile_cache).
create or replace function ems.fn_refresh_site_pv_delta_profile_cache(p_site_id int)
drop function if exists ems.fn_refresh_site_pv_delta_profile_cache(int);
create or replace function ems.fn_refresh_site_pv_delta_profile_cache(
p_site_id int,
p_force boolean default false
)
returns void
language plpgsql
as $fn$
declare
v_profile jsonb;
begin
-- VÝKON: agregace 120 dní nad forecast_accuracy trvá ~44 s/site na prod —
-- po 15 min ji přepočítávat dusilo celou DB (timeouty API). Profil má 14d
-- poločas, denní granularitu — čerstvost 6 h bohatě stačí.
if not p_force and exists (
select 1 from ems.site_pv_forecast_calibration c
where c.site_id = p_site_id
and c.delta_profile_cached_at > now() - interval '6 hours'
) then
return;
end if;
v_profile := ems.fn_pv_forecast_delta_profile(
p_site_id,
now() - interval '120 days',

View File

@@ -60,7 +60,7 @@ async def m():
print("location scope:", "OK" if ds.get("latitude") is not None
else f"CHYBI (HTTP {r2.status_code})")
else:
print("auto spí — refresh OK, location doověřit po jízdě")
print("PRÁZDNÝ SEZNAM vozidel — chybí partner registrace pro tento client_id (docs §3); pozn.: public key hash je vázaný na app — po smazání app nutná rotace klíče (mv private.pem + setup_tesla_domain.sh)") if v is None else print("auto spí — refresh OK, location doověřit po jízdě")
finally:
await c.close()
asyncio.run(m())

View File

@@ -101,3 +101,20 @@ curl -s https://fleet-auth.prd.vn.cloud.tesla.com/oauth2/v3/token \
/opt/ems-deploy/.env up -d backend` (recreate kvůli env)
- [ ] ověření: po příjezdu Tesly log `Tesla SoC -> session …` +
`select soc_at_connect_pct, target_soc_pct from ems.ev_session order by id desc limit 1`
## Presence watcher (2026-06-12, dev)
Poll ~5 min: `GET /vehicles` (state, NEBUDÍ) → při `online` poloha
(`location_data`, vyžaduje scope `vehicle_location`) → geofence 150 m vs GPS
site → `ems.ev_presence_obs` (V095). Přechod pryč→doma + `Disconnected`
Discord pobídka „auto doma a nepíchnuté (+aktuální přebytek)“; cooldown 2 h.
Data = základ dostupnostní statistiky per DOW×hodina (follow-up: maska
ev_connected v plánovači + zreálnění oportunistické hodnoty).
### Zákopy z reauth (12. 6.) — ať se neopakují
1. redirect URI `/t-auth` (musí sedět všude), 2. refresh token ROTUJE →
provozní hodnota v `ems.tesla_token` (ne .env), 3. po revokaci souhlasu ~10 min
výpadek auth, 4. `client_not_found` = app smazána/nové ID → opravit .env +
recreate, 5. **public key hash je vázán na app** — po smazání app nutná rotace
klíče (`mv private.pem .old` + `setup_tesla_domain.sh`) před partner registrací,
6. prázdný seznam vozidel = chybí partner registrace (ne spánek).