1291 lines
44 KiB
Python
1291 lines
44 KiB
Python
"""EMS FastAPI – health, provozní režimy, PostgREST doplňky."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import logging
|
||
import os
|
||
from contextlib import asynccontextmanager
|
||
from datetime import date, datetime, timedelta, timezone
|
||
from typing import Annotated, Any, Literal
|
||
from zoneinfo import ZoneInfo
|
||
|
||
import asyncpg
|
||
import httpx
|
||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||
from app.db_json import record_to_dict
|
||
from app.deps import set_pg_pool
|
||
from app.routers.economics import router as economics_router
|
||
from app.routers.energy_flows import router as energy_flows_router
|
||
from app.routers.ev import router as ev_router
|
||
from app.routers.full_status import router as full_status_router
|
||
from app.routers.plan import router as plan_router
|
||
from app.ws_log_handler import WSLogHandler
|
||
from app.ws_manager import manager
|
||
from fastapi import (
|
||
APIRouter,
|
||
Depends,
|
||
FastAPI,
|
||
HTTPException,
|
||
Query,
|
||
Request,
|
||
WebSocket,
|
||
WebSocketDisconnect,
|
||
)
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from services.audit_filler import fill_audit_for_completed_intervals
|
||
from services.control_exporter import (
|
||
export_setpoints,
|
||
read_deye_registers_live,
|
||
verify_modbus_commands,
|
||
)
|
||
from services.heartbeat_service import send_heartbeat
|
||
from services.forecast_service import fetch_pv_forecast
|
||
from services.notification_service import (
|
||
notify_operating_mode_changed,
|
||
run_fn_set_mode_with_discord,
|
||
)
|
||
from services.price_importer import import_ote_prices
|
||
from services.telemetry_collector import run_telemetry_loop_wrapper
|
||
from pydantic import BaseModel, Field
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def _dsn() -> str:
    """Build the PostgreSQL connection URL from ``DB_*`` environment variables.

    Reads ``DB_HOST``, ``DB_PORT``, ``DB_NAME``, ``DB_USER`` and ``DB_PASSWORD``
    (defaults: ``localhost``, ``5432``, ``ems``, ``ems_user`` and an empty
    password).

    The user name and password are percent-encoded so that characters with a
    special meaning in a URL (``@``, ``:``, ``/``, ``#``, ``%`` ...) cannot
    corrupt the DSN. Values made only of unreserved characters are unchanged.

    Returns:
        A ``postgresql://user:password@host:port/name`` URL.
    """
    # Local import keeps this block self-contained (no new module-level import).
    from urllib.parse import quote

    host = os.getenv("DB_HOST", "localhost")
    port = os.getenv("DB_PORT", "5432")
    name = os.getenv("DB_NAME", "ems")
    # safe="" so that "/" inside a credential is encoded as well.
    user = quote(os.getenv("DB_USER", "ems_user"), safe="")
    password = quote(os.getenv("DB_PASSWORD", ""), safe="")
    return f"postgresql://{user}:{password}@{host}:{port}/{name}"
|
||
|
||
|
||
pool: asyncpg.Pool | None = None
|
||
|
||
|
||
async def get_pool() -> asyncpg.Pool:
    """FastAPI dependency returning the module-level asyncpg pool.

    Raises:
        HTTPException: 503 while the pool has not been created yet (or has
            already been reset to ``None``).
    """
    if pool is not None:
        return pool
    raise HTTPException(status_code=503, detail="Database pool not ready")
|
||
|
||
|
||
# Cron hours/minutes are evaluated in Europe/Prague (OTE import 13:30 / 14:00, daily plan 15:00, …)
scheduler = AsyncIOScheduler(timezone=ZoneInfo("Europe/Prague"))
|
||
|
||
|
||
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: DB pool, WebSocket log handler, scheduler, telemetry.

    Startup (before ``yield``):
      * creates the asyncpg pool and publishes it through the module-level
        ``pool``, ``set_pg_pool`` and ``app.state.pg_pool``;
      * attaches a ``WSLogHandler`` (level INFO) to the root logger;
      * registers the periodic jobs on the module-level ``scheduler`` and
        starts it;
      * starts ``run_telemetry_loop_wrapper`` as a background task.

    Shutdown (after ``yield``): detaches the log handler, cancels the telemetry
    task, shuts the scheduler down without waiting for running jobs, and
    closes the pool.

    Each scheduled job acquires one pooled connection, and every job that
    iterates active sites catches and logs exceptions per site so that one
    failing site does not abort the others.
    """
    global pool
    pool = await asyncpg.create_pool(_dsn(), min_size=1, max_size=5)
    set_pg_pool(pool)
    app.state.pg_pool = pool

    app.state.ws_log_handler = WSLogHandler()
    app.state.ws_log_handler.setLevel(logging.INFO)
    logging.getLogger().addHandler(app.state.ws_log_handler)

    # Imported here rather than at module top — presumably to avoid an import
    # cycle; confirm before moving.
    from services.planning_engine import run_daily_plan, run_rolling_replan

    async def scheduled_heartbeat() -> None:
        """Send a heartbeat for every active site."""
        async with app.state.pg_pool.acquire() as conn:
            sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
            for site in sites:
                try:
                    await send_heartbeat(site["id"], conn)
                except Exception:
                    logger.exception("scheduled_heartbeat site=%s failed", site["id"])

    async def scheduled_audit_filler() -> None:
        """Run the audit filler for every active site."""
        async with app.state.pg_pool.acquire() as conn:
            sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
            for site in sites:
                try:
                    await fill_audit_for_completed_intervals(site["id"], conn)
                except Exception:
                    logger.exception("scheduled_audit_filler site=%s failed", site["id"])

    async def scheduled_forecast_accuracy() -> None:
        """Call ``ems.fn_fill_forecast_accuracy(site, 48)`` for every active site."""
        async with app.state.pg_pool.acquire() as conn:
            sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
            for site in sites:
                try:
                    n = await conn.fetchval(
                        "SELECT ems.fn_fill_forecast_accuracy($1, 48)",
                        site["id"],
                    )
                    # Only log when the function reports a non-zero result.
                    if n:
                        logger.info(
                            "forecast_accuracy filled %s slots for site %s",
                            n,
                            site["id"],
                        )
                except Exception:
                    logger.exception(
                        "scheduled_forecast_accuracy site=%s failed", site["id"]
                    )

    async def scheduled_expire_modes() -> None:
        """Run ``ems.fn_expire_modes()`` and notify about every mode change it returns."""
        async with app.state.pg_pool.acquire() as conn:
            try:
                rows = await conn.fetch("SELECT * FROM ems.fn_expire_modes()")
                for r in rows:
                    await notify_operating_mode_changed(
                        str(r["site_code"]),
                        str(r["old_mode"]),
                        str(r["new_mode"]),
                        "system:expiry",
                        "Automatické vypršení dočasného režimu",
                    )
            except Exception:
                logger.exception("scheduled_expire_modes failed")

    async def scheduled_control_export() -> None:
        """Export setpoints for every active site."""
        async with app.state.pg_pool.acquire() as conn:
            sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
            for site in sites:
                try:
                    await export_setpoints(site["id"], conn)
                except Exception as e:
                    logger.exception("scheduled_control_export site=%s: %s", site["id"], e)

    async def scheduled_verify_modbus() -> None:
        """
        Verify commands in the ``written`` state from the last 20 minutes.

        Runs every 2 minutes, independently of control_exporter (the longer
        window allows for job delay).
        """
        async with app.state.pg_pool.acquire() as conn:
            sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
            for site in sites:
                try:
                    cmd_rows = await conn.fetch(
                        """
                        SELECT id FROM ems.modbus_command
                        WHERE site_id = $1
                          AND status = 'written'
                          AND written_at >= now() - INTERVAL '20 minutes'
                        ORDER BY written_at
                        """,
                        site["id"],
                    )
                    if cmd_rows:
                        await verify_modbus_commands(
                            [int(r["id"]) for r in cmd_rows],
                            conn,
                            int(site["id"]),
                        )
                except Exception:
                    logger.exception(
                        "scheduled_verify_modbus site=%s failed", site["id"]
                    )

    async def scheduled_daily_plan() -> None:
        """Run the daily plan for every active site and export setpoints right after."""
        async with app.state.pg_pool.acquire() as conn:
            sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
            for site in sites:
                site_id = int(site["id"])
                try:
                    await run_daily_plan(site_id, conn)
                    # Apply the new active run immediately; do not wait for the
                    # next 15-minute export tick.
                    await export_setpoints(site_id, conn)
                except Exception:
                    logger.exception("scheduled_daily_plan site=%s failed", site_id)

    async def scheduled_rolling_replan() -> None:
        """Run the rolling replan for every active site and export setpoints right after."""
        async with app.state.pg_pool.acquire() as conn:
            sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
            for site in sites:
                site_id = int(site["id"])
                try:
                    await run_rolling_replan(site_id, conn)
                    # Apply the new active run immediately; do not wait for the
                    # next 15-minute export tick.
                    await export_setpoints(site_id, conn)
                except Exception:
                    logger.exception("scheduled_rolling_replan site=%s failed", site_id)

    async def scheduled_baseline_update() -> None:
        """Call ``ems.fn_update_baseline_stats(site, 30)`` for every active site."""
        async with app.state.pg_pool.acquire() as conn:
            sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
            for site in sites:
                try:
                    n = await conn.fetchval(
                        "SELECT ems.fn_update_baseline_stats($1, 30)",
                        site["id"],
                    )
                    logger.info(
                        "baseline_stats updated %s rows for site %s",
                        n,
                        site["id"],
                    )
                except Exception:
                    logger.exception(
                        "scheduled_baseline_update site=%s failed", site["id"]
                    )

    async def scheduled_market_price_stats() -> None:
        """Call ``ems.fn_update_market_price_stats(site, 90)`` for every active site."""
        async with app.state.pg_pool.acquire() as conn:
            sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
            for site in sites:
                try:
                    n = await conn.fetchval(
                        "SELECT ems.fn_update_market_price_stats($1, 90)",
                        site["id"],
                    )
                    logger.info(
                        "market_price_stats updated %s rows site=%s",
                        n,
                        site["id"],
                    )
                except Exception:
                    logger.exception(
                        "scheduled_market_price_stats site=%s failed", site["id"]
                    )

    async def scheduled_tuv_usage_stats() -> None:
        """Call ``ems.fn_update_tuv_usage_stats(site, 30)`` for every active site."""
        async with app.state.pg_pool.acquire() as conn:
            sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
            for site in sites:
                try:
                    n = await conn.fetchval(
                        "SELECT ems.fn_update_tuv_usage_stats($1, 30)",
                        site["id"],
                    )
                    logger.info(
                        "tuv_usage_stats updated %s rows site=%s",
                        n,
                        site["id"],
                    )
                except Exception:
                    logger.exception(
                        "scheduled_tuv_usage_stats site=%s failed", site["id"]
                    )

    async def scheduled_forecast_refresh() -> None:
        """Fetch the PV forecast for every active site; on success refresh negative-price predictions."""
        async with app.state.pg_pool.acquire() as conn:
            sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
            for site in sites:
                site_id = int(site["id"])
                try:
                    intervals, pv_arrays = await fetch_pv_forecast(site_id, conn)
                    # A negative interval count is treated as failure here.
                    if intervals >= 0:
                        logger.info(
                            "scheduled_forecast_refresh site=%s intervals=%s arrays=%s",
                            site_id,
                            intervals,
                            pv_arrays,
                        )
                        await _refresh_negative_price_predictions(conn, site_id)
                    else:
                        logger.warning(
                            "scheduled_forecast_refresh site=%s failed",
                            site_id,
                        )
                except Exception:
                    logger.exception("scheduled_forecast_refresh site=%s failed", site_id)

    async def _count_ote_slots_for_day(
        conn: asyncpg.Connection, target_day: date
    ) -> int:
        """Count ``OTE_CZ`` rows in ``ems.market_interval_price`` for ``target_day``.

        NOTE(review): the day match casts ``interval_start`` to ``date`` without
        an explicit time zone, so it presumably depends on the session TimeZone
        — confirm against the column type and DB settings.
        """
        return int(
            await conn.fetchval(
                """
                SELECT COUNT(*)::int
                FROM ems.market_interval_price
                WHERE market_source = 'OTE_CZ'
                  AND interval_start::date = $1::date
                """,
                target_day,
            )
            or 0
        )

    async def _refresh_negative_price_predictions_all_active(
        conn: asyncpg.Connection,
    ) -> None:
        """Refresh the negative-price prediction cache for every active site."""
        sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
        for site in sites:
            await _refresh_negative_price_predictions(conn, int(site["id"]))

    async def _scheduled_ote_import_global(conn: asyncpg.Connection) -> None:
        """One OTE fetch per missing day; market_interval_price is global for all sites."""
        prague_tz = ZoneInfo("Europe/Prague")
        now_loc = datetime.now(prague_tz)
        today = now_loc.date()
        tomorrow = today + timedelta(days=1)
        any_import_ok = False

        for day in (today, tomorrow):
            slots = await _count_ote_slots_for_day(conn, day)
            # 96 slots = a full day of 15-minute intervals; such a day is skipped.
            # NOTE(review): DST-change days have a different slot count — confirm
            # that this threshold is intended for them.
            if slots >= 96:
                continue
            n, imported_day, _, err = await import_ote_prices(
                conn, site_id=None, target_date=day
            )
            if n < 0:
                logger.warning(
                    "scheduled_ote_import_global day=%s failed (%s)",
                    day.isoformat(),
                    err,
                )
                continue
            logger.info(
                "scheduled_ote_import_global day=%s imported=%s slots",
                imported_day,
                n,
            )
            any_import_ok = True

        # Predictions are refreshed once, and only if at least one day was imported.
        if any_import_ok:
            await _refresh_negative_price_predictions_all_active(conn)

    async def scheduled_ote_import() -> None:
        """Scheduler entry point for the global OTE import; logs any failure."""
        async with app.state.pg_pool.acquire() as conn:
            try:
                await _scheduled_ote_import_global(conn)
            except Exception:
                logger.exception("scheduled_ote_import_global failed")

    scheduler.add_job(scheduled_heartbeat, "interval", seconds=60, id="heartbeat")
    scheduler.add_job(
        scheduled_audit_filler,
        "cron",
        minute="1,16,31,46",
        second=0,
        id="audit_filler",
    )
    scheduler.add_job(
        scheduled_forecast_accuracy,
        "cron",
        minute="2,17,32,47",
        id="forecast_accuracy",
        replace_existing=True,
    )
    scheduler.add_job(scheduled_expire_modes, "interval", minutes=1, id="expire_modes")
    scheduler.add_job(
        scheduled_control_export,
        "cron",
        minute="14,29,44,59",
        second=0,
        id="control_export",
    )
    scheduler.add_job(
        scheduled_verify_modbus,
        "interval",
        minutes=2,
        id="verify_modbus",
        replace_existing=True,
    )
    scheduler.add_job(scheduled_daily_plan, "cron", hour=15, minute=0, id="daily_plan")
    scheduler.add_job(
        scheduled_rolling_replan,
        "cron",
        minute="*/15",
        id="rolling_replan",
    )
    scheduler.add_job(
        scheduled_baseline_update,
        "cron",
        hour=0,
        minute=30,
        id="baseline_update",
        replace_existing=True,
    )
    scheduler.add_job(
        scheduled_market_price_stats,
        "cron",
        hour=14,
        minute=45,
        id="market_price_stats",
        replace_existing=True,
    )
    scheduler.add_job(
        scheduled_tuv_usage_stats,
        "cron",
        hour=0,
        minute=45,
        id="tuv_usage_stats",
        replace_existing=True,
    )
    scheduler.add_job(
        scheduled_ote_import,
        "cron",
        hour=13,
        minute=30,
        id="ote_import_preopen",
        replace_existing=True,
    )
    scheduler.add_job(
        scheduled_ote_import,
        "cron",
        hour=14,
        minute=0,
        id="ote_import_main",
        replace_existing=True,
    )
    scheduler.add_job(
        scheduled_ote_import,
        "cron",
        hour=0,
        minute=5,
        id="ote_import_backfill",
        replace_existing=True,
    )
    scheduler.add_job(
        scheduled_forecast_refresh,
        "cron",
        hour="*/2",
        minute=5,
        id="forecast_refresh_2h",
        replace_existing=True,
    )

    async def scheduled_daily_economics_notification() -> None:
        """Send yesterday's economics summary for every active site.

        "Yesterday" is computed once as a Europe/Prague calendar date and used
        both as the query parameter and as the reported label, so the data and
        the label cannot refer to different days regardless of the database
        session time zone.
        """
        from services.notification_service import notify_daily_economics

        yesterday = (
            datetime.now(ZoneInfo("Europe/Prague")) - timedelta(days=1)
        ).date()

        async with app.state.pg_pool.acquire() as conn:
            sites = await conn.fetch("SELECT id, code FROM ems.site WHERE active = true")
            for site in sites:
                site_id = int(site["id"])
                site_code = site["code"]
                try:
                    row = await conn.fetchrow(
                        """
                        SELECT import_kwh, export_kwh,
                               import_cost_czk, export_revenue_czk,
                               green_bonus_czk, total_balance_czk,
                               planned_balance_czk
                        FROM ems.vw_economics_daily
                        WHERE site_id = $1
                          AND day_local = $2::date
                        """,
                        site_id,
                        yesterday,
                    )
                    # No row for that day: nothing to report for this site.
                    if row is None:
                        continue
                    await notify_daily_economics(
                        site_code=site_code,
                        day=yesterday.strftime("%Y-%m-%d"),
                        import_kwh=float(row["import_kwh"] or 0),
                        import_cost=float(row["import_cost_czk"] or 0),
                        export_kwh=float(row["export_kwh"] or 0),
                        export_revenue=float(row["export_revenue_czk"] or 0),
                        green_bonus=float(row["green_bonus_czk"] or 0),
                        total_balance=float(row["total_balance_czk"] or 0),
                        # The planned balance stays None when the view has no value.
                        planned_balance=float(row["planned_balance_czk"])
                        if row["planned_balance_czk"] is not None
                        else None,
                    )
                except Exception:
                    logger.exception(
                        "scheduled_daily_economics_notification site=%s failed",
                        site_id,
                    )

    scheduler.add_job(
        scheduled_daily_economics_notification,
        "cron",
        hour=7,
        minute=0,
        id="daily_economics_notification",
        replace_existing=True,
    )
    scheduler.start()

    telemetry_task = asyncio.create_task(run_telemetry_loop_wrapper(app.state.pg_pool))
    app.state.telemetry_task = telemetry_task

    yield

    ws_h = getattr(app.state, "ws_log_handler", None)
    if ws_h is not None:
        logging.getLogger().removeHandler(ws_h)
        app.state.ws_log_handler = None

    telemetry_task.cancel()
    try:
        await telemetry_task
    except asyncio.CancelledError:
        # Expected: the task was cancelled just above.
        pass

    scheduler.shutdown(wait=False)
    set_pg_pool(None)
    app.state.pg_pool = None
    if pool:
        await pool.close()
    pool = None
|
||
|
||
|
||
# FastAPI application; startup and shutdown work is done by `lifespan`.
app = FastAPI(title="EMS Platform", lifespan=lifespan)

# Routers imported from app.routers.*, all mounted under /api/v1.
app.include_router(plan_router, prefix="/api/v1")
app.include_router(ev_router, prefix="/api/v1")
app.include_router(full_status_router, prefix="/api/v1")
app.include_router(economics_router, prefix="/api/v1")
app.include_router(energy_flows_router, prefix="/api/v1")

# Router for the site-scoped endpoints declared in this module.
sites_router = APIRouter(prefix="/api/v1/sites", tags=["sites"])
|
||
|
||
|
||
def _parse_ymd(s: str) -> date:
    """Parse an ISO calendar date string into a ``date``.

    Raises:
        HTTPException: 400 when ``date.fromisoformat`` rejects the value.
    """
    try:
        parsed = date.fromisoformat(s)
    except ValueError:
        # `from None` hides the parsing traceback from the HTTP error chain.
        raise HTTPException(status_code=400, detail="Invalid date, expected YYYY-MM-DD") from None
    return parsed
|
||
|
||
|
||
@sites_router.get("")
async def list_sites(db: Annotated[asyncpg.Pool, Depends(get_pool)]) -> list[dict[str, Any]]:
    """Return all rows of ``ems.site`` (active and inactive), ordered by id."""
    query = """
        SELECT id, code, name, timezone, latitude, longitude, active, notes, created_at
        FROM ems.site
        ORDER BY id
    """
    async with db.acquire() as conn:
        records = await conn.fetch(query)
    return list(map(record_to_dict, records))
|
||
|
||
|
||
@sites_router.get("/{site_id}/prices")
async def get_site_prices(
    site_id: int,
    db: Annotated[asyncpg.Pool, Depends(get_pool)],
    date_str: str | None = Query(None, alias="date", description="YYYY-MM-DD, default today"),
) -> list[dict[str, Any]]:
    """Return the effective price rows of one site for one day.

    Args:
        site_id: Site primary key; 404 when it does not exist.
        db: Connection pool (injected).
        date_str: Day as ``YYYY-MM-DD`` (query parameter ``date``). When
            omitted, today's date in Europe/Prague is used — the same zone
            the scheduler and the other price queries in this module use —
            instead of the server's local date.

    Returns:
        Rows of ``ems.vw_site_effective_price`` ordered by ``interval_start``.

    Raises:
        HTTPException: 400 for a malformed date, 404 for an unknown site.
    """
    if date_str is None:
        d = datetime.now(ZoneInfo("Europe/Prague")).date()
    else:
        d = _parse_ymd(date_str)
    async with db.acquire() as conn:
        site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id)
        if not site_ok:
            raise HTTPException(status_code=404, detail="Site not found")
        # NOTE(review): interval_start::date is evaluated without an explicit
        # time zone; confirm it matches the intended local day.
        rows = await conn.fetch(
            """
            SELECT *
            FROM ems.vw_site_effective_price
            WHERE site_id = $1 AND interval_start::date = $2::date
            ORDER BY interval_start
            """,
            site_id,
            d,
        )
    return [record_to_dict(r) for r in rows]
