diff --git a/Cargo.lock b/Cargo.lock index e8f981c3..2428645c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -254,9 +254,9 @@ dependencies = [ [[package]] name = "clap" -version = "3.0.13" +version = "3.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08799f92c961c7a1cf0cc398a9073da99e21ce388b46372c37f3191f2f3eed3e" +checksum = "b63edc3f163b3c71ec8aa23f9bd6070f77edbf3d1d198b164afa90ff00e4ec62" dependencies = [ "atty", "bitflags", @@ -271,9 +271,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "3.0.12" +version = "3.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fd2078197a22f338bd4fbf7d6387eb6f0d6a3c69e6cbc09f5c93e97321fd92a" +checksum = "9a1132dc3944b31c20dd8b906b3a9f0a5d0243e092d59171414969657ac6aa85" dependencies = [ "heck", "proc-macro-error", @@ -1097,8 +1097,8 @@ dependencies = [ [[package]] name = "pgx" -version = "0.2.6" -source = "git+https://github.com/timescale/pgx?branch=promscale-staging#271be6a1039d52ae998f7b0bb16fc28480e44af0" +version = "0.3.1" +source = "git+https://github.com/timescale/pgx?branch=promscale-staging#ee52db6bbaa006f6f3674bddeff8516c3b914e71" dependencies = [ "atomic-traits", "bitflags", @@ -1126,8 +1126,8 @@ dependencies = [ [[package]] name = "pgx-macros" -version = "0.2.6" -source = "git+https://github.com/timescale/pgx?branch=promscale-staging#271be6a1039d52ae998f7b0bb16fc28480e44af0" +version = "0.3.1" +source = "git+https://github.com/timescale/pgx?branch=promscale-staging#ee52db6bbaa006f6f3674bddeff8516c3b914e71" dependencies = [ "pgx-utils", "proc-macro-crate", @@ -1139,8 +1139,8 @@ dependencies = [ [[package]] name = "pgx-pg-sys" -version = "0.2.6" -source = "git+https://github.com/timescale/pgx?branch=promscale-staging#271be6a1039d52ae998f7b0bb16fc28480e44af0" +version = "0.3.1" +source = "git+https://github.com/timescale/pgx?branch=promscale-staging#ee52db6bbaa006f6f3674bddeff8516c3b914e71" dependencies = [ "bindgen", "build-deps", @@ -1160,8 +1160,8 @@ dependencies = [ [[package]] name = "pgx-tests" -version = "0.2.6" -source = "git+https://github.com/timescale/pgx?branch=promscale-staging#271be6a1039d52ae998f7b0bb16fc28480e44af0" +version = "0.3.1" +source = "git+https://github.com/timescale/pgx?branch=promscale-staging#ee52db6bbaa006f6f3674bddeff8516c3b914e71" dependencies = [ "colored", "eyre", @@ -1180,10 +1180,10 @@ dependencies = [ [[package]] name = "pgx-utils" -version = "0.2.6" -source = "git+https://github.com/timescale/pgx?branch=promscale-staging#271be6a1039d52ae998f7b0bb16fc28480e44af0" +version = "0.3.1" +source = "git+https://github.com/timescale/pgx?branch=promscale-staging#ee52db6bbaa006f6f3674bddeff8516c3b914e71" dependencies = [ - "clap 3.0.13", + "clap 3.0.14", "color-eyre", "colored", "convert_case", @@ -1949,9 +1949,9 @@ dependencies = [ [[package]] name = "tracing" -version = "0.1.29" +version = "0.1.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105" +checksum = "2d8d93354fe2a8e50d5953f5ae2e47a3fc2ef03292e7ea46e3cc38f549525fb9" dependencies = [ "cfg-if", "pin-project-lite", @@ -1961,9 +1961,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.18" +version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4f480b8f81512e825f337ad51e94c1eb5d3bbdf2b363dcd01e2b19a9ffe3f8e" +checksum = "8276d9a4a3a558d7b7ad5303ad50b53d58264641b82914b7ada36bd762e7a716" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index d0bcd200..3a9260dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,14 +26,14 @@ pg_test = ["serde_json"] [dependencies] bincode = "1.3.1" -pgx = "0.2.5" -pgx-macros = "0.2.5" +pgx = "0.3.1" +pgx-macros = "0.3.1" num_cpus = "1.13.1" serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0.70", optional = true } [dev-dependencies] -pgx-tests = "0.2.5" +pgx-tests = "0.3.1" [patch.crates-io] pgx = { git = "https://github.com/timescale/pgx", branch = "promscale-staging" } diff --git a/Dockerfile b/Dockerfile index b416df3e..f08930a1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -38,7 +38,7 @@ RUN \ # Remove crt-static feature on musl target to allow building cdylibs ENV RUSTFLAGS="-C target-feature=-crt-static" RUN --mount=type=cache,uid=70,gid=70,target=/build/promscale/.cargo/registry \ - cargo install cargo-pgx --git https://github.com/timescale/pgx --branch promscale-staging --rev 271be6a1 && \ + cargo install cargo-pgx --git https://github.com/timescale/pgx --branch promscale-staging --rev ee52db6b && \ cargo pgx init --${PG_VERSION_TAG} $(which pg_config) USER root diff --git a/src/aggregate_utils.rs b/src/aggregate_utils.rs deleted file mode 100644 index 6cedd1af..00000000 --- a/src/aggregate_utils.rs +++ /dev/null @@ -1,33 +0,0 @@ -use std::ptr::null_mut; - -use pgx::pg_sys; - -// TODO move to func_utils once there are enough function to warrant one -#[allow(dead_code)] -pub unsafe fn get_collation(fcinfo: pg_sys::FunctionCallInfo) -> Option { - if (*fcinfo).fncollation == 0 { - None - } else { - Some((*fcinfo).fncollation) - } -} - -pub unsafe fn in_aggregate_context T>( - fcinfo: pg_sys::FunctionCallInfo, - f: F, -) -> T { - let mctx = - aggregate_mctx(fcinfo).unwrap_or_else(|| pgx::error!("cannot call as non-aggregate")); - crate::palloc::in_memory_context(mctx, f) -} - -pub fn aggregate_mctx(fcinfo: pg_sys::FunctionCallInfo) -> Option { - let mut mctx = null_mut(); - let is_aggregate = unsafe { pg_sys::AggCheckCallContext(fcinfo, &mut mctx) }; - if is_aggregate == 0 { - None - } else { - debug_assert!(!mctx.is_null()); - Some(mctx) - } -} diff --git a/src/aggregates/gapfill_delta.rs b/src/aggregates/gapfill_delta.rs index 6c6503be..93370fac 100644 --- a/src/aggregates/gapfill_delta.rs +++ b/src/aggregates/gapfill_delta.rs @@ -1,33 +1,9 @@ use crate::aggregates::{Microseconds, Milliseconds, STALE_NAN, USECS_PER_MS, USECS_PER_SEC}; -use crate::palloc::{Inner, InternalAsValue}; use pgx::*; use serde::{Deserialize, Serialize}; use std::collections::VecDeque; -#[pg_extern(immutable, parallel_safe)] -pub fn prom_extrapolate_final(state: Internal) -> Option>> { - prom_extrapolate_final_inner(unsafe { state.to_inner() }) -} -pub fn prom_extrapolate_final_inner( - state: Option>, -) -> Option>> { - state.map(|mut s| s.as_vec()) -} - -/// Backwards compatibility -#[no_mangle] -pub extern "C" fn pg_finfo_gapfill_delta_final() -> &'static pg_sys::Pg_finfo_record { - const V1_API: pg_sys::Pg_finfo_record = pg_sys::Pg_finfo_record { api_version: 1 }; - &V1_API -} - -#[no_mangle] -unsafe extern "C" fn gapfill_delta_final(fcinfo: pg_sys::FunctionCallInfo) -> pg_sys::Datum { - prom_extrapolate_final_wrapper(fcinfo) -} - #[derive(Serialize, Deserialize, PostgresType, Debug)] -#[pgx(sql = false)] pub struct GapfillDeltaTransition { window: VecDeque<(pg_sys::TimestampTz, f64)>, // a Datum for each index in the array, 0 by convention if the value is NULL diff --git a/src/aggregates/prom_delta.rs b/src/aggregates/prom_delta.rs index ff2e08e1..b5dc539b 100644 --- a/src/aggregates/prom_delta.rs +++ b/src/aggregates/prom_delta.rs @@ -2,9 +2,10 @@ use pgx::*; use pgx::error; -use crate::aggregate_utils::in_aggregate_context; use crate::aggregates::{GapfillDeltaTransition, Milliseconds}; -use crate::palloc::{Inner, InternalAsValue, ToInternal}; + +#[allow(non_camel_case_types)] +pub struct prom_delta; // prom divides time into sliding windows of fixed size, e.g. // | 5 seconds | 5 seconds | 5 seconds | 5 seconds | 5 seconds | @@ -12,112 +13,49 @@ use crate::palloc::{Inner, InternalAsValue, ToInternal}; // value for that bucket. // | a b c d e | f g h i | j k | m | // | e - a | i - f | k - j | | -#[allow(clippy::too_many_arguments)] -#[pg_extern(immutable, parallel_safe)] -pub fn prom_delta_transition( - state: Internal, - lowest_time: pg_sys::TimestampTz, - greatest_time: pg_sys::TimestampTz, - step_size: Milliseconds, // `prev_now - step_size` is where the next window starts - range: Milliseconds, // the size of a window to delta over - sample_time: pg_sys::TimestampTz, - sample_value: f64, - fc: pg_sys::FunctionCallInfo, -) -> Internal { - prom_delta_transition_inner( - unsafe { state.to_inner() }, - lowest_time, - greatest_time, - step_size, - range, - sample_time, - sample_value, - fc, - ) - .internal() -} - -#[allow(clippy::too_many_arguments)] -fn prom_delta_transition_inner( - state: Option>, - lowest_time: pg_sys::TimestampTz, - greatest_time: pg_sys::TimestampTz, - step_size: Milliseconds, // `prev_now - step` is where the next window starts - range: Milliseconds, // the size of a window to delta over - sample_time: pg_sys::TimestampTz, - sample_value: f64, - fc: pg_sys::FunctionCallInfo, -) -> Option> { - unsafe { - in_aggregate_context(fc, || { - if sample_time < lowest_time || sample_time > greatest_time { - error!("input time less than lowest time") - } - - let mut state = state.unwrap_or_else(|| { - let state: Inner<_> = GapfillDeltaTransition::new( - lowest_time, - greatest_time, - range, - step_size, - false, - false, - ) - .into(); - state - }); - - state.add_data_point(sample_time, sample_value); - - Some(state) - }) +#[pg_aggregate] +impl Aggregate for prom_delta { + type State = Option; + type Args = ( + name!(lowest_time, pg_sys::TimestampTz), + name!(greatest_time, pg_sys::TimestampTz), + name!(step_size, Milliseconds), + name!(range, Milliseconds), + name!(sample_time, pg_sys::TimestampTz), + name!(sample_value, f64), + ); + type Finalize = Option>>; + + fn state( + state: Self::State, + (lowest_time, greatest_time, step_size, range, sample_time, sample_value): Self::Args, + _: pg_sys::FunctionCallInfo, + ) -> Self::State { + if sample_time < lowest_time || sample_time > greatest_time { + error!(format!( + "input time {} not in bounds [{}, {}]", + sample_time, lowest_time, greatest_time + )) + } + + let mut state = state.unwrap_or_else(|| { + GapfillDeltaTransition::new(lowest_time, greatest_time, range, step_size, false, false) + }); + + state.add_data_point(sample_time, sample_value); + + Some(state) } -} - -/// Backwards compatibility -#[no_mangle] -pub extern "C" fn pg_finfo_gapfill_delta_transition() -> &'static pg_sys::Pg_finfo_record { - const V1_API: pg_sys::Pg_finfo_record = pg_sys::Pg_finfo_record { api_version: 1 }; - &V1_API -} -#[no_mangle] -unsafe extern "C" fn gapfill_delta_transition(fcinfo: pg_sys::FunctionCallInfo) -> pg_sys::Datum { - prom_delta_transition_wrapper(fcinfo) -} - -#[no_mangle] -pub extern "C" fn pg_finfo_prom_delta_final_wrapper() -> &'static pg_sys::Pg_finfo_record { - const V1_API: pg_sys::Pg_finfo_record = pg_sys::Pg_finfo_record { api_version: 1 }; - &V1_API -} - -#[no_mangle] -unsafe extern "C" fn prom_delta_final_wrapper(fcinfo: pg_sys::FunctionCallInfo) -> pg_sys::Datum { - super::gapfill_delta::prom_extrapolate_final_wrapper(fcinfo) + fn finalize( + current: Self::State, + _: Self::OrderedSetArgs, + _: pg_sys::FunctionCallInfo, + ) -> Self::Finalize { + current.map(|mut s| s.as_vec()) + } } -// implementation of prometheus delta function -// for proper behavior the input must be ORDER BY sample_time -extension_sql!( - r#" -CREATE OR REPLACE AGGREGATE @extschema@.prom_delta( - lowest_time TIMESTAMPTZ, - greatest_time TIMESTAMPTZ, - step_size BIGINT, - range BIGINT, - sample_time TIMESTAMPTZ, - sample_value DOUBLE PRECISION) -( - sfunc=@extschema@.prom_delta_transition, - stype=internal, - finalfunc=@extschema@.prom_extrapolate_final -); -"#, - name = "create_prom_delta_aggregate", - requires = [prom_delta_transition, prom_extrapolate_final] -); - #[cfg(any(test, feature = "pg_test"))] #[pg_schema] mod tests { @@ -164,7 +102,7 @@ mod tests { Spi::get_one::>(&*prepare_query("'2000-01-02 15:00:00 UTC'", "NULL")); } - #[pg_test(error = "input time less than lowest time")] + #[pg_test(error = "input time 631292400000000 not in bounds [140400000000, 143100000000]")] fn test_prom_delta_with_input_time_less_than_lowest_time_fails() { setup(); Spi::get_one::>(&*prepare_query( diff --git a/src/aggregates/prom_increase.rs b/src/aggregates/prom_increase.rs index 3a884bbd..99a2ed49 100644 --- a/src/aggregates/prom_increase.rs +++ b/src/aggregates/prom_increase.rs @@ -1,108 +1,52 @@ -use pgx::error; -use pgx::Internal; use pgx::*; -use crate::aggregate_utils::in_aggregate_context; use crate::aggregates::{GapfillDeltaTransition, Milliseconds}; -use crate::palloc::{Inner, InternalAsValue, ToInternal}; -#[allow(clippy::too_many_arguments)] -#[pg_extern(immutable, parallel_safe)] -pub fn prom_increase_transition( - state: Internal, - lowest_time: pg_sys::TimestampTz, - greatest_time: pg_sys::TimestampTz, - step_size: Milliseconds, // `prev_now - step_size` is where the next window starts - range: Milliseconds, // the size of a window to delta over - sample_time: pg_sys::TimestampTz, - sample_value: f64, - fc: pg_sys::FunctionCallInfo, -) -> Internal { - prom_increase_transition_inner( - unsafe { state.to_inner() }, - lowest_time, - greatest_time, - step_size, - range, - sample_time, - sample_value, - fc, - ) - .internal() -} - -#[allow(clippy::too_many_arguments)] -fn prom_increase_transition_inner( - state: Option>, - lowest_time: pg_sys::TimestampTz, - greatest_time: pg_sys::TimestampTz, - step_size: Milliseconds, // `prev_now - step` is where the next window starts - range: Milliseconds, // the size of a window to delta over - sample_time: pg_sys::TimestampTz, - sample_value: f64, - fc: pg_sys::FunctionCallInfo, -) -> Option> { - unsafe { - in_aggregate_context(fc, || { - if sample_time < lowest_time || sample_time > greatest_time { - error!("input time less than lowest time") - } - - let mut state = state.unwrap_or_else(|| { - let state: Inner<_> = GapfillDeltaTransition::new( - lowest_time, - greatest_time, - range, - step_size, - true, - false, - ) - .into(); - state - }); - - state.add_data_point(sample_time, sample_value); - - Some(state) - }) +#[allow(non_camel_case_types)] +pub struct prom_increase; + +#[pg_aggregate] +impl Aggregate for prom_increase { + type State = Option; + type Args = ( + name!(lowest_time, pg_sys::TimestampTz), + name!(greatest_time, pg_sys::TimestampTz), + name!(step_size, Milliseconds), + name!(range, Milliseconds), + name!(sample_time, pg_sys::TimestampTz), + name!(sample_value, f64), + ); + type Finalize = Option>>; + + fn state( + state: Self::State, + (lowest_time, greatest_time, step_size, range, sample_time, sample_value): Self::Args, + _: pg_sys::FunctionCallInfo, + ) -> Self::State { + if sample_time < lowest_time || sample_time > greatest_time { + error!(format!( + "input time {} not in bounds [{}, {}]", + sample_time, lowest_time, greatest_time + )) + } + + let mut state = state.unwrap_or_else(|| { + GapfillDeltaTransition::new(lowest_time, greatest_time, range, step_size, true, false) + }); + + state.add_data_point(sample_time, sample_value); + Some(state) } -} - -/// Backwards compatibility -#[no_mangle] -pub extern "C" fn pg_finfo_gapfill_increase_transition() -> &'static pg_sys::Pg_finfo_record { - const V1_API: pg_sys::Pg_finfo_record = pg_sys::Pg_finfo_record { api_version: 1 }; - &V1_API -} -#[no_mangle] -unsafe extern "C" fn gapfill_increase_transition( - fcinfo: pg_sys::FunctionCallInfo, -) -> pg_sys::Datum { - prom_increase_transition_wrapper(fcinfo) + fn finalize( + current: Self::State, + _: Self::OrderedSetArgs, + _: pg_sys::FunctionCallInfo, + ) -> Self::Finalize { + current.map(|mut s| s.as_vec()) + } } -// implementation of prometheus increase function -// for proper behavior the input must be ORDER BY sample_time -extension_sql!( - r#" -CREATE OR REPLACE AGGREGATE @extschema@.prom_increase( - lowest_time TIMESTAMPTZ, - greatest_time TIMESTAMPTZ, - step_size BIGINT, - range BIGINT, - sample_time TIMESTAMPTZ, - sample_value DOUBLE PRECISION) -( - sfunc=@extschema@.prom_increase_transition, - stype=internal, - finalfunc=@extschema@.prom_extrapolate_final -); -"#, - name = "create_prom_increase_aggregate", - requires = [prom_increase_transition, prom_extrapolate_final] -); - #[cfg(any(test, feature = "pg_test"))] #[pg_schema] mod tests { diff --git a/src/aggregates/prom_rate.rs b/src/aggregates/prom_rate.rs index 99e120b5..a4c81b68 100644 --- a/src/aggregates/prom_rate.rs +++ b/src/aggregates/prom_rate.rs @@ -1,109 +1,54 @@ use pgx::error; -use pgx::Internal; use pgx::*; -use crate::aggregate_utils::in_aggregate_context; use crate::aggregates::{GapfillDeltaTransition, Milliseconds}; -use crate::palloc::{Inner, InternalAsValue, ToInternal}; -#[allow(clippy::too_many_arguments)] -#[pg_extern(immutable, parallel_safe)] -pub fn prom_rate_transition( - state: Internal, - lowest_time: pg_sys::TimestampTz, - greatest_time: pg_sys::TimestampTz, - step_size: Milliseconds, - range: Milliseconds, // the size of a window to calculate over - sample_time: pg_sys::TimestampTz, - sample_value: f64, - fc: pg_sys::FunctionCallInfo, -) -> Internal { - prom_rate_transition_inner( - unsafe { state.to_inner() }, - lowest_time, - greatest_time, - step_size, - range, - sample_time, - sample_value, - fc, - ) - .internal() -} +#[allow(non_camel_case_types)] +pub struct prom_rate; -#[allow(clippy::too_many_arguments)] -fn prom_rate_transition_inner( - state: Option>, - lowest_time: pg_sys::TimestampTz, - greatest_time: pg_sys::TimestampTz, - step_size: Milliseconds, - range: Milliseconds, // the size of a window to calculate over - sample_time: pg_sys::TimestampTz, - sample_value: f64, - fc: pg_sys::FunctionCallInfo, -) -> Option> { - unsafe { - in_aggregate_context(fc, || { - if sample_time < lowest_time || sample_time > greatest_time { - error!(format!( - "input time {} not in bounds [{}, {}]", - sample_time, lowest_time, greatest_time - )) - } +#[pg_aggregate] +impl Aggregate for prom_rate { + type State = Option; + type Args = ( + name!(lowest_time, pg_sys::TimestampTz), + name!(greatest_time, pg_sys::TimestampTz), + name!(step_size, Milliseconds), + name!(range, Milliseconds), + name!(sample_time, pg_sys::TimestampTz), + name!(sample_value, f64), + ); + type Finalize = Option>>; - let mut state = state.unwrap_or_else(|| { - let state: Inner<_> = GapfillDeltaTransition::new( - lowest_time, - greatest_time, - range, - step_size, - true, - true, - ) - .into(); - state - }); + fn state( + state: Self::State, + (lowest_time, greatest_time, step_size, range, sample_time, sample_value): Self::Args, + _: pg_sys::FunctionCallInfo, + ) -> Self::State { + if sample_time < lowest_time || sample_time > greatest_time { + error!(format!( + "input time {} not in bounds [{}, {}]", + sample_time, lowest_time, greatest_time + )) + } - state.add_data_point(sample_time, sample_value); + let mut state = state.unwrap_or_else(|| { + GapfillDeltaTransition::new(lowest_time, greatest_time, range, step_size, true, true) + }); - Some(state) - }) - } -} + state.add_data_point(sample_time, sample_value); -/// Backwards compatibility -#[no_mangle] -pub extern "C" fn pg_finfo_gapfill_rate_transition() -> &'static pg_sys::Pg_finfo_record { - const V1_API: pg_sys::Pg_finfo_record = pg_sys::Pg_finfo_record { api_version: 1 }; - &V1_API -} + Some(state) + } -#[no_mangle] -unsafe extern "C" fn gapfill_rate_transition(fcinfo: pg_sys::FunctionCallInfo) -> pg_sys::Datum { - prom_rate_transition_wrapper(fcinfo) + fn finalize( + current: Self::State, + _: Self::OrderedSetArgs, + _: pg_sys::FunctionCallInfo, + ) -> Self::Finalize { + current.map(|mut s| s.as_vec()) + } } -// implementation of prometheus rate function -// for proper behavior the input must be ORDER BY sample_time -extension_sql!( - r#" -CREATE OR REPLACE AGGREGATE @extschema@.prom_rate( - lowest_time TIMESTAMPTZ, - greatest_time TIMESTAMPTZ, - step_size BIGINT, - range BIGINT, - sample_time TIMESTAMPTZ, - sample_value DOUBLE PRECISION) -( - sfunc=@extschema@.prom_rate_transition, - stype=internal, - finalfunc=@extschema@.prom_extrapolate_final -); -"#, - name = "create_prom_rate_aggregate", - requires = [prom_rate_transition, prom_extrapolate_final] -); - #[cfg(any(test, feature = "pg_test"))] #[pg_schema] mod tests { diff --git a/src/aggregates/vector_selector.rs b/src/aggregates/vector_selector.rs index 67780862..313e112b 100644 --- a/src/aggregates/vector_selector.rs +++ b/src/aggregates/vector_selector.rs @@ -140,168 +140,17 @@ use pgx::*; use pgx::error; -use crate::aggregate_utils::in_aggregate_context; use crate::aggregates::{Milliseconds, STALE_NAN, USECS_PER_MS}; use serde::{Deserialize, Serialize}; -use crate::palloc::{Inner, InternalAsValue, ToInternal}; -use crate::raw::bytea; - -/// Note that for performance, this aggregate is parallel-izable, combinable, and does not expect -/// ordered inputs. -#[allow(clippy::too_many_arguments)] -#[pg_extern(immutable, parallel_safe)] -pub fn vector_selector_transition( - state: Internal, - start_time: pg_sys::TimestampTz, - end_time: pg_sys::TimestampTz, - bucket_width: Milliseconds, - lookback: Milliseconds, - time: pg_sys::TimestampTz, - value: f64, - fcinfo: pg_sys::FunctionCallInfo, -) -> Internal { - vector_selector_transition_inner( - unsafe { state.to_inner() }, - start_time, - end_time, - bucket_width, - lookback, - time, - value, - fcinfo, - ) - .internal() -} - -#[allow(clippy::too_many_arguments)] -fn vector_selector_transition_inner( - state: Option>, - start_time: pg_sys::TimestampTz, - end_time: pg_sys::TimestampTz, - bucket_width: Milliseconds, - lookback: Milliseconds, - time: pg_sys::TimestampTz, - value: f64, - fcinfo: pg_sys::FunctionCallInfo, -) -> Option> { - unsafe { - in_aggregate_context(fcinfo, || { - let mut state = state.unwrap_or_else(|| { - let state: Inner = - VectorSelector::new(start_time, end_time, bucket_width, lookback).into(); - state - }); - - state.insert(time, value); - - Some(state) - }) - } -} - -#[pg_extern(immutable, parallel_safe)] -pub fn vector_selector_final( - state: Internal, /* Option> */ -) -> Option>> { - let state: Option> = unsafe { state.to_inner() }; - state.map(|s| s.to_pg_array()) -} - -#[pg_extern(immutable, parallel_safe, strict)] -pub fn vector_selector_serialize(state: Internal) -> bytea { - let state: &mut VectorSelector = unsafe { - // This is safe as long as this function is defined as `strict`, in - // which case PG knows that NULL -> NULL and so it will not call this - // function with NULL values - state.get_mut().unwrap() - }; - crate::do_serialize!(state) -} - -#[pg_extern(immutable, parallel_safe, strict)] -pub fn vector_selector_deserialize(bytes: bytea, _internal: Internal) -> Internal { - let v: VectorSelector = crate::do_deserialize!(bytes, VectorSelector); - Inner::from(v).internal() -} - -#[pg_extern(immutable, parallel_safe)] -pub fn vector_selector_combine( - state1: Internal, - state2: Internal, - fcinfo: pg_sys::FunctionCallInfo, -) -> Internal { - vector_selector_combine_inner( - unsafe { state1.to_inner() }, - unsafe { state2.to_inner() }, - fcinfo, - ) - .internal() -} - -fn vector_selector_combine_inner( - state1: Option>, - state2: Option>, - fcinfo: pg_sys::FunctionCallInfo, -) -> Option> { - unsafe { - in_aggregate_context(fcinfo, || { - match (state1, state2) { - (None, None) => None, - (None, Some(state2)) => { - let s = state2.clone(); - Some(s.into()) - } - (Some(state1), None) => { - let s = state1.clone(); - Some(s.into()) - } //should I make these return themselves? - (Some(state1), Some(state2)) => { - let mut s1 = state1.clone(); // is there a way to avoid if it doesn't need it - s1.combine(&state2); - Some(s1.into()) - } - } - }) - } -} - -extension_sql!( - r#" -CREATE OR REPLACE AGGREGATE @extschema@.vector_selector( - start_time TIMESTAMPTZ, - end_time TIMESTAMPTZ, - bucket_width BIGINT, - lookback BIGINT, - sample_time TIMESTAMPTZ, - sample_value DOUBLE PRECISION) -( - sfunc = vector_selector_transition, - stype = internal, - finalfunc = vector_selector_final, - combinefunc = vector_selector_combine, - serialfunc = vector_selector_serialize, - deserialfunc = vector_selector_deserialize, - parallel = safe -); -"#, - name = "create_vector_selector_aggregate", - requires = [ - vector_selector_transition, - vector_selector_final, - vector_selector_combine, - vector_selector_serialize, - vector_selector_deserialize - ] -); - -// The internal state consists of a vector non-overlapping sample buckets. Each bucket -// has a corresponding (virtual) timestamp corresponding to the ts series -// described above. The timestamp represents the maximum value stored in the -// bucket (inclusive). The minimum value is defined by the maximum value of -// the previous bucket (exclusive). -// the value stored inside the bucket is the last sample in the bucket (sample with highest timestamp) -#[derive(Debug, Clone, Serialize, Deserialize)] +/// The internal state consists of a vector non-overlapping sample buckets. Each bucket +/// has a corresponding (virtual) timestamp corresponding to the ts series +/// described above. The timestamp represents the maximum value stored in the +/// bucket (inclusive). The minimum value is defined by the maximum value of +/// the previous bucket (exclusive). +/// the value stored inside the bucket is the last sample in the bucket (sample with highest timestamp) +#[allow(non_camel_case_types)] +#[derive(Debug, Clone, Serialize, Deserialize, PostgresType)] pub struct VectorSelector { first_bucket_max_time: pg_sys::TimestampTz, last_bucket_max_time: pg_sys::TimestampTz, @@ -333,7 +182,7 @@ impl VectorSelector { } } - fn combine(&mut self, other: &Inner) { + fn combine_with(&mut self, other: &VectorSelector) { if self.first_bucket_max_time != other.first_bucket_max_time || self.last_bucket_max_time != other.last_bucket_max_time || self.end_time != other.end_time @@ -454,6 +303,61 @@ impl VectorSelector { } } +#[pg_aggregate] +impl Aggregate for VectorSelector { + type State = Option; + type Args = ( + name!(lowest_time, pg_sys::TimestampTz), + name!(greatest_time, pg_sys::TimestampTz), + name!(bucket_width, Milliseconds), + name!(lookback, Milliseconds), + name!(sample_time, pg_sys::TimestampTz), + name!(sample_value, f64), + ); + type Finalize = Option>>; + + const PARALLEL: Option = Some(ParallelOption::Safe); + + const NAME: &'static str = "vector_selector"; + + fn state( + state: Self::State, + (start_time, end_time, bucket_width, lookback, time, value): Self::Args, + _: pg_sys::FunctionCallInfo, + ) -> Self::State { + let mut state = state + .unwrap_or_else(|| VectorSelector::new(start_time, end_time, bucket_width, lookback)); + + state.insert(time, value); + + Some(state) + } + + fn finalize( + current: Self::State, + _: Self::OrderedSetArgs, + _: pg_sys::FunctionCallInfo, + ) -> Self::Finalize { + current.map(|c| c.to_pg_array()) + } + + fn combine( + state1: Self::State, + state2: Self::State, + _: pg_sys::FunctionCallInfo, + ) -> Self::State { + match (state1, state2) { + (None, None) => None, + (None, Some(state2)) => Some(state2), + (Some(state1), None) => Some(state1), + (Some(mut state1), Some(state2)) => { + state1.combine_with(&state2); + Some(state1) + } + } + } +} + #[cfg(any(test, feature = "pg_test"))] #[pg_schema] mod tests { diff --git a/src/lib.rs b/src/lib.rs index a6a8c038..15625fda 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,12 +1,9 @@ use pgx::*; -mod aggregate_utils; mod aggregates; mod palloc; -mod raw; mod schema; mod support; -mod type_builder; mod util; pg_module_magic!(); diff --git a/src/palloc.rs b/src/palloc.rs index 3d0145ac..2d8039c3 100644 --- a/src/palloc.rs +++ b/src/palloc.rs @@ -5,14 +5,6 @@ use std::{ use pgx::*; -pub unsafe fn in_memory_context T>(mctx: pg_sys::MemoryContext, f: F) -> T { - let prev_ctx = pg_sys::CurrentMemoryContext; - pg_sys::CurrentMemoryContext = mctx; - let t = f(); - pg_sys::CurrentMemoryContext = prev_ctx; - t -} - pub use pgx::Internal; #[allow(clippy::missing_safety_doc)] diff --git a/src/raw.rs b/src/raw.rs deleted file mode 100644 index d271258a..00000000 --- a/src/raw.rs +++ /dev/null @@ -1,62 +0,0 @@ -#![allow(non_camel_case_types)] - -use pgx::*; - -// TODO: Is this the right approach to declaring `bytea` and `TimestampTz`? -extension_sql!( - "", - name = "pseudo_create_types", - creates = [Type(bytea), Type(TimestampTz),], -); - -macro_rules! raw_type { - ($name:ident, $tyid: path, $arrayid: path) => { - impl FromDatum for $name { - const NEEDS_TYPID: bool = false; - - unsafe fn from_datum( - datum: pg_sys::Datum, - is_null: bool, - _typoid: pg_sys::Oid, - ) -> Option - where - Self: Sized, - { - if is_null { - return None; - } - Some(Self(datum)) - } - } - - impl IntoDatum for $name { - fn into_datum(self) -> Option { - Some(self.0) - } - fn type_oid() -> pg_sys::Oid { - $tyid - } - fn array_type_oid() -> pg_sys::Oid { - $arrayid - } - } - - impl From for $name { - fn from(d: pg_sys::Datum) -> Self { - Self(d) - } - } - - #[allow(clippy::from_over_into)] - impl Into for $name { - fn into(self) -> pg_sys::Datum { - self.0 - } - } - }; -} - -#[derive(Clone, Copy)] -pub struct bytea(pub pg_sys::Datum); - -raw_type!(bytea, pg_sys::BYTEAOID, pg_sys::BYTEAARRAYOID); diff --git a/src/type_builder.rs b/src/type_builder.rs deleted file mode 100644 index b58f7955..00000000 --- a/src/type_builder.rs +++ /dev/null @@ -1,88 +0,0 @@ -#[repr(u8)] -pub enum SerializationType { - Default = 1, -} - -#[macro_export] -macro_rules! do_serialize { - ($state: ident) => { - { - $crate::do_serialize!($state, version: 1) - } - }; - ($state: ident, version: $version: expr) => { - { - use $crate::type_builder::SerializationType; - use std::io::{Cursor, Write}; - use std::convert::TryInto; - - let state = &*$state; - let serialized_size = bincode::serialized_size(state) - .unwrap_or_else(|e| pgx::error!("serialization error {}", e)); - let our_size = serialized_size + 2; // size of serialized data + our version flags - let allocated_size = our_size + 4; // size of our data + the varlena header - let allocated_size = allocated_size.try_into() - .unwrap_or_else(|e| pgx::error!("serialization error {}", e)); - // valena tyes have a maximum size - if allocated_size > 0x3FFFFFFF { - pgx::error!("size {} bytes is to large", allocated_size) - } - - let bytes: &mut [u8] = unsafe { - let bytes = pgx::pg_sys::palloc0(allocated_size); - std::slice::from_raw_parts_mut(bytes.cast(), allocated_size) - }; - let mut writer = Cursor::new(bytes); - // varlena header space - let varsize = [0; 4]; - writer.write_all(&varsize) - .unwrap_or_else(|e| pgx::error!("serialization error {}", e)); - // type version - writer.write_all(&[$version]) - .unwrap_or_else(|e| pgx::error!("serialization error {}", e)); - // serialization version; 1 for bincode is currently the only option - writer.write_all(&[SerializationType::Default as u8]) - .unwrap_or_else(|e| pgx::error!("serialization error {}", e)); - bincode::serialize_into(&mut writer, state) - .unwrap_or_else(|e| pgx::error!("serialization error {}", e)); - unsafe { - let len = writer.position().try_into().expect("serialized size too large"); - ::pgx::set_varsize(writer.get_mut().as_mut_ptr() as *mut _, len); - } - bytea::from(writer.into_inner().as_mut_ptr() as pg_sys::Datum) - } - }; -} -#[macro_export] -macro_rules! do_deserialize { - ($bytes: ident, $t: ty) => {{ - use $crate::type_builder::SerializationType; - - let state: $t = unsafe { - let input: bytea = $bytes; - let input: pgx::pg_sys::Datum = input.into(); - let detoasted = pg_sys::pg_detoast_datum_packed(input as *mut _); - let len = pgx::varsize_any_exhdr(detoasted); - let data = pgx::vardata_any(detoasted); - let bytes = std::slice::from_raw_parts(data as *mut u8, len); - if bytes.len() < 1 { - pgx::error!("deserialization error, no bytes") - } - if bytes[0] != 1 { - pgx::error!( - "deserialization error, invalid serialization version {}", - bytes[0] - ) - } - if bytes[1] != SerializationType::Default as u8 { - pgx::error!( - "deserialization error, invalid serialization type {}", - bytes[1] - ) - } - bincode::deserialize(&bytes[2..]) - .unwrap_or_else(|e| pgx::error!("deserialization error {}", e)) - }; - state.into() - }}; -}