Skip to content

Commit fb3cbd4

Browse files
committed
Fix acknowledgeCumulative never returns when accepting an invalid message id for a multi-topics consumer (#492)
(cherry picked from commit 9e119ce)
1 parent 95b5e71 commit fb3cbd4

File tree

2 files changed

+56
-11
lines changed

2 files changed

+56
-11
lines changed

lib/MultiTopicsConsumerImpl.cc

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -650,6 +650,14 @@ void MultiTopicsConsumerImpl::notifyPendingReceivedCallback(Result result, const
650650
callback(result, msg);
651651
}
652652

653+
static void logErrorTopicNameForAcknowledge(const std::string& topic) {
654+
if (topic.empty()) {
655+
LOG_ERROR("MessageId without a topic name cannot be acknowledged for a multi-topics consumer");
656+
} else {
657+
LOG_ERROR("Message of topic: " << topic << " not in consumers");
658+
}
659+
}
660+
653661
void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
654662
if (state_ != Ready) {
655663
interceptors_->onAcknowledge(Consumer(shared_from_this()), ResultAlreadyClosed, msgId);
@@ -658,19 +666,14 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
658666
}
659667

660668
const std::string& topicPartitionName = msgId.getTopicName();
661-
if (topicPartitionName.empty()) {
662-
LOG_ERROR("MessageId without a topic name cannot be acknowledged for a multi-topics consumer");
663-
callback(ResultOperationNotSupported);
664-
return;
665-
}
666669
auto optConsumer = consumers_.find(topicPartitionName);
667670

668671
if (optConsumer) {
669672
unAckedMessageTrackerPtr_->remove(msgId);
670673
optConsumer.value()->acknowledgeAsync(msgId, callback);
671674
} else {
672-
LOG_ERROR("Message of topic: " << topicPartitionName << " not in unAckedMessageTracker");
673-
callback(ResultUnknownError);
675+
logErrorTopicNameForAcknowledge(topicPartitionName);
676+
callback(ResultOperationNotSupported);
674677
}
675678
}
676679

@@ -684,7 +687,7 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis
684687
for (const MessageId& messageId : messageIdList) {
685688
auto topicName = messageId.getTopicName();
686689
if (topicName.empty()) {
687-
LOG_ERROR("MessageId without a topic name cannot be acknowledged for a multi-topics consumer");
690+
logErrorTopicNameForAcknowledge(topicName);
688691
callback(ResultOperationNotSupported);
689692
return;
690693
}
@@ -710,18 +713,21 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis
710713
unAckedMessageTrackerPtr_->remove(kv.second);
711714
optConsumer.value()->acknowledgeAsync(kv.second, cb);
712715
} else {
713-
LOG_ERROR("Message of topic: " << kv.first << " not in consumers");
714-
callback(ResultUnknownError);
716+
logErrorTopicNameForAcknowledge(kv.first);
717+
callback(ResultOperationNotSupported);
715718
}
716719
}
717720
}
718721

719722
void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) {
720-
msgId.getTopicName();
723+
const auto& topic = msgId.getTopicName();
721724
auto optConsumer = consumers_.find(msgId.getTopicName());
722725
if (optConsumer) {
723726
unAckedMessageTrackerPtr_->removeMessagesTill(msgId);
724727
optConsumer.value()->acknowledgeCumulativeAsync(msgId, callback);
728+
} else {
729+
logErrorTopicNameForAcknowledge(topic);
730+
callback(ResultOperationNotSupported);
725731
}
726732
}
727733

tests/MultiTopicsConsumerTest.cc

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,3 +103,42 @@ TEST(MultiTopicsConsumerTest, testSeekToNewerPosition) {
103103

104104
client.close();
105105
}
106+
107+
TEST(MultiTopicsConsumerTest, testAcknowledgeInvalidMessageId) {
108+
const std::string topicPrefix = "multi-topics-consumer-ack-invalid-msg-id";
109+
Client client{lookupUrl};
110+
std::vector<std::string> topics(2);
111+
for (size_t i = 0; i < topics.size(); i++) {
112+
Producer producer;
113+
auto topic = topicPrefix + unique_str();
114+
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
115+
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-" + std::to_string(i)).build()));
116+
topics[i] = std::move(topic);
117+
}
118+
119+
Consumer consumer;
120+
ConsumerConfiguration conf;
121+
conf.setSubscriptionInitialPosition(InitialPositionEarliest);
122+
ASSERT_EQ(ResultOk, client.subscribe(topics, "sub", conf, consumer));
123+
124+
std::vector<MessageId> msgIds(topics.size());
125+
for (size_t i = 0; i < topics.size(); i++) {
126+
Message msg;
127+
ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
128+
std::string serialized;
129+
msg.getMessageId().serialize(serialized);
130+
msgIds[i] = MessageId::deserialize(serialized);
131+
}
132+
133+
ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds[0]));
134+
ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds));
135+
ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledgeCumulative(msgIds[1]));
136+
137+
msgIds[0].setTopicName("invalid-topic");
138+
msgIds[1].setTopicName("invalid-topic");
139+
ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds[0]));
140+
ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds));
141+
ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledgeCumulative(msgIds[1]));
142+
143+
client.close();
144+
}

0 commit comments

Comments
 (0)