Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions agreement/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,10 @@ type Network interface {
// associated with the given MessageHandle.
Disconnect(MessageHandle)

// Ignore sends the Network a hint to ignore messages from the peer
// associated with the given MessageHandle.
Ignore(MessageHandle)

// Start notifies the network that the agreement service is ready
// to start receiving messages.
Start()
Expand Down
1 change: 1 addition & 0 deletions agreement/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func (a networkAction) do(ctx context.Context, s *Service) {
case disconnect:
s.Network.Disconnect(a.h)
case ignore:
s.Network.Ignore(a.h)
// pass
}
}
Expand Down
1 change: 1 addition & 0 deletions agreement/demux.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func (d *demux) tokenizeMessages(ctx context.Context, net Network, tag protocol.
return
}
d.UpdateEventsQueue(eventQueueTokenizing[tag], 1)
// syncCh := net.SyncChannel(raw.MessageHandle)

o, err := tokenize(raw.Data)
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions agreement/fuzzer/networkFacade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,20 @@ func (n *NetworkFacade) RegisterHandlers(dispatch []network.TaggedMessageHandler
n.mux.RegisterHandlers(dispatch)
}

func (n *NetworkFacade) RegisterValidatorHandlers(dispatch []network.TaggedMessageValidatorHandler) {
// n.mux.RegisterValidatorHandlers(dispatch)
}

// ClearHandlers
func (n *NetworkFacade) ClearHandlers() {
n.mux.ClearHandlers([]network.Tag{})
}

// ClearValidatorHandlers deregisters all the existing message handlers.
func (n *NetworkFacade) ClearValidatorHandlers() {
// n.mux.ClearValidatorHandlers([]network.Tag{})
}

// SetDownstreamFilter sets the downstream filter.
func (n *NetworkFacade) SetDownstreamFilter(f DownstreamFilter) {
n.downstreamMu.Lock()
Expand Down
80 changes: 76 additions & 4 deletions agreement/gossip/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ const (
)

type messageMetadata struct {
raw network.IncomingMessage
raw network.IncomingMessage
syncCh chan network.ForwardingPolicy
}

// networkImpl wraps network.GossipNode to provide a compatible interface with agreement.
Expand Down Expand Up @@ -86,6 +87,13 @@ func (i *networkImpl) Start() {
{Tag: protocol.VoteBundleTag, MessageHandler: network.HandlerFunc(i.processBundleMessage)},
}
i.net.RegisterHandlers(handlers)

validateHandlers := []network.TaggedMessageValidatorHandler{
{Tag: protocol.AgreementVoteTag, MessageHandler: network.ValidateHandleFunc(i.processValidateVoteMessage)},
{Tag: protocol.ProposalPayloadTag, MessageHandler: network.ValidateHandleFunc(i.processValidateProposalMessage)},
{Tag: protocol.VoteBundleTag, MessageHandler: network.ValidateHandleFunc(i.processValidateBundleMessage)},
}
i.net.RegisterValidatorHandlers(validateHandlers)
}

func messageMetadataFromHandle(h agreement.MessageHandle) *messageMetadata {
Expand All @@ -99,17 +107,32 @@ func (i *networkImpl) processVoteMessage(raw network.IncomingMessage) network.Ou
return i.processMessage(raw, i.voteCh, agreementVoteMessageType)
}

func (i *networkImpl) processValidateVoteMessage(raw network.IncomingMessage) network.OutgoingMessage {
return i.processValidateMessage(raw, i.voteCh, agreementVoteMessageType)
}

func (i *networkImpl) processProposalMessage(raw network.IncomingMessage) network.OutgoingMessage {
if i.trace != nil {
i.trace.HashTrace(messagetracer.Proposal, raw.Data)
}
return i.processMessage(raw, i.proposalCh, agreementProposalMessageType)
}

func (i *networkImpl) processValidateProposalMessage(raw network.IncomingMessage) network.OutgoingMessage {
if i.trace != nil {
i.trace.HashTrace(messagetracer.Proposal, raw.Data)
}
return i.processValidateMessage(raw, i.proposalCh, agreementProposalMessageType)
}

func (i *networkImpl) processBundleMessage(raw network.IncomingMessage) network.OutgoingMessage {
return i.processMessage(raw, i.bundleCh, agreementBundleMessageType)
}

func (i *networkImpl) processValidateBundleMessage(raw network.IncomingMessage) network.OutgoingMessage {
return i.processValidateMessage(raw, i.bundleCh, agreementBundleMessageType)
}