|
||
|
||
|
||
class PricesImportResponse(BaseModel):
    # Response of POST /{site_id}/prices/import; the three fields carry the
    # first three values returned by import_ote_prices.
    slots_imported: int  # number of imported slots (non-negative on success)
    date: str  # day reported by the importer
    first_price_czk_kwh: float  # first price value reported by the importer
|
||
|
||
|
||
class PricesLatestResponse(BaseModel):
    # Response of GET /{site_id}/prices/latest: summary of the most recent
    # Europe/Prague calendar day present in ems.market_interval_price.
    latest_date: str  # that day in ISO format
    slots: int  # number of price rows on that day
    min_price: float  # MIN(buy_raw_price_czk_kwh)
    max_price: float  # MAX(buy_raw_price_czk_kwh)
    avg_price: float  # AVG(buy_raw_price_czk_kwh)
|
||
|
||
|
||
class ForecastRunResponse(BaseModel):
    # Response of POST /{site_id}/forecast/run; both values come straight from
    # fetch_pv_forecast.
    intervals_saved: int
    pv_arrays: int
|
||
|
||
|
||
class ModbusCommandVerifyItem(BaseModel):
    # One ems.modbus_command row as reported by the manual verify endpoint.
    id: int
    asset_code: str
    register_name: str | None
    value_to_write: int
    value_verified: int | None  # None while the column is NULL
    status: str  # the endpoint counts the values "verified" and "mismatch"
