|
1 | 1 | package sttp.tapir.server.play
|
2 | 2 |
|
| 3 | +import scala.concurrent.duration.{DurationInt, FiniteDuration} |
3 | 4 | import org.apache.pekko.actor.ActorSystem
|
4 | 5 | import org.apache.pekko.stream.scaladsl.{Flow, Sink, Source}
|
5 | 6 | import cats.data.NonEmptyList
|
6 | 7 | import cats.effect.{IO, Resource}
|
7 | 8 | import cats.effect.unsafe.implicits.global
|
8 |
| -import org.scalatest.matchers.should.Matchers._ |
| 9 | +import com.typesafe.config.ConfigFactory |
| 10 | +import org.apache.pekko.util.ByteString |
| 11 | +import org.scalatest.matchers.should.Matchers.{fail, _} |
| 12 | +import play.api.{Configuration, Mode} |
9 | 13 | import play.api.http.ParserConfiguration
|
| 14 | +import play.api.routing.Router |
| 15 | +import play.core.server.{DefaultPekkoHttpServerComponents, ServerConfig} |
| 16 | +import sttp.capabilities.Streams |
| 17 | +import sttp.capabilities.fs2.Fs2Streams |
10 | 18 | import sttp.capabilities.pekko.PekkoStreams
|
11 | 19 | import sttp.client3._
|
12 |
| -import sttp.model.{MediaType, Part, StatusCode} |
| 20 | +import sttp.model.{HeaderNames, MediaType, Part, StatusCode} |
13 | 21 | import sttp.monad.FutureMonad
|
14 | 22 | import sttp.tapir._
|
15 | 23 | import sttp.tapir.server.tests._
|
16 | 24 | import sttp.tapir.tests.{Test, TestSuite}
|
| 25 | +import fs2.{Chunk, Stream} |
| 26 | +import sttp.capabilities.fs2.Fs2Streams |
17 | 27 |
|
18 | 28 | import scala.concurrent.Future
|
| 29 | +import scala.concurrent.duration.FiniteDuration |
19 | 30 |
|
20 | 31 | class PlayServerTest extends TestSuite {
|
21 | 32 |
|
22 | 33 | def actorSystemResource: Resource[IO, ActorSystem] =
|
23 | 34 | Resource.make(IO.delay(ActorSystem()))(actorSystem => IO.fromFuture(IO.delay(actorSystem.terminate())).void)
|
24 | 35 |
|
25 | 36 | override def tests: Resource[IO, List[Test]] = backendResource.flatMap { backend =>
|
26 |
| - actorSystemResource.map { implicit actorSystem => |
27 |
| - implicit val m: FutureMonad = new FutureMonad()(actorSystem.dispatcher) |
| 37 | + actorSystemResource.map { implicit _actorSystem => |
| 38 | + implicit val m: FutureMonad = new FutureMonad()(_actorSystem.dispatcher) |
28 | 39 |
|
29 |
| - val interpreter = new PlayTestServerInterpreter()(actorSystem) |
| 40 | + val interpreter = new PlayTestServerInterpreter()(_actorSystem) |
30 | 41 | val createServerTest = new DefaultCreateServerTest(backend, interpreter)
|
31 | 42 |
|
32 | 43 | def additionalTests(): List[Test] = List(
|
@@ -98,7 +109,61 @@ class PlayServerTest extends TestSuite {
|
98 | 109 | }
|
99 | 110 | }
|
100 | 111 | .unsafeToFuture()
|
101 |
| - } |
| 112 | + }, |
| 113 | + Test("chunked transmission lasts longer than given timeout") { |
| 114 | + val chunkSize = 100 |
| 115 | + val beforeSendingSecondChunk: FiniteDuration = 2.second |
| 116 | + val requestTimeout: FiniteDuration = 1.second |
| 117 | + |
| 118 | + val e = |
| 119 | + endpoint.post |
| 120 | + .in(header[Long](HeaderNames.ContentLength)) |
| 121 | + .in(streamTextBody(PekkoStreams)(CodecFormat.TextPlain())) |
| 122 | + .out(header[Long](HeaderNames.ContentLength)) |
| 123 | + .out(streamTextBody(PekkoStreams)(CodecFormat.TextPlain())) |
| 124 | + .serverLogicSuccess[Future] { case (length, stream) => |
| 125 | + Future.successful(length, stream) |
| 126 | + } |
| 127 | + |
| 128 | + val components: DefaultPekkoHttpServerComponents = new DefaultPekkoHttpServerComponents { |
| 129 | + val initialServerConfig: ServerConfig = ServerConfig(port = Some(0), address = "127.0.0.1", mode = Mode.Test) |
| 130 | + |
| 131 | + val customConf: Configuration = |
| 132 | + Configuration( |
| 133 | + ConfigFactory.parseString(s"play { server.pekko.requestTimeout=${requestTimeout.toString} }") |
| 134 | + ) |
| 135 | + override lazy val serverConfig: ServerConfig = |
| 136 | + initialServerConfig.copy(configuration = customConf.withFallback(initialServerConfig.configuration)) |
| 137 | + override lazy val actorSystem: ActorSystem = ActorSystem("tapir", defaultExecutionContext = Some(_actorSystem.dispatcher)) |
| 138 | + override lazy val router: Router = Router.from(PlayServerInterpreter().toRoutes(e)).withPrefix("/chunks") |
| 139 | + } |
| 140 | + |
| 141 | + def createStream(chunkSize: Int, beforeSendingSecondChunk: FiniteDuration): Stream[IO, Byte] = { |
| 142 | + val chunk = Chunk.array(Array.fill(chunkSize)('A'.toByte)) |
| 143 | + val initialChunks = Stream.chunk(chunk) |
| 144 | + val delayedChunk = Stream.sleep[IO](beforeSendingSecondChunk) >> Stream.chunk(chunk) |
| 145 | + initialChunks ++ delayedChunk |
| 146 | + } |
| 147 | + |
| 148 | + val inputStream = createStream(chunkSize, beforeSendingSecondChunk) |
| 149 | + |
| 150 | + val bind = IO.blocking(components.server) |
| 151 | + Resource.make(bind)(s => IO.blocking(s.stop())) |
| 152 | + .map(_.mainAddress.getPort) |
| 153 | + .use { port => |
| 154 | + basicRequest |
| 155 | + .post(uri"http://localhost:$port/chunks") |
| 156 | + .contentLength(2 * chunkSize) |
| 157 | + .streamBody(Fs2Streams[IO])(inputStream) |
| 158 | + .send(backend) |
| 159 | + .map{ response => |
| 160 | + response.code shouldBe StatusCode.Ok |
| 161 | + response.contentLength shouldBe Some(2 * chunkSize) |
| 162 | + response.body shouldBe Right("A" * 2 * chunkSize) |
| 163 | + } |
| 164 | + } |
| 165 | + .unsafeToFuture() |
| 166 | + }, |
102 | 167 | )
|
103 | 168 |
|
104 | 169 | def drainPekko(stream: PekkoStreams.BinaryStream): Future[Unit] =
|
@@ -135,4 +200,6 @@ class PlayServerTest extends TestSuite {
|
135 | 200 | additionalTests()
|
136 | 201 | }
|
137 | 202 | }
|
| 203 | + |
| 204 | + override def testNameFilter: Option[String] = Some("chunked transmission lasts longer than given timeout") |
138 | 205 | }
|
0 commit comments