Skip to content

Commit 0605341

Browse files
frailltcijothomas
andauthored
Directly implement Measure trait for metric aggregates (open-telemetry#2371)
Co-authored-by: Cijo Thomas <[email protected]>
1 parent fbcba3b commit 0605341

File tree

6 files changed

+167
-95
lines changed

6 files changed

+167
-95
lines changed

opentelemetry-sdk/src/metrics/internal/aggregate.rs

+41-41
Original file line numberDiff line numberDiff line change
@@ -97,77 +97,77 @@ impl Default for AggregateTimeInitiator {
9797
}
9898
}
9999

100+
type Filter = Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>;
101+
102+
/// Applies filter on provided attribute set
103+
/// No-op, if filter is not set
104+
#[derive(Clone)]
105+
pub(crate) struct AttributeSetFilter {
106+
filter: Option<Filter>,
107+
}
108+
109+
impl AttributeSetFilter {
110+
pub(crate) fn new(filter: Option<Filter>) -> Self {
111+
Self { filter }
112+
}
113+
114+
pub(crate) fn apply(&self, attrs: &[KeyValue], run: impl FnOnce(&[KeyValue])) {
115+
if let Some(filter) = &self.filter {
116+
let filtered_attrs: Vec<KeyValue> =
117+
attrs.iter().filter(|kv| filter(kv)).cloned().collect();
118+
run(&filtered_attrs);
119+
} else {
120+
run(attrs);
121+
};
122+
}
123+
}
124+
100125
/// Builds aggregate functions
101126
pub(crate) struct AggregateBuilder<T> {
102127
/// The temporality used for the returned aggregate functions.
103128
temporality: Temporality,
104129

105130
/// The attribute filter the aggregate function will use on the input of
106131
/// measurements.
107-
filter: Option<Filter>,
132+
filter: AttributeSetFilter,
108133

109134
_marker: marker::PhantomData<T>,
110135
}
111136

112-
type Filter = Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>;
113-
114137
impl<T: Number> AggregateBuilder<T> {
115138
pub(crate) fn new(temporality: Temporality, filter: Option<Filter>) -> Self {
116139
AggregateBuilder {
117140
temporality,
118-
filter,
141+
filter: AttributeSetFilter::new(filter),
119142
_marker: marker::PhantomData,
120143
}
121144
}
122145

123-
/// Wraps the passed in measure with an attribute filtering function.
124-
fn filter(&self, f: impl Measure<T>) -> impl Measure<T> {
125-
let filter = self.filter.clone();
126-
move |n, attrs: &[KeyValue]| {
127-
if let Some(filter) = &filter {
128-
let filtered_attrs: Vec<KeyValue> =
129-
attrs.iter().filter(|kv| filter(kv)).cloned().collect();
130-
f.call(n, &filtered_attrs);
131-
} else {
132-
f.call(n, attrs);
133-
};
134-
}
135-
}
136-
137146
/// Builds a last-value aggregate function input and output.
138147
pub(crate) fn last_value(&self) -> (impl Measure<T>, impl ComputeAggregation) {
139-
let lv = Arc::new(LastValue::new(self.temporality));
140-
let agg_lv = Arc::clone(&lv);
141-
142-
(
143-
self.filter(move |n, a: &[KeyValue]| lv.measure(n, a)),
144-
agg_lv,
145-
)
148+
let lv = Arc::new(LastValue::new(self.temporality, self.filter.clone()));
149+
(lv.clone(), lv)
146150
}
147151

148152
/// Builds a precomputed sum aggregate function input and output.
149153
pub(crate) fn precomputed_sum(
150154
&self,
151155
monotonic: bool,
152156
) -> (impl Measure<T>, impl ComputeAggregation) {
153-
let s = Arc::new(PrecomputedSum::new(self.temporality, monotonic));
154-
let agg_sum = Arc::clone(&s);
157+
let s = Arc::new(PrecomputedSum::new(
158+
self.temporality,
159+
self.filter.clone(),
160+
monotonic,
161+
));
155162

156-
(
157-
self.filter(move |n, a: &[KeyValue]| s.measure(n, a)),
158-
agg_sum,
159-
)
163+
(s.clone(), s)
160164
}
161165

162166
/// Builds a sum aggregate function input and output.
163167
pub(crate) fn sum(&self, monotonic: bool) -> (impl Measure<T>, impl ComputeAggregation) {
164-
let s = Arc::new(Sum::new(self.temporality, monotonic));
165-
let agg_sum = Arc::clone(&s);
168+
let s = Arc::new(Sum::new(self.temporality, self.filter.clone(), monotonic));
166169

167-
(
168-
self.filter(move |n, a: &[KeyValue]| s.measure(n, a)),
169-
agg_sum,
170-
)
170+
(s.clone(), s)
171171
}
172172

173173
/// Builds a histogram aggregate function input and output.
@@ -179,13 +179,13 @@ impl<T: Number> AggregateBuilder<T> {
179179
) -> (impl Measure<T>, impl ComputeAggregation) {
180180
let h = Arc::new(Histogram::new(
181181
self.temporality,
182+
self.filter.clone(),
182183
boundaries,
183184
record_min_max,
184185
record_sum,
185186
));
186-
let agg_h = Arc::clone(&h);
187187

188-
(self.filter(move |n, a: &[KeyValue]| h.measure(n, a)), agg_h)
188+
(h.clone(), h)
189189
}
190190

191191
/// Builds an exponential histogram aggregate function input and output.
@@ -198,14 +198,14 @@ impl<T: Number> AggregateBuilder<T> {
198198
) -> (impl Measure<T>, impl ComputeAggregation) {
199199
let h = Arc::new(ExpoHistogram::new(
200200
self.temporality,
201+
self.filter.clone(),
201202
max_size,
202203
max_scale,
203204
record_min_max,
204205
record_sum,
205206
));
206-
let agg_h = Arc::clone(&h);
207207

208-
(self.filter(move |n, a: &[KeyValue]| h.measure(n, a)), agg_h)
208+
(h.clone(), h)
209209
}
210210
}
211211

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

