Skip to content

Commit 0167ae7

Browse files
committed
Use ALPN to determine what sort of connection we've accepted
Motivation: To determine whether we're dealing with gRPC or gRPC Web (i.e. HTTP/2 or HTTP/1) the first bytes received on a new connection are parsed. When TLS is enabled and ALPN is supported there's no need to do this: we will be informed about the protocol we negotiated with our peer and therefore how to configure the channel. We also never enforced that the protocol negotiated via ALPN was used! Modifications: - Add a 'GRPCServerPipelineConfigurator' handler to supersede the 'HTTPProtocolSwitcher'. The new handler will use either ALPN and fall back to parsing the incoming bytes. - The parsing behavior is slightly different to that of 'HTTPProtocolSwitcher' in that we only read off enough bytes for the HTTP/2 connection preface, or enough bytes to parse an HTTP/1 request line (rather than reading the whole buffer as a string and processing that). - Added a server configuration option to disable ALPN being required. - Added tests. Result: - Server pipeline configuration is driven by ALPN, falling back to parsing. - ALPN is enforced on the server (partially resolving grpc#1042)
1 parent 69c828b commit 0167ae7

10 files changed

+701
-291
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
/*
2+
* Copyright 2020, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
import Logging
17+
import NIO
18+
import NIOHTTP1
19+
import NIOHTTP2
20+
import NIOTLS
21+
22+
/// Configures a server pipeline for gRPC with the appropriate handlers depending on the HTTP
23+
/// version used for transport.
24+
///
25+
/// If TLS is enabled then the handler listens for an 'TLSUserEvent.handshakeCompleted' event and
26+
/// configures the pipeline appropriately for the protocol negotiated via ALPN. If TLS is not
27+
/// configured then the HTTP version is determined by parsing the inbound byte stream.
28+
final class GRPCServerPipelineConfigurator: ChannelInboundHandler, RemovableChannelHandler {
29+
internal typealias InboundIn = ByteBuffer
30+
internal typealias InboundOut = ByteBuffer
31+
32+
/// The server configuration.
33+
private let configuration: Server.Configuration
34+
35+
/// Reads which we're holding on to before the pipeline is configured.
36+
private var bufferedReads = CircularBuffer<NIOAny>()
37+
38+
/// The current state.
39+
private var state: State = .notConfigured
40+
41+
private enum State {
42+
/// The pipeline isn't configured yet.
43+
case notConfigured
44+
/// We're configuring the pipeline.
45+
case configuring
46+
}
47+
48+
init(configuration: Server.Configuration) {
49+
self.configuration = configuration
50+
}
51+
52+
/// Makes a gRPC Server keepalive handler.
53+
private func makeKeepaliveHandler() -> GRPCServerKeepaliveHandler {
54+
return .init(configuration: self.configuration.connectionKeepalive)
55+
}
56+
57+
/// Makes a gRPC idle handler for the server..
58+
private func makeIdleHandler() -> GRPCIdleHandler {
59+
return .init(
60+
mode: .server,
61+
logger: self.configuration.logger,
62+
idleTimeout: self.configuration.connectionIdleTimeout
63+
)
64+
}
65+
66+
/// Makes an HTTP/2 handler.
67+
private func makeHTTP2Handler() -> NIOHTTP2Handler {
68+
return .init(mode: .server)
69+
}
70+
71+
/// Makes an HTTP/2 multiplexer suitable handling gRPC requests.
72+
private func makeHTTP2Multiplexer(for channel: Channel) -> HTTP2StreamMultiplexer {
73+
var logger = self.configuration.logger
74+
75+
return .init(
76+
mode: .server,
77+
channel: channel,
78+
targetWindowSize: self.configuration.httpTargetWindowSize
79+
) { stream in
80+
stream.getOption(HTTP2StreamChannelOptions.streamID).map { streamID -> Logger in
81+
logger[metadataKey: MetadataKey.h2StreamID] = "\(streamID)"
82+
return logger
83+
}.recover { _ in
84+
logger[metadataKey: MetadataKey.h2StreamID] = "<unknown>"
85+
return logger
86+
}.flatMap { logger in
87+
// TODO: provide user configuration for header normalization.
88+
let handler = self.makeHTTP2ToRawGRPCHandler(normalizeHeaders: true, logger: logger)
89+
return stream.pipeline.addHandler(handler)
90+
}
91+
}
92+
}
93+
94+
/// Makes an HTTP/2 to raw gRPC server handler.
95+
private func makeHTTP2ToRawGRPCHandler(
96+
normalizeHeaders: Bool,
97+
logger: Logger
98+
) -> HTTP2ToRawGRPCServerCodec {
99+
return HTTP2ToRawGRPCServerCodec(
100+
servicesByName: self.configuration.serviceProvidersByName,
101+
encoding: self.configuration.messageEncoding,
102+
errorDelegate: self.configuration.errorDelegate,
103+
normalizeHeaders: normalizeHeaders,
104+
logger: logger
105+
)
106+
}
107+
108+
/// The pipeline finished configuring.
109+
private func configurationCompleted(result: Result<Void, Error>, context: ChannelHandlerContext) {
110+
switch result {
111+
case .success:
112+
context.pipeline.removeHandler(context: context, promise: nil)
113+
case let .failure(error):
114+
self.errorCaught(context: context, error: error)
115+
}
116+
}
117+
118+
/// Configures the pipeline to handle gRPC requests on an HTTP/2 connection.
119+
private func configureHTTP2(context: ChannelHandlerContext) {
120+
// We're now configuring the pipeline.
121+
self.state = .configuring
122+
123+
// We could use 'Channel.configureHTTP2Pipeline', but then we'd have to find the right handlers
124+
// to then insert our keepalive and idle handlers between. We can just add everything together.
125+
var handlers: [ChannelHandler] = []
126+
handlers.reserveCapacity(4)
127+
handlers.append(self.makeHTTP2Handler())
128+
handlers.append(self.makeKeepaliveHandler())
129+
handlers.append(self.makeIdleHandler())
130+
handlers.append(self.makeHTTP2Multiplexer(for: context.channel))
131+
132+
// Now configure the pipeline with the handlers.
133+
context.channel.pipeline.addHandlers(handlers).whenComplete { result in
134+
self.configurationCompleted(result: result, context: context)
135+
}
136+
}
137+
138+
/// Configures the pipeline to handle gRPC-Web requests on an HTTP/1 connection.
139+
private func configureHTTP1(context: ChannelHandlerContext) {
140+
// We're now configuring the pipeline.
141+
self.state = .configuring
142+
143+
context.pipeline.configureHTTPServerPipeline(withErrorHandling: true).flatMap {
144+
context.pipeline.addHandlers([
145+
WebCORSHandler(),
146+
GRPCWebToHTTP2ServerCodec(scheme: self.configuration.tls == nil ? "http" : "https"),
147+
// There's no need to normalize headers for HTTP/1.
148+
self.makeHTTP2ToRawGRPCHandler(normalizeHeaders: false, logger: self.configuration.logger),
149+
])
150+
}.whenComplete { result in
151+
self.configurationCompleted(result: result, context: context)
152+
}
153+
}
154+
155+
/// Attempts to determine the HTTP version from the buffer and then configure the pipeline
156+
/// appropriately. Closes the connection if the HTTP version could not be determined.
157+
private func determineHTTPVersionAndConfigurePipeline(
158+
usingBuffer buffer: ByteBuffer,
159+
context: ChannelHandlerContext
160+
) {
161+
if HTTPVersionParser.prefixedWithHTTP2ConnectionPreface(buffer) {
162+
self.configureHTTP2(context: context)
163+
} else if HTTPVersionParser.prefixedWithHTTP1RequestLine(buffer) {
164+
self.configureHTTP1(context: context)
165+
} else {
166+
self.configuration.logger.error("Unable to determine http version, closing")
167+
context.close(mode: .all, promise: nil)
168+
}
169+
}
170+
171+
/// Handles a 'TLSUserEvent.handshakeCompleted' event and configures the pipeline to handle gRPC
172+
/// requests.
173+
private func handleHandshakeCompletedEvent(
174+
_ event: TLSUserEvent,
175+
context: ChannelHandlerContext
176+
) {
177+
switch event {
178+
case let .handshakeCompleted(negotiatedProtocol):
179+
self.configuration.logger.debug("TLS handshake completed", metadata: [
180+
"alpn": "\(negotiatedProtocol ?? "nil")",
181+
])
182+
183+
switch negotiatedProtocol {
184+
case let .some(negotiated):
185+
if GRPCApplicationProtocolIdentifier.isHTTP2Like(negotiated) {
186+
self.configureHTTP2(context: context)
187+
} else if GRPCApplicationProtocolIdentifier.isHTTP1(negotiated) {
188+
self.configureHTTP1(context: context)
189+
} else {
190+
self.configuration.logger.warning("Unsupported ALPN identifier '\(negotiated)', closing")
191+
context.close(mode: .all, promise: nil)
192+
}
193+
194+
case .none:
195+
if self.configuration.tls?.requireALPN ?? true {
196+
self.configuration.logger.warning("No ALPN protocol negotiated, closing'")
197+
context.close(mode: .all, promise: nil)
198+
} else {
199+
self.configuration.logger.warning("No ALPN protocol negotiated'")
200+
}
201+
}
202+
203+
case .shutdownCompleted:
204+
// We don't care about this here.
205+
()
206+
}
207+
}
208+
209+
// MARK: - Channel Handler
210+
211+
internal func errorCaught(context: ChannelHandlerContext, error: Error) {
212+
if let delegate = self.configuration.errorDelegate {
213+
let baseError: Error
214+
215+
if let errorWithContext = error as? GRPCError.WithContext {
216+
baseError = errorWithContext.error
217+
} else {
218+
baseError = error
219+
}
220+
221+
delegate.observeLibraryError(baseError)
222+
}
223+
224+
context.close(mode: .all, promise: nil)
225+
}
226+
227+
internal func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
228+
switch self.state {
229+
case .notConfigured:
230+
if let event = event as? TLSUserEvent {
231+
self.handleHandshakeCompletedEvent(event, context: context)
232+
}
233+
case .configuring:
234+
()
235+
}
236+
237+
context.fireUserInboundEventTriggered(event)
238+
}
239+
240+
internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
241+
self.bufferedReads.append(data)
242+
243+
switch self.state {
244+
case .notConfigured:
245+
let buffer = self.unwrapInboundIn(data)
246+
self.determineHTTPVersionAndConfigurePipeline(usingBuffer: buffer, context: context)
247+
248+
case .configuring:
249+
// Just buffer, that's okay.
250+
()
251+
}
252+
253+
// Don't forward the reads: we'll do so when we have configured the pipeline.
254+
}
255+
256+
internal func removeHandler(
257+
context: ChannelHandlerContext,
258+
removalToken: ChannelHandlerContext.RemovalToken
259+
) {
260+
// Forward any buffered reads.
261+
while let read = self.bufferedReads.popFirst() {
262+
context.fireChannelRead(read)
263+
}
264+
context.leavePipeline(removalToken: removalToken)
265+
}
266+
}
267+
268+
// MARK: - HTTP Version Parser
269+
270+
struct HTTPVersionParser {
271+
/// HTTP/2 connection preface bytes. See RFC 7540 § 5.3.
272+
private static let http2ClientMagic = [
273+
UInt8(ascii: "P"),
274+
UInt8(ascii: "R"),
275+
UInt8(ascii: "I"),
276+
UInt8(ascii: " "),
277+
UInt8(ascii: "*"),
278+
UInt8(ascii: " "),
279+
UInt8(ascii: "H"),
280+
UInt8(ascii: "T"),
281+
UInt8(ascii: "T"),
282+
UInt8(ascii: "P"),
283+
UInt8(ascii: "/"),
284+
UInt8(ascii: "2"),
285+
UInt8(ascii: "."),
286+
UInt8(ascii: "0"),
287+
UInt8(ascii: "\r"),
288+
UInt8(ascii: "\n"),
289+
UInt8(ascii: "\r"),
290+
UInt8(ascii: "\n"),
291+
UInt8(ascii: "S"),
292+
UInt8(ascii: "M"),
293+
UInt8(ascii: "\r"),
294+
UInt8(ascii: "\n"),
295+
UInt8(ascii: "\r"),
296+
UInt8(ascii: "\n"),
297+
]
298+
299+
/// Determines whether the bytes in the `ByteBuffer` are prefixed with the HTTP/2 client
300+
/// connection preface.
301+
static func prefixedWithHTTP2ConnectionPreface(_ buffer: ByteBuffer) -> Bool {
302+
var buffer = buffer
303+
304+
guard let bytes = buffer.readBytes(length: HTTPVersionParser.http2ClientMagic.count) else {
305+
return false
306+
}
307+
308+
return bytes == HTTPVersionParser.http2ClientMagic
309+
}
310+
311+
private static let http1_1 = [
312+
UInt8(ascii: "H"),
313+
UInt8(ascii: "T"),
314+
UInt8(ascii: "T"),
315+
UInt8(ascii: "P"),
316+
UInt8(ascii: "/"),
317+
UInt8(ascii: "1"),
318+
UInt8(ascii: "."),
319+
UInt8(ascii: "1"),
320+
]
321+
322+
/// Determines whether the bytes in the `ByteBuffer` are prefixed with an HTTP/1.1 request line.
323+
static func prefixedWithHTTP1RequestLine(_ buffer: ByteBuffer) -> Bool {
324+
var readableBytesView = buffer.readableBytesView
325+
326+
// From RFC 2616 § 5.1:
327+
// Request-Line = Method SP Request-URI SP HTTP-Version CRLF
328+
329+
// Read off the Method and Request-URI (and spaces).
330+
guard readableBytesView.trimPrefix(to: UInt8(ascii: " ")) != nil,
331+
readableBytesView.trimPrefix(to: UInt8(ascii: " ")) != nil else {
332+
return false
333+
}
334+
335+
// Read off the HTTP-Version and CR.
336+
guard let versionView = readableBytesView.trimPrefix(to: UInt8(ascii: "\r")) else {
337+
return false
338+
}
339+
340+
// Check that the LF followed the CR.
341+
guard readableBytesView.first == UInt8(ascii: "\n") else {
342+
return false
343+
}
344+
345+
// Now check the HTTP version.
346+
var version = ByteBuffer(versionView)
347+
let versionBytes = version.readBytes(length: version.readableBytes)!
348+
return versionBytes == HTTPVersionParser.http1_1
349+
}
350+
}

Sources/GRPC/GRPCServerRequestRoutingHandler.swift

+2-2
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,10 @@ struct CallPath {
8181
extension Collection where Self == Self.SubSequence, Self.Element: Equatable {
8282
/// Trims out the prefix up to `separator`, and returns it.
8383
/// Sets self to the subsequence after the separator, and returns the subsequence before the separator.
84-
/// If self is emtpy returns `nil`
84+
/// If self is empty returns `nil`
8585
/// - parameters:
8686
/// - separator : The Element between the head which is returned and the rest which is left in self.
87-
/// - returns: SubSequence containing everything between the beginnning and the first occurance of
87+
/// - returns: SubSequence containing everything between the beginning and the first occurrence of
8888
/// `separator`. If `separator` is not found this will be the entire Collection. If the collection is empty
8989
/// returns `nil`
9090
mutating func trimPrefix(to separator: Element) -> SubSequence? {

0 commit comments

Comments
 (0)