|
||
|
||
|
||
class ModbusVerifyResponse(BaseModel):
    # Response of GET /{site_id}/control/verify.
    checked: int  # number of 'written' commands found in the lookback window
    verified: int  # commands whose status is "verified" after the check
    mismatch: int  # commands whose status is "mismatch" after the check
    commands: list[ModbusCommandVerifyItem]
|
||
|
||
|
||
async def _refresh_negative_price_predictions(conn: asyncpg.Connection, site_id: int) -> None:
    """Refresh the negative-price prediction cache after a price or forecast import.

    Best effort: a failure of the database function is logged as a warning
    (with traceback) and not propagated to the caller.
    """
    try:
        await conn.fetch("SELECT * FROM ems.fn_predict_negative_price_windows($1, 7)", site_id)
    except Exception:
        logger.warning(
            "fn_predict_negative_price_windows failed for site %s", site_id, exc_info=True
        )
    return None
|
||
|
||
|
||
@sites_router.post(
    "/{site_id}/prices/import",
    response_model=PricesImportResponse,
    summary="Import OTE cen (globální)",
    description=(
        "Zapíše do sdílené tabulky ems.market_interval_price (jedna sada dat pro všechny lokality). "
        "site_id v cestě slouží ke kontrole existence lokality (kompatibilita s UI); po importu se "
        "obnoví predikce záporných cen pro všechny aktivní lokality."
    ),
)
async def post_import_site_prices(
    site_id: int,
    db: Annotated[asyncpg.Pool, Depends(get_pool)],
    date_str: str | None = Query(
        None,
        alias="date",
        description="YYYY-MM-DD; výchozí = zítřek/dnes dle logiky OTE (Europe/Prague)",
    ),
) -> PricesImportResponse:
    """Import OTE prices into the shared price table and refresh predictions.

    ``site_id`` is only checked for existence; the import itself is called with
    ``site_id=None``. After a successful import the negative-price predictions
    are refreshed for every active site.

    Raises:
        HTTPException: 400 for a malformed date, 404 for an unknown site,
            422 when the importer reports a negative slot count.
    """
    target: date | None = None if date_str is None else _parse_ymd(date_str)

    async with db.acquire() as conn:
        exists = await conn.fetchval(
            "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id
        )
        if not exists:
            raise HTTPException(status_code=404, detail="Site not found")

        n, day, first_price, import_error = await import_ote_prices(
            conn, site_id=None, target_date=target
        )
        # Failure first: a negative count means nothing was imported, so there
        # is nothing to refresh.
        if n < 0:
            raise HTTPException(
                status_code=422,
                detail=f"OTE import selhal ({import_error or 'unknown'})",
            )

        active_sites = await conn.fetch("SELECT id FROM ems.site WHERE active = true")
        for active in active_sites:
            await _refresh_negative_price_predictions(conn, int(active["id"]))

    return PricesImportResponse(
        slots_imported=n,
        date=day,
        first_price_czk_kwh=first_price,
    )
