Skip to content

Commit

Permalink
add support GetMultipleKeys
Browse files Browse the repository at this point in the history
Signed-off-by: Fedor Partanskiy <[email protected]>
  • Loading branch information
pfi79 committed Feb 10, 2025
1 parent e22c269 commit 128c47f
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 24 deletions.
4 changes: 4 additions & 0 deletions core/chaincode/chaincode_support.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ type ChaincodeSupport struct {
UserRunsCC bool
UseWriteBatch bool
MaxSizeWriteBatch uint32
UseGetMultipleKeys bool
MaxSizeGetMultipleKeys uint32
}

// Launch starts executing chaincode if it is not already running. This method
Expand Down Expand Up @@ -130,6 +132,8 @@ func (cs *ChaincodeSupport) HandleChaincodeStream(stream ccintf.ChaincodeStream)
TotalQueryLimit: cs.TotalQueryLimit,
UseWriteBatch: cs.UseWriteBatch,
MaxSizeWriteBatch: cs.MaxSizeWriteBatch,
UseGetMultipleKeys: cs.UseGetMultipleKeys,
MaxSizeGetMultipleKeys: cs.MaxSizeGetMultipleKeys,
}

return handler.ProcessStream(stream)
Expand Down
40 changes: 25 additions & 15 deletions core/chaincode/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,27 @@ import (
)

const (
defaultExecutionTimeout = 30 * time.Second
minimumStartupTimeout = 5 * time.Second
defaultMaxSizeWriteBatch = 1000
defaultExecutionTimeout = 30 * time.Second
minimumStartupTimeout = 5 * time.Second
defaultMaxSizeWriteBatch = 1000
defaultMaxSizeGetMultipleKeys = 1000
)

type Config struct {
TotalQueryLimit int
TLSEnabled bool
Keepalive time.Duration
ExecuteTimeout time.Duration
InstallTimeout time.Duration
StartupTimeout time.Duration
LogFormat string
LogLevel string
ShimLogLevel string
SCCAllowlist map[string]bool
UseWriteBatch bool
MaxSizeWriteBatch uint32
TotalQueryLimit int
TLSEnabled bool
Keepalive time.Duration
ExecuteTimeout time.Duration
InstallTimeout time.Duration
StartupTimeout time.Duration
LogFormat string
LogLevel string
ShimLogLevel string
SCCAllowlist map[string]bool
UseWriteBatch bool
MaxSizeWriteBatch uint32
UseGetMultipleKeys bool
MaxSizeGetMultipleKeys uint32
}

