diff --git a/p2p/kademlia/bootstrap.go b/p2p/kademlia/bootstrap.go
index c88b8919..03d5e9ba 100644
--- a/p2p/kademlia/bootstrap.go
+++ b/p2p/kademlia/bootstrap.go
@@ -153,6 +153,8 @@ func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string
 
 	// Convert the map to a slice
 	for _, node := range mapNodes {
+		// NOTE(review): offsetting the advertised port by 1 for the P2P listener —
+		// confirm this matches the supernode port convention before merging.
+		node.Port = node.Port + 1
 		hID, _ := utils.Blake3Hash(node.ID)
 		node.HashedID = hID
 		fmt.Println("node adding", node.String(), "hashed id", string(node.HashedID))
diff --git a/sdk/task/cascade.go b/sdk/task/cascade.go
index d666574e..792fa887 100644
--- a/sdk/task/cascade.go
+++ b/sdk/task/cascade.go
@@ -5,12 +5,16 @@ import (
 	"errors"
 	"fmt"
 	"strings"
+	"sync"
 	"time"
 
+	"github.com/LumeraProtocol/supernode/pkg/logtrace"
 	"github.com/LumeraProtocol/supernode/sdk/adapters/lumera"
 	"github.com/LumeraProtocol/supernode/sdk/adapters/supernodeservice"
 	"github.com/LumeraProtocol/supernode/sdk/event"
 	"github.com/LumeraProtocol/supernode/sdk/net"
+	"golang.org/x/sync/errgroup"
+	"google.golang.org/grpc/health/grpc_health_v1"
 )
 
 const (
@@ -90,60 +94,62 @@ func (t *CascadeTask) fetchSupernodes(ctx context.Context, height int64) (lumera
 	if err != nil {
 		return nil, fmt.Errorf("fetch supernodes: %w", err)
 	}
-	// t.logger.Info(ctx, "Supernodes fetched", "count", len(sns))
-
-	// if len(sns) == 0 {
-	// 	return nil, errors.New("no supernodes found")
-	// }
-
-	// if len(sns) > 10 {
-	// 	sns = sns[:10]
-	// }
-
-	// // Keep only SERVING nodes (done in parallel – keeps latency flat)
-	// healthy := make(lumera.Supernodes, 0, len(sns))
-	// eg, ctx := errgroup.WithContext(ctx)
-	// mu := sync.Mutex{}
-
-	// for _, sn := range sns {
-	// 	sn := sn
-	// 	eg.Go(func() error {
-	// 		if t.isServing(ctx, sn) {
-	// 			mu.Lock()
-	// 			healthy = append(healthy, sn)
-	// 			mu.Unlock()
-	// 		}
-	// 		return nil
-	// 	})
-	// }
-	// if err := eg.Wait(); err != nil {
-	// 	return nil, fmt.Errorf("health-check goroutines: %w", err)
-	// }
-
-	// if len(healthy) == 0 {
-	// 	return nil, errors.New("no healthy supernodes found")
-	// }
-	// t.logger.Info(ctx, "Healthy supernodes", "count", len(healthy))
+	t.logger.Info(ctx, "Supernodes fetched", "count", len(sns))
+
+	if len(sns) == 0 {
+		return nil, errors.New("no supernodes found")
+	}
+
+	if len(sns) > 10 {
+		sns = sns[:10]
+	}
+
+	// Keep only SERVING nodes (done in parallel – keeps latency flat)
+	healthy := make(lumera.Supernodes, 0, len(sns))
+	eg, ctx := errgroup.WithContext(ctx)
+	mu := sync.Mutex{}
+
+	for _, sn := range sns {
+		sn := sn
+		eg.Go(func() error {
+			if t.isServing(ctx, sn) {
+				mu.Lock()
+				healthy = append(healthy, sn)
+				mu.Unlock()
+			}
+			return nil
+		})
+	}
+	if err := eg.Wait(); err != nil {
+		return nil, fmt.Errorf("health-check goroutines: %w", err)
+	}
+
+	if len(healthy) == 0 {
+		return nil, errors.New("no healthy supernodes found")
+	}
+	t.logger.Info(ctx, "Healthy supernodes", "count", len(healthy))
 
-	return sns, nil
+	// BUG FIX: return the filtered set — returning sns discarded the entire
+	// health check (and the 10-node cap lived on sns, not healthy).
+	return healthy, nil
 }
 
-// // isServing pings the super-node once with a short timeout.
-// func (t *CascadeTask) isServing(parent context.Context, sn lumera.Supernode) bool {
-// 	ctx, cancel := context.WithTimeout(parent, connectionTimeout)
-// 	defer cancel()
-
-// 	client, err := net.NewClientFactory(ctx, t.logger, nil, net.FactoryConfig{
-// 		LocalCosmosAddress: t.config.Account.LocalCosmosAddress,
-// 	}).CreateClient(ctx, sn)
-// 	if err != nil {
-// 		logtrace
-// 	}
-// 	defer client.Close(ctx)
-
-// 	resp, err := client.HealthCheck(ctx)
-// 	return err == nil && resp.Status == grpc_health_v1.HealthCheckResponse_SERVING
-// }
+// isServing pings the super-node once with a short timeout.
+func (t *CascadeTask) isServing(parent context.Context, sn lumera.Supernode) bool {
+	ctx, cancel := context.WithTimeout(parent, connectionTimeout)
+	defer cancel()
+
+	client, err := net.NewClientFactory(ctx, t.logger, t.keyring, net.FactoryConfig{
+		LocalCosmosAddress: t.config.Account.LocalCosmosAddress,
+	}).CreateClient(ctx, sn)
+	if err != nil {
+		logtrace.Info(ctx, "Failed to create client for supernode", logtrace.Fields{
+			logtrace.FieldMethod: "isServing"})
+		return false
+	}
+	defer client.Close(ctx)
+
+	resp, err := client.HealthCheck(ctx)
+	return err == nil && resp.Status == grpc_health_v1.HealthCheckResponse_SERVING
+}
 
 func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lumera.Supernodes) error {
 	factoryCfg := net.FactoryConfig{
diff --git a/supernode/services/cascade/register.go b/supernode/services/cascade/register.go
index f6f46542..003b9a6c 100644
--- a/supernode/services/cascade/register.go
+++ b/supernode/services/cascade/register.go
@@ -3,7 +3,6 @@ package cascade
 import (
 	"context"
 	"fmt"
-	"time"
 
 	"github.com/LumeraProtocol/supernode/pkg/logtrace"
 )
@@ -88,7 +87,6 @@ func (task *CascadeRegistrationTask) Register(ctx context.Context, req *Register
 	}
 
 	/* 9. Persist artefacts -------------------------------------------------------- */
-	time.Sleep(time.Second * 30)
 	if err := task.storeArtefacts(ctx, rqidResp.RedundantMetadataFiles, encResp.SymbolsDir, fields); err != nil {
 		return nil, err
 	}