|
||
|
||
|
||
class NegPricePredictionItem(BaseModel):
    # One row of ems.predicted_negative_price_window as exposed by the API.
    predicted_date: str  # ISO date string
    window_start_hour: int
    window_end_hour: int
    probability_pct: float
    expected_min_price: float | None  # None when the column is NULL
    reason: str  # empty string when the column is NULL
|
||
|
||
|
||
class NegativePredictionsResponse(BaseModel):
    # Response of GET /{site_id}/prices/negative-predictions.
    predictions: list[NegPricePredictionItem]
    # True when fewer than 28 distinct Europe/Prague days of OTE prices exist
    # within the last 400 days.
    insufficient_history: bool
|
||
|
||
|
||
@sites_router.get(
    "/{site_id}/prices/negative-predictions",
    response_model=NegativePredictionsResponse,
)
async def get_site_negative_price_predictions(
    site_id: int,
    db: Annotated[asyncpg.Pool, Depends(get_pool)],
) -> NegativePredictionsResponse:
    """Cached negative-price predictions (per site) plus a flag telling whether enough OTE history exists."""

    def _as_item(rec: Any) -> NegPricePredictionItem:
        # Convert one DB record; NULL price stays None, NULL reason becomes "".
        day = rec["predicted_date"]
        min_price = rec["expected_min_price"]
        reason = rec["reason"]
        if hasattr(day, "isoformat"):
            day_text = day.isoformat()
        else:
            day_text = str(day)
        return NegPricePredictionItem(
            predicted_date=day_text,
            window_start_hour=int(rec["window_start_hour"]),
            window_end_hour=int(rec["window_end_hour"]),
            probability_pct=float(rec["probability_pct"]),
            expected_min_price=None if min_price is None else float(min_price),
            reason="" if reason is None else reason,
        )

    async with db.acquire() as conn:
        exists = await conn.fetchval(
            "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id
        )
        if not exists:
            raise HTTPException(status_code=404, detail="Site not found")

        # Number of distinct Prague-local days with OTE prices in the last 400 days.
        history_days = await conn.fetchval(
            """
            SELECT COUNT(DISTINCT (interval_start AT TIME ZONE 'Europe/Prague')::date)::int
            FROM ems.market_interval_price
            WHERE market_source IN ('OTE_CZ', 'OTE_CZ_DAM')
              AND interval_start >= now() - INTERVAL '400 days'
            """
        )
        # Predictions for the next 7 days, counted from "today" in the site's
        # time zone (Europe/Prague when the site has none).
        records = await conn.fetch(
            """
            SELECT
                p.predicted_date,
                p.window_start_hour,
                p.window_end_hour,
                p.probability_pct,
                p.expected_min_price,
                p.reason
            FROM ems.predicted_negative_price_window p
            WHERE p.site_id = $1
              AND p.predicted_date > (
                    CURRENT_TIMESTAMP AT TIME ZONE COALESCE(
                        NULLIF((SELECT timezone FROM ems.site WHERE id = $1), ''),
                        'Europe/Prague'
                    )
                  )::date
              AND p.predicted_date <= (
                    CURRENT_TIMESTAMP AT TIME ZONE COALESCE(
                        NULLIF((SELECT timezone FROM ems.site WHERE id = $1), ''),
                        'Europe/Prague'
                    )
                  )::date + 7
            ORDER BY p.predicted_date, p.window_start_hour
            """,
            site_id,
        )

    enough_history = int(history_days or 0) >= 28
    return NegativePredictionsResponse(
        predictions=[_as_item(rec) for rec in records],
        insufficient_history=not enough_history,
    )
