Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ This project can be useful if you have:

`flink-protobuf` is released to Maven-central. For SBT, add this snippet to `build.sbt`:
```scala
libraryDependencies += "io.findify" %% "flink-protobuf" % "0.2"
libraryDependencies += "io.findify" %% "flink-protobuf" % "0.3"
```
Then, given that you have a following message format:
```proto
Expand All @@ -44,6 +44,55 @@ env.fromCollection(List.of(Tests.Foo.newBuilder().setValue(1).build()), ti).exec

```

## ScalaPB oneof

It is also possible to generate `TypeInformation` for ADTs generated by ScalaPB. Given the message:
```proto
message Foo1 {
required int32 value = 1;
}

message Bar1 {
required string value = 1;
}

message SealedOptional {
// optional one!
oneof sealed_value_optional {
Foo1 foo = 1;
Bar1 bar = 2;
}
}
```
You can generate the following typeinfo:
```scala
implicit val ti = FlinkProtobuf.generateScalaOptionalOneof[SealedOptional,SealedOptionalMessage](SealedOptionalMessage)
val result = env.fromCollection(List(Foo1(1), Foo1(2), Foo1(3)))
```

The same can be done for non-optional ADTs:
```proto
message Foo2 {
required int32 value = 1;
}

message Bar2 {
required string value = 1;
}
message SealedNonOpt {
// non-optional one!
oneof sealed_value {
Foo2 foo = 1;
Bar2 bar = 2;
}
}
```
with the following typeinfo:
```scala
implicit val ti = FlinkProtobuf.generateScalaOneof[SealedNonOpt, SealedNonOptMessage](SealedNonOptMessage)
val result = env.fromCollection(List(Foo2(1), Foo2(2), Foo2(3)))
```

## Schema evolution

Compared to Flink schema evolution for POJO classes, with `flink-protobuf` you can do much more:
Expand All @@ -54,6 +103,8 @@ For Scala case classes Flink has no support for schema evolution, so with this p
* add, rename, remove fields
* change field types

For both Scala and Java messages, you cannot rename the main message class: the serializer snapshot stores the class name and resolves it by reflection on restore, so a rename breaks state recovery.

## Compatibility

The library is built over Flink 1.13 for Scala 2.12, but should be binary compatible with older flink versions.
Expand Down
18 changes: 10 additions & 8 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
name := "flink-protobuf"

version := "0.2"
version := "0.3"

scalaVersion := "2.12.14"

lazy val scalapbVersion = "0.11.3"
resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"

lazy val scalapbVersion = "0.11.3+12-3a9f2017+20210601-1710-SNAPSHOT"
lazy val flinkVersion = "1.13.1"

libraryDependencies ++= Seq(
"com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf,test,compile",
"com.google.protobuf" % "protobuf-java" % "3.17.1" % "protobuf,test,compile",
"org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-test-utils" % flinkVersion % "test",
"org.scalatest" %% "scalatest" % "3.2.9" % "test"
"io.findify" %% "scalapb-runtime" % scalapbVersion % "protobuf,test,compile",
"com.google.protobuf" % "protobuf-java" % "3.17.1" % "protobuf,test,compile",
"org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-test-utils" % flinkVersion % "test",
"org.scalatest" %% "scalatest" % "3.2.9" % "test"
)

