Skip to content
This repository was archived by the owner on Apr 2, 2024. It is now read-only.

Commit 2c75f82

Browse files
committed
WIP: port remaining aggregates to new aggregate framework
Note: This depends on some changes to the aggregate implementation
1 parent 8b4175d commit 2c75f82

File tree

6 files changed

+197
-352
lines changed

6 files changed

+197
-352
lines changed

Cargo.lock

+6-17
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+3-3
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,6 @@ serde_json = { version = "1.0.70", optional = true }
3636
pgx-tests = "0.2.5"
3737

3838
[patch.crates-io]
39-
pgx = { git = "https://github.com/ZomboDB/pgx", branch = "aggregate" }
40-
pgx-macros = { git = "https://github.com/ZomboDB/pgx", branch = "aggregate" }
41-
pgx-tests = { git = "https://github.com/ZomboDB/pgx", branch = "aggregate" }
39+
pgx = { path = "../pgx/pgx" }
40+
pgx-macros = { path = "../pgx/pgx-macros" }
41+
pgx-tests = { path = "../pgx/pgx-tests" }

src/aggregates/prom_delta.rs

+48-102
Original file line numberDiff line numberDiff line change
@@ -7,118 +7,64 @@ use crate::aggregates::{GapfillDeltaTransition, Milliseconds};
77
use crate::palloc::{Inner, InternalAsValue, ToInternal};
88
use crate::raw::TimestampTz;
99

10+
#[allow(non_camel_case_types)]
11+
pub struct prom_delta;
12+
1013
// prom divides time into sliding windows of fixed size, e.g.
1114
// | 5 seconds | 5 seconds | 5 seconds | 5 seconds | 5 seconds |
1215
// we take the first and last values in that bucket and uses `last-first` as the
1316
// value for that bucket.
1417
// | a b c d e | f g h i | j k | m |
1518
// | e - a | i - f | k - j | <null> |
16-
#[allow(clippy::too_many_arguments)]
17-
#[pg_extern(immutable, parallel_safe)]
18-
pub fn prom_delta_transition(
19-
state: Internal,
20-
lowest_time: TimestampTz,
21-
greatest_time: TimestampTz,
22-
step_size: Milliseconds, // `prev_now - step_size` is where the next window starts
23-
range: Milliseconds, // the size of a window to delta over
24-
sample_time: TimestampTz,
25-
sample_value: f64,
26-
fc: pg_sys::FunctionCallInfo,
27-
) -> Internal {
28-
prom_delta_transition_inner(
29-
unsafe { state.to_inner() },
30-
lowest_time.into(),
31-
greatest_time.into(),
32-
step_size,
33-
range,
34-
sample_time.into(),
35-
sample_value,
36-
fc,
37-
)
38-
.internal()
39-
}
40-
41-
#[allow(clippy::too_many_arguments)]
42-
fn prom_delta_transition_inner(
43-
state: Option<Inner<GapfillDeltaTransition>>,
44-
lowest_time: pg_sys::TimestampTz,
45-
greatest_time: pg_sys::TimestampTz,
46-
step_size: Milliseconds, // `prev_now - step` is where the next window starts
47-
range: Milliseconds, // the size of a window to delta over
48-
sample_time: pg_sys::TimestampTz,
49-
sample_value: f64,
50-
fc: pg_sys::FunctionCallInfo,
51-
) -> Option<Inner<GapfillDeltaTransition>> {
52-
unsafe {
53-
in_aggregate_context(fc, || {
54-
if sample_time < lowest_time || sample_time > greatest_time {
55-
error!("input time less than lowest time")
56-
}
57-
58-
let mut state = state.unwrap_or_else(|| {
59-
let state: Inner<_> = GapfillDeltaTransition::new(
60-
lowest_time,
61-
greatest_time,
62-
range,
63-
step_size,
64-
false,
65-
false,
66-
)
67-
.into();
68-
state
69-
});
70-
71-
state.add_data_point(sample_time, sample_value);
72-
73-
Some(state)
74-
})
19+
#[pg_aggregate]
20+
impl Aggregate for prom_delta {
21+
type State = Option<GapfillDeltaTransition>;
22+
type Args = (
23+
TimestampTz,
24+
TimestampTz,
25+
Milliseconds,
26+
Milliseconds,
27+
TimestampTz,
28+
f64,
29+
);
30+
type Finalize = Option<Vec<Option<f64>>>;
31+
32+
fn state(
33+
state: Self::State,
34+
(lowest_time, greatest_time, step_size, range, sample_time, sample_value): Self::Args,
35+
) -> Self::State {
36+
let sample_time: pg_sys::TimestampTz = sample_time.into();
37+
let lowest_time: pg_sys::TimestampTz = lowest_time.into();
38+
let greatest_time: pg_sys::TimestampTz = greatest_time.into();
39+
40+
if sample_time < lowest_time || sample_time > greatest_time {
41+
error!(format!(
42+
"input time {} not in bounds [{}, {}]",
43+
sample_time, lowest_time, greatest_time
44+
))
45+
}
46+
47+
let mut state = state.unwrap_or_else(|| {
48+
GapfillDeltaTransition::new(
49+
lowest_time,
50+
greatest_time,
51+
range,
52+
step_size,
53+
false,
54+
false,
55+
)
56+
});
57+
58+
state.add_data_point(sample_time, sample_value);
59+
60+
Some(state)
7561
}
76-
}
77-
78-
/// Backwards compatibility
79-
#[no_mangle]
80-
pub extern "C" fn pg_finfo_gapfill_delta_transition() -> &'static pg_sys::Pg_finfo_record {
81-
const V1_API: pg_sys::Pg_finfo_record = pg_sys::Pg_finfo_record { api_version: 1 };
82-
&V1_API
83-
}
8462

85-
#[no_mangle]
86-
unsafe extern "C" fn gapfill_delta_transition(fcinfo: pg_sys::FunctionCallInfo) -> pg_sys::Datum {
87-
prom_delta_transition_wrapper(fcinfo)
88-
}
89-
90-
#[no_mangle]
91-
pub extern "C" fn pg_finfo_prom_delta_final_wrapper() -> &'static pg_sys::Pg_finfo_record {
92-
const V1_API: pg_sys::Pg_finfo_record = pg_sys::Pg_finfo_record { api_version: 1 };
93-
&V1_API
94-
}
95-
96-
#[no_mangle]
97-
unsafe extern "C" fn prom_delta_final_wrapper(fcinfo: pg_sys::FunctionCallInfo) -> pg_sys::Datum {
98-
super::gapfill_delta::prom_extrapolate_final_wrapper(fcinfo)
63+
fn finalize(current: Self::State) -> Self::Finalize {
64+
current.map(|mut s| s.as_vec())
65+
}
9966
}
10067

