# backend/services/planning/forecast.py
#
# EMS planner – correction of the PV (FVE) forecast based on actual production
# (Phase 1 of the decomposition, a pure move from planning_engine.py).

import json
import logging
from dataclasses import replace
from datetime import datetime, timedelta
from typing import Optional

from services.planning.constants import (
    CORRECTION_DECAY_SLOTS,
    CORRECTION_MAX_CLAMP,
    CORRECTION_MIN_CLAMP,
    CORRECTION_WINDOW_H,
)
from services.planning.types import PlanningSlot, _parse_json_dt

logger = logging.getLogger(__name__)


async def compute_correction_factor(
    site_id: int,
    now: datetime,
    db,
    window_h: float = CORRECTION_WINDOW_H,
) -> tuple[float, dict]:
    """
    Compute the PV forecast correction factor from the last ``window_h`` hours.

    The computation itself is delegated to the database function
    ``ems.fn_pv_forecast_correction_factor``, which receives the window
    ``[now - window_h, now]`` and the clamp bounds
    ``CORRECTION_MIN_CLAMP`` / ``CORRECTION_MAX_CLAMP``.

    Args:
        site_id: Site identifier passed to the DB function.
        now: End of the evaluation window.
        db: Connection-like object exposing an awaitable ``fetchval``.
        window_h: Length of the evaluation window in hours.

    Returns:
        ``(factor, log_data)``. ``factor`` is whatever the DB function reports
        as ``correction_factor`` (clamping is performed by the DB function,
        not re-checked here). It falls back to ``1.0`` when the DB returns no
        payload, a non-object payload, or a null ``correction_factor``.
        ``log_data`` carries the window bounds, the energy sums, the factor,
        the reason and (when present) the unclamped ``raw_factor``.
    """
    window_start = now - timedelta(hours=window_h)

    raw = await db.fetchval(
        """
        select ems.fn_pv_forecast_correction_factor(
            $1::int, $2::timestamptz, $3::timestamptz, $4::numeric, $5::numeric
        )
        """,
        site_id,
        window_start,
        now,
        CORRECTION_MIN_CLAMP,
        CORRECTION_MAX_CLAMP,
    )

    # The driver may hand back either a decoded dict or the JSON text.
    # A missing row / SQL NULL comes back as None, which json.loads rejects.
    if raw is None:
        payload = None
    elif isinstance(raw, dict):
        payload = raw
    else:
        payload = json.loads(raw)

    if not isinstance(payload, dict):
        # No usable result: behave as "no correction" instead of crashing.
        logger.warning(
            "PV forecast correction: no usable result from DB for site_id=%s",
            site_id,
        )
        payload = {"reason": "no_data"}

    # The key may be present but null in the JSON; treat that like "missing".
    reported_factor = payload.get("correction_factor")
    factor = 1.0 if reported_factor is None else float(reported_factor)

    # JSON from the DB often carries ISO strings, while asyncpg requires
    # datetime objects for timestamptz parameters, so convert them back and
    # fall back to the requested window when they are absent.
    ws = _parse_json_dt(payload.get("window_start")) or window_start
    we = _parse_json_dt(payload.get("window_end")) or now

    log_data = {
        "window_start": ws,
        "window_end": we,
        "actual_pv_wh": payload.get("actual_pv_wh"),
        "forecast_pv_wh": payload.get("forecast_pv_wh"),
        "correction_factor": factor,
        "reason": payload.get("reason", "ok"),
    }
    if payload.get("raw_factor") is not None:
        log_data["raw_factor"] = payload["raw_factor"]

    return factor, log_data


def apply_forecast_correction(
    slots: list[PlanningSlot],
    now: datetime,
    factor: float,
    decay_slots: int = CORRECTION_DECAY_SLOTS,
) -> list[PlanningSlot]:
    """
    Apply the correction factor to the PV forecast of the given slots.

    The correction fades out linearly with the slot index: the slot at index 0
    receives the full correction, and slots at index ``decay_slots`` and
    beyond are left untouched.

    Example (``factor=0.85``, ``decay_slots=16``):
        slot 0   -> forecast *= 0.85
        slot 8   -> forecast *= 0.925
        slot 16+ -> no correction

    Args:
        slots: Slots to correct; index 0 is treated as the nearest slot
            (presumably the caller passes only the remaining slots — verify
            against the caller).
        now: Currently unused; kept so the signature stays stable.
        factor: Correction factor; ``1.0`` means no correction.
        decay_slots: Number of slots over which the correction fades to zero.

    Returns:
        A new list. Corrected slots are copies created with
        ``dataclasses.replace``; uncorrected slots are the original objects.
        Corrected ``pv_a_forecast_w`` / ``pv_b_forecast_w`` are truncated to
        int and floored at 0.
    """
    # Nothing to do: neutral factor, or a decay window that covers no slot.
    if factor == 1.0 or decay_slots <= 0:
        return list(slots)

    corrected = []
    for i, slot in enumerate(slots):
        if i >= decay_slots:
            corrected.append(slot)
            continue

        # Linear decay: weight falls from 1.0 (slot 0) to 0.0 (slot decay_slots).
        weight = 1.0 - (i / decay_slots)
        effective_factor = 1.0 + (factor - 1.0) * weight
        corrected.append(
            replace(
                slot,
                pv_a_forecast_w=max(0, int(slot.pv_a_forecast_w * effective_factor)),
                pv_b_forecast_w=max(0, int(slot.pv_b_forecast_w * effective_factor)),
            )
        )
    return corrected