|
||
|
||
|
||
@sites_router.get("/{site_id}/prices/latest", response_model=PricesLatestResponse)
async def get_site_prices_latest(
    site_id: int,
    db: Annotated[asyncpg.Pool, Depends(get_pool)],
) -> PricesLatestResponse:
    """Summarise the most recent Europe/Prague day that has OTE market prices.

    The price table is shared by all sites; ``site_id`` is only validated.

    Raises:
        HTTPException: 404 for an unknown site or when no market data exists.
    """
    summary_sql = """
        SELECT
            (interval_start AT TIME ZONE 'Europe/Prague')::date AS day,
            COUNT(*)::int AS slots,
            MIN(buy_raw_price_czk_kwh)::float AS min_price,
            MAX(buy_raw_price_czk_kwh)::float AS max_price,
            AVG(buy_raw_price_czk_kwh)::float AS avg_price
        FROM ems.market_interval_price
        WHERE market_source IN ('OTE_CZ', 'OTE_CZ_DAM')
        GROUP BY day
        ORDER BY day DESC
        LIMIT 1
    """
    async with db.acquire() as conn:
        exists = await conn.fetchval(
            "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id
        )
        if not exists:
            raise HTTPException(status_code=404, detail="Site not found")
        latest = await conn.fetchrow(summary_sql)

    has_data = latest is not None and latest["day"] is not None
    if not has_data:
        raise HTTPException(status_code=404, detail="Žádná tržní data v databázi")

    # NULL aggregates fall back to zero.
    return PricesLatestResponse(
        latest_date=latest["day"].isoformat(),
        slots=int(latest["slots"] or 0),
        min_price=float(latest["min_price"] or 0.0),
        max_price=float(latest["max_price"] or 0.0),
        avg_price=float(latest["avg_price"] or 0.0),
    )
|
||
|
||
|
||
@sites_router.get("/{site_id}/control/verify", response_model=ModbusVerifyResponse)
async def get_verify_modbus_commands(
    site_id: int,
    db: Annotated[asyncpg.Pool, Depends(get_pool)],
    minutes: int = Query(10, ge=1, le=1440, description="Jak daleko zpět hledat written příkazy"),
) -> ModbusVerifyResponse:
    """
    Manually verify Modbus writes (status ``written``) from the last N minutes.

    Useful right after a manual setpoint export.
    """
    async with db.acquire() as conn:
        exists = await conn.fetchval(
            "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id
        )
        if not exists:
            raise HTTPException(status_code=404, detail="Site not found")

        written = await conn.fetch(
            """
            SELECT id FROM ems.modbus_command
            WHERE site_id = $1
              AND status = 'written'
              AND written_at >= now() - $2::interval
            ORDER BY written_at
            """,
            site_id,
            timedelta(minutes=minutes),
        )
        ids = [int(rec["id"]) for rec in written]

        # Without matching commands there is nothing to verify or re-read.
        if ids:
            await verify_modbus_commands(ids, conn, site_id)
            # Re-read the rows so the response reflects the post-verification state.
            detail_rows = await conn.fetch(
                """
                SELECT id, asset_code, register_name, value_to_write, value_verified, status
                FROM ems.modbus_command
                WHERE id = ANY($1::int[])
                ORDER BY id
                """,
                ids,
            )
        else:
            detail_rows = []

    commands = []
    for rec in detail_rows:
        checked_value = rec["value_verified"]
        commands.append(
            ModbusCommandVerifyItem(
                id=int(rec["id"]),
                asset_code=rec["asset_code"],
                register_name=rec["register_name"],
                value_to_write=int(rec["value_to_write"]),
                value_verified=None if checked_value is None else int(checked_value),
                status=rec["status"],
            )
        )

    statuses = [cmd.status for cmd in commands]
    return ModbusVerifyResponse(
        checked=len(ids),
        verified=statuses.count("verified"),
        mismatch=statuses.count("mismatch"),
        commands=commands,
    )
