
Chunked transmission lasts longer than timeout #4214


Draft: wants to merge 10 commits into base: master
longLastingClient.scala
@@ -0,0 +1,55 @@
//> using dep com.softwaremill.sttp.tapir::tapir-core:1.11.11
//> using dep org.apache.pekko::pekko-stream:1.1.2
//> using dep org.typelevel::cats-effect:3.5.7
//> using dep com.softwaremill.sttp.client3::core:3.10.2
//> using dep com.softwaremill.sttp.client3::pekko-http-backend:3.10.2

package sttp.tapir.examples.streaming

import cats.effect.{ExitCode, IO, IOApp, Resource}
import sttp.capabilities.WebSockets
import sttp.client3.pekkohttp.PekkoHttpBackend
import sttp.client3.{Response, SttpBackend, UriContext, basicRequest}

import scala.concurrent.Future
import sttp.model.{Header, HeaderNames, Method, QueryParams}
import sttp.tapir.*
import org.apache.pekko
import org.apache.pekko.actor.ActorSystem
import sttp.capabilities.pekko.PekkoStreams
import pekko.stream.scaladsl.{Flow, Source}
import pekko.util.ByteString
import cats.effect.*
import cats.syntax.all.*

import scala.concurrent.duration.*
import scala.concurrent.duration.FiniteDuration

object longLastingClient extends IOApp:
  implicit val actorSystem: ActorSystem = ActorSystem("longLastingClient")

  private val givenLength: Long = 10000
  private val chunkSize = 100
  private val noChunks = givenLength / chunkSize

  private def makeRequest(backend: SttpBackend[Future, PekkoStreams & WebSockets]): Future[Response[Either[String, String]]] =
    val stream: Source[ByteString, Any] =
      Source.tick(1.seconds, 1.seconds, ByteString(Array.fill(chunkSize)('A').map(_.toByte)))
        .zipWithIndex
        .take(noChunks)
        .map { case (chunk, idx) =>
          println(s"Chunk ${idx + 1} sent ${java.time.LocalTime.now()}"); chunk
        }

    basicRequest
      .post(uri"http://localhost:9000/chunks")
      .header(Header(HeaderNames.ContentLength, givenLength.toString))
      .streamBody(PekkoStreams)(stream)
      .send(backend)

  override def run(args: List[String]): IO[ExitCode] =
    val backend = PekkoHttpBackend.usingActorSystem(actorSystem)
    val responseIO: IO[Response[Either[String, String]]] = IO.fromFuture(IO(makeRequest(backend)))
    responseIO.flatMap { response =>
      IO(println(response.body))
    }.as(ExitCode.Success)
examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala (112 additions, 0 deletions)
@@ -0,0 +1,112 @@
//> using dep com.softwaremill.sttp.tapir::tapir-core:1.11.11
//> using dep com.softwaremill.sttp.tapir::tapir-play-server:1.11.11
//> using dep org.playframework::play-netty-server:3.0.6
//> using dep com.softwaremill.sttp.client3::core:3.10.2
//> using dep org.slf4j:slf4j-simple:2.0.16

package sttp.tapir.examples.streaming

import play.core.server.*
import play.api.routing.Router.Routes
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.Materializer
import sttp.capabilities.pekko.PekkoStreams
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.*
import sttp.tapir.server.play.PlayServerInterpreter

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import sttp.model.{HeaderNames, MediaType, Part, StatusCode}
import sttp.tapir.*

import scala.concurrent.{ExecutionContext, Future}
import scala.util.*
import org.apache.pekko
import pekko.stream.scaladsl.{Flow, Source, Sink}
import pekko.util.ByteString
import sttp.tapir.server.play.PlayServerOptions

given ExecutionContext = ExecutionContext.global

type ErrorInfo = String

implicit val actorSystem: ActorSystem = ActorSystem("playServer")

def handleErrors[T](f: Future[T]): Future[Either[ErrorInfo, T]] =
  f.transform {
    case Success(v) => Success(Right(v))
    case Failure(e) =>
      println(s"Exception when running endpoint logic: $e")
      Success(Left(e.getMessage))
  }

def logic(s: (Long, Source[ByteString, Any])): Future[String] = {
  val (length, stream) = s
  println(s"Transmitting $length bytes...")
  val result = stream
    .runFold(List.empty[ByteString])((acc, byteS) => acc :+ byteS)
    .map(_.reduce(_ ++ _).decodeString("UTF-8"))
  result.onComplete {
    case Failure(ex) =>
      println(s"Stream failed with exception: $ex")
    case Success(s) =>
      println(s"Stream finished: ${s.length}/$length transmitted")
  }
  result
}

val e = endpoint.post
  .in("chunks")
  .in(header[Long](HeaderNames.ContentLength))
  .in(streamTextBody(PekkoStreams)(CodecFormat.TextPlain()))
  .out(stringBody)
  .errorOut(plainBody[ErrorInfo])
  .serverLogic(logic.andThen(handleErrors))

val routes = PlayServerInterpreter(
  PlayServerOptions
    .customiseInterceptors()
    .serverLog(PlayServerOptions.defaultServerLog.logWhenReceived(true).logAllDecodeFailures(true))
    .options
).toRoutes(e)

@main def playServer(): Unit =
  import play.api.Configuration
  import play.api.Mode
  import play.core.server.ServerConfig

  import java.io.File
  import java.util.Properties

  val customConfig = Configuration(
    "play.server.http.idleTimeout" -> "75 seconds",
    "play.server.https.idleTimeout" -> "75 seconds",
    "play.server.https.wantClientAuth" -> false,
    "play.server.https.needClientAuth" -> false,
    "play.server.netty.server-header" -> null,
    "play.server.netty.shutdownQuietPeriod" -> "2 seconds",
    "play.server.netty.maxInitialLineLength" -> "4096",
    "play.server.netty.maxChunkSize" -> "8192",
    "play.server.netty.eventLoopThreads" -> "0",
    "play.server.netty.transport" -> "jdk",
    "play.server.max-header-size" -> "8k",
    "play.server.waitBeforeTermination" -> "0",
    "play.server.deferBodyParsing" -> false,
    "play.server.websocket.frame.maxLength" -> "64k",
    "play.server.websocket.periodic-keep-alive-mode" -> "ping",
    "play.server.websocket.periodic-keep-alive-max-idle" -> "infinite",
    "play.server.max-content-length" -> "infinite",
    "play.server.netty.log.wire" -> true,
    "play.server.netty.option.child.SO_KEEPALIVE" -> false,
    "play.server.pekko.requestTimeout" -> "5 seconds",
  )
  val serverConfig = ServerConfig(
    rootDir = new File("."),
    port = Some(9000),
    sslPort = None,
    address = "0.0.0.0",
    mode = Mode.Dev,
    properties = System.getProperties,
    configuration = customConfig
  )

  NettyServer.fromRouterWithComponents(serverConfig) { components => routes }
PlayServerTest.scala
@@ -1,32 +1,43 @@
package sttp.tapir.server.play

import scala.concurrent.duration.{DurationInt, FiniteDuration}
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{Flow, Sink, Source}
import cats.data.NonEmptyList
import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global
import org.scalatest.matchers.should.Matchers._
import com.typesafe.config.ConfigFactory
import org.apache.pekko.util.ByteString
import org.scalatest.matchers.should.Matchers.{fail, _}
import play.api.{Configuration, Mode}
import play.api.http.ParserConfiguration
import play.api.routing.Router
import play.core.server.{DefaultPekkoHttpServerComponents, ServerConfig}
import sttp.capabilities.Streams
import sttp.capabilities.fs2.Fs2Streams
import sttp.capabilities.pekko.PekkoStreams
import sttp.client3._
import sttp.model.{MediaType, Part, StatusCode}
import sttp.model.{HeaderNames, MediaType, Part, StatusCode}
import sttp.monad.FutureMonad
import sttp.tapir._
import sttp.tapir.server.tests._
import sttp.tapir.tests.{Test, TestSuite}
import fs2.{Chunk, Stream}
import sttp.capabilities.fs2.Fs2Streams

import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration

class PlayServerTest extends TestSuite {

def actorSystemResource: Resource[IO, ActorSystem] =
Resource.make(IO.delay(ActorSystem()))(actorSystem => IO.fromFuture(IO.delay(actorSystem.terminate())).void)

override def tests: Resource[IO, List[Test]] = backendResource.flatMap { backend =>
actorSystemResource.map { implicit actorSystem =>
implicit val m: FutureMonad = new FutureMonad()(actorSystem.dispatcher)
actorSystemResource.map { implicit _actorSystem =>
implicit val m: FutureMonad = new FutureMonad()(_actorSystem.dispatcher)

val interpreter = new PlayTestServerInterpreter()(actorSystem)
val interpreter = new PlayTestServerInterpreter()(_actorSystem)
val createServerTest = new DefaultCreateServerTest(backend, interpreter)

def additionalTests(): List[Test] = List(
@@ -98,7 +109,61 @@ class PlayServerTest extends TestSuite {
}
}
.unsafeToFuture()
}
},
Test("chunked transmission lasts longer than given timeout") {
val chunkSize = 100
val beforeSendingSecondChunk: FiniteDuration = 2.second
val requestTimeout: FiniteDuration = 1.second

val e =
endpoint.post
.in(header[Long](HeaderNames.ContentLength))
.in(streamTextBody(PekkoStreams)(CodecFormat.TextPlain()))
.out(header[Long](HeaderNames.ContentLength))
.out(streamTextBody(PekkoStreams)(CodecFormat.TextPlain()))
.serverLogicSuccess[Future] { case (length, stream) =>
Future.successful((length, stream))
}

val components: DefaultPekkoHttpServerComponents = new DefaultPekkoHttpServerComponents {
val initialServerConfig: ServerConfig = ServerConfig(port = Some(0), address = "127.0.0.1", mode = Mode.Test)

val customConf: Configuration =
Configuration(
ConfigFactory.parseString(s"play { server.pekko.requestTimeout=${requestTimeout.toString} }")
)
override lazy val serverConfig: ServerConfig =
initialServerConfig.copy(configuration = customConf.withFallback(initialServerConfig.configuration))
override lazy val actorSystem: ActorSystem = ActorSystem("tapir", defaultExecutionContext = Some(_actorSystem.dispatcher))
override lazy val router: Router = Router.from(PlayServerInterpreter().toRoutes(e)).withPrefix("/chunks")
}

def createStream(chunkSize: Int, beforeSendingSecondChunk: FiniteDuration): Stream[IO, Byte] = {
val chunk = Chunk.array(Array.fill(chunkSize)('A'.toByte))
val initialChunks = Stream.chunk(chunk)
val delayedChunk = Stream.sleep[IO](beforeSendingSecondChunk) >> Stream.chunk(chunk)
initialChunks ++ delayedChunk
}

val inputStream = createStream(chunkSize, beforeSendingSecondChunk)

val bind = IO.blocking(components.server)
Resource.make(bind)(s => IO.blocking(s.stop()))
.map(_.mainAddress.getPort)
.use { port =>
basicRequest
.post(uri"http://localhost:$port/chunks")
.contentLength(2 * chunkSize)
.streamBody(Fs2Streams[IO])(inputStream)
.send(backend)
.map{ response =>
response.code shouldBe StatusCode.Ok
Member:
hm well if this test passes, something is wrong - we set the timeout to 1s, so we should never receive a response if it takes 2s to send it? unless the request timeout is for something else?

anyway, this doesn't test the scenario from the test case - where the transmission is interrupted half-way because of connection problems; I don't know if we can simulate this in a test case, but using a timeout is a good approximation. But probably a good way to check if we can reproduce the bug at all is to run: a long-running client sender process; a server process; then kill -9 the client process when it's half-way through sending the data, and see on the server side whether the incomplete data reached the server logic

Member Author:
I have given it another try as you suggested. There are the playServer and longLastingClient examples above, but I don't know what's wrong with that approach. Suggestions are welcome 💡

Member:
When you write "I don't know what's wrong with this approach ", do you mean that it works as expected (that is: you run both, interrupt the client brutally after some time, and the server properly closes the connection), or is there something else that's wrong?

Member Author (@sergiuszkierat, Dec 31, 2024):
I'm trying with the following steps:

  1. $ scala-cli run playServer.scala
     [playServer-pekko.actor.default-dispatcher-4] INFO org.apache.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started
     [application-pekko.actor.default-dispatcher-6] INFO org.apache.pekko.event.slf4j.Slf4jLogger - Slf4jLogger started
     [main] INFO play.api.Play - Application started (Dev) (no global state)
     [main] INFO play.core.server.NettyServer - Listening for HTTP on /0:0:0:0:0:0:0:0:9000
  2. $ scala-cli run longLastingClient.scala
     ByteString(65, 65, 65, 65, 65, 65, 65, 65, 65, 65) 17:46:14.514330
     ByteString(65, 65, 65, 65, 65, 65, 65, 65, 65, 65) 17:46:15.534314
     ByteString(65, 65, 65, 65, 65, 65, 65, 65, 65, 65) 17:46:16.553033
     ByteString(65, 65, 65, 65, 65, 65, 65, 65, 65, 65) 17:46:17.573135
     ....
  3. on the server side:
     Received 10000 bytes, Source(SourceShape(Map.out(1467551936))) bytes in total
  4. $ ps aux | grep longLastingClient | awk '{print $2}' | head -n 1 | xargs kill -9
  5. and nothing new (error/exception/whatever) on the server side 🤔

Member:
I think the lack of exception/error might be the problem here ;) but first there are two problems in the code:

  1. in the client code, you claim to send 10000 bytes in the content-length, but in fact you might send more?
  2. in the server code, you do stream.map(_.length), which just creates a Source[Long], that is, a description of a stream that produces the lengths of the received byte-strings (byte chunks). You never run (receive) the stream, and that's where you'd expect to see errors (when the stream is being run) - see the sketch below.
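
For illustration, a minimal sketch of that distinction (hypothetical helper names; assumes an implicit ActorSystem is in scope to provide the materializer):

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import org.apache.pekko.util.ByteString
import scala.concurrent.Future

// Mapping only builds another description; no bytes are pulled from the connection.
def describeOnly(stream: Source[ByteString, Any]): Source[Int, Any] =
  stream.map(_.length)

// Running the stream is what actually receives the data, and where a truncated
// upload would surface as a failed Future.
def actuallyReceive(stream: Source[ByteString, Any])(using system: ActorSystem): Future[Long] =
  stream.runWith(Sink.fold(0L)((acc, bytes) => acc + bytes.length))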

Member:
Thanks for the reproducing test case :). Though if you are closing the connection & socket on the client side, the server side receives a "data complete" signal (even though fewer bytes than declared arrived) and continues to process the request. So SimpleSubscriber.onComplete is called, which gathers all buffers into a byte array and passes it on for parsing. Hence I would say that the current behavior is at least somewhat correct. What would you expect instead?

As for the LEAK warning, that must be a bug somewhere, although unfortunately I can't reproduce it. I tried following your line of thought, but I can't see where the future is GCed before being fully handled?

Comment:
I would expect the received bytes to be skipped (not pushed downstream) and 400 Bad Request to be returned immediately.

Not sure if I can reproduce the LEAK with this sample code, but in my production code I can easily reproduce it by:

  1. repeating partial requests in a loop
  2. limiting the heap size for the server

After doing ~200-300 requests I get:

2025-04-22 09:32:38,452 ERROR[KQueueEventLoopGroup-2-12] i.n.u.ResourceLeakDetector - [ResourceLeakDetector.java:337]  LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
#1:
	Hint: 'HttpStreamsServerHandler#0-body-publisher' will handle the message from this point.
	io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:86)
	io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:25)
	io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:115)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
	org.playframework.netty.http.HttpStreamsHandler.handleReadHttpContent(HttpStreamsHandler.java:194)
	org.playframework.netty.http.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:165)
	org.playframework.netty.http.HttpStreamsServerHandler.channelRead(HttpStreamsServerHandler.java:96)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:107)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:359)
	io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1429)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
	io.netty.channel.kqueue.AbstractKQueueStreamChannel$KQueueStreamUnsafe.readReady(AbstractKQueueStreamChannel.java:543)
	io.netty.channel.kqueue.AbstractKQueueChannel$AbstractKQueueUnsafe.readEOF(AbstractKQueueChannel.java:552)
	io.netty.channel.kqueue.AbstractKQueueChannel$AbstractKQueueUnsafe.handle(AbstractKQueueChannel.java:435)
	io.netty.channel.kqueue.KQueueIoHandler$DefaultKqueueIoRegistration.handle(KQueueIoHandler.java:396)
	io.netty.channel.kqueue.KQueueIoHandler.processReady(KQueueIoHandler.java:181)
	io.netty.channel.kqueue.KQueueIoHandler.run(KQueueIoHandler.java:237)
	io.netty.channel.SingleThreadIoEventLoop.runIo(SingleThreadIoEventLoop.java:204)
	io.netty.channel.SingleThreadIoEventLoop.run(SingleThreadIoEventLoop.java:175)
	io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:1073)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:1583)
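
For reference, a minimal sketch of one way to send such truncated requests in a loop (hypothetical; the /chunks path, port 9000, and the request count are assumptions, not details of the production setup above):

import java.net.Socket

@main def truncatedRequests(): Unit =
  for i <- 1 to 300 do
    val socket = new Socket("localhost", 9000)
    val out = socket.getOutputStream
    // Declare far more bytes than will actually be sent.
    val headers =
      "POST /chunks HTTP/1.1\r\n" +
        "Host: localhost:9000\r\n" +
        "Content-Type: text/plain\r\n" +
        "Content-Length: 10000\r\n\r\n"
    out.write(headers.getBytes("US-ASCII"))
    out.write(Array.fill(100)('A'.toByte)) // only 100 of the declared 10000 bytes
    out.flush()
    socket.close() // abrupt close, simulating a client dying mid-transmission
    println(s"sent truncated request #$i")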

Member:
Thanks; could you maybe create two new issues: (1) return 400 if not all declared bytes are received, and (2) a leak when running many incomplete requests in Netty?

I think these are separate from the one discussed in this PR.

For (1) I'd also like to verify other servers, to see what the "living standard" is.

Comment:
I know akka/pekko does exactly this. When I call toStrictEntity, HttpMessageParser returns EntityStreamError(ErrorInfo("Entity stream truncation. The HTTP parser was receiving an entity when the underlying connection was closed unexpectedly.")).

toStrictEntity then throws IllegalRequestException(StatusCodes.BadRequest, info), which is handled inside ExceptionHandler.default.
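
For context, a hedged sketch of that pekko-http behaviour in isolation (a plain pekko-http route, not tapir; the dependency versions, port, path, and timeout are assumptions):

//> using dep org.apache.pekko::pekko-http:1.1.0
//> using dep org.apache.pekko::pekko-stream:1.1.2

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.server.Directives.*
import scala.concurrent.duration.*

@main def strictEntityServer(): Unit =
  given ActorSystem = ActorSystem("strict-entity")
  val route =
    path("chunks") {
      post {
        // Forces the whole entity to be read; if the connection closes before all
        // declared bytes arrive, the EntityStreamError surfaces and the default
        // exception handler answers with 400 Bad Request.
        toStrictEntity(3.seconds) {
          entity(as[String]) { body =>
            complete(s"received ${body.length} bytes")
          }
        }
      }
    }
  Http().newServerAt("0.0.0.0", 9000).bind(route)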

response.contentLength shouldBe Some(2 * chunkSize)
response.body shouldBe Right("A" * 2 * chunkSize)
}
}
.unsafeToFuture()
},
)

def drainPekko(stream: PekkoStreams.BinaryStream): Future[Unit] =
@@ -135,4 +200,6 @@ class PlayServerTest extends TestSuite {
additionalTests()
}
}

override def testNameFilter: Option[String] = Some("chunked transmission lasts longer than given timeout")
}