Improve CPU performance measurement precision #132

Open · wants to merge 4 commits into base: main
2 changes: 1 addition & 1 deletion cmd/perf/internal/common/common.go
@@ -73,7 +73,7 @@ func ConstructResult(value model.Value, metric, unit string, podName string) (*R
}
matrix, ok := value.(model.Matrix)
if !ok {
return nil, errors.New("model cannot cast to matrix")
return nil, errors.New("type assertion to matrix failed")
}

for _, m := range matrix {
167 changes: 158 additions & 9 deletions cmd/perf/internal/quantify.go
@@ -48,6 +48,11 @@ type QuantifyOptions struct {
timeout time.Duration
}

// Duration returns the time difference from startTime to endTime.
func (o *QuantifyOptions) Duration() time.Duration {
return o.endTime.Sub(o.startTime)
}

// NewCmdQuantify creates a cobra command
func NewCmdQuantify() *cobra.Command {
o := QuantifyOptions{}
@@ -56,7 +61,7 @@ func NewCmdQuantify() *cobra.Command {
Short: "This tool collects CPU & Memory Utilization and time to readiness of MRs metrics of providers and " +
"reports them. When you execute this tool an end-to-end experiment will run.",
Example: "provider-scale --mrs ./internal/providerScale/manifests/virtualnetwork.yaml=2 " +
"--mrs https:... OR ./internal/providerScale/manifests/loadbalancer.yaml=2" +
"--mrs https:... OR ./internal/providerScale/manifests/loadbalancer.yaml=2 " +
"--provider-pods crossplane-provider-jet-azure " +
"--provider-namespace crossplane-system",
RunE: o.Run,
@@ -113,7 +118,12 @@ func (o *QuantifyOptions) Run(_ *cobra.Command, _ []string) error {
o.endTime = time.Now()
log.Infof("\nExperiment Ended %v\n\n", o.endTime)
log.Infof("Results\n------------------------------------------------------------\n")
log.Infof("Experiment Duration: %f seconds\n", o.endTime.Sub(o.startTime).Seconds())
log.Infof("Experiment Duration: %f seconds\n", o.Duration().Seconds())

// Sleeping here allows Prometheus to scrape at least one more
// sample, which might be used in case there weren't enough samples
// collected during the experiment. See
// getInstantVectorQueryResultSampleByHandlingEmptyResults.
time.Sleep(60 * time.Second)

err := o.processPods(timeToReadinessResults)
@@ -131,7 +141,7 @@ func (o *QuantifyOptions) processPods(timeToReadinessResults []common.Result) er

for _, providerPod := range o.providerPods {
providerPod = strings.TrimSpace(providerPod)
queryResultMemory, err := o.CollectData(fmt.Sprintf(`sum(node_namespace_pod_container:container_memory_working_set_bytes{pod="%s", namespace="%s"})`,
queryResultMemory, err := o.CollectRangeQueryData(fmt.Sprintf(`sum(node_namespace_pod_container:container_memory_working_set_bytes{pod="%s", namespace="%s"})`,
providerPod, o.providerNamespace))
if err != nil {
return errors.Wrap(err, "cannot collect memory data")
@@ -146,14 +156,33 @@ func (o *QuantifyOptions) processPods(timeToReadinessResults []common.Result) er
aggregatedMemoryResult.Peak = memoryResult.Peak
}

queryResultCPURate, err := o.CollectData(fmt.Sprintf(`instance:node_cpu_utilisation:rate5m{instance="%s"} * 100`, o.nodeIP))
// The following matchers for image, job, and metrics_path are copied
// from the
// node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate
// recording rule, whose definition can be found at the /api/v1/rules
// endpoint of the Prometheus instance, e.g.,
// http://localhost:9090/api/v1/rules.
labelMatcher := `image!="", job="kubelet", metrics_path="/metrics/cadvisor", container=~"provider-.*"`
avgCPURateQueryFormat := fmt.Sprintf(`100 * sum(rate(container_cpu_usage_seconds_total{%s}%s))`, labelMatcher, "[%.0fs] @ %d")
avgCPURate, err := o.getInstantVectorQueryResultSampleByHandlingEmptyResults(avgCPURateQueryFormat)
if err != nil {
return errors.Wrap(err, "cannot collect cpu data")
return errors.Wrap(err, "cannot get average cpu rate")
}
cpuRateResult, err := common.ConstructResult(queryResultCPURate, "CPU", "Rate", providerPod)

peakCPURateQueryFormat := fmt.Sprintf(`100 * max_over_time(irate(container_cpu_usage_seconds_total{%s}[5m])%s)`, labelMatcher, "[%.0fs:200ms] @ %d")
peakCPURate, err := o.getInstantVectorQueryResultSampleByHandlingEmptyResults(peakCPURateQueryFormat)
if err != nil {
return errors.Wrap(err, "cannot construct cpu results")
return errors.Wrap(err, "cannot get peak cpu rate ")
}

cpuRateResult := &common.Result{
Metric: "CPU",
MetricUnit: "Rate",
Average: float64(avgCPURate),
Peak: float64(peakCPURate),
PodName: providerPod,
}

// Update aggregated CPU rate result
aggregatedCPURateResult.Average += cpuRateResult.Average
if cpuRateResult.Peak > aggregatedCPURateResult.Peak {
@@ -175,8 +204,8 @@ func (o *QuantifyOptions) processPods(timeToReadinessResults []common.Result) er
return nil
}
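
For illustration only (not part of the PR), here is a minimal sketch of what the two query format strings above expand to once the duration and timestamp placeholders are filled in; the 300-second duration and the Unix timestamp are made-up example values.

```go
package main

import "fmt"

func main() {
	labelMatcher := `image!="", job="kubelet", metrics_path="/metrics/cadvisor", container=~"provider-.*"`

	avgFormat := fmt.Sprintf(`100 * sum(rate(container_cpu_usage_seconds_total{%s}%s))`, labelMatcher, "[%.0fs] @ %d")
	peakFormat := fmt.Sprintf(`100 * max_over_time(irate(container_cpu_usage_seconds_total{%s}[5m])%s)`, labelMatcher, "[%.0fs:200ms] @ %d")

	// Example values: a 300-second experiment that ended at Unix time 1700000000.
	duration, end := 300.0, int64(1700000000)
	fmt.Println(fmt.Sprintf(avgFormat, duration, end))
	fmt.Println(fmt.Sprintf(peakFormat, duration, end))
}
```

Because the `@ <timestamp>` modifier pins the evaluation time of the range selector to the end of the experiment, the instant query can be issued after the extra 60-second sleep without shifting the measured window.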

// CollectData sends query and collect data by using the prometheus client
func (o *QuantifyOptions) CollectData(query string) (model.Value, error) {
// CollectRangeQueryData sends a range query and collects data by using the prometheus client
func (o *QuantifyOptions) CollectRangeQueryData(query string) (model.Value, error) {
client, err := common.ConstructPrometheusClient(o.address)
if err != nil {
return nil, errors.Wrap(err, "cannot construct prometheus client")
@@ -194,3 +223,123 @@ func (o *QuantifyOptions) CollectData(query string) (model.Value, error) {
}
return result, nil
}

// CollectInstantQueryData sends an instant query and collects data by using the prometheus client
func (o *QuantifyOptions) CollectInstantQueryData(query string) (model.Value, error) {
client, err := common.ConstructPrometheusClient(o.address)
if err != nil {
return nil, errors.Wrap(err, "cannot construct prometheus client")
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

log.Infof("Executing query: %s", query)
result, warnings, err := client.Query(ctx, query, time.Now())
if err != nil {
return nil, errors.Wrap(err, "cannot construct time range for metrics")
}
if len(warnings) > 0 {
log.Infof("Warnings: %v\n", warnings)
}

return result, nil
}

type emptyQueryResultError struct {
query string
}

func (e *emptyQueryResultError) Error() string {
var query string
if e != nil {
query = e.query
}

return fmt.Sprintf(`empty result returned from query: "%s"`, query)
}
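
Since callers match this error with errors.As (see the retry loop below), here is a minimal standalone sketch, outside the PR, of how the sentinel is detected through a wrap chain and how its query field can be populated after the match. The PR wraps with github.com/pkg/errors, which also participates in errors.As unwrapping in recent versions; the sketch uses the standard library for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

type emptyQueryResultError struct{ query string }

func (e *emptyQueryResultError) Error() string {
	return fmt.Sprintf(`empty result returned from query: "%s"`, e.query)
}

func main() {
	// A caller somewhere up the stack wrapped the sentinel error.
	var err error = fmt.Errorf("cannot process query result: %w", &emptyQueryResultError{})

	var empty *emptyQueryResultError
	if errors.As(err, &empty) {
		// The concrete error is reachable through the wrap chain,
		// so the caller can attach the query it just executed.
		empty.query = "up"
		fmt.Println(err)
	}
}
```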

// getInstantVectorQueryResultSample extracts the value from a
// queryResult that holds a single value. It returns an error in case
// queryResult is empty.
func getInstantVectorQueryResultSample(queryResult model.Value) (model.SampleValue, error) {
var zero model.SampleValue

queryResultVector, ok := queryResult.(model.Vector)
if !ok {
return zero, errors.New("type assertion to vector failed")
}

if queryResultVector.Len() == 0 {
return zero, &emptyQueryResultError{}
}

return queryResultVector[0].Value, nil
}
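
As an illustrative test only (not part of the PR; the package clause is assumed to match quantify.go), the sketch below shows the helper's two behaviours: a populated vector yields its first sample's value, while an empty vector surfaces the empty-result error.

```go
package internal // package name assumed

import (
	"testing"

	"github.com/prometheus/common/model"
)

func TestGetInstantVectorQueryResultSample(t *testing.T) {
	// A one-element instant vector: the sample's value is returned as-is.
	v := model.Vector{&model.Sample{Value: model.SampleValue(42)}}
	got, err := getInstantVectorQueryResultSample(v)
	if err != nil || got != 42 {
		t.Fatalf("got %v, %v; want 42, nil", got, err)
	}

	// An empty vector must produce an error instead of a zero value.
	if _, err := getInstantVectorQueryResultSample(model.Vector{}); err == nil {
		t.Fatal("expected an error for an empty vector")
	}
}
```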

// getInstantVectorQueryResultSample sends a query, which is expected
// to return a single value inside an instant vector, and extracts the
// resulting value.
func (o *QuantifyOptions) getInstantVectorQueryResultSample(query string) (model.SampleValue, error) {
var zero model.SampleValue

queryResult, err := o.CollectInstantQueryData(query)
if err != nil {
return zero, errors.Wrap(err, "cannot collect query data")
}

sampleValue, err := getInstantVectorQueryResultSample(queryResult)
if err != nil {
return zero, errors.Wrap(err, "cannot process query result")
}

return sampleValue, nil
}

// getInstantVectorQueryResultSampleByHandlingEmptyResults prepares
// and executes a query built from queryFormat. queryFormat should
// contain two format verbs, in order: one for a float duration and
// one for an integer timestamp; their values are calculated from o.
//
// If a query returns an empty result, it is likely because the
// queried interval doesn't contain enough samples. The query interval
// is expanded, up to a limit, to get a non-empty result. Doing so
// reduces the precision of the query, but an imprecise result is
// better than no result at all.
func (o *QuantifyOptions) getInstantVectorQueryResultSampleByHandlingEmptyResults(queryFormat string) (model.SampleValue, error) {
var result model.SampleValue
var errEmptyQueryResult *emptyQueryResultError

// Each empty query result causes the query duration to be increased
// by durationIncrementStepSeconds, so that the widened interval is
// more likely to include samples.
durationIncrementStepSeconds := 30
i := 0
maxTries := 10
for ; i < maxTries; i++ {
// The query interval is expanded from both ends: the evaluation
// timestamp moves forward by half the increment while the duration
// grows by the full increment, so the start of the interval moves
// back by the other half.
queryTime := o.endTime.Add(time.Duration(i*durationIncrementStepSeconds/2) * time.Second).Unix()
duration := o.Duration().Seconds() + float64(i*durationIncrementStepSeconds)
query := fmt.Sprintf(queryFormat, duration, queryTime)

var err error
result, err = o.getInstantVectorQueryResultSample(query)
if err == nil {
break
}

errEmptyQueryResult = nil
if errors.As(err, &errEmptyQueryResult) {
errEmptyQueryResult.query = query
log.Warnf("Got empty query result. Retrying with a wider query interval.")
continue
}

return result, errors.Wrapf(err, `cannot get result for query: "%s"`, query)
}

if i == maxTries {
return result, errEmptyQueryResult
}
return result, nil
}
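
To make the widening behaviour concrete, the sketch below (illustration only, using a made-up 300-second experiment and the defaults from the loop above) prints the effective query window for the first few retries: each retry moves the evaluation timestamp 15 s later and grows the duration by 30 s, so the window also reaches 15 s further back in time.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Made-up experiment: 300 seconds long, ending "now".
	end := time.Now()
	duration := 300.0
	step := 30 // durationIncrementStepSeconds

	for i := 0; i < 4; i++ {
		evalTime := end.Add(time.Duration(i*step/2) * time.Second)
		window := duration + float64(i*step)
		fmt.Printf("retry %d: evaluate at %s over the last %.0fs\n",
			i, evalTime.Format(time.RFC3339), window)
	}
}
```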