@@ -2,122 +2,56 @@ use pgx::*;
2
2
3
3
use pgx:: error;
4
4
5
- use crate :: aggregate_utils:: in_aggregate_context;
6
5
use crate :: aggregates:: { GapfillDeltaTransition , Milliseconds } ;
7
- use crate :: palloc:: { Inner , InternalAsValue , ToInternal } ;
6
+
7
+ #[ allow( non_camel_case_types) ]
8
+ pub struct prom_delta ;
8
9
9
10
// prom divides time into sliding windows of fixed size, e.g.
10
11
// | 5 seconds | 5 seconds | 5 seconds | 5 seconds | 5 seconds |
11
12
// we take the first and last values in that bucket and uses `last-first` as the
12
13
// value for that bucket.
13
14
// | a b c d e | f g h i | j k | m |
14
15
// | e - a | i - f | k - j | <null> |
15
- #[ allow( clippy:: too_many_arguments) ]
16
- #[ pg_extern( immutable, parallel_safe) ]
17
- pub fn prom_delta_transition (
18
- state : Internal ,
19
- lowest_time : pg_sys:: TimestampTz ,
20
- greatest_time : pg_sys:: TimestampTz ,
21
- step_size : Milliseconds , // `prev_now - step_size` is where the next window starts
22
- range : Milliseconds , // the size of a window to delta over
23
- sample_time : pg_sys:: TimestampTz ,
24
- sample_value : f64 ,
25
- fc : pg_sys:: FunctionCallInfo ,
26
- ) -> Internal {
27
- prom_delta_transition_inner (
28
- unsafe { state. to_inner ( ) } ,
29
- lowest_time,
30
- greatest_time,
31
- step_size,
32
- range,
33
- sample_time,
34
- sample_value,
35
- fc,
36
- )
37
- . internal ( )
38
- }
39
-
40
- #[ allow( clippy:: too_many_arguments) ]
41
- fn prom_delta_transition_inner (
42
- state : Option < Inner < GapfillDeltaTransition > > ,
43
- lowest_time : pg_sys:: TimestampTz ,
44
- greatest_time : pg_sys:: TimestampTz ,
45
- step_size : Milliseconds , // `prev_now - step` is where the next window starts
46
- range : Milliseconds , // the size of a window to delta over
47
- sample_time : pg_sys:: TimestampTz ,
48
- sample_value : f64 ,
49
- fc : pg_sys:: FunctionCallInfo ,
50
- ) -> Option < Inner < GapfillDeltaTransition > > {
51
- unsafe {
52
- in_aggregate_context ( fc, || {
53
- if sample_time < lowest_time || sample_time > greatest_time {
54
- error ! ( "input time less than lowest time" )
55
- }
56
-
57
- let mut state = state. unwrap_or_else ( || {
58
- let state: Inner < _ > = GapfillDeltaTransition :: new (
59
- lowest_time,
60
- greatest_time,
61
- range,
62
- step_size,
63
- false ,
64
- false ,
65
- )
66
- . into ( ) ;
67
- state
68
- } ) ;
69
-
70
- state. add_data_point ( sample_time, sample_value) ;
71
-
72
- Some ( state)
73
- } )
16
+ #[ pg_aggregate]
17
+ impl Aggregate for prom_delta {
18
+ type State = Option < GapfillDeltaTransition > ;
19
+ type Args = (
20
+ name ! ( lowest_time, pg_sys:: TimestampTz ) ,
21
+ name ! ( greatest_time, pg_sys:: TimestampTz ) ,
22
+ name ! ( step_size, Milliseconds ) ,
23
+ name ! ( range, Milliseconds ) ,
24
+ name ! ( sample_time, pg_sys:: TimestampTz ) ,
25
+ name ! ( sample_value, f64 ) ,
26
+ ) ;
27
+ type Finalize = Option < Vec < Option < f64 > > > ;
28
+
29
+ fn state (
30
+ state : Self :: State ,
31
+ ( lowest_time, greatest_time, step_size, range, sample_time, sample_value) : Self :: Args ,
32
+ _: pg_sys:: FunctionCallInfo ,
33
+ ) -> Self :: State {
34
+ if sample_time < lowest_time || sample_time > greatest_time {
35
+ error ! ( format!(
36
+ "input time {} not in bounds [{}, {}]" ,
37
+ sample_time, lowest_time, greatest_time
38
+ ) )
39
+ }
40
+
41
+ let mut state = state. unwrap_or_else ( || {
42
+ GapfillDeltaTransition :: new ( lowest_time, greatest_time, range, step_size, false , false )
43
+ } ) ;
44
+
45
+ state. add_data_point ( sample_time, sample_value) ;
46
+
47
+ Some ( state)
74
48
}
75
- }
76
-
77
- /// Backwards compatibility
78
- #[ no_mangle]
79
- pub extern "C" fn pg_finfo_gapfill_delta_transition ( ) -> & ' static pg_sys:: Pg_finfo_record {
80
- const V1_API : pg_sys:: Pg_finfo_record = pg_sys:: Pg_finfo_record { api_version : 1 } ;
81
- & V1_API
82
- }
83
49
84
- #[ no_mangle]
85
- unsafe extern "C" fn gapfill_delta_transition ( fcinfo : pg_sys:: FunctionCallInfo ) -> pg_sys:: Datum {
86
- prom_delta_transition_wrapper ( fcinfo)
87
- }
88
-
89
- #[ no_mangle]
90
- pub extern "C" fn pg_finfo_prom_delta_final_wrapper ( ) -> & ' static pg_sys:: Pg_finfo_record {
91
- const V1_API : pg_sys:: Pg_finfo_record = pg_sys:: Pg_finfo_record { api_version : 1 } ;
92
- & V1_API
93
- }
94
-
95
- #[ no_mangle]
96
- unsafe extern "C" fn prom_delta_final_wrapper ( fcinfo : pg_sys:: FunctionCallInfo ) -> pg_sys:: Datum {
97
- super :: gapfill_delta:: prom_extrapolate_final_wrapper ( fcinfo)
50
+ fn finalize ( current : Self :: State , _: pg_sys:: FunctionCallInfo ) -> Self :: Finalize {
51
+ current. map ( |mut s| s. as_vec ( ) )
52
+ }
98
53
}
99
54
100
- // implementation of prometheus delta function
101
- // for proper behavior the input must be ORDER BY sample_time
102
- extension_sql ! (
103
- r#"
104
- CREATE AGGREGATE @[email protected] _delta(
105
- lowest_time TIMESTAMPTZ,
106
- greatest_time TIMESTAMPTZ,
107
- step_size BIGINT,
108
- range BIGINT,
109
- sample_time TIMESTAMPTZ,
110
- sample_value DOUBLE PRECISION)
111
- (
112
- sfunc=@[email protected] _delta_transition,
113
- stype=internal,
114
- finalfunc=@[email protected] _extrapolate_final
115
- );
116
- "# ,
117
- name = "create_prom_delta_aggregate" ,
118
- requires = [ prom_delta_transition, prom_extrapolate_final]
119
- ) ;
120
-
121
55
#[ cfg( any( test, feature = "pg_test" ) ) ]
122
56
#[ pg_schema]
123
57
mod tests {
@@ -164,7 +98,7 @@ mod tests {
164
98
Spi :: get_one :: < Vec < f64 > > ( & * prepare_query ( "'2000-01-02 15:00:00 UTC'" , "NULL" ) ) ;
165
99
}
166
100
167
- #[ pg_test( error = "input time less than lowest time " ) ]
101
+ #[ pg_test( error = "input time 631292400000000 not in bounds [140400000000, 143100000000] " ) ]
168
102
fn test_prom_delta_with_input_time_less_than_lowest_time_fails ( ) {
169
103
setup ( ) ;
170
104
Spi :: get_one :: < Vec < f64 > > ( & * prepare_query (
0 commit comments