diff --git a/opentelemetry-sdk/src/metrics/data/mod.rs b/opentelemetry-sdk/src/metrics/data/mod.rs
index b1b198d73c0..aef35bf0f22 100644
--- a/opentelemetry-sdk/src/metrics/data/mod.rs
+++ b/opentelemetry-sdk/src/metrics/data/mod.rs
@@ -53,6 +53,14 @@ pub trait Aggregation: fmt::Debug + any::Any + Send + Sync {
     fn as_mut(&mut self) -> &mut dyn any::Any;
 }
 
+/// Allows access to the data points of an [Aggregation].
+pub trait AggregationDataPoints {
+    /// The type of data point in the aggregation.
+    type Point;
+    /// The data points of the aggregation.
+    fn points(&mut self) -> &mut Vec<Self::Point>;
+}
+
 /// DataPoint is a single data point in a time series.
 #[derive(Debug, PartialEq)]
 pub struct GaugeDataPoint<T> {
@@ -228,6 +236,14 @@ impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for ExponentialHistogram<T> {
     }
 }
 
+impl<T: fmt::Debug + Send + Sync + 'static> AggregationDataPoints for ExponentialHistogram<T> {
+    type Point = ExponentialHistogramDataPoint<T>;
+
+    fn points(&mut self) -> &mut Vec<Self::Point> {
+        &mut self.data_points
+    }
+}
+
 /// A single exponential histogram data point in a time series.
 #[derive(Debug, PartialEq)]
 pub struct ExponentialHistogramDataPoint<T> {
diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs
index fc9d5975c3f..ef57ef65462 100644
--- a/opentelemetry-sdk/src/metrics/internal/aggregate.rs
+++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs
@@ -8,11 +8,19 @@ use std::{
 
 use opentelemetry::KeyValue;
 
-use crate::metrics::{data::Aggregation, Temporality};
+use crate::metrics::{
+    data::{Aggregation, AggregationDataPoints},
+    Temporality,
+};
 
 use super::{
-    exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue,
-    precomputed_sum::PrecomputedSum, sum::Sum, Number,
+    collector::{Collector, CumulativeValueMap, DeltaValueMap},
+    exponential_histogram::{ExpoHistogram, ExpoHistogramBucketConfig},
+    histogram::Histogram,
+    last_value::LastValue,
+    precomputed_sum::PrecomputedSum,
+    sum::Sum,
+    Number,
 };
 
 pub(crate) const STREAM_CARDINALITY_LIMIT: usize = 2000;
@@ -58,6 +66,7 @@ where
     }
 }
 
+#[derive(Clone, Copy)]
 pub(crate) struct AggregateTime {
     pub start: SystemTime,
     pub current: SystemTime,
 }
@@ -121,6 +130,12 @@ impl AttributeSetFilter {
     }
 }
 
+pub(crate) trait InitAggregationData {
+    type Aggr: Aggregation + AggregationDataPoints;
+    fn create_new(&self, time: AggregateTime) -> Self::Aggr;
+    fn reset_existing(&self, existing: &mut Self::Aggr, time: AggregateTime);
+}
+
 /// Builds aggregate functions
 pub(crate) struct AggregateBuilder<T> {
     /// The temporality used for the returned aggregate functions.
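The two additions above (`AggregationDataPoints` in data/mod.rs, `InitAggregationData` in aggregate.rs) are what let the collection path be written once, generically over the concrete aggregation type. A minimal standalone sketch of the `AggregationDataPoints` shape; `ToyHistogram`, `ToyPoint`, and `fill` are hypothetical illustration names, not SDK types:

```rust
/// Mirror of the trait introduced in data/mod.rs above.
trait AggregationDataPoints {
    type Point;
    fn points(&mut self) -> &mut Vec<Self::Point>;
}

#[derive(Debug)]
struct ToyPoint {
    count: u64,
}

#[derive(Debug, Default)]
struct ToyHistogram {
    data_points: Vec<ToyPoint>,
}

impl AggregationDataPoints for ToyHistogram {
    type Point = ToyPoint;

    fn points(&mut self) -> &mut Vec<Self::Point> {
        &mut self.data_points
    }
}

// Generic collection code can rebuild any aggregation's points without
// knowing the concrete aggregation type, which is what the new
// `Collector::collect` below relies on.
fn fill<A: AggregationDataPoints>(agg: &mut A, make: impl Fn(u64) -> A::Point) {
    let points = agg.points();
    points.clear();
    points.extend((0..3u64).map(make));
}

fn main() {
    let mut h = ToyHistogram::default();
    fill(&mut h, |i| ToyPoint { count: i });
    assert_eq!(h.points().len(), 3);
}
```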
@@ -182,15 +197,32 @@ impl<T: Number> AggregateBuilder<T> {
         record_min_max: bool,
         record_sum: bool,
     ) -> AggregateFns<T> {
-        ExpoHistogram::new(
-            self.temporality,
-            self.filter.clone(),
-            max_size,
-            max_scale,
-            record_min_max,
-            record_sum,
-        )
-        .into()
+        match self.temporality {
+            Temporality::Delta => ExpoHistogram {
+                aggregate_collector: Collector::new(
+                    self.filter.clone(),
+                    DeltaValueMap::new(ExpoHistogramBucketConfig {
+                        max_size: max_size as i32,
+                        max_scale,
+                    }),
+                ),
+                record_min_max,
+                record_sum,
+            }
+            .into(),
+            _ => ExpoHistogram {
+                aggregate_collector: Collector::new(
+                    self.filter.clone(),
+                    CumulativeValueMap::new(ExpoHistogramBucketConfig {
+                        max_size: max_size as i32,
+                        max_scale,
+                    }),
+                ),
+                record_min_max,
+                record_sum,
+            }
+            .into(),
+        }
     }
 }
diff --git a/opentelemetry-sdk/src/metrics/internal/collector.rs b/opentelemetry-sdk/src/metrics/internal/collector.rs
new file mode 100644
index 00000000000..f94547e0a5f
--- /dev/null
+++ b/opentelemetry-sdk/src/metrics/internal/collector.rs
@@ -0,0 +1,195 @@
+use opentelemetry::KeyValue;
+
+use crate::metrics::{
+    data::{Aggregation, AggregationDataPoints},
+    Temporality,
+};
+
+use super::{
+    aggregate::{AggregateTime, AttributeSetFilter},
+    AggregateTimeInitiator, Aggregator, InitAggregationData, ValueMap,
+};
+
+/// Aggregates measurements for attribute sets and collects these aggregates into data points for a specific temporality.
+pub(crate) trait AggregateMap: Send + Sync + 'static {
+    const TEMPORALITY: Temporality;
+    type Aggr: Aggregator;
+
+    fn measure(&self, value: <Self::Aggr as Aggregator>::PreComputedValue, attributes: &[KeyValue]);
+
+    fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, map_fn: MapFn)
+    where
+        MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP;
+}
+
+/// Higher-level abstraction (compared to [`AggregateMap`]) that also performs attribute filtering and collection into aggregation data.
+pub(crate) trait AggregateCollector: Send + Sync + 'static {
+    const TEMPORALITY: Temporality;
+    type Aggr: Aggregator;
+
+    fn measure(&self, value: <Self::Aggr as Aggregator>::PreComputedValue, attributes: &[KeyValue]);
+
+    fn collect<'a, InitAggregate, F>(
+        &self,
+        aggregate: &InitAggregate,
+        dest: Option<&'a mut dyn Aggregation>,
+        create_point: F,
+    ) -> (usize, Option<Box<dyn Aggregation>>)
+    where
+        InitAggregate: InitAggregationData,
+        F: FnMut(
+            Vec<KeyValue>,
+            &Self::Aggr,
+        ) -> <InitAggregate::Aggr as AggregationDataPoints>::Point;
+}
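`AggregateMap` fixes the temporality as an associated const, so the `Collector` defined next can choose its time-window behavior statically instead of branching on a stored field. A standalone sketch of that mechanism, with simplified stand-ins for the SDK types:

```rust
use std::marker::PhantomData;

enum Temporality {
    Delta,
    Cumulative,
}

trait AggregateMap {
    const TEMPORALITY: Temporality;
}

struct DeltaValueMap;
impl AggregateMap for DeltaValueMap {
    const TEMPORALITY: Temporality = Temporality::Delta;
}

struct CumulativeValueMap;
impl AggregateMap for CumulativeValueMap {
    const TEMPORALITY: Temporality = Temporality::Cumulative;
}

// Zero-sized stand-in for the real Collector, which also holds the filter,
// the map itself, and the time initiator.
struct Collector<AM>(PhantomData<AM>);

impl<AM: AggregateMap> Collector<AM> {
    // Mirrors `init_time` in the diff: the branch is resolved per concrete
    // `AM`, so a delta collector can never accidentally use cumulative
    // start times.
    fn window(&self) -> &'static str {
        if let Temporality::Delta = AM::TEMPORALITY {
            "window starts at the previous collection"
        } else {
            "window starts at the first collection"
        }
    }
}

fn main() {
    let delta = Collector::<DeltaValueMap>(PhantomData);
    let cumulative = Collector::<CumulativeValueMap>(PhantomData);
    println!("delta: {}", delta.window());
    println!("cumulative: {}", cumulative.window());
}
```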
+pub(crate) struct Collector<AM> {
+    filter: AttributeSetFilter,
+    aggregate_map: AM,
+    time: AggregateTimeInitiator,
+}
+
+impl<AM> Collector<AM>
+where
+    AM: AggregateMap,
+{
+    pub(crate) fn new(filter: AttributeSetFilter, aggregate_map: AM) -> Self {
+        Self {
+            filter,
+            aggregate_map,
+            time: AggregateTimeInitiator::default(),
+        }
+    }
+
+    fn init_time(&self) -> AggregateTime {
+        if let Temporality::Delta = AM::TEMPORALITY {
+            self.time.delta()
+        } else {
+            self.time.cumulative()
+        }
+    }
+}
+
+impl<AM> AggregateCollector for Collector<AM>
+where
+    AM: AggregateMap,
+{
+    const TEMPORALITY: Temporality = AM::TEMPORALITY;
+
+    type Aggr = AM::Aggr;
+
+    fn measure(&self, value: <AM::Aggr as Aggregator>::PreComputedValue, attributes: &[KeyValue]) {
+        self.filter.apply(attributes, |filtered_attrs| {
+            self.aggregate_map.measure(value, filtered_attrs);
+        });
+    }
+
+    fn collect<'a, InitAggregate, F>(
+        &self,
+        aggregate: &InitAggregate,
+        dest: Option<&'a mut dyn Aggregation>,
+        create_point: F,
+    ) -> (usize, Option<Box<dyn Aggregation>>)
+    where
+        InitAggregate: InitAggregationData,
+        F: FnMut(Vec<KeyValue>, &AM::Aggr) -> <InitAggregate::Aggr as AggregationDataPoints>::Point,
+    {
+        let time = self.init_time();
+        let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<InitAggregate::Aggr>());
+        let mut new_agg = if s_data.is_none() {
+            Some(aggregate.create_new(time))
+        } else {
+            None
+        };
+        let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
+        aggregate.reset_existing(s_data, time);
+        self.aggregate_map
+            .collect_data_points(s_data.points(), create_point);
+
+        (
+            s_data.points().len(),
+            new_agg.map(|a| Box::new(a) as Box<_>),
+        )
+    }
+}
+
+/// At the moment uses [`ValueMap`] under the hood (which supports both Delta and Cumulative) to implement `AggregateMap` for Delta temporality.
+/// Later this could be specialized to support only Delta temporality.
+pub(crate) struct DeltaValueMap<A>(ValueMap<A>)
+where
+    A: Aggregator;
+
+impl<A> DeltaValueMap<A>
+where
+    A: Aggregator,
+{
+    pub(crate) fn new(config: A::InitConfig) -> Self {
+        Self(ValueMap::new(config))
+    }
+}
+
+impl<A> AggregateMap for DeltaValueMap<A>
+where
+    A: Aggregator,
+    <A as Aggregator>::InitConfig: Send + Sync,
+{
+    const TEMPORALITY: Temporality = Temporality::Delta;
+
+    type Aggr = A;
+
+    fn measure(
+        &self,
+        value: <Self::Aggr as Aggregator>::PreComputedValue,
+        attributes: &[KeyValue],
+    ) {
+        self.0.measure(value, attributes);
+    }
+
+    fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, mut map_fn: MapFn)
+    where
+        MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP,
+    {
+        self.0
+            .collect_and_reset(dest, |attributes, aggr| map_fn(attributes, &aggr));
+    }
+}
+
+/// At the moment uses [`ValueMap`] under the hood (which supports both Delta and Cumulative) to implement `AggregateMap` for Cumulative temporality.
+/// Later this could be specialized to support only Cumulative temporality.
+pub(crate) struct CumulativeValueMap<A>(ValueMap<A>)
+where
+    A: Aggregator;
+
+impl<A> CumulativeValueMap<A>
+where
+    A: Aggregator,
+{
+    pub(crate) fn new(config: A::InitConfig) -> Self {
+        Self(ValueMap::new(config))
+    }
+}
+
+impl<A> AggregateMap for CumulativeValueMap<A>
+where
+    A: Aggregator,
+    <A as Aggregator>::InitConfig: Send + Sync,
+{
+    const TEMPORALITY: Temporality = Temporality::Cumulative;
+
+    type Aggr = A;
+
+    fn measure(
+        &self,
+        value: <Self::Aggr as Aggregator>::PreComputedValue,
+        attributes: &[KeyValue],
+    ) {
+        self.0.measure(value, attributes);
+    }
+
+    fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, map_fn: MapFn)
+    where
+        MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP,
+    {
+        self.0.collect_readonly(dest, map_fn);
+    }
+}
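The two wrappers encode the collection contract in which `ValueMap` call they forward to: delta collection drains and resets the accumulated state (`collect_and_reset`), while cumulative collection only snapshots it (`collect_readonly`). A standalone sketch of that contract; `SimpleMap` is a hypothetical stand-in for `ValueMap`:

```rust
use std::sync::Mutex;

#[derive(Default)]
struct SimpleMap {
    sum: Mutex<u64>,
}

impl SimpleMap {
    fn measure(&self, value: u64) {
        *self.sum.lock().unwrap() += value;
    }

    // Delta: move the accumulated value out and reset it, like `collect_and_reset`.
    fn collect_and_reset(&self) -> u64 {
        std::mem::take(&mut *self.sum.lock().unwrap())
    }

    // Cumulative: copy the accumulated value, like `collect_readonly`.
    fn collect_readonly(&self) -> u64 {
        *self.sum.lock().unwrap()
    }
}

fn main() {
    let delta = SimpleMap::default();
    delta.measure(3);
    delta.measure(4);
    assert_eq!(delta.collect_and_reset(), 7);
    assert_eq!(delta.collect_and_reset(), 0); // state resets after each cycle

    let cumulative = SimpleMap::default();
    cumulative.measure(3);
    cumulative.measure(4);
    assert_eq!(cumulative.collect_readonly(), 7);
    assert_eq!(cumulative.collect_readonly(), 7); // state persists across cycles
}
```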
diff --git a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
index 170f4a068d5..fe1b1ccce74 100644
--- a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
+++ b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
@@ -3,14 +3,11 @@ use std::{f64::consts::LOG2_E, mem::replace, ops::DerefMut, sync::Mutex};
 use opentelemetry::{otel_debug, KeyValue};
 use std::sync::OnceLock;
 
-use crate::metrics::{
-    data::{self, Aggregation},
-    Temporality,
-};
+use crate::metrics::data::{self, Aggregation};
 
 use super::{
-    aggregate::{AggregateTimeInitiator, AttributeSetFilter},
-    Aggregator, ComputeAggregation, Measure, Number, ValueMap,
+    aggregate::AggregateTime, collector::AggregateCollector, Aggregator, ComputeAggregation,
+    InitAggregationData, Measure, Number,
 };
 
 pub(crate) const EXPO_MAX_SCALE: i8 = 20;
@@ -18,7 +15,7 @@ pub(crate) const EXPO_MIN_SCALE: i8 = -10;
 
 /// A single data point in an exponential histogram.
 #[derive(Debug, PartialEq)]
-struct ExpoHistogramDataPoint<T> {
+pub(crate) struct ExpoHistogramDataPoint<T> {
     max_size: i32,
     count: usize,
     min: T,
@@ -31,7 +28,7 @@ struct ExpoHistogramDataPoint<T> {
 }
 
 impl<T: Number> ExpoHistogramDataPoint<T> {
-    fn new(config: &BucketConfig) -> Self {
+    fn new(config: &ExpoHistogramBucketConfig) -> Self {
         ExpoHistogramDataPoint {
             max_size: config.max_size,
             count: 0,
@@ -317,11 +314,11 @@ impl<T> Aggregator for Mutex<ExpoHistogramDataPoint<T>>
 where
     T: Number,
 {
-    type InitConfig = BucketConfig;
+    type InitConfig = ExpoHistogramBucketConfig;
 
     type PreComputedValue = T;
 
-    fn create(init: &BucketConfig) -> Self {
+    fn create(init: &ExpoHistogramBucketConfig) -> Self {
         Mutex::new(ExpoHistogramDataPoint::new(init))
     }
 
@@ -333,7 +330,7 @@ where
         this.record(value);
     }
 
-    fn clone_and_reset(&self, init: &BucketConfig) -> Self {
+    fn clone_and_reset(&self, init: &ExpoHistogramBucketConfig) -> Self {
         let mut current = self.lock().unwrap_or_else(|err| err.into_inner());
         let cloned = replace(current.deref_mut(), ExpoHistogramDataPoint::new(init));
         Mutex::new(cloned)
@@ -341,9 +338,9 @@ where
     }
 }
 
 #[derive(Debug, Clone, Copy, PartialEq)]
-struct BucketConfig {
-    max_size: i32,
-    max_scale: i8,
+pub(crate) struct ExpoHistogramBucketConfig {
+    pub(crate) max_size: i32,
+    pub(crate) max_scale: i8,
 }
 
 /// An aggregator that summarizes a set of measurements as an exponential
@@ -351,117 +348,37 @@ struct BucketConfig {
 /// histogram.
 ///
 /// Each histogram is scoped by attributes and the aggregation cycle the
 /// measurements were made in.
-pub(crate) struct ExpoHistogram<T: Number> {
-    value_map: ValueMap<Mutex<ExpoHistogramDataPoint<T>>>,
-    init_time: AggregateTimeInitiator,
-    temporality: Temporality,
-    filter: AttributeSetFilter,
-    record_sum: bool,
-    record_min_max: bool,
+pub(crate) struct ExpoHistogram<AC> {
+    pub(crate) aggregate_collector: AC,
+    pub(crate) record_sum: bool,
+    pub(crate) record_min_max: bool,
 }
 
-impl<T: Number> ExpoHistogram<T> {
-    /// Create a new exponential histogram.
-    pub(crate) fn new(
-        temporality: Temporality,
-        filter: AttributeSetFilter,
-        max_size: u32,
-        max_scale: i8,
-        record_min_max: bool,
-        record_sum: bool,
-    ) -> Self {
-        ExpoHistogram {
-            value_map: ValueMap::new(BucketConfig {
-                max_size: max_size as i32,
-                max_scale,
-            }),
-            init_time: AggregateTimeInitiator::default(),
-            temporality,
-            filter,
-            record_sum,
-            record_min_max,
+impl<AC, T> Measure<T> for ExpoHistogram<AC>
+where
+    AC: AggregateCollector<Aggr = Mutex<ExpoHistogramDataPoint<T>>>,
+    T: Number,
+{
+    fn call(&self, measurement: T, attrs: &[KeyValue]) {
+        let f_value = measurement.into_float();
+        // Ignore NaN and infinity.
+        // Only makes sense if T is f64, maybe this could be no-op for other cases?
+        if !f_value.is_finite() {
+            return;
         }
+        self.aggregate_collector.measure(measurement, attrs);
     }
+}
 
-    fn delta(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
-        let time = self.init_time.delta();
-
-        let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::ExponentialHistogram<T>>());
-        let mut new_agg = if h.is_none() {
-            Some(data::ExponentialHistogram {
-                data_points: vec![],
-                start_time: time.start,
-                time: time.current,
-                temporality: Temporality::Delta,
-            })
-        } else {
-            None
-        };
-        let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
-        h.temporality = Temporality::Delta;
-        h.start_time = time.start;
-        h.time = time.current;
-
-        self.value_map
-            .collect_and_reset(&mut h.data_points, |attributes, attr| {
-                let b = attr.into_inner().unwrap_or_else(|err| err.into_inner());
-                data::ExponentialHistogramDataPoint {
-                    attributes,
-                    count: b.count,
-                    min: if self.record_min_max {
-                        Some(b.min)
-                    } else {
-                        None
-                    },
-                    max: if self.record_min_max {
-                        Some(b.max)
-                    } else {
-                        None
-                    },
-                    sum: if self.record_sum { b.sum } else { T::default() },
-                    scale: b.scale,
-                    zero_count: b.zero_count,
-                    positive_bucket: data::ExponentialBucket {
-                        offset: b.pos_buckets.start_bin,
-                        counts: b.pos_buckets.counts,
-                    },
-                    negative_bucket: data::ExponentialBucket {
-                        offset: b.neg_buckets.start_bin,
-                        counts: b.neg_buckets.counts,
-                    },
-                    zero_threshold: 0.0,
-                    exemplars: vec![],
-                }
-            });
-
-        (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
-    }
-
-    fn cumulative(
-        &self,
-        dest: Option<&mut dyn Aggregation>,
-    ) -> (usize, Option<Box<dyn Aggregation>>) {
-        let time = self.init_time.cumulative();
-
-        let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::ExponentialHistogram<T>>());
-        let mut new_agg = if h.is_none() {
-            Some(data::ExponentialHistogram {
-                data_points: vec![],
-                start_time: time.start,
-                time: time.current,
-                temporality: Temporality::Cumulative,
-            })
-        } else {
-            None
-        };
-        let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
-        h.temporality = Temporality::Cumulative;
-        h.start_time = time.start;
-        h.time = time.current;
-
-        self.value_map
-            .collect_readonly(&mut h.data_points, |attributes, attr| {
-                let b = attr.lock().unwrap_or_else(|err| err.into_inner());
+impl<AC, T> ComputeAggregation for ExpoHistogram<AC>
+where
+    AC: AggregateCollector<Aggr = Mutex<ExpoHistogramDataPoint<T>>>,
+    T: Number,
+{
+    fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
+        self.aggregate_collector
+            .collect(self, dest, |attributes, aggr| {
+                let b = aggr.lock().unwrap_or_else(|err| err.into_inner());
                 data::ExponentialHistogramDataPoint {
                     attributes,
                     count: b.count,
@@ -489,48 +406,44 @@
                     zero_threshold: 0.0,
                     exemplars: vec![],
                 }
-            });
-
-        (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
+            })
     }
 }
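The `InitAggregationData` impl that follows supplies the `create_new`/`reset_existing` pair that `Collector::collect` (in collector.rs above) uses to either allocate a fresh aggregation or reset the caller-supplied one in place. A standalone sketch of that reuse pattern, with toy types standing in for the SDK's:

```rust
struct Snapshot {
    data_points: Vec<u64>,
    cycle: u32,
}

fn create_new(cycle: u32) -> Snapshot {
    Snapshot { data_points: vec![], cycle }
}

fn reset_existing(existing: &mut Snapshot, cycle: u32) {
    existing.data_points.clear();
    existing.cycle = cycle;
}

// Mirrors the shape of `Collector::collect`: reuse `dest` when given,
// otherwise allocate, and return the newly allocated aggregation (if any).
fn collect(dest: Option<&mut Snapshot>, cycle: u32) -> Option<Snapshot> {
    let mut new_agg = if dest.is_none() { Some(create_new(cycle)) } else { None };
    let s_data = match dest {
        Some(d) => d,
        None => new_agg.as_mut().expect("present if dest is none"),
    };
    reset_existing(s_data, cycle);
    s_data.data_points.extend([1, 2, 3]); // stand-in for real data points
    new_agg
}

fn main() {
    // No destination: a new snapshot is allocated and returned.
    let owned = collect(None, 1).expect("allocated");
    assert_eq!(owned.data_points, [1, 2, 3]);

    // Reusable destination: reset in place, nothing new is returned.
    let mut reusable = owned;
    assert!(collect(Some(&mut reusable), 2).is_none());
    assert_eq!(reusable.cycle, 2);
}
```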
 
-impl<T> Measure<T> for ExpoHistogram<T>
+impl<AC, T> InitAggregationData for ExpoHistogram<AC>
 where
+    AC: AggregateCollector<Aggr = Mutex<ExpoHistogramDataPoint<T>>>,
     T: Number,
 {
-    fn call(&self, measurement: T, attrs: &[KeyValue]) {
-        let f_value = measurement.into_float();
-        // Ignore NaN and infinity.
-        // Only makes sense if T is f64, maybe this could be no-op for other cases?
-        if !f_value.is_finite() {
-            return;
+    type Aggr = data::ExponentialHistogram<T>;
+
+    fn create_new(&self, time: AggregateTime) -> Self::Aggr {
+        data::ExponentialHistogram {
+            data_points: vec![],
+            start_time: time.start,
+            time: time.current,
+            temporality: AC::TEMPORALITY,
         }
-
-        self.filter.apply(attrs, |filtered| {
-            self.value_map.measure(measurement, filtered);
-        })
     }
-}
 
-impl<T> ComputeAggregation for ExpoHistogram<T>
-where
-    T: Number,
-{
-    fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
-        match self.temporality {
-            Temporality::Delta => self.delta(dest),
-            _ => self.cumulative(dest),
-        }
+    fn reset_existing(&self, existing: &mut Self::Aggr, time: AggregateTime) {
+        existing.data_points.clear();
+        existing.start_time = time.start;
+        existing.time = time.current;
+        existing.temporality = AC::TEMPORALITY;
     }
 }
+
 #[cfg(test)]
 mod tests {
     use std::{ops::Neg, time::SystemTime};
 
     use tests::internal::AggregateFns;
 
-    use crate::metrics::internal::{self, AggregateBuilder};
+    use crate::metrics::{
+        internal::{self, AggregateBuilder},
+        Temporality,
+    };
 
     use super::*;
 
@@ -630,7 +543,7 @@
         ];
 
         for test in test_cases {
-            let mut dp = ExpoHistogramDataPoint::<T>::new(&BucketConfig {
+            let mut dp = ExpoHistogramDataPoint::<T>::new(&ExpoHistogramBucketConfig {
                 max_size: test.max_size,
                 max_scale: 20,
             });
@@ -691,21 +604,26 @@
         ];
 
         for test in test_cases {
-            let h = ExpoHistogram::new(
-                Temporality::Cumulative,
-                AttributeSetFilter::new(None),
-                4,
-                20,
-                true,
-                true,
-            );
+            let AggregateFns { measure, collect } =
+                AggregateBuilder::new(Temporality::Cumulative, None)
+                    .exponential_bucket_histogram(4, 20, true, true);
             for v in test.values {
-                Measure::call(&h, v, &[]);
+                measure.call(v, &[]);
             }
 
-            let dp = h.value_map.no_attribute_tracker.lock().unwrap();
-            assert_eq!(test.expected.max, dp.max);
-            assert_eq!(test.expected.min, dp.min);
+            let mut got = Box::new(data::ExponentialHistogram::<f64> {
+                data_points: vec![],
+                start_time: SystemTime::now(),
+                time: SystemTime::now(),
+                temporality: Temporality::Delta,
+            });
+
+            collect.call(Some(got.as_mut()));
+            assert_eq!(got.data_points.len(), 1);
+            let dp = got.data_points.first().unwrap();
+
+            assert_eq!(test.expected.max, dp.max.unwrap());
+            assert_eq!(test.expected.min, dp.min.unwrap());
             assert_eq!(test.expected.sum, dp.sum);
             assert_eq!(test.expected.count, dp.count);
         }
@@ -747,21 +665,26 @@
         ];
 
         for test in test_cases {
-            let h = ExpoHistogram::new(
-                Temporality::Cumulative,
-                AttributeSetFilter::new(None),
-                4,
-                20,
-                true,
-                true,
-            );
+            let AggregateFns { measure, collect } =
+                AggregateBuilder::new(Temporality::Cumulative, None)
+                    .exponential_bucket_histogram(4, 20, true, true);
             for v in test.values {
-                Measure::call(&h, v, &[]);
+                measure.call(v, &[]);
             }
 
-            let dp = h.value_map.no_attribute_tracker.lock().unwrap();
-            assert_eq!(test.expected.max, dp.max);
-            assert_eq!(test.expected.min, dp.min);
+            let mut got = Box::new(data::ExponentialHistogram::<T> {
+                data_points: vec![],
+                start_time: SystemTime::now(),
+                time: SystemTime::now(),
+                temporality: Temporality::Delta,
+            });
+
+            collect.call(Some(got.as_mut()));
+            assert_eq!(got.data_points.len(), 1);
+            let dp = got.data_points.first().unwrap();
+
+            assert_eq!(test.expected.max, dp.max.unwrap());
+            assert_eq!(test.expected.min, dp.min.unwrap());
             assert_eq!(test.expected.sum, dp.sum);
             assert_eq!(test.expected.count, dp.count);
         }
@@ -841,7 +764,7 @@
             },
         ];
         for test in test_cases {
-            let mut dp = ExpoHistogramDataPoint::new(&BucketConfig {
+            let mut dp = ExpoHistogramDataPoint::new(&ExpoHistogramBucketConfig {
                 max_size: test.max_size,
                 max_scale: 20,
             });
@@ -861,7 +784,7 @@
         // These bins are calculated from the following formula:
         // floor( log2( value) * 2^20 ) using an arbitrary precision calculator.
 
-        let cfg = BucketConfig {
+        let cfg = ExpoHistogramBucketConfig {
            max_size: 4,
            max_scale: 20,
        };
@@ -1230,7 +1153,7 @@
             zero_count: 0,
         };
 
-        let mut ehdp = ExpoHistogramDataPoint::new(&BucketConfig {
+        let mut ehdp = ExpoHistogramDataPoint::new(&ExpoHistogramBucketConfig {
             max_size: 4,
             max_scale: 20,
         });
diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs
index 6316c97b237..f38470913d9 100644
--- a/opentelemetry-sdk/src/metrics/internal/mod.rs
+++ b/opentelemetry-sdk/src/metrics/internal/mod.rs
@@ -1,4 +1,5 @@
 mod aggregate;
+mod collector;
 mod exponential_histogram;
 mod histogram;
 mod last_value;
@@ -12,7 +13,10 @@
 use std::ops::{Add, AddAssign, DerefMut, Sub};
 use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
 use std::sync::{Arc, OnceLock, RwLock};
 
-use aggregate::{is_under_cardinality_limit, STREAM_CARDINALITY_LIMIT};
+use aggregate::{
+    is_under_cardinality_limit, AggregateTimeInitiator, InitAggregationData,
+    STREAM_CARDINALITY_LIMIT,
+};
 pub(crate) use aggregate::{AggregateBuilder, AggregateFns, ComputeAggregation, Measure};
 pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
 use opentelemetry::{otel_warn, KeyValue};
@@ -25,7 +29,7 @@ fn stream_overflow_attributes() -> &'static Vec<KeyValue> {
     STREAM_OVERFLOW_ATTRIBUTES.get_or_init(|| vec![KeyValue::new("otel.metric.overflow", "true")])
 }
 
-pub(crate) trait Aggregator {
+pub(crate) trait Aggregator: Send + Sync + 'static {
     /// A static configuration that is needed in order to initialize aggregator.
     /// E.g. bucket_size at creation time .
     type InitConfig;
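Putting it together: after this change, callers interact only with the `measure`/`collect` pair produced by the builder, while the `Collector<DeltaValueMap<_>>` vs `Collector<CumulativeValueMap<_>>` wiring stays internal. The snippet below mirrors the updated tests; it uses crate-private types, so it only compiles inside the SDK crate and should be read as a sketch, not public API:

```rust
use std::time::SystemTime;

use crate::metrics::{
    data,
    internal::{AggregateBuilder, AggregateFns},
    Temporality,
};

#[test]
fn expo_histogram_round_trip() {
    // The builder now assembles the Collector internally and hands back
    // plain measure/collect functions.
    let AggregateFns { measure, collect } = AggregateBuilder::new(Temporality::Cumulative, None)
        .exponential_bucket_histogram(4, 20, true, true);

    measure.call(2.0, &[]);
    measure.call(4.0, &[]);

    // A reusable destination: `reset_existing` clears it on every cycle,
    // so allocations are amortized across collections.
    let mut dest = Box::new(data::ExponentialHistogram::<f64> {
        data_points: vec![],
        start_time: SystemTime::now(),
        time: SystemTime::now(),
        temporality: Temporality::Cumulative,
    });

    let (count, new_agg) = collect.call(Some(dest.as_mut()));
    assert_eq!(count, 1); // one data point: the empty attribute set
    assert!(new_agg.is_none()); // dest was reused, nothing newly allocated
}
```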