From 755dcd5be132deda185ea26a16b2436525595e8f Mon Sep 17 00:00:00 2001 From: Sergiusz Kierat Date: Tue, 3 Dec 2024 15:46:46 +0100 Subject: [PATCH 01/10] Chunked transmission lasts longer than timeout *Why I did it?* In order to have a test which might confirm an issue with an interrupted request *How I did it:* I prepared `NettyCatsRequestTimeoutTest` with the folloing test scenario: - send first chunk (100 bytes) - sleep - send second chunk (100 bytes) --- .../cats/NettyCatsRequestTimeoutTest.scala | 95 +++++++++++++++++++ .../netty/cats/NettyCatsServerTest.scala | 10 +- 2 files changed, 101 insertions(+), 4 deletions(-) create mode 100644 server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsRequestTimeoutTest.scala diff --git a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsRequestTimeoutTest.scala b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsRequestTimeoutTest.scala new file mode 100644 index 0000000000..0954ae11f6 --- /dev/null +++ b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsRequestTimeoutTest.scala @@ -0,0 +1,95 @@ +package sttp.tapir.server.netty.cats + +import cats.effect.{IO, Resource} +import cats.effect.std.Dispatcher +import cats.effect.unsafe.implicits.global +import io.netty.channel.EventLoopGroup +import org.scalatest.matchers.should.Matchers._ +import sttp.capabilities.WebSockets +import sttp.capabilities.fs2.Fs2Streams +import sttp.client3._ +import sttp.model.HeaderNames +import sttp.tapir._ +import sttp.tapir.server.netty.NettyConfig +import sttp.tapir.tests.Test + +import scala.concurrent.duration.DurationInt + +class NettyCatsRequestTimeoutTest( + dispatcher: Dispatcher[IO], + eventLoopGroup: EventLoopGroup, + backend: SttpBackend[IO, Fs2Streams[IO] with WebSockets] +) { + def tests(): List[Test] = List( + Test("chunked transmission lasts longer than given timeout") { + val givenRequestTimeout = 2.seconds + val howManyChunks: Int = 2 + val chunkSize = 100 + val millisBeforeSendingSecondChunk = 1000L + + val e = + endpoint.post + .in(header[Long](HeaderNames.ContentLength)) + .in(streamTextBody(Fs2Streams[IO])(CodecFormat.TextPlain())) + .out(header[Long](HeaderNames.ContentLength)) + .out(streamTextBody(Fs2Streams[IO])(CodecFormat.TextPlain())) + .serverLogicSuccess[IO] { case (length, stream) => + IO((length, stream)) + } + + val config = + NettyConfig.default + .eventLoopGroup(eventLoopGroup) + .randomPort + .withDontShutdownEventLoopGroupOnClose + .noGracefulShutdown + .requestTimeout(givenRequestTimeout) + + val bind = NettyCatsServer(dispatcher, config).addEndpoint(e).start() + + def iterator(howManyChunks: Int, chunkSize: Int): Iterator[Byte] = new Iterator[Iterator[Byte]] { + private var chunksToGo: Int = howManyChunks + + def hasNext: Boolean = { + if (chunksToGo == 1) + Thread.sleep(millisBeforeSendingSecondChunk) + chunksToGo > 0 + } + + def next(): Iterator[Byte] = { + chunksToGo -= 1 + List.fill('A')(chunkSize).map(_.toByte).iterator + } + }.flatten + + val inputStream = fs2.Stream.fromIterator[IO](iterator(howManyChunks, chunkSize), chunkSize = chunkSize) + + Resource + .make(bind)(_.stop()) + .map(_.port) + .use { port => + basicRequest + .post(uri"http://localhost:$port") + .contentLength(howManyChunks * chunkSize) + .streamBody(Fs2Streams[IO])(inputStream) + .send(backend) + .map { _ => + fail("I've got a bad feeling about this.") + } + } + .attempt + .map { + case Left(ex: sttp.client3.SttpClientException.TimeoutException) => + ex.getCause.getMessage shouldBe "request timed out" + case Left(ex: sttp.client3.SttpClientException.ReadException) if ex.getCause.isInstanceOf[java.io.IOException] => + println(s"Unexpected IOException: $ex") + fail(s"Unexpected IOException: $ex") + case Left(ex) => + fail(s"Unexpected exception: $ex") + case Right(_) => + fail("Expected an exception but got success") + } + .unsafeToFuture() + } + ) +} diff --git a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala index 73053ed362..7f38b698d9 100644 --- a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala +++ b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala @@ -24,9 +24,7 @@ class NettyCatsServerTest extends TestSuite with EitherValues { val interpreter = new NettyCatsTestServerInterpreter(eventLoopGroup, dispatcher) val createServerTest = new DefaultCreateServerTest(backend, interpreter) - val ioSleeper: Sleeper[IO] = new Sleeper[IO] { - override def sleep(duration: FiniteDuration): IO[Unit] = IO.sleep(duration) - } + val ioSleeper: Sleeper[IO] = (duration: FiniteDuration) => IO.sleep(duration) def drainFs2(stream: Fs2Streams[IO]#BinaryStream): IO[Unit] = stream.compile.drain.void @@ -50,7 +48,9 @@ class NettyCatsServerTest extends TestSuite with EitherValues { ) { override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f) override def emptyPipe[A, B]: fs2.Pipe[IO, A, B] = _ => fs2.Stream.empty - }.tests() + } + .tests() ++ + new NettyCatsRequestTimeoutTest(dispatcher, eventLoopGroup, backend).tests() IO.pure((tests, eventLoopGroup)) } { case (_, eventLoopGroup) => @@ -58,4 +58,6 @@ class NettyCatsServerTest extends TestSuite with EitherValues { } .map { case (tests, _) => tests } } + + override def testNameFilter: Option[String] = Some("chunked transmission lasts longer than given timeout") } From 699e4aab85d159d63ceddb865421876fd0391e74 Mon Sep 17 00:00:00 2001 From: Sergiusz Kierat Date: Fri, 20 Dec 2024 08:17:12 +0100 Subject: [PATCH 02/10] Flatten nested Iterator correctly in Scala 2.12 --- .../tapir/server/netty/cats/NettyCatsRequestTimeoutTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsRequestTimeoutTest.scala b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsRequestTimeoutTest.scala index 0954ae11f6..bdd38d68cd 100644 --- a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsRequestTimeoutTest.scala +++ b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsRequestTimeoutTest.scala @@ -60,7 +60,7 @@ class NettyCatsRequestTimeoutTest( chunksToGo -= 1 List.fill('A')(chunkSize).map(_.toByte).iterator } - }.flatten + }.flatMap(identity) val inputStream = fs2.Stream.fromIterator[IO](iterator(howManyChunks, chunkSize), chunkSize = chunkSize) From e176a1a2d9ad60fd14a0abac0b3287f303d04be0 Mon Sep 17 00:00:00 2001 From: Sergiusz Kierat Date: Mon, 23 Dec 2024 14:21:59 +0100 Subject: [PATCH 03/10] fixes after adamw's feedback - add PlayServerTest instead of NettyCatsServerTest - improve fs2 implementation --- .../cats/NettyCatsRequestTimeoutTest.scala | 95 ------------------- .../netty/cats/NettyCatsServerTest.scala | 3 +- .../tapir/server/play/PlayServerTest.scala | 79 +++++++++++++-- 3 files changed, 74 insertions(+), 103 deletions(-) delete mode 100644 server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsRequestTimeoutTest.scala diff --git a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsRequestTimeoutTest.scala b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsRequestTimeoutTest.scala deleted file mode 100644 index bdd38d68cd..0000000000 --- a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsRequestTimeoutTest.scala +++ /dev/null @@ -1,95 +0,0 @@ -package sttp.tapir.server.netty.cats - -import cats.effect.{IO, Resource} -import cats.effect.std.Dispatcher -import cats.effect.unsafe.implicits.global -import io.netty.channel.EventLoopGroup -import org.scalatest.matchers.should.Matchers._ -import sttp.capabilities.WebSockets -import sttp.capabilities.fs2.Fs2Streams -import sttp.client3._ -import sttp.model.HeaderNames -import sttp.tapir._ -import sttp.tapir.server.netty.NettyConfig -import sttp.tapir.tests.Test - -import scala.concurrent.duration.DurationInt - -class NettyCatsRequestTimeoutTest( - dispatcher: Dispatcher[IO], - eventLoopGroup: EventLoopGroup, - backend: SttpBackend[IO, Fs2Streams[IO] with WebSockets] -) { - def tests(): List[Test] = List( - Test("chunked transmission lasts longer than given timeout") { - val givenRequestTimeout = 2.seconds - val howManyChunks: Int = 2 - val chunkSize = 100 - val millisBeforeSendingSecondChunk = 1000L - - val e = - endpoint.post - .in(header[Long](HeaderNames.ContentLength)) - .in(streamTextBody(Fs2Streams[IO])(CodecFormat.TextPlain())) - .out(header[Long](HeaderNames.ContentLength)) - .out(streamTextBody(Fs2Streams[IO])(CodecFormat.TextPlain())) - .serverLogicSuccess[IO] { case (length, stream) => - IO((length, stream)) - } - - val config = - NettyConfig.default - .eventLoopGroup(eventLoopGroup) - .randomPort - .withDontShutdownEventLoopGroupOnClose - .noGracefulShutdown - .requestTimeout(givenRequestTimeout) - - val bind = NettyCatsServer(dispatcher, config).addEndpoint(e).start() - - def iterator(howManyChunks: Int, chunkSize: Int): Iterator[Byte] = new Iterator[Iterator[Byte]] { - private var chunksToGo: Int = howManyChunks - - def hasNext: Boolean = { - if (chunksToGo == 1) - Thread.sleep(millisBeforeSendingSecondChunk) - chunksToGo > 0 - } - - def next(): Iterator[Byte] = { - chunksToGo -= 1 - List.fill('A')(chunkSize).map(_.toByte).iterator - } - }.flatMap(identity) - - val inputStream = fs2.Stream.fromIterator[IO](iterator(howManyChunks, chunkSize), chunkSize = chunkSize) - - Resource - .make(bind)(_.stop()) - .map(_.port) - .use { port => - basicRequest - .post(uri"http://localhost:$port") - .contentLength(howManyChunks * chunkSize) - .streamBody(Fs2Streams[IO])(inputStream) - .send(backend) - .map { _ => - fail("I've got a bad feeling about this.") - } - } - .attempt - .map { - case Left(ex: sttp.client3.SttpClientException.TimeoutException) => - ex.getCause.getMessage shouldBe "request timed out" - case Left(ex: sttp.client3.SttpClientException.ReadException) if ex.getCause.isInstanceOf[java.io.IOException] => - println(s"Unexpected IOException: $ex") - fail(s"Unexpected IOException: $ex") - case Left(ex) => - fail(s"Unexpected exception: $ex") - case Right(_) => - fail("Expected an exception but got success") - } - .unsafeToFuture() - } - ) -} diff --git a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala index 7f38b698d9..e8e36e90a5 100644 --- a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala +++ b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala @@ -49,8 +49,7 @@ class NettyCatsServerTest extends TestSuite with EitherValues { override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f) override def emptyPipe[A, B]: fs2.Pipe[IO, A, B] = _ => fs2.Stream.empty } - .tests() ++ - new NettyCatsRequestTimeoutTest(dispatcher, eventLoopGroup, backend).tests() + .tests() IO.pure((tests, eventLoopGroup)) } { case (_, eventLoopGroup) => diff --git a/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala b/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala index 7ef2dc432d..b62c6da644 100644 --- a/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala +++ b/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala @@ -1,21 +1,32 @@ package sttp.tapir.server.play +import scala.concurrent.duration.{DurationInt, FiniteDuration} import org.apache.pekko.actor.ActorSystem import org.apache.pekko.stream.scaladsl.{Flow, Sink, Source} import cats.data.NonEmptyList import cats.effect.{IO, Resource} import cats.effect.unsafe.implicits.global -import org.scalatest.matchers.should.Matchers._ +import com.typesafe.config.ConfigFactory +import org.apache.pekko.util.ByteString +import org.scalatest.matchers.should.Matchers.{fail, _} +import play.api.{Configuration, Mode} import play.api.http.ParserConfiguration +import play.api.routing.Router +import play.core.server.{DefaultPekkoHttpServerComponents, ServerConfig} +import sttp.capabilities.Streams +import sttp.capabilities.fs2.Fs2Streams import sttp.capabilities.pekko.PekkoStreams import sttp.client3._ -import sttp.model.{MediaType, Part, StatusCode} +import sttp.model.{HeaderNames, MediaType, Part, StatusCode} import sttp.monad.FutureMonad import sttp.tapir._ import sttp.tapir.server.tests._ import sttp.tapir.tests.{Test, TestSuite} +import fs2.{Chunk, Stream} +import sttp.capabilities.fs2.Fs2Streams import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration class PlayServerTest extends TestSuite { @@ -23,10 +34,10 @@ class PlayServerTest extends TestSuite { Resource.make(IO.delay(ActorSystem()))(actorSystem => IO.fromFuture(IO.delay(actorSystem.terminate())).void) override def tests: Resource[IO, List[Test]] = backendResource.flatMap { backend => - actorSystemResource.map { implicit actorSystem => - implicit val m: FutureMonad = new FutureMonad()(actorSystem.dispatcher) + actorSystemResource.map { implicit _actorSystem => + implicit val m: FutureMonad = new FutureMonad()(_actorSystem.dispatcher) - val interpreter = new PlayTestServerInterpreter()(actorSystem) + val interpreter = new PlayTestServerInterpreter()(_actorSystem) val createServerTest = new DefaultCreateServerTest(backend, interpreter) def additionalTests(): List[Test] = List( @@ -98,7 +109,61 @@ class PlayServerTest extends TestSuite { } } .unsafeToFuture() - } + }, + Test("chunked transmission lasts longer than given timeout") { + val chunkSize = 100 + val beforeSendingSecondChunk: FiniteDuration = 2.second + val requestTimeout: FiniteDuration = 1.second + + val e = + endpoint.post + .in(header[Long](HeaderNames.ContentLength)) + .in(streamTextBody(PekkoStreams)(CodecFormat.TextPlain())) + .out(header[Long](HeaderNames.ContentLength)) + .out(streamTextBody(PekkoStreams)(CodecFormat.TextPlain())) + .serverLogicSuccess[Future] { case (length, stream) => + Future.successful(length, stream) + } + + val components: DefaultPekkoHttpServerComponents = new DefaultPekkoHttpServerComponents { + val initialServerConfig: ServerConfig = ServerConfig(port = Some(0), address = "127.0.0.1", mode = Mode.Test) + + val customConf: Configuration = + Configuration( + ConfigFactory.parseString(s"play { server.pekko.requestTimeout=${requestTimeout.toString} }") + ) + override lazy val serverConfig: ServerConfig = + initialServerConfig.copy(configuration = customConf.withFallback(initialServerConfig.configuration)) + override lazy val actorSystem: ActorSystem = ActorSystem("tapir", defaultExecutionContext = Some(_actorSystem.dispatcher)) + override lazy val router: Router = Router.from(PlayServerInterpreter().toRoutes(e)).withPrefix("/chunks") + } + + def createStream(chunkSize: Int, beforeSendingSecondChunk: FiniteDuration): Stream[IO, Byte] = { + val chunk = Chunk.array(Array.fill(chunkSize)('A'.toByte)) + val initialChunks = Stream.chunk(chunk) + val delayedChunk = Stream.sleep[IO](beforeSendingSecondChunk) >> Stream.chunk(chunk) + initialChunks ++ delayedChunk + } + + val inputStream = createStream(chunkSize, beforeSendingSecondChunk) + + val bind = IO.blocking(components.server) + Resource.make(bind)(s => IO.blocking(s.stop())) + .map(_.mainAddress.getPort) + .use { port => + basicRequest + .post(uri"http://localhost:$port/chunks") + .contentLength(2 * chunkSize) + .streamBody(Fs2Streams[IO])(inputStream) + .send(backend) + .map{ response => + response.code shouldBe StatusCode.Ok + response.contentLength shouldBe Some(2 * chunkSize) + response.body shouldBe Right("A" * 2 * chunkSize) + } + } + .unsafeToFuture() + }, ) def drainPekko(stream: PekkoStreams.BinaryStream): Future[Unit] = @@ -135,4 +200,6 @@ class PlayServerTest extends TestSuite { additionalTests() } } + + override def testNameFilter: Option[String] = Some("chunked transmission lasts longer than given timeout") } From fd853257bb29d65ee00924eeabde9848fbc37095 Mon Sep 17 00:00:00 2001 From: Sergiusz Kierat Date: Mon, 23 Dec 2024 16:40:53 +0100 Subject: [PATCH 04/10] Clean up --- .../tapir/server/netty/cats/NettyCatsServerTest.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala index e8e36e90a5..73053ed362 100644 --- a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala +++ b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala @@ -24,7 +24,9 @@ class NettyCatsServerTest extends TestSuite with EitherValues { val interpreter = new NettyCatsTestServerInterpreter(eventLoopGroup, dispatcher) val createServerTest = new DefaultCreateServerTest(backend, interpreter) - val ioSleeper: Sleeper[IO] = (duration: FiniteDuration) => IO.sleep(duration) + val ioSleeper: Sleeper[IO] = new Sleeper[IO] { + override def sleep(duration: FiniteDuration): IO[Unit] = IO.sleep(duration) + } def drainFs2(stream: Fs2Streams[IO]#BinaryStream): IO[Unit] = stream.compile.drain.void @@ -48,8 +50,7 @@ class NettyCatsServerTest extends TestSuite with EitherValues { ) { override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f) override def emptyPipe[A, B]: fs2.Pipe[IO, A, B] = _ => fs2.Stream.empty - } - .tests() + }.tests() IO.pure((tests, eventLoopGroup)) } { case (_, eventLoopGroup) => @@ -57,6 +58,4 @@ class NettyCatsServerTest extends TestSuite with EitherValues { } .map { case (tests, _) => tests } } - - override def testNameFilter: Option[String] = Some("chunked transmission lasts longer than given timeout") } From 4ffd614d438405dc82050509f2a1b4c4c8e404e7 Mon Sep 17 00:00:00 2001 From: Sergiusz Kierat Date: Fri, 27 Dec 2024 13:45:02 +0100 Subject: [PATCH 05/10] add playServer and longLastingClient --- .../streaming/longLastingClient.scala | 47 +++++++++ .../tapir/examples/streaming/playServer.scala | 98 +++++++++++++++++++ 2 files changed, 145 insertions(+) create mode 100644 examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala create mode 100644 examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala diff --git a/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala b/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala new file mode 100644 index 0000000000..8ae1764872 --- /dev/null +++ b/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala @@ -0,0 +1,47 @@ +//> using dep com.softwaremill.sttp.tapir::tapir-core:1.11.11 +//> using dep org.apache.pekko::pekko-stream:1.1.2 +//> using dep org.typelevel::cats-effect:3.5.7 +//> using dep com.softwaremill.sttp.client3::core:3.10.1 +//> using dep com.softwaremill.sttp.client3::pekko-http-backend:3.10.1 + +package sttp.tapir.examples.streaming + +import cats.effect.{ExitCode, IO, IOApp, Resource} +import sttp.capabilities.WebSockets +import sttp.client3.pekkohttp.PekkoHttpBackend +import sttp.client3.{Response, SttpBackend, UriContext, basicRequest} + +import scala.concurrent.Future +import sttp.model.{Header, HeaderNames, Method, QueryParams} +import sttp.tapir.* +import org.apache.pekko +import org.apache.pekko.actor.ActorSystem +import sttp.capabilities.pekko.PekkoStreams +import pekko.stream.scaladsl.{Flow, Source} +import pekko.util.ByteString +import cats.effect.* +import cats.syntax.all.* + +import scala.concurrent.duration.* +import scala.concurrent.duration.FiniteDuration + +object longLastingClient extends IOApp: + implicit val actorSystem: ActorSystem = ActorSystem("longLastingClient") + + private def makeRequest(backend: SttpBackend[Future, PekkoStreams & WebSockets]): Future[Response[Either[String, String]]] = + val stream: Source[ByteString, Any] = Source.tick(1.seconds, 1.seconds, ByteString(Array.fill(10)('A').map(_.toByte))).map { elem => + println(s"$elem ${java.time.LocalTime.now()}"); elem + } + + basicRequest + .post(uri"http://localhost:9000/chunks") + .header(Header(HeaderNames.ContentLength, "10000")) + .streamBody(PekkoStreams)(stream) + .send(backend) + + override def run(args: List[String]): IO[ExitCode] = + val backend = PekkoHttpBackend.usingActorSystem(actorSystem) + val responseIO: IO[Response[Either[String, String]]] = IO.fromFuture(IO(makeRequest(backend))) + responseIO.flatMap { response => + IO(println(response.body)) + }.as(ExitCode.Success) \ No newline at end of file diff --git a/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala b/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala new file mode 100644 index 0000000000..c3df7ce830 --- /dev/null +++ b/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala @@ -0,0 +1,98 @@ +//> using dep com.softwaremill.sttp.tapir::tapir-core:1.11.11 +//> using dep com.softwaremill.sttp.tapir::tapir-play-server:1.11.11 +//> using dep org.playframework::play-netty-server:3.0.6 +//> using dep com.softwaremill.sttp.client3::core:3.10.1 + +package sttp.tapir.examples.streaming + +import play.core.server.* +import play.api.routing.Router.Routes +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.Materializer +import sttp.capabilities.pekko.PekkoStreams +import sttp.tapir.server.ServerEndpoint +import sttp.tapir.* +import sttp.tapir.server.play.PlayServerInterpreter + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future +import sttp.model.{HeaderNames, MediaType, Part, StatusCode} +import sttp.tapir.* +import scala.concurrent.{ExecutionContext, Future} +import scala.util.* +import org.apache.pekko +import pekko.stream.scaladsl.{Flow, Source} +import pekko.util.ByteString + +given ExecutionContext = ExecutionContext.global + +type ErrorInfo = String + +implicit val actorSystem: ActorSystem = ActorSystem("playServer") + +def handleErrors[T](f: Future[T]): Future[Either[ErrorInfo, T]] = + f.transform { + case Success(v) => Success(Right(v)) + case Failure(e) => + println(s"Exception when running endpoint logic: $e") + Success(Left(e.getMessage)) + } + +def logic(s: (Long, Source[ByteString, Any])): Future[(Long, Source[ByteString, Any])] = { + val (length, stream) = s + println(s"Received $length bytes, ${stream.map(_.length)} bytes in total") + Future.successful((length, stream)) +} + +val e = endpoint.post + .in("chunks") + .in(header[Long](HeaderNames.ContentLength)) + .in(streamTextBody(PekkoStreams)(CodecFormat.TextPlain())) + .out(header[Long](HeaderNames.ContentLength)) + .out(streamTextBody(PekkoStreams)(CodecFormat.TextPlain())) + .errorOut(plainBody[ErrorInfo]) + .serverLogic((logic _).andThen(handleErrors)) + +val routes = PlayServerInterpreter().toRoutes(e) + +@main def playServer(): Unit = + import play.api.Configuration + import play.api.Mode + import play.core.server.ServerConfig + + import java.io.File + import java.util.Properties + + val customConfig = Configuration( + "play.server.http.idleTimeout" -> "75 seconds", + "play.server.https.idleTimeout" -> "75 seconds", + "play.server.https.wantClientAuth" -> false, + "play.server.https.needClientAuth" -> false, + "play.server.netty.server-header" -> null, + "play.server.netty.shutdownQuietPeriod" -> "2 seconds", + "play.server.netty.maxInitialLineLength" -> "4096", + "play.server.netty.maxChunkSize" -> "8192", + "play.server.netty.eventLoopThreads" -> "0", + "play.server.netty.transport" -> "jdk", + "play.server.max-header-size" -> "8k", + "play.server.waitBeforeTermination" -> "0", + "play.server.deferBodyParsing" -> false, + "play.server.websocket.frame.maxLength" -> "64k", + "play.server.websocket.periodic-keep-alive-mode" -> "ping", + "play.server.websocket.periodic-keep-alive-max-idle" -> "infinite", + "play.server.max-content-length" -> "infinite", + "play.server.netty.log.wire" -> true, + "play.server.netty.option.child.tcpNoDelay" -> true, + "play.server.pekko.requestTimeout" -> "5 seconds", + ) + val serverConfig = ServerConfig( + rootDir = new File("."), + port = Some(9000), + sslPort = Some(9443), + address = "0.0.0.0", + mode = Mode.Dev, + properties = System.getProperties, + configuration = customConfig + ) + + NettyServer.fromRouterWithComponents(serverConfig) { components => routes } \ No newline at end of file From ebf5495548819c3e1e5d5a5f9d644def5c314dff Mon Sep 17 00:00:00 2001 From: Sergiusz Kierat Date: Tue, 31 Dec 2024 13:47:10 +0100 Subject: [PATCH 06/10] wip --- .../streaming/longLastingClient.scala | 34 +++--- .../tapir/examples/streaming/playServer.scala | 111 +++++++++--------- 2 files changed, 78 insertions(+), 67 deletions(-) diff --git a/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala b/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala index 8ae1764872..89f99739e8 100644 --- a/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala +++ b/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala @@ -1,8 +1,10 @@ //> using dep com.softwaremill.sttp.tapir::tapir-core:1.11.11 +//> using dep com.softwaremill.sttp.tapir::tapir-netty-server-cats:1.11.11 //> using dep org.apache.pekko::pekko-stream:1.1.2 //> using dep org.typelevel::cats-effect:3.5.7 -//> using dep com.softwaremill.sttp.client3::core:3.10.1 +//> using dep com.softwaremill.sttp.client3::core:3.10.2 //> using dep com.softwaremill.sttp.client3::pekko-http-backend:3.10.1 +//> using dep com.softwaremill.sttp.client3::fs2:3.10.2 package sttp.tapir.examples.streaming @@ -21,27 +23,31 @@ import pekko.stream.scaladsl.{Flow, Source} import pekko.util.ByteString import cats.effect.* import cats.syntax.all.* - +import sttp.client3.httpclient.fs2.HttpClientFs2Backend import scala.concurrent.duration.* import scala.concurrent.duration.FiniteDuration +import sttp.capabilities.fs2.Fs2Streams +import fs2.{Chunk, Stream} object longLastingClient extends IOApp: - implicit val actorSystem: ActorSystem = ActorSystem("longLastingClient") - - private def makeRequest(backend: SttpBackend[Future, PekkoStreams & WebSockets]): Future[Response[Either[String, String]]] = - val stream: Source[ByteString, Any] = Source.tick(1.seconds, 1.seconds, ByteString(Array.fill(10)('A').map(_.toByte))).map { elem => - println(s"$elem ${java.time.LocalTime.now()}"); elem + private def makeRequest(backend: SttpBackend[IO, Fs2Streams[IO] & WebSockets]): IO[Response[Either[String, String]]] = + def createStream(chunkSize: Int, beforeSendingSecondChunk: FiniteDuration): Stream[IO, Byte] = { + val chunk = Chunk.array(Array.fill(chunkSize)('A'.toByte)) + val initialChunks = Stream.chunk(chunk) + val delayedChunk = Stream.sleep[IO](beforeSendingSecondChunk) >> Stream.chunk(chunk) + initialChunks ++ delayedChunk } - + val stream = createStream(100, 2.seconds) + basicRequest .post(uri"http://localhost:9000/chunks") .header(Header(HeaderNames.ContentLength, "10000")) - .streamBody(PekkoStreams)(stream) + .streamBody(Fs2Streams[IO])(stream) .send(backend) - + override def run(args: List[String]): IO[ExitCode] = - val backend = PekkoHttpBackend.usingActorSystem(actorSystem) - val responseIO: IO[Response[Either[String, String]]] = IO.fromFuture(IO(makeRequest(backend))) - responseIO.flatMap { response => + HttpClientFs2Backend.resource[IO]().use { backend => + makeRequest(backend).flatMap { response => IO(println(response.body)) - }.as(ExitCode.Success) \ No newline at end of file + } + }.as(ExitCode.Success) \ No newline at end of file diff --git a/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala b/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala index c3df7ce830..cf2e992a8b 100644 --- a/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala +++ b/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala @@ -1,12 +1,11 @@ //> using dep com.softwaremill.sttp.tapir::tapir-core:1.11.11 //> using dep com.softwaremill.sttp.tapir::tapir-play-server:1.11.11 +//> using dep com.softwaremill.sttp.tapir::tapir-netty-server-cats:1.11.11 //> using dep org.playframework::play-netty-server:3.0.6 -//> using dep com.softwaremill.sttp.client3::core:3.10.1 +//> using dep com.softwaremill.sttp.client3::core:3.10.2 package sttp.tapir.examples.streaming -import play.core.server.* -import play.api.routing.Router.Routes import org.apache.pekko.actor.ActorSystem import org.apache.pekko.stream.Materializer import sttp.capabilities.pekko.PekkoStreams @@ -18,11 +17,23 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future import sttp.model.{HeaderNames, MediaType, Part, StatusCode} import sttp.tapir.* + import scala.concurrent.{ExecutionContext, Future} import scala.util.* import org.apache.pekko import pekko.stream.scaladsl.{Flow, Source} import pekko.util.ByteString +import sttp.client3.UriContext +import sttp.tapir.server.netty.cats.NettyCatsServer +import sttp.tapir.server.netty.NettyConfig +import scala.concurrent.duration.DurationInt +import cats.effect.{IO, Resource} +import cats.effect.std.Dispatcher +import scala.concurrent.duration.FiniteDuration +import fs2.{Chunk, Stream} +import cats.effect.unsafe.implicits.global + +import sttp.capabilities.fs2.Fs2Streams given ExecutionContext = ExecutionContext.global @@ -30,30 +41,37 @@ type ErrorInfo = String implicit val actorSystem: ActorSystem = ActorSystem("playServer") -def handleErrors[T](f: Future[T]): Future[Either[ErrorInfo, T]] = - f.transform { - case Success(v) => Success(Right(v)) - case Failure(e) => - println(s"Exception when running endpoint logic: $e") - Success(Left(e.getMessage)) - } - -def logic(s: (Long, Source[ByteString, Any])): Future[(Long, Source[ByteString, Any])] = { - val (length, stream) = s - println(s"Received $length bytes, ${stream.map(_.length)} bytes in total") - Future.successful((length, stream)) +val givenRequestTimeout = 2.seconds +val chunkSize = 100 +val beforeSendingSecondChunk: FiniteDuration = 2.second + +def createStream(chunkSize: Int, beforeSendingSecondChunk: FiniteDuration): fs2.Stream[IO, Byte] = { + val chunk = Chunk.array(Array.fill(chunkSize)('A'.toByte)) + val initialChunks = fs2.Stream.chunk(chunk) + val delayedChunk = fs2.Stream.sleep[IO](beforeSendingSecondChunk) >> fs2.Stream.chunk(chunk) + initialChunks ++ delayedChunk } +val inputStream = createStream(chunkSize, beforeSendingSecondChunk) + val e = endpoint.post .in("chunks") .in(header[Long](HeaderNames.ContentLength)) - .in(streamTextBody(PekkoStreams)(CodecFormat.TextPlain())) + .in(streamTextBody(Fs2Streams[IO])(CodecFormat.TextPlain())) .out(header[Long](HeaderNames.ContentLength)) - .out(streamTextBody(PekkoStreams)(CodecFormat.TextPlain())) - .errorOut(plainBody[ErrorInfo]) - .serverLogic((logic _).andThen(handleErrors)) + .out(streamTextBody(Fs2Streams[IO])(CodecFormat.TextPlain())) + .serverLogicSuccess[IO] { case (length, stream) => + IO((length, stream)) + } -val routes = PlayServerInterpreter().toRoutes(e) +val config = + NettyConfig.default + .host("0.0.0.0") + .port(9000) + .requestTimeout(givenRequestTimeout) + + +//val routes = PlayServerInterpreter().toRoutes(e) @main def playServer(): Unit = import play.api.Configuration @@ -63,36 +81,23 @@ val routes = PlayServerInterpreter().toRoutes(e) import java.io.File import java.util.Properties - val customConfig = Configuration( - "play.server.http.idleTimeout" -> "75 seconds", - "play.server.https.idleTimeout" -> "75 seconds", - "play.server.https.wantClientAuth" -> false, - "play.server.https.needClientAuth" -> false, - "play.server.netty.server-header" -> null, - "play.server.netty.shutdownQuietPeriod" -> "2 seconds", - "play.server.netty.maxInitialLineLength" -> "4096", - "play.server.netty.maxChunkSize" -> "8192", - "play.server.netty.eventLoopThreads" -> "0", - "play.server.netty.transport" -> "jdk", - "play.server.max-header-size" -> "8k", - "play.server.waitBeforeTermination" -> "0", - "play.server.deferBodyParsing" -> false, - "play.server.websocket.frame.maxLength" -> "64k", - "play.server.websocket.periodic-keep-alive-mode" -> "ping", - "play.server.websocket.periodic-keep-alive-max-idle" -> "infinite", - "play.server.max-content-length" -> "infinite", - "play.server.netty.log.wire" -> true, - "play.server.netty.option.child.tcpNoDelay" -> true, - "play.server.pekko.requestTimeout" -> "5 seconds", - ) - val serverConfig = ServerConfig( - rootDir = new File("."), - port = Some(9000), - sslPort = Some(9443), - address = "0.0.0.0", - mode = Mode.Dev, - properties = System.getProperties, - configuration = customConfig - ) - - NettyServer.fromRouterWithComponents(serverConfig) { components => routes } \ No newline at end of file + println(s"Server is starting...") + + NettyCatsServer + .io(config) + .use { server => + for { + binding <- server + .addEndpoint(e) + .start() + result <- IO + .blocking { + val port = binding.port + val host = binding.hostName + println(s"Server started at port = ${binding.port}") + } + .guarantee(binding.stop()) + } yield result + }.unsafeRunSync() + + println(s"Server started at port ???") \ No newline at end of file From 024332bb1f9e0fdae539c26c2b1363c546e983f7 Mon Sep 17 00:00:00 2001 From: Sergiusz Kierat Date: Tue, 31 Dec 2024 16:32:54 +0100 Subject: [PATCH 07/10] Revert "wip" This reverts commit 02ba4b01925ef4a60064330838de4f0c414d2a88. --- .../streaming/longLastingClient.scala | 34 +++--- .../tapir/examples/streaming/playServer.scala | 111 +++++++++--------- 2 files changed, 67 insertions(+), 78 deletions(-) diff --git a/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala b/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala index 89f99739e8..8ae1764872 100644 --- a/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala +++ b/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala @@ -1,10 +1,8 @@ //> using dep com.softwaremill.sttp.tapir::tapir-core:1.11.11 -//> using dep com.softwaremill.sttp.tapir::tapir-netty-server-cats:1.11.11 //> using dep org.apache.pekko::pekko-stream:1.1.2 //> using dep org.typelevel::cats-effect:3.5.7 -//> using dep com.softwaremill.sttp.client3::core:3.10.2 +//> using dep com.softwaremill.sttp.client3::core:3.10.1 //> using dep com.softwaremill.sttp.client3::pekko-http-backend:3.10.1 -//> using dep com.softwaremill.sttp.client3::fs2:3.10.2 package sttp.tapir.examples.streaming @@ -23,31 +21,27 @@ import pekko.stream.scaladsl.{Flow, Source} import pekko.util.ByteString import cats.effect.* import cats.syntax.all.* -import sttp.client3.httpclient.fs2.HttpClientFs2Backend + import scala.concurrent.duration.* import scala.concurrent.duration.FiniteDuration -import sttp.capabilities.fs2.Fs2Streams -import fs2.{Chunk, Stream} object longLastingClient extends IOApp: - private def makeRequest(backend: SttpBackend[IO, Fs2Streams[IO] & WebSockets]): IO[Response[Either[String, String]]] = - def createStream(chunkSize: Int, beforeSendingSecondChunk: FiniteDuration): Stream[IO, Byte] = { - val chunk = Chunk.array(Array.fill(chunkSize)('A'.toByte)) - val initialChunks = Stream.chunk(chunk) - val delayedChunk = Stream.sleep[IO](beforeSendingSecondChunk) >> Stream.chunk(chunk) - initialChunks ++ delayedChunk + implicit val actorSystem: ActorSystem = ActorSystem("longLastingClient") + + private def makeRequest(backend: SttpBackend[Future, PekkoStreams & WebSockets]): Future[Response[Either[String, String]]] = + val stream: Source[ByteString, Any] = Source.tick(1.seconds, 1.seconds, ByteString(Array.fill(10)('A').map(_.toByte))).map { elem => + println(s"$elem ${java.time.LocalTime.now()}"); elem } - val stream = createStream(100, 2.seconds) - + basicRequest .post(uri"http://localhost:9000/chunks") .header(Header(HeaderNames.ContentLength, "10000")) - .streamBody(Fs2Streams[IO])(stream) + .streamBody(PekkoStreams)(stream) .send(backend) - + override def run(args: List[String]): IO[ExitCode] = - HttpClientFs2Backend.resource[IO]().use { backend => - makeRequest(backend).flatMap { response => + val backend = PekkoHttpBackend.usingActorSystem(actorSystem) + val responseIO: IO[Response[Either[String, String]]] = IO.fromFuture(IO(makeRequest(backend))) + responseIO.flatMap { response => IO(println(response.body)) - } - }.as(ExitCode.Success) \ No newline at end of file + }.as(ExitCode.Success) \ No newline at end of file diff --git a/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala b/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala index cf2e992a8b..c3df7ce830 100644 --- a/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala +++ b/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala @@ -1,11 +1,12 @@ //> using dep com.softwaremill.sttp.tapir::tapir-core:1.11.11 //> using dep com.softwaremill.sttp.tapir::tapir-play-server:1.11.11 -//> using dep com.softwaremill.sttp.tapir::tapir-netty-server-cats:1.11.11 //> using dep org.playframework::play-netty-server:3.0.6 -//> using dep com.softwaremill.sttp.client3::core:3.10.2 +//> using dep com.softwaremill.sttp.client3::core:3.10.1 package sttp.tapir.examples.streaming +import play.core.server.* +import play.api.routing.Router.Routes import org.apache.pekko.actor.ActorSystem import org.apache.pekko.stream.Materializer import sttp.capabilities.pekko.PekkoStreams @@ -17,23 +18,11 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future import sttp.model.{HeaderNames, MediaType, Part, StatusCode} import sttp.tapir.* - import scala.concurrent.{ExecutionContext, Future} import scala.util.* import org.apache.pekko import pekko.stream.scaladsl.{Flow, Source} import pekko.util.ByteString -import sttp.client3.UriContext -import sttp.tapir.server.netty.cats.NettyCatsServer -import sttp.tapir.server.netty.NettyConfig -import scala.concurrent.duration.DurationInt -import cats.effect.{IO, Resource} -import cats.effect.std.Dispatcher -import scala.concurrent.duration.FiniteDuration -import fs2.{Chunk, Stream} -import cats.effect.unsafe.implicits.global - -import sttp.capabilities.fs2.Fs2Streams given ExecutionContext = ExecutionContext.global @@ -41,37 +30,30 @@ type ErrorInfo = String implicit val actorSystem: ActorSystem = ActorSystem("playServer") -val givenRequestTimeout = 2.seconds -val chunkSize = 100 -val beforeSendingSecondChunk: FiniteDuration = 2.second - -def createStream(chunkSize: Int, beforeSendingSecondChunk: FiniteDuration): fs2.Stream[IO, Byte] = { - val chunk = Chunk.array(Array.fill(chunkSize)('A'.toByte)) - val initialChunks = fs2.Stream.chunk(chunk) - val delayedChunk = fs2.Stream.sleep[IO](beforeSendingSecondChunk) >> fs2.Stream.chunk(chunk) - initialChunks ++ delayedChunk +def handleErrors[T](f: Future[T]): Future[Either[ErrorInfo, T]] = + f.transform { + case Success(v) => Success(Right(v)) + case Failure(e) => + println(s"Exception when running endpoint logic: $e") + Success(Left(e.getMessage)) + } + +def logic(s: (Long, Source[ByteString, Any])): Future[(Long, Source[ByteString, Any])] = { + val (length, stream) = s + println(s"Received $length bytes, ${stream.map(_.length)} bytes in total") + Future.successful((length, stream)) } -val inputStream = createStream(chunkSize, beforeSendingSecondChunk) - val e = endpoint.post .in("chunks") .in(header[Long](HeaderNames.ContentLength)) - .in(streamTextBody(Fs2Streams[IO])(CodecFormat.TextPlain())) + .in(streamTextBody(PekkoStreams)(CodecFormat.TextPlain())) .out(header[Long](HeaderNames.ContentLength)) - .out(streamTextBody(Fs2Streams[IO])(CodecFormat.TextPlain())) - .serverLogicSuccess[IO] { case (length, stream) => - IO((length, stream)) - } + .out(streamTextBody(PekkoStreams)(CodecFormat.TextPlain())) + .errorOut(plainBody[ErrorInfo]) + .serverLogic((logic _).andThen(handleErrors)) -val config = - NettyConfig.default - .host("0.0.0.0") - .port(9000) - .requestTimeout(givenRequestTimeout) - - -//val routes = PlayServerInterpreter().toRoutes(e) +val routes = PlayServerInterpreter().toRoutes(e) @main def playServer(): Unit = import play.api.Configuration @@ -81,23 +63,36 @@ val config = import java.io.File import java.util.Properties - println(s"Server is starting...") - - NettyCatsServer - .io(config) - .use { server => - for { - binding <- server - .addEndpoint(e) - .start() - result <- IO - .blocking { - val port = binding.port - val host = binding.hostName - println(s"Server started at port = ${binding.port}") - } - .guarantee(binding.stop()) - } yield result - }.unsafeRunSync() - - println(s"Server started at port ???") \ No newline at end of file + val customConfig = Configuration( + "play.server.http.idleTimeout" -> "75 seconds", + "play.server.https.idleTimeout" -> "75 seconds", + "play.server.https.wantClientAuth" -> false, + "play.server.https.needClientAuth" -> false, + "play.server.netty.server-header" -> null, + "play.server.netty.shutdownQuietPeriod" -> "2 seconds", + "play.server.netty.maxInitialLineLength" -> "4096", + "play.server.netty.maxChunkSize" -> "8192", + "play.server.netty.eventLoopThreads" -> "0", + "play.server.netty.transport" -> "jdk", + "play.server.max-header-size" -> "8k", + "play.server.waitBeforeTermination" -> "0", + "play.server.deferBodyParsing" -> false, + "play.server.websocket.frame.maxLength" -> "64k", + "play.server.websocket.periodic-keep-alive-mode" -> "ping", + "play.server.websocket.periodic-keep-alive-max-idle" -> "infinite", + "play.server.max-content-length" -> "infinite", + "play.server.netty.log.wire" -> true, + "play.server.netty.option.child.tcpNoDelay" -> true, + "play.server.pekko.requestTimeout" -> "5 seconds", + ) + val serverConfig = ServerConfig( + rootDir = new File("."), + port = Some(9000), + sslPort = Some(9443), + address = "0.0.0.0", + mode = Mode.Dev, + properties = System.getProperties, + configuration = customConfig + ) + + NettyServer.fromRouterWithComponents(serverConfig) { components => routes } \ No newline at end of file From 0bbc22ee4324e9b613f7f7aaaa70ab98deb6390f Mon Sep 17 00:00:00 2001 From: Sergiusz Kierat Date: Tue, 31 Dec 2024 17:40:52 +0100 Subject: [PATCH 08/10] fixes --- .../examples/streaming/longLastingClient.scala | 2 +- .../tapir/examples/streaming/playServer.scala | 16 +++++++++++----- .../main/scala/sttp/tapir/perf/play/Play.scala | 2 +- .../sttp/tapir/server/play/PlayServerTest.scala | 3 +++ 4 files changed, 16 insertions(+), 7 deletions(-) diff --git a/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala b/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala index 8ae1764872..c94e859626 100644 --- a/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala +++ b/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala @@ -1,7 +1,7 @@ //> using dep com.softwaremill.sttp.tapir::tapir-core:1.11.11 //> using dep org.apache.pekko::pekko-stream:1.1.2 //> using dep org.typelevel::cats-effect:3.5.7 -//> using dep com.softwaremill.sttp.client3::core:3.10.1 +//> using dep com.softwaremill.sttp.client3::core:3.10.2 //> using dep com.softwaremill.sttp.client3::pekko-http-backend:3.10.1 package sttp.tapir.examples.streaming diff --git a/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala b/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala index c3df7ce830..756c4fafde 100644 --- a/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala +++ b/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala @@ -1,7 +1,8 @@ //> using dep com.softwaremill.sttp.tapir::tapir-core:1.11.11 //> using dep com.softwaremill.sttp.tapir::tapir-play-server:1.11.11 //> using dep org.playframework::play-netty-server:3.0.6 -//> using dep com.softwaremill.sttp.client3::core:3.10.1 +//> using dep com.softwaremill.sttp.client3::core:3.10.2 +//> using dep org.slf4j:slf4j-simple:2.0.16 package sttp.tapir.examples.streaming @@ -18,11 +19,13 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future import sttp.model.{HeaderNames, MediaType, Part, StatusCode} import sttp.tapir.* + import scala.concurrent.{ExecutionContext, Future} import scala.util.* import org.apache.pekko import pekko.stream.scaladsl.{Flow, Source} import pekko.util.ByteString +import sttp.tapir.server.play.PlayServerOptions given ExecutionContext = ExecutionContext.global @@ -51,15 +54,18 @@ val e = endpoint.post .out(header[Long](HeaderNames.ContentLength)) .out(streamTextBody(PekkoStreams)(CodecFormat.TextPlain())) .errorOut(plainBody[ErrorInfo]) - .serverLogic((logic _).andThen(handleErrors)) + .serverLogic(logic.andThen(handleErrors)) + -val routes = PlayServerInterpreter().toRoutes(e) +val routes = PlayServerInterpreter(PlayServerOptions.customiseInterceptors().serverLog(PlayServerOptions.defaultServerLog.logWhenReceived(true) + .logAllDecodeFailures(true)).options).toRoutes(e) @main def playServer(): Unit = import play.api.Configuration import play.api.Mode import play.core.server.ServerConfig + import java.io.File import java.util.Properties @@ -82,13 +88,13 @@ val routes = PlayServerInterpreter().toRoutes(e) "play.server.websocket.periodic-keep-alive-max-idle" -> "infinite", "play.server.max-content-length" -> "infinite", "play.server.netty.log.wire" -> true, - "play.server.netty.option.child.tcpNoDelay" -> true, + "play.server.netty.option.child.SO_KEEPALIVE" -> false, "play.server.pekko.requestTimeout" -> "5 seconds", ) val serverConfig = ServerConfig( rootDir = new File("."), port = Some(9000), - sslPort = Some(9443), + sslPort = None, address = "0.0.0.0", mode = Mode.Dev, properties = System.getProperties, diff --git a/perf-tests/src/main/scala/sttp/tapir/perf/play/Play.scala b/perf-tests/src/main/scala/sttp/tapir/perf/play/Play.scala index c3e576696c..ceb9f6d983 100644 --- a/perf-tests/src/main/scala/sttp/tapir/perf/play/Play.scala +++ b/perf-tests/src/main/scala/sttp/tapir/perf/play/Play.scala @@ -78,7 +78,7 @@ object Tapir extends Endpoints { (actorSystem: ActorSystem) => { implicit val actorSystemForMaterializer: ActorSystem = actorSystem implicit val ec: ExecutionContext = actorSystem.dispatcher - val serverOptions = buildOptions(PlayServerOptions.customiseInterceptors(), withServerLog) + val serverOptions = buildOptions(PlayServerInterpreter, withServerLog) PlayServerInterpreter(serverOptions).toRoutes( genEndpointsFuture(nRoutes) ) diff --git a/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala b/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala index c25bd41529..550bb1370f 100644 --- a/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala +++ b/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala @@ -103,6 +103,9 @@ class PlayServerTest extends TestSuite { def drainAkka(stream: AkkaStreams.BinaryStream): Future[Unit] = stream.runWith(Sink.ignore).map(_ => ()) + PlayServerOptions.customiseInterceptors().serverLog(PlayServerOptions.defaultServerLog.logWhenReceived(true) + .logAllDecodeFailures(true)).options + new ServerBasicTests( createServerTest, interpreter, From f2175174eb1704129c027a7d7a9fa56254b7d6db Mon Sep 17 00:00:00 2001 From: Sergiusz Kierat Date: Tue, 31 Dec 2024 17:43:00 +0100 Subject: [PATCH 09/10] revert --- perf-tests/src/main/scala/sttp/tapir/perf/play/Play.scala | 2 +- .../src/test/scala/sttp/tapir/server/play/PlayServerTest.scala | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/perf-tests/src/main/scala/sttp/tapir/perf/play/Play.scala b/perf-tests/src/main/scala/sttp/tapir/perf/play/Play.scala index ceb9f6d983..c3e576696c 100644 --- a/perf-tests/src/main/scala/sttp/tapir/perf/play/Play.scala +++ b/perf-tests/src/main/scala/sttp/tapir/perf/play/Play.scala @@ -78,7 +78,7 @@ object Tapir extends Endpoints { (actorSystem: ActorSystem) => { implicit val actorSystemForMaterializer: ActorSystem = actorSystem implicit val ec: ExecutionContext = actorSystem.dispatcher - val serverOptions = buildOptions(PlayServerInterpreter, withServerLog) + val serverOptions = buildOptions(PlayServerOptions.customiseInterceptors(), withServerLog) PlayServerInterpreter(serverOptions).toRoutes( genEndpointsFuture(nRoutes) ) diff --git a/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala b/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala index 550bb1370f..c25bd41529 100644 --- a/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala +++ b/server/play29-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala @@ -103,9 +103,6 @@ class PlayServerTest extends TestSuite { def drainAkka(stream: AkkaStreams.BinaryStream): Future[Unit] = stream.runWith(Sink.ignore).map(_ => ()) - PlayServerOptions.customiseInterceptors().serverLog(PlayServerOptions.defaultServerLog.logWhenReceived(true) - .logAllDecodeFailures(true)).options - new ServerBasicTests( createServerTest, interpreter, From f1aad13333594b2418d53a06d86ca0291c10860b Mon Sep 17 00:00:00 2001 From: Sergiusz Kierat Date: Thu, 2 Jan 2025 12:56:00 +0100 Subject: [PATCH 10/10] fixes after review --- .../streaming/longLastingClient.scala | 18 ++++++++++++----- .../tapir/examples/streaming/playServer.scala | 20 +++++++++++++------ 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala b/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala index c94e859626..ba6d961d3e 100644 --- a/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala +++ b/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala @@ -2,7 +2,7 @@ //> using dep org.apache.pekko::pekko-stream:1.1.2 //> using dep org.typelevel::cats-effect:3.5.7 //> using dep com.softwaremill.sttp.client3::core:3.10.2 -//> using dep com.softwaremill.sttp.client3::pekko-http-backend:3.10.1 +//> using dep com.softwaremill.sttp.client3::pekko-http-backend:3.10.2 package sttp.tapir.examples.streaming @@ -28,14 +28,22 @@ import scala.concurrent.duration.FiniteDuration object longLastingClient extends IOApp: implicit val actorSystem: ActorSystem = ActorSystem("longLastingClient") + private val givenLength: Long = 10000 + private val chunkSize = 100 + private val noChunks = givenLength / chunkSize + private def makeRequest(backend: SttpBackend[Future, PekkoStreams & WebSockets]): Future[Response[Either[String, String]]] = - val stream: Source[ByteString, Any] = Source.tick(1.seconds, 1.seconds, ByteString(Array.fill(10)('A').map(_.toByte))).map { elem => - println(s"$elem ${java.time.LocalTime.now()}"); elem - } + val stream: Source[ByteString, Any] = + Source.tick(1.seconds, 1.seconds, ByteString(Array.fill(chunkSize)('A').map(_.toByte))) + .zipWithIndex + .take(noChunks) + .map { case (chunk, idx) => + println(s"Chunk ${idx + 1} sent ${java.time.LocalTime.now()}"); chunk + } basicRequest .post(uri"http://localhost:9000/chunks") - .header(Header(HeaderNames.ContentLength, "10000")) + .header(Header(HeaderNames.ContentLength, givenLength.toString)) .streamBody(PekkoStreams)(stream) .send(backend) diff --git a/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala b/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala index 756c4fafde..e281ed58c3 100644 --- a/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala +++ b/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala @@ -23,7 +23,7 @@ import sttp.tapir.* import scala.concurrent.{ExecutionContext, Future} import scala.util.* import org.apache.pekko -import pekko.stream.scaladsl.{Flow, Source} +import pekko.stream.scaladsl.{Flow, Source, Sink} import pekko.util.ByteString import sttp.tapir.server.play.PlayServerOptions @@ -41,18 +41,26 @@ def handleErrors[T](f: Future[T]): Future[Either[ErrorInfo, T]] = Success(Left(e.getMessage)) } -def logic(s: (Long, Source[ByteString, Any])): Future[(Long, Source[ByteString, Any])] = { +def logic(s: (Long, Source[ByteString, Any])): Future[String] = { val (length, stream) = s - println(s"Received $length bytes, ${stream.map(_.length)} bytes in total") - Future.successful((length, stream)) + println(s"Transmitting $length bytes...") + val result = stream + .runFold(List.empty[ByteString])((acc, byteS) => acc :+ byteS) + .map(_.reduce(_ ++ _).decodeString("UTF-8")) + result.onComplete { + case Failure(ex) => + println(s"Stream failed with exception: $ex" ) + case Success(s) => + println(s"Stream finished: ${s.length}/$length transmitted") + } + result } val e = endpoint.post .in("chunks") .in(header[Long](HeaderNames.ContentLength)) .in(streamTextBody(PekkoStreams)(CodecFormat.TextPlain())) - .out(header[Long](HeaderNames.ContentLength)) - .out(streamTextBody(PekkoStreams)(CodecFormat.TextPlain())) + .out(stringBody) .errorOut(plainBody[ErrorInfo]) .serverLogic(logic.andThen(handleErrors))