+43-16
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ use crate::metrics::{
1313
Temporality,
1414
};
1515

16-
use super::{aggregate::AggregateTimeInitiator, Aggregator, ComputeAggregation, Number, ValueMap};
16+
use super::{
17+
aggregate::{AggregateTimeInitiator, AttributeSetFilter},
18+
Aggregator, ComputeAggregation, Measure, Number, ValueMap,
19+
};
1720

1821
pub(crate) const EXPO_MAX_SCALE: i8 = 20;
1922
pub(crate) const EXPO_MIN_SCALE: i8 = -10;
@@ -357,6 +360,7 @@ pub(crate) struct ExpoHistogram<T: Number> {
357360
value_map: ValueMap<Mutex<ExpoHistogramDataPoint<T>>>,
358361
init_time: AggregateTimeInitiator,
359362
temporality: Temporality,
363+
filter: AttributeSetFilter,
360364
record_sum: bool,
361365
record_min_max: bool,
362366
}
@@ -365,6 +369,7 @@ impl<T: Number> ExpoHistogram<T> {
365369
/// Create a new exponential histogram.
366370
pub(crate) fn new(
367371
temporality: Temporality,
372+
filter: AttributeSetFilter,
368373
max_size: u32,
369374
max_scale: i8,
370375
record_min_max: bool,
@@ -377,22 +382,12 @@ impl<T: Number> ExpoHistogram<T> {
377382
}),
378383
init_time: AggregateTimeInitiator::default(),
379384
temporality,
385+
filter,
380386
record_sum,
381387
record_min_max,
382388
}
383389
}
384390

385-
pub(crate) fn measure(&self, value: T, attrs: &[KeyValue]) {
386-
let f_value = value.into_float();
387-
// Ignore NaN and infinity.
388-
// Only makes sense if T is f64, maybe this could be no-op for other cases?
389-
if !f_value.is_finite() {
390-
return;
391-
}
392-
393-
self.value_map.measure(value, attrs);
394-
}
395-
396391
fn delta(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
397392
let time = self.init_time.delta();
398393

@@ -505,6 +500,24 @@ impl<T: Number> ExpoHistogram<T> {
505500
}
506501
}
507502

503+
impl<T> Measure<T> for Arc<ExpoHistogram<T>>
504+
where
505+
T: Number,
506+
{
507+
fn call(&self, measurement: T, attrs: &[KeyValue]) {
508+
let f_value = measurement.into_float();
509+
// Ignore NaN and infinity.
510+
// Only makes sense if T is f64, maybe this could be no-op for other cases?
511+
if !f_value.is_finite() {
512+
return;
513+
}
514+
515+
self.filter.apply(attrs, |filtered| {
516+
self.value_map.measure(measurement, filtered);
517+
})
518+
}
519+
}
520+
508521
impl<T> ComputeAggregation for Arc<ExpoHistogram<T>>
509522
where
510523
T: Number,
@@ -682,9 +695,16 @@ mod tests {
682695
];
683696

