Discord bot fáze B: tlačítka na EV zprávě → patch session + okamžitý replan
All checks were successful
CI and deploy / migration-check (push) Successful in 21s
CI and deploy / deploy (push) Has been skipped

services/discord_bot.py: gateway klient jako lifespan task (spojení ven,
žádný veřejný endpoint; bez DISCORD_BOT_TOKEN tiše spí). Tlačítka
[za 2h][za 4h][ráno][do plna][nenabíjet] s custom_id ev:<site>:<charger>:<akce>
(přežijí restart); whitelist DISCORD_ALLOWED_USER_IDS; akce = fn_ev_session_
apply_patch → run_rolling_replan → export_setpoints → edit zprávy novým plánem.

services/ev_notify.py: sdílený builder souhrnu (vyčleněno z collectoru),
send bot-first s webhook fallbackem. requirements: discord.py>=2.4.
7 testů helperů (parse, deadline akce vč. morning přes Prague TZ).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dusan Vojacek
2026-06-12 11:41:05 +02:00
parent 08a43aa236
commit 0e7f7b69ae
8 changed files with 435 additions and 88 deletions

View File

@@ -45,6 +45,11 @@ class Settings(BaseSettings):
planning_hp_max_cost_czk_kwh: float = Field(default=3.0)
planning_cheap_price_threshold: float = Field(default=0.85)
planning_expensive_price_threshold: float = Field(default=1.15)
# Discord bot — fáze B tlačítka (docs/discord-ev-interaction.md); prázdné = jen webhook
discord_bot_token: str = Field(default="")
discord_ev_channel_id: str = Field(default="")
discord_allowed_user_ids: str = Field(default="")
# Tesla Fleet API (docs/tesla-fleet-api.md); prázdné = integrace vypnutá
tesla_client_id: str = Field(default="")
tesla_client_secret: str = Field(default="")

View File

@@ -539,6 +539,11 @@ async def lifespan(app: FastAPI):
telemetry_task = asyncio.create_task(run_telemetry_loop_wrapper(app.state.pg_pool))
app.state.telemetry_task = telemetry_task
from services.discord_bot import run_discord_bot, set_pool as discord_set_pool
discord_set_pool(app.state.pg_pool)
discord_task = asyncio.create_task(run_discord_bot())
app.state.discord_task = discord_task
yield
@@ -547,6 +552,11 @@ async def lifespan(app: FastAPI):
logging.getLogger().removeHandler(ws_h)
app.state.ws_log_handler = None
discord_task.cancel()
try:
await discord_task
except (asyncio.CancelledError, Exception):
pass
telemetry_task.cancel()
try:
await telemetry_task

View File

@@ -12,3 +12,4 @@ pvlib>=0.11.0
pandas>=2.2.0
numpy>=2.0.0
httpx>=0.28.0
discord.py>=2.4.0

View File

