diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 1ac9f264a209..9d2d92a2fcdf 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -207,10 +207,26 @@ object VeloxBackendSettings extends BackendSettingsApi { // Only Parquet is needed for metadata validation so far. return None } + // Skip root paths that do not exist yet (e.g., during INSERT operations where + // the target directory may not be created yet). Do not filter out local + // (file://) paths here because metadata validation is independent of native + // file system registration and must still run for local paths in test environments. + val existingRootPaths = rootPaths.filter { + p => + try { + val path = new Path(p) + path.getFileSystem(hadoopConf).exists(path) + } catch { + case _: Exception => false + } + } + if (existingRootPaths.isEmpty) { + return None + } val fileLimit = GlutenConfig.get.parquetMetadataFallbackFileLimit val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), SQLConf.get) ParquetMetadataUtils - .validateMetadata(rootPaths, hadoopConf, parquetOptions, fileLimit) + .validateMetadata(existingRootPaths, hadoopConf, parquetOptions, fileLimit) .map(reason => s"Detected unsupported metadata in parquet files: $reason") } diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala index 9864f42a9f3c..9bf7574efd3e 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala @@ -52,7 +52,14 @@ object ParquetMetadataUtils extends Logging { if
(!GlutenConfig.get.parquetMetadataValidationEnabled) { None } else { - rootPaths.foreach { + val samplePercentage = GlutenConfig.get.parquetMetadataFallbackSamplePercentage + val sampledPaths = sampleRootPaths(rootPaths, samplePercentage) + + logDebug(s"Parquet metadata validation: total rootPaths=${rootPaths.size}, " + + s"sampled=${sampledPaths.size}, samplePercentage=$samplePercentage, " + + s"fileLimit=$fileLimit") + + sampledPaths.foreach { rootPath => val fs = new Path(rootPath).getFileSystem(hadoopConf) try { @@ -75,6 +82,33 @@ object ParquetMetadataUtils extends Logging { } } + /** + * Samples root paths based on the given percentage. When the number of root paths is large, + * sampling reduces the cost of metadata validation significantly. + * + * The sampling strategy selects paths at evenly spaced intervals to ensure good coverage across + * different partitions. + * + * @param rootPaths + * All root paths to sample from + * @param samplePercentage + * Percentage of paths to sample, in range (0, 1.0] + * @return + * Sampled subset of root paths + */ + private def sampleRootPaths(rootPaths: Seq[String], samplePercentage: Double): Seq[String] = { + if (samplePercentage >= 1.0 || rootPaths.size <= 1) { + return rootPaths + } + val sampleCount = math.max(1, math.ceil(rootPaths.size * samplePercentage).toInt) + if (sampleCount >= rootPaths.size) { + return rootPaths + } + // Use evenly spaced interval sampling for better coverage across partitions + val step = rootPaths.size.toDouble / sampleCount + (0 until sampleCount).map(i => rootPaths((i * step).toInt)) + } + def validateCodec(footer: ParquetMetadata): Option[String] = { val blocks = footer.getBlocks if (blocks.isEmpty) { diff --git a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala index 1a303ad12078..cb2658cb561a 100644 --- 
a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala @@ -162,4 +162,72 @@ class ParquetEncryptionDetectionSuite extends SharedSparkSession { assertFalse(isFileEncrypted(filePath)) } } + + test("Metadata validation with sampling - encrypted file detected in sampled paths") { + withTempDir { + tempDir => + // Create multiple subdirectories simulating partitions + val numPartitions = 20 + val encryptedPartitionIndex = 5 + val paths = (0 until numPartitions).map { + i => + val partDir = s"${tempDir.getAbsolutePath}/partition=$i" + new java.io.File(partDir).mkdirs() + val filePath = s"$partDir/data.parquet" + if (i == encryptedPartitionIndex) { + val encryptionProps = FileEncryptionProperties + .builder(Base64.getDecoder.decode(masterKey)) + .withEncryptedColumns( + Map( + ColumnPath.get("name") -> ColumnEncryptionProperties + .builder(ColumnPath.get("name")) + .withKey(Base64.getDecoder.decode(columnKey)) + .build()).asJava) + .build() + writeParquet(filePath, Some(encryptionProps), Seq(Map("id" -> i, "name" -> "Test"))) + } else { + writeParquet(filePath, None, Seq(Map("id" -> i, "name" -> "Test"))) + } + partDir + } + + // With 100% sampling, the encrypted file should always be detected + withSQLConf( + GlutenConfig.PARQUET_UNEXPECTED_METADATA_FALLBACK_SAMPLE_PERCENTAGE.key -> "1.0") { + val parquetOptions = new ParquetOptions(CaseInsensitiveMap(Map()), SQLConf.get) + val result = ParquetMetadataUtils + .validateMetadata(paths, new Configuration(), parquetOptions, 100) + assertTrue( + "Should detect encrypted file with 100% sampling", + result.isDefined) + } + } + } + + test("Metadata validation with sampling - all plain files pass validation") { + withTempDir { + tempDir => + // Create multiple subdirectories with only plain files + val numPartitions = 10 + val paths = (0 until numPartitions).map { + i => + val partDir = 
s"${tempDir.getAbsolutePath}/partition=$i" + new java.io.File(partDir).mkdirs() + val filePath = s"$partDir/data.parquet" + writeParquet(filePath, None, Seq(Map("id" -> i, "name" -> "Test"))) + partDir + } + + // With any sampling percentage, all plain files should pass + withSQLConf( + GlutenConfig.PARQUET_UNEXPECTED_METADATA_FALLBACK_SAMPLE_PERCENTAGE.key -> "0.3") { + val parquetOptions = new ParquetOptions(CaseInsensitiveMap(Map()), SQLConf.get) + val result = ParquetMetadataUtils + .validateMetadata(paths, new Configuration(), parquetOptions, 100) + assertFalse( + "Should pass validation when all files are plain", + result.isDefined) + } + } + } } diff --git a/docs/Configuration.md b/docs/Configuration.md index b32941150554..294e6c010ff0 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -120,6 +120,7 @@ nav_order: 15 | spark.gluten.sql.fallbackRegexpExpressions | false | If true, fall back all regexp expressions. There are a few incompatible cases between RE2 (used by native engine) and java.util.regex (used by Spark). User should enable this property if their incompatibility is intolerable. | | spark.gluten.sql.fallbackUnexpectedMetadataParquet | false | If enabled, Gluten will not offload scan when unexpected metadata is detected. | | spark.gluten.sql.fallbackUnexpectedMetadataParquet.limit | 10 | If supplied, metadata of `limit` number of Parquet files will be checked to determine whether to fall back to java scan. | +| spark.gluten.sql.fallbackUnexpectedMetadataParquet.samplePercentage | 0.1 | The percentage of root paths to sample for metadata validation when the number of root paths is large. Value range is (0, 1.0]. 1.0 means check all paths (no sampling). A smaller value reduces validation cost for tables with many partitions. | | spark.gluten.sql.injectNativePlanStringToExplain | false | When true, Gluten will inject native plan tree to Spark's explain output. 
| | spark.gluten.sql.mergeTwoPhasesAggregate.enabled | true | Whether to merge two phases aggregate if there are no other operators between them. | | spark.gluten.sql.native.arrow.reader.enabled | false | This is config to specify whether to enable the native columnar csv reader | diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index 2f8155ce70ee..faa0da4784d3 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -379,6 +379,10 @@ class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) { getConf(PARQUET_UNEXPECTED_METADATA_FALLBACK_FILE_LIMIT) } + def parquetMetadataFallbackSamplePercentage: Double = { + getConf(PARQUET_UNEXPECTED_METADATA_FALLBACK_SAMPLE_PERCENTAGE) + } + def enableColumnarRange: Boolean = getConf(COLUMNAR_RANGE_ENABLED) def enableColumnarCollectLimit: Boolean = getConf(COLUMNAR_COLLECT_LIMIT_ENABLED) def enableColumnarCollectTail: Boolean = getConf(COLUMNAR_COLLECT_TAIL_ENABLED) @@ -1573,6 +1577,16 @@ object GlutenConfig extends ConfigRegistry { .checkValue(_ > 0, s"must be positive.") .createWithDefault(10) + val PARQUET_UNEXPECTED_METADATA_FALLBACK_SAMPLE_PERCENTAGE = + buildConf("spark.gluten.sql.fallbackUnexpectedMetadataParquet.samplePercentage") + .doc("The percentage of root paths to sample for metadata validation when the number" + + " of root paths is large. Value range is (0, 1.0]. 1.0 means check all paths" + + " (no sampling). A smaller value reduces validation cost for tables with many" + + " partitions.") + .doubleConf + .checkValue(v => v > 0 && v <= 1.0, "must be in range (0, 1.0].") + .createWithDefault(0.1) + val COLUMNAR_RANGE_ENABLED = buildConf("spark.gluten.sql.columnar.range") .doc("Enable or disable columnar range.")