diff --git a/backend/app/main.py b/backend/app/main.py index e869f0b..1a7afc7 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -271,7 +271,7 @@ async def lifespan(app: FastAPI): logger.exception("scheduled_forecast_refresh site=%s failed", site_id) async def _count_ote_slots_for_day( - conn: asyncpg.Connection, site_id: int, target_day: date + conn: asyncpg.Connection, target_day: date ) -> int: return int( await conn.fetchval( @@ -286,50 +286,51 @@ async def lifespan(app: FastAPI): or 0 ) - async def _scheduled_ote_import_for_site( - conn: asyncpg.Connection, site_id: int + async def _refresh_negative_price_predictions_all_active( + conn: asyncpg.Connection, ) -> None: - tz_name = await conn.fetchval( - "SELECT timezone FROM ems.site WHERE id = $1", - site_id, - ) - tz = ZoneInfo(tz_name or "Europe/Prague") - now_loc = datetime.now(tz) + sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") + for site in sites: + await _refresh_negative_price_predictions(conn, int(site["id"])) + + async def _scheduled_ote_import_global(conn: asyncpg.Connection) -> None: + """Jeden OTE fetch na chybějící den; market_interval_price je globální pro všechny site.""" + prague_tz = ZoneInfo("Europe/Prague") + now_loc = datetime.now(prague_tz) today = now_loc.date() tomorrow = today + timedelta(days=1) + any_import_ok = False - # Zajistit data pro dnešek i zítřek; import jen pokud není kompletních 96 slotů. for day in (today, tomorrow): - slots = await _count_ote_slots_for_day(conn, site_id, day) + slots = await _count_ote_slots_for_day(conn, day) if slots >= 96: continue n, imported_day, _, err = await import_ote_prices( - site_id, conn, target_date=day + conn, site_id=None, target_date=day ) if n < 0: logger.warning( - "scheduled_ote_import site=%s day=%s failed (%s)", - site_id, + "scheduled_ote_import_global day=%s failed (%s)", day.isoformat(), err, ) continue logger.info( - "scheduled_ote_import site=%s day=%s imported=%s", - site_id, + "scheduled_ote_import_global day=%s imported=%s slots", imported_day, n, ) - await _refresh_negative_price_predictions(conn, site_id) + any_import_ok = True + + if any_import_ok: + await _refresh_negative_price_predictions_all_active(conn) async def scheduled_ote_import() -> None: async with app.state.pg_pool.acquire() as conn: - sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") - for site in sites: - try: - await _scheduled_ote_import_for_site(conn, int(site["id"])) - except Exception: - logger.exception("scheduled_ote_import site=%s failed", site["id"]) + try: + await _scheduled_ote_import_global(conn) + except Exception: + logger.exception("scheduled_ote_import_global failed") scheduler.add_job(scheduled_heartbeat, "interval", seconds=60, id="heartbeat") scheduler.add_job( @@ -598,17 +599,6 @@ class ModbusVerifyResponse(BaseModel): commands: list[ModbusCommandVerifyItem] -class NegativePricePredictionItem(BaseModel): - id: int - predicted_at: datetime - predicted_date: date - window_start_hour: int - window_end_hour: int - probability_pct: int - expected_min_price: float | None - reason: str | None - - async def _refresh_negative_price_predictions(conn: asyncpg.Connection, site_id: int) -> None: """Po importu cen / forecastu obnoví cache predikce záporných cen.""" try: @@ -621,14 +611,23 @@ async def _refresh_negative_price_predictions(conn: asyncpg.Connection, site_id: ) -@sites_router.post("/{site_id}/prices/import", response_model=PricesImportResponse) +@sites_router.post( + "/{site_id}/prices/import", + response_model=PricesImportResponse, + summary="Import OTE cen (globální)", + description=( + "Zapíše do sdílené tabulky ems.market_interval_price (jedna sada dat pro všechny lokality). " + "site_id v cestě slouží ke kontrole existence lokality (kompatibilita s UI); po importu se " + "obnoví predikce záporných cen pro všechny aktivní lokality." + ), +) async def post_import_site_prices( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pool)], date_str: str | None = Query( None, alias="date", - description="YYYY-MM-DD; výchozí = zítřek v časové zóně lokality", + description="YYYY-MM-DD; výchozí = zítřek/dnes dle logiky OTE (Europe/Prague)", ), ) -> PricesImportResponse: target: date | None = _parse_ymd(date_str) if date_str is not None else None @@ -637,9 +636,13 @@ async def post_import_site_prices( site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") - n, day, first_price, import_error = await import_ote_prices(site_id, conn, target_date=target) + n, day, first_price, import_error = await import_ote_prices( + conn, site_id=None, target_date=target + ) if n >= 0: - await _refresh_negative_price_predictions(conn, site_id) + sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true") + for site in sites: + await _refresh_negative_price_predictions(conn, int(site["id"])) if n < 0: raise HTTPException( status_code=422, @@ -652,24 +655,44 @@ async def post_import_site_prices( ) +class NegPricePredictionItem(BaseModel): + predicted_date: str + window_start_hour: int + window_end_hour: int + probability_pct: float + expected_min_price: float | None + reason: str + + +class NegativePredictionsResponse(BaseModel): + predictions: list[NegPricePredictionItem] + insufficient_history: bool + + @sites_router.get( "/{site_id}/prices/negative-predictions", - response_model=list[NegativePricePredictionItem], + response_model=NegativePredictionsResponse, ) async def get_site_negative_price_predictions( site_id: int, db: Annotated[asyncpg.Pool, Depends(get_pool)], -) -> list[NegativePricePredictionItem]: - """Záznamy z cache predikce záporných cen na příštích 7 kalendářních dní (v časové zóně lokality).""" +) -> NegativePredictionsResponse: + """Cache predikce záporných cen (per site) + informace, zda je dost historie OTE.""" async with db.acquire() as conn: site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) if not site_ok: raise HTTPException(status_code=404, detail="Site not found") + ndays = await conn.fetchval( + """ + SELECT COUNT(DISTINCT (interval_start AT TIME ZONE 'Europe/Prague')::date)::int + FROM ems.market_interval_price + WHERE market_source IN ('OTE_CZ', 'OTE_CZ_DAM') + AND interval_start >= now() - INTERVAL '400 days' + """ + ) rows = await conn.fetch( """ SELECT - p.id, - p.predicted_at, p.predicted_date, p.window_start_hour, p.window_end_hour, @@ -694,22 +717,25 @@ async def get_site_negative_price_predictions( """, site_id, ) - out: list[NegativePricePredictionItem] = [] + n_hist = int(ndays or 0) + predictions: list[NegPricePredictionItem] = [] for r in rows: em = r["expected_min_price"] - out.append( - NegativePricePredictionItem( - id=int(r["id"]), - predicted_at=r["predicted_at"], - predicted_date=r["predicted_date"], + pd = r["predicted_date"] + predictions.append( + NegPricePredictionItem( + predicted_date=pd.isoformat() if hasattr(pd, "isoformat") else str(pd), window_start_hour=int(r["window_start_hour"]), window_end_hour=int(r["window_end_hour"]), - probability_pct=int(r["probability_pct"]), + probability_pct=float(r["probability_pct"]), expected_min_price=float(em) if em is not None else None, - reason=r["reason"], + reason=r["reason"] if r["reason"] is not None else "", ) ) - return out + return NegativePredictionsResponse( + predictions=predictions, + insufficient_history=n_hist < 28, + ) @sites_router.get("/{site_id}/prices/latest", response_model=PricesLatestResponse) @@ -1004,45 +1030,30 @@ async def get_site_forecast_pv( return {"pv_a": pv_a, "pv_b": pv_b} -class NegPricePredictionItem(BaseModel): - predicted_date: str - window_start_hour: int - window_end_hour: int - probability_pct: float - expected_min_price: float | None - reason: str +me_router = APIRouter(prefix="/api/v1/me", tags=["me"]) -class NegativePredictionsResponse(BaseModel): - predictions: list[NegPricePredictionItem] - insufficient_history: bool - - -@sites_router.get( - "/{site_id}/prices/negative-predictions", - response_model=NegativePredictionsResponse, +@me_router.get( + "/sites", + summary="Lokality přihlášeného uživatele (fáze bez auth)", + description="Aktuálně vrací všechny aktivní lokality; po zavedení autentizace se odfiltruje podle oprávnění.", ) -async def get_site_negative_price_predictions( - site_id: int, +async def list_my_sites( db: Annotated[asyncpg.Pool, Depends(get_pool)], -) -> NegativePredictionsResponse: - """Zástupný endpoint – predikce modelu doplnit později; historii počítáme z OTE dat.""" +) -> list[dict[str, Any]]: async with db.acquire() as conn: - site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) - if not site_ok: - raise HTTPException(status_code=404, detail="Site not found") - ndays = await conn.fetchval( + rows = await conn.fetch( """ - SELECT COUNT(DISTINCT (interval_start AT TIME ZONE 'Europe/Prague')::date)::int - FROM ems.market_interval_price - WHERE market_source IN ('OTE_CZ', 'OTE_CZ_DAM') - AND interval_start >= now() - INTERVAL '400 days' + SELECT id, code, name, timezone, latitude, longitude, active, notes, created_at + FROM ems.site + WHERE active = true + ORDER BY code """ ) - n = int(ndays or 0) - return NegativePredictionsResponse(predictions=[], insufficient_history=n < 28) + return [record_to_dict(r) for r in rows] +app.include_router(me_router) app.include_router(sites_router) app.add_middleware( diff --git a/backend/services/price_importer.py b/backend/services/price_importer.py index fac41f3..2ce5e3d 100644 --- a/backend/services/price_importer.py +++ b/backend/services/price_importer.py @@ -90,27 +90,34 @@ async def _fetch_ote_json(date_str: str) -> tuple[dict | None, str | None]: return None, last_err +OTE_TZ = ZoneInfo("Europe/Prague") + + async def import_ote_prices( - site_id: int, db, + site_id: int | None = None, target_date: date | None = None, ) -> tuple[int, str, float, str | None]: """ Stáhne OTE JSON a předá ho PostgreSQL funkci ems.fn_ote_import_from_json. - Python nedělá žádné parsování ani přepočty – vše je v DB funkcích. + Data jsou globální (market_interval_price); site_id je volitelné jen pro výběr + „dnes/zítra“ při target_date=None (timezone lokality vs. výchozí Europe/Prague). Returns: (počet_slotů, datum_str, první_cena_kč_kwh, error_code) (-1, datum_str, 0.0, error_code) při chybě """ settings = get_settings() - row = await db.fetchrow( - "SELECT timezone FROM ems.site WHERE id = $1", site_id - ) - if row is None: - logger.error("OTE import: site id=%s nenalezen", site_id) - return -1, "", 0.0, "site_not_found" + if site_id is not None: + row = await db.fetchrow( + "SELECT timezone FROM ems.site WHERE id = $1", site_id + ) + if row is None: + logger.error("OTE import: site id=%s nenalezen", site_id) + return -1, "", 0.0, "site_not_found" + site_tz = ZoneInfo(row["timezone"] or "Europe/Prague") + else: + site_tz = OTE_TZ - site_tz = ZoneInfo(row["timezone"] or "Europe/Prague") now_site = datetime.now(site_tz) today_site = now_site.date() tomorrow_site = today_site + timedelta(days=1)