Skip to content

Commit 9cb0cc4

Browse files
authored
feat: Add dynamic enabled and allowIncompat configs for all supported expressions (#2329) (#2385)
1 parent 68c1d33 commit 9cb0cc4

File tree

7 files changed

+100
-25
lines changed

7 files changed

+100
-25
lines changed

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ object CometConf extends ShimCometConf {
6565

6666
val COMET_EXEC_CONFIG_PREFIX = "spark.comet.exec";
6767

68+
val COMET_EXPR_CONFIG_PREFIX = "spark.comet.expression";
69+
6870
val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled")
6971
.doc(
7072
"Whether to enable Comet extension for Spark. When this is turned on, Spark will use " +
@@ -228,8 +230,6 @@ object CometConf extends ShimCometConf {
228230
createExecEnabledConfig("window", defaultValue = true)
229231
val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
230232
createExecEnabledConfig("takeOrderedAndProject", defaultValue = true)
231-
val COMET_EXEC_INITCAP_ENABLED: ConfigEntry[Boolean] =
232-
createExecEnabledConfig("initCap", defaultValue = false)
233233

234234
val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] =
235235
conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")
@@ -664,6 +664,26 @@ object CometConf extends ShimCometConf {
664664
.booleanConf
665665
.createWithDefault(defaultValue)
666666
}
667+
668+
def isExprEnabled(name: String, conf: SQLConf = SQLConf.get): Boolean = {
669+
getBooleanConf(getExprEnabledConfigKey(name), defaultValue = true, conf)
670+
}
671+
672+
def getExprEnabledConfigKey(name: String): String = {
673+
s"${CometConf.COMET_EXPR_CONFIG_PREFIX}.$name.enabled"
674+
}
675+
676+
def isExprAllowIncompat(name: String, conf: SQLConf = SQLConf.get): Boolean = {
677+
getBooleanConf(getExprAllowIncompatConfigKey(name), defaultValue = false, conf)
678+
}
679+
680+
def getExprAllowIncompatConfigKey(name: String): String = {
681+
s"${CometConf.COMET_EXPR_CONFIG_PREFIX}.$name.allowIncompatible"
682+
}
683+
684+
def getBooleanConf(name: String, defaultValue: Boolean, conf: SQLConf): Boolean = {
685+
conf.getConfString(name, defaultValue.toString).toLowerCase(Locale.ROOT) == "true"
686+
}
667687
}
668688

669689
object ConfigHelpers {

docs/source/user-guide/latest/configs.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ Comet provides the following configuration settings.
4848
| spark.comet.exec.filter.enabled | Whether to enable filter by default. | true |
4949
| spark.comet.exec.globalLimit.enabled | Whether to enable globalLimit by default. | true |
5050
| spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true |
51-
| spark.comet.exec.initCap.enabled | Whether to enable initCap by default. | false |
5251
| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true |
5352
| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. When running Spark in off-heap mode, available pool types are 'unified' and `fair_unified`. The default pool type is `greedy_task_shared` for on-heap mode and `unified` for off-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | default |
5453
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |

docs/source/user-guide/latest/expressions.md

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,15 @@ Comet supports the following Spark expressions. Expressions that are marked as S
2323
natively in Comet and provide the same results as Spark, or will fall back to Spark for cases that would not
2424
be compatible.
2525

26-
Expressions that are not Spark-compatible are disabled by default and can be enabled by setting
27-
`spark.comet.expression.allowIncompatible=true`.
26+
All expressions are enabled by default, but can be disabled by setting
27+
`spark.comet.expression.EXPRNAME.enabled=false`, where `EXPRNAME` is the expression name as specified in
28+
the following tables, such as `Length` or `StartsWith`.
29+
30+
Expressions that are not Spark-compatible will fall back to Spark by default and can be enabled by setting
31+
`spark.comet.expression.EXPRNAME.allowIncompatible=true`.
32+
33+
It is also possible to specify `spark.comet.expression.allowIncompatible=true` to enable all
34+
incompatible expressions.
2835

2936
## Conditional Expressions
3037

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -595,15 +595,24 @@ object QueryPlanSerde extends Logging with CometExprShim {
595595
expr: Expression,
596596
inputs: Seq[Attribute],
597597
binding: Boolean): Option[Expr] = {
598-
SQLConf.get
598+
val conf = SQLConf.get
599599

600600
def convert[T <: Expression](expr: T, handler: CometExpressionSerde[T]): Option[Expr] = {
601+
val exprConfName = handler.getExprConfigName(expr)
602+
if (!CometConf.isExprEnabled(exprConfName)) {
603+
withInfo(
604+
expr,
605+
"Expression support is disabled. Set " +
606+
s"${CometConf.getExprEnabledConfigKey(exprConfName)}=true to enable it.")
607+
return None
608+
}
601609
handler.getSupportLevel(expr) match {
602610
case Unsupported(notes) =>
603611
withInfo(expr, notes.getOrElse(""))
604612
None
605613
case Incompatible(notes) =>
606-
if (CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) {
614+
val exprAllowIncompat = CometConf.isExprAllowIncompat(exprConfName)
615+
if (exprAllowIncompat || CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) {
607616
if (notes.isDefined) {
608617
logWarning(
609618
s"Comet supports $expr when ${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true " +
@@ -615,8 +624,9 @@ object QueryPlanSerde extends Logging with CometExprShim {
615624
withInfo(
616625
expr,
617626
s"$expr is not fully compatible with Spark$optionalNotes. To enable it anyway, " +
618-
s"set ${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. " +
619-
s"${CometConf.COMPAT_GUIDE}.")
627+
s"set ${CometConf.getExprAllowIncompatConfigKey(exprConfName)}=true, or set " +
628+
s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true to enable all " +
629+
s"incompatible expressions. ${CometConf.COMPAT_GUIDE}.")
620630
None
621631
}
622632
case Compatible(notes) =>
@@ -634,7 +644,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
634644
exprToProtoInternal(Literal(value, dataType), inputs, binding)
635645

636646
case UnaryExpression(child) if expr.prettyName == "trycast" =>
637-
val timeZoneId = SQLConf.get.sessionLocalTimeZone
647+
val timeZoneId = conf.sessionLocalTimeZone
638648
val cast = Cast(child, expr.dataType, Some(timeZoneId), EvalMode.TRY)
639649
convert(cast, CometCast)
640650

@@ -1988,6 +1998,17 @@ trait CometOperatorSerde[T <: SparkPlan] {
19881998
*/
19891999
trait CometExpressionSerde[T <: Expression] {
19902000

2001+
/**
2002+
* Get a short name for the expression that can be used as part of a config key related to the
2003+
* expression, such as enabling or disabling that expression.
2004+
*
2005+
* @param expr
2006+
* The Spark expression.
2007+
* @return
2008+
* Short name for the expression, defaulting to the Spark class name
2009+
*/
2010+
def getExprConfigName(expr: T): String = expr.getClass.getSimpleName
2011+
19912012
/**
19922013
* Determine the support level of the expression based on its attributes.
19932014
*

spark/src/main/scala/org/apache/comet/serde/strings.scala

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,14 @@ object CometLower extends CometCaseConversionBase[Lower]("lower")
6868

6969
object CometInitCap extends CometScalarFunction[InitCap]("initcap") {
7070

71+
override def getSupportLevel(expr: InitCap): SupportLevel = {
72+
// Behavior differs from Spark. One example is that for the input "robert rose-smith", Spark
73+
// will produce "Robert Rose-smith", but Comet will produce "Robert Rose-Smith".
74+
// https://github.com/apache/datafusion-comet/issues/1052
75+
Incompatible(None)
76+
}
77+
7178
override def convert(expr: InitCap, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = {
72-
if (!CometConf.COMET_EXEC_INITCAP_ENABLED.get()) {
73-
withInfo(
74-
expr,
75-
"Comet initCap is not compatible with Spark yet. " +
76-
"See https://github.com/apache/datafusion-comet/issues/1052 ." +
77-
s"Set ${CometConf.COMET_EXEC_INITCAP_ENABLED.key}=true to enable it anyway.")
78-
return None
79-
}
8079
super.convert(expr, inputs, binding)
8180
}
8281
}

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
3333
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
3434
import org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps
3535
import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometProjectExec, CometWindowExec}
36-
import org.apache.spark.sql.execution.{InputAdapter, ProjectExec, WholeStageCodegenExec}
36+
import org.apache.spark.sql.execution.{InputAdapter, ProjectExec, SparkPlan, WholeStageCodegenExec}
3737
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
3838
import org.apache.spark.sql.expressions.Window
3939
import org.apache.spark.sql.functions._
@@ -1301,6 +1301,40 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
13011301
}
13021302
}
13031303

1304+
test("disable expression using dynamic config") {
1305+
def countSparkProjectExec(plan: SparkPlan) = {
1306+
plan.collect { case _: ProjectExec =>
1307+
true
1308+
}.length
1309+
}
1310+
withParquetTable(Seq(0, 1, 2).map(n => (n, n)), "tbl") {
1311+
val sql = "select _1+_2 from tbl"
1312+
val (_, cometPlan) = checkSparkAnswer(sql)
1313+
assert(0 == countSparkProjectExec(cometPlan))
1314+
withSQLConf(CometConf.getExprEnabledConfigKey("Add") -> "false") {
1315+
val (_, cometPlan) = checkSparkAnswer(sql)
1316+
assert(1 == countSparkProjectExec(cometPlan))
1317+
}
1318+
}
1319+
}
1320+
1321+
test("enable incompat expression using dynamic config") {
1322+
def countSparkProjectExec(plan: SparkPlan) = {
1323+
plan.collect { case _: ProjectExec =>
1324+
true
1325+
}.length
1326+
}
1327+
withParquetTable(Seq(0, 1, 2).map(n => (n.toString, n.toString)), "tbl") {
1328+
val sql = "select initcap(_1) from tbl"
1329+
val (_, cometPlan) = checkSparkAnswer(sql)
1330+
assert(1 == countSparkProjectExec(cometPlan))
1331+
withSQLConf(CometConf.getExprAllowIncompatConfigKey("InitCap") -> "true") {
1332+
val (_, cometPlan) = checkSparkAnswer(sql)
1333+
assert(0 == countSparkProjectExec(cometPlan))
1334+
}
1335+
}
1336+
}
1337+
13041338
test("signum") {
13051339
testDoubleScalarExpr("signum")
13061340
}

spark/src/test/scala/org/apache/comet/CometStringExpressionSuite.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,7 @@ class CometStringExpressionSuite extends CometTestBase {
103103
s"insert into $table values(1, 'james smith'), (2, 'michael rose'), " +
104104
"(3, 'robert williams'), (4, 'rames rose'), (5, 'james smith'), " +
105105
"(6, 'robert rose-smith'), (7, 'james ähtäri')")
106-
if (CometConf.COMET_EXEC_INITCAP_ENABLED.get()) {
107-
// TODO: remove this if clause https://github.com/apache/datafusion-comet/issues/1052
108-
checkSparkAnswerAndOperator(s"SELECT initcap(name) FROM $table")
109-
} else {
110-
checkSparkAnswer(s"SELECT initcap(name) FROM $table")
111-
}
106+
checkSparkAnswer(s"SELECT initcap(name) FROM $table")
112107
}
113108
}
114109

0 commit comments

Comments
 (0)