Skip to content

Commit 0b1f00b

Browse files
updates
1 parent b9acf47 commit 0b1f00b

File tree

7 files changed

+103
-78
lines changed

7 files changed

+103
-78
lines changed

sdk/task/cascade.go

Lines changed: 0 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,13 @@ package task
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
7-
"sync"
86
"time"
97

10-
"github.com/LumeraProtocol/supernode/pkg/logtrace"
118
"github.com/LumeraProtocol/supernode/sdk/adapters/lumera"
129
"github.com/LumeraProtocol/supernode/sdk/adapters/supernodeservice"
1310
"github.com/LumeraProtocol/supernode/sdk/event"
1411
"github.com/LumeraProtocol/supernode/sdk/net"
15-
"golang.org/x/sync/errgroup"
16-
"google.golang.org/grpc/health/grpc_health_v1"
1712
)
1813

1914
const (
@@ -63,66 +58,6 @@ func (t *CascadeTask) Run(ctx context.Context) error {
6358
return nil
6459
}
6560

66-
func (t *CascadeTask) fetchSupernodes(ctx context.Context, height int64) (lumera.Supernodes, error) {
67-
sns, err := t.client.GetSupernodes(ctx, height)
68-
if err != nil {
69-
return nil, fmt.Errorf("fetch supernodes: %w", err)
70-
}
71-
72-
if len(sns) == 0 {
73-
return nil, errors.New("no supernodes found")
74-
}
75-
76-
if len(sns) > 10 {
77-
sns = sns[:10]
78-
}
79-
80-
// Keep only SERVING nodes (done in parallel – keeps latency flat)
81-
healthy := make(lumera.Supernodes, 0, len(sns))
82-
eg, ctx := errgroup.WithContext(ctx)
83-
mu := sync.Mutex{}
84-
85-
for _, sn := range sns {
86-
sn := sn
87-
eg.Go(func() error {
88-
if t.isServing(ctx, sn) {
89-
mu.Lock()
90-
healthy = append(healthy, sn)
91-
mu.Unlock()
92-
}
93-
return nil
94-
})
95-
}
96-
if err := eg.Wait(); err != nil {
97-
return nil, fmt.Errorf("health-check goroutines: %w", err)
98-
}
99-
100-
if len(healthy) == 0 {
101-
return nil, errors.New("no healthy supernodes found")
102-
}
103-
104-
return healthy, nil
105-
}
106-
107-
// isServing pings the super-node once with a short timeout.
108-
func (t *CascadeTask) isServing(parent context.Context, sn lumera.Supernode) bool {
109-
ctx, cancel := context.WithTimeout(parent, connectionTimeout)
110-
defer cancel()
111-
112-
client, err := net.NewClientFactory(ctx, t.logger, t.keyring, t.client, net.FactoryConfig{
113-
LocalCosmosAddress: t.config.Account.LocalCosmosAddress,
114-
PeerType: t.config.Account.PeerType,
115-
}).CreateClient(ctx, sn)
116-
if err != nil {
117-
logtrace.Info(ctx, "Failed to create client for supernode", logtrace.Fields{logtrace.FieldMethod: "isServing"})
118-
return false
119-
}
120-
defer client.Close(ctx)
121-
122-
resp, err := client.HealthCheck(ctx)
123-
return err == nil && resp.Status == grpc_health_v1.HealthCheckResponse_SERVING
124-
}
125-
12661
func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lumera.Supernodes) error {
12762
factoryCfg := net.FactoryConfig{
12863
LocalCosmosAddress: t.config.Account.LocalCosmosAddress,

sdk/task/download.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
// task/download.go (NEW FILE)
21
package task
32

43
import (
@@ -18,12 +17,17 @@ const (
1817
)
1918

2019
type CascadeDownloadTask struct {
21-
CascadeTask
20+
BaseTask
21+
actionId string
22+
outputPath string
2223
}
2324

24-
// NewCascadeDownloadTask creates a new cascade download task
25-
func NewCascadeDownloadTask(cascadeTask *CascadeTask) *CascadeDownloadTask {
26-
return &CascadeDownloadTask{CascadeTask: *cascadeTask}
25+
func NewCascadeDownloadTask(base BaseTask, actionId string, outputPath string) *CascadeDownloadTask {
26+
return &CascadeDownloadTask{
27+
BaseTask: base,
28+
actionId: actionId,
29+
outputPath: outputPath,
30+
}
2731
}
2832

2933
func (t *CascadeDownloadTask) Run(ctx context.Context) error {
@@ -56,7 +60,9 @@ func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supern
5660
clientFactory := net.NewClientFactory(ctx, t.logger, t.keyring, t.client, factoryCfg)
5761

5862
req := &supernodeservice.CascadeSupernodeDownloadRequest{
59-
ActionID: t.actionId,
63+
ActionID: t.actionId,
64+
TaskID: t.TaskID,
65+
OutputPath: t.outputPath,
6066
}
6167

6268
var lastErr error

sdk/task/helpers.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,8 @@ func (m *ManagerImpl) validateDownloadAction(ctx context.Context, actionID strin
9292
return lumera.Action{}, fmt.Errorf("no action found with the specified ID")
9393
}
9494

95-
// Check action state
9695
if action.State != lumera.ACTION_STATE_DONE {
97-
return lumera.Action{}, fmt.Errorf("action is in %s state, expected PENDING", action.State)
96+
return lumera.Action{}, fmt.Errorf("action is in %s state, expected DONE", action.State)
9897
}
9998

10099
return action, nil

sdk/task/manager.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ func (m *ManagerImpl) CreateDownloadTask(
243243

244244
taskID := uuid.New().String()[:8]
245245

246-
m.logger.Debug(ctx, "Generated task ID", "task_id", taskID)
246+
m.logger.Debug(ctx, "Generated download task ID", "task_id", taskID)
247247

248248
baseTask := BaseTask{
249249
TaskID: taskID,
@@ -257,8 +257,7 @@ func (m *ManagerImpl) CreateDownloadTask(
257257
logger: m.logger,
258258
}
259259

260-
// Create cascade-specific task
261-
task := NewCascadeDownloadTask(NewCascadeTask(baseTask, outputPath, actionID))
260+
task := NewCascadeDownloadTask(baseTask, actionID, outputPath)
262261

263262
// Store task in cache
264263
m.taskCache.Set(ctx, taskID, task, TaskTypeCascade, actionID)

sdk/task/task.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,18 @@ package task
22

33
import (
44
"context"
5+
"errors"
6+
"fmt"
7+
"sync"
58

9+
"github.com/LumeraProtocol/supernode/pkg/errgroup"
10+
"github.com/LumeraProtocol/supernode/pkg/logtrace"
611
"github.com/LumeraProtocol/supernode/sdk/adapters/lumera"
712
"github.com/LumeraProtocol/supernode/sdk/config"
813
"github.com/LumeraProtocol/supernode/sdk/event"
914
"github.com/LumeraProtocol/supernode/sdk/log"
15+
"github.com/LumeraProtocol/supernode/sdk/net"
16+
"google.golang.org/grpc/health/grpc_health_v1"
1017

1118
"github.com/cosmos/cosmos-sdk/crypto/keyring"
1219
)
@@ -67,3 +74,63 @@ func (t *BaseTask) LogEvent(ctx context.Context, evt event.EventType, msg string
6774
t.logger.Info(ctx, msg, kvs...)
6875
t.emitEvent(ctx, evt, additionalInfo)
6976
}
77+
78+
func (t *BaseTask) fetchSupernodes(ctx context.Context, height int64) (lumera.Supernodes, error) {
79+
sns, err := t.client.GetSupernodes(ctx, height)
80+
if err != nil {
81+
return nil, fmt.Errorf("fetch supernodes: %w", err)
82+
}
83+
84+
if len(sns) == 0 {
85+
return nil, errors.New("no supernodes found")
86+
}
87+
88+
if len(sns) > 10 {
89+
sns = sns[:10]
90+
}
91+
92+
// Keep only SERVING nodes (done in parallel – keeps latency flat)
93+
healthy := make(lumera.Supernodes, 0, len(sns))
94+
eg, ctx := errgroup.WithContext(ctx)
95+
mu := sync.Mutex{}
96+
97+
for _, sn := range sns {
98+
sn := sn
99+
eg.Go(func() error {
100+
if t.isServing(ctx, sn) {
101+
mu.Lock()
102+
healthy = append(healthy, sn)
103+
mu.Unlock()
104+
}
105+
return nil
106+
})
107+
}
108+
if err := eg.Wait(); err != nil {
109+
return nil, fmt.Errorf("health-check goroutines: %w", err)
110+
}
111+
112+
if len(healthy) == 0 {
113+
return nil, errors.New("no healthy supernodes found")
114+
}
115+
116+
return healthy, nil
117+
}
118+
119+
// isServing pings the super-node once with a short timeout.
120+
func (t *BaseTask) isServing(parent context.Context, sn lumera.Supernode) bool {
121+
ctx, cancel := context.WithTimeout(parent, connectionTimeout)
122+
defer cancel()
123+
124+
client, err := net.NewClientFactory(ctx, t.logger, t.keyring, t.client, net.FactoryConfig{
125+
LocalCosmosAddress: t.config.Account.LocalCosmosAddress,
126+
PeerType: t.config.Account.PeerType,
127+
}).CreateClient(ctx, sn)
128+
if err != nil {
129+
logtrace.Info(ctx, "Failed to create client for supernode", logtrace.Fields{logtrace.FieldMethod: "isServing"})
130+
return false
131+
}
132+
defer client.Close(ctx)
133+
134+
resp, err := client.HealthCheck(ctx)
135+
return err == nil && resp.Status == grpc_health_v1.HealthCheckResponse_SERVING
136+
}

supernode/services/cascade/download.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,15 +73,25 @@ func (task *CascadeRegistrationTask) Download(
7373
}
7474

7575
func (task *CascadeRegistrationTask) downloadArtifacts(ctx context.Context, actionID string, metadata actiontypes.CascadeMetadata, fields logtrace.Fields) ([]byte, error) {
76+
77+
logtrace.Info(ctx, "downloading artifacts", fields)
7678
var layout codec.Layout
7779
for _, rqID := range metadata.RqIdsIds {
7880
rqIDFile, err := task.P2PClient.Retrieve(ctx, rqID)
81+
logtrace.Info(ctx, "RQIs", logtrace.Fields{
82+
"rqID": rqID,
83+
"rqIDFile": rqIDFile,
84+
})
7985
if err != nil || len(rqIDFile) == 0 {
8086
continue
8187
}
8288

8389
layout, _, _, err = parseRQMetadataFile(rqIDFile)
8490

91+
logtrace.Info(ctx, "layout parsed", logtrace.Fields{
92+
"rqID": rqID,
93+
"layout": layout})
94+
8595
if len(layout.Blocks) == 0 {
8696
logtrace.Info(ctx, "no symbols found in RQ metadata", fields)
8797
continue

tests/system/e2e_cascade_test.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -364,8 +364,8 @@ func TestCascadeE2E(t *testing.T) {
364364
sut.AwaitNextBlock(t)
365365

366366
// Verify the account can be queried with its public key
367-
accountResp := cli.CustomQuery("q", "auth", "account", userAddress)
368-
require.Contains(t, accountResp, "public_key", "User account public key should be available")
367+
// accountResp := cli.CustomQuery("q", "auth", "account", userAddress)
368+
// require.Contains(t, accountResp, "public_key", "User account public key should be available")
369369

370370
// Extract transaction hash from response for verification
371371
txHash := txresp.TxHash
@@ -557,6 +557,15 @@ func TestCascadeE2E(t *testing.T) {
557557
require.NotEmpty(t, fromAddress, "Spender address should not be empty")
558558
require.NotEmpty(t, toAddress, "Receiver address should not be empty")
559559
require.Equal(t, price, amount, "Payment amount should match action price")
560+
time.Sleep(1 * time.Minute)
561+
562+
// Try to download the file using the action ID
563+
dtaskID, err := actionClient.DownloadCascade(ctx, actionID, "")
564+
565+
t.Logf("Download response: %s", dtaskID)
566+
require.NoError(t, err, "Failed to download cascade data using action ID")
567+
568+
time.Sleep(30 * time.Second) // Wait to ensure all events are processed
560569

561570
t.Log("Test completed successfully!")
562571
}

0 commit comments

Comments
 (0)