Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Support HTTP2 protocol + Kinesis integration test puzzle #830

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
3 changes: 3 additions & 0 deletions aws-spi-pekko-http/src/it/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# SPDX-License-Identifier: Apache-2.0

pekko.http.client.parsing.max-content-length = 15m
pekko.http.client.log-unencrypted-network-bytes = 1000
pekko.http.client.http2.log-frames = true
pekko.loglevel = "DEBUG"
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,37 @@ class KinesisITTest extends AnyWordSpec with Matchers with TestBase {

def withClient(testCode: KinesisAsyncClient => Any): Any = {

val pekkoClient = new PekkoHttpAsyncHttpService().createAsyncHttpClientFactory().build()
// TODO use pekkoClientBuilder instead of httpClient in other withClient methods
val pekkoClientBuilder = new PekkoHttpAsyncHttpService().createAsyncHttpClientFactory()

val client = KinesisAsyncClient
.builder()
.credentialsProvider(credentialProviderChain)
.region(defaultRegion)
.httpClient(pekkoClient)
.httpClientBuilder(pekkoClientBuilder)
.build()

try
testCode(client)
finally { // clean up
pekkoClient.close()
client.close()
}
}

"Kinesis async client" should {

"list streams" in withClient { implicit client =>
val result = client.listStreams().join()
result.streamNames() should not be null
}
"list streams in parallel" in withClient { implicit client =>
// if the number of requests is changed from 5 to 6, then the test will be stuck for 60s and then complete correctly
val x = for (_ <- 1 to 5) yield {
client.listStreams()
}
x.foreach(_.join().streamNames() should not be null)
}

"use a data stream: create + put + get + delete" in withClient { implicit client =>
val streamName = "aws-spi-test-" + Random.alphanumeric.take(10).filterNot(_.isUpper).mkString
val data = "123"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,54 +18,107 @@
package org.apache.pekko.stream.connectors.awsspi

import java.util.concurrent.{ CompletableFuture, TimeUnit }

import org.apache.pekko
import pekko.actor.{ ActorSystem, ClassicActorSystemProvider }
import pekko.http.scaladsl.Http
import pekko.http.scaladsl._
import pekko.http.scaladsl.model.HttpHeader.ParsingResult
import pekko.http.scaladsl.model.HttpHeader.ParsingResult.Ok
import pekko.http.scaladsl.model.MediaType.Compressible
import pekko.http.scaladsl.model.RequestEntityAcceptance.Expected
import pekko.http.scaladsl.model._
import pekko.http.scaladsl.model.headers.{ `Content-Length`, `Content-Type` }
import pekko.http.scaladsl.settings.ConnectionPoolSettings
import pekko.stream.scaladsl.Source
import pekko.stream.{ Materializer, SystemMaterializer }
import pekko.stream.scaladsl._
import pekko.stream.{ Materializer, OverflowStrategy, SystemMaterializer }
import pekko.util.ByteString
import pekko.util.OptionConverters
import pekko.util.OptionConverters._
import pekko.util.JavaDurationConverters._
import org.slf4j.LoggerFactory
import software.amazon.awssdk.http.async._
import software.amazon.awssdk.http.SdkHttpRequest
import software.amazon.awssdk.http.{ Protocol, SdkHttpConfigurationOption, SdkHttpRequest }
import software.amazon.awssdk.utils.AttributeMap

import java.security.SecureRandom
import java.security.cert.X509Certificate
import javax.net.ssl._
import scala.collection.immutable
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, ExecutionContext }
import scala.concurrent.{ Await, ExecutionContext, Future, Promise }

class PekkoHttpClient(shutdownHandle: () => Unit, connectionSettings: ConnectionPoolSettings)(implicit
class PekkoHttpClient(
shutdownHandle: () => Unit,
protocol: HttpProtocol,
private[awsspi] val connectionSettings: ConnectionPoolSettings,
private[awsspi] val connectionContext: HttpsConnectionContext
)(
implicit
actorSystem: ActorSystem,
ec: ExecutionContext,
mat: Materializer) extends SdkAsyncHttpClient {
import PekkoHttpClient._

lazy val runner = new RequestRunner()
private lazy val runner = new RequestRunner()
private lazy val http2connectionFlows =
new java.util.concurrent.ConcurrentHashMap[Uri, SourceQueueWithComplete[HttpRequest]]()

override def execute(request: AsyncExecuteRequest): CompletableFuture[Void] = {
val pekkoHttpRequest = toPekkoRequest(request.request(), request.requestContentPublisher())
runner.run(
() => Http().singleRequest(pekkoHttpRequest, settings = connectionSettings),
request.responseHandler())

logger.debug(s"Executing with protocol: $protocol")

if (protocol == HttpProtocols.`HTTP/2.0`) {
val useTls = request.request().protocol() == "https"
val akkaHttpRequest = toPekkoRequest(/*protocol, */ request.request(), request.requestContentPublisher())
val uri = akkaHttpRequest.effectiveUri(securedConnection = useTls)
val queue = http2connectionFlows.computeIfAbsent(uri,
_ => {
val baseConnection = Http()
.connectionTo(request.request().host())
.toPort(request.request().port())
.withCustomHttpsConnectionContext(connectionContext)
val http2client = request.request().protocol() match {
case "http" => baseConnection.managedPersistentHttp2WithPriorKnowledge()
case "https" => baseConnection.managedPersistentHttp2()
case _ => throw new IllegalArgumentException("Unsupported protocol")
}
Source
.queue[HttpRequest](4242, OverflowStrategy.fail)
.via(http2client)
.to(Sink.foreach { res =>
res.attribute(ResponsePromise.Key).get.promise.trySuccess(res)
})
.run()
})

val dispatch: HttpRequest => Future[HttpResponse] = req => {
val p = Promise[HttpResponse]()
queue.offer(req.addAttribute(ResponsePromise.Key, ResponsePromise(p))).flatMap(_ => p.future)
}

runner.run(
() => dispatch(akkaHttpRequest),
request.responseHandler()
)
} else {
runner.run(
() => {
val pekkoHttpRequest = toPekkoRequest(request.request(), request.requestContentPublisher())
Http().singleRequest(pekkoHttpRequest, settings = connectionSettings, connectionContext = connectionContext)
},
request.responseHandler())
}
}

override def close(): Unit =
override def close(): Unit = {
http2connectionFlows.values().iterator().forEachRemaining(_.complete())
shutdownHandle()
}

override def clientName(): String = "pekko-http"
}

object PekkoHttpClient {

val logger = LoggerFactory.getLogger(this.getClass)
private val logger = LoggerFactory.getLogger(this.getClass)

private[awsspi] def toPekkoRequest(request: SdkHttpRequest,
contentPublisher: SdkHttpContentPublisher): HttpRequest = {
Expand All @@ -84,8 +137,7 @@ object PekkoHttpClient {
contentType: ContentType,
contentPublisher: SdkHttpContentPublisher): RequestEntity =
method.requestEntityAcceptance match {
case Expected =>
OptionConverters.toScala(contentPublisher.contentLength()) match {
case Expected => contentPublisher.contentLength().toScala match {
case Some(length) =>
HttpEntity(contentType, length, Source.fromPublisher(contentPublisher).map(ByteString(_)))
case None => HttpEntity(contentType, Source.fromPublisher(contentPublisher).map(ByteString(_)))
Expand Down Expand Up @@ -151,34 +203,72 @@ object PekkoHttpClient {
else throw new RuntimeException(s"Could not parse custom content type '$contentTypeStr'.")
}

private[awsspi] def buildConnectionPoolSettings(
base: ConnectionPoolSettings, attributeMap: AttributeMap): ConnectionPoolSettings = {
def zeroToInfinite(duration: java.time.Duration): scala.concurrent.duration.Duration =
if (duration.isZero) scala.concurrent.duration.Duration.Inf
else duration.asScala

base
.withUpdatedConnectionSettings(s =>
s.withConnectingTimeout(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT).asScala)
.withIdleTimeout(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT).asScala))
.withMaxConnections(attributeMap.get(SdkHttpConfigurationOption.MAX_CONNECTIONS).intValue())
.withMaxConnectionLifetime(zeroToInfinite(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE)))
}

def builder() = PekkoHttpClientBuilder()

case class PekkoHttpClientBuilder(private val actorSystem: Option[ActorSystem] = None,
private val executionContext: Option[ExecutionContext] = None,
private val connectionPoolSettings: Option[ConnectionPoolSettings] = None)
private val connectionPoolSettings: Option[ConnectionPoolSettings] = None,
private val connectionPoolSettingsBuilder: (ConnectionPoolSettings, AttributeMap) => ConnectionPoolSettings =
(c, _) => c)
extends SdkAsyncHttpClient.Builder[PekkoHttpClientBuilder] {
def buildWithDefaults(attributeMap: AttributeMap): SdkAsyncHttpClient = {
def buildWithDefaults(serviceDefaults: AttributeMap): SdkAsyncHttpClient = {
implicit val as = actorSystem.getOrElse(ActorSystem("aws-pekko-http"))
implicit val ec = executionContext.getOrElse(as.dispatcher)
val mat: Materializer = SystemMaterializer(as).materializer

val cps = connectionPoolSettings.getOrElse(ConnectionPoolSettings(as))
println("serviceDefaults: " + serviceDefaults)

val resolvedOptions = serviceDefaults.merge(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS);

val protocol = toProtocol(resolvedOptions.get(SdkHttpConfigurationOption.PROTOCOL))

val cps = connectionPoolSettingsBuilder(
connectionPoolSettings.getOrElse(ConnectionPoolSettings(as)),
resolvedOptions
)

val connectionContext =
if (resolvedOptions.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES).booleanValue())
ConnectionContext.httpsClient(createInsecureSslEngine _)
else ConnectionContext.httpsClient(SSLContext.getDefault)

val shutdownhandleF = () => {
if (actorSystem.isEmpty) {
Await.result(Http().shutdownAllConnectionPools().flatMap(_ => as.terminate()),
Duration.apply(10, TimeUnit.SECONDS))
}
()
}
new PekkoHttpClient(shutdownhandleF, cps)(as, ec, mat)
new PekkoHttpClient(shutdownhandleF, protocol, cps, connectionContext)(as, ec, mat)
}

def withActorSystem(actorSystem: ActorSystem): PekkoHttpClientBuilder = copy(actorSystem = Some(actorSystem))
def withActorSystem(actorSystem: ClassicActorSystemProvider): PekkoHttpClientBuilder =
copy(actorSystem = Some(actorSystem.classicSystem))
def withExecutionContext(executionContext: ExecutionContext): PekkoHttpClientBuilder =
copy(executionContext = Some(executionContext))
def withConnectionPoolSettings(connectionPoolSettings: ConnectionPoolSettings): PekkoHttpClientBuilder =
copy(connectionPoolSettings = Some(connectionPoolSettings))
def withConnectionPoolSettingsBuilder(
connectionPoolSettingsBuilder: (ConnectionPoolSettings, AttributeMap) => ConnectionPoolSettings
): PekkoHttpClientBuilder =
copy(connectionPoolSettingsBuilder = connectionPoolSettingsBuilder)
def withConnectionPoolSettingsBuilderFromAttributeMap(): PekkoHttpClientBuilder =
copy(connectionPoolSettingsBuilder = buildConnectionPoolSettings)
}

lazy val xAmzJson = ContentType(MediaType.customBinary("application", "x-amz-json-1.0", Compressible))
Expand All @@ -195,4 +285,40 @@ object PekkoHttpClient {
"application/x-www-form-urlencoded; charset-UTF-8" -> formUrlEncoded,
"application/x-www-form-urlencoded" -> formUrlEncoded,
"application/xml" -> applicationXml)

private def toProtocol(protocol: Protocol): HttpProtocol = protocol match {
case Protocol.HTTP2 => HttpProtocols.`HTTP/2.0`
case Protocol.HTTP1_1 => HttpProtocols.`HTTP/1.1`
case _ => throw new IllegalArgumentException(s"Unsupported protocol: $protocol")
}

private def createInsecureSslEngine(host: String, port: Int): SSLEngine = {
val engine = createTrustfulSslContext().createSSLEngine(host, port)
engine.setUseClientMode(true)

// WARNING: this creates an SSL Engine without enabling endpoint identification/verification procedures
// Disabling host name verification is a very bad idea, please don't unless you have a very good reason to.
// When in doubt, use the `ConnectionContext.httpsClient` that takes an `SSLContext` instead, or enable with:
// engine.setSSLParameters({
// val params = engine.getSSLParameters
// params.setEndpointIdentificationAlgorithm("https")
// params
// })

engine
}

private def createTrustfulSslContext(): SSLContext = {
object NoCheckX509TrustManager extends X509TrustManager {
override def checkClientTrusted(chain: Array[X509Certificate], authType: String) = ()

override def checkServerTrusted(chain: Array[X509Certificate], authType: String) = ()

override def getAcceptedIssuers = Array[X509Certificate]()
}

val context = SSLContext.getInstance("TLS")
context.init(Array[KeyManager](), Array(NoCheckX509TrustManager), new SecureRandom())
context
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,16 @@ import pekko.http.scaladsl.model.headers.{ `Content-Length`, `Content-Type` }
import pekko.stream.Materializer
import pekko.stream.scaladsl.{ Keep, Sink }
import pekko.util.FutureConverters
import org.slf4j.LoggerFactory
import software.amazon.awssdk.http.SdkHttpFullResponse
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler

import scala.concurrent.{ ExecutionContext, Future }

class RequestRunner()(implicit ec: ExecutionContext, mat: Materializer) {

val logger = LoggerFactory.getLogger(this.getClass)

def run(runRequest: () => Future[HttpResponse], handler: SdkAsyncHttpResponseHandler): CompletableFuture[Void] = {
val result = runRequest().flatMap { response =>
// Future.unit.flatMap(expr) is a scala 2.12 equivalent of Future.delegate(expr)
val result = Future.unit.flatMap(_ => runRequest()).flatMap { response =>
handler.onHeaders(toSdkHttpFullResponse(response))

val (complete, publisher) = response.entity.dataBytes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.pekko.stream.connectors.awsspi.PekkoHttpAsyncHttpService;
import org.junit.Rule;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;
import org.testcontainers.containers.GenericContainer;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.core.ResponseBytes;
Expand All @@ -44,7 +43,7 @@

import static org.junit.Assert.assertEquals;

public class S3Test extends JUnitSuite {
public class S3Test {

private static final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
private static SecureRandom rnd = new SecureRandom();
Expand Down
2 changes: 1 addition & 1 deletion aws-spi-pekko-http/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<appender-ref ref="STDOUT" />
</root>

<logger name="io.netty" level="trace" additivity="false">
<logger name="io.netty" level="info" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
<logger name="com.typesafe" level="error" additivity="false">
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream.connectors.awsspi

import software.amazon.awssdk.http.{ SdkAsyncHttpClientH1TestSuite, SdkHttpConfigurationOption }
import software.amazon.awssdk.http.async.SdkAsyncHttpClient
import software.amazon.awssdk.utils.AttributeMap

class PekkoHttpClientH1TestSuite extends SdkAsyncHttpClientH1TestSuite {

override def setupClient(): SdkAsyncHttpClient = {
PekkoHttpClient.builder().buildWithDefaults(
AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, Boolean.box(true)).build());
}

// Failed tests
// The logic to not reuse connections on server error status is not implemented in PekkoHttpClient, and
// it seems that it is being reverted in https://github.com/aws/aws-sdk-java-v2/pull/5607
override def connectionReceiveServerErrorStatusShouldNotReuseConnection(): Unit = ()

}
Loading