Skip to content

Commit 71abd2e

Browse files
feat: introduce ack trackers
1 parent 953d9ea commit 71abd2e

File tree

2 files changed

+102
-0
lines changed

2 files changed

+102
-0
lines changed

pulsar/consumer_partition.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ type partitionConsumer struct {
178178
chunkedMsgCtxMap *chunkedMsgCtxMap
179179
unAckChunksTracker *unAckChunksTracker
180180
ackGroupingTracker ackGroupingTracker
181+
ackTrackers *ackTrackers
181182

182183
lastMessageInBroker *trackingMessageID
183184

@@ -375,6 +376,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
375376
pc.decryptor = decryptor
376377

377378
pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log)
379+
pc.ackTrackers = newAckTrackers()
378380

379381
err := pc.grabConn("")
380382
if err != nil {
@@ -443,6 +445,9 @@ func (pc *partitionConsumer) ackIDCommon(msgID MessageID, withResponse bool, txn
443445
}
444446

445447
trackingID := toTrackingMessageID(msgID)
448+
if trackingID != nil && trackingID.tracker == nil {
449+
trackingID.tracker = pc.ackTrackers.tracker(trackingID)
450+
}
446451

447452
if trackingID != nil && trackingID.ack() {
448453
// All messages in the same batch have been acknowledged, we only need to acknowledge the
@@ -712,6 +717,9 @@ func (pc *partitionConsumer) internalAckIDCumulative(msgID MessageID, withRespon
712717
if trackingID == nil {
713718
return errors.New("failed to convert trackingMessageID")
714719
}
720+
if trackingID.tracker == nil {
721+
trackingID.tracker = pc.ackTrackers.tracker(trackingID)
722+
}
715723

716724
var msgIDToAck *trackingMessageID
717725
if trackingID.ackCumulative() || pc.options.enableBatchIndexAck {
@@ -1162,6 +1170,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
11621170
ackTracker)
11631171
// set the consumer so we know how to ack the message id
11641172
trackingMsgID.consumer = pc
1173+
pc.ackTrackers.add(trackingMsgID, ackTracker)
11651174

11661175
if pc.messageShouldBeDiscarded(trackingMsgID) {
11671176
pc.AckID(trackingMsgID)
@@ -2366,3 +2375,32 @@ func (u *unAckChunksTracker) nack(cmid *chunkMessageID) {
23662375
}
23672376
u.remove(cmid)
23682377
}
2378+
2379+
type ackTrackers struct {
2380+
mu sync.RWMutex
2381+
trackers map[[2]int64]*ackTracker
2382+
}
2383+
2384+
func newAckTrackers() *ackTrackers {
2385+
return &ackTrackers{
2386+
trackers: make(map[[2]int64]*ackTracker),
2387+
}
2388+
}
2389+
2390+
func (a *ackTrackers) tracker(id MessageID) *ackTracker {
2391+
a.mu.RLock()
2392+
defer a.mu.RUnlock()
2393+
return a.trackers[[2]int64{id.LedgerID(), id.EntryID()}]
2394+
}
2395+
2396+
func (a *ackTrackers) add(id MessageID, tracker *ackTracker) {
2397+
a.mu.Lock()
2398+
defer a.mu.Unlock()
2399+
a.trackers[[2]int64{id.LedgerID(), id.EntryID()}] = tracker
2400+
}
2401+
2402+
func (a *ackTrackers) remove(id MessageID) {
2403+
a.mu.Lock()
2404+
defer a.mu.Unlock()
2405+
delete(a.trackers, [2]int64{id.LedgerID(), id.EntryID()})
2406+
}

pulsar/consumer_partition_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
3636
options: &partitionConsumerOpts{},
3737
metrics: newTestMetrics(),
3838
decryptor: crypto.NewNoopDecryptor(),
39+
ackTrackers: newAckTrackers(),
3940
}
4041
pc.availablePermits = &availablePermits{pc: &pc}
4142
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
@@ -75,6 +76,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
7576
options: &partitionConsumerOpts{},
7677
metrics: newTestMetrics(),
7778
decryptor: crypto.NewNoopDecryptor(),
79+
ackTrackers: newAckTrackers(),
7880
}
7981
pc.availablePermits = &availablePermits{pc: &pc}
8082
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
@@ -111,6 +113,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
111113
options: &partitionConsumerOpts{},
112114
metrics: newTestMetrics(),
113115
decryptor: crypto.NewNoopDecryptor(),
116+
ackTrackers: newAckTrackers(),
114117
}
115118
pc.availablePermits = &availablePermits{pc: &pc}
116119
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
@@ -150,6 +153,67 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
150153
}
151154
}
152155

156+
func TestBatchMessageIDWithAckTrackers(t *testing.T) {
157+
eventsCh := make(chan interface{}, 1)
158+
pc := partitionConsumer{
159+
queueCh: make(chan []*message, 1),
160+
eventsCh: eventsCh,
161+
compressionProviders: sync.Map{},
162+
options: &partitionConsumerOpts{},
163+
metrics: newTestMetrics(),
164+
decryptor: crypto.NewNoopDecryptor(),
165+
ackTrackers: newAckTrackers(),
166+
}
167+
pc.availablePermits = &availablePermits{pc: &pc}
168+
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
169+
func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)
170+
171+
headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
172+
if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
173+
t.Fatal(err)
174+
}
175+
176+
// ensure the tracker was set on the message id
177+
messages := <-pc.queueCh
178+
for _, m := range messages {
179+
assert.NotNil(t, m.ID().(*trackingMessageID).tracker)
180+
}
181+
182+
noAckTrackerMessages := make([]MessageID, 10)
183+
for i, m := range messages {
184+
tmp := m.ID().Serialize()
185+
mid, err := DeserializeMessageID(tmp)
186+
if err != nil {
187+
t.Fatal(err)
188+
}
189+
noAckTrackerMessages[i] = mid
190+
}
191+
192+
// ack all message ids except the last one
193+
for i := 0; i < 9; i++ {
194+
_, ok := noAckTrackerMessages[i].(*trackingMessageID)
195+
assert.False(t, ok)
196+
err := pc.AckID(noAckTrackerMessages[i])
197+
assert.Nil(t, err)
198+
}
199+
200+
select {
201+
case <-eventsCh:
202+
t.Error("The message id should not be acked!")
203+
default:
204+
}
205+
206+
// ack last message
207+
err := pc.AckID(noAckTrackerMessages[9])
208+
assert.Nil(t, err)
209+
210+
select {
211+
case <-eventsCh:
212+
default:
213+
t.Error("Expected an ack request to be triggered!")
214+
}
215+
}
216+
153217
// Raw single message in old format
154218
// metadata properties:<key:"a" value:"1" > properties:<key:"b" value:"2" >
155219
// payload = "hello"

0 commit comments

Comments
 (0)