Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -365,24 +365,24 @@ case class ReplaceDataExec(
copy(query = newChild)
}

override protected def getWriteSummary(query: SparkPlan): Option[WriteSummary] = {
if (rowLevelCommand == DELETE) {
// DELETE ReplaceData plans filter out the deleted rows early in the plan, and they don't
// reach this node. We need to calculate this value as numScannedRows - numCopiedRows.
val numScannedRows = collectFirst(query) {
case b: BatchScanExec if b.table.isInstanceOf[RowLevelOperationTable] =>
getMetricValue(b.metrics, "numOutputRows")
}
val numCopiedRows = getMetricValue(metrics, "numCopiedRows")
val numDeletedRows = if (numScannedRows.exists(_ >= 0) && numCopiedRows >= 0) {
numScannedRows.get - numCopiedRows
} else {
// One of the metrics couldn't be found, also mark numDeletedRows as not found.
-1L
}
metrics("numDeletedRows").set(numDeletedRows)
override protected def getDeleteSummary(): Option[DeleteSummaryImpl] = {
// DELETE ReplaceData plans filter out the deleted rows early in the plan, and they don't
// reach this node. We need to calculate this value as numScannedRows - numCopiedRows.
val numScannedRows = collectFirst(query) {
case b: BatchScanExec if b.table.isInstanceOf[RowLevelOperationTable] =>
getMetricValue(b.metrics, "numOutputRows")
}
super.getWriteSummary(query)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: do we have variable shadowing in getWriteSummary? Does its query parameter shadow the query field on the class? If so, do we need to pass query at all?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we did shadow it. I removed the parameter.

val numCopiedRows = getMetricValue(sparkMetrics, "numCopiedRows")
val numDeletedRows = if (numScannedRows.exists(_ >= 0) && numCopiedRows >= 0) {
numScannedRows.get - numCopiedRows
} else {
// One of the metrics couldn't be found, also mark numDeletedRows as not found.
-1L
}

// SQLMetric.set is a no-op if value is -1, leaving the metric in its invalid state.
sparkMetrics("numDeletedRows").set(numDeletedRows)
super.getDeleteSummary().map(_.copy(numDeletedRows = numDeletedRows))
}
}

Expand Down Expand Up @@ -496,31 +496,40 @@ trait RowLevelWriteExec extends V2ExistingTableWriteExec {
metrics.get(name).map(_.value).getOrElse(-1L)
}

override protected def getWriteSummary(query: SparkPlan): Option[WriteSummary] = {
override protected def getWriteSummary(): Option[WriteSummary] = {
rowLevelCommand match {
case MERGE =>
collectFirst(query) { case m: MergeRowsExec => m }.map { n =>
val metrics = n.metrics
MergeSummaryImpl(
getMetricValue(metrics, "numTargetRowsCopied"),
getMetricValue(metrics, "numTargetRowsDeleted"),
getMetricValue(metrics, "numTargetRowsUpdated"),
getMetricValue(metrics, "numTargetRowsInserted"),
getMetricValue(metrics, "numTargetRowsMatchedUpdated"),
getMetricValue(metrics, "numTargetRowsMatchedDeleted"),
getMetricValue(metrics, "numTargetRowsNotMatchedBySourceUpdated"),
getMetricValue(metrics, "numTargetRowsNotMatchedBySourceDeleted"))
}
case UPDATE =>
Some(UpdateSummaryImpl(
getMetricValue(sparkMetrics, "numUpdatedRows"),
getMetricValue(sparkMetrics, "numCopiedRows")))
case DELETE =>
Some(DeleteSummaryImpl(
getMetricValue(sparkMetrics, "numDeletedRows"),
getMetricValue(sparkMetrics, "numCopiedRows")))
case MERGE => getMergeSummary()
case UPDATE => getUpdateSummary()
case DELETE => getDeleteSummary()
}
}

/**
 * Builds the MERGE write summary from the metrics of the [[MergeRowsExec]] node found in the
 * executed plan, if any. Returns None when no such node is present (e.g. the node was optimized
 * away), in which case no summary can be reported.
 */
protected def getMergeSummary(): Option[MergeSummaryImpl] = {
  val mergeNode = collectFirst(query) { case m: MergeRowsExec => m }
  mergeNode.map { node =>
    val nodeMetrics = node.metrics
    // Local shorthand; getMetricValue yields -1 when the metric is absent.
    def metric(name: String): Long = getMetricValue(nodeMetrics, name)
    MergeSummaryImpl(
      metric("numTargetRowsCopied"),
      metric("numTargetRowsDeleted"),
      metric("numTargetRowsUpdated"),
      metric("numTargetRowsInserted"),
      metric("numTargetRowsMatchedUpdated"),
      metric("numTargetRowsMatchedDeleted"),
      metric("numTargetRowsNotMatchedBySourceUpdated"),
      metric("numTargetRowsNotMatchedBySourceDeleted"))
  }
}

/**
 * Builds the UPDATE write summary from this node's SQL metrics
 * ("numUpdatedRows" / "numCopiedRows"); missing metrics surface as -1 via getMetricValue.
 */
protected def getUpdateSummary(): Option[UpdateSummaryImpl] = {
  val updatedRows = getMetricValue(sparkMetrics, "numUpdatedRows")
  val copiedRows = getMetricValue(sparkMetrics, "numCopiedRows")
  Some(UpdateSummaryImpl(updatedRows, copiedRows))
}

/**
 * Builds the DELETE write summary from this node's SQL metrics
 * ("numDeletedRows" / "numCopiedRows"); missing metrics surface as -1 via getMetricValue.
 */
protected def getDeleteSummary(): Option[DeleteSummaryImpl] = {
  val deletedRows = getMetricValue(sparkMetrics, "numDeletedRows")
  val copiedRows = getMetricValue(sparkMetrics, "numCopiedRows")
  Some(DeleteSummaryImpl(deletedRows, copiedRows))
}
}

/**
Expand Down Expand Up @@ -582,7 +591,7 @@ trait V2TableWriteExec
}
)

val writeSummary = getWriteSummary(query)
val writeSummary = getWriteSummary()
logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} is committing.")
writeSummary match {
case Some(summary) => batchWrite.commit(messages, summary)
Expand Down Expand Up @@ -610,7 +619,7 @@ trait V2TableWriteExec
Nil
}

protected def getWriteSummary(query: SparkPlan): Option[WriteSummary] = None
protected def getWriteSummary(): Option[WriteSummary] = None
}

trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,29 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase {
Row(2, 200, "software")))
}

// `x NOT IN (<empty subquery>)` is vacuously true for every row, so this DELETE removes all
// rows (checkAnswer expects an empty table). The test also pins the metric behavior when the
// scan node is optimized out of the plan.
test("delete with NOT IN over empty subquery") {
withTempView("empty_subq") {
createAndInitTable("pk INT NOT NULL, id INT NOT NULL, dep STRING",
"""{ "pk": 1, "id": 1, "dep": "hr" }
|{ "pk": 2, "id": 2, "dep": "hr" }
|{ "pk": 3, "id": 3, "dep": "hr" }
|""".stripMargin)

// Empty source for the subquery: NOT IN over it matches all 3 inserted rows.
Seq.empty[Int].toDF("v").createOrReplaceTempView("empty_subq")

sql(
s"""DELETE FROM $tableNameAsString
|WHERE id NOT IN (SELECT v FROM empty_subq)
|""".stripMargin)

// All rows must be gone after the delete.
checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Nil)
// The filter gets replaced by an EmptyRelation in the ReplaceData executed plan, which hides
// the executed BatchScan and prevents computing numDeletedRows using numOutputRows of the
// scan node.
// Hence: delta-based deletes can still report 3 deleted rows, while the ReplaceData path
// reports -1 ("unknown"); numCopiedRows is 0 either way since nothing is rewritten.
checkDeleteMetrics(numDeletedRows = if (deltaDelete) 3 else -1, numCopiedRows = 0)
}
}

private def executeDeleteWithFilters(query: String): Unit = {
val executedPlan = executeAndKeepPlan {
sql(query)
Expand Down