From 7f22311172aae40f3f885a90be047435a3f6a373 Mon Sep 17 00:00:00 2001 From: Dusan Vojacek Date: Thu, 11 Jun 2026 22:37:57 +0200 Subject: [PATCH] Shelly Gen2 RPC klient (httpx) + unit testy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Switch.GetStatus (output, apower W, aenergy.total Wh), Switch.Set - jen Gen2 RPC, Gen1 odpověď parser odmítá; timeout, bez retry smyček - testy: čistý parser + RPC přes httpx.MockTransport (bez sítě) Co-Authored-By: Claude Fable 5 --- backend/services/shelly_client.py | 112 ++++++++++++++++++++ backend/tests/test_shelly_client.py | 157 ++++++++++++++++++++++++++++ 2 files changed, 269 insertions(+) create mode 100644 backend/services/shelly_client.py create mode 100644 backend/tests/test_shelly_client.py diff --git a/backend/services/shelly_client.py b/backend/services/shelly_client.py new file mode 100644 index 0000000..8177614 --- /dev/null +++ b/backend/services/shelly_client.py @@ -0,0 +1,112 @@ +""" +Shelly Gen2+ RPC klient (HTTP, httpx) — Switch.GetStatus / Switch.Set. + +Záměrně POUZE Gen2 RPC (`/rpc/?...`). Gen1 REST (`/relay/0?turn=on`) +nepodporujeme — všechna nasazovaná relé (Plus/Pro řada) mluví Gen2 a fallback +by jen maskoval chybnou konfiguraci. Viz docs/04-modules/pool-shelly.md. + +Žádné retry smyčky: telemetrii volá poll cyklus každých 60 s a další pokus +zajistí sám; ovládání jde přes signal_service (vlastní retry + verify). +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +import httpx + +DEFAULT_TIMEOUT_S = 5.0 + + +@dataclass(frozen=True) +class ShellySwitchStatus: + """Stav Switch komponenty ze Switch.GetStatus.""" + + output: bool + apower_w: float | None + aenergy_total_wh: float | None + + +def shelly_base_url(protocol: str | None, host: str, port: int | None) -> str: + """Base URL Shelly z řádku ems.site_endpoint (protocol/host/port).""" + p = (protocol or "http").lower() + if p not in ("http", "https"): + p = "http" + prt = int(port or (443 if p == "https" else 80)) + return f"{p}://{host}:{prt}" + + +def parse_switch_status(data: dict[str, Any]) -> ShellySwitchStatus: + """Čistý parser odpovědi Switch.GetStatus (testovatelné bez HTTP). + + Gen2: {"id":0,"output":true,"apower":745.3,"aenergy":{"total":12345.678,...},...} + `aenergy.total` je ve Wh; `apower` ve W. Obojí volitelné (ne každý model měří). + """ + if "output" not in data: + raise ValueError("Shelly Switch.GetStatus: missing 'output' (not a Gen2 RPC response?)") + output = bool(data["output"]) + + apower_w: float | None = None + if data.get("apower") is not None: + apower_w = float(data["apower"]) + + aenergy_total_wh: float | None = None + aenergy = data.get("aenergy") + if isinstance(aenergy, dict) and aenergy.get("total") is not None: + aenergy_total_wh = float(aenergy["total"]) + + return ShellySwitchStatus( + output=output, + apower_w=apower_w, + aenergy_total_wh=aenergy_total_wh, + ) + + +async def get_switch_status( + base_url: str, + switch_id: int = 0, + *, + timeout: float = DEFAULT_TIMEOUT_S, + client: httpx.AsyncClient | None = None, +) -> ShellySwitchStatus: + """GET {base}/rpc/Switch.GetStatus?id=N → ShellySwitchStatus. + + `client` lze injektovat (testy, sdílený klient); jinak se vytvoří jednorázový. + """ + url = f"{base_url.rstrip('/')}/rpc/Switch.GetStatus" + params = {"id": int(switch_id)} + if client is not None: + resp = await client.get(url, params=params) + else: + async with httpx.AsyncClient(timeout=timeout) as c: + resp = await c.get(url, params=params) + resp.raise_for_status() + return parse_switch_status(resp.json()) + + +async def set_switch( + base_url: str, + on: bool, + switch_id: int = 0, + *, + timeout: float = DEFAULT_TIMEOUT_S, + client: httpx.AsyncClient | None = None, +) -> bool | None: + """GET {base}/rpc/Switch.Set?id=N&on=true|false. Vrátí was_on (předchozí stav), pokud ho Shelly poslalo. + + Pozn.: produkční ovládání bazénu jde přes signal_service (journal + verify); + tato funkce je pro ruční zásahy / budoucí přímé použití. + """ + url = f"{base_url.rstrip('/')}/rpc/Switch.Set" + # Gen2 RPC parsuje query parametry jako JSON — bool musí být 'true'/'false'. + params = {"id": int(switch_id), "on": "true" if on else "false"} + if client is not None: + resp = await client.get(url, params=params) + else: + async with httpx.AsyncClient(timeout=timeout) as c: + resp = await c.get(url, params=params) + resp.raise_for_status() + data = resp.json() + was_on = data.get("was_on") if isinstance(data, dict) else None + return bool(was_on) if was_on is not None else None diff --git a/backend/tests/test_shelly_client.py b/backend/tests/test_shelly_client.py new file mode 100644 index 0000000..1266146 --- /dev/null +++ b/backend/tests/test_shelly_client.py @@ -0,0 +1,157 @@ +"""Shelly Gen2 RPC klient — parser Switch.GetStatus a stavba RPC volání (mock httpx).""" + +from __future__ import annotations + +import asyncio +import json +import unittest + +import httpx + +from services.shelly_client import ( + ShellySwitchStatus, + get_switch_status, + parse_switch_status, + set_switch, + shelly_base_url, +) + + +class ParseSwitchStatusTests(unittest.TestCase): + def test_full_gen2_payload(self) -> None: + st = parse_switch_status( + { + "id": 0, + "source": "HTTP_in", + "output": True, + "apower": 745.3, + "voltage": 231.2, + "current": 3.25, + "aenergy": {"total": 12345.678, "by_minute": [123, 120, 118]}, + "temperature": {"tC": 41.2}, + } + ) + self.assertEqual( + st, + ShellySwitchStatus(output=True, apower_w=745.3, aenergy_total_wh=12345.678), + ) + + def test_minimal_payload_without_metering(self) -> None: + # Levnější relé bez měření: jen output. + st = parse_switch_status({"id": 0, "output": False}) + self.assertFalse(st.output) + self.assertIsNone(st.apower_w) + self.assertIsNone(st.aenergy_total_wh) + + def test_missing_output_raises(self) -> None: + # Gen1 /relay/0 odpověď ('ison') nesmí tiše projít — podporujeme jen Gen2. + with self.assertRaises(ValueError): + parse_switch_status({"ison": True, "has_timer": False}) + + def test_zero_values_kept(self) -> None: + st = parse_switch_status( + {"id": 0, "output": False, "apower": 0.0, "aenergy": {"total": 0.0}} + ) + self.assertEqual(st.apower_w, 0.0) + self.assertEqual(st.aenergy_total_wh, 0.0) + + +class ShellyBaseUrlTests(unittest.TestCase): + def test_defaults(self) -> None: + self.assertEqual(shelly_base_url(None, "192.168.1.50", None), "http://192.168.1.50:80") + + def test_https_default_port(self) -> None: + self.assertEqual(shelly_base_url("https", "shelly.local", None), "https://shelly.local:443") + + def test_unknown_protocol_falls_back_to_http(self) -> None: + self.assertEqual(shelly_base_url("modbus_tcp", "1.2.3.4", 8080), "http://1.2.3.4:8080") + + +class ShellyRpcTests(unittest.TestCase): + """RPC přes httpx.MockTransport — bez sítě.""" + + def _client(self, handler) -> httpx.AsyncClient: + return httpx.AsyncClient(transport=httpx.MockTransport(handler)) + + def test_get_switch_status(self) -> None: + seen: dict[str, str] = {} + + def handler(request: httpx.Request) -> httpx.Response: + seen["path"] = request.url.path + seen["id"] = request.url.params.get("id") + return httpx.Response( + 200, + json={"id": 0, "output": True, "apower": 740.0, "aenergy": {"total": 999.5}}, + ) + + async def run() -> ShellySwitchStatus: + async with self._client(handler) as client: + return await get_switch_status("http://192.168.1.50:80", 0, client=client) + + st = asyncio.run(run()) + self.assertEqual(seen["path"], "/rpc/Switch.GetStatus") + self.assertEqual(seen["id"], "0") + self.assertTrue(st.output) + self.assertEqual(st.apower_w, 740.0) + self.assertEqual(st.aenergy_total_wh, 999.5) + + def test_set_switch_sends_json_bool_and_returns_was_on(self) -> None: + seen: dict[str, str] = {} + + def handler(request: httpx.Request) -> httpx.Response: + seen["path"] = request.url.path + seen["id"] = request.url.params.get("id") + seen["on"] = request.url.params.get("on") + return httpx.Response(200, json={"was_on": False}) + + async def run() -> bool | None: + async with self._client(handler) as client: + return await set_switch("http://192.168.1.50:80/", True, 0, client=client) + + was_on = asyncio.run(run()) + self.assertEqual(seen["path"], "/rpc/Switch.Set") + self.assertEqual(seen["id"], "0") + # Gen2 RPC parsuje query jako JSON — bool musí být doslova 'true'/'false'. + self.assertEqual(seen["on"], "true") + self.assertIs(was_on, False) + + def test_set_switch_off(self) -> None: + seen: dict[str, str] = {} + + def handler(request: httpx.Request) -> httpx.Response: + seen["on"] = request.url.params.get("on") + return httpx.Response(200, json={"was_on": True}) + + async def run() -> bool | None: + async with self._client(handler) as client: + return await set_switch("http://10.0.0.7", False, client=client) + + self.assertIs(asyncio.run(run()), True) + self.assertEqual(seen["on"], "false") + + def test_http_error_raises(self) -> None: + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(500, text="boom") + + async def run() -> None: + async with self._client(handler) as client: + await get_switch_status("http://10.0.0.7", client=client) + + with self.assertRaises(httpx.HTTPStatusError): + asyncio.run(run()) + + def test_non_gen2_body_raises_value_error(self) -> None: + def handler(request: httpx.Request) -> httpx.Response: + # Gen1 odpověď — klient ji odmítne (žádný Gen1 fallback). + return httpx.Response(200, content=json.dumps({"ison": True})) + + async def run() -> None: + async with self._client(handler) as client: + await get_switch_status("http://10.0.0.7", client=client) + + with self.assertRaises(ValueError): + asyncio.run(run()) + + +if __name__ == "__main__": + unittest.main()