@@ -0,0 +1,234 @@
"""Discord bot (gateway) — interaktivní EV zprávy s tlačítky.
Architektura: websocket spojení jde Z BACKENDU VEN (žádný veřejný endpoint,
EMS zůstává na VPN). Bot reaguje výhradně na whitelisted user ID a jediné,
co umí, je patch otevřené EV session + okamžitý replan — žádné režimy,
žádné registry. Bez DISCORD_BOT_TOKEN modul tiše spí (fáze A webhook).
Tlačítka (custom_id "ev:<site_id>:<charger_code>:<akce>" — DynamicItem
template, takže fungují i po restartu backendu):
h2 / h4 — odjezd za 2 / 4 hodiny (deadline = teď + N h)
morning — ráno (další výskyt default_deadline_hour vozidla, Prague)
full — do plna hned (target 100 %, deadline za hodinu → max tempo)
stop — nenabíjet (target = SoC při připojení)
Postup zřízení bota: docs/discord-ev-interaction.md.
"""
from __future__ import annotations
import asyncio
import logging
import re
from datetime import datetime, timedelta, timezone
from typing import Any
from zoneinfo import ZoneInfo
import asyncpg
from app.config import get_settings
logger = logging.getLogger(__name__)
_PRAGUE = ZoneInfo("Europe/Prague")
_POOL: asyncpg.Pool | None = None
_CLIENT: Any = None # discord.Client za lazy importem
CUSTOM_ID_RE = re.compile(r"^ev:(?P<site>\d+):(?P<charger>[a-z0-9\-]+):(?P<action>h2|h4|morning|full|stop)$")
ACTION_LABELS = [
("h2", "🕑 za 2 h"),
("h4", "🕓 za 4 h"),
("morning", "🌅 ráno"),
("full", "⚡ do plna"),
("stop", "✋ nenabíjet"),
]
def parse_custom_id(cid: str) -> tuple[int, str, str] | None:
m = CUSTOM_ID_RE.match(cid or "")
if not m:
return None
return int(m.group("site")), m.group("charger"), m.group("action")
def action_to_patch(
action: str,
*,
now: datetime,
soc_at_connect: float | None,
default_deadline_hour: int | None,
) -> dict:
"""Patch pro fn_ev_session_apply_patch dle tlačítka (čisté, testovatelné)."""
if action == "h2":
return {"target_deadline": (now + timedelta(hours=2)).isoformat()}
if action == "h4":
return {"target_deadline": (now + timedelta(hours=4)).isoformat()}
if action == "morning":
hour = int(default_deadline_hour or 7)
local = now.astimezone(_PRAGUE)
candidate = local.replace(hour=hour, minute=0, second=0, microsecond=0)
if candidate <= local:
candidate += timedelta(days=1)
return {"target_deadline": candidate.isoformat()}
if action == "full":
return {
"target_soc_pct": 100,
"target_deadline": (now + timedelta(hours=1)).isoformat(),
}
if action == "stop":
return {"target_soc_pct": float(soc_at_connect or 0)}
raise ValueError(f"unknown action {action}")
def set_pool(pool: asyncpg.Pool) -> None:
global _POOL
_POOL = pool
def _allowed_user_ids() -> set[int]:
raw = (getattr(get_settings(), "discord_allowed_user_ids", "") or "").strip()
out: set[int] = set()
for part in raw.split(","):
part = part.strip()
if part.isdigit():
out.add(int(part))
return out
async def post_ev_arrival(
site_id: int, charger_code: str, session_id: int, text: str
) -> bool:
"""Pošle zprávu s tlačítky přes bota. False = bot neběží/není kanál (fallback webhook)."""
if _CLIENT is None or not _CLIENT.is_ready():
return False
import discord
channel_id = int(getattr(get_settings(), "discord_ev_channel_id", 0) or 0)
if not channel_id:
return False
channel = _CLIENT.get_channel(channel_id)
if channel is None:
return False
view = discord.ui.View(timeout=None)
for action, label in ACTION_LABELS:
view.add_item(
discord.ui.Button(
label=label,
style=discord.ButtonStyle.secondary,
custom_id=f"ev:{site_id}:{charger_code}:{action}",
)
)
await channel.send(content=text, view=view)
return True
async def _handle_action(interaction: Any, site_id: int, charger_code: str, action: str) -> None:
import json
from services.control_exporter import export_setpoints
from services.ev_notify import build_ev_plan_summary, get_open_session
from services.planning_engine import run_rolling_replan
assert _POOL is not None
async with _POOL.acquire() as conn:
sess = await get_open_session(site_id, charger_code, conn)
if sess is None:
await interaction.followup.send(
"Session už není otevřená (auto odpojeno?).", ephemeral=True
)
return
patch = action_to_patch(
action,
now=datetime.now(timezone.utc),
soc_at_connect=sess["soc_at_connect_pct"],
default_deadline_hour=sess["default_deadline_hour"],
)
await conn.fetchval(
"select ems.fn_ev_session_apply_patch($1::int, $2::int, $3::jsonb)",
site_id,
int(sess["session_id"]),
json.dumps(patch),
)
await run_rolling_replan(
site_id, conn, triggered_by=f"discord:{action}:{charger_code}"
)
await export_setpoints(site_id, conn)
new_text = await build_ev_plan_summary(site_id, charger_code, conn)
label = dict(ACTION_LABELS).get(action, action)
if new_text:
await interaction.message.edit(
content=new_text + f"\n_(upraveno: {label})_", view=interaction.message.components and _rebuild_view(site_id, charger_code) or None
)
await interaction.followup.send(f"Přeplánováno ✓ ({label})", ephemeral=True)
def _rebuild_view(site_id: int, charger_code: str):
import discord
view = discord.ui.View(timeout=None)
for action, label in ACTION_LABELS:
view.add_item(
discord.ui.Button(
label=label,
style=discord.ButtonStyle.secondary,
custom_id=f"ev:{site_id}:{charger_code}:{action}",
)
)
return view
async def run_discord_bot() -> None:
"""Lifespan task: připojí gateway a obsluhuje tlačítka. Bez tokenu hned končí."""
token = (getattr(get_settings(), "discord_bot_token", "") or "").strip()
if not token:
logger.info("Discord bot: token není nastaven — fáze B vypnuta")
return
import discord
intents = discord.Intents.default()
client = discord.Client(intents=intents)
@client.event
async def on_ready() -> None:
logger.info("Discord bot připojen jako %s", client.user)
@client.event
async def on_interaction(interaction: discord.Interaction) -> None:
if interaction.type != discord.InteractionType.component:
return
cid = (interaction.data or {}).get("custom_id", "")
parsed = parse_custom_id(str(cid))
if parsed is None:
return
allowed = _allowed_user_ids()
if allowed and interaction.user.id not in allowed:
await interaction.response.send_message(
"Tohle tlačítko není pro tebe. 🙂", ephemeral=True
)
return
await interaction.response.defer(ephemeral=True, thinking=True)
site_id, charger_code, action = parsed
try:
await _handle_action(interaction, site_id, charger_code, action)
except Exception:
logger.exception("Discord akce selhala (%s)", cid)
try:
await interaction.followup.send(
"Akce selhala — mrkni do logů.", ephemeral=True
)
except Exception:
pass
global _CLIENT
_CLIENT = client
try:
await client.start(token)
except asyncio.CancelledError:
await client.close()
raise
except Exception:
logger.exception("Discord bot spadl — fáze B mimo provoz (fallback webhook)")
finally:
_CLIENT = None

