diff --git a/CLAUDE.md b/CLAUDE.md index fe8edb8..b445c5d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -153,6 +153,10 @@ Projekt je **SQL-first**: doménová logika, agregace, joiny mezi tabulkami a st | `ev_session` | Nabíjecí session na WB (deadline, energie, náklady). | | `ev_arrival_stats` | Agregované počty příjezdů EV podle dne v týdnu a hodiny (Europe/Prague); plní se z detekce příjezdu v telemetrii. | | `modbus_command` | Journal Modbus zápisů (pending → written → verified / mismatch / failed); retry a vazba na `planning_run`; u Deye exportu `deye_physical_mode` (PASSIVE/SELL/CHARGE). | +| `signal_def` | Katalog odchozích signálů (kód, typ hodnoty); seed `EXPORT_BAN_ACTIVE`. | +| `signal_route` | Mapování signál → cíl (`loxone_vi`, `http_rest`) per site + `endpoint_id` + volitelný `route_config_json` / `verify_config_json`. | +| `signal_outbound_journal` | Journal HTTP odeslání signálů (`queued` → `sent` → `verified` / retry / `abandoned`). | +| `signal_state` | Poslední požadovaná / odeslaná / ověřená hodnota na cíli (idempotence). | | `cutoff_switch_log` | Log přepnutí cut-off přepínačů (mikroinvertory); edge trigger, důvod a cena. | **View / funkce (nejsou tabulky):** `vw_site_effective_price`, `vw_site_directory`, `vw_modbus_last_verified`, `vw_asset_inverter_modbus_poll`, `vw_asset_ev_charger_modbus_poll`, `vw_asset_heat_pump_modbus_poll`, `vw_latest_telemetry`, `vw_telemetry_hourly_7d`, `vw_telemetry_15m_7d` (15min agregát pro dashboard sloty; repeatable `R__071_vw_telemetry_15m_7d.sql`), `vw_audit_summary`, `vw_operating_mode`, `vw_forecast_accuracy_by_lead_time`, `vw_forecast_accuracy_daily`; `fn_effective_price`, `fn_green_bonus_revenue`, `fn_cop_estimate`, `fn_fill_audit_interval`, `fn_fill_forecast_accuracy`, `fn_set_mode`, `fn_expire_modes` (vrací řádky přepnutí pro Discord), `fn_restore_previous_mode`, `fn_update_ev_arrival_stats`, `fn_ev_expected_arrival`, `fn_update_baseline_stats`, `fn_get_baseline_forecast`, `fn_update_market_price_stats`, `fn_update_tuv_usage_stats`, `fn_get_predicted_price`, dále read-modely: `fn_site_configuration`, `fn_site_full_status`, `fn_site_notifications_context`, `fn_plan_current_bundle`, `fn_planning_run_horizon`, `fn_planning_future_price_days`, `fn_economics_daily_month`, `fn_economics_monthly_chart`, `fn_economics_lock_day`, `fn_economics_unlock_day`, `fn_energy_flows_daily_month`, `fn_energy_flows_intervals_day`, `fn_forecast_pv_split`, `fn_ev_sessions_active`, `fn_ev_session_apply_patch`, `fn_ev_arrival_prediction_bundle`, `fn_ev_session_transition`, `fn_negative_price_predictions`, `fn_latest_ote_day_stats`, `fn_ote_day_slot_stats_prague`, `fn_ote_list_missing_days`, `fn_site_effective_prices_day_prague`, `fn_modbus_journal_list`, `fn_modbus_written_command_ids`, `fn_modbus_commands_by_ids`, `fn_inverter_modbus_caps_patch`, `fn_set_mode_with_context`, `fn_fill_audit_for_site_window`, plánování: `fn_load_planning_slots_full`, `fn_last_effective_ote`, `fn_planning_horizon_end`, `fn_planning_site_context`, `fn_pv_forecast_correction_factor`, `fn_planning_run_commit`, `fn_planning_slot_boundary_prague`, `fn_planning_interval_at_offset`, `fn_telemetry_inverter_sample`, `fn_telemetry_ev_charger_sample`, `fn_telemetry_heat_pump_sample`, `fn_battery_cycle_audit`, Deye helpery: `fn_deye_pack_system_time`, `fn_deye_clock_drift_sec`, `fn_deye_time_point_regs`, `fn_deye_tou_inactive_signature`, `fn_modbus_last_verified_map`. @@ -172,6 +176,7 @@ Specifikace z `docs/02-architecture.md`, modulových docs a komentářů v `plan | `run_rolling_replan` | **každých 15 min** (`*/15`) | `planning_engine.py` – přepočet od aktuálního slotu | | `control_exporter` | **každých 15 min** (slot boundary) | `docs/04-modules/control.md` | | `verify_modbus` | **každé 2 min** | Ověření `modbus_command` ve stavu `written` (posledních 10 min); viz `docs/04-modules/modbus-command-journal.md` | +| `signal_outbound_send` / `signal_outbound_verify` | **každých 15 s** | `services/signal_service.py` — odeslání fronty `signal_outbound_journal` a readback verify (Loxone / HTTP REST). | | `audit_filler` / `fn_fill_audit_interval` | **každých 15 min** | `docs/02-architecture.md`, DB `fn_fill_audit_interval` | | `forecast_accuracy` / `fn_fill_forecast_accuracy` | **každých 15 min** (min. 2,17,32,47) | Po audit filleru; doplní actual z telemetrie do `forecast_accuracy` | | `fn_update_baseline_stats` | **00:30** denně | Aktualizace `consumption_baseline_stats` z telemetrie (30d lookback) | diff --git a/backend/app/lifespan.py b/backend/app/lifespan.py index 2b9f9d1..75d445e 100644 --- a/backend/app/lifespan.py +++ b/backend/app/lifespan.py @@ -26,6 +26,10 @@ from services.heartbeat_service import send_heartbeat from services.notification_service import notify_operating_mode_changed from services.price_importer import import_ote_prices, ote_prague_day_slots_look_complete from services.telemetry_collector import run_telemetry_loop_wrapper +from services.signal_service import ( + run_signal_outbound_send_for_active_sites, + run_signal_outbound_verify_for_active_sites, +) logger = logging.getLogger(__name__) @@ -145,6 +149,18 @@ async def lifespan(app: FastAPI): "scheduled_control_export site=%s: %s", site["id"], e ) + async def scheduled_signal_outbound_send() -> None: + try: + await run_signal_outbound_send_for_active_sites(app.state.pg_pool) + except Exception: + logger.exception("scheduled_signal_outbound_send failed") + + async def scheduled_signal_outbound_verify() -> None: + try: + await run_signal_outbound_verify_for_active_sites(app.state.pg_pool) + except Exception: + logger.exception("scheduled_signal_outbound_verify failed") + async def scheduled_verify_modbus() -> None: """ Ověří příkazy ve stavu written z posledních 20 minut. @@ -362,6 +378,20 @@ async def lifespan(app: FastAPI): id="verify_modbus", replace_existing=True, ) + scheduler.add_job( + scheduled_signal_outbound_send, + "interval", + seconds=15, + id="signal_outbound_send", + replace_existing=True, + ) + scheduler.add_job( + scheduled_signal_outbound_verify, + "interval", + seconds=15, + id="signal_outbound_verify", + replace_existing=True, + ) scheduler.add_job(scheduled_daily_plan, "cron", hour=15, minute=0, id="daily_plan") scheduler.add_job( scheduled_rolling_replan, diff --git a/backend/services/control/exporter_monolith.py b/backend/services/control/exporter_monolith.py index 5f2c73d..6134c22 100644 --- a/backend/services/control/exporter_monolith.py +++ b/backend/services/control/exporter_monolith.py @@ -17,6 +17,7 @@ import httpx from app.config import get_settings from services.modbus_client import get_modbus_client +from services.signal_service import enqueue_site_signals logger = logging.getLogger(__name__) @@ -1952,98 +1953,106 @@ async def export_setpoints(site_id: int, db: asyncpg.Connection) -> None: logger.info("control export site=%s: MANUAL, skip writes", site_id) return - pi_now = await _fetch_plan_row_for_slot_offset(site_id, db, 0) - pi_next = await _fetch_plan_row_for_slot_offset(site_id, db, 1) - sp_now = _build_setpoints(mode, pi_now) - sp_next = _build_setpoints(mode, pi_next) + try: + pi_now = await _fetch_plan_row_for_slot_offset(site_id, db, 0) + pi_next = await _fetch_plan_row_for_slot_offset(site_id, db, 1) + sp_now = _build_setpoints(mode, pi_now) + sp_next = _build_setpoints(mode, pi_next) - if mode.mode_code == "AUTO" and sp_now is None: - if pi_now is None: + if mode.mode_code == "AUTO" and sp_now is None: + if pi_now is None: + logger.warning( + "control export site=%s: AUTO but no planning_interval for current slot, skip", + site_id, + ) + return + + if sp_now is None: logger.warning( - "control export site=%s: AUTO but no planning_interval for current slot, skip", + "control export site=%s: no setpoints for mode %s, skip", site_id, + mode.mode_code, ) - return + return - if sp_now is None: - logger.warning( - "control export site=%s: no setpoints for mode %s, skip", - site_id, - mode.mode_code, - ) - return - - if mode.mode_code == "CHARGE_CHEAP": - max_ch = await _fetch_max_charge_power_w(site_id, db) - # Oba setpointy kladné → get_deye_mode CHARGE; min. 1 W, aby režim nebyl PASSIVE při nulové DB. - pw = max(1, int(max_ch)) - sp_now = ControlSetpoints( - battery_w=pw, - grid_export_limit=0, - ev1_current_a=0, - ev2_current_a=0, - heat_pump_enable=False, - grid_setpoint_w=pw, - ev1_power_w=0, - ev2_power_w=0, - target_soc_pct=None, - effective_sell_price_czk_kwh=None, - ) - sp_next = sp_now - else: - sp_now = _apply_price_failsafe_guard(site_id, mode, pi_now, sp_now) - if sp_next is not None: - sp_next = _apply_price_failsafe_guard(site_id, mode, pi_next, sp_next) - - planning_run_id = await db.fetchval( - """ - SELECT id FROM ems.planning_run - WHERE site_id = $1 AND status = 'active' - ORDER BY created_at DESC - LIMIT 1 - """, - site_id, - ) - if planning_run_id is not None: - planning_run_id = int(planning_run_id) - - try: - inv_res = await write_inverter_setpoints( - site_id, sp_now, sp_next, db, planning_run_id=planning_run_id - ) - except Exception as e: - logger.error("inverter write failed: %s", e) - inv_res = f"FAIL inverter: {e}" - - try: - ev_res = await write_ev_setpoints(site_id, sp_now, db) - except Exception as e: - logger.error("ev write failed: %s", e) - ev_res = f"FAIL ev: {e}" - - try: - hp_res = await write_heat_pump_setpoint(site_id, sp_now, db) - except Exception as e: - logger.error("hp write failed: %s", e) - hp_res = f"FAIL heat pump: {e}" - - try: - lox_res = await send_loxone_setpoints(site_id, sp_now, mode, db) - except Exception as e: - logger.error("loxone write failed: %s", e) - lox_res = f"FAIL Loxone: {e}" - - results = list( - zip( - ("inverter", "ev", "heat_pump", "loxone"), - (inv_res, ev_res, hp_res, lox_res), - ) - ) - - for name, res in results: - if isinstance(res, Exception): - logger.error("control export site=%s %s: FAIL %s", site_id, name, res) - elif isinstance(res, str) and res.startswith("FAIL"): - logger.error("control export site=%s %s: %s", site_id, name, res) + if mode.mode_code == "CHARGE_CHEAP": + max_ch = await _fetch_max_charge_power_w(site_id, db) + # Oba setpointy kladné → get_deye_mode CHARGE; min. 1 W, aby režim nebyl PASSIVE při nulové DB. + pw = max(1, int(max_ch)) + sp_now = ControlSetpoints( + battery_w=pw, + grid_export_limit=0, + ev1_current_a=0, + ev2_current_a=0, + heat_pump_enable=False, + grid_setpoint_w=pw, + ev1_power_w=0, + ev2_power_w=0, + target_soc_pct=None, + effective_sell_price_czk_kwh=None, + ) + sp_next = sp_now else: - logger.info("control export site=%s %s: %s", site_id, name, res) + sp_now = _apply_price_failsafe_guard(site_id, mode, pi_now, sp_now) + if sp_next is not None: + sp_next = _apply_price_failsafe_guard(site_id, mode, pi_next, sp_next) + + planning_run_id = await db.fetchval( + """ + SELECT id FROM ems.planning_run + WHERE site_id = $1 AND status = 'active' + ORDER BY created_at DESC + LIMIT 1 + """, + site_id, + ) + if planning_run_id is not None: + planning_run_id = int(planning_run_id) + + try: + inv_res = await write_inverter_setpoints( + site_id, sp_now, sp_next, db, planning_run_id=planning_run_id + ) + except Exception as e: + logger.error("inverter write failed: %s", e) + inv_res = f"FAIL inverter: {e}" + + try: + ev_res = await write_ev_setpoints(site_id, sp_now, db) + except Exception as e: + logger.error("ev write failed: %s", e) + ev_res = f"FAIL ev: {e}" + + try: + hp_res = await write_heat_pump_setpoint(site_id, sp_now, db) + except Exception as e: + logger.error("hp write failed: %s", e) + hp_res = f"FAIL heat pump: {e}" + + try: + lox_res = await send_loxone_setpoints(site_id, sp_now, mode, db) + except Exception as e: + logger.error("loxone write failed: %s", e) + lox_res = f"FAIL Loxone: {e}" + + results = list( + zip( + ("inverter", "ev", "heat_pump", "loxone"), + (inv_res, ev_res, hp_res, lox_res), + ) + ) + + for name, res in results: + if isinstance(res, Exception): + logger.error("control export site=%s %s: FAIL %s", site_id, name, res) + elif isinstance(res, str) and res.startswith("FAIL"): + logger.error("control export site=%s %s: %s", site_id, name, res) + else: + logger.info("control export site=%s %s: %s", site_id, name, res) + finally: + try: + await enqueue_site_signals(site_id, db) + except Exception as e: + logger.warning( + "control export site=%s: signal enqueue failed: %s", site_id, e + ) diff --git a/backend/services/signal_service.py b/backend/services/signal_service.py new file mode 100644 index 0000000..e1f593c --- /dev/null +++ b/backend/services/signal_service.py @@ -0,0 +1,714 @@ +""" +Odchozí signály EMS → Loxone / HTTP (journal, retry, readback verify). + +Kritické řízení výkonu (Deye, EV, TČ) zůstává v Modbus exporteru a modbus_command. +""" + +from __future__ import annotations + +import json +import logging +import os +import re +from datetime import datetime, timedelta, timezone +from typing import Any + +import asyncpg +import httpx + +from app.config import get_settings + +logger = logging.getLogger(__name__) + +SIGNAL_EXPORT_BAN_ACTIVE = "EXPORT_BAN_ACTIVE" + +# Po úspěšném verify neposílat stejnou hodnotu znovu po tuto dobu (idempotence). +_IDEMPOTENCE_TTL = timedelta(minutes=10) +# Max pokusů před abandoned (odeslání + verify dohromady řídí attempt_count). +_MAX_ATTEMPTS = 12 +_VERIFY_AFTER_SEND = timedelta(seconds=1) + + +def _loxone_auth() -> tuple[str, str] | None: + settings = get_settings() + user = settings.loxone_user or os.getenv("LOXONE_USER") or "" + password = settings.loxone_password or os.getenv("LOXONE_PASSWORD") or "" + return (user, password) if user else None + + +def _endpoint_base_url(proto: str | None, host: str, port: int | None) -> str: + p = (proto or "http").lower() + if p not in ("http", "https"): + p = "http" + prt = int(port or (443 if p == "https" else 80)) + return f"{p}://{host}:{prt}" + + +def _bool_to_text(v: bool, transform_json: dict[str, Any] | None) -> str: + if transform_json and "map_bool" in transform_json: + m = transform_json["map_bool"] + if isinstance(m, dict): + return str(m.get("true" if v else "false", "1" if v else "0")) + return "1" if v else "0" + + +def _parse_loxone_io_value(body: str) -> float | None: + """Z odpovědi Loxone /dev/sps/io/… vytáhni číselnou hodnotu.""" + if not body: + return None + s = body.strip() + # často XML nebo prostý text s číslem + nums = re.findall(r"-?\d+(?:\.\d+)?", s) + if not nums: + return None + try: + return float(nums[-1]) + except ValueError: + return None + + +def _http_rest_write_url( + base: str, route_config_json: dict[str, Any] | None, value_text: str +) -> tuple[str, str]: + """Vrátí (method, url) pro http_rest zápis.""" + cfg = route_config_json or {} + method = str(cfg.get("method", "GET")).upper() + path = str(cfg.get("path_template", "")) + path = path.replace("{value}", value_text).replace("{v}", value_text) + if not path.startswith("/"): + path = "/" + path + return method, f"{base.rstrip('/')}{path}" + + +def _http_rest_verify_url(base: str, verify_cfg: dict[str, Any] | None) -> str | None: + if not verify_cfg: + return None + path = str(verify_cfg.get("read_path", "")) + if not path: + return None + if not path.startswith("/"): + path = "/" + path + return f"{base.rstrip('/')}{path}" + + +def _read_json_path(data: Any, path: str | None) -> Any: + if path is None or path == "" or path == "$": + return data + if path.startswith("$."): + path = path[2:] + cur: Any = data + for part in path.split("."): + if not part: + continue + if isinstance(cur, dict) and part in cur: + cur = cur[part] + else: + return None + return cur + + +async def compute_export_ban_active(site_id: int, conn: asyncpg.Connection) -> bool: + """ + Kanonický význam EXPORT_BAN_ACTIVE (LED varianta B). + + True pokud EMS uplatňuje zákaz exportu: no_export, block_export override, + režimy bez exportu (SELF_SUSTAIN, CHARGE_CHEAP, PRESERVE), nebo AUTO se záporným + výkupem při grid_setpoint_w >= 0 (soulad s _build_setpoints / export_ban), včetně + price failsafe (predikovaná cena → pasivní ochrana). + """ + mode_row = await conn.fetchrow( + """ + SELECT som.mode_code + FROM ems.site_operating_mode som + WHERE som.site_id = $1::int + """, + site_id, + ) + if mode_row is None: + return False + mode_code = str(mode_row["mode_code"] or "").upper() + + if mode_code == "MANUAL": + return False + + if mode_code in ("SELF_SUSTAIN", "CHARGE_CHEAP", "PRESERVE"): + return True + + no_export = await conn.fetchval( + """ + SELECT COALESCE(sgc.no_export, false) + FROM ems.site_grid_connection sgc + WHERE sgc.site_id = $1::int + """, + site_id, + ) + if bool(no_export): + return True + + ov = await conn.fetchval( + """ + SELECT 1 + FROM ems.site_override o + WHERE o.site_id = $1::int + AND o.override_type = 'block_export' + AND o.valid_from <= now() + AND (o.valid_to IS NULL OR o.valid_to > now()) + LIMIT 1 + """, + site_id, + ) + if ov is not None: + return True + + if mode_code != "AUTO": + return False + + raw = await conn.fetchval( + """ + SELECT ems.fn_planning_interval_at_offset($1::int, 0) + """, + site_id, + ) + if raw is None: + return False + pi = raw if isinstance(raw, dict) else json.loads(raw) + if not pi: + return False + + if bool(pi.get("is_predicted_price")): + return True + + sell_raw = pi.get("effective_sell_price") + grid_sp = int(pi.get("grid_setpoint_w") or 0) + if sell_raw is None: + return False + try: + sell_f = float(sell_raw) + except (TypeError, ValueError): + return False + return sell_f < 0 and grid_sp >= 0 + + +async def _should_skip_enqueue( + conn: asyncpg.Connection, + site_id: int, + signal_code: str, + destination_type: str, + destination_key: str, + desired_text: str, +) -> bool: + row = await conn.fetchrow( + """ + SELECT last_sent_value_text, last_verified_value_text, last_verified_at + FROM ems.signal_state + WHERE site_id = $1 + AND signal_code = $2 + AND destination_type = $3 + AND destination_key = $4 + """, + site_id, + signal_code, + destination_type, + destination_key, + ) + if row is None: + return False + if row["last_sent_value_text"] != desired_text: + return False + if row["last_verified_value_text"] != desired_text: + return False + lv = row["last_verified_at"] + if lv is None: + return False + if lv.tzinfo is None: + lv = lv.replace(tzinfo=timezone.utc) + return datetime.now(timezone.utc) - lv < _IDEMPOTENCE_TTL + + +async def enqueue_site_signals(site_id: int, conn: asyncpg.Connection) -> None: + """Zařadí odchozí řádky pro všechny aktivní routy daného site (po výpočtu signálů).""" + export_ban = await compute_export_ban_active(site_id, conn) + desired = {SIGNAL_EXPORT_BAN_ACTIVE: export_ban} + + routes = await conn.fetch( + """ + SELECT r.id, r.site_id, r.destination_type, r.endpoint_id, r.signal_code, + r.destination_key, r.transform_json, r.verify_readback, r.verify_config_json, + r.route_config_json, r.enabled + FROM ems.signal_route r + WHERE r.site_id = $1::int AND r.enabled = true + """, + site_id, + ) + for r in routes: + sig = str(r["signal_code"]) + if sig not in desired: + continue + dest_type = str(r["destination_type"]) + dest_key = str(r["destination_key"]) + tf = r["transform_json"] + tfd = tf if isinstance(tf, dict) else (json.loads(tf) if tf else None) + val_bool = bool(desired[sig]) + value_text = _bool_to_text(val_bool, tfd) + + if await _should_skip_enqueue( + conn, site_id, sig, dest_type, dest_key, value_text + ): + continue + + await conn.execute( + """ + INSERT INTO ems.signal_state ( + site_id, signal_code, destination_type, destination_key, + last_desired_value_text, updated_at + ) + VALUES ($1, $2, $3, $4, $5, now()) + ON CONFLICT (site_id, signal_code, destination_type, destination_key) + DO UPDATE SET + last_desired_value_text = EXCLUDED.last_desired_value_text, + updated_at = now() + """, + site_id, + sig, + dest_type, + dest_key, + value_text, + ) + + await conn.execute( + """ + INSERT INTO ems.signal_outbound_journal ( + route_id, site_id, signal_code, value_text, value_num, status, + attempt_count, next_attempt_at + ) + VALUES ($1, $2, $3, $4, $5, 'queued', 0, now()) + """, + int(r["id"]), + site_id, + sig, + value_text, + 1.0 if val_bool else 0.0, + ) + + +async def process_signal_outbound_send( + conn: asyncpg.Connection, *, limit: int = 30 +) -> int: + """Odešle až `limit` řádků ve stavu queued. Vrátí počet zpracovaných.""" + rows = await conn.fetch( + """ + SELECT j.id, j.route_id, j.site_id, j.signal_code, j.value_text, j.attempt_count + FROM ems.signal_outbound_journal j + WHERE j.status = 'queued' + AND j.next_attempt_at <= now() + ORDER BY j.id + LIMIT $1 + FOR UPDATE SKIP LOCKED + """, + limit, + ) + n = 0 + for j in rows: + jid = int(j["id"]) + route = await conn.fetchrow( + """ + SELECT r.*, e.host, e.port, e.protocol, e.endpoint_type + FROM ems.signal_route r + JOIN ems.site_endpoint e ON e.id = r.endpoint_id + WHERE r.id = $1::int AND r.enabled = true + """, + int(j["route_id"]), + ) + if route is None: + await conn.execute( + """ + UPDATE ems.signal_outbound_journal + SET status = 'abandoned', last_error = 'route missing or disabled' + WHERE id = $1::bigint + """, + jid, + ) + n += 1 + continue + + dest_type = str(route["destination_type"]) + base = _endpoint_base_url( + route.get("protocol"), str(route["host"]), route.get("port") + ) + auth = _loxone_auth() + url: str + method = "GET" + cfg = route["route_config_json"] + rcfg = cfg if isinstance(cfg, dict) else (json.loads(cfg) if cfg else None) + + try: + if dest_type == "loxone_vi": + io_name = str(route["destination_key"]) + val = str(j["value_text"]) + url = f"{base}/dev/sps/io/{io_name}/{val}" + elif dest_type == "http_rest": + method, url = _http_rest_write_url(base, rcfg, str(j["value_text"])) + else: + await conn.execute( + """ + UPDATE ems.signal_outbound_journal + SET status = 'abandoned', + last_error = $2, + attempt_count = attempt_count + 1 + WHERE id = $1::bigint + """, + jid, + f"unknown destination_type: {dest_type}", + ) + n += 1 + continue + except Exception as e: + ac = int(j["attempt_count"]) + 1 + delay = min(300, 2 ** min(ac, 8)) + st = "abandoned" if ac >= _MAX_ATTEMPTS else "queued" + await conn.execute( + """ + UPDATE ems.signal_outbound_journal + SET status = $2::text, + last_error = $3::text, + attempt_count = $4::int, + next_attempt_at = CASE WHEN $2::text = 'queued' THEN now() + ($5::int * interval '1 second') ELSE next_attempt_at END + WHERE id = $1::bigint + """, + jid, + st, + str(e)[:500], + ac, + delay, + ) + n += 1 + continue + + t0 = datetime.now(timezone.utc) + try: + async with httpx.AsyncClient(timeout=8.0) as client: + if method == "GET": + resp = await client.get(url, auth=auth) + elif method == "POST": + body = None + if rcfg and isinstance(rcfg.get("json_body"), dict): + body = json.dumps(rcfg["json_body"]) + resp = await client.post( + url, + auth=auth, + content=body, + headers={"Content-Type": "application/json"} if body else None, + ) + else: + raise ValueError(f"unsupported HTTP method {method}") + resp.raise_for_status() + body_txt = (resp.text or "")[:2000] + except Exception as e: + ac = int(j["attempt_count"]) + 1 + delay = min(300, 2 ** min(ac, 8)) + st = "abandoned" if ac >= _MAX_ATTEMPTS else "queued" + await conn.execute( + """ + UPDATE ems.signal_outbound_journal + SET status = $2::text, + attempt_count = $3::int, + last_error = $4::text, + next_attempt_at = CASE WHEN $2::text = 'queued' THEN now() + ($5::int * interval '1 second') ELSE next_attempt_at END, + http_method = $6::text, + request_url = $7::text + WHERE id = $1::bigint + """, + jid, + st, + ac, + str(e)[:500], + delay, + method, + url, + ) + n += 1 + continue + + dt_ms = int( + (datetime.now(timezone.utc) - t0).total_seconds() * 1000 + ) + vr = bool(route["verify_readback"]) + await conn.execute( + """ + UPDATE ems.signal_outbound_journal + SET status = $2::text, + http_method = $3::text, + request_url = $4::text, + http_status = $5::int, + latency_ms = $6::int, + response_body_trunc = $7::text, + sent_at = now(), + last_error = NULL, + verified_at = CASE WHEN $2::text = 'verified' THEN now() ELSE NULL END + WHERE id = $1::bigint + """, + jid, + "verified" if not vr else "sent", + method, + url, + 200, + dt_ms, + (body_txt or "")[:500], + ) + if not vr: + await conn.execute( + """ + INSERT INTO ems.signal_state ( + site_id, signal_code, destination_type, destination_key, + last_sent_value_text, last_verified_value_text, last_sent_at, last_verified_at, updated_at + ) + VALUES ($1, $2, $3, $4, $5, $5, now(), now(), now()) + ON CONFLICT (site_id, signal_code, destination_type, destination_key) + DO UPDATE SET + last_sent_value_text = EXCLUDED.last_sent_value_text, + last_verified_value_text = EXCLUDED.last_verified_value_text, + last_sent_at = now(), + last_verified_at = now(), + updated_at = now() + """, + int(j["site_id"]), + str(j["signal_code"]), + dest_type, + str(route["destination_key"]), + str(j["value_text"]), + ) + n += 1 + return n + + +async def process_signal_outbound_verify( + conn: asyncpg.Connection, *, limit: int = 30 +) -> int: + """Ověří řádky ve stavu sent (readback). Vrátí počet zpracovaných.""" + rows = await conn.fetch( + """ + SELECT j.id, j.route_id, j.site_id, j.signal_code, j.value_text + FROM ems.signal_outbound_journal j + WHERE j.status = 'sent' + AND j.verified_at IS NULL + AND j.sent_at IS NOT NULL + AND j.sent_at <= now() - $1::interval + ORDER BY j.id + LIMIT $2 + FOR UPDATE SKIP LOCKED + """, + _VERIFY_AFTER_SEND, + limit, + ) + n = 0 + for j in rows: + jid = int(j["id"]) + route = await conn.fetchrow( + """ + SELECT r.*, e.host, e.port, e.protocol + FROM ems.signal_route r + JOIN ems.site_endpoint e ON e.id = r.endpoint_id + WHERE r.id = $1::int AND r.enabled = true + """, + int(j["route_id"]), + ) + if route is None: + await conn.execute( + """ + UPDATE ems.signal_outbound_journal + SET status = 'abandoned', last_error = 'route missing', verified_at = now() + WHERE id = $1::bigint + """, + jid, + ) + n += 1 + continue + + dest_type = str(route["destination_type"]) + base = _endpoint_base_url( + route.get("protocol"), str(route["host"]), route.get("port") + ) + auth = _loxone_auth() + vcfg_raw = route["verify_config_json"] + vcfg = ( + vcfg_raw + if isinstance(vcfg_raw, dict) + else (json.loads(vcfg_raw) if vcfg_raw else {}) + ) + + read_url: str | None = None + expected = str(j["value_text"]) + try: + if dest_type == "loxone_vi": + io_read = vcfg.get("loxone_io_name") if vcfg else None + if not io_read: + io_read = str(route["destination_key"]) + "_FB" + read_url = f"{base}/dev/sps/io/{io_read}" + elif dest_type == "http_rest": + read_url = _http_rest_verify_url(base, vcfg) + else: + read_url = None + + if not read_url: + raise ValueError("verify_config missing read URL") + + async with httpx.AsyncClient(timeout=8.0) as client: + rresp = await client.get(read_url, auth=auth) + rresp.raise_for_status() + body = rresp.text or "" + + ok = False + read_val: str | None = None + if dest_type == "loxone_vi": + fv = _parse_loxone_io_value(body) + if fv is not None: + read_val = str(int(round(fv))) + try: + ev = float(expected) + except ValueError: + ev = None + if ev is not None and abs(fv - ev) < 0.51: + ok = True + elif dest_type == "http_rest": + ct = (rresp.headers.get("content-type") or "").lower() + if "json" in ct: + data = rresp.json() + jpath = vcfg.get("json_path") or vcfg.get("json_key") + if isinstance(jpath, str) and jpath: + got = _read_json_path(data, jpath) + else: + got = data + if isinstance(got, bool): + read_val = "1" if got else "0" + elif isinstance(got, (int, float)): + read_val = "1" if float(got) >= 0.5 else "0" + elif got is not None: + read_val = str(got).strip().lower() + else: + read_val = None + exp_l = expected.strip().lower() + if read_val is not None: + if read_val in ("true", "on", "1"): + read_norm = "1" + elif read_val in ("false", "off", "0"): + read_norm = "0" + else: + read_norm = read_val + exp_norm = ( + "1" + if exp_l in ("1", "true", "on") + else "0" + if exp_l in ("0", "false", "off") + else expected + ) + ok = read_norm == exp_norm + else: + fv = _parse_loxone_io_value(body) + if fv is not None: + read_val = str(int(round(fv))) + try: + ev = float(expected) + except ValueError: + ev = None + ok = ev is not None and abs(fv - ev) < 0.51 + + if ok: + await conn.execute( + """ + UPDATE ems.signal_outbound_journal + SET status = 'verified', verified_at = now(), last_error = NULL + WHERE id = $1::bigint + """, + jid, + ) + await conn.execute( + """ + INSERT INTO ems.signal_state ( + site_id, signal_code, destination_type, destination_key, + last_sent_value_text, last_verified_value_text, last_sent_at, last_verified_at, updated_at + ) + VALUES ($1, $2, $3, $4, $5, $5, + (SELECT sent_at FROM ems.signal_outbound_journal WHERE id = $6::bigint), + now(), now()) + ON CONFLICT (site_id, signal_code, destination_type, destination_key) + DO UPDATE SET + last_sent_value_text = EXCLUDED.last_sent_value_text, + last_verified_value_text = EXCLUDED.last_verified_value_text, + last_sent_at = EXCLUDED.last_sent_at, + last_verified_at = now(), + updated_at = now() + """, + int(j["site_id"]), + str(j["signal_code"]), + dest_type, + str(route["destination_key"]), + str(j["value_text"]), + jid, + ) + else: + ac_row = await conn.fetchrow( + "SELECT attempt_count FROM ems.signal_outbound_journal WHERE id = $1", + jid, + ) + ac = int(ac_row["attempt_count"] or 0) + 1 + st = "abandoned" if ac >= _MAX_ATTEMPTS else "queued" + delay = min(300, 2 ** min(ac, 8)) + await conn.execute( + """ + UPDATE ems.signal_outbound_journal + SET status = $2::text, + attempt_count = $3::int, + last_error = $4::text, + next_attempt_at = CASE WHEN $2::text = 'queued' THEN now() + ($5::int * interval '1 second') ELSE next_attempt_at END, + sent_at = CASE WHEN $2::text = 'queued' THEN NULL ELSE sent_at END, + verified_at = CASE WHEN $2::text != 'queued' THEN now() ELSE NULL END + WHERE id = $1::bigint + """, + jid, + st, + ac, + f"verify mismatch read={read_val!r} expected={expected!r}"[:500], + delay, + ) + except Exception as e: + ac_row = await conn.fetchrow( + "SELECT attempt_count FROM ems.signal_outbound_journal WHERE id = $1", + jid, + ) + ac = int(ac_row["attempt_count"] or 0) + 1 + st = "abandoned" if ac >= _MAX_ATTEMPTS else "queued" + delay = min(300, 2 ** min(ac, 8)) + await conn.execute( + """ + UPDATE ems.signal_outbound_journal + SET status = $2::text, + attempt_count = $3::int, + last_error = $4::text, + next_attempt_at = CASE WHEN $2::text = 'queued' THEN now() + ($5::int * interval '1 second') ELSE next_attempt_at END, + sent_at = CASE WHEN $2::text = 'queued' THEN NULL ELSE sent_at END + WHERE id = $1::bigint + """, + jid, + st, + ac, + str(e)[:500], + delay, + ) + n += 1 + return n + + +async def run_signal_outbound_send_for_active_sites(pool: asyncpg.Pool) -> None: + async with pool.acquire() as conn: + try: + await process_signal_outbound_send(conn, limit=80) + except Exception: + logger.exception("signal_outbound_send failed") + + +async def run_signal_outbound_verify_for_active_sites(pool: asyncpg.Pool) -> None: + async with pool.acquire() as conn: + try: + await process_signal_outbound_verify(conn, limit=80) + except Exception: + logger.exception("signal_outbound_verify failed") diff --git a/db/migration/V064__signal_outbound.sql b/db/migration/V064__signal_outbound.sql new file mode 100644 index 0000000..6f440ca --- /dev/null +++ b/db/migration/V064__signal_outbound.sql @@ -0,0 +1,122 @@ +-- Signály EMS → externí cíle (Loxone VI, HTTP REST), journal + idempotence + verify readback. +-- Kritické řízení výkonu (Deye, EV, TČ) zůstává v modbus_command / exporteru. + +-- ------------------------------------------------------------ +-- Definice signálů (globální katalog kódů) +-- ------------------------------------------------------------ +CREATE TABLE ems.signal_def ( + code TEXT PRIMARY KEY, + value_type TEXT NOT NULL, + description TEXT +); + +COMMENT ON TABLE ems.signal_def IS +'Katalog signálů EMS (logické výstupy). Hodnotu pro route počítá backend dle doménové logiky.'; + +COMMENT ON COLUMN ems.signal_def.code IS +'Unikátní kód signálu, např. EXPORT_BAN_ACTIVE.'; + +COMMENT ON COLUMN ems.signal_def.value_type IS +'bool | int | float | string — očekávaný typ hodnoty po transformaci na cíl.'; + +INSERT INTO ems.signal_def (code, value_type, description) +VALUES ( + 'EXPORT_BAN_ACTIVE', + 'bool', + 'Pravda pokud EMS aktuálně uplatňuje zákaz exportu do sítě (LED varianta B): override block_export, no_export, režimy bez exportu, AUTO se záporným výkupem při ne-negativním grid setpointu.' +) +ON CONFLICT (code) DO NOTHING; + +-- ------------------------------------------------------------ +-- Směrování signál → cíl (per site) +-- ------------------------------------------------------------ +CREATE TABLE ems.signal_route ( + id SERIAL PRIMARY KEY, + site_id INT NOT NULL REFERENCES ems.site (id), + destination_type TEXT NOT NULL, + endpoint_id INT NOT NULL REFERENCES ems.site_endpoint (id), + signal_code TEXT NOT NULL REFERENCES ems.signal_def (code), + destination_key TEXT NOT NULL, + route_config_json JSONB, + transform_json JSONB, + verify_readback BOOLEAN NOT NULL DEFAULT true, + verify_config_json JSONB, + enabled BOOLEAN NOT NULL DEFAULT true, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + CONSTRAINT uq_signal_route_unique UNIQUE (site_id, destination_type, signal_code, destination_key) +); + +CREATE INDEX idx_signal_route_site_enabled + ON ems.signal_route (site_id, enabled) + WHERE enabled = true; + +COMMENT ON TABLE ems.signal_route IS +'Mapování signálu na cíl (Loxone Virtual Input, HTTP REST atd.). endpoint_id ukazuje na ems.site_endpoint (loxone_http, budoucí shelly_http, …).'; + +COMMENT ON COLUMN ems.signal_route.destination_type IS +'loxone_vi = GET /dev/sps/io/{destination_key}/{value}; http_rest = šablona v route_config_json.'; + +COMMENT ON COLUMN ems.signal_route.destination_key IS +'U Loxone název Virtual Inputu. U HTTP REST stabilní klíč pro log (např. relay0).'; + +COMMENT ON COLUMN ems.signal_route.route_config_json IS +'Volitelná konfigurace pro http_rest (path_template, method, …). U loxone_vi typicky NULL.'; + +COMMENT ON COLUMN ems.signal_route.verify_config_json IS +'Readback: u Loxone např. {"loxone_io_name":"EMS_ExportBan_Active_FB"} pro GET /dev/sps/io/{name}. U HTTP JSON path atd.'; + +-- ------------------------------------------------------------ +-- Odchozí journal +-- ------------------------------------------------------------ +CREATE TABLE ems.signal_outbound_journal ( + id BIGSERIAL PRIMARY KEY, + route_id INT NOT NULL REFERENCES ems.signal_route (id), + site_id INT NOT NULL REFERENCES ems.site (id), + signal_code TEXT NOT NULL, + value_text TEXT NOT NULL, + value_num NUMERIC, + status TEXT NOT NULL, + attempt_count INT NOT NULL DEFAULT 0, + next_attempt_at TIMESTAMPTZ NOT NULL DEFAULT now(), + last_error TEXT, + http_method TEXT, + request_url TEXT, + http_status INT, + latency_ms INT, + response_body_trunc TEXT, + sent_at TIMESTAMPTZ, + verified_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + CONSTRAINT chk_signal_outbound_status CHECK ( + status IN ('queued', 'sent', 'verified', 'failed', 'abandoned') + ) +); + +CREATE INDEX idx_signal_outbound_worker + ON ems.signal_outbound_journal (status, next_attempt_at); + +CREATE INDEX idx_signal_outbound_site_debug + ON ems.signal_outbound_journal (site_id, signal_code, created_at DESC); + +COMMENT ON TABLE ems.signal_outbound_journal IS +'Journal odchozích signálů (HTTP). Worker odesílá queued, po úspěchu sent, po readback verified nebo failed s retry.'; + +-- ------------------------------------------------------------ +-- Poslední známý stav (idempotence) +-- ------------------------------------------------------------ +CREATE TABLE ems.signal_state ( + site_id INT NOT NULL REFERENCES ems.site (id), + signal_code TEXT NOT NULL, + destination_type TEXT NOT NULL, + destination_key TEXT NOT NULL, + last_desired_value_text TEXT, + last_sent_value_text TEXT, + last_verified_value_text TEXT, + last_sent_at TIMESTAMPTZ, + last_verified_at TIMESTAMPTZ, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (site_id, signal_code, destination_type, destination_key) +); + +COMMENT ON TABLE ems.signal_state IS +'Poslední požadovaná / odeslaná / ověřená hodnota signálu per cíl — idempotence a diagnostika verify.'; diff --git a/docs/04-modules/signal-outbound.md b/docs/04-modules/signal-outbound.md new file mode 100644 index 0000000..d7c52c2 --- /dev/null +++ b/docs/04-modules/signal-outbound.md @@ -0,0 +1,51 @@ +# Modul: Odchozí signály (Loxone / HTTP) + +## Účel + +- EMS šle **logické signály** (např. `EXPORT_BAN_ACTIVE`) na externí cíle přes **DB konfigurované routy** (`ems.signal_route`). +- Každý pokus o odeslání se zapisuje do **`ems.signal_outbound_journal`** (retry, HTTP metadata, stav `queued` → `sent` → `verified`). +- **`ems.signal_state`** drží poslední požadovanou / odeslanou / ověřenou hodnotu pro **idempotenci** a diagnostiku readbacku. + +Kritické řízení výkonu (**Deye, EV, TČ**) zůstává ve **Modbus exporteru** a tabulce **`ems.modbus_command`**. Tento modul **nenahrazuje** plánované zápisy výkonu do invertoru ani paralelní zápis na stejné Modbus výstupy. + +## Hranice: signál vs. Modbus + +| Typ akce | Kde | +|----------|-----| +| Plán, ekonomika, limity výkonu, zápis registrů Deye / nabíječky / TČ | `control_exporter` + `ems.modbus_command` | +| Indikace (LED), informativní stav do Loxone, jednoduchý HTTP přepínač (Shelly) | `ems.signal_*` + `services/signal_service.py` | + +Duplicitní ovládání **stejného fyzického výstupu** přes Modbus i přes signálový HTTP kanál se vyhýbe — jedna autoritativní cesta na výstup. + +## Destinace + +| `destination_type` | Chování | +|--------------------|---------| +| `loxone_vi` | `GET {base}/dev/sps/io/{destination_key}/{value_text}` (stejná konvence jako legacy `send_loxone_setpoints`). | +| `http_rest` | Zápis z `route_config_json` (`method`, `path_template` s `{value}` / `{v}`). Verify přes `verify_config_json` (`read_path`, volitelně `json_path` pro JSON odpověď). | + +## Worker běh + +- Zařazení (`enqueue_site_signals`) probíhá po **každém** dokončeném pokusu o `export_setpoints` (včetně předčasných returnů u AUTO bez plánu), v `finally` bloku — aby LED mohla klesnout i při chybějícím plánu. +- Odeslání a verify běží v **APScheduler** jobech v `backend/app/lifespan.py` (interval řádů sekund). + +## Konfigurace (příklad Loxone + readback) + +Viz [Loxone integrace – VI/VO](../loxone-integration.md) (`EMS_ExportBan_Active`, `EMS_ExportBan_Active_FB`). + +SQL příklad routy (po doplnění `endpoint_id` z `ems.site_endpoint` pro daný miniserver): + +```sql +INSERT INTO ems.signal_route ( + site_id, destination_type, endpoint_id, signal_code, destination_key, + verify_readback, verify_config_json +) VALUES ( + 3, + 'loxone_vi', + (SELECT id FROM ems.site_endpoint WHERE site_id = 3 AND endpoint_type = 'loxone_http' LIMIT 1), + 'EXPORT_BAN_ACTIVE', + 'EMS_ExportBan_Active', + true, + '{"loxone_io_name": "EMS_ExportBan_Active_FB"}'::jsonb +); +``` diff --git a/docs/loxone-integration.md b/docs/loxone-integration.md index 43ae9ab..a573f9c 100644 --- a/docs/loxone-integration.md +++ b/docs/loxone-integration.md @@ -27,6 +27,7 @@ EMS posílá hodnoty přes HTTP GET: `/dev/sps/io/{název}/{hodnota}` | `EMS_EV1_Power_W` | Analog | 0–22000 | Povolený výkon nabíječky EV č. 1 ve W. 0 = zakázat nabíjení. | | `EMS_EV2_Power_W` | Analog | 0–22000 | Povolený výkon nabíječky EV č. 2 ve W. 0 = zakázat nabíjení. | | `EMS_HeatPump_Enable` | Digital | 0/1 | Povolení provozu tepelného čerpadla. 1 = povolen, 0 = zakázán. | +| `EMS_ExportBan_Active` | Digital | 0/1 | **Signál z EMS (journal):** 1 = EMS aktuálně uplatňuje zákaz exportu do sítě (LED / indikace; viz `EXPORT_BAN_ACTIVE` v DB `ems.signal_route`). 0 = export ban neaktivní. Hodnotu periodicky posílá backend po řídicím cyklu; ověření readback viz níže. | > **Poznámka k setpointům:** `EMS_Battery_Setpoint_W` a `EMS_Grid_Setpoint_W` jsou informativní vstupy pro AUTO režim. Loxone je předá jako Modbus příkazy do střídače Deye. V ostatních režimech (SELF_SUSTAIN, PRESERVE, MANUAL) Loxone tyto hodnoty ignoruje a řídí se vlastní logikou. @@ -40,6 +41,12 @@ Vytvořit jako **Virtual HTTP Output** nebo stav dostupný přes HTTP GET pro EM |---|---|---| | `EMS_Mode_Active` | Analog | Aktuálně aktivní režim v Loxone. EMS čte při startu pro zjištění stavu. | | `EMS_Watchdog_Triggered` | Digital | 1 pokud watchdog přepnul na SELF_SUSTAIN bez příkazu od EMS. Pro diagnostiku. | +| `EMS_ExportBan_Active_FB` | Digital | **Readback pro verify:** musí odrážet stejný stav jako `EMS_ExportBan_Active` (např. ve Function Blocku zkopírovat hodnotu z VI na VO). EMS po zápisu čte `GET /dev/sps/io/EMS_ExportBan_Active_FB` a porovná s odeslanou hodnotou. Název lze přepsat v `ems.signal_route.verify_config_json` klíčem `loxone_io_name`. | + +**Export ban – výchozí konfigurace DB (Loxone):** + +- `ems.signal_def`: řádek `EXPORT_BAN_ACTIVE` (seed v migraci). +- `ems.signal_route`: jeden řádek na lokalitu — `destination_type = 'loxone_vi'`, `endpoint_id` → `site_endpoint` typu `loxone_http`, `signal_code = 'EXPORT_BAN_ACTIVE'`, `destination_key = 'EMS_ExportBan_Active'`, `verify_readback = true`, `verify_config_json = {"loxone_io_name": "EMS_ExportBan_Active_FB"}`. ---