diff --git a/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
index 0587e8b07f7..6acabea9264 100644
--- a/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
+++ b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.component
 
 import org.apache.gluten.backendsapi.velox.VeloxBackend
 import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.extension.{DeltaPostTransformRules, OffloadDeltaFilter, OffloadDeltaProject, OffloadDeltaScan}
+import org.apache.gluten.extension.{DeltaCDFScanStrategy, DeltaPostTransformRules, OffloadDeltaFilter, OffloadDeltaProject, OffloadDeltaScan}
 import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
 import org.apache.gluten.extension.columnar.validator.Validators
 import org.apache.gluten.extension.injector.Injector
@@ -35,6 +35,8 @@ class VeloxDeltaComponent extends Component {
   }
 
   override def injectRules(injector: Injector): Unit = {
+    injector.spark.injectPlannerStrategy(DeltaCDFScanStrategy(_))
+
     val legacy = injector.gluten.legacy
     legacy.injectTransform {
       c =>
diff --git a/gluten-delta/src-delta23/main/scala/org/apache/gluten/extension/DeltaCDFRelationHelper.scala b/gluten-delta/src-delta23/main/scala/org/apache/gluten/extension/DeltaCDFRelationHelper.scala
new file mode 100644
index 00000000000..30bdb75e99a
--- /dev/null
+++ b/gluten-delta/src-delta23/main/scala/org/apache/gluten/extension/DeltaCDFRelationHelper.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.delta.BatchCDFSchemaEndVersion
+import org.apache.spark.sql.delta.commands.cdc.CDCReader
+
+/** Builds the batch change data feed (CDF) [[DataFrame]] for a Delta CDF relation. */
+object DeltaCDFRelationHelper {
+
+  /**
+   * Reads the changes described by `relation` through `CDCReader.changesToBatchDF`.
+   *
+   * The range ends at `relation.endingVersion`, or at the version returned by
+   * `deltaLog.update()` when no ending version is set. For `BatchCDFSchemaEndVersion` the read
+   * schema comes from the snapshot at that end version, capped at the latest version;
+   * otherwise the snapshot held by the relation is used.
+   *
+   * @param relation the CDF relation to read; its `startingVersion` must be defined
+   * @param spark the session passed on to `CDCReader.changesToBatchDF`
+   * @return the DataFrame returned by `CDCReader.changesToBatchDF`
+   * @throws IllegalArgumentException if `relation.startingVersion` is empty
+   */
+  def changesToBatchDF(
+      relation: CDCReader.DeltaCDFRelation,
+      spark: SparkSession): DataFrame = {
+    // Resolve the start version first so that a missing one fails with a clear message
+    // before `deltaLog.update()` is called.
+    val startVersion = relation.startingVersion.getOrElse(
+      throw new IllegalArgumentException(
+        "Delta CDF relation has no starting version; cannot read its change data feed."))
+    val deltaLog = relation.snapshotWithSchemaMode.snapshot.deltaLog
+    val latestVersion = deltaLog.update().version
+    val endVersion = relation.endingVersion.getOrElse(latestVersion)
+    val snapshotForBatchSchema = relation.snapshotWithSchemaMode.schemaMode match {
+      // Never request a snapshot newer than the latest version of the log.
+      case BatchCDFSchemaEndVersion => deltaLog.getSnapshotAt(endVersion.min(latestVersion))
+      case _ => relation.snapshotWithSchemaMode.snapshot
+    }
+
+    CDCReader.changesToBatchDF(
+      deltaLog,
+      startVersion,
+      endVersion,
+      spark,
+      readSchemaSnapshot = Some(snapshotForBatchSchema))
+  }
+}
diff --git a/gluten-delta/src-delta24/main/scala/org/apache/gluten/extension/DeltaCDFRelationHelper.scala b/gluten-delta/src-delta24/main/scala/org/apache/gluten/extension/DeltaCDFRelationHelper.scala
new file mode 100644
index 00000000000..30bdb75e99a
--- /dev/null
+++ b/gluten-delta/src-delta24/main/scala/org/apache/gluten/extension/DeltaCDFRelationHelper.scala
@@ -0,0
+1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.delta.BatchCDFSchemaEndVersion
+import org.apache.spark.sql.delta.commands.cdc.CDCReader
+
+/** Builds the batch change data feed (CDF) [[DataFrame]] for a Delta CDF relation. */
+object DeltaCDFRelationHelper {
+
+  /**
+   * Reads the changes described by `relation` through `CDCReader.changesToBatchDF`.
+   *
+   * The range ends at `relation.endingVersion`, or at the version returned by
+   * `deltaLog.update()` when no ending version is set. For `BatchCDFSchemaEndVersion` the read
+   * schema comes from the snapshot at that end version, capped at the latest version;
+   * otherwise the snapshot held by the relation is used.
+   *
+   * @param relation the CDF relation to read; its `startingVersion` must be defined
+   * @param spark the session passed on to `CDCReader.changesToBatchDF`
+   * @return the DataFrame returned by `CDCReader.changesToBatchDF`
+   * @throws IllegalArgumentException if `relation.startingVersion` is empty
+   */
+  def changesToBatchDF(
+      relation: CDCReader.DeltaCDFRelation,
+      spark: SparkSession): DataFrame = {
+    // Resolve the start version first so that a missing one fails with a clear message
+    // before `deltaLog.update()` is called.
+    val startVersion = relation.startingVersion.getOrElse(
+      throw new IllegalArgumentException(
+        "Delta CDF relation has no starting version; cannot read its change data feed."))
+    val deltaLog = relation.snapshotWithSchemaMode.snapshot.deltaLog
+    val latestVersion = deltaLog.update().version
+    val endVersion = relation.endingVersion.getOrElse(latestVersion)
+    val snapshotForBatchSchema = relation.snapshotWithSchemaMode.schemaMode match {
+      // Never request a snapshot newer than the latest version of the log.
+      case BatchCDFSchemaEndVersion => deltaLog.getSnapshotAt(endVersion.min(latestVersion))
+      case _ => relation.snapshotWithSchemaMode.snapshot
+    }
+
+    CDCReader.changesToBatchDF(
+      deltaLog,
+      startVersion,
+      endVersion,
+      spark,
+      readSchemaSnapshot = Some(snapshotForBatchSchema))
+  }
+}
diff --git a/gluten-delta/src-delta33/main/scala/org/apache/gluten/extension/DeltaCDFRelationHelper.scala
b/gluten-delta/src-delta33/main/scala/org/apache/gluten/extension/DeltaCDFRelationHelper.scala
new file mode 100644
index 00000000000..30bdb75e99a
--- /dev/null
+++ b/gluten-delta/src-delta33/main/scala/org/apache/gluten/extension/DeltaCDFRelationHelper.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.delta.BatchCDFSchemaEndVersion
+import org.apache.spark.sql.delta.commands.cdc.CDCReader
+
+/** Builds the batch change data feed (CDF) [[DataFrame]] for a Delta CDF relation. */
+object DeltaCDFRelationHelper {
+
+  /**
+   * Reads the changes described by `relation` through `CDCReader.changesToBatchDF`.
+   *
+   * The range ends at `relation.endingVersion`, or at the version returned by
+   * `deltaLog.update()` when no ending version is set. For `BatchCDFSchemaEndVersion` the read
+   * schema comes from the snapshot at that end version, capped at the latest version;
+   * otherwise the snapshot held by the relation is used.
+   *
+   * @param relation the CDF relation to read; its `startingVersion` must be defined
+   * @param spark the session passed on to `CDCReader.changesToBatchDF`
+   * @return the DataFrame returned by `CDCReader.changesToBatchDF`
+   * @throws IllegalArgumentException if `relation.startingVersion` is empty
+   */
+  def changesToBatchDF(
+      relation: CDCReader.DeltaCDFRelation,
+      spark: SparkSession): DataFrame = {
+    // Resolve the start version first so that a missing one fails with a clear message
+    // before `deltaLog.update()` is called.
+    val startVersion = relation.startingVersion.getOrElse(
+      throw new IllegalArgumentException(
+        "Delta CDF relation has no starting version; cannot read its change data feed."))
+    val deltaLog = relation.snapshotWithSchemaMode.snapshot.deltaLog
+    val latestVersion = deltaLog.update().version
+    val endVersion = relation.endingVersion.getOrElse(latestVersion)
+    val snapshotForBatchSchema = relation.snapshotWithSchemaMode.schemaMode match {
+      // Never request a snapshot newer than the latest version of the log.
+      case BatchCDFSchemaEndVersion => deltaLog.getSnapshotAt(endVersion.min(latestVersion))
+      case _ => relation.snapshotWithSchemaMode.snapshot
+    }
+
+    CDCReader.changesToBatchDF(
+      deltaLog,
+      startVersion,
+      endVersion,
+      spark,
+      readSchemaSnapshot = Some(snapshotForBatchSchema))
+  }
+}
diff --git a/gluten-delta/src-delta40/main/scala/org/apache/gluten/extension/DeltaCDFRelationHelper.scala b/gluten-delta/src-delta40/main/scala/org/apache/gluten/extension/DeltaCDFRelationHelper.scala
new file mode 100644
index 00000000000..f1f84cd1537
--- /dev/null
+++ b/gluten-delta/src-delta40/main/scala/org/apache/gluten/extension/DeltaCDFRelationHelper.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.delta.BatchCDFSchemaEndVersion
+import org.apache.spark.sql.delta.commands.cdc.CDCReader
+
+/** Builds the batch change data feed (CDF) [[DataFrame]] for a Delta CDF relation. */
+object DeltaCDFRelationHelper {
+
+  /**
+   * Reads the changes described by `relation` through `CDCReader.changesToBatchDF`.
+   *
+   * The range ends at `relation.endingVersion`, or at the version returned by
+   * `deltaLog.update(...)` when no ending version is set. For `BatchCDFSchemaEndVersion` the
+   * read schema comes from the snapshot at that end version, capped at the latest version;
+   * otherwise the snapshot held by the relation is used.
+   *
+   * @param relation the CDF relation to read; its `startingVersion` must be defined
+   * @param spark the session passed on to `CDCReader.changesToBatchDF`
+   * @return the DataFrame returned by `CDCReader.changesToBatchDF`
+   * @throws IllegalArgumentException if `relation.startingVersion` is empty
+   */
+  def changesToBatchDF(
+      relation: CDCReader.DeltaCDFRelation,
+      spark: SparkSession): DataFrame = {
+    // Resolve the start version first so that a missing one fails with a clear message
+    // before `deltaLog.update(...)` is called.
+    val startVersion = relation.startingVersion.getOrElse(
+      throw new IllegalArgumentException(
+        "Delta CDF relation has no starting version; cannot read its change data feed."))
+    val deltaLog = relation.snapshotWithSchemaMode.snapshot.deltaLog
+    val latestVersion = deltaLog.update(catalogTableOpt = relation.catalogTableOpt).version
+    val endVersion = relation.endingVersion.getOrElse(latestVersion)
+    val snapshotForBatchSchema = relation.snapshotWithSchemaMode.schemaMode match {
+      // Never request a snapshot newer than the latest version of the log.
+      case BatchCDFSchemaEndVersion =>
+        deltaLog.getSnapshotAt(
+          endVersion.min(latestVersion),
+          catalogTableOpt = relation.catalogTableOpt)
+      case _ => relation.snapshotWithSchemaMode.snapshot
+    }
+
+    CDCReader.changesToBatchDF(
+      deltaLog,
+      startVersion,
+      endVersion,
+      spark,
+      readSchemaSnapshot = Some(snapshotForBatchSchema))
+  }
+}
diff --git a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
index 1be03dd404a..ded79697c6d 100644
--- a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
+++ b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
@@ -23,6 +23,7 @@ import
org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.connector.read.streaming.SparkDataStream
+import org.apache.spark.sql.delta.files.TahoeRemoveFileIndex
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.types.StructType
@@ -57,16 +58,36 @@ case class DeltaScanTransformer(
 
   override protected def doValidateInternal(): ValidationResult = {
     if (
-      requiredSchema.fields.exists(
-        _.name == "__delta_internal_is_row_deleted") || requiredSchema.fields.exists(
-        _.name == "__delta_internal_row_index")
+      requiredSchemaIncludesDeletionVectorColumns ||
+      cdfRemoveFilesHaveDeletionVectors
     ) {
-      return ValidationResult.failed(s"Deletion vector is not supported in native.")
+      return ValidationResult.failed(DeltaScanTransformer.DELETION_VECTOR_UNSUPPORTED)
     }
 
     super.doValidateInternal()
   }
 
+  /**
+   * Whether the required schema contains one of the internal columns
+   * `__delta_internal_is_row_deleted` or `__delta_internal_row_index`.
+   */
+  private def requiredSchemaIncludesDeletionVectorColumns: Boolean = {
+    val deletionVectorColumns =
+      Set("__delta_internal_is_row_deleted", "__delta_internal_row_index")
+    requiredSchema.fields.exists(field => deletionVectorColumns.contains(field.name))
+  }
+
+  /**
+   * Whether the scan location is a [[TahoeRemoveFileIndex]] holding at least one action with a
+   * deletion vector. Any other kind of location yields `false`.
+   */
+  private def cdfRemoveFilesHaveDeletionVectors: Boolean = relation.location match {
+    case index: TahoeRemoveFileIndex =>
+      // A non-null `deletionVector` is treated as "this action has a deletion vector".
+      index.filesByVersion.exists(_.actions.exists(_.deletionVector != null))
+    case _ => false
+  }
+
   override def doCanonicalize(): DeltaScanTransformer = {
     DeltaScanTransformer(
       relation,
@@ -91,6 +112,9 @@ case class DeltaScanTransformer(
 
 object DeltaScanTransformer {
 
+  /** Validation failure reason reported when a scan would need deletion-vector handling. */
+  val DELETION_VECTOR_UNSUPPORTED: String = "Deletion vector is not supported in native."
+
   def apply(scanExec: FileSourceScanExec): DeltaScanTransformer = {
     new DeltaScanTransformer(
       scanExec.relation,
diff --git a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaCDFScanStrategy.scala b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaCDFScanStrategy.scala
new file mode 100644
index 00000000000..067bd8da349
--- /dev/null
+++ b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaCDFScanStrategy.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan, Project}
+import org.apache.spark.sql.delta.commands.cdc.CDCReader
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
+/**
+ * Planner strategy for reads of a Delta change data feed (CDF) relation.
+ *
+ * A projection/filter stack over a [[LogicalRelation]] backed by a
+ * [[CDCReader.DeltaCDFRelation]] is replaced by the analyzed plan of the DataFrame built by
+ * [[DeltaCDFRelationHelper.changesToBatchDF]], with the projections and filters re-applied on
+ * top. Attributes are re-bound to that plan's output by name. `Nil` is returned for any other
+ * plan.
+ */
+case class DeltaCDFScanStrategy(spark: SparkSession) extends SparkStrategy {
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case PhysicalOperation(projects, filters, relation: LogicalRelation) =>
+      relation.relation match {
+        case cdfRelation: CDCReader.DeltaCDFRelation =>
+          planLater(planCDFRelation(relation, cdfRelation, projects, filters)) :: Nil
+        case _ => Nil
+      }
+    case _ => Nil
+  }
+
+  /**
+   * Builds the logical plan that replaces the CDF relation.
+   *
+   * Without a starting version this is an empty [[LocalRelation]] with the relation's output;
+   * otherwise it is the expanded CDF plan. `projects` and `filters` are applied on top of both.
+   */
+  private def planCDFRelation(
+      relation: LogicalRelation,
+      cdfRelation: CDCReader.DeltaCDFRelation,
+      projects: Seq[NamedExpression],
+      filters: Seq[Expression]): LogicalPlan = {
+    if (cdfRelation.startingVersion.isEmpty) {
+      // The local relation exposes the relation's own output, so nothing needs rewriting.
+      projectAndFilter(LocalRelation(relation.output), projects, filters)
+    } else {
+      val cdfPlan =
+        DeltaCDFRelationHelper.changesToBatchDF(cdfRelation, spark).queryExecution.analyzed
+      val cdfOutput = cdfPlan.output
+      val rewrittenFilters = filters.map(rewriteExpression(_, cdfOutput))
+      val rewrittenProjects = projects.map(rewriteProject(_, cdfOutput))
+      projectAndFilter(cdfPlan, rewrittenProjects, rewrittenFilters)
+    }
+  }
+
+  /** Applies the conjunction of `filters` (if any) and then `projects` on top of `child`. */
+  private def projectAndFilter(
+      child: LogicalPlan,
+      projects: Seq[NamedExpression],
+      filters: Seq[Expression]): LogicalPlan = {
+    val filtered = filters.reduceOption(And).fold(child)(Filter(_, child))
+    Project(projects, filtered)
+  }
+
+  /**
+   * Re-binds a projected expression to `cdfOutput`.
+   *
+   * A bare attribute becomes an [[Alias]] of the matching CDF attribute that keeps the original
+   * name, expression id, qualifier and metadata.
+   */
+  private def rewriteProject(
+      project: NamedExpression,
+      cdfOutput: Seq[Attribute]): NamedExpression = {
+    project match {
+      case attr: AttributeReference =>
+        Alias(resolveCDFAttribute(attr, cdfOutput), attr.name)(
+          exprId = attr.exprId,
+          qualifier = attr.qualifier,
+          explicitMetadata = Some(attr.metadata))
+      case other =>
+        rewriteExpression(other, cdfOutput) match {
+          case named: NamedExpression => named
+          case unexpected =>
+            throw new IllegalStateException(
+              s"Rewriting projection $other produced a non-named expression: $unexpected")
+        }
+    }
+  }
+
+  /** Re-binds every [[AttributeReference]] in `expr` to `cdfOutput` by name. */
+  private def rewriteExpression(expr: Expression, cdfOutput: Seq[Attribute]): Expression = {
+    expr.transform { case attr: AttributeReference => resolveCDFAttribute(attr, cdfOutput) }
+  }
+
+  /**
+   * Finds the attribute of `cdfOutput` whose name matches `attr` under the session resolver.
+   *
+   * @throws IllegalArgumentException if no attribute of `cdfOutput` matches
+   */
+  private def resolveCDFAttribute(
+      attr: AttributeReference,
+      cdfOutput: Seq[Attribute]): Attribute = {
+    val resolver = spark.sessionState.conf.resolver
+    cdfOutput
+      .find(output => resolver(output.name, attr.name))
+      .getOrElse(
+        throw new IllegalArgumentException(
+          s"Unable to resolve CDF attribute ${attr.name} from " +
+            s"${cdfOutput.map(_.name).mkString("[", ", ", "]")}"))
+  }
+}
diff --git a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
index 031bf460347..873ef629404 100644
--- a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
+++ b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.execution
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.types._
 
 import scala.collection.JavaConverters._
@@ -146,6 +146,175 @@ abstract class DeltaSuite extends WholeStageTransformerSuite {
     }
   }
 
+  testWithMinSparkVersion("delta: change data feed read", "3.2") {
+    withTable("delta_cdf") {
+      spark.sql(s"""
+                   |create table delta_cdf (id int, name string) using delta
+                   |tblproperties ("delta.enableChangeDataFeed" = "true")
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |insert into delta_cdf values (1, "v1"), (2, "v2")
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |update delta_cdf set name = "v2_updated" where id = 2
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |delete from delta_cdf where id = 1
+                   |""".stripMargin)
+
+      val tableChangesFromZeroDF = runAndCompare(
+        s"""
+           |select id, name, _change_type, _commit_version
+           |from table_changes('delta_cdf', 0)
+           |order by _commit_version, id, name, _change_type
+           |""".stripMargin)
+      checkCDFRead(tableChangesFromZeroDF)
+
+      val tableChangesDF = runAndCompare(
+        s"""
+           |select id, name, _change_type, _commit_version
+           |from table_changes('delta_cdf', 1)
+           |order by _commit_version, id, name, _change_type
+           |""".stripMargin)
+      checkCDFRead(tableChangesDF)
+
+      val filteredCDF = runAndCompare(
+        s"""
+           |select id, name, _change_type, _commit_version
+           |from table_changes('delta_cdf', 1)
+           |where _commit_version = 2 and id = 2
+           |order by name, _change_type
+           |""".stripMargin)
+      checkCDFRead(
+        filteredCDF,
+        Seq(
+          Row(2, "v2", "update_preimage", 2L),
+          Row(2, "v2_updated", "update_postimage", 2L)))
+
+      val boundedCDF = runAndCompare(
+        s"""
+           |select id, name, _change_type, _commit_version
+           |from table_changes('delta_cdf', 1, 2)
+           |order by _commit_version, id, name, _change_type
+           |""".stripMargin)
+      checkCDFRead(
+        boundedCDF,
+        Seq(
+          Row(1, "v1", "insert", 1L),
+          Row(2, "v2", "insert", 1L),
+          Row(2, "v2", "update_preimage", 2L),
+          Row(2, "v2_updated", "update_postimage", 2L)))
+
+      val readChangeFeedDF = compareCDFDataFrame(
+        () =>
+          spark.read
+            .format("delta")
+            .option("readChangeFeed", "true")
+            .option("startingVersion", "1")
+            .table("delta_cdf")
+            .selectExpr("id", "name", "_change_type", "_commit_version")
+            .orderBy("_commit_version", "id", "name", "_change_type"))
+      checkCDFRead(readChangeFeedDF)
+    }
+  }
+
+  testWithMinSparkVersion("delta: change data feed read with column mapping", "3.2") {
+    withTable("delta_cdf_cm") {
+      spark.sql(s"""
+                   |create table delta_cdf_cm (id int, name string) using delta
+                   |tblproperties (
+                   |  "delta.enableChangeDataFeed" = "true",
+                   |  "delta.columnMapping.mode" = "name")
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |insert into delta_cdf_cm values (1, "v1"), (2, "v2")
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |update delta_cdf_cm set name = "v2_updated" where id = 2
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |delete from delta_cdf_cm where id = 1
+                   |""".stripMargin)
+
+      val df = runAndCompare(
+        s"""
+           |select id, name, _change_type, _commit_version
+           |from table_changes('delta_cdf_cm', 1)
+           |order by _commit_version, id, name, _change_type
+           |""".stripMargin)
+      checkCDFRead(df)
+    }
+  }
+
+  testWithMinSparkVersion("delta: change data feed read with deletion vectors", "3.4") {
+    withTable("delta_cdf_dv") {
+      spark.sql(s"""
+                   |create table delta_cdf_dv (id int, name string) using delta
+                   |tblproperties (
+                   |  "delta.enableChangeDataFeed" = "true",
+                   |  "delta.enableDeletionVectors" = "true")
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |insert into delta_cdf_dv values (1, "v1"), (2, "v2"), (3, "v3")
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |delete from delta_cdf_dv where id = 2
+                   |""".stripMargin)
+
+      // Native scans do not support deletion vectors: expect a fallback and correct results.
+      import org.apache.spark.sql.execution.GlutenImplicits._
+      val df = runAndCompare(
+        s"""
+           |select id, name, _change_type
+           |from table_changes('delta_cdf_dv', 0)
+           |order by id, name, _change_type
+           |""".stripMargin)
+      assert(
+        df.fallbackSummary.fallbackNodeToReason
+          .flatMap(_.values)
+          .exists(_.contains(DeltaScanTransformer.DELETION_VECTOR_UNSUPPORTED)))
+      checkAnswer(
+        df,
+        Seq(
+          Row(1, "v1", "insert"),
+          Row(2, "v2", "delete"),
+          Row(2, "v2", "insert"),
+          Row(3, "v3", "insert")))
+    }
+  }
+
+  private def compareCDFDataFrame(dataframe: () => DataFrame): DataFrame = {
+    var expected: Seq[Row] = null
+    withSQLConf(vanillaSparkConfs(): _*) {
+      expected = dataframe().collect()
+    }
+    val df = dataframe()
+    checkAnswer(df, expected)
+    df
+  }
+
+  private def checkCDFRead(
+      df: DataFrame,
+      expectedRows: Seq[Row] = allCDFRows): Unit = {
+    // Delta CDF expansion can keep a Spark-side branch for synthesized change rows, so only
+    // require that at least one Delta file scan in the expanded plan was offloaded.
+    checkLengthAndPlan(df, expectedRows.length)
+    checkAnswer(
+      df,
+      expectedRows)
+    assert(
+      collect(df.queryExecution.executedPlan) { case _: DeltaScanTransformer => true }.nonEmpty,
+      df.queryExecution.executedPlan)
+  }
+
+  private def allCDFRows: Seq[Row] =
+    Seq(
+      Row(1, "v1", "insert", 1L),
+      Row(2, "v2", "insert", 1L),
+      Row(2, "v2", "update_preimage", 2L),
+      Row(2, "v2_updated", "update_postimage", 2L),
+      Row(1, "v1", "delete", 3L))
+
   testWithMinSparkVersion("column mapping with complex type", "3.2") {
     withTable("t1") {
       val simpleNestedSchema = new StructType()