Skip to content

Commit

Permalink
Zio support
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Dec 27, 2024
1 parent 68363ba commit 55f1b24
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import java.util.concurrent.Flow.Publisher
import java.{util => ju}
import scala.collection.JavaConverters._
import sttp.client4.compression.Compressor
import sttp.client4.httpclient.fs2.compression.{DeflateFs2Compressor, GZipFs2Compressor}

class HttpClientFs2Backend[F[_]: Async] private (
client: HttpClient,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sttp.client4.httpclient.fs2.compression
package sttp.client4.httpclient.fs2

import sttp.client4._
import sttp.client4.GenericRequestBody
Expand All @@ -17,12 +17,11 @@ trait Fs2Compressor[F[_], R <: Fs2Streams[F]] extends Compressor[R] {

override abstract def apply(body: GenericRequestBody[R], encoding: String): GenericRequestBody[R] =
body match {
case InputStreamBody(b, defaultContentType) =>
case InputStreamBody(b, _) =>
StreamBody(Fs2Streams[F])(compressStream(fs2.io.readInputStream(b.pure[F](fSync), 1024)(fSync)))
case StreamBody(b) => StreamBody(Fs2Streams[F])(compressStream(b.asInstanceOf[fs2.Stream[F, Byte]]))
case FileBody(f, defaultContentType) =>
StreamBody(Fs2Streams[F])(compressStream(Files[F](fFiles).readAll(f.toPath, 1024)))
case _ => super.apply(body, encoding)
case StreamBody(b) => StreamBody(Fs2Streams[F])(compressStream(b.asInstanceOf[fs2.Stream[F, Byte]]))
case FileBody(f, _) => StreamBody(Fs2Streams[F])(compressStream(Files[F](fFiles).readAll(f.toPath, 1024)))
case _ => super.apply(body, encoding)
}

def compressStream(stream: fs2.Stream[F, Byte]): fs2.Stream[F, Byte]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import sttp.client4.internal._
import sttp.client4.internal.httpclient.{BodyFromHttpClient, BodyToHttpClient, Sequencer}
import sttp.client4.internal.ws.SimpleQueue
import sttp.client4.testing.WebSocketStreamBackendStub
import sttp.client4.wrappers.FollowRedirectsBackend
import sttp.client4.{wrappers, BackendOptions, GenericRequest, Response, WebSocketStreamBackend}
import sttp.monad.MonadError
import zio.Chunk.ByteArray
Expand All @@ -25,6 +24,7 @@ import java.nio.ByteBuffer
import java.util
import java.util.concurrent.Flow.Publisher
import java.{util => ju}
import sttp.client4.compression.Compressor

class HttpClientZioBackend private (
client: HttpClient,
Expand Down Expand Up @@ -58,10 +58,11 @@ class HttpClientZioBackend private (
ByteArray(a, 0, a.length)
}

override protected val bodyToHttpClient: BodyToHttpClient[Task, ZioStreams] =
new BodyToHttpClient[Task, ZioStreams] {
override protected val bodyToHttpClient: BodyToHttpClient[Task, ZioStreams, R] =
new BodyToHttpClient[Task, ZioStreams, R] {
override val streams: ZioStreams = ZioStreams
override implicit def monad: MonadError[Task] = self.monad
override def compressors: List[Compressor[R]] = List(new GZipZioCompressor[R](), new DeflateZioCompressor[R]())
override def streamToPublisher(stream: ZStream[Any, Throwable, Byte]): Task[BodyPublisher] = {
import _root_.zio.interop.reactivestreams.{streamToPublisher => zioStreamToPublisher}
val publisher = stream.mapChunks(byteChunk => Chunk(ByteBuffer.wrap(byteChunk.toArray))).toPublisher
Expand All @@ -88,7 +89,7 @@ class HttpClientZioBackend private (
override protected def standardEncoding: (ZStream[Any, Throwable, Byte], String) => ZStream[Any, Throwable, Byte] = {
case (body, "gzip") => body.via(ZPipeline.gunzip())
case (body, "deflate") =>
ZStream.scoped(body.peel(ZSink.take[Byte](1))).flatMap { case (chunk, stream) =>
ZStream.scoped[Any](body.peel(ZSink.take[Byte](1))).flatMap { case (chunk, stream) =>
val wrapped = chunk.headOption.exists(byte => (byte & 0x0f) == 0x08)
(ZStream.fromChunk(chunk) ++ stream).via(ZPipeline.inflate(noWrap = !wrapped))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package sttp.client4.httpclient.zio

import sttp.client4._
import sttp.client4.compression.Compressor
import sttp.capabilities.zio.ZioStreams

import zio.stream.Stream
import sttp.client4.compression.GZipDefaultCompressor
import sttp.client4.compression.DeflateDefaultCompressor
import zio.stream.ZPipeline
import zio.stream.ZStream

trait ZioCompressor[R <: ZioStreams] extends Compressor[R] {
override abstract def apply(body: GenericRequestBody[R], encoding: String): GenericRequestBody[R] =
body match {
case InputStreamBody(b, _) => StreamBody(ZioStreams)(compressStream(ZStream.fromInputStream(b)))
case StreamBody(b) => StreamBody(ZioStreams)(compressStream(b.asInstanceOf[Stream[Throwable, Byte]]))
case FileBody(f, _) => StreamBody(ZioStreams)(compressStream(ZStream.fromFile(f.toFile)))
case _ => super.apply(body, encoding)
}

def compressStream(stream: Stream[Throwable, Byte]): Stream[Throwable, Byte]
}

class GZipZioCompressor[R <: ZioStreams] extends GZipDefaultCompressor[R] with ZioCompressor[R] {
def compressStream(stream: Stream[Throwable, Byte]): Stream[Throwable, Byte] = stream.via(ZPipeline.gzip())
}

class DeflateZioCompressor[R <: ZioStreams] extends DeflateDefaultCompressor[R] with ZioCompressor[R] {
def compressStream(stream: Stream[Throwable, Byte]): Stream[Throwable, Byte] = stream.via(ZPipeline.deflate())
}

0 comments on commit 55f1b24

Please sign in to comment.