From ccdca068a1eb7bae25a311e60fe3851972f2d226 Mon Sep 17 00:00:00 2001 From: Dusan Vojacek Date: Thu, 11 Jun 2026 22:37:42 +0200 Subject: [PATCH 1/4] =?UTF-8?q?DB:=20baz=C3=A9nov=C3=A9=20=C4=8Derpadlo=20?= =?UTF-8?q?p=C5=99es=20Shelly=20rel=C3=A9=20(V085)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - ems.asset_pool_pump (endpoint http, rated_power_w, min_run_min, daily_runtime_min jako aktuální sezónní hodnota, schedulable) - ems.telemetry_pool_pump — 1min hypertable (is_on, power_w, energy_wh_total) - signal_def POOL_PUMP_ON (bool) pro ovládání přes signal infrastrukturu - fn_telemetry_pool_pump_sample (R__092), vw_asset_pool_pump_http_poll (R__093) - fn_signal_enqueue_bool (R__094) — SQL-first zařazení bool signálu do fronty Co-Authored-By: Claude Fable 5 --- db/migration/V085__pool_shelly.sql | 107 ++++++++++++++++++ .../R__092_fn_telemetry_pool_pump_sample.sql | 32 ++++++ db/routines/R__094_fn_signal_enqueue_bool.sql | 59 ++++++++++ .../R__093_vw_asset_pool_pump_http_poll.sql | 18 +++ 4 files changed, 216 insertions(+) create mode 100644 db/migration/V085__pool_shelly.sql create mode 100644 db/routines/R__092_fn_telemetry_pool_pump_sample.sql create mode 100644 db/routines/R__094_fn_signal_enqueue_bool.sql create mode 100644 db/views/R__093_vw_asset_pool_pump_http_poll.sql diff --git a/db/migration/V085__pool_shelly.sql b/db/migration/V085__pool_shelly.sql new file mode 100644 index 0000000..c7c45a0 --- /dev/null +++ b/db/migration/V085__pool_shelly.sql @@ -0,0 +1,107 @@ +-- Bazénové čerpadlo přes Shelly relé (Gen2 RPC). +-- (a) asset + 1min telemetrie vlastním pollingem (Shelly drží jen okamžitý stav a čítač +-- aenergy.total — historii si stavíme sami jako u ostatních zařízení, 60 s), +-- (b) ovládání on/off přes existující signal infrastrukturu (signal_def POOL_PUMP_ON, +-- route http_rest na Switch.Set — route je per site, seed v docs/04-modules/pool-shelly.md), +-- (c) plánovač: odložitelná zátěž s denní povinnou dobou filtrace (follow-up, viz docs). + +-- ------------------------------------------------------------ +-- Aktivum: bazénové čerpadlo za Shelly relé +-- ------------------------------------------------------------ +create table ems.asset_pool_pump ( + id serial primary key, + site_id int not null references ems.site (id), + code text not null, + manufacturer text, + model text, + endpoint_id int references ems.site_endpoint (id), + shelly_switch_id int not null default 0, + rated_power_w int not null, + min_run_min int not null default 15, + daily_runtime_min int not null default 240, + schedulable boolean not null default true, + notes text, + constraint uq_asset_pool_pump_site_code unique (site_id, code) +); + +comment on table ems.asset_pool_pump is + 'Bazénové (filtrační) čerpadlo spínané přes Shelly relé (Gen2 RPC, HTTP). Konstantní příkon, odložitelná zátěž s denní povinnou dobou běhu.'; + +comment on column ems.asset_pool_pump.site_id is + 'Vazba na lokalitu.'; + +comment on column ems.asset_pool_pump.code is + 'Kód aktiva, unikátní v rámci lokality. Příklad: pool-pump-01.'; + +comment on column ems.asset_pool_pump.endpoint_id is + 'HTTP endpoint Shelly relé (ems.site_endpoint, endpoint_type http_api nebo shelly_http). Bez endpointu se čerpadlo nepolluje.'; + +comment on column ems.asset_pool_pump.shelly_switch_id is + 'Id Switch komponenty v Shelly Gen2 RPC (Switch.GetStatus?id=N). U 1kanálových relé 0.'; + +comment on column ems.asset_pool_pump.rated_power_w is + 'Jmenovitý příkon čerpadla ve W. Plánovač s ním počítá jako s konstantním výkonem při běhu.'; + +comment on column ems.asset_pool_pump.min_run_min is + 'Minimální nepřerušený běh v minutách (ochrana čerpadla před krátkým cyklováním). Násobky 15min slotů.'; + +comment on column ems.asset_pool_pump.daily_runtime_min is + 'Denní povinná doba filtrace v minutách — AKTUÁLNÍ sezónní hodnota (léto typ. více, zima méně / 0). Mění ji provozovatel ručně podle sezóny; plnohodnotný sezónní profil (tabulka měsíc → minuty) je follow-up, viz docs/04-modules/pool-shelly.md. 0 = filtrace vypnutá (mimo sezónu).'; + +comment on column ems.asset_pool_pump.schedulable is + 'true = plánovač smí rozkládat běh do levných/přebytkových slotů; false = EMS jen měří, nespíná.'; + +-- ------------------------------------------------------------ +-- 1min telemetrie (TimescaleDB hypertable) +-- ------------------------------------------------------------ +create table ems.telemetry_pool_pump ( + site_id int not null references ems.site (id), + pump_id int not null references ems.asset_pool_pump (id), + measured_at timestamptz not null, + is_on boolean, + power_w int, + energy_wh_total bigint, + primary key (pump_id, measured_at) +); + +comment on table ems.telemetry_pool_pump is + 'Telemetrie bazénového čerpadla ze Shelly relé (Gen2 Switch.GetStatus), 1min polling. TimescaleDB hypertable. Historie se staví výhradně tady — Shelly ji nedrží.'; + +comment on column ems.telemetry_pool_pump.site_id is + 'Vazba na lokalitu.'; + +comment on column ems.telemetry_pool_pump.pump_id is + 'Vazba na ems.asset_pool_pump.'; + +comment on column ems.telemetry_pool_pump.measured_at is + 'Čas měření (UTC).'; + +comment on column ems.telemetry_pool_pump.is_on is + 'Stav relé (Switch.GetStatus output).'; + +comment on column ems.telemetry_pool_pump.power_w is + 'Okamžitý činný příkon ve W (Switch.GetStatus apower). NULL pokud model neměří výkon.'; + +comment on column ems.telemetry_pool_pump.energy_wh_total is + 'Kumulativní čítač energie ve Wh (Switch.GetStatus aenergy.total). Po výpadku napájení Shelly může čítač začít znovu — energii za interval počítat jako kladnou diferenci.'; + +select create_hypertable( + 'ems.telemetry_pool_pump', + 'measured_at', + chunk_time_interval => interval '1 week', + if_not_exists => true +); + +create index idx_telemetry_pool_pump_site_time + on ems.telemetry_pool_pump (site_id, measured_at desc); + +-- ------------------------------------------------------------ +-- Signál pro ovládání relé (route per site se seeduje provozně, šablona v docs) +-- ------------------------------------------------------------ +insert into ems.signal_def (code, value_type, description) +values ( + 'POOL_PUMP_ON', + 'bool', + 'Požadovaný stav bazénového čerpadla (Shelly relé). Doručuje signal_service přes signal_route http_rest na Shelly Gen2 Switch.Set, readback verify přes Switch.GetStatus. Hodnotu nastavuje plánovač / operátor (fn_signal_enqueue_bool).' +) +on conflict (code) do nothing; diff --git a/db/routines/R__092_fn_telemetry_pool_pump_sample.sql b/db/routines/R__092_fn_telemetry_pool_pump_sample.sql new file mode 100644 index 0000000..fc85e91 --- /dev/null +++ b/db/routines/R__092_fn_telemetry_pool_pump_sample.sql @@ -0,0 +1,32 @@ +create or replace function ems.fn_telemetry_pool_pump_sample( + p_site_id int, + p_pump_id int, + p_measured_at timestamptz, + p_is_on boolean, + p_power_w int, + p_energy_wh_total bigint +) +returns void +language sql +as $fn$ + insert into ems.telemetry_pool_pump ( + site_id, + pump_id, + measured_at, + is_on, + power_w, + energy_wh_total + ) + values ( + p_site_id, + p_pump_id, + p_measured_at, + p_is_on, + p_power_w, + p_energy_wh_total + ) + on conflict (pump_id, measured_at) do nothing; +$fn$; + +comment on function ems.fn_telemetry_pool_pump_sample is + 'Insert 1min telemetrie bazénového čerpadla (Shelly Switch.GetStatus: output, apower, aenergy.total). Volá telemetry_collector.poll_pool_pumps.'; diff --git a/db/routines/R__094_fn_signal_enqueue_bool.sql b/db/routines/R__094_fn_signal_enqueue_bool.sql new file mode 100644 index 0000000..40ab4bd --- /dev/null +++ b/db/routines/R__094_fn_signal_enqueue_bool.sql @@ -0,0 +1,59 @@ +create or replace function ems.fn_signal_enqueue_bool( + p_site_id int, + p_signal_code text, + p_value boolean +) +returns int +language plpgsql +as $fn$ +declare + v_route record; + v_value_text text; + v_count int := 0; +begin + -- Zařadí bool signál do odchozí fronty pro všechny aktivní routy (site, kód). + -- Transformaci na text dělá per route stejně jako backend (_bool_to_text): + -- transform_json->'map_bool'->>'true'/'false', default '1'/'0'. + for v_route in + select r.id, r.site_id, r.destination_type, r.destination_key, r.transform_json + from ems.signal_route r + where r.site_id = p_site_id + and r.signal_code = p_signal_code + and r.enabled = true + loop + v_value_text := coalesce( + v_route.transform_json -> 'map_bool' ->> (case when p_value then 'true' else 'false' end), + case when p_value then '1' else '0' end + ); + + insert into ems.signal_state ( + site_id, signal_code, destination_type, destination_key, + last_desired_value_text, updated_at + ) + values ( + p_site_id, p_signal_code, v_route.destination_type, v_route.destination_key, + v_value_text, now() + ) + on conflict (site_id, signal_code, destination_type, destination_key) + do update set + last_desired_value_text = excluded.last_desired_value_text, + updated_at = now(); + + insert into ems.signal_outbound_journal ( + route_id, site_id, signal_code, value_text, value_num, status, + attempt_count, next_attempt_at + ) + values ( + v_route.id, p_site_id, p_signal_code, v_value_text, + case when p_value then 1 else 0 end, 'queued', 0, now() + ); + + v_count := v_count + 1; + end loop; + + return v_count; +end; +$fn$; + +comment on function ems.fn_signal_enqueue_bool is + 'Zařadí bool signál (např. POOL_PUMP_ON) do signal_outbound_journal pro všechny aktivní routy daného site a kódu; doručení a verify řeší signal_service (každých 15 s). Vrací počet zařazených řádků. Použití: select ems.fn_signal_enqueue_bool(1, ''POOL_PUMP_ON'', true);'; diff --git a/db/views/R__093_vw_asset_pool_pump_http_poll.sql b/db/views/R__093_vw_asset_pool_pump_http_poll.sql new file mode 100644 index 0000000..f9bf130 --- /dev/null +++ b/db/views/R__093_vw_asset_pool_pump_http_poll.sql @@ -0,0 +1,18 @@ +drop view if exists ems.vw_asset_pool_pump_http_poll; + +create view ems.vw_asset_pool_pump_http_poll as +select + pp.site_id, + pp.id as pump_id, + pp.code, + pp.shelly_switch_id, + se.protocol, + se.host, + se.port +from ems.asset_pool_pump pp +join ems.site_endpoint se on se.id = pp.endpoint_id +where se.enabled = true + and se.endpoint_type in ('http_api', 'shelly_http'); + +comment on view ems.vw_asset_pool_pump_http_poll is + 'Bazénová čerpadla se Shelly HTTP endpointem pro telemetry_collector (Gen2 RPC polling, 60 s).'; From 7f22311172aae40f3f885a90be047435a3f6a373 Mon Sep 17 00:00:00 2001 From: Dusan Vojacek Date: Thu, 11 Jun 2026 22:37:57 +0200 Subject: [PATCH 2/4] Shelly Gen2 RPC klient (httpx) + unit testy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Switch.GetStatus (output, apower W, aenergy.total Wh), Switch.Set - jen Gen2 RPC, Gen1 odpověď parser odmítá; timeout, bez retry smyček - testy: čistý parser + RPC přes httpx.MockTransport (bez sítě) Co-Authored-By: Claude Fable 5 --- backend/services/shelly_client.py | 112 ++++++++++++++++++++ backend/tests/test_shelly_client.py | 157 ++++++++++++++++++++++++++++ 2 files changed, 269 insertions(+) create mode 100644 backend/services/shelly_client.py create mode 100644 backend/tests/test_shelly_client.py diff --git a/backend/services/shelly_client.py b/backend/services/shelly_client.py new file mode 100644 index 0000000..8177614 --- /dev/null +++ b/backend/services/shelly_client.py @@ -0,0 +1,112 @@ +""" +Shelly Gen2+ RPC klient (HTTP, httpx) — Switch.GetStatus / Switch.Set. + +Záměrně POUZE Gen2 RPC (`/rpc/?...`). Gen1 REST (`/relay/0?turn=on`) +nepodporujeme — všechna nasazovaná relé (Plus/Pro řada) mluví Gen2 a fallback +by jen maskoval chybnou konfiguraci. Viz docs/04-modules/pool-shelly.md. + +Žádné retry smyčky: telemetrii volá poll cyklus každých 60 s a další pokus +zajistí sám; ovládání jde přes signal_service (vlastní retry + verify). +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +import httpx + +DEFAULT_TIMEOUT_S = 5.0 + + +@dataclass(frozen=True) +class ShellySwitchStatus: + """Stav Switch komponenty ze Switch.GetStatus.""" + + output: bool + apower_w: float | None + aenergy_total_wh: float | None + + +def shelly_base_url(protocol: str | None, host: str, port: int | None) -> str: + """Base URL Shelly z řádku ems.site_endpoint (protocol/host/port).""" + p = (protocol or "http").lower() + if p not in ("http", "https"): + p = "http" + prt = int(port or (443 if p == "https" else 80)) + return f"{p}://{host}:{prt}" + + +def parse_switch_status(data: dict[str, Any]) -> ShellySwitchStatus: + """Čistý parser odpovědi Switch.GetStatus (testovatelné bez HTTP). + + Gen2: {"id":0,"output":true,"apower":745.3,"aenergy":{"total":12345.678,...},...} + `aenergy.total` je ve Wh; `apower` ve W. Obojí volitelné (ne každý model měří). + """ + if "output" not in data: + raise ValueError("Shelly Switch.GetStatus: missing 'output' (not a Gen2 RPC response?)") + output = bool(data["output"]) + + apower_w: float | None = None + if data.get("apower") is not None: + apower_w = float(data["apower"]) + + aenergy_total_wh: float | None = None + aenergy = data.get("aenergy") + if isinstance(aenergy, dict) and aenergy.get("total") is not None: + aenergy_total_wh = float(aenergy["total"]) + + return ShellySwitchStatus( + output=output, + apower_w=apower_w, + aenergy_total_wh=aenergy_total_wh, + ) + + +async def get_switch_status( + base_url: str, + switch_id: int = 0, + *, + timeout: float = DEFAULT_TIMEOUT_S, + client: httpx.AsyncClient | None = None, +) -> ShellySwitchStatus: + """GET {base}/rpc/Switch.GetStatus?id=N → ShellySwitchStatus. + + `client` lze injektovat (testy, sdílený klient); jinak se vytvoří jednorázový. + """ + url = f"{base_url.rstrip('/')}/rpc/Switch.GetStatus" + params = {"id": int(switch_id)} + if client is not None: + resp = await client.get(url, params=params) + else: + async with httpx.AsyncClient(timeout=timeout) as c: + resp = await c.get(url, params=params) + resp.raise_for_status() + return parse_switch_status(resp.json()) + + +async def set_switch( + base_url: str, + on: bool, + switch_id: int = 0, + *, + timeout: float = DEFAULT_TIMEOUT_S, + client: httpx.AsyncClient | None = None, +) -> bool | None: + """GET {base}/rpc/Switch.Set?id=N&on=true|false. Vrátí was_on (předchozí stav), pokud ho Shelly poslalo. + + Pozn.: produkční ovládání bazénu jde přes signal_service (journal + verify); + tato funkce je pro ruční zásahy / budoucí přímé použití. + """ + url = f"{base_url.rstrip('/')}/rpc/Switch.Set" + # Gen2 RPC parsuje query parametry jako JSON — bool musí být 'true'/'false'. + params = {"id": int(switch_id), "on": "true" if on else "false"} + if client is not None: + resp = await client.get(url, params=params) + else: + async with httpx.AsyncClient(timeout=timeout) as c: + resp = await c.get(url, params=params) + resp.raise_for_status() + data = resp.json() + was_on = data.get("was_on") if isinstance(data, dict) else None + return bool(was_on) if was_on is not None else None diff --git a/backend/tests/test_shelly_client.py b/backend/tests/test_shelly_client.py new file mode 100644 index 0000000..1266146 --- /dev/null +++ b/backend/tests/test_shelly_client.py @@ -0,0 +1,157 @@ +"""Shelly Gen2 RPC klient — parser Switch.GetStatus a stavba RPC volání (mock httpx).""" + +from __future__ import annotations + +import asyncio +import json +import unittest + +import httpx + +from services.shelly_client import ( + ShellySwitchStatus, + get_switch_status, + parse_switch_status, + set_switch, + shelly_base_url, +) + + +class ParseSwitchStatusTests(unittest.TestCase): + def test_full_gen2_payload(self) -> None: + st = parse_switch_status( + { + "id": 0, + "source": "HTTP_in", + "output": True, + "apower": 745.3, + "voltage": 231.2, + "current": 3.25, + "aenergy": {"total": 12345.678, "by_minute": [123, 120, 118]}, + "temperature": {"tC": 41.2}, + } + ) + self.assertEqual( + st, + ShellySwitchStatus(output=True, apower_w=745.3, aenergy_total_wh=12345.678), + ) + + def test_minimal_payload_without_metering(self) -> None: + # Levnější relé bez měření: jen output. + st = parse_switch_status({"id": 0, "output": False}) + self.assertFalse(st.output) + self.assertIsNone(st.apower_w) + self.assertIsNone(st.aenergy_total_wh) + + def test_missing_output_raises(self) -> None: + # Gen1 /relay/0 odpověď ('ison') nesmí tiše projít — podporujeme jen Gen2. + with self.assertRaises(ValueError): + parse_switch_status({"ison": True, "has_timer": False}) + + def test_zero_values_kept(self) -> None: + st = parse_switch_status( + {"id": 0, "output": False, "apower": 0.0, "aenergy": {"total": 0.0}} + ) + self.assertEqual(st.apower_w, 0.0) + self.assertEqual(st.aenergy_total_wh, 0.0) + + +class ShellyBaseUrlTests(unittest.TestCase): + def test_defaults(self) -> None: + self.assertEqual(shelly_base_url(None, "192.168.1.50", None), "http://192.168.1.50:80") + + def test_https_default_port(self) -> None: + self.assertEqual(shelly_base_url("https", "shelly.local", None), "https://shelly.local:443") + + def test_unknown_protocol_falls_back_to_http(self) -> None: + self.assertEqual(shelly_base_url("modbus_tcp", "1.2.3.4", 8080), "http://1.2.3.4:8080") + + +class ShellyRpcTests(unittest.TestCase): + """RPC přes httpx.MockTransport — bez sítě.""" + + def _client(self, handler) -> httpx.AsyncClient: + return httpx.AsyncClient(transport=httpx.MockTransport(handler)) + + def test_get_switch_status(self) -> None: + seen: dict[str, str] = {} + + def handler(request: httpx.Request) -> httpx.Response: + seen["path"] = request.url.path + seen["id"] = request.url.params.get("id") + return httpx.Response( + 200, + json={"id": 0, "output": True, "apower": 740.0, "aenergy": {"total": 999.5}}, + ) + + async def run() -> ShellySwitchStatus: + async with self._client(handler) as client: + return await get_switch_status("http://192.168.1.50:80", 0, client=client) + + st = asyncio.run(run()) + self.assertEqual(seen["path"], "/rpc/Switch.GetStatus") + self.assertEqual(seen["id"], "0") + self.assertTrue(st.output) + self.assertEqual(st.apower_w, 740.0) + self.assertEqual(st.aenergy_total_wh, 999.5) + + def test_set_switch_sends_json_bool_and_returns_was_on(self) -> None: + seen: dict[str, str] = {} + + def handler(request: httpx.Request) -> httpx.Response: + seen["path"] = request.url.path + seen["id"] = request.url.params.get("id") + seen["on"] = request.url.params.get("on") + return httpx.Response(200, json={"was_on": False}) + + async def run() -> bool | None: + async with self._client(handler) as client: + return await set_switch("http://192.168.1.50:80/", True, 0, client=client) + + was_on = asyncio.run(run()) + self.assertEqual(seen["path"], "/rpc/Switch.Set") + self.assertEqual(seen["id"], "0") + # Gen2 RPC parsuje query jako JSON — bool musí být doslova 'true'/'false'. + self.assertEqual(seen["on"], "true") + self.assertIs(was_on, False) + + def test_set_switch_off(self) -> None: + seen: dict[str, str] = {} + + def handler(request: httpx.Request) -> httpx.Response: + seen["on"] = request.url.params.get("on") + return httpx.Response(200, json={"was_on": True}) + + async def run() -> bool | None: + async with self._client(handler) as client: + return await set_switch("http://10.0.0.7", False, client=client) + + self.assertIs(asyncio.run(run()), True) + self.assertEqual(seen["on"], "false") + + def test_http_error_raises(self) -> None: + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(500, text="boom") + + async def run() -> None: + async with self._client(handler) as client: + await get_switch_status("http://10.0.0.7", client=client) + + with self.assertRaises(httpx.HTTPStatusError): + asyncio.run(run()) + + def test_non_gen2_body_raises_value_error(self) -> None: + def handler(request: httpx.Request) -> httpx.Response: + # Gen1 odpověď — klient ji odmítne (žádný Gen1 fallback). + return httpx.Response(200, content=json.dumps({"ison": True})) + + async def run() -> None: + async with self._client(handler) as client: + await get_switch_status("http://10.0.0.7", client=client) + + with self.assertRaises(ValueError): + asyncio.run(run()) + + +if __name__ == "__main__": + unittest.main() From 733224d18de9769d47f956f7e42b5591028cfece Mon Sep 17 00:00:00 2001 From: Dusan Vojacek Date: Thu, 11 Jun 2026 22:37:57 +0200 Subject: [PATCH 3/4] =?UTF-8?q?Collector:=20poll=5Fpool=5Fpumps=20?= =?UTF-8?q?=E2=80=94=201min=20telemetrie=20Shelly=20baz=C3=A9nu?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - nová funkce na konci souboru + jeden řádek v run_telemetry_loop (minimální dotyk kvůli souběžným změnám na main) - čte vw_asset_pool_pump_http_poll, zapisuje fn_telemetry_pool_pump_sample - při výpadku čtení nic nezapisuje (žádná fabrikovaná nula) Co-Authored-By: Claude Fable 5 --- backend/services/telemetry_collector.py | 40 +++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/backend/services/telemetry_collector.py b/backend/services/telemetry_collector.py index 588d180..aefe92a 100644 --- a/backend/services/telemetry_collector.py +++ b/backend/services/telemetry_collector.py @@ -300,6 +300,7 @@ async def run_telemetry_loop(conn: asyncpg.Connection) -> float: await poll_inverter(sid, conn) await poll_ev_chargers(sid, conn) await poll_heat_pump(sid, conn) + await poll_pool_pumps(sid, conn) except Exception as e: logger.error("Telemetry loop error site %s: %s", sid, e) return loop.time() - start @@ -322,3 +323,42 @@ async def run_telemetry_loop_wrapper(pool: asyncpg.Pool) -> None: if elapsed > 50: logger.warning("Telemetry loop took %.1fs (>50s)", elapsed) await asyncio.sleep(max(0.0, 60.0 - elapsed)) + + +async def poll_pool_pumps(site_id: int, db: asyncpg.Connection) -> None: + """Poll bazénových čerpadel přes Shelly relé (Gen2 RPC Switch.GetStatus), 60 s. + + Shelly nedrží historii — stavíme ji 1min vzorky jako u ostatních zařízení. + """ + # Lokální import: minimální dotyk hlavičky souboru (souběžné změny na main). + from services.shelly_client import get_switch_status, shelly_base_url + + rows = await db.fetch( + """ + select pump_id as id, code, shelly_switch_id, protocol, host, port + from ems.vw_asset_pool_pump_http_poll + where site_id = $1 + """, + site_id, + ) + measured_at = datetime.now(timezone.utc) + for row in rows: + code = row["code"] + base = shelly_base_url(row["protocol"], row["host"], row["port"]) + try: + status = await get_switch_status(base, int(row["shelly_switch_id"] or 0)) + except Exception as e: + # Při výpadku čtení NIC nezapisovat — fabrikovaná nula by špinila + # historii spotřeby (stejný princip jako u EV nabíječek výše). + logger.warning("pool pump %s (%s) read failed: %s", code, base, e) + continue + + await db.execute( + "select ems.fn_telemetry_pool_pump_sample($1::int, $2::int, $3::timestamptz, $4::boolean, $5::int, $6::bigint)", + site_id, + row["id"], + measured_at, + status.output, + int(round(status.apower_w)) if status.apower_w is not None else None, + int(round(status.aenergy_total_wh)) if status.aenergy_total_wh is not None else None, + ) From cf663ae41774e404754783b77e6e6a0d6f5f25be Mon Sep 17 00:00:00 2001 From: Dusan Vojacek Date: Thu, 11 Jun 2026 22:37:57 +0200 Subject: [PATCH 4/4] =?UTF-8?q?Docs:=20pool-shelly=20=E2=80=94=20architekt?= =?UTF-8?q?ura,=20=C5=A1ablony=20seed=C5=AF,=20n=C3=A1vrh=20solver=20integ?= =?UTF-8?q?race?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - šablona insertů endpoint/asset/signal_route (placeholdery IP, výkon, runtime) - tok ovládání přes fn_signal_enqueue_bool a signal_service - návrh pool[t] binárky analogicky hp[t] s denním runtime constraintem - checklist oživení, otevřené otázky (sezónnost, bazál, UI) Co-Authored-By: Claude Fable 5 --- docs/04-modules/pool-shelly.md | 112 +++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 docs/04-modules/pool-shelly.md diff --git a/docs/04-modules/pool-shelly.md b/docs/04-modules/pool-shelly.md new file mode 100644 index 0000000..1764b8f --- /dev/null +++ b/docs/04-modules/pool-shelly.md @@ -0,0 +1,112 @@ +# Bazénové čerpadlo přes Shelly relé + +**Stav:** telemetrie + ovládací infrastruktura **implementováno** (V085); integrace do LP solveru **📋 návrh / follow-up** (viz `planning-neg-sell-strategy.md`, sekce bazén a UI workshop). + +Cíl: (a) vlastní historie spotřeby čerpadla, (b) ovládání on/off, (c) plánovač jako odložitelná zátěž — „denní povinné hodiny filtrace, ideálně v levných / přebytkových slotech". + +--- + +## 1. Architektura + +``` +Shelly relé (Gen2 RPC, HTTP) + ▲ poll 60 s ▲ Switch.Set + readback verify + │ Switch.GetStatus │ +telemetry_collector.poll_pool_pumps signal_service (15 s worker) + │ ▲ + ▼ │ signal_outbound_journal (queued) +ems.telemetry_pool_pump ems.fn_signal_enqueue_bool(site, 'POOL_PUMP_ON', bool) + (hypertable, 1 min) ▲ + │ zatím: operátor / cron; follow-up: plánovač +``` + +- **Jen Gen2 RPC** (`/rpc/Switch.GetStatus`, `/rpc/Switch.Set`) — Plus/Pro řada. **Gen1 REST (`/relay/0?turn=on`) záměrně nepodporujeme**; parser odmítne Gen1 odpověď (`ison`) chybou, aby se chybná konfigurace neprojevila tichým výpadkem dat. +- **Historie**: Shelly drží jen okamžitý stav a kumulativní čítač `aenergy.total` (Wh). Historii si stavíme sami 1min pollingem jako u všeho ostatního (Deye, EV, TČ). +- **Ovládání**: žádný zásah do control exporteru — používá se existující signal infrastruktura (`signal_def` / `signal_route` / `signal_outbound_journal`, `services/signal_service.py`) s journalem, retry a readback verify. + +## 2. DB objekty (V085 + repeatables) + +| Objekt | Soubor | Popis | +|--------|--------|-------| +| `ems.asset_pool_pump` | `db/migration/V085__pool_shelly.sql` | Aktivum: `endpoint_id` → `site_endpoint` (typ `http_api` / `shelly_http`), `shelly_switch_id` (Gen2 Switch id, typ. 0), `rated_power_w` (konstantní příkon), `min_run_min`, `daily_runtime_min`, `schedulable`. | +| `ems.telemetry_pool_pump` | tamtéž | Hypertable 1min: `is_on`, `power_w` (apower), `energy_wh_total` (aenergy.total, Wh). PK `(pump_id, measured_at)`. | +| `signal_def` `POOL_PUMP_ON` | tamtéž | Bool signál — požadovaný stav relé. | +| `ems.fn_telemetry_pool_pump_sample` | `db/routines/R__092_…` | Insert vzorku (on conflict do nothing). | +| `ems.vw_asset_pool_pump_http_poll` | `db/views/R__093_…` | Čerpadla s aktivním HTTP endpointem pro collector. | +| `ems.fn_signal_enqueue_bool` | `db/routines/R__094_…` | SQL-first zařazení bool signálu do odchozí fronty (všechny aktivní routy site+kód); aplikuje `transform_json.map_bool` per route stejně jako backend. | + +**Sezónnost:** `daily_runtime_min` je **aktuální sezónní hodnota** (léto typicky 240–480 min, zima méně / 0 = filtrace vypnutá). Mění ji provozovatel ručně; plnohodnotný sezónní profil (tabulka měsíc → minuty, případně podle teploty vody) je follow-up — viz §6. + +## 3. Telemetrie + +- `telemetry_collector.poll_pool_pumps(site_id, db)` — součást 60s smyčky (`run_telemetry_loop`), čte `vw_asset_pool_pump_http_poll`, volá `services/shelly_client.get_switch_status`, zapisuje přes `fn_telemetry_pool_pump_sample`. +- Při výpadku čtení se **nic nezapisuje** (žádná fabrikovaná nula — stejný princip jako EV nabíječky). +- `energy_wh_total` je čítač — energie za interval = kladná diference po sobě jdoucích vzorků (po výpadku napájení Shelly může čítač začít znovu; záporné diference zahazovat). +- Follow-up: zařadit `power_w` čerpadla mezi řízené zátěže při výpočtu bazálu (`fn_update_baseline_stats`) a do `vw_latest_telemetry`, jakmile poteče reálná telemetrie. + +## 4. Ovládání on/off (signál `POOL_PUMP_ON`) + +Route je per site a obsahuje IP — **neseeduje se migrací**, zakládá se provozně podle šablony (placeholdery `<...>`): + +```sql +-- 1) endpoint Shelly relé +insert into ems.site_endpoint (site_id, endpoint_type, host, port, protocol, enabled, notes) +values (, 'http_api', '', 80, 'http', true, 'Shelly relé bazénového čerpadla') +returning id; -- → + +-- 2) aktivum +insert into ems.asset_pool_pump (site_id, code, endpoint_id, rated_power_w, min_run_min, daily_runtime_min) +values (, 'pool-pump-01', , , 15, ); + +-- 3) route signálu na Shelly (Gen2 RPC; bool v query musí být doslova true/false → map_bool) +insert into ems.signal_route ( + site_id, destination_type, endpoint_id, signal_code, destination_key, + route_config_json, transform_json, verify_readback, verify_config_json +) +values ( + , 'http_rest', , 'POOL_PUMP_ON', 'switch0', + '{"method": "GET", "path_template": "/rpc/Switch.Set?id=0&on={value}"}', + '{"map_bool": {"true": "true", "false": "false"}}', + true, + '{"read_path": "/rpc/Switch.GetStatus?id=0", "json_path": "$.output"}' +); +``` + +Tok: `select ems.fn_signal_enqueue_bool(, 'POOL_PUMP_ON', true);` → `signal_outbound_journal` (`queued`) → worker `signal_outbound_send` (15 s) pošle `GET /rpc/Switch.Set?id=0&on=true` → `sent` → worker `signal_outbound_verify` přečte `Switch.GetStatus` a porovná `$.output` → `verified` (retry/backoff a `abandoned` po 12 pokusech dle `signal_service`). + +**Kdo signál nastavuje (fáze):** + +1. **Teď:** operátor ručně (`fn_signal_enqueue_bool`) nebo jednoduchý cron (např. pg_cron / APScheduler tick: zapnout v naplánovaných hodinách, vypnout mimo ně). +2. **Follow-up (plná integrace):** plánovač zapíše běh bazénu do `planning_interval` (nový sloupec, např. `pool_pump_on boolean`); tick na hranici 15min slotu (analogický control exporteru, ale přes signály) porovná plán s `signal_state` a zavolá `fn_signal_enqueue_bool` jen při změně (idempotenci řeší `signal_state` + `_should_skip_enqueue` logika). + +## 5. Integrace do solveru (📋 návrh — analogie `hp[t]`) + +V `solver_v2.py` je TČ spojitá proměnná `hp[t] ∈ [0, rated_w]` vstupující do bilance `load_site`. Bazén je jednodušší — **konstantní příkon, binární běh**: + +- Proměnné: `pool[t] ∈ {0, 1}` (LpBinary) pro sloty v plánovacím horizontu; příkon ve slotu = `pool[t] * rated_power_w`. +- Bilance: `load_site = load_baseline + ev + hp[t] + pool[t] * rated_power_w`. +- **Denní povinný runtime** (kalendářní den v `site.timezone`, jako ostatní denní logika): pro každý den `d` plně pokrytý horizontem: `sum(pool[t] for t in day_d) * 15 >= daily_runtime_min` (zbytek dne při rolling replanu: odečíst již odběhané minuty z `telemetry_pool_pump` od půlnoci — viz `fn_battery_cycle_audit` vzor agregace). +- **Min. souvislý běh**: `min_run_min / 15` slotů — klasická minimální up-time formulace přes binárku startu `pool_start[t] >= pool[t] − pool[t−1]` a `pool[t..t+k] >= pool_start[t]`. +- Cíl: žádný extra term — levné/přebytkové sloty vyberou samy ceny v účelové funkci (import za `buy[t]`, ušlý export za `sell[t]`); v okně `sell < 0` funguje bazén přirozeně jako **flex sink** (viz `planning-neg-sell-strategy.md`, `E_surplus_after_t`). +- `schedulable = false` → solver čerpadlo ignoruje (jen telemetrie), `daily_runtime_min = 0` → žádný constraint. +- Pozor na MILP velikost: +96–144 binárek/den; držet se vzoru `z_export`/`y_imp` (HiGHS to zvládá). +- Výstup: `planning_interval.pool_pump_on` (nová migrace) + export přes signál (§4 fáze 2); audit follow-up: skutečnost z `telemetry_pool_pump` do `audit_interval`. + +## 6. Otevřené otázky / follow-upy + +- Sezónní profil `daily_runtime_min` (tabulka měsíc → minuty? řízení podle teploty vody?) — zatím ruční změna hodnoty. +- Produktové rozhodnutí UI pro flex zátěže (workshop dle `planning-neg-sell-strategy.md` §UI) — bazén do slot detailu a „Dnes X/Y h filtrace". +- Bazál: odečítat `telemetry_pool_pump.power_w` v `fn_update_baseline_stats` (jinak se bazén započte do baseline a solver by ho počítal dvakrát). +- PostgREST granty (`ems_anon`) na `telemetry_pool_pump` / view, až bude UI číst. +- Více čerpadel na jednom Shelly Pro 2PM (`shelly_switch_id` 0/1) — schéma to umí, collector i route ano; netestováno. + +## 7. Checklist oživení (placeholdery) + +1. [ ] Shelly připojené na LAN, statická IP ``, ověřit ručně: `curl http:///rpc/Switch.GetStatus?id=0` → JSON s `output` (Gen2!). +2. [ ] `insert into ems.site_endpoint …` (šablona §4) → ``. +3. [ ] `insert into ems.asset_pool_pump …` s `` (štítek čerpadla, typ. 400–1100 W) a `` (sezóna). +4. [ ] Počkat ≤ 60 s, ověřit telemetrii: `select * from ems.telemetry_pool_pump order by measured_at desc limit 5;` +5. [ ] `insert into ems.signal_route …` (šablona §4). +6. [ ] Test zapnutí: `select ems.fn_signal_enqueue_bool(, 'POOL_PUMP_ON', true);` → do ~30 s `signal_outbound_journal.status = 'verified'` a relé sepnuté; pak vypnout (`false`). +7. [ ] Zkontrolovat `power_w` v telemetrii při běhu ≈ `rated_power_w` (případně upravit). +8. [ ] Nastavit dočasné spínání (cron / ručně) do doby solver integrace (§5).