diff --git a/.gitignore b/.gitignore index 733ea559..d05db564 100644 --- a/.gitignore +++ b/.gitignore @@ -31,5 +31,4 @@ # Environemnt variable files .env* -!.env.example - +!.env.example \ No newline at end of file diff --git a/cli/flags.go b/cli/flags.go index f147c12b..f6918c57 100644 --- a/cli/flags.go +++ b/cli/flags.go @@ -8,6 +8,7 @@ const ( RelayCategory = "RELAYS" GeneralCategory = "GENERAL" Metrics = "METRICS" + MuxCategory = "RELAY MUXING" ) var flags = []cli.Flag{ @@ -36,10 +37,11 @@ var flags = []cli.Flag{ timeoutGetPayloadFlag, timeoutRegValFlag, maxRetriesFlag, - // metrics metricsFlag, metricsAddrFlag, + // mux + muxConfigFlag, } var ( @@ -183,7 +185,6 @@ var ( Value: 5, Category: RelayCategory, } - // metrics metricsFlag = &cli.BoolFlag{ Name: "metrics", @@ -198,4 +199,11 @@ var ( Usage: "listening address for the metrics server", Category: Metrics, } + // Mux + muxConfigFlag = &cli.StringFlag{ + Name: "mux-config", + Sources: cli.EnvVars("MUX_CONFIG_FILE"), + Usage: "path to YAML configuration file for relay muxing (policies and validator mappings)", + Category: MuxCategory, + } ) diff --git a/cli/main.go b/cli/main.go index b7f7ef60..688a0614 100644 --- a/cli/main.go +++ b/cli/main.go @@ -67,6 +67,7 @@ func start(_ context.Context, cmd *cli.Command) error { var ( genesisForkVersion, genesisTime = setupGenesis(cmd) relays, minBid, relayCheck = setupRelays(cmd) + muxConfig = setupMuxConfig(cmd) listenAddr = cmd.String(addrFlag.Name) metricsEnabled = cmd.Bool(metricsFlag.Name) metricsAddr = cmd.String(metricsAddrFlag.Name) @@ -76,6 +77,7 @@ func start(_ context.Context, cmd *cli.Command) error { Log: log, ListenAddr: listenAddr, Relays: relays, + MuxConfig: muxConfig, GenesisForkVersionHex: genesisForkVersion, GenesisTime: genesisTime, RelayCheck: relayCheck, @@ -221,3 +223,18 @@ func sanitizeMinBid(minBid float64) (*types.U256Str, error) { } return common.FloatEthTo256Wei(minBid) } + +func setupMuxConfig(cmd *cli.Command) *config.MuxConfig { + configPath := cmd.String(muxConfigFlag.Name) + if configPath == "" { + log.Info("no mux config file specified, using default relay selection for all validators") + return nil + } + muxConfig, err := config.LoadMuxConfig(configPath) + if err != nil { + log.WithError(err).Fatal("failed to load mux configuration") + return nil + } + + return muxConfig +} diff --git a/config/mux_config.go b/config/mux_config.go new file mode 100644 index 00000000..aa219e9a --- /dev/null +++ b/config/mux_config.go @@ -0,0 +1,173 @@ +package config + +import ( + "errors" + "fmt" + "io" + "os" + + "github.com/flashbots/mev-boost/server/types" + "gopkg.in/yaml.v3" +) + +var ( + ErrNoFilePathSpecified = errors.New("no file path specified") + ErrNoPolicyDefined = errors.New("at least one policy must be defined") + ErrPolicyNameEmpty = errors.New("policy name cannot be empty") + ErrDuplicatePolicyName = errors.New("duplicate policy name") + ErrPolicyNoRelayers = errors.New("policy must have at least one relayer") + ErrRelayerNameEmpty = errors.New("relayer name cannot be empty") + ErrRelayerURLEmpty = errors.New("relayer URL cannot be empty") + ErrMappingNameEmpty = errors.New("mapping name cannot be empty") + ErrMappingNoPolicySpecified = errors.New("mapping must specify a policy") + ErrMappingUnknownPolicy = errors.New("mapping references unknown policy") + ErrMappingNoPublicKeyFilter = errors.New("mapping must specify at least one public key filter") + ErrPolicyNotFound = errors.New("policy not found") +) + +type MuxConfig struct { + Policies []Policy `yaml:"policies"` + Mappings []Mapping `yaml:"mappings"` +} + +type Policy struct { + Name string `yaml:"name"` + Relayers []Relayer `yaml:"relayers"` +} + +type Relayer struct { + Name string `yaml:"name"` + URL string `yaml:"url"` + HTTPHeader map[string]string `yaml:"http-header,omitempty"` +} + +type Mapping struct { + Name string `yaml:"name"` + Policy string `yaml:"policy"` + Filters Filters `yaml:"filters"` +} + +type Filters struct { + PublicKeys []string `yaml:"public_keys,omitempty"` +} + +// LoadMuxConfig loads the muxing configuration from a yaml file +func LoadMuxConfig(configPath string) (*MuxConfig, error) { + if configPath == "" { + return nil, ErrNoFilePathSpecified + } + + file, err := os.Open(configPath) + if err != nil { + return nil, err + } + defer file.Close() + + data, err := io.ReadAll(file) + if err != nil { + return nil, err + } + + var config MuxConfig + if err := yaml.Unmarshal(data, &config); err != nil { + return nil, err + } + + if err := config.validate(); err != nil { + return nil, err + } + + return &config, nil +} + +func (c *MuxConfig) validate() error { + if len(c.Policies) == 0 { + return ErrNoPolicyDefined + } + + policyNames := make(map[string]bool) + // check if policies are valid + for _, policy := range c.Policies { + if policy.Name == "" { + return ErrPolicyNameEmpty + } + if policyNames[policy.Name] { + return fmt.Errorf("%w: %s", ErrDuplicatePolicyName, policy.Name) + } + policyNames[policy.Name] = true + + if len(policy.Relayers) == 0 { + return fmt.Errorf("policy %s: %w", policy.Name, ErrPolicyNoRelayers) + } + + // check for the relayers if valid + for _, relayer := range policy.Relayers { + if relayer.Name == "" { + return fmt.Errorf("policy %s: %w", policy.Name, ErrRelayerNameEmpty) + } + if relayer.URL == "" { + return fmt.Errorf("policy %s, relayer %s: %w", policy.Name, relayer.Name, ErrRelayerURLEmpty) + } + if _, err := types.NewRelayEntry(relayer.URL); err != nil { + return err + } + } + } + + // check if mappings are valid + // also check if they reference the correct policies + for _, mapping := range c.Mappings { + if mapping.Name == "" { + return ErrMappingNameEmpty + } + if mapping.Policy == "" { + return fmt.Errorf("mapping %s: %w", mapping.Name, ErrMappingNoPolicySpecified) + } + if !policyNames[mapping.Policy] { + return fmt.Errorf("mapping %s: %w: %s", mapping.Name, ErrMappingUnknownPolicy, mapping.Policy) + } + if len(mapping.Filters.PublicKeys) == 0 { + return fmt.Errorf("mapping %s: %w", mapping.Name, ErrMappingNoPublicKeyFilter) + } + } + + return nil +} + +// GetPolicyForValidator returns the policy name for a given validator public key +// Returns empty string if no specific mapping is found (should use default behavior) +func (c *MuxConfig) GetPolicyForValidator(pubkey string) string { + for _, mapping := range c.Mappings { + for _, filterKey := range mapping.Filters.PublicKeys { + if filterKey == pubkey { + return mapping.Policy + } + } + } + return "" +} + +func (c *MuxConfig) GetRelaysForPolicy(policyName string) ([]types.RelayEntry, error) { + for _, policy := range c.Policies { + if policy.Name == policyName { + relays := make([]types.RelayEntry, 0, len(policy.Relayers)) + for _, relayer := range policy.Relayers { + relay, err := types.NewRelayEntry(relayer.URL) + if err != nil { + return nil, fmt.Errorf("failed to create relay entry for %s: %w", relayer.URL, err) + } + relays = append(relays, relay) + } + return relays, nil + } + } + return nil, fmt.Errorf("%w: %s", ErrPolicyNotFound, policyName) +} + +func (c *MuxConfig) GetAllPolicies() []string { + policies := make([]string, len(c.Policies)) + for i, policy := range c.Policies { + policies[i] = policy.Name + } + return policies +} diff --git a/examples/mux_config.yml b/examples/mux_config.yml new file mode 100644 index 00000000..26be5396 --- /dev/null +++ b/examples/mux_config.yml @@ -0,0 +1,34 @@ +# muxing config example +# its following the example specified in the flashbots forum: https://collective.flashbots.net/t/muxing-in-mev-boost-to-handle-diverse-relay-sets/5231/2 + +policies: + - name: "lido-policy" + relayers: + - name: "relayer-1" + url: "0x9000009807ed12c1f08bf4e81c6da3ba8e3fc3d953898ce0102433094e5f22f21102ec057841fcb81978ed1ea0fa8246@relay.relayer1.net" + - name: "relayer-2" + url: "0x9000009807ed12c1f08bf4e81c6da3ba8e3fc3d953898ce0102433094e5f22f21102ec057841fcb81978ed1ea0fa8246@relay.relayer2.com" + http-header: + X-Custom-Header: "custom-value" + - name: "rocket-pool-policy" + relayers: + - name: "relayer-1" + url: "0x9000009807ed12c1f08bf4e81c6da3ba8e3fc3d953898ce0102433094e5f22f21102ec057841fcb81978ed1ea0fa8246@relay.relayer1.io" + - name: "relayer-2" + url: "0x9000009807ed12c1f08bf4e81c6da3ba8e3fc3d953898ce0102433094e5f22f21102ec057841fcb81978ed1ea0fa8246@relay.relayer2.com" + +mappings: + - name: "lido-validators" + policy: "lido-policy" + filters: + public_keys: + - "0x8a1d7b8dd64e0aafe7ea7b6c95065c9364cf99d38470c12ee807d55f7de1529ad29ce2c422e0b65e3d5a05c02caca249" + - "0x8b1d7b8dd64e0aafe7ea7b6c95065c9364cf99d38470c12ee807d55f7de1529ad29ce2c422e0b65e3d5a05c02caca250" + - "0x8c1d7b8dd64e0aafe7ea7b6c95065c9364cf99d38470c12ee807d55f7de1529ad29ce2c422e0b65e3d5a05c02caca251" + - name: "rocket-pool-validators" + policy: "rocket-pool-policy" + filters: + public_keys: + - "0x8d1d7b8dd64e0aafe7ea7b6c95065c9364cf99d38470c12ee807d55f7de1529ad29ce2c422e0b65e3d5a05c02caca252" + - "0x8e1d7b8dd64e0aafe7ea7b6c95065c9364cf99d38470c12ee807d55f7de1529ad29ce2c422e0b65e3d5a05c02caca253" + - "0x8f1d7b8dd64e0aafe7ea7b6c95065c9364cf99d38470c12ee807d55f7de1529ad29ce2c422e0b65e3d5a05c02caca254" \ No newline at end of file diff --git a/go.mod b/go.mod index cdf8b56f..7571fc99 100644 --- a/go.mod +++ b/go.mod @@ -61,5 +61,5 @@ require ( golang.org/x/crypto v0.37.0 // indirect golang.org/x/sys v0.32.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect + gopkg.in/yaml.v3 v3.0.1 ) diff --git a/server/get_header.go b/server/get_header.go index c70d768f..ad3c0e09 100644 --- a/server/get_header.go +++ b/server/get_header.go @@ -69,8 +69,10 @@ func (m *BoostService) getHeader(log *logrus.Entry, slot phase0.Slot, pubkey, pa relays = make(map[BlockHashHex][]types.RelayEntry) ) + relaysForValidator := m.getRelaysForValidator(pubkey) + // Request a bid from each relay - for _, relay := range m.relays { + for _, relay := range relaysForValidator { wg.Add(1) go func(relay types.RelayEntry) { defer wg.Done() diff --git a/server/get_payload.go b/server/get_payload.go index b298a65b..a8e8407d 100644 --- a/server/get_payload.go +++ b/server/get_payload.go @@ -142,8 +142,15 @@ func (m *BoostService) innerGetPayload(log *logrus.Entry, signedBlindedBeaconBlo log.Warn("bid found but no associated relays") } + relays := m.relays + if len(originalBid.relays) > 0 { + // substitute to use originalBid relays since they are definitely going to contain policy based relays + // which are the only ones we want to request the payload from and not from all the relays. + relays = originalBid.relays + } + // Prepare for requests - resultCh := make(chan payloadResult, len(m.relays)) + resultCh := make(chan payloadResult, len(relays)) var received atomic.Bool go func() { // Make sure we receive a response within the timeout @@ -155,7 +162,7 @@ func (m *BoostService) innerGetPayload(log *logrus.Entry, signedBlindedBeaconBlo requestCtx, requestCtxCancel := context.WithTimeout(context.Background(), m.httpClientGetPayload.Timeout) defer requestCtxCancel() - for _, relay := range m.relays { + for _, relay := range relays { go func(relay types.RelayEntry) { var url string if version == GetPayloadV1 { diff --git a/server/mux_test.go b/server/mux_test.go new file mode 100644 index 00000000..4b0ade70 --- /dev/null +++ b/server/mux_test.go @@ -0,0 +1,142 @@ +package server + +import ( + "testing" + + "github.com/flashbots/mev-boost/config" + "github.com/flashbots/mev-boost/server/types" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" +) + +func TestGetRelaysForValidator(t *testing.T) { + relay1, err := types.NewRelayEntry("0x9000009807ed12c1f08bf4e81c6da3ba8e3fc3d953898ce0102433094e5f22f21102ec057841fcb81978ed1ea0fa8246@relay1.example.com") + require.NoError(t, err) + relay2, err := types.NewRelayEntry("0x9000009807ed12c1f08bf4e81c6da3ba8e3fc3d953898ce0102433094e5f22f21102ec057841fcb81978ed1ea0fa8246@relay2.example.com") + require.NoError(t, err) + relay3, err := types.NewRelayEntry("0x9000009807ed12c1f08bf4e81c6da3ba8e3fc3d953898ce0102433094e5f22f21102ec057841fcb81978ed1ea0fa8246@relay3.example.com") + require.NoError(t, err) + + defaultRelays := []types.RelayEntry{relay1, relay2, relay3} + + muxConfig := &config.MuxConfig{ + Policies: []config.Policy{ + { + Name: "lido-policy", + Relayers: []config.Relayer{ + { + Name: "relay1", + URL: "0x9000009807ed12c1f08bf4e81c6da3ba8e3fc3d953898ce0102433094e5f22f21102ec057841fcb81978ed1ea0fa8246@relay1.example.com", + }, + }, + }, + { + Name: "rocket-policy", + Relayers: []config.Relayer{ + { + Name: "relay2", + URL: "0x9000009807ed12c1f08bf4e81c6da3ba8e3fc3d953898ce0102433094e5f22f21102ec057841fcb81978ed1ea0fa8246@relay2.example.com", + }, + }, + }, + }, + Mappings: []config.Mapping{ + { + Name: "lido-keys", + Policy: "lido-policy", + Filters: config.Filters{ + PublicKeys: []string{ + "0x8a1d7b8dd64e0aafe7ea7b6c95065c9364cf99d38470c12ee807d55f7de1529ad29ce2c422e0b65e3d5a05c02caca249", + }, + }, + }, + { + Name: "rocket-keys", + Policy: "rocket-policy", + Filters: config.Filters{ + PublicKeys: []string{ + "0x8b1d7b8dd64e0aafe7ea7b6c95065c9364cf99d38470c12ee807d55f7de1529ad29ce2c422e0b65e3d5a05c02caca249", + }, + }, + }, + }, + } + service := &BoostService{ + relays: defaultRelays, + muxConfig: muxConfig, + log: logrus.NewEntry(logrus.New()), + } + + t.Run("Validator with policy", func(t *testing.T) { + lidoValidator := "0x8a1d7b8dd64e0aafe7ea7b6c95065c9364cf99d38470c12ee807d55f7de1529ad29ce2c422e0b65e3d5a05c02caca249" + relays := service.getRelaysForValidator(lidoValidator) + require.Len(t, relays, 1) + require.Equal(t, relay1.URL.String(), relays[0].URL.String()) + + rocketValidator := "0x8b1d7b8dd64e0aafe7ea7b6c95065c9364cf99d38470c12ee807d55f7de1529ad29ce2c422e0b65e3d5a05c02caca249" + relays = service.getRelaysForValidator(rocketValidator) + require.Len(t, relays, 1) + require.Equal(t, relay2.URL.String(), relays[0].URL.String()) + }) + + t.Run("Validator without policy", func(t *testing.T) { + unknownValidator := "0x8c1d7b8dd64e0aafe7ea7b6c95065c9364cf99d38470c12ee807d55f7de1529ad29ce2c422e0b65e3d5a05c02caca249" + relays := service.getRelaysForValidator(unknownValidator) + // should return default relays + require.Len(t, relays, 3) + require.Equal(t, defaultRelays, relays) + }) + + t.Run("No mux config", func(t *testing.T) { + serviceNoMux := &BoostService{ + relays: defaultRelays, + muxConfig: nil, + log: logrus.NewEntry(logrus.New()), + } + + lidoValidator := "0x8a1d7b8dd64e0aafe7ea7b6c95065c9364cf99d38470c12ee807d55f7de1529ad29ce2c422e0b65e3d5a05c02caca249" + relays := serviceNoMux.getRelaysForValidator(lidoValidator) + // should return default relays + require.Len(t, relays, 3) + require.Equal(t, defaultRelays, relays) + }) + + t.Run("Invalid policy", func(t *testing.T) { + invalidMuxConfig := &config.MuxConfig{ + Policies: []config.Policy{ + { + Name: "lido-policy", + Relayers: []config.Relayer{ + { + Name: "relay1", + URL: "0x9000009807ed12c1f08bf4e81c6da3ba8e3fc3d953898ce0102433094e5f22f21102ec057841fcb81978ed1ea0fa8246@relay1.example.com", + }, + }, + }, + }, + Mappings: []config.Mapping{ + { + Name: "rocket-keys", + Policy: "rocket-policy", + Filters: config.Filters{ + PublicKeys: []string{ + "0x8b1d7b8dd64e0aafe7ea7b6c95065c9364cf99d38470c12ee807d55f7de1529ad29ce2c422e0b65e3d5a05c02caca249", + }, + }, + }, + }, + } + + serviceInvalid := &BoostService{ + relays: defaultRelays, + muxConfig: invalidMuxConfig, + log: logrus.NewEntry(logrus.New()), + } + + invalidValidator := "0x8d1d7b8dd64e0aafe7ea7b6c95065c9364cf99d38470c12ee807d55f7de1529ad29ce2c422e0b65e3d5a05c02caca249" + relays := serviceInvalid.getRelaysForValidator(invalidValidator) + // should return default relays + require.Len(t, relays, 3) + require.Equal(t, defaultRelays, relays) + }) +} diff --git a/server/register_validator.go b/server/register_validator.go index cb31fd10..f38518cf 100644 --- a/server/register_validator.go +++ b/server/register_validator.go @@ -3,27 +3,51 @@ package server import ( "bytes" "context" + "encoding/json" + "errors" "fmt" + "mime" "net/http" "strconv" "time" + builderApiV1 "github.com/attestantio/go-builder-client/api/v1" "github.com/flashbots/mev-boost/server/params" "github.com/flashbots/mev-boost/server/types" "github.com/sirupsen/logrus" ) +var ErrUnsupportedContentType = errors.New("unsupported content type") + func (m *BoostService) registerValidator(log *logrus.Entry, regBytes []byte, header http.Header) error { - respErrCh := make(chan error, len(m.relays)) + if m.muxConfig == nil { + return m.sendRegistrationsToRelays(log, regBytes, header, m.relays) + } + pubkeys, err := m.extractValidatorPubkeys(regBytes, header) + if err != nil { + return m.sendRegistrationsToRelays(log, regBytes, header, m.relays) + } + relaysToUse := m.getRelaysForValidators(pubkeys) + + log.WithFields(logrus.Fields{ + "numValidators": len(pubkeys), + "numRelays": len(relaysToUse), + }).Debug("sending validator registrations to relevant relays") + + return m.sendRegistrationsToRelays(log, regBytes, header, relaysToUse) +} + +func (m *BoostService) sendRegistrationsToRelays(log *logrus.Entry, regBytes []byte, header http.Header, relays []types.RelayEntry) error { + respErrCh := make(chan error, len(relays)) log.WithFields(logrus.Fields{ "timeout": m.httpClientRegVal.Timeout, - "numRelays": len(m.relays), + "numRelays": len(relays), "regBytes": len(regBytes), }).Info("calling registerValidator on relays") // Forward request to each relay - for _, relay := range m.relays { + for _, relay := range relays { go func(relay types.RelayEntry) { // Get the URL for this relay requestURL := relay.GetURI(params.PathRegisterValidator) @@ -72,7 +96,7 @@ func (m *BoostService) registerValidator(log *logrus.Entry, regBytes []byte, hea } // Return OK if any relay responds OK - for range m.relays { + for range relays { respErr := <-respErrCh if respErr == nil { // Goroutines are independent, so if there are a lot of configured @@ -87,3 +111,51 @@ func (m *BoostService) registerValidator(log *logrus.Entry, regBytes []byte, hea log.Debug("no relays accepted the registrations") return errNoSuccessfulRelayResponse } + +func (m *BoostService) extractValidatorPubkeys(regBytes []byte, header http.Header) ([]string, error) { + contentType := header.Get("Content-Type") + parsedContentType, _, err := mime.ParseMediaType(contentType) + if err != nil { + parsedContentType = "application/json" + } + + var registrations []*builderApiV1.SignedValidatorRegistration + + switch parsedContentType { + case "application/json": + if err := json.Unmarshal(regBytes, ®istrations); err != nil { + return nil, err + } + case "application/octet-stream": + var sszRegistrations builderApiV1.SignedValidatorRegistrations + if err := sszRegistrations.UnmarshalSSZ(regBytes); err != nil { + return nil, err + } + registrations = sszRegistrations.Registrations + default: + return nil, ErrUnsupportedContentType + } + pubkeys := make([]string, len(registrations)) + for i, reg := range registrations { + pubkeys[i] = reg.Message.Pubkey.String() + } + + return pubkeys, nil +} + +func (m *BoostService) getRelaysForValidators(pubkeys []string) []types.RelayEntry { + relayMap := make(map[string]types.RelayEntry) + + for _, pubkey := range pubkeys { + validatorRelays := m.getRelaysForValidator(pubkey) + for _, relay := range validatorRelays { + relayMap[relay.URL.String()] = relay + } + } + relays := make([]types.RelayEntry, 0, len(relayMap)) + for _, relay := range relayMap { + relays = append(relays, relay) + } + + return relays +} diff --git a/server/service.go b/server/service.go index 172535bd..892b61ca 100644 --- a/server/service.go +++ b/server/service.go @@ -55,6 +55,7 @@ type BoostServiceOpts struct { Log *logrus.Entry ListenAddr string Relays []types.RelayEntry + MuxConfig *config.MuxConfig GenesisForkVersionHex string GenesisTime uint64 RelayCheck bool @@ -72,6 +73,7 @@ type BoostServiceOpts struct { type BoostService struct { listenAddr string relays []types.RelayEntry + muxConfig *config.MuxConfig log *logrus.Entry srv *http.Server relayCheck bool @@ -107,6 +109,7 @@ func NewBoostService(opts BoostServiceOpts) (*BoostService, error) { return &BoostService{ listenAddr: opts.ListenAddr, relays: opts.Relays, + muxConfig: opts.MuxConfig, log: opts.Log, relayCheck: opts.RelayCheck, relayMinBid: opts.RelayMinBid, @@ -520,3 +523,20 @@ func (m *BoostService) CheckRelays() int { wg.Wait() return int(numSuccessRequestsToRelay) } + +func (m *BoostService) getRelaysForValidator(pubkey string) []types.RelayEntry { + if m.muxConfig == nil { + return m.relays + } + + policyName := m.muxConfig.GetPolicyForValidator(pubkey) + if policyName == "" { + return m.relays + } + + policyRelays, err := m.muxConfig.GetRelaysForPolicy(policyName) + if err != nil { + return m.relays + } + return policyRelays +}