Skip to content

Commit

Permalink
Merge pull request #878 from twmb/876
Browse files Browse the repository at this point in the history
kgo: broadcast batch finishes in one big blast
  • Loading branch information
twmb authored Jan 20, 2025
2 parents 082c308 + ead18d3 commit 9d27aac
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,12 @@ func (p *producer) promiseRecordBeforeBuf(pr promisedRec, err error) {
func (p *producer) finishPromises(b batchPromise) {
cl := p.cl
var more bool
var broadcast bool
defer func() {
if broadcast {
p.c.Broadcast()
}
}()
start:
p.promisesMu.Lock()
for i, pr := range b.recs {
Expand All @@ -564,7 +570,8 @@ start:
pr.ProducerID = b.pid
pr.ProducerEpoch = b.epoch
pr.Attrs = b.attrs
cl.finishRecordPromise(pr, b.err, b.beforeBuf)
recBroadcast := cl.finishRecordPromise(pr, b.err, b.beforeBuf)
broadcast = broadcast || recBroadcast
b.recs[i] = promisedRec{}
}
p.promisesMu.Unlock()
Expand All @@ -578,7 +585,7 @@ start:
}
}

func (cl *Client) finishRecordPromise(pr promisedRec, err error, beforeBuffering bool) {
func (cl *Client) finishRecordPromise(pr promisedRec, err error, beforeBuffering bool) (broadcast bool) {
p := &cl.producer

if p.hooks != nil && len(p.hooks.unbuffered) > 0 {
Expand All @@ -605,12 +612,10 @@ func (cl *Client) finishRecordPromise(pr promisedRec, err error, beforeBuffering
p.mu.Lock()
p.bufferedBytes -= userSize
p.bufferedRecords--
broadcast := p.blocked.Load() > 0 || p.bufferedRecords == 0 && p.flushing.Load() > 0
broadcast = p.blocked.Load() > 0 || p.bufferedRecords == 0 && p.flushing.Load() > 0
p.mu.Unlock()

if broadcast {
p.c.Broadcast()
}
return broadcast
}

// partitionRecord loads the partitions for a topic and produce to them. If
Expand Down

0 comments on commit 9d27aac

Please sign in to comment.