Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 86 additions & 63 deletions supernode/services/cascade/adaptors/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const (
loadSymbolsBatchSize = 2500
// Minimum first-pass coverage to store before returning from Register (percent)
storeSymbolsPercent = 18

storeBatchContextTimeout = 3 * time.Minute
)

// P2PService defines the interface for storing data in the P2P layer.
Expand Down Expand Up @@ -79,66 +81,31 @@ type StoreArtefactsMetrics struct {

func (p *p2pImpl) StoreArtefacts(ctx context.Context, req StoreArtefactsRequest, f logtrace.Fields) (StoreArtefactsMetrics, error) {
logtrace.Info(ctx, "About to store ID files", logtrace.Fields{"taskID": req.TaskID, "fileCount": len(req.IDFiles)})

metaRate, metaReqs, err := p.storeCascadeMetadata(ctx, req.IDFiles, req.TaskID)
if err != nil {
return StoreArtefactsMetrics{}, errors.Wrap(err, "failed to store ID files")
}
logtrace.Info(ctx, "id files have been stored", f)

// NOTE: For now we aggregate by item count (ID files + symbol count).
// TODO(move-to-request-weighted): Switch aggregation to request-weighted once
// external consumers and metrics expectations are updated. We already return
// totalRequests so the event/logs can include accurate request counts.
symRate, symCount, symReqs, err := p.storeCascadeSymbols(ctx, req.TaskID, req.ActionID, req.SymbolsDir)
symRate, symCount, symReqs, err := p.storeCascadeSymbolsAndData(ctx, req.TaskID, req.ActionID, req.SymbolsDir, req.IDFiles)
if err != nil {
return StoreArtefactsMetrics{}, errors.Wrap(err, "error storing raptor-q symbols")
}
logtrace.Info(ctx, "raptor-q symbols have been stored", f)

// Aggregate: weight by item counts (ID files + symbols) for now.
metaCount := len(req.IDFiles)
totalItems := metaCount + symCount
aggRate := 0.0
if totalItems > 0 {
aggRate = ((metaRate * float64(metaCount)) + (symRate * float64(symCount))) / float64(totalItems)
}
totalRequests := metaReqs + symReqs
return StoreArtefactsMetrics{
MetaRate: metaRate,
MetaRequests: metaReqs,
MetaCount: metaCount,
SymRate: symRate,
SymRequests: symReqs,
SymCount: symCount,
AggregatedRate: aggRate,
TotalRequests: totalRequests,
SymRate: symRate,
SymRequests: symReqs,
SymCount: symCount,
}, nil
}

// storeCascadeMetadata stores cascade metadata (ID files) via P2P.
// Returns (ratePct, requests, error) as reported by the P2P client.
func (p *p2pImpl) storeCascadeMetadata(ctx context.Context, metadataFiles [][]byte, taskID string) (float64, int, error) {
logtrace.Info(ctx, "Storing cascade metadata", logtrace.Fields{
"taskID": taskID,
"fileCount": len(metadataFiles),
})

rate, reqs, err := p.p2p.StoreBatch(ctx, metadataFiles, storage.P2PDataCascadeMetadata, taskID)
if err != nil {
return rate, reqs, err
}
return rate, reqs, nil
}

// storeCascadeSymbolsAndData loads symbols from `symbolsDir`, optionally downsamples,
// streams them in fixed-size batches to the P2P layer — packing the metadata ID files
// into the first batch alongside as many symbols as fit — and tracks:
// - an item-weighted aggregate success rate across all batches (metadata + symbols)
// - the total number of symbols processed (symbols only, excluding metadata)
// - the total number of node requests attempted across batches
//
// Returns (aggRate, totalSymbols, totalRequests, err).
func (p *p2pImpl) storeCascadeSymbols(ctx context.Context, taskID, actionID string, symbolsDir string) (float64, int, int, error) {
func (p *p2pImpl) storeCascadeSymbolsAndData(ctx context.Context, taskID, actionID string, symbolsDir string, metadataFiles [][]byte) (float64, int, int, error) {
/* record directory in DB */
if err := p.rqStore.StoreSymbolDirectory(taskID, symbolsDir); err != nil {
return 0, 0, 0, fmt.Errorf("store symbol dir: %w", err)
Expand Down Expand Up @@ -174,48 +141,104 @@ func (p *p2pImpl) storeCascadeSymbols(ctx context.Context, taskID, actionID stri
logtrace.Info(ctx, "storing RaptorQ symbols", logtrace.Fields{"count": len(keys)})

/* stream in fixed-size batches -------------------------------------- */

sumWeightedRates := 0.0
totalSymbols := 0
totalSymbols := 0 // symbols only
totalItems := 0 // symbols + metadata (for rate weighting)
totalRequests := 0
firstBatchProcessed := false

for start := 0; start < len(keys); {
end := start + loadSymbolsBatchSize
if end > len(keys) {
end = len(keys)
}
end := min(start+loadSymbolsBatchSize, len(keys))
batch := keys[start:end]
rate, requests, count, err := p.storeSymbolsInP2P(ctx, taskID, symbolsDir, batch)
if err != nil {
return rate, totalSymbols, totalRequests, err

if !firstBatchProcessed && len(metadataFiles) > 0 {
// The first batch must carry the metadata files plus as many symbols as fit
// under the batch size. If len(metadataFiles) >= the batch size, this batch
// carries only metadata and symbols begin in the next batch.
roomForSymbols := loadSymbolsBatchSize - len(metadataFiles)
if roomForSymbols < 0 {
roomForSymbols = 0
}
if roomForSymbols < len(batch) {
// trim the first symbol chunk to leave space for metadata
batch = batch[:roomForSymbols]
end = start + roomForSymbols
}

// Load just this symbol chunk
symBytes, err := utils.LoadSymbols(symbolsDir, batch)
if err != nil {
return 0, totalSymbols, totalRequests, fmt.Errorf("load symbols: %w", err)
}

// Build combined payload: metadata first, then symbols
payload := make([][]byte, 0, len(metadataFiles)+len(symBytes))
payload = append(payload, metadataFiles...)
payload = append(payload, symBytes...)

// Store the combined payload under the symbol data type used for all symbol batches
bctx, cancel := context.WithTimeout(ctx, storeBatchContextTimeout)
rate, reqs, err := p.p2p.StoreBatch(bctx, payload, storage.P2PDataRaptorQSymbol, taskID)
cancel()
if err != nil {
agg := 0.0
if totalItems > 0 {
agg = sumWeightedRates / float64(totalItems)
}
return agg, totalSymbols, totalRequests + reqs, fmt.Errorf("p2p store batch (first): %w", err)
}

// Metrics
items := len(payload) // meta + symbols
sumWeightedRates += rate * float64(items)
totalItems += items
totalSymbols += len(symBytes)
totalRequests += reqs

// Delete only the symbols we uploaded
if len(batch) > 0 {
if err := utils.DeleteSymbols(ctx, symbolsDir, batch); err != nil {
return rate, totalSymbols, totalRequests, fmt.Errorf("delete symbols: %w", err)
}
}

firstBatchProcessed = true
} else {
rate, requests, count, err := p.storeSymbolsInP2P(ctx, taskID, symbolsDir, batch)
if err != nil {
agg := 0.0
if totalItems > 0 {
agg = sumWeightedRates / float64(totalItems)
}
return agg, totalSymbols, totalRequests, err
}
sumWeightedRates += rate * float64(count)
totalItems += count
totalSymbols += count
totalRequests += requests
}
sumWeightedRates += rate * float64(count)
totalSymbols += count
totalRequests += requests

start = end
}

// Coverage uses symbols only
achievedPct := 0.0
if totalAvailable > 0 {
achievedPct = (float64(totalSymbols) / float64(totalAvailable)) * 100.0
}
logtrace.Info(ctx, "first-pass achieved coverage (symbols)", logtrace.Fields{
"achieved_symbols": totalSymbols,
"achieved_percent": achievedPct,
"total_requests": totalRequests,
})
logtrace.Info(ctx, "first-pass achieved coverage (symbols)",
logtrace.Fields{"achieved_symbols": totalSymbols, "achieved_percent": achievedPct, "total_requests": totalRequests})

if err := p.rqStore.UpdateIsFirstBatchStored(actionID); err != nil {
return 0, totalSymbols, totalRequests, fmt.Errorf("update first-batch flag: %w", err)
}
logtrace.Info(ctx, "finished storing RaptorQ symbols", logtrace.Fields{
"curr-time": time.Now().UTC(),
"count": len(keys),
})

aggRate := 0.0
if totalSymbols > 0 {
aggRate = sumWeightedRates / float64(totalSymbols)
if totalItems > 0 {
aggRate = sumWeightedRates / float64(totalItems)
}
return aggRate, totalSymbols, totalRequests, nil

}

func walkSymbolTree(root string) ([]string, error) {
Expand Down Expand Up @@ -254,7 +277,7 @@ func (c *p2pImpl) storeSymbolsInP2P(ctx context.Context, taskID, root string, fi
return 0, 0, 0, fmt.Errorf("load symbols: %w", err)
}

symCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
symCtx, cancel := context.WithTimeout(ctx, storeBatchContextTimeout)
defer cancel()

rate, requests, err := c.p2p.StoreBatch(symCtx, symbols, storage.P2PDataRaptorQSymbol, taskID)
Expand Down