Shelly Gen2 RPC klient (httpx) + unit testy

- Switch.GetStatus (output, apower W, aenergy.total Wh), Switch.Set
- jen Gen2 RPC, Gen1 odpověď parser odmítá; timeout, bez retry smyček
- testy: čistý parser + RPC přes httpx.MockTransport (bez sítě)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
Dusan Vojacek
2026-06-11 22:37:57 +02:00
parent ccdca068a1
commit 7f22311172
2 changed files with 269 additions and 0 deletions

View File

@@ -0,0 +1,112 @@
"""
Shelly Gen2+ RPC klient (HTTP, httpx) — Switch.GetStatus / Switch.Set.
Záměrně POUZE Gen2 RPC (`/rpc/<Method>?...`). Gen1 REST (`/relay/0?turn=on`)
nepodporujeme — všechna nasazovaná relé (Plus/Pro řada) mluví Gen2 a fallback
by jen maskoval chybnou konfiguraci. Viz docs/04-modules/pool-shelly.md.
Žádné retry smyčky: telemetrii volá poll cyklus každých 60 s a další pokus
zajistí sám; ovládání jde přes signal_service (vlastní retry + verify).
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
import httpx
DEFAULT_TIMEOUT_S = 5.0
@dataclass(frozen=True)
class ShellySwitchStatus:
"""Stav Switch komponenty ze Switch.GetStatus."""
output: bool
apower_w: float | None
aenergy_total_wh: float | None
def shelly_base_url(protocol: str | None, host: str, port: int | None) -> str:
"""Base URL Shelly z řádku ems.site_endpoint (protocol/host/port)."""
p = (protocol or "http").lower()
if p not in ("http", "https"):
p = "http"
prt = int(port or (443 if p == "https" else 80))
return f"{p}://{host}:{prt}"
def parse_switch_status(data: dict[str, Any]) -> ShellySwitchStatus:
"""Čistý parser odpovědi Switch.GetStatus (testovatelné bez HTTP).
Gen2: {"id":0,"output":true,"apower":745.3,"aenergy":{"total":12345.678,...},...}
`aenergy.total` je ve Wh; `apower` ve W. Obojí volitelné (ne každý model měří).
"""
if "output" not in data:
raise ValueError("Shelly Switch.GetStatus: missing 'output' (not a Gen2 RPC response?)")
output = bool(data["output"])
apower_w: float | None = None
if data.get("apower") is not None:
apower_w = float(data["apower"])
aenergy_total_wh: float | None = None
aenergy = data.get("aenergy")
if isinstance(aenergy, dict) and aenergy.get("total") is not None:
aenergy_total_wh = float(aenergy["total"])
return ShellySwitchStatus(
output=output,
apower_w=apower_w,
aenergy_total_wh=aenergy_total_wh,
)
async def get_switch_status(
base_url: str,
switch_id: int = 0,
*,
timeout: float = DEFAULT_TIMEOUT_S,
client: httpx.AsyncClient | None = None,
) -> ShellySwitchStatus:
"""GET {base}/rpc/Switch.GetStatus?id=N → ShellySwitchStatus.
`client` lze injektovat (testy, sdílený klient); jinak se vytvoří jednorázový.
"""
url = f"{base_url.rstrip('/')}/rpc/Switch.GetStatus"
params = {"id": int(switch_id)}
if client is not None:
resp = await client.get(url, params=params)
else:
async with httpx.AsyncClient(timeout=timeout) as c:
resp = await c.get(url, params=params)
resp.raise_for_status()
return parse_switch_status(resp.json())
async def set_switch(
base_url: str,
on: bool,
switch_id: int = 0,
*,
timeout: float = DEFAULT_TIMEOUT_S,
client: httpx.AsyncClient | None = None,
) -> bool | None:
"""GET {base}/rpc/Switch.Set?id=N&on=true|false. Vrátí was_on (předchozí stav), pokud ho Shelly poslalo.
Pozn.: produkční ovládání bazénu jde přes signal_service (journal + verify);
tato funkce je pro ruční zásahy / budoucí přímé použití.
"""
url = f"{base_url.rstrip('/')}/rpc/Switch.Set"
# Gen2 RPC parsuje query parametry jako JSON — bool musí být 'true'/'false'.
params = {"id": int(switch_id), "on": "true" if on else "false"}
if client is not None:
resp = await client.get(url, params=params)
else:
async with httpx.AsyncClient(timeout=timeout) as c:
resp = await c.get(url, params=params)
resp.raise_for_status()
data = resp.json()
was_on = data.get("was_on") if isinstance(data, dict) else None
return bool(was_on) if was_on is not None else None

View File

@@ -0,0 +1,157 @@
"""Shelly Gen2 RPC klient — parser Switch.GetStatus a stavba RPC volání (mock httpx)."""
from __future__ import annotations
import asyncio
import json
import unittest
import httpx
from services.shelly_client import (
ShellySwitchStatus,
get_switch_status,
parse_switch_status,
set_switch,
shelly_base_url,
)
class ParseSwitchStatusTests(unittest.TestCase):
def test_full_gen2_payload(self) -> None:
st = parse_switch_status(
{
"id": 0,
"source": "HTTP_in",
"output": True,
"apower": 745.3,
"voltage": 231.2,
"current": 3.25,
"aenergy": {"total": 12345.678, "by_minute": [123, 120, 118]},
"temperature": {"tC": 41.2},
}
)
self.assertEqual(
st,
ShellySwitchStatus(output=True, apower_w=745.3, aenergy_total_wh=12345.678),
)
def test_minimal_payload_without_metering(self) -> None:
# Levnější relé bez měření: jen output.
st = parse_switch_status({"id": 0, "output": False})
self.assertFalse(st.output)
self.assertIsNone(st.apower_w)
self.assertIsNone(st.aenergy_total_wh)
def test_missing_output_raises(self) -> None:
# Gen1 /relay/0 odpověď ('ison') nesmí tiše projít — podporujeme jen Gen2.
with self.assertRaises(ValueError):
parse_switch_status({"ison": True, "has_timer": False})
def test_zero_values_kept(self) -> None:
st = parse_switch_status(
{"id": 0, "output": False, "apower": 0.0, "aenergy": {"total": 0.0}}
)
self.assertEqual(st.apower_w, 0.0)
self.assertEqual(st.aenergy_total_wh, 0.0)
class ShellyBaseUrlTests(unittest.TestCase):
def test_defaults(self) -> None:
self.assertEqual(shelly_base_url(None, "192.168.1.50", None), "http://192.168.1.50:80")
def test_https_default_port(self) -> None:
self.assertEqual(shelly_base_url("https", "shelly.local", None), "https://shelly.local:443")
def test_unknown_protocol_falls_back_to_http(self) -> None:
self.assertEqual(shelly_base_url("modbus_tcp", "1.2.3.4", 8080), "http://1.2.3.4:8080")
class ShellyRpcTests(unittest.TestCase):
"""RPC přes httpx.MockTransport — bez sítě."""
def _client(self, handler) -> httpx.AsyncClient:
return httpx.AsyncClient(transport=httpx.MockTransport(handler))
def test_get_switch_status(self) -> None:
seen: dict[str, str] = {}
def handler(request: httpx.Request) -> httpx.Response:
seen["path"] = request.url.path
seen["id"] = request.url.params.get("id")
return httpx.Response(
200,
json={"id": 0, "output": True, "apower": 740.0, "aenergy": {"total": 999.5}},
)
async def run() -> ShellySwitchStatus:
async with self._client(handler) as client:
return await get_switch_status("http://192.168.1.50:80", 0, client=client)
st = asyncio.run(run())
self.assertEqual(seen["path"], "/rpc/Switch.GetStatus")
self.assertEqual(seen["id"], "0")
self.assertTrue(st.output)
self.assertEqual(st.apower_w, 740.0)
self.assertEqual(st.aenergy_total_wh, 999.5)
def test_set_switch_sends_json_bool_and_returns_was_on(self) -> None:
seen: dict[str, str] = {}
def handler(request: httpx.Request) -> httpx.Response:
seen["path"] = request.url.path
seen["id"] = request.url.params.get("id")
seen["on"] = request.url.params.get("on")
return httpx.Response(200, json={"was_on": False})
async def run() -> bool | None:
async with self._client(handler) as client:
return await set_switch("http://192.168.1.50:80/", True, 0, client=client)
was_on = asyncio.run(run())
self.assertEqual(seen["path"], "/rpc/Switch.Set")
self.assertEqual(seen["id"], "0")
# Gen2 RPC parsuje query jako JSON — bool musí být doslova 'true'/'false'.
self.assertEqual(seen["on"], "true")
self.assertIs(was_on, False)
def test_set_switch_off(self) -> None:
seen: dict[str, str] = {}
def handler(request: httpx.Request) -> httpx.Response:
seen["on"] = request.url.params.get("on")
return httpx.Response(200, json={"was_on": True})
async def run() -> bool | None:
async with self._client(handler) as client:
return await set_switch("http://10.0.0.7", False, client=client)
self.assertIs(asyncio.run(run()), True)
self.assertEqual(seen["on"], "false")
def test_http_error_raises(self) -> None:
def handler(request: httpx.Request) -> httpx.Response:
return httpx.Response(500, text="boom")
async def run() -> None:
async with self._client(handler) as client:
await get_switch_status("http://10.0.0.7", client=client)
with self.assertRaises(httpx.HTTPStatusError):
asyncio.run(run())
def test_non_gen2_body_raises_value_error(self) -> None:
def handler(request: httpx.Request) -> httpx.Response:
# Gen1 odpověď — klient ji odmítne (žádný Gen1 fallback).
return httpx.Response(200, content=json.dumps({"ison": True}))
async def run() -> None:
async with self._client(handler) as client:
await get_switch_status("http://10.0.0.7", client=client)
with self.assertRaises(ValueError):
asyncio.run(run())
if __name__ == "__main__":
unittest.main()