|
||
|
||
|
||
class DeyeRegistersLiveResponse(BaseModel):
    # Response of GET /{site_id}/control/registers; built directly from the
    # payload of read_deye_registers_live (Deye registers 108/109/141/142/143/178/191).
    # NOTE(review): the _a / _w suffixes presumably mean amperes / watts — confirm.
    reg108_charge_a: int
    reg109_discharge_a: int
    reg141_energy_mode: int
    reg142_limit_control: int
    reg143_export_limit_w: int
    reg178_peak_shaving_switch: int
    reg191_peak_shaving_w: int
    read_at: str
|
||
|
||
|
||
@sites_router.get(
    "/{site_id}/control/registers",
    response_model=DeyeRegistersLiveResponse,
)
async def get_control_registers_live(
    site_id: int,
    db: Annotated[asyncpg.Pool, Depends(get_pool)],
) -> DeyeRegistersLiveResponse:
    """Live values of Deye registers 108/109/141/142/143/178/191 read through the shared Modbus client."""
    async with db.acquire() as conn:
        exists = await conn.fetchval(
            "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id
        )
        if not exists:
            raise HTTPException(status_code=404, detail="Site not found")
        try:
            registers = await read_deye_registers_live(site_id, conn)
        except ValueError:
            # ValueError from the reader is reported as "no controllable inverter".
            raise HTTPException(
                status_code=404,
                detail="No controllable Modbus inverter for this site",
            ) from None
        except Exception as exc:
            # Any other failure is reported as a temporarily unavailable service.
            logger.warning("get_control_registers_live site=%s: %s", site_id, exc)
            raise HTTPException(
                status_code=503,
                detail=f"Modbus read failed: {exc}",
            ) from exc
    return DeyeRegistersLiveResponse(**registers)
|
||
|
||
|
||
class ModbusJournalCommandRow(BaseModel):
    # One ems.modbus_command row as listed by the command journal endpoint.
    id: int
    register: int
    register_name: str | None
    value_to_write: int
    value_written: int | None  # None while the column is NULL
    value_verified: int | None  # None while the column is NULL
    status: str
    attempt_count: int
    created_at: str  # creation timestamp converted to a string
|
||
|
||
|
||
class ModbusJournalListResponse(BaseModel):
    # Response of GET /{site_id}/control/journal; newest commands first.
    commands: list[ModbusJournalCommandRow]
|
||
|
||
|
||
@sites_router.get(
    "/{site_id}/control/journal",
    response_model=ModbusJournalListResponse,
)
async def get_control_command_journal(
    site_id: int,
    db: Annotated[asyncpg.Pool, Depends(get_pool)],
    limit: int = Query(50, ge=1, le=100),
) -> ModbusJournalListResponse:
    """Return the latest Modbus commands of a site, newest first.

    Args:
        site_id: Site primary key; 404 when it does not exist.
        db: Connection pool (injected).
        limit: Maximum number of rows (1-100, default 50).
    """

    def _optional_int(value: Any) -> int | None:
        # NULL columns stay None; anything else is coerced to int.
        return None if value is None else int(value)

    def _to_row(rec: Any) -> ModbusJournalCommandRow:
        data = record_to_dict(rec)
        created = data["created_at"]
        if not isinstance(created, str):
            created = str(created)
        return ModbusJournalCommandRow(
            id=int(data["id"]),
            register=int(data["register"]),
            register_name=data.get("register_name"),
            value_to_write=int(data["value_to_write"]),
            value_written=_optional_int(data.get("value_written")),
            value_verified=_optional_int(data.get("value_verified")),
            status=str(data["status"]),
            attempt_count=int(data["attempt_count"]),
            created_at=created,
        )

    async with db.acquire() as conn:
        exists = await conn.fetchval(
            "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id
        )
        if not exists:
            raise HTTPException(status_code=404, detail="Site not found")
        records = await conn.fetch(
            """
            SELECT id, register, register_name, value_to_write, value_written,
                   value_verified, status, attempt_count, created_at
            FROM ems.modbus_command
            WHERE site_id = $1
            ORDER BY created_at DESC
            LIMIT $2
            """,
            site_id,
            limit,
        )

    return ModbusJournalListResponse(commands=[_to_row(rec) for rec in records])
|
||
|
||
|
||
@sites_router.post("/{site_id}/forecast/run", response_model=ForecastRunResponse)
async def post_run_site_forecast(
    site_id: int,
    db: Annotated[asyncpg.Pool, Depends(get_pool)],
) -> ForecastRunResponse:
    """Fetch the PV forecast for a site now and refresh its negative-price predictions.

    Raises:
        HTTPException: 404 for an unknown site; 422 when the forecast fetch
            raises or reports a negative interval count.
    """
    async with db.acquire() as conn:
        exists = await conn.fetchval(
            "SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id
        )
        if not exists:
            raise HTTPException(status_code=404, detail="Site not found")

        try:
            saved, array_count = await fetch_pv_forecast(site_id, conn)
        except Exception as exc:
            logger.error("Forecast failed: %s", exc, exc_info=True)
            raise HTTPException(status_code=422, detail=str(exc)) from exc

        # A negative count signals a failed download or processing step.
        if saved < 0:
            raise HTTPException(
                status_code=422,
                detail="Forecast se nepodařilo stáhnout nebo zpracovat",
            )
        await _refresh_negative_price_predictions(conn, site_id)

    return ForecastRunResponse(intervals_saved=saved, pv_arrays=array_count)
