Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,17 @@ type Client interface {
// - the base58 encoded identifier will be returned
Store(ctx context.Context, data []byte, typ int) (string, error)

// StoreBatch will store a batch of values with their Blake3 hash as the key
StoreBatch(ctx context.Context, values [][]byte, typ int, taskID string) error
// StoreBatch will store a batch of values with their Blake3 hash as the key.
//
// Semantics:
// - Returns `successRatePct` as a percentage (0–100) computed as
// successful node RPCs divided by total node RPCs attempted during the
// network store phase for this batch.
// - Returns `requests` as the total number of node RPCs attempted for this
// batch (not the number of items in `values`).
// - On error, `successRatePct` and `requests` may reflect partial progress;
// prefer using them only when err == nil, or treat as best‑effort metrics.
StoreBatch(ctx context.Context, values [][]byte, typ int, taskID string) (float64, int, error)

// Delete a key, value
Delete(ctx context.Context, key string) error
Expand Down
35 changes: 25 additions & 10 deletions p2p/kademlia/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,31 +339,40 @@ func (s *DHT) Store(ctx context.Context, data []byte, typ int) (string, error) {
return retKey, nil
}

// StoreBatch will store a batch of values with their Blake3 hash as the key
func (s *DHT) StoreBatch(ctx context.Context, values [][]byte, typ int, taskID string) error {
// StoreBatch will store a batch of values with their Blake3 hash as the key.
//
// Returns:
// - successRatePct: percentage (0–100) of successful node RPCs during the
// network store phase for this batch.
// - requestCount: total number of node RPCs attempted (batch store calls) for
// this batch; this is not the number of values stored.
// - error: wrapped error if local DB store failed, or if the network store did
// not reach the configured minimum success rate.
func (s *DHT) StoreBatch(ctx context.Context, values [][]byte, typ int, taskID string) (float64, int, error) {
logtrace.Info(ctx, "Store DB batch begin", logtrace.Fields{
logtrace.FieldModule: "dht",
logtrace.FieldTaskID: taskID,
"records": len(values),
})
if err := s.store.StoreBatch(ctx, values, typ, true); err != nil {
return fmt.Errorf("store batch: %v", err)
return 0, 0, fmt.Errorf("store batch: %v", err)
}
logtrace.Info(ctx, "Store DB batch done, store network batch begin", logtrace.Fields{
logtrace.FieldModule: "dht",
logtrace.FieldTaskID: taskID,
})

if err := s.IterateBatchStore(ctx, values, typ, taskID); err != nil {
return fmt.Errorf("iterate batch store: %v", err)
rate, requests, err := s.IterateBatchStore(ctx, values, typ, taskID)
if err != nil {
return rate, requests, fmt.Errorf("iterate batch store: %v", err)
}

logtrace.Info(ctx, "Store network batch workers done", logtrace.Fields{
logtrace.FieldModule: "dht",
logtrace.FieldTaskID: taskID,
})

return nil
return rate, requests, nil
}

// Retrieve data from the networking using key. Key is the base58 encoded
Expand Down Expand Up @@ -1568,7 +1577,13 @@ func (s *DHT) addKnownNodes(ctx context.Context, nodes []*Node, knownNodes map[s
}
}

func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, id string) error {
// IterateBatchStore performs the network store and returns (successRatePct, requestCount, error).
//
// Request count is computed as the number of per-node batch store RPCs attempted
// during this run; success rate is successful responses divided by this count.
// If the success rate is below `minimumDataStoreSuccessRate`, an error is
// returned alongside the measured rate and request count.
func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, id string) (float64, int, error) {
globalClosestContacts := make(map[string]*NodeList)
knownNodes := make(map[string]*Node)
// contacted := make(map[string]bool)
Expand Down Expand Up @@ -1646,18 +1661,18 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i
"task_id": id,
"success_rate": fmt.Sprintf("%.2f%%", successRate),
})
return nil
return successRate, requests, nil
} else {
logtrace.Info(ctx, "Failed to achieve desired success rate", logtrace.Fields{
logtrace.FieldModule: "dht",
"task_id": id,
"success_rate": fmt.Sprintf("%.2f%%", successRate),
})
return fmt.Errorf("failed to achieve desired success rate, only: %.2f%% successful", successRate)
return successRate, requests, fmt.Errorf("failed to achieve desired success rate, only: %.2f%% successful", successRate)
}
}

return fmt.Errorf("no store operations were performed")
return 0, 0, fmt.Errorf("no store operations were performed")
}

func (s *DHT) batchStoreNetwork(ctx context.Context, values [][]byte, nodes map[string]*Node, storageMap map[string][]int, typ int) chan *MessageWithError {
Expand Down
4 changes: 3 additions & 1 deletion p2p/kademlia/rq_symbols.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ func (s *DHT) storeSymbolsInP2P(ctx context.Context, dir string, keys []string)
return fmt.Errorf("load symbols: %w", err)
}

if err := s.StoreBatch(ctx, loaded, 1, dir); err != nil {
// Intentionally ignore (ratePct, requests) here; a non-nil error will already
// reflect whether the network store met the configured success threshold.
if _, _, err := s.StoreBatch(ctx, loaded, 1, dir); err != nil {
return fmt.Errorf("p2p store batch: %w", err)
}

Expand Down
28 changes: 23 additions & 5 deletions p2p/mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,16 @@ func (s *p2p) Store(ctx context.Context, data []byte, typ int) (string, error) {
return s.dht.Store(ctx, data, typ)
}

// StoreBatch will store a batch of values with their Blake3 hash as the key
func (s *p2p) StoreBatch(ctx context.Context, data [][]byte, typ int, taskID string) error {
// StoreBatch will store a batch of values with their Blake3 hash as the key.
//
// It proxies to DHT.StoreBatch and returns:
// - successRatePct: percentage of successful node RPCs during the network store
// - requests: total node RPCs attempted for the batch
// - error: error if persistence or network store did not meet minimum success criteria
func (s *p2p) StoreBatch(ctx context.Context, data [][]byte, typ int, taskID string) (float64, int, error) {

if !s.running {
return errors.New("p2p service is not running")
return 0, 0, errors.New("p2p service is not running")
}

return s.dht.StoreBatch(ctx, data, typ, taskID)
Expand Down
12 changes: 7 additions & 5 deletions supernode/services/cascade/adaptors/mocks/p2p_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading