Skip to content

Commit a5da9e7

Browse files
committed
implement supernode processing for download
1 parent 7df1f6d commit a5da9e7

File tree

16 files changed

+813
-107
lines changed

16 files changed

+813
-107
lines changed

gen/supernode/action/cascade/service.pb.go

Lines changed: 304 additions & 89 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gen/supernode/action/cascade/service_grpc.pb.go

Lines changed: 41 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/codec/codec.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,5 @@ type EncodeRequest struct {
3737
// Codec encodes a file into recoverable symbols and decodes those
// symbols back into the original file.
type Codec interface {
	// Encode a file
	Encode(ctx context.Context, req EncodeRequest) (EncodeResponse, error)
	// Decode reconstructs the original file from the symbols carried in req.
	Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error)
}

pkg/codec/codec_mock.go

Lines changed: 17 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/codec/decode.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package codec
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"path/filepath"
8+
9+
raptorq "github.com/LumeraProtocol/rq-go"
10+
"github.com/LumeraProtocol/supernode/pkg/logtrace"
11+
)
12+
13+
// DecodeRequest carries everything needed to restore a file from its
// RaptorQ symbols.
type DecodeRequest struct {
	// ActionID scopes the on-disk working directory used during decode.
	ActionID string
	// Layout describes the encoded file layout.
	// NOTE(review): the current Decode implementation never reads this
	// field — confirm whether it is actually required here.
	Layout Layout
	// Symbols maps symbol identifier -> raw symbol bytes.
	Symbols map[string][]byte
}
18+
19+
// DecodeResponse reports where the decoded output was written.
type DecodeResponse struct {
	// Path is the on-disk location of the restored file.
	Path string
	// LayoutPath is currently always empty — decoding produces no
	// layout file.
	LayoutPath string
}
23+
24+
func (rq *raptorQ) Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error) {
25+
fields := logtrace.Fields{
26+
logtrace.FieldMethod: "Decode",
27+
logtrace.FieldModule: "rq",
28+
logtrace.FieldActionID: req.ActionID,
29+
}
30+
logtrace.Info(ctx, "RaptorQ decode request received", fields)
31+
32+
processor, err := raptorq.NewDefaultRaptorQProcessor()
33+
if err != nil {
34+
fields[logtrace.FieldError] = err.Error()
35+
return DecodeResponse{}, fmt.Errorf("create RaptorQ processor: %w", err)
36+
}
37+
defer processor.Free()
38+
39+
symbolsDir := filepath.Join(rq.symbolsBaseDir, req.ActionID)
40+
if err := os.MkdirAll(symbolsDir, 0o755); err != nil {
41+
fields[logtrace.FieldError] = err.Error()
42+
return DecodeResponse{}, fmt.Errorf("mkdir %s: %w", symbolsDir, err)
43+
}
44+
45+
// Write symbols to disk
46+
for id, data := range req.Symbols {
47+
symbolPath := filepath.Join(symbolsDir, id)
48+
if err := os.WriteFile(symbolPath, data, 0o644); err != nil {
49+
fields[logtrace.FieldError] = err.Error()
50+
return DecodeResponse{}, fmt.Errorf("write symbol %s: %w", id, err)
51+
}
52+
}
53+
logtrace.Info(ctx, "symbols written to disk", fields)
54+
55+
// Decode
56+
outputPath := filepath.Join(symbolsDir, "output")
57+
if err := processor.DecodeSymbols(symbolsDir, outputPath, ""); err != nil {
58+
fields[logtrace.FieldError] = err.Error()
59+
_ = os.Remove(outputPath)
60+
return DecodeResponse{}, fmt.Errorf("raptorq decode: %w", err)
61+
}
62+
63+
logtrace.Info(ctx, "RaptorQ decoding completed successfully", fields)
64+
return DecodeResponse{Path: outputPath, LayoutPath: ""}, nil
65+
}

pkg/logtrace/fields.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,5 @@ const (
1919
FieldTaskID = "task_id"
2020
FieldActionID = "action_id"
2121
FieldHashHex = "hash_hex"
22+
FieldActionState = "action_state"
2223
)

pkg/utils/utils.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,21 @@ func ZstdCompress(data []byte) ([]byte, error) {
429429
return encoder.EncodeAll(data, nil), nil
430430
}
431431

432+
func ZstdDecompress(data []byte) ([]byte, error) {
433+
decoder, err := zstd.NewReader(nil)
434+
if err != nil {
435+
return nil, fmt.Errorf("failed to create zstd decoder: %v", err)
436+
}
437+
defer decoder.Close()
438+
439+
decoded, err := decoder.DecodeAll(data, nil)
440+
if err != nil {
441+
return nil, fmt.Errorf("failed to decompress zstd data: %v", err)
442+
}
443+
444+
return decoded, nil
445+
}
446+
432447
// HighCompress compresses the data
433448
func HighCompress(cctx context.Context, data []byte) ([]byte, error) {
434449
ctx, cancel := context.WithTimeout(cctx, highCompressTimeout)

proto/supernode/action/cascade/service.proto

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ option go_package = "github.com/LumeraProtocol/supernode/gen/supernode/action/ca
55
service CascadeService {
66
rpc Register (stream RegisterRequest) returns (stream RegisterResponse);
77
rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse);
8-
8+
rpc Download (DownloadRequest) returns (stream DownloadResponse);
99
}
1010

