Skip to content
2 changes: 1 addition & 1 deletion client/orb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (c *Client) selectNode(ctx context.Context, service string, opts *client.Ca
}

// Run the configured Selector to get a node from the resolved nodes.
node, err := opts.Selector(ctx, service, nodes, opts.PreferredTransports, opts.AnyTransport)
node, err := opts.Selector(ctx, service, nodes, opts.PreferredTransports, opts.AnyTransport, opts.Metadata)
if err != nil {
c.Logger().Error("Failed to resolve service", "error", err, "service", service)
return "", "", err
Expand Down
20 changes: 11 additions & 9 deletions client/orb_transport/drpc/drpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"os"
"testing"

"github.com/stretchr/testify/suite"

"github.com/go-orb/go-orb/log"
"github.com/go-orb/go-orb/registry"
"github.com/go-orb/go-orb/server"
"github.com/go-orb/go-orb/types"
"github.com/go-orb/plugins/client/tests"
"github.com/stretchr/testify/suite"

"github.com/go-orb/plugins/server/drpc"

echohandler "github.com/go-orb/plugins/client/tests/handler/echo"
Expand All @@ -28,24 +28,20 @@ import (
_ "github.com/go-orb/plugins/log/slog"
)

func setupServer(sn string) (*tests.SetupData, error) {
func setupServer(sn string, metadata map[string]string) (*tests.SetupData, error) {
ctx, cancel := context.WithCancel(context.Background())

setupData := &tests.SetupData{}

sv := ""

logger, err := log.New()
if err != nil {
cancel()

return nil, err
}

reg, err := registry.New(sn, sv, nil, &types.Components{}, logger)
if err != nil {
cancel()

return nil, err
}

Expand All @@ -55,10 +51,16 @@ func setupServer(sn string) (*tests.SetupData, error) {
fileHInstance := new(filehandler.Handler)
fileHRegister := fileproto.RegisterFileServiceHandler(fileHInstance)

ep, err := drpc.New(drpc.NewConfig(drpc.WithHandlers(echoHRegister, fileHRegister)), logger, reg)
options := []server.Option{
drpc.WithHandlers(echoHRegister, fileHRegister),
}
if metadata != nil {
options = append(options, server.WithEntrypointMetadata(metadata))
}

ep, err := drpc.New(drpc.NewConfig(options...), logger, reg)
if err != nil {
cancel()

return nil, err
}

Expand Down
9 changes: 7 additions & 2 deletions client/orb_transport/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
_ "github.com/go-orb/plugins/log/slog"
)

func setupServer(sn string) (*tests.SetupData, error) {
func setupServer(sn string, metadata map[string]string) (*tests.SetupData, error) {
ctx, cancel := context.WithCancel(context.Background())

setupData := &tests.SetupData{}
Expand Down Expand Up @@ -59,14 +59,19 @@ func setupServer(sn string) (*tests.SetupData, error) {
grpc.NewConfig(server.WithEntrypointName("grpc"),
grpc.WithHandlers(hRegister, fileHRegister),
grpc.WithInsecure(),
server.WithEntrypointMetadata(metadata),
), logger, reg)
if err != nil {
cancel()

return nil, err
}

ep2, err := grpc.New(grpc.NewConfig(server.WithEntrypointName("grpcs"), grpc.WithHandlers(hRegister, fileHRegister)), logger, reg)
ep2, err := grpc.New(
grpc.NewConfig(server.WithEntrypointName("grpcs"),
grpc.WithHandlers(hRegister, fileHRegister),
server.WithEntrypointMetadata(metadata),
), logger, reg)
if err != nil {
cancel()

Expand Down
4 changes: 3 additions & 1 deletion client/orb_transport/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
_ "github.com/go-orb/plugins/log/slog"
)

func setupServer(sn string) (*tests.SetupData, error) {
func setupServer(sn string, metadata map[string]string) (*tests.SetupData, error) {
ctx, cancel := context.WithCancel(context.Background())

setupData := &tests.SetupData{}
Expand Down Expand Up @@ -54,6 +54,7 @@ func setupServer(sn string) (*tests.SetupData, error) {
server.WithEntrypointName("http"),
http.WithHandlers(hRegister),
http.WithInsecure(),
server.WithEntrypointMetadata(metadata),
),
logger,
reg,
Expand All @@ -70,6 +71,7 @@ func setupServer(sn string) (*tests.SetupData, error) {
http.WithHandlers(hRegister),
http.WithInsecure(),
http.WithAllowH2C(),
server.WithEntrypointMetadata(metadata),
),
logger,
reg,
Expand Down
85 changes: 81 additions & 4 deletions client/tests/tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/rand"
"errors"
"fmt"
"time"

"github.com/go-orb/go-orb/client"
"github.com/go-orb/go-orb/codecs"
Expand Down Expand Up @@ -133,15 +134,16 @@ type TestSuite struct {

entrypoints []server.Entrypoint
ctx context.Context
setupServer func(service string) (*SetupData, error)
setupServer func(service string, metadata map[string]string) (*SetupData, error)
stopServer context.CancelFunc

// To create more clients in Benchmarks.
clientName string
}

// NewSuite creates a new test suite.
func NewSuite(setupServer func(service string) (*SetupData, error), transports []string, requests ...TestRequest) *TestSuite {
func NewSuite(setupServer func(service string, metadata map[string]string) (*SetupData, error),
transports []string, requests ...TestRequest) *TestSuite {
s := new(TestSuite)

s.Transports = transports
Expand Down Expand Up @@ -174,7 +176,7 @@ type TestRequest struct {
func (s *TestSuite) SetupSuite() {
var err error

setupData, err := s.setupServer(ServiceName)
setupData, err := s.setupServer(ServiceName, nil)
if err != nil {
s.Require().NoError(err, "while setting up the server")
}
Expand Down Expand Up @@ -218,7 +220,7 @@ func (s *TestSuite) TearDownSuite() {
}

func (s *TestSuite) doRequest(ctx context.Context, req *TestRequest, clientWire client.Type, transport string) {
opts := []client.CallOption{}
var opts []client.CallOption
if req.ContentType != "" {
opts = append(opts, client.WithContentType(req.ContentType))
}
Expand Down Expand Up @@ -318,6 +320,81 @@ func (s *TestSuite) TestMetadata() {
}
}

// TestMetadataFilter tests if metadata gets filtered.
func (s *TestSuite) TestMetadataFilter() {
regions := []string{"as-1", "eu-1", "us-1"}

const commonServerName = "metadata-server"

setupDatas := make([]*SetupData, 0, len(regions))
clientTypes := make([]client.Type, 0, len(regions))

defer func() {
for _, cli := range clientTypes {
if stopErr := cli.Stop(context.Background()); stopErr != nil {
s.T().Logf("Error stopping client: %v", stopErr)
}
}

for _, setup := range setupDatas {
setup.Stop()
}
}()

for _, region := range regions {
setupData, err := s.setupServer(commonServerName, map[string]string{"region": region})
s.Require().NoError(err, "Server setup failed for region "+region)

s.Require().NoError(setupData.Registry.Start(setupData.Ctx),
"Registry start failed for region "+region)

for _, ep := range setupData.Entrypoints {
s.Require().NoError(ep.Start(setupData.Ctx),
"Entrypoint start failed for region "+region)
}

setupDatas = append(setupDatas, setupData)

cli, err := client.New(nil, &types.Components{}, setupData.Logger, setupData.Registry)
s.Require().NoError(err, "Client creation failed for region "+region)

s.Require().NoError(cli.Start(setupData.Ctx),
"Client start failed for region "+region)

clientTypes = append(clientTypes, cli)
}

time.Sleep(time.Second)

mainClient := clientTypes[0]
echoClient := echo.NewStreamsClient(mainClient)

for _, region := range regions {
s.Run("Matching region "+region, func() {
resp, err := echoClient.Call(
context.Background(),
commonServerName,
&echo.CallRequest{Name: "test"},
client.WithRegistryMetadata("region", region),
)

s.Require().NoError(err, "Request with matching region should succeed")
s.Require().Equal("Hello test", resp.GetMsg(), "Unexpected response message")
})

s.Run("Non-matching region for "+region, func() {
_, err := echoClient.Call(
context.Background(),
commonServerName,
&echo.CallRequest{Name: "test"},
client.WithRegistryMetadata("region", "wrong-region"),
)

s.Require().Error(err, "Request with non-matching region should fail")
})
}
}

// TestFileUpload tests the client streaming functionality for file uploads.
func (s *TestSuite) TestFileUpload() {
// Create a file service client
Expand Down
2 changes: 1 addition & 1 deletion registry/tests/tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ func (r *TestSuite) TestMetadataFiltering() {
r.Require().NoError(err)

// Manually filter for production services
prodServices := []*registry.Service{}
var prodServices []*registry.Service

for _, svc := range allServices {
// Skip non-test services (those not starting with orb.test.filter)
Expand Down
7 changes: 6 additions & 1 deletion server/drpc/drpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ func (s *Server) Transport() string {
return "drpc"
}

// Metadata returns the metadata of this entrypoint.
func (s *Server) Metadata() map[string]string {
return s.config.Metadata
}

// EntrypointID returns the id of this entrypoint (node) in the registry.
func (s *Server) EntrypointID() string {
return s.registry.ServiceName() + types.DefaultSeparator + s.config.Name
Expand Down Expand Up @@ -189,7 +194,7 @@ func (s *Server) registryService() *registry.Service {
ID: s.EntrypointID(),
Address: s.Address(),
Transport: s.Transport(),
Metadata: make(map[string]string),
Metadata: s.Metadata(),
}

return &registry.Service{
Expand Down
7 changes: 6 additions & 1 deletion server/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,11 @@ func (s *Server) Transport() string {
return "grpc"
}

// Metadata returns the metadata of this entrypoint.
func (s *Server) Metadata() map[string]string {
return s.config.Metadata
}

// EntrypointID returns the id of this entrypoint (node) in the registry.
func (s *Server) EntrypointID() string {
return s.registry.ServiceName() + types.DefaultSeparator + s.config.Name
Expand Down Expand Up @@ -339,7 +344,7 @@ func (s *Server) registryService() *registry.Service {
ID: s.EntrypointID(),
Address: s.Address(),
Transport: s.Transport(),
Metadata: make(map[string]string),
Metadata: s.Metadata(),
}

eps := s.getEndpoints()
Expand Down
7 changes: 6 additions & 1 deletion server/http/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,11 @@ func (s *Server) Transport() string {
return "http"
}

// Metadata returns the metadata of this entrypoint.
func (s *Server) Metadata() map[string]string {
return s.config.Metadata
}

// EntrypointID returns the id (configured name) of this entrypoint in the registry.
func (s *Server) EntrypointID() string {
return s.config.Name
Expand Down Expand Up @@ -384,7 +389,7 @@ func (s *Server) registryService() (*registry.Service, error) {
ID: s.EntrypointID(),
Address: s.Address(),
Transport: s.Transport(),
Metadata: make(map[string]string),
Metadata: s.Metadata(),
}

eps, err := s.getEndpoints()
Expand Down
5 changes: 5 additions & 0 deletions server/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ func (s *Server) EntrypointID() string {
return s.config.Name
}

// Metadata returns the metadata of this entrypoint.
func (s *Server) Metadata() map[string]string {
return s.config.Metadata
}

// String returns the entrypoint type.
func (s *Server) String() string {
return s.Type()
Expand Down
Loading