Skip to content

Commit 7cea25f

Browse files
authored
Fixes issue where it would try to autocomplete/abandon messages that were already settled (Azure#17231)
* Follow with .NET in not throwing error when RECEIVE_AND_DELETE mode is used and AutoComplete. * Adding logic to set isSettled in client receiver and utilise it in AutoComplete. * Cancelling upstream on error. * Adding AutoComplete cancellation tests. * Check for disposed before waiting for semaphore.
1 parent 34fdaf3 commit 7cea25f

File tree

7 files changed

+164
-38
lines changed

7 files changed

+164
-38
lines changed

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/FluxAutoComplete.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,15 +128,24 @@ public Context currentContext() {
128128
* Applies a function and catches then logs and closes any exceptions.
129129
*
130130
* @param function Function to apply.
131-
* @param message received message to apply function to.
131+
* @param context received message to apply function to.
132132
* @param operation The operation name.
133133
*/
134134
private void applyWithCatch(Function<ServiceBusMessageContext, Mono<Void>> function,
135-
ServiceBusMessageContext message, String operation) {
135+
ServiceBusMessageContext context, String operation) {
136+
137+
// Do not settle the message again if it has already been settled.
138+
if (context.getMessage() != null && context.getMessage().isSettled()) {
139+
return;
140+
}
141+
136142
try {
137-
function.apply(message).block();
143+
function.apply(context).block();
138144
} catch (Exception e) {
139145
logger.warning("Unable to '{}' message.", operation, e);
146+
147+
// On an error, we'll stop requesting from upstream and pass the error downstream.
148+
upstream().cancel();
140149
onError(e);
141150
}
142151
}

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ class ServiceBusAsyncConsumer implements AutoCloseable {
3737
.map(message -> this.messageSerializer.deserialize(message, ServiceBusReceivedMessage.class))
3838
.publish(receiverOptions.getPrefetchCount())
3939
.autoConnect(1);
40-
4140
}
4241

4342
/**

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -993,8 +993,8 @@ private ServiceBusReceiverAsyncClient buildAsyncClientForProcessor(boolean isAut
993993
"'enableAutoComplete' is not supported in synchronous client except through callback receive.");
994994
enableAutoComplete = false;
995995
} else if (enableAutoComplete && receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
996-
throw logger.logExceptionAsError(new IllegalStateException(
997-
"'enableAutoComplete' is not valid for RECEIVE_AND_DELETE mode."));
996+
logger.warning("'enableAutoComplete' is not needed in for RECEIVE_AND_DELETE mode.");
997+
enableAutoComplete = false;
998998
}
999999

10001000
if (receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
@@ -1060,8 +1060,8 @@ private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoComp
10601060
"'enableAutoComplete' is not supported in synchronous client except through callback receive.");
10611061
enableAutoComplete = false;
10621062
} else if (enableAutoComplete && receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
1063-
throw logger.logExceptionAsError(new IllegalStateException(
1064-
"'enableAutoComplete' is not valid for RECEIVE_AND_DELETE mode."));
1063+
logger.warning("'enableAutoComplete' is not needed in for RECEIVE_AND_DELETE mode.");
1064+
enableAutoComplete = false;
10651065
}
10661066

10671067
if (receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
@@ -1422,8 +1422,8 @@ ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) {
14221422
"'enableAutoComplete' is not supported in synchronous client except through callback receive.");
14231423
enableAutoComplete = false;
14241424
} else if (enableAutoComplete && receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
1425-
throw logger.logExceptionAsError(new IllegalStateException(
1426-
"'enableAutoComplete' is not valid for RECEIVE_AND_DELETE mode."));
1425+
logger.warning("'enableAutoComplete' is not needed in for RECEIVE_AND_DELETE mode.");
1426+
enableAutoComplete = false;
14271427
}
14281428

14291429
if (receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,6 @@
33

44
package com.azure.messaging.servicebus;
55

6-
import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME;
7-
import static com.azure.core.amqp.AmqpMessageConstant.PARTITION_KEY_ANNOTATION_NAME;
8-
import static com.azure.core.amqp.AmqpMessageConstant.SCHEDULED_ENQUEUE_UTC_TIME_NAME;
9-
import static com.azure.core.amqp.AmqpMessageConstant.VIA_PARTITION_KEY_ANNOTATION_NAME;
10-
import static com.azure.core.amqp.AmqpMessageConstant.SEQUENCE_NUMBER_ANNOTATION_NAME;
11-
import static com.azure.core.amqp.AmqpMessageConstant.LOCKED_UNTIL_KEY_ANNOTATION_NAME;
12-
import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME;
13-
import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME;
14-
import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME;
15-
import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_REASON_ANNOTATION_NAME;
16-
176
import com.azure.core.amqp.AmqpMessageConstant;
187
import com.azure.core.amqp.models.AmqpAnnotatedMessage;
198
import com.azure.core.amqp.models.AmqpBodyType;
@@ -31,6 +20,17 @@
3120
import java.util.Objects;
3221
import java.util.UUID;
3322

23+
import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_DESCRIPTION_ANNOTATION_NAME;
24+
import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_REASON_ANNOTATION_NAME;
25+
import static com.azure.core.amqp.AmqpMessageConstant.DEAD_LETTER_SOURCE_KEY_ANNOTATION_NAME;
26+
import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME;
27+
import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME;
28+
import static com.azure.core.amqp.AmqpMessageConstant.LOCKED_UNTIL_KEY_ANNOTATION_NAME;
29+
import static com.azure.core.amqp.AmqpMessageConstant.PARTITION_KEY_ANNOTATION_NAME;
30+
import static com.azure.core.amqp.AmqpMessageConstant.SCHEDULED_ENQUEUE_UTC_TIME_NAME;
31+
import static com.azure.core.amqp.AmqpMessageConstant.SEQUENCE_NUMBER_ANNOTATION_NAME;
32+
import static com.azure.core.amqp.AmqpMessageConstant.VIA_PARTITION_KEY_ANNOTATION_NAME;
33+
3434
/**
3535
* This class represents a received message from Service Bus.
3636
*/
@@ -39,6 +39,12 @@ public final class ServiceBusReceivedMessage {
3939

4040
private final AmqpAnnotatedMessage amqpAnnotatedMessage;
4141
private UUID lockToken;
42+
private boolean isSettled = false;
43+
44+
ServiceBusReceivedMessage(BinaryData body) {
45+
Objects.requireNonNull(body, "'body' cannot be null.");
46+
amqpAnnotatedMessage = new AmqpAnnotatedMessage(new AmqpDataBody(Collections.singletonList(body.toBytes())));
47+
}
4248

4349
/**
4450
* The representation of message as defined by AMQP protocol.
@@ -52,11 +58,6 @@ public AmqpAnnotatedMessage getAmqpAnnotatedMessage() {
5258
return amqpAnnotatedMessage;
5359
}
5460

55-
ServiceBusReceivedMessage(BinaryData body) {
56-
Objects.requireNonNull(body, "'body' cannot be null.");
57-
amqpAnnotatedMessage = new AmqpAnnotatedMessage(new AmqpDataBody(Collections.singletonList(body.toBytes())));
58-
}
59-
6061
/**
6162
* Gets the actual payload/data wrapped by the {@link ServiceBusReceivedMessage}.
6263
*
@@ -432,6 +433,15 @@ public String getViaPartitionKey() {
432433
VIA_PARTITION_KEY_ANNOTATION_NAME.getValue());
433434
}
434435

436+
/**
437+
* Gets whether the message has been settled.
438+
*
439+
* @return True if the message has been settled, false otherwise.
440+
*/
441+
boolean isSettled() {
442+
return this.isSettled;
443+
}
444+
435445
/**
436446
* Sets a correlation identifier.
437447
*
@@ -493,6 +503,11 @@ void setDeliveryCount(long deliveryCount) {
493503
amqpAnnotatedMessage.getHeader().setDeliveryCount(deliveryCount);
494504
}
495505

506+
/**
507+
* Sets the message's sequence number.
508+
*
509+
* @param enqueuedSequenceNumber The message's sequence number.
510+
*/
496511
void setEnqueuedSequenceNumber(long enqueuedSequenceNumber) {
497512
amqpAnnotatedMessage.getMessageAnnotations().put(ENQUEUED_SEQUENCE_NUMBER_ANNOTATION_NAME.getValue(),
498513
enqueuedSequenceNumber);
@@ -508,12 +523,11 @@ void setEnqueuedTime(OffsetDateTime enqueuedTime) {
508523
}
509524

510525
/**
511-
* Sets the subject for the message.
526+
* Sets whether the message has been settled.
512527
*
513-
* @param subject The subject to set.
514528
*/
515-
void setSubject(String subject) {
516-
amqpAnnotatedMessage.getProperties().setSubject(subject);
529+
void setIsSettled() {
530+
this.isSettled = true;
517531
}
518532

519533
/**
@@ -583,6 +597,15 @@ void setSessionId(String sessionId) {
583597
amqpAnnotatedMessage.getProperties().setGroupId(sessionId);
584598
}
585599

600+
/**
601+
* Sets the subject for the message.
602+
*
603+
* @param subject The subject to set.
604+
*/
605+
void setSubject(String subject) {
606+
amqpAnnotatedMessage.getProperties().setSubject(subject);
607+
}
608+
586609
/**
587610
* Sets the duration of time before this message expires.
588611
*

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ public Mono<Void> deadLetter(ServiceBusReceivedMessage message, DeadLetterOption
365365
return monoError(logger, new NullPointerException(
366366
"'options.transactionContext.transactionId' cannot be null."));
367367
}
368-
return updateDisposition(message, DispositionStatus.SUSPENDED, options.getDeadLetterReason(),
368+
return updateDisposition(message, DispositionStatus.SUSPENDED, options.getDeadLetterReason(),
369369
options.getDeadLetterErrorDescription(), options.getPropertiesToModify(),
370370
options.getTransactionContext());
371371
}
@@ -919,7 +919,7 @@ public Mono<Void> rollbackTransaction(ServiceBusTransactionContext transactionCo
919919
*/
920920
@Override
921921
public void close() {
922-
if (isDisposed.getAndSet(true)) {
922+
if (isDisposed.get()) {
923923
return;
924924
}
925925

@@ -929,6 +929,10 @@ public void close() {
929929
logger.info("Unable to obtain completion lock.", e);
930930
}
931931

932+
if (isDisposed.getAndSet(true)) {
933+
return;
934+
}
935+
932936
// Blocking until the last message has been completed.
933937
logger.info("Removing receiver links.");
934938
final ServiceBusAsyncConsumer disposed = consumer.getAndSet(null);
@@ -980,6 +984,9 @@ private Mono<Void> updateDisposition(ServiceBusReceivedMessage message, Disposit
980984
if (receiverOptions.getReceiveMode() != ReceiveMode.PEEK_LOCK) {
981985
return Mono.error(logger.logExceptionAsError(new UnsupportedOperationException(String.format(
982986
"'%s' is not supported on a receiver opened in ReceiveMode.RECEIVE_AND_DELETE.", dispositionStatus))));
987+
} else if (message.isSettled()) {
988+
return Mono.error(logger.logExceptionAsError(
989+
new IllegalArgumentException("The message has either been deleted or already settled.")));
983990
}
984991

985992
final String sessionIdToUse;
@@ -1001,16 +1008,18 @@ deadLetterErrorDescription, propertiesToModify, sessionId, getLinkName(sessionId
10011008
logger.info("{}: Management node Update completed. Disposition: {}. Lock: {}.",
10021009
entityPath, dispositionStatus, lockToken);
10031010

1011+
message.setIsSettled();
10041012
managementNodeLocks.remove(lockToken);
10051013
renewalContainer.remove(lockToken);
10061014
}));
10071015

10081016
Mono<Void> updateDispositionOperation;
10091017
if (sessionManager != null) {
1010-
updateDispositionOperation = sessionManager.updateDisposition(lockToken, sessionId, dispositionStatus,
1018+
updateDispositionOperation = sessionManager.updateDisposition(lockToken, sessionId, dispositionStatus,
10111019
propertiesToModify, deadLetterReason, deadLetterErrorDescription, transactionContext)
10121020
.flatMap(isSuccess -> {
10131021
if (isSuccess) {
1022+
message.setIsSettled();
10141023
renewalContainer.remove(lockToken);
10151024
return Mono.empty();
10161025
}
@@ -1028,16 +1037,19 @@ deadLetterErrorDescription, propertiesToModify, sessionId, getLinkName(sessionId
10281037
.then(Mono.fromRunnable(() -> {
10291038
logger.verbose("{}: Update completed. Disposition: {}. Lock: {}.",
10301039
entityPath, dispositionStatus, lockToken);
1040+
1041+
message.setIsSettled();
10311042
renewalContainer.remove(lockToken);
10321043
}));
10331044
}
10341045
}
1046+
10351047
return updateDispositionOperation
10361048
.onErrorMap(throwable -> {
10371049
if (throwable instanceof ServiceBusReceiverException) {
10381050
return throwable;
10391051
}
1040-
1052+
10411053
switch (dispositionStatus) {
10421054
case COMPLETED:
10431055
return new ServiceBusReceiverException(throwable, ServiceBusErrorSource.COMPLETE);
@@ -1046,7 +1058,6 @@ deadLetterErrorDescription, propertiesToModify, sessionId, getLinkName(sessionId
10461058
default:
10471059
return new ServiceBusReceiverException(throwable, ServiceBusErrorSource.UNKNOWN);
10481060
}
1049-
10501061
});
10511062
}
10521063

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/FluxAutoCompleteTest.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import java.util.concurrent.Semaphore;
2020
import java.util.function.Function;
2121

22+
import static org.junit.jupiter.api.Assertions.assertEquals;
23+
import static org.junit.jupiter.api.Assertions.assertNotNull;
2224
import static org.junit.jupiter.api.Assertions.assertThrows;
2325
import static org.mockito.ArgumentMatchers.any;
2426
import static org.mockito.Mockito.doAnswer;
@@ -222,4 +224,75 @@ void onCompleteErrors() {
222224
verify(onComplete).apply(context2);
223225
verifyNoInteractions(onAbandon);
224226
}
227+
228+
/**
229+
* Verifies that if a message has been settled, we will not try to complete it.
230+
*/
231+
@Test
232+
void doesNotCompleteOnSettledMessage() {
233+
// Arrange
234+
final TestPublisher<ServiceBusMessageContext> testPublisher = TestPublisher.createCold();
235+
final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class);
236+
when(message.isSettled()).thenReturn(true);
237+
238+
final ServiceBusMessageContext context = new ServiceBusMessageContext(message);
239+
final ServiceBusReceivedMessage message2 = mock(ServiceBusReceivedMessage.class);
240+
final ServiceBusMessageContext context2 = new ServiceBusMessageContext(message2);
241+
242+
final FluxAutoComplete autoComplete = new FluxAutoComplete(testPublisher.flux(), completionLock, onComplete, onAbandon);
243+
244+
when(onComplete.apply(any())).thenReturn(Mono.empty());
245+
when(onAbandon.apply(any())).thenReturn(Mono.empty());
246+
247+
// Act
248+
StepVerifier.create(autoComplete)
249+
.then(() -> testPublisher.next(context, context2))
250+
.expectNext(context, context2)
251+
.then(() -> testPublisher.complete())
252+
.expectComplete()
253+
.verify();
254+
255+
// Assert
256+
verify(onComplete, never()).apply(context);
257+
verify(onComplete).apply(context2);
258+
verifyNoInteractions(onAbandon);
259+
}
260+
261+
@SuppressWarnings("unchecked")
262+
@Test
263+
void onErrorCancelsUpstream() {
264+
// Arrange
265+
final TestPublisher<ServiceBusMessageContext> testPublisher = TestPublisher.createCold();
266+
final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class);
267+
when(message.isSettled()).thenReturn(false);
268+
final ServiceBusMessageContext context = new ServiceBusMessageContext(message);
269+
270+
final ServiceBusReceivedMessage message2 = mock(ServiceBusReceivedMessage.class);
271+
when(message2.isSettled()).thenReturn(false);
272+
final ServiceBusMessageContext context2 = new ServiceBusMessageContext(message2);
273+
274+
final FluxAutoComplete autoComplete = new FluxAutoComplete(testPublisher.flux(), completionLock, onComplete, onAbandon);
275+
final CloneNotSupportedException testError = new CloneNotSupportedException("TEST error");
276+
277+
when(onComplete.apply(any())).thenReturn(Mono.error(testError), Mono.empty());
278+
when(onAbandon.apply(any())).thenReturn(Mono.empty());
279+
280+
// Act
281+
StepVerifier.create(autoComplete)
282+
.then(() -> testPublisher.next(context, context2))
283+
.expectNext(context)
284+
.consumeErrorWith(error -> {
285+
final Throwable cause = error.getCause();
286+
assertNotNull(cause);
287+
assertEquals(testError, cause);
288+
})
289+
.verify();
290+
291+
// Assert
292+
verify(onComplete).apply(context);
293+
verify(onComplete, never()).apply(context2);
294+
verifyNoInteractions(onAbandon);
295+
296+
testPublisher.assertCancelled();
297+
}
225298
}

0 commit comments

Comments
 (0)