View File

@@ -0,0 +1,115 @@
"""Souhrn EV nabíjecího plánu pro notifikace (Discord webhook i bot).
Sdílené mezi telemetry_collector (zpráva po příjezdu) a discord_bot
(přestavba zprávy po akci tlačítkem).
"""
from __future__ import annotations
import logging
from zoneinfo import ZoneInfo
import asyncpg
logger = logging.getLogger(__name__)
_PRAGUE = ZoneInfo("Europe/Prague")
async def get_open_session(
site_id: int, charger_code: str, conn: asyncpg.Connection
) -> asyncpg.Record | None:
return await conn.fetchrow(
"""
select es.id as session_id, es.soc_at_connect_pct, es.target_soc_pct,
es.target_deadline, v.battery_capacity_kwh, v.name as vehicle_name,
v.default_deadline_hour
from ems.ev_session es
join ems.asset_ev_charger c on c.id = es.charger_id
left join ems.asset_vehicle v on v.id = es.vehicle_id
where es.site_id = $1 and c.code = $2 and es.session_end is null
order by es.id desc limit 1
""",
site_id,
charger_code,
)
async def build_ev_plan_summary(
site_id: int, charger_code: str, conn: asyncpg.Connection
) -> str | None:
"""Markdown souhrn: stav baterie auta → cíl, deadline, nabíjecí okna z plánu."""
row = await get_open_session(site_id, charger_code, conn)
if row is None:
return None
ev_col = "ev1_setpoint_w" if charger_code.endswith("1") else "ev2_setpoint_w"
slots = await conn.fetch(
f"""
select pi.interval_start, pi.{ev_col} as w, pi.effective_buy_price
from ems.planning_interval pi
join ems.planning_run pr on pr.id = pi.run_id
where pr.site_id = $1 and pr.status = 'active'
and coalesce(pi.{ev_col}, 0) > 0
order by pi.interval_start
""",
site_id,
)
def _fmt(dt) -> str:
return dt.astimezone(_PRAGUE).strftime("%H:%M")
windows: list[str] = []
kwh = 0.0
prices: list[float] = []
if slots:
start = prev = slots[0]["interval_start"]
for r in slots:
ts = r["interval_start"]
if (ts - prev).total_seconds() > 900:
windows.append(f"{_fmt(start)}{_fmt(prev)} (+15m)")
start = ts
prev = ts
kwh += float(r["w"]) * 0.25 / 1000.0
prices.append(float(r["effective_buy_price"] or 0))
windows.append(f"{_fmt(start)}{_fmt(prev)} (+15m)")
soc = row["soc_at_connect_pct"]
tgt = row["target_soc_pct"]
cap = float(row["battery_capacity_kwh"] or 0)
need = max(0.0, (float(tgt or 0) - float(soc or 0)) / 100.0 * cap)
lines = [
f"🔌 **{row['vehicle_name'] or charger_code} připojeno**",
f"Baterie auta: **{soc if soc is not None else '?'} %** → cíl {tgt if tgt is not None else '?'} %"
+ (f" (~{need:.0f} kWh)" if need else ""),
]
dl = row["target_deadline"]
if dl is not None:
lines.append(f"Deadline: {dl.astimezone(_PRAGUE).strftime('%a %d.%m. %H:%M')}")
if windows:
avg_p = sum(prices) / max(1, len(prices))
lines.append(
f"Plán nabíjení: {'; '.join(windows[:4])}{kwh:.1f} kWh, ø {avg_p:.2f} Kč/kWh"
)
else:
lines.append("Plán nabíjení: zatím žádné sloty (čeká na levné okno / PV)")
return "\n".join(lines)
async def send_ev_arrival(site_id: int, charger_code: str, conn: asyncpg.Connection) -> None:
"""Pošle souhrn po příjezdu: přednostně bot s tlačítky, jinak webhook."""
from services.notification_service import send_discord
text = await build_ev_plan_summary(site_id, charger_code, conn)
if text is None:
return
try:
from services.discord_bot import post_ev_arrival
row = await get_open_session(site_id, charger_code, conn)
if row is not None and await post_ev_arrival(
site_id, charger_code, int(row["session_id"]), text
):
return
except Exception:
logger.exception("Discord bot post failed — fallback webhook")
await send_discord(conn, site_id, text, level="info")

