diff --git a/README.md b/README.md index 646e2a7..ef24c79 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ This project can be useful if you have: `flink-protobuf` is released to Maven-central. For SBT, add this snippet to `build.sbt`: ```scala -libraryDependencies += "io.findify" %% "flink-protobuf" % "0.2" +libraryDependencies += "io.findify" %% "flink-protobuf" % "0.3" ``` Then, given that you have a following message format: ```proto @@ -44,6 +44,55 @@ env.fromCollection(List.of(Tests.Foo.newBuilder().setValue(1).build()), ti).exec ``` +## ScalaPB oneof + +It is also possible to generate `TypeInformation` for ADTs generated by ScalaPB. Given the message: +```proto +message Foo1 { + required int32 value = 1; +} + +message Bar1 { + required string value = 1; +} + +message SealedOptional { + // optional one! + oneof sealed_value_optional { + Foo1 foo = 1; + Bar1 bar = 2; + } +} +``` +You can generate the following typeinfo: +```scala +implicit val ti = FlinkProtobuf.generateScalaOptionalOneof[SealedOptional,SealedOptionalMessage](SealedOptionalMessage) +val result = env.fromCollection(List(Foo1(1), Foo1(2), Foo1(3))) +``` + +The same can be done for non-optional ADTs: +```proto +message Foo2 { + required int32 value = 1; +} + +message Bar2 { + required string value = 1; +} +message SealedNonOpt { + // non-optional one! 
+ oneof sealed_value { + Foo2 foo = 1; + Bar2 bar = 2; + } +} +``` +with the following typeinfo: +```scala +implicit val ti = FlinkProtobuf.generateScalaOneof[SealedNonOpt, SealedNonOptMessage](SealedNonOptMessage) +val result = env.fromCollection(List(Foo2(1), Foo2(2), Foo2(3))) ``` + ## Schema evolution Compared to Flink schema evolution for POJO classes, with `flink-protobuf` you can do much more: @@ -54,6 +103,8 @@ For Scala case classes Flink has no support for schema evolution, so with this p * add, rename, remove fields * change field types +For both Scala and Java messages, you cannot rename the main message class. + ## Compatibility The library is built over Flink 1.13 for Scala 2.12, but should be binary compatible with older flink versions. diff --git a/build.sbt b/build.sbt index da8b92c..38c2c6b 100644 --- a/build.sbt +++ b/build.sbt @@ -1,19 +1,21 @@ name := "flink-protobuf" -version := "0.2" +version := "0.3" scalaVersion := "2.12.14" -lazy val scalapbVersion = "0.11.3" +resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots" + +lazy val scalapbVersion = "0.11.3+12-3a9f2017+20210601-1710-SNAPSHOT" lazy val flinkVersion = "1.13.1" libraryDependencies ++= Seq( - "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf,test,compile", - "com.google.protobuf" % "protobuf-java" % "3.17.1" % "protobuf,test,compile", - "org.apache.flink" %% "flink-scala" % flinkVersion % "provided", - "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided", - "org.apache.flink" %% "flink-test-utils" % flinkVersion % "test", - "org.scalatest" %% "scalatest" % "3.2.9" % "test" + "io.findify" %% "scalapb-runtime" % scalapbVersion % "protobuf,test,compile", + "com.google.protobuf" % "protobuf-java" % "3.17.1" % "protobuf,test,compile", + "org.apache.flink" %% "flink-scala" % flinkVersion % "provided", + "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided", + "org.apache.flink" 
%% "flink-test-utils" % flinkVersion % "test", + "org.scalatest" %% "scalatest" % "3.2.9" % "test" ) Test / PB.targets := Seq( diff --git a/src/main/scala/io/findify/flinkpb/Codec.scala b/src/main/scala/io/findify/flinkpb/Codec.scala index 4dce373..1b2ea42 100644 --- a/src/main/scala/io/findify/flinkpb/Codec.scala +++ b/src/main/scala/io/findify/flinkpb/Codec.scala @@ -1,7 +1,7 @@ package io.findify.flinkpb import com.google.protobuf.{GeneratedMessageV3, Parser} -import scalapb.{GeneratedMessage, GeneratedMessageCompanion} +import scalapb.{GeneratedMessage, GeneratedMessageCompanion, TypeMapper} import java.io.{DataOutputStream, InputStream, OutputStream} @@ -13,6 +13,41 @@ sealed trait Codec[T] { } object Codec { + case class ScalaOptionalOneofCodec[T, M <: GeneratedMessage]( + mapper: TypeMapper[M, Option[T]], + companion: GeneratedMessageCompanion[M], + clazz: Class[T] + ) extends Codec[T] { + override def defaultInstance: T = mapper + .toCustom(companion.defaultInstance) + .getOrElse(throw new IllegalArgumentException("cannot decode empty message")) + override def parseFrom(in: InputStream): T = { + val message = + companion.parseDelimitedFrom(in).getOrElse(throw new IllegalArgumentException("cannot parse message")) + mapper.toCustom(message).getOrElse(throw new IllegalArgumentException("cannot decode empty message")) + } + + override def writeTo(out: OutputStream, value: T): Unit = { + mapper.toBase(Some(value)).writeDelimitedTo(out) + } + } + case class ScalaOneofCodec[T, M <: GeneratedMessage]( + mapper: TypeMapper[M, T], + companion: GeneratedMessageCompanion[M], + clazz: Class[T] + ) extends Codec[T] { + override def defaultInstance: T = mapper.toCustom(companion.defaultInstance) + override def parseFrom(in: InputStream): T = { + val message = + companion.parseDelimitedFrom(in).getOrElse(throw new IllegalArgumentException("cannot parse message")) + mapper.toCustom(message) + } + + override def writeTo(out: OutputStream, value: T): Unit = { + 
mapper.toBase(value).writeDelimitedTo(out) + } + } + case class ScalaCodec[T <: GeneratedMessage](companion: GeneratedMessageCompanion[T], clazz: Class[T]) extends Codec[T] { override def defaultInstance: T = companion.defaultInstance diff --git a/src/main/scala/io/findify/flinkpb/FlinkProtobuf.scala b/src/main/scala/io/findify/flinkpb/FlinkProtobuf.scala index 902f4a7..b53f3c4 100644 --- a/src/main/scala/io/findify/flinkpb/FlinkProtobuf.scala +++ b/src/main/scala/io/findify/flinkpb/FlinkProtobuf.scala @@ -1,12 +1,27 @@ package io.findify.flinkpb import com.google.protobuf.{GeneratedMessageV3, Parser} -import io.findify.flinkpb.Codec.{JavaCodec, ScalaCodec} +import io.findify.flinkpb.Codec.{JavaCodec, ScalaCodec, ScalaOneofCodec, ScalaOptionalOneofCodec} import org.apache.flink.api.common.typeinfo.TypeInformation -import scalapb.{GeneratedMessage, GeneratedMessageCompanion} +import scalapb.{GeneratedMessage, GeneratedMessageCompanion, GeneratedSealedOneof, TypeMapper} + import scala.reflect.{ClassTag, classTag} object FlinkProtobuf { + + def generateScalaOptionalOneof[T: ClassTag, M <: GeneratedMessage]( + companion: GeneratedMessageCompanion[M] + )(implicit mapper: TypeMapper[M, Option[T]]): TypeInformation[T] = { + new ProtobufTypeInformation[T]( + ScalaOptionalOneofCodec(mapper, companion, classTag[T].runtimeClass.asInstanceOf[Class[T]]) + ) + } + def generateScalaOneof[T: ClassTag, M <: GeneratedMessage]( + companion: GeneratedMessageCompanion[M] + )(implicit mapper: TypeMapper[M, T]): TypeInformation[T] = { + new ProtobufTypeInformation[T](ScalaOneofCodec(mapper, companion, classTag[T].runtimeClass.asInstanceOf[Class[T]])) + } + def generateScala[T <: GeneratedMessage: ClassTag](companion: GeneratedMessageCompanion[T]): TypeInformation[T] = new ProtobufTypeInformation[T](ScalaCodec(companion, classTag[T].runtimeClass.asInstanceOf[Class[T]])) diff --git a/src/main/scala/io/findify/flinkpb/ProtobufSerializer.scala 
b/src/main/scala/io/findify/flinkpb/ProtobufSerializer.scala index 455dab4..426357f 100644 --- a/src/main/scala/io/findify/flinkpb/ProtobufSerializer.scala +++ b/src/main/scala/io/findify/flinkpb/ProtobufSerializer.scala @@ -1,13 +1,16 @@ package io.findify.flinkpb import com.google.protobuf.{GeneratedMessageV3, Parser} -import io.findify.flinkpb.Codec.{JavaCodec, ScalaCodec} -import io.findify.flinkpb.ProtobufSerializer.{JavaConfigSnapshot, ScalaConfigSnapshot} +import io.findify.flinkpb.Codec.{JavaCodec, ScalaCodec, ScalaOneofCodec, ScalaOptionalOneofCodec} +import io.findify.flinkpb.config.{ + JavaConfigSnapshot, + ScalaConfigSnapshot, + ScalaOneofConfigSnapshot, + ScalaOptionalOneofConfigSnapshot +} import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot} import org.apache.flink.api.java.typeutils.runtime.{DataInputViewStream, DataOutputViewStream} import org.apache.flink.core.memory.{DataInputView, DataOutputView} -import org.apache.flink.util.InstantiationUtil -import scalapb.{GeneratedMessage, GeneratedMessageCompanion} case class ProtobufSerializer[T](codec: Codec[T]) extends TypeSerializer[T] { @@ -33,67 +36,10 @@ case class ProtobufSerializer[T](codec: Codec[T]) extends TypeSerializer[T] { serialize(deserialize(source), target) override def snapshotConfiguration(): TypeSerializerSnapshot[T] = codec match { - case s: ScalaCodec[_] => new ScalaConfigSnapshot(s).asInstanceOf[TypeSerializerSnapshot[T]] - case j: JavaCodec[_] => new JavaConfigSnapshot(j).asInstanceOf[TypeSerializerSnapshot[T]] - } -} - -object ProtobufSerializer { - - class ScalaConfigSnapshot[T <: GeneratedMessage]() extends TypeSerializerSnapshot[T] { - var codec: ScalaCodec[T] = _ - def this(c: ScalaCodec[T]) = { - this() - codec = c - } - - override def getCurrentVersion: Int = 1 - - override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = { - codec = ScalaCodec( - companion = 
InstantiationUtil - .resolveClassByName[GeneratedMessageCompanion[T]](in, userCodeClassLoader) - .getField("MODULE$") - .get(null) - .asInstanceOf[GeneratedMessageCompanion[T]], - clazz = InstantiationUtil.resolveClassByName[T](in, userCodeClassLoader) - ) - } - - override def writeSnapshot(out: DataOutputView): Unit = { - out.writeUTF(codec.companion.getClass.getName) - out.writeUTF(codec.clazz.getName) - } - - override def restoreSerializer(): TypeSerializer[T] = new ProtobufSerializer[T](codec) - - override def resolveSchemaCompatibility(newSerializer: TypeSerializer[T]): TypeSerializerSchemaCompatibility[T] = - TypeSerializerSchemaCompatibility.compatibleAsIs() - } - - class JavaConfigSnapshot[T <: GeneratedMessageV3]() extends TypeSerializerSnapshot[T] { - var codec: JavaCodec[T] = _ - def this(c: JavaCodec[T]) = { - this() - codec = c - } - - override def getCurrentVersion: Int = 1 - - override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = { - val clazz = InstantiationUtil.resolveClassByName[T](in, userCodeClassLoader) - val constructor = clazz.getDeclaredConstructor() - constructor.setAccessible(true) - codec = JavaCodec(constructor.newInstance(), clazz) - } - - override def writeSnapshot(out: DataOutputView): Unit = { - out.writeUTF(codec.clazz.getName) - } - - override def restoreSerializer(): TypeSerializer[T] = new ProtobufSerializer[T](codec) - - override def resolveSchemaCompatibility(newSerializer: TypeSerializer[T]): TypeSerializerSchemaCompatibility[T] = - TypeSerializerSchemaCompatibility.compatibleAsIs() + case s: ScalaCodec[_] => new ScalaConfigSnapshot(s).asInstanceOf[TypeSerializerSnapshot[T]] + case j: JavaCodec[_] => new JavaConfigSnapshot(j).asInstanceOf[TypeSerializerSnapshot[T]] + case s: ScalaOneofCodec[_, _] => new ScalaOneofConfigSnapshot(s).asInstanceOf[TypeSerializerSnapshot[T]] + case s: ScalaOptionalOneofCodec[_, _] => + new 
ScalaOptionalOneofConfigSnapshot(s).asInstanceOf[TypeSerializerSnapshot[T]] } } diff --git a/src/main/scala/io/findify/flinkpb/config/JavaConfigSnapshot.scala b/src/main/scala/io/findify/flinkpb/config/JavaConfigSnapshot.scala new file mode 100644 index 0000000..bca0279 --- /dev/null +++ b/src/main/scala/io/findify/flinkpb/config/JavaConfigSnapshot.scala @@ -0,0 +1,34 @@ +package io.findify.flinkpb.config + +import com.google.protobuf.GeneratedMessageV3 +import io.findify.flinkpb.Codec.JavaCodec +import io.findify.flinkpb.ProtobufSerializer +import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.util.InstantiationUtil + +class JavaConfigSnapshot[T <: GeneratedMessageV3]() extends TypeSerializerSnapshot[T] { + var codec: JavaCodec[T] = _ + def this(c: JavaCodec[T]) = { + this() + codec = c + } + + override def getCurrentVersion: Int = 1 + + override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = { + val clazz = InstantiationUtil.resolveClassByName[T](in, userCodeClassLoader) + val constructor = clazz.getDeclaredConstructor() + constructor.setAccessible(true) + codec = JavaCodec(constructor.newInstance(), clazz) + } + + override def writeSnapshot(out: DataOutputView): Unit = { + out.writeUTF(codec.clazz.getName) + } + + override def restoreSerializer(): TypeSerializer[T] = new ProtobufSerializer[T](codec) + + override def resolveSchemaCompatibility(newSerializer: TypeSerializer[T]): TypeSerializerSchemaCompatibility[T] = + TypeSerializerSchemaCompatibility.compatibleAsIs() +} diff --git a/src/main/scala/io/findify/flinkpb/config/ScalaConfigSnapshot.scala b/src/main/scala/io/findify/flinkpb/config/ScalaConfigSnapshot.scala new file mode 100644 index 0000000..5c2c0f4 --- /dev/null +++ b/src/main/scala/io/findify/flinkpb/config/ScalaConfigSnapshot.scala @@ -0,0 
+1,39 @@ +package io.findify.flinkpb.config + +import io.findify.flinkpb.Codec.ScalaCodec +import io.findify.flinkpb.ProtobufSerializer +import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.util.InstantiationUtil +import scalapb.{GeneratedMessage, GeneratedMessageCompanion} + +class ScalaConfigSnapshot[T <: GeneratedMessage]() extends TypeSerializerSnapshot[T] { + var codec: ScalaCodec[T] = _ + def this(c: ScalaCodec[T]) = { + this() + codec = c + } + + override def getCurrentVersion: Int = 1 + + override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = { + codec = ScalaCodec( + companion = InstantiationUtil + .resolveClassByName[GeneratedMessageCompanion[T]](in, userCodeClassLoader) + .getField("MODULE$") + .get(null) + .asInstanceOf[GeneratedMessageCompanion[T]], + clazz = InstantiationUtil.resolveClassByName[T](in, userCodeClassLoader) + ) + } + + override def writeSnapshot(out: DataOutputView): Unit = { + out.writeUTF(codec.companion.getClass.getName) + out.writeUTF(codec.clazz.getName) + } + + override def restoreSerializer(): TypeSerializer[T] = new ProtobufSerializer[T](codec) + + override def resolveSchemaCompatibility(newSerializer: TypeSerializer[T]): TypeSerializerSchemaCompatibility[T] = + TypeSerializerSchemaCompatibility.compatibleAsIs() +} diff --git a/src/main/scala/io/findify/flinkpb/config/ScalaOneofConfigSnapshot.scala b/src/main/scala/io/findify/flinkpb/config/ScalaOneofConfigSnapshot.scala new file mode 100644 index 0000000..4b706ea --- /dev/null +++ b/src/main/scala/io/findify/flinkpb/config/ScalaOneofConfigSnapshot.scala @@ -0,0 +1,44 @@ +package io.findify.flinkpb.config + +import io.findify.flinkpb.Codec.{ScalaCodec, ScalaOneofCodec} +import io.findify.flinkpb.ProtobufSerializer +import 
org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.util.InstantiationUtil +import scalapb.{GeneratedMessage, GeneratedMessageCompanion, TypeMapper} + +class ScalaOneofConfigSnapshot[T, M <: GeneratedMessage]() extends TypeSerializerSnapshot[T] { + var codec: ScalaOneofCodec[T, M] = _ + def this(c: ScalaOneofCodec[T, M]) = { + this() + codec = c + } + + override def getCurrentVersion: Int = 1 + + override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = { + codec = ScalaOneofCodec[T, M]( + companion = InstantiationUtil + .resolveClassByName[GeneratedMessageCompanion[M]](in, userCodeClassLoader) + .getField("MODULE$") + .get(null) + .asInstanceOf[GeneratedMessageCompanion[M]], + clazz = InstantiationUtil.resolveClassByName[T](in, userCodeClassLoader), + mapper = InstantiationUtil + .resolveClassByName[TypeMapper[M, T]](in, userCodeClassLoader) + .getDeclaredConstructor() + .newInstance() + ) + } + + override def writeSnapshot(out: DataOutputView): Unit = { + out.writeUTF(codec.companion.getClass.getName) + out.writeUTF(codec.clazz.getName) + out.writeUTF(codec.mapper.getClass.getName) + } + + override def restoreSerializer(): TypeSerializer[T] = new ProtobufSerializer[T](codec) + + override def resolveSchemaCompatibility(newSerializer: TypeSerializer[T]): TypeSerializerSchemaCompatibility[T] = + TypeSerializerSchemaCompatibility.compatibleAsIs() +} diff --git a/src/main/scala/io/findify/flinkpb/config/ScalaOptionalOneofConfigSnapshot.scala b/src/main/scala/io/findify/flinkpb/config/ScalaOptionalOneofConfigSnapshot.scala new file mode 100644 index 0000000..71b0b8a --- /dev/null +++ b/src/main/scala/io/findify/flinkpb/config/ScalaOptionalOneofConfigSnapshot.scala @@ -0,0 +1,44 @@ +package io.findify.flinkpb.config + +import io.findify.flinkpb.Codec.{ScalaOneofCodec, 
ScalaOptionalOneofCodec} +import io.findify.flinkpb.ProtobufSerializer +import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot} +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.util.InstantiationUtil +import scalapb.{GeneratedMessage, GeneratedMessageCompanion, TypeMapper} + +class ScalaOptionalOneofConfigSnapshot[T, M <: GeneratedMessage]() extends TypeSerializerSnapshot[T] { + var codec: ScalaOptionalOneofCodec[T, M] = _ + def this(c: ScalaOptionalOneofCodec[T, M]) = { + this() + codec = c + } + + override def getCurrentVersion: Int = 1 + + override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = { + codec = ScalaOptionalOneofCodec[T, M]( + companion = InstantiationUtil + .resolveClassByName[GeneratedMessageCompanion[M]](in, userCodeClassLoader) + .getField("MODULE$") + .get(null) + .asInstanceOf[GeneratedMessageCompanion[M]], + clazz = InstantiationUtil.resolveClassByName[T](in, userCodeClassLoader), + mapper = InstantiationUtil + .resolveClassByName[TypeMapper[M, Option[T]]](in, userCodeClassLoader) + .getDeclaredConstructor() + .newInstance() + ) + } + + override def writeSnapshot(out: DataOutputView): Unit = { + out.writeUTF(codec.companion.getClass.getName) + out.writeUTF(codec.clazz.getName) + out.writeUTF(codec.mapper.getClass.getName) + } + + override def restoreSerializer(): TypeSerializer[T] = new ProtobufSerializer[T](codec) + + override def resolveSchemaCompatibility(newSerializer: TypeSerializer[T]): TypeSerializerSchemaCompatibility[T] = + TypeSerializerSchemaCompatibility.compatibleAsIs() +} diff --git a/src/test/protobuf/tests.proto b/src/test/protobuf/tests.proto index f050267..2b784bd 100644 --- a/src/test/protobuf/tests.proto +++ b/src/test/protobuf/tests.proto @@ -21,6 +21,7 @@ message Root { repeated Nested list = 1; } + message Foo { required int32 value = 1; } @@ -28,10 +29,46 @@ 
message Foo { message Bar { required string value = 1; } +message Foo1 { + required int32 value = 1; +} + +message Bar1 { + required string value = 1; +} -message Sealed { +message SealedOptional { oneof sealed_value_optional { - Foo foo = 1; - Bar bar = 2; + Foo1 foo = 1; + Bar1 bar = 2; + } +} + +message Foo2 { + required int32 value = 1; +} + +message Bar2 { + required string value = 1; +} +message SealedNonOpt { + oneof sealed_value { + Foo2 foo = 1; + Bar2 bar = 2; + } +} + +message Foo3 { + required int32 value = 1; +} + +message Bar3 { + required string value = 1; +} + +message NonSealedOneof { + oneof whatever { + Foo2 xfoo = 1; + Bar2 xbar = 2; } } \ No newline at end of file diff --git a/src/test/scala/io/findify/flinkpb/FlinkJobTest.scala b/src/test/scala/io/findify/flinkpb/FlinkJobTest.scala index c120807..0fedd08 100644 --- a/src/test/scala/io/findify/flinkpb/FlinkJobTest.scala +++ b/src/test/scala/io/findify/flinkpb/FlinkJobTest.scala @@ -1,6 +1,6 @@ package io.findify.flinkpb -import io.findify.flinkprotobuf.scala.{Bar, Foo, Sealed, SealedMessage} +import io.findify.flinkprotobuf.scala.{Bar, Bar1, Foo, Foo1, SealedOptional, SealedOptionalMessage} import org.apache.flink.api.common.{ExecutionConfig, RuntimeExecutionMode} import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration @@ -9,6 +9,7 @@ import org.apache.flink.test.util.MiniClusterWithClientResource import org.scalatest.BeforeAndAfterAll import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers + import scala.collection.JavaConverters._ class FlinkJobTest extends AnyFlatSpec with Matchers with BeforeAndAfterAll { @@ -50,11 +51,12 @@ class FlinkJobTest extends AnyFlatSpec with Matchers with BeforeAndAfterAll { } it should "use protobuf serialization for oneof messages" in { - implicit val ti = FlinkProtobuf.generateScala(SealedMessage) + implicit val ti = + 
FlinkProtobuf.generateScalaOptionalOneof[SealedOptional, SealedOptionalMessage](SealedOptionalMessage) val result = env - .fromCollection(List[SealedMessage](Foo(1).asMessage, Foo(2).asMessage, Foo(3).asMessage, Bar("a").asMessage)) + .fromCollection(List[SealedOptional](Foo1(1), Foo1(2), Foo1(3), Bar1("a"))) .rebalance .executeAndCollect(10) - result.flatMap(_.toSealed) shouldBe List(Foo(1), Foo(2), Foo(3), Bar("a")) + result shouldBe List(Foo1(1), Foo1(2), Foo1(3), Bar1("a")) } } diff --git a/src/test/scala/io/findify/flinkpb/JavaSerializerTest.scala b/src/test/scala/io/findify/flinkpb/JavaSerializerTest.scala index 201fe9e..ab846d8 100644 --- a/src/test/scala/io/findify/flinkpb/JavaSerializerTest.scala +++ b/src/test/scala/io/findify/flinkpb/JavaSerializerTest.scala @@ -22,8 +22,8 @@ class JavaSerializerTest extends AnyFlatSpec with Matchers with SerializerTest { } it should "write oneof messages" in { - val ser = FlinkProtobuf.generateJava(classOf[Sealed], Sealed.getDefaultInstance) - roundtrip(ser, Sealed.newBuilder().setFoo(Foo.newBuilder().setValue(1).build()).build()) + val ser = FlinkProtobuf.generateJava(classOf[SealedOptional], SealedOptional.getDefaultInstance) + roundtrip(ser, SealedOptional.newBuilder().setFoo(Foo1.newBuilder().setValue(1).build()).build()) serializable(ser) snapshotSerializable(ser) } diff --git a/src/test/scala/io/findify/flinkpb/ScalaSerializerTest.scala b/src/test/scala/io/findify/flinkpb/ScalaSerializerTest.scala index 7c40a0f..e1e7111 100644 --- a/src/test/scala/io/findify/flinkpb/ScalaSerializerTest.scala +++ b/src/test/scala/io/findify/flinkpb/ScalaSerializerTest.scala @@ -1,16 +1,24 @@ package io.findify.flinkpb +import io.findify.flinkprotobuf.scala.NonSealedOneof.Whatever.Xfoo import io.findify.flinkprotobuf.scala.Root.Nested -import io.findify.flinkprotobuf.scala.{Bar, Foo, Root, Sealed, SealedMessage} -import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.api.common.typeinfo.TypeInformation -import 
org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import io.findify.flinkprotobuf.scala.{ + Bar, + Bar1, + Bar2, + Foo, + Foo1, + Foo2, + NonSealedOneof, + Root, + SealedNonOpt, + SealedNonOptMessage, + SealedOptional, + SealedOptionalMessage +} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers -import java.io.{ByteArrayInputStream, ByteArrayOutputStream} - class ScalaSerializerTest extends AnyFlatSpec with Matchers with SerializerTest { it should "write simple scala types" in { val ser = FlinkProtobuf.generateScala(Foo) @@ -24,10 +32,33 @@ class ScalaSerializerTest extends AnyFlatSpec with Matchers with SerializerTest serializable(ser) } - it should "write oneof messages" in { - val ser = FlinkProtobuf.generateScala(SealedMessage) - roundtrip[SealedMessage](ser, Foo(1).asMessage) - roundtrip[SealedMessage](ser, Bar("a").asMessage) + it should "write sealed optional oneof messages" in { + val ser = FlinkProtobuf.generateScala(SealedOptionalMessage) + roundtrip[SealedOptionalMessage](ser, Foo1(1).asMessage) + roundtrip[SealedOptionalMessage](ser, Bar1("a").asMessage) + serializable(ser) + snapshotSerializable(ser) + } + + it should "write sealed optional oneof messages via generateScalaOptionalOneof" in { + val ser = + FlinkProtobuf.generateScalaOptionalOneof[SealedOptional, SealedOptionalMessage](SealedOptionalMessage) + roundtrip(ser, Foo1(1)) + roundtrip(ser, Bar1("a")) + serializable(ser) + snapshotSerializable(ser) + } + + it should "write sealed nonopt oneof messages" in { + val ser = FlinkProtobuf.generateScalaOneof[SealedNonOpt, SealedNonOptMessage](SealedNonOptMessage) + roundtrip[SealedNonOpt](ser, Bar2("a")) + serializable(ser) + snapshotSerializable(ser) + } + + it should "write nonsealed oneof messages" in { + val ser = FlinkProtobuf.generateScala(NonSealedOneof) + roundtrip[NonSealedOneof](ser, 
NonSealedOneof.of(Xfoo(Foo2(1)))) serializable(ser) snapshotSerializable(ser) } diff --git a/src/test/scala/io/findify/flinkpb/SerializerTest.scala b/src/test/scala/io/findify/flinkpb/SerializerTest.scala index 93c4a91..8fcd118 100644 --- a/src/test/scala/io/findify/flinkpb/SerializerTest.scala +++ b/src/test/scala/io/findify/flinkpb/SerializerTest.scala @@ -34,7 +34,7 @@ trait SerializerTest { this: Suite with Matchers => this.getClass.getClassLoader ) val restored = conf.restoreSerializer() - restored shouldBe serializer + restored.getClass shouldBe serializer.getClass } }