diff --git a/.github/workflows/velox_backend_x86.yml b/.github/workflows/velox_backend_x86.yml index c7d98cac19cb..e06d0d05549c 100644 --- a/.github/workflows/velox_backend_x86.yml +++ b/.github/workflows/velox_backend_x86.yml @@ -595,7 +595,7 @@ jobs: fail-fast: false matrix: spark: [ "spark-3.3" ] - celeborn: [ "celeborn-0.6.1", "celeborn-0.5.4"] + celeborn: [ "celeborn-0.6.3", "celeborn-0.5.4"] writer: ["sort", "hash"] runs-on: ubuntu-22.04 container: centos:8 @@ -630,7 +630,7 @@ jobs: EXTRA_PROFILE="" if [ "${{ matrix.celeborn }}" = "celeborn-0.5.4" ]; then EXTRA_PROFILE="-Pceleborn-0.5" - elif [ "${{ matrix.celeborn }}" = "celeborn-0.6.1" ]; then + elif [ "${{ matrix.celeborn }}" = "celeborn-0.6.3" ]; then EXTRA_PROFILE="-Pceleborn-0.6" fi echo "EXTRA_PROFILE: ${EXTRA_PROFILE}" diff --git a/dev/docker/Dockerfile.centos8-dynamic-build b/dev/docker/Dockerfile.centos8-dynamic-build index b87e234bab23..48f45794a49f 100644 --- a/dev/docker/Dockerfile.centos8-dynamic-build +++ b/dev/docker/Dockerfile.centos8-dynamic-build @@ -33,7 +33,7 @@ RUN set -ex; \ yum install -y java-${JAVA_VERSION}-openjdk-devel patch wget git perl; \ mirror_host="https://www.apache.org/dyn/closer.lua"; \ wget -nv ${mirror_host}/celeborn/celeborn-0.5.4/apache-celeborn-0.5.4-bin.tgz?action=download -O /opt/apache-celeborn-0.5.4-bin.tgz; \ - wget -nv ${mirror_host}/celeborn/celeborn-0.6.1/apache-celeborn-0.6.1-bin.tgz?action=download -O /opt/apache-celeborn-0.6.1-bin.tgz; \ + wget -nv ${mirror_host}/celeborn/celeborn-0.6.3/apache-celeborn-0.6.3-bin.tgz?action=download -O /opt/apache-celeborn-0.6.3-bin.tgz; \ wget -nv ${mirror_host}/uniffle/0.10.0/apache-uniffle-0.10.0-bin.tar.gz?action=download -O /opt/apache-uniffle-0.10.0-bin.tar.gz; \ wget -nv ${mirror_host}/hadoop/common/hadoop-2.8.5/hadoop-2.8.5.tar.gz?action=download -O /opt/hadoop-2.8.5.tar.gz; \ git clone --depth=1 https://github.com/apache/gluten /opt/gluten; \ diff --git a/dev/docker/Dockerfile.centos9-dynamic-build b/dev/docker/Dockerfile.centos9-dynamic-build index 2034bf01e77f..7b3387ce8c83 100644 --- a/dev/docker/Dockerfile.centos9-dynamic-build +++ b/dev/docker/Dockerfile.centos9-dynamic-build @@ -31,7 +31,7 @@ RUN set -ex; \ yum install -y java-${JAVA_VERSION}-openjdk-devel patch wget git perl; \ mirror_host="https://www.apache.org/dyn/closer.lua"; \ wget -nv ${mirror_host}/celeborn/celeborn-0.5.4/apache-celeborn-0.5.4-bin.tgz?action=download -O /opt/apache-celeborn-0.5.4-bin.tgz; \ - wget -nv ${mirror_host}/celeborn/celeborn-0.6.1/apache-celeborn-0.6.1-bin.tgz?action=download -O /opt/apache-celeborn-0.6.1-bin.tgz; \ + wget -nv ${mirror_host}/celeborn/celeborn-0.6.3/apache-celeborn-0.6.3-bin.tgz?action=download -O /opt/apache-celeborn-0.6.3-bin.tgz; \ wget -nv ${mirror_host}/uniffle/0.10.0/apache-uniffle-0.10.0-bin.tar.gz?action=download -O /opt/apache-uniffle-0.10.0-bin.tar.gz; \ wget -nv ${mirror_host}/hadoop/common/hadoop-2.8.5/hadoop-2.8.5.tar.gz?action=download -O /opt/hadoop-2.8.5.tar.gz; \ git clone --depth=1 https://github.com/apache/gluten /opt/gluten; \ diff --git a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornUtils.java b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornUtils.java index e8a8fcca6052..653ceaad139a 100644 --- a/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornUtils.java +++ b/gluten-celeborn/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornUtils.java @@ -700,4 +700,54 @@ public static void stopFailedShuffleCleaner(Object failedShuffleCleaner) { throw new RuntimeException(e); } } + + public static void prepareForMergeData( + ShuffleClient shuffleClient, int shuffleId, int mapId, int attemptId) { + try { + Method prepareMethod = + shuffleClient + .getClass() + .getDeclaredMethod("prepareForMergeData", Integer.TYPE, Integer.TYPE, Integer.TYPE); + prepareMethod.invoke(shuffleClient, shuffleId, mapId, attemptId); + } catch (NoSuchMethodException ignored) { + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void mapperEnd( + ShuffleClient shuffleClient, + int shuffleId, + int mapId, + int attemptId, + int numMappers, + int numPartitions) { + try { + try { + // for Celeborn 0.6.3 + Method mapperEndMethod = + shuffleClient + .getClass() + .getDeclaredMethod( + "mapperEnd", + Integer.TYPE, + Integer.TYPE, + Integer.TYPE, + Integer.TYPE, + Integer.TYPE); + mapperEndMethod.invoke( + shuffleClient, shuffleId, mapId, attemptId, numMappers, numPartitions); + } catch (NoSuchMethodException e) { + Method mapperEndMethod = + shuffleClient + .getClass() + .getDeclaredMethod( + "mapperEnd", Integer.TYPE, Integer.TYPE, Integer.TYPE, Integer.TYPE); + mapperEndMethod.invoke(shuffleClient, shuffleId, mapId, attemptId, numMappers); + } + } catch (NoSuchMethodException ignored) { + } catch (Exception e) { + throw new RuntimeException(e); + } + } } diff --git a/gluten-celeborn/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala b/gluten-celeborn/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala index 451a020f6ff5..7ac31d797354 100644 --- a/gluten-celeborn/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala +++ b/gluten-celeborn/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala @@ -23,6 +23,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.SHUFFLE_COMPRESS import org.apache.spark.scheduler.MapStatus import org.apache.spark.shuffle.celeborn.CelebornShuffleHandle +import org.apache.spark.shuffle.gluten.celeborn.CelebornUtils import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.storage.BlockManager @@ -150,15 +151,27 @@ abstract class CelebornColumnarShuffleWriter[K, V]( def pushMergedDataToCeleborn(): Unit = { val pushMergedDataTime = System.nanoTime - client.prepareForMergeData(shuffleId, mapId, context.attemptNumber()) + CelebornUtils.prepareForMergeData(client, shuffleId, mapId, context.attemptNumber()) client.pushMergedData(shuffleId, mapId, context.attemptNumber) - client.mapperEnd(shuffleId, mapId, context.attemptNumber, numMappers) + CelebornUtils.mapperEnd( + client, + shuffleId, + mapId, + context.attemptNumber, + numMappers, + numPartitions) writeMetrics.incWriteTime(System.nanoTime - pushMergedDataTime) } def handleEmptyIterator(): Unit = { partitionLengths = new Array[Long](dep.partitioner.numPartitions) - client.mapperEnd(shuffleId, mapId, context.attemptNumber, numMappers) + CelebornUtils.mapperEnd( + client, + shuffleId, + mapId, + context.attemptNumber, + numMappers, + numPartitions) mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId) } } diff --git a/pom.xml b/pom.xml index 2a6d7ed75f83..676c7252ac1b 100644 --- a/pom.xml +++ b/pom.xml @@ -78,7 +78,7 @@ 3.3.2 33 1.12.262 - 0.6.1 + 0.6.3 0.10.0 15.0.0 15.0.0-gluten diff --git a/tools/gluten-it/pom.xml b/tools/gluten-it/pom.xml index 487117ea2e06..1afea4edce6e 100644 --- a/tools/gluten-it/pom.xml +++ b/tools/gluten-it/pom.xml @@ -38,7 +38,7 @@ 3 delta-core 2.4.0 - 0.6.1 + 0.6.3 0.10.0 1.7.0-SNAPSHOT 1.1 @@ -333,7 +333,7 @@ celeborn-0.6 - 0.6.1 + 0.6.3