Skip to content

Commit f079ad7

Browse files
author
Liudmila Molkova
authored
Add session idle timeout (Azure#34700)
* Add session idle timeout config option
1 parent fde9319 commit f079ad7

27 files changed

+498
-151
lines changed

sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ static void addAttribute(io.opentelemetry.api.common.AttributesBuilder attribute
4343
} else if (value instanceof Byte) {
4444
attributesBuilder.put(AttributeKey.longKey(key), (Byte) value);
4545
} else {
46-
LOGGER.warning("Could not populate attribute with key '{}', type {} is not supported.", key, value.getClass().getName());
46+
LOGGER.warning("Could not populate attribute with key '{}', type '{}' is not supported.", key, value.getClass().getName());
4747
}
4848
}
4949
/**

sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
### Features Added
66

7+
- Added `sessionIdleTimeout` method to configure session idle timeout on `ServiceBusSessionProcessorClientBuilder`. After this time has elapsed,
8+
the processor will close the session and attempt to process another session. ([#34700](https://github.com/Azure/azure-sdk-for-java/issues/34700))
9+
710
### Breaking Changes
811

912
### Bugs Fixed

sdk/servicebus/azure-messaging-servicebus/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
--add-exports com.azure.core/com.azure.core.implementation.util=ALL-UNNAMED
4646
--add-opens com.azure.core/com.azure.core.implementation.util=ALL-UNNAMED
4747
--add-reads com.azure.messaging.servicebus=com.azure.http.netty
48+
--add-reads com.azure.messaging.servicebus=com.azure.core.tracing.opentelemetry
4849
</javaModulesSurefireArgLine>
4950
</properties>
5051

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,27 +12,39 @@
1212
*
1313
* @see ServiceBusReceiverAsyncClient
1414
*/
15-
class ReceiverOptions {
15+
final class ReceiverOptions {
1616
private final ServiceBusReceiveMode receiveMode;
1717
private final int prefetchCount;
1818
private final boolean enableAutoComplete;
1919
private final String sessionId;
2020
private final Integer maxConcurrentSessions;
2121
private final Duration maxLockRenewDuration;
22+
private final Duration sessionIdleTimeout;
2223

23-
ReceiverOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
24+
static ReceiverOptions createNonSessionOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
2425
boolean enableAutoComplete) {
25-
this(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, null, null);
26+
return new ReceiverOptions(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, null, null, null);
2627
}
2728

28-
ReceiverOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
29-
boolean enableAutoComplete, String sessionId, Integer maxConcurrentSessions) {
29+
static ReceiverOptions createNamedSessionOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
30+
boolean enableAutoComplete, String sessionId) {
31+
return new ReceiverOptions(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, sessionId, null, null);
32+
}
33+
34+
static ReceiverOptions createUnnamedSessionOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
35+
boolean enableAutoComplete, Integer maxConcurrentSessions, Duration sessionIdleTimeout) {
36+
return new ReceiverOptions(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, null, maxConcurrentSessions, sessionIdleTimeout);
37+
}
38+
39+
private ReceiverOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
40+
boolean enableAutoComplete, String sessionId, Integer maxConcurrentSessions, Duration sessionIdleTimeout) {
3041
this.receiveMode = receiveMode;
3142
this.prefetchCount = prefetchCount;
3243
this.enableAutoComplete = enableAutoComplete;
3344
this.sessionId = sessionId;
3445
this.maxConcurrentSessions = maxConcurrentSessions;
3546
this.maxLockRenewDuration = maxLockRenewDuration;
47+
this.sessionIdleTimeout = sessionIdleTimeout;
3648
}
3749

3850
/**
@@ -43,6 +55,7 @@ class ReceiverOptions {
4355
Duration getMaxLockRenewDuration() {
4456
return maxLockRenewDuration;
4557
}
58+
4659
/**
4760
* Gets the receive mode for the message.
4861
*
@@ -98,6 +111,15 @@ public Integer getMaxConcurrentSessions() {
98111
return maxConcurrentSessions;
99112
}
100113

114+
/**
115+
* Gets the {@code sessionIdleTimeout} to roll to another session if no messages wew be received.
116+
*
117+
* @return the session idle timeout.
118+
*/
119+
Duration getSessionIdleTimeout() {
120+
return sessionIdleTimeout;
121+
}
122+
101123
public boolean isEnableAutoComplete() {
102124
return enableAutoComplete;
103125
}

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,10 @@
6262
import java.util.UUID;
6363
import java.util.concurrent.atomic.AtomicInteger;
6464
import java.util.function.Consumer;
65-
import java.util.regex.Pattern;
6665

6766
import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY;
67+
import static com.azure.messaging.servicebus.ReceiverOptions.createNonSessionOptions;
68+
import static com.azure.messaging.servicebus.ReceiverOptions.createUnnamedSessionOptions;
6869
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE;
6970

