Skip to content

Commit caa2a58

Browse files
authored
Add breaker to grpcx client middleware. (#162)
1 parent 615c1d4 commit caa2a58

File tree

3 files changed

+90
-1
lines changed

3 files changed

+90
-1
lines changed

.travis.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ language: go
44
go:
55
- "1.14.x"
66
- "1.13.x"
7-
- "1.11.x"
87
- "1.x"
98
env:
109
- GO111MODULE=on

grpcx/middleware/breaker.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package middleware
2+
3+
import (
4+
"context"
5+
"errors"
6+
7+
"github.com/msales/pkg/v4/breaker"
8+
"github.com/msales/pkg/v4/stats"
9+
"google.golang.org/grpc"
10+
)
11+
12+
const (
13+
breakerErrorKey = "breaker.error"
14+
stateTag = "state"
15+
)
16+
17+
// WithClientBreaker adds breaker to client request.
18+
func WithClientBreaker(br *breaker.Breaker) grpc.UnaryClientInterceptor {
19+
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
20+
err := br.Run(func() error {
21+
return invoker(ctx, method, req, reply, cc, opts...)
22+
})
23+
24+
if errors.Is(err, breaker.ErrOpenState) {
25+
_ = stats.Inc(ctx, breakerErrorKey, 1, 1.0, stateTag, "open")
26+
}
27+
28+
if errors.Is(err, breaker.ErrTooManyRequests) {
29+
_ = stats.Inc(ctx, breakerErrorKey, 1, 1.0, stateTag, "too_many_requests")
30+
}
31+
32+
return err
33+
}
34+
}

grpcx/middleware/breaker_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package middleware_test
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
"time"
8+
9+
"github.com/msales/pkg/v4/breaker"
10+
"github.com/stretchr/testify/assert"
11+
"google.golang.org/grpc"
12+
13+
. "github.com/msales/pkg/v4/grpcx/middleware"
14+
)
15+
16+
var breakerErr = errors.New("breaker: circuit breaker is open")
17+
18+
func TestWithBreaker(t *testing.T) {
19+
ctx := context.Background()
20+
21+
br := breaker.NewBreaker(
22+
breaker.RateFuse(1),
23+
breaker.WithSleep(1*time.Second),
24+
breaker.WithTestRequests(1),
25+
)
26+
27+
interceptor := WithClientBreaker(br)
28+
err := interceptor(ctx, "method", nil, nil, nil, func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error {
29+
return nil
30+
})
31+
32+
assert.Nil(t, err)
33+
}
34+
35+
func TestWithBreaker_Errored(t *testing.T) {
36+
ctx := context.Background()
37+
38+
br := breaker.NewBreaker(
39+
breaker.RateFuse(10),
40+
breaker.WithSleep(1*time.Second),
41+
breaker.WithTestRequests(1),
42+
)
43+
44+
interceptor := WithClientBreaker(br)
45+
err := interceptor(ctx, "method", nil, nil, nil, func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error {
46+
return testErr
47+
})
48+
49+
assert.Equal(t, testErr, err)
50+
51+
err = interceptor(ctx, "method", nil, nil, nil, func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error {
52+
return testErr
53+
})
54+
55+
assert.Equal(t, breakerErr, err)
56+
}

0 commit comments

Comments
 (0)