SPARKC-686 Port to Scala 2.13
Major changes:
* Migrate from scala.collection.JavaConversions to
  scala.jdk.CollectionConverters
* Migrate some uses of Seq to immutable.Seq
Marquis Wong authored and committed on Jan 5, 2023
1 parent 9eb9e81 commit 4049f2e
Showing 51 changed files with 218 additions and 196 deletions.
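
The core change is mechanical but worth spelling out. scala.collection.JavaConversions supplied implicit Java-to-Scala (and back) conversions and was removed in Scala 2.13; scala.jdk.CollectionConverters replaces it with explicit .asScala / .asJava calls, which is why the test diffs below add .asScala to results returned by the Java driver. Likewise, on 2.13 scala.Seq is an alias for scala.collection.immutable.Seq, hence the move of some signatures to immutable.Seq. A minimal sketch of the pattern (the names and helper methods here are illustrative, not taken from this commit):

import java.util.{List => JList}
import scala.jdk.CollectionConverters._ // 2.13 replacement for scala.collection.JavaConversions

// Before, with JavaConversions in scope, a java.util.List could be used as a
// Scala collection implicitly. Now the conversion has to be written out.
def firstValue(rows: JList[String]): Option[String] =
  rows.asScala.headOption

// Going the other way, a Scala Seq handed to a Java API needs .asJava.
def toJavaList(values: Seq[String]): JList[String] =
  values.asJava
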
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
@@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
scala: [2.12.10]
scala: [2.12.11, 2.13.8]
db-version: [6.8.13, 5.1.24, 3.11.10, 4.0-rc2]

steps:
6 changes: 4 additions & 2 deletions build.sbt
@@ -4,10 +4,11 @@ import sbt.{Compile, moduleFilter, _}
import sbtassembly.AssemblyPlugin.autoImport.assembly

lazy val scala212 = "2.12.11"
lazy val supportedScalaVersions = List(scala212)
lazy val scala213 = "2.13.8"
lazy val supportedScalaVersions = List(scala212, scala213)

// factor out common settings
ThisBuild / scalaVersion := scala212
ThisBuild / scalaVersion := scala213
ThisBuild / scalacOptions ++= Seq("-target:jvm-1.8")

// Publishing Info
@@ -70,6 +71,7 @@ val annotationProcessor = Seq(
def scalacVersionDependantOptions(scalaBinary: String): Seq[String] = scalaBinary match {
case "2.11" => Seq()
case "2.12" => Seq("-no-java-comments") //Scala Bug on inner classes, CassandraJavaUtil,
case "2.13" => Seq("-no-java-comments") //Scala Bug on inner classes, CassandraJavaUtil,
}

lazy val root = (project in file("."))
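
For context, supportedScalaVersions is only useful if it feeds sbt's cross-building; that wiring is not visible in this hunk, so the following is an assumed sketch of how such a build.sbt is typically arranged (setting crossScalaVersions on the root project is the assumption here):

lazy val scala212 = "2.12.11"
lazy val scala213 = "2.13.8"
lazy val supportedScalaVersions = List(scala212, scala213)

ThisBuild / scalaVersion := scala213 // default version for a plain `sbt compile`

lazy val root = (project in file("."))
  .settings(
    crossScalaVersions := supportedScalaVersions // `sbt +test` runs against both 2.12 and 2.13
  )
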
@@ -3,7 +3,7 @@ package com.datastax.spark.connector
import com.datastax.spark.connector.ccm.CcmBridge
import com.datastax.spark.connector.cluster.DefaultCluster

import scala.collection.JavaConversions._
import scala.jdk.CollectionConverters._
import scala.concurrent.Future
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector.cql.CassandraConnector
@@ -117,9 +117,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu

val rows = results.all()
assert(rows.size() == 3)
assert(rows.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
}


@@ -140,9 +140,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu

val rows = results.all()
assert(rows.size() == 3)
assert(rows.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
}

it should "allow to save beans with transient fields to Cassandra" in {
@@ -162,9 +162,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu

val rows = results.all()
assert(rows.size() == 3)
assert(rows.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
}

it should "allow to save beans with inherited fields to Cassandra" in {
@@ -184,7 +184,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
val rows = results.all()

rows should have size 3
rows.map(row => (row.getString("value"), row.getInt("key"), row.getString("sub_class_field"))).toSet shouldBe Set(
rows.asScala.map(row => (row.getString("value"), row.getInt("key"), row.getString("sub_class_field"))).toSet shouldBe Set(
("one", 1, "a"),
("two", 2, "b"),
("three", 3, "c")
@@ -210,9 +210,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu

val rows = results.all()
assert(rows.size() == 3)
assert(rows.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
}

it should "allow to read rows as Tuple1" in {
Expand All @@ -222,7 +222,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple1(
1: Integer
)
@@ -237,7 +237,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple2(
1: Integer,
"2"
@@ -254,7 +254,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple3(
1: Integer,
"2",
@@ -273,7 +273,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple4(
1: Integer,
"2",
@@ -294,7 +294,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple5(
1: Integer,
"2",
@@ -317,7 +317,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple6(
1: Integer,
"2",
@@ -342,7 +342,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple7(
1: Integer,
"2",
@@ -369,7 +369,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple8(
1: Integer,
"2",
@@ -398,7 +398,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple9(
1: Integer,
"2",
@@ -429,7 +429,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple10(
1: Integer,
"2",
@@ -462,7 +462,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple11(
1: Integer,
"2",
@@ -497,7 +497,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple12(
1: Integer,
"2",
@@ -534,7 +534,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple13(
1: Integer,
"2",
@@ -573,7 +573,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple14(
1: Integer,
"2",
@@ -614,7 +614,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple15(
1: Integer,
"2",
@@ -657,7 +657,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple16(
1: Integer,
"2",
@@ -702,7 +702,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple17(
1: Integer,
"2",
@@ -749,7 +749,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple18(
1: Integer,
"2",
@@ -798,7 +798,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple19(
1: Integer,
"2",
@@ -849,7 +849,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19", "c20"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple20(
1: Integer,
"2",
@@ -902,7 +902,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19", "c20", "c21"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple21(
1: Integer,
"2",
@@ -957,7 +957,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19", "c20", "c21", "c22"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple22(
1: Integer,
"2",
@@ -200,8 +200,8 @@ trait SparkCassandraITSpecBase
Await.result(Future.sequence(units), Duration.Inf)
}

def awaitAll[T](units: TraversableOnce[Future[T]]): TraversableOnce[T] = {
Await.result(Future.sequence(units), Duration.Inf)
def awaitAll[T](units: IterableOnce[Future[T]]): IterableOnce[T] = {
Await.result(Future.sequence(units.iterator.toList), Duration.Inf)
}

def keyspaceCql(name: String = ks) =
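
The awaitAll change above is a 2.13 source-compatibility fix: TraversableOnce no longer exists, and Future.sequence cannot build a result collection directly from a bare IterableOnce, so the iterator is materialized into a List before sequencing. A hedged, self-contained version of the same idea:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Accepts any IterableOnce (Iterator, List, Vector, ...) on Scala 2.13.
def awaitAll[T](units: IterableOnce[Future[T]]): IterableOnce[T] =
  Await.result(Future.sequence(units.iterator.toList), Duration.Inf)

// Usage: also works with a one-shot Iterator, which TraversableOnce used to cover.
val results = awaitAll(Iterator(Future(1), Future(2), Future(3)))
println(results.iterator.toList) // List(1, 2, 3)
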
@@ -66,7 +66,7 @@ class CassandraAuthenticatedConnectorSpec extends SparkCassandraITFlatSpecBase w
"spark.cassandra.auth.password" -> "cassandra",
"keyspace" -> ks, "table" -> "authtest")

personDF1.write.format("org.apache.spark.sql.cassandra").options(options).mode("append")save()
personDF1.write.format("org.apache.spark.sql.cassandra").options(options).mode("append").save()
val personDF2 = spark.read.format("org.apache.spark.sql.cassandra").options(options).load()

personDF2.count should be(4)
@@ -8,8 +8,8 @@ import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.japi.CassandraJavaUtil._
import org.apache.spark.api.java.function.{Function2, Function => JFunction}

import scala.collection.JavaConversions._
import scala.concurrent.Future
import scala.jdk.CollectionConverters._

case class SimpleClass(value: Integer)

@@ -103,15 +103,16 @@ class CassandraJavaPairRDDSpec extends SparkCassandraITFlatSpecBase with Default
"key")
.spanBy(f, classOf[Integer])
.collect()
.asScala
.toMap

results should have size 2
results should contain key 10
results should contain key 20
results(10).size should be(3)
results(10).map(_._2).toSeq should be(Seq(10, 11, 12))
results(20).size should be(3)
results(20).map(_._2).toSeq should be(Seq(20, 21, 22))
results(10).asScala.size should be(3)
results(10).asScala.map(_._2).toSeq should be(Seq(10, 11, 12))
results(20).asScala.size should be(3)
results(20).asScala.map(_._2).toSeq should be(Seq(20, 21, 22))
}

it should "allow to use spanByKey method" in {
@@ -129,15 +130,16 @@
"key")
.spanByKey()
.collect()
.asScala
.toMap

results should have size 2
results should contain key 10
results should contain key 20
results(10).size should be(3)
results(10).toSeq should be(Seq(10, 11, 12))
results(10).asScala.toSeq should be(Seq(10, 11, 12))
results(20).size should be(3)
results(20).toSeq should be(Seq(20, 21, 22))
results(20).asScala.toSeq should be(Seq(20, 21, 22))
}

it should "allow to use of keyByAndApplyPartitioner" in {