Skip to content

Commit c370934

Browse files
committed
add muxing to service handlers
1 parent db6963a commit c370934

File tree

7 files changed

+140
-22
lines changed

7 files changed

+140
-22
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,4 @@
3333
.env*
3434
!.env.example
3535

36+
examples

config/mux_config.go

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package config
22

33
import (
4+
"errors"
45
"fmt"
56
"io"
67
"os"
@@ -9,6 +10,21 @@ import (
910
"gopkg.in/yaml.v3"
1011
)
1112

13+
var (
14+
ErrNoFilePathSpecified = errors.New("no file path specified")
15+
ErrNoPolicyDefined = errors.New("at least one policy must be defined")
16+
ErrPolicyNameEmpty = errors.New("policy name cannot be empty")
17+
ErrDuplicatePolicyName = errors.New("duplicate policy name")
18+
ErrPolicyNoRelayers = errors.New("policy must have at least one relayer")
19+
ErrRelayerNameEmpty = errors.New("relayer name cannot be empty")
20+
ErrRelayerURLEmpty = errors.New("relayer URL cannot be empty")
21+
ErrMappingNameEmpty = errors.New("mapping name cannot be empty")
22+
ErrMappingNoPolicySpecified = errors.New("mapping must specify a policy")
23+
ErrMappingUnknownPolicy = errors.New("mapping references unknown policy")
24+
ErrMappingNoPublicKeyFilter = errors.New("mapping must specify at least one public key filter")
25+
ErrPolicyNotFound = errors.New("policy not found")
26+
)
27+
1228
type MuxConfig struct {
1329
Policies []Policy `yaml:"policies"`
1430
Mappings []Mapping `yaml:"mappings"`
@@ -38,7 +54,7 @@ type Filters struct {
3854
// LoadMuxConfig loads the muxing configuration from a yaml file
3955
func LoadMuxConfig(configPath string) (*MuxConfig, error) {
4056
if configPath == "" {
41-
return nil, nil
57+
return nil, ErrNoFilePathSpecified
4258
}
4359

4460
file, err := os.Open(configPath)
@@ -66,31 +82,31 @@ func LoadMuxConfig(configPath string) (*MuxConfig, error) {
6682

6783
func (c *MuxConfig) validate() error {
6884
if len(c.Policies) == 0 {
69-
return fmt.Errorf("atleast one policy must be defined")
85+
return ErrNoPolicyDefined
7086
}
7187

7288
policyNames := make(map[string]bool)
7389
// check if policies are valid
7490
for _, policy := range c.Policies {
7591
if policy.Name == "" {
76-
return fmt.Errorf("policy name cant be empty")
92+
return ErrPolicyNameEmpty
7793
}
7894
if policyNames[policy.Name] {
79-
return fmt.Errorf("duplicate policy name: %s", policy.Name)
95+
return fmt.Errorf("%w: %s", ErrDuplicatePolicyName, policy.Name)
8096
}
8197
policyNames[policy.Name] = true
8298

8399
if len(policy.Relayers) == 0 {
84-
return fmt.Errorf("policy %s must have atleast one relayer", policy.Name)
100+
return fmt.Errorf("policy %s: %w", policy.Name, ErrPolicyNoRelayers)
85101
}
86102

87103
// check for the relayers if valid
88104
for _, relayer := range policy.Relayers {
89105
if relayer.Name == "" {
90-
return fmt.Errorf("relayer name cant be empty in policy %s", policy.Name)
106+
return fmt.Errorf("policy %s: %w", policy.Name, ErrRelayerNameEmpty)
91107
}
92108
if relayer.URL == "" {
93-
return fmt.Errorf("relayer url cant be empty for %s in policy %s", relayer.Name, policy.Name)
109+
return fmt.Errorf("policy %s, relayer %s: %w", policy.Name, relayer.Name, ErrRelayerURLEmpty)
94110
}
95111
if _, err := types.NewRelayEntry(relayer.URL); err != nil {
96112
return err
@@ -102,16 +118,16 @@ func (c *MuxConfig) validate() error {
102118
// also check if they reference the correct policies
103119
for _, mapping := range c.Mappings {
104120
if mapping.Name == "" {
105-
return fmt.Errorf("mapping name cant be empty")
121+
return ErrMappingNameEmpty
106122
}
107123
if mapping.Policy == "" {
108-
return fmt.Errorf("mapping %s must specify a policy", mapping.Name)
124+
return fmt.Errorf("mapping %s: %w", mapping.Name, ErrMappingNoPolicySpecified)
109125
}
110126
if !policyNames[mapping.Policy] {
111-
return fmt.Errorf("mapping %s references unknown policy: %s", mapping.Name, mapping.Policy)
127+
return fmt.Errorf("mapping %s: %w: %s", mapping.Name, ErrMappingUnknownPolicy, mapping.Policy)
112128
}
113129
if len(mapping.Filters.PublicKeys) == 0 {
114-
return fmt.Errorf("mapping %s must specify atleast one public key filter", mapping.Name)
130+
return fmt.Errorf("mapping %s: %w", mapping.Name, ErrMappingNoPublicKeyFilter)
115131
}
116132
}
117133

@@ -145,13 +161,13 @@ func (c *MuxConfig) GetRelaysForPolicy(policyName string) ([]types.RelayEntry, e
145161
return relays, nil
146162
}
147163
}
148-
return nil, fmt.Errorf("policy not found: %s", policyName)
164+
return nil, fmt.Errorf("%w: %s", ErrPolicyNotFound, policyName)
149165
}
150166

151167
func (c *MuxConfig) GetAllPolicies() []string {
152168
policies := make([]string, len(c.Policies))
153-
for _, policy := range c.Policies {
154-
policies = append(policies, policy.Name)
169+
for i, policy := range c.Policies {
170+
policies[i] = policy.Name
155171
}
156172
return policies
157173
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,5 +58,5 @@ require (
5858
golang.org/x/crypto v0.37.0 // indirect
5959
golang.org/x/sys v0.32.0 // indirect
6060
gopkg.in/yaml.v2 v2.4.0 // indirect
61-
gopkg.in/yaml.v3 v3.0.1 // indirect
61+
gopkg.in/yaml.v3 v3.0.1
6262
)

server/get_header.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,10 @@ func (m *BoostService) getHeader(log *logrus.Entry, slot phase0.Slot, pubkey, pa
6868
relays = make(map[BlockHashHex][]types.RelayEntry)
6969
)
7070

71+
relaysForValidator := m.getRelaysForValidator(pubkey)
72+
7173
// Request a bid from each relay
72-
for _, relay := range m.relays {
74+
for _, relay := range relaysForValidator {
7375
wg.Add(1)
7476
go func(relay types.RelayEntry) {
7577
defer wg.Done()

server/get_payload.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,15 @@ func (m *BoostService) innerGetPayload(log *logrus.Entry, signedBlindedBeaconBlo
141141
log.Warn("bid found but no associated relays")
142142
}
143143

144+
relays := m.relays
145+
if len(originalBid.relays) > 0 {
146+
// substitute to use originalBid relays since they are def going to contain policy based relays
147+
// which are the only ones we want to request the payload from and not from all the relays.
148+
relays = originalBid.relays
149+
}
150+
144151
// Prepare for requests
145-
resultCh := make(chan payloadResult, len(m.relays))
152+
resultCh := make(chan payloadResult, len(relays))
146153
var received atomic.Bool
147154
go func() {
148155
// Make sure we receive a response within the timeout
@@ -154,7 +161,7 @@ func (m *BoostService) innerGetPayload(log *logrus.Entry, signedBlindedBeaconBlo
154161
requestCtx, requestCtxCancel := context.WithTimeout(context.Background(), m.httpClientGetPayload.Timeout)
155162
defer requestCtxCancel()
156163

157-
for _, relay := range m.relays {
164+
for _, relay := range relays {
158165
go func(relay types.RelayEntry) {
159166
var url string
160167
if version == GetPayloadV1 {

server/register_validator.go

Lines changed: 76 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,49 @@ package server
33
import (
44
"bytes"
55
"context"
6+
"encoding/json"
7+
"errors"
68
"fmt"
9+
"mime"
710
"net/http"
811

12+
builderApiV1 "github.com/attestantio/go-builder-client/api/v1"
913
"github.com/flashbots/mev-boost/server/params"
1014
"github.com/flashbots/mev-boost/server/types"
1115
"github.com/sirupsen/logrus"
1216
)
1317

18+
var ErrUnsupportedContentType = errors.New("unsupported content typee")
19+
1420
func (m *BoostService) registerValidator(log *logrus.Entry, regBytes []byte, header http.Header) error {
15-
respErrCh := make(chan error, len(m.relays))
21+
if m.muxConfig == nil {
22+
return m.sendRegistrationsToRelays(log, regBytes, header, m.relays)
23+
}
24+
pubkeys, err := m.extractValidatorPubkeys(regBytes, header)
25+
if err != nil {
26+
return m.sendRegistrationsToRelays(log, regBytes, header, m.relays)
27+
}
28+
relaysToUse := m.getRelaysForValidators(pubkeys)
29+
30+
log.WithFields(logrus.Fields{
31+
"numValidators": len(pubkeys),
32+
"numRelays": len(relaysToUse),
33+
}).Debug("sending validator registrations to relevant relays")
34+
35+
return m.sendRegistrationsToRelays(log, regBytes, header, relaysToUse)
36+
}
37+
38+
func (m *BoostService) sendRegistrationsToRelays(log *logrus.Entry, regBytes []byte, header http.Header, relays []types.RelayEntry) error {
39+
respErrCh := make(chan error, len(relays))
1640

1741
log.WithFields(logrus.Fields{
1842
"timeout": m.httpClientRegVal.Timeout,
19-
"numRelays": len(m.relays),
43+
"numRelays": len(relays),
2044
"regBytes": len(regBytes),
2145
}).Info("calling registerValidator on relays")
2246

2347
// Forward request to each relay
24-
for _, relay := range m.relays {
48+
for _, relay := range relays {
2549
go func(relay types.RelayEntry) {
2650
// Get the URL for this relay
2751
requestURL := relay.GetURI(params.PathRegisterValidator)
@@ -67,7 +91,7 @@ func (m *BoostService) registerValidator(log *logrus.Entry, regBytes []byte, hea
6791
}
6892

6993
// Return OK if any relay responds OK
70-
for range m.relays {
94+
for range relays {
7195
respErr := <-respErrCh
7296
if respErr == nil {
7397
// Goroutines are independent, so if there are a lot of configured
@@ -82,3 +106,51 @@ func (m *BoostService) registerValidator(log *logrus.Entry, regBytes []byte, hea
82106
log.Debug("no relays accepted the registrations")
83107
return errNoSuccessfulRelayResponse
84108
}
109+
110+
func (m *BoostService) extractValidatorPubkeys(regBytes []byte, header http.Header) ([]string, error) {
111+
contentType := header.Get("Content-Type")
112+
parsedContentType, _, err := mime.ParseMediaType(contentType)
113+
if err != nil {
114+
parsedContentType = "application/json"
115+
}
116+
117+
var registrations []*builderApiV1.SignedValidatorRegistration
118+
119+
switch parsedContentType {
120+
case "application/json":
121+
if err := json.Unmarshal(regBytes, &registrations); err != nil {
122+
return nil, err
123+
}
124+
case "application/octet-stream":
125+
var sszRegistrations builderApiV1.SignedValidatorRegistrations
126+
if err := sszRegistrations.UnmarshalSSZ(regBytes); err != nil {
127+
return nil, err
128+
}
129+
registrations = sszRegistrations.Registrations
130+
default:
131+
return nil, ErrUnsupportedContentType
132+
}
133+
pubkeys := make([]string, len(registrations))
134+
for i, reg := range registrations {
135+
pubkeys[i] = reg.Message.Pubkey.String()
136+
}
137+
138+
return pubkeys, nil
139+
}
140+
141+
func (m *BoostService) getRelaysForValidators(pubkeys []string) []types.RelayEntry {
142+
relayMap := make(map[string]types.RelayEntry)
143+
144+
for _, pubkey := range pubkeys {
145+
validatorRelays := m.getRelaysForValidator(pubkey)
146+
for _, relay := range validatorRelays {
147+
relayMap[relay.URL.String()] = relay
148+
}
149+
}
150+
relays := make([]types.RelayEntry, 0, len(relayMap))
151+
for _, relay := range relayMap {
152+
relays = append(relays, relay)
153+
}
154+
155+
return relays
156+
}

server/service.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ type BoostServiceOpts struct {
5454
Log *logrus.Entry
5555
ListenAddr string
5656
Relays []types.RelayEntry
57+
MuxConfig *config.MuxConfig
5758
GenesisForkVersionHex string
5859
GenesisTime uint64
5960
RelayCheck bool
@@ -69,6 +70,7 @@ type BoostServiceOpts struct {
6970
type BoostService struct {
7071
listenAddr string
7172
relays []types.RelayEntry
73+
muxConfig *config.MuxConfig
7274
log *logrus.Entry
7375
srv *http.Server
7476
relayCheck bool
@@ -102,6 +104,7 @@ func NewBoostService(opts BoostServiceOpts) (*BoostService, error) {
102104
return &BoostService{
103105
listenAddr: opts.ListenAddr,
104106
relays: opts.Relays,
107+
muxConfig: opts.MuxConfig,
105108
log: opts.Log,
106109
relayCheck: opts.RelayCheck,
107110
relayMinBid: opts.RelayMinBid,
@@ -482,3 +485,20 @@ func (m *BoostService) CheckRelays() int {
482485
wg.Wait()
483486
return int(numSuccessRequestsToRelay)
484487
}
488+
489+
func (m *BoostService) getRelaysForValidator(pubkey string) []types.RelayEntry {
490+
if m.muxConfig == nil {
491+
return m.relays
492+
}
493+
494+
policyName := m.muxConfig.GetPolicyForValidator(pubkey)
495+
if policyName == "" {
496+
return m.relays
497+
}
498+
499+
policyRelays, err := m.muxConfig.GetRelaysForPolicy(policyName)
500+
if err != nil {
501+
return m.relays
502+
}
503+
return policyRelays
504+
}

0 commit comments

Comments
 (0)