View File

@@ -49,7 +49,9 @@ async def _on_ev_arrival(site_id: int, charger_code: str) -> None:
)
await export_setpoints(site_id, conn)
try:
await _notify_ev_arrival_plan(site_id, charger_code, conn)
from services.ev_notify import send_ev_arrival
await send_ev_arrival(site_id, charger_code, conn)
except Exception:
logger.exception("EV arrival Discord notify failed (%s)", charger_code)
logger.info(
@@ -106,87 +108,6 @@ async def _on_ev_departure(site_id: int, charger_code: str) -> None:
)
async def _notify_ev_arrival_plan(
site_id: int, charger_code: str, conn: asyncpg.Connection
) -> None:
"""Discord souhrn po příjezdu EV: stav baterie auta + kdy se bude nabíjet.
Čte čerstvý aktivní plán (ev sloty s výkonem > 0) a otevřenou session;
sloty shlukuje do souvislých oken. Fáze B (interakce „odjíždím za 2h"
tlačítkem) = Discord bot, viz docs/discord-ev-interaction.md.
"""
from services.notification_service import send_discord
row = await conn.fetchrow(
"""
select es.soc_at_connect_pct, es.target_soc_pct, es.target_deadline,
v.battery_capacity_kwh, v.name as vehicle_name
from ems.ev_session es
join ems.asset_ev_charger c on c.id = es.charger_id
left join ems.asset_vehicle v on v.id = es.vehicle_id
where es.site_id = $1 and c.code = $2 and es.session_end is null
order by es.id desc limit 1
""",
site_id,
charger_code,
)
if row is None:
return
ev_col = "ev1_setpoint_w" if charger_code.endswith("1") else "ev2_setpoint_w"
slots = await conn.fetch(
f"""
select pi.interval_start, pi.{ev_col} as w, pi.effective_buy_price
from ems.planning_interval pi
join ems.planning_run pr on pr.id = pi.run_id
where pr.site_id = $1 and pr.status = 'active'
and coalesce(pi.{ev_col}, 0) > 0
order by pi.interval_start
""",
site_id,
)
def _fmt(dt) -> str:
return dt.astimezone(_PRAGUE_TZ_NOTIFY).strftime("%H:%M")
windows: list[str] = []
if slots:
start = prev = slots[0]["interval_start"]
kwh = 0.0
prices: list[float] = []
for r in slots:
ts = r["interval_start"]
if (ts - prev).total_seconds() > 900:
windows.append(f"{_fmt(start)}{_fmt(prev)} (+15m)")
start = ts
prev = ts
kwh += float(r["w"]) * 0.25 / 1000.0
prices.append(float(r["effective_buy_price"] or 0))
windows.append(f"{_fmt(start)}{_fmt(prev)} (+15m)")
avg_p = sum(prices) / max(1, len(prices))
else:
kwh, avg_p = 0.0, 0.0
soc = row["soc_at_connect_pct"]
tgt = row["target_soc_pct"]
cap = float(row["battery_capacity_kwh"] or 0)
need = max(0.0, (float(tgt or 0) - float(soc or 0)) / 100.0 * cap)
dl = row["target_deadline"]
lines = [
f"🔌 **{row['vehicle_name'] or charger_code} připojeno**",
f"Baterie auta: **{soc or '?'} %** → cíl {tgt or '?'} %"
+ (f" (~{need:.0f} kWh)" if need else ""),
]
if dl is not None:
lines.append(f"Deadline: {dl.astimezone(_PRAGUE_TZ_NOTIFY).strftime('%a %d.%m. %H:%M')}")
if windows:
lines.append(
f"Plán nabíjení: {'; '.join(windows[:4])}{kwh:.1f} kWh, ø {avg_p:.2f} Kč/kWh"
)
else:
lines.append("Plán nabíjení: zatím žádné sloty (čeká na levné okno / PV)")
await send_discord(conn, site_id, "\n".join(lines), level="info")
async def _patch_session_from_tesla(
site_id: int, charger_code: str, conn: asyncpg.Connection
) -> None:

