diff --git a/p2p/client.go b/p2p/client.go index 50dbd12c..6eb169b2 100644 --- a/p2p/client.go +++ b/p2p/client.go @@ -22,8 +22,17 @@ type Client interface { // - the base58 encoded identifier will be returned Store(ctx context.Context, data []byte, typ int) (string, error) - // StoreBatch will store a batch of values with their Blake3 hash as the key - StoreBatch(ctx context.Context, values [][]byte, typ int, taskID string) error + // StoreBatch will store a batch of values with their Blake3 hash as the key. + // + // Semantics: + // - Returns `successRatePct` as a percentage (0–100) computed as + // successful node RPCs divided by total node RPCs attempted during the + // network store phase for this batch. + // - Returns `requests` as the total number of node RPCs attempted for this + // batch (not the number of items in `values`). + // - On error, `successRatePct` and `requests` may reflect partial progress; + // prefer using them only when err == nil, or treat as best‑effort metrics. + StoreBatch(ctx context.Context, values [][]byte, typ int, taskID string) (float64, int, error) // Delete a key, value Delete(ctx context.Context, key string) error diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go index a232beae..455da275 100644 --- a/p2p/kademlia/dht.go +++ b/p2p/kademlia/dht.go @@ -339,23 +339,32 @@ func (s *DHT) Store(ctx context.Context, data []byte, typ int) (string, error) { return retKey, nil } -// StoreBatch will store a batch of values with their Blake3 hash as the key -func (s *DHT) StoreBatch(ctx context.Context, values [][]byte, typ int, taskID string) error { +// StoreBatch will store a batch of values with their Blake3 hash as the key. +// +// Returns: +// - successRatePct: percentage (0–100) of successful node RPCs during the +// network store phase for this batch. +// - requestCount: total number of node RPCs attempted (batch store calls) for +// this batch; this is not the number of values stored. 
+// - error: wrapped error if local DB store failed, or if the network store did +// not reach the configured minimum success rate. +func (s *DHT) StoreBatch(ctx context.Context, values [][]byte, typ int, taskID string) (float64, int, error) { logtrace.Info(ctx, "Store DB batch begin", logtrace.Fields{ logtrace.FieldModule: "dht", logtrace.FieldTaskID: taskID, "records": len(values), }) if err := s.store.StoreBatch(ctx, values, typ, true); err != nil { - return fmt.Errorf("store batch: %v", err) + return 0, 0, fmt.Errorf("store batch: %v", err) } logtrace.Info(ctx, "Store DB batch done, store network batch begin", logtrace.Fields{ logtrace.FieldModule: "dht", logtrace.FieldTaskID: taskID, }) - if err := s.IterateBatchStore(ctx, values, typ, taskID); err != nil { - return fmt.Errorf("iterate batch store: %v", err) + rate, requests, err := s.IterateBatchStore(ctx, values, typ, taskID) + if err != nil { + return rate, requests, fmt.Errorf("iterate batch store: %v", err) } logtrace.Info(ctx, "Store network batch workers done", logtrace.Fields{ @@ -363,7 +372,7 @@ func (s *DHT) StoreBatch(ctx context.Context, values [][]byte, typ int, taskID s logtrace.FieldTaskID: taskID, }) - return nil + return rate, requests, nil } // Retrieve data from the networking using key. Key is the base58 encoded @@ -1568,7 +1577,13 @@ func (s *DHT) addKnownNodes(ctx context.Context, nodes []*Node, knownNodes map[s } } -func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, id string) error { +// IterateBatchStore performs the network store and returns (successRatePct, requestCount, error). +// +// Request count is computed as the number of per-node batch store RPCs attempted +// during this run; success rate is successful responses divided by this count. +// If the success rate is below `minimumDataStoreSuccessRate`, an error is +// returned alongside the measured rate and request count. 
+func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, id string) (float64, int, error) { globalClosestContacts := make(map[string]*NodeList) knownNodes := make(map[string]*Node) // contacted := make(map[string]bool) @@ -1646,18 +1661,18 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i "task_id": id, "success_rate": fmt.Sprintf("%.2f%%", successRate), }) - return nil + return successRate, requests, nil } else { logtrace.Info(ctx, "Failed to achieve desired success rate", logtrace.Fields{ logtrace.FieldModule: "dht", "task_id": id, "success_rate": fmt.Sprintf("%.2f%%", successRate), }) - return fmt.Errorf("failed to achieve desired success rate, only: %.2f%% successful", successRate) + return successRate, requests, fmt.Errorf("failed to achieve desired success rate, only: %.2f%% successful", successRate) } } - return fmt.Errorf("no store operations were performed") + return 0, 0, fmt.Errorf("no store operations were performed") } func (s *DHT) batchStoreNetwork(ctx context.Context, values [][]byte, nodes map[string]*Node, storageMap map[string][]int, typ int) chan *MessageWithError { diff --git a/p2p/kademlia/rq_symbols.go b/p2p/kademlia/rq_symbols.go index fbf6563d..0b530f98 100644 --- a/p2p/kademlia/rq_symbols.go +++ b/p2p/kademlia/rq_symbols.go @@ -96,7 +96,9 @@ func (s *DHT) storeSymbolsInP2P(ctx context.Context, dir string, keys []string) return fmt.Errorf("load symbols: %w", err) } - if err := s.StoreBatch(ctx, loaded, 1, dir); err != nil { + // Intentionally ignore (ratePct, requests) here; a non-nil error will already + // reflect whether the network store met the configured success threshold. 
+ if _, _, err := s.StoreBatch(ctx, loaded, 1, dir); err != nil { return fmt.Errorf("p2p store batch: %w", err) } diff --git a/p2p/mocks/Client.go b/p2p/mocks/Client.go index b79f6a60..6d092c92 100644 --- a/p2p/mocks/Client.go +++ b/p2p/mocks/Client.go @@ -245,17 +245,35 @@ func (_m *Client) Store(ctx context.Context, data []byte, typ int) (string, erro } // StoreBatch provides a mock function with given fields: ctx, values, typ, taskID -func (_m *Client) StoreBatch(ctx context.Context, values [][]byte, typ int, taskID string) error { +func (_m *Client) StoreBatch(ctx context.Context, values [][]byte, typ int, taskID string) (float64, int, error) { ret := _m.Called(ctx, values, typ, taskID) - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, [][]byte, int, string) error); ok { + var r0 float64 + if rf, ok := ret.Get(0).(func(context.Context, [][]byte, int, string) float64); ok { r0 = rf(ctx, values, typ, taskID) } else { - r0 = ret.Error(0) + if ret.Get(0) != nil { + r0 = ret.Get(0).(float64) + } } - return r0 + var r1 int + if rf, ok := ret.Get(1).(func(context.Context, [][]byte, int, string) int); ok { + r1 = rf(ctx, values, typ, taskID) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(int) + } + } + + var r2 error + if rf, ok := ret.Get(2).(func(context.Context, [][]byte, int, string) error); ok { + r2 = rf(ctx, values, typ, taskID) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 } type mockConstructorTestingTNewClient interface { diff --git a/p2p/p2p.go b/p2p/p2p.go index cb48bf99..43f8bf22 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -117,11 +117,16 @@ func (s *p2p) Store(ctx context.Context, data []byte, typ int) (string, error) { return s.dht.Store(ctx, data, typ) } -// StoreBatch will store a batch of values with their Blake3 hash as the key -func (s *p2p) StoreBatch(ctx context.Context, data [][]byte, typ int, taskID string) error { +// StoreBatch will store a batch of values with their Blake3 hash as the key. 
+// +// It proxies to DHT.StoreBatch and returns: +// - successRatePct: percentage of successful node RPCs during the network store +// - requests: total node RPCs attempted for the batch +// - error: error if persistence or network store did not meet minimum success criteria +func (s *p2p) StoreBatch(ctx context.Context, data [][]byte, typ int, taskID string) (float64, int, error) { if !s.running { - return errors.New("p2p service is not running") + return 0, 0, errors.New("p2p service is not running") } return s.dht.StoreBatch(ctx, data, typ, taskID) diff --git a/supernode/services/cascade/adaptors/mocks/p2p_mock.go b/supernode/services/cascade/adaptors/mocks/p2p_mock.go index 4f62a440..ad6c297e 100644 --- a/supernode/services/cascade/adaptors/mocks/p2p_mock.go +++ b/supernode/services/cascade/adaptors/mocks/p2p_mock.go @@ -43,11 +43,13 @@ func (m *MockP2PService) EXPECT() *MockP2PServiceMockRecorder { } // StoreArtefacts mocks base method. -func (m *MockP2PService) StoreArtefacts(ctx context.Context, req adaptors.StoreArtefactsRequest, f logtrace.Fields) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "StoreArtefacts", ctx, req, f) - ret0, _ := ret[0].(error) - return ret0 +func (m *MockP2PService) StoreArtefacts(ctx context.Context, req adaptors.StoreArtefactsRequest, f logtrace.Fields) (float64, int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StoreArtefacts", ctx, req, f) + ret0, _ := ret[0].(float64) + ret1, _ := ret[1].(int) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 } // StoreArtefacts indicates an expected call of StoreArtefacts. 
diff --git a/supernode/services/cascade/adaptors/p2p.go b/supernode/services/cascade/adaptors/p2p.go index 4e85958b..9d1e2779 100644 --- a/supernode/services/cascade/adaptors/p2p.go +++ b/supernode/services/cascade/adaptors/p2p.go @@ -28,7 +28,15 @@ const ( // //go:generate mockgen -destination=mocks/p2p_mock.go -package=cascadeadaptormocks -source=p2p.go type P2PService interface { - StoreArtefacts(ctx context.Context, req StoreArtefactsRequest, f logtrace.Fields) error + // StoreArtefacts stores ID files and RaptorQ symbols, returning an aggregated + // network success rate percentage across all store batches. + // + // Aggregation model: + // - Each underlying StoreBatch returns (ratePct, requests) where requests is + // the number of node RPCs. The aggregated rate is currently a weighted + // average by item count (ID files + symbols); the summed request count is + // returned alongside, with request-weighted aggregation planned (see TODO). + StoreArtefacts(ctx context.Context, req StoreArtefactsRequest, f logtrace.Fields) (float64, int, error) } // p2pImpl is the default implementation of the P2PService interface. 
@@ -49,41 +57,67 @@ type StoreArtefactsRequest struct { SymbolsDir string } -func (p *p2pImpl) StoreArtefacts(ctx context.Context, req StoreArtefactsRequest, f logtrace.Fields) error { - logtrace.Info(ctx, "About to store ID files", logtrace.Fields{"taskID": req.TaskID, "fileCount": len(req.IDFiles)}) - - if err := p.storeCascadeMetadata(ctx, req.IDFiles, req.TaskID); err != nil { - return errors.Wrap(err, "failed to store ID files") - } - logtrace.Info(ctx, "id files have been stored", f) - - if err := p.storeCascadeSymbols(ctx, req.TaskID, req.ActionID, req.SymbolsDir); err != nil { - return errors.Wrap(err, "error storing raptor-q symbols") - } - logtrace.Info(ctx, "raptor-q symbols have been stored", f) - - return nil +func (p *p2pImpl) StoreArtefacts(ctx context.Context, req StoreArtefactsRequest, f logtrace.Fields) (float64, int, error) { + logtrace.Info(ctx, "About to store ID files", logtrace.Fields{"taskID": req.TaskID, "fileCount": len(req.IDFiles)}) + + metaRate, metaReqs, err := p.storeCascadeMetadata(ctx, req.IDFiles, req.TaskID) + if err != nil { + return 0, 0, errors.Wrap(err, "failed to store ID files") + } + logtrace.Info(ctx, "id files have been stored", f) + + // NOTE: For now we aggregate by item count (ID files + symbol count). + // TODO(move-to-request-weighted): Switch aggregation to request-weighted once + // external consumers and metrics expectations are updated. We already return + // totalRequests so the event/logs can include accurate request counts. + symRate, symCount, symReqs, err := p.storeCascadeSymbols(ctx, req.TaskID, req.ActionID, req.SymbolsDir) + if err != nil { + return 0, 0, errors.Wrap(err, "error storing raptor-q symbols") + } + logtrace.Info(ctx, "raptor-q symbols have been stored", f) + + // Aggregate: weight by item counts (ID files + symbols) for now. 
+ totalItems := len(req.IDFiles) + symCount + aggRate := 0.0 + if totalItems > 0 { + aggRate = ((metaRate * float64(len(req.IDFiles))) + (symRate * float64(symCount))) / float64(totalItems) + } + totalRequests := metaReqs + symReqs + return aggRate, totalRequests, nil } -func (p *p2pImpl) storeCascadeMetadata(ctx context.Context, metadataFiles [][]byte, taskID string) error { +// storeCascadeMetadata stores cascade metadata (ID files) via P2P. +// Returns (ratePct, requests, error) as reported by the P2P client. +func (p *p2pImpl) storeCascadeMetadata(ctx context.Context, metadataFiles [][]byte, taskID string) (float64, int, error) { logtrace.Info(ctx, "Storing cascade metadata", logtrace.Fields{ "taskID": taskID, "fileCount": len(metadataFiles), }) - return p.p2p.StoreBatch(ctx, metadataFiles, storage.P2PDataCascadeMetadata, taskID) + rate, reqs, err := p.p2p.StoreBatch(ctx, metadataFiles, storage.P2PDataCascadeMetadata, taskID) + if err != nil { + return rate, reqs, err + } + return rate, reqs, nil } -func (p *p2pImpl) storeCascadeSymbols(ctx context.Context, taskID, actionID string, symbolsDir string) error { +// storeCascadeSymbols loads symbols from `symbolsDir`, optionally downsamples, +// streams them in fixed-size batches to the P2P layer, and tracks: +// - an item-weighted aggregate success rate across all batches +// - the total number of symbols processed (item count) +// - the total number of node requests attempted across batches +// +// Returns (aggRate, totalSymbols, totalRequests, err). 
+func (p *p2pImpl) storeCascadeSymbols(ctx context.Context, taskID, actionID string, symbolsDir string) (float64, int, int, error) { /* record directory in DB */ if err := p.rqStore.StoreSymbolDirectory(taskID, symbolsDir); err != nil { - return fmt.Errorf("store symbol dir: %w", err) + return 0, 0, 0, fmt.Errorf("store symbol dir: %w", err) } /* gather every symbol path under symbolsDir ------------------------- */ keys, err := walkSymbolTree(symbolsDir) if err != nil { - return err + return 0, 0, 0, err } /* down-sample if we exceed the “big directory” threshold ------------- */ @@ -98,27 +132,39 @@ func (p *p2pImpl) storeCascadeSymbols(ctx context.Context, taskID, actionID stri logtrace.Info(ctx, "storing RaptorQ symbols", logtrace.Fields{"count": len(keys)}) - /* stream in fixed-size batches -------------------------------------- */ - for start := 0; start < len(keys); { - end := start + loadSymbolsBatchSize - if end > len(keys) { - end = len(keys) - } - if err := p.storeSymbolsInP2P(ctx, taskID, symbolsDir, keys[start:end]); err != nil { - return err - } - start = end - } - - if err := p.rqStore.UpdateIsFirstBatchStored(actionID); err != nil { - return fmt.Errorf("update first-batch flag: %w", err) - } - logtrace.Info(ctx, "finished storing RaptorQ symbols", logtrace.Fields{ - "curr-time": time.Now().UTC(), - "count": len(keys), - }) - - return nil + /* stream in fixed-size batches -------------------------------------- */ + sumWeightedRates := 0.0 + totalSymbols := 0 + totalRequests := 0 + for start := 0; start < len(keys); { + end := start + loadSymbolsBatchSize + if end > len(keys) { + end = len(keys) + } + batch := keys[start:end] + rate, requests, count, err := p.storeSymbolsInP2P(ctx, taskID, symbolsDir, batch) + if err != nil { + return rate, totalSymbols, totalRequests, err + } + sumWeightedRates += rate * float64(count) + totalSymbols += count + totalRequests += requests + start = end + } + + if err := p.rqStore.UpdateIsFirstBatchStored(actionID); err 
!= nil { + return 0, totalSymbols, totalRequests, fmt.Errorf("update first-batch flag: %w", err) + } + logtrace.Info(ctx, "finished storing RaptorQ symbols", logtrace.Fields{ + "curr-time": time.Now().UTC(), + "count": len(keys), + }) + + aggRate := 0.0 + if totalSymbols > 0 { + aggRate = sumWeightedRates / float64(totalSymbols) + } + return aggRate, totalSymbols, totalRequests, nil } func walkSymbolTree(root string) ([]string, error) { @@ -147,26 +193,29 @@ func walkSymbolTree(root string) ([]string, error) { return keys, nil } -func (c *p2pImpl) storeSymbolsInP2P(ctx context.Context, taskID, root string, fileKeys []string) error { +// storeSymbolsInP2P loads a batch of symbols and stores them via P2P. +// Returns (ratePct, requests, count, error) where `count` is the number of symbols in this batch. +func (c *p2pImpl) storeSymbolsInP2P(ctx context.Context, taskID, root string, fileKeys []string) (float64, int, int, error) { logtrace.Info(ctx, "loading batch symbols", logtrace.Fields{"count": len(fileKeys)}) - symbols, err := utils.LoadSymbols(root, fileKeys) - if err != nil { - return fmt.Errorf("load symbols: %w", err) - } + symbols, err := utils.LoadSymbols(root, fileKeys) + if err != nil { + return 0, 0, 0, fmt.Errorf("load symbols: %w", err) + } symCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) defer cancel() - if err := c.p2p.StoreBatch(symCtx, symbols, storage.P2PDataRaptorQSymbol, taskID); err != nil { - return fmt.Errorf("p2p store batch: %w", err) - } - logtrace.Info(ctx, "stored batch symbols", logtrace.Fields{"count": len(symbols)}) + rate, requests, err := c.p2p.StoreBatch(symCtx, symbols, storage.P2PDataRaptorQSymbol, taskID) + if err != nil { + return rate, requests, len(symbols), fmt.Errorf("p2p store batch: %w", err) + } + logtrace.Info(ctx, "stored batch symbols", logtrace.Fields{"count": len(symbols)}) - if err := utils.DeleteSymbols(ctx, root, fileKeys); err != nil { - return fmt.Errorf("delete symbols: %w", err) - } - 
logtrace.Info(ctx, "deleted batch symbols", logtrace.Fields{"count": len(symbols)}) + if err := utils.DeleteSymbols(ctx, root, fileKeys); err != nil { + return rate, requests, len(symbols), fmt.Errorf("delete symbols: %w", err) + } + logtrace.Info(ctx, "deleted batch symbols", logtrace.Fields{"count": len(symbols)}) - return nil + return rate, requests, len(symbols), nil } diff --git a/supernode/services/cascade/helper.go b/supernode/services/cascade/helper.go index 0c4ef7d3..42ef0055 100644 --- a/supernode/services/cascade/helper.go +++ b/supernode/services/cascade/helper.go @@ -169,13 +169,21 @@ func (task *CascadeRegistrationTask) generateRQIDFiles(ctx context.Context, meta }, nil } -func (task *CascadeRegistrationTask) storeArtefacts(ctx context.Context, actionID string, idFiles [][]byte, symbolsDir string, f logtrace.Fields) error { - return task.P2P.StoreArtefacts(ctx, adaptors.StoreArtefactsRequest{ - IDFiles: idFiles, - SymbolsDir: symbolsDir, - TaskID: task.ID(), - ActionID: actionID, - }, f) +// storeArtefacts persists cascade artefacts (ID files + RaptorQ symbols) via the +// P2P adaptor and returns an aggregated network success rate percentage and total +// node requests used to compute it. +// +// Aggregation details: +// - Underlying batches return (ratePct, requests) where `requests` is the number +// of node RPCs attempted. The adaptor currently weights the aggregate rate +// by item count (ID files + symbols) and sums request counts for reporting. 
+func (task *CascadeRegistrationTask) storeArtefacts(ctx context.Context, actionID string, idFiles [][]byte, symbolsDir string, f logtrace.Fields) (float64, int, error) { + return task.P2P.StoreArtefacts(ctx, adaptors.StoreArtefactsRequest{ + IDFiles: idFiles, + SymbolsDir: symbolsDir, + TaskID: task.ID(), + ActionID: actionID, + }, f) } func (task *CascadeRegistrationTask) wrapErr(ctx context.Context, msg string, err error, f logtrace.Fields) error { diff --git a/supernode/services/cascade/register.go b/supernode/services/cascade/register.go index b3b30346..4b5d62a2 100644 --- a/supernode/services/cascade/register.go +++ b/supernode/services/cascade/register.go @@ -1,8 +1,10 @@ package cascade import ( - "context" - "os" + "context" + "fmt" + "math" + "os" "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" "github.com/LumeraProtocol/supernode/v2/supernode/services/common" @@ -154,12 +156,29 @@ func (task *CascadeRegistrationTask) Register( // Transmit as a standard event so SDK can propagate it (dedicated type) task.streamEvent(SupernodeEventTypeFinalizeSimulated, "finalize action simulation passed", "", send) - /* 11. Persist artefacts ------------------------------------------------------- */ - if err := task.storeArtefacts(ctx, action.ActionID, rqidResp.RedundantMetadataFiles, encResp.SymbolsDir, fields); err != nil { + /* 11. Persist artefacts -------------------------------------------------------- */ + // Persist artefacts to the P2P network. The returned `successRate` is a + // percentage (0–100) aggregated across all underlying P2P store batches + // for this action. Aggregation is currently item-weighted (ID files + + // symbols); `totalRequests` counts node RPCs attempted, and a switch to + // request-weighted aggregation is planned (see adaptor TODO). 
+ successRate, totalRequests, err := task.storeArtefacts(ctx, action.ActionID, rqidResp.RedundantMetadataFiles, encResp.SymbolsDir, fields) + if err != nil { return err } + // Attach the success rate to structured fields for observability. This value + // is best-effort and non-fatal so long as it meets the configured minimum in + // lower layers; failures below threshold would already propagate an error. + fields["success_rate"] = successRate + fields["requests"] = totalRequests logtrace.Info(ctx, "artefacts have been stored", fields) - task.streamEvent(SupernodeEventTypeArtefactsStored, "artefacts have been stored", "", send) + // Emit compact, rich metrics in the event message for external visibility. + // ok and fail are derived counts based on the measured rate and requests. + // TODO(move-to-request-weighted): Once aggregation switches to request-weighted, + // these derived counts will align exactly with the per-request success rate. + ok := int(math.Round(successRate / 100.0 * float64(totalRequests))) + fail := totalRequests - ok + task.streamEvent(SupernodeEventTypeArtefactsStored, fmt.Sprintf("artefacts stored | rate=%.2f%% req=%d ok=%d fail=%d", successRate, totalRequests, ok, fail), "", send) resp, err := task.LumeraClient.FinalizeAction(ctx, action.ActionID, rqidResp.RQIDs) if err != nil { diff --git a/supernode/services/cascade/register_test.go b/supernode/services/cascade/register_test.go index 109941c4..0bebd232 100644 --- a/supernode/services/cascade/register_test.go +++ b/supernode/services/cascade/register_test.go @@ -104,10 +104,10 @@ func TestCascadeRegistrationTask_Register(t *testing.T) { Metadata: codecpkg.Layout{Blocks: []codecpkg.Block{{BlockID: 1, Hash: "abc"}}}, }, nil) - // 8. Store artefacts + // 8. Store artefacts (returns success rate, requests) p2p.EXPECT(). StoreArtefacts(gomock.Any(), gomock.Any(), gomock.Any()). 
- Return(nil) + Return(95.0, 120, nil) }, expectedError: "", expectedEvents: 12, diff --git a/supernode/services/common/storage/handler.go b/supernode/services/common/storage/handler.go index 2f7a6a28..ae80615b 100644 --- a/supernode/services/common/storage/handler.go +++ b/supernode/services/common/storage/handler.go @@ -65,7 +65,11 @@ func (h *StorageHandler) StoreBytesIntoP2P(ctx context.Context, data []byte, typ return h.P2PClient.Store(ctx, data, typ) } -// StoreBatch stores into P2P array of bytes arrays +// StoreBatch stores into P2P an array of byte slices. +// +// Note: The underlying P2P client returns (successRatePct, requests, err). +// This handler intentionally ignores the metrics and only propagates error, +// as callers of this common storage path historically consumed only errors. func (h *StorageHandler) StoreBatch(ctx context.Context, list [][]byte, typ int) error { val := ctx.Value(logtrace.CorrelationIDKey) taskID := "" @@ -75,7 +79,8 @@ func (h *StorageHandler) StoreBatch(ctx context.Context, list [][]byte, typ int) logtrace.Info(ctx, "task_id in storeList", logtrace.Fields{logtrace.FieldTaskID: taskID}) - return h.P2PClient.StoreBatch(ctx, list, typ, taskID) + _, _, err := h.P2PClient.StoreBatch(ctx, list, typ, taskID) + return err } // StoreRaptorQSymbolsIntoP2P stores RaptorQ symbols into P2P @@ -83,6 +88,9 @@ func (h *StorageHandler) StoreBatch(ctx context.Context, list [][]byte, typ int) // under the specified directory. If the number of keys exceeds a certain threshold, // it randomly samples a percentage of them. Finally, it streams the symbols in // fixed-size batches to the P2P network. +// +// Note: P2P client returns (ratePct, requests, err) for each batch; we ignore +// the metrics here and only validate error semantics. 
func (h *StorageHandler) StoreRaptorQSymbolsIntoP2P(ctx context.Context, taskID, symbolsDir string) error { /* record directory in DB */ if err := h.store.StoreSymbolDirectory(taskID, symbolsDir); err != nil { @@ -162,7 +170,7 @@ func (h *StorageHandler) storeSymbolsInP2P(ctx context.Context, taskID, root str return fmt.Errorf("load symbols: %w", err) } - if err := h.P2PClient.StoreBatch(ctx, symbols, P2PDataRaptorQSymbol, taskID); err != nil { + if _, _, err := h.P2PClient.StoreBatch(ctx, symbols, P2PDataRaptorQSymbol, taskID); err != nil { return fmt.Errorf("p2p store batch: %w", err) } diff --git a/supernode/services/common/storage/handler_test.go b/supernode/services/common/storage/handler_test.go index d418d713..e1be29f1 100644 --- a/supernode/services/common/storage/handler_test.go +++ b/supernode/services/common/storage/handler_test.go @@ -49,7 +49,7 @@ func TestStoreBatch(t *testing.T) { ctx := context.WithValue(context.Background(), "task_id", "123") list := [][]byte{[]byte("a"), []byte("b")} - p2pClient.On("StoreBatch", mock.Anything, list, 3, "").Return(nil) + p2pClient.On("StoreBatch", mock.Anything, list, 3, "").Return(0.0, 0, nil) err := handler.StoreBatch(ctx, list, 3) assert.NoError(t, err) diff --git a/tests/integration/p2p/p2p_integration_test.go b/tests/integration/p2p/p2p_integration_test.go index c4b8e1bd..a689b75d 100644 --- a/tests/integration/p2p/p2p_integration_test.go +++ b/tests/integration/p2p/p2p_integration_test.go @@ -108,7 +108,7 @@ func TestP2PBasicIntegration(t *testing.T) { // Add debug logging log.Printf("Storing batch with keys: %v", expectedKeys) - err := services[0].StoreBatch(ctx, batchData, 0, taskID) + _, _, err := services[0].StoreBatch(ctx, batchData, 0, taskID) require.NoError(t, err) // Add immediate verification