|
||
|
||
|
||
@sites_router.get("/{site_id}/forecast/pv")
async def get_site_forecast_pv(
    site_id: int,
    db: Annotated[asyncpg.Pool, Depends(get_pool)],
    date_str: str | None = Query(None, alias="date", description="YYYY-MM-DD, default tomorrow"),
) -> dict[str, list[dict[str, Any]]]:
    """Return the latest successful PV forecast intervals of a site for one day.

    For every (interval_start, pv_array) pair only the row from the most
    recently created forecast run with status ``ok`` is kept.

    Args:
        site_id: Site primary key; 404 when it does not exist.
        db: Connection pool (injected).
        date_str: Day as ``YYYY-MM-DD`` (query parameter ``date``). When
            omitted, tomorrow's date in Europe/Prague is used — the zone used
            throughout this module — instead of the server's local date.

    Returns:
        ``{"pv_a": [...], "pv_b": [...]}`` with the rows of the arrays whose
        code is ``pv-a`` / ``pv-b``; rows of any other array code are omitted.

    Raises:
        HTTPException: 400 for a malformed date, 404 for an unknown site.
    """
    if date_str is None:
        d = datetime.now(ZoneInfo("Europe/Prague")).date() + timedelta(days=1)
    else:
        d = _parse_ymd(date_str)
    async with db.acquire() as conn:
        site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id)
        if not site_ok:
            raise HTTPException(status_code=404, detail="Site not found")
        # NOTE(review): interval_start::date is evaluated without an explicit
        # time zone; confirm it matches the intended local day.
        rows = await conn.fetch(
            """
            SELECT run_id, pv_array_id, interval_start, power_w,
                   irradiance_wm2, temp_c, pv_array_code
            FROM (
                SELECT DISTINCT ON (fpi.interval_start, fpr.pv_array_id)
                    fpi.run_id,
                    fpi.pv_array_id,
                    fpi.interval_start,
                    fpi.power_w,
                    fpi.irradiance_wm2,
                    fpi.temp_c,
                    apa.code AS pv_array_code
                FROM ems.forecast_pv_interval fpi
                JOIN ems.forecast_pv_run fpr ON fpr.id = fpi.run_id
                JOIN ems.asset_pv_array apa
                    ON apa.id = fpr.pv_array_id AND apa.site_id = fpr.site_id
                WHERE fpr.site_id = $1
                  AND fpi.interval_start::date = $2::date
                  AND fpr.status = 'ok'
                ORDER BY fpi.interval_start, fpr.pv_array_id, fpr.created_at DESC
            ) latest
            ORDER BY pv_array_code, interval_start
            """,
            site_id,
            d,
        )

    pv_a: list[dict[str, Any]] = []
    pv_b: list[dict[str, Any]] = []
    for r in rows:
        item = record_to_dict(r)
        code = item.get("pv_array_code")
        if code == "pv-a":
            pv_a.append(item)
        elif code == "pv-b":
            pv_b.append(item)
    return {"pv_a": pv_a, "pv_b": pv_b}
|
||
|
||
|
||
# Router for the endpoints served under /api/v1/me (tagged "me" in the OpenAPI schema).
me_router = APIRouter(prefix="/api/v1/me", tags=["me"])
|
||
|
||
|
||
@me_router.get(
    "/sites",
    summary="Lokality přihlášeného uživatele (fáze bez auth)",
    description="Aktuálně vrací všechny aktivní lokality; po zavedení autentizace se odfiltruje podle oprávnění.",
)
async def list_my_sites(
    db: Annotated[asyncpg.Pool, Depends(get_pool)],
) -> list[dict[str, Any]]:
    """List every active site, ordered by site code.

    No per-user filtering is applied; the query selects all rows of
    ``ems.site`` with ``active = true``.
    """
    query = """
        SELECT id, code, name, timezone, latitude, longitude, active, notes, created_at
        FROM ems.site
        WHERE active = true
        ORDER BY code
        """
    async with db.acquire() as conn:
        site_rows = await conn.fetch(query)
    return list(map(record_to_dict, site_rows))
|
||
|
||
|
||
# Register the routers defined in this module on the application.
app.include_router(me_router)
app.include_router(sites_router)

# CORS_ORIGINS is a comma-separated list of allowed origins. Each entry is
# stripped and empty entries are dropped, so values such as
# "http://a, http://b" or a trailing comma still compare equal to the
# browser's Origin header.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        origin.strip()
        for origin in os.getenv(
            "CORS_ORIGINS", "http://localhost:5173,http://127.0.0.1:5173"
        ).split(",")
        if origin.strip()
    ],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
||
|
||
|
||
@app.websocket("/ws/telemetry")
async def ws_telemetry(websocket: WebSocket) -> None:
    """Hold a telemetry WebSocket open until the client goes away.

    The socket is registered with the connection manager, then incoming text
    frames are read and discarded as keepalives. The socket is always
    unregistered on exit, whatever ended the receive loop.
    """
    await manager.connect_telemetry(websocket)
    try:
        while True:
            await websocket.receive_text()  # keepalive
    except WebSocketDisconnect:
        # Normal client close; cleanup happens in ``finally``.
        pass
    finally:
        # Also runs for non-disconnect exits (unexpected frame type, task
        # cancellation) so the manager never keeps a dead socket.
        manager.disconnect(websocket)
|
||
|
||
|
||
@app.websocket("/ws/logs")
async def ws_logs(websocket: WebSocket) -> None:
    """Hold a log-stream WebSocket open until the client goes away.

    The socket is registered with the connection manager, then incoming text
    frames are read and discarded as keepalives. The socket is always
    unregistered on exit, whatever ended the receive loop.
    """
    await manager.connect_logs(websocket)
    try:
        while True:
            await websocket.receive_text()  # keepalive
    except WebSocketDisconnect:
        # Normal client close; cleanup happens in ``finally``.
        pass
    finally:
        # Also runs for non-disconnect exits (unexpected frame type, task
        # cancellation) so the manager never keeps a dead socket.
        manager.disconnect(websocket)
|
||
|
||
|
||
async def _health_payload(db: asyncpg.Pool) -> dict[str, Any]:
    """Build the basic health document.

    Probes the database with ``SELECT 1`` and counts planning intervals that
    belong to planning runs with status ``'active'``. Any exception during the
    probe is logged and reported as ``db == "error"`` with overall status
    ``"degraded"``; the function itself does not raise for DB failures.
    """
    slot_count = 0
    try:
        async with db.acquire() as conn:
            await conn.fetchval("SELECT 1")
            raw_count = await conn.fetchval(
                """
                SELECT COUNT(*)::bigint
                FROM ems.planning_interval pi
                INNER JOIN ems.planning_run pr ON pr.id = pi.run_id
                WHERE pr.status = 'active'
                """
            )
            slot_count = int(raw_count or 0)
        database = "ok"
    except Exception as exc:
        logger.warning("health DB check failed: %s", exc)
        database = "error"

    overall = "ok" if database == "ok" else "degraded"
    return {
        "status": overall,
        "db": database,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "active_plan_slots": slot_count,
    }
|
||
|
||
|
||
@app.get("/health")
@app.get("/api/v1/health")
async def health(db: Annotated[asyncpg.Pool, Depends(get_pool)]) -> dict[str, Any]:
    """Serve the basic health document on both ``/health`` and ``/api/v1/health``."""
    payload = await _health_payload(db)
    return payload
