From 4049f2eee72439f0abd5c4e71a965b2a8c4855ff Mon Sep 17 00:00:00 2001
From: Marquis Wong
Date: Tue, 21 Jun 2022 12:05:50 -0500
Subject: [PATCH] SPARKC-686 Port to Scala 2.13

Major changes:
* Migrate from scala.collection.JavaConversions to scala.jdk.CollectionConverters
* Migrate some uses of Seq to immutable.Seq
---
 .github/workflows/main.yml | 2 +-
 build.sbt | 6 +-
 .../connector/CassandraJavaUtilSpec.scala | 72 +++++++-------
 .../SparkCassandraITFlatSpecBase.scala | 4 +-
 .../CassandraAuthenticatedConnectorSpec.scala | 2 +-
 .../rdd/CassandraJavaPairRDDSpec.scala | 16 ++--
 .../connector/rdd/CassandraJavaRDDSpec.scala | 94 ++++++++++---------
 .../japi/GenericJavaRowReaderFactory.java | 4 +-
 .../connector/japi/PairRDDJavaFunctions.java | 3 +-
 .../connector/japi/RDDJavaFunctions.java | 2 +-
 .../japi/rdd/CassandraJavaPairRDD.java | 4 +-
 .../connector/japi/rdd/CassandraJavaRDD.java | 7 +-
 .../japi/rdd/CassandraTableScanJavaRDD.java | 4 +-
 .../datasource/CassandraSourceUtil.scala | 8 +-
 .../connector/rdd/CassandraCoGroupedRDD.scala | 3 +-
 .../connector/rdd/CassandraMergeJoinRDD.scala | 3 +-
 .../connector/rdd/CassandraTableScanRDD.scala | 4 +-
 .../connector/rdd/DseGraphUnionedRDD.scala | 10 +-
 .../rdd/partitioner/BucketingRangeIndex.scala | 2 +-
 .../CassandraPartitionGenerator.scala | 9 +-
 .../rdd/partitioner/DataSizeEstimates.scala | 10 +-
 .../rdd/partitioner/NodeAddresses.scala | 4 +-
 .../rdd/partitioner/ReplicaPartitioner.scala | 3 +-
 .../rdd/partitioner/TokenRangeSplitter.scala | 5 +-
 .../spark/connector/util/JavaApiHelper.scala | 11 ++-
 .../connector/util/MergeJoinIterator.scala | 2 +-
 .../util/MultiMergeJoinIterator.scala | 2 +-
 .../connector/util/SpanningIterator.scala | 2 +-
 .../spark/connector/writer/Batch.scala | 2 +-
 .../writer/ObjectSizeEstimator.scala | 8 +-
 .../connector/writer/ReplicaLocator.scala | 3 +-
 .../spark/connector/writer/TableWriter.scala | 2 +-
 .../spark/connector/writer/WriteOption.scala | 2 +-
 .../apache/spark/metrics/CassandraSink.scala | 4 +-
 .../BasicCassandraPredicatePushDown.scala | 4 +-
 .../sql/cassandra/DsePredicateRules.scala | 4 +-
 .../CassandraDirectJoinStrategy.scala | 2 +-
 .../spark/connector/embedded/SparkRepl.scala | 4 +-
 .../metrics/InputMetricsUpdaterSpec.scala | 5 +-
 .../spark/connector/GettableData.scala | 9 +-
 .../datastax/spark/connector/UDTValue.scala | 4 +-
 .../datastax/spark/connector/cql/Schema.scala | 2 +-
 .../GettableDataToMappedTypeConverter.scala | 2 +-
 .../spark/connector/types/CanBuildFrom.scala | 6 +-
 .../spark/connector/types/ColumnType.scala | 2 +-
 .../spark/connector/types/TupleType.scala | 10 +-
 .../spark/connector/types/TypeConverter.scala | 10 +-
 .../connector/types/UserDefinedType.scala | 9 +-
 .../spark/connector/util/NameTools.scala | 18 ++--
 .../connector/types/TypeConverterTest.scala | 2 +-
 .../spark/connector/ccm/CcmBridge.scala | 2 +-
 51 files changed, 218 insertions(+), 196 deletions(-)

diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 00de87fc1..2c64f20a5 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -14,7 +14,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        scala: [2.12.10]
+        scala: [2.12.11, 2.13.8]
         db-version: [6.8.13, 5.1.24, 3.11.10, 4.0-rc2]
 
     steps:
diff --git a/build.sbt b/build.sbt
index 5b7455036..a645d3b79 100644
--- a/build.sbt
+++ b/build.sbt
@@ -4,10 +4,11 @@ import sbt.{Compile, moduleFilter, _}
 import sbtassembly.AssemblyPlugin.autoImport.assembly
 
 lazy val scala212 = "2.12.11"
-lazy val supportedScalaVersions = List(scala212)
+lazy val scala213 =
"2.13.8" +lazy val supportedScalaVersions = List(scala212, scala213) // factor out common settings -ThisBuild / scalaVersion := scala212 +ThisBuild / scalaVersion := scala213 ThisBuild / scalacOptions ++= Seq("-target:jvm-1.8") // Publishing Info @@ -70,6 +71,7 @@ val annotationProcessor = Seq( def scalacVersionDependantOptions(scalaBinary: String): Seq[String] = scalaBinary match { case "2.11" => Seq() case "2.12" => Seq("-no-java-comments") //Scala Bug on inner classes, CassandraJavaUtil, + case "2.13" => Seq("-no-java-comments") //Scala Bug on inner classes, CassandraJavaUtil, } lazy val root = (project in file(".")) diff --git a/connector/src/it/scala/com/datastax/spark/connector/CassandraJavaUtilSpec.scala b/connector/src/it/scala/com/datastax/spark/connector/CassandraJavaUtilSpec.scala index 0777c3e98..1466426aa 100644 --- a/connector/src/it/scala/com/datastax/spark/connector/CassandraJavaUtilSpec.scala +++ b/connector/src/it/scala/com/datastax/spark/connector/CassandraJavaUtilSpec.scala @@ -3,7 +3,7 @@ package com.datastax.spark.connector import com.datastax.spark.connector.ccm.CcmBridge import com.datastax.spark.connector.cluster.DefaultCluster -import scala.collection.JavaConversions._ +import scala.jdk.CollectionConverters._ import scala.concurrent.Future import org.apache.spark.rdd.RDD import com.datastax.spark.connector.cql.CassandraConnector @@ -117,9 +117,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu val rows = results.all() assert(rows.size() == 3) - assert(rows.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1)) - assert(rows.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2)) - assert(rows.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3)) + assert(rows.asScala.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1)) + assert(rows.asScala.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2)) + assert(rows.asScala.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3)) } @@ -140,9 +140,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu val rows = results.all() assert(rows.size() == 3) - assert(rows.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1)) - assert(rows.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2)) - assert(rows.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3)) + assert(rows.asScala.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1)) + assert(rows.asScala.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2)) + assert(rows.asScala.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3)) } it should "allow to save beans with transient fields to Cassandra" in { @@ -162,9 +162,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu val rows = results.all() assert(rows.size() == 3) - assert(rows.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1)) - assert(rows.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2)) - assert(rows.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3)) + assert(rows.asScala.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1)) + assert(rows.asScala.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2)) + assert(rows.asScala.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 
3)) } it should "allow to save beans with inherited fields to Cassandra" in { @@ -184,7 +184,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu val rows = results.all() rows should have size 3 - rows.map(row => (row.getString("value"), row.getInt("key"), row.getString("sub_class_field"))).toSet shouldBe Set( + rows.asScala.map(row => (row.getString("value"), row.getInt("key"), row.getString("sub_class_field"))).toSet shouldBe Set( ("one", 1, "a"), ("two", 2, "b"), ("three", 3, "c") @@ -210,9 +210,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu val rows = results.all() assert(rows.size() == 3) - assert(rows.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1)) - assert(rows.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2)) - assert(rows.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3)) + assert(rows.asScala.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1)) + assert(rows.asScala.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2)) + assert(rows.asScala.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3)) } it should "allow to read rows as Tuple1" in { @@ -222,7 +222,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu )).select( "c1" ) - .collect().head + .collect().asScala.head tuple shouldBe Tuple1( 1: Integer ) @@ -237,7 +237,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu )).select( "c1", "c2" ) - .collect().head + .collect().asScala.head tuple shouldBe Tuple2( 1: Integer, "2" @@ -254,7 +254,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu )).select( "c1", "c2", "c3" ) - .collect().head + .collect().asScala.head tuple shouldBe Tuple3( 1: Integer, "2", @@ -273,7 +273,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu )).select( "c1", "c2", "c3", "c4" ) - .collect().head + .collect().asScala.head tuple shouldBe Tuple4( 1: Integer, "2", @@ -294,7 +294,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu )).select( "c1", "c2", "c3", "c4", "c5" ) - .collect().head + .collect().asScala.head tuple shouldBe Tuple5( 1: Integer, "2", @@ -317,7 +317,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu )).select( "c1", "c2", "c3", "c4", "c5", "c6" ) - .collect().head + .collect().asScala.head tuple shouldBe Tuple6( 1: Integer, "2", @@ -342,7 +342,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu )).select( "c1", "c2", "c3", "c4", "c5", "c6", "c7" ) - .collect().head + .collect().asScala.head tuple shouldBe Tuple7( 1: Integer, "2", @@ -369,7 +369,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu )).select( "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8" ) - .collect().head + .collect().asScala.head tuple shouldBe Tuple8( 1: Integer, "2", @@ -398,7 +398,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu )).select( "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9" ) - .collect().head + .collect().asScala.head tuple shouldBe Tuple9( 1: Integer, "2", @@ -429,7 +429,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu )).select( "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10" ) - .collect().head + .collect().asScala.head tuple shouldBe Tuple10( 1: 
Integer, "2", @@ -462,7 +462,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu )).select( "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11" ) - .collect().head + .collect().asScala.head tuple shouldBe Tuple11( 1: Integer, "2", @@ -497,7 +497,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu )).select( "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12" ) - .collect().head + .collect().asScala.head tuple shouldBe Tuple12( 1: Integer, "2", @@ -534,7 +534,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu )).select( "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13" ) - .collect().head + .collect().asScala.head tuple shouldBe Tuple13( 1: Integer, "2", @@ -573,7 +573,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu )).select( "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14" ) - .collect().head + .collect().asScala.head tuple shouldBe Tuple14( 1: Integer, "2", @@ -614,7 +614,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu )).select( "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15" ) - .collect().head + .collect().asScala.head tuple shouldBe Tuple15( 1: Integer, "2", @@ -657,7 +657,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu )).select( "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16" ) - .collect().head + .collect().asScala.head tuple shouldBe Tuple16( 1: Integer, "2", @@ -702,7 +702,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu )).select( "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17" ) - .collect().head + .collect().asScala.head tuple shouldBe Tuple17( 1: Integer, "2", @@ -749,7 +749,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu )).select( "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18" ) - .collect().head + .collect().asScala.head tuple shouldBe Tuple18( 1: Integer, "2", @@ -798,7 +798,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu )).select( "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19" ) - .collect().head + .collect().asScala.head tuple shouldBe Tuple19( 1: Integer, "2", @@ -849,7 +849,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu )).select( "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19", "c20" ) - .collect().head + .collect().asScala.head tuple shouldBe Tuple20( 1: Integer, "2", @@ -902,7 +902,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu )).select( "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19", "c20", "c21" ) - .collect().head + .collect().asScala.head tuple shouldBe Tuple21( 1: Integer, "2", @@ -957,7 +957,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu )).select( "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", 
"c19", "c20", "c21", "c22" ) - .collect().head + .collect().asScala.head tuple shouldBe Tuple22( 1: Integer, "2", diff --git a/connector/src/it/scala/com/datastax/spark/connector/SparkCassandraITFlatSpecBase.scala b/connector/src/it/scala/com/datastax/spark/connector/SparkCassandraITFlatSpecBase.scala index 54fd2381a..06610c432 100644 --- a/connector/src/it/scala/com/datastax/spark/connector/SparkCassandraITFlatSpecBase.scala +++ b/connector/src/it/scala/com/datastax/spark/connector/SparkCassandraITFlatSpecBase.scala @@ -200,8 +200,8 @@ trait SparkCassandraITSpecBase Await.result(Future.sequence(units), Duration.Inf) } - def awaitAll[T](units: TraversableOnce[Future[T]]): TraversableOnce[T] = { - Await.result(Future.sequence(units), Duration.Inf) + def awaitAll[T](units: IterableOnce[Future[T]]): IterableOnce[T] = { + Await.result(Future.sequence(units.iterator.toList), Duration.Inf) } def keyspaceCql(name: String = ks) = diff --git a/connector/src/it/scala/com/datastax/spark/connector/cql/CassandraAuthenticatedConnectorSpec.scala b/connector/src/it/scala/com/datastax/spark/connector/cql/CassandraAuthenticatedConnectorSpec.scala index b7561071e..1c382b4da 100644 --- a/connector/src/it/scala/com/datastax/spark/connector/cql/CassandraAuthenticatedConnectorSpec.scala +++ b/connector/src/it/scala/com/datastax/spark/connector/cql/CassandraAuthenticatedConnectorSpec.scala @@ -66,7 +66,7 @@ class CassandraAuthenticatedConnectorSpec extends SparkCassandraITFlatSpecBase w "spark.cassandra.auth.password" -> "cassandra", "keyspace" -> ks, "table" -> "authtest") - personDF1.write.format("org.apache.spark.sql.cassandra").options(options).mode("append")save() + personDF1.write.format("org.apache.spark.sql.cassandra").options(options).mode("append").save() val personDF2 = spark.read.format("org.apache.spark.sql.cassandra").options(options).load() personDF2.count should be(4) diff --git a/connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraJavaPairRDDSpec.scala b/connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraJavaPairRDDSpec.scala index a94fc6811..d39cc24f7 100644 --- a/connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraJavaPairRDDSpec.scala +++ b/connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraJavaPairRDDSpec.scala @@ -8,8 +8,8 @@ import com.datastax.spark.connector.cql.CassandraConnector import com.datastax.spark.connector.japi.CassandraJavaUtil._ import org.apache.spark.api.java.function.{Function2, Function => JFunction} -import scala.collection.JavaConversions._ import scala.concurrent.Future +import scala.jdk.CollectionConverters._ case class SimpleClass(value: Integer) @@ -103,15 +103,16 @@ class CassandraJavaPairRDDSpec extends SparkCassandraITFlatSpecBase with Default "key") .spanBy(f, classOf[Integer]) .collect() + .asScala .toMap results should have size 2 results should contain key 10 results should contain key 20 - results(10).size should be(3) - results(10).map(_._2).toSeq should be(Seq(10, 11, 12)) - results(20).size should be(3) - results(20).map(_._2).toSeq should be(Seq(20, 21, 22)) + results(10).asScala.size should be(3) + results(10).asScala.map(_._2).toSeq should be(Seq(10, 11, 12)) + results(20).asScala.size should be(3) + results(20).asScala.map(_._2).toSeq should be(Seq(20, 21, 22)) } it should "allow to use spanByKey method" in { @@ -129,15 +130,16 @@ class CassandraJavaPairRDDSpec extends SparkCassandraITFlatSpecBase with Default "key") .spanByKey() .collect() + .asScala .toMap results should have size 2 results should 
contain key 10 results should contain key 20 results(10).size should be(3) - results(10).toSeq should be(Seq(10, 11, 12)) + results(10).asScala.toSeq should be(Seq(10, 11, 12)) results(20).size should be(3) - results(20).toSeq should be(Seq(20, 21, 22)) + results(20).asScala.toSeq should be(Seq(20, 21, 22)) } it should "allow to use of keyByAndApplyPartitioner" in { diff --git a/connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraJavaRDDSpec.scala b/connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraJavaRDDSpec.scala index a0279d057..1145d0154 100644 --- a/connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraJavaRDDSpec.scala +++ b/connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraJavaRDDSpec.scala @@ -2,7 +2,6 @@ package com.datastax.spark.connector.rdd import java.io.IOException import java.net.{InetAddress, InetSocketAddress} - import com.datastax.spark.connector._ import com.datastax.spark.connector.cluster.DefaultCluster import com.datastax.spark.connector.cql.{CassandraConnector, IpBasedContactInfo} @@ -12,8 +11,10 @@ import com.datastax.spark.connector.types.TypeConverter import org.apache.commons.lang3.tuple import org.apache.spark.api.java.function.{Function => JFunction} -import scala.collection.JavaConversions._ +import scala.jdk.CollectionConverters._ +import scala.jdk.CollectionConverters.ListHasAsScala import scala.concurrent.Future +import scala.jdk.javaapi.CollectionConverters.asScala class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultCluster { @@ -91,25 +92,25 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus "CassandraJavaRDD" should "allow to read data as CassandraRows " in { val rows = javaFunctions(sc).cassandraTable(ks, "test_table").collect() assert(rows.size == 3) - assert(rows.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1)) - assert(rows.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2)) - assert(rows.exists(row ⇒ row.getString("value") == null && row.getInt("key") == 3)) + assert(rows.asScala.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1)) + assert(rows.asScala.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2)) + assert(rows.asScala.exists(row ⇒ row.getString("value") == null && row.getInt("key") == 3)) } it should "allow to read data as Java beans " in { val beans = javaFunctions(sc).cassandraTable(ks, "test_table", mapRowTo(classOf[SampleJavaBean])).collect() assert(beans.size == 3) - assert(beans.exists(bean ⇒ bean.getValue == "one" && bean.getKey == 1)) - assert(beans.exists(bean ⇒ bean.getValue == "two" && bean.getKey == 2)) - assert(beans.exists(bean ⇒ bean.getValue == null && bean.getKey == 3)) + assert(beans.asScala.exists(bean ⇒ bean.getValue == "one" && bean.getKey == 1)) + assert(beans.asScala.exists(bean ⇒ bean.getValue == "two" && bean.getKey == 2)) + assert(beans.asScala.exists(bean ⇒ bean.getValue == null && bean.getKey == 3)) } it should "allow to read data as Java beans with inherited fields" in { val beans = javaFunctions(sc).cassandraTable(ks, "test_table3", mapRowTo(classOf[SampleJavaBeanSubClass])).collect() assert(beans.size == 3) - assert(beans.exists(bean ⇒ bean.getValue == "one" && bean.getKey == 1 && bean.getSubClassField == "a")) - assert(beans.exists(bean ⇒ bean.getValue == "two" && bean.getKey == 2 && bean.getSubClassField == "b")) - assert(beans.exists(bean ⇒ bean.getValue == null && bean.getKey == 3 && bean.getSubClassField == 
"c")) + assert(beans.asScala.exists(bean ⇒ bean.getValue == "one" && bean.getKey == 1 && bean.getSubClassField == "a")) + assert(beans.asScala.exists(bean ⇒ bean.getValue == "two" && bean.getKey == 2 && bean.getSubClassField == "b")) + assert(beans.asScala.exists(bean ⇒ bean.getValue == null && bean.getKey == 3 && bean.getSubClassField == "c")) } it should "allow to read data as Java beans with custom mapping defined by aliases" in { @@ -118,17 +119,17 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus .select(column("key").as("devil"), column("value").as("cat")) .collect() assert(beans.size == 3) - assert(beans.exists(bean ⇒ bean.getCat == "one" && bean.getDevil == 1)) - assert(beans.exists(bean ⇒ bean.getCat == "two" && bean.getDevil == 2)) - assert(beans.exists(bean ⇒ bean.getCat == null && bean.getDevil == 3)) + assert(beans.asScala.exists(bean ⇒ bean.getCat == "one" && bean.getDevil == 1)) + assert(beans.asScala.exists(bean ⇒ bean.getCat == "two" && bean.getDevil == 2)) + assert(beans.asScala.exists(bean ⇒ bean.getCat == null && bean.getDevil == 3)) } it should "allow to read data as Java beans (with multiple constructors)" in { val beans = javaFunctions(sc).cassandraTable(ks, "test_table", mapRowTo(classOf[SampleJavaBeanWithMultipleCtors])).collect() assert(beans.size == 3) - assert(beans.exists(bean ⇒ bean.getValue == "one" && bean.getKey == 1)) - assert(beans.exists(bean ⇒ bean.getValue == "two" && bean.getKey == 2)) - assert(beans.exists(bean ⇒ bean.getValue == null && bean.getKey == 3)) + assert(beans.asScala.exists(bean ⇒ bean.getValue == "one" && bean.getKey == 1)) + assert(beans.asScala.exists(bean ⇒ bean.getValue == "two" && bean.getKey == 2)) + assert(beans.asScala.exists(bean ⇒ bean.getValue == null && bean.getKey == 3)) } it should "throw NoSuchMethodException when trying to read data as Java beans (without no-args constructor)" in { @@ -139,18 +140,18 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus it should "allow to read data as nested Java beans" in { val beans = javaFunctions(sc).cassandraTable(ks, "test_table", mapRowTo(classOf[SampleWithNestedJavaBean#InnerClass])).collect() assert(beans.size == 3) - assert(beans.exists(bean ⇒ bean.getValue == "one" && bean.getKey == 1)) - assert(beans.exists(bean ⇒ bean.getValue == "two" && bean.getKey == 2)) - assert(beans.exists(bean ⇒ bean.getValue == null && bean.getKey == 3)) + assert(beans.asScala.exists(bean ⇒ bean.getValue == "one" && bean.getKey == 1)) + assert(beans.asScala.exists(bean ⇒ bean.getValue == "two" && bean.getKey == 2)) + assert(beans.asScala.exists(bean ⇒ bean.getValue == null && bean.getKey == 3)) } it should "allow to read data as deeply nested Java beans" in { val beans = javaFunctions(sc).cassandraTable(ks, "test_table", mapRowTo(classOf[SampleWithDeeplyNestedJavaBean#IntermediateClass#InnerClass])).collect() assert(beans.size == 3) - assert(beans.exists(bean ⇒ bean.getValue == "one" && bean.getKey == 1)) - assert(beans.exists(bean ⇒ bean.getValue == "two" && bean.getKey == 2)) - assert(beans.exists(bean ⇒ bean.getValue == null && bean.getKey == 3)) + assert(beans.asScala.exists(bean ⇒ bean.getValue == "one" && bean.getKey == 1)) + assert(beans.asScala.exists(bean ⇒ bean.getValue == "two" && bean.getKey == 2)) + assert(beans.asScala.exists(bean ⇒ bean.getValue == null && bean.getKey == 3)) } @@ -158,9 +159,9 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus val rows = javaFunctions(sc).cassandraTable(ks, 
"test_table") .select("key").collect() assert(rows.size == 3) - assert(rows.exists(row ⇒ !row.contains("value") && row.getInt("key") == 1)) - assert(rows.exists(row ⇒ !row.contains("value") && row.getInt("key") == 2)) - assert(rows.exists(row ⇒ !row.contains("value") && row.getInt("key") == 3)) + assert(rows.asScala.exists(row ⇒ !row.contains("value") && row.getInt("key") == 1)) + assert(rows.asScala.exists(row ⇒ !row.contains("value") && row.getInt("key") == 2)) + assert(rows.asScala.exists(row ⇒ !row.contains("value") && row.getInt("key") == 3)) } it should "return selected columns" in { @@ -174,7 +175,7 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus val rows = javaFunctions(sc).cassandraTable(ks, "test_table") .where("value = ?", "two").collect() assert(rows.size === 1) - assert(rows.exists(row => row.getString("value") == "two" && row.getInt("key") == 2)) + assert(rows.asScala.exists(row => row.getString("value") == "two" && row.getInt("key") == 2)) } it should "allow to read rows as an array of a single-column type supported by TypeConverter" in { @@ -212,7 +213,9 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus val rows = javaFunctions(sc) .cassandraTable(ks, "collections", mapColumnToListOf(classOf[String])) .select("l") - .collect().map(_.toList) + .collect() + .asScala + .map(asScala) rows should have size 2 rows should contain(List("item1", "item2")) @@ -223,7 +226,7 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus val rows = javaFunctions(sc) .cassandraTable(ks, "collections", mapColumnToSetOf(classOf[String])) .select("s") - .collect().map(_.toSet) + .collect().asScala.map(asScala) rows should have size 2 rows should contain(Set("item1", "item2")) @@ -234,7 +237,7 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus val rows = javaFunctions(sc) .cassandraTable(ks, "collections", mapColumnToMapOf(classOf[String], classOf[String])) .select("m") - .collect().map(_.toMap) + .collect().asScala.map(asScala) rows should have size 2 rows should contain(Map("key1" → "value1", "key2" → "value2")) @@ -244,7 +247,7 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus it should "allow to read rows as an array of multi-column type" in { val rows = javaFunctions(sc) .cassandraTable(ks, "test_table", mapRowTo(classOf[SampleJavaBean])) - .collect().map(x => (x.getKey, x.getValue)) + .collect().asScala.map(x => (x.getKey, x.getValue)) rows should have size 3 rows should contain((1, "one")) @@ -256,7 +259,7 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus val rows = javaFunctions(sc) .cassandraTable(ks, "test_table2", mapRowTo(classOf[SampleJavaBean], tuple.Pair.of("key", "some_key"), tuple.Pair.of("value", "some_value"))) - .collect().map(x => (x.getKey, x.getValue)) + .collect().asScala.map(x => (x.getKey, x.getValue)) rows should have size 3 rows should contain((1, "one")) @@ -288,7 +291,7 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus mapToRow(classOf[java.lang.Integer]), classOf[Integer], "key") - .collect().map { case (i, x) ⇒ (i, (x.getKey, x.getValue))} + .collect().asScala.map { case (i, x) ⇒ (i, (x.getKey, x.getValue))} rows should have size 3 rows should contain((1, (1, "one"))) @@ -304,7 +307,7 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus mapRowTo(classOf[SampleJavaBean]), mapToRow(classOf[SampleJavaBean]), 
classOf[SampleJavaBean]) - .collect().map { case (x, i) ⇒ ((x.getKey, x.getValue), i)} + .collect().asScala.map { case (x, i) ⇒ ((x.getKey, x.getValue), i)} rows should have size 3 rows should contain(((1, "one"), 1)) @@ -319,7 +322,7 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus mapRowTo(classOf[SampleJavaBean]), mapToRow(classOf[SampleJavaBean]), classOf[SampleJavaBean]) - .collect().map { case (x, y) ⇒ ((x.getKey, x.getValue), (y.getKey, y.getValue))} + .collect().asScala.map { case (x, y) ⇒ ((x.getKey, x.getValue), (y.getKey, y.getValue))} rows should have size 3 rows should contain(((1, "one"), (1, "one"))) @@ -368,7 +371,7 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus .select("key", "name", "addr").collect() result should have length 1 - val row = result.head + val row = result.asScala.head row.getInt(0) should be(1) row.getString(1) should be("name") @@ -385,7 +388,7 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus .select("key", "value").collect() result should have length 1 - val row = result.head + val row = result.asScala.head row.getInt(0) should be(1) val tupleValue = row.getTupleValue(1) @@ -405,15 +408,16 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus .select("key", "group", "value") .spanBy[Int](f, classOf[Int]) .collect() + .asScala .toMap results should have size 2 results should contain key 10 results should contain key 20 - results(10).size should be(3) - results(10).map(_.getInt("group")).toSeq should be(Seq(10, 11, 12)) - results(20).size should be(3) - results(20).map(_.getInt("group")).toSeq should be(Seq(20, 21, 22)) + results(10).asScala.size should be(3) + results(10).asScala.map(_.getInt("group")).toSeq should be(Seq(10, 11, 12)) + results(20).asScala.size should be(3) + results(20).asScala.map(_.getInt("group")).toSeq should be(Seq(20, 21, 22)) } it should "allow to set limit" in { @@ -425,13 +429,13 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus it should "allow to set ascending ordering" in { val rdd = javaFunctions(sc).cassandraTable(ks, "wide_rows").where("key=10").withAscOrder - val result = rdd.collect() - result(0).getInt("group") should be(10) + val result = rdd.collect().asScala + result.head.getInt("group") should be(10) } it should "allow to set descending ordering" in { val rdd = javaFunctions(sc).cassandraTable(ks, "wide_rows").where("key=20").withDescOrder - val result = rdd.collect() - result(0).getInt("group") should be(22) + val result = rdd.collect().asScala + result.head.getInt("group") should be(22) } } diff --git a/connector/src/main/java/com/datastax/spark/connector/japi/GenericJavaRowReaderFactory.java b/connector/src/main/java/com/datastax/spark/connector/japi/GenericJavaRowReaderFactory.java index df2207c61..41e25d620 100644 --- a/connector/src/main/java/com/datastax/spark/connector/japi/GenericJavaRowReaderFactory.java +++ b/connector/src/main/java/com/datastax/spark/connector/japi/GenericJavaRowReaderFactory.java @@ -7,8 +7,8 @@ import com.datastax.spark.connector.rdd.reader.RowReader; import com.datastax.spark.connector.rdd.reader.RowReaderFactory; import scala.Option; -import scala.collection.IndexedSeq; -import scala.collection.Seq; +import scala.collection.immutable.IndexedSeq; +import scala.collection.immutable.Seq; public class GenericJavaRowReaderFactory { public final static RowReaderFactory instance = new RowReaderFactory() { diff --git 
a/connector/src/main/java/com/datastax/spark/connector/japi/PairRDDJavaFunctions.java b/connector/src/main/java/com/datastax/spark/connector/japi/PairRDDJavaFunctions.java index 95f139f2a..6db23e148 100644 --- a/connector/src/main/java/com/datastax/spark/connector/japi/PairRDDJavaFunctions.java +++ b/connector/src/main/java/com/datastax/spark/connector/japi/PairRDDJavaFunctions.java @@ -4,6 +4,7 @@ import com.datastax.spark.connector.util.JavaApiHelper; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.rdd.RDD; +import scala.Function1; import scala.Tuple2; import scala.collection.Seq; import scala.reflect.ClassTag; @@ -32,7 +33,7 @@ public JavaPairRDD> spanByKey(ClassTag keyClassTag) { ClassTag>> tupleClassTag = classTag(Tuple2.class); ClassTag> vClassTag = classTag(Collection.class); RDD>> newRDD = pairRDDFunctions.spanByKey() - .map(JavaApiHelper.>valuesAsJavaCollection(), tupleClassTag); + .map(JavaApiHelper.valuesAsJavaCollection(), tupleClassTag); return new JavaPairRDD<>(newRDD, keyClassTag, vClassTag); } diff --git a/connector/src/main/java/com/datastax/spark/connector/japi/RDDJavaFunctions.java b/connector/src/main/java/com/datastax/spark/connector/japi/RDDJavaFunctions.java index 085a94c5c..1efe341b3 100644 --- a/connector/src/main/java/com/datastax/spark/connector/japi/RDDJavaFunctions.java +++ b/connector/src/main/java/com/datastax/spark/connector/japi/RDDJavaFunctions.java @@ -83,7 +83,7 @@ public JavaPairRDD> spanBy(final Function f, ClassTag> iterableClassTag = CassandraJavaUtil.classTag(Iterable.class); RDD>> newRDD = rddFunctions.spanBy(toScalaFunction1(f)) - .map(JavaApiHelper.>valuesAsJavaIterable(), tupleClassTag); + .map(JavaApiHelper.valuesAsJavaIterable(), tupleClassTag); return new JavaPairRDD<>(newRDD, keyClassTag, iterableClassTag); } diff --git a/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaPairRDD.java b/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaPairRDD.java index 15960e7f0..c80885405 100644 --- a/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaPairRDD.java +++ b/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaPairRDD.java @@ -57,7 +57,7 @@ public CassandraRDD> rdd() { @SuppressWarnings("unchecked") public CassandraJavaPairRDD select(String... columnNames) { Seq columnRefs = toScalaSeq(toSelectableColumnRefs(columnNames)); - CassandraRDD> newRDD = rdd().select(columnRefs); + CassandraRDD> newRDD = rdd().select(columnRefs.toSeq()); return wrap(newRDD); } @@ -71,7 +71,7 @@ public CassandraJavaPairRDD select(String... columnNames) { @SuppressWarnings("unchecked") public CassandraJavaPairRDD select(ColumnRef... 
selectionColumns) { Seq columnRefs = JavaApiHelper.toScalaSeq(selectionColumns); - CassandraRDD> newRDD = rdd().select(columnRefs); + CassandraRDD> newRDD = rdd().select(columnRefs.toSeq()); return wrap(newRDD); } diff --git a/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaRDD.java b/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaRDD.java index 86acfde4a..dd68466e9 100644 --- a/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaRDD.java +++ b/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaRDD.java @@ -3,7 +3,7 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; -import scala.collection.Seq; +import scala.collection.immutable.Seq; import scala.reflect.ClassTag; import com.datastax.spark.connector.ColumnRef; @@ -17,6 +17,7 @@ import static com.datastax.spark.connector.japi.CassandraJavaUtil.toSelectableColumnRefs; import static com.datastax.spark.connector.util.JavaApiHelper.getClassTag; +import static com.datastax.spark.connector.util.JavaApiHelper.toScalaImmutableSeq; import static com.datastax.spark.connector.util.JavaApiHelper.toScalaSeq; /** @@ -54,7 +55,7 @@ protected CassandraJavaRDD wrap(CassandraRDD newRDD) { * was removed by the previous {@code select} call, it is not possible to add it back. */ public CassandraJavaRDD select(String... columnNames) { - Seq columnRefs = toScalaSeq(toSelectableColumnRefs(columnNames)); + Seq columnRefs = toScalaImmutableSeq(toSelectableColumnRefs(columnNames)); CassandraRDD newRDD = rdd().select(columnRefs); return wrap(newRDD); } @@ -67,7 +68,7 @@ public CassandraJavaRDD select(String... columnNames) { * was removed by the previous {@code select} call, it is not possible to add it back.

*/ public CassandraJavaRDD select(ColumnRef... columns) { - Seq columnRefs = JavaApiHelper.toScalaSeq(columns); + Seq columnRefs = JavaApiHelper.toScalaImmutableSeq(columns); CassandraRDD newRDD = rdd().select(columnRefs); return wrap(newRDD); } diff --git a/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraTableScanJavaRDD.java b/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraTableScanJavaRDD.java index f00d7cd3d..73345bfc0 100644 --- a/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraTableScanJavaRDD.java +++ b/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraTableScanJavaRDD.java @@ -2,7 +2,7 @@ import com.datastax.spark.connector.writer.RowWriterFactory; import scala.Tuple2; -import scala.collection.Seq; +import scala.collection.immutable.Seq; import scala.reflect.ClassTag; import org.apache.spark.rdd.RDD; @@ -94,7 +94,7 @@ public CassandraJavaPairRDD keyBy( RowWriterFactory rwf, ColumnRef... columns) { - Seq columnRefs = JavaApiHelper.toScalaSeq(columns); + Seq columnRefs = JavaApiHelper.toScalaImmutableSeq(columns); CassandraRDD> resultRDD = columns.length == 0 ? rdd().keyBy(keyClassTag, rrf, rwf) diff --git a/connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraSourceUtil.scala b/connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraSourceUtil.scala index d798c123f..111a8dfca 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraSourceUtil.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraSourceUtil.scala @@ -12,7 +12,9 @@ import org.apache.spark.SparkConf import org.apache.spark.sql.types.{DataType => CatalystType} import org.apache.spark.sql.types.{BooleanType => SparkSqlBooleanType, DataType => SparkSqlDataType, DateType => SparkSqlDateType, DecimalType => SparkSqlDecimalType, DoubleType => SparkSqlDoubleType, FloatType => SparkSqlFloatType, MapType => SparkSqlMapType, TimestampType => SparkSqlTimestampType, UserDefinedType => SparkSqlUserDefinedType, _} -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ +import scala.collection.mutable +import scala.language.postfixOps import scala.util.Try object CassandraSourceUtil extends Logging { @@ -147,7 +149,7 @@ object CassandraSourceUtil extends Logging { catalystDataType(dataType, nullable = true), nullable = true) } - StructType(structFields) + StructType(structFields.asJava) } def fromTuple(t: TupleType): StructType = { @@ -157,7 +159,7 @@ object CassandraSourceUtil extends Logging { catalystDataType(dataType, nullable = true), nullable = true) } - StructType(structFields) + StructType(structFields.asJava) } cassandraType match { diff --git a/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraCoGroupedRDD.scala b/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraCoGroupedRDD.scala index dfae0b836..d445f05f7 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraCoGroupedRDD.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraCoGroupedRDD.scala @@ -12,7 +12,7 @@ import com.datastax.oss.driver.api.core.CqlSession import com.datastax.oss.driver.api.core.cql.{BoundStatement, Row} import com.datastax.spark.connector.util._ -import scala.collection.JavaConversions._ +import scala.jdk.CollectionConverters._ import scala.language.existentials import scala.reflect.ClassTag import org.apache.spark.annotation.DeveloperApi @@ -128,6 
+128,7 @@ class CassandraCoGroupedRDD[T]( try { val stmt = session.prepare(cql) val converters = stmt.getVariableDefinitions + .asScala .map(v => ColumnType.converterToCassandra(v.getType)) .toArray val convertedValues = diff --git a/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraMergeJoinRDD.scala b/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraMergeJoinRDD.scala index 862f3f8bd..1cafd522f 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraMergeJoinRDD.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraMergeJoinRDD.scala @@ -9,7 +9,7 @@ import java.io.IOException import com.datastax.bdp.util.ScalaJavaUtil.asScalaFuture -import scala.collection.JavaConversions._ +import scala.jdk.CollectionConverters._ import scala.language.existentials import scala.reflect.ClassTag import org.apache.spark.annotation.DeveloperApi @@ -116,6 +116,7 @@ class CassandraMergeJoinRDD[L,R]( try { val stmt = session.prepare(cql) val converters = stmt.getVariableDefinitions + .asScala .map(v => ColumnType.converterToCassandra(v.getType)) .toArray val convertedValues = diff --git a/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala b/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala index 3f02f5058..4590ab8c7 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala @@ -15,7 +15,7 @@ import org.apache.spark.rdd.{PartitionCoalescer, RDD} import org.apache.spark.{Partition, Partitioner, SparkContext, TaskContext} import java.io.IOException -import scala.collection.JavaConversions._ +import scala.jdk.CollectionConverters._ import scala.language.existentials import scala.reflect.ClassTag @@ -186,7 +186,7 @@ class CassandraTableScanRDD[R] private[connector]( val selectedColumnNames = columns.selectFrom(tableDef).map(_.columnName).toSet val partitionKeyColumnNames = PartitionKeyColumns.selectFrom(tableDef).map(_.columnName).toSet - if (selectedColumnNames.containsAll(partitionKeyColumnNames)) { + if (selectedColumnNames.asJava.containsAll(partitionKeyColumnNames.asJava)) { val partitioner = partitionGenerator.partitioner[K](columns) logDebug( s"""Made partitioner ${partitioner} for $this""".stripMargin) diff --git a/connector/src/main/scala/com/datastax/spark/connector/rdd/DseGraphUnionedRDD.scala b/connector/src/main/scala/com/datastax/spark/connector/rdd/DseGraphUnionedRDD.scala index d40399698..fadfda971 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/rdd/DseGraphUnionedRDD.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/rdd/DseGraphUnionedRDD.scala @@ -10,7 +10,7 @@ import java.lang.{String => JString} import java.util.{Map => JMap} import scala.annotation.meta.param -import scala.collection.JavaConversions._ +import scala.jdk.CollectionConverters._ import scala.language.existentials import scala.reflect.ClassTag import org.apache.spark.rdd.{RDD, UnionRDD} @@ -33,8 +33,8 @@ object DseGraphUnionedRDD { graphLabels: java.util.List[String])( implicit connector: CassandraConnector): DseGraphUnionedRDD[R] = { - val rddSeq: Seq[RDD[R]] = rdds - val labelSeq: Seq[String] = graphLabels + val rddSeq: Seq[RDD[R]] = rdds.asScala.toSeq + val labelSeq: Seq[String] = graphLabels.asScala.toSeq new DseGraphUnionedRDD(sc, rddSeq, keyspace, labelSeq) } } @@ -188,7 +188,7 @@ class DseGraphPartitioner[V, T 
<: Token[V]]( */ override def getPartition(key: Any): Int = key match { case vertexId: JMap[JString, AnyRef]@unchecked => { - val label: String = vertexId.getOrElse( + val label: String = vertexId.asScala.getOrElse( LabelAccessor, throw new IllegalArgumentException(s"Couldn't find $LabelAccessor in key $key")) .asInstanceOf[String] @@ -217,7 +217,7 @@ class DseGraphPartitioner[V, T <: Token[V]]( class MapRowWriter(override val columnNames: Seq[String]) extends RowWriter[JMap[JString, AnyRef]] { override def readColumnValues(data: JMap[JString, AnyRef], buffer: Array[Any]): Unit = columnNames.zipWithIndex.foreach { case (columnName, index) => - buffer(index) = data.getOrElse(columnName, + buffer(index) = data.asScala.getOrElse(columnName, throw new IllegalArgumentException(s"""Couldn't find $columnName in $data, unable to generate token""")) } } diff --git a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/BucketingRangeIndex.scala b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/BucketingRangeIndex.scala index 45cfa3e0d..8a654acbc 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/BucketingRangeIndex.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/BucketingRangeIndex.scala @@ -74,5 +74,5 @@ class BucketingRangeIndex[R, T](ranges: Seq[R]) /** Finds rangesContaining containing given point in O(1) time. */ def rangesContaining(point: T): IndexedSeq[R] = - table(bucket(point)).filter(bounds.contains(_, point)) + table(bucket(point)).filter(bounds.contains(_, point)).toIndexedSeq } diff --git a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/CassandraPartitionGenerator.scala b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/CassandraPartitionGenerator.scala index f7fadeb5a..f8b6d1947 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/CassandraPartitionGenerator.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/CassandraPartitionGenerator.scala @@ -4,7 +4,7 @@ import com.datastax.oss.driver.api.core.CqlIdentifier import com.datastax.oss.driver.api.core.metadata.TokenMap import com.datastax.oss.driver.api.core.metadata.token.{TokenRange => DriverTokenRange} -import scala.collection.JavaConversions._ +import scala.jdk.CollectionConverters._ import scala.language.existentials import scala.reflect.ClassTag import scala.util.Try @@ -37,6 +37,7 @@ private[connector] class CassandraPartitionGenerator[V, T <: Token[V]]( val endToken = tokenFactory.tokenFromString(metadata.format(range.getEnd)) val replicas = metadata .getReplicas(keyspaceName, range) + .asScala .map(node => DriverUtil.toAddress(node) .getOrElse(throw new IllegalStateException(s"Unable to determine Node Broadcast Address of $node"))) @@ -49,8 +50,8 @@ private[connector] class CassandraPartitionGenerator[V, T <: Token[V]]( val ranges = connector.withSessionDo { session => val tokenMap = Option(session.getMetadata.getTokenMap.get) .getOrElse(throw new IllegalStateException("Unable to determine Token Range Metadata")) - for (tr <- tokenMap.getTokenRanges()) yield tokenRange(tr, tokenMap) - } + for (tr <- tokenMap.getTokenRanges().asScala) yield tokenRange(tr, tokenMap) + }.toSeq /** * When we have a single Spark Partition use a single global range. 
This @@ -59,7 +60,7 @@ private[connector] class CassandraPartitionGenerator[V, T <: Token[V]]( if (splitCount == 1) { Seq(ranges.head.copy[V, T](tokenFactory.minToken, tokenFactory.minToken)) } else { - ranges.toSeq + ranges } } diff --git a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/DataSizeEstimates.scala b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/DataSizeEstimates.scala index bd2cd524c..7f5300844 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/DataSizeEstimates.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/DataSizeEstimates.scala @@ -1,8 +1,8 @@ package com.datastax.spark.connector.rdd.partitioner -import scala.collection.JavaConversions._ +import scala.jdk.CollectionConverters._ import com.datastax.spark.connector.util.Logging -import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder +import com.datastax.oss.driver.api.core.cql.{Row, SimpleStatementBuilder} import com.datastax.oss.driver.api.core.servererrors.InvalidQueryException import com.datastax.spark.connector.cql.CassandraConnector import com.datastax.spark.connector.rdd.partitioner.dht.{Token, TokenFactory} @@ -41,7 +41,7 @@ class DataSizeEstimates[V, T <: Token[V]]( "FROM system.size_estimates " + "WHERE keyspace_name = ? AND table_name = ?").addPositionalValues(keyspaceName, tableName).build()) - for (row <- rs.all()) yield TokenRangeSizeEstimate( + for (row: Row <- rs.all().asScala) yield TokenRangeSizeEstimate( rangeStart = tokenFactory.tokenFromString(row.getString("range_start")), rangeEnd = tokenFactory.tokenFromString(row.getString("range_end")), partitionsCount = row.getLong("partitions_count"), @@ -53,7 +53,7 @@ class DataSizeEstimates[V, T <: Token[V]]( // when we insert a few rows and immediately query them. However, for tiny data sets the lack // of size estimates is not a problem at all, because we don't want to split tiny data anyways. // Therefore, we're not issuing a warning if the result set was empty. 
- } + }.toSeq catch { case e: InvalidQueryException => logError( @@ -103,7 +103,7 @@ object DataSizeEstimates { def hasSizeEstimates: Boolean = { session.execute( s"SELECT * FROM system.size_estimates " + - s"WHERE keyspace_name = '$keyspaceName' AND table_name = '$tableName'").all().nonEmpty + s"WHERE keyspace_name = '$keyspaceName' AND table_name = '$tableName'").all().asScala.nonEmpty } val startTime = System.currentTimeMillis() diff --git a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/NodeAddresses.scala b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/NodeAddresses.scala index e068943af..c4d44c7df 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/NodeAddresses.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/NodeAddresses.scala @@ -2,7 +2,7 @@ package com.datastax.spark.connector.rdd.partitioner import java.net.InetAddress -import scala.collection.JavaConversions._ +import scala.jdk.CollectionConverters._ import com.datastax.spark.connector.cql.CassandraConnector import com.datastax.spark.connector.util.DriverUtil.{toName, toOption} @@ -28,7 +28,7 @@ class NodeAddresses(conn: CassandraConnector) extends Serializable { // TODO: fetch information about the local node from system.local, when CASSANDRA-9436 is done val rs = session.execute(s"SELECT $nativeTransportAddressColumnName, $listenAddressColumnName FROM $table") for { - row <- rs.all() + row <- rs.all().asScala nativeTransportAddress <- Option(row.getInetAddress(nativeTransportAddressColumnName)) listenAddress = row.getInetAddress(listenAddressColumnName) } yield (nativeTransportAddress, listenAddress) diff --git a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/ReplicaPartitioner.scala b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/ReplicaPartitioner.scala index 10eb90955..17da16395 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/ReplicaPartitioner.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/ReplicaPartitioner.scala @@ -9,7 +9,7 @@ import com.datastax.spark.connector.util._ import com.datastax.spark.connector.writer.RowWriterFactory import org.apache.spark.{Partition, Partitioner} -import scala.collection.JavaConversions._ +import scala.jdk.CollectionConverters._ import scala.reflect.ClassTag @@ -74,6 +74,7 @@ implicit val tokenHash = Math.abs(token.hashCode()) val replicas = tokenMap .getReplicas(_keyspace, token) + .asScala .map(n => DriverUtil.toAddress(n).get.getAddress) val replicaSetInDC = (hostSet & replicas).toVector diff --git a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/TokenRangeSplitter.scala b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/TokenRangeSplitter.scala index 280f723f3..6324c3796 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/TokenRangeSplitter.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/TokenRangeSplitter.scala @@ -1,11 +1,12 @@ package com.datastax.spark.connector.rdd.partitioner import scala.collection.parallel.ForkJoinTaskSupport -import scala.concurrent.forkjoin.ForkJoinPool - +import java.util.concurrent.ForkJoinPool import com.datastax.spark.connector.rdd.partitioner.TokenRangeSplitter.WholeRing import com.datastax.spark.connector.rdd.partitioner.dht.{Token, TokenRange} +import 
scala.collection.parallel.CollectionConverters.IterableIsParallelizable + /** Splits a token ranges into smaller sub-ranges, * each with the desired approximate number of rows. */ diff --git a/connector/src/main/scala/com/datastax/spark/connector/util/JavaApiHelper.scala b/connector/src/main/scala/com/datastax/spark/connector/util/JavaApiHelper.scala index 6f6725b68..b2cf5f400 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/util/JavaApiHelper.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/util/JavaApiHelper.scala @@ -4,7 +4,8 @@ import java.lang.{Iterable => JIterable} import java.util.{Collection => JCollection} import java.util.{Map => JMap} -import scala.collection.JavaConversions._ +import scala.collection.Iterable +import scala.jdk.CollectionConverters._ import scala.reflect._ import scala.reflect.api.{Mirror, TypeCreator, _} import scala.reflect.runtime.universe._ @@ -55,11 +56,11 @@ object JavaApiHelper { def toScalaFunction1[T1, R](f: JFunction[T1, R]): T1 => R = f.call def valuesAsJavaIterable[K, V, IV <: Iterable[V]]: ((K, IV)) => (K, JIterable[V]) = { - case (k, iterable) => (k, asJavaIterable(iterable)) + case (k, iterable) => (k, iterable.asJava) } def valuesAsJavaCollection[K, V, IV <: Iterable[V]]: ((K, IV)) => (K, JCollection[V]) = { - case (k, iterable) => (k, asJavaCollection(iterable)) + case (k, iterable) => (k, iterable.toList.asJava) } /** Returns a runtime class of a given `TypeTag`. */ @@ -71,7 +72,7 @@ object JavaApiHelper { classTag.runtimeClass.asInstanceOf[Class[T]] /** Converts a Java `Map` to a Scala immutable `Map`. */ - def toScalaMap[K, V](map: JMap[K, V]): Map[K, V] = Map(map.toSeq: _*) + def toScalaMap[K, V](map: JMap[K, V]): Map[K, V] = Map(map.asScala.toSeq: _*) /** Converts an array to a Scala `Seq`. */ def toScalaSeq[T](array: Array[T]): Seq[T] = array @@ -80,7 +81,7 @@ object JavaApiHelper { def toScalaImmutableSeq[T](array: Array[T]): scala.collection.immutable.Seq[T] = array.toIndexedSeq /** Converts a Java `Iterable` to Scala `Seq`. */ - def toScalaSeq[T](iterable: java.lang.Iterable[T]): Seq[T] = iterable.toSeq + def toScalaSeq[T](iterable: java.lang.Iterable[T]): Seq[T] = iterable.asScala.toSeq /** Returns the default `RowWriterFactory` initialized with the given `ColumnMapper`. 
*/ def defaultRowWriterFactory[T](typeTag: TypeTag[T], mapper: ColumnMapper[T]): RowWriterFactory[T] = { diff --git a/connector/src/main/scala/com/datastax/spark/connector/util/MergeJoinIterator.scala b/connector/src/main/scala/com/datastax/spark/connector/util/MergeJoinIterator.scala index bdee74312..07d2c88fc 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/util/MergeJoinIterator.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/util/MergeJoinIterator.scala @@ -63,6 +63,6 @@ extends Iterator[(K, Seq[L], Seq[R])] { val bufferRight = new ArrayBuffer[R] itemsLeft.appendWhile(l => keyExtractLeft(l) == key, bufferLeft) itemsRight.appendWhile(r => keyExtractRight(r) == key, bufferRight) - (key, bufferLeft, bufferRight) + (key, bufferLeft.toSeq, bufferRight.toSeq) } } diff --git a/connector/src/main/scala/com/datastax/spark/connector/util/MultiMergeJoinIterator.scala b/connector/src/main/scala/com/datastax/spark/connector/util/MultiMergeJoinIterator.scala index 2690cd274..02debe048 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/util/MultiMergeJoinIterator.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/util/MultiMergeJoinIterator.scala @@ -48,7 +48,7 @@ extends Iterator[Seq[Seq[T]]] { items.map (i => { var buffer = new ArrayBuffer[T] i.appendWhile(l => keyExtract(l) == key, buffer) - buffer + buffer.toSeq }) } } diff --git a/connector/src/main/scala/com/datastax/spark/connector/util/SpanningIterator.scala b/connector/src/main/scala/com/datastax/spark/connector/util/SpanningIterator.scala index 2d32cdc3b..8e5103540 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/util/SpanningIterator.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/util/SpanningIterator.scala @@ -26,6 +26,6 @@ class SpanningIterator[K, T](iterator: Iterator[T], f: T => K) extends Iterator[ val key = f(items.head) val buffer = new ArrayBuffer[T] items.appendWhile(r => f(r) == key, buffer) - (key, buffer) + (key, buffer.toSeq) } } diff --git a/connector/src/main/scala/com/datastax/spark/connector/writer/Batch.scala b/connector/src/main/scala/com/datastax/spark/connector/writer/Batch.scala index 9efa2fee1..7f2856a0d 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/writer/Batch.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/writer/Batch.scala @@ -15,7 +15,7 @@ private[writer] sealed trait Batch extends Ordered[Batch] { def add(stmt: RichBoundStatementWrapper, force: Boolean = false): Boolean /** Collected statements */ - def statements: Seq[RichBoundStatementWrapper] = buf + def statements: Seq[RichBoundStatementWrapper] = buf.toSeq /** Only for internal use - batches are compared by this value. 
*/ protected[Batch] def size: Int diff --git a/connector/src/main/scala/com/datastax/spark/connector/writer/ObjectSizeEstimator.scala b/connector/src/main/scala/com/datastax/spark/connector/writer/ObjectSizeEstimator.scala index fa471ffa0..6ffa3ab34 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/writer/ObjectSizeEstimator.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/writer/ObjectSizeEstimator.scala @@ -3,7 +3,7 @@ package com.datastax.spark.connector.writer import java.io.{OutputStream, ObjectOutputStream} import java.nio.ByteBuffer -import scala.collection.JavaConversions._ +import scala.jdk.CollectionConverters._ import com.datastax.spark.connector.util.ByteBufferUtil @@ -14,11 +14,11 @@ object ObjectSizeEstimator { private def makeSerializable(obj: Any): AnyRef = { obj match { case bb: ByteBuffer => ByteBufferUtil.toArray(bb) - case list: java.util.List[_] => list.map(makeSerializable) + case list: java.util.List[_] => list.asScala.map(makeSerializable) case list: List[_] => list.map(makeSerializable) - case set: java.util.Set[_] => set.map(makeSerializable) + case set: java.util.Set[_] => set.asScala.map(makeSerializable) case set: Set[_] => set.map(makeSerializable) - case map: java.util.Map[_, _] => map.map { case (k, v) => (makeSerializable(k), makeSerializable(v)) } + case map: java.util.Map[_, _] => map.asScala.map { case (k, v) => (makeSerializable(k), makeSerializable(v)) } case map: Map[_, _] => map.map { case (k, v) => (makeSerializable(k), makeSerializable(v)) } case other => other.asInstanceOf[AnyRef] } diff --git a/connector/src/main/scala/com/datastax/spark/connector/writer/ReplicaLocator.scala b/connector/src/main/scala/com/datastax/spark/connector/writer/ReplicaLocator.scala index eaa6c3d1b..33f704bf3 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/writer/ReplicaLocator.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/writer/ReplicaLocator.scala @@ -10,7 +10,7 @@ import com.datastax.spark.connector.util.DriverUtil._ import com.datastax.spark.connector.util.{DriverUtil, Logging, tableFromCassandra} import com.datastax.spark.connector.util.PatitionKeyTools._ -import scala.collection.JavaConversions._ +import scala.jdk.CollectionConverters._ import scala.collection._ /** @@ -43,6 +43,7 @@ class ReplicaLocator[T] private( data.map { row => val hosts = tokenMap .getReplicas(CqlIdentifier.fromInternal(keyspaceName), QueryUtils.getRoutingKeyOrError(boundStmtBuilder.bind(row).stmt)) + .asScala .map(node => DriverUtil.toAddress(node) .getOrElse(throw new IllegalStateException(s"Unable to determine Node Broadcast Address of $node"))) .map(_.getAddress) diff --git a/connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala b/connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala index 5c4c255d5..35f1abb54 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala @@ -212,7 +212,7 @@ class TableWriter[T] private ( 1024 * 1024) (stmt: RichStatement) => rateLimiter.maybeSleep(stmt.bytesCount) case None => - (stmt: RichStatement) => Unit + (stmt: RichStatement) => () } AsyncStatementWriter(connector, writeConf, tableDef, stmt, batchBuilder, maybeRateLimit) diff --git a/connector/src/main/scala/com/datastax/spark/connector/writer/WriteOption.scala b/connector/src/main/scala/com/datastax/spark/connector/writer/WriteOption.scala index 
83ab25818..6aa07d772 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/writer/WriteOption.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/writer/WriteOption.scala @@ -42,7 +42,7 @@ object TTLOption { def constant(ttl: JodaDuration): TTLOption = constant(ttl.getStandardSeconds.toInt) - def constant(ttl: ScalaDuration): TTLOption = if (ttl.isFinite()) constant(ttl.toSeconds.toInt) else forever + def constant(ttl: ScalaDuration): TTLOption = if (ttl.isFinite) constant(ttl.toSeconds.toInt) else forever def perRow(placeholder: String): TTLOption = TTLOption(PerRowWriteOptionValue[Int](placeholder)) diff --git a/connector/src/main/scala/org/apache/spark/metrics/CassandraSink.scala b/connector/src/main/scala/org/apache/spark/metrics/CassandraSink.scala index 6ba0a177e..71f68a238 100644 --- a/connector/src/main/scala/org/apache/spark/metrics/CassandraSink.scala +++ b/connector/src/main/scala/org/apache/spark/metrics/CassandraSink.scala @@ -10,7 +10,7 @@ import java.util.Properties import java.util.concurrent.{Executors, TimeUnit} import java.util.function.BiConsumer -import scala.collection.JavaConversions._ +import scala.jdk.CollectionConverters._ import com.codahale.metrics.{Counting, Gauge, Metered, Metric, MetricRegistry, Sampling} import org.apache.spark.metrics.sink.Sink import org.apache.spark.{SecurityManager, SparkConf, SparkEnv} @@ -55,7 +55,7 @@ class CassandraSink(val properties: Properties, val registry: MetricRegistry, se connector.withSessionDo { session => val stmt = session.prepare(writer.insertStatement) - for ((MetricName(appId, componentId, metricId), metric) <- registry.getMetrics.iterator) { + for ((MetricName(appId, componentId, metricId), metric) <- registry.getMetrics.asScala.iterator) { val bndStmt = stmt.bind(writer.build(componentId, metricId, metric): _*) session.executeAsync(bndStmt).whenComplete(warnOnError) } diff --git a/connector/src/main/scala/org/apache/spark/sql/cassandra/BasicCassandraPredicatePushDown.scala b/connector/src/main/scala/org/apache/spark/sql/cassandra/BasicCassandraPredicatePushDown.scala index f6769890a..690aa34a8 100644 --- a/connector/src/main/scala/org/apache/spark/sql/cassandra/BasicCassandraPredicatePushDown.scala +++ b/connector/src/main/scala/org/apache/spark/sql/cassandra/BasicCassandraPredicatePushDown.scala @@ -49,14 +49,14 @@ class BasicCassandraPredicatePushDown[Predicate : PredicateOps]( private val eqPredicatesByName = eqPredicates .groupBy(Predicates.columnName) - .mapValues(_.take(1)) // don't push down more than one EQ predicate for the same column + .view.mapValues(_.take(1)).toMap // don't push down more than one EQ predicate for the same column .withDefaultValue(Set.empty) private val inPredicates = singleColumnPredicates.filter(Predicates.isInPredicate) private val inPredicatesByName = inPredicates .groupBy(Predicates.columnName) - .mapValues(_.take(1)) // don't push down more than one IN predicate for the same column + .view.mapValues(_.take(1)).toMap // don't push down more than one IN predicate for the same column .withDefaultValue(Set.empty) private val rangePredicates = singleColumnPredicates.filter(Predicates.isRangePredicate) diff --git a/connector/src/main/scala/org/apache/spark/sql/cassandra/DsePredicateRules.scala b/connector/src/main/scala/org/apache/spark/sql/cassandra/DsePredicateRules.scala index 5710c3057..5a0b48e95 100644 --- a/connector/src/main/scala/org/apache/spark/sql/cassandra/DsePredicateRules.scala +++ 
b/connector/src/main/scala/org/apache/spark/sql/cassandra/DsePredicateRules.scala @@ -12,6 +12,8 @@ import org.apache.spark.SparkConf import org.apache.spark.sql.cassandra.PredicateOps.FilterOps import org.apache.spark.sql.sources.{EqualTo, Filter, IsNotNull} +import scala.collection.mutable + /** * A series of pushdown rules that only apply when connecting to Datastax Enterprise @@ -76,7 +78,7 @@ object DsePredicateRules extends CassandraPredicateRules with Logging { val indexColumns = saiIndexes(table).map(_.targetColumn) val pushedEqualityPredicates = predicates.handledByCassandra.collect { case f if FilterOps.isEqualToPredicate(f) => FilterOps.columnName(f) - }.to[collection.mutable.Set] + }.to(mutable.Set) val (handledByCassandra, handledBySpark) = predicates.handledBySpark.partition { filter => lazy val columnName = FilterOps.columnName(filter) diff --git a/connector/src/main/scala/org/apache/spark/sql/cassandra/execution/CassandraDirectJoinStrategy.scala b/connector/src/main/scala/org/apache/spark/sql/cassandra/execution/CassandraDirectJoinStrategy.scala index decad71fe..84054954b 100644 --- a/connector/src/main/scala/org/apache/spark/sql/cassandra/execution/CassandraDirectJoinStrategy.scala +++ b/connector/src/main/scala/org/apache/spark/sql/cassandra/execution/CassandraDirectJoinStrategy.scala @@ -103,7 +103,7 @@ case class CassandraDirectJoinStrategy(spark: SparkSession) extends Strategy wit .get(DirectJoinSizeRatioParam.name, DirectJoinSizeRatioParam.default.toString)) val cassandraSize = BigDecimal(cassandraPlan.stats.sizeInBytes) - val keySize = BigDecimal(keyPlan.stats.sizeInBytes.doubleValue()) + val keySize = BigDecimal(keyPlan.stats.sizeInBytes.doubleValue) logDebug(s"Checking if size ratio is good: $cassandraSize * $ratio > $keySize") diff --git a/connector/src/test/scala/com/datastax/spark/connector/embedded/SparkRepl.scala b/connector/src/test/scala/com/datastax/spark/connector/embedded/SparkRepl.scala index e3d255028..2f81e57f5 100644 --- a/connector/src/test/scala/com/datastax/spark/connector/embedded/SparkRepl.scala +++ b/connector/src/test/scala/com/datastax/spark/connector/embedded/SparkRepl.scala @@ -27,12 +27,12 @@ object SparkRepl { } Main.conf.setAll(conf.getAll) - val interp = new SparkILoop(Some(in), new PrintWriter(out)) + val interp = new SparkILoop(in, new PrintWriter(out)) Main.interp = interp val separator = System.getProperty("path.separator") val settings = new GenericRunnerSettings(s => throw new RuntimeException(s"Scala options error: $s")) settings.processArguments(List("-classpath", paths.mkString(separator)), processAll = true) - interp.process(settings) // Repl starts and goes in loop of R.E.P.L + interp.run(settings) // Repl starts and goes in loop of R.E.P.L Main.interp = null Option(Main.sparkContext).foreach(_.stop()) System.clearProperty("spark.driver.port") diff --git a/connector/src/test/scala/org/apache/spark/metrics/InputMetricsUpdaterSpec.scala b/connector/src/test/scala/org/apache/spark/metrics/InputMetricsUpdaterSpec.scala index 6667781ee..8632c7c78 100644 --- a/connector/src/test/scala/org/apache/spark/metrics/InputMetricsUpdaterSpec.scala +++ b/connector/src/test/scala/org/apache/spark/metrics/InputMetricsUpdaterSpec.scala @@ -1,6 +1,7 @@ package org.apache.spark.metrics import scala.collection.parallel.ForkJoinTaskSupport +import scala.collection.parallel.CollectionConverters._ import scala.concurrent.duration._ import org.apache.spark.executor.TaskMetrics import org.apache.spark.metrics.source.Source @@ -56,7 +57,7 @@ class 
InputMetricsUpdaterSpec extends FlatSpec with Matchers with MockitoSugar { val row = new RowMock(Some(1), Some(2), Some(3), None, Some(4)) val range = (1 to 1000).par - range.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(10)) + range.tasksupport = new ForkJoinTaskSupport(new java.util.concurrent.ForkJoinPool(10)) for (i <- range) updater.updateMetrics(row) updater.finish() tc.taskMetrics().inputMetrics.bytesRead shouldBe 10000L @@ -112,7 +113,7 @@ class InputMetricsUpdaterSpec extends FlatSpec with Matchers with MockitoSugar { ccs.readByteMeter.getCount shouldBe 0 val range = (1 to 1000).par - range.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(10)) + range.tasksupport = new ForkJoinTaskSupport(new java.util.concurrent.ForkJoinPool(10)) for (i <- range) updater.updateMetrics(row) updater.finish() diff --git a/driver/src/main/scala/com/datastax/spark/connector/GettableData.scala b/driver/src/main/scala/com/datastax/spark/connector/GettableData.scala index 935c528c1..5c21f9c65 100644 --- a/driver/src/main/scala/com/datastax/spark/connector/GettableData.scala +++ b/driver/src/main/scala/com/datastax/spark/connector/GettableData.scala @@ -1,8 +1,7 @@ package com.datastax.spark.connector import java.nio.ByteBuffer - -import scala.collection.JavaConversions._ +import scala.jdk.CollectionConverters._ import com.datastax.oss.driver.api.core.cql.Row import com.datastax.oss.driver.api.core.`type`.codec.TypeCodec @@ -74,9 +73,9 @@ object GettableData { private[connector] def convert(obj: Any): AnyRef = { obj match { case bb: ByteBuffer => ByteBufferUtil.toArray(bb) - case list: java.util.List[_] => list.view.map(convert).toList - case set: java.util.Set[_] => set.view.map(convert).toSet - case map: java.util.Map[_, _] => map.view.map { case (k, v) => (convert(k), convert(v))}.toMap + case list: java.util.List[_] => list.asScala.map(convert).toList + case set: java.util.Set[_] => set.asScala.map(convert).toSet + case map: java.util.Map[_, _] => map.asScala.map { case (k, v) => (convert(k), convert(v))}.toMap case udtValue: DriverUDTValue => UDTValue.fromJavaDriverUDTValue(udtValue) case tupleValue: DriverTupleValue => TupleValue.fromJavaDriverTupleValue(tupleValue) case other => other.asInstanceOf[AnyRef] diff --git a/driver/src/main/scala/com/datastax/spark/connector/UDTValue.scala b/driver/src/main/scala/com/datastax/spark/connector/UDTValue.scala index b4ac8bba2..96d303512 100644 --- a/driver/src/main/scala/com/datastax/spark/connector/UDTValue.scala +++ b/driver/src/main/scala/com/datastax/spark/connector/UDTValue.scala @@ -4,7 +4,7 @@ import com.datastax.oss.driver.api.core.data.{UdtValue => DriverUDTValue} import com.datastax.spark.connector.types.NullableTypeConverter import com.datastax.spark.connector.util.DriverUtil.toName -import scala.collection.JavaConversions._ +import scala.jdk.CollectionConverters._ import scala.reflect.runtime.universe._ final case class UDTValue(metaData: CassandraRowMetadata, columnValues: IndexedSeq[AnyRef]) @@ -25,7 +25,7 @@ final case class UDTValue(metaData: CassandraRowMetadata, columnValues: IndexedS object UDTValue { def fromJavaDriverUDTValue(value: DriverUDTValue): UDTValue = { - val fields = value.getType.getFieldNames.map(f => toName(f)).toIndexedSeq + val fields = value.getType.getFieldNames.asScala.map(f => toName(f)).toIndexedSeq val values = fields.map(GettableData.get(value, _)) UDTValue(fields, values) } diff --git 
a/driver/src/main/scala/com/datastax/spark/connector/cql/Schema.scala b/driver/src/main/scala/com/datastax/spark/connector/cql/Schema.scala index 82e36d70a..3cb5e987f 100644 --- a/driver/src/main/scala/com/datastax/spark/connector/cql/Schema.scala +++ b/driver/src/main/scala/com/datastax/spark/connector/cql/Schema.scala @@ -308,7 +308,7 @@ object Schema extends Logging { private def fetchPartitionKey(table: RelationMetadata): Seq[ColumnDef] = { for (column <- table.getPartitionKey.asScala) yield ColumnDef(column, PartitionKeyColumn) - } + }.toSeq private def fetchClusteringColumns(table: RelationMetadata): Seq[ColumnDef] = { for ((column, index) <- table.getClusteringColumns.asScala.toSeq.zipWithIndex) yield { diff --git a/driver/src/main/scala/com/datastax/spark/connector/mapper/GettableDataToMappedTypeConverter.scala b/driver/src/main/scala/com/datastax/spark/connector/mapper/GettableDataToMappedTypeConverter.scala index 33f42c801..30bfb11a4 100644 --- a/driver/src/main/scala/com/datastax/spark/connector/mapper/GettableDataToMappedTypeConverter.scala +++ b/driver/src/main/scala/com/datastax/spark/connector/mapper/GettableDataToMappedTypeConverter.scala @@ -175,7 +175,7 @@ class GettableDataToMappedTypeConverter[T : TypeTag : ColumnMapper]( for ((s, _) <- columnMap.setters) yield (s, ReflectionUtil.methodParamTypes(targetType, s).head) val setterColumnTypes: Map[String, ColumnType[_]] = - columnMap.setters.mapValues(columnType) + columnMap.setters.view.mapValues(columnType).toMap for (setterName <- setterParamTypes.keys) yield { val ct = setterColumnTypes(setterName) val pt = setterParamTypes(setterName) diff --git a/driver/src/main/scala/com/datastax/spark/connector/types/CanBuildFrom.scala b/driver/src/main/scala/com/datastax/spark/connector/types/CanBuildFrom.scala index a738c830a..98ff49b5f 100644 --- a/driver/src/main/scala/com/datastax/spark/connector/types/CanBuildFrom.scala +++ b/driver/src/main/scala/com/datastax/spark/connector/types/CanBuildFrom.scala @@ -55,7 +55,7 @@ object CanBuildFrom { implicit def javaArrayListCanBuildFrom[T] = new CanBuildFrom[T, java.util.ArrayList[T]] { override def apply() = new scala.collection.mutable.Builder[T, java.util.ArrayList[T]]() { val list = new java.util.ArrayList[T]() - override def +=(elem: T) = { list.add(elem); this } + override def addOne(elem: T) = { list.add(elem); this } override def result() = list override def clear() = list.clear() } @@ -67,7 +67,7 @@ object CanBuildFrom { implicit def javaHashSetCanBuildFrom[T] = new CanBuildFrom[T, java.util.HashSet[T]] { override def apply() = new scala.collection.mutable.Builder[T, java.util.HashSet[T]]() { val set = new java.util.HashSet[T]() - override def +=(elem: T) = { set.add(elem); this } + override def addOne(elem: T) = { set.add(elem); this } override def result() = set override def clear() = set.clear() } @@ -79,7 +79,7 @@ object CanBuildFrom { implicit def javaHashMapCanBuildFrom[K, V] = new CanBuildFrom[(K, V), java.util.HashMap[K, V]] { override def apply() = new scala.collection.mutable.Builder[(K, V), java.util.HashMap[K, V]]() { val map = new java.util.HashMap[K, V]() - override def +=(elem: (K, V)) = { map.put(elem._1, elem._2); this } + override def addOne(elem: (K, V)) = { map.put(elem._1, elem._2); this } override def result() = map override def clear() = map.clear() } diff --git a/driver/src/main/scala/com/datastax/spark/connector/types/ColumnType.scala b/driver/src/main/scala/com/datastax/spark/connector/types/ColumnType.scala index 203e9a60e..e051bfa5b 100644 --- 
a/driver/src/main/scala/com/datastax/spark/connector/types/ColumnType.scala +++ b/driver/src/main/scala/com/datastax/spark/connector/types/ColumnType.scala @@ -11,7 +11,7 @@ import com.datastax.oss.driver.api.core.`type`.{DataType, DataTypes => DriverDat import com.datastax.spark.connector.util._ -import scala.collection.JavaConversions._ +import scala.jdk.CollectionConverters._ import scala.reflect.runtime.universe._ /** Serializable representation of column data type. */ diff --git a/driver/src/main/scala/com/datastax/spark/connector/types/TupleType.scala b/driver/src/main/scala/com/datastax/spark/connector/types/TupleType.scala index 0a4a0e9f1..234bde04e 100644 --- a/driver/src/main/scala/com/datastax/spark/connector/types/TupleType.scala +++ b/driver/src/main/scala/com/datastax/spark/connector/types/TupleType.scala @@ -12,7 +12,7 @@ import com.datastax.spark.connector.types.TypeAdapters.ValuesSeqAdapter import com.datastax.spark.connector.{ColumnName, TupleValue} import org.apache.commons.lang3.tuple.{Pair, Triple} -import scala.collection.JavaConversions._ +import scala.jdk.CollectionConverters._ import scala.reflect.runtime.universe._ case class TupleFieldDef(index: Int, columnType: ColumnType[_]) extends FieldDef { @@ -107,7 +107,7 @@ object TupleType { extends TypeConverter[DriverTupleValue] { val fieldTypes = dataType.getComponentTypes - val fieldConverters = fieldTypes.map(ColumnType.converterToCassandra) + val fieldConverters = fieldTypes.asScala.map(ColumnType.converterToCassandra) override def targetTypeTag = typeTag[DriverTupleValue] @@ -120,7 +120,7 @@ object TupleType { if (fieldValue == null) { toSave.setToNull(i) } else { - toSave.set(i, fieldValue, CodecRegistry.DEFAULT.codecFor(fieldTypes(i), fieldValue)) + toSave.set(i, fieldValue, CodecRegistry.DEFAULT.codecFor(fieldTypes.get(i), fieldValue)) } } toSave @@ -143,9 +143,9 @@ object TupleType { } private def fields(dataType: DriverTupleType): IndexedSeq[TupleFieldDef] = unlazify { - for ((field, index) <- dataType.getComponentTypes.toIndexedSeq.zipWithIndex) yield + for ((field, index) <- dataType.getComponentTypes.asScala.zipWithIndex.toIndexedSeq) yield TupleFieldDef(index, fromDriverType(field)) - } + }.toIndexedSeq def apply(javaTupleType: DriverTupleType): TupleType = { TupleType(fields(javaTupleType): _*) diff --git a/driver/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala b/driver/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala index 081020d62..270b09768 100644 --- a/driver/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala +++ b/driver/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala @@ -16,7 +16,7 @@ import com.datastax.spark.connector.util.ByteBufferUtil import com.datastax.spark.connector.util.Symbols._ import org.apache.commons.lang3.tuple -import scala.collection.JavaConversions._ +import scala.jdk.CollectionConverters._ import scala.collection.immutable.{TreeMap, TreeSet} import scala.reflect.runtime.universe._ @@ -336,7 +336,7 @@ object TypeConverter { c } def targetTypeTag = GregorianCalendarTypeTag - def convertPF = DateConverter.convertPF.andThen(calendar) + def convertPF = DateConverter.convertPF.andThen(calendar _) } private val TimestampTypeTag = implicitly[TypeTag[Timestamp]] @@ -697,9 +697,9 @@ object TypeConverter { def convertPF = { case null => bf.apply().result() - case x: java.util.List[_] => newCollection(x) - case x: java.util.Set[_] => newCollection(x) - case x: java.util.Map[_, _] => newCollection(x) + case 
x: java.util.List[_] => newCollection(x.asScala) + case x: java.util.Set[_] => newCollection(x.asScala) + case x: java.util.Map[_, _] => newCollection(x.asScala) case x: Iterable[_] => newCollection(x) } } diff --git a/driver/src/main/scala/com/datastax/spark/connector/types/UserDefinedType.scala b/driver/src/main/scala/com/datastax/spark/connector/types/UserDefinedType.scala index ff3fc865d..1e4272aba 100644 --- a/driver/src/main/scala/com/datastax/spark/connector/types/UserDefinedType.scala +++ b/driver/src/main/scala/com/datastax/spark/connector/types/UserDefinedType.scala @@ -10,7 +10,7 @@ import com.datastax.spark.connector.types.ColumnType.fromDriverType import com.datastax.spark.connector.types.TypeAdapters.ValueByNameAdapter import com.datastax.spark.connector.{CassandraRowMetadata, ColumnName, UDTValue} -import scala.collection.JavaConversions._ +import scala.jdk.CollectionConverters._ import scala.reflect.runtime.universe._ /** A Cassandra user defined type field metadata. It consists of a name and an associated column type. @@ -81,8 +81,8 @@ object UserDefinedType { class DriverUDTValueConverter(dataType: DriverUserDefinedType) extends TypeConverter[DriverUDTValue] { - val fieldNames = dataType.getFieldNames.toIndexedSeq - val fieldTypes = dataType.getFieldTypes.toIndexedSeq + val fieldNames = dataType.getFieldNames.asScala.toIndexedSeq + val fieldTypes = dataType.getFieldTypes.asScala.toIndexedSeq val fieldConverters = fieldTypes.map(ColumnType.converterToCassandra) override def targetTypeTag = implicitly[TypeTag[DriverUDTValue]] @@ -113,7 +113,8 @@ object UserDefinedType { } private def fields(dataType: DriverUserDefinedType): IndexedSeq[UDTFieldDef] = unlazify { - for ((fieldName, fieldType) <- dataType.getFieldNames.zip(dataType.getFieldTypes).toIndexedSeq) yield + for ((fieldName, fieldType) <- + dataType.getFieldNames.asScala.zip(dataType.getFieldTypes.asScala).toIndexedSeq) yield UDTFieldDef(fieldName.asInternal(), fromDriverType(fieldType)) } diff --git a/driver/src/main/scala/com/datastax/spark/connector/util/NameTools.scala b/driver/src/main/scala/com/datastax/spark/connector/util/NameTools.scala index 2a2c022c8..99d1bf511 100644 --- a/driver/src/main/scala/com/datastax/spark/connector/util/NameTools.scala +++ b/driver/src/main/scala/com/datastax/spark/connector/util/NameTools.scala @@ -1,12 +1,11 @@ package com.datastax.spark.connector.util import java.util.Locale - import com.datastax.oss.driver.api.core.metadata.Metadata import com.datastax.spark.connector.util.DriverUtil.toName import org.apache.commons.lang3.StringUtils -import scala.collection.JavaConversions._ +import scala.jdk.CollectionConverters._ object NameTools { @@ -45,11 +44,12 @@ object NameTools { val keyspaceScores = clusterMetadata .getKeyspaces .values() - .toSeq + .asScala .map(ks => (toName(ks.getName), StringUtils.getJaroWinklerDistance(toName(ks.getName).toLowerCase(Locale.ROOT), keyspace.toLowerCase(Locale.ROOT)))) val keyspaceSuggestions = keyspaceScores.filter( _._2 > MinJWScore) + .toSeq .sorted .map(_._1) @@ -75,11 +75,11 @@ object NameTools { val keyspaceScores = clusterMetadata .getKeyspaces .values() - .toSeq + .asScala .map(ks => (ks, StringUtils.getJaroWinklerDistance(toName(ks.getName).toLowerCase(Locale.ROOT), keyspace.toLowerCase(Locale.ROOT)))) - val ktScores = for ((ks, ksScore) <- keyspaceScores; (_, t) <- (ks.getTables ++ ks.getViews)) yield { + val ktScores = for ((ks, ksScore) <- keyspaceScores; (_, t) <- (ks.getTables.asScala ++ ks.getViews.asScala)) yield { val tScore = 
StringUtils.getJaroWinklerDistance(toName(t.getName).toLowerCase(Locale.ROOT), table.toLowerCase(Locale.ROOT)) (toName(ks.getName), toName(t.getName), ksScore, tScore) } @@ -100,10 +100,10 @@ object NameTools { val suggestedTablesUnknownKeyspace = ktScores .collect { case (ks, t, ksScore, tScore) if tScore > MinJWScore => (ks, t)} - if (suggestedTables.nonEmpty) Some(TableSuggestions(suggestedTables)) - else if (suggestedKeyspaces.nonEmpty) Some(KeyspaceSuggestions(suggestedKeyspaces)) - else if (suggestedKeyspaceAndTables.nonEmpty) Some(KeyspaceAndTableSuggestions(suggestedKeyspaceAndTables)) - else if (suggestedTablesUnknownKeyspace.nonEmpty) Some(KeyspaceAndTableSuggestions(suggestedTablesUnknownKeyspace)) + if (suggestedTables.nonEmpty) Some(TableSuggestions(suggestedTables.toSeq)) + else if (suggestedKeyspaces.nonEmpty) Some(KeyspaceSuggestions(suggestedKeyspaces.toSeq)) + else if (suggestedKeyspaceAndTables.nonEmpty) Some(KeyspaceAndTableSuggestions(suggestedKeyspaceAndTables.toSeq)) + else if (suggestedTablesUnknownKeyspace.nonEmpty) Some(KeyspaceAndTableSuggestions(suggestedTablesUnknownKeyspace.toSeq)) else None } diff --git a/driver/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala b/driver/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala index 03f87b8b6..2a5515f1a 100644 --- a/driver/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala +++ b/driver/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala @@ -271,7 +271,7 @@ class TypeConverterTest { buf.put(array) buf.rewind() assertSame(array, c.convert(array)) - assertEquals(array.deep, c.convert(buf).deep) + assertEquals(array.toList, c.convert(buf).toList) } @Test diff --git a/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmBridge.scala b/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmBridge.scala index 21b967451..7312c6600 100644 --- a/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmBridge.scala +++ b/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmBridge.scala @@ -104,7 +104,7 @@ object CcmBridge { logger.error( "Non-zero exit code ({}) returned from executing ccm command: {}", retValue, cli) } - outStream.lines + outStream.lines.toSeq } catch { case _: IOException if watchDog.killedProcess() => throw new RuntimeException(s"The command $cli was killed after 10 minutes")
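
The recurring change across these hunks is the removal of the implicit scala.collection.JavaConversions._ conversions, which no longer exist in Scala 2.13; every Java/Scala boundary now goes through explicit .asScala / .asJava calls from scala.jdk.CollectionConverters. A minimal sketch of the pattern (identifiers are illustrative, not taken from the connector):

    import scala.jdk.CollectionConverters._

    object ConvertersSketch {
      def main(args: Array[String]): Unit = {
        val javaRows: java.util.List[String] = java.util.Arrays.asList("one", "two", "three")

        // Java -> Scala: no implicit conversion any more, call .asScala explicitly
        val scalaRows: Seq[String] = javaRows.asScala.toSeq

        // Scala -> Java: .asJava replaces asJavaIterable / asJavaCollection
        val backToJava: java.lang.Iterable[String] = scalaRows.asJava

        // Java Map -> immutable Scala Map, same shape as JavaApiHelper.toScalaMap
        val javaMap = new java.util.HashMap[String, Int]()
        javaMap.put("key", 1)
        val scalaMap: Map[String, Int] = Map(javaMap.asScala.toSeq: _*)

        println(scalaRows)
        println(backToJava)
        println(scalaMap)
      }
    }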
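
Several hunks add .toSeq where a method declared to return Seq builds its result in an ArrayBuffer (MergeJoinIterator, MultiMergeJoinIterator, SpanningIterator, Batch.statements, Schema.fetchPartitionKey, NameTools). In 2.13, scala.Seq is an alias for collection.immutable.Seq, so a mutable buffer no longer conforms to it. A sketch under that assumption, using a hypothetical grouping helper rather than the connector's iterators:

    import scala.collection.mutable.ArrayBuffer

    object ToSeqSketch {
      // Hypothetical helper in the spirit of SpanningIterator: collect a run of items sharing a key.
      def groupRun[K, T](items: Iterator[T], key: T => K): (K, Seq[T]) = {
        val buffered = items.buffered
        val k = key(buffered.head)
        val buffer = new ArrayBuffer[T]
        while (buffered.hasNext && key(buffered.head) == k) buffer += buffered.next()
        // 2.12: returning `buffer` compiled, because scala.Seq was collection.Seq.
        // 2.13: scala.Seq = immutable.Seq, so an explicit immutable copy is required.
        (k, buffer.toSeq)
      }

      def main(args: Array[String]): Unit = {
        println(groupRun(Iterator(1, 1, 1, 2, 3), (x: Int) => x)) // (1,List(1, 1, 1))
      }
    }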
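
The .view.mapValues(...).toMap changes in BasicCassandraPredicatePushDown and GettableDataToMappedTypeConverter deal with 2.13 deprecating Map.mapValues: it now yields a lazy MapView, which is not a strict Map (so, for example, withDefaultValue is unavailable) until it is forced with .toMap. A small sketch with made-up predicate data:

    object MapValuesSketch {
      def main(args: Array[String]): Unit = {
        val predicatesByColumn: Map[String, Set[String]] =
          Map("key" -> Set("key = 1", "key = 2"), "value" -> Set("value = 'x'"))

        // 2.12 style was predicatesByColumn.mapValues(_.take(1)); in 2.13 that is
        // deprecated and returns a MapView, so force it back into a strict Map.
        val firstPredicatePerColumn =
          predicatesByColumn.view.mapValues(_.take(1)).toMap.withDefaultValue(Set.empty[String])

        println(firstPredicatePerColumn("key"))     // Set(key = 1)
        println(firstPredicatePerColumn("missing")) // Set() from the default
      }
    }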
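
DsePredicateRules switches from the 2.12-only xs.to[collection.mutable.Set] syntax to the 2.13 xs.to(mutable.Set) form, which takes the companion object as a collection factory:

    import scala.collection.mutable

    object ToFactorySketch {
      def main(args: Array[String]): Unit = {
        val pushedColumns = Seq("key", "key", "bucket")

        // 2.12: pushedColumns.to[mutable.Set]
        // 2.13: the target collection is passed as a value, not a type parameter.
        val columnSet: mutable.Set[String] = pushedColumns.to(mutable.Set)

        columnSet += "extra"
        println(columnSet.toList.sorted) // List(bucket, extra, key)
      }
    }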
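
The CanBuildFrom builders for Java collections now override addOne instead of +=: in the 2.13 collections library += is a final alias defined on Growable, and addOne is the abstract method a mutable.Builder must implement. A standalone sketch of the same shape as the java.util.ArrayList builder in the diff:

    import scala.collection.mutable

    object BuilderSketch {
      // In Scala 2.13, += is final in Growable; subclasses implement addOne.
      def javaArrayListBuilder[T]: mutable.Builder[T, java.util.ArrayList[T]] =
        new mutable.Builder[T, java.util.ArrayList[T]] {
          private val list = new java.util.ArrayList[T]()
          override def addOne(elem: T): this.type = { list.add(elem); this }
          override def result(): java.util.ArrayList[T] = list
          override def clear(): Unit = list.clear()
        }

      def main(args: Array[String]): Unit = {
        val b = javaArrayListBuilder[Int]
        b += 1 // still works: += delegates to addOne
        b += 2
        println(b.result()) // [1, 2]
      }
    }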
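
The test changes in InputMetricsUpdaterSpec (and the IterableIsParallelizable import at the start of this section) reflect parallel collections moving to the separate scala-parallel-collections module in 2.13: .par needs scala.collection.parallel.CollectionConverters._, and the scala.concurrent.forkjoin.ForkJoinPool alias is gone in favour of java.util.concurrent.ForkJoinPool. Sketch, assuming the org.scala-lang.modules scala-parallel-collections artifact is on the classpath (its addition to build.sbt is not shown in this part of the diff):

    import scala.collection.parallel.CollectionConverters._ // provides .par on standard collections
    import scala.collection.parallel.ForkJoinTaskSupport
    import java.util.concurrent.ForkJoinPool                // replaces scala.concurrent.forkjoin.ForkJoinPool

    object ParallelSketch {
      def main(args: Array[String]): Unit = {
        val range = (1 to 1000).par
        range.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(10))
        val total = range.map(_ * 2).sum
        println(total) // 1001000
      }
    }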
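
A small but easy-to-miss one: TableWriter's no-op rate limiter changes from (stmt: RichStatement) => Unit to (stmt: RichStatement) => (). The former returns the Unit companion object and only type-checks via value discarding; () is the actual unit value. Illustrative only, with a plain String parameter instead of RichStatement:

    object UnitLiteralSketch {
      def main(args: Array[String]): Unit = {
        // `(s: String) => Unit` would return the Unit companion object and rely on
        // value discarding to satisfy String => Unit; the unit literal is what is meant:
        val noOp: String => Unit = (s: String) => ()
        noOp("ignored")
        println("no-op applied")
      }
    }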
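
TypeConverterTest replaces array.deep (removed in 2.13) with .toList for content comparison; sameElements is the other common replacement. Sketch:

    object ArrayCompareSketch {
      def main(args: Array[String]): Unit = {
        val expected = Array[Byte](1, 2, 3)
        val actual   = Array[Byte](1, 2, 3)

        // 2.12: expected.deep == actual.deep — `deep` no longer exists in 2.13.
        // Arrays compare by reference, so compare contents explicitly:
        println(expected.toList == actual.toList) // true
        println(expected.sameElements(actual))    // true
      }
    }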