diff --git a/sdk/action/client.go b/sdk/action/client.go index ba7f7c5d..d6a3e2ff 100644 --- a/sdk/action/client.go +++ b/sdk/action/client.go @@ -16,7 +16,7 @@ import ( // //go:generate mockery --name=Client --output=testutil/mocks --outpkg=mocks --filename=client_mock.go type Client interface { - StartCascade(ctx context.Context, data []byte, actionID string) (string, error) + StartCascade(ctx context.Context, filePath string, actionID string) (string, error) DeleteTask(ctx context.Context, taskID string) error GetTask(ctx context.Context, taskID string) (*task.TaskEntry, bool) SubscribeToEvents(ctx context.Context, eventType event.EventType, handler event.Handler) error @@ -54,17 +54,17 @@ func NewClient(ctx context.Context, config config.Config, logger log.Logger, key } // StartCascade initiates a cascade operation -func (c *ClientImpl) StartCascade(ctx context.Context, data []byte, actionID string) (string, error) { +func (c *ClientImpl) StartCascade(ctx context.Context, filePath string, actionID string) (string, error) { if actionID == "" { c.logger.Error(ctx, "Empty action ID provided") return "", ErrEmptyActionID } - if len(data) == 0 { - c.logger.Error(ctx, "Empty data provided") + if filePath == "" { + c.logger.Error(ctx, "Empty file path provided") return "", ErrEmptyData } - taskID, err := c.taskManager.CreateCascadeTask(ctx, data, actionID) + taskID, err := c.taskManager.CreateCascadeTask(ctx, filePath, actionID) if err != nil { c.logger.Error(ctx, "Failed to create cascade task", "error", err) return "", fmt.Errorf("failed to create cascade task: %w", err) diff --git a/sdk/action/client_test.go b/sdk/action/client_test.go deleted file mode 100644 index 096c572c..00000000 --- a/sdk/action/client_test.go +++ /dev/null @@ -1,194 +0,0 @@ -package action - -import ( - "context" - "errors" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - - "github.com/LumeraProtocol/supernode/sdk/config" - "github.com/LumeraProtocol/supernode/sdk/event" - "github.com/LumeraProtocol/supernode/sdk/log" - "github.com/LumeraProtocol/supernode/sdk/task" - - mocktask "github.com/LumeraProtocol/supernode/sdk/task/testutil/mocks" -) - -func newClientWithMock(mgr *mocktask.Manager) *ClientImpl { - return &ClientImpl{ - config: config.Config{}, // empty fixture - taskManager: mgr, // injected mock - logger: log.NewNoopLogger(), // quiet logger - } -} - -func TestClient_StartCascade(t *testing.T) { - ctx := context.Background() - payload := []byte("hello-world") - actionID := "act-123" - mockErr := errors.New("boom") - - tests := map[string]struct { - data []byte - actionID string - setupMock func(*mocktask.Manager) - wantTaskID string - wantErrMatch func(error) bool - }{ - "happy path": { - data: payload, - actionID: actionID, - setupMock: func(m *mocktask.Manager) { - m.On("CreateCascadeTask", ctx, payload, actionID). - Return("task-42", nil) - }, - wantTaskID: "task-42", - wantErrMatch: func(err error) bool { return err == nil }, - }, - "empty action id": { - data: payload, - actionID: "", - setupMock: func(*mocktask.Manager) {}, - wantErrMatch: func(err error) bool { return errors.Is(err, ErrEmptyActionID) }, - }, - "empty data": { - data: nil, - actionID: actionID, - setupMock: func(*mocktask.Manager) {}, - wantErrMatch: func(err error) bool { return errors.Is(err, ErrEmptyData) }, - }, - "manager failure": { - data: payload, - actionID: actionID, - setupMock: func(m *mocktask.Manager) { - m.On("CreateCascadeTask", ctx, payload, actionID). - Return("", mockErr) - }, - wantErrMatch: func(err error) bool { - return err != nil && errors.Is(err, mockErr) - }, - }, - } - - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - mockMgr := mocktask.NewManager(t) - tc.setupMock(mockMgr) - - client := newClientWithMock(mockMgr) - - gotID, err := client.StartCascade(ctx, tc.data, tc.actionID) - assert.True(t, tc.wantErrMatch(err), "error assertion failed") - assert.Equal(t, tc.wantTaskID, gotID) - }) - } -} - -func TestClient_GetTask(t *testing.T) { - ctx := context.Background() - entry := &task.TaskEntry{TaskID: "tid-7"} - - tests := map[string]struct { - taskID string - setupMock func(*mocktask.Manager) - wantEntry *task.TaskEntry - wantFound bool - }{ - "found": { - taskID: "tid-7", - setupMock: func(m *mocktask.Manager) { - m.On("GetTask", ctx, "tid-7").Return(entry, true) - }, - wantEntry: entry, wantFound: true, - }, - "missing": { - taskID: "tid-404", - setupMock: func(m *mocktask.Manager) { - m.On("GetTask", ctx, "tid-404").Return((*task.TaskEntry)(nil), false) - }, - wantEntry: nil, wantFound: false, - }, - } - - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - mockMgr := mocktask.NewManager(t) - tc.setupMock(mockMgr) - - client := newClientWithMock(mockMgr) - got, ok := client.GetTask(ctx, tc.taskID) - - assert.Equal(t, tc.wantFound, ok) - assert.Equal(t, tc.wantEntry, got) - }) - } -} - -func TestClient_DeleteTask(t *testing.T) { - ctx := context.Background() - mockErr := errors.New("delete fail") - - tests := map[string]struct { - taskID string - setupMock func(*mocktask.Manager) - wantErrNil bool - }{ - "happy path": { - taskID: "tid-1", - setupMock: func(m *mocktask.Manager) { - m.On("DeleteTask", ctx, "tid-1").Return(nil) - }, - wantErrNil: true, - }, - "empty id": { - taskID: "", - setupMock: func(*mocktask.Manager) {}, - wantErrNil: false, - }, - "manager error": { - taskID: "tid-2", - setupMock: func(m *mocktask.Manager) { - m.On("DeleteTask", ctx, "tid-2").Return(mockErr) - }, - wantErrNil: false, - }, - } - - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - mockMgr := mocktask.NewManager(t) - tc.setupMock(mockMgr) - - client := newClientWithMock(mockMgr) - err := client.DeleteTask(ctx, tc.taskID) - - assert.Equal(t, tc.wantErrNil, err == nil) - }) - } -} - -func TestClient_Subscribe(t *testing.T) { - ctx := context.Background() - handler := func(context.Context, event.Event) {} - - mockMgr := mocktask.NewManager(t) - - mockMgr. - On("SubscribeToEvents", ctx, event.EventType("foo"), mock.AnythingOfType("event.Handler")). - Once() - mockMgr. - On("SubscribeToAllEvents", ctx, mock.AnythingOfType("event.Handler")). - Once() - - client := newClientWithMock(mockMgr) - - assert.NoError(t, client.SubscribeToEvents(ctx, "foo", handler)) - assert.NoError(t, client.SubscribeToAllEvents(ctx, handler)) - - // nil Manager branch - nilClient := &ClientImpl{} - assert.Error(t, nilClient.SubscribeToEvents(ctx, "bar", handler)) - assert.Error(t, nilClient.SubscribeToAllEvents(ctx, handler)) -} diff --git a/sdk/adapters/supernodeservice/adapter.go b/sdk/adapters/supernodeservice/adapter.go index 3588bb63..3d5fe314 100644 --- a/sdk/adapters/supernodeservice/adapter.go +++ b/sdk/adapters/supernodeservice/adapter.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "os" "github.com/LumeraProtocol/supernode/gen/supernode/action/cascade" "github.com/LumeraProtocol/supernode/pkg/net" @@ -42,30 +43,47 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca return nil, err } + // Open the file for reading + file, err := os.Open(in.FilePath) + if err != nil { + a.logger.Error(ctx, "Failed to open file", "filePath", in.FilePath, "error", err) + return nil, fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + // Get file stats for progress tracking + fileInfo, err := file.Stat() + if err != nil { + a.logger.Error(ctx, "Failed to get file stats", "filePath", in.FilePath, "error", err) + return nil, fmt.Errorf("failed to get file stats: %w", err) + } + totalBytes := fileInfo.Size() + // Define chunk size const chunkSize = 1024 // 1 KB // Keep track of how much data we've processed bytesRead := int64(0) - totalBytes := int64(len(in.Data)) chunkIndex := 0 + buffer := make([]byte, chunkSize) // Read and send data in chunks - for bytesRead < totalBytes { - // Determine size of the next chunk - end := bytesRead + chunkSize - if end > totalBytes { - end = totalBytes + for { + // Read a chunk from the file + n, err := file.Read(buffer) + if err == io.EOF { + break + } + if err != nil { + a.logger.Error(ctx, "Failed to read file chunk", "chunkIndex", chunkIndex, "error", err) + return nil, fmt.Errorf("failed to read file chunk: %w", err) } - // Prepare the chunk data - chunkData := in.Data[bytesRead:end] - - // Create the chunk request + // Create the chunk request with just the bytes we read chunk := &cascade.RegisterRequest{ RequestType: &cascade.RegisterRequest_Chunk{ Chunk: &cascade.DataChunk{ - Data: chunkData, + Data: buffer[:n], }, }, } @@ -75,10 +93,10 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca return nil, fmt.Errorf("failed to send chunk: %w", err) } - bytesRead += int64(len(chunkData)) + bytesRead += int64(n) progress := float64(bytesRead) / float64(totalBytes) * 100 - a.logger.Debug(ctx, "Sent data chunk", "chunkIndex", chunkIndex, "chunkSize", len(chunkData), "progress", fmt.Sprintf("%.1f%%", progress)) + a.logger.Debug(ctx, "Sent data chunk", "chunkIndex", chunkIndex, "chunkSize", n, "progress", fmt.Sprintf("%.1f%%", progress)) chunkIndex++ } @@ -117,13 +135,7 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca } // Log the streamed progress update - a.logger.Info(ctx, "Supernode progress update received", - "event_type", resp.EventType, - "message", resp.Message, - "tx_hash", resp.TxHash, - "task_id", in.TaskId, - "action_id", in.ActionID, - ) + a.logger.Info(ctx, "Supernode progress update received", "event_type", resp.EventType, "message", resp.Message, "tx_hash", resp.TxHash, "task_id", in.TaskId, "action_id", in.ActionID) if in.EventLogger != nil { in.EventLogger(ctx, toSdkEvent(resp.EventType), resp.Message, nil) diff --git a/sdk/adapters/supernodeservice/types.go b/sdk/adapters/supernodeservice/types.go index 6a11f74e..1c864fbb 100644 --- a/sdk/adapters/supernodeservice/types.go +++ b/sdk/adapters/supernodeservice/types.go @@ -16,7 +16,7 @@ type LoggerFunc func( ) type CascadeSupernodeRegisterRequest struct { - Data []byte + FilePath string ActionID string TaskId string EventLogger LoggerFunc diff --git a/sdk/task/cascade.go b/sdk/task/cascade.go index b058cc77..34293c85 100644 --- a/sdk/task/cascade.go +++ b/sdk/task/cascade.go @@ -24,15 +24,15 @@ const ( type CascadeTask struct { BaseTask - data []byte + filePath string actionId string } // NewCascadeTask creates a new CascadeTask using a BaseTask plus cascade-specific parameters -func NewCascadeTask(base BaseTask, data []byte, actionId string) *CascadeTask { +func NewCascadeTask(base BaseTask, filePath string, actionId string) *CascadeTask { return &CascadeTask{ BaseTask: base, - data: data, + filePath: filePath, actionId: actionId, } } @@ -153,7 +153,7 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum clientFactory := net.NewClientFactory(ctx, t.logger, t.keyring, factoryCfg) req := &supernodeservice.CascadeSupernodeRegisterRequest{ - Data: t.data, + FilePath: t.filePath, ActionID: t.ActionID, TaskId: t.TaskID, } diff --git a/sdk/task/manager.go b/sdk/task/manager.go index c87eec87..45d07523 100644 --- a/sdk/task/manager.go +++ b/sdk/task/manager.go @@ -19,7 +19,7 @@ const MAX_EVENT_WORKERS = 100 // //go:generate mockery --name=Manager --output=testutil/mocks --outpkg=mocks --filename=manager_mock.go type Manager interface { - CreateCascadeTask(ctx context.Context, data []byte, actionID string) (string, error) + CreateCascadeTask(ctx context.Context, filePath string, actionID string) (string, error) GetTask(ctx context.Context, taskID string) (*TaskEntry, bool) DeleteTask(ctx context.Context, taskID string) error SubscribeToEvents(ctx context.Context, eventType event.EventType, handler event.Handler) @@ -78,14 +78,7 @@ func NewManager(ctx context.Context, config config.Config, logger log.Logger, kr } // CreateCascadeTask creates and starts a Cascade task using the new pattern -func (m *ManagerImpl) CreateCascadeTask( - ctx context.Context, - data []byte, - actionID string, -) (string, error) { - - // Generate task ID - // slice this to 8 bytes +func (m *ManagerImpl) CreateCascadeTask(ctx context.Context, filePath string, actionID string) (string, error) { taskID := uuid.New().String()[:8] m.logger.Debug(ctx, "Generated task ID", "taskID", taskID) @@ -101,7 +94,7 @@ func (m *ManagerImpl) CreateCascadeTask( logger: m.logger, } // Create cascade-specific task - task := NewCascadeTask(baseTask, data, actionID) + task := NewCascadeTask(baseTask, filePath, actionID) // Store task in cache m.taskCache.Set(ctx, taskID, task, TaskTypeCascade) @@ -109,10 +102,7 @@ func (m *ManagerImpl) CreateCascadeTask( // Ensure task is stored before returning m.taskCache.Wait() - // Start task asynchronously go func() { - // Create a separate context for the goroutine - m.logger.Debug(ctx, "Starting cascade task asynchronously", "taskID", taskID) err := task.Run(ctx) if err != nil { diff --git a/tests/system/e2e_cascade_test.go b/tests/system/e2e_cascade_test.go index c68c79e3..a2752635 100644 --- a/tests/system/e2e_cascade_test.go +++ b/tests/system/e2e_cascade_test.go @@ -440,8 +440,8 @@ func TestCascadeE2E(t *testing.T) { t.Logf("Starting cascade operation with action ID: %s", actionID) taskID, err := actionClient.StartCascade( ctx, - data, // data []byte - actionID, // Action ID from the transaction + testFileFullpath, // path + actionID, // Action ID from the transaction ) require.NoError(t, err, "Failed to start cascade operation") t.Logf("Cascade operation started with task ID: %s", taskID)