sql first refactor
Some checks failed
CI and deploy / migration-check (push) Successful in 5s
CI and deploy / deploy (push) Failing after 20s

This commit is contained in:
Dusan Vojacek
2026-04-19 20:02:20 +02:00
parent a02e11ee13
commit 93f883f5e0
74 changed files with 6022 additions and 4014 deletions

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
import asyncio
import json
import logging
import os
from contextlib import asynccontextmanager
@@ -13,7 +14,7 @@ from zoneinfo import ZoneInfo
import asyncpg
import httpx
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from app.db_json import record_to_dict
from app.db_json import fetch_json, record_to_dict
from app.deps import set_pg_pool
from app.routers.economics import router as economics_router
from app.routers.energy_flows import router as energy_flows_router
@@ -90,7 +91,7 @@ async def lifespan(app: FastAPI):
async def scheduled_heartbeat() -> None:
async with app.state.pg_pool.acquire() as conn:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
sites = await conn.fetch("select id from ems.vw_site_directory where active = true")
for site in sites:
try:
await send_heartbeat(site["id"], conn)
@@ -99,7 +100,7 @@ async def lifespan(app: FastAPI):
async def scheduled_audit_filler() -> None:
async with app.state.pg_pool.acquire() as conn:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
sites = await conn.fetch("select id from ems.vw_site_directory where active = true")
for site in sites:
try:
await fill_audit_for_completed_intervals(site["id"], conn)
@@ -108,7 +109,7 @@ async def lifespan(app: FastAPI):
async def scheduled_forecast_accuracy() -> None:
async with app.state.pg_pool.acquire() as conn:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
sites = await conn.fetch("select id from ems.vw_site_directory where active = true")
for site in sites:
try:
n = await conn.fetchval(
@@ -143,7 +144,7 @@ async def lifespan(app: FastAPI):
async def scheduled_control_export() -> None:
async with app.state.pg_pool.acquire() as conn:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
sites = await conn.fetch("select id from ems.vw_site_directory where active = true")
for site in sites:
try:
await export_setpoints(site["id"], conn)
@@ -156,7 +157,7 @@ async def lifespan(app: FastAPI):
Běží každé 2 minuty, nezávisle na control_exporter (delší okno kvůli zpoždění jobu).
"""
async with app.state.pg_pool.acquire() as conn:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
sites = await conn.fetch("select id from ems.vw_site_directory where active = true")
for site in sites:
try:
cmd_rows = await conn.fetch(
@@ -182,7 +183,7 @@ async def lifespan(app: FastAPI):
async def scheduled_daily_plan() -> None:
async with app.state.pg_pool.acquire() as conn:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
sites = await conn.fetch("select id from ems.vw_site_directory where active = true")
for site in sites:
site_id = int(site["id"])
try:
@@ -194,7 +195,7 @@ async def lifespan(app: FastAPI):
async def scheduled_rolling_replan() -> None:
async with app.state.pg_pool.acquire() as conn:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
sites = await conn.fetch("select id from ems.vw_site_directory where active = true")
for site in sites:
site_id = int(site["id"])
try:
@@ -206,7 +207,7 @@ async def lifespan(app: FastAPI):
async def scheduled_baseline_update() -> None:
async with app.state.pg_pool.acquire() as conn:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
sites = await conn.fetch("select id from ems.vw_site_directory where active = true")
for site in sites:
try:
n = await conn.fetchval(
@@ -225,7 +226,7 @@ async def lifespan(app: FastAPI):
async def scheduled_market_price_stats() -> None:
async with app.state.pg_pool.acquire() as conn:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
sites = await conn.fetch("select id from ems.vw_site_directory where active = true")
for site in sites:
try:
n = await conn.fetchval(
@@ -244,7 +245,7 @@ async def lifespan(app: FastAPI):
async def scheduled_tuv_usage_stats() -> None:
async with app.state.pg_pool.acquire() as conn:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
sites = await conn.fetch("select id from ems.vw_site_directory where active = true")
for site in sites:
try:
n = await conn.fetchval(
@@ -263,7 +264,7 @@ async def lifespan(app: FastAPI):
async def scheduled_forecast_refresh() -> None:
async with app.state.pg_pool.acquire() as conn:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
sites = await conn.fetch("select id from ems.vw_site_directory where active = true")
for site in sites:
site_id = int(site["id"])
try:
@@ -303,7 +304,7 @@ async def lifespan(app: FastAPI):
async def _refresh_negative_price_predictions_all_active(
conn: asyncpg.Connection,
) -> None:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
sites = await conn.fetch("select id from ems.vw_site_directory where active = true")
for site in sites:
await _refresh_negative_price_predictions(conn, int(site["id"]))
@@ -444,7 +445,7 @@ async def lifespan(app: FastAPI):
from services.notification_service import notify_daily_economics
async with app.state.pg_pool.acquire() as conn:
sites = await conn.fetch("SELECT id, code FROM ems.site WHERE active = true")
sites = await conn.fetch("select id, code from ems.vw_site_directory where active = true")
for site in sites:
site_id = int(site["id"])
site_code = site["code"]
@@ -546,9 +547,9 @@ async def list_sites(db: Annotated[asyncpg.Pool, Depends(get_pool)]) -> list[dic
async with db.acquire() as conn:
rows = await conn.fetch(
"""
SELECT id, code, name, timezone, latitude, longitude, active, notes, created_at
FROM ems.site
ORDER BY id
select id, code, name, timezone, latitude, longitude, active, notes, created_at
from ems.vw_site_directory
order by id
"""
)
return [record_to_dict(r) for r in rows]
@@ -567,17 +568,15 @@ async def get_site_prices(
site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id)
if not site_ok:
raise HTTPException(status_code=404, detail="Site not found")
rows = await conn.fetch(
"""
SELECT *
FROM ems.vw_site_effective_price
WHERE site_id = $1 AND interval_start::date = $2::date
ORDER BY interval_start
""",
rows = await fetch_json(
conn,
"select ems.fn_site_effective_prices_day_prague($1::int, $2::date)",
site_id,
d,
)
return [record_to_dict(r) for r in rows]
if not isinstance(rows, list):
rows = json.loads(rows) if isinstance(rows, str) else []
return [r for r in rows if isinstance(r, dict)]
class PricesImportResponse(BaseModel):
@@ -656,7 +655,7 @@ async def post_import_site_prices(
conn, site_id=None, target_date=target
)
if n >= 0:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
sites = await conn.fetch("select id from ems.vw_site_directory where active = true")
for site in sites:
await _refresh_negative_price_predictions(conn, int(site["id"]))
if n < 0:
@@ -698,59 +697,35 @@ async def get_site_negative_price_predictions(
site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id)
if not site_ok:
raise HTTPException(status_code=404, detail="Site not found")
ndays = await conn.fetchval(
"""
SELECT COUNT(DISTINCT (interval_start AT TIME ZONE 'Europe/Prague')::date)::int
FROM ems.market_interval_price
WHERE market_source IN ('OTE_CZ', 'OTE_CZ_DAM')
AND interval_start >= now() - INTERVAL '400 days'
"""
)
rows = await conn.fetch(
"""
SELECT
p.predicted_date,
p.window_start_hour,
p.window_end_hour,
p.probability_pct,
p.expected_min_price,
p.reason
FROM ems.predicted_negative_price_window p
WHERE p.site_id = $1
AND p.predicted_date > (
CURRENT_TIMESTAMP AT TIME ZONE COALESCE(
NULLIF((SELECT timezone FROM ems.site WHERE id = $1), ''),
'Europe/Prague'
)
)::date
AND p.predicted_date <= (
CURRENT_TIMESTAMP AT TIME ZONE COALESCE(
NULLIF((SELECT timezone FROM ems.site WHERE id = $1), ''),
'Europe/Prague'
)
)::date + 7
ORDER BY p.predicted_date, p.window_start_hour
""",
bundle = await fetch_json(
conn,
"select ems.fn_negative_price_predictions($1::int)",
site_id,
)
n_hist = int(ndays or 0)
if not isinstance(bundle, dict):
bundle = json.loads(bundle)
rows = bundle.get("predictions") or []
if not isinstance(rows, list):
rows = []
predictions: list[NegPricePredictionItem] = []
for r in rows:
em = r["expected_min_price"]
pd = r["predicted_date"]
if not isinstance(r, dict):
continue
em = r.get("expected_min_price")
pd = r.get("predicted_date")
predictions.append(
NegPricePredictionItem(
predicted_date=pd.isoformat() if hasattr(pd, "isoformat") else str(pd),
window_start_hour=int(r["window_start_hour"]),
window_end_hour=int(r["window_end_hour"]),
probability_pct=float(r["probability_pct"]),
window_start_hour=int(r.get("window_start_hour") or 0),
window_end_hour=int(r.get("window_end_hour") or 0),
probability_pct=float(r.get("probability_pct") or 0),
expected_min_price=float(em) if em is not None else None,
reason=r["reason"] if r["reason"] is not None else "",
reason=str(r.get("reason") or ""),
)
)
return NegativePredictionsResponse(
predictions=predictions,
insufficient_history=n_hist < 28,
insufficient_history=bool(bundle.get("insufficient_history")),
)
@@ -763,29 +738,19 @@ async def get_site_prices_latest(
site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id)
if not site_ok:
raise HTTPException(status_code=404, detail="Site not found")
row = await conn.fetchrow(
"""
SELECT
(interval_start AT TIME ZONE 'Europe/Prague')::date AS day,
COUNT(*)::int AS slots,
MIN(buy_raw_price_czk_kwh)::float AS min_price,
MAX(buy_raw_price_czk_kwh)::float AS max_price,
AVG(buy_raw_price_czk_kwh)::float AS avg_price
FROM ems.market_interval_price
WHERE market_source IN ('OTE_CZ', 'OTE_CZ_DAM')
GROUP BY day
ORDER BY day DESC
LIMIT 1
"""
)
if row is None or row["day"] is None:
row = await fetch_json(conn, "select ems.fn_latest_ote_day_stats()")
if not isinstance(row, dict):
row = json.loads(row)
day = row.get("latest_date")
if day is None:
raise HTTPException(status_code=404, detail="Žádná tržní data v databázi")
latest_date = day.isoformat() if hasattr(day, "isoformat") else str(day)[:10]
return PricesLatestResponse(
latest_date=row["day"].isoformat(),
slots=int(row["slots"] or 0),
min_price=float(row["min_price"] or 0.0),
max_price=float(row["max_price"] or 0.0),
avg_price=float(row["avg_price"] or 0.0),
latest_date=latest_date,
slots=int(row.get("slots") or 0),
min_price=float(row.get("min_price") or 0.0),
max_price=float(row.get("max_price") or 0.0),
avg_price=float(row.get("avg_price") or 0.0),
)
@@ -807,48 +772,45 @@ async def get_verify_modbus_commands(
raise HTTPException(status_code=404, detail="Site not found")
lookback = timedelta(minutes=minutes)
rows = await conn.fetch(
"""
SELECT id FROM ems.modbus_command
WHERE site_id = $1
AND status = 'written'
AND written_at >= now() - $2::interval
ORDER BY written_at
""",
id_json = await fetch_json(
conn,
"select ems.fn_modbus_written_command_ids($1::int, $2::interval)",
site_id,
lookback,
)
ids = [int(r["id"]) for r in rows]
if not isinstance(id_json, list):
id_json = json.loads(id_json) if isinstance(id_json, str) else []
ids = [int(x) for x in id_json]
checked = len(ids)
if ids:
await verify_modbus_commands(ids, conn, site_id)
detail_rows = (
await conn.fetch(
"""
SELECT id, asset_code, register_name, value_to_write, value_verified, status
FROM ems.modbus_command
WHERE id = ANY($1::int[])
ORDER BY id
""",
detail_json = (
await fetch_json(
conn,
"select ems.fn_modbus_commands_by_ids($1::int[])",
ids,
)
if ids
else []
)
if ids and not isinstance(detail_json, list):
detail_json = json.loads(detail_json) if isinstance(detail_json, str) else []
detail_rows = detail_json if ids else []
commands = [
ModbusCommandVerifyItem(
id=int(r["id"]),
asset_code=r["asset_code"],
register_name=r["register_name"],
asset_code=str(r.get("asset_code") or ""),
register_name=r.get("register_name"),
value_to_write=int(r["value_to_write"]),
value_verified=int(r["value_verified"])
if r["value_verified"] is not None
if r.get("value_verified") is not None
else None,
status=r["status"],
status=str(r.get("status") or ""),
)
for r in detail_rows
if isinstance(r, dict)
]
verified = sum(1 for c in commands if c.status == "verified")
mismatch = sum(1 for c in commands if c.status == "mismatch")
@@ -933,21 +895,17 @@ async def get_control_command_journal(
)
if not site_ok:
raise HTTPException(status_code=404, detail="Site not found")
rows = await conn.fetch(
"""
SELECT id, register, register_name, value_to_write, value_written,
value_verified, status, attempt_count, created_at
FROM ems.modbus_command
WHERE site_id = $1
ORDER BY created_at DESC
LIMIT $2
""",
rows = await fetch_json(
conn,
"select ems.fn_modbus_journal_list($1::int, $2::int)",
site_id,
limit,
)
if not isinstance(rows, list):
rows = json.loads(rows) if isinstance(rows, str) else []
cmds: list[ModbusJournalCommandRow] = []
for r in rows:
d = record_to_dict(r)
d = r if isinstance(r, dict) else {}
ca = d["created_at"]
cmds.append(
ModbusJournalCommandRow(
@@ -1006,51 +964,20 @@ async def get_site_forecast_pv(
site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id)
if not site_ok:
raise HTTPException(status_code=404, detail="Site not found")
rows = await conn.fetch(
"""
SELECT run_id, pv_array_id, interval_start, power_w,
irradiance_wm2, temp_c, pv_array_code, controllable
FROM (
SELECT DISTINCT ON (fpi.interval_start, fpr.pv_array_id)
fpi.run_id,
fpi.pv_array_id,
fpi.interval_start,
fpi.power_w,
fpi.irradiance_wm2,
fpi.temp_c,
apa.code AS pv_array_code,
apa.controllable
FROM ems.forecast_pv_interval fpi
JOIN ems.forecast_pv_run fpr ON fpr.id = fpi.run_id
JOIN ems.asset_pv_array apa
ON apa.id = fpr.pv_array_id AND apa.site_id = fpr.site_id
WHERE fpr.site_id = $1
AND (
fpi.interval_start
AT TIME ZONE COALESCE(
(SELECT timezone FROM ems.site WHERE id = $1),
'Europe/Prague'
)
)::date = $2::date
AND fpr.status = 'ok'
ORDER BY fpi.interval_start, fpr.pv_array_id, fpr.created_at DESC
) latest
ORDER BY controllable DESC, pv_array_code, interval_start
""",
split = await fetch_json(
conn,
"select ems.fn_forecast_pv_split($1::int, $2::date)",
site_id,
d,
)
# pv_a = řiditelná pole (curtailment / Deye), pv_b = neřízená (GEN, …) — sloučí více orientací
pv_a: list[dict[str, Any]] = []
pv_b: list[dict[str, Any]] = []
for r in rows:
item = record_to_dict(r)
item.pop("controllable", None)
if r["controllable"]:
pv_a.append(item)
else:
pv_b.append(item)
if not isinstance(split, dict):
split = json.loads(split) if isinstance(split, str) else {}
pv_a = split.get("pv_a") or []
pv_b = split.get("pv_b") or []
if not isinstance(pv_a, list):
pv_a = []
if not isinstance(pv_b, list):
pv_b = []
return {"pv_a": pv_a, "pv_b": pv_b}