Skip to content

Commit

Permalink
Report container's CPU utilization, rather than node's.
Browse files Browse the repository at this point in the history
So far, we've been using node_exporter's metrics, which reported
metrics per node. We switch to using cadvisor's per container metrics,
which measure provider performance in isolation.

So far, we've been reporting CPU utilization as a percentage of total
CPU resources. 100% CPU utilization on an 8-CPU machine meant all
cores were fully utilized. We switch to reporting CPU utilization as a
percentage of one CPU. Full utilization of an 8-CPU machine is now
reported as 800%.

Signed-off-by: Cem Mergenci <[email protected]>
  • Loading branch information
mergenci committed Sep 6, 2023
1 parent 066f909 commit 7a82110
Showing 1 changed file with 151 additions and 7 deletions.
158 changes: 151 additions & 7 deletions cmd/perf/internal/quantify.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ func (o *QuantifyOptions) Run(_ *cobra.Command, _ []string) error {
log.Infof("\nExperiment Ended %v\n\n", o.endTime)
log.Infof("Results\n------------------------------------------------------------\n")
log.Infof("Experiment Duration: %f seconds\n", o.Duration().Seconds())

// Sleeping here allows Prometheus to scrape at least one more
// sample, which might be used in case there weren't enough samples
// collected during the experiment. See
// getInstantVectorQueryResultSampleByHandlingEmptyResults.
time.Sleep(60 * time.Second)

err := o.processPods(timeToReadinessResults)
Expand All @@ -136,7 +141,7 @@ func (o *QuantifyOptions) processPods(timeToReadinessResults []common.Result) er

for _, providerPod := range o.providerPods {
providerPod = strings.TrimSpace(providerPod)
queryResultMemory, err := o.CollectData(fmt.Sprintf(`sum(node_namespace_pod_container:container_memory_working_set_bytes{pod="%s", namespace="%s"})`,
queryResultMemory, err := o.CollectRangeQueryData(fmt.Sprintf(`sum(node_namespace_pod_container:container_memory_working_set_bytes{pod="%s", namespace="%s"})`,
providerPod, o.providerNamespace))
if err != nil {
return errors.Wrap(err, "cannot collect memory data")
Expand All @@ -151,14 +156,33 @@ func (o *QuantifyOptions) processPods(timeToReadinessResults []common.Result) er
aggregatedMemoryResult.Peak = memoryResult.Peak
}

queryResultCPURate, err := o.CollectData(fmt.Sprintf(`instance:node_cpu_utilisation:rate5m{instance="%s"} * 100`, o.nodeIP))
// Following matchers for image, job, and metrics_path are copied
// from
// node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate
// recording rule, definition of which can be found at
// /api/v1/rules endpoint of Prometheus instance, e.g.,
// http://localhost:9090/api/v1/rules.
labelMatcher := `image!="", job="kubelet", metrics_path="/metrics/cadvisor", container=~"provider-.*"`
avgCPURateQueryFormat := fmt.Sprintf(`100 * sum(rate(container_cpu_usage_seconds_total{%s}%s))`, labelMatcher, "[%.0fs] @ %d")
avgCPURate, err := o.getInstantVectorQueryResultSampleByHandlingEmptyResults(avgCPURateQueryFormat)
if err != nil {
return errors.Wrap(err, "cannot collect cpu data")
return errors.Wrap(err, "cannot get average cpu rate")
}
cpuRateResult, err := common.ConstructResult(queryResultCPURate, "CPU", "Rate", providerPod)

peakCPURateQueryFormat := fmt.Sprintf(`100 * max_over_time(irate(container_cpu_usage_seconds_total{%s}[5m])%s)`, labelMatcher, "[%.0fs:200ms] @ %d")
peakCPURate, err := o.getInstantVectorQueryResultSampleByHandlingEmptyResults(peakCPURateQueryFormat)
if err != nil {
return errors.Wrap(err, "cannot construct cpu results")
return errors.Wrap(err, "cannot get peak cpu rate ")
}

cpuRateResult := &common.Result{
Metric: "CPU",
MetricUnit: "Rate",
Average: float64(avgCPURate),
Peak: float64(peakCPURate),
PodName: providerPod,
}

// Update aggregated CPU rate result
aggregatedCPURateResult.Average += cpuRateResult.Average
if cpuRateResult.Peak > aggregatedCPURateResult.Peak {
Expand All @@ -180,8 +204,8 @@ func (o *QuantifyOptions) processPods(timeToReadinessResults []common.Result) er
return nil
}

// CollectData sends query and collect data by using the prometheus client
func (o *QuantifyOptions) CollectData(query string) (model.Value, error) {
// CollectRangeQueryData sends a range query and collects data by using the prometheus client
func (o *QuantifyOptions) CollectRangeQueryData(query string) (model.Value, error) {
client, err := common.ConstructPrometheusClient(o.address)
if err != nil {
return nil, errors.Wrap(err, "cannot construct prometheus client")
Expand All @@ -199,3 +223,123 @@ func (o *QuantifyOptions) CollectData(query string) (model.Value, error) {
}
return result, nil
}

// CollectInstantQueryData sends an instant query and collects data by using the prometheus client
func (o *QuantifyOptions) CollectInstantQueryData(query string) (model.Value, error) {
client, err := common.ConstructPrometheusClient(o.address)
if err != nil {
return nil, errors.Wrap(err, "cannot construct prometheus client")
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

log.Infof("Executing query: %s", query)
result, warnings, err := client.Query(ctx, query, time.Now())
if err != nil {
return nil, errors.Wrap(err, "cannot construct time range for metrics")
}
if len(warnings) > 0 {
log.Infof("Warnings: %v\n", warnings)
}

return result, nil
}

type emptyQueryResultError struct {
query string
}

func (e *emptyQueryResultError) Error() string {
var query string
if e != nil {
query = e.query
}

return fmt.Sprintf(`empty result returned from query: "%s"`, query)
}

// getInstantVectorQueryResultSample extracts the value from a
// queryResult that holds a single value. It returns an error in case
// queryResult is empty.
func getInstantVectorQueryResultSample(queryResult model.Value) (model.SampleValue, error) {
var zero model.SampleValue

queryResultVector, ok := queryResult.(model.Vector)
if !ok {
return zero, errors.New("type assertion to vector failed")
}

if queryResultVector.Len() == 0 {
return zero, &emptyQueryResultError{}
}

return queryResultVector[0].Value, nil
}

// getInstantVectorQueryResultSample sends a query, which is
// expected to return a single value inside an instant vector, and
// extracts resulting value.
func (o *QuantifyOptions) getInstantVectorQueryResultSample(query string) (model.SampleValue, error) {
var zero model.SampleValue

queryResult, err := o.CollectInstantQueryData(query)
if err != nil {
return zero, errors.Wrap(err, "cannot collect query data")
}

sampleValue, err := getInstantVectorQueryResultSample(queryResult)
if err != nil {
return zero, errors.Wrap(err, "cannot process query result")
}

return sampleValue, nil
}

// getInstantVectorQueryResultSampleByHandlingEmptyResults prepares
// and executes a query, built from queryFormat. queryFormat should
// contain two format verbs: one for a float durationn and one for
// integer timestamp, respectively, values of which are calculated
// from o.
//
// If a query returns empty result, it is likely to be because the
// queried interval doesn't contain enough samples. Query interval is
// expanded, up to a limit, to get a non-empty result. Doing so
// reduces precision of the query, but having an imprecise result is
// better than having no results at all.
func (o *QuantifyOptions) getInstantVectorQueryResultSampleByHandlingEmptyResults(queryFormat string) (model.SampleValue, error) {
var result model.SampleValue
var errEmptyQueryResult *emptyQueryResultError

// Each empty query result causes query duration to be increased by
// durationIncrementStepSeconds, so that query duration is more
// likely to include samples.
durationIncrementStepSeconds := 30
i := 0
maxTries := 10
for ; i < maxTries; i++ {
// Query interval is expanded from both ends, beginning and end.
time := o.endTime.Add(time.Duration(i*durationIncrementStepSeconds/2) * time.Second).Unix()
duration := o.Duration().Seconds() + float64(i*durationIncrementStepSeconds)
query := fmt.Sprintf(queryFormat, duration, time)

var err error
result, err = o.getInstantVectorQueryResultSample(query)
if err == nil {
break
}

errEmptyQueryResult = nil
if errors.As(err, &errEmptyQueryResult) {
errEmptyQueryResult.query = query
log.Warnf("Got empty query result. Retrying with a wider query interval.")
continue
}

return result, errors.Wrapf(err, `cannot get result for query: "%s"`, query)
}

if i == maxTries {
return result, errEmptyQueryResult
}
return result, nil
}

0 comments on commit 7a82110

Please sign in to comment.