Files
ems/backend/scripts/backfill_ote_prices.py
Dusan Vojacek f0dfcefd54
All checks were successful
deploy / deploy (push) Successful in 19s
test / smoke-test (push) Successful in 5s
fix letni /zimni cas OTE
2026-04-12 21:57:37 +02:00

222 lines
7.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Doplnění ems.market_interval_price z veřejného OTE JSON endpointu (stejný jako price_importer).
Produkce (Docker závislosti v image backendu), z adresáře kde leží docker-compose.yml:
cd /opt/ems-deploy
docker compose exec -T backend python3 scripts/backfill_ote_prices.py --dry-run
Nebo z kořene stacku: bash app/deploy/run_backfill_ote_prices.sh --dry-run
Lokálně (venv s backend/requirements.txt):
cd /path/to/ems-cursor
PYTHONPATH=backend python3 backend/scripts/backfill_ote_prices.py --dry-run
Volby:
--days 730 posledních N kalendářních dní (Europe/Prague), výchozí 730 ≈ 2 roky
--from-date / --to-date pevný rozsah YYYY-MM-DD (má přednost před --days u konce rozsahu)
--force stáhnout znovu i dny s plným počtem slotů OTE (92/96/100)
--dry-run jen vypsat chybějící dny, bez HTTP
--delay SEC pauza mezi dny (výchozí 0.35)
--refresh-predictions po skončení zavolat fn_predict_negative_price_windows pro aktivní site
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import os
import sys
from datetime import date, datetime, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo
_BACKEND_ROOT = Path(__file__).resolve().parent.parent
if str(_BACKEND_ROOT) not in sys.path:
sys.path.insert(0, str(_BACKEND_ROOT))
os.chdir(_BACKEND_ROOT)
try:
import asyncpg
except ModuleNotFoundError as e:
print(
"Chybí modul 'asyncpg' (závislost backendu).\n"
"\n"
"Na serveru s Docker stackem EMS spusťte skript uvnitř kontejneru backendu, např.:\n"
" cd /opt/ems-deploy\n"
" docker compose exec -T backend python3 scripts/backfill_ote_prices.py --dry-run\n"
"\n"
"Lokálně nainstalujte závislosti: pip install -r backend/requirements.txt\n",
file=sys.stderr,
)
raise SystemExit(1) from e
from app.config import get_settings # noqa: E402
from services.price_importer import ( # noqa: E402
OTE_FULL_DAY_SLOT_COUNTS,
backfill_ote_prices,
count_ote_slots_prague_day,
ote_prague_day_slots_look_complete,
)
PRAGUE = ZoneInfo("Europe/Prague")
def _parse_ymd(s: str) -> date:
y, m, d = (int(p) for p in s.split("-", 2))
return date(y, m, d)
async def _dry_run_missing(
conn: asyncpg.Connection,
start: date,
end: date,
today_prague: date,
) -> list[date]:
out: list[date] = []
d = start
while d <= end:
if d > today_prague:
break
n = await count_ote_slots_prague_day(conn, d)
if not ote_prague_day_slots_look_complete(n):
out.append(d)
d += timedelta(days=1)
return out
async def _refresh_predictions_all(conn: asyncpg.Connection) -> None:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
for row in sites:
sid = int(row["id"])
try:
await conn.fetch("SELECT * FROM ems.fn_predict_negative_price_windows($1, 7)", sid)
logging.info("Predikce záporných cen obnovena pro site_id=%s", sid)
except Exception:
logging.exception("fn_predict_negative_price_windows selhalo pro site_id=%s", sid)
async def main_async(args: argparse.Namespace) -> int:
settings = get_settings()
pool = await asyncpg.create_pool(
host=settings.db_host,
port=settings.db_port,
user=settings.db_user,
password=settings.db_password,
database=settings.db_name,
min_size=1,
max_size=3,
)
try:
today_prague = datetime.now(PRAGUE).date()
if args.to_date:
end = _parse_ymd(args.to_date)
else:
end = today_prague
if args.from_date:
start = _parse_ymd(args.from_date)
else:
start = end - timedelta(days=max(0, int(args.days) - 1))
if start > end:
logging.error("--from-date je po --to-date")
return 2
logging.info(
"Rozsah backfillu: %s%s (kurz EUR/CZK z .env = %s)",
start.isoformat(),
end.isoformat(),
settings.eur_czk_rate,
)
async with pool.acquire() as conn:
if args.dry_run:
missing = await _dry_run_missing(conn, start, end, today_prague)
logging.info(
"Dry-run: %s chybějících nebo neúplných dní (plný den = jedna z %s)",
len(missing),
sorted(OTE_FULL_DAY_SLOT_COUNTS),
)
for md in missing[:50]:
n = await count_ote_slots_prague_day(conn, md)
logging.info(" %s (%s slotů)", md.isoformat(), n)
if len(missing) > 50:
logging.info(" … a dalších %s dní", len(missing) - 50)
return 0
stats = await backfill_ote_prices(
conn,
start_date=start,
end_date=end,
only_missing=not args.force,
pause_between_days_s=float(args.delay),
)
logging.info(
"Hotovo: zkontrolováno %s dní, importováno %s, přeskočeno (kompletní) %s, "
"přeskočeno (budoucnost) %s, selhalo %s",
stats.days_checked,
stats.days_imported,
stats.days_skipped_complete,
stats.days_skipped_future,
stats.days_failed,
)
for day_str, err in stats.failures[:20]:
logging.warning(" %s: %s", day_str, err)
if len(stats.failures) > 20:
logging.warning(" … dalších %s chyb v seznamu", len(stats.failures) - 20)
if args.refresh_predictions and stats.days_imported > 0:
await _refresh_predictions_all(conn)
return 1 if stats.days_failed else 0
finally:
await pool.close()
def main() -> None:
logging.basicConfig(
level=logging.INFO,
format="%(levelname)s %(message)s",
)
parser = argparse.ArgumentParser(description="Backfill OTE cen do ems.market_interval_price")
parser.add_argument(
"--days",
type=int,
default=730,
help="Počet dní zpět od --to-date (výchozí 730)",
)
parser.add_argument("--from-date", type=str, default=None, help="YYYY-MM-DD začátek rozsahu")
parser.add_argument(
"--to-date",
type=str,
default=None,
help="YYYY-MM-DD konec rozsahu (výchozí dnes Europe/Prague)",
)
parser.add_argument(
"--force",
action="store_true",
help="Stáhnout znovu i dny s plným počtem slotů OTE (92/96/100)",
)
parser.add_argument("--dry-run", action="store_true", help="Jen vypsat chybějící dny")
parser.add_argument(
"--delay",
type=float,
default=0.35,
help="Sekundy pauzy mezi dny (výchozí 0.35)",
)
parser.add_argument(
"--refresh-predictions",
action="store_true",
help="Po importu obnovit fn_predict_negative_price_windows pro aktivní lokality",
)
ns = parser.parse_args()
raise SystemExit(asyncio.run(main_async(ns)))
if __name__ == "__main__":
main()