Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions supernode/node/action/server/cascade/cascade_action_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,17 @@ func (server *ActionServer) Download(req *pb.DownloadRequest, stream pb.CascadeS
var restoredFilePath string
var tmpDir string

// Ensure tmpDir is cleaned up even if errors occur after retrieval
defer func() {
if tmpDir != "" {
if err := task.CleanupDownload(ctx, tmpDir); err != nil {
logtrace.Error(ctx, "error cleaning up the tmp dir", logtrace.Fields{logtrace.FieldError: err.Error()})
} else {
logtrace.Info(ctx, "tmp dir has been cleaned up", logtrace.Fields{"tmp_dir": tmpDir})
}
}
}()

err := task.Download(ctx, &cascadeService.DownloadRequest{
ActionID: req.GetActionId(),
}, func(resp *cascadeService.DownloadResponse) error {
Expand Down Expand Up @@ -320,14 +331,7 @@ func (server *ActionServer) Download(req *pb.DownloadRequest, stream pb.CascadeS
}
}

err = task.CleanupDownload(ctx, tmpDir)
if err != nil {
logtrace.Error(ctx, "error cleaning up the tmp dir", logtrace.Fields{
logtrace.FieldError: err.Error(),
})
}
fields["tmp_dir"] = tmpDir
logtrace.Info(ctx, "tmp dir has been cleaned up", fields)
// Cleanup is handled in deferred block above

logtrace.Info(ctx, "completed streaming all chunks", fields)
return nil
Expand Down
13 changes: 12 additions & 1 deletion supernode/services/cascade/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/LumeraProtocol/supernode/v2/pkg/logtrace"
"github.com/LumeraProtocol/supernode/v2/pkg/utils"
"github.com/LumeraProtocol/supernode/v2/supernode/services/cascade/adaptors"
"github.com/LumeraProtocol/supernode/v2/supernode/services/common"
)

const (
Expand All @@ -35,10 +36,20 @@ func (task *CascadeRegistrationTask) Download(
ctx context.Context,
req *DownloadRequest,
send func(resp *DownloadResponse) error,
) error {
) (err error) {
fields := logtrace.Fields{logtrace.FieldMethod: "Download", logtrace.FieldRequest: req}
logtrace.Info(ctx, "cascade-action-download request received", fields)

// Ensure task status is finalized regardless of outcome
defer func() {
if err != nil {
task.UpdateStatus(common.StatusTaskCanceled)
} else {
task.UpdateStatus(common.StatusTaskCompleted)
}
task.Cancel()
}()

actionDetails, err := task.LumeraClient.GetAction(ctx, req.ActionID)
if err != nil {
fields[logtrace.FieldError] = err
Expand Down
29 changes: 23 additions & 6 deletions supernode/services/cascade/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"

"github.com/LumeraProtocol/supernode/v2/pkg/logtrace"
"github.com/LumeraProtocol/supernode/v2/supernode/services/common"
)

// RegisterRequest contains parameters for upload request
Expand Down Expand Up @@ -42,11 +43,32 @@ func (task *CascadeRegistrationTask) Register(
ctx context.Context,
req *RegisterRequest,
send func(resp *RegisterResponse) error,
) error {
) (err error) {

fields := logtrace.Fields{logtrace.FieldMethod: "Register", logtrace.FieldRequest: req}
logtrace.Info(ctx, "cascade-action-registration request received", fields)

// Ensure task status and resources are finalized regardless of outcome
defer func() {
if err != nil {
task.UpdateStatus(common.StatusTaskCanceled)
} else {
task.UpdateStatus(common.StatusTaskCompleted)
}
task.Cancel()
}()

// Always attempt to remove the uploaded file path
defer func() {
if req != nil && req.FilePath != "" {
if remErr := os.RemoveAll(req.FilePath); remErr != nil {
logtrace.Warn(ctx, "error removing file", fields)
} else {
logtrace.Info(ctx, "input file has been cleaned up", fields)
}
}
}()

/* 1. Fetch & validate action -------------------------------------------------- */
action, err := task.fetchAction(ctx, req.ActionID, fields)
if err != nil {
Expand Down Expand Up @@ -140,10 +162,5 @@ func (task *CascadeRegistrationTask) Register(
logtrace.Info(ctx, "action has been finalized", fields)
task.streamEvent(SupernodeEventTypeActionFinalized, "action has been finalized", txHash, send)

err = os.RemoveAll(req.FilePath)
if err != nil {
logtrace.Warn(ctx, "error removing file", fields)
}

return nil
}