implementace load first
Some checks failed
CI and deploy / migration-check (push) Failing after 12s
CI and deploy / deploy (push) Has been skipped

This commit is contained in:
Dusan Vojacek
2026-05-21 17:29:09 +02:00
parent e295e55770
commit 7c63fed296
4 changed files with 126 additions and 18 deletions

View File

@@ -40,6 +40,8 @@ SOLVER_TIME_LIMIT = 10 # sekund
GE_MIN_EXPORT_W = 1.0
# Dvouprůchodové solve: stop když acquisition z pass1 vs pass2 se liší méně než (Kč/kWh).
ACQUISITION_TWO_PASS_EPS_KWH = 0.05
# Load-first (Deye): PV nejdřív pokryje load+EV+TČ; bc_pv/ge_pv jen z pv_sp (přebytek).
LOAD_FIRST_INCENTIVE_CZK_KWH = 0.05
# Dokud je kotva pro hluboký dump (první sell < 0 v horizontu, jinak první extrémní buy) dál než
# tento počet 15min slotů, držíme plánovací spodek na rezervě (arb_base_wh) místo planner floor —
# priorita: beze „ztráty na prodeji“ (sell >= 0) držet buffer, hluboký vývoz až těsně před záporným prodejem.
@@ -799,8 +801,11 @@ def solve_dispatch(
ge = [pulp.LpVariable(f"ge_{t}", 0, grid.max_export_power_w) for t in range(T)]
ge_pv = [pulp.LpVariable(f"ge_pv_{t}", 0, grid.max_export_power_w) for t in range(T)]
ge_bat = [pulp.LpVariable(f"ge_bat_{t}", 0, grid.max_export_power_w) for t in range(T)]
bc = [pulp.LpVariable(f"bc_{t}", 0, battery.max_charge_power_w) for t in range(T)]
bc_pv = [pulp.LpVariable(f"bc_pv_{t}", 0, battery.max_charge_power_w) for t in range(T)]
bc_gi = [pulp.LpVariable(f"bc_gi_{t}", 0, battery.max_charge_power_w) for t in range(T)]
bd = [pulp.LpVariable(f"bd_{t}", 0, battery.max_discharge_power_w) for t in range(T)]
pv_ld = [pulp.LpVariable(f"pv_ld_{t}", 0) for t in range(T)]
pv_sp = [pulp.LpVariable(f"pv_sp_{t}", 0) for t in range(T)]
soc = [
pulp.LpVariable(f"soc_{t}", soc_panel_min[t], battery.soc_max_wh) for t in range(T)
]
@@ -950,7 +955,12 @@ def solve_dispatch(
else 0
)
+ gi_over[t] * IMPORT_OVER_BREAKER_PENALTY_CZK_KWH * INTERVAL_H / 1000
+ 0.5 * (bc[t] + bd[t]) * degradation_cost_effective * INTERVAL_H / 1000
+ 0.5 * (bc_pv[t] + bc_gi[t] + bd[t]) * degradation_cost_effective * INTERVAL_H / 1000
- (
pv_ld[t] * LOAD_FIRST_INCENTIVE_CZK_KWH * INTERVAL_H / 1000
if om == "AUTO"
else 0
)
+ (
ge_bat[t] * charge_acquisition_czk_kwh * INTERVAL_H / 1000
if om == "AUTO" and t in discharge_export_slots
@@ -1006,13 +1016,42 @@ def solve_dispatch(
if z_gen_cutoff is not None
else float(s.pv_b_forecast_w)
)
prob += (
pv_a_net + pv_b_effective + gi[t] + bd[t]
== s.load_baseline_w + ev_total_t + hp[t] + bc[t] + ge[t]
)
pv_total_ub = float(s.pv_a_forecast_w) + float(s.pv_b_forecast_w)
if om == "AUTO":
load_site_expr = float(s.load_baseline_w) + ev_total_t + hp[t]
prob += pv_ld[t] + pv_sp[t] == pv_a_net + pv_b_effective
prob += pv_ld[t] <= load_site_expr
prob += pv_ld[t] <= pv_a_net + pv_b_effective
prob += pv_sp[t] <= pv_total_ub
prob += pv_sp[t] >= pv_a_net + pv_b_effective - load_site_expr
prob += bc_pv[t] <= pv_sp[t]
prob += bc_gi[t] <= gi[t]
prob += ge_pv[t] <= pv_sp[t]
prob += bc_pv[t] + ge_pv[t] <= pv_sp[t]
# Import na deficit po PV→load, nebo na grid-nabíjení (bc_gi).
prob += gi[t] <= load_site_expr + bc_gi[t]
# Plná bilance (pv_ld+pv_sp rozpad je ortogonální k tokům přebytku).
prob += (
pv_a_net + pv_b_effective + gi[t] + bd[t]
== float(s.load_baseline_w) + ev_total_t + hp[t] + bc_pv[t] + bc_gi[t] + ge[t]
)
else:
prob += pv_ld[t] == 0
prob += pv_sp[t] == pv_a_net + pv_b_effective
prob += bc_pv[t] <= pv_sp[t]
prob += bc_gi[t] <= gi[t]
prob += (
pv_a_net + pv_b_effective + gi[t] + bd[t]
== s.load_baseline_w + ev_total_t + hp[t] + bc_pv[t] + bc_gi[t] + ge[t]
)
prob += ge[t] == ge_pv[t] + ge_bat[t]
# Baterie nesmí „přestrojit“ FVE export: přebytek nad PV musí jít přes ge_bat.
prob += ge_bat[t] >= ge[t] - (pv_a_net + pv_b_effective)
# Baterie nesmí „přestrojit“ FVE export: jen z pv_sp (po load-first).
if om == "AUTO":
prob += ge_bat[t] >= ge[t] - pv_sp[t]
else:
prob += ge_bat[t] >= ge[t] - (pv_a_net + pv_b_effective)
# Měkký breaker cap: gi_over[t] >= max(0, gi[t] - breaker).
prob += gi_over[t] >= gi[t] - float(grid.max_import_power_w)
@@ -1021,8 +1060,8 @@ def solve_dispatch(
soc_prev = current_soc_wh if t == 0 else soc[t - 1]
prob += soc[t] == (
soc_prev
+ bc[t] * battery.charge_efficiency * INTERVAL_H
- bd[t] / battery.discharge_efficiency * INTERVAL_H
+ (bc_pv[t] + bc_gi[t]) * battery.charge_efficiency * INTERVAL_H
- bd[t] / battery.discharge_efficiency * INTERVAL_H
)
sv = safety_vars[t]
@@ -1097,7 +1136,8 @@ def solve_dispatch(
s.load_baseline_w
+ ev_total_t
+ hp[t]
+ bc[t]
+ bc_pv[t]
+ bc_gi[t]
)
else:
prob += soc_prev_expr >= (
@@ -1107,7 +1147,8 @@ def solve_dispatch(
s.load_baseline_w
+ ev_total_t
+ hp[t]
+ bc[t]
+ bc_pv[t]
+ bc_gi[t]
+ battery.max_discharge_power_w * w_arb[t]
)
@@ -1148,14 +1189,15 @@ def solve_dispatch(
prob += ev_direct[e][t] + ev_via_bat[e][t] <= vehicles[e].max_charge_power_w
for tt, cv, prev in commit_lp:
prob += cv >= prev - bc[tt]
prob += cv >= prev - (bc_pv[tt] + bc_gi[tt])
if om == "SELF_SUSTAIN":
for t in range(T):
prob += gi[t] <= slots[t].load_baseline_w
elif om == "PRESERVE":
for t in range(T):
prob += bc[t] == 0
prob += bc_pv[t] == 0
prob += bc_gi[t] == 0
prob += bd[t] == 0
elif om == "CHARGE_CHEAP":
for t in range(T):
@@ -1176,10 +1218,11 @@ def solve_dispatch(
- int(s.load_baseline_w),
)
# Mimo grid-charge masku smí nabíjet jen z PV přebytku (ne import ze sítě).
prob += bc_gi[t] == 0
if pv_surplus_w <= 0:
prob += bc[t] == 0
prob += bc_pv[t] == 0
else:
prob += bc[t] <= pv_surplus_w
prob += bc_pv[t] <= float(pv_surplus_w)
if t not in discharge_export_slots:
prob += ge_bat[t] == 0
prob += z_export[t] == 0
@@ -1282,7 +1325,8 @@ def solve_dispatch(
for t in range(T):
hp_raw = pulp.value(hp[t])
hp_on = hp_raw > heat_pump.rated_heating_power_w * 0.3
batt_w = round(pulp.value(bc[t]) - pulp.value(bd[t]))
bc_tot = float(pulp.value(bc_pv[t]) or 0) + float(pulp.value(bc_gi[t]) or 0)
batt_w = round(bc_tot - float(pulp.value(bd[t]) or 0))
grid_w = round(pulp.value(gi[t]) - pulp.value(ge[t]))
soc_pct = round(pulp.value(soc[t]) / battery.usable_capacity_wh * 100, 1)
export_limit_w = int(grid.max_export_power_w) if grid_w < 0 else 0