7071
/**
@@ -208,7 +209,6 @@ public final class ServiceBusClientBuilder implements
208209
private static final String UNKNOWN = "UNKNOWN";
209210
private static final String LIBRARY_NAME;
210211
private static final String LIBRARY_VERSION;
211-
private static final Pattern HOST_PORT_PATTERN = Pattern.compile("^[^:]+:\\d+");
212212
private static final Duration MAX_LOCK_RENEW_DEFAULT_DURATION = Duration.ofMinutes(5);
213213
private static final ClientLogger LOGGER = new ClientLogger(ServiceBusClientBuilder.class);
214214
private final Object connectionLock = new Object();
@@ -1041,6 +1041,21 @@ public ServiceBusSessionProcessorClientBuilder maxAutoLockRenewDuration(Duration
10411041
return this;
10421042
}
10431043

1044+
/**
1045+
* Sets the maximum amount of time to wait for a message to be received for the currently active session.
1046+
* After this time has elapsed, the processor will close the session and attempt to process another session.
1047+
* If not specified, the {@link AmqpRetryOptions#getTryTimeout()} will be used.
1048+
*
1049+
* @param sessionIdleTimeout Session idle timeout.
1050+
* @return The updated {@link ServiceBusSessionProcessorClientBuilder} object.
1051+
* @throws IllegalArgumentException If {code maxAutoLockRenewDuration} is negative.
1052+
*/
1053+
public ServiceBusSessionProcessorClientBuilder sessionIdleTimeout(Duration sessionIdleTimeout) {
1054+
validateAndThrow(sessionIdleTimeout);
1055+
sessionReceiverClientBuilder.sessionIdleTimeout(sessionIdleTimeout);
1056+
return this;
1057+
}
1058+
10441059
/**
10451060
* Enables session processing roll-over by processing at most {@code maxConcurrentSessions}.
10461061
*
@@ -1235,6 +1250,7 @@ public final class ServiceBusSessionReceiverClientBuilder {
12351250
private String subscriptionName;
12361251
private String topicName;
12371252
private Duration maxAutoLockRenewDuration = MAX_LOCK_RENEW_DEFAULT_DURATION;
1253+
private Duration sessionIdleTimeout = null;
12381254
private SubQueue subQueue = SubQueue.NONE;
12391255

12401256
private ServiceBusSessionReceiverClientBuilder() {
@@ -1270,6 +1286,21 @@ public ServiceBusSessionReceiverClientBuilder maxAutoLockRenewDuration(Duration
12701286
return this;
12711287
}
12721288

1289+
/**
1290+
* Sets the maximum amount of time to wait for a message to be received for the currently active session.
1291+
* After this time has elapsed, the processor will close the session and attempt to process another session.
1292+
* If not specified, the {@link AmqpRetryOptions#getTryTimeout()} will be used.
1293+
*
1294+
* @param sessionIdleTimeout Session idle timeout.
1295+
* @return The updated {@link ServiceBusSessionReceiverClientBuilder} object.
1296+
* @throws IllegalArgumentException If {code maxAutoLockRenewDuration} is negative.
1297+
*/
1298+
ServiceBusSessionReceiverClientBuilder sessionIdleTimeout(Duration sessionIdleTimeout) {
1299+
validateAndThrow(sessionIdleTimeout);
1300+
this.sessionIdleTimeout = sessionIdleTimeout;
1301+
return this;
1302+
}
1303+
12731304
/**
12741305
* Enables session processing roll-over by processing at most {@code maxConcurrentSessions}.
12751306
*
@@ -1403,9 +1434,9 @@ ServiceBusReceiverAsyncClient buildAsyncClientForProcessor() {
14031434
}
14041435

14051436
final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
1406-
final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
1407-
maxAutoLockRenewDuration, enableAutoComplete, null,
1408-
maxConcurrentSessions);
1437+
1438+
final ReceiverOptions receiverOptions = createUnnamedSessionOptions(receiveMode, prefetchCount,
1439+
maxAutoLockRenewDuration, enableAutoComplete, maxConcurrentSessions, sessionIdleTimeout);
14091440

14101441
final String clientIdentifier;
14111442
if (clientOptions instanceof AmqpClientOptions) {
@@ -1484,8 +1515,8 @@ private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoComp
14841515
}
14851516

14861517
final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
1487-
final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
1488-
maxAutoLockRenewDuration, enableAutoComplete, null, maxConcurrentSessions);
1518+
final ReceiverOptions receiverOptions = createUnnamedSessionOptions(receiveMode, prefetchCount,
1519+
maxAutoLockRenewDuration, enableAutoComplete, maxConcurrentSessions, sessionIdleTimeout);
14891520

14901521
final String clientIdentifier;
14911522
if (clientOptions instanceof AmqpClientOptions) {
@@ -1792,7 +1823,6 @@ public final class ServiceBusReceiverClientBuilder {
17921823
private String subscriptionName;
17931824
private String topicName;
17941825
private Duration maxAutoLockRenewDuration = MAX_LOCK_RENEW_DEFAULT_DURATION;
1795-
17961826
private ServiceBusReceiverClientBuilder() {
17971827
}
17981828

@@ -1967,7 +1997,7 @@ ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed, bo
19671997
}
19681998

19691999
final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
1970-
final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
2000+
final ReceiverOptions receiverOptions = createNonSessionOptions(receiveMode, prefetchCount,
19712001
maxAutoLockRenewDuration, enableAutoComplete);
19722002

19732003
final String clientIdentifier;

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ class ServiceBusSessionManager implements AutoCloseable {
6868
private final List<Scheduler> schedulers;
6969
private final Deque<Scheduler> availableSchedulers = new ConcurrentLinkedDeque<>();
7070
private final Duration maxSessionLockRenewDuration;
71+
private final Duration sessionIdleTimeout;
7172

7273
/**
7374
* SessionId to receiver mapping.
@@ -107,6 +108,9 @@ class ServiceBusSessionManager implements AutoCloseable {
107108
this.processor = EmitterProcessor.create(numberOfSchedulers, false);
108109
this.sessionReceiveSink = processor.sink();
109110
this.receiveLink = receiveLink;
111+
this.sessionIdleTimeout = receiverOptions.getSessionIdleTimeout() != null
112+
? receiverOptions.getSessionIdleTimeout()
113+
: connectionProcessor.getRetryOptions().getTryTimeout();
110114
}
111115

112116
ServiceBusSessionManager(String entityPath, MessagingEntityType entityType,
@@ -348,8 +352,8 @@ private Flux<ServiceBusMessageContext> getSession(Scheduler scheduler, boolean d
348352
}
349353

350354
return new ServiceBusSessionReceiver(link, messageSerializer, connectionProcessor.getRetryOptions(),
351-
receiverOptions.getPrefetchCount(), disposeOnIdle, scheduler, this::renewSessionLock,
352-
maxSessionLockRenewDuration);
355+
receiverOptions.getPrefetchCount(), scheduler, this::renewSessionLock,
356+
maxSessionLockRenewDuration, disposeOnIdle ? sessionIdleTimeout : null);
353357
})))
354358
.flatMapMany(sessionReceiver -> sessionReceiver.receive().doFinally(signalType -> {
355359
LOGGER.atVerbose()

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,16 @@ class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable {
6464
* @param messageSerializer Serializes and deserializes messages from Service Bus.
6565
* @param retryOptions Retry options for the receiver.
6666
* @param prefetch Number of messages to prefetch from session.
67-
* @param disposeOnIdle true to dispose the session receiver if there are no more messages and the receiver is
68-
* idle.
6967
* @param scheduler The scheduler to publish messages on.
7068
* @param renewSessionLock Function to renew the session lock.
7169
* @param maxSessionLockRenewDuration Maximum time to renew the session lock for. {@code null} or {@link
7270
* Duration#ZERO} to disable session lock renewal.
71+
* @param sessionIdleTimeout Timeout after which session receiver will be disposed if there are no more messages
72+
* and the receiver is idle. Set it to {@code null} to not dispose receiver.
7373
*/
7474
ServiceBusSessionReceiver(ServiceBusReceiveLink receiveLink, MessageSerializer messageSerializer,
75-
AmqpRetryOptions retryOptions, int prefetch, boolean disposeOnIdle, Scheduler scheduler,
76-
Function<String, Mono<OffsetDateTime>> renewSessionLock, Duration maxSessionLockRenewDuration) {
75+
AmqpRetryOptions retryOptions, int prefetch, Scheduler scheduler,
76+
Function<String, Mono<OffsetDateTime>> renewSessionLock, Duration maxSessionLockRenewDuration, Duration sessionIdleTimeout) {
7777

7878
this.receiveLink = receiveLink;
7979
this.lockContainer = new LockContainer<>(ServiceBusConstants.OPERATION_TIMEOUT);
@@ -146,12 +146,12 @@ class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable {
146146

147147
// Creates a subscription that disposes/closes the receiver when there are no more messages in the session and
148148
// receiver is idle.
149-
if (disposeOnIdle) {
149+
if (sessionIdleTimeout != null) {
150150
this.subscriptions.add(Flux.switchOnNext(messageReceivedEmitter
151-
.map((String lockToken) -> Mono.delay(this.retryOptions.getTryTimeout())))
151+
.map((String lockToken) -> Mono.delay(sessionIdleTimeout)))
152152
.subscribe(item -> {
153153
withReceiveLinkInformation(LOGGER.atInfo())
154-
.addKeyValue("timeout", retryOptions.getTryTimeout())
154+
.addKeyValue("timeout", sessionIdleTimeout)
155155
.log("Did not a receive message within timeout.");
156156
cancelReceiveProcessor.onComplete();
157157
}));

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Objects;
2222

2323
import static com.azure.core.util.FluxUtil.monoError;
24+
import static com.azure.messaging.servicebus.ReceiverOptions.createNamedSessionOptions;
2425

2526
/**
2627
* This <b>asynchronous</b> session receiver client is used to acquire session locks from a queue or topic and create
@@ -136,9 +137,9 @@ public final class ServiceBusSessionReceiverAsyncClient implements AutoCloseable
136137
public Mono<ServiceBusReceiverAsyncClient> acceptNextSession() {
137138
return tracer.traceMono("ServiceBus.acceptNextSession", unNamedSessionManager.getActiveLink().flatMap(receiveLink -> receiveLink.getSessionId()
138139
.map(sessionId -> {
139-
final ReceiverOptions newReceiverOptions = new ReceiverOptions(receiverOptions.getReceiveMode(),
140+
final ReceiverOptions newReceiverOptions = createNamedSessionOptions(receiverOptions.getReceiveMode(),
140141
receiverOptions.getPrefetchCount(), receiverOptions.getMaxLockRenewDuration(),
141-
receiverOptions.isEnableAutoComplete(), sessionId, null);
142+
receiverOptions.isEnableAutoComplete(), sessionId);
142143
final ServiceBusSessionManager sessionSpecificManager = new ServiceBusSessionManager(entityPath,
143144
entityType, connectionProcessor, messageSerializer, newReceiverOptions,
144145
receiveLink, identifier);
@@ -172,9 +173,9 @@ public Mono<ServiceBusReceiverAsyncClient> acceptSession(String sessionId) {
172173
return monoError(LOGGER, new IllegalArgumentException("'sessionId' cannot be empty"));
173174
}
174175

175-
final ReceiverOptions newReceiverOptions = new ReceiverOptions(receiverOptions.getReceiveMode(),
176+
final ReceiverOptions newReceiverOptions = createNamedSessionOptions(receiverOptions.getReceiveMode(),
176177
receiverOptions.getPrefetchCount(), receiverOptions.getMaxLockRenewDuration(),
177-
receiverOptions.isEnableAutoComplete(), sessionId, null);
178+
receiverOptions.isEnableAutoComplete(), sessionId);
178179
final ServiceBusSessionManager sessionSpecificManager = new ServiceBusSessionManager(entityPath, entityType,
179180
connectionProcessor, messageSerializer, newReceiverOptions, identifier);
180181

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementChannel.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ private Throwable mapError(Throwable throwable) {
264264
*/
265265
@Override
266266
public Mono<OffsetDateTime> renewMessageLock(String lockToken, String associatedLinkName) {
267-
return isAuthorized(OPERATION_PEEK).then(createChannel.flatMap(channel -> {
267+
return isAuthorized(ManagementConstants.OPERATION_RENEW_LOCK).then(createChannel.flatMap(channel -> {
268268
final Message requestMessage = createManagementMessage(ManagementConstants.OPERATION_RENEW_LOCK,
269269
associatedLinkName);
270270
final Map<String, Object> requestBody = new HashMap<>();
@@ -428,7 +428,7 @@ public Mono<Void> updateDisposition(String lockToken, DispositionStatus disposit
428428
final UUID[] lockTokens = new UUID[]{UUID.fromString(lockToken)};
429429
return isAuthorized(OPERATION_UPDATE_DISPOSITION).then(createChannel.flatMap(channel -> {
430430
logger.atVerbose()
431-
.addKeyValue("lockTokens", Arrays.toString(lockTokens))
431+
.addKeyValue("lockTokens", () -> Arrays.toString(lockTokens))
432432
.addKeyValue(DISPOSITION_STATUS_KEY, dispositionStatus)
433433
.addKeyValue(SESSION_ID_KEY, sessionId)
434434
.log("Update disposition of deliveries.");

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,7 @@ public class ServiceBusReactorReceiver extends ReactorReceiver implements Servic
6464
private final Mono<OffsetDateTime> sessionLockedUntil;
6565

6666
public ServiceBusReactorReceiver(AmqpConnection connection, String entityPath, Receiver receiver,
67-
ReceiveLinkHandler handler, TokenManager tokenManager, ReactorProvider provider, Duration timeout,
68-
AmqpRetryPolicy retryPolicy) {
67+
ReceiveLinkHandler handler, TokenManager tokenManager, ReactorProvider provider, AmqpRetryPolicy retryPolicy) {
6968
super(connection, entityPath, receiver, handler, tokenManager, provider.getReactorDispatcher(),
7069
retryPolicy.getRetryOptions());
7170
this.receiver = receiver;

0 commit comments

Comments
 (0)