Skip to content

Commit

Permalink
test "verifies private data is pulled when joining a new peer with ne…
Browse files Browse the repository at this point in the history
…w certs" is very unstable. I change log level for test in integration pvtdata.

Signed-off-by: Fedor Partanskiy <[email protected]>
  • Loading branch information
pfi79 committed Oct 15, 2024
1 parent c188707 commit 548af5f
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 6 deletions.
1 change: 1 addition & 0 deletions gossip/comm/comm_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,7 @@ func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error {
conn := c.connStore.onConnected(stream, connInfo, c.metrics)

h := func(m *protoext.SignedGossipMessage) {
c.logger.Debug("Got message1:", m)
c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
conn: conn,
SignedGossipMessage: m,
Expand Down
4 changes: 3 additions & 1 deletion gossip/gossip/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,10 @@ func (gc *gossipChannel) hasLeftChannel() bool {
func (gc *gossipChannel) GetPeers() []discovery.NetworkMember {
var members []discovery.NetworkMember
if gc.hasLeftChannel() {
gc.logger.Debug("PFI13")
return members
}

gc.logger.Debug("PFI15")
for _, member := range gc.GetMembership() {
if !gc.EligibleForChannel(member) {
continue
Expand All @@ -382,6 +383,7 @@ func (gc *gossipChannel) GetPeers() []discovery.NetworkMember {
}
member.Properties = stateInf.GetStateInfo().Properties
member.Envelope = stateInf.Envelope
gc.logger.Debug("PFI14 ", member.PKIid, member.Endpoint, member.InternalEndpoint)
members = append(members, member)
}
return members
Expand Down
3 changes: 3 additions & 0 deletions gossip/gossip/chanstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,15 @@ func (cs *channelState) joinChannel(joinMsg api.JoinChannelMessage, channelID co
}
cs.Lock()
defer cs.Unlock()
cs.g.logger.Debug("PFI10")
if gc, exists := cs.channels[string(channelID)]; !exists {
cs.g.logger.Debug("PFI11")
pkiID := cs.g.comm.GetPKIid()
ga := &gossipAdapterImpl{Node: cs.g, Discovery: cs.g.disc}
gc := channel.NewGossipChannel(pkiID, cs.g.selfOrg, cs.g.mcs, channelID, ga, joinMsg, metrics, nil)
cs.channels[string(channelID)] = gc
} else {
cs.g.logger.Debug("PFI12")
gc.ConfigureChannel(joinMsg)
}
}
Expand Down
1 change: 1 addition & 0 deletions gossip/privdata/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ func (d *distributorImpl) createPrivateDataMessage(txID, namespace string,
collection *rwset.CollectionPvtReadWriteSet,
ccp *peer.CollectionConfigPackage,
blkHt uint64) (*protoext.SignedGossipMessage, error) {
d.logger.Debug("PFI03")
msg := &protosgossip.GossipMessage{
Channel: []byte(d.chainID),
Nonce: util.RandomUInt64(),
Expand Down
7 changes: 4 additions & 3 deletions gossip/privdata/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (p *puller) listen() {

func (p *puller) handleRequest(message protoext.ReceivedMessage) {
p.logger.Debug("Got", message.GetGossipMessage(), "from", message.GetConnectionInfo().Endpoint)
p.logger.Debug("PFI01")
message.Respond(&protosgossip.GossipMessage{
Channel: []byte(p.channel),
Tag: protosgossip.GossipMessage_CHAN_ONLY,
Expand Down Expand Up @@ -250,6 +251,7 @@ func (p *puller) fetchPrivateData(dig2Filter digestToFilterMapping) (*privdataco
p.logger.Debug("Total members in channel:", members)
members = filter.AnyMatch(members, allFilters...)
p.logger.Debug("Total members that fit some digest:", members)
p.logger.Debug("PFI04")
if len(members) == 0 {
p.logger.Warning("Do not know any peer in the channel(", p.channel, ") that matches the policies , aborting")
return nil, errors.New("Empty membership")
Expand All @@ -262,6 +264,7 @@ func (p *puller) fetchPrivateData(dig2Filter digestToFilterMapping) (*privdataco
// We expect all private RWSets represented as digests to be collected
itemsLeftToCollect := len(dig2Filter)
// As long as we still have some data to collect and new members to ask the data for:
p.logger.Debug("PFI05")
for itemsLeftToCollect > 0 && len(members) > 0 {
purgedPvt := p.getPurgedCollections(members, dig2Filter)
// Need to remove purged digest from mapping
Expand All @@ -277,20 +280,17 @@ func (p *puller) fetchPrivateData(dig2Filter digestToFilterMapping) (*privdataco
delete(dig2Filter, dig)
itemsLeftToCollect--
}

if itemsLeftToCollect == 0 {
p.logger.Debug("No items left to collect")
return res, nil
}

peer2digests, members = p.assignDigestsToPeers(members, dig2Filter)
if len(peer2digests) == 0 {
p.logger.Warningf("No available peers for digests request, "+
"cannot pull missing private data for following digests [%+v], peer membership: [%+v]",
dig2Filter.digests(), members)
return res, nil
}

p.logger.Debug("Matched", len(dig2Filter), "digests to", len(peer2digests), "peer(s)")
subscriptions := p.scatterRequests(peer2digests)
responses := p.gatherResponses(subscriptions)
Expand Down Expand Up @@ -345,6 +345,7 @@ func (p *puller) gatherResponses(subscriptions []util.Subscription) []*protosgos
func (p *puller) scatterRequests(peersDigestMapping peer2Digests) []util.Subscription {
var subscriptions []util.Subscription
for peer, digests := range peersDigestMapping {
p.logger.Debug("PFI02")
msg := &protosgossip.GossipMessage{
Tag: protosgossip.GossipMessage_CHAN_ONLY,
Channel: []byte(p.channel),
Expand Down
5 changes: 3 additions & 2 deletions integration/pvtdata/pvtdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ var _ = Describe("PrivateData", func() {

testPeers := []*nwo.Peer{org1peer0, org2peer0, org3peer0}
for _, peer := range testPeers {
pr := network.PeerRunner(peer)
pr := network.PeerRunner(peer, "FABRIC_LOGGING_SPEC=debug")
p := ifrit.Invoke(pr)
peerProcesses[peer.ID()] = p
Eventually(p.Ready(), network.EventuallyTimeout).Should(BeClosed())
Expand Down Expand Up @@ -264,7 +264,7 @@ var _ = Describe("PrivateData", func() {
updateConfigWithNewCertsForPeer(network, tempCryptoDir, orderer, org2Peer1)

By("starting the peer1.org2 process")
pr := network.PeerRunner(org2Peer1)
pr := network.PeerRunner(org2Peer1, "FABRIC_LOGGING_SPEC=debug")
p := ifrit.Invoke(pr)
peerProcesses[org2Peer1.ID()] = p
Eventually(p.Ready(), network.EventuallyTimeout).Should(BeClosed())
Expand Down Expand Up @@ -387,6 +387,7 @@ var _ = Describe("PrivateData", func() {
Expect(err).NotTo(HaveOccurred())
Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit(0))
Expect(sess).To(gbytes.Say(`{"docType":"marblePrivateDetails","name":"marble1","price":99}`))
Expect(sess).To(gbytes.Say(`PFIPFI`))
})
})

Expand Down

0 comments on commit 548af5f

Please sign in to comment.