diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go
index ccc441463d..f1b9225c2b 100644
--- a/pkg/query-service/app/clickhouseReader/reader.go
+++ b/pkg/query-service/app/clickhouseReader/reader.go
@@ -70,6 +70,7 @@ const (
 	signozTraceTableName       = "distributed_signoz_index_v2"
 	signozTraceLocalTableName  = "signoz_index_v2"
 	signozMetricDBName         = "signoz_metrics"
+	signozMetadataDbName       = "signoz_metadata"
 	signozSampleLocalTableName = "samples_v4"
 	signozSampleTableName      = "distributed_samples_v4"
@@ -95,6 +96,8 @@
 	signozTSLocalTableNameV41Week = "time_series_v4_1week"
 	signozTSTableNameV41Week      = "distributed_time_series_v4_1week"
 
+	signozTableAttributesMetadata      = "distributed_attributes_metadata"
+	signozLocalTableAttributesMetadata = "attributes_metadata"
 	minTimespanForProgressiveSearch       = time.Hour
 	minTimespanForProgressiveSearchMargin = time.Minute
 	maxProgressiveSteps                   = 4
@@ -2031,9 +2034,6 @@ func (r *ClickHouseReader) SetTTLTracesV2(ctx context.Context, params *model.TTL
 	return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
 }
 
-// SetTTL sets the TTL for traces or metrics or logs tables.
-// This is an async API which creates goroutines to set TTL.
-// Status of TTL update is tracked with ttl_status table in sqlite db.
 // SetTTL sets the TTL for traces or metrics or logs tables.
 // This is an async API which creates goroutines to set TTL.
 // Status of TTL update is tracked with ttl_status table in sqlite db.
@@ -5691,7 +5691,8 @@ func (r *ClickHouseReader) GetAllMetricFilterAttributeKeys(ctx context.Context,
 	if req.Limit != 0 {
 		query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
 	}
-	rows, err := r.db.Query(ctx, query, common.PastDayRoundOff(), fmt.Sprintf("%%%s%%", req.SearchText))
+	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
+	rows, err := r.db.Query(valueCtx, query, common.PastDayRoundOff(), fmt.Sprintf("%%%s%%", req.SearchText))
 	//only showing past day data
 	if err != nil {
 		zap.L().Error("Error while executing query", zap.Error(err))
 		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
 	}
@@ -5713,6 +5714,9 @@ func (r *ClickHouseReader) GetAllMetricFilterAttributeKeys(ctx context.Context,
 		}
 		response = append(response, key)
 	}
+	if err := rows.Err(); err != nil {
+		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
+	}
 	return &response, nil
 }
@@ -5726,7 +5730,8 @@ func (r *ClickHouseReader) GetAllMetricFilterAttributeValues(ctx context.Context,
 	if req.Limit != 0 {
 		query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
 	}
-	rows, err = r.db.Query(ctx, query, req.FilterKey, req.FilterKey, fmt.Sprintf("%%%s%%", req.SearchText), common.PastDayRoundOff())
+	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
+	rows, err = r.db.Query(valueCtx, query, req.FilterKey, req.FilterKey, fmt.Sprintf("%%%s%%", req.SearchText), common.PastDayRoundOff())
 	//only showing past day data
 	if err != nil {
 		zap.L().Error("Error while executing query", zap.Error(err))
@@ -5741,6 +5746,9 @@ func (r *ClickHouseReader) GetAllMetricFilterAttributeValues(ctx context.Context,
 		}
 		attributeValues = append(attributeValues, atrributeValue)
 	}
+	if err := rows.Err(); err != nil {
+		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
+	}
 	return attributeValues, nil
 }
@@ -5752,7 +5760,8 @@ func (r *ClickHouseReader) GetAllMetricFilterUnits(ctx context.Context, req *met
 		query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
LIMIT %d;", req.Limit) } - rows, err := r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText)) + valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads) + rows, err := r.db.Query(valueCtx, query, fmt.Sprintf("%%%s%%", req.SearchText)) if err != nil { zap.L().Error("Error while executing query", zap.Error(err)) return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} @@ -5765,6 +5774,9 @@ func (r *ClickHouseReader) GetAllMetricFilterUnits(ctx context.Context, req *met } response = append(response, attributeKey) } + if err := rows.Err(); err != nil { + return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} + } return response, nil } func (r *ClickHouseReader) GetAllMetricFilterTypes(ctx context.Context, req *metrics_explorer.FilterValueRequest) ([]string, *model.ApiError) { @@ -5774,8 +5786,8 @@ func (r *ClickHouseReader) GetAllMetricFilterTypes(ctx context.Context, req *met if req.Limit != 0 { query = query + fmt.Sprintf(" LIMIT %d;", req.Limit) } - - rows, err := r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText)) + valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads) + rows, err := r.db.Query(valueCtx, query, fmt.Sprintf("%%%s%%", req.SearchText)) if err != nil { zap.L().Error("Error while executing query", zap.Error(err)) return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} @@ -5788,6 +5800,9 @@ func (r *ClickHouseReader) GetAllMetricFilterTypes(ctx context.Context, req *met } response = append(response, attributeKey) } + if err := rows.Err(); err != nil { + return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} + } return response, nil } @@ -5798,7 +5813,8 @@ FROM %s.%s WHERE metric_name = ? `, signozMetricDBName, constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME) var dataPoints uint64 - err := r.db.QueryRow(ctx, query, metricName).Scan(&dataPoints) + valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads) + err := r.db.QueryRow(valueCtx, query, metricName).Scan(&dataPoints) if err != nil { return 0, &model.ApiError{Typ: "ClickHouseError", Err: err} } @@ -5812,7 +5828,8 @@ FROM %s.%s WHERE metric_name = ? 
 `, signozMetricDBName, signozSampleTableName)
 	var lastReceived int64
-	err := r.db.QueryRow(ctx, query, metricName).Scan(&lastReceived)
+	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
+	err := r.db.QueryRow(valueCtx, query, metricName).Scan(&lastReceived)
 	if err != nil {
 		return 0, &model.ApiError{Typ: "ClickHouseError", Err: err}
 	}
@@ -5825,52 +5842,60 @@ func (r *ClickHouseReader) GetTotalTimeSeriesForMetricName(ctx context.Context,
 FROM %s.%s
 WHERE metric_name = ?;`, signozMetricDBName, signozTSTableNameV41Week)
 	var timeSeriesCount uint64
-	err := r.db.QueryRow(ctx, query, metricName).Scan(&timeSeriesCount)
+	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
+	err := r.db.QueryRow(valueCtx, query, metricName).Scan(&timeSeriesCount)
 	if err != nil {
 		return 0, &model.ApiError{Typ: "ClickHouseError", Err: err}
 	}
 	return timeSeriesCount, nil
 }
 
-func (r *ClickHouseReader) GetAttributesForMetricName(ctx context.Context, metricName string) (*[]metrics_explorer.Attribute, *model.ApiError) {
-	query := fmt.Sprintf(`
+func (r *ClickHouseReader) GetAttributesForMetricName(ctx context.Context, metricName string, start, end *int64) (*[]metrics_explorer.Attribute, *model.ApiError) {
+	const baseQueryTemplate = `
 SELECT
     kv.1 AS key,
-    arrayMap(x -> trim(BOTH '\"' FROM x), groupUniqArray(10000)(kv.2)) AS values,
+    arrayMap(x -> trim(BOTH '"' FROM x), groupUniqArray(10000)(kv.2)) AS values,
     length(groupUniqArray(10000)(kv.2)) AS valueCount
 FROM %s.%s
 ARRAY JOIN arrayFilter(x -> NOT startsWith(x.1, '__'), JSONExtractKeysAndValuesRaw(labels)) AS kv
-WHERE metric_name = ?
-GROUP BY kv.1
-ORDER BY valueCount DESC;
-	`, signozMetricDBName, signozTSTableNameV41Week)
+WHERE metric_name = ?`
+
+	var args []interface{}
+	args = append(args, metricName)
+	tableName := signozTSTableNameV41Week
+
+	if start != nil && end != nil {
+		st, en, tsTable, _ := utils.WhichTSTableToUse(*start, *end)
+		*start, *end, tableName = st, en, tsTable
+		args = append(args, *start, *end)
+	} else if start == nil && end == nil {
+		tableName = signozTSTableNameV41Week
+	}
+
+	query := fmt.Sprintf(baseQueryTemplate, signozMetricDBName, tableName)
+
+	if start != nil && end != nil {
+		query += " AND unix_milli BETWEEN ? AND ?"
+	}
+
+	query += "\nGROUP BY kv.1\nORDER BY valueCount DESC;"
 
-	rows, err := r.db.Query(ctx, query, metricName)
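+	// Cap ClickHouse max_threads for this metrics-explorer query by passing a hint through the query context.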
+	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
+	rows, err := r.db.Query(valueCtx, query, args...)
 	if err != nil {
 		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
 	}
-	defer rows.Close() // Ensure the rows are closed
+	defer rows.Close()
 
 	var attributesList []metrics_explorer.Attribute
 	for rows.Next() {
-		var key string
-		var values []string
-		var valueCount uint64
-
-		// Manually scan each value into its corresponding variable
-		if err := rows.Scan(&key, &values, &valueCount); err != nil {
+		var attr metrics_explorer.Attribute
+		if err := rows.Scan(&attr.Key, &attr.Value, &attr.ValueCount); err != nil {
 			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
 		}
-
-		// Append the scanned values into the struct
-		attributesList = append(attributesList, metrics_explorer.Attribute{
-			Key:        key,
-			Value:      values,
-			ValueCount: valueCount,
-		})
+		attributesList = append(attributesList, attr)
 	}
-	// Handle any errors encountered while scanning rows
 	if err := rows.Err(); err != nil {
 		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
 	}
@@ -5883,7 +5908,8 @@ func (r *ClickHouseReader) GetActiveTimeSeriesForMetricName(ctx context.Context,
 	query := fmt.Sprintf("SELECT uniq(fingerprint) FROM %s.%s WHERE metric_name = '%s' and unix_milli >= ?", signozMetricDBName, signozTSTableNameV4, metricName)
 	var timeSeries uint64
 	// Using QueryRow instead of Select since we're only expecting a single value
-	err := r.db.QueryRow(ctx, query, milli).Scan(&timeSeries)
+	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
+	err := r.db.QueryRow(valueCtx, query, milli).Scan(&timeSeries)
 	if err != nil {
 		return 0, &model.ApiError{Typ: "ClickHouseError", Err: err}
 	}
@@ -6009,7 +6035,7 @@ func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, req *metrics_
 		sb.WriteString(orderByClauseFirstQuery)
 	}
 
-	sb.WriteString(fmt.Sprintf(" LIMIT %d OFFSET %d;", req.Limit, req.Offset))
+	sb.WriteString(fmt.Sprintf(" LIMIT %d;", req.Limit))
 
 	sampleQuery := sb.String()
 
@@ -6455,6 +6481,10 @@ func (r *ClickHouseReader) GetAttributeSimilarity(ctx context.Context, req *metr
 		}
 	}
 
+	if err := rows.Err(); err != nil {
+		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
+	}
+
 	// Normalize the attribute similarity scores
 	normalizeMap := utils.NormalizeMap(attributeMap)
 	for metric := range result {
@@ -6467,3 +6497,199 @@ func (r *ClickHouseReader) GetAttributeSimilarity(ctx context.Context, req *metr
 
 	return result, nil
 }
+
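+// GetMetricsAllResourceAttributes returns every resource attribute key recorded in the metadata table for the
+// given time range, together with the number of distinct values seen for that key.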
+func (r *ClickHouseReader) GetMetricsAllResourceAttributes(ctx context.Context, start int64, end int64) (map[string]uint64, *model.ApiError) {
+	start, end, attTable, _ := utils.WhichAttributesTableToUse(start, end)
+	query := fmt.Sprintf(`SELECT
+    key,
+    count(distinct value) AS distinct_value_count
+FROM (
+    SELECT key, value
+    FROM %s.%s
+    ARRAY JOIN
+        arrayConcat(mapKeys(resource_attributes)) AS key,
+        arrayConcat(mapValues(resource_attributes)) AS value
+    WHERE unix_milli between ? and ?
+)
+GROUP BY key
+ORDER BY distinct_value_count DESC;`, signozMetadataDbName, attTable)
+	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
+	rows, err := r.db.Query(valueCtx, query, start, end)
+	if err != nil {
+		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
+	}
+	attributes := make(map[string]uint64)
+	for rows.Next() {
+		var attrs string
+		var uniqCount uint64
+
+		if err := rows.Scan(&attrs, &uniqCount); err != nil {
+			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
+		}
+		attributes[attrs] = uniqCount
+	}
+	if err := rows.Err(); err != nil {
+		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
+	}
+	return attributes, nil
+}
+
+func (r *ClickHouseReader) GetInspectMetrics(ctx context.Context, req *metrics_explorer.InspectMetricsRequest, fingerprints []string) (*metrics_explorer.InspectMetricsResponse, *model.ApiError) {
+	start, end, _, localTsTable := utils.WhichTSTableToUse(req.Start, req.End)
+	fingerprintsString := strings.Join(fingerprints, ",")
+	query := fmt.Sprintf(`SELECT
+		fingerprint,
+		labels,
+		unix_milli,
+		value as per_series_value
+	FROM
+		signoz_metrics.distributed_samples_v4
+	INNER JOIN (
+		SELECT DISTINCT
+			fingerprint,
+			labels
+		FROM
+			%s.%s
+		WHERE
+			fingerprint in (%s)
+			AND unix_milli >= ?
+			AND unix_milli < ?) as filtered_time_series
+	USING fingerprint
+	WHERE
+		metric_name = ?
+		AND unix_milli >= ?
+		AND unix_milli < ?
+	ORDER BY fingerprint DESC, unix_milli DESC`, signozMetricDBName, localTsTable, fingerprintsString)
+	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
+	rows, err := r.db.Query(valueCtx, query, start, end, req.MetricName, start, end)
+	if err != nil {
+		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
+	}
+	defer rows.Close()
+
+	seriesMap := make(map[uint64]*v3.Series)
+
+	for rows.Next() {
+		var fingerprint uint64
+		var labelsJSON string
+		var unixMilli int64
+		var perSeriesValue float64
+
+		if err := rows.Scan(&fingerprint, &labelsJSON, &unixMilli, &perSeriesValue); err != nil {
+			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
+		}
+
+		var labelsMap map[string]string
+		if err := json.Unmarshal([]byte(labelsJSON), &labelsMap); err != nil {
+			return nil, &model.ApiError{Typ: "JsonUnmarshalError", Err: err}
+		}
+
+		// Filter out keys starting with "__"
+		filteredLabelsMap := make(map[string]string)
+		for k, v := range labelsMap {
+			if !strings.HasPrefix(k, "__") {
+				filteredLabelsMap[k] = v
+			}
+		}
+
+		var labelsArray []map[string]string
+		for k, v := range filteredLabelsMap {
+			labelsArray = append(labelsArray, map[string]string{k: v})
+		}
+
+		// Check if we already have a Series for this fingerprint.
+		series, exists := seriesMap[fingerprint]
+		if !exists {
+			series = &v3.Series{
+				Labels:      filteredLabelsMap,
+				LabelsArray: labelsArray,
+				Points:      []v3.Point{},
+			}
+			seriesMap[fingerprint] = series
+		}
+
+		series.Points = append(series.Points, v3.Point{
+			Timestamp: unixMilli,
+			Value:     perSeriesValue,
+		})
+	}
+
+	if err = rows.Err(); err != nil {
+		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
+	}
+
+	var seriesList []v3.Series
+	for _, s := range seriesMap {
+		seriesList = append(seriesList, *s)
+	}
+
+	return &metrics_explorer.InspectMetricsResponse{
+		Series: &seriesList,
+	}, nil
+}
+
+func (r *ClickHouseReader) GetInspectMetricsFingerprints(ctx context.Context, attributes []string, req *metrics_explorer.InspectMetricsRequest) ([]string, *model.ApiError) {
+	// Build dynamic key selections and JSON extracts
+	var jsonExtracts []string
+	var groupBys []string
+
+	for i, attr := range attributes {
+		keyAlias := fmt.Sprintf("key%d", i+1)
+		jsonExtracts = append(jsonExtracts, fmt.Sprintf("JSONExtractString(labels, '%s') AS %s", attr, keyAlias))
+		groupBys = append(groupBys, keyAlias)
+	}
+	start, end, tsTable, _ := utils.WhichTSTableToUse(req.Start, req.End)
+	query := fmt.Sprintf(`
+	SELECT
+		arrayDistinct(groupArray(toString(fingerprint))) AS fingerprints
+FROM
+(
+	SELECT
+		metric_name, labels, fingerprint,
+		%s
+	FROM %s.%s
+	WHERE metric_name = ?
+	AND unix_milli BETWEEN ? AND ?
+)
+GROUP BY %s
+ORDER BY length(fingerprints) DESC, rand()
+LIMIT 40`, // rand() in the ORDER BY yields a different sample of series on each run
+		strings.Join(jsonExtracts, ", "),
+		signozMetricDBName, tsTable,
+		strings.Join(groupBys, ", "))
+	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
+	rows, err := r.db.Query(valueCtx, query,
+		req.MetricName,
+		start,
+		end,
+	)
+	if err != nil {
+		return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
+	}
+	defer rows.Close()
+
+	var fingerprints []string
+	for rows.Next() {
+		// Each row carries the distinct fingerprints for one attribute-value group
+		var fingerprintsList []string
+
+		if err := rows.Scan(&fingerprintsList); err != nil {
+			return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
+		}
+
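+		// Accumulate groups until roughly 40 fingerprints have been collected, then stop.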
+		if len(fingerprints) == 0 || len(fingerprints)+len(fingerprintsList) < 40 {
+			fingerprints = append(fingerprints, fingerprintsList...)
+		}
+
+		if len(fingerprints) > 40 {
+			break
+		}
+
+	}
+
+	if err := rows.Err(); err != nil {
+		return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
+	}
+
+	return fingerprints, nil
+}
diff --git a/pkg/query-service/app/dashboards/model.go b/pkg/query-service/app/dashboards/model.go
index 416feaac02..07228fce08 100644
--- a/pkg/query-service/app/dashboards/model.go
+++ b/pkg/query-service/app/dashboards/model.go
@@ -617,10 +617,10 @@ func GetDashboardsWithMetricNames(ctx context.Context, metricNames []string) (ma
 		for _, metricName := range metricNames {
 			if strings.TrimSpace(key) == metricName {
 				result[metricName] = append(result[metricName], map[string]string{
-					"dashboard_id":    dashboard.Uuid,
-					"widget_title":    widgetTitle,
-					"widget_id":       widgetID,
-					"dashboard_title": dashTitle,
+					"dashboard_id":   dashboard.Uuid,
+					"widget_name":    widgetTitle,
+					"widget_id":      widgetID,
+					"dashboard_name": dashTitle,
 				})
 			}
 		}
diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go
index ecf407ac5f..33e6f56ed2 100644
--- a/pkg/query-service/app/http_handler.go
+++ b/pkg/query-service/app/http_handler.go
@@ -640,6 +640,9 @@ func (ah *APIHandler) MetricExplorerRoutes(router *mux.Router, am *AuthMiddlewar
 	router.HandleFunc("/api/v1/metrics/related",
 		am.ViewAccess(ah.GetRelatedMetrics)).
 		Methods(http.MethodPost)
+	router.HandleFunc("/api/v1/metrics/inspect",
+		am.ViewAccess(ah.GetInspectMetricsData)).
+		Methods(http.MethodPost)
 }
 
 func Intersection(a, b []int) (c []int) {
diff --git a/pkg/query-service/app/metricsexplorer/parser.go b/pkg/query-service/app/metricsexplorer/parser.go
index 7ae83e8864..1967d98934 100644
--- a/pkg/query-service/app/metricsexplorer/parser.go
+++ b/pkg/query-service/app/metricsexplorer/parser.go
@@ -76,3 +76,14 @@ func ParseRelatedMetricsParams(r *http.Request) (*metrics_explorer.RelatedMetric
 	}
 	return &relatedMetricParams, nil
 }
+
+func ParseInspectMetricsParams(r *http.Request) (*metrics_explorer.InspectMetricsRequest, *model.ApiError) {
+	var inspectMetricParams metrics_explorer.InspectMetricsRequest
+	if err := json.NewDecoder(r.Body).Decode(&inspectMetricParams); err != nil {
+		return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
+	}
+	if inspectMetricParams.End-inspectMetricParams.Start > 1800000 { // half hour only
+		return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("time duration shouldn't be more than 30 mins")}
+	}
+	return &inspectMetricParams, nil
+}
diff --git a/pkg/query-service/app/metricsexplorer/summary.go b/pkg/query-service/app/metricsexplorer/summary.go
index bd692c0381..7b694e6c2f 100644
--- a/pkg/query-service/app/metricsexplorer/summary.go
+++ b/pkg/query-service/app/metricsexplorer/summary.go
@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"errors"
 	"sort"
+	"strings"
 	"time"
 
 	"go.uber.org/zap"
@@ -142,7 +143,7 @@ func (receiver *SummaryService) GetMetricsSummary(ctx context.Context, metricNam
 	})
 
 	g.Go(func() error {
-		attributes, err := receiver.reader.GetAttributesForMetricName(ctx, metricName)
+		attributes, err := receiver.reader.GetAttributesForMetricName(ctx, metricName, nil, nil)
 		if err != nil {
 			return err
 		}
@@ -166,13 +167,15 @@ func (receiver *SummaryService) GetMetricsSummary(ctx context.Context, metricNam
 				return &model.ApiError{Typ: "MarshallingErr", Err: err}
 			}
 
-			var dashboards []metrics_explorer.Dashboard
+			var dashboards map[string][]metrics_explorer.Dashboard
 			err = json.Unmarshal(jsonData, &dashboards)
 			if err != nil {
 				zap.L().Error("Error unmarshalling data:", zap.Error(err))
 				return &model.ApiError{Typ: "UnMarshallingErr", Err: err}
 			}
-			metricDetailsDTO.Dashboards = dashboards
+			if _, ok := dashboards[metricName]; ok {
+				metricDetailsDTO.Dashboards = dashboards[metricName]
+			}
 		}
 		return nil
 	})
@@ -447,3 +450,85 @@ func getQueryRangeForRelateMetricsList(metricName string, scores metrics_explore
 
 	return &query
 }
+
+func (receiver *SummaryService) GetInspectMetrics(ctx context.Context, params *metrics_explorer.InspectMetricsRequest) (*metrics_explorer.InspectMetricsResponse, *model.ApiError) {
+	// Capture the original context.
+	parentCtx := ctx
+
+	// Create an errgroup using the original context.
+	g, egCtx := errgroup.WithContext(ctx)
+
+	var attributes []metrics_explorer.Attribute
+	var resourceAttrs map[string]uint64
+
+	// Run the two queries concurrently using the derived context.
+	g.Go(func() error {
+		attrs, apiErr := receiver.reader.GetAttributesForMetricName(egCtx, params.MetricName, &params.Start, &params.End)
+		if apiErr != nil {
+			return apiErr
+		}
+		if attrs != nil {
+			attributes = *attrs
+		}
+		return nil
+	})
+
+	g.Go(func() error {
+		resAttrs, apiErr := receiver.reader.GetMetricsAllResourceAttributes(egCtx, params.Start, params.End)
+		if apiErr != nil {
+			return apiErr
+		}
+		if resAttrs != nil {
+			resourceAttrs = resAttrs
+		}
+		return nil
+	})
+
+	// Wait for the concurrent operations to complete.
+	if err := g.Wait(); err != nil {
+		return nil, &model.ApiError{Typ: "InternalError", Err: err}
+	}
+
+	// Use the parentCtx (or create a new context from it) for the rest of the calls.
+	if parentCtx.Err() != nil {
+		return nil, &model.ApiError{Typ: "ContextCanceled", Err: parentCtx.Err()}
+	}
+
+	// Build a set of attribute keys for O(1) lookup.
+	attributeKeys := make(map[string]struct{})
+	for _, attr := range attributes {
+		attributeKeys[attr.Key] = struct{}{}
+	}
+
+	// Filter resource attributes that are present in attributes.
+	var validAttrs []string
+	for attrName := range resourceAttrs {
+		normalizedAttrName := strings.ReplaceAll(attrName, ".", "_")
+		if _, ok := attributeKeys[normalizedAttrName]; ok {
+			validAttrs = append(validAttrs, normalizedAttrName)
+		}
+	}
+
+	// Get top 3 resource attributes (or use top attributes by valueCount if none match).
+	if len(validAttrs) > 3 {
+		validAttrs = validAttrs[:3]
+	} else if len(validAttrs) == 0 {
+		sort.Slice(attributes, func(i, j int) bool {
+			return attributes[i].ValueCount > attributes[j].ValueCount
+		})
+		for i := 0; i < len(attributes) && i < 3; i++ {
+			validAttrs = append(validAttrs, attributes[i].Key)
+		}
+	}
+	fingerprints, apiError := receiver.reader.GetInspectMetricsFingerprints(parentCtx, validAttrs, params)
+	if apiError != nil {
+		return nil, apiError
+	}
+
+	baseResponse, apiErr := receiver.reader.GetInspectMetrics(parentCtx, params, fingerprints)
+	if apiErr != nil {
+		return nil, apiErr
+	}
+
+	return baseResponse, nil
+}
diff --git a/pkg/query-service/app/summary.go b/pkg/query-service/app/summary.go
index 81049e0a0e..b30de738ea 100644
--- a/pkg/query-service/app/summary.go
+++ b/pkg/query-service/app/summary.go
@@ -122,3 +122,23 @@ func (aH *APIHandler) GetRelatedMetrics(w http.ResponseWriter, r *http.Request)
 
 	aH.Respond(w, result)
 }
+
+func (aH *APIHandler) GetInspectMetricsData(w http.ResponseWriter, r *http.Request) {
+	bodyBytes, _ := io.ReadAll(r.Body)
+	r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
+	ctx := r.Context()
+	params, apiError := explorer.ParseInspectMetricsParams(r)
+	if apiError != nil {
+		zap.L().Error("error parsing inspect metrics params", zap.Error(apiError.Err))
+		RespondError(w, apiError, nil)
+		return
+	}
+	result, apiError := aH.SummaryService.GetInspectMetrics(ctx, params)
+	if apiError != nil {
+		zap.L().Error("error getting inspect metrics data", zap.Error(apiError.Err))
+		RespondError(w, apiError, nil)
+		return
+	}
+	aH.Respond(w, result)
+}
diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go
index fbfe0b76db..947c299233 100644
--- a/pkg/query-service/constants/constants.go
+++ b/pkg/query-service/constants/constants.go
@@ -226,6 +226,8 @@
 	SIGNOZ_TIMESERIES_v4_TABLENAME             = "distributed_time_series_v4"
 	SIGNOZ_TIMESERIES_v4_1WEEK_TABLENAME       = "distributed_time_series_v4_1week"
 	SIGNOZ_TIMESERIES_v4_6HRS_TABLENAME        = "distributed_time_series_v4_6hrs"
+	SIGNOZ_ATTRIBUTES_METADATA_TABLENAME       = "distributed_attributes_metadata"
+	SIGNOZ_ATTRIBUTES_METADATA_LOCAL_TABLENAME = "attributes_metadata"
 )
 
 // alert related constants
diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go
index 9457a4cb5d..cc75b2223d 100644
--- a/pkg/query-service/interfaces/interface.go
+++ b/pkg/query-service/interfaces/interface.go
@@ -126,7 +126,7 @@ type Reader interface {
 	GetMetricsLastReceived(ctx context.Context, metricName string) (int64, *model.ApiError)
 	GetTotalTimeSeriesForMetricName(ctx context.Context, metricName string) (uint64, *model.ApiError)
 	GetActiveTimeSeriesForMetricName(ctx context.Context, metricName string, duration time.Duration) (uint64, *model.ApiError)
-	GetAttributesForMetricName(ctx context.Context, metricName string) (*[]metrics_explorer.Attribute, *model.ApiError)
+	GetAttributesForMetricName(ctx context.Context, metricName string, start, end *int64) (*[]metrics_explorer.Attribute, *model.ApiError)
 
 	ListSummaryMetrics(ctx context.Context, req *metrics_explorer.SummaryListMetricsRequest) (*metrics_explorer.SummaryListMetricsResponse, *model.ApiError)
 
@@ -135,6 +135,10 @@
 	GetNameSimilarity(ctx context.Context, req *metrics_explorer.RelatedMetricsRequest) (map[string]metrics_explorer.RelatedMetricsScore, *model.ApiError)
 	GetAttributeSimilarity(ctx context.Context, req *metrics_explorer.RelatedMetricsRequest) (map[string]metrics_explorer.RelatedMetricsScore, *model.ApiError)
+
+	GetMetricsAllResourceAttributes(ctx context.Context, start int64, end int64) (map[string]uint64, *model.ApiError)
+	GetInspectMetricsFingerprints(ctx context.Context, attributes []string, req *metrics_explorer.InspectMetricsRequest) ([]string, *model.ApiError)
+	GetInspectMetrics(ctx context.Context, req *metrics_explorer.InspectMetricsRequest, fingerprints []string) (*metrics_explorer.InspectMetricsResponse, *model.ApiError)
 }
 
 type Querier interface {
diff --git a/pkg/query-service/model/metrics_explorer/summary.go b/pkg/query-service/model/metrics_explorer/summary.go
index d9e493b0ab..bd297cc866 100644
--- a/pkg/query-service/model/metrics_explorer/summary.go
+++ b/pkg/query-service/model/metrics_explorer/summary.go
@@ -149,3 +149,14 @@ type RelatedMetrics struct {
 	Dashboards []Dashboard `json:"dashboards"`
 	Alerts     []Alert     `json:"alerts"`
 }
+
+type InspectMetricsRequest struct {
+	MetricName string       `json:"metricName"`
+	Filters    v3.FilterSet `json:"filters"`
+	Start      int64        `json:"start"`
+	End        int64        `json:"end"`
+}
+
+type InspectMetricsResponse struct {
+	Series *[]v3.Series `json:"series,omitempty"`
+}
diff --git a/pkg/query-service/utils/filter_conditions.go b/pkg/query-service/utils/filter_conditions.go
index 5b496320f4..d9b1390838 100644
--- a/pkg/query-service/utils/filter_conditions.go
+++ b/pkg/query-service/utils/filter_conditions.go
@@ -151,3 +151,10 @@ func WhichSampleTableToUse(start, end int64) (string, string) {
 		return constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME, "sum(count)"
 	}
 }
+
+func WhichAttributesTableToUse(start, end int64) (int64, int64, string, string) {
+	if end-start < sixHoursInMilliseconds {
+		start = start - (start % (time.Hour.Milliseconds() * 6))
+	}
+	return start, end, constants.SIGNOZ_ATTRIBUTES_METADATA_TABLENAME, constants.SIGNOZ_ATTRIBUTES_METADATA_LOCAL_TABLENAME
+}