// i.e. process<Type>Message
func (i *networkImpl) processMessage(raw network.IncomingMessage, submit chan<- agreement.Message, msgType string) network.OutgoingMessage {
metadata := &messageMetadata{raw: raw}
Expand All @@ -130,6 +153,32 @@ func (i *networkImpl) processMessage(raw network.IncomingMessage, submit chan<-
return network.OutgoingMessage{Action: network.Ignore}
}

// i.e. process<Type>Message
func (i *networkImpl) processValidateMessage(raw network.IncomingMessage, submit chan<- agreement.Message, msgType string) network.OutgoingMessage {
metadata := &messageMetadata{
raw: raw,
syncCh: make(chan network.ForwardingPolicy, 1),
}

var action network.ForwardingPolicy
select {
case submit <- agreement.Message{MessageHandle: agreement.MessageHandle(metadata), Data: raw.Data}:
action = <-metadata.syncCh
// It would be slightly better to measure at de-queue
// time, but that happens in many places in code and
// this is much easier.
messagesHandledTotal.Inc(nil)
messagesHandledByType.Add(msgType, 1)
default:
messagesDroppedTotal.Inc(nil)
messagesDroppedByType.Add(msgType, 1)
action = network.Ignore
}

// Immediately ignore everything here, sometimes Relay/Broadcast/Disconnect later based on API handles saved from IncomingMessage
return network.OutgoingMessage{Action: action}
}

func (i *networkImpl) Messages(t protocol.Tag) <-chan agreement.Message {
switch t {
case protocol.AgreementVoteTag:
Expand Down Expand Up @@ -160,9 +209,14 @@ func (i *networkImpl) Relay(h agreement.MessageHandle, t protocol.Tag, data []by
i.log.Infof("agreement: could not (pseudo)relay message with tag %v: %v", t, err)
}
} else {
err = i.net.Relay(context.Background(), t, data, false, metadata.raw.Sender)
if err != nil {
i.log.Infof("agreement: could not relay message from %v with tag %v: %v", metadata.raw.Sender, t, err)
if metadata.syncCh != nil {
// Synchronous validation path
metadata.syncCh <- network.Accept
} else {
err = i.net.Relay(context.Background(), t, data, false, metadata.raw.Sender)
if err != nil {
i.log.Infof("agreement: could not relay message from %v with tag %v: %v", metadata.raw.Sender, t, err)
}
}
}
return
Expand All @@ -176,5 +230,23 @@ func (i *networkImpl) Disconnect(h agreement.MessageHandle) {
return
}

if metadata.syncCh != nil {
// Synchronous validation path
metadata.syncCh <- network.Disconnect
return
}
i.net.Disconnect(metadata.raw.Sender)
}

func (i *networkImpl) Ignore(h agreement.MessageHandle) {
metadata := messageMetadataFromHandle(h)

if metadata == nil { // synthentic loopback
return
}

if metadata.syncCh != nil {
// Synchronous validation path
metadata.syncCh <- network.Ignore
}
}
9 changes: 9 additions & 0 deletions agreement/gossip/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,20 @@ func (w *whiteholeNetwork) RegisterHandlers(dispatch []network.TaggedMessageHand
w.mux.RegisterHandlers(dispatch)
}

func (w *whiteholeNetwork) RegisterValidatorHandlers(dispatch []network.TaggedMessageValidatorHandler) {
w.mux.RegisterValidatorHandlers(dispatch)
}

// ClearHandlers deregisters all the existing message handlers.
func (w *whiteholeNetwork) ClearHandlers() {
w.mux.ClearHandlers([]network.Tag{})
}

// ClearValidatorHandlers deregisters all the existing message handlers.
func (w *whiteholeNetwork) ClearValidatorHandlers() {
w.mux.ClearValidatorHandlers([]network.Tag{})
}

func (w *whiteholeNetwork) Address() (string, bool) {
return "", false
}
Expand Down
2 changes: 1 addition & 1 deletion agreement/proposalManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (m *proposalManager) handleNewPeriod(r routerHandle, p player, e thresholdE
// proposal-vote. A voteFiltered event is returned if the proposal-vote is
// not fresh or is a duplicate. Otherwise, an empty event is returned.
//
// - A voteVerified event is delievered after verification was attempted on a
// - A voteVerified event is delivered after verification was attempted on a
// proposal-vote. A voteMalformed event is returned if the proposal-vote is
// ill-formed and resulted from a corrupt process. A voteFiltered event is
// emitted if the vote is not fresh or is a duplicate. Otherwise the
Expand Down
3 changes: 3 additions & 0 deletions agreement/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,9 @@ func (e *testingNetworkEndpoint) Disconnect(h MessageHandle) {
e.parent.disconnect(e.id, sourceID)
}

func (e *testingNetworkEndpoint) Ignore(h MessageHandle) {
}

func (e *testingNetworkEndpoint) Start() {}

type activityMonitor struct {
Expand Down
23 changes: 19 additions & 4 deletions network/mesh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,28 @@ import (
"github.com/stretchr/testify/require"
)

type mockSub struct {
closed *atomic.Bool
}

func (m *mockSub) Next(ctx context.Context) (*pubsub.Message, error) {
if m.closed.Load() {
return nil, pubsub.ErrTopicClosed
}
return nil, nil
}

func (m *mockSub) Cancel() {}

// mockP2PService implements p2p.Service and counts DialPeersUntilTargetCount invocations.
// It relies on p2p's meshThreadInner's defer of DialPeersUntilTargetCount to detect invocation.
type mockP2PService struct{ dialCount atomic.Int32 }
type mockP2PService struct {
dialCount atomic.Int32
closed atomic.Bool
}

func (m *mockP2PService) Start() error { return nil }
func (m *mockP2PService) Close() error { return nil }
func (m *mockP2PService) Close() error { m.closed.Store(true); return nil }
func (m *mockP2PService) ID() peer.ID { return "" }
func (m *mockP2PService) IDSigner() *p2piface.PeerIDChallengeSigner { return nil }
func (m *mockP2PService) AddrInfo() peer.AddrInfo { return peer.AddrInfo{} }
Expand All @@ -51,7 +67,7 @@ func (m *mockP2PService) ClosePeer(peer.ID) error { return nil
func (m *mockP2PService) Conns() []network.Conn { return nil }
func (m *mockP2PService) ListPeersForTopic(string) []peer.ID { return nil }
func (m *mockP2PService) Subscribe(string, pubsub.ValidatorEx) (p2piface.SubNextCancellable, error) {
return nil, nil
return &mockSub{&m.closed}, nil
}
func (m *mockP2PService) Publish(context.Context, string, []byte) error { return nil }
func (m *mockP2PService) GetHTTPClient(*peer.AddrInfo, limitcaller.ConnectionTimeStore, time.Duration) (*http.Client, error) {
Expand Down Expand Up @@ -79,7 +95,6 @@ func TestMesh_HybridRelayP2PInnerCall(t *testing.T) {

mockSvc := &mockP2PService{}
net.p2pNetwork.service = mockSvc
net.p2pNetwork.relayMessages = false // prevent pubsub startup

err = net.Start()
require.NoError(t, err)
Expand Down
24 changes: 24 additions & 0 deletions network/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,18 @@ func (t pubsubMetricsTracer) RecvRPC(rpc *pubsub.RPC) {
networkP2PReceivedBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil)
networkP2PReceivedBytesByTag.Add(string(protocol.TxnTag), uint64(len(rpc.Publish[i].Data)))
networkP2PMessageReceivedByTag.Add(string(protocol.TxnTag), 1)
case p2p.AVTopicName:
networkP2PReceivedBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil)
networkP2PReceivedBytesByTag.Add(string(protocol.AgreementVoteTag), uint64(len(rpc.Publish[i].Data)))
networkP2PMessageReceivedByTag.Add(string(protocol.AgreementVoteTag), 1)
case p2p.PPTopicName:
networkP2PReceivedBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil)
networkP2PReceivedBytesByTag.Add(string(protocol.ProposalPayloadTag), uint64(len(rpc.Publish[i].Data)))
networkP2PMessageReceivedByTag.Add(string(protocol.ProposalPayloadTag), 1)
case p2p.VBTopicName:
networkP2PReceivedBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil)
networkP2PReceivedBytesByTag.Add(string(protocol.VoteBundleTag), uint64(len(rpc.Publish[i].Data)))
networkP2PMessageReceivedByTag.Add(string(protocol.VoteBundleTag), 1)
}
}
}
Expand All @@ -211,6 +223,18 @@ func (t pubsubMetricsTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {
networkP2PSentBytesByTag.Add(string(protocol.TxnTag), uint64(len(rpc.Publish[i].Data)))
networkP2PSentBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil)
networkP2PMessageSentByTag.Add(string(protocol.TxnTag), 1)
case p2p.AVTopicName:
networkP2PSentBytesByTag.Add(string(protocol.AgreementVoteTag), uint64(len(rpc.Publish[i].Data)))
networkP2PSentBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil)
networkP2PMessageSentByTag.Add(string(protocol.AgreementVoteTag), 1)
case p2p.PPTopicName:
networkP2PSentBytesByTag.Add(string(protocol.ProposalPayloadTag), uint64(len(rpc.Publish[i].Data)))
networkP2PSentBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil)
networkP2PMessageSentByTag.Add(string(protocol.ProposalPayloadTag), 1)
case p2p.VBTopicName:
networkP2PSentBytesByTag.Add(string(protocol.VoteBundleTag), uint64(len(rpc.Publish[i].Data)))
networkP2PSentBytesTotal.AddUint64(uint64(len(rpc.Publish[i].Data)), nil)
networkP2PMessageSentByTag.Add(string(protocol.VoteBundleTag), 1)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions network/msgCompressor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func TestVoteStatefulCompressionAbortMessage(t *testing.T) {
factory voteNetFactory
}{
{"Websocket", makeWebsocketVoteNets},
{"P2P", makeP2PVoteNets},
// {"P2P", makeP2PVoteNets}, // TODO: enable when p2p pubsub transport supports vote compression
}

for _, f := range factories {
Expand Down Expand Up @@ -439,7 +439,7 @@ func TestVoteStatefulVoteCompression(t *testing.T) {
factory voteNetFactory
}{
{"Websocket", makeWebsocketVoteNets},
{"P2P", makeP2PVoteNets},
// {"P2P", makeP2PVoteNets}, // TODO: enable when p2p pubsub transport supports vote compression
}

for _, f := range factories {
Expand Down
Loading
Loading