"""REST API – aktivní EV session a úprava deadline / target SoC.""" from __future__ import annotations from datetime import date, datetime from typing import Annotated, Any import asyncpg from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel, field_validator from app.db_json import record_to_dict from app.deps import get_pg_pool router = APIRouter(prefix="/sites/{site_id}/ev", tags=["ev"]) class EvSessionPatchBody(BaseModel): target_soc_pct: float | None = None target_deadline: datetime | None = None @field_validator("target_soc_pct") @classmethod def _soc_range(cls, v: float | None) -> float | None: if v is not None and not (10 <= v <= 100): raise ValueError("target_soc_pct must be between 10 and 100") return v class EvSessionPatchResponse(BaseModel): success: bool = True session_id: int @router.get("/sessions/active") async def get_active_ev_sessions( site_id: int, pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)], ) -> list[dict[str, Any]]: async with pool.acquire() as conn: site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") rows = await conn.fetch( """ SELECT es.id, es.charger_id, es.vehicle_id, es.session_start, es.energy_delivered_wh, es.target_soc_pct, es.target_deadline, av.make, av.model, av.battery_capacity_kwh, av.default_target_soc_pct, av.default_deadline_hour, ac.code AS charger_code, COALESCE( NULLIF(TRIM(CONCAT_WS(' ', ac.manufacturer, ac.model)), ''), ac.code ) AS charger_name FROM ems.ev_session es LEFT JOIN ems.asset_vehicle av ON av.id = es.vehicle_id JOIN ems.asset_ev_charger ac ON ac.id = es.charger_id WHERE es.site_id = $1 AND es.session_end IS NULL ORDER BY es.session_start DESC """, site_id, ) return [record_to_dict(r) for r in rows] @router.patch("/sessions/{session_id}", response_model=EvSessionPatchResponse) async def patch_ev_session( site_id: int, session_id: int, body: EvSessionPatchBody, pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)], ) -> EvSessionPatchResponse: async with pool.acquire() as conn: site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") row = await conn.fetchrow( """ UPDATE ems.ev_session SET target_soc_pct = $1, target_deadline = $2 WHERE id = $3 AND site_id = $4 RETURNING id """, body.target_soc_pct, body.target_deadline, session_id, site_id, ) if row is None: raise HTTPException(status_code=404, detail="Session not found") return EvSessionPatchResponse(success=True, session_id=int(row["id"])) class ArrivalHourItem(BaseModel): hour: int confidence_pct: int samples: int class ChargerTomorrowArrival(BaseModel): tomorrow: list[ArrivalHourItem] class EvArrivalPredictionResponse(BaseModel): insufficient_data: bool tomorrow_date: str chargers: dict[str, ChargerTomorrowArrival] @router.get("/arrival-prediction", response_model=EvArrivalPredictionResponse) async def get_ev_arrival_prediction( site_id: int, pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)], ) -> EvArrivalPredictionResponse: """Top hodiny příjezdu z ems.fn_ev_expected_arrival; při <5 session celkem insufficient_data.""" async with pool.acquire() as conn: site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") n_sessions = int( await conn.fetchval( "SELECT COUNT(*)::int FROM ems.ev_session WHERE site_id = $1", site_id, ) or 0 ) insufficient = n_sessions < 5 tomorrow = await conn.fetchval( """ SELECT ( CURRENT_TIMESTAMP AT TIME ZONE COALESCE( NULLIF(TRIM(timezone), ''), 'Europe/Prague' ) )::date + 1 FROM ems.site WHERE id = $1 """, site_id, ) if tomorrow is None: raise HTTPException(status_code=500, detail="Site date resolution failed") tomorrow_d: date = tomorrow chargers_rows = await conn.fetch( "SELECT id, code FROM ems.asset_ev_charger WHERE site_id = $1 ORDER BY id", site_id, ) chargers: dict[str, ChargerTomorrowArrival] = {} for ch in chargers_rows: code = str(ch["code"]) preds = await conn.fetch( "SELECT * FROM ems.fn_ev_expected_arrival($1, $2, $3::date)", site_id, ch["id"], tomorrow_d, ) chargers[code] = ChargerTomorrowArrival( tomorrow=[ ArrivalHourItem( hour=int(r["expected_hour"]), confidence_pct=int(r["confidence_pct"]), samples=int(r["sample_count"]), ) for r in preds ] ) return EvArrivalPredictionResponse( insufficient_data=insufficient, tomorrow_date=tomorrow_d.isoformat(), chargers=chargers, )