diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 00de87fc1..2c64f20a5 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- scala: [2.12.10]
+ scala: [2.12.11, 2.13.8]
db-version: [6.8.13, 5.1.24, 3.11.10, 4.0-rc2]
steps:
diff --git a/build.sbt b/build.sbt
index 5b7455036..a645d3b79 100644
--- a/build.sbt
+++ b/build.sbt
@@ -4,10 +4,11 @@ import sbt.{Compile, moduleFilter, _}
import sbtassembly.AssemblyPlugin.autoImport.assembly
lazy val scala212 = "2.12.11"
-lazy val supportedScalaVersions = List(scala212)
+lazy val scala213 = "2.13.8"
+lazy val supportedScalaVersions = List(scala212, scala213)
// factor out common settings
-ThisBuild / scalaVersion := scala212
+ThisBuild / scalaVersion := scala213
ThisBuild / scalacOptions ++= Seq("-target:jvm-1.8")
// Publishing Info
@@ -70,6 +71,7 @@ val annotationProcessor = Seq(
def scalacVersionDependantOptions(scalaBinary: String): Seq[String] = scalaBinary match {
case "2.11" => Seq()
case "2.12" => Seq("-no-java-comments") //Scala Bug on inner classes, CassandraJavaUtil,
+ case "2.13" => Seq("-no-java-comments") //Scala Bug on inner classes, CassandraJavaUtil,
}
lazy val root = (project in file("."))
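Note (not part of the patch): a minimal sketch of how a helper like scalacVersionDependantOptions above is typically wired into the same build.sbt, using the standard sbt scalaBinaryVersion key; the exact setting in this build may differ.

    // Illustrative only: feed the current Scala binary version into the helper defined above.
    lazy val exampleSettings = Seq(
      scalacOptions ++= scalacVersionDependantOptions(scalaBinaryVersion.value)
    )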
diff --git a/connector/src/it/scala/com/datastax/spark/connector/CassandraJavaUtilSpec.scala b/connector/src/it/scala/com/datastax/spark/connector/CassandraJavaUtilSpec.scala
index 0777c3e98..1466426aa 100644
--- a/connector/src/it/scala/com/datastax/spark/connector/CassandraJavaUtilSpec.scala
+++ b/connector/src/it/scala/com/datastax/spark/connector/CassandraJavaUtilSpec.scala
@@ -3,7 +3,7 @@ package com.datastax.spark.connector
import com.datastax.spark.connector.ccm.CcmBridge
import com.datastax.spark.connector.cluster.DefaultCluster
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
import scala.concurrent.Future
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector.cql.CassandraConnector
@@ -117,9 +117,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
val rows = results.all()
assert(rows.size() == 3)
- assert(rows.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
- assert(rows.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
- assert(rows.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
+ assert(rows.asScala.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
+ assert(rows.asScala.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
+ assert(rows.asScala.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
}
@@ -140,9 +140,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
val rows = results.all()
assert(rows.size() == 3)
- assert(rows.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
- assert(rows.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
- assert(rows.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
+ assert(rows.asScala.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
+ assert(rows.asScala.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
+ assert(rows.asScala.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
}
it should "allow to save beans with transient fields to Cassandra" in {
@@ -162,9 +162,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
val rows = results.all()
assert(rows.size() == 3)
- assert(rows.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
- assert(rows.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
- assert(rows.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
+ assert(rows.asScala.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
+ assert(rows.asScala.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
+ assert(rows.asScala.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
}
it should "allow to save beans with inherited fields to Cassandra" in {
@@ -184,7 +184,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
val rows = results.all()
rows should have size 3
- rows.map(row => (row.getString("value"), row.getInt("key"), row.getString("sub_class_field"))).toSet shouldBe Set(
+ rows.asScala.map(row => (row.getString("value"), row.getInt("key"), row.getString("sub_class_field"))).toSet shouldBe Set(
("one", 1, "a"),
("two", 2, "b"),
("three", 3, "c")
@@ -210,9 +210,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
val rows = results.all()
assert(rows.size() == 3)
- assert(rows.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
- assert(rows.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
- assert(rows.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
+ assert(rows.asScala.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
+ assert(rows.asScala.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
+ assert(rows.asScala.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
}
it should "allow to read rows as Tuple1" in {
@@ -222,7 +222,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1"
)
- .collect().head
+ .collect().asScala.head
tuple shouldBe Tuple1(
1: Integer
)
@@ -237,7 +237,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2"
)
- .collect().head
+ .collect().asScala.head
tuple shouldBe Tuple2(
1: Integer,
"2"
@@ -254,7 +254,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3"
)
- .collect().head
+ .collect().asScala.head
tuple shouldBe Tuple3(
1: Integer,
"2",
@@ -273,7 +273,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4"
)
- .collect().head
+ .collect().asScala.head
tuple shouldBe Tuple4(
1: Integer,
"2",
@@ -294,7 +294,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5"
)
- .collect().head
+ .collect().asScala.head
tuple shouldBe Tuple5(
1: Integer,
"2",
@@ -317,7 +317,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6"
)
- .collect().head
+ .collect().asScala.head
tuple shouldBe Tuple6(
1: Integer,
"2",
@@ -342,7 +342,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7"
)
- .collect().head
+ .collect().asScala.head
tuple shouldBe Tuple7(
1: Integer,
"2",
@@ -369,7 +369,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"
)
- .collect().head
+ .collect().asScala.head
tuple shouldBe Tuple8(
1: Integer,
"2",
@@ -398,7 +398,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9"
)
- .collect().head
+ .collect().asScala.head
tuple shouldBe Tuple9(
1: Integer,
"2",
@@ -429,7 +429,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10"
)
- .collect().head
+ .collect().asScala.head
tuple shouldBe Tuple10(
1: Integer,
"2",
@@ -462,7 +462,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11"
)
- .collect().head
+ .collect().asScala.head
tuple shouldBe Tuple11(
1: Integer,
"2",
@@ -497,7 +497,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12"
)
- .collect().head
+ .collect().asScala.head
tuple shouldBe Tuple12(
1: Integer,
"2",
@@ -534,7 +534,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13"
)
- .collect().head
+ .collect().asScala.head
tuple shouldBe Tuple13(
1: Integer,
"2",
@@ -573,7 +573,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14"
)
- .collect().head
+ .collect().asScala.head
tuple shouldBe Tuple14(
1: Integer,
"2",
@@ -614,7 +614,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15"
)
- .collect().head
+ .collect().asScala.head
tuple shouldBe Tuple15(
1: Integer,
"2",
@@ -657,7 +657,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16"
)
- .collect().head
+ .collect().asScala.head
tuple shouldBe Tuple16(
1: Integer,
"2",
@@ -702,7 +702,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17"
)
- .collect().head
+ .collect().asScala.head
tuple shouldBe Tuple17(
1: Integer,
"2",
@@ -749,7 +749,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18"
)
- .collect().head
+ .collect().asScala.head
tuple shouldBe Tuple18(
1: Integer,
"2",
@@ -798,7 +798,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19"
)
- .collect().head
+ .collect().asScala.head
tuple shouldBe Tuple19(
1: Integer,
"2",
@@ -849,7 +849,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19", "c20"
)
- .collect().head
+ .collect().asScala.head
tuple shouldBe Tuple20(
1: Integer,
"2",
@@ -902,7 +902,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19", "c20", "c21"
)
- .collect().head
+ .collect().asScala.head
tuple shouldBe Tuple21(
1: Integer,
"2",
@@ -957,7 +957,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19", "c20", "c21", "c22"
)
- .collect().head
+ .collect().asScala.head
tuple shouldBe Tuple22(
1: Integer,
"2",
diff --git a/connector/src/it/scala/com/datastax/spark/connector/SparkCassandraITFlatSpecBase.scala b/connector/src/it/scala/com/datastax/spark/connector/SparkCassandraITFlatSpecBase.scala
index 54fd2381a..06610c432 100644
--- a/connector/src/it/scala/com/datastax/spark/connector/SparkCassandraITFlatSpecBase.scala
+++ b/connector/src/it/scala/com/datastax/spark/connector/SparkCassandraITFlatSpecBase.scala
@@ -200,8 +200,8 @@ trait SparkCassandraITSpecBase
Await.result(Future.sequence(units), Duration.Inf)
}
- def awaitAll[T](units: TraversableOnce[Future[T]]): TraversableOnce[T] = {
- Await.result(Future.sequence(units), Duration.Inf)
+ def awaitAll[T](units: IterableOnce[Future[T]]): IterableOnce[T] = {
+ Await.result(Future.sequence(units.iterator.toList), Duration.Inf)
}
def keyspaceCql(name: String = ks) =
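Note (illustrative, not from the patch): Scala 2.13 turns TraversableOnce into a deprecated alias of IterableOnce, and Future.sequence wants a concrete collection with a BuildFrom instance, which is why the new awaitAll materializes the iterator first. A standalone sketch of the same pattern, with hypothetical names:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration.Duration

    // Accept any IterableOnce and materialize it before sequencing.
    def awaitAllSketch[T](units: IterableOnce[Future[T]]): Seq[T] =
      Await.result(Future.sequence(units.iterator.toSeq), Duration.Inf)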
diff --git a/connector/src/it/scala/com/datastax/spark/connector/cql/CassandraAuthenticatedConnectorSpec.scala b/connector/src/it/scala/com/datastax/spark/connector/cql/CassandraAuthenticatedConnectorSpec.scala
index b7561071e..1c382b4da 100644
--- a/connector/src/it/scala/com/datastax/spark/connector/cql/CassandraAuthenticatedConnectorSpec.scala
+++ b/connector/src/it/scala/com/datastax/spark/connector/cql/CassandraAuthenticatedConnectorSpec.scala
@@ -66,7 +66,7 @@ class CassandraAuthenticatedConnectorSpec extends SparkCassandraITFlatSpecBase w
"spark.cassandra.auth.password" -> "cassandra",
"keyspace" -> ks, "table" -> "authtest")
- personDF1.write.format("org.apache.spark.sql.cassandra").options(options).mode("append")save()
+ personDF1.write.format("org.apache.spark.sql.cassandra").options(options).mode("append").save()
val personDF2 = spark.read.format("org.apache.spark.sql.cassandra").options(options).load()
personDF2.count should be(4)
diff --git a/connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraJavaPairRDDSpec.scala b/connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraJavaPairRDDSpec.scala
index a94fc6811..d39cc24f7 100644
--- a/connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraJavaPairRDDSpec.scala
+++ b/connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraJavaPairRDDSpec.scala
@@ -8,8 +8,8 @@ import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.japi.CassandraJavaUtil._
import org.apache.spark.api.java.function.{Function2, Function => JFunction}
-import scala.collection.JavaConversions._
import scala.concurrent.Future
+import scala.jdk.CollectionConverters._
case class SimpleClass(value: Integer)
@@ -103,15 +103,16 @@ class CassandraJavaPairRDDSpec extends SparkCassandraITFlatSpecBase with Default
"key")
.spanBy(f, classOf[Integer])
.collect()
+ .asScala
.toMap
results should have size 2
results should contain key 10
results should contain key 20
- results(10).size should be(3)
- results(10).map(_._2).toSeq should be(Seq(10, 11, 12))
- results(20).size should be(3)
- results(20).map(_._2).toSeq should be(Seq(20, 21, 22))
+ results(10).asScala.size should be(3)
+ results(10).asScala.map(_._2).toSeq should be(Seq(10, 11, 12))
+ results(20).asScala.size should be(3)
+ results(20).asScala.map(_._2).toSeq should be(Seq(20, 21, 22))
}
it should "allow to use spanByKey method" in {
@@ -129,15 +130,16 @@ class CassandraJavaPairRDDSpec extends SparkCassandraITFlatSpecBase with Default
"key")
.spanByKey()
.collect()
+ .asScala
.toMap
results should have size 2
results should contain key 10
results should contain key 20
results(10).size should be(3)
- results(10).toSeq should be(Seq(10, 11, 12))
+ results(10).asScala.toSeq should be(Seq(10, 11, 12))
results(20).size should be(3)
- results(20).toSeq should be(Seq(20, 21, 22))
+ results(20).asScala.toSeq should be(Seq(20, 21, 22))
}
it should "allow to use of keyByAndApplyPartitioner" in {
diff --git a/connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraJavaRDDSpec.scala b/connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraJavaRDDSpec.scala
index a0279d057..1145d0154 100644
--- a/connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraJavaRDDSpec.scala
+++ b/connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraJavaRDDSpec.scala
@@ -2,7 +2,6 @@ package com.datastax.spark.connector.rdd
import java.io.IOException
import java.net.{InetAddress, InetSocketAddress}
-
import com.datastax.spark.connector._
import com.datastax.spark.connector.cluster.DefaultCluster
import com.datastax.spark.connector.cql.{CassandraConnector, IpBasedContactInfo}
@@ -12,8 +11,10 @@ import com.datastax.spark.connector.types.TypeConverter
import org.apache.commons.lang3.tuple
import org.apache.spark.api.java.function.{Function => JFunction}
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
+import scala.jdk.CollectionConverters.ListHasAsScala
import scala.concurrent.Future
+import scala.jdk.javaapi.CollectionConverters.asScala
class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultCluster {
@@ -91,25 +92,25 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus
"CassandraJavaRDD" should "allow to read data as CassandraRows " in {
val rows = javaFunctions(sc).cassandraTable(ks, "test_table").collect()
assert(rows.size == 3)
- assert(rows.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
- assert(rows.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
- assert(rows.exists(row ⇒ row.getString("value") == null && row.getInt("key") == 3))
+ assert(rows.asScala.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
+ assert(rows.asScala.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
+ assert(rows.asScala.exists(row ⇒ row.getString("value") == null && row.getInt("key") == 3))
}
it should "allow to read data as Java beans " in {
val beans = javaFunctions(sc).cassandraTable(ks, "test_table", mapRowTo(classOf[SampleJavaBean])).collect()
assert(beans.size == 3)
- assert(beans.exists(bean ⇒ bean.getValue == "one" && bean.getKey == 1))
- assert(beans.exists(bean ⇒ bean.getValue == "two" && bean.getKey == 2))
- assert(beans.exists(bean ⇒ bean.getValue == null && bean.getKey == 3))
+ assert(beans.asScala.exists(bean ⇒ bean.getValue == "one" && bean.getKey == 1))
+ assert(beans.asScala.exists(bean ⇒ bean.getValue == "two" && bean.getKey == 2))
+ assert(beans.asScala.exists(bean ⇒ bean.getValue == null && bean.getKey == 3))
}
it should "allow to read data as Java beans with inherited fields" in {
val beans = javaFunctions(sc).cassandraTable(ks, "test_table3", mapRowTo(classOf[SampleJavaBeanSubClass])).collect()
assert(beans.size == 3)
- assert(beans.exists(bean ⇒ bean.getValue == "one" && bean.getKey == 1 && bean.getSubClassField == "a"))
- assert(beans.exists(bean ⇒ bean.getValue == "two" && bean.getKey == 2 && bean.getSubClassField == "b"))
- assert(beans.exists(bean ⇒ bean.getValue == null && bean.getKey == 3 && bean.getSubClassField == "c"))
+ assert(beans.asScala.exists(bean ⇒ bean.getValue == "one" && bean.getKey == 1 && bean.getSubClassField == "a"))
+ assert(beans.asScala.exists(bean ⇒ bean.getValue == "two" && bean.getKey == 2 && bean.getSubClassField == "b"))
+ assert(beans.asScala.exists(bean ⇒ bean.getValue == null && bean.getKey == 3 && bean.getSubClassField == "c"))
}
it should "allow to read data as Java beans with custom mapping defined by aliases" in {
@@ -118,17 +119,17 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus
.select(column("key").as("devil"), column("value").as("cat"))
.collect()
assert(beans.size == 3)
- assert(beans.exists(bean ⇒ bean.getCat == "one" && bean.getDevil == 1))
- assert(beans.exists(bean ⇒ bean.getCat == "two" && bean.getDevil == 2))
- assert(beans.exists(bean ⇒ bean.getCat == null && bean.getDevil == 3))
+ assert(beans.asScala.exists(bean ⇒ bean.getCat == "one" && bean.getDevil == 1))
+ assert(beans.asScala.exists(bean ⇒ bean.getCat == "two" && bean.getDevil == 2))
+ assert(beans.asScala.exists(bean ⇒ bean.getCat == null && bean.getDevil == 3))
}
it should "allow to read data as Java beans (with multiple constructors)" in {
val beans = javaFunctions(sc).cassandraTable(ks, "test_table", mapRowTo(classOf[SampleJavaBeanWithMultipleCtors])).collect()
assert(beans.size == 3)
- assert(beans.exists(bean ⇒ bean.getValue == "one" && bean.getKey == 1))
- assert(beans.exists(bean ⇒ bean.getValue == "two" && bean.getKey == 2))
- assert(beans.exists(bean ⇒ bean.getValue == null && bean.getKey == 3))
+ assert(beans.asScala.exists(bean ⇒ bean.getValue == "one" && bean.getKey == 1))
+ assert(beans.asScala.exists(bean ⇒ bean.getValue == "two" && bean.getKey == 2))
+ assert(beans.asScala.exists(bean ⇒ bean.getValue == null && bean.getKey == 3))
}
it should "throw NoSuchMethodException when trying to read data as Java beans (without no-args constructor)" in {
@@ -139,18 +140,18 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus
it should "allow to read data as nested Java beans" in {
val beans = javaFunctions(sc).cassandraTable(ks, "test_table", mapRowTo(classOf[SampleWithNestedJavaBean#InnerClass])).collect()
assert(beans.size == 3)
- assert(beans.exists(bean ⇒ bean.getValue == "one" && bean.getKey == 1))
- assert(beans.exists(bean ⇒ bean.getValue == "two" && bean.getKey == 2))
- assert(beans.exists(bean ⇒ bean.getValue == null && bean.getKey == 3))
+ assert(beans.asScala.exists(bean ⇒ bean.getValue == "one" && bean.getKey == 1))
+ assert(beans.asScala.exists(bean ⇒ bean.getValue == "two" && bean.getKey == 2))
+ assert(beans.asScala.exists(bean ⇒ bean.getValue == null && bean.getKey == 3))
}
it should "allow to read data as deeply nested Java beans" in {
val beans = javaFunctions(sc).cassandraTable(ks, "test_table",
mapRowTo(classOf[SampleWithDeeplyNestedJavaBean#IntermediateClass#InnerClass])).collect()
assert(beans.size == 3)
- assert(beans.exists(bean ⇒ bean.getValue == "one" && bean.getKey == 1))
- assert(beans.exists(bean ⇒ bean.getValue == "two" && bean.getKey == 2))
- assert(beans.exists(bean ⇒ bean.getValue == null && bean.getKey == 3))
+ assert(beans.asScala.exists(bean ⇒ bean.getValue == "one" && bean.getKey == 1))
+ assert(beans.asScala.exists(bean ⇒ bean.getValue == "two" && bean.getKey == 2))
+ assert(beans.asScala.exists(bean ⇒ bean.getValue == null && bean.getKey == 3))
}
@@ -158,9 +159,9 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus
val rows = javaFunctions(sc).cassandraTable(ks, "test_table")
.select("key").collect()
assert(rows.size == 3)
- assert(rows.exists(row ⇒ !row.contains("value") && row.getInt("key") == 1))
- assert(rows.exists(row ⇒ !row.contains("value") && row.getInt("key") == 2))
- assert(rows.exists(row ⇒ !row.contains("value") && row.getInt("key") == 3))
+ assert(rows.asScala.exists(row ⇒ !row.contains("value") && row.getInt("key") == 1))
+ assert(rows.asScala.exists(row ⇒ !row.contains("value") && row.getInt("key") == 2))
+ assert(rows.asScala.exists(row ⇒ !row.contains("value") && row.getInt("key") == 3))
}
it should "return selected columns" in {
@@ -174,7 +175,7 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus
val rows = javaFunctions(sc).cassandraTable(ks, "test_table")
.where("value = ?", "two").collect()
assert(rows.size === 1)
- assert(rows.exists(row => row.getString("value") == "two" && row.getInt("key") == 2))
+ assert(rows.asScala.exists(row => row.getString("value") == "two" && row.getInt("key") == 2))
}
it should "allow to read rows as an array of a single-column type supported by TypeConverter" in {
@@ -212,7 +213,9 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus
val rows = javaFunctions(sc)
.cassandraTable(ks, "collections", mapColumnToListOf(classOf[String]))
.select("l")
- .collect().map(_.toList)
+ .collect()
+ .asScala
+ .map(asScala)
rows should have size 2
rows should contain(List("item1", "item2"))
@@ -223,7 +226,7 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus
val rows = javaFunctions(sc)
.cassandraTable(ks, "collections", mapColumnToSetOf(classOf[String]))
.select("s")
- .collect().map(_.toSet)
+ .collect().asScala.map(asScala)
rows should have size 2
rows should contain(Set("item1", "item2"))
@@ -234,7 +237,7 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus
val rows = javaFunctions(sc)
.cassandraTable(ks, "collections", mapColumnToMapOf(classOf[String], classOf[String]))
.select("m")
- .collect().map(_.toMap)
+ .collect().asScala.map(asScala)
rows should have size 2
rows should contain(Map("key1" → "value1", "key2" → "value2"))
@@ -244,7 +247,7 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus
it should "allow to read rows as an array of multi-column type" in {
val rows = javaFunctions(sc)
.cassandraTable(ks, "test_table", mapRowTo(classOf[SampleJavaBean]))
- .collect().map(x => (x.getKey, x.getValue))
+ .collect().asScala.map(x => (x.getKey, x.getValue))
rows should have size 3
rows should contain((1, "one"))
@@ -256,7 +259,7 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus
val rows = javaFunctions(sc)
.cassandraTable(ks, "test_table2", mapRowTo(classOf[SampleJavaBean],
tuple.Pair.of("key", "some_key"), tuple.Pair.of("value", "some_value")))
- .collect().map(x => (x.getKey, x.getValue))
+ .collect().asScala.map(x => (x.getKey, x.getValue))
rows should have size 3
rows should contain((1, "one"))
@@ -288,7 +291,7 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus
mapToRow(classOf[java.lang.Integer]),
classOf[Integer],
"key")
- .collect().map { case (i, x) ⇒ (i, (x.getKey, x.getValue))}
+ .collect().asScala.map { case (i, x) ⇒ (i, (x.getKey, x.getValue))}
rows should have size 3
rows should contain((1, (1, "one")))
@@ -304,7 +307,7 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus
mapRowTo(classOf[SampleJavaBean]),
mapToRow(classOf[SampleJavaBean]),
classOf[SampleJavaBean])
- .collect().map { case (x, i) ⇒ ((x.getKey, x.getValue), i)}
+ .collect().asScala.map { case (x, i) ⇒ ((x.getKey, x.getValue), i)}
rows should have size 3
rows should contain(((1, "one"), 1))
@@ -319,7 +322,7 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus
mapRowTo(classOf[SampleJavaBean]),
mapToRow(classOf[SampleJavaBean]),
classOf[SampleJavaBean])
- .collect().map { case (x, y) ⇒ ((x.getKey, x.getValue), (y.getKey, y.getValue))}
+ .collect().asScala.map { case (x, y) ⇒ ((x.getKey, x.getValue), (y.getKey, y.getValue))}
rows should have size 3
rows should contain(((1, "one"), (1, "one")))
@@ -368,7 +371,7 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus
.select("key", "name", "addr").collect()
result should have length 1
- val row = result.head
+ val row = result.asScala.head
row.getInt(0) should be(1)
row.getString(1) should be("name")
@@ -385,7 +388,7 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus
.select("key", "value").collect()
result should have length 1
- val row = result.head
+ val row = result.asScala.head
row.getInt(0) should be(1)
val tupleValue = row.getTupleValue(1)
@@ -405,15 +408,16 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus
.select("key", "group", "value")
.spanBy[Int](f, classOf[Int])
.collect()
+ .asScala
.toMap
results should have size 2
results should contain key 10
results should contain key 20
- results(10).size should be(3)
- results(10).map(_.getInt("group")).toSeq should be(Seq(10, 11, 12))
- results(20).size should be(3)
- results(20).map(_.getInt("group")).toSeq should be(Seq(20, 21, 22))
+ results(10).asScala.size should be(3)
+ results(10).asScala.map(_.getInt("group")).toSeq should be(Seq(10, 11, 12))
+ results(20).asScala.size should be(3)
+ results(20).asScala.map(_.getInt("group")).toSeq should be(Seq(20, 21, 22))
}
it should "allow to set limit" in {
@@ -425,13 +429,13 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase with DefaultClus
it should "allow to set ascending ordering" in {
val rdd = javaFunctions(sc).cassandraTable(ks, "wide_rows").where("key=10").withAscOrder
- val result = rdd.collect()
- result(0).getInt("group") should be(10)
+ val result = rdd.collect().asScala
+ result.head.getInt("group") should be(10)
}
it should "allow to set descending ordering" in {
val rdd = javaFunctions(sc).cassandraTable(ks, "wide_rows").where("key=20").withDescOrder
- val result = rdd.collect()
- result(0).getInt("group") should be(22)
+ val result = rdd.collect().asScala
+ result.head.getInt("group") should be(22)
}
}
diff --git a/connector/src/main/java/com/datastax/spark/connector/japi/GenericJavaRowReaderFactory.java b/connector/src/main/java/com/datastax/spark/connector/japi/GenericJavaRowReaderFactory.java
index df2207c61..41e25d620 100644
--- a/connector/src/main/java/com/datastax/spark/connector/japi/GenericJavaRowReaderFactory.java
+++ b/connector/src/main/java/com/datastax/spark/connector/japi/GenericJavaRowReaderFactory.java
@@ -7,8 +7,8 @@
import com.datastax.spark.connector.rdd.reader.RowReader;
import com.datastax.spark.connector.rdd.reader.RowReaderFactory;
import scala.Option;
-import scala.collection.IndexedSeq;
-import scala.collection.Seq;
+import scala.collection.immutable.IndexedSeq;
+import scala.collection.immutable.Seq;
public class GenericJavaRowReaderFactory {
public final static RowReaderFactory instance = new RowReaderFactory() {
diff --git a/connector/src/main/java/com/datastax/spark/connector/japi/PairRDDJavaFunctions.java b/connector/src/main/java/com/datastax/spark/connector/japi/PairRDDJavaFunctions.java
index 95f139f2a..6db23e148 100644
--- a/connector/src/main/java/com/datastax/spark/connector/japi/PairRDDJavaFunctions.java
+++ b/connector/src/main/java/com/datastax/spark/connector/japi/PairRDDJavaFunctions.java
@@ -4,6 +4,7 @@
import com.datastax.spark.connector.util.JavaApiHelper;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.rdd.RDD;
+import scala.Function1;
import scala.Tuple2;
import scala.collection.Seq;
import scala.reflect.ClassTag;
@@ -32,7 +33,7 @@ public JavaPairRDD<K, Collection<V>> spanByKey(ClassTag<K> keyClassTag) {
ClassTag<Tuple2<K, Collection<V>>> tupleClassTag = classTag(Tuple2.class);
ClassTag<Collection<V>> vClassTag = classTag(Collection.class);
RDD<Tuple2<K, Collection<V>>> newRDD = pairRDDFunctions.spanByKey()
- .map(JavaApiHelper.<K, V, Seq<V>>valuesAsJavaCollection(), tupleClassTag);
+ .map(JavaApiHelper.valuesAsJavaCollection(), tupleClassTag);
return new JavaPairRDD<>(newRDD, keyClassTag, vClassTag);
}
diff --git a/connector/src/main/java/com/datastax/spark/connector/japi/RDDJavaFunctions.java b/connector/src/main/java/com/datastax/spark/connector/japi/RDDJavaFunctions.java
index 085a94c5c..1efe341b3 100644
--- a/connector/src/main/java/com/datastax/spark/connector/japi/RDDJavaFunctions.java
+++ b/connector/src/main/java/com/datastax/spark/connector/japi/RDDJavaFunctions.java
@@ -83,7 +83,7 @@ public <K> JavaPairRDD<K, Iterable<T>> spanBy(final Function<T, K> f, ClassTag<K> keyClassTag) {
ClassTag<Tuple2<K, Iterable<T>>> tupleClassTag = classTag(Tuple2.class);
ClassTag<Iterable<T>> iterableClassTag = CassandraJavaUtil.classTag(Iterable.class);
RDD<Tuple2<K, Iterable<T>>> newRDD = rddFunctions.spanBy(toScalaFunction1(f))
- .map(JavaApiHelper.<K, T, Iterable<T>>valuesAsJavaIterable(), tupleClassTag);
+ .map(JavaApiHelper.valuesAsJavaIterable(), tupleClassTag);
return new JavaPairRDD<>(newRDD, keyClassTag, iterableClassTag);
}
diff --git a/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaPairRDD.java b/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaPairRDD.java
index 15960e7f0..c80885405 100644
--- a/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaPairRDD.java
+++ b/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaPairRDD.java
@@ -57,7 +57,7 @@ public CassandraRDD<Tuple2<K, V>> rdd() {
@SuppressWarnings("unchecked")
public CassandraJavaPairRDD select(String... columnNames) {
Seq columnRefs = toScalaSeq(toSelectableColumnRefs(columnNames));
- CassandraRDD<Tuple2<K, V>> newRDD = rdd().select(columnRefs);
+ CassandraRDD<Tuple2<K, V>> newRDD = rdd().select(columnRefs.toSeq());
return wrap(newRDD);
}
@@ -71,7 +71,7 @@ public CassandraJavaPairRDD select(String... columnNames) {
@SuppressWarnings("unchecked")
public CassandraJavaPairRDD select(ColumnRef... selectionColumns) {
Seq columnRefs = JavaApiHelper.toScalaSeq(selectionColumns);
- CassandraRDD<Tuple2<K, V>> newRDD = rdd().select(columnRefs);
+ CassandraRDD<Tuple2<K, V>> newRDD = rdd().select(columnRefs.toSeq());
return wrap(newRDD);
}
diff --git a/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaRDD.java b/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaRDD.java
index 86acfde4a..dd68466e9 100644
--- a/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaRDD.java
+++ b/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaRDD.java
@@ -3,7 +3,7 @@
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
-import scala.collection.Seq;
+import scala.collection.immutable.Seq;
import scala.reflect.ClassTag;
import com.datastax.spark.connector.ColumnRef;
@@ -17,6 +17,7 @@
import static com.datastax.spark.connector.japi.CassandraJavaUtil.toSelectableColumnRefs;
import static com.datastax.spark.connector.util.JavaApiHelper.getClassTag;
+import static com.datastax.spark.connector.util.JavaApiHelper.toScalaImmutableSeq;
import static com.datastax.spark.connector.util.JavaApiHelper.toScalaSeq;
/**
@@ -54,7 +55,7 @@ protected CassandraJavaRDD wrap(CassandraRDD newRDD) {
* was removed by the previous {@code select} call, it is not possible to add it back.
*/
public CassandraJavaRDD select(String... columnNames) {
- Seq columnRefs = toScalaSeq(toSelectableColumnRefs(columnNames));
+ Seq columnRefs = toScalaImmutableSeq(toSelectableColumnRefs(columnNames));
CassandraRDD newRDD = rdd().select(columnRefs);
return wrap(newRDD);
}
@@ -67,7 +68,7 @@ public CassandraJavaRDD select(String... columnNames) {
* was removed by the previous {@code select} call, it is not possible to add it back.
*/
public CassandraJavaRDD select(ColumnRef... columns) {
- Seq columnRefs = JavaApiHelper.toScalaSeq(columns);
+ Seq columnRefs = JavaApiHelper.toScalaImmutableSeq(columns);
CassandraRDD newRDD = rdd().select(columnRefs);
return wrap(newRDD);
}
diff --git a/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraTableScanJavaRDD.java b/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraTableScanJavaRDD.java
index f00d7cd3d..73345bfc0 100644
--- a/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraTableScanJavaRDD.java
+++ b/connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraTableScanJavaRDD.java
@@ -2,7 +2,7 @@
import com.datastax.spark.connector.writer.RowWriterFactory;
import scala.Tuple2;
-import scala.collection.Seq;
+import scala.collection.immutable.Seq;
import scala.reflect.ClassTag;
import org.apache.spark.rdd.RDD;
@@ -94,7 +94,7 @@ public CassandraJavaPairRDD keyBy(
RowWriterFactory rwf,
ColumnRef... columns) {
- Seq columnRefs = JavaApiHelper.toScalaSeq(columns);
+ Seq columnRefs = JavaApiHelper.toScalaImmutableSeq(columns);
CassandraRDD<Tuple2<K, R>> resultRDD =
columns.length == 0
? rdd().keyBy(keyClassTag, rrf, rwf)
diff --git a/connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraSourceUtil.scala b/connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraSourceUtil.scala
index d798c123f..111a8dfca 100644
--- a/connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraSourceUtil.scala
+++ b/connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraSourceUtil.scala
@@ -12,7 +12,9 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{DataType => CatalystType}
import org.apache.spark.sql.types.{BooleanType => SparkSqlBooleanType, DataType => SparkSqlDataType, DateType => SparkSqlDateType, DecimalType => SparkSqlDecimalType, DoubleType => SparkSqlDoubleType, FloatType => SparkSqlFloatType, MapType => SparkSqlMapType, TimestampType => SparkSqlTimestampType, UserDefinedType => SparkSqlUserDefinedType, _}
-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._
+import scala.collection.mutable
+import scala.language.postfixOps
import scala.util.Try
object CassandraSourceUtil extends Logging {
@@ -147,7 +149,7 @@ object CassandraSourceUtil extends Logging {
catalystDataType(dataType, nullable = true),
nullable = true)
}
- StructType(structFields)
+ StructType(structFields.asJava)
}
def fromTuple(t: TupleType): StructType = {
@@ -157,7 +159,7 @@ object CassandraSourceUtil extends Logging {
catalystDataType(dataType, nullable = true),
nullable = true)
}
- StructType(structFields)
+ StructType(structFields.asJava)
}
cassandraType match {
diff --git a/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraCoGroupedRDD.scala b/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraCoGroupedRDD.scala
index dfae0b836..d445f05f7 100644
--- a/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraCoGroupedRDD.scala
+++ b/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraCoGroupedRDD.scala
@@ -12,7 +12,7 @@ import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.cql.{BoundStatement, Row}
import com.datastax.spark.connector.util._
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
import scala.language.existentials
import scala.reflect.ClassTag
import org.apache.spark.annotation.DeveloperApi
@@ -128,6 +128,7 @@ class CassandraCoGroupedRDD[T](
try {
val stmt = session.prepare(cql)
val converters = stmt.getVariableDefinitions
+ .asScala
.map(v => ColumnType.converterToCassandra(v.getType))
.toArray
val convertedValues =
diff --git a/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraMergeJoinRDD.scala b/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraMergeJoinRDD.scala
index 862f3f8bd..1cafd522f 100644
--- a/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraMergeJoinRDD.scala
+++ b/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraMergeJoinRDD.scala
@@ -9,7 +9,7 @@ import java.io.IOException
import com.datastax.bdp.util.ScalaJavaUtil.asScalaFuture
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
import scala.language.existentials
import scala.reflect.ClassTag
import org.apache.spark.annotation.DeveloperApi
@@ -116,6 +116,7 @@ class CassandraMergeJoinRDD[L,R](
try {
val stmt = session.prepare(cql)
val converters = stmt.getVariableDefinitions
+ .asScala
.map(v => ColumnType.converterToCassandra(v.getType))
.toArray
val convertedValues =
diff --git a/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala b/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala
index 3f02f5058..4590ab8c7 100644
--- a/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala
+++ b/connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala
@@ -15,7 +15,7 @@ import org.apache.spark.rdd.{PartitionCoalescer, RDD}
import org.apache.spark.{Partition, Partitioner, SparkContext, TaskContext}
import java.io.IOException
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
import scala.language.existentials
import scala.reflect.ClassTag
@@ -186,7 +186,7 @@ class CassandraTableScanRDD[R] private[connector](
val selectedColumnNames = columns.selectFrom(tableDef).map(_.columnName).toSet
val partitionKeyColumnNames = PartitionKeyColumns.selectFrom(tableDef).map(_.columnName).toSet
- if (selectedColumnNames.containsAll(partitionKeyColumnNames)) {
+ if (selectedColumnNames.asJava.containsAll(partitionKeyColumnNames.asJava)) {
val partitioner = partitionGenerator.partitioner[K](columns)
logDebug(
s"""Made partitioner ${partitioner} for $this""".stripMargin)
diff --git a/connector/src/main/scala/com/datastax/spark/connector/rdd/DseGraphUnionedRDD.scala b/connector/src/main/scala/com/datastax/spark/connector/rdd/DseGraphUnionedRDD.scala
index d40399698..fadfda971 100644
--- a/connector/src/main/scala/com/datastax/spark/connector/rdd/DseGraphUnionedRDD.scala
+++ b/connector/src/main/scala/com/datastax/spark/connector/rdd/DseGraphUnionedRDD.scala
@@ -10,7 +10,7 @@ import java.lang.{String => JString}
import java.util.{Map => JMap}
import scala.annotation.meta.param
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
import scala.language.existentials
import scala.reflect.ClassTag
import org.apache.spark.rdd.{RDD, UnionRDD}
@@ -33,8 +33,8 @@ object DseGraphUnionedRDD {
graphLabels: java.util.List[String])(
implicit
connector: CassandraConnector): DseGraphUnionedRDD[R] = {
- val rddSeq: Seq[RDD[R]] = rdds
- val labelSeq: Seq[String] = graphLabels
+ val rddSeq: Seq[RDD[R]] = rdds.asScala.toSeq
+ val labelSeq: Seq[String] = graphLabels.asScala.toSeq
new DseGraphUnionedRDD(sc, rddSeq, keyspace, labelSeq)
}
}
@@ -188,7 +188,7 @@ class DseGraphPartitioner[V, T <: Token[V]](
*/
override def getPartition(key: Any): Int = key match {
case vertexId: JMap[JString, AnyRef]@unchecked => {
- val label: String = vertexId.getOrElse(
+ val label: String = vertexId.asScala.getOrElse(
LabelAccessor,
throw new IllegalArgumentException(s"Couldn't find $LabelAccessor in key $key"))
.asInstanceOf[String]
@@ -217,7 +217,7 @@ class DseGraphPartitioner[V, T <: Token[V]](
class MapRowWriter(override val columnNames: Seq[String]) extends RowWriter[JMap[JString, AnyRef]] {
override def readColumnValues(data: JMap[JString, AnyRef], buffer: Array[Any]): Unit =
columnNames.zipWithIndex.foreach { case (columnName, index) =>
- buffer(index) = data.getOrElse(columnName,
+ buffer(index) = data.asScala.getOrElse(columnName,
throw new IllegalArgumentException(s"""Couldn't find $columnName in $data, unable to generate token"""))
}
}
diff --git a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/BucketingRangeIndex.scala b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/BucketingRangeIndex.scala
index 45cfa3e0d..8a654acbc 100644
--- a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/BucketingRangeIndex.scala
+++ b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/BucketingRangeIndex.scala
@@ -74,5 +74,5 @@ class BucketingRangeIndex[R, T](ranges: Seq[R])
/** Finds rangesContaining containing given point in O(1) time. */
def rangesContaining(point: T): IndexedSeq[R] =
- table(bucket(point)).filter(bounds.contains(_, point))
+ table(bucket(point)).filter(bounds.contains(_, point)).toIndexedSeq
}
diff --git a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/CassandraPartitionGenerator.scala b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/CassandraPartitionGenerator.scala
index f7fadeb5a..f8b6d1947 100644
--- a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/CassandraPartitionGenerator.scala
+++ b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/CassandraPartitionGenerator.scala
@@ -4,7 +4,7 @@ import com.datastax.oss.driver.api.core.CqlIdentifier
import com.datastax.oss.driver.api.core.metadata.TokenMap
import com.datastax.oss.driver.api.core.metadata.token.{TokenRange => DriverTokenRange}
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
import scala.language.existentials
import scala.reflect.ClassTag
import scala.util.Try
@@ -37,6 +37,7 @@ private[connector] class CassandraPartitionGenerator[V, T <: Token[V]](
val endToken = tokenFactory.tokenFromString(metadata.format(range.getEnd))
val replicas = metadata
.getReplicas(keyspaceName, range)
+ .asScala
.map(node =>
DriverUtil.toAddress(node)
.getOrElse(throw new IllegalStateException(s"Unable to determine Node Broadcast Address of $node")))
@@ -49,8 +50,8 @@ private[connector] class CassandraPartitionGenerator[V, T <: Token[V]](
val ranges = connector.withSessionDo { session =>
val tokenMap = Option(session.getMetadata.getTokenMap.get)
.getOrElse(throw new IllegalStateException("Unable to determine Token Range Metadata"))
- for (tr <- tokenMap.getTokenRanges()) yield tokenRange(tr, tokenMap)
- }
+ for (tr <- tokenMap.getTokenRanges().asScala) yield tokenRange(tr, tokenMap)
+ }.toSeq
/**
* When we have a single Spark Partition use a single global range. This
@@ -59,7 +60,7 @@ private[connector] class CassandraPartitionGenerator[V, T <: Token[V]](
if (splitCount == 1) {
Seq(ranges.head.copy[V, T](tokenFactory.minToken, tokenFactory.minToken))
} else {
- ranges.toSeq
+ ranges
}
}
diff --git a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/DataSizeEstimates.scala b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/DataSizeEstimates.scala
index bd2cd524c..7f5300844 100644
--- a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/DataSizeEstimates.scala
+++ b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/DataSizeEstimates.scala
@@ -1,8 +1,8 @@
package com.datastax.spark.connector.rdd.partitioner
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
import com.datastax.spark.connector.util.Logging
-import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder
+import com.datastax.oss.driver.api.core.cql.{Row, SimpleStatementBuilder}
import com.datastax.oss.driver.api.core.servererrors.InvalidQueryException
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.rdd.partitioner.dht.{Token, TokenFactory}
@@ -41,7 +41,7 @@ class DataSizeEstimates[V, T <: Token[V]](
"FROM system.size_estimates " +
"WHERE keyspace_name = ? AND table_name = ?").addPositionalValues(keyspaceName, tableName).build())
- for (row <- rs.all()) yield TokenRangeSizeEstimate(
+ for (row: Row <- rs.all().asScala) yield TokenRangeSizeEstimate(
rangeStart = tokenFactory.tokenFromString(row.getString("range_start")),
rangeEnd = tokenFactory.tokenFromString(row.getString("range_end")),
partitionsCount = row.getLong("partitions_count"),
@@ -53,7 +53,7 @@ class DataSizeEstimates[V, T <: Token[V]](
// when we insert a few rows and immediately query them. However, for tiny data sets the lack
// of size estimates is not a problem at all, because we don't want to split tiny data anyways.
// Therefore, we're not issuing a warning if the result set was empty.
- }
+ }.toSeq
catch {
case e: InvalidQueryException =>
logError(
@@ -103,7 +103,7 @@ object DataSizeEstimates {
def hasSizeEstimates: Boolean = {
session.execute(
s"SELECT * FROM system.size_estimates " +
- s"WHERE keyspace_name = '$keyspaceName' AND table_name = '$tableName'").all().nonEmpty
+ s"WHERE keyspace_name = '$keyspaceName' AND table_name = '$tableName'").all().asScala.nonEmpty
}
val startTime = System.currentTimeMillis()
diff --git a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/NodeAddresses.scala b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/NodeAddresses.scala
index e068943af..c4d44c7df 100644
--- a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/NodeAddresses.scala
+++ b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/NodeAddresses.scala
@@ -2,7 +2,7 @@ package com.datastax.spark.connector.rdd.partitioner
import java.net.InetAddress
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.util.DriverUtil.{toName, toOption}
@@ -28,7 +28,7 @@ class NodeAddresses(conn: CassandraConnector) extends Serializable {
// TODO: fetch information about the local node from system.local, when CASSANDRA-9436 is done
val rs = session.execute(s"SELECT $nativeTransportAddressColumnName, $listenAddressColumnName FROM $table")
for {
- row <- rs.all()
+ row <- rs.all().asScala
nativeTransportAddress <- Option(row.getInetAddress(nativeTransportAddressColumnName))
listenAddress = row.getInetAddress(listenAddressColumnName)
} yield (nativeTransportAddress, listenAddress)
diff --git a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/ReplicaPartitioner.scala b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/ReplicaPartitioner.scala
index 10eb90955..17da16395 100644
--- a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/ReplicaPartitioner.scala
+++ b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/ReplicaPartitioner.scala
@@ -9,7 +9,7 @@ import com.datastax.spark.connector.util._
import com.datastax.spark.connector.writer.RowWriterFactory
import org.apache.spark.{Partition, Partitioner}
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
@@ -74,6 +74,7 @@ implicit
val tokenHash = Math.abs(token.hashCode())
val replicas = tokenMap
.getReplicas(_keyspace, token)
+ .asScala
.map(n => DriverUtil.toAddress(n).get.getAddress)
val replicaSetInDC = (hostSet & replicas).toVector
diff --git a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/TokenRangeSplitter.scala b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/TokenRangeSplitter.scala
index 280f723f3..6324c3796 100644
--- a/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/TokenRangeSplitter.scala
+++ b/connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/TokenRangeSplitter.scala
@@ -1,11 +1,12 @@
package com.datastax.spark.connector.rdd.partitioner
import scala.collection.parallel.ForkJoinTaskSupport
-import scala.concurrent.forkjoin.ForkJoinPool
-
+import java.util.concurrent.ForkJoinPool
import com.datastax.spark.connector.rdd.partitioner.TokenRangeSplitter.WholeRing
import com.datastax.spark.connector.rdd.partitioner.dht.{Token, TokenRange}
+import scala.collection.parallel.CollectionConverters.IterableIsParallelizable
+
/** Splits a token ranges into smaller sub-ranges,
* each with the desired approximate number of rows. */
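Note (illustrative, not from the patch): on 2.13 the parallel collections live in the separate scala-parallel-collections module, so .par needs both that dependency and the CollectionConverters import added above. A minimal sketch, assuming the module is on the classpath:

    import java.util.concurrent.ForkJoinPool
    import scala.collection.parallel.CollectionConverters._
    import scala.collection.parallel.ForkJoinTaskSupport

    object ParSplitSketch extends App {
      val ranges = (1 to 8).par                      // .par now comes from the converter import
      ranges.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(4))
      println(ranges.map(_ * 10).seq)                // .seq returns to a sequential collection
    }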
diff --git a/connector/src/main/scala/com/datastax/spark/connector/util/JavaApiHelper.scala b/connector/src/main/scala/com/datastax/spark/connector/util/JavaApiHelper.scala
index 6f6725b68..b2cf5f400 100644
--- a/connector/src/main/scala/com/datastax/spark/connector/util/JavaApiHelper.scala
+++ b/connector/src/main/scala/com/datastax/spark/connector/util/JavaApiHelper.scala
@@ -4,7 +4,8 @@ import java.lang.{Iterable => JIterable}
import java.util.{Collection => JCollection}
import java.util.{Map => JMap}
-import scala.collection.JavaConversions._
+import scala.collection.Iterable
+import scala.jdk.CollectionConverters._
import scala.reflect._
import scala.reflect.api.{Mirror, TypeCreator, _}
import scala.reflect.runtime.universe._
@@ -55,11 +56,11 @@ object JavaApiHelper {
def toScalaFunction1[T1, R](f: JFunction[T1, R]): T1 => R = f.call
def valuesAsJavaIterable[K, V, IV <: Iterable[V]]: ((K, IV)) => (K, JIterable[V]) = {
- case (k, iterable) => (k, asJavaIterable(iterable))
+ case (k, iterable) => (k, iterable.asJava)
}
def valuesAsJavaCollection[K, V, IV <: Iterable[V]]: ((K, IV)) => (K, JCollection[V]) = {
- case (k, iterable) => (k, asJavaCollection(iterable))
+ case (k, iterable) => (k, iterable.toList.asJava)
}
/** Returns a runtime class of a given `TypeTag`. */
@@ -71,7 +72,7 @@ object JavaApiHelper {
classTag.runtimeClass.asInstanceOf[Class[T]]
/** Converts a Java `Map` to a Scala immutable `Map`. */
- def toScalaMap[K, V](map: JMap[K, V]): Map[K, V] = Map(map.toSeq: _*)
+ def toScalaMap[K, V](map: JMap[K, V]): Map[K, V] = Map(map.asScala.toSeq: _*)
/** Converts an array to a Scala `Seq`. */
def toScalaSeq[T](array: Array[T]): Seq[T] = array
@@ -80,7 +81,7 @@ object JavaApiHelper {
def toScalaImmutableSeq[T](array: Array[T]): scala.collection.immutable.Seq[T] = array.toIndexedSeq
/** Converts a Java `Iterable` to Scala `Seq`. */
- def toScalaSeq[T](iterable: java.lang.Iterable[T]): Seq[T] = iterable.toSeq
+ def toScalaSeq[T](iterable: java.lang.Iterable[T]): Seq[T] = iterable.asScala.toSeq
/** Returns the default `RowWriterFactory` initialized with the given `ColumnMapper`. */
def defaultRowWriterFactory[T](typeTag: TypeTag[T], mapper: ColumnMapper[T]): RowWriterFactory[T] = {
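Note (illustrative, not from the patch): the explicit asScala/asJava extension methods replace the removed JavaConversions implicits, which is the pattern these helpers now rely on:

    import scala.jdk.CollectionConverters._

    object ConvertersSketch extends App {
      val jList = new java.util.ArrayList[String]()
      jList.add("a"); jList.add("b")

      val scalaSeq: Seq[String] = jList.asScala.toSeq        // Java -> immutable Scala Seq (copy)
      val javaList: java.util.List[String] = scalaSeq.asJava // Scala -> Java wrapper (view)
      println(s"$scalaSeq / $javaList")
    }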
diff --git a/connector/src/main/scala/com/datastax/spark/connector/util/MergeJoinIterator.scala b/connector/src/main/scala/com/datastax/spark/connector/util/MergeJoinIterator.scala
index bdee74312..07d2c88fc 100644
--- a/connector/src/main/scala/com/datastax/spark/connector/util/MergeJoinIterator.scala
+++ b/connector/src/main/scala/com/datastax/spark/connector/util/MergeJoinIterator.scala
@@ -63,6 +63,6 @@ extends Iterator[(K, Seq[L], Seq[R])] {
val bufferRight = new ArrayBuffer[R]
itemsLeft.appendWhile(l => keyExtractLeft(l) == key, bufferLeft)
itemsRight.appendWhile(r => keyExtractRight(r) == key, bufferRight)
- (key, bufferLeft, bufferRight)
+ (key, bufferLeft.toSeq, bufferRight.toSeq)
}
}
diff --git a/connector/src/main/scala/com/datastax/spark/connector/util/MultiMergeJoinIterator.scala b/connector/src/main/scala/com/datastax/spark/connector/util/MultiMergeJoinIterator.scala
index 2690cd274..02debe048 100644
--- a/connector/src/main/scala/com/datastax/spark/connector/util/MultiMergeJoinIterator.scala
+++ b/connector/src/main/scala/com/datastax/spark/connector/util/MultiMergeJoinIterator.scala
@@ -48,7 +48,7 @@ extends Iterator[Seq[Seq[T]]] {
items.map (i => {
var buffer = new ArrayBuffer[T]
i.appendWhile(l => keyExtract(l) == key, buffer)
- buffer
+ buffer.toSeq
})
}
}
diff --git a/connector/src/main/scala/com/datastax/spark/connector/util/SpanningIterator.scala b/connector/src/main/scala/com/datastax/spark/connector/util/SpanningIterator.scala
index 2d32cdc3b..8e5103540 100644
--- a/connector/src/main/scala/com/datastax/spark/connector/util/SpanningIterator.scala
+++ b/connector/src/main/scala/com/datastax/spark/connector/util/SpanningIterator.scala
@@ -26,6 +26,6 @@ class SpanningIterator[K, T](iterator: Iterator[T], f: T => K) extends Iterator[
val key = f(items.head)
val buffer = new ArrayBuffer[T]
items.appendWhile(r => f(r) == key, buffer)
- (key, buffer)
+ (key, buffer.toSeq)
}
}
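Note (illustrative, not from the patch): on 2.13 the default Seq is scala.collection.immutable.Seq, so returning a mutable ArrayBuffer where a Seq is expected no longer compiles; the .toSeq calls above copy the buffer. A small sketch of the same fix:

    import scala.collection.mutable.ArrayBuffer

    object BufferToSeqSketch {
      // Returning `buf` directly fails on 2.13: ArrayBuffer is not an immutable.Seq.
      def firstSquares(n: Int): Seq[Int] = {
        val buf = new ArrayBuffer[Int]
        for (i <- 1 to n) buf += i * i
        buf.toSeq // explicit copy into an immutable Seq
      }
    }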
diff --git a/connector/src/main/scala/com/datastax/spark/connector/writer/Batch.scala b/connector/src/main/scala/com/datastax/spark/connector/writer/Batch.scala
index 9efa2fee1..7f2856a0d 100644
--- a/connector/src/main/scala/com/datastax/spark/connector/writer/Batch.scala
+++ b/connector/src/main/scala/com/datastax/spark/connector/writer/Batch.scala
@@ -15,7 +15,7 @@ private[writer] sealed trait Batch extends Ordered[Batch] {
def add(stmt: RichBoundStatementWrapper, force: Boolean = false): Boolean
/** Collected statements */
- def statements: Seq[RichBoundStatementWrapper] = buf
+ def statements: Seq[RichBoundStatementWrapper] = buf.toSeq
/** Only for internal use - batches are compared by this value. */
protected[Batch] def size: Int
diff --git a/connector/src/main/scala/com/datastax/spark/connector/writer/ObjectSizeEstimator.scala b/connector/src/main/scala/com/datastax/spark/connector/writer/ObjectSizeEstimator.scala
index fa471ffa0..6ffa3ab34 100644
--- a/connector/src/main/scala/com/datastax/spark/connector/writer/ObjectSizeEstimator.scala
+++ b/connector/src/main/scala/com/datastax/spark/connector/writer/ObjectSizeEstimator.scala
@@ -3,7 +3,7 @@ package com.datastax.spark.connector.writer
import java.io.{OutputStream, ObjectOutputStream}
import java.nio.ByteBuffer
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
import com.datastax.spark.connector.util.ByteBufferUtil
@@ -14,11 +14,11 @@ object ObjectSizeEstimator {
private def makeSerializable(obj: Any): AnyRef = {
obj match {
case bb: ByteBuffer => ByteBufferUtil.toArray(bb)
- case list: java.util.List[_] => list.map(makeSerializable)
+ case list: java.util.List[_] => list.asScala.map(makeSerializable)
case list: List[_] => list.map(makeSerializable)
- case set: java.util.Set[_] => set.map(makeSerializable)
+ case set: java.util.Set[_] => set.asScala.map(makeSerializable)
case set: Set[_] => set.map(makeSerializable)
- case map: java.util.Map[_, _] => map.map { case (k, v) => (makeSerializable(k), makeSerializable(v)) }
+ case map: java.util.Map[_, _] => map.asScala.map { case (k, v) => (makeSerializable(k), makeSerializable(v)) }
case map: Map[_, _] => map.map { case (k, v) => (makeSerializable(k), makeSerializable(v)) }
case other => other.asInstanceOf[AnyRef]
}
diff --git a/connector/src/main/scala/com/datastax/spark/connector/writer/ReplicaLocator.scala b/connector/src/main/scala/com/datastax/spark/connector/writer/ReplicaLocator.scala
index eaa6c3d1b..33f704bf3 100644
--- a/connector/src/main/scala/com/datastax/spark/connector/writer/ReplicaLocator.scala
+++ b/connector/src/main/scala/com/datastax/spark/connector/writer/ReplicaLocator.scala
@@ -10,7 +10,7 @@ import com.datastax.spark.connector.util.DriverUtil._
import com.datastax.spark.connector.util.{DriverUtil, Logging, tableFromCassandra}
import com.datastax.spark.connector.util.PatitionKeyTools._
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
import scala.collection._
/**
@@ -43,6 +43,7 @@ class ReplicaLocator[T] private(
data.map { row =>
val hosts = tokenMap
.getReplicas(CqlIdentifier.fromInternal(keyspaceName), QueryUtils.getRoutingKeyOrError(boundStmtBuilder.bind(row).stmt))
+ .asScala
.map(node => DriverUtil.toAddress(node)
.getOrElse(throw new IllegalStateException(s"Unable to determine Node Broadcast Address of $node")))
.map(_.getAddress)
diff --git a/connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala b/connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala
index 5c4c255d5..35f1abb54 100644
--- a/connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala
+++ b/connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala
@@ -212,7 +212,7 @@ class TableWriter[T] private (
1024 * 1024)
(stmt: RichStatement) => rateLimiter.maybeSleep(stmt.bytesCount)
case None =>
- (stmt: RichStatement) => Unit
+ (stmt: RichStatement) => ()
}
AsyncStatementWriter(connector, writeConf, tableDef, stmt, batchBuilder, maybeRateLimit)
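Note: `(stmt: RichStatement) => Unit` evaluates to the `Unit` companion object rather than returning the unit value, which is almost never the intent; `()` expresses the no-op. A tiny standalone illustration:

    // A no-op callback: return the unit value, not the Unit companion object.
    val noOp: String => Unit = _ => ()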
diff --git a/connector/src/main/scala/com/datastax/spark/connector/writer/WriteOption.scala b/connector/src/main/scala/com/datastax/spark/connector/writer/WriteOption.scala
index 83ab25818..6aa07d772 100644
--- a/connector/src/main/scala/com/datastax/spark/connector/writer/WriteOption.scala
+++ b/connector/src/main/scala/com/datastax/spark/connector/writer/WriteOption.scala
@@ -42,7 +42,7 @@ object TTLOption {
def constant(ttl: JodaDuration): TTLOption = constant(ttl.getStandardSeconds.toInt)
- def constant(ttl: ScalaDuration): TTLOption = if (ttl.isFinite()) constant(ttl.toSeconds.toInt) else forever
+ def constant(ttl: ScalaDuration): TTLOption = if (ttl.isFinite) constant(ttl.toSeconds.toInt) else forever
def perRow(placeholder: String): TTLOption = TTLOption(PerRowWriteOptionValue[Int](placeholder))
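Note: `Duration.isFinite` is parameterless in 2.13, so the empty parentheses had to go. A sketch of the same check outside the connector (`ttlSeconds` is a hypothetical helper name):

    import scala.concurrent.duration._

    def ttlSeconds(ttl: Duration): Option[Int] =
      if (ttl.isFinite) Some(ttl.toSeconds.toInt) else None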
diff --git a/connector/src/main/scala/org/apache/spark/metrics/CassandraSink.scala b/connector/src/main/scala/org/apache/spark/metrics/CassandraSink.scala
index 6ba0a177e..71f68a238 100644
--- a/connector/src/main/scala/org/apache/spark/metrics/CassandraSink.scala
+++ b/connector/src/main/scala/org/apache/spark/metrics/CassandraSink.scala
@@ -10,7 +10,7 @@ import java.util.Properties
import java.util.concurrent.{Executors, TimeUnit}
import java.util.function.BiConsumer
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
import com.codahale.metrics.{Counting, Gauge, Metered, Metric, MetricRegistry, Sampling}
import org.apache.spark.metrics.sink.Sink
import org.apache.spark.{SecurityManager, SparkConf, SparkEnv}
@@ -55,7 +55,7 @@ class CassandraSink(val properties: Properties, val registry: MetricRegistry, se
connector.withSessionDo { session =>
val stmt = session.prepare(writer.insertStatement)
- for ((MetricName(appId, componentId, metricId), metric) <- registry.getMetrics.iterator) {
+ for ((MetricName(appId, componentId, metricId), metric) <- registry.getMetrics.asScala.iterator) {
val bndStmt = stmt.bind(writer.build(componentId, metricId, metric): _*)
session.executeAsync(bndStmt).whenComplete(warnOnError)
}
diff --git a/connector/src/main/scala/org/apache/spark/sql/cassandra/BasicCassandraPredicatePushDown.scala b/connector/src/main/scala/org/apache/spark/sql/cassandra/BasicCassandraPredicatePushDown.scala
index f6769890a..690aa34a8 100644
--- a/connector/src/main/scala/org/apache/spark/sql/cassandra/BasicCassandraPredicatePushDown.scala
+++ b/connector/src/main/scala/org/apache/spark/sql/cassandra/BasicCassandraPredicatePushDown.scala
@@ -49,14 +49,14 @@ class BasicCassandraPredicatePushDown[Predicate : PredicateOps](
private val eqPredicatesByName =
eqPredicates
.groupBy(Predicates.columnName)
- .mapValues(_.take(1)) // don't push down more than one EQ predicate for the same column
+ .view.mapValues(_.take(1)).toMap // don't push down more than one EQ predicate for the same column
.withDefaultValue(Set.empty)
private val inPredicates = singleColumnPredicates.filter(Predicates.isInPredicate)
private val inPredicatesByName =
inPredicates
.groupBy(Predicates.columnName)
- .mapValues(_.take(1)) // don't push down more than one IN predicate for the same column
+ .view.mapValues(_.take(1)).toMap // don't push down more than one IN predicate for the same column
.withDefaultValue(Set.empty)
private val rangePredicates = singleColumnPredicates.filter(Predicates.isRangePredicate)
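Note: `Map.mapValues` is deprecated in 2.13 and its replacement `.view.mapValues(f)` returns a lazy `MapView`, so the patch adds `.toMap` to materialize the result before `.withDefaultValue`. A standalone sketch of the strict form:

    val eqByName: Map[String, Set[(String, Int)]] =
      Set(("a", 1), ("a", 2), ("b", 3)).groupBy(_._1)

    val onePerColumn: Map[String, Set[(String, Int)]] =
      eqByName
        .view
        .mapValues(_.take(1)) // lazy MapView in 2.13
        .toMap                // materialize before withDefaultValue
        .withDefaultValue(Set.empty[(String, Int)])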
diff --git a/connector/src/main/scala/org/apache/spark/sql/cassandra/DsePredicateRules.scala b/connector/src/main/scala/org/apache/spark/sql/cassandra/DsePredicateRules.scala
index 5710c3057..5a0b48e95 100644
--- a/connector/src/main/scala/org/apache/spark/sql/cassandra/DsePredicateRules.scala
+++ b/connector/src/main/scala/org/apache/spark/sql/cassandra/DsePredicateRules.scala
@@ -12,6 +12,8 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.cassandra.PredicateOps.FilterOps
import org.apache.spark.sql.sources.{EqualTo, Filter, IsNotNull}
+import scala.collection.mutable
+
/**
* A series of pushdown rules that only apply when connecting to Datastax Enterprise
@@ -76,7 +78,7 @@ object DsePredicateRules extends CassandraPredicateRules with Logging {
val indexColumns = saiIndexes(table).map(_.targetColumn)
val pushedEqualityPredicates = predicates.handledByCassandra.collect {
case f if FilterOps.isEqualToPredicate(f) => FilterOps.columnName(f)
- }.to[collection.mutable.Set]
+ }.to(mutable.Set)
val (handledByCassandra, handledBySpark) = predicates.handledBySpark.partition { filter =>
lazy val columnName = FilterOps.columnName(filter)
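Note: the `.to[C]` conversion taking a type parameter was replaced in 2.13 by `.to(factory)` taking the companion object as a value. Standalone sketch:

    import scala.collection.mutable

    val pushedColumns: mutable.Set[String] =
      List("col_a", "col_b", "col_a").to(mutable.Set)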
diff --git a/connector/src/main/scala/org/apache/spark/sql/cassandra/execution/CassandraDirectJoinStrategy.scala b/connector/src/main/scala/org/apache/spark/sql/cassandra/execution/CassandraDirectJoinStrategy.scala
index decad71fe..84054954b 100644
--- a/connector/src/main/scala/org/apache/spark/sql/cassandra/execution/CassandraDirectJoinStrategy.scala
+++ b/connector/src/main/scala/org/apache/spark/sql/cassandra/execution/CassandraDirectJoinStrategy.scala
@@ -103,7 +103,7 @@ case class CassandraDirectJoinStrategy(spark: SparkSession) extends Strategy wit
.get(DirectJoinSizeRatioParam.name, DirectJoinSizeRatioParam.default.toString))
val cassandraSize = BigDecimal(cassandraPlan.stats.sizeInBytes)
- val keySize = BigDecimal(keyPlan.stats.sizeInBytes.doubleValue())
+ val keySize = BigDecimal(keyPlan.stats.sizeInBytes.doubleValue)
logDebug(s"Checking if size ratio is good: $cassandraSize * $ratio > $keySize")
diff --git a/connector/src/test/scala/com/datastax/spark/connector/embedded/SparkRepl.scala b/connector/src/test/scala/com/datastax/spark/connector/embedded/SparkRepl.scala
index e3d255028..2f81e57f5 100644
--- a/connector/src/test/scala/com/datastax/spark/connector/embedded/SparkRepl.scala
+++ b/connector/src/test/scala/com/datastax/spark/connector/embedded/SparkRepl.scala
@@ -27,12 +27,12 @@ object SparkRepl {
}
Main.conf.setAll(conf.getAll)
- val interp = new SparkILoop(Some(in), new PrintWriter(out))
+ val interp = new SparkILoop(in, new PrintWriter(out))
Main.interp = interp
val separator = System.getProperty("path.separator")
val settings = new GenericRunnerSettings(s => throw new RuntimeException(s"Scala options error: $s"))
settings.processArguments(List("-classpath", paths.mkString(separator)), processAll = true)
- interp.process(settings) // Repl starts and goes in loop of R.E.P.L
+ interp.run(settings) // Repl starts and goes in loop of R.E.P.L
Main.interp = null
Option(Main.sparkContext).foreach(_.stop())
System.clearProperty("spark.driver.port")
diff --git a/connector/src/test/scala/org/apache/spark/metrics/InputMetricsUpdaterSpec.scala b/connector/src/test/scala/org/apache/spark/metrics/InputMetricsUpdaterSpec.scala
index 6667781ee..8632c7c78 100644
--- a/connector/src/test/scala/org/apache/spark/metrics/InputMetricsUpdaterSpec.scala
+++ b/connector/src/test/scala/org/apache/spark/metrics/InputMetricsUpdaterSpec.scala
@@ -1,6 +1,7 @@
package org.apache.spark.metrics
import scala.collection.parallel.ForkJoinTaskSupport
+import scala.collection.parallel.CollectionConverters._
import scala.concurrent.duration._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.metrics.source.Source
@@ -56,7 +57,7 @@ class InputMetricsUpdaterSpec extends FlatSpec with Matchers with MockitoSugar {
val row = new RowMock(Some(1), Some(2), Some(3), None, Some(4))
val range = (1 to 1000).par
- range.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(10))
+ range.tasksupport = new ForkJoinTaskSupport(new java.util.concurrent.ForkJoinPool(10))
for (i <- range) updater.updateMetrics(row)
updater.finish()
tc.taskMetrics().inputMetrics.bytesRead shouldBe 10000L
@@ -112,7 +113,7 @@ class InputMetricsUpdaterSpec extends FlatSpec with Matchers with MockitoSugar {
ccs.readByteMeter.getCount shouldBe 0
val range = (1 to 1000).par
- range.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(10))
+ range.tasksupport = new ForkJoinTaskSupport(new java.util.concurrent.ForkJoinPool(10))
for (i <- range) updater.updateMetrics(row)
updater.finish()
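Note: parallel collections moved out of the 2.13 standard library into the separate scala-parallel-collections module, so `.par` needs `scala.collection.parallel.CollectionConverters._`, and the old `scala.concurrent.forkjoin.ForkJoinPool` alias is gone in favour of `java.util.concurrent.ForkJoinPool`. A sketch, assuming the scala-parallel-collections dependency is on the classpath:

    import scala.collection.parallel.CollectionConverters._ // enables .par on 2.13
    import scala.collection.parallel.ForkJoinTaskSupport
    import java.util.concurrent.ForkJoinPool

    val range = (1 to 1000).par
    range.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(10))
    val doubledSum = range.map(_ * 2).sum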
diff --git a/driver/src/main/scala/com/datastax/spark/connector/GettableData.scala b/driver/src/main/scala/com/datastax/spark/connector/GettableData.scala
index 935c528c1..5c21f9c65 100644
--- a/driver/src/main/scala/com/datastax/spark/connector/GettableData.scala
+++ b/driver/src/main/scala/com/datastax/spark/connector/GettableData.scala
@@ -1,8 +1,7 @@
package com.datastax.spark.connector
import java.nio.ByteBuffer
-
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
import com.datastax.oss.driver.api.core.cql.Row
import com.datastax.oss.driver.api.core.`type`.codec.TypeCodec
@@ -74,9 +73,9 @@ object GettableData {
private[connector] def convert(obj: Any): AnyRef = {
obj match {
case bb: ByteBuffer => ByteBufferUtil.toArray(bb)
- case list: java.util.List[_] => list.view.map(convert).toList
- case set: java.util.Set[_] => set.view.map(convert).toSet
- case map: java.util.Map[_, _] => map.view.map { case (k, v) => (convert(k), convert(v))}.toMap
+ case list: java.util.List[_] => list.asScala.map(convert).toList
+ case set: java.util.Set[_] => set.asScala.map(convert).toSet
+ case map: java.util.Map[_, _] => map.asScala.map { case (k, v) => (convert(k), convert(v))}.toMap
case udtValue: DriverUDTValue => UDTValue.fromJavaDriverUDTValue(udtValue)
case tupleValue: DriverTupleValue => TupleValue.fromJavaDriverTupleValue(tupleValue)
case other => other.asInstanceOf[AnyRef]
diff --git a/driver/src/main/scala/com/datastax/spark/connector/UDTValue.scala b/driver/src/main/scala/com/datastax/spark/connector/UDTValue.scala
index b4ac8bba2..96d303512 100644
--- a/driver/src/main/scala/com/datastax/spark/connector/UDTValue.scala
+++ b/driver/src/main/scala/com/datastax/spark/connector/UDTValue.scala
@@ -4,7 +4,7 @@ import com.datastax.oss.driver.api.core.data.{UdtValue => DriverUDTValue}
import com.datastax.spark.connector.types.NullableTypeConverter
import com.datastax.spark.connector.util.DriverUtil.toName
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
import scala.reflect.runtime.universe._
final case class UDTValue(metaData: CassandraRowMetadata, columnValues: IndexedSeq[AnyRef])
@@ -25,7 +25,7 @@ final case class UDTValue(metaData: CassandraRowMetadata, columnValues: IndexedS
object UDTValue {
def fromJavaDriverUDTValue(value: DriverUDTValue): UDTValue = {
- val fields = value.getType.getFieldNames.map(f => toName(f)).toIndexedSeq
+ val fields = value.getType.getFieldNames.asScala.map(f => toName(f)).toIndexedSeq
val values = fields.map(GettableData.get(value, _))
UDTValue(fields, values)
}
diff --git a/driver/src/main/scala/com/datastax/spark/connector/cql/Schema.scala b/driver/src/main/scala/com/datastax/spark/connector/cql/Schema.scala
index 82e36d70a..3cb5e987f 100644
--- a/driver/src/main/scala/com/datastax/spark/connector/cql/Schema.scala
+++ b/driver/src/main/scala/com/datastax/spark/connector/cql/Schema.scala
@@ -308,7 +308,7 @@ object Schema extends Logging {
private def fetchPartitionKey(table: RelationMetadata): Seq[ColumnDef] = {
for (column <- table.getPartitionKey.asScala) yield ColumnDef(column, PartitionKeyColumn)
- }
+ }.toSeq
private def fetchClusteringColumns(table: RelationMetadata): Seq[ColumnDef] = {
for ((column, index) <- table.getClusteringColumns.asScala.toSeq.zipWithIndex) yield {
diff --git a/driver/src/main/scala/com/datastax/spark/connector/mapper/GettableDataToMappedTypeConverter.scala b/driver/src/main/scala/com/datastax/spark/connector/mapper/GettableDataToMappedTypeConverter.scala
index 33f42c801..30bfb11a4 100644
--- a/driver/src/main/scala/com/datastax/spark/connector/mapper/GettableDataToMappedTypeConverter.scala
+++ b/driver/src/main/scala/com/datastax/spark/connector/mapper/GettableDataToMappedTypeConverter.scala
@@ -175,7 +175,7 @@ class GettableDataToMappedTypeConverter[T : TypeTag : ColumnMapper](
for ((s, _) <- columnMap.setters)
yield (s, ReflectionUtil.methodParamTypes(targetType, s).head)
val setterColumnTypes: Map[String, ColumnType[_]] =
- columnMap.setters.mapValues(columnType)
+ columnMap.setters.view.mapValues(columnType).toMap
for (setterName <- setterParamTypes.keys) yield {
val ct = setterColumnTypes(setterName)
val pt = setterParamTypes(setterName)
diff --git a/driver/src/main/scala/com/datastax/spark/connector/types/CanBuildFrom.scala b/driver/src/main/scala/com/datastax/spark/connector/types/CanBuildFrom.scala
index a738c830a..98ff49b5f 100644
--- a/driver/src/main/scala/com/datastax/spark/connector/types/CanBuildFrom.scala
+++ b/driver/src/main/scala/com/datastax/spark/connector/types/CanBuildFrom.scala
@@ -55,7 +55,7 @@ object CanBuildFrom {
implicit def javaArrayListCanBuildFrom[T] = new CanBuildFrom[T, java.util.ArrayList[T]] {
override def apply() = new scala.collection.mutable.Builder[T, java.util.ArrayList[T]]() {
val list = new java.util.ArrayList[T]()
- override def +=(elem: T) = { list.add(elem); this }
+ override def addOne(elem: T) = { list.add(elem); this }
override def result() = list
override def clear() = list.clear()
}
@@ -67,7 +67,7 @@ object CanBuildFrom {
implicit def javaHashSetCanBuildFrom[T] = new CanBuildFrom[T, java.util.HashSet[T]] {
override def apply() = new scala.collection.mutable.Builder[T, java.util.HashSet[T]]() {
val set = new java.util.HashSet[T]()
- override def +=(elem: T) = { set.add(elem); this }
+ override def addOne(elem: T) = { set.add(elem); this }
override def result() = set
override def clear() = set.clear()
}
@@ -79,7 +79,7 @@ object CanBuildFrom {
implicit def javaHashMapCanBuildFrom[K, V] = new CanBuildFrom[(K, V), java.util.HashMap[K, V]] {
override def apply() = new scala.collection.mutable.Builder[(K, V), java.util.HashMap[K, V]]() {
val map = new java.util.HashMap[K, V]()
- override def +=(elem: (K, V)) = { map.put(elem._1, elem._2); this }
+ override def addOne(elem: (K, V)) = { map.put(elem._1, elem._2); this }
override def result() = map
override def clear() = map.clear()
}
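Note: in the 2.13 collections, `mutable.Builder`'s abstract element method is `addOne`; `+=` is now a final alias inherited from `Growable`, so custom builders override `addOne` instead. A standalone sketch of the pattern used above:

    import scala.collection.mutable

    def javaListBuilder[T]: mutable.Builder[T, java.util.ArrayList[T]] =
      new mutable.Builder[T, java.util.ArrayList[T]] {
        private val list = new java.util.ArrayList[T]()
        override def addOne(elem: T): this.type = { list.add(elem); this } // was `+=` on 2.12
        override def result(): java.util.ArrayList[T] = list
        override def clear(): Unit = list.clear()
      }

    // `+=` still works at call sites, delegating to addOne:
    val b = javaListBuilder[Int]; b += 1; b += 2; val jl = b.result()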
diff --git a/driver/src/main/scala/com/datastax/spark/connector/types/ColumnType.scala b/driver/src/main/scala/com/datastax/spark/connector/types/ColumnType.scala
index 203e9a60e..e051bfa5b 100644
--- a/driver/src/main/scala/com/datastax/spark/connector/types/ColumnType.scala
+++ b/driver/src/main/scala/com/datastax/spark/connector/types/ColumnType.scala
@@ -11,7 +11,7 @@ import com.datastax.oss.driver.api.core.`type`.{DataType, DataTypes => DriverDat
import com.datastax.spark.connector.util._
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
import scala.reflect.runtime.universe._
/** Serializable representation of column data type. */
diff --git a/driver/src/main/scala/com/datastax/spark/connector/types/TupleType.scala b/driver/src/main/scala/com/datastax/spark/connector/types/TupleType.scala
index 0a4a0e9f1..234bde04e 100644
--- a/driver/src/main/scala/com/datastax/spark/connector/types/TupleType.scala
+++ b/driver/src/main/scala/com/datastax/spark/connector/types/TupleType.scala
@@ -12,7 +12,7 @@ import com.datastax.spark.connector.types.TypeAdapters.ValuesSeqAdapter
import com.datastax.spark.connector.{ColumnName, TupleValue}
import org.apache.commons.lang3.tuple.{Pair, Triple}
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
import scala.reflect.runtime.universe._
case class TupleFieldDef(index: Int, columnType: ColumnType[_]) extends FieldDef {
@@ -107,7 +107,7 @@ object TupleType {
extends TypeConverter[DriverTupleValue] {
val fieldTypes = dataType.getComponentTypes
- val fieldConverters = fieldTypes.map(ColumnType.converterToCassandra)
+ val fieldConverters = fieldTypes.asScala.map(ColumnType.converterToCassandra)
override def targetTypeTag = typeTag[DriverTupleValue]
@@ -120,7 +120,7 @@ object TupleType {
if (fieldValue == null) {
toSave.setToNull(i)
} else {
- toSave.set(i, fieldValue, CodecRegistry.DEFAULT.codecFor(fieldTypes(i), fieldValue))
+ toSave.set(i, fieldValue, CodecRegistry.DEFAULT.codecFor(fieldTypes.get(i), fieldValue))
}
}
toSave
@@ -143,9 +143,9 @@ object TupleType {
}
private def fields(dataType: DriverTupleType): IndexedSeq[TupleFieldDef] = unlazify {
- for ((field, index) <- dataType.getComponentTypes.toIndexedSeq.zipWithIndex) yield
+ for ((field, index) <- dataType.getComponentTypes.asScala.zipWithIndex.toIndexedSeq) yield
TupleFieldDef(index, fromDriverType(field))
- }
+ }.toIndexedSeq
def apply(javaTupleType: DriverTupleType): TupleType = {
TupleType(fields(javaTupleType): _*)
diff --git a/driver/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala b/driver/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala
index 081020d62..270b09768 100644
--- a/driver/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala
+++ b/driver/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala
@@ -16,7 +16,7 @@ import com.datastax.spark.connector.util.ByteBufferUtil
import com.datastax.spark.connector.util.Symbols._
import org.apache.commons.lang3.tuple
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
import scala.collection.immutable.{TreeMap, TreeSet}
import scala.reflect.runtime.universe._
@@ -336,7 +336,7 @@ object TypeConverter {
c
}
def targetTypeTag = GregorianCalendarTypeTag
- def convertPF = DateConverter.convertPF.andThen(calendar)
+ def convertPF = DateConverter.convertPF.andThen(calendar _)
}
private val TimestampTypeTag = implicitly[TypeTag[Timestamp]]
@@ -697,9 +697,9 @@ object TypeConverter {
def convertPF = {
case null => bf.apply().result()
- case x: java.util.List[_] => newCollection(x)
- case x: java.util.Set[_] => newCollection(x)
- case x: java.util.Map[_, _] => newCollection(x)
+ case x: java.util.List[_] => newCollection(x.asScala)
+ case x: java.util.Set[_] => newCollection(x.asScala)
+ case x: java.util.Map[_, _] => newCollection(x.asScala)
case x: Iterable[_] => newCollection(x)
}
}
diff --git a/driver/src/main/scala/com/datastax/spark/connector/types/UserDefinedType.scala b/driver/src/main/scala/com/datastax/spark/connector/types/UserDefinedType.scala
index ff3fc865d..1e4272aba 100644
--- a/driver/src/main/scala/com/datastax/spark/connector/types/UserDefinedType.scala
+++ b/driver/src/main/scala/com/datastax/spark/connector/types/UserDefinedType.scala
@@ -10,7 +10,7 @@ import com.datastax.spark.connector.types.ColumnType.fromDriverType
import com.datastax.spark.connector.types.TypeAdapters.ValueByNameAdapter
import com.datastax.spark.connector.{CassandraRowMetadata, ColumnName, UDTValue}
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
import scala.reflect.runtime.universe._
/** A Cassandra user defined type field metadata. It consists of a name and an associated column type.
@@ -81,8 +81,8 @@ object UserDefinedType {
class DriverUDTValueConverter(dataType: DriverUserDefinedType)
extends TypeConverter[DriverUDTValue] {
- val fieldNames = dataType.getFieldNames.toIndexedSeq
- val fieldTypes = dataType.getFieldTypes.toIndexedSeq
+ val fieldNames = dataType.getFieldNames.asScala.toIndexedSeq
+ val fieldTypes = dataType.getFieldTypes.asScala.toIndexedSeq
val fieldConverters = fieldTypes.map(ColumnType.converterToCassandra)
override def targetTypeTag = implicitly[TypeTag[DriverUDTValue]]
@@ -113,7 +113,8 @@ object UserDefinedType {
}
private def fields(dataType: DriverUserDefinedType): IndexedSeq[UDTFieldDef] = unlazify {
- for ((fieldName, fieldType) <- dataType.getFieldNames.zip(dataType.getFieldTypes).toIndexedSeq) yield
+ for ((fieldName, fieldType) <-
+ dataType.getFieldNames.asScala.zip(dataType.getFieldTypes.asScala).toIndexedSeq) yield
UDTFieldDef(fieldName.asInternal(), fromDriverType(fieldType))
}
diff --git a/driver/src/main/scala/com/datastax/spark/connector/util/NameTools.scala b/driver/src/main/scala/com/datastax/spark/connector/util/NameTools.scala
index 2a2c022c8..99d1bf511 100644
--- a/driver/src/main/scala/com/datastax/spark/connector/util/NameTools.scala
+++ b/driver/src/main/scala/com/datastax/spark/connector/util/NameTools.scala
@@ -1,12 +1,11 @@
package com.datastax.spark.connector.util
import java.util.Locale
-
import com.datastax.oss.driver.api.core.metadata.Metadata
import com.datastax.spark.connector.util.DriverUtil.toName
import org.apache.commons.lang3.StringUtils
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
object NameTools {
@@ -45,11 +44,12 @@ object NameTools {
val keyspaceScores = clusterMetadata
.getKeyspaces
.values()
- .toSeq
+ .asScala
.map(ks =>
(toName(ks.getName), StringUtils.getJaroWinklerDistance(toName(ks.getName).toLowerCase(Locale.ROOT), keyspace.toLowerCase(Locale.ROOT))))
val keyspaceSuggestions = keyspaceScores.filter( _._2 > MinJWScore)
+ .toSeq
.sorted
.map(_._1)
@@ -75,11 +75,11 @@ object NameTools {
val keyspaceScores = clusterMetadata
.getKeyspaces
.values()
- .toSeq
+ .asScala
.map(ks =>
(ks, StringUtils.getJaroWinklerDistance(toName(ks.getName).toLowerCase(Locale.ROOT), keyspace.toLowerCase(Locale.ROOT))))
- val ktScores = for ((ks, ksScore) <- keyspaceScores; (_, t) <- (ks.getTables ++ ks.getViews)) yield {
+ val ktScores = for ((ks, ksScore) <- keyspaceScores; (_, t) <- (ks.getTables.asScala ++ ks.getViews.asScala)) yield {
val tScore = StringUtils.getJaroWinklerDistance(toName(t.getName).toLowerCase(Locale.ROOT), table.toLowerCase(Locale.ROOT))
(toName(ks.getName), toName(t.getName), ksScore, tScore)
}
@@ -100,10 +100,10 @@ object NameTools {
val suggestedTablesUnknownKeyspace = ktScores
.collect { case (ks, t, ksScore, tScore) if tScore > MinJWScore => (ks, t)}
- if (suggestedTables.nonEmpty) Some(TableSuggestions(suggestedTables))
- else if (suggestedKeyspaces.nonEmpty) Some(KeyspaceSuggestions(suggestedKeyspaces))
- else if (suggestedKeyspaceAndTables.nonEmpty) Some(KeyspaceAndTableSuggestions(suggestedKeyspaceAndTables))
- else if (suggestedTablesUnknownKeyspace.nonEmpty) Some(KeyspaceAndTableSuggestions(suggestedTablesUnknownKeyspace))
+ if (suggestedTables.nonEmpty) Some(TableSuggestions(suggestedTables.toSeq))
+ else if (suggestedKeyspaces.nonEmpty) Some(KeyspaceSuggestions(suggestedKeyspaces.toSeq))
+ else if (suggestedKeyspaceAndTables.nonEmpty) Some(KeyspaceAndTableSuggestions(suggestedKeyspaceAndTables.toSeq))
+ else if (suggestedTablesUnknownKeyspace.nonEmpty) Some(KeyspaceAndTableSuggestions(suggestedTablesUnknownKeyspace.toSeq))
else None
}
diff --git a/driver/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala b/driver/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala
index 03f87b8b6..2a5515f1a 100644
--- a/driver/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala
+++ b/driver/src/test/scala/com/datastax/spark/connector/types/TypeConverterTest.scala
@@ -271,7 +271,7 @@ class TypeConverterTest {
buf.put(array)
buf.rewind()
assertSame(array, c.convert(array))
- assertEquals(array.deep, c.convert(buf).deep)
+ assertEquals(array.toList, c.convert(buf).toList)
}
@Test
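Note: `Array.deep` was removed in 2.13; element-wise comparisons in tests are usually rewritten with `toList`/`toSeq` or `sameElements`. Standalone sketch in the same JUnit style:

    import org.junit.Assert.assertEquals

    val expected = Array[Byte](1, 2, 3)
    val actual   = Array[Byte](1, 2, 3)

    assertEquals(expected.toList, actual.toList) // `.deep` no longer exists
    assert(expected.sameElements(actual))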
diff --git a/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmBridge.scala b/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmBridge.scala
index 21b967451..7312c6600 100644
--- a/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmBridge.scala
+++ b/test-support/src/main/scala/com/datastax/spark/connector/ccm/CcmBridge.scala
@@ -104,7 +104,7 @@ object CcmBridge {
logger.error(
"Non-zero exit code ({}) returned from executing ccm command: {}", retValue, cli)
}
- outStream.lines
+ outStream.lines.toSeq
} catch {
case _: IOException if watchDog.killedProcess() =>
throw new RuntimeException(s"The command $cli was killed after 10 minutes")