Skip to content

Commit 529e74d

Browse files
committed
calculate download file hash incrementally
1 parent 4ce9bec commit 529e74d

File tree

4 files changed

+200
-31
lines changed

4 files changed

+200
-31
lines changed

pkg/crypto/hash.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,40 @@
11
package crypto
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"lukechampine.com/blake3"
7+
"os"
8+
)
9+
10+
const defaultHashBufferSize = 1024 * 1024 // 1 MB
11+
12+
func HashFileIncrementally(filePath string, bufferSize int) ([]byte, error) {
13+
f, err := os.Open(filePath)
14+
if err != nil {
15+
return nil, fmt.Errorf("open decoded file: %w", err)
16+
}
17+
defer f.Close()
18+
19+
if bufferSize == 0 {
20+
bufferSize = defaultHashBufferSize
21+
}
22+
23+
hasher := blake3.New(32, nil)
24+
buf := make([]byte, bufferSize) // 4MB buffer to balance memory vs I/O
25+
26+
for {
27+
n, readErr := f.Read(buf)
28+
if n > 0 {
29+
hasher.Write(buf[:n])
30+
}
31+
if readErr == io.EOF {
32+
break
33+
}
34+
if readErr != nil {
35+
return nil, fmt.Errorf("streaming file read failed: %w", readErr)
36+
}
37+
}
38+
39+
return hasher.Sum(nil), nil
40+
}

pkg/crypto/hash_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package crypto
2+
3+
import (
4+
"encoding/hex"
5+
"os"
6+
"path/filepath"
7+
"testing"
8+
9+
"lukechampine.com/blake3"
10+
)
11+
12+
func TestHashFileIncrementally(t *testing.T) {
13+
expectedBlake3 := func(data []byte) string {
14+
h := blake3.New(32, nil)
15+
h.Write(data)
16+
return hex.EncodeToString(h.Sum(nil))
17+
}
18+
19+
testData := []byte("hello world")
20+
emptyData := []byte("")
21+
largeData := make([]byte, 5*1024*1024)
22+
23+
// Temp dir for test files
24+
tmpDir := t.TempDir()
25+
26+
// Create helper function for file creation
27+
createTempFile := func(name string, content []byte) string {
28+
filePath := filepath.Join(tmpDir, name)
29+
if err := os.WriteFile(filePath, content, 0644); err != nil {
30+
t.Fatalf("failed to create temp file: %v", err)
31+
}
32+
return filePath
33+
}
34+
35+
// Create test files
36+
smallFile := createTempFile("small.txt", testData)
37+
emptyFile := createTempFile("empty.txt", emptyData)
38+
largeFile := createTempFile("large.bin", largeData)
39+
40+
tests := []struct {
41+
name string
42+
filePath string
43+
bufferSize int
44+
wantHash string
45+
wantErr bool
46+
}{
47+
{
48+
name: "small file",
49+
filePath: smallFile,
50+
bufferSize: 4 * 1024, // 4KB buffer
51+
wantHash: expectedBlake3(testData),
52+
wantErr: false,
53+
},
54+
{
55+
name: "empty file",
56+
filePath: emptyFile,
57+
bufferSize: 1024, // small buffer
58+
wantHash: expectedBlake3(emptyData),
59+
wantErr: false,
60+
},
61+
{
62+
name: "large file",
63+
filePath: largeFile,
64+
bufferSize: 1024 * 1024, // 1MB buffer
65+
wantHash: expectedBlake3(largeData),
66+
wantErr: false,
67+
},
68+
{
69+
name: "file does not exist",
70+
filePath: filepath.Join(tmpDir, "doesnotexist.txt"),
71+
bufferSize: 4096,
72+
wantHash: "",
73+
wantErr: true,
74+
},
75+
{
76+
name: "zero buffer size (should use default)",
77+
filePath: smallFile,
78+
bufferSize: 0,
79+
wantHash: expectedBlake3(testData),
80+
wantErr: false,
81+
},
82+
}
83+
84+
for _, tt := range tests {
85+
t.Run(tt.name, func(t *testing.T) {
86+
gotHash, err := HashFileIncrementally(tt.filePath, tt.bufferSize)
87+
88+
if (err != nil) != tt.wantErr {
89+
t.Fatalf("expected error=%v, got err=%v", tt.wantErr, err)
90+
}
91+
92+
if !tt.wantErr && gotHash != tt.wantHash {
93+
t.Errorf("hash mismatch!\n got: %s\n want: %s", gotHash, tt.wantHash)
94+
}
95+
})
96+
}
97+
}

