"""FVE production forecast from Open-Meteo + pvlib (15min intervals).""" from __future__ import annotations import json import logging from datetime import timedelta, timezone from typing import Any from zoneinfo import ZoneInfo import httpx import pandas as pd import pvlib from pvlib import irradiance from app.config import get_settings logger = logging.getLogger(__name__) def _db_azimuth_to_pvlib(surface_azimuth_db_deg: float) -> float: """DB: 0=jih, 90=západ, -90=východ → pvlib (N=0, E=90, S=180, W=270).""" return float((surface_azimuth_db_deg + 180) % 360) async def fetch_pv_forecast(site_id: int, db) -> tuple[int, int]: """ Stáhne počasí (Open-Meteo), pro každé FVE pole spočte výkon (pvlib) a uloží intervaly. Open-Meteo nepodporuje název ``diffuse_horizontal_irradiance``; používá se ``diffuse_radiation`` (DHI) a ``shortwave_radiation`` (GHI). Data jsou ``minutely_15`` kvůli 15min slotům v ``ems.forecast_pv_interval``. Returns: ``(celkový_počet_řádků_forecast_pv_interval, počet_FVE_polí)``. Při chybě ``(-1, 0)``. Bez polí ``(0, 0)``. """ site = await db.fetchrow( """ SELECT latitude, longitude, timezone FROM ems.site WHERE id = $1 """, site_id, ) if site is None: logger.error("fetch_pv_forecast: site id=%s nenalezen", site_id) return -1, 0 if site["latitude"] is None or site["longitude"] is None: logger.error("fetch_pv_forecast: site id=%s nemá latitude/longitude", site_id) return -1, 0 lat = float(site["latitude"]) lon = float(site["longitude"]) tz_name: str = site["timezone"] or "Europe/Prague" try: ZoneInfo(tz_name) except Exception as e: logger.error("fetch_pv_forecast: neplatná timezone %r: %s", tz_name, e) return -1, 0 arrays = await db.fetch( """ SELECT id, code, nominal_power_wp, azimuth_deg, tilt_deg, shading_factor, controllable FROM ems.asset_pv_array WHERE site_id = $1 AND azimuth_deg IS NOT NULL AND tilt_deg IS NOT NULL ORDER BY id """, site_id, ) if not arrays: logger.info("fetch_pv_forecast: žádná FVE pole pro site_id=%s", site_id) return 0, 0 n_arrays = len(arrays) settings = get_settings() base = settings.open_meteo_api_url.rstrip("/") params = { "latitude": lat, "longitude": lon, "minutely_15": ",".join( [ "direct_normal_irradiance", "diffuse_radiation", "shortwave_radiation", "temperature_2m", ] ), "forecast_days": max(2, min(int(settings.open_meteo_forecast_days), 16)), "timezone": "auto", } try: async with httpx.AsyncClient(timeout=20.0) as client: resp = await client.get(base, params=params) resp.raise_for_status() data = resp.json() except httpx.TimeoutException: logger.warning("fetch_pv_forecast: timeout Open-Meteo") return -1, 0 except httpx.HTTPStatusError as e: logger.warning( "fetch_pv_forecast: HTTP %s Open-Meteo: %s", e.response.status_code, e.response.text[:500], ) return -1, 0 except httpx.HTTPError as e: logger.warning("fetch_pv_forecast: HTTP chyba Open-Meteo: %s", e) return -1, 0 m15 = data.get("minutely_15") or {} times_raw = m15.get("time") if not times_raw or not isinstance(times_raw, list): snippet = json.dumps(data, ensure_ascii=False)[:500] logger.error("fetch_pv_forecast: chybí minutely_15.time, začátek: %s", snippet) return -1, 0 api_tz = data.get("timezone") or tz_name try: tzinfo = ZoneInfo(api_tz) except Exception: tzinfo = ZoneInfo(tz_name) times = pd.DatetimeIndex(pd.to_datetime(times_raw)) if times.tz is None: times = times.tz_localize(tzinfo) def _series(key: str) -> pd.Series: raw = m15.get(key) if not isinstance(raw, list) or len(raw) != len(times): return pd.Series(0.0, index=times, dtype=float) return pd.Series( [0.0 if v is None else float(v) for v in raw], index=times, dtype=float, ) dni = _series("direct_normal_irradiance") ghi = _series("shortwave_radiation") dhi = _series("diffuse_radiation") temp_air = _series("temperature_2m") loc = pvlib.location.Location(lat, lon, tz=api_tz) solar_pos = loc.get_solarposition(times) dni_extra = irradiance.get_extra_radiation(times) total_rows = 0 horizon_start = times[0].tz_convert(timezone.utc).to_pydatetime() horizon_end = ( times[-1].tz_convert(timezone.utc).to_pydatetime() + timedelta(minutes=15) ) for arr in arrays: tilt = float(arr["tilt_deg"]) az_db = float(arr["azimuth_deg"]) az_pvlib = _db_azimuth_to_pvlib(az_db) nominal_power_wp = float(arr["nominal_power_wp"]) shading = float(arr["shading_factor"] or 1.0) poa_global = irradiance.get_total_irradiance( surface_tilt=tilt, surface_azimuth=az_pvlib, solar_zenith=solar_pos["apparent_zenith"], solar_azimuth=solar_pos["azimuth"], dni=dni, ghi=ghi, dhi=dhi, dni_extra=dni_extra, model="haydavies", )["poa_global"].fillna(0).clip(lower=0) area_m2 = nominal_power_wp / (1000.0 * 0.20) power_w = poa_global * area_m2 * 0.20 * shading cap_w = nominal_power_wp * 1.1 power_w = power_w.clip(lower=0, upper=cap_w).round().astype(int) model_params: dict[str, Any] = { "source": "open_meteo", "endpoint": base, "params": params, "pvlib_model": "haydavies", "nominal_power_wp": nominal_power_wp, "shading_factor": shading, "area_m2_ref_20pct": area_m2, } run_id = await db.fetchval( """ INSERT INTO ems.forecast_pv_run ( site_id, pv_array_id, forecast_source, model_params, horizon_start, horizon_end, status ) VALUES ($1, $2, $3, $4::jsonb, $5, $6, 'ok') RETURNING id """, site_id, arr["id"], "open_meteo", json.dumps(model_params), horizon_start, horizon_end, ) records = [] for ts, p, g, t in zip( times, power_w, ghi, temp_air, strict=True, ): interval_start = ts.tz_convert(timezone.utc).to_pydatetime() records.append( ( run_id, arr["id"], interval_start, int(p), float(g), float(t), ) ) await db.executemany( """ INSERT INTO ems.forecast_pv_interval ( run_id, pv_array_id, interval_start, power_w, irradiance_wm2, temp_c ) VALUES ($1, $2, $3, $4, $5, $6) """, records, ) total_rows += len(records) return total_rows, n_arrays