Skip to content

[SPARK-52235][SQL] Add implicit cast to DefaultValue V2 Expressions passed to DSV2 #50959

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ object AnsiTypeCoercion extends TypeCoercionBase {
UnpivotCoercion ::
WidenSetOperationTypes ::
ProcedureArgumentCoercion ::
DefaultValueExpressionCoercion ::
new AnsiCombinedTypeCoercionRule(
CollationTypeCasts ::
InConversion ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ object TypeCoercion extends TypeCoercionBase {
UnpivotCoercion ::
WidenSetOperationTypes ::
ProcedureArgumentCoercion ::
DefaultValueExpressionCoercion ::
new CombinedTypeCoercionRule(
CollationTypeCasts ::
InConversion ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,22 @@ import org.apache.spark.sql.catalyst.expressions.{
WindowSpecDefinition
}
import org.apache.spark.sql.catalyst.plans.logical.{
AddColumns,
AlterColumns,
Call,
CreateTable,
Except,
Intersect,
LogicalPlan,
Project,
ReplaceTable,
Union,
Unpivot
}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.CurrentOrigin.withOrigin
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
import org.apache.spark.sql.connector.catalog.procedures.BoundProcedure
import org.apache.spark.sql.types.DataType

Expand Down Expand Up @@ -81,6 +87,71 @@ abstract class TypeCoercionBase extends TypeCoercionHelper {
}
}

/**
* A type coercion rule that implicitly casts default value expressions in DDL statements
* to the expected column types.
*/
object DefaultValueExpressionCoercion extends Rule[LogicalPlan] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a comment for this new rule?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

/**
 * Implicitly casts default value expressions in DDL commands (CREATE TABLE,
 * REPLACE TABLE, ALTER TABLE ADD COLUMNS, ALTER TABLE ALTER COLUMN) to the
 * declared data type of their column.
 *
 * Each case fires only once the command is fully resolved and only when at
 * least one default value is actually present, so all other plans pass
 * through untouched.
 */
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
  case createTable @ CreateTable(_, cols, _, _, _) if createTable.resolved &&
      cols.exists(_.defaultValue.isDefined) =>
    val newCols = cols.map { c =>
      c.copy(defaultValue = c.defaultValue.map(d =>
        d.copy(child = coerceDefault(d.child, c.dataType, "CREATE TABLE", c.name,
          d.originalSQL))))
    }
    createTable.copy(columns = newCols)

  case replaceTable @ ReplaceTable(_, cols, _, _, _) if replaceTable.resolved &&
      cols.exists(_.defaultValue.isDefined) =>
    val newCols = cols.map { c =>
      c.copy(defaultValue = c.defaultValue.map(d =>
        d.copy(child = coerceDefault(d.child, c.dataType, "REPLACE TABLE", c.name,
          d.originalSQL))))
    }
    replaceTable.copy(columns = newCols)

  case addColumns @ AddColumns(_, cols) if addColumns.resolved &&
      cols.exists(_.default.isDefined) =>
    val newCols = cols.map { c =>
      c.copy(default = c.default.map(d =>
        d.copy(child = coerceDefault(d.child, c.dataType, "ALTER TABLE ADD COLUMNS",
          c.colName, d.originalSQL))))
    }
    addColumns.copy(columnsToAdd = newCols)

  case alterColumns @ AlterColumns(_, specs) if alterColumns.resolved &&
      specs.exists(_.newDefaultExpression.isDefined) =>
    val newSpecs = specs.map { c =>
      // The `alterColumns.resolved` guard above guarantees the column reference
      // has been resolved to a ResolvedFieldName carrying the target field's type.
      val dataType = c.column.asInstanceOf[ResolvedFieldName].field.dataType
      c.copy(newDefaultExpression = c.newDefaultExpression.map(d =>
        d.copy(child = coerceDefault(d.child, dataType, "ALTER TABLE ALTER COLUMN",
          c.column.name.quoted, d.originalSQL))))
    }
    alterColumns.copy(specs = newSpecs)
}

/**
 * Shared coercion path for all four DDL cases above.
 *
 * `castWiderOnlyLiterals` is false because this rule runs during analysis,
 * before constant folding, so the default value may still be a non-literal
 * expression at this point.
 *
 * @param expr          the default value expression to coerce
 * @param targetType    the declared data type of the target column
 * @param statementType human-readable statement name used in error messages
 * @param colName       the target column's name, for error reporting
 * @param originalSQL   the original SQL text of the default value
 */
private def coerceDefault(
    expr: Expression,
    targetType: DataType,
    statementType: String,
    colName: String,
    originalSQL: String): Expression = {
  ResolveDefaultColumns.coerceDefaultValue(
    expr,
    targetType,
    statementType,
    colName,
    originalSQL,
    castWiderOnlyLiterals = false)
}
}

/**
* Widens the data types of the [[Unpivot]] values.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,18 +432,34 @@ object ResolveDefaultColumns extends QueryErrorsBase
targetType: DataType,
colName: String): Option[Expression] = {
expr match {
case l: Literal if !Seq(targetType, l.dataType).exists(_ match {
case l: Literal => defaultValueFromWiderType(l, targetType, colName)
Copy link
Contributor Author

@szehon-ho szehon-ho May 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To explain this: for strings, the analyze() function in this class does the following:

  1. analyze / checkAnalyze
  2. constant folding
  3. cast

However, because we add the new rule in TypeCoercion (part of the Analyzer phase), it runs before constant folding. Hence, we relax the cast code to accept any Expression instead of only a Literal.

case _ => None
}
}

/**
* If the provided default value is an expression of a wider type than the target column,
* but its value fits within the narrower type, just coerce it for convenience.
* Exclude boolean/array/struct/map types from consideration for this type coercion to
* avoid surprising behavior like interpreting "false" as integer zero.
*/
def defaultValueFromWiderType(
expr: Expression,
targetType: DataType,
colName: String): Option[Expression] = {
expr match {
case e if !Seq(targetType, e.dataType).exists(_ match {
case _: BooleanType | _: ArrayType | _: StructType | _: MapType => true
case _ => false
}) =>
val casted = Cast(l, targetType, Some(conf.sessionLocalTimeZone), evalMode = EvalMode.TRY)
val casted = Cast(e, targetType, Some(conf.sessionLocalTimeZone), evalMode = EvalMode.TRY)
try {
Option(casted.eval(EmptyRow)).map(Literal(_, targetType))
} catch {
case e @ ( _: SparkThrowable | _: RuntimeException) =>
logWarning(log"Failed to cast default value '${MDC(COLUMN_DEFAULT_VALUE, l)}' " +
logWarning(log"Failed to cast default value '${MDC(COLUMN_DEFAULT_VALUE, e)}' " +
log"for column ${MDC(COLUMN_NAME, colName)} " +
log"from ${MDC(COLUMN_DATA_TYPE_SOURCE, l.dataType)} " +
log"from ${MDC(COLUMN_DATA_TYPE_SOURCE, expr.dataType)} " +
log"to ${MDC(COLUMN_DATA_TYPE_TARGET, targetType)} " +
log"due to ${MDC(ERROR, e.getMessage)}", e)
None
Expand All @@ -461,7 +477,8 @@ object ResolveDefaultColumns extends QueryErrorsBase
dataType: DataType,
statementType: String,
colName: String,
defaultSQL: String): Expression = {
defaultSQL: String,
castWiderOnlyLiterals: Boolean = true): Expression = {
val supplanted = CharVarcharUtils.replaceCharVarcharWithString(dataType)
// Perform implicit coercion from the provided expression type to the required column type.
val ret = analyzed match {
Expand All @@ -470,7 +487,12 @@ object ResolveDefaultColumns extends QueryErrorsBase
case canUpCast if Cast.canUpCast(canUpCast.dataType, supplanted) =>
Cast(analyzed, supplanted, Some(conf.sessionLocalTimeZone))
case other =>
defaultValueFromWiderTypeLiteral(other, supplanted, colName).getOrElse(
val casted = if (castWiderOnlyLiterals) {
defaultValueFromWiderTypeLiteral(other, supplanted, colName)
} else {
defaultValueFromWiderType(other, supplanted, colName)
}
casted.getOrElse(
throw QueryCompilationErrors.defaultValuesDataTypeError(
statementType, colName, defaultSQL, dataType, other.dataType))
}
Expand Down
Loading