Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.config.GlutenConfig

import org.apache.spark.SparkConf

import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

import java.io.File

/**
* Reproduction for the Parquet row-group sizing bug behind Delta's
* `DeletionVectorsWithPredicatePushdownSuite` abort.
*
* That suite sets `parquet.block.size = 2MB` on `spark.sparkContext.hadoopConfiguration`, writes a
* 1M-row Delta table, and asserts the resulting data file has more than one row group. Under Gluten
* the data file has a single row group, so `beforeAll` throws and the whole suite aborts.
*
* VeloxParquetRowGroupSuite already showed that a *plain* Parquet write (`INSERT OVERWRITE
* DIRECTORY ... USING PARQUET`) DOES honor `parquet.block.size` from that same runtime Hadoop conf
* (multiple row groups). These tests isolate the remaining difference -- the *Delta* write path --
* using the same conf source and data. They assert the desired behavior (> 1 row group) and are
* expected to FAIL today, reproducing the Delta abort; they should turn green once the Hadoop-conf
* block size is plumbed through the Delta write path to the native writer. The deletion-vectors
* variant matches the original suite (which enables DVs on the table); the initial write is
* identical with or without DVs, so both cases pin down the write path rather than DVs or
* checkpoints.
*/
class VeloxDeltaParquetRowGroupSuite extends DeltaSuite {

  /** Base suite configuration with Gluten's native writer switched on. */
  override protected def sparkConf: SparkConf =
    super.sparkConf.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")

  // ~8MB of int64 (uncompressed): far above the 1MB block size used here, so a writer that honors
  // the block size produces several row groups.
  private val numRows: Long = 1000000L
  private val smallBlockSize: Long = 1L * 1024 * 1024 // 1MB
  private val parquetBlockSizeKey = "parquet.block.size"

  /**
   * Recursively collects every regular file under `dir` whose name ends with `.parquet`.
   *
   * @param dir
   *   directory to walk; a path that cannot be listed contributes no files
   * @return
   *   the Parquet files directly inside `dir`, followed by those found in its sub-directories
   */
  private def listParquetFiles(dir: File): Seq[File] = {
    // `listFiles` returns null when `dir` is not a listable directory.
    val entries = Option(dir.listFiles()).getOrElse(Array.empty[File])
    val parquetFiles = entries.filter(f => f.isFile && f.getName.endsWith(".parquet")).toSeq
    val nestedParquetFiles = entries.filter(_.isDirectory).flatMap(listParquetFiles)
    parquetFiles ++ nestedParquetFiles
  }

  /**
   * Sums the number of row groups recorded in the footers of all Parquet files under `dir`.
   *
   * Fails the calling test if no Parquet file is found.
   *
   * @param dir
   *   root directory of the written table
   * @return
   *   total row-group count across all Parquet files found
   */
  private def rowGroupCount(dir: File): Int = {
    val parquetFiles = listParquetFiles(dir)
    assert(parquetFiles.nonEmpty, s"no parquet file written under ${dir.getAbsolutePath}")
    parquetFiles.map {
      file =>
        val in = HadoopInputFile
          .fromPath(new Path(file.getAbsolutePath), spark.sessionState.newHadoopConf())
        val reader = ParquetFileReader.open(in)
        try {
          reader.getFooter.getBlocks.size()
        } finally {
          reader.close()
        }
    }.sum
  }

  /**
   * Writes `numRows` consecutive longs as a single-partition Delta table at `path` while
   * `parquet.block.size` is set to `smallBlockSize` on the runtime Hadoop configuration.
   *
   * The runtime Hadoop configuration is shared by everything running on this SparkContext, so
   * the previous value of the key is restored afterwards (or the key is unset if it had no
   * value) instead of being dropped unconditionally.
   *
   * @param path
   *   destination of the Delta table
   * @param enableDeletionVectors
   *   when true, passes `delta.enableDeletionVectors=true` as a writer option
   */
  private def writeDelta(path: String, enableDeletionVectors: Boolean): Unit = {
    // scalastyle:off hadoopconfiguration
    // Mirror Delta's `hadoopConf().set("parquet.block.size", ...)`, which resolves to exactly this
    // conf object in DeletionVectorsWithPredicatePushdownSuite.
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    // scalastyle:on hadoopconfiguration
    // Raw (unexpanded) value so that restoring it does not bake in a variable substitution.
    val previousBlockSize = Option(hadoopConf.getRaw(parquetBlockSizeKey))
    hadoopConf.set(parquetBlockSizeKey, smallBlockSize.toString)
    try {
      val writer = spark.range(0, numRows, 1, 1).toDF("id").write.format("delta")
      val withOptions =
        if (enableDeletionVectors) writer.option("delta.enableDeletionVectors", "true") else writer
      withOptions.save(path)
    } finally {
      previousBlockSize match {
        case Some(value) => hadoopConf.set(parquetBlockSizeKey, value)
        case None => hadoopConf.unset(parquetBlockSizeKey)
      }
    }
  }

  test("delta write should respect parquet.block.size set on the runtime Hadoop conf") {
    withTempPath {
      dir =>
        writeDelta(dir.getCanonicalPath, enableDeletionVectors = false)
        assert(rowGroupCount(dir) > 1)
    }
  }

