Skip to content

Commit a3d00a6

Browse files
clintonpiLukasadnadobaglbrntt
authored
Add "debug initializer" hook for channels (#801)
Motivation: As requested in #596, it can be handy to have a lower-level access to channels (HTTP/1 connection, HTTP/2 connection, or HTTP/2 stream) to enable a more fine-grained interaction for, say, observability, testing, etc. Modifications: - Add 3 new properties (`http1_1ConnectionDebugInitializer`, `http2ConnectionDebugInitializer` and `http2StreamChannelDebugInitializer`) to `HTTPClient.Configuration` with access to the respective channels. These properties are of `Optional` type `@Sendable (Channel) -> EventLoopFuture<Void>` and are called when creating a connection/stream. Result: Provides APIs for a lower-level access to channels. --------- Co-authored-by: Cory Benfield <[email protected]> Co-authored-by: David Nadoba <[email protected]> Co-authored-by: George Barnett <[email protected]>
1 parent 373862a commit a3d00a6

File tree

5 files changed

+264
-9
lines changed

5 files changed

+264
-9
lines changed

Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift

+18-5
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ final class HTTP2Connection {
3535
let multiplexer: HTTP2StreamMultiplexer
3636
let logger: Logger
3737

38+
/// A method with access to the stream channel that is called when creating the stream.
39+
let streamChannelDebugInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)?
40+
3841
/// the connection pool that created the connection
3942
let delegate: HTTP2ConnectionDelegate
4043

@@ -95,7 +98,8 @@ final class HTTP2Connection {
9598
decompression: HTTPClient.Decompression,
9699
maximumConnectionUses: Int?,
97100
delegate: HTTP2ConnectionDelegate,
98-
logger: Logger
101+
logger: Logger,
102+
streamChannelDebugInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)? = nil
99103
) {
100104
self.channel = channel
101105
self.id = connectionID
@@ -114,6 +118,7 @@ final class HTTP2Connection {
114118
)
115119
self.delegate = delegate
116120
self.state = .initialized
121+
self.streamChannelDebugInitializer = streamChannelDebugInitializer
117122
}
118123

119124
deinit {
@@ -128,15 +133,17 @@ final class HTTP2Connection {
128133
delegate: HTTP2ConnectionDelegate,
129134
decompression: HTTPClient.Decompression,
130135
maximumConnectionUses: Int?,
131-
logger: Logger
136+
logger: Logger,
137+
streamChannelDebugInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)? = nil
132138
) -> EventLoopFuture<(HTTP2Connection, Int)> {
133139
let connection = HTTP2Connection(
134140
channel: channel,
135141
connectionID: connectionID,
136142
decompression: decompression,
137143
maximumConnectionUses: maximumConnectionUses,
138144
delegate: delegate,
139-
logger: logger
145+
logger: logger,
146+
streamChannelDebugInitializer: streamChannelDebugInitializer
140147
)
141148
return connection._start0().map { maxStreams in (connection, maxStreams) }
142149
}
@@ -259,8 +266,14 @@ final class HTTP2Connection {
259266
self.openStreams.remove(box)
260267
}
261268

262-
channel.write(request, promise: nil)
263-
return channel.eventLoop.makeSucceededVoidFuture()
269+
if let streamChannelDebugInitializer = self.streamChannelDebugInitializer {
270+
return streamChannelDebugInitializer(channel).map { _ in
271+
channel.write(request, promise: nil)
272+
}
273+
} else {
274+
channel.write(request, promise: nil)
275+
return channel.eventLoop.makeSucceededVoidFuture()
276+
}
264277
} catch {
265278
return channel.eventLoop.makeFailedFuture(error)
266279
}

Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Factory.swift

