Skip to content

Commit

Permalink
test "verifies private data is pulled when joining a new peer with ne…
Browse files Browse the repository at this point in the history
…w certs" is very unstable. I change log level for test in integration pvtdata.

Signed-off-by: Fedor Partanskiy <[email protected]>
  • Loading branch information
pfi79 committed Oct 15, 2024
1 parent c188707 commit 3272952
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 46 deletions.
1 change: 1 addition & 0 deletions gossip/comm/comm_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,7 @@ func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error {
conn := c.connStore.onConnected(stream, connInfo, c.metrics)

h := func(m *protoext.SignedGossipMessage) {
c.logger.Debug("Got message1:", m)
c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
conn: conn,
SignedGossipMessage: m,
Expand Down
72 changes: 40 additions & 32 deletions gossip/discovery/discovery_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,44 +373,47 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(msg protoext.ReceivedMessage) {
return
}

if memResp := m.GetMemRes(); memResp != nil {
d.pubsub.Publish(fmt.Sprintf("%d", m.Nonce), m.Nonce)
for _, env := range memResp.Alive {
am, err := protoext.EnvelopeToGossipMessage(env)
if err != nil {
d.logger.Warningf("Membership response contains an invalid message from an online peer:%+v", errors.WithStack(err))
return
}
if !protoext.IsAliveMsg(am.GossipMessage) {
d.logger.Warning("Expected alive message, got", am, "instead")
return
}
memResp := m.GetMemRes()
if memResp == nil {
return
}

if d.msgStore.CheckValid(am) && d.crypt.ValidateAliveMsg(am) {
d.handleAliveMessage(am)
}
d.pubsub.Publish(fmt.Sprintf("%d", m.Nonce), m.Nonce)
for _, env := range memResp.Alive {
am, err := protoext.EnvelopeToGossipMessage(env)
if err != nil {
d.logger.Warningf("Membership response contains an invalid message from an online peer:%+v", errors.WithStack(err))
return
}
if !protoext.IsAliveMsg(am.GossipMessage) {
d.logger.Warning("Expected alive message, got", am, "instead")
return
}

for _, env := range memResp.Dead {
dm, err := protoext.EnvelopeToGossipMessage(env)
if err != nil {
d.logger.Warningf("Membership response contains an invalid message from an offline peer %+v", errors.WithStack(err))
return
}
if d.msgStore.CheckValid(am) && d.crypt.ValidateAliveMsg(am) {
d.handleAliveMessage(am)
}
}

// Newer alive message exists or the message isn't authentic
if !d.msgStore.CheckValid(dm) || !d.crypt.ValidateAliveMsg(dm) {
continue
}
for _, env := range memResp.Dead {
dm, err := protoext.EnvelopeToGossipMessage(env)
if err != nil {
d.logger.Warningf("Membership response contains an invalid message from an offline peer %+v", errors.WithStack(err))
return
}

newDeadMembers := []*protoext.SignedGossipMessage{}
d.lock.RLock()
if _, known := d.id2Member[string(dm.GetAliveMsg().Membership.PkiId)]; !known {
newDeadMembers = append(newDeadMembers, dm)
}
d.lock.RUnlock()
d.learnNewMembers([]*protoext.SignedGossipMessage{}, newDeadMembers)
// Newer alive message exists or the message isn't authentic
if !d.msgStore.CheckValid(dm) || !d.crypt.ValidateAliveMsg(dm) {
continue
}

newDeadMembers := []*protoext.SignedGossipMessage{}
d.lock.RLock()
if _, known := d.id2Member[string(dm.GetAliveMsg().Membership.PkiId)]; !known {
newDeadMembers = append(newDeadMembers, dm)
}
d.lock.RUnlock()
d.learnNewMembers([]*protoext.SignedGossipMessage{}, newDeadMembers)
}
}

Expand All @@ -430,6 +433,7 @@ func (d *gossipDiscoveryImpl) sendMemResponse(targetMember *gossip.Member, inter
aliveMsg = d.selfAliveMessage
d.lock.RUnlock()
if aliveMsg == nil {
d.logger.Debug("PFI22")
aliveMsg, err = d.createSignedAliveMessage(true)
if err != nil {
d.logger.Warningf("Failed creating alive message: %+v", errors.WithStack(err))
Expand Down Expand Up @@ -663,6 +667,7 @@ func (d *gossipDiscoveryImpl) sendMembershipRequest(member *NetworkMember, inclu
}

func (d *gossipDiscoveryImpl) createMembershipRequest(includeInternalEndpoint bool) (*gossip.GossipMessage, error) {
d.logger.Debug("PFI21")
am, err := d.createSignedAliveMessage(includeInternalEndpoint)
if err != nil {
return nil, errors.WithStack(err)
Expand Down Expand Up @@ -762,6 +767,7 @@ func (d *gossipDiscoveryImpl) periodicalSendAlive() {
for !d.toDie() {
d.logger.Debug("Sleeping", d.aliveTimeInterval)
time.Sleep(d.aliveTimeInterval)
d.logger.Debug("PFI20")
if d.aliveMembership.Size() == 0 {
d.logger.Debugf("Empty membership, no one to send a heartbeat to")
continue
Expand Down Expand Up @@ -949,7 +955,9 @@ func (d *gossipDiscoveryImpl) GetMembership() []NetworkMember {
defer d.lock.RUnlock()

response := []NetworkMember{}
d.logger.Debug("PFI24")
for _, m := range d.aliveMembership.ToSlice() {
d.logger.Debug("PFI23")
member := m.GetAliveMsg()
response = append(response, NetworkMember{
PKIid: member.Membership.PkiId,
Expand Down
5 changes: 4 additions & 1 deletion gossip/gossip/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func (mf *membershipFilter) GetMembership() []discovery.NetworkMember {
}

var members []discovery.NetworkMember
mf.logger.Debug("PFI25")
for _, mem := range mf.adapter.GetMembership() {
if mf.eligibleForChannelAndSameOrg(mem) {
members = append(members, mem)
Expand Down Expand Up @@ -365,9 +366,10 @@ func (gc *gossipChannel) hasLeftChannel() bool {
func (gc *gossipChannel) GetPeers() []discovery.NetworkMember {
var members []discovery.NetworkMember
if gc.hasLeftChannel() {
gc.logger.Debug("PFI13")
return members
}

gc.logger.Debug("PFI15")
for _, member := range gc.GetMembership() {
if !gc.EligibleForChannel(member) {
continue
Expand All @@ -382,6 +384,7 @@ func (gc *gossipChannel) GetPeers() []discovery.NetworkMember {
}
member.Properties = stateInf.GetStateInfo().Properties
member.Envelope = stateInf.Envelope
gc.logger.Debug("PFI14 ", member.PKIid, member.Endpoint, member.InternalEndpoint)
members = append(members, member)
}
return members
Expand Down
3 changes: 3 additions & 0 deletions gossip/gossip/chanstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,15 @@ func (cs *channelState) joinChannel(joinMsg api.JoinChannelMessage, channelID co
}
cs.Lock()
defer cs.Unlock()
cs.g.logger.Debug("PFI10")
if gc, exists := cs.channels[string(channelID)]; !exists {
cs.g.logger.Debug("PFI11")
pkiID := cs.g.comm.GetPKIid()
ga := &gossipAdapterImpl{Node: cs.g, Discovery: cs.g.disc}
gc := channel.NewGossipChannel(pkiID, cs.g.selfOrg, cs.g.mcs, channelID, ga, joinMsg, metrics, nil)
cs.channels[string(channelID)] = gc
} else {
cs.g.logger.Debug("PFI12")
gc.ConfigureChannel(joinMsg)
}
}
Expand Down
1 change: 1 addition & 0 deletions gossip/privdata/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ func (d *distributorImpl) createPrivateDataMessage(txID, namespace string,
collection *rwset.CollectionPvtReadWriteSet,
ccp *peer.CollectionConfigPackage,
blkHt uint64) (*protoext.SignedGossipMessage, error) {
d.logger.Debug("PFI03")
msg := &protosgossip.GossipMessage{
Channel: []byte(d.chainID),
Nonce: util.RandomUInt64(),
Expand Down
7 changes: 4 additions & 3 deletions gossip/privdata/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (p *puller) listen() {

func (p *puller) handleRequest(message protoext.ReceivedMessage) {
p.logger.Debug("Got", message.GetGossipMessage(), "from", message.GetConnectionInfo().Endpoint)
p.logger.Debug("PFI01")
message.Respond(&protosgossip.GossipMessage{
Channel: []byte(p.channel),
Tag: protosgossip.GossipMessage_CHAN_ONLY,
Expand Down Expand Up @@ -250,6 +251,7 @@ func (p *puller) fetchPrivateData(dig2Filter digestToFilterMapping) (*privdataco
p.logger.Debug("Total members in channel:", members)
members = filter.AnyMatch(members, allFilters...)
p.logger.Debug("Total members that fit some digest:", members)
p.logger.Debug("PFI04")
if len(members) == 0 {
p.logger.Warning("Do not know any peer in the channel(", p.channel, ") that matches the policies , aborting")
return nil, errors.New("Empty membership")
Expand All @@ -262,6 +264,7 @@ func (p *puller) fetchPrivateData(dig2Filter digestToFilterMapping) (*privdataco
// We expect all private RWSets represented as digests to be collected
itemsLeftToCollect := len(dig2Filter)
// As long as we still have some data to collect and new members to ask the data for:
p.logger.Debug("PFI05")
for itemsLeftToCollect > 0 && len(members) > 0 {
purgedPvt := p.getPurgedCollections(members, dig2Filter)
// Need to remove purged digest from mapping
Expand All @@ -277,20 +280,17 @@ func (p *puller) fetchPrivateData(dig2Filter digestToFilterMapping) (*privdataco
delete(dig2Filter, dig)
itemsLeftToCollect--
}

if itemsLeftToCollect == 0 {
p.logger.Debug("No items left to collect")
return res, nil
}

peer2digests, members = p.assignDigestsToPeers(members, dig2Filter)
if len(peer2digests) == 0 {
p.logger.Warningf("No available peers for digests request, "+
"cannot pull missing private data for following digests [%+v], peer membership: [%+v]",
dig2Filter.digests(), members)
return res, nil
}

p.logger.Debug("Matched", len(dig2Filter), "digests to", len(peer2digests), "peer(s)")
subscriptions := p.scatterRequests(peer2digests)
responses := p.gatherResponses(subscriptions)
Expand Down Expand Up @@ -345,6 +345,7 @@ func (p *puller) gatherResponses(subscriptions []util.Subscription) []*protosgos
func (p *puller) scatterRequests(peersDigestMapping peer2Digests) []util.Subscription {
var subscriptions []util.Subscription
for peer, digests := range peersDigestMapping {
p.logger.Debug("PFI02")
msg := &protosgossip.GossipMessage{
Tag: protosgossip.GossipMessage_CHAN_ONLY,
Channel: []byte(p.channel),
Expand Down
28 changes: 18 additions & 10 deletions integration/pvtdata/pvtdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ var _ = Describe("PrivateData", func() {

testPeers := []*nwo.Peer{org1peer0, org2peer0, org3peer0}
for _, peer := range testPeers {
pr := network.PeerRunner(peer)
pr := network.PeerRunner(peer, "FABRIC_LOGGING_SPEC=debug")
p := ifrit.Invoke(pr)
peerProcesses[peer.ID()] = p
Eventually(p.Ready(), network.EventuallyTimeout).Should(BeClosed())
Expand Down Expand Up @@ -264,7 +264,7 @@ var _ = Describe("PrivateData", func() {
updateConfigWithNewCertsForPeer(network, tempCryptoDir, orderer, org2Peer1)

By("starting the peer1.org2 process")
pr := network.PeerRunner(org2Peer1)
pr := network.PeerRunner(org2Peer1, "FABRIC_LOGGING_SPEC=debug")
p := ifrit.Invoke(pr)
peerProcesses[org2Peer1.ID()] = p
Eventually(p.Ready(), network.EventuallyTimeout).Should(BeClosed())
Expand Down Expand Up @@ -370,14 +370,21 @@ var _ = Describe("PrivateData", func() {
}

By("verifying peer1.org2 got the private data that was created historically")
sess, err = network.PeerUserSession(org2Peer1, "Admin2", commands.ChaincodeQuery{
ChannelID: channelID,
Name: "marblesp",
Ctor: `{"Args":["readMarble","marble1"]}`,
})
Expect(err).NotTo(HaveOccurred())
Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit(0))
Expect(sess).To(gbytes.Say(`{"docType":"marble","name":"marble1","color":"blue","size":35,"owner":"tom"}`))
Eventually(func() bool {
sess, err = network.PeerUserSession(org2Peer1, "Admin2", commands.ChaincodeQuery{
ChannelID: channelID,
Name: "marblesp",
Ctor: `{"Args":["readMarble","marble1"]}`,
})
Expect(err).NotTo(HaveOccurred())
Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit())
if sess.ExitCode() != 0 {
return false
}
Expect(sess).To(gbytes.Say(`{"docType":"marble","name":"marble1","color":"blue","size":35,"owner":"tom"}`))

return true
}, network.EventuallyTimeout).Should(BeTrue())

sess, err = network.PeerUserSession(org2Peer1, "Admin2", commands.ChaincodeQuery{
ChannelID: channelID,
Expand All @@ -387,6 +394,7 @@ var _ = Describe("PrivateData", func() {
Expect(err).NotTo(HaveOccurred())
Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit(0))
Expect(sess).To(gbytes.Say(`{"docType":"marblePrivateDetails","name":"marble1","price":99}`))
Expect(sess).To(gbytes.Say(`PFIPFI`))
})
})

Expand Down

0 comments on commit 3272952

Please sign in to comment.