EV spotřební forecast: týdenní rytmus vozidla → target SoC a deadline session
All checks were successful
CI and deploy / migration-check (push) Successful in 19s
CI and deploy / deploy (push) Successful in 56s

Myšlenka uživatele: pondělní služebka ~150 km (~35 kWh) chce skoro plnou,
konec týdne stačí míň, víkend = levné sloty na přípravu pondělka.

- V089: ev_vehicle_obs (odometer+SoC při příjezdu/ODJEZDU — auto v obou
  okamžicích vzhůru, žádné buzení navíc), ev_trip (km z odometru, kWh z ΔSoC;
  nabíjení cestou → charged_away flag), ev_usage_stats per (vozidlo, DOW);
  asset_vehicle: target_soc_forecast_enabled (default false), min_target_soc_pct
- R__096: fn_ev_build_trips (párování), fn_update_ev_usage_stats (job 00:50),
  fn_ev_next_departure (příští typický odjezd, >=4 vzorky, >=3 km),
  fn_ev_required_soc (P80 spotřeby dne + 10 p.b., clamp [min_target, 100])
- R__016: session při příjezdu bere forecast target+deadline (za per-vozidlo
  flagem, fallback defaulty, ruční patch vždy vyhrává) → víkendová session
  s pondělním deadline = v2 solver přirozeně nabije v levných slotech
- tesla_client: + vehicle_state endpoint (odometer v MÍLÍCH → km), collector:
  departure hook, lifespan: job 00:50

Aktivace po nasbírání dat: update asset_vehicle set target_soc_forecast_enabled=true.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dusan Vojacek
2026-06-12 09:06:10 +02:00
parent 002566ae5f
commit 4095f0f912
7 changed files with 386 additions and 11 deletions

View File

