""" Odchozí signály EMS → Loxone / HTTP (journal, retry, readback verify). Kritické řízení výkonu (Deye, EV, TČ) zůstává v Modbus exporteru a modbus_command. """ from __future__ import annotations import json import logging import os import re from datetime import datetime, timedelta, timezone from typing import Any import asyncpg import httpx from app.config import get_settings logger = logging.getLogger(__name__) SIGNAL_EXPORT_BAN_ACTIVE = "EXPORT_BAN_ACTIVE" # Po úspěšném verify neposílat stejnou hodnotu znovu po tuto dobu (idempotence). _IDEMPOTENCE_TTL = timedelta(minutes=10) # Max pokusů před abandoned (odeslání + verify dohromady řídí attempt_count). _MAX_ATTEMPTS = 12 _VERIFY_AFTER_SEND = timedelta(seconds=1) def _loxone_auth() -> tuple[str, str] | None: settings = get_settings() user = settings.loxone_user or os.getenv("LOXONE_USER") or "" password = settings.loxone_password or os.getenv("LOXONE_PASSWORD") or "" return (user, password) if user else None def _endpoint_base_url(proto: str | None, host: str, port: int | None) -> str: p = (proto or "http").lower() if p not in ("http", "https"): p = "http" prt = int(port or (443 if p == "https" else 80)) return f"{p}://{host}:{prt}" def _bool_to_text(v: bool, transform_json: dict[str, Any] | None) -> str: if transform_json and "map_bool" in transform_json: m = transform_json["map_bool"] if isinstance(m, dict): return str(m.get("true" if v else "false", "1" if v else "0")) return "1" if v else "0" def _parse_loxone_io_value(body: str) -> float | None: """Z odpovědi Loxone /dev/sps/io/… vytáhni číselnou hodnotu.""" if not body: return None s = body.strip() # často XML nebo prostý text s číslem nums = re.findall(r"-?\d+(?:\.\d+)?", s) if not nums: return None try: return float(nums[-1]) except ValueError: return None def _http_rest_write_url( base: str, route_config_json: dict[str, Any] | None, value_text: str ) -> tuple[str, str]: """Vrátí (method, url) pro http_rest zápis.""" cfg = route_config_json or {} method = str(cfg.get("method", "GET")).upper() path = str(cfg.get("path_template", "")) path = path.replace("{value}", value_text).replace("{v}", value_text) if not path.startswith("/"): path = "/" + path return method, f"{base.rstrip('/')}{path}" def _http_rest_verify_url(base: str, verify_cfg: dict[str, Any] | None) -> str | None: if not verify_cfg: return None path = str(verify_cfg.get("read_path", "")) if not path: return None if not path.startswith("/"): path = "/" + path return f"{base.rstrip('/')}{path}" def _read_json_path(data: Any, path: str | None) -> Any: if path is None or path == "" or path == "$": return data if path.startswith("$."): path = path[2:] cur: Any = data for part in path.split("."): if not part: continue if isinstance(cur, dict) and part in cur: cur = cur[part] else: return None return cur async def compute_export_ban_active(site_id: int, conn: asyncpg.Connection) -> bool: """ Kanonický význam EXPORT_BAN_ACTIVE (LED varianta B). True pokud EMS uplatňuje zákaz exportu: no_export, block_export override, režimy bez exportu (SELF_SUSTAIN, CHARGE_CHEAP, PRESERVE), nebo AUTO se záporným výkupem při grid_setpoint_w >= 0 (soulad s _build_setpoints / export_ban), včetně price failsafe (predikovaná cena → pasivní ochrana). """ mode_row = await conn.fetchrow( """ SELECT som.mode_code FROM ems.site_operating_mode som WHERE som.site_id = $1::int """, site_id, ) if mode_row is None: return False mode_code = str(mode_row["mode_code"] or "").upper() if mode_code == "MANUAL": return False if mode_code in ("SELF_SUSTAIN", "CHARGE_CHEAP", "PRESERVE"): return True no_export = await conn.fetchval( """ SELECT COALESCE(sgc.no_export, false) FROM ems.site_grid_connection sgc WHERE sgc.site_id = $1::int """, site_id, ) if bool(no_export): return True ov = await conn.fetchval( """ SELECT 1 FROM ems.site_override o WHERE o.site_id = $1::int AND o.override_type = 'block_export' AND o.valid_from <= now() AND (o.valid_to IS NULL OR o.valid_to > now()) LIMIT 1 """, site_id, ) if ov is not None: return True if mode_code != "AUTO": return False raw = await conn.fetchval( """ SELECT ems.fn_planning_interval_at_offset($1::int, 0) """, site_id, ) if raw is None: return False pi = raw if isinstance(raw, dict) else json.loads(raw) if not pi: return False if bool(pi.get("is_predicted_price")): return True sell_raw = pi.get("effective_sell_price") grid_sp = int(pi.get("grid_setpoint_w") or 0) if sell_raw is None: return False try: sell_f = float(sell_raw) except (TypeError, ValueError): return False return sell_f < 0 and grid_sp >= 0 async def _should_skip_enqueue( conn: asyncpg.Connection, site_id: int, signal_code: str, destination_type: str, destination_key: str, desired_text: str, ) -> bool: row = await conn.fetchrow( """ SELECT last_sent_value_text, last_verified_value_text, last_verified_at FROM ems.signal_state WHERE site_id = $1 AND signal_code = $2 AND destination_type = $3 AND destination_key = $4 """, site_id, signal_code, destination_type, destination_key, ) if row is None: return False if row["last_sent_value_text"] != desired_text: return False if row["last_verified_value_text"] != desired_text: return False lv = row["last_verified_at"] if lv is None: return False if lv.tzinfo is None: lv = lv.replace(tzinfo=timezone.utc) return datetime.now(timezone.utc) - lv < _IDEMPOTENCE_TTL async def enqueue_site_signals(site_id: int, conn: asyncpg.Connection) -> None: """Zařadí odchozí řádky pro všechny aktivní routy daného site (po výpočtu signálů).""" export_ban = await compute_export_ban_active(site_id, conn) desired = {SIGNAL_EXPORT_BAN_ACTIVE: export_ban} routes = await conn.fetch( """ SELECT r.id, r.site_id, r.destination_type, r.endpoint_id, r.signal_code, r.destination_key, r.transform_json, r.verify_readback, r.verify_config_json, r.route_config_json, r.enabled FROM ems.signal_route r WHERE r.site_id = $1::int AND r.enabled = true """, site_id, ) for r in routes: sig = str(r["signal_code"]) if sig not in desired: continue dest_type = str(r["destination_type"]) dest_key = str(r["destination_key"]) tf = r["transform_json"] tfd = tf if isinstance(tf, dict) else (json.loads(tf) if tf else None) val_bool = bool(desired[sig]) value_text = _bool_to_text(val_bool, tfd) if await _should_skip_enqueue( conn, site_id, sig, dest_type, dest_key, value_text ): continue await conn.execute( """ INSERT INTO ems.signal_state ( site_id, signal_code, destination_type, destination_key, last_desired_value_text, updated_at ) VALUES ($1, $2, $3, $4, $5, now()) ON CONFLICT (site_id, signal_code, destination_type, destination_key) DO UPDATE SET last_desired_value_text = EXCLUDED.last_desired_value_text, updated_at = now() """, site_id, sig, dest_type, dest_key, value_text, ) await conn.execute( """ INSERT INTO ems.signal_outbound_journal ( route_id, site_id, signal_code, value_text, value_num, status, attempt_count, next_attempt_at ) VALUES ($1, $2, $3, $4, $5, 'queued', 0, now()) """, int(r["id"]), site_id, sig, value_text, 1.0 if val_bool else 0.0, ) async def process_signal_outbound_send( conn: asyncpg.Connection, *, limit: int = 30 ) -> int: """Odešle až `limit` řádků ve stavu queued. Vrátí počet zpracovaných.""" rows = await conn.fetch( """ SELECT j.id, j.route_id, j.site_id, j.signal_code, j.value_text, j.attempt_count FROM ems.signal_outbound_journal j WHERE j.status = 'queued' AND j.next_attempt_at <= now() ORDER BY j.id LIMIT $1 FOR UPDATE SKIP LOCKED """, limit, ) n = 0 for j in rows: jid = int(j["id"]) route = await conn.fetchrow( """ SELECT r.*, e.host, e.port, e.protocol, e.endpoint_type FROM ems.signal_route r JOIN ems.site_endpoint e ON e.id = r.endpoint_id WHERE r.id = $1::int AND r.enabled = true """, int(j["route_id"]), ) if route is None: await conn.execute( """ UPDATE ems.signal_outbound_journal SET status = 'abandoned', last_error = 'route missing or disabled' WHERE id = $1::bigint """, jid, ) n += 1 continue dest_type = str(route["destination_type"]) base = _endpoint_base_url( route.get("protocol"), str(route["host"]), route.get("port") ) auth = _loxone_auth() url: str method = "GET" cfg = route["route_config_json"] rcfg = cfg if isinstance(cfg, dict) else (json.loads(cfg) if cfg else None) try: if dest_type == "loxone_vi": io_name = str(route["destination_key"]) val = str(j["value_text"]) url = f"{base}/dev/sps/io/{io_name}/{val}" elif dest_type == "http_rest": method, url = _http_rest_write_url(base, rcfg, str(j["value_text"])) else: await conn.execute( """ UPDATE ems.signal_outbound_journal SET status = 'abandoned', last_error = $2, attempt_count = attempt_count + 1 WHERE id = $1::bigint """, jid, f"unknown destination_type: {dest_type}", ) n += 1 continue except Exception as e: ac = int(j["attempt_count"]) + 1 delay = min(300, 2 ** min(ac, 8)) st = "abandoned" if ac >= _MAX_ATTEMPTS else "queued" await conn.execute( """ UPDATE ems.signal_outbound_journal SET status = $2::text, last_error = $3::text, attempt_count = $4::int, next_attempt_at = CASE WHEN $2::text = 'queued' THEN now() + ($5::int * interval '1 second') ELSE next_attempt_at END WHERE id = $1::bigint """, jid, st, str(e)[:500], ac, delay, ) n += 1 continue t0 = datetime.now(timezone.utc) try: async with httpx.AsyncClient(timeout=8.0) as client: if method == "GET": resp = await client.get(url, auth=auth) elif method == "POST": body = None if rcfg and isinstance(rcfg.get("json_body"), dict): body = json.dumps(rcfg["json_body"]) resp = await client.post( url, auth=auth, content=body, headers={"Content-Type": "application/json"} if body else None, ) else: raise ValueError(f"unsupported HTTP method {method}") resp.raise_for_status() body_txt = (resp.text or "")[:2000] except Exception as e: ac = int(j["attempt_count"]) + 1 delay = min(300, 2 ** min(ac, 8)) st = "abandoned" if ac >= _MAX_ATTEMPTS else "queued" await conn.execute( """ UPDATE ems.signal_outbound_journal SET status = $2::text, attempt_count = $3::int, last_error = $4::text, next_attempt_at = CASE WHEN $2::text = 'queued' THEN now() + ($5::int * interval '1 second') ELSE next_attempt_at END, http_method = $6::text, request_url = $7::text WHERE id = $1::bigint """, jid, st, ac, str(e)[:500], delay, method, url, ) n += 1 continue dt_ms = int( (datetime.now(timezone.utc) - t0).total_seconds() * 1000 ) vr = bool(route["verify_readback"]) await conn.execute( """ UPDATE ems.signal_outbound_journal SET status = $2::text, http_method = $3::text, request_url = $4::text, http_status = $5::int, latency_ms = $6::int, response_body_trunc = $7::text, sent_at = now(), last_error = NULL, verified_at = CASE WHEN $2::text = 'verified' THEN now() ELSE NULL END WHERE id = $1::bigint """, jid, "verified" if not vr else "sent", method, url, 200, dt_ms, (body_txt or "")[:500], ) if not vr: await conn.execute( """ INSERT INTO ems.signal_state ( site_id, signal_code, destination_type, destination_key, last_sent_value_text, last_verified_value_text, last_sent_at, last_verified_at, updated_at ) VALUES ($1, $2, $3, $4, $5, $5, now(), now(), now()) ON CONFLICT (site_id, signal_code, destination_type, destination_key) DO UPDATE SET last_sent_value_text = EXCLUDED.last_sent_value_text, last_verified_value_text = EXCLUDED.last_verified_value_text, last_sent_at = now(), last_verified_at = now(), updated_at = now() """, int(j["site_id"]), str(j["signal_code"]), dest_type, str(route["destination_key"]), str(j["value_text"]), ) n += 1 return n async def process_signal_outbound_verify( conn: asyncpg.Connection, *, limit: int = 30 ) -> int: """Ověří řádky ve stavu sent (readback). Vrátí počet zpracovaných.""" rows = await conn.fetch( """ SELECT j.id, j.route_id, j.site_id, j.signal_code, j.value_text FROM ems.signal_outbound_journal j WHERE j.status = 'sent' AND j.verified_at IS NULL AND j.sent_at IS NOT NULL AND j.sent_at <= now() - $1::interval ORDER BY j.id LIMIT $2 FOR UPDATE SKIP LOCKED """, _VERIFY_AFTER_SEND, limit, ) n = 0 for j in rows: jid = int(j["id"]) route = await conn.fetchrow( """ SELECT r.*, e.host, e.port, e.protocol FROM ems.signal_route r JOIN ems.site_endpoint e ON e.id = r.endpoint_id WHERE r.id = $1::int AND r.enabled = true """, int(j["route_id"]), ) if route is None: await conn.execute( """ UPDATE ems.signal_outbound_journal SET status = 'abandoned', last_error = 'route missing', verified_at = now() WHERE id = $1::bigint """, jid, ) n += 1 continue dest_type = str(route["destination_type"]) base = _endpoint_base_url( route.get("protocol"), str(route["host"]), route.get("port") ) auth = _loxone_auth() vcfg_raw = route["verify_config_json"] vcfg = ( vcfg_raw if isinstance(vcfg_raw, dict) else (json.loads(vcfg_raw) if vcfg_raw else {}) ) read_url: str | None = None expected = str(j["value_text"]) try: if dest_type == "loxone_vi": io_read = vcfg.get("loxone_io_name") if vcfg else None if not io_read: io_read = str(route["destination_key"]) + "_FB" read_url = f"{base}/dev/sps/io/{io_read}" elif dest_type == "http_rest": read_url = _http_rest_verify_url(base, vcfg) else: read_url = None if not read_url: raise ValueError("verify_config missing read URL") async with httpx.AsyncClient(timeout=8.0) as client: rresp = await client.get(read_url, auth=auth) rresp.raise_for_status() body = rresp.text or "" ok = False read_val: str | None = None if dest_type == "loxone_vi": fv = _parse_loxone_io_value(body) if fv is not None: read_val = str(int(round(fv))) try: ev = float(expected) except ValueError: ev = None if ev is not None and abs(fv - ev) < 0.51: ok = True elif dest_type == "http_rest": ct = (rresp.headers.get("content-type") or "").lower() if "json" in ct: data = rresp.json() jpath = vcfg.get("json_path") or vcfg.get("json_key") if isinstance(jpath, str) and jpath: got = _read_json_path(data, jpath) else: got = data if isinstance(got, bool): read_val = "1" if got else "0" elif isinstance(got, (int, float)): read_val = "1" if float(got) >= 0.5 else "0" elif got is not None: read_val = str(got).strip().lower() else: read_val = None exp_l = expected.strip().lower() if read_val is not None: if read_val in ("true", "on", "1"): read_norm = "1" elif read_val in ("false", "off", "0"): read_norm = "0" else: read_norm = read_val exp_norm = ( "1" if exp_l in ("1", "true", "on") else "0" if exp_l in ("0", "false", "off") else expected ) ok = read_norm == exp_norm else: fv = _parse_loxone_io_value(body) if fv is not None: read_val = str(int(round(fv))) try: ev = float(expected) except ValueError: ev = None ok = ev is not None and abs(fv - ev) < 0.51 if ok: await conn.execute( """ UPDATE ems.signal_outbound_journal SET status = 'verified', verified_at = now(), last_error = NULL WHERE id = $1::bigint """, jid, ) await conn.execute( """ INSERT INTO ems.signal_state ( site_id, signal_code, destination_type, destination_key, last_sent_value_text, last_verified_value_text, last_sent_at, last_verified_at, updated_at ) VALUES ($1, $2, $3, $4, $5, $5, (SELECT sent_at FROM ems.signal_outbound_journal WHERE id = $6::bigint), now(), now()) ON CONFLICT (site_id, signal_code, destination_type, destination_key) DO UPDATE SET last_sent_value_text = EXCLUDED.last_sent_value_text, last_verified_value_text = EXCLUDED.last_verified_value_text, last_sent_at = EXCLUDED.last_sent_at, last_verified_at = now(), updated_at = now() """, int(j["site_id"]), str(j["signal_code"]), dest_type, str(route["destination_key"]), str(j["value_text"]), jid, ) else: ac_row = await conn.fetchrow( "SELECT attempt_count FROM ems.signal_outbound_journal WHERE id = $1", jid, ) ac = int(ac_row["attempt_count"] or 0) + 1 st = "abandoned" if ac >= _MAX_ATTEMPTS else "queued" delay = min(300, 2 ** min(ac, 8)) await conn.execute( """ UPDATE ems.signal_outbound_journal SET status = $2::text, attempt_count = $3::int, last_error = $4::text, next_attempt_at = CASE WHEN $2::text = 'queued' THEN now() + ($5::int * interval '1 second') ELSE next_attempt_at END, sent_at = CASE WHEN $2::text = 'queued' THEN NULL ELSE sent_at END, verified_at = CASE WHEN $2::text != 'queued' THEN now() ELSE NULL END WHERE id = $1::bigint """, jid, st, ac, f"verify mismatch read={read_val!r} expected={expected!r}"[:500], delay, ) except Exception as e: ac_row = await conn.fetchrow( "SELECT attempt_count FROM ems.signal_outbound_journal WHERE id = $1", jid, ) ac = int(ac_row["attempt_count"] or 0) + 1 st = "abandoned" if ac >= _MAX_ATTEMPTS else "queued" delay = min(300, 2 ** min(ac, 8)) await conn.execute( """ UPDATE ems.signal_outbound_journal SET status = $2::text, attempt_count = $3::int, last_error = $4::text, next_attempt_at = CASE WHEN $2::text = 'queued' THEN now() + ($5::int * interval '1 second') ELSE next_attempt_at END, sent_at = CASE WHEN $2::text = 'queued' THEN NULL ELSE sent_at END WHERE id = $1::bigint """, jid, st, ac, str(e)[:500], delay, ) n += 1 return n async def run_signal_outbound_send_for_active_sites(pool: asyncpg.Pool) -> None: async with pool.acquire() as conn: try: await process_signal_outbound_send(conn, limit=80) except Exception: logger.exception("signal_outbound_send failed") async def run_signal_outbound_verify_for_active_sites(pool: asyncpg.Pool) -> None: async with pool.acquire() as conn: try: await process_signal_outbound_verify(conn, limit=80) except Exception: logger.exception("signal_outbound_verify failed")