-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumer.go
68 lines (59 loc) · 1.48 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package main
import (
"fmt"
"log"
"os"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
type ConsumerImpl struct {
consumer *kafka.Consumer
}
func NewConsumer(broker, group string) (*ConsumerImpl, error) {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": broker,
"group.id": group,
"session.timeout.ms": 6000,
"go.events.channel.enable": true,
"go.application.rebalance.enable": true,
"auto.offset.reset": "earliest",
})
if err != nil {
return nil, err
}
return &ConsumerImpl{
consumer: c,
}, nil
}
func (c *ConsumerImpl) Listen(topics []string) error {
defer c.consumer.Close()
if err := c.consumer.SubscribeTopics(topics, nil); err != nil {
return err
}
run := true
for run {
select {
case ev := <-c.consumer.Events():
switch e := ev.(type) {
case kafka.AssignedPartitions:
fmt.Fprintf(os.Stderr, "%% %v\n", e)
err := c.consumer.Assign(e.Partitions)
if err != nil {
log.Fatal(err)
}
case kafka.RevokedPartitions:
fmt.Fprintf(os.Stderr, "%% %v\n", e)
err := c.consumer.Unassign()
if err != nil {
log.Fatal(err)
}
case *kafka.Message:
fmt.Printf("%% Message on %s:\n%s\n",
e.TopicPartition, string(e.Value))
case kafka.Error:
// Errors should generally be considered as informational, the client will try to automatically recover
fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
}
}
}
return nil
}