diff --git a/pkg/autoscaler/autoscaler/metric_collector.go b/pkg/autoscaler/autoscaler/metric_collector.go index 169c708c7..df04e47e1 100644 --- a/pkg/autoscaler/autoscaler/metric_collector.go +++ b/pkg/autoscaler/autoscaler/metric_collector.go @@ -62,8 +62,10 @@ func NewMetricCollector(target *v1alpha1.Target, binding *v1alpha1.AutoscalingPo } type HistogramInfo struct { - PodStartTime *metav1.Time - HistogramMap map[string]*histogram.Snapshot + PodStartTime *metav1.Time + HistogramMap map[string]*histogram.Snapshot + CounterMap map[string]float64 + ScrapeTimestamp int64 } type Scope struct { @@ -130,7 +132,7 @@ func (collector *MetricCollector) UpdateMetrics(ctx context.Context, podLister l func (collector *MetricCollector) fetchMetricsFromPods(ctx context.Context, pods []*corev1.Pod, currentHistograms *map[string]HistogramInfo) InstanceInfo { instanceInfo := InstanceInfo{true, false, make(algorithm.Metrics)} - pastHistograms, ok := collector.PastHistograms.GetLastUnfreshSnapshot() + pastHistograms, _, ok := collector.PastHistograms.GetLastUnfreshSnapshotWithTimestamp() if !ok { pastHistograms = make(map[string]HistogramInfo) } @@ -142,13 +144,24 @@ func (collector *MetricCollector) fetchMetricsFromPods(ctx context.Context, pods pastValue, ok := pastHistograms[pod.Name] var pastHistogramMap map[string]*histogram.Snapshot + var pastCounterMap map[string]float64 + var pastScrapeTimestamp int64 if !ok || pod.Status.StartTime == nil || pastValue.PodStartTime == nil || !pod.Status.StartTime.Equal(pastValue.PodStartTime) { pastHistogramMap = make(map[string]*histogram.Snapshot) + pastCounterMap = make(map[string]float64) + pastScrapeTimestamp = 0 } else { pastHistogramMap = pastValue.HistogramMap + if pastValue.CounterMap == nil { + pastCounterMap = make(map[string]float64) + } else { + pastCounterMap = pastValue.CounterMap + } + pastScrapeTimestamp = pastValue.ScrapeTimestamp } currentHistogramMap := make(map[string]*histogram.Snapshot) + currentCounterMap := make(map[string]float64) ip := pod.Status.PodIP podCtx, cancel := context.WithTimeout(ctx, util.AutoscaleCtxTimeoutSeconds*time.Second) defer cancel() @@ -172,17 +185,19 @@ func (collector *MetricCollector) fetchMetricsFromPods(ctx context.Context, pods return } result := string(bodyStr) - collector.processPrometheusString(result, pastHistogramMap, currentHistogramMap, instanceInfo.MetricsMap) + collector.processPrometheusString(result, pastHistogramMap, pastCounterMap, currentHistogramMap, currentCounterMap, pastScrapeTimestamp, instanceInfo.MetricsMap) (*currentHistograms)[pod.Name] = HistogramInfo{ - PodStartTime: pod.Status.StartTime, - HistogramMap: currentHistogramMap, + PodStartTime: pod.Status.StartTime, + HistogramMap: currentHistogramMap, + CounterMap: currentCounterMap, + ScrapeTimestamp: util.GetCurrentTimestamp(), } }() } return instanceInfo } -func (collector *MetricCollector) processPrometheusString(metricStr string, pastHistograms map[string]*histogram.Snapshot, currentHistograms map[string]*histogram.Snapshot, instanceMetricMap algorithm.Metrics) { +func (collector *MetricCollector) processPrometheusString(metricStr string, pastHistograms map[string]*histogram.Snapshot, pastCounters map[string]float64, currentHistograms map[string]*histogram.Snapshot, currentCounters map[string]float64, pastScrapeTimestamp int64, instanceMetricMap algorithm.Metrics) { reader := strings.NewReader(metricStr) decoder := expfmt.NewDecoder(reader, expfmt.NewFormat(expfmt.TypeTextPlain)) for { @@ -208,7 +223,24 @@ func (collector *MetricCollector) processPrometheusString(metricStr string, past metric := mf.Metric[0] switch mf.GetType() { case io_prometheus_client.MetricType_COUNTER: - addMetric(instanceMetricMap, mf.GetName(), metric.GetCounter().GetValue()) + cur := metric.GetCounter().GetValue() + currentCounters[mf.GetName()] = cur + + rate := 0.0 + now := util.GetCurrentTimestamp() + if pastCounters != nil && pastScrapeTimestamp > 0 { + if prev, ok := pastCounters[mf.GetName()]; ok { + if cur < prev { + rate = 0 + } else { + elapsedSec := float64(now-pastScrapeTimestamp) / 1000.0 + if elapsedSec > 0 { + rate = (cur - prev) / elapsedSec + } + } + } + } + addMetric(instanceMetricMap, mf.GetName(), rate) case io_prometheus_client.MetricType_GAUGE: addMetric(instanceMetricMap, mf.GetName(), metric.GetGauge().GetValue()) case io_prometheus_client.MetricType_HISTOGRAM: diff --git a/pkg/autoscaler/datastructure/sliding_window.go b/pkg/autoscaler/datastructure/sliding_window.go index 0efeacbcd..aa627596f 100644 --- a/pkg/autoscaler/datastructure/sliding_window.go +++ b/pkg/autoscaler/datastructure/sliding_window.go @@ -234,3 +234,19 @@ func (window *SnapshotSlidingWindow[T]) GetLastUnfreshSnapshot() (value T, ok bo } return front.value, true } + +func (window *SnapshotSlidingWindow[T]) GetLastUnfreshSnapshotWithTimestamp() (value T, timestamp int64, ok bool) { + if window.freshMilliseconds == 0 { + return value, 0, false + } + currentTimestamp := window.getCurrentTimestamp() + window.expire(currentTimestamp) + if window.pool.Len() == 0 { + return value, 0, false + } + front := window.pool.Front() + if isFresh(window.freshMilliseconds, currentTimestamp, front.timestamp) { + return value, 0, false + } + return front.value, front.timestamp, true +}