Skip to content

Commit 36acc70

Browse files
feat: enhance supernode functionality and improve installation script
- Added optional parameters for Lumera binary path and installation mode in Makefile. - Refactored GetSupernodeStatus to return a pointer type for better handling. - Improved error handling and logging in health check and download processes. - Implemented concurrent download attempts from supernodes for efficiency. - Updated installation script to verify binary installation and handle different modes. - Enhanced tests to validate supernode status and downloaded files.
1 parent 8238151 commit 36acc70

File tree

14 files changed

+356
-138
lines changed

14 files changed

+356
-138
lines changed

Makefile

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,16 @@ CONFIG_FILE3=tests/system/config.test-3.yml
6161
SETUP_SCRIPT=tests/scripts/setup-supernodes.sh
6262

6363
# Install Lumera
64+
# Optional: specify lumera binary path to skip download
65+
LUMERAD_BINARY ?=
66+
# Optional: specify installation mode (latest-release, latest-tag, or vX.Y.Z)
67+
INSTALL_MODE ?=latest-tag
68+
6469
install-lumera:
6570
@echo "Installing Lumera..."
6671
@chmod +x tests/scripts/install-lumera.sh
67-
@sudo tests/scripts/install-lumera.sh latest-tag
68-
72+
@sudo LUMERAD_BINARY="$(LUMERAD_BINARY)" tests/scripts/install-lumera.sh $(INSTALL_MODE)
73+
@echo "PtTDUHythfRfXHh63yzyiGDid4TZj2P76Zd,18749999981413" > ~/claims.csv
6974
# Setup supernode environments
7075
setup-supernodes:
7176
@echo "Setting up all supernode environments..."

p2p/kademlia/bootstrap.go

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package kademlia
33
import (
44
"context"
55
"fmt"
6-
"os"
76
"strconv"
87
"strings"
98
"sync"
@@ -34,11 +33,6 @@ func (s *DHT) parseNode(extP2P string, selfAddr string) (*Node, error) {
3433
return nil, errors.New("empty address")
3534
}
3635

37-
/*if strings.Contains(extP2P, "0.0.0.0") {
38-
fmt.Println("skippping node")
39-
return nil, errors.New("invalid address")
40-
}*/
41-
4236
if extP2P == selfAddr {
4337
return nil, errors.New("self address")
4438
}
@@ -61,24 +55,12 @@ func (s *DHT) parseNode(extP2P string, selfAddr string) (*Node, error) {
6155
if err != nil {
6256
return nil, errors.New("invalid port number")
6357
}
64-
65-
// For system testing, use port+1 if SYSTEM_TEST=true
66-
if os.Getenv("SYSTEM_TEST") == "true" {
67-
port = uint16(portNum) + 1
68-
logtrace.Info(context.Background(), "Using port+1 for system testing", logtrace.Fields{
69-
logtrace.FieldModule: "p2p",
70-
"original_port": portNum,
71-
"adjusted_port": port,
72-
})
73-
} else {
74-
// For normal P2P operation, always use the default port
75-
port = defaultNetworkPort
76-
}
58+
port = uint16(portNum)
7759
}
7860
} else {
7961
// No port in the address
8062
ip = extP2P
81-
port = defaultNetworkPort
63+
port = defaultSuperNodeP2PPort
8264
}
8365

8466
if ip == "" {
@@ -170,21 +152,40 @@ func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string
170152
continue
171153
}
172154

