Skip to content

Commit

Permalink
Generic metrics collector
Browse files Browse the repository at this point in the history
  • Loading branch information
fraillt committed Jan 11, 2025
1 parent 6cc327d commit f89f06b
Show file tree
Hide file tree
Showing 5 changed files with 357 additions and 187 deletions.
16 changes: 16 additions & 0 deletions opentelemetry-sdk/src/metrics/data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ pub trait Aggregation: fmt::Debug + any::Any + Send + Sync {
fn as_mut(&mut self) -> &mut dyn any::Any;
}

/// Allow to access data points of an [Aggregation].
pub trait AggregationDataPoints {
/// The type of data point in the aggregation.
type Point;
/// The data points of the aggregation.
fn points(&mut self) -> &mut Vec<Self::Point>;
}

/// DataPoint is a single data point in a time series.
#[derive(Debug, PartialEq)]
pub struct GaugeDataPoint<T> {
Expand Down Expand Up @@ -228,6 +236,14 @@ impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for ExponentialHistogram
}
}

impl<T> AggregationDataPoints for ExponentialHistogram<T> {
type Point = ExponentialHistogramDataPoint<T>;

fn points(&mut self) -> &mut Vec<Self::Point> {
&mut self.data_points
}
}

/// A single exponential histogram data point in a time series.
#[derive(Debug, PartialEq)]
pub struct ExponentialHistogramDataPoint<T> {
Expand Down
56 changes: 44 additions & 12 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,19 @@ use std::{

use opentelemetry::KeyValue;

use crate::metrics::{data::Aggregation, Temporality};
use crate::metrics::{
data::{Aggregation, AggregationDataPoints},
Temporality,
};

use super::{
exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue,
precomputed_sum::PrecomputedSum, sum::Sum, Number,
collector::{Collector, CumulativeValueMap, DeltaValueMap},
exponential_histogram::{ExpoHistogram, ExpoHistogramBucketConfig},
histogram::Histogram,
last_value::LastValue,
precomputed_sum::PrecomputedSum,
sum::Sum,
Number,
};

pub(crate) const STREAM_CARDINALITY_LIMIT: usize = 2000;
Expand Down Expand Up @@ -58,6 +66,7 @@ where
}
}

#[derive(Clone, Copy)]
pub(crate) struct AggregateTime {
pub start: SystemTime,
pub current: SystemTime,
Expand Down Expand Up @@ -121,6 +130,12 @@ impl AttributeSetFilter {
}
}

pub(crate) trait InitAggregationData {
type Aggr: Aggregation + AggregationDataPoints;
fn create_new(&self, time: AggregateTime) -> Self::Aggr;
fn reset_existing(&self, existing: &mut Self::Aggr, time: AggregateTime);
}

/// Builds aggregate functions
pub(crate) struct AggregateBuilder<T> {
/// The temporality used for the returned aggregate functions.
Expand Down Expand Up @@ -182,15 +197,32 @@ impl<T: Number> AggregateBuilder<T> {
record_min_max: bool,
record_sum: bool,
) -> AggregateFns<T> {
ExpoHistogram::new(
self.temporality,
self.filter.clone(),
max_size,
max_scale,
record_min_max,
record_sum,
)
.into()
match self.temporality {
Temporality::Delta => ExpoHistogram {
aggregate_collector: Collector::new(
self.filter.clone(),
DeltaValueMap::new(ExpoHistogramBucketConfig {
max_size: max_size as i32,
max_scale,
}),
),
record_min_max,
record_sum,
}
.into(),
_ => ExpoHistogram {
aggregate_collector: Collector::new(
self.filter.clone(),
CumulativeValueMap::new(ExpoHistogramBucketConfig {
max_size: max_size as i32,
max_scale,
}),
),
record_min_max,
record_sum,
}
.into(),
}
}
}

Expand Down
195 changes: 195 additions & 0 deletions opentelemetry-sdk/src/metrics/internal/collector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
use opentelemetry::KeyValue;

use crate::metrics::{
data::{Aggregation, AggregationDataPoints},
Temporality,
};

use super::{
aggregate::{AggregateTime, AttributeSetFilter},
AggregateTimeInitiator, Aggregator, InitAggregationData, ValueMap,
};

/// Aggregate measurements for attribute sets and collect these aggregates into data points for specific temporality
pub(crate) trait AggregateMap: Send + Sync + 'static {
const TEMPORALITY: Temporality;
type Aggr: Aggregator;

fn measure(&self, value: <Self::Aggr as Aggregator>::PreComputedValue, attributes: &[KeyValue]);

fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, map_fn: MapFn)
where
MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP;
}

/// Higher level abstraction (compared to [`AggregateMap`]) that also does the filtering and collection into aggregation data
pub(crate) trait AggregateCollector: Send + Sync + 'static {
const TEMPORALITY: Temporality;
type Aggr: Aggregator;

