Skip to content

Commit 325a776

Browse files
vbabaninkatcharovrozzanhachicha
authored
CSOT refactoring for consistent timeout handling (#1781)
This change improves the predictability, consistency, and maintainability of Client-Side Operation Timeout (CSOT) handling. All methods affected by timeouts now take an explicit OperationContext parameter instead of relying on mutable stored fields. Key updates: - Decouple OperationContext from Binding and ConnectionSource to eliminate hidden coupling and side effects. - Make OperationContext and TimeoutContext immutable. - Extract core cursor logic into CoreCursor, enabling batch and change stream cursors (both sync and async) to implement independent timeout policies over the shared core cursor logic. - Fix TimeoutContext reuse in ClientSession by creating a new TimeoutContext per operation to prevent cross-operation interference. JAVA-5640 JAVA-5644 --------- Co-authored-by: Maxim Katcharov <[email protected]> Co-authored-by: Ross Lawley <[email protected]> Co-authored-by: Nabil Hachicha <[email protected]>
1 parent 670252a commit 325a776

File tree

152 files changed

+4038
-3027
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

152 files changed

+4038
-3027
lines changed

driver-core/src/main/com/mongodb/internal/TimeoutContext.java

Lines changed: 202 additions & 173 deletions
Large diffs are not rendered by default.

driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
*<p>This class is not part of the public API and may be removed or changed at any time</p>
3030
*/
3131
public interface SingleResultCallback<T> {
32+
SingleResultCallback<Void> THEN_DO_NOTHING = (r, t) -> {};
33+
3234
/**
3335
* Called when the function completes. This method must not complete abruptly, see {@link AsyncCallbackFunction} for more details.
3436
*
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.mongodb.internal.async.function;
17+
18+
import com.mongodb.internal.async.SingleResultCallback;
19+
import com.mongodb.lang.Nullable;
20+
21+
/**
22+
* An {@linkplain AsyncCallbackFunction asynchronous callback-based function} of three parameters.
23+
* This class is a callback-based.
24+
*
25+
* <p>This class is not part of the public API and may be removed or changed at any time</p>
26+
*
27+
* @param <P1> The type of the first parameter to the function.
28+
* @param <P2> The type of the second parameter to the function.
29+
* @param <P3> The type of the third parameter to the function.
30+
* @param <R> See {@link AsyncCallbackFunction}
31+
* @see AsyncCallbackFunction
32+
*/
33+
@FunctionalInterface
34+
public interface AsyncCallbackTriFunction<P1, P2, P3, R> {
35+
/**
36+
* @param p1 The first {@code @}{@link Nullable} argument of the asynchronous function.
37+
* @param p2 The second {@code @}{@link Nullable} argument of the asynchronous function.
38+
* @param p3 The second {@code @}{@link Nullable} argument of the asynchronous function.
39+
* @see AsyncCallbackFunction#apply(Object, SingleResultCallback)
40+
*/
41+
void apply(P1 p1, P2 p2, P3 p3, SingleResultCallback<R> callback);
42+
}

driver-core/src/main/com/mongodb/internal/async/function/RetryState.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
@NotThreadSafe
4848
public final class RetryState {
4949
public static final int RETRIES = 1;
50-
private static final int INFINITE_ATTEMPTS = Integer.MAX_VALUE;
50+
public static final int INFINITE_ATTEMPTS = Integer.MAX_VALUE;
5151

5252
private final LoopState loopState;
5353
private final int attempts;
@@ -67,19 +67,16 @@ public final class RetryState {
6767
* </p>
6868
*
6969
* @param retries A positive number of allowed retries. {@link Integer#MAX_VALUE} is a special value interpreted as being unlimited.
70-
* @param timeoutContext A timeout context that will be used to determine if the operation has timed out.
70+
* @param retryUntilTimeoutThrowsException If {@code true}, then if a {@link MongoOperationTimeoutException} is throws then retrying stops.
7171
* @see #attempts()
7272
*/
73-
public static RetryState withRetryableState(final int retries, final TimeoutContext timeoutContext) {
73+
public static RetryState withRetryableState(final int retries, final boolean retryUntilTimeoutThrowsException) {
7474
assertTrue(retries > 0);
75-
if (timeoutContext.hasTimeoutMS()){
76-
return new RetryState(INFINITE_ATTEMPTS, timeoutContext);
77-
}
78-
return new RetryState(retries, null);
75+
return new RetryState(retries, retryUntilTimeoutThrowsException);
7976
}
8077

8178
public static RetryState withNonRetryableState() {
82-
return new RetryState(0, null);
79+
return new RetryState(0, false);
8380
}
8481

8582
/**
@@ -94,19 +91,19 @@ public static RetryState withNonRetryableState() {
9491
* @see #attempts()
9592
*/
9693
public RetryState(final TimeoutContext timeoutContext) {
97-
this(INFINITE_ATTEMPTS, timeoutContext);
94+
this(INFINITE_ATTEMPTS, timeoutContext.hasTimeoutMS());
9895
}
9996

10097
/**
10198
* @param retries A non-negative number of allowed retries. {@link Integer#MAX_VALUE} is a special value interpreted as being unlimited.
102-
* @param timeoutContext A timeout context that will be used to determine if the operation has timed out.
99+
* @param retryUntilTimeoutThrowsException
103100
* @see #attempts()
104101
*/
105-
private RetryState(final int retries, @Nullable final TimeoutContext timeoutContext) {
102+
private RetryState(final int retries, final boolean retryUntilTimeoutThrowsException) {
106103
assertTrue(retries >= 0);
107104
loopState = new LoopState();
108105
attempts = retries == INFINITE_ATTEMPTS ? INFINITE_ATTEMPTS : retries + 1;
109-
this.retryUntilTimeoutThrowsException = timeoutContext != null && timeoutContext.hasTimeoutMS();
106+
this.retryUntilTimeoutThrowsException = retryUntilTimeoutThrowsException;
110107
}
111108

112109
/**
@@ -400,7 +397,7 @@ public int attempt() {
400397
* <ul>
401398
* <li>0 if the number of retries is {@linkplain #RetryState(TimeoutContext) unlimited};</li>
402399
* <li>1 if no retries are allowed;</li>
403-
* <li>{@link #RetryState(int, TimeoutContext) retries} + 1 otherwise.</li>
400+
* <li>{@link #RetryState(int, boolean) retries} + 1 otherwise.</li>
404401
* </ul>
405402
*
406403
* @see #attempt()

driver-core/src/main/com/mongodb/internal/binding/AsyncClusterAwareReadWriteBinding.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.mongodb.ServerAddress;
2020
import com.mongodb.internal.async.SingleResultCallback;
21+
import com.mongodb.internal.connection.OperationContext;
2122

2223
/**
2324
* <p>This class is not part of the public API and may be removed or changed at any time</p>
@@ -28,9 +29,10 @@ public interface AsyncClusterAwareReadWriteBinding extends AsyncReadWriteBinding
2829
* Returns a connection source to the specified server
2930
*
3031
* @param serverAddress the server address
32+
* @param operationContext the operation context to use
3133
* @param callback the to be passed the connection source
3234
*/
33-
void getConnectionSource(ServerAddress serverAddress, SingleResultCallback<AsyncConnectionSource> callback);
35+
void getConnectionSource(ServerAddress serverAddress, OperationContext operationContext, SingleResultCallback<AsyncConnectionSource> callback);
3436

3537
@Override
3638
AsyncClusterAwareReadWriteBinding retain();

driver-core/src/main/com/mongodb/internal/binding/AsyncClusterBinding.java

Lines changed: 18 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.mongodb.internal.binding;
1818

19-
import com.mongodb.ReadConcern;
2019
import com.mongodb.ReadPreference;
2120
import com.mongodb.ServerAddress;
2221
import com.mongodb.connection.ClusterConnectionMode;
@@ -33,7 +32,6 @@
3332
import com.mongodb.selector.ServerSelector;
3433

3534
import static com.mongodb.assertions.Assertions.notNull;
36-
import static java.util.concurrent.TimeUnit.NANOSECONDS;
3735

3836
/**
3937
* A simple ReadWriteBinding implementation that supplies write connection sources bound to a possibly different primary each time, and a
@@ -44,24 +42,17 @@
4442
public class AsyncClusterBinding extends AbstractReferenceCounted implements AsyncClusterAwareReadWriteBinding {
4543
private final Cluster cluster;
4644
private final ReadPreference readPreference;
47-
private final ReadConcern readConcern;
48-
private final OperationContext operationContext;
4945

5046
/**
5147
* Creates an instance.
5248
*
5349
* @param cluster a non-null Cluster which will be used to select a server to bind to
5450
* @param readPreference a non-null ReadPreference for read operations
55-
* @param readConcern a non-null read concern
56-
* @param operationContext the operation context
5751
* <p>This class is not part of the public API and may be removed or changed at any time</p>
5852
*/
59-
public AsyncClusterBinding(final Cluster cluster, final ReadPreference readPreference, final ReadConcern readConcern,
60-
final OperationContext operationContext) {
53+
public AsyncClusterBinding(final Cluster cluster, final ReadPreference readPreference) {
6154
this.cluster = notNull("cluster", cluster);
6255
this.readPreference = notNull("readPreference", readPreference);
63-
this.readConcern = notNull("readConcern", readConcern);
64-
this.operationContext = notNull("operationContext", operationContext);
6556
}
6657

6758
@Override
@@ -76,21 +67,18 @@ public ReadPreference getReadPreference() {
7667
}
7768

7869
@Override
79-
public OperationContext getOperationContext() {
80-
return operationContext;
81-
}
82-
83-
@Override
84-
public void getReadConnectionSource(final SingleResultCallback<AsyncConnectionSource> callback) {
85-
getAsyncClusterBindingConnectionSource(new ReadPreferenceServerSelector(readPreference), callback);
70+
public void getReadConnectionSource(final OperationContext operationContext,
71+
final SingleResultCallback<AsyncConnectionSource> callback) {
72+
getAsyncClusterBindingConnectionSource(new ReadPreferenceServerSelector(readPreference), operationContext, callback);
8673
}
8774

8875
@Override
8976
public void getReadConnectionSource(final int minWireVersion, final ReadPreference fallbackReadPreference,
77+
final OperationContext operationContext,
9078
final SingleResultCallback<AsyncConnectionSource> callback) {
9179
// Assume 5.0+ for load-balanced mode
9280
if (cluster.getSettings().getMode() == ClusterConnectionMode.LOAD_BALANCED) {
93-
getReadConnectionSource(callback);
81+
getReadConnectionSource(operationContext, callback);
9482
} else {
9583
ReadPreferenceWithFallbackServerSelector readPreferenceWithFallbackServerSelector
9684
= new ReadPreferenceWithFallbackServerSelector(readPreference, minWireVersion, fallbackReadPreference);
@@ -106,16 +94,19 @@ public void getReadConnectionSource(final int minWireVersion, final ReadPreferen
10694
}
10795

10896
@Override
109-
public void getWriteConnectionSource(final SingleResultCallback<AsyncConnectionSource> callback) {
110-
getAsyncClusterBindingConnectionSource(new WritableServerSelector(), callback);
97+
public void getWriteConnectionSource(final OperationContext operationContext,
98+
final SingleResultCallback<AsyncConnectionSource> callback) {
99+
getAsyncClusterBindingConnectionSource(new WritableServerSelector(), operationContext, callback);
111100
}
112101

113102
@Override
114-
public void getConnectionSource(final ServerAddress serverAddress, final SingleResultCallback<AsyncConnectionSource> callback) {
115-
getAsyncClusterBindingConnectionSource(new ServerAddressSelector(serverAddress), callback);
103+
public void getConnectionSource(final ServerAddress serverAddress, final OperationContext operationContext,
104+
final SingleResultCallback<AsyncConnectionSource> callback) {
105+
getAsyncClusterBindingConnectionSource(new ServerAddressSelector(serverAddress), operationContext, callback);
116106
}
117107

118108
private void getAsyncClusterBindingConnectionSource(final ServerSelector serverSelector,
109+
final OperationContext operationContext,
119110
final SingleResultCallback<AsyncConnectionSource> callback) {
120111
cluster.selectServerAsync(serverSelector, operationContext, (result, t) -> {
121112
if (t != null) {
@@ -132,12 +123,12 @@ private final class AsyncClusterBindingConnectionSource extends AbstractReferenc
132123
private final ServerDescription serverDescription;
133124
private final ReadPreference appliedReadPreference;
134125

135-
private AsyncClusterBindingConnectionSource(final Server server, final ServerDescription serverDescription,
136-
final ReadPreference appliedReadPreference) {
126+
private AsyncClusterBindingConnectionSource(final Server server,
127+
final ServerDescription serverDescription,
128+
final ReadPreference appliedReadPreference) {
137129
this.server = server;
138130
this.serverDescription = serverDescription;
139131
this.appliedReadPreference = appliedReadPreference;
140-
operationContext.getTimeoutContext().minRoundTripTimeMS(NANOSECONDS.toMillis(serverDescription.getMinRoundTripTimeNanos()));
141132
AsyncClusterBinding.this.retain();
142133
}
143134

@@ -146,19 +137,14 @@ public ServerDescription getServerDescription() {
146137
return serverDescription;
147138
}
148139

149-
@Override
150-
public OperationContext getOperationContext() {
151-
return operationContext;
152-
}
153-
154140
@Override
155141
public ReadPreference getReadPreference() {
156142
return appliedReadPreference;
157143
}
158144

159145
@Override
160-
public void getConnection(final SingleResultCallback<AsyncConnection> callback) {
161-
server.getConnectionAsync(operationContext, callback);
146+
public void getConnection(final OperationContext operationContext, final SingleResultCallback<AsyncConnection> callback) {
147+
server.getConnectionAsync(operationContext.withConnectionEstablishmentSessionContext(), callback);
162148
}
163149

164150
public AsyncConnectionSource retain() {

driver-core/src/main/com/mongodb/internal/binding/AsyncConnectionSource.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@
2020
import com.mongodb.connection.ServerDescription;
2121
import com.mongodb.internal.async.SingleResultCallback;
2222
import com.mongodb.internal.connection.AsyncConnection;
23+
import com.mongodb.internal.connection.OperationContext;
2324

2425
/**
2526
* A source of connections to a single MongoDB server.
2627
*
2728
* <p>This class is not part of the public API and may be removed or changed at any time</p>
2829
*/
29-
public interface AsyncConnectionSource extends BindingContext, ReferenceCounted {
30+
public interface AsyncConnectionSource extends ReferenceCounted {
3031

3132
/**
3233
* Gets the current description of this source.
@@ -38,16 +39,17 @@ public interface AsyncConnectionSource extends BindingContext, ReferenceCounted
3839
/**
3940
* Gets the read preference that was applied when selecting this source.
4041
*
41-
* @see AsyncReadBinding#getReadConnectionSource(int, ReadPreference, SingleResultCallback)
42+
* @see AsyncReadBinding#getReadConnectionSource(int, ReadPreference, OperationContext, SingleResultCallback)
4243
*/
4344
ReadPreference getReadPreference();
4445

4546
/**
4647
* Gets a connection from this source.
4748
*
49+
* @param operationContext the operation context to use
4850
* @param callback the to be passed the connection
4951
*/
50-
void getConnection(SingleResultCallback<AsyncConnection> callback);
52+
void getConnection(OperationContext operationContext, SingleResultCallback<AsyncConnection> callback);
5153

5254
@Override
5355
AsyncConnectionSource retain();

driver-core/src/main/com/mongodb/internal/binding/AsyncReadBinding.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@
1818

1919
import com.mongodb.ReadPreference;
2020
import com.mongodb.internal.async.SingleResultCallback;
21+
import com.mongodb.internal.connection.OperationContext;
2122

2223
/**
2324
* An asynchronous factory of connection sources to servers that can be read from and that satisfy the specified read preference.
2425
*
2526
* <p>This class is not part of the public API and may be removed or changed at any time</p>
2627
*/
27-
public interface AsyncReadBinding extends BindingContext, ReferenceCounted {
28+
public interface AsyncReadBinding extends ReferenceCounted {
2829
/**
2930
* The read preference that all connection sources returned by this instance will satisfy.
3031
* @return the non-null read preference
@@ -33,9 +34,10 @@ public interface AsyncReadBinding extends BindingContext, ReferenceCounted {
3334

3435
/**
3536
* Returns a connection source to a server that satisfies the read preference with which this instance is configured.
37+
* @param operationContext the operation context to use
3638
* @param callback the to be passed the connection source
3739
*/
38-
void getReadConnectionSource(SingleResultCallback<AsyncConnectionSource> callback);
40+
void getReadConnectionSource(OperationContext operationContext, SingleResultCallback<AsyncConnectionSource> callback);
3941

4042
/**
4143
* Return a connection source that satisfies the read preference with which this instance is configured, if all connected servers have
@@ -48,6 +50,7 @@ public interface AsyncReadBinding extends BindingContext, ReferenceCounted {
4850
* @see com.mongodb.internal.operation.AggregateToCollectionOperation
4951
*/
5052
void getReadConnectionSource(int minWireVersion, ReadPreference fallbackReadPreference,
53+
OperationContext operationContext,
5154
SingleResultCallback<AsyncConnectionSource> callback);
5255

5356
@Override

driver-core/src/main/com/mongodb/internal/binding/AsyncWriteBinding.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,22 @@
1717
package com.mongodb.internal.binding;
1818

1919
import com.mongodb.internal.async.SingleResultCallback;
20+
import com.mongodb.internal.connection.OperationContext;
2021

2122
/**
2223
* An asynchronous factory of connection sources to servers that can be written to, e.g, a standalone, a mongos, or a replica set primary.
2324
*
2425
* <p>This class is not part of the public API and may be removed or changed at any time</p>
2526
*/
26-
public interface AsyncWriteBinding extends BindingContext, ReferenceCounted {
27+
public interface AsyncWriteBinding extends ReferenceCounted {
2728

2829
/**
2930
* Supply a connection source to a server that can be written to
3031
*
32+
* @param operationContext the operation context to use
3133
* @param callback the to be passed the connection source
3234
*/
33-
void getWriteConnectionSource(SingleResultCallback<AsyncConnectionSource> callback);
35+
void getWriteConnectionSource(OperationContext operationContext, SingleResultCallback<AsyncConnectionSource> callback);
3436

3537
@Override
3638
AsyncWriteBinding retain();

driver-core/src/main/com/mongodb/internal/binding/BindingContext.java

Lines changed: 0 additions & 33 deletions
This file was deleted.

0 commit comments

Comments
 (0)