EV spotřební forecast: týdenní rytmus vozidla → target SoC a deadline session
All checks were successful
CI and deploy / migration-check (push) Successful in 19s
CI and deploy / deploy (push) Successful in 56s

Myšlenka uživatele: pondělní služebka ~150 km (~35 kWh) chce skoro plnou,
konec týdne stačí míň, víkend = levné sloty na přípravu pondělka.

- V089: ev_vehicle_obs (odometer+SoC při příjezdu/ODJEZDU — auto v obou
  okamžicích vzhůru, žádné buzení navíc), ev_trip (km z odometru, kWh z ΔSoC;
  nabíjení cestou → charged_away flag), ev_usage_stats per (vozidlo, DOW);
  asset_vehicle: target_soc_forecast_enabled (default false), min_target_soc_pct
- R__096: fn_ev_build_trips (párování), fn_update_ev_usage_stats (job 00:50),
  fn_ev_next_departure (příští typický odjezd, >=4 vzorky, >=3 km),
  fn_ev_required_soc (P80 spotřeby dne + 10 p.b., clamp [min_target, 100])
- R__016: session při příjezdu bere forecast target+deadline (za per-vozidlo
  flagem, fallback defaulty, ruční patch vždy vyhrává) → víkendová session
  s pondělním deadline = v2 solver přirozeně nabije v levných slotech
- tesla_client: + vehicle_state endpoint (odometer v MÍLÍCH → km), collector:
  departure hook, lifespan: job 00:50

Aktivace po nasbírání dat: update asset_vehicle set target_soc_forecast_enabled=true.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dusan Vojacek
2026-06-12 09:06:10 +02:00
parent 002566ae5f
commit 4095f0f912
7 changed files with 386 additions and 11 deletions

View File