684697
for test in test_cases {
685-
let h = ExpoHistogram::new(Temporality::Cumulative, 4, 20, true, true);
698+
let h = Arc::new(ExpoHistogram::new(
699+
Temporality::Cumulative,
700+
AttributeSetFilter::new(None),
701+
4,
702+
20,
703+
true,
704+
true,
705+
));
686706
for v in test.values {
687-
h.measure(v, &[]);
707+
Measure::call(&h, v, &[]);
688708
}
689709
let dp = h.value_map.no_attribute_tracker.lock().unwrap();
690710

@@ -731,9 +751,16 @@ mod tests {
731751
];
732752

733753
for test in test_cases {
734-
let h = ExpoHistogram::new(Temporality::Cumulative, 4, 20, true, true);
754+
let h = Arc::new(ExpoHistogram::new(
755+
Temporality::Cumulative,
756+
AttributeSetFilter::new(None),
757+
4,
758+
20,
759+
true,
760+
true,
761+
));
735762
for v in test.values {
736-
h.measure(v, &[]);
763+
Measure::call(&h, v, &[]);
737764
}
738765
let dp = h.value_map.no_attribute_tracker.lock().unwrap();
739766

opentelemetry-sdk/src/metrics/internal/histogram.rs

+26-13
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ use crate::metrics::Temporality;
99
use opentelemetry::KeyValue;
1010

1111
use super::aggregate::AggregateTimeInitiator;
12+
use super::aggregate::AttributeSetFilter;
1213
use super::ComputeAggregation;
14+
use super::Measure;
1315
use super::ValueMap;
1416
use super::{Aggregator, Number};
1517

@@ -72,6 +74,7 @@ pub(crate) struct Histogram<T: Number> {
7274
value_map: ValueMap<Mutex<Buckets<T>>>,
7375
init_time: AggregateTimeInitiator,
7476
temporality: Temporality,
77+
filter: AttributeSetFilter,
7578
bounds: Vec<f64>,
7679
record_min_max: bool,
7780
record_sum: bool,
@@ -81,6 +84,7 @@ impl<T: Number> Histogram<T> {
8184
#[allow(unused_mut)]
8285
pub(crate) fn new(
8386
temporality: Temporality,
87+
filter: AttributeSetFilter,
8488
mut bounds: Vec<f64>,
8589
record_min_max: bool,
8690
record_sum: bool,
@@ -97,24 +101,13 @@ impl<T: Number> Histogram<T> {
97101
value_map: ValueMap::new(buckets_count),
98102
init_time: AggregateTimeInitiator::default(),
99103
temporality,
104+
filter,
100105
bounds,
101106
record_min_max,
102107
record_sum,
103108
}
104109
}
105110

106-
pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
107-
let f = measurement.into_float();
108-
// This search will return an index in the range `[0, bounds.len()]`, where
109-
// it will return `bounds.len()` if value is greater than the last element
110-
// of `bounds`. This aligns with the buckets in that the length of buckets
111-
// is `bounds.len()+1`, with the last bucket representing:
112-
// `(bounds[bounds.len()-1], +∞)`.
113-
let index = self.bounds.partition_point(|&x| x < f);
114-
115-
self.value_map.measure((measurement, index), attrs);
116-
}
117-
118111
fn delta(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
119112
let time = self.init_time.delta();
120113

@@ -216,6 +209,25 @@ impl<T: Number> Histogram<T> {
216209
}
217210
}
218211

212+
impl<T> Measure<T> for Arc<Histogram<T>>
213+
where
214+
T: Number,
215+
{
216+
fn call(&self, measurement: T, attrs: &[KeyValue]) {
217+
let f = measurement.into_float();
218+
// This search will return an index in the range `[0, bounds.len()]`, where
219+
// it will return `bounds.len()` if value is greater than the last element
220+
// of `bounds`. This aligns with the buckets in that the length of buckets
221+
// is `bounds.len()+1`, with the last bucket representing:
222+
// `(bounds[bounds.len()-1], +∞)`.
223+
let index = self.bounds.partition_point(|&x| x < f);
224+
225+
self.filter.apply(attrs, |filtered| {
226+
self.value_map.measure((measurement, index), filtered);
227+
})
228+
}
229+
}
230+
219231
impl<T> ComputeAggregation for Arc<Histogram<T>>
220232
where
221233
T: Number,
@@ -236,12 +248,13 @@ mod tests {
236248
fn check_buckets_are_selected_correctly() {
237249
let hist = Arc::new(Histogram::<i64>::new(
238250
Temporality::Cumulative,
251+
AttributeSetFilter::new(None),
239252
vec![1.0, 3.0, 6.0],
240253
false,
241254
false,
242255
));
243256
for v in 1..11 {
244-
hist.measure(v, &[]);
257+
Measure::call(&hist, v, &[]);
245258
}
246259
let (count, dp) = ComputeAggregation::call(&hist, None);
247260
let dp = dp.unwrap();

0 commit comments

Comments
 (0)