Skip to content

Commit 0e6c747

Browse files
Updating grpc handler to gracefully close backend connections
This should address part 1 of #807 and #912 update grpc docs
1 parent 87c4da1 commit 0e6c747

18 files changed

+123
-84
lines changed

config/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ type Proxy struct {
9393
AuthSchemes map[string]AuthScheme
9494
GRPCMaxRxMsgSize int
9595
GRPCMaxTxMsgSize int
96+
GRPCGShutdownTimeout time.Duration
9697
}
9798

9899
type STSHeader struct {

config/default.go

+13-12
Original file line numberDiff line numberDiff line change
@@ -42,18 +42,19 @@ var defaultConfig = &Config{
4242
},
4343
},
4444
Proxy: Proxy{
45-
MaxConn: 10000,
46-
Strategy: "rnd",
47-
Matcher: "prefix",
48-
NoRouteStatus: 404,
49-
DialTimeout: 30 * time.Second,
50-
FlushInterval: time.Second,
51-
GlobalFlushInterval: 0,
52-
LocalIP: LocalIPString(),
53-
AuthSchemes: map[string]AuthScheme{},
54-
IdleConnTimeout: 15 * time.Second,
55-
GRPCMaxRxMsgSize: 4 * 1024 * 1024, // 4M
56-
GRPCMaxTxMsgSize: 4 * 1024 * 1024, // 4M
45+
MaxConn: 10000,
46+
Strategy: "rnd",
47+
Matcher: "prefix",
48+
NoRouteStatus: 404,
49+
DialTimeout: 30 * time.Second,
50+
FlushInterval: time.Second,
51+
GlobalFlushInterval: 0,
52+
LocalIP: LocalIPString(),
53+
AuthSchemes: map[string]AuthScheme{},
54+
IdleConnTimeout: 15 * time.Second,
55+
GRPCMaxRxMsgSize: 4 * 1024 * 1024, // 4M
56+
GRPCMaxTxMsgSize: 4 * 1024 * 1024, // 4M
57+
GRPCGShutdownTimeout: time.Second * 2,
5758
},
5859
Registry: Registry{
5960
Backend: "consul",

config/kvslice.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@ import (
88

99
// parseKVSlice parses a configuration string in the form
1010
//
11-
// key=val;key=val,key=val;key=val
11+
// key=val;key=val,key=val;key=val
1212
//
1313
// into a list of string maps. maps are separated by comma and key/value
1414
// pairs within a map are separated by semicolons. The first key/value
1515
// pair of a map can omit the key and its value will be stored under the
1616
// empty key. This allows support of legacy configuration formats which
1717
// are
1818
//
19-
// val;opt1=val1;opt2=val2;...
19+
// val;opt1=val1;opt2=val2;...
2020
func parseKVSlice(in string) ([]map[string]string, error) {
2121
var keyOrFirstVal string
2222
maps := []map[string]string{}

config/load.go

+1
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c
153153
f.BoolVar(&cfg.Proxy.STSHeader.Preload, "proxy.header.sts.preload", defaultConfig.Proxy.STSHeader.Preload, "direct HSTS to pass the preload directive")
154154
f.IntVar(&cfg.Proxy.GRPCMaxRxMsgSize, "proxy.grpcmaxrxmsgsize", defaultConfig.Proxy.GRPCMaxRxMsgSize, "max grpc receive message size (in bytes)")
155155
f.IntVar(&cfg.Proxy.GRPCMaxTxMsgSize, "proxy.grpcmaxtxmsgsize", defaultConfig.Proxy.GRPCMaxTxMsgSize, "max grpc transmit message size (in bytes)")
156+
f.DurationVar(&cfg.Proxy.GRPCGShutdownTimeout, "proxy.grpcshutdowntimeout", defaultConfig.Proxy.GRPCGShutdownTimeout, "amount of time to wait for graceful shutdown of grpc backend")
156157
f.StringVar(&gzipContentTypesValue, "proxy.gzip.contenttype", defaultValues.GZIPContentTypesValue, "regexp of content types to compress")
157158
f.StringVar(&listenerValue, "proxy.addr", defaultValues.ListenerValue, "listener config")
158159
f.StringVar(&certSourcesValue, "proxy.cs", defaultValues.CertSourcesValue, "certificate sources")

demo/server/server.go

+18-19
Original file line numberDiff line numberDiff line change
@@ -4,34 +4,33 @@
44
//
55
// During startup the server performs the following steps:
66
//
7-
// * Add a handler for each prefix which provides a unique
8-
// response for that instance and endpoint
9-
// * Add a `/health` handler for the consul health check
10-
// * Register the service in consul with the listen address,
11-
// a health check under the given name and with one `urlprefix-`
12-
// tag per prefix
13-
// * Install a signal handler to deregister the service on exit
7+
// - Add a handler for each prefix which provides a unique
8+
// response for that instance and endpoint
9+
// - Add a `/health` handler for the consul health check
10+
// - Register the service in consul with the listen address,
11+
// a health check under the given name and with one `urlprefix-`
12+
// tag per prefix
13+
// - Install a signal handler to deregister the service on exit
1414
//
1515
// If the protocol is set to "ws" the registered endpoints function
1616
// as websocket echo servers.
1717
//
1818
// Example:
1919
//
20-
// # http server
21-
// ./server -addr 127.0.0.1:5000 -name svc-a -prefix /foo -prefix /bar
22-
// ./server -addr 127.0.0.1:5001 -name svc-b -prefix /baz -prefix /bar
23-
// ./server -addr 127.0.0.1:5002 -name svc-c -prefix "/gogl redirect=301,https://www.google.de/"
20+
// # http server
21+
// ./server -addr 127.0.0.1:5000 -name svc-a -prefix /foo -prefix /bar
22+
// ./server -addr 127.0.0.1:5001 -name svc-b -prefix /baz -prefix /bar
23+
// ./server -addr 127.0.0.1:5002 -name svc-c -prefix "/gogl redirect=301,https://www.google.de/"
2424
//
25-
// # https server
26-
// ./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix /foo
27-
// ./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix "/foo tlsskipverify=true"
25+
// # https server
26+
// ./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix /foo
27+
// ./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix "/foo tlsskipverify=true"
2828
//
29-
// # websocket server
30-
// ./server -addr 127.0.0.1:6000 -name ws-a -proto ws -prefix /echo1 -prefix /echo2
31-
//
32-
// # tcp server
33-
// ./server -addr 127.0.0.1:7000 -name tcp-a -proto tcp -prefix :1234
29+
// # websocket server
30+
// ./server -addr 127.0.0.1:6000 -name ws-a -proto ws -prefix /echo1 -prefix /echo2
3431
//
32+
// # tcp server
33+
// ./server -addr 127.0.0.1:7000 -name tcp-a -proto tcp -prefix :1234
3534
package main
3635

3736
import (
+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
title: "proxy.grpcmaxrxmsgsize"
3+
---
4+
5+
`proxy.grpcmaxrxmsgsize` configures the grpc max receive message size in bytes. The default
6+
value is
7+
8+
proxy.grpcmaxrxmsgsize = 4194304
9+
10+
which is 4MB
+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
title: "proxy.grpcmaxtxmsgsize"
3+
---
4+
5+
`proxy.grpcmaxtxmsgsize` configures the grpc max transmit message size in bytes. The default
6+
value is
7+
8+
proxy.grpcmaxtxmsgsize = 4194304
9+
10+
which is 4MB
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
title: "proxy.grpcshutdowntimeout"
3+
---
4+
5+
`proxy.grpcshutdowntimeout` configures the amount of time fabio will wait to attempt
6+
to close the connection while waiting for grpc traffic to finish to a backend that's been
7+
deregistered. The default value is
8+
9+
proxy.grpcshutdowntimeout = 2s

fabio.properties

+6
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,12 @@
561561
# The default is
562562
# proxy.grpcmaxtxmsgsize = 4194304
563563
#
564+
#
565+
# proxy.grpcshutdowntimeout configures the amount of time fabio will wait to attempt
566+
# to close the connection while waiting for grpc traffic to finish to a backend that's been
567+
# deregistered. Default value is
568+
# proxy.grpcshutdowntimeout = 2s
569+
# setting to 0s disables the wait.
564570

565571
# log.access.format configures the format of the access log.
566572
#

logger/logger.go

+31-32
Original file line numberDiff line numberDiff line change
@@ -5,38 +5,37 @@
55
// takes place. Text between two fields is printed verbatim. See the common
66
// log file formats for an example.
77
//
8-
// $header.<name> - request http header (name: [a-zA-Z0-9-]+)
9-
// $remote_addr - host:port of remote client
10-
// $remote_host - host of remote client
11-
// $remote_port - port of remote client
12-
// $request - request <method> <uri> <proto>
13-
// $request_args - request query parameters
14-
// $request_host - request host header (aka server name)
15-
// $request_method - request method
16-
// $request_scheme - request scheme
17-
// $request_uri - request URI
18-
// $request_url - request URL
19-
// $request_proto - request protocol
20-
// $response_body_size - response body size in bytes
21-
// $response_status - response status code
22-
// $response_time_ms - response time in S.sss format
23-
// $response_time_us - response time in S.ssssss format
24-
// $response_time_ns - response time in S.sssssssss format
25-
// $time_rfc3339 - log timestamp in YYYY-MM-DDTHH:MM:SSZ format
26-
// $time_rfc3339_ms - log timestamp in YYYY-MM-DDTHH:MM:SS.sssZ format
27-
// $time_rfc3339_us - log timestamp in YYYY-MM-DDTHH:MM:SS.ssssssZ format
28-
// $time_rfc3339_ns - log timestamp in YYYY-MM-DDTHH:MM:SS.sssssssssZ format
29-
// $time_unix_ms - log timestamp in unix epoch ms
30-
// $time_unix_us - log timestamp in unix epoch us
31-
// $time_unix_ns - log timestamp in unix epoch ns
32-
// $time_common - log timestamp in DD/MMM/YYYY:HH:MM:SS -ZZZZ
33-
// $upstream_addr - host:port of upstream server
34-
// $upstream_host - host of upstream server
35-
// $upstream_port - port of upstream server
36-
// $upstream_request_scheme - upstream request scheme
37-
// $upstream_request_uri - upstream request URI
38-
// $upstream_request_url - upstream request URL
39-
//
8+
// $header.<name> - request http header (name: [a-zA-Z0-9-]+)
9+
// $remote_addr - host:port of remote client
10+
// $remote_host - host of remote client
11+
// $remote_port - port of remote client
12+
// $request - request <method> <uri> <proto>
13+
// $request_args - request query parameters
14+
// $request_host - request host header (aka server name)
15+
// $request_method - request method
16+
// $request_scheme - request scheme
17+
// $request_uri - request URI
18+
// $request_url - request URL
19+
// $request_proto - request protocol
20+
// $response_body_size - response body size in bytes
21+
// $response_status - response status code
22+
// $response_time_ms - response time in S.sss format
23+
// $response_time_us - response time in S.ssssss format
24+
// $response_time_ns - response time in S.sssssssss format
25+
// $time_rfc3339 - log timestamp in YYYY-MM-DDTHH:MM:SSZ format
26+
// $time_rfc3339_ms - log timestamp in YYYY-MM-DDTHH:MM:SS.sssZ format
27+
// $time_rfc3339_us - log timestamp in YYYY-MM-DDTHH:MM:SS.ssssssZ format
28+
// $time_rfc3339_ns - log timestamp in YYYY-MM-DDTHH:MM:SS.sssssssssZ format
29+
// $time_unix_ms - log timestamp in unix epoch ms
30+
// $time_unix_us - log timestamp in unix epoch us
31+
// $time_unix_ns - log timestamp in unix epoch ns
32+
// $time_common - log timestamp in DD/MMM/YYYY:HH:MM:SS -ZZZZ
33+
// $upstream_addr - host:port of upstream server
34+
// $upstream_host - host of upstream server
35+
// $upstream_port - port of upstream server
36+
// $upstream_request_scheme - upstream request scheme
37+
// $upstream_request_uri - upstream request URI
38+
// $upstream_request_url - upstream request URL
4039
package logger
4140

4241
import (

proxy/grpc_handler.go

+16-9
Original file line numberDiff line numberDiff line change
@@ -165,13 +165,13 @@ func (g GrpcProxyInterceptor) lookup(ctx context.Context, fullMethodName string)
165165
return route.GetTable().Lookup(req, req.Header.Get("trace"), pick, match, g.GlobCache, g.Config.GlobMatchingDisabled), nil
166166
}
167167

168-
//grpc client can specify a destination host in metadata by key 'dsthost', e.g. dsthost=betatest
169-
//the backend service(s) tags should be urlprefix-betatest/grpcpackage.servicename proto=grpc
170-
//the 'betatest' will be parsed as 'host' and '/grpcpackage.servicename' is the 'path',
171-
//a route record will be setup in route Table, t['betatest']
172-
//the dstHost is extracted from context's metadata of grpc client, that will trigger t[dstHost] is used.
173-
//if t[dstHost] not exists, fallback to t[""] is used
174-
//dstHost will be "" as before if not specified by grpc client side.
168+
// grpc client can specify a destination host in metadata by key 'dsthost', e.g. dsthost=betatest
169+
// the backend service(s) tags should be urlprefix-betatest/grpcpackage.servicename proto=grpc
170+
// the 'betatest' will be parsed as 'host' and '/grpcpackage.servicename' is the 'path',
171+
// a route record will be setup in route Table, t['betatest']
172+
// the dstHost is extracted from context's metadata of grpc client, that will trigger t[dstHost] is used.
173+
// if t[dstHost] not exists, fallback to t[""] is used
174+
// dstHost will be "" as before if not specified by grpc client side.
175175
func (g GrpcProxyInterceptor) getDestinationHostFromMetadata(md metadata.MD) (dstHost string) {
176176
dstHost = ""
177177
hosts := md["dsthost"]
@@ -299,14 +299,21 @@ func (p *grpcConnectionPool) cleanup() {
299299
p.lock.Lock()
300300
table := route.GetTable()
301301
for tKey, cs := range p.connections {
302-
if cs.GetState() == connectivity.Shutdown {
302+
state := cs.GetState()
303+
if state == connectivity.Shutdown {
303304
delete(p.connections, tKey)
304305
continue
305306
}
306307

307308
if !hasTarget(tKey, table) {
308309
log.Println("[DEBUG] grpc: cleaning up connection to", tKey)
309-
cs.Close()
310+
go func(cs *grpc.ClientConn, state connectivity.State) {
311+
ctx, cancel := context.WithTimeout(context.Background(), p.cfg.Proxy.GRPCGShutdownTimeout)
312+
defer cancel()
313+
// wait for state to change, or timeout, before closing, in case it's still handling traffic.
314+
cs.WaitForStateChange(ctx, state)
315+
cs.Close()
316+
}(cs, state)
310317
delete(p.connections, tKey)
311318
}
312319
}

proxy/http_headers.go

-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ func addResponseHeaders(w http.ResponseWriter, r *http.Request, cfg config.Proxy
3333
// * add X-Real-Ip, if not present
3434
// * ClientIPHeader != "": Set header with that name to <remote ip>
3535
// * TLS connection: Set header with name from `cfg.TLSHeader` to `cfg.TLSHeaderValue`
36-
//
3736
func addHeaders(r *http.Request, cfg config.Proxy, stripPath string) error {
3837
remoteIP, _, err := net.SplitHostPort(r.RemoteAddr)
3938
if err != nil {

proxy/http_integration_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ const (
3333
globDisabled = true
3434
)
3535

36-
//Global GlobCache for Testing
36+
// Global GlobCache for Testing
3737
var globCache = route.NewGlobCache(1000)
3838

3939
func TestProxyProducesCorrectXForwardedSomethingHeader(t *testing.T) {

registry/consul/register.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,8 @@ const (
2626
// consul. To wait for completion the caller should read the next value from
2727
// the dereg channel.
2828
//
29-
// dereg <- true // trigger deregistration
30-
// <-dereg // wait for completion
31-
//
29+
// dereg <- true // trigger deregistration
30+
// <-dereg // wait for completion
3231
func register(c *api.Client, service *api.AgentServiceRegistration) chan bool {
3332
registered := func(serviceID string) bool {
3433
if serviceID == "" {

route/picker.go

-2
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,3 @@ var randIntn = func(n int) int {
4141
}
4242
return rand.Intn(n)
4343
}
44-
45-

route/picker_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ var oldRandInt = func(n int) int {
6565
if n == 0 {
6666
return 0
6767
}
68-
return int(time.Now().UnixNano()/int64(time.Microsecond) % int64(n))
68+
return int(time.Now().UnixNano() / int64(time.Microsecond) % int64(n))
6969
}
7070

7171
var result int // prevent compiler optimization

route/table.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func SetTable(t Table) {
7272
type Table map[string]Routes
7373

7474
// hostpath splits a 'host/path' prefix into 'host' and '/path' or it returns a
75-
// ':port' prefix as ':port' and '' since there is no path component for TCP
75+
// ':port' prefix as ':port' and since there is no path component for TCP
7676
// connections.
7777
func hostpath(prefix string) (host string, path string) {
7878
if strings.HasPrefix(prefix, ":") {

route/table_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const (
1919
globDisabled = true
2020
)
2121

22-
//Global GlobCache for Testing
22+
// Global GlobCache for Testing
2323
var globCache = NewGlobCache(1000)
2424

2525
func TestTableParse(t *testing.T) {

0 commit comments

Comments
 (0)