diff --git a/presto-common/src/main/java/com/facebook/presto/common/RuntimeMetric.java b/presto-common/src/main/java/com/facebook/presto/common/RuntimeMetric.java
index e7864dd179d2a..42174524f08a6 100644
--- a/presto-common/src/main/java/com/facebook/presto/common/RuntimeMetric.java
+++ b/presto-common/src/main/java/com/facebook/presto/common/RuntimeMetric.java
@@ -20,16 +20,74 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongArray;
import static com.facebook.presto.common.RuntimeUnit.NONE;
import static java.util.Objects.requireNonNull;
/**
* A metric exposed by a presto operator or connector. It will be aggregated at the query level.
+ * Optionally supports percentile tracking (p90, p95, p99) when explicitly enabled.
*/
@ThriftStruct
public class RuntimeMetric
{
+ /*
+ * Percentile Calculation Algorithm: Fixed-Width Histogram with Auto-Configuration
+ *
+ * This implementation uses a fixed-width histogram to approximate percentiles with bounded memory.
+ * Both the number of buckets and bucket width can be configured, with auto-configured defaults.
+ *
+ * Auto-Configuration Strategy (with default 1000 buckets):
+ * - NANO (latency): 1000 buckets × 1ms (1000μs) = 0-1 second range
+ * - BYTE (data size): 1000 buckets × 1KB (1024 bytes) = 0-1MB range
+ * - NONE (counts): 1000 buckets × 1000 = 0-1 million range
+ *
+ * Algorithm Overview:
+ * 1. Divide the value range into numBuckets fixed-width bins
+ * 2. Each value is mapped to a bucket: bucketIndex = value / bucketWidth
+ * 3. To compute percentile P (e.g., 0.90 for p90):
+ * - Calculate target count: targetCount = ceil(P × totalCount)
+ * - Iterate through buckets, accumulating counts
+ * - When accumulated count >= targetCount, return bucket midpoint
+ *
+ * Example: For 100 latency values with p90 = 0.90:
+ * - targetCount = ceil(0.90 × 100) = 90
+ * - If buckets [0..89] have 85 values and bucket 90 has 10 values
+ * - Accumulated count reaches 95 at bucket 90, so return bucket 90's midpoint
+ *
+ * Accuracy: Within ±(bucketWidth/2) of true percentile
+ * - For NANO: ±0.5ms precision (default)
+ * - For BYTE: ±512 bytes precision (default)
+ * - For NONE: ±500 units precision (default)
+ *
+ * Memory: O(numBuckets) = 8KB (default 1000 buckets × 8 bytes per AtomicLong)
+ *
+ * Configurability:
+ * ✓ Number of buckets is configurable (default 1000)
+ * ✓ Bucket width is configurable (auto-determined by RuntimeUnit if not specified)
+ *
+ * References:
+ * - Prometheus Histogram: https://prometheus.io/docs/concepts/metric_types/#histogram
+ * - HdrHistogram paper: https://www.azul.com/files/HdrHistogram.pdf
+ *
+ * Trade-offs:
+ * ✓ Bounded memory (O(1) space complexity)
+ * ✓ Fast updates (O(1) atomic increment)
+ * ✓ Mergeable across distributed nodes (histogram addition)
+ * ✓ Auto-configured for different metric types
+ * ✓ Configurable number of buckets and bucket width for precision tuning
+ * ✗ Approximate results (accuracy = bucket width)
+ * ✗ Fixed range (values beyond range go to overflow bucket)
+ */
+ private static final int DEFAULT_NUM_BUCKETS = 1000;
+
+ // Number of histogram buckets (configurable, defaults to 1000)
+ private final int numBuckets;
+
+ // Bucket width is determined by the RuntimeUnit or explicitly configured
+ private final long bucketWidth;
+
private final String name;
private final RuntimeUnit unit;
private final AtomicLong sum = new AtomicLong();
@@ -37,36 +95,198 @@ public class RuntimeMetric
private final AtomicLong max = new AtomicLong(Long.MIN_VALUE);
private final AtomicLong min = new AtomicLong(Long.MAX_VALUE);
+ // Optional percentile tracking - only allocated when percentileTrackingEnabled is true
+ private volatile boolean percentileTrackingEnabled;
+ private volatile AtomicLongArray histogramBuckets;
+
+ // Cached percentile values (computed and serialized to JSON, histogram is NOT serialized)
+ private volatile Long p90;
+ private volatile Long p95;
+ private volatile Long p99;
+
/**
- * Creates a new empty RuntimeMetric.
+ * Creates a new empty RuntimeMetric without percentile tracking.
*
* @param name Name of this metric. If used in the presto core code base, this should be a value defined in {@link RuntimeMetricName}. But connectors could use arbitrary names.
* @param unit Unit of this metric. Available units are defined in {@link RuntimeUnit}.
*/
public RuntimeMetric(String name, RuntimeUnit unit)
+ {
+ this(name, unit, false);
+ }
+
+ /**
+ * Creates a new empty RuntimeMetric with optional percentile tracking.
+ *
+ * @param name Name of this metric.
+ * @param unit Unit of this metric.
+ * @param trackPercentiles If true, enables percentile tracking (p90, p95, p99) for all addValue calls. Allocates ~8KB histogram with default 1000 buckets.
+ */
+ public RuntimeMetric(String name, RuntimeUnit unit, boolean trackPercentiles)
+ {
+ this(name, unit, trackPercentiles, -1, -1);
+ }
+
+ /**
+ * Creates a new empty RuntimeMetric with optional percentile tracking and custom bucket width.
+ * Use this constructor when you need finer or coarser granularity than the auto-configured defaults.
+ *
+ * @param name Name of this metric.
+ * @param unit Unit of this metric.
+ * @param trackPercentiles If true, enables percentile tracking (p90, p95, p99) for all addValue calls. Allocates histogram.
+ * @param bucketWidth Custom bucket width for histogram. If <= 0, uses auto-configured value based on unit type.
+ * Examples:
+ * - For sub-millisecond latency: 100 (100μs per bucket, ±50μs precision)
+ * - For multi-second latency: 10000 (10ms per bucket, ±5ms precision)
+ * - For small byte counts: 100 (100 bytes per bucket, ±50 bytes precision)
+ * - For large row counts: 10000 (10k per bucket, ±5k precision)
+ */
+ public RuntimeMetric(String name, RuntimeUnit unit, boolean trackPercentiles, long bucketWidth)
+ {
+ this(name, unit, trackPercentiles, bucketWidth, -1);
+ }
+
+ /**
+ * Creates a new empty RuntimeMetric with optional percentile tracking, custom bucket width, and custom number of buckets.
+ * Use this constructor when you need complete control over histogram configuration.
+ *
+ * @param name Name of this metric.
+ * @param unit Unit of this metric.
+ * @param trackPercentiles If true, enables percentile tracking (p90, p95, p99) for all addValue calls. Allocates histogram.
+ * @param bucketWidth Custom bucket width for histogram. If <= 0, uses auto-configured value based on unit type.
+ * Examples:
+ * - For sub-millisecond latency: 100 (100μs per bucket, ±50μs precision)
+ * - For multi-second latency: 10000 (10ms per bucket, ±5ms precision)
+ * - For small byte counts: 100 (100 bytes per bucket, ±50 bytes precision)
+ * - For large row counts: 10000 (10k per bucket, ±5k precision)
+ * @param numBuckets Number of buckets for histogram. If <= 0, uses default (1000).
+ * Memory usage: numBuckets × 8 bytes (e.g., 1000 buckets = 8KB)
+ * Examples:
+ * - For high precision: 10000 buckets (80KB memory)
+ * - For low memory: 100 buckets (800 bytes memory)
+ * - Default: 1000 buckets (8KB memory)
+ */
+ public RuntimeMetric(String name, RuntimeUnit unit, boolean trackPercentiles, long bucketWidth, int numBuckets)
{
this.name = requireNonNull(name, "name is null");
this.unit = unit == null ? NONE : unit;
+ this.numBuckets = (numBuckets > 0) ? numBuckets : DEFAULT_NUM_BUCKETS;
+ this.bucketWidth = (bucketWidth > 0) ? bucketWidth : determineBucketWidth(this.unit);
+ this.percentileTrackingEnabled = trackPercentiles;
+ if (trackPercentiles) {
+ this.histogramBuckets = new AtomicLongArray(this.numBuckets);
+ }
+ }
+
+ /**
+ * Determines the appropriate bucket width based on the metric's unit type.
+ * This auto-configures the histogram for optimal accuracy based on expected value ranges.
+ *
+ * Auto-Configuration Strategy (with default 1000 buckets):
+ * - NANO: 1ms per bucket for typical latencies (0-1 second range, ±0.5ms precision)
+ * - BYTE: 1KB per bucket for typical data sizes (0-1MB range, ±512 bytes precision)
+ * - NONE: 1000 per bucket for typical counts (0-1 million range, ±500 units precision)
+ *
+ * Override with custom bucket width and/or numBuckets in constructor for:
+ * - Sub-millisecond precision: Use 100,000 ns per bucket (100μs, 0-100ms range with 1000 buckets, ±50μs precision)
+ * - Multi-second latencies: Use 10,000,000 ns per bucket (10ms, 0-10s range with 1000 buckets, ±5ms precision)
+ * - Small byte counts: Use 100 bytes per bucket (0-100KB range with 1000 buckets, ±50 bytes precision)
+ * - Large counts: Use 10,000 per bucket (0-10M range with 1000 buckets, ±5k precision)
+ * - High precision: Use more buckets (e.g., 10000 buckets for finer granularity)
+ * - Low memory: Use fewer buckets (e.g., 100 buckets to reduce memory footprint)
+ *
+ * @param unit The RuntimeUnit for this metric
+ * @return The bucket width in the unit's native scale
+ */
+ private static long determineBucketWidth(RuntimeUnit unit)
+ {
+ switch (unit) {
+ case NANO:
+ // For nanosecond timing metrics (latency, CPU time, etc.)
+ // Default: 1000 buckets × 1,000,000 ns (1ms) = 0-1 second range
+ // Precision: ±0.5ms
+ // Override bucketWidth and/or numBuckets for different precision or range
+ return 1_000_000; // 1 millisecond in nanoseconds
+ case BYTE:
+ // For byte count metrics (memory, network, disk I/O)
+ // Default: 1000 buckets × 1024 bytes (1KB) = 0-1MB range
+ // Precision: ±512 bytes
+ // Override bucketWidth and/or numBuckets for different precision or range
+ return 1024;
+ case NONE:
+ // For dimensionless counts (rows, operations, events)
+ // Default: 1000 buckets × 1000 = 0-1 million range
+ // Precision: ±500 units
+ // Override bucketWidth and/or numBuckets for different precision or range
+ return 1000;
+ default:
+ // Fallback for any future unit types
+ return 1000;
+ }
}
public static RuntimeMetric copyOf(RuntimeMetric metric)
+ {
+ return copyOf(metric, metric.getName());
+ }
+
+ public static RuntimeMetric copyOf(RuntimeMetric metric, String newName)
{
requireNonNull(metric, "metric is null");
- return new RuntimeMetric(metric.getName(), metric.getUnit(), metric.getSum(), metric.getCount(), metric.getMax(), metric.getMin());
+ requireNonNull(newName, "newName is null");
+ RuntimeMetric copy = new RuntimeMetric(newName, metric.getUnit(),
+ metric.percentileTrackingEnabled, metric.bucketWidth, metric.numBuckets);
+ copy.set(metric.getSum(), metric.getCount(), metric.getMax(), metric.getMin());
+ if (metric.histogramBuckets != null) {
+ for (int i = 0; i < metric.numBuckets; i++) {
+ copy.histogramBuckets.set(i, metric.histogramBuckets.get(i));
+ }
+ }
+ copy.p90 = metric.p90;
+ copy.p95 = metric.p95;
+ copy.p99 = metric.p99;
+ return copy;
}
- @JsonCreator
@ThriftConstructor
+ public RuntimeMetric(String name, RuntimeUnit unit, long sum, long count, long max, long min)
+ {
+ this.name = requireNonNull(name, "name is null");
+ this.unit = unit == null ? NONE : unit;
+ this.numBuckets = DEFAULT_NUM_BUCKETS;
+ this.bucketWidth = determineBucketWidth(this.unit);
+ set(sum, count, max, min);
+ // No percentile tracking for this constructor (backward compatibility)
+ this.percentileTrackingEnabled = false;
+ }
+
+ @JsonCreator
public RuntimeMetric(
@JsonProperty("name") String name,
@JsonProperty("unit") RuntimeUnit unit,
@JsonProperty("sum") long sum,
@JsonProperty("count") long count,
@JsonProperty("max") long max,
- @JsonProperty("min") long min)
+ @JsonProperty("min") long min,
+ @JsonProperty("numBuckets") Integer numBuckets,
+ @JsonProperty("bucketWidth") Long bucketWidth,
+ @JsonProperty("p90") Long p90,
+ @JsonProperty("p95") Long p95,
+ @JsonProperty("p99") Long p99)
{
- this(name, unit);
+ this.name = requireNonNull(name, "name is null");
+ this.unit = unit == null ? NONE : unit;
+ // Use provided numBuckets or default if null (for backward compatibility with old JSON)
+ this.numBuckets = (numBuckets != null && numBuckets > 0) ? numBuckets : DEFAULT_NUM_BUCKETS;
+ // Use provided bucketWidth or auto-configured value if null (for backward compatibility)
+ this.bucketWidth = (bucketWidth != null && bucketWidth > 0) ? bucketWidth : determineBucketWidth(this.unit);
set(sum, count, max, min);
+
+ // Deserialized metrics provide read-only percentile snapshots
+ this.p90 = p90;
+ this.p95 = p95;
+ this.p99 = p99;
+ this.percentileTrackingEnabled = false;
}
private void set(long sum, long count, long max, long min)
@@ -81,7 +301,32 @@ public void set(RuntimeMetric metric)
{
requireNonNull(metric, "metric is null");
checkState(unit == metric.getUnit(), "The metric must have the same unit type as the current one.");
+ checkState(bucketWidth == metric.bucketWidth, "The metric to be merged must have the same bucket width as the current one.");
+ checkState(numBuckets == metric.numBuckets, "The metric to be merged must have the same bucket number as the current one.");
+
set(metric.getSum(), metric.getCount(), metric.getMax(), metric.getMin());
+
+ // Copy percentile tracking state
+ this.percentileTrackingEnabled = metric.percentileTrackingEnabled;
+ if (metric.histogramBuckets != null) {
+ if (this.histogramBuckets == null) {
+ this.histogramBuckets = new AtomicLongArray(this.numBuckets);
+ }
+ for (int i = 0; i < this.numBuckets; i++) {
+ this.histogramBuckets.set(i, metric.histogramBuckets.get(i));
+ }
+ }
+ this.p90 = metric.p90;
+ this.p95 = metric.p95;
+ this.p99 = metric.p99;
+ }
+
+ /**
+ * Check if percentile tracking is enabled for this metric.
+ */
+ public boolean isPercentileTrackingEnabled()
+ {
+ return percentileTrackingEnabled;
}
@JsonProperty
@@ -91,12 +336,62 @@ public String getName()
return name;
}
+ /**
+ * Add a value to this metric.
+ * If percentile tracking is enabled (via constructor), the value is added to the histogram.
+ *
+ * @param value The value to add
+ */
public void addValue(long value)
{
sum.addAndGet(value);
count.incrementAndGet();
max.accumulateAndGet(value, Math::max);
min.accumulateAndGet(value, Math::min);
+
+ // Update histogram if percentile tracking is enabled
+ if (percentileTrackingEnabled && histogramBuckets != null) {
+ int bucketIndex = getBucketIndex(value);
+ histogramBuckets.incrementAndGet(bucketIndex);
+ }
+ }
+
+ /**
+ * Maps a value to its histogram bucket index.
+ *
+ * Algorithm: bucketIndex = floor(value / bucketWidth)
+ *
+ * Examples with auto-configured bucket widths and default 1000 buckets:
+ *
+ * For NANO (latency) with bucketWidth = 1000μs:
+ * - value = 500μs → bucket 0 (0-999μs)
+ * - value = 1500μs → bucket 1 (1000-1999μs)
+ * - value = 999000μs → bucket 999 (999000-999999μs, ~1 second)
+ * - value = 2000000μs → bucket 999 (overflow, clamped to last bucket)
+ *
+ * For BYTE (data size) with bucketWidth = 1024 bytes:
+ * - value = 512 bytes → bucket 0 (0-1023 bytes)
+ * - value = 2048 bytes → bucket 2 (2048-3071 bytes, ~2KB)
+ * - value = 1023KB → bucket 999 (1022KB-1023KB, ~1MB max)
+ *
+ * For NONE (counts) with bucketWidth = 1000:
+ * - value = 500 → bucket 0 (0-999)
+ * - value = 50000 → bucket 50 (50000-50999)
+ * - value = 999000 → bucket 999 (999000-999999, ~1 million max)
+ *
+ * Overflow handling: Values beyond (numBuckets × bucketWidth) go to the last bucket.
+ * This means very large outliers are grouped together but still counted.
+ */
+ private int getBucketIndex(long value)
+ {
+ if (value < 0) {
+ return 0;
+ }
+ long bucketIndex = value / bucketWidth;
+ if (bucketIndex >= numBuckets) {
+ return numBuckets - 1; // Overflow bucket for very large values
+ }
+ return (int) bucketIndex;
}
/**
@@ -130,6 +425,22 @@ public void mergeWith(RuntimeMetric metric)
count.addAndGet(metric.getCount());
max.accumulateAndGet(metric.getMax(), Math::max);
min.accumulateAndGet(metric.getMin(), Math::min);
+
+ // Merge histogram data if both have percentile tracking enabled
+ if (percentileTrackingEnabled && metric.percentileTrackingEnabled &&
+ histogramBuckets != null && metric.histogramBuckets != null) {
+ // Validate that both metrics have the same bucket configuration
+ checkState(bucketWidth == metric.bucketWidth, "The metric to be merged must have the same bucket width as the current one.");
+ checkState(numBuckets == metric.numBuckets, "The metric to be merged must have the same bucket number as the current one.");
+
+ for (int i = 0; i < numBuckets; i++) {
+ histogramBuckets.addAndGet(i, metric.histogramBuckets.get(i));
+ }
+ }
+ // Invalidate cached percentiles after merge
+ p90 = null;
+ p95 = null;
+ p99 = null;
}
@JsonProperty
@@ -167,10 +478,247 @@ public RuntimeUnit getUnit()
return unit;
}
+ /**
+ * Get the number of histogram buckets configured for this metric.
+ * Only relevant when percentile tracking is enabled.
+ * Note: This is only available via JSON, not Thrift.
+ *
+ * @return The number of buckets, or default value if not explicitly configured
+ */
+ @JsonProperty
+ public Integer getNumBuckets()
+ {
+ if (!percentileTrackingEnabled) {
+ return null;
+ }
+ return numBuckets;
+ }
+
+ /**
+ * Get the bucket width (in native units) configured for this metric.
+ * Only relevant when percentile tracking is enabled.
+ * Note: This is only available via JSON, not Thrift.
+ *
+ * @return The bucket width in the metric's native unit scale
+ */
+ @JsonProperty
+ public Long getBucketWidth()
+ {
+ if (!percentileTrackingEnabled) {
+ return null;
+ }
+ return bucketWidth;
+ }
+
+ /**
+ * Get the 90th percentile (null if percentile tracking not enabled).
+ * Note: This is only available via JSON, not Thrift.
+ * For JSON-deserialized metrics, returns the cached snapshot value.
+ */
+ @JsonProperty
+ public Long getP90()
+ {
+ if (!percentileTrackingEnabled) {
+ // For JSON-deserialized metrics, return the cached read-only snapshot
+ return p90;
+ }
+ // For live tracking, compute and cache
+ if (p90 == null) {
+ long computed = computePercentile(0.90);
+ p90 = computed >= 0 ? computed : null;
+ }
+ return p90;
+ }
+
+ /**
+ * Get the 95th percentile (null if percentile tracking not enabled).
+ * Note: This is only available via JSON, not Thrift.
+ * For JSON-deserialized metrics, returns the cached snapshot value.
+ */
+ @JsonProperty
+ public Long getP95()
+ {
+ if (!percentileTrackingEnabled) {
+ // For JSON-deserialized metrics, return the cached read-only snapshot
+ return p95;
+ }
+ // For live tracking, compute and cache
+ if (p95 == null) {
+ long computed = computePercentile(0.95);
+ p95 = computed >= 0 ? computed : null;
+ }
+ return p95;
+ }
+
+ /**
+ * Get the 99th percentile (null if percentile tracking not enabled).
+ * Note: This is only available via JSON, not Thrift.
+ * For JSON-deserialized metrics, returns the cached snapshot value.
+ */
+ @JsonProperty
+ public Long getP99()
+ {
+ if (!percentileTrackingEnabled) {
+ // For JSON-deserialized metrics, return the cached read-only snapshot
+ return p99;
+ }
+ // For live tracking, compute and cache
+ if (p99 == null) {
+ long computed = computePercentile(0.99);
+ p99 = computed >= 0 ? computed : null;
+ }
+ return p99;
+ }
+
+ /**
+ * Calculate percentile value from histogram using cumulative distribution.
+ *
+ * Algorithm (based on "Nearest Rank" method):
+ * 1. targetCount = ceil(percentile × totalCount)
+ * 2. Iterate through buckets, accumulating counts
+ * 3. When accumulated >= targetCount, return bucket midpoint
+ *
+ * Example: p90 with 100 values
+ * - targetCount = ceil(0.90 × 100) = 90
+ * - If bucket distribution is:
+ * bucket[0..88]: 89 values (accumulated = 89)
+ * bucket[89]: 5 values (accumulated = 94) ← 94 >= 90, so return this bucket
+ * - Return: bucket[89] midpoint = 89 × 1000 + 500 = 89,500μs
+ *
+ * Why return bucket midpoint?
+ * - We don't know exact value distribution within the bucket
+ * - Midpoint is the unbiased estimator (minimizes expected error)
+ * - Alternative: bucket start (pessimistic) or bucket end (optimistic)
+ *
+ * Accuracy Analysis:
+ * - True percentile is somewhere in the bucket: [i×bucketWidth, (i+1)×bucketWidth)
+ * - Midpoint: i×bucketWidth + bucketWidth/2
+ * - Maximum error: ±bucketWidth/2
+ * - For NANO (1ms buckets): ±0.5ms error
+ * - For BYTE (1KB buckets): ±512 bytes error
+ * - For NONE (1000 unit buckets): ±500 units error
+ *
+ * Why ceil() for targetCount?
+ * - ceil(0.90 × 100) = 90 means "at least 90 values must be <= p90"
+ * - This matches Excel PERCENTILE.INC function behavior
+ * - Alternative: floor() would give PERCENTILE.EXC behavior
+ *
+ * Auto-Configuration:
+ * - Bucket width automatically determined by RuntimeUnit
+ * - Ensures appropriate precision for different metric types
+ * - No configuration needed from caller
+ *
+ * References:
+ * - NIST: https://www.itl.nist.gov/div898/handbook/prc/section2/prc252.htm
+ * - Numpy percentile: https://numpy.org/doc/stable/reference/generated/numpy.percentile.html
+ *
+ * @param percentile The percentile to calculate (0.0 to 1.0, e.g., 0.90 for p90)
+ * @return The approximate percentile value, or -1 if tracking is disabled
+ */
+ private long computePercentile(double percentile)
+ {
+ if (!percentileTrackingEnabled || histogramBuckets == null) {
+ return -1;
+ }
+
+ if (percentile < 0.0 || percentile > 1.0) {
+ throw new IllegalArgumentException("Percentile must be between 0.0 and 1.0, got: " + percentile);
+ }
+
+ long totalCount = count.get();
+ if (totalCount == 0) {
+ return 0;
+ }
+
+ // Find the target count for the percentile (using "Nearest Rank" method)
+ long targetCount = (long) Math.ceil(percentile * totalCount);
+
+ // Accumulate counts through buckets - no synchronization needed with AtomicLongArray
+ long accumulatedCount = 0;
+ for (int i = 0; i < numBuckets; i++) {
+ accumulatedCount += histogramBuckets.get(i);
+ if (accumulatedCount >= targetCount) {
+ // Calculate the midpoint of the bucket as the percentile estimate
+ long percentileValue = i * bucketWidth + bucketWidth / 2;
+
+ // Clamp to [min, max] range to ensure accuracy with small samples or large bucket widths
+ // Example: If only value 100,000 is in bucket 0 (0-999,999), midpoint 500,000 > max 100,000
+ // Clamping ensures the percentile never exceeds actual data bounds
+ return Math.max(min.get(), Math.min(percentileValue, max.get()));
+ }
+ }
+
+ // If we reach here (all buckets processed), return the max value
+ // Race condition: Even with thread-safe AtomicLongArray, this can happen due to update ordering.
+ // In addValue(), count is incremented BEFORE the histogram bucket is updated. This creates
+ // a tiny window where count.get() returns N, but histogram only contains N-1 values.
+ // Example: Thread A reads totalCount=100, Thread B increments count to 101 but hasn't yet
+ // updated histogram, Thread A iterates and only finds 100 values, missing targetCount=99.
+ // Returning max.get() is a safe approximation for high percentiles in this rare edge case.
+ return max.get();
+ }
+
private static void checkState(boolean condition, String message)
{
if (!condition) {
throw new IllegalStateException(message);
}
}
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder("RuntimeMetric{");
+ sb.append("name='").append(name).append('\'');
+ sb.append(", unit=").append(unit);
+ sb.append(", count=").append(count.get());
+ sb.append(", sum=").append(sum.get());
+ sb.append(", min=").append(min.get());
+ sb.append(", max=").append(max.get());
+
+ // Check if we have percentile values (either from live tracking or JSON deserialization)
+ Long p90Value = this.p90;
+ Long p95Value = this.p95;
+ Long p99Value = this.p99;
+ boolean hasPercentileValues = (p90Value != null || p95Value != null || p99Value != null);
+
+ if (percentileTrackingEnabled) {
+ sb.append(", percentileTracking=enabled");
+ sb.append(", numBuckets=").append(numBuckets);
+ sb.append(", bucketWidth=").append(bucketWidth);
+ }
+ else if (hasPercentileValues) {
+ // Read-only percentile snapshot (from JSON deserialization)
+ sb.append(", percentileTracking=disabled(read-only snapshot)");
+ sb.append(", numBuckets=").append(numBuckets);
+ sb.append(", bucketWidth=").append(bucketWidth);
+ }
+ else {
+ sb.append(", percentileTracking=disabled");
+ }
+
+ // Show percentile values if available (either from live tracking or JSON)
+ if (hasPercentileValues) {
+ sb.append(", percentiles={");
+ if (p90Value != null) {
+ sb.append("p90=").append(p90Value);
+ }
+ if (p95Value != null) {
+ if (p90Value != null) {
+ sb.append(", ");
+ }
+ sb.append("p95=").append(p95Value);
+ }
+ if (p99Value != null) {
+ if (p90Value != null || p95Value != null) {
+ sb.append(", ");
+ }
+ sb.append("p99=").append(p99Value);
+ }
+ sb.append('}');
+ }
+
+ sb.append('}');
+ return sb.toString();
+ }
}
diff --git a/presto-common/src/main/java/com/facebook/presto/common/RuntimeStats.java b/presto-common/src/main/java/com/facebook/presto/common/RuntimeStats.java
index 2caa105c3d22b..1ebdf8ab79747 100644
--- a/presto-common/src/main/java/com/facebook/presto/common/RuntimeStats.java
+++ b/presto-common/src/main/java/com/facebook/presto/common/RuntimeStats.java
@@ -49,7 +49,7 @@ public RuntimeStats()
public RuntimeStats(Map metrics)
{
requireNonNull(metrics, "metrics is null");
- metrics.forEach((name, newMetric) -> this.metrics.computeIfAbsent(name, k -> new RuntimeMetric(name, newMetric.getUnit())).mergeWith(newMetric));
+ metrics.forEach((name, newMetric) -> this.metrics.put(name, RuntimeMetric.copyOf(newMetric)));
}
public static RuntimeStats copyOf(RuntimeStats stats)
@@ -95,6 +95,11 @@ public void addMetricValue(String name, RuntimeUnit unit, long value)
metrics.computeIfAbsent(name, k -> new RuntimeMetric(name, unit)).addValue(value);
}
+ public void addMetricValue(String name, RuntimeUnit unit, long value, boolean trackPercentiles)
+ {
+ metrics.computeIfAbsent(name, k -> new RuntimeMetric(name, unit, trackPercentiles)).addValue(value);
+ }
+
public void addMetricValueIgnoreZero(String name, RuntimeUnit unit, long value)
{
if (value == 0) {
@@ -108,7 +113,15 @@ public void addMetricValueIgnoreZero(String name, RuntimeUnit unit, long value)
*/
public void mergeMetric(String name, RuntimeMetric metric)
{
- metrics.computeIfAbsent(name, k -> new RuntimeMetric(name, metric.getUnit())).mergeWith(metric);
+ RuntimeMetric existing = metrics.get(name);
+ if (existing == null) {
+ // First time seeing this metric - create a copy to preserve configuration
+ metrics.put(name, RuntimeMetric.copyOf(metric, name));
+ }
+ else {
+ // Metric already exists - merge into it
+ existing.mergeWith(metric);
+ }
}
/**
@@ -119,7 +132,17 @@ public void mergeWith(RuntimeStats stats)
if (stats == null) {
return;
}
- stats.getMetrics().forEach((name, newMetric) -> metrics.computeIfAbsent(name, k -> new RuntimeMetric(name, newMetric.getUnit())).mergeWith(newMetric));
+ stats.getMetrics().forEach((name, newMetric) -> {
+ RuntimeMetric existing = metrics.get(name);
+ if (existing == null) {
+ // First time seeing this metric - create a copy to preserve configuration
+ metrics.put(name, RuntimeMetric.copyOf(newMetric, name));
+ }
+ else {
+ // Metric already exists - merge into it
+ existing.mergeWith(newMetric);
+ }
+ });
}
/**
@@ -131,7 +154,17 @@ public void update(RuntimeStats stats)
if (stats == null) {
return;
}
- stats.getMetrics().forEach((name, newMetric) -> metrics.computeIfAbsent(name, k -> new RuntimeMetric(name, newMetric.getUnit())).set(newMetric));
+ stats.getMetrics().forEach((name, newMetric) -> {
+ RuntimeMetric existing = metrics.get(name);
+ if (existing == null) {
+ // First time seeing this metric - copy it entirely to establish configuration
+ metrics.put(name, RuntimeMetric.copyOf(newMetric, name));
+ }
+ else {
+ // Metric exists - update values only (set() validates matching configurations)
+ existing.set(newMetric);
+ }
+ });
}
public V recordWallTime(String tag, Supplier supplier)
diff --git a/presto-common/src/test/java/com/facebook/presto/common/TestRuntimeMetric.java b/presto-common/src/test/java/com/facebook/presto/common/TestRuntimeMetric.java
index 7e72bfa4ab8cb..2c816edc3f5b7 100644
--- a/presto-common/src/test/java/com/facebook/presto/common/TestRuntimeMetric.java
+++ b/presto-common/src/test/java/com/facebook/presto/common/TestRuntimeMetric.java
@@ -20,6 +20,8 @@
import static com.facebook.presto.common.RuntimeUnit.NANO;
import static com.facebook.presto.common.RuntimeUnit.NONE;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
public class TestRuntimeMetric
{
@@ -138,4 +140,688 @@ public void testJsonWhenUnitIsUnavailable()
assertRuntimeMetricEquals(actual, metric1);
assertRuntimeMetricEquals(actual, metric2);
}
+
+ @Test
+ public void testPercentilesDisabledByDefault()
+ {
+ RuntimeMetric metric = new RuntimeMetric(TEST_METRIC_NAME, NANO);
+ metric.addValue(100);
+ metric.addValue(200);
+ metric.addValue(300);
+
+ assertEquals(metric.isPercentileTrackingEnabled(), false);
+ assertEquals(metric.getP90(), null);
+ assertEquals(metric.getP95(), null);
+ assertEquals(metric.getP99(), null);
+ }
+
+ @Test
+ public void testPercentilesWithTracking()
+ {
+ RuntimeMetric metric = new RuntimeMetric(TEST_METRIC_NAME, NANO, true);
+
+ // Add 100 values from 0 to 99ms in nanoseconds
+ for (int i = 0; i < 100; i++) {
+ metric.addValue(i * 1_000_000); // Convert to nanoseconds
+ }
+
+ assertEquals(metric.isPercentileTrackingEnabled(), true);
+ assertEquals(metric.getCount(), 100);
+ assertEquals(metric.getSum(), 4_950_000_000L); // Sum in nanoseconds
+
+ // Verify percentiles are computed
+ Long p90 = metric.getP90();
+ Long p95 = metric.getP95();
+ Long p99 = metric.getP99();
+
+ assertEquals(p90 != null, true);
+ assertEquals(p95 != null, true);
+ assertEquals(p99 != null, true);
+
+ // Verify percentile values are in reasonable range (approximate due to bucketing)
+ assertEquals(p90 >= 88_000_000 && p90 <= 92_000_000, true);
+ assertEquals(p95 >= 93_000_000 && p95 <= 97_000_000, true);
+ assertEquals(p99 >= 97_000_000 && p99 <= 100_000_000, true);
+ }
+
+ @Test
+ public void testPercentileJsonSerialization()
+ {
+ RuntimeMetric metric = new RuntimeMetric(TEST_METRIC_NAME, NANO, true);
+
+ // Add values with percentile tracking (in nanoseconds)
+ for (int i = 0; i < 50; i++) {
+ metric.addValue(i * 1_000_000);
+ }
+
+ // Serialize to JSON
+ JsonCodec codec = JsonCodec.jsonCodec(RuntimeMetric.class);
+ String json = codec.toJson(metric);
+
+ // Verify JSON contains percentile values
+ assertEquals(json.contains("\"p90\""), true);
+ assertEquals(json.contains("\"p95\""), true);
+ assertEquals(json.contains("\"p99\""), true);
+
+ // Deserialize and verify percentiles are preserved
+ RuntimeMetric deserialized = codec.fromJson(json);
+ assertEquals(deserialized.isPercentileTrackingEnabled(), false);
+ assertEquals(deserialized.getP90(), metric.getP90());
+ assertEquals(deserialized.getP95(), metric.getP95());
+ assertEquals(deserialized.getP99(), metric.getP99());
+ }
+
+ @Test
+ public void testPercentileCopy()
+ {
+ RuntimeMetric metric = new RuntimeMetric(TEST_METRIC_NAME, NANO, true);
+
+ // Add values with percentile tracking (in nanoseconds)
+ for (int i = 0; i < 50; i++) {
+ metric.addValue(i * 1_000_000);
+ }
+
+ RuntimeMetric copy = RuntimeMetric.copyOf(metric);
+
+ // Verify percentiles are copied
+ assertEquals(copy.isPercentileTrackingEnabled(), true);
+ assertEquals(copy.getP90(), metric.getP90());
+ assertEquals(copy.getP95(), metric.getP95());
+ assertEquals(copy.getP99(), metric.getP99());
+
+ // Verify copy is independent
+ metric.addValue(100_000_000);
+ assertEquals(copy.getCount() != metric.getCount(), true);
+ }
+
+ @Test
+ public void testPercentileMerge()
+ {
+ RuntimeMetric metric1 = new RuntimeMetric(TEST_METRIC_NAME, NANO, true);
+ RuntimeMetric metric2 = new RuntimeMetric(TEST_METRIC_NAME, NANO, true);
+
+ // Add values with percentile tracking to both metrics (in nanoseconds)
+ for (int i = 0; i < 50; i++) {
+ metric1.addValue(i * 1_000_000);
+ }
+ for (int i = 50; i < 100; i++) {
+ metric2.addValue(i * 1_000_000);
+ }
+
+ RuntimeMetric merged = RuntimeMetric.merge(metric1, metric2);
+
+ // Verify merged metrics have correct count
+ assertEquals(merged.getCount(), 100);
+ assertEquals(merged.isPercentileTrackingEnabled(), true);
+
+ // Verify percentiles are recomputed after merge
+ Long p90 = merged.getP90();
+ assertEquals(p90 != null, true);
+ assertEquals(p90 >= 88_000_000 && p90 <= 92_000_000, true);
+ }
+
+ @Test
+ public void testPercentileMergeWith()
+ {
+ RuntimeMetric metric1 = new RuntimeMetric(TEST_METRIC_NAME, NANO, true);
+ RuntimeMetric metric2 = new RuntimeMetric(TEST_METRIC_NAME, NANO, true);
+
+ // Add values with percentile tracking (in nanoseconds)
+ for (int i = 0; i < 50; i++) {
+ metric1.addValue(i * 1_000_000);
+ }
+ for (int i = 50; i < 100; i++) {
+ metric2.addValue(i * 1_000_000);
+ }
+
+ metric1.mergeWith(metric2);
+
+ // Verify merged count
+ assertEquals(metric1.getCount(), 100);
+ assertEquals(metric1.isPercentileTrackingEnabled(), true);
+
+ // Verify percentiles are available after merge
+ assertEquals(metric1.getP90() != null, true);
+ assertEquals(metric1.getP95() != null, true);
+ assertEquals(metric1.getP99() != null, true);
+ }
+
+ @Test
+ public void testByteUnitWithDefaultBucketWidth()
+ {
+ // BYTE unit defaults to 1KB (1024 bytes) bucket width
+ RuntimeMetric metric = new RuntimeMetric("data_size", RuntimeUnit.BYTE, true);
+
+ // Add 100 values from 0 to 99KB (in bytes)
+ for (int i = 0; i < 100; i++) {
+ metric.addValue(i * 1024);
+ }
+
+ assertEquals(metric.getCount(), 100);
+ assertEquals(metric.getMin(), 0);
+ assertEquals(metric.getMax(), 99 * 1024);
+ assertTrue(metric.isPercentileTrackingEnabled());
+
+ // With 1KB buckets, precision should be ±512 bytes
+ Long p90 = metric.getP90();
+ assertTrue(p90 != null);
+ assertTrue(p90 >= 88 * 1024 && p90 <= 92 * 1024, "p90 was " + p90 + " bytes");
+
+ Long p95 = metric.getP95();
+ assertTrue(p95 != null);
+ assertTrue(p95 >= 93 * 1024 && p95 <= 97 * 1024, "p95 was " + p95 + " bytes");
+ }
+
+ @Test
+ public void testByteUnitWithCustomBucketWidth()
+ {
+ // Custom: 100 bytes per bucket for small file sizes
+ RuntimeMetric metric = new RuntimeMetric("small_file_size", RuntimeUnit.BYTE, true, 100);
+
+ // Add 100 values from 0 to 9900 bytes (0-9.9KB)
+ for (int i = 0; i < 100; i++) {
+ metric.addValue(i * 100);
+ }
+
+ assertEquals(metric.getCount(), 100);
+ assertEquals(metric.getMin(), 0);
+ assertEquals(metric.getMax(), 9900);
+ assertTrue(metric.isPercentileTrackingEnabled());
+
+ // With 100 byte buckets, precision should be ±50 bytes
+ Long p90 = metric.getP90();
+ assertTrue(p90 != null);
+ assertTrue(p90 >= 8850 && p90 <= 9050, "p90 was " + p90 + " bytes (expected ~8900-9000 with ±50 bytes precision)");
+
+ Long p99 = metric.getP99();
+ assertTrue(p99 != null);
+ assertTrue(p99 >= 9750 && p99 <= 10000, "p99 was " + p99 + " bytes");
+ }
+
+ @Test
+ public void testNoneUnitWithDefaultBucketWidth()
+ {
+ // NONE unit defaults to 1000 per bucket
+ RuntimeMetric metric = new RuntimeMetric("row_count", RuntimeUnit.NONE, true);
+
+ // Add 100 values from 0 to 99,000
+ for (int i = 0; i < 100; i++) {
+ metric.addValue(i * 1000);
+ }
+
+ assertEquals(metric.getCount(), 100);
+ assertEquals(metric.getMin(), 0);
+ assertEquals(metric.getMax(), 99000);
+ assertTrue(metric.isPercentileTrackingEnabled());
+
+ // With 1000 unit buckets, precision should be ±500 units
+ Long p90 = metric.getP90();
+ assertTrue(p90 != null);
+ assertTrue(p90 >= 88000 && p90 <= 92000, "p90 was " + p90);
+
+ Long p95 = metric.getP95();
+ assertTrue(p95 != null);
+ assertTrue(p95 >= 93000 && p95 <= 97000, "p95 was " + p95);
+ }
+
+ @Test
+ public void testNoneUnitWithCustomBucketWidth()
+ {
+ // Custom: 10,000 per bucket for large row counts
+ RuntimeMetric metric = new RuntimeMetric("large_row_count", RuntimeUnit.NONE, true, 10000);
+
+ // Add 100 values from 0 to 990,000
+ for (int i = 0; i < 100; i++) {
+ metric.addValue(i * 10000);
+ }
+
+ assertEquals(metric.getCount(), 100);
+ assertEquals(metric.getMin(), 0);
+ assertEquals(metric.getMax(), 990000);
+ assertTrue(metric.isPercentileTrackingEnabled());
+
+ // With 10,000 unit buckets, precision should be ±5,000 units
+ Long p90 = metric.getP90();
+ assertTrue(p90 != null);
+ assertTrue(p90 >= 885000 && p90 <= 905000, "p90 was " + p90 + " (expected ~890k-900k with ±5k precision)");
+
+ Long p99 = metric.getP99();
+ assertTrue(p99 != null);
+ assertTrue(p99 >= 975000 && p99 <= 1000000, "p99 was " + p99);
+ }
+
+ @Test
+ public void testLargeByteCountsWithCustomBucketWidth()
+ {
+ // Custom: 1MB (1048576 bytes) per bucket for large data sizes
+ RuntimeMetric metric = new RuntimeMetric("large_data_transfer", RuntimeUnit.BYTE, true, 1048576);
+
+ // Add 100 values from 0 to 99MB (in bytes)
+ for (int i = 0; i < 100; i++) {
+ metric.addValue(i * 1048576L);
+ }
+
+ assertEquals(metric.getCount(), 100);
+ assertEquals(metric.getMin(), 0);
+ assertEquals(metric.getMax(), 99L * 1048576);
+ assertTrue(metric.isPercentileTrackingEnabled());
+
+ // With 1MB buckets, precision should be ±0.5MB
+ Long p90 = metric.getP90();
+ assertTrue(p90 != null);
+ long expectedP90 = 89L * 1048576; // ~89MB
+ assertTrue(Math.abs(p90 - expectedP90) <= 2 * 1048576, "p90 was " + p90 + " bytes (expected ~" + expectedP90 + " ±2MB)");
+ }
+
+ @Test
+ public void testSmallRowCountsWithCustomBucketWidth()
+ {
+ // Custom: 10 per bucket for small row counts
+ RuntimeMetric metric = new RuntimeMetric("small_batch_size", RuntimeUnit.NONE, true, 10);
+
+ // Add 100 values from 0 to 990
+ for (int i = 0; i < 100; i++) {
+ metric.addValue(i * 10);
+ }
+
+ assertEquals(metric.getCount(), 100);
+ assertEquals(metric.getMin(), 0);
+ assertEquals(metric.getMax(), 990);
+ assertTrue(metric.isPercentileTrackingEnabled());
+
+ // With 10 unit buckets, precision should be ±5 units
+ Long p90 = metric.getP90();
+ assertTrue(p90 != null);
+ assertTrue(p90 >= 885 && p90 <= 905, "p90 was " + p90 + " (expected ~890-900 with ±5 precision)");
+ }
+
+ @Test
+ public void testDifferentUnitsComparison()
+ {
+ // Create metrics with different units, all tracking percentiles
+ RuntimeMetric nanoMetric = new RuntimeMetric("latency", NANO, true);
+ RuntimeMetric byteMetric = new RuntimeMetric("data_size", RuntimeUnit.BYTE, true);
+ RuntimeMetric noneMetric = new RuntimeMetric("row_count", RuntimeUnit.NONE, true);
+
+ // Add 100 values to each
+ for (int i = 0; i < 100; i++) {
+ nanoMetric.addValue(i * 1_000_000); // 0-99ms in nanoseconds
+ byteMetric.addValue(i * 1024); // 0-99KB in bytes
+ noneMetric.addValue(i * 1000); // 0-99k in count
+ }
+
+ // All should have percentile tracking enabled
+ assertTrue(nanoMetric.isPercentileTrackingEnabled());
+ assertTrue(byteMetric.isPercentileTrackingEnabled());
+ assertTrue(noneMetric.isPercentileTrackingEnabled());
+
+ // All should have valid percentiles
+ assertTrue(nanoMetric.getP90() != null);
+ assertTrue(byteMetric.getP90() != null);
+ assertTrue(noneMetric.getP90() != null);
+
+ // Verify each is in appropriate range for its unit
+ Long nanoP90 = nanoMetric.getP90();
+ assertTrue(nanoP90 >= 88_000_000 && nanoP90 <= 92_000_000, "nano p90 was " + nanoP90);
+
+ Long byteP90 = byteMetric.getP90();
+ assertTrue(byteP90 >= 88 * 1024 && byteP90 <= 92 * 1024, "byte p90 was " + byteP90);
+
+ Long noneP90 = noneMetric.getP90();
+ assertTrue(noneP90 >= 88000 && noneP90 <= 92000, "none p90 was " + noneP90);
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class,
+ expectedExceptionsMessageRegExp = ".*must have the same unit type.*")
+ public void testMergeMetricsWithDifferentUnits()
+ {
+ // Create two metrics with different units
+ RuntimeMetric m1 = new RuntimeMetric("metric1", NANO, true);
+ RuntimeMetric m2 = new RuntimeMetric("metric2", RuntimeUnit.BYTE, true);
+
+ // Add values to both
+ for (int i = 0; i < 50; i++) {
+ m1.addValue(i * 1_000_000);
+ m2.addValue(i * 1024);
+ }
+
+ // This should throw IllegalStateException due to different units
+ m1.mergeWith(m2);
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class,
+ expectedExceptionsMessageRegExp = ".*must have the same bucket width.*")
+ public void testMergeMetricsWithDifferentBucketWidths()
+ {
+ // Create two metrics with same unit but different bucket widths
+ RuntimeMetric m1 = new RuntimeMetric("latency", NANO, true, 1_000_000); // 1ms buckets
+ RuntimeMetric m2 = new RuntimeMetric("latency", NANO, true, 10_000_000); // 10ms buckets
+
+ // Add values to both
+ for (int i = 0; i < 50; i++) {
+ m1.addValue(i * 1_000_000);
+ m2.addValue(i * 1_000_000);
+ }
+
+ // This should throw IllegalStateException due to different bucket widths
+ m1.mergeWith(m2);
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class,
+ expectedExceptionsMessageRegExp = ".*must have the same bucket number.*")
+ public void testMergeMetricsWithDifferentNumBuckets()
+ {
+ // Create two metrics with same unit and bucket width but different number of buckets
+ RuntimeMetric m1 = new RuntimeMetric("latency", NANO, true, 1_000_000, 1000); // 1000 buckets
+ RuntimeMetric m2 = new RuntimeMetric("latency", NANO, true, 1_000_000, 500); // 500 buckets
+
+ // Add values to both
+ for (int i = 0; i < 50; i++) {
+ m1.addValue(i * 1_000_000);
+ m2.addValue(i * 1_000_000);
+ }
+
+ // This should throw IllegalStateException due to different number of buckets
+ m1.mergeWith(m2);
+ }
+
+ @Test
+ public void testMergeMetricsWithSameBucketWidth()
+ {
+ // Create two metrics with same unit AND same custom bucket width
+ RuntimeMetric m1 = new RuntimeMetric("latency", NANO, true, 100_000); // 100μs buckets
+ RuntimeMetric m2 = new RuntimeMetric("latency", NANO, true, 100_000); // 100μs buckets
+
+ // Add values to both
+ for (int i = 0; i < 50; i++) {
+ m1.addValue(i * 100_000);
+ }
+ for (int i = 50; i < 100; i++) {
+ m2.addValue(i * 100_000);
+ }
+
+ // This should succeed
+ RuntimeMetric merged = RuntimeMetric.merge(m1, m2);
+
+ assertEquals(merged.getCount(), 100);
+ assertTrue(merged.isPercentileTrackingEnabled());
+
+ Long p90 = merged.getP90();
+ assertTrue(p90 != null);
+ assertTrue(p90 >= 8_850_000 && p90 <= 9_050_000, "merged p90 was " + p90);
+ }
+
+ @Test
+ public void testMergeWithoutPercentileTracking()
+ {
+ // If neither metric has percentile tracking, merge should work regardless of bucket width
+ RuntimeMetric m1 = new RuntimeMetric("latency", NANO, false, 1_000_000);
+ RuntimeMetric m2 = new RuntimeMetric("latency", NANO, false, 10_000_000);
+
+ for (int i = 0; i < 50; i++) {
+ m1.addValue(i * 1_000_000);
+ m2.addValue(i * 1_000_000);
+ }
+
+ // This should succeed since no histogram data to merge
+ m1.mergeWith(m2);
+
+ assertEquals(m1.getCount(), 100);
+ assertFalse(m1.isPercentileTrackingEnabled());
+ }
+
+ @Test
+ public void testMergeWithOnlyOneHavingPercentiles()
+ {
+ // If only one metric has percentile tracking, merge should work
+ RuntimeMetric m1 = new RuntimeMetric("latency", NANO, true, 1_000_000);
+ RuntimeMetric m2 = new RuntimeMetric("latency", NANO, false); // No percentile tracking
+
+ for (int i = 0; i < 50; i++) {
+ m1.addValue(i * 1_000_000);
+ m2.addValue(i * 1_000_000);
+ }
+
+ // This should succeed - only basic stats are merged from m2
+ m1.mergeWith(m2);
+
+ assertEquals(m1.getCount(), 100);
+ assertTrue(m1.isPercentileTrackingEnabled());
+ // Percentiles are only from m1's data
+ assertTrue(m1.getP90() != null);
+ }
+
+ @Test
+ public void testSingleValueMetric()
+ {
+ RuntimeMetric metric = new RuntimeMetric("latency", NANO, true);
+ metric.addValue(5_000_000); // 5ms
+
+ assertEquals(metric.getCount(), 1);
+ assertEquals(metric.getMin(), 5_000_000);
+ assertEquals(metric.getMax(), 5_000_000);
+
+ // All percentiles should return the same value (bucket midpoint)
+ Long p90 = metric.getP90();
+ Long p95 = metric.getP95();
+ Long p99 = metric.getP99();
+
+ assertTrue(p90 != null);
+ assertTrue(p95 != null);
+ assertTrue(p99 != null);
+
+ // All should be close to the single value (within bucket width)
+ assertTrue(Math.abs(p90 - 5_000_000) <= 1_000_000, "p90 was " + p90);
+ assertTrue(Math.abs(p95 - 5_000_000) <= 1_000_000, "p95 was " + p95);
+ assertTrue(Math.abs(p99 - 5_000_000) <= 1_000_000, "p99 was " + p99);
+ }
+
+ @Test
+ public void testAllValuesInSameBucket()
+ {
+ RuntimeMetric metric = new RuntimeMetric("latency", NANO, true);
+
+ // Add 100 values all in the same 1ms bucket (500-600μs)
+ for (int i = 0; i < 100; i++) {
+ metric.addValue(500_000 + i * 1000); // 500μs to 599μs
+ }
+
+ assertEquals(metric.getCount(), 100);
+
+ // All percentiles should return the same bucket midpoint
+ Long p90 = metric.getP90();
+ Long p95 = metric.getP95();
+ Long p99 = metric.getP99();
+
+ // All should be the midpoint of bucket 0 (0-1ms) = 500μs
+ assertEquals(p90, 500_000L);
+ assertEquals(p95, 500_000L);
+ assertEquals(p99, 500_000L);
+ }
+
+ @Test
+ public void testNegativeValues()
+ {
+ RuntimeMetric metric = new RuntimeMetric("test", NANO, true);
+
+ // Add some negative values (edge case, shouldn't happen in practice but should be handled)
+ metric.addValue(-1_000_000);
+ metric.addValue(-500_000);
+ metric.addValue(1_000_000);
+ metric.addValue(2_000_000);
+
+ assertEquals(metric.getCount(), 4);
+ assertEquals(metric.getMin(), -1_000_000);
+
+ // Negative values go to bucket 0, so percentiles should still work
+ Long p90 = metric.getP90();
+ assertTrue(p90 != null);
+ }
+
+ @Test
+ public void testOverflowBucketWithManyOutliers()
+ {
+ // Default NANO config: 1000 buckets × 1ms = 1 second max
+ RuntimeMetric metric = new RuntimeMetric("latency", NANO, true);
+
+ // Add 90 values in normal range (0-880ms)
+ for (int i = 0; i < 90; i++) {
+ metric.addValue(i * 10_000_000); // 0, 10ms, 20ms, ..., 890ms (last value is 89 * 10ms = 890ms)
+ }
+
+ // Add 10 outliers beyond range (5-5.009 seconds)
+ for (int i = 0; i < 10; i++) {
+ metric.addValue((5000000 + i * 1_000) * 1_000L); // 5000ms, 5001ms, ..., 5009ms (5-5.009 seconds)
+ }
+
+ assertEquals(metric.getCount(), 100);
+ assertEquals(metric.getMax(), 5_009_000_000L); // 5.009 seconds
+
+ // p90 should still be in normal range since 90% of values are < 1s
+ Long p90 = metric.getP90();
+ assertTrue(p90 != null);
+ assertTrue(p90 < 1_000_000_000, "p90 was " + p90 + " (should be < 1 second)");
+
+ // p99 will include outliers (99th value is one of the outliers)
+ Long p99 = metric.getP99();
+ assertTrue(p99 != null);
+ // p99 will be in the overflow bucket, returning max value
+ assertTrue(p99 >= 900_000_000, "p99 was " + p99);
+ }
+
+ @Test
+ public void testVeryLargeValues()
+ {
+ RuntimeMetric metric = new RuntimeMetric("data_size", RuntimeUnit.BYTE, true);
+
+ // Add values that will overflow the default range (1000 buckets × 1KB = 1MB)
+ metric.addValue(100_000_000); // 100MB
+ metric.addValue(200_000_000); // 200MB
+ metric.addValue(300_000_000); // 300MB
+
+ assertEquals(metric.getCount(), 3);
+ assertEquals(metric.getMax(), 300_000_000);
+
+ // All values overflow, so all go to bucket 999
+ Long p90 = metric.getP90();
+ assertTrue(p90 != null);
+ // Should return the overflow bucket midpoint or max
+ assertTrue(p90 > 0);
+ }
+
+ @Test
+ public void testZeroValues()
+ {
+ RuntimeMetric metric = new RuntimeMetric("latency", NANO, true);
+
+ // Add all zeros
+ for (int i = 0; i < 100; i++) {
+ metric.addValue(0);
+ }
+
+ assertEquals(metric.getCount(), 100);
+ assertEquals(metric.getMin(), 0);
+ assertEquals(metric.getMax(), 0);
+
+ // All percentiles should be 0 (midpoint of bucket 0)
+ Long p90 = metric.getP90();
+ Long p95 = metric.getP95();
+ Long p99 = metric.getP99();
+
+ assertTrue(p90 != null);
+ assertTrue(p95 != null);
+ assertTrue(p99 != null);
+
+ // Should all return bucket 0 midpoint (500μs for 1ms bucket width)
+ assertTrue(p90 >= 0 && p90 <= 1_000_000, "p90 was " + p90);
+ assertTrue(p95 >= 0 && p95 <= 1_000_000, "p95 was " + p95);
+ assertTrue(p99 >= 0 && p99 <= 1_000_000, "p99 was " + p99);
+ }
+
+ @Test
+ public void testSmallCountPercentiles()
+ {
+ RuntimeMetric metric = new RuntimeMetric("latency", NANO, true);
+
+ // Add only 2 values
+ metric.addValue(1_000_000); // 1ms
+ metric.addValue(9_000_000); // 9ms
+
+ assertEquals(metric.getCount(), 2);
+
+ // Percentiles should still work but may not be meaningful
+ Long p90 = metric.getP90();
+ Long p95 = metric.getP95();
+ Long p99 = metric.getP99();
+
+ assertTrue(p90 != null);
+ assertTrue(p95 != null);
+ assertTrue(p99 != null);
+
+ // All percentiles point to bucket 9 (9ms value)
+ // Bucket 9 midpoint = 9.5ms, but clamped to max = 9ms
+ assertEquals(p90, Long.valueOf(9_000_000), "p90 should be clamped to max (9ms)");
+ assertEquals(p95, Long.valueOf(9_000_000), "p95 should be clamped to max (9ms)");
+ assertEquals(p99, Long.valueOf(9_000_000), "p99 should be clamped to max (9ms)");
+ }
+
+ @Test
+ public void testCachedPercentilesAfterCopy()
+ {
+ RuntimeMetric original = new RuntimeMetric("latency", NANO, true);
+
+ // Add values and compute percentiles
+ for (int i = 0; i < 100; i++) {
+ original.addValue(i * 1_000_000);
+ }
+
+ Long originalP90 = original.getP90();
+
+ // Copy the metric
+ RuntimeMetric copy = RuntimeMetric.copyOf(original);
+
+ // Copy should have the same cached percentiles
+ assertEquals(copy.getP90(), originalP90);
+
+ // Add more data to original
+ for (int i = 100; i < 200; i++) {
+ original.addValue(i * 1_000_000);
+ }
+
+ // Original's cached percentile is now stale (but that's acceptable)
+ // It will still return the old cached value until invalidated
+ // Copy's percentiles should remain unchanged
+ assertEquals(copy.getP90(), originalP90);
+ }
+
+ @Test
+ public void testSkewedDistribution()
+ {
+ RuntimeMetric metric = new RuntimeMetric("latency", NANO, true);
+
+ // Add 95 fast values (0-94ms)
+ for (int i = 0; i < 95; i++) {
+ metric.addValue(i * 1_000_000);
+ }
+
+ // Add 5 very slow values (500-504ms)
+ for (int i = 500; i < 505; i++) {
+ metric.addValue(i * 1_000_000);
+ }
+
+ assertEquals(metric.getCount(), 100);
+
+ // p90 should still be in the fast range
+ Long p90 = metric.getP90();
+ assertTrue(p90 != null);
+ assertTrue(p90 >= 88_000_000 && p90 <= 92_000_000, "p90 was " + p90 + " (should be ~90ms)");
+
+ // p95 and p99 should reflect the slow outliers
+ Long p95 = metric.getP95();
+ Long p99 = metric.getP99();
+ assertTrue(p95 != null);
+ assertTrue(p99 != null);
+ assertTrue(p95 > 94_000_000, "p95 was " + p95 + " (should include slow values)");
+ assertTrue(p99 > 94_000_000, "p99 was " + p99 + " (should include slow values)");
+ }
}
diff --git a/presto-common/src/test/java/com/facebook/presto/common/TestRuntimeStats.java b/presto-common/src/test/java/com/facebook/presto/common/TestRuntimeStats.java
index 99cba97b6593b..370acf3533fe4 100644
--- a/presto-common/src/test/java/com/facebook/presto/common/TestRuntimeStats.java
+++ b/presto-common/src/test/java/com/facebook/presto/common/TestRuntimeStats.java
@@ -23,7 +23,9 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
public class TestRuntimeStats
{
@@ -268,4 +270,39 @@ public void testRecordWallAndCpuTime()
assertThat(stats.getMetric(TEST_METRIC_NAME_NANO_2).getSum()).isGreaterThanOrEqualTo(MILLISECONDS.toNanos(100));
assertThat(stats.getMetric(TEST_METRIC_NAME_NANO_2 + "OnCpu").getSum()).isLessThan(MILLISECONDS.toNanos(100));
}
+
+ @Test
+ public void testComputeAllPercentilesBeforeSerialization()
+ {
+ // Create a metric with percentile tracking
+ RuntimeMetric metricWithPercentiles = new RuntimeMetric("latency", RuntimeUnit.NANO, true);
+ for (int i = 0; i < 100; i++) {
+ metricWithPercentiles.addValue(i * 1_000_000); // 0ms, 1ms, 2ms, ..., 99ms
+ }
+
+ RuntimeStats runtimeStats = new RuntimeStats();
+ runtimeStats.mergeMetric("latency", metricWithPercentiles);
+
+ // Serialize to JSON - percentiles are computed lazily during serialization via getters
+ JsonCodec codec = JsonCodec.jsonCodec(RuntimeStats.class);
+ String json = codec.toJson(runtimeStats);
+
+ // The JSON should contain configuration and percentile values
+ assertTrue(json.contains("\"numBuckets\""));
+ assertTrue(json.contains("\"bucketWidth\""));
+ assertTrue(json.contains("\"p90\""));
+ assertTrue(json.contains("\"p95\""));
+ assertTrue(json.contains("\"p99\""));
+
+ // Verify the percentile values are reasonable after deserialization
+ RuntimeStats deserialized = codec.fromJson(json);
+ RuntimeMetric deserializedMetric = deserialized.getMetric("latency");
+ assertNotNull(deserializedMetric.getP90());
+ assertNotNull(deserializedMetric.getP95());
+ assertNotNull(deserializedMetric.getP99());
+
+ assertTrue(Math.abs(deserializedMetric.getP90() - 90_000_000) < 1_000_000); // within 1ms
+ assertTrue(Math.abs(deserializedMetric.getP95() - 95_000_000) < 1_000_000);
+ assertTrue(Math.abs(deserializedMetric.getP99() - 99_000_000) < 1_000_000);
+ }
}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/StageExecutionStateMachine.java b/presto-main-base/src/main/java/com/facebook/presto/execution/StageExecutionStateMachine.java
index ff6497f1eac57..95ff1bc4877da 100644
--- a/presto-main-base/src/main/java/com/facebook/presto/execution/StageExecutionStateMachine.java
+++ b/presto-main-base/src/main/java/com/facebook/presto/execution/StageExecutionStateMachine.java
@@ -437,7 +437,7 @@ public void recordTaskUpdateDeliveredTime(long nanos)
@Override
public void recordStartWaitForEventLoop(long nanos)
{
- runtimeStats.addMetricValue(TASK_START_WAIT_FOR_EVENT_LOOP, NANO, max(nanos, 0));
+ runtimeStats.addMetricValue(TASK_START_WAIT_FOR_EVENT_LOOP, NANO, max(nanos, 0), true);
}
public void recordDeliveredUpdates(int updates)