From c81af49483ceac003f61bc0e365a7d89d67dbdf3 Mon Sep 17 00:00:00 2001 From: Tobias Stadler Date: Tue, 20 Dec 2022 19:34:49 +0100 Subject: [PATCH 1/3] SPARKC-693 Support for Spark 3.3 --- .../SparkCassandraITFlatSpecBase.scala | 2 +- .../spark/connector/cql/sai/SaiBaseSpec.scala | 2 +- .../sql/CassandraDataSourceSpec.scala | 6 ++--- .../spark/connector/util/CatalystUtil.scala | 2 +- .../datasource/CassandraCatalog.scala | 2 +- .../datasource/CassandraScanBuilder.scala | 19 +++++----------- .../cassandra/CassandraSourceRelation.scala | 2 +- .../CassandraDirectJoinStrategy.scala | 22 +++++++++---------- project/Versions.scala | 2 +- 9 files changed, 25 insertions(+), 34 deletions(-) diff --git a/connector/src/it/scala/com/datastax/spark/connector/SparkCassandraITFlatSpecBase.scala b/connector/src/it/scala/com/datastax/spark/connector/SparkCassandraITFlatSpecBase.scala index 66f543ce4..54fd2381a 100644 --- a/connector/src/it/scala/com/datastax/spark/connector/SparkCassandraITFlatSpecBase.scala +++ b/connector/src/it/scala/com/datastax/spark/connector/SparkCassandraITFlatSpecBase.scala @@ -247,7 +247,7 @@ trait SparkCassandraITSpecBase def getCassandraScan(plan: SparkPlan): CassandraScan = { plan.collectLeaves.collectFirst{ - case BatchScanExec(_, cassandraScan: CassandraScan, _) => cassandraScan + case BatchScanExec(_, cassandraScan: CassandraScan, _, _) => cassandraScan }.getOrElse(throw new IllegalArgumentException("No Cassandra Scan Found")) } diff --git a/connector/src/it/scala/com/datastax/spark/connector/cql/sai/SaiBaseSpec.scala b/connector/src/it/scala/com/datastax/spark/connector/cql/sai/SaiBaseSpec.scala index 9a5d28dd1..ae922c915 100644 --- a/connector/src/it/scala/com/datastax/spark/connector/cql/sai/SaiBaseSpec.scala +++ b/connector/src/it/scala/com/datastax/spark/connector/cql/sai/SaiBaseSpec.scala @@ -47,7 +47,7 @@ trait SaiBaseSpec extends Matchers with SparkCassandraITSpecBase { def findCassandraScan(plan: SparkPlan): CassandraScan = { plan match { - case BatchScanExec(_, scan: CassandraScan, _) => scan + case BatchScanExec(_, scan: CassandraScan, _, _) => scan case filter: FilterExec => findCassandraScan(filter.child) case project: ProjectExec => findCassandraScan(project.child) case _ => throw new NoSuchElementException("RowDataSourceScanExec was not found in the given plan") diff --git a/connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataSourceSpec.scala b/connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataSourceSpec.scala index c6b93d7ac..8bcfc3fe7 100644 --- a/connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataSourceSpec.scala +++ b/connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataSourceSpec.scala @@ -274,10 +274,10 @@ class CassandraDataSourceSpec extends SparkCassandraITFlatSpecBase with DefaultC if (pushDown) withClue(s"Given Dataframe plan does not contain CassandraInJoin in its predecessors.\n${df.queryExecution.sparkPlan.toString()}") { df.queryExecution.executedPlan.collectLeaves().collectFirst{ - case a@BatchScanExec(_, _: CassandraInJoin, _) => a + case a@BatchScanExec(_, _: CassandraInJoin, _, _) => a case b@AdaptiveSparkPlanExec(_, _, _, _, _) => b.executedPlan.collectLeaves().collectFirst{ - case a@BatchScanExec(_, _: CassandraInJoin, _) => a + case a@BatchScanExec(_, _: CassandraInJoin, _, _) => a } } shouldBe defined } @@ -288,7 +288,7 @@ class CassandraDataSourceSpec extends SparkCassandraITFlatSpecBase with DefaultC private def assertOnAbsenceOfCassandraInJoin(df: DataFrame): Unit = 
    withClue(s"Given Dataframe plan contains CassandraInJoin in its predecessors.\n${df.queryExecution.sparkPlan.toString()}") {
       df.queryExecution.executedPlan.collectLeaves().collectFirst{
-        case a@BatchScanExec(_, _: CassandraInJoin, _) => a
+        case a@BatchScanExec(_, _: CassandraInJoin, _, _) => a
       } shouldBe empty
     }
 
diff --git a/connector/src/it/scala/com/datastax/spark/connector/util/CatalystUtil.scala b/connector/src/it/scala/com/datastax/spark/connector/util/CatalystUtil.scala
index 3a523a9e7..5320ebfb7 100644
--- a/connector/src/it/scala/com/datastax/spark/connector/util/CatalystUtil.scala
+++ b/connector/src/it/scala/com/datastax/spark/connector/util/CatalystUtil.scala
@@ -7,6 +7,6 @@ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 object CatalystUtil {
 
   def findCassandraScan(sparkPlan: SparkPlan): Option[CassandraScan] = {
-    sparkPlan.collectFirst{ case BatchScanExec(_, scan: CassandraScan, _) => scan}
+    sparkPlan.collectFirst{ case BatchScanExec(_, scan: CassandraScan, _, _) => scan}
   }
 }
diff --git a/connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraCatalog.scala b/connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraCatalog.scala
index bc51f99ff..ea4941878 100644
--- a/connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraCatalog.scala
+++ b/connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraCatalog.scala
@@ -190,7 +190,7 @@ class CassandraCatalog extends CatalogPlugin
       .asJava
   }
 
-  override def dropNamespace(namespace: Array[String]): Boolean = {
+  override def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean = {
     checkNamespace(namespace)
     val keyspace = getKeyspaceMeta(connector, namespace)
     val dropResult = connector.withSessionDo(session => session.execute(SchemaBuilder.dropKeyspace(keyspace.getName).asCql()))
diff --git a/connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraScanBuilder.scala b/connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraScanBuilder.scala
index be7f03d4d..b1addefa1 100644
--- a/connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraScanBuilder.scala
+++ b/connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraScanBuilder.scala
@@ -13,8 +13,9 @@ import com.datastax.spark.connector.{ColumnRef, RowCountRef, TTL, WriteTime}
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.cassandra.CassandraSourceRelation.{AdditionalCassandraPushDownRulesParam, InClauseToJoinWithTableConversionThreshold}
 import org.apache.spark.sql.cassandra.{AnalyzedPredicates, Auto, BasicCassandraPredicatePushDown, CassandraPredicateRules, CassandraSourceRelation, DsePredicateRules, DseSearchOptimizationSetting, InClausePredicateRules, Off, On, SolrConstants, SolrPredicateRules, TimeUUIDPredicateRules}
+import org.apache.spark.sql.connector.expressions.{Expression, Expressions}
 import org.apache.spark.sql.connector.read._
-import org.apache.spark.sql.connector.read.partitioning.{ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning}
 import org.apache.spark.sql.sources.{EqualTo, Filter, In}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -307,7 +308,7 @@ case class CassandraScan(
   }
 
   override def outputPartitioning(): Partitioning = {
-    CassandraPartitioning(tableDef.partitionKey.map(_.columnName).toArray, inputPartitions.length)
+    new CassandraPartitioning(tableDef.partitionKey.map(_.columnName).map(Expressions.identity).toArray, inputPartitions.length)
   }
 
   override def description(): String = {
@@ -317,17 +318,7 @@ case class CassandraScan(
   }
 }
 
-case class CassandraPartitioning(partitionKeys: Array[String], numPartitions: Int) extends Partitioning {
-
-  /*
-    Currently we only satisfy distributions which rely on all partition key values having identical
-    values. In the future we may be able to support some other distributions but Spark doesn't have
-    means to support those atm 3.0
-   */
-  override def satisfy(distribution: Distribution): Boolean = distribution match {
-    case cD: ClusteredDistribution => partitionKeys.forall(cD.clusteredColumns.contains)
-    case _ => false
-  }
+class CassandraPartitioning(keys: Array[Expression], numPartitions: Int) extends KeyGroupedPartitioning(keys, numPartitions) {
 }
 
 case class CassandraInJoin(
@@ -359,7 +350,7 @@ case class CassandraInJoin(
   }
 
   override def outputPartitioning(): Partitioning = {
-    CassandraPartitioning(tableDef.partitionKey.map(_.columnName).toArray, numPartitions)
+    new CassandraPartitioning(tableDef.partitionKey.map(_.columnName).map(Expressions.identity).toArray, numPartitions)
   }
 }
 
diff --git a/connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSourceRelation.scala b/connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSourceRelation.scala
index 84c31f799..b8c7b3dfc 100644
--- a/connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSourceRelation.scala
+++ b/connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSourceRelation.scala
@@ -211,7 +211,7 @@ object CassandraSourceRelation extends Logging {
       oldPlan.transform {
         case ds@DataSourceV2Relation(_: CassandraTable, _, _, _, options) =>
           ds.copy(options = applyDirectJoinSetting(options, directJoinSetting))
-        case ds@DataSourceV2ScanRelation(_: CassandraTable, scan: CassandraScan, _) =>
+        case ds@DataSourceV2ScanRelation(_: CassandraTable, scan: CassandraScan, _, _) =>
           ds.copy(scan = scan.copy(consolidatedConf = applyDirectJoinSetting(scan.consolidatedConf, directJoinSetting)))
       }
     )
diff --git a/connector/src/main/scala/org/apache/spark/sql/cassandra/execution/CassandraDirectJoinStrategy.scala b/connector/src/main/scala/org/apache/spark/sql/cassandra/execution/CassandraDirectJoinStrategy.scala
index 8aa5ada47..df53e0231 100644
--- a/connector/src/main/scala/org/apache/spark/sql/cassandra/execution/CassandraDirectJoinStrategy.scala
+++ b/connector/src/main/scala/org/apache/spark/sql/cassandra/execution/CassandraDirectJoinStrategy.scala
@@ -25,7 +25,7 @@ case class CassandraDirectJoinStrategy(spark: SparkSession) extends Strategy wit
   val conf = spark.sqlContext.conf
 
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, _)
+    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, _, left, right, _)
       if hasValidDirectJoin(joinType, leftKeys, rightKeys, condition, left, right) =>
 
       val (otherBranch, joinTargetBranch, buildType) = {
@@ -46,7 +46,7 @@ case class CassandraDirectJoinStrategy(spark: SparkSession) extends Strategy wit
       val cassandraScanExec = getScanExec(dataSourceOptimizedPlan).get
 
       joinTargetBranch match {
-        case PhysicalOperation(attributes, _, DataSourceV2ScanRelation(DataSourceV2Relation(_: CassandraTable, _, _, _, _), _, _)) =>
+        case PhysicalOperation(attributes, _, DataSourceV2ScanRelation(DataSourceV2Relation(_: CassandraTable, _, _, _, _), _, _, _)) =>
           val directJoin =
            CassandraDirectJoinExec(
              leftKeys,
@@ -147,7 +147,7 @@ object CassandraDirectJoinStrategy extends Logging {
    */
   def getScanExec(plan: SparkPlan): Option[BatchScanExec] = {
     plan.collectFirst {
-      case exec @ BatchScanExec(_, _: CassandraScan, _) => exec
+      case exec @ BatchScanExec(_, _: CassandraScan, _, _) => exec
     }
   }
 
@@ -170,7 +170,7 @@ object CassandraDirectJoinStrategy extends Logging {
   def getDSV2CassandraRelation(plan: LogicalPlan): Option[DataSourceV2ScanRelation] = {
     val children = plan.collectLeaves()
     if (children.length == 1) {
-      plan.collectLeaves().collectFirst { case ds @ DataSourceV2ScanRelation(DataSourceV2Relation(_: CassandraTable, _, _, _, _), _, _) => ds }
+      plan.collectLeaves().collectFirst { case ds @ DataSourceV2ScanRelation(DataSourceV2Relation(_: CassandraTable, _, _, _, _), _, _, _) => ds }
     } else {
       None
     }
@@ -183,7 +183,7 @@ object CassandraDirectJoinStrategy extends Logging {
   def getCassandraTable(plan: LogicalPlan): Option[CassandraTable] = {
     val children = plan.collectLeaves()
     if (children.length == 1) {
-      children.collectFirst { case DataSourceV2ScanRelation(DataSourceV2Relation(table: CassandraTable, _, _, _, _), _, _) => table }
+      children.collectFirst { case DataSourceV2ScanRelation(DataSourceV2Relation(table: CassandraTable, _, _, _, _), _, _, _) => table }
     } else {
       None
     }
@@ -192,7 +192,7 @@ object CassandraDirectJoinStrategy extends Logging {
   def getCassandraScan(plan: LogicalPlan): Option[CassandraScan] = {
     val children = plan.collectLeaves()
     if (children.length == 1) {
-      plan.collectLeaves().collectFirst { case DataSourceV2ScanRelation(_: DataSourceV2Relation, cs: CassandraScan, _) => cs }
+      plan.collectLeaves().collectFirst { case DataSourceV2ScanRelation(_: DataSourceV2Relation, cs: CassandraScan, _, _) => cs }
     } else {
       None
     }
@@ -204,8 +204,8 @@ object CassandraDirectJoinStrategy extends Logging {
    */
   def hasCassandraChild[T <: QueryPlan[T]](plan: T): Boolean = {
     plan.children.size == 1 && plan.children.exists {
-      case DataSourceV2ScanRelation(DataSourceV2Relation(_: CassandraTable, _, _, _, _), _, _) => true
-      case BatchScanExec(_, _: CassandraScan, _) => true
+      case DataSourceV2ScanRelation(DataSourceV2Relation(_: CassandraTable, _, _, _, _), _, _, _) => true
+      case BatchScanExec(_, _: CassandraScan, _, _) => true
       case _ => false
     }
   }
@@ -235,7 +235,7 @@ object CassandraDirectJoinStrategy extends Logging {
   def reorderPlan(plan: SparkPlan, directJoin: CassandraDirectJoinExec): SparkPlan = {
     val reordered = plan match {
       //This may be the only node in the Plan
-      case BatchScanExec(_, _: CassandraScan, _) => directJoin
+      case BatchScanExec(_, _: CassandraScan, _, _) => directJoin
       // Plan has children
       case normalPlan => normalPlan.transform {
         case penultimate if hasCassandraChild(penultimate) =>
@@ -292,7 +292,7 @@ object CassandraDirectJoinStrategy extends Logging {
     plan match {
       case PhysicalOperation(
         attributes, _,
-        DataSourceV2ScanRelation(DataSourceV2Relation(cassandraTable: CassandraTable, _, _, _, _), _, _)) =>
+        DataSourceV2ScanRelation(DataSourceV2Relation(cassandraTable: CassandraTable, _, _, _, _), _, _, _)) =>
 
         val joinKeysExprId = joinKeys.collect{ case attributeReference: AttributeReference => attributeReference.exprId }
 
@@ -324,7 +324,7 @@ object CassandraDirectJoinStrategy extends Logging {
    */
   def containsSafePlans(plan: LogicalPlan): Boolean = {
     plan match {
-      case PhysicalOperation(_, _, DataSourceV2ScanRelation(DataSourceV2Relation(_: CassandraTable, _, _, _, _), scan: CassandraScan, _))
+      case PhysicalOperation(_, _, DataSourceV2ScanRelation(DataSourceV2Relation(_: CassandraTable, _, _, _, _), scan: CassandraScan, _, _))
         if getDirectJoinSetting(scan.consolidatedConf) != AlwaysOff => true
       case _ => false
     }
diff --git a/project/Versions.scala b/project/Versions.scala
index 9004a8cdc..da271d48b 100644
--- a/project/Versions.scala
+++ b/project/Versions.scala
@@ -21,7 +21,7 @@ object Versions {
   // and install in a local Maven repository. This is all done automatically, however it will work
   // only on Unix/OSX operating system. Windows users have to build and install Spark manually if the
   // desired version is not yet published into a public Maven repository.
-  val ApacheSpark = "3.2.1"
+  val ApacheSpark = "3.3.1"
   val SparkJetty = "9.3.27.v20190418"
   val SolrJ = "8.3.0"
 

From b6cfdc14180befc12eaf52b61f7c0d427c8bc6ee Mon Sep 17 00:00:00 2001
From: Tobias Stadler
Date: Wed, 21 Dec 2022 13:06:50 +0100
Subject: [PATCH 2/3] Adjust jetty version

Co-authored-by: Jack Richard Buggins
---
 project/Versions.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/project/Versions.scala b/project/Versions.scala
index da271d48b..e730967d9 100644
--- a/project/Versions.scala
+++ b/project/Versions.scala
@@ -22,7 +22,7 @@ object Versions {
   // only on Unix/OSX operating system. Windows users have to build and install Spark manually if the
   // desired version is not yet published into a public Maven repository.
   val ApacheSpark = "3.3.1"
-  val SparkJetty = "9.3.27.v20190418"
+  val SparkJetty = "9.4.48.v20220622"
   val SolrJ = "8.3.0"
 
 /*

From d57185117db6e43e087fa8ecb152fb7035750fe7 Mon Sep 17 00:00:00 2001
From: Tobias Stadler
Date: Thu, 22 Dec 2022 07:25:46 +0100
Subject: [PATCH 3/3] Adjusted documentation

---
 CHANGES.txt | 2 ++
 README.md   | 1 +
 2 files changed, 3 insertions(+)

diff --git a/CHANGES.txt b/CHANGES.txt
index 6b6e3244d..c74798a34 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,5 @@
+3.3.0
+ * Spark 3.3.x support (SPARKC-693)
 3.2.0
  * Spark 3.2.x support (SPARKC-670)
  * Fix: Cassandra Direct Join doesn't quote keyspace and table names (SPARKC-667)
diff --git a/README.md b/README.md
index e7ea393dd..861354fc5 100644
--- a/README.md
+++ b/README.md
@@ -52,6 +52,7 @@ Currently, the following branches are actively supported:
 
 | Connector | Spark         | Cassandra | Cassandra Java Driver | Minimum Java Version | Supported Scala Versions |
 | --------- | ------------- | --------- | --------------------- | -------------------- | ----------------------- |
+| 3.3       | 3.3           | 2.1.5*, 2.2, 3.x, 4.0 | 4.13                  | 8                    | 2.12                     |
 | 3.2       | 3.2           | 2.1.5*, 2.2, 3.x, 4.0 | 4.13                  | 8                    | 2.12                     |
 | 3.1       | 3.1           | 2.1.5*, 2.2, 3.x, 4.0 | 4.12                  | 8                    | 2.12                     |
 | 3.0       | 3.0           | 2.1.5*, 2.2, 3.x, 4.0 | 4.12                  | 8                    | 2.12                     |
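
Note on the central API change (illustrative, not part of the patches above): Spark 3.3 drops the `Distribution`/`ClusteredDistribution` contract that the old `CassandraPartitioning.satisfy` implemented, so the connector now reports a `KeyGroupedPartitioning` built from identity transforms over the Cassandra partition key columns. A minimal standalone sketch of that pattern, assuming Spark 3.3.x is on the classpath (the object and method names below are made up for illustration, not connector code):

```scala
import org.apache.spark.sql.connector.expressions.{Expression, Expressions}
import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning}

object PartitioningSketch {
  // Each partition key column becomes an identity transform; numPartitions should
  // reflect the number of input partitions the scan reports.
  def fromPartitionKey(columnNames: Seq[String], numPartitions: Int): Partitioning = {
    val keys: Array[Expression] = columnNames.map(Expressions.identity).toArray
    new KeyGroupedPartitioning(keys, numPartitions)
  }
}
```

The same construction is used for both `CassandraScan` and `CassandraInJoin` in patch 1, which is why `CassandraPartitioning` shrinks to a thin subclass of `KeyGroupedPartitioning`.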