Skip to content

Commit

Permalink
Chunked transmission lasts longer than timeout
Browse files Browse the repository at this point in the history
*Why I did it?*
In order to have a test which might confirm an issue
with an interrupted request

*How I did it:*
I prepared `NettyCatsRequestTimeoutTest` with the folloing test scenario:
 - send first chunk (100 bytes)
 - sleep
 - send second chunk (100 bytes)
  • Loading branch information
sergiuszkierat committed Dec 19, 2024
1 parent 7f079cf commit aded4cc
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package sttp.tapir.server.netty.cats

import cats.effect.{IO, Resource}
import cats.effect.std.Dispatcher
import cats.effect.unsafe.implicits.global
import io.netty.channel.EventLoopGroup
import org.scalatest.matchers.should.Matchers._
import sttp.capabilities.WebSockets
import sttp.capabilities.fs2.Fs2Streams
import sttp.client3._
import sttp.model.HeaderNames
import sttp.tapir._
import sttp.tapir.server.netty.NettyConfig
import sttp.tapir.tests.Test

import scala.concurrent.duration.DurationInt

class NettyCatsRequestTimeoutTest(
dispatcher: Dispatcher[IO],
eventLoopGroup: EventLoopGroup,
backend: SttpBackend[IO, Fs2Streams[IO] with WebSockets]
) {
def tests(): List[Test] = List(
Test("chunked transmission lasts longer than given timeout") {
val givenRequestTimeout = 2.seconds
val howManyChunks: Int = 2
val chunkSize = 100
val millisBeforeSendingSecondChunk = 1000L

val e =
endpoint.post
.in(header[Long](HeaderNames.ContentLength))
.in(streamTextBody(Fs2Streams[IO])(CodecFormat.TextPlain()))
.out(header[Long](HeaderNames.ContentLength))
.out(streamTextBody(Fs2Streams[IO])(CodecFormat.TextPlain()))
.serverLogicSuccess[IO] { case (length, stream) =>
IO((length, stream))
}

val config =
NettyConfig.default
.eventLoopGroup(eventLoopGroup)
.randomPort
.withDontShutdownEventLoopGroupOnClose
.noGracefulShutdown
.requestTimeout(givenRequestTimeout)

val bind = NettyCatsServer(dispatcher, config).addEndpoint(e).start()

def iterator(howManyChunks: Int, chunkSize: Int): Iterator[Byte] = new Iterator[Iterator[Byte]] {
private var chunksToGo: Int = howManyChunks

def hasNext: Boolean = {
if (chunksToGo == 1)
Thread.sleep(millisBeforeSendingSecondChunk)
chunksToGo > 0
}

def next(): Iterator[Byte] = {
chunksToGo -= 1
List.fill('A')(chunkSize).map(_.toByte).iterator
}
}.flatten

val inputStream = fs2.Stream.fromIterator[IO](iterator(howManyChunks, chunkSize), chunkSize = chunkSize)

Resource
.make(bind)(_.stop())
.map(_.port)
.use { port =>
basicRequest
.post(uri"http://localhost:$port")
.contentLength(howManyChunks * chunkSize)
.streamBody(Fs2Streams[IO])(inputStream)
.send(backend)
.map { _ =>
fail("I've got a bad feeling about this.")
}
}
.attempt
.map {
case Left(ex: sttp.client3.SttpClientException.TimeoutException) =>
ex.getCause.getMessage shouldBe "request timed out"
case Left(ex: sttp.client3.SttpClientException.ReadException) if ex.getCause.isInstanceOf[java.io.IOException] =>
println(s"Unexpected IOException: $ex")
fail(s"Unexpected IOException: $ex")

Check failure on line 86 in server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsRequestTimeoutTest.scala

View workflow job for this annotation

GitHub Actions / Test report for 2.13-JVM-11

sttp.tapir.server.netty.cats.NettyCatsServerTest ► chunked transmission lasts longer than given timeout

Failed test found in: server/netty-server/cats/target/jvm-2.13/test-reports/TEST-sttp.tapir.server.netty.cats.NettyCatsServerTest.xml Error: org.scalatest.exceptions.TestFailedException: Unexpected IOException: sttp.client3.SttpClientException$ReadException: Exception when sending request: POST http://localhost:32909
Raw output
org.scalatest.exceptions.TestFailedException: Unexpected IOException: sttp.client3.SttpClientException$ReadException: Exception when sending request: POST http://localhost:32909
	at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
	at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
	at org.scalatest.matchers.should.Matchers$.newAssertionFailedException(Matchers.scala:8430)
	at org.scalatest.Assertions.fail(Assertions.scala:933)
	at org.scalatest.Assertions.fail$(Assertions.scala:929)
	at org.scalatest.matchers.should.Matchers$.fail(Matchers.scala:8430)
	at sttp.tapir.server.netty.cats.NettyCatsRequestTimeoutTest.$anonfun$tests$8(NettyCatsRequestTimeoutTest.scala:86)
	at delay @ sttp.tapir.server.netty.cats.internal.CatsUtil$.$anonfun$nettyFutureToScala$1(CatsUtil.scala:27)
	at async @ sttp.tapir.server.netty.cats.internal.CatsUtil$.nettyFutureToScala(CatsUtil.scala:26)
	at flatMap @ sttp.tapir.server.netty.cats.NettyCatsServer.$anonfun$stop$2(NettyCatsServer.scala:121)
	at defer @ sttp.tapir.server.netty.cats.NettyCatsServer.$anonfun$stop$1(NettyCatsServer.scala:121)
	at delay @ sttp.tapir.server.netty.cats.NettyCatsServer.waitForClosedChannels(NettyCatsServer.scala:108)
	at delay @ sttp.tapir.server.netty.cats.NettyCatsServer.shutdownChannelGroup(NettyCatsServer.scala:141)
	at voidError$extension @ fs2.interop.reactivestreams.StreamSubscription.run(StreamSubscription.scala:90)
	at delay @ sttp.client3.impl.cats.CatsMonadAsyncError.$anonfun$async$1(CatsMonadAsyncError.scala:10)

Check failure on line 86 in server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsRequestTimeoutTest.scala

View workflow job for this annotation

GitHub Actions / Test report for 3-JVM-11

sttp.tapir.server.netty.cats.NettyCatsServerTest ► chunked transmission lasts longer than given timeout

Failed test found in: server/netty-server/cats/target/jvm-3/test-reports/TEST-sttp.tapir.server.netty.cats.NettyCatsServerTest.xml Error: org.scalatest.exceptions.TestFailedException: Unexpected IOException: sttp.client3.SttpClientException$ReadException: Exception when sending request: POST http://localhost:38231
Raw output
org.scalatest.exceptions.TestFailedException: Unexpected IOException: sttp.client3.SttpClientException$ReadException: Exception when sending request: POST http://localhost:38231
	at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:476)
	at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:421)
	at org.scalatest.matchers.should.Matchers$.newAssertionFailedException(Matchers.scala:7725)
	at org.scalatest.Assertions.failImpl(Assertions.scala:1060)
	at org.scalatest.Assertions.org$scalatest$Assertions$$inline$failImpl(Assertions.scala:1056)
	at org.scalatest.Assertions.org$scalatest$Assertions$$inline$failImpl$(Assertions.scala:421)
	at org.scalatest.matchers.should.Matchers$.org$scalatest$Assertions$$inline$failImpl(Matchers.scala:7725)
	at sttp.tapir.server.netty.cats.NettyCatsRequestTimeoutTest.tests$$anonfun$1$$anonfun$4(NettyCatsRequestTimeoutTest.scala:86)
	at delay @ sttp.tapir.server.netty.cats.internal.CatsUtil$.nettyFutureToScala$$anonfun$1(CatsUtil.scala:35)
	at async @ sttp.tapir.server.netty.cats.internal.CatsUtil$.nettyFutureToScala(CatsUtil.scala:35)
	at flatMap @ sttp.tapir.server.netty.cats.NettyCatsServer.stop$$anonfun$1$$anonfun$1(NettyCatsServer.scala:121)
	at defer @ sttp.tapir.server.netty.cats.NettyCatsServer.stop$$anonfun$1(NettyCatsServer.scala:122)
	at delay @ sttp.tapir.server.netty.cats.NettyCatsServer.waitForClosedChannels(NettyCatsServer.scala:108)
	at delay @ sttp.tapir.server.netty.cats.NettyCatsServer.shutdownChannelGroup(NettyCatsServer.scala:141)
	at voidError$extension @ fs2.interop.reactivestreams.StreamSubscription.run(StreamSubscription.scala:90)
	at delay @ sttp.client3.impl.cats.CatsMonadAsyncError.async$$anonfun$1(CatsMonadAsyncError.scala:10)
case Left(ex) =>
fail(s"Unexpected exception: $ex")
case Right(_) =>
fail("Expected an exception but got success")
}
.unsafeToFuture()
}
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ class NettyCatsServerTest extends TestSuite with EitherValues {

val interpreter = new NettyCatsTestServerInterpreter(eventLoopGroup, dispatcher)
val createServerTest = new DefaultCreateServerTest(backend, interpreter)
val ioSleeper: Sleeper[IO] = new Sleeper[IO] {
override def sleep(duration: FiniteDuration): IO[Unit] = IO.sleep(duration)
}
val ioSleeper: Sleeper[IO] = (duration: FiniteDuration) => IO.sleep(duration)
def drainFs2(stream: Fs2Streams[IO]#BinaryStream): IO[Unit] =
stream.compile.drain.void

Expand All @@ -50,12 +48,16 @@ class NettyCatsServerTest extends TestSuite with EitherValues {
) {
override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f)
override def emptyPipe[A, B]: fs2.Pipe[IO, A, B] = _ => fs2.Stream.empty
}.tests()
}
.tests() ++
new NettyCatsRequestTimeoutTest(dispatcher, eventLoopGroup, backend).tests()

IO.pure((tests, eventLoopGroup))
} { case (_, eventLoopGroup) =>
IO.fromFuture(IO.delay(FutureUtil.nettyFutureToScala(eventLoopGroup.shutdownGracefully()): Future[_])).void
}
.map { case (tests, _) => tests }
}

override def testNameFilter: Option[String] = Some("chunked transmission lasts longer than given timeout")
}

0 comments on commit aded4cc

Please sign in to comment.