From 631dda8b4e5fe9cb7bba2ee22773d9ecada605d7 Mon Sep 17 00:00:00 2001
From: Feike Steenbergen
Date: Wed, 4 Dec 2024 15:46:09 +0100
Subject: [PATCH] Include prototype for incremental refresh

The initial idea behind this work was to create incremental *parallel*
refresh; however, explicit locks are still held on CAgg hypertables during
refresh, so it has become *only* incremental refresh.

We introduce 3 concepts:

- producer job
- consumer job
- work queue

By keeping these items separate, we can schedule work for CAgg refreshes in
smaller increments (say 12 hours instead of 3 weeks), while still allowing us
to intervene by injecting higher-priority refreshes if needed.

For details, see README.md
---
 .../cagg_refresh_producer_consumer/README.md |  90 +++++++++
 .../consumer.sql                             | 172 +++++++++++++++++
 .../producer.sql                             | 182 ++++++++++++++++++
 .../test_schema.sql                          | 120 ++++++++++++
 4 files changed, 564 insertions(+)
 create mode 100644 utils/cagg_refresh_producer_consumer/README.md
 create mode 100644 utils/cagg_refresh_producer_consumer/consumer.sql
 create mode 100644 utils/cagg_refresh_producer_consumer/producer.sql
 create mode 100644 utils/cagg_refresh_producer_consumer/test_schema.sql

diff --git a/utils/cagg_refresh_producer_consumer/README.md b/utils/cagg_refresh_producer_consumer/README.md
new file mode 100644
index 0000000..984d03f
--- /dev/null
+++ b/utils/cagg_refresh_producer_consumer/README.md
@@ -0,0 +1,90 @@
# Continuous Aggregates, incremental parallel setup

This code explores the possibility of doing incremental CAgg refreshes in
parallel. The setup is as follows.

At a very high level, these are the components:

- a table that acts as a work queue:
  `_timescaledb_additional.incremental_continuous_aggregate_refreshes`
- one (or more) producer jobs that schedule CAgg refreshes
- one (or more) consumer jobs that process the tasks based on priority

The producer jobs can be scheduled very frequently, as no duplicate tasks will
be written to the work queue.

## Producer

We have a producer procedure
(`schedule_refresh_continuous_aggregate_incremental`), which schedules tasks to
be picked up by the consumers.

The configuration for this call contains the following keys:

```json
{
    "end_offset": "similar to end-offset in the policy",
    "start_offset": "similar to start-offset in the policy",
    "continuous_aggregate": "regclass / fully qualified name of the user view for the CAgg",
    "increment_size": "the size of each individual task, default: chunk_interval",
    "priority": "priority for these tasks. Lower numbers get processed earlier, default: 100"
}
```

### Producer Examples

#### Schedule two sets of jobs for this CAgg

We schedule two sets of tasks. The first covers the bulk of the history, in
3-day increments:

```sql
CALL _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental(
    job_id => null,
    config => '
{
    "end_offset": "6 weeks",
    "start_offset": "3 years",
    "continuous_aggregate": "public.test_cagg_incr_refresh_cagg",
    "increment_size": "3 days"
}');
```

The second covers the most recent data, in 1-week increments and with the
highest priority:

```sql
CALL _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental(
    job_id => null,
    config => '
{
    "end_offset": "1 day",
    "start_offset": "6 weeks",
    "continuous_aggregate": "public.test_cagg_incr_refresh_cagg",
    "increment_size": "1 week",
    "priority": 1
}');
```
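
After running the producer, you can verify what has been queued. A minimal
sketch, using only the queue table's columns as defined in producer.sql:

```sql
SELECT
    continuous_aggregate,
    priority,
    count(*) AS tasks,
    min(window_start) AS oldest_window_start,
    max(window_end) AS newest_window_end
FROM
    _timescaledb_additional.incremental_continuous_aggregate_refreshes
WHERE
    finished IS NULL
GROUP BY
    continuous_aggregate,
    priority
ORDER BY
    priority;
```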
## Consumer

For the consumer(s), we schedule as many jobs as we want to be able to run in
parallel. A reasonable maximum is likely fairly low, for example 4-6. While we
*can* do incremental CAgg refreshes, we cannot (as of December 2024) schedule
parallel refreshes for the same CAgg. The number of consumers should therefore
never be higher than your number of CAggs.

Each of these jobs holds a connection continuously, as they are designed to
run all the time.

```sql
SELECT
    public.add_job(
        proc => '_timescaledb_additional.task_refresh_continuous_aggregate_incremental_runner'::regproc,
        -- This isn't strictly needed, but it ensures the workers do not run
        -- forever: once they terminate, they are restarted within 15 minutes
        -- or so.
        schedule_interval => interval '15 minutes',
        config => '{"max_runtime": "11 hours"}',
        initial_start => now()
    )
FROM
    generate_series(1, 4);
```
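
Because each consumer keeps its `application_name` up to date, you can follow
what the workers are doing via `pg_stat_activity`. A minimal sketch; while
refreshing, a worker renames itself to `<cagg> refresh <window>`:

```sql
SELECT
    pid,
    application_name,
    state
FROM
    pg_catalog.pg_stat_activity
WHERE
    application_name LIKE 'cagg incremental refresh consumer%'
    OR application_name LIKE '% refresh %';
```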
diff --git a/utils/cagg_refresh_producer_consumer/consumer.sql b/utils/cagg_refresh_producer_consumer/consumer.sql
new file mode 100644
index 0000000..3b7a0b3
--- /dev/null
+++ b/utils/cagg_refresh_producer_consumer/consumer.sql
@@ -0,0 +1,172 @@

DROP PROCEDURE IF EXISTS _timescaledb_additional.task_refresh_continuous_aggregate_incremental_runner;
CREATE PROCEDURE _timescaledb_additional.task_refresh_continuous_aggregate_incremental_runner (
    job_id int,
    config jsonb
) LANGUAGE plpgsql AS $BODY$
DECLARE
    max_runtime interval := (config->>'max_runtime')::interval;
    global_start_time timestamptz := pg_catalog.clock_timestamp();
    global_end_time timestamptz;
    app_name text;
BEGIN
    max_runtime := coalesce(max_runtime, interval '6 hours');
    global_end_time := global_start_time + max_runtime;

    WHILE pg_catalog.clock_timestamp() < global_end_time LOOP
        SET search_path TO 'pg_catalog,pg_temp';
        SET lock_timeout TO '3s';
        SET application_name TO 'cagg incremental refresh consumer - idle';

        -- Prevent a hot loop
        PERFORM pg_catalog.pg_sleep(1.0);

        SET application_name TO 'cagg incremental refresh consumer - retrieving new task';

        DECLARE
            p_id bigint;
            p_cagg regclass;
            p_window_start timestamptz;
            p_window_end timestamptz;
            p_mat_hypertable_id int;
            p_job_id int;
        BEGIN
            SELECT
                q.id,
                q.continuous_aggregate,
                q.window_start,
                q.window_end,
                cagg.mat_hypertable_id,
                coalesce(jobs.job_id, -1)
            INTO
                p_id,
                p_cagg,
                p_window_start,
                p_window_end,
                p_mat_hypertable_id,
                p_job_id
            FROM
                _timescaledb_additional.incremental_continuous_aggregate_refreshes AS q
            JOIN
                pg_catalog.pg_class AS pc ON (q.continuous_aggregate=oid)
            JOIN
                pg_catalog.pg_namespace AS pn ON (relnamespace=pn.oid)
            JOIN
                _timescaledb_catalog.continuous_agg AS cagg ON (cagg.user_view_schema=nspname AND cagg.user_view_name=pc.relname)
            JOIN
                _timescaledb_catalog.hypertable AS h ON (cagg.mat_hypertable_id=h.id)
            LEFT JOIN
                timescaledb_information.jobs ON (proc_name='policy_refresh_continuous_aggregate' AND proc_schema='_timescaledb_functions' AND jobs.config->>'mat_hypertable_id' = cagg.mat_hypertable_id::text)
            WHERE
                q.worker_pid IS NULL AND q.finished IS NULL
                -- We don't want multiple workers to be active on the same CAgg.
                AND NOT EXISTS (
                    SELECT
                    FROM
                        _timescaledb_additional.incremental_continuous_aggregate_refreshes AS a
                    JOIN
                        pg_catalog.pg_stat_activity ON (pid=worker_pid)
                    WHERE
                        a.finished IS NULL
                        -- If pids ever get recycled (container/machine restart),
                        -- this filter ensures we ignore the old ones
                        AND started > backend_start
                        AND q.continuous_aggregate = a.continuous_aggregate
                )
            ORDER BY
                q.priority ASC,
                q.scheduled ASC
            FOR NO KEY UPDATE OF q SKIP LOCKED
            LIMIT
                1;

            IF p_cagg IS NULL THEN
                COMMIT;
                -- There are no items in the queue that we can currently process.
                -- Sleep briefly (on top of the 1 second at the top of the loop)
                -- before attempting to try again.
                IF global_end_time - interval '30 seconds' < now() THEN
                    EXIT;
                ELSE
                    SET application_name TO 'cagg incremental refresh consumer - waiting for next task';
                    PERFORM pg_catalog.pg_sleep(0.1);
                    CONTINUE;
                END IF;
            END IF;

            UPDATE
                _timescaledb_additional.incremental_continuous_aggregate_refreshes
            SET
                worker_pid = pg_backend_pid(),
                started = clock_timestamp()
            WHERE
                id = p_id;

            -- Inform others of what we are doing.
            app_name := ' refresh ' || p_window_start::date;
            IF p_window_end::date != p_window_start::date THEN
                app_name := app_name || ' ' || p_window_end::date;
            ELSE
                app_name := app_name || to_char(p_window_start, 'THH24:MI');
            END IF;
            IF length(app_name) + length(p_cagg::text) > 63 THEN
                app_name := '...' || right(p_cagg::text, 60 - length(app_name)) || app_name;
            ELSE
                app_name := p_cagg::text || app_name;
            END IF;
            PERFORM pg_catalog.set_config(
                'application_name',
                app_name,
                false
            );

            RAISE NOTICE
                '% - Processing %, (% - %)',
                pg_catalog.to_char(pg_catalog.clock_timestamp(), 'YYYY-MM-DD HH24:MI:SS.FF3OF'),
                p_cagg,
                p_window_start,
                p_window_end;

            -- We need to ensure that all other workers now know we are working
            -- on this task. We therefore need to commit once now. This also
            -- releases the row lock we took on the queue item.
            COMMIT;

            -- We take out a row-level lock to signal to concurrent workers that
            -- *we* are working on it. Because this is only a lock, other
            -- sessions can still clean up this table: they can update/delete
            -- these rows once no active worker holds the lock anymore.
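            -- (The producer's stale-row cleanup relies on this lock: its
            -- DELETE ... FOR UPDATE SKIP LOCKED in producer.sql skips any row
            -- on which a live worker still holds this lock.)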
            PERFORM
            FROM
                _timescaledb_additional.incremental_continuous_aggregate_refreshes
            WHERE
                id = p_id
            FOR NO KEY UPDATE;

            CALL _timescaledb_functions.policy_refresh_continuous_aggregate(
                -1,
                config => jsonb_build_object(
                    'end_offset', (clock_timestamp() - p_window_end)::interval(0),
                    'start_offset', (clock_timestamp() - p_window_start)::interval(0),
                    'mat_hypertable_id', p_mat_hypertable_id
                )
            );

            UPDATE
                _timescaledb_additional.incremental_continuous_aggregate_refreshes
            SET
                finished = clock_timestamp()
            WHERE
                id = p_id;
            COMMIT;

            SET application_name TO 'cagg incremental refresh consumer - idle';
        END;
    END LOOP;

    RAISE NOTICE 'Shutting down worker, as we exceeded our maximum runtime (%)', max_runtime;
END;
$BODY$;

GRANT EXECUTE ON PROCEDURE _timescaledb_additional.task_refresh_continuous_aggregate_incremental_runner TO pg_database_owner;
diff --git a/utils/cagg_refresh_producer_consumer/producer.sql b/utils/cagg_refresh_producer_consumer/producer.sql
new file mode 100644
index 0000000..0953712
--- /dev/null
+++ b/utils/cagg_refresh_producer_consumer/producer.sql
@@ -0,0 +1,182 @@
CREATE SCHEMA IF NOT EXISTS _timescaledb_additional;

CREATE TABLE IF NOT EXISTS _timescaledb_additional.incremental_continuous_aggregate_refreshes (
    id bigint GENERATED ALWAYS AS IDENTITY,
    continuous_aggregate regclass not null,
    window_start timestamptz not null,
    window_end timestamptz not null CHECK (window_end > window_start),
    scheduled timestamptz not null default pg_catalog.clock_timestamp(),
    priority int not null default 1,
    started timestamptz,
    finished timestamptz,
    worker_pid integer,
    primary key (id),
    CONSTRAINT incr_cagg_refreshes_workers_have_started CHECK (num_nulls(worker_pid, started) IN (0, 2))
);

GRANT USAGE ON SCHEMA _timescaledb_additional TO public;
REVOKE ALL ON TABLE _timescaledb_additional.incremental_continuous_aggregate_refreshes FROM PUBLIC;
GRANT SELECT ON TABLE _timescaledb_additional.incremental_continuous_aggregate_refreshes TO public;
GRANT ALL ON TABLE _timescaledb_additional.incremental_continuous_aggregate_refreshes TO pg_database_owner;

COMMENT ON COLUMN _timescaledb_additional.incremental_continuous_aggregate_refreshes.worker_pid IS
$$This column is populated with the pid that is currently working on this task.
This allows us to keep track of things, as well as to reschedule an item if
its worker_pid is no longer active (for whatever reason).$$;

COMMENT ON COLUMN _timescaledb_additional.incremental_continuous_aggregate_refreshes.scheduled IS
$$To ensure we actually get all the work done, the workers always pick up the
task with the lowest priority value first, and among equal priorities, the one
that was scheduled first. That gives us a basic priority queue.$$;

-- We want to avoid scheduling the same thing twice, for those tasks that have
-- not yet been picked up by any worker.
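-- For example, invoking the producer twice with an identical config inserts
-- each (continuous_aggregate, window_start, window_end) combination only once:
-- the INSERT ... ON CONFLICT DO NOTHING in the procedure below bounces off
-- this partial unique index.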
CREATE UNIQUE INDEX IF NOT EXISTS incr_cagg_refreshes_distinct_tasks_unq ON _timescaledb_additional.incremental_continuous_aggregate_refreshes(
    continuous_aggregate,
    window_start,
    window_end
) WHERE worker_pid IS NULL AND finished IS NULL;

CREATE INDEX IF NOT EXISTS incr_cagg_refreshes_find_first_work_item_idx ON _timescaledb_additional.incremental_continuous_aggregate_refreshes(
    priority,
    scheduled
) WHERE worker_pid IS NULL;

CREATE INDEX IF NOT EXISTS incr_cagg_refreshes_active_workers_idx ON _timescaledb_additional.incremental_continuous_aggregate_refreshes(
    worker_pid
) WHERE worker_pid IS NOT NULL;

DROP PROCEDURE IF EXISTS _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental;
CREATE PROCEDURE _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental (
    job_id int,
    config jsonb
) LANGUAGE plpgsql AS $BODY$
DECLARE
    cagg_regclass regclass := (config ->> 'continuous_aggregate')::regclass;
    start_offset INTERVAL;
    end_offset INTERVAL := (config ->> 'end_offset')::INTERVAL;
    increment_size INTERVAL;
    priority int := coalesce((config ->> 'priority')::integer, 100);
BEGIN
    IF pg_catalog.num_nulls(cagg_regclass, end_offset) > 0 THEN
        RAISE EXCEPTION 'Invalid configuration for scheduling an incremental refresh: %', config;
    END IF;

    -- We gather some data on the CAgg itself (its name and oid), as well as
    -- defaults for the increment size and the start offset, in case they were
    -- not explicitly specified.
    SELECT
        -- We default to the dimension interval_length if not explicitly specified
        coalesce(increment_size, interval_length * interval '1 microsecond', '1 hour'),
        -- And we default to the known watermark
        coalesce(start_offset, now() - _timescaledb_functions.to_timestamp(watermark))
    INTO
        increment_size,
        start_offset
    FROM
        _timescaledb_catalog.continuous_agg AS cagg
    JOIN
        _timescaledb_catalog.hypertable AS h ON (h.id = raw_hypertable_id)
    JOIN
        _timescaledb_catalog.dimension AS dim ON (h.id = dim.hypertable_id)
    LEFT JOIN
        _timescaledb_catalog.continuous_aggs_watermark AS cw ON (cw.mat_hypertable_id = cagg.mat_hypertable_id)
    WHERE
        format('%I.%I', user_view_schema, user_view_name)::regclass = cagg_regclass
    -- If there are multiple dimensions, we only want the first one
    ORDER BY
        dim.id ASC
    LIMIT
        1;

    -- If explicitly configured, those values take precedence.
    increment_size := coalesce((config ->> 'increment_size')::INTERVAL, increment_size, '1 hour');
    start_offset := coalesce((config ->> 'start_offset')::INTERVAL, start_offset, '1 year');

    -- Remove stale entries
    WITH stale AS (
        SELECT
            id
        FROM
            _timescaledb_additional.incremental_continuous_aggregate_refreshes
        WHERE
            worker_pid IS NOT NULL
            AND finished IS NULL
            -- There is a small chance of a race condition between a consumer
            -- and this producer. The consumer takes out a row-level lock very
            -- quickly after an intermediate commit (first statement after that
            -- commit), but it may not have it *yet*.
            -- The filter below prevents such recently started rows from being
            -- marked as stale.
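            -- (The 3-second grace period is an assumption; it should
            -- comfortably exceed the gap between the consumer's intermediate
            -- COMMIT and its re-acquisition of the row lock.)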
            AND started < pg_catalog.clock_timestamp() - interval '3 seconds'
            AND NOT EXISTS (
                SELECT
                FROM
                    pg_stat_activity
                WHERE
                    pid = worker_pid
            )
        FOR UPDATE SKIP LOCKED
    )
    DELETE
    FROM
        _timescaledb_additional.incremental_continuous_aggregate_refreshes AS q
    USING
        stale AS s
    WHERE
        s.id = q.id;

    DECLARE
        start_t timestamptz := now() - start_offset;
        end_t timestamptz := now() - end_offset;

        incr_end timestamptz := public.time_bucket(increment_size, now() - end_offset);
        incr_start timestamptz := incr_end;

        count bigint := 0;
        added bigint := 0;
        hit bool := false;
    BEGIN
        WHILE incr_start >= start_t
        LOOP
            incr_start := public.time_bucket(increment_size, incr_end - increment_size);

            INSERT INTO _timescaledb_additional.incremental_continuous_aggregate_refreshes
                (continuous_aggregate, window_start, window_end, priority)
            VALUES
                (cagg_regclass, incr_start, incr_end, priority)
            ON CONFLICT
                DO NOTHING
            RETURNING
                true
            INTO
                hit;

            count := count + 1;
            IF hit THEN
                added := added + 1;
            END IF;

            incr_end := incr_start;
        END LOOP;

        RAISE NOTICE
            E'Scheduled incremental refreshes for % (% - %). Tasks evaluated: %, newly inserted: %.\nStart offset: %, end offset: %, increment: %',
            cagg_regclass::text,
            start_t,
            end_t,
            count,
            added,
            start_offset,
            end_offset,
            increment_size;
    END;
END;
$BODY$;

GRANT EXECUTE ON PROCEDURE _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental TO pg_database_owner;

COMMENT ON PROCEDURE _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental IS
$$schedule_refresh_continuous_aggregate_incremental is a deliberately simple procedure.
For the provided continuous aggregate it writes records into this table:
    _timescaledb_additional.incremental_continuous_aggregate_refreshes
These tasks are then picked up by task_refresh_continuous_aggregate_incremental_runner.$$;
diff --git a/utils/cagg_refresh_producer_consumer/test_schema.sql b/utils/cagg_refresh_producer_consumer/test_schema.sql
new file mode 100644
index 0000000..6f5cce8
--- /dev/null
+++ b/utils/cagg_refresh_producer_consumer/test_schema.sql
@@ -0,0 +1,120 @@
CREATE TABLE public.test_cagg_incr_refresh(
    time timestamptz not null,
    value int4
);

CREATE FUNCTION public.slow_accum(int8, int4)
RETURNS int8
LANGUAGE sql AS
$$SELECT $1 + $2$$ STRICT;

CREATE OR REPLACE FUNCTION public.slow_final(int8)
RETURNS int8
LANGUAGE plpgsql AS $$
BEGIN
    -- This ensures the aggregate will always be slow,
    -- regardless of the number of rows.
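    -- (With 12-hour buckets over 3 years of data, that is roughly 2,200
    -- bucket finalizations, i.e. about 37 minutes of sleeping for one full
    -- refresh of the CAgg.)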
    PERFORM pg_catalog.pg_sleep(1);
    RETURN $1;
END;
$$;

CREATE AGGREGATE public.slow_sum(int4)
(
    INITCOND = 0,
    STYPE = int8,
    SFUNC = public.slow_accum,
    FINALFUNC = public.slow_final
);

SELECT
    public.create_hypertable(
        'public.test_cagg_incr_refresh',
        'time'
    );

CREATE MATERIALIZED VIEW public.test_cagg_incr_refresh_cagg
WITH (timescaledb.continuous) AS
SELECT
    time_bucket(interval '12 hours', time) AS bucket,
    slow_sum(value) AS sum_values,
    count(*) AS n_values
FROM
    public.test_cagg_incr_refresh
GROUP BY
    bucket
WITH NO DATA;

-- 1096 days worth of data. As we sleep 1 second for every 12-hour bucket that
-- gets finalized, there is quite a lot of sleeping to be done, which lets us
-- observe the interactions between jobs, priorities, etc.
INSERT INTO
    public.test_cagg_incr_refresh (time, value)
SELECT
    t,
    (random() * 2000_000_000)::int
FROM
    pg_catalog.generate_series(
        '2022-01-01T00:00:00+00',
        '2025-01-01T00:00:00+00',
        -- Slightly offset from the bucket size, so the rows do not line up
        -- with the bucket boundaries
        interval '10 hours'
    ) AS _(t);

-- First, we schedule the older data, with a low priority
CALL _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental(
    job_id => null,
    config =>
'{
    "end_offset": "7 days",
    "start_offset": "3 years",
    "continuous_aggregate": "public.test_cagg_incr_refresh_cagg",
    "increment_size": "3 days",
    "priority": 100
}'
);

-- Next, we schedule the newer data, with a higher priority
CALL _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental(
    job_id => null,
    config =>
'{
    "end_offset": "10 minutes",
    "start_offset": "8 days",
    "continuous_aggregate": "public.test_cagg_incr_refresh_cagg",
    "increment_size": "24 hours",
    "priority": 1
}'
);
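
-- To follow progress while the consumers are running, a query like the
-- following can be used (a sketch; it relies only on the queue table's
-- columns as defined in producer.sql):
SELECT
    priority,
    count(*) FILTER (WHERE finished IS NOT NULL) AS done,
    count(*) FILTER (WHERE worker_pid IS NOT NULL AND finished IS NULL) AS active,
    count(*) FILTER (WHERE worker_pid IS NULL) AS pending
FROM
    _timescaledb_additional.incremental_continuous_aggregate_refreshes
GROUP BY
    priority
ORDER BY
    priority;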