Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPARKC-686 Port to Scala 2.13 #1349

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
scala: [2.12.10]
scala: [2.12.11, 2.13.8]
db-version: [6.8.13, 5.1.24, 3.11.10, 4.0-rc2]

steps:
Expand Down
6 changes: 4 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import sbt.{Compile, moduleFilter, _}
import sbtassembly.AssemblyPlugin.autoImport.assembly

lazy val scala212 = "2.12.11"
lazy val supportedScalaVersions = List(scala212)
lazy val scala213 = "2.13.8"
lazy val supportedScalaVersions = List(scala212, scala213)

// factor out common settings
ThisBuild / scalaVersion := scala212
ThisBuild / scalaVersion := scala213
ThisBuild / scalacOptions ++= Seq("-target:jvm-1.8")

// Publishing Info
Expand Down Expand Up @@ -70,6 +71,7 @@ val annotationProcessor = Seq(
def scalacVersionDependantOptions(scalaBinary: String): Seq[String] = scalaBinary match {
case "2.11" => Seq()
case "2.12" => Seq("-no-java-comments") //Scala Bug on inner classes, CassandraJavaUtil,
case "2.13" => Seq("-no-java-comments") //Scala Bug on inner classes, CassandraJavaUtil,
}

lazy val root = (project in file("."))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.datastax.spark.connector
import com.datastax.spark.connector.ccm.CcmBridge
import com.datastax.spark.connector.cluster.DefaultCluster

import scala.collection.JavaConversions._
import scala.jdk.CollectionConverters._
import scala.concurrent.Future
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector.cql.CassandraConnector
Expand Down Expand Up @@ -117,9 +117,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu

val rows = results.all()
assert(rows.size() == 3)
assert(rows.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
}


Expand All @@ -140,9 +140,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu

val rows = results.all()
assert(rows.size() == 3)
assert(rows.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
}

it should "allow to save beans with transient fields to Cassandra" in {
Expand All @@ -162,9 +162,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu

val rows = results.all()
assert(rows.size() == 3)
assert(rows.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
}

it should "allow to save beans with inherited fields to Cassandra" in {
Expand All @@ -184,7 +184,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
val rows = results.all()

rows should have size 3
rows.map(row => (row.getString("value"), row.getInt("key"), row.getString("sub_class_field"))).toSet shouldBe Set(
rows.asScala.map(row => (row.getString("value"), row.getInt("key"), row.getString("sub_class_field"))).toSet shouldBe Set(
("one", 1, "a"),
("two", 2, "b"),
("three", 3, "c")
Expand All @@ -210,9 +210,9 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu

val rows = results.all()
assert(rows.size() == 3)
assert(rows.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "one" && row.getInt("key") == 1))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "two" && row.getInt("key") == 2))
assert(rows.asScala.exists(row ⇒ row.getString("value") == "three" && row.getInt("key") == 3))
}

it should "allow to read rows as Tuple1" in {
Expand All @@ -222,7 +222,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple1(
1: Integer
)
Expand All @@ -237,7 +237,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple2(
1: Integer,
"2"
Expand All @@ -254,7 +254,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple3(
1: Integer,
"2",
Expand All @@ -273,7 +273,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple4(
1: Integer,
"2",
Expand All @@ -294,7 +294,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple5(
1: Integer,
"2",
Expand All @@ -317,7 +317,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple6(
1: Integer,
"2",
Expand All @@ -342,7 +342,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple7(
1: Integer,
"2",
Expand All @@ -369,7 +369,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple8(
1: Integer,
"2",
Expand Down Expand Up @@ -398,7 +398,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple9(
1: Integer,
"2",
Expand Down Expand Up @@ -429,7 +429,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple10(
1: Integer,
"2",
Expand Down Expand Up @@ -462,7 +462,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple11(
1: Integer,
"2",
Expand Down Expand Up @@ -497,7 +497,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple12(
1: Integer,
"2",
Expand Down Expand Up @@ -534,7 +534,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple13(
1: Integer,
"2",
Expand Down Expand Up @@ -573,7 +573,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple14(
1: Integer,
"2",
Expand Down Expand Up @@ -614,7 +614,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple15(
1: Integer,
"2",
Expand Down Expand Up @@ -657,7 +657,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple16(
1: Integer,
"2",
Expand Down Expand Up @@ -702,7 +702,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple17(
1: Integer,
"2",
Expand Down Expand Up @@ -749,7 +749,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple18(
1: Integer,
"2",
Expand Down Expand Up @@ -798,7 +798,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple19(
1: Integer,
"2",
Expand Down Expand Up @@ -849,7 +849,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19", "c20"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple20(
1: Integer,
"2",
Expand Down Expand Up @@ -902,7 +902,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19", "c20", "c21"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple21(
1: Integer,
"2",
Expand Down Expand Up @@ -957,7 +957,7 @@ class CassandraJavaUtilSpec extends SparkCassandraITFlatSpecBase with DefaultClu
)).select(
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", "c19", "c20", "c21", "c22"
)
.collect().head
.collect().asScala.head
tuple shouldBe Tuple22(
1: Integer,
"2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ trait SparkCassandraITSpecBase
Await.result(Future.sequence(units), Duration.Inf)
}

def awaitAll[T](units: TraversableOnce[Future[T]]): TraversableOnce[T] = {
Await.result(Future.sequence(units), Duration.Inf)
def awaitAll[T](units: IterableOnce[Future[T]]): IterableOnce[T] = {
Await.result(Future.sequence(units.iterator.toList), Duration.Inf)
}

def keyspaceCql(name: String = ks) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class CassandraAuthenticatedConnectorSpec extends SparkCassandraITFlatSpecBase w
"spark.cassandra.auth.password" -> "cassandra",
"keyspace" -> ks, "table" -> "authtest")

personDF1.write.format("org.apache.spark.sql.cassandra").options(options).mode("append")save()
personDF1.write.format("org.apache.spark.sql.cassandra").options(options).mode("append").save()
val personDF2 = spark.read.format("org.apache.spark.sql.cassandra").options(options).load()

personDF2.count should be(4)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.japi.CassandraJavaUtil._
import org.apache.spark.api.java.function.{Function2, Function => JFunction}

import scala.collection.JavaConversions._
import scala.concurrent.Future
import scala.jdk.CollectionConverters._

case class SimpleClass(value: Integer)

Expand Down Expand Up @@ -103,15 +103,16 @@ class CassandraJavaPairRDDSpec extends SparkCassandraITFlatSpecBase with Default
"key")
.spanBy(f, classOf[Integer])
.collect()
.asScala
.toMap

results should have size 2
results should contain key 10
results should contain key 20
results(10).size should be(3)
results(10).map(_._2).toSeq should be(Seq(10, 11, 12))
results(20).size should be(3)
results(20).map(_._2).toSeq should be(Seq(20, 21, 22))
results(10).asScala.size should be(3)
results(10).asScala.map(_._2).toSeq should be(Seq(10, 11, 12))
results(20).asScala.size should be(3)
results(20).asScala.map(_._2).toSeq should be(Seq(20, 21, 22))
}

it should "allow to use spanByKey method" in {
Expand All @@ -129,15 +130,16 @@ class CassandraJavaPairRDDSpec extends SparkCassandraITFlatSpecBase with Default
"key")
.spanByKey()
.collect()
.asScala
.toMap

results should have size 2
results should contain key 10
results should contain key 20
results(10).size should be(3)
results(10).toSeq should be(Seq(10, 11, 12))
results(10).asScala.toSeq should be(Seq(10, 11, 12))
results(20).size should be(3)
results(20).toSeq should be(Seq(20, 21, 22))
results(20).asScala.toSeq should be(Seq(20, 21, 22))
}

it should "allow to use of keyByAndApplyPartitioner" in {
Expand Down
Loading