diff --git a/agreement/abstractions.go b/agreement/abstractions.go index 90d4e1cbfc..be55a614e1 100644 --- a/agreement/abstractions.go +++ b/agreement/abstractions.go @@ -293,6 +293,10 @@ type Network interface { // associated with the given MessageHandle. Disconnect(MessageHandle) + // Ignore sends the Network a hint to ignore messages from the peer + // associated with the given MessageHandle. + Ignore(MessageHandle) + // Start notifies the network that the agreement service is ready // to start receiving messages. Start() diff --git a/agreement/actions.go b/agreement/actions.go index 29e37b5c4c..2dbc10835e 100644 --- a/agreement/actions.go +++ b/agreement/actions.go @@ -175,6 +175,7 @@ func (a networkAction) do(ctx context.Context, s *Service) { case disconnect: s.Network.Disconnect(a.h) case ignore: + s.Network.Ignore(a.h) // pass } } diff --git a/agreement/demux.go b/agreement/demux.go index 01429be30f..0dba5db187 100644 --- a/agreement/demux.go +++ b/agreement/demux.go @@ -122,6 +122,7 @@ func (d *demux) tokenizeMessages(ctx context.Context, net Network, tag protocol. return } d.UpdateEventsQueue(eventQueueTokenizing[tag], 1) + // syncCh := net.SyncChannel(raw.MessageHandle) o, err := tokenize(raw.Data) if err != nil { diff --git a/agreement/fuzzer/networkFacade_test.go b/agreement/fuzzer/networkFacade_test.go index 1d0d7d622a..efe1186d67 100644 --- a/agreement/fuzzer/networkFacade_test.go +++ b/agreement/fuzzer/networkFacade_test.go @@ -269,11 +269,20 @@ func (n *NetworkFacade) RegisterHandlers(dispatch []network.TaggedMessageHandler n.mux.RegisterHandlers(dispatch) } +func (n *NetworkFacade) RegisterValidatorHandlers(dispatch []network.TaggedMessageValidatorHandler) { + // n.mux.RegisterValidatorHandlers(dispatch) +} + // ClearHandlers func (n *NetworkFacade) ClearHandlers() { n.mux.ClearHandlers([]network.Tag{}) } +// ClearValidatorHandlers deregisters all the existing message handlers. +func (n *NetworkFacade) ClearValidatorHandlers() { + // n.mux.ClearValidatorHandlers([]network.Tag{}) +} + // SetDownstreamFilter sets the downstream filter. func (n *NetworkFacade) SetDownstreamFilter(f DownstreamFilter) { n.downstreamMu.Lock() diff --git a/agreement/gossip/network.go b/agreement/gossip/network.go index 41294d28cf..8563761496 100644 --- a/agreement/gossip/network.go +++ b/agreement/gossip/network.go @@ -44,7 +44,8 @@ const ( ) type messageMetadata struct { - raw network.IncomingMessage + raw network.IncomingMessage + syncCh chan network.ForwardingPolicy } // networkImpl wraps network.GossipNode to provide a compatible interface with agreement. @@ -86,6 +87,13 @@ func (i *networkImpl) Start() { {Tag: protocol.VoteBundleTag, MessageHandler: network.HandlerFunc(i.processBundleMessage)}, } i.net.RegisterHandlers(handlers) + + validateHandlers := []network.TaggedMessageValidatorHandler{ + {Tag: protocol.AgreementVoteTag, MessageHandler: network.ValidateHandleFunc(i.processValidateVoteMessage)}, + {Tag: protocol.ProposalPayloadTag, MessageHandler: network.ValidateHandleFunc(i.processValidateProposalMessage)}, + {Tag: protocol.VoteBundleTag, MessageHandler: network.ValidateHandleFunc(i.processValidateBundleMessage)}, + } + i.net.RegisterValidatorHandlers(validateHandlers) } func messageMetadataFromHandle(h agreement.MessageHandle) *messageMetadata { @@ -99,6 +107,10 @@ func (i *networkImpl) processVoteMessage(raw network.IncomingMessage) network.Ou return i.processMessage(raw, i.voteCh, agreementVoteMessageType) } +func (i *networkImpl) processValidateVoteMessage(raw network.IncomingMessage) network.OutgoingMessage { + return i.processValidateMessage(raw, i.voteCh, agreementVoteMessageType) +} + func (i *networkImpl) processProposalMessage(raw network.IncomingMessage) network.OutgoingMessage { if i.trace != nil { i.trace.HashTrace(messagetracer.Proposal, raw.Data) @@ -106,10 +118,21 @@ func (i *networkImpl) processProposalMessage(raw network.IncomingMessage) networ return i.processMessage(raw, i.proposalCh, agreementProposalMessageType) } +func (i *networkImpl) processValidateProposalMessage(raw network.IncomingMessage) network.OutgoingMessage { + if i.trace != nil { + i.trace.HashTrace(messagetracer.Proposal, raw.Data) + } + return i.processValidateMessage(raw, i.proposalCh, agreementProposalMessageType) +} + func (i *networkImpl) processBundleMessage(raw network.IncomingMessage) network.OutgoingMessage { return i.processMessage(raw, i.bundleCh, agreementBundleMessageType) } +func (i *networkImpl) processValidateBundleMessage(raw network.IncomingMessage) network.OutgoingMessage { + return i.processValidateMessage(raw, i.bundleCh, agreementBundleMessageType) +} + // i.e. processMessage func (i *networkImpl) processMessage(raw network.IncomingMessage, submit chan<- agreement.Message, msgType string) network.OutgoingMessage { metadata := &messageMetadata{raw: raw} @@ -130,6 +153,32 @@ func (i *networkImpl) processMessage(raw network.IncomingMessage, submit chan<- return network.OutgoingMessage{Action: network.Ignore} } +// i.e. processMessage +func (i *networkImpl) processValidateMessage(raw network.IncomingMessage, submit chan<- agreement.Message, msgType string) network.OutgoingMessage { + metadata := &messageMetadata{ + raw: raw, + syncCh: make(chan network.ForwardingPolicy, 1), + } + + var action network.ForwardingPolicy + select { + case submit <- agreement.Message{MessageHandle: agreement.MessageHandle(metadata), Data: raw.Data}: + action = <-metadata.syncCh + // It would be slightly better to measure at de-queue + // time, but that happens in many places in code and + // this is much easier. + messagesHandledTotal.Inc(nil) + messagesHandledByType.Add(msgType, 1) + default: + messagesDroppedTotal.Inc(nil) + messagesDroppedByType.Add(msgType, 1) + action = network.Ignore + } + + // Immediately ignore everything here, sometimes Relay/Broadcast/Disconnect later based on API handles saved from IncomingMessage + return network.OutgoingMessage{Action: action} +} + func (i *networkImpl) Messages(t protocol.Tag) <-chan agreement.Message { switch t { case protocol.AgreementVoteTag: @@ -160,9 +209,14 @@ func (i *networkImpl) Relay(h agreement.MessageHandle, t protocol.Tag, data []by i.log.Infof("agreement: could not (pseudo)relay message with tag %v: %v", t, err) } } else { - err = i.net.Relay(context.Background(), t, data, false, metadata.raw.Sender) - if err != nil { - i.log.Infof("agreement: could not relay message from %v with tag %v: %v", metadata.raw.Sender, t, err) + if metadata.syncCh != nil { + // Synchronous validation path + metadata.syncCh <- network.Accept + } else { + err = i.net.Relay(context.Background(), t, data, false, metadata.raw.Sender) + if err != nil { + i.log.Infof("agreement: could not relay message from %v with tag %v: %v", metadata.raw.Sender, t, err) + } } } return @@ -176,5 +230,23 @@ func (i *networkImpl) Disconnect(h agreement.MessageHandle) { return } + if metadata.syncCh != nil { + // Synchronous validation path + metadata.syncCh <- network.Disconnect + return + } i.net.Disconnect(metadata.raw.Sender) } + +func (i *networkImpl) Ignore(h agreement.MessageHandle) { + metadata := messageMetadataFromHandle(h) + + if metadata == nil { // synthentic loopback + return + } + + if metadata.syncCh != nil { + // Synchronous validation path + metadata.syncCh <- network.Ignore + } +} diff --git a/agreement/gossip/network_test.go b/agreement/gossip/network_test.go index 0c19ac7bab..56d60f6884 100644 --- a/agreement/gossip/network_test.go +++ b/agreement/gossip/network_test.go @@ -91,11 +91,20 @@ func (w *whiteholeNetwork) RegisterHandlers(dispatch []network.TaggedMessageHand w.mux.RegisterHandlers(dispatch) } +func (w *whiteholeNetwork) RegisterValidatorHandlers(dispatch []network.TaggedMessageValidatorHandler) { + w.mux.RegisterValidatorHandlers(dispatch) +} + // ClearHandlers deregisters all the existing message handlers. func (w *whiteholeNetwork) ClearHandlers() { w.mux.ClearHandlers([]network.Tag{}) } +// ClearValidatorHandlers deregisters all the existing message handlers. +func (w *whiteholeNetwork) ClearValidatorHandlers() { + w.mux.ClearValidatorHandlers([]network.Tag{}) +} + func (w *whiteholeNetwork) Address() (string, bool) { return "", false } diff --git a/agreement/proposalManager.go b/agreement/proposalManager.go index bde20fc67e..1ac76e2e59 100644 --- a/agreement/proposalManager.go +++ b/agreement/proposalManager.go @@ -103,7 +103,7 @@ func (m *proposalManager) handleNewPeriod(r routerHandle, p player, e thresholdE // proposal-vote. A voteFiltered event is returned if the proposal-vote is // not fresh or is a duplicate. Otherwise, an empty event is returned. // -// - A voteVerified event is delievered after verification was attempted on a +// - A voteVerified event is delivered after verification was attempted on a // proposal-vote. A voteMalformed event is returned if the proposal-vote is // ill-formed and resulted from a corrupt process. A voteFiltered event is // emitted if the vote is not fresh or is a duplicate. Otherwise the diff --git a/agreement/service_test.go b/agreement/service_test.go index b46aefb659..db3fdbef47 100644 --- a/agreement/service_test.go +++ b/agreement/service_test.go @@ -510,6 +510,9 @@ func (e *testingNetworkEndpoint) Disconnect(h MessageHandle) { e.parent.disconnect(e.id, sourceID) } +func (e *testingNetworkEndpoint) Ignore(h MessageHandle) { +} + func (e *testingNetworkEndpoint) Start() {} type activityMonitor struct { diff --git a/network/mesh_test.go b/network/mesh_test.go index dae83940e3..62bd6c4810 100644 --- a/network/mesh_test.go +++ b/network/mesh_test.go @@ -35,12 +35,28 @@ import ( "github.com/stretchr/testify/require" ) +type mockSub struct { + closed *atomic.Bool +} + +func (m *mockSub) Next(ctx context.Context) (*pubsub.Message, error) { + if m.closed.Load() { + return nil, pubsub.ErrTopicClosed + } + return nil, nil +} + +func (m *mockSub) Cancel() {} + // mockP2PService implements p2p.Service and counts DialPeersUntilTargetCount invocations. // It relies on p2p's meshThreadInner's defer of DialPeersUntilTargetCount to detect invocation. -type mockP2PService struct{ dialCount atomic.Int32 } +type mockP2PService struct { + dialCount atomic.Int32 + closed atomic.Bool +} func (m *mockP2PService) Start() error { return nil } -func (m *mockP2PService) Close() error { return nil } +func (m *mockP2PService) Close() error { m.closed.Store(true); return nil } func (m *mockP2PService) ID() peer.ID { return "" } func (m *mockP2PService) IDSigner() *p2piface.PeerIDChallengeSigner { return nil } func (m *mockP2PService) AddrInfo() peer.AddrInfo { return peer.AddrInfo{} } @@ -51,7 +67,7 @@ func (m *mockP2PService) ClosePeer(peer.ID) error { return nil func (m *mockP2PService) Conns() []network.Conn { return nil } func (m *mockP2PService) ListPeersForTopic(string) []peer.ID { return nil } func (m *mockP2PService) Subscribe(string, pubsub.ValidatorEx) (p2piface.SubNextCancellable, error) { - return nil, nil + return &mockSub{&m.closed}, nil } func (m *mockP2PService) Publish(context.Context, string, []byte) error { return nil } func (m *mockP2PService) GetHTTPClient(*peer.AddrInfo, limitcaller.ConnectionTimeStore, time.Duration) (*http.Client, error) { @@ -79,7 +95,6 @@ func TestMesh_HybridRelayP2PInnerCall(t *testing.T) { mockSvc := &mockP2PService{} net.p2pNetwork.service = mockSvc - net.p2pNetwork.relayMessages = false // prevent pubsub startup err = net.Start() require.NoError(t, err) diff --git a/network/metrics.go b/network/metrics.go index 5400c927a3..bea35196e9 100644 --- a/network/metrics.go +++ b/network/metrics.go @@ -194,6 +194,18 @@ func (t pubsubMetricsTracer) RecvRPC(rpc *pubsub.RPC) { networkP2PReceivedBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil) networkP2PReceivedBytesByTag.Add(string(protocol.TxnTag), uint64(len(rpc.Publish[i].Data))) networkP2PMessageReceivedByTag.Add(string(protocol.TxnTag), 1) + case p2p.AVTopicName: + networkP2PReceivedBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil) + networkP2PReceivedBytesByTag.Add(string(protocol.AgreementVoteTag), uint64(len(rpc.Publish[i].Data))) + networkP2PMessageReceivedByTag.Add(string(protocol.AgreementVoteTag), 1) + case p2p.PPTopicName: + networkP2PReceivedBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil) + networkP2PReceivedBytesByTag.Add(string(protocol.ProposalPayloadTag), uint64(len(rpc.Publish[i].Data))) + networkP2PMessageReceivedByTag.Add(string(protocol.ProposalPayloadTag), 1) + case p2p.VBTopicName: + networkP2PReceivedBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil) + networkP2PReceivedBytesByTag.Add(string(protocol.VoteBundleTag), uint64(len(rpc.Publish[i].Data))) + networkP2PMessageReceivedByTag.Add(string(protocol.VoteBundleTag), 1) } } } @@ -211,6 +223,18 @@ func (t pubsubMetricsTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) { networkP2PSentBytesByTag.Add(string(protocol.TxnTag), uint64(len(rpc.Publish[i].Data))) networkP2PSentBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil) networkP2PMessageSentByTag.Add(string(protocol.TxnTag), 1) + case p2p.AVTopicName: + networkP2PSentBytesByTag.Add(string(protocol.AgreementVoteTag), uint64(len(rpc.Publish[i].Data))) + networkP2PSentBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil) + networkP2PMessageSentByTag.Add(string(protocol.AgreementVoteTag), 1) + case p2p.PPTopicName: + networkP2PSentBytesByTag.Add(string(protocol.ProposalPayloadTag), uint64(len(rpc.Publish[i].Data))) + networkP2PSentBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil) + networkP2PMessageSentByTag.Add(string(protocol.ProposalPayloadTag), 1) + case p2p.VBTopicName: + networkP2PSentBytesByTag.Add(string(protocol.VoteBundleTag), uint64(len(rpc.Publish[i].Data))) + networkP2PSentBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil) + networkP2PMessageSentByTag.Add(string(protocol.VoteBundleTag), 1) } } } diff --git a/network/msgCompressor_test.go b/network/msgCompressor_test.go index 333c0b0baa..3a50d9bf9f 100644 --- a/network/msgCompressor_test.go +++ b/network/msgCompressor_test.go @@ -338,7 +338,7 @@ func TestVoteStatefulCompressionAbortMessage(t *testing.T) { factory voteNetFactory }{ {"Websocket", makeWebsocketVoteNets}, - {"P2P", makeP2PVoteNets}, + // {"P2P", makeP2PVoteNets}, // TODO: enable when p2p pubsub transport supports vote compression } for _, f := range factories { @@ -439,7 +439,7 @@ func TestVoteStatefulVoteCompression(t *testing.T) { factory voteNetFactory }{ {"Websocket", makeWebsocketVoteNets}, - {"P2P", makeP2PVoteNets}, + // {"P2P", makeP2PVoteNets}, // TODO: enable when p2p pubsub transport supports vote compression } for _, f := range factories { diff --git a/network/p2p/pubsub.go b/network/p2p/pubsub.go index 2e5906aeb3..278c7e443b 100644 --- a/network/p2p/pubsub.go +++ b/network/p2p/pubsub.go @@ -55,6 +55,15 @@ const ( // Naming convention: "algo" + 2 bytes protocol tag + 2 bytes version const TXTopicName = "algotx01" +// AVTopicName defines a pubsub topic for Agreement Vote messages +const AVTopicName = "algoav01" + +// PPTopicName defines a pubsub topic for Proposal Payload messages +const PPTopicName = "algopp01" + +// VBTopicName defines a pubsub topic for Vote Bundle messages +const VBTopicName = "algovb01" + const incomingThreads = 20 // matches to number wsNetwork workers // deriveGossipSubParams derives the gossip sub parameters from the cfg.GossipFanout value @@ -73,6 +82,21 @@ func deriveGossipSubParams(numOutgoingConns int) pubsub.GossipSubParams { func makePubSub(ctx context.Context, host host.Host, numOutgoingConns int, opts ...pubsub.Option) (*pubsub.PubSub, error) { gossipSubParams := deriveGossipSubParams(numOutgoingConns) + topicScoringOpts := pubsub.TopicScoreParams{ + TopicWeight: 0.1, + + TimeInMeshWeight: 0.0002778, // ~1/3600 + TimeInMeshQuantum: time.Second, + TimeInMeshCap: 1, + + FirstMessageDeliveriesWeight: 0.5, // max value is 50 + FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(10 * time.Minute), + FirstMessageDeliveriesCap: 100, // 100 messages in 10 minutes + + // invalid messages decay after 1 hour + InvalidMessageDeliveriesWeight: -1000, + InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), + } options := []pubsub.Option{ pubsub.WithGossipSubParams(gossipSubParams), pubsub.WithPeerScore(&pubsub.PeerScoreParams{ @@ -82,21 +106,10 @@ func makePubSub(ctx context.Context, host host.Host, numOutgoingConns int, opts AppSpecificScore: func(p peer.ID) float64 { return 1000 }, Topics: map[string]*pubsub.TopicScoreParams{ - TXTopicName: { - TopicWeight: 0.1, - - TimeInMeshWeight: 0.0002778, // ~1/3600 - TimeInMeshQuantum: time.Second, - TimeInMeshCap: 1, - - FirstMessageDeliveriesWeight: 0.5, // max value is 50 - FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(10 * time.Minute), - FirstMessageDeliveriesCap: 100, // 100 messages in 10 minutes - - // invalid messages decay after 1 hour - InvalidMessageDeliveriesWeight: -1000, - InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour), - }, + TXTopicName: &topicScoringOpts, + AVTopicName: &topicScoringOpts, + PPTopicName: &topicScoringOpts, + VBTopicName: &topicScoringOpts, }, }, &pubsub.PeerScoreThresholds{ @@ -108,7 +121,7 @@ func makePubSub(ctx context.Context, host host.Host, numOutgoingConns int, opts }, ), // pubsub.WithPeerGater(&pubsub.PeerGaterParams{}), - pubsub.WithSubscriptionFilter(pubsub.WrapLimitSubscriptionFilter(pubsub.NewAllowlistSubscriptionFilter(TXTopicName), 100)), + pubsub.WithSubscriptionFilter(pubsub.WrapLimitSubscriptionFilter(pubsub.NewAllowlistSubscriptionFilter(TXTopicName, AVTopicName, PPTopicName, VBTopicName), 100)), // pubsub.WithEventTracer(jsonTracer), pubsub.WithValidateQueueSize(256), pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign), @@ -140,7 +153,7 @@ func (s *serviceImpl) getOrCreateTopic(topicName string) (*pubsub.Topic, error) if _, ok := s.topics[topicName]; !ok { var topt []pubsub.TopicOpt switch topicName { - case TXTopicName: + case TXTopicName, AVTopicName, PPTopicName, VBTopicName: topt = append(topt, pubsub.WithTopicMessageIdFn(txMsgID)) } diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index 959d960ec7..4dda847ff0 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -224,9 +224,20 @@ type p2pPeerStats struct { // gossipSubTags defines protocol messages that are relayed using GossipSub var gossipSubTags = map[protocol.Tag]string{ - protocol.TxnTag: p2p.TXTopicName, + protocol.TxnTag: p2p.TXTopicName, + protocol.AgreementVoteTag: p2p.AVTopicName, + protocol.ProposalPayloadTag: p2p.PPTopicName, + protocol.VoteBundleTag: p2p.VBTopicName, } +var gossipSubTopics = func() map[string]protocol.Tag { + result := make(map[string]protocol.Tag) + for tag, topic := range gossipSubTags { + result[topic] = tag + } + return result +}() + // NewP2PNetwork returns an instance of GossipNode that uses the p2p.Service func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebookAddresses []string, genesisInfo GenesisInfo, node NodeInfo, identityOpts *identityOpts, meshCreator MeshCreator) (*P2PNetwork, error) { const readBufferLen = 2048 @@ -449,6 +460,9 @@ func (n *P2PNetwork) Start() error { go n.txTopicHandleLoop() } + n.wg.Add(1) + go n.agreementTopicHandleLoop() + if n.wsPeersConnectivityCheckTicker != nil { n.wsPeersConnectivityCheckTicker.Stop() } @@ -1173,6 +1187,57 @@ func (n *P2PNetwork) txTopicHandleLoop() { wg.Wait() } +// agreementTopicHandleLoop reads messages from the pubsub topic for AV/PP +func (n *P2PNetwork) agreementTopicHandleLoop() { + defer n.wg.Done() + subAV, err := n.service.Subscribe(p2p.AVTopicName, n.txTopicValidator) + if err != nil { + n.log.Errorf("Failed to subscribe to topic %s: %v", p2p.AVTopicName, err) + return + } + n.log.Debugf("Subscribed to topic %s", p2p.AVTopicName) + + subPP, err := n.service.Subscribe(p2p.PPTopicName, n.txTopicValidator) + if err != nil { + n.log.Errorf("Failed to subscribe to topic %s: %v", p2p.PPTopicName, err) + return + } + n.log.Debugf("Subscribed to topic %s", p2p.PPTopicName) + + subVB, err := n.service.Subscribe(p2p.VBTopicName, n.txTopicValidator) + if err != nil { + n.log.Errorf("Failed to subscribe to topic %s: %v", p2p.VBTopicName, err) + return + } + n.log.Debugf("Subscribed to topic %s", p2p.VBTopicName) + + var wg sync.WaitGroup + handler := func(ctx context.Context, topic string, sub p2p.SubNextCancellable, peerID peer.ID, log logging.Logger) { + defer wg.Done() + for { + // msg from sub.Next not used since all work done by txTopicValidator + _, err := sub.Next(ctx) + if err != nil { + if err != pubsub.ErrSubscriptionCancelled && err != context.Canceled { + log.Errorf("Error reading from subscription %v, peerId %s", err, peerID) + } + log.Debugf("Cancelling subscription to topic %s due Subscription.Next error: %v", topic, err) + sub.Cancel() + return + } + } + } + + const threads = incomingThreads / 2 // TBD: perf test + wg.Add(threads * 3) + for i := 0; i < threads; i++ { + go handler(n.ctx, p2p.AVTopicName, subAV, n.service.ID(), n.log) + go handler(n.ctx, p2p.PPTopicName, subPP, n.service.ID(), n.log) + go handler(n.ctx, p2p.VBTopicName, subVB, n.service.ID(), n.log) + } + wg.Wait() +} + type gsPeer struct { peerID peer.ID net *P2PNetwork @@ -1200,9 +1265,20 @@ func (n *P2PNetwork) txTopicValidator(ctx context.Context, peerID peer.ID, msg * } n.wsPeersLock.Unlock() + if msg.Topic == nil { + n.log.Warnf("Received message with nil topic from peer %s", peerID) + return pubsub.ValidationReject + } + + tag, ok := gossipSubTopics[*msg.Topic] + if !ok { + n.log.Warnf("Received message for unknown topic %s from peer %s", *msg.Topic, peerID) + return pubsub.ValidationReject + } + inmsg := IncomingMessage{ Sender: sender, - Tag: protocol.TxnTag, + Tag: tag, Data: msg.Data, Net: n, Received: time.Now().UnixNano(), diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index 9c44f1f74c..1471da9562 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -120,13 +120,9 @@ func TestP2PSubmitTX(t *testing.T) { passThroughHandler := []TaggedMessageValidatorHandler{ { Tag: protocol.TxnTag, - MessageHandler: struct { - ValidateHandleFunc - }{ - ValidateHandleFunc(func(msg IncomingMessage) OutgoingMessage { - return OutgoingMessage{Action: Accept, Tag: msg.Tag} - }), - }, + MessageHandler: ValidateHandleFunc(func(msg IncomingMessage) OutgoingMessage { + return OutgoingMessage{Action: Accept, Tag: msg.Tag} + }), }, } @@ -213,13 +209,9 @@ func TestP2PSubmitTXNoGossip(t *testing.T) { passThroughHandler := []TaggedMessageValidatorHandler{ { Tag: protocol.TxnTag, - MessageHandler: struct { - ValidateHandleFunc - }{ - ValidateHandleFunc(func(msg IncomingMessage) OutgoingMessage { - return OutgoingMessage{Action: Accept, Tag: msg.Tag} - }), - }, + MessageHandler: ValidateHandleFunc(func(msg IncomingMessage) OutgoingMessage { + return OutgoingMessage{Action: Accept, Tag: msg.Tag} + }), }, } @@ -295,7 +287,7 @@ func TestP2PSubmitWS(t *testing.T) { // now we should be connected in a line: B <-> A <-> C where both B and C are connected to A but not each other - testTag := protocol.AgreementVoteTag + testTag := protocol.NetPrioResponseTag // this tag does not have any special handling but also listed in defaultSendMessageTags var handlerCount atomic.Uint32 // Since we aren't using the transaction handler in this test, we need to register a pass-through handler @@ -312,7 +304,7 @@ func TestP2PSubmitWS(t *testing.T) { // send messages from B and confirm that they get received by C (via A) for i := 0; i < 10; i++ { - err = netB.Broadcast(context.Background(), testTag, []byte(fmt.Sprintf("hello %d", i)), false, nil) + err = netB.Broadcast(t.Context(), testTag, []byte(fmt.Sprintf("hello %d", i)), false, nil) require.NoError(t, err) } @@ -1317,34 +1309,46 @@ func TestP2PEnableGossipService_NodeDisable(t *testing.T) { netB.Start() defer netB.Stop() + require.Eventually( + t, + func() bool { + return len(netA.service.ListPeersForTopic(p2p.AVTopicName)) == 1 && + len(netB.service.ListPeersForTopic(p2p.AVTopicName)) == 1 + }, + 2*time.Second, + 50*time.Millisecond, + ) + require.Eventually(t, func() bool { return netA.hasPeers() && netB.hasPeers() }, 1*time.Second, 50*time.Millisecond) + time.Sleep(time.Second) // give time for peers to connect. + testTag := protocol.AgreementVoteTag var handlerCountA atomic.Uint32 - passThroughHandlerA := []TaggedMessageHandler{ - {Tag: testTag, MessageHandler: HandlerFunc(func(msg IncomingMessage) OutgoingMessage { + passThroughHandlerA := []TaggedMessageValidatorHandler{ + {Tag: testTag, MessageHandler: ValidateHandleFunc(func(msg IncomingMessage) OutgoingMessage { handlerCountA.Add(1) - return OutgoingMessage{Action: Broadcast} + return OutgoingMessage{Action: Accept} })}, } var handlerCountB atomic.Uint32 - passThroughHandlerB := []TaggedMessageHandler{ - {Tag: testTag, MessageHandler: HandlerFunc(func(msg IncomingMessage) OutgoingMessage { + passThroughHandlerB := []TaggedMessageValidatorHandler{ + {Tag: testTag, MessageHandler: ValidateHandleFunc(func(msg IncomingMessage) OutgoingMessage { handlerCountB.Add(1) - return OutgoingMessage{Action: Broadcast} + return OutgoingMessage{Action: Accept} })}, } - netA.RegisterHandlers(passThroughHandlerA) - netB.RegisterHandlers(passThroughHandlerB) + netA.RegisterValidatorHandlers(passThroughHandlerA) + netB.RegisterValidatorHandlers(passThroughHandlerB) // send messages from both nodes to each other and confirm they are received. for i := 0; i < 10; i++ { - err = netA.Broadcast(context.Background(), testTag, []byte(fmt.Sprintf("hello from A %d", i)), false, nil) + err = netA.Broadcast(t.Context(), testTag, []byte(fmt.Sprintf("hello from A %d", i)), false, nil) require.NoError(t, err) - err = netB.Broadcast(context.Background(), testTag, []byte(fmt.Sprintf("hello from B %d", i)), false, nil) + err = netB.Broadcast(t.Context(), testTag, []byte(fmt.Sprintf("hello from B %d", i)), false, nil) require.NoError(t, err) } @@ -1438,7 +1442,8 @@ func TestP2PTxTopicValidator_NoWsPeer(t *testing.T) { defer net.Stop() peerID := peer.ID("12345678") // must be 8+ in size - msg := pubsub.Message{Message: &pb.Message{}, ID: string(peerID)} + topic := p2p.TXTopicName + msg := pubsub.Message{Message: &pb.Message{Topic: &topic}, ID: string(peerID)} validateIncomingTxMessage := func(rawmsg IncomingMessage) OutgoingMessage { require.NotEmpty(t, rawmsg.Sender) require.Implements(t, (*DisconnectableAddressablePeer)(nil), rawmsg.Sender) @@ -1619,6 +1624,8 @@ func TestP2PMetainfoV1vsV22(t *testing.T) { func TestP2PVoteCompression(t *testing.T) { partitiontest.PartitionTest(t) + t.Skip("Skipping: vote compression is not supported for pubsub transport") + type testDef struct { netAEnableCompression, netBEnableCompression bool }