diff --git a/build.sbt b/build.sbt index e7a560f..51cea4d 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ import scalapb.compiler.Version.scalapbVersion organization in ThisBuild := "beyondthelines" -version in ThisBuild := "0.0.7" +version in ThisBuild := "0.0.8" bintrayOrganization in ThisBuild := Some(organization.value) bintrayRepository in ThisBuild := "maven" bintrayPackageLabels in ThisBuild := Seq("scala", "protobuf", "grpc", "monix") @@ -15,7 +15,7 @@ lazy val runtime = (project in file("runtime")) name := "GrpcMonixRuntime", libraryDependencies ++= Seq( "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion, - "io.monix" %% "monix" % "2.3.3" + "io.monix" %% "monix" % "3.1.0" ) ) diff --git a/generator/src/main/scala/grpcmonix/generators/GrpcMonixGenerator.scala b/generator/src/main/scala/grpcmonix/generators/GrpcMonixGenerator.scala index 21ff462..79cc4e0 100644 --- a/generator/src/main/scala/grpcmonix/generators/GrpcMonixGenerator.scala +++ b/generator/src/main/scala/grpcmonix/generators/GrpcMonixGenerator.scala @@ -4,7 +4,7 @@ import com.google.protobuf.Descriptors._ import com.google.protobuf.ExtensionRegistry import com.google.protobuf.compiler.PluginProtos.{CodeGeneratorRequest, CodeGeneratorResponse} import scalapb.compiler.FunctionalPrinter.PrinterEndo -import scalapb.compiler.{DescriptorPimps, FunctionalPrinter, GeneratorParams, ProtobufGenerator, StreamType} +import scalapb.compiler.{DescriptorPimps, FunctionalPrinter, GeneratorParams, StreamType} import scalapb.options.compiler.Scalapb import scala.collection.JavaConverters._ diff --git a/runtime/src/main/scala/grpcmonix/GrpcMonix.scala b/runtime/src/main/scala/grpcmonix/GrpcMonix.scala index 353d3eb..81dc93e 100644 --- a/runtime/src/main/scala/grpcmonix/GrpcMonix.scala +++ b/runtime/src/main/scala/grpcmonix/GrpcMonix.scala @@ -2,11 +2,11 @@ package grpcmonix import com.google.common.util.concurrent.ListenableFuture import io.grpc.stub.StreamObserver -import monix.eval.{Callback, Task} +import monix.eval.Task import monix.execution.Ack.{Continue, Stop} -import monix.execution.{Ack, Scheduler} +import monix.execution.{Ack, Callback, Scheduler} import monix.reactive.Observable -import monix.reactive.observables.ObservableLike.{Operator, Transformer} +import monix.reactive.Observable.Operator import monix.reactive.observers.Subscriber import monix.reactive.subjects.PublishSubject import org.reactivestreams.{Subscriber => SubscriberR} @@ -23,7 +23,7 @@ object GrpcMonix { Grpc.guavaFuture2ScalaFuture(future) } - def grpcOperatorToMonixOperator[I,O](grpcOperator: GrpcOperator[I,O]): Operator[I,O] = { + def grpcOperatorToMonixOperator[I, O](grpcOperator: GrpcOperator[I, O]): Operator[I, O] = { outputSubsriber: Subscriber[O] => val outputObserver: StreamObserver[O] = monixSubscriberToGrpcObserver(outputSubsriber) val inputObserver: StreamObserver[I] = grpcOperator(outputObserver) @@ -33,22 +33,29 @@ object GrpcMonix { def monixSubscriberToGrpcObserver[T](subscriber: Subscriber[T]): StreamObserver[T] = new StreamObserver[T] { override def onError(t: Throwable): Unit = subscriber.onError(t) + override def onCompleted(): Unit = subscriber.onComplete() + override def onNext(value: T): Unit = subscriber.onNext(value) } def reactiveSubscriberToGrpcObserver[T](subscriber: SubscriberR[_ >: T]): StreamObserver[T] = new StreamObserver[T] { override def onError(t: Throwable): Unit = subscriber.onError(t) + override def onCompleted(): Unit = subscriber.onComplete() + override def onNext(value: T): Unit = subscriber.onNext(value) } def grpcObserverToMonixSubscriber[T](observer: StreamObserver[T], s: Scheduler): Subscriber[T] = new Subscriber[T] { override implicit def scheduler: Scheduler = s + override def onError(t: Throwable): Unit = observer.onError(t) + override def onComplete(): Unit = observer.onCompleted() + override def onNext(value: T): Future[Ack] = try { observer.onNext(value) @@ -60,10 +67,11 @@ object GrpcMonix { } } - def grpcObserverToMonixCallback[T](observer: StreamObserver[T]): Callback[T] = - new Callback[T] { + def grpcObserverToMonixCallback[A](observer: StreamObserver[A]): Callback[Throwable, A] = + new Callback[Throwable, A] { override def onError(t: Throwable): Unit = observer.onError(t) - override def onSuccess(value: T): Unit = { + + override def onSuccess(value: A): Unit = { observer.onNext(value) observer.onCompleted() } @@ -80,9 +88,20 @@ object GrpcMonix { subject.transform(transformer).subscribe(subscriber) override implicit def scheduler: Scheduler = subscriber.scheduler + override def onError(t: Throwable): Unit = subject.onError(t) + override def onComplete(): Unit = subject.onComplete() + override def onNext(value: I): Future[Ack] = subject.onNext(value) } + type Transformer[A, B] = Observable[A] => Observable[B] + + implicit class ObservableExtension[A](ob: Observable[A]) { + def transform[B](transformer: Transformer[A, B]): Observable[B] = { + transformer(ob) + } + } + }