diff --git a/pkg/query-service/app/querier/helper.go b/pkg/query-service/app/querier/helper.go index a4ccfd047a..00b287ce8e 100644 --- a/pkg/query-service/app/querier/helper.go +++ b/pkg/query-service/app/querier/helper.go @@ -2,20 +2,19 @@ package querier import ( "context" - "encoding/json" "fmt" "strings" "sync" - "time" logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" logsV4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4" metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" - "go.signoz.io/signoz/pkg/query-service/cache/status" + "go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/postprocess" + "go.signoz.io/signoz/pkg/query-service/querycache" "go.uber.org/zap" ) @@ -107,7 +106,8 @@ func (q *querier) runBuilderQuery( if builderQuery.DataSource == v3.DataSourceLogs { var query string var err error - if _, ok := cacheKeys[queryName]; !ok { + if _, ok := cacheKeys[queryName]; !ok || params.NoCache { + zap.L().Info("skipping cache for logs query", zap.String("queryName", queryName), zap.Int64("start", start), zap.Int64("end", end), zap.Int64("step", builderQuery.StepInterval), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName])) query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, start, end, builderQuery, params, preferRPM) if err != nil { ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} @@ -118,21 +118,11 @@ func (q *querier) runBuilderQuery( return } - cacheKey := cacheKeys[queryName] - var cachedData []byte - if !params.NoCache && q.cache != nil { - var retrieveStatus status.RetrieveStatus - data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true) - zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String())) - if err == nil { - cachedData = data - } - } - misses, replaceCachedData := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData) - missedSeries := make([]*v3.Series, 0) - cachedSeries := make([]*v3.Series, 0) + misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKeys[queryName]) + zap.L().Info("cache misses for logs query", zap.Any("misses", misses)) + missedSeries := make([]querycache.CachedSeriesData, 0) for _, miss := range misses { - query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.start, miss.end, builderQuery, params, preferRPM) + query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.Start, miss.End, builderQuery, params, preferRPM) if err != nil { ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} return @@ -147,48 +137,23 @@ func (q *querier) runBuilderQuery( } return } - missedSeries = append(missedSeries, series...) - } - if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil { - zap.L().Error("error unmarshalling cached data", zap.Error(err)) - } - mergedSeries := mergeSerieses(cachedSeries, missedSeries) - if replaceCachedData { - mergedSeries = missedSeries - } - - var mergedSeriesData []byte - var marshallingErr error - missedSeriesLen := len(missedSeries) - if missedSeriesLen > 0 && !params.NoCache && q.cache != nil { - // caching the data - mergedSeriesData, marshallingErr = json.Marshal(mergedSeries) - if marshallingErr != nil { - zap.L().Error("error marshalling merged series", zap.Error(marshallingErr)) - } + missedSeries = append(missedSeries, querycache.CachedSeriesData{ + Start: miss.Start, + End: miss.End, + Data: series, + }) } + mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKeys[queryName], missedSeries) - // response doesn't need everything - filterCachedPoints(mergedSeries, start, end) + resultSeries := common.GetSeriesFromCachedData(mergedSeries, start, end) ch <- channelResult{ Err: nil, Name: queryName, - Series: mergedSeries, - } - - // Cache the seriesList for future queries - if missedSeriesLen > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil { - // caching the data - err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour) - if err != nil { - zap.L().Error("error storing merged series", zap.Error(err)) - return - } + Series: resultSeries, } return - } if builderQuery.DataSource == v3.DataSourceTraces { @@ -242,7 +207,8 @@ func (q *querier) runBuilderQuery( // What is happening here? // We are only caching the graph panel queries. A non-existant cache key means that the query is not cached. // If the query is not cached, we execute the query and return the result without caching it. - if _, ok := cacheKeys[queryName]; !ok { + if _, ok := cacheKeys[queryName]; !ok || params.NoCache { + zap.L().Info("skipping cache for metrics query", zap.String("queryName", queryName), zap.Int64("start", start), zap.Int64("end", end), zap.Int64("step", builderQuery.StepInterval), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName])) query, err := metricsV3.PrepareMetricQuery(start, end, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, metricsV3.Options{PreferRPM: preferRPM}) if err != nil { ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} @@ -254,22 +220,13 @@ func (q *querier) runBuilderQuery( } cacheKey := cacheKeys[queryName] - var cachedData []byte - if !params.NoCache && q.cache != nil { - var retrieveStatus status.RetrieveStatus - data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true) - zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String())) - if err == nil { - cachedData = data - } - } - misses, replaceCachedData := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData) - missedSeries := make([]*v3.Series, 0) - cachedSeries := make([]*v3.Series, 0) + misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKey) + zap.L().Info("cache misses for metrics query", zap.Any("misses", misses)) + missedSeries := make([]querycache.CachedSeriesData, 0) for _, miss := range misses { query, err := metricsV3.PrepareMetricQuery( - miss.start, - miss.end, + miss.Start, + miss.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, @@ -294,41 +251,20 @@ func (q *querier) runBuilderQuery( } return } - missedSeries = append(missedSeries, series...) - } - if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil { - zap.L().Error("error unmarshalling cached data", zap.Error(err)) - } - mergedSeries := mergeSerieses(cachedSeries, missedSeries) - if replaceCachedData { - mergedSeries = missedSeries - } - var mergedSeriesData []byte - var marshallingErr error - missedSeriesLen := len(missedSeries) - if missedSeriesLen > 0 && !params.NoCache && q.cache != nil { - // caching the data - mergedSeriesData, marshallingErr = json.Marshal(mergedSeries) - if marshallingErr != nil { - zap.L().Error("error marshalling merged series", zap.Error(marshallingErr)) - } + missedSeries = append(missedSeries, querycache.CachedSeriesData{ + Start: miss.Start, + End: miss.End, + Data: series, + }) } + mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKey, missedSeries) + + resultSeries := common.GetSeriesFromCachedData(mergedSeries, start, end) - // response doesn't need everything - filterCachedPoints(mergedSeries, start, end) ch <- channelResult{ Err: nil, Name: queryName, - Series: mergedSeries, - } - - // Cache the seriesList for future queries - if missedSeriesLen > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil { - err := q.cache.Store(cacheKey, mergedSeriesData, time.Hour) - if err != nil { - zap.L().Error("error storing merged series", zap.Error(err)) - return - } + Series: resultSeries, } } @@ -350,7 +286,8 @@ func (q *querier) runBuilderExpression( return } - if _, ok := cacheKeys[queryName]; !ok { + if _, ok := cacheKeys[queryName]; !ok || params.NoCache { + zap.L().Info("skipping cache for expression query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName])) query := queries[queryName] series, err := q.execClickHouseQuery(ctx, query) ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series} @@ -358,23 +295,14 @@ func (q *querier) runBuilderExpression( } cacheKey := cacheKeys[queryName] - var cachedData []byte - if !params.NoCache && q.cache != nil { - var retrieveStatus status.RetrieveStatus - data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true) - zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String())) - if err == nil { - cachedData = data - } - } step := postprocess.StepIntervalForFunction(params, queryName) - misses, replaceCachedData := q.findMissingTimeRanges(params.Start, params.End, step, cachedData) - missedSeries := make([]*v3.Series, 0) - cachedSeries := make([]*v3.Series, 0) + misses := q.queryCache.FindMissingTimeRanges(params.Start, params.End, step, cacheKey) + zap.L().Info("cache misses for expression query", zap.Any("misses", misses)) + missedSeries := make([]querycache.CachedSeriesData, 0) for _, miss := range misses { missQueries, _ := q.builder.PrepareQueries(&v3.QueryRangeParamsV3{ - Start: miss.start, - End: miss.end, + Start: miss.Start, + End: miss.End, Step: params.Step, NoCache: params.NoCache, CompositeQuery: params.CompositeQuery, @@ -386,41 +314,19 @@ func (q *querier) runBuilderExpression( ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} return } - missedSeries = append(missedSeries, series...) - } - if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil { - zap.L().Error("error unmarshalling cached data", zap.Error(err)) - } - mergedSeries := mergeSerieses(cachedSeries, missedSeries) - if replaceCachedData { - mergedSeries = missedSeries + missedSeries = append(missedSeries, querycache.CachedSeriesData{ + Start: miss.Start, + End: miss.End, + Data: series, + }) } + mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKey, missedSeries) - var mergedSeriesData []byte - missedSeriesLen := len(missedSeries) - var marshallingErr error - if missedSeriesLen > 0 && !params.NoCache && q.cache != nil { - // caching the data - mergedSeriesData, marshallingErr = json.Marshal(mergedSeries) - if marshallingErr != nil { - zap.L().Error("error marshalling merged series", zap.Error(marshallingErr)) - } - } + resultSeries := common.GetSeriesFromCachedData(mergedSeries, params.Start, params.End) - // response doesn't need everything - filterCachedPoints(mergedSeries, params.Start, params.End) ch <- channelResult{ Err: nil, Name: queryName, - Series: mergedSeries, - } - - // Cache the seriesList for future queries - if len(missedSeries) > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil { - err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour) - if err != nil { - zap.L().Error("error storing merged series", zap.Error(err)) - return - } + Series: resultSeries, } } diff --git a/pkg/query-service/app/querier/querier.go b/pkg/query-service/app/querier/querier.go index 50ef63394a..fd7198b334 100644 --- a/pkg/query-service/app/querier/querier.go +++ b/pkg/query-service/app/querier/querier.go @@ -2,11 +2,7 @@ package querier import ( "context" - "encoding/json" "fmt" - "math" - "sort" - "strings" "sync" "time" @@ -15,7 +11,9 @@ import ( metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" + "go.signoz.io/signoz/pkg/query-service/common" chErrors "go.signoz.io/signoz/pkg/query-service/errors" + "go.signoz.io/signoz/pkg/query-service/querycache" "go.signoz.io/signoz/pkg/query-service/utils" "go.signoz.io/signoz/pkg/query-service/cache" @@ -34,14 +32,11 @@ type channelResult struct { Query string } -type missInterval struct { - start, end int64 // in milliseconds -} - type querier struct { cache cache.Cache reader interfaces.Reader keyGenerator cache.KeyGenerator + queryCache interfaces.QueryCache fluxInterval time.Duration @@ -80,8 +75,11 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier { logsQueryBuilder = logsV4.PrepareLogsQuery } + qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval)) + return &querier{ cache: opts.Cache, + queryCache: qc, reader: opts.Reader, keyGenerator: opts.KeyGenerator, fluxInterval: opts.FluxInterval, @@ -154,156 +152,6 @@ func (q *querier) execPromQuery(ctx context.Context, params *model.QueryRangePar return seriesList, nil } -// findMissingTimeRanges finds the missing time ranges in the seriesList -// and returns a list of miss structs, It takes the fluxInterval into -// account to find the missing time ranges. -// -// The [End - fluxInterval, End] is always added to the list of misses, because -// the data might still be in flux and not yet available in the database. -// -// replaceCacheData is used to indicate if the cache data should be replaced instead of merging -// with the new data -// TODO: Remove replaceCacheData with a better logic -func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, fluxInterval time.Duration) (misses []missInterval, replaceCacheData bool) { - replaceCacheData = false - var cachedStart, cachedEnd int64 - for idx := range seriesList { - series := seriesList[idx] - for pointIdx := range series.Points { - point := series.Points[pointIdx] - if cachedStart == 0 || point.Timestamp < cachedStart { - cachedStart = point.Timestamp - } - if cachedEnd == 0 || point.Timestamp > cachedEnd { - cachedEnd = point.Timestamp - } - } - } - - // time.Now is used because here we are considering the case where data might not - // be fully ingested for last (fluxInterval) minutes - endMillis := time.Now().UnixMilli() - adjustStep := int64(math.Min(float64(step), 60)) - roundedMillis := endMillis - (endMillis % (adjustStep * 1000)) - - // Exclude the flux interval from the cached end time - cachedEnd = int64( - math.Min( - float64(cachedEnd), - float64(roundedMillis-fluxInterval.Milliseconds()), - ), - ) - - // There are five cases to consider - // 1. Cached time range is a subset of the requested time range - // 2. Cached time range is a superset of the requested time range - // 3. Cached time range is a left overlap of the requested time range - // 4. Cached time range is a right overlap of the requested time range - // 5. Cached time range is a disjoint of the requested time range - if cachedStart >= start && cachedEnd <= end { - // Case 1: Cached time range is a subset of the requested time range - // Add misses for the left and right sides of the cached time range - misses = append(misses, missInterval{start: start, end: cachedStart - 1}) - misses = append(misses, missInterval{start: cachedEnd + 1, end: end}) - } else if cachedStart <= start && cachedEnd >= end { - // Case 2: Cached time range is a superset of the requested time range - // No misses - } else if cachedStart <= start && cachedEnd >= start { - // Case 3: Cached time range is a left overlap of the requested time range - // Add a miss for the left side of the cached time range - misses = append(misses, missInterval{start: cachedEnd + 1, end: end}) - } else if cachedStart <= end && cachedEnd >= end { - // Case 4: Cached time range is a right overlap of the requested time range - // Add a miss for the right side of the cached time range - misses = append(misses, missInterval{start: start, end: cachedStart - 1}) - } else { - // Case 5: Cached time range is a disjoint of the requested time range - // Add a miss for the entire requested time range - misses = append(misses, missInterval{start: start, end: end}) - replaceCacheData = true - } - - // remove the struts with start > end - var validMisses []missInterval - for idx := range misses { - miss := misses[idx] - if miss.start < miss.end { - validMisses = append(validMisses, miss) - } - } - return validMisses, replaceCacheData -} - -// findMissingTimeRanges finds the missing time ranges in the cached data -// and returns them as a list of misses -func (q *querier) findMissingTimeRanges(start, end, step int64, cachedData []byte) (misses []missInterval, replaceCachedData bool) { - var cachedSeriesList []*v3.Series - if err := json.Unmarshal(cachedData, &cachedSeriesList); err != nil { - // In case of error, we return the entire range as a miss - return []missInterval{{start: start, end: end}}, true - } - return findMissingTimeRanges(start, end, step, cachedSeriesList, q.fluxInterval) -} - -func labelsToString(labels map[string]string) string { - type label struct { - Key string - Value string - } - var labelsList []label - for k, v := range labels { - labelsList = append(labelsList, label{Key: k, Value: v}) - } - sort.Slice(labelsList, func(i, j int) bool { - return labelsList[i].Key < labelsList[j].Key - }) - labelKVs := make([]string, len(labelsList)) - for idx := range labelsList { - labelKVs[idx] = labelsList[idx].Key + "=" + labelsList[idx].Value - } - return fmt.Sprintf("{%s}", strings.Join(labelKVs, ",")) -} - -func filterCachedPoints(cachedSeries []*v3.Series, start, end int64) { - for _, c := range cachedSeries { - points := []v3.Point{} - for _, p := range c.Points { - if (p.Timestamp < start || p.Timestamp > end) && p.Timestamp != 0 { - continue - } - points = append(points, p) - } - c.Points = points - } -} - -func mergeSerieses(cachedSeries, missedSeries []*v3.Series) []*v3.Series { - // Merge the missed series with the cached series by timestamp - mergedSeries := make([]*v3.Series, 0) - seriesesByLabels := make(map[string]*v3.Series) - for idx := range cachedSeries { - series := cachedSeries[idx] - seriesesByLabels[labelsToString(series.Labels)] = series - } - - for idx := range missedSeries { - series := missedSeries[idx] - if _, ok := seriesesByLabels[labelsToString(series.Labels)]; !ok { - seriesesByLabels[labelsToString(series.Labels)] = series - continue - } - seriesesByLabels[labelsToString(series.Labels)].Points = append(seriesesByLabels[labelsToString(series.Labels)].Points, series.Points...) - } - // Sort the points in each series by timestamp - for idx := range seriesesByLabels { - series := seriesesByLabels[idx] - series.SortPoints() - series.RemoveDuplicatePoints() - mergedSeries = append(mergedSeries, series) - } - return mergedSeries -} - func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) { cacheKeys := q.keyGenerator.GenerateKeys(params) @@ -363,51 +211,34 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam go func(queryName string, promQuery *v3.PromQuery) { defer wg.Done() cacheKey, ok := cacheKeys[queryName] - var cachedData []byte - // Ensure NoCache is not set and cache is not nil - if !params.NoCache && q.cache != nil && ok { - data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true) - zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String())) - if err == nil { - cachedData = data - } + + if !ok || params.NoCache { + zap.L().Info("skipping cache for metrics prom query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName])) + query := metricsV3.BuildPromQuery(promQuery, params.Step, params.Start, params.End) + series, err := q.execPromQuery(ctx, query) + channelResults <- channelResult{Err: err, Name: queryName, Query: query.Query, Series: series} + return } - misses, replaceCachedData := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData) - missedSeries := make([]*v3.Series, 0) - cachedSeries := make([]*v3.Series, 0) + misses := q.queryCache.FindMissingTimeRanges(params.Start, params.End, params.Step, cacheKey) + zap.L().Info("cache misses for metrics prom query", zap.Any("misses", misses)) + missedSeries := make([]querycache.CachedSeriesData, 0) for _, miss := range misses { - query := metricsV3.BuildPromQuery(promQuery, params.Step, miss.start, miss.end) + query := metricsV3.BuildPromQuery(promQuery, params.Step, miss.Start, miss.End) series, err := q.execPromQuery(ctx, query) if err != nil { channelResults <- channelResult{Err: err, Name: queryName, Query: query.Query, Series: nil} return } - missedSeries = append(missedSeries, series...) - } - if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil { - // ideally we should not be getting an error here - zap.L().Error("error unmarshalling cached data", zap.Error(err)) + missedSeries = append(missedSeries, querycache.CachedSeriesData{ + Data: series, + Start: miss.Start, + End: miss.End, + }) } - mergedSeries := mergeSerieses(cachedSeries, missedSeries) - if replaceCachedData { - mergedSeries = missedSeries - } - - channelResults <- channelResult{Err: nil, Name: queryName, Query: promQuery.Query, Series: mergedSeries} + mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKey, missedSeries) + resultSeries := common.GetSeriesFromCachedData(mergedSeries, params.Start, params.End) + channelResults <- channelResult{Err: nil, Name: queryName, Query: promQuery.Query, Series: resultSeries} - // Cache the seriesList for future queries - if len(missedSeries) > 0 && !params.NoCache && q.cache != nil && ok { - mergedSeriesData, err := json.Marshal(mergedSeries) - if err != nil { - zap.L().Error("error marshalling merged series", zap.Error(err)) - return - } - err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour) - if err != nil { - zap.L().Error("error storing merged series", zap.Error(err)) - return - } - } }(queryName, promQuery) } wg.Wait() diff --git a/pkg/query-service/app/querier/querier_test.go b/pkg/query-service/app/querier/querier_test.go index a4814d0c0a..c30841546e 100644 --- a/pkg/query-service/app/querier/querier_test.go +++ b/pkg/query-service/app/querier/querier_test.go @@ -2,7 +2,9 @@ package querier import ( "context" + "encoding/json" "fmt" + "math" "strings" "testing" "time" @@ -11,8 +13,33 @@ import ( tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" "go.signoz.io/signoz/pkg/query-service/cache/inmemory" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/querycache" ) +func minTimestamp(series []*v3.Series) int64 { + min := int64(math.MaxInt64) + for _, series := range series { + for _, point := range series.Points { + if point.Timestamp < min { + min = point.Timestamp + } + } + } + return min +} + +func maxTimestamp(series []*v3.Series) int64 { + max := int64(math.MinInt64) + for _, series := range series { + for _, point := range series.Points { + if point.Timestamp > max { + max = point.Timestamp + } + } + } + return max +} + func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { // There are five scenarios: // 1. Cached time range is a subset of the requested time range @@ -26,7 +53,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { requestedEnd int64 // in milliseconds requestedStep int64 // in seconds cachedSeries []*v3.Series - expectedMiss []missInterval + expectedMiss []querycache.MissInterval replaceCachedData bool }{ { @@ -51,14 +78,14 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { }, }, }, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 60*60*1000 - 1, + Start: 1675115596722, + End: 1675115596722 + 60*60*1000, }, { - start: 1675115596722 + 120*60*1000 + 1, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722 + 120*60*1000, + End: 1675115596722 + 180*60*1000, }, }, }, @@ -92,7 +119,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { }, }, }, - expectedMiss: []missInterval{}, + expectedMiss: []querycache.MissInterval{}, }, { name: "cached time range is a left overlap of the requested time range", @@ -120,10 +147,10 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { }, }, }, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722 + 120*60*1000 + 1, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722 + 120*60*1000, + End: 1675115596722 + 180*60*1000, }, }, }, @@ -153,10 +180,10 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { }, }, }, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 60*60*1000 - 1, + Start: 1675115596722, + End: 1675115596722 + 60*60*1000, }, }, }, @@ -186,31 +213,48 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { }, }, }, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722, + End: 1675115596722 + 180*60*1000, }, }, replaceCachedData: true, }, } - for _, tc := range testCases { + c := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute}) + + qc := querycache.NewQueryCache(querycache.WithCache(c)) + + for idx, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - misses, replaceCachedData := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, 0*time.Minute) + cacheKey := fmt.Sprintf("test-cache-key-%d", idx) + cachedData := &querycache.CachedSeriesData{ + Start: minTimestamp(tc.cachedSeries), + End: maxTimestamp(tc.cachedSeries), + Data: tc.cachedSeries, + } + jsonData, err := json.Marshal([]*querycache.CachedSeriesData{cachedData}) + if err != nil { + t.Errorf("error marshalling cached data: %v", err) + } + err = c.Store(cacheKey, jsonData, 5*time.Minute) + if err != nil { + t.Errorf("error storing cached data: %v", err) + } + + misses := qc.FindMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, cacheKey) if len(misses) != len(tc.expectedMiss) { t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses)) } - if replaceCachedData != tc.replaceCachedData { - t.Errorf("expected replaceCachedData %t, got %t", tc.replaceCachedData, replaceCachedData) - } + for i, miss := range misses { - if miss.start != tc.expectedMiss[i].start { - t.Errorf("expected start %d, got %d", tc.expectedMiss[i].start, miss.start) + if miss.Start != tc.expectedMiss[i].Start { + t.Errorf("expected start %d, got %d", tc.expectedMiss[i].Start, miss.Start) } - if miss.end != tc.expectedMiss[i].end { - t.Errorf("expected end %d, got %d", tc.expectedMiss[i].end, miss.end) + if miss.End != tc.expectedMiss[i].End { + t.Errorf("expected end %d, got %d", tc.expectedMiss[i].End, miss.End) } } }) @@ -226,7 +270,7 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) { requestedStep int64 cachedSeries []*v3.Series fluxInterval time.Duration - expectedMiss []missInterval + expectedMiss []querycache.MissInterval }{ { name: "cached time range is a subset of the requested time range", @@ -251,14 +295,14 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) { }, }, fluxInterval: 5 * time.Minute, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 60*60*1000 - 1, + Start: 1675115596722, + End: 1675115596722 + 60*60*1000, }, { - start: 1675115596722 + 120*60*1000 + 1, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722 + 120*60*1000, + End: 1675115596722 + 180*60*1000, }, }, }, @@ -293,7 +337,7 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) { }, }, fluxInterval: 5 * time.Minute, - expectedMiss: []missInterval{}, + expectedMiss: []querycache.MissInterval{}, }, { name: "cache time range is a left overlap of the requested time range", @@ -322,10 +366,10 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) { }, }, fluxInterval: 5 * time.Minute, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722 + 120*60*1000 + 1, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722 + 120*60*1000, + End: 1675115596722 + 180*60*1000, }, }, }, @@ -356,10 +400,10 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) { }, }, fluxInterval: 5 * time.Minute, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 60*60*1000 - 1, + Start: 1675115596722, + End: 1675115596722 + 60*60*1000, }, }, }, @@ -390,27 +434,45 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) { }, }, fluxInterval: 5 * time.Minute, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722, + End: 1675115596722 + 180*60*1000, }, }, }, } - for _, tc := range testCases { + c := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute}) + + qc := querycache.NewQueryCache(querycache.WithCache(c)) + + for idx, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - misses, _ := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, tc.fluxInterval) + cacheKey := fmt.Sprintf("test-cache-key-%d", idx) + cachedData := &querycache.CachedSeriesData{ + Start: minTimestamp(tc.cachedSeries), + End: maxTimestamp(tc.cachedSeries), + Data: tc.cachedSeries, + } + jsonData, err := json.Marshal([]*querycache.CachedSeriesData{cachedData}) + if err != nil { + t.Errorf("error marshalling cached data: %v", err) + } + err = c.Store(cacheKey, jsonData, 5*time.Minute) + if err != nil { + t.Errorf("error storing cached data: %v", err) + } + misses := qc.FindMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, cacheKey) if len(misses) != len(tc.expectedMiss) { t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses)) } for i, miss := range misses { - if miss.start != tc.expectedMiss[i].start { - t.Errorf("expected start %d, got %d", tc.expectedMiss[i].start, miss.start) + if miss.Start != tc.expectedMiss[i].Start { + t.Errorf("expected start %d, got %d", tc.expectedMiss[i].Start, miss.Start) } - if miss.end != tc.expectedMiss[i].end { - t.Errorf("expected end %d, got %d", tc.expectedMiss[i].end, miss.end) + if miss.End != tc.expectedMiss[i].End { + t.Errorf("expected end %d, got %d", tc.expectedMiss[i].End, miss.End) } } }) @@ -1022,18 +1084,18 @@ func TestQueryRangeValueTypePromQL(t *testing.T) { expectedQueryAndTimeRanges := []struct { query string - ranges []missInterval + ranges []querycache.MissInterval }{ { query: "signoz_calls_total", - ranges: []missInterval{ - {start: 1675115596722, end: 1675115596722 + 120*60*1000}, + ranges: []querycache.MissInterval{ + {Start: 1675115596722, End: 1675115596722 + 120*60*1000}, }, }, { query: "signoz_latency_bucket", - ranges: []missInterval{ - {start: 1675115596722 + 60*60*1000, end: 1675115596722 + 180*60*1000}, + ranges: []querycache.MissInterval{ + {Start: 1675115596722 + 60*60*1000, End: 1675115596722 + 180*60*1000}, }, }, } @@ -1054,10 +1116,10 @@ func TestQueryRangeValueTypePromQL(t *testing.T) { if len(q.TimeRanges()[i]) != 2 { t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i]) } - if q.TimeRanges()[i][0] != int(expectedQueryAndTimeRanges[i].ranges[0].start) { + if q.TimeRanges()[i][0] != int(expectedQueryAndTimeRanges[i].ranges[0].Start) { t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i]) } - if q.TimeRanges()[i][1] != int(expectedQueryAndTimeRanges[i].ranges[0].end) { + if q.TimeRanges()[i][1] != int(expectedQueryAndTimeRanges[i].ranges[0].End) { t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i]) } } diff --git a/pkg/query-service/app/querier/v2/helper.go b/pkg/query-service/app/querier/v2/helper.go index f1dd33c4e6..bb41bc8c36 100644 --- a/pkg/query-service/app/querier/v2/helper.go +++ b/pkg/query-service/app/querier/v2/helper.go @@ -2,20 +2,19 @@ package v2 import ( "context" - "encoding/json" "fmt" "strings" "sync" - "time" logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" logsV4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4" metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" - "go.signoz.io/signoz/pkg/query-service/cache/status" + "go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/querycache" "go.uber.org/zap" ) @@ -108,7 +107,8 @@ func (q *querier) runBuilderQuery( if builderQuery.DataSource == v3.DataSourceLogs { var query string var err error - if _, ok := cacheKeys[queryName]; !ok { + if _, ok := cacheKeys[queryName]; !ok || params.NoCache { + zap.L().Info("skipping cache for logs query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName])) query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, start, end, builderQuery, params, preferRPM) if err != nil { ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} @@ -118,21 +118,11 @@ func (q *querier) runBuilderQuery( ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series} return } - cacheKey := cacheKeys[queryName] - var cachedData []byte - if !params.NoCache && q.cache != nil { - var retrieveStatus status.RetrieveStatus - data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true) - zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String())) - if err == nil { - cachedData = data - } - } - misses, replaceCachedData := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData) - missedSeries := make([]*v3.Series, 0) - cachedSeries := make([]*v3.Series, 0) + misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKeys[queryName]) + zap.L().Info("cache misses for logs query", zap.Any("misses", misses)) + missedSeries := make([]querycache.CachedSeriesData, 0) for _, miss := range misses { - query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.start, miss.end, builderQuery, params, preferRPM) + query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.Start, miss.End, builderQuery, params, preferRPM) if err != nil { ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} return @@ -147,43 +137,20 @@ func (q *querier) runBuilderQuery( } return } - missedSeries = append(missedSeries, series...) - } - if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil { - zap.L().Error("error unmarshalling cached data", zap.Error(err)) - } - mergedSeries := mergeSerieses(cachedSeries, missedSeries) - if replaceCachedData { - mergedSeries = missedSeries - } - var mergedSeriesData []byte - var marshallingErr error - missedSeriesLen := len(missedSeries) - if missedSeriesLen > 0 && !params.NoCache && q.cache != nil { - // caching the data - mergedSeriesData, marshallingErr = json.Marshal(mergedSeries) - if marshallingErr != nil { - zap.L().Error("error marshalling merged series", zap.Error(marshallingErr)) - } + missedSeries = append(missedSeries, querycache.CachedSeriesData{ + Data: series, + Start: miss.Start, + End: miss.End, + }) } + mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKeys[queryName], missedSeries) - // response doesn't need everything - filterCachedPoints(mergedSeries, start, end) + resultSeries := common.GetSeriesFromCachedData(mergedSeries, start, end) ch <- channelResult{ Err: nil, Name: queryName, - Series: mergedSeries, - } - - // Cache the seriesList for future queries - if missedSeriesLen > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil { - // caching the data - err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour) - if err != nil { - zap.L().Error("error storing merged series", zap.Error(err)) - return - } + Series: resultSeries, } return @@ -240,7 +207,8 @@ func (q *querier) runBuilderQuery( // What is happening here? // We are only caching the graph panel queries. A non-existant cache key means that the query is not cached. // If the query is not cached, we execute the query and return the result without caching it. - if _, ok := cacheKeys[queryName]; !ok { + if _, ok := cacheKeys[queryName]; !ok || params.NoCache { + zap.L().Info("skipping cache for metrics query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName])) query, err := metricsV4.PrepareMetricQuery(start, end, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, metricsV3.Options{PreferRPM: preferRPM}) if err != nil { ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} @@ -251,23 +219,13 @@ func (q *querier) runBuilderQuery( return } - cacheKey := cacheKeys[queryName] - var cachedData []byte - if !params.NoCache && q.cache != nil { - var retrieveStatus status.RetrieveStatus - data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true) - zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String())) - if err == nil { - cachedData = data - } - } - misses, replaceCachedData := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData) - missedSeries := make([]*v3.Series, 0) - cachedSeries := make([]*v3.Series, 0) + misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKeys[queryName]) + zap.L().Info("cache misses for metrics query", zap.Any("misses", misses)) + missedSeries := make([]querycache.CachedSeriesData, 0) for _, miss := range misses { query, err := metricsV4.PrepareMetricQuery( - miss.start, - miss.end, + miss.Start, + miss.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, @@ -292,41 +250,19 @@ func (q *querier) runBuilderQuery( } return } - missedSeries = append(missedSeries, series...) - } - if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil { - zap.L().Error("error unmarshalling cached data", zap.Error(err)) - } - mergedSeries := mergeSerieses(cachedSeries, missedSeries) - if replaceCachedData { - mergedSeries = missedSeries + missedSeries = append(missedSeries, querycache.CachedSeriesData{ + Data: series, + Start: miss.Start, + End: miss.End, + }) } + mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKeys[queryName], missedSeries) - var mergedSeriesData []byte - var marshallingErr error - missedSeriesLen := len(missedSeries) - if missedSeriesLen > 0 && !params.NoCache && q.cache != nil { - // caching the data - mergedSeriesData, marshallingErr = json.Marshal(mergedSeries) - if marshallingErr != nil { - zap.S().Error("error marshalling merged series", zap.Error(marshallingErr)) - } - } - - // response doesn't need everything - filterCachedPoints(mergedSeries, start, end) + resultSeries := common.GetSeriesFromCachedData(mergedSeries, start, end) ch <- channelResult{ Err: nil, Name: queryName, - Series: mergedSeries, - } - // Cache the seriesList for future queries - if missedSeriesLen > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil { - err := q.cache.Store(cacheKey, mergedSeriesData, time.Hour) - if err != nil { - zap.L().Error("error storing merged series", zap.Error(err)) - return - } + Series: resultSeries, } } diff --git a/pkg/query-service/app/querier/v2/querier.go b/pkg/query-service/app/querier/v2/querier.go index b71a8cc0cc..f8316d6f6c 100644 --- a/pkg/query-service/app/querier/v2/querier.go +++ b/pkg/query-service/app/querier/v2/querier.go @@ -2,11 +2,7 @@ package v2 import ( "context" - "encoding/json" "fmt" - "math" - "sort" - "strings" "sync" "time" @@ -15,7 +11,9 @@ import ( metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" + "go.signoz.io/signoz/pkg/query-service/common" chErrors "go.signoz.io/signoz/pkg/query-service/errors" + "go.signoz.io/signoz/pkg/query-service/querycache" "go.signoz.io/signoz/pkg/query-service/utils" "go.signoz.io/signoz/pkg/query-service/cache" @@ -34,14 +32,11 @@ type channelResult struct { Query string } -type missInterval struct { - start, end int64 // in milliseconds -} - type querier struct { cache cache.Cache reader interfaces.Reader keyGenerator cache.KeyGenerator + queryCache interfaces.QueryCache fluxInterval time.Duration @@ -79,8 +74,11 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier { logsQueryBuilder = logsV4.PrepareLogsQuery } + qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval)) + return &querier{ cache: opts.Cache, + queryCache: qc, reader: opts.Reader, keyGenerator: opts.KeyGenerator, fluxInterval: opts.FluxInterval, @@ -157,167 +155,6 @@ func (q *querier) execPromQuery(ctx context.Context, params *model.QueryRangePar return seriesList, nil } -// findMissingTimeRanges finds the missing time ranges in the seriesList -// and returns a list of miss structs, It takes the fluxInterval into -// account to find the missing time ranges. -// -// The [End - fluxInterval, End] is always added to the list of misses, because -// the data might still be in flux and not yet available in the database. -// -// replaceCacheData is used to indicate if the cache data should be replaced instead of merging -// with the new data -// TODO: Remove replaceCacheData with a better logic -func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, fluxInterval time.Duration) (misses []missInterval, replaceCacheData bool) { - replaceCacheData = false - var cachedStart, cachedEnd int64 - for idx := range seriesList { - series := seriesList[idx] - for pointIdx := range series.Points { - point := series.Points[pointIdx] - if cachedStart == 0 || point.Timestamp < cachedStart { - cachedStart = point.Timestamp - } - if cachedEnd == 0 || point.Timestamp > cachedEnd { - cachedEnd = point.Timestamp - } - } - } - - // time.Now is used because here we are considering the case where data might not - // be fully ingested for last (fluxInterval) minutes - endMillis := time.Now().UnixMilli() - adjustStep := int64(math.Min(float64(step), 60)) - roundedMillis := endMillis - (endMillis % (adjustStep * 1000)) - - // Exclude the flux interval from the cached end time - cachedEnd = int64( - math.Min( - float64(cachedEnd), - float64(roundedMillis-fluxInterval.Milliseconds()), - ), - ) - - // There are five cases to consider - // 1. Cached time range is a subset of the requested time range - // 2. Cached time range is a superset of the requested time range - // 3. Cached time range is a left overlap of the requested time range - // 4. Cached time range is a right overlap of the requested time range - // 5. Cached time range is a disjoint of the requested time range - if cachedStart >= start && cachedEnd <= end { - // Case 1: Cached time range is a subset of the requested time range - // Add misses for the left and right sides of the cached time range - misses = append(misses, missInterval{start: start, end: cachedStart - 1}) - misses = append(misses, missInterval{start: cachedEnd + 1, end: end}) - } else if cachedStart <= start && cachedEnd >= end { - // Case 2: Cached time range is a superset of the requested time range - // No misses - } else if cachedStart <= start && cachedEnd >= start { - // Case 3: Cached time range is a left overlap of the requested time range - // Add a miss for the left side of the cached time range - misses = append(misses, missInterval{start: cachedEnd + 1, end: end}) - } else if cachedStart <= end && cachedEnd >= end { - // Case 4: Cached time range is a right overlap of the requested time range - // Add a miss for the right side of the cached time range - misses = append(misses, missInterval{start: start, end: cachedStart - 1}) - } else { - // Case 5: Cached time range is a disjoint of the requested time range - // Add a miss for the entire requested time range - misses = append(misses, missInterval{start: start, end: end}) - replaceCacheData = true - } - - // remove the struts with start > end - var validMisses []missInterval - for idx := range misses { - miss := misses[idx] - if miss.start < miss.end { - validMisses = append(validMisses, miss) - } - } - return validMisses, replaceCacheData -} - -// findMissingTimeRanges finds the missing time ranges in the cached data -// and returns them as a list of misses -func (q *querier) findMissingTimeRanges(start, end, step int64, cachedData []byte) (misses []missInterval, replaceCachedData bool) { - var cachedSeriesList []*v3.Series - if err := json.Unmarshal(cachedData, &cachedSeriesList); err != nil { - // In case of error, we return the entire range as a miss - return []missInterval{{start: start, end: end}}, true - } - return findMissingTimeRanges(start, end, step, cachedSeriesList, q.fluxInterval) -} - -// labelsToString converts the labels map to a string -// sorted by key so that the string is consistent -// across different runs -func labelsToString(labels map[string]string) string { - type label struct { - Key string - Value string - } - var labelsList []label - for k, v := range labels { - labelsList = append(labelsList, label{Key: k, Value: v}) - } - sort.Slice(labelsList, func(i, j int) bool { - return labelsList[i].Key < labelsList[j].Key - }) - labelKVs := make([]string, len(labelsList)) - for idx := range labelsList { - labelKVs[idx] = labelsList[idx].Key + "=" + labelsList[idx].Value - } - return fmt.Sprintf("{%s}", strings.Join(labelKVs, ",")) -} - -// filterCachedPoints filters the points in the series list -// that are outside the start and end time range -// and returns the filtered series list -// TODO(srikanthccv): is this really needed? -func filterCachedPoints(cachedSeries []*v3.Series, start, end int64) { - for _, c := range cachedSeries { - points := []v3.Point{} - for _, p := range c.Points { - if (p.Timestamp < start || p.Timestamp > end) && p.Timestamp != 0 { - continue - } - points = append(points, p) - } - c.Points = points - } -} - -// mergeSerieses merges the cached series and the missed series -// and returns the merged series list -func mergeSerieses(cachedSeries, missedSeries []*v3.Series) []*v3.Series { - // Merge the missed series with the cached series by timestamp - mergedSeries := make([]*v3.Series, 0) - seriesesByLabels := make(map[string]*v3.Series) - for idx := range cachedSeries { - series := cachedSeries[idx] - seriesesByLabels[labelsToString(series.Labels)] = series - } - - for idx := range missedSeries { - series := missedSeries[idx] - if _, ok := seriesesByLabels[labelsToString(series.Labels)]; !ok { - seriesesByLabels[labelsToString(series.Labels)] = series - continue - } - seriesesByLabels[labelsToString(series.Labels)].Points = append(seriesesByLabels[labelsToString(series.Labels)].Points, series.Points...) - } - - // Sort the points in each series by timestamp - // and remove duplicate points - for idx := range seriesesByLabels { - series := seriesesByLabels[idx] - series.SortPoints() - series.RemoveDuplicatePoints() - mergedSeries = append(mergedSeries, series) - } - return mergedSeries -} - func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) { cacheKeys := q.keyGenerator.GenerateKeys(params) @@ -372,50 +209,33 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam go func(queryName string, promQuery *v3.PromQuery) { defer wg.Done() cacheKey, ok := cacheKeys[queryName] - var cachedData []byte - // Ensure NoCache is not set and cache is not nil - if !params.NoCache && q.cache != nil && ok { - data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true) - zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String())) - if err == nil { - cachedData = data - } + + if !ok || params.NoCache { + zap.L().Info("skipping cache for metrics prom query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName])) + query := metricsV4.BuildPromQuery(promQuery, params.Step, params.Start, params.End) + series, err := q.execPromQuery(ctx, query) + channelResults <- channelResult{Err: err, Name: queryName, Query: query.Query, Series: series} + return } - misses, replaceCachedData := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData) - missedSeries := make([]*v3.Series, 0) - cachedSeries := make([]*v3.Series, 0) + misses := q.queryCache.FindMissingTimeRanges(params.Start, params.End, params.Step, cacheKey) + zap.L().Info("cache misses for metrics prom query", zap.Any("misses", misses)) + missedSeries := make([]querycache.CachedSeriesData, 0) for _, miss := range misses { - query := metricsV4.BuildPromQuery(promQuery, params.Step, miss.start, miss.end) + query := metricsV4.BuildPromQuery(promQuery, params.Step, miss.Start, miss.End) series, err := q.execPromQuery(ctx, query) if err != nil { channelResults <- channelResult{Err: err, Name: queryName, Query: query.Query, Series: nil} return } - missedSeries = append(missedSeries, series...) - } - if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil { - // ideally we should not be getting an error here - zap.L().Error("error unmarshalling cached data", zap.Error(err)) - } - mergedSeries := mergeSerieses(cachedSeries, missedSeries) - if replaceCachedData { - mergedSeries = missedSeries - } - channelResults <- channelResult{Err: nil, Name: queryName, Query: promQuery.Query, Series: mergedSeries} - - // Cache the seriesList for future queries - if len(missedSeries) > 0 && !params.NoCache && q.cache != nil && ok { - mergedSeriesData, err := json.Marshal(mergedSeries) - if err != nil { - zap.L().Error("error marshalling merged series", zap.Error(err)) - return - } - err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour) - if err != nil { - zap.L().Error("error storing merged series", zap.Error(err)) - return - } + missedSeries = append(missedSeries, querycache.CachedSeriesData{ + Data: series, + Start: miss.Start, + End: miss.End, + }) } + mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKey, missedSeries) + resultSeries := common.GetSeriesFromCachedData(mergedSeries, params.Start, params.End) + channelResults <- channelResult{Err: nil, Name: queryName, Query: promQuery.Query, Series: resultSeries} }(queryName, promQuery) } wg.Wait() diff --git a/pkg/query-service/app/querier/v2/querier_test.go b/pkg/query-service/app/querier/v2/querier_test.go index c65b6ff54a..6dfc921183 100644 --- a/pkg/query-service/app/querier/v2/querier_test.go +++ b/pkg/query-service/app/querier/v2/querier_test.go @@ -2,7 +2,9 @@ package v2 import ( "context" + "encoding/json" "fmt" + "math" "strings" "testing" "time" @@ -11,9 +13,34 @@ import ( tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" "go.signoz.io/signoz/pkg/query-service/cache/inmemory" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/querycache" ) -func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { +func minTimestamp(series []*v3.Series) int64 { + min := int64(math.MaxInt64) + for _, series := range series { + for _, point := range series.Points { + if point.Timestamp < min { + min = point.Timestamp + } + } + } + return min +} + +func maxTimestamp(series []*v3.Series) int64 { + max := int64(math.MinInt64) + for _, series := range series { + for _, point := range series.Points { + if point.Timestamp > max { + max = point.Timestamp + } + } + } + return max +} + +func TestV2FindMissingTimeRangesZeroFreshNess(t *testing.T) { // There are five scenarios: // 1. Cached time range is a subset of the requested time range // 2. Cached time range is a superset of the requested time range @@ -26,7 +53,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { requestedEnd int64 // in milliseconds requestedStep int64 // in seconds cachedSeries []*v3.Series - expectedMiss []missInterval + expectedMiss []querycache.MissInterval replaceCachedData bool }{ { @@ -51,14 +78,14 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { }, }, }, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 60*60*1000 - 1, + Start: 1675115596722, + End: 1675115596722 + 60*60*1000, }, { - start: 1675115596722 + 120*60*1000 + 1, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722 + 120*60*1000, + End: 1675115596722 + 180*60*1000, }, }, }, @@ -92,7 +119,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { }, }, }, - expectedMiss: []missInterval{}, + expectedMiss: []querycache.MissInterval{}, }, { name: "cached time range is a left overlap of the requested time range", @@ -120,10 +147,10 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { }, }, }, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722 + 120*60*1000 + 1, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722 + 120*60*1000, + End: 1675115596722 + 180*60*1000, }, }, }, @@ -153,10 +180,10 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { }, }, }, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 60*60*1000 - 1, + Start: 1675115596722, + End: 1675115596722 + 60*60*1000, }, }, }, @@ -186,31 +213,48 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { }, }, }, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722, + End: 1675115596722 + 180*60*1000, }, }, replaceCachedData: true, }, } - for _, tc := range testCases { + c := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute}) + + qc := querycache.NewQueryCache(querycache.WithCache(c)) + + for idx, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - misses, replaceCachedData := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, 0*time.Minute) + cacheKey := fmt.Sprintf("test-cache-key-%d", idx) + cachedData := &querycache.CachedSeriesData{ + Start: minTimestamp(tc.cachedSeries), + End: maxTimestamp(tc.cachedSeries), + Data: tc.cachedSeries, + } + jsonData, err := json.Marshal([]*querycache.CachedSeriesData{cachedData}) + if err != nil { + t.Errorf("error marshalling cached data: %v", err) + } + err = c.Store(cacheKey, jsonData, 5*time.Minute) + if err != nil { + t.Errorf("error storing cached data: %v", err) + } + + misses := qc.FindMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, cacheKey) if len(misses) != len(tc.expectedMiss) { t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses)) } - if replaceCachedData != tc.replaceCachedData { - t.Errorf("expected replaceCachedData %t, got %t", tc.replaceCachedData, replaceCachedData) - } + for i, miss := range misses { - if miss.start != tc.expectedMiss[i].start { - t.Errorf("expected start %d, got %d", tc.expectedMiss[i].start, miss.start) + if miss.Start != tc.expectedMiss[i].Start { + t.Errorf("expected start %d, got %d", tc.expectedMiss[i].Start, miss.Start) } - if miss.end != tc.expectedMiss[i].end { - t.Errorf("expected end %d, got %d", tc.expectedMiss[i].end, miss.end) + if miss.End != tc.expectedMiss[i].End { + t.Errorf("expected end %d, got %d", tc.expectedMiss[i].End, miss.End) } } }) @@ -226,7 +270,7 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) { requestedStep int64 cachedSeries []*v3.Series fluxInterval time.Duration - expectedMiss []missInterval + expectedMiss []querycache.MissInterval }{ { name: "cached time range is a subset of the requested time range", @@ -251,14 +295,14 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) { }, }, fluxInterval: 5 * time.Minute, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 60*60*1000 - 1, + Start: 1675115596722, + End: 1675115596722 + 60*60*1000, }, { - start: 1675115596722 + 120*60*1000 + 1, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722 + 120*60*1000, + End: 1675115596722 + 180*60*1000, }, }, }, @@ -293,7 +337,7 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) { }, }, fluxInterval: 5 * time.Minute, - expectedMiss: []missInterval{}, + expectedMiss: []querycache.MissInterval{}, }, { name: "cache time range is a left overlap of the requested time range", @@ -322,10 +366,10 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) { }, }, fluxInterval: 5 * time.Minute, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722 + 120*60*1000 + 1, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722 + 120*60*1000, + End: 1675115596722 + 180*60*1000, }, }, }, @@ -356,10 +400,10 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) { }, }, fluxInterval: 5 * time.Minute, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 60*60*1000 - 1, + Start: 1675115596722, + End: 1675115596722 + 60*60*1000, }, }, }, @@ -390,27 +434,47 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) { }, }, fluxInterval: 5 * time.Minute, - expectedMiss: []missInterval{ + expectedMiss: []querycache.MissInterval{ { - start: 1675115596722, - end: 1675115596722 + 180*60*1000, + Start: 1675115596722, + End: 1675115596722 + 180*60*1000, }, }, }, } - for _, tc := range testCases { + c := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute}) + + qc := querycache.NewQueryCache(querycache.WithCache(c)) + + for idx, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - misses, _ := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, tc.fluxInterval) + cacheKey := fmt.Sprintf("test-cache-key-%d", idx) + cachedData := &querycache.CachedSeriesData{ + Start: minTimestamp(tc.cachedSeries), + End: maxTimestamp(tc.cachedSeries), + Data: tc.cachedSeries, + } + jsonData, err := json.Marshal([]*querycache.CachedSeriesData{cachedData}) + if err != nil { + t.Errorf("error marshalling cached data: %v", err) + return + } + err = c.Store(cacheKey, jsonData, 5*time.Minute) + if err != nil { + t.Errorf("error storing cached data: %v", err) + return + } + misses := qc.FindMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, cacheKey) if len(misses) != len(tc.expectedMiss) { t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses)) } for i, miss := range misses { - if miss.start != tc.expectedMiss[i].start { - t.Errorf("expected start %d, got %d", tc.expectedMiss[i].start, miss.start) + if miss.Start != tc.expectedMiss[i].Start { + t.Errorf("expected start %d, got %d", tc.expectedMiss[i].Start, miss.Start) } - if miss.end != tc.expectedMiss[i].end { - t.Errorf("expected end %d, got %d", tc.expectedMiss[i].end, miss.end) + if miss.End != tc.expectedMiss[i].End { + t.Errorf("expected end %d, got %d", tc.expectedMiss[i].End, miss.End) } } }) @@ -1074,18 +1138,18 @@ func TestV2QueryRangeValueTypePromQL(t *testing.T) { expectedQueryAndTimeRanges := []struct { query string - ranges []missInterval + ranges []querycache.MissInterval }{ { query: "signoz_calls_total", - ranges: []missInterval{ - {start: 1675115596722, end: 1675115596722 + 120*60*1000}, + ranges: []querycache.MissInterval{ + {Start: 1675115596722, End: 1675115596722 + 120*60*1000}, }, }, { query: "signoz_latency_bucket", - ranges: []missInterval{ - {start: 1675115596722 + 60*60*1000, end: 1675115596722 + 180*60*1000}, + ranges: []querycache.MissInterval{ + {Start: 1675115596722 + 60*60*1000, End: 1675115596722 + 180*60*1000}, }, }, } @@ -1106,10 +1170,10 @@ func TestV2QueryRangeValueTypePromQL(t *testing.T) { if len(q.TimeRanges()[i]) != 2 { t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i]) } - if q.TimeRanges()[i][0] != int(expectedQueryAndTimeRanges[i].ranges[0].start) { + if q.TimeRanges()[i][0] != int(expectedQueryAndTimeRanges[i].ranges[0].Start) { t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i]) } - if q.TimeRanges()[i][1] != int(expectedQueryAndTimeRanges[i].ranges[0].end) { + if q.TimeRanges()[i][1] != int(expectedQueryAndTimeRanges[i].ranges[0].End) { t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i]) } } diff --git a/pkg/query-service/common/query_range.go b/pkg/query-service/common/query_range.go index c352c7d9f2..d6b62baf27 100644 --- a/pkg/query-service/common/query_range.go +++ b/pkg/query-service/common/query_range.go @@ -6,6 +6,8 @@ import ( "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/querycache" + "go.signoz.io/signoz/pkg/query-service/utils/labels" ) func AdjustedMetricTimeRange(start, end, step int64, mq v3.BuilderQuery) (int64, int64) { @@ -70,3 +72,35 @@ func LCMList(nums []int64) int64 { } return result } + +func GetSeriesFromCachedData(data []querycache.CachedSeriesData, start, end int64) []*v3.Series { + series := make(map[uint64]*v3.Series) + + for _, cachedData := range data { + for _, data := range cachedData.Data { + h := labels.FromMap(data.Labels).Hash() + + if _, ok := series[h]; !ok { + series[h] = &v3.Series{ + Labels: data.Labels, + LabelsArray: data.LabelsArray, + Points: make([]v3.Point, 0), + } + } + + for _, point := range data.Points { + if point.Timestamp >= start && point.Timestamp <= end { + series[h].Points = append(series[h].Points, point) + } + } + } + } + + newSeries := make([]*v3.Series, 0, len(series)) + for _, s := range series { + s.SortPoints() + s.RemoveDuplicatePoints() + newSeries = append(newSeries, s) + } + return newSeries +} diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index db2563edab..8e651e17ea 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -10,6 +10,7 @@ import ( "github.com/prometheus/prometheus/util/stats" "go.signoz.io/signoz/pkg/query-service/model" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/querycache" ) type Reader interface { @@ -121,3 +122,8 @@ type Querier interface { QueriesExecuted() []string TimeRanges() [][]int } + +type QueryCache interface { + FindMissingTimeRanges(start, end int64, step int64, cacheKey string) []querycache.MissInterval + MergeWithCachedSeriesData(cacheKey string, newData []querycache.CachedSeriesData) []querycache.CachedSeriesData +} diff --git a/pkg/query-service/querycache/query_range_cache.go b/pkg/query-service/querycache/query_range_cache.go new file mode 100644 index 0000000000..3b3e3be93c --- /dev/null +++ b/pkg/query-service/querycache/query_range_cache.go @@ -0,0 +1,225 @@ +package querycache + +import ( + "encoding/json" + "math" + "sort" + "time" + + "go.signoz.io/signoz/pkg/query-service/cache" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/utils/labels" + "go.uber.org/zap" +) + +type queryCache struct { + cache cache.Cache + fluxInterval time.Duration +} + +type MissInterval struct { + Start, End int64 // in milliseconds +} + +type CachedSeriesData struct { + Start int64 `json:"start"` + End int64 `json:"end"` + Data []*v3.Series `json:"data"` +} + +type QueryCacheOption func(q *queryCache) + +func NewQueryCache(opts ...QueryCacheOption) *queryCache { + q := &queryCache{} + for _, opt := range opts { + opt(q) + } + return q +} + +func WithCache(cache cache.Cache) QueryCacheOption { + return func(q *queryCache) { + q.cache = cache + } +} + +func WithFluxInterval(fluxInterval time.Duration) QueryCacheOption { + return func(q *queryCache) { + q.fluxInterval = fluxInterval + } +} + +func (q *queryCache) FindMissingTimeRanges(start, end, step int64, cacheKey string) []MissInterval { + if q.cache == nil || cacheKey == "" { + return []MissInterval{{Start: start, End: end}} + } + + cachedSeriesDataList := q.getCachedSeriesData(cacheKey) + + // Sort the cached data by start time + sort.Slice(cachedSeriesDataList, func(i, j int) bool { + return cachedSeriesDataList[i].Start < cachedSeriesDataList[j].Start + }) + + zap.L().Info("Number of non-overlapping cached series data", zap.Int("count", len(cachedSeriesDataList))) + + // Exclude the flux interval from the cached end time + + // Why do we use `time.Now()` here? + // When querying for a range [start, now()) + // we don't want to use the cached data inside the flux interval period + // because the data in the flux interval period might not be fully ingested + // and should not be used for caching. + // This is not an issue if the end time is before now() - fluxInterval + endMillis := time.Now().UnixMilli() + adjustStep := int64(math.Min(float64(step), 60)) + roundedMillis := endMillis - (endMillis % (adjustStep * 1000)) + + if len(cachedSeriesDataList) > 0 { + lastCachedData := cachedSeriesDataList[len(cachedSeriesDataList)-1] + lastCachedData.End = int64( + math.Min( + float64(lastCachedData.End), + float64(roundedMillis-q.fluxInterval.Milliseconds()), + ), + ) + } + + var missingRanges []MissInterval + currentTime := start + + for _, data := range cachedSeriesDataList { + // Ignore cached data that ends before the start time + if data.End <= start { + continue + } + // Stop processing if we've reached the end time + if data.Start >= end { + break + } + + // Add missing range if there's a gap + if currentTime < data.Start { + missingRanges = append(missingRanges, MissInterval{Start: currentTime, End: min(data.Start, end)}) + } + + // Update currentTime, but don't go past the end time + currentTime = max(currentTime, min(data.End, end)) + } + + // Add final missing range if necessary + if currentTime < end { + missingRanges = append(missingRanges, MissInterval{Start: currentTime, End: end}) + } + + return missingRanges +} + +func (q *queryCache) getCachedSeriesData(cacheKey string) []*CachedSeriesData { + cachedData, _, _ := q.cache.Retrieve(cacheKey, true) + var cachedSeriesDataList []*CachedSeriesData + if err := json.Unmarshal(cachedData, &cachedSeriesDataList); err != nil { + return nil + } + return cachedSeriesDataList +} + +func (q *queryCache) mergeSeries(cachedSeries, missedSeries []*v3.Series) []*v3.Series { + // Merge the missed series with the cached series by timestamp + mergedSeries := make([]*v3.Series, 0) + seriesesByLabels := make(map[uint64]*v3.Series) + for idx := range cachedSeries { + series := cachedSeries[idx] + seriesesByLabels[labels.FromMap(series.Labels).Hash()] = series + } + + for idx := range missedSeries { + series := missedSeries[idx] + h := labels.FromMap(series.Labels).Hash() + if _, ok := seriesesByLabels[h]; !ok { + seriesesByLabels[h] = series + continue + } + seriesesByLabels[h].Points = append(seriesesByLabels[h].Points, series.Points...) + } + // Sort the points in each series by timestamp + for idx := range seriesesByLabels { + series := seriesesByLabels[idx] + series.SortPoints() + series.RemoveDuplicatePoints() + mergedSeries = append(mergedSeries, series) + } + return mergedSeries +} + +func (q *queryCache) storeMergedData(cacheKey string, mergedData []CachedSeriesData) { + mergedDataJSON, err := json.Marshal(mergedData) + if err != nil { + zap.L().Error("error marshalling merged data", zap.Error(err)) + return + } + err = q.cache.Store(cacheKey, mergedDataJSON, 0) + if err != nil { + zap.L().Error("error storing merged data", zap.Error(err)) + } +} + +func (q *queryCache) MergeWithCachedSeriesData(cacheKey string, newData []CachedSeriesData) []CachedSeriesData { + + if q.cache == nil { + return newData + } + + cachedData, _, _ := q.cache.Retrieve(cacheKey, true) + var existingData []CachedSeriesData + if err := json.Unmarshal(cachedData, &existingData); err != nil { + // In case of error, we return the entire range as a miss + q.storeMergedData(cacheKey, newData) + return newData + } + + allData := append(existingData, newData...) + + sort.Slice(allData, func(i, j int) bool { + return allData[i].Start < allData[j].Start + }) + + var mergedData []CachedSeriesData + var current *CachedSeriesData + + for _, data := range allData { + if current == nil { + current = &CachedSeriesData{ + Start: data.Start, + End: data.End, + Data: data.Data, + } + continue + } + if data.Start <= current.End { + // Overlapping intervals, merge them + current.End = max(current.End, data.End) + current.Start = min(current.Start, data.Start) + // Merge the Data fields + current.Data = q.mergeSeries(current.Data, data.Data) + } else { + // No overlap, add current to mergedData + mergedData = append(mergedData, *current) + // Start new current + current = &CachedSeriesData{ + Start: data.Start, + End: data.End, + Data: data.Data, + } + } + } + + // After the loop, add the last current + if current != nil { + mergedData = append(mergedData, *current) + } + + q.storeMergedData(cacheKey, mergedData) + + return mergedData +} diff --git a/pkg/query-service/querycache/query_range_cache_test.go b/pkg/query-service/querycache/query_range_cache_test.go new file mode 100644 index 0000000000..c71ba13f10 --- /dev/null +++ b/pkg/query-service/querycache/query_range_cache_test.go @@ -0,0 +1,336 @@ +package querycache_test + +import ( + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.signoz.io/signoz/pkg/query-service/cache/inmemory" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/querycache" +) + +func TestFindMissingTimeRanges(t *testing.T) { + // Initialize the mock cache + mockCache := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute}) + + // Create a queryCache instance with the mock cache and a fluxInterval + q := querycache.NewQueryCache( + querycache.WithCache(mockCache), + querycache.WithFluxInterval(0), // Set to zero for testing purposes + ) + + // Define the test cases + testCases := []struct { + name string + requestedStart int64 // in milliseconds + requestedEnd int64 // in milliseconds + step int64 // in seconds + cacheKey string + cachedData []querycache.CachedSeriesData + expectedMiss []querycache.MissInterval + }{ + { + name: "Cached time range is a subset of the requested time range", + requestedStart: 1000, + requestedEnd: 5000, + step: 60, + cacheKey: "testKey1", + cachedData: []querycache.CachedSeriesData{ + { + Start: 2000, + End: 3000, + Data: []*v3.Series{}, // Data can be empty for this test + }, + }, + expectedMiss: []querycache.MissInterval{ + {Start: 1000, End: 2000}, + {Start: 3000, End: 5000}, + }, + }, + { + name: "Cached time range is a superset of the requested time range", + requestedStart: 2000, + requestedEnd: 3000, + step: 60, + cacheKey: "testKey2", + cachedData: []querycache.CachedSeriesData{ + { + Start: 1000, + End: 4000, + Data: []*v3.Series{}, + }, + }, + expectedMiss: nil, // No missing intervals + }, + { + name: "Cached time range is a left overlap of the requested time range", + requestedStart: 2000, + requestedEnd: 4000, + step: 60, + cacheKey: "testKey3", + cachedData: []querycache.CachedSeriesData{ + { + Start: 1000, + End: 2500, + Data: []*v3.Series{}, + }, + }, + expectedMiss: []querycache.MissInterval{ + {Start: 2500, End: 4000}, + }, + }, + { + name: "Cached time range is a right overlap of the requested time range", + requestedStart: 2000, + requestedEnd: 4000, + step: 60, + cacheKey: "testKey4", + cachedData: []querycache.CachedSeriesData{ + { + Start: 3500, + End: 5000, + Data: []*v3.Series{}, + }, + }, + expectedMiss: []querycache.MissInterval{ + {Start: 2000, End: 3500}, + }, + }, + { + name: "Cached time range is disjoint from the requested time range", + requestedStart: 2000, + requestedEnd: 4000, + step: 60, + cacheKey: "testKey5", + cachedData: []querycache.CachedSeriesData{ + { + Start: 5000, + End: 6000, + Data: []*v3.Series{}, + }, + }, + expectedMiss: []querycache.MissInterval{ + {Start: 2000, End: 4000}, + }, + }, + // Additional test cases for non-overlapping cached data + { + name: "Multiple non-overlapping cached intervals within requested range", + requestedStart: 1000, + requestedEnd: 5000, + step: 60, + cacheKey: "testKey6", + cachedData: []querycache.CachedSeriesData{ + {Start: 1100, End: 1200, Data: []*v3.Series{}}, + {Start: 1300, End: 1400, Data: []*v3.Series{}}, + {Start: 1500, End: 1600, Data: []*v3.Series{}}, + }, + expectedMiss: []querycache.MissInterval{ + {Start: 1000, End: 1100}, + {Start: 1200, End: 1300}, + {Start: 1400, End: 1500}, + {Start: 1600, End: 5000}, + }, + }, + { + name: "Cached intervals covering some parts with gaps", + requestedStart: 1000, + requestedEnd: 2000, + step: 60, + cacheKey: "testKey7", + cachedData: []querycache.CachedSeriesData{ + {Start: 1000, End: 1100, Data: []*v3.Series{}}, + {Start: 1200, End: 1300, Data: []*v3.Series{}}, + {Start: 1400, End: 1500, Data: []*v3.Series{}}, + {Start: 1600, End: 1700, Data: []*v3.Series{}}, + }, + expectedMiss: []querycache.MissInterval{ + {Start: 1100, End: 1200}, + {Start: 1300, End: 1400}, + {Start: 1500, End: 1600}, + {Start: 1700, End: 2000}, + }, + }, + { + name: "Non-overlapping cached intervals outside requested range", + requestedStart: 2000, + requestedEnd: 3000, + step: 60, + cacheKey: "testKey8", + cachedData: []querycache.CachedSeriesData{ + {Start: 1000, End: 1500, Data: []*v3.Series{}}, + {Start: 3500, End: 4000, Data: []*v3.Series{}}, + }, + expectedMiss: []querycache.MissInterval{ + {Start: 2000, End: 3000}, + }, + }, + { + name: "No cached data at all", + requestedStart: 1000, + requestedEnd: 2000, + step: 60, + cacheKey: "testKey10", + cachedData: nil, + expectedMiss: []querycache.MissInterval{ + {Start: 1000, End: 2000}, + }, + }, + { + name: "Cached intervals with overlapping and non-overlapping mix", + requestedStart: 1000, + requestedEnd: 5000, + step: 60, + cacheKey: "testKey11", + cachedData: []querycache.CachedSeriesData{ + {Start: 1000, End: 2000, Data: []*v3.Series{}}, + {Start: 1500, End: 2500, Data: []*v3.Series{}}, // Overlaps with previous + {Start: 3000, End: 3500, Data: []*v3.Series{}}, + {Start: 4000, End: 4500, Data: []*v3.Series{}}, + }, + expectedMiss: []querycache.MissInterval{ + {Start: 2500, End: 3000}, + {Start: 3500, End: 4000}, + {Start: 4500, End: 5000}, + }, + }, + { + name: "Cached intervals covering the edges but missing middle", + requestedStart: 1000, + requestedEnd: 5000, + step: 60, + cacheKey: "testKey12", + cachedData: []querycache.CachedSeriesData{ + {Start: 1000, End: 1500, Data: []*v3.Series{}}, + {Start: 4500, End: 5000, Data: []*v3.Series{}}, + }, + expectedMiss: []querycache.MissInterval{ + {Start: 1500, End: 4500}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + + // Store the cached data in the mock cache + if len(tc.cachedData) > 0 { + cachedDataJSON, err := json.Marshal(tc.cachedData) + assert.NoError(t, err) + err = mockCache.Store(tc.cacheKey, cachedDataJSON, 0) + assert.NoError(t, err) + } + + // Call FindMissingTimeRanges + missingRanges := q.FindMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.step, tc.cacheKey) + + // Verify the missing ranges + assert.Equal(t, tc.expectedMiss, missingRanges) + }) + } +} + +func TestMergeWithCachedSeriesData(t *testing.T) { + // Initialize the mock cache + mockCache := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute}) + + // Create a queryCache instance with the mock cache and a fluxInterval + q := querycache.NewQueryCache( + querycache.WithCache(mockCache), + querycache.WithFluxInterval(0), // Set to zero for testing purposes + ) + + // Define test data + cacheKey := "mergeTestKey" + + // Existing cached data + existingData := []querycache.CachedSeriesData{ + { + Start: 1000, + End: 2000, + Data: []*v3.Series{ + { + Labels: map[string]string{"metric": "cpu", "instance": "localhost"}, + Points: []v3.Point{ + {Timestamp: 1500, Value: 0.5}, + }, + }, + }, + }, + } + + // New data to merge + newData := []querycache.CachedSeriesData{ + { + Start: 1500, + End: 2500, + Data: []*v3.Series{ + { + Labels: map[string]string{"metric": "cpu", "instance": "localhost"}, + Points: []v3.Point{ + {Timestamp: 1750, Value: 0.6}, + }, + }, + { + Labels: map[string]string{"metric": "memory", "instance": "localhost"}, + Points: []v3.Point{ + {Timestamp: 1800, Value: 0.7}, + }, + }, + }, + }, + } + + // Expected merged data + expectedMergedData := []querycache.CachedSeriesData{ + { + Start: 1000, + End: 2500, + Data: []*v3.Series{ + { + Labels: map[string]string{"metric": "cpu", "instance": "localhost"}, + Points: []v3.Point{ + {Timestamp: 1500, Value: 0.5}, + {Timestamp: 1750, Value: 0.6}, + }, + }, + { + Labels: map[string]string{"metric": "memory", "instance": "localhost"}, + Points: []v3.Point{ + {Timestamp: 1800, Value: 0.7}, + }, + }, + }, + }, + } + + // Store existing data in cache + cachedDataJSON, err := json.Marshal(existingData) + assert.NoError(t, err) + err = mockCache.Store(cacheKey, cachedDataJSON, 0) + assert.NoError(t, err) + + // Call MergeWithCachedSeriesData + mergedData := q.MergeWithCachedSeriesData(cacheKey, newData) + + // Verify the merged data + assert.Equal(t, len(expectedMergedData), len(mergedData)) + for i, expected := range expectedMergedData { + actual := mergedData[i] + assert.Equal(t, expected.Start, actual.Start) + assert.Equal(t, expected.End, actual.End) + assert.Equal(t, len(expected.Data), len(actual.Data)) + for j, expectedSeries := range expected.Data { + actualSeries := actual.Data[j] + assert.Equal(t, expectedSeries.Labels, actualSeries.Labels) + assert.Equal(t, len(expectedSeries.Points), len(actualSeries.Points)) + for k, expectedPoint := range expectedSeries.Points { + actualPoint := actualSeries.Points[k] + assert.Equal(t, expectedPoint.Timestamp, actualPoint.Timestamp) + assert.Equal(t, expectedPoint.Value, actualPoint.Value) + } + } + } +}