Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML-404] Fix bug #405

Open
wants to merge 33 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
be55d63
update spark to 3.3.3
minmingzhu Sep 18, 2023
6d2b055
Merge branch 'oap-project:master' into master
minmingzhu Sep 18, 2023
833444e
Merge branch 'oap-project:master' into master
minmingzhu Sep 19, 2023
23f0491
Merge branch 'oap-project:master' into master
minmingzhu Sep 19, 2023
f42d3be
Merge branch 'oap-project:master' into master
minmingzhu Sep 21, 2023
db6e64c
Merge branch 'oap-project:master' into master
minmingzhu Sep 26, 2023
f3f58bb
Merge branch 'oap-project:master' into master
minmingzhu Oct 9, 2023
262a746
Merge branch 'oap-project:master' into master
minmingzhu Oct 11, 2023
7802b2d
Merge branch 'oap-project:master' into master
minmingzhu Mar 5, 2024
efb9097
Merge branch 'oap-project:master' into master
minmingzhu Mar 25, 2024
c3f570b
Merge branch 'oap-project:master' into master
minmingzhu Mar 26, 2024
3bee6bb
Merge branch 'oap-project:master' into master
minmingzhu Jul 22, 2024
5d08ac0
Merge branch 'oap-project:master' into master
minmingzhu Jul 22, 2024
49bdc75
Merge branch 'oap-project:master' into master
minmingzhu Sep 10, 2024
0a44fbe
Merge branch 'oap-project:master' into master
minmingzhu Sep 10, 2024
7ff407c
Merge branch 'oap-project:master' into master
minmingzhu Sep 10, 2024
c29e617
Merge branch 'oap-project:master' into master
minmingzhu Oct 16, 2024
58b206e
Merge branch 'oap-project:master' into master
minmingzhu Oct 23, 2024
071eb75
Merge branch 'oap-project:master' into master
minmingzhu Oct 31, 2024
adc2381
Merge branch 'oap-project:master' into master
minmingzhu Nov 14, 2024
011db1e
update
minmingzhu Jan 23, 2025
c735609
update
minmingzhu Jan 23, 2025
d244db0
update
minmingzhu Feb 14, 2025
cd5565b
update
minmingzhu Feb 14, 2025
72bc651
update
minmingzhu Feb 14, 2025
a18d8d3
update
minmingzhu Feb 14, 2025
8a70573
format cpp style
minmingzhu Feb 17, 2025
db249d9
update format
minmingzhu Feb 17, 2025
ca972d3
update
minmingzhu Feb 25, 2025
1c2070d
update
minmingzhu Feb 26, 2025
beba660
update
minmingzhu Feb 26, 2025
ab92ce3
update
minmingzhu Feb 26, 2025
700fdea
update
minmingzhu Feb 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions mllib-dal/src/main/native/CorrelationImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,12 @@ static void doCorrelationOneAPICompute(
printHomegenTable(result_train.get_cor_matrix());
auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 -
t1)
.count();
logger::println(
logger::INFO,
"Correlation batch(native): computing step took %d secs.",
duration / 1000);
logger::println(logger::INFO,
"Correlation (native): training step took %f secs.",
duration / 1000);
// Return all covariance & mean
jclass clazz = env->GetObjectClass(resultObj);