@@ -257,6 +257,14 @@ async def lifespan(app: FastAPI):
"scheduled_tuv_usage_stats site=%s failed", site["id"] "scheduled_tuv_usage_stats site=%s failed", site["id"]
) )
async def scheduled_ev_usage_stats() -> None:
async with app.state.pg_pool.acquire() as conn:
try:
n = await conn.fetchval("select ems.fn_update_ev_usage_stats(60)")
logger.info("ev_usage_stats updated %s rows", n)
except Exception:
logger.exception("scheduled_ev_usage_stats failed")
async def scheduled_forecast_refresh() -> None: async def scheduled_forecast_refresh() -> None:
async with app.state.pg_pool.acquire() as conn: async with app.state.pg_pool.acquire() as conn:
for site in await _active_site_rows(conn): for site in await _active_site_rows(conn):
@@ -423,6 +431,14 @@ async def lifespan(app: FastAPI):
id="tuv_usage_stats", id="tuv_usage_stats",
replace_existing=True, replace_existing=True,
) )
scheduler.add_job(
scheduled_ev_usage_stats,
"cron",
hour=0,
minute=50,
id="ev_usage_stats",
replace_existing=True,
)
scheduler.add_job( scheduler.add_job(
scheduled_ote_import, scheduled_ote_import,
"cron", "cron",

View File

@@ -55,6 +55,49 @@ async def _on_ev_arrival(site_id: int, charger_code: str) -> None:
) )
async def _on_ev_departure(site_id: int, charger_code: str) -> None:
"""Odjezd: zapsat pozorování (odometer+SoC) — auto právě jede, je vzhůru.
Pár odjezd→příjezd dává jízdu (km, kWh) pro ev_usage_stats. Spící/nečitelné
auto (408) = tiché přeskočení, jízda se dopočítá z příštích pozorování.
"""
if _BG_POOL is None:
return
try:
from app.db_json import fetch_json
from services.tesla_client import get_charge_state
async with _BG_POOL.acquire() as conn:
ctx = await fetch_json(
conn,
"select ems.fn_tesla_arrival_context($1::int, $2::text)",
site_id,
charger_code,
)
if not isinstance(ctx, dict) or ctx.get("api_type") != "tesla":
return
state = await get_charge_state(conn, ctx.get("vin"))
if state is None:
return
await conn.execute(
"select ems.fn_ev_vehicle_obs_insert($1::int, $2::int, 'departure', $3::numeric, $4::numeric, $5::text)",
site_id,
int(ctx["vehicle_id"]),
state.get("odometer_km"),
float(state["battery_level"]),
state.get("charging_state"),
)
logger.info(
"EV departure obs (site=%s, %s): soc=%s%%, odo=%s km",
site_id, charger_code,
state["battery_level"], state.get("odometer_km"),
)
except Exception:
logger.exception(
"EV departure obs failed (site=%s, %s)", site_id, charger_code
)
async def _patch_session_from_tesla( async def _patch_session_from_tesla(
site_id: int, charger_code: str, conn: asyncpg.Connection site_id: int, charger_code: str, conn: asyncpg.Connection
) -> None: ) -> None:
@@ -83,6 +126,15 @@ async def _patch_session_from_tesla(
if state is None: if state is None:
return return
await conn.execute(
"select ems.fn_ev_vehicle_obs_insert($1::int, $2::int, 'arrival', $3::numeric, $4::numeric, $5::text)",
site_id,
int(ctx["vehicle_id"]),
state.get("odometer_km"),
float(state["battery_level"]),
state.get("charging_state"),
)
if not ctx.get("vin") and state.get("vin"): if not ctx.get("vin") and state.get("vin"):
await conn.execute( await conn.execute(
"select ems.fn_vehicle_set_vin($1::int, $2::text)", "select ems.fn_vehicle_set_vin($1::int, $2::text)",
@@ -353,6 +405,7 @@ async def poll_ev_chargers(site_id: int, db: asyncpg.Connection) -> None:
asyncio.create_task(_on_ev_arrival(site_id, str(code))) asyncio.create_task(_on_ev_arrival(site_id, str(code)))
elif previous_status != "available" and current_status == "available": elif previous_status != "available" and current_status == "available":
logger.info("EV departure detected on charger %s", code) logger.info("EV departure detected on charger %s", code)
asyncio.create_task(_on_ev_departure(site_id, str(code)))
async def poll_heat_pump(site_id: int, db: asyncpg.Connection) -> None: async def poll_heat_pump(site_id: int, db: asyncpg.Connection) -> None:

View File

@@ -32,18 +32,27 @@ HTTP_TIMEOUT_S = 15.0
ACCESS_EXPIRY_MARGIN_S = 120 ACCESS_EXPIRY_MARGIN_S = 120
MILES_TO_KM = 1.609344
def parse_charge_state(vehicle_data: dict[str, Any]) -> dict[str, Any] | None: def parse_charge_state(vehicle_data: dict[str, Any]) -> dict[str, Any] | None:
"""Čistý parser odpovědi vehicle_data → {battery_level, charge_limit_soc, charging_state}.""" """Parser vehicle_data → battery_level, charge_limit_soc, charging_state, odometer_km.
POZOR: Tesla API vrací odometer v MÍLÍCH → převod na km.
"""
resp = vehicle_data.get("response") or {} resp = vehicle_data.get("response") or {}
cs = resp.get("charge_state") or {} cs = resp.get("charge_state") or {}
vs = resp.get("vehicle_state") or {}
level = cs.get("battery_level") level = cs.get("battery_level")
if level is None: if level is None:
return None return None
odo_miles = vs.get("odometer")
return { return {
"vin": resp.get("vin"), "vin": resp.get("vin"),
"battery_level": int(level), "battery_level": int(level),
"charge_limit_soc": int(cs.get("charge_limit_soc") or 0) or None, "charge_limit_soc": int(cs.get("charge_limit_soc") or 0) or None,
"charging_state": cs.get("charging_state"), "charging_state": cs.get("charging_state"),
"odometer_km": round(float(odo_miles) * MILES_TO_KM, 1) if odo_miles is not None else None,
} }
@@ -135,7 +144,7 @@ async def get_charge_state(
r = await client.get( r = await client.get(
f"{API_BASE}/api/1/vehicles/{chosen['id']}/vehicle_data", f"{API_BASE}/api/1/vehicles/{chosen['id']}/vehicle_data",
params={"endpoints": "charge_state"}, params={"endpoints": "charge_state;vehicle_state"},
) )
if r.status_code == 408: if r.status_code == 408:
logger.info("Tesla: vozidlo spí / nedostupné (408) — SoC nedoplněno") logger.info("Tesla: vozidlo spí / nedostupné (408) — SoC nedoplněno")

View File

@@ -28,6 +28,20 @@ class ParseChargeStateTests(unittest.TestCase):
self.assertIsNone(parse_charge_state({"response": {"charge_state": {}}})) self.assertIsNone(parse_charge_state({"response": {"charge_state": {}}}))
self.assertIsNone(parse_charge_state({})) self.assertIsNone(parse_charge_state({}))
def test_odometer_miles_to_km(self) -> None:
data = {
"response": {
"charge_state": {"battery_level": 60},
"vehicle_state": {"odometer": 12345.6}, # míle!
}
}
out = parse_charge_state(data)
self.assertAlmostEqual(out["odometer_km"], 19868.3, places=1)
def test_missing_odometer_is_none(self) -> None:
data = {"response": {"charge_state": {"battery_level": 60}}}
self.assertIsNone(parse_charge_state(data)["odometer_km"])
def test_zero_limit_normalized_to_none(self) -> None: def test_zero_limit_normalized_to_none(self) -> None:
data = {"response": {"charge_state": {"battery_level": 10, "charge_limit_soc": 0}}} data = {"response": {"charge_state": {"battery_level": 10, "charge_limit_soc": 0}}}
self.assertIsNone(parse_charge_state(data)["charge_limit_soc"]) self.assertIsNone(parse_charge_state(data)["charge_limit_soc"])

View File

@@ -0,0 +1,63 @@
-- EV spotřební forecast: pozorování (odometer+SoC při příjezdu/odjezdu, auto je
-- vzhůru — žádné buzení navíc), jízdy, statistiky per den v týdnu. Cíl: target
-- SoC a deadline session z reálného týdenního rytmu (pondělí služebka ~150 km
-- → skoro plná; konec týdne míň; víkend = levné sloty na přípravu pondělka).
-- Zapnutí per vozidlo: target_soc_forecast_enabled (default false = sbírá se,
-- session jedou na defaultech).
alter table ems.asset_vehicle
add column if not exists target_soc_forecast_enabled boolean not null default false,
add column if not exists min_target_soc_pct numeric(5,2) not null default 30.0;
comment on column ems.asset_vehicle.target_soc_forecast_enabled is
'true = target SoC + deadline session z ev_usage_stats (fn_ev_required_soc); false = default_target_soc_pct/default_deadline_hour. Forecast vyžaduje >= 4 vzorky pro daný den v týdnu, jinak fallback.';
comment on column ems.asset_vehicle.min_target_soc_pct is
'Komfortní spodní mez forecast targetu (%). Forecast smí jít pod default_target_soc_pct (např. pátek), ne pod tuto mez.';
create table ems.ev_vehicle_obs (
id bigserial primary key,
site_id int not null references ems.site (id),
vehicle_id int not null references ems.asset_vehicle (id),
observed_at timestamptz not null default now(),
trigger text not null check (trigger in ('arrival', 'departure', 'manual')),
odometer_km numeric(10, 1),
soc_pct numeric(5, 2),
charging_state text
);
create index idx_ev_vehicle_obs_vehicle_time
on ems.ev_vehicle_obs (vehicle_id, observed_at desc);
comment on table ems.ev_vehicle_obs is
'Pozorování vozidla z API výrobce (Tesla Fleet) při příjezdu/odjezdu — auto je v těch okamžicích vzhůru, čtení nebudí. Zdroj pro ev_trip.';
create table ems.ev_trip (
id serial primary key,
vehicle_id int not null references ems.asset_vehicle (id),
departure_obs_id bigint not null references ems.ev_vehicle_obs (id),
arrival_obs_id bigint not null references ems.ev_vehicle_obs (id),
departed_at timestamptz not null,
arrived_at timestamptz not null,
km numeric(8, 1),
kwh_est numeric(7, 2),
charged_away boolean not null default false,
constraint uq_ev_trip_departure unique (departure_obs_id)
);
comment on table ems.ev_trip is
'Jízda = pár odjezd→příjezd: km z odometru, kWh z ΔSoC × kapacita. charged_away = SoC po cestě vzrostlo (nabíjení mimo dům) — kWh nevypovídá, vyloučit ze statistik.';
create table ems.ev_usage_stats (
vehicle_id int not null references ems.asset_vehicle (id),
day_of_week int not null check (day_of_week between 0 and 6),
avg_day_kwh numeric(7, 2),
stddev_day_kwh numeric(7, 2),
avg_day_km numeric(8, 1),
avg_departure_hour numeric(4, 2),
sample_count int not null default 0,
last_updated timestamptz not null default now(),
primary key (vehicle_id, day_of_week)
);
comment on table ems.ev_usage_stats is
'Týdenní rytmus vozidla per den v týdnu (0=neděle, Europe/Prague): průměrná denní spotřeba jízdou (kWh), km, typická hodina prvního odjezdu. Plní fn_update_ev_usage_stats (job 00:50). Vstup fn_ev_required_soc / fn_ev_expected_departure.';

View File

@@ -45,17 +45,23 @@ begin
ac.id, ac.id,
av.id, av.id,
now(), now(),
av.default_target_soc_pct, -- forecast z týdenního rytmu (ev_usage_stats), fallback defaulty;
case -- ruční přepis přes fn_ev_session_apply_patch vždy vyhrává.
when av.default_deadline_hour is not null then coalesce(fc.required_soc, av.default_target_soc_pct),
( coalesce(
(timezone('Europe/Prague', now()))::date + interval '1 day' fc.expected_departure,
+ make_interval(hours => av.default_deadline_hour) case
)::timestamp at time zone 'Europe/Prague' when av.default_deadline_hour is not null then
end (
(timezone('Europe/Prague', now()))::date + interval '1 day'
+ make_interval(hours => av.default_deadline_hour)
)::timestamp at time zone 'Europe/Prague'
end
)
from ems.asset_ev_charger ac from ems.asset_ev_charger ac
left join lateral ( left join lateral (
select v.id, v.default_target_soc_pct, v.default_deadline_hour select v.id, v.default_target_soc_pct, v.default_deadline_hour,
v.target_soc_forecast_enabled
from ems.asset_vehicle v from ems.asset_vehicle v
where v.default_charger_id = ac.id where v.default_charger_id = ac.id
and v.site_id = ac.site_id and v.site_id = ac.site_id
@@ -63,6 +69,15 @@ begin
order by v.id order by v.id
limit 1 limit 1
) av on true ) av on true
left join lateral (
select dep.expected_departure,
ems.fn_ev_required_soc(av.id, dep.expected_departure) as required_soc
from (
select ems.fn_ev_next_departure(av.id, now()) as expected_departure
) dep
where av.target_soc_forecast_enabled
and dep.expected_departure is not null
) fc on true
where ac.id = p_charger_id where ac.id = p_charger_id
and ac.site_id = p_site_id and ac.site_id = p_site_id
on conflict (charger_id) where session_end is null do nothing; on conflict (charger_id) where session_end is null do nothing;

View File

@@ -0,0 +1,205 @@
-- EV spotřební forecast (SQL-first): vložení pozorování, párování jízd,
-- statistiky per DOW, odvození target SoC a očekávaného odjezdu.
create or replace function ems.fn_ev_vehicle_obs_insert(
p_site_id int,
p_vehicle_id int,
p_trigger text,
p_odometer_km numeric,
p_soc_pct numeric,
p_charging_state text
)
returns bigint
language sql
as $fn$
insert into ems.ev_vehicle_obs (
site_id, vehicle_id, trigger, odometer_km, soc_pct, charging_state
)
values (
p_site_id, p_vehicle_id, p_trigger, p_odometer_km, p_soc_pct, p_charging_state
)
returning id;
$fn$;
-- Spáruje odjezdy s nejbližším následujícím příjezdem téhož vozidla.
create or replace function ems.fn_ev_build_trips()
returns int
language plpgsql
as $fn$
declare
v_count int := 0;
r record;
v_arr record;
begin
for r in
select o.*
from ems.ev_vehicle_obs o
where o.trigger = 'departure'
and o.odometer_km is not null
and not exists (
select 1 from ems.ev_trip t where t.departure_obs_id = o.id
)
order by o.vehicle_id, o.observed_at
loop
select a.* into v_arr
from ems.ev_vehicle_obs a
where a.vehicle_id = r.vehicle_id
and a.trigger = 'arrival'
and a.observed_at > r.observed_at
and a.odometer_km is not null
order by a.observed_at
limit 1;
if v_arr.id is null then
continue; -- jízda ještě neskončila
end if;
insert into ems.ev_trip (
vehicle_id, departure_obs_id, arrival_obs_id,
departed_at, arrived_at, km, kwh_est, charged_away
)
select
r.vehicle_id, r.id, v_arr.id,
r.observed_at, v_arr.observed_at,
greatest(0, v_arr.odometer_km - r.odometer_km),
case
when r.soc_pct is null or v_arr.soc_pct is null then null
when v_arr.soc_pct > r.soc_pct then null -- nabíjeno cestou
else round(((r.soc_pct - v_arr.soc_pct) / 100.0
* av.battery_capacity_kwh)::numeric, 2)
end,
coalesce(v_arr.soc_pct > r.soc_pct, false)
from ems.asset_vehicle av
where av.id = r.vehicle_id
on conflict (departure_obs_id) do nothing;
v_count := v_count + 1;
end loop;
return v_count;
end;
$fn$;
-- Přepočet týdenního rytmu z jízd za lookback okno (plný přepočet, ne EMA —
-- rebuild-friendly; jízdy s nabíjením cestou se počítají do km, ne do kWh).
create or replace function ems.fn_update_ev_usage_stats(
p_lookback_days int default 60
)
returns int
language plpgsql
as $fn$
declare
v_built int;
v_count int;
begin
v_built := ems.fn_ev_build_trips();
with daily as (
select
t.vehicle_id,
(t.departed_at at time zone 'Europe/Prague')::date as d,
extract(dow from t.departed_at at time zone 'Europe/Prague')::int as dow,
sum(t.kwh_est) filter (where not t.charged_away) as day_kwh,
sum(t.km) as day_km,
min(extract(hour from t.departed_at at time zone 'Europe/Prague')
+ extract(minute from t.departed_at at time zone 'Europe/Prague') / 60.0
) as first_departure_hour
from ems.ev_trip t
where t.departed_at >= now() - make_interval(days => p_lookback_days)
and t.km >= 1.0 -- přepojení kabelu bez jízdy nepočítat
group by 1, 2, 3
),
agg as (
select
vehicle_id, dow,
avg(day_kwh) as avg_kwh,
stddev(day_kwh) as sd_kwh,
avg(day_km) as avg_km,
avg(first_departure_hour) as avg_dep,
count(*) as samples
from daily
group by 1, 2
)
insert into ems.ev_usage_stats (
vehicle_id, day_of_week, avg_day_kwh, stddev_day_kwh,
avg_day_km, avg_departure_hour, sample_count, last_updated
)
select
vehicle_id, dow, round(avg_kwh::numeric, 2), round(coalesce(sd_kwh, 0)::numeric, 2),
round(avg_km::numeric, 1), round(avg_dep::numeric, 2), samples, now()
from agg
on conflict (vehicle_id, day_of_week) do update set
avg_day_kwh = excluded.avg_day_kwh,
stddev_day_kwh = excluded.stddev_day_kwh,
avg_day_km = excluded.avg_day_km,
avg_departure_hour = excluded.avg_departure_hour,
sample_count = excluded.sample_count,
last_updated = now();
get diagnostics v_count = row_count;
return v_count;
end;
$fn$;
comment on function ems.fn_update_ev_usage_stats is
'Spáruje nové jízdy (fn_ev_build_trips) a přepočte ev_usage_stats za lookback okno. Job 00:50.';
-- Příští očekávaný den s jízdou (>= 3 km průměru a >= 4 vzorky) v horizontu 7 dní.
create or replace function ems.fn_ev_next_departure(
p_vehicle_id int,
p_from timestamptz default now()
)
returns timestamptz
language sql
stable
as $fn$
select min(dep)
from (
select (
((p_from at time zone 'Europe/Prague')::date + offs)::timestamp
+ make_interval(mins => (round(s.avg_departure_hour * 60))::int)
) at time zone 'Europe/Prague' as dep
from generate_series(0, 7) offs
join ems.ev_usage_stats s
on s.vehicle_id = p_vehicle_id
and s.day_of_week = extract(
dow from (p_from at time zone 'Europe/Prague')::date + offs
)::int
where s.sample_count >= 4
and s.avg_day_km >= 3.0
) cand
where cand.dep > p_from + interval '30 minutes';
$fn$;
comment on function ems.fn_ev_next_departure is
'Nejbližší typický odjezd vozidla dle ev_usage_stats (DOW s >=4 vzorky a >=3 km). NULL = málo dat → volající použije default_deadline_hour.';
-- Target SoC pro odjezd v daný den: P80 denní spotřeby + rezerva 10 p.b.,
-- clamp do [min_target_soc_pct, 100]. NULL = málo dat.
create or replace function ems.fn_ev_required_soc(
p_vehicle_id int,
p_departure_at timestamptz
)
returns numeric
language sql
stable
as $fn$
select least(100.0, greatest(
av.min_target_soc_pct,
ceil(
(s.avg_day_kwh + 0.8416 * s.stddev_day_kwh) -- ~P80
/ nullif(av.battery_capacity_kwh, 0) * 100.0
) + 10.0
))
from ems.asset_vehicle av
join ems.ev_usage_stats s
on s.vehicle_id = av.id
and s.day_of_week = extract(
dow from p_departure_at at time zone 'Europe/Prague'
)::int
where av.id = p_vehicle_id
and s.sample_count >= 4
and s.avg_day_kwh is not null;
$fn$;
comment on function ems.fn_ev_required_soc is
'Cílové SoC (%) pro odjezd: P80 spotřeby dne v týdnu + 10 p.b. rezerva, mez [min_target_soc_pct, 100]. NULL = málo dat → default_target_soc_pct.';