Skip to content
This repository was archived by the owner on May 13, 2019. It is now read-only.

add function that get consumer by brokers #132

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions consumergroup/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,107 @@ func JoinConsumerGroup(name string, topics []string, zookeeper []string, config
return
}

// Connects to a consumer group, using Zookeeper for auto-discovery
func JoinConsumerGroupByBrokers(name string, topics []string, zookeeper []string, config *Config, brokers []string) (cg *ConsumerGroup, err error) {

if name == "" {
return nil, sarama.ConfigurationError("Empty consumergroup name")
}

if len(topics) == 0 {
return nil, sarama.ConfigurationError("No topics provided")
}

if len(zookeeper) == 0 {
return nil, errors.New("You need to provide at least one zookeeper node address!")
}

if config == nil {
config = NewConfig()
}
config.ClientID = name

// Validate configuration
if err = config.Validate(); err != nil {
return
}

var kz *kazoo.Kazoo
if kz, err = kazoo.NewKazoo(zookeeper, config.Zookeeper); err != nil {
return
}

if len(brokers) == 0 {
brokers, err = kz.BrokerList()
if err != nil {
kz.Close()
return
}
}

group := kz.Consumergroup(name)

if config.Offsets.ResetOffsets {
err = group.ResetOffsets()
if err != nil {
kz.Close()
return
}
}

instance := group.NewInstance()

var consumer sarama.Consumer
if consumer, err = sarama.NewConsumer(brokers, config.Config); err != nil {
kz.Close()
return
}

cg = &ConsumerGroup{
config: config,
consumer: consumer,

kazoo: kz,
group: group,
instance: instance,

messages: make(chan *sarama.ConsumerMessage, config.ChannelBufferSize),
errors: make(chan error, config.ChannelBufferSize),
stopper: make(chan struct{}),
}

// Register consumer group
if exists, err := cg.group.Exists(); err != nil {
cg.Logf("FAILED to check for existence of consumergroup: %s!\n", err)
_ = consumer.Close()
_ = kz.Close()
return nil, err
} else if !exists {
cg.Logf("Consumergroup `%s` does not yet exists, creating...\n", cg.group.Name)
if err := cg.group.Create(); err != nil {
cg.Logf("FAILED to create consumergroup in Zookeeper: %s!\n", err)
_ = consumer.Close()
_ = kz.Close()
return nil, err
}
}

// Register itself with zookeeper
if err := cg.instance.Register(topics); err != nil {
cg.Logf("FAILED to register consumer instance: %s!\n", err)
return nil, err
} else {
cg.Logf("Consumer instance registered (%s).", cg.instance.ID)
}

offsetConfig := OffsetManagerConfig{CommitInterval: config.Offsets.CommitInterval}
cg.offsetManager = NewZookeeperOffsetManager(cg, &offsetConfig)

go cg.topicListConsumer(topics)

return
}

// Returns a channel that you can read to obtain events from Kafka to process.
func (cg *ConsumerGroup) Messages() <-chan *sarama.ConsumerMessage {
return cg.messages
Expand Down