priprava multisite
All checks were successful
deploy / deploy (push) Successful in 18s
test / smoke-test (push) Successful in 5s

This commit is contained in:
Dusan Vojacek
2026-04-05 21:56:58 +02:00
parent 22b7fa7756
commit 61892b258a
2 changed files with 108 additions and 90 deletions

View File

@@ -271,7 +271,7 @@ async def lifespan(app: FastAPI):
logger.exception("scheduled_forecast_refresh site=%s failed", site_id) logger.exception("scheduled_forecast_refresh site=%s failed", site_id)
async def _count_ote_slots_for_day( async def _count_ote_slots_for_day(
conn: asyncpg.Connection, site_id: int, target_day: date conn: asyncpg.Connection, target_day: date
) -> int: ) -> int:
return int( return int(
await conn.fetchval( await conn.fetchval(
@@ -286,50 +286,51 @@ async def lifespan(app: FastAPI):
or 0 or 0
) )
async def _scheduled_ote_import_for_site( async def _refresh_negative_price_predictions_all_active(
conn: asyncpg.Connection, site_id: int conn: asyncpg.Connection,
) -> None: ) -> None:
tz_name = await conn.fetchval( sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
"SELECT timezone FROM ems.site WHERE id = $1", for site in sites:
site_id, await _refresh_negative_price_predictions(conn, int(site["id"]))
)
tz = ZoneInfo(tz_name or "Europe/Prague") async def _scheduled_ote_import_global(conn: asyncpg.Connection) -> None:
now_loc = datetime.now(tz) """Jeden OTE fetch na chybějící den; market_interval_price je globální pro všechny site."""
prague_tz = ZoneInfo("Europe/Prague")
now_loc = datetime.now(prague_tz)
today = now_loc.date() today = now_loc.date()
tomorrow = today + timedelta(days=1) tomorrow = today + timedelta(days=1)
any_import_ok = False
# Zajistit data pro dnešek i zítřek; import jen pokud není kompletních 96 slotů.
for day in (today, tomorrow): for day in (today, tomorrow):
slots = await _count_ote_slots_for_day(conn, site_id, day) slots = await _count_ote_slots_for_day(conn, day)
if slots >= 96: if slots >= 96:
continue continue
n, imported_day, _, err = await import_ote_prices( n, imported_day, _, err = await import_ote_prices(
site_id, conn, target_date=day conn, site_id=None, target_date=day
) )
if n < 0: if n < 0:
logger.warning( logger.warning(
"scheduled_ote_import site=%s day=%s failed (%s)", "scheduled_ote_import_global day=%s failed (%s)",
site_id,
day.isoformat(), day.isoformat(),
err, err,
) )
continue continue
logger.info( logger.info(
"scheduled_ote_import site=%s day=%s imported=%s", "scheduled_ote_import_global day=%s imported=%s slots",
site_id,
imported_day, imported_day,
n, n,
) )
await _refresh_negative_price_predictions(conn, site_id) any_import_ok = True
if any_import_ok:
await _refresh_negative_price_predictions_all_active(conn)
async def scheduled_ote_import() -> None: async def scheduled_ote_import() -> None:
async with app.state.pg_pool.acquire() as conn: async with app.state.pg_pool.acquire() as conn:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
for site in sites:
try: try:
await _scheduled_ote_import_for_site(conn, int(site["id"])) await _scheduled_ote_import_global(conn)
except Exception: except Exception:
logger.exception("scheduled_ote_import site=%s failed", site["id"]) logger.exception("scheduled_ote_import_global failed")
scheduler.add_job(scheduled_heartbeat, "interval", seconds=60, id="heartbeat") scheduler.add_job(scheduled_heartbeat, "interval", seconds=60, id="heartbeat")
scheduler.add_job( scheduler.add_job(
@@ -598,17 +599,6 @@ class ModbusVerifyResponse(BaseModel):
commands: list[ModbusCommandVerifyItem] commands: list[ModbusCommandVerifyItem]
class NegativePricePredictionItem(BaseModel):
id: int
predicted_at: datetime
predicted_date: date
window_start_hour: int
window_end_hour: int
probability_pct: int
expected_min_price: float | None
reason: str | None
async def _refresh_negative_price_predictions(conn: asyncpg.Connection, site_id: int) -> None: async def _refresh_negative_price_predictions(conn: asyncpg.Connection, site_id: int) -> None:
"""Po importu cen / forecastu obnoví cache predikce záporných cen.""" """Po importu cen / forecastu obnoví cache predikce záporných cen."""
try: try:
@@ -621,14 +611,23 @@ async def _refresh_negative_price_predictions(conn: asyncpg.Connection, site_id:
) )
@sites_router.post("/{site_id}/prices/import", response_model=PricesImportResponse) @sites_router.post(
"/{site_id}/prices/import",
response_model=PricesImportResponse,
summary="Import OTE cen (globální)",
description=(
"Zapíše do sdílené tabulky ems.market_interval_price (jedna sada dat pro všechny lokality). "
"site_id v cestě slouží ke kontrole existence lokality (kompatibilita s UI); po importu se "
"obnoví predikce záporných cen pro všechny aktivní lokality."
),
)
async def post_import_site_prices( async def post_import_site_prices(
site_id: int, site_id: int,
db: Annotated[asyncpg.Pool, Depends(get_pool)], db: Annotated[asyncpg.Pool, Depends(get_pool)],
date_str: str | None = Query( date_str: str | None = Query(
None, None,
alias="date", alias="date",
description="YYYY-MM-DD; výchozí = zítřek v časové zóně lokality", description="YYYY-MM-DD; výchozí = zítřek/dnes dle logiky OTE (Europe/Prague)",
), ),
) -> PricesImportResponse: ) -> PricesImportResponse:
target: date | None = _parse_ymd(date_str) if date_str is not None else None target: date | None = _parse_ymd(date_str) if date_str is not None else None
@@ -637,9 +636,13 @@ async def post_import_site_prices(
site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id)
if not site_ok: if not site_ok:
raise HTTPException(status_code=404, detail="Site not found") raise HTTPException(status_code=404, detail="Site not found")
n, day, first_price, import_error = await import_ote_prices(site_id, conn, target_date=target) n, day, first_price, import_error = await import_ote_prices(
conn, site_id=None, target_date=target
)
if n >= 0: if n >= 0:
await _refresh_negative_price_predictions(conn, site_id) sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
for site in sites:
await _refresh_negative_price_predictions(conn, int(site["id"]))
if n < 0: if n < 0:
raise HTTPException( raise HTTPException(
status_code=422, status_code=422,
@@ -652,24 +655,44 @@ async def post_import_site_prices(
) )
class NegPricePredictionItem(BaseModel):
predicted_date: str
window_start_hour: int
window_end_hour: int
probability_pct: float
expected_min_price: float | None
reason: str
class NegativePredictionsResponse(BaseModel):
predictions: list[NegPricePredictionItem]
insufficient_history: bool
@sites_router.get( @sites_router.get(
"/{site_id}/prices/negative-predictions", "/{site_id}/prices/negative-predictions",
response_model=list[NegativePricePredictionItem], response_model=NegativePredictionsResponse,
) )
async def get_site_negative_price_predictions( async def get_site_negative_price_predictions(
site_id: int, site_id: int,
db: Annotated[asyncpg.Pool, Depends(get_pool)], db: Annotated[asyncpg.Pool, Depends(get_pool)],
) -> list[NegativePricePredictionItem]: ) -> NegativePredictionsResponse:
"""Záznamy z cache predikce záporných cen na příštích 7 kalendářních dní (v časové zóně lokality).""" """Cache predikce záporných cen (per site) + informace, zda je dost historie OTE."""
async with db.acquire() as conn: async with db.acquire() as conn:
site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id)
if not site_ok: if not site_ok:
raise HTTPException(status_code=404, detail="Site not found") raise HTTPException(status_code=404, detail="Site not found")
ndays = await conn.fetchval(
"""
SELECT COUNT(DISTINCT (interval_start AT TIME ZONE 'Europe/Prague')::date)::int
FROM ems.market_interval_price
WHERE market_source IN ('OTE_CZ', 'OTE_CZ_DAM')
AND interval_start >= now() - INTERVAL '400 days'
"""
)
rows = await conn.fetch( rows = await conn.fetch(
""" """
SELECT SELECT
p.id,
p.predicted_at,
p.predicted_date, p.predicted_date,
p.window_start_hour, p.window_start_hour,
p.window_end_hour, p.window_end_hour,
@@ -694,22 +717,25 @@ async def get_site_negative_price_predictions(
""", """,
site_id, site_id,
) )
out: list[NegativePricePredictionItem] = [] n_hist = int(ndays or 0)
predictions: list[NegPricePredictionItem] = []
for r in rows: for r in rows:
em = r["expected_min_price"] em = r["expected_min_price"]
out.append( pd = r["predicted_date"]
NegativePricePredictionItem( predictions.append(
id=int(r["id"]), NegPricePredictionItem(
predicted_at=r["predicted_at"], predicted_date=pd.isoformat() if hasattr(pd, "isoformat") else str(pd),
predicted_date=r["predicted_date"],
window_start_hour=int(r["window_start_hour"]), window_start_hour=int(r["window_start_hour"]),
window_end_hour=int(r["window_end_hour"]), window_end_hour=int(r["window_end_hour"]),
probability_pct=int(r["probability_pct"]), probability_pct=float(r["probability_pct"]),
expected_min_price=float(em) if em is not None else None, expected_min_price=float(em) if em is not None else None,
reason=r["reason"], reason=r["reason"] if r["reason"] is not None else "",
) )
) )
return out return NegativePredictionsResponse(
predictions=predictions,
insufficient_history=n_hist < 28,
)
@sites_router.get("/{site_id}/prices/latest", response_model=PricesLatestResponse) @sites_router.get("/{site_id}/prices/latest", response_model=PricesLatestResponse)
@@ -1004,45 +1030,30 @@ async def get_site_forecast_pv(
return {"pv_a": pv_a, "pv_b": pv_b} return {"pv_a": pv_a, "pv_b": pv_b}
class NegPricePredictionItem(BaseModel): me_router = APIRouter(prefix="/api/v1/me", tags=["me"])
predicted_date: str
window_start_hour: int
window_end_hour: int
probability_pct: float
expected_min_price: float | None
reason: str
class NegativePredictionsResponse(BaseModel): @me_router.get(
predictions: list[NegPricePredictionItem] "/sites",
insufficient_history: bool summary="Lokality přihlášeného uživatele (fáze bez auth)",
description="Aktuálně vrací všechny aktivní lokality; po zavedení autentizace se odfiltruje podle oprávnění.",
@sites_router.get(
"/{site_id}/prices/negative-predictions",
response_model=NegativePredictionsResponse,
) )
async def get_site_negative_price_predictions( async def list_my_sites(
site_id: int,
db: Annotated[asyncpg.Pool, Depends(get_pool)], db: Annotated[asyncpg.Pool, Depends(get_pool)],
) -> NegativePredictionsResponse: ) -> list[dict[str, Any]]:
"""Zástupný endpoint predikce modelu doplnit později; historii počítáme z OTE dat."""
async with db.acquire() as conn: async with db.acquire() as conn:
site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id) rows = await conn.fetch(
if not site_ok:
raise HTTPException(status_code=404, detail="Site not found")
ndays = await conn.fetchval(
""" """
SELECT COUNT(DISTINCT (interval_start AT TIME ZONE 'Europe/Prague')::date)::int SELECT id, code, name, timezone, latitude, longitude, active, notes, created_at
FROM ems.market_interval_price FROM ems.site
WHERE market_source IN ('OTE_CZ', 'OTE_CZ_DAM') WHERE active = true
AND interval_start >= now() - INTERVAL '400 days' ORDER BY code
""" """
) )
n = int(ndays or 0) return [record_to_dict(r) for r in rows]
return NegativePredictionsResponse(predictions=[], insufficient_history=n < 28)
app.include_router(me_router)
app.include_router(sites_router) app.include_router(sites_router)
app.add_middleware( app.add_middleware(

View File

@@ -90,27 +90,34 @@ async def _fetch_ote_json(date_str: str) -> tuple[dict | None, str | None]:
return None, last_err return None, last_err
OTE_TZ = ZoneInfo("Europe/Prague")
async def import_ote_prices( async def import_ote_prices(
site_id: int,
db, db,
site_id: int | None = None,
target_date: date | None = None, target_date: date | None = None,
) -> tuple[int, str, float, str | None]: ) -> tuple[int, str, float, str | None]:
""" """
Stáhne OTE JSON a předá ho PostgreSQL funkci ems.fn_ote_import_from_json. Stáhne OTE JSON a předá ho PostgreSQL funkci ems.fn_ote_import_from_json.
Python nedělá žádné parsování ani přepočty vše je v DB funkcích. Data jsou globální (market_interval_price); site_id je volitelné jen pro výběr
„dnes/zítra“ při target_date=None (timezone lokality vs. výchozí Europe/Prague).
Returns: (počet_slotů, datum_str, první_cena_kč_kwh, error_code) Returns: (počet_slotů, datum_str, první_cena_kč_kwh, error_code)
(-1, datum_str, 0.0, error_code) při chybě (-1, datum_str, 0.0, error_code) při chybě
""" """
settings = get_settings() settings = get_settings()
if site_id is not None:
row = await db.fetchrow( row = await db.fetchrow(
"SELECT timezone FROM ems.site WHERE id = $1", site_id "SELECT timezone FROM ems.site WHERE id = $1", site_id
) )
if row is None: if row is None:
logger.error("OTE import: site id=%s nenalezen", site_id) logger.error("OTE import: site id=%s nenalezen", site_id)
return -1, "", 0.0, "site_not_found" return -1, "", 0.0, "site_not_found"
site_tz = ZoneInfo(row["timezone"] or "Europe/Prague") site_tz = ZoneInfo(row["timezone"] or "Europe/Prague")
else:
site_tz = OTE_TZ
now_site = datetime.now(site_tz) now_site = datetime.now(site_tz)
today_site = now_site.date() today_site = now_site.date()
tomorrow_site = today_site + timedelta(days=1) tomorrow_site = today_site + timedelta(days=1)