Skip to content

Commit

Permalink
chore: alert state change and overall status (#5845)
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv committed Sep 9, 2024
1 parent 74c994f commit 3e32dab
Show file tree
Hide file tree
Showing 14 changed files with 506 additions and 187 deletions.
154 changes: 137 additions & 17 deletions pkg/query-service/app/clickhouseReader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const (
archiveNamespace = "clickhouse-archive"
signozTraceDBName = "signoz_traces"
signozHistoryDBName = "signoz_analytics"
ruleStateHistoryTableName = "distributed_rule_state_history"
ruleStateHistoryTableName = "distributed_rule_state_history_v0"
signozDurationMVTable = "distributed_durationSort"
signozUsageExplorerTable = "distributed_usage_explorer"
signozSpansTable = "distributed_signoz_spans"
Expand Down Expand Up @@ -5332,6 +5332,18 @@ func (r *ClickHouseReader) AddRuleStateHistory(ctx context.Context, ruleStateHis
return nil
}

func (r *ClickHouseReader) GetLastSavedRuleStateHistory(ctx context.Context, ruleID string) ([]v3.RuleStateHistory, error) {
query := fmt.Sprintf("SELECT * FROM %s.%s WHERE rule_id = '%s' AND state_changed = true ORDER BY unix_milli DESC LIMIT 1 BY fingerprint",
signozHistoryDBName, ruleStateHistoryTableName, ruleID)

history := []v3.RuleStateHistory{}
err := r.db.Select(ctx, &history, query)
if err != nil {
return nil, err
}
return history, nil
}

func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID(
ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.RuleStateTimeline, error) {

Expand Down Expand Up @@ -5397,22 +5409,51 @@ func (r *ClickHouseReader) ReadRuleStateHistoryByRuleID(
signozHistoryDBName, ruleStateHistoryTableName, whereClause, params.Order, params.Limit, params.Offset)

history := []v3.RuleStateHistory{}
zap.L().Debug("rule state history query", zap.String("query", query))
err := r.db.Select(ctx, &history, query)
if err != nil {
zap.L().Error("Error while reading rule state history", zap.Error(err))
return nil, err
}

var total uint64
zap.L().Debug("rule state history total query", zap.String("query", fmt.Sprintf("SELECT count(*) FROM %s.%s WHERE %s",
signozHistoryDBName, ruleStateHistoryTableName, whereClause)))
err = r.db.QueryRow(ctx, fmt.Sprintf("SELECT count(*) FROM %s.%s WHERE %s",
signozHistoryDBName, ruleStateHistoryTableName, whereClause)).Scan(&total)
if err != nil {
return nil, err
}

labelsQuery := fmt.Sprintf("SELECT DISTINCT labels FROM %s.%s WHERE rule_id = $1",
signozHistoryDBName, ruleStateHistoryTableName)
rows, err := r.db.Query(ctx, labelsQuery, ruleID)
if err != nil {
return nil, err
}
defer rows.Close()

labelsMap := make(map[string][]string)
for rows.Next() {
var rawLabel string
err = rows.Scan(&rawLabel)
if err != nil {
return nil, err
}
label := map[string]string{}
err = json.Unmarshal([]byte(rawLabel), &label)
if err != nil {
return nil, err
}
for k, v := range label {
labelsMap[k] = append(labelsMap[k], v)
}
}

timeline := &v3.RuleStateTimeline{
Items: history,
Total: total,
Items: history,
Total: total,
Labels: labelsMap,
}

return timeline, nil
Expand All @@ -5425,11 +5466,13 @@ func (r *ClickHouseReader) ReadRuleStateHistoryTopContributorsByRuleID(
any(labels) as labels,
count(*) as count
FROM %s.%s
WHERE rule_id = '%s' AND (state_changed = true) AND (state = 'firing') AND unix_milli >= %d AND unix_milli <= %d
WHERE rule_id = '%s' AND (state_changed = true) AND (state = '%s') AND unix_milli >= %d AND unix_milli <= %d
GROUP BY fingerprint
HAVING labels != '{}'
ORDER BY count DESC`,
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End)
signozHistoryDBName, ruleStateHistoryTableName, ruleID, model.StateFiring.String(), params.Start, params.End)

zap.L().Debug("rule state history top contributors query", zap.String("query", query))
contributors := []v3.RuleStateHistoryContributor{}
err := r.db.Select(ctx, &contributors, query)
if err != nil {
Expand All @@ -5440,15 +5483,15 @@ func (r *ClickHouseReader) ReadRuleStateHistoryTopContributorsByRuleID(
return contributors, nil
}

func (r *ClickHouseReader) GetOverallStateTransitions(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateTransition, error) {
func (r *ClickHouseReader) GetOverallStateTransitions(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.ReleStateItem, error) {

tmpl := `WITH firing_events AS (
SELECT
rule_id,
state,
unix_milli AS firing_time
FROM %s.%s
WHERE overall_state = 'firing'
WHERE overall_state = '` + model.StateFiring.String() + `'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
Expand All @@ -5459,7 +5502,7 @@ resolution_events AS (
state,
unix_milli AS resolution_time
FROM %s.%s
WHERE overall_state = 'normal'
WHERE overall_state = '` + model.StateInactive.String() + `'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
Expand All @@ -5484,13 +5527,87 @@ ORDER BY firing_time ASC;`
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End,
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End)

zap.L().Debug("overall state transitions query", zap.String("query", query))

transitions := []v3.RuleStateTransition{}
err := r.db.Select(ctx, &transitions, query)
if err != nil {
return nil, err
}

return transitions, nil
stateItems := []v3.ReleStateItem{}

for idx, item := range transitions {
start := item.FiringTime
end := item.ResolutionTime
stateItems = append(stateItems, v3.ReleStateItem{
State: item.State,
Start: start,
End: end,
})
if idx < len(transitions)-1 {
nextStart := transitions[idx+1].FiringTime
if nextStart > end {
stateItems = append(stateItems, v3.ReleStateItem{
State: model.StateInactive,
Start: end,
End: nextStart,
})
}
}
}

// fetch the most recent overall_state from the table
var state model.AlertState
stateQuery := fmt.Sprintf("SELECT state FROM %s.%s WHERE rule_id = '%s' AND unix_milli <= %d ORDER BY unix_milli DESC LIMIT 1",
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.End)
if err := r.db.QueryRow(ctx, stateQuery).Scan(&state); err != nil {
if err != sql.ErrNoRows {
return nil, err
}
state = model.StateInactive
}

if len(transitions) == 0 {
// no transitions found, it is either firing or inactive for whole time range
stateItems = append(stateItems, v3.ReleStateItem{
State: state,
Start: params.Start,
End: params.End,
})
} else {
// there were some transitions, we need to add the last state at the end
if state == model.StateInactive {
stateItems = append(stateItems, v3.ReleStateItem{
State: model.StateInactive,
Start: transitions[len(transitions)-1].ResolutionTime,
End: params.End,
})
} else {
// fetch the most recent firing event from the table in the given time range
var firingTime int64
firingQuery := fmt.Sprintf(`
SELECT
unix_milli
FROM %s.%s
WHERE rule_id = '%s' AND overall_state_changed = true AND overall_state = '%s' AND unix_milli <= %d
ORDER BY unix_milli DESC LIMIT 1`, signozHistoryDBName, ruleStateHistoryTableName, ruleID, model.StateFiring.String(), params.End)
if err := r.db.QueryRow(ctx, firingQuery).Scan(&firingTime); err != nil {
return nil, err
}
stateItems = append(stateItems, v3.ReleStateItem{
State: model.StateInactive,
Start: transitions[len(transitions)-1].ResolutionTime,
End: firingTime,
})
stateItems = append(stateItems, v3.ReleStateItem{
State: model.StateFiring,
Start: firingTime,
End: params.End,
})
}
}
return stateItems, nil
}

func (r *ClickHouseReader) GetAvgResolutionTime(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (float64, error) {
Expand All @@ -5502,7 +5619,7 @@ WITH firing_events AS (
state,
unix_milli AS firing_time
FROM %s.%s
WHERE overall_state = 'firing'
WHERE overall_state = '` + model.StateFiring.String() + `'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
Expand All @@ -5513,7 +5630,7 @@ resolution_events AS (
state,
unix_milli AS resolution_time
FROM %s.%s
WHERE overall_state = 'normal'
WHERE overall_state = '` + model.StateInactive.String() + `'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
Expand All @@ -5538,6 +5655,7 @@ FROM matched_events;
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End,
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End)

zap.L().Debug("avg resolution time query", zap.String("query", query))
var avgResolutionTime float64
err := r.db.QueryRow(ctx, query).Scan(&avgResolutionTime)
if err != nil {
Expand All @@ -5558,7 +5676,7 @@ WITH firing_events AS (
state,
unix_milli AS firing_time
FROM %s.%s
WHERE overall_state = 'firing'
WHERE overall_state = '` + model.StateFiring.String() + `'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
Expand All @@ -5569,7 +5687,7 @@ resolution_events AS (
state,
unix_milli AS resolution_time
FROM %s.%s
WHERE overall_state = 'normal'
WHERE overall_state = '` + model.StateInactive.String() + `'
AND overall_state_changed = true
AND rule_id IN ('%s')
AND unix_milli >= %d AND unix_milli <= %d
Expand All @@ -5595,6 +5713,7 @@ ORDER BY ts ASC;`
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End,
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End, step)

zap.L().Debug("avg resolution time by interval query", zap.String("query", query))
result, err := r.GetTimeSeriesResultV3(ctx, query)
if err != nil || len(result) == 0 {
return nil, err
Expand All @@ -5604,10 +5723,11 @@ ORDER BY ts ASC;`
}

func (r *ClickHouseReader) GetTotalTriggers(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (uint64, error) {
query := fmt.Sprintf("SELECT count(*) FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = 'firing') AND unix_milli >= %d AND unix_milli <= %d",
signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End)
query := fmt.Sprintf("SELECT count(*) FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = '%s') AND unix_milli >= %d AND unix_milli <= %d",
signozHistoryDBName, ruleStateHistoryTableName, ruleID, model.StateFiring.String(), params.Start, params.End)

var totalTriggers uint64

err := r.db.QueryRow(ctx, query).Scan(&totalTriggers)
if err != nil {
return 0, err
Expand All @@ -5619,8 +5739,8 @@ func (r *ClickHouseReader) GetTotalTriggers(ctx context.Context, ruleID string,
func (r *ClickHouseReader) GetTriggersByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error) {
step := common.MinAllowedStepInterval(params.Start, params.End)

query := fmt.Sprintf("SELECT count(*), toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = 'firing') AND unix_milli >= %d AND unix_milli <= %d GROUP BY ts ORDER BY ts ASC",
step, signozHistoryDBName, ruleStateHistoryTableName, ruleID, params.Start, params.End)
query := fmt.Sprintf("SELECT count(*), toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts FROM %s.%s WHERE rule_id = '%s' AND (state_changed = true) AND (state = '%s') AND unix_milli >= %d AND unix_milli <= %d GROUP BY ts ORDER BY ts ASC",
step, signozHistoryDBName, ruleStateHistoryTableName, ruleID, model.StateFiring.String(), params.Start, params.End)

result, err := r.GetTimeSeriesResultV3(ctx, query)
if err != nil || len(result) == 0 {
Expand Down
24 changes: 1 addition & 23 deletions pkg/query-service/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,34 +746,12 @@ func (aH *APIHandler) getOverallStateTransitions(w http.ResponseWriter, r *http.
return
}

res, err := aH.reader.GetOverallStateTransitions(r.Context(), ruleID, &params)
stateItems, err := aH.reader.GetOverallStateTransitions(r.Context(), ruleID, &params)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}

stateItems := []v3.ReleStateItem{}

for idx, item := range res {
start := item.FiringTime
end := item.ResolutionTime
stateItems = append(stateItems, v3.ReleStateItem{
State: item.State,
Start: start,
End: end,
})
if idx < len(res)-1 {
nextStart := res[idx+1].FiringTime
if nextStart > end {
stateItems = append(stateItems, v3.ReleStateItem{
State: "normal",
Start: end,
End: nextStart,
})
}
}
}

aH.Respond(w, stateItems)
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/query-service/interfaces/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,15 @@ type Reader interface {
GetMetricMetadata(context.Context, string, string) (*v3.MetricMetadataResponse, error)

AddRuleStateHistory(ctx context.Context, ruleStateHistory []v3.RuleStateHistory) error
GetOverallStateTransitions(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateTransition, error)
GetOverallStateTransitions(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.ReleStateItem, error)
ReadRuleStateHistoryByRuleID(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.RuleStateTimeline, error)
GetTotalTriggers(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (uint64, error)
GetTriggersByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error)
GetAvgResolutionTime(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (float64, error)
GetAvgResolutionTimeByInterval(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) (*v3.Series, error)
ReadRuleStateHistoryTopContributorsByRuleID(ctx context.Context, ruleID string, params *v3.QueryRuleStateHistory) ([]v3.RuleStateHistoryContributor, error)
GetLastSavedRuleStateHistory(ctx context.Context, ruleID string) ([]v3.RuleStateHistory, error)

GetMinAndMaxTimestampForTraceID(ctx context.Context, traceID []string) (int64, int64, error)

// Query Progress tracking helpers.
Expand Down
10 changes: 5 additions & 5 deletions pkg/query-service/migrate/migate.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func ClickHouseMigrate(conn driver.Conn, cluster string) error {

database := "CREATE DATABASE IF NOT EXISTS signoz_analytics ON CLUSTER %s"

localTable := `CREATE TABLE IF NOT EXISTS signoz_analytics.rule_state_history ON CLUSTER %s
localTable := `CREATE TABLE IF NOT EXISTS signoz_analytics.rule_state_history_v0 ON CLUSTER %s
(
_retention_days UInt32 DEFAULT 180,
rule_id LowCardinality(String),
Expand All @@ -80,7 +80,7 @@ ORDER BY (rule_id, unix_milli)
TTL toDateTime(unix_milli / 1000) + toIntervalDay(_retention_days)
SETTINGS ttl_only_drop_parts = 1, index_granularity = 8192`

distributedTable := `CREATE TABLE IF NOT EXISTS signoz_analytics.distributed_rule_state_history ON CLUSTER %s
distributedTable := `CREATE TABLE IF NOT EXISTS signoz_analytics.distributed_rule_state_history_v0 ON CLUSTER %s
(
rule_id LowCardinality(String),
rule_name LowCardinality(String),
Expand All @@ -93,7 +93,7 @@ SETTINGS ttl_only_drop_parts = 1, index_granularity = 8192`
value Float64 CODEC(Gorilla, ZSTD(1)),
labels String CODEC(ZSTD(5)),
)
ENGINE = Distributed(%s, signoz_analytics, rule_state_history, cityHash64(rule_id, rule_name, fingerprint))`
ENGINE = Distributed(%s, signoz_analytics, rule_state_history_v0, cityHash64(rule_id, rule_name, fingerprint))`

// check if db exists
dbExists := `SELECT count(*) FROM system.databases WHERE name = 'signoz_analytics'`
Expand All @@ -111,7 +111,7 @@ ENGINE = Distributed(%s, signoz_analytics, rule_state_history, cityHash64(rule_i
}

// check if table exists
tableExists := `SELECT count(*) FROM system.tables WHERE name = 'rule_state_history' AND database = 'signoz_analytics'`
tableExists := `SELECT count(*) FROM system.tables WHERE name = 'rule_state_history_v0' AND database = 'signoz_analytics'`
var tableCount uint64
err = conn.QueryRow(context.Background(), tableExists).Scan(&tableCount)
if err != nil {
Expand All @@ -126,7 +126,7 @@ ENGINE = Distributed(%s, signoz_analytics, rule_state_history, cityHash64(rule_i
}

// check if distributed table exists
distributedTableExists := `SELECT count(*) FROM system.tables WHERE name = 'distributed_rule_state_history' AND database = 'signoz_analytics'`
distributedTableExists := `SELECT count(*) FROM system.tables WHERE name = 'distributed_rule_state_history_v0' AND database = 'signoz_analytics'`
var distributedTableCount uint64
err = conn.QueryRow(context.Background(), distributedTableExists).Scan(&distributedTableCount)
if err != nil {
Expand Down
Loading

0 comments on commit 3e32dab

Please sign in to comment.