fix(parca-dev#2598): querying non-range profile w/ buckets
albertlockett committed Sep 8, 2023
1 parent 24af885 commit 0977b4e
Showing 3 changed files with 21 additions and 3 deletions.
2 changes: 2 additions & 0 deletions go.mod
@@ -245,3 +245,5 @@ require (
 	sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect
 	sigs.k8s.io/yaml v1.3.0 // indirect
 )
+
+replace github.com/polarsignals/frostdb => /home/albertlockett/Development/arcticdb
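This `replace` directive pins github.com/polarsignals/frostdb to a checkout on the author's machine, so the module only resolves on that machine as committed. For reference, the same mechanism with an illustrative relative path (not the path from this commit) looks like:

```
// go.mod: redirect a dependency to a local working copy while iterating
replace github.com/polarsignals/frostdb => ../frostdb
```

Go then builds against the local working copy instead of the version pinned in the require block.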
22 changes: 19 additions & 3 deletions pkg/parcacol/querier.go
@@ -344,6 +344,7 @@ const (
 	ColumnPeriodSum = "sum(" + profile.ColumnPeriod + ")"
 	ColumnValueCount = "count(" + profile.ColumnValue + ")"
 	ColumnValueSum = "sum(" + profile.ColumnValue + ")"
+	ColumnValueFirst = "first(" + profile.ColumnValue + ")"
 )

 func (q *Querier) queryRangeDelta(ctx context.Context, filterExpr logicalplan.Expr, step time.Duration, sampleTypeUnit string) ([]*pb.MetricsSeries, error) {
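These constants name aggregation result columns after their expressions. A tiny standalone sketch of the string the new constant produces, assuming profile.ColumnValue is "value" (an assumption for illustration):

```go
package main

import "fmt"

func main() {
	const ColumnValue = "value" // assumed stand-in for profile.ColumnValue
	const ColumnValueFirst = "first(" + ColumnValue + ")"
	fmt.Println(ColumnValueFirst) // prints: first(value)
	// The same string is used both as the Alias in the query plan and as
	// the key when locating the result column in the returned record.
}
```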
@@ -531,17 +532,27 @@ func (q *Querier) queryRangeNonDelta(ctx context.Context, filterExpr logicalplan
 		Filter(filterExpr).
 		Aggregate(
 			[]logicalplan.Expr{
-				logicalplan.Sum(logicalplan.Col(profile.ColumnValue)),
+				logicalplan.Sum(logicalplan.Col(profile.ColumnValue)).Alias(ColumnValueFirst),
 			},
 			[]logicalplan.Expr{
 				logicalplan.DynCol(profile.ColumnLabels),
 				logicalplan.Col(profile.ColumnTimestamp),
 			},
 		).
+		Aggregate(
+			[]logicalplan.Expr{
+				logicalplan.Take(logicalplan.Col(profile.ColumnValue), 1).Alias(ColumnValueFirst),
+			},
+			[]logicalplan.Expr{
+				logicalplan.DynCol(profile.ColumnLabels),
+				logicalplan.Duration(1000 * time.Millisecond),
+			},
+		).
 		Execute(ctx, func(ctx context.Context, r arrow.Record) error {
 			r.Retain()
 			records = append(records, r)
 			rows += int(r.NumRows())
+			fmt.Printf("%v\n", r)
 			return nil
 		})
 	if err != nil {
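The reworked plan aggregates twice: an inner sum(value) grouped by (labels, timestamp), then an outer take-first over 1000ms duration buckets, so each series keeps one summed value per bucket. A rough in-memory sketch of what that computes (the sample type and grouping keys below are invented for illustration; this is not parca or FrostDB API):

```go
package main

import (
	"fmt"
	"sort"
)

// A rough stand-in for one profile sample row.
type sample struct {
	labels string // flattened label set
	ts     int64  // timestamp in milliseconds
	value  int64
}

func main() {
	samples := []sample{
		{"a=1", 1000, 5},
		{"a=1", 1000, 7}, // same (labels, ts): summed with the row above
		{"a=1", 1400, 3}, // same 1s bucket as ts=1000: dropped by "first"
		{"a=1", 2100, 9}, // next 1s bucket: kept
	}

	// Stage 1: sum(value) grouped by (labels, timestamp).
	type key struct {
		labels string
		ts     int64
	}
	sums := map[key]int64{}
	for _, s := range samples {
		sums[key{s.labels, s.ts}] += s.value
	}

	// Stage 2: first(value) grouped by (labels, 1s bucket), in timestamp order.
	keys := make([]key, 0, len(sums))
	for k := range sums {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(i, j int) bool { return keys[i].ts < keys[j].ts })

	type bucketKey struct {
		labels string
		bucket int64
	}
	firsts := map[bucketKey]int64{}
	for _, k := range keys {
		bk := bucketKey{k.labels, k.ts / 1000} // 1000ms duration buckets
		if _, seen := firsts[bk]; !seen {
			firsts[bk] = sums[k]
		}
	}
	fmt.Println(firsts) // map[{a=1 1}:12 {a=1 2}:9]
}
```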
@@ -561,7 +572,7 @@ func (q *Querier) queryRangeNonDelta(ctx context.Context, filterExpr logicalplan
 	// Add necessary columns and their found value is false by default.
 	columnIndices := map[string]columnIndex{
 		profile.ColumnTimestamp: {},
-		ColumnValueSum: {},
+		ColumnValueFirst: {},
 	}
 	labelColumnIndices := []int{}
 	labelSet := labels.Labels{}
@@ -623,7 +634,11 @@ func (q *Querier) queryRangeNonDelta(ctx context.Context, filterExpr logicalplan
 		}

 		ts := ar.Column(columnIndices[profile.ColumnTimestamp].index).(*array.Int64).Value(i)
-		value := ar.Column(columnIndices[ColumnValueSum].index).(*array.Int64).Value(i)
+
+		// value := ar.Column(columnIndices[ColumnValueFirst].index).(*array.Int64).Value(i)
+		valueList := ar.Column(columnIndices[ColumnValueFirst].index).(*array.List)
+		start, _ := valueList.ValueOffsets(i)
+		value := valueList.ListValues().(*array.Int64).Value(int(start))

 		// Each step bucket will only return one of the timestamps and its value.
 		// For this reason we'll take each timestamp and divide it by the step seconds.
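Because the take aggregation produces a list-typed column, the value is now read out of an Arrow list array via its row offsets rather than directly from an Int64 column. A self-contained sketch of that access pattern with the Arrow Go library (the module version in the import path is an assumption):

```go
package main

import (
	"fmt"

	"github.com/apache/arrow/go/v14/arrow"
	"github.com/apache/arrow/go/v14/arrow/array"
	"github.com/apache/arrow/go/v14/arrow/memory"
)

func main() {
	// Build a list<int64> column with two rows: [10, 20] and [30].
	lb := array.NewListBuilder(memory.DefaultAllocator, arrow.PrimitiveTypes.Int64)
	defer lb.Release()
	vb := lb.ValueBuilder().(*array.Int64Builder)

	lb.Append(true)
	vb.AppendValues([]int64{10, 20}, nil)
	lb.Append(true)
	vb.Append(30)

	lists := lb.NewListArray()
	defer lists.Release()

	// For each row, ValueOffsets gives the [start, end) range into the flat
	// child array; the first element of that row's list sits at start.
	for i := 0; i < lists.Len(); i++ {
		start, _ := lists.ValueOffsets(i)
		first := lists.ListValues().(*array.Int64).Value(int(start))
		fmt.Println(first) // prints 10, then 30
	}
}
```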
@@ -634,6 +649,7 @@ func (q *Querier) queryRangeNonDelta(ctx context.Context, filterExpr logicalplan
 		// This needs to be moved to FrostDB to not even query all of this data in the first place.
 		// With a scrape interval of 10s and a query range of 1d we'd query 8640 samples and at most return 960.
 		// Even worse for a week, we'd query 60480 samples and only return 1000.
+
 		tsBucket := ts / 1000 / int64(step.Seconds())
 		if _, found := resSeriesBuckets[index][tsBucket]; found {
 			// We already have a MetricsSample for this timestamp bucket, ignore it.
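The bucketing arithmetic converts the millisecond timestamp to seconds and then divides it down to whole step windows, so all samples inside one window share a bucket and only the first per series is kept. A small worked example (timestamps are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	step := 10 * time.Second
	// The first two samples are 3s apart and land in the same 10s bucket;
	// the third falls in the next window and gets a new bucket.
	for _, ts := range []int64{1694131230000, 1694131233000, 1694131241000} {
		tsBucket := ts / 1000 / int64(step.Seconds())
		fmt.Println(tsBucket) // 169413123, 169413123, 169413124
	}
}
```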
Binary file added pkg/symbolizer/__debug_bin3784063650
Binary file not shown.
