From 182d5a37e1d9437568e9dff7b398dcb2cddd5c72 Mon Sep 17 00:00:00 2001 From: Dusan Vojacek Date: Fri, 3 Apr 2026 16:03:06 +0200 Subject: [PATCH] third version before modbus cleanup --- .gitignore | 1 + backend/app/routers/full_status.py | 41 +++- backend/services/audit_filler.py | 8 +- backend/services/control_exporter.py | 81 ++++--- backend/services/modbus_client.py | 201 ++++++++++++---- backend/services/planning_engine.py | 207 ++++++++++++++--- backend/services/telemetry_collector.py | 4 +- backend/tests/test_planning_dispatch_milp.py | 215 ++++++++++++++++++ ...027__planning_inputs_battery_semantics.sql | 52 +++++ ...n_fill_baseline_load_forecast_accuracy.sql | 106 +++++++++ .../R__vw_baseline_load_forecast_accuracy.sql | 12 + db/views/R__z_postgrest_ems_anon_grants.sql | 2 + docs/03-data-model.md | 9 +- docs/04-modules/consumption.md | 2 +- docs/04-modules/modbus-registers.md | 2 +- docs/04-modules/planning.md | 25 +- frontend/src/pages/Dashboard.tsx | 4 +- .../src/types/react-router-dom-ambient.d.ts | 2 + 18 files changed, 846 insertions(+), 128 deletions(-) create mode 100644 backend/tests/test_planning_dispatch_milp.py create mode 100644 db/migration/V027__planning_inputs_battery_semantics.sql create mode 100644 db/routines/R__fn_fill_baseline_load_forecast_accuracy.sql create mode 100644 db/views/R__vw_baseline_load_forecast_accuracy.sql diff --git a/.gitignore b/.gitignore index 474ee3e..8e9e2e9 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ __pycache__/ .ruff_cache/ venv/ .venv/ +backend/.ems-modbus-locks/ node_modules/ dist/ *.tsbuildinfo diff --git a/backend/app/routers/full_status.py b/backend/app/routers/full_status.py index 112d6b1..2fee3ec 100644 --- a/backend/app/routers/full_status.py +++ b/backend/app/routers/full_status.py @@ -151,7 +151,8 @@ async def get_site_status_full( reserve_row = await conn.fetchrow( """ - SELECT MIN(reserve_soc_percent)::float AS reserve_soc + SELECT MIN(reserve_soc_percent)::float AS reserve_soc, + MIN(min_soc_percent)::float AS min_soc FROM ems.asset_battery WHERE site_id = $1 """, @@ -173,7 +174,10 @@ async def get_site_status_full( if run_row: int_rows = await conn.fetch( """ - SELECT interval_start, battery_setpoint_w + SELECT interval_start, battery_setpoint_w, + load_baseline_w, + pv_a_forecast_raw_w, pv_b_forecast_raw_w, + pv_a_forecast_solver_w, pv_b_forecast_solver_w FROM ems.planning_interval WHERE run_id = $1 ORDER BY interval_start @@ -243,6 +247,7 @@ async def get_site_status_full( mode_code = (mode_row["mode_code"] if mode_row else None) or "" reserve_soc = float(reserve_row["reserve_soc"]) if reserve_row and reserve_row["reserve_soc"] is not None else None + min_soc = float(reserve_row["min_soc"]) if reserve_row and reserve_row["min_soc"] is not None else None soc = float(inv_row["battery_soc_percent"]) if inv_row and inv_row["battery_soc_percent"] is not None else None alerts: list[dict[str, str]] = [] @@ -265,8 +270,10 @@ async def get_site_status_full( if mode_code.upper() == "MANUAL": add_alert("warn", "Systém v manuálním režimu") - if reserve_soc is not None and soc is not None and soc < reserve_soc: - add_alert("error", "SoC baterie pod rezervou") + if min_soc is not None and soc is not None and soc < min_soc: + add_alert("error", "SoC baterie pod minimálním limitem") + elif reserve_soc is not None and soc is not None and soc < reserve_soc: + add_alert("warn", "SoC pod ekonomickou rezervou (arbitrážní podlaha)") if hb_age is None or hb_age > HEARTBEAT_STALE_SEC: add_alert("error", "EMS heartbeat výpadek") @@ -300,6 +307,7 @@ def _infrastructure_notification_items( has_plan: bool, tomorrow_slots: int, mode_code: str, + min_soc: float | None, reserve_soc: float | None, soc: float | None, inv_age: int | None, @@ -354,8 +362,20 @@ def _infrastructure_notification_items( if mode_code.upper() == "MANUAL": push("mode_manual", "info", "Manuální režim", "Automatická optimalizace je vypnutá.") - if reserve_soc is not None and soc is not None and soc < reserve_soc: - push("soc_reserve", "error", "SoC pod rezervou", "Nabití baterie je pod nastavenou bezpečnostní rezervou.") + if min_soc is not None and soc is not None and soc < min_soc: + push( + "soc_min", + "error", + "SoC pod minimem", + "SoC je pod absolutním minimem z konfigurace baterie.", + ) + elif reserve_soc is not None and soc is not None and soc < reserve_soc: + push( + "soc_reserve", + "warning", + "SoC pod ekonomickou rezervou", + "SoC je pod arbitrážní podlahou – plánovač může v tomto pásmu omezovat export.", + ) if hb_age is None or hb_age > HEARTBEAT_STALE_SEC: push("heartbeat", "error", "EMS heartbeat", "Služba EMS nehlásí pravidelný heartbeat.") @@ -402,7 +422,8 @@ async def get_site_notifications( ) reserve_row = await conn.fetchrow( """ - SELECT MIN(reserve_soc_percent)::float AS reserve_soc + SELECT MIN(reserve_soc_percent)::float AS reserve_soc, + MIN(min_soc_percent)::float AS min_soc FROM ems.asset_battery WHERE site_id = $1 """, @@ -512,6 +533,11 @@ async def get_site_notifications( if reserve_row and reserve_row["reserve_soc"] is not None else None ) + min_soc = ( + float(reserve_row["min_soc"]) + if reserve_row and reserve_row["min_soc"] is not None + else None + ) soc = ( float(inv_row["battery_soc_percent"]) if inv_row and inv_row["battery_soc_percent"] is not None @@ -524,6 +550,7 @@ async def get_site_notifications( has_plan=has_plan, tomorrow_slots=int(tomorrow_slots or 0), mode_code=mode_code, + min_soc=min_soc, reserve_soc=reserve_soc, soc=soc, inv_age=inv_age, diff --git a/backend/services/audit_filler.py b/backend/services/audit_filler.py index fd72015..4e6a4e9 100644 --- a/backend/services/audit_filler.py +++ b/backend/services/audit_filler.py @@ -37,10 +37,16 @@ async def fill_audit_for_completed_intervals(site_id: int, db) -> None: ) for row in rows: + slot = row["slot"] await db.execute( "SELECT ems.fn_fill_audit_interval($1, $2)", site_id, - row["slot"], + slot, + ) + await db.execute( + "SELECT ems.fn_fill_baseline_load_forecast_accuracy($1, $2)", + site_id, + slot, ) if rows: diff --git a/backend/services/control_exporter.py b/backend/services/control_exporter.py index b62f244..a615c79 100644 --- a/backend/services/control_exporter.py +++ b/backend/services/control_exporter.py @@ -25,6 +25,10 @@ BATT_VOLTAGE_V = 51.2 REG178_SELL = 0b00100000 # 32, grid peak shaving disable REG178_PASSIVE = 0b00110000 # 48, grid peak shaving enable (PASSIVE i CHARGE) +# Neaktivní TOU bloky (3–6): „konec dne“ — Deye často 23:59 (2359) neuloží a vrátí např. 2355, +# verify pak hlásí mismatch. 23:55 je na zařízeních stabilní (viz HHMM jako desítkové číslo). +DEYE_TOU_INACTIVE_HHMM = 2355 + DEYE_REGISTER_NAMES: dict[int, str] = { 108: "max_charge_a (max nabíjecí proud baterie)", 109: "max_discharge_a (max vybíjecí proud baterie)", @@ -97,6 +101,7 @@ class InverterConfig: max_battery_charge_w: int | None max_battery_discharge_w: int | None reserve_soc_percent: int | None + max_soc_percent: int | None usable_capacity_wh: int | None max_charge_a: int max_discharge_a: int @@ -195,13 +200,14 @@ async def execute_modbus_commands( ) if cmd is None: continue - client = await get_modbus_client( - cmd["device_host"], int(cmd["device_port"]), int(cmd["device_unit_id"]) - ) + unit = int(cmd["device_unit_id"]) + client = await get_modbus_client(cmd["device_host"], int(cmd["device_port"])) for attempt in range(MAX_RETRIES): try: await client.write_registers( - int(cmd["register"]), [int(cmd["value_to_write"])] + int(cmd["register"]), + [int(cmd["value_to_write"])], + unit, ) await db.execute( """ @@ -231,7 +237,7 @@ async def execute_modbus_commands( e, ) await asyncio.sleep(RETRY_DELAY) - client._client = None # force reconnect + await client.force_disconnect() else: await db.execute( """ @@ -290,30 +296,31 @@ async def verify_modbus_commands( continue try: - client = await get_modbus_client( - cmd["device_host"], int(cmd["device_port"]), int(cmd["device_unit_id"]) - ) - actual = await client.read_register(int(cmd["register"])) + unit = int(cmd["device_unit_id"]) + client = await get_modbus_client(cmd["device_host"], int(cmd["device_port"])) + actual = await client.read_register(int(cmd["register"]), unit) + actual_i = int(actual) + expected_i = int(cmd["value_to_write"]) await db.execute( """ UPDATE ems.modbus_command - SET value_verified=$1, verified_at=now(), - status=CASE WHEN $1=$2 THEN 'verified' ELSE 'mismatch' END - WHERE id=$3 + SET value_verified=$1::int, verified_at=now(), + status=CASE WHEN $1::int = $2::int THEN 'verified' ELSE 'mismatch' END + WHERE id=$3::int """, - actual, - int(cmd["value_to_write"]), + actual_i, + expected_i, cmd_id, ) - if actual != int(cmd["value_to_write"]): + if actual_i != expected_i: logger.error( "[cmd %s] MISMATCH %s 0x%04X: expected=%s actual=%s", cmd_id, cmd["asset_code"], int(cmd["register"]), - cmd["value_to_write"], - actual, + expected_i, + actual_i, ) row_ac = await db.fetchrow( "SELECT attempt_count FROM ems.modbus_command WHERE id=$1", cmd_id @@ -323,8 +330,8 @@ async def verify_modbus_commands( cmd["asset_code"], int(cmd["register"]), cmd["register_name"] or "", - int(cmd["value_to_write"]), - actual, + expected_i, + actual_i, attempts, ) @@ -356,8 +363,8 @@ async def verify_modbus_commands( site["code"], ( f"Modbus mismatch: {cmd['asset_code']} " - f"0x{cmd['register']:04X} expected={cmd['value_to_write']} " - f"actual={actual}" + f"0x{cmd['register']:04X} expected={expected_i} " + f"actual={actual_i}" ), ) all_ok = False @@ -367,7 +374,7 @@ async def verify_modbus_commands( cmd_id, cmd["asset_code"], int(cmd["register"]), - actual, + actual_i, ) except Exception as e: logger.error("[cmd %s] verify read failed: %s", cmd_id, e) @@ -436,6 +443,7 @@ async def _load_inverter_config( ai.max_battery_charge_w, ai.max_battery_discharge_w, ab.reserve_soc_percent, + ab.max_soc_percent, ab.usable_capacity_wh, LEAST( COALESCE(ab.bms_max_charge_w, ai.max_battery_charge_w), @@ -489,6 +497,9 @@ async def _load_inverter_config( reserve_soc_percent=int(row["reserve_soc_percent"]) if row["reserve_soc_percent"] is not None else None, + max_soc_percent=int(row["max_soc_percent"]) + if row["max_soc_percent"] is not None + else None, usable_capacity_wh=int(row["usable_capacity_wh"]) if row["usable_capacity_wh"] is not None else None, @@ -729,7 +740,8 @@ def _deye_tou_params( if deye_mode == "CHARGE": raw_bat = setpoints.battery_w battery_w = int(raw_bat) if raw_bat is not None else 0 - target_soc = min(95, setpoints.target_soc_pct or 80) + cap = int(inv.max_soc_percent) if inv.max_soc_percent is not None else 95 + target_soc = max(10, min(95, cap)) tp_charge_w = battery_watts_to_amps(battery_w, inv.max_charge_a) * int(BATT_VOLTAGE_V) return tp_charge_w, target_soc, True return tp_discharge_w, reserve_soc, False @@ -798,7 +810,7 @@ async def write_inverter_setpoints( for idx in range(2, 6): registers.extend( _deye_time_point_rows( - idx, 2359, tp_discharge_w, reserve_soc, False + idx, DEYE_TOU_INACTIVE_HHMM, tp_discharge_w, reserve_soc, False ) ) @@ -857,21 +869,26 @@ async def write_inverter_setpoints( async def read_deye_registers_live(site_id: int, db: asyncpg.Connection) -> dict[str, Any]: """ Živé čtení holding registrů Deye 108, 109, 141, 142, 143, 178, 191 (stejné TCP spojení jako telemetrie/export). + Vše pod jedním mutexem + sdružené FC3 bloky — mezi jednotlivými read_register dřív telemetrie + střídavě brala lock a RS485 brány házely cizí transaction_id / I/O timeouty. """ inv = await _load_inverter_config(site_id, db) if inv is None: raise ValueError("no controllable Modbus inverter for site") - client = await get_modbus_client(inv.host, inv.port, inv.unit_id) + uid = int(inv.unit_id) + client = await get_modbus_client(inv.host, inv.port) read_at = datetime.now(timezone.utc) try: - r108 = await client.read_register(108) - r109 = await client.read_register(109) - r141 = await client.read_register(141) - r142 = await client.read_register(142) - r143 = await client.read_register(143) - r178 = await client.read_register(178) - r191 = await client.read_register(191) + async with client.batch(uid) as mb: + b108 = await mb.read_holding_registers(108, 2) + b141 = await mb.read_holding_registers(141, 3) + r178 = await mb.read_holding_registers(178, 1) + r191 = await mb.read_holding_registers(191, 1) + r108, r109 = b108[0], b108[1] + r141, r142, r143 = b141[0], b141[1], b141[2] + r178 = r178[0] + r191 = r191[0] except Exception: logger.exception("read_deye_registers_live site=%s failed", site_id) raise diff --git a/backend/services/modbus_client.py b/backend/services/modbus_client.py index 0ed1192..3eac006 100644 --- a/backend/services/modbus_client.py +++ b/backend/services/modbus_client.py @@ -3,46 +3,117 @@ from __future__ import annotations import asyncio +import hashlib import logging +import os from collections.abc import AsyncIterator from contextlib import asynccontextmanager +from pathlib import Path from pymodbus.client import AsyncModbusTcpClient +try: + import fcntl + + _FCNTL = True +except ImportError: + fcntl = None # type: ignore[assignment] + _FCNTL = False + logger = logging.getLogger(__name__) +_flock_warned = False + + +_BACKEND_ROOT = Path(__file__).resolve().parent.parent +_DEFAULT_LOCK_DIR = _BACKEND_ROOT / ".ems-modbus-locks" + + +def _gateway_lock_path(host: str, port: int) -> Path: + # Výchozí = backend/.ems-modbus-locks (v Dockeru /app → mount ./backend), aby flock sdílel + # hostitel + kontejner při dvou backend procesech na stejné bráně; přepiš EMS_MODBUS_LOCK_DIR. + base = Path(os.getenv("EMS_MODBUS_LOCK_DIR", str(_DEFAULT_LOCK_DIR))) + h = hashlib.sha256(f"{host.strip()}:{int(port)}".encode()).hexdigest()[:20] + return base / f"{h}.lock" + + +@asynccontextmanager +async def _gateway_exclusive(host: str, port: int): + """ + Jedna RS485 linka přes levné TCP↔serial brány nesmí obsluhovat dva procesy najednou — + odpovědi pak mají cizí transaction_id (např. 22 vs 54000). flock serializuje napříč PID. + Vypnout: EMS_MODBUS_DISABLE_FLOCK=1 (nebo neexistující fcntl, např. Windows). + """ + global _flock_warned + port_i = int(port) + host_s = host.strip() + if ( + not _FCNTL + or os.getenv("EMS_MODBUS_DISABLE_FLOCK", "").lower() in ("1", "true", "yes") + ): + if not _FCNTL and not _flock_warned: + logger.warning( + "Modbus: fcntl nedostupný — meziprocesová serializace na bránu %s:%s " + "neaktivní (riziko kolizí při dvou masterech)", + host_s, + port_i, + ) + _flock_warned = True + yield + return + + path = _gateway_lock_path(host_s, port_i) + path.parent.mkdir(parents=True, exist_ok=True) + f = open(path, "a+b") # noqa: SIM115 + try: + await asyncio.to_thread(fcntl.flock, f.fileno(), fcntl.LOCK_EX) + yield + finally: + try: + await asyncio.to_thread(fcntl.flock, f.fileno(), fcntl.LOCK_UN) + except OSError: + pass + f.close() + class ModbusBatch: """Více read/write pod jedním držením locku (žádný jiný task na stejném klientovi mezi nimi).""" - def __init__(self, owner: PersistentModbusClient) -> None: + def __init__(self, owner: PersistentModbusClient, device_id: int) -> None: self._o = owner + self._device_id = device_id async def read_register(self, address: int) -> int: - return await self._o._read_register_locked(address) + return await self._o._read_register_locked(address, self._device_id) async def read_register_signed(self, address: int) -> int: raw = await self.read_register(address) return raw - 65536 if raw > 32767 else raw + async def read_holding_registers(self, address: int, count: int) -> list[int]: + """Jedna PDU (FC3) pro po sobě jdoucí registry — méně kolizí na RS485 bránách.""" + return await self._o._read_holding_registers_locked( + address, count, self._device_id + ) + async def write_register(self, address: int, value: int) -> bool: - return await self._o._write_register_locked(address, value) + return await self._o._write_register_locked(address, value, self._device_id) async def write_registers(self, address: int, values: list[int]) -> bool: - return await self._o._write_registers_locked(address, values) + return await self._o._write_registers_locked(address, values, self._device_id) class PersistentModbusClient: """ - Jedno persistentní TCP spojení na převodník. + Jedno persistentní TCP spojení na převodník (host:port). + Unit ID se předává u každé operace — serial servery často nezvládají + paralelní TCP sockety na stejnou bránu (pližící se transaction_id mismatch). Serializuje všechny požadavky přes asyncio.Lock(). - Automaticky reconnectuje při výpadku. """ - def __init__(self, host: str, port: int, device_id: int = 1) -> None: - self.host = host - self.port = port - self.device_id = device_id + def __init__(self, host: str, port: int) -> None: + self.host = host.strip() + self.port = int(port) self._client: AsyncModbusTcpClient | None = None self._lock = asyncio.Lock() @@ -52,12 +123,12 @@ class PersistentModbusClient: if self._client is not None: self._client.close() self._client = None - logger.info("Modbus connecting %s:%s dev=%s", self.host, self.port, self.device_id) + logger.info("Modbus connecting %s:%s", self.host, self.port) self._client = AsyncModbusTcpClient( self.host, port=self.port, - timeout=5, - retries=2, + timeout=8, + retries=3, ) await self._client.connect() if not self._client.connected: @@ -66,13 +137,13 @@ class PersistentModbusClient: raise ConnectionError(f"Cannot connect Modbus {self.host}:{self.port}") logger.info("Modbus connected %s:%s", self.host, self.port) - async def _read_register_locked(self, address: int) -> int: + async def _read_register_locked(self, address: int, device_id: int) -> int: if self._client is None or not self._client.connected: await self._ensure_connected() assert self._client is not None try: r = await self._client.read_holding_registers( - address, count=1, device_id=self.device_id + address, count=1, device_id=int(device_id) ) if r.isError() or not getattr(r, "registers", None): raise OSError(f"Read error 0x{address:04X}: {r!r}") @@ -83,14 +154,43 @@ class PersistentModbusClient: self._client = None raise - async def _write_registers_locked(self, address: int, values: list[int]) -> bool: + async def _read_holding_registers_locked( + self, address: int, count: int, device_id: int + ) -> list[int]: + if count < 1 or count > 125: + raise ValueError(f"invalid holding register count: {count}") + if self._client is None or not self._client.connected: + await self._ensure_connected() + assert self._client is not None + try: + r = await self._client.read_holding_registers( + address, count=count, device_id=int(device_id) + ) + if r.isError() or not getattr(r, "registers", None): + raise OSError(f"Read error 0x{address:04X} x{count}: {r!r}") + if len(r.registers) != count: + raise OSError( + f"Read 0x{address:04X}: expected {count} regs, got {len(r.registers)}" + ) + return [int(x) for x in r.registers] + except Exception as e: + logger.warning( + "Modbus read 0x%04X count=%s failed: %s", address, count, e + ) + self._client.close() + self._client = None + raise + + async def _write_registers_locked( + self, address: int, values: list[int], device_id: int + ) -> bool: if self._client is None or not self._client.connected: await self._ensure_connected() assert self._client is not None try: clamped = [max(0, min(65535, int(v))) for v in values] r = await self._client.write_registers( - address, clamped, device_id=self.device_id + address, clamped, device_id=int(device_id) ) if r.isError(): raise OSError(f"Write error 0x{address:04X}={clamped}: {r!r}") @@ -103,13 +203,17 @@ class PersistentModbusClient: self._client = None raise - async def _write_register_locked(self, address: int, value: int) -> bool: + async def _write_register_locked( + self, address: int, value: int, device_id: int + ) -> bool: if self._client is None or not self._client.connected: await self._ensure_connected() assert self._client is not None try: v = max(0, min(65535, int(value))) - r = await self._client.write_register(address, v, device_id=self.device_id) + r = await self._client.write_register( + address, v, device_id=int(device_id) + ) if r.isError(): raise OSError(f"Write error 0x{address:04X}={v}: {r!r}") return True @@ -119,32 +223,46 @@ class PersistentModbusClient: self._client = None raise - async def read_register(self, address: int) -> int: - async with self._lock: - await self._ensure_connected() - return await self._read_register_locked(address) + async def read_register(self, address: int, device_id: int = 1) -> int: + async with _gateway_exclusive(self.host, self.port): + async with self._lock: + await self._ensure_connected() + return await self._read_register_locked(address, device_id) - async def read_register_signed(self, address: int) -> int: - raw = await self.read_register(address) + async def read_register_signed(self, address: int, device_id: int = 1) -> int: + raw = await self.read_register(address, device_id) return raw - 65536 if raw > 32767 else raw - async def write_register(self, address: int, value: int) -> bool: - async with self._lock: - await self._ensure_connected() - return await self._write_register_locked(address, value) + async def write_register(self, address: int, value: int, device_id: int = 1) -> bool: + async with _gateway_exclusive(self.host, self.port): + async with self._lock: + await self._ensure_connected() + return await self._write_register_locked(address, value, device_id) - async def write_registers(self, address: int, values: list[int]) -> bool: + async def write_registers( + self, address: int, values: list[int], device_id: int = 1 + ) -> bool: """FC 0x10 – povinné pro Deye registry 60–499 (jeden i více registrů).""" - async with self._lock: - await self._ensure_connected() - return await self._write_registers_locked(address, values) + async with _gateway_exclusive(self.host, self.port): + async with self._lock: + await self._ensure_connected() + return await self._write_registers_locked(address, values, device_id) + + async def force_disconnect(self) -> None: + """Uzavře socket pod lockem (např. před retry po chybě).""" + async with _gateway_exclusive(self.host, self.port): + async with self._lock: + if self._client is not None: + self._client.close() + self._client = None @asynccontextmanager - async def batch(self) -> AsyncIterator[ModbusBatch]: + async def batch(self, device_id: int = 1) -> AsyncIterator[ModbusBatch]: """Drží lock pro více po sobě jdoucích operací (telemetrie vs. control na stejné bráně).""" - async with self._lock: - await self._ensure_connected() - yield ModbusBatch(self) + async with _gateway_exclusive(self.host, self.port): + async with self._lock: + await self._ensure_connected() + yield ModbusBatch(self, int(device_id)) def close(self) -> None: if self._client is not None: @@ -156,11 +274,10 @@ _clients: dict[str, PersistentModbusClient] = {} _registry_lock = asyncio.Lock() -async def get_modbus_client( - host: str, port: int, device_id: int = 1 -) -> PersistentModbusClient: - key = f"{host}:{port}:{device_id}" +async def get_modbus_client(host: str, port: int) -> PersistentModbusClient: + """Jedno TCP spojení na převodník podle host:port (unit ID u jednotlivých volání).""" + key = f"{host.strip()}:{int(port)}" async with _registry_lock: if key not in _clients: - _clients[key] = PersistentModbusClient(host, port, device_id) + _clients[key] = PersistentModbusClient(host.strip(), port) return _clients[key] diff --git a/backend/services/planning_engine.py b/backend/services/planning_engine.py index 3433a41..87efa2d 100644 --- a/backend/services/planning_engine.py +++ b/backend/services/planning_engine.py @@ -36,6 +36,9 @@ CORRECTION_MIN_CLAMP = 0.5 # spodní limit korekčního faktoru CORRECTION_MAX_CLAMP = 1.5 # horní limit korekčního faktoru # Útlum korekce: čím dál od aktuálního času, tím méně korigujeme forecast CORRECTION_DECAY_SLOTS = 16 # po 16 slotech (4h) klesne korekce na 0 +# Dynamická ekonomická podlaha (MILP w_arb): lookahead FVE energie v dalších slotech +ARB_LOOKAHEAD_SLOTS = 32 # 8 h při INTERVAL_H=0.25 +ARB_FLOOR_E_REF_FRAC = 0.5 # má scale Wh = tato frakce usable_capacity (0..1) _PRAGUE_TZ = ZoneInfo("Europe/Prague") @@ -83,6 +86,34 @@ def _pv_coverage_ratio(slots: list["PlanningSlot"], battery, hours: int = 24) -> return max(0.0, min(1.0, pv_kwh / batt_kwh)) +def _dynamic_arb_floor_wh_series( + slots: list["PlanningSlot"], + min_soc_wh: float, + arb_base_wh: float, + usable_wh: float, +) -> list[float]: + """ + Časově proměnná ekonomická podlaha Wh pro MILP (nad min_soc_wh). + Hodně očekávané FVE energie v dalších ARB_LOOKAHEAD_SLOTS → podlaha klesá k min_soc_wh; + málo slunce → zůstává u arb_base_wh (typicky reserve z DB). + """ + T = len(slots) + if T == 0: + return [] + e_ref = max(1.0, ARB_FLOOR_E_REF_FRAC * float(usable_wh)) + spread = max(0.0, float(arb_base_wh) - float(min_soc_wh)) + out: list[float] = [] + for t in range(T): + e_pv_wh = 0.0 + for k in range(t, min(T, t + ARB_LOOKAHEAD_SLOTS)): + s = slots[k] + e_pv_wh += max(0, s.pv_a_forecast_w + s.pv_b_forecast_w) * INTERVAL_H + f = min(1.0, e_pv_wh / e_ref) if e_ref > 1e-9 else 1.0 + arb_t = float(min_soc_wh) + (1.0 - f) * spread + out.append(arb_t) + return out + + def _soc_security_profile(slots: list["PlanningSlot"], battery) -> tuple[float, float]: """ Při nízkém očekávaném slunci drží solver vyšší SoC buffer: @@ -282,12 +313,25 @@ def solve_dispatch( prob = pulp.LpProblem("ems_dispatch", pulp.LpMinimize) + min_soc_wh = float(getattr(battery, "min_soc_wh", battery.reserve_soc_wh)) + arb_base_wh = max( + float(getattr(battery, "arb_floor_wh", battery.reserve_soc_wh)), + min_soc_wh, + ) + if getattr(battery, "disable_dynamic_arb_floor", False): + arb_floor_series = [arb_base_wh] * T + else: + arb_floor_series = _dynamic_arb_floor_wh_series( + slots, min_soc_wh, arb_base_wh, float(battery.usable_capacity_wh) + ) + # --- Proměnné --- gi = [pulp.LpVariable(f"gi_{t}", 0, grid.max_import_power_w) for t in range(T)] ge = [pulp.LpVariable(f"ge_{t}", 0, grid.max_export_power_w) for t in range(T)] bc = [pulp.LpVariable(f"bc_{t}", 0, battery.max_charge_power_w) for t in range(T)] bd = [pulp.LpVariable(f"bd_{t}", 0, battery.max_discharge_power_w) for t in range(T)] - soc = [pulp.LpVariable(f"soc_{t}", battery.reserve_soc_wh, battery.soc_max_wh) for t in range(T)] + soc = [pulp.LpVariable(f"soc_{t}", min_soc_wh, battery.soc_max_wh) for t in range(T)] + w_arb = [pulp.LpVariable(f"w_arb_{t}", cat=pulp.LpBinary) for t in range(T)] ca = [pulp.LpVariable(f"ca_{t}", 0, slots[t].pv_a_forecast_w) for t in range(T)] hp = [pulp.LpVariable(f"hp_{t}", 0, heat_pump.rated_heating_power_w) for t in range(T)] soc_deficit_24h = pulp.LpVariable("soc_deficit_24h", 0, battery.usable_capacity_wh) @@ -346,14 +390,26 @@ def solve_dispatch( if s.sell_price < 0: prob += ge[t] == 0 - # Záporná nákupní cena → cap import na reálnou spotřebu + # Záporná nákupní cena → cap import (baseline domu + akumulace + řízené zátěže) if s.buy_price < 0: prob += gi[t] <= ( - battery.max_charge_power_w + s.load_baseline_w + + battery.max_charge_power_w + sum(v.max_charge_power_w for v in vehicles) + heat_pump.rated_heating_power_w ) + soc_prev_expr = current_soc_wh if t == 0 else soc[t - 1] + arb_t = arb_floor_series[t] + prob += soc_prev_expr >= (arb_t - (arb_t - min_soc_wh) * (1 - w_arb[t])) + prob += bd[t] <= ( + s.load_baseline_w + + ev_total_t + + hp[t] + + bc[t] + + battery.max_discharge_power_w * w_arb[t] + ) + # EV – limity a připojení for e in range(EV): connected = ( @@ -519,6 +575,7 @@ async def run_daily_plan(site_id: int, db, triggered_by: str = "scheduler:daily" price_failsafe_active=price_failsafe_active, ) + slot_inputs = _build_slot_inputs(slots, slots) run_id = await _save_planning_run( site_id, results, @@ -531,6 +588,7 @@ async def run_daily_plan(site_id: int, db, triggered_by: str = "scheduler:daily" duration_ms=duration_ms, correction=1.0, db=db, + slot_inputs=slot_inputs, ) logger.info(f"[site={site_id}] Daily plan done in {duration_ms} ms") return run_id, duration_ms @@ -589,6 +647,7 @@ async def run_rolling_replan( correction_factor, correction_log = await compute_correction_factor(site_id, now, db) slots = await _load_slots(site_id, replan_from, horizon_to, db) + slots_before_pv_correction = list(slots) critical_slots = int(36 / INTERVAL_H) missing_ote_count = sum(1 for s in slots[:critical_slots] if s.is_predicted_price) price_failsafe_active = missing_ote_count > 0 @@ -610,6 +669,7 @@ async def run_rolling_replan( price_failsafe_active=price_failsafe_active, ) + slot_inputs = _build_slot_inputs(slots_before_pv_correction, slots) run_id = await _save_planning_run( site_id, results, @@ -622,6 +682,7 @@ async def run_rolling_replan( duration_ms=duration_ms, correction=correction_factor, db=db, + slot_inputs=slot_inputs, ) await db.execute( @@ -718,6 +779,7 @@ async def _load_site_context(site_id: int, db): brow = await db.fetchrow( """ SELECT ab.usable_capacity_wh, + ab.min_soc_percent, ab.reserve_soc_percent, ab.max_soc_percent, ab.charge_efficiency, @@ -770,11 +832,14 @@ async def _load_site_context(site_id: int, db): ) uc = float(brow["usable_capacity_wh"]) - reserve_wh = float(brow["reserve_soc_percent"]) / 100.0 * uc + min_soc_wh = float(brow["min_soc_percent"]) / 100.0 * uc + arb_floor_wh = float(brow["reserve_soc_percent"]) / 100.0 * uc soc_max_wh = float(brow["max_soc_percent"]) / 100.0 * uc battery = SimpleNamespace( usable_capacity_wh=uc, - reserve_soc_wh=reserve_wh, + min_soc_wh=min_soc_wh, + arb_floor_wh=arb_floor_wh, + reserve_soc_wh=arb_floor_wh, soc_max_wh=soc_max_wh, charge_efficiency=float(brow["charge_efficiency"]), discharge_efficiency=float(brow["discharge_efficiency"]), @@ -894,7 +959,7 @@ async def _load_site_context(site_id: int, db): soc_wh = uc * 0.5 else: soc_wh = float(soc_pct) / 100.0 * uc - soc_wh = max(reserve_wh, min(soc_wh, soc_max_wh)) + soc_wh = max(min_soc_wh, min(soc_wh, soc_max_wh)) tuv = await db.fetchval( """ @@ -1032,12 +1097,36 @@ async def _load_slots(site_id, from_dt, to_dt, db) -> list[PlanningSlot]: return out +def _build_slot_inputs( + slots_raw_pv: list[PlanningSlot], + slots_solver: list[PlanningSlot], +) -> list[tuple[int, int, int, int, int]]: + """(load_baseline_w, pv_a_raw, pv_b_raw, pv_a_solver, pv_b_solver) pro každý slot.""" + if len(slots_raw_pv) != len(slots_solver): + raise ValueError("slots_raw_pv and slots_solver length mismatch") + out: list[tuple[int, int, int, int, int]] = [] + for raw, sol in zip(slots_raw_pv, slots_solver): + out.append( + ( + int(raw.load_baseline_w), + int(raw.pv_a_forecast_w), + int(raw.pv_b_forecast_w), + int(sol.pv_a_forecast_w), + int(sol.pv_b_forecast_w), + ) + ) + return out + + async def _save_planning_run( site_id, results, horizon_from, horizon_to, run_type, triggered_by, replan_from, - soc_wh, duration_ms, correction, db + soc_wh, duration_ms, correction, db, + slot_inputs: Optional[list[tuple[int, int, int, int, int]]] = None, ) -> int: """Uloží výsledky solveru jako nový planning_run, deaktivuje předchozí.""" + if slot_inputs is not None and len(slot_inputs) != len(results): + raise ValueError("slot_inputs and results length mismatch") run_id = await db.fetchval(""" INSERT INTO ems.planning_run (site_id, horizon_start, horizon_end, status, @@ -1050,28 +1139,88 @@ async def _save_planning_run( soc_wh, duration_ms, correction) # Bulk insert výsledků - await db.executemany(""" - INSERT INTO ems.planning_interval - (run_id, interval_start, - battery_setpoint_w, battery_soc_target_pct, - grid_setpoint_w, - ev1_setpoint_w, ev2_setpoint_w, ev1_via_bat_w, ev2_via_bat_w, - heat_pump_enabled, heat_pump_setpoint_w, - pv_a_curtailed_w, expected_cost_czk, - effective_buy_price, effective_sell_price, - is_predicted_price) - VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16) - """, [ - (run_id, r.interval_start, - r.battery_setpoint_w, r.battery_soc_target, - r.grid_setpoint_w, - r.ev1_setpoint_w, r.ev2_setpoint_w, r.ev1_via_bat_w, r.ev2_via_bat_w, - r.heat_pump_enabled, r.heat_pump_setpoint_w, - r.pv_a_curtailed_w, r.expected_cost_czk, - r.effective_buy_price, r.effective_sell_price, - r.is_predicted_price) - for r in results - ]) + if slot_inputs is not None: + rows_pi = [ + ( + run_id, + r.interval_start, + r.battery_setpoint_w, + r.battery_soc_target, + r.grid_setpoint_w, + r.ev1_setpoint_w, + r.ev2_setpoint_w, + r.ev1_via_bat_w, + r.ev2_via_bat_w, + r.heat_pump_enabled, + r.heat_pump_setpoint_w, + r.pv_a_curtailed_w, + r.expected_cost_czk, + r.effective_buy_price, + r.effective_sell_price, + r.is_predicted_price, + si[0], + si[1], + si[2], + si[3], + si[4], + ) + for r, si in zip(results, slot_inputs) + ] + await db.executemany( + """ + INSERT INTO ems.planning_interval + (run_id, interval_start, + battery_setpoint_w, battery_soc_target_pct, + grid_setpoint_w, + ev1_setpoint_w, ev2_setpoint_w, ev1_via_bat_w, ev2_via_bat_w, + heat_pump_enabled, heat_pump_setpoint_w, + pv_a_curtailed_w, expected_cost_czk, + effective_buy_price, effective_sell_price, + is_predicted_price, + load_baseline_w, + pv_a_forecast_raw_w, pv_b_forecast_raw_w, + pv_a_forecast_solver_w, pv_b_forecast_solver_w) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16, + $17,$18,$19,$20,$21) + """, + rows_pi, + ) + else: + await db.executemany( + """ + INSERT INTO ems.planning_interval + (run_id, interval_start, + battery_setpoint_w, battery_soc_target_pct, + grid_setpoint_w, + ev1_setpoint_w, ev2_setpoint_w, ev1_via_bat_w, ev2_via_bat_w, + heat_pump_enabled, heat_pump_setpoint_w, + pv_a_curtailed_w, expected_cost_czk, + effective_buy_price, effective_sell_price, + is_predicted_price) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16) + """, + [ + ( + run_id, + r.interval_start, + r.battery_setpoint_w, + r.battery_soc_target, + r.grid_setpoint_w, + r.ev1_setpoint_w, + r.ev2_setpoint_w, + r.ev1_via_bat_w, + r.ev2_via_bat_w, + r.heat_pump_enabled, + r.heat_pump_setpoint_w, + r.pv_a_curtailed_w, + r.expected_cost_czk, + r.effective_buy_price, + r.effective_sell_price, + r.is_predicted_price, + ) + for r in results + ], + ) # Aktivovat nový plán, supersede předchozí await db.execute(""" diff --git a/backend/services/telemetry_collector.py b/backend/services/telemetry_collector.py index 2e91e9f..b3f5223 100644 --- a/backend/services/telemetry_collector.py +++ b/backend/services/telemetry_collector.py @@ -47,8 +47,8 @@ async def poll_inverter(site_id: int, db: asyncpg.Connection) -> None: port = int(row["port"] or 502) unit_id = int(row["unit_id"] if row["unit_id"] is not None else 1) try: - client = await get_modbus_client(host, port, unit_id) - async with client.batch() as mb: + client = await get_modbus_client(host, port) + async with client.batch(unit_id) as mb: run_state = await mb.read_register(DEYE_REG_RUN_STATE) battery_soc = await mb.read_register(DEYE_REG_BATTERY_SOC) battery_power = await mb.read_register_signed(DEYE_REG_BATTERY_POWER_FLOW) diff --git a/backend/tests/test_planning_dispatch_milp.py b/backend/tests/test_planning_dispatch_milp.py new file mode 100644 index 0000000..7186108 --- /dev/null +++ b/backend/tests/test_planning_dispatch_milp.py @@ -0,0 +1,215 @@ +"""MILP dispatch: dvouúrovňové SoC a záporná nákupní cena (bez DB).""" + +from __future__ import annotations + +import unittest +from datetime import datetime, timezone +from types import SimpleNamespace + +from services.planning_engine import ( + PlanningSlot, + _dynamic_arb_floor_wh_series, + solve_dispatch, +) + + +def _slot( + *, + load: int = 2000, + buy: float = 3.0, + sell: float = 3.0, + pv_a: int = 0, + pv_b: int = 0, +) -> PlanningSlot: + return PlanningSlot( + interval_start=datetime(2026, 4, 3, 12, 0, tzinfo=timezone.utc), + buy_price=buy, + sell_price=sell, + pv_a_forecast_w=pv_a, + pv_b_forecast_w=pv_b, + load_baseline_w=load, + ev1_connected=False, + ev2_connected=False, + is_predicted_price=False, + ) + + +def _battery( + *, + uc_wh: float = 100_000.0, + min_pct: float = 10.0, + arb_pct: float = 20.0, + max_pct: float = 95.0, +) -> SimpleNamespace: + uc = uc_wh + min_wh = min_pct / 100.0 * uc + arb_wh = arb_pct / 100.0 * uc + return SimpleNamespace( + usable_capacity_wh=uc, + min_soc_wh=min_wh, + arb_floor_wh=arb_wh, + reserve_soc_wh=arb_wh, + soc_max_wh=max_pct / 100.0 * uc, + charge_efficiency=0.95, + discharge_efficiency=0.95, + degradation_cost_czk_kwh=0.15, + max_charge_power_w=10_000, + max_discharge_power_w=10_000, + ) + + +class DynamicArbFloorTests(unittest.TestCase): + def test_more_pv_ahead_lowers_floor(self) -> None: + """Čím víc FVE ve lookahead, tím nižší ekonomická podlaha v prvním slotu.""" + min_w = 1_000.0 + base_w = 2_000.0 + uc = 10_000.0 + s0 = _slot() + s_low_pv = replace_slot(s0, pv_a=100, pv_b=0) + s_high_pv = replace_slot(s0, pv_a=50_000, pv_b=0) + ser_low = _dynamic_arb_floor_wh_series([s_low_pv] * 40, min_w, base_w, uc) + ser_high = _dynamic_arb_floor_wh_series([s_high_pv] * 40, min_w, base_w, uc) + self.assertLess(ser_high[0], ser_low[0]) + self.assertGreaterEqual(ser_low[0], min_w) + self.assertLessEqual(ser_low[0], base_w) + + +def replace_slot( + s: PlanningSlot, + *, + pv_a: int | None = None, + pv_b: int | None = None, + load: int | None = None, +) -> PlanningSlot: + return PlanningSlot( + interval_start=s.interval_start, + buy_price=s.buy_price, + sell_price=s.sell_price, + pv_a_forecast_w=pv_a if pv_a is not None else s.pv_a_forecast_w, + pv_b_forecast_w=pv_b if pv_b is not None else s.pv_b_forecast_w, + load_baseline_w=load if load is not None else s.load_baseline_w, + ev1_connected=s.ev1_connected, + ev2_connected=s.ev2_connected, + is_predicted_price=s.is_predicted_price, + ) + + +class PlanningDispatchMilpTests(unittest.TestCase): + def test_two_tier_soc_solves_optimal(self) -> None: + slots = [_slot()] + battery = _battery() + hp = SimpleNamespace( + rated_heating_power_w=0, + tuv_min_temp_c=45.0, + tuv_target_temp_c=55.0, + ) + grid = SimpleNamespace(max_import_power_w=15_000, max_export_power_w=15_000) + vehicles = [ + SimpleNamespace( + max_charge_power_w=0, + battery_capacity_kwh=1.0, + default_target_soc_pct=80.0, + ), + SimpleNamespace( + max_charge_power_w=0, + battery_capacity_kwh=1.0, + default_target_soc_pct=80.0, + ), + ] + soc0 = 0.15 * battery.usable_capacity_wh + results, ms = solve_dispatch( + slots, + battery, + hp, + grid, + [None, None], + vehicles, + soc0, + 50.0, + tuv_delta_stats=None, + operating_mode="AUTO", + price_failsafe_active=False, + ) + self.assertGreaterEqual(ms, 0) + self.assertEqual(len(results), 1) + + def test_deep_discharge_allows_covering_load_only(self) -> None: + slots = [ + _slot(load=3000, buy=1.0, sell=6.0, pv_a=0, pv_b=0), + _slot(load=3000, buy=1.0, sell=6.0, pv_a=0, pv_b=0), + ] + battery = _battery(uc_wh=50_000.0) + hp = SimpleNamespace( + rated_heating_power_w=0, + tuv_min_temp_c=45.0, + tuv_target_temp_c=55.0, + ) + grid = SimpleNamespace(max_import_power_w=20_000, max_export_power_w=20_000) + vehicles = [ + SimpleNamespace( + max_charge_power_w=11_000, + battery_capacity_kwh=50.0, + default_target_soc_pct=80.0, + ), + SimpleNamespace( + max_charge_power_w=11_000, + battery_capacity_kwh=50.0, + default_target_soc_pct=80.0, + ), + ] + soc0 = 0.12 * battery.usable_capacity_wh + results, _ms = solve_dispatch( + slots, + battery, + hp, + grid, + [None, None], + vehicles, + soc0, + 50.0, + tuv_delta_stats=None, + operating_mode="AUTO", + price_failsafe_active=False, + ) + self.assertEqual(len(results), 2) + + def test_negative_buy_price_allows_import_for_baseline(self) -> None: + slots = [_slot(load=6000, buy=-0.5, sell=2.0)] + battery = _battery() + hp = SimpleNamespace( + rated_heating_power_w=8000, + tuv_min_temp_c=45.0, + tuv_target_temp_c=55.0, + ) + grid = SimpleNamespace(max_import_power_w=25_000, max_export_power_w=15_000) + vehicles = [ + SimpleNamespace( + max_charge_power_w=11_000, + battery_capacity_kwh=50.0, + default_target_soc_pct=80.0, + ), + SimpleNamespace( + max_charge_power_w=11_000, + battery_capacity_kwh=50.0, + default_target_soc_pct=80.0, + ), + ] + soc0 = 0.5 * battery.usable_capacity_wh + results, _ms = solve_dispatch( + slots, + battery, + hp, + grid, + [None, None], + vehicles, + soc0, + 50.0, + tuv_delta_stats=None, + operating_mode="AUTO", + price_failsafe_active=False, + ) + self.assertGreaterEqual(results[0].grid_setpoint_w, 0) + + +if __name__ == "__main__": + unittest.main() diff --git a/db/migration/V027__planning_inputs_battery_semantics.sql b/db/migration/V027__planning_inputs_battery_semantics.sql new file mode 100644 index 0000000..e58f1bb --- /dev/null +++ b/db/migration/V027__planning_inputs_battery_semantics.sql @@ -0,0 +1,52 @@ +-- EMS: two-tier SoC semantics (DB comments), restore arbitrage floor after V026 tuning, +-- planning_interval solver inputs, baseline load forecast accuracy tracking. + +-- Semantics: min_soc = absolute LP floor; reserve_soc = economic / export discharge floor +COMMENT ON COLUMN ems.asset_battery.min_soc_percent IS +'Minimální SoC v % – tvrdá spodní mez v LP (ochrana packu / BMS).'; +COMMENT ON COLUMN ems.asset_battery.reserve_soc_percent IS +'Ekonomická podlaha v %: pod ní solver neplánuje „nadbytečné“ vybíjení související s exportem (MILP); nadřazuje se min_soc_percent.'; + +-- Obnovit rezervu 20 % tam, kde V026 sladila ekonomiku na reserve=10 společně s degradací 0.15 +UPDATE ems.asset_battery +SET reserve_soc_percent = 20.00 +WHERE reserve_soc_percent = 10.00 + AND degradation_cost_czk_kwh = 0.1500; + +ALTER TABLE ems.planning_interval + ADD COLUMN IF NOT EXISTS load_baseline_w INT, + ADD COLUMN IF NOT EXISTS pv_a_forecast_raw_w INT, + ADD COLUMN IF NOT EXISTS pv_b_forecast_raw_w INT, + ADD COLUMN IF NOT EXISTS pv_a_forecast_solver_w INT, + ADD COLUMN IF NOT EXISTS pv_b_forecast_solver_w INT; + +COMMENT ON COLUMN ems.planning_interval.load_baseline_w IS +'Bazální spotřeba (W) vstupující do LP pro tento slot (stats DOW+hodina / fallback).'; +COMMENT ON COLUMN ems.planning_interval.pv_a_forecast_raw_w IS +'FVE pole A – výkon z DB před rolling korekcí forecastu.'; +COMMENT ON COLUMN ems.planning_interval.pv_b_forecast_raw_w IS +'FVE pole B – výkon z DB před rolling korekcí forecastu.'; +COMMENT ON COLUMN ems.planning_interval.pv_a_forecast_solver_w IS +'FVE pole A – výkon po korekci (vstup do solve_dispatch).'; +COMMENT ON COLUMN ems.planning_interval.pv_b_forecast_solver_w IS +'FVE pole B – výkon po korekci (vstup do solve_dispatch).'; + +CREATE TABLE IF NOT EXISTS ems.baseline_load_forecast_accuracy ( + site_id INT NOT NULL REFERENCES ems.site(id), + interval_start TIMESTAMPTZ NOT NULL, + planning_run_id INT NOT NULL REFERENCES ems.planning_run(id), + forecast_baseline_w INT, + actual_baseline_w INT, + error_w INT, + error_pct NUMERIC(8,4), + lead_time_hours NUMERIC(6,2), + filled_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (site_id, interval_start) +); + +COMMENT ON TABLE ems.baseline_load_forecast_accuracy IS +'Zpětná kontrola: plánovaný bazální výkon vs. skutečnost (load - EV - TČ za 15min z auditu).'; +COMMENT ON COLUMN ems.baseline_load_forecast_accuracy.forecast_baseline_w IS 'Vstup z planning_interval při uložení plánu.'; +COMMENT ON COLUMN ems.baseline_load_forecast_accuracy.actual_baseline_w IS 'Skutečný bazál W (shodná definice jako fn_update_baseline_stats).'; +COMMENT ON COLUMN ems.baseline_load_forecast_accuracy.lead_time_hours IS 'Hodiny mezi created_at plánu a začátkem intervalu.'; + diff --git a/db/routines/R__fn_fill_baseline_load_forecast_accuracy.sql b/db/routines/R__fn_fill_baseline_load_forecast_accuracy.sql new file mode 100644 index 0000000..1f1a52f --- /dev/null +++ b/db/routines/R__fn_fill_baseline_load_forecast_accuracy.sql @@ -0,0 +1,106 @@ +-- Doplní jeden řádek baseline_load_forecast_accuracy po vyplnění auditu (stejný interval). + +CREATE OR REPLACE FUNCTION ems.fn_fill_baseline_load_forecast_accuracy( + p_site_id INT, + p_interval_start TIMESTAMPTZ +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + v_run_id INT; + v_forecast INT; + v_actual_load INT; + v_actual_ev INT; + v_actual_hp INT; + v_actual_baseline INT; + v_err INT; + v_pct NUMERIC(8,4); + v_lead NUMERIC(6,2); + v_created TIMESTAMPTZ; +BEGIN + SELECT + ai.planning_run_id, + ai.actual_load_power_w, + ai.actual_ev_power_w, + ai.actual_heat_pump_power_w + INTO v_run_id, v_actual_load, v_actual_ev, v_actual_hp + FROM ems.audit_interval ai + WHERE ai.site_id = p_site_id + AND ai.interval_start = p_interval_start; + + IF v_run_id IS NULL THEN + RETURN; + END IF; + + SELECT pi.load_baseline_w + INTO v_forecast + FROM ems.planning_interval pi + WHERE pi.run_id = v_run_id + AND pi.interval_start = p_interval_start; + + IF v_forecast IS NULL THEN + RETURN; + END IF; + + IF v_actual_load IS NULL THEN + RETURN; + END IF; + + v_actual_baseline := GREATEST(0, + v_actual_load + - COALESCE(v_actual_ev, 0) + - COALESCE(v_actual_hp, 0) + ); + + v_err := v_forecast - v_actual_baseline; + IF v_actual_baseline > 0 THEN + v_pct := (v_err::NUMERIC / v_actual_baseline) * 100.0; + ELSE + v_pct := NULL; + END IF; + + SELECT pr.created_at INTO v_created + FROM ems.planning_run pr + WHERE pr.id = v_run_id; + + IF v_created IS NOT NULL THEN + v_lead := EXTRACT(EPOCH FROM (p_interval_start - v_created)) / 3600.0; + ELSE + v_lead := NULL; + END IF; + + INSERT INTO ems.baseline_load_forecast_accuracy ( + site_id, + interval_start, + planning_run_id, + forecast_baseline_w, + actual_baseline_w, + error_w, + error_pct, + lead_time_hours, + filled_at + ) VALUES ( + p_site_id, + p_interval_start, + v_run_id, + v_forecast, + v_actual_baseline, + v_err, + v_pct, + v_lead, + now() + ) + ON CONFLICT (site_id, interval_start) DO UPDATE SET + planning_run_id = EXCLUDED.planning_run_id, + forecast_baseline_w = EXCLUDED.forecast_baseline_w, + actual_baseline_w = EXCLUDED.actual_baseline_w, + error_w = EXCLUDED.error_w, + error_pct = EXCLUDED.error_pct, + lead_time_hours = EXCLUDED.lead_time_hours, + filled_at = EXCLUDED.filled_at; +END; +$$; + +COMMENT ON FUNCTION ems.fn_fill_baseline_load_forecast_accuracy IS +'Po fn_fill_audit_interval: uloží odchylku plánovaného load_baseline vs. skutečný bazál z auditu.'; diff --git a/db/views/R__vw_baseline_load_forecast_accuracy.sql b/db/views/R__vw_baseline_load_forecast_accuracy.sql new file mode 100644 index 0000000..8951221 --- /dev/null +++ b/db/views/R__vw_baseline_load_forecast_accuracy.sql @@ -0,0 +1,12 @@ +CREATE OR REPLACE VIEW ems.vw_baseline_load_forecast_accuracy_daily AS +SELECT + site_id, + date_trunc('day', interval_start AT TIME ZONE 'Europe/Prague') AS day_prague, + COUNT(*) AS slot_count, + AVG(ABS(error_w))::NUMERIC(12,2) AS mae_w, + AVG(error_pct) FILTER (WHERE error_pct IS NOT NULL)::NUMERIC(8,4) AS avg_error_pct +FROM ems.baseline_load_forecast_accuracy +GROUP BY site_id, date_trunc('day', interval_start AT TIME ZONE 'Europe/Prague'); + +COMMENT ON VIEW ems.vw_baseline_load_forecast_accuracy_daily IS +'Denní souhrn přesnosti predikce bazální spotřeby (|chyba| v průměru W).'; diff --git a/db/views/R__z_postgrest_ems_anon_grants.sql b/db/views/R__z_postgrest_ems_anon_grants.sql index 54893ae..907f376 100644 --- a/db/views/R__z_postgrest_ems_anon_grants.sql +++ b/db/views/R__z_postgrest_ems_anon_grants.sql @@ -18,3 +18,5 @@ GRANT SELECT ON ems.vw_forecast_accuracy_daily TO ems_anon; GRANT SELECT ON ems.consumption_baseline_stats TO ems_anon; GRANT SELECT ON ems.market_price_stats TO ems_anon; GRANT SELECT ON ems.tuv_usage_stats TO ems_anon; +GRANT SELECT ON ems.baseline_load_forecast_accuracy TO ems_anon; +GRANT SELECT ON ems.vw_baseline_load_forecast_accuracy_daily TO ems_anon; diff --git a/docs/03-data-model.md b/docs/03-data-model.md index 18ddfc0..64289a0 100644 --- a/docs/03-data-model.md +++ b/docs/03-data-model.md @@ -110,8 +110,8 @@ CREATE TABLE asset_battery ( inverter_id INT REFERENCES asset_inverter(id), code TEXT NOT NULL, usable_capacity_wh INT NOT NULL, -- 64000 - min_soc_percent NUMERIC(5,2) DEFAULT 10, - reserve_soc_percent NUMERIC(5,2) DEFAULT 20, -- rezerva pro výpadek + min_soc_percent NUMERIC(5,2) DEFAULT 10, -- absolutní podlaha LP + reserve_soc_percent NUMERIC(5,2) DEFAULT 20, -- ekonomická podlaha (export/arbitráž) max_soc_percent NUMERIC(5,2) DEFAULT 95, charge_efficiency NUMERIC(5,4) DEFAULT 0.95, discharge_efficiency NUMERIC(5,4) DEFAULT 0.95, @@ -362,10 +362,15 @@ CREATE TABLE planning_interval ( expected_cost_czk NUMERIC(10,4), effective_buy_price NUMERIC(10,6), effective_sell_price NUMERIC(10,6), + -- + sloupce z migrací (curtailment, EV1/2, predicted price, vstupy solveru): + -- load_baseline_w, pv_a_forecast_raw_w, pv_b_forecast_raw_w, + -- pv_a_forecast_solver_w, pv_b_forecast_solver_w PRIMARY KEY (run_id, interval_start) ); ``` +Tabulka `baseline_load_forecast_accuracy` (migrace V027+) ukládá zpětně plánovaný bazál vs. skutečný bazál z auditu; plní `fn_fill_baseline_load_forecast_accuracy` po `fn_fill_audit_interval`. + --- ## Audit diff --git a/docs/04-modules/consumption.md b/docs/04-modules/consumption.md index c8f3a68..6ac22a3 100644 --- a/docs/04-modules/consumption.md +++ b/docs/04-modules/consumption.md @@ -42,7 +42,7 @@ bazální_w = load_power_w - ev_power_w - heat_pump_power_w **Predikce do horizontu:** **`ems.fn_get_baseline_forecast(site_id, from, to)`** generuje 15min sloty (`generate_series`), pro každý slot najde řádek podle DOW+hodiny v Praze. **`forecast_w`** = uložený průměr; **`confidence_w`** = konzervativní odhad `avg + 0.5 * COALESCE(stddev, 100)`. Pokud pro slot neexistuje statistika, fallback **`forecast_w = 500` W** (málo nebo žádná historie; prakticky odpovídá situaci před ~4 týdny kvalitních dat v jednotlivých hodinách). Směrodatná odchylka je v DB k dispozici pro budoucí použití v solveru (fáze 2). -**Solver (`planning_engine._load_slots`):** pro každý 15min interval efektivní ceny bere **`avg_power_w` z `consumption_baseline_stats`** podle DOW+hodiny slotu, jinak **500 W** – nečte `consumption_baseline_interval`. +**Solver (`planning_engine._load_slots`):** pro každý 15min interval efektivní ceny bere **`avg_power_w` z `consumption_baseline_stats`** podle DOW+hodiny slotu, jinak **500 W** – nečte `consumption_baseline_interval`. Stejná hodnota se ukládá do **`planning_interval.load_baseline_w`** při každém běhu plánovače (přehled v UI / PostgREST). Odchylka vs. skutečnost: tabulka **`baseline_load_forecast_accuracy`**, plněno po auditu. > **Poznámka:** TUV jako samostatný odečet zůstává otevřený bod, pokud není měřen zvlášť; aktuálně je TČ zahrnut v `heat_pump_power_w`. diff --git a/docs/04-modules/modbus-registers.md b/docs/04-modules/modbus-registers.md index cb15aa8..f775fe5 100644 --- a/docs/04-modules/modbus-registers.md +++ b/docs/04-modules/modbus-registers.md @@ -76,7 +76,7 @@ Deye má 6 časových bloků. EMS přepisuje **bloky 1–2** při každém `cont |------|---------------------------|-------------|------|---------|-------------| | 1 | **`current_slot_hhmm()`** – začátek **probíhajícího** 15min slotu | `planning_interval` pro **aktuální** slot (`_fetch_plan_row_for_slot_offset(..., 0)`) | PASSIVE / SELL / CHARGE dle `_deye_tou_params` | viz tabulka níže | viz tabulka níže | | 2 | **`next_slot_hhmm()`** – začátek **následujícího** 15min slotu | `planning_interval` pro **další** slot (`_fetch_plan_row_for_slot_offset(..., 1)`) | Přechod na další čtvrthodinu | viz tabulka níže | viz tabulka níže | -| 3–6 | 23:59 | — | Neaktivní (rezerva) | `reserve_soc` (DB) | NE | +| 3–6 | **23:55** (2355) | — | Neaktivní (rezerva); ne 23:59 — firmware Deye často 2359 neuloží → verify mismatch | `reserve_soc` (DB) | NE | **Registry 108 / 109 / 142 / 178 / 143** odpovídají **aktuálnímu** plánu (okamžitý výstup; `setpoints_now` v `write_inverter_setpoints`). TOU řádky 1–2 doplňují stejnou logiku pro časové segmenty (`_deye_tou_params`). diff --git a/docs/04-modules/planning.md b/docs/04-modules/planning.md index 4df5c49..bc4992a 100644 --- a/docs/04-modules/planning.md +++ b/docs/04-modules/planning.md @@ -13,14 +13,19 @@ - **Runtime guard v exportu setpointů:** - při `AUTO` + `is_predicted_price=true` se na exportní vrstvě vynutí PASSIVE/no-export chování. - **Ekonomika baterie:** - - `reserve_soc_percent` naladěn na 10 %, - - `degradation_cost_czk_kwh` naladěn na 0.1500, - - penalizace cyklu je v objective symetrická (`0.5*(charge+discharge)`). + - `min_soc_percent` = tvrdá spodní mez SoC v LP (typicky 10 %), + - `reserve_soc_percent` = ekonomická („arbitrážní“) podlaha – pod ní MILP s binární proměnnou omezuje vybíjení tak, aby export z baterie nečerpal hluboké pásmo (typicky 20 %; migrace V027 může vrátit hodnotu po V026), + - `degradation_cost_czk_kwh` (např. 0.15) / penalizace cyklu v objective symetrická (`0.5*(charge+discharge)`). - **PV-aware nejistota:** - objective používá `pv_scarcity_factor` (0.65..1.0), odvozený z forecastu slunce, - při slabém slunci je plán ochotnější držet energii v baterii. -- **SoC buffer bez hard pravidel:** - - místo explicitních pravidel se používá ekonomická penalizace deficitu vůči bezpečnostnímu SoC cíli na konci 24h horizontu. +- **SoC buffer:** + - měkký cíl na konci 24h přes `_soc_security_profile` + tvrdé dvouúrovňové pravidlo výše. +- **Dynamická ekonomická podlaha (fáze 2):** + - `_dynamic_arb_floor_wh_series`: podle součtu FVE výkonu v dalších ~8 h (`ARB_LOOKAHEAD_SLOTS`) se `arb_floor_wh[t]` posouvá mezi `min_soc_wh` a rezervou z DB – silné očekávané slunce ji sníží (ráno / po obloze); vynutit konstantní chování lze `battery.disable_dynamic_arb_floor=True` jen pro testy / ladění. +- **Záporná nákupní cena:** + - horní mez `grid_import` zahrnuje `load_baseline_w` + nabíjení/EV/TČ (bez nekonečného importu). +- **Uložené vstupy plánu** (`planning_interval`): `load_baseline_w`, `pv_*_forecast_raw_w`, `pv_*_forecast_solver_w` pro UI a audit. Solver optimalizuje celý horizont (typicky 36h) najednou, čímž přirozeně zvládá: - pohled dopředu (ráno ví že přes poledne bude záporná cena → prodává z baterie) @@ -179,11 +184,11 @@ soc[0] == current_soc_wh # počáteční podmínka z telemetrie ### SoC limity ```python -soc_min_wh <= soc[t] <= soc_max_wh +soc_min_wh <= soc[t] <= soc_max_wh # min_soc_percent z DB (např. 10 %) -# Rezerva pro výpadek sítě – nikdy nesahat -soc_reserve_wh = battery.reserve_soc_percent / 100 * battery.usable_capacity_wh -soc[t] >= soc_reserve_wh # za normálních podmínek +# Ekonomická podlaha (reserve_soc_percent, např. 20 %): binární w_arb[t] v MILP – +# pod touto hranicí je bd omezeno na load+EV+TČ+bc (žádné „nadbytečné“ vybíjení pro export z baterie). +# Měkký buffer na konci 24h dál přes soc_deficit_24h. ``` ### Limity výkonu @@ -266,7 +271,7 @@ def solve_dispatch( batt_charge = [pulp.LpVariable(f"bc_{t}", 0, battery.max_charge_power_w) for t in range(T)] batt_discharge = [pulp.LpVariable(f"bd_{t}", 0, battery.max_discharge_power_w) for t in range(T)] soc = [pulp.LpVariable(f"soc_{t}", - battery.reserve_soc_wh, + battery.min_soc_wh, battery.soc_max_wh) for t in range(T)] curtail_a = [pulp.LpVariable(f"ca_{t}", 0, slots[t].pv_a_forecast_w) for t in range(T)] ev_charge = [pulp.LpVariable(f"ev_{t}", 0, ev_max_total_w) for t in range(T)] diff --git a/frontend/src/pages/Dashboard.tsx b/frontend/src/pages/Dashboard.tsx index 7bb1f07..e272e32 100644 --- a/frontend/src/pages/Dashboard.tsx +++ b/frontend/src/pages/Dashboard.tsx @@ -1,6 +1,7 @@ import type { ChartArea } from 'chart.js' import { Activity, Battery, ChevronDown, ChevronUp, Sun, Zap } from 'lucide-react' import { memo, useCallback, useEffect, useState } from 'react' +import { useNavigate } from 'react-router-dom' import { EnergyChart } from '../components/charts/EnergyChart' import { ForecastPanel } from '../components/charts/ForecastPanel' @@ -57,6 +58,7 @@ function MetricSkeleton() { } export function Dashboard() { + const navigate = useNavigate() const { site: siteRow, ready: siteReady, error: siteErr } = useSiteStatus() const siteId = siteRow?.site_id ?? null const data = useDashboardData(siteId) @@ -172,7 +174,7 @@ export function Dashboard() { activatedAt={modeActivatedAt} nextReplanIn={nextReplanIn} onReplan={handleReplan} - onModeChange={() => {}} + onModeChange={() => navigate('/settings')} /> ) : null} diff --git a/frontend/src/types/react-router-dom-ambient.d.ts b/frontend/src/types/react-router-dom-ambient.d.ts index 1943566..ccba5ca 100644 --- a/frontend/src/types/react-router-dom-ambient.d.ts +++ b/frontend/src/types/react-router-dom-ambient.d.ts @@ -6,4 +6,6 @@ declare module 'react-router-dom' { export function Route(props: Record): JSX.Element | null export function Outlet(): JSX.Element | null export function NavLink(props: Record): JSX.Element + /** Zjednodušená deklarace – celý modul je zde ručně kvůli buildu bez @types balíčku. */ + export function useNavigate(): (to: string) => void }