Skip to content

Commit 3f7dee5

Browse files
authored
Sb track2 Fixing some live test (Azure#17227)
Fixing live test and also fixed the bug where receiver.complete was not completing for session enabled client
1 parent fd5ba7b commit 3f7dee5

File tree

4 files changed

+43
-89
lines changed

4 files changed

+43
-89
lines changed

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1061,6 +1061,10 @@ deadLetterErrorDescription, propertiesToModify, sessionId, getLinkName(sessionId
10611061
});
10621062
}
10631063

1064+
Mono<ServiceBusAsyncConsumer> createConsumerWithReceiveLink() {
1065+
return Mono.fromCallable(this::getOrCreateConsumer);
1066+
}
1067+
10641068
private ServiceBusAsyncConsumer getOrCreateConsumer() {
10651069
final ServiceBusAsyncConsumer existing = consumer.get();
10661070
if (existing != null) {

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -113,13 +113,13 @@ public Mono<ServiceBusReceiverAsyncClient> acceptSession(String sessionId) {
113113
final ReceiverOptions newReceiverOptions = new ReceiverOptions(receiverOptions.getReceiveMode(),
114114
receiverOptions.getPrefetchCount(), receiverOptions.getMaxLockRenewDuration(),
115115
receiverOptions.isEnableAutoComplete(), sessionId, null);
116-
final ServiceBusSessionManager sessionSpecificManager = new ServiceBusSessionManager(entityPath, entityType,
117-
connectionProcessor, tracerProvider, messageSerializer, newReceiverOptions);
118116

119-
return sessionSpecificManager.getActiveLink().map(receiveLink -> new ServiceBusReceiverAsyncClient(
117+
ServiceBusReceiverAsyncClient sessionSpecificAsyncClient = new ServiceBusReceiverAsyncClient(
120118
fullyQualifiedNamespace, entityPath, entityType, newReceiverOptions, connectionProcessor,
121-
ServiceBusConstants.OPERATION_TIMEOUT, tracerProvider, messageSerializer, () -> { },
122-
sessionSpecificManager));
119+
ServiceBusConstants.OPERATION_TIMEOUT, tracerProvider, messageSerializer, () -> { });
120+
121+
return sessionSpecificAsyncClient.createConsumerWithReceiveLink()
122+
.thenReturn(sessionSpecificAsyncClient);
123123
}
124124

125125
@Override

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java

Lines changed: 33 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import com.azure.messaging.servicebus.models.CompleteOptions;
1515
import com.azure.messaging.servicebus.models.DeadLetterOptions;
1616
import com.azure.messaging.servicebus.models.DeferOptions;
17-
import com.azure.messaging.servicebus.models.ReceiveMode;
1817
import com.azure.messaging.servicebus.models.SubQueue;
1918
import org.junit.jupiter.api.Assertions;
2019
import org.junit.jupiter.api.Disabled;
@@ -58,18 +57,11 @@
5857
class ServiceBusReceiverAsyncClientIntegrationTest extends IntegrationTestBase {
5958
private final ClientLogger logger = new ClientLogger(ServiceBusReceiverAsyncClientIntegrationTest.class);
6059
private final AtomicInteger messagesPending = new AtomicInteger();
61-
private final List<Long> messagesDeferredPending = new ArrayList<>();
6260
private final boolean isSessionEnabled = false;
6361

6462
private ServiceBusReceiverAsyncClient receiver;
6563
private ServiceBusSenderAsyncClient sender;
6664

67-
/**
68-
* Receiver used to clean up resources in {@link #afterTest()}.
69-
*/
70-
private ServiceBusReceiverAsyncClient receiveAndDeleteReceiver;
71-
private Mono<ServiceBusReceiverAsyncClient> receiveAndDeleteReceiverMono;
72-
7365
ServiceBusReceiverAsyncClientIntegrationTest() {
7466
super(new ClientLogger(ServiceBusReceiverAsyncClientIntegrationTest.class));
7567
}
@@ -82,52 +74,11 @@ protected void beforeTest() {
8274
@Override
8375
protected void afterTest() {
8476
sharedBuilder = null;
85-
final int pending = messagesPending.get();
86-
final int pendingDeferred = messagesDeferredPending.size();
87-
if (pending < 1 && pendingDeferred < 1) {
88-
dispose(receiver, sender, receiveAndDeleteReceiver);
89-
if (receiveAndDeleteReceiverMono != null) {
90-
dispose(receiveAndDeleteReceiverMono.block());
91-
}
92-
return;
93-
}
94-
95-
// In the case that this test failed... we're going to drain the queue or subscription.
9677
try {
9778
dispose(receiver, sender);
98-
if (pending > 0) {
99-
if (receiveAndDeleteReceiverMono != null) {
100-
receiveAndDeleteReceiver = receiveAndDeleteReceiverMono.block();
101-
}
102-
receiveAndDeleteReceiver.receiveMessages()
103-
.map(message -> {
104-
logger.info("Message received: {}", message.getSequenceNumber());
105-
return message;
106-
})
107-
.timeout(Duration.ofSeconds(15), Mono.empty())
108-
.blockLast();
109-
}
11079
} catch (Exception e) {
11180
logger.warning("Error occurred when draining queue.", e);
11281
}
113-
114-
try {
115-
if (pendingDeferred > 0) {
116-
for (Long sequenceNumber : messagesDeferredPending) {
117-
receiveAndDeleteReceiver.receiveDeferredMessage(sequenceNumber)
118-
.map(message -> {
119-
logger.info("Message received: {}", message.getSequenceNumber());
120-
return message;
121-
})
122-
.timeout(Duration.ofSeconds(15), Mono.empty())
123-
.block();
124-
}
125-
}
126-
} catch (Exception e) {
127-
logger.warning("Error occurred when draining for deferred messages.", e);
128-
} finally {
129-
dispose(receiveAndDeleteReceiver);
130-
}
13182
}
13283

13384
/**
@@ -243,9 +194,6 @@ void transactionSendReceiveAndCommit(DispositionStatus dispositionStatus) {
243194
StepVerifier.create(receiver.commitTransaction(transaction.get()))
244195
.verifyComplete();
245196

246-
if (dispositionStatus == DispositionStatus.DEFERRED) {
247-
messagesDeferredPending.add(receivedMessage.getSequenceNumber());
248-
}
249197
}
250198

251199
/**
@@ -264,9 +212,6 @@ void transactionReceiveCompleteCommitMixClient(MessagingEntityType entityType) {
264212
.buildAsyncClient();
265213
this.receiver = getReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection)
266214
.buildAsyncClient();
267-
this.receiveAndDeleteReceiver = getReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection)
268-
.receiveMode(ReceiveMode.RECEIVE_AND_DELETE)
269-
.buildAsyncClient();
270215

271216
final String messageId = UUID.randomUUID().toString();
272217
final ServiceBusMessage message = getMessage(messageId, isSessionEnabled);
@@ -312,10 +257,6 @@ void receiveTwoMessagesAutoComplete(MessagingEntityType entityType, boolean isSe
312257
assertNotNull(sessionId, "'sessionId' should have been set.");
313258
this.receiver = getSessionReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection)
314259
.buildAsyncClient().acceptSession(sessionId).block();
315-
this.receiveAndDeleteReceiverMono = getSessionReceiverBuilder(useCredentials, entityType, entityIndex,
316-
shareConnection)
317-
.receiveMode(ReceiveMode.RECEIVE_AND_DELETE)
318-
.buildAsyncClient().acceptSession(sessionId);
319260
} else {
320261
this.receiver = getReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection)
321262
.buildAsyncClient();
@@ -355,16 +296,9 @@ void receiveMessageAutoComplete(MessagingEntityType entityType, boolean isSessio
355296
assertNotNull(sessionId, "'sessionId' should have been set.");
356297
this.receiver = getSessionReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection)
357298
.buildAsyncClient().acceptSession(sessionId).block();
358-
this.receiveAndDeleteReceiverMono = getSessionReceiverBuilder(useCredentials, entityType, entityIndex,
359-
shareConnection)
360-
.receiveMode(ReceiveMode.RECEIVE_AND_DELETE)
361-
.buildAsyncClient().acceptSession(sessionId);
362299
} else {
363300
this.receiver = getReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection)
364301
.buildAsyncClient();
365-
this.receiveAndDeleteReceiver = getReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection)
366-
.receiveMode(ReceiveMode.RECEIVE_AND_DELETE)
367-
.buildAsyncClient();
368302
}
369303

370304
final String messageId = UUID.randomUUID().toString();
@@ -402,6 +336,12 @@ void peekMessage(MessagingEntityType entityType, boolean isSessionEnabled) {
402336
StepVerifier.create(receiver.peekMessage())
403337
.assertNext(receivedMessage -> assertMessageEquals(receivedMessage, messageId, isSessionEnabled))
404338
.verifyComplete();
339+
340+
// cleanup
341+
StepVerifier.create(receiver.receiveMessages())
342+
.assertNext(receivedMessage -> receiver.complete(receivedMessage).block(OPERATION_TIMEOUT))
343+
.thenCancel()
344+
.verify();
405345
}
406346

407347
/**
@@ -436,9 +376,10 @@ void sendScheduledMessageAndReceive(MessagingEntityType entityType, boolean isSe
436376
sender.scheduleMessage(message, scheduledEnqueueTime).block(TIMEOUT);
437377

438378
// Assert & Act
439-
StepVerifier.create(Mono.delay(Duration.ofSeconds(3)).then(receiveAndDeleteReceiver.receiveMessages().next()))
379+
StepVerifier.create(Mono.delay(Duration.ofSeconds(4)).then(receiver.receiveMessages().next()))
440380
.assertNext(receivedMessage -> {
441381
assertMessageEquals(receivedMessage, messageId, isSessionEnabled);
382+
receiver.complete(receivedMessage).block(OPERATION_TIMEOUT);
442383
messagesPending.decrementAndGet();
443384
})
444385
.verifyComplete();
@@ -558,9 +499,15 @@ void peekMessages(MessagingEntityType entityType, boolean isSessionEnabled) {
558499
.assertNext(message -> checkCorrectMessage.accept(message, 7))
559500
.verifyComplete();
560501
} finally {
561-
receiveAndDeleteReceiver.receiveMessages()
562-
.take(messages.size())
563-
.blockLast(Duration.ofSeconds(15));
502+
AtomicInteger completed = new AtomicInteger();
503+
StepVerifier.create(receiver.receiveMessages().take(messages.size()))
504+
.thenConsumeWhile(receivedMessage -> {
505+
completed.incrementAndGet();
506+
receiver.complete(receivedMessage).block(OPERATION_TIMEOUT);
507+
return completed.get() <= messages.size();
508+
})
509+
.expectComplete()
510+
.verify();
564511

565512
messagesPending.addAndGet(-messages.size());
566513
}
@@ -586,6 +533,17 @@ void peekMessagesFromSequence(MessagingEntityType entityType) {
586533
StepVerifier.create(receiver.peekMessagesAt(maxMessages, fromSequenceNumber))
587534
.expectNextCount(maxMessages)
588535
.verifyComplete();
536+
537+
// cleanup
538+
StepVerifier.create(receiver.receiveMessages().take(maxMessages))
539+
.assertNext(receivedMessage -> {
540+
receiver.complete(receivedMessage).block(Duration.ofSeconds(15));
541+
})
542+
.assertNext(receivedMessage -> {
543+
receiver.complete(receivedMessage).block(Duration.ofSeconds(15));
544+
})
545+
.expectComplete()
546+
.verify(TIMEOUT);
589547
}
590548

591549
/**
@@ -795,7 +753,7 @@ void receiveAndDefer(MessagingEntityType entityType, boolean isSessionEnabled) {
795753
@ParameterizedTest
796754
void receiveDeferredMessageBySequenceNumber(MessagingEntityType entityType, DispositionStatus dispositionStatus) {
797755
// Arrange
798-
setSenderAndReceiver(entityType, 0, false);
756+
setSenderAndReceiver(entityType, TestUtils.USE_CASE_DEFERRED_MESSAGE_BY_SEQUENCE_NUMBER, false);
799757

800758
final String messageId = UUID.randomUUID().toString();
801759
final ServiceBusMessage message = getMessage(messageId, false);
@@ -817,7 +775,6 @@ void receiveDeferredMessageBySequenceNumber(MessagingEntityType entityType, Disp
817775
switch (dispositionStatus) {
818776
case ABANDONED:
819777
operation = receiver.abandon(receivedDeferredMessage);
820-
messagesDeferredPending.add(receivedDeferredMessage.getSequenceNumber());
821778
break;
822779
case SUSPENDED:
823780
operation = receiver.deadLetter(receivedDeferredMessage);
@@ -844,9 +801,9 @@ void receiveDeferredMessageBySequenceNumber(MessagingEntityType entityType, Disp
844801
@ParameterizedTest
845802
void sendReceiveMessageWithVariousPropertyTypes(MessagingEntityType entityType) {
846803
// Arrange
804+
final boolean isSessionEnabled = true;
847805
setSenderAndReceiver(entityType, TestUtils.USE_CASE_SEND_RECEIVE_WITH_PROPERTIES, isSessionEnabled);
848806

849-
final boolean isSessionEnabled = true;
850807
final String messageId = UUID.randomUUID().toString();
851808
final ServiceBusMessage messageToSend = getMessage(messageId, isSessionEnabled);
852809

@@ -866,7 +823,7 @@ void sendReceiveMessageWithVariousPropertyTypes(MessagingEntityType entityType)
866823
sendMessage(messageToSend).block(TIMEOUT);
867824

868825
// Assert & Act
869-
StepVerifier.create(receiveAndDeleteReceiver.receiveMessages())
826+
StepVerifier.create(receiver.receiveMessages())
870827
.assertNext(receivedMessage -> {
871828
messagesPending.decrementAndGet();
872829
assertMessageEquals(receivedMessage, messageId, isSessionEnabled);
@@ -887,6 +844,7 @@ void sendReceiveMessageWithVariousPropertyTypes(MessagingEntityType entityType)
887844
actual));
888845
}
889846
}
847+
receiver.complete(receivedMessage).block(OPERATION_TIMEOUT);
890848
})
891849
.thenCancel()
892850
.verify();
@@ -1232,19 +1190,11 @@ private void setSenderAndReceiver(MessagingEntityType entityType, int entityInde
12321190
this.receiver = getSessionReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection)
12331191
.disableAutoComplete()
12341192
.buildAsyncClient().acceptSession(sessionId).block();
1235-
this.receiveAndDeleteReceiverMono = getSessionReceiverBuilder(useCredentials, entityType, entityIndex,
1236-
shareConnection)
1237-
.disableAutoComplete()
1238-
.receiveMode(ReceiveMode.RECEIVE_AND_DELETE)
1239-
.buildAsyncClient().acceptSession(sessionId);
1193+
12401194
} else {
12411195
this.receiver = getReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection)
12421196
.disableAutoComplete()
12431197
.buildAsyncClient();
1244-
this.receiveAndDeleteReceiver = getReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection)
1245-
.disableAutoComplete()
1246-
.receiveMode(ReceiveMode.RECEIVE_AND_DELETE)
1247-
.buildAsyncClient();
12481198
}
12491199
}
12501200

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TestUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class TestUtils {
5151
static final int USE_CASE_MULTIPLE_RECEIVE_ONE_TIMEOUT = 5;
5252
static final int USE_CASE_PEEK_BATCH_MESSAGES = 6;
5353
static final int USE_CASE_SEND_READ_BACK_MESSAGES = 7;
54-
static final int USE_CASE_MULTIPLE_SESSION = 8;
54+
static final int USE_CASE_DEFERRED_MESSAGE_BY_SEQUENCE_NUMBER = 8;
5555
static final int USE_CASE_PEEK_MESSAGE_FROM_SEQUENCE = 9;
5656
static final int USE_CASE_PEEK_RECEIVE_AND_DEFER = 10;
5757
static final int USE_CASE_PEEK_TRANSACTION_SENDRECEIVE_AND_COMPLETE = 11;

0 commit comments

Comments
 (0)