diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 1d8d4e7b70..ba92f9f28f 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -8,6 +8,13 @@ on: - release/v* jobs: + check-no-ee-references: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Run check + run: make check-no-ee-references + build-frontend: runs-on: ubuntu-latest steps: diff --git a/.scripts/commentLinesForSetup.sh b/.scripts/commentLinesForSetup.sh deleted file mode 100644 index c0dfd40e9f..0000000000 --- a/.scripts/commentLinesForSetup.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/sh - -# It Comments out the Line Query-Service & Frontend Section of deploy/docker/clickhouse-setup/docker-compose.yaml -# Update the Line Numbers when deploy/docker/clickhouse-setup/docker-compose.yaml chnages. -# Docs Ref.: https://github.com/SigNoz/signoz/blob/main/CONTRIBUTING.md#contribute-to-frontend-with-docker-installation-of-signoz - -sed -i 38,62's/.*/# &/' .././deploy/docker/clickhouse-setup/docker-compose.yaml diff --git a/Makefile b/Makefile index 5f4a3c1ac2..c110ebdaf2 100644 --- a/Makefile +++ b/Makefile @@ -178,6 +178,15 @@ clear-swarm-ch: @docker run --rm -v "$(PWD)/$(SWARM_DIRECTORY)/data:/pwd" busybox \ sh -c "cd /pwd && rm -rf clickhouse*/* zookeeper-*/*" +check-no-ee-references: + @echo "Checking for 'ee' package references in 'pkg' directory..." + @if grep -R --include="*.go" '.*/ee/.*' pkg/; then \ + echo "Error: Found references to 'ee' packages in 'pkg' directory"; \ + exit 1; \ + else \ + echo "No references to 'ee' packages found in 'pkg' directory"; \ + fi + test: go test ./pkg/query-service/app/metrics/... go test ./pkg/query-service/cache/... diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 7fb9317946..ee019e639a 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -746,10 +746,7 @@ func makeRulesManager( // create manager opts managerOpts := &baserules.ManagerOptions{ NotifierOpts: notifierOpts, - Queriers: &baserules.Queriers{ - PqlEngine: pqle, - Ch: ch.GetConn(), - }, + PqlEngine: pqle, RepoURL: ruleRepoURL, DBConn: db, Context: context.Background(), diff --git a/ee/query-service/constants/constants.go b/ee/query-service/constants/constants.go index 51d22e5b63..c1baa6320b 100644 --- a/ee/query-service/constants/constants.go +++ b/ee/query-service/constants/constants.go @@ -11,8 +11,6 @@ const ( var LicenseSignozIo = "https://license.signoz.io/api/v1" var LicenseAPIKey = GetOrDefaultEnv("SIGNOZ_LICENSE_API_KEY", "") var SaasSegmentKey = GetOrDefaultEnv("SIGNOZ_SAAS_SEGMENT_KEY", "") -var SpanRenderLimitStr = GetOrDefaultEnv("SPAN_RENDER_LIMIT", "2500") -var MaxSpansInTraceStr = GetOrDefaultEnv("MAX_SPANS_IN_TRACE", "250000") var FetchFeatures = GetOrDefaultEnv("FETCH_FEATURES", "false") var ZeusFeaturesURL = GetOrDefaultEnv("ZEUS_FEATURES_URL", "ZeusFeaturesURL") diff --git a/ee/query-service/rules/manager.go b/ee/query-service/rules/manager.go index 831fb52793..d3bc03f58a 100644 --- a/ee/query-service/rules/manager.go +++ b/ee/query-service/rules/manager.go @@ -18,11 +18,9 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error) tr, err := baserules.NewThresholdRule( ruleId, opts.Rule, - baserules.ThresholdRuleOpts{ - EvalDelay: opts.ManagerOpts.EvalDelay, - }, opts.FF, opts.Reader, + baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay), ) if err != nil { @@ -41,8 +39,8 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error) ruleId, opts.Rule, opts.Logger, - baserules.PromRuleOpts{}, opts.Reader, + opts.ManagerOpts.PqlEngine, ) if err != nil { diff --git a/frontend/src/components/QuickFilters/FilterRenderers/Checkbox/Checkbox.tsx b/frontend/src/components/QuickFilters/FilterRenderers/Checkbox/Checkbox.tsx index fc9a71a7b1..dcf3cc8f3e 100644 --- a/frontend/src/components/QuickFilters/FilterRenderers/Checkbox/Checkbox.tsx +++ b/frontend/src/components/QuickFilters/FilterRenderers/Checkbox/Checkbox.tsx @@ -82,7 +82,9 @@ export default function CheckboxFilter(props: ICheckboxProps): JSX.Element { ); const filterSync = currentQuery?.builder.queryData?.[ lastUsedQuery || 0 - ]?.filters?.items.find((item) => isEqual(item.key, filter.attributeKey)); + ]?.filters?.items.find((item) => + isEqual(item.key?.key, filter.attributeKey.key), + ); if (filterSync) { if (SELECTED_OPERATORS.includes(filterSync.op)) { @@ -127,8 +129,9 @@ export default function CheckboxFilter(props: ICheckboxProps): JSX.Element { () => (currentQuery?.builder?.queryData?.[ lastUsedQuery || 0 - ]?.filters?.items?.filter((item) => isEqual(item.key, filter.attributeKey)) - ?.length || 0) > 1, + ]?.filters?.items?.filter((item) => + isEqual(item.key?.key, filter.attributeKey.key), + )?.length || 0) > 1, [currentQuery?.builder?.queryData, lastUsedQuery, filter.attributeKey], ); @@ -149,7 +152,7 @@ export default function CheckboxFilter(props: ICheckboxProps): JSX.Element { items: idx === lastUsedQuery ? item.filters.items.filter( - (fil) => !isEqual(fil.key, filter.attributeKey), + (fil) => !isEqual(fil.key?.key, filter.attributeKey.key), ) : [...item.filters.items], }, @@ -161,7 +164,9 @@ export default function CheckboxFilter(props: ICheckboxProps): JSX.Element { const isSomeFilterPresentForCurrentAttribute = currentQuery.builder.queryData?.[ lastUsedQuery || 0 - ]?.filters?.items?.some((item) => isEqual(item.key, filter.attributeKey)); + ]?.filters?.items?.some((item) => + isEqual(item.key?.key, filter.attributeKey.key), + ); const onChange = ( value: string, @@ -180,7 +185,7 @@ export default function CheckboxFilter(props: ICheckboxProps): JSX.Element { : 'Only' : 'Only'; query.filters.items = query.filters.items.filter( - (q) => !isEqual(q.key, filter.attributeKey), + (q) => !isEqual(q.key?.key, filter.attributeKey.key), ); if (isOnlyOrAll === 'Only') { const newFilterItem: TagFilterItem = { @@ -193,12 +198,14 @@ export default function CheckboxFilter(props: ICheckboxProps): JSX.Element { } } else if (query?.filters?.items) { if ( - query.filters?.items?.some((item) => isEqual(item.key, filter.attributeKey)) + query.filters?.items?.some((item) => + isEqual(item.key?.key, filter.attributeKey.key), + ) ) { // if there is already a running filter for the current attribute key then // we split the cases by which particular operator is present right now! const currentFilter = query.filters?.items?.find((q) => - isEqual(q.key, filter.attributeKey), + isEqual(q.key?.key, filter.attributeKey.key), ); if (currentFilter) { const runningOperator = currentFilter?.op; @@ -213,7 +220,7 @@ export default function CheckboxFilter(props: ICheckboxProps): JSX.Element { value: [...currentFilter.value, value], }; query.filters.items = query.filters.items.map((item) => { - if (isEqual(item.key, filter.attributeKey)) { + if (isEqual(item.key?.key, filter.attributeKey.key)) { return newFilter; } return item; @@ -225,7 +232,7 @@ export default function CheckboxFilter(props: ICheckboxProps): JSX.Element { value: [currentFilter.value as string, value], }; query.filters.items = query.filters.items.map((item) => { - if (isEqual(item.key, filter.attributeKey)) { + if (isEqual(item.key?.key, filter.attributeKey.key)) { return newFilter; } return item; @@ -242,11 +249,11 @@ export default function CheckboxFilter(props: ICheckboxProps): JSX.Element { if (newFilter.value.length === 0) { query.filters.items = query.filters.items.filter( - (item) => !isEqual(item.key, filter.attributeKey), + (item) => !isEqual(item.key?.key, filter.attributeKey.key), ); } else { query.filters.items = query.filters.items.map((item) => { - if (isEqual(item.key, filter.attributeKey)) { + if (isEqual(item.key?.key, filter.attributeKey.key)) { return newFilter; } return item; @@ -255,7 +262,7 @@ export default function CheckboxFilter(props: ICheckboxProps): JSX.Element { } else { // if not an array remove the whole thing altogether! query.filters.items = query.filters.items.filter( - (item) => !isEqual(item.key, filter.attributeKey), + (item) => !isEqual(item.key?.key, filter.attributeKey.key), ); } } @@ -271,7 +278,7 @@ export default function CheckboxFilter(props: ICheckboxProps): JSX.Element { value: [...currentFilter.value, value], }; query.filters.items = query.filters.items.map((item) => { - if (isEqual(item.key, filter.attributeKey)) { + if (isEqual(item.key?.key, filter.attributeKey.key)) { return newFilter; } return item; @@ -283,7 +290,7 @@ export default function CheckboxFilter(props: ICheckboxProps): JSX.Element { value: [currentFilter.value as string, value], }; query.filters.items = query.filters.items.map((item) => { - if (isEqual(item.key, filter.attributeKey)) { + if (isEqual(item.key?.key, filter.attributeKey.key)) { return newFilter; } return item; @@ -299,11 +306,11 @@ export default function CheckboxFilter(props: ICheckboxProps): JSX.Element { if (newFilter.value.length === 0) { query.filters.items = query.filters.items.filter( - (item) => !isEqual(item.key, filter.attributeKey), + (item) => !isEqual(item.key?.key, filter.attributeKey.key), ); } else { query.filters.items = query.filters.items.map((item) => { - if (isEqual(item.key, filter.attributeKey)) { + if (isEqual(item.key?.key, filter.attributeKey.key)) { return newFilter; } return item; @@ -311,7 +318,7 @@ export default function CheckboxFilter(props: ICheckboxProps): JSX.Element { } } else { query.filters.items = query.filters.items.filter( - (item) => !isEqual(item.key, filter.attributeKey), + (item) => !isEqual(item.key?.key, filter.attributeKey.key), ); } } @@ -324,14 +331,14 @@ export default function CheckboxFilter(props: ICheckboxProps): JSX.Element { value: [currentFilter.value as string, value], }; query.filters.items = query.filters.items.map((item) => { - if (isEqual(item.key, filter.attributeKey)) { + if (isEqual(item.key?.key, filter.attributeKey.key)) { return newFilter; } return item; }); } else if (!checked) { query.filters.items = query.filters.items.filter( - (item) => !isEqual(item.key, filter.attributeKey), + (item) => !isEqual(item.key?.key, filter.attributeKey.key), ); } break; @@ -343,14 +350,14 @@ export default function CheckboxFilter(props: ICheckboxProps): JSX.Element { value: [currentFilter.value as string, value], }; query.filters.items = query.filters.items.map((item) => { - if (isEqual(item.key, filter.attributeKey)) { + if (isEqual(item.key?.key, filter.attributeKey.key)) { return newFilter; } return item; }); } else if (checked) { query.filters.items = query.filters.items.filter( - (item) => !isEqual(item.key, filter.attributeKey), + (item) => !isEqual(item.key?.key, filter.attributeKey.key), ); } break; diff --git a/frontend/src/constants/localStorage.ts b/frontend/src/constants/localStorage.ts index bab93a7ff1..4e6859a2dd 100644 --- a/frontend/src/constants/localStorage.ts +++ b/frontend/src/constants/localStorage.ts @@ -19,5 +19,6 @@ export enum LOCALSTORAGE { SHOW_EXPLORER_TOOLBAR = 'SHOW_EXPLORER_TOOLBAR', PINNED_ATTRIBUTES = 'PINNED_ATTRIBUTES', THEME_ANALYTICS_V1 = 'THEME_ANALYTICS_V1', + LAST_USED_SAVED_VIEWS = 'LAST_USED_SAVED_VIEWS', SHOW_LOGS_QUICK_FILTERS = 'SHOW_LOGS_QUICK_FILTERS', } diff --git a/frontend/src/container/ExplorerOptions/ExplorerOptions.tsx b/frontend/src/container/ExplorerOptions/ExplorerOptions.tsx index 44378e602b..5be22deb2e 100644 --- a/frontend/src/container/ExplorerOptions/ExplorerOptions.tsx +++ b/frontend/src/container/ExplorerOptions/ExplorerOptions.tsx @@ -19,6 +19,7 @@ import axios from 'axios'; import cx from 'classnames'; import { getViewDetailsUsingViewKey } from 'components/ExplorerCard/utils'; import { SOMETHING_WENT_WRONG } from 'constants/api'; +import { LOCALSTORAGE } from 'constants/localStorage'; import { QueryParams } from 'constants/query'; import { PANEL_TYPES } from 'constants/queryBuilder'; import ROUTES from 'constants/routes'; @@ -48,6 +49,7 @@ import { Dispatch, SetStateAction, useCallback, + useEffect, useMemo, useRef, useState, @@ -61,7 +63,9 @@ import { DataSource, StringOperators } from 'types/common/queryBuilder'; import AppReducer from 'types/reducer/app'; import { USER_ROLES } from 'types/roles'; +import { PreservedViewsTypes } from './constants'; import ExplorerOptionsHideArea from './ExplorerOptionsHideArea'; +import { PreservedViewsInLocalStorage } from './types'; import { DATASOURCE_VS_ROUTES, generateRGBAFromHex, @@ -90,6 +94,12 @@ function ExplorerOptions({ const history = useHistory(); const ref = useRef(null); const isDarkMode = useIsDarkMode(); + const isLogsExplorer = sourcepage === DataSource.LOGS; + + const PRESERVED_VIEW_LOCAL_STORAGE_KEY = LOCALSTORAGE.LAST_USED_SAVED_VIEWS; + const PRESERVED_VIEW_TYPE = isLogsExplorer + ? PreservedViewsTypes.LOGS + : PreservedViewsTypes.TRACES; const onModalToggle = useCallback((value: boolean) => { setIsExport(value); @@ -107,7 +117,7 @@ function ExplorerOptions({ logEvent('Traces Explorer: Save view clicked', { panelType, }); - } else if (sourcepage === DataSource.LOGS) { + } else if (isLogsExplorer) { logEvent('Logs Explorer: Save view clicked', { panelType, }); @@ -141,7 +151,7 @@ function ExplorerOptions({ logEvent('Traces Explorer: Create alert', { panelType, }); - } else if (sourcepage === DataSource.LOGS) { + } else if (isLogsExplorer) { logEvent('Logs Explorer: Create alert', { panelType, }); @@ -166,7 +176,7 @@ function ExplorerOptions({ logEvent('Traces Explorer: Add to dashboard clicked', { panelType, }); - } else if (sourcepage === DataSource.LOGS) { + } else if (isLogsExplorer) { logEvent('Logs Explorer: Add to dashboard clicked', { panelType, }); @@ -265,6 +275,31 @@ function ExplorerOptions({ [viewsData, handleExplorerTabChange], ); + const updatePreservedViewInLocalStorage = (option: { + key: string; + value: string; + }): void => { + // Retrieve stored views from local storage + const storedViews = localStorage.getItem(PRESERVED_VIEW_LOCAL_STORAGE_KEY); + + // Initialize or parse the stored views + const updatedViews: PreservedViewsInLocalStorage = storedViews + ? JSON.parse(storedViews) + : {}; + + // Update the views with the new selection + updatedViews[PRESERVED_VIEW_TYPE] = { + key: option.key, + value: option.value, + }; + + // Save the updated views back to local storage + localStorage.setItem( + PRESERVED_VIEW_LOCAL_STORAGE_KEY, + JSON.stringify(updatedViews), + ); + }; + const handleSelect = ( value: string, option: { key: string; value: string }, @@ -277,18 +312,42 @@ function ExplorerOptions({ panelType, viewName: option?.value, }); - } else if (sourcepage === DataSource.LOGS) { + } else if (isLogsExplorer) { logEvent('Logs Explorer: Select view', { panelType, viewName: option?.value, }); } + + updatePreservedViewInLocalStorage(option); + if (ref.current) { ref.current.blur(); } }; + const removeCurrentViewFromLocalStorage = (): void => { + // Retrieve stored views from local storage + const storedViews = localStorage.getItem(PRESERVED_VIEW_LOCAL_STORAGE_KEY); + + if (storedViews) { + // Parse the stored views + const parsedViews = JSON.parse(storedViews); + + // Remove the current view type from the parsed views + delete parsedViews[PRESERVED_VIEW_TYPE]; + + // Update local storage with the modified views + localStorage.setItem( + PRESERVED_VIEW_LOCAL_STORAGE_KEY, + JSON.stringify(parsedViews), + ); + } + }; + const handleClearSelect = (): void => { + removeCurrentViewFromLocalStorage(); + history.replace(DATASOURCE_VS_ROUTES[sourcepage]); }; @@ -323,7 +382,7 @@ function ExplorerOptions({ panelType, viewName: newViewName, }); - } else if (sourcepage === DataSource.LOGS) { + } else if (isLogsExplorer) { logEvent('Logs Explorer: Save view successful', { panelType, viewName: newViewName, @@ -358,6 +417,44 @@ function ExplorerOptions({ const isEditDeleteSupported = allowedRoles.includes(role as string); + const [ + isRecentlyUsedSavedViewSelected, + setIsRecentlyUsedSavedViewSelected, + ] = useState(false); + + useEffect(() => { + const parsedPreservedView = JSON.parse( + localStorage.getItem(PRESERVED_VIEW_LOCAL_STORAGE_KEY) || '{}', + ); + + const preservedView = parsedPreservedView[PRESERVED_VIEW_TYPE] || {}; + + let timeoutId: string | number | NodeJS.Timeout | undefined; + + if ( + !!preservedView?.key && + viewsData?.data?.data && + !(!!viewName || !!viewKey) && + !isRecentlyUsedSavedViewSelected + ) { + // prevent the race condition with useShareBuilderUrl + timeoutId = setTimeout(() => { + onMenuItemSelectHandler({ key: preservedView.key }); + }, 0); + setIsRecentlyUsedSavedViewSelected(false); + } + + return (): void => clearTimeout(timeoutId); + }, [ + PRESERVED_VIEW_LOCAL_STORAGE_KEY, + PRESERVED_VIEW_TYPE, + isRecentlyUsedSavedViewSelected, + onMenuItemSelectHandler, + viewKey, + viewName, + viewsData?.data?.data, + ]); + return (
{isQueryUpdated && !isExplorerOptionHidden && ( @@ -476,12 +573,12 @@ function ExplorerOptions({ - {sourcepage === DataSource.LOGS + {isLogsExplorer ? 'Learn more about Logs explorer ' : 'Learn more about Traces explorer '} >; } + +export type PreservedViewType = + | PreservedViewsTypes.LOGS + | PreservedViewsTypes.TRACES; + +export type PreservedViewsInLocalStorage = Partial< + Record +>; diff --git a/frontend/src/container/GridPanelSwitch/index.tsx b/frontend/src/container/GridPanelSwitch/index.tsx index 641f06e885..24fbde3c85 100644 --- a/frontend/src/container/GridPanelSwitch/index.tsx +++ b/frontend/src/container/GridPanelSwitch/index.tsx @@ -40,6 +40,7 @@ const GridPanelSwitch = forwardRef< data: panelData, query, thresholds, + sticky: true, }, [PANEL_TYPES.LIST]: null, [PANEL_TYPES.PIE]: null, diff --git a/frontend/src/container/GridTableComponent/index.tsx b/frontend/src/container/GridTableComponent/index.tsx index fab4d85e8b..676a745b65 100644 --- a/frontend/src/container/GridTableComponent/index.tsx +++ b/frontend/src/container/GridTableComponent/index.tsx @@ -23,6 +23,7 @@ function GridTableComponent({ thresholds, columnUnits, tableProcessedDataRef, + sticky, ...props }: GridTableComponentProps): JSX.Element { const { t } = useTranslation(['valueGraph']); @@ -146,6 +147,7 @@ function GridTableComponent({ loading={false} columns={newColumnData} dataSource={dataSource} + sticky={sticky} // eslint-disable-next-line react/jsx-props-no-spreading {...props} /> diff --git a/frontend/src/container/GridTableComponent/types.ts b/frontend/src/container/GridTableComponent/types.ts index 25ca647933..6088f9dcb8 100644 --- a/frontend/src/container/GridTableComponent/types.ts +++ b/frontend/src/container/GridTableComponent/types.ts @@ -13,6 +13,7 @@ export type GridTableComponentProps = { thresholds?: ThresholdProps[]; columnUnits?: ColumnUnit; tableProcessedDataRef?: React.MutableRefObject; + sticky?: TableProps['sticky']; } & Pick & Omit, 'columns' | 'dataSource'>; diff --git a/frontend/src/container/LogsExplorerTable/index.tsx b/frontend/src/container/LogsExplorerTable/index.tsx index 13883d3a62..e65bd61464 100644 --- a/frontend/src/container/LogsExplorerTable/index.tsx +++ b/frontend/src/container/LogsExplorerTable/index.tsx @@ -30,6 +30,7 @@ function LogsExplorerTable({ queryTableData={data} loading={isLoading} rootClassName="logs-table" + sticky /> ); } diff --git a/frontend/src/container/MetricsApplication/Tabs/Overview/TopOperationMetrics.tsx b/frontend/src/container/MetricsApplication/Tabs/Overview/TopOperationMetrics.tsx index 22224862a4..ed77512d89 100644 --- a/frontend/src/container/MetricsApplication/Tabs/Overview/TopOperationMetrics.tsx +++ b/frontend/src/container/MetricsApplication/Tabs/Overview/TopOperationMetrics.tsx @@ -114,6 +114,7 @@ function TopOperationMetrics(): JSX.Element { loading={isLoading} renderColumnCell={renderColumnCell} downloadOption={topOperationMetricsDownloadOptions} + sticky /> ); } diff --git a/frontend/src/container/MetricsApplication/Tabs/util.ts b/frontend/src/container/MetricsApplication/Tabs/util.ts index 6832fe9d02..3e4dbeceb4 100644 --- a/frontend/src/container/MetricsApplication/Tabs/util.ts +++ b/frontend/src/container/MetricsApplication/Tabs/util.ts @@ -4,6 +4,8 @@ import ROUTES from 'constants/routes'; import { routeConfig } from 'container/SideNav/config'; import { getQueryString } from 'container/SideNav/helper'; import { useQueryBuilder } from 'hooks/queryBuilder/useQueryBuilder'; +import useResourceAttribute from 'hooks/useResourceAttribute'; +import { resourceAttributesToTracesFilterItems } from 'hooks/useResourceAttribute/utils'; import history from 'lib/history'; import { traceFilterKeys } from 'pages/TracesExplorer/Filter/filterUtils'; import { Dispatch, SetStateAction, useMemo } from 'react'; @@ -142,7 +144,12 @@ export function useGetAPMToTracesQueries({ filters?: TagFilterItem[]; }): Query { const { updateAllQueriesOperators } = useQueryBuilder(); + const { queries } = useResourceAttribute(); + const resourceAttributesFilters = useMemo( + () => resourceAttributesToTracesFilterItems(queries), + [queries], + ); const finalFilters: TagFilterItem[] = []; let spanKindFilter: TagFilterItem; let dbCallFilter: TagFilterItem; @@ -185,6 +192,10 @@ export function useGetAPMToTracesQueries({ finalFilters.push(...filters); } + if (resourceAttributesFilters?.length) { + finalFilters.push(...resourceAttributesFilters); + } + return useMemo(() => { const updatedQuery = updateAllQueriesOperators( initialQueriesMap.traces, @@ -199,5 +210,5 @@ export function useGetAPMToTracesQueries({ finalFilters, ); // eslint-disable-next-line react-hooks/exhaustive-deps - }, [servicename, updateAllQueriesOperators]); + }, [servicename, queries, updateAllQueriesOperators]); } diff --git a/frontend/src/container/MetricsApplication/TopOperationsTable.tsx b/frontend/src/container/MetricsApplication/TopOperationsTable.tsx index d897c8a205..da90045b6e 100644 --- a/frontend/src/container/MetricsApplication/TopOperationsTable.tsx +++ b/frontend/src/container/MetricsApplication/TopOperationsTable.tsx @@ -50,19 +50,21 @@ function TopOperationsTable({ const { servicename: encodedServiceName } = params; const servicename = decodeURIComponent(encodedServiceName); - const opFilter: TagFilterItem = { - id: uuid().slice(0, 8), - key: { - key: 'name', - dataType: DataTypes.String, - type: 'tag', - isColumn: true, - isJSON: false, - id: 'name--string--tag--true', + const opFilters: TagFilterItem[] = [ + { + id: uuid().slice(0, 8), + key: { + key: 'name', + dataType: DataTypes.String, + type: 'tag', + isColumn: true, + isJSON: false, + id: 'name--string--tag--true', + }, + op: 'in', + value: [operation], }, - op: 'in', - value: [operation], - }; + ]; const preparedQuery: Query = { ...apmToTraceQuery, @@ -72,7 +74,7 @@ function TopOperationsTable({ ...item, filters: { ...item.filters, - items: [...item.filters.items, opFilter], + items: [...item.filters.items, ...opFilters], }, })), }, diff --git a/frontend/src/container/PanelWrapper/TablePanelWrapper.tsx b/frontend/src/container/PanelWrapper/TablePanelWrapper.tsx index db2098554a..0eab4143a2 100644 --- a/frontend/src/container/PanelWrapper/TablePanelWrapper.tsx +++ b/frontend/src/container/PanelWrapper/TablePanelWrapper.tsx @@ -1,3 +1,4 @@ +import { PANEL_TYPES } from 'constants/queryBuilder'; import GridTableComponent from 'container/GridTableComponent'; import { GRID_TABLE_CONFIG } from 'container/GridTableComponent/config'; @@ -18,6 +19,7 @@ function TablePanelWrapper({ thresholds={thresholds} columnUnits={widget.columnUnits} tableProcessedDataRef={tableProcessedDataRef} + sticky={widget.panelTypes === PANEL_TYPES.TABLE} // eslint-disable-next-line react/jsx-props-no-spreading {...GRID_TABLE_CONFIG} /> diff --git a/frontend/src/container/PanelWrapper/__tests__/__snapshots__/TablePanelWrapper.test.tsx.snap b/frontend/src/container/PanelWrapper/__tests__/__snapshots__/TablePanelWrapper.test.tsx.snap index d37ccf5841..1a930f740c 100644 --- a/frontend/src/container/PanelWrapper/__tests__/__snapshots__/TablePanelWrapper.test.tsx.snap +++ b/frontend/src/container/PanelWrapper/__tests__/__snapshots__/TablePanelWrapper.test.tsx.snap @@ -70,20 +70,13 @@ exports[`Table panel wrappper tests table should render fine with the query resp class="ant-table-container" >
- - - - + @@ -222,6 +215,23 @@ exports[`Table panel wrappper tests table should render fine with the query resp +
+
+
+ + + + + diff --git a/frontend/src/container/QueryTable/QueryTable.intefaces.ts b/frontend/src/container/QueryTable/QueryTable.intefaces.ts index f73c407d85..7576d796ec 100644 --- a/frontend/src/container/QueryTable/QueryTable.intefaces.ts +++ b/frontend/src/container/QueryTable/QueryTable.intefaces.ts @@ -18,4 +18,5 @@ export type QueryTableProps = Omit< downloadOption?: DownloadOptions; columns?: ColumnsType; dataSource?: RowData[]; + sticky?: TableProps['sticky']; }; diff --git a/frontend/src/container/QueryTable/QueryTable.styles.scss b/frontend/src/container/QueryTable/QueryTable.styles.scss index e78a4df239..e116356f9d 100644 --- a/frontend/src/container/QueryTable/QueryTable.styles.scss +++ b/frontend/src/container/QueryTable/QueryTable.styles.scss @@ -1,10 +1,16 @@ .query-table { - position: relative; - height: inherit; - .query-table--download { - position: absolute; - top: 15px; - right: 0px; - z-index: 1; - } -} \ No newline at end of file + position: relative; + height: inherit; + .query-table--download { + position: absolute; + top: 15px; + right: 0px; + z-index: 1; + } + + .ant-table { + &::-webkit-scrollbar { + width: 0.1rem; + } + } +} diff --git a/frontend/src/container/QueryTable/QueryTable.tsx b/frontend/src/container/QueryTable/QueryTable.tsx index ccf8221bfe..1786e5d4e3 100644 --- a/frontend/src/container/QueryTable/QueryTable.tsx +++ b/frontend/src/container/QueryTable/QueryTable.tsx @@ -19,6 +19,7 @@ export function QueryTable({ downloadOption, columns, dataSource, + sticky, ...props }: QueryTableProps): JSX.Element { const { isDownloadEnabled = false, fileName = '' } = downloadOption || {}; @@ -71,6 +72,7 @@ export function QueryTable({ dataSource={newDataSource} scroll={{ x: true }} pagination={paginationConfig} + sticky={sticky} // eslint-disable-next-line react/jsx-props-no-spreading {...props} /> diff --git a/frontend/src/container/TimeSeriesView/TimeSeriesView.tsx b/frontend/src/container/TimeSeriesView/TimeSeriesView.tsx index fdb5405473..225d115bca 100644 --- a/frontend/src/container/TimeSeriesView/TimeSeriesView.tsx +++ b/frontend/src/container/TimeSeriesView/TimeSeriesView.tsx @@ -140,6 +140,7 @@ function TimeSeriesView({ className="graph-container" style={{ height: '100%', width: '100%' }} ref={graphRef} + data-testid="time-series-graph" > {isLoading && (dataSource === DataSource.LOGS ? : )} diff --git a/frontend/src/container/TracesExplorer/ListView/utils.tsx b/frontend/src/container/TracesExplorer/ListView/utils.tsx index 254ff93296..a6201436d1 100644 --- a/frontend/src/container/TracesExplorer/ListView/utils.tsx +++ b/frontend/src/container/TracesExplorer/ListView/utils.tsx @@ -5,10 +5,26 @@ import { getMs } from 'container/Trace/Filters/Panel/PanelBody/Duration/util'; import { formUrlParams } from 'container/TraceDetail/utils'; import dayjs from 'dayjs'; import { RowData } from 'lib/query/createTableColumnsFromQuery'; +import { Link } from 'react-router-dom'; import { ILog } from 'types/api/logs/log'; import { BaseAutocompleteData } from 'types/api/queryBuilder/queryAutocompleteResponse'; import { QueryDataV3 } from 'types/api/widgets/getQuery'; +function BlockLink({ + children, + to, +}: { + children: React.ReactNode; + to: string; +}): any { + // Display block to make the whole cell clickable + return ( + + {children} + + ); +} + export const transformDataWithDate = ( data: QueryDataV3[], ): Omit[] => @@ -36,7 +52,11 @@ export const getListColumns = ( typeof item === 'string' ? dayjs(item).format('YYYY-MM-DD HH:mm:ss.SSS') : dayjs(item / 1e6).format('YYYY-MM-DD HH:mm:ss.SSS'); - return {date}; + return ( + + {date} + + ); }, }, ]; @@ -49,22 +69,36 @@ export const getListColumns = ( width: 145, render: (value): JSX.Element => { if (value === '') { - return N/A; + return ( + + N/A + + ); } if (key === 'httpMethod' || key === 'responseStatusCode') { return ( - - {value} - + + + {value} + + ); } if (key === 'durationNano') { - return {getMs(value)}ms; + return ( + + {getMs(value)}ms + + ); } - return {value}; + return ( + + {value} + + ); }, responsive: ['md'], })) || []; diff --git a/frontend/src/container/TracesExplorer/TableView/index.tsx b/frontend/src/container/TracesExplorer/TableView/index.tsx index 775db816c4..849ea9bd8f 100644 --- a/frontend/src/container/TracesExplorer/TableView/index.tsx +++ b/frontend/src/container/TracesExplorer/TableView/index.tsx @@ -47,6 +47,7 @@ function TableView(): JSX.Element { query={stagedQuery || initialQueriesMap.traces} queryTableData={data?.payload?.data?.newResult?.data?.result || []} loading={isLoading} + sticky /> ); diff --git a/frontend/src/hooks/useResourceAttribute/utils.ts b/frontend/src/hooks/useResourceAttribute/utils.ts index 77fdddfbea..4dd8c56563 100644 --- a/frontend/src/hooks/useResourceAttribute/utils.ts +++ b/frontend/src/hooks/useResourceAttribute/utils.ts @@ -93,6 +93,22 @@ export const resourceAttributesToTagFilterItems = ( value: `${res.tagValue}`.split(','), })); }; +/* Convert resource attributes to trace filters items for queryBuilder */ +export const resourceAttributesToTracesFilterItems = ( + queries: IResourceAttribute[], +): TagFilterItem[] => + queries.map((res) => ({ + id: `${res.id}`, + key: { + key: convertMetricKeyToTrace(res.tagKey), + isColumn: false, + type: MetricsType.Resource, + dataType: DataTypes.String, + id: `${convertMetricKeyToTrace(res.tagKey)}--string--resource--true`, + }, + op: `${res.operator === 'Not IN' ? 'nin' : res.operator}`, + value: res.tagValue, + })); export const OperatorSchema: IOption[] = OperatorConversions.map( (operator) => ({ diff --git a/frontend/src/pages/TracesExplorer/Filter/Section.tsx b/frontend/src/pages/TracesExplorer/Filter/Section.tsx index 9212f610b0..5ba94b1307 100644 --- a/frontend/src/pages/TracesExplorer/Filter/Section.tsx +++ b/frontend/src/pages/TracesExplorer/Filter/Section.tsx @@ -37,6 +37,7 @@ export function Section(props: SectionProps): JSX.Element { 'hasError', 'durationNano', 'serviceName', + 'deployment.environment', ]), ), [selectedFilters], diff --git a/frontend/src/pages/TracesExplorer/Filter/filterUtils.ts b/frontend/src/pages/TracesExplorer/Filter/filterUtils.ts index 88f604a0dc..ea82bc52ef 100644 --- a/frontend/src/pages/TracesExplorer/Filter/filterUtils.ts +++ b/frontend/src/pages/TracesExplorer/Filter/filterUtils.ts @@ -8,10 +8,11 @@ import { import { TagFilterItem } from 'types/api/queryBuilder/queryBuilderData'; import { DataSource } from 'types/common/queryBuilder'; -export const AllTraceFilterKeyValue = { +export const AllTraceFilterKeyValue: Record = { durationNanoMin: 'Duration', durationNano: 'Duration', durationNanoMax: 'Duration', + 'deployment.environment': 'Environment', hasError: 'Status', serviceName: 'Service Name', name: 'Operation / Name', @@ -22,7 +23,7 @@ export const AllTraceFilterKeyValue = { httpRoute: 'HTTP Route', httpUrl: 'HTTP URL', traceID: 'Trace ID', -}; +} as const; export type AllTraceFilterKeys = keyof typeof AllTraceFilterKeyValue; @@ -64,7 +65,7 @@ export const addFilter = ( | undefined > >, - keys?: BaseAutocompleteData, + keys: BaseAutocompleteData, ): void => { setSelectedFilters((prevFilters) => { const isDuration = [ @@ -122,7 +123,7 @@ export const removeFilter = ( | undefined > >, - keys?: BaseAutocompleteData, + keys: BaseAutocompleteData, ): void => { setSelectedFilters((prevFilters) => { if (!prevFilters || !prevFilters[filterType]?.values.length) { @@ -202,6 +203,15 @@ export const traceFilterKeys: Record< isJSON: false, id: 'serviceName--string--tag--true', }, + + 'deployment.environment': { + key: 'deployment.environment', + dataType: DataTypes.String, + type: 'resource', + isColumn: false, + isJSON: false, + id: 'deployment.environment--string--resource--false', + }, name: { key: 'name', dataType: DataTypes.String, @@ -282,7 +292,7 @@ export const traceFilterKeys: Record< isJSON: false, id: 'durationNanoMax--float64--tag--true', }, -}; +} as const; interface AggregateValuesProps { value: AllTraceFilterKeys; diff --git a/frontend/src/pages/TracesExplorer/__test__/testUtils.ts b/frontend/src/pages/TracesExplorer/__test__/testUtils.ts index 80d96c9cf3..8a46740e6a 100644 --- a/frontend/src/pages/TracesExplorer/__test__/testUtils.ts +++ b/frontend/src/pages/TracesExplorer/__test__/testUtils.ts @@ -244,7 +244,12 @@ export function checkIfSectionIsNotOpen( expect(section.querySelector('.ant-collapse-item-active')).toBeNull(); } -export const defaultOpenSections = ['hasError', 'durationNano', 'serviceName']; +export const defaultOpenSections = [ + 'hasError', + 'durationNano', + 'serviceName', + 'deployment.environment', +]; export const defaultClosedSections = Object.keys(AllTraceFilterKeyValue).filter( (section) => diff --git a/frontend/src/providers/QueryBuilder.tsx b/frontend/src/providers/QueryBuilder.tsx index 305372eea6..b79538cb45 100644 --- a/frontend/src/providers/QueryBuilder.tsx +++ b/frontend/src/providers/QueryBuilder.tsx @@ -233,8 +233,6 @@ export function QueryBuilderProvider({ timeUpdated ? merge(currentQuery, newQueryState) : newQueryState, ); setQueryType(type); - // this is required to reset the last used query when navigating or initializing the query builder - setLastUsedQuery(0); }, [prepareQueryBuilderData, currentQuery], ); @@ -820,6 +818,8 @@ export function QueryBuilderProvider({ currentPathnameRef.current = location.pathname; setStagedQuery(null); + // reset the last used query to 0 when navigating away from the page + setLastUsedQuery(0); } }, [location, stagedQuery, currentQuery]); diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index bbf97c7adf..9ae242c19f 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -18,7 +18,6 @@ import ( promModel "github.com/prometheus/common/model" "go.uber.org/multierr" - "go.signoz.io/signoz/ee/query-service/constants" "go.signoz.io/signoz/pkg/query-service/app/metrics" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" "go.signoz.io/signoz/pkg/query-service/auth" @@ -255,7 +254,7 @@ func ParseSearchTracesParams(r *http.Request) (*model.SearchTracesParams, error) levelDownStr = "0" } if SpanRenderLimitStr == "" || SpanRenderLimitStr == "null" { - SpanRenderLimitStr = constants.SpanRenderLimitStr + SpanRenderLimitStr = baseconstants.SpanRenderLimitStr } levelUpInt, err := strconv.Atoi(levelUpStr) @@ -270,7 +269,7 @@ func ParseSearchTracesParams(r *http.Request) (*model.SearchTracesParams, error) if err != nil { return nil, err } - MaxSpansInTraceInt, err := strconv.Atoi(constants.MaxSpansInTraceStr) + MaxSpansInTraceInt, err := strconv.Atoi(baseconstants.MaxSpansInTraceStr) if err != nil { return nil, err } diff --git a/pkg/query-service/app/preferences/model.go b/pkg/query-service/app/preferences/model.go index 82b8e9c9f6..fce34653fb 100644 --- a/pkg/query-service/app/preferences/model.go +++ b/pkg/query-service/app/preferences/model.go @@ -7,7 +7,7 @@ import ( "strings" "github.com/jmoiron/sqlx" - "go.signoz.io/signoz/ee/query-service/model" + "go.signoz.io/signoz/pkg/query-service/model" ) type Range struct { diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 77caa9170b..557b082f42 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -731,10 +731,7 @@ func makeRulesManager( // create manager opts managerOpts := &rules.ManagerOptions{ NotifierOpts: notifierOpts, - Queriers: &rules.Queriers{ - PqlEngine: pqle, - Ch: ch.GetConn(), - }, + PqlEngine: pqle, RepoURL: ruleRepoURL, DBConn: db, Context: context.Background(), diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 8c8a038f2f..70eda959dc 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -25,6 +25,11 @@ var ConfigSignozIo = "https://config.signoz.io/api/v1" var DEFAULT_TELEMETRY_ANONYMOUS = false +func IsOSSTelemetryEnabled() bool { + ossSegmentKey := GetOrDefaultEnv("OSS_TELEMETRY_ENABLED", "true") + return ossSegmentKey == "true" +} + const MaxAllowedPointsInTimeSeries = 300 func IsTelemetryEnabled() bool { @@ -421,3 +426,6 @@ const DefaultFilterSuggestionsAttributesLimit = 50 const MaxFilterSuggestionsAttributesLimit = 100 const DefaultFilterSuggestionsExamplesLimit = 2 const MaxFilterSuggestionsExamplesLimit = 10 + +var SpanRenderLimitStr = GetOrDefaultEnv("SPAN_RENDER_LIMIT", "2500") +var MaxSpansInTraceStr = GetOrDefaultEnv("MAX_SPANS_IN_TRACE", "250000") diff --git a/pkg/query-service/model/alerting.go b/pkg/query-service/model/alerting.go index c065fdfae6..4d54f6ae34 100644 --- a/pkg/query-service/model/alerting.go +++ b/pkg/query-service/model/alerting.go @@ -57,7 +57,7 @@ func (s *AlertState) UnmarshalJSON(b []byte) error { case "disabled": *s = StateDisabled default: - return errors.New("invalid alert state") + *s = StateInactive } return nil default: diff --git a/pkg/query-service/rules/base_rule.go b/pkg/query-service/rules/base_rule.go new file mode 100644 index 0000000000..492f6f685c --- /dev/null +++ b/pkg/query-service/rules/base_rule.go @@ -0,0 +1,567 @@ +package rules + +import ( + "context" + "fmt" + "math" + "net/url" + "sync" + "time" + + "go.signoz.io/signoz/pkg/query-service/converter" + "go.signoz.io/signoz/pkg/query-service/interfaces" + "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + qslabels "go.signoz.io/signoz/pkg/query-service/utils/labels" + "go.uber.org/zap" +) + +// BaseRule contains common fields and methods for all rule types +type BaseRule struct { + id string + name string + source string + handledRestart bool + + // Type of the rule + typ AlertType + + ruleCondition *RuleCondition + // evalWindow is the time window used for evaluating the rule + // i.e each time we lookback from the current time, we look at data for the last + // evalWindow duration + evalWindow time.Duration + // holdDuration is the duration for which the alert waits before firing + holdDuration time.Duration + + // evalDelay is the delay in evaluation of the rule + // this is useful in cases where the data is not available immediately + evalDelay time.Duration + + // holds the static set of labels and annotations for the rule + // these are the same for all alerts created for this rule + labels qslabels.BaseLabels + annotations qslabels.BaseLabels + // preferredChannels is the list of channels to send the alert to + // if the rule is triggered + preferredChannels []string + mtx sync.Mutex + // the time it took to evaluate the rule (most recent evaluation) + evaluationDuration time.Duration + // the timestamp of the last evaluation + evaluationTimestamp time.Time + + health RuleHealth + lastError error + active map[uint64]*Alert + + // lastTimestampWithDatapoints is the timestamp of the last datapoint we observed + // for this rule + // this is used for missing data alerts + lastTimestampWithDatapoints time.Time + + reader interfaces.Reader + + logger *zap.Logger + + // sendUnmatched sends observed metric values + // even if they dont match the rule condition. this is + // useful in testing the rule + sendUnmatched bool + + // sendAlways will send alert irresepective of resendDelay + // or other params + sendAlways bool +} + +type RuleOption func(*BaseRule) + +func WithSendAlways() RuleOption { + return func(r *BaseRule) { + r.sendAlways = true + } +} + +func WithSendUnmatched() RuleOption { + return func(r *BaseRule) { + r.sendUnmatched = true + } +} + +func WithEvalDelay(dur time.Duration) RuleOption { + return func(r *BaseRule) { + r.evalDelay = dur + } +} + +func WithLogger(logger *zap.Logger) RuleOption { + return func(r *BaseRule) { + r.logger = logger + } +} + +func NewBaseRule(id string, p *PostableRule, reader interfaces.Reader, opts ...RuleOption) (*BaseRule, error) { + if p.RuleCondition == nil || !p.RuleCondition.IsValid() { + return nil, fmt.Errorf("invalid rule condition") + } + + baseRule := &BaseRule{ + id: id, + name: p.AlertName, + source: p.Source, + ruleCondition: p.RuleCondition, + evalWindow: time.Duration(p.EvalWindow), + labels: qslabels.FromMap(p.Labels), + annotations: qslabels.FromMap(p.Annotations), + preferredChannels: p.PreferredChannels, + health: HealthUnknown, + active: map[uint64]*Alert{}, + reader: reader, + } + + if baseRule.evalWindow == 0 { + baseRule.evalWindow = 5 * time.Minute + } + + for _, opt := range opts { + opt(baseRule) + } + + return baseRule, nil +} + +func (r *BaseRule) targetVal() float64 { + if r.ruleCondition == nil || r.ruleCondition.Target == nil { + return 0 + } + + // get the converter for the target unit + unitConverter := converter.FromUnit(converter.Unit(r.ruleCondition.TargetUnit)) + // convert the target value to the y-axis unit + value := unitConverter.Convert(converter.Value{ + F: *r.ruleCondition.Target, + U: converter.Unit(r.ruleCondition.TargetUnit), + }, converter.Unit(r.Unit())) + + return value.F +} + +func (r *BaseRule) matchType() MatchType { + if r.ruleCondition == nil { + return AtleastOnce + } + return r.ruleCondition.MatchType +} + +func (r *BaseRule) compareOp() CompareOp { + if r.ruleCondition == nil { + return ValueIsEq + } + return r.ruleCondition.CompareOp +} + +func (r *BaseRule) currentAlerts() []*Alert { + r.mtx.Lock() + defer r.mtx.Unlock() + + alerts := make([]*Alert, 0, len(r.active)) + for _, a := range r.active { + anew := *a + alerts = append(alerts, &anew) + } + return alerts +} + +func (r *ThresholdRule) hostFromSource() string { + parsedUrl, err := url.Parse(r.source) + if err != nil { + return "" + } + if parsedUrl.Port() != "" { + return fmt.Sprintf("%s://%s:%s", parsedUrl.Scheme, parsedUrl.Hostname(), parsedUrl.Port()) + } + return fmt.Sprintf("%s://%s", parsedUrl.Scheme, parsedUrl.Hostname()) +} + +func (r *BaseRule) ID() string { return r.id } +func (r *BaseRule) Name() string { return r.name } +func (r *BaseRule) Condition() *RuleCondition { return r.ruleCondition } +func (r *BaseRule) Labels() qslabels.BaseLabels { return r.labels } +func (r *BaseRule) Annotations() qslabels.BaseLabels { return r.annotations } +func (r *BaseRule) PreferredChannels() []string { return r.preferredChannels } + +func (r *BaseRule) GeneratorURL() string { + return prepareRuleGeneratorURL(r.ID(), r.source) +} + +func (r *BaseRule) Unit() string { + if r.ruleCondition != nil && r.ruleCondition.CompositeQuery != nil { + return r.ruleCondition.CompositeQuery.Unit + } + return "" +} + +func (r *BaseRule) SetLastError(err error) { + r.mtx.Lock() + defer r.mtx.Unlock() + r.lastError = err +} + +func (r *BaseRule) LastError() error { + r.mtx.Lock() + defer r.mtx.Unlock() + return r.lastError +} + +func (r *BaseRule) SetHealth(health RuleHealth) { + r.mtx.Lock() + defer r.mtx.Unlock() + r.health = health +} + +func (r *BaseRule) Health() RuleHealth { + r.mtx.Lock() + defer r.mtx.Unlock() + return r.health +} + +func (r *BaseRule) SetEvaluationDuration(dur time.Duration) { + r.mtx.Lock() + defer r.mtx.Unlock() + r.evaluationDuration = dur +} + +func (r *BaseRule) GetEvaluationDuration() time.Duration { + r.mtx.Lock() + defer r.mtx.Unlock() + return r.evaluationDuration +} + +func (r *BaseRule) SetEvaluationTimestamp(ts time.Time) { + r.mtx.Lock() + defer r.mtx.Unlock() + r.evaluationTimestamp = ts +} + +func (r *BaseRule) GetEvaluationTimestamp() time.Time { + r.mtx.Lock() + defer r.mtx.Unlock() + return r.evaluationTimestamp +} + +func (r *BaseRule) State() model.AlertState { + maxState := model.StateInactive + for _, a := range r.active { + if a.State > maxState { + maxState = a.State + } + } + return maxState +} + +func (r *BaseRule) ActiveAlerts() []*Alert { + var res []*Alert + for _, a := range r.currentAlerts() { + if a.ResolvedAt.IsZero() { + res = append(res, a) + } + } + return res +} + +func (r *BaseRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) { + alerts := []*Alert{} + r.ForEachActiveAlert(func(alert *Alert) { + if alert.needsSending(ts, resendDelay) { + alert.LastSentAt = ts + delta := resendDelay + if interval > resendDelay { + delta = interval + } + alert.ValidUntil = ts.Add(4 * delta) + anew := *alert + alerts = append(alerts, &anew) + } + }) + notifyFunc(ctx, "", alerts...) +} + +func (r *BaseRule) ForEachActiveAlert(f func(*Alert)) { + r.mtx.Lock() + defer r.mtx.Unlock() + + for _, a := range r.active { + f(a) + } +} + +func (r *BaseRule) shouldAlert(series v3.Series) (Sample, bool) { + var alertSmpl Sample + var shouldAlert bool + var lbls qslabels.Labels + var lblsNormalized qslabels.Labels + + for name, value := range series.Labels { + lbls = append(lbls, qslabels.Label{Name: name, Value: value}) + lblsNormalized = append(lblsNormalized, qslabels.Label{Name: normalizeLabelName(name), Value: value}) + } + + series.Points = removeGroupinSetPoints(series) + + // nothing to evaluate + if len(series.Points) == 0 { + return alertSmpl, false + } + + switch r.matchType() { + case AtleastOnce: + // If any sample matches the condition, the rule is firing. + if r.compareOp() == ValueIsAbove { + for _, smpl := range series.Points { + if smpl.Value > r.targetVal() { + alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls} + shouldAlert = true + break + } + } + } else if r.compareOp() == ValueIsBelow { + for _, smpl := range series.Points { + if smpl.Value < r.targetVal() { + alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls} + shouldAlert = true + break + } + } + } else if r.compareOp() == ValueIsEq { + for _, smpl := range series.Points { + if smpl.Value == r.targetVal() { + alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls} + shouldAlert = true + break + } + } + } else if r.compareOp() == ValueIsNotEq { + for _, smpl := range series.Points { + if smpl.Value != r.targetVal() { + alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls} + shouldAlert = true + break + } + } + } + case AllTheTimes: + // If all samples match the condition, the rule is firing. + shouldAlert = true + alertSmpl = Sample{Point: Point{V: r.targetVal()}, Metric: lblsNormalized, MetricOrig: lbls} + if r.compareOp() == ValueIsAbove { + for _, smpl := range series.Points { + if smpl.Value <= r.targetVal() { + shouldAlert = false + break + } + } + // use min value from the series + if shouldAlert { + var minValue float64 = math.Inf(1) + for _, smpl := range series.Points { + if smpl.Value < minValue { + minValue = smpl.Value + } + } + alertSmpl = Sample{Point: Point{V: minValue}, Metric: lblsNormalized, MetricOrig: lbls} + } + } else if r.compareOp() == ValueIsBelow { + for _, smpl := range series.Points { + if smpl.Value >= r.targetVal() { + shouldAlert = false + break + } + } + if shouldAlert { + var maxValue float64 = math.Inf(-1) + for _, smpl := range series.Points { + if smpl.Value > maxValue { + maxValue = smpl.Value + } + } + alertSmpl = Sample{Point: Point{V: maxValue}, Metric: lblsNormalized, MetricOrig: lbls} + } + } else if r.compareOp() == ValueIsEq { + for _, smpl := range series.Points { + if smpl.Value != r.targetVal() { + shouldAlert = false + break + } + } + } else if r.compareOp() == ValueIsNotEq { + for _, smpl := range series.Points { + if smpl.Value == r.targetVal() { + shouldAlert = false + break + } + } + // use any non-inf or nan value from the series + if shouldAlert { + for _, smpl := range series.Points { + if !math.IsInf(smpl.Value, 0) && !math.IsNaN(smpl.Value) { + alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls} + break + } + } + } + } + case OnAverage: + // If the average of all samples matches the condition, the rule is firing. + var sum, count float64 + for _, smpl := range series.Points { + if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) { + continue + } + sum += smpl.Value + count++ + } + avg := sum / count + alertSmpl = Sample{Point: Point{V: avg}, Metric: lblsNormalized, MetricOrig: lbls} + if r.compareOp() == ValueIsAbove { + if avg > r.targetVal() { + shouldAlert = true + } + } else if r.compareOp() == ValueIsBelow { + if avg < r.targetVal() { + shouldAlert = true + } + } else if r.compareOp() == ValueIsEq { + if avg == r.targetVal() { + shouldAlert = true + } + } else if r.compareOp() == ValueIsNotEq { + if avg != r.targetVal() { + shouldAlert = true + } + } + case InTotal: + // If the sum of all samples matches the condition, the rule is firing. + var sum float64 + + for _, smpl := range series.Points { + if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) { + continue + } + sum += smpl.Value + } + alertSmpl = Sample{Point: Point{V: sum}, Metric: lblsNormalized, MetricOrig: lbls} + if r.compareOp() == ValueIsAbove { + if sum > r.targetVal() { + shouldAlert = true + } + } else if r.compareOp() == ValueIsBelow { + if sum < r.targetVal() { + shouldAlert = true + } + } else if r.compareOp() == ValueIsEq { + if sum == r.targetVal() { + shouldAlert = true + } + } else if r.compareOp() == ValueIsNotEq { + if sum != r.targetVal() { + shouldAlert = true + } + } + } + return alertSmpl, shouldAlert +} + +func (r *BaseRule) RecordRuleStateHistory(ctx context.Context, prevState, currentState model.AlertState, itemsToAdd []v3.RuleStateHistory) error { + zap.L().Debug("recording rule state history", zap.String("ruleid", r.ID()), zap.Any("prevState", prevState), zap.Any("currentState", currentState), zap.Any("itemsToAdd", itemsToAdd)) + revisedItemsToAdd := map[uint64]v3.RuleStateHistory{} + + lastSavedState, err := r.reader.GetLastSavedRuleStateHistory(ctx, r.ID()) + if err != nil { + return err + } + // if the query-service has been restarted, or the rule has been modified (which re-initializes the rule), + // the state would reset so we need to add the corresponding state changes to previously saved states + if !r.handledRestart && len(lastSavedState) > 0 { + zap.L().Debug("handling restart", zap.String("ruleid", r.ID()), zap.Any("lastSavedState", lastSavedState)) + l := map[uint64]v3.RuleStateHistory{} + for _, item := range itemsToAdd { + l[item.Fingerprint] = item + } + + shouldSkip := map[uint64]bool{} + + for _, item := range lastSavedState { + // for the last saved item with fingerprint, check if there is a corresponding entry in the current state + currentState, ok := l[item.Fingerprint] + if !ok { + // there was a state change in the past, but not in the current state + // if the state was firing, then we should add a resolved state change + if item.State == model.StateFiring || item.State == model.StateNoData { + item.State = model.StateInactive + item.StateChanged = true + item.UnixMilli = time.Now().UnixMilli() + revisedItemsToAdd[item.Fingerprint] = item + } + // there is nothing to do if the prev state was normal + } else { + if item.State != currentState.State { + item.State = currentState.State + item.StateChanged = true + item.UnixMilli = time.Now().UnixMilli() + revisedItemsToAdd[item.Fingerprint] = item + } + } + // do not add this item to revisedItemsToAdd as it is already processed + shouldSkip[item.Fingerprint] = true + } + zap.L().Debug("after lastSavedState loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) + + // if there are any new state changes that were not saved, add them to the revised items + for _, item := range itemsToAdd { + if _, ok := revisedItemsToAdd[item.Fingerprint]; !ok && !shouldSkip[item.Fingerprint] { + revisedItemsToAdd[item.Fingerprint] = item + } + } + zap.L().Debug("after itemsToAdd loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) + + newState := model.StateInactive + for _, item := range revisedItemsToAdd { + if item.State == model.StateFiring || item.State == model.StateNoData { + newState = model.StateFiring + break + } + } + zap.L().Debug("newState", zap.String("ruleid", r.ID()), zap.Any("newState", newState)) + + // if there is a change in the overall state, update the overall state + if lastSavedState[0].OverallState != newState { + for fingerprint, item := range revisedItemsToAdd { + item.OverallState = newState + item.OverallStateChanged = true + revisedItemsToAdd[fingerprint] = item + } + } + zap.L().Debug("revisedItemsToAdd after newState", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) + + } else { + for _, item := range itemsToAdd { + revisedItemsToAdd[item.Fingerprint] = item + } + } + + if len(revisedItemsToAdd) > 0 && r.reader != nil { + zap.L().Debug("writing rule state history", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) + + entries := make([]v3.RuleStateHistory, 0, len(revisedItemsToAdd)) + for _, item := range revisedItemsToAdd { + entries = append(entries, item) + } + err := r.reader.AddRuleStateHistory(ctx, entries) + if err != nil { + zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd)) + } + } + r.handledRestart = true + + return nil +} diff --git a/pkg/query-service/rules/manager.go b/pkg/query-service/rules/manager.go index fe309334b1..120d674a9a 100644 --- a/pkg/query-service/rules/manager.go +++ b/pkg/query-service/rules/manager.go @@ -21,6 +21,7 @@ import ( am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager" "go.signoz.io/signoz/pkg/query-service/interfaces" "go.signoz.io/signoz/pkg/query-service/model" + pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine" "go.signoz.io/signoz/pkg/query-service/telemetry" "go.signoz.io/signoz/pkg/query-service/utils/labels" ) @@ -56,7 +57,7 @@ func prepareTaskName(ruleId interface{}) string { // ManagerOptions bundles options for the Manager. type ManagerOptions struct { NotifierOpts am.NotifierOptions - Queriers *Queriers + PqlEngine *pqle.PqlEngine // RepoURL is used to generate a backlink in sent alert messages RepoURL string @@ -127,11 +128,9 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) { tr, err := NewThresholdRule( ruleId, opts.Rule, - ThresholdRuleOpts{ - EvalDelay: opts.ManagerOpts.EvalDelay, - }, opts.FF, opts.Reader, + WithEvalDelay(opts.ManagerOpts.EvalDelay), ) if err != nil { @@ -150,8 +149,8 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) { ruleId, opts.Rule, opts.Logger, - PromRuleOpts{}, opts.Reader, + opts.ManagerOpts.PqlEngine, ) if err != nil { @@ -793,12 +792,10 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m rule, err = NewThresholdRule( alertname, parsedRule, - ThresholdRuleOpts{ - SendUnmatched: true, - SendAlways: true, - }, m.featureFlags, m.reader, + WithSendAlways(), + WithSendUnmatched(), ) if err != nil { @@ -813,10 +810,10 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m alertname, parsedRule, m.logger, - PromRuleOpts{ - SendAlways: true, - }, m.reader, + m.opts.PqlEngine, + WithSendAlways(), + WithSendUnmatched(), ) if err != nil { @@ -830,7 +827,7 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m // set timestamp to current utc time ts := time.Now().UTC() - count, err := rule.Eval(ctx, ts, m.opts.Queriers) + count, err := rule.Eval(ctx, ts) if err != nil { zap.L().Error("evaluating rule failed", zap.String("rule", rule.Name()), zap.Error(err)) return 0, newApiErrorInternal(fmt.Errorf("rule evaluation failed")) diff --git a/pkg/query-service/rules/prom_rule.go b/pkg/query-service/rules/prom_rule.go index 2241d32a4b..7136a88e97 100644 --- a/pkg/query-service/rules/prom_rule.go +++ b/pkg/query-service/rules/prom_rule.go @@ -4,295 +4,60 @@ import ( "context" "encoding/json" "fmt" - "math" - "sync" "time" "go.uber.org/zap" - plabels "github.com/prometheus/prometheus/model/labels" - pql "github.com/prometheus/prometheus/promql" - "go.signoz.io/signoz/pkg/query-service/converter" + "github.com/prometheus/prometheus/promql" "go.signoz.io/signoz/pkg/query-service/formatter" "go.signoz.io/signoz/pkg/query-service/interfaces" "go.signoz.io/signoz/pkg/query-service/model" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine" qslabels "go.signoz.io/signoz/pkg/query-service/utils/labels" "go.signoz.io/signoz/pkg/query-service/utils/times" "go.signoz.io/signoz/pkg/query-service/utils/timestamp" yaml "gopkg.in/yaml.v2" ) -type PromRuleOpts struct { - // SendAlways will send alert irresepective of resendDelay - // or other params - SendAlways bool -} - type PromRule struct { - id string - name string - source string - ruleCondition *RuleCondition - - evalWindow time.Duration - holdDuration time.Duration - labels plabels.Labels - annotations plabels.Labels - - preferredChannels []string - - mtx sync.Mutex - evaluationDuration time.Duration - evaluationTimestamp time.Time - - health RuleHealth - - lastError error - - // map of active alerts - active map[uint64]*Alert - - logger *zap.Logger - opts PromRuleOpts - - reader interfaces.Reader - - handledRestart bool + *BaseRule + pqlEngine *pqle.PqlEngine } func NewPromRule( id string, postableRule *PostableRule, logger *zap.Logger, - opts PromRuleOpts, reader interfaces.Reader, + pqlEngine *pqle.PqlEngine, + opts ...RuleOption, ) (*PromRule, error) { - if postableRule.RuleCondition == nil { - return nil, fmt.Errorf("no rule condition") - } else if !postableRule.RuleCondition.IsValid() { - return nil, fmt.Errorf("invalid rule condition") + baseRule, err := NewBaseRule(id, postableRule, reader, opts...) + if err != nil { + return nil, err } p := PromRule{ - id: id, - name: postableRule.AlertName, - source: postableRule.Source, - ruleCondition: postableRule.RuleCondition, - evalWindow: time.Duration(postableRule.EvalWindow), - labels: plabels.FromMap(postableRule.Labels), - annotations: plabels.FromMap(postableRule.Annotations), - preferredChannels: postableRule.PreferredChannels, - health: HealthUnknown, - active: map[uint64]*Alert{}, - logger: logger, - opts: opts, + BaseRule: baseRule, + pqlEngine: pqlEngine, } - p.reader = reader - if int64(p.evalWindow) == 0 { - p.evalWindow = 5 * time.Minute - } query, err := p.getPqlQuery() if err != nil { // can not generate a valid prom QL query return nil, err } - - zap.L().Info("creating new alerting rule", zap.String("name", p.name), zap.String("condition", p.ruleCondition.String()), zap.String("query", query)) - + zap.L().Info("creating new prom rule", zap.String("name", p.name), zap.String("query", query)) return &p, nil } -func (r *PromRule) Name() string { - return r.name -} - -func (r *PromRule) ID() string { - return r.id -} - -func (r *PromRule) Condition() *RuleCondition { - return r.ruleCondition -} - -// targetVal returns the target value for the rule condition -// when the y-axis and target units are non-empty, it -// converts the target value to the y-axis unit -func (r *PromRule) targetVal() float64 { - if r.ruleCondition == nil || r.ruleCondition.Target == nil { - return 0 - } - - // get the converter for the target unit - unitConverter := converter.FromUnit(converter.Unit(r.ruleCondition.TargetUnit)) - // convert the target value to the y-axis unit - value := unitConverter.Convert(converter.Value{ - F: *r.ruleCondition.Target, - U: converter.Unit(r.ruleCondition.TargetUnit), - }, converter.Unit(r.Unit())) - - return value.F -} - func (r *PromRule) Type() RuleType { return RuleTypeProm } -func (r *PromRule) GeneratorURL() string { - return prepareRuleGeneratorURL(r.ID(), r.source) -} - -func (r *PromRule) PreferredChannels() []string { - return r.preferredChannels -} - -func (r *PromRule) SetLastError(err error) { - r.mtx.Lock() - defer r.mtx.Unlock() - r.lastError = err -} - -func (r *PromRule) LastError() error { - r.mtx.Lock() - defer r.mtx.Unlock() - return r.lastError -} - -func (r *PromRule) SetHealth(health RuleHealth) { - r.mtx.Lock() - defer r.mtx.Unlock() - r.health = health -} - -func (r *PromRule) Health() RuleHealth { - r.mtx.Lock() - defer r.mtx.Unlock() - return r.health -} - -// SetEvaluationDuration updates evaluationDuration to the duration it took to evaluate the rule on its last evaluation. -func (r *PromRule) SetEvaluationDuration(dur time.Duration) { - r.mtx.Lock() - defer r.mtx.Unlock() - r.evaluationDuration = dur -} - -func (r *PromRule) HoldDuration() time.Duration { - return r.holdDuration -} - -func (r *PromRule) EvalWindow() time.Duration { - return r.evalWindow -} - -// Labels returns the labels of the alerting rule. -func (r *PromRule) Labels() qslabels.BaseLabels { - return r.labels -} - -// Annotations returns the annotations of the alerting rule. -func (r *PromRule) Annotations() qslabels.BaseLabels { - return r.annotations -} - -// GetEvaluationDuration returns the time in seconds it took to evaluate the alerting rule. -func (r *PromRule) GetEvaluationDuration() time.Duration { - r.mtx.Lock() - defer r.mtx.Unlock() - return r.evaluationDuration -} - -// SetEvaluationTimestamp updates evaluationTimestamp to the timestamp of when the rule was last evaluated. -func (r *PromRule) SetEvaluationTimestamp(ts time.Time) { - r.mtx.Lock() - defer r.mtx.Unlock() - r.evaluationTimestamp = ts -} - -// GetEvaluationTimestamp returns the time the evaluation took place. -func (r *PromRule) GetEvaluationTimestamp() time.Time { - r.mtx.Lock() - defer r.mtx.Unlock() - return r.evaluationTimestamp -} - -// State returns the maximum state of alert instances for this rule. -// StateFiring > StatePending > StateInactive -func (r *PromRule) State() model.AlertState { - - maxState := model.StateInactive - for _, a := range r.active { - if a.State > maxState { - maxState = a.State - } - } - return maxState -} - -func (r *PromRule) currentAlerts() []*Alert { - r.mtx.Lock() - defer r.mtx.Unlock() - - alerts := make([]*Alert, 0, len(r.active)) - - for _, a := range r.active { - anew := *a - alerts = append(alerts, &anew) - } - return alerts -} - -func (r *PromRule) ActiveAlerts() []*Alert { - var res []*Alert - for _, a := range r.currentAlerts() { - if a.ResolvedAt.IsZero() { - res = append(res, a) - } - } - return res -} - -func (r *PromRule) Unit() string { - if r.ruleCondition != nil && r.ruleCondition.CompositeQuery != nil { - return r.ruleCondition.CompositeQuery.Unit - } - return "" -} - -// ForEachActiveAlert runs the given function on each alert. -// This should be used when you want to use the actual alerts from the ThresholdRule -// and not on its copy. -// If you want to run on a copy of alerts then don't use this, get the alerts from 'ActiveAlerts()'. -func (r *PromRule) ForEachActiveAlert(f func(*Alert)) { - r.mtx.Lock() - defer r.mtx.Unlock() - - for _, a := range r.active { - f(a) - } -} - -func (r *PromRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) { - alerts := []*Alert{} - r.ForEachActiveAlert(func(alert *Alert) { - if r.opts.SendAlways || alert.needsSending(ts, resendDelay) { - alert.LastSentAt = ts - // Allow for two Eval or Alertmanager send failures. - delta := resendDelay - if interval > resendDelay { - delta = interval - } - alert.ValidUntil = ts.Add(4 * delta) - anew := *alert - alerts = append(alerts, &anew) - } - }) - notifyFunc(ctx, "", alerts...) -} - func (r *PromRule) GetSelectedQuery() string { if r.ruleCondition != nil { // If the user has explicitly set the selected query, we return that. @@ -327,117 +92,7 @@ func (r *PromRule) getPqlQuery() (string, error) { return "", fmt.Errorf("invalid promql rule query") } -func (r *PromRule) matchType() MatchType { - if r.ruleCondition == nil { - return AtleastOnce - } - return r.ruleCondition.MatchType -} - -func (r *PromRule) compareOp() CompareOp { - if r.ruleCondition == nil { - return ValueIsEq - } - return r.ruleCondition.CompareOp -} - -// TODO(srikanthccv): implement base rule and use for all types of rules -func (r *PromRule) recordRuleStateHistory(ctx context.Context, prevState, currentState model.AlertState, itemsToAdd []v3.RuleStateHistory) error { - zap.L().Debug("recording rule state history", zap.String("ruleid", r.ID()), zap.Any("prevState", prevState), zap.Any("currentState", currentState), zap.Any("itemsToAdd", itemsToAdd)) - revisedItemsToAdd := map[uint64]v3.RuleStateHistory{} - - lastSavedState, err := r.reader.GetLastSavedRuleStateHistory(ctx, r.ID()) - if err != nil { - return err - } - // if the query-service has been restarted, or the rule has been modified (which re-initializes the rule), - // the state would reset so we need to add the corresponding state changes to previously saved states - if !r.handledRestart && len(lastSavedState) > 0 { - zap.L().Debug("handling restart", zap.String("ruleid", r.ID()), zap.Any("lastSavedState", lastSavedState)) - l := map[uint64]v3.RuleStateHistory{} - for _, item := range itemsToAdd { - l[item.Fingerprint] = item - } - - shouldSkip := map[uint64]bool{} - - for _, item := range lastSavedState { - // for the last saved item with fingerprint, check if there is a corresponding entry in the current state - currentState, ok := l[item.Fingerprint] - if !ok { - // there was a state change in the past, but not in the current state - // if the state was firing, then we should add a resolved state change - if item.State == model.StateFiring || item.State == model.StateNoData { - item.State = model.StateInactive - item.StateChanged = true - item.UnixMilli = time.Now().UnixMilli() - revisedItemsToAdd[item.Fingerprint] = item - } - // there is nothing to do if the prev state was normal - } else { - if item.State != currentState.State { - item.State = currentState.State - item.StateChanged = true - item.UnixMilli = time.Now().UnixMilli() - revisedItemsToAdd[item.Fingerprint] = item - } - } - // do not add this item to revisedItemsToAdd as it is already processed - shouldSkip[item.Fingerprint] = true - } - zap.L().Debug("after lastSavedState loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) - - // if there are any new state changes that were not saved, add them to the revised items - for _, item := range itemsToAdd { - if _, ok := revisedItemsToAdd[item.Fingerprint]; !ok && !shouldSkip[item.Fingerprint] { - revisedItemsToAdd[item.Fingerprint] = item - } - } - zap.L().Debug("after itemsToAdd loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) - - newState := model.StateInactive - for _, item := range revisedItemsToAdd { - if item.State == model.StateFiring || item.State == model.StateNoData { - newState = model.StateFiring - break - } - } - zap.L().Debug("newState", zap.String("ruleid", r.ID()), zap.Any("newState", newState)) - - // if there is a change in the overall state, update the overall state - if lastSavedState[0].OverallState != newState { - for fingerprint, item := range revisedItemsToAdd { - item.OverallState = newState - item.OverallStateChanged = true - revisedItemsToAdd[fingerprint] = item - } - } - zap.L().Debug("revisedItemsToAdd after newState", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) - - } else { - for _, item := range itemsToAdd { - revisedItemsToAdd[item.Fingerprint] = item - } - } - - if len(revisedItemsToAdd) > 0 && r.reader != nil { - zap.L().Debug("writing rule state history", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) - - entries := make([]v3.RuleStateHistory, 0, len(revisedItemsToAdd)) - for _, item := range revisedItemsToAdd { - entries = append(entries, item) - } - err := r.reader.AddRuleStateHistory(ctx, entries) - if err != nil { - zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd)) - } - } - r.handledRestart = true - - return nil -} - -func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (interface{}, error) { +func (r *PromRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) { prevState := r.State() @@ -452,7 +107,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( return nil, err } zap.L().Info("evaluating promql query", zap.String("name", r.Name()), zap.String("query", q)) - res, err := queriers.PqlEngine.RunAlertQuery(ctx, q, start, end, interval) + res, err := r.pqlEngine.RunAlertQuery(ctx, q, start, end, interval) if err != nil { r.SetHealth(HealthBad) r.SetLastError(err) @@ -476,7 +131,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( continue } - alertSmpl, shouldAlert := r.shouldAlert(series) + alertSmpl, shouldAlert := r.shouldAlert(toCommonSeries(series)) if !shouldAlert { continue } @@ -484,7 +139,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( threshold := valueFormatter.Format(r.targetVal(), r.Unit()) - tmplData := AlertTemplateData(l, valueFormatter.Format(alertSmpl.F, r.Unit()), threshold) + tmplData := AlertTemplateData(l, valueFormatter.Format(alertSmpl.V, r.Unit()), threshold) // Inject some convenience variables that are easier to remember for users // who are not used to Go's templating system. defs := "{{$labels := .Labels}}{{$value := .Value}}{{$threshold := .Threshold}}" @@ -507,20 +162,20 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( return result } - lb := plabels.NewBuilder(alertSmpl.Metric).Del(plabels.MetricName) - resultLabels := plabels.NewBuilder(alertSmpl.Metric).Del(plabels.MetricName).Labels() + lb := qslabels.NewBuilder(alertSmpl.Metric).Del(qslabels.MetricNameLabel) + resultLabels := qslabels.NewBuilder(alertSmpl.Metric).Del(qslabels.MetricNameLabel).Labels() - for _, l := range r.labels { - lb.Set(l.Name, expand(l.Value)) + for name, value := range r.labels.Map() { + lb.Set(name, expand(value)) } lb.Set(qslabels.AlertNameLabel, r.Name()) lb.Set(qslabels.AlertRuleIdLabel, r.ID()) lb.Set(qslabels.RuleSourceLabel, r.GeneratorURL()) - annotations := make(plabels.Labels, 0, len(r.annotations)) - for _, a := range r.annotations { - annotations = append(annotations, plabels.Label{Name: a.Name, Value: expand(a.Value)}) + annotations := make(qslabels.Labels, 0, len(r.annotations.Map())) + for name, value := range r.annotations.Map() { + annotations = append(annotations, qslabels.Label{Name: name, Value: expand(value)}) } lbs := lb.Labels() @@ -542,7 +197,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( Annotations: annotations, ActiveAt: ts, State: model.StatePending, - Value: alertSmpl.F, + Value: alertSmpl.V, GeneratorURL: r.GeneratorURL(), Receivers: r.preferredChannels, } @@ -626,169 +281,11 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( itemsToAdd[idx] = item } - r.recordRuleStateHistory(ctx, prevState, currentState, itemsToAdd) + r.RecordRuleStateHistory(ctx, prevState, currentState, itemsToAdd) return len(r.active), nil } -func (r *PromRule) shouldAlert(series pql.Series) (pql.Sample, bool) { - var alertSmpl pql.Sample - var shouldAlert bool - switch r.matchType() { - case AtleastOnce: - // If any sample matches the condition, the rule is firing. - if r.compareOp() == ValueIsAbove { - for _, smpl := range series.Floats { - if smpl.F > r.targetVal() { - alertSmpl = pql.Sample{F: smpl.F, T: smpl.T, Metric: series.Metric} - shouldAlert = true - break - } - } - } else if r.compareOp() == ValueIsBelow { - for _, smpl := range series.Floats { - if smpl.F < r.targetVal() { - alertSmpl = pql.Sample{F: smpl.F, T: smpl.T, Metric: series.Metric} - shouldAlert = true - break - } - } - } else if r.compareOp() == ValueIsEq { - for _, smpl := range series.Floats { - if smpl.F == r.targetVal() { - alertSmpl = pql.Sample{F: smpl.F, T: smpl.T, Metric: series.Metric} - shouldAlert = true - break - } - } - } else if r.compareOp() == ValueIsNotEq { - for _, smpl := range series.Floats { - if smpl.F != r.targetVal() { - alertSmpl = pql.Sample{F: smpl.F, T: smpl.T, Metric: series.Metric} - shouldAlert = true - break - } - } - } - case AllTheTimes: - // If all samples match the condition, the rule is firing. - shouldAlert = true - alertSmpl = pql.Sample{F: r.targetVal(), Metric: series.Metric} - if r.compareOp() == ValueIsAbove { - for _, smpl := range series.Floats { - if smpl.F <= r.targetVal() { - shouldAlert = false - break - } - } - // use min value from the series - if shouldAlert { - var minValue float64 = math.Inf(1) - for _, smpl := range series.Floats { - if smpl.F < minValue { - minValue = smpl.F - } - } - alertSmpl = pql.Sample{F: minValue, Metric: series.Metric} - } - } else if r.compareOp() == ValueIsBelow { - for _, smpl := range series.Floats { - if smpl.F >= r.targetVal() { - shouldAlert = false - break - } - } - if shouldAlert { - var maxValue float64 = math.Inf(-1) - for _, smpl := range series.Floats { - if smpl.F > maxValue { - maxValue = smpl.F - } - } - alertSmpl = pql.Sample{F: maxValue, Metric: series.Metric} - } - } else if r.compareOp() == ValueIsEq { - for _, smpl := range series.Floats { - if smpl.F != r.targetVal() { - shouldAlert = false - break - } - } - } else if r.compareOp() == ValueIsNotEq { - for _, smpl := range series.Floats { - if smpl.F == r.targetVal() { - shouldAlert = false - break - } - } - if shouldAlert { - for _, smpl := range series.Floats { - if !math.IsInf(smpl.F, 0) && !math.IsNaN(smpl.F) { - alertSmpl = pql.Sample{F: smpl.F, Metric: series.Metric} - break - } - } - } - } - case OnAverage: - // If the average of all samples matches the condition, the rule is firing. - var sum float64 - for _, smpl := range series.Floats { - if math.IsNaN(smpl.F) { - continue - } - sum += smpl.F - } - avg := sum / float64(len(series.Floats)) - alertSmpl = pql.Sample{F: avg, Metric: series.Metric} - if r.compareOp() == ValueIsAbove { - if avg > r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsBelow { - if avg < r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsEq { - if avg == r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsNotEq { - if avg != r.targetVal() { - shouldAlert = true - } - } - case InTotal: - // If the sum of all samples matches the condition, the rule is firing. - var sum float64 - for _, smpl := range series.Floats { - if math.IsNaN(smpl.F) { - continue - } - sum += smpl.F - } - alertSmpl = pql.Sample{F: sum, Metric: series.Metric} - if r.compareOp() == ValueIsAbove { - if sum > r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsBelow { - if sum < r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsEq { - if sum == r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsNotEq { - if sum != r.targetVal() { - shouldAlert = true - } - } - } - return alertSmpl, shouldAlert -} - func (r *PromRule) String() string { ar := PostableRule{ @@ -807,3 +304,27 @@ func (r *PromRule) String() string { return string(byt) } + +func toCommonSeries(series promql.Series) v3.Series { + commonSeries := v3.Series{ + Labels: make(map[string]string), + LabelsArray: make([]map[string]string, 0), + Points: make([]v3.Point, 0), + } + + for _, lbl := range series.Metric { + commonSeries.Labels[lbl.Name] = lbl.Value + commonSeries.LabelsArray = append(commonSeries.LabelsArray, map[string]string{ + lbl.Name: lbl.Value, + }) + } + + for _, f := range series.Floats { + commonSeries.Points = append(commonSeries.Points, v3.Point{ + Timestamp: f.T, + Value: f.F, + }) + } + + return commonSeries +} diff --git a/pkg/query-service/rules/prom_rule_task.go b/pkg/query-service/rules/prom_rule_task.go index 032fc227f2..f78994430a 100644 --- a/pkg/query-service/rules/prom_rule_task.go +++ b/pkg/query-service/rules/prom_rule_task.go @@ -367,7 +367,7 @@ func (g *PromRuleTask) Eval(ctx context.Context, ts time.Time) { } ctx = context.WithValue(ctx, common.LogCommentKey, kvs) - _, err := rule.Eval(ctx, ts, g.opts.Queriers) + _, err := rule.Eval(ctx, ts) if err != nil { rule.SetHealth(HealthBad) rule.SetLastError(err) diff --git a/pkg/query-service/rules/promrule_test.go b/pkg/query-service/rules/promrule_test.go index fef7630bbd..7c559d1eee 100644 --- a/pkg/query-service/rules/promrule_test.go +++ b/pkg/query-service/rules/promrule_test.go @@ -656,12 +656,12 @@ func TestPromRuleShouldAlert(t *testing.T) { postableRule.RuleCondition.MatchType = MatchType(c.matchType) postableRule.RuleCondition.Target = &c.target - rule, err := NewPromRule("69", &postableRule, zap.NewNop(), PromRuleOpts{}, nil) + rule, err := NewPromRule("69", &postableRule, zap.NewNop(), nil, nil) if err != nil { assert.NoError(t, err) } - _, shoulAlert := rule.shouldAlert(c.values) + _, shoulAlert := rule.shouldAlert(toCommonSeries(c.values)) assert.Equal(t, c.expectAlert, shoulAlert, "Test case %d", idx) } } diff --git a/pkg/query-service/rules/queriers.go b/pkg/query-service/rules/queriers.go index 2739e04280..1e8c7fa083 100644 --- a/pkg/query-service/rules/queriers.go +++ b/pkg/query-service/rules/queriers.go @@ -1,21 +1 @@ package rules - -import ( - "github.com/ClickHouse/clickhouse-go/v2" - pqle "go.signoz.io/signoz/pkg/query-service/pqlEngine" -) - -// Queriers register the options for querying metrics or event sources -// which return a condition that results in a alert. Currently we support -// promql engine and clickhouse queries but in future we may include -// api readers for Machine Learning (ML) use cases. -// Note: each rule will pick up the querier it is interested in -// and use it. This allows rules to have flexibility in choosing -// the query engines. -type Queriers struct { - // promql engine - PqlEngine *pqle.PqlEngine - - // metric querier - Ch clickhouse.Conn -} diff --git a/pkg/query-service/rules/rule.go b/pkg/query-service/rules/rule.go index eeb7de9066..bb41a2be13 100644 --- a/pkg/query-service/rules/rule.go +++ b/pkg/query-service/rules/rule.go @@ -5,6 +5,7 @@ import ( "time" "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/utils/labels" ) @@ -23,9 +24,8 @@ type Rule interface { PreferredChannels() []string - Eval(context.Context, time.Time, *Queriers) (interface{}, error) + Eval(context.Context, time.Time) (interface{}, error) String() string - // Query() string SetLastError(error) LastError() error SetHealth(RuleHealth) @@ -35,5 +35,7 @@ type Rule interface { SetEvaluationTimestamp(time.Time) GetEvaluationTimestamp() time.Time + RecordRuleStateHistory(ctx context.Context, prevState, currentState model.AlertState, itemsToAdd []v3.RuleStateHistory) error + SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) } diff --git a/pkg/query-service/rules/rule_task.go b/pkg/query-service/rules/rule_task.go index 61e154d74d..0a969bffc8 100644 --- a/pkg/query-service/rules/rule_task.go +++ b/pkg/query-service/rules/rule_task.go @@ -349,7 +349,7 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) { } ctx = context.WithValue(ctx, common.LogCommentKey, kvs) - _, err := rule.Eval(ctx, ts, g.opts.Queriers) + _, err := rule.Eval(ctx, ts) if err != nil { rule.SetHealth(HealthBad) rule.SetLastError(err) diff --git a/pkg/query-service/rules/threshold_rule.go b/pkg/query-service/rules/threshold_rule.go index e50fb4b761..d35798035e 100644 --- a/pkg/query-service/rules/threshold_rule.go +++ b/pkg/query-service/rules/threshold_rule.go @@ -9,17 +9,13 @@ import ( "net/url" "regexp" "sort" - "sync" "text/template" "time" "unicode" "go.uber.org/zap" - "github.com/ClickHouse/clickhouse-go/v2" - "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "go.signoz.io/signoz/pkg/query-service/common" - "go.signoz.io/signoz/pkg/query-service/converter" "go.signoz.io/signoz/pkg/query-service/model" "go.signoz.io/signoz/pkg/query-service/postprocess" @@ -41,38 +37,7 @@ import ( ) type ThresholdRule struct { - id string - name string - source string - ruleCondition *RuleCondition - // evalWindow is the time window used for evaluating the rule - // i.e each time we lookback from the current time, we look at data for the last - // evalWindow duration - evalWindow time.Duration - // holdDuration is the duration for which the alert waits before firing - holdDuration time.Duration - // holds the static set of labels and annotations for the rule - // these are the same for all alerts created for this rule - labels labels.Labels - annotations labels.Labels - - // preferredChannels is the list of channels to send the alert to - // if the rule is triggered - preferredChannels []string - mtx sync.Mutex - - // the time it took to evaluate the rule - evaluationDuration time.Duration - // the timestamp of the last evaluation - evaluationTimestamp time.Time - - health RuleHealth - - lastError error - - // map of active alerts - active map[uint64]*Alert - + *BaseRule // Ever since we introduced the new metrics query builder, the version is "v4" // for all the rules // if the version is "v3", then we use the old querier @@ -84,80 +49,31 @@ type ThresholdRule struct { // should be fast but we can still avoid the query if we have the data in memory temporalityMap map[string]map[v3.Temporality]bool - opts ThresholdRuleOpts - - // lastTimestampWithDatapoints is the timestamp of the last datapoint we observed - // for this rule - // this is used for missing data alerts - lastTimestampWithDatapoints time.Time - - // Type of the rule - typ AlertType - // querier is used for alerts created before the introduction of new metrics query builder querier interfaces.Querier // querierV2 is used for alerts created after the introduction of new metrics query builder querierV2 interfaces.Querier - - reader interfaces.Reader - evalDelay time.Duration - - handledRestart bool -} - -type ThresholdRuleOpts struct { - // sendUnmatched sends observed metric values - // even if they dont match the rule condition. this is - // useful in testing the rule - SendUnmatched bool - - // sendAlways will send alert irresepective of resendDelay - // or other params - SendAlways bool - - // EvalDelay is the time to wait for data to be available - // before evaluating the rule. This is useful in scenarios - // where data might not be available in the system immediately - // after the timestamp. - EvalDelay time.Duration } func NewThresholdRule( id string, p *PostableRule, - opts ThresholdRuleOpts, featureFlags interfaces.FeatureLookup, reader interfaces.Reader, + opts ...RuleOption, ) (*ThresholdRule, error) { zap.L().Info("creating new ThresholdRule", zap.String("id", id), zap.Any("opts", opts)) - if p.RuleCondition == nil { - return nil, fmt.Errorf("no rule condition") - } else if !p.RuleCondition.IsValid() { - return nil, fmt.Errorf("invalid rule condition") + baseRule, err := NewBaseRule(id, p, reader, opts...) + if err != nil { + return nil, err } t := ThresholdRule{ - id: id, - name: p.AlertName, - source: p.Source, - ruleCondition: p.RuleCondition, - evalWindow: time.Duration(p.EvalWindow), - labels: labels.FromMap(p.Labels), - annotations: labels.FromMap(p.Annotations), - preferredChannels: p.PreferredChannels, - health: HealthUnknown, - active: map[uint64]*Alert{}, - opts: opts, - typ: p.AlertType, - version: p.Version, - temporalityMap: make(map[string]map[v3.Temporality]bool), - evalDelay: opts.EvalDelay, - } - - if int64(t.evalWindow) == 0 { - t.evalWindow = 5 * time.Minute + BaseRule: baseRule, + version: p.Version, + temporalityMap: make(map[string]map[v3.Temporality]bool), } querierOption := querier.QuerierOptions{ @@ -177,203 +93,15 @@ func NewThresholdRule( t.querier = querier.NewQuerier(querierOption) t.querierV2 = querierV2.NewQuerier(querierOptsV2) t.reader = reader - - zap.L().Info("creating new ThresholdRule", zap.String("name", t.name), zap.String("id", t.id)) - return &t, nil } -func (r *ThresholdRule) Name() string { - return r.name -} - -func (r *ThresholdRule) ID() string { - return r.id -} - -func (r *ThresholdRule) Condition() *RuleCondition { - return r.ruleCondition -} - -func (r *ThresholdRule) GeneratorURL() string { - return prepareRuleGeneratorURL(r.ID(), r.source) -} - -func (r *ThresholdRule) PreferredChannels() []string { - return r.preferredChannels -} - -// targetVal returns the target value for the rule condition -// when the y-axis and target units are non-empty, it -// converts the target value to the y-axis unit -func (r *ThresholdRule) targetVal() float64 { - if r.ruleCondition == nil || r.ruleCondition.Target == nil { - return 0 - } - - // get the converter for the target unit - unitConverter := converter.FromUnit(converter.Unit(r.ruleCondition.TargetUnit)) - // convert the target value to the y-axis unit - value := unitConverter.Convert(converter.Value{ - F: *r.ruleCondition.Target, - U: converter.Unit(r.ruleCondition.TargetUnit), - }, converter.Unit(r.Unit())) - - return value.F -} - -func (r *ThresholdRule) matchType() MatchType { - if r.ruleCondition == nil { - return AtleastOnce - } - return r.ruleCondition.MatchType -} - -func (r *ThresholdRule) compareOp() CompareOp { - if r.ruleCondition == nil { - return ValueIsEq - } - return r.ruleCondition.CompareOp -} - func (r *ThresholdRule) Type() RuleType { return RuleTypeThreshold } -func (r *ThresholdRule) SetLastError(err error) { - r.mtx.Lock() - defer r.mtx.Unlock() - r.lastError = err -} - -func (r *ThresholdRule) LastError() error { - r.mtx.Lock() - defer r.mtx.Unlock() - return r.lastError -} - -func (r *ThresholdRule) SetHealth(health RuleHealth) { - r.mtx.Lock() - defer r.mtx.Unlock() - r.health = health -} - -func (r *ThresholdRule) Health() RuleHealth { - r.mtx.Lock() - defer r.mtx.Unlock() - return r.health -} - -// SetEvaluationDuration updates evaluationDuration to the duration it took to evaluate the rule on its last evaluation. -func (r *ThresholdRule) SetEvaluationDuration(dur time.Duration) { - r.mtx.Lock() - defer r.mtx.Unlock() - r.evaluationDuration = dur -} - -func (r *ThresholdRule) HoldDuration() time.Duration { - return r.holdDuration -} - -func (r *ThresholdRule) EvalWindow() time.Duration { - return r.evalWindow -} - -// Labels returns the labels of the alerting rule. -func (r *ThresholdRule) Labels() labels.BaseLabels { - return r.labels -} - -// Annotations returns the annotations of the alerting rule. -func (r *ThresholdRule) Annotations() labels.BaseLabels { - return r.annotations -} - -// GetEvaluationDuration returns the time in seconds it took to evaluate the alerting rule. -func (r *ThresholdRule) GetEvaluationDuration() time.Duration { - r.mtx.Lock() - defer r.mtx.Unlock() - return r.evaluationDuration -} - -// SetEvaluationTimestamp updates evaluationTimestamp to the timestamp of when the rule was last evaluated. -func (r *ThresholdRule) SetEvaluationTimestamp(ts time.Time) { - r.mtx.Lock() - defer r.mtx.Unlock() - r.evaluationTimestamp = ts -} - -// GetEvaluationTimestamp returns the time the evaluation took place. -func (r *ThresholdRule) GetEvaluationTimestamp() time.Time { - r.mtx.Lock() - defer r.mtx.Unlock() - return r.evaluationTimestamp -} - -// State returns the maximum state of alert instances for this rule. -// StateFiring > StatePending > StateInactive -func (r *ThresholdRule) State() model.AlertState { - - maxState := model.StateInactive - for _, a := range r.active { - if a.State > maxState { - maxState = a.State - } - } - return maxState -} - -func (r *ThresholdRule) currentAlerts() []*Alert { - r.mtx.Lock() - defer r.mtx.Unlock() - - alerts := make([]*Alert, 0, len(r.active)) - - for _, a := range r.active { - anew := *a - alerts = append(alerts, &anew) - } - return alerts -} - -func (r *ThresholdRule) ActiveAlerts() []*Alert { - var res []*Alert - for _, a := range r.currentAlerts() { - if a.ResolvedAt.IsZero() { - res = append(res, a) - } - } - return res -} - -func (r *ThresholdRule) FetchTemporality(ctx context.Context, metricNames []string, ch driver.Conn) (map[string]map[v3.Temporality]bool, error) { - - metricNameToTemporality := make(map[string]map[v3.Temporality]bool) - - query := fmt.Sprintf(`SELECT DISTINCT metric_name, temporality FROM %s.%s WHERE metric_name IN $1`, constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME) - - rows, err := ch.Query(ctx, query, metricNames) - if err != nil { - return nil, err - } - defer rows.Close() - - for rows.Next() { - var metricName, temporality string - err := rows.Scan(&metricName, &temporality) - if err != nil { - return nil, err - } - if _, ok := metricNameToTemporality[metricName]; !ok { - metricNameToTemporality[metricName] = make(map[v3.Temporality]bool) - } - metricNameToTemporality[metricName][v3.Temporality(temporality)] = true - } - return metricNameToTemporality, nil -} - // populateTemporality same as addTemporality but for v4 and better -func (r *ThresholdRule) populateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3, ch driver.Conn) error { +func (r *ThresholdRule) populateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error { missingTemporality := make([]string, 0) metricNameToTemporality := make(map[string]map[v3.Temporality]bool) @@ -405,7 +133,7 @@ func (r *ThresholdRule) populateTemporality(ctx context.Context, qp *v3.QueryRan var err error if len(missingTemporality) > 0 { - nameToTemporality, err = r.FetchTemporality(ctx, missingTemporality, ch) + nameToTemporality, err = r.reader.FetchTemporality(ctx, missingTemporality) if err != nil { return err } @@ -429,52 +157,13 @@ func (r *ThresholdRule) populateTemporality(ctx context.Context, qp *v3.QueryRan return nil } -// ForEachActiveAlert runs the given function on each alert. -// This should be used when you want to use the actual alerts from the ThresholdRule -// and not on its copy. -// If you want to run on a copy of alerts then don't use this, get the alerts from 'ActiveAlerts()'. -func (r *ThresholdRule) ForEachActiveAlert(f func(*Alert)) { - r.mtx.Lock() - defer r.mtx.Unlock() - - for _, a := range r.active { - f(a) - } -} - -func (r *ThresholdRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) { - alerts := []*Alert{} - r.ForEachActiveAlert(func(alert *Alert) { - if r.opts.SendAlways || alert.needsSending(ts, resendDelay) { - alert.LastSentAt = ts - // Allow for two Eval or Alertmanager send failures. - delta := resendDelay - if interval > resendDelay { - delta = interval - } - alert.ValidUntil = ts.Add(4 * delta) - anew := *alert - alerts = append(alerts, &anew) - } else { - zap.L().Debug("skipping send alert due to resend delay", zap.String("rule", r.Name()), zap.Any("alert", alert.Labels)) - } - }) - notifyFunc(ctx, "", alerts...) -} - -func (r *ThresholdRule) Unit() string { - if r.ruleCondition != nil && r.ruleCondition.CompositeQuery != nil { - return r.ruleCondition.CompositeQuery.Unit - } - return "" -} - -func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 { +func (r *ThresholdRule) prepareQueryRange(ts time.Time) (*v3.QueryRangeParamsV3, error) { zap.L().Info("prepareQueryRange", zap.Int64("ts", ts.UnixMilli()), zap.Int64("evalWindow", r.evalWindow.Milliseconds()), zap.Int64("evalDelay", r.evalDelay.Milliseconds())) start := ts.Add(-time.Duration(r.evalWindow)).UnixMilli() end := ts.UnixMilli() + if r.evalDelay > 0 { start = start - int64(r.evalDelay.Milliseconds()) end = end - int64(r.evalDelay.Milliseconds()) @@ -507,16 +196,12 @@ func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 { tmpl := template.New("clickhouse-query") tmpl, err := tmpl.Parse(chQuery.Query) if err != nil { - zap.L().Error("failed to parse clickhouse query to populate vars", zap.String("ruleid", r.ID()), zap.Error(err)) - r.SetHealth(HealthBad) - return params + return nil, err } var query bytes.Buffer err = tmpl.Execute(&query, params.Variables) if err != nil { - zap.L().Error("failed to populate clickhouse query", zap.String("ruleid", r.ID()), zap.Error(err)) - r.SetHealth(HealthBad) - return params + return nil, err } params.CompositeQuery.ClickHouseQueries[name] = &v3.ClickHouseQuery{ Query: query.String(), @@ -524,7 +209,7 @@ func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 { Legend: chQuery.Legend, } } - return params + return params, nil } if r.ruleCondition.CompositeQuery != nil && r.ruleCondition.CompositeQuery.BuilderQueries != nil { @@ -548,7 +233,7 @@ func (r *ThresholdRule) prepareQueryRange(ts time.Time) *v3.QueryRangeParamsV3 { CompositeQuery: r.ruleCondition.CompositeQuery, Variables: make(map[string]interface{}, 0), NoCache: true, - } + }, nil } // The following function is used to prepare the where clause for the query @@ -625,7 +310,10 @@ func (r *ThresholdRule) prepareLinksToLogs(ts time.Time, lbls labels.Labels) str return "" } - q := r.prepareQueryRange(ts) + q, err := r.prepareQueryRange(ts) + if err != nil { + return "" + } // Logs list view expects time in milliseconds tr := v3.URLShareableTimeRange{ Start: q.Start, @@ -689,7 +377,10 @@ func (r *ThresholdRule) prepareLinksToTraces(ts time.Time, lbls labels.Labels) s return "" } - q := r.prepareQueryRange(ts) + q, err := r.prepareQueryRange(ts) + if err != nil { + return "" + } // Traces list view expects time in nanoseconds tr := v3.URLShareableTimeRange{ Start: q.Start * time.Second.Microseconds(), @@ -745,17 +436,6 @@ func (r *ThresholdRule) prepareLinksToTraces(ts time.Time, lbls labels.Labels) s return fmt.Sprintf("compositeQuery=%s&timeRange=%s&startTime=%d&endTime=%d&options=%s", compositeQuery, urlEncodedTimeRange, tr.Start, tr.End, urlEncodedOptions) } -func (r *ThresholdRule) hostFromSource() string { - parsedUrl, err := url.Parse(r.source) - if err != nil { - return "" - } - if parsedUrl.Port() != "" { - return fmt.Sprintf("%s://%s:%s", parsedUrl.Scheme, parsedUrl.Hostname(), parsedUrl.Port()) - } - return fmt.Sprintf("%s://%s", parsedUrl.Scheme, parsedUrl.Hostname()) -} - func (r *ThresholdRule) GetSelectedQuery() string { if r.ruleCondition != nil { if r.ruleCondition.SelectedQuery != "" { @@ -797,18 +477,14 @@ func (r *ThresholdRule) GetSelectedQuery() string { return "" } -func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time, ch clickhouse.Conn) (Vector, error) { - if r.ruleCondition == nil || r.ruleCondition.CompositeQuery == nil { - r.SetHealth(HealthBad) - r.SetLastError(fmt.Errorf("no rule condition")) - return nil, fmt.Errorf("invalid rule condition") - } +func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time) (Vector, error) { - params := r.prepareQueryRange(ts) - err := r.populateTemporality(ctx, params, ch) + params, err := r.prepareQueryRange(ts) + if err != nil { + return nil, err + } + err = r.populateTemporality(ctx, params) if err != nil { - r.SetHealth(HealthBad) - zap.L().Error("failed to set temporality", zap.String("rule", r.Name()), zap.Error(err)) return nil, fmt.Errorf("internal error while setting temporality") } @@ -822,24 +498,22 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time, ch c } var results []*v3.Result - var errQuriesByName map[string]error + var queryErrors map[string]error if r.version == "v4" { - results, errQuriesByName, err = r.querierV2.QueryRange(ctx, params, map[string]v3.AttributeKey{}) + results, queryErrors, err = r.querierV2.QueryRange(ctx, params, map[string]v3.AttributeKey{}) } else { - results, errQuriesByName, err = r.querier.QueryRange(ctx, params, map[string]v3.AttributeKey{}) + results, queryErrors, err = r.querier.QueryRange(ctx, params, map[string]v3.AttributeKey{}) } if err != nil { - zap.L().Error("failed to get alert query result", zap.String("rule", r.Name()), zap.Error(err), zap.Any("queries", errQuriesByName)) - r.SetHealth(HealthBad) + zap.L().Error("failed to get alert query result", zap.String("rule", r.Name()), zap.Error(err), zap.Any("errors", queryErrors)) return nil, fmt.Errorf("internal error while querying") } if params.CompositeQuery.QueryType == v3.QueryTypeBuilder { results, err = postprocess.PostProcessResult(results, params) if err != nil { - r.SetHealth(HealthBad) zap.L().Error("failed to post process result", zap.String("rule", r.Name()), zap.Error(err)) return nil, fmt.Errorf("internal error while post processing") } @@ -901,113 +575,14 @@ func normalizeLabelName(name string) string { return normalized } -// TODO(srikanthccv): implement base rule and use for all types of rules -func (r *ThresholdRule) recordRuleStateHistory(ctx context.Context, prevState, currentState model.AlertState, itemsToAdd []v3.RuleStateHistory) error { - zap.L().Debug("recording rule state history", zap.String("ruleid", r.ID()), zap.Any("prevState", prevState), zap.Any("currentState", currentState), zap.Any("itemsToAdd", itemsToAdd)) - revisedItemsToAdd := map[uint64]v3.RuleStateHistory{} - - lastSavedState, err := r.reader.GetLastSavedRuleStateHistory(ctx, r.ID()) - if err != nil { - return err - } - // if the query-service has been restarted, or the rule has been modified (which re-initializes the rule), - // the state would reset so we need to add the corresponding state changes to previously saved states - if !r.handledRestart && len(lastSavedState) > 0 { - zap.L().Debug("handling restart", zap.String("ruleid", r.ID()), zap.Any("lastSavedState", lastSavedState)) - l := map[uint64]v3.RuleStateHistory{} - for _, item := range itemsToAdd { - l[item.Fingerprint] = item - } - - shouldSkip := map[uint64]bool{} - - for _, item := range lastSavedState { - // for the last saved item with fingerprint, check if there is a corresponding entry in the current state - currentState, ok := l[item.Fingerprint] - if !ok { - // there was a state change in the past, but not in the current state - // if the state was firing, then we should add a resolved state change - if item.State == model.StateFiring || item.State == model.StateNoData { - item.State = model.StateInactive - item.StateChanged = true - item.UnixMilli = time.Now().UnixMilli() - revisedItemsToAdd[item.Fingerprint] = item - } - // there is nothing to do if the prev state was normal - } else { - if item.State != currentState.State { - item.State = currentState.State - item.StateChanged = true - item.UnixMilli = time.Now().UnixMilli() - revisedItemsToAdd[item.Fingerprint] = item - } - } - // do not add this item to revisedItemsToAdd as it is already processed - shouldSkip[item.Fingerprint] = true - } - zap.L().Debug("after lastSavedState loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) - - // if there are any new state changes that were not saved, add them to the revised items - for _, item := range itemsToAdd { - if _, ok := revisedItemsToAdd[item.Fingerprint]; !ok && !shouldSkip[item.Fingerprint] { - revisedItemsToAdd[item.Fingerprint] = item - } - } - zap.L().Debug("after itemsToAdd loop", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) - - newState := model.StateInactive - for _, item := range revisedItemsToAdd { - if item.State == model.StateFiring || item.State == model.StateNoData { - newState = model.StateFiring - break - } - } - zap.L().Debug("newState", zap.String("ruleid", r.ID()), zap.Any("newState", newState)) - - // if there is a change in the overall state, update the overall state - if lastSavedState[0].OverallState != newState { - for fingerprint, item := range revisedItemsToAdd { - item.OverallState = newState - item.OverallStateChanged = true - revisedItemsToAdd[fingerprint] = item - } - } - zap.L().Debug("revisedItemsToAdd after newState", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) - - } else { - for _, item := range itemsToAdd { - revisedItemsToAdd[item.Fingerprint] = item - } - } - - if len(revisedItemsToAdd) > 0 && r.reader != nil { - zap.L().Debug("writing rule state history", zap.String("ruleid", r.ID()), zap.Any("revisedItemsToAdd", revisedItemsToAdd)) - - entries := make([]v3.RuleStateHistory, 0, len(revisedItemsToAdd)) - for _, item := range revisedItemsToAdd { - entries = append(entries, item) - } - err := r.reader.AddRuleStateHistory(ctx, entries) - if err != nil { - zap.L().Error("error while inserting rule state history", zap.Error(err), zap.Any("itemsToAdd", itemsToAdd)) - } - } - r.handledRestart = true - - return nil -} - -func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (interface{}, error) { +func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time) (interface{}, error) { prevState := r.State() valueFormatter := formatter.FromUnit(r.Unit()) - res, err := r.buildAndRunQuery(ctx, ts, queriers.Ch) + res, err := r.buildAndRunQuery(ctx, ts) if err != nil { - r.SetHealth(HealthBad) - r.SetLastError(err) - zap.L().Error("failure in buildAndRunQuery", zap.String("ruleid", r.ID()), zap.Error(err)) return nil, err } @@ -1054,17 +629,17 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie lb := labels.NewBuilder(smpl.Metric).Del(labels.MetricNameLabel).Del(labels.TemporalityLabel) resultLabels := labels.NewBuilder(smpl.MetricOrig).Del(labels.MetricNameLabel).Del(labels.TemporalityLabel).Labels() - for _, l := range r.labels { - lb.Set(l.Name, expand(l.Value)) + for name, value := range r.labels.Map() { + lb.Set(name, expand(value)) } lb.Set(labels.AlertNameLabel, r.Name()) lb.Set(labels.AlertRuleIdLabel, r.ID()) lb.Set(labels.RuleSourceLabel, r.GeneratorURL()) - annotations := make(labels.Labels, 0, len(r.annotations)) - for _, a := range r.annotations { - annotations = append(annotations, labels.Label{Name: normalizeLabelName(a.Name), Value: expand(a.Value)}) + annotations := make(labels.Labels, 0, len(r.annotations.Map())) + for name, value := range r.annotations.Map() { + annotations = append(annotations, labels.Label{Name: normalizeLabelName(name), Value: expand(value)}) } if smpl.IsMissing { lb.Set(labels.AlertNameLabel, "[No data] "+r.Name()) @@ -1092,10 +667,6 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie if _, ok := alerts[h]; ok { zap.L().Error("the alert query returns duplicate records", zap.String("ruleid", r.ID()), zap.Any("alert", alerts[h])) err = fmt.Errorf("duplicate alert found, vector contains metrics with the same labelset after applying alert labels") - // We have already acquired the lock above hence using SetHealth and - // SetLastError will deadlock. - r.health = HealthBad - r.lastError = err return nil, err } @@ -1112,7 +683,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie } } - zap.L().Info("alerts found", zap.String("name", r.Name()), zap.Int("count", len(alerts))) + zap.L().Info("number of alerts found", zap.String("name", r.Name()), zap.Int("count", len(alerts))) // alerts[h] is ready, add or update active list now for h, a := range alerts { @@ -1127,7 +698,6 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie } r.active[h] = a - } itemsToAdd := []v3.RuleStateHistory{} @@ -1190,7 +760,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie itemsToAdd[idx] = item } - r.recordRuleStateHistory(ctx, prevState, currentState, itemsToAdd) + r.RecordRuleStateHistory(ctx, prevState, currentState, itemsToAdd) r.health = HealthGood r.lastError = err @@ -1226,179 +796,3 @@ func removeGroupinSetPoints(series v3.Series) []v3.Point { } return result } - -func (r *ThresholdRule) shouldAlert(series v3.Series) (Sample, bool) { - var alertSmpl Sample - var shouldAlert bool - var lbls labels.Labels - var lblsNormalized labels.Labels - - for name, value := range series.Labels { - lbls = append(lbls, labels.Label{Name: name, Value: value}) - lblsNormalized = append(lblsNormalized, labels.Label{Name: normalizeLabelName(name), Value: value}) - } - - series.Points = removeGroupinSetPoints(series) - - // nothing to evaluate - if len(series.Points) == 0 { - return alertSmpl, false - } - - switch r.matchType() { - case AtleastOnce: - // If any sample matches the condition, the rule is firing. - if r.compareOp() == ValueIsAbove { - for _, smpl := range series.Points { - if smpl.Value > r.targetVal() { - alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls} - shouldAlert = true - break - } - } - } else if r.compareOp() == ValueIsBelow { - for _, smpl := range series.Points { - if smpl.Value < r.targetVal() { - alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls} - shouldAlert = true - break - } - } - } else if r.compareOp() == ValueIsEq { - for _, smpl := range series.Points { - if smpl.Value == r.targetVal() { - alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls} - shouldAlert = true - break - } - } - } else if r.compareOp() == ValueIsNotEq { - for _, smpl := range series.Points { - if smpl.Value != r.targetVal() { - alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls} - shouldAlert = true - break - } - } - } - case AllTheTimes: - // If all samples match the condition, the rule is firing. - shouldAlert = true - alertSmpl = Sample{Point: Point{V: r.targetVal()}, Metric: lblsNormalized, MetricOrig: lbls} - if r.compareOp() == ValueIsAbove { - for _, smpl := range series.Points { - if smpl.Value <= r.targetVal() { - shouldAlert = false - break - } - } - // use min value from the series - if shouldAlert { - var minValue float64 = math.Inf(1) - for _, smpl := range series.Points { - if smpl.Value < minValue { - minValue = smpl.Value - } - } - alertSmpl = Sample{Point: Point{V: minValue}, Metric: lblsNormalized, MetricOrig: lbls} - } - } else if r.compareOp() == ValueIsBelow { - for _, smpl := range series.Points { - if smpl.Value >= r.targetVal() { - shouldAlert = false - break - } - } - if shouldAlert { - var maxValue float64 = math.Inf(-1) - for _, smpl := range series.Points { - if smpl.Value > maxValue { - maxValue = smpl.Value - } - } - alertSmpl = Sample{Point: Point{V: maxValue}, Metric: lblsNormalized, MetricOrig: lbls} - } - } else if r.compareOp() == ValueIsEq { - for _, smpl := range series.Points { - if smpl.Value != r.targetVal() { - shouldAlert = false - break - } - } - } else if r.compareOp() == ValueIsNotEq { - for _, smpl := range series.Points { - if smpl.Value == r.targetVal() { - shouldAlert = false - break - } - } - // use any non-inf or nan value from the series - if shouldAlert { - for _, smpl := range series.Points { - if !math.IsInf(smpl.Value, 0) && !math.IsNaN(smpl.Value) { - alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lblsNormalized, MetricOrig: lbls} - break - } - } - } - } - case OnAverage: - // If the average of all samples matches the condition, the rule is firing. - var sum, count float64 - for _, smpl := range series.Points { - if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) { - continue - } - sum += smpl.Value - count++ - } - avg := sum / count - alertSmpl = Sample{Point: Point{V: avg}, Metric: lblsNormalized, MetricOrig: lbls} - if r.compareOp() == ValueIsAbove { - if avg > r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsBelow { - if avg < r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsEq { - if avg == r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsNotEq { - if avg != r.targetVal() { - shouldAlert = true - } - } - case InTotal: - // If the sum of all samples matches the condition, the rule is firing. - var sum float64 - - for _, smpl := range series.Points { - if math.IsNaN(smpl.Value) || math.IsInf(smpl.Value, 0) { - continue - } - sum += smpl.Value - } - alertSmpl = Sample{Point: Point{V: sum}, Metric: lblsNormalized, MetricOrig: lbls} - if r.compareOp() == ValueIsAbove { - if sum > r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsBelow { - if sum < r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsEq { - if sum == r.targetVal() { - shouldAlert = true - } - } else if r.compareOp() == ValueIsNotEq { - if sum != r.targetVal() { - shouldAlert = true - } - } - } - return alertSmpl, shouldAlert -} diff --git a/pkg/query-service/rules/threshold_rule_test.go b/pkg/query-service/rules/threshold_rule_test.go index 6cfeac83d9..734347793d 100644 --- a/pkg/query-service/rules/threshold_rule_test.go +++ b/pkg/query-service/rules/threshold_rule_test.go @@ -685,7 +685,7 @@ func TestThresholdRuleShouldAlert(t *testing.T) { postableRule.RuleCondition.MatchType = MatchType(c.matchType) postableRule.RuleCondition.Target = &c.target - rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{EvalDelay: 2 * time.Minute}, fm, nil) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -774,7 +774,7 @@ func TestPrepareLinksToLogs(t *testing.T) { } fm := featureManager.StartManager() - rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{EvalDelay: 2 * time.Minute}, fm, nil) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -816,7 +816,7 @@ func TestPrepareLinksToTraces(t *testing.T) { } fm := featureManager.StartManager() - rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{EvalDelay: 2 * time.Minute}, fm, nil) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -892,7 +892,7 @@ func TestThresholdRuleLabelNormalization(t *testing.T) { postableRule.RuleCondition.MatchType = MatchType(c.matchType) postableRule.RuleCondition.Target = &c.target - rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{EvalDelay: 2 * time.Minute}, fm, nil) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -945,17 +945,17 @@ func TestThresholdRuleEvalDelay(t *testing.T) { fm := featureManager.StartManager() for idx, c := range cases { - rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, nil) // no eval delay + rule, err := NewThresholdRule("69", &postableRule, fm, nil) // no eval delay if err != nil { assert.NoError(t, err) } - params := rule.prepareQueryRange(ts) - + params, err := rule.prepareQueryRange(ts) + assert.NoError(t, err) assert.Equal(t, c.expectedQuery, params.CompositeQuery.ClickHouseQueries["A"].Query, "Test case %d", idx) - secondTimeParams := rule.prepareQueryRange(ts) - + secondTimeParams, err := rule.prepareQueryRange(ts) + assert.NoError(t, err) assert.Equal(t, c.expectedQuery, secondTimeParams.CompositeQuery.ClickHouseQueries["A"].Query, "Test case %d", idx) } } @@ -994,17 +994,17 @@ func TestThresholdRuleClickHouseTmpl(t *testing.T) { fm := featureManager.StartManager() for idx, c := range cases { - rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{EvalDelay: 2 * time.Minute}, fm, nil) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } - params := rule.prepareQueryRange(ts) - + params, err := rule.prepareQueryRange(ts) + assert.NoError(t, err) assert.Equal(t, c.expectedQuery, params.CompositeQuery.ClickHouseQueries["A"].Query, "Test case %d", idx) - secondTimeParams := rule.prepareQueryRange(ts) - + secondTimeParams, err := rule.prepareQueryRange(ts) + assert.NoError(t, err) assert.Equal(t, c.expectedQuery, secondTimeParams.CompositeQuery.ClickHouseQueries["A"].Query, "Test case %d", idx) } } @@ -1137,7 +1137,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) { options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "") - rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, reader) + rule, err := NewThresholdRule("69", &postableRule, fm, reader) rule.temporalityMap = map[string]map[v3.Temporality]bool{ "signoz_calls_total": { v3.Delta: true, @@ -1147,11 +1147,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) { assert.NoError(t, err) } - queriers := Queriers{ - Ch: mock, - } - - retVal, err := rule.Eval(context.Background(), time.Now(), &queriers) + retVal, err := rule.Eval(context.Background(), time.Now()) if err != nil { assert.NoError(t, err) } @@ -1240,7 +1236,7 @@ func TestThresholdRuleNoData(t *testing.T) { options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "") - rule, err := NewThresholdRule("69", &postableRule, ThresholdRuleOpts{}, fm, reader) + rule, err := NewThresholdRule("69", &postableRule, fm, reader) rule.temporalityMap = map[string]map[v3.Temporality]bool{ "signoz_calls_total": { v3.Delta: true, @@ -1250,11 +1246,7 @@ func TestThresholdRuleNoData(t *testing.T) { assert.NoError(t, err) } - queriers := Queriers{ - Ch: mock, - } - - retVal, err := rule.Eval(context.Background(), time.Now(), &queriers) + retVal, err := rule.Eval(context.Background(), time.Now()) if err != nil { assert.NoError(t, err) } diff --git a/pkg/query-service/telemetry/telemetry.go b/pkg/query-service/telemetry/telemetry.go index 88f3a09542..7f282ea3f9 100644 --- a/pkg/query-service/telemetry/telemetry.go +++ b/pkg/query-service/telemetry/telemetry.go @@ -204,11 +204,19 @@ func createTelemetry() { return } - telemetry = &Telemetry{ - ossOperator: analytics.New(api_key), - ipAddress: getOutboundIP(), - rateLimits: make(map[string]int8), - activeUser: make(map[string]int8), + if constants.IsOSSTelemetryEnabled() { + telemetry = &Telemetry{ + ossOperator: analytics.New(api_key), + ipAddress: getOutboundIP(), + rateLimits: make(map[string]int8), + activeUser: make(map[string]int8), + } + } else { + telemetry = &Telemetry{ + ipAddress: getOutboundIP(), + rateLimits: make(map[string]int8), + activeUser: make(map[string]int8), + } } telemetry.minRandInt = 0 telemetry.maxRandInt = int(1 / DEFAULT_SAMPLING) @@ -484,16 +492,18 @@ func (a *Telemetry) IdentifyUser(user *model.User) { }) } - a.ossOperator.Enqueue(analytics.Identify{ - UserId: a.ipAddress, - Traits: analytics.NewTraits().SetName(user.Name).SetEmail(user.Email).Set("ip", a.ipAddress), - }) - // Updating a groups properties - a.ossOperator.Enqueue(analytics.Group{ - UserId: a.ipAddress, - GroupId: a.getCompanyDomain(), - Traits: analytics.NewTraits().Set("company_domain", a.getCompanyDomain()), - }) + if a.ossOperator != nil { + a.ossOperator.Enqueue(analytics.Identify{ + UserId: a.ipAddress, + Traits: analytics.NewTraits().SetName(user.Name).SetEmail(user.Email).Set("ip", a.ipAddress), + }) + // Updating a groups properties + a.ossOperator.Enqueue(analytics.Group{ + UserId: a.ipAddress, + GroupId: a.getCompanyDomain(), + Traits: analytics.NewTraits().Set("company_domain", a.getCompanyDomain()), + }) + } } func (a *Telemetry) SetUserEmail(email string) {