Files
ems/backend/app/routers/ev.py
Dusan Vojacek 9f4126946d second version
2026-04-03 14:23:16 +02:00

179 lines
5.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""REST API aktivní EV session a úprava deadline / target SoC."""
from __future__ import annotations
from datetime import date, datetime
from typing import Annotated, Any
import asyncpg
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, field_validator
from app.db_json import record_to_dict
from app.deps import get_pg_pool
# Router for the per-site EV endpoints: every route registered on it is served
# under /sites/{site_id}/ev and carries the "ev" tag.
router = APIRouter(prefix="/sites/{site_id}/ev", tags=["ev"])
class EvSessionPatchBody(BaseModel):
    """Request payload carrying the adjustable targets of an EV session.

    Both fields are optional and default to ``None``.
    """

    # Target state of charge in percent; must lie within [10, 100] when given.
    target_soc_pct: float | None = None
    # Target deadline for the session, or None.
    target_deadline: datetime | None = None

    @field_validator("target_soc_pct")
    @classmethod
    def _soc_range(cls, v: float | None) -> float | None:
        """Accept ``None`` or a value in the inclusive range 10..100.

        Raises:
            ValueError: when a non-null value falls outside that range.
        """
        if v is None:
            return v
        if 10 <= v <= 100:
            return v
        raise ValueError("target_soc_pct must be between 10 and 100")
class EvSessionPatchResponse(BaseModel):
    """Response returned after an EV session has been patched."""

    # Success flag; defaults to True.
    success: bool = True
    # Identifier of the session the update was applied to.
    session_id: int
@router.get("/sessions/active")
async def get_active_ev_sessions(
    site_id: int,
    pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> list[dict[str, Any]]:
    """List the sessions of a site that have not ended yet, newest first.

    Each entry combines the session columns with the joined vehicle columns
    (null when the session has no matching vehicle row, because of the LEFT
    JOIN) and the charger code plus a display name. The display name is
    "<manufacturer> <model>" and falls back to the charger code when that
    combination is blank.

    Raises:
        HTTPException: 404 when no site with ``site_id`` exists.
    """
    active_sessions_sql = """
        SELECT es.id, es.charger_id, es.vehicle_id,
        es.session_start, es.energy_delivered_wh,
        es.target_soc_pct, es.target_deadline,
        av.make, av.model, av.battery_capacity_kwh,
        av.default_target_soc_pct, av.default_deadline_hour,
        ac.code AS charger_code,
        COALESCE(
        NULLIF(TRIM(CONCAT_WS(' ', ac.manufacturer, ac.model)), ''),
        ac.code
        ) AS charger_name
        FROM ems.ev_session es
        LEFT JOIN ems.asset_vehicle av ON av.id = es.vehicle_id
        JOIN ems.asset_ev_charger ac ON ac.id = es.charger_id
        WHERE es.site_id = $1 AND es.session_end IS NULL
        ORDER BY es.session_start DESC
        """
    async with pool.acquire() as conn:
        site_exists = await conn.fetchval(
            "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id
        )
        if not site_exists:
            raise HTTPException(status_code=404, detail="Site not found")
        records = await conn.fetch(active_sessions_sql, site_id)
        return list(map(record_to_dict, records))
@router.patch("/sessions/{session_id}", response_model=EvSessionPatchResponse)
async def patch_ev_session(
    site_id: int,
    session_id: int,
    body: EvSessionPatchBody,
    pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> EvSessionPatchResponse:
    """Partially update the target SoC and/or target deadline of one session.

    Only fields actually present in the request body are written. A field
    that is omitted keeps its stored value; a field sent explicitly as
    ``null`` is cleared. Previously both columns were always overwritten, so
    a request carrying only one field silently reset the other one to NULL.

    Returns:
        EvSessionPatchResponse with the id of the matched session.

    Raises:
        HTTPException: 404 when the site does not exist, or when no session
            with ``session_id`` belongs to that site.
    """
    # model_fields_set tells "omitted" apart from "explicitly null"; the
    # attribute values alone cannot, since both cases read as None.
    sent_fields = body.model_fields_set
    async with pool.acquire() as conn:
        site_ok = await conn.fetchval(
            "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id
        )
        if not site_ok:
            raise HTTPException(status_code=404, detail="Site not found")
        # Each column is guarded by a boolean flag so a single static,
        # fully parameterized statement covers every combination of fields.
        row = await conn.fetchrow(
            """
            UPDATE ems.ev_session
            SET target_soc_pct = CASE WHEN $1::boolean THEN $2 ELSE target_soc_pct END,
                target_deadline = CASE WHEN $3::boolean THEN $4 ELSE target_deadline END
            WHERE id = $5 AND site_id = $6
            RETURNING id
            """,
            "target_soc_pct" in sent_fields,
            body.target_soc_pct,
            "target_deadline" in sent_fields,
            body.target_deadline,
            session_id,
            site_id,
        )
        if row is None:
            raise HTTPException(status_code=404, detail="Session not found")
        return EvSessionPatchResponse(success=True, session_id=int(row["id"]))
class ArrivalHourItem(BaseModel):
    """One predicted arrival hour for a charger."""

    # Predicted arrival hour (filled from the ``expected_hour`` column).
    hour: int
    # Confidence of the prediction in percent.
    confidence_pct: int
    # Sample count behind the prediction (filled from ``sample_count``).
    samples: int
class ChargerTomorrowArrival(BaseModel):
    """Arrival predictions of a single charger for the following day."""

    # Predicted arrival hours for tomorrow; may be empty.
    tomorrow: list[ArrivalHourItem]
class EvArrivalPredictionResponse(BaseModel):
    """Response of the arrival-prediction endpoint."""

    # True when the site has fewer than 5 sessions recorded in total.
    insufficient_data: bool
    # ISO-formatted date the predictions refer to.
    tomorrow_date: str
    # Predictions per charger, keyed by charger code.
    chargers: dict[str, ChargerTomorrowArrival]
@router.get("/arrival-prediction", response_model=EvArrivalPredictionResponse)
async def get_ev_arrival_prediction(
    site_id: int,
    pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> EvArrivalPredictionResponse:
    """Return the top arrival hours from ems.fn_ev_expected_arrival per charger.

    "Tomorrow" is resolved in the site's configured time zone, falling back
    to Europe/Prague when the zone is blank. ``insufficient_data`` is set when
    the site has fewer than 5 sessions in total; predictions are still
    returned in that case.

    Raises:
        HTTPException: 404 when the site does not exist, 500 when the site's
            date could not be resolved.
    """
    async with pool.acquire() as conn:
        site_exists = await conn.fetchval(
            "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id
        )
        if not site_exists:
            raise HTTPException(status_code=404, detail="Site not found")

        raw_count = await conn.fetchval(
            "SELECT COUNT(*)::int FROM ems.ev_session WHERE site_id = $1",
            site_id,
        )
        total_sessions = int(raw_count or 0)

        resolved = await conn.fetchval(
            """
            SELECT (
            CURRENT_TIMESTAMP AT TIME ZONE COALESCE(
            NULLIF(TRIM(timezone), ''),
            'Europe/Prague'
            )
            )::date + 1
            FROM ems.site
            WHERE id = $1
            """,
            site_id,
        )
        if resolved is None:
            raise HTTPException(status_code=500, detail="Site date resolution failed")
        tomorrow_d: date = resolved

        charger_records = await conn.fetch(
            "SELECT id, code FROM ems.asset_ev_charger WHERE site_id = $1 ORDER BY id",
            site_id,
        )

        # One function call per charger, in charger-id order.
        by_code: dict[str, ChargerTomorrowArrival] = {}
        for charger in charger_records:
            charger_code = str(charger["code"])
            predictions = await conn.fetch(
                "SELECT * FROM ems.fn_ev_expected_arrival($1, $2, $3::date)",
                site_id,
                charger["id"],
                tomorrow_d,
            )
            hour_items = [
                ArrivalHourItem(
                    hour=int(pred["expected_hour"]),
                    confidence_pct=int(pred["confidence_pct"]),
                    samples=int(pred["sample_count"]),
                )
                for pred in predictions
            ]
            by_code[charger_code] = ChargerTomorrowArrival(tomorrow=hour_items)

        return EvArrivalPredictionResponse(
            insufficient_data=total_sessions < 5,
            tomorrow_date=tomorrow_d.isoformat(),
            chargers=by_code,
        )