1111
message RegisterRequest {
@@ -30,6 +30,22 @@ message RegisterResponse {
3030
string tx_hash = 3;
3131
}
3232

33+
// DownloadRequest asks the supernode to restore and stream back the
// artefact registered under the given action.
message DownloadRequest {
  string action_id = 1;
}
36+
37+
// DownloadResponse is one element of the Download stream: either a
// progress event or a chunk of the restored file. The server sends
// events while processing and chunks once the artefact is available.
message DownloadResponse {
  oneof response_type {
    DownloadEvent event = 1;
    DataChunk chunk = 2;
  }
}
43+
44+
// DownloadEvent reports download progress to the client.
// NOTE(review): the SupernodeEventType value added for downloads is
// named "Artefacts_Downloaded", which breaks the ALL_CAPS style of its
// siblings (e.g. ACTION_FINALIZED) — consider renaming it to
// ARTEFACTS_DOWNLOADED (and regenerating the pb code) before clients
// depend on it.
message DownloadEvent {
  SupernodeEventType event_type = 1;
  string message = 2;
}
48+
3349
enum SupernodeEventType {
3450
UNKNOWN = 0;
3551
ACTION_RETRIEVED = 1;
@@ -43,6 +59,7 @@ enum SupernodeEventType {
4359
RQID_VERIFIED = 9;
4460
ARTEFACTS_STORED = 10;
4561
ACTION_FINALIZED = 11;
62+
Artefacts_Downloaded = 12;
4663
}
4764

4865
message HealthCheckRequest {}

supernode/node/action/server/cascade/cascade_action_server.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,3 +183,77 @@ func (server *ActionServer) HealthCheck(ctx context.Context, _ *pb.HealthCheckRe
183183
TasksInProgress: resp.TasksInProgress,
184184
}, nil
185185
}
186+
187+
func (server *ActionServer) Download(req *pb.DownloadRequest, stream pb.CascadeService_DownloadServer) error {
188+
fields := logtrace.Fields{
189+
logtrace.FieldMethod: "Download",
190+
logtrace.FieldModule: "CascadeActionServer",
191+
logtrace.FieldActionID: req.GetActionId(),
192+
}
193+
194+
ctx := stream.Context()
195+
logtrace.Info(ctx, "download request received from client", fields)
196+
197+
task := server.factory.NewCascadeRegistrationTask()
198+
199+
var restoredFile []byte
200+
201+
err := task.Download(ctx, &cascadeService.DownloadRequest{
202+
ActionID: req.GetActionId(),
203+
}, func(resp *cascadeService.DownloadResponse) error {
204+
grpcResp := &pb.DownloadResponse{
205+
ResponseType: &pb.DownloadResponse_Event{
206+
Event: &pb.DownloadEvent{
207+
EventType: pb.SupernodeEventType(resp.EventType),
208+
Message: resp.Message,
209+
},
210+
},
211+
}
212+
213+
if len(resp.Artefacts) > 0 {
214+
restoredFile = resp.Artefacts
215+
}
216+
217+
return stream.Send(grpcResp)
218+
})
219+
220+
if err != nil {
221+
logtrace.Error(ctx, "error occurred during download process", logtrace.Fields{
222+
logtrace.FieldError: err.Error(),
223+
})
224+
return err
225+
}
226+
227+
if len(restoredFile) == 0 {
228+
logtrace.Error(ctx, "no artefact file retrieved", fields)
229+
return fmt.Errorf("no artefact to stream")
230+
}
231+
logtrace.Info(ctx, "streaming artefact file in chunks", fields)
232+
233+
// Split and stream the file in 1024 byte chunks
234+
const chunkSize = 1024
235+
for i := 0; i < len(restoredFile); i += chunkSize {
236+
end := i + chunkSize
237+
if end > len(restoredFile) {
238+
end = len(restoredFile)
239+
}
240+
241+
err := stream.Send(&pb.DownloadResponse{
242+
ResponseType: &pb.DownloadResponse_Chunk{
243+
Chunk: &pb.DataChunk{
244+
Data: restoredFile[i:end],
245+
},
246+
},
247+
})
248+
249+
if err != nil {
250+
logtrace.Error(ctx, "failed to stream chunk", logtrace.Fields{
251+
logtrace.FieldError: err.Error(),
252+
})
253+
return err
254+
}
255+
}
256+
257+
logtrace.Info(ctx, "completed streaming all chunks", fields)
258+
return nil
259+
}

0 commit comments

Comments
 (0)