Commit 9f31613

feat(shard-distributor): add SpectatorPeerChooser for shard-aware routing (#7478)
This PR depends on #7476 (spectator return metadata) being merged first.

**What changed?**
Added `SpectatorPeerChooser`, which implements YARPC's `peer.Chooser` interface to route requests to the correct executor based on shard ownership.

**Why?**
Enable executor-to-executor communication in the shard distributor canary system. The peer chooser queries the Spectator to find which executor owns a shard, then routes requests to that executor's gRPC address.

**How did you test it?**
Unit tests.

**Potential risks**
None.

**Release notes**

**Documentation Changes**

---------

Signed-off-by: Jakob Haahr Taankvist <[email protected]>
1 parent 8a47ad7 commit 9f31613

2 files changed: +367, -0 lines
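As context for reviewers, the caller-side contract is small: supply the shard key with `yarpc.WithShardKey` and name the namespace in the `x-shard-distributor-namespace` header. A minimal sketch, not code from this PR; the `executorClient` interface and its `Process` method are illustrative stand-ins for a generated YARPC client:

```go
package example

import (
	"context"

	"go.uber.org/yarpc"
)

// executorClient is a hypothetical stand-in for a generated YARPC client.
type executorClient interface {
	Process(ctx context.Context, input string, opts ...yarpc.CallOption) (string, error)
}

// callOwner routes one request through the chooser: the shard key becomes
// transport.Request.ShardKey, and the header tells Choose() which Spectator
// to consult.
func callOwner(ctx context.Context, client executorClient, shardKey string) (string, error) {
	return client.Process(
		ctx,
		"work-item",
		yarpc.WithShardKey(shardKey), // step 1 of the Flow in the chooser's doc comment
		yarpc.WithHeader("x-shard-distributor-namespace", "test-namespace"),
	)
}
```

YARPC copies the shard key into `transport.Request.ShardKey`, which is exactly what `Choose` validates in the file below.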
Lines changed: 178 additions & 0 deletions
```go
package spectatorclient

import (
	"context"
	"fmt"
	"sync"

	"go.uber.org/fx"
	"go.uber.org/yarpc/api/peer"
	"go.uber.org/yarpc/api/transport"
	"go.uber.org/yarpc/peer/hostport"
	"go.uber.org/yarpc/yarpcerrors"

	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/log/tag"
)

const (
	NamespaceHeader        = "x-shard-distributor-namespace"
	grpcAddressMetadataKey = "grpc_address"
)

// SpectatorPeerChooserInterface extends peer.Chooser with a SetSpectators method.
type SpectatorPeerChooserInterface interface {
	peer.Chooser
	SetSpectators(spectators *Spectators)
}

// SpectatorPeerChooser is a peer.Chooser that uses the Spectator to route requests
// to the correct executor based on shard ownership.
// This is the shard distributor equivalent of Cadence's RingpopPeerChooser.
//
// Flow:
//  1. Client calls RPC with yarpc.WithShardKey("shard-key")
//  2. Choose() is called with req.ShardKey = "shard-key"
//  3. Query Spectator for shard owner
//  4. Extract grpc_address from owner metadata
//  5. Create/reuse peer for that address
//  6. Return peer to YARPC for connection
type SpectatorPeerChooser struct {
	spectators *Spectators
	transport  peer.Transport
	logger     log.Logger
	namespace  string

	peersMutex sync.RWMutex
	peers      map[string]peer.Peer // grpc_address -> peer
}

type SpectatorPeerChooserParams struct {
	fx.In

	Transport peer.Transport
	Logger    log.Logger
}

// NewSpectatorPeerChooser creates a new peer chooser that routes based on shard distributor ownership.
func NewSpectatorPeerChooser(params SpectatorPeerChooserParams) SpectatorPeerChooserInterface {
	return &SpectatorPeerChooser{
		transport: params.Transport,
		logger:    params.Logger,
		peers:     make(map[string]peer.Peer),
	}
}

// Start satisfies the peer.Chooser interface.
func (c *SpectatorPeerChooser) Start() error {
	c.logger.Info("Starting shard distributor peer chooser", tag.ShardNamespace(c.namespace))
	return nil
}

// Stop satisfies the peer.Chooser interface.
func (c *SpectatorPeerChooser) Stop() error {
	c.logger.Info("Stopping shard distributor peer chooser", tag.ShardNamespace(c.namespace))

	// Release all peers
	c.peersMutex.Lock()
	defer c.peersMutex.Unlock()

	for addr, p := range c.peers {
		if err := c.transport.ReleasePeer(p, &noOpSubscriber{}); err != nil {
			c.logger.Error("Failed to release peer", tag.Error(err), tag.Address(addr))
		}
	}
	c.peers = make(map[string]peer.Peer)

	return nil
}

// IsRunning satisfies the peer.Chooser interface.
func (c *SpectatorPeerChooser) IsRunning() bool {
	return true
}

// Choose returns a peer for the given shard key by:
//  0. Looking up the spectator for the namespace using the x-shard-distributor-namespace header
//  1. Looking up the shard owner via the Spectator
//  2. Extracting the grpc_address from the owner's metadata
//  3. Creating/reusing a peer for that address
//
// The ShardKey in the request is the shard key (e.g., shard ID).
// The function returns:
//	peer:     the peer to use for the request
//	onFinish: a function to call when the request is finished (currently a no-op)
//	err:      an error if no peer could be chosen
func (c *SpectatorPeerChooser) Choose(ctx context.Context, req *transport.Request) (peer peer.Peer, onFinish func(error), err error) {
	if req.ShardKey == "" {
		return nil, nil, yarpcerrors.InvalidArgumentErrorf("chooser requires ShardKey to be non-empty")
	}

	// Get the spectator for the namespace
	namespace, ok := req.Headers.Get(NamespaceHeader)
	if !ok || namespace == "" {
		return nil, nil, yarpcerrors.InvalidArgumentErrorf("chooser requires x-shard-distributor-namespace header to be non-empty")
	}

	spectator, err := c.spectators.ForNamespace(namespace)
	if err != nil {
		return nil, nil, yarpcerrors.InvalidArgumentErrorf("get spectator for namespace %s: %v", namespace, err)
	}

	// Query spectator for shard owner
	owner, err := spectator.GetShardOwner(ctx, req.ShardKey)
	if err != nil {
		return nil, nil, yarpcerrors.UnavailableErrorf("get shard owner for key %s: %v", req.ShardKey, err)
	}

	// Extract gRPC address from owner metadata
	grpcAddress, ok := owner.Metadata[grpcAddressMetadataKey]
	if !ok || grpcAddress == "" {
		return nil, nil, yarpcerrors.InternalErrorf("no grpc_address in metadata for executor %s owning shard %s", owner.ExecutorID, req.ShardKey)
	}

	// Get peer for this address
	peer, err = c.getOrCreatePeer(grpcAddress)
	if err != nil {
		return nil, nil, yarpcerrors.InternalErrorf("get or create peer for address %s: %v", grpcAddress, err)
	}

	return peer, func(error) {}, nil
}

func (c *SpectatorPeerChooser) SetSpectators(spectators *Spectators) {
	c.spectators = spectators
}

func (c *SpectatorPeerChooser) getOrCreatePeer(grpcAddress string) (peer.Peer, error) {
	c.peersMutex.RLock()
	peer, ok := c.peers[grpcAddress]
	c.peersMutex.RUnlock()

	if ok {
		return peer, nil
	}

	// Create a new peer for this address
	c.peersMutex.Lock()
	defer c.peersMutex.Unlock()

	// Check again in case another goroutine added it
	if peer, ok := c.peers[grpcAddress]; ok {
		return peer, nil
	}

	peer, err := c.transport.RetainPeer(hostport.Identify(grpcAddress), &noOpSubscriber{})
	if err != nil {
		return nil, fmt.Errorf("retain peer: %w", err)
	}

	c.peers[grpcAddress] = peer
	return peer, nil
}

// noOpSubscriber is a no-op implementation of peer.Subscriber
type noOpSubscriber struct{}

func (*noOpSubscriber) NotifyStatusChanged(peer.Identifier) {}
```
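A point worth noting when wiring this up: `getOrCreatePeer` retains peers on the chooser's `peer.Transport`, so the chooser must share the transport that backs the outbound. A minimal wiring sketch under that assumption, written as if it lived alongside this package; it is not code from this PR, and the dispatcher name and outbound key are illustrative:

```go
package spectatorclient

import (
	"go.uber.org/yarpc"
	"go.uber.org/yarpc/transport/grpc"

	"github.com/uber/cadence/common/log"
)

// newExecutorDispatcher wires the chooser into a gRPC unary outbound.
// The chooser retains peers on the same transport the outbound dials with.
func newExecutorDispatcher(logger log.Logger, spectators *Spectators) *yarpc.Dispatcher {
	t := grpc.NewTransport()

	chooser := NewSpectatorPeerChooser(SpectatorPeerChooserParams{
		Transport: t,
		Logger:    logger,
	})
	chooser.SetSpectators(spectators)

	return yarpc.NewDispatcher(yarpc.Config{
		Name: "shard-distributor-canary", // illustrative service name
		Outbounds: yarpc.Outbounds{
			"executor": { // illustrative outbound key
				Unary: t.NewOutbound(chooser),
			},
		},
	})
}
```

Starting the dispatcher then starts the transport and the chooser; each outbound call is routed per request through `Choose`.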
Lines changed: 189 additions & 0 deletions
```go
package spectatorclient

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.uber.org/mock/gomock"
	"go.uber.org/yarpc/api/peer"
	"go.uber.org/yarpc/api/transport"
	"go.uber.org/yarpc/transport/grpc"

	"github.com/uber/cadence/common/log/testlogger"
)

func TestSpectatorPeerChooser_Choose_MissingShardKey(t *testing.T) {
	chooser := &SpectatorPeerChooser{
		logger: testlogger.New(t),
		peers:  make(map[string]peer.Peer),
	}

	req := &transport.Request{
		ShardKey: "",
		Headers:  transport.NewHeaders(),
	}

	p, onFinish, err := chooser.Choose(context.Background(), req)

	assert.Error(t, err)
	assert.Nil(t, p)
	assert.Nil(t, onFinish)
	assert.Contains(t, err.Error(), "ShardKey")
}

func TestSpectatorPeerChooser_Choose_MissingNamespaceHeader(t *testing.T) {
	chooser := &SpectatorPeerChooser{
		logger: testlogger.New(t),
		peers:  make(map[string]peer.Peer),
	}

	req := &transport.Request{
		ShardKey: "shard-1",
		Headers:  transport.NewHeaders(),
	}

	p, onFinish, err := chooser.Choose(context.Background(), req)

	assert.Error(t, err)
	assert.Nil(t, p)
	assert.Nil(t, onFinish)
	assert.Contains(t, err.Error(), "x-shard-distributor-namespace")
}

func TestSpectatorPeerChooser_Choose_SpectatorNotFound(t *testing.T) {
	chooser := &SpectatorPeerChooser{
		logger:     testlogger.New(t),
		peers:      make(map[string]peer.Peer),
		spectators: &Spectators{spectators: make(map[string]Spectator)},
	}

	req := &transport.Request{
		ShardKey: "shard-1",
		Headers:  transport.NewHeaders().With(NamespaceHeader, "unknown-namespace"),
	}

	p, onFinish, err := chooser.Choose(context.Background(), req)

	assert.Error(t, err)
	assert.Nil(t, p)
	assert.Nil(t, onFinish)
	assert.Contains(t, err.Error(), "spectator not found")
}

func TestSpectatorPeerChooser_StartStop(t *testing.T) {
	chooser := &SpectatorPeerChooser{
		logger: testlogger.New(t),
		peers:  make(map[string]peer.Peer),
	}

	err := chooser.Start()
	require.NoError(t, err)

	assert.True(t, chooser.IsRunning())

	err = chooser.Stop()
	assert.NoError(t, err)
}

func TestSpectatorPeerChooser_SetSpectators(t *testing.T) {
	chooser := &SpectatorPeerChooser{
		logger: testlogger.New(t),
	}

	spectators := &Spectators{spectators: make(map[string]Spectator)}
	chooser.SetSpectators(spectators)

	assert.Equal(t, spectators, chooser.spectators)
}

func TestSpectatorPeerChooser_Choose_Success(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	mockSpectator := NewMockSpectator(ctrl)
	peerTransport := grpc.NewTransport()

	chooser := &SpectatorPeerChooser{
		transport: peerTransport,
		logger:    testlogger.New(t),
		peers:     make(map[string]peer.Peer),
		spectators: &Spectators{
			spectators: map[string]Spectator{
				"test-namespace": mockSpectator,
			},
		},
	}

	ctx := context.Background()
	req := &transport.Request{
		ShardKey: "shard-1",
		Headers:  transport.NewHeaders().With(NamespaceHeader, "test-namespace"),
	}

	// Mock spectator to return shard owner with grpc_address
	mockSpectator.EXPECT().
		GetShardOwner(ctx, "shard-1").
		Return(&ShardOwner{
			ExecutorID: "executor-1",
			Metadata: map[string]string{
				grpcAddressMetadataKey: "127.0.0.1:7953",
			},
		}, nil)

	// Execute
	p, onFinish, err := chooser.Choose(ctx, req)

	// Assert
	assert.NoError(t, err)
	assert.NotNil(t, p)
	assert.NotNil(t, onFinish)
	assert.Equal(t, "127.0.0.1:7953", p.Identifier())
	assert.Len(t, chooser.peers, 1)
}

func TestSpectatorPeerChooser_Choose_ReusesPeer(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	mockSpectator := NewMockSpectator(ctrl)
	peerTransport := grpc.NewTransport()

	chooser := &SpectatorPeerChooser{
		transport: peerTransport,
		logger:    testlogger.New(t),
		peers:     make(map[string]peer.Peer),
		spectators: &Spectators{
			spectators: map[string]Spectator{
				"test-namespace": mockSpectator,
			},
		},
	}

	req := &transport.Request{
		ShardKey: "shard-1",
		Headers:  transport.NewHeaders().With(NamespaceHeader, "test-namespace"),
	}

	// First call creates the peer
	mockSpectator.EXPECT().
		GetShardOwner(gomock.Any(), "shard-1").
		Return(&ShardOwner{
			ExecutorID: "executor-1",
			Metadata: map[string]string{
				grpcAddressMetadataKey: "127.0.0.1:7953",
			},
		}, nil).Times(2)

	firstPeer, _, err := chooser.Choose(context.Background(), req)
	require.NoError(t, err)

	// Second call should reuse the same peer
	secondPeer, _, err := chooser.Choose(context.Background(), req)

	// Assert - should reuse existing peer
	assert.NoError(t, err)
	assert.Equal(t, firstPeer, secondPeer)
	assert.Len(t, chooser.peers, 1)
}
```
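One path the suite does not cover is the double-checked locking in `getOrCreatePeer`. A hypothetical additional test, not part of this PR, that races `Choose` across goroutines and asserts a single retained peer; it assumes the same mock helpers as above and would need `"sync"` added to the test imports:

```go
// Hypothetical concurrency test (not in this PR): goroutines racing Choose for
// the same address should converge on a single retained peer.
func TestSpectatorPeerChooser_Choose_Concurrent(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	mockSpectator := NewMockSpectator(ctrl)
	mockSpectator.EXPECT().
		GetShardOwner(gomock.Any(), "shard-1").
		Return(&ShardOwner{
			ExecutorID: "executor-1",
			Metadata:   map[string]string{grpcAddressMetadataKey: "127.0.0.1:7953"},
		}, nil).
		AnyTimes()

	chooser := &SpectatorPeerChooser{
		transport:  grpc.NewTransport(),
		logger:     testlogger.New(t),
		peers:      make(map[string]peer.Peer),
		spectators: &Spectators{spectators: map[string]Spectator{"test-namespace": mockSpectator}},
	}

	req := &transport.Request{
		ShardKey: "shard-1",
		Headers:  transport.NewHeaders().With(NamespaceHeader, "test-namespace"),
	}

	var wg sync.WaitGroup // requires "sync" in the test imports
	for i := 0; i < 16; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, _, err := chooser.Choose(context.Background(), req)
			assert.NoError(t, err)
		}()
	}
	wg.Wait()

	// Every goroutine should have ended up with the same single peer.
	assert.Len(t, chooser.peers, 1)
}
```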
