"""REST API – aktivní EV session a úprava deadline / target SoC.""" from __future__ import annotations import json from datetime import date, datetime from typing import Annotated, Any import asyncpg from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel, field_validator from app.db_json import fetch_json from app.deps import get_pg_pool router = APIRouter(prefix="/sites/{site_id}/ev", tags=["ev"]) class EvSessionPatchBody(BaseModel): target_soc_pct: float | None = None target_deadline: datetime | None = None @field_validator("target_soc_pct") @classmethod def _soc_range(cls, v: float | None) -> float | None: if v is not None and not (10 <= v <= 100): raise ValueError("target_soc_pct must be between 10 and 100") return v class EvSessionPatchResponse(BaseModel): success: bool = True session_id: int @router.get("/sessions/active") async def get_active_ev_sessions( site_id: int, pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)], ) -> list[dict[str, Any]]: async with pool.acquire() as conn: site_ok = await conn.fetchval( "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id ) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") rows = await fetch_json( conn, "select ems.fn_ev_sessions_active($1::int)", site_id, ) if not isinstance(rows, list): rows = json.loads(rows) if isinstance(rows, str) else [] return [r for r in rows if isinstance(r, dict)] @router.patch("/sessions/{session_id}", response_model=EvSessionPatchResponse) async def patch_ev_session( site_id: int, session_id: int, body: EvSessionPatchBody, pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)], ) -> EvSessionPatchResponse: async with pool.acquire() as conn: site_ok = await conn.fetchval( "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id ) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") patch = body.model_dump(exclude_unset=True) raw = await fetch_json( conn, "select ems.fn_ev_session_apply_patch($1::int, $2::int, $3::jsonb)", site_id, session_id, json.dumps(patch), ) if not isinstance(raw, dict): raw = json.loads(raw) if not raw.get("success"): raise HTTPException(status_code=404, detail="Session not found") return EvSessionPatchResponse(success=True, session_id=int(raw["session_id"])) class ArrivalHourItem(BaseModel): hour: int confidence_pct: int samples: int class ChargerTomorrowArrival(BaseModel): tomorrow: list[ArrivalHourItem] class EvArrivalPredictionResponse(BaseModel): insufficient_data: bool tomorrow_date: str chargers: dict[str, ChargerTomorrowArrival] @router.get("/arrival-prediction", response_model=EvArrivalPredictionResponse) async def get_ev_arrival_prediction( site_id: int, pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)], ) -> EvArrivalPredictionResponse: async with pool.acquire() as conn: raw = await fetch_json( conn, "select ems.fn_ev_arrival_prediction_bundle($1::int)", site_id, ) if not isinstance(raw, dict): raw = json.loads(raw) if raw.get("error") == "site_not_found": raise HTTPException(status_code=404, detail="Site not found") chargers: dict[str, ChargerTomorrowArrival] = {} ch_raw = raw.get("chargers") or {} if isinstance(ch_raw, dict): for code, v in ch_raw.items(): if not isinstance(v, dict): continue tlist = v.get("tomorrow") or [] items: list[ArrivalHourItem] = [] if isinstance(tlist, list): for it in tlist: if not isinstance(it, dict): continue items.append( ArrivalHourItem( hour=int(it.get("hour") or 0), confidence_pct=int(it.get("confidence_pct") or 0), samples=int(it.get("samples") or 0), ) ) chargers[str(code)] = ChargerTomorrowArrival(tomorrow=items) td = raw.get("tomorrow_date") if isinstance(td, date): td_s = td.isoformat() elif isinstance(td, datetime): td_s = td.date().isoformat() else: td_s = str(td or "") return EvArrivalPredictionResponse( insufficient_data=bool(raw.get("insufficient_data")), tomorrow_date=td_s, chargers=chargers, )