Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit 73025eb

Browse files
committed
Switch to first-class aggregate support in pgx
Note this is a WIP, and removes backwards-compatibility functions.
1 parent f32671f commit 73025eb

11 files changed

+183
-683
lines changed

Cargo.lock

+5-5
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/aggregate_utils.rs

-33
This file was deleted.

src/aggregates/gapfill_delta.rs

-23
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,8 @@
11
use crate::aggregates::{Microseconds, Milliseconds, STALE_NAN, USECS_PER_MS, USECS_PER_SEC};
2-
use crate::palloc::{Inner, InternalAsValue};
32
use pgx::*;
43
use serde::{Deserialize, Serialize};
54
use std::collections::VecDeque;
65

7-
#[pg_extern(immutable, parallel_safe)]
8-
pub fn prom_extrapolate_final(state: Internal) -> Option<Vec<Option<f64>>> {
9-
prom_extrapolate_final_inner(unsafe { state.to_inner() })
10-
}
11-
pub fn prom_extrapolate_final_inner(
12-
state: Option<Inner<GapfillDeltaTransition>>,
13-
) -> Option<Vec<Option<f64>>> {
14-
state.map(|mut s| s.as_vec())
15-
}
16-
17-
/// Backwards compatibility
18-
#[no_mangle]
19-
pub extern "C" fn pg_finfo_gapfill_delta_final() -> &'static pg_sys::Pg_finfo_record {
20-
const V1_API: pg_sys::Pg_finfo_record = pg_sys::Pg_finfo_record { api_version: 1 };
21-
&V1_API
22-
}
23-
24-
#[no_mangle]
25-
unsafe extern "C" fn gapfill_delta_final(fcinfo: pg_sys::FunctionCallInfo) -> pg_sys::Datum {
26-
prom_extrapolate_final_wrapper(fcinfo)
27-
}
28-
296
#[derive(Serialize, Deserialize, PostgresType, Debug)]
307
pub struct GapfillDeltaTransition {
318
window: VecDeque<(pg_sys::TimestampTz, f64)>,

src/aggregates/prom_delta.rs

+39-105
Original file line numberDiff line numberDiff line change
@@ -2,122 +2,56 @@ use pgx::*;
22

33
use pgx::error;
44

5-
use crate::aggregate_utils::in_aggregate_context;
65
use crate::aggregates::{GapfillDeltaTransition, Milliseconds};
7-
use crate::palloc::{Inner, InternalAsValue, ToInternal};
6+
7+
#[allow(non_camel_case_types)]
8+
pub struct prom_delta;
89

910
// prom divides time into sliding windows of fixed size, e.g.
1011
// | 5 seconds | 5 seconds | 5 seconds | 5 seconds | 5 seconds |
1112
// we take the first and last values in that bucket and uses `last-first` as the
1213
// value for that bucket.
1314
// | a b c d e | f g h i | j k | m |
1415
// | e - a | i - f | k - j | <null> |
15-
#[allow(clippy::too_many_arguments)]
16-
#[pg_extern(immutable, parallel_safe)]
17-
pub fn prom_delta_transition(
18-
state: Internal,
19-
lowest_time: pg_sys::TimestampTz,
20-
greatest_time: pg_sys::TimestampTz,
21-
step_size: Milliseconds, // `prev_now - step_size` is where the next window starts
22-
range: Milliseconds, // the size of a window to delta over
23-
sample_time: pg_sys::TimestampTz,
24-
sample_value: f64,
25-
fc: pg_sys::FunctionCallInfo,
26-
) -> Internal {
27-
prom_delta_transition_inner(
28-
unsafe { state.to_inner() },
29-
lowest_time,
30-
greatest_time,
31-
step_size,
32-
range,
33-
sample_time,
34-
sample_value,
35-
fc,
36-
)
37-
.internal()
38-
}
39-
40-
#[allow(clippy::too_many_arguments)]
41-
fn prom_delta_transition_inner(
42-
state: Option<Inner<GapfillDeltaTransition>>,
43-
lowest_time: pg_sys::TimestampTz,
44-
greatest_time: pg_sys::TimestampTz,
45-
step_size: Milliseconds, // `prev_now - step` is where the next window starts
46-
range: Milliseconds, // the size of a window to delta over
47-
sample_time: pg_sys::TimestampTz,
48-
sample_value: f64,
49-
fc: pg_sys::FunctionCallInfo,
50-
) -> Option<Inner<GapfillDeltaTransition>> {
51-
unsafe {
52-
in_aggregate_context(fc, || {
53-
if sample_time < lowest_time || sample_time > greatest_time {
54-
error!("input time less than lowest time")
55-
}
56-
57-
let mut state = state.unwrap_or_else(|| {
58-
let state: Inner<_> = GapfillDeltaTransition::new(
59-
lowest_time,
60-
greatest_time,
61-
range,
62-
step_size,
63-
false,
64-
false,
65-
)
66-
.into();
67-
state
68-
});
69-
70-
state.add_data_point(sample_time, sample_value);
71-
72-
Some(state)
73-
})
16+
#[pg_aggregate]
17+
impl Aggregate for prom_delta {
18+
type State = Option<GapfillDeltaTransition>;
19+
type Args = (
20+
name!(lowest_time, pg_sys::TimestampTz),
21+
name!(greatest_time, pg_sys::TimestampTz),
22+
name!(step_size, Milliseconds),
23+
name!(range, Milliseconds),
24+
name!(sample_time, pg_sys::TimestampTz),
25+
name!(sample_value, f64),
26+
);
27+
type Finalize = Option<Vec<Option<f64>>>;
28+
29+
fn state(
30+
state: Self::State,
31+
(lowest_time, greatest_time, step_size, range, sample_time, sample_value): Self::Args,
32+
_: pg_sys::FunctionCallInfo,
33+
) -> Self::State {
34+
if sample_time < lowest_time || sample_time > greatest_time {
35+
error!(format!(
36+
"input time {} not in bounds [{}, {}]",
37+
sample_time, lowest_time, greatest_time
38+
))
39+
}
40+
41+
let mut state = state.unwrap_or_else(|| {
42+
GapfillDeltaTransition::new(lowest_time, greatest_time, range, step_size, false, false)
43+
});
44+
45+
state.add_data_point(sample_time, sample_value);
46+
47+
Some(state)
7448
}
75-
}
76-
77-
/// Backwards compatibility
78-
#[no_mangle]
79-
pub extern "C" fn pg_finfo_gapfill_delta_transition() -> &'static pg_sys::Pg_finfo_record {
80-
const V1_API: pg_sys::Pg_finfo_record = pg_sys::Pg_finfo_record { api_version: 1 };
81-
&V1_API
82-
}
8349

84-
#[no_mangle]
85-
unsafe extern "C" fn gapfill_delta_transition(fcinfo: pg_sys::FunctionCallInfo) -> pg_sys::Datum {
86-
prom_delta_transition_wrapper(fcinfo)
87-
}
88-
89-
#[no_mangle]
90-
pub extern "C" fn pg_finfo_prom_delta_final_wrapper() -> &'static pg_sys::Pg_finfo_record {
91-
const V1_API: pg_sys::Pg_finfo_record = pg_sys::Pg_finfo_record { api_version: 1 };
92-
&V1_API
93-
}
94-
95-
#[no_mangle]
96-
unsafe extern "C" fn prom_delta_final_wrapper(fcinfo: pg_sys::FunctionCallInfo) -> pg_sys::Datum {
97-
super::gapfill_delta::prom_extrapolate_final_wrapper(fcinfo)
50+
fn finalize(current: Self::State, _: pg_sys::FunctionCallInfo) -> Self::Finalize {
51+
current.map(|mut s| s.as_vec())
52+
}
9853
}
9954

100-
// implementation of prometheus delta function
101-
// for proper behavior the input must be ORDER BY sample_time
102-
extension_sql!(
103-
r#"
104-
CREATE AGGREGATE @[email protected]_delta(
105-
lowest_time TIMESTAMPTZ,
106-
greatest_time TIMESTAMPTZ,
107-
step_size BIGINT,
108-
range BIGINT,
109-
sample_time TIMESTAMPTZ,
110-
sample_value DOUBLE PRECISION)
111-
(
112-
sfunc=@[email protected]_delta_transition,
113-
stype=internal,
114-
finalfunc=@[email protected]_extrapolate_final
115-
);
116-
"#,
117-
name = "create_prom_delta_aggregate",
118-
requires = [prom_delta_transition, prom_extrapolate_final]
119-
);
120-
12155
#[cfg(any(test, feature = "pg_test"))]
12256
#[pg_schema]
12357
mod tests {
@@ -164,7 +98,7 @@ mod tests {
16498
Spi::get_one::<Vec<f64>>(&*prepare_query("'2000-01-02 15:00:00 UTC'", "NULL"));
16599
}
166100

167-
#[pg_test(error = "input time less than lowest time")]
101+
#[pg_test(error = "input time 631292400000000 not in bounds [140400000000, 143100000000]")]
168102
fn test_prom_delta_with_input_time_less_than_lowest_time_fails() {
169103
setup();
170104
Spi::get_one::<Vec<f64>>(&*prepare_query(

0 commit comments

Comments
 (0)