-
Notifications
You must be signed in to change notification settings - Fork 73
/
Copy pathconsume_pause_resume.go
134 lines (114 loc) · 3.3 KB
/
consume_pause_resume.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// qbus暂停/恢复消费功能示例:对每个topic,每次消费一个批次(固定数量)的消息,就会暂停消费该topic一段时间,之后恢复消费。
// 仅支持 Kafka 模式
package main
import (
"fmt"
"os"
"os/signal"
"qbus"
"strings"
"time"
)
const (
// 日志路径
logPath = "./consumer.log"
// 配置文件路径
configPath = "./consumer.config"
// 一个批次的消息数量
messageBatchSize = 2
// 暂停消费的时间(单位:秒)
pausedTimeS = 3
)
var qbusConsumer qbus.QbusConsumer
// GoCallback 实现qbus消费回调接口
type GoCallback struct {
qbus.QbusConsumerCallback
// topic名 => 该topic上已消费的消息数量
messageCountMap map[string]int64
}
// NewGoCallback 创建GoCallback实例
func NewGoCallback() *GoCallback {
cb := new(GoCallback)
cb.messageCountMap = make(map[string]int64)
return cb
}
// DeliveryMsg 自动提交offset模式中消费消息的回调函数
func (p *GoCallback) DeliveryMsg(topic string, msg string, msgLen int64) {
id, ok := p.messageCountMap[topic]
if ok {
id++
} else {
id = 1
}
p.messageCountMap[topic] = id
fmt.Printf("Topic:%s id:%v | msg:%s\n", topic, id, string(msg[0:msgLen]))
if id%messageBatchSize == 0 {
// 若消息消息数量构成了一个批次,暂停消费
fmt.Printf("%v | Pause consuming %s\n", time.Now().Format("2006-01-02 15:04:05.000"), topic)
if !qbusConsumer.Pause(toStringVector([]string{topic})) {
fmt.Printf("Failed to pause %s\n", topic)
os.Exit(1)
}
go func(topic string) {
// 暂停若干秒后,恢复该topic的消费
time.Sleep(time.Second * pausedTimeS)
fmt.Printf("%v | Resume consuming %s\n", time.Now().Format("2006-01-02 15:04:05.000"), topic)
if !qbusConsumer.Resume(toStringVector([]string{topic})) {
fmt.Printf("Failed to resume %s\n", topic)
os.Exit(1)
}
}(topic)
}
}
func main() {
if len(os.Args) < 4 {
fmt.Printf("Usage: %s topic_list group_name cluster_name\n"+
" topic_list is comma-splitted topics like \"topic1,topic2,topic3\"\n",
os.Args[0])
return
}
// 1. 解析命令行参数
topicList := strings.Split(os.Args[1], ",")
groupName := os.Args[2]
clusterName := os.Args[3]
fmt.Printf("topics: %v | group: %s\n", topicList, groupName)
// 2. 启动消费者订阅消息
qbusConsumer = qbus.NewQbusConsumer()
defer qbus.DeleteQbusConsumer(qbusConsumer)
callback := qbus.NewDirectorQbusConsumerCallback(NewGoCallback())
defer qbus.DeleteDirectorQbusConsumerCallback(callback)
if !qbusConsumer.Init(clusterName, logPath, configPath, callback) {
fmt.Println("Failed to Init")
os.Exit(1)
}
if !qbusConsumer.Subscribe(groupName, toStringVector(topicList)) {
fmt.Println("Failed to Subscribe")
os.Exit(1)
}
// 3. Ctrl+C退出主循环
c := make(chan os.Signal, 1)
done := make(chan bool, 1)
go func() {
for sig := range c {
fmt.Printf("received ctrl+c(%v)\n", sig)
done <- true
}
}()
signal.Notify(c, os.Interrupt)
// 4. 启动消费线程进行消费
if !qbusConsumer.Start() {
fmt.Println("Failed to Start")
os.Exit(1)
}
defer qbusConsumer.Stop()
fmt.Println("%% Start consuming... (Press Ctrl+C to exit)")
<-done
fmt.Println("Done.")
}
func toStringVector(s []string) qbus.StringVector {
result := qbus.NewStringVector()
for i := 0; i < len(s); i++ {
result.Add(s[i])
}
return result
}