diff --git a/supernode/services/cascade/adaptors/p2p.go b/supernode/services/cascade/adaptors/p2p.go
index bc5697e3..78e9b1bf 100644
--- a/supernode/services/cascade/adaptors/p2p.go
+++ b/supernode/services/cascade/adaptors/p2p.go
@@ -23,6 +23,8 @@ const (
 	loadSymbolsBatchSize = 2500
 	// Minimum first-pass coverage to store before returning from Register (percent)
 	storeSymbolsPercent = 18
+
+	storeBatchContextTimeout = 3 * time.Minute
 )
 
 // P2PService defines the interface for storing data in the P2P layer.
@@ -79,58 +81,23 @@ type StoreArtefactsMetrics struct {
 
 func (p *p2pImpl) StoreArtefacts(ctx context.Context, req StoreArtefactsRequest, f logtrace.Fields) (StoreArtefactsMetrics, error) {
 	logtrace.Info(ctx, "About to store ID files", logtrace.Fields{"taskID": req.TaskID, "fileCount": len(req.IDFiles)})
-
-	metaRate, metaReqs, err := p.storeCascadeMetadata(ctx, req.IDFiles, req.TaskID)
-	if err != nil {
-		return StoreArtefactsMetrics{}, errors.Wrap(err, "failed to store ID files")
-	}
-	logtrace.Info(ctx, "id files have been stored", f)
-
 	// NOTE: For now we aggregate by item count (ID files + symbol count).
 	// TODO(move-to-request-weighted): Switch aggregation to request-weighted once
 	// external consumers and metrics expectations are updated. We already return
 	// totalRequests so the event/logs can include accurate request counts.
-	symRate, symCount, symReqs, err := p.storeCascadeSymbols(ctx, req.TaskID, req.ActionID, req.SymbolsDir)
+	symRate, symCount, symReqs, err := p.storeCascadeSymbolsAndData(ctx, req.TaskID, req.ActionID, req.SymbolsDir, req.IDFiles)
 	if err != nil {
 		return StoreArtefactsMetrics{}, errors.Wrap(err, "error storing raptor-q symbols")
 	}
 	logtrace.Info(ctx, "raptor-q symbols have been stored", f)
 
-	// Aggregate: weight by item counts (ID files + symbols) for now.
-	metaCount := len(req.IDFiles)
-	totalItems := metaCount + symCount
-	aggRate := 0.0
-	if totalItems > 0 {
-		aggRate = ((metaRate * float64(metaCount)) + (symRate * float64(symCount))) / float64(totalItems)
-	}
-	totalRequests := metaReqs + symReqs
 	return StoreArtefactsMetrics{
-		MetaRate:       metaRate,
-		MetaRequests:   metaReqs,
-		MetaCount:      metaCount,
-		SymRate:        symRate,
-		SymRequests:    symReqs,
-		SymCount:       symCount,
-		AggregatedRate: aggRate,
-		TotalRequests:  totalRequests,
+		SymRate:     symRate,
+		SymRequests: symReqs,
+		SymCount:    symCount,
 	}, nil
 }
 
-// storeCascadeMetadata stores cascade metadata (ID files) via P2P.
-// Returns (ratePct, requests, error) as reported by the P2P client.
-func (p *p2pImpl) storeCascadeMetadata(ctx context.Context, metadataFiles [][]byte, taskID string) (float64, int, error) {
-	logtrace.Info(ctx, "Storing cascade metadata", logtrace.Fields{
-		"taskID":    taskID,
-		"fileCount": len(metadataFiles),
-	})
-
-	rate, reqs, err := p.p2p.StoreBatch(ctx, metadataFiles, storage.P2PDataCascadeMetadata, taskID)
-	if err != nil {
-		return rate, reqs, err
-	}
-	return rate, reqs, nil
-}
-
 // storeCascadeSymbols loads symbols from `symbolsDir`, optionally downsamples,
 // streams them in fixed-size batches to the P2P layer, and tracks:
 //   - an item-weighted aggregate success rate across all batches
@@ -138,7 +105,7 @@ func (p *p2pImpl) storeCascadeMetadata(ctx context.Context, metadataFiles [][]by
 //   - the total number of node requests attempted across batches
 //
 // Returns (aggRate, totalSymbols, totalRequests, err).
-func (p *p2pImpl) storeCascadeSymbols(ctx context.Context, taskID, actionID string, symbolsDir string) (float64, int, int, error) {
+func (p *p2pImpl) storeCascadeSymbolsAndData(ctx context.Context, taskID, actionID string, symbolsDir string, metadataFiles [][]byte) (float64, int, int, error) {
 	/* record directory in DB */
 	if err := p.rqStore.StoreSymbolDirectory(taskID, symbolsDir); err != nil {
 		return 0, 0, 0, fmt.Errorf("store symbol dir: %w", err)
@@ -174,48 +141,104 @@ func (p *p2pImpl) storeCascadeSymbols(ctx context.Context, taskID, actionID stri
 	logtrace.Info(ctx, "storing RaptorQ symbols", logtrace.Fields{"count": len(keys)})
 
 	/* stream in fixed-size batches -------------------------------------- */
+
 	sumWeightedRates := 0.0
-	totalSymbols := 0
+	totalSymbols := 0 // symbols only
+	totalItems := 0   // symbols + metadata (for rate weighting)
 	totalRequests := 0
+	firstBatchProcessed := false
+
 	for start := 0; start < len(keys); {
-		end := start + loadSymbolsBatchSize
-		if end > len(keys) {
-			end = len(keys)
-		}
+		end := min(start+loadSymbolsBatchSize, len(keys))
 		batch := keys[start:end]
-		rate, requests, count, err := p.storeSymbolsInP2P(ctx, taskID, symbolsDir, batch)
-		if err != nil {
-			return rate, totalSymbols, totalRequests, err
+
+		if !firstBatchProcessed && len(metadataFiles) > 0 {
+			// First "batch" has to include metadata + as many symbols as fit under batch size.
+			// If metadataFiles >= batch size, we send metadata in this batch and symbols start next batch.
+			roomForSymbols := loadSymbolsBatchSize - len(metadataFiles)
+			if roomForSymbols < 0 {
+				roomForSymbols = 0
+			}
+			if roomForSymbols < len(batch) {
+				// trim the first symbol chunk to leave space for metadata
+				batch = batch[:roomForSymbols]
+				end = start + roomForSymbols
+			}
+
+			// Load just this symbol chunk
+			symBytes, err := utils.LoadSymbols(symbolsDir, batch)
+			if err != nil {
+				return 0, totalSymbols, totalRequests, fmt.Errorf("load symbols: %w", err)
+			}
+
+			// Build combined payload: metadata first, then symbols
+			payload := make([][]byte, 0, len(metadataFiles)+len(symBytes))
+			payload = append(payload, metadataFiles...)
+			payload = append(payload, symBytes...)
+
+			// Send as the same data type you use for symbols
+			bctx, cancel := context.WithTimeout(ctx, storeBatchContextTimeout)
+			rate, reqs, err := p.p2p.StoreBatch(bctx, payload, storage.P2PDataRaptorQSymbol, taskID)
+			cancel()
+			if err != nil {
+				agg := 0.0
+				if totalItems > 0 {
+					agg = sumWeightedRates / float64(totalItems)
+				}
+				return agg, totalSymbols, totalRequests + reqs, fmt.Errorf("p2p store batch (first): %w", err)
+			}
+
+			// Metrics
+			items := len(payload) // meta + symbols
+			sumWeightedRates += rate * float64(items)
+			totalItems += items
+			totalSymbols += len(symBytes)
+			totalRequests += reqs
+
+			// Delete only the symbols we uploaded
+			if len(batch) > 0 {
+				if err := utils.DeleteSymbols(ctx, symbolsDir, batch); err != nil {
+					return rate, totalSymbols, totalRequests, fmt.Errorf("delete symbols: %w", err)
+				}
+			}
+
+			firstBatchProcessed = true
+		} else {
+			rate, requests, count, err := p.storeSymbolsInP2P(ctx, taskID, symbolsDir, batch)
+			if err != nil {
+				agg := 0.0
+				if totalItems > 0 {
+					agg = sumWeightedRates / float64(totalItems)
+				}
+				return agg, totalSymbols, totalRequests, err
+			}
+			sumWeightedRates += rate * float64(count)
+			totalItems += count
+			totalSymbols += count
+			totalRequests += requests
 		}
-		sumWeightedRates += rate * float64(count)
-		totalSymbols += count
-		totalRequests += requests
+
 		start = end
 	}
 
+	// Coverage uses symbols only
 	achievedPct := 0.0
 	if totalAvailable > 0 {
 		achievedPct = (float64(totalSymbols) / float64(totalAvailable)) * 100.0
 	}
-	logtrace.Info(ctx, "first-pass achieved coverage (symbols)", logtrace.Fields{
-		"achieved_symbols": totalSymbols,
-		"achieved_percent": achievedPct,
-		"total_requests":   totalRequests,
-	})
+	logtrace.Info(ctx, "first-pass achieved coverage (symbols)",
+		logtrace.Fields{"achieved_symbols": totalSymbols, "achieved_percent": achievedPct, "total_requests": totalRequests})
 
 	if err := p.rqStore.UpdateIsFirstBatchStored(actionID); err != nil {
 		return 0, totalSymbols, totalRequests, fmt.Errorf("update first-batch flag: %w", err)
 	}
-	logtrace.Info(ctx, "finished storing RaptorQ symbols", logtrace.Fields{
-		"curr-time": time.Now().UTC(),
-		"count":     len(keys),
-	})
 
 	aggRate := 0.0
-	if totalSymbols > 0 {
-		aggRate = sumWeightedRates / float64(totalSymbols)
+	if totalItems > 0 {
+		aggRate = sumWeightedRates / float64(totalItems)
 	}
 	return aggRate, totalSymbols, totalRequests, nil
+
 }
 
 func walkSymbolTree(root string) ([]string, error) {
@@ -254,7 +277,7 @@ func (c *p2pImpl) storeSymbolsInP2P(ctx context.Context, taskID, root string, fi
 		return 0, 0, 0, fmt.Errorf("load symbols: %w", err)
 	}
 
-	symCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
+	symCtx, cancel := context.WithTimeout(ctx, storeBatchContextTimeout)
 	defer cancel()
 
 	rate, requests, err := c.p2p.StoreBatch(symCtx, symbols, storage.P2PDataRaptorQSymbol, taskID)