func GlobalConfig() *Config {
Expand Down Expand Up @@ -81,6 +84,13 @@ func (c *Config) load() {
if c.MaxSizeWriteBatch <= 0 {
c.MaxSizeWriteBatch = defaultMaxSizeWriteBatch
}
if viper.IsSet("chaincode.runtimeParams.useGetMultipleKeys") {
c.UseGetMultipleKeys = viper.GetBool("chaincode.runtimeParams.useGetMultipleKeys")
}
c.MaxSizeGetMultipleKeys = viper.GetUint32("chaincode.runtimeParams.maxSizeGetMultipleKeys")
if c.MaxSizeGetMultipleKeys <= 0 {
c.MaxSizeGetMultipleKeys = defaultMaxSizeGetMultipleKeys
}
}

func parseBool(s string) bool {
Expand Down
52 changes: 50 additions & 2 deletions core/chaincode/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ type Handler struct {
UseWriteBatch bool
// MaxSizeWriteBatch maximum batch size for the change segment
MaxSizeWriteBatch uint32
// UseGetMultipleKeys an indication that the peer can handle get multiple keys
UseGetMultipleKeys bool
// MaxSizeGetMultipleKeys maximum size of batches with get multiple keys
MaxSizeGetMultipleKeys uint32

// stateLock is used to read and set State.
stateLock sync.RWMutex
Expand Down Expand Up @@ -221,6 +225,8 @@ func (h *Handler) handleMessageReadyState(msg *pb.ChaincodeMessage) error {
go h.HandleTransaction(msg, h.HandlePurgePrivateData)
case pb.ChaincodeMessage_WRITE_BATCH_STATE:
go h.HandleTransaction(msg, h.HandleWriteBatch)
case pb.ChaincodeMessage_GET_STATE_MULTIPLE:
go h.HandleTransaction(msg, h.HandleGetStateMultipleKeys)
default:
return fmt.Errorf("[%s] Fabric side handler cannot handle message (%s) while in ready state", msg.Txid, msg.Type)
}
Expand Down Expand Up @@ -449,8 +455,10 @@ func (h *Handler) sendReady() error {
chaincodeLogger.Debugf("sending READY for chaincode %s", h.chaincodeID)

chaincodeAdditionalParams := &pb.ChaincodeAdditionalParams{
UseWriteBatch: h.UseWriteBatch,
MaxSizeWriteBatch: h.MaxSizeWriteBatch,
UseWriteBatch: h.UseWriteBatch,
MaxSizeWriteBatch: h.MaxSizeWriteBatch,
UseGetMultipleKeys: h.UseGetMultipleKeys,
MaxSizeGetMultipleKeys: h.MaxSizeGetMultipleKeys,
}
payloadBytes, err := proto.Marshal(chaincodeAdditionalParams)
if err != nil {
Expand Down Expand Up @@ -678,6 +686,46 @@ func (h *Handler) HandleGetState(msg *pb.ChaincodeMessage, txContext *Transactio
return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: res, Txid: msg.Txid, ChannelId: msg.ChannelId}, nil
}

// Handles query to ledger to get state
func (h *Handler) HandleGetStateMultipleKeys(msg *pb.ChaincodeMessage, txContext *TransactionContext) (*pb.ChaincodeMessage, error) {
getState := &pb.GetStateMultiple{}
err := proto.Unmarshal(msg.Payload, getState)
if err != nil {
return nil, errors.Wrap(err, "unmarshal failed")
}

var res [][]byte
namespaceID := txContext.NamespaceID
collection := getState.GetCollection()
chaincodeLogger.Debugf("[%s] getting state for chaincode %s, keys %v, channel %s", shorttxid(msg.Txid), namespaceID, getState.GetKeys(), txContext.ChannelID)

if isCollectionSet(collection) {
if txContext.IsInitTransaction {
return nil, errors.New("private data APIs are not allowed in chaincode Init()")
}
if err = errorIfCreatorHasNoReadPermission(namespaceID, collection, txContext); err != nil {
return nil, err
}
res, err = txContext.TXSimulator.GetPrivateDataMultipleKeys(namespaceID, collection, getState.GetKeys())
} else {
res, err = txContext.TXSimulator.GetStateMultipleKeys(namespaceID, getState.GetKeys())
}
if err != nil {
return nil, errors.WithStack(err)
}
if len(res) == 0 {
chaincodeLogger.Debugf("[%s] No state associated with keys: %v. Sending %s with an empty payload", shorttxid(msg.Txid), getState.GetKeys(), pb.ChaincodeMessage_RESPONSE)
}

payloadBytes, err := proto.Marshal(&pb.GetStateMultipleResult{Values: res})
if err != nil {
return nil, errors.Wrap(err, "marshal failed")
}

// Send response msg back to chaincode. GetState will not trigger event
return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: payloadBytes, Txid: msg.Txid, ChannelId: msg.ChannelId}, nil
}

func (h *Handler) HandleGetPrivateDataHash(msg *pb.ChaincodeMessage, txContext *TransactionContext) (*pb.ChaincodeMessage, error) {
getState := &pb.GetState{}
err := proto.Unmarshal(msg.Payload, getState)
Expand Down
16 changes: 10 additions & 6 deletions core/chaincode/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,12 @@ var _ = Describe("Handler", func() {
UUIDGenerator: chaincode.UUIDGeneratorFunc(func() string {
return "generated-query-id"
}),
AppConfig: fakeApplicationConfigRetriever,
Metrics: chaincodeMetrics,
UseWriteBatch: true,
MaxSizeWriteBatch: 1000,
AppConfig: fakeApplicationConfigRetriever,
Metrics: chaincodeMetrics,
UseWriteBatch: true,
MaxSizeWriteBatch: 1000,
UseGetMultipleKeys: true,
MaxSizeGetMultipleKeys: 1000,
}
chaincode.SetHandlerChatStream(handler, fakeChatStream)
chaincode.SetHandlerChaincodeID(handler, "test-handler-name:1.0")
Expand Down Expand Up @@ -2786,8 +2788,10 @@ var _ = Describe("Handler", func() {
}))

chaincodeAdditionalParams := &pb.ChaincodeAdditionalParams{
UseWriteBatch: true,
MaxSizeWriteBatch: 1000,
UseWriteBatch: true,
MaxSizeWriteBatch: 1000,
UseGetMultipleKeys: true,
MaxSizeGetMultipleKeys: 1000,
}
payloadBytes, err := proto.Marshal(chaincodeAdditionalParams)
Expect(err).NotTo(HaveOccurred())
Expand Down
47 changes: 46 additions & 1 deletion integration/chaincode/multi/chaincode.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,19 @@ func (t *Operations) Invoke(stub shim.ChaincodeStubInterface) *pb.Response {
return shim.Error("Incorrect number of arguments. Expecting 1")
}
return t.putPrivateKey(stub, args[1])
case "get-multiple-keys":
if len(args) != 1 {
return shim.Error("Incorrect number of arguments. Expecting 1")
}
return t.getMultiple(stub, args[0])
default:
// error
fmt.Println("invoke did not find func: " + function)
return shim.Error("Received unknown function invocation")
}
}

// both params should be marshalled json data and base64 encoded
// put to state keys from "key0" to "key(cntCall-1)"
func (t *Operations) put(stub shim.ChaincodeStubInterface, numberCallsPut string) *pb.Response {
cntCall, _ := strconv.Atoi(numberCallsPut)

Expand Down Expand Up @@ -97,3 +102,43 @@ func (t *Operations) putPrivateKey(stub shim.ChaincodeStubInterface, numberCalls
}
return shim.Success(nil)
}

// getMultiple - get multiple states
func (t *Operations) getMultiple(stub shim.ChaincodeStubInterface, countKeys string) *pb.Response {
num, _ := strconv.Atoi(countKeys)

keys := make([]string, 0, num)

keys = append(keys, "non-exist-key")
for i := range num {
key := "key" + strconv.Itoa(i)
keys = append(keys, key)
}

resps, err := stub.GetMultipleStates(keys...)
if err != nil {
return shim.Error(err.Error())
}

if len(resps) != num+1 {
return shim.Error("number of results is not correct")
}

// non exist key return nil
if resps[0] != nil {
errStr := fmt.Sprintf("incorrect result %d elem, got %v", 0, string(resps[0]))
return shim.Error(errStr)
}

if string(resps[1]) != "key"+strconv.Itoa(0) {
errStr := fmt.Sprintf("incorrect result %d elem, got %v", 0, string(resps[1]))
return shim.Error(errStr)
}

if string(resps[num]) != "key"+strconv.Itoa(num-1) {
errStr := fmt.Sprintf("incorrect result %d elem, got %v", 0, string(resps[num]))
return shim.Error(errStr)
}

return shim.Success(nil)
}
89 changes: 89 additions & 0 deletions integration/e2e/write_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,85 @@ var _ = Describe("Network", func() {
RunInvoke(network, orderer, peer, "put-private-key", true, 3, 1, []string{"collection testchannel/mycc/col", "could not be found"})
})
})

