Skip to content

Commit bc2bdc4

Browse files
xinlian12annie-mackushagraThapar
authored
AddingChannelAcquisitionConext (Azure#23464)
* add channel acquisition context Co-authored-by: annie-mac <[email protected]> Co-authored-by: Kushagra Thapar <[email protected]>
1 parent 4e3ea3e commit bc2bdc4

20 files changed

+489
-32
lines changed

eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1235,6 +1235,12 @@
12351235
<Bug pattern="SE_BAD_FIELD"/>
12361236
</Match>
12371237

1238+
<Match>
1239+
<Class name="com.azure.cosmos.CosmosException"/>
1240+
<Field name="channelAcquisitionTimeline"/>
1241+
<Bug pattern="SE_BAD_FIELD"/>
1242+
</Match>
1243+
12381244
<!-- Bug: https://github.com/Azure/azure-sdk-for-java/issues/9088 -->
12391245
<Match>
12401246
<Class name="com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdContextException"/>

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
3333
import com.azure.cosmos.implementation.directconnectivity.StoreResult;
3434
import com.azure.cosmos.implementation.directconnectivity.Uri;
35+
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelAcquisitionTimeline;
3536
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpointStatistics;
3637
import com.azure.cosmos.implementation.patch.PatchOperation;
3738
import com.azure.cosmos.implementation.query.QueryInfo;
@@ -216,6 +217,17 @@ public static <E extends CosmosException> RequestTimeline getRequestTimeline(E e
216217
return e.getRequestTimeline();
217218
}
218219

220+
@Warning(value = INTERNAL_USE_ONLY_WARNING)
221+
public static <E extends CosmosException> E setChannelAcquisitionTimeline(E e, RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) {
222+
e.setChannelAcquisitionTimeline(channelAcquisitionTimeline);
223+
return e;
224+
}
225+
226+
@Warning(value = INTERNAL_USE_ONLY_WARNING)
227+
public static <E extends CosmosException> RntbdChannelAcquisitionTimeline getChannelAcqusitionTimeline(E e) {
228+
return e.getChannelAcquisitionTimeline();
229+
}
230+
219231
@Warning(value = INTERNAL_USE_ONLY_WARNING)
220232
public static <E extends CosmosException> E setChannelTaskQueueSize(E e, int value) {
221233
e.setRntbdChannelTaskQueueSize(value);

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import java.util.List;
6161
import java.util.UUID;
6262
import java.util.concurrent.atomic.AtomicBoolean;
63+
import java.util.concurrent.atomic.AtomicInteger;
6364
import java.util.function.Function;
6465
import java.util.stream.Collectors;
6566

@@ -444,6 +445,7 @@ public <T> CosmosPagedFlux<T> queryItems(String query, Class<T> classType) {
444445
*/
445446
@Beta(value = Beta.SinceVersion.V4_14_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
446447
public Mono<Void> openConnectionsAndInitCaches() {
448+
447449
if(isInitialized.compareAndSet(false, true)) {
448450
return this.getFeedRanges().flatMap(feedRanges -> {
449451
List<Flux<FeedResponse<ObjectNode>>> fluxList = new ArrayList<>();

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.azure.cosmos.implementation.Utils;
1212
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
1313
import com.azure.cosmos.implementation.directconnectivity.Uri;
14+
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelAcquisitionTimeline;
1415
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpointStatistics;
1516
import com.azure.cosmos.models.ModelBridgeInternal;
1617
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -51,6 +52,7 @@ public class CosmosException extends AzureException {
5152

5253
private CosmosDiagnostics cosmosDiagnostics;
5354
private RequestTimeline requestTimeline;
55+
private RntbdChannelAcquisitionTimeline channelAcquisitionTimeline;
5456
private CosmosError cosmosError;
5557
private int rntbdChannelTaskQueueSize;
5658

@@ -385,6 +387,14 @@ void setRequestTimeline(RequestTimeline requestTimeline) {
385387
this.requestTimeline = requestTimeline;
386388
}
387389

390+
RntbdChannelAcquisitionTimeline getChannelAcquisitionTimeline() {
391+
return this.channelAcquisitionTimeline;
392+
}
393+
394+
void setChannelAcquisitionTimeline(RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) {
395+
this.channelAcquisitionTimeline = channelAcquisitionTimeline;
396+
}
397+
388398
void setResourceAddress(String resourceAddress) {
389399
this.resourceAddress = resourceAddress;
390400
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ public Mono<StoreResponse> invokeStoreAsync(final Uri addressUri, final RxDocume
229229
response.setRequestPayloadLength(request.getContentLength());
230230
response.setRntbdChannelTaskQueueSize(record.channelTaskQueueLength());
231231
response.setRntbdPendingRequestSize(record.pendingRequestQueueSize());
232+
response.setChannelAcquisitionTimeline(record.getChannelAcquisitionTimeline());
232233
}
233234

234235
})).onErrorMap(throwable -> {
@@ -262,6 +263,7 @@ public Mono<StoreResponse> invokeStoreAsync(final Uri addressUri, final RxDocume
262263
BridgeInternal.setRntbdPendingRequestQueueSize(cosmosException, record.pendingRequestQueueSize());
263264
BridgeInternal.setChannelTaskQueueSize(cosmosException, record.channelTaskQueueLength());
264265
BridgeInternal.setSendingRequestStarted(cosmosException, record.hasSendingRequestStarted());
266+
BridgeInternal.setChannelAcquisitionTimeline(cosmosException, record.getChannelAcquisitionTimeline());
265267

266268
return cosmosException;
267269
});
@@ -417,6 +419,9 @@ public static final class Options {
417419
@JsonIgnore()
418420
private final UserAgentContainer userAgent;
419421

422+
@JsonProperty()
423+
private final boolean channelAcquisitionContextEnabled;
424+
420425
// endregion
421426

422427
// region Constructors
@@ -445,6 +450,7 @@ private Options(final Builder builder) {
445450
this.shutdownTimeout = builder.shutdownTimeout;
446451
this.threadCount = builder.threadCount;
447452
this.userAgent = builder.userAgent;
453+
this.channelAcquisitionContextEnabled = builder.channelAcquisitionContextEnabled;
448454

449455
this.connectTimeout = builder.connectTimeout == null
450456
? builder.requestTimeout
@@ -472,6 +478,7 @@ private Options(final ConnectionPolicy connectionPolicy) {
472478
this.shutdownTimeout = Duration.ofSeconds(15L);
473479
this.threadCount = 2 * Runtime.getRuntime().availableProcessors();
474480
this.userAgent = new UserAgentContainer();
481+
this.channelAcquisitionContextEnabled = true;
475482
}
476483

477484
// endregion
@@ -554,6 +561,8 @@ public UserAgentContainer userAgent() {
554561
return this.userAgent;
555562
}
556563

564+
public boolean isChannelAcquisitionContextEnabled() { return this.channelAcquisitionContextEnabled; }
565+
557566
// endregion
558567

559568
// region Methods
@@ -695,6 +704,7 @@ public static class Builder {
695704
private Duration shutdownTimeout;
696705
private int threadCount;
697706
private UserAgentContainer userAgent;
707+
private boolean channelAcquisitionContextEnabled;
698708

699709
// endregion
700710

@@ -723,6 +733,7 @@ public Builder(ConnectionPolicy connectionPolicy) {
723733
this.shutdownTimeout = DEFAULT_OPTIONS.shutdownTimeout;
724734
this.threadCount = DEFAULT_OPTIONS.threadCount;
725735
this.userAgent = DEFAULT_OPTIONS.userAgent;
736+
this.channelAcquisitionContextEnabled = DEFAULT_OPTIONS.channelAcquisitionContextEnabled;
726737
}
727738

728739
// endregion

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.azure.cosmos.implementation.HttpConstants;
88
import com.azure.cosmos.implementation.RequestTimeline;
99
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
10+
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelAcquisitionTimeline;
1011
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpointStatistics;
1112
import org.slf4j.Logger;
1213
import org.slf4j.LoggerFactory;
@@ -28,6 +29,7 @@ public class StoreResponse {
2829
private int pendingRequestQueueSize;
2930
private int requestPayloadLength;
3031
private RequestTimeline requestTimeline;
32+
private RntbdChannelAcquisitionTimeline channelAcquisitionTimeline;
3133
private int rntbdChannelTaskQueueSize;
3234
private RntbdEndpointStatistics rntbdEndpointStatistics;
3335
private int rntbdRequestLength;
@@ -170,6 +172,14 @@ RequestTimeline getRequestTimeline() {
170172
return this.requestTimeline;
171173
}
172174

175+
void setChannelAcquisitionTimeline(RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) {
176+
this.channelAcquisitionTimeline = channelAcquisitionTimeline;
177+
}
178+
179+
RntbdChannelAcquisitionTimeline getChannelAcquisitionTimeline() {
180+
return this.channelAcquisitionTimeline;
181+
}
182+
173183
void setEndpointStatistics(RntbdEndpointStatistics rntbdEndpointStatistics) {
174184
this.rntbdEndpointStatistics = rntbdEndpointStatistics;
175185
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,13 @@ public void serialize(StoreResult storeResult,
208208
jsonGenerator.writeObjectField("transportRequestTimeline", storeResult.storeResponse != null ?
209209
storeResult.storeResponse.getRequestTimeline() :
210210
storeResult.exception != null ? BridgeInternal.getRequestTimeline(storeResult.exception) : null);
211+
212+
this.writeNonNullObjectField(
213+
jsonGenerator,
214+
"transportRequestChannelAcquisitionContext",
215+
storeResult.storeResponse != null ? storeResult.storeResponse.getChannelAcquisitionTimeline() :
216+
storeResult.exception != null? BridgeInternal.getChannelAcqusitionTimeline(storeResult.exception) : null);
217+
211218
jsonGenerator.writeObjectField("rntbdRequestLengthInBytes", storeResult.storeResponse != null ?
212219
storeResult.storeResponse.getRntbdRequestLength() : BridgeInternal.getRntbdRequestLength(storeResult.exception));
213220
jsonGenerator.writeObjectField("rntbdResponseLengthInBytes", storeResult.storeResponse != null ?
@@ -225,5 +232,13 @@ public void serialize(StoreResult storeResult,
225232

226233
jsonGenerator.writeEndObject();
227234
}
235+
236+
private void writeNonNullObjectField(JsonGenerator jsonGenerator, String fieldName, Object object) throws IOException {
237+
if (object == null) {
238+
return;
239+
}
240+
241+
jsonGenerator.writeObjectField(fieldName, object);
242+
}
228243
}
229244
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/ChannelPromiseWithExpiryTime.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,22 @@
1717
class ChannelPromiseWithExpiryTime implements Promise<Channel> {
1818
private final Promise<Channel> channelPromise;
1919
private final long expiryTimeInNanos;
20+
private final RntbdChannelAcquisitionTimeline channelAcquisitionTimeline;
2021

2122
public ChannelPromiseWithExpiryTime(Promise<Channel> channelPromise, long expiryTimeInNanos) {
23+
this(channelPromise, expiryTimeInNanos, null);
24+
}
25+
26+
public ChannelPromiseWithExpiryTime(
27+
Promise<Channel> channelPromise,
28+
long expiryTimeInNanos,
29+
RntbdChannelAcquisitionTimeline channelAcquisitionTimeline) {
2230
checkNotNull(channelPromise, "channelPromise must not be null");
2331
checkNotNull(expiryTimeInNanos, "expiryTimeInNanos must not be null");
2432

2533
this.channelPromise = channelPromise;
2634
this.expiryTimeInNanos = expiryTimeInNanos;
35+
this.channelAcquisitionTimeline = channelAcquisitionTimeline;
2736
}
2837

2938
public long getExpiryTimeInNanos() {
@@ -172,4 +181,8 @@ public Promise<Channel> sync() throws InterruptedException {
172181
public Promise<Channel> syncUninterruptibly() {
173182
return this.channelPromise.syncUninterruptibly();
174183
}
175-
}
184+
185+
public RntbdChannelAcquisitionTimeline getChannelAcquisitionTimeline() {
186+
return this.channelAcquisitionTimeline;
187+
}
188+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.implementation.directconnectivity.rntbd;
5+
6+
import com.fasterxml.jackson.core.JsonGenerator;
7+
import com.fasterxml.jackson.databind.SerializerProvider;
8+
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
9+
10+
import java.io.IOException;
11+
import java.time.Duration;
12+
import java.time.Instant;
13+
14+
@JsonSerialize(using = RntbdChannelAcquisitionEvent.RntbdChannelAcquisitionEventJsonSerializer.class)
15+
public class RntbdChannelAcquisitionEvent {
16+
private final Instant createdTime;
17+
private final RntbdChannelAcquisitionEventType eventType;
18+
private volatile Instant completeTime;
19+
20+
public RntbdChannelAcquisitionEvent(RntbdChannelAcquisitionEventType eventType, Instant createdTime) {
21+
this.eventType = eventType;
22+
this.createdTime = createdTime;
23+
}
24+
25+
public Instant getCreatedTime() {
26+
return createdTime;
27+
}
28+
29+
public RntbdChannelAcquisitionEventType getEventType() {
30+
return eventType;
31+
}
32+
33+
public Instant getCompleteTime() {
34+
return completeTime;
35+
}
36+
37+
public void setCompleteTime(Instant completeTime) {
38+
this.completeTime = completeTime;
39+
}
40+
41+
public void complete(Instant completeTime) {
42+
this.completeTime = completeTime;
43+
}
44+
45+
public void addDetail(Object detail) {}
46+
47+
public static void addDetail(RntbdChannelAcquisitionEvent event, Object detail) {
48+
if (event != null) {
49+
event.addDetail(detail);
50+
}
51+
}
52+
53+
public static class RntbdChannelAcquisitionEventJsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer<RntbdChannelAcquisitionEvent> {
54+
@Override
55+
public void serialize(RntbdChannelAcquisitionEvent event,
56+
JsonGenerator writer,
57+
SerializerProvider serializerProvider) throws IOException {
58+
writer.writeStartObject();
59+
60+
writer.writeStringField(event.eventType.toString(), event.createdTime.toString());
61+
if (event.completeTime != null) {
62+
writer.writeNumberField("durationInMicroSec",Duration.between(event.createdTime, event.completeTime).toNanos()/1000L);
63+
}
64+
65+
writer.writeEndObject();
66+
}
67+
}
68+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.implementation.directconnectivity.rntbd;
5+
6+
public enum RntbdChannelAcquisitionEventType {
7+
ATTEMPT_TO_POLL_CHANNEL("poll"),
8+
ADD_TO_PENDING_QUEUE("pending"),
9+
PENDING_TIME_OUT("pendingTimeout"),
10+
ATTEMPT_TO_CREATE_NEW_CHANNEL("startNew"),
11+
ATTEMPT_TO_CREATE_NEW_CHANNEL_COMPLETE("completeNew");
12+
13+
private String name;
14+
RntbdChannelAcquisitionEventType(String name) {
15+
this.name = name;
16+
}
17+
18+
@Override
19+
public String toString(){
20+
return name;
21+
}
22+
}

0 commit comments

Comments
 (0)