173-
// Parse the node from the IP address
174-
node, err := s.parseNode(latestIP, selfAddress)
155+
// Extract IP from the address (remove port if present)
156+
var ip string
157+
if idx := strings.LastIndex(latestIP, ":"); idx != -1 {
158+
ip = latestIP[:idx]
159+
} else {
160+
ip = latestIP
161+
}
162+
163+
// Use p2p_port from supernode record
164+
p2pPort := defaultSuperNodeP2PPort
165+
if supernode.P2PPort != "" {
166+
if port, err := strconv.ParseUint(supernode.P2PPort, 10, 16); err == nil {
167+
p2pPort = int(port)
168+
}
169+
}
170+
171+
// Create full address with p2p port for validation
172+
fullAddress := fmt.Sprintf("%s:%d", ip, p2pPort)
173+
174+
// Parse the node from the full address
175+
node, err := s.parseNode(fullAddress, selfAddress)
175176
if err != nil {
176177
logtrace.Warn(ctx, "Skip Bad Bootstrap Address", logtrace.Fields{
177178
logtrace.FieldModule: "p2p",
178179
logtrace.FieldError: err.Error(),
179-
"address": latestIP,
180+
"address": fullAddress,
180181
"supernode": supernode.SupernodeAccount,
181182
})
182183
continue
183184
}
184185

185186
// Store the supernode account as the node ID
186187
node.ID = []byte(supernode.SupernodeAccount)
187-
mapNodes[latestIP] = node
188+
mapNodes[fullAddress] = node
188189
}
189190

190191
// Convert the map to a slice

