[BUG][Spark] Optimization of subqueries involving date columns with spark.sql.datetime.java8API.enabled set to true causes a ClassCastException error #4201

mikoloay opened this issue Feb 28, 2025 · 0 comments
Labels: bug (Something isn't working)


Bug

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Describe the problem

Hello, our team observed a bug in a Delta query optimization that appears when the following conditions are met:

  1. spark.sql.datetime.java8API.enabled is set to "true"
  2. We read a DataFrame from a Delta table containing a date column (the dates are represented as java.time.LocalDate)
  3. We filter the DataFrame using a subquery that performs an aggregation such as MIN on the date column (please see the code snippet in the next section)

The subquery gets optimized by the logic in the OptimizeMetadataOnlyDeltaQuery trait. That trait contains the convertValueIfRequired function, which casts DateType values to java.sql.Date. With the Java 8 datetime API enabled, the value is a java.time.LocalDate instead, so the cast fails with a ClassCastException.

The convertValueIfRequired function was added in the Delta-Spark 3.1.0 release and it exists in all releases >=3.1.0.
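For illustration, the conversion performed by the optimization behaves roughly like the following (a simplified sketch, not the exact Delta source; the real implementation is convertValueIfRequired in OptimizeMetadataOnlyDeltaQuery.scala):

import java.sql.Date
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types.{DataType, DateType}

// Simplified sketch: the MIN/MAX value collected from Delta statistics arrives as Any.
// With spark.sql.datetime.java8API.enabled=true it is a java.time.LocalDate,
// so the unconditional cast to java.sql.Date throws the ClassCastException shown below.
def convertValueIfRequired(dataType: DataType, value: Any): Any = dataType match {
  case DateType if value != null =>
    DateTimeUtils.fromJavaDate(value.asInstanceOf[Date]) // fails for LocalDate
  case _ => value
}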

Steps to reproduce

I prepared a Scala script that can be executed with scala-cli and reproduces the reported error. The bug appears when the (SELECT MIN(DATE_COL) FROM MY_TABLE) subquery is executed.

//> using scala 2.13
//> using dep org.apache.spark::spark-sql:3.5.5
//> using dep io.delta::delta-spark:3.3.0

import org.apache.spark.sql.SparkSession

object Main extends App {
  val spark = SparkSession.builder()
    .appName("TestDeltaDateConversion")
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.datetime.java8API.enabled", "true")
    .getOrCreate()

  // Create a single-row Delta table with a date column.
  spark.sql("SELECT DATE '2025-01-01' as DATE_COL").write.format("delta").mode("overwrite").save("sourceData")

  val df = spark.read.format("delta").load("sourceData")
  df.createOrReplaceTempView("MY_TABLE")

  // The scalar subquery on the date column triggers the metadata-only optimization.
  val dfFiltered = df.where("DATE_COL >= (SELECT MIN(DATE_COL) FROM MY_TABLE)")
  dfFiltered.show()
}
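
For reference, the script can be run with scala-cli (assuming it is saved as a file named reproduce.scala; the name is arbitrary):

scala-cli run reproduce.scala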

Observed results

The following error appears:

Exception in thread "main" java.lang.ClassCastException: class java.time.LocalDate cannot be cast to class java.sql.Date (java.time.LocalDate is in module java.base of loader 'bootstrap'; java.sql.Date is in module java.sql of loader 'platform')
        at org.apache.spark.sql.delta.perf.OptimizeMetadataOnlyDeltaQuery.org$apache$spark$sql$delta$perf$OptimizeMetadataOnlyDeltaQuery$$convertValueIfRequired$1(OptimizeMetadataOnlyDeltaQuery.scala:75)
        at org.apache.spark.sql.delta.perf.OptimizeMetadataOnlyDeltaQuery$$anonfun$1.applyOrElse(OptimizeMetadataOnlyDeltaQuery.scala:91)
        at org.apache.spark.sql.delta.perf.OptimizeMetadataOnlyDeltaQuery$$anonfun$1.applyOrElse(OptimizeMetadataOnlyDeltaQuery.scala:81)
        at scala.collection.immutable.List.collect(List.scala:268)
        at scala.collection.immutable.List.collect(List.scala:79)
        at org.apache.spark.sql.delta.perf.OptimizeMetadataOnlyDeltaQuery.org$apache$spark$sql$delta$perf$OptimizeMetadataOnlyDeltaQuery$$createLocalRelationPlan(OptimizeMetadataOnlyDeltaQuery.scala:81)
        at org.apache.spark.sql.delta.perf.OptimizeMetadataOnlyDeltaQuery$$anonfun$optimizeQueryWithMetadata$1.applyOrElse(OptimizeMetadataOnlyDeltaQuery.scala:52)
        at org.apache.spark.sql.delta.perf.OptimizeMetadataOnlyDeltaQuery$$anonfun$optimizeQueryWithMetadata$1.applyOrElse(OptimizeMetadataOnlyDeltaQuery.scala:50)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformUpWithSubqueries$1.applyOrElse(QueryPlan.scala:501)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformUpWithSubqueries$1.applyOrElse(QueryPlan.scala:495)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:515)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:515)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUpWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning(AnalysisHelper.scala:279)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning$(AnalysisHelper.scala:275)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:488)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformUpWithSubqueries(QueryPlan.scala:495)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformUpWithSubqueries$1$$anonfun$1.applyOrElse(QueryPlan.scala:498)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformUpWithSubqueries$1$$anonfun$1.applyOrElse(QueryPlan.scala:496)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:515)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:515)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1244)
        at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1241)
        at org.apache.spark.sql.catalyst.expressions.BinaryExpression.mapChildren(Expression.scala:653)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1244)
        at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1241)
        at org.apache.spark.sql.catalyst.expressions.BinaryExpression.mapChildren(Expression.scala:653)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUpWithPruning$1(QueryPlan.scala:198)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:210)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:210)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:221)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:231)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:231)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUpWithPruning(QueryPlan.scala:198)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:178)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformUpWithSubqueries$1.applyOrElse(QueryPlan.scala:496)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformUpWithSubqueries$1.applyOrElse(QueryPlan.scala:495)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:515)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:515)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUpWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning(AnalysisHelper.scala:279)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning$(AnalysisHelper.scala:275)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1216)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1215)
        at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:71)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUpWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning(AnalysisHelper.scala:279)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning$(AnalysisHelper.scala:275)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1216)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1215)
        at org.apache.spark.sql.catalyst.plans.logical.LocalLimit.mapChildren(basicLogicalOperators.scala:1608)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUpWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning(AnalysisHelper.scala:279)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning$(AnalysisHelper.scala:275)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1216)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1215)
        at org.apache.spark.sql.catalyst.plans.logical.GlobalLimit.mapChildren(basicLogicalOperators.scala:1587)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUpWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning(AnalysisHelper.scala:279)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUpWithPruning$(AnalysisHelper.scala:275)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUpWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:488)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformUpWithSubqueries(QueryPlan.scala:495)
        at org.apache.spark.sql.delta.perf.OptimizeMetadataOnlyDeltaQuery.optimizeQueryWithMetadata(OptimizeMetadataOnlyDeltaQuery.scala:50)
        at org.apache.spark.sql.delta.perf.OptimizeMetadataOnlyDeltaQuery.optimizeQueryWithMetadata$(OptimizeMetadataOnlyDeltaQuery.scala:49)
        at org.apache.spark.sql.delta.stats.PrepareDeltaScan.optimizeQueryWithMetadata(PrepareDeltaScan.scala:306)
        at org.apache.spark.sql.delta.stats.PrepareDeltaScanBase.apply(PrepareDeltaScan.scala:203)
        at org.apache.spark.sql.delta.stats.PrepareDeltaScanBase.apply$(PrepareDeltaScan.scala:186)
        at org.apache.spark.sql.delta.stats.PrepareDeltaScan.apply(PrepareDeltaScan.scala:306)
        at org.apache.spark.sql.delta.stats.PrepareDeltaScan.apply(PrepareDeltaScan.scala:306)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
        at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:183)
        at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:179)
        at scala.collection.immutable.List.foldLeft(List.scala:79)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
        at scala.collection.immutable.List.foreach(List.scala:334)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:152)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:148)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:144)
        at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:162)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:182)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:179)
        at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:238)
        at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:284)
        at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:252)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:117)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:3316)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:3539)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:838)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:797)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:806)

Expected results

Such subqueries shouldn't raise exceptions; java.time.LocalDate values should be recognized and converted appropriately.
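
With the reproduction script above, dfFiltered.show() would simply be expected to print the single written row, along the lines of:

+----------+
|  DATE_COL|
+----------+
|2025-01-01|
+----------+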

Further details

Not applicable.

Environment information

  • Delta Lake version: 3.3.0
  • Spark version: 3.5.5
  • Scala version: 2.13.16

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • Yes. I can contribute a fix for this bug independently.
  • Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • No. I cannot contribute a bug fix at this time.

The fix in the OptimizeMetadataOnlyDeltaQuery.convertValueIfRequired function should be quite simple: when the cast to java.sql.Date fails, we could catch the exception, treat the value as a java.time.LocalDate, and use DateTimeUtils.anyToDays instead of DateTimeUtils.fromJavaDate. I'll gladly prepare such a fix if you confirm that this is the right approach.
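
A rough sketch of what such a change could look like (illustrative only, not the actual Delta source; it assumes DateTimeUtils.anyToDays accepts both java.sql.Date and java.time.LocalDate, as described above):

import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types.{DataType, DateType}

def convertValueIfRequired(dataType: DataType, value: Any): Any = dataType match {
  case DateType if value != null =>
    // anyToDays dispatches on the runtime type of the value, so it covers both
    // java.sql.Date and java.time.LocalDate regardless of the java8API setting.
    DateTimeUtils.anyToDays(value)
  case _ => value
}

Whether this is done with a pattern match as above or with the try-catch described earlier, the key point is that the conversion should no longer assume java.sql.Date.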
