Skip to content

Commit 74a9c98

Browse files
xinlian12annie-macannie-mac
authored
addDecodeStage (Azure#22808)
* add decode stage Co-authored-by: annie-mac <[email protected]> Co-authored-by: annie-mac <[email protected]>
1 parent ede6bd4 commit 74a9c98

File tree

5 files changed

+62
-6
lines changed

5 files changed

+62
-6
lines changed

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import javax.net.ssl.SSLException;
5656
import java.net.SocketAddress;
5757
import java.nio.channels.ClosedChannelException;
58+
import java.time.Instant;
5859
import java.util.Map;
5960
import java.util.Optional;
6061
import java.util.UUID;
@@ -730,8 +731,14 @@ private void messageReceived(final ChannelHandlerContext context, final RntbdRes
730731
return;
731732
}
732733

734+
requestRecord.stage(RntbdRequestRecord.Stage.DECODE_STARTED, response.getDecodeStartTime());
735+
736+
// When decode completed, it means sdk has received the full response from server
737+
requestRecord.stage(
738+
RntbdRequestRecord.Stage.RECEIVED,
739+
response.getDecodeEndTime() != null ? response.getDecodeEndTime() : Instant.now());
740+
733741
requestRecord.responseLength(response.getMessageLength());
734-
requestRecord.stage(RntbdRequestRecord.Stage.RECEIVED);
735742

736743
final HttpResponseStatus status = response.getStatus();
737744
final UUID activityId = response.getActivityId();

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestRecord.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public abstract class RntbdRequestRecord extends CompletableFuture<StoreResponse
6363
private volatile Instant timePipelined;
6464
private final Instant timeQueued;
6565
private volatile Instant timeSent;
66+
private volatile Instant timeDecodeStarted;
6667
private volatile Instant timeReceived;
6768
private volatile boolean sendingRequestHasStarted;
6869
private volatile RntbdChannelAcquisitionTimeline channelAcquisitionTimeline;
@@ -115,8 +116,10 @@ public Stage stage() {
115116
}
116117

117118
public RntbdRequestRecord stage(final Stage value) {
119+
return this.stage(value, Instant.now());
120+
}
118121

119-
final Instant time = Instant.now();
122+
public RntbdRequestRecord stage(final Stage value, Instant time) {
120123

121124
STAGE.updateAndGet(this, current -> {
122125

@@ -143,9 +146,18 @@ public RntbdRequestRecord stage(final Stage value) {
143146
}
144147
this.timeSent = time;
145148
break;
146-
case RECEIVED:
149+
150+
case DECODE_STARTED:
147151
if (current != Stage.SENT) {
148-
logger.debug("Expected transition from SENT to RECEIVED, not {} to RECEIVED", current);
152+
logger.debug("Expected transition from SENT to DECODE_STARTED, not {} to DECODE_STARTED", current);
153+
break;
154+
}
155+
this.timeDecodeStarted = time;
156+
break;
157+
158+
case RECEIVED:
159+
if (current != Stage.DECODE_STARTED) {
160+
logger.debug("Expected transition from DECODE_STARTED to RECEIVED, not {} to RECEIVED", current);
149161
break;
150162
}
151163
this.timeReceived = time;
@@ -181,6 +193,8 @@ public Instant timeCreated() {
181193
return this.args.timeCreated();
182194
}
183195

196+
public Instant timeDecodeStarted() { return this.timeDecodeStarted; }
197+
184198
public Instant timePipelined() {
185199
return this.timePipelined;
186200
}
@@ -272,6 +286,7 @@ public RequestTimeline takeTimelineSnapshot() {
272286
Instant timeChannelAcquisitionStarted = this.timeChannelAcquisitionStarted();
273287
Instant timePipelined = this.timePipelined();
274288
Instant timeSent = this.timeSent();
289+
Instant timeDecodeStarted = this.timeDecodeStarted();
275290
Instant timeReceived = this.timeReceived();
276291
Instant timeCompleted = this.timeCompleted();
277292
Instant timeCompletedOrNow = timeCompleted == null ? now : timeCompleted;
@@ -286,7 +301,9 @@ public RequestTimeline takeTimelineSnapshot() {
286301
new RequestTimeline.Event("pipelined",
287302
timePipelined, timeSent == null ? timeCompletedOrNow : timeSent),
288303
new RequestTimeline.Event("transitTime",
289-
timeSent, timeReceived == null ? timeCompletedOrNow : timeReceived),
304+
timeSent, timeDecodeStarted == null ? timeCompletedOrNow : timeDecodeStarted),
305+
new RequestTimeline.Event("decodeTime",
306+
timeDecodeStarted, timeReceived == null ? timeCompletedOrNow : timeReceived),
290307
new RequestTimeline.Event("received",
291308
timeReceived, timeCompletedOrNow),
292309
new RequestTimeline.Event("completed",
@@ -307,7 +324,7 @@ public String toString() {
307324
// region Types
308325

309326
public enum Stage {
310-
QUEUED, CHANNEL_ACQUISITION_STARTED, PIPELINED, SENT, RECEIVED, COMPLETED
327+
QUEUED, CHANNEL_ACQUISITION_STARTED, PIPELINED, SENT, DECODE_STARTED, RECEIVED, COMPLETED
311328
}
312329

313330
static final class JsonSerializer extends StdSerializer<RntbdRequestRecord> {

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponse.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.io.IOException;
2323
import java.nio.charset.StandardCharsets;
24+
import java.time.Instant;
2425
import java.util.Map;
2526
import java.util.UUID;
2627
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -55,6 +56,12 @@ public final class RntbdResponse implements ReferenceCounted {
5556
@JsonProperty
5657
private volatile int referenceCount;
5758

59+
@JsonIgnore
60+
private Instant decodeStartTime;
61+
62+
@JsonIgnore
63+
private Instant decodeEndTime;
64+
5865
// endregion
5966

6067
// region Constructors
@@ -135,6 +142,22 @@ public Long getTransportRequestId() {
135142
return this.getHeader(RntbdResponseHeader.TransportRequestID);
136143
}
137144

145+
public Instant getDecodeStartTime() {
146+
return this.decodeStartTime;
147+
}
148+
149+
public void setDecodeStartTime(Instant decodeStartTime) {
150+
this.decodeStartTime = decodeStartTime;
151+
}
152+
153+
public Instant getDecodeEndTime() {
154+
return this.decodeEndTime;
155+
}
156+
157+
public void setDecodeEndTime(Instant decodeEndTime) {
158+
this.decodeEndTime = decodeEndTime;
159+
}
160+
138161
// endregion
139162

140163
// region Methods

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponseDecoder.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,14 @@
99
import org.slf4j.Logger;
1010
import org.slf4j.LoggerFactory;
1111

12+
import java.time.Instant;
1213
import java.util.List;
14+
import java.util.concurrent.atomic.AtomicReference;
1315

1416
public final class RntbdResponseDecoder extends ByteToMessageDecoder {
1517

1618
private static final Logger logger = LoggerFactory.getLogger(RntbdResponseDecoder.class);
19+
private static final AtomicReference<Instant> decodeStartTime = new AtomicReference<>();
1720

1821
/**
1922
* Deserialize from an input {@link ByteBuf} to an {@link RntbdResponse} instance.
@@ -27,11 +30,16 @@ public final class RntbdResponseDecoder extends ByteToMessageDecoder {
2730
@Override
2831
protected void decode(final ChannelHandlerContext context, final ByteBuf in, final List<Object> out) {
2932

33+
decodeStartTime.compareAndSet(null, Instant.now());
34+
3035
if (RntbdFramer.canDecodeHead(in)) {
3136

3237
final RntbdResponse response = RntbdResponse.decode(in);
3338

3439
if (response != null) {
40+
response.setDecodeEndTime(Instant.now());
41+
response.setDecodeStartTime(decodeStartTime.getAndSet(null));
42+
3543
logger.debug("{} DECODE COMPLETE: {}", context.channel(), response);
3644
in.discardReadBytes();
3745
out.add(response.retain());

sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosDiagnosticsTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1051,6 +1051,7 @@ private void validateTransportRequestTimelineDirect(String diagnostics) {
10511051
assertThat(diagnostics).contains("\"eventName\":\"channelAcquisitionStarted\"");
10521052
assertThat(diagnostics).contains("\"eventName\":\"pipelined\"");
10531053
assertThat(diagnostics).contains("\"eventName\":\"transitTime\"");
1054+
assertThat(diagnostics).contains("\"eventName\":\"decodeTime");
10541055
assertThat(diagnostics).contains("\"eventName\":\"received\"");
10551056
assertThat(diagnostics).contains("\"eventName\":\"completed\"");
10561057
assertThat(diagnostics).contains("\"startTimeUTC\"");

0 commit comments

Comments
 (0)