View File

@@ -0,0 +1,57 @@
"""Discord bot — čisté helpery (custom_id, patch akcí), bez sítě/discord lib."""
from __future__ import annotations
import unittest
from datetime import datetime, timezone
from services.discord_bot import action_to_patch, parse_custom_id
_NOW = datetime(2026, 6, 12, 10, 0, tzinfo=timezone.utc) # 12:00 Prague
class ParseCustomIdTests(unittest.TestCase):
def test_valid(self) -> None:
self.assertEqual(
parse_custom_id("ev:2:ev-charger-1:h2"), (2, "ev-charger-1", "h2")
)
def test_invalid(self) -> None:
for bad in ("", "ev:2:x:jump", "foo:1:c:h2", "ev:abc:c:h2"):
self.assertIsNone(parse_custom_id(bad))
class ActionPatchTests(unittest.TestCase):
def _patch(self, action: str, **kw):
return action_to_patch(
action,
now=_NOW,
soc_at_connect=kw.get("soc", 55.0),
default_deadline_hour=kw.get("hour", 7),
)
def test_h2_deadline(self) -> None:
p = self._patch("h2")
self.assertIn("2026-06-12T12:00", p["target_deadline"])
def test_morning_next_occurrence(self) -> None:
p = self._patch("morning", hour=7)
# 12:00 Prague > 7:00 → zítra 7:00 Prague
self.assertIn("2026-06-13T07:00", p["target_deadline"])
def test_morning_today_if_before(self) -> None:
early = datetime(2026, 6, 12, 2, 0, tzinfo=timezone.utc) # 4:00 Prague
p = action_to_patch("morning", now=early, soc_at_connect=50, default_deadline_hour=7)
self.assertIn("2026-06-12T07:00", p["target_deadline"])
def test_full(self) -> None:
p = self._patch("full")
self.assertEqual(p["target_soc_pct"], 100)
def test_stop_targets_connect_soc(self) -> None:
p = self._patch("stop", soc=42.5)
self.assertEqual(p["target_soc_pct"], 42.5)
if __name__ == "__main__":
unittest.main()

View File

@@ -15,7 +15,7 @@ Plán nabíjení: 11:3013:45; 02:1504:30 — 34.2 kWh, ø 1.85 Kč/kWh
Implementace: `_notify_ev_arrival_plan` v `telemetry_collector.py` (sloty
`ev*_setpoint_w > 0` z aktivního plánu shlukované do oken).
## Fáze B — zpětná vazba tlačítkem („odjíždím za 2 h")
## Fáze B — zpětná vazba tlačítkem — ✅ IMPLEMENTOVÁNO (2026-06-12)
**Architektura: Discord BOT přes gateway** — spojení jde Z backendu VEN
(websocket), žádný veřejný endpoint do EMS (na rozdíl od interactions
@@ -34,11 +34,15 @@ Bezpečnost: bot reaguje jen na whitelisted user ID (majitel), akce omezené
na patch session + replan (žádné režimy/registry). Tlačítka expirují
s koncem session.
**Co je potřeba od uživatele:** vytvořit Discord aplikaci + bota
(discord.com/developers → New Application → Bot → token), pozvat na server
(scope `bot`, oprávnění Send Messages + Read History), token jako
`DISCORD_BOT_TOKEN` do `.env`. Pak implementuju `services/discord_bot.py`
(lifespan task vedle telemetry smyčky).
**Implementace:** `services/discord_bot.py` (lifespan task; discord.py
gateway), `services/ev_notify.py` (sdílený souhrn plánu; bot-first, webhook
fallback). custom_id `ev:<site>:<charger>:<akce>` — tlačítka přežijí restart.
Env: `DISCORD_BOT_TOKEN`, `DISCORD_EV_CHANNEL_ID`, `DISCORD_ALLOWED_USER_IDS`
(čárkami; prázdné = bot vypnut, jede fáze A webhook). Akce: h2/h4 (deadline
teď+N), morning (další default_deadline_hour vozidla, Prague), full (100 % +
deadline za 1 h → max tempo), stop (target = SoC při připojení). Po akci:
patch session → okamžitý replan + export → bot zedituje zprávu novým plánem.
Testy: tests/test_discord_bot.py (parse, patch akcí).
## Výhled (fáze C)
Stejný bot = kanál pro ranní triáž s dotazy („proč jsi v 19:00 nabíjel?" →