feat: introduce hadoop mini cluster to test native scan on hdfs #1556

Merged: 3 commits, Mar 25, 2025

2 changes: 1 addition & 1 deletion native/core/Cargo.toml
@@ -68,7 +68,7 @@ datafusion-comet-proto = { workspace = true }
object_store = { workspace = true }
url = { workspace = true }
parking_lot = "0.12.3"
datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true}
datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true, default-features = false, features = ["hdfs"] }
Member Author: disable try_spawn_blocking to avoid native thread hanging

[dev-dependencies]
pprof = { version = "0.14.0", features = ["flamegraph"] }
8 changes: 8 additions & 0 deletions pom.xml
@@ -58,6 +58,7 @@ under the License.
<protobuf.version>3.25.5</protobuf.version>
<parquet.version>1.13.1</parquet.version>
<parquet.maven.scope>provided</parquet.maven.scope>
<hadoop.version>3.3.4</hadoop.version>
<arrow.version>16.0.0</arrow.version>
<codehaus.jackson.version>1.9.13</codehaus.jackson.version>
<spotless.version>2.43.0</spotless.version>
@@ -447,6 +448,13 @@ under the License.
<version>5.1.0</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-minicluster</artifactId>
Contributor: Do we need this dependency instead? https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-minicluster/3.3.4
Not sure what the difference is, as long as both allow us to spin up a MiniDFSCluster.

Member Author (@wForget, Mar 21, 2025): My understanding is that hadoop-client-minicluster has fewer dependencies, and it depends on hadoop-client-runtime, which is a shaded Hadoop client (to avoid introducing conflicts).

Member Author: https://github.com/apache/hadoop/blob/trunk/hadoop-client-modules/hadoop-client-minicluster/pom.xml
hadoop-client-minicluster seems to be a fat jar of the Hadoop mini cluster, so is it more suitable as a dependency for testing?

Contributor: Ah, I did not know that. It doesn't matter which one we use then.

<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

</dependencyManagement>
24 changes: 24 additions & 0 deletions spark/pom.xml
@@ -141,6 +141,30 @@ under the License.
<artifactId>arrow-c-data</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-minicluster</artifactId>
<scope>test</scope>
<exclusions>
<!-- hadoop clients are provided by spark -->
<exclusion>
<artifactId>hadoop-client-api</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-client-runtime</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>snappy-java</artifactId>
<groupId>org.xerial.snappy</groupId>
</exclusion>
<exclusion>
<artifactId>junit</artifactId>
<groupId>junit</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
109 changes: 109 additions & 0 deletions spark/src/test/scala/org/apache/comet/WithHdfsCluster.scala
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet

import java.io.{File, FileWriter}
import java.net.InetAddress
import java.nio.file.Files
import java.util.UUID

import scala.collection.JavaConverters._

import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hdfs.MiniDFSCluster
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys
import org.apache.spark.internal.Logging

/**
* Trait for starting and stopping a MiniDFSCluster for testing.
*
* Mostly copied from:
* https://github.com/apache/kyuubi/blob/master/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala
*/
trait WithHdfsCluster extends Logging {
Member Author: mostly copied from kyuubi

Contributor: Let's leave a comment that this was taken from kyuubi.

Member Author: Thanks, added.


private var hadoopConfDir: File = _
private var hdfsCluster: MiniDFSCluster = _
private var hdfsConf: Configuration = _
private var tmpRootDir: Path = _
private var fileSystem: FileSystem = _

def startHdfsCluster(): Unit = {
hdfsConf = new Configuration()
// before HADOOP-18206 (3.4.0), HDFS MetricsLogger strongly depends on
// commons-logging, so we disable it explicitly; otherwise it throws
// ClassNotFound: org.apache.commons.logging.impl.Log4JLogger
hdfsConf.set("dfs.namenode.metrics.logger.period.seconds", "0")
hdfsConf.set("dfs.datanode.metrics.logger.period.seconds", "0")
// Set bind host to localhost to avoid java.net.BindException
hdfsConf.setIfUnset("dfs.namenode.rpc-bind-host", "localhost")

hdfsCluster = new MiniDFSCluster.Builder(hdfsConf)
.checkDataNodeAddrConfig(true)
.checkDataNodeHostConfig(true)
.build()
logInfo(
"NameNode address in configuration is " +
s"${hdfsConf.get(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY)}")
hadoopConfDir =
Files.createTempDirectory(s"comet_hdfs_conf_${UUID.randomUUID().toString}").toFile
saveHadoopConf(hadoopConfDir)

fileSystem = hdfsCluster.getFileSystem
tmpRootDir = new Path("/tmp")
fileSystem.mkdirs(tmpRootDir)
}

def stopHdfsCluster(): Unit = {
if (hdfsCluster != null) hdfsCluster.shutdown(true)
if (hadoopConfDir != null) FileUtils.deleteDirectory(hadoopConfDir)
}

private def saveHadoopConf(hadoopConfDir: File): Unit = {
val configToWrite = new Configuration(false)
val hostName = InetAddress.getLocalHost.getHostName
hdfsConf.iterator().asScala.foreach { kv =>
val key = kv.getKey
val value = kv.getValue.replaceAll(hostName, "localhost")
configToWrite.set(key, value)
}
val file = new File(hadoopConfDir, "core-site.xml")
val writer = new FileWriter(file)
configToWrite.writeXml(writer)
writer.close()
}

def getHadoopConf: Configuration = hdfsConf
def getDFSPort: Int = hdfsCluster.getNameNodePort
def getHadoopConfDir: String = hadoopConfDir.getAbsolutePath
def getHadoopConfFile: Path = new Path(hadoopConfDir.toURI.toURL.toString, "core-site.xml")
def getTmpRootDir: Path = tmpRootDir
def getFileSystem: FileSystem = fileSystem

def withTmpHdfsDir(tmpDir: Path => Unit): Unit = {
val tempPath = new Path(tmpRootDir, UUID.randomUUID().toString)
fileSystem.mkdirs(tempPath)
try tmpDir(tempPath)
finally fileSystem.delete(tempPath, true)
}

}
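
As a usage illustration (not part of this PR): a minimal sketch of how a test or tool might wire up WithHdfsCluster with a plain local-mode SparkSession. The object name, session settings, and row count below are hypothetical.

package org.apache.comet

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

// Hypothetical usage sketch: start the mini cluster, point the session's Hadoop
// configuration at the generated core-site.xml, then write and read a small
// Parquet dataset through a temporary HDFS directory.
object HdfsClusterUsageExample extends WithHdfsCluster {
  def main(args: Array[String]): Unit = {
    startHdfsCluster()
    val spark = SparkSession.builder()
      .appName("hdfs-cluster-usage-example")
      .master("local[2]")
      .getOrCreate()
    spark.sparkContext.hadoopConfiguration.addResource(getHadoopConfFile)
    try {
      withTmpHdfsDir { dir =>
        val path = getFileSystem.makeQualified(new Path(dir, "example.parquet")).toString
        spark.range(100).write.parquet(path)
        assert(spark.read.parquet(path).count() == 100)
      }
    } finally {
      spark.stop()
      stopHdfsCluster()
    }
  }
}
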
@@ -24,13 +24,15 @@ import java.io.File
import scala.collection.JavaConverters._
import scala.util.Random

import org.apache.hadoop.fs.Path
import org.apache.spark.TestUtils
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnVector

import org.apache.comet.CometConf
import org.apache.comet.{CometConf, WithHdfsCluster}
import org.apache.comet.CometConf.{SCAN_NATIVE_COMET, SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT}
import org.apache.comet.parquet.BatchReader

@@ -40,7 +42,7 @@ import org.apache.comet.parquet.BatchReader
* benchmark-org.apache.spark.sql.benchmark.CometReadBenchmark` Results will be written to
* "spark/benchmarks/CometReadBenchmark-**results.txt".
*/
object CometReadBenchmark extends CometBenchmarkBase {
class CometReadBaseBenchmark extends CometBenchmarkBase {

def numericScanBenchmark(values: Int, dataType: DataType): Unit = {
// Benchmarks running through spark sql.
@@ -71,6 +73,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
Contributor: I am not sure if we should enable COMET_EXEC_ENABLED, as it will mix the scan benchmark and the exec benchmark.

Member Author: It seems difficult to benchmark only the scan anyway. If we disable exec conversion, it may introduce the performance loss of ColumnarToRow. (An illustrative sketch of both configurations appears after the end of this file's diff.)

CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
spark.sql(s"select $query from parquetV1Table").noop()
}
@@ -79,6 +82,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
spark.sql(s"select $query from parquetV1Table").noop()
}
@@ -118,6 +122,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
spark.sql("select sum(id) from parquetV1Table").noop()
}
@@ -126,6 +131,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
spark.sql("select sum(id) from parquetV1Table").noop()
}
@@ -244,6 +250,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
spark.sql("select sum(c2) from parquetV1Table where c1 + 1 > 0").noop()
}
@@ -252,6 +259,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
spark.sql("select sum(c2) from parquetV1Table where c1 + 1 > 0").noop()
}
@@ -300,6 +308,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
spark.sql("select sum(length(id)) from parquetV1Table").noop()
}
@@ -308,6 +317,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
spark.sql("select sum(length(id)) from parquetV1Table").noop()
}
@@ -352,6 +362,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
spark
.sql("select sum(length(c2)) from parquetV1Table where c1 is " +
@@ -363,6 +374,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
spark
.sql("select sum(length(c2)) from parquetV1Table where c1 is " +
@@ -403,6 +415,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
spark.sql(s"SELECT sum(c$middle) FROM parquetV1Table").noop()
}
@@ -411,6 +424,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
spark.sql(s"SELECT sum(c$middle) FROM parquetV1Table").noop()
}
@@ -452,6 +466,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
}
@@ -460,6 +475,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
}
@@ -501,6 +517,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
}
@@ -509,6 +526,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
}
@@ -587,3 +605,51 @@ object CometReadBenchmark extends CometBenchmarkBase {
}
}
}

object CometReadBenchmark extends CometReadBaseBenchmark {}

object CometReadHdfsBenchmark extends CometReadBaseBenchmark with WithHdfsCluster {

override def getSparkSession: SparkSession = {
// start HDFS cluster and add hadoop conf
startHdfsCluster()
val sparkSession = super.getSparkSession
sparkSession.sparkContext.hadoopConfiguration.addResource(getHadoopConfFile)
sparkSession
}

override def runCometBenchmark(mainArgs: Array[String]): Unit = {
try {
super.runCometBenchmark(mainArgs)
} finally {
stopHdfsCluster()
}
}

override def readerBenchmark(values: Int, dataType: DataType): Unit = {
// ignore reader benchmark for HDFS
}

// mirror the local temp dir to HDFS
override protected def withTempPath(f: File => Unit): Unit = {
super.withTempPath { dir =>
val tempHdfsPath = new Path(getTmpRootDir, dir.getName)
getFileSystem.mkdirs(tempHdfsPath)
try f(dir)
finally getFileSystem.delete(tempHdfsPath, true)
}
}
override protected def prepareTable(
dir: File,
df: DataFrame,
partition: Option[String]): Unit = {
val testDf = if (partition.isDefined) {
df.write.partitionBy(partition.get)
} else {
df.write
}
val tempHdfsPath = getFileSystem.resolvePath(new Path(getTmpRootDir, dir.getName))
val parquetV1Path = new Path(tempHdfsPath, "parquetV1")
saveAsParquetV1Table(testDf, parquetV1Path.toString)
}
}
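
For illustration only (not part of the diff), here is the sketch referenced from the COMET_EXEC_ENABLED discussion above: a fragment that would sit inside one of the benchmark cases in this file, reusing its existing helpers (withSQLConf, the CometConf keys, SCAN_NATIVE_DATAFUSION, and the noop() benchmark helper). The scan-only variant is hypothetical and not part of this PR.

// Scan-only timing (hypothetical): Comet scan on, exec off. Per the discussion
// above, Spark then converts the columnar scan output back to rows, so the
// measurement may include ColumnarToRow overhead.
withSQLConf(
  CometConf.COMET_ENABLED.key -> "true",
  CometConf.COMET_EXEC_ENABLED.key -> "false",
  CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
  spark.sql("select sum(id) from parquetV1Table").noop()
}

// Scan + exec timing (what this PR enables): the aggregation can also run
// natively, so no ColumnarToRow conversion sits between scan and aggregation.
withSQLConf(
  CometConf.COMET_ENABLED.key -> "true",
  CometConf.COMET_EXEC_ENABLED.key -> "true",
  CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
  spark.sql("select sum(id) from parquetV1Table").noop()
}

Usage note (an assumption based on the make-target pattern quoted in the class docstring above): the HDFS variant would presumably be run the same way as the local benchmark, via a target such as benchmark-org.apache.spark.sql.benchmark.CometReadHdfsBenchmark, with results written under spark/benchmarks/.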