Skip to content

Commit e7d1324

Browse files
Improve Context handling & Optimize server settings (#119)
* Use detached context for SDK background tasks
* Optimize gRPC settings for large file transfers
1 parent 604ebab commit e7d1324

File tree

8 files changed

+104
-34
lines changed

8 files changed

+104
-34
lines changed

sdk/adapters/supernodeservice/adapter.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ func calculateOptimalChunkSize(fileSize int64) int {
7575
return chunkSize
7676
}
7777

78+
const maxFileSize = 1 * 1024 * 1024 * 1024 // 1GB limit
79+
7880
func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *CascadeSupernodeRegisterRequest, opts ...grpc.CallOption) (*CascadeSupernodeRegisterResponse, error) {
7981
// Create the client stream
8082
ctx = net.AddCorrelationID(ctx)
@@ -102,6 +104,15 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca
102104
}
103105
totalBytes := fileInfo.Size()
104106

107+
// Validate file size before starting upload
108+
if totalBytes > maxFileSize {
109+
a.logger.Error(ctx, "File exceeds maximum size limit",
110+
"filePath", in.FilePath,
111+
"fileSize", totalBytes,
112+
"maxSize", maxFileSize)
113+
return nil, fmt.Errorf("file size %d bytes exceeds maximum allowed size of 1GB", totalBytes)
114+
}
115+
105116
// Define adaptive chunk size based on file size
106117
chunkSize := calculateOptimalChunkSize(totalBytes)
107118

