"""REST API – lokality: ceny OTE, forecast, Modbus journal/verify.""" from __future__ import annotations import json import logging from datetime import date, datetime, timedelta, timezone from typing import Annotated, Any import asyncpg from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel from app.db_json import fetch_json, record_to_dict from app.deps import get_pg_pool from app.refresh_negative_prices import refresh_negative_price_predictions from services.control_exporter import read_deye_registers_live, verify_modbus_commands from services.forecast_service import fetch_pv_forecast from services.price_importer import import_ote_prices logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/v1/sites", tags=["sites"]) def _parse_ymd(s: str) -> date: try: return date.fromisoformat(s) except ValueError: raise HTTPException( status_code=400, detail="Invalid date, expected YYYY-MM-DD" ) from None @router.get("") async def list_sites( db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], ) -> list[dict[str, Any]]: async with db.acquire() as conn: rows = await conn.fetch( """ select id, code, name, timezone, latitude, longitude, active, notes, created_at from ems.vw_site_directory order by id """ ) return [record_to_dict(r) for r in rows] @router.get("/{site_id}/prices") async def get_site_prices( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], date_str: str | None = Query( None, alias="date", description="YYYY-MM-DD, default today" ), ) -> list[dict[str, Any]]: if date_str is None: date_str = date.today().isoformat() d = _parse_ymd(date_str) async with db.acquire() as conn: site_ok = await conn.fetchval( "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id ) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") rows = await fetch_json( conn, "select ems.fn_site_effective_prices_day_prague($1::int, $2::date)", site_id, d, ) if not isinstance(rows, list): rows = json.loads(rows) if isinstance(rows, str) else [] return [r for r in rows if isinstance(r, dict)] @router.get("/{site_id}/prices/slots") async def get_site_prices_slots_range( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], from_ts: datetime = Query( ..., alias="from", description="Začátek okna [from, to), typicky UTC zaokrouhlené na 15 min", ), to_ts: datetime = Query( ..., alias="to", description="Konec polouzavřeného intervalu (max. 14 dní za from)", ), ) -> dict[str, list[dict[str, Any]]]: if to_ts <= from_ts: raise HTTPException(status_code=422, detail="'to' must be after 'from'") if to_ts - from_ts > timedelta(days=14): raise HTTPException( status_code=422, detail="Span between 'from' and 'to' must be at most 14 days", ) async with db.acquire() as conn: site_ok = await conn.fetchval( "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id ) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") raw = await fetch_json( conn, "select ems.fn_site_effective_prices_slots_range($1::int, $2::timestamptz, $3::timestamptz)", site_id, from_ts, to_ts, ) rows = raw if isinstance(raw, list) else [] if not isinstance(rows, list): rows = [] return {"slots": [r for r in rows if isinstance(r, dict)]} class PricesImportResponse(BaseModel): slots_imported: int date: str first_price_czk_kwh: float class PricesLatestResponse(BaseModel): latest_date: str slots: int min_price: float max_price: float avg_price: float class ForecastRunResponse(BaseModel): intervals_saved: int pv_arrays: int class ModbusCommandVerifyItem(BaseModel): id: int asset_code: str register_name: str | None value_to_write: int value_verified: int | None status: str class ModbusVerifyResponse(BaseModel): checked: int verified: int mismatch: int commands: list[ModbusCommandVerifyItem] @router.post( "/{site_id}/prices/import", response_model=PricesImportResponse, summary="Import OTE cen (globální)", description=( "Zapíše do sdílené tabulky ems.market_interval_price (jedna sada dat pro všechny lokality). " "site_id v cestě slouží ke kontrole existence lokality (kompatibilita s UI); po importu se " "obnoví predikce záporných cen pro všechny aktivní lokality." ), ) async def post_import_site_prices( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], date_str: str | None = Query( None, alias="date", description="YYYY-MM-DD; výchozí = zítřek/dnes dle logiky OTE (Europe/Prague)", ), ) -> PricesImportResponse: target: date | None = _parse_ymd(date_str) if date_str is not None else None import_error: str | None = None async with db.acquire() as conn: site_ok = await conn.fetchval( "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id ) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") n, day, first_price, import_error = await import_ote_prices( conn, site_id=None, target_date=target ) if n >= 0: sites_raw = await fetch_json( conn, "select ems.fn_vw_site_directory_active()" ) sites_list = sites_raw if isinstance(sites_raw, list) else [] for site in sites_list: if isinstance(site, dict): await refresh_negative_price_predictions(conn, int(site["id"])) if n < 0: raise HTTPException( status_code=422, detail=f"OTE import selhal ({import_error or 'unknown'})", ) return PricesImportResponse( slots_imported=n, date=day, first_price_czk_kwh=first_price, ) class NegPricePredictionItem(BaseModel): predicted_date: str window_start_hour: int window_end_hour: int probability_pct: float expected_min_price: float | None reason: str class NegativePredictionsResponse(BaseModel): predictions: list[NegPricePredictionItem] insufficient_history: bool @router.get( "/{site_id}/prices/negative-predictions", response_model=NegativePredictionsResponse, ) async def get_site_negative_price_predictions( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], ) -> NegativePredictionsResponse: """Cache predikce záporných cen (per site) + informace, zda je dost historie OTE.""" async with db.acquire() as conn: site_ok = await conn.fetchval( "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id ) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") bundle = await fetch_json( conn, "select ems.fn_negative_price_predictions($1::int)", site_id, ) if not isinstance(bundle, dict): bundle = json.loads(bundle) rows = bundle.get("predictions") or [] if not isinstance(rows, list): rows = [] predictions: list[NegPricePredictionItem] = [] for r in rows: if not isinstance(r, dict): continue em = r.get("expected_min_price") pd = r.get("predicted_date") predictions.append( NegPricePredictionItem( predicted_date=pd.isoformat() if hasattr(pd, "isoformat") else str(pd), window_start_hour=int(r.get("window_start_hour") or 0), window_end_hour=int(r.get("window_end_hour") or 0), probability_pct=float(r.get("probability_pct") or 0), expected_min_price=float(em) if em is not None else None, reason=str(r.get("reason") or ""), ) ) return NegativePredictionsResponse( predictions=predictions, insufficient_history=bool(bundle.get("insufficient_history")), ) @router.get("/{site_id}/prices/latest", response_model=PricesLatestResponse) async def get_site_prices_latest( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], ) -> PricesLatestResponse: async with db.acquire() as conn: site_ok = await conn.fetchval( "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id ) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") row = await fetch_json(conn, "select ems.fn_latest_ote_day_stats()") if not isinstance(row, dict): row = json.loads(row) day = row.get("latest_date") if day is None: raise HTTPException(status_code=404, detail="Žádná tržní data v databázi") latest_date = day.isoformat() if hasattr(day, "isoformat") else str(day)[:10] return PricesLatestResponse( latest_date=latest_date, slots=int(row.get("slots") or 0), min_price=float(row.get("min_price") or 0.0), max_price=float(row.get("max_price") or 0.0), avg_price=float(row.get("avg_price") or 0.0), ) @router.get("/{site_id}/control/verify", response_model=ModbusVerifyResponse) async def get_verify_modbus_commands( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], minutes: int = Query(10, ge=1, le=1440, description="Jak daleko zpět hledat written příkazy"), ) -> ModbusVerifyResponse: """ Ruční ověření Modbus zápisů (written) z posledních N minut. Vhodné hned po manuálním exportu setpointů. """ async with db.acquire() as conn: site_ok = await conn.fetchval( "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id ) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") lookback = timedelta(minutes=minutes) id_json = await fetch_json( conn, "select ems.fn_modbus_written_command_ids($1::int, $2::interval)", site_id, lookback, ) if not isinstance(id_json, list): id_json = json.loads(id_json) if isinstance(id_json, str) else [] ids = [int(x) for x in id_json] checked = len(ids) if ids: await verify_modbus_commands(ids, conn, site_id) detail_json = ( await fetch_json( conn, "select ems.fn_modbus_commands_by_ids($1::int[])", ids, ) if ids else [] ) if ids and not isinstance(detail_json, list): detail_json = json.loads(detail_json) if isinstance(detail_json, str) else [] detail_rows = detail_json if ids else [] commands = [ ModbusCommandVerifyItem( id=int(r["id"]), asset_code=str(r.get("asset_code") or ""), register_name=r.get("register_name"), value_to_write=int(r["value_to_write"]), value_verified=int(r["value_verified"]) if r.get("value_verified") is not None else None, status=str(r.get("status") or ""), ) for r in detail_rows if isinstance(r, dict) ] verified = sum(1 for c in commands if c.status == "verified") mismatch = sum(1 for c in commands if c.status == "mismatch") return ModbusVerifyResponse( checked=checked, verified=verified, mismatch=mismatch, commands=commands, ) class DeyeRegistersLiveResponse(BaseModel): reg108_charge_a: int reg109_discharge_a: int reg141_energy_mode: int reg142_limit_control: int reg143_export_limit_w: int reg178_peak_shaving_switch: int reg178_control_board_special_1: int reg178_mi_export_cutoff_bits: int reg178_mi_export_cutoff_is_on: bool reg191_peak_shaving_w: int read_at: str @router.get( "/{site_id}/control/registers", response_model=DeyeRegistersLiveResponse, ) async def get_control_registers_live( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], ) -> DeyeRegistersLiveResponse: """Živé hodnoty registrů Deye 108/109/141/142/143/178/191 přes sdílený Modbus klient.""" async with db.acquire() as conn: site_ok = await conn.fetchval( "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id ) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") try: payload = await read_deye_registers_live(site_id, conn) except ValueError: raise HTTPException( status_code=404, detail="No controllable Modbus inverter for this site", ) from None except Exception as e: logger.warning("get_control_registers_live site=%s: %s", site_id, e) raise HTTPException( status_code=503, detail=f"Modbus read failed: {e}", ) from e return DeyeRegistersLiveResponse(**payload) class ModbusJournalCommandRow(BaseModel): id: int register: int register_name: str | None value_to_write: int value_written: int | None value_verified: int | None status: str attempt_count: int created_at: str class ModbusJournalListResponse(BaseModel): commands: list[ModbusJournalCommandRow] @router.get( "/{site_id}/control/journal", response_model=ModbusJournalListResponse, ) async def get_control_command_journal( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], limit: int = Query(50, ge=1, le=100), ) -> ModbusJournalListResponse: async with db.acquire() as conn: site_ok = await conn.fetchval( "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id ) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") rows = await fetch_json( conn, "select ems.fn_modbus_journal_list($1::int, $2::int)", site_id, limit, ) if not isinstance(rows, list): rows = json.loads(rows) if isinstance(rows, str) else [] cmds: list[ModbusJournalCommandRow] = [] for r in rows: d = r if isinstance(r, dict) else {} ca = d["created_at"] cmds.append( ModbusJournalCommandRow( id=int(d["id"]), register=int(d["register"]), register_name=d.get("register_name"), value_to_write=int(d["value_to_write"]), value_written=int(d["value_written"]) if d.get("value_written") is not None else None, value_verified=int(d["value_verified"]) if d.get("value_verified") is not None else None, status=str(d["status"]), attempt_count=int(d["attempt_count"]), created_at=ca if isinstance(ca, str) else str(ca), ) ) return ModbusJournalListResponse(commands=cmds) @router.post("/{site_id}/forecast/run", response_model=ForecastRunResponse) async def post_run_site_forecast( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], ) -> ForecastRunResponse: async with db.acquire() as conn: site_ok = await conn.fetchval( "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id ) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") try: intervals, pv_arrays = await fetch_pv_forecast(site_id, conn) except Exception as e: logger.error("Forecast failed: %s", e, exc_info=True) raise HTTPException(status_code=422, detail=str(e)) from e if intervals >= 0: await refresh_negative_price_predictions(conn, site_id) if intervals < 0: raise HTTPException( status_code=422, detail="Forecast se nepodařilo stáhnout nebo zpracovat", ) return ForecastRunResponse(intervals_saved=intervals, pv_arrays=pv_arrays) @router.get("/{site_id}/forecast/pv") async def get_site_forecast_pv( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], date_str: str | None = Query( None, alias="date", description="YYYY-MM-DD, default tomorrow" ), ) -> dict[str, list[dict[str, Any]]]: if date_str is None: date_str = (date.today() + timedelta(days=1)).isoformat() d = _parse_ymd(date_str) async with db.acquire() as conn: site_ok = await conn.fetchval( "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id ) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") split = await fetch_json( conn, "select ems.fn_forecast_pv_split($1::int, $2::date)", site_id, d, ) if not isinstance(split, dict): split = json.loads(split) if isinstance(split, str) else {} pv_a = split.get("pv_a") or [] pv_b = split.get("pv_b") or [] if not isinstance(pv_a, list): pv_a = [] if not isinstance(pv_b, list): pv_b = [] return {"pv_a": pv_a, "pv_b": pv_b} @router.get("/{site_id}/forecast/pv-slots") async def get_site_forecast_pv_slots_range( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], from_ts: datetime = Query( ..., alias="from", description="Začátek okna [from, to), typicky UTC zaokrouhlené na 15 min", ), to_ts: datetime = Query( ..., alias="to", description="Konec polouzavřeného intervalu (max. 60 dní za from)", ), ) -> dict[str, list[dict[str, Any]]]: if to_ts <= from_ts: raise HTTPException(status_code=422, detail="'to' must be after 'from'") if to_ts - from_ts > timedelta(days=60): raise HTTPException( status_code=422, detail="Span between 'from' and 'to' must be at most 60 days", ) async with db.acquire() as conn: site_ok = await conn.fetchval( "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id ) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") raw = await fetch_json( conn, "select ems.fn_forecast_pv_slots_range($1::int, $2::timestamptz, $3::timestamptz)", site_id, from_ts, to_ts, ) slots = raw if isinstance(raw, list) else [] if not isinstance(slots, list): slots = [] return {"slots": slots} @router.get("/{site_id}/forecast/pv-slots-corrected") async def get_site_forecast_pv_slots_range_corrected( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], from_ts: datetime = Query( ..., alias="from", description="Začátek okna [from, to), typicky UTC zaokrouhlené na 15 min", ), to_ts: datetime = Query( ..., alias="to", description="Konec polouzavřeného intervalu (max. 60 dní za from)", ), delta_from_ts: datetime | None = Query( None, alias="delta_from", description="Začátek okna historie pro výpočet delta profilu (default: now-60d)", ), delta_to_ts: datetime | None = Query( None, alias="delta_to", description="Konec okna historie pro výpočet delta profilu (default: now)", ), half_life_days: float = Query( 14, ge=1, le=90, description="Half-life vážení (dny) pro delta profil", ), threshold_w: int = Query( 150, ge=0, le=10_000, description="Ignorovat sloty s nízkou výrobou (W) při odhadu profilu", ), ) -> dict[str, list[dict[str, Any]]]: if to_ts <= from_ts: raise HTTPException(status_code=422, detail="'to' must be after 'from'") if to_ts - from_ts > timedelta(days=60): raise HTTPException( status_code=422, detail="Span between 'from' and 'to' must be at most 60 days", ) now = datetime.now(tz=timezone.utc) delta_to = delta_to_ts or now delta_from = delta_from_ts or (delta_to - timedelta(days=60)) async with db.acquire() as conn: site_ok = await conn.fetchval( "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id ) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") raw = await fetch_json( conn, """ select ems.fn_forecast_pv_slots_range_corrected( $1::int, $2::timestamptz, $3::timestamptz, $4::timestamptz, $5::timestamptz, $6::numeric, $7::int ) """, site_id, from_ts, to_ts, delta_from, delta_to, half_life_days, threshold_w, ) slots = raw if isinstance(raw, list) else [] if not isinstance(slots, list): slots = [] return {"slots": [s for s in slots if isinstance(s, dict)]} @router.get("/{site_id}/forecast/pv-delta-profile") async def get_site_forecast_pv_delta_profile( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], from_ts: datetime = Query( ..., alias="from", description="Začátek okna historie pro výpočet delty [from, to)", ), to_ts: datetime = Query( ..., alias="to", description="Konec okna (max. 120 dní za from; typicky now)", ), half_life_days: float = Query( 14, ge=1, le=90, description="Half-life vážení (dny) pro delta profil", ), threshold_w: int = Query( 150, ge=0, le=10_000, description="Ignorovat sloty s nízkou výrobou (W) při odhadu profilu", ), top_n_days: int | None = Query( None, ge=0, le=31, description="Top N kalendářních dní podle day_score (NULL = z kalibrace / výchozí funkce)", ), non_top_day_factor: float | None = Query( None, ge=0, le=1, description="Ztlumení vah mimo top N (NULL = z kalibrace / default)", ), day_weight_gamma: float | None = Query( None, ge=0.25, le=8, description="Exponent na day_weight (NULL = z kalibrace / default)", ), ) -> dict[str, Any]: """JSON z `ems.fn_pv_forecast_delta_profile` (`deltas`, `deltas_by_array`, cutoff z DB).""" if to_ts <= from_ts: raise HTTPException(status_code=422, detail="'to' must be after 'from'") if to_ts - from_ts > timedelta(days=120): raise HTTPException( status_code=422, detail="Span between 'from' and 'to' must be at most 120 days", ) async with db.acquire() as conn: site_ok = await conn.fetchval( "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id ) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") raw = await fetch_json( conn, """ select ems.fn_pv_forecast_delta_profile( $1::int, $2::timestamptz, $3::timestamptz, $4::numeric, $5::int, $6::int, $7::numeric, $8::numeric ) """, site_id, from_ts, to_ts, half_life_days, threshold_w, top_n_days, non_top_day_factor, day_weight_gamma, ) if not isinstance(raw, dict): return {} return raw @router.get("/{site_id}/timeseries/telemetry-15m") async def get_site_telemetry_15m_range( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], from_ts: datetime = Query(..., alias="from", description="Začátek okna [from, to)"), to_ts: datetime = Query(..., alias="to", description="Konec okna [from, to)"), ) -> dict[str, list[dict[str, Any]]]: if to_ts <= from_ts: raise HTTPException(status_code=422, detail="'to' must be after 'from'") if to_ts - from_ts > timedelta(days=60): raise HTTPException( status_code=422, detail="Span between 'from' and 'to' must be at most 60 days", ) async with db.acquire() as conn: site_ok = await conn.fetchval( "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id ) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") rows = await conn.fetch( """ select slot_start, site_id, avg_pv_w, avg_load_w, avg_grid_w, avg_battery_w, last_soc_pct, sample_count from ems.telemetry_inverter_15m where site_id = $1 and slot_start >= $2::timestamptz and slot_start < $3::timestamptz order by slot_start asc """, site_id, from_ts, to_ts, ) return {"slots": [record_to_dict(r) for r in rows]} @router.get("/{site_id}/forecast/load-baseline-slots") async def get_site_load_baseline_slots_range( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pg_pool)], from_ts: datetime = Query(..., alias="from", description="Začátek okna [from, to)"), to_ts: datetime = Query(..., alias="to", description="Konec okna [from, to)"), ) -> dict[str, list[dict[str, Any]]]: if to_ts <= from_ts: raise HTTPException(status_code=422, detail="'to' must be after 'from'") if to_ts - from_ts > timedelta(days=60): raise HTTPException( status_code=422, detail="Span between 'from' and 'to' must be at most 60 days", ) async with db.acquire() as conn: site_ok = await conn.fetchval( "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id ) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") rows = await conn.fetch( """ select interval_start, forecast_w, confidence_w from ems.fn_get_baseline_forecast($1::int, $2::timestamptz, $3::timestamptz) """, site_id, from_ts, to_ts, ) return {"slots": [record_to_dict(r) for r in rows]}