Skip to content

Commit 4cd6dd2

Browse files
committed
[#2595] Add configurable 'RPC_RETRY_BACKOFF_MS' option into ShuffleServerGrpcClient
1 parent e3fee62 commit 4cd6dd2

File tree

1 file changed

+190
-42
lines changed

1 file changed

+190
-42
lines changed

internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java

Lines changed: 190 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ public ShuffleServerGrpcClient(String host, int port) {
163163
RssClientConf.RPC_MAX_ATTEMPTS.defaultValue(),
164164
RssClientConf.RPC_TIMEOUT_MS.defaultValue(),
165165
true,
166+
RssClientConf.RPC_RETRY_BACKOFF_MS.defaultValue(),
166167
0,
167168
0,
168169
0,
@@ -185,6 +186,9 @@ public ShuffleServerGrpcClient(RssConf rssConf, String host, int port) {
185186
? RssClientConf.RPC_TIMEOUT_MS.defaultValue()
186187
: rssConf.getLong(RssClientConf.RPC_TIMEOUT_MS),
187188
true,
189+
rssConf == null
190+
? RssClientConf.RPC_RETRY_BACKOFF_MS.defaultValue()
191+
: rssConf.getLong(RssClientConf.RPC_RETRY_BACKOFF_MS),
188192
0,
189193
0,
190194
0,
@@ -197,6 +201,7 @@ public ShuffleServerGrpcClient(
197201
int maxRetryAttempts,
198202
long rpcTimeoutMs,
199203
boolean usePlaintext,
204+
long rpcRetryBackoffMs,
200205
int pageSize,
201206
int maxOrder,
202207
int smallCacheSize,
@@ -206,6 +211,7 @@ public ShuffleServerGrpcClient(
206211
port,
207212
maxRetryAttempts,
208213
usePlaintext,
214+
rpcRetryBackoffMs,
209215
pageSize,
210216
maxOrder,
211217
smallCacheSize,
@@ -257,30 +263,36 @@ private ShuffleRegisterResponse doRegisterShuffle(
257263
private ShuffleCommitResponse doSendCommit(String appId, int shuffleId) {
258264
ShuffleCommitRequest request =
259265
ShuffleCommitRequest.newBuilder().setAppId(appId).setShuffleId(shuffleId).build();
260-
int retryNum = 0;
261-
while (retryNum <= maxRetryAttempts) {
262-
try {
263-
ShuffleCommitResponse response = getBlockingStub().commitShuffleTask(request);
264-
return response;
265-
} catch (Exception e) {
266-
retryNum++;
267-
LOG.warn(
268-
"Send commit to host["
269-
+ host
270-
+ "], port["
271-
+ port
272-
+ "] failed, try again, retryNum["
273-
+ retryNum
274-
+ "]",
275-
e);
276-
}
266+
try {
267+
return RetryUtils.retryWithCondition(
268+
() -> getBlockingStub().commitShuffleTask(request),
269+
null,
270+
rpcRetryBackoffMs,
271+
maxRetryAttempts,
272+
e -> e instanceof Exception);
273+
} catch (Throwable e) {
274+
String msg = "Send commit to host[" + host + "], port[" + port + "] failed";
275+
LOG.warn(msg, e);
276+
throw new RssException(msg, e);
277277
}
278-
throw new RssException("Send commit to host[" + host + "], port[" + port + "] failed");
279278
}
280279

281280
private AppHeartBeatResponse doSendHeartBeat(String appId, long timeout) {
282281
AppHeartBeatRequest request = AppHeartBeatRequest.newBuilder().setAppId(appId).build();
283282
return blockingStub.withDeadlineAfter(timeout, TimeUnit.MILLISECONDS).appHeartbeat(request);
283+
try {
284+
return RetryUtils.retryWithCondition(
285+
() ->
286+
blockingStub.withDeadlineAfter(timeout, TimeUnit.MILLISECONDS).appHeartbeat(request),
287+
null,
288+
rpcRetryBackoffMs,
289+
maxRetryAttempts,
290+
e -> e instanceof Exception);
291+
} catch (Throwable e) {
292+
String msg = "Send heartbeat to host[" + host + "], port[" + port + "] failed";
293+
LOG.warn(msg, e);
294+
throw new RssException(msg, e);
295+
}
284296
}
285297

286298
// Only for tests
@@ -449,8 +461,21 @@ private RssProtos.ShuffleUnregisterByAppIdResponse doUnregisterShuffleByAppId(
449461
@Override
450462
public RssUnregisterShuffleByAppIdResponse unregisterShuffleByAppId(
451463
RssUnregisterShuffleByAppIdRequest request) {
452-
RssProtos.ShuffleUnregisterByAppIdResponse rpcResponse =
453-
doUnregisterShuffleByAppId(request.getAppId(), request.getTimeoutSec());
464+
RssProtos.ShuffleUnregisterByAppIdResponse rpcResponse;
465+
try {
466+
rpcResponse =
467+
RetryUtils.retryWithCondition(
468+
() -> doUnregisterShuffleByAppId(request.getAppId(), request.getTimeoutSec()),
469+
null,
470+
rpcRetryBackoffMs,
471+
maxRetryAttempts,
472+
e -> e instanceof Exception);
473+
} catch (Throwable e) {
474+
String msg =
475+
"unregister shuffle by app id from host[" + host + "], port[" + port + "] failed";
476+
LOG.warn(msg, e);
477+
throw new RssException(msg, e);
478+
}
454479

455480
RssUnregisterShuffleByAppIdResponse response;
456481
RssProtos.StatusCode statusCode = rpcResponse.getStatus();
@@ -483,8 +508,22 @@ private RssProtos.ShuffleUnregisterResponse doUnregisterShuffle(
483508

484509
@Override
485510
public RssUnregisterShuffleResponse unregisterShuffle(RssUnregisterShuffleRequest request) {
486-
RssProtos.ShuffleUnregisterResponse rpcResponse =
487-
doUnregisterShuffle(request.getAppId(), request.getShuffleId(), request.getTimeoutSec());
511+
RssProtos.ShuffleUnregisterResponse rpcResponse;
512+
try {
513+
rpcResponse =
514+
RetryUtils.retryWithCondition(
515+
() ->
516+
doUnregisterShuffle(
517+
request.getAppId(), request.getShuffleId(), request.getTimeoutSec()),
518+
null,
519+
rpcRetryBackoffMs,
520+
maxRetryAttempts,
521+
e -> e instanceof Exception);
522+
} catch (Throwable e) {
523+
String msg = "unregister shuffle from host[" + host + "], port[" + port + "] failed";
524+
LOG.warn(msg, e);
525+
throw new RssException(msg, e);
526+
}
488527

489528
RssUnregisterShuffleResponse response;
490529
RssProtos.StatusCode statusCode = rpcResponse.getStatus();
@@ -512,17 +551,30 @@ public RssUnregisterShuffleResponse unregisterShuffle(RssUnregisterShuffleReques
512551

513552
@Override
514553
public RssRegisterShuffleResponse registerShuffle(RssRegisterShuffleRequest request) {
515-
ShuffleRegisterResponse rpcResponse =
516-
doRegisterShuffle(
517-
request.getAppId(),
518-
request.getShuffleId(),
519-
request.getPartitionRanges(),
520-
request.getRemoteStorageInfo(),
521-
request.getUser(),
522-
request.getDataDistributionType(),
523-
request.getMaxConcurrencyPerPartitionToWrite(),
524-
request.getMergeContext(),
525-
request.getProperties());
554+
ShuffleRegisterResponse rpcResponse;
555+
try {
556+
rpcResponse =
557+
RetryUtils.retryWithCondition(
558+
() ->
559+
doRegisterShuffle(
560+
request.getAppId(),
561+
request.getShuffleId(),
562+
request.getPartitionRanges(),
563+
request.getRemoteStorageInfo(),
564+
request.getUser(),
565+
request.getDataDistributionType(),
566+
request.getMaxConcurrencyPerPartitionToWrite(),
567+
request.getMergeContext(),
568+
request.getProperties()),
569+
null,
570+
rpcRetryBackoffMs,
571+
maxRetryAttempts,
572+
e -> e instanceof Exception);
573+
} catch (Throwable e) {
574+
String msg = "register shuffle to host[" + host + "], port[" + port + "] failed";
575+
LOG.warn(msg, e);
576+
throw new RssException(msg, e);
577+
}
526578

527579
RssRegisterShuffleResponse response;
528580
RssProtos.StatusCode statusCode = rpcResponse.getStatus();
@@ -834,7 +886,7 @@ private ReportShuffleResultResponse doReportShuffleResult(ReportShuffleResultReq
834886
return RetryUtils.retryWithCondition(
835887
() -> getBlockingStub().reportShuffleResult(rpcRequest),
836888
null, // No specific callback to execute
837-
0, // No delay between retries, retry immediately
889+
rpcRetryBackoffMs,
838890
maxRetryAttempts, // Maximum number of retry attempts
839891
t -> { // Define retry condition directly in the method call
840892
if (t instanceof StatusRuntimeException) {
@@ -863,7 +915,22 @@ public RssGetShuffleResultResponse getShuffleResult(RssGetShuffleResultRequest r
863915
.setTaskAttemptIdBits(request.getBlockIdLayout().taskAttemptIdBits)
864916
.build())
865917
.build();
866-
GetShuffleResultResponse rpcResponse = getBlockingStub().getShuffleResult(rpcRequest);
918+
919+
GetShuffleResultResponse rpcResponse;
920+
try {
921+
rpcResponse =
922+
RetryUtils.retryWithCondition(
923+
() -> getBlockingStub().getShuffleResult(rpcRequest),
924+
null,
925+
rpcRetryBackoffMs,
926+
maxRetryAttempts,
927+
e -> e instanceof Exception);
928+
} catch (Throwable e) {
929+
String msg = "get shuffle result from " + host + ":" + port + " failed";
930+
LOG.warn(msg, e);
931+
throw new RssException(msg, e);
932+
}
933+
867934
RssProtos.StatusCode statusCode = rpcResponse.getStatus();
868935

869936
RssGetShuffleResultResponse response;
@@ -911,8 +978,22 @@ public RssGetShuffleResultResponse getShuffleResultForMultiPart(
911978
.setTaskAttemptIdBits(request.getBlockIdLayout().taskAttemptIdBits)
912979
.build())
913980
.build();
914-
GetShuffleResultForMultiPartResponse rpcResponse =
915-
getBlockingStub().getShuffleResultForMultiPart(rpcRequest);
981+
982+
GetShuffleResultForMultiPartResponse rpcResponse;
983+
try {
984+
rpcResponse =
985+
RetryUtils.retryWithCondition(
986+
() -> getBlockingStub().getShuffleResultForMultiPart(rpcRequest),
987+
null,
988+
rpcRetryBackoffMs,
989+
maxRetryAttempts,
990+
e -> e instanceof Exception);
991+
} catch (Throwable e) {
992+
String msg = "get shuffle result for multi-part from " + host + ":" + port + " failed";
993+
LOG.warn(msg, e);
994+
throw new RssException(msg, e);
995+
}
996+
916997
RssProtos.StatusCode statusCode = rpcResponse.getStatus();
917998

918999
RssGetShuffleResultResponse response;
@@ -982,7 +1063,20 @@ public RssGetShuffleDataResponse getShuffleData(RssGetShuffleDataRequest request
9821063
int retry = 0;
9831064
GetLocalShuffleDataResponse rpcResponse;
9841065
while (true) {
985-
rpcResponse = getBlockingStub().getLocalShuffleData(rpcRequest);
1066+
try {
1067+
rpcResponse =
1068+
RetryUtils.retryWithCondition(
1069+
() -> getBlockingStub().getLocalShuffleData(rpcRequest),
1070+
null,
1071+
rpcRetryBackoffMs,
1072+
maxRetryAttempts,
1073+
e -> e instanceof Exception);
1074+
} catch (Throwable e) {
1075+
String msg = "get shuffle data from " + host + ":" + port + " failed";
1076+
LOG.warn(msg, e);
1077+
throw new RssException(msg, e);
1078+
}
1079+
9861080
if (rpcResponse.getStatus() != NO_BUFFER) {
9871081
break;
9881082
}
@@ -1041,7 +1135,20 @@ public RssGetShuffleIndexResponse getShuffleIndex(RssGetShuffleIndexRequest requ
10411135
int retry = 0;
10421136
GetLocalShuffleIndexResponse rpcResponse;
10431137
while (true) {
1044-
rpcResponse = getBlockingStub().getLocalShuffleIndex(rpcRequest);
1138+
try {
1139+
rpcResponse =
1140+
RetryUtils.retryWithCondition(
1141+
() -> getBlockingStub().getLocalShuffleIndex(rpcRequest),
1142+
null,
1143+
rpcRetryBackoffMs,
1144+
maxRetryAttempts,
1145+
e -> e instanceof Exception);
1146+
} catch (Throwable e) {
1147+
String msg = "get shuffle index from " + host + ":" + port + " failed";
1148+
LOG.warn(msg, e);
1149+
throw new RssException(msg, e);
1150+
}
1151+
10451152
if (rpcResponse.getStatus() != NO_BUFFER) {
10461153
break;
10471154
}
@@ -1119,7 +1226,20 @@ public RssGetInMemoryShuffleDataResponse getInMemoryShuffleData(
11191226
int retry = 0;
11201227
GetMemoryShuffleDataResponse rpcResponse;
11211228
while (true) {
1122-
rpcResponse = getBlockingStub().getMemoryShuffleData(rpcRequest);
1229+
try {
1230+
rpcResponse =
1231+
RetryUtils.retryWithCondition(
1232+
() -> getBlockingStub().getMemoryShuffleData(rpcRequest),
1233+
null,
1234+
rpcRetryBackoffMs,
1235+
maxRetryAttempts,
1236+
e -> e instanceof Exception);
1237+
} catch (Throwable e) {
1238+
String msg = "get memory shuffle data from " + host + ":" + port + " failed";
1239+
LOG.warn(msg, e);
1240+
throw new RssException(msg, e);
1241+
}
1242+
11231243
if (rpcResponse.getStatus() != NO_BUFFER) {
11241244
break;
11251245
}
@@ -1179,8 +1299,23 @@ public RssStartSortMergeResponse startSortMerge(RssStartSortMergeRequest request
11791299
.setPartitionId(request.getPartitionId())
11801300
.setUniqueBlocksBitmap(serializedBlockIdsBytes)
11811301
.build();
1302+
11821303
long start = System.currentTimeMillis();
1183-
RssProtos.StartSortMergeResponse rpcResponse = getBlockingStub().startSortMerge(rpcRequest);
1304+
RssProtos.StartSortMergeResponse rpcResponse;
1305+
try {
1306+
rpcResponse =
1307+
RetryUtils.retryWithCondition(
1308+
() -> getBlockingStub().startSortMerge(rpcRequest),
1309+
null,
1310+
rpcRetryBackoffMs,
1311+
maxRetryAttempts,
1312+
e -> e instanceof Exception);
1313+
} catch (Throwable e) {
1314+
String msg = "start sort merge to " + host + ":" + port + " failed";
1315+
LOG.warn(msg, e);
1316+
throw new RssException(msg, e);
1317+
}
1318+
11841319
String requestInfo =
11851320
"appId["
11861321
+ request.getAppId()
@@ -1242,7 +1377,20 @@ public RssGetSortedShuffleDataResponse getSortedShuffleData(
12421377
int retry = 0;
12431378
RssProtos.GetSortedShuffleDataResponse rpcResponse;
12441379
while (true) {
1245-
rpcResponse = getBlockingStub().getSortedShuffleData(rpcRequest);
1380+
try {
1381+
rpcResponse =
1382+
RetryUtils.retryWithCondition(
1383+
() -> getBlockingStub().getSortedShuffleData(rpcRequest),
1384+
null,
1385+
rpcRetryBackoffMs,
1386+
maxRetryAttempts,
1387+
e -> e instanceof Exception);
1388+
} catch (Throwable e) {
1389+
String msg = "get sorted shuffle data from " + host + ":" + port + " failed";
1390+
LOG.warn(msg, e);
1391+
throw new RssException(msg, e);
1392+
}
1393+
12461394
if (rpcResponse.getStatus() != NO_BUFFER) {
12471395
break;
12481396
}

0 commit comments

Comments
 (0)