DescribeTableSubtree("benchmark get multiple keys", func(desc string, useGetMultipleKeys bool) {
var network *nwo.Network
var ordererRunner *ginkgomon.Runner
var ordererProcess, peerProcess ifrit.Process

BeforeEach(func() {
network = nwo.New(nwo.BasicEtcdRaft(), tempDir, client, StartPort(), components)
network.UseGetMultipleKeys = useGetMultipleKeys

// Generate config and bootstrap the network
network.GenerateConfigTree()
network.Bootstrap()

// Start all the fabric processes
ordererRunner, ordererProcess, peerProcess = network.StartSingleOrdererNetwork("orderer")
})

AfterEach(func() {
if ordererProcess != nil {
ordererProcess.Signal(syscall.SIGTERM)
Eventually(ordererProcess.Wait(), network.EventuallyTimeout).Should(Receive())
}

if peerProcess != nil {
peerProcess.Signal(syscall.SIGTERM)
Eventually(peerProcess.Wait(), network.EventuallyTimeout).Should(Receive())
}

network.Cleanup()
})

It("deploys and executes experiment bench", func() {
orderer := network.Orderer("orderer")
channelparticipation.JoinOrdererJoinPeersAppChannel(network, "testchannel", orderer, ordererRunner)
peer := network.Peer("Org1", "peer0")

chaincode := nwo.Chaincode{
Name: "mycc",
Version: "0.0",
Path: "github.com/hyperledger/fabric/integration/chaincode/multi/cmd",
Lang: "golang",
PackageFile: filepath.Join(tempDir, "multi.tar.gz"),
Ctor: `{"Args":["init"]}`,
SignaturePolicy: `AND ('Org1MSP.member','Org2MSP.member')`,
Sequence: "1",
InitRequired: true,
Label: "my_multi_operations_chaincode",
}

network.VerifyMembership(network.PeersWithChannel("testchannel"), "testchannel")

nwo.EnableCapabilities(
network,
"testchannel",
"Application", "V2_0",
orderer,
network.PeersWithChannel("testchannel")...,
)
nwo.DeployChaincode(network, "testchannel", orderer, chaincode)

RunInvoke(network, orderer, peer, "invoke", true, 10000, 0, nil)

By("run query get state multiple keys")
experiment := gmeasure.NewExperiment("Get state multiple keys " + desc)
AddReportEntry(experiment.Name, experiment)

experiment.SampleDuration("invoke N-10 cycle-1000", func(idx int) {
RunGetStateMultipleKeys(network, peer, 1000)
}, gmeasure.SamplingConfig{N: 10})

experiment.SampleDuration("invoke N-10 cycle-10000", func(idx int) {
RunGetStateMultipleKeys(network, peer, 10000)
}, gmeasure.SamplingConfig{N: 10})
})
},
Entry("without peer support", "without peer support", false),
Entry("with peer support", "with peer support", true),
)
})

