Skip to content

Commit 7a9a33c

Browse files
[issue #1357][producer] fix: allow multiples callbacks with concurrent producer flushes (async publish) (#1409)
1 parent d471a67 commit 7a9a33c

File tree

6 files changed

+91
-24
lines changed

6 files changed

+91
-24
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,7 @@ jobs:
5050
matrix:
5151
go-version: [ '1.23', '1.24' ]
5252
steps:
53-
- uses: actions/checkout@v3
54-
- name: Check for Docker images
55-
id: check_images
56-
run: echo "::set-output name=images::$(docker images -q | wc -l)"
57-
- name: Clean Docker cache if images exist
58-
if: ${{ steps.check_images.outputs.images > 0 }}
59-
run: docker rmi $(docker images -q) -f && df -h
53+
- uses: actions/checkout@v3
6054
- uses: actions/setup-go@v3
6155
with:
6256
go-version: ${{ matrix.go-version }}

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@ pulsar-perf
1616
bin
1717

1818
vendor/
19+
logs/

pulsar/consumer_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5230,7 +5230,7 @@ func sendMessages(t *testing.T, client Client, topic string, startIndex int, num
52305230
}
52315231
}
52325232
}
5233-
assert.Nil(t, producer.Flush())
5233+
assert.Nil(t, producer.FlushWithCtx(ctx))
52345234
}
52355235

52365236
func receiveMessages(t *testing.T, consumer Consumer, numMessages int) []Message {
@@ -5276,10 +5276,10 @@ func TestAckResponseNotBlocked(t *testing.T) {
52765276
}
52775277
})
52785278
if i%100 == 99 {
5279-
assert.Nil(t, producer.Flush())
5279+
assert.Nil(t, producer.FlushWithCtx(ctx))
52805280
}
52815281
}
5282-
producer.Flush()
5282+
producer.FlushWithCtx(ctx)
52835283
producer.Close()
52845284

52855285
// Set a small receiver queue size to trigger ack response blocking if the internal `queueCh`

pulsar/producer_partition.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -839,15 +839,15 @@ func (p *partitionProducer) internalSingleSend(
839839

840840
type pendingItem struct {
841841
sync.Mutex
842-
ctx context.Context
843-
cancel context.CancelFunc
844-
buffer internal.Buffer
845-
sequenceID uint64
846-
createdAt time.Time
847-
sentAt time.Time
848-
sendRequests []interface{}
849-
isDone bool
850-
flushCallback func(err error)
842+
ctx context.Context
843+
cancel context.CancelFunc
844+
buffer internal.Buffer
845+
sequenceID uint64
846+
createdAt time.Time
847+
sentAt time.Time
848+
sendRequests []interface{}
849+
isDone bool
850+
flushCallbacks []func(err error)
851851
}
852852

853853
func (p *partitionProducer) internalFlushCurrentBatch() {
@@ -1064,10 +1064,10 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
10641064
return
10651065
}
10661066

1067-
pi.flushCallback = func(err error) {
1067+
pi.flushCallbacks = append(pi.flushCallbacks, func(err error) {
10681068
fr.err = err
10691069
close(fr.doneCh)
1070-
}
1070+
})
10711071
}
10721072

10731073
// clearPendingSendRequests makes sure to push forward previous sending requests
@@ -1749,8 +1749,8 @@ func (i *pendingItem) done(err error) {
17491749
i.isDone = true
17501750
// return the buffer to the pool after all callbacks have been called.
17511751
defer i.buffer.Release()
1752-
if i.flushCallback != nil {
1753-
i.flushCallback(err)
1752+
for _, callback := range i.flushCallbacks {
1753+
callback(err)
17541754
}
17551755

17561756
if i.cancel != nil {

pulsar/producer_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,78 @@ func TestFlushInProducer(t *testing.T) {
449449
assert.Equal(t, msgCount, numOfMessages)
450450
}
451451

452+
// TestConcurrentFlushInProducer validates that concurrent flushes don't create a deadlock
453+
func TestConcurrentFlushInProducer(t *testing.T) {
454+
client, err := NewClient(ClientOptions{
455+
URL: serviceURL,
456+
})
457+
assert.NoError(t, err)
458+
defer client.Close()
459+
460+
topicName := "test-concurrent-flushes-in-producer"
461+
subName := "subscription-name"
462+
ctx := context.Background()
463+
464+
// set batch message number numOfMessages, and max delay 10s
465+
producer, err := client.CreateProducer(ProducerOptions{
466+
Topic: topicName,
467+
DisableBatching: false,
468+
})
469+
assert.Nil(t, err)
470+
defer producer.Close()
471+
472+
consumer, err := client.Subscribe(ConsumerOptions{
473+
Topic: topicName,
474+
SubscriptionName: subName,
475+
})
476+
assert.Nil(t, err)
477+
defer consumer.Close()
478+
479+
expectedMsgCount := 100
480+
481+
var wg sync.WaitGroup
482+
483+
wg.Add(expectedMsgCount)
484+
485+
errs := make(chan error, expectedMsgCount*2)
486+
487+
// Each message in sent and flushed concurrently
488+
for range expectedMsgCount {
489+
go func() {
490+
defer wg.Done()
491+
producer.SendAsync(ctx, &ProducerMessage{
492+
Payload: []byte("anythingWorksInThatPayload"),
493+
}, func(_ MessageID, _ *ProducerMessage, e error) {
494+
errs <- e
495+
})
496+
497+
errs <- producer.FlushWithCtx(ctx)
498+
}()
499+
}
500+
501+
// Wait for all concurrent async publications and flushes to complete
502+
wg.Wait()
503+
504+
// Make sure that there were no error publishing or flushing
505+
close(errs)
506+
var errElementCount int
507+
for e := range errs {
508+
errElementCount++
509+
assert.Nil(t, e)
510+
}
511+
assert.Equal(t, errElementCount, expectedMsgCount*2)
512+
513+
// Make sure all messages were processed successfully
514+
var receivedMsgCount int
515+
for range expectedMsgCount {
516+
_, err := consumer.Receive(ctx)
517+
assert.Nil(t, err)
518+
receivedMsgCount++
519+
}
520+
521+
assert.Equal(t, receivedMsgCount, expectedMsgCount)
522+
}
523+
452524
func TestFlushInPartitionedProducer(t *testing.T) {
453525
topicName := "public/default/partition-testFlushInPartitionedProducer"
454526

pulsaradmin/pkg/admin/subscription_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func TestPeekMessageForPartitionedTopic(t *testing.T) {
126126
Payload: []byte(fmt.Sprintf("hello-%d", i)),
127127
}, nil)
128128
}
129-
err = producer.Flush()
129+
err = producer.FlushWithCtx(ctx)
130130
if err != nil {
131131
return
132132
}

0 commit comments

Comments
 (0)