implementace LED loxone u zaporncyh cen
This commit is contained in:
714
backend/services/signal_service.py
Normal file
714
backend/services/signal_service.py
Normal file
@@ -0,0 +1,714 @@
|
||||
"""
|
||||
Odchozí signály EMS → Loxone / HTTP (journal, retry, readback verify).
|
||||
|
||||
Kritické řízení výkonu (Deye, EV, TČ) zůstává v Modbus exporteru a modbus_command.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Any
|
||||
|
||||
import asyncpg
|
||||
import httpx
|
||||
|
||||
from app.config import get_settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
SIGNAL_EXPORT_BAN_ACTIVE = "EXPORT_BAN_ACTIVE"
|
||||
|
||||
# Po úspěšném verify neposílat stejnou hodnotu znovu po tuto dobu (idempotence).
|
||||
_IDEMPOTENCE_TTL = timedelta(minutes=10)
|
||||
# Max pokusů před abandoned (odeslání + verify dohromady řídí attempt_count).
|
||||
_MAX_ATTEMPTS = 12
|
||||
_VERIFY_AFTER_SEND = timedelta(seconds=1)
|
||||
|
||||
|
||||
def _loxone_auth() -> tuple[str, str] | None:
|
||||
settings = get_settings()
|
||||
user = settings.loxone_user or os.getenv("LOXONE_USER") or ""
|
||||
password = settings.loxone_password or os.getenv("LOXONE_PASSWORD") or ""
|
||||
return (user, password) if user else None
|
||||
|
||||
|
||||
def _endpoint_base_url(proto: str | None, host: str, port: int | None) -> str:
|
||||
p = (proto or "http").lower()
|
||||
if p not in ("http", "https"):
|
||||
p = "http"
|
||||
prt = int(port or (443 if p == "https" else 80))
|
||||
return f"{p}://{host}:{prt}"
|
||||
|
||||
|
||||
def _bool_to_text(v: bool, transform_json: dict[str, Any] | None) -> str:
|
||||
if transform_json and "map_bool" in transform_json:
|
||||
m = transform_json["map_bool"]
|
||||
if isinstance(m, dict):
|
||||
return str(m.get("true" if v else "false", "1" if v else "0"))
|
||||
return "1" if v else "0"
|
||||
|
||||
|
||||
def _parse_loxone_io_value(body: str) -> float | None:
|
||||
"""Z odpovědi Loxone /dev/sps/io/… vytáhni číselnou hodnotu."""
|
||||
if not body:
|
||||
return None
|
||||
s = body.strip()
|
||||
# často XML nebo prostý text s číslem
|
||||
nums = re.findall(r"-?\d+(?:\.\d+)?", s)
|
||||
if not nums:
|
||||
return None
|
||||
try:
|
||||
return float(nums[-1])
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def _http_rest_write_url(
|
||||
base: str, route_config_json: dict[str, Any] | None, value_text: str
|
||||
) -> tuple[str, str]:
|
||||
"""Vrátí (method, url) pro http_rest zápis."""
|
||||
cfg = route_config_json or {}
|
||||
method = str(cfg.get("method", "GET")).upper()
|
||||
path = str(cfg.get("path_template", ""))
|
||||
path = path.replace("{value}", value_text).replace("{v}", value_text)
|
||||
if not path.startswith("/"):
|
||||
path = "/" + path
|
||||
return method, f"{base.rstrip('/')}{path}"
|
||||
|
||||
|
||||
def _http_rest_verify_url(base: str, verify_cfg: dict[str, Any] | None) -> str | None:
|
||||
if not verify_cfg:
|
||||
return None
|
||||
path = str(verify_cfg.get("read_path", ""))
|
||||
if not path:
|
||||
return None
|
||||
if not path.startswith("/"):
|
||||
path = "/" + path
|
||||
return f"{base.rstrip('/')}{path}"
|
||||
|
||||
|
||||
def _read_json_path(data: Any, path: str | None) -> Any:
|
||||
if path is None or path == "" or path == "$":
|
||||
return data
|
||||
if path.startswith("$."):
|
||||
path = path[2:]
|
||||
cur: Any = data
|
||||
for part in path.split("."):
|
||||
if not part:
|
||||
continue
|
||||
if isinstance(cur, dict) and part in cur:
|
||||
cur = cur[part]
|
||||
else:
|
||||
return None
|
||||
return cur
|
||||
|
||||
|
||||
async def compute_export_ban_active(site_id: int, conn: asyncpg.Connection) -> bool:
|
||||
"""
|
||||
Kanonický význam EXPORT_BAN_ACTIVE (LED varianta B).
|
||||
|
||||
True pokud EMS uplatňuje zákaz exportu: no_export, block_export override,
|
||||
režimy bez exportu (SELF_SUSTAIN, CHARGE_CHEAP, PRESERVE), nebo AUTO se záporným
|
||||
výkupem při grid_setpoint_w >= 0 (soulad s _build_setpoints / export_ban), včetně
|
||||
price failsafe (predikovaná cena → pasivní ochrana).
|
||||
"""
|
||||
mode_row = await conn.fetchrow(
|
||||
"""
|
||||
SELECT som.mode_code
|
||||
FROM ems.site_operating_mode som
|
||||
WHERE som.site_id = $1::int
|
||||
""",
|
||||
site_id,
|
||||
)
|
||||
if mode_row is None:
|
||||
return False
|
||||
mode_code = str(mode_row["mode_code"] or "").upper()
|
||||
|
||||
if mode_code == "MANUAL":
|
||||
return False
|
||||
|
||||
if mode_code in ("SELF_SUSTAIN", "CHARGE_CHEAP", "PRESERVE"):
|
||||
return True
|
||||
|
||||
no_export = await conn.fetchval(
|
||||
"""
|
||||
SELECT COALESCE(sgc.no_export, false)
|
||||
FROM ems.site_grid_connection sgc
|
||||
WHERE sgc.site_id = $1::int
|
||||
""",
|
||||
site_id,
|
||||
)
|
||||
if bool(no_export):
|
||||
return True
|
||||
|
||||
ov = await conn.fetchval(
|
||||
"""
|
||||
SELECT 1
|
||||
FROM ems.site_override o
|
||||
WHERE o.site_id = $1::int
|
||||
AND o.override_type = 'block_export'
|
||||
AND o.valid_from <= now()
|
||||
AND (o.valid_to IS NULL OR o.valid_to > now())
|
||||
LIMIT 1
|
||||
""",
|
||||
site_id,
|
||||
)
|
||||
if ov is not None:
|
||||
return True
|
||||
|
||||
if mode_code != "AUTO":
|
||||
return False
|
||||
|
||||
raw = await conn.fetchval(
|
||||
"""
|
||||
SELECT ems.fn_planning_interval_at_offset($1::int, 0)
|
||||
""",
|
||||
site_id,
|
||||
)
|
||||
if raw is None:
|
||||
return False
|
||||
pi = raw if isinstance(raw, dict) else json.loads(raw)
|
||||
if not pi:
|
||||
return False
|
||||
|
||||
if bool(pi.get("is_predicted_price")):
|
||||
return True
|
||||
|
||||
sell_raw = pi.get("effective_sell_price")
|
||||
grid_sp = int(pi.get("grid_setpoint_w") or 0)
|
||||
if sell_raw is None:
|
||||
return False
|
||||
try:
|
||||
sell_f = float(sell_raw)
|
||||
except (TypeError, ValueError):
|
||||
return False
|
||||
return sell_f < 0 and grid_sp >= 0
|
||||
|
||||
|
||||
async def _should_skip_enqueue(
|
||||
conn: asyncpg.Connection,
|
||||
site_id: int,
|
||||
signal_code: str,
|
||||
destination_type: str,
|
||||
destination_key: str,
|
||||
desired_text: str,
|
||||
) -> bool:
|
||||
row = await conn.fetchrow(
|
||||
"""
|
||||
SELECT last_sent_value_text, last_verified_value_text, last_verified_at
|
||||
FROM ems.signal_state
|
||||
WHERE site_id = $1
|
||||
AND signal_code = $2
|
||||
AND destination_type = $3
|
||||
AND destination_key = $4
|
||||
""",
|
||||
site_id,
|
||||
signal_code,
|
||||
destination_type,
|
||||
destination_key,
|
||||
)
|
||||
if row is None:
|
||||
return False
|
||||
if row["last_sent_value_text"] != desired_text:
|
||||
return False
|
||||
if row["last_verified_value_text"] != desired_text:
|
||||
return False
|
||||
lv = row["last_verified_at"]
|
||||
if lv is None:
|
||||
return False
|
||||
if lv.tzinfo is None:
|
||||
lv = lv.replace(tzinfo=timezone.utc)
|
||||
return datetime.now(timezone.utc) - lv < _IDEMPOTENCE_TTL
|
||||
|
||||
|
||||
async def enqueue_site_signals(site_id: int, conn: asyncpg.Connection) -> None:
|
||||
"""Zařadí odchozí řádky pro všechny aktivní routy daného site (po výpočtu signálů)."""
|
||||
export_ban = await compute_export_ban_active(site_id, conn)
|
||||
desired = {SIGNAL_EXPORT_BAN_ACTIVE: export_ban}
|
||||
|
||||
routes = await conn.fetch(
|
||||
"""
|
||||
SELECT r.id, r.site_id, r.destination_type, r.endpoint_id, r.signal_code,
|
||||
r.destination_key, r.transform_json, r.verify_readback, r.verify_config_json,
|
||||
r.route_config_json, r.enabled
|
||||
FROM ems.signal_route r
|
||||
WHERE r.site_id = $1::int AND r.enabled = true
|
||||
""",
|
||||
site_id,
|
||||
)
|
||||
for r in routes:
|
||||
sig = str(r["signal_code"])
|
||||
if sig not in desired:
|
||||
continue
|
||||
dest_type = str(r["destination_type"])
|
||||
dest_key = str(r["destination_key"])
|
||||
tf = r["transform_json"]
|
||||
tfd = tf if isinstance(tf, dict) else (json.loads(tf) if tf else None)
|
||||
val_bool = bool(desired[sig])
|
||||
value_text = _bool_to_text(val_bool, tfd)
|
||||
|
||||
if await _should_skip_enqueue(
|
||||
conn, site_id, sig, dest_type, dest_key, value_text
|
||||
):
|
||||
continue
|
||||
|
||||
await conn.execute(
|
||||
"""
|
||||
INSERT INTO ems.signal_state (
|
||||
site_id, signal_code, destination_type, destination_key,
|
||||
last_desired_value_text, updated_at
|
||||
)
|
||||
VALUES ($1, $2, $3, $4, $5, now())
|
||||
ON CONFLICT (site_id, signal_code, destination_type, destination_key)
|
||||
DO UPDATE SET
|
||||
last_desired_value_text = EXCLUDED.last_desired_value_text,
|
||||
updated_at = now()
|
||||
""",
|
||||
site_id,
|
||||
sig,
|
||||
dest_type,
|
||||
dest_key,
|
||||
value_text,
|
||||
)
|
||||
|
||||
await conn.execute(
|
||||
"""
|
||||
INSERT INTO ems.signal_outbound_journal (
|
||||
route_id, site_id, signal_code, value_text, value_num, status,
|
||||
attempt_count, next_attempt_at
|
||||
)
|
||||
VALUES ($1, $2, $3, $4, $5, 'queued', 0, now())
|
||||
""",
|
||||
int(r["id"]),
|
||||
site_id,
|
||||
sig,
|
||||
value_text,
|
||||
1.0 if val_bool else 0.0,
|
||||
)
|
||||
|
||||
|
||||
async def process_signal_outbound_send(
|
||||
conn: asyncpg.Connection, *, limit: int = 30
|
||||
) -> int:
|
||||
"""Odešle až `limit` řádků ve stavu queued. Vrátí počet zpracovaných."""
|
||||
rows = await conn.fetch(
|
||||
"""
|
||||
SELECT j.id, j.route_id, j.site_id, j.signal_code, j.value_text, j.attempt_count
|
||||
FROM ems.signal_outbound_journal j
|
||||
WHERE j.status = 'queued'
|
||||
AND j.next_attempt_at <= now()
|
||||
ORDER BY j.id
|
||||
LIMIT $1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
""",
|
||||
limit,
|
||||
)
|
||||
n = 0
|
||||
for j in rows:
|
||||
jid = int(j["id"])
|
||||
route = await conn.fetchrow(
|
||||
"""
|
||||
SELECT r.*, e.host, e.port, e.protocol, e.endpoint_type
|
||||
FROM ems.signal_route r
|
||||
JOIN ems.site_endpoint e ON e.id = r.endpoint_id
|
||||
WHERE r.id = $1::int AND r.enabled = true
|
||||
""",
|
||||
int(j["route_id"]),
|
||||
)
|
||||
if route is None:
|
||||
await conn.execute(
|
||||
"""
|
||||
UPDATE ems.signal_outbound_journal
|
||||
SET status = 'abandoned', last_error = 'route missing or disabled'
|
||||
WHERE id = $1::bigint
|
||||
""",
|
||||
jid,
|
||||
)
|
||||
n += 1
|
||||
continue
|
||||
|
||||
dest_type = str(route["destination_type"])
|
||||
base = _endpoint_base_url(
|
||||
route.get("protocol"), str(route["host"]), route.get("port")
|
||||
)
|
||||
auth = _loxone_auth()
|
||||
url: str
|
||||
method = "GET"
|
||||
cfg = route["route_config_json"]
|
||||
rcfg = cfg if isinstance(cfg, dict) else (json.loads(cfg) if cfg else None)
|
||||
|
||||
try:
|
||||
if dest_type == "loxone_vi":
|
||||
io_name = str(route["destination_key"])
|
||||
val = str(j["value_text"])
|
||||
url = f"{base}/dev/sps/io/{io_name}/{val}"
|
||||
elif dest_type == "http_rest":
|
||||
method, url = _http_rest_write_url(base, rcfg, str(j["value_text"]))
|
||||
else:
|
||||
await conn.execute(
|
||||
"""
|
||||
UPDATE ems.signal_outbound_journal
|
||||
SET status = 'abandoned',
|
||||
last_error = $2,
|
||||
attempt_count = attempt_count + 1
|
||||
WHERE id = $1::bigint
|
||||
""",
|
||||
jid,
|
||||
f"unknown destination_type: {dest_type}",
|
||||
)
|
||||
n += 1
|
||||
continue
|
||||
except Exception as e:
|
||||
ac = int(j["attempt_count"]) + 1
|
||||
delay = min(300, 2 ** min(ac, 8))
|
||||
st = "abandoned" if ac >= _MAX_ATTEMPTS else "queued"
|
||||
await conn.execute(
|
||||
"""
|
||||
UPDATE ems.signal_outbound_journal
|
||||
SET status = $2::text,
|
||||
last_error = $3::text,
|
||||
attempt_count = $4::int,
|
||||
next_attempt_at = CASE WHEN $2::text = 'queued' THEN now() + ($5::int * interval '1 second') ELSE next_attempt_at END
|
||||
WHERE id = $1::bigint
|
||||
""",
|
||||
jid,
|
||||
st,
|
||||
str(e)[:500],
|
||||
ac,
|
||||
delay,
|
||||
)
|
||||
n += 1
|
||||
continue
|
||||
|
||||
t0 = datetime.now(timezone.utc)
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=8.0) as client:
|
||||
if method == "GET":
|
||||
resp = await client.get(url, auth=auth)
|
||||
elif method == "POST":
|
||||
body = None
|
||||
if rcfg and isinstance(rcfg.get("json_body"), dict):
|
||||
body = json.dumps(rcfg["json_body"])
|
||||
resp = await client.post(
|
||||
url,
|
||||
auth=auth,
|
||||
content=body,
|
||||
headers={"Content-Type": "application/json"} if body else None,
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"unsupported HTTP method {method}")
|
||||
resp.raise_for_status()
|
||||
body_txt = (resp.text or "")[:2000]
|
||||
except Exception as e:
|
||||
ac = int(j["attempt_count"]) + 1
|
||||
delay = min(300, 2 ** min(ac, 8))
|
||||
st = "abandoned" if ac >= _MAX_ATTEMPTS else "queued"
|
||||
await conn.execute(
|
||||
"""
|
||||
UPDATE ems.signal_outbound_journal
|
||||
SET status = $2::text,
|
||||
attempt_count = $3::int,
|
||||
last_error = $4::text,
|
||||
next_attempt_at = CASE WHEN $2::text = 'queued' THEN now() + ($5::int * interval '1 second') ELSE next_attempt_at END,
|
||||
http_method = $6::text,
|
||||
request_url = $7::text
|
||||
WHERE id = $1::bigint
|
||||
""",
|
||||
jid,
|
||||
st,
|
||||
ac,
|
||||
str(e)[:500],
|
||||
delay,
|
||||
method,
|
||||
url,
|
||||
)
|
||||
n += 1
|
||||
continue
|
||||
|
||||
dt_ms = int(
|
||||
(datetime.now(timezone.utc) - t0).total_seconds() * 1000
|
||||
)
|
||||
vr = bool(route["verify_readback"])
|
||||
await conn.execute(
|
||||
"""
|
||||
UPDATE ems.signal_outbound_journal
|
||||
SET status = $2::text,
|
||||
http_method = $3::text,
|
||||
request_url = $4::text,
|
||||
http_status = $5::int,
|
||||
latency_ms = $6::int,
|
||||
response_body_trunc = $7::text,
|
||||
sent_at = now(),
|
||||
last_error = NULL,
|
||||
verified_at = CASE WHEN $2::text = 'verified' THEN now() ELSE NULL END
|
||||
WHERE id = $1::bigint
|
||||
""",
|
||||
jid,
|
||||
"verified" if not vr else "sent",
|
||||
method,
|
||||
url,
|
||||
200,
|
||||
dt_ms,
|
||||
(body_txt or "")[:500],
|
||||
)
|
||||
if not vr:
|
||||
await conn.execute(
|
||||
"""
|
||||
INSERT INTO ems.signal_state (
|
||||
site_id, signal_code, destination_type, destination_key,
|
||||
last_sent_value_text, last_verified_value_text, last_sent_at, last_verified_at, updated_at
|
||||
)
|
||||
VALUES ($1, $2, $3, $4, $5, $5, now(), now(), now())
|
||||
ON CONFLICT (site_id, signal_code, destination_type, destination_key)
|
||||
DO UPDATE SET
|
||||
last_sent_value_text = EXCLUDED.last_sent_value_text,
|
||||
last_verified_value_text = EXCLUDED.last_verified_value_text,
|
||||
last_sent_at = now(),
|
||||
last_verified_at = now(),
|
||||
updated_at = now()
|
||||
""",
|
||||
int(j["site_id"]),
|
||||
str(j["signal_code"]),
|
||||
dest_type,
|
||||
str(route["destination_key"]),
|
||||
str(j["value_text"]),
|
||||
)
|
||||
n += 1
|
||||
return n
|
||||
|
||||
|
||||
async def process_signal_outbound_verify(
|
||||
conn: asyncpg.Connection, *, limit: int = 30
|
||||
) -> int:
|
||||
"""Ověří řádky ve stavu sent (readback). Vrátí počet zpracovaných."""
|
||||
rows = await conn.fetch(
|
||||
"""
|
||||
SELECT j.id, j.route_id, j.site_id, j.signal_code, j.value_text
|
||||
FROM ems.signal_outbound_journal j
|
||||
WHERE j.status = 'sent'
|
||||
AND j.verified_at IS NULL
|
||||
AND j.sent_at IS NOT NULL
|
||||
AND j.sent_at <= now() - $1::interval
|
||||
ORDER BY j.id
|
||||
LIMIT $2
|
||||
FOR UPDATE SKIP LOCKED
|
||||
""",
|
||||
_VERIFY_AFTER_SEND,
|
||||
limit,
|
||||
)
|
||||
n = 0
|
||||
for j in rows:
|
||||
jid = int(j["id"])
|
||||
route = await conn.fetchrow(
|
||||
"""
|
||||
SELECT r.*, e.host, e.port, e.protocol
|
||||
FROM ems.signal_route r
|
||||
JOIN ems.site_endpoint e ON e.id = r.endpoint_id
|
||||
WHERE r.id = $1::int AND r.enabled = true
|
||||
""",
|
||||
int(j["route_id"]),
|
||||
)
|
||||
if route is None:
|
||||
await conn.execute(
|
||||
"""
|
||||
UPDATE ems.signal_outbound_journal
|
||||
SET status = 'abandoned', last_error = 'route missing', verified_at = now()
|
||||
WHERE id = $1::bigint
|
||||
""",
|
||||
jid,
|
||||
)
|
||||
n += 1
|
||||
continue
|
||||
|
||||
dest_type = str(route["destination_type"])
|
||||
base = _endpoint_base_url(
|
||||
route.get("protocol"), str(route["host"]), route.get("port")
|
||||
)
|
||||
auth = _loxone_auth()
|
||||
vcfg_raw = route["verify_config_json"]
|
||||
vcfg = (
|
||||
vcfg_raw
|
||||
if isinstance(vcfg_raw, dict)
|
||||
else (json.loads(vcfg_raw) if vcfg_raw else {})
|
||||
)
|
||||
|
||||
read_url: str | None = None
|
||||
expected = str(j["value_text"])
|
||||
try:
|
||||
if dest_type == "loxone_vi":
|
||||
io_read = vcfg.get("loxone_io_name") if vcfg else None
|
||||
if not io_read:
|
||||
io_read = str(route["destination_key"]) + "_FB"
|
||||
read_url = f"{base}/dev/sps/io/{io_read}"
|
||||
elif dest_type == "http_rest":
|
||||
read_url = _http_rest_verify_url(base, vcfg)
|
||||
else:
|
||||
read_url = None
|
||||
|
||||
if not read_url:
|
||||
raise ValueError("verify_config missing read URL")
|
||||
|
||||
async with httpx.AsyncClient(timeout=8.0) as client:
|
||||
rresp = await client.get(read_url, auth=auth)
|
||||
rresp.raise_for_status()
|
||||
body = rresp.text or ""
|
||||
|
||||
ok = False
|
||||
read_val: str | None = None
|
||||
if dest_type == "loxone_vi":
|
||||
fv = _parse_loxone_io_value(body)
|
||||
if fv is not None:
|
||||
read_val = str(int(round(fv)))
|
||||
try:
|
||||
ev = float(expected)
|
||||
except ValueError:
|
||||
ev = None
|
||||
if ev is not None and abs(fv - ev) < 0.51:
|
||||
ok = True
|
||||
elif dest_type == "http_rest":
|
||||
ct = (rresp.headers.get("content-type") or "").lower()
|
||||
if "json" in ct:
|
||||
data = rresp.json()
|
||||
jpath = vcfg.get("json_path") or vcfg.get("json_key")
|
||||
if isinstance(jpath, str) and jpath:
|
||||
got = _read_json_path(data, jpath)
|
||||
else:
|
||||
got = data
|
||||
if isinstance(got, bool):
|
||||
read_val = "1" if got else "0"
|
||||
elif isinstance(got, (int, float)):
|
||||
read_val = "1" if float(got) >= 0.5 else "0"
|
||||
elif got is not None:
|
||||
read_val = str(got).strip().lower()
|
||||
else:
|
||||
read_val = None
|
||||
exp_l = expected.strip().lower()
|
||||
if read_val is not None:
|
||||
if read_val in ("true", "on", "1"):
|
||||
read_norm = "1"
|
||||
elif read_val in ("false", "off", "0"):
|
||||
read_norm = "0"
|
||||
else:
|
||||
read_norm = read_val
|
||||
exp_norm = (
|
||||
"1"
|
||||
if exp_l in ("1", "true", "on")
|
||||
else "0"
|
||||
if exp_l in ("0", "false", "off")
|
||||
else expected
|
||||
)
|
||||
ok = read_norm == exp_norm
|
||||
else:
|
||||
fv = _parse_loxone_io_value(body)
|
||||
if fv is not None:
|
||||
read_val = str(int(round(fv)))
|
||||
try:
|
||||
ev = float(expected)
|
||||
except ValueError:
|
||||
ev = None
|
||||
ok = ev is not None and abs(fv - ev) < 0.51
|
||||
|
||||
if ok:
|
||||
await conn.execute(
|
||||
"""
|
||||
UPDATE ems.signal_outbound_journal
|
||||
SET status = 'verified', verified_at = now(), last_error = NULL
|
||||
WHERE id = $1::bigint
|
||||
""",
|
||||
jid,
|
||||
)
|
||||
await conn.execute(
|
||||
"""
|
||||
INSERT INTO ems.signal_state (
|
||||
site_id, signal_code, destination_type, destination_key,
|
||||
last_sent_value_text, last_verified_value_text, last_sent_at, last_verified_at, updated_at
|
||||
)
|
||||
VALUES ($1, $2, $3, $4, $5, $5,
|
||||
(SELECT sent_at FROM ems.signal_outbound_journal WHERE id = $6::bigint),
|
||||
now(), now())
|
||||
ON CONFLICT (site_id, signal_code, destination_type, destination_key)
|
||||
DO UPDATE SET
|
||||
last_sent_value_text = EXCLUDED.last_sent_value_text,
|
||||
last_verified_value_text = EXCLUDED.last_verified_value_text,
|
||||
last_sent_at = EXCLUDED.last_sent_at,
|
||||
last_verified_at = now(),
|
||||
updated_at = now()
|
||||
""",
|
||||
int(j["site_id"]),
|
||||
str(j["signal_code"]),
|
||||
dest_type,
|
||||
str(route["destination_key"]),
|
||||
str(j["value_text"]),
|
||||
jid,
|
||||
)
|
||||
else:
|
||||
ac_row = await conn.fetchrow(
|
||||
"SELECT attempt_count FROM ems.signal_outbound_journal WHERE id = $1",
|
||||
jid,
|
||||
)
|
||||
ac = int(ac_row["attempt_count"] or 0) + 1
|
||||
st = "abandoned" if ac >= _MAX_ATTEMPTS else "queued"
|
||||
delay = min(300, 2 ** min(ac, 8))
|
||||
await conn.execute(
|
||||
"""
|
||||
UPDATE ems.signal_outbound_journal
|
||||
SET status = $2::text,
|
||||
attempt_count = $3::int,
|
||||
last_error = $4::text,
|
||||
next_attempt_at = CASE WHEN $2::text = 'queued' THEN now() + ($5::int * interval '1 second') ELSE next_attempt_at END,
|
||||
sent_at = CASE WHEN $2::text = 'queued' THEN NULL ELSE sent_at END,
|
||||
verified_at = CASE WHEN $2::text != 'queued' THEN now() ELSE NULL END
|
||||
WHERE id = $1::bigint
|
||||
""",
|
||||
jid,
|
||||
st,
|
||||
ac,
|
||||
f"verify mismatch read={read_val!r} expected={expected!r}"[:500],
|
||||
delay,
|
||||
)
|
||||
except Exception as e:
|
||||
ac_row = await conn.fetchrow(
|
||||
"SELECT attempt_count FROM ems.signal_outbound_journal WHERE id = $1",
|
||||
jid,
|
||||
)
|
||||
ac = int(ac_row["attempt_count"] or 0) + 1
|
||||
st = "abandoned" if ac >= _MAX_ATTEMPTS else "queued"
|
||||
delay = min(300, 2 ** min(ac, 8))
|
||||
await conn.execute(
|
||||
"""
|
||||
UPDATE ems.signal_outbound_journal
|
||||
SET status = $2::text,
|
||||
attempt_count = $3::int,
|
||||
last_error = $4::text,
|
||||
next_attempt_at = CASE WHEN $2::text = 'queued' THEN now() + ($5::int * interval '1 second') ELSE next_attempt_at END,
|
||||
sent_at = CASE WHEN $2::text = 'queued' THEN NULL ELSE sent_at END
|
||||
WHERE id = $1::bigint
|
||||
""",
|
||||
jid,
|
||||
st,
|
||||
ac,
|
||||
str(e)[:500],
|
||||
delay,
|
||||
)
|
||||
n += 1
|
||||
return n
|
||||
|
||||
|
||||
async def run_signal_outbound_send_for_active_sites(pool: asyncpg.Pool) -> None:
|
||||
async with pool.acquire() as conn:
|
||||
try:
|
||||
await process_signal_outbound_send(conn, limit=80)
|
||||
except Exception:
|
||||
logger.exception("signal_outbound_send failed")
|
||||
|
||||
|
||||
async def run_signal_outbound_verify_for_active_sites(pool: asyncpg.Pool) -> None:
|
||||
async with pool.acquire() as conn:
|
||||
try:
|
||||
await process_signal_outbound_verify(conn, limit=80)
|
||||
except Exception:
|
||||
logger.exception("signal_outbound_verify failed")
|
||||
Reference in New Issue
Block a user