From 1b7ce2e993c790cee95775e1f8443347d23fdbfd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Wed, 25 Sep 2024 10:53:45 +0100 Subject: [PATCH 01/13] withConnectionPoolSettingsBuilderFromAttributeMap --- .../connectors/awsspi/PekkoHttpClient.scala | 42 +++++++-- .../awsspi/PekkoHttpClientSpec.scala | 90 ++++++++++++++++++- 2 files changed, 126 insertions(+), 6 deletions(-) diff --git a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala index 9aacfd2b5..e2b2aaaea 100644 --- a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala +++ b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala @@ -35,14 +35,16 @@ import pekko.util.ByteString import pekko.util.OptionConverters import org.slf4j.LoggerFactory import software.amazon.awssdk.http.async._ -import software.amazon.awssdk.http.SdkHttpRequest +import software.amazon.awssdk.http.{ SdkHttpConfigurationOption, SdkHttpRequest } import software.amazon.awssdk.utils.AttributeMap import scala.collection.immutable import scala.concurrent.duration.Duration import scala.concurrent.{ Await, ExecutionContext } +import scala.jdk.DurationConverters._ -class PekkoHttpClient(shutdownHandle: () => Unit, connectionSettings: ConnectionPoolSettings)(implicit +class PekkoHttpClient(shutdownHandle: () => Unit, private[awsspi] val connectionSettings: ConnectionPoolSettings)( + implicit actorSystem: ActorSystem, ec: ExecutionContext, mat: Materializer) extends SdkAsyncHttpClient { @@ -151,18 +153,42 @@ object PekkoHttpClient { else throw new RuntimeException(s"Could not parse custom content type '$contentTypeStr'.") } + // based on NettyNioAsyncHttpClient and ApacheHttpClient + // https://github.com/search?q=repo%3Aaws%2Faws-sdk-java-v2+SdkHttpConfigurationOption+path%3A%2F%5Ehttp-clients%5C%2Fnetty-nio-client%5C%2Fsrc%5C%2Fmain%2F&type=code + // https://github.com/search?q=repo%3Aaws%2Faws-sdk-java-v2+SdkHttpConfigurationOption+path%3A%2F%5Ehttp-clients%5C%2Fapache-client%5C%2Fsrc%5C%2Fmain%2F&type=code + private[awsspi] def buildConnectionPoolSettings( + base: ConnectionPoolSettings, attributeMap: AttributeMap): ConnectionPoolSettings = { + def zeroToInfinite(duration: java.time.Duration): scala.concurrent.duration.Duration = + if (duration.isZero) scala.concurrent.duration.Duration.Inf + else duration.toScala + + base + .withUpdatedConnectionSettings(s => + s.withConnectingTimeout(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT).toScala) + .withIdleTimeout(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toScala)) + .withMaxConnections(attributeMap.get(SdkHttpConfigurationOption.MAX_CONNECTIONS).intValue()) + .withMaxConnectionLifetime(zeroToInfinite(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE))) + } + def builder() = PekkoHttpClientBuilder() case class PekkoHttpClientBuilder(private val actorSystem: Option[ActorSystem] = None, private val executionContext: Option[ExecutionContext] = None, - private val connectionPoolSettings: Option[ConnectionPoolSettings] = None) + private val connectionPoolSettings: Option[ConnectionPoolSettings] = None, + private val connectionPoolSettingsBuilder: (ConnectionPoolSettings, AttributeMap) => ConnectionPoolSettings = + (c, _) => c) extends SdkAsyncHttpClient.Builder[PekkoHttpClientBuilder] { - def buildWithDefaults(attributeMap: AttributeMap): SdkAsyncHttpClient = { + def buildWithDefaults(serviceDefaults: AttributeMap): SdkAsyncHttpClient = { implicit val as = actorSystem.getOrElse(ActorSystem("aws-pekko-http")) implicit val ec = executionContext.getOrElse(as.dispatcher) val mat: Materializer = SystemMaterializer(as).materializer - val cps = connectionPoolSettings.getOrElse(ConnectionPoolSettings(as)) + val resolvedOptions = serviceDefaults.merge(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS); + + val cps = connectionPoolSettingsBuilder( + connectionPoolSettings.getOrElse(ConnectionPoolSettings(as)), + resolvedOptions + ) val shutdownhandleF = () => { if (actorSystem.isEmpty) { Await.result(Http().shutdownAllConnectionPools().flatMap(_ => as.terminate()), @@ -179,6 +205,12 @@ object PekkoHttpClient { copy(executionContext = Some(executionContext)) def withConnectionPoolSettings(connectionPoolSettings: ConnectionPoolSettings): PekkoHttpClientBuilder = copy(connectionPoolSettings = Some(connectionPoolSettings)) + def withConnectionPoolSettingsBuilder( + connectionPoolSettingsBuilder: (ConnectionPoolSettings, AttributeMap) => ConnectionPoolSettings + ): PekkoHttpClientBuilder = + copy(connectionPoolSettingsBuilder = connectionPoolSettingsBuilder) + def withConnectionPoolSettingsBuilderFromAttributeMap(): PekkoHttpClientBuilder = + copy(connectionPoolSettingsBuilder = buildConnectionPoolSettings) } lazy val xAmzJson = ContentType(MediaType.customBinary("application", "x-amz-json-1.0", Compressible)) diff --git a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala index 6046aa1b6..3f2aca831 100644 --- a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala +++ b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala @@ -17,14 +17,21 @@ package org.apache.pekko.stream.connectors.awsspi -import java.util.Collections +import com.typesafe.config.ConfigFactory +import java.util.Collections import org.apache.pekko +import org.apache.pekko.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings} import pekko.http.scaladsl.model.headers.`Content-Type` import pekko.http.scaladsl.model.MediaTypes import org.scalatest.OptionValues import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec +import software.amazon.awssdk.http.SdkHttpConfigurationOption +import software.amazon.awssdk.utils.AttributeMap + +import scala.concurrent.duration._ +import scala.jdk.DurationConverters._ class PekkoHttpClientSpec extends AnyWordSpec with Matchers with OptionValues { @@ -47,5 +54,86 @@ class PekkoHttpClientSpec extends AnyWordSpec with Matchers with OptionValues { contentTypeHeader.value.lowercaseName() shouldBe `Content-Type`.lowercaseName reqHeaders should have size 1 } + "build() should use default ConnectionPoolSettings" in { + val pekkoClient: PekkoHttpClient = new PekkoHttpAsyncHttpService().createAsyncHttpClientFactory() + .build() + .asInstanceOf[PekkoHttpClient] + + pekkoClient.connectionSettings shouldBe ConnectionPoolSettings(ConfigFactory.load()) + } + + "withConnectionPoolSettingsBuilderFromAttributeMap().buildWithDefaults() should propagate configuration options" in { + val attributeMap = AttributeMap.builder() + .put(SdkHttpConfigurationOption.CONNECTION_TIMEOUT, 1.second.toJava) + .put(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT, 2.second.toJava) + .put(SdkHttpConfigurationOption.MAX_CONNECTIONS, Integer.valueOf(3)) + .put(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE, 4.second.toJava) + .build() + val pekkoClient: PekkoHttpClient = new PekkoHttpAsyncHttpService().createAsyncHttpClientFactory() + .withConnectionPoolSettingsBuilderFromAttributeMap() + .buildWithDefaults(attributeMap) + .asInstanceOf[PekkoHttpClient] + + pekkoClient.connectionSettings.connectionSettings.connectingTimeout shouldBe 1.second + pekkoClient.connectionSettings.connectionSettings.idleTimeout shouldBe 2.seconds + pekkoClient.connectionSettings.maxConnections shouldBe 3 + pekkoClient.connectionSettings.maxConnectionLifetime shouldBe 4.seconds + } + + "withConnectionPoolSettingsBuilderFromAttributeMap().build() should fallback to GLOBAL_HTTP_DEFAULTS" in { + val pekkoClient: PekkoHttpClient = new PekkoHttpAsyncHttpService().createAsyncHttpClientFactory() + .withConnectionPoolSettingsBuilderFromAttributeMap() + .build() + .asInstanceOf[PekkoHttpClient] + + pekkoClient.connectionSettings.connectionSettings.connectingTimeout shouldBe + SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT).toScala + pekkoClient.connectionSettings.connectionSettings.idleTimeout shouldBe + SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toScala + pekkoClient.connectionSettings.maxConnections shouldBe + SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.MAX_CONNECTIONS).intValue() + infiniteToZero(pekkoClient.connectionSettings.maxConnectionLifetime) shouldBe + SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE) + } + + "withConnectionPoolSettingsBuilder().build() should use passed connectionPoolSettings builder" in { + val connectionPoolSettings = ConnectionPoolSettings(ConfigFactory.load()) + .withConnectionSettings( + ClientConnectionSettings(ConfigFactory.load()) + .withConnectingTimeout(1.second) + .withIdleTimeout(2.seconds) + ) + .withMaxConnections(3) + .withMaxConnectionLifetime(4.seconds) + + val pekkoClient: PekkoHttpClient = new PekkoHttpAsyncHttpService().createAsyncHttpClientFactory() + .withConnectionPoolSettingsBuilder((_, _) => connectionPoolSettings) + .build() + .asInstanceOf[PekkoHttpClient] + + pekkoClient.connectionSettings shouldBe connectionPoolSettings + } + + "withConnectionPoolSettings().build() should use passed ConnectionPoolSettings" in { + val connectionPoolSettings = ConnectionPoolSettings(ConfigFactory.load()) + .withConnectionSettings( + ClientConnectionSettings(ConfigFactory.load()) + .withConnectingTimeout(1.second) + .withIdleTimeout(2.seconds) + ) + .withMaxConnections(3) + .withMaxConnectionLifetime(4.seconds) + val pekkoClient: PekkoHttpClient = new PekkoHttpAsyncHttpService().createAsyncHttpClientFactory() + .withConnectionPoolSettings(connectionPoolSettings) + .build() + .asInstanceOf[PekkoHttpClient] + + pekkoClient.connectionSettings shouldBe connectionPoolSettings + } + } + + private def infiniteToZero(duration: scala.concurrent.duration.Duration): java.time.Duration = duration match { + case _: scala.concurrent.duration.Duration.Infinite => java.time.Duration.ZERO + case duration: FiniteDuration => duration.toJava } } From 0d74f9070ea6b470b858802e784fd07ec594a01c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Wed, 25 Sep 2024 11:11:40 +0100 Subject: [PATCH 02/13] scalafmt tests --- .../connectors/awsspi/PekkoHttpClientSpec.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala index 3f2aca831..fe7b261a6 100644 --- a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala +++ b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala @@ -21,7 +21,7 @@ import com.typesafe.config.ConfigFactory import java.util.Collections import org.apache.pekko -import org.apache.pekko.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings} +import org.apache.pekko.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings } import pekko.http.scaladsl.model.headers.`Content-Type` import pekko.http.scaladsl.model.MediaTypes import org.scalatest.OptionValues @@ -87,13 +87,13 @@ class PekkoHttpClientSpec extends AnyWordSpec with Matchers with OptionValues { .asInstanceOf[PekkoHttpClient] pekkoClient.connectionSettings.connectionSettings.connectingTimeout shouldBe - SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT).toScala + SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT).toScala pekkoClient.connectionSettings.connectionSettings.idleTimeout shouldBe - SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toScala + SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toScala pekkoClient.connectionSettings.maxConnections shouldBe - SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.MAX_CONNECTIONS).intValue() + SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.MAX_CONNECTIONS).intValue() infiniteToZero(pekkoClient.connectionSettings.maxConnectionLifetime) shouldBe - SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE) + SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE) } "withConnectionPoolSettingsBuilder().build() should use passed connectionPoolSettings builder" in { @@ -134,6 +134,6 @@ class PekkoHttpClientSpec extends AnyWordSpec with Matchers with OptionValues { private def infiniteToZero(duration: scala.concurrent.duration.Duration): java.time.Duration = duration match { case _: scala.concurrent.duration.Duration.Infinite => java.time.Duration.ZERO - case duration: FiniteDuration => duration.toJava + case duration: FiniteDuration => duration.toJava } } From 9d83b878cf682c33649854aa98e2fc57bbba738b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Wed, 25 Sep 2024 11:21:15 +0100 Subject: [PATCH 03/13] remove comment --- .../pekko/stream/connectors/awsspi/PekkoHttpClient.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala index e2b2aaaea..0a9cfe698 100644 --- a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala +++ b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala @@ -153,9 +153,6 @@ object PekkoHttpClient { else throw new RuntimeException(s"Could not parse custom content type '$contentTypeStr'.") } - // based on NettyNioAsyncHttpClient and ApacheHttpClient - // https://github.com/search?q=repo%3Aaws%2Faws-sdk-java-v2+SdkHttpConfigurationOption+path%3A%2F%5Ehttp-clients%5C%2Fnetty-nio-client%5C%2Fsrc%5C%2Fmain%2F&type=code - // https://github.com/search?q=repo%3Aaws%2Faws-sdk-java-v2+SdkHttpConfigurationOption+path%3A%2F%5Ehttp-clients%5C%2Fapache-client%5C%2Fsrc%5C%2Fmain%2F&type=code private[awsspi] def buildConnectionPoolSettings( base: ConnectionPoolSettings, attributeMap: AttributeMap): ConnectionPoolSettings = { def zeroToInfinite(duration: java.time.Duration): scala.concurrent.duration.Duration = From 239857fc2e9bddcf50aba89e93c08fd26a6a010a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Wed, 25 Sep 2024 11:21:27 +0100 Subject: [PATCH 04/13] reorganize imports --- .../pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala index fe7b261a6..2308ff23e 100644 --- a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala +++ b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala @@ -17,13 +17,13 @@ package org.apache.pekko.stream.connectors.awsspi +import java.util.Collections import com.typesafe.config.ConfigFactory -import java.util.Collections import org.apache.pekko -import org.apache.pekko.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings } import pekko.http.scaladsl.model.headers.`Content-Type` import pekko.http.scaladsl.model.MediaTypes +import pekko.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings } import org.scalatest.OptionValues import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec From f3f04f8027aa362e10b89894ebc7bf83e9ea2bbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Wed, 25 Sep 2024 11:33:16 +0100 Subject: [PATCH 05/13] DurationConverters scala 2.12 --- .../stream/connectors/awsspi/PekkoHttpClient.scala | 13 ++++++------- .../connectors/awsspi/PekkoHttpClientSpec.scala | 14 +++++++------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala index 0a9cfe698..df57812e2 100644 --- a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala +++ b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala @@ -32,7 +32,8 @@ import pekko.http.scaladsl.settings.ConnectionPoolSettings import pekko.stream.scaladsl.Source import pekko.stream.{ Materializer, SystemMaterializer } import pekko.util.ByteString -import pekko.util.OptionConverters +import pekko.util.OptionConverters._ +import pekko.util.JavaDurationConverters._ import org.slf4j.LoggerFactory import software.amazon.awssdk.http.async._ import software.amazon.awssdk.http.{ SdkHttpConfigurationOption, SdkHttpRequest } @@ -41,7 +42,6 @@ import software.amazon.awssdk.utils.AttributeMap import scala.collection.immutable import scala.concurrent.duration.Duration import scala.concurrent.{ Await, ExecutionContext } -import scala.jdk.DurationConverters._ class PekkoHttpClient(shutdownHandle: () => Unit, private[awsspi] val connectionSettings: ConnectionPoolSettings)( implicit @@ -86,8 +86,7 @@ object PekkoHttpClient { contentType: ContentType, contentPublisher: SdkHttpContentPublisher): RequestEntity = method.requestEntityAcceptance match { - case Expected => - OptionConverters.toScala(contentPublisher.contentLength()) match { + case Expected => contentPublisher.contentLength().toScala match { case Some(length) => HttpEntity(contentType, length, Source.fromPublisher(contentPublisher).map(ByteString(_))) case None => HttpEntity(contentType, Source.fromPublisher(contentPublisher).map(ByteString(_))) @@ -157,12 +156,12 @@ object PekkoHttpClient { base: ConnectionPoolSettings, attributeMap: AttributeMap): ConnectionPoolSettings = { def zeroToInfinite(duration: java.time.Duration): scala.concurrent.duration.Duration = if (duration.isZero) scala.concurrent.duration.Duration.Inf - else duration.toScala + else duration.asScala base .withUpdatedConnectionSettings(s => - s.withConnectingTimeout(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT).toScala) - .withIdleTimeout(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toScala)) + s.withConnectingTimeout(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT).asScala) + .withIdleTimeout(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).asScala)) .withMaxConnections(attributeMap.get(SdkHttpConfigurationOption.MAX_CONNECTIONS).intValue()) .withMaxConnectionLifetime(zeroToInfinite(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE))) } diff --git a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala index 2308ff23e..3ae36ed21 100644 --- a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala +++ b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala @@ -24,6 +24,7 @@ import org.apache.pekko import pekko.http.scaladsl.model.headers.`Content-Type` import pekko.http.scaladsl.model.MediaTypes import pekko.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings } +import pekko.util.JavaDurationConverters._ import org.scalatest.OptionValues import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec @@ -31,7 +32,6 @@ import software.amazon.awssdk.http.SdkHttpConfigurationOption import software.amazon.awssdk.utils.AttributeMap import scala.concurrent.duration._ -import scala.jdk.DurationConverters._ class PekkoHttpClientSpec extends AnyWordSpec with Matchers with OptionValues { @@ -64,10 +64,10 @@ class PekkoHttpClientSpec extends AnyWordSpec with Matchers with OptionValues { "withConnectionPoolSettingsBuilderFromAttributeMap().buildWithDefaults() should propagate configuration options" in { val attributeMap = AttributeMap.builder() - .put(SdkHttpConfigurationOption.CONNECTION_TIMEOUT, 1.second.toJava) - .put(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT, 2.second.toJava) + .put(SdkHttpConfigurationOption.CONNECTION_TIMEOUT, 1.second.asJava) + .put(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT, 2.second.asJava) .put(SdkHttpConfigurationOption.MAX_CONNECTIONS, Integer.valueOf(3)) - .put(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE, 4.second.toJava) + .put(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE, 4.second.asJava) .build() val pekkoClient: PekkoHttpClient = new PekkoHttpAsyncHttpService().createAsyncHttpClientFactory() .withConnectionPoolSettingsBuilderFromAttributeMap() @@ -87,9 +87,9 @@ class PekkoHttpClientSpec extends AnyWordSpec with Matchers with OptionValues { .asInstanceOf[PekkoHttpClient] pekkoClient.connectionSettings.connectionSettings.connectingTimeout shouldBe - SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT).toScala + SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT).asScala pekkoClient.connectionSettings.connectionSettings.idleTimeout shouldBe - SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).toScala + SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).asScala pekkoClient.connectionSettings.maxConnections shouldBe SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.MAX_CONNECTIONS).intValue() infiniteToZero(pekkoClient.connectionSettings.maxConnectionLifetime) shouldBe @@ -134,6 +134,6 @@ class PekkoHttpClientSpec extends AnyWordSpec with Matchers with OptionValues { private def infiniteToZero(duration: scala.concurrent.duration.Duration): java.time.Duration = duration match { case _: scala.concurrent.duration.Duration.Infinite => java.time.Duration.ZERO - case duration: FiniteDuration => duration.toJava + case duration: FiniteDuration => duration.asJava } } From 9838a3226b891cd0a5562f22aa8baf45b93174bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Wed, 25 Sep 2024 16:27:41 +0100 Subject: [PATCH 06/13] SdkAsyncHttpClientH1TestSuite --- .../connectors/awsspi/PekkoHttpClient.scala | 52 +++++++++++++++++-- .../connectors/awsspi/{ => s3}/S3Test.java | 3 +- .../awsspi/PekkoHttpClientH1TestSuite.scala | 35 +++++++++++++ project/Dependencies.scala | 5 +- project/plugins.sbt | 2 + 5 files changed, 90 insertions(+), 7 deletions(-) rename aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/{ => s3}/S3Test.java (98%) create mode 100644 aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientH1TestSuite.scala diff --git a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala index df57812e2..5c68a60d6 100644 --- a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala +++ b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala @@ -21,7 +21,7 @@ import java.util.concurrent.{ CompletableFuture, TimeUnit } import org.apache.pekko import pekko.actor.{ ActorSystem, ClassicActorSystemProvider } -import pekko.http.scaladsl.Http +import pekko.http.scaladsl.{ ConnectionContext, Http, HttpsConnectionContext } import pekko.http.scaladsl.model.HttpHeader.ParsingResult import pekko.http.scaladsl.model.HttpHeader.ParsingResult.Ok import pekko.http.scaladsl.model.MediaType.Compressible @@ -39,11 +39,18 @@ import software.amazon.awssdk.http.async._ import software.amazon.awssdk.http.{ SdkHttpConfigurationOption, SdkHttpRequest } import software.amazon.awssdk.utils.AttributeMap +import java.security.SecureRandom +import java.security.cert.X509Certificate +import javax.net.ssl._ import scala.collection.immutable import scala.concurrent.duration.Duration import scala.concurrent.{ Await, ExecutionContext } -class PekkoHttpClient(shutdownHandle: () => Unit, private[awsspi] val connectionSettings: ConnectionPoolSettings)( +class PekkoHttpClient( + shutdownHandle: () => Unit, + private[awsspi] val connectionSettings: ConnectionPoolSettings, + private[awsspi] val connectionContext: HttpsConnectionContext +)( implicit actorSystem: ActorSystem, ec: ExecutionContext, @@ -55,7 +62,8 @@ class PekkoHttpClient(shutdownHandle: () => Unit, private[awsspi] val connection override def execute(request: AsyncExecuteRequest): CompletableFuture[Void] = { val pekkoHttpRequest = toPekkoRequest(request.request(), request.requestContentPublisher()) runner.run( - () => Http().singleRequest(pekkoHttpRequest, settings = connectionSettings), + () => + Http().singleRequest(pekkoHttpRequest, settings = connectionSettings, connectionContext = connectionContext), request.responseHandler()) } @@ -185,6 +193,12 @@ object PekkoHttpClient { connectionPoolSettings.getOrElse(ConnectionPoolSettings(as)), resolvedOptions ) + + val connectionContext = + if (resolvedOptions.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES)) + ConnectionContext.httpsClient(createInsecureSslEngine _) + else ConnectionContext.httpsClient(SSLContext.getDefault) + val shutdownhandleF = () => { if (actorSystem.isEmpty) { Await.result(Http().shutdownAllConnectionPools().flatMap(_ => as.terminate()), @@ -192,7 +206,7 @@ object PekkoHttpClient { } () } - new PekkoHttpClient(shutdownhandleF, cps)(as, ec, mat) + new PekkoHttpClient(shutdownhandleF, cps, connectionContext)(as, ec, mat) } def withActorSystem(actorSystem: ActorSystem): PekkoHttpClientBuilder = copy(actorSystem = Some(actorSystem)) def withActorSystem(actorSystem: ClassicActorSystemProvider): PekkoHttpClientBuilder = @@ -223,4 +237,34 @@ object PekkoHttpClient { "application/x-www-form-urlencoded; charset-UTF-8" -> formUrlEncoded, "application/x-www-form-urlencoded" -> formUrlEncoded, "application/xml" -> applicationXml) + + private def createInsecureSslEngine(host: String, port: Int): SSLEngine = { + val engine = createTrustfulSslContext().createSSLEngine(host, port) + engine.setUseClientMode(true) + + // WARNING: this creates an SSL Engine without enabling endpoint identification/verification procedures + // Disabling host name verification is a very bad idea, please don't unless you have a very good reason to. + // When in doubt, use the `ConnectionContext.httpsClient` that takes an `SSLContext` instead, or enable with: + // engine.setSSLParameters({ + // val params = engine.getSSLParameters + // params.setEndpointIdentificationAlgorithm("https") + // params + // }) + + engine + } + + private def createTrustfulSslContext(): SSLContext = { + object NoCheckX509TrustManager extends X509TrustManager { + override def checkClientTrusted(chain: Array[X509Certificate], authType: String) = () + + override def checkServerTrusted(chain: Array[X509Certificate], authType: String) = () + + override def getAcceptedIssuers = Array[X509Certificate]() + } + + val context = SSLContext.getInstance("TLS") + context.init(Array[KeyManager](), Array(NoCheckX509TrustManager), new SecureRandom()) + context + } } diff --git a/aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/S3Test.java b/aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/s3/S3Test.java similarity index 98% rename from aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/S3Test.java rename to aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/s3/S3Test.java index 528b285dd..3261e21ea 100644 --- a/aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/S3Test.java +++ b/aws-spi-pekko-http/src/test/java/org/apache/pekko/stream/connectors/awsspi/s3/S3Test.java @@ -21,7 +21,6 @@ import org.apache.pekko.stream.connectors.awsspi.PekkoHttpAsyncHttpService; import org.junit.Rule; import org.junit.Test; -import org.scalatestplus.junit.JUnitSuite; import org.testcontainers.containers.GenericContainer; import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; import software.amazon.awssdk.core.ResponseBytes; @@ -44,7 +43,7 @@ import static org.junit.Assert.assertEquals; -public class S3Test extends JUnitSuite { +public class S3Test { private static final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; private static SecureRandom rnd = new SecureRandom(); diff --git a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientH1TestSuite.scala b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientH1TestSuite.scala new file mode 100644 index 000000000..459dff2a9 --- /dev/null +++ b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientH1TestSuite.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.connectors.awsspi + +import software.amazon.awssdk.http.{ SdkAsyncHttpClientH1TestSuite, SdkHttpConfigurationOption } +import software.amazon.awssdk.http.async.SdkAsyncHttpClient +import software.amazon.awssdk.utils.AttributeMap + +class PekkoHttpClientH1TestSuite extends SdkAsyncHttpClientH1TestSuite { + + override def setupClient(): SdkAsyncHttpClient = { + PekkoHttpClient.builder().buildWithDefaults( + AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, Boolean.box(true)).build()); + } + + // Failed tests + override def naughtyHeaderCharactersDoNotGetToServer(): Unit = () + override def connectionReceiveServerErrorStatusShouldNotReuseConnection(): Unit = () + +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 137bb6b95..2d7ae2901 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -10,6 +10,7 @@ import sbt._ import Common.isScala3 import Keys._ +import com.github.sbt.junit.jupiter.sbt.Import.JupiterKeys object Dependencies { @@ -142,9 +143,11 @@ object Dependencies { ExclusionRule("software.amazon.awssdk", "netty-nio-client")), ("software.amazon.awssdk" % "s3" % AwsSdk2Version % "it,test").excludeAll( ExclusionRule("software.amazon.awssdk", "netty-nio-client")), + ("software.amazon.awssdk" % "http-client-tests" % AwsSdk2Version % "it,test").excludeAll( + ExclusionRule("software.amazon.awssdk", "netty-nio-client")), "com.dimafeng" %% "testcontainers-scala" % TestContainersScalaTestVersion % Test, + "com.github.sbt.junit" % "jupiter-interface" % JupiterKeys.jupiterVersion.value % Test, "org.scalatest" %% "scalatest" % ScalaTestVersion % "it,test", - "org.scalatestplus" %% "junit-4-13" % scalaTestScalaCheckVersion % "it,test", "ch.qos.logback" % "logback-classic" % LogbackVersion % "it,test")) val AwsLambda = Seq( diff --git a/project/plugins.sbt b/project/plugins.sbt index 37abc6f69..609ec7be5 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -31,3 +31,5 @@ addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.2") addSbtPlugin("org.apache.pekko" % "pekko-grpc-sbt-plugin" % "1.1.0-M1") // templating addSbtPlugin("com.github.sbt" % "sbt-boilerplate" % "0.7.0") +// Run JUnit 5 tests with sbt +addSbtPlugin("com.github.sbt.junit" % "sbt-jupiter-interface" % "0.13.0") From 136d1d3c68c024122a279d05ac5b643af0a2843a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Wed, 25 Sep 2024 16:41:39 +0100 Subject: [PATCH 07/13] fix naughtyHeaderCharactersDoNotGetToServer test exceptions in toPekkoRequest should be wrapped in a future failed --- .../pekko/stream/connectors/awsspi/PekkoHttpClient.scala | 7 ++++--- .../pekko/stream/connectors/awsspi/RequestRunner.scala | 5 +---- .../connectors/awsspi/PekkoHttpClientH1TestSuite.scala | 1 - 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala index 5c68a60d6..8f26362cb 100644 --- a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala +++ b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala @@ -60,10 +60,11 @@ class PekkoHttpClient( lazy val runner = new RequestRunner() override def execute(request: AsyncExecuteRequest): CompletableFuture[Void] = { - val pekkoHttpRequest = toPekkoRequest(request.request(), request.requestContentPublisher()) runner.run( - () => - Http().singleRequest(pekkoHttpRequest, settings = connectionSettings, connectionContext = connectionContext), + () => { + val pekkoHttpRequest = toPekkoRequest(request.request(), request.requestContentPublisher()) + Http().singleRequest(pekkoHttpRequest, settings = connectionSettings, connectionContext = connectionContext) + }, request.responseHandler()) } diff --git a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunner.scala b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunner.scala index fa1335bce..0be379ec4 100644 --- a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunner.scala +++ b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunner.scala @@ -26,7 +26,6 @@ import pekko.http.scaladsl.model.headers.{ `Content-Length`, `Content-Type` } import pekko.stream.Materializer import pekko.stream.scaladsl.{ Keep, Sink } import pekko.util.FutureConverters -import org.slf4j.LoggerFactory import software.amazon.awssdk.http.SdkHttpFullResponse import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler @@ -34,10 +33,8 @@ import scala.concurrent.{ ExecutionContext, Future } class RequestRunner()(implicit ec: ExecutionContext, mat: Materializer) { - val logger = LoggerFactory.getLogger(this.getClass) - def run(runRequest: () => Future[HttpResponse], handler: SdkAsyncHttpResponseHandler): CompletableFuture[Void] = { - val result = runRequest().flatMap { response => + val result = Future.delegate(runRequest()).flatMap { response => handler.onHeaders(toSdkHttpFullResponse(response)) val (complete, publisher) = response.entity.dataBytes diff --git a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientH1TestSuite.scala b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientH1TestSuite.scala index 459dff2a9..0eaf1f280 100644 --- a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientH1TestSuite.scala +++ b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientH1TestSuite.scala @@ -29,7 +29,6 @@ class PekkoHttpClientH1TestSuite extends SdkAsyncHttpClientH1TestSuite { } // Failed tests - override def naughtyHeaderCharactersDoNotGetToServer(): Unit = () override def connectionReceiveServerErrorStatusShouldNotReuseConnection(): Unit = () } From 56dc24550d56ff8b6992b150a329ad7479ca9d81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Wed, 25 Sep 2024 16:53:33 +0100 Subject: [PATCH 08/13] add comment about connectionReceiveServerErrorStatusShouldNotReuseConnection --- .../stream/connectors/awsspi/PekkoHttpClientH1TestSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientH1TestSuite.scala b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientH1TestSuite.scala index 0eaf1f280..06148812b 100644 --- a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientH1TestSuite.scala +++ b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientH1TestSuite.scala @@ -29,6 +29,8 @@ class PekkoHttpClientH1TestSuite extends SdkAsyncHttpClientH1TestSuite { } // Failed tests + // The logic to not reuse connections on server error status is not implemented in PekkoHttpClient, and + // it seems that it is being reverted in https://github.com/aws/aws-sdk-java-v2/pull/5607 override def connectionReceiveServerErrorStatusShouldNotReuseConnection(): Unit = () } From 3aae0a2cc2e27ae9e1df586d79233a81f63bdd34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Wed, 25 Sep 2024 18:36:49 +0100 Subject: [PATCH 09/13] scala 2.12 strikes again --- .../pekko/stream/connectors/awsspi/PekkoHttpClient.scala | 2 +- .../apache/pekko/stream/connectors/awsspi/RequestRunner.scala | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala index 8f26362cb..b3ad36746 100644 --- a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala +++ b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala @@ -196,7 +196,7 @@ object PekkoHttpClient { ) val connectionContext = - if (resolvedOptions.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES)) + if (resolvedOptions.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES).booleanValue()) ConnectionContext.httpsClient(createInsecureSslEngine _) else ConnectionContext.httpsClient(SSLContext.getDefault) diff --git a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunner.scala b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunner.scala index 0be379ec4..ac01b0edb 100644 --- a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunner.scala +++ b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/RequestRunner.scala @@ -34,7 +34,8 @@ import scala.concurrent.{ ExecutionContext, Future } class RequestRunner()(implicit ec: ExecutionContext, mat: Materializer) { def run(runRequest: () => Future[HttpResponse], handler: SdkAsyncHttpResponseHandler): CompletableFuture[Void] = { - val result = Future.delegate(runRequest()).flatMap { response => + // Future.unit.flatMap(expr) is a scala 2.12 equivalent of Future.delegate(expr) + val result = Future.unit.flatMap(_ => runRequest()).flatMap { response => handler.onHeaders(toSdkHttpFullResponse(response)) val (complete, publisher) = response.entity.dataBytes From 7c7240b650826a78cd79b6bae48b8aea2c55013f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Wed, 25 Sep 2024 18:39:48 +0100 Subject: [PATCH 10/13] silence netty logs SdkAsyncHttpClientH1TestSuite uses netty to simulate a server... --- aws-spi-pekko-http/src/test/resources/logback-test.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws-spi-pekko-http/src/test/resources/logback-test.xml b/aws-spi-pekko-http/src/test/resources/logback-test.xml index 5d1d29de5..bf6018724 100644 --- a/aws-spi-pekko-http/src/test/resources/logback-test.xml +++ b/aws-spi-pekko-http/src/test/resources/logback-test.xml @@ -15,7 +15,7 @@ - + From 7aa78c3ea6b9f6e9573014c4dbe578e7246f14f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Wed, 25 Sep 2024 19:54:05 +0100 Subject: [PATCH 11/13] Kinesis integration test puzzle --- .../src/it/resources/application.conf | 2 + .../awsspi/kinesis/KinesisITTest.scala | 18 ++++- .../connectors/awsspi/PekkoHttpClient.scala | 81 +++++++++++++++---- 3 files changed, 83 insertions(+), 18 deletions(-) diff --git a/aws-spi-pekko-http/src/it/resources/application.conf b/aws-spi-pekko-http/src/it/resources/application.conf index 8a2cb9651..9c91829f7 100644 --- a/aws-spi-pekko-http/src/it/resources/application.conf +++ b/aws-spi-pekko-http/src/it/resources/application.conf @@ -1,3 +1,5 @@ # SPDX-License-Identifier: Apache-2.0 pekko.http.client.parsing.max-content-length = 15m +pekko.http.client.log-unencrypted-network-bytes = 1000 +pekko.loglevel = "DEBUG" diff --git a/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/kinesis/KinesisITTest.scala b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/kinesis/KinesisITTest.scala index 348987b62..ae6d7ad47 100644 --- a/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/kinesis/KinesisITTest.scala +++ b/aws-spi-pekko-http/src/it/scala/org/apache/pekko/stream/connectors/awsspi/kinesis/KinesisITTest.scala @@ -31,25 +31,37 @@ class KinesisITTest extends AnyWordSpec with Matchers with TestBase { def withClient(testCode: KinesisAsyncClient => Any): Any = { - val pekkoClient = new PekkoHttpAsyncHttpService().createAsyncHttpClientFactory().build() + // TODO use pekkoClientBuilder instead of httpClient in other withClient methods + val pekkoClientBuilder = new PekkoHttpAsyncHttpService().createAsyncHttpClientFactory() val client = KinesisAsyncClient .builder() .credentialsProvider(credentialProviderChain) .region(defaultRegion) - .httpClient(pekkoClient) + .httpClientBuilder(pekkoClientBuilder) .build() try testCode(client) finally { // clean up - pekkoClient.close() client.close() } } "Kinesis async client" should { + "list streams" in withClient { implicit client => + val result = client.listStreams().join() + result.streamNames() should not be null + } + "list streams in parallel" in withClient { implicit client => + // if the number of requests is changed from 5 to 6, then the test will be stuck for 60s and then complete correctly + val x = for (_ <- 1 to 5) yield { + client.listStreams() + } + x.foreach(_.join().streamNames() should not be null) + } + "use a data stream: create + put + get + delete" in withClient { implicit client => val streamName = "aws-spi-test-" + Random.alphanumeric.take(10).filterNot(_.isUpper).mkString val data = "123" diff --git a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala index b3ad36746..719b88b31 100644 --- a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala +++ b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala @@ -18,10 +18,9 @@ package org.apache.pekko.stream.connectors.awsspi import java.util.concurrent.{ CompletableFuture, TimeUnit } - import org.apache.pekko import pekko.actor.{ ActorSystem, ClassicActorSystemProvider } -import pekko.http.scaladsl.{ ConnectionContext, Http, HttpsConnectionContext } +import pekko.http.scaladsl._ import pekko.http.scaladsl.model.HttpHeader.ParsingResult import pekko.http.scaladsl.model.HttpHeader.ParsingResult.Ok import pekko.http.scaladsl.model.MediaType.Compressible @@ -29,14 +28,14 @@ import pekko.http.scaladsl.model.RequestEntityAcceptance.Expected import pekko.http.scaladsl.model._ import pekko.http.scaladsl.model.headers.{ `Content-Length`, `Content-Type` } import pekko.http.scaladsl.settings.ConnectionPoolSettings -import pekko.stream.scaladsl.Source -import pekko.stream.{ Materializer, SystemMaterializer } +import pekko.stream.scaladsl._ +import pekko.stream.{ Materializer, OverflowStrategy, SystemMaterializer } import pekko.util.ByteString import pekko.util.OptionConverters._ import pekko.util.JavaDurationConverters._ import org.slf4j.LoggerFactory import software.amazon.awssdk.http.async._ -import software.amazon.awssdk.http.{ SdkHttpConfigurationOption, SdkHttpRequest } +import software.amazon.awssdk.http.{ Protocol, SdkHttpConfigurationOption, SdkHttpRequest } import software.amazon.awssdk.utils.AttributeMap import java.security.SecureRandom @@ -44,10 +43,11 @@ import java.security.cert.X509Certificate import javax.net.ssl._ import scala.collection.immutable import scala.concurrent.duration.Duration -import scala.concurrent.{ Await, ExecutionContext } +import scala.concurrent.{ Await, ExecutionContext, Future, Promise } class PekkoHttpClient( shutdownHandle: () => Unit, + protocol: HttpProtocol, private[awsspi] val connectionSettings: ConnectionPoolSettings, private[awsspi] val connectionContext: HttpsConnectionContext )( @@ -57,15 +57,55 @@ class PekkoHttpClient( mat: Materializer) extends SdkAsyncHttpClient { import PekkoHttpClient._ - lazy val runner = new RequestRunner() + private lazy val runner = new RequestRunner() + private lazy val http2connectionFlows = + new java.util.concurrent.ConcurrentHashMap[Uri, SourceQueueWithComplete[HttpRequest]]() override def execute(request: AsyncExecuteRequest): CompletableFuture[Void] = { - runner.run( - () => { - val pekkoHttpRequest = toPekkoRequest(request.request(), request.requestContentPublisher()) - Http().singleRequest(pekkoHttpRequest, settings = connectionSettings, connectionContext = connectionContext) - }, - request.responseHandler()) + + logger.debug(s"Executing with protocol: $protocol") + + if (protocol == HttpProtocols.`HTTP/2.0`) { + val useTls = request.request().protocol() == "https" + val akkaHttpRequest = toPekkoRequest(/*protocol, */ request.request(), request.requestContentPublisher()) + val uri = akkaHttpRequest.effectiveUri(securedConnection = useTls) + val queue = http2connectionFlows.computeIfAbsent(uri, + _ => { + val baseConnection = Http() + .connectionTo(request.request().host()) + .toPort(request.request().port()) + .withCustomHttpsConnectionContext(connectionContext) + val http2client = request.request().protocol() match { + case "http" => baseConnection.managedPersistentHttp2WithPriorKnowledge() + case "https" => baseConnection.managedPersistentHttp2() + case _ => throw new IllegalArgumentException("Unsupported protocol") + } + Source + .queue[HttpRequest](4242, OverflowStrategy.fail) + .via(http2client) + .to(Sink.foreach { res => + res.attribute(ResponsePromise.Key).get.promise.trySuccess(res) + }) + .run() + }) + + val dispatch: HttpRequest => Future[HttpResponse] = req => { + val p = Promise[HttpResponse]() + queue.offer(req.addAttribute(ResponsePromise.Key, ResponsePromise(p))).flatMap(_ => p.future) + } + + runner.run( + () => dispatch(akkaHttpRequest), + request.responseHandler() + ) + } else { + runner.run( + () => { + val pekkoHttpRequest = toPekkoRequest(request.request(), request.requestContentPublisher()) + Http().singleRequest(pekkoHttpRequest, settings = connectionSettings, connectionContext = connectionContext) + }, + request.responseHandler()) + } } override def close(): Unit = @@ -76,7 +116,7 @@ class PekkoHttpClient( object PekkoHttpClient { - val logger = LoggerFactory.getLogger(this.getClass) + private val logger = LoggerFactory.getLogger(this.getClass) private[awsspi] def toPekkoRequest(request: SdkHttpRequest, contentPublisher: SdkHttpContentPublisher): HttpRequest = { @@ -188,8 +228,12 @@ object PekkoHttpClient { implicit val ec = executionContext.getOrElse(as.dispatcher) val mat: Materializer = SystemMaterializer(as).materializer + println("serviceDefaults: " + serviceDefaults) + val resolvedOptions = serviceDefaults.merge(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS); + val protocol = toProtocol(resolvedOptions.get(SdkHttpConfigurationOption.PROTOCOL)) + val cps = connectionPoolSettingsBuilder( connectionPoolSettings.getOrElse(ConnectionPoolSettings(as)), resolvedOptions @@ -207,8 +251,9 @@ object PekkoHttpClient { } () } - new PekkoHttpClient(shutdownhandleF, cps, connectionContext)(as, ec, mat) + new PekkoHttpClient(shutdownhandleF, protocol, cps, connectionContext)(as, ec, mat) } + def withActorSystem(actorSystem: ActorSystem): PekkoHttpClientBuilder = copy(actorSystem = Some(actorSystem)) def withActorSystem(actorSystem: ClassicActorSystemProvider): PekkoHttpClientBuilder = copy(actorSystem = Some(actorSystem.classicSystem)) @@ -239,6 +284,12 @@ object PekkoHttpClient { "application/x-www-form-urlencoded" -> formUrlEncoded, "application/xml" -> applicationXml) + private def toProtocol(protocol: Protocol): HttpProtocol = protocol match { + case Protocol.HTTP2 => HttpProtocols.`HTTP/2.0` + case Protocol.HTTP1_1 => HttpProtocols.`HTTP/1.1` + case _ => throw new IllegalArgumentException(s"Unsupported protocol: $protocol") + } + private def createInsecureSslEngine(host: String, port: Int): SSLEngine = { val engine = createTrustfulSslContext().createSSLEngine(host, port) engine.setUseClientMode(true) From 56a14934066714a1eaec2c383559b5ecc786ee42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Wed, 25 Sep 2024 20:11:24 +0100 Subject: [PATCH 12/13] close http2connectionFlows --- .../pekko/stream/connectors/awsspi/PekkoHttpClient.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala index 719b88b31..c0f29afed 100644 --- a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala +++ b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala @@ -108,8 +108,10 @@ class PekkoHttpClient( } } - override def close(): Unit = + override def close(): Unit = { + http2connectionFlows.values().iterator().forEachRemaining(_.complete()) shutdownHandle() + } override def clientName(): String = "pekko-http" } From f2ab11c088e0f08e3e6e39063f258eeb2027033a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Thu, 26 Sep 2024 15:09:35 +0100 Subject: [PATCH 13/13] enable http2.log-frames --- aws-spi-pekko-http/src/it/resources/application.conf | 1 + 1 file changed, 1 insertion(+) diff --git a/aws-spi-pekko-http/src/it/resources/application.conf b/aws-spi-pekko-http/src/it/resources/application.conf index 9c91829f7..a21b38f23 100644 --- a/aws-spi-pekko-http/src/it/resources/application.conf +++ b/aws-spi-pekko-http/src/it/resources/application.conf @@ -2,4 +2,5 @@ pekko.http.client.parsing.max-content-length = 15m pekko.http.client.log-unencrypted-network-bytes = 1000 +pekko.http.client.http2.log-frames = true pekko.loglevel = "DEBUG"