|
||
|
||
|
||
@app.get("/api/v1/health/detailed")
async def health_detailed(
    request: Request,
    db: Annotated[asyncpg.Pool, Depends(get_pool)],
) -> dict[str, Any]:
    """Return a detailed health report: DB, scheduler, telemetry loop, data age.

    ``last_telemetry_age_sec`` and ``last_plan_age_sec`` are ages in whole
    seconds (never negative), or ``-1`` when no matching row exists or the
    database check failed. Database errors are logged and reported as
    ``db == "error"`` rather than raised.
    """
    db_status: Literal["ok", "error"] = "error"
    last_telemetry_age_sec = -1
    last_plan_age_sec = -1
    try:
        async with db.acquire() as conn:
            await conn.fetchval("SELECT 1")
            db_status = "ok"
            tel = await conn.fetchval(
                """
                SELECT CASE
                    WHEN MAX(measured_at) IS NULL THEN -1
                    ELSE GREATEST(0, EXTRACT(EPOCH FROM (now() - MAX(measured_at)))::int)
                END
                FROM ems.telemetry_inverter
                """
            )
            if tel is not None:
                last_telemetry_age_sec = int(tel)
            plan_age = await conn.fetchval(
                """
                SELECT CASE
                    WHEN MAX(pr.created_at) IS NULL THEN -1
                    ELSE GREATEST(0, EXTRACT(EPOCH FROM (now() - MAX(pr.created_at)))::int)
                END
                FROM ems.planning_run pr
                WHERE pr.status = 'active'
                """
            )
            if plan_age is not None:
                last_plan_age_sec = int(plan_age)
    except Exception as e:
        logger.warning("health detailed DB check failed: %s", e)
        db_status = "error"

    sched_state: Literal["running", "stopped"] = "running" if scheduler.running else "stopped"

    # The telemetry task may be absent from app.state; a missing or finished
    # task is reported as "stopped".
    t_task = getattr(request.app.state, "telemetry_task", None)
    tel_loop: Literal["running", "stopped"] = (
        "running" if t_task is not None and not t_task.done() else "stopped"
    )

    active_jobs: list[dict[str, Any]] = []
    for job in scheduler.get_jobs():
        # Jobs still pending on a scheduler that has not been started do not
        # have a next_run_time attribute; report them with None instead of
        # failing the whole health check with AttributeError.
        nrt = getattr(job, "next_run_time", None)
        active_jobs.append(
            {
                "id": str(job.id),
                "next_run_time": nrt.isoformat() if nrt is not None else None,
            }
        )

    return {
        "db": db_status,
        "scheduler": sched_state,
        "telemetry_loop": tel_loop,
        "last_telemetry_age_sec": last_telemetry_age_sec,
        "last_plan_age_sec": last_plan_age_sec,
        "active_jobs": active_jobs,
    }
|
||
|
||
|
||
class SetSiteModeBody(BaseModel):
    """Request body of ``POST /api/v1/sites/{site_id}/mode``."""

    # Requested mode code; required and must not be an empty string.
    mode: str = Field(..., min_length=1)
    # Optional free-text note forwarded together with the mode change.
    notes: str | None = Field(default=None)
    # Optional timestamp forwarded together with the mode change.
    valid_until: datetime | None = Field(default=None)
|
||
|
||
|
||
class SetSiteModeResponse(BaseModel):
    """Response body of ``POST /api/v1/sites/{site_id}/mode``."""

    # Whether the mode change succeeded.
    success: bool
    # Mode code read back from the database after the change.
    mode: str
    # Activation timestamp of the mode row read back from the database.
    activated_at: datetime
|
||
|
||
|
||
async def _push_mode_to_loxone(site_id: int, endpoint: Any, loxone_val: int) -> None:
    """Best-effort push of the new mode value to the site's Loxone HTTP endpoint.

    ``endpoint`` is a row exposing ``host``, ``port`` and ``protocol``. Basic
    auth is used when ``LOXONE_USER`` is set. Every failure (bad endpoint
    data, network error, non-2xx response) is logged as a warning and never
    raised, because the mode change itself has already been stored.
    """
    try:
        proto = (endpoint["protocol"] or "http").lower()
        if proto not in ("http", "https"):
            proto = "http"
        host = endpoint["host"]
        port = int(endpoint["port"] or (443 if proto == "https" else 80))
        url = f"{proto}://{host}:{port}/dev/sps/io/EMS_Mode/{loxone_val}"
        user = os.getenv("LOXONE_USER") or ""
        password = os.getenv("LOXONE_PASSWORD") or ""
        auth = (user, password) if user else None
        async with httpx.AsyncClient(timeout=10.0) as client:
            r = await client.get(url, auth=auth)
            r.raise_for_status()
    except Exception as e:
        logger.warning("Loxone EMS_Mode notify failed for site %s: %s", site_id, e)


@app.post("/api/v1/sites/{site_id}/mode", response_model=SetSiteModeResponse)
async def set_site_mode(
    site_id: int,
    body: SetSiteModeBody,
    db: Annotated[asyncpg.Pool, Depends(get_pool)],
) -> SetSiteModeResponse:
    """Switch a site's operating mode and forward the new value to Loxone.

    The requested mode is trimmed and upper-cased before validation. After the
    mode has been stored, the current mode row is read back and, when the site
    has an enabled ``loxone_http`` endpoint and the mode defines a Loxone
    value, that value is pushed over HTTP on a best-effort basis.

    Raises:
        HTTPException: 400 for an unsupported mode or when the database
            rejects the change; 404 when the site id is unknown; 500 when no
            mode row can be read back after the change.
    """
    mode = body.mode.strip().upper()
    allowed = {"AUTO", "SELF_SUSTAIN", "CHARGE_CHEAP", "PRESERVE", "MANUAL"}
    if mode not in allowed:
        raise HTTPException(status_code=400, detail=f"Unsupported mode: {body.mode}")

    async with db.acquire() as conn:
        site_ok = await conn.fetchval("SELECT EXISTS(SELECT 1 FROM ems.site WHERE id = $1)", site_id)
        if not site_ok:
            raise HTTPException(status_code=404, detail="Site not found")

        try:
            await run_fn_set_mode_with_discord(
                conn,
                site_id,
                mode,
                "user:api",  # presumably the actor label recorded for the change; confirm in notification_service
                body.valid_until,
                body.notes,
            )
        except asyncpg.PostgresError as e:
            logger.warning("fn_set_mode failed: %s", e)
            raise HTTPException(status_code=400, detail=str(e)) from e

        row = await conn.fetchrow(
            """
            SELECT m.mode_code, m.activated_at, d.loxone_mode_value
            FROM ems.site_operating_mode m
            JOIN ems.operating_mode_def d ON d.code = m.mode_code
            WHERE m.site_id = $1
            """,
            site_id,
        )
        if row is None:
            raise HTTPException(status_code=500, detail="Mode row missing after set")

        ep = await conn.fetchrow(
            """
            SELECT host, port, protocol
            FROM ems.site_endpoint
            WHERE site_id = $1 AND endpoint_type = 'loxone_http' AND enabled = true
            ORDER BY id
            LIMIT 1
            """,
            site_id,
        )

    # The pooled connection is released here, before the outbound HTTP call
    # that may take up to its 10 s timeout.
    activated_at: datetime = row["activated_at"]
    if activated_at.tzinfo is None:
        # Naive timestamps are treated as UTC.
        activated_at = activated_at.replace(tzinfo=timezone.utc)

    loxone_val: int | None = row["loxone_mode_value"]
    if ep and loxone_val is not None:
        await _push_mode_to_loxone(site_id, ep, loxone_val)

    return SetSiteModeResponse(success=True, mode=row["mode_code"], activated_at=activated_at)
|