uprava zapisovani casu do deye (nevyvolava self_sustain), notifikace na discord pri zmene rezimu
Some checks failed
deploy / deploy (push) Failing after 20s
test / smoke-test (push) Successful in 3s

This commit is contained in:
Dusan Vojacek
2026-04-06 20:53:58 +02:00
parent 4881966d00
commit c955efb9cb
9 changed files with 182 additions and 96 deletions

View File

@@ -40,6 +40,10 @@ from services.control_exporter import (
)
from services.heartbeat_service import send_heartbeat
from services.forecast_service import fetch_pv_forecast
from services.notification_service import (
notify_operating_mode_changed,
run_fn_set_mode_with_discord,
)
from services.price_importer import import_ote_prices
from services.telemetry_collector import run_telemetry_loop_wrapper
from pydantic import BaseModel, Field
@@ -123,7 +127,15 @@ async def lifespan(app: FastAPI):
async def scheduled_expire_modes() -> None:
async with app.state.pg_pool.acquire() as conn:
try:
await conn.fetchval("SELECT ems.fn_expire_modes()")
rows = await conn.fetch("SELECT * FROM ems.fn_expire_modes()")
for r in rows:
await notify_operating_mode_changed(
str(r["site_code"]),
str(r["old_mode"]),
str(r["new_mode"]),
"system:expiry",
"Automatické vypršení dočasného režimu",
)
except Exception:
logger.exception("scheduled_expire_modes failed")
@@ -1215,8 +1227,8 @@ async def set_site_mode(
raise HTTPException(status_code=404, detail="Site not found")
try:
await conn.execute(
"SELECT ems.fn_set_mode($1, $2, $3, $4, $5)",
await run_fn_set_mode_with_discord(
conn,
site_id,
mode,
"user:api",

View File

@@ -178,8 +178,9 @@ def _deye_should_skip_time_sync_after_read(
r64: int,
) -> bool:
"""
True = nezařazovat zápis 6264: drift je malý a od posledního úspěšného zápisu času
neuplynul 24h (deye_last_system_time_sync_at se mění jen při zápisu, ne při přeskočení).
True = nezařazovat zápis 6264: drift je malý a od posledního úspěšného ověření času
(status verified v journalu 6264) neuplynul 24h — sloupec deye_last_system_time_sync_at
se doplňuje jen po tolerančním ověření v _verify_deye_clock_command_run.
"""
dev = _deye_registers_to_prague_datetime(r62, r63, r64)
if dev is None:
@@ -438,9 +439,11 @@ async def execute_modbus_commands(
async def _switch_to_self_sustain(site_id: int, db: asyncpg.Connection, reason: str) -> None:
"""Přepne lokalitu na SELF_SUSTAIN a zaloguje důvod."""
await db.execute(
"SELECT ems.fn_set_mode($1, $2, $3, $4, $5)",
"""Přepne lokalitu na SELF_SUSTAIN, zaloguje důvod a při změně pošle Discord."""
from services.notification_service import run_fn_set_mode_with_discord
await run_fn_set_mode_with_discord(
db,
site_id,
"SELF_SUSTAIN",
"system:mismatch",
@@ -461,8 +464,8 @@ async def _verify_deye_clock_command_run(
Při mismatch retry všech tří řádků journalu společně.
"""
from services.notification_service import (
notify_modbus_clock_verify_exhausted,
notify_modbus_mismatch,
notify_self_sustain_activated,
)
run_s = sorted(run, key=lambda c: int(c["register"]))
@@ -487,6 +490,17 @@ async def _verify_deye_clock_command_run(
)
if clock_ok:
inv_asset_id = int(run_s[0]["asset_id"])
await db.execute(
"""
UPDATE ems.asset_inverter
SET deye_last_system_time_sync_minute = $1,
deye_last_system_time_sync_at = now()
WHERE id = $2
""",
_prague_minute_start_utc(),
inv_asset_id,
)
for cmd, actual in zip(run_s, values):
logger.info(
"[cmd %s] verified OK (clock tolerant): %s 0x%04X=%s",
@@ -537,26 +551,15 @@ async def _verify_deye_clock_command_run(
await verify_modbus_commands(ids_ordered, db, site_id)
else:
logger.critical(
"[cmd clock] 3 failed attempts (6264 batch), switching to SELF_SUSTAIN"
"[cmd clock] 3 failed verify attempts (6264); režim se nemění automaticky"
)
site = await db.fetchrow("SELECT code FROM ems.site WHERE id=$1", site_id)
await _switch_to_self_sustain(
site_id,
db,
reason=(
f"Modbus mismatch po 3 pokusech: {cmd0['asset_code']} "
"regs 6264 (system time)"
),
await notify_modbus_clock_verify_exhausted(
site["code"] if site else str(site_id),
str(cmd0["asset_code"]),
(w62, w63, w64),
(a62, a63, a64),
)
if site:
await notify_self_sustain_activated(
site["code"],
(
f"Modbus mismatch: {cmd0['asset_code']} "
f"regs 6264 (system time) written=({w62},{w63},{w64}) "
f"actual=({a62},{a63},{a64})"
),
)
return False
@@ -569,10 +572,7 @@ async def verify_modbus_commands(
Přečte registry zpět (FC 3 po souvislých blocích) a porovná s value_to_write.
Při mismatch: retry → SELF_SUSTAIN + Discord.
"""
from services.notification_service import (
notify_modbus_mismatch,
notify_self_sustain_activated,
)
from services.notification_service import notify_modbus_mismatch
async def _apply_verify_result(cmd: asyncpg.Record, actual_i: int) -> bool:
"""Vrátí True při shodě, False při mismatch (a obslouží retry / SELF_SUSTAIN)."""
@@ -624,9 +624,6 @@ async def verify_modbus_commands(
"[cmd %s] 3 failed attempts, switching to SELF_SUSTAIN",
cmd_id,
)
site = await db.fetchrow(
"SELECT code FROM ems.site WHERE id=$1", site_id
)
await _switch_to_self_sustain(
site_id,
db,
@@ -635,15 +632,6 @@ async def verify_modbus_commands(
f"reg 0x{cmd['register']:04X}"
),
)
if site:
await notify_self_sustain_activated(
site["code"],
(
f"Modbus mismatch: {cmd['asset_code']} "
f"0x{cmd['register']:04X} expected={expected_i} "
f"actual={actual_i}"
),
)
return False
logger.info(
@@ -726,7 +714,17 @@ async def _fetch_operating_mode(site_id: int, db: asyncpg.Connection) -> Operati
if vu.tzinfo is None:
vu = vu.replace(tzinfo=timezone.utc)
if vu <= now_utc:
await db.execute("SELECT ems.fn_expire_modes()")
exp_rows = await db.fetch("SELECT * FROM ems.fn_expire_modes()")
from services.notification_service import notify_operating_mode_changed
for er in exp_rows:
await notify_operating_mode_changed(
str(er["site_code"]),
str(er["old_mode"]),
str(er["new_mode"]),
"system:expiry",
"Automatické vypršení dočasného režimu",
)
row = await db.fetchrow(sql, site_id)
if row is None:
return None
@@ -1183,7 +1181,6 @@ async def write_inverter_setpoints(
logger.info("Deye time will sync: %s CET", now_cet.strftime("%Y-%m-%d %H:%M:%S"))
registers: list[tuple[int, str, int]] = [] if skip_time else list(time_rows)
time_rows_were_scheduled = not skip_time
sp_tp2 = setpoints_next if setpoints_next is not None else setpoints_now
hh_cur = current_slot_hhmm()
@@ -1268,22 +1265,10 @@ async def write_inverter_setpoints(
inactive_sig,
inv.id,
)
if time_rows_were_scheduled:
await db.execute(
"""
UPDATE ems.asset_inverter
SET deye_last_system_time_sync_minute = $1,
deye_last_system_time_sync_at = now()
WHERE id = $2
""",
_prague_minute_start_utc(),
inv.id,
)
return (
f"OK inverter: batt_w={raw_bat!r} (no changes vs last verified Modbus snapshot)"
)
will_write_time = any(int(r) in (62, 63, 64) for r, _, _ in registers)
will_write_inactive = any(
int(r) in _DEYE_INACTIVE_TOU_REGISTERS for r, _, _ in registers
)
@@ -1305,18 +1290,6 @@ async def write_inverter_setpoints(
return f"FAIL inverter: {inv.code}: Modbus write failed (see modbus_command)"
logger.info("[control] Inverter %s journal write OK", inv.code)
minute_utc = _prague_minute_start_utc()
if will_write_time:
await db.execute(
"""
UPDATE ems.asset_inverter
SET deye_last_system_time_sync_minute = $1,
deye_last_system_time_sync_at = now()
WHERE id = $2
""",
minute_utc,
inv.id,
)
if need_inactive_tou or will_write_inactive:
await db.execute(
"""

View File

@@ -3,7 +3,9 @@
from __future__ import annotations
import logging
from datetime import datetime
import asyncpg
import httpx
from app.config import get_settings
@@ -11,6 +13,80 @@ from app.config import get_settings
logger = logging.getLogger(__name__)
def _discord_level_for_mode_change(activated_by: str) -> str:
if activated_by == "system:mismatch":
return "critical"
if activated_by.startswith("system:"):
return "warning"
return "info"
async def notify_operating_mode_changed(
site_code: str,
previous_mode: str,
new_mode: str,
activated_by: str,
notes: str | None,
*,
level: str | None = None,
) -> None:
lvl = level or _discord_level_for_mode_change(activated_by)
note_line = f"\nPoznámka: {notes}" if notes else ""
msg = (
f"Přepnutí provozního režimu lokalita `{site_code}`\n"
f"**{previous_mode}** → **{new_mode}**\n"
f"Aktivoval: `{activated_by}`{note_line}"
)
await send_discord(msg, level=lvl)
async def run_fn_set_mode_with_discord(
conn: asyncpg.Connection,
site_id: int,
mode_code: str,
activated_by: str,
valid_until: datetime | None,
notes: str | None,
*,
notify_level: str | None = None,
) -> str:
"""
Zavolá ems.fn_set_mode. Při skutečné změně režimu pošle Discord (pokud je webhook).
Vrátí aktuální mode_code z DB po volání.
"""
prev = await conn.fetchval(
"SELECT mode_code FROM ems.site_operating_mode WHERE site_id = $1",
site_id,
)
await conn.execute(
"SELECT ems.fn_set_mode($1, $2, $3, $4, $5)",
site_id,
mode_code,
activated_by,
valid_until,
notes,
)
new = await conn.fetchval(
"SELECT mode_code FROM ems.site_operating_mode WHERE site_id = $1",
site_id,
)
if new is None:
new = mode_code
if prev is not None and prev != new:
site_code = await conn.fetchval(
"SELECT code FROM ems.site WHERE id = $1", site_id
)
await notify_operating_mode_changed(
site_code or str(site_id),
str(prev),
str(new),
activated_by,
notes,
level=notify_level,
)
return str(new)
async def send_discord(message: str, level: str = "info") -> bool:
"""
Pošle notifikaci na Discord webhook.
@@ -65,6 +141,21 @@ async def notify_self_sustain_activated(site_code: str, reason: str) -> None:
await send_discord(msg, level="critical")
async def notify_modbus_clock_verify_exhausted(
site_code: str,
asset_code: str,
written: tuple[int, int, int],
actual: tuple[int, int, int],
) -> None:
msg = (
f"Modbus **systémový čas 6264** po 3 neúspěšných ověřeních **bez** přepnutí režimu.\n"
f"Lokalita `{site_code}`, zařízení `{asset_code}`\n"
f"Zapsáno: `{written}` | Přečteno: `{actual}`\n"
f"Doporučení: zkontrolovat firmware/RS485; režim EMS se nemění automaticky."
)
await send_discord(msg, level="critical")
async def notify_daily_economics(
site_code: str,
day: str,