diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala
index 9c63e091eaf51..b2101f0cf566a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.util.RowDeltaUtils._
 import org.apache.spark.sql.connector.catalog.{SupportsDeleteV2, SupportsRowLevelOperations, TruncatableTable}
 import org.apache.spark.sql.connector.write.{RowLevelOperationTable, SupportsDelta}
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Table}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
@@ -40,11 +40,11 @@ object RewriteDeleteFromTable extends RewriteRowLevelCommand {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case d @ DeleteFromTable(aliasedTable, cond) if d.resolved =>
       EliminateSubqueryAliases(aliasedTable) match {
-        case DataSourceV2Relation(_: TruncatableTable, _, _, _, _) if cond == TrueLiteral =>
+        case DataSourceV2Table(_: TruncatableTable) if cond == TrueLiteral =>
           // don't rewrite as the table supports truncation
           d
 
-        case r @ DataSourceV2Relation(t: SupportsRowLevelOperations, _, _, _, _) =>
+        case r @ DataSourceV2Table(t: SupportsRowLevelOperations) =>
           val table = buildOperationTable(t, DELETE, CaseInsensitiveStringMap.empty())
           table.operation match {
             case _: SupportsDelta =>
@@ -53,7 +53,7 @@ object RewriteDeleteFromTable extends RewriteRowLevelCommand {
               buildReplaceDataPlan(r, table, cond)
           }
 
-        case DataSourceV2Relation(_: SupportsDeleteV2, _, _, _, _) =>
+        case DataSourceV2Table(_: SupportsDeleteV2) =>
           // don't rewrite as the table supports deletes only with filters
           d
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
index 9e67aa156fa21..493ad7abdcfe8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
 import org.apache.spark.sql.connector.write.{RowLevelOperationTable, SupportsDelta}
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Table}
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -125,7 +125,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
         if m.resolved && m.rewritable && m.aligned && !m.needSchemaEvolution =>
       EliminateSubqueryAliases(aliasedTable) match {
-        case r @ DataSourceV2Relation(tbl: SupportsRowLevelOperations, _, _, _, _) =>
+        case r @ DataSourceV2Table(tbl: SupportsRowLevelOperations) =>
           validateMergeIntoConditions(m)
           val table = buildOperationTable(tbl, MERGE, CaseInsensitiveStringMap.empty())
           table.operation match {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala
index b2955ca006878..169b10b2ed745 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.util.RowDeltaUtils._
 import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
 import org.apache.spark.sql.connector.write.{RowLevelOperationTable, SupportsDelta}
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Table}
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -40,7 +40,7 @@ object RewriteUpdateTable extends RewriteRowLevelCommand {
         if u.resolved && u.rewritable && u.aligned =>
       EliminateSubqueryAliases(aliasedTable) match {
-        case r @ DataSourceV2Relation(tbl: SupportsRowLevelOperations, _, _, _, _) =>
+        case r @ DataSourceV2Table(tbl: SupportsRowLevelOperations) =>
           val table = buildOperationTable(tbl, UPDATE, CaseInsensitiveStringMap.empty())
           val updateCond = cond.getOrElse(TrueLiteral)
           table.operation match {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 54a4e75c90c95..62352fd8feb3b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation, DataSourceV2Table}
 import org.apache.spark.sql.internal.SQLConf
 
 trait OperationHelper extends AliasHelper with PredicateHelper {
@@ -436,8 +436,7 @@ object GroupBasedRowLevelOperation {
   type ReturnType = (ReplaceData, Expression, Option[Expression], LogicalPlan)
 
   def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
-    case rd @ ReplaceData(DataSourceV2Relation(table, _, _, _, _),
-        cond, query, _, _, groupFilterCond, _) =>
+    case rd @ ReplaceData(DataSourceV2Table(table), cond, query, _, _, groupFilterCond, _) =>
       // group-based UPDATEs that are rewritten as UNION read the table twice
       val allowMultipleReads = rd.operation.command == UPDATE
       val readRelation = findReadRelation(table, query, allowMultipleReads)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index a5cba44aac6a5..db8ff72a5955d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.connector.write.{DeltaWrite, RowLevelOperation, RowL
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command.{DELETE, MERGE, UPDATE}
 import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Table}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ArrayType, AtomicType, BooleanType, DataType, IntegerType, MapType, MetadataBuilder, StringType, StructField, StructType}
 import org.apache.spark.util.ArrayImplicits._
@@ -263,7 +263,7 @@ case class ReplaceData(
 
   lazy val operation: RowLevelOperation = {
     EliminateSubqueryAliases(table) match {
-      case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) =>
+      case DataSourceV2Table(RowLevelOperationTable(_, operation)) =>
         operation
       case _ =>
         throw new AnalysisException(
@@ -345,7 +345,7 @@ case class WriteDelta(
 
   lazy val operation: SupportsDelta = {
     EliminateSubqueryAliases(table) match {
-      case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) =>
+      case DataSourceV2Table(RowLevelOperationTable(_, operation)) =>
         operation.asInstanceOf[SupportsDelta]
       case _ =>
         throw new AnalysisException(
@@ -834,7 +834,7 @@ case class UpdateTable(
 
   lazy val rewritable: Boolean = {
     EliminateSubqueryAliases(table) match {
-      case DataSourceV2Relation(_: SupportsRowLevelOperations, _, _, _, _) => true
+      case DataSourceV2Table(_: SupportsRowLevelOperations) => true
       case _ => false
     }
   }
@@ -878,7 +878,7 @@ case class MergeIntoTable(
 
   lazy val rewritable: Boolean = {
     EliminateSubqueryAliases(targetTable) match {
-      case DataSourceV2Relation(_: SupportsRowLevelOperations, _, _, _, _) => true
+      case DataSourceV2Table(_: SupportsRowLevelOperations) => true
       case _ => false
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 26f4069994943..9076e877c41ad 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -231,6 +231,10 @@ case class StreamingDataSourceV2ScanRelation(
   override protected def stringArgs: Iterator[Any] = stringArgsVal.iterator
 }
 
+object DataSourceV2Table {
+  def unapply(relation: DataSourceV2Relation): Option[Table] = Some(relation.table)
+}
+
 object DataSourceV2Relation {
   def create(
       table: Table,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala
index b8ffa09dfa05c..c45fb896ad7d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala
@@ -60,7 +60,7 @@ import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
 import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.LogicalRelationWithTable
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation, FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, DataSourceV2Table, FileTable}
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.execution.stat.StatFunctions
 import org.apache.spark.sql.internal.SQLConf
@@ -1733,8 +1733,7 @@ class Dataset[T] private[sql](
         fr.inputFiles
       case r: HiveTableRelation =>
         r.tableMeta.storage.locationUri.map(_.toString).toArray
-      case DataSourceV2ScanRelation(DataSourceV2Relation(table: FileTable, _, _, _, _),
-          _, _, _, _) =>
+      case DataSourceV2ScanRelation(DataSourceV2Table(table: FileTable), _, _, _, _) =>
         table.fileIndex.inputFiles
     }.flatten
     files.toSet.toArray
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index a8292a8dbaa3b..e1fac61cfe55b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Table, FileTable}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
@@ -431,7 +431,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
           case _ => false
         }
 
-      case DataSourceV2Relation(fileTable: FileTable, _, _, _, _) =>
+      case DataSourceV2Table(fileTable: FileTable) =>
        refreshFileIndexIfNecessary(fileTable.fileIndex, fs, qualifiedPath)
 
      case _ => false
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
index 8c7203bca625f..8783c60557e30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
@@ -22,7 +22,7 @@ import scala.jdk.CollectionConverters._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.classic.SparkSession
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Table, FileTable}
 
 /**
  * Replace the File source V2 table in [[InsertIntoStatement]] to V1 [[FileFormat]].
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, File
 class FallBackFileSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case i @ InsertIntoStatement(
-        d @ DataSourceV2Relation(table: FileTable, _, _, _, _), _, _, _, _, _, _) =>
+        d @ DataSourceV2Table(table: FileTable), _, _, _, _, _, _) =>
       val v1FileFormat = table.fallbackFileFormat.getDeclaredConstructor().newInstance()
       val relation = HadoopFsRelation(
         table.fileIndex,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index b07e0442d4f01..34ab2ded62f93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -264,7 +264,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         invalidateCache) :: Nil
     }
 
-    case AppendData(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), _, _,
+    case AppendData(r @ DataSourceV2Table(v1: SupportsWrite), _, _,
         _, Some(write), analyzedQuery) if v1.supports(TableCapability.V1_BATCH_WRITE) =>
       write match {
         case v1Write: V1Write =>
@@ -278,7 +278,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case AppendData(r: DataSourceV2Relation, query, _, _, Some(write), _) =>
       AppendDataExec(planLater(query), refreshCache(r), write) :: Nil
 
-    case OverwriteByExpression(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), _, _,
+    case OverwriteByExpression(r @ DataSourceV2Table(v1: SupportsWrite), _, _,
         _, _, Some(write), analyzedQuery) if v1.supports(TableCapability.V1_BATCH_WRITE) =>
       write match {
         case v1Write: V1Write =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 324fe148592a0..796b3b8d63926 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimeFor
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.localDateTimeToMicros
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Table, FileTable}
 import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -1498,7 +1498,7 @@ class ParquetV2PartitionDiscoverySuite extends ParquetPartitionDiscoverySuite {
       (1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath)
       val queryExecution = spark.read.parquet(dir.getCanonicalPath).queryExecution
       queryExecution.analyzed.collectFirst {
-        case DataSourceV2Relation(fileTable: FileTable, _, _, _, _) =>
+        case DataSourceV2Table(fileTable: FileTable) =>
           assert(fileTable.fileIndex.partitionSpec() === PartitionSpec.emptySpec)
       }.getOrElse {
         fail(s"Expecting a matching DataSourceV2Relation, but got:\n$queryExecution")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 067b0ca285d54..75f8c0ef23f8e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.util.stringToFile
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation, FileScan, FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Table, FileScan, FileTable}
 import org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol
 import org.apache.spark.sql.execution.streaming.runtime._
 import org.apache.spark.sql.execution.streaming.sinks.{FileStreamSink, FileStreamSinkLog, SinkFileStatus}
@@ -776,7 +776,7 @@ class FileStreamSinkV2Suite extends FileStreamSinkSuite {
     // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
     // been inferred
     val table = df.queryExecution.analyzed.collect {
-      case DataSourceV2Relation(table: FileTable, _, _, _, _) => table
+      case DataSourceV2Table(table: FileTable) => table
     }
     assert(table.size === 1)
     assert(table.head.fileIndex.isInstanceOf[MetadataLogFileIndex])
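For context, the change above is a plain Scala extractor: the new DataSourceV2Table object defines an unapply that projects only the table out of DataSourceV2Relation, so call sites can match on the table alone instead of spelling out four wildcard fields. The standalone sketch below illustrates the technique with simplified stand-in types (DemoTable, DemoFileTable, DemoRelation, DemoRelationTable are hypothetical names for illustration, not the Spark classes touched by this patch).

// Standalone sketch of the extractor pattern used in this diff; the types are placeholders.
trait DemoTable { def name: String }
case class DemoFileTable(name: String) extends DemoTable

// A relation with several fields, of which matchers usually need only the table.
case class DemoRelation(table: DemoTable, output: Seq[String], options: Map[String, String])

// Mirrors the new DataSourceV2Table object: an unapply that exposes only the table.
object DemoRelationTable {
  def unapply(r: DemoRelation): Option[DemoTable] = Some(r.table)
}

object Demo extends App {
  val rel = DemoRelation(DemoFileTable("t"), Seq("a", "b"), Map.empty)

  // Before: every unused field needs a wildcard.
  rel match {
    case DemoRelation(ft: DemoFileTable, _, _) => println(s"verbose match on ${ft.name}")
  }

  // After: the extractor hides the unused fields, as in case DataSourceV2Table(t: FileTable).
  rel match {
    case DemoRelationTable(ft: DemoFileTable) => println(s"extractor match on ${ft.name}")
  }
}

One practical effect, visible throughout the hunks above, is that adding a new field to DataSourceV2Relation would no longer force every pattern match that only cares about the table to be updated.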