supernode/node/action/server/cascade/cascade_action_server.go

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ func (server *ActionServer) Download(req *pb.DownloadRequest, stream pb.CascadeS
235235
}
236236
}
237237

238-
var restoredFile []byte
238+
var restoredFilePath string
239239
var tmpDir string
240240

241241
err := task.Download(ctx, &cascadeService.DownloadRequest{
@@ -250,8 +250,8 @@ func (server *ActionServer) Download(req *pb.DownloadRequest, stream pb.CascadeS
250250
},
251251
}
252252

253-
if len(resp.Artefacts) > 0 {
254-
restoredFile = resp.Artefacts
253+
if resp.FilePath != "" {
254+
restoredFilePath = resp.FilePath
255255
tmpDir = resp.DownloadedDir
256256
}
257257

@@ -265,15 +265,23 @@ func (server *ActionServer) Download(req *pb.DownloadRequest, stream pb.CascadeS
265265
return err
266266
}
267267

268-
if len(restoredFile) == 0 {
268+
if restoredFilePath == "" {
269269
logtrace.Error(ctx, "no artefact file retrieved", fields)
270270
return fmt.Errorf("no artefact to stream")
271271
}
272272
logtrace.Info(ctx, "streaming artefact file in chunks", fields)
273273

274+
restoredFile, err := readFileContentsInChunks(restoredFilePath)
275+
if err != nil {
276+
logtrace.Error(ctx, "failed to read restored file", logtrace.Fields{
277+
logtrace.FieldError: err.Error(),
278+
})
279+
return err
280+
}
281+
logtrace.Info(ctx, "file has been read in chunks", fields)
282+
274283
// Calculate optimal chunk size based on file size
275284
chunkSize := calculateOptimalChunkSize(int64(len(restoredFile)))
276-
277285
logtrace.Info(ctx, "calculated optimal chunk size for download", logtrace.Fields{
278286
"file_size": len(restoredFile),
279287
"chunk_size": chunkSize,
@@ -314,3 +322,30 @@ func (server *ActionServer) Download(req *pb.DownloadRequest, stream pb.CascadeS
314322
logtrace.Info(ctx, "completed streaming all chunks", fields)
315323
return nil
316324
}
325+
326+
func readFileContentsInChunks(filePath string) ([]byte, error) {
327+
f, err := os.Open(filePath)
328+
if err != nil {
329+
return nil, err
330+
}
331+
defer f.Close()
332+
333+
buf := make([]byte, 1024*1024)
334+
var fileBytes []byte
335+
336+
for {
337+
n, readErr := f.Read(buf)
338+
if n > 0 {
339+
// Process chunk
340+
fileBytes = append(fileBytes, buf[:n]...)
341+
}
342+
if readErr == io.EOF {
343+
break
344+
}
345+
if readErr != nil {
346+
return nil, fmt.Errorf("chunked read failed: %w", readErr)
347+
}
348+
}
349+
350+
return fileBytes, nil
351+
}

supernode/services/cascade/download.go

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
actiontypes "github.com/LumeraProtocol/lumera/x/action/v1/types"
1111
"github.com/LumeraProtocol/supernode/pkg/codec"
12+
"github.com/LumeraProtocol/supernode/pkg/crypto"
1213
"github.com/LumeraProtocol/supernode/pkg/errors"
1314
"github.com/LumeraProtocol/supernode/pkg/logtrace"
1415
"github.com/LumeraProtocol/supernode/pkg/utils"
@@ -26,7 +27,7 @@ type DownloadRequest struct {
2627
// DownloadResponse carries a single progress event emitted while serving a
// cascade download: which stage completed, a human-readable message, and —
// once artefacts have been restored — the path of the restored file plus
// the temporary directory it was written to.
type DownloadResponse struct {
	EventType SupernodeEventType // stage of the download that this event reports
	Message   string             // human-readable description of the stage
	// FilePath is the on-disk path of the restored file; empty until the
	// artefacts-downloaded event (callers stream the file from disk rather
	// than receiving its bytes in this struct).
	FilePath      string
	DownloadedDir string // temp dir holding FilePath; empty until restore completes
}
3233

@@ -44,7 +45,7 @@ func (task *CascadeRegistrationTask) Download(
4445
return task.wrapErr(ctx, "failed to get action", err, fields)
4546
}
4647
logtrace.Info(ctx, "action has been retrieved", fields)
47-
task.streamDownloadEvent(SupernodeEventTypeActionRetrieved, "action has been retrieved", nil, "", send)
48+
task.streamDownloadEvent(SupernodeEventTypeActionRetrieved, "action has been retrieved", "", "", send)
4849

4950
if actionDetails.GetAction().State != actiontypes.ActionStateDone {
5051
err = errors.New("action is not in a valid state")
@@ -53,28 +54,28 @@ func (task *CascadeRegistrationTask) Download(
5354
return task.wrapErr(ctx, "action not found", err, fields)
5455
}
5556
logtrace.Info(ctx, "action has been validated", fields)
56-
task.streamDownloadEvent(SupernodeEventTypeActionFinalized, "action state has been validated", nil, "", send)
57+
task.streamDownloadEvent(SupernodeEventTypeActionFinalized, "action state has been validated", "", "", send)
5758

5859
metadata, err := task.decodeCascadeMetadata(ctx, actionDetails.GetAction().Metadata, fields)
5960
if err != nil {
6061
fields[logtrace.FieldError] = err.Error()
6162
return task.wrapErr(ctx, "error decoding cascade metadata", err, fields)
6263
}
6364
logtrace.Info(ctx, "cascade metadata has been decoded", fields)
64-
task.streamDownloadEvent(SupernodeEventTypeMetadataDecoded, "metadata has been decoded", nil, "", send)
65+
task.streamDownloadEvent(SupernodeEventTypeMetadataDecoded, "metadata has been decoded", "", "", send)
6566

66-
file, tmpDir, err := task.downloadArtifacts(ctx, actionDetails.GetAction().ActionID, metadata, fields)
67+
filePath, tmpDir, err := task.downloadArtifacts(ctx, actionDetails.GetAction().ActionID, metadata, fields)
6768
if err != nil {
6869
fields[logtrace.FieldError] = err.Error()
6970
return task.wrapErr(ctx, "failed to download artifacts", err, fields)
7071
}
7172
logtrace.Info(ctx, "artifacts have been downloaded", fields)
72-
task.streamDownloadEvent(SupernodeEventTypeArtefactsDownloaded, "artifacts have been downloaded", file, tmpDir, send)
73+
task.streamDownloadEvent(SupernodeEventTypeArtefactsDownloaded, "artifacts have been downloaded", filePath, tmpDir, send)
7374

7475
return nil
7576
}
7677

77-
func (task *CascadeRegistrationTask) downloadArtifacts(ctx context.Context, actionID string, metadata actiontypes.CascadeMetadata, fields logtrace.Fields) ([]byte, string, error) {
78+
func (task *CascadeRegistrationTask) downloadArtifacts(ctx context.Context, actionID string, metadata actiontypes.CascadeMetadata, fields logtrace.Fields) (string, string, error) {
7879
logtrace.Info(ctx, "started downloading the artifacts", fields)
7980

8081
var layout codec.Layout
@@ -106,7 +107,7 @@ func (task *CascadeRegistrationTask) downloadArtifacts(ctx context.Context, acti
106107
}
107108

108109
if len(layout.Blocks) == 0 {
109-
return nil, "", errors.New("no symbols found in RQ metadata")
110+
return "", "", errors.New("no symbols found in RQ metadata")
110111
}
111112

112113
return task.restoreFileFromLayout(ctx, layout, metadata.DataHash, actionID)
@@ -117,7 +118,7 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout(
117118
layout codec.Layout,
118119
dataHash string,
119120
actionID string,
120-
) ([]byte, string, error) {
121+
) (string, string, error) {
121122

122123
fields := logtrace.Fields{
123124
logtrace.FieldActionID: actionID,
@@ -139,7 +140,7 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout(
139140
if err != nil {
140141
fields[logtrace.FieldError] = err.Error()
141142
logtrace.Error(ctx, "failed to retrieve symbols", fields)
142-
return nil, "", fmt.Errorf("failed to retrieve symbols: %w", err)
143+
return "", "", fmt.Errorf("failed to retrieve symbols: %w", err)
143144
}
144145

145146
fields["retrievedSymbols"] = len(symbols)
@@ -154,40 +155,37 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout(
154155
if err != nil {
155156
fields[logtrace.FieldError] = err.Error()
156157
logtrace.Error(ctx, "failed to decode symbols", fields)
157-
return nil, "", fmt.Errorf("decode symbols using RaptorQ: %w", err)
158+
return "", "", fmt.Errorf("decode symbols using RaptorQ: %w", err)
158159
}
159160

160-
file, err := os.ReadFile(decodeInfo.FilePath)
161+
fileHash, err := crypto.HashFileIncrementally(decodeInfo.FilePath, 0)
161162
if err != nil {
162163
fields[logtrace.FieldError] = err.Error()
163-
logtrace.Error(ctx, "failed to read file", fields)
164-
return nil, "", fmt.Errorf("read decoded file: %w", err)
164+
logtrace.Error(ctx, "failed to hash file", fields)
165+
return "", "", fmt.Errorf("hash file: %w", err)
165166
}
166-
167-
// 3. Validate hash (Blake3)
168-
fileHash, err := utils.Blake3Hash(file)
169-
if err != nil {
170-
fields[logtrace.FieldError] = err.Error()
171-
logtrace.Error(ctx, "failed to do hash", fields)
172-
return nil, "", fmt.Errorf("hash file: %w", err)
167+
if fileHash == nil {
168+
fields[logtrace.FieldError] = "file hash is nil"
169+
logtrace.Error(ctx, "failed to hash file", fields)
170+
return "", "", errors.New("file hash is nil")
173171
}
174172

175173
err = task.verifyDataHash(ctx, fileHash, dataHash, fields)
176174
if err != nil {
177175
logtrace.Error(ctx, "failed to verify hash", fields)
178176
fields[logtrace.FieldError] = err.Error()
179-
return nil, decodeInfo.DecodeTmpDir, err
177+
return "", decodeInfo.DecodeTmpDir, err
180178
}
181-
182179
logtrace.Info(ctx, "file successfully restored and hash verified", fields)
183-
return file, decodeInfo.DecodeTmpDir, nil
180+
181+
return decodeInfo.FilePath, decodeInfo.DecodeTmpDir, nil
184182
}
185183

186-
func (task *CascadeRegistrationTask) streamDownloadEvent(eventType SupernodeEventType, msg string, file []byte, tmpDir string, send func(resp *DownloadResponse) error) {
184+
func (task *CascadeRegistrationTask) streamDownloadEvent(eventType SupernodeEventType, msg string, filePath string, tmpDir string, send func(resp *DownloadResponse) error) {
187185
_ = send(&DownloadResponse{
188186
EventType: eventType,
189187
Message: msg,
190-
Artefacts: file,
188+
FilePath: filePath,
191189
DownloadedDir: tmpDir,
192190
})
193191

0 commit comments

Comments
 (0)