Skip to content

Commit b81b8f2

Browse files
authored
Use ALPN to determine what sort of connection we've accepted (#1055)
Motivation: To determine whether we're dealing with gRPC or gRPC Web (i.e. HTTP/2 or HTTP/1) the first bytes received on a new connection are parsed. When TLS is enabled and ALPN is supported there's no need to do this: we will be informed about the protocol we negotiated with our peer and therefore how to configure the channel. We also never enforced that the protocol negotiated via ALPN was used! Modifications: - Add a 'GRPCServerPipelineConfigurator' handler to supersede the 'HTTPProtocolSwitcher'. The new handler will use either ALPN and fall back to parsing the incoming bytes. - The parsing behavior is slightly different to that of 'HTTPProtocolSwitcher' in that we only read off enough bytes for the HTTP/2 connection preface, or enough bytes to parse an HTTP/1 request line (rather than reading the whole buffer as a string and processing that). - Added a server configuration option to disable ALPN being required. - Added tests. Result: - Server pipeline configuration is driven by ALPN, falling back to parsing. - ALPN is enforced on the server (partially resolving #1042)
1 parent 5b2b7ec commit b81b8f2

10 files changed

+788
-291
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,389 @@
1+
/*
2+
* Copyright 2020, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
import Logging
17+
import NIO
18+
import NIOHTTP1
19+
import NIOHTTP2
20+
import NIOTLS
21+
22+
/// Configures a server pipeline for gRPC with the appropriate handlers depending on the HTTP
23+
/// version used for transport.
24+
///
25+
/// If TLS is enabled then the handler listens for an 'TLSUserEvent.handshakeCompleted' event and
26+
/// configures the pipeline appropriately for the protocol negotiated via ALPN. If TLS is not
27+
/// configured then the HTTP version is determined by parsing the inbound byte stream.
28+
final class GRPCServerPipelineConfigurator: ChannelInboundHandler, RemovableChannelHandler {
29+
internal typealias InboundIn = ByteBuffer
30+
internal typealias InboundOut = ByteBuffer
31+
32+
/// The server configuration.
33+
private let configuration: Server.Configuration
34+
35+
/// Reads which we're holding on to before the pipeline is configured.
36+
private var bufferedReads = CircularBuffer<NIOAny>()
37+
38+
/// The current state.
39+
private var state: State
40+
41+
private enum ALPN {
42+
/// ALPN is expected. It may or may not be required, however.
43+
case expected(required: Bool)
44+
45+
/// ALPN was expected but not required and no protocol was negotiated in the handshake. We may
46+
/// now fall back to parsing bytes on the connection.
47+
case expectedButFallingBack
48+
49+
/// ALPN is not expected; this is a cleartext connection.
50+
case notExpected
51+
}
52+
53+
private enum State {
54+
/// The pipeline isn't configured yet.
55+
case notConfigured(alpn: ALPN)
56+
/// We're configuring the pipeline.
57+
case configuring
58+
}
59+
60+
init(configuration: Server.Configuration) {
61+
if let tls = configuration.tls {
62+
self.state = .notConfigured(alpn: .expected(required: tls.requireALPN))
63+
} else {
64+
self.state = .notConfigured(alpn: .notExpected)
65+
}
66+
67+
self.configuration = configuration
68+
}
69+
70+
/// Makes a gRPC Server keepalive handler.
71+
private func makeKeepaliveHandler() -> GRPCServerKeepaliveHandler {
72+
return .init(configuration: self.configuration.connectionKeepalive)
73+
}
74+
75+
/// Makes a gRPC idle handler for the server..
76+
private func makeIdleHandler() -> GRPCIdleHandler {
77+
return .init(
78+
mode: .server,
79+
logger: self.configuration.logger,
80+
idleTimeout: self.configuration.connectionIdleTimeout
81+
)
82+
}
83+
84+
/// Makes an HTTP/2 handler.
85+
private func makeHTTP2Handler() -> NIOHTTP2Handler {
86+
return .init(mode: .server)
87+
}
88+
89+
/// Makes an HTTP/2 multiplexer suitable handling gRPC requests.
90+
private func makeHTTP2Multiplexer(for channel: Channel) -> HTTP2StreamMultiplexer {
91+
var logger = self.configuration.logger
92+
93+
return .init(
94+
mode: .server,
95+
channel: channel,
96+
targetWindowSize: self.configuration.httpTargetWindowSize
97+
) { stream in
98+
stream.getOption(HTTP2StreamChannelOptions.streamID).map { streamID -> Logger in
99+
logger[metadataKey: MetadataKey.h2StreamID] = "\(streamID)"
100+
return logger
101+
}.recover { _ in
102+
logger[metadataKey: MetadataKey.h2StreamID] = "<unknown>"
103+
return logger
104+
}.flatMap { logger in
105+
// TODO: provide user configuration for header normalization.
106+
let handler = self.makeHTTP2ToRawGRPCHandler(normalizeHeaders: true, logger: logger)
107+
return stream.pipeline.addHandler(handler)
108+
}
109+
}
110+
}
111+
112+
/// Makes an HTTP/2 to raw gRPC server handler.
113+
private func makeHTTP2ToRawGRPCHandler(
114+
normalizeHeaders: Bool,
115+
logger: Logger
116+
) -> HTTP2ToRawGRPCServerCodec {
117+
return HTTP2ToRawGRPCServerCodec(
118+
servicesByName: self.configuration.serviceProvidersByName,
119+
encoding: self.configuration.messageEncoding,
120+
errorDelegate: self.configuration.errorDelegate,
121+
normalizeHeaders: normalizeHeaders,
122+
logger: logger
123+
)
124+
}
125+
126+
/// The pipeline finished configuring.
127+
private func configurationCompleted(result: Result<Void, Error>, context: ChannelHandlerContext) {
128+
switch result {
129+
case .success:
130+
context.pipeline.removeHandler(context: context, promise: nil)
131+
case let .failure(error):
132+
self.errorCaught(context: context, error: error)
133+
}
134+
}
135+
136+
/// Configures the pipeline to handle gRPC requests on an HTTP/2 connection.
137+
private func configureHTTP2(context: ChannelHandlerContext) {
138+
// We're now configuring the pipeline.
139+
self.state = .configuring
140+
141+
// We could use 'Channel.configureHTTP2Pipeline', but then we'd have to find the right handlers
142+
// to then insert our keepalive and idle handlers between. We can just add everything together.
143+
var handlers: [ChannelHandler] = []
144+
handlers.reserveCapacity(4)
145+
handlers.append(self.makeHTTP2Handler())
146+
handlers.append(self.makeKeepaliveHandler())
147+
handlers.append(self.makeIdleHandler())
148+
handlers.append(self.makeHTTP2Multiplexer(for: context.channel))
149+
150+
// Now configure the pipeline with the handlers.
151+
context.channel.pipeline.addHandlers(handlers).whenComplete { result in
152+
self.configurationCompleted(result: result, context: context)
153+
}
154+
}
155+
156+
/// Configures the pipeline to handle gRPC-Web requests on an HTTP/1 connection.
157+
private func configureHTTP1(context: ChannelHandlerContext) {
158+
// We're now configuring the pipeline.
159+
self.state = .configuring
160+
161+
context.pipeline.configureHTTPServerPipeline(withErrorHandling: true).flatMap {
162+
context.pipeline.addHandlers([
163+
WebCORSHandler(),
164+
GRPCWebToHTTP2ServerCodec(scheme: self.configuration.tls == nil ? "http" : "https"),
165+
// There's no need to normalize headers for HTTP/1.
166+
self.makeHTTP2ToRawGRPCHandler(normalizeHeaders: false, logger: self.configuration.logger),
167+
])
168+
}.whenComplete { result in
169+
self.configurationCompleted(result: result, context: context)
170+
}
171+
}
172+
173+
/// Attempts to determine the HTTP version from the buffer and then configure the pipeline
174+
/// appropriately. Closes the connection if the HTTP version could not be determined.
175+
private func determineHTTPVersionAndConfigurePipeline(
176+
buffer: ByteBuffer,
177+
context: ChannelHandlerContext
178+
) {
179+
if HTTPVersionParser.prefixedWithHTTP2ConnectionPreface(buffer) {
180+
self.configureHTTP2(context: context)
181+
} else if HTTPVersionParser.prefixedWithHTTP1RequestLine(buffer) {
182+
self.configureHTTP1(context: context)
183+
} else {
184+
self.configuration.logger.error("Unable to determine http version, closing")
185+
context.close(mode: .all, promise: nil)
186+
}
187+
}
188+
189+
/// Handles a 'TLSUserEvent.handshakeCompleted' event and configures the pipeline to handle gRPC
190+
/// requests.
191+
private func handleHandshakeCompletedEvent(
192+
_ event: TLSUserEvent,
193+
alpnIsRequired: Bool,
194+
context: ChannelHandlerContext
195+
) {
196+
switch event {
197+
case let .handshakeCompleted(negotiatedProtocol):
198+
self.configuration.logger.debug("TLS handshake completed", metadata: [
199+
"alpn": "\(negotiatedProtocol ?? "nil")",
200+
])
201+
202+
switch negotiatedProtocol {
203+
case let .some(negotiated):
204+
if GRPCApplicationProtocolIdentifier.isHTTP2Like(negotiated) {
205+
self.configureHTTP2(context: context)
206+
} else if GRPCApplicationProtocolIdentifier.isHTTP1(negotiated) {
207+
self.configureHTTP1(context: context)
208+
} else {
209+
self.configuration.logger.warning("Unsupported ALPN identifier '\(negotiated)', closing")
210+
context.close(mode: .all, promise: nil)
211+
}
212+
213+
case .none:
214+
if alpnIsRequired {
215+
self.configuration.logger.warning("No ALPN protocol negotiated, closing'")
216+
context.close(mode: .all, promise: nil)
217+
} else {
218+
self.configuration.logger.warning("No ALPN protocol negotiated'")
219+
// We're now falling back to parsing bytes.
220+
self.state = .notConfigured(alpn: .expectedButFallingBack)
221+
self.tryParsingBufferedData(context: context)
222+
}
223+
}
224+
225+
case .shutdownCompleted:
226+
// We don't care about this here.
227+
()
228+
}
229+
}
230+
231+
/// Try to parse the buffered data to determine whether or not HTTP/2 or HTTP/1 should be used.
232+
private func tryParsingBufferedData(context: ChannelHandlerContext) {
233+
guard let first = self.bufferedReads.first else {
234+
// No data buffered yet. We'll try when we read.
235+
return
236+
}
237+
238+
let buffer = self.unwrapInboundIn(first)
239+
self.determineHTTPVersionAndConfigurePipeline(buffer: buffer, context: context)
240+
}
241+
242+
// MARK: - Channel Handler
243+
244+
internal func errorCaught(context: ChannelHandlerContext, error: Error) {
245+
if let delegate = self.configuration.errorDelegate {
246+
let baseError: Error
247+
248+
if let errorWithContext = error as? GRPCError.WithContext {
249+
baseError = errorWithContext.error
250+
} else {
251+
baseError = error
252+
}
253+
254+
delegate.observeLibraryError(baseError)
255+
}
256+
257+
context.close(mode: .all, promise: nil)
258+
}
259+
260+
internal func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
261+
switch self.state {
262+
case let .notConfigured(alpn: .expected(required)):
263+
if let event = event as? TLSUserEvent {
264+
self.handleHandshakeCompletedEvent(event, alpnIsRequired: required, context: context)
265+
}
266+
267+
case .notConfigured(alpn: .expectedButFallingBack),
268+
.notConfigured(alpn: .notExpected),
269+
.configuring:
270+
()
271+
}
272+
273+
context.fireUserInboundEventTriggered(event)
274+
}
275+
276+
internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
277+
self.bufferedReads.append(data)
278+
279+
switch self.state {
280+
case .notConfigured(alpn: .notExpected),
281+
.notConfigured(alpn: .expectedButFallingBack):
282+
// If ALPN isn't expected, or we didn't negotiate via ALPN and we don't require it then we
283+
// can try parsing the data we just buffered.
284+
self.tryParsingBufferedData(context: context)
285+
286+
case .notConfigured(alpn: .expected),
287+
.configuring:
288+
// We expect ALPN or we're being configured, just buffer the data, we'll forward it later.
289+
()
290+
}
291+
292+
// Don't forward the reads: we'll do so when we have configured the pipeline.
293+
}
294+
295+
internal func removeHandler(
296+
context: ChannelHandlerContext,
297+
removalToken: ChannelHandlerContext.RemovalToken
298+
) {
299+
// Forward any buffered reads.
300+
while let read = self.bufferedReads.popFirst() {
301+
context.fireChannelRead(read)
302+
}
303+
context.leavePipeline(removalToken: removalToken)
304+
}
305+
}
306+
307+
// MARK: - HTTP Version Parser
308+
309+
struct HTTPVersionParser {
310+
/// HTTP/2 connection preface bytes. See RFC 7540 § 5.3.
311+
private static let http2ClientMagic = [
312+
UInt8(ascii: "P"),
313+
UInt8(ascii: "R"),
314+
UInt8(ascii: "I"),
315+
UInt8(ascii: " "),
316+
UInt8(ascii: "*"),
317+
UInt8(ascii: " "),
318+
UInt8(ascii: "H"),
319+
UInt8(ascii: "T"),
320+
UInt8(ascii: "T"),
321+
UInt8(ascii: "P"),
322+
UInt8(ascii: "/"),
323+
UInt8(ascii: "2"),
324+
UInt8(ascii: "."),
325+
UInt8(ascii: "0"),
326+
UInt8(ascii: "\r"),
327+
UInt8(ascii: "\n"),
328+
UInt8(ascii: "\r"),
329+
UInt8(ascii: "\n"),
330+
UInt8(ascii: "S"),
331+
UInt8(ascii: "M"),
332+
UInt8(ascii: "\r"),
333+
UInt8(ascii: "\n"),
334+
UInt8(ascii: "\r"),
335+
UInt8(ascii: "\n"),
336+
]
337+
338+
/// Determines whether the bytes in the `ByteBuffer` are prefixed with the HTTP/2 client
339+
/// connection preface.
340+
static func prefixedWithHTTP2ConnectionPreface(_ buffer: ByteBuffer) -> Bool {
341+
let view = buffer.readableBytesView
342+
343+
guard view.count >= HTTPVersionParser.http2ClientMagic.count else {
344+
// Not enough bytes.
345+
return false
346+
}
347+
348+
let slice = view[view.startIndex ..< view.startIndex.advanced(by: self.http2ClientMagic.count)]
349+
return slice.elementsEqual(HTTPVersionParser.http2ClientMagic)
350+
}
351+
352+
private static let http1_1 = [
353+
UInt8(ascii: "H"),
354+
UInt8(ascii: "T"),
355+
UInt8(ascii: "T"),
356+
UInt8(ascii: "P"),
357+
UInt8(ascii: "/"),
358+
UInt8(ascii: "1"),
359+
UInt8(ascii: "."),
360+
UInt8(ascii: "1"),
361+
]
362+
363+
/// Determines whether the bytes in the `ByteBuffer` are prefixed with an HTTP/1.1 request line.
364+
static func prefixedWithHTTP1RequestLine(_ buffer: ByteBuffer) -> Bool {
365+
var readableBytesView = buffer.readableBytesView
366+
367+
// From RFC 2616 § 5.1:
368+
// Request-Line = Method SP Request-URI SP HTTP-Version CRLF
369+
370+
// Read off the Method and Request-URI (and spaces).
371+
guard readableBytesView.trimPrefix(to: UInt8(ascii: " ")) != nil,
372+
readableBytesView.trimPrefix(to: UInt8(ascii: " ")) != nil else {
373+
return false
374+
}
375+
376+
// Read off the HTTP-Version and CR.
377+
guard let versionView = readableBytesView.trimPrefix(to: UInt8(ascii: "\r")) else {
378+
return false
379+
}
380+
381+
// Check that the LF followed the CR.
382+
guard readableBytesView.first == UInt8(ascii: "\n") else {
383+
return false
384+
}
385+
386+
// Now check the HTTP version.
387+
return versionView.elementsEqual(HTTPVersionParser.http1_1)
388+
}
389+
}

0 commit comments

Comments
 (0)