speedup srovnani
This commit is contained in:
16
db/migration/V070__forecast_accuracy_delta_profile_index.sql
Normal file
16
db/migration/V070__forecast_accuracy_delta_profile_index.sql
Normal file
@@ -0,0 +1,16 @@
|
||||
-- Zrychlení fn_pv_forecast_delta_profile (volá ho pv-slots-corrected): range scan site + interval_start
|
||||
-- s podmínkami učení bez sekvenčního full scanu větší historie.
|
||||
|
||||
create index if not exists idx_forecast_accuracy_site_interval_delta_profile
|
||||
on ems.forecast_accuracy (
|
||||
site_id,
|
||||
interval_start desc,
|
||||
pv_array_id,
|
||||
forecast_created_at desc
|
||||
)
|
||||
where actual_power_w is not null
|
||||
and coalesce(learning_eligible, true) = true
|
||||
and forecast_created_at <= interval_start;
|
||||
|
||||
comment on index ems.idx_forecast_accuracy_site_interval_delta_profile is
|
||||
'Partial index pro výběr posledního forecast runu na slot (DISTINCT ON interval_start, pv_array_id) v delta profilu.';
|
||||
@@ -0,0 +1,8 @@
|
||||
-- Plán „nejnovější run na slot“ často sahá po forecast_pv_interval přes (run_id, interval).
|
||||
-- Druhý pořádek (pole → čas) pomáhá alternativním plánům při filtru pv_array_id + časové okno.
|
||||
|
||||
create index if not exists idx_forecast_pv_interval_pv_array_interval_start
|
||||
on ems.forecast_pv_interval (pv_array_id, interval_start desc);
|
||||
|
||||
comment on index ems.idx_forecast_pv_interval_pv_array_interval_start is
|
||||
'Podpora dotazů s filtrem na pv_array_id a rozsah interval_start (pv-slots, DISTINCT ON).';
|
||||
@@ -8,6 +8,7 @@ create or replace function ems.fn_forecast_pv_slots_range(
|
||||
returns jsonb
|
||||
language sql
|
||||
stable
|
||||
set work_mem = '64MB'
|
||||
as $fn$
|
||||
with bounds as (
|
||||
select
|
||||
@@ -35,16 +36,18 @@ as $fn$
|
||||
select distinct on (fpi.interval_start, fpr.pv_array_id)
|
||||
fpi.interval_start,
|
||||
fpi.power_w
|
||||
from ems.forecast_pv_interval fpi
|
||||
join ems.forecast_pv_run fpr on fpr.id = fpi.run_id
|
||||
join ems.asset_pv_array apa
|
||||
on apa.id = fpr.pv_array_id
|
||||
and apa.site_id = fpr.site_id
|
||||
cross join bounds b
|
||||
where fpr.site_id = p_site_id
|
||||
and fpr.status = 'ok'
|
||||
and fpi.interval_start >= b.ts_from
|
||||
from bounds b
|
||||
inner join ems.forecast_pv_interval fpi
|
||||
on fpi.interval_start >= b.ts_from
|
||||
and fpi.interval_start < b.ts_to
|
||||
and fpi.pv_array_id in (
|
||||
select apa.id from ems.asset_pv_array apa where apa.site_id = p_site_id
|
||||
)
|
||||
inner join ems.forecast_pv_run fpr
|
||||
on fpr.id = fpi.run_id
|
||||
and fpr.site_id = p_site_id
|
||||
and fpr.pv_array_id = fpi.pv_array_id
|
||||
and fpr.status = 'ok'
|
||||
order by fpi.interval_start, fpr.pv_array_id, fpr.created_at desc
|
||||
) u
|
||||
group by u.interval_start
|
||||
|
||||
@@ -4,9 +4,7 @@
|
||||
-- + součtový profil `deltas` pro starší klienty (součet delt přes pole).
|
||||
-- ============================================================
|
||||
|
||||
DROP FUNCTION IF EXISTS ems.fn_pv_forecast_delta_profile;
|
||||
|
||||
CREATE OR REPLACE FUNCTION ems.fn_pv_forecast_delta_profile(
|
||||
create or replace function ems.fn_pv_forecast_delta_profile(
|
||||
p_site_id int,
|
||||
p_data_from timestamptz,
|
||||
p_data_to timestamptz DEFAULT now(),
|
||||
@@ -19,6 +17,7 @@ CREATE OR REPLACE FUNCTION ems.fn_pv_forecast_delta_profile(
|
||||
RETURNS jsonb
|
||||
LANGUAGE sql
|
||||
STABLE
|
||||
SET work_mem = '64MB'
|
||||
AS $fn$
|
||||
WITH eff AS (
|
||||
SELECT
|
||||
@@ -49,24 +48,21 @@ AS $fn$
|
||||
greatest((SELECT threshold_w FROM eff), 0::numeric) AS threshold_w
|
||||
),
|
||||
best AS (
|
||||
SELECT
|
||||
select distinct on (fa.interval_start, fa.pv_array_id)
|
||||
fa.interval_start,
|
||||
fa.pv_array_id,
|
||||
fa.forecast_power_w,
|
||||
fa.actual_power_w,
|
||||
fa.forecast_created_at,
|
||||
row_number() OVER (
|
||||
PARTITION BY fa.interval_start, fa.pv_array_id
|
||||
ORDER BY fa.forecast_created_at DESC
|
||||
) AS rn
|
||||
FROM ems.forecast_accuracy fa
|
||||
CROSS JOIN bounds b
|
||||
WHERE fa.site_id = p_site_id
|
||||
AND fa.interval_start >= b.ts_from
|
||||
AND fa.interval_start < b.ts_to
|
||||
AND fa.actual_power_w IS NOT NULL
|
||||
AND fa.forecast_created_at <= fa.interval_start
|
||||
AND coalesce(fa.learning_eligible, true) IS TRUE
|
||||
fa.forecast_created_at
|
||||
from ems.forecast_accuracy fa
|
||||
cross join bounds b
|
||||
where fa.site_id = p_site_id
|
||||
and fa.interval_start >= b.ts_from
|
||||
and fa.interval_start < b.ts_to
|
||||
and fa.actual_power_w is not null
|
||||
and fa.forecast_created_at <= fa.interval_start
|
||||
and coalesce(fa.learning_eligible, true) is true
|
||||
order by fa.interval_start, fa.pv_array_id, fa.forecast_created_at desc
|
||||
),
|
||||
slots AS (
|
||||
SELECT
|
||||
@@ -82,7 +78,6 @@ AS $fn$
|
||||
extract(epoch FROM (now() - b.interval_start)) / 86400.0 AS age_days
|
||||
FROM best b
|
||||
CROSS JOIN tz
|
||||
WHERE b.rn = 1
|
||||
),
|
||||
slot_totals AS (
|
||||
SELECT
|
||||
@@ -109,8 +104,9 @@ AS $fn$
|
||||
st.*,
|
||||
lag(st.actual_total_w) OVER (PARTITION BY st.day_local ORDER BY st.interval_start) AS prev_actual_w
|
||||
FROM slot_totals st
|
||||
cross join bounds bthr
|
||||
WHERE st.slot_of_day BETWEEN 20 AND 80
|
||||
AND st.actual_total_w > (SELECT threshold_w FROM bounds)
|
||||
AND st.actual_total_w > bthr.threshold_w
|
||||
),
|
||||
day_jump AS (
|
||||
SELECT
|
||||
@@ -125,7 +121,8 @@ AS $fn$
|
||||
st.day_local,
|
||||
percentile_cont(0.5) WITHIN GROUP (ORDER BY st.actual_total_w) AS p50_actual_w
|
||||
FROM slot_totals st
|
||||
WHERE st.actual_total_w > (SELECT threshold_w FROM bounds)
|
||||
cross join bounds bthr
|
||||
WHERE st.actual_total_w > bthr.threshold_w
|
||||
GROUP BY st.day_local
|
||||
),
|
||||
day_stats AS (
|
||||
@@ -178,13 +175,13 @@ AS $fn$
|
||||
s.pv_array_id,
|
||||
s.slot_of_day,
|
||||
(s.forecast_w - s.actual_w) AS error_w,
|
||||
exp(-s.age_days / nullif((SELECT half_life_days FROM bounds), 0))
|
||||
exp(-s.age_days / nullif(b.half_life_days, 0))
|
||||
* (
|
||||
CASE
|
||||
WHEN (SELECT top_n_days FROM eff) IS NULL THEN 1::numeric
|
||||
WHEN (SELECT top_n_days FROM eff) < 1 THEN 1::numeric
|
||||
WHEN dr.rn <= (SELECT top_n_days FROM eff) THEN 1::numeric
|
||||
ELSE greatest(0::numeric, least(1::numeric, coalesce((SELECT non_top_day_factor FROM eff), 0.02)))
|
||||
WHEN e.top_n_days IS NULL THEN 1::numeric
|
||||
WHEN e.top_n_days < 1 THEN 1::numeric
|
||||
WHEN dr.rn <= e.top_n_days THEN 1::numeric
|
||||
ELSE greatest(0::numeric, least(1::numeric, coalesce(e.non_top_day_factor, 0.02)))
|
||||
END
|
||||
)
|
||||
* (
|
||||
@@ -195,12 +192,12 @@ AS $fn$
|
||||
0.0,
|
||||
least(1.0, coalesce(ds.w_energy, 0.35) * coalesce(ds.w_smooth, 0.35))
|
||||
),
|
||||
greatest(0.25, least(coalesce((SELECT day_weight_gamma FROM eff), 1.0), 8.0))
|
||||
greatest(0.25, least(coalesce(e.day_weight_gamma, 1.0), 8.0))
|
||||
)
|
||||
) AS w
|
||||
FROM slots s
|
||||
CROSS JOIN bounds b
|
||||
CROSS JOIN eff
|
||||
CROSS JOIN eff e
|
||||
JOIN slot_totals st ON st.interval_start = s.interval_start
|
||||
LEFT JOIN day_stats ds ON ds.day_local = s.day_local
|
||||
LEFT JOIN day_rank dr ON dr.day_local = s.day_local
|
||||
|
||||
@@ -1,156 +1,178 @@
|
||||
-- ============================================================
|
||||
-- PV forecast sloty (15min) + aditivně korigovaný forecast
|
||||
-- corrected = sum_i max(0, forecast_i - delta_profile_i[slot_of_day])
|
||||
-- Agregace korekce v jednom průchodu (žádný korelovaný subselect na slot_spine).
|
||||
-- ============================================================
|
||||
|
||||
DROP FUNCTION IF EXISTS ems.fn_forecast_pv_slots_range_corrected;
|
||||
|
||||
CREATE OR REPLACE FUNCTION ems.fn_forecast_pv_slots_range_corrected(
|
||||
create or replace function ems.fn_forecast_pv_slots_range_corrected(
|
||||
p_site_id int,
|
||||
p_from timestamptz,
|
||||
p_to timestamptz,
|
||||
p_delta_data_from timestamptz,
|
||||
p_delta_data_to timestamptz DEFAULT now(),
|
||||
p_half_life_days numeric DEFAULT 14,
|
||||
p_threshold_w int DEFAULT 150
|
||||
p_delta_data_to timestamptz default now(),
|
||||
p_half_life_days numeric default 14,
|
||||
p_threshold_w int default 150
|
||||
)
|
||||
RETURNS jsonb
|
||||
LANGUAGE sql
|
||||
STABLE
|
||||
AS $fn$
|
||||
WITH tz AS (
|
||||
SELECT coalesce(nullif(trim(s.timezone), ''), 'Europe/Prague') AS tz_name
|
||||
FROM ems.site s
|
||||
WHERE s.id = p_site_id
|
||||
returns jsonb
|
||||
language sql
|
||||
stable
|
||||
set work_mem = '64MB'
|
||||
as $fn$
|
||||
with tz as (
|
||||
select coalesce(nullif(trim(s.timezone), ''), 'Europe/Prague') as tz_name
|
||||
from ems.site s
|
||||
where s.id = p_site_id
|
||||
),
|
||||
bounds AS (
|
||||
SELECT
|
||||
date_bin(interval '15 minutes', p_from, timestamptz '1970-01-01T00:00:00Z') AS ts_from,
|
||||
CASE
|
||||
WHEN p_to <= p_from THEN date_bin(interval '15 minutes', p_from, timestamptz '1970-01-01T00:00:00Z') + interval '15 minutes'
|
||||
WHEN p_to > p_from + interval '60 days' THEN date_bin(interval '15 minutes', p_from, timestamptz '1970-01-01T00:00:00Z') + interval '60 days'
|
||||
ELSE date_bin(interval '15 minutes', p_to, timestamptz '1970-01-01T00:00:00Z')
|
||||
END AS ts_to
|
||||
bounds as (
|
||||
select
|
||||
date_bin(interval '15 minutes', p_from, timestamptz '1970-01-01T00:00:00Z') as ts_from,
|
||||
case
|
||||
when p_to <= p_from then date_bin(interval '15 minutes', p_from, timestamptz '1970-01-01T00:00:00Z') + interval '15 minutes'
|
||||
when p_to > p_from + interval '60 days' then date_bin(interval '15 minutes', p_from, timestamptz '1970-01-01T00:00:00Z') + interval '60 days'
|
||||
else date_bin(interval '15 minutes', p_to, timestamptz '1970-01-01T00:00:00Z')
|
||||
end as ts_to
|
||||
),
|
||||
slot_spine AS (
|
||||
SELECT gs AS interval_start
|
||||
FROM bounds b,
|
||||
slot_spine as (
|
||||
select gs as interval_start
|
||||
from bounds b,
|
||||
generate_series(
|
||||
b.ts_from,
|
||||
(b.ts_to - interval '15 minutes')::timestamptz,
|
||||
interval '15 minutes'
|
||||
) AS gs
|
||||
) as gs
|
||||
),
|
||||
fc_by_array AS (
|
||||
SELECT DISTINCT ON (fpi.interval_start, fpr.pv_array_id)
|
||||
slot_tz as (
|
||||
select
|
||||
s.interval_start,
|
||||
(
|
||||
(extract(hour from (s.interval_start at time zone t.tz_name))::int * 60)
|
||||
+ extract(minute from (s.interval_start at time zone t.tz_name))::int
|
||||
) / 15 as slot_of_day
|
||||
from slot_spine s
|
||||
cross join tz t
|
||||
),
|
||||
fc_by_array as (
|
||||
select distinct on (fpi.interval_start, fpr.pv_array_id)
|
||||
fpi.interval_start,
|
||||
fpr.pv_array_id,
|
||||
fpi.power_w::bigint AS power_w
|
||||
FROM ems.forecast_pv_interval fpi
|
||||
JOIN ems.forecast_pv_run fpr ON fpr.id = fpi.run_id
|
||||
JOIN ems.asset_pv_array apa
|
||||
ON apa.id = fpr.pv_array_id
|
||||
AND apa.site_id = fpr.site_id
|
||||
CROSS JOIN bounds b
|
||||
WHERE fpr.site_id = p_site_id
|
||||
AND fpr.status = 'ok'
|
||||
AND fpi.interval_start >= b.ts_from
|
||||
AND fpi.interval_start < b.ts_to
|
||||
ORDER BY fpi.interval_start, fpr.pv_array_id, fpr.created_at DESC
|
||||
fpi.power_w::bigint as power_w
|
||||
from bounds b
|
||||
inner join ems.forecast_pv_interval fpi
|
||||
on fpi.interval_start >= b.ts_from
|
||||
and fpi.interval_start < b.ts_to
|
||||
and fpi.pv_array_id in (
|
||||
select apa.id from ems.asset_pv_array apa where apa.site_id = p_site_id
|
||||
)
|
||||
inner join ems.forecast_pv_run fpr
|
||||
on fpr.id = fpi.run_id
|
||||
and fpr.site_id = p_site_id
|
||||
and fpr.pv_array_id = fpi.pv_array_id
|
||||
and fpr.status = 'ok'
|
||||
order by fpi.interval_start, fpr.pv_array_id, fpr.created_at desc
|
||||
),
|
||||
fc_totals AS (
|
||||
SELECT u.interval_start, coalesce(sum(u.power_w), 0)::bigint AS pv_forecast_total_w
|
||||
FROM fc_by_array u
|
||||
GROUP BY u.interval_start
|
||||
fc_totals as (
|
||||
select u.interval_start, coalesce(sum(u.power_w), 0)::bigint as pv_forecast_total_w
|
||||
from fc_by_array u
|
||||
group by u.interval_start
|
||||
),
|
||||
profile AS (
|
||||
SELECT ems.fn_pv_forecast_delta_profile(
|
||||
profile as (
|
||||
select ems.fn_pv_forecast_delta_profile(
|
||||
p_site_id,
|
||||
p_delta_data_from,
|
||||
p_delta_data_to,
|
||||
p_half_life_days,
|
||||
p_threshold_w
|
||||
) AS j
|
||||
) as j
|
||||
),
|
||||
delta_by_array AS (
|
||||
SELECT (kv.key)::int AS pv_array_id,
|
||||
(x->>'slot_of_day')::int AS slot_of_day,
|
||||
(x->>'delta_w')::int AS delta_w
|
||||
FROM profile p
|
||||
CROSS JOIN LATERAL jsonb_each((p.j)->'deltas_by_array') kv(key, value)
|
||||
CROSS JOIN LATERAL jsonb_array_elements(kv.value->'deltas') x
|
||||
delta_by_array as (
|
||||
select (kv.key)::int as pv_array_id,
|
||||
(x->>'slot_of_day')::int as slot_of_day,
|
||||
(x->>'delta_w')::int as delta_w
|
||||
from profile p
|
||||
cross join lateral jsonb_each((p.j)->'deltas_by_array') kv(key, value)
|
||||
cross join lateral jsonb_array_elements(kv.value->'deltas') x
|
||||
),
|
||||
deltas_legacy AS (
|
||||
SELECT (x->>'slot_of_day')::int AS slot_of_day,
|
||||
(x->>'delta_w')::int AS delta_w
|
||||
FROM profile p
|
||||
CROSS JOIN LATERAL jsonb_array_elements(p.j->'deltas') x
|
||||
deltas_legacy as (
|
||||
select (x->>'slot_of_day')::int as slot_of_day,
|
||||
(x->>'delta_w')::int as delta_w
|
||||
from profile p
|
||||
cross join lateral jsonb_array_elements(p.j->'deltas') x
|
||||
),
|
||||
corrected AS (
|
||||
SELECT
|
||||
s.interval_start,
|
||||
coalesce(ft.pv_forecast_total_w, 0)::bigint AS pv_forecast_total_w,
|
||||
flags as (
|
||||
select exists (select 1 from delta_by_array) as use_per_array
|
||||
),
|
||||
fc_with_sod as (
|
||||
select
|
||||
fa.interval_start,
|
||||
fa.pv_array_id,
|
||||
fa.power_w,
|
||||
st.slot_of_day
|
||||
from fc_by_array fa
|
||||
join slot_tz st on st.interval_start = fa.interval_start
|
||||
),
|
||||
per_array_corrected as (
|
||||
select
|
||||
f.interval_start,
|
||||
coalesce(
|
||||
CASE
|
||||
WHEN EXISTS (SELECT 1 FROM delta_by_array LIMIT 1) THEN (
|
||||
SELECT sum(greatest(0, fa.power_w - coalesce(d.delta_w, 0)))::bigint
|
||||
FROM fc_by_array fa
|
||||
CROSS JOIN tz
|
||||
LEFT JOIN delta_by_array d
|
||||
ON d.pv_array_id = fa.pv_array_id
|
||||
AND d.slot_of_day = (
|
||||
(
|
||||
(extract(hour FROM (s.interval_start AT TIME ZONE tz.tz_name))::int * 60)
|
||||
+ extract(minute FROM (s.interval_start AT TIME ZONE tz.tz_name))::int
|
||||
) / 15
|
||||
)
|
||||
WHERE fa.interval_start = s.interval_start
|
||||
)
|
||||
ELSE greatest(
|
||||
0,
|
||||
coalesce(ft.pv_forecast_total_w, 0)::bigint
|
||||
- coalesce(
|
||||
(
|
||||
SELECT d.delta_w
|
||||
FROM deltas_legacy d
|
||||
CROSS JOIN tz
|
||||
WHERE d.slot_of_day = (
|
||||
(
|
||||
(extract(hour FROM (s.interval_start AT TIME ZONE tz.tz_name))::int * 60)
|
||||
+ extract(minute FROM (s.interval_start AT TIME ZONE tz.tz_name))::int
|
||||
) / 15
|
||||
)
|
||||
sum(greatest(0::bigint, f.power_w - coalesce(d.delta_w, 0)::bigint)),
|
||||
0
|
||||
)::bigint as pv_forecast_corrected_w
|
||||
from fc_with_sod f
|
||||
left join delta_by_array d
|
||||
on d.pv_array_id = f.pv_array_id
|
||||
and d.slot_of_day = f.slot_of_day
|
||||
group by f.interval_start
|
||||
),
|
||||
0
|
||||
legacy_corrected as (
|
||||
select
|
||||
sw.interval_start,
|
||||
greatest(
|
||||
0::bigint,
|
||||
coalesce(ft.pv_forecast_total_w, 0)::bigint
|
||||
- coalesce(dl.delta_w, 0)::bigint
|
||||
) as pv_forecast_corrected_w
|
||||
from slot_tz sw
|
||||
left join fc_totals ft on ft.interval_start = sw.interval_start
|
||||
left join lateral (
|
||||
select dl0.delta_w
|
||||
from deltas_legacy dl0
|
||||
where dl0.slot_of_day = sw.slot_of_day
|
||||
limit 1
|
||||
) dl on true
|
||||
),
|
||||
corrected as (
|
||||
select
|
||||
st.interval_start,
|
||||
coalesce(ft.pv_forecast_total_w, 0)::bigint as pv_forecast_total_w,
|
||||
case
|
||||
when fl.use_per_array then coalesce(pac.pv_forecast_corrected_w, 0)::bigint
|
||||
else coalesce(leg.pv_forecast_corrected_w, 0)::bigint
|
||||
end as pv_forecast_corrected_w,
|
||||
st.slot_of_day
|
||||
from slot_tz st
|
||||
cross join flags fl
|
||||
left join fc_totals ft on ft.interval_start = st.interval_start
|
||||
left join per_array_corrected pac
|
||||
on fl.use_per_array
|
||||
and pac.interval_start = st.interval_start
|
||||
left join legacy_corrected leg
|
||||
on not fl.use_per_array
|
||||
and leg.interval_start = st.interval_start
|
||||
)
|
||||
)
|
||||
END,
|
||||
0
|
||||
)::bigint AS pv_forecast_corrected_w
|
||||
FROM slot_spine s
|
||||
LEFT JOIN fc_totals ft ON ft.interval_start = s.interval_start
|
||||
)
|
||||
SELECT coalesce(
|
||||
select coalesce(
|
||||
jsonb_agg(
|
||||
jsonb_build_object(
|
||||
'interval_start', c.interval_start,
|
||||
'pv_forecast_total_w', c.pv_forecast_total_w,
|
||||
'pv_forecast_corrected_w', c.pv_forecast_corrected_w,
|
||||
'slot_of_day',
|
||||
(
|
||||
(
|
||||
(extract(hour FROM (c.interval_start AT TIME ZONE tz.tz_name))::int * 60)
|
||||
+ extract(minute FROM (c.interval_start AT TIME ZONE tz.tz_name))::int
|
||||
) / 15
|
||||
'slot_of_day', c.slot_of_day
|
||||
)
|
||||
)
|
||||
ORDER BY c.interval_start
|
||||
order by c.interval_start
|
||||
),
|
||||
'[]'::jsonb
|
||||
)
|
||||
FROM corrected c
|
||||
CROSS JOIN tz;
|
||||
from corrected c;
|
||||
$fn$;
|
||||
|
||||
COMMENT ON FUNCTION ems.fn_forecast_pv_slots_range_corrected IS
|
||||
comment on function ems.fn_forecast_pv_slots_range_corrected is
|
||||
'JSON pole {interval_start, pv_forecast_total_w, pv_forecast_corrected_w, slot_of_day} po 15 min pro [p_from, p_to). Korekce per pv_array_id z fn_pv_forecast_delta_profile.deltas_by_array (fallback na jedno pole `deltas`). Horizont max. 60 dní.';
|
||||
|
||||
Reference in New Issue
Block a user