Skip to content

Commit

Permalink
Revert "*: enable gRPC pooling (#7742)"
Browse files Browse the repository at this point in the history
This reverts commit ca8ab90.
  • Loading branch information
GiedriusS committed Oct 1, 2024
1 parent d5ca4b3 commit ba76e5a
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 42 deletions.
60 changes: 22 additions & 38 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
versioncollector "github.com/prometheus/client_golang/prometheus/collectors/version"
"github.com/prometheus/common/version"
"go.uber.org/automaxprocs/maxprocs"
_ "google.golang.org/grpc/encoding/proto"
"gopkg.in/alecthomas/kingpin.v2"

"github.com/thanos-io/thanos/pkg/extkingpin"
Expand All @@ -34,11 +35,9 @@ import (
// use the original golang/protobuf package we can continue serializing
// messages from our dependencies, particularly from OTEL. Original version
// from Vitess.
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/mem"
"google.golang.org/protobuf/proto"

// Guarantee that the built-in proto is called registered before this one
// so that it can be replaced.
"google.golang.org/grpc/encoding"
_ "google.golang.org/grpc/encoding/proto"
)

Expand All @@ -49,56 +48,41 @@ const Name = "proto"
// but also handles non-vtproto messages that are needed
// for stuff like OpenTelemetry. Otherwise, such errors appear:
// error while marshaling: failed to marshal, message is *v1.ExportTraceServiceRequest (missing vtprotobuf helpers).
type vtprotoCodec struct {
fallback encoding.CodecV2
}
type vtprotoCodec struct{}

type vtprotoMessage interface {
MarshalVT() ([]byte, error)
UnmarshalVT([]byte) error
MarshalToSizedBufferVT(data []byte) (int, error)
SizeVT() int
}

func (c *vtprotoCodec) Marshal(v any) (mem.BufferSlice, error) {
if m, ok := v.(vtprotoMessage); ok {
size := m.SizeVT()
if mem.IsBelowBufferPoolingThreshold(size) {
buf := make([]byte, size)
if _, err := m.MarshalToSizedBufferVT(buf); err != nil {
return nil, err
}
return mem.BufferSlice{mem.SliceBuffer(buf)}, nil
}
pool := mem.DefaultBufferPool()
buf := pool.Get(size)
if _, err := m.MarshalToSizedBufferVT((*buf)[:size]); err != nil {
pool.Put(buf)
return nil, err
}
return mem.BufferSlice{mem.NewBuffer(buf, pool)}, nil
func (vtprotoCodec) Marshal(v any) ([]byte, error) {
switch v := v.(type) {
case vtprotoMessage:
return v.MarshalVT()
case proto.Message:
return proto.Marshal(v)
default:
return nil, fmt.Errorf("failed to marshal, message is %T, must satisfy the vtprotoMessage interface or want proto.Message", v)
}

return c.fallback.Marshal(v)
}

func (c *vtprotoCodec) Unmarshal(data mem.BufferSlice, v any) error {
if m, ok := v.(vtprotoMessage); ok {
buf := data.MaterializeToBuffer(mem.DefaultBufferPool())
defer buf.Free()
return m.UnmarshalVT(buf.ReadOnlyData())
func (vtprotoCodec) Unmarshal(data []byte, v any) error {
switch v := v.(type) {
case vtprotoMessage:
return v.UnmarshalVT(data)
case proto.Message:
return proto.Unmarshal(data, v)
default:
return fmt.Errorf("failed to unmarshal, message is %T, must satisfy the vtprotoMessage interface or want proto.Message", v)
}

return c.fallback.Unmarshal(data, v)
}

func (vtprotoCodec) Name() string {
return Name
}

func init() {
encoding.RegisterCodecV2(&vtprotoCodec{
fallback: encoding.GetCodecV2("proto"),
})
encoding.RegisterCodec(vtprotoCodec{})
}

func main() {
Expand Down
6 changes: 2 additions & 4 deletions scripts/genproto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ PROTOC_GO_INJECT_TAG_BIN=${PROTOC_GO_INJECT_TAG_BIN:-protoc-go-inject-tag}
PROTOC_GEN_GO_BIN=${PROTOC_GEN_GO_BIN:-protoc-gen-go}
PROTOC_GEN_GO_GRPC_BIN=${PROTOC_GEN_GO_GRPC_BIN:-protoc-gen-go-grpc}
PROTOC_GEN_GO_VTPROTO_BIN=${PROTOC_GEN_GO_VTPROTO_BIN:-protoc-gen-go-vtproto}
VTPROTOBUF_VERSION="$(go list -m all | grep 'github.com/planetscale/vtprotobuf' | awk '{ print $2 }')"
VTPROTOBUF_INCLUDE_PATH="$(go env GOMODCACHE)/github.com/planetscale/vtprotobuf@${VTPROTOBUF_VERSION}/include"

if ! [[ "scripts/genproto.sh" =~ $0 ]]; then
echo "must be run from repository root"
Expand All @@ -36,7 +34,7 @@ for dir in ${DIRS}; do
--plugin=protoc-gen-go-grpc=${PROTOC_GEN_GO_GRPC_BIN} \
--plugin=protoc-gen-go-vtproto=${PROTOC_GEN_GO_VTPROTO_BIN} \
--go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative -I=. -I=${INCLUDE_PATH} -I=${VTPROTOBUF_INCLUDE_PATH} \
--go-grpc_out=. --go-grpc_opt=paths=source_relative -I=. -I=${INCLUDE_PATH} \
--go_opt=Mstore/storepb/types.proto=github.com/thanos-io/thanos/pkg/store/storepb \
--go_opt=Mrules/rulespb/rpc.proto=github.com/thanos-io/thanos/pkg/rules/rulespb \
--go-vtproto_out=. --go-vtproto_opt=features=marshal+unmarshal+size+equal,paths=source_relative \
Expand All @@ -59,7 +57,7 @@ for dir in ${CORTEX_DIRS}; do
--plugin=protoc-gen-go-grpc=${PROTOC_GEN_GO_GRPC_BIN} \
--plugin=protoc-gen-go-vtproto=${PROTOC_GEN_GO_VTPROTO_BIN} \
--go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative -I=. -I=${INCLUDE_PATH} -I=${VTPROTOBUF_INCLUDE_PATH} \
--go-grpc_out=. --go-grpc_opt=paths=source_relative -I=. -I=${INCLUDE_PATH} \
--go_opt=Mstore/storepb/types.proto=github.com/thanos-io/thanos/pkg/store/storepb \
--go_opt=Mrules/rulespb/rpc.proto=github.com/thanos-io/thanos/pkg/rules/rulespb \
--go-vtproto_out=. --go-vtproto_opt=features=marshal+unmarshal+size+equal,paths=source_relative \
Expand Down

0 comments on commit ba76e5a

Please sign in to comment.