Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit 153a27e

Browse files
committed
Switch to first-class aggregate support in pgx
- Pgx automatically handles in_aggregate_context, so we can remove some (now) unused code. - Because vector_selector's state type is no longer INTERNAL, we don't need to provide (de)serialization functions. - We have removed backwards-compatibility wrappers for aggregates.
1 parent aabfee4 commit 153a27e

10 files changed

+186
-672
lines changed

src/aggregate_utils.rs

-33
This file was deleted.

src/aggregates/gapfill_delta.rs

-23
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,8 @@
11
use crate::aggregates::{Microseconds, Milliseconds, STALE_NAN, USECS_PER_MS, USECS_PER_SEC};
2-
use crate::palloc::{Inner, InternalAsValue};
32
use pgx::*;
43
use serde::{Deserialize, Serialize};
54
use std::collections::VecDeque;
65

7-
#[pg_extern(immutable, parallel_safe)]
8-
pub fn prom_extrapolate_final(state: Internal) -> Option<Vec<Option<f64>>> {
9-
prom_extrapolate_final_inner(unsafe { state.to_inner() })
10-
}
11-
pub fn prom_extrapolate_final_inner(
12-
state: Option<Inner<GapfillDeltaTransition>>,
13-
) -> Option<Vec<Option<f64>>> {
14-
state.map(|mut s| s.as_vec())
15-
}
16-
17-
/// Backwards compatibility
18-
#[no_mangle]
19-
pub extern "C" fn pg_finfo_gapfill_delta_final() -> &'static pg_sys::Pg_finfo_record {
20-
const V1_API: pg_sys::Pg_finfo_record = pg_sys::Pg_finfo_record { api_version: 1 };
21-
&V1_API
22-
}
23-
24-
#[no_mangle]
25-
unsafe extern "C" fn gapfill_delta_final(fcinfo: pg_sys::FunctionCallInfo) -> pg_sys::Datum {
26-
prom_extrapolate_final_wrapper(fcinfo)
27-
}
28-
296
#[derive(Serialize, Deserialize, PostgresType, Debug)]
307
pub struct GapfillDeltaTransition {
318
window: VecDeque<(pg_sys::TimestampTz, f64)>,

src/aggregates/prom_delta.rs

+43-105
Original file line numberDiff line numberDiff line change
@@ -2,122 +2,60 @@ use pgx::*;
22

33
use pgx::error;
44

5-
use crate::aggregate_utils::in_aggregate_context;
65
use crate::aggregates::{GapfillDeltaTransition, Milliseconds};
7-
use crate::palloc::{Inner, InternalAsValue, ToInternal};
6+
7+
#[allow(non_camel_case_types)]
8+
pub struct prom_delta;
89

910
// prom divides time into sliding windows of fixed size, e.g.
1011
// | 5 seconds | 5 seconds | 5 seconds | 5 seconds | 5 seconds |
1112
// we take the first and last values in that bucket and uses `last-first` as the
1213
// value for that bucket.
1314
// | a b c d e | f g h i | j k | m |
1415
// | e - a | i - f | k - j | <null> |
15-
#[allow(clippy::too_many_arguments)]
16-
#[pg_extern(immutable, parallel_safe)]
17-
pub fn prom_delta_transition(
18-
state: Internal,
19-
lowest_time: pg_sys::TimestampTz,
20-
greatest_time: pg_sys::TimestampTz,
21-
step_size: Milliseconds, // `prev_now - step_size` is where the next window starts
22-
range: Milliseconds, // the size of a window to delta over
23-
sample_time: pg_sys::TimestampTz,
24-
sample_value: f64,
25-
fc: pg_sys::FunctionCallInfo,
26-
) -> Internal {
27-
prom_delta_transition_inner(
28-
unsafe { state.to_inner() },
29-
lowest_time,
30-
greatest_time,
31-
step_size,
32-
range,
33-
sample_time,
34-
sample_value,
35-
fc,
36-
)
37-
.internal()
38-
}
39-
40-
#[allow(clippy::too_many_arguments)]
41-
fn prom_delta_transition_inner(
42-
state: Option<Inner<GapfillDeltaTransition>>,
43-
lowest_time: pg_sys::TimestampTz,
44-
greatest_time: pg_sys::TimestampTz,
45-
step_size: Milliseconds, // `prev_now - step` is where the next window starts
46-
range: Milliseconds, // the size of a window to delta over
47-
sample_time: pg_sys::TimestampTz,
48-
sample_value: f64,
49-
fc: pg_sys::FunctionCallInfo,
50-
) -> Option<Inner<GapfillDeltaTransition>> {
51-
unsafe {
52-
in_aggregate_context(fc, || {
53-
if sample_time < lowest_time || sample_time > greatest_time {
54-
error!("input time less than lowest time")
55-
}
56-
57-
let mut state = state.unwrap_or_else(|| {
58-
let state: Inner<_> = GapfillDeltaTransition::new(
59-
lowest_time,
60-
greatest_time,
61-
range,
62-
step_size,
63-
false,
64-
false,
65-
)
66-
.into();
67-
state
68-
});
69-
70-
state.add_data_point(sample_time, sample_value);
71-
72-
Some(state)
73-
})
16+
#[pg_aggregate]
17+
impl Aggregate for prom_delta {
18+
type State = Option<GapfillDeltaTransition>;
19+
type Args = (
20+
name!(lowest_time, pg_sys::TimestampTz),
21+
name!(greatest_time, pg_sys::TimestampTz),
22+
name!(step_size, Milliseconds),
23+
name!(range, Milliseconds),
24+
name!(sample_time, pg_sys::TimestampTz),
25+
name!(sample_value, f64),
26+
);
27+
type Finalize = Option<Vec<Option<f64>>>;
28+
29+
fn state(
30+
state: Self::State,
31+
(lowest_time, greatest_time, step_size, range, sample_time, sample_value): Self::Args,
32+
_: pg_sys::FunctionCallInfo,
33+
) -> Self::State {
34+
if sample_time < lowest_time || sample_time > greatest_time {
35+
error!(format!(
36+
"input time {} not in bounds [{}, {}]",
37+
sample_time, lowest_time, greatest_time
38+
))
39+
}
40+
41+
let mut state = state.unwrap_or_else(|| {
42+
GapfillDeltaTransition::new(lowest_time, greatest_time, range, step_size, false, false)
43+
});
44+
45+
state.add_data_point(sample_time, sample_value);
46+
47+
Some(state)
7448
}
75-
}
76-
77-
/// Backwards compatibility
78-
#[no_mangle]
79-
pub extern "C" fn pg_finfo_gapfill_delta_transition() -> &'static pg_sys::Pg_finfo_record {
80-
const V1_API: pg_sys::Pg_finfo_record = pg_sys::Pg_finfo_record { api_version: 1 };
81-
&V1_API
82-
}
8349

84-
#[no_mangle]
85-
unsafe extern "C" fn gapfill_delta_transition(fcinfo: pg_sys::FunctionCallInfo) -> pg_sys::Datum {
86-
prom_delta_transition_wrapper(fcinfo)
87-
}
88-
89-
#[no_mangle]
90-
pub extern "C" fn pg_finfo_prom_delta_final_wrapper() -> &'static pg_sys::Pg_finfo_record {
91-
const V1_API: pg_sys::Pg_finfo_record = pg_sys::Pg_finfo_record { api_version: 1 };
92-
&V1_API
93-
}
94-
95-
#[no_mangle]
96-
unsafe extern "C" fn prom_delta_final_wrapper(fcinfo: pg_sys::FunctionCallInfo) -> pg_sys::Datum {
97-
super::gapfill_delta::prom_extrapolate_final_wrapper(fcinfo)
50+
fn finalize(
51+
current: Self::State,
52+
_: Self::OrderedSetArgs,
53+
_: pg_sys::FunctionCallInfo,
54+
) -> Self::Finalize {
55+
current.map(|mut s| s.as_vec())
56+
}
9857
}
9958

100-
// implementation of prometheus delta function
101-
// for proper behavior the input must be ORDER BY sample_time
102-
extension_sql!(
103-
r#"
104-
CREATE OR REPLACE AGGREGATE @[email protected]_delta(
105-
lowest_time TIMESTAMPTZ,
106-
greatest_time TIMESTAMPTZ,
107-
step_size BIGINT,
108-
range BIGINT,
109-
sample_time TIMESTAMPTZ,
110-
sample_value DOUBLE PRECISION)
111-
(
112-
sfunc=@[email protected]_delta_transition,
113-
stype=internal,
114-
finalfunc=@[email protected]_extrapolate_final
115-
);
116-
"#,
117-
name = "create_prom_delta_aggregate",
118-
requires = [prom_delta_transition, prom_extrapolate_final]
119-
);
120-
12159
#[cfg(any(test, feature = "pg_test"))]
12260
#[pg_schema]
12361
mod tests {
@@ -164,7 +102,7 @@ mod tests {
164102
Spi::get_one::<Vec<f64>>(&*prepare_query("'2000-01-02 15:00:00 UTC'", "NULL"));
165103
}
166104

167-
#[pg_test(error = "input time less than lowest time")]
105+
#[pg_test(error = "input time 631292400000000 not in bounds [140400000000, 143100000000]")]
168106
fn test_prom_delta_with_input_time_less_than_lowest_time_fails() {
169107
setup();
170108
Spi::get_one::<Vec<f64>>(&*prepare_query(

src/aggregates/prom_increase.rs

+41-97
Original file line numberDiff line numberDiff line change
@@ -1,108 +1,52 @@
1-
use pgx::error;
2-
use pgx::Internal;
31
use pgx::*;
42

5-
use crate::aggregate_utils::in_aggregate_context;
63
use crate::aggregates::{GapfillDeltaTransition, Milliseconds};
7-
use crate::palloc::{Inner, InternalAsValue, ToInternal};
84

9-
#[allow(clippy::too_many_arguments)]
10-
#[pg_extern(immutable, parallel_safe)]
11-
pub fn prom_increase_transition(
12-
state: Internal,
13-
lowest_time: pg_sys::TimestampTz,
14-
greatest_time: pg_sys::TimestampTz,
15-
step_size: Milliseconds, // `prev_now - step_size` is where the next window starts
16-
range: Milliseconds, // the size of a window to delta over
17-
sample_time: pg_sys::TimestampTz,
18-
sample_value: f64,
19-
fc: pg_sys::FunctionCallInfo,
20-
) -> Internal {
21-
prom_increase_transition_inner(
22-
unsafe { state.to_inner() },
23-
lowest_time,
24-
greatest_time,
25-
step_size,
26-
range,
27-
sample_time,
28-
sample_value,
29-
fc,
30-
)
31-
.internal()
32-
}
33-
34-
#[allow(clippy::too_many_arguments)]
35-
fn prom_increase_transition_inner(
36-
state: Option<Inner<GapfillDeltaTransition>>,
37-
lowest_time: pg_sys::TimestampTz,
38-
greatest_time: pg_sys::TimestampTz,
39-
step_size: Milliseconds, // `prev_now - step` is where the next window starts
40-
range: Milliseconds, // the size of a window to delta over
41-
sample_time: pg_sys::TimestampTz,
42-
sample_value: f64,
43-
fc: pg_sys::FunctionCallInfo,
44-
) -> Option<Inner<GapfillDeltaTransition>> {
45-
unsafe {
46-
in_aggregate_context(fc, || {
47-
if sample_time < lowest_time || sample_time > greatest_time {
48-
error!("input time less than lowest time")
49-
}
50-
51-
let mut state = state.unwrap_or_else(|| {
52-
let state: Inner<_> = GapfillDeltaTransition::new(
53-
lowest_time,
54-
greatest_time,
55-
range,
56-
step_size,
57-
true,
58-
false,
59-
)
60-
.into();
61-
state
62-
});
63-
64-
state.add_data_point(sample_time, sample_value);
65-
66-
Some(state)
67-
})
5+
#[allow(non_camel_case_types)]
6+
pub struct prom_increase;
7+
8+
#[pg_aggregate]
9+
impl Aggregate for prom_increase {
10+
type State = Option<GapfillDeltaTransition>;
11+
type Args = (
12+
name!(lowest_time, pg_sys::TimestampTz),
13+
name!(greatest_time, pg_sys::TimestampTz),
14+
name!(step_size, Milliseconds),
15+
name!(range, Milliseconds),
16+
name!(sample_time, pg_sys::TimestampTz),
17+
name!(sample_value, f64),
18+
);
19+
type Finalize = Option<Vec<Option<f64>>>;
20+
21+
fn state(
22+
state: Self::State,
23+
(lowest_time, greatest_time, step_size, range, sample_time, sample_value): Self::Args,
24+
_: pg_sys::FunctionCallInfo,
25+
) -> Self::State {
26+
if sample_time < lowest_time || sample_time > greatest_time {
27+
error!(format!(
28+
"input time {} not in bounds [{}, {}]",
29+
sample_time, lowest_time, greatest_time
30+
))
31+
}
32+
33+
let mut state = state.unwrap_or_else(|| {
34+
GapfillDeltaTransition::new(lowest_time, greatest_time, range, step_size, true, false)
35+
});
36+
37+
state.add_data_point(sample_time, sample_value);
38+
Some(state)
6839
}
69-
}
70-
71-
/// Backwards compatibility
72-
#[no_mangle]
73-
pub extern "C" fn pg_finfo_gapfill_increase_transition() -> &'static pg_sys::Pg_finfo_record {
74-
const V1_API: pg_sys::Pg_finfo_record = pg_sys::Pg_finfo_record { api_version: 1 };
75-
&V1_API
76-
}
7740

78-
#[no_mangle]
79-
unsafe extern "C" fn gapfill_increase_transition(
80-
fcinfo: pg_sys::FunctionCallInfo,
81-
) -> pg_sys::Datum {
82-
prom_increase_transition_wrapper(fcinfo)
41+
fn finalize(
42+
current: Self::State,
43+
_: Self::OrderedSetArgs,
44+
_: pg_sys::FunctionCallInfo,
45+
) -> Self::Finalize {
46+
current.map(|mut s| s.as_vec())
47+
}
8348
}
8449

85-
// implementation of prometheus increase function
86-
// for proper behavior the input must be ORDER BY sample_time
87-
extension_sql!(
88-
r#"
89-
CREATE OR REPLACE AGGREGATE @[email protected]_increase(
90-
lowest_time TIMESTAMPTZ,
91-
greatest_time TIMESTAMPTZ,
92-
step_size BIGINT,
93-
range BIGINT,
94-
sample_time TIMESTAMPTZ,
95-
sample_value DOUBLE PRECISION)
96-
(
97-
sfunc=@[email protected]_increase_transition,
98-
stype=internal,
99-
finalfunc=@[email protected]_extrapolate_final
100-
);
101-
"#,
102-
name = "create_prom_increase_aggregate",
103-
requires = [prom_increase_transition, prom_extrapolate_final]
104-
);
105-
10650
#[cfg(any(test, feature = "pg_test"))]
10751
#[pg_schema]
10852
mod tests {

0 commit comments

Comments
 (0)