+38-3
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,19 @@ extension HTTPConnectionPool.ConnectionFactory {
8585
decompression: self.clientConfiguration.decompression,
8686
logger: logger
8787
)
88-
requester.http1ConnectionCreated(connection)
88+
89+
if let connectionDebugInitializer = self.clientConfiguration.http1_1ConnectionDebugInitializer {
90+
connectionDebugInitializer(channel).whenComplete { debugInitializerResult in
91+
switch debugInitializerResult {
92+
case .success:
93+
requester.http1ConnectionCreated(connection)
94+
case .failure(let error):
95+
requester.failedToCreateHTTPConnection(connectionID, error: error)
96+
}
97+
}
98+
} else {
99+
requester.http1ConnectionCreated(connection)
100+
}
89101
} catch {
90102
requester.failedToCreateHTTPConnection(connectionID, error: error)
91103
}
@@ -96,11 +108,34 @@ extension HTTPConnectionPool.ConnectionFactory {
96108
delegate: http2ConnectionDelegate,
97109
decompression: self.clientConfiguration.decompression,
98110
maximumConnectionUses: self.clientConfiguration.maximumUsesPerConnection,
99-
logger: logger
111+
logger: logger,
112+
streamChannelDebugInitializer:
113+
self.clientConfiguration.http2StreamChannelDebugInitializer
100114
).whenComplete { result in
101115
switch result {
102116
case .success((let connection, let maximumStreams)):
103-
requester.http2ConnectionCreated(connection, maximumStreams: maximumStreams)
117+
if let connectionDebugInitializer = self.clientConfiguration.http2ConnectionDebugInitializer {
118+
connectionDebugInitializer(channel).whenComplete {
119+
debugInitializerResult in
120+
switch debugInitializerResult {
121+
case .success:
122+
requester.http2ConnectionCreated(
123+
connection,
124+
maximumStreams: maximumStreams
125+
)
126+
case .failure(let error):
127+
requester.failedToCreateHTTPConnection(
128+
connectionID,
129+
error: error
130+
)
131+
}
132+
}
133+
} else {
134+
requester.http2ConnectionCreated(
135+
connection,
136+
maximumStreams: maximumStreams
137+
)
138+
}
104139
case .failure(let error):
105140
requester.failedToCreateHTTPConnection(connectionID, error: error)
106141
}

Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift

+3-1
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,9 @@ final class HTTPConnectionPool:
324324
connection.executeRequest(request.req)
325325

326326
case .executeRequests(let requests, let connection):
327-
for request in requests { connection.executeRequest(request.req) }
327+
for request in requests {
328+
connection.executeRequest(request.req)
329+
}
328330

329331
case .failRequest(let request, let error):
330332
request.req.fail(error)

Sources/AsyncHTTPClient/HTTPClient.swift

+35
Original file line numberDiff line numberDiff line change
@@ -847,6 +847,15 @@ public class HTTPClient {
847847
/// By default, don't use it
848848
public var enableMultipath: Bool
849849

850+
/// A method with access to the HTTP/1 connection channel that is called when creating the connection.
851+
public var http1_1ConnectionDebugInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)?
852+
853+
/// A method with access to the HTTP/2 connection channel that is called when creating the connection.
854+
public var http2ConnectionDebugInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)?
855+
856+
/// A method with access to the HTTP/2 stream channel that is called when creating the stream.
857+
public var http2StreamChannelDebugInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)?
858+
850859
public init(
851860
tlsConfiguration: TLSConfiguration? = nil,
852861
redirectConfiguration: RedirectConfiguration? = nil,
@@ -949,6 +958,32 @@ public class HTTPClient {
949958
decompression: decompression
950959
)
951960
}
961+
962+
public init(
963+
tlsConfiguration: TLSConfiguration? = nil,
964+
redirectConfiguration: RedirectConfiguration? = nil,
965+
timeout: Timeout = Timeout(),
966+
connectionPool: ConnectionPool = ConnectionPool(),
967+
proxy: Proxy? = nil,
968+
ignoreUncleanSSLShutdown: Bool = false,
969+
decompression: Decompression = .disabled,
970+
http1_1ConnectionDebugInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)? = nil,
971+
http2ConnectionDebugInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)? = nil,
972+
http2StreamChannelDebugInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)? = nil
973+
) {
974+
self.init(
975+
tlsConfiguration: tlsConfiguration,
976+
redirectConfiguration: redirectConfiguration,
977+
timeout: timeout,
978+
connectionPool: connectionPool,
979+
proxy: proxy,
980+
ignoreUncleanSSLShutdown: ignoreUncleanSSLShutdown,
981+
decompression: decompression
982+
)
983+
self.http1_1ConnectionDebugInitializer = http1_1ConnectionDebugInitializer
984+
self.http2ConnectionDebugInitializer = http2ConnectionDebugInitializer
985+
self.http2StreamChannelDebugInitializer = http2StreamChannelDebugInitializer
986+
}
952987
}
953988

954989
/// Specifies how `EventLoopGroup` will be created and establishes lifecycle ownership.

