Commit c979a89

[Spark] Refactor out Delta read path (#4041)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other

## Description

Refactored out the following read path functionality:

- V2 -> V1 relation conversions.
- Resolution of tables with partition filters.

This is a refactor-only change to support the single-pass Analyzer project in Spark: https://issues.apache.org/jira/browse/SPARK-49834. It will be used in single-pass resolver extensions: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverExtension.scala.

## How was this patch tested?

Existing tests.

## Does this PR introduce _any_ user-facing changes?

No.
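To make the shape of the refactor concrete, below is a minimal sketch of how standalone extractor objects like the two introduced here can be reused across analysis rules. The `IllustrativeResolver` trait and its `resolveOperator` method are hypothetical stand-ins for illustration only, not the actual `ResolverExtension` API linked above:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical stand-in for a resolver interface; the real ResolverExtension
// API in Spark may look different.
trait IllustrativeResolver {
  def resolveOperator(plan: LogicalPlan): Option[LogicalPlan]
}

// The two extractors are the ones added by this commit. Each returns
// Some(rewritten plan) when its pattern applies and None otherwise, so the
// same matching logic can serve both the fixed-point DeltaAnalysis rule and
// a single-pass resolver.
object DeltaReadPathResolver extends IllustrativeResolver {
  override def resolveOperator(plan: LogicalPlan): Option[LogicalPlan] =
    plan match {
      case ResolveDeltaTableWithPartitionFilters(resolved) => Some(resolved)
      case FallbackToV1DeltaRelation(v1Relation) => Some(v1Relation)
      case _ => None
    }
}
```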
1 parent b1139d8 commit c979a89

File tree

3 files changed: +71 −10 lines


spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala

Lines changed: 2 additions & 10 deletions
```diff
@@ -265,12 +265,7 @@ class DeltaAnalysis(session: SparkSession)
       }
       DeltaDynamicPartitionOverwriteCommand(r, d, adjustedQuery, o.writeOptions, o.isByName)
 
-    // Pull out the partition filter that may be part of the FileIndex. This can happen when someone
-    // queries a Delta table such as spark.read.format("delta").load("/some/table/partition=2")
-    case l @ DeltaTable(index: TahoeLogFileIndex) if index.partitionFilters.nonEmpty =>
-      Filter(
-        index.partitionFilters.reduce(And),
-        DeltaTableUtils.replaceFileIndex(l, index.copy(partitionFilters = Nil)))
+    case ResolveDeltaTableWithPartitionFilters(plan) => plan
 
     // SQL CDC table value functions "table_changes" and "table_changes_by_path"
     case stmt: CDCStatementBase if stmt.functionArgs.forall(_.resolved) =>
@@ -442,10 +437,7 @@ class DeltaAnalysis(session: SparkSession)
 
     case d: DescribeDeltaHistory if d.childrenResolved => d.toCommand
 
-    // This rule falls back to V1 nodes, since we don't have a V2 reader for Delta right now
-    case dsv2 @ DataSourceV2Relation(d: DeltaTableV2, _, _, _, options)
-        if dsv2.getTagValue(DeltaRelation.KEEP_AS_V2_RELATION_TAG).isEmpty =>
-      DeltaRelation.fromV2Relation(d, dsv2, options)
+    case FallbackToV1DeltaRelation(v1Relation) => v1Relation
 
     case ResolvedTable(_, _, d: DeltaTableV2, _) if d.catalogTable.isEmpty && !d.tableExists =>
       // This is DDL on a path based table that doesn't exist. CREATE will not hit this path, most
```
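For context on the first hunk: as the relocated comment says, the partition-filter case fires on path-based reads that point directly at a partition directory. A minimal illustration (the table path is made up, and `spark` is assumed to be an active `SparkSession`):

```scala
// Illustrative only: loading a Delta table through a partition directory.
// The resulting TahoeLogFileIndex carries "partition = 2" as a partition
// filter, which the rule re-applies as an explicit Filter above the relation.
val df = spark.read.format("delta").load("/some/table/partition=2")
```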
Lines changed: 32 additions & 0 deletions (new file)

```scala
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

/**
 * Fall back to V1 nodes, since we don't have a V2 reader for Delta right now
 */
object FallbackToV1DeltaRelation {
  def unapply(dsv2: DataSourceV2Relation): Option[LogicalRelation] = dsv2.table match {
    case d: DeltaTableV2 if dsv2.getTagValue(DeltaRelation.KEEP_AS_V2_RELATION_TAG).isEmpty =>
      Some(DeltaRelation.fromV2Relation(d, dsv2, dsv2.options))
    case _ => None
  }
}
```
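Centralizing the `KEEP_AS_V2_RELATION_TAG` check inside `unapply` means every caller gets the V2 opt-out for free. A hedged usage sketch, assuming a rule that walks the plan with Catalyst's `resolveOperators` (this call site is illustrative, not part of the commit):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Illustrative only: convert eligible Delta V2 relations back to V1 while
// walking a plan. Relations tagged with KEEP_AS_V2_RELATION_TAG are skipped,
// because the extractor returns None for them.
def fallBackDeltaRelations(plan: LogicalPlan): LogicalPlan =
  plan.resolveOperators {
    case FallbackToV1DeltaRelation(v1Relation) => v1Relation
  }
```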
Lines changed: 37 additions & 0 deletions (new file)

```scala
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta

import org.apache.spark.sql.catalyst.expressions.And
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Filter}
import org.apache.spark.sql.delta.files.TahoeLogFileIndex

/**
 * Pull out the partition filter that may be part of the FileIndex. This can happen when someone
 * queries a Delta table such as spark.read.format("delta").load("/some/table/partition=2")
 */
object ResolveDeltaTableWithPartitionFilters {
  def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
    case relation @ DeltaTable(index: TahoeLogFileIndex) if index.partitionFilters.nonEmpty =>
      val result = Filter(
        index.partitionFilters.reduce(And),
        DeltaTableUtils.replaceFileIndex(relation, index.copy(partitionFilters = Nil))
      )
      Some(result)
    case _ => None
  }
}
```
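One detail worth noting: `index.partitionFilters.reduce(And)` folds all collected partition predicates into a single conjunction before wrapping the relation in a `Filter` (the `nonEmpty` guard keeps `reduce` safe). A self-contained sketch of that folding step, using stand-in expression types rather than Catalyst's:

```scala
// Stand-in expression ADT mirroring the shape of Catalyst's And node.
sealed trait Expr
final case class Pred(sql: String) extends Expr
final case class And(left: Expr, right: Expr) extends Expr

// Example predicates such as a file index might collect; values are made up.
val partitionFilters: Seq[Expr] = Seq(Pred("partition = 2"), Pred("region = 'us'"))

// Case-class companions extend Function2, so And can be passed to reduce
// directly -- the same idiom the extractor uses on Catalyst expressions.
val combined: Expr = partitionFilters.reduce(And)
// combined == And(Pred("partition = 2"), Pred("region = 'us'"))
```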