Test / PB.targets := Seq(
Expand Down
37 changes: 36 additions & 1 deletion src/main/scala/io/findify/flinkpb/Codec.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.findify.flinkpb

import com.google.protobuf.{GeneratedMessageV3, Parser}
import scalapb.{GeneratedMessage, GeneratedMessageCompanion}
import scalapb.{GeneratedMessage, GeneratedMessageCompanion, TypeMapper}

import java.io.{DataOutputStream, InputStream, OutputStream}

Expand All @@ -13,6 +13,41 @@ sealed trait Codec[T] {
}

object Codec {
/** Codec for ScalaPB optional sealed-oneof ADTs, where the generated
  * `TypeMapper` maps the wire message `M` to `Option[T]` (None for an
  * empty message). An empty message has no ADT representation, so both
  * decoding paths fail loudly when the mapper yields None.
  *
  * @param mapper    ScalaPB mapper between the wire message and Option[T]
  * @param companion companion of the generated wire message
  * @param clazz     runtime class of the ADT type
  */
case class ScalaOptionalOneofCodec[T, M <: GeneratedMessage](
    mapper: TypeMapper[M, Option[T]],
    companion: GeneratedMessageCompanion[M],
    clazz: Class[T]
) extends Codec[T] {

  // Default wire message maps to None, which cannot be represented as T.
  override def defaultInstance: T =
    mapper.toCustom(companion.defaultInstance) match {
      case Some(value) => value
      case None        => throw new IllegalArgumentException("cannot decode empty message")
    }

  // Reads one length-delimited message from the stream and maps it to the ADT.
  override def parseFrom(in: InputStream): T = {
    val message = companion.parseDelimitedFrom(in) match {
      case Some(m) => m
      case None    => throw new IllegalArgumentException("cannot parse message")
    }
    mapper.toCustom(message) match {
      case Some(value) => value
      case None        => throw new IllegalArgumentException("cannot decode empty message")
    }
  }

  // Wraps the value back into the wire message and writes it length-delimited,
  // mirroring parseFrom.
  override def writeTo(out: OutputStream, value: T): Unit =
    mapper.toBase(Some(value)).writeDelimitedTo(out)
}
/** Codec for ScalaPB non-optional sealed-oneof ADTs, where the generated
  * `TypeMapper` maps the wire message `M` directly to the ADT type `T`.
  *
  * @param mapper    ScalaPB mapper between the wire message and T
  * @param companion companion of the generated wire message
  * @param clazz     runtime class of the ADT type
  */
case class ScalaOneofCodec[T, M <: GeneratedMessage](
    mapper: TypeMapper[M, T],
    companion: GeneratedMessageCompanion[M],
    clazz: Class[T]
) extends Codec[T] {

  // Non-optional mapper can always represent the default message.
  override def defaultInstance: T = mapper.toCustom(companion.defaultInstance)

  // Reads one length-delimited message and maps it to the ADT; fails if the
  // stream holds no message.
  override def parseFrom(in: InputStream): T =
    companion.parseDelimitedFrom(in) match {
      case Some(message) => mapper.toCustom(message)
      case None          => throw new IllegalArgumentException("cannot parse message")
    }

  // Converts back to the wire message and writes it length-delimited.
  override def writeTo(out: OutputStream, value: T): Unit =
    mapper.toBase(value).writeDelimitedTo(out)
}

case class ScalaCodec[T <: GeneratedMessage](companion: GeneratedMessageCompanion[T], clazz: Class[T])
extends Codec[T] {
override def defaultInstance: T = companion.defaultInstance
Expand Down
19 changes: 17 additions & 2 deletions src/main/scala/io/findify/flinkpb/FlinkProtobuf.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,27 @@
package io.findify.flinkpb

import com.google.protobuf.{GeneratedMessageV3, Parser}
import io.findify.flinkpb.Codec.{JavaCodec, ScalaCodec}
import io.findify.flinkpb.Codec.{JavaCodec, ScalaCodec, ScalaOneofCodec, ScalaOptionalOneofCodec}
import org.apache.flink.api.common.typeinfo.TypeInformation
import scalapb.{GeneratedMessage, GeneratedMessageCompanion}
import scalapb.{GeneratedMessage, GeneratedMessageCompanion, GeneratedSealedOneof, TypeMapper}

import scala.reflect.{ClassTag, classTag}

object FlinkProtobuf {

/** Builds a Flink [[TypeInformation]] for a ScalaPB optional sealed-oneof ADT.
  *
  * @param companion companion of the generated wire message M
  * @param mapper    implicit ScalaPB mapper between M and Option[T]
  */
def generateScalaOptionalOneof[T: ClassTag, M <: GeneratedMessage](
    companion: GeneratedMessageCompanion[M]
)(implicit mapper: TypeMapper[M, Option[T]]): TypeInformation[T] = {
  // ClassTag gives us the erased runtime class of the ADT type.
  val runtimeClass = classTag[T].runtimeClass.asInstanceOf[Class[T]]
  new ProtobufTypeInformation[T](ScalaOptionalOneofCodec(mapper, companion, runtimeClass))
}
/** Builds a Flink [[TypeInformation]] for a ScalaPB non-optional sealed-oneof ADT.
  *
  * @param companion companion of the generated wire message M
  * @param mapper    implicit ScalaPB mapper between M and T
  */
def generateScalaOneof[T: ClassTag, M <: GeneratedMessage](
    companion: GeneratedMessageCompanion[M]
)(implicit mapper: TypeMapper[M, T]): TypeInformation[T] = {
  val runtimeClass = classTag[T].runtimeClass.asInstanceOf[Class[T]]
  new ProtobufTypeInformation[T](ScalaOneofCodec(mapper, companion, runtimeClass))
}

/** Builds a Flink [[TypeInformation]] for a plain ScalaPB-generated message type. */
def generateScala[T <: GeneratedMessage: ClassTag](companion: GeneratedMessageCompanion[T]): TypeInformation[T] = {
  val runtimeClass = classTag[T].runtimeClass.asInstanceOf[Class[T]]
  new ProtobufTypeInformation[T](ScalaCodec(companion, runtimeClass))
}

Expand Down
78 changes: 12 additions & 66 deletions src/main/scala/io/findify/flinkpb/ProtobufSerializer.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package io.findify.flinkpb

import com.google.protobuf.{GeneratedMessageV3, Parser}
import io.findify.flinkpb.Codec.{JavaCodec, ScalaCodec}
import io.findify.flinkpb.ProtobufSerializer.{JavaConfigSnapshot, ScalaConfigSnapshot}
import io.findify.flinkpb.Codec.{JavaCodec, ScalaCodec, ScalaOneofCodec, ScalaOptionalOneofCodec}
import io.findify.flinkpb.config.{
JavaConfigSnapshot,
ScalaConfigSnapshot,
ScalaOneofConfigSnapshot,
ScalaOptionalOneofConfigSnapshot
}
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
import org.apache.flink.api.java.typeutils.runtime.{DataInputViewStream, DataOutputViewStream}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flink.util.InstantiationUtil
import scalapb.{GeneratedMessage, GeneratedMessageCompanion}

case class ProtobufSerializer[T](codec: Codec[T]) extends TypeSerializer[T] {

Expand All @@ -33,67 +36,10 @@ case class ProtobufSerializer[T](codec: Codec[T]) extends TypeSerializer[T] {
serialize(deserialize(source), target)

override def snapshotConfiguration(): TypeSerializerSnapshot[T] = codec match {
case s: ScalaCodec[_] => new ScalaConfigSnapshot(s).asInstanceOf[TypeSerializerSnapshot[T]]
case j: JavaCodec[_] => new JavaConfigSnapshot(j).asInstanceOf[TypeSerializerSnapshot[T]]
}
}

object ProtobufSerializer {

class ScalaConfigSnapshot[T <: GeneratedMessage]() extends TypeSerializerSnapshot[T] {
var codec: ScalaCodec[T] = _
def this(c: ScalaCodec[T]) = {
this()
codec = c
}

override def getCurrentVersion: Int = 1

override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
codec = ScalaCodec(
companion = InstantiationUtil
.resolveClassByName[GeneratedMessageCompanion[T]](in, userCodeClassLoader)
.getField("MODULE$")
.get(null)
.asInstanceOf[GeneratedMessageCompanion[T]],
clazz = InstantiationUtil.resolveClassByName[T](in, userCodeClassLoader)
)
}

override def writeSnapshot(out: DataOutputView): Unit = {
out.writeUTF(codec.companion.getClass.getName)
out.writeUTF(codec.clazz.getName)
}

override def restoreSerializer(): TypeSerializer[T] = new ProtobufSerializer[T](codec)

override def resolveSchemaCompatibility(newSerializer: TypeSerializer[T]): TypeSerializerSchemaCompatibility[T] =
TypeSerializerSchemaCompatibility.compatibleAsIs()
}

class JavaConfigSnapshot[T <: GeneratedMessageV3]() extends TypeSerializerSnapshot[T] {
var codec: JavaCodec[T] = _
def this(c: JavaCodec[T]) = {
this()
codec = c
}

override def getCurrentVersion: Int = 1

override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
val clazz = InstantiationUtil.resolveClassByName[T](in, userCodeClassLoader)
val constructor = clazz.getDeclaredConstructor()
constructor.setAccessible(true)
codec = JavaCodec(constructor.newInstance(), clazz)
}

override def writeSnapshot(out: DataOutputView): Unit = {
out.writeUTF(codec.clazz.getName)
}

override def restoreSerializer(): TypeSerializer[T] = new ProtobufSerializer[T](codec)

override def resolveSchemaCompatibility(newSerializer: TypeSerializer[T]): TypeSerializerSchemaCompatibility[T] =
TypeSerializerSchemaCompatibility.compatibleAsIs()
case s: ScalaCodec[_] => new ScalaConfigSnapshot(s).asInstanceOf[TypeSerializerSnapshot[T]]
case j: JavaCodec[_] => new JavaConfigSnapshot(j).asInstanceOf[TypeSerializerSnapshot[T]]
case s: ScalaOneofCodec[_, _] => new ScalaOneofConfigSnapshot(s).asInstanceOf[TypeSerializerSnapshot[T]]
case s: ScalaOptionalOneofCodec[_, _] =>
new ScalaOptionalOneofConfigSnapshot(s).asInstanceOf[TypeSerializerSnapshot[T]]
}
}
34 changes: 34 additions & 0 deletions src/main/scala/io/findify/flinkpb/config/JavaConfigSnapshot.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.findify.flinkpb.config

import com.google.protobuf.GeneratedMessageV3
import io.findify.flinkpb.Codec.JavaCodec
import io.findify.flinkpb.ProtobufSerializer
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flink.util.InstantiationUtil

/** Flink serializer snapshot for [[JavaCodec]]: persists the protobuf message
  * class name so the codec can be rebuilt by reflection on job restore.
  * NOTE: Flink instantiates snapshots through the no-arg constructor, hence the
  * mutable `codec` field plus the auxiliary constructor used on the write path.
  */
class JavaConfigSnapshot[T <: GeneratedMessageV3]() extends TypeSerializerSnapshot[T] {
  // Set either by the auxiliary constructor (snapshot write) or readSnapshot (restore).
  var codec: JavaCodec[T] = _
  def this(c: JavaCodec[T]) = {
    this()
    codec = c
  }

  override def getCurrentVersion: Int = 1

  // Must read exactly what writeSnapshot wrote: a single UTF class name.
  override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
    val clazz = InstantiationUtil.resolveClassByName[T](in, userCodeClassLoader)
    // Protobuf-generated classes have a private no-arg constructor; make it callable
    // to obtain a default instance for the codec.
    val constructor = clazz.getDeclaredConstructor()
    constructor.setAccessible(true)
    codec = JavaCodec(constructor.newInstance(), clazz)
  }

  override def writeSnapshot(out: DataOutputView): Unit = {
    out.writeUTF(codec.clazz.getName)
  }

  override def restoreSerializer(): TypeSerializer[T] = new ProtobufSerializer[T](codec)

  // Protobuf performs field-level schema evolution itself, so serializers are
  // reported compatible as-is; restore only fails if the class name cannot resolve.
  override def resolveSchemaCompatibility(newSerializer: TypeSerializer[T]): TypeSerializerSchemaCompatibility[T] =
    TypeSerializerSchemaCompatibility.compatibleAsIs()
}
39 changes: 39 additions & 0 deletions src/main/scala/io/findify/flinkpb/config/ScalaConfigSnapshot.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.findify.flinkpb.config

import io.findify.flinkpb.Codec.ScalaCodec
import io.findify.flinkpb.ProtobufSerializer
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flink.util.InstantiationUtil
import scalapb.{GeneratedMessage, GeneratedMessageCompanion}

/** Flink serializer snapshot for [[ScalaCodec]]: persists the companion object's
  * class name and the message class name so the codec can be rebuilt by
  * reflection on job restore.
  * NOTE: Flink instantiates snapshots through the no-arg constructor, hence the
  * mutable `codec` field plus the auxiliary constructor used on the write path.
  */
class ScalaConfigSnapshot[T <: GeneratedMessage]() extends TypeSerializerSnapshot[T] {
  // Set either by the auxiliary constructor (snapshot write) or readSnapshot (restore).
  var codec: ScalaCodec[T] = _
  def this(c: ScalaCodec[T]) = {
    this()
    codec = c
  }

  override def getCurrentVersion: Int = 1

  // Must read fields in the exact order writeSnapshot wrote them:
  // companion class name first, then message class name.
  override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
    codec = ScalaCodec(
      companion = InstantiationUtil
        .resolveClassByName[GeneratedMessageCompanion[T]](in, userCodeClassLoader)
        .getField("MODULE$") // Scala objects expose their singleton via the static MODULE$ field
        .get(null)
        .asInstanceOf[GeneratedMessageCompanion[T]],
      clazz = InstantiationUtil.resolveClassByName[T](in, userCodeClassLoader)
    )
  }

  override def writeSnapshot(out: DataOutputView): Unit = {
    out.writeUTF(codec.companion.getClass.getName)
    out.writeUTF(codec.clazz.getName)
  }

  override def restoreSerializer(): TypeSerializer[T] = new ProtobufSerializer[T](codec)

  // Protobuf performs field-level schema evolution itself, so serializers are
  // reported compatible as-is; restore only fails if the class names cannot resolve.
  override def resolveSchemaCompatibility(newSerializer: TypeSerializer[T]): TypeSerializerSchemaCompatibility[T] =
    TypeSerializerSchemaCompatibility.compatibleAsIs()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.findify.flinkpb.config

import io.findify.flinkpb.Codec.{ScalaCodec, ScalaOneofCodec}
import io.findify.flinkpb.ProtobufSerializer
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flink.util.InstantiationUtil
import scalapb.{GeneratedMessage, GeneratedMessageCompanion, TypeMapper}

/** Flink serializer snapshot for [[ScalaOneofCodec]]: persists the companion,
  * ADT class, and TypeMapper class names so the codec can be rebuilt by
  * reflection on job restore.
  * NOTE: Flink instantiates snapshots through the no-arg constructor, hence the
  * mutable `codec` field plus the auxiliary constructor used on the write path.
  */
class ScalaOneofConfigSnapshot[T, M <: GeneratedMessage]() extends TypeSerializerSnapshot[T] {
  // Set either by the auxiliary constructor (snapshot write) or readSnapshot (restore).
  var codec: ScalaOneofCodec[T, M] = _
  def this(c: ScalaOneofCodec[T, M]) = {
    this()
    codec = c
  }

  override def getCurrentVersion: Int = 1

  // Named arguments evaluate left-to-right, so the three resolveClassByName calls
  // consume the stream in the same order writeSnapshot wrote it: companion, clazz, mapper.
  override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
    codec = ScalaOneofCodec[T, M](
      companion = InstantiationUtil
        .resolveClassByName[GeneratedMessageCompanion[M]](in, userCodeClassLoader)
        .getField("MODULE$") // Scala objects expose their singleton via the static MODULE$ field
        .get(null)
        .asInstanceOf[GeneratedMessageCompanion[M]],
      clazz = InstantiationUtil.resolveClassByName[T](in, userCodeClassLoader),
      // NOTE(review): assumes the TypeMapper class has an accessible no-arg
      // constructor — ScalaPB often defines mappers as implicit vals / anonymous
      // classes, which would make this newInstance call fail; confirm with the
      // generated code for sealed oneofs.
      mapper = InstantiationUtil
        .resolveClassByName[TypeMapper[M, T]](in, userCodeClassLoader)
        .getDeclaredConstructor()
        .newInstance()
    )
  }

  override def writeSnapshot(out: DataOutputView): Unit = {
    out.writeUTF(codec.companion.getClass.getName)
    out.writeUTF(codec.clazz.getName)
    out.writeUTF(codec.mapper.getClass.getName)
  }

  override def restoreSerializer(): TypeSerializer[T] = new ProtobufSerializer[T](codec)

  // Protobuf performs field-level schema evolution itself, so serializers are
  // reported compatible as-is; restore only fails if the class names cannot resolve.
  override def resolveSchemaCompatibility(newSerializer: TypeSerializer[T]): TypeSerializerSchemaCompatibility[T] =
    TypeSerializerSchemaCompatibility.compatibleAsIs()
}
Loading