second version

This commit is contained in:
Dusan Vojacek
2026-04-03 14:23:16 +02:00
parent 897b95f728
commit 9f4126946d
105 changed files with 9738 additions and 1470 deletions

View File

@@ -24,17 +24,22 @@ class Settings(BaseSettings):
postgrest_anon_role: str = Field(default="ems_anon")
ote_api_url: str = Field(
default="https://www.ote-cr.cz/pubapi/v1/market-data/dam",
default=(
"https://www.ote-cr.cz/cs/kratkodobe-trhy/elektrina/denni-trh/@@chart-data"
),
)
eur_czk_rate: float = Field(default=25.0)
open_meteo_api_url: str = Field(
default="https://api.open-meteo.com/v1/forecast",
)
open_meteo_forecast_days: int = Field(default=7)
loxone_user: str = Field(default="")
loxone_password: str = Field(default="")
discord_webhook_url: str = Field(default="")
telemetry_poll_interval_sec: int = Field(default=60)
planning_horizon_hours: int = Field(default=36)
planning_hp_max_cost_czk_kwh: float = Field(default=3.0)

View File

@@ -8,6 +8,7 @@ import os
from contextlib import asynccontextmanager
from datetime import date, datetime, timedelta, timezone
from typing import Annotated, Any, Literal
from zoneinfo import ZoneInfo
import asyncpg
import httpx
@@ -17,13 +18,29 @@ from app.deps import set_pg_pool
from app.routers.ev import router as ev_router
from app.routers.full_status import router as full_status_router
from app.routers.plan import router as plan_router
from app.ws_log_handler import WSLogHandler
from app.ws_manager import manager
from fastapi import (
APIRouter,
Depends,
FastAPI,
HTTPException,
Query,
Request,
WebSocket,
WebSocketDisconnect,
)
from fastapi.middleware.cors import CORSMiddleware
from services.audit_filler import fill_audit_for_completed_intervals
from services.control_exporter import (
export_setpoints,
read_deye_registers_live,
verify_modbus_commands,
)
from services.heartbeat_service import send_heartbeat
from services.forecast_service import fetch_pv_forecast
from services.price_importer import import_ote_prices
from fastapi import APIRouter, Depends, FastAPI, HTTPException, Query, Request
from services.audit_filler import fill_audit_for_completed_intervals
from services.heartbeat_service import send_heartbeat
from services.telemetry_collector import run_telemetry_loop_wrapper
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
@@ -47,7 +64,8 @@ async def get_pool() -> asyncpg.Pool:
return pool
scheduler = AsyncIOScheduler()
# Cron hodiny/minuty = Europe/Prague (import OTE 13:30 / 14:00, denní plán 15:00, …)
scheduler = AsyncIOScheduler(timezone=ZoneInfo("Europe/Prague"))
@asynccontextmanager
@@ -57,7 +75,10 @@ async def lifespan(app: FastAPI):
set_pg_pool(pool)
app.state.pg_pool = pool
from services.control_exporter import export_setpoints
app.state.ws_log_handler = WSLogHandler()
app.state.ws_log_handler.setLevel(logging.INFO)
logging.getLogger().addHandler(app.state.ws_log_handler)
from services.planning_engine import run_daily_plan, run_rolling_replan
async def scheduled_heartbeat() -> None:
@@ -78,6 +99,26 @@ async def lifespan(app: FastAPI):
except Exception:
logger.exception("scheduled_audit_filler site=%s failed", site["id"])
async def scheduled_forecast_accuracy() -> None:
async with app.state.pg_pool.acquire() as conn:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
for site in sites:
try:
n = await conn.fetchval(
"SELECT ems.fn_fill_forecast_accuracy($1, 48)",
site["id"],
)
if n:
logger.info(
"forecast_accuracy filled %s slots for site %s",
n,
site["id"],
)
except Exception:
logger.exception(
"scheduled_forecast_accuracy site=%s failed", site["id"]
)
async def scheduled_expire_modes() -> None:
async with app.state.pg_pool.acquire() as conn:
try:
@@ -94,23 +135,200 @@ async def lifespan(app: FastAPI):
except Exception as e:
logger.exception("scheduled_control_export site=%s: %s", site["id"], e)
async def scheduled_daily_plan() -> None:
async def scheduled_verify_modbus() -> None:
"""
Ověří příkazy ve stavu written z posledních 20 minut.
Běží každé 2 minuty, nezávisle na control_exporter (delší okno kvůli zpoždění jobu).
"""
async with app.state.pg_pool.acquire() as conn:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
for site in sites:
try:
await run_daily_plan(site["id"], conn)
cmd_rows = await conn.fetch(
"""
SELECT id FROM ems.modbus_command
WHERE site_id = $1
AND status = 'written'
AND written_at >= now() - INTERVAL '20 minutes'
ORDER BY written_at
""",
site["id"],
)
if cmd_rows:
await verify_modbus_commands(
[int(r["id"]) for r in cmd_rows],
conn,
int(site["id"]),
)
except Exception:
logger.exception("scheduled_daily_plan site=%s failed", site["id"])
logger.exception(
"scheduled_verify_modbus site=%s failed", site["id"]
)
async def scheduled_daily_plan() -> None:
async with app.state.pg_pool.acquire() as conn:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
for site in sites:
site_id = int(site["id"])
try:
await run_daily_plan(site_id, conn)
# Aplikuj nový active run okamžitě, nečekej na další 15min tick exportu.
await export_setpoints(site_id, conn)
except Exception:
logger.exception("scheduled_daily_plan site=%s failed", site_id)
async def scheduled_rolling_replan() -> None:
async with app.state.pg_pool.acquire() as conn:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
for site in sites:
site_id = int(site["id"])
try:
await run_rolling_replan(site["id"], conn)
await run_rolling_replan(site_id, conn)
# Aplikuj nový active run okamžitě, nečekej na další 15min tick exportu.
await export_setpoints(site_id, conn)
except Exception:
logger.exception("scheduled_rolling_replan site=%s failed", site["id"])
logger.exception("scheduled_rolling_replan site=%s failed", site_id)
async def scheduled_baseline_update() -> None:
async with app.state.pg_pool.acquire() as conn:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
for site in sites:
try:
n = await conn.fetchval(
"SELECT ems.fn_update_baseline_stats($1, 30)",
site["id"],
)
logger.info(
"baseline_stats updated %s rows for site %s",
n,
site["id"],
)
except Exception:
logger.exception(
"scheduled_baseline_update site=%s failed", site["id"]
)
async def scheduled_market_price_stats() -> None:
async with app.state.pg_pool.acquire() as conn:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
for site in sites:
try:
n = await conn.fetchval(
"SELECT ems.fn_update_market_price_stats($1, 90)",
site["id"],
)
logger.info(
"market_price_stats updated %s rows site=%s",
n,
site["id"],
)
except Exception:
logger.exception(
"scheduled_market_price_stats site=%s failed", site["id"]
)
async def scheduled_tuv_usage_stats() -> None:
async with app.state.pg_pool.acquire() as conn:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
for site in sites:
try:
n = await conn.fetchval(
"SELECT ems.fn_update_tuv_usage_stats($1, 30)",
site["id"],
)
logger.info(
"tuv_usage_stats updated %s rows site=%s",
n,
site["id"],
)
except Exception:
logger.exception(
"scheduled_tuv_usage_stats site=%s failed", site["id"]
)
async def scheduled_forecast_refresh() -> None:
async with app.state.pg_pool.acquire() as conn:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
for site in sites:
site_id = int(site["id"])
try:
intervals, pv_arrays = await fetch_pv_forecast(site_id, conn)
if intervals >= 0:
logger.info(
"scheduled_forecast_refresh site=%s intervals=%s arrays=%s",
site_id,
intervals,
pv_arrays,
)
await _refresh_negative_price_predictions(conn, site_id)
else:
logger.warning(
"scheduled_forecast_refresh site=%s failed",
site_id,
)
except Exception:
logger.exception("scheduled_forecast_refresh site=%s failed", site_id)
async def _count_ote_slots_for_day(
conn: asyncpg.Connection, site_id: int, target_day: date
) -> int:
return int(
await conn.fetchval(
"""
SELECT COUNT(*)::int
FROM ems.market_interval_price
WHERE market_source = 'OTE_CZ'
AND interval_start::date = $1::date
""",
target_day,
)
or 0
)
async def _scheduled_ote_import_for_site(
conn: asyncpg.Connection, site_id: int
) -> None:
tz_name = await conn.fetchval(
"SELECT timezone FROM ems.site WHERE id = $1",
site_id,
)
tz = ZoneInfo(tz_name or "Europe/Prague")
now_loc = datetime.now(tz)
today = now_loc.date()
tomorrow = today + timedelta(days=1)
# Zajistit data pro dnešek i zítřek; import jen pokud není kompletních 96 slotů.
for day in (today, tomorrow):
slots = await _count_ote_slots_for_day(conn, site_id, day)
if slots >= 96:
continue
n, imported_day, _, err = await import_ote_prices(
site_id, conn, target_date=day
)
if n < 0:
logger.warning(
"scheduled_ote_import site=%s day=%s failed (%s)",
site_id,
day.isoformat(),
err,
)
continue
logger.info(
"scheduled_ote_import site=%s day=%s imported=%s",
site_id,
imported_day,
n,
)
await _refresh_negative_price_predictions(conn, site_id)
async def scheduled_ote_import() -> None:
async with app.state.pg_pool.acquire() as conn:
sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
for site in sites:
try:
await _scheduled_ote_import_for_site(conn, int(site["id"]))
except Exception:
logger.exception("scheduled_ote_import site=%s failed", site["id"])
scheduler.add_job(scheduled_heartbeat, "interval", seconds=60, id="heartbeat")
scheduler.add_job(
@@ -120,6 +338,13 @@ async def lifespan(app: FastAPI):
second=0,
id="audit_filler",
)
scheduler.add_job(
scheduled_forecast_accuracy,
"cron",
minute="2,17,32,47",
id="forecast_accuracy",
replace_existing=True,
)
scheduler.add_job(scheduled_expire_modes, "interval", minutes=1, id="expire_modes")
scheduler.add_job(
scheduled_control_export,
@@ -128,6 +353,13 @@ async def lifespan(app: FastAPI):
second=0,
id="control_export",
)
scheduler.add_job(
scheduled_verify_modbus,
"interval",
minutes=2,
id="verify_modbus",
replace_existing=True,
)
scheduler.add_job(scheduled_daily_plan, "cron", hour=15, minute=0, id="daily_plan")
scheduler.add_job(
scheduled_rolling_replan,
@@ -135,6 +367,62 @@ async def lifespan(app: FastAPI):
minute="*/15",
id="rolling_replan",
)
scheduler.add_job(
scheduled_baseline_update,
"cron",
hour=0,
minute=30,
id="baseline_update",
replace_existing=True,
)
scheduler.add_job(
scheduled_market_price_stats,
"cron",
hour=14,
minute=45,
id="market_price_stats",
replace_existing=True,
)
scheduler.add_job(
scheduled_tuv_usage_stats,
"cron",
hour=0,
minute=45,
id="tuv_usage_stats",
replace_existing=True,
)
scheduler.add_job(
scheduled_ote_import,
"cron",
hour=13,
minute=30,
id="ote_import_preopen",
replace_existing=True,
)
scheduler.add_job(
scheduled_ote_import,
"cron",
hour=14,
minute=0,
id="ote_import_main",
replace_existing=True,
)
scheduler.add_job(
scheduled_ote_import,
"cron",
hour=0,
minute=5,
id="ote_import_backfill",
replace_existing=True,
)
scheduler.add_job(
scheduled_forecast_refresh,
"cron",
hour="*/2",
minute=5,
id="forecast_refresh_2h",
replace_existing=True,
)
scheduler.start()
telemetry_task = asyncio.create_task(run_telemetry_loop_wrapper(app.state.pg_pool))
@@ -142,6 +430,11 @@ async def lifespan(app: FastAPI):
yield
ws_h = getattr(app.state, "ws_log_handler", None)
if ws_h is not None:
logging.getLogger().removeHandler(ws_h)
app.state.ws_log_handler = None
telemetry_task.cancel()
try:
await telemetry_task
@@ -230,6 +523,45 @@ class ForecastRunResponse(BaseModel):
pv_arrays: int
class ModbusCommandVerifyItem(BaseModel):
id: int
asset_code: str
register_name: str | None
value_to_write: int
value_verified: int | None
status: str
class ModbusVerifyResponse(BaseModel):
checked: int
verified: int
mismatch: int
commands: list[ModbusCommandVerifyItem]
class NegativePricePredictionItem(BaseModel):
id: int
predicted_at: datetime
predicted_date: date
window_start_hour: int
window_end_hour: int
probability_pct: int
expected_min_price: float | None
reason: str | None
async def _refresh_negative_price_predictions(conn: asyncpg.Connection, site_id: int) -> None:
"""Po importu cen / forecastu obnoví cache predikce záporných cen."""
try:
await conn.fetch("SELECT * FROM ems.fn_predict_negative_price_windows($1, 7)", site_id)
except Exception:
logger.warning(
"fn_predict_negative_price_windows failed for site %s",
site_id,
exc_info=True,
)
@sites_router.post("/{site_id}/prices/import", response_model=PricesImportResponse)
async def post_import_site_prices(
site_id: int,
@@ -241,15 +573,18 @@ async def post_import_site_prices(
),
) -> PricesImportResponse:
target: date | None = _parse_ymd(date_str) if date_str is not None else None
import_error: str | None = None
async with db.acquire() as conn:
site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id)
if not site_ok:
raise HTTPException(status_code=404, detail="Site not found")
n, day, first_price = await import_ote_prices(site_id, conn, target_date=target)
n, day, first_price, import_error = await import_ote_prices(site_id, conn, target_date=target)
if n >= 0:
await _refresh_negative_price_predictions(conn, site_id)
if n < 0:
raise HTTPException(
status_code=422,
detail="OTE API nedostupné nebo nevrátilo data",
detail=f"OTE import selhal ({import_error or 'unknown'})",
)
return PricesImportResponse(
slots_imported=n,
@@ -258,6 +593,66 @@ async def post_import_site_prices(
)
@sites_router.get(
"/{site_id}/prices/negative-predictions",
response_model=list[NegativePricePredictionItem],
)
async def get_site_negative_price_predictions(
site_id: int,
db: Annotated[asyncpg.Pool, Depends(get_pool)],
) -> list[NegativePricePredictionItem]:
"""Záznamy z cache predikce záporných cen na příštích 7 kalendářních dní (v časové zóně lokality)."""
async with db.acquire() as conn:
site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id)
if not site_ok:
raise HTTPException(status_code=404, detail="Site not found")
rows = await conn.fetch(
"""
SELECT
p.id,
p.predicted_at,
p.predicted_date,
p.window_start_hour,
p.window_end_hour,
p.probability_pct,
p.expected_min_price,
p.reason
FROM ems.predicted_negative_price_window p
WHERE p.site_id = $1
AND p.predicted_date > (
CURRENT_TIMESTAMP AT TIME ZONE COALESCE(
NULLIF((SELECT timezone FROM ems.site WHERE id = $1), ''),
'Europe/Prague'
)
)::date
AND p.predicted_date <= (
CURRENT_TIMESTAMP AT TIME ZONE COALESCE(
NULLIF((SELECT timezone FROM ems.site WHERE id = $1), ''),
'Europe/Prague'
)
)::date + 7
ORDER BY p.predicted_date, p.window_start_hour
""",
site_id,
)
out: list[NegativePricePredictionItem] = []
for r in rows:
em = r["expected_min_price"]
out.append(
NegativePricePredictionItem(
id=int(r["id"]),
predicted_at=r["predicted_at"],
predicted_date=r["predicted_date"],
window_start_hour=int(r["window_start_hour"]),
window_end_hour=int(r["window_end_hour"]),
probability_pct=int(r["probability_pct"]),
expected_min_price=float(em) if em is not None else None,
reason=r["reason"],
)
)
return out
@sites_router.get("/{site_id}/prices/latest", response_model=PricesLatestResponse)
async def get_site_prices_latest(
site_id: int,
@@ -293,6 +688,186 @@ async def get_site_prices_latest(
)
@sites_router.get("/{site_id}/control/verify", response_model=ModbusVerifyResponse)
async def get_verify_modbus_commands(
site_id: int,
db: Annotated[asyncpg.Pool, Depends(get_pool)],
minutes: int = Query(10, ge=1, le=1440, description="Jak daleko zpět hledat written příkazy"),
) -> ModbusVerifyResponse:
"""
Ruční ověření Modbus zápisů (written) z posledních N minut.
Vhodné hned po manuálním exportu setpointů.
"""
async with db.acquire() as conn:
site_ok = await conn.fetchval(
"SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id
)
if not site_ok:
raise HTTPException(status_code=404, detail="Site not found")
lookback = timedelta(minutes=minutes)
rows = await conn.fetch(
"""
SELECT id FROM ems.modbus_command
WHERE site_id = $1
AND status = 'written'
AND written_at >= now() - $2::interval
ORDER BY written_at
""",
site_id,
lookback,
)
ids = [int(r["id"]) for r in rows]
checked = len(ids)
if ids:
await verify_modbus_commands(ids, conn, site_id)
detail_rows = (
await conn.fetch(
"""
SELECT id, asset_code, register_name, value_to_write, value_verified, status
FROM ems.modbus_command
WHERE id = ANY($1::int[])
ORDER BY id
""",
ids,
)
if ids
else []
)
commands = [
ModbusCommandVerifyItem(
id=int(r["id"]),
asset_code=r["asset_code"],
register_name=r["register_name"],
value_to_write=int(r["value_to_write"]),
value_verified=int(r["value_verified"])
if r["value_verified"] is not None
else None,
status=r["status"],
)
for r in detail_rows
]
verified = sum(1 for c in commands if c.status == "verified")
mismatch = sum(1 for c in commands if c.status == "mismatch")
return ModbusVerifyResponse(
checked=checked,
verified=verified,
mismatch=mismatch,
commands=commands,
)
class DeyeRegistersLiveResponse(BaseModel):
reg108_charge_a: int
reg109_discharge_a: int
reg141_energy_mode: int
reg142_limit_control: int
reg143_export_limit_w: int
reg178_peak_shaving_switch: int
reg191_peak_shaving_w: int
read_at: str
@sites_router.get(
"/{site_id}/control/registers",
response_model=DeyeRegistersLiveResponse,
)
async def get_control_registers_live(
site_id: int,
db: Annotated[asyncpg.Pool, Depends(get_pool)],
) -> DeyeRegistersLiveResponse:
"""Živé hodnoty registrů Deye 108/109/141/142/143/178/191 přes sdílený Modbus klient."""
async with db.acquire() as conn:
site_ok = await conn.fetchval(
"SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id
)
if not site_ok:
raise HTTPException(status_code=404, detail="Site not found")
try:
payload = await read_deye_registers_live(site_id, conn)
except ValueError:
raise HTTPException(
status_code=404,
detail="No controllable Modbus inverter for this site",
) from None
except Exception as e:
logger.warning("get_control_registers_live site=%s: %s", site_id, e)
raise HTTPException(
status_code=503,
detail=f"Modbus read failed: {e}",
) from e
return DeyeRegistersLiveResponse(**payload)
class ModbusJournalCommandRow(BaseModel):
id: int
register: int
register_name: str | None
value_to_write: int
value_written: int | None
value_verified: int | None
status: str
attempt_count: int
created_at: str
class ModbusJournalListResponse(BaseModel):
commands: list[ModbusJournalCommandRow]
@sites_router.get(
"/{site_id}/control/journal",
response_model=ModbusJournalListResponse,
)
async def get_control_command_journal(
site_id: int,
db: Annotated[asyncpg.Pool, Depends(get_pool)],
limit: int = Query(50, ge=1, le=100),
) -> ModbusJournalListResponse:
async with db.acquire() as conn:
site_ok = await conn.fetchval(
"SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id
)
if not site_ok:
raise HTTPException(status_code=404, detail="Site not found")
rows = await conn.fetch(
"""
SELECT id, register, register_name, value_to_write, value_written,
value_verified, status, attempt_count, created_at
FROM ems.modbus_command
WHERE site_id = $1
ORDER BY created_at DESC
LIMIT $2
""",
site_id,
limit,
)
cmds: list[ModbusJournalCommandRow] = []
for r in rows:
d = record_to_dict(r)
ca = d["created_at"]
cmds.append(
ModbusJournalCommandRow(
id=int(d["id"]),
register=int(d["register"]),
register_name=d.get("register_name"),
value_to_write=int(d["value_to_write"]),
value_written=int(d["value_written"])
if d.get("value_written") is not None
else None,
value_verified=int(d["value_verified"])
if d.get("value_verified") is not None
else None,
status=str(d["status"]),
attempt_count=int(d["attempt_count"]),
created_at=ca if isinstance(ca, str) else str(ca),
)
)
return ModbusJournalListResponse(commands=cmds)
@sites_router.post("/{site_id}/forecast/run", response_model=ForecastRunResponse)
async def post_run_site_forecast(
site_id: int,
@@ -302,7 +877,13 @@ async def post_run_site_forecast(
site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id)
if not site_ok:
raise HTTPException(status_code=404, detail="Site not found")
intervals, pv_arrays = await fetch_pv_forecast(site_id, conn)
try:
intervals, pv_arrays = await fetch_pv_forecast(site_id, conn)
except Exception as e:
logger.error("Forecast failed: %s", e, exc_info=True)
raise HTTPException(status_code=422, detail=str(e)) from e
if intervals >= 0:
await _refresh_negative_price_predictions(conn, site_id)
if intervals < 0:
raise HTTPException(
status_code=422,
@@ -326,14 +907,27 @@ async def get_site_forecast_pv(
raise HTTPException(status_code=404, detail="Site not found")
rows = await conn.fetch(
"""
SELECT fpi.*, apa.code AS pv_array_code
FROM ems.forecast_pv_interval fpi
JOIN ems.forecast_pv_run fpr ON fpr.id = fpi.run_id
JOIN ems.asset_pv_array apa ON apa.id = fpi.pv_array_id AND apa.site_id = fpr.site_id
WHERE fpr.site_id = $1
AND fpi.interval_start::date = $2::date
AND fpr.status = 'ok'
ORDER BY apa.code, fpi.interval_start
SELECT run_id, pv_array_id, interval_start, power_w,
irradiance_wm2, temp_c, pv_array_code
FROM (
SELECT DISTINCT ON (fpi.interval_start, fpr.pv_array_id)
fpi.run_id,
fpi.pv_array_id,
fpi.interval_start,
fpi.power_w,
fpi.irradiance_wm2,
fpi.temp_c,
apa.code AS pv_array_code
FROM ems.forecast_pv_interval fpi
JOIN ems.forecast_pv_run fpr ON fpr.id = fpi.run_id
JOIN ems.asset_pv_array apa
ON apa.id = fpr.pv_array_id AND apa.site_id = fpr.site_id
WHERE fpr.site_id = $1
AND fpi.interval_start::date = $2::date
AND fpr.status = 'ok'
ORDER BY fpi.interval_start, fpr.pv_array_id, fpr.created_at DESC
) latest
ORDER BY pv_array_code, interval_start
""",
site_id,
d,
@@ -351,6 +945,45 @@ async def get_site_forecast_pv(
return {"pv_a": pv_a, "pv_b": pv_b}
class NegPricePredictionItem(BaseModel):
predicted_date: str
window_start_hour: int
window_end_hour: int
probability_pct: float
expected_min_price: float | None
reason: str
class NegativePredictionsResponse(BaseModel):
predictions: list[NegPricePredictionItem]
insufficient_history: bool
@sites_router.get(
"/{site_id}/prices/negative-predictions",
response_model=NegativePredictionsResponse,
)
async def get_site_negative_price_predictions(
site_id: int,
db: Annotated[asyncpg.Pool, Depends(get_pool)],
) -> NegativePredictionsResponse:
"""Zástupný endpoint predikce modelu doplnit později; historii počítáme z OTE dat."""
async with db.acquire() as conn:
site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id)
if not site_ok:
raise HTTPException(status_code=404, detail="Site not found")
ndays = await conn.fetchval(
"""
SELECT COUNT(DISTINCT (interval_start AT TIME ZONE 'Europe/Prague')::date)::int
FROM ems.market_interval_price
WHERE market_source IN ('OTE_CZ', 'OTE_CZ_DAM')
AND interval_start >= now() - INTERVAL '400 days'
"""
)
n = int(ndays or 0)
return NegativePredictionsResponse(predictions=[], insufficient_history=n < 28)
app.include_router(sites_router)
app.add_middleware(
@@ -362,6 +995,26 @@ app.add_middleware(
)
@app.websocket("/ws/telemetry")
async def ws_telemetry(websocket: WebSocket) -> None:
await manager.connect_telemetry(websocket)
try:
while True:
await websocket.receive_text() # keepalive
except WebSocketDisconnect:
manager.disconnect(websocket)
@app.websocket("/ws/logs")
async def ws_logs(websocket: WebSocket) -> None:
await manager.connect_logs(websocket)
try:
while True:
await websocket.receive_text()
except WebSocketDisconnect:
manager.disconnect(websocket)
async def _health_payload(db: asyncpg.Pool) -> dict[str, Any]:
db_status = "error"
active_plan_slots = 0

View File

@@ -0,0 +1,249 @@
"""Pravidla pro GET /sites/{id}/notifications (ceny, EV, predikce záporných cen)."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import date, datetime, time, timedelta, timezone
from decimal import Decimal
from typing import Any, Literal
from zoneinfo import ZoneInfo
PRAGUE = ZoneInfo("Europe/Prague")
NotificationLevel = Literal["success", "info", "warning", "error"]
NotificationAction = Literal["connect_ev", "replan", "import_prices", "switch_auto"]
@dataclass(frozen=True)
class PriceSlot:
interval_start: datetime
buy: float
sell: float
@dataclass(frozen=True)
class EvSessionRow:
id: int
charger_id: int
energy_delivered_wh: float
target_soc_pct: float | None
session_start: datetime
battery_capacity_kwh: float | None
make: str | None
model: str | None
default_target_soc_pct: float | None
charger_code: str
soc_at_connect_pct: float | None
@dataclass(frozen=True)
class NegWindowRow:
predicted_date: date
window_start_hour: int
window_end_hour: int
probability_pct: int
def _num(v: Any) -> float | None:
if v is None:
return None
if isinstance(v, Decimal):
return float(v)
return float(v)
def _ev_connect_hint(ev_sessions: list[EvSessionRow], current_price: float, avg_buy: float | None) -> str:
if ev_sessions:
return ""
kwh = 30.0
if current_price < 0:
return f"Připojíš-li Teslu, dostaneš zaplaceno za ~{kwh * abs(current_price):.0f} Kč za 30 kWh."
if avg_buy is None:
return ""
savings = avg_buy - current_price
return f"Připojíš-li Teslu, ušetříš ~{kwh * max(0, savings):.0f} Kč."
def _estimate_ev_soc(s: EvSessionRow) -> float:
cap = s.battery_capacity_kwh
delivered_kwh = (s.energy_delivered_wh or 0) / 1000.0
if s.soc_at_connect_pct is not None and cap and cap > 0:
return min(95.0, float(s.soc_at_connect_pct) + (delivered_kwh / cap) * 100.0)
if cap and cap > 0 and s.energy_delivered_wh:
return min(95.0, (delivered_kwh / cap) * 100.0 + 20.0)
return 50.0
def _estimate_needed_kwh(soc_pct: float, battery_kwh: float, ev_sessions: list[EvSessionRow]) -> float:
bat_needed = max(0.0, (80.0 - soc_pct) / 100.0 * battery_kwh)
ev_needed = 0.0
for s in ev_sessions:
cap = s.battery_capacity_kwh or 60.0
tgt = (s.target_soc_pct if s.target_soc_pct is not None else None) or (
s.default_target_soc_pct if s.default_target_soc_pct is not None else 80.0
)
delivered = (s.energy_delivered_wh or 0) / 1000.0
want = max(0.0, (tgt / 100.0) * cap - delivered)
ev_needed += want
return bat_needed + ev_needed
def _estimate_ev_free_kwh(s: EvSessionRow) -> float:
cap = s.battery_capacity_kwh or 60.0
delivered = (s.energy_delivered_wh or 0) / 1000.0
return max(0.0, cap * 0.95 - delivered)
def _tesla_potential_kwh(ev_sessions: list[EvSessionRow]) -> float:
"""Odhad „kolik lze ještě dobít“ pro typickou Teslu, pokud není připojena."""
if ev_sessions:
return 0.0
return 55.0
def _date_label(d: date) -> str:
today = datetime.now(PRAGUE).date()
if d == today:
return "dnes"
if d == today + timedelta(days=1):
return "zítra"
return f"{d.day}. {d.month}."
def _hours_until(predicted_date: date, start_hour: int) -> float:
start_local = datetime.combine(predicted_date, time(hour=start_hour, minute=0), tzinfo=PRAGUE)
now = datetime.now(PRAGUE)
delta = start_local - now
return max(0.0, delta.total_seconds() / 3600.0)
def build_smart_notifications(
*,
prices: list[PriceSlot],
avg_buy: float | None,
soc_pct: float | None,
battery_kwh: float | None,
ev_sessions: list[EvSessionRow],
neg_windows: list[NegWindowRow],
mode: str,
sell_price_now: float | None,
) -> list[dict[str, Any]]:
notifications: list[dict[str, Any]] = []
soc = float(soc_pct) if soc_pct is not None else 50.0
bat_kwh = float(battery_kwh) if battery_kwh is not None and battery_kwh > 0 else 10.0
current_buy = prices[0].buy if prices else None
current_sell = prices[0].sell if prices else None
# 1. Záporná cena právě teď
if current_buy is not None and current_buy < 0:
bat_free_kwh = (100.0 - soc) / 100.0 * bat_kwh
ev_hint = _ev_connect_hint(ev_sessions, current_buy, avg_buy)
notifications.append(
{
"id": "neg_price_now",
"level": "success",
"title": f"Záporné ceny právě teď ({current_buy:.3f} Kč/kWh)",
"body": (
f"Dostaneš zaplaceno za každý odebraný kWh. "
f"Baterie může pojmout ještě {bat_free_kwh:.1f} kWh. "
+ ev_hint
),
"eta_minutes": 0,
"action": "connect_ev" if not ev_sessions else None,
}
)
# 2. Levná cena v příštích 6 h
avg_ok = avg_buy is not None and avg_buy > 0
if prices and avg_ok and not (current_buy is not None and current_buy < 0):
horizon = prices[:24]
cheap_slots = [p for p in horizon if p.buy < avg_buy * 0.60]
if cheap_slots:
cheapest = min(cheap_slots, key=lambda p: p.buy)
now_utc = datetime.now(timezone.utc)
istart = cheapest.interval_start
if istart.tzinfo is None:
istart = istart.replace(tzinfo=timezone.utc)
eta_min = int((istart - now_utc).total_seconds() / 60)
eta_min = max(0, eta_min)
savings_per_kwh = avg_buy - cheapest.buy
ev_plug_useful = (not ev_sessions) or any(_estimate_ev_soc(s) < 60.0 for s in ev_sessions)
bat_low = soc < 70.0
if ev_plug_useful or bat_low:
needed_kwh = _estimate_needed_kwh(soc, bat_kwh, ev_sessions)
savings_czk = needed_kwh * max(0.0, savings_per_kwh)
extra_body = ""
if ev_plug_useful and not ev_sessions:
extra_body = " Připoj auto před tímto oknem."
notifications.append(
{
"id": "cheap_price_soon",
"level": "info",
"title": f"Levná elektřina za {eta_min} min ({cheapest.buy:.3f} Kč/kWh)",
"body": (
f"Cena bude o {savings_per_kwh:.2f} Kč/kWh nižší než průměr. "
f"Potenciální úspora: ~{savings_czk:.0f} Kč."
+ extra_body
),
"eta_minutes": eta_min,
"action": "connect_ev" if ev_plug_useful and not ev_sessions else None,
}
)
# 3. Predikované záporné ceny
for window in neg_windows[:2]:
if any(n["id"] == "neg_price_now" for n in notifications):
continue
date_label = _date_label(window.predicted_date)
window_str = f"{window.window_start_hour:02d}:00{window.window_end_hour:02d}:00"
hours_until = _hours_until(window.predicted_date, window.window_start_hour)
bat_free = (100.0 - soc) / 100.0 * bat_kwh
ev_free = sum(_estimate_ev_free_kwh(s) for s in ev_sessions)
total_free = bat_free + ev_free
tesla_kwh = _tesla_potential_kwh(ev_sessions)
ev_hint = (
f" Připojíš-li Teslu, lze dobít až {tesla_kwh:.0f} kWh navíc." if not ev_sessions else ""
)
lvl: NotificationLevel = "success" if window.probability_pct >= 70 else "info"
notifications.append(
{
"id": f"neg_pred_{window.predicted_date}_{window.window_start_hour}",
"level": lvl,
"title": (
f"Záporné ceny {date_label} {window_str} ({window.probability_pct}% jistota)"
),
"body": (
f"Solver naplánuje max. odběr ze sítě. Lze dobít ~{total_free:.0f} kWh zdarma."
+ ev_hint
),
"eta_minutes": int(hours_until * 60),
"action": "connect_ev" if not ev_sessions else None,
}
)
# 4. Manuální režim + drahá cena + plná baterie
mode_u = (mode or "").strip().upper()
sell = sell_price_now if sell_price_now is not None else (current_sell if current_sell is not None else None)
if mode_u != "AUTO" and avg_ok and sell is not None and sell > avg_buy * 1.30 and soc > 70.0:
pct_above = ((sell / avg_buy) - 1.0) * 100.0 if avg_buy else 0.0
notifications.append(
{
"id": "manual_expensive",
"level": "warning",
"title": "Drahá elektřina systém není v AUTO",
"body": (
f"Cena prodeje {sell:.3f} Kč/kWh (+{pct_above:.0f}% nad průměr). "
f"Baterie {soc:.0f}%. Přepnutím na AUTO solver využije cenové okno."
),
"eta_minutes": None,
"action": "switch_auto",
}
)
priority = {"error": 0, "success": 1, "warning": 2, "info": 3}
notifications.sort(key=lambda n: priority.get(n["level"], 9))
return notifications

View File

@@ -2,7 +2,7 @@
from __future__ import annotations
from datetime import datetime
from datetime import date, datetime
from typing import Annotated, Any
import asyncpg
@@ -91,3 +91,88 @@ async def patch_ev_session(
if row is None:
raise HTTPException(status_code=404, detail="Session not found")
return EvSessionPatchResponse(success=True, session_id=int(row["id"]))
class ArrivalHourItem(BaseModel):
hour: int
confidence_pct: int
samples: int
class ChargerTomorrowArrival(BaseModel):
tomorrow: list[ArrivalHourItem]
class EvArrivalPredictionResponse(BaseModel):
insufficient_data: bool
tomorrow_date: str
chargers: dict[str, ChargerTomorrowArrival]
@router.get("/arrival-prediction", response_model=EvArrivalPredictionResponse)
async def get_ev_arrival_prediction(
site_id: int,
pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> EvArrivalPredictionResponse:
"""Top hodiny příjezdu z ems.fn_ev_expected_arrival; při <5 session celkem insufficient_data."""
async with pool.acquire() as conn:
site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id)
if not site_ok:
raise HTTPException(status_code=404, detail="Site not found")
n_sessions = int(
await conn.fetchval(
"SELECT COUNT(*)::int FROM ems.ev_session WHERE site_id = $1",
site_id,
)
or 0
)
insufficient = n_sessions < 5
tomorrow = await conn.fetchval(
"""
SELECT (
CURRENT_TIMESTAMP AT TIME ZONE COALESCE(
NULLIF(TRIM(timezone), ''),
'Europe/Prague'
)
)::date + 1
FROM ems.site
WHERE id = $1
""",
site_id,
)
if tomorrow is None:
raise HTTPException(status_code=500, detail="Site date resolution failed")
tomorrow_d: date = tomorrow
chargers_rows = await conn.fetch(
"SELECT id, code FROM ems.asset_ev_charger WHERE site_id = $1 ORDER BY id",
site_id,
)
chargers: dict[str, ChargerTomorrowArrival] = {}
for ch in chargers_rows:
code = str(ch["code"])
preds = await conn.fetch(
"SELECT * FROM ems.fn_ev_expected_arrival($1, $2, $3::date)",
site_id,
ch["id"],
tomorrow_d,
)
chargers[code] = ChargerTomorrowArrival(
tomorrow=[
ArrivalHourItem(
hour=int(r["expected_hour"]),
confidence_pct=int(r["confidence_pct"]),
samples=int(r["sample_count"]),
)
for r in preds
]
)
return EvArrivalPredictionResponse(
insufficient_data=insufficient,
tomorrow_date=tomorrow_d.isoformat(),
chargers=chargers,
)

View File

@@ -2,17 +2,38 @@
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from datetime import date, datetime, timedelta, timezone
from typing import Annotated, Any, Literal
from zoneinfo import ZoneInfo
import asyncpg
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
from app.db_json import record_to_dict
from app.deps import get_pg_pool
from app.notifications_logic import (
EvSessionRow,
NegWindowRow,
PriceSlot,
build_smart_notifications,
)
router = APIRouter(prefix="/sites/{site_id}", tags=["sites"])
class SiteNotificationItem(BaseModel):
id: str
level: Literal["success", "info", "warning", "error"]
title: str
body: str
eta_minutes: int | None = None
action: Literal["connect_ev", "replan", "import_prices", "switch_auto"] | None = None
class SiteNotificationsResponse(BaseModel):
notifications: list[SiteNotificationItem] = Field(default_factory=list)
INV_STALE_SEC = 300
HEARTBEAT_STALE_SEC = 300
EXPECTED_TOMORROW_PRICE_SLOTS = 90
@@ -235,7 +256,10 @@ async def get_site_status_full(
if not has_plan:
add_alert("warn", "Není aktivní plán EMS neoptimalizuje")
if tomorrow_slots < EXPECTED_TOMORROW_PRICE_SLOTS:
# OTE D+1 typicky až po ~14:30 Europe/Prague před tím nevarovat
now_prague = datetime.now(ZoneInfo("Europe/Prague"))
prices_expected = (now_prague.hour, now_prague.minute) >= (14, 30)
if tomorrow_slots < EXPECTED_TOMORROW_PRICE_SLOTS and prices_expected:
add_alert("warn", "Chybí spotové ceny pro zítřek")
if mode_code.upper() == "MANUAL":
@@ -266,3 +290,326 @@ async def get_site_status_full(
"planning": planning,
"alerts": alerts,
}
_NOTIF_LEVEL_PRIORITY = {"error": 0, "success": 1, "warning": 2, "info": 3}
def _infrastructure_notification_items(
*,
has_plan: bool,
tomorrow_slots: int,
mode_code: str,
reserve_soc: float | None,
soc: float | None,
inv_age: int | None,
hb_age: int | None,
) -> list[SiteNotificationItem]:
"""Kritické / provozní notifikace (telemetrie, plán, ceny, režim, heartbeat)."""
items: list[SiteNotificationItem] = []
def push(
nid: str,
level: Literal["success", "info", "warning", "error"],
title: str,
body: str,
*,
eta_minutes: int | None = None,
action: Literal["connect_ev", "replan", "import_prices", "switch_auto"] | None = None,
) -> None:
items.append(
SiteNotificationItem(
id=nid,
level=level,
title=title,
body=body,
eta_minutes=eta_minutes,
action=action,
)
)
if inv_age is None or inv_age > INV_STALE_SEC:
push("telemetry_inverter", "error", "Telemetrie střídače", "Data ze střídače nejsou aktuální.")
if not has_plan:
push(
"no_active_plan",
"warning",
"Chybí aktivní plán",
"EMS zatím neoptimalizuje provoz spusťte plánování.",
action="replan",
)
now_prague = datetime.now(ZoneInfo("Europe/Prague"))
prices_expected = (now_prague.hour, now_prague.minute) >= (14, 30)
if tomorrow_slots < EXPECTED_TOMORROW_PRICE_SLOTS and prices_expected:
push(
"prices_tomorrow",
"warning",
"Ceny na zítřek",
"Nejsou kompletní spotové ceny OTE pro následující den.",
action="import_prices",
)
if mode_code.upper() == "MANUAL":
push("mode_manual", "info", "Manuální režim", "Automatická optimalizace je vypnutá.")
if reserve_soc is not None and soc is not None and soc < reserve_soc:
push("soc_reserve", "error", "SoC pod rezervou", "Nabití baterie je pod nastavenou bezpečnostní rezervou.")
if hb_age is None or hb_age > HEARTBEAT_STALE_SEC:
push("heartbeat", "error", "EMS heartbeat", "Služba EMS nehlásí pravidelný heartbeat.")
return items
def _float_or_none(v: Any) -> float | None:
if v is None:
return None
return float(v)
@router.get("/notifications", response_model=SiteNotificationsResponse)
async def get_site_notifications(
site_id: int,
pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> SiteNotificationsResponse:
async with pool.acquire() as conn:
site = await conn.fetchrow(
"SELECT id, timezone FROM ems.site WHERE id = $1",
site_id,
)
if site is None:
raise HTTPException(status_code=404, detail="Site not found")
tz = site["timezone"] or "Europe/Prague"
mode_row = await conn.fetchrow(
"""
SELECT m.mode_code
FROM ems.site_operating_mode m
WHERE m.site_id = $1
""",
site_id,
)
run_row = await conn.fetchrow(
"""
SELECT id FROM ems.planning_run
WHERE site_id = $1 AND status = 'active'
ORDER BY created_at DESC
LIMIT 1
""",
site_id,
)
reserve_row = await conn.fetchrow(
"""
SELECT MIN(reserve_soc_percent)::float AS reserve_soc
FROM ems.asset_battery
WHERE site_id = $1
""",
site_id,
)
inv_row = await conn.fetchrow(
"""
SELECT battery_soc_percent, measured_at
FROM ems.vw_latest_inverter
WHERE site_id = $1
ORDER BY measured_at DESC NULLS LAST
LIMIT 1
""",
site_id,
)
hb_row = await conn.fetchrow(
"SELECT last_seen FROM ems.site_heartbeat WHERE site_id = $1",
site_id,
)
tomorrow_slots = await conn.fetchval(
"""
SELECT COUNT(*)::int
FROM ems.vw_site_effective_price v
WHERE v.site_id = $1
AND (v.interval_start AT TIME ZONE $2)::date =
((CURRENT_TIMESTAMP AT TIME ZONE $2)::date + INTERVAL '1 day')::date
""",
site_id,
tz,
)
price_rows = await conn.fetch(
"""
SELECT interval_start,
effective_buy_price_czk_kwh,
effective_sell_price_czk_kwh
FROM ems.vw_site_effective_price
WHERE site_id = $1
AND interval_start >= now()
AND interval_start < now() + INTERVAL '48 hours'
ORDER BY interval_start
""",
site_id,
)
avg_row = await conn.fetchrow(
"""
SELECT AVG(effective_buy_price_czk_kwh)::float AS avg_buy
FROM ems.vw_site_effective_price
WHERE site_id = $1
AND interval_start::date IN (CURRENT_DATE, CURRENT_DATE + INTERVAL '1 day')
""",
site_id,
)
bat_row = await conn.fetchrow(
"""
SELECT COALESCE(SUM(ab.usable_capacity_wh), 0)::float AS usable_wh
FROM ems.asset_battery ab
JOIN ems.asset_inverter ai ON ai.id = ab.inverter_id
WHERE ai.site_id = $1
""",
site_id,
)
ev_rows = await conn.fetch(
"""
SELECT DISTINCT ON (es.id)
es.id,
es.charger_id,
es.energy_delivered_wh,
es.target_soc_pct,
es.session_start,
es.soc_at_connect_pct,
COALESCE(av_id.battery_capacity_kwh, av_def.battery_capacity_kwh) AS battery_capacity_kwh,
COALESCE(av_id.make, av_def.make) AS make,
COALESCE(av_id.model, av_def.model) AS model,
COALESCE(av_id.default_target_soc_pct, av_def.default_target_soc_pct) AS default_target_soc_pct,
ac.code AS charger_code
FROM ems.ev_session es
JOIN ems.asset_ev_charger ac ON ac.id = es.charger_id
LEFT JOIN ems.asset_vehicle av_id ON av_id.id = es.vehicle_id
LEFT JOIN ems.asset_vehicle av_def
ON av_def.default_charger_id = ac.id AND es.vehicle_id IS NULL
WHERE es.site_id = $1 AND es.session_end IS NULL
ORDER BY es.id, av_def.id NULLS LAST
""",
site_id,
)
neg_rows = await conn.fetch(
"""
SELECT predicted_date, window_start_hour, window_end_hour, probability_pct
FROM ems.predicted_negative_price_window
WHERE site_id = $1
AND predicted_date BETWEEN CURRENT_DATE AND CURRENT_DATE + 2
AND probability_pct >= 50
ORDER BY predicted_date, window_start_hour
""",
site_id,
)
has_plan = run_row is not None
mode_code = (mode_row["mode_code"] if mode_row else None) or ""
reserve_soc = (
float(reserve_row["reserve_soc"])
if reserve_row and reserve_row["reserve_soc"] is not None
else None
)
soc = (
float(inv_row["battery_soc_percent"])
if inv_row and inv_row["battery_soc_percent"] is not None
else None
)
inv_age = _age_seconds(inv_row["measured_at"] if inv_row else None)
hb_age = _age_seconds(hb_row["last_seen"] if hb_row else None)
infra = _infrastructure_notification_items(
has_plan=has_plan,
tomorrow_slots=int(tomorrow_slots or 0),
mode_code=mode_code,
reserve_soc=reserve_soc,
soc=soc,
inv_age=inv_age,
hb_age=hb_age,
)
prices: list[PriceSlot] = []
for r in price_rows:
buy = _float_or_none(r["effective_buy_price_czk_kwh"])
if buy is None:
continue
sell_v = _float_or_none(r["effective_sell_price_czk_kwh"])
istart = r["interval_start"]
prices.append(
PriceSlot(
interval_start=istart,
buy=buy,
sell=sell_v if sell_v is not None else buy,
)
)
avg_buy = _float_or_none(avg_row["avg_buy"]) if avg_row else None
usable_wh = _float_or_none(bat_row["usable_wh"]) if bat_row else None
battery_kwh = (usable_wh / 1000.0) if usable_wh is not None else None
ev_sessions: list[EvSessionRow] = []
for er in ev_rows:
ev_sessions.append(
EvSessionRow(
id=int(er["id"]),
charger_id=int(er["charger_id"]),
energy_delivered_wh=float(er["energy_delivered_wh"] or 0),
target_soc_pct=_float_or_none(er["target_soc_pct"]),
session_start=er["session_start"],
battery_capacity_kwh=_float_or_none(er["battery_capacity_kwh"]),
make=er["make"],
model=er["model"],
default_target_soc_pct=_float_or_none(er["default_target_soc_pct"]),
charger_code=str(er["charger_code"] or ""),
soc_at_connect_pct=_float_or_none(er["soc_at_connect_pct"]),
)
)
neg_windows: list[NegWindowRow] = []
for nr in neg_rows:
dr = nr["predicted_date"]
if isinstance(dr, datetime):
d_conv = dr.date()
elif isinstance(dr, date):
d_conv = dr
else:
d_conv = date.today()
neg_windows.append(
NegWindowRow(
predicted_date=d_conv,
window_start_hour=int(nr["window_start_hour"]),
window_end_hour=int(nr["window_end_hour"]),
probability_pct=int(nr["probability_pct"]),
)
)
sell_now = prices[0].sell if prices else None
smart_raw = build_smart_notifications(
prices=prices,
avg_buy=avg_buy,
soc_pct=soc,
battery_kwh=battery_kwh,
ev_sessions=ev_sessions,
neg_windows=neg_windows,
mode=mode_code,
sell_price_now=sell_now,
)
smart_items = [
SiteNotificationItem(
id=d["id"],
level=d["level"],
title=d["title"],
body=d["body"],
eta_minutes=d.get("eta_minutes"),
action=d.get("action"),
)
for d in smart_raw
]
merged = infra + smart_items
merged.sort(key=lambda x: _NOTIF_LEVEL_PRIORITY.get(x.level, 9))
return SiteNotificationsResponse(notifications=merged[:5])

View File

@@ -1,21 +1,21 @@
"""REST API aktivní plán a ruční přepočet."""
from datetime import datetime, timedelta, timezone
import logging
from datetime import datetime, timezone
from typing import Annotated, Any, Literal
import asyncpg
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from pydantic import BaseModel, ConfigDict, Field
from app.db_json import record_to_dict
from app.deps import get_pg_pool
from services.planning_engine import _current_slot_start, run_plan_api
from services.control_exporter import export_setpoints
from services.planning_engine import run_plan_api
router = APIRouter(prefix="/sites/{site_id}/plan", tags=["plan"])
PRICE_CHECK_HOURS = 24
_SLOTS_PER_HOUR = 4
_EXPECTED_PRICE_SLOTS = PRICE_CHECK_HOURS * _SLOTS_PER_HOUR
logger = logging.getLogger(__name__)
class RunPlanResponse(BaseModel):
@@ -25,6 +25,27 @@ class RunPlanResponse(BaseModel):
horizon_end: datetime
class PlanningIntervalDto(BaseModel):
"""Řádek `ems.planning_interval` v odpovědi aktivního plánu."""
model_config = ConfigDict(extra="allow")
interval_start: str
is_predicted_price: bool = Field(
default=False,
description=(
"True pokud solver pro slot použil predikovanou cenu (market_price_stats), "
"nikoli přesný řádek z vw_site_effective_price / OTE."
),
)
class CurrentPlanResponseModel(BaseModel):
run: dict[str, Any]
intervals: list[PlanningIntervalDto]
summary: dict[str, Any]
def _build_summary(intervals: list[dict[str, Any]]) -> dict[str, Any]:
total_cost = 0.0
total_curtailed_kwh = 0.0
@@ -55,11 +76,29 @@ def _build_summary(intervals: list[dict[str, Any]]) -> dict[str, Any]:
}
@router.get("/current")
def _pv_scarcity_factor_from_intervals(
intervals: list[dict[str, Any]], battery_usable_wh: float | None
) -> float:
"""Stejná logika jako v solveru: 0.65..1.0 podle očekávané FVE energie na ~24h."""
if not intervals:
return 1.0
batt_kwh = max(1.0, float(battery_usable_wh or 0.0) / 1000.0)
horizon_slots = min(len(intervals), int(24 / 0.25))
pv_kwh = 0.0
for row in intervals[:horizon_slots]:
pv = row.get("pv_forecast_total_w")
if pv is not None:
pv_kwh += max(0.0, float(pv)) * 0.25 / 1000.0
coverage = pv_kwh / batt_kwh
coverage_clamped = max(0.0, min(1.0, coverage))
return round(0.65 + 0.35 * coverage_clamped, 4)
@router.get("/current", response_model=CurrentPlanResponseModel)
async def get_current_plan(
site_id: int,
pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
) -> dict[str, Any]:
) -> CurrentPlanResponseModel:
async with pool.acquire() as conn:
site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id)
if not site_ok:
@@ -81,17 +120,53 @@ async def get_current_plan(
run_id = run_row["id"]
int_rows = await conn.fetch(
"""
SELECT *
FROM ems.planning_interval
WHERE run_id = $1
ORDER BY interval_start
WITH latest_fc AS (
SELECT id
FROM ems.forecast_pv_run
WHERE site_id = $2 AND status = 'ok'
ORDER BY created_at DESC
LIMIT 1
),
fc_slot AS (
SELECT fpi.interval_start, COALESCE(SUM(fpi.power_w), 0)::BIGINT AS pv_forecast_total_w
FROM ems.forecast_pv_interval fpi
WHERE fpi.run_id = (SELECT id FROM latest_fc)
GROUP BY fpi.interval_start
)
SELECT
pi.*,
ai.actual_pv_power_w AS pv_power_w,
fs.pv_forecast_total_w AS pv_forecast_total_w
FROM ems.planning_interval pi
LEFT JOIN ems.audit_interval ai
ON ai.site_id = $2 AND ai.interval_start = pi.interval_start
LEFT JOIN fc_slot fs ON fs.interval_start = pi.interval_start
WHERE pi.run_id = $1
ORDER BY pi.interval_start
""",
run_id,
site_id,
)
battery_usable_wh = await conn.fetchval(
"""
SELECT COALESCE(SUM(ab.usable_capacity_wh), 0)::float
FROM ems.asset_battery ab
WHERE ab.site_id = $1
""",
site_id,
)
intervals = [record_to_dict(r) for r in int_rows]
summary = _build_summary(intervals)
return {"run": record_to_dict(run_row), "intervals": intervals, "summary": summary}
intervals_raw = [record_to_dict(r) for r in int_rows]
summary = _build_summary(intervals_raw)
summary["pv_scarcity_factor"] = _pv_scarcity_factor_from_intervals(
intervals_raw, float(battery_usable_wh or 0.0)
)
intervals = [PlanningIntervalDto.model_validate(d) for d in intervals_raw]
return CurrentPlanResponseModel(
run=record_to_dict(run_row),
intervals=intervals,
summary=summary,
)
@router.post("/run", response_model=RunPlanResponse)
@@ -100,52 +175,52 @@ async def post_run_plan(
pool: Annotated[asyncpg.Pool, Depends(get_pg_pool)],
plan_type: Literal["daily", "rolling"] = Query("rolling", alias="type"),
) -> RunPlanResponse:
window_start = _current_slot_start(datetime.now(timezone.utc))
window_end = window_start + timedelta(hours=PRICE_CHECK_HOURS)
async with pool.acquire() as conn:
site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id)
if not site_ok:
raise HTTPException(status_code=404, detail="Site not found")
price_slots = await conn.fetchval(
days_with_prices = await conn.fetchval(
"""
SELECT COUNT(DISTINCT interval_start::date)::int AS days_with_prices
FROM ems.market_interval_price
WHERE market_source IN ('OTE_CZ', 'OTE_CZ_DAM')
AND interval_start >= now()
AND interval_start < now() + INTERVAL '48 hours'
"""
SELECT COUNT(DISTINCT interval_start)::int
FROM ems.vw_site_effective_price
WHERE site_id = $1
AND interval_start >= $2
AND interval_start < $3
""",
site_id,
window_start,
window_end,
)
if (price_slots or 0) < _EXPECTED_PRICE_SLOTS:
if (days_with_prices or 0) < 1:
raise HTTPException(
status_code=422,
detail="Nejsou dostupné tržní ceny. Spusťte nejdřív import cen.",
detail="Nejsou dostupné tržní ceny",
)
try:
run_id, solver_duration_ms = await run_plan_api(
site_id, plan_type, conn, triggered_by="api"
)
# Nový active run aplikuj hned; nečekej na periodický control_export job.
await export_setpoints(site_id, conn)
row = await conn.fetchrow(
"""
SELECT horizon_start, horizon_end
FROM ems.planning_run
WHERE id = $1
""",
run_id,
)
except HTTPException:
raise
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
except RuntimeError as e:
raise HTTPException(status_code=422, detail=str(e)) from e
except Exception as e:
logger.error("Plan run failed: %s", e, exc_info=True)
raise HTTPException(status_code=422, detail=str(e)) from e
row = await conn.fetchrow(
"""
SELECT horizon_start, horizon_end
FROM ems.planning_run
WHERE id = $1
""",
run_id,
)
if row is None:
raise HTTPException(status_code=500, detail="Planning run row missing after insert")
if row is None:
raise HTTPException(status_code=500, detail="Planning run row missing after insert")
return RunPlanResponse(
run_id=run_id,

View File

@@ -0,0 +1,25 @@
import asyncio
import logging
from datetime import datetime, timezone
from .ws_manager import manager
class WSLogHandler(logging.Handler):
"""Posílá log záznamy přes WebSocket všem připojeným klientům /ws/logs."""
def emit(self, record: logging.LogRecord) -> None:
try:
loop = asyncio.get_running_loop()
except RuntimeError:
return
ts = datetime.fromtimestamp(record.created, tz=timezone.utc).strftime("%H:%M:%S")
msg = {
"ts": ts,
"level": record.levelname,
"logger": record.name.split(".")[-1],
"msg": record.getMessage(),
}
loop.call_soon_threadsafe(
lambda: asyncio.ensure_future(manager.broadcast_log(msg))
)

38
backend/app/ws_manager.py Normal file
View File

@@ -0,0 +1,38 @@
from fastapi import WebSocket
class ConnectionManager:
def __init__(self):
self._telemetry: list[WebSocket] = []
self._logs: list[WebSocket] = []
async def connect_telemetry(self, ws: WebSocket):
await ws.accept()
self._telemetry.append(ws)
async def connect_logs(self, ws: WebSocket):
await ws.accept()
self._logs.append(ws)
def disconnect(self, ws: WebSocket):
self._telemetry = [w for w in self._telemetry if w != ws]
self._logs = [w for w in self._logs if w != ws]
async def broadcast_telemetry(self, data: dict):
await self._broadcast(self._telemetry, data)
async def broadcast_log(self, record: dict):
await self._broadcast(self._logs, record)
async def _broadcast(self, clients: list, data: dict):
dead = []
for ws in clients:
try:
await ws.send_json(data)
except Exception:
dead.append(ws)
for ws in dead:
self.disconnect(ws)
manager = ConnectionManager()