sdk/net/factory.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,17 @@ func NewClientFactory(ctx context.Context, logger log.Logger, keyring keyring.Ke
3636
logger.Debug(ctx, "Creating supernode client factory",
3737
"localAddress", config.LocalCosmosAddress)
3838

39+
// Optimized for streaming 1GB files with 4MB chunks (10 concurrent streams)
40+
opts := client.DefaultClientOptions()
41+
opts.MaxRecvMsgSize = 16 * 1024 * 1024 // 16MB to match server
42+
opts.MaxSendMsgSize = 16 * 1024 * 1024 // 16MB to match server
43+
opts.InitialWindowSize = 16 * 1024 * 1024 // 16MB per stream (4x chunk size)
44+
opts.InitialConnWindowSize = 160 * 1024 * 1024 // 160MB (16MB x 10 streams)
45+
3946
return &ClientFactory{
4047
logger: logger,
4148
keyring: keyring,
42-
clientOptions: client.DefaultClientOptions(),
49+
clientOptions: opts,
4350
config: config,
4451
lumeraClient: lumeraClient,
4552
}

sdk/task/cache.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type TaskEntry struct {
2323
Events []event.Event
2424
CreatedAt time.Time
2525
LastUpdatedAt time.Time
26+
Cancel context.CancelFunc // For cancelling long-running tasks
2627
}
2728

2829
type TaskCache struct {
@@ -64,8 +65,8 @@ func (tc *TaskCache) getOrCreateMutex(taskID string) *sync.Mutex {
6465
return mu.(*sync.Mutex)
6566
}
6667

67-
// Set stores a task in the cache with initial metadata
68-
func (tc *TaskCache) Set(ctx context.Context, taskID string, task Task, taskType TaskType, actionID string) bool {
68+
// Set stores a task in the cache with initial metadata and optional cancel function
69+
func (tc *TaskCache) Set(ctx context.Context, taskID string, task Task, taskType TaskType, actionID string, cancel context.CancelFunc) bool {
6970
mu := tc.getOrCreateMutex(taskID)
7071
mu.Lock()
7172
defer mu.Unlock()
@@ -82,6 +83,7 @@ func (tc *TaskCache) Set(ctx context.Context, taskID string, task Task, taskType
8283
Events: make([]event.Event, 0),
8384
CreatedAt: now,
8485
LastUpdatedAt: now,
86+
Cancel: cancel,
8587
}
8688

8789
success := tc.cache.Set(taskID, entry, 1)

sdk/task/cascade.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,14 @@ func NewCascadeTask(base BaseTask, filePath string, actionId string) *CascadeTas
3333

3434
// Run executes the full cascade‐task lifecycle.
3535
func (t *CascadeTask) Run(ctx context.Context) error {
36-
3736
t.LogEvent(ctx, event.SDKTaskStarted, "Running cascade task", nil)
3837

38+
// Validate file size before proceeding
39+
if err := ValidateFileSize(t.filePath); err != nil {
40+
t.LogEvent(ctx, event.SDKTaskFailed, "File validation failed", event.EventData{event.KeyError: err.Error()})
41+
return err
42+
}
43+
3944
// 1 - Fetch the supernodes
4045
supernodes, err := t.fetchSupernodes(ctx, t.Action.Height)
4146

sdk/task/helpers.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,29 @@ import (
44
"context"
55
"encoding/base64"
66
"fmt"
7+
"os"
78
"path/filepath"
89
"strings"
910

1011
"github.com/LumeraProtocol/supernode/sdk/adapters/lumera"
1112
)
1213

14+
const maxFileSize = 1 * 1024 * 1024 * 1024 // 1GB limit
15+
16+
// ValidateFileSize checks if a file size is within the allowed 1GB limit
17+
func ValidateFileSize(filePath string) error {
18+
fileInfo, err := os.Stat(filePath)
19+
if err != nil {
20+
return fmt.Errorf("failed to check file: %w", err)
21+
}
22+
23+
if fileInfo.Size() > maxFileSize {
24+
return fmt.Errorf("file size %d bytes exceeds maximum allowed size of 1GB", fileInfo.Size())
25+
}
26+
27+
return nil
28+
}
29+
1330
func (m *ManagerImpl) validateAction(ctx context.Context, actionID string) (lumera.Action, error) {
1431
action, err := m.lumeraClient.GetAction(ctx, actionID)
1532
if err != nil {

sdk/task/manager.go

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -92,20 +92,25 @@ func NewManagerWithLumeraClient(ctx context.Context, config config.Config, logge
9292

9393
// CreateCascadeTask creates and starts a Cascade task using the new pattern
9494
func (m *ManagerImpl) CreateCascadeTask(ctx context.Context, filePath string, actionID, signature string) (string, error) {
95+
// Create a detached context immediately to prevent HTTP request cancellation
96+
taskCtx, cancel := context.WithCancel(context.Background())
97+
9598
// First validate the action before creating the task
96-
action, err := m.validateAction(ctx, actionID)
99+
action, err := m.validateAction(taskCtx, actionID)
97100
if err != nil {
101+
cancel() // Clean up if validation fails
98102
return "", err
99103
}
100104

101105
// verify signature
102-
if err := m.validateSignature(ctx, action, signature); err != nil {
106+
if err := m.validateSignature(taskCtx, action, signature); err != nil {
107+
cancel() // Clean up if signature validation fails
103108
return "", err
104109
}
105110

106111
taskID := uuid.New().String()[:8]
107112

108-
m.logger.Debug(ctx, "Generated task ID", "taskID", taskID)
113+
m.logger.Debug(taskCtx, "Generated task ID", "taskID", taskID)
109114

110115
baseTask := BaseTask{
111116
TaskID: taskID,
@@ -122,23 +127,23 @@ func (m *ManagerImpl) CreateCascadeTask(ctx context.Context, filePath string, ac
122127
// Create cascade-specific task
123128
task := NewCascadeTask(baseTask, filePath, actionID)
124129

125-
// Store task in cache
126-
m.taskCache.Set(ctx, taskID, task, TaskTypeCascade, actionID)
130+
// Store task in cache with cancel function
131+
m.taskCache.Set(taskCtx, taskID, task, TaskTypeCascade, actionID, cancel)
127132

128133
// Ensure task is stored before returning
129134
m.taskCache.Wait()
130135

131136
go func() {
132-
m.logger.Debug(ctx, "Starting cascade task asynchronously", "taskID", taskID)
133-
err := task.Run(ctx)
137+
m.logger.Debug(taskCtx, "Starting cascade task asynchronously", "taskID", taskID)
138+
err := task.Run(taskCtx)
134139
if err != nil {
135140
// Error handling is done via events in the task.Run method
136141
// This is just a failsafe in case something goes wrong
137-
m.logger.Error(ctx, "Cascade task failed with error", "taskID", taskID, "error", err)
142+
m.logger.Error(taskCtx, "Cascade task failed with error", "taskID", taskID, "error", err)
138143
}
139144
}()
140145

141-
m.logger.Info(ctx, "Cascade task created successfully", "taskID", taskID)
146+
m.logger.Info(taskCtx, "Cascade task created successfully", "taskID", taskID)
142147
return taskID, nil
143148
}
144149

@@ -152,13 +157,19 @@ func (m *ManagerImpl) GetTask(ctx context.Context, taskID string) (*TaskEntry, b
152157
func (m *ManagerImpl) DeleteTask(ctx context.Context, taskID string) error {
153158
m.logger.Info(ctx, "Deleting task", "taskID", taskID)
154159

155-
// First check if the task exists
156-
_, exists := m.taskCache.Get(ctx, taskID)
160+
// First check if the task exists and get its entry
161+
taskEntry, exists := m.taskCache.Get(ctx, taskID)
157162
if !exists {
158163
m.logger.Warn(ctx, "Task not found for deletion", "taskID", taskID)
159164
return fmt.Errorf("task not found: %s", taskID)
160165
}
161166

167+
// Cancel the task if it has a cancel function
168+
if taskEntry.Cancel != nil {
169+
m.logger.Info(ctx, "Cancelling task before deletion", "taskID", taskID)
170+
taskEntry.Cancel()
171+
}
172+
162173
// Delete the task from the cache
163174
m.taskCache.Del(ctx, taskID)
164175

@@ -242,19 +253,25 @@ func (m *ManagerImpl) Close(ctx context.Context) {
242253
}
243254

244255
func (m *ManagerImpl) CreateDownloadTask(ctx context.Context, actionID string, outputDir string, signature string) (string, error) {
256+
// Create a detached context immediately to prevent HTTP request cancellation
257+
taskCtx, cancel := context.WithCancel(context.Background())
258+
245259
// First validate the action before creating the task
246-
action, err := m.validateDownloadAction(ctx, actionID)
260+
action, err := m.validateDownloadAction(taskCtx, actionID)
247261
if err != nil {
262+
cancel() // Clean up if validation fails
248263
return "", err
249264
}
250265
// Decode metadata to get the filename
251-
metadata, err := m.lumeraClient.DecodeCascadeMetadata(ctx, action)
266+
metadata, err := m.lumeraClient.DecodeCascadeMetadata(taskCtx, action)
252267
if err != nil {
268+
cancel() // Clean up if metadata decode fails
253269
return "", fmt.Errorf("failed to decode cascade metadata: %w", err)
254270
}
255271

256272
// Ensure we have a filename from metadata
257273
if metadata.FileName == "" {
274+
cancel() // Clean up if no filename
258275
return "", fmt.Errorf("no filename found in cascade metadata")
259276
}
260277

@@ -263,7 +280,7 @@ func (m *ManagerImpl) CreateDownloadTask(ctx context.Context, actionID string, o
263280

264281
taskID := uuid.New().String()[:8]
265282

266-
m.logger.Debug(ctx, "Generated download task ID", "task_id", taskID, "final_output_path", finalOutputPath)
283+
m.logger.Debug(taskCtx, "Generated download task ID", "task_id", taskID, "final_output_path", finalOutputPath)
267284

268285
baseTask := BaseTask{
269286
TaskID: taskID,
@@ -280,22 +297,22 @@ func (m *ManagerImpl) CreateDownloadTask(ctx context.Context, actionID string, o
280297
// Use the final output path with the correct filename
281298
task := NewCascadeDownloadTask(baseTask, actionID, finalOutputPath, signature)
282299

283-
// Store task in cache
284-
m.taskCache.Set(ctx, taskID, task, TaskTypeCascade, actionID)
300+
// Store task in cache with cancel function
301+
m.taskCache.Set(taskCtx, taskID, task, TaskTypeCascade, actionID, cancel)
285302

286303
// Ensure task is stored before returning
287304
m.taskCache.Wait()
288305

289306
go func() {
290-
m.logger.Debug(ctx, "Starting download cascade task asynchronously", "taskID", taskID)
291-
err := task.Run(ctx)
307+
m.logger.Debug(taskCtx, "Starting download cascade task asynchronously", "taskID", taskID)
308+
err := task.Run(taskCtx)
292309
if err != nil {
293310
// Error handling is done via events in the task.Run method
294311
// This is just a failsafe in case something goes wrong
295-
m.logger.Error(ctx, "Download Cascade task failed with error", "taskID", taskID, "error", err)
312+
m.logger.Error(taskCtx, "Download Cascade task failed with error", "taskID", taskID, "error", err)
296313
}
297314
}()
298315

299-
m.logger.Info(ctx, "Download Cascade task created successfully", "taskID", taskID, "outputPath", finalOutputPath)
316+
m.logger.Info(taskCtx, "Download Cascade task created successfully", "taskID", taskID, "outputPath", finalOutputPath)
300317
return taskID, nil
301318
}

supernode/node/action/server/cascade/cascade_action_server.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ func (server *ActionServer) Register(stream pb.CascadeService_RegisterServer) er
7676
ctx := stream.Context()
7777
logtrace.Info(ctx, "client streaming request to upload cascade input data received", fields)
7878

79+
const maxFileSize = 1 * 1024 * 1024 * 1024 // 1GB limit
80+
7981
var (
8082
metadata *pb.Metadata
8183
totalSize int
@@ -130,6 +132,14 @@ func (server *ActionServer) Register(stream pb.CascadeService_RegisterServer) er
130132
}
131133
totalSize += len(x.Chunk.Data)
132134

135+
// Validate total size doesn't exceed limit
136+
if totalSize > maxFileSize {
137+
fields[logtrace.FieldError] = "file size exceeds 1GB limit"
138+
fields["total_size"] = totalSize
139+
logtrace.Error(ctx, "upload rejected: file too large", fields)
140+
return fmt.Errorf("file size %d exceeds maximum allowed size of 1GB", totalSize)
141+
}
142+
133143
logtrace.Info(ctx, "received data chunk", logtrace.Fields{
134144
"chunk_size": len(x.Chunk.Data),
135145
"total_size_so_far": totalSize,

supernode/node/supernode/server/server.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,16 @@ func (server *Server) Run(ctx context.Context) error {
5858
logtrace.Fatal(ctx, "Failed to setup gRPC server", logtrace.Fields{logtrace.FieldModule: "server", logtrace.FieldError: err.Error()})
5959
}
6060

61-
// Custom server options
61+
// Optimized for streaming 1GB files with 4MB chunks (10 concurrent streams)
6262
opts := grpcserver.DefaultServerOptions()
6363

64-
opts.MaxRecvMsgSize = 2 * 1024 * 1024 * 1024 // 2 GB
65-
opts.MaxSendMsgSize = 2 * 1024 * 1024 * 1024 // 2 GB
66-
opts.InitialWindowSize = 32 * 1024 * 1024 // 32 MB
67-
opts.InitialConnWindowSize = 32 * 1024 * 1024 // 32 MB
68-
opts.WriteBufferSize = 1024 * 1024 // 1 MB
69-
opts.ReadBufferSize = 1024 * 1024 // 1 MB
64+
opts.MaxRecvMsgSize = (16 * 1024 * 1024) // 16MB (supports 4MB chunks + overhead)
65+
opts.MaxSendMsgSize = (16 * 1024 * 1024) // 16MB for download streaming
66+
opts.InitialWindowSize = (16 * 1024 * 1024) // 16MB per stream (4x chunk size)
67+
opts.InitialConnWindowSize = (160 * 1024 * 1024) // 160MB (16MB x 10 streams)
68+
opts.MaxConcurrentStreams = 20 // Limit to prevent resource exhaustion
69+
opts.ReadBufferSize = (8 * 1024 * 1024) // 8MB TCP buffer
70+
opts.WriteBufferSize = (8 * 1024 * 1024) // 8MB TCP buffer
7071

7172
for _, address := range addresses {
7273
addr := net.JoinHostPort(strings.TrimSpace(address), strconv.Itoa(server.config.Port))
@@ -110,20 +111,20 @@ func (server *Server) setupGRPCServer() error {
110111
for _, service := range server.services {
111112
server.grpcServer.RegisterService(service.Desc(), service)
112113
server.healthServer.SetServingStatus(service.Desc().ServiceName, healthpb.HealthCheckResponse_SERVING)
113-
114+
114115
// Keep reference to SupernodeServer
115116
if ss, ok := service.(*SupernodeServer); ok {
116117
supernodeServer = ss
117118
}
118119
}
119-
120+
120121
// After all services are registered, update SupernodeServer with the list
121122
if supernodeServer != nil {
122123
// Register all custom services
123124
for _, svc := range server.services {
124125
supernodeServer.RegisterService(svc.Desc().ServiceName, svc.Desc())
125126
}
126-
127+
127128
// Also register the health service
128129
healthDesc := healthpb.Health_ServiceDesc
129130
supernodeServer.RegisterService(healthDesc.ServiceName, &healthDesc)

0 commit comments

Comments
 (0)