sdk/action/client.go

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,16 @@ import (
1919
//
2020
//go:generate mockery --name=Client --output=testutil/mocks --outpkg=mocks --filename=client_mock.go
2121
type Client interface {
22-
// - signature: Base64-encoded cryptographic signature of the file's data hash (blake3)
23-
// 1- hash(blake3) > 2- sign > 3- base64
24-
// The signature must be created by the same account that created the Lumera action.
25-
// It must be a digital signature of the data hash found in the action's CASCADE metadata.
22+
// StartCascade initiates a cascade operation with file path, action ID, and signature
23+
// signature: Base64-encoded signature of file's blake3 hash by action creator
2624
StartCascade(ctx context.Context, filePath string, actionID string, signature string) (string, error)
2725
DeleteTask(ctx context.Context, taskID string) error
2826
GetTask(ctx context.Context, taskID string) (*task.TaskEntry, bool)
2927
SubscribeToEvents(ctx context.Context, eventType event.EventType, handler event.Handler) error
3028
SubscribeToAllEvents(ctx context.Context, handler event.Handler) error
31-
GetSupernodeStatus(ctx context.Context, supernodeAddress string) (supernodeservice.SupernodeStatusresponse, error)
32-
DownloadCascade(ctx context.Context, actionID, outputPath string) (string, error)
29+
GetSupernodeStatus(ctx context.Context, supernodeAddress string) (*supernodeservice.SupernodeStatusresponse, error)
30+
// DownloadCascade downloads cascade to outputDir, filename determined by action ID
31+
DownloadCascade(ctx context.Context, actionID, outputDir string) (string, error)
3332
}
3433

3534
// ClientImpl implements the Client interface
@@ -152,7 +151,7 @@ func (c *ClientImpl) SubscribeToAllEvents(ctx context.Context, handler event.Han
152151
}
153152

154153
// GetSupernodeStatus retrieves the status of a specific supernode by its address
155-
func (c *ClientImpl) GetSupernodeStatus(ctx context.Context, supernodeAddress string) (supernodeservice.SupernodeStatusresponse, error) {
154+
func (c *ClientImpl) GetSupernodeStatus(ctx context.Context, supernodeAddress string) (*supernodeservice.SupernodeStatusresponse, error) {
156155
if supernodeAddress == "" {
157156
c.logger.Error(ctx, "Empty supernode address provided")
158157
return nil, fmt.Errorf("supernode address cannot be empty")
@@ -205,16 +204,14 @@ func (c *ClientImpl) GetSupernodeStatus(ctx context.Context, supernodeAddress st
205204
c.logger.Info(ctx, "Successfully retrieved supernode status", "address", supernodeAddress)
206205
return status, nil
207206
}
208-
func (c *ClientImpl) DownloadCascade(
209-
ctx context.Context,
210-
actionID, outputPath string,
211-
) (string, error) {
207+
208+
func (c *ClientImpl) DownloadCascade(ctx context.Context, actionID, outputDir string) (string, error) {
212209

213210
if actionID == "" {
214211
return "", fmt.Errorf("actionID is empty")
215212
}
216213

217-
taskID, err := c.taskManager.CreateDownloadTask(ctx, actionID, outputPath)
214+
taskID, err := c.taskManager.CreateDownloadTask(ctx, actionID, outputDir)
218215
if err != nil {
219216
return "", fmt.Errorf("create download task: %w", err)
220217
}

sdk/adapters/supernodeservice/adapter.go

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"io"
77
"os"
8+
"path/filepath"
89

910
"github.com/LumeraProtocol/supernode/gen/supernode/action/cascade"
1011
"github.com/LumeraProtocol/supernode/pkg/net"
@@ -168,12 +169,12 @@ func (a *cascadeAdapter) GetSupernodeStatus(ctx context.Context) (SupernodeStatu
168169
resp, err := a.client.HealthCheck(ctx, &cascade.HealthCheckRequest{})
169170
if err != nil {
170171
a.logger.Error(ctx, "Failed to get supernode status", "error", err)
171-
return nil, fmt.Errorf("failed to get supernode status: %w", err)
172+
return SupernodeStatusresponse{}, fmt.Errorf("failed to get supernode status: %w", err)
172173
}
173174

174175
a.logger.Debug(ctx, "Supernode status retrieved", "status", resp)
175176

176-
return resp, nil
177+
return *toSdkSupernodeStatus(resp), nil
177178
}
178179

179180
// CascadeSupernodeDownload downloads a file from a supernode gRPC stream
@@ -190,16 +191,20 @@ func (a *cascadeAdapter) CascadeSupernodeDownload(
190191
ActionId: in.ActionID,
191192
}, opts...)
192193
if err != nil {
193-
a.logger.Error(ctx, "failed to create download stream",
194-
"action_id", in.ActionID, "error", err)
194+
a.logger.Error(ctx, "failed to create download stream", "action_id", in.ActionID, "error", err)
195195
return nil, err
196196
}
197197

198198
// 2. Prepare destination file
199+
// Create directory structure if it doesn't exist
200+
if err := os.MkdirAll(filepath.Dir(in.OutputPath), 0755); err != nil {
201+
a.logger.Error(ctx, "failed to create output directory", "path", filepath.Dir(in.OutputPath), "error", err)
202+
return nil, fmt.Errorf("create output directory: %w", err)
203+
}
204+
199205
outFile, err := os.Create(in.OutputPath)
200206
if err != nil {
201-
a.logger.Error(ctx, "failed to create output file",
202-
"path", in.OutputPath, "error", err)
207+
a.logger.Error(ctx, "failed to create output file", "path", in.OutputPath, "error", err)
203208
return nil, fmt.Errorf("create output file: %w", err)
204209
}
205210
defer outFile.Close()
@@ -223,10 +228,7 @@ func (a *cascadeAdapter) CascadeSupernodeDownload(
223228

224229
// 3a. Progress / event message
225230
case *cascade.DownloadResponse_Event:
226-
a.logger.Info(ctx, "supernode event",
227-
"event_type", x.Event.EventType,
228-
"message", x.Event.Message,
229-
"action_id", in.ActionID)
231+
a.logger.Info(ctx, "supernode event", "event_type", x.Event.EventType, "message", x.Event.Message, "action_id", in.ActionID)
230232

231233
if in.EventLogger != nil {
232234
in.EventLogger(ctx, toSdkEvent(x.Event.EventType), x.Event.Message, event.EventData{
@@ -249,17 +251,11 @@ func (a *cascadeAdapter) CascadeSupernodeDownload(
249251
bytesWritten += int64(len(data))
250252
chunkIndex++
251253

252-
a.logger.Debug(ctx, "received chunk",
253-
"chunk_index", chunkIndex,
254-
"chunk_size", len(data),
255-
"bytes_written", bytesWritten)
254+
a.logger.Debug(ctx, "received chunk", "chunk_index", chunkIndex, "chunk_size", len(data), "bytes_written", bytesWritten)
256255
}
257256
}
258257

259-
a.logger.Info(ctx, "download complete",
260-
"bytes_written", bytesWritten,
261-
"path", in.OutputPath,
262-
"action_id", in.ActionID)
258+
a.logger.Info(ctx, "download complete", "bytes_written", bytesWritten, "path", in.OutputPath, "action_id", in.ActionID)
263259

264260
return &CascadeSupernodeDownloadResponse{
265261
Success: true,
@@ -299,3 +295,25 @@ func toSdkEvent(e cascade.SupernodeEventType) event.EventType {
299295
return event.SupernodeUnknown
300296
}
301297
}
298+
299+
func toSdkSupernodeStatus(resp *cascade.HealthCheckResponse) *SupernodeStatusresponse {
300+
result := &SupernodeStatusresponse{
301+
TasksInProgress: resp.TasksInProgress,
302+
}
303+
304+
// Convert CPU data
305+
if resp.Cpu != nil {
306+
result.CPU.Usage = resp.Cpu.Usage
307+
result.CPU.Remaining = resp.Cpu.Remaining
308+
}
309+
310+
// Convert Memory data
311+
if resp.Memory != nil {
312+
result.Memory.Total = resp.Memory.Total
313+
result.Memory.Used = resp.Memory.Used
314+
result.Memory.Available = resp.Memory.Available
315+
result.Memory.UsedPerc = resp.Memory.UsedPerc
316+
}
317+
318+
return result
319+
}

sdk/adapters/supernodeservice/types.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package supernodeservice
33
import (
44
"context"
55

6-
"github.com/LumeraProtocol/supernode/gen/supernode/action/cascade"
76
"google.golang.org/grpc"
87

98
"github.com/LumeraProtocol/supernode/sdk/event"
@@ -29,7 +28,19 @@ type CascadeSupernodeRegisterResponse struct {
2928
TxHash string
3029
}
3130

32-
type SupernodeStatusresponse *cascade.HealthCheckResponse
31+
type SupernodeStatusresponse struct {
32+
CPU struct {
33+
Usage string
34+
Remaining string
35+
}
36+
Memory struct {
37+
Total uint64
38+
Used uint64
39+
Available uint64
40+
UsedPerc float64
41+
}
42+
TasksInProgress []string
43+
}
3344
type CascadeSupernodeDownloadRequest struct {
3445
ActionID string
3546
TaskID string

sdk/net/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ type SupernodeClient interface {
1515
// HealthCheck performs a health check on the supernode
1616
HealthCheck(ctx context.Context) (*grpc_health_v1.HealthCheckResponse, error)
1717

18-
GetSupernodeStatus(ctx context.Context) (supernodeservice.SupernodeStatusresponse, error)
18+
GetSupernodeStatus(ctx context.Context) (*supernodeservice.SupernodeStatusresponse, error)
1919
// Download downloads the cascade action file
2020
Download(ctx context.Context, in *supernodeservice.CascadeSupernodeDownloadRequest, opts ...grpc.CallOption) (*supernodeservice.CascadeSupernodeDownloadResponse, error)
2121

sdk/net/impl.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,14 +129,14 @@ func (c *supernodeClient) HealthCheck(ctx context.Context) (*grpc_health_v1.Heal
129129
return resp, nil
130130
}
131131

132-
func (c *supernodeClient) GetSupernodeStatus(ctx context.Context) (supernodeservice.SupernodeStatusresponse, error) {
132+
func (c *supernodeClient) GetSupernodeStatus(ctx context.Context) (*supernodeservice.SupernodeStatusresponse, error) {
133133
resp, err := c.cascadeClient.GetSupernodeStatus(ctx)
134134
if err != nil {
135135
return nil, fmt.Errorf("failed to get supernode status: %w", err)
136136
}
137137

138138
c.logger.Debug(ctx, "Supernode status retrieved successfully")
139-
return resp, nil
139+
return &resp, nil
140140
}
141141

142142
// Download downloads the cascade action file

0 commit comments

Comments
 (0)