diff --git a/Cargo.toml b/Cargo.toml index da686091..88db055d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,9 @@ [workspace] +resolver = "2" members = [ "metrics", "metrics-benchmark", + "metrics-exporter-dogstatsd", "metrics-exporter-prometheus", "metrics-exporter-tcp", "metrics-observer", @@ -24,7 +26,6 @@ crossbeam-utils = { version = "0.8", default-features = false } getopts = { version = "0.2", default-features = false } hashbrown = { version = "0.15", default-features = false, features = ["default-hasher", "raw-entry"] } hdrhistogram = { version = "7.2", default-features = false } -home = { version = "0.5", default-features = false } http-body-util = { version = "0.1", default-features = false } hyper = { version = "1.1", default-features = false, features = ["server", "client"] } hyper-rustls = { version = "0.27", default-features = false, features = ["aws-lc-rs", "http1", "rustls-native-certs"] } diff --git a/clippy.toml b/clippy.toml index be264b66..7d6c4297 100644 --- a/clippy.toml +++ b/clippy.toml @@ -1,2 +1,3 @@ too-many-lines-threshold = 150 ignore-interior-mutability = ["metrics::key::Key"] +doc-valid-idents = ["DogStatsD", ".."] diff --git a/metrics-exporter-dogstatsd/CHANGELOG.md b/metrics-exporter-dogstatsd/CHANGELOG.md new file mode 100644 index 00000000..b43c0324 --- /dev/null +++ b/metrics-exporter-dogstatsd/CHANGELOG.md @@ -0,0 +1,13 @@ +# Changelog +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + + + +## [Unreleased] - ReleaseDate + +### Added + +- Genesis. diff --git a/metrics-exporter-dogstatsd/Cargo.toml b/metrics-exporter-dogstatsd/Cargo.toml new file mode 100644 index 00000000..6c3eadaa --- /dev/null +++ b/metrics-exporter-dogstatsd/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "metrics-exporter-dogstatsd" +version = "0.1.0" +edition = "2021" + +[dependencies] +bytes = { version = "1", default-features = false } +ryu = { version = "1", default-features = false } +itoa = { version = "1", default-features = false } +metrics = { version = "^0.24", path = "../metrics" } +metrics-util = { version = "^0.18", path = "../metrics-util" } +thiserror = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +proptest = { workspace = true } +rand = { workspace = true } +rand_xoshiro = { version = "0.6", default-features = false } +tracing = { workspace = true } +tracing-subscriber = { workspace = true, features = ["fmt"] } diff --git a/metrics-exporter-dogstatsd/examples/dogstatsd_idle_counter.rs b/metrics-exporter-dogstatsd/examples/dogstatsd_idle_counter.rs new file mode 100644 index 00000000..30c16ff5 --- /dev/null +++ b/metrics-exporter-dogstatsd/examples/dogstatsd_idle_counter.rs @@ -0,0 +1,28 @@ +use std::time::{Duration, Instant}; + +use metrics::counter; +use metrics_exporter_dogstatsd::DogStatsDBuilder; + +fn main() { + tracing_subscriber::fmt::init(); + + DogStatsDBuilder::default() + .with_remote_address("localhost:9125") + .expect("failed to parse remote address") + .with_telemetry(false) + .install() + .expect("failed to install DogStatsD recorder"); + + counter!("idle_metric").increment(1); + + // Loop over and over, incrementing our counter every 10 seconds or so. 
+ let mut last_update = Instant::now(); + loop { + if last_update.elapsed() > Duration::from_secs(10) { + counter!("idle_metric").increment(1); + last_update = Instant::now(); + } + + std::thread::sleep(Duration::from_secs(1)); + } +} diff --git a/metrics-exporter-dogstatsd/examples/dogstatsd_synchronous.rs b/metrics-exporter-dogstatsd/examples/dogstatsd_synchronous.rs new file mode 100644 index 00000000..dc95fd63 --- /dev/null +++ b/metrics-exporter-dogstatsd/examples/dogstatsd_synchronous.rs @@ -0,0 +1,36 @@ +use metrics::{counter, gauge, histogram}; +use metrics_exporter_dogstatsd::DogStatsDBuilder; +use rand::{thread_rng, Rng, SeedableRng as _}; +use rand_xoshiro::Xoshiro256StarStar; + +fn main() { + tracing_subscriber::fmt::init(); + + DogStatsDBuilder::default() + .with_remote_address("localhost:9125") + .expect("failed to parse remote address") + .install() + .expect("failed to install DogStatsD recorder"); + + counter!("idle_metric").increment(1); + gauge!("testing").set(42.0); + + let server_loops = counter!("tcp_server_loops", "system" => "foo"); + let server_loops_delta_secs = histogram!("tcp_server_loop_delta_secs", "system" => "foo"); + + let mut rng = Xoshiro256StarStar::from_rng(thread_rng()).unwrap(); + + // Loop over and over, pretending to do some work. + loop { + server_loops.increment(1); + server_loops_delta_secs.record(rng.gen_range(0.0..1.0)); + + let increment_gauge = thread_rng().gen_bool(0.75); + let gauge = gauge!("lucky_iterations"); + if increment_gauge { + gauge.increment(1.0); + } else { + gauge.decrement(1.0); + } + } +} diff --git a/metrics-exporter-dogstatsd/proptest-regressions/forwarder/writer.txt b/metrics-exporter-dogstatsd/proptest-regressions/forwarder/writer.txt new file mode 100644 index 00000000..a5a5878e --- /dev/null +++ b/metrics-exporter-dogstatsd/proptest-regressions/forwarder/writer.txt @@ -0,0 +1,7 @@ +# Seeds for failure cases proptest has generated in the past. It is +# automatically read and these particular cases re-run before any +# novel cases are generated. +# +# It is recommended to check this file in to source control so that +# everyone who runs the test benefits from these saved cases. +cc 996294ee8ec0a637930c95346f0165bbaba2cbf701d321de8d713d7f723f0ff0 # shrinks to payload_limit = 43, inputs = [Histogram(Key { name: KeyName("A0AA0A0A"), labels: [], hashed: true, hash: 18423047812237334005 }, [-0.0, 3.1748022040256364e164], 10000000)] diff --git a/metrics-exporter-dogstatsd/src/builder.rs b/metrics-exporter-dogstatsd/src/builder.rs new file mode 100644 index 00000000..a922df54 --- /dev/null +++ b/metrics-exporter-dogstatsd/src/builder.rs @@ -0,0 +1,410 @@ +use std::{net::SocketAddr, sync::Arc, time::Duration}; + +use thiserror::Error; + +use crate::{ + forwarder::{self, ForwarderConfiguration, RemoteAddr}, + recorder::DogStatsDRecorder, + state::{State, StateConfiguration}, +}; + +// Maximum data length for a UDP datagram. +// +// Realistically, users should basically never send payloads anywhere _near_ this large, but we're only trying to ensure +// we're not about to do anything that we _know_ is technically invalid. +const UDP_DATAGRAM_MAX_PAYLOAD_LEN: usize = (u16::MAX as usize) - 8; + +const DEFAULT_WRITE_TIMEOUT: Duration = Duration::from_secs(1); +const DEFAULT_MAX_PAYLOAD_LEN: usize = 8192; +const DEFAULT_FLUSH_INTERVAL: Duration = Duration::from_secs(3); +const DEFAULT_HISTOGRAM_RESERVOIR_SIZE: usize = 1024; + +/// Errors that could occur while building or installing a DogStatsD recorder/exporter. 
+#[derive(Debug, Error, Eq, PartialEq)]
+pub enum BuildError {
+    /// A generic invalid configuration setting.
+    #[error("invalid configuration: {reason}")]
+    InvalidConfiguration {
+        /// Details about the invalid configuration.
+        reason: String,
+    },
+
+    /// Failed to parse the remote address.
+    #[error("invalid remote address: {reason}")]
+    InvalidRemoteAddress {
+        /// Details about the parsing failure.
+        reason: String,
+    },
+
+    /// Failed to spawn the background thread in synchronous mode.
+    #[error("failed to spawn background thread for exporter in synchronous mode")]
+    Backend,
+
+    /// Failed to install the recorder due to an existing global recorder already being installed.
+    #[error("failed to install exporter as global recorder")]
+    FailedToInstall,
+}
+
+/// Aggregation mode.
+#[derive(Debug)]
+pub enum AggregationMode {
+    /// Counters and gauges are aggregated but are not sent with a timestamp.
+    ///
+    /// This mode still allows for reduced network traffic, but allows for scenarios where multiple instances of the
+    /// metric are sent to the same Datadog Agent instance and aren't otherwise differentiated. This may be the case if
+    /// Origin Detection is disabled in the Datadog Agent.
+    Conservative,
+
+    /// Counters and gauges are aggregated and sent with a timestamp.
+    ///
+    /// This mode allows for the most efficient processing on the Datadog Agent side, as no aggregation is performed and
+    /// metrics are passed through with minimal processing. This mode should only be used when Origin Detection is
+    /// enabled, or when no other instances of the application are sending metrics to the same Datadog Agent instance,
+    /// as this can result in data points being overwritten if the same metric is sent multiple times with the same
+    /// timestamp.
+    Aggressive,
+}
+
+/// Builder for a DogStatsD exporter.
+#[derive(Debug)]
+pub struct DogStatsDBuilder {
+    remote_addr: RemoteAddr,
+    write_timeout: Duration,
+    max_payload_len: usize,
+    flush_interval: Duration,
+    synchronous: bool,
+    agg_mode: AggregationMode,
+    telemetry: bool,
+    histogram_sampling: bool,
+    histogram_reservoir_size: usize,
+    histograms_as_distributions: bool,
+}
+
+impl DogStatsDBuilder {
+    fn validate_max_payload_len(&self) -> Result<(), BuildError> {
+        if let RemoteAddr::Udp(_) = &self.remote_addr {
+            if self.max_payload_len > UDP_DATAGRAM_MAX_PAYLOAD_LEN {
+                return Err(BuildError::InvalidConfiguration {
+                    reason: format!("maximum payload length ({} bytes) exceeds UDP datagram maximum length ({} bytes)", self.max_payload_len, UDP_DATAGRAM_MAX_PAYLOAD_LEN),
+                });
+            }
+        }
+
+        if self.max_payload_len > u32::MAX as usize {
+            return Err(BuildError::InvalidConfiguration {
+                reason: format!(
+                    "maximum payload length ({} bytes) exceeds theoretical upper bound ({} bytes)",
+                    self.max_payload_len,
+                    u32::MAX
+                ),
+            });
+        }
+
+        Ok(())
+    }
+
+    /// Set the remote address to forward metrics to.
+    ///
+    /// For UDP, the address simply needs to be in the format of `<host>:<port>`. For Unix domain sockets, an address
+    /// in the format of `<scheme>://<path>` is used. The scheme can be either `unix` or `unixgram`, for a stream
+    /// (`SOCK_STREAM`) or datagram (`SOCK_DGRAM`) socket, respectively.
+    ///
+    /// Defaults to sending to `127.0.0.1:8125` over UDP.
+    ///
+    /// # Errors
+    ///
+    /// If the given address is not able to be parsed as a valid address, an error will be returned.
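+    ///
+    /// # Examples
+    ///
+    /// A brief sketch of the accepted address forms (the ports and socket path here are illustrative):
+    ///
+    /// ```no_run
+    /// # use metrics_exporter_dogstatsd::DogStatsDBuilder;
+    /// // A bare `<host>:<port>` (or `udp://<host>:<port>`) is treated as a UDP address.
+    /// let udp_builder = DogStatsDBuilder::default()
+    ///     .with_remote_address("127.0.0.1:8125")
+    ///     .expect("failed to parse remote address");
+    ///
+    /// // On Linux, Unix domain sockets use an explicit scheme and path.
+    /// #[cfg(target_os = "linux")]
+    /// let uds_builder = DogStatsDBuilder::default()
+    ///     .with_remote_address("unixgram:///tmp/dogstatsd.sock")
+    ///     .expect("failed to parse remote address");
+    /// ```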
+    pub fn with_remote_address<A>(mut self, addr: A) -> Result<Self, BuildError>
+    where
+        A: AsRef<str>,
+    {
+        self.remote_addr = RemoteAddr::try_from(addr.as_ref())
+            .map_err(|reason| BuildError::InvalidRemoteAddress { reason })?;
+
+        Ok(self)
+    }
+
+    /// Set the write timeout for forwarding metrics.
+    ///
+    /// When the write timeout is reached, the write operation will be aborted and the payload being sent at the time
+    /// will be dropped without retrying.
+    ///
+    /// Defaults to 1 second.
+    #[must_use]
+    pub fn with_write_timeout(mut self, timeout: Duration) -> Self {
+        self.write_timeout = timeout;
+        self
+    }
+
+    /// Set the maximum payload length for forwarding metrics.
+    ///
+    /// This controls the maximum size of a single payload that will be sent to the remote server. As metric payloads
+    /// are being built, they will be limited to this size. If a metric cannot be built without exceeding this size, it
+    /// will be dropped.
+    ///
+    /// This should generally be set to the same value (or lower) as `dogstatsd_buffer_size` in the Datadog Agent.
+    /// Setting a higher value is likely to lead to invalid metric payloads that are discarded by the Datadog Agent when
+    /// received.
+    ///
+    /// Defaults to 8192 bytes.
+    ///
+    /// # Errors
+    ///
+    /// If the maximum payload length is not valid for the underlying transport, an error will be returned.
+    pub fn with_maximum_payload_length(
+        mut self,
+        max_payload_len: usize,
+    ) -> Result<Self, BuildError> {
+        self.max_payload_len = max_payload_len;
+        self.validate_max_payload_len()?;
+
+        Ok(self)
+    }
+
+    /// Use a synchronous backend for forwarding metrics.
+    ///
+    /// A background OS thread will be spawned to handle forwarding metrics to the remote server.
+    ///
+    /// Defaults to `true`.
+    #[must_use]
+    pub fn with_synchronous_backend(mut self) -> Self {
+        self.synchronous = true;
+        self
+    }
+
+    /// Set the aggregation mode for the exporter.
+    ///
+    /// Counters and gauges are always aggregated locally before forwarding to the Datadog Agent, but the aggregation
+    /// mode controls how much information is sent in the metric payloads. Changing the aggregation mode can potentially
+    /// allow for more efficient processing on the Datadog Agent side, but is not suitable for all scenarios.
+    ///
+    /// See [`AggregationMode`] for more details.
+    ///
+    /// Defaults to [`AggregationMode::Conservative`].
+    #[must_use]
+    pub fn with_aggregation_mode(mut self, mode: AggregationMode) -> Self {
+        self.agg_mode = mode;
+        self
+    }
+
+    /// Set the flush interval of the aggregator.
+    ///
+    /// This controls how often metrics are forwarded to the remote server, and in turn controls the efficiency of
+    /// aggregation. A shorter interval will provide more frequent updates to the remote server, but will result in more
+    /// network traffic and processing overhead.
+    ///
+    /// Defaults to 3 seconds.
+    #[must_use]
+    pub fn with_flush_interval(mut self, flush_interval: Duration) -> Self {
+        self.flush_interval = flush_interval;
+        self
+    }
+
+    /// Sets whether or not to enable telemetry for the exporter.
+    ///
+    /// When enabled, additional metrics will be sent to the configured remote server that provide insight into the
+    /// operation of the exporter itself, such as the number of active metrics, how many points were flushed or dropped,
+    /// how many payloads and bytes were sent, and so on.
+    ///
+    /// Defaults to `true`.
+    #[must_use]
+    pub fn with_telemetry(mut self, telemetry: bool) -> Self {
+        self.telemetry = telemetry;
+        self
+    }
+
+    /// Sets whether or not to enable histogram sampling.
+    ///
+    /// When enabled, histograms utilize [reservoir sampling][reservoir] to represent any arbitrarily large number of
+    /// input values using a small, fixed size array. This means that whether the histogram has 1,000 or
+    /// 1,000,000 values recorded to it, the memory consumption will be the same _and_ the resulting values in the
+    /// histogram will be statistically representative of the overall population.
+    ///
+    /// When histogram sampling is enabled, each histogram metric will consume roughly `reservoir_size * 16` bytes. For
+    /// example, when the reservoir size is 1,024, each histogram will consume roughly 16KB of memory. This memory is
+    /// allocated for the life of a histogram and does not grow or shrink while the histogram is active, so care must be
+    /// taken if there are a high number of active histograms at any given time.
+    ///
+    /// If your application frequently has many (100s or more) active histograms, or if your histograms do not receive
+    /// a high number of updates, you likely will not benefit from enabling histogram sampling.
+    ///
+    /// Defaults to `true`.
+    ///
+    /// [reservoir]: https://en.wikipedia.org/wiki/Reservoir_sampling
+    #[must_use]
+    pub fn with_histogram_sampling(mut self, histogram_sampling: bool) -> Self {
+        self.histogram_sampling = histogram_sampling;
+        self
+    }
+
+    /// Sets the reservoir size for histogram sampling.
+    ///
+    /// Defaults to 1,024.
+    #[must_use]
+    pub fn with_histogram_reservoir_size(mut self, reservoir_size: usize) -> Self {
+        self.histogram_reservoir_size = reservoir_size;
+        self
+    }
+
+    /// Sets whether or not to send histograms as distributions.
+    ///
+    /// When enabled, histograms will be sent as distributions to the remote server. This changes the default behavior
+    /// of how the metrics will be processed by the Datadog Agent, as histograms have a specific set of default
+    /// "aggregates" calculated -- `max`, `median`, `avg`, `count`, etc -- locally in the Datadog Agent, whereas
+    /// distributions are aggregated entirely on the Datadog backend, and provide richer support for global aggregation
+    /// and specific percentiles.
+    ///
+    /// Generally speaking, distributions are vastly more powerful and preferred over histograms, but sending as
+    /// histograms may be required to ensure parity with existing applications.
+    ///
+    /// Defaults to `true`.
+    #[must_use]
+    pub fn send_histograms_as_distributions(mut self, histograms_as_distributions: bool) -> Self {
+        self.histograms_as_distributions = histograms_as_distributions;
+        self
+    }
+
+    /// Builds the recorder.
+    ///
+    /// The configured backend will be spawned to forward metrics to the remote server, but the recorder must be
+    /// manually installed by the caller.
+    ///
+    /// # Errors
+    ///
+    /// If the exporter is configured to use an asynchronous backend but is not built in the context of an asynchronous
+    /// runtime, or if the maximum payload length is not valid for the underlying transport, an error will be returned.
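+    ///
+    /// # Examples
+    ///
+    /// A minimal sketch of building and then manually installing the recorder (the error handling here is
+    /// illustrative):
+    ///
+    /// ```no_run
+    /// # use metrics_exporter_dogstatsd::DogStatsDBuilder;
+    /// let recorder = DogStatsDBuilder::default().build().expect("failed to build recorder");
+    ///
+    /// // `build` spawns the forwarder but does not install the recorder, so do that ourselves.
+    /// metrics::set_global_recorder(recorder).expect("failed to install recorder");
+    /// ```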
+    pub fn build(self) -> Result<DogStatsDRecorder, BuildError> {
+        self.validate_max_payload_len()?;
+
+        let state_config = StateConfiguration {
+            agg_mode: self.agg_mode,
+            telemetry: self.telemetry,
+            histogram_sampling: self.histogram_sampling,
+            histogram_reservoir_size: self.histogram_reservoir_size,
+            histograms_as_distributions: self.histograms_as_distributions,
+        };
+        let state = Arc::new(State::new(state_config));
+
+        let recorder = DogStatsDRecorder::new(Arc::clone(&state));
+
+        let forwarder_config = ForwarderConfiguration {
+            remote_addr: self.remote_addr,
+            max_payload_len: self.max_payload_len,
+            flush_interval: self.flush_interval,
+            write_timeout: self.write_timeout,
+        };
+
+        if self.synchronous {
+            let forwarder = forwarder::sync::Forwarder::new(forwarder_config, state);
+
+            std::thread::Builder::new()
+                .name("metrics-exporter-dogstatsd-forwarder".to_string())
+                .spawn(move || forwarder.run())
+                .map_err(|_| BuildError::Backend)?;
+        } else {
+            unreachable!("Asynchronous backend should not be configurable yet.");
+        }
+
+        Ok(recorder)
+    }
+
+    /// Builds and installs the recorder.
+    ///
+    /// The configured backend will be spawned to forward metrics to the remote server, and the recorder will be
+    /// installed as the global recorder.
+    ///
+    /// # Errors
+    ///
+    /// If the exporter is configured to use an asynchronous backend but is not built in the context of an asynchronous
+    /// runtime, or if the maximum payload length is not valid for the underlying transport, or if a global recorder is
+    /// already installed, an error will be returned.
+    pub fn install(self) -> Result<(), BuildError> {
+        let recorder = self.build()?;
+
+        metrics::set_global_recorder(recorder).map_err(|_| BuildError::FailedToInstall)
+    }
+}
+
+impl Default for DogStatsDBuilder {
+    fn default() -> Self {
+        DogStatsDBuilder {
+            remote_addr: RemoteAddr::Udp(vec![SocketAddr::from(([127, 0, 0, 1], 8125))]),
+            write_timeout: DEFAULT_WRITE_TIMEOUT,
+            max_payload_len: DEFAULT_MAX_PAYLOAD_LEN,
+            flush_interval: DEFAULT_FLUSH_INTERVAL,
+            synchronous: true,
+            agg_mode: AggregationMode::Conservative,
+            telemetry: true,
+            histogram_sampling: true,
+            histogram_reservoir_size: DEFAULT_HISTOGRAM_RESERVOIR_SIZE,
+            histograms_as_distributions: true,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn max_payload_len_exceeds_udp_max_len() {
+        let builder =
+            DogStatsDBuilder::default().with_maximum_payload_length(UDP_DATAGRAM_MAX_PAYLOAD_LEN);
+        assert!(builder.is_ok());
+
+        let builder = DogStatsDBuilder::default()
+            .with_maximum_payload_length(UDP_DATAGRAM_MAX_PAYLOAD_LEN + 1);
+        assert_eq!(
+            builder.unwrap_err(),
+            BuildError::InvalidConfiguration {
+                reason: "maximum payload length (65528 bytes) exceeds UDP datagram maximum length (65527 bytes)".to_string()
+            }
+        );
+    }
+
+    #[cfg(target_os = "linux")]
+    mod linux {
+        use super::*;
+
+        #[test]
+        fn max_payload_len_exceeds_udp_max_len_transport_change() {
+            let builder = DogStatsDBuilder::default()
+                .with_remote_address("unix:///tmp/dogstatsd.sock")
+                .unwrap()
+                .with_maximum_payload_length(u32::MAX as usize)
+                .unwrap()
+                .with_remote_address("127.0.0.1:9125")
+                .unwrap();
+
+            match builder.build() {
+                Ok(_) => panic!("expected error"),
+                Err(e) => assert_eq!(e, BuildError::InvalidConfiguration {
+                    reason: "maximum payload length (4294967295 bytes) exceeds UDP datagram maximum length (65527 bytes)".to_string()
+                }),
+            }
+        }
+
+        #[test]
+        fn max_payload_len_exceeds_theoretical_max() {
+            let builder = DogStatsDBuilder::default()
+                .with_remote_address("unix:///tmp/dogstatsd.sock")
+                .unwrap()
+                .with_maximum_payload_length(u32::MAX as usize);
+            assert!(builder.is_ok());
+
+            let builder = DogStatsDBuilder::default()
+                .with_remote_address("unix:///tmp/dogstatsd.sock")
+                .unwrap()
+                .with_maximum_payload_length((u32::MAX as usize) + 1);
+            assert_eq!(
+                builder.unwrap_err(),
+                BuildError::InvalidConfiguration {
+                    reason: "maximum payload length (4294967296 bytes) exceeds theoretical upper bound (4294967295 bytes)".to_string()
+                }
+            );
+        }
+    }
+}
diff --git a/metrics-exporter-dogstatsd/src/forwarder/mod.rs b/metrics-exporter-dogstatsd/src/forwarder/mod.rs
new file mode 100644
index 00000000..4b4cdeed
--- /dev/null
+++ b/metrics-exporter-dogstatsd/src/forwarder/mod.rs
@@ -0,0 +1,130 @@
+#[cfg(target_os = "linux")]
+use std::path::PathBuf;
+use std::{
+    net::{SocketAddr, ToSocketAddrs as _},
+    time::Duration,
+};
+
+pub mod sync;
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub(crate) enum RemoteAddr {
+    Udp(Vec<SocketAddr>),
+
+    #[cfg(target_os = "linux")]
+    Unixgram(PathBuf),
+
+    #[cfg(target_os = "linux")]
+    Unix(PathBuf),
+}
+
+impl RemoteAddr {
+    /// Returns the transport ID for the remote address.
+    ///
+    /// This is a simple acronym related to the transport that will be used for the remote address, such as `udp` for
+    /// UDP, and so on.
+    pub const fn transport_id(&self) -> &'static str {
+        match self {
+            RemoteAddr::Udp(_) => "udp",
+            #[cfg(target_os = "linux")]
+            RemoteAddr::Unix(_) => "uds-stream",
+            #[cfg(target_os = "linux")]
+            RemoteAddr::Unixgram(_) => "uds",
+        }
+    }
+}
+
+impl<'a> TryFrom<&'a str> for RemoteAddr {
+    type Error = String;
+
+    fn try_from(addr: &'a str) -> Result<Self, Self::Error> {
+        // Try treating the address as a fully-qualified URL, where the scheme is the transport identifier.
+        if let Some((scheme, path)) = addr.split_once("://") {
+            return match scheme {
+                #[cfg(target_os = "linux")]
+                "unix" => Ok(RemoteAddr::Unix(PathBuf::from(path))),
+                #[cfg(target_os = "linux")]
+                "unixgram" => Ok(RemoteAddr::Unixgram(PathBuf::from(path))),
+                "udp" => match path.to_socket_addrs() {
+                    Ok(addr) => Ok(RemoteAddr::Udp(addr.collect())),
+                    Err(e) => Err(e.to_string()),
+                },
+                _ => Err(unknown_scheme_error_str(scheme)),
+            };
+        }
+
+        // When there's no scheme present, treat the address as a UDP address.
+        match addr.to_socket_addrs() {
+            Ok(addr) => Ok(RemoteAddr::Udp(addr.collect())),
+            Err(e) => Err(e.to_string()),
+        }
+    }
+}
+
+fn unknown_scheme_error_str(scheme: &str) -> String {
+    format!("invalid scheme '{scheme}' (expected 'udp', 'unix', or 'unixgram')")
+}
+
+/// Forwarder configuration.
+#[derive(Clone)]
+pub(crate) struct ForwarderConfiguration {
+    pub remote_addr: RemoteAddr,
+    pub max_payload_len: usize,
+    pub flush_interval: Duration,
+    pub write_timeout: Duration,
+}
+
+impl ForwarderConfiguration {
+    /// Returns `true` if the remote address requires a length prefix to be sent before each payload.
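+    ///
+    /// Only stream-oriented transports need this: a `SOCK_STREAM` socket carries no message boundaries of its own, so
+    /// the writer frames each payload with a length prefix, while datagram transports (UDP and `SOCK_DGRAM` sockets)
+    /// already preserve message boundaries and need no framing.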
+    pub fn is_length_prefixed(&self) -> bool {
+        match self.remote_addr {
+            RemoteAddr::Udp(_) => false,
+            #[cfg(target_os = "linux")]
+            RemoteAddr::Unix(_) => true,
+            #[cfg(target_os = "linux")]
+            RemoteAddr::Unixgram(_) => false,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::net::SocketAddrV4;
+
+    use super::*;
+
+    #[test]
+    fn remote_addr_basic() {
+        let addr = RemoteAddr::try_from("127.0.0.1:8125").unwrap();
+        let inner_addrs = vec![SocketAddr::V4(SocketAddrV4::new([127, 0, 0, 1].into(), 8125))];
+        assert_eq!(addr, RemoteAddr::Udp(inner_addrs));
+    }
+
+    #[test]
+    fn remote_addr_scheme_udp() {
+        let addr = RemoteAddr::try_from("udp://127.0.0.1:8127").unwrap();
+        let inner_addrs = vec![SocketAddr::V4(SocketAddrV4::new([127, 0, 0, 1].into(), 8127))];
+        assert_eq!(addr, RemoteAddr::Udp(inner_addrs));
+    }
+
+    #[test]
+    fn remote_addr_scheme_unknown() {
+        let addr = RemoteAddr::try_from("spongebob://127.0.0.1:8675");
+        assert_eq!(addr, Err(unknown_scheme_error_str("spongebob")));
+    }
+
+    #[cfg(target_os = "linux")]
+    mod linux {
+        #[test]
+        fn remote_addr_scheme_unix() {
+            let addr = super::RemoteAddr::try_from("unix:///tmp/dogstatsd.sock").unwrap();
+            assert_eq!(addr, super::RemoteAddr::Unix("/tmp/dogstatsd.sock".into()));
+        }
+
+        #[test]
+        fn remote_addr_scheme_unixgram() {
+            let addr = super::RemoteAddr::try_from("unixgram:///tmp/dogstatsd.sock").unwrap();
+            assert_eq!(addr, super::RemoteAddr::Unixgram("/tmp/dogstatsd.sock".into()));
+        }
+    }
+}
diff --git a/metrics-exporter-dogstatsd/src/forwarder/sync.rs b/metrics-exporter-dogstatsd/src/forwarder/sync.rs
new file mode 100644
index 00000000..e81f6d95
--- /dev/null
+++ b/metrics-exporter-dogstatsd/src/forwarder/sync.rs
@@ -0,0 +1,210 @@
+#[cfg(target_os = "linux")]
+use std::os::unix::net::{UnixDatagram, UnixStream};
+use std::{
+    io::{self, Write as _},
+    net::{Ipv4Addr, UdpSocket},
+    sync::Arc,
+    thread::sleep,
+    time::Instant,
+};
+
+use tracing::{debug, error, trace};
+
+use super::{ForwarderConfiguration, RemoteAddr};
+use crate::{
+    state::{FlushState, State},
+    telemetry::{Telemetry, TelemetryUpdate},
+    writer::PayloadWriter,
+};
+
+enum Client {
+    Udp(UdpSocket),
+
+    #[cfg(target_os = "linux")]
+    Unixgram(UnixDatagram),
+
+    #[cfg(target_os = "linux")]
+    Unix(UnixStream),
+}
+
+impl Client {
+    fn from_forwarder_config(config: &ForwarderConfiguration) -> io::Result<Self> {
+        match &config.remote_addr {
+            RemoteAddr::Udp(addrs) => {
+                UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).and_then(|socket| {
+                    socket.connect(&addrs[..])?;
+                    socket.set_write_timeout(Some(config.write_timeout))?;
+                    Ok(Client::Udp(socket))
+                })
+            }
+
+            #[cfg(target_os = "linux")]
+            RemoteAddr::Unixgram(path) => UnixDatagram::unbound().and_then(|socket| {
+                socket.connect(path)?;
+                socket.set_write_timeout(Some(config.write_timeout))?;
+                Ok(Client::Unixgram(socket))
+            }),
+
+            #[cfg(target_os = "linux")]
+            RemoteAddr::Unix(path) => UnixStream::connect(path).and_then(|socket| {
+                socket.set_write_timeout(Some(config.write_timeout))?;
+                Ok(Client::Unix(socket))
+            }),
+        }
+    }
+
+    fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
+        match self {
+            Client::Udp(socket) => socket.send(buf),
+
+            #[cfg(target_os = "linux")]
+            Client::Unixgram(socket) => socket.send(buf),
+
+            #[cfg(target_os = "linux")]
+            Client::Unix(socket) => match socket.write_all(buf) {
+                Ok(()) => Ok(buf.len()),
+                Err(e) => Err(e),
+            },
+        }
+    }
+}
+
+enum ClientState {
+    // Intermediate state during send attempts.
+    Inconsistent,
+
+    // Forwarder is currently disconnected.
+    Disconnected(ForwarderConfiguration),
+
+    // Forwarder is connected and ready to send metrics.
+    Ready(ForwarderConfiguration, Client),
+}
+
+impl ClientState {
+    fn try_send(&mut self, payload: &[u8]) -> io::Result<usize> {
+        loop {
+            let old_state = std::mem::replace(self, ClientState::Inconsistent);
+            match old_state {
+                ClientState::Inconsistent => unreachable!("transitioned _from_ inconsistent state"),
+                ClientState::Disconnected(config) => match Client::from_forwarder_config(&config) {
+                    Ok(client) => *self = ClientState::Ready(config, client),
+                    Err(e) => {
+                        // Restore the disconnected state so the next call can retry the connection,
+                        // rather than leaving ourselves stuck in the inconsistent state.
+                        *self = ClientState::Disconnected(config);
+                        return Err(e);
+                    }
+                },
+                ClientState::Ready(config, mut client) => {
+                    let result = client.send(payload);
+                    if result.is_ok() {
+                        *self = ClientState::Ready(config, client);
+                    } else {
+                        *self = ClientState::Disconnected(config);
+                    }
+
+                    return result;
+                }
+            };
+        }
+    }
+}
+
+pub(crate) struct Forwarder {
+    client_state: ClientState,
+    config: ForwarderConfiguration,
+    state: Arc<State>,
+    telemetry: Option<Telemetry>,
+}
+
+impl Forwarder {
+    /// Create a new synchronous `Forwarder`.
+    pub fn new(config: ForwarderConfiguration, state: Arc<State>) -> Self {
+        Forwarder {
+            client_state: ClientState::Disconnected(config.clone()),
+            config,
+            state,
+            telemetry: None,
+        }
+    }
+
+    fn update_telemetry(&mut self, update: &TelemetryUpdate) {
+        // If we processed any metrics, update our telemetry.
+        //
+        // We do it in this lazily-initialized fashion because we need to register our internal telemetry metrics with
+        // the global recorder _after_ we've been installed, so that the metrics all flow through the same recorder
+        // stack and are affected by any relevant recorder layers, and so on.
+        //
+        // When we have updates, we know that can only have happened if the recorder was installed and metrics were
+        // being processed, so we can safely initialize our telemetry at this point.
+        if self.state.telemetry_enabled() && update.had_updates() {
+            let telemetry = self
+                .telemetry
+                .get_or_insert_with(|| Telemetry::new(self.config.remote_addr.transport_id()));
+            telemetry.apply_update(update);
+        }
+    }
+
+    /// Run the forwarder, sending out payloads to the configured remote address at the configured interval.
+    pub fn run(mut self) {
+        let mut flush_state = FlushState::default();
+        let mut writer =
+            PayloadWriter::new(self.config.max_payload_len, self.config.is_length_prefixed());
+        let mut telemetry_update = TelemetryUpdate::default();
+
+        let mut next_flush = Instant::now() + self.config.flush_interval;
+        loop {
+            // Sleep until our target flush deadline.
+            //
+            // If the previous flush iteration took longer than the flush interval, we won't sleep at all.
+            if let Some(sleep_duration) = next_flush.checked_duration_since(Instant::now()) {
+                sleep(sleep_duration);
+            }
+
+            // Process our flush, building up all of our payloads.
+            //
+            // We'll also calculate our next flush time here, so that we can splay out the payloads over the remaining
+            // time we have before we should be flushing again.
+            next_flush = Instant::now() + self.config.flush_interval;
+
+            telemetry_update.clear();
+            self.state.flush(&mut flush_state, &mut writer, &mut telemetry_update);
+
+            // Send out all of the payloads that we've written, but splay them out over the remaining time until our
+            // next flush, in order to smooth out the network traffic / processing demands on the Datadog Agent.
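+            //
+            // As a rough worked example: with 10 payloads remaining and 2.2s until the next flush, the first
+            // inter-payload sleep below is 2.2s / (10 + 1) = 200ms, and the divisor is recomputed after every send,
+            // so the spacing adapts as the deadline approaches.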
+            let mut payloads = writer.payloads();
+            if u32::try_from(payloads.len()).is_err() {
+                error!(num_payloads = payloads.len(), "Too many payloads to send.");
+                continue;
+            }
+
+            let splay_duration = next_flush.saturating_duration_since(Instant::now());
+            debug!(
+                ?splay_duration,
+                num_payloads = payloads.len(),
+                "Splaying payloads over remaining time until next flush."
+            );
+
+            let mut payloads_sent = 0;
+            let mut payloads_dropped = 0;
+
+            while let Some(payload) = payloads.next_payload() {
+                if let Err(e) = self.client_state.try_send(payload) {
+                    error!(error = %e, "Failed to send payload.");
+                    telemetry_update.track_packet_send_failed(payload.len());
+                    payloads_dropped += 1;
+                } else {
+                    telemetry_update.track_packet_send_succeeded(payload.len());
+                    payloads_sent += 1;
+                }
+
+                // Figure out how long we should sleep based on the remaining time until the next flush and the number
+                // of remaining payloads.
+                let next_flush_delta = next_flush.saturating_duration_since(Instant::now());
+                let remaining_payloads = u32::try_from(payloads.len()).unwrap();
+                let inter_payload_sleep = next_flush_delta / remaining_payloads.saturating_add(1);
+
+                trace!(remaining_payloads, "Sleeping {:?} between payloads.", inter_payload_sleep);
+                sleep(inter_payload_sleep);
+            }
+
+            debug!(payloads_sent, payloads_dropped, "Finished sending payloads.");
+
+            self.update_telemetry(&telemetry_update);
+        }
+    }
+}
diff --git a/metrics-exporter-dogstatsd/src/lib.rs b/metrics-exporter-dogstatsd/src/lib.rs
new file mode 100644
index 00000000..b7845998
--- /dev/null
+++ b/metrics-exporter-dogstatsd/src/lib.rs
@@ -0,0 +1,106 @@
+//! A [`metrics`]-compatible exporter for sending metrics to a [DogStatsD][dsd]-compatible server.
+//!
+//! [dsd]: https://docs.datadoghq.com/developers/dogstatsd/
+//!
+//! # Usage
+//!
+//! Using the exporter is straightforward:
+//!
+//! ```no_run
+//! # use metrics_exporter_dogstatsd::DogStatsDBuilder;
+//! // First, create a builder.
+//! //
+//! // The builder can configure many aspects of the exporter, such as changing the remote address, adjusting how
+//! // histograms will be reported, configuring sampling, and more.
+//! let builder = DogStatsDBuilder::default();
+//!
+//! // At this point, any of the methods on `DogStatsDBuilder` can be called to configure the exporter, such as
+//! // setting a non-default remote address, or configuring how histograms are sampled, and so on.
+//!
+//! // Normally, most users will want to "install" the exporter which sets it as the global recorder for all `metrics`
+//! // calls, and creates the necessary background thread/task to flush the metrics to the remote DogStatsD server.
+//! builder.install().expect("failed to install recorder/exporter");
+//!
+//! // For scenarios where you need access to the `Recorder` object, perhaps to wrap it in a layer stack, or something
+//! // else, you can simply call `build` instead of `install`:
+//! # let builder = DogStatsDBuilder::default();
+//! let recorder = builder.build().expect("failed to build recorder");
+//! ```
+//!
+//! # Features
+//!
+//! ## Client-side aggregation
+//!
+//! The exporter takes advantage of support in the DogStatsD protocol for aggregating metrics on the client side, by
+//! either utilizing multi-value payloads (for histograms, DSD v1.1) or aggregating points and specifying a timestamp
+//! directly (for counters and gauges, DSD v1.3).
+//!
+//! This helps reduce load on the downstream DogStatsD server.
+//!
+//! ## Histogram sampling
+//!
+//! Histograms can be sampled at a configurable rate to limit the maximum per-histogram memory consumption and reduce
+//! load on the downstream DogStatsD server. Reservoir sampling is used to ensure that the samples are statistically
+//! representative of the overall population of values, even when the reservoir size is much smaller than the total
+//! population size: we can hold 1,000 to 2,000 samples and still get a good representation when the number of input
+//! values is in the millions.
+//!
+//! ## Smart reporting
+//!
+//! The exporter will "splay" the reporting of metrics over time, to smooth out the rate of payloads received by the
+//! downstream DogStatsD server. This means that instead of reporting `N` metrics as fast as possible every `M`
+//! seconds, the exporter tries to ensure that over the period of `M` seconds, payloads are sent spaced roughly `M/N`
+//! seconds apart.
+//!
+//! This is also designed to help reduce load on the downstream DogStatsD server to help avoid spiky resource
+//! consumption and potentially dropped metrics.
+//!
+//! ## Full transport support for Unix domain sockets
+//!
+//! The exporter supports sending metrics to a DogStatsD server over all three major allowable transports: UDP, and Unix
+//! domain sockets in either `SOCK_DGRAM` or `SOCK_STREAM` mode.
+//!
+//! `SOCK_STREAM` mode is roughly equivalent to TCP, but only available on the same host, and provides better
+//! guarantees around message delivery in high-throughput scenarios.
+//!
+//! ## Telemetry
+//!
+//! The exporter captures its own internal telemetry around the number of active metrics, points flushed or dropped,
+//! payloads/bytes sent, and so on. This telemetry can be emitted to the same downstream DogStatsD server as the
+//! exporter itself.
+//!
+//! All internal telemetry is under the `datadog.dogstatsd.client` namespace, to align with the internal telemetry
+//! emitted by official DogStatsD clients.
+//!
+//! # Missing
+//!
+//! ## Container ID detection
+//!
+//! We do not yet support container ID detection (DSD v1.2) which is used to help aid the downstream DogStatsD server in
+//! enriching the metrics with additional metadata relevant to the host/application emitting the metrics.
+//!
+//! ## Asynchronous backend
+//!
+//! We do not yet support an asynchronous forwarder backend for flushing metrics.
+
+#![deny(clippy::all)]
+#![deny(clippy::pedantic)]
+#![allow(clippy::cast_possible_truncation)]
+#![allow(clippy::cast_precision_loss)]
+#![allow(clippy::cast_sign_loss)]
+#![allow(clippy::must_use_candidate)]
+#![allow(clippy::module_name_repetitions)]
+#![allow(clippy::struct_excessive_bools)]
+#![deny(missing_docs)]
+#![cfg_attr(docsrs, feature(doc_cfg), deny(rustdoc::broken_intra_doc_links))]

+mod builder;
+pub use self::builder::{AggregationMode, BuildError, DogStatsDBuilder};
+
+mod forwarder;
+mod recorder;
+pub use self::recorder::DogStatsDRecorder;
+
+mod state;
+mod storage;
+mod telemetry;
+mod writer;
diff --git a/metrics-exporter-dogstatsd/src/recorder.rs b/metrics-exporter-dogstatsd/src/recorder.rs
new file mode 100644
index 00000000..79f94f6a
--- /dev/null
+++ b/metrics-exporter-dogstatsd/src/recorder.rs
@@ -0,0 +1,40 @@
+use std::sync::Arc;
+
+use metrics::{Counter, Gauge, Histogram, Key, KeyName, Metadata, Recorder, SharedString, Unit};
+
+use crate::state::State;
+
+/// A recorder that forwards metrics to a DogStatsD server.
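+///
+/// `DogStatsDRecorder` implements [`Recorder`], so beyond being installed globally via
+/// [`DogStatsDBuilder::install`](crate::DogStatsDBuilder::install), it can also be used in a scoped fashion. A
+/// minimal sketch (the metric name here is illustrative):
+///
+/// ```no_run
+/// # use metrics_exporter_dogstatsd::DogStatsDBuilder;
+/// let recorder = DogStatsDBuilder::default().build().expect("failed to build recorder");
+/// metrics::with_local_recorder(&recorder, || {
+///     metrics::counter!("scoped_requests_total").increment(1);
+/// });
+/// ```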
+pub struct DogStatsDRecorder {
+    state: Arc<State>,
+}
+
+impl DogStatsDRecorder {
+    pub(crate) fn new(state: Arc<State>) -> Self {
+        DogStatsDRecorder { state }
+    }
+}
+
+impl Recorder for DogStatsDRecorder {
+    fn describe_counter(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
+    fn describe_gauge(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
+    fn describe_histogram(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
+
+    fn register_counter(&self, key: &Key, _: &Metadata<'_>) -> Counter {
+        self.state
+            .registry()
+            .get_or_create_counter(key, |existing| Counter::from_arc(Arc::clone(existing)))
+    }
+
+    fn register_gauge(&self, key: &Key, _: &Metadata<'_>) -> Gauge {
+        self.state
+            .registry()
+            .get_or_create_gauge(key, |existing| Gauge::from_arc(Arc::clone(existing)))
+    }
+
+    fn register_histogram(&self, key: &Key, _: &Metadata<'_>) -> Histogram {
+        self.state
+            .registry()
+            .get_or_create_histogram(key, |existing| Histogram::from_arc(Arc::clone(existing)))
+    }
+}
diff --git a/metrics-exporter-dogstatsd/src/state.rs b/metrics-exporter-dogstatsd/src/state.rs
new file mode 100644
index 00000000..64aa2422
--- /dev/null
+++ b/metrics-exporter-dogstatsd/src/state.rs
@@ -0,0 +1,203 @@
+use std::{collections::HashSet, time::SystemTime};
+
+use metrics::Key;
+use metrics_util::registry::Registry;
+use tracing::error;
+
+use crate::{
+    builder::AggregationMode, storage::ClientSideAggregatedStorage, telemetry::TelemetryUpdate,
+    writer::PayloadWriter,
+};
+
+/// Exporter state configuration.
+pub struct StateConfiguration {
+    /// Aggregation mode when flushing counters and gauges.
+    ///
+    /// See [`AggregationMode`] for more information.
+    pub agg_mode: AggregationMode,
+
+    /// Whether or not to collect/emit internal telemetry.
+    pub telemetry: bool,
+
+    /// Whether or not to sample histograms.
+    pub histogram_sampling: bool,
+
+    /// Reservoir size when histogram sampling is enabled.
+    pub histogram_reservoir_size: usize,
+
+    /// Whether or not to emit histograms as distributions.
+    pub histograms_as_distributions: bool,
+}
+
+/// Exporter state.
+pub(crate) struct State {
+    config: StateConfiguration,
+    registry: Registry<Key, ClientSideAggregatedStorage>,
+}
+
+impl State {
+    /// Creates a new `State` from the given configuration.
+    pub fn new(config: StateConfiguration) -> Self {
+        State {
+            registry: Registry::new(ClientSideAggregatedStorage::new(
+                config.histogram_sampling,
+                config.histogram_reservoir_size,
+            )),
+            config,
+        }
+    }
+
+    /// Returns a reference to the registry.
+    pub fn registry(&self) -> &Registry<Key, ClientSideAggregatedStorage> {
+        &self.registry
+    }
+
+    /// Returns `true` if telemetry is enabled.
+    pub fn telemetry_enabled(&self) -> bool {
+        self.config.telemetry
+    }
+
+    fn get_aggregation_timestamp(&self) -> Option<u64> {
+        match self.config.agg_mode {
+            AggregationMode::Conservative => None,
+            AggregationMode::Aggressive => {
+                SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).ok().map(|d| d.as_secs())
+            }
+        }
+    }
+
+    /// Flushes all registered metrics to the given payload writer.
+    pub fn flush(
+        &self,
+        flush_state: &mut FlushState,
+        writer: &mut PayloadWriter,
+        telemetry: &mut TelemetryUpdate,
+    ) {
+        // TODO: Delete metrics when they are idle. (This needs support in the handles before we could do this.)
+
+        let counters = self.registry.get_counter_handles();
+        let mut active_counters = 0;
+
+        for (key, counter) in counters {
+            let (value, points_flushed) = counter.flush();
+
+            // If the counter is already idle, and no updates were made since the last time the counter was flushed,
+            // then we've already emitted our zero value and no longer need to emit updates until the counter is active
+            // again.
+            if points_flushed == 0 {
+                if flush_state.is_counter_idle(&key) {
+                    continue;
+                }
+
+                flush_state.mark_counter_as_idle(key.clone());
+            } else {
+                flush_state.clear_counter_idle(&key);
+            }
+
+            active_counters += 1;
+
+            let result = writer.write_counter(&key, value, self.get_aggregation_timestamp());
+            if result.any_failures() {
+                let points_dropped = result.points_dropped();
+                error!(
+                    metric_name = key.name(),
+                    points_dropped, "Failed to build counter payload."
+                );
+
+                telemetry.track_packet_serializer_failed();
+            } else {
+                telemetry.increment_counter_points(points_flushed);
+            }
+        }
+
+        telemetry.increment_counter_contexts(active_counters);
+
+        let gauges = self.registry.get_gauge_handles();
+        telemetry.increment_gauge_contexts(gauges.len());
+
+        for (key, gauge) in gauges {
+            let (value, points_flushed) = gauge.flush();
+            let result = writer.write_gauge(&key, value, self.get_aggregation_timestamp());
+            if result.any_failures() {
+                let points_dropped = result.points_dropped();
+                error!(metric_name = key.name(), points_dropped, "Failed to build gauge payload.");
+
+                telemetry.track_packet_serializer_failed();
+            } else {
+                telemetry.increment_gauge_points(points_flushed);
+            }
+        }
+
+        let histograms = self.registry.get_histogram_handles();
+        let mut active_histograms = 0;
+
+        for (key, histogram) in histograms {
+            if histogram.is_empty() {
+                continue;
+            }
+
+            active_histograms += 1;
+
+            histogram.flush(|maybe_sample_rate, values| {
+                let points_len = values.len();
+                let result = if self.config.histograms_as_distributions {
+                    writer.write_distribution(&key, values, maybe_sample_rate)
+                } else {
+                    writer.write_histogram(&key, values, maybe_sample_rate)
+                };
+
+                // Scale the points flushed/dropped values by the sample rate to determine the true number of points flushed/dropped.
+                let sample_rate = maybe_sample_rate.unwrap_or(1.0);
+                let points_flushed =
+                    ((points_len as u64 - result.points_dropped()) as f64 / sample_rate) as u64;
+                telemetry.increment_histogram_points(points_flushed);
+
+                let points_dropped = (result.points_dropped() as f64 / sample_rate) as u64;
+
+                if result.any_failures() {
+                    if result.payloads_written() > 0 {
+                        error!(
+                            metric_name = key.name(),
+                            points_dropped, "Failed to build some histogram payload(s)."
+                        );
+                    } else {
+                        error!(
+                            metric_name = key.name(),
+                            points_dropped, "Failed to build any histogram payload(s)."
+                        );
+                    }
+
+                    telemetry.track_packet_serializer_failed();
+                }
+            });
+        }
+
+        telemetry.increment_histogram_contexts(active_histograms);
+    }
+}
+
+/// Flush state.
+///
+/// This type contains state information related to flush operations, and is intended to be held by the forwarder and
+/// used during each flush. This allows the flush state information to be updated without needing to wrap calls to
+/// `State` within a lock.
+#[derive(Default)]
+pub struct FlushState {
+    idle_counters: HashSet<Key>,
+}
+
+impl FlushState {
+    /// Marks a counter as idle.
+ fn mark_counter_as_idle(&mut self, key: Key) { + self.idle_counters.insert(key); + } + + fn clear_counter_idle(&mut self, key: &Key) { + self.idle_counters.remove(key); + } + + /// Returns `true` if the counter is idle. + fn is_counter_idle(&self, key: &Key) -> bool { + self.idle_counters.contains(key) + } +} diff --git a/metrics-exporter-dogstatsd/src/storage.rs b/metrics-exporter-dogstatsd/src/storage.rs new file mode 100644 index 00000000..49b40cf1 --- /dev/null +++ b/metrics-exporter-dogstatsd/src/storage.rs @@ -0,0 +1,371 @@ +use std::{ + slice::Iter, + sync::{ + atomic::{ + AtomicBool, AtomicU64, + Ordering::{AcqRel, Acquire, Relaxed, Release}, + }, + Arc, + }, +}; + +use metrics::{CounterFn, GaugeFn, HistogramFn, Key}; +use metrics_util::{ + registry::Storage, + storage::{ + reservoir::{AtomicSamplingReservoir, Drain}, + AtomicBucket, + }, +}; + +pub(crate) struct AtomicCounter { + is_absolute: AtomicBool, + last: AtomicU64, + current: AtomicU64, + updates: AtomicU64, +} + +impl AtomicCounter { + /// Creates a new `AtomicCounter`. + fn new() -> Self { + Self { + is_absolute: AtomicBool::new(false), + last: AtomicU64::new(0), + current: AtomicU64::new(0), + updates: AtomicU64::new(0), + } + } + + /// Flushes the current counter value, returning the delta of the counter value, and the number of updates, since + /// the last flush. + pub fn flush(&self) -> (u64, u64) { + let current = self.current.load(Acquire); + let last = self.last.swap(current, AcqRel); + let delta = current.wrapping_sub(last); + let updates = self.updates.swap(0, AcqRel); + + (delta, updates) + } +} + +impl CounterFn for AtomicCounter { + fn increment(&self, value: u64) { + self.is_absolute.store(false, Release); + self.current.fetch_add(value, Relaxed); + self.updates.fetch_add(1, Relaxed); + } + + fn absolute(&self, value: u64) { + // Ensure the counter is in absolute mode, and if it wasn't already, reset `last` to `value` to give ourselves a + // consistent starting point when flushing. This ensures that we only start flushing deltas once we've gotten + // two consecutive absolute values, since otherwise we might be calculating a delta between a `last` of 0 and a + // very large `current` value. + if !self.is_absolute.swap(true, Release) { + self.last.store(value, Release); + } + + self.current.store(value, Release); + self.updates.fetch_add(1, Relaxed); + } +} + +pub(crate) struct AtomicGauge { + inner: AtomicU64, + updates: AtomicU64, +} + +impl AtomicGauge { + /// Creates a new `AtomicGauge`. + fn new() -> Self { + Self { inner: AtomicU64::new(0.0f64.to_bits()), updates: AtomicU64::new(0) } + } + + /// Flushes the current gauge value and the number of updates since the last flush. 
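+    ///
+    /// Note that the gauge value is stored as the raw bit pattern of an `f64` inside an `AtomicU64`, which is why the
+    /// value is decoded here with `f64::from_bits` before being returned.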
+    pub fn flush(&self) -> (f64, u64) {
+        let current = f64::from_bits(self.inner.load(Acquire));
+        let updates = self.updates.swap(0, AcqRel);
+
+        (current, updates)
+    }
+}
+
+impl GaugeFn for AtomicGauge {
+    fn increment(&self, value: f64) {
+        self.inner
+            .fetch_update(AcqRel, Relaxed, |current| {
+                let new = f64::from_bits(current) + value;
+                Some(f64::to_bits(new))
+            })
+            .expect("should never fail to update gauge");
+        self.updates.fetch_add(1, Relaxed);
+    }
+
+    fn decrement(&self, value: f64) {
+        self.inner
+            .fetch_update(AcqRel, Relaxed, |current| {
+                let new = f64::from_bits(current) - value;
+                Some(f64::to_bits(new))
+            })
+            .expect("should never fail to update gauge");
+        self.updates.fetch_add(1, Relaxed);
+    }
+
+    fn set(&self, value: f64) {
+        self.inner.store(value.to_bits(), Release);
+        self.updates.fetch_add(1, Relaxed);
+    }
+}
+
+pub(crate) enum AtomicHistogram {
+    Raw(AtomicBucket<f64>),
+    Sampled(AtomicSamplingReservoir),
+}
+
+impl AtomicHistogram {
+    /// Creates a new `AtomicHistogram` based on the given sampling configuration.
+    fn new(sampling: bool, reservoir_size: usize) -> Self {
+        if sampling {
+            AtomicHistogram::Sampled(AtomicSamplingReservoir::new(reservoir_size))
+        } else {
+            AtomicHistogram::Raw(AtomicBucket::new())
+        }
+    }
+
+    /// Returns `true` if the histogram is empty.
+    pub fn is_empty(&self) -> bool {
+        match self {
+            AtomicHistogram::Raw(bucket) => bucket.is_empty(),
+            AtomicHistogram::Sampled(reservoir) => reservoir.is_empty(),
+        }
+    }
+
+    /// Records a new value in the histogram.
+    pub fn record(&self, value: f64) {
+        match self {
+            AtomicHistogram::Raw(bucket) => bucket.push(value),
+            AtomicHistogram::Sampled(reservoir) => reservoir.push(value),
+        }
+    }
+
+    /// Flushes the histogram, calling the given closure with the calculated sample rate and an iterator over the
+    /// histogram values.
+    ///
+    /// If the sample rate is `None`, the histogram has not been sampled at all and the iterator will contain all of the
+    /// values since the last flush. Otherwise, the sample rate will be a value between 0.0 and 1.0, indicating the
+    /// extent of sampling which has occurred since the last flush. The values may not _necessarily_ have been sampled,
+    /// in which case the sample rate will be a nominal 1.0 value.
+    ///
+    /// Depending on the underlying histogram implementation, the closure may be called multiple times. Callers are
+    /// responsible for using the sample rate and reported length of the iterator ([`Values<'a>`] implements
+    /// [`ExactSizeIterator`]) to calculate the unsampled length of the histogram.
+    pub fn flush<F>(&self, mut f: F)
+    where
+        F: FnMut(Option<f64>, Values<'_>),
+    {
+        match self {
+            AtomicHistogram::Raw(bucket) => bucket.clear_with(|values| {
+                f(None, Values::Raw(values.iter()));
+            }),
+            AtomicHistogram::Sampled(reservoir) => reservoir.consume(|values| {
+                f(Some(values.sample_rate()), Values::Sampled(values));
+            }),
+        }
+    }
+}
+
+impl HistogramFn for AtomicHistogram {
+    fn record(&self, value: f64) {
+        self.record(value);
+    }
+}
+
+pub(crate) enum Values<'a> {
+    Raw(Iter<'a, f64>),
+    Sampled(Drain<'a>),
+}
+
+impl<'a> Iterator for Values<'a> {
+    type Item = f64;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        match self {
+            Values::Raw(values) => values.next().copied(),
+            Values::Sampled(drain) => drain.next(),
+        }
+    }
+}
+
+impl<'a> ExactSizeIterator for Values<'a> {
+    fn len(&self) -> usize {
+        match self {
+            Values::Raw(values) => values.len(),
+            Values::Sampled(drain) => drain.len(),
+        }
+    }
+}
+
+/// Client-side aggregated metrics storage.
+///
+/// This storage implementation is designed to be used for aggregating metric values on the client side before sending
+/// them to the server. This allows for more efficient transmission of metrics data to the DogStatsD server, as each
+/// individual point does not have to be emitted over the network.
+///
+/// # Behavior
+///
+/// - Counters are aggregated by summing the increments since the last flush.
+/// - Gauges simply maintain their standard "last write wins" behavior and emit the latest value when flushed.
+/// - Histograms have their individual values stored as there is no suitable way to aggregate them.
+///
+/// # Absolute versus incremental updates to counters
+///
+/// As we must support both absolute and incremental updates to counters, we need to be able to differentiate between
+/// the two cases, which we do in the following way: if a counter is updated absolutely, and the _last_ update was not
+/// an absolute value, we reset the counter's state such that the next immediate flush will return a delta of zero.
+///
+/// This means that a counter needs to have two consecutive absolute updates before it will start emitting deltas, as a
+/// stable starting point is required to calculate deltas from. This also means that if a counter has incremental
+/// updates that have not yet been flushed, and an absolute update comes in before the next flush, those updates could
+/// effectively be lost unless the absolute value accounts for them.
+///
+/// This should almost never be a concern in practice, as mixing incremental and absolute values is exceedingly rare.
+pub(crate) struct ClientSideAggregatedStorage {
+    histogram_sampling: bool,
+    histogram_reservoir_size: usize,
+}
+
+impl ClientSideAggregatedStorage {
+    /// Creates a new `ClientSideAggregatedStorage`.
+    pub fn new(histogram_sampling: bool, histogram_reservoir_size: usize) -> Self {
+        Self { histogram_sampling, histogram_reservoir_size }
+    }
+}
+
+impl Storage<Key> for ClientSideAggregatedStorage {
+    type Counter = Arc<AtomicCounter>;
+    type Gauge = Arc<AtomicGauge>;
+    type Histogram = Arc<AtomicHistogram>;
+
+    fn counter(&self, _: &Key) -> Self::Counter {
+        Arc::new(AtomicCounter::new())
+    }
+
+    fn gauge(&self, _: &Key) -> Self::Gauge {
+        Arc::new(AtomicGauge::new())
+    }
+
+    fn histogram(&self, _: &Key) -> Self::Histogram {
+        Arc::new(AtomicHistogram::new(self.histogram_sampling, self.histogram_reservoir_size))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use metrics::{CounterFn as _, GaugeFn as _};
+
+    use super::{AtomicCounter, AtomicGauge};
+
+    #[test]
+    fn atomic_counter_increment() {
+        let counter = AtomicCounter::new();
+        assert_eq!(counter.flush(), (0, 0));
+
+        counter.increment(42);
+        assert_eq!(counter.flush(), (42, 1));
+
+        let large_amount = u64::MAX - u64::from(u16::MAX);
+        counter.increment(large_amount);
+        assert_eq!(counter.flush(), (large_amount, 1));
+    }
+
+    #[test]
+    fn atomic_counter_absolute() {
+        let first_value = 42;
+
+        let second_value_delta = 87;
+        let second_value = first_value + second_value_delta;
+
+        let third_value_delta = 13;
+        let third_value = second_value + third_value_delta;
+
+        let counter = AtomicCounter::new();
+        assert_eq!(counter.flush(), (0, 0));
+
+        counter.absolute(first_value);
+        assert_eq!(counter.flush(), (0, 1));
+
+        counter.absolute(second_value);
+        assert_eq!(counter.flush(), (second_value_delta, 1));
+
+        counter.absolute(third_value);
+        assert_eq!(counter.flush(), (third_value_delta, 1));
+    }
+
+    #[test]
+    fn atomic_counter_absolute_multiple_updates_before_first_flush() {
+        let first_value = 42;
+        let second_value_delta = 66;
+        let second_value = first_value + second_value_delta;
+
+        let counter = AtomicCounter::new();
+        assert_eq!(counter.flush(), (0, 0));
+
+        counter.absolute(first_value);
+        counter.absolute(second_value);
+        assert_eq!(counter.flush(), (second_value_delta, 2));
+    }
+
+    #[test]
+    fn atomic_counter_incremental_to_absolute_reset() {
+        let counter = AtomicCounter::new();
+        assert_eq!(counter.flush(), (0, 0));
+
+        counter.increment(27);
+        counter.absolute(42);
+        assert_eq!(counter.flush(), (0, 2));
+
+        counter.increment(13);
+        assert_eq!(counter.flush(), (13, 1));
+
+        counter.absolute(87);
+        assert_eq!(counter.flush(), (0, 1));
+
+        counter.increment(78);
+        assert_eq!(counter.flush(), (78, 1));
+    }
+
+    #[test]
+    fn atomic_gauge_increment() {
+        let gauge = AtomicGauge::new();
+        assert_eq!(gauge.flush(), (0.0, 0));
+
+        gauge.increment(42.0);
+        assert_eq!(gauge.flush(), (42.0, 1));
+
+        gauge.increment(13.0);
+        assert_eq!(gauge.flush(), (55.0, 1));
+    }
+
+    #[test]
+    fn atomic_gauge_decrement() {
+        let gauge = AtomicGauge::new();
+        assert_eq!(gauge.flush(), (0.0, 0));
+
+        gauge.decrement(42.0);
+        assert_eq!(gauge.flush(), (-42.0, 1));
+
+        gauge.decrement(13.0);
+        assert_eq!(gauge.flush(), (-55.0, 1));
+    }
+
+    #[test]
+    fn atomic_gauge_set() {
+        let gauge = AtomicGauge::new();
+        assert_eq!(gauge.flush(), (0.0, 0));
+
+        gauge.set(42.0);
+        assert_eq!(gauge.flush(), (42.0, 1));
+
+        gauge.set(-13.0);
+        assert_eq!(gauge.flush(), (-13.0, 1));
+    }
+}
diff --git a/metrics-exporter-dogstatsd/src/telemetry.rs b/metrics-exporter-dogstatsd/src/telemetry.rs
new file mode 100644
index 00000000..140b41bb
--- /dev/null
+++ b/metrics-exporter-dogstatsd/src/telemetry.rs
@@ -0,0 +1,211 @@
+use metrics::{counter, Counter};
+
+/// Exporter telemetry.
+/// +/// `Telemetry` collects information about the exporter's behavior and can be optionally enabled to send this +/// information as normal metrics to the same DogStatsD server. +pub struct Telemetry { + metric_points: Counter, + counter_points: Counter, + gauge_points: Counter, + histogram_points: Counter, + packets_sent: Counter, + packets_dropped: Counter, + packets_dropped_writer: Counter, + packets_dropped_serializer: Counter, + bytes_dropped: Counter, + bytes_sent: Counter, + bytes_dropped_writer: Counter, + agg_contexts: Counter, + agg_contexts_counter: Counter, + agg_contexts_gauge: Counter, + agg_contexts_histogram: Counter, +} + +impl Telemetry { + /// Creates a `Telemetry` instance. + pub fn new(transport: &'static str) -> Self { + let base_labels = telemetry_tags!("client_transport" => transport); + let counter_labels = + telemetry_tags!("client_transport" => transport, "metrics_type" => "count"); + let gauge_labels = + telemetry_tags!("client_transport" => transport, "metrics_type" => "gauge"); + let histogram_labels = + telemetry_tags!("client_transport" => transport, "metrics_type" => "histogram"); + + Self { + metric_points: counter!("datadog.dogstatsd.client.metrics", base_labels.iter()), + counter_points: counter!( + "datadog.dogstatsd.client.metrics_by_type", + counter_labels.iter() + ), + gauge_points: counter!("datadog.dogstatsd.client.metrics_by_type", gauge_labels.iter()), + histogram_points: counter!( + "datadog.dogstatsd.client.metrics_by_type", + histogram_labels.iter() + ), + packets_sent: counter!("datadog.dogstatsd.client.packets_sent", base_labels.iter()), + packets_dropped: counter!( + "datadog.dogstatsd.client.packets_dropped", + base_labels.iter() + ), + packets_dropped_writer: counter!( + "datadog.dogstatsd.client.packets_dropped_writer", + base_labels.iter() + ), + packets_dropped_serializer: counter!( + "datadog.dogstatsd.client.packets_dropped_serializer", + base_labels.iter() + ), + bytes_dropped: counter!("datadog.dogstatsd.client.bytes_dropped", base_labels.iter()), + bytes_sent: counter!("datadog.dogstatsd.client.bytes_sent", base_labels.iter()), + bytes_dropped_writer: counter!( + "datadog.dogstatsd.client.bytes_dropped_writer", + base_labels.iter() + ), + agg_contexts: counter!( + "datadog.dogstatsd.client.aggregated_context", + base_labels.iter() + ), + agg_contexts_counter: counter!( + "datadog.dogstatsd.client.aggregated_context_by_type", + counter_labels.iter() + ), + agg_contexts_gauge: counter!( + "datadog.dogstatsd.client.aggregated_context_by_type", + gauge_labels.iter() + ), + agg_contexts_histogram: counter!( + "datadog.dogstatsd.client.aggregated_context_by_type", + histogram_labels.iter() + ), + } + } + + /// Applies the given telemetry update, updating the internal metrics. 
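+    ///
+    /// This is called by the forwarder after each flush cycle, so each underlying counter is incremented by the
+    /// deltas accumulated in the update buffer rather than being set to an absolute value.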
+ pub fn apply_update(&mut self, update: &TelemetryUpdate) { + let metric_points = update.counter_points + update.gauge_points + update.histogram_points; + let agg_contexts = + update.counter_contexts + update.gauge_contexts + update.histogram_contexts; + + self.metric_points.increment(metric_points); + self.counter_points.increment(update.counter_points); + self.gauge_points.increment(update.gauge_points); + self.histogram_points.increment(update.histogram_points); + self.packets_sent.increment(update.packets_sent); + self.packets_dropped.increment(update.packets_dropped); + self.packets_dropped_writer.increment(update.packets_dropped_writer); + self.packets_dropped_serializer.increment(update.packets_dropped_serializer); + self.bytes_dropped.increment(update.bytes_dropped); + self.bytes_sent.increment(update.bytes_sent); + self.bytes_dropped_writer.increment(update.bytes_dropped_writer); + self.agg_contexts.increment(agg_contexts); + self.agg_contexts_counter.increment(update.counter_contexts); + self.agg_contexts_gauge.increment(update.gauge_contexts); + self.agg_contexts_histogram.increment(update.histogram_contexts); + } +} + +/// A buffer for collecting telemetry updates. +#[derive(Default)] +pub struct TelemetryUpdate { + counter_contexts: u64, + gauge_contexts: u64, + histogram_contexts: u64, + counter_points: u64, + gauge_points: u64, + histogram_points: u64, + packets_sent: u64, + packets_dropped: u64, + packets_dropped_writer: u64, + packets_dropped_serializer: u64, + bytes_sent: u64, + bytes_dropped: u64, + bytes_dropped_writer: u64, +} + +impl TelemetryUpdate { + /// Clears the update buffer, resetting it back to an empty state. + pub fn clear(&mut self) { + self.counter_contexts = 0; + self.gauge_contexts = 0; + self.histogram_contexts = 0; + self.counter_points = 0; + self.gauge_points = 0; + self.histogram_points = 0; + self.packets_sent = 0; + self.packets_dropped = 0; + self.packets_dropped_writer = 0; + self.packets_dropped_serializer = 0; + self.bytes_sent = 0; + self.bytes_dropped = 0; + self.bytes_dropped_writer = 0; + } + + /// Returns `true` if any updates have been recorded. + pub fn had_updates(&self) -> bool { + self.counter_points > 0 || self.gauge_points > 0 || self.histogram_points > 0 + } + + /// Increments the number of counter contexts collected. + pub fn increment_counter_contexts(&mut self, value: usize) { + self.counter_contexts += value as u64; + } + + /// Increments the number of gauge contexts collected. + pub fn increment_gauge_contexts(&mut self, value: usize) { + self.gauge_contexts += value as u64; + } + + /// Increments the number of histogram contexts collected. + pub fn increment_histogram_contexts(&mut self, value: usize) { + self.histogram_contexts += value as u64; + } + + /// Increments the number of counter points collected. + pub fn increment_counter_points(&mut self, value: u64) { + self.counter_points += value; + } + + /// Increments the number of gauge points collected. + pub fn increment_gauge_points(&mut self, value: u64) { + self.gauge_points += value; + } + + /// Increments the number of histogram points collected. + pub fn increment_histogram_points(&mut self, value: u64) { + self.histogram_points += value; + } + + /// Tracks a successful packet send. + pub fn track_packet_send_succeeded(&mut self, bytes_len: usize) { + self.packets_sent += 1; + self.bytes_sent += bytes_len as u64; + } + + /// Tracks a failed packet send. 
+    pub fn track_packet_send_failed(&mut self, bytes_len: usize) {
+        self.packets_dropped += 1;
+        self.packets_dropped_writer += 1;
+        self.bytes_dropped += bytes_len as u64;
+        self.bytes_dropped_writer += bytes_len as u64;
+    }
+
+    /// Tracks a failed packet serialization.
+    pub fn track_packet_serializer_failed(&mut self) {
+        self.packets_dropped += 1;
+        self.packets_dropped_serializer += 1;
+    }
+}
+
+macro_rules! _telemetry_tags {
+    ($($k:literal => $v:expr),*) => {
+        [
+            ::metrics::Label::from_static_parts("client", "rust"),
+            ::metrics::Label::from_static_parts("client_version", env!("CARGO_PKG_VERSION")),
+            $(::metrics::Label::from_static_parts($k, $v),)*
+        ]
+    };
+}
+
+pub(crate) use _telemetry_tags as telemetry_tags;
diff --git a/metrics-exporter-dogstatsd/src/writer.rs b/metrics-exporter-dogstatsd/src/writer.rs
new file mode 100644
index 00000000..74733d95
--- /dev/null
+++ b/metrics-exporter-dogstatsd/src/writer.rs
@@ -0,0 +1,682 @@
+use metrics::Key;
+
+pub struct WriteResult {
+    payloads_written: u64,
+    points_dropped: u64,
+}
+
+impl WriteResult {
+    const fn success(payloads_written: u64) -> Self {
+        Self { payloads_written, points_dropped: 0 }
+    }
+
+    const fn failure(points_dropped: u64) -> Self {
+        Self { payloads_written: 0, points_dropped }
+    }
+
+    const fn new() -> Self {
+        Self { payloads_written: 0, points_dropped: 0 }
+    }
+
+    fn increment_payloads_written(&mut self) {
+        self.payloads_written += 1;
+    }
+
+    fn increment_points_dropped(&mut self) {
+        self.points_dropped += 1;
+    }
+
+    pub const fn any_failures(&self) -> bool {
+        self.points_dropped != 0
+    }
+
+    pub const fn payloads_written(&self) -> u64 {
+        self.payloads_written
+    }
+
+    pub const fn points_dropped(&self) -> u64 {
+        self.points_dropped
+    }
+}
+
+pub(super) struct PayloadWriter {
+    max_payload_len: usize,
+    buf: Vec<u8>,
+    trailer_buf: Vec<u8>,
+    offsets: Vec<usize>,
+    with_length_prefix: bool,
+}
+
+impl PayloadWriter {
+    /// Creates a new `PayloadWriter` with the given maximum payload length.
+    pub fn new(max_payload_len: usize, with_length_prefix: bool) -> Self {
+        // NOTE: This should also be handled in the builder, but we want to just double check here that we're getting a
+        // properly sanitized value.
+        assert!(
+            u32::try_from(max_payload_len).is_ok(),
+            "maximum payload length must be less than 2^32 bytes"
+        );
+
+        let mut writer = Self {
+            max_payload_len,
+            buf: Vec::new(),
+            trailer_buf: Vec::new(),
+            offsets: Vec::new(),
+            with_length_prefix,
+        };
+
+        writer.prepare_for_write();
+        writer
+    }
+
+    fn last_offset(&self) -> usize {
+        self.offsets.last().copied().unwrap_or(0)
+    }
+
+    fn current_len(&self) -> usize {
+        // Figure out the last metric's offset, which we use to calculate the current uncommitted length.
+        //
+        // If there aren't any committed metrics, then the last offset is simply zero.
+        let last_offset = self.last_offset();
+        let maybe_length_prefix_len = if self.with_length_prefix { 4 } else { 0 };
+        self.buf.len() - last_offset - maybe_length_prefix_len
+    }
+
+    fn prepare_for_write(&mut self) {
+        if self.with_length_prefix {
+            // If we're adding length prefixes, we need to write the length of the payload first.
+            //
+            // We write a dummy length of zero for now, and then we'll go back and fill it in later.
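+            //
+            // As an illustrative example: once committed, a 26-byte payload is laid out as
+            // `[26, 0, 0, 0]` followed by the 26 payload bytes, since `commit` backfills the
+            // length as a little-endian `u32`.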
+            self.buf.extend_from_slice(&[0, 0, 0, 0]);
+        }
+    }
+
+    fn commit(&mut self) -> bool {
+        let current_last_offset = self.last_offset();
+        let current_len = self.current_len();
+        if current_len > self.max_payload_len {
+            // If the current metric is too long, we need to truncate everything we just wrote to get us back to the end
+            // of the last metric, since the previous parts of the buffer are still valid and could be flushed.
+            self.buf.truncate(self.last_offset());
+
+            return false;
+        }
+
+        // Track the new offset.
+        self.offsets.push(self.buf.len());
+
+        // If we're dealing with length-delimited payloads, go back to the beginning of this payload and fill in the
+        // length of it.
+        if self.with_length_prefix {
+            // NOTE: We unwrap the conversion here because we know that `self.max_payload_len` is less than 2^32, and we
+            // check above that `current_len` is less than or equal to `self.max_payload_len`.
+            let current_len_buf = u32::try_from(current_len).unwrap().to_le_bytes();
+            self.buf[current_last_offset..current_last_offset + 4]
+                .copy_from_slice(&current_len_buf[..]);
+        }
+
+        // Initialize the buffer for the next payload.
+        self.prepare_for_write();
+
+        true
+    }
+
+    fn write_trailing(&mut self, key: &Key, timestamp: Option<u64>) {
+        write_metric_trailer(key, timestamp, &mut self.buf, None);
+    }
+
+    /// Writes a counter payload.
+    pub fn write_counter(&mut self, key: &Key, value: u64, timestamp: Option<u64>) -> WriteResult {
+        let mut int_writer = itoa::Buffer::new();
+        let value_str = int_writer.format(value);
+
+        self.buf.extend_from_slice(key.name().as_bytes());
+        self.buf.push(b':');
+        self.buf.extend_from_slice(value_str.as_bytes());
+        self.buf.extend_from_slice(b"|c");
+
+        self.write_trailing(key, timestamp);
+
+        if self.commit() {
+            WriteResult::success(1)
+        } else {
+            WriteResult::failure(1)
+        }
+    }
+
+    /// Writes a gauge payload.
+    pub fn write_gauge(&mut self, key: &Key, value: f64, timestamp: Option<u64>) -> WriteResult {
+        let mut float_writer = ryu::Buffer::new();
+        let value_str = float_writer.format(value);
+
+        self.buf.extend_from_slice(key.name().as_bytes());
+        self.buf.push(b':');
+        self.buf.extend_from_slice(value_str.as_bytes());
+        self.buf.extend_from_slice(b"|g");
+
+        self.write_trailing(key, timestamp);
+
+        if self.commit() {
+            WriteResult::success(1)
+        } else {
+            WriteResult::failure(1)
+        }
+    }
+
+    /// Writes a histogram payload.
+    pub fn write_histogram<I>(
+        &mut self,
+        key: &Key,
+        values: I,
+        maybe_sample_rate: Option<f64>,
+    ) -> WriteResult
+    where
+        I: IntoIterator<Item = f64>,
+        I::IntoIter: ExactSizeIterator,
+    {
+        self.write_hist_dist_inner(key, values, b'h', maybe_sample_rate)
+    }
+
+    /// Writes a distribution payload.
+    pub fn write_distribution<I>(
+        &mut self,
+        key: &Key,
+        values: I,
+        maybe_sample_rate: Option<f64>,
+    ) -> WriteResult
+    where
+        I: IntoIterator<Item = f64>,
+        I::IntoIter: ExactSizeIterator,
+    {
+        self.write_hist_dist_inner(key, values, b'd', maybe_sample_rate)
+    }
+
+    fn write_hist_dist_inner<I>(
+        &mut self,
+        key: &Key,
+        values: I,
+        metric_type: u8,
+        maybe_sample_rate: Option<f64>,
+    ) -> WriteResult
+    where
+        I: IntoIterator<Item = f64>,
+        I::IntoIter: ExactSizeIterator,
+    {
+        let mut float_writer = ryu::Buffer::new();
+        let mut result = WriteResult::new();
+        let values = values.into_iter();
+
+        // Pre-render our metric trailer, which includes the timestamp and tags.
+        //
+        // We do this for efficiency reasons, but also to calculate the minimum payload length.
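+        //
+        // As an example of the trailer format: for a key tagged `foo:bar` with a sample rate of
+        // 0.5 and no timestamp, `write_metric_trailer` renders `|@0.5|#foo:bar\n`.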
+        self.trailer_buf.clear();
+        write_metric_trailer(key, None, &mut self.trailer_buf, maybe_sample_rate);
+
+        // Calculate the minimum payload length, which is the key name, the metric trailer, and the metric type
+        // substring (`|<metric_type>`). This is the minimum amount of space we need to write out the metric without
+        // including the value itself.
+        //
+        // If the minimum payload length exceeds the maximum payload length, we can't write the metric at all, so we
+        // return an error.
+        let minimum_payload_len = key.name().len() + self.trailer_buf.len() + 2;
+        if minimum_payload_len + 2 > self.max_payload_len {
+            // The extra two we add above simulates the smallest possible value string, which would be `:0`.
+            return WriteResult::failure(values.len() as u64);
+        }
+
+        // Iterate over each value, writing it out to the buffer in a streaming fashion.
+        //
+        // We track a shadow "current length" because we want to make sure we don't write an additional value if it
+        // would cause the payload to exceed the maximum payload length... but since we have to write values in the
+        // middle of the payload, rather than just at the end... we can't use `current_len()` since we haven't yet
+        // written tags, the timestamp, etc.
+        let mut needs_name = true;
+        let mut current_len = minimum_payload_len;
+        for value in values {
+            let value_str = float_writer.format(value);
+
+            // Skip the value if it's not even possible to fit it by itself.
+            if minimum_payload_len + value_str.len() + 1 > self.max_payload_len {
+                result.increment_points_dropped();
+                continue;
+            }
+
+            // Figure out if we can write the value to the current metric payload.
+            //
+            // If we can't fit it into the current buffer, then we have to first commit our current buffer.
+            if current_len + value_str.len() + 1 > self.max_payload_len {
+                // Write the metric type and then the trailer.
+                self.buf.push(b'|');
+                self.buf.push(metric_type);
+                self.buf.extend_from_slice(&self.trailer_buf);
+
+                assert!(self.commit(), "should not fail to commit histogram metric at this stage");
+
+                result.increment_payloads_written();
+
+                // Reset the current length to the minimum payload length, since we're starting a new metric.
+                needs_name = true;
+                current_len = minimum_payload_len;
+            }
+
+            // Write the metric name if it hasn't been written yet.
+            if needs_name {
+                self.buf.extend_from_slice(key.name().as_bytes());
+                needs_name = false;
+            }
+
+            // Write the value.
+            self.buf.push(b':');
+            self.buf.extend_from_slice(value_str.as_bytes());
+
+            // Track the length of the value we just wrote.
+            current_len += value_str.len() + 1;
+        }
+
+        // If we have any remaining uncommitted values, finalize them and commit.
+        if self.current_len() != 0 {
+            self.buf.push(b'|');
+            self.buf.push(metric_type);
+            self.buf.extend_from_slice(&self.trailer_buf);
+
+            assert!(self.commit(), "should not fail to commit histogram metric at this stage");
+
+            result.increment_payloads_written();
+        }
+
+        result
+    }
+
+    /// Returns a consuming iterator over all payloads written by this writer.
+    ///
+    /// The iterator will yield payloads in the order they were written, and the payloads will be cleared from the
+    /// writer when the iterator is dropped.
+    pub fn payloads(&mut self) -> Payloads<'_> {
+        Payloads { buf: &mut self.buf, start: 0, offsets: self.offsets.drain(..) }
+    }
+}
+
+/// Iterator over all payloads written by a `PayloadWriter`.
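+///
+/// A typical consumption loop might look like this (the `socket` here is illustrative):
+///
+/// ```ignore
+/// let mut payloads = writer.payloads();
+/// while let Some(payload) = payloads.next_payload() {
+///     socket.send(payload)?;
+/// }
+/// // Dropping `payloads` clears the writer's internal buffer.
+/// ```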
+pub struct Payloads<'a> {
+    buf: &'a mut Vec<u8>,
+    start: usize,
+    offsets: std::vec::Drain<'a, usize>,
+}
+
+impl<'a> Payloads<'a> {
+    /// Returns the number of remaining payloads.
+    pub fn len(&self) -> usize {
+        self.offsets.len()
+    }
+
+    /// Returns the next payload.
+    ///
+    /// If there are no more payloads, `None` is returned.
+    pub fn next_payload(&mut self) -> Option<&[u8]> {
+        let offset = self.offsets.next()?;
+
+        let offset_buf = &self.buf[self.start..offset];
+        self.start = offset;
+
+        Some(offset_buf)
+    }
+}
+
+impl<'a> Drop for Payloads<'a> {
+    fn drop(&mut self) {
+        self.buf.clear();
+    }
+}
+
+fn write_metric_trailer(
+    key: &Key,
+    maybe_timestamp: Option<u64>,
+    buf: &mut Vec<u8>,
+    maybe_sample_rate: Option<f64>,
+) {
+    // Write the sample rate if it's not 1.0, as that is the implied default.
+    if let Some(sample_rate) = maybe_sample_rate {
+        let mut float_writer = ryu::Buffer::new();
+        let sample_rate_str = float_writer.format(sample_rate);
+
+        buf.extend_from_slice(b"|@");
+        buf.extend_from_slice(sample_rate_str.as_bytes());
+    }
+
+    // Write the metric tags first.
+    let tags = key.labels();
+    let mut wrote_tag = false;
+    for tag in tags {
+        // If we haven't written a tag yet, write out the tags prefix first.
+        //
+        // Otherwise, write a tag separator.
+        if wrote_tag {
+            buf.push(b',');
+        } else {
+            buf.extend_from_slice(b"|#");
+            wrote_tag = true;
+        }
+
+        // Write the tag.
+        //
+        // If the tag value is empty, we treat it as a bare tag, which means we only write something like `tag_name`
+        // instead of `tag_name:`.
+        buf.extend_from_slice(tag.key().as_bytes());
+        if tag.value().is_empty() {
+            continue;
+        }
+
+        buf.push(b':');
+        buf.extend_from_slice(tag.value().as_bytes());
+    }
+
+    // Write the timestamp if present.
+    if let Some(timestamp) = maybe_timestamp {
+        let mut int_writer = itoa::Buffer::new();
+        let ts_str = int_writer.format(timestamp);
+
+        buf.extend_from_slice(b"|T");
+        buf.extend_from_slice(ts_str.as_bytes());
+    }
+
+    // Finally, add the newline.
+    buf.push(b'\n');
+}
+
+#[cfg(test)]
+mod tests {
+    use metrics::{Key, Label};
+    use proptest::{
+        collection::vec as arb_vec,
+        prelude::{any, Strategy},
+        prop_oneof, proptest,
+    };
+
+    use super::PayloadWriter;
+
+    #[derive(Debug)]
+    enum InputMetric {
+        Counter(Key, u64, Option<u64>),
+        Gauge(Key, f64, Option<u64>),
+        Histogram(Key, Vec<f64>),
+    }
+
+    fn arb_label() -> impl Strategy<Value = Label> {
+        let key_regex = "[a-z]{4,12}";
+        let value_regex = "[a-z0-9]{8,16}";
+
+        let bare_tag = key_regex.prop_map(|k| Label::new(k, ""));
+        let kv_tag = (key_regex, value_regex).prop_map(|(k, v)| Label::new(k, v));
+
+        prop_oneof![bare_tag, kv_tag,]
+    }
+
+    fn arb_key() -> impl Strategy<Value = Key> {
+        let name_regex = "[a-zA-Z0-9]{8,32}";
+        (name_regex, arb_vec(arb_label(), 0..4))
+            .prop_map(|(name, labels)| Key::from_parts(name, labels))
+    }
+
+    fn arb_metric() -> impl Strategy<Value = InputMetric> {
+        let counter = (arb_key(), any::<u64>(), any::<Option<u64>>())
+            .prop_map(|(k, v, ts)| InputMetric::Counter(k, v, ts));
+        let gauge = (arb_key(), any::<f64>(), any::<Option<u64>>())
+            .prop_map(|(k, v, ts)| InputMetric::Gauge(k, v, ts));
+        let histogram = (arb_key(), arb_vec(any::<f64>(), 1..64))
+            .prop_map(|(k, v)| InputMetric::Histogram(k, v));
+
+        prop_oneof![counter, gauge, histogram,]
+    }
+
+    fn string_from_writer(writer: &mut PayloadWriter) -> String {
+        let buf = buf_from_writer(writer);
+
+        // SAFETY: It's a test.
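+        // (The writer only ever appends UTF-8 fragments -- key names, tag strings, and ASCII
+        // separators -- so the buffer is valid UTF-8 in practice.)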
+        unsafe { String::from_utf8_unchecked(buf) }
+    }
+
+    fn buf_from_writer(writer: &mut PayloadWriter) -> Vec<u8> {
+        let mut payloads = writer.payloads();
+        let mut buf = Vec::new();
+        while let Some(payload) = payloads.next_payload() {
+            buf.extend_from_slice(payload);
+        }
+
+        buf
+    }
+
+    #[test]
+    fn counter() {
+        // Cases are defined as: metric key, metric value, metric timestamp, expected output.
+        let cases = [
+            (Key::from("test_counter"), 91919, None, "test_counter:91919|c\n"),
+            (Key::from("test_counter"), 666, Some(345678), "test_counter:666|c|T345678\n"),
+            (
+                Key::from_parts("test_counter", &[("bug", "boop")]),
+                12345,
+                None,
+                "test_counter:12345|c|#bug:boop\n",
+            ),
+            (
+                Key::from_parts("test_counter", &[("foo", "bar"), ("baz", "quux")]),
+                777,
+                Some(234567),
+                "test_counter:777|c|#foo:bar,baz:quux|T234567\n",
+            ),
+        ];
+
+        for (key, value, ts, expected) in cases {
+            let mut writer = PayloadWriter::new(8192, false);
+            let result = writer.write_counter(&key, value, ts);
+            assert_eq!(result.payloads_written(), 1);
+
+            let actual = string_from_writer(&mut writer);
+            assert_eq!(actual, expected);
+        }
+    }
+
+    #[test]
+    fn gauge() {
+        // Cases are defined as: metric key, metric value, metric timestamp, expected output.
+        let cases = [
+            (Key::from("test_gauge"), 42.0, None, "test_gauge:42.0|g\n"),
+            (Key::from("test_gauge"), 1967.0, Some(345678), "test_gauge:1967.0|g|T345678\n"),
+            (
+                Key::from_parts("test_gauge", &[("foo", "bar"), ("baz", "quux")]),
+                3.13232,
+                None,
+                "test_gauge:3.13232|g|#foo:bar,baz:quux\n",
+            ),
+            (
+                Key::from_parts("test_gauge", &[("foo", "bar"), ("baz", "quux")]),
+                3.13232,
+                Some(234567),
+                "test_gauge:3.13232|g|#foo:bar,baz:quux|T234567\n",
+            ),
+        ];
+
+        for (key, value, ts, expected) in cases {
+            let mut writer = PayloadWriter::new(8192, false);
+            let result = writer.write_gauge(&key, value, ts);
+            assert_eq!(result.payloads_written(), 1);
+
+            let actual = string_from_writer(&mut writer);
+            assert_eq!(actual, expected);
+        }
+    }
+
+    #[test]
+    fn histogram() {
+        // Cases are defined as: metric key, metric values, expected output.
+        let cases = [
+            (Key::from("test_histogram"), &[22.22][..], "test_histogram:22.22|h\n"),
+            (
+                Key::from_parts("test_histogram", &[("foo", "bar"), ("baz", "quux")]),
+                &[88.0][..],
+                "test_histogram:88.0|h|#foo:bar,baz:quux\n",
+            ),
+            (
+                Key::from("test_histogram"),
+                &[22.22, 33.33, 44.44][..],
+                "test_histogram:22.22:33.33:44.44|h\n",
+            ),
+            (
+                Key::from_parts("test_histogram", &[("foo", "bar"), ("baz", "quux")]),
+                &[88.0, 66.6, 123.4][..],
+                "test_histogram:88.0:66.6:123.4|h|#foo:bar,baz:quux\n",
+            ),
+        ];
+
+        for (key, values, expected) in cases {
+            let mut writer = PayloadWriter::new(8192, false);
+            let result = writer.write_histogram(&key, values.iter().copied(), None);
+            assert_eq!(result.payloads_written(), 1);
+
+            let actual = string_from_writer(&mut writer);
+            assert_eq!(actual, expected);
+        }
+    }
+
+    #[test]
+    fn distribution() {
+        // Cases are defined as: metric key, metric values, expected output.
+        let cases = [
+            (Key::from("test_distribution"), &[22.22][..], "test_distribution:22.22|d\n"),
+            (
+                Key::from_parts("test_distribution", &[("foo", "bar"), ("baz", "quux")]),
+                &[88.0][..],
+                "test_distribution:88.0|d|#foo:bar,baz:quux\n",
+            ),
+            (
+                Key::from("test_distribution"),
+                &[22.22, 33.33, 44.44][..],
+                "test_distribution:22.22:33.33:44.44|d\n",
+            ),
+            (
+                Key::from_parts("test_distribution", &[("foo", "bar"), ("baz", "quux")]),
+                &[88.0, 66.6, 123.4][..],
+                "test_distribution:88.0:66.6:123.4|d|#foo:bar,baz:quux\n",
+            ),
+        ];
+
+        for (key, values, expected) in cases {
+            let mut writer = PayloadWriter::new(8192, false);
+            let result = writer.write_distribution(&key, values.iter().copied(), None);
+            assert_eq!(result.payloads_written(), 1);
+
+            let actual = string_from_writer(&mut writer);
+            assert_eq!(actual, expected);
+        }
+    }
+
+    #[test]
+    fn length_prefix() {
+        let prefixed = |buf: &str| {
+            let mut prefixed_buf = Vec::with_capacity(buf.len() + 4);
+            prefixed_buf.extend_from_slice(&(buf.len() as u32).to_le_bytes());
+            prefixed_buf.extend_from_slice(buf.as_bytes());
+            prefixed_buf
+        };
+
+        // Cases are defined as: metric key, metric values, expected output.
+        let cases = [
+            (Key::from("test_distribution"), &[22.22][..], prefixed("test_distribution:22.22|d\n")),
+            (
+                Key::from_parts("test_distribution", &[("foo", "bar"), ("baz", "quux")]),
+                &[88.0][..],
+                prefixed("test_distribution:88.0|d|#foo:bar,baz:quux\n"),
+            ),
+            (
+                Key::from("test_distribution"),
+                &[22.22, 33.33, 44.44][..],
+                prefixed("test_distribution:22.22:33.33:44.44|d\n"),
+            ),
+            (
+                Key::from_parts("test_distribution", &[("foo", "bar"), ("baz", "quux")]),
+                &[88.0, 66.6, 123.4][..],
+                prefixed("test_distribution:88.0:66.6:123.4|d|#foo:bar,baz:quux\n"),
+            ),
+        ];
+
+        for (key, values, expected) in cases {
+            let mut writer = PayloadWriter::new(8192, true);
+            let result = writer.write_distribution(&key, values.iter().copied(), None);
+            assert_eq!(result.payloads_written(), 1);
+
+            let actual = buf_from_writer(&mut writer);
+            assert_eq!(actual, expected);
+        }
+    }
+
+    proptest! {
+        #[test]
+        fn property_test_gauntlet(payload_limit in 0..16384usize, inputs in arb_vec(arb_metric(), 1..128)) {
+            // TODO: Parameterize reservoir size so we can exercise the sample rate stuff.
+
+            let mut writer = PayloadWriter::new(payload_limit, false);
+            let mut total_input_points: u64 = 0;
+            let mut payloads_written = 0;
+            let mut points_dropped = 0;
+
+            for input in inputs {
+                match input {
+                    InputMetric::Counter(key, value, ts) => {
+                        total_input_points += 1;
+
+                        let result = writer.write_counter(&key, value, ts);
+                        payloads_written += result.payloads_written();
+                        points_dropped += result.points_dropped();
+                    },
+                    InputMetric::Gauge(key, value, ts) => {
+                        total_input_points += 1;
+
+                        let result = writer.write_gauge(&key, value, ts);
+                        payloads_written += result.payloads_written();
+                        points_dropped += result.points_dropped();
+                    },
+                    InputMetric::Histogram(key, values) => {
+                        total_input_points += values.len() as u64;
+
+                        let result = writer.write_histogram(&key, values, None);
+                        payloads_written += result.payloads_written();
+                        points_dropped += result.points_dropped();
+                    },
+                }
+            }
+
+            let mut payloads = writer.payloads();
+            let mut payloads_emitted = 0;
+            let mut points_emitted: u64 = 0;
+            while let Some(payload) = payloads.next_payload() {
+                assert!(payload.len() <= payload_limit);
+
+                // Payloads from the writer are meant to be full, sendable chunks that contain only valid metrics.
From + // our perspective, payloads are successfully-written individual metrics, so we take the writer payload, + // and split it into individual lines, which gives us metric payloads. + let payload_lines = std::str::from_utf8(payload).unwrap().lines(); + + // For each payload line, we increment the number of payloads emitted and we also extract the number of + // points contained in the metric payload. + for payload_line in payload_lines { + payloads_emitted += 1; + + // We don't care about the actual values in the payload, just the number of them. + // + // Split the name/points by taking everything in front of the first pipe character, and then split + // by colon, and remove the first element which is the metric name. + let num_points = payload_line.split('|') + .next().unwrap() + .split(':') + .skip(1) + .count(); + assert!(num_points > 0); + + points_emitted += num_points as u64; + } + } + + assert_eq!(payloads_written, payloads_emitted); + assert_eq!(total_input_points, points_dropped + points_emitted); + } + } +} diff --git a/metrics-exporter-prometheus/Cargo.toml b/metrics-exporter-prometheus/Cargo.toml index 434e336f..11c80cae 100644 --- a/metrics-exporter-prometheus/Cargo.toml +++ b/metrics-exporter-prometheus/Cargo.toml @@ -46,7 +46,7 @@ metrics = { version = "^0.24", path = "../metrics" } metrics-util = { version = "^0.18", path = "../metrics-util", default-features = false, features = [ "recency", "registry", - "summary", + "storage", ] } quanta = { workspace = true } thiserror = { workspace = true } diff --git a/metrics-exporter-prometheus/src/common.rs b/metrics-exporter-prometheus/src/common.rs index 51f9bad9..51e70527 100644 --- a/metrics-exporter-prometheus/src/common.rs +++ b/metrics-exporter-prometheus/src/common.rs @@ -75,7 +75,7 @@ pub enum BuildError { #[error("bucket bounds/quantiles cannot be empty")] EmptyBucketsOrQuantiles, - /// Bucket duration cannot be zero + /// Bucket duration cannot be zero. 
     #[error("bucket durations cannot be set to zero")]
     ZeroBucketDuration,
 }
diff --git a/metrics-exporter-prometheus/src/distribution.rs b/metrics-exporter-prometheus/src/distribution.rs
index cf997201..2b1fd0f7 100644
--- a/metrics-exporter-prometheus/src/distribution.rs
+++ b/metrics-exporter-prometheus/src/distribution.rs
@@ -6,7 +6,10 @@ use quanta::Instant;
 
 use crate::common::Matcher;
 
-use metrics_util::{Histogram, Quantile, Summary};
+use metrics_util::{
+    storage::{Histogram, Summary},
+    Quantile,
+};
 
 const DEFAULT_SUMMARY_BUCKET_COUNT: NonZeroU32 = match NonZeroU32::new(3) {
     Some(v) => v,
diff --git a/metrics-exporter-prometheus/src/registry.rs b/metrics-exporter-prometheus/src/registry.rs
index ea5b470a..14a5d7eb 100644
--- a/metrics-exporter-prometheus/src/registry.rs
+++ b/metrics-exporter-prometheus/src/registry.rs
@@ -1,7 +1,7 @@
 use std::sync::Arc;
 
 use metrics::{atomics::AtomicU64, HistogramFn};
-use metrics_util::{registry::GenerationalStorage, AtomicBucket};
+use metrics_util::{registry::GenerationalStorage, storage::AtomicBucket};
 use quanta::Instant;
 
 pub type GenerationalAtomicStorage = GenerationalStorage<AtomicStorage>;
diff --git a/metrics-exporter-tcp/Cargo.toml b/metrics-exporter-tcp/Cargo.toml
index 7ced0917..c5377980 100644
--- a/metrics-exporter-tcp/Cargo.toml
+++ b/metrics-exporter-tcp/Cargo.toml
@@ -26,7 +26,6 @@ prost-types = { workspace = true, features = ["std"] }
 tracing = { workspace = true, features = ["attributes"] }
 
 [build-dependencies]
-home = { workspace = true }
 prost-build = { workspace = true }
 
 [dev-dependencies]
diff --git a/metrics-observer/Cargo.toml b/metrics-observer/Cargo.toml
index 429d0c7f..0fee634d 100644
--- a/metrics-observer/Cargo.toml
+++ b/metrics-observer/Cargo.toml
@@ -21,11 +21,10 @@ bytes = { workspace = true }
 chrono = { workspace = true, features = ["clock"] }
 crossbeam-channel = { workspace = true, features = ["std"] }
 metrics = { version = "^0.24", path = "../metrics", default-features = false }
-metrics-util = { version = "^0.18", path = "../metrics-util", default-features = false, features = ["summary"] }
+metrics-util = { version = "^0.18", path = "../metrics-util", default-features = false, features = ["storage"] }
 prost = { workspace = true }
 prost-types = { workspace = true }
 ratatui = { workspace = true, features = ["crossterm"] }
 
 [build-dependencies]
-home = { workspace = true }
 prost-build = { workspace = true }
diff --git a/metrics-observer/src/metrics.rs b/metrics-observer/src/metrics.rs
index 59d78032..fa63401f 100644
--- a/metrics-observer/src/metrics.rs
+++ b/metrics-observer/src/metrics.rs
@@ -13,7 +13,7 @@ use bytes::{BufMut, BytesMut};
 use prost::Message;
 
 use metrics::{Key, Label, Unit};
-use metrics_util::{CompositeKey, MetricKind, Summary};
+use metrics_util::{storage::Summary, CompositeKey, MetricKind};
 
 mod proto {
     include!(concat!(env!("OUT_DIR"), "/event.proto.rs"));
 }
diff --git a/metrics-util/CHANGELOG.md b/metrics-util/CHANGELOG.md
index 33b1fc5c..054e02f0 100644
--- a/metrics-util/CHANGELOG.md
+++ b/metrics-util/CHANGELOG.md
@@ -9,10 +9,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
 ## [Unreleased] - ReleaseDate
 
+### Added
+
+- Added new module `storage` -- behind a feature flag of the same name -- which contains all storage implementations
+  such as `Summary`, `Histogram`, `AtomicBucket`, etc.
+- Added new histogram storage type, `AtomicSamplingReservoir`, which provides memory-bounded histogram storage by
+  using reservoir sampling to maintain a statistically representative sample of the recorded values.
+ ### Changed - `FanoutBuilder` and `RouterBuilder` now both require recorders to be `Sync` to facilitate usage with being installed as the global recorder. +- Moved `AtomicBucket`, `Histogram`, and `Summary` to new `storage` module. + +### Removed + +- Removed `handles` feature which has been subsumed by the new `storage` feature flag. ## [0.18.0] - 2024-10-12 diff --git a/metrics-util/Cargo.toml b/metrics-util/Cargo.toml index a4159b57..1a95f6cc 100644 --- a/metrics-util/Cargo.toml +++ b/metrics-util/Cargo.toml @@ -44,7 +44,7 @@ required-features = ["layer-router"] [[example]] name = "bucket-crusher" -required-features = ["handles"] +required-features = ["storage"] [dependencies] ahash = { workspace = true, optional = true } @@ -56,6 +56,8 @@ indexmap = { workspace = true, optional = true } metrics = { version = "^0.24", path = "../metrics" } ordered-float = { workspace = true, optional = true } quanta = { workspace = true, optional = true } +rand = { workspace = true, optional = true } +rand_xoshiro = { version = "0.6", default-features = false, optional = true } radix_trie = { workspace = true, optional = true } sketches-ddsketch = { workspace = true, optional = true } @@ -82,12 +84,11 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["fmt", "ansi"] } [features] -handles = ["crossbeam-epoch", "crossbeam-utils"] +default = ["debugging", "layers", "recency", "registry", "storage"] debugging = ["indexmap", "ordered-float", "registry"] -default = ["debugging", "handles", "layers", "summary", "recency", "registry"] layers = ["layer-filter", "layer-router"] layer-filter = ["aho-corasick"] layer-router = ["radix_trie"] -summary = ["sketches-ddsketch"] recency = ["registry", "quanta"] -registry = ["crossbeam-epoch", "crossbeam-utils", "handles", "hashbrown"] +registry = ["hashbrown", "storage"] +storage = ["crossbeam-epoch", "crossbeam-utils", "rand", "rand_xoshiro", "sketches-ddsketch"] diff --git a/metrics-util/examples/bucket-crusher.rs b/metrics-util/examples/bucket-crusher.rs index 0b0cf018..01ef44a8 100644 --- a/metrics-util/examples/bucket-crusher.rs +++ b/metrics-util/examples/bucket-crusher.rs @@ -7,7 +7,7 @@ use std::thread; use std::time::{Duration, Instant}; use getopts::Options; -use metrics_util::AtomicBucket; +use metrics_util::storage::AtomicBucket; use rand::{thread_rng, Rng}; use tracing::{debug, error, info}; diff --git a/metrics-util/src/lib.rs b/metrics-util/src/lib.rs index 4b0ac324..dffa901b 100644 --- a/metrics-util/src/lib.rs +++ b/metrics-util/src/lib.rs @@ -2,26 +2,21 @@ #![deny(missing_docs)] #![cfg_attr(docsrs, feature(doc_cfg), deny(rustdoc::broken_intra_doc_links))] -#[cfg(feature = "handles")] -mod bucket; -#[cfg(feature = "handles")] -#[cfg_attr(docsrs, doc(cfg(feature = "handles")))] -pub use bucket::AtomicBucket; - #[cfg(feature = "debugging")] #[cfg_attr(docsrs, doc(cfg(feature = "debugging")))] pub mod debugging; -#[cfg(feature = "handles")] -mod handles; - mod quantile; -pub use quantile::{parse_quantiles, Quantile}; +pub use self::quantile::{parse_quantiles, Quantile}; #[cfg(feature = "registry")] #[cfg_attr(docsrs, doc(cfg(feature = "registry")))] pub mod registry; +#[cfg(feature = "storage")] +#[cfg_attr(docsrs, doc(cfg(feature = "storage")))] +pub mod storage; + mod common; pub use common::*; @@ -31,18 +26,9 @@ pub use key::CompositeKey; mod kind; pub use kind::{MetricKind, MetricKindMask}; -mod histogram; -pub use histogram::Histogram; - mod recoverable; pub use recoverable::RecoverableRecorder; -#[cfg(feature = 
"summary")] -mod summary; -#[cfg(feature = "summary")] -#[cfg_attr(docsrs, doc(cfg(feature = "summary")))] -pub use summary::Summary; - pub mod layers; #[cfg(test)] diff --git a/metrics-util/src/registry/storage.rs b/metrics-util/src/registry/storage.rs index 0e6ca0a5..e1eb138c 100644 --- a/metrics-util/src/registry/storage.rs +++ b/metrics-util/src/registry/storage.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use metrics::{atomics::AtomicU64, CounterFn, GaugeFn, HistogramFn}; -use crate::AtomicBucket; +use crate::storage::AtomicBucket; /// Defines the underlying storage for metrics as well as how to create them. pub trait Storage { diff --git a/metrics-util/src/bucket.rs b/metrics-util/src/storage/bucket.rs similarity index 100% rename from metrics-util/src/bucket.rs rename to metrics-util/src/storage/bucket.rs diff --git a/metrics-util/src/histogram.rs b/metrics-util/src/storage/histogram.rs similarity index 100% rename from metrics-util/src/histogram.rs rename to metrics-util/src/storage/histogram.rs diff --git a/metrics-util/src/storage/mod.rs b/metrics-util/src/storage/mod.rs new file mode 100644 index 00000000..4e5e27e6 --- /dev/null +++ b/metrics-util/src/storage/mod.rs @@ -0,0 +1,26 @@ +//! Various data structures for storing metric data. + +mod bucket; +use metrics::HistogramFn; + +pub use self::bucket::AtomicBucket; + +mod histogram; +pub use self::histogram::Histogram; + +pub mod reservoir; + +mod summary; +pub use self::summary::Summary; + +impl HistogramFn for AtomicBucket { + fn record(&self, value: f64) { + self.push(value); + } +} + +impl HistogramFn for self::reservoir::AtomicSamplingReservoir { + fn record(&self, value: f64) { + self.push(value); + } +} diff --git a/metrics-util/src/storage/reservoir.rs b/metrics-util/src/storage/reservoir.rs new file mode 100644 index 00000000..d1fc68d8 --- /dev/null +++ b/metrics-util/src/storage/reservoir.rs @@ -0,0 +1,187 @@ +//! An atomic sampling reservoir. + +use std::{ + cell::UnsafeCell, + sync::{ + atomic::{ + AtomicBool, AtomicU64, AtomicUsize, + Ordering::{Acquire, Relaxed, Release}, + }, + Mutex, + }, +}; + +use rand::{rngs::OsRng, Rng as _, SeedableRng as _}; +use rand_xoshiro::Xoshiro256StarStar; + +thread_local! { + static FAST_RNG: UnsafeCell = { + UnsafeCell::new(Xoshiro256StarStar::from_rng(OsRng).unwrap()) + }; +} + +fn fastrand(upper: usize) -> usize { + FAST_RNG.with(|rng| { + // SAFETY: We know it's safe to take a mutable reference since we're getting a pointer to a thread-local value, + // and the reference never outlives the closure executing on this thread. 
+        let rng = unsafe { &mut *rng.get() };
+        rng.gen_range(0..upper)
+    })
+}
+
+struct Reservoir {
+    values: Box<[AtomicU64]>,
+    count: AtomicUsize,
+}
+
+impl Reservoir {
+    fn with_capacity(capacity: usize) -> Self {
+        let mut values = Vec::with_capacity(capacity);
+        for _ in 0..capacity {
+            values.push(AtomicU64::new(0));
+        }
+
+        Self { values: values.into_boxed_slice(), count: AtomicUsize::new(0) }
+    }
+
+    fn push(&self, value: f64) {
+        let idx = self.count.fetch_add(1, Relaxed);
+        if idx < self.values.len() {
+            self.values[idx].store(value.to_bits(), Relaxed);
+        } else {
+            let maybe_idx = fastrand(idx);
+            if maybe_idx < self.values.len() {
+                self.values[maybe_idx].store(value.to_bits(), Relaxed);
+            }
+        }
+    }
+
+    fn drain(&self) -> Drain<'_> {
+        let unsampled_len = self.count.load(Relaxed);
+        let len = if unsampled_len > self.values.len() { self.values.len() } else { unsampled_len };
+        Drain { reservoir: self, unsampled_len, len, idx: 0 }
+    }
+}
+
+/// A draining iterator over the samples in a reservoir.
+pub struct Drain<'a> {
+    reservoir: &'a Reservoir,
+    unsampled_len: usize,
+    len: usize,
+    idx: usize,
+}
+
+impl<'a> Drain<'a> {
+    /// Returns the sample rate of the reservoir that produced this iterator.
+    ///
+    /// The sample rate is the ratio of the number of samples held in the reservoir to the number of samples pushed
+    /// into it. When the reservoir has not been filled, the sample rate is 1.0. When more samples have been pushed
+    /// into the reservoir than its overall capacity, the sample rate is `size / count`, where `size` is the
+    /// reservoir's capacity and `count` is the number of samples pushed into the reservoir.
+    ///
+    /// For example, if the reservoir holds 1,000 samples, and 100,000 values were pushed into the reservoir, the
+    /// sample rate would be 0.01 (1,000 / 100,000).
+    pub fn sample_rate(&self) -> f64 {
+        if self.unsampled_len == self.len {
+            1.0
+        } else {
+            self.len as f64 / self.unsampled_len as f64
+        }
+    }
+}
+
+impl<'a> Iterator for Drain<'a> {
+    type Item = f64;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.idx < self.len {
+            let value = f64::from_bits(self.reservoir.values[self.idx].load(Relaxed));
+            self.idx += 1;
+            Some(value)
+        } else {
+            None
+        }
+    }
+}
+
+impl ExactSizeIterator for Drain<'_> {
+    fn len(&self) -> usize {
+        self.len - self.idx
+    }
+}
+
+impl<'a> Drop for Drain<'a> {
+    fn drop(&mut self) {
+        self.reservoir.count.store(0, Release);
+    }
+}
+
+/// An atomic sampling reservoir.
+///
+/// [Reservoir sampling][rs] is a technique used to produce a statistically representative sample of a data stream, in a
+/// fixed space, without knowing the length of the stream in advance. `AtomicSamplingReservoir` is a thread-safe version of a
+/// sampling reservoir, based on Vitter's ["Algorithm R"][vitter_paper].
+///
+/// Utilizes an A/B-based storage mechanism to avoid contention between producers and the consumer, and a fast,
+/// thread-local PRNG ([Xoshiro256**][xoshiro256starstar]) to limit the per-call sampling overhead.
+///
+/// [rs]: https://en.wikipedia.org/wiki/Reservoir_sampling
+/// [vitter_paper]: https://www.cs.umd.edu/~samir/498/vitter.pdf
+/// [xoshiro256starstar]: https://prng.di.unimi.it
+pub struct AtomicSamplingReservoir {
+    primary: Reservoir,
+    secondary: Reservoir,
+    use_primary: AtomicBool,
+    swap: Mutex<()>,
+}
+
+impl AtomicSamplingReservoir {
+    /// Creates a new `AtomicSamplingReservoir` that stores up to `size` samples.
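+    ///
+    /// Illustrative usage (how to weight the drained values is up to the caller):
+    ///
+    /// ```ignore
+    /// let reservoir = AtomicSamplingReservoir::new(1024);
+    /// reservoir.push(0.25);
+    ///
+    /// reservoir.consume(|drain| {
+    ///     let sample_rate = drain.sample_rate();
+    ///     for value in drain {
+    ///         // Aggregate `value`, optionally weighting by `sample_rate`.
+    ///     }
+    /// });
+    /// ```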
+    pub fn new(size: usize) -> Self {
+        Self {
+            primary: Reservoir::with_capacity(size),
+            secondary: Reservoir::with_capacity(size),
+            use_primary: AtomicBool::new(true),
+            swap: Mutex::new(()),
+        }
+    }
+
+    /// Returns `true` if the reservoir is empty.
+    pub fn is_empty(&self) -> bool {
+        let use_primary = self.use_primary.load(Acquire);
+        if use_primary {
+            self.primary.count.load(Relaxed) == 0
+        } else {
+            self.secondary.count.load(Relaxed) == 0
+        }
+    }
+
+    /// Pushes a sample into the reservoir.
+    pub fn push(&self, value: f64) {
+        let use_primary = self.use_primary.load(Relaxed);
+        if use_primary {
+            self.primary.push(value);
+        } else {
+            self.secondary.push(value);
+        };
+    }
+
+    /// Consumes all samples in the reservoir, passing them to the provided closure.
+    ///
+    /// The underlying storage is swapped before the closure is called, and the previous storage is consumed.
+    pub fn consume<F>(&self, mut f: F)
+    where
+        F: FnMut(Drain<'_>),
+    {
+        let _guard = self.swap.lock().unwrap();
+
+        // Swap the active reservoir.
+        let use_primary = self.use_primary.load(Acquire);
+        self.use_primary.store(!use_primary, Release);
+
+        // Consume the previous reservoir.
+        let drain = if use_primary { self.primary.drain() } else { self.secondary.drain() };
+
+        f(drain);
+    }
+}
diff --git a/metrics-util/src/summary.rs b/metrics-util/src/storage/summary.rs
similarity index 99%
rename from metrics-util/src/summary.rs
rename to metrics-util/src/storage/summary.rs
index 4e65e104..3056741b 100644
--- a/metrics-util/src/summary.rs
+++ b/metrics-util/src/storage/summary.rs
@@ -8,7 +8,7 @@ use std::fmt;
 /// provides relative-error guarantees, regardless of the absolute range between the smallest and
 /// larger values.
 ///
-/// `Summary` is similiar to [HDRHistogram][hdrhistogram] in practice, but supports an arbitrary
+/// `Summary` is similar to [HDRHistogram][hdrhistogram] in practice, but supports an arbitrary
 /// range of values, and supports floating-point numbers.
 ///
 /// Numbers with an absolute value smaller than given `min_value` will be recognized as zeroes.
diff --git a/metrics/src/key.rs b/metrics/src/key.rs
index 881fce29..c74f2405 100644
--- a/metrics/src/key.rs
+++ b/metrics/src/key.rs
@@ -23,6 +23,11 @@ impl KeyName {
     pub fn as_str(&self) -> &str {
         &self.0
     }
+
+    /// Consumes this [`KeyName`], returning the inner string.
+    pub fn into_inner(self) -> SharedString {
+        self.0
+    }
 }
 
 impl<T> From<T> for KeyName
diff --git a/metrics/src/metadata.rs b/metrics/src/metadata.rs
index a3d1d46e..eae43ed2 100644
--- a/metrics/src/metadata.rs
+++ b/metrics/src/metadata.rs
@@ -102,11 +102,16 @@ mod tests {
     #[test]
     fn level_try_from_valid() {
         let cases = &[
-            ("trace", Level::TRACE), ("TRACE", Level::TRACE),
-            ("debug", Level::DEBUG), ("DEBUG", Level::DEBUG),
-            ("info", Level::INFO), ("INFO", Level::INFO),
-            ("warn", Level::WARN), ("WARN", Level::WARN),
-            ("error", Level::ERROR), ("ERROR", Level::ERROR),
+            ("trace", Level::TRACE),
+            ("TRACE", Level::TRACE),
+            ("debug", Level::DEBUG),
+            ("DEBUG", Level::DEBUG),
+            ("info", Level::INFO),
+            ("INFO", Level::INFO),
+            ("warn", Level::WARN),
+            ("WARN", Level::WARN),
+            ("error", Level::ERROR),
+            ("ERROR", Level::ERROR),
         ];
 
         for (input, expected) in cases {
@@ -136,9 +141,7 @@ mod tests {
         assert!(Level::WARN == Level::WARN);
 
         // Now check each level programmatically.
- let levels = &[ - Level::TRACE, Level::DEBUG, Level::INFO, Level::WARN, Level::ERROR, - ]; + let levels = &[Level::TRACE, Level::DEBUG, Level::INFO, Level::WARN, Level::ERROR]; for i in 0..levels.len() { let current_level = levels[i];