Tests/AsyncHTTPClientTests/HTTPClientTests.swift

+170
Original file line numberDiff line numberDiff line change
@@ -4436,4 +4436,174 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass {
44364436
request.setBasicAuth(username: "foo", password: "bar")
44374437
XCTAssertEqual(request.headers.first(name: "Authorization"), "Basic Zm9vOmJhcg==")
44384438
}
4439+
4440+
func runBaseTestForHTTP1ConnectionDebugInitializer(ssl: Bool) {
4441+
let connectionDebugInitializerUtil = CountingDebugInitializerUtil()
4442+
4443+
// Initializing even with just `http1_1ConnectionDebugInitializer` (rather than manually
4444+
// modifying `config`) to ensure that the matching `init` actually wires up this argument
4445+
// with the respective property. This is necessary as these parameters are defaulted and can
4446+
// be easy to miss.
4447+
var config = HTTPClient.Configuration(
4448+
http1_1ConnectionDebugInitializer: { channel in
4449+
connectionDebugInitializerUtil.initialize(channel: channel)
4450+
}
4451+
)
4452+
config.httpVersion = .http1Only
4453+
4454+
if ssl {
4455+
config.tlsConfiguration = .clientDefault
4456+
config.tlsConfiguration?.certificateVerification = .none
4457+
}
4458+
4459+
let higherConnectTimeout = CountingDebugInitializerUtil.duration + .milliseconds(100)
4460+
var configWithHigherTimeout = config
4461+
configWithHigherTimeout.timeout = .init(connect: higherConnectTimeout)
4462+
4463+
let clientWithHigherTimeout = HTTPClient(
4464+
eventLoopGroupProvider: .singleton,
4465+
configuration: configWithHigherTimeout,
4466+
backgroundActivityLogger: Logger(
4467+
label: "HTTPClient",
4468+
factory: StreamLogHandler.standardOutput(label:)
4469+
)
4470+
)
4471+
defer { XCTAssertNoThrow(try clientWithHigherTimeout.syncShutdown()) }
4472+
4473+
let bin = HTTPBin(.http1_1(ssl: ssl, compress: false))
4474+
defer { XCTAssertNoThrow(try bin.shutdown()) }
4475+
4476+
let scheme = ssl ? "https" : "http"
4477+
4478+
for _ in 0..<3 {
4479+
XCTAssertNoThrow(
4480+
try clientWithHigherTimeout.get(url: "\(scheme)://localhost:\(bin.port)/get").wait()
4481+
)
4482+
}
4483+
4484+
// Even though multiple requests were made, the connection debug initializer must be called
4485+
// only once.
4486+
XCTAssertEqual(connectionDebugInitializerUtil.executionCount, 1)
4487+
4488+
let lowerConnectTimeout = CountingDebugInitializerUtil.duration - .milliseconds(100)
4489+
var configWithLowerTimeout = config
4490+
configWithLowerTimeout.timeout = .init(connect: lowerConnectTimeout)
4491+
4492+
let clientWithLowerTimeout = HTTPClient(
4493+
eventLoopGroupProvider: .singleton,
4494+
configuration: configWithLowerTimeout,
4495+
backgroundActivityLogger: Logger(
4496+
label: "HTTPClient",
4497+
factory: StreamLogHandler.standardOutput(label:)
4498+
)
4499+
)
4500+
defer { XCTAssertNoThrow(try clientWithLowerTimeout.syncShutdown()) }
4501+
4502+
XCTAssertThrowsError(
4503+
try clientWithLowerTimeout.get(url: "\(scheme)://localhost:\(bin.port)/get").wait()
4504+
) {
4505+
XCTAssertEqual($0 as? HTTPClientError, .connectTimeout)
4506+
}
4507+
}
4508+
4509+
func testHTTP1PlainTextConnectionDebugInitializer() {
4510+
runBaseTestForHTTP1ConnectionDebugInitializer(ssl: false)
4511+
}
4512+
4513+
func testHTTP1EncryptedConnectionDebugInitializer() {
4514+
runBaseTestForHTTP1ConnectionDebugInitializer(ssl: true)
4515+
}
4516+
4517+
func testHTTP2ConnectionAndStreamChannelDebugInitializers() {
4518+
let connectionDebugInitializerUtil = CountingDebugInitializerUtil()
4519+
let streamChannelDebugInitializerUtil = CountingDebugInitializerUtil()
4520+
4521+
// Initializing even with just `http2ConnectionDebugInitializer` and
4522+
// `http2StreamChannelDebugInitializer` (rather than manually modifying `config`) to ensure
4523+
// that the matching `init` actually wires up these arguments with the respective
4524+
// properties. This is necessary as these parameters are defaulted and can be easy to miss.
4525+
var config = HTTPClient.Configuration(
4526+
http2ConnectionDebugInitializer: { channel in
4527+
connectionDebugInitializerUtil.initialize(channel: channel)
4528+
},
4529+
http2StreamChannelDebugInitializer: { channel in
4530+
streamChannelDebugInitializerUtil.initialize(channel: channel)
4531+
}
4532+
)
4533+
config.tlsConfiguration = .clientDefault
4534+
config.tlsConfiguration?.certificateVerification = .none
4535+
config.httpVersion = .automatic
4536+
4537+
let higherConnectTimeout = CountingDebugInitializerUtil.duration + .milliseconds(100)
4538+
var configWithHigherTimeout = config
4539+
configWithHigherTimeout.timeout = .init(connect: higherConnectTimeout)
4540+
4541+
let clientWithHigherTimeout = HTTPClient(
4542+
eventLoopGroupProvider: .singleton,
4543+
configuration: configWithHigherTimeout,
4544+
backgroundActivityLogger: Logger(
4545+
label: "HTTPClient",
4546+
factory: StreamLogHandler.standardOutput(label:)
4547+
)
4548+
)
4549+
defer { XCTAssertNoThrow(try clientWithHigherTimeout.syncShutdown()) }
4550+
4551+
let bin = HTTPBin(.http2(compress: false))
4552+
defer { XCTAssertNoThrow(try bin.shutdown()) }
4553+
4554+
let numberOfRequests = 3
4555+
4556+
for _ in 0..<numberOfRequests {
4557+
XCTAssertNoThrow(
4558+
try clientWithHigherTimeout.get(url: "https://localhost:\(bin.port)/get").wait()
4559+
)
4560+
}
4561+
4562+
// Even though multiple requests were made, the connection debug initializer must be called
4563+
// only once.
4564+
XCTAssertEqual(connectionDebugInitializerUtil.executionCount, 1)
4565+
4566+
// The stream channel debug initializer must be called only as much as the number of
4567+
// requests made.
4568+
XCTAssertEqual(streamChannelDebugInitializerUtil.executionCount, numberOfRequests)
4569+
4570+
let lowerConnectTimeout = CountingDebugInitializerUtil.duration - .milliseconds(100)
4571+
var configWithLowerTimeout = config
4572+
configWithLowerTimeout.timeout = .init(connect: lowerConnectTimeout)
4573+
4574+
let clientWithLowerTimeout = HTTPClient(
4575+
eventLoopGroupProvider: .singleton,
4576+
configuration: configWithLowerTimeout,
4577+
backgroundActivityLogger: Logger(
4578+
label: "HTTPClient",
4579+
factory: StreamLogHandler.standardOutput(label:)
4580+
)
4581+
)
4582+
defer { XCTAssertNoThrow(try clientWithLowerTimeout.syncShutdown()) }
4583+
4584+
XCTAssertThrowsError(
4585+
try clientWithLowerTimeout.get(url: "https://localhost:\(bin.port)/get").wait()
4586+
) {
4587+
XCTAssertEqual($0 as? HTTPClientError, .connectTimeout)
4588+
}
4589+
}
4590+
}
4591+
4592+
final class CountingDebugInitializerUtil: Sendable {
4593+
private let _executionCount = NIOLockedValueBox<Int>(0)
4594+
var executionCount: Int { self._executionCount.withLockedValue { $0 } }
4595+
4596+
/// The minimum time to spend running the debug initializer.
4597+
static let duration: TimeAmount = .milliseconds(300)
4598+
4599+
/// The actual debug initializer.
4600+
func initialize(channel: Channel) -> EventLoopFuture<Void> {
4601+
self._executionCount.withLockedValue { $0 += 1 }
4602+
4603+
let someScheduledTask = channel.eventLoop.scheduleTask(in: Self.duration) {
4604+
channel.eventLoop.makeSucceededVoidFuture()
4605+
}
4606+
4607+
return someScheduledTask.futureResult.flatMap { $0 }
4608+
}
44394609
}

0 commit comments

Comments
 (0)