Skip to content

Commit 69c828b

Browse files
authored
Plug in and switch on the 'HTTP/2 first' server handlers (#1050)
Motivation: We have all the parts in place to use HTTP/2 concepts by default on the server, so let's use them! Modifications: - Wire in the server error delegate to the 'HTTP2ToRawGRPCServerCodec' since this was missing - Modify the 'HTTPProtocolSwitcher' and relevant pipeline initializer to configure the pipeline correctly - Update various tests which relied on previous types - Move 'CallPath' out of the routing handler - Remove the routing handler and tests - Move the 'MessageEncodingHeaderValidator' out of the HTTP1 to GRPC server handler - Remove the 'HTTP1ToGRPCServerCodec' Result: When handling requests on HTTP/2 connections we no longer go translate to HTTP1 types first. A ~30-35% increase in QPS in unary benchmarks.
1 parent cdd3538 commit 69c828b

19 files changed

+329
-1455
lines changed

Sources/GRPC/CallHandlers/_BaseCallHandler.swift

+9-9
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ public class _BaseCallHandler<
3030
public typealias RequestPayload = RequestDeserializer.Output
3131
public typealias ResponsePayload = ResponseSerializer.Input
3232

33-
public typealias InboundIn = _RawGRPCServerRequestPart
34-
public typealias OutboundOut = _RawGRPCServerResponsePart
33+
public typealias InboundIn = GRPCServerRequestPart<ByteBuffer>
34+
public typealias OutboundOut = GRPCServerResponsePart<ByteBuffer>
3535

3636
/// An interceptor pipeline.
3737
private var pipeline: ServerInterceptorPipeline<RequestPayload, ResponsePayload>?
@@ -117,8 +117,8 @@ public class _BaseCallHandler<
117117
let part = self.unwrapInboundIn(data)
118118

119119
switch part {
120-
case let .headers(headers):
121-
self.act(on: self.state.channelRead(.headers(headers)))
120+
case let .metadata(headers):
121+
self.act(on: self.state.channelRead(.metadata(headers)))
122122
case let .message(buffer):
123123
do {
124124
let request = try self.requestDeserializer.deserialize(byteBuffer: buffer)
@@ -466,7 +466,7 @@ extension _BaseCallHandler.State {
466466
/// Receive a request part from the `Channel`. If we're active we just forward these through the
467467
/// pipeline. We validate at the other end.
468468
internal mutating func channelRead(
469-
_ requestPart: _GRPCServerRequestPart<_BaseCallHandler.RequestPayload>
469+
_ requestPart: GRPCServerRequestPart<_BaseCallHandler.RequestPayload>
470470
) -> Action {
471471
switch self {
472472
case .idle:
@@ -480,7 +480,7 @@ extension _BaseCallHandler.State {
480480
let part: GRPCServerRequestPart<_BaseCallHandler.RequestPayload>
481481

482482
switch requestPart {
483-
case let .headers(headers):
483+
case let .metadata(headers):
484484
filter = state.channelStreamState.receiveHeaders()
485485
part = .metadata(headers)
486486
case let .message(message):
@@ -703,7 +703,7 @@ extension _BaseCallHandler {
703703
// Only flush if we're streaming responses, if we're not streaming responses then we'll wait
704704
// for the response and end before emitting the flush.
705705
flush = self.callType.isStreamingResponses
706-
context.write(self.wrapOutboundOut(.headers(headers)), promise: promise)
706+
context.write(self.wrapOutboundOut(.metadata(headers)), promise: promise)
707707

708708
case let .message(message, metadata):
709709
do {
@@ -712,7 +712,7 @@ extension _BaseCallHandler {
712712
allocator: context.channel.allocator
713713
)
714714
context.write(
715-
self.wrapOutboundOut(.message(.init(serializedResponse, compressed: metadata.compress))),
715+
self.wrapOutboundOut(.message(serializedResponse, metadata)),
716716
promise: promise
717717
)
718718
// Flush if we've been told to flush.
@@ -724,7 +724,7 @@ extension _BaseCallHandler {
724724
}
725725

726726
case let .end(status, trailers):
727-
context.write(self.wrapOutboundOut(.statusAndTrailers(status, trailers)), promise: promise)
727+
context.write(self.wrapOutboundOut(.end(status, trailers)), promise: promise)
728728
// Always flush on end.
729729
flush = true
730730
}

Sources/GRPC/GRPCServerRequestRoutingHandler.swift

+23-235
Original file line numberDiff line numberDiff line change
@@ -47,246 +47,34 @@ public struct CallHandlerContext {
4747
internal var path: String
4848
}
4949

50-
/// Attempts to route a request to a user-provided call handler. Also validates that the request has
51-
/// a suitable 'content-type' for gRPC.
52-
///
53-
/// Once the request headers are available, asks the `CallHandlerProvider` corresponding to the request's service name
54-
/// for a `GRPCCallHandler` object. That object is then forwarded the individual gRPC messages.
55-
///
56-
/// After the pipeline has been configured with the `GRPCCallHandler`, this handler removes itself
57-
/// from the pipeline.
58-
public final class GRPCServerRequestRoutingHandler {
59-
private let logger: Logger
60-
private let servicesByName: [Substring: CallHandlerProvider]
61-
private let encoding: ServerMessageEncoding
62-
private weak var errorDelegate: ServerErrorDelegate?
63-
64-
private enum State: Equatable {
65-
case notConfigured
66-
case configuring([InboundOut])
67-
}
68-
69-
private var state: State = .notConfigured
70-
71-
public init(
72-
servicesByName: [Substring: CallHandlerProvider],
73-
encoding: ServerMessageEncoding,
74-
errorDelegate: ServerErrorDelegate?,
75-
logger: Logger
76-
) {
77-
self.servicesByName = servicesByName
78-
self.encoding = encoding
79-
self.errorDelegate = errorDelegate
80-
self.logger = logger
81-
}
82-
}
83-
84-
extension GRPCServerRequestRoutingHandler: ChannelInboundHandler, RemovableChannelHandler {
85-
public typealias InboundIn = HTTPServerRequestPart
86-
public typealias InboundOut = HTTPServerRequestPart
87-
public typealias OutboundOut = HTTPServerResponsePart
88-
89-
public func errorCaught(context: ChannelHandlerContext, error: Error) {
90-
let status: GRPCStatus
91-
if let errorWithContext = error as? GRPCError.WithContext {
92-
self.errorDelegate?.observeLibraryError(errorWithContext.error)
93-
status = errorWithContext.error.makeGRPCStatus()
94-
} else {
95-
self.errorDelegate?.observeLibraryError(error)
96-
status = (error as? GRPCStatusTransformable)?.makeGRPCStatus() ?? .processingError
97-
}
98-
99-
switch self.state {
100-
case .notConfigured:
101-
// We don't know what protocol we're speaking at this point. We'll just have to close the
102-
// channel.
103-
()
104-
105-
case let .configuring(messages):
106-
// first! is fine here: we only go from `.notConfigured` to `.configuring` when we receive
107-
// and validate the request head.
108-
let head = messages.compactMap { part -> HTTPRequestHead? in
109-
switch part {
110-
case let .head(head):
111-
return head
112-
default:
113-
return nil
114-
}
115-
}.first!
116-
117-
let responseHead = self.makeResponseHead(requestHead: head, status: status)
118-
context.write(self.wrapOutboundOut(.head(responseHead)), promise: nil)
119-
context.write(self.wrapOutboundOut(.end(nil)), promise: nil)
120-
context.flush()
121-
}
122-
123-
context.close(mode: .all, promise: nil)
124-
}
125-
126-
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
127-
let requestPart = self.unwrapInboundIn(data)
128-
switch requestPart {
129-
case let .head(requestHead):
130-
precondition(self.state == .notConfigured)
131-
132-
// Validate the 'content-type' is related to gRPC before proceeding.
133-
let maybeContentType = requestHead.headers.first(name: GRPCHeaderName.contentType)
134-
guard let contentType = maybeContentType,
135-
contentType.starts(with: ContentType.commonPrefix) else {
136-
self.logger.warning(
137-
"received request whose 'content-type' does not exist or start with '\(ContentType.commonPrefix)'",
138-
metadata: ["content-type": "\(String(describing: maybeContentType))"]
139-
)
140-
141-
// From: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
142-
//
143-
// If 'content-type' does not begin with "application/grpc", gRPC servers SHOULD respond
144-
// with HTTP status of 415 (Unsupported Media Type). This will prevent other HTTP/2
145-
// clients from interpreting a gRPC error response, which uses status 200 (OK), as
146-
// successful.
147-
let responseHead = HTTPResponseHead(
148-
version: requestHead.version,
149-
status: .unsupportedMediaType
150-
)
151-
152-
// Fail the call. Note: we're not speaking gRPC here, so no status or message.
153-
context.write(self.wrapOutboundOut(.head(responseHead)), promise: nil)
154-
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
155-
return
156-
}
157-
158-
// Do we know how to handle this RPC?
159-
guard let callHandler = self.makeCallHandler(
160-
channel: context.channel,
161-
requestHead: requestHead
162-
) else {
163-
self.logger.warning(
164-
"unable to make call handler; the RPC is not implemented on this server",
165-
metadata: ["uri": "\(requestHead.uri)"]
166-
)
167-
168-
let status = GRPCError.RPCNotImplemented(rpc: requestHead.uri).makeGRPCStatus()
169-
let responseHead = self.makeResponseHead(requestHead: requestHead, status: status)
170-
171-
// Write back a 'trailers-only' response.
172-
context.write(self.wrapOutboundOut(.head(responseHead)), promise: nil)
173-
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
174-
return
175-
}
176-
177-
self.logger.debug("received request head, configuring pipeline")
178-
179-
// Buffer the request head; we'll replay it in the next handler when we're removed from the
180-
// pipeline.
181-
self.state = .configuring([requestPart])
182-
183-
// Configure the rest of the pipeline to serve the RPC.
184-
let httpToGRPC = HTTP1ToGRPCServerCodec(encoding: self.encoding, logger: self.logger)
185-
context.pipeline.addHandlers([httpToGRPC, callHandler], position: .after(self))
186-
.whenSuccess {
187-
context.pipeline.removeHandler(self, promise: nil)
188-
}
189-
190-
case .body, .end:
191-
switch self.state {
192-
case .notConfigured:
193-
// We can reach this point if we're receiving messages for a method that isn't implemented,
194-
// in which case we just drop the messages; our response should already be in-flight.
195-
()
196-
197-
case var .configuring(buffer):
198-
// We received a message while the pipeline was being configured; hold on to it while we
199-
// finish configuring the pipeline.
200-
buffer.append(requestPart)
201-
self.state = .configuring(buffer)
202-
}
203-
}
204-
}
205-
206-
public func handlerRemoved(context: ChannelHandlerContext) {
207-
switch self.state {
208-
case .notConfigured:
209-
()
210-
211-
case let .configuring(messages):
212-
for message in messages {
213-
context.fireChannelRead(self.wrapInboundOut(message))
214-
}
215-
}
216-
}
217-
218-
/// A call URI split into components.
219-
struct CallPath {
220-
/// The name of the service to call.
221-
var service: String.UTF8View.SubSequence
222-
/// The name of the method to call.
223-
var method: String.UTF8View.SubSequence
224-
225-
/// Charater used to split the path into components.
226-
private let pathSplitDelimiter = UInt8(ascii: "/")
227-
228-
/// Split a path into service and method.
229-
/// Split is done in UTF8 as this turns out to be approximately 10x faster than a simple split.
230-
/// URI format: "/package.Servicename/MethodName"
231-
init?(requestURI: String) {
232-
var utf8View = requestURI.utf8[...]
233-
// Check and remove the split character at the beginning.
234-
guard let prefix = utf8View.trimPrefix(to: self.pathSplitDelimiter), prefix.isEmpty else {
235-
return nil
236-
}
237-
guard let service = utf8View.trimPrefix(to: pathSplitDelimiter) else {
238-
return nil
239-
}
240-
guard let method = utf8View.trimPrefix(to: pathSplitDelimiter) else {
241-
return nil
242-
}
243-
244-
self.service = service
245-
self.method = method
50+
/// A call URI split into components.
51+
struct CallPath {
52+
/// The name of the service to call.
53+
var service: String.UTF8View.SubSequence
54+
/// The name of the method to call.
55+
var method: String.UTF8View.SubSequence
56+
57+
/// Charater used to split the path into components.
58+
private let pathSplitDelimiter = UInt8(ascii: "/")
59+
60+
/// Split a path into service and method.
61+
/// Split is done in UTF8 as this turns out to be approximately 10x faster than a simple split.
62+
/// URI format: "/package.Servicename/MethodName"
63+
init?(requestURI: String) {
64+
var utf8View = requestURI.utf8[...]
65+
// Check and remove the split character at the beginning.
66+
guard let prefix = utf8View.trimPrefix(to: self.pathSplitDelimiter), prefix.isEmpty else {
67+
return nil
24668
}
247-
}
248-
249-
private func makeCallHandler(channel: Channel, requestHead: HTTPRequestHead) -> GRPCCallHandler? {
250-
// URI format: "/package.Servicename/MethodName", resulting in the following components separated by a slash:
251-
// - uriComponents[0]: empty
252-
// - uriComponents[1]: service name (including the package name);
253-
// `CallHandlerProvider`s should provide the service name including the package name.
254-
// - uriComponents[2]: method name.
255-
self.logger.debug("making call handler", metadata: ["path": "\(requestHead.uri)"])
256-
let uriComponents = CallPath(requestURI: requestHead.uri)
257-
258-
let context = CallHandlerContext(
259-
errorDelegate: self.errorDelegate,
260-
logger: self.logger,
261-
encoding: self.encoding,
262-
eventLoop: channel.eventLoop,
263-
path: requestHead.uri
264-
)
265-
266-
guard let callPath = uriComponents,
267-
let providerForServiceName = servicesByName[String.SubSequence(callPath.service)],
268-
let callHandler = providerForServiceName.handleMethod(
269-
String.SubSequence(callPath.method),
270-
callHandlerContext: context
271-
) else {
272-
self.logger.notice("could not create handler", metadata: ["path": "\(requestHead.uri)"])
69+
guard let service = utf8View.trimPrefix(to: pathSplitDelimiter) else {
27370
return nil
27471
}
275-
return callHandler
276-
}
277-
278-
private func makeResponseHead(requestHead: HTTPRequestHead,
279-
status: GRPCStatus) -> HTTPResponseHead {
280-
var headers: HTTPHeaders = [
281-
GRPCHeaderName.contentType: ContentType.protobuf.canonicalValue,
282-
GRPCHeaderName.statusCode: "\(status.code.rawValue)",
283-
]
284-
285-
if let message = status.message.flatMap(GRPCStatusMessageMarshaller.marshall) {
286-
headers.add(name: GRPCHeaderName.statusMessage, value: message)
72+
guard let method = utf8View.trimPrefix(to: pathSplitDelimiter) else {
73+
return nil
28774
}
28875

289-
return HTTPResponseHead(version: requestHead.version, status: .ok, headers: headers)
76+
self.service = service
77+
self.method = method
29078
}
29179
}
29280

0 commit comments

Comments
 (0)