@@ -257,6 +257,14 @@ async def lifespan(app: FastAPI):
"scheduled_tuv_usage_stats site=%s failed", site["id"]
)
async def scheduled_ev_usage_stats() -> None:
async with app.state.pg_pool.acquire() as conn:
try:
n = await conn.fetchval("select ems.fn_update_ev_usage_stats(60)")
logger.info("ev_usage_stats updated %s rows", n)
except Exception:
logger.exception("scheduled_ev_usage_stats failed")
async def scheduled_forecast_refresh() -> None:
async with app.state.pg_pool.acquire() as conn:
for site in await _active_site_rows(conn):
@@ -423,6 +431,14 @@ async def lifespan(app: FastAPI):
id="tuv_usage_stats",
replace_existing=True,
)
scheduler.add_job(
scheduled_ev_usage_stats,
"cron",
hour=0,
minute=50,
id="ev_usage_stats",
replace_existing=True,
)
scheduler.add_job(
scheduled_ote_import,
"cron",

View File

@@ -55,6 +55,49 @@ async def _on_ev_arrival(site_id: int, charger_code: str) -> None:
)
async def _on_ev_departure(site_id: int, charger_code: str) -> None:
"""Odjezd: zapsat pozorování (odometer+SoC) — auto právě jede, je vzhůru.
Pár odjezd→příjezd dává jízdu (km, kWh) pro ev_usage_stats. Spící/nečitelné
auto (408) = tiché přeskočení, jízda se dopočítá z příštích pozorování.
"""
if _BG_POOL is None:
return
try:
from app.db_json import fetch_json
from services.tesla_client import get_charge_state
async with _BG_POOL.acquire() as conn:
ctx = await fetch_json(
conn,
"select ems.fn_tesla_arrival_context($1::int, $2::text)",
site_id,
charger_code,
)
if not isinstance(ctx, dict) or ctx.get("api_type") != "tesla":
return
state = await get_charge_state(conn, ctx.get("vin"))
if state is None:
return
await conn.execute(
"select ems.fn_ev_vehicle_obs_insert($1::int, $2::int, 'departure', $3::numeric, $4::numeric, $5::text)",
site_id,
int(ctx["vehicle_id"]),
state.get("odometer_km"),
float(state["battery_level"]),
state.get("charging_state"),
)
logger.info(
"EV departure obs (site=%s, %s): soc=%s%%, odo=%s km",
site_id, charger_code,
state["battery_level"], state.get("odometer_km"),
)
except Exception:
logger.exception(
"EV departure obs failed (site=%s, %s)", site_id, charger_code
)
async def _patch_session_from_tesla(
site_id: int, charger_code: str, conn: asyncpg.Connection
) -> None:
@@ -83,6 +126,15 @@ async def _patch_session_from_tesla(
if state is None:
return
await conn.execute(
"select ems.fn_ev_vehicle_obs_insert($1::int, $2::int, 'arrival', $3::numeric, $4::numeric, $5::text)",
site_id,
int(ctx["vehicle_id"]),
state.get("odometer_km"),
float(state["battery_level"]),
state.get("charging_state"),
)
if not ctx.get("vin") and state.get("vin"):
await conn.execute(
"select ems.fn_vehicle_set_vin($1::int, $2::text)",
@@ -353,6 +405,7 @@ async def poll_ev_chargers(site_id: int, db: asyncpg.Connection) -> None:
asyncio.create_task(_on_ev_arrival(site_id, str(code)))
elif previous_status != "available" and current_status == "available":
logger.info("EV departure detected on charger %s", code)
asyncio.create_task(_on_ev_departure(site_id, str(code)))
async def poll_heat_pump(site_id: int, db: asyncpg.Connection) -> None:

View File

@@ -32,18 +32,27 @@ HTTP_TIMEOUT_S = 15.0
ACCESS_EXPIRY_MARGIN_S = 120
MILES_TO_KM = 1.609344
def parse_charge_state(vehicle_data: dict[str, Any]) -> dict[str, Any] | None:
"""Čistý parser odpovědi vehicle_data → {battery_level, charge_limit_soc, charging_state}."""
"""Parser vehicle_data → battery_level, charge_limit_soc, charging_state, odometer_km.
POZOR: Tesla API vrací odometer v MÍLÍCH → převod na km.
"""
resp = vehicle_data.get("response") or {}
cs = resp.get("charge_state") or {}
vs = resp.get("vehicle_state") or {}
level = cs.get("battery_level")
if level is None:
return None
odo_miles = vs.get("odometer")
return {
"vin": resp.get("vin"),
"battery_level": int(level),
"charge_limit_soc": int(cs.get("charge_limit_soc") or 0) or None,
"charging_state": cs.get("charging_state"),
"odometer_km": round(float(odo_miles) * MILES_TO_KM, 1) if odo_miles is not None else None,
}
@@ -135,7 +144,7 @@ async def get_charge_state(
r = await client.get(
f"{API_BASE}/api/1/vehicles/{chosen['id']}/vehicle_data",
params={"endpoints": "charge_state"},
params={"endpoints": "charge_state;vehicle_state"},
)
if r.status_code == 408:
logger.info("Tesla: vozidlo spí / nedostupné (408) — SoC nedoplněno")

View File

@@ -28,6 +28,20 @@ class ParseChargeStateTests(unittest.TestCase):
self.assertIsNone(parse_charge_state({"response": {"charge_state": {}}}))
self.assertIsNone(parse_charge_state({}))
def test_odometer_miles_to_km(self) -> None:
data = {
"response": {
"charge_state": {"battery_level": 60},
"vehicle_state": {"odometer": 12345.6}, # míle!
}
}
out = parse_charge_state(data)
self.assertAlmostEqual(out["odometer_km"], 19868.3, places=1)
def test_missing_odometer_is_none(self) -> None:
data = {"response": {"charge_state": {"battery_level": 60}}}
self.assertIsNone(parse_charge_state(data)["odometer_km"])
def test_zero_limit_normalized_to_none(self) -> None:
data = {"response": {"charge_state": {"battery_level": 10, "charge_limit_soc": 0}}}
self.assertIsNone(parse_charge_state(data)["charge_limit_soc"])