SPARKC-312: Implementing FilterOptimizer #1019
base: master
Conversation
…e.optimization + some more tests
Just putting some more comments on here. I think we also definitely need some integration tests to check edge cases and such.
val withPushdown = Map("pushdown" -> "true")
val withWhereClauseOptimizationEnabled = Map("spark.cassandra.sql.enable.where.clause.optimization" -> "true")
Replace the string here with the parameter defined in the conf file, EnableWhereClauseOptimizationParam.name, just in case we change things later :)
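A minimal sketch of the suggestion, using a toy stand-in for the connector's parameter object (the real EnableWhereClauseOptimizationParam lives in the connector's config; the ConfigParameter shape below is assumed for illustration):

```scala
// Toy stand-in for the connector's parameter definition (assumption, not the real class).
case class ConfigParameter(name: String, default: Boolean)

// Stand-in for the real EnableWhereClauseOptimizationParam defined in the conf file.
val EnableWhereClauseOptimizationParam =
  ConfigParameter("spark.cassandra.sql.enable.where.clause.optimization", default = false)

// The test fixture can then reference the parameter's name instead of repeating the literal,
// so a later rename of the setting only has to happen in one place:
val withWhereClauseOptimizationEnabled = Map(EnableWhereClauseOptimizationParam.name -> "true")
```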
@@ -79,7 +78,15 @@ private[cassandra] class CassandraSourceRelation(
  def buildScan(): RDD[Row] = baseRdd.asInstanceOf[RDD[Row]]

  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = filterPushdown match {
-   case true => predicatePushDown(filters).handledBySpark.toArray
+   case true =>
+     val optimizedFilters = FiltersOptimizer(filters).build()
I think it may be better if we moved the FiltersOptimizer into the predicatePushDown function; then I think we could skip having it written in a bunch of places.
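A rough sketch of what this refactor might look like. The types here are toy stand-ins, not the connector's real Filter and AnalyzedPredicates, and the optimizer body is a placeholder:

```scala
// Toy stand-ins for the connector's types (assumptions for illustration only).
case class Filter(expr: String)
case class AnalyzedPredicates(handledByCassandra: Set[Filter], handledBySpark: Set[Filter])

class FiltersOptimizer(filters: Array[Filter]) {
  def build(): Array[Filter] = filters // placeholder for the real rewrite
}

// Running the optimizer once inside predicatePushDown means every caller
// (buildScan, unhandledFilters, ...) gets optimized filters for free.
def predicatePushDown(filters: Array[Filter]): AnalyzedPredicates = {
  val optimized = new FiltersOptimizer(filters).build()
  AnalyzedPredicates(handledByCassandra = optimized.toSet, handledBySpark = Set.empty)
}
```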
 * val Array(f1, f2, ... fn) = ... // such that `where f1 AND f2 AND ... AND fn`
 *
 */
class FiltersOptimizer(filters: Array[Filter]) {
I'm a little confused why there is a separate class here, do we ever use this without calling .build() immediately after?
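If the class really is always used as `new FiltersOptimizer(filters).build()`, one alternative is a function on an object, which avoids the intermediate instance. A sketch, with Filter as a toy stand-in for Spark's org.apache.spark.sql.sources.Filter:

```scala
// Toy stand-in for Spark's Filter (assumption for illustration).
case class Filter(expr: String)

object FiltersOptimizer {
  // Single entry point; the body here is a placeholder for the real rewrite.
  def apply(filters: Array[Filter]): Array[Filter] = filters
}

// Call sites shrink from `new FiltersOptimizer(filters).build()` to:
val optimized = FiltersOptimizer(Array(Filter("a = 5")))
```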
val filteredRdd = maybePushdownFilters(prunedRdd, pushdownFilters)
val optimizedFilters = new FiltersOptimizer(filters).build()
val optimizationCanBeApplied = isOptimizationAvailable(optimizedFilters)
val filteredRdd = if(optimizationCanBeApplied) {
I think this is a rather dangerous optimization in some cases, so I think we should default to off. For example: a table where x < 3 or x > 5, and x ranges from 1 to 10000. Doing two scans here is probably much more expensive than a single scan.
I thought that both scans would be done in parallel. By default this option is set to false.
FiltersOptimizer will transform the where clause into an equivalent disjunctive normal form. For example, the where clause `a = 5 and (b > 5 or b < 3)` can be transformed to the equivalent `a = 5 and b > 5 or a = 5 and b < 3`, so now we can create two different table scans with where clauses `a = 5 and b > 5` and `a = 5 and b < 3` and union them.
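The rewrite described above can be sketched in a few lines of plain Scala. This uses a toy filter ADT rather than Spark's org.apache.spark.sql.sources.Filter, and just shows the AND-over-OR distribution that produces the disjunctive normal form:

```scala
// Toy filter algebra (stand-in for the connector's real Filter types).
sealed trait F
case class Pred(s: String) extends F        // an atomic predicate like "a = 5"
case class And(l: F, r: F) extends F
case class Or(l: F, r: F) extends F

// Distribute AND over OR until the tree is a disjunction of conjunctions.
def toDnf(f: F): F = f match {
  case And(l, Or(rl, rr)) => Or(toDnf(And(l, rl)), toDnf(And(l, rr)))
  case And(Or(ll, lr), r) => Or(toDnf(And(ll, r)), toDnf(And(lr, r)))
  case And(l, r) =>
    val (dl, dr) = (toDnf(l), toDnf(r))
    (dl, dr) match {
      // If a child normalized into an Or, distribute again.
      case (Or(_, _), _) | (_, Or(_, _)) => toDnf(And(dl, dr))
      case _                             => And(dl, dr)
    }
  case Or(l, r) => Or(toDnf(l), toDnf(r))
  case p => p
}

// a = 5 and (b > 5 or b < 3)
val input = And(Pred("a = 5"), Or(Pred("b > 5"), Pred("b < 3")))
// becomes (a = 5 and b > 5) or (a = 5 and b < 3): one scan per disjunct.
val dnf = toDnf(input)
```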