@@ -49,11 +49,9 @@ case class OffloadDeltaCommand() extends OffloadSingleNode with DeltaCommand {
     }
   }
 
-  // Currently only plain OPTIMIZE bin-packing is supported for command offload. OPTIMIZE
-  // variants with layout-specific semantics, such as ZORDER, REORG, OPTIMIZE FULL, or
-  // liquid clustering, continue to use Delta's original command path.
+  // Currently OPTIMIZE bin-packing and ZORDER are supported for command offload.
+  // REORG, OPTIMIZE FULL, and liquid clustering continue to use Delta's original command path.
   private def shouldOffloadOptimize(optimize: OptimizeTableCommand): Boolean = {
-    optimize.zOrderBy.isEmpty &&
     optimize.optimizeContext.reorg.isEmpty &&
     !optimize.optimizeContext.isFull &&
     !isClusteredOptimize(optimize)
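For context, here is an editor's sketch of how the updated predicate routes common OPTIMIZE variants. It is grounded only in the comment above and is not code from this PR; the table path is a placeholder.

// Sketch: commands on either side of shouldOffloadOptimize after this change.
val offloadedCommands = Seq(
  "OPTIMIZE delta.`/tmp/t`", // plain bin-packing
  "OPTIMIZE delta.`/tmp/t` ZORDER BY (id)") // ZORDER, newly offloaded
val deltaPathCommands = Seq(
  "OPTIMIZE delta.`/tmp/t` FULL", // OPTIMIZE FULL
  "REORG TABLE delta.`/tmp/t` APPLY (PURGE)") // REORG
// Liquid-clustering OPTIMIZE (isClusteredOptimize) also stays on Delta's path.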
@@ -263,6 +263,119 @@ class DeltaNativeWriteSuite extends DeltaSQLCommandTest {
     }
   }
 
test("native delta optimize zorder command should be offloaded") {
withNativeWriteOffloadConf {
withTempDir {
dir =>
val path = dir.getCanonicalPath
spark
.range(0, 128, 1, 8)
.selectExpr("id", "cast(id % 4 as int) as part")
.write
.format("delta")
.mode("append")
.save(path)
spark
.range(128, 256, 1, 8)
.selectExpr("id", "cast(id % 4 as int) as part")
.write
.format("delta")
.mode("append")
.save(path)

val deltaLog = DeltaLog.forTable(spark, path)
val beforeFiles = files(deltaLog)

withSQLConf(
DeltaSQLConf.DELTA_OPTIMIZE_ZORDER_COL_STAT_CHECK.key -> "false",
DeltaSQLConf.MDC_ADD_NOISE.key -> "false") {
val optimizeDf = sql(s"OPTIMIZE delta.`$path` ZORDER BY (id, part)")
assertContainsNativeWriteCommand(
optimizeDf.queryExecution.executedPlan,
"OPTIMIZE ZORDER BY")
val metrics = collectOptimizeMetrics(optimizeDf)

val afterFiles = files(deltaLog)
assertCompactionMetrics(metrics, beforeFiles.size, afterFiles.size, "OPTIMIZE ZORDER")
assertOptimizeCommit(deltaLog, "OPTIMIZE ZORDER")
}

val result = spark.read.format("delta").load(path)
val summary = result.selectExpr("count(*)", "min(id)", "max(id)").head()
assert(summary.getLong(0) == 256L)
assert(summary.getLong(1) == 0L)
assert(summary.getLong(2) == 255L)
}
}
}

test("native delta optimize zorder partition predicate command should be offloaded") {
withNativeWriteOffloadConf {
withTempDir {
dir =>
val path = dir.getCanonicalPath
spark
.range(0, 40, 1, 4)
.selectExpr("id", "cast(id % 2 as int) as part")
.write
.format("delta")
.partitionBy("part")
.mode("append")
.save(path)
spark
.range(40, 80, 1, 4)
.selectExpr("id", "cast(id % 2 as int) as part")
.write
.format("delta")
.partitionBy("part")
.mode("append")
.save(path)

val deltaLog = DeltaLog.forTable(spark, path)
val beforeFiles = files(deltaLog)
val beforePart0Paths = beforeFiles
.filter(_.partitionValues.get("part").contains("0"))
.map(_.path)
val beforePart1Count = beforeFiles.count(_.partitionValues.get("part").contains("1"))

withSQLConf(
DeltaSQLConf.DELTA_OPTIMIZE_ZORDER_COL_STAT_CHECK.key -> "false",
DeltaSQLConf.MDC_ADD_NOISE.key -> "false") {
val optimizeDf = sql(s"OPTIMIZE delta.`$path` WHERE part = 1 ZORDER BY (id)")
assertContainsNativeWriteCommand(
optimizeDf.queryExecution.executedPlan,
"OPTIMIZE WHERE ZORDER BY")
val metrics = collectOptimizeMetrics(optimizeDf)

val afterFiles = files(deltaLog)
val afterPart0Paths = afterFiles
.filter(_.partitionValues.get("part").contains("0"))
.map(_.path)
val afterPart1Count = afterFiles.count(_.partitionValues.get("part").contains("1"))
assert(
beforePart0Paths.subsetOf(afterPart0Paths),
"OPTIMIZE WHERE part = 1 ZORDER should not remove files from part = 0")
assert(
afterPart1Count < beforePart1Count,
s"Expected fewer active files in part = 1, before=$beforePart1Count " +
s"after=$afterPart1Count")
assertCompactionMetrics(
metrics,
beforeFiles.size,
afterFiles.size,
"partition predicate OPTIMIZE ZORDER",
expectedPartitionsOptimized = Some(1L))
assertOptimizeCommit(deltaLog, "partition predicate OPTIMIZE ZORDER")
}

val result = spark.read.format("delta").load(path)
assert(result.select("id").collect().map(_.getLong(0)).toSet == (0L until 80L).toSet)
assert(result.where("part = 0").count() == 40)
assert(result.where("part = 1").count() == 40)
}
}
}

test("delta optimize command should not be offloaded when native write is disabled") {
withNativeWriteOffloadConf {
withTempDir {
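The tests above lean on suite helpers that sit outside this hunk. Below is a minimal editor's sketch of what `files` plausibly does, assuming it lists the active AddFile actions of the table's latest snapshot; the actual helper may differ.

import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.actions.AddFile

// Sketch: active files of the latest snapshot, via Delta's Snapshot API.
def files(deltaLog: DeltaLog): Seq[AddFile] =
  deltaLog.update().allFiles.collect().toSeq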
@@ -468,6 +468,119 @@ class DeltaNativeWriteSuite extends DeltaSQLCommandTest {
     }
   }
 
test("native delta optimize zorder command should be offloaded") {
withNativeWriteOffloadConf {
withTempDir {
dir =>
val path = dir.getCanonicalPath
spark
.range(0, 128, 1, 8)
.selectExpr("id", "cast(id % 4 as int) as part")
.write
.format("delta")
.mode("append")
.save(path)
spark
.range(128, 256, 1, 8)
.selectExpr("id", "cast(id % 4 as int) as part")
.write
.format("delta")
.mode("append")
.save(path)

val deltaLog = DeltaLog.forTable(spark, path)
val beforeFiles = files(deltaLog)

withSQLConf(
DeltaSQLConf.DELTA_OPTIMIZE_ZORDER_COL_STAT_CHECK.key -> "false",
DeltaSQLConf.MDC_ADD_NOISE.key -> "false") {
val optimizeDf = sql(s"OPTIMIZE delta.`$path` ZORDER BY (id, part)")
assertContainsNativeWriteCommand(
Seq(optimizeDf.queryExecution.executedPlan),
"OPTIMIZE ZORDER BY")
val metrics = collectOptimizeMetrics(optimizeDf)

val afterFiles = files(deltaLog)
assertCompactionMetrics(metrics, beforeFiles.size, afterFiles.size, "OPTIMIZE ZORDER")
assertOptimizeCommit(deltaLog, "OPTIMIZE ZORDER")
}

val result = spark.read.format("delta").load(path)
val summary = result.selectExpr("count(*)", "min(id)", "max(id)").head()
assert(summary.getLong(0) == 256L)
assert(summary.getLong(1) == 0L)
assert(summary.getLong(2) == 255L)
}
}
}

test("native delta optimize zorder partition predicate command should be offloaded") {
withNativeWriteOffloadConf {
withTempDir {
dir =>
val path = dir.getCanonicalPath
spark
.range(0, 40, 1, 4)
.selectExpr("id", "cast(id % 2 as int) as part")
.write
.format("delta")
.partitionBy("part")
.mode("append")
.save(path)
spark
.range(40, 80, 1, 4)
.selectExpr("id", "cast(id % 2 as int) as part")
.write
.format("delta")
.partitionBy("part")
.mode("append")
.save(path)

val deltaLog = DeltaLog.forTable(spark, path)
val beforeFiles = files(deltaLog)
val beforePart0Paths = beforeFiles
.filter(_.partitionValues.get("part").contains("0"))
.map(_.path)
val beforePart1Count = beforeFiles.count(_.partitionValues.get("part").contains("1"))

withSQLConf(
DeltaSQLConf.DELTA_OPTIMIZE_ZORDER_COL_STAT_CHECK.key -> "false",
DeltaSQLConf.MDC_ADD_NOISE.key -> "false") {
val optimizeDf = sql(s"OPTIMIZE delta.`$path` WHERE part = 1 ZORDER BY (id)")
assertContainsNativeWriteCommand(
Seq(optimizeDf.queryExecution.executedPlan),
"OPTIMIZE WHERE ZORDER BY")
val metrics = collectOptimizeMetrics(optimizeDf)

val afterFiles = files(deltaLog)
val afterPart0Paths = afterFiles
.filter(_.partitionValues.get("part").contains("0"))
.map(_.path)
val afterPart1Count = afterFiles.count(_.partitionValues.get("part").contains("1"))
assert(
beforePart0Paths.subsetOf(afterPart0Paths),
"OPTIMIZE WHERE part = 1 ZORDER should not remove files from part = 0")
assert(
afterPart1Count < beforePart1Count,
s"Expected fewer active files in part = 1, before=$beforePart1Count " +
s"after=$afterPart1Count")
assertCompactionMetrics(
metrics,
beforeFiles.size,
afterFiles.size,
"partition predicate OPTIMIZE ZORDER",
expectedPartitionsOptimized = Some(1L))
assertOptimizeCommit(deltaLog, "partition predicate OPTIMIZE ZORDER")
}

val result = spark.read.format("delta").load(path)
assert(result.select("id").collect().map(_.getLong(0)).toSet == (0L until 80L).toSet)
assert(result.where("part = 0").count() == 40)
assert(result.where("part = 1").count() == 40)
}
}
}

test("delta optimize command should not be offloaded when native write is disabled") {
withNativeWriteOffloadConf {
withTempDir {
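Similarly, a hedged sketch of `collectOptimizeMetrics`, assuming the helper extracts the metrics struct from the DataFrame that OPTIMIZE returns; Delta's OPTIMIZE result schema is (path, metrics), where the struct carries fields such as numFilesAdded, numFilesRemoved, and partitionsOptimized that `assertCompactionMetrics` would check. Illustrative only, not the suite's actual code.

import org.apache.spark.sql.{DataFrame, Row}

// Sketch: flatten the metrics struct of the single OPTIMIZE result row.
def collectOptimizeMetrics(optimizeDf: DataFrame): Row =
  optimizeDf.select("metrics.*").head()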