From 8dbf03d4b9684f7c55c74cdf3895b0398feb1ff6 Mon Sep 17 00:00:00 2001 From: Feike Steenbergen Date: Wed, 4 Dec 2024 15:46:09 +0100 Subject: [PATCH] Wip, reword --- .../cagg_refresh_producer_consumer/README.md | 0 .../consumer.sql | 0 .../example.sql | 12 ++ .../producer.sql | 115 ++++++++++++++++++ 4 files changed, 127 insertions(+) create mode 100644 utils/cagg_refresh_producer_consumer/README.md create mode 100644 utils/cagg_refresh_producer_consumer/consumer.sql create mode 100644 utils/cagg_refresh_producer_consumer/example.sql create mode 100644 utils/cagg_refresh_producer_consumer/producer.sql diff --git a/utils/cagg_refresh_producer_consumer/README.md b/utils/cagg_refresh_producer_consumer/README.md new file mode 100644 index 0000000..e69de29 diff --git a/utils/cagg_refresh_producer_consumer/consumer.sql b/utils/cagg_refresh_producer_consumer/consumer.sql new file mode 100644 index 0000000..e69de29 diff --git a/utils/cagg_refresh_producer_consumer/example.sql b/utils/cagg_refresh_producer_consumer/example.sql new file mode 100644 index 0000000..7a7e3c6 --- /dev/null +++ b/utils/cagg_refresh_producer_consumer/example.sql @@ -0,0 +1,12 @@ +\i producer.sql +\i consumer.sql + +CALL _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental( + job_id => null, + config => +'{ + "end_offset": "7 days", + "start_offset": "2 years", + "continuous_aggregate": "public.stats_five_minutes" +}' +); diff --git a/utils/cagg_refresh_producer_consumer/producer.sql b/utils/cagg_refresh_producer_consumer/producer.sql new file mode 100644 index 0000000..64c5848 --- /dev/null +++ b/utils/cagg_refresh_producer_consumer/producer.sql @@ -0,0 +1,115 @@ +CREATE SCHEMA IF NOT EXISTS _timescaledb_additional; + +CREATE TABLE IF NOT EXISTS _timescaledb_additional.incremental_continuous_aggregate_refreshes ( + continuous_aggregate regclass not null, + window_start timestamptz not null, + window_end timestamptz not null CHECK (window_end > window_start), + scheduled timestamptz not null default pg_catalog.clock_timestamp(), + worker_pid integer +); + +COMMENT ON COLUMN _timescaledb_additional.incremental_continuous_aggregate_refreshes.worker_pid IS +$$This column will be populated with the pid that is currently running this task. +This allows us to keep track of things, as well as allow us to reschedule an item if +a worker_pid is no longer active (for whatever reason)$$; + +COMMENT ON COLUMN _timescaledb_additional.incremental_continuous_aggregate_refreshes.scheduled IS +$$To ensure we do actually get to do all the work, the workers will always pick up the +task that was scheduled first. In that way, we have a bit of a priority queue.$$; + +-- We want to avoid scheduling the same thing twice, for those tasks that have not yet been +-- picked up by any worker. +CREATE UNIQUE INDEX IF NOT EXISTS incr_cagg_refreshes_distinct_tasks_unq ON _timescaledb_additional.incremental_continuous_aggregate_refreshes( + continuous_aggregate, + window_start, + window_end +) WHERE worker_pid IS NULL; + +DROP PROCEDURE IF EXISTS _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental; +CREATE PROCEDURE _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental ( + job_id int, + config jsonb +) LANGUAGE plpgsql AS $BODY$ +DECLARE + cagg_regclass regclass := (config ->> 'continuous_aggregate')::regclass; + start_offset INTERVAL := (config ->> 'start_offset')::INTERVAL; + end_offset INTERVAL := (config ->> 'end_offset')::INTERVAL; + increment_size INTERVAL := (config ->> 'increment_size')::INTERVAL; +BEGIN + IF pg_catalog.num_nulls(cagg_regclass, start_offset, end_offset) > 0 THEN + RAISE EXCEPTION 'Invalid configuration for scheduling an incremental refresh: %', config; + END IF; + + -- We gather some data on the CAgg itself, its name, and its oid, + -- as well as the size of the increment if it wasn't specified + IF increment_size IS NULL THEN + SELECT + -- We default to the dimension interval_length if not explicitly specified + coalesce(increment_size, interval_length * interval '1 microsecond') + INTO + increment_size + FROM + _timescaledb_catalog.continuous_agg AS cagg + JOIN + _timescaledb_catalog.hypertable AS h ON (h.id = raw_hypertable_id) + JOIN + _timescaledb_catalog.dimension AS dim ON (h.id = dim.hypertable_id) + WHERE + format('%I.%I', user_view_schema, user_view_name)::regclass = cagg_regclass + -- If there are multiple dimensions, we only want the first one + ORDER BY + dim.id ASC + LIMIT + 1; + END IF; + + DECLARE + start_t timestamptz := now() - start_offset; + end_t timestamptz := now() - end_offset; + + incr_start timestamptz := public.time_bucket(increment_size, now() - start_offset); + incr_end timestamptz := incr_start; + + count bigint := 0; + added bigint := 0; + hit bool := false; + BEGIN + WHILE incr_end < end_t + LOOP + incr_end := public.time_bucket(increment_size, incr_start + increment_size); + + INSERT INTO _timescaledb_additional.incremental_continuous_aggregate_refreshes + (continuous_aggregate, window_start, window_end) + VALUES + (cagg_regclass, incr_start, incr_end) + ON CONFLICT + DO NOTHING + RETURNING + true + INTO + hit; + + count := count + 1; + IF hit THEN + added := added + 1; + END IF; + + incr_start := incr_end; + END LOOP; + + RAISE NOTICE + 'Scheduled incremental refreshes for % (% - %). Tasks evaluated: %, newly inserted: %', + cagg_regclass::text, + start_t, + end_t, + count, + added; + END; +END; +$BODY$; + +COMMENT ON PROCEDURE _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental IS +$$schedule_refresh_continuous_aggregate_incremental is a pretty non-intelligent procedure. +For the provided continuous aggregate it will write records into this table: + _timescaledb_additional.incremental_continuous_aggregate_refreshes +Which will then be tasks picked up by task_refresh_continuous_aggregate_incremental_run$$;