diff --git a/src/main/scala/scala/compat/java8/FutureConverters.scala b/src/main/scala/scala/compat/java8/FutureConverters.scala
index 7f75ee0..f65a960 100644
--- a/src/main/scala/scala/compat/java8/FutureConverters.scala
+++ b/src/main/scala/scala/compat/java8/FutureConverters.scala
@@ -17,7 +17,7 @@ import scala.language.implicitConversions
import scala.concurrent.java8.FuturesConvertersImpl._
import scala.concurrent.java8.FuturesConvertersImplCompat._
import scala.concurrent.{ Future, Promise, ExecutionContext, ExecutionContextExecutorService, ExecutionContextExecutor }
-import java.util.concurrent.{ CompletionStage, Executor, ExecutorService }
+import java.util.concurrent.{ CompletionStage, Executor, ExecutorService, ForkJoinPool }
import java.util.function.Consumer
/**
@@ -59,16 +59,38 @@ object FutureConverters {
* transformations to their asynchronous counterparts, i.e.
* thenRun
will internally call thenRunAsync
.
*
+ * Callbacks will run on ForkJoinPool.commonPool(), unless it does not
+ * support a parallelism level of at least two, in which case a new Thread
+ * is used.
+ *
* @param f The Scala Future which may eventually supply the completion for
* the returned CompletionStage
* @return a CompletionStage that runs all callbacks asynchronously and does
* not support the CompletableFuture interface
*/
- def toJava[T](f: Future[T]): CompletionStage[T] = {
+ def toJava[T](f: Future[T]): CompletionStage[T] = toJava(f, ForkJoinPool.commonPool())
+
+ /**
+ * Returns a CompletionStage that will be completed with the same value or
+ * exception as the given Scala Future when that completes. Since the Future is a read-only
+ * representation, this CompletionStage does not support the
+ * toCompletableFuture
method. The semantics of Scala Future
+ * demand that all callbacks are invoked asynchronously by default, therefore
+ * the returned CompletionStage routes all calls to synchronous
+ * transformations to their asynchronous counterparts, i.e.
+ * thenRun
will internally call thenRunAsync
.
+ *
+ * @param f The Scala Future which may eventually supply the completion for
+ * the returned CompletionStage
+ * @param e The Java Executor onto which schedule the callbacks
+ * @return a CompletionStage that runs all callbacks asynchronously and does
+ * not support the CompletableFuture interface
+ */
+ def toJava[T](f: Future[T], e: Executor): CompletionStage[T] = {
f match {
case p: P[T @unchecked] => p.wrapped
case _ =>
- val cf = new CF[T](f)
+ val cf = new CF[T](f, e)
implicit val ec = InternalCallbackExecutor
f onComplete cf
cf
@@ -189,10 +211,30 @@ object FutureConverters {
* transformations to their asynchronous counterparts, i.e.
* thenRun
will internally call thenRunAsync
.
*
+ * Callbacks will run on ForkJoinPool.commonPool(), unless it does not
+ * support a parallelism level of at least two, in which case a new Thread
+ * is used.
+ *
* @return a CompletionStage that runs all callbacks asynchronously and does
* not support the CompletableFuture interface
*/
def toJava: CompletionStage[T] = FutureConverters.toJava(__self)
+
+ /**
+ * Returns a CompletionStage that will be completed with the same value or
+ * exception as the given Scala Future when that completes. Since the Future is a read-only
+ * representation, this CompletionStage does not support the
+ * toCompletableFuture
method. The semantics of Scala Future
+ * demand that all callbacks are invoked asynchronously by default, therefore
+ * the returned CompletionStage routes all calls to synchronous
+ * transformations to their asynchronous counterparts, i.e.
+ * thenRun
will internally call thenRunAsync
.
+ *
+ * @param e The Java Executor onto which schedule the callbacks
+ * @return a CompletionStage that runs all callbacks asynchronously and does
+ * not support the CompletableFuture interface
+ */
+ def toJava(e: Executor): CompletionStage[T] = FutureConverters.toJava(__self, e)
}
implicit def CompletionStageOps[T](cs: CompletionStage[T]): CompletionStageOps[T] = new CompletionStageOps(cs)
diff --git a/src/main/scala/scala/concurrent/java8/FutureConvertersImpl.scala b/src/main/scala/scala/concurrent/java8/FutureConvertersImpl.scala
index 3099d6e..9e0c3bf 100644
--- a/src/main/scala/scala/concurrent/java8/FutureConvertersImpl.scala
+++ b/src/main/scala/scala/concurrent/java8/FutureConvertersImpl.scala
@@ -23,7 +23,9 @@ import scala.util.{Failure, Success, Try}
// TODO: make this private[scala] when genjavadoc allows for that.
object FuturesConvertersImpl {
- class CF[T](val wrapped: Future[T]) extends CompletableFuture[T] with (Try[T] => Unit) {
+ class CF[T](val wrapped: Future[T], executor: Executor) extends CompletableFuture[T] with (Try[T] => Unit) {
+ def this(wrapped: Future[T]) = this(wrapped, ForkJoinPool.commonPool())
+
override def apply(t: Try[T]): Unit = t match {
case Success(v) => complete(v)
case Failure(e) => completeExceptionally(e)
@@ -32,29 +34,29 @@ object FuturesConvertersImpl {
/*
* Ensure that completions of this future cannot hold the Scala Future’s completer hostage.
*/
- override def thenApply[U](fn: JF[_ >: T, _ <: U]): CompletableFuture[U] = thenApplyAsync(fn)
+ override def thenApply[U](fn: JF[_ >: T, _ <: U]): CompletableFuture[U] = thenApplyAsync(fn, executor)
- override def thenAccept(fn: Consumer[_ >: T]): CompletableFuture[Void] = thenAcceptAsync(fn)
+ override def thenAccept(fn: Consumer[_ >: T]): CompletableFuture[Void] = thenAcceptAsync(fn, executor)
- override def thenRun(fn: Runnable): CompletableFuture[Void] = thenRunAsync(fn)
+ override def thenRun(fn: Runnable): CompletableFuture[Void] = thenRunAsync(fn, executor)
- override def thenCombine[U, V](cs: CompletionStage[_ <: U], fn: BiFunction[_ >: T, _ >: U, _ <: V]): CompletableFuture[V] = thenCombineAsync(cs, fn)
+ override def thenCombine[U, V](cs: CompletionStage[_ <: U], fn: BiFunction[_ >: T, _ >: U, _ <: V]): CompletableFuture[V] = thenCombineAsync(cs, fn, executor)
- override def thenAcceptBoth[U](cs: CompletionStage[_ <: U], fn: BiConsumer[_ >: T, _ >: U]): CompletableFuture[Void] = thenAcceptBothAsync(cs, fn)
+ override def thenAcceptBoth[U](cs: CompletionStage[_ <: U], fn: BiConsumer[_ >: T, _ >: U]): CompletableFuture[Void] = thenAcceptBothAsync(cs, fn, executor)
- override def runAfterBoth(cs: CompletionStage[_], fn: Runnable): CompletableFuture[Void] = runAfterBothAsync(cs, fn)
+ override def runAfterBoth(cs: CompletionStage[_], fn: Runnable): CompletableFuture[Void] = runAfterBothAsync(cs, fn, executor)
- override def applyToEither[U](cs: CompletionStage[_ <: T], fn: JF[_ >: T, U]): CompletableFuture[U] = applyToEitherAsync(cs, fn)
+ override def applyToEither[U](cs: CompletionStage[_ <: T], fn: JF[_ >: T, U]): CompletableFuture[U] = applyToEitherAsync(cs, fn, executor)
- override def acceptEither(cs: CompletionStage[_ <: T], fn: Consumer[_ >: T]): CompletableFuture[Void] = acceptEitherAsync(cs, fn)
+ override def acceptEither(cs: CompletionStage[_ <: T], fn: Consumer[_ >: T]): CompletableFuture[Void] = acceptEitherAsync(cs, fn, executor)
- override def runAfterEither(cs: CompletionStage[_], fn: Runnable): CompletableFuture[Void] = runAfterEitherAsync(cs, fn)
+ override def runAfterEither(cs: CompletionStage[_], fn: Runnable): CompletableFuture[Void] = runAfterEitherAsync(cs, fn, executor)
- override def thenCompose[U](fn: JF[_ >: T, _ <: CompletionStage[U]]): CompletableFuture[U] = thenComposeAsync(fn)
+ override def thenCompose[U](fn: JF[_ >: T, _ <: CompletionStage[U]]): CompletableFuture[U] = thenComposeAsync(fn, executor)
- override def whenComplete(fn: BiConsumer[_ >: T, _ >: Throwable]): CompletableFuture[T] = whenCompleteAsync(fn)
+ override def whenComplete(fn: BiConsumer[_ >: T, _ >: Throwable]): CompletableFuture[T] = whenCompleteAsync(fn, executor)
- override def handle[U](fn: BiFunction[_ >: T, Throwable, _ <: U]): CompletableFuture[U] = handleAsync(fn)
+ override def handle[U](fn: BiFunction[_ >: T, Throwable, _ <: U]): CompletableFuture[U] = handleAsync(fn, executor)
override def exceptionally(fn: JF[Throwable, _ <: T]): CompletableFuture[T] = {
val cf = new CompletableFuture[T]
@@ -71,7 +73,7 @@ object FuturesConvertersImpl {
if (n ne this) cf.complete(n.asInstanceOf[T])
}
}
- })
+ }, executor)
cf
}