  test(
    "delta write with deletion vectors should respect parquet.block.size on the runtime Hadoop " +
      "conf") {
    withTempPath {
      dir =>
        writeDelta(dir.getCanonicalPath, enableDeletionVectors = true)
        assert(rowGroupCount(dir) > 1)
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.VeloxWholeStageTransformerSuite

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

import java.io.File

/**
* Validates which configuration channel actually controls the Parquet row-group size of Gluten's
* native (Velox) writer.
*
* Background: Delta's `DeletionVectorsWithPredicatePushdownSuite.beforeAll` writes a 1M-row table
* with `hadoopConf().set("parquet.block.size", 2MB)` and asserts the resulting file has more than
* one Parquet row group. Under Gluten the file has a single row group, so `beforeAll` throws and
* the whole suite aborts. These tests pin down why: the native writer flushes a row group when the
* accumulated uncompressed size reaches `maxRowGroupBytes` (default 128MB, from
* `parquet.block.size`) or the row count reaches `maxRowGroupRows` (default 100M). ~8MB of `int64`
* data is far below the 128MB default (one row group) but far above the 1MB block size used here
* (several row groups). For a plain Parquet write the runtime Hadoop-conf block size DOES reach the
* native writer (all three tests below pass); the Delta abort is instead specific to the Delta
* write path and is reproduced in VeloxDeltaParquetRowGroupSuite.
*/
class VeloxParquetRowGroupSuite extends VeloxWholeStageTransformerSuite with WriteUtils {

  override protected val resourcePath: String = ""
  override protected val fileFormat: String = "parquet"

  /** Base suite configuration with Gluten's native writer switched on. */
  override protected def sparkConf: SparkConf =
    super.sparkConf.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")

  // ~8MB of int64 data: << the 128MB default block size, >> the 1MB block size used below.
  private val numRows: Long = 1000000L
  private val smallBlockSize: Long = 1L * 1024 * 1024 // 1MB
  private val glutenBlockSizeKey = "spark.gluten.sql.columnar.parquet.write.blockSize"
  private val parquetBlockSizeKey = "parquet.block.size"

  /**
   * Sums the number of row groups recorded in the footers of the `.parquet` files located
   * directly inside `dir` (sub-directories are not visited).
   *
   * Fails the calling test if no Parquet file is found.
   *
   * @param dir
   *   output directory of the write
   * @return
   *   total row-group count across the Parquet files found
   */
  private def rowGroupCount(dir: File): Int = {
    // `listFiles` returns null when `dir` is not a listable directory.
    val parquetFiles =
      Option(dir.listFiles((_, name) => name.endsWith(".parquet"))).getOrElse(Array.empty[File])
    assert(parquetFiles.nonEmpty, s"no parquet file written under ${dir.getAbsolutePath}")
    parquetFiles.map {
      file =>
        val in = HadoopInputFile
          .fromPath(new Path(file.getAbsolutePath), spark.sessionState.newHadoopConf())
        Utils.tryWithResource(ParquetFileReader.open(in))(_.getFooter.getBlocks.size())
    }.sum
  }

  // Writes `numRows` longs to `path` via the native Velox writer (checkNativeWrite asserts the plan
  // actually offloads to ColumnarWriteFilesExec, so we are exercising the native writer).
  private def writeRangeNatively(path: String): Unit = {
    withTempView("velox_rowgroup_src") {
      spark.range(0, numRows, 1, 1).toDF("id").createOrReplaceTempView("velox_rowgroup_src")
      checkNativeWrite(
        s"INSERT OVERWRITE DIRECTORY '$path' USING PARQUET SELECT * FROM velox_rowgroup_src")
    }
  }

  test("native writer emits a single row group at the default (128MB) block size") {
    withTempPath {
      f =>
        writeRangeNatively(f.getCanonicalPath)
        // Exactly what DeletionVectorsWithPredicatePushdownSuite hits: ~8MB under the 128MB default
        // is one row group, so its `> 1 row group` assert fails and the suite aborts.
        assert(rowGroupCount(f) === 1)
    }
  }

  test("native writer honors the Gluten block-size conf and emits multiple row groups") {
    withTempPath {
      f =>
        withSQLConf(glutenBlockSizeKey -> smallBlockSize.toString) {
          writeRangeNatively(f.getCanonicalPath)
        }
        // The Gluten conf is read directly by the native writer, so the 1MB block size takes effect
        // and the file is split into several row groups.
        assert(rowGroupCount(f) > 1)
    }
  }

  // Control: a plain Parquet write DOES honor `parquet.block.size` set on the runtime Hadoop conf,
  // so this passes (multiple row groups). The Delta `beforeAll` abort is therefore NOT a generic
  // native-writer problem -- it is specific to the Delta write path, reproduced separately in
  // VeloxDeltaParquetRowGroupSuite.
  test("native writer should respect parquet.block.size set on the runtime Hadoop conf") {
    // scalastyle:off hadoopconfiguration
    withTempPath {
      f =>
        // Mirror Delta's `hadoopConf().set("parquet.block.size", ...)`: mutate the runtime Hadoop
        // conf the write actually reads, rather than a session-creation `spark.hadoop.*` setting.
        val hadoopConf = spark.sparkContext.hadoopConfiguration
        // The runtime conf is shared across the SparkContext: remember the raw (unexpanded) value
        // so it can be put back afterwards instead of being dropped unconditionally.
        val previousBlockSize = Option(hadoopConf.getRaw(parquetBlockSizeKey))
        hadoopConf.set(parquetBlockSizeKey, smallBlockSize.toString)
        try {
          writeRangeNatively(f.getCanonicalPath)
        } finally {
          previousBlockSize match {
            case Some(value) => hadoopConf.set(parquetBlockSizeKey, value)
            case None => hadoopConf.unset(parquetBlockSizeKey)
          }
        }
        assert(rowGroupCount(f) > 1)
    }
    // scalastyle:on hadoopconfiguration
  }
}
Loading