func RunInvoke(n *nwo.Network, orderer *nwo.Orderer, peer *nwo.Peer, fn string, startWriteBatch bool, numberCallsPut int, exitCode int, expectedError []string) {
Expand Down Expand Up @@ -237,3 +316,13 @@ func RunGetState(n *nwo.Network, peer *nwo.Peer, keyUniq string) {
Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit(0))
Expect(sess).To(gbytes.Say("key" + keyUniq))
}

func RunGetStateMultipleKeys(n *nwo.Network, peer *nwo.Peer, countKeys int) {
sess, err := n.PeerUserSession(peer, "User1", commands.ChaincodeQuery{
ChannelID: "testchannel",
Name: "mycc",
Ctor: `{"Args":["get-multiple-keys","` + fmt.Sprint(countKeys) + `"]}`,
})
Expect(err).NotTo(HaveOccurred())
Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit(0))
}
2 changes: 2 additions & 0 deletions integration/nwo/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ type Network struct {
OrdererReplicationPolicy string
PeerDeliveryClientPolicy string
UseWriteBatch bool
UseGetMultipleKeys bool

PortsByOrdererID map[string]Ports
PortsByPeerID map[string]Ports
Expand Down Expand Up @@ -194,6 +195,7 @@ func New(c *Config, rootDir string, dockerClient *docker.Client, startPort int,
PortsByPeerID: map[string]Ports{},
PeerDeliveryClientPolicy: "",
UseWriteBatch: true,
UseGetMultipleKeys: true,

Organizations: c.Organizations,
Consensus: c.Consensus,
Expand Down
2 changes: 2 additions & 0 deletions integration/nwo/template/core_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ chaincode:
runtimeParams:
useWriteBatch: {{ .UseWriteBatch }}
maxSizeWriteBatch: 1000
useGetMultipleKeys: {{ .UseGetMultipleKeys }}
maxSizeGetMultipleKeys: 1000
logging:
level: info
shim: warning
Expand Down
2 changes: 2 additions & 0 deletions internal/peer/node/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,8 @@ func serve(args []string) error {
UserRunsCC: userRunsCC,
UseWriteBatch: chaincodeConfig.UseWriteBatch,
MaxSizeWriteBatch: chaincodeConfig.MaxSizeWriteBatch,
UseGetMultipleKeys: chaincodeConfig.UseGetMultipleKeys,
MaxSizeGetMultipleKeys: chaincodeConfig.MaxSizeGetMultipleKeys,
}

custodianLauncher := custodianLauncherAdapter{
Expand Down
4 changes: 4 additions & 0 deletions sampleconfig/core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,10 @@ chaincode:
useWriteBatch: true
# MaxSizeWriteBatch maximum batch size for the change segment
maxSizeWriteBatch: 1000
# UseGetMultipleKeys an indication that the peer can handle get multiple keys
useGetMultipleKeys: true
# MaxSizeGetMultipleKeys maximum size of batches with get multiple keys
maxSizeGetMultipleKeys: 1000

###############################################################################
#
Expand Down

0 comments on commit 128c47f

Please sign in to comment.