[ML-84] Use Barrier Execution Mode to schedule oneCCL ranks together #344

Open · wants to merge 9 commits into master

Changes from 1 commit:
update barrier mode
Signed-off-by: minmingzhu <[email protected]>
minmingzhu committed Aug 23, 2023
commit d71fbc311d52793624f4783ec1a997cb17b412f0
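
This commit moves Spark's barrier execution mode out of the OneDAL conversion helpers and into each algorithm implementation: an identity barrier().mapPartitions stage is inserted in front of the data conversion, so every oneCCL rank is scheduled before any of them enters the collective computation. As background, here is a minimal self-contained sketch of barrier execution mode (illustration only, not code from this PR; the master URL, partition count, and toy workload are made up):

    import org.apache.spark.BarrierTaskContext
    import org.apache.spark.sql.SparkSession

    object BarrierModeSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("barrier-mode-sketch")
          .getOrCreate()

        // Two partitions -> two tasks; a barrier stage launches them all
        // together or not at all, which is what collective oneCCL ranks need.
        val rdd = spark.sparkContext.parallelize(1 to 8, numSlices = 2)

        val sums = rdd.barrier().mapPartitions { iter =>
          val ctx = BarrierTaskContext.get() // only valid inside a barrier stage
          ctx.barrier()                      // explicit global synchronization point
          Iterator.single((ctx.partitionId(), iter.sum))
        }.collect()

        sums.foreach(println)
        spark.stop()
      }
    }

The identity form used throughout this commit, data.barrier().mapPartitions(iter => iter), does no per-record work; it only marks the stage as a barrier stage so the downstream conversion tasks are gang-scheduled.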
19 changes: 8 additions & 11 deletions mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala
@@ -409,29 +409,28 @@ object OneDAL {

     val spark = SparkSession.active
     import spark.implicits._
-    val labeledPointsRDD = labeledPoints.select(labelCol, featuresCol).toDF().rdd
-      .barrier().mapPartitions(iter => iter)
+    val labeledPointsRDD = labeledPoints.rdd
 
     // Repartition to executorNum if not enough partitions
     val dataForConversion = if (labeledPointsRDD.getNumPartitions < executorNum) {
       logger.info(s"Repartition to executorNum if not enough partitions")
-      val rePartitions = labeledPointsRDD.repartition(executorNum).cache()
+      val rePartitions = labeledPoints.repartition(executorNum).cache()
       rePartitions.count()
       rePartitions
     } else {
-      labeledPointsRDD
+      labeledPoints
     }
 
     // Get dimensions for each partition
-    val partitionDims = Utils.getPartitionDims(dataForConversion.map{ row =>
-      val vector = row.getAs[Vector](1)
+    val partitionDims = Utils.getPartitionDims(dataForConversion.select(featuresCol).rdd.map{ row =>
+      val vector = row.getAs[Vector](0)
       vector
     })
 
     // Filter out empty partitions, if there is no such rdd, coalesce will report an error
     // "No partitions or no locations for partitions found".
     // TODO: ML-312: Improve ExecutorInProcessCoalescePartitioner
-    val nonEmptyPartitions = dataForConversion.mapPartitionsWithIndex {
+    val nonEmptyPartitions = dataForConversion.select(labelCol, featuresCol).toDF().rdd.mapPartitionsWithIndex {
       (index: Int, it: Iterator[Row]) => Iterator(Tuple3(partitionDims(index)._1, index, it))
     }.filter {
       _._1 > 0
@@ -679,14 +678,12 @@ object OneDAL {
     require(executorNum > 0)
 
     logger.info(s"Processing partitions with $executorNum executors")
-    val barrierRDD = vectors.barrier().mapPartitions(iter => iter)
-
 
     // Repartition to executorNum if not enough partitions
     val dataForConversion = if (vectors.getNumPartitions < executorNum) {
-      barrierRDD.repartition(executorNum).setName("Repartitioned for conversion").cache()
+      vectors.repartition(executorNum).setName("Repartitioned for conversion").cache()
     } else {
-      barrierRDD
+      vectors
     }
 
     // Get dimensions for each partition
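
Both OneDAL hunks above call Utils.getPartitionDims to learn per-partition dimensions before coalescing; its implementation is not part of this diff. A hypothetical sketch of what such a helper computes (the return shape and Map-based lookup are assumptions inferred from the partitionDims(index)._1 usage above):

    import org.apache.spark.ml.linalg.Vector
    import org.apache.spark.rdd.RDD

    // Hypothetical stand-in for Utils.getPartitionDims: count rows and record
    // the vector width per partition, so the driver can filter out empty
    // partitions before coalescing.
    def getPartitionDimsSketch(vectors: RDD[Vector]): Map[Int, (Int, Int)] = {
      vectors.mapPartitionsWithIndex { (index, it) =>
        var rows = 0
        var cols = 0
        it.foreach { v =>
          rows += 1
          cols = v.size
        }
        Iterator.single((index, (rows, cols)))
      }.collect().toMap
    }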
KMeansDALImpl.scala
@@ -42,11 +42,11 @@ class KMeansDALImpl(var nClusters: Int,
     val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice)
     val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice)
     kmeansTimer.record("Preprocessing")
 
+    val barrierRDD = data.barrier().mapPartitions(iter => iter)
     val coalescedTables = if (useDevice == "GPU") {
-      OneDAL.coalesceVectorsToHomogenTables(data, executorNum, computeDevice)
+      OneDAL.coalesceVectorsToHomogenTables(barrierRDD, executorNum, computeDevice)
     } else {
-      OneDAL.coalesceVectorsToNumericTables(data, executorNum)
+      OneDAL.coalesceVectorsToNumericTables(barrierRDD, executorNum)
     }
     kmeansTimer.record("Data Convertion")

@@ -95,7 +95,7 @@ class KMeansDALImpl(var nClusters: Int,
       }
       OneCCL.cleanup()
       ret
-    }.barrier().mapPartitions(iter => iter).collect()
+    }.collect()
 
     // Make sure there is only one result from rank 0
     assert(results.length == 1)
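
The second KMeans hunk drops the barrier stage that previously wrapped result collection; with barrier scheduling now applied before conversion, a plain collect() is enough. The surrounding pattern, in which every rank joins the collective computation but only rank 0 returns a value, looks roughly like this (a sketch with a placeholder compute function and an assumed RDD[Long] of native table handles, not the PR's exact code):

    import org.apache.spark.rdd.RDD
    import scala.reflect.ClassTag

    // Sketch of the rank-0 collection pattern behind assert(results.length == 1):
    // each partition acts as one oneCCL rank; all ranks run the computation,
    // but only rank 0 emits the aggregated result.
    def collectFromRankZero[T: ClassTag](tables: RDD[Long],
                                         compute: (Int, Long) => T): Array[T] = {
      val results = tables.mapPartitionsWithIndex { (rank, it) =>
        val tableHandle = it.next()          // one coalesced table per partition
        val ret = compute(rank, tableHandle) // placeholder for the native oneDAL call
        if (rank == 0) Iterator.single(ret) else Iterator.empty
      }.collect()
      assert(results.length == 1)            // exactly one result, from rank 0
      results
    }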
PCADALImpl.scala
@@ -45,16 +45,18 @@ class PCADALImpl(val k: Int,
   def train(data: RDD[Vector]): PCADALModel = {
     val pcaTimer = new Utils.AlgoTimeMetrics("PCA")
     val normalizedData = normalizeData(data)
+    val barrierRDD = normalizedData.barrier().mapPartitions(iter => iter)
+
     val sparkContext = normalizedData.sparkContext
     val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice)
     val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice)
     pcaTimer.record("Preprocessing")
 
     val coalescedTables = if (useDevice == "GPU") {
-      OneDAL.coalesceVectorsToHomogenTables(normalizedData, executorNum,
+      OneDAL.coalesceVectorsToHomogenTables(barrierRDD, executorNum,
         computeDevice)
     } else {
-      OneDAL.coalesceVectorsToNumericTables(normalizedData, executorNum)
+      OneDAL.coalesceVectorsToNumericTables(barrierRDD, executorNum)
     }
     val kvsIPPort = getOneCCLIPPort(coalescedTables)
     pcaTimer.record("Data Convertion")
ALSDALImpl.scala
@@ -72,8 +72,9 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag]( data: RDD[Rating[ID]],

logInfo(s"ALSDAL fit using $executorNum Executors " +
s"for $nVectors vectors and $nFeatures features")
val barrierRDD = data.barrier().mapPartitions(iter => iter)

val numericTables = data.repartition(executorNum)
val numericTables = barrierRDD.repartition(executorNum)
.setName("Repartitioned for conversion").cache()

val kvsIPPort = getOneCCLIPPort(numericTables)
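
PCA and ALS both bootstrap the collective by calling getOneCCLIPPort on the converted data; its body is not shown in this diff. Conceptually it derives a host and port for the oneCCL key-value store from the partition that becomes rank 0. A hypothetical sketch (the function name comes from the diff, but the host-discovery logic, the port, and the "ip_port" string format are all assumptions):

    import java.net.InetAddress
    import org.apache.spark.rdd.RDD

    // Hypothetical sketch: report the host of partition 0 (the future rank 0)
    // plus an assumed fixed port as the oneCCL KVS bootstrap address.
    def getOneCCLIPPortSketch(data: RDD[_]): String = {
      val kvsPort = 51234 // assumption; the real code may detect a free port
      val rankZeroHost = data.mapPartitionsWithIndex { (index, it) =>
        if (index == 0) Iterator.single(InetAddress.getLocalHost.getHostAddress)
        else Iterator.empty
      }.collect().head
      s"${rankZeroHost}_$kvsPort"
    }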
CorrelationDALImpl.scala
@@ -35,12 +35,13 @@ class CorrelationDALImpl(
     val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice)
     val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice)
     corTimer.record("Preprocessing")
+    val barrierRDD = data.barrier().mapPartitions(iter => iter)
 
     val coalescedTables = if (useDevice == "GPU") {
-      OneDAL.coalesceVectorsToHomogenTables(data, executorNum,
+      OneDAL.coalesceVectorsToHomogenTables(barrierRDD, executorNum,
         computeDevice)
     } else {
-      OneDAL.coalesceVectorsToNumericTables(data, executorNum)
+      OneDAL.coalesceVectorsToNumericTables(barrierRDD, executorNum)
     }
     corTimer.record("Data Convertion")

SummarizerDALImpl.scala
@@ -36,12 +36,13 @@ class SummarizerDALImpl(val executorNum: Int,
     val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice)
     val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice)
     sumTimer.record("Preprocessing")
+    val barrierRDD = data.barrier().mapPartitions(iter => iter)
 
     val coalescedTables = if (useDevice == "GPU") {
-      OneDAL.coalesceVectorsToHomogenTables(data, executorNum,
+      OneDAL.coalesceVectorsToHomogenTables(barrierRDD, executorNum,
         computeDevice)
     } else {
-      OneDAL.coalesceVectorsToNumericTables(data, executorNum)
+      OneDAL.coalesceVectorsToNumericTables(barrierRDD, executorNum)
     }
     sumTimer.record("Data Convertion")

MLlibKMeansSuite.scala
@@ -47,6 +47,7 @@ class MLlibKMeansSuite extends MLTest with DefaultReadWriteTest with PMMLReadWri
   override def sparkConf: SparkConf = {
     val conf = super.sparkConf
     conf.set("spark.oap.mllib.device", TestCommon.getComputeDevice.toString)
+    conf.set("spark.driver.bindAddress", "10.239.34.1");
   }
 
   test("default parameters") {
MLlibPCASuite.scala
@@ -32,6 +32,8 @@ class MLlibPCASuite extends MLTest with DefaultReadWriteTest {
   override def sparkConf: SparkConf = {
     val conf = super.sparkConf
     conf.set("spark.oap.mllib.device", TestCommon.getComputeDevice.toString)
+    conf.set("spark.driver.bindAddress", "10.239.34.1");
+
   }
 
   test("params") {