Skip to content

Commit f902baf

Browse files
authored
Merge pull request #135 from opsre/fix-consumer
Fix consumer known issues
2 parents 96a5133 + 92d6b87 commit f902baf

File tree

7 files changed

+115
-75
lines changed

7 files changed

+115
-75
lines changed

.github/workflows/test-ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name: CI
33
on:
44
push:
55
branches:
6-
- '*/*'
6+
- '*'
77

88
jobs:
99
build:

alert/consumer/consumer.go

Lines changed: 83 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -54,48 +54,39 @@ func (ag *AlertGroups) AddAlert(alert *models.AlertCurEvent, noticeGroup []map[s
5454
ag.lock.Lock()
5555
defer ag.lock.Unlock()
5656

57-
// 查找Rule位置
58-
rulePos := sort.Search(len(ag.Rules), func(i int) bool {
59-
return ag.Rules[i].RuleID >= alert.RuleId
60-
})
57+
// 查找 Rule 位置
58+
rulePos := ag.getRuleNodePos(alert.RuleId)
6159

62-
// Rule存在时的处理,找到对应的规则组
60+
// Rule 存在时的处理,找到对应的规则组
6361
if rulePos < len(ag.Rules) && ag.Rules[rulePos].RuleID == alert.RuleId {
64-
groups := &ag.Rules[rulePos].Groups
65-
// 查找Group位置
66-
groupPos := sort.Search(len(*groups), func(i int) bool {
67-
return (*groups)[i].ID >= groupID
68-
})
62+
rule := &ag.Rules[rulePos]
6963

70-
if groupPos < len(*groups) && (*groups)[groupPos].ID == groupID {
64+
// 查找 Group 位置
65+
groupPos := ag.getGroupNodePos(rule, groupID)
66+
67+
if groupPos < len(rule.Groups) && (rule.Groups)[groupPos].ID == groupID {
7168
// 追加事件
72-
(*groups)[groupPos].Events = append((*groups)[groupPos].Events, alert)
69+
(rule.Groups)[groupPos].Events = append((rule.Groups)[groupPos].Events, alert)
7370
} else {
74-
// 插入新Group,新增空元素扩容切片
75-
*groups = append(*groups, EventsGroup{})
76-
// 将插入位置之后的元素后移,
77-
copy((*groups)[groupPos+1:], (*groups)[groupPos:])
78-
// 将数据赋值到新的空元素上
79-
(*groups)[groupPos] = EventsGroup{
71+
// 插入新数据
72+
rule.Groups = append(rule.Groups, EventsGroup{
8073
ID: groupID,
8174
Events: []*models.AlertCurEvent{alert},
82-
}
75+
})
8376
}
8477
return
8578
}
8679

8780
// 插入新Rule
88-
ag.Rules = append(ag.Rules, RulesGroup{})
89-
copy(ag.Rules[rulePos+1:], ag.Rules[rulePos:])
90-
ag.Rules[rulePos] = RulesGroup{
81+
ag.Rules = append(ag.Rules, RulesGroup{
9182
RuleID: alert.RuleId,
9283
Groups: []EventsGroup{
9384
{
9485
ID: groupID,
9586
Events: []*models.AlertCurEvent{alert},
9687
},
9788
},
98-
}
89+
})
9990
}
10091

10192
// generateGroupID 生成分组ID,每个规则可能会有多个分组(其分组通知),默认为 default,如果有匹配的分组则根据 key/value 计算一个 HASH值作为 ID。
@@ -114,6 +105,31 @@ func (ag *AlertGroups) generateGroupID(alert *models.AlertCurEvent, noticeGroupM
114105
return groupId
115106
}
116107

108+
// getRuleNodePos 获取 Rule 点位
109+
func (ag *AlertGroups) getRuleNodePos(ruleId string) int {
110+
// Rules 切片排序
111+
sort.Slice(ag.Rules, func(i, j int) bool {
112+
return ag.Rules[i].RuleID < ag.Rules[j].RuleID
113+
})
114+
115+
// 查找Rule位置
116+
return sort.Search(len(ag.Rules), func(i int) bool {
117+
return ag.Rules[i].RuleID >= ruleId
118+
})
119+
}
120+
121+
func (ag *AlertGroups) getGroupNodePos(rule *RulesGroup, groupId string) int {
122+
// Groups 切片排序
123+
sort.Slice(rule.Groups, func(i, j int) bool {
124+
return rule.Groups[i].ID < rule.Groups[j].ID
125+
})
126+
127+
// 查找Group位置
128+
return sort.Search(len(rule.Groups), func(i int) bool {
129+
return (rule.Groups)[i].ID >= groupId
130+
})
131+
}
132+
117133
func NewConsumerWork(ctx *ctx.Context) ConsumeInterface {
118134
return &Consume{
119135
ctx: ctx,
@@ -141,52 +157,52 @@ func (c *Consume) Stop(faultCenterId string) {
141157

142158
// Watch 启动 Consumer Watch 进程
143159
func (c *Consume) Watch(ctx context.Context, faultCenter models.FaultCenter) {
160+
taskChan := make(chan struct{}, 1)
144161
timer := time.NewTicker(time.Second * time.Duration(1))
145162
defer func() {
146163
timer.Stop()
147-
if r := recover(); r != nil {
148-
logc.Error(c.ctx.Ctx, fmt.Sprintf("Recovered from consumer watch goroutine panic: %s, FaultCenterName: %s, Id: %s", r, faultCenter.Name, faultCenter.ID))
149-
}
164+
//if r := recover(); r != nil {
165+
// logc.Error(c.ctx.Ctx, fmt.Sprintf("Recovered from consumer watch goroutine panic: %s, FaultCenterName: %s, Id: %s", r, faultCenter.Name, faultCenter.ID))
166+
//}
150167
}()
151168

152169
for {
153170
select {
154171
case <-timer.C:
155-
c.processSilenceRule(faultCenter)
156-
// 获取故障中心的所有告警事件
157-
data, err := c.ctx.Redis.Redis().HGetAll(faultCenter.GetFaultCenterKey()).Result()
158-
if err != nil {
159-
logc.Error(c.ctx.Ctx, fmt.Sprintf("从 Redis 中获取事件信息错误, faultCenterKey: %s, err: %s", faultCenter.GetFaultCenterKey(), err.Error()))
160-
return
161-
}
162-
// 事件过滤
163-
filterEvents := c.filterAlertEvents(faultCenter, data)
164-
// 事件分组
165-
var alertGroups AlertGroups
166-
c.alarmGrouping(faultCenter, &alertGroups, filterEvents)
167-
// 事件聚合
168-
aggEvents := c.alarmAggregation(faultCenter, &alertGroups)
169-
// 发送事件
170-
c.sendAlerts(faultCenter, aggEvents)
172+
// 处理任务信号量
173+
taskChan <- struct{}{}
174+
c.executeTask(faultCenter, taskChan)
171175
case <-ctx.Done():
172176
return
173177
}
174178
}
175179
}
176180

177-
func (c *Consume) wait(startAt, endAt int64, sem chan struct{}) {
178-
timer := time.NewTicker(time.Second * time.Duration(1))
179-
for {
180-
select {
181-
case <-timer.C:
182-
if (endAt - startAt) > 60 {
183-
time.Sleep(time.Millisecond * 200)
184-
sem <- struct{}{}
185-
} else {
186-
return
187-
}
188-
}
181+
// executeTask 执行具体的任务逻辑
182+
func (c *Consume) executeTask(faultCenter models.FaultCenter, taskChan chan struct{}) {
183+
defer func() {
184+
// 释放任务信号量
185+
<-taskChan
186+
}()
187+
// 处理静默规则
188+
c.processSilenceRule(faultCenter)
189+
// 获取故障中心的所有告警事件
190+
data, err := c.ctx.Redis.Redis().HGetAll(faultCenter.GetFaultCenterKey()).Result()
191+
if err != nil {
192+
logc.Error(c.ctx.Ctx, fmt.Sprintf("从 Redis 中获取事件信息错误, faultCenterKey: %s, err: %s", faultCenter.GetFaultCenterKey(), err.Error()))
193+
return
189194
}
195+
196+
// 事件过滤
197+
filterEvents := c.filterAlertEvents(faultCenter, data)
198+
// 事件分组
199+
var alertGroups AlertGroups
200+
c.alarmGrouping(faultCenter, &alertGroups, filterEvents)
201+
fmt.Println("alertGroups->", tools.JsonMarshal(alertGroups.Rules))
202+
// 事件聚合
203+
aggEvents := c.alarmAggregation(faultCenter, &alertGroups)
204+
// 发送事件
205+
c.sendAlerts(faultCenter, aggEvents)
190206
}
191207

192208
// filterAlertEvents 过滤告警事件
@@ -226,23 +242,31 @@ func (c *Consume) isMutedEvent(event *models.AlertCurEvent, faultCenter models.F
226242

227243
// validateEvent 事件验证
228244
func (c *Consume) validateEvent(event *models.AlertCurEvent, faultCenter models.FaultCenter) bool {
229-
if event.State == "Pending" {
230-
return false
231-
}
232-
233245
return event.IsRecovered || event.LastSendTime == 0 ||
234246
event.LastEvalTime >= event.LastSendTime+faultCenter.RepeatNoticeInterval*60
235247
}
236248

237249
// alarmGrouping 告警分组
250+
// 分组会进行 2 次分类
251+
// 第一次是状态(用于区分事件是告警或恢复,用于后续聚合逻辑,避免告警和恢复聚合到一起)
252+
// 第二次是规则(对隶属于相同规则的事件放在同一组,用于后续聚合逻辑,避免不同规则的告警或恢复聚合到一起)
238253
func (c *Consume) alarmGrouping(faultCenter models.FaultCenter, alertGroups *AlertGroups, alerts []*models.AlertCurEvent) {
239254
if len(alerts) == 0 {
240255
return
241256
}
242257

243258
for _, alert := range alerts {
259+
// 状态分组
260+
switch alert.IsRecovered {
261+
case true:
262+
alert.RuleId = "Recover_" + alert.RuleId
263+
case false:
264+
alert.RuleId = "Firing_" + alert.RuleId
265+
default:
266+
alert.RuleId = "Unknown_" + alert.RuleId
267+
}
268+
244269
alertGroups.AddAlert(alert, faultCenter.NoticeGroup)
245-
//c.addAlertToGroup(alert, faultCenter.NoticeGroup)
246270
if alert.IsRecovered {
247271
c.removeAlertFromCache(alert)
248272
if err := process.RecordAlertHisEvent(c.ctx, *alert); err != nil {

alert/eval/eval.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ type (
2121
Submit(rule models.AlertRule)
2222
Stop(ruleId string)
2323
Eval(ctx context.Context, rule models.AlertRule)
24-
Recover(faultCenterKey string, faultCenterInfoKey string, curKeys []string)
24+
Recover(ruleId, faultCenterKey string, faultCenterInfoKey string, curKeys []string)
2525
GC(rule models.AlertRule, curFiringKeys []string)
2626
RestartAllEvals()
2727
}
@@ -108,8 +108,8 @@ func (t *AlertRule) Eval(ctx context.Context, rule models.AlertRule) {
108108
// 追加当前数据源的指纹到总列表
109109
curFingerprints = append(curFingerprints, fingerprints...)
110110
}
111-
//logc.Infof(t.ctx.Ctx, fmt.Sprintf("规则评估 -> %v", tools.JsonMarshal(rule)))
112-
t.Recover(models.BuildCacheEventKey(rule.TenantId, rule.FaultCenterId), models.BuildCacheInfoKey(rule.TenantId, rule.FaultCenterId), curFingerprints)
111+
logc.Infof(t.ctx.Ctx, fmt.Sprintf("规则评估 -> %v", tools.JsonMarshal(rule)))
112+
t.Recover(rule.RuleId, models.BuildCacheEventKey(rule.TenantId, rule.FaultCenterId), models.BuildCacheInfoKey(rule.TenantId, rule.FaultCenterId), curFingerprints)
113113
t.GC(rule, curFingerprints)
114114

115115
case <-ctx.Done():
@@ -120,13 +120,22 @@ func (t *AlertRule) Eval(ctx context.Context, rule models.AlertRule) {
120120
}
121121
}
122122

123-
func (t *AlertRule) Recover(faultCenterKey string, faultCenterInfoKey string, curFingerprints []string) {
123+
func (t *AlertRule) Recover(RuleId, faultCenterKey string, faultCenterInfoKey string, curFingerprints []string) {
124124
// 获取所有的故障中心的告警事件
125125
events, err := t.ctx.Redis.Event().GetAllEventsForFaultCenter(faultCenterKey)
126126
if err != nil {
127127
return
128128
}
129129

130+
// 只获取当前规则的事件
131+
var currentRuleEvents = make(map[string]models.AlertCurEvent)
132+
for fingerprint, event := range events {
133+
if event.RuleId == RuleId {
134+
currentRuleEvents[fingerprint] = event
135+
}
136+
}
137+
events = currentRuleEvents
138+
130139
// 提取事件中的告警指纹
131140
fingerprints := make([]string, 0)
132141
for fingerprint := range events {
@@ -146,6 +155,10 @@ func (t *AlertRule) Recover(faultCenterKey string, faultCenterInfoKey string, cu
146155
return
147156
}
148157

158+
// 调整为待恢复状态
159+
event.Status = 3
160+
t.ctx.Redis.Event().PushEventToFaultCenter(&event)
161+
149162
// 判断是否在等待时间范围内
150163
wTime, exists := t.alarmRecoverWaitStore.Get(key)
151164
if !exists {
@@ -159,6 +172,8 @@ func (t *AlertRule) Recover(faultCenterKey string, faultCenterInfoKey string, cu
159172
continue
160173
}
161174

175+
// 已恢复状态
176+
event.Status = 4
162177
event.IsRecovered = true
163178
event.RecoverTime = curTime
164179
event.LastSendTime = 0

alert/process/process.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,17 +51,15 @@ func PushEventToFaultCenter(ctx *ctx.Context, event *models.AlertCurEvent) {
5151
event.LastEvalTime = eventOpt.GetLastEvalTimeForFaultCenter()
5252
event.LastSendTime = eventOpt.GetLastSendTimeForFaultCenter(event.TenantId, event.FaultCenterId, event.Fingerprint)
5353

54-
event.State = "Pending"
5554
if event.IsArriveForDuration() {
56-
event.State = "Firing"
55+
event.Status = 1
5756
}
58-
if event.IsRecovered {
59-
event.State = "Recover"
60-
}
61-
6257
if isSilencedEvent(event) {
6358
event.Status = 2
6459
}
60+
if event.IsRecovered {
61+
event.Status = 3
62+
}
6563

6664
eventOpt.PushEventToFaultCenter(event)
6765
}

internal/models/alert_current_event.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ type AlertCurEvent struct {
1414
DatasourceType string `json:"datasource_type"`
1515
DatasourceId string `json:"datasource_id" gorm:"datasource_id"`
1616
Fingerprint string `json:"fingerprint"`
17-
State string `json:"state"` // 事件状态,Pending / Firing
1817
Severity string `json:"severity"`
1918
Metric map[string]interface{} `json:"metric" gorm:"metric;serializer:json"`
2019
Labels map[string]string `json:"labels" gorm:"labels;serializer:json"`
@@ -36,7 +35,7 @@ type AlertCurEvent struct {
3635
FaultCenter FaultCenter `json:"faultCenter" gorm:"-"`
3736
ResponseTime string `json:"response_time" gorm:"-"`
3837
TimeRemaining int64 `json:"time_remaining" gorm:"-"`
39-
Status int64 `json:"status" gorm:"-"` // 事件状态,告警中:1,静默中:2
38+
Status int64 `json:"status" gorm:"-"` // 事件状态,告警中:1,静默中:2,待恢复:3,已恢复:4
4039
}
4140

4241
type AlertCurEventQuery struct {

internal/models/fault_center.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type FaultCenter struct {
1818
RecoverWaitTime int64 `json:"recoverWaitTime"`
1919
CurrentAlertNumber int64 `json:"currentAlertNumber" gorm:"-"`
2020
CurrentMuteNumber int64 `json:"currentMuteNumber" gorm:"-"`
21+
CurrentRecoverNumber int64 `json:"currentRecoverNumber" gorm:"-"`
2122
}
2223

2324
func (f *FaultCenter) TableName() string {

internal/services/fault_center.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,17 @@ func (f faultCenterService) List(req interface{}) (data interface{}, err interfa
8989
}
9090

9191
for _, event := range events {
92-
if event.Status == 1 {
92+
switch event.Status {
93+
case 1:
9394
faultCenters[index].CurrentAlertNumber++
94-
} else {
95+
case 2:
9596
faultCenters[index].CurrentMuteNumber++
97+
case 3:
98+
faultCenters[index].CurrentRecoverNumber++
9699
}
97100
}
98-
99101
}
102+
100103
return faultCenters, nil
101104
}
102105

0 commit comments

Comments
 (0)