diff --git a/supernode/node/action/server/cascade/cascade_action_server.go b/supernode/node/action/server/cascade/cascade_action_server.go index e96fa483..e2fe9c08 100644 --- a/supernode/node/action/server/cascade/cascade_action_server.go +++ b/supernode/node/action/server/cascade/cascade_action_server.go @@ -248,6 +248,17 @@ func (server *ActionServer) Download(req *pb.DownloadRequest, stream pb.CascadeS var restoredFilePath string var tmpDir string + // Ensure tmpDir is cleaned up even if errors occur after retrieval + defer func() { + if tmpDir != "" { + if err := task.CleanupDownload(ctx, tmpDir); err != nil { + logtrace.Error(ctx, "error cleaning up the tmp dir", logtrace.Fields{logtrace.FieldError: err.Error()}) + } else { + logtrace.Info(ctx, "tmp dir has been cleaned up", logtrace.Fields{"tmp_dir": tmpDir}) + } + } + }() + err := task.Download(ctx, &cascadeService.DownloadRequest{ ActionID: req.GetActionId(), }, func(resp *cascadeService.DownloadResponse) error { @@ -320,14 +331,7 @@ func (server *ActionServer) Download(req *pb.DownloadRequest, stream pb.CascadeS } } - err = task.CleanupDownload(ctx, tmpDir) - if err != nil { - logtrace.Error(ctx, "error cleaning up the tmp dir", logtrace.Fields{ - logtrace.FieldError: err.Error(), - }) - } - fields["tmp_dir"] = tmpDir - logtrace.Info(ctx, "tmp dir has been cleaned up", fields) + // Cleanup is handled in deferred block above logtrace.Info(ctx, "completed streaming all chunks", fields) return nil diff --git a/supernode/services/cascade/download.go b/supernode/services/cascade/download.go index 3121e6ad..146c8016 100644 --- a/supernode/services/cascade/download.go +++ b/supernode/services/cascade/download.go @@ -14,6 +14,7 @@ import ( "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" "github.com/LumeraProtocol/supernode/v2/pkg/utils" "github.com/LumeraProtocol/supernode/v2/supernode/services/cascade/adaptors" + "github.com/LumeraProtocol/supernode/v2/supernode/services/common" ) const ( @@ -35,10 +36,20 @@ func (task *CascadeRegistrationTask) Download( ctx context.Context, req *DownloadRequest, send func(resp *DownloadResponse) error, -) error { +) (err error) { fields := logtrace.Fields{logtrace.FieldMethod: "Download", logtrace.FieldRequest: req} logtrace.Info(ctx, "cascade-action-download request received", fields) + // Ensure task status is finalized regardless of outcome + defer func() { + if err != nil { + task.UpdateStatus(common.StatusTaskCanceled) + } else { + task.UpdateStatus(common.StatusTaskCompleted) + } + task.Cancel() + }() + actionDetails, err := task.LumeraClient.GetAction(ctx, req.ActionID) if err != nil { fields[logtrace.FieldError] = err diff --git a/supernode/services/cascade/register.go b/supernode/services/cascade/register.go index 182d69ac..7ccb9fc3 100644 --- a/supernode/services/cascade/register.go +++ b/supernode/services/cascade/register.go @@ -5,6 +5,7 @@ import ( "os" "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" + "github.com/LumeraProtocol/supernode/v2/supernode/services/common" ) // RegisterRequest contains parameters for upload request @@ -42,11 +43,32 @@ func (task *CascadeRegistrationTask) Register( ctx context.Context, req *RegisterRequest, send func(resp *RegisterResponse) error, -) error { +) (err error) { fields := logtrace.Fields{logtrace.FieldMethod: "Register", logtrace.FieldRequest: req} logtrace.Info(ctx, "cascade-action-registration request received", fields) + // Ensure task status and resources are finalized regardless of outcome + defer func() { + if err != nil { + task.UpdateStatus(common.StatusTaskCanceled) + } else { + task.UpdateStatus(common.StatusTaskCompleted) + } + task.Cancel() + }() + + // Always attempt to remove the uploaded file path + defer func() { + if req != nil && req.FilePath != "" { + if remErr := os.RemoveAll(req.FilePath); remErr != nil { + logtrace.Warn(ctx, "error removing file", fields) + } else { + logtrace.Info(ctx, "input file has been cleaned up", fields) + } + } + }() + /* 1. Fetch & validate action -------------------------------------------------- */ action, err := task.fetchAction(ctx, req.ActionID, fields) if err != nil { @@ -140,10 +162,5 @@ func (task *CascadeRegistrationTask) Register( logtrace.Info(ctx, "action has been finalized", fields) task.streamEvent(SupernodeEventTypeActionFinalized, "action has been finalized", txHash, send) - err = os.RemoveAll(req.FilePath) - if err != nil { - logtrace.Warn(ctx, "error removing file", fields) - } - return nil }