diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 81c35651a1..3fe6fb9ad1 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -534,7 +534,10 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, def concurrently[F2[x] >: F[x], O2]( that: Stream[F2, O2] )(implicit F: Concurrent[F2]): Stream[F2, O] = - concurrentlyAux(that).flatMap { case (startBack, fore) => startBack >> fore } + Stream.eval(F.unit.map(_ => println("DEBUG: Invoked concurrently"))) *> concurrentlyAux(that) + .flatMap { case (startBack, fore) => + startBack >> fore + } private def concurrentlyAux[F2[x] >: F[x], O2]( that: Stream[F2, O2] @@ -545,20 +548,59 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, interrupt <- F.deferred[Unit] backResult <- F.deferred[Either[Throwable, Unit]] } yield { - def watch[A](str: Stream[F2, A]) = str.interruptWhen(interrupt.get.attempt) + def watch[A](str: Stream[F2, A]) = str.interruptWhen(interrupt.get.attempt) <* Stream.eval( + F.unit.map(_ => println("DEBUG: Inside 'concurrently' watch is invoked")) + ) - val compileBack: F2[Unit] = watch(that).compile.drain.guaranteeCase { + val compileBack: F2[Unit] = F.unit.map(_ => + println("DEBUG: Inside 'concurrently' compileBack started (starting resource)") + ) *> watch(that).compile.drain.guaranteeCase { // Pass the result of backstream completion in the backResult deferred. // IF result of back-stream was failed, interrupt fore. Otherwise, let it be - case Outcome.Errored(t) => backResult.complete(Left(t)) >> interrupt.complete(()).void - case _ => backResult.complete(Right(())).void + case Outcome.Errored(t) => + F.unit.map(_ => + println("DEBUG: Inside 'concurrently' compileBack ERRORED case") + ) *> backResult.complete(Left(t)) >> interrupt.complete(()).void + case Outcome.Canceled() => + backResult.complete(Right(())).void *> F.unit.map(_ => + println("DEBUG: Inside 'concurrently' compileBack CANCELED case") + ) + case Outcome.Succeeded(fa) => + F.unit.map(_ => + println(s"DEBUG: Inside 'concurrently' compileBack SUCCESS case with value: $fa") + ) *> + backResult.complete(Right(())).void *> F.unit.map(_ => + println("DEBUG: Inside 'concurrently' compileBack SUCCESS case") + ) }.voidError // stop background process but await for it to finalise with a result // We use F.fromEither to bring errors from the back into the fore - val stopBack: F2[Unit] = interrupt.complete(()) >> backResult.get.flatMap(F.fromEither) + val stopBack: F2[Unit] = + interrupt.complete(()) >> backResult.get.flatMap(F.fromEither) *> F.unit.map(_ => + println("DEBUG: Inside 'concurrently' AFTER stopBack completed") + ) + + val tmp = Stream.resource( + Resource.make(compileBack.start)(_ => + F.unit.map(_ => + println("DEBUG: Inside 'concurrently' stopBack invoked (closing resource)") + ) *> stopBack + ) + ) - (Stream.bracket(compileBack.start)(_ => stopBack), watch(this)) + (tmp, watch(this)) + + /* + ( + Stream.bracket(compileBack.start)(_ => + F.unit.map(_ => + println("DEBUG: Inside 'concurrently' stopBack invoked (closing resource)") + ) *> stopBack + ), + watch(this) + ) + */ } Stream.eval(fstream) @@ -2251,8 +2293,14 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, val releaseAndCheckCompletion = semaphore.release *> semaphore.available.flatMap { - case `concurrency` => channel.close *> end.complete(()).void - case _ => F.unit + case `concurrency` => + F.unit.map(_ => + println("DEBUG: inside releaseAndCheckCompletion 'concurrency' case ") + ) *> + channel.close *> end.complete(()).void + case _ => + F.unit.map(_ => println("DEBUG: inside releaseAndCheckCompletion 'other' case ")) *> + F.unit } def forkOnElem(el: O): F2[Unit] = @@ -2260,10 +2308,18 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, poll(semaphore.acquire) <* Deferred[F2, Unit].flatMap { pushed => val init = initFork(pushed.complete(()).void) - poll(init).onCancel(releaseAndCheckCompletion).flatMap { send => - val action = F.catchNonFatal(f(el)).flatten.attempt.flatMap(send) *> pushed.get - F.start(stop.get.race(action) *> releaseAndCheckCompletion) - } + F.unit.map(_ => println("DEBUG: Inside forkOnElem")) *> + poll(init).onCancel(releaseAndCheckCompletion).flatMap { send => + val action = F.catchNonFatal(f(el)).flatten.attempt.flatMap(send) *> pushed.get + F.unit + .map(_ => println("DEBUG: Inside forkOnElem and BEFORE action invocation")) *> + // F.start(stop.get.race(action) *> releaseAndCheckCompletion) + F.start( + stop.get.race(action) *> F.unit.map(_ => + println("DEBUG: Inside forkOnElem when stop vs action race ends") + ) *> releaseAndCheckCompletion + ) + } } } @@ -2272,11 +2328,21 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, interruptWhen(stop.get.map(_.asRight[Throwable])) .foreach(forkOnElem) .onFinalizeCase { - case ExitCase.Succeeded => releaseAndCheckCompletion - case _ => stop.complete(()) *> releaseAndCheckCompletion + case ExitCase.Succeeded => + F.unit.map(_ => println("DEBUG: inside background SUCCESS case")) *> + releaseAndCheckCompletion + case _ => + F.unit.map(_ => println("DEBUG: inside background OTHER case")) *> + stop.complete(()) *> releaseAndCheckCompletion } - val foreground = channel.stream.evalMap(_.rethrow) + val foreground = channel.stream + .evalTap { x => + F.unit.map(_ => println("DEBUG: Inside foreground evalTap")) *> + x + } + .evalMap(_.rethrow) + foreground.onFinalize(stop.complete(()) *> end.get).concurrently(background) } @@ -3845,13 +3911,16 @@ object Stream extends StreamLowPriority { case Resource.Allocate(resource) => Stream .bracketFullWeak(resource) { case ((_, release), exit) => - release(exit) + F.unit.map(_ => println("DEBUG: inside resourceWeak Allocate case CALLING release")) *> + release(exit) } .mapNoScope(_._1) case Resource.Bind(source, f) => resourceWeak(source).flatMap(o => resourceWeak(f(o))) - case Resource.Eval(fo) => Stream.eval(fo) - case Resource.Pure(o) => Stream.emit(o) + case Resource.Eval(fo) => + Stream.eval(fo) + case Resource.Pure(o) => + Stream.emit(o) } /** Same as [[resourceWeak]], but expressed as a FunctionK. */ diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index d1fb1ae82d..246ddafdf2 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -1042,4 +1042,55 @@ class StreamSuite extends Fs2Suite { } } + test("parEvalMap works correctly") { + /* + Stream + .resource( + Resource.make(IO.println("acquire"))(_ => IO.println("release")) + ) + */ + /* + Stream + .resource( + Resource.make(IO.println("Creating Resource") *> IO.ref(true))(r => + IO.println("Closing Resource") *> r.set(false) + ) + ) + .parEvalMap(2)(ref => + ref.get.flatMap(x => IO.println(s"before sleep: ${x}")) >> IO.sleep(1.second) >> + ref.get.flatMap(x => IO.println(s"after sleep: ${x}")) + ) + .compile + .drain + */ + + Stream + .resource( + Resource.make(IO.println("Creating Resource") *> IO.ref(true))(r => + IO.println("Closing Resource") *> r.set(false) + ) + ) + .parEvalMap(2) { ref => + ref.get.flatMap(x => IO.println(s"before sleep: ${x}")) >> + IO.sleep(1 second) >> + ref.get.flatMap(x => IO.println(s"after sleep: ${x}")) + } + .compile + .drain + /* + Stream + .resource(Resource.make(IO.println("acquire"))(_ => IO.println("release"))) + .parEvalMap(2)(_ => IO.sleep(1.second) >> IO.println("use")) + .compile + .drain + + Stream.range(0, 60) + .covary[IO] + .parEvalMap(60)(_ => IO.sleep(1.second)) + .compile + .drain + */ + + } + }