This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Switch to first-class aggregate support in pgx #62

Draft · wants to merge 3 commits into base: develop
38 changes: 19 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
@@ -26,14 +26,14 @@ pg_test = ["serde_json"]
 
 [dependencies]
 bincode = "1.3.1"
-pgx = "0.2.5"
-pgx-macros = "0.2.5"
+pgx = "0.3.1"
+pgx-macros = "0.3.1"
 num_cpus = "1.13.1"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = { version = "1.0.70", optional = true }
 
 [dev-dependencies]
-pgx-tests = "0.2.5"
+pgx-tests = "0.3.1"
 
 [patch.crates-io]
 pgx = { git = "https://github.com/timescale/pgx", branch = "promscale-staging" }
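The bump from pgx 0.2.5 to 0.3.1 (still patched to the timescale/pgx promscale-staging branch) is what enables the rest of this PR: 0.3.x exposes aggregates as a trait plus a `#[pg_aggregate]` attribute, so the transition/final functions and the hand-written `CREATE AGGREGATE` no longer have to be wired up manually. A minimal sketch of the trait surface this diff relies on; the aggregate below (`example_sum`) is invented purely for illustration, and only the trait items actually exercised later in this PR are shown:

```rust
use pgx::*;

// Invented example: a trivial SUM-like aggregate over bigint, written the same
// way prom_delta is rewritten in this PR.
#[allow(non_camel_case_types)]
pub struct example_sum;

#[pg_aggregate]
impl Aggregate for example_sum {
    // Transition state; Option lets the first call start from None.
    type State = Option<i64>;
    // SQL-visible argument list, one name!() entry per aggregate argument.
    type Args = (name!(value, i64),);
    // Result type of the final function.
    type Finalize = Option<i64>;

    fn state(
        state: Self::State,
        (value,): Self::Args,
        _: pg_sys::FunctionCallInfo,
    ) -> Self::State {
        Some(state.unwrap_or(0) + value)
    }

    fn finalize(
        current: Self::State,
        _: Self::OrderedSetArgs,
        _: pg_sys::FunctionCallInfo,
    ) -> Self::Finalize {
        current
    }
}
```

pgx's schema generation then emits the corresponding CREATE FUNCTION / CREATE AGGREGATE entities, which is why the hand-written extension_sql! block for prom_delta disappears later in this diff.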
2 changes: 1 addition & 1 deletion Dockerfile
@@ -38,7 +38,7 @@ RUN \
 # Remove crt-static feature on musl target to allow building cdylibs
 ENV RUSTFLAGS="-C target-feature=-crt-static"
 RUN --mount=type=cache,uid=70,gid=70,target=/build/promscale/.cargo/registry \
-    cargo install cargo-pgx --git https://github.com/timescale/pgx --branch promscale-staging --rev 271be6a1 && \
+    cargo install cargo-pgx --git https://github.com/timescale/pgx --branch promscale-staging --rev ee52db6b && \
     cargo pgx init --${PG_VERSION_TAG} $(which pg_config)
 
 USER root
33 changes: 0 additions & 33 deletions src/aggregate_utils.rs

This file was deleted.
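src/aggregate_utils.rs exported the `in_aggregate_context` helper that the old transition function below still imports. Its body is not part of this diff, so the following is only a hypothetical reconstruction of the conventional pattern such a helper wraps (assert that we are being called as an aggregate transition function, then run the closure with the aggregate's memory context current so allocated state survives across rows); the point of this PR is that `#[pg_aggregate]` makes this manual memory-context bookkeeping unnecessary:

```rust
use pgx::*;

// Hypothetical sketch only: the deleted file's real body is not shown in this diff.
pub unsafe fn in_aggregate_context<R, F: FnOnce() -> R>(
    fcinfo: pg_sys::FunctionCallInfo,
    f: F,
) -> R {
    let mut agg_ctx: pg_sys::MemoryContext = std::ptr::null_mut();
    // AggCheckCallContext returns non-zero when fcinfo belongs to an aggregate
    // call and hands back the aggregate's own memory context.
    if pg_sys::AggCheckCallContext(fcinfo, &mut agg_ctx) == 0 {
        error!("in_aggregate_context called outside of an aggregate");
    }
    // Evaluate the closure with that context as the current allocation context.
    PgMemoryContexts::For(agg_ctx).switch_to(|_| f())
}
```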

24 changes: 0 additions & 24 deletions src/aggregates/gapfill_delta.rs
@@ -1,33 +1,9 @@
 use crate::aggregates::{Microseconds, Milliseconds, STALE_NAN, USECS_PER_MS, USECS_PER_SEC};
-use crate::palloc::{Inner, InternalAsValue};
 use pgx::*;
 use serde::{Deserialize, Serialize};
 use std::collections::VecDeque;
 
-#[pg_extern(immutable, parallel_safe)]
-pub fn prom_extrapolate_final(state: Internal) -> Option<Vec<Option<f64>>> {
-    prom_extrapolate_final_inner(unsafe { state.to_inner() })
-}
-pub fn prom_extrapolate_final_inner(
-    state: Option<Inner<GapfillDeltaTransition>>,
-) -> Option<Vec<Option<f64>>> {
-    state.map(|mut s| s.as_vec())
-}
-
-/// Backwards compatibility
-#[no_mangle]
-pub extern "C" fn pg_finfo_gapfill_delta_final() -> &'static pg_sys::Pg_finfo_record {
-    const V1_API: pg_sys::Pg_finfo_record = pg_sys::Pg_finfo_record { api_version: 1 };
-    &V1_API
-}
-
-#[no_mangle]
-unsafe extern "C" fn gapfill_delta_final(fcinfo: pg_sys::FunctionCallInfo) -> pg_sys::Datum {
-    prom_extrapolate_final_wrapper(fcinfo)
-}
-
 #[derive(Serialize, Deserialize, PostgresType, Debug)]
-#[pgx(sql = false)]
 pub struct GapfillDeltaTransition {
     window: VecDeque<(pg_sys::TimestampTz, f64)>,
     // a Datum for each index in the array, 0 by convention if the value is NULL
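A reviewer note on the state type: with the trait-based API the transition state travels as `Option<GapfillDeltaTransition>` (see `type State` in prom_delta.rs below) rather than as an `internal` datum behind the `Inner`/`InternalAsValue` wrappers, which is why that plumbing and the hand-rolled V1 finalizer shims above can go. A minimal sketch of the pattern, with an invented state type:

```rust
use pgx::*;
use serde::{Deserialize, Serialize};

// Invented for illustration: a serde-serializable PostgresType that an
// Aggregate impl could carry as `type State = Option<ExampleState>`.
#[derive(Serialize, Deserialize, PostgresType, Debug)]
pub struct ExampleState {
    count: u64,
    sum: f64,
}
```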
148 changes: 43 additions & 105 deletions src/aggregates/prom_delta.rs
@@ -2,122 +2,60 @@ use pgx::*;
 
 use pgx::error;
 
-use crate::aggregate_utils::in_aggregate_context;
 use crate::aggregates::{GapfillDeltaTransition, Milliseconds};
-use crate::palloc::{Inner, InternalAsValue, ToInternal};
 
+#[allow(non_camel_case_types)]
+pub struct prom_delta;
+
 // prom divides time into sliding windows of fixed size, e.g.
 // | 5 seconds | 5 seconds | 5 seconds | 5 seconds | 5 seconds |
 // we take the first and last values in that bucket and uses `last-first` as the
 // value for that bucket.
 // | a b c d e | f g h i | j k | m |
 // | e - a | i - f | k - j | <null> |
-#[allow(clippy::too_many_arguments)]
-#[pg_extern(immutable, parallel_safe)]
-pub fn prom_delta_transition(
-    state: Internal,
-    lowest_time: pg_sys::TimestampTz,
-    greatest_time: pg_sys::TimestampTz,
-    step_size: Milliseconds, // `prev_now - step_size` is where the next window starts
-    range: Milliseconds, // the size of a window to delta over
-    sample_time: pg_sys::TimestampTz,
-    sample_value: f64,
-    fc: pg_sys::FunctionCallInfo,
-) -> Internal {
-    prom_delta_transition_inner(
-        unsafe { state.to_inner() },
-        lowest_time,
-        greatest_time,
-        step_size,
-        range,
-        sample_time,
-        sample_value,
-        fc,
-    )
-    .internal()
-}
-
-#[allow(clippy::too_many_arguments)]
-fn prom_delta_transition_inner(
-    state: Option<Inner<GapfillDeltaTransition>>,
-    lowest_time: pg_sys::TimestampTz,
-    greatest_time: pg_sys::TimestampTz,
-    step_size: Milliseconds, // `prev_now - step` is where the next window starts
-    range: Milliseconds, // the size of a window to delta over
-    sample_time: pg_sys::TimestampTz,
-    sample_value: f64,
-    fc: pg_sys::FunctionCallInfo,
-) -> Option<Inner<GapfillDeltaTransition>> {
-    unsafe {
-        in_aggregate_context(fc, || {
-            if sample_time < lowest_time || sample_time > greatest_time {
-                error!("input time less than lowest time")
-            }
-
-            let mut state = state.unwrap_or_else(|| {
-                let state: Inner<_> = GapfillDeltaTransition::new(
-                    lowest_time,
-                    greatest_time,
-                    range,
-                    step_size,
-                    false,
-                    false,
-                )
-                .into();
-                state
-            });
-
-            state.add_data_point(sample_time, sample_value);
-
-            Some(state)
-        })
+#[pg_aggregate]
+impl Aggregate for prom_delta {
+    type State = Option<GapfillDeltaTransition>;
+    type Args = (
+        name!(lowest_time, pg_sys::TimestampTz),
+        name!(greatest_time, pg_sys::TimestampTz),
+        name!(step_size, Milliseconds),
+        name!(range, Milliseconds),
+        name!(sample_time, pg_sys::TimestampTz),
+        name!(sample_value, f64),
+    );
+    type Finalize = Option<Vec<Option<f64>>>;
+
+    fn state(
+        state: Self::State,
+        (lowest_time, greatest_time, step_size, range, sample_time, sample_value): Self::Args,
+        _: pg_sys::FunctionCallInfo,
+    ) -> Self::State {
+        if sample_time < lowest_time || sample_time > greatest_time {
+            error!(format!(
+                "input time {} not in bounds [{}, {}]",
+                sample_time, lowest_time, greatest_time
+            ))
+        }
+
+        let mut state = state.unwrap_or_else(|| {
+            GapfillDeltaTransition::new(lowest_time, greatest_time, range, step_size, false, false)
+        });
+
+        state.add_data_point(sample_time, sample_value);
+
+        Some(state)
     }
-}
 
-/// Backwards compatibility
-#[no_mangle]
-pub extern "C" fn pg_finfo_gapfill_delta_transition() -> &'static pg_sys::Pg_finfo_record {
-    const V1_API: pg_sys::Pg_finfo_record = pg_sys::Pg_finfo_record { api_version: 1 };
-    &V1_API
-}
-
-#[no_mangle]
-unsafe extern "C" fn gapfill_delta_transition(fcinfo: pg_sys::FunctionCallInfo) -> pg_sys::Datum {
-    prom_delta_transition_wrapper(fcinfo)
-}
-
-#[no_mangle]
-pub extern "C" fn pg_finfo_prom_delta_final_wrapper() -> &'static pg_sys::Pg_finfo_record {
-    const V1_API: pg_sys::Pg_finfo_record = pg_sys::Pg_finfo_record { api_version: 1 };
-    &V1_API
-}
-
-#[no_mangle]
-unsafe extern "C" fn prom_delta_final_wrapper(fcinfo: pg_sys::FunctionCallInfo) -> pg_sys::Datum {
-    super::gapfill_delta::prom_extrapolate_final_wrapper(fcinfo)
+    fn finalize(
+        current: Self::State,
+        _: Self::OrderedSetArgs,
+        _: pg_sys::FunctionCallInfo,
+    ) -> Self::Finalize {
+        current.map(|mut s| s.as_vec())
+    }
 }
 
-// implementation of prometheus delta function
-// for proper behavior the input must be ORDER BY sample_time
-extension_sql!(
-    r#"
-CREATE OR REPLACE AGGREGATE @[email protected]_delta(
-    lowest_time TIMESTAMPTZ,
-    greatest_time TIMESTAMPTZ,
-    step_size BIGINT,
-    range BIGINT,
-    sample_time TIMESTAMPTZ,
-    sample_value DOUBLE PRECISION)
-(
-    sfunc=@[email protected]_delta_transition,
-    stype=internal,
-    finalfunc=@[email protected]_extrapolate_final
-);
-"#,
-    name = "create_prom_delta_aggregate",
-    requires = [prom_delta_transition, prom_extrapolate_final]
-);
-
 #[cfg(any(test, feature = "pg_test"))]
 #[pg_schema]
 mod tests {
@@ -164,7 +102,7 @@ mod tests {
         Spi::get_one::<Vec<f64>>(&*prepare_query("'2000-01-02 15:00:00 UTC'", "NULL"));
     }
 
-    #[pg_test(error = "input time less than lowest time")]
+    #[pg_test(error = "input time 631292400000000 not in bounds [140400000000, 143100000000]")]
     fn test_prom_delta_with_input_time_less_than_lowest_time_fails() {
         setup();
         Spi::get_one::<Vec<f64>>(&*prepare_query(
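Since the SQL-level signature of prom_delta is unchanged by this PR (same name, same six arguments, array-of-doubles result), existing callers and the tests above keep working; only the error text changes. For reviewers unfamiliar with the aggregate, a usage sketch in the style of the tests, where the table name and sample data are invented and the extension's schema is assumed to be on the search_path; per the comment in prom_delta.rs, results are only meaningful when the input is fed in sample_time order:

```rust
// Illustrative only: assumes a hypothetical table samples(t timestamptz, v double precision)
// populated with data between 15:00 and 15:45 UTC on 2000-01-02.
let deltas = Spi::get_one::<Vec<f64>>(
    "SELECT prom_delta(
         '2000-01-02 15:00:00 UTC'::timestamptz,  -- lowest_time
         '2000-01-02 15:45:00 UTC'::timestamptz,  -- greatest_time
         5 * 60 * 1000,                           -- step_size, in milliseconds
         5 * 60 * 1000,                           -- range, in milliseconds
         t, v ORDER BY t)                         -- one last-minus-first delta per window
     FROM samples",
);
```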