Skip to content

Commit 746ad0a

Browse files
Improved direct transport configuration in Spark to reduce number of transient failures under very high CPU/memory pressure on Spark Executors (Azure#27021)
* Release for Cosmos spark connector 4.6.1 * Fixing regression in TransientIOErrorsRetryingIterator possibly resulting in returning incomplete query results * Fixing build issue * Handling empty pages correctly * Prettifying code * Improve RNTBD config in Spark to reduce transient failures under high CPU/memory pressure * Ignoring StructuredStreamingTests and fixing changelog * Forcing to switch off of netty I/O thread immediately
1 parent 8fbbe16 commit 746ad0a

File tree

10 files changed

+76
-11
lines changed

10 files changed

+76
-11
lines changed

sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
## Release History
2-
### 4.6.1 (2022-02-10)
2+
### 4.6.1 (2022-02-11)
33
#### Key Bug Fixes
44
* Fixed a regression introduced in 4.5.0 that could result in returning incomplete query results when Executors are under high CPU load. - See [PR 26991](https://github.com/Azure/azure-sdk-for-java/pull/26991)
55

66
#### New Features
77
* Added support for reading from a Cosmos table without schema inference and maintaining system properties `_ts` and `_etag` when writing this data to another Cosmos container. This is helpful when moving data from one container to another without always consistent schema of the documents in the source container. - See [PR 26820](https://github.com/Azure/azure-sdk-for-java/pull/26820)
88
* Added support for new `spark.cosmos.write.strategy` value `ItemOverwriteIfNotModified`, which will allow only updating documents that haven't been modified since reading them (optimistic concurrency). - See [PR 26847](https://github.com/Azure/azure-sdk-for-java/pull/26847)
99
* Added support for correlating queries executed via the Cosmos Spark connector with service-telemetry based on the `correlationActivityId`. - See [PR 26908](https://github.com/Azure/azure-sdk-for-java/pull/26908)
10+
* Improved direct transport configuration in Spark to reduce number of transient failures under very high CPU/memory pressure on Spark Executors. - See [PR 27021](https://github.com/Azure/azure-sdk-for-java/pull/27021)
1011

1112
### 4.6.0 (2022-01-25)
1213
#### Key Bug Fixes

sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
## Release History
2-
### 4.6.1 (2022-02-10)
2+
### 4.6.1 (2022-02-11)
33
#### Key Bug Fixes
44
* Fixed a regression introduced in 4.5.0 that could result in returning incomplete query results when Executors are under high CPU load. - See [PR 26991](https://github.com/Azure/azure-sdk-for-java/pull/26991)
5+
* Improved direct transport configuration in Spark to reduce number of transient failures under very high CPU/memory pressure on Spark Executors. - See [PR 27021](https://github.com/Azure/azure-sdk-for-java/pull/27021)
56

67
#### New Features
78
* Added support for reading from a Cosmos table without schema inference and maintaining system properties `_ts` and `_etag` when writing this data to another Cosmos container. This is helpful when moving data from one container to another without always consistent schema of the documents in the source container. - See [PR 26820](https://github.com/Azure/azure-sdk-for-java/pull/26820)

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/implementation/SparkBridgeImplementationInternal.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,18 @@ private[cosmos] object SparkBridgeImplementationInternal {
172172
.setIoThreadCountPerCoreFactor(config, ioThreadCountPerCoreFactor)
173173
}
174174

175+
def setIoThreadPriority
176+
(
177+
config: DirectConnectionConfig,
178+
ioThreadPriority: Int
179+
): DirectConnectionConfig = {
180+
181+
ImplementationBridgeHelpers
182+
.DirectConnectionConfigHelper
183+
.getDirectConnectionConfigAccessor
184+
.setIoThreadPriority(config, ioThreadPriority)
185+
}
186+
175187
def setUserAgentWithSnapshotInsteadOfBeta() = {
176188
HttpConstants.Versions.useSnapshotInsteadOfBeta();
177189
}

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosClientCache.scala

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
2323
private[spark] object CosmosClientCache extends BasicLoggingTrait {
2424

2525
SparkBridgeImplementationInternal.setUserAgentWithSnapshotInsteadOfBeta()
26+
System.setProperty("COSMOS.SWITCH_OFF_IO_THREAD_FOR_RESPONSE", "true")
2627

2728
// removing clients from the cache after 15 minutes
2829
// The clients won't be disposed - so any still running task can still keep using it
@@ -114,17 +115,26 @@ private[spark] object CosmosClientCache extends BasicLoggingTrait {
114115
if (cosmosClientConfiguration.useGatewayMode){
115116
builder = builder.gatewayMode()
116117
} else {
117-
val directConfig = new DirectConnectionConfig()
118+
var directConfig = new DirectConnectionConfig()
118119
.setConnectTimeout(Duration.ofSeconds(CosmosConstants.defaultDirectRequestTimeoutInSeconds))
119120
.setNetworkRequestTimeout(Duration.ofSeconds(CosmosConstants.defaultDirectRequestTimeoutInSeconds))
120121

121-
builder = builder.directMode(
122+
directConfig =
122123
// Duplicate the default number of I/O threads per core
123124
// We know that Spark often works with large payloads and we have seen
124125
// indicators that the default number of I/O threads can be too low
125126
// for workloads with large payloads
126127
SparkBridgeImplementationInternal
127-
.setIoThreadCountPerCoreFactor(directConfig, CosmosConstants.defaultIoThreadCountFactorPerCore))
128+
.setIoThreadCountPerCoreFactor(directConfig, CosmosConstants.defaultIoThreadCountFactorPerCore)
129+
130+
directConfig =
131+
// Spark workloads often result in very high CPU load
132+
// We have seen indicators that increasing Thread priority for I/O threads
133+
// can reduce transient I/O errors/timeouts in this case
134+
SparkBridgeImplementationInternal
135+
.setIoThreadPriority(directConfig, Thread.MAX_PRIORITY)
136+
137+
builder = builder.directMode(directConfig)
128138
}
129139

130140
if (cosmosClientConfiguration.preferredRegionsList.isDefined) {

sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EStructuredStreamingITest.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class SparkE2EStructuredStreamingITest
3737
//scalastyle:off multiple.string.literals
3838
//scalastyle:off magic.number
3939
"spark change feed micro batch (incremental)" can
40-
"be used to copy data to another container" taggedAs(Retryable) in {
40+
"be used to copy data to another container" taggedAs(Retryable) ignore {
4141

4242
val processedRecordCount = new AtomicLong()
4343
var spark = this.createSparkSession(processedRecordCount)
@@ -177,7 +177,7 @@ class SparkE2EStructuredStreamingITest
177177
//scalastyle:off multiple.string.literals
178178
//scalastyle:off magic.number
179179
"spark change feed micro batch (incremental)" can
180-
"be used to copy data to another container capturing origin TS and etag" taggedAs(Retryable) in {
180+
"be used to copy data to another container capturing origin TS and etag" taggedAs(Retryable) ignore {
181181

182182
val processedRecordCount = new AtomicLong()
183183
var spark = this.createSparkSession(processedRecordCount)
@@ -327,7 +327,7 @@ class SparkE2EStructuredStreamingITest
327327
}
328328

329329
"spark change feed micro batch (incremental)" can
330-
"be used to copy data to another container with limit" taggedAs(Retryable) in {
330+
"be used to copy data to another container with limit" taggedAs(Retryable) ignore {
331331

332332
val processedRecordCount = new AtomicLong()
333333
var spark = this.createSparkSession(processedRecordCount)

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/DirectConnectionConfig.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public final class DirectConnectionConfig {
2828
private static final int DEFAULT_MAX_CONNECTIONS_PER_ENDPOINT = 130;
2929
private static final int DEFAULT_MAX_REQUESTS_PER_CONNECTION = 30;
3030
private static final int DEFAULT_IO_THREAD_COUNT_PER_CORE_FACTOR = 2;
31+
private static final int DEFAULT_IO_THREAD_PRIORITY = Thread.NORM_PRIORITY;
3132

3233
private boolean connectionEndpointRediscoveryEnabled;
3334
private Duration connectTimeout;
@@ -37,6 +38,7 @@ public final class DirectConnectionConfig {
3738
private int maxConnectionsPerEndpoint;
3839
private int maxRequestsPerConnection;
3940
private int ioThreadCountPerCoreFactor;
41+
private int ioThreadPriority;
4042

4143
/**
4244
* Constructor
@@ -50,6 +52,7 @@ public DirectConnectionConfig() {
5052
this.maxRequestsPerConnection = DEFAULT_MAX_REQUESTS_PER_CONNECTION;
5153
this.networkRequestTimeout = DEFAULT_NETWORK_REQUEST_TIMEOUT;
5254
this.ioThreadCountPerCoreFactor = DEFAULT_IO_THREAD_COUNT_PER_CORE_FACTOR;
55+
this.ioThreadPriority = DEFAULT_IO_THREAD_PRIORITY;
5356
}
5457

5558
/**
@@ -286,6 +289,15 @@ DirectConnectionConfig setIoThreadCountPerCoreFactor(int ioThreadCountPerCoreFac
286289
return this;
287290
}
288291

292+
int getIoThreadPriority() {
293+
return ioThreadPriority;
294+
}
295+
296+
DirectConnectionConfig setIoThreadPriority(int ioThreadPriority) {
297+
this.ioThreadPriority = ioThreadPriority;
298+
return this;
299+
}
300+
289301
@Override
290302
public String toString() {
291303
return "DirectConnectionConfig{" +
@@ -296,6 +308,7 @@ public String toString() {
296308
", maxRequestsPerConnection=" + maxRequestsPerConnection +
297309
", networkRequestTimeout=" + networkRequestTimeout +
298310
", ioThreadCountPerCoreFactor=" + ioThreadCountPerCoreFactor +
311+
", ioThreadPriority=" + ioThreadPriority +
299312
'}';
300313
}
301314

@@ -316,6 +329,17 @@ public DirectConnectionConfig setIoThreadCountPerCoreFactor(DirectConnectionConf
316329
int ioThreadCountPerCoreFactor) {
317330
return config.setIoThreadCountPerCoreFactor(ioThreadCountPerCoreFactor);
318331
}
332+
333+
@Override
334+
public int getIoThreadPriority(DirectConnectionConfig config) {
335+
return config.getIoThreadPriority();
336+
}
337+
338+
@Override
339+
public DirectConnectionConfig setIoThreadPriority(DirectConnectionConfig config,
340+
int ioThreadPriority) {
341+
return config.setIoThreadPriority(ioThreadPriority);
342+
}
319343
});
320344
}
321345
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ConnectionPolicy.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public final class ConnectionPolicy {
4545
private Duration tcpNetworkRequestTimeout;
4646
private boolean tcpConnectionEndpointRediscoveryEnabled;
4747
private int ioThreadCountPerCoreFactor;
48-
48+
private int ioThreadPriority;
4949

5050
private boolean clientTelemetryEnabled;
5151

@@ -74,6 +74,10 @@ public ConnectionPolicy(DirectConnectionConfig directConnectionConfig) {
7474
.DirectConnectionConfigHelper
7575
.getDirectConnectionConfigAccessor()
7676
.getIoThreadCountPerCoreFactor(directConnectionConfig);
77+
this.ioThreadPriority = ImplementationBridgeHelpers
78+
.DirectConnectionConfigHelper
79+
.getDirectConnectionConfigAccessor()
80+
.getIoThreadPriority(directConnectionConfig);
7781
}
7882

7983
private ConnectionPolicy(ConnectionMode connectionMode) {
@@ -85,6 +89,7 @@ private ConnectionPolicy(ConnectionMode connectionMode) {
8589
this.readRequestsFallbackEnabled = true;
8690
this.throttlingRetryOptions = new ThrottlingRetryOptions();
8791
this.userAgentSuffix = "";
92+
this.ioThreadPriority = Thread.NORM_PRIORITY;
8893
}
8994

9095
/**
@@ -547,11 +552,18 @@ public void setClientTelemetryEnabled(boolean clientTelemetryEnabled) {
547552

548553
public int getIoThreadCountPerCoreFactor() { return this.ioThreadCountPerCoreFactor; }
549554

555+
public int getIoThreadPriority() { return this.ioThreadPriority; }
556+
550557
public ConnectionPolicy setIoThreadCountPerCoreFactor(int ioThreadCountPerCoreFactor) {
551558
this.ioThreadCountPerCoreFactor = ioThreadCountPerCoreFactor;
552559
return this;
553560
}
554561

562+
public ConnectionPolicy setIoThreadPriority(int ioThreadPriority) {
563+
this.ioThreadPriority = ioThreadPriority;
564+
return this;
565+
}
566+
555567
@Override
556568
public String toString() {
557569
return "ConnectionPolicy{" +

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,9 @@ public interface DirectConnectionConfigAccessor {
148148
int getIoThreadCountPerCoreFactor(DirectConnectionConfig config);
149149
DirectConnectionConfig setIoThreadCountPerCoreFactor(
150150
DirectConnectionConfig config, int ioThreadCountPerCoreFactor);
151+
int getIoThreadPriority(DirectConnectionConfig config);
152+
DirectConnectionConfig setIoThreadPriority(
153+
DirectConnectionConfig config, int ioThreadPriority);
151154
}
152155
}
153156

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,7 @@ private Options(final ConnectionPolicy connectionPolicy) {
507507
Runtime.getRuntime().availableProcessors();
508508
this.userAgent = new UserAgentContainer();
509509
this.channelAcquisitionContextEnabled = false;
510-
this.ioThreadPriority = Thread.NORM_PRIORITY;
510+
this.ioThreadPriority = connectionPolicy.getIoThreadPriority();
511511
this.tcpKeepIntvl = 1; // Configuration for EpollChannelOption.TCP_KEEPINTVL
512512
this.tcpKeepIdle = 30; // Configuration for EpollChannelOption.TCP_KEEPIDLE
513513
this.preferTcpNative = true;

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ public final class RntbdClientChannelHealthChecker implements ChannelHealthCheck
3434
// Guidance: The grace period should be large enough to accommodate the round trip time of the slowest server
3535
// request. Assuming 1s of network RTT, a 2 MB request, a 2 MB response, a connection that can sustain 1 MB/s
3636
// both ways, and a 5-second deadline at the server, 10 seconds should be enough.
37-
private static final long readHangGracePeriodInNanos = 10L * 1_000_000_000L;
37+
// Adding an additional 45 seconds grace period because of relatively high number of
38+
// false negatives here under high CPU load (in Spark for example)
39+
private static final long readHangGracePeriodInNanos = (45L + 10L) * 1_000_000_000L;
3840

3941
// A channel will not be declared unhealthy if a write was attempted recently. As such gaps between
4042
// Timestamps.lastChannelWriteAttempt and Timestamps.lastChannelWrite lower than this value are ignored.

0 commit comments

Comments
 (0)