Skip to content

Commit b377c70

Browse files
Codeq settings
1 parent cbdcdf9 commit b377c70

File tree

3 files changed

+26
-3
lines changed

3 files changed

+26
-3
lines changed

pkg/codec/decode.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,28 @@ func (rq *raptorQ) Decode(ctx context.Context, req DecodeRequest) (DecodeRespons
3939
defer processor.Free()
4040

4141
symbolsDir := filepath.Join(rq.symbolsBaseDir, req.ActionID)
42+
// Ensure a clean scratch directory (avoid contamination from previous attempts)
43+
if err := os.RemoveAll(symbolsDir); err != nil {
44+
fields[logtrace.FieldError] = err.Error()
45+
return DecodeResponse{}, fmt.Errorf("cleanup decode dir %s: %w", symbolsDir, err)
46+
}
4247
if err := os.MkdirAll(symbolsDir, 0o755); err != nil {
4348
fields[logtrace.FieldError] = err.Error()
4449
return DecodeResponse{}, fmt.Errorf("mkdir %s: %w", symbolsDir, err)
4550
}
4651

52+
// Validate layout before writing any symbols
53+
if len(req.Layout.Blocks) == 0 {
54+
fields[logtrace.FieldError] = "empty layout"
55+
return DecodeResponse{}, fmt.Errorf("invalid layout: no blocks present")
56+
}
57+
for _, blk := range req.Layout.Blocks {
58+
if len(blk.Symbols) == 0 {
59+
fields[logtrace.FieldError] = fmt.Sprintf("block_%d has no symbols", blk.BlockID)
60+
return DecodeResponse{}, fmt.Errorf("invalid layout: block %d has no symbols", blk.BlockID)
61+
}
62+
}
63+
4764
// Build symbol->block mapping from layout and ensure block directories exist
4865
symbolToBlock := make(map[string]int)
4966
for _, blk := range req.Layout.Blocks {

pkg/codec/raptorq.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ const (
1717
// Limit RaptorQ processor memory usage to ~2 GiB
1818
rqMaxMemoryMB uint64 = 2 * 1024 // MB
1919
// Concurrency tuned for 2 GiB limit and typical 8+ core CPUs
20-
rqConcurrency uint64 = 6
20+
rqConcurrency uint64 = 1
2121
// Target single-block output for up to 1 GiB files with padding headroom (~1.25 GiB)
2222
rqBlockSize int = 1280 * 1024 * 1024 // bytes (1,280 MiB)
2323
)

supernode/services/cascade/download.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout(
163163
if targetRequiredCount < 1 && totalSymbols > 0 {
164164
targetRequiredCount = 1
165165
}
166-
logtrace.Info(ctx, "Retrieving all symbols for decode", fields)
166+
logtrace.Info(ctx, "Retrieving target-required symbols for decode", fields)
167167

168168
// Enable retrieve metrics capture for this action
169169
cm.StartRetrieveCapture(actionID)
@@ -173,7 +173,13 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout(
173173
retrieveStart := time.Now()
174174
// Tag context with metrics task ID (actionID)
175175
ctxRetrieve := cm.WithTaskID(ctx, actionID)
176-
symbols, err := task.P2PClient.BatchRetrieve(ctxRetrieve, allSymbols, totalSymbols, actionID)
176+
// Retrieve only a fraction of symbols (targetRequiredCount) based on redundancy
177+
// The DHT will short-circuit once it finds the required number across the provided keys
178+
reqCount := targetRequiredCount
179+
if reqCount > totalSymbols {
180+
reqCount = totalSymbols
181+
}
182+
symbols, err := task.P2PClient.BatchRetrieve(ctxRetrieve, allSymbols, reqCount, actionID)
177183
if err != nil {
178184
fields[logtrace.FieldError] = err.Error()
179185
logtrace.Error(ctx, "batch retrieve failed", fields)

0 commit comments

Comments
 (0)