Skip to content

Commit 99823f9

Browse files
Optimize grpc settings for file size
1 parent fe92b83 commit 99823f9

File tree

6 files changed

+63
-12
lines changed

6 files changed

+63
-12
lines changed

sdk/adapters/supernodeservice/adapter.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ func calculateOptimalChunkSize(fileSize int64) int {
7575
return chunkSize
7676
}
7777

78+
const maxFileSize = 1 * 1024 * 1024 * 1024 // 1GB limit
79+
7880
func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *CascadeSupernodeRegisterRequest, opts ...grpc.CallOption) (*CascadeSupernodeRegisterResponse, error) {
7981
// Create the client stream
8082
ctx = net.AddCorrelationID(ctx)
@@ -102,6 +104,15 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca
102104
}
103105
totalBytes := fileInfo.Size()
104106

107+
// Validate file size before starting upload
108+
if totalBytes > maxFileSize {
109+
a.logger.Error(ctx, "File exceeds maximum size limit",
110+
"filePath", in.FilePath,
111+
"fileSize", totalBytes,
112+
"maxSize", maxFileSize)
113+
return nil, fmt.Errorf("file size %d bytes exceeds maximum allowed size of 1GB", totalBytes)
114+
}
115+
105116
// Define adaptive chunk size based on file size
106117
chunkSize := calculateOptimalChunkSize(totalBytes)
107118

