diff --git a/config/client.go b/config/client.go index 6ca2c2073..c9c549685 100644 --- a/config/client.go +++ b/config/client.go @@ -44,7 +44,9 @@ import ( const ( // DefStoreLivenessTimeout is the default value for store liveness timeout. - DefStoreLivenessTimeout = "1s" + DefStoreLivenessTimeout = "1s" + DefGrpcInitialWindowSize = 1 << 27 // 128MiB + DefGrpcInitialConnWindowSize = 1 << 27 // 128MiB ) // TiKVClient is the config for tikv client. @@ -62,6 +64,10 @@ type TiKVClient struct { GrpcCompressionType string `toml:"grpc-compression-type" json:"grpc-compression-type"` // GrpcSharedBufferPool is the flag to control whether to share the buffer pool in the TiKV gRPC clients. GrpcSharedBufferPool bool `toml:"grpc-shared-buffer-pool" json:"grpc-shared-buffer-pool"` + // GrpcInitialWindowSize is the value for initial window size on a stream. + GrpcInitialWindowSize int32 `toml:"grpc-initial-window-size" json:"grpc-initial-window-size"` + // GrpcInitialConnWindowSize is the value for initial window size on a connection. + GrpcInitialConnWindowSize int32 `toml:"grpc-initial-conn-window-size" json:"grpc-initial-conn-window-size"` // CommitTimeout is the max time which command 'commit' will wait. CommitTimeout string `toml:"commit-timeout" json:"commit-timeout"` AsyncCommit AsyncCommit `toml:"async-commit" json:"async-commit"` @@ -130,12 +136,14 @@ type CoprocessorCache struct { // DefaultTiKVClient returns default config for TiKVClient. func DefaultTiKVClient() TiKVClient { return TiKVClient{ - GrpcConnectionCount: 4, - GrpcKeepAliveTime: 10, - GrpcKeepAliveTimeout: 3, - GrpcCompressionType: "none", - GrpcSharedBufferPool: false, - CommitTimeout: "41s", + GrpcConnectionCount: 4, + GrpcKeepAliveTime: 10, + GrpcKeepAliveTimeout: 3, + GrpcCompressionType: "none", + GrpcSharedBufferPool: false, + GrpcInitialWindowSize: DefGrpcInitialWindowSize, + GrpcInitialConnWindowSize: DefGrpcInitialConnWindowSize, + CommitTimeout: "41s", AsyncCommit: AsyncCommit{ // FIXME: Find an appropriate default limit. KeysLimit: 256, diff --git a/internal/client/client.go b/internal/client/client.go index 9bed1129f..31d837b57 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -90,12 +90,6 @@ const ( MaxWriteExecutionTime = ReadTimeoutShort - 10*time.Second ) -// Grpc window size -const ( - GrpcInitialWindowSize = 1 << 30 - GrpcInitialConnWindowSize = 1 << 30 -) - // forwardMetadataKey is the key of gRPC metadata which represents a forwarded request. const forwardMetadataKey = "tikv-forwarded-host" @@ -320,8 +314,8 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint opts = append([]grpc.DialOption{ opt, - grpc.WithInitialWindowSize(GrpcInitialWindowSize), - grpc.WithInitialConnWindowSize(GrpcInitialConnWindowSize), + grpc.WithInitialWindowSize(cfg.TiKVClient.GrpcInitialWindowSize), + grpc.WithInitialConnWindowSize(cfg.TiKVClient.GrpcInitialConnWindowSize), grpc.WithUnaryInterceptor(unaryInterceptor), grpc.WithStreamInterceptor(streamInterceptor), grpc.WithDefaultCallOptions(callOptions...), diff --git a/internal/locate/store_cache.go b/internal/locate/store_cache.go index fd8a4fc5a..235cbd0a1 100644 --- a/internal/locate/store_cache.go +++ b/internal/locate/store_cache.go @@ -28,7 +28,6 @@ import ( "github.com/pkg/errors" "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/config/retry" - "github.com/tikv/client-go/v2/internal/client" "github.com/tikv/client-go/v2/internal/logutil" "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/tikvrpc" @@ -734,8 +733,8 @@ func createKVHealthClient(ctx context.Context, addr string) (*grpc.ClientConn, h ctx, addr, opt, - grpc.WithInitialWindowSize(client.GrpcInitialWindowSize), - grpc.WithInitialConnWindowSize(client.GrpcInitialConnWindowSize), + grpc.WithInitialWindowSize(cfg.TiKVClient.GrpcInitialWindowSize), + grpc.WithInitialConnWindowSize(cfg.TiKVClient.GrpcInitialConnWindowSize), grpc.WithConnectParams(grpc.ConnectParams{ Backoff: backoff.Config{ BaseDelay: 100 * time.Millisecond, // Default was 1s.