Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,26 @@ object VeloxBackendSettings extends BackendSettingsApi {
// Only Parquet is needed for metadata validation so far.
return None
}
// Skip root paths that do not exist yet (e.g., during INSERT operations where
// the target directory may not be created yet). Do not filter out local
// (file://) paths here because metadata validation is independent of native
// file system registration and must still run for local paths in test envs.
val existingRootPaths = rootPaths.filter {
p =>
try {
val path = new Path(p)
path.getFileSystem(hadoopConf).exists(path)
} catch {
case _: Exception => false
}
}
if (existingRootPaths.isEmpty) {
return None
}
val fileLimit = GlutenConfig.get.parquetMetadataFallbackFileLimit
val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), SQLConf.get)
ParquetMetadataUtils
.validateMetadata(rootPaths, hadoopConf, parquetOptions, fileLimit)
.validateMetadata(existingRootPaths, hadoopConf, parquetOptions, fileLimit)
.map(reason => s"Detected unsupported metadata in parquet files: $reason")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,14 @@ object ParquetMetadataUtils extends Logging {
if (!GlutenConfig.get.parquetMetadataValidationEnabled) {
None
} else {
rootPaths.foreach {
val samplePercentage = GlutenConfig.get.parquetMetadataFallbackSamplePercentage
val sampledPaths = sampleRootPaths(rootPaths, samplePercentage)

logDebug(s"Parquet metadata validation: total rootPaths=${rootPaths.size}, " +
s"sampled=${sampledPaths.size}, samplePercentage=$samplePercentage, " +
s"fileLimit=$fileLimit")

sampledPaths.foreach {
rootPath =>
val fs = new Path(rootPath).getFileSystem(hadoopConf)
try {
Expand All @@ -75,6 +82,33 @@ object ParquetMetadataUtils extends Logging {
}
}

/**
 * Samples root paths based on the given percentage. When the number of root paths is large,
 * sampling reduces the cost of metadata validation significantly.
 *
 * The sampling strategy selects paths at evenly spaced intervals to ensure good coverage across
 * different partitions.
 *
 * @param rootPaths
 *   All root paths to sample from
 * @param samplePercentage
 *   Percentage of paths to sample, in range (0, 1.0]
 * @return
 *   Sampled subset of root paths
 */
private def sampleRootPaths(rootPaths: Seq[String], samplePercentage: Double): Seq[String] = {
  // No sampling needed when the full set is requested or there is at most one path.
  if (samplePercentage >= 1.0 || rootPaths.size <= 1) {
    rootPaths
  } else {
    // Always sample at least one path; ceil keeps coverage reasonable for small inputs.
    val sampleCount = math.max(1, math.ceil(rootPaths.size * samplePercentage).toInt)
    if (sampleCount >= rootPaths.size) {
      rootPaths
    } else {
      // Evenly spaced interval sampling for better coverage across partitions.
      // Max index is ((sampleCount - 1) * step).toInt < rootPaths.size, so no bounds risk.
      val step = rootPaths.size.toDouble / sampleCount
      (0 until sampleCount).map(i => rootPaths((i * step).toInt))
    }
  }
}

def validateCodec(footer: ParquetMetadata): Option[String] = {
val blocks = footer.getBlocks
if (blocks.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,72 @@ class ParquetEncryptionDetectionSuite extends SharedSparkSession {
assertFalse(isFileEncrypted(filePath))
}
}

// End-to-end check of ParquetMetadataUtils.validateMetadata with sampling enabled:
// exactly one of many partition directories holds an encrypted parquet file, and
// 100% sampling (no path skipped) must detect it.
// NOTE(review): masterKey, columnKey, writeParquet and the encryption imports are
// presumably fixtures/helpers from the enclosing suite — not visible in this chunk.
test("Metadata validation with sampling - encrypted file detected in sampled paths") {
  withTempDir {
    tempDir =>
      // Create multiple subdirectories simulating partitions
      val numPartitions = 20
      val encryptedPartitionIndex = 5
      val paths = (0 until numPartitions).map {
        i =>
          val partDir = s"${tempDir.getAbsolutePath}/partition=$i"
          new java.io.File(partDir).mkdirs()
          val filePath = s"$partDir/data.parquet"
          // Only partition 5 is written with column encryption on the "name" column;
          // every other partition gets a plain file.
          if (i == encryptedPartitionIndex) {
            val encryptionProps = FileEncryptionProperties
              .builder(Base64.getDecoder.decode(masterKey))
              .withEncryptedColumns(
                Map(
                  ColumnPath.get("name") -> ColumnEncryptionProperties
                    .builder(ColumnPath.get("name"))
                    .withKey(Base64.getDecoder.decode(columnKey))
                    .build()).asJava)
              .build()
            writeParquet(filePath, Some(encryptionProps), Seq(Map("id" -> i, "name" -> "Test")))
          } else {
            writeParquet(filePath, None, Seq(Map("id" -> i, "name" -> "Test")))
          }
          // The lambda yields the partition directory, so `paths` is the root-path list.
          partDir
      }

      // With 100% sampling, the encrypted file should always be detected
      withSQLConf(
        GlutenConfig.PARQUET_UNEXPECTED_METADATA_FALLBACK_SAMPLE_PERCENTAGE.key -> "1.0") {
        val parquetOptions = new ParquetOptions(CaseInsensitiveMap(Map()), SQLConf.get)
        val result = ParquetMetadataUtils
          .validateMetadata(paths, new Configuration(), parquetOptions, 100)
        // validateMetadata returns Some(reason) when unsupported metadata is found.
        assertTrue(
          "Should detect encrypted file with 100% sampling",
          result.isDefined)
      }
  }
}

// Sampling must never produce a false positive: plain (unencrypted) parquet files
// pass metadata validation regardless of the sampling percentage in effect.
test("Metadata validation with sampling - all plain files pass validation") {
  withTempDir {
    tempDir =>
      // Lay out one plain parquet file per simulated partition directory.
      val partitionCount = 10
      val partitionDirs = Seq.tabulate(partitionCount) {
        i =>
          val partDir = s"${tempDir.getAbsolutePath}/partition=$i"
          new java.io.File(partDir).mkdirs()
          val dataFile = s"$partDir/data.parquet"
          writeParquet(dataFile, None, Seq(Map("id" -> i, "name" -> "Test")))
          partDir
      }

      // Even when only 30% of the partition directories are inspected,
      // validation should succeed (return None) since every file is plain.
      withSQLConf(
        GlutenConfig.PARQUET_UNEXPECTED_METADATA_FALLBACK_SAMPLE_PERCENTAGE.key -> "0.3") {
        val readOptions = new ParquetOptions(CaseInsensitiveMap(Map()), SQLConf.get)
        val outcome = ParquetMetadataUtils
          .validateMetadata(partitionDirs, new Configuration(), readOptions, 100)
        assertFalse(
          "Should pass validation when all files are plain",
          outcome.isDefined)
      }
  }
}
}
1 change: 1 addition & 0 deletions docs/Configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ nav_order: 15
| spark.gluten.sql.fallbackRegexpExpressions | false | If true, fall back all regexp expressions. There are a few incompatible cases between RE2 (used by native engine) and java.util.regex (used by Spark). User should enable this property if their incompatibility is intolerable. |
| spark.gluten.sql.fallbackUnexpectedMetadataParquet | false | If enabled, Gluten will not offload scan when unexpected metadata is detected. |
| spark.gluten.sql.fallbackUnexpectedMetadataParquet.limit | 10 | If supplied, metadata of `limit` number of Parquet files will be checked to determine whether to fall back to java scan. |
| spark.gluten.sql.fallbackUnexpectedMetadataParquet.samplePercentage | 0.1 | The percentage of root paths to sample for metadata validation when the number of root paths is large. Value range is (0, 1.0]. 1.0 means check all paths (no sampling). A smaller value reduces validation cost for tables with many partitions. |
| spark.gluten.sql.injectNativePlanStringToExplain | false | When true, Gluten will inject native plan tree to Spark's explain output. |
| spark.gluten.sql.mergeTwoPhasesAggregate.enabled | true | Whether to merge two phases aggregate if there are no other operators between them. |
| spark.gluten.sql.native.arrow.reader.enabled | false | This is config to specify whether to enable the native columnar csv reader |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,10 @@ class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) {
getConf(PARQUET_UNEXPECTED_METADATA_FALLBACK_FILE_LIMIT)
}

/** Sampling ratio in (0, 1.0] applied to root paths during parquet metadata validation. */
def parquetMetadataFallbackSamplePercentage: Double =
  getConf(PARQUET_UNEXPECTED_METADATA_FALLBACK_SAMPLE_PERCENTAGE)

def enableColumnarRange: Boolean = getConf(COLUMNAR_RANGE_ENABLED)
def enableColumnarCollectLimit: Boolean = getConf(COLUMNAR_COLLECT_LIMIT_ENABLED)
def enableColumnarCollectTail: Boolean = getConf(COLUMNAR_COLLECT_TAIL_ENABLED)
Expand Down Expand Up @@ -1573,6 +1577,16 @@ object GlutenConfig extends ConfigRegistry {
.checkValue(_ > 0, s"must be positive.")
.createWithDefault(10)

// Fraction of root paths inspected during parquet metadata validation.
// Default 0.1 samples 10% of the root paths; 1.0 (inclusive) disables sampling.
val PARQUET_UNEXPECTED_METADATA_FALLBACK_SAMPLE_PERCENTAGE =
  buildConf("spark.gluten.sql.fallbackUnexpectedMetadataParquet.samplePercentage")
    .doc("The percentage of root paths to sample for metadata validation when the number" +
      " of root paths is large. Value range is (0, 1.0]. 1.0 means check all paths" +
      " (no sampling). A smaller value reduces validation cost for tables with many" +
      " partitions.")
    .doubleConf
    // Zero and negative values are rejected at config-set time.
    .checkValue(v => v > 0 && v <= 1.0, "must be in range (0, 1.0].")
    .createWithDefault(0.1)

val COLUMNAR_RANGE_ENABLED =
buildConf("spark.gluten.sql.columnar.range")
.doc("Enable or disable columnar range.")
Expand Down
Loading