sdk/net/factory.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,17 @@ func NewClientFactory(ctx context.Context, logger log.Logger, keyring keyring.Ke
3636
logger.Debug(ctx, "Creating supernode client factory",
3737
"localAddress", config.LocalCosmosAddress)
3838

39+
// Optimized for streaming 1GB files with 4MB chunks (10 concurrent streams)
40+
opts := client.DefaultClientOptions()
41+
opts.MaxRecvMsgSize = 16 * 1024 * 1024 // 16MB to match server
42+
opts.MaxSendMsgSize = 16 * 1024 * 1024 // 16MB to match server
43+
opts.InitialWindowSize = 16 * 1024 * 1024 // 16MB per stream (4x chunk size)
44+
opts.InitialConnWindowSize = 160 * 1024 * 1024 // 160MB (16MB x 10 streams)
45+
3946
return &ClientFactory{
4047
logger: logger,
4148
keyring: keyring,
42-
clientOptions: client.DefaultClientOptions(),
49+
clientOptions: opts,
4350
config: config,
4451
lumeraClient: lumeraClient,
4552
}

sdk/task/cascade.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,14 @@ func NewCascadeTask(base BaseTask, filePath string, actionId string) *CascadeTas
3333

3434
// Run executes the full cascade‐task lifecycle.
3535
func (t *CascadeTask) Run(ctx context.Context) error {
36-
3736
t.LogEvent(ctx, event.SDKTaskStarted, "Running cascade task", nil)
3837

38+
// Validate file size before proceeding
39+
if err := ValidateFileSize(t.filePath); err != nil {
40+
t.LogEvent(ctx, event.SDKTaskFailed, "File validation failed", event.EventData{event.KeyError: err.Error()})
41+
return err
42+
}
43+
3944
// 1 - Fetch the supernodes
4045
supernodes, err := t.fetchSupernodes(ctx, t.Action.Height)
4146

sdk/task/helpers.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,29 @@ import (
44
"context"
55
"encoding/base64"
66
"fmt"
7+
"os"
78
"path/filepath"
89
"strings"
910

1011
"github.com/LumeraProtocol/supernode/sdk/adapters/lumera"
1112
)
1213

14+
const maxFileSize = 1 * 1024 * 1024 * 1024 // 1GB limit
15+
16+
// ValidateFileSize checks if a file size is within the allowed 1GB limit
17+
func ValidateFileSize(filePath string) error {
18+
fileInfo, err := os.Stat(filePath)
19+
if err != nil {
20+
return fmt.Errorf("failed to check file: %w", err)
21+
}
22+
23+
if fileInfo.Size() > maxFileSize {
24+
return fmt.Errorf("file size %d bytes exceeds maximum allowed size of 1GB", fileInfo.Size())
25+
}
26+
27+
return nil
28+
}
29+
1330
func (m *ManagerImpl) validateAction(ctx context.Context, actionID string) (lumera.Action, error) {
1431
action, err := m.lumeraClient.GetAction(ctx, actionID)
1532
if err != nil {

supernode/node/action/server/cascade/cascade_action_server.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ func (server *ActionServer) Register(stream pb.CascadeService_RegisterServer) er
7676
ctx := stream.Context()
7777
logtrace.Info(ctx, "client streaming request to upload cascade input data received", fields)
7878

79+
const maxFileSize = 1 * 1024 * 1024 * 1024 // 1GB limit
80+
7981
var (
8082
metadata *pb.Metadata
8183
totalSize int
@@ -130,6 +132,14 @@ func (server *ActionServer) Register(stream pb.CascadeService_RegisterServer) er
130132
}
131133
totalSize += len(x.Chunk.Data)
132134

135+
// Validate total size doesn't exceed limit
136+
if totalSize > maxFileSize {
137+
fields[logtrace.FieldError] = "file size exceeds 1GB limit"
138+
fields["total_size"] = totalSize
139+
logtrace.Error(ctx, "upload rejected: file too large", fields)
140+
return fmt.Errorf("file size %d exceeds maximum allowed size of 1GB", totalSize)
141+
}
142+
133143
logtrace.Info(ctx, "received data chunk", logtrace.Fields{
134144
"chunk_size": len(x.Chunk.Data),
135145
"total_size_so_far": totalSize,

supernode/node/supernode/server/server.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,16 @@ func (server *Server) Run(ctx context.Context) error {
5858
logtrace.Fatal(ctx, "Failed to setup gRPC server", logtrace.Fields{logtrace.FieldModule: "server", logtrace.FieldError: err.Error()})
5959
}
6060

61-
// Custom server options
61+
// Optimized for streaming 1GB files with 4MB chunks (10 concurrent streams)
6262
opts := grpcserver.DefaultServerOptions()
6363

64-
opts.MaxRecvMsgSize = 2 * 1024 * 1024 * 1024 // 2 GB
65-
opts.MaxSendMsgSize = 2 * 1024 * 1024 * 1024 // 2 GB
66-
opts.InitialWindowSize = 32 * 1024 * 1024 // 32 MB
67-
opts.InitialConnWindowSize = 32 * 1024 * 1024 // 32 MB
68-
opts.WriteBufferSize = 1024 * 1024 // 1 MB
69-
opts.ReadBufferSize = 1024 * 1024 // 1 MB
64+
opts.MaxRecvMsgSize = (16 * 1024 * 1024) // 16MB (supports 4MB chunks + overhead)
65+
opts.MaxSendMsgSize = (16 * 1024 * 1024) // 16MB for download streaming
66+
opts.InitialWindowSize = (16 * 1024 * 1024) // 16MB per stream (4x chunk size)
67+
opts.InitialConnWindowSize = (160 * 1024 * 1024) // 160MB (16MB x 10 streams)
68+
opts.MaxConcurrentStreams = 20 // Limit to prevent resource exhaustion
69+
opts.ReadBufferSize = (8 * 1024 * 1024) // 8MB TCP buffer
70+
opts.WriteBufferSize = (8 * 1024 * 1024) // 8MB TCP buffer
7071

7172
for _, address := range addresses {
7273
addr := net.JoinHostPort(strings.TrimSpace(address), strconv.Itoa(server.config.Port))
@@ -110,20 +111,20 @@ func (server *Server) setupGRPCServer() error {
110111
for _, service := range server.services {
111112
server.grpcServer.RegisterService(service.Desc(), service)
112113
server.healthServer.SetServingStatus(service.Desc().ServiceName, healthpb.HealthCheckResponse_SERVING)
113-
114+
114115
// Keep reference to SupernodeServer
115116
if ss, ok := service.(*SupernodeServer); ok {
116117
supernodeServer = ss
117118
}
118119
}
119-
120+
120121
// After all services are registered, update SupernodeServer with the list
121122
if supernodeServer != nil {
122123
// Register all custom services
123124
for _, svc := range server.services {
124125
supernodeServer.RegisterService(svc.Desc().ServiceName, svc.Desc())
125126
}
126-
127+
127128
// Also register the health service
128129
healthDesc := healthpb.Health_ServiceDesc
129130
supernodeServer.RegisterService(healthDesc.ServiceName, &healthDesc)

0 commit comments

Comments
 (0)