From f3cb3b9840cdadf8a21b1ee8f2d68ecd31b5beed Mon Sep 17 00:00:00 2001 From: Shaheer Kochai Date: Wed, 11 Sep 2024 18:14:22 +0430 Subject: [PATCH 01/25] fix: loading and no-data states showing in loading state of alert edit/overview (#5887) --- frontend/src/container/FormAlertRules/ChartPreview/index.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frontend/src/container/FormAlertRules/ChartPreview/index.tsx b/frontend/src/container/FormAlertRules/ChartPreview/index.tsx index 03db8a362e..add24106e4 100644 --- a/frontend/src/container/FormAlertRules/ChartPreview/index.tsx +++ b/frontend/src/container/FormAlertRules/ChartPreview/index.tsx @@ -260,7 +260,7 @@ function ChartPreview({ )} - {chartData && !queryResponse.isError && ( + {chartData && !queryResponse.isError && !queryResponse.isLoading && ( Date: Wed, 11 Sep 2024 18:16:41 +0430 Subject: [PATCH 02/25] feat: alert history feedback changes (#5903) * fix: make the default offset 0 * chore: add beta tag to alert history * fix: don't add 5 minutes earlier to the timeline graph data --- .../src/container/AlertHistory/Timeline/Graph/Graph.tsx | 5 +---- frontend/src/pages/AlertDetails/hooks.tsx | 4 +++- frontend/src/periscope/components/BetaTag/BetaTag.tsx | 9 +++++++++ 3 files changed, 13 insertions(+), 5 deletions(-) create mode 100644 frontend/src/periscope/components/BetaTag/BetaTag.tsx diff --git a/frontend/src/container/AlertHistory/Timeline/Graph/Graph.tsx b/frontend/src/container/AlertHistory/Timeline/Graph/Graph.tsx index a0534691df..5adf1c481a 100644 --- a/frontend/src/container/AlertHistory/Timeline/Graph/Graph.tsx +++ b/frontend/src/container/AlertHistory/Timeline/Graph/Graph.tsx @@ -26,17 +26,14 @@ function HorizontalTimelineGraph({ return [[], []]; } - // add a first and last entry to make sure the graph displays all the data - const FIVE_MINUTES_IN_SECONDS = 300; + // add an entry for the end time of the last entry to make sure the graph displays all the data const timestamps = [ - data[0].start / 1000 - FIVE_MINUTES_IN_SECONDS, // 5 minutes before the first entry ...data.map((item) => item.start / 1000), data[data.length - 1].end / 1000, // end value of last entry ]; const states = [ - ALERT_STATUS[data[0].state], // Same state as the first entry ...data.map((item) => ALERT_STATUS[item.state]), ALERT_STATUS[data[data.length - 1].state], // Same state as the last entry ]; diff --git a/frontend/src/pages/AlertDetails/hooks.tsx b/frontend/src/pages/AlertDetails/hooks.tsx index fc6219b195..c6f7b64d64 100644 --- a/frontend/src/pages/AlertDetails/hooks.tsx +++ b/frontend/src/pages/AlertDetails/hooks.tsx @@ -26,6 +26,7 @@ import history from 'lib/history'; import { History, Table } from 'lucide-react'; import EditRules from 'pages/EditRules'; import { OrderPreferenceItems } from 'pages/Logs/config'; +import BetaTag from 'periscope/components/BetaTag/BetaTag'; import PaginationInfoText from 'periscope/components/PaginationInfoText/PaginationInfoText'; import { useAlertRule } from 'providers/Alert'; import { useCallback, useMemo } from 'react'; @@ -125,6 +126,7 @@ export const useRouteTabUtils = (): { routes: TabRoutes[] } => {
History
+ <BetaTag />
), route: getRouteUrl(AlertDetailsTab.HISTORY), @@ -256,7 +258,7 @@ export const useGetAlertRuleDetailsTimelineTable = (): GetAlertRuleDetailsTimeli const { updatedOrder, offset } = useMemo( () => ({ updatedOrder: params.get(urlKey.order) ?? OrderPreferenceItems.ASC, - offset: parseInt(params.get(urlKey.offset) ?? '1', 10), + offset: parseInt(params.get(urlKey.offset) ?? '0', 10), }), [params], ); diff --git a/frontend/src/periscope/components/BetaTag/BetaTag.tsx b/frontend/src/periscope/components/BetaTag/BetaTag.tsx new file mode 100644 index 0000000000..55a4e79d58 --- /dev/null +++ b/frontend/src/periscope/components/BetaTag/BetaTag.tsx @@ -0,0 +1,9 @@ +import { Tag } from 'antd'; + +export default function BetaTag(): JSX.Element { + return ( + + Beta + + ); +} From d6b75d76ca89f1d2fd68b3d0e737fafdabf65844 Mon Sep 17 00:00:00 2001 From: Shaheer Kochai Date: Wed, 11 Sep 2024 19:02:17 +0430 Subject: [PATCH 03/25] fix: add support for long texts in alert history page (#5895) --- .../AlertDetails/AlertHeader/AlertHeader.tsx | 5 +- .../KeyValueLabel/KeyValueLabel.tsx | 25 +++++++-- .../LineClampedText.styles.scss | 6 +++ .../LineClampedText/LineClampedText.tsx | 52 +++++++++++++++++++ .../components/TrimmedText/TrimmedText.tsx | 30 +++++++++++ 5 files changed, 114 insertions(+), 4 deletions(-) create mode 100644 frontend/src/periscope/components/LineClampedText/LineClampedText.styles.scss create mode 100644 frontend/src/periscope/components/LineClampedText/LineClampedText.tsx create mode 100644 frontend/src/periscope/components/TrimmedText/TrimmedText.tsx diff --git a/frontend/src/pages/AlertDetails/AlertHeader/AlertHeader.tsx b/frontend/src/pages/AlertDetails/AlertHeader/AlertHeader.tsx index f4ff7b933b..073b84382b 100644 --- a/frontend/src/pages/AlertDetails/AlertHeader/AlertHeader.tsx +++ b/frontend/src/pages/AlertDetails/AlertHeader/AlertHeader.tsx @@ -1,5 +1,6 @@ import './AlertHeader.styles.scss'; +import LineClampedText from 'periscope/components/LineClampedText/LineClampedText'; import { useAlertRule } from 'providers/Alert'; import { useEffect, useMemo } from 'react'; @@ -42,7 +43,9 @@ function AlertHeader({ alertDetails }: AlertHeaderProps): JSX.Element {
-
{alert}
+
+ +
diff --git a/frontend/src/periscope/components/KeyValueLabel/KeyValueLabel.tsx b/frontend/src/periscope/components/KeyValueLabel/KeyValueLabel.tsx index aa14dd6380..377c647a3f 100644 --- a/frontend/src/periscope/components/KeyValueLabel/KeyValueLabel.tsx +++ b/frontend/src/periscope/components/KeyValueLabel/KeyValueLabel.tsx @@ -1,18 +1,37 @@ import './KeyValueLabel.styles.scss'; -type KeyValueLabelProps = { badgeKey: string; badgeValue: string }; +import { Tooltip } from 'antd'; + +import TrimmedText from '../TrimmedText/TrimmedText'; + +type KeyValueLabelProps = { + badgeKey: string; + badgeValue: string; + maxCharacters?: number; +}; export default function KeyValueLabel({ badgeKey, badgeValue, + maxCharacters = 20, }: KeyValueLabelProps): JSX.Element | null { if (!badgeKey || !badgeValue) { return null; } return (
-
{badgeKey}
-
{badgeValue}
+
+ +
+ +
+ +
+
); } + +KeyValueLabel.defaultProps = { + maxCharacters: 20, +}; diff --git a/frontend/src/periscope/components/LineClampedText/LineClampedText.styles.scss b/frontend/src/periscope/components/LineClampedText/LineClampedText.styles.scss new file mode 100644 index 0000000000..644ddc480b --- /dev/null +++ b/frontend/src/periscope/components/LineClampedText/LineClampedText.styles.scss @@ -0,0 +1,6 @@ +.line-clamped-text { + display: -webkit-box; + -webkit-box-orient: vertical; + overflow: hidden; + text-overflow: ellipsis; +} diff --git a/frontend/src/periscope/components/LineClampedText/LineClampedText.tsx b/frontend/src/periscope/components/LineClampedText/LineClampedText.tsx new file mode 100644 index 0000000000..6b90704b0c --- /dev/null +++ b/frontend/src/periscope/components/LineClampedText/LineClampedText.tsx @@ -0,0 +1,52 @@ +import './LineClampedText.styles.scss'; + +import { Tooltip } from 'antd'; +import { useEffect, useRef, useState } from 'react'; + +function LineClampedText({ + text, + lines, +}: { + text: string; + lines?: number; +}): JSX.Element { + const [isOverflowing, setIsOverflowing] = useState(false); + const textRef = useRef(null); + + useEffect(() => { + const checkOverflow = (): void => { + if (textRef.current) { + setIsOverflowing( + textRef.current.scrollHeight > textRef.current.clientHeight, + ); + } + }; + + checkOverflow(); + window.addEventListener('resize', checkOverflow); + + return (): void => { + window.removeEventListener('resize', checkOverflow); + }; + }, [text, lines]); + + const content = ( +
+ {text} +
+ ); + + return isOverflowing ? {content} : content; +} + +LineClampedText.defaultProps = { + lines: 1, +}; + +export default LineClampedText; diff --git a/frontend/src/periscope/components/TrimmedText/TrimmedText.tsx b/frontend/src/periscope/components/TrimmedText/TrimmedText.tsx new file mode 100644 index 0000000000..10919f2e3f --- /dev/null +++ b/frontend/src/periscope/components/TrimmedText/TrimmedText.tsx @@ -0,0 +1,30 @@ +import { Tooltip } from 'antd'; +import { useEffect, useState } from 'react'; + +function TrimmedText({ + text, + maxCharacters, +}: { + text: string; + maxCharacters: number; +}): JSX.Element { + const [displayText, setDisplayText] = useState(text); + + useEffect(() => { + if (text.length > maxCharacters) { + setDisplayText(`${text.slice(0, maxCharacters)}...`); + } else { + setDisplayText(text); + } + }, [text, maxCharacters]); + + return text.length > maxCharacters ? ( + + {displayText} + + ) : ( + {displayText} + ); +} + +export default TrimmedText; From 20ac75e3d23c8edba06e4c3518641d31b6492d3b Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Thu, 12 Sep 2024 00:57:48 +0530 Subject: [PATCH 04/25] chore: json logs for collector (#5240) --- deploy/docker-swarm/clickhouse-setup/otel-collector-config.yaml | 2 ++ deploy/docker/clickhouse-setup/otel-collector-config.yaml | 2 ++ 2 files changed, 4 insertions(+) diff --git a/deploy/docker-swarm/clickhouse-setup/otel-collector-config.yaml b/deploy/docker-swarm/clickhouse-setup/otel-collector-config.yaml index 091d4f504b..afa8291358 100644 --- a/deploy/docker-swarm/clickhouse-setup/otel-collector-config.yaml +++ b/deploy/docker-swarm/clickhouse-setup/otel-collector-config.yaml @@ -154,6 +154,8 @@ extensions: service: telemetry: + logs: + encoding: json metrics: address: 0.0.0.0:8888 extensions: [health_check, zpages, pprof] diff --git a/deploy/docker/clickhouse-setup/otel-collector-config.yaml b/deploy/docker/clickhouse-setup/otel-collector-config.yaml index e6d4c2418e..6f30d42ad1 100644 --- a/deploy/docker/clickhouse-setup/otel-collector-config.yaml +++ b/deploy/docker/clickhouse-setup/otel-collector-config.yaml @@ -158,6 +158,8 @@ exporters: service: telemetry: + logs: + encoding: json metrics: address: 0.0.0.0:8888 extensions: From 6e7f04b4922f8112f3d4a7ced1ad44683a5cc4da Mon Sep 17 00:00:00 2001 From: Nityananda Gohain Date: Thu, 12 Sep 2024 09:48:09 +0530 Subject: [PATCH 05/25] logs v4 qb refactor (#5908) * feat: logsV4 initial refactoring * feat: filter_query builder with tests added * feat: all functions of v4 refactored * fix: tests fixed * fix: update select for table panel * fix: tests updated with better examples of limit and group by * fix: resource filter support in live tail --------- Co-authored-by: Srikanth Chekuri --- pkg/query-service/app/logs/v3/json_filter.go | 24 +- .../app/logs/v3/json_filter_test.go | 2 +- .../app/logs/v3/query_builder.go | 56 +- pkg/query-service/app/logs/v4/json_filter.go | 105 ++ .../app/logs/v4/json_filter_test.go | 200 +++ .../app/logs/v4/query_builder.go | 507 ++++++++ .../app/logs/v4/query_builder_test.go | 1099 +++++++++++++++++ .../app/logs/v4/resource_query_builder.go | 13 +- .../logs/v4/resource_query_builder_test.go | 2 +- pkg/query-service/constants/constants.go | 12 + pkg/query-service/model/v3/v3.go | 6 + pkg/query-service/utils/format.go | 22 + 12 files changed, 2002 insertions(+), 46 deletions(-) create mode 100644 pkg/query-service/app/logs/v4/json_filter.go create mode 100644 pkg/query-service/app/logs/v4/json_filter_test.go create mode 100644 
pkg/query-service/app/logs/v4/query_builder_test.go diff --git a/pkg/query-service/app/logs/v3/json_filter.go b/pkg/query-service/app/logs/v3/json_filter.go index 887baaab4c..d883b61797 100644 --- a/pkg/query-service/app/logs/v3/json_filter.go +++ b/pkg/query-service/app/logs/v3/json_filter.go @@ -20,7 +20,7 @@ const ( NGRAM_SIZE = 4 ) -var dataTypeMapping = map[string]string{ +var DataTypeMapping = map[string]string{ "string": STRING, "int64": INT64, "float64": FLOAT64, @@ -31,7 +31,7 @@ var dataTypeMapping = map[string]string{ "array(bool)": ARRAY_BOOL, } -var arrayValueTypeMapping = map[string]string{ +var ArrayValueTypeMapping = map[string]string{ "array(string)": "string", "array(int64)": "int64", "array(float64)": "float64", @@ -59,7 +59,7 @@ var jsonLogOperators = map[v3.FilterOperator]string{ v3.FilterOperatorNotHas: "NOT has(%s, %s)", } -func getPath(keyArr []string) string { +func GetPath(keyArr []string) string { path := []string{} for i := 0; i < len(keyArr); i++ { if strings.HasSuffix(keyArr[i], "[*]") { @@ -71,7 +71,7 @@ func getPath(keyArr []string) string { return strings.Join(path, ".") } -func getJSONFilterKey(key v3.AttributeKey, op v3.FilterOperator, isArray bool) (string, error) { +func GetJSONFilterKey(key v3.AttributeKey, op v3.FilterOperator, isArray bool) (string, error) { keyArr := strings.Split(key.Key, ".") // i.e it should be at least body.name, and not something like body if len(keyArr) < 2 { @@ -89,11 +89,11 @@ func getJSONFilterKey(key v3.AttributeKey, op v3.FilterOperator, isArray bool) ( var dataType string var ok bool - if dataType, ok = dataTypeMapping[string(key.DataType)]; !ok { + if dataType, ok = DataTypeMapping[string(key.DataType)]; !ok { return "", fmt.Errorf("unsupported dataType for JSON: %s", key.DataType) } - path := getPath(keyArr[1:]) + path := GetPath(keyArr[1:]) if isArray { return fmt.Sprintf("JSONExtract(JSON_QUERY(%s, '$.%s'), '%s')", keyArr[0], path, dataType), nil @@ -109,7 +109,7 @@ func getJSONFilterKey(key v3.AttributeKey, op v3.FilterOperator, isArray bool) ( } // takes the path and the values and generates where clauses for better usage of index -func getPathIndexFilter(path string) string { +func GetPathIndexFilter(path string) string { filters := []string{} keyArr := strings.Split(path, ".") if len(keyArr) < 2 { @@ -136,7 +136,7 @@ func GetJSONFilter(item v3.FilterItem) (string, error) { dataType := item.Key.DataType isArray := false // check if its an array and handle it - if val, ok := arrayValueTypeMapping[string(item.Key.DataType)]; ok { + if val, ok := ArrayValueTypeMapping[string(item.Key.DataType)]; ok { if item.Operator != v3.FilterOperatorHas && item.Operator != v3.FilterOperatorNotHas { return "", fmt.Errorf("only has operator is supported for array") } @@ -144,7 +144,7 @@ func GetJSONFilter(item v3.FilterItem) (string, error) { dataType = v3.AttributeKeyDataType(val) } - key, err := getJSONFilterKey(item.Key, item.Operator, isArray) + key, err := GetJSONFilterKey(item.Key, item.Operator, isArray) if err != nil { return "", err } @@ -164,7 +164,7 @@ func GetJSONFilter(item v3.FilterItem) (string, error) { if logsOp, ok := jsonLogOperators[op]; ok { switch op { case v3.FilterOperatorExists, v3.FilterOperatorNotExists: - filter = fmt.Sprintf(logsOp, key, getPath(strings.Split(item.Key.Key, ".")[1:])) + filter = fmt.Sprintf(logsOp, key, GetPath(strings.Split(item.Key.Key, ".")[1:])) case v3.FilterOperatorRegex, v3.FilterOperatorNotRegex, v3.FilterOperatorHas, v3.FilterOperatorNotHas: fmtVal := 
utils.ClickHouseFormattedValue(value) filter = fmt.Sprintf(logsOp, key, fmtVal) @@ -181,7 +181,7 @@ func GetJSONFilter(item v3.FilterItem) (string, error) { filters := []string{} - pathFilter := getPathIndexFilter(item.Key.Key) + pathFilter := GetPathIndexFilter(item.Key.Key) if pathFilter != "" { filters = append(filters, pathFilter) } @@ -196,7 +196,7 @@ func GetJSONFilter(item v3.FilterItem) (string, error) { // add exists check for non array items as default values of int/float/bool will corrupt the results if !isArray && !(item.Operator == v3.FilterOperatorExists || item.Operator == v3.FilterOperatorNotExists) { - existsFilter := fmt.Sprintf("JSON_EXISTS(body, '$.%s')", getPath(strings.Split(item.Key.Key, ".")[1:])) + existsFilter := fmt.Sprintf("JSON_EXISTS(body, '$.%s')", GetPath(strings.Split(item.Key.Key, ".")[1:])) filter = fmt.Sprintf("%s AND %s", existsFilter, filter) } diff --git a/pkg/query-service/app/logs/v3/json_filter_test.go b/pkg/query-service/app/logs/v3/json_filter_test.go index 0a71cd67b2..060ba63707 100644 --- a/pkg/query-service/app/logs/v3/json_filter_test.go +++ b/pkg/query-service/app/logs/v3/json_filter_test.go @@ -140,7 +140,7 @@ var testGetJSONFilterKeyData = []struct { func TestGetJSONFilterKey(t *testing.T) { for _, tt := range testGetJSONFilterKeyData { Convey("testgetKey", t, func() { - columnName, err := getJSONFilterKey(tt.Key, tt.Operator, tt.IsArray) + columnName, err := GetJSONFilterKey(tt.Key, tt.Operator, tt.IsArray) if tt.Error { So(err, ShouldNotBeNil) } else { diff --git a/pkg/query-service/app/logs/v3/query_builder.go b/pkg/query-service/app/logs/v3/query_builder.go index 2aa56002ff..bd64b4d0e6 100644 --- a/pkg/query-service/app/logs/v3/query_builder.go +++ b/pkg/query-service/app/logs/v3/query_builder.go @@ -9,7 +9,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/utils" ) -var aggregateOperatorToPercentile = map[v3.AggregateOperator]float64{ +var AggregateOperatorToPercentile = map[v3.AggregateOperator]float64{ v3.AggregateOperatorP05: 0.05, v3.AggregateOperatorP10: 0.10, v3.AggregateOperatorP20: 0.20, @@ -21,7 +21,7 @@ var aggregateOperatorToPercentile = map[v3.AggregateOperator]float64{ v3.AggregateOperatorP99: 0.99, } -var aggregateOperatorToSQLFunc = map[v3.AggregateOperator]string{ +var AggregateOperatorToSQLFunc = map[v3.AggregateOperator]string{ v3.AggregateOperatorAvg: "avg", v3.AggregateOperatorMax: "max", v3.AggregateOperatorMin: "min", @@ -53,7 +53,7 @@ var logOperators = map[v3.FilterOperator]string{ const BODY = "body" -func getClickhouseLogsColumnType(columnType v3.AttributeKeyType) string { +func GetClickhouseLogsColumnType(columnType v3.AttributeKeyType) string { if columnType == v3.AttributeKeyTypeTag { return "attributes" } @@ -83,7 +83,7 @@ func getClickhouseColumnName(key v3.AttributeKey) string { //if the key is present in the topLevelColumn then it will be only searched in those columns, //regardless if it is indexed/present again in resource or column attribute if !key.IsColumn { - columnType := getClickhouseLogsColumnType(key.Type) + columnType := GetClickhouseLogsColumnType(key.Type) columnDataType := getClickhouseLogsColumnDataType(key.DataType) clickhouseColumn = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", columnType, columnDataType, columnType, columnDataType, key.Key) return clickhouseColumn @@ -114,7 +114,7 @@ func getSelectLabels(aggregatorOperator v3.AggregateOperator, groupBy []v3.Attri return selectLabels } -func getSelectKeys(aggregatorOperator v3.AggregateOperator, groupBy []v3.AttributeKey) 
string { +func GetSelectKeys(aggregatorOperator v3.AggregateOperator, groupBy []v3.AttributeKey) string { var selectLabels []string if aggregatorOperator == v3.AggregateOperatorNoOp { return "" @@ -154,7 +154,7 @@ func GetExistsNexistsFilter(op v3.FilterOperator, item v3.FilterItem) string { } return fmt.Sprintf("%s_exists`=%v", strings.TrimSuffix(getClickhouseColumnName(item.Key), "`"), val) } - columnType := getClickhouseLogsColumnType(item.Key.Type) + columnType := GetClickhouseLogsColumnType(item.Key.Type) columnDataType := getClickhouseLogsColumnDataType(item.Key.DataType) return fmt.Sprintf(logOperators[op], columnType, columnDataType, item.Key.Key) } @@ -224,7 +224,7 @@ func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey, // add group by conditions to filter out log lines which doesn't have the key for _, attr := range groupBy { if !attr.IsColumn { - columnType := getClickhouseLogsColumnType(attr.Type) + columnType := GetClickhouseLogsColumnType(attr.Type) columnDataType := getClickhouseLogsColumnDataType(attr.DataType) conditions = append(conditions, fmt.Sprintf("has(%s_%s_key, '%s')", columnType, columnDataType, attr.Key)) } else if attr.Type != v3.AttributeKeyTypeUnspecified { @@ -258,7 +258,7 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build selectLabels := getSelectLabels(mq.AggregateOperator, mq.GroupBy) - having := having(mq.Having) + having := Having(mq.Having) if having != "" { having = " having " + having } @@ -288,10 +288,10 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build // we dont need value for first query // going with this route as for a cleaner approach on implementation if graphLimitQtype == constants.FirstQueryGraphLimit { - queryTmpl = "SELECT " + getSelectKeys(mq.AggregateOperator, mq.GroupBy) + " from (" + queryTmpl + ")" + queryTmpl = "SELECT " + GetSelectKeys(mq.AggregateOperator, mq.GroupBy) + " from (" + queryTmpl + ")" } - groupBy := groupByAttributeKeyTags(panelType, graphLimitQtype, mq.GroupBy...) + groupBy := GroupByAttributeKeyTags(panelType, graphLimitQtype, mq.GroupBy...) 
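+ // e.g. a graph panel grouped by service.name typically yields `service.name`,ts here,
+ // which the condition below turns into: group by `service.name`,ts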
if panelType != v3.PanelTypeList && groupBy != "" { groupBy = " group by " + groupBy } @@ -301,7 +301,7 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build } if graphLimitQtype == constants.SecondQueryGraphLimit { - filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", getSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "#LIMIT_PLACEHOLDER)" + filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", GetSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "#LIMIT_PLACEHOLDER)" } aggregationKey := "" @@ -329,7 +329,7 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build rate = rate / 60.0 } - op := fmt.Sprintf("%s(%s)/%f", aggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey, rate) + op := fmt.Sprintf("%s(%s)/%f", AggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey, rate) query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy) return query, nil case @@ -342,11 +342,11 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build v3.AggregateOperatorP90, v3.AggregateOperatorP95, v3.AggregateOperatorP99: - op := fmt.Sprintf("quantile(%v)(%s)", aggregateOperatorToPercentile[mq.AggregateOperator], aggregationKey) + op := fmt.Sprintf("quantile(%v)(%s)", AggregateOperatorToPercentile[mq.AggregateOperator], aggregationKey) query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy) return query, nil case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax: - op := fmt.Sprintf("%s(%s)", aggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey) + op := fmt.Sprintf("%s(%s)", AggregateOperatorToSQLFunc[mq.AggregateOperator], aggregationKey) query := fmt.Sprintf(queryTmpl, op, filterSubQuery, groupBy, having, orderBy) return query, nil case v3.AggregateOperatorCount: @@ -394,7 +394,7 @@ func groupBy(panelType v3.PanelType, graphLimitQtype string, tags ...string) str return strings.Join(tags, ",") } -func groupByAttributeKeyTags(panelType v3.PanelType, graphLimitQtype string, tags ...v3.AttributeKey) string { +func GroupByAttributeKeyTags(panelType v3.PanelType, graphLimitQtype string, tags ...v3.AttributeKey) string { groupTags := []string{} for _, tag := range tags { groupTags = append(groupTags, "`"+tag.Key+"`") @@ -446,7 +446,7 @@ func orderByAttributeKeyTags(panelType v3.PanelType, items []v3.OrderBy, tags [] return str } -func having(items []v3.Having) string { +func Having(items []v3.Having) string { // aggregate something and filter on that aggregate var having []string for _, item := range items { @@ -455,7 +455,7 @@ func having(items []v3.Having) string { return strings.Join(having, " AND ") } -func reduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v3.AggregateOperator) (string, error) { +func ReduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v3.AggregateOperator) (string, error) { // the timestamp picked is not relevant here since the final value used is show the single // chart with just the query value. 
switch reduceTo { @@ -475,14 +475,14 @@ func reduceQuery(query string, reduceTo v3.ReduceToOperator, aggregateOperator v return query, nil } -func addLimitToQuery(query string, limit uint64) string { +func AddLimitToQuery(query string, limit uint64) string { if limit == 0 { return query } return fmt.Sprintf("%s LIMIT %d", query, limit) } -func addOffsetToQuery(query string, offset uint64) string { +func AddOffsetToQuery(query string, offset uint64) string { return fmt.Sprintf("%s OFFSET %d", query, offset) } @@ -492,7 +492,7 @@ type Options struct { PreferRPM bool } -func isOrderByTs(orderBy []v3.OrderBy) bool { +func IsOrderByTs(orderBy []v3.OrderBy) bool { if len(orderBy) == 1 && (orderBy[0].Key == constants.TIMESTAMP || orderBy[0].ColumnName == constants.TIMESTAMP) { return true } @@ -523,7 +523,7 @@ func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.Pan if err != nil { return "", err } - query = addLimitToQuery(query, mq.Limit) + query = AddLimitToQuery(query, mq.Limit) return query, nil } else if options.GraphLimitQtype == constants.SecondQueryGraphLimit { @@ -539,7 +539,7 @@ func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.Pan return "", err } if panelType == v3.PanelTypeValue { - query, err = reduceQuery(query, mq.ReduceTo, mq.AggregateOperator) + query, err = ReduceQuery(query, mq.ReduceTo, mq.AggregateOperator) } if panelType == v3.PanelTypeList { @@ -550,21 +550,21 @@ func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.Pan if mq.PageSize > 0 { if mq.Limit > 0 && mq.Offset+mq.PageSize > mq.Limit { - query = addLimitToQuery(query, mq.Limit-mq.Offset) + query = AddLimitToQuery(query, mq.Limit-mq.Offset) } else { - query = addLimitToQuery(query, mq.PageSize) + query = AddLimitToQuery(query, mq.PageSize) } // add offset to the query only if it is not orderd by timestamp. 
- if !isOrderByTs(mq.OrderBy) { - query = addOffsetToQuery(query, mq.Offset) + if !IsOrderByTs(mq.OrderBy) { + query = AddOffsetToQuery(query, mq.Offset) } } else { - query = addLimitToQuery(query, mq.Limit) + query = AddLimitToQuery(query, mq.Limit) } } else if panelType == v3.PanelTypeTable { - query = addLimitToQuery(query, mq.Limit) + query = AddLimitToQuery(query, mq.Limit) } return query, err diff --git a/pkg/query-service/app/logs/v4/json_filter.go b/pkg/query-service/app/logs/v4/json_filter.go new file mode 100644 index 0000000000..cde88e748a --- /dev/null +++ b/pkg/query-service/app/logs/v4/json_filter.go @@ -0,0 +1,105 @@ +package v4 + +import ( + "fmt" + "strings" + + logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/utils" +) + +var jsonLogOperators = map[v3.FilterOperator]string{ + v3.FilterOperatorEqual: "=", + v3.FilterOperatorNotEqual: "!=", + v3.FilterOperatorLessThan: "<", + v3.FilterOperatorLessThanOrEq: "<=", + v3.FilterOperatorGreaterThan: ">", + v3.FilterOperatorGreaterThanOrEq: ">=", + v3.FilterOperatorLike: "LIKE", + v3.FilterOperatorNotLike: "NOT LIKE", + v3.FilterOperatorContains: "LIKE", + v3.FilterOperatorNotContains: "NOT LIKE", + v3.FilterOperatorRegex: "match(%s, %s)", + v3.FilterOperatorNotRegex: "NOT match(%s, %s)", + v3.FilterOperatorIn: "IN", + v3.FilterOperatorNotIn: "NOT IN", + v3.FilterOperatorExists: "JSON_EXISTS(%s, '$.%s')", + v3.FilterOperatorNotExists: "NOT JSON_EXISTS(%s, '$.%s')", + v3.FilterOperatorHas: "has(%s, %s)", + v3.FilterOperatorNotHas: "NOT has(%s, %s)", +} + +func GetJSONFilter(item v3.FilterItem) (string, error) { + + dataType := item.Key.DataType + isArray := false + // check if its an array and handle it + if val, ok := logsV3.ArrayValueTypeMapping[string(item.Key.DataType)]; ok { + if item.Operator != v3.FilterOperatorHas && item.Operator != v3.FilterOperatorNotHas { + return "", fmt.Errorf("only has operator is supported for array") + } + isArray = true + dataType = v3.AttributeKeyDataType(val) + } + + key, err := logsV3.GetJSONFilterKey(item.Key, item.Operator, isArray) + if err != nil { + return "", err + } + + // non array + op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator)))) + + var value interface{} + if op != v3.FilterOperatorExists && op != v3.FilterOperatorNotExists { + value, err = utils.ValidateAndCastValue(item.Value, dataType) + if err != nil { + return "", fmt.Errorf("failed to validate and cast value for %s: %v", item.Key.Key, err) + } + } + + var filter string + if logsOp, ok := jsonLogOperators[op]; ok { + switch op { + case v3.FilterOperatorExists, v3.FilterOperatorNotExists: + filter = fmt.Sprintf(logsOp, key, logsV3.GetPath(strings.Split(item.Key.Key, ".")[1:])) + case v3.FilterOperatorRegex, v3.FilterOperatorNotRegex, v3.FilterOperatorHas, v3.FilterOperatorNotHas: + fmtVal := utils.ClickHouseFormattedValue(value) + filter = fmt.Sprintf(logsOp, key, fmtVal) + case v3.FilterOperatorContains, v3.FilterOperatorNotContains: + val := utils.QuoteEscapedString(fmt.Sprintf("%v", item.Value)) + filter = fmt.Sprintf("%s %s '%%%s%%'", key, logsOp, val) + default: + fmtVal := utils.ClickHouseFormattedValue(value) + filter = fmt.Sprintf("%s %s %s", key, logsOp, fmtVal) + } + } else { + return "", fmt.Errorf("unsupported operator: %s", op) + } + + filters := []string{} + + pathFilter := logsV3.GetPathIndexFilter(item.Key.Key) + if pathFilter != "" { + filters = append(filters, pathFilter) + } 
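+
+ // for filter values at least NGRAM_SIZE (4) characters long, the block below
+ // also adds a case-insensitive LIKE on the raw body, e.g.
+ //   lower(body) like lower('%index_service%')
+ // so ClickHouse can prune data via the index on body before evaluating the
+ // heavier JSON functions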
+ if op == v3.FilterOperatorContains || + op == v3.FilterOperatorEqual || + op == v3.FilterOperatorHas { + val, ok := item.Value.(string) + if ok && len(val) >= logsV3.NGRAM_SIZE { + filters = append(filters, fmt.Sprintf("lower(body) like lower('%%%s%%')", utils.QuoteEscapedString(strings.ToLower(val)))) + } + } + + // add exists check for non array items as default values of int/float/bool will corrupt the results + if !isArray && !(item.Operator == v3.FilterOperatorExists || item.Operator == v3.FilterOperatorNotExists) { + existsFilter := fmt.Sprintf("JSON_EXISTS(body, '$.%s')", logsV3.GetPath(strings.Split(item.Key.Key, ".")[1:])) + filter = fmt.Sprintf("%s AND %s", existsFilter, filter) + } + + filters = append(filters, filter) + + return strings.Join(filters, " AND "), nil +} diff --git a/pkg/query-service/app/logs/v4/json_filter_test.go b/pkg/query-service/app/logs/v4/json_filter_test.go new file mode 100644 index 0000000000..c8b2e44847 --- /dev/null +++ b/pkg/query-service/app/logs/v4/json_filter_test.go @@ -0,0 +1,200 @@ +package v4 + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" + logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +var testGetJSONFilterData = []struct { + Name string + FilterItem v3.FilterItem + Filter string + Error bool +}{ + { + Name: "Array membership string", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.requestor_list[*]", + DataType: "array(string)", + IsJSON: true, + }, + Operator: "has", + Value: "index_service", + }, + Filter: "lower(body) like lower('%requestor_list%') AND lower(body) like lower('%index_service%') AND has(JSONExtract(JSON_QUERY(body, '$.\"requestor_list\"[*]'), 'Array(String)'), 'index_service')", + }, + { + Name: "Array membership int64", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.int_numbers[*]", + DataType: "array(int64)", + IsJSON: true, + }, + Operator: "has", + Value: 2, + }, + Filter: "lower(body) like lower('%int_numbers%') AND has(JSONExtract(JSON_QUERY(body, '$.\"int_numbers\"[*]'), '" + logsV3.ARRAY_INT64 + "'), 2)", + }, + { + Name: "Array membership float64", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.nested_num[*].float_nums[*]", + DataType: "array(float64)", + IsJSON: true, + }, + Operator: "nhas", + Value: 2.2, + }, + Filter: "lower(body) like lower('%nested_num%float_nums%') AND NOT has(JSONExtract(JSON_QUERY(body, '$.\"nested_num\"[*].\"float_nums\"[*]'), '" + logsV3.ARRAY_FLOAT64 + "'), 2.200000)", + }, + { + Name: "Array membership bool", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.bool[*]", + DataType: "array(bool)", + IsJSON: true, + }, + Operator: "has", + Value: true, + }, + Filter: "lower(body) like lower('%bool%') AND has(JSONExtract(JSON_QUERY(body, '$.\"bool\"[*]'), '" + logsV3.ARRAY_BOOL + "'), true)", + }, + { + Name: "eq operator", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.message", + DataType: "string", + IsJSON: true, + }, + Operator: "=", + Value: "hello", + }, + Filter: "lower(body) like lower('%message%') AND lower(body) like lower('%hello%') AND JSON_EXISTS(body, '$.\"message\"') AND JSON_VALUE(body, '$.\"message\"') = 'hello'", + }, + { + Name: "eq operator number", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.status", + DataType: "int64", + IsJSON: true, + }, + Operator: "=", + Value: 1, + }, + Filter: "lower(body) like lower('%status%') AND JSON_EXISTS(body, '$.\"status\"') 
AND JSONExtract(JSON_VALUE(body, '$.\"status\"'), '" + logsV3.INT64 + "') = 1", + }, + { + Name: "neq operator number", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.status", + DataType: "float64", + IsJSON: true, + }, + Operator: "=", + Value: 1.1, + }, + Filter: "lower(body) like lower('%status%') AND JSON_EXISTS(body, '$.\"status\"') AND JSONExtract(JSON_VALUE(body, '$.\"status\"'), '" + logsV3.FLOAT64 + "') = 1.100000", + }, + { + Name: "eq operator bool", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.boolkey", + DataType: "bool", + IsJSON: true, + }, + Operator: "=", + Value: true, + }, + Filter: "lower(body) like lower('%boolkey%') AND JSON_EXISTS(body, '$.\"boolkey\"') AND JSONExtract(JSON_VALUE(body, '$.\"boolkey\"'), '" + logsV3.BOOL + "') = true", + }, + { + Name: "greater than operator", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.status", + DataType: "int64", + IsJSON: true, + }, + Operator: ">", + Value: 1, + }, + Filter: "lower(body) like lower('%status%') AND JSON_EXISTS(body, '$.\"status\"') AND JSONExtract(JSON_VALUE(body, '$.\"status\"'), '" + logsV3.INT64 + "') > 1", + }, + { + Name: "regex operator", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.message", + DataType: "string", + IsJSON: true, + }, + Operator: "regex", + Value: "a*", + }, + Filter: "lower(body) like lower('%message%') AND JSON_EXISTS(body, '$.\"message\"') AND match(JSON_VALUE(body, '$.\"message\"'), 'a*')", + }, + { + Name: "contains operator", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.message", + DataType: "string", + IsJSON: true, + }, + Operator: "contains", + Value: "a", + }, + Filter: "lower(body) like lower('%message%') AND JSON_EXISTS(body, '$.\"message\"') AND JSON_VALUE(body, '$.\"message\"') LIKE '%a%'", + }, + { + Name: "contains operator with quotes", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.message", + DataType: "string", + IsJSON: true, + }, + Operator: "contains", + Value: "hello 'world'", + }, + Filter: "lower(body) like lower('%message%') AND lower(body) like lower('%hello \\'world\\'%') AND JSON_EXISTS(body, '$.\"message\"') AND JSON_VALUE(body, '$.\"message\"') LIKE '%hello \\'world\\'%'", + }, + { + Name: "exists", + FilterItem: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body.message", + DataType: "string", + IsJSON: true, + }, + Operator: "exists", + Value: "", + }, + Filter: "lower(body) like lower('%message%') AND JSON_EXISTS(body, '$.\"message\"')", + }, +} + +func TestGetJSONFilter(t *testing.T) { + for _, tt := range testGetJSONFilterData { + Convey("testGetJSONFilter", t, func() { + filter, err := GetJSONFilter(tt.FilterItem) + if tt.Error { + So(err, ShouldNotBeNil) + } else { + So(err, ShouldBeNil) + So(filter, ShouldEqual, tt.Filter) + } + }) + } +} diff --git a/pkg/query-service/app/logs/v4/query_builder.go b/pkg/query-service/app/logs/v4/query_builder.go index 08024756bd..b96c5b9113 100644 --- a/pkg/query-service/app/logs/v4/query_builder.go +++ b/pkg/query-service/app/logs/v4/query_builder.go @@ -1,7 +1,13 @@ package v4 import ( + "fmt" + "strings" + + logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" + "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/utils" ) var logOperators = map[v3.FilterOperator]string{ @@ -29,3 +35,504 @@ const ( DISTRIBUTED_LOGS_V2_RESOURCE = "distributed_logs_v2_resource" NANOSECOND = 1000000000 ) + +func 
getClickhouseLogsColumnDataType(columnDataType v3.AttributeKeyDataType) string {
+ if columnDataType == v3.AttributeKeyDataTypeFloat64 || columnDataType == v3.AttributeKeyDataTypeInt64 {
+ return "number"
+ }
+ if columnDataType == v3.AttributeKeyDataTypeBool {
+ return "bool"
+ }
+ return "string"
+}
+
+func getClickhouseKey(key v3.AttributeKey) string {
+ // check if it is a top level static field
+ if _, ok := constants.StaticFieldsLogsV3[key.Key]; ok && key.Type == v3.AttributeKeyTypeUnspecified {
+ return key.Key
+ }
+
+ // if the key is present in the topLevelColumn then it will only be searched in those columns,
+ // regardless of whether it is indexed/present again in resource or column attributes
+ if !key.IsColumn {
+ columnType := logsV3.GetClickhouseLogsColumnType(key.Type)
+ columnDataType := getClickhouseLogsColumnDataType(key.DataType)
+ return fmt.Sprintf("%s_%s['%s']", columnType, columnDataType, key.Key)
+ }
+
+ // materialized column created from query
+ // https://github.com/SigNoz/signoz/pull/4775
+ return "`" + utils.GetClickhouseColumnNameV2(string(key.Type), string(key.DataType), key.Key) + "`"
+}
+
+func getSelectLabels(aggregatorOperator v3.AggregateOperator, groupBy []v3.AttributeKey) string {
+ var selectLabels string
+ if aggregatorOperator == v3.AggregateOperatorNoOp {
+ selectLabels = ""
+ } else {
+ for _, tag := range groupBy {
+ columnName := getClickhouseKey(tag)
+ selectLabels += fmt.Sprintf(" %s as `%s`,", columnName, tag.Key)
+ }
+ }
+ return selectLabels
+}
+
+func getExistsNexistsFilter(op v3.FilterOperator, item v3.FilterItem) string {
+ if _, ok := constants.StaticFieldsLogsV3[item.Key.Key]; ok && item.Key.Type == v3.AttributeKeyTypeUnspecified {
+ // no exists filter for static fields as they exist everywhere
+ // TODO(nitya): Think what we can do here
+ return ""
+ } else if item.Key.IsColumn {
+ // get filter for materialized columns
+ val := true
+ if op == v3.FilterOperatorNotExists {
+ val = false
+ }
+ return fmt.Sprintf("%s_exists`=%v", strings.TrimSuffix(getClickhouseKey(item.Key), "`"), val)
+ }
+ // filter for non materialized attributes
+ columnType := logsV3.GetClickhouseLogsColumnType(item.Key.Type)
+ columnDataType := getClickhouseLogsColumnDataType(item.Key.DataType)
+ return fmt.Sprintf(logOperators[op], columnType, columnDataType, item.Key.Key)
+}
+
+func buildAttributeFilter(item v3.FilterItem) (string, error) {
+ // check if the user is searching for a value across all attributes
+ key := item.Key.Key
+ op := v3.FilterOperator(strings.ToLower(string(item.Operator)))
+
+ var value interface{}
+ var err error
+ if op != v3.FilterOperatorExists && op != v3.FilterOperatorNotExists {
+ value, err = utils.ValidateAndCastValue(item.Value, item.Key.DataType)
+ if err != nil {
+ return "", fmt.Errorf("failed to validate and cast value for %s: %v", item.Key.Key, err)
+ }
+ }
+
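+ // a few examples of the filters this function generates (these exact cases
+ // appear in the unit tests added later in this patch):
+ //   resources_string['service.name'] = 'test'         -- eq on a resource attribute
+ //   mapContains(resources_string, 'service.name')     -- exists
+ //   match(resources_string['service.name'], '^test')  -- regex
+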
+ // TODO(nitya): as of now __attrs only supports attributes_string. Discuss more on this
+ // also, for eq and contains, as of now it does an exact match
+ if key == "__attrs" {
+ if (op != v3.FilterOperatorEqual && op != v3.FilterOperatorContains) || item.Key.DataType != v3.AttributeKeyDataTypeString {
+ return "", fmt.Errorf("only = operator and string data type is supported for __attrs")
+ }
+ val := utils.ClickHouseFormattedValue(item.Value)
+ return fmt.Sprintf("has(mapValues(attributes_string), %s)", val), nil
+ }
+
+ keyName := getClickhouseKey(item.Key)
+ fmtVal := utils.ClickHouseFormattedValue(value)
+
+ if logsOp, ok := logOperators[op]; ok {
+ switch op {
+ case v3.FilterOperatorExists, v3.FilterOperatorNotExists:
+ return getExistsNexistsFilter(op, item), nil
+ case v3.FilterOperatorRegex, v3.FilterOperatorNotRegex:
+ return fmt.Sprintf(logsOp, keyName, fmtVal), nil
+ case v3.FilterOperatorContains, v3.FilterOperatorNotContains:
+ val := utils.QuoteEscapedStringForContains(fmt.Sprintf("%s", item.Value))
+ // for body, contains is case insensitive
+ if keyName == BODY {
+ return fmt.Sprintf("lower(%s) %s lower('%%%s%%')", keyName, logsOp, val), nil
+ } else {
+ return fmt.Sprintf("%s %s '%%%s%%'", keyName, logsOp, val), nil
+ }
+ default:
+ // use lower() for like and ilike on the body column
+ if op == v3.FilterOperatorLike || op == v3.FilterOperatorNotLike {
+ if keyName == BODY {
+ keyName = fmt.Sprintf("lower(%s)", keyName)
+ fmtVal = fmt.Sprintf("lower(%s)", fmtVal)
+ }
+ }
+ return fmt.Sprintf("%s %s %s", keyName, logsOp, fmtVal), nil
+ }
+ } else {
+ return "", fmt.Errorf("unsupported operator: %s", op)
+ }
+}
+
+func buildLogsTimeSeriesFilterQuery(fs *v3.FilterSet, groupBy []v3.AttributeKey, aggregateAttribute v3.AttributeKey) (string, error) {
+ var conditions []string
+
+ if fs == nil || len(fs.Items) == 0 {
+ return "", nil
+ }
+
+ for _, item := range fs.Items {
+ // skip if it's a resource attribute
+ if item.Key.Type == v3.AttributeKeyTypeResource {
+ continue
+ }
+
+ // if the filter is a JSON filter
+ if item.Key.IsJSON {
+ filter, err := GetJSONFilter(item)
+ if err != nil {
+ return "", err
+ }
+ conditions = append(conditions, filter)
+ continue
+ }
+
+ // generate the filter
+ filter, err := buildAttributeFilter(item)
+ if err != nil {
+ return "", err
+ }
+ conditions = append(conditions, filter)
+
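+ // e.g. a filter like user_name = 'alice' (a hypothetical attribute and value)
+ // yields attributes_string['user_name'] = 'alice' above and, via the block
+ // below, an extra mapContains(attributes_string, 'user_name') condition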
+ // add extra condition for map contains
+ // by default clickhouse is not able to utilize indexes for keys with all operators.
+ // mapContains forces the use of the index.
+ op := v3.FilterOperator(strings.ToLower(string(item.Operator)))
+ if !item.Key.IsColumn && op != v3.FilterOperatorExists && op != v3.FilterOperatorNotExists {
+ conditions = append(conditions, getExistsNexistsFilter(v3.FilterOperatorExists, item))
+ }
+ }
+
+ // add group by conditions to filter out log lines which don't have the key
+ for _, attr := range groupBy {
+ // skip if it's a resource attribute
+ if attr.Type == v3.AttributeKeyTypeResource {
+ continue
+ }
+
+ if !attr.IsColumn {
+ columnType := logsV3.GetClickhouseLogsColumnType(attr.Type)
+ columnDataType := getClickhouseLogsColumnDataType(attr.DataType)
+ conditions = append(conditions, fmt.Sprintf("mapContains(%s_%s, '%s')", columnType, columnDataType, attr.Key))
+ } else if attr.Type != v3.AttributeKeyTypeUnspecified {
+ // for materialized columns, excluding the top level static fields
+ name := utils.GetClickhouseColumnNameV2(string(attr.Type), string(attr.DataType), attr.Key)
+ conditions = append(conditions, fmt.Sprintf("`%s_exists`=true", name))
+ }
+ }
+
+ // add conditions for aggregate attribute
+ if aggregateAttribute.Key != "" && aggregateAttribute.Type != v3.AttributeKeyTypeResource {
+ existsFilter := getExistsNexistsFilter(v3.FilterOperatorExists, v3.FilterItem{Key: aggregateAttribute})
+ conditions = append(conditions, existsFilter)
+ }
+
+ queryString := strings.Join(conditions, " AND ")
+ return queryString, nil
+}
+
+// orderBy returns the order by clauses for the given items;
+// for list panels, items which are not present in tags are also resolved and added;
+// if the order is not specified, it defaults to ASC
+func orderBy(panelType v3.PanelType, items []v3.OrderBy, tagLookup map[string]struct{}) []string {
+ var orderBy []string
+
+ for _, item := range items {
+ if item.ColumnName == constants.SigNozOrderByValue {
+ orderBy = append(orderBy, fmt.Sprintf("value %s", item.Order))
+ } else if _, ok := tagLookup[item.ColumnName]; ok {
+ orderBy = append(orderBy, fmt.Sprintf("`%s` %s", item.ColumnName, item.Order))
+ } else if panelType == v3.PanelTypeList {
+ attr := v3.AttributeKey{Key: item.ColumnName, DataType: item.DataType, Type: item.Type, IsColumn: item.IsColumn}
+ name := getClickhouseKey(attr)
+ if item.IsColumn {
+ name = "`" + name + "`"
+ }
+ orderBy = append(orderBy, fmt.Sprintf("%s %s", name, item.Order))
+ }
+ }
+ return orderBy
+}
+
+func orderByAttributeKeyTags(panelType v3.PanelType, items []v3.OrderBy, tags []v3.AttributeKey) string {
+
+ tagLookup := map[string]struct{}{}
+ for _, v := range tags {
+ tagLookup[v.Key] = struct{}{}
+ }
+
+ orderByArray := orderBy(panelType, items, tagLookup)
+
+ if len(orderByArray) == 0 {
+ if panelType == v3.PanelTypeList {
+ orderByArray = append(orderByArray, constants.TIMESTAMP+" DESC")
+ } else {
+ orderByArray = append(orderByArray, "value DESC")
+ }
+ }
+
+ str := strings.Join(orderByArray, ",")
+ return str
+}
+
+func generateAggregateClause(aggOp v3.AggregateOperator,
+ aggKey string,
+ step int64,
+ preferRPM bool,
+ timeFilter string,
+ whereClause string,
+ groupBy string,
+ having string,
+ orderBy string,
+) (string, error) {
+ queryTmpl := " %s as value from signoz_logs."
+ DISTRIBUTED_LOGS_V2 + + " where " + timeFilter + "%s" + + "%s%s" + + "%s" + switch aggOp { + case v3.AggregateOperatorRate: + rate := float64(step) + if preferRPM { + rate = rate / 60.0 + } + + op := fmt.Sprintf("count(%s)/%f", aggKey, rate) + query := fmt.Sprintf(queryTmpl, op, whereClause, groupBy, having, orderBy) + return query, nil + case + v3.AggregateOperatorRateSum, + v3.AggregateOperatorRateMax, + v3.AggregateOperatorRateAvg, + v3.AggregateOperatorRateMin: + rate := float64(step) + if preferRPM { + rate = rate / 60.0 + } + + op := fmt.Sprintf("%s(%s)/%f", logsV3.AggregateOperatorToSQLFunc[aggOp], aggKey, rate) + query := fmt.Sprintf(queryTmpl, op, whereClause, groupBy, having, orderBy) + return query, nil + case + v3.AggregateOperatorP05, + v3.AggregateOperatorP10, + v3.AggregateOperatorP20, + v3.AggregateOperatorP25, + v3.AggregateOperatorP50, + v3.AggregateOperatorP75, + v3.AggregateOperatorP90, + v3.AggregateOperatorP95, + v3.AggregateOperatorP99: + op := fmt.Sprintf("quantile(%v)(%s)", logsV3.AggregateOperatorToPercentile[aggOp], aggKey) + query := fmt.Sprintf(queryTmpl, op, whereClause, groupBy, having, orderBy) + return query, nil + case v3.AggregateOperatorAvg, v3.AggregateOperatorSum, v3.AggregateOperatorMin, v3.AggregateOperatorMax: + op := fmt.Sprintf("%s(%s)", logsV3.AggregateOperatorToSQLFunc[aggOp], aggKey) + query := fmt.Sprintf(queryTmpl, op, whereClause, groupBy, having, orderBy) + return query, nil + case v3.AggregateOperatorCount: + op := "toFloat64(count(*))" + query := fmt.Sprintf(queryTmpl, op, whereClause, groupBy, having, orderBy) + return query, nil + case v3.AggregateOperatorCountDistinct: + op := fmt.Sprintf("toFloat64(count(distinct(%s)))", aggKey) + query := fmt.Sprintf(queryTmpl, op, whereClause, groupBy, having, orderBy) + return query, nil + default: + return "", fmt.Errorf("unsupported aggregate operator") + } +} + +func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.BuilderQuery, graphLimitQtype string, preferRPM bool) (string, error) { + // timerange will be sent in epoch millisecond + logsStart := utils.GetEpochNanoSecs(start) + logsEnd := utils.GetEpochNanoSecs(end) + + // -1800 this is added so that the bucket start considers all the fingerprints. 
+ bucketStart := logsStart/NANOSECOND - 1800 + bucketEnd := logsEnd / NANOSECOND + + // timestamp filter , bucket_start filter is added for primary key + timeFilter := fmt.Sprintf("(timestamp >= %d AND timestamp <= %d) AND (ts_bucket_start >= %d AND ts_bucket_start <= %d)", logsStart, logsEnd, bucketStart, bucketEnd) + + // build the where clause for main table + filterSubQuery, err := buildLogsTimeSeriesFilterQuery(mq.Filters, mq.GroupBy, mq.AggregateAttribute) + if err != nil { + return "", err + } + if filterSubQuery != "" { + filterSubQuery = " AND " + filterSubQuery + } + + // build the where clause for resource table + resourceSubQuery, err := buildResourceSubQuery(bucketStart, bucketEnd, mq.Filters, mq.GroupBy, mq.AggregateAttribute, false) + if err != nil { + return "", err + } + // join both the filter clauses + if resourceSubQuery != "" { + filterSubQuery = filterSubQuery + " AND (resource_fingerprint GLOBAL IN " + resourceSubQuery + ")" + } + + // get the select labels + selectLabels := getSelectLabels(mq.AggregateOperator, mq.GroupBy) + + // get the order by clause + orderBy := orderByAttributeKeyTags(panelType, mq.OrderBy, mq.GroupBy) + if panelType != v3.PanelTypeList && orderBy != "" { + orderBy = " order by " + orderBy + } + + // if noop create the query and return + if mq.AggregateOperator == v3.AggregateOperatorNoOp { + // with noop any filter or different order by other than ts will use new table + sqlSelect := constants.LogsSQLSelectV2 + queryTmpl := sqlSelect + "from signoz_logs.%s where %s%s order by %s" + query := fmt.Sprintf(queryTmpl, DISTRIBUTED_LOGS_V2, timeFilter, filterSubQuery, orderBy) + return query, nil + // ---- NOOP ends here ---- + } + + // ---- FOR aggregation queries ---- + + // get the having conditions + having := logsV3.Having(mq.Having) + if having != "" { + having = " having " + having + } + + // get the group by clause + groupBy := logsV3.GroupByAttributeKeyTags(panelType, graphLimitQtype, mq.GroupBy...) 
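+
+ // for reference, a count grouped by service.name on a graph panel (60s step)
+ // assembles below into roughly:
+ //   SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts,
+ //    resources_string['service.name'] as `service.name`, toFloat64(count(*)) as value
+ //    from signoz_logs.distributed_logs_v2
+ //    where (timestamp >= ... AND timestamp <= ...) AND (ts_bucket_start >= ... AND ts_bucket_start <= ...)
+ //    AND (resource_fingerprint GLOBAL IN <resource subquery>)
+ //    group by `service.name`,ts order by value DESC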
+ if panelType != v3.PanelTypeList && groupBy != "" {
+ groupBy = " group by " + groupBy
+ }
+
+ // get the aggregation key
+ aggregationKey := ""
+ if mq.AggregateAttribute.Key != "" {
+ aggregationKey = getClickhouseKey(mq.AggregateAttribute)
+ }
+
+ // for limit queries, there are two queries formed
+ // in the second query we add the placeholder where the first query's result can be substituted
+ if graphLimitQtype == constants.SecondQueryGraphLimit {
+ filterSubQuery = filterSubQuery + " AND " + fmt.Sprintf("(%s) GLOBAL IN (", logsV3.GetSelectKeys(mq.AggregateOperator, mq.GroupBy)) + "#LIMIT_PLACEHOLDER)"
+ }
+
+ aggClause, err := generateAggregateClause(mq.AggregateOperator, aggregationKey, step, preferRPM, timeFilter, filterSubQuery, groupBy, having, orderBy)
+ if err != nil {
+ return "", err
+ }
+
+ var queryTmplPrefix string
+ if graphLimitQtype == constants.FirstQueryGraphLimit {
+ queryTmplPrefix = "SELECT"
+ } else if panelType == v3.PanelTypeTable {
+ queryTmplPrefix =
+ "SELECT"
+ // step or aggregate interval is the whole time period in case of table panel
+ step = (utils.GetEpochNanoSecs(end) - utils.GetEpochNanoSecs(start)) / NANOSECOND
+ } else if panelType == v3.PanelTypeGraph || panelType == v3.PanelTypeValue {
+ // Select the aggregate value for the interval
+ queryTmplPrefix =
+ fmt.Sprintf("SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL %d SECOND) AS ts,", step)
+ }
+
+ query := queryTmplPrefix + selectLabels + aggClause
+
+ // for the limit flow this is the first query;
+ // we don't need the aggregation value here as we are just concerned with the names of the group by columns
+ // for applying the limit
+ if graphLimitQtype == constants.FirstQueryGraphLimit {
+ query = "SELECT " + logsV3.GetSelectKeys(mq.AggregateOperator, mq.GroupBy) + " from (" + query + ")"
+ }
+ return query, nil
+}
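+
+// An illustration of the two-phase limit flow above: with a group by on
+// service.name and limit 10, the first query (FirstQueryGraphLimit) is wrapped as
+//   SELECT `service.name` from (SELECT ... group by `service.name` order by value DESC)
+// and capped with LIMIT 10 by the caller; its result is then substituted for
+// #LIMIT_PLACEHOLDER in the second query's
+//   (`service.name`) GLOBAL IN (#LIMIT_PLACEHOLDER)
+// clause, so the final aggregation only runs over the top groups.
+//
+// Note for buildLogsLiveTailQuery below: the resource filter clause appears to
+// be left without its closing parenthesis on purpose; the reader that runs the
+// live tail appends the timestamp and id filters and completes the clause.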
+ DISTRIBUTED_LOGS_V2 + " where " + if len(filterSubQuery) > 0 { + query = query + filterSubQuery + " AND " + } + + return query, nil + default: + return "", fmt.Errorf("unsupported aggregate operator in live tail") + } +} + +// PrepareLogsQuery prepares the query for logs +func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery, options v3.LogQBOptions) (string, error) { + + // adjust the start and end time to the step interval + // NOTE: Disabling this as it's creating confusion between charts and actual data + // if panelType != v3.PanelTypeList { + // start = start - (start % (mq.StepInterval * 1000)) + // end = end - (end % (mq.StepInterval * 1000)) + // } + + if options.IsLivetailQuery { + query, err := buildLogsLiveTailQuery(mq) + if err != nil { + return "", err + } + return query, nil + } else if options.GraphLimitQtype == constants.FirstQueryGraphLimit { + // give me just the group_by names (no values) + query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype, options.PreferRPM) + if err != nil { + return "", err + } + query = logsV3.AddLimitToQuery(query, mq.Limit) + + return query, nil + } else if options.GraphLimitQtype == constants.SecondQueryGraphLimit { + query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype, options.PreferRPM) + if err != nil { + return "", err + } + return query, nil + } + + query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype, options.PreferRPM) + if err != nil { + return "", err + } + if panelType == v3.PanelTypeValue { + query, err = logsV3.ReduceQuery(query, mq.ReduceTo, mq.AggregateOperator) + } + + if panelType == v3.PanelTypeList { + // check if limit exceeded + if mq.Limit > 0 && mq.Offset >= mq.Limit { + return "", fmt.Errorf("max limit exceeded") + } + + if mq.PageSize > 0 { + if mq.Limit > 0 && mq.Offset+mq.PageSize > mq.Limit { + query = logsV3.AddLimitToQuery(query, mq.Limit-mq.Offset) + } else { + query = logsV3.AddLimitToQuery(query, mq.PageSize) + } + + // add offset to the query only if it is not orderd by timestamp. 
+ if !logsV3.IsOrderByTs(mq.OrderBy) { + query = logsV3.AddOffsetToQuery(query, mq.Offset) + } + + } else { + query = logsV3.AddLimitToQuery(query, mq.Limit) + } + } else if panelType == v3.PanelTypeTable { + query = logsV3.AddLimitToQuery(query, mq.Limit) + } + + return query, err +} diff --git a/pkg/query-service/app/logs/v4/query_builder_test.go b/pkg/query-service/app/logs/v4/query_builder_test.go new file mode 100644 index 0000000000..7bc831437c --- /dev/null +++ b/pkg/query-service/app/logs/v4/query_builder_test.go @@ -0,0 +1,1099 @@ +package v4 + +import ( + "testing" + + "go.signoz.io/signoz/pkg/query-service/constants" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +func Test_getClickhouseKey(t *testing.T) { + type args struct { + key v3.AttributeKey + } + tests := []struct { + name string + args args + want string + }{ + { + name: "attribute", + args: args{ + key: v3.AttributeKey{Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, + }, + want: "attributes_string['user_name']", + }, + { + name: "resource", + args: args{ + key: v3.AttributeKey{Key: "servicename", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}, + }, + want: "resources_string['servicename']", + }, + { + name: "selected field", + args: args{ + key: v3.AttributeKey{Key: "bytes", DataType: v3.AttributeKeyDataTypeFloat64, Type: v3.AttributeKeyTypeTag, IsColumn: true}, + }, + want: "`attribute_number_bytes`", + }, + { + name: "selected field resource", + args: args{ + key: v3.AttributeKey{Key: "servicename", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource, IsColumn: true}, + }, + want: "`resource_string_servicename`", + }, + { + name: "top level key", + args: args{ + key: v3.AttributeKey{Key: "trace_id", DataType: v3.AttributeKeyDataTypeString}, + }, + want: "trace_id", + }, + { + name: "name with -", + args: args{ + key: v3.AttributeKey{Key: "service-name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, + }, + want: "`attribute_string_service-name`", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := getClickhouseKey(tt.args.key); got != tt.want { + t.Errorf("getClickhouseKey() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_getSelectLabels(t *testing.T) { + type args struct { + aggregatorOperator v3.AggregateOperator + groupBy []v3.AttributeKey + } + tests := []struct { + name string + args args + want string + }{ + { + name: "count", + args: args{ + aggregatorOperator: v3.AggregateOperatorCount, + groupBy: []v3.AttributeKey{{Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + }, + want: " attributes_string['user_name'] as `user_name`,", + }, + { + name: "multiple group by", + args: args{ + aggregatorOperator: v3.AggregateOperatorCount, + groupBy: []v3.AttributeKey{ + {Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, + {Key: "service_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource, IsColumn: true}, + }, + }, + want: " attributes_string['user_name'] as `user_name`, `resource_string_service_name` as `service_name`,", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := getSelectLabels(tt.args.aggregatorOperator, tt.args.groupBy); got != tt.want { + t.Errorf("getSelectLabels() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_getExistsNexistsFilter(t *testing.T) { + type 
args struct { + op v3.FilterOperator + item v3.FilterItem + } + tests := []struct { + name string + args args + want string + }{ + { + name: "exists", + args: args{ + op: v3.FilterOperatorExists, + item: v3.FilterItem{Key: v3.AttributeKey{Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + }, + want: "mapContains(attributes_string, 'user_name')", + }, + { + name: "not exists", + args: args{ + op: v3.FilterOperatorNotExists, + item: v3.FilterItem{Key: v3.AttributeKey{Key: "user_name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + }, + want: "not mapContains(attributes_string, 'user_name')", + }, + { + name: "exists mat column", + args: args{ + op: v3.FilterOperatorExists, + item: v3.FilterItem{Key: v3.AttributeKey{Key: "bytes", DataType: v3.AttributeKeyDataTypeFloat64, Type: v3.AttributeKeyTypeTag, IsColumn: true}}, + }, + want: "`attribute_number_bytes_exists`=true", + }, + { + name: "exists top level column", + args: args{ + op: v3.FilterOperatorExists, + item: v3.FilterItem{Key: v3.AttributeKey{Key: "trace_id", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeUnspecified}}, + }, + want: "", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := getExistsNexistsFilter(tt.args.op, tt.args.item); got != tt.want { + t.Errorf("getExistsNexistsFilter() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_buildAttributeFilter(t *testing.T) { + type args struct { + item v3.FilterItem + } + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "build attribute filter", + args: args{ + item: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + Operator: v3.FilterOperatorEqual, + Value: "test", + }, + }, + want: "resources_string['service.name'] = 'test'", + wantErr: false, + }, + { + name: "test for value search across all attributes", + args: args{ + item: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "__attrs", + DataType: v3.AttributeKeyDataTypeString, + }, + Operator: v3.FilterOperatorContains, + Value: "test", + }, + }, + want: "has(mapValues(attributes_string), 'test')", + }, + { + name: "build attribute filter exists", + args: args{ + item: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + Operator: v3.FilterOperatorExists, + }, + }, + want: "mapContains(resources_string, 'service.name')", + wantErr: false, + }, + { + name: "build attribute filter regex", + args: args{ + item: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + Operator: v3.FilterOperatorRegex, + Value: "^test", + }, + }, + want: "match(resources_string['service.name'], '^test')", + }, + { + name: "build attribute filter contains", + args: args{ + item: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + Operator: v3.FilterOperatorContains, + Value: "test", + }, + }, + want: "resources_string['service.name'] LIKE '%test%'", + }, + { + name: "build attribute filter contains- body", + args: args{ + item: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body", + DataType: v3.AttributeKeyDataTypeString, + IsColumn: true, + }, + Operator: v3.FilterOperatorContains, + Value: "test", + }, + 
}, + want: "lower(body) LIKE lower('%test%')", + }, + { + name: "build attribute filter like", + args: args{ + item: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + Operator: v3.FilterOperatorLike, + Value: "test", + }, + }, + want: "resources_string['service.name'] LIKE 'test'", + }, + { + name: "build attribute filter like-body", + args: args{ + item: v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "body", + DataType: v3.AttributeKeyDataTypeString, + IsColumn: true, + }, + Operator: v3.FilterOperatorLike, + Value: "test", + }, + }, + want: "lower(body) LIKE lower('test')", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := buildAttributeFilter(tt.args.item) + if (err != nil) != tt.wantErr { + t.Errorf("buildAttributeFilter() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("buildAttributeFilter() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_buildLogsTimeSeriesFilterQuery(t *testing.T) { + type args struct { + fs *v3.FilterSet + groupBy []v3.AttributeKey + aggregateAttribute v3.AttributeKey + } + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "build logs time series filter query", + args: args{ + fs: &v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: v3.FilterOperatorEqual, + Value: "test", + }, + { + Key: v3.AttributeKey{ + Key: "method", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: v3.FilterOperatorEqual, + Value: "GET", + }, + }, + }, + }, + want: "attributes_string['service.name'] = 'test' AND mapContains(attributes_string, 'service.name') " + + "AND attributes_string['method'] = 'GET' AND mapContains(attributes_string, 'method')", + }, + { + name: "build logs time series filter query with group by and aggregate attribute", + args: args{ + fs: &v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: v3.FilterOperatorEqual, + Value: "test", + }, + }, + }, + groupBy: []v3.AttributeKey{ + { + Key: "user_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + }, + aggregateAttribute: v3.AttributeKey{ + Key: "test", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + }, + want: "attributes_string['service.name'] = 'test' AND mapContains(attributes_string, 'service.name') " + + "AND mapContains(attributes_string, 'user_name') AND mapContains(attributes_string, 'test')", + }, + { + name: "build logs time series filter query with multiple group by and aggregate attribute", + args: args{ + fs: &v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: v3.FilterOperatorEqual, + Value: "test", + }, + }, + }, + groupBy: []v3.AttributeKey{ + { + Key: "user_name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + { + Key: "host", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + { + Key: "method", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + IsColumn: true, + }, + { + Key: 
"trace_id", + DataType: v3.AttributeKeyDataTypeString, + IsColumn: true, + }, + }, + aggregateAttribute: v3.AttributeKey{ + Key: "test", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + }, + want: "attributes_string['service.name'] = 'test' AND mapContains(attributes_string, 'service.name') " + + "AND mapContains(attributes_string, 'user_name') AND `attribute_string_method_exists`=true AND mapContains(attributes_string, 'test')", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := buildLogsTimeSeriesFilterQuery(tt.args.fs, tt.args.groupBy, tt.args.aggregateAttribute) + if (err != nil) != tt.wantErr { + t.Errorf("buildLogsTimeSeriesFilterQuery() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("buildLogsTimeSeriesFilterQuery() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_orderByAttributeKeyTags(t *testing.T) { + type args struct { + panelType v3.PanelType + items []v3.OrderBy + tags []v3.AttributeKey + } + tests := []struct { + name string + args args + want string + }{ + { + name: "Test 1", + args: args{ + panelType: v3.PanelTypeGraph, + items: []v3.OrderBy{ + { + ColumnName: "name", + Order: "asc", + }, + { + ColumnName: constants.SigNozOrderByValue, + Order: "desc", + }, + }, + tags: []v3.AttributeKey{ + {Key: "name"}, + }, + }, + want: "`name` asc,value desc", + }, + { + name: "Test Graph item not present in tag", + args: args{ + panelType: v3.PanelTypeGraph, + items: []v3.OrderBy{ + { + ColumnName: "name", + Order: "asc", + }, + { + ColumnName: "bytes", + Order: "asc", + }, + { + ColumnName: "method", + Order: "asc", + }, + }, + tags: []v3.AttributeKey{ + {Key: "name"}, + {Key: "bytes"}, + }, + }, + want: "`name` asc,`bytes` asc", + }, + { + name: "Test panel list", + args: args{ + panelType: v3.PanelTypeList, + items: []v3.OrderBy{ + { + ColumnName: "name", + Order: "asc", + }, + { + ColumnName: constants.SigNozOrderByValue, + Order: "asc", + }, + { + ColumnName: "bytes", + Order: "asc", + }, + }, + tags: []v3.AttributeKey{ + {Key: "name"}, + {Key: "bytes"}, + }, + }, + want: "`name` asc,value asc,`bytes` asc", + }, + { + name: "test 4", + args: args{ + panelType: v3.PanelTypeList, + items: []v3.OrderBy{ + { + ColumnName: "name", + Order: "asc", + }, + { + ColumnName: constants.SigNozOrderByValue, + Order: "asc", + }, + { + ColumnName: "response_time", + Order: "desc", + Key: "response_time", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }, + }, + tags: []v3.AttributeKey{ + {Key: "name"}, + {Key: "value"}, + }, + }, + want: "`name` asc,value asc,attributes_string['response_time'] desc", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := orderByAttributeKeyTags(tt.args.panelType, tt.args.items, tt.args.tags); got != tt.want { + t.Errorf("orderByAttributeKeyTags() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_generateAggregateClause(t *testing.T) { + type args struct { + op v3.AggregateOperator + aggKey string + step int64 + preferRPM bool + timeFilter string + whereClause string + groupBy string + having string + orderBy string + } + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "test rate", + args: args{ + op: v3.AggregateOperatorRate, + aggKey: "test", + step: 60, + preferRPM: false, + timeFilter: "(timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 
1680066458)",
+				whereClause: " AND attributes_string['service.name'] = 'test'",
+				groupBy:     " group by `user_name`",
+				having:      "",
+				orderBy:     " order by `user_name` desc",
+			},
+			want: " count(test)/60.000000 as value from signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND " +
+				"(ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND attributes_string['service.name'] = 'test' " +
+				"group by `user_name` order by `user_name` desc",
+		},
+		{
+			name: "test rate with all args",
+			args: args{
+				op:          v3.AggregateOperatorRate,
+				aggKey:      "test",
+				step:        60,
+				preferRPM:   false,
+				timeFilter:  "(timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458)",
+				whereClause: " AND attributes_string['service.name'] = 'test'",
+				groupBy:     " group by `user_name`",
+				having:      " having value > 10",
+				orderBy:     " order by `user_name` desc",
+			},
+			want: " count(test)/60.000000 as value from signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND " +
+				"(ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND attributes_string['service.name'] = 'test' group by `user_name` having value > 10 order by " +
+				"`user_name` desc",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := generateAggregateClause(tt.args.op, tt.args.aggKey, tt.args.step, tt.args.preferRPM, tt.args.timeFilter, tt.args.whereClause, tt.args.groupBy, tt.args.having, tt.args.orderBy)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("generateAggregateClause() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if got != tt.want {
+				t.Errorf("generateAggregateClause() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_buildLogsQuery(t *testing.T) {
+	type args struct {
+		panelType       v3.PanelType
+		start           int64
+		end             int64
+		step            int64
+		mq              *v3.BuilderQuery
+		graphLimitQtype string
+		preferRPM       bool
+	}
+	tests := []struct {
+		name    string
+		args    args
+		want    string
+		wantErr bool
+	}{
+		{
+			name: "build logs query",
+			args: args{
+				panelType: v3.PanelTypeTable,
+				start:     1680066360726210000,
+				end:       1680066458000000000,
+				step:      1000,
+				mq: &v3.BuilderQuery{
+					AggregateOperator: v3.AggregateOperatorCount,
+					Filters: &v3.FilterSet{
+						Items: []v3.FilterItem{
+							{
+								Key: v3.AttributeKey{
+									Key:      "service.name",
+									DataType: v3.AttributeKeyDataTypeString,
+									Type:     v3.AttributeKeyTypeTag,
+								},
+								Operator: v3.FilterOperatorEqual,
+								Value:    "test",
+							},
+						},
+					},
+					GroupBy: []v3.AttributeKey{
+						{
+							Key:      "user_name",
+							DataType: v3.AttributeKeyDataTypeString,
+							Type:     v3.AttributeKeyTypeTag,
+						},
+					},
+					OrderBy: []v3.OrderBy{
+						{
+							ColumnName: "user_name",
+							Order:      "desc",
+						},
+					},
+				},
+			},
+			want: "SELECT attributes_string['user_name'] as `user_name`, toFloat64(count(*)) as value from signoz_logs.distributed_logs_v2 " +
+				"where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) " +
+				"AND attributes_string['service.name'] = 'test' AND mapContains(attributes_string, 'service.name') AND mapContains(attributes_string, 'user_name') " +
+				"group by `user_name` order by `user_name` desc",
+		},
+		{
+			name: "build logs query noop",
+			args: args{
+				panelType: v3.PanelTypeList,
+				start:     1680066360726210000,
+				end:       1680066458000000000,
+				step:      1000,
+				mq: &v3.BuilderQuery{
+					AggregateOperator: v3.AggregateOperatorNoOp,
+					
Filters: &v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: v3.FilterOperatorEqual, + Value: "test", + }, + }, + }, + OrderBy: []v3.OrderBy{ + { + ColumnName: "timestamp", + Order: "desc", + }, + }, + }, + }, + want: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body, attributes_string, attributes_number, attributes_bool, resources_string " + + "from signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) " + + "AND attributes_string['service.name'] = 'test' AND mapContains(attributes_string, 'service.name') order by timestamp desc", + }, + { + name: "build logs query with all args", + args: args{ + panelType: v3.PanelTypeGraph, + start: 1680066360726210000, + end: 1680066458000000000, + step: 60, + mq: &v3.BuilderQuery{ + AggregateOperator: v3.AggregateOperatorAvg, + AggregateAttribute: v3.AttributeKey{ + Key: "duration", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Filters: &v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + Operator: v3.FilterOperatorEqual, + Value: "test", + }, + { + Key: v3.AttributeKey{ + Key: "duration", + DataType: v3.AttributeKeyDataTypeFloat64, + Type: v3.AttributeKeyTypeTag, + }, + Operator: v3.FilterOperatorGreaterThan, + Value: 1000, + }, + }, + }, + GroupBy: []v3.AttributeKey{ + { + Key: "host", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + }, + }, + OrderBy: []v3.OrderBy{ + { + ColumnName: "host", + Order: "desc", + }, + }, + }, + }, + want: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, resources_string['host'] as `host`, avg(attributes_number['duration']) as value " + + "from signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) " + + "AND attributes_number['duration'] > 1000.000000 AND mapContains(attributes_number, 'duration') AND mapContains(attributes_number, 'duration') AND " + + "(resource_fingerprint GLOBAL IN (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (seen_at_ts_bucket_start >= 1680064560) AND (seen_at_ts_bucket_start <= 1680066458) " + + "AND simpleJSONExtractString(labels, 'service.name') = 'test' AND labels like '%service.name%test%' AND ( (simpleJSONHas(labels, 'host') AND labels like '%host%') ))) " + + "group by `host`,ts order by `host` desc", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := buildLogsQuery(tt.args.panelType, tt.args.start, tt.args.end, tt.args.step, tt.args.mq, tt.args.graphLimitQtype, tt.args.preferRPM) + if (err != nil) != tt.wantErr { + t.Errorf("buildLogsQuery() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("buildLogsQuery() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestPrepareLogsQuery(t *testing.T) { + type args struct { + start int64 + end int64 + queryType v3.QueryType + panelType v3.PanelType + mq *v3.BuilderQuery + options v3.LogQBOptions + } + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: 
"TABLE: Test count with JSON Filter Array, groupBy, orderBy", + args: args{ + start: 1680066360726210000, + end: 1680066458000000000, + panelType: v3.PanelTypeTable, + mq: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorCount, + Expression: "A", + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "body.requestor_list[*]", + DataType: "array(string)", + IsJSON: true, + }, + Operator: "has", + Value: "index_service", + }, + }, + }, + GroupBy: []v3.AttributeKey{ + {Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, + {Key: "host", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}, + }, + OrderBy: []v3.OrderBy{ + {ColumnName: "name", Order: "DESC"}, + }, + }, + }, + want: "SELECT attributes_string['name'] as `name`, resources_string['host'] as `host`, toFloat64(count(*)) as value from signoz_logs.distributed_logs_v2 where " + + "(timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND lower(body) like lower('%requestor_list%') " + + "AND lower(body) like lower('%index_service%') AND has(JSONExtract(JSON_QUERY(body, '$.\"requestor_list\"[*]'), 'Array(String)'), 'index_service') AND mapContains(attributes_string, 'name') AND " + + "(resource_fingerprint GLOBAL IN (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (seen_at_ts_bucket_start >= 1680064560) AND (seen_at_ts_bucket_start <= 1680066458) AND " + + "( (simpleJSONHas(labels, 'host') AND labels like '%host%') ))) group by `name`,`host` order by `name` DESC", + }, + { + name: "Test TS with limit- first", + args: args{ + start: 1680066360726, + end: 1680066458000, + queryType: v3.QueryTypeBuilder, + panelType: v3.PanelTypeGraph, + mq: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + AggregateAttribute: v3.AttributeKey{Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, + AggregateOperator: v3.AggregateOperatorCountDistinct, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "GET", Operator: "="}, + {Key: v3.AttributeKey{Key: "service.name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}, Value: "app", Operator: "="}, + }, + }, + Limit: 10, + GroupBy: []v3.AttributeKey{{Key: "user", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + }, + options: v3.LogQBOptions{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: true}, + }, + want: "SELECT `user` from (SELECT attributes_string['user'] as `user`, toFloat64(count(distinct(attributes_string['name']))) as value from signoz_logs.distributed_logs_v2 " + + "where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND attributes_string['method'] = 'GET' " + + "AND mapContains(attributes_string, 'method') AND mapContains(attributes_string, 'user') AND mapContains(attributes_string, 'name') AND (resource_fingerprint GLOBAL IN " + + "(SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (seen_at_ts_bucket_start >= 1680064560) AND (seen_at_ts_bucket_start <= 1680066458) AND simpleJSONExtractString(labels, 'service.name') = 'app' " + + "AND labels like '%service.name%app%')) group 
by `user` order by value DESC) LIMIT 10", + }, + { + name: "Test TS with limit- second", + args: args{ + start: 1680066360726, + end: 1680066458000, + queryType: v3.QueryTypeBuilder, + panelType: v3.PanelTypeGraph, + mq: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + AggregateAttribute: v3.AttributeKey{Key: "name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, + AggregateOperator: v3.AggregateOperatorCountDistinct, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "GET", Operator: "="}, + {Key: v3.AttributeKey{Key: "service.name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}, Value: "app", Operator: "="}, + }, + }, + GroupBy: []v3.AttributeKey{{Key: "user", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + Limit: 2, + }, + options: v3.LogQBOptions{GraphLimitQtype: constants.SecondQueryGraphLimit}, + }, + want: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 60 SECOND) AS ts, attributes_string['user'] as `user`, toFloat64(count(distinct(attributes_string['name']))) as value " + + "from signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND " + + "attributes_string['method'] = 'GET' AND mapContains(attributes_string, 'method') AND mapContains(attributes_string, 'user') AND mapContains(attributes_string, 'name') AND " + + "(resource_fingerprint GLOBAL IN (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (seen_at_ts_bucket_start >= 1680064560) AND (seen_at_ts_bucket_start <= 1680066458) AND " + + "simpleJSONExtractString(labels, 'service.name') = 'app' AND labels like '%service.name%app%')) AND (`user`) GLOBAL IN (#LIMIT_PLACEHOLDER) group by `user`,ts order by value DESC", + }, + { + name: "Live Tail Query", + args: args{ + start: 1680066360726, + end: 1680066458000, + queryType: v3.QueryTypeBuilder, + panelType: v3.PanelTypeList, + mq: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorNoOp, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "GET", Operator: "="}, + }, + }, + }, + options: v3.LogQBOptions{IsLivetailQuery: true}, + }, + want: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body, attributes_string, attributes_number, attributes_bool, resources_string " + + "from signoz_logs.distributed_logs_v2 where attributes_string['method'] = 'GET' AND mapContains(attributes_string, 'method') AND ", + }, + { + name: "Live Tail Query with resource attribute", + args: args{ + start: 1680066360726, + end: 1680066458000, + queryType: v3.QueryTypeBuilder, + panelType: v3.PanelTypeList, + mq: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorNoOp, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "GET", Operator: "="}, + {Key: v3.AttributeKey{Key: "service.name", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeResource}, Value: "app", 
Operator: "contains"}, + }, + }, + }, + options: v3.LogQBOptions{IsLivetailQuery: true}, + }, + want: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body, attributes_string, attributes_number, attributes_bool, resources_string from " + + "signoz_logs.distributed_logs_v2 where attributes_string['method'] = 'GET' AND mapContains(attributes_string, 'method') AND " + + "(resource_fingerprint GLOBAL IN (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE simpleJSONExtractString(labels, 'service.name') LIKE '%app%' AND labels like '%service.name%app%' AND ", + }, + { + name: "Live Tail Query W/O filter", + args: args{ + start: 1680066360726, + end: 1680066458000, + queryType: v3.QueryTypeBuilder, + panelType: v3.PanelTypeList, + mq: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorNoOp, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}, + }, + options: v3.LogQBOptions{IsLivetailQuery: true}, + }, + want: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body, attributes_string, attributes_number, attributes_bool, resources_string " + + "from signoz_logs.distributed_logs_v2 where ", + }, + { + name: "Table query with limit", + args: args{ + start: 1680066360726, + end: 1680066458000, + queryType: v3.QueryTypeBuilder, + panelType: v3.PanelTypeTable, + mq: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorCount, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}, + Limit: 10, + }, + }, + want: "SELECT toFloat64(count(*)) as value from signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) order by value DESC LIMIT 10", + }, + { + name: "Test limit less than pageSize - order by ts", + args: args{ + start: 1680066360726, + end: 1680066458000, + queryType: v3.QueryTypeBuilder, + panelType: v3.PanelTypeList, + mq: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorNoOp, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}, + OrderBy: []v3.OrderBy{{ColumnName: constants.TIMESTAMP, Order: "desc", Key: constants.TIMESTAMP, DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeUnspecified, IsColumn: true}}, + Limit: 1, + Offset: 0, + PageSize: 5, + }, + }, + want: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body, attributes_string, attributes_number, attributes_bool, resources_string from " + + "signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) " + + "order by `timestamp` desc LIMIT 1", + }, + { + name: "Test limit greater than pageSize - order by ts", + args: args{ + start: 1680066360726, + end: 1680066458000, + queryType: v3.QueryTypeBuilder, + panelType: v3.PanelTypeList, + mq: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorNoOp, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "id", Type: v3.AttributeKeyTypeUnspecified, DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, Operator: v3.FilterOperatorLessThan, Value: 
"2TNh4vp2TpiWyLt3SzuadLJF2s4"}, + }}, + OrderBy: []v3.OrderBy{{ColumnName: constants.TIMESTAMP, Order: "desc", Key: constants.TIMESTAMP, DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeUnspecified, IsColumn: true}}, + Limit: 100, + Offset: 10, + PageSize: 10, + }, + }, + want: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body, attributes_string, attributes_number, attributes_bool, resources_string from " + + "signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) " + + "AND id < '2TNh4vp2TpiWyLt3SzuadLJF2s4' order by `timestamp` desc LIMIT 10", + }, + { + name: "Test limit less than pageSize - order by custom", + args: args{ + start: 1680066360726, + end: 1680066458000, + queryType: v3.QueryTypeBuilder, + panelType: v3.PanelTypeList, + mq: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorNoOp, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}, + OrderBy: []v3.OrderBy{{ColumnName: "method", Order: "desc", Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + Limit: 1, + Offset: 0, + PageSize: 5, + }, + }, + want: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body, attributes_string, attributes_number, attributes_bool, resources_string from " + + "signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) " + + "order by attributes_string['method'] desc LIMIT 1 OFFSET 0", + }, + { + name: "Test limit greater than pageSize - order by custom", + args: args{ + start: 1680066360726, + end: 1680066458000, + queryType: v3.QueryTypeBuilder, + panelType: v3.PanelTypeList, + mq: &v3.BuilderQuery{ + QueryName: "A", + StepInterval: 60, + AggregateOperator: v3.AggregateOperatorNoOp, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "id", Type: v3.AttributeKeyTypeUnspecified, DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, Operator: v3.FilterOperatorLessThan, Value: "2TNh4vp2TpiWyLt3SzuadLJF2s4"}, + }}, + OrderBy: []v3.OrderBy{{ColumnName: "method", Order: "desc", Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}}, + Limit: 100, + Offset: 50, + PageSize: 50, + }, + }, + want: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body, attributes_string, attributes_number, attributes_bool, resources_string from " + + "signoz_logs.distributed_logs_v2 where (timestamp >= 1680066360726000000 AND timestamp <= 1680066458000000000) AND (ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) AND " + + "id < '2TNh4vp2TpiWyLt3SzuadLJF2s4' order by attributes_string['method'] desc LIMIT 50 OFFSET 50", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := PrepareLogsQuery(tt.args.start, tt.args.end, tt.args.queryType, tt.args.panelType, tt.args.mq, tt.args.options) + if (err != nil) != tt.wantErr { + t.Errorf("PrepareLogsQuery() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("PrepareLogsQuery() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/query-service/app/logs/v4/resource_query_builder.go 
b/pkg/query-service/app/logs/v4/resource_query_builder.go index 004c9269fb..12d6c1a36a 100644 --- a/pkg/query-service/app/logs/v4/resource_query_builder.go +++ b/pkg/query-service/app/logs/v4/resource_query_builder.go @@ -164,7 +164,7 @@ func buildResourceFiltersFromAggregateAttribute(aggregateAttribute v3.AttributeK return "" } -func buildResourceSubQuery(bucketStart, bucketEnd int64, fs *v3.FilterSet, groupBy []v3.AttributeKey, aggregateAttribute v3.AttributeKey) (string, error) { +func buildResourceSubQuery(bucketStart, bucketEnd int64, fs *v3.FilterSet, groupBy []v3.AttributeKey, aggregateAttribute v3.AttributeKey, isLiveTail bool) (string, error) { // BUILD THE WHERE CLAUSE var conditions []string @@ -193,9 +193,14 @@ func buildResourceSubQuery(bucketStart, bucketEnd int64, fs *v3.FilterSet, group conditionStr := strings.Join(conditions, " AND ") // BUILD THE FINAL QUERY - query := fmt.Sprintf("SELECT fingerprint FROM signoz_logs.%s WHERE (seen_at_ts_bucket_start >= %d) AND (seen_at_ts_bucket_start <= %d) AND ", DISTRIBUTED_LOGS_V2_RESOURCE, bucketStart, bucketEnd) - - query = "(" + query + conditionStr + ")" + var query string + if isLiveTail { + query = fmt.Sprintf("SELECT fingerprint FROM signoz_logs.%s WHERE ", DISTRIBUTED_LOGS_V2_RESOURCE) + query = "(" + query + conditionStr + } else { + query = fmt.Sprintf("SELECT fingerprint FROM signoz_logs.%s WHERE (seen_at_ts_bucket_start >= %d) AND (seen_at_ts_bucket_start <= %d) AND ", DISTRIBUTED_LOGS_V2_RESOURCE, bucketStart, bucketEnd) + query = "(" + query + conditionStr + ")" + } return query, nil } diff --git a/pkg/query-service/app/logs/v4/resource_query_builder_test.go b/pkg/query-service/app/logs/v4/resource_query_builder_test.go index 1616c29e08..130fd9e98c 100644 --- a/pkg/query-service/app/logs/v4/resource_query_builder_test.go +++ b/pkg/query-service/app/logs/v4/resource_query_builder_test.go @@ -469,7 +469,7 @@ func Test_buildResourceSubQuery(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := buildResourceSubQuery(tt.args.bucketStart, tt.args.bucketEnd, tt.args.fs, tt.args.groupBy, tt.args.aggregateAttribute) + got, err := buildResourceSubQuery(tt.args.bucketStart, tt.args.bucketEnd, tt.args.fs, tt.args.groupBy, tt.args.aggregateAttribute, false) if (err != nil) != tt.wantErr { t.Errorf("buildResourceSubQuery() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 70eda959dc..71a1e39032 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -316,6 +316,12 @@ const ( "CAST((attributes_float64_key, attributes_float64_value), 'Map(String, Float64)') as attributes_float64," + "CAST((attributes_bool_key, attributes_bool_value), 'Map(String, Bool)') as attributes_bool," + "CAST((resources_string_key, resources_string_value), 'Map(String, String)') as resources_string " + LogsSQLSelectV2 = "SELECT " + + "timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body, " + + "attributes_string, " + + "attributes_number, " + + "attributes_bool, " + + "resources_string " TracesExplorerViewSQLSelectWithSubQuery = "WITH subQuery AS (SELECT distinct on (traceID) traceID, durationNano, " + "serviceName, name FROM %s.%s WHERE parentSpanID = '' AND %s %s ORDER BY durationNano DESC " TracesExplorerViewSQLSelectQuery = "SELECT subQuery.serviceName, subQuery.name, count() AS " + @@ -380,6 +386,12 @@ var StaticFieldsLogsV3 = 
map[string]v3.AttributeKey{ Type: v3.AttributeKeyTypeUnspecified, IsColumn: true, }, + "__attrs": { + Key: "__attrs", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeUnspecified, + IsColumn: true, + }, } const SigNozOrderByValue = "#SIGNOZ_VALUE" diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index 0128536ac2..c21d47229c 100644 --- a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -1290,3 +1290,9 @@ type URLShareableOptions struct { Format string `json:"format"` SelectColumns []AttributeKey `json:"selectColumns"` } + +type LogQBOptions struct { + GraphLimitQtype string + IsLivetailQuery bool + PreferRPM bool +} diff --git a/pkg/query-service/utils/format.go b/pkg/query-service/utils/format.go index c623d3e8e0..e9b7a0b7e3 100644 --- a/pkg/query-service/utils/format.go +++ b/pkg/query-service/utils/format.go @@ -272,6 +272,28 @@ func GetClickhouseColumnName(typeName string, dataType, field string) string { return colName } +func GetClickhouseColumnNameV2(typeName string, dataType, field string) string { + if typeName == string(v3.AttributeKeyTypeTag) { + typeName = constants.Attributes + } + + if typeName != string(v3.AttributeKeyTypeResource) { + typeName = typeName[:len(typeName)-1] + } + + dataType = strings.ToLower(dataType) + + if dataType == "int64" || dataType == "float64" { + dataType = "number" + } + + // if name contains . replace it with `$$` + field = strings.ReplaceAll(field, ".", "$$") + + colName := fmt.Sprintf("%s_%s_%s", strings.ToLower(typeName), dataType, field) + return colName +} + // GetEpochNanoSecs takes epoch and returns it in ns func GetEpochNanoSecs(epoch int64) int64 { temp := epoch From 10ebd0cad6edf27c6846f48a485d52644c7a4d0a Mon Sep 17 00:00:00 2001 From: Nityananda Gohain Date: Thu, 12 Sep 2024 10:58:07 +0530 Subject: [PATCH 06/25] feat: use new schema flag (#5930) --- ee/query-service/app/api/api.go | 3 ++ ee/query-service/app/db/reader.go | 3 +- ee/query-service/app/server.go | 13 +++++++-- ee/query-service/main.go | 3 ++ ee/query-service/rules/manager.go | 1 + .../app/clickhouseReader/reader.go | 7 ++++- pkg/query-service/app/http_handler.go | 28 +++++++++++------- pkg/query-service/app/querier/querier.go | 9 ++++-- pkg/query-service/app/querier/v2/querier.go | 9 ++++-- pkg/query-service/app/server.go | 29 +++++++++++-------- pkg/query-service/main.go | 3 ++ pkg/query-service/rules/manager.go | 12 ++++++++ pkg/query-service/rules/threshold_rule.go | 19 +++++++----- .../rules/threshold_rule_test.go | 20 ++++++------- .../tests/integration/test_utils.go | 1 + 15 files changed, 109 insertions(+), 51 deletions(-) diff --git a/ee/query-service/app/api/api.go b/ee/query-service/app/api/api.go index 66b462e167..bb36fdf479 100644 --- a/ee/query-service/app/api/api.go +++ b/ee/query-service/app/api/api.go @@ -39,6 +39,8 @@ type APIHandlerOptions struct { Gateway *httputil.ReverseProxy // Querier Influx Interval FluxInterval time.Duration + + UseLogsNewSchema bool } type APIHandler struct { @@ -63,6 +65,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) { LogsParsingPipelineController: opts.LogsParsingPipelineController, Cache: opts.Cache, FluxInterval: opts.FluxInterval, + UseLogsNewSchema: opts.UseLogsNewSchema, }) if err != nil { diff --git a/ee/query-service/app/db/reader.go b/ee/query-service/app/db/reader.go index b8326058ec..fcab1cb991 100644 --- a/ee/query-service/app/db/reader.go +++ b/ee/query-service/app/db/reader.go @@ -25,8 +25,9 @@ func NewDataConnector( 
maxOpenConns int, dialTimeout time.Duration, cluster string, + useLogsNewSchema bool, ) *ClickhouseReader { - ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster) + ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema) return &ClickhouseReader{ conn: ch.GetConn(), appdb: localDB, diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index ee019e639a..9845ee670b 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -77,6 +77,7 @@ type ServerOptions struct { FluxInterval string Cluster string GatewayUrl string + UseLogsNewSchema bool } // Server runs HTTP api service @@ -154,6 +155,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { serverOptions.MaxOpenConns, serverOptions.DialTimeout, serverOptions.Cluster, + serverOptions.UseLogsNewSchema, ) go qb.Start(readerReady) reader = qb @@ -176,7 +178,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { localDB, reader, serverOptions.DisableRules, - lm) + lm, + serverOptions.UseLogsNewSchema, + ) if err != nil { return nil, err @@ -265,6 +269,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { Cache: c, FluxInterval: fluxInterval, Gateway: gatewayProxy, + UseLogsNewSchema: serverOptions.UseLogsNewSchema, } apiHandler, err := api.NewAPIHandler(apiOpts) @@ -728,7 +733,8 @@ func makeRulesManager( db *sqlx.DB, ch baseint.Reader, disableRules bool, - fm baseint.FeatureLookup) (*baserules.Manager, error) { + fm baseint.FeatureLookup, + useLogsNewSchema bool) (*baserules.Manager, error) { // create engine pqle, err := pqle.FromConfigPath(promConfigPath) @@ -756,7 +762,8 @@ func makeRulesManager( Reader: ch, EvalDelay: baseconst.GetEvalDelay(), - PrepareTaskFunc: rules.PrepareTaskFunc, + PrepareTaskFunc: rules.PrepareTaskFunc, + UseLogsNewSchema: useLogsNewSchema, } // create Manager diff --git a/ee/query-service/main.go b/ee/query-service/main.go index c5a03f4c0f..75a49500d0 100644 --- a/ee/query-service/main.go +++ b/ee/query-service/main.go @@ -87,6 +87,7 @@ func main() { var ruleRepoURL string var cluster string + var useLogsNewSchema bool var cacheConfigPath, fluxInterval string var enableQueryServiceLogOTLPExport bool var preferSpanMetrics bool @@ -96,6 +97,7 @@ func main() { var dialTimeout time.Duration var gatewayUrl string + flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs") flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)") flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)") flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)") @@ -134,6 +136,7 @@ func main() { FluxInterval: fluxInterval, Cluster: cluster, GatewayUrl: gatewayUrl, + UseLogsNewSchema: useLogsNewSchema, } // Read the jwt secret key diff --git a/ee/query-service/rules/manager.go b/ee/query-service/rules/manager.go index d3bc03f58a..2b80441f0c 100644 --- a/ee/query-service/rules/manager.go +++ b/ee/query-service/rules/manager.go @@ -20,6 +20,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error) opts.Rule, opts.FF, opts.Reader, + opts.UseLogsNewSchema, baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay), ) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 2984fa0fa5..b3ef773da0 100644 --- 
a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -132,6 +132,8 @@ type ClickHouseReader struct { liveTailRefreshSeconds int cluster string + + useLogsNewSchema bool } // NewTraceReader returns a TraceReader for the database @@ -143,6 +145,7 @@ func NewReader( maxOpenConns int, dialTimeout time.Duration, cluster string, + useLogsNewSchema bool, ) *ClickHouseReader { datasource := os.Getenv("ClickHouseUrl") @@ -153,7 +156,7 @@ func NewReader( zap.L().Fatal("failed to initialize ClickHouse", zap.Error(err)) } - return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster) + return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema) } func NewReaderFromClickhouseConnection( @@ -163,6 +166,7 @@ func NewReaderFromClickhouseConnection( configFile string, featureFlag interfaces.FeatureLookup, cluster string, + useLogsNewSchema bool, ) *ClickHouseReader { alertManager, err := am.New("") if err != nil { @@ -219,6 +223,7 @@ func NewReaderFromClickhouseConnection( featureFlags: featureFlag, cluster: cluster, queryProgressTracker: queryprogress.NewQueryProgressTracker(), + useLogsNewSchema: useLogsNewSchema, } } diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 957ea5aaff..d347bf576e 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -105,6 +105,8 @@ type APIHandler struct { // Websocket connection upgrader Upgrader *websocket.Upgrader + + UseLogsNewSchema bool } type APIHandlerOpts struct { @@ -140,6 +142,9 @@ type APIHandlerOpts struct { // Querier Influx Interval FluxInterval time.Duration + + // Use new schema + UseLogsNewSchema bool } // NewAPIHandler returns an APIHandler @@ -151,19 +156,21 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { } querierOpts := querier.QuerierOptions{ - Reader: opts.Reader, - Cache: opts.Cache, - KeyGenerator: queryBuilder.NewKeyGenerator(), - FluxInterval: opts.FluxInterval, - FeatureLookup: opts.FeatureFlags, + Reader: opts.Reader, + Cache: opts.Cache, + KeyGenerator: queryBuilder.NewKeyGenerator(), + FluxInterval: opts.FluxInterval, + FeatureLookup: opts.FeatureFlags, + UseLogsNewSchema: opts.UseLogsNewSchema, } querierOptsV2 := querierV2.QuerierOptions{ - Reader: opts.Reader, - Cache: opts.Cache, - KeyGenerator: queryBuilder.NewKeyGenerator(), - FluxInterval: opts.FluxInterval, - FeatureLookup: opts.FeatureFlags, + Reader: opts.Reader, + Cache: opts.Cache, + KeyGenerator: queryBuilder.NewKeyGenerator(), + FluxInterval: opts.FluxInterval, + FeatureLookup: opts.FeatureFlags, + UseLogsNewSchema: opts.UseLogsNewSchema, } querier := querier.NewQuerier(querierOpts) @@ -185,6 +192,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { LogsParsingPipelineController: opts.LogsParsingPipelineController, querier: querier, querierV2: querierv2, + UseLogsNewSchema: opts.UseLogsNewSchema, } builderOpts := queryBuilder.QueryBuilderOptions{ diff --git a/pkg/query-service/app/querier/querier.go b/pkg/query-service/app/querier/querier.go index 86a77da114..0663afd126 100644 --- a/pkg/query-service/app/querier/querier.go +++ b/pkg/query-service/app/querier/querier.go @@ -54,6 +54,8 @@ type querier struct { timeRanges [][]int returnedSeries []*v3.Series returnedErr error + + UseLogsNewSchema bool } type QuerierOptions struct { @@ -64,9 +66,10 @@ type QuerierOptions struct { FeatureLookup interfaces.FeatureLookup // 
used for testing - TestingMode bool - ReturnedSeries []*v3.Series - ReturnedErr error + TestingMode bool + ReturnedSeries []*v3.Series + ReturnedErr error + UseLogsNewSchema bool } func NewQuerier(opts QuerierOptions) interfaces.Querier { diff --git a/pkg/query-service/app/querier/v2/querier.go b/pkg/query-service/app/querier/v2/querier.go index d0c3a77d13..01cbf6d649 100644 --- a/pkg/query-service/app/querier/v2/querier.go +++ b/pkg/query-service/app/querier/v2/querier.go @@ -54,6 +54,8 @@ type querier struct { timeRanges [][]int returnedSeries []*v3.Series returnedErr error + + UseLogsNewSchema bool } type QuerierOptions struct { @@ -64,9 +66,10 @@ type QuerierOptions struct { FeatureLookup interfaces.FeatureLookup // used for testing - TestingMode bool - ReturnedSeries []*v3.Series - ReturnedErr error + TestingMode bool + ReturnedSeries []*v3.Series + ReturnedErr error + UseLogsNewSchema bool } func NewQuerier(opts QuerierOptions) interfaces.Querier { diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 557b082f42..a1fc0dd329 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -66,6 +66,7 @@ type ServerOptions struct { CacheConfigPath string FluxInterval string Cluster string + UseLogsNewSchema bool } // Server runs HTTP, Mux and a grpc server @@ -128,6 +129,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { serverOptions.MaxOpenConns, serverOptions.DialTimeout, serverOptions.Cluster, + serverOptions.UseLogsNewSchema, ) go clickhouseReader.Start(readerReady) reader = clickhouseReader @@ -144,7 +146,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { } <-readerReady - rm, err := makeRulesManager(serverOptions.PromConfigPath, constants.GetAlertManagerApiPrefix(), serverOptions.RuleRepoURL, localDB, reader, serverOptions.DisableRules, fm) + rm, err := makeRulesManager(serverOptions.PromConfigPath, constants.GetAlertManagerApiPrefix(), serverOptions.RuleRepoURL, localDB, reader, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema) if err != nil { return nil, err } @@ -197,6 +199,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { LogsParsingPipelineController: logParsingPipelineController, Cache: c, FluxInterval: fluxInterval, + UseLogsNewSchema: serverOptions.UseLogsNewSchema, }) if err != nil { return nil, err @@ -713,7 +716,8 @@ func makeRulesManager( db *sqlx.DB, ch interfaces.Reader, disableRules bool, - fm interfaces.FeatureLookup) (*rules.Manager, error) { + fm interfaces.FeatureLookup, + useLogsNewSchema bool) (*rules.Manager, error) { // create engine pqle, err := pqle.FromReader(ch) @@ -730,16 +734,17 @@ func makeRulesManager( // create manager opts managerOpts := &rules.ManagerOptions{ - NotifierOpts: notifierOpts, - PqlEngine: pqle, - RepoURL: ruleRepoURL, - DBConn: db, - Context: context.Background(), - Logger: nil, - DisableRules: disableRules, - FeatureFlags: fm, - Reader: ch, - EvalDelay: constants.GetEvalDelay(), + NotifierOpts: notifierOpts, + PqlEngine: pqle, + RepoURL: ruleRepoURL, + DBConn: db, + Context: context.Background(), + Logger: nil, + DisableRules: disableRules, + FeatureFlags: fm, + Reader: ch, + EvalDelay: constants.GetEvalDelay(), + UseLogsNewSchema: useLogsNewSchema, } // create Manager diff --git a/pkg/query-service/main.go b/pkg/query-service/main.go index 3063e07b12..d1b191f248 100644 --- a/pkg/query-service/main.go +++ b/pkg/query-service/main.go @@ -33,6 +33,7 @@ func main() { // disables rule execution but allows change to 
the rule definition var disableRules bool + var useLogsNewSchema bool // the url used to build link in the alert messages in slack and other systems var ruleRepoURL, cacheConfigPath, fluxInterval string var cluster string @@ -43,6 +44,7 @@ func main() { var maxOpenConns int var dialTimeout time.Duration + flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs") flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)") flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)") flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)") @@ -79,6 +81,7 @@ func main() { CacheConfigPath: cacheConfigPath, FluxInterval: fluxInterval, Cluster: cluster, + UseLogsNewSchema: useLogsNewSchema, } // Read the jwt secret key diff --git a/pkg/query-service/rules/manager.go b/pkg/query-service/rules/manager.go index 120d674a9a..eaabc4f27a 100644 --- a/pkg/query-service/rules/manager.go +++ b/pkg/query-service/rules/manager.go @@ -35,6 +35,8 @@ type PrepareTaskOptions struct { FF interfaces.FeatureLookup ManagerOpts *ManagerOptions NotifyFunc NotifyFunc + + UseLogsNewSchema bool } const taskNamesuffix = "webAppEditor" @@ -75,6 +77,8 @@ type ManagerOptions struct { EvalDelay time.Duration PrepareTaskFunc func(opts PrepareTaskOptions) (Task, error) + + UseLogsNewSchema bool } // The Manager manages recording and alerting rules. @@ -96,6 +100,8 @@ type Manager struct { reader interfaces.Reader prepareTaskFunc func(opts PrepareTaskOptions) (Task, error) + + UseLogsNewSchema bool } func defaultOptions(o *ManagerOptions) *ManagerOptions { @@ -130,6 +136,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) { opts.Rule, opts.FF, opts.Reader, + opts.UseLogsNewSchema, WithEvalDelay(opts.ManagerOpts.EvalDelay), ) @@ -333,6 +340,8 @@ func (m *Manager) editTask(rule *PostableRule, taskName string) error { FF: m.featureFlags, ManagerOpts: m.opts, NotifyFunc: m.prepareNotifyFunc(), + + UseLogsNewSchema: m.opts.UseLogsNewSchema, }) if err != nil { @@ -452,6 +461,8 @@ func (m *Manager) addTask(rule *PostableRule, taskName string) error { FF: m.featureFlags, ManagerOpts: m.opts, NotifyFunc: m.prepareNotifyFunc(), + + UseLogsNewSchema: m.opts.UseLogsNewSchema, }) for _, r := range newTask.Rules() { @@ -794,6 +805,7 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m parsedRule, m.featureFlags, m.reader, + m.opts.UseLogsNewSchema, WithSendAlways(), WithSendUnmatched(), ) diff --git a/pkg/query-service/rules/threshold_rule.go b/pkg/query-service/rules/threshold_rule.go index d35798035e..964774500e 100644 --- a/pkg/query-service/rules/threshold_rule.go +++ b/pkg/query-service/rules/threshold_rule.go @@ -60,6 +60,7 @@ func NewThresholdRule( p *PostableRule, featureFlags interfaces.FeatureLookup, reader interfaces.Reader, + useLogsNewSchema bool, opts ...RuleOption, ) (*ThresholdRule, error) { @@ -77,17 +78,19 @@ func NewThresholdRule( } querierOption := querier.QuerierOptions{ - Reader: reader, - Cache: nil, - KeyGenerator: queryBuilder.NewKeyGenerator(), - FeatureLookup: featureFlags, + Reader: reader, + Cache: nil, + KeyGenerator: queryBuilder.NewKeyGenerator(), + FeatureLookup: featureFlags, + UseLogsNewSchema: useLogsNewSchema, } querierOptsV2 := querierV2.QuerierOptions{ - Reader: reader, - Cache: nil, - KeyGenerator: queryBuilder.NewKeyGenerator(), - FeatureLookup: featureFlags, + Reader: reader, + Cache: nil, + 
KeyGenerator: queryBuilder.NewKeyGenerator(), + FeatureLookup: featureFlags, + UseLogsNewSchema: useLogsNewSchema, } t.querier = querier.NewQuerier(querierOption) diff --git a/pkg/query-service/rules/threshold_rule_test.go b/pkg/query-service/rules/threshold_rule_test.go index 734347793d..ab37ad6af1 100644 --- a/pkg/query-service/rules/threshold_rule_test.go +++ b/pkg/query-service/rules/threshold_rule_test.go @@ -685,7 +685,7 @@ func TestThresholdRuleShouldAlert(t *testing.T) { postableRule.RuleCondition.MatchType = MatchType(c.matchType) postableRule.RuleCondition.Target = &c.target - rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute)) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -774,7 +774,7 @@ func TestPrepareLinksToLogs(t *testing.T) { } fm := featureManager.StartManager() - rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute)) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -816,7 +816,7 @@ func TestPrepareLinksToTraces(t *testing.T) { } fm := featureManager.StartManager() - rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute)) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -892,7 +892,7 @@ func TestThresholdRuleLabelNormalization(t *testing.T) { postableRule.RuleCondition.MatchType = MatchType(c.matchType) postableRule.RuleCondition.Target = &c.target - rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute)) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -945,7 +945,7 @@ func TestThresholdRuleEvalDelay(t *testing.T) { fm := featureManager.StartManager() for idx, c := range cases { - rule, err := NewThresholdRule("69", &postableRule, fm, nil) // no eval delay + rule, err := NewThresholdRule("69", &postableRule, fm, nil, true) // no eval delay if err != nil { assert.NoError(t, err) } @@ -994,7 +994,7 @@ func TestThresholdRuleClickHouseTmpl(t *testing.T) { fm := featureManager.StartManager() for idx, c := range cases { - rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute)) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -1135,9 +1135,9 @@ func TestThresholdRuleUnitCombinations(t *testing.T) { } options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") - reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "") + reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true) - rule, err := NewThresholdRule("69", &postableRule, fm, reader) + rule, err := NewThresholdRule("69", &postableRule, fm, reader, true) rule.temporalityMap = map[string]map[v3.Temporality]bool{ "signoz_calls_total": { v3.Delta: true, @@ -1234,9 +1234,9 @@ func TestThresholdRuleNoData(t *testing.T) { } options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") - reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "") + reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true) - rule, err := 
NewThresholdRule("69", &postableRule, fm, reader)
+	rule, err := NewThresholdRule("69", &postableRule, fm, reader, true)
 	rule.temporalityMap = map[string]map[v3.Temporality]bool{
 		"signoz_calls_total": {
 			v3.Delta: true,
diff --git a/pkg/query-service/tests/integration/test_utils.go b/pkg/query-service/tests/integration/test_utils.go
index 65140e5fc8..d060433dba 100644
--- a/pkg/query-service/tests/integration/test_utils.go
+++ b/pkg/query-service/tests/integration/test_utils.go
@@ -45,6 +45,7 @@ func NewMockClickhouseReader(
 		"",
 		featureFlags,
 		"",
+		true,
 	)
 
 	return reader, mockDB

From 381a4de88a7d4b33e63e6586770a7fdf5293a85e Mon Sep 17 00:00:00 2001
From: Srikanth Chekuri
Date: Thu, 12 Sep 2024 12:48:50 +0530
Subject: [PATCH 07/25] chore: use json formatting for ClickHouse logs (#5241)

Co-authored-by: Prashant Shahi
---
 deploy/docker-swarm/clickhouse-setup/clickhouse-config.xml | 3 +++
 deploy/docker/clickhouse-setup/clickhouse-config.xml       | 3 +++
 2 files changed, 6 insertions(+)

diff --git a/deploy/docker-swarm/clickhouse-setup/clickhouse-config.xml b/deploy/docker-swarm/clickhouse-setup/clickhouse-config.xml
index 4e8dc00b30..f285997166 100644
--- a/deploy/docker-swarm/clickhouse-setup/clickhouse-config.xml
+++ b/deploy/docker-swarm/clickhouse-setup/clickhouse-config.xml
@@ -23,6 +23,9 @@
       [1]: https://github.com/pocoproject/poco/blob/poco-1.9.4-release/Foundation/include/Poco/Logger.h#L105-L114
     -->
     <level>information</level>
+    <formatting>
+        <type>json</type>
+    </formatting>
     <log>/var/log/clickhouse-server/clickhouse-server.log</log>
     <errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
diff --git a/deploy/docker/clickhouse-setup/clickhouse-config.xml b/deploy/docker/clickhouse-setup/clickhouse-config.xml
--- a/deploy/docker/clickhouse-setup/clickhouse-config.xml
+++ b/deploy/docker/clickhouse-setup/clickhouse-config.xml
@@ -23,6 +23,9 @@
     <level>information</level>
+    <formatting>
+        <type>json</type>
+    </formatting>
     <log>/var/log/clickhouse-server/clickhouse-server.log</log>
     <errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
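
A quick end-to-end illustration may help tie the logs v4 builder (patch 05) and the new schema flag (patch 06) together. The sketch below is not part of the patch series: the logsV4 import alias is an assumption based on the pkg/query-service/app/logs/v4 path above, and it only exercises a code path that the fixtures in query_builder_test.go already cover (a table panel, count aggregation, epoch-millisecond bounds). In a running deployment this path is reached by starting query-service with the new --use-logs-new-schema flag.

package main

import (
	"fmt"

	logsV4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4"
	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)

func main() {
	mq := &v3.BuilderQuery{
		QueryName:         "A",
		StepInterval:      60,
		AggregateOperator: v3.AggregateOperatorCount,
		Expression:        "A",
		Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
			{
				Key:      v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag},
				Operator: v3.FilterOperatorEqual,
				Value:    "GET",
			},
		}},
		Limit: 10,
	}

	// Start/end are epoch milliseconds, as in TestPrepareLogsQuery above.
	query, err := logsV4.PrepareLogsQuery(
		1680066360726, 1680066458000,
		v3.QueryTypeBuilder, v3.PanelTypeTable,
		mq, v3.LogQBOptions{},
	)
	if err != nil {
		panic(err)
	}
	// Prints a SELECT over signoz_logs.distributed_logs_v2 with the timestamp,
	// ts_bucket_start and attributes_string filters seen in the test fixtures.
	fmt.Println(query)
}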
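Similarly, the GetClickhouseColumnNameV2 helper added to utils/format.go computes the materialized column name a filter resolves to. A minimal sketch of its behaviour, assuming constants.Attributes is the string "attributes" (which the trimming step in the diff implies):

package main

import (
	"fmt"

	"go.signoz.io/signoz/pkg/query-service/utils"
)

func main() {
	// "tag" resolves to "attributes" and is trimmed to the singular "attribute";
	// int64/float64 collapse to "number"; dots in the field name become "$$".
	fmt.Println(utils.GetClickhouseColumnNameV2("tag", "String", "service.name"))
	// attribute_string_service$$name

	// "resource" matches v3.AttributeKeyTypeResource, so it is kept as-is.
	fmt.Println(utils.GetClickhouseColumnNameV2("resource", "Float64", "k8s.pod.ip"))
	// resource_number_k8s$$pod$$ip
}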