Skip to content

Commit

Permalink
fix(panic): partition offset manager
Browse files Browse the repository at this point in the history
Signed-off-by: napallday <[email protected]>
  • Loading branch information
napallday committed Jun 19, 2024
1 parent bf40615 commit 4ac2873
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 20 deletions.
1 change: 1 addition & 0 deletions balance_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
// StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy
StickyBalanceStrategyName = "sticky"

// CooperativeStickyBalanceStrategyName identifies strategies that use the cooperative sticky-partition assignment strategy
CooperativeStickyBalanceStrategyName = "cooperative-sticky"

defaultGeneration = -1
Expand Down
34 changes: 14 additions & 20 deletions offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@ func (om *offsetManager) ManagePartition(topic string, partition int32) (Partiti
}

func (om *offsetManager) RemovePartitions(topicPartitions map[string][]int32) error {
// flush one last time
if om.conf.Consumer.Offsets.AutoCommit.Enable {
for attempt := 0; attempt <= om.conf.Consumer.Offsets.Retry.Max; attempt++ {
om.flushToBroker()
}
}

var errs ConsumerErrors
var errsLock sync.Mutex

Expand All @@ -150,10 +157,15 @@ func (om *offsetManager) RemovePartitions(topicPartitions map[string][]int32) er
go func(topic string, partition int32) {
defer wg.Done()

om.pomsLock.RLock()
om.pomsLock.Lock()
pom := om.poms[topic][partition]
om.pomsLock.RUnlock()
err := pom.Close()
delete(om.poms[topic], partition)
if len(om.poms[topic]) == 0 {
delete(om.poms, topic)
}
om.pomsLock.Unlock()

if err != nil {
errsLock.Lock()
var consumerErrs ConsumerErrors
Expand All @@ -167,24 +179,6 @@ func (om *offsetManager) RemovePartitions(topicPartitions map[string][]int32) er
}
wg.Wait()

// flush one last time
if om.conf.Consumer.Offsets.AutoCommit.Enable {
for attempt := 0; attempt <= om.conf.Consumer.Offsets.Retry.Max; attempt++ {
om.flushToBroker()
}
}

om.pomsLock.Lock()
for topic, partitions := range topicPartitions {
for _, partition := range partitions {
delete(om.poms[topic], partition)
if len(om.poms[topic]) == 0 {
delete(om.poms, topic)
}
}
}

om.pomsLock.Unlock()
if len(errs) > 0 {
return errs
}
Expand Down

0 comments on commit 4ac2873

Please sign in to comment.