diff --git a/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala b/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala new file mode 100644 index 0000000000..ba6d961d3e --- /dev/null +++ b/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala @@ -0,0 +1,55 @@ +//> using dep com.softwaremill.sttp.tapir::tapir-core:1.11.11 +//> using dep org.apache.pekko::pekko-stream:1.1.2 +//> using dep org.typelevel::cats-effect:3.5.7 +//> using dep com.softwaremill.sttp.client3::core:3.10.2 +//> using dep com.softwaremill.sttp.client3::pekko-http-backend:3.10.2 + +package sttp.tapir.examples.streaming + +import cats.effect.{ExitCode, IO, IOApp, Resource} +import sttp.capabilities.WebSockets +import sttp.client3.pekkohttp.PekkoHttpBackend +import sttp.client3.{Response, SttpBackend, UriContext, basicRequest} + +import scala.concurrent.Future +import sttp.model.{Header, HeaderNames, Method, QueryParams} +import sttp.tapir.* +import org.apache.pekko +import org.apache.pekko.actor.ActorSystem +import sttp.capabilities.pekko.PekkoStreams +import pekko.stream.scaladsl.{Flow, Source} +import pekko.util.ByteString +import cats.effect.* +import cats.syntax.all.* + +import scala.concurrent.duration.* +import scala.concurrent.duration.FiniteDuration + +object longLastingClient extends IOApp: + implicit val actorSystem: ActorSystem = ActorSystem("longLastingClient") + + private val givenLength: Long = 10000 + private val chunkSize = 100 + private val noChunks = givenLength / chunkSize + + private def makeRequest(backend: SttpBackend[Future, PekkoStreams & WebSockets]): Future[Response[Either[String, String]]] = + val stream: Source[ByteString, Any] = + Source.tick(1.seconds, 1.seconds, ByteString(Array.fill(chunkSize)('A').map(_.toByte))) + .zipWithIndex + .take(noChunks) + .map { case (chunk, idx) => + println(s"Chunk ${idx + 1} sent ${java.time.LocalTime.now()}"); chunk + } + + basicRequest + .post(uri"http://localhost:9000/chunks") + .header(Header(HeaderNames.ContentLength, givenLength.toString)) + .streamBody(PekkoStreams)(stream) + .send(backend) + + override def run(args: List[String]): IO[ExitCode] = + val backend = PekkoHttpBackend.usingActorSystem(actorSystem) + val responseIO: IO[Response[Either[String, String]]] = IO.fromFuture(IO(makeRequest(backend))) + responseIO.flatMap { response => + IO(println(response.body)) + }.as(ExitCode.Success) \ No newline at end of file diff --git a/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala b/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala new file mode 100644 index 0000000000..e281ed58c3 --- /dev/null +++ b/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala @@ -0,0 +1,112 @@ +//> using dep com.softwaremill.sttp.tapir::tapir-core:1.11.11 +//> using dep com.softwaremill.sttp.tapir::tapir-play-server:1.11.11 +//> using dep org.playframework::play-netty-server:3.0.6 +//> using dep com.softwaremill.sttp.client3::core:3.10.2 +//> using dep org.slf4j:slf4j-simple:2.0.16 + +package sttp.tapir.examples.streaming + +import play.core.server.* +import play.api.routing.Router.Routes +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.Materializer +import sttp.capabilities.pekko.PekkoStreams +import sttp.tapir.server.ServerEndpoint +import sttp.tapir.* +import sttp.tapir.server.play.PlayServerInterpreter + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future +import sttp.model.{HeaderNames, MediaType, Part, StatusCode} +import sttp.tapir.* + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.* +import org.apache.pekko +import pekko.stream.scaladsl.{Flow, Source, Sink} +import pekko.util.ByteString +import sttp.tapir.server.play.PlayServerOptions + +given ExecutionContext = ExecutionContext.global + +type ErrorInfo = String + +implicit val actorSystem: ActorSystem = ActorSystem("playServer") + +def handleErrors[T](f: Future[T]): Future[Either[ErrorInfo, T]] = + f.transform { + case Success(v) => Success(Right(v)) + case Failure(e) => + println(s"Exception when running endpoint logic: $e") + Success(Left(e.getMessage)) + } + +def logic(s: (Long, Source[ByteString, Any])): Future[String] = { + val (length, stream) = s + println(s"Transmitting $length bytes...") + val result = stream + .runFold(List.empty[ByteString])((acc, byteS) => acc :+ byteS) + .map(_.reduce(_ ++ _).decodeString("UTF-8")) + result.onComplete { + case Failure(ex) => + println(s"Stream failed with exception: $ex" ) + case Success(s) => + println(s"Stream finished: ${s.length}/$length transmitted") + } + result +} + +val e = endpoint.post + .in("chunks") + .in(header[Long](HeaderNames.ContentLength)) + .in(streamTextBody(PekkoStreams)(CodecFormat.TextPlain())) + .out(stringBody) + .errorOut(plainBody[ErrorInfo]) + .serverLogic(logic.andThen(handleErrors)) + + +val routes = PlayServerInterpreter(PlayServerOptions.customiseInterceptors().serverLog(PlayServerOptions.defaultServerLog.logWhenReceived(true) + .logAllDecodeFailures(true)).options).toRoutes(e) + +@main def playServer(): Unit = + import play.api.Configuration + import play.api.Mode + import play.core.server.ServerConfig + + + import java.io.File + import java.util.Properties + + val customConfig = Configuration( + "play.server.http.idleTimeout" -> "75 seconds", + "play.server.https.idleTimeout" -> "75 seconds", + "play.server.https.wantClientAuth" -> false, + "play.server.https.needClientAuth" -> false, + "play.server.netty.server-header" -> null, + "play.server.netty.shutdownQuietPeriod" -> "2 seconds", + "play.server.netty.maxInitialLineLength" -> "4096", + "play.server.netty.maxChunkSize" -> "8192", + "play.server.netty.eventLoopThreads" -> "0", + "play.server.netty.transport" -> "jdk", + "play.server.max-header-size" -> "8k", + "play.server.waitBeforeTermination" -> "0", + "play.server.deferBodyParsing" -> false, + "play.server.websocket.frame.maxLength" -> "64k", + "play.server.websocket.periodic-keep-alive-mode" -> "ping", + "play.server.websocket.periodic-keep-alive-max-idle" -> "infinite", + "play.server.max-content-length" -> "infinite", + "play.server.netty.log.wire" -> true, + "play.server.netty.option.child.SO_KEEPALIVE" -> false, + "play.server.pekko.requestTimeout" -> "5 seconds", + ) + val serverConfig = ServerConfig( + rootDir = new File("."), + port = Some(9000), + sslPort = None, + address = "0.0.0.0", + mode = Mode.Dev, + properties = System.getProperties, + configuration = customConfig + ) + + NettyServer.fromRouterWithComponents(serverConfig) { components => routes } \ No newline at end of file diff --git a/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala b/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala index 7ef2dc432d..b62c6da644 100644 --- a/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala +++ b/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala @@ -1,21 +1,32 @@ package sttp.tapir.server.play +import scala.concurrent.duration.{DurationInt, FiniteDuration} import org.apache.pekko.actor.ActorSystem import org.apache.pekko.stream.scaladsl.{Flow, Sink, Source} import cats.data.NonEmptyList import cats.effect.{IO, Resource} import cats.effect.unsafe.implicits.global -import org.scalatest.matchers.should.Matchers._ +import com.typesafe.config.ConfigFactory +import org.apache.pekko.util.ByteString +import org.scalatest.matchers.should.Matchers.{fail, _} +import play.api.{Configuration, Mode} import play.api.http.ParserConfiguration +import play.api.routing.Router +import play.core.server.{DefaultPekkoHttpServerComponents, ServerConfig} +import sttp.capabilities.Streams +import sttp.capabilities.fs2.Fs2Streams import sttp.capabilities.pekko.PekkoStreams import sttp.client3._ -import sttp.model.{MediaType, Part, StatusCode} +import sttp.model.{HeaderNames, MediaType, Part, StatusCode} import sttp.monad.FutureMonad import sttp.tapir._ import sttp.tapir.server.tests._ import sttp.tapir.tests.{Test, TestSuite} +import fs2.{Chunk, Stream} +import sttp.capabilities.fs2.Fs2Streams import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration class PlayServerTest extends TestSuite { @@ -23,10 +34,10 @@ class PlayServerTest extends TestSuite { Resource.make(IO.delay(ActorSystem()))(actorSystem => IO.fromFuture(IO.delay(actorSystem.terminate())).void) override def tests: Resource[IO, List[Test]] = backendResource.flatMap { backend => - actorSystemResource.map { implicit actorSystem => - implicit val m: FutureMonad = new FutureMonad()(actorSystem.dispatcher) + actorSystemResource.map { implicit _actorSystem => + implicit val m: FutureMonad = new FutureMonad()(_actorSystem.dispatcher) - val interpreter = new PlayTestServerInterpreter()(actorSystem) + val interpreter = new PlayTestServerInterpreter()(_actorSystem) val createServerTest = new DefaultCreateServerTest(backend, interpreter) def additionalTests(): List[Test] = List( @@ -98,7 +109,61 @@ class PlayServerTest extends TestSuite { } } .unsafeToFuture() - } + }, + Test("chunked transmission lasts longer than given timeout") { + val chunkSize = 100 + val beforeSendingSecondChunk: FiniteDuration = 2.second + val requestTimeout: FiniteDuration = 1.second + + val e = + endpoint.post + .in(header[Long](HeaderNames.ContentLength)) + .in(streamTextBody(PekkoStreams)(CodecFormat.TextPlain())) + .out(header[Long](HeaderNames.ContentLength)) + .out(streamTextBody(PekkoStreams)(CodecFormat.TextPlain())) + .serverLogicSuccess[Future] { case (length, stream) => + Future.successful(length, stream) + } + + val components: DefaultPekkoHttpServerComponents = new DefaultPekkoHttpServerComponents { + val initialServerConfig: ServerConfig = ServerConfig(port = Some(0), address = "127.0.0.1", mode = Mode.Test) + + val customConf: Configuration = + Configuration( + ConfigFactory.parseString(s"play { server.pekko.requestTimeout=${requestTimeout.toString} }") + ) + override lazy val serverConfig: ServerConfig = + initialServerConfig.copy(configuration = customConf.withFallback(initialServerConfig.configuration)) + override lazy val actorSystem: ActorSystem = ActorSystem("tapir", defaultExecutionContext = Some(_actorSystem.dispatcher)) + override lazy val router: Router = Router.from(PlayServerInterpreter().toRoutes(e)).withPrefix("/chunks") + } + + def createStream(chunkSize: Int, beforeSendingSecondChunk: FiniteDuration): Stream[IO, Byte] = { + val chunk = Chunk.array(Array.fill(chunkSize)('A'.toByte)) + val initialChunks = Stream.chunk(chunk) + val delayedChunk = Stream.sleep[IO](beforeSendingSecondChunk) >> Stream.chunk(chunk) + initialChunks ++ delayedChunk + } + + val inputStream = createStream(chunkSize, beforeSendingSecondChunk) + + val bind = IO.blocking(components.server) + Resource.make(bind)(s => IO.blocking(s.stop())) + .map(_.mainAddress.getPort) + .use { port => + basicRequest + .post(uri"http://localhost:$port/chunks") + .contentLength(2 * chunkSize) + .streamBody(Fs2Streams[IO])(inputStream) + .send(backend) + .map{ response => + response.code shouldBe StatusCode.Ok + response.contentLength shouldBe Some(2 * chunkSize) + response.body shouldBe Right("A" * 2 * chunkSize) + } + } + .unsafeToFuture() + }, ) def drainPekko(stream: PekkoStreams.BinaryStream): Future[Unit] = @@ -135,4 +200,6 @@ class PlayServerTest extends TestSuite { additionalTests() } } + + override def testNameFilter: Option[String] = Some("chunked transmission lasts longer than given timeout") }