@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, Attribu
22
22
import org .apache .spark .sql .catalyst .expressions .Literal .{FalseLiteral , TrueLiteral }
23
23
import org .apache .spark .sql .catalyst .expressions .aggregate .AggregateExpression
24
24
import org .apache .spark .sql .catalyst .plans .{FullOuter , Inner , JoinType , LeftAnti , LeftOuter , RightOuter }
25
- import org .apache .spark .sql .catalyst .plans .logical .{AppendData , DeleteAction , Filter , HintInfo , InsertAction , Join , JoinHint , LogicalPlan , MergeAction , MergeIntoTable , MergeRows , NO_BROADCAST_AND_REPLICATION , Project , ReplaceData , UpdateAction , WriteDelta }
25
+ import org .apache .spark .sql .catalyst .plans .logical .{AppendData , DeleteAction , Filter , HintInfo , InsertAction , Join , JoinHint , LogicalPlan , MergeAction , MergeIntoTable , MergeRows , NO_BROADCAST_AND_REPLICATION , Project , ReplaceData , ResolvedHint , UpdateAction , WriteDelta }
26
26
import org .apache .spark .sql .catalyst .plans .logical .MergeRows .{Copy , Delete , Discard , Insert , Instruction , Keep , ROW_ID , Split , Update }
27
27
import org .apache .spark .sql .catalyst .util .RowDeltaUtils .{OPERATION_COLUMN , WRITE_OPERATION , WRITE_WITH_METADATA_OPERATION }
28
28
import org .apache .spark .sql .connector .catalog .SupportsRowLevelOperations
@@ -52,27 +52,11 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
52
52
EliminateSubqueryAliases (aliasedTable) match {
53
53
case r : DataSourceV2Relation =>
54
54
validateMergeIntoConditions(m)
55
+ buildAppendDataPlan(r, r, source, cond, notMatchedActions)
55
56
56
- // NOT MATCHED conditions may only refer to columns in source so they can be pushed down
57
- val insertAction = notMatchedActions.head.asInstanceOf [InsertAction ]
58
- val filteredSource = insertAction.condition match {
59
- case Some (insertCond) => Filter (insertCond, source)
60
- case None => source
61
- }
62
-
63
- // there is only one NOT MATCHED action, use a left anti join to remove any matching rows
64
- // and switch to using a regular append instead of a row-level MERGE operation
65
- // only unmatched source rows that match the condition are appended to the table
66
- val joinPlan = Join (filteredSource, r, LeftAnti , Some (cond), JoinHint .NONE )
67
-
68
- val output = insertAction.assignments.map(_.value)
69
- val outputColNames = r.output.map(_.name)
70
- val projectList = output.zip(outputColNames).map { case (expr, name) =>
71
- Alias (expr, name)()
72
- }
73
- val project = Project (projectList, joinPlan)
74
-
75
- AppendData .byPosition(r, project)
57
+ case h @ ResolvedHint (r : DataSourceV2Relation , _) =>
58
+ validateMergeIntoConditions(m)
59
+ buildAppendDataPlan(r, h, source, cond, notMatchedActions)
76
60
77
61
case _ =>
78
62
m
@@ -86,35 +70,11 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
86
70
EliminateSubqueryAliases (aliasedTable) match {
87
71
case r : DataSourceV2Relation =>
88
72
validateMergeIntoConditions(m)
73
+ buildAppendDataPlanForMultipleNotMatchedActions(r, r, source, cond, notMatchedActions)
89
74
90
- // there are only NOT MATCHED actions, use a left anti join to remove any matching rows
91
- // and switch to using a regular append instead of a row-level MERGE operation
92
- // only unmatched source rows that match action conditions are appended to the table
93
- val joinPlan = Join (source, r, LeftAnti , Some (cond), JoinHint .NONE )
94
-
95
- val notMatchedInstructions = notMatchedActions.map {
96
- case InsertAction (cond, assignments) =>
97
- Keep (Insert , cond.getOrElse(TrueLiteral ), assignments.map(_.value))
98
- case other =>
99
- throw new AnalysisException (
100
- errorClass = " _LEGACY_ERROR_TEMP_3053" ,
101
- messageParameters = Map (" other" -> other.toString))
102
- }
103
-
104
- val outputs = notMatchedInstructions.flatMap(_.outputs)
105
-
106
- // merge rows as there are multiple NOT MATCHED actions
107
- val mergeRows = MergeRows (
108
- isSourceRowPresent = TrueLiteral ,
109
- isTargetRowPresent = FalseLiteral ,
110
- matchedInstructions = Nil ,
111
- notMatchedInstructions = notMatchedInstructions,
112
- notMatchedBySourceInstructions = Nil ,
113
- checkCardinality = false ,
114
- output = generateExpandOutput(r.output, outputs),
115
- joinPlan)
116
-
117
- AppendData .byPosition(r, mergeRows)
75
+ case h @ ResolvedHint (r : DataSourceV2Relation , _) =>
76
+ validateMergeIntoConditions(m)
77
+ buildAppendDataPlanForMultipleNotMatchedActions(r, h, source, cond, notMatchedActions)
118
78
119
79
case _ =>
120
80
m
@@ -139,11 +99,92 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
139
99
notMatchedActions, notMatchedBySourceActions)
140
100
}
141
101
102
+ case h @ ResolvedHint (
103
+ r @ DataSourceV2Relation (tbl : SupportsRowLevelOperations , _, _, _, _), _) =>
104
+ validateMergeIntoConditions(m)
105
+ val table = buildOperationTable(tbl, MERGE , CaseInsensitiveStringMap .empty())
106
+ table.operation match {
107
+ case _ : SupportsDelta =>
108
+ buildWriteDeltaPlan(
109
+ r, table, source, cond, matchedActions,
110
+ notMatchedActions, notMatchedBySourceActions, Some (h))
111
+ case _ =>
112
+ buildReplaceDataPlan(
113
+ r, table, source, cond, matchedActions,
114
+ notMatchedActions, notMatchedBySourceActions, Some (h))
115
+ }
116
+
142
117
case _ =>
143
118
m
144
119
}
145
120
}
146
121
122
+ // build a rewrite plan for sources that support appending data
123
+ private def buildAppendDataPlan (
124
+ relation : DataSourceV2Relation ,
125
+ target : LogicalPlan ,
126
+ source : LogicalPlan ,
127
+ cond : Expression ,
128
+ notMatchedActions : Seq [MergeAction ]): AppendData = {
129
+ // NOT MATCHED conditions may only refer to columns in source so they can be pushed down
130
+ val insertAction = notMatchedActions.head.asInstanceOf [InsertAction ]
131
+ val filteredSource = insertAction.condition match {
132
+ case Some (insertCond) => Filter (insertCond, source)
133
+ case None => source
134
+ }
135
+
136
+ // there is only one NOT MATCHED action, use a left anti join to remove any matching rows
137
+ // and switch to using a regular append instead of a row-level MERGE operation
138
+ // only unmatched source rows that match the condition are appended to the table
139
+ val joinPlan = Join (filteredSource, target, LeftAnti , Some (cond), JoinHint .NONE )
140
+
141
+ val output = insertAction.assignments.map(_.value)
142
+ val outputColNames = relation.output.map(_.name)
143
+ val projectList = output.zip(outputColNames).map { case (expr, name) =>
144
+ Alias (expr, name)()
145
+ }
146
+ val project = Project (projectList, joinPlan)
147
+
148
+ AppendData .byPosition(relation, project)
149
+ }
150
+
151
+ // build a rewrite plan for sources that support appending data have multiple not matched actions
152
+ private def buildAppendDataPlanForMultipleNotMatchedActions (
153
+ relation : DataSourceV2Relation ,
154
+ target : LogicalPlan ,
155
+ source : LogicalPlan ,
156
+ cond : Expression ,
157
+ notMatchedActions : Seq [MergeAction ]): AppendData = {
158
+ // there are only NOT MATCHED actions, use a left anti join to remove any matching rows
159
+ // and switch to using a regular append instead of a row-level MERGE operation
160
+ // only unmatched source rows that match action conditions are appended to the table
161
+ val joinPlan = Join (source, target, LeftAnti , Some (cond), JoinHint .NONE )
162
+
163
+ val notMatchedInstructions = notMatchedActions.map {
164
+ case InsertAction (cond, assignments) =>
165
+ Keep (Insert , cond.getOrElse(TrueLiteral ), assignments.map(_.value))
166
+ case other =>
167
+ throw new AnalysisException (
168
+ errorClass = " _LEGACY_ERROR_TEMP_3053" ,
169
+ messageParameters = Map (" other" -> other.toString))
170
+ }
171
+
172
+ val outputs = notMatchedInstructions.flatMap(_.outputs)
173
+
174
+ // merge rows as there are multiple NOT MATCHED actions
175
+ val mergeRows = MergeRows (
176
+ isSourceRowPresent = TrueLiteral ,
177
+ isTargetRowPresent = FalseLiteral ,
178
+ matchedInstructions = Nil ,
179
+ notMatchedInstructions = notMatchedInstructions,
180
+ notMatchedBySourceInstructions = Nil ,
181
+ checkCardinality = false ,
182
+ output = generateExpandOutput(relation.output, outputs),
183
+ joinPlan)
184
+
185
+ AppendData .byPosition(relation, mergeRows)
186
+ }
187
+
147
188
// build a rewrite plan for sources that support replacing groups of data (e.g. files, partitions)
148
189
private def buildReplaceDataPlan (
149
190
relation : DataSourceV2Relation ,
@@ -152,7 +193,8 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
152
193
cond : Expression ,
153
194
matchedActions : Seq [MergeAction ],
154
195
notMatchedActions : Seq [MergeAction ],
155
- notMatchedBySourceActions : Seq [MergeAction ]): ReplaceData = {
196
+ notMatchedBySourceActions : Seq [MergeAction ],
197
+ hintOption : Option [ResolvedHint ] = None ): ReplaceData = {
156
198
157
199
// resolve all required metadata attrs that may be used for grouping data on write
158
200
// for instance, JDBC data source may cluster data by shard/host before writing
@@ -161,12 +203,16 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
161
203
// construct a read relation and include all required metadata columns
162
204
val readRelation = buildRelationWithAttrs(relation, operationTable, metadataAttrs)
163
205
206
+ val target = hintOption.map { resolvedHint =>
207
+ resolvedHint.withNewChildren(Seq (readRelation))
208
+ }.getOrElse(readRelation)
209
+
164
210
val checkCardinality = shouldCheckCardinality(matchedActions)
165
211
166
212
// use left outer join if there is no NOT MATCHED action, unmatched source rows can be discarded
167
213
// use full outer join in all other cases, unmatched source rows may be needed
168
214
val joinType = if (notMatchedActions.isEmpty) LeftOuter else FullOuter
169
- val joinPlan = join(readRelation , source, joinType, cond, checkCardinality)
215
+ val joinPlan = join(target , source, joinType, cond, checkCardinality)
170
216
171
217
val mergeRowsPlan = buildReplaceDataMergeRowsPlan(
172
218
readRelation, joinPlan, matchedActions, notMatchedActions,
@@ -260,7 +306,8 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
260
306
cond : Expression ,
261
307
matchedActions : Seq [MergeAction ],
262
308
notMatchedActions : Seq [MergeAction ],
263
- notMatchedBySourceActions : Seq [MergeAction ]): WriteDelta = {
309
+ notMatchedBySourceActions : Seq [MergeAction ],
310
+ hintOption : Option [ResolvedHint ] = None ): WriteDelta = {
264
311
265
312
val operation = operationTable.operation.asInstanceOf [SupportsDelta ]
266
313
@@ -279,11 +326,14 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
279
326
} else {
280
327
(readRelation, cond)
281
328
}
329
+ val target = hintOption.map { resolvedHint =>
330
+ resolvedHint.withNewChildren(Seq (filteredReadRelation))
331
+ }.getOrElse(filteredReadRelation)
282
332
283
333
val checkCardinality = shouldCheckCardinality(matchedActions)
284
334
285
335
val joinType = chooseWriteDeltaJoinType(notMatchedActions, notMatchedBySourceActions)
286
- val joinPlan = join(filteredReadRelation , source, joinType, joinCond, checkCardinality)
336
+ val joinPlan = join(target , source, joinType, joinCond, checkCardinality)
287
337
288
338
val mergeRowsPlan = buildWriteDeltaMergeRowsPlan(
289
339
readRelation, joinPlan, matchedActions, notMatchedActions,
0 commit comments