Skip to content

Allow to pass an Executor when converting Futures #254

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 45 additions & 3 deletions src/main/scala/scala/compat/java8/FutureConverters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import scala.language.implicitConversions
import scala.concurrent.java8.FuturesConvertersImpl._
import scala.concurrent.java8.FuturesConvertersImplCompat._
import scala.concurrent.{ Future, Promise, ExecutionContext, ExecutionContextExecutorService, ExecutionContextExecutor }
import java.util.concurrent.{ CompletionStage, Executor, ExecutorService }
import java.util.concurrent.{ CompletionStage, Executor, ExecutorService, ForkJoinPool }
import java.util.function.Consumer

/**
Expand Down Expand Up @@ -59,16 +59,38 @@ object FutureConverters {
* transformations to their asynchronous counterparts, i.e.
* <code>thenRun</code> will internally call <code>thenRunAsync</code>.
*
* Callbacks will run on ForkJoinPool.commonPool(), unless it does not
* support a parallelism level of at least two, in which case a new Thread
* is used.
*
* @param f The Scala Future which may eventually supply the completion for
* the returned CompletionStage
* @return a CompletionStage that runs all callbacks asynchronously and does
* not support the CompletableFuture interface
*/
def toJava[T](f: Future[T]): CompletionStage[T] = {
def toJava[T](f: Future[T]): CompletionStage[T] = toJava(f, ForkJoinPool.commonPool())

/**
* Returns a CompletionStage that will be completed with the same value or
* exception as the given Scala Future when that completes. Since the Future is a read-only
* representation, this CompletionStage does not support the
* <code>toCompletableFuture</code> method. The semantics of Scala Future
* demand that all callbacks are invoked asynchronously by default, therefore
* the returned CompletionStage routes all calls to synchronous
* transformations to their asynchronous counterparts, i.e.
* <code>thenRun</code> will internally call <code>thenRunAsync</code>.
*
* @param f The Scala Future which may eventually supply the completion for
* the returned CompletionStage
* @param e The Java Executor onto which schedule the callbacks
* @return a CompletionStage that runs all callbacks asynchronously and does
* not support the CompletableFuture interface
*/
def toJava[T](f: Future[T], e: Executor): CompletionStage[T] = {
f match {
case p: P[T @unchecked] => p.wrapped
case _ =>
val cf = new CF[T](f)
val cf = new CF[T](f, e)
implicit val ec = InternalCallbackExecutor
f onComplete cf
cf
Expand Down Expand Up @@ -189,10 +211,30 @@ object FutureConverters {
* transformations to their asynchronous counterparts, i.e.
* <code>thenRun</code> will internally call <code>thenRunAsync</code>.
*
* Callbacks will run on ForkJoinPool.commonPool(), unless it does not
* support a parallelism level of at least two, in which case a new Thread
* is used.
*
* @return a CompletionStage that runs all callbacks asynchronously and does
* not support the CompletableFuture interface
*/
def toJava: CompletionStage[T] = FutureConverters.toJava(__self)

/**
* Returns a CompletionStage that will be completed with the same value or
* exception as the given Scala Future when that completes. Since the Future is a read-only
* representation, this CompletionStage does not support the
* <code>toCompletableFuture</code> method. The semantics of Scala Future
* demand that all callbacks are invoked asynchronously by default, therefore
* the returned CompletionStage routes all calls to synchronous
* transformations to their asynchronous counterparts, i.e.
* <code>thenRun</code> will internally call <code>thenRunAsync</code>.
*
* @param e The Java Executor onto which schedule the callbacks
* @return a CompletionStage that runs all callbacks asynchronously and does
* not support the CompletableFuture interface
*/
def toJava(e: Executor): CompletionStage[T] = FutureConverters.toJava(__self, e)
}

implicit def CompletionStageOps[T](cs: CompletionStage[T]): CompletionStageOps[T] = new CompletionStageOps(cs)
Expand Down
30 changes: 16 additions & 14 deletions src/main/scala/scala/concurrent/java8/FutureConvertersImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import scala.util.{Failure, Success, Try}

// TODO: make this private[scala] when genjavadoc allows for that.
object FuturesConvertersImpl {
class CF[T](val wrapped: Future[T]) extends CompletableFuture[T] with (Try[T] => Unit) {
class CF[T](val wrapped: Future[T], executor: Executor) extends CompletableFuture[T] with (Try[T] => Unit) {
def this(wrapped: Future[T]) = this(wrapped, ForkJoinPool.commonPool())

override def apply(t: Try[T]): Unit = t match {
case Success(v) => complete(v)
case Failure(e) => completeExceptionally(e)
Expand All @@ -32,29 +34,29 @@ object FuturesConvertersImpl {
/*
* Ensure that completions of this future cannot hold the Scala Future’s completer hostage.
*/
override def thenApply[U](fn: JF[_ >: T, _ <: U]): CompletableFuture[U] = thenApplyAsync(fn)
override def thenApply[U](fn: JF[_ >: T, _ <: U]): CompletableFuture[U] = thenApplyAsync(fn, executor)

override def thenAccept(fn: Consumer[_ >: T]): CompletableFuture[Void] = thenAcceptAsync(fn)
override def thenAccept(fn: Consumer[_ >: T]): CompletableFuture[Void] = thenAcceptAsync(fn, executor)

override def thenRun(fn: Runnable): CompletableFuture[Void] = thenRunAsync(fn)
override def thenRun(fn: Runnable): CompletableFuture[Void] = thenRunAsync(fn, executor)

override def thenCombine[U, V](cs: CompletionStage[_ <: U], fn: BiFunction[_ >: T, _ >: U, _ <: V]): CompletableFuture[V] = thenCombineAsync(cs, fn)
override def thenCombine[U, V](cs: CompletionStage[_ <: U], fn: BiFunction[_ >: T, _ >: U, _ <: V]): CompletableFuture[V] = thenCombineAsync(cs, fn, executor)

override def thenAcceptBoth[U](cs: CompletionStage[_ <: U], fn: BiConsumer[_ >: T, _ >: U]): CompletableFuture[Void] = thenAcceptBothAsync(cs, fn)
override def thenAcceptBoth[U](cs: CompletionStage[_ <: U], fn: BiConsumer[_ >: T, _ >: U]): CompletableFuture[Void] = thenAcceptBothAsync(cs, fn, executor)

override def runAfterBoth(cs: CompletionStage[_], fn: Runnable): CompletableFuture[Void] = runAfterBothAsync(cs, fn)
override def runAfterBoth(cs: CompletionStage[_], fn: Runnable): CompletableFuture[Void] = runAfterBothAsync(cs, fn, executor)

override def applyToEither[U](cs: CompletionStage[_ <: T], fn: JF[_ >: T, U]): CompletableFuture[U] = applyToEitherAsync(cs, fn)
override def applyToEither[U](cs: CompletionStage[_ <: T], fn: JF[_ >: T, U]): CompletableFuture[U] = applyToEitherAsync(cs, fn, executor)

override def acceptEither(cs: CompletionStage[_ <: T], fn: Consumer[_ >: T]): CompletableFuture[Void] = acceptEitherAsync(cs, fn)
override def acceptEither(cs: CompletionStage[_ <: T], fn: Consumer[_ >: T]): CompletableFuture[Void] = acceptEitherAsync(cs, fn, executor)

override def runAfterEither(cs: CompletionStage[_], fn: Runnable): CompletableFuture[Void] = runAfterEitherAsync(cs, fn)
override def runAfterEither(cs: CompletionStage[_], fn: Runnable): CompletableFuture[Void] = runAfterEitherAsync(cs, fn, executor)

override def thenCompose[U](fn: JF[_ >: T, _ <: CompletionStage[U]]): CompletableFuture[U] = thenComposeAsync(fn)
override def thenCompose[U](fn: JF[_ >: T, _ <: CompletionStage[U]]): CompletableFuture[U] = thenComposeAsync(fn, executor)

override def whenComplete(fn: BiConsumer[_ >: T, _ >: Throwable]): CompletableFuture[T] = whenCompleteAsync(fn)
override def whenComplete(fn: BiConsumer[_ >: T, _ >: Throwable]): CompletableFuture[T] = whenCompleteAsync(fn, executor)

override def handle[U](fn: BiFunction[_ >: T, Throwable, _ <: U]): CompletableFuture[U] = handleAsync(fn)
override def handle[U](fn: BiFunction[_ >: T, Throwable, _ <: U]): CompletableFuture[U] = handleAsync(fn, executor)

override def exceptionally(fn: JF[Throwable, _ <: T]): CompletableFuture[T] = {
val cf = new CompletableFuture[T]
Expand All @@ -71,7 +73,7 @@ object FuturesConvertersImpl {
if (n ne this) cf.complete(n.asInstanceOf[T])
}
}
})
}, executor)
cf
}

Expand Down