
Commit fa69421

[#2472] feat(spark): Add an RPC method to obtain the uniffleId and delete the write stage for retry at the same time.
1 parent a2095f1 · commit fa69421

File tree: 53 files changed, +539 −1695 lines


client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java

Lines changed: 0 additions & 16 deletions
@@ -46,22 +46,6 @@ public class RssSparkConfig {
           .withDeprecatedKeys(RssClientConfig.RSS_RESUBMIT_STAGE)
           .withDescription("Whether to enable the resubmit stage for fetch/write failure");
 
-  public static final ConfigOption<Boolean> RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED =
-      ConfigOptions.key("rss.stageRetry.fetchFailureEnabled")
-          .booleanType()
-          .defaultValue(false)
-          .withFallbackKeys(RSS_RESUBMIT_STAGE_ENABLED.key(), RssClientConfig.RSS_RESUBMIT_STAGE)
-          .withDescription(
-              "If set to true, the stage retry mechanism will be enabled when a fetch failure occurs.");
-
-  public static final ConfigOption<Boolean> RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED =
-      ConfigOptions.key("rss.stageRetry.writeFailureEnabled")
-          .booleanType()
-          .defaultValue(false)
-          .withFallbackKeys(RSS_RESUBMIT_STAGE_ENABLED.key(), RssClientConfig.RSS_RESUBMIT_STAGE)
-          .withDescription(
-              "If set to true, the stage retry mechanism will be enabled when a write failure occurs.");
-
   public static final ConfigOption<Boolean> RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED =
       ConfigOptions.key("rss.blockId.selfManagementEnabled")
           .booleanType()
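
With the separate fetch-failure and write-failure switches removed, stage retry is governed by the single RSS_RESUBMIT_STAGE_ENABLED option. A minimal sketch of enabling it from a Spark job follows; the "spark." prefix on the key is an assumption about how the Spark client exposes Uniffle options and is not part of this diff.

import org.apache.spark.SparkConf;
import org.apache.spark.shuffle.RssSparkConfig;

public class EnableStageRetrySketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf();
    // Single remaining switch; the deprecated key RssClientConfig.RSS_RESUBMIT_STAGE
    // still maps to it via withDeprecatedKeys above.
    conf.set("spark." + RssSparkConfig.RSS_RESUBMIT_STAGE_ENABLED.key(), "true");
  }
}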

client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java

Lines changed: 11 additions & 2 deletions
@@ -33,6 +33,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
+import org.apache.spark.TaskContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.deploy.SparkHadoopUtil;
 import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
@@ -54,7 +55,7 @@
 import org.apache.uniffle.common.exception.RssFetchFailedException;
 import org.apache.uniffle.common.util.Constants;
 
-import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED;
+import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_ENABLED;
 
 public class RssSparkShuffleUtils {
 
@@ -366,17 +367,21 @@ public static RssException reportRssFetchFailedException(
       SparkConf sparkConf,
       String appId,
       int shuffleId,
+      int uniffleShuffleId,
       int stageAttemptId,
+      int stageAttemptNumber,
       Set<Integer> failedPartitions) {
     RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
-    if (rssConf.getBoolean(RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED)
+    if (rssConf.getBoolean(RSS_RESUBMIT_STAGE_ENABLED)
         && RssSparkShuffleUtils.isStageResubmitSupported()) {
       for (int partitionId : failedPartitions) {
         RssReportShuffleFetchFailureRequest req =
             new RssReportShuffleFetchFailureRequest(
                 appId,
                 shuffleId,
+                uniffleShuffleId,
                 stageAttemptId,
+                stageAttemptNumber,
                 partitionId,
                 rssFetchFailedException.getMessage());
         RssReportShuffleFetchFailureResponse response =
@@ -404,4 +409,8 @@ public static boolean isSparkUIEnabled(SparkConf conf) {
     }
     return false;
   }
+
+  public static String getAppShuffleIdentifier(int appShuffleId, TaskContext context) {
+    return appShuffleId + "-" + context.stageId() + "-" + context.stageAttemptNumber();
+  }
 }
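
The new getAppShuffleIdentifier helper simply concatenates the app-level shuffle id with the stage id and stage attempt number taken from the TaskContext. A tiny illustration of the resulting format, with made-up values standing in for a real TaskContext:

// Illustration only: for appShuffleId = 3, a TaskContext whose stageId() == 7 and
// stageAttemptNumber() == 1, getAppShuffleIdentifier returns "3-7-1".
String identifier = 3 + "-" + 7 + "-" + 1;  // same concatenation the helper performs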

client-spark/common/src/main/java/org/apache/spark/shuffle/ShuffleHandleInfoManager.java

Lines changed: 4 additions & 4 deletions
@@ -21,26 +21,26 @@
 import java.io.IOException;
 import java.util.Map;
 
-import org.apache.spark.shuffle.handle.ShuffleHandleInfo;
+import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
 
 import org.apache.uniffle.common.util.JavaUtils;
 
 public class ShuffleHandleInfoManager implements Closeable {
-  private Map<Integer, ShuffleHandleInfo> shuffleIdToShuffleHandleInfo;
+  private Map<Integer, MutableShuffleHandleInfo> shuffleIdToShuffleHandleInfo;
 
   public ShuffleHandleInfoManager() {
     this.shuffleIdToShuffleHandleInfo = JavaUtils.newConcurrentMap();
   }
 
-  public ShuffleHandleInfo get(int shuffleId) {
+  public MutableShuffleHandleInfo get(int shuffleId) {
     return shuffleIdToShuffleHandleInfo.get(shuffleId);
   }
 
   public void remove(int shuffleId) {
     shuffleIdToShuffleHandleInfo.remove(shuffleId);
   }
 
-  public void register(int shuffleId, ShuffleHandleInfo handle) {
+  public void register(int shuffleId, MutableShuffleHandleInfo handle) {
     shuffleIdToShuffleHandleInfo.put(shuffleId, handle);
   }
 
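
Because the manager now stores MutableShuffleHandleInfo directly, callers get the mutable handle back without narrowing from the base type. A rough usage sketch; the helper class and the way the handle is obtained are hypothetical, since only the manager itself appears in this diff.

import org.apache.spark.shuffle.ShuffleHandleInfoManager;
import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;

// Hypothetical helper illustrating the register/get/remove cycle.
final class HandleRegistrySketch {
  static MutableShuffleHandleInfo refresh(
      ShuffleHandleInfoManager manager, int shuffleId, MutableShuffleHandleInfo handle) {
    manager.register(shuffleId, handle);                         // store the mutable handle
    MutableShuffleHandleInfo current = manager.get(shuffleId);   // no cast needed any more
    return current;
  }

  static void unregister(ShuffleHandleInfoManager manager, int shuffleId) {
    manager.remove(shuffleId);  // drop the entry when the shuffle is unregistered
  }
}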

client-spark/common/src/main/java/org/apache/spark/shuffle/handle/StageAttemptShuffleHandleInfo.java

Lines changed: 0 additions & 144 deletions
This file was deleted.

client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssFetchFailedIterator.java

Lines changed: 14 additions & 0 deletions
@@ -48,8 +48,10 @@ private RssFetchFailedIterator(Builder builder, Iterator<Product2<K, C>> iterato
   public static class Builder {
     private String appId;
     private int shuffleId;
+    private int uniffleShuffleId;
     private int partitionId;
     private int stageAttemptId;
+    private int stageAttemptNumber;
     private Supplier<ShuffleManagerClient> managerClientSupplier;
 
     private Builder() {}
@@ -64,6 +66,11 @@ Builder shuffleId(int shuffleId) {
       return this;
     }
 
+    Builder uniffleShuffleId(int uniffleShuffleId) {
+      this.uniffleShuffleId = uniffleShuffleId;
+      return this;
+    }
+
     Builder partitionId(int partitionId) {
       this.partitionId = partitionId;
       return this;
@@ -74,6 +81,11 @@ Builder stageAttemptId(int stageAttemptId) {
       return this;
     }
 
+    Builder stageAttemptNumber(int stageAttemptNumber) {
+      this.stageAttemptNumber = stageAttemptNumber;
+      return this;
+    }
+
     Builder managerClientSupplier(Supplier<ShuffleManagerClient> managerClientSupplier) {
       this.managerClientSupplier = managerClientSupplier;
       return this;
@@ -95,7 +107,9 @@ private RssException generateFetchFailedIfNecessary(RssFetchFailedException e) {
             new RssReportShuffleFetchFailureRequest(
                 builder.appId,
                 builder.shuffleId,
+                builder.uniffleShuffleId,
                 builder.stageAttemptId,
+                builder.stageAttemptNumber,
                 builder.partitionId,
                 e.getMessage());
         RssReportShuffleFetchFailureResponse response = client.reportShuffleFetchFailure(req);
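
The builder now also carries the Uniffle-internal shuffle id and the stage attempt number and forwards both in the fetch-failure report. A sketch of the enlarged request with illustrative literal values; the per-argument comments mirror the parameter order visible in the hunk above rather than a documented API, and the import is omitted.

// Illustrative values only; argument order follows the constructor call shown above.
RssReportShuffleFetchFailureRequest req =
    new RssReportShuffleFetchFailureRequest(
        "application_1700000000000_0001", // appId
        0,                                // shuffleId as Spark sees it
        2,                                // uniffleShuffleId used internally by Uniffle
        5,                                // stageAttemptId
        1,                                // stageAttemptNumber
        3,                                // partitionId that failed
        "fetch failed");                  // exception message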

client-spark/common/src/main/java/org/apache/spark/shuffle/writer/AddBlockEvent.java

Lines changed: 1 addition & 8 deletions
@@ -25,22 +25,19 @@
 public class AddBlockEvent {
 
   private String taskId;
-  private int stageAttemptNumber;
   private List<ShuffleBlockInfo> shuffleDataInfoList;
   private List<Runnable> processedCallbackChain;
   private WriteBufferManager bufferManager;
 
   public AddBlockEvent(String taskId, List<ShuffleBlockInfo> shuffleDataInfoList) {
-    this(taskId, 0, shuffleDataInfoList, null);
+    this(taskId, shuffleDataInfoList, null);
   }
 
   public AddBlockEvent(
       String taskId,
-      int stageAttemptNumber,
       List<ShuffleBlockInfo> shuffleDataInfoList,
       WriteBufferManager writeBufferManager) {
     this.taskId = taskId;
-    this.stageAttemptNumber = stageAttemptNumber;
     this.shuffleDataInfoList = shuffleDataInfoList;
     this.processedCallbackChain = new ArrayList<>();
     this.bufferManager = writeBufferManager;
@@ -55,10 +52,6 @@ public String getTaskId() {
     return taskId;
   }
 
-  public int getStageAttemptNumber() {
-    return stageAttemptNumber;
-  }
-
   public List<ShuffleBlockInfo> getShuffleDataInfoList() {
     return shuffleDataInfoList;
   }
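
With the per-event stage attempt number gone, the two-argument constructor is again just a convenience over the full one (null WriteBufferManager). A minimal sketch of creating an event; the task id and the empty block list are placeholders, not values taken from this commit.

import java.util.ArrayList;
import org.apache.spark.shuffle.writer.AddBlockEvent;
import org.apache.uniffle.common.ShuffleBlockInfo;

// Placeholder values only; a real event carries the blocks buffered for one task.
AddBlockEvent event =
    new AddBlockEvent("taskId_placeholder", new ArrayList<ShuffleBlockInfo>());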

client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java

Lines changed: 1 addition & 4 deletions
@@ -93,10 +93,7 @@ public CompletableFuture<Long> send(AddBlockEvent event) {
       try {
         result =
             shuffleWriteClient.sendShuffleData(
-                rssAppId,
-                event.getStageAttemptNumber(),
-                shuffleBlockInfoList,
-                () -> !isValidTask(taskId));
+                rssAppId, shuffleBlockInfoList, () -> !isValidTask(taskId));
         putBlockId(taskToSuccessBlockIds, taskId, result.getSuccessBlockIds());
         putFailedBlockSendTracker(
             taskToFailedBlockSendTracker, taskId, result.getFailedBlockSendTracker());
