Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ object CometConf extends ShimCometConf {

val COMET_EXEC_CONFIG_PREFIX = "spark.comet.exec";

val COMET_EXPR_CONFIG_PREFIX = "spark.comet.expression";

val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled")
.doc(
"Whether to enable Comet extension for Spark. When this is turned on, Spark will use " +
Expand Down Expand Up @@ -228,8 +230,6 @@ object CometConf extends ShimCometConf {
createExecEnabledConfig("window", defaultValue = true)
val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("takeOrderedAndProject", defaultValue = true)
val COMET_EXEC_INITCAP_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("initCap", defaultValue = false)

val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")
Expand Down Expand Up @@ -664,6 +664,26 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(defaultValue)
}

/**
 * Returns true when native Comet support for the named expression is enabled. Expressions are
 * enabled unless explicitly turned off via `spark.comet.expression.NAME.enabled=false`.
 */
def isExprEnabled(name: String, conf: SQLConf = SQLConf.get): Boolean =
  getBooleanConf(getExprEnabledConfigKey(name), defaultValue = true, conf)

/**
 * Builds the per-expression enable/disable config key, e.g.
 * `spark.comet.expression.Add.enabled`.
 */
def getExprEnabledConfigKey(name: String): String =
  CometConf.COMET_EXPR_CONFIG_PREFIX + "." + name + ".enabled"

/**
 * Returns true when the named Spark-incompatible expression has been explicitly allowed via
 * `spark.comet.expression.NAME.allowIncompatible=true`. Defaults to false (fall back to Spark).
 */
def isExprAllowIncompat(name: String, conf: SQLConf = SQLConf.get): Boolean =
  getBooleanConf(getExprAllowIncompatConfigKey(name), defaultValue = false, conf)

/**
 * Builds the per-expression allow-incompatible config key, e.g.
 * `spark.comet.expression.InitCap.allowIncompatible`.
 */
def getExprAllowIncompatConfigKey(name: String): String =
  CometConf.COMET_EXPR_CONFIG_PREFIX + "." + name + ".allowIncompatible"

/**
 * Reads a boolean config value from the session conf, falling back to `defaultValue` when the
 * key is unset. The comparison is case-insensitive and locale-independent (Locale.ROOT), so
 * "TRUE"/"True"/"true" all count as true; anything else is false.
 */
def getBooleanConf(name: String, defaultValue: Boolean, conf: SQLConf): Boolean = {
  val raw = conf.getConfString(name, defaultValue.toString)
  "true" == raw.toLowerCase(Locale.ROOT)
}
}

object ConfigHelpers {
Expand Down
1 change: 0 additions & 1 deletion docs/source/user-guide/latest/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ Comet provides the following configuration settings.
| spark.comet.exec.filter.enabled | Whether to enable filter by default. | true |
| spark.comet.exec.globalLimit.enabled | Whether to enable globalLimit by default. | true |
| spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true |
| spark.comet.exec.initCap.enabled | Whether to enable initCap by default. | false |
| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true |
| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. When running Spark in off-heap mode, available pool types are 'unified' and `fair_unified`. The default pool type is `greedy_task_shared` for on-heap mode and `unified` for off-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | default |
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
Expand Down
11 changes: 9 additions & 2 deletions docs/source/user-guide/latest/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,15 @@ Comet supports the following Spark expressions. Expressions that are marked as S
natively in Comet and provide the same results as Spark, or will fall back to Spark for cases that would not
be compatible.

Expressions that are not Spark-compatible are disabled by default and can be enabled by setting
`spark.comet.expression.allowIncompatible=true`.
All expressions are enabled by default, but can be disabled by setting
`spark.comet.expression.EXPRNAME.enabled=false`, where `EXPRNAME` is the expression name as specified in
the following tables, such as `Length` or `StartsWith`.

Expressions that are not Spark-compatible will fall back to Spark by default and can be enabled by setting
`spark.comet.expression.EXPRNAME.allowIncompatible=true`.

It is also possible to specify `spark.comet.expression.allowIncompatible=true` to enable all
incompatible expressions.

## Conditional Expressions

Expand Down
31 changes: 26 additions & 5 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
Original file line number Diff line number Diff line change
Expand Up @@ -595,15 +595,24 @@ object QueryPlanSerde extends Logging with CometExprShim {
expr: Expression,
inputs: Seq[Attribute],
binding: Boolean): Option[Expr] = {
SQLConf.get
val conf = SQLConf.get

def convert[T <: Expression](expr: T, handler: CometExpressionSerde[T]): Option[Expr] = {
val exprConfName = handler.getExprConfigName(expr)
if (!CometConf.isExprEnabled(exprConfName)) {
withInfo(
expr,
"Expression support is disabled. Set " +
s"${CometConf.getExprEnabledConfigKey(exprConfName)}=true to enable it.")
return None
}
handler.getSupportLevel(expr) match {
case Unsupported(notes) =>
withInfo(expr, notes.getOrElse(""))
None
case Incompatible(notes) =>
if (CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) {
val exprAllowIncompat = CometConf.isExprAllowIncompat(exprConfName)
if (exprAllowIncompat || CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) {
if (notes.isDefined) {
logWarning(
s"Comet supports $expr when ${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true " +
Expand All @@ -615,8 +624,9 @@ object QueryPlanSerde extends Logging with CometExprShim {
withInfo(
expr,
s"$expr is not fully compatible with Spark$optionalNotes. To enable it anyway, " +
s"set ${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. " +
s"${CometConf.COMPAT_GUIDE}.")
s"set ${CometConf.getExprAllowIncompatConfigKey(exprConfName)}=true, or set " +
s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true to enable all " +
s"incompatible expressions. ${CometConf.COMPAT_GUIDE}.")
None
}
case Compatible(notes) =>
Expand All @@ -634,7 +644,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
exprToProtoInternal(Literal(value, dataType), inputs, binding)

case UnaryExpression(child) if expr.prettyName == "trycast" =>
val timeZoneId = SQLConf.get.sessionLocalTimeZone
val timeZoneId = conf.sessionLocalTimeZone
val cast = Cast(child, expr.dataType, Some(timeZoneId), EvalMode.TRY)
convert(cast, CometCast)

Expand Down Expand Up @@ -1988,6 +1998,17 @@ trait CometOperatorSerde[T <: SparkPlan] {
*/
trait CometExpressionSerde[T <: Expression] {

/**
 * Short name for the expression, used to build expression-specific config keys (such as the
 * keys that enable/disable the expression or allow it when incompatible).
 *
 * @param expr
 *   The Spark expression.
 * @return
 *   Short name for the expression, defaulting to the Spark class name
 */
def getExprConfigName(expr: T): String = {
  val exprClass = expr.getClass
  exprClass.getSimpleName
}

/**
* Determine the support level of the expression based on its attributes.
*
Expand Down
15 changes: 7 additions & 8 deletions spark/src/main/scala/org/apache/comet/serde/strings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,14 @@ object CometLower extends CometCaseConversionBase[Lower]("lower")

object CometInitCap extends CometScalarFunction[InitCap]("initcap") {

override def getSupportLevel(expr: InitCap): SupportLevel = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really incompatible?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes:

!== Correct Answer - 7 ==       == Spark Answer - 7 ==
 struct<initcap(name):string>   struct<initcap(name):string>
 [James Smith]                  [James Smith]
 [James Smith]                  [James Smith]
 [James Ähtäri]                 [James Ähtäri]
 [Michael Rose]                 [Michael Rose]
 [Rames Rose]                   [Rames Rose]
![Robert Rose-smith]            [Robert Rose-Smith]
 [Robert Williams]              [Robert Williams]

// Behavior differs from Spark. One example is that for the input "robert rose-smith", Spark
// will produce "Robert Rose-smith", but Comet will produce "Robert Rose-Smith".
// https://github.com/apache/datafusion-comet/issues/1052
Incompatible(None)
}

override def convert(expr: InitCap, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = {
if (!CometConf.COMET_EXEC_INITCAP_ENABLED.get()) {
withInfo(
expr,
"Comet initCap is not compatible with Spark yet. " +
"See https://github.com/apache/datafusion-comet/issues/1052 ." +
s"Set ${CometConf.COMET_EXEC_INITCAP_ENABLED.key}=true to enable it anyway.")
return None
}
super.convert(expr, inputs, binding)
}
}
Expand Down
36 changes: 35 additions & 1 deletion spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps
import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometProjectExec, CometWindowExec}
import org.apache.spark.sql.execution.{InputAdapter, ProjectExec, WholeStageCodegenExec}
import org.apache.spark.sql.execution.{InputAdapter, ProjectExec, SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
Expand Down Expand Up @@ -1301,6 +1301,40 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("disable expression using dynamic config") {
  // Counts Spark (non-Comet) ProjectExec nodes: a non-zero count means Comet fell back.
  def numSparkProjects(plan: SparkPlan): Int =
    plan.collect { case p: ProjectExec => p }.size
  withParquetTable(Seq(0, 1, 2).map(n => (n, n)), "tbl") {
    val sql = "select _1+_2 from tbl"
    // By default Add is enabled, so the projection runs natively (no Spark ProjectExec).
    val (_, cometPlan) = checkSparkAnswer(sql)
    assert(numSparkProjects(cometPlan) == 0)
    // Disabling Add via its dynamic per-expression config forces a fallback to Spark.
    withSQLConf(CometConf.getExprEnabledConfigKey("Add") -> "false") {
      val (_, cometPlan) = checkSparkAnswer(sql)
      assert(numSparkProjects(cometPlan) == 1)
    }
  }
}

test("enable incompat expression using dynamic config") {
  // Counts Spark (non-Comet) ProjectExec nodes: a non-zero count means Comet fell back.
  def numSparkProjects(plan: SparkPlan): Int =
    plan.collect { case p: ProjectExec => p }.size
  withParquetTable(Seq(0, 1, 2).map(n => (n.toString, n.toString)), "tbl") {
    val sql = "select initcap(_1) from tbl"
    // initcap is Spark-incompatible, so by default Comet falls back to Spark's ProjectExec.
    val (_, cometPlan) = checkSparkAnswer(sql)
    assert(numSparkProjects(cometPlan) == 1)
    // Explicitly allowing the incompatible expression lets it run natively.
    withSQLConf(CometConf.getExprAllowIncompatConfigKey("InitCap") -> "true") {
      val (_, cometPlan) = checkSparkAnswer(sql)
      assert(numSparkProjects(cometPlan) == 0)
    }
  }
}

test("signum") {
testDoubleScalarExpr("signum")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,7 @@ class CometStringExpressionSuite extends CometTestBase {
s"insert into $table values(1, 'james smith'), (2, 'michael rose'), " +
"(3, 'robert williams'), (4, 'rames rose'), (5, 'james smith'), " +
"(6, 'robert rose-smith'), (7, 'james ähtäri')")
if (CometConf.COMET_EXEC_INITCAP_ENABLED.get()) {
// TODO: remove this if clause https://github.com/apache/datafusion-comet/issues/1052
checkSparkAnswerAndOperator(s"SELECT initcap(name) FROM $table")
} else {
checkSparkAnswer(s"SELECT initcap(name) FROM $table")
}
checkSparkAnswer(s"SELECT initcap(name) FROM $table")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is #1052
resolved?
If so we should use checkSparkAnswerAndOperator

}
}

Expand Down
Loading