From 55ccf0662772e8cfc42967b837d6463027120895 Mon Sep 17 00:00:00 2001 From: Dusan Vojacek Date: Sat, 2 May 2026 19:42:58 +0200 Subject: [PATCH] refactor control repository access --- backend/services/control/exporter_monolith.py | 208 +---------------- backend/services/control/repository.py | 215 ++++++++++++++++++ 2 files changed, 222 insertions(+), 201 deletions(-) create mode 100644 backend/services/control/repository.py diff --git a/backend/services/control/exporter_monolith.py b/backend/services/control/exporter_monolith.py index de0316e..41ed1c4 100644 --- a/backend/services/control/exporter_monolith.py +++ b/backend/services/control/exporter_monolith.py @@ -21,7 +21,6 @@ from services.control.deye_helpers import ( DEYE_CLOCK_RESYNC_INTERVAL_HOURS, DEYE_CLOCK_VERIFY_MAX_DELTA_SEC, # noqa: F401 - re-export for compatibility DEYE_CRITICAL_REGS_SELF_SUSTAIN, # noqa: F401 - re-export for compatibility - DEYE_LV_BATTERY_MAX_CHARGE_DISCHARGE_A, DEYE_REGISTER_NAMES, DEYE_TOU_INACTIVE_HHMM, DEYE_TOU_POWER_REGS, @@ -50,6 +49,13 @@ from services.control.deye_helpers import ( watts_to_amps, ) from services.control.models import ControlSetpoints, InverterConfig, OperatingModeInfo +from services.control.repository import ( + _fetch_max_charge_power_w, + _fetch_operating_mode, + _fetch_plan_row_for_slot_offset, + _get_current_soc, + _load_inverter_config, +) from services.control.setpoints import ( _DictRecord, _apply_price_failsafe_guard, @@ -760,206 +766,6 @@ async def verify_modbus_commands( return all_ok -async def _fetch_operating_mode(site_id: int, db: asyncpg.Connection) -> OperatingModeInfo | None: - sql = """ - SELECT som.mode_code, omd.battery_mode, omd.grid_mode, - omd.ev_enabled, omd.heat_pump_enabled, omd.loxone_mode_value, - som.valid_until - FROM ems.site_operating_mode som - JOIN ems.operating_mode_def omd ON omd.code = som.mode_code - WHERE som.site_id = $1 - """ - row = await db.fetchrow(sql, site_id) - if row is None: - return None - vu = row["valid_until"] - if vu is not None: - now_utc = datetime.now(timezone.utc) - if vu.tzinfo is None: - vu = vu.replace(tzinfo=timezone.utc) - if vu <= now_utc: - exp_rows = await db.fetch("SELECT * FROM ems.fn_expire_modes()") - from services.notification_service import notify_operating_mode_changed - - for er in exp_rows: - await notify_operating_mode_changed( - str(er["site_code"]), - str(er["old_mode"]), - str(er["new_mode"]), - "system:expiry", - "Automatické vypršení dočasného režimu", - ) - row = await db.fetchrow(sql, site_id) - if row is None: - return None - return OperatingModeInfo( - mode_code=row["mode_code"], - battery_mode=row["battery_mode"], - grid_mode=row["grid_mode"], - ev_enabled=bool(row["ev_enabled"]), - heat_pump_enabled_def=bool(row["heat_pump_enabled"]), - loxone_mode_value=int(row["loxone_mode_value"]), - ) - - -async def _get_current_soc(site_id: int, db: asyncpg.Connection) -> int: - soc = await db.fetchval( - """ - SELECT battery_soc_percent - FROM ems.telemetry_inverter - WHERE site_id = $1 AND battery_soc_percent IS NOT NULL - ORDER BY measured_at DESC - LIMIT 1 - """, - site_id, - ) - return int(soc) if soc is not None else 50 - - -async def _load_inverter_config( - site_id: int, db: asyncpg.Connection -) -> InverterConfig | None: - row = await db.fetchrow( - """ - SELECT - ai.id, ai.code, - coalesce(ems.fn_inverter_pv_a_max_w(ai.id), 0) AS pv_a_cap_w, - se.host, se.port, se.unit_id, - sgc.max_export_power_w, - sgc.max_import_power_w, - sgc.no_export, - ai.max_battery_charge_w, - ai.max_battery_discharge_w, - ab.min_soc_percent, - ab.reserve_soc_percent, - ab.max_soc_percent, - ab.usable_capacity_wh, - ai.deye_last_system_time_sync_minute, - ai.deye_last_system_time_sync_at, - ai.deye_last_tou_inactive_write_prague_date, - ai.deye_tou_inactive_signature, - COALESCE(ai.deye_zero_export_mode, 1) AS deye_zero_export_mode, - COALESCE(ai.deye_gen_microinverter_cutoff_enabled, false) AS deye_gen_microinverter_cutoff_enabled, - coalesce(ems.fn_site_has_active_green_bonus_pv(ai.site_id), false) - AS deye_reg340_pv_a_control_enabled, - COALESCE( - ai.deye_register_max_charge_a, - FLOOR( - LEAST( - COALESCE(ab.bms_max_charge_w, ai.max_battery_charge_w), - ai.max_battery_charge_w - )::numeric / 51.2 - )::int - ) AS max_charge_a, - COALESCE( - ai.deye_register_max_discharge_a, - FLOOR( - LEAST( - COALESCE(ab.bms_max_discharge_w, ai.max_battery_discharge_w), - ai.max_battery_discharge_w - )::numeric / 51.2 - )::int - ) AS max_discharge_a - FROM ems.asset_inverter ai - JOIN ems.site_endpoint se ON se.id = ai.endpoint_id - JOIN ems.asset_battery ab ON ab.inverter_id = ai.id - LEFT JOIN ems.site_grid_connection sgc ON sgc.site_id = ai.site_id - WHERE ai.site_id = $1 - AND ai.active = true - AND ai.controllable = true - AND se.enabled = true - AND se.endpoint_type = 'modbus_tcp' - ORDER BY ai.id - LIMIT 1 - """, - site_id, - ) - if row is None: - return None - mc = row["max_charge_a"] - md = row["max_discharge_a"] - max_charge_a = int(mc) if mc is not None else 0 - max_discharge_a = int(md) if md is not None else 0 - # Firmware Deye často drží max 350 A — vyšší hodnota z DB → mismatch 351 vs 350. - max_charge_a = min(max_charge_a, DEYE_LV_BATTERY_MAX_CHARGE_DISCHARGE_A) - max_discharge_a = min(max_discharge_a, DEYE_LV_BATTERY_MAX_CHARGE_DISCHARGE_A) - port = int(row["port"] or 502) - uid = int(row["unit_id"] if row["unit_id"] is not None else 1) - return InverterConfig( - id=int(row["id"]), - code=row["code"], - host=row["host"], - port=port, - unit_id=uid, - max_export_power_w=int(row["max_export_power_w"]) - if row["max_export_power_w"] is not None - else None, - max_import_power_w=int(row["max_import_power_w"]) - if row["max_import_power_w"] is not None - else None, - no_export=bool(row["no_export"] or False), - max_battery_charge_w=int(row["max_battery_charge_w"]) - if row["max_battery_charge_w"] is not None - else None, - max_battery_discharge_w=int(row["max_battery_discharge_w"]) - if row["max_battery_discharge_w"] is not None - else None, - min_soc_percent=int(round(float(row["min_soc_percent"]))) - if row["min_soc_percent"] is not None - else None, - reserve_soc_percent=int(row["reserve_soc_percent"]) - if row["reserve_soc_percent"] is not None - else None, - max_soc_percent=int(row["max_soc_percent"]) - if row["max_soc_percent"] is not None - else None, - usable_capacity_wh=int(row["usable_capacity_wh"]) - if row["usable_capacity_wh"] is not None - else None, - max_charge_a=max_charge_a, - max_discharge_a=max_discharge_a, - deye_last_system_time_sync_minute=row["deye_last_system_time_sync_minute"], - deye_last_system_time_sync_at=row["deye_last_system_time_sync_at"], - deye_last_tou_inactive_write_prague_date=row[ - "deye_last_tou_inactive_write_prague_date" - ], - deye_tou_inactive_signature=row["deye_tou_inactive_signature"], - deye_zero_export_mode=int(row["deye_zero_export_mode"]), - deye_gen_microinverter_cutoff_enabled=bool(row["deye_gen_microinverter_cutoff_enabled"] or False), - pv_a_cap_w=int(row["pv_a_cap_w"] or 0), - deye_reg340_pv_a_control_enabled=bool( - row["deye_reg340_pv_a_control_enabled"] or False - ), - ) - - -async def _fetch_plan_row_for_slot_offset( - site_id: int, db: asyncpg.Connection, slot_offset: int -) -> asyncpg.Record | None: - """Řádek plánu pro slot z ems.fn_planning_interval_at_offset (jsonb → Record-like dict).""" - raw = await db.fetchval( - """ - select ems.fn_planning_interval_at_offset($1::int, $2::int) - """, - site_id, - slot_offset, - ) - if raw is None: - return None - data = raw if isinstance(raw, dict) else json.loads(raw) - if not data: - return None - return _DictRecord(data) - - -async def _fetch_max_charge_power_w(site_id: int, db: asyncpg.Connection) -> int: - v = await db.fetchval( - "select ems.fn_planning_max_effective_charge_w($1::int)", - site_id, - ) - return int(v or 0) - - async def write_inverter_setpoints( site_id: int, setpoints_now: ControlSetpoints, diff --git a/backend/services/control/repository.py b/backend/services/control/repository.py new file mode 100644 index 0000000..087404a --- /dev/null +++ b/backend/services/control/repository.py @@ -0,0 +1,215 @@ +"""DB načítání pro control export.""" + +from __future__ import annotations + +import json +from datetime import datetime, timezone + +import asyncpg + +from services.control.deye_helpers import DEYE_LV_BATTERY_MAX_CHARGE_DISCHARGE_A +from services.control.models import InverterConfig, OperatingModeInfo +from services.control.setpoints import _DictRecord + + +async def _fetch_operating_mode( + site_id: int, db: asyncpg.Connection +) -> OperatingModeInfo | None: + sql = """ + SELECT som.mode_code, omd.battery_mode, omd.grid_mode, + omd.ev_enabled, omd.heat_pump_enabled, omd.loxone_mode_value, + som.valid_until + FROM ems.site_operating_mode som + JOIN ems.operating_mode_def omd ON omd.code = som.mode_code + WHERE som.site_id = $1 + """ + row = await db.fetchrow(sql, site_id) + if row is None: + return None + vu = row["valid_until"] + if vu is not None: + now_utc = datetime.now(timezone.utc) + if vu.tzinfo is None: + vu = vu.replace(tzinfo=timezone.utc) + if vu <= now_utc: + exp_rows = await db.fetch("SELECT * FROM ems.fn_expire_modes()") + from services.notification_service import notify_operating_mode_changed + + for er in exp_rows: + await notify_operating_mode_changed( + str(er["site_code"]), + str(er["old_mode"]), + str(er["new_mode"]), + "system:expiry", + "Automatické vypršení dočasného režimu", + ) + row = await db.fetchrow(sql, site_id) + if row is None: + return None + return OperatingModeInfo( + mode_code=row["mode_code"], + battery_mode=row["battery_mode"], + grid_mode=row["grid_mode"], + ev_enabled=bool(row["ev_enabled"]), + heat_pump_enabled_def=bool(row["heat_pump_enabled"]), + loxone_mode_value=int(row["loxone_mode_value"]), + ) + + +async def _get_current_soc(site_id: int, db: asyncpg.Connection) -> int: + soc = await db.fetchval( + """ + SELECT battery_soc_percent + FROM ems.telemetry_inverter + WHERE site_id = $1 AND battery_soc_percent IS NOT NULL + ORDER BY measured_at DESC + LIMIT 1 + """, + site_id, + ) + return int(soc) if soc is not None else 50 + + +async def _load_inverter_config( + site_id: int, db: asyncpg.Connection +) -> InverterConfig | None: + row = await db.fetchrow( + """ + SELECT + ai.id, ai.code, + coalesce(ems.fn_inverter_pv_a_max_w(ai.id), 0) AS pv_a_cap_w, + se.host, se.port, se.unit_id, + sgc.max_export_power_w, + sgc.max_import_power_w, + sgc.no_export, + ai.max_battery_charge_w, + ai.max_battery_discharge_w, + ab.min_soc_percent, + ab.reserve_soc_percent, + ab.max_soc_percent, + ab.usable_capacity_wh, + ai.deye_last_system_time_sync_minute, + ai.deye_last_system_time_sync_at, + ai.deye_last_tou_inactive_write_prague_date, + ai.deye_tou_inactive_signature, + COALESCE(ai.deye_zero_export_mode, 1) AS deye_zero_export_mode, + COALESCE(ai.deye_gen_microinverter_cutoff_enabled, false) AS deye_gen_microinverter_cutoff_enabled, + coalesce(ems.fn_site_has_active_green_bonus_pv(ai.site_id), false) + AS deye_reg340_pv_a_control_enabled, + COALESCE( + ai.deye_register_max_charge_a, + FLOOR( + LEAST( + COALESCE(ab.bms_max_charge_w, ai.max_battery_charge_w), + ai.max_battery_charge_w + )::numeric / 51.2 + )::int + ) AS max_charge_a, + COALESCE( + ai.deye_register_max_discharge_a, + FLOOR( + LEAST( + COALESCE(ab.bms_max_discharge_w, ai.max_battery_discharge_w), + ai.max_battery_discharge_w + )::numeric / 51.2 + )::int + ) AS max_discharge_a + FROM ems.asset_inverter ai + JOIN ems.site_endpoint se ON se.id = ai.endpoint_id + JOIN ems.asset_battery ab ON ab.inverter_id = ai.id + LEFT JOIN ems.site_grid_connection sgc ON sgc.site_id = ai.site_id + WHERE ai.site_id = $1 + AND ai.active = true + AND ai.controllable = true + AND se.enabled = true + AND se.endpoint_type = 'modbus_tcp' + ORDER BY ai.id + LIMIT 1 + """, + site_id, + ) + if row is None: + return None + mc = row["max_charge_a"] + md = row["max_discharge_a"] + max_charge_a = int(mc) if mc is not None else 0 + max_discharge_a = int(md) if md is not None else 0 + max_charge_a = min(max_charge_a, DEYE_LV_BATTERY_MAX_CHARGE_DISCHARGE_A) + max_discharge_a = min(max_discharge_a, DEYE_LV_BATTERY_MAX_CHARGE_DISCHARGE_A) + port = int(row["port"] or 502) + uid = int(row["unit_id"] if row["unit_id"] is not None else 1) + return InverterConfig( + id=int(row["id"]), + code=row["code"], + host=row["host"], + port=port, + unit_id=uid, + max_export_power_w=int(row["max_export_power_w"]) + if row["max_export_power_w"] is not None + else None, + max_import_power_w=int(row["max_import_power_w"]) + if row["max_import_power_w"] is not None + else None, + no_export=bool(row["no_export"] or False), + max_battery_charge_w=int(row["max_battery_charge_w"]) + if row["max_battery_charge_w"] is not None + else None, + max_battery_discharge_w=int(row["max_battery_discharge_w"]) + if row["max_battery_discharge_w"] is not None + else None, + min_soc_percent=int(round(float(row["min_soc_percent"]))) + if row["min_soc_percent"] is not None + else None, + reserve_soc_percent=int(row["reserve_soc_percent"]) + if row["reserve_soc_percent"] is not None + else None, + max_soc_percent=int(row["max_soc_percent"]) + if row["max_soc_percent"] is not None + else None, + usable_capacity_wh=int(row["usable_capacity_wh"]) + if row["usable_capacity_wh"] is not None + else None, + max_charge_a=max_charge_a, + max_discharge_a=max_discharge_a, + deye_last_system_time_sync_minute=row["deye_last_system_time_sync_minute"], + deye_last_system_time_sync_at=row["deye_last_system_time_sync_at"], + deye_last_tou_inactive_write_prague_date=row[ + "deye_last_tou_inactive_write_prague_date" + ], + deye_tou_inactive_signature=row["deye_tou_inactive_signature"], + deye_zero_export_mode=int(row["deye_zero_export_mode"]), + deye_gen_microinverter_cutoff_enabled=bool( + row["deye_gen_microinverter_cutoff_enabled"] or False + ), + pv_a_cap_w=int(row["pv_a_cap_w"] or 0), + deye_reg340_pv_a_control_enabled=bool( + row["deye_reg340_pv_a_control_enabled"] or False + ), + ) + + +async def _fetch_plan_row_for_slot_offset( + site_id: int, db: asyncpg.Connection, slot_offset: int +) -> asyncpg.Record | None: + """Řádek plánu pro slot z ems.fn_planning_interval_at_offset (jsonb -> Record-like dict).""" + raw = await db.fetchval( + """ + select ems.fn_planning_interval_at_offset($1::int, $2::int) + """, + site_id, + slot_offset, + ) + if raw is None: + return None + data = raw if isinstance(raw, dict) else json.loads(raw) + if not data: + return None + return _DictRecord(data) + + +async def _fetch_max_charge_power_w(site_id: int, db: asyncpg.Connection) -> int: + v = await db.fetchval( + "select ems.fn_planning_max_effective_charge_w($1::int)", + site_id, + ) + return int(v or 0)