From f1aad13333594b2418d53a06d86ca0291c10860b Mon Sep 17 00:00:00 2001 From: Sergiusz Kierat Date: Thu, 2 Jan 2025 12:56:00 +0100 Subject: [PATCH] fixes after review --- .../streaming/longLastingClient.scala | 18 ++++++++++++----- .../tapir/examples/streaming/playServer.scala | 20 +++++++++++++------ 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala b/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala index c94e859626..ba6d961d3e 100644 --- a/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala +++ b/examples/src/main/scala/sttp/tapir/examples/streaming/longLastingClient.scala @@ -2,7 +2,7 @@ //> using dep org.apache.pekko::pekko-stream:1.1.2 //> using dep org.typelevel::cats-effect:3.5.7 //> using dep com.softwaremill.sttp.client3::core:3.10.2 -//> using dep com.softwaremill.sttp.client3::pekko-http-backend:3.10.1 +//> using dep com.softwaremill.sttp.client3::pekko-http-backend:3.10.2 package sttp.tapir.examples.streaming @@ -28,14 +28,22 @@ import scala.concurrent.duration.FiniteDuration object longLastingClient extends IOApp: implicit val actorSystem: ActorSystem = ActorSystem("longLastingClient") + private val givenLength: Long = 10000 + private val chunkSize = 100 + private val noChunks = givenLength / chunkSize + private def makeRequest(backend: SttpBackend[Future, PekkoStreams & WebSockets]): Future[Response[Either[String, String]]] = - val stream: Source[ByteString, Any] = Source.tick(1.seconds, 1.seconds, ByteString(Array.fill(10)('A').map(_.toByte))).map { elem => - println(s"$elem ${java.time.LocalTime.now()}"); elem - } + val stream: Source[ByteString, Any] = + Source.tick(1.seconds, 1.seconds, ByteString(Array.fill(chunkSize)('A').map(_.toByte))) + .zipWithIndex + .take(noChunks) + .map { case (chunk, idx) => + println(s"Chunk ${idx + 1} sent ${java.time.LocalTime.now()}"); chunk + } basicRequest .post(uri"http://localhost:9000/chunks") - .header(Header(HeaderNames.ContentLength, "10000")) + .header(Header(HeaderNames.ContentLength, givenLength.toString)) .streamBody(PekkoStreams)(stream) .send(backend) diff --git a/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala b/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala index 756c4fafde..e281ed58c3 100644 --- a/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala +++ b/examples/src/main/scala/sttp/tapir/examples/streaming/playServer.scala @@ -23,7 +23,7 @@ import sttp.tapir.* import scala.concurrent.{ExecutionContext, Future} import scala.util.* import org.apache.pekko -import pekko.stream.scaladsl.{Flow, Source} +import pekko.stream.scaladsl.{Flow, Source, Sink} import pekko.util.ByteString import sttp.tapir.server.play.PlayServerOptions @@ -41,18 +41,26 @@ def handleErrors[T](f: Future[T]): Future[Either[ErrorInfo, T]] = Success(Left(e.getMessage)) } -def logic(s: (Long, Source[ByteString, Any])): Future[(Long, Source[ByteString, Any])] = { +def logic(s: (Long, Source[ByteString, Any])): Future[String] = { val (length, stream) = s - println(s"Received $length bytes, ${stream.map(_.length)} bytes in total") - Future.successful((length, stream)) + println(s"Transmitting $length bytes...") + val result = stream + .runFold(List.empty[ByteString])((acc, byteS) => acc :+ byteS) + .map(_.reduce(_ ++ _).decodeString("UTF-8")) + result.onComplete { + case Failure(ex) => + println(s"Stream failed with exception: $ex" ) + case Success(s) => + println(s"Stream finished: ${s.length}/$length transmitted") + } + result } val e = endpoint.post .in("chunks") .in(header[Long](HeaderNames.ContentLength)) .in(streamTextBody(PekkoStreams)(CodecFormat.TextPlain())) - .out(header[Long](HeaderNames.ContentLength)) - .out(streamTextBody(PekkoStreams)(CodecFormat.TextPlain())) + .out(stringBody) .errorOut(plainBody[ErrorInfo]) .serverLogic(logic.andThen(handleErrors))