fn measure(&self, value: <Self::Aggr as Aggregator>::PreComputedValue, attributes: &[KeyValue]);

fn collect<'a, InitAggregate, F>(
&self,
aggregate: &InitAggregate,
dest: Option<&'a mut dyn Aggregation>,
create_point: F,
) -> (usize, Option<Box<dyn Aggregation>>)
where
InitAggregate: InitAggregationData,
F: FnMut(
Vec<KeyValue>,
&Self::Aggr,
) -> <InitAggregate::Aggr as AggregationDataPoints>::Point;
}

pub(crate) struct Collector<AM> {
filter: AttributeSetFilter,
aggregate_map: AM,
time: AggregateTimeInitiator,
}

impl<AM> Collector<AM>
where
AM: AggregateMap,
{
pub(crate) fn new(filter: AttributeSetFilter, aggregate_map: AM) -> Self {
Self {
filter,
aggregate_map,
time: AggregateTimeInitiator::default(),
}
}

fn init_time(&self) -> AggregateTime {
if let Temporality::Delta = AM::TEMPORALITY {
self.time.delta()
} else {
self.time.cumulative()
}
}
}

impl<AM> AggregateCollector for Collector<AM>
where
AM: AggregateMap,
{
const TEMPORALITY: Temporality = AM::TEMPORALITY;

type Aggr = AM::Aggr;

fn measure(&self, value: <AM::Aggr as Aggregator>::PreComputedValue, attributes: &[KeyValue]) {
self.filter.apply(attributes, |filtered_attrs| {
self.aggregate_map.measure(value, filtered_attrs);
});
}

fn collect<'a, InitAggregate, F>(
&self,
aggregate: &InitAggregate,
dest: Option<&'a mut dyn Aggregation>,
create_point: F,
) -> (usize, Option<Box<dyn Aggregation>>)
where
InitAggregate: InitAggregationData,
F: FnMut(Vec<KeyValue>, &AM::Aggr) -> <InitAggregate::Aggr as AggregationDataPoints>::Point,
{
let time = self.init_time();
let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<InitAggregate::Aggr>());
let mut new_agg = if s_data.is_none() {
Some(aggregate.create_new(time))

Check warning on line 100 in opentelemetry-sdk/src/metrics/internal/collector.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/collector.rs#L100

Added line #L100 was not covered by tests
} else {
None
};
let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
aggregate.reset_existing(s_data, time);
self.aggregate_map
.collect_data_points(s_data.points(), create_point);

(
s_data.points().len(),
new_agg.map(|a| Box::new(a) as Box<_>),
)
}
}

/// At the moment use [`ValueMap`] under the hood (which support both Delta and Cumulative), to implement `AggregateMap` for Delta temporality
/// Later this could be improved to support only Delta temporality
pub(crate) struct DeltaValueMap<A>(ValueMap<A>)
where
A: Aggregator;

impl<A> DeltaValueMap<A>
where
A: Aggregator,
{
pub(crate) fn new(config: A::InitConfig) -> Self {
Self(ValueMap::new(config))
}
}

impl<A> AggregateMap for DeltaValueMap<A>
where
A: Aggregator,
<A as Aggregator>::InitConfig: Send + Sync,
{
const TEMPORALITY: Temporality = Temporality::Delta;

type Aggr = A;

fn measure(
&self,
value: <Self::Aggr as Aggregator>::PreComputedValue,
attributes: &[KeyValue],
) {
self.0.measure(value, attributes);
}

fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, mut map_fn: MapFn)
where
MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP,
{
self.0
.collect_and_reset(dest, |attributes, aggr| map_fn(attributes, &aggr));
}
}

/// At the moment use [`ValueMap`] under the hood (which support both Delta and Cumulative), to implement `AggregateMap` for Cumulative temporality
/// Later this could be improved to support only Cumulative temporality
pub(crate) struct CumulativeValueMap<A>(ValueMap<A>)
where
A: Aggregator;

impl<A> CumulativeValueMap<A>
where
A: Aggregator,
{
pub(crate) fn new(config: A::InitConfig) -> Self {
Self(ValueMap::new(config))
}
}

impl<A> AggregateMap for CumulativeValueMap<A>
where
A: Aggregator,
<A as Aggregator>::InitConfig: Send + Sync,
{
const TEMPORALITY: Temporality = Temporality::Cumulative;

type Aggr = A;

fn measure(
&self,
value: <Self::Aggr as Aggregator>::PreComputedValue,
attributes: &[KeyValue],
) {
self.0.measure(value, attributes);
}

fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, map_fn: MapFn)
where
MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP,
{
self.0.collect_readonly(dest, map_fn);
}
}
Loading

0 comments on commit f89f06b

Please sign in to comment.