101-
// implementation of prometheus delta function
102-
// for proper behavior the input must be ORDER BY sample_time
103-
extension_sql!(
104-
r#"
105-
CREATE AGGREGATE @[email protected]_delta(
106-
lowest_time TIMESTAMPTZ,
107-
greatest_time TIMESTAMPTZ,
108-
step_size BIGINT,
109-
range BIGINT,
110-
sample_time TIMESTAMPTZ,
111-
sample_value DOUBLE PRECISION)
112-
(
113-
sfunc=@[email protected]_delta_transition,
114-
stype=internal,
115-
finalfunc=@[email protected]_extrapolate_final
116-
);
117-
"#,
118-
name = "create_prom_delta_aggregate",
119-
requires = [prom_delta_transition, prom_extrapolate_final]
120-
);
121-
12268
#[cfg(any(test, feature = "pg_test"))]
12369
#[pg_schema]
12470
mod tests {

src/aggregates/prom_increase.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,14 @@ impl Aggregate for prom_increase {
2626
let sample_time: pg_sys::TimestampTz = sample_time.into();
2727
let lowest_time: pg_sys::TimestampTz = lowest_time.into();
2828
let greatest_time: pg_sys::TimestampTz = greatest_time.into();
29+
2930
if sample_time < lowest_time || sample_time > greatest_time {
30-
error!("input time less than lowest time")
31+
error!(format!(
32+
"input time {} not in bounds [{}, {}]",
33+
sample_time, lowest_time, greatest_time
34+
))
3135
}
3236

33-
// TODO: Missing in_aggregate_context magic
3437
let mut state = state.unwrap_or_else(|| {
3538
GapfillDeltaTransition::new(lowest_time, greatest_time, range, step_size, true, false)
3639
});

0 commit comments

Comments
 (0)