Expand Down
10 changes: 9 additions & 1 deletion mllib-dal/src/main/native/DecisionForestClassifierImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ static jobject doRFClassifierOneAPICompute(
.set_max_tree_depth(maxTreeDepth)
.set_max_bins(maxBins);

auto t1 = std::chrono::high_resolution_clock::now();
const auto result_train =
preview::train(comm, df_desc, hFeaturetable, hLabeltable);
const auto result_infer =
Expand All @@ -261,7 +262,14 @@ static jobject doRFClassifierOneAPICompute(
printHomegenTable(result_infer.get_responses());
logger::println(logger::INFO, "Probabilities results:\n");
printHomegenTable(result_infer.get_probabilities());

auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 -
t1)
.count();
logger::println(logger::INFO,
"RF Classifier (native): training step took %f secs.",
duration / 1000);
// convert to java hashmap
trees = collect_model(env, result_train.get_model(), classCount);

Expand Down
5 changes: 3 additions & 2 deletions mllib-dal/src/main/native/KMeansImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,11 @@ static jlong doKMeansOneAPICompute(
printHomegenTable(result_train.get_model().get_centroids());
auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 -
t1)
.count();
logger::println(logger::INFO,
"KMeans (native): training step took %d secs",
"KMeans (native): training step took %f secs",
duration / 1000);
// Get the class of the input object
jclass clazz = env->GetObjectClass(resultObj);
Expand Down
11 changes: 10 additions & 1 deletion mllib-dal/src/main/native/LinearRegressionImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,21 @@ static jlong doLROneAPICompute(
linear_regression_gpu::train_input local_input{xtrain, ytrain};
const auto linear_regression_desc =
linear_regression_gpu::descriptor<GpuAlgorithmFPType>(fitIntercept);

auto t1 = std::chrono::high_resolution_clock::now();
linear_regression_gpu::train_result result_train =
preview::train(comm, linear_regression_desc, xtrain, ytrain);
if (isRoot) {
HomogenTablePtr result_matrix = std::make_shared<homogen_table>(
result_train.get_model().get_betas());
auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 -
t1)
.count();
logger::println(
logger::INFO,
"Linear regression (native): training step took %f secs.",
duration / 1000);
saveHomogenTablePtrToVector(result_matrix);
return (jlong)result_matrix.get();
} else {
Expand Down
10 changes: 8 additions & 2 deletions mllib-dal/src/main/native/OneCCL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init(
JNIEnv *env, jobject obj, jint size, jint rank, jstring ip_port,
jint computeDeviceOrdinal, jobject param) {

logger::println(logger::INFO, "OneCCL (native): init");
logger::println(logger::INFO, "OneCCL (native): init rank %d size %d", rank,
size);
const char *str = env->GetStringUTFChars(ip_port, 0);
ccl::string ccl_ip_port(str);
auto &singletonCCLInit = CCLInitSingleton::get(size, rank, ccl_ip_port);
Expand All @@ -87,7 +88,12 @@ JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init(
#ifdef CPU_GPU_PROFILE
case ComputeDevice::gpu: {
auto gpus = get_gpus();
sycl::queue queue{gpus[0]};
auto gpus_count = gpus.size();
logger::println(logger::INFO, "OneCCL (native): gpus_count %d",
gpus_count);
sycl::device selected_device;
selected_device = gpus[rank % gpus_count];
sycl::queue queue{selected_device};
auto t1 = std::chrono::high_resolution_clock::now();
auto comm = oneapi::dal::preview::spmd::make_communicator<
oneapi::dal::preview::spmd::backend::ccl>(queue, size, rank,
Expand Down
24 changes: 10 additions & 14 deletions mllib-dal/src/main/native/PCAImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,26 +198,26 @@ static void doPCAOneAPICompute(

auto t1 = std::chrono::high_resolution_clock::now();
const auto result = preview::compute(comm, cov_desc, htable);
auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count();
logger::println(logger::INFO, "PCA (native): Covariance step took %d secs",
duration / 1000);
if (isRoot) {
using float_t = GpuAlgorithmFPType;
using method_t = pca_gpu::method::precomputed;
using task_t = pca_gpu::task::dim_reduction;
using descriptor_t = pca_gpu::descriptor<float_t, method_t, task_t>;
const auto pca_desc = descriptor_t().set_deterministic(true);

t1 = std::chrono::high_resolution_clock::now();
const auto result_train =
preview::train(comm, pca_desc, result.get_cov_matrix());
t2 = std::chrono::high_resolution_clock::now();
duration =
std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
logger::println(logger::INFO, "Eigenvectors:");
printHomegenTable(result_train.get_eigenvectors());
logger::println(logger::INFO, "Eigenvalues:");
printHomegenTable(result_train.get_eigenvalues());

auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
(float)std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1)
.count();
logger::println(logger::INFO, "PCA (native): Eigen step took %d secs",
logger::println(logger::INFO,
"PCA (native): training step took %f secs",
duration / 1000);
// Return all eigenvalues & eigenvectors
// Get the class of the input object
Expand All @@ -227,10 +227,6 @@ static void doPCAOneAPICompute(
env->GetFieldID(clazz, "pcNumericTable", "J");
jfieldID explainedVarianceNumericTableField =
env->GetFieldID(clazz, "explainedVarianceNumericTable", "J");
logger::println(logger::INFO, "Eigenvectors:");
printHomegenTable(result_train.get_eigenvectors());
logger::println(logger::INFO, "Eigenvalues:");
printHomegenTable(result_train.get_eigenvalues());

HomogenTablePtr eigenvectors =
std::make_shared<homogen_table>(result_train.get_eigenvectors());
Expand Down
2 changes: 1 addition & 1 deletion mllib-dal/src/main/native/SummarizerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ static void doSummarizerOneAPICompute(
t1)
.count();
logger::println(logger::INFO,
"Summarizer (native): computing step took %d secs",
"Summarizer (native): training took %f secs",
duration / 1000);
// Return all covariance & mean
jclass clazz = env->GetObjectClass(resultObj);
Expand Down
40 changes: 31 additions & 9 deletions mllib-dal/src/main/native/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,33 @@ inline void printHomegenTable(const oneapi::dal::table &table) {
if (table.get_row_count() <= 10) {
for (std::int64_t i = 0; i < table.get_row_count(); i++) {
logger::print(logger::INFO, "");
for (std::int64_t j = 0; j < table.get_column_count(); j++) {
logger::print(logger::NONE, "%10f",
x[i * table.get_column_count() + j]);
if (table.get_column_count() <= 20) {
for (std::int64_t j = 0; j < table.get_column_count(); j++) {
logger::print(logger::NONE, "%10f",
x[i * table.get_column_count() + j]);
}
} else {
for (std::int64_t j = 0; j < 20; j++) {
logger::print(logger::NONE, "%10f",
x[i * table.get_column_count() + j]);
}
}
logger::println(logger::NONE, "");
}

} else {
for (std::int64_t i = 0; i < 5; i++) {
logger::print(logger::INFO, "");
for (std::int64_t j = 0; j < table.get_column_count(); j++) {
logger::print(logger::NONE, "%10f",
x[i * table.get_column_count() + j]);
if (table.get_column_count() <= 20) {
for (std::int64_t j = 0; j < table.get_column_count(); j++) {
logger::print(logger::NONE, "%10f",
x[i * table.get_column_count() + j]);
}
} else {
for (std::int64_t j = 0; j < 20; j++) {
logger::print(logger::NONE, "%10f",
x[i * table.get_column_count() + j]);
}
}
logger::println(logger::NONE, "");
}
Expand All @@ -109,9 +124,16 @@ inline void printHomegenTable(const oneapi::dal::table &table) {
for (std::int64_t i = table.get_row_count() - 5;
i < table.get_row_count(); i++) {
logger::print(logger::INFO, "");
for (std::int64_t j = 0; j < table.get_column_count(); j++) {
logger::print(logger::NONE, "%10f",
x[i * table.get_column_count() + j]);
if (table.get_column_count() <= 20) {
for (std::int64_t j = 0; j < table.get_column_count(); j++) {
logger::print(logger::NONE, "%10f",
x[i * table.get_column_count() + j]);
}
} else {
for (std::int64_t j = 0; j < 20; j++) {
logger::print(logger::NONE, "%10f",
x[i * table.get_column_count() + j]);
}
}
logger::println(logger::NONE, "");
}
Expand Down
13 changes: 3 additions & 10 deletions mllib-dal/src/main/scala/com/intel/oap/mllib/CommonJob.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,17 @@ package com.intel.oap.mllib
import com.intel.oneapi.dal.table.Common
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkEnv

object CommonJob {
def initCCLAndSetAffinityMask(data: RDD[_],
executorNum: Int,
kvsIPPort: String,
useDevice: String): Unit = {
data.mapPartitionsWithIndex { (rank, table) =>
OneCCL.init(executorNum, rank, kvsIPPort,
val executorId = SparkEnv.get.executorId.toInt
OneCCL.init(executorNum, executorId, kvsIPPort,
Common.ComputeDevice.getDeviceByName(useDevice).ordinal())
val gpuIndices = if (useDevice == "GPU") {
val resources = TaskContext.get().resources()
resources("gpu").addresses.map(_.toInt)
} else {
Array.empty[Int]
}
if (gpuIndices.nonEmpty) {
OneCCL.setAffinityMask(gpuIndices(0).toString())
}
Iterator.empty
}.count()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import com.intel.oap.mllib.Utils.getOneCCLIPPort
import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils}
import com.intel.oneapi.dal.table.Common
import org.apache.spark.annotation.Since
import org.apache.spark.TaskContext
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.linalg.{Matrix, Vector}
Expand Down Expand Up @@ -121,7 +121,12 @@ class RandomForestClassifierDALImpl(val uid: String,

logInfo(s"RandomForestClassifierDAL compute took ${durationCompute} secs")

val ret = if (rank == 0) {
val isRoot = if (useDevice == "GPU") {
SparkEnv.get.executorId.toInt == 0
} else {
rank == 0
}
val ret = if (isRoot) {
Iterator(hashmap)
} else {
Iterator.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.intel.oap.mllib.clustering
import com.intel.oap.mllib.Utils.getOneCCLIPPort
import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils}
import com.intel.oneapi.dal.table.Common
import org.apache.spark.TaskContext
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.util._
Expand Down Expand Up @@ -92,8 +92,12 @@ class KMeansDALImpl(var nClusters: Int,
gpuIndices,
result
)

val ret = if (rank == 0) {
val isRoot = if (useDevice == "GPU") {
SparkEnv.get.executorId.toInt == 0
} else {
rank == 0
}
val ret = if (isRoot) {
assert(cCentroids != 0)
val centerVectors = if (useDevice == "GPU") {
OneDAL.homogenTableToVectors(OneDAL.makeHomogenTable(cCentroids))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.nio.DoubleBuffer
import com.intel.daal.data_management.data.{HomogenNumericTable, NumericTable}
import com.intel.oap.mllib.Utils.getOneCCLIPPort
import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Service, Utils}
import org.apache.spark.TaskContext
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.ml.linalg._
Expand Down Expand Up @@ -86,8 +86,13 @@ class PCADALImpl(val k: Int,
gpuIndices,
result
)
val isRoot = if (useDevice == "GPU") {
SparkEnv.get.executorId.toInt == 0
} else {
rank == 0
}

val ret = if (rank == 0) {
val ret = if (isRoot) {
val principleComponents = if (useDevice == "GPU") {
val pcNumericTable = OneDAL.makeHomogenTable(result.getPcNumericTable)
getPrincipleComponentsFromOneAPI(pcNumericTable, k)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ package com.intel.oap.mllib.regression
import com.intel.oap.mllib.Utils.getOneCCLIPPort
import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils}
import com.intel.oneapi.dal.table.Common
import org.apache.spark.SparkException
import org.apache.spark.TaskContext
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.ml.linalg.{DenseVector, Vector}
import org.apache.spark.ml.util._
Expand Down Expand Up @@ -156,7 +155,12 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean,
result
)

val ret = if (rank == 0) {
val isRoot = if (useDevice == "GPU") {
SparkEnv.get.executorId.toInt == 0
} else {
rank == 0
}
val ret = if (isRoot) {
val coefficientArray = if (useDevice == "GPU") {
OneDAL.homogenTableToVectors(OneDAL.makeHomogenTable(cbeta))
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import com.intel.oap.mllib.Utils.getOneCCLIPPort
import com.intel.oap.mllib.classification.{LearningNode, RandomForestResult}
import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils}
import com.intel.oneapi.dal.table.Common
import org.apache.spark.TaskContext
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.linalg.Matrix
Expand Down Expand Up @@ -112,7 +112,12 @@ class RandomForestRegressorDALImpl(val uid: String,

logInfo(s"RandomForestRegressorDALImpl compute took ${durationCompute} secs")

val ret = if (rank == 0) {
val isRoot = if (useDevice == "GPU") {
SparkEnv.get.executorId.toInt == 0
} else {
rank == 0
}
val ret = if (isRoot) {
val convResultStartTime = System.nanoTime()
val predictionNumericTable = OneDAL.homogenTableToMatrix(
OneDAL.makeHomogenTable(result.getPredictionNumericTable))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.intel.oap.mllib.stat
import com.intel.oap.mllib.Utils.getOneCCLIPPort
import com.intel.oap.mllib.{CommonJob, OneCCL, OneDAL, Utils}
import com.intel.oneapi.dal.table.Common
import org.apache.spark.TaskContext
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.ml.linalg.{Matrix, Vector}
import org.apache.spark.rdd.RDD
Expand Down Expand Up @@ -83,7 +83,12 @@ class CorrelationDALImpl(

logInfo(s"CorrelationDAL compute took ${durationCompute} secs")

val ret = if (rank == 0) {
val isRoot = if (useDevice == "GPU") {
SparkEnv.get.executorId.toInt == 0
} else {
rank == 0
}
val ret = if (isRoot) {
val convResultStartTime = System.nanoTime()
val correlationNumericTable = if (useDevice == "GPU") {
OneDAL.homogenTableToMatrix(OneDAL.makeHomogenTable(result.getCorrelationNumericTable))
Expand Down
Loading
Loading