Skip to content

Commit 9a1595a

Browse files
Add healthcheck (#47)
1 parent e2cc711 commit 9a1595a

File tree

3 files changed

+57
-52
lines changed

3 files changed

+57
-52
lines changed

p2p/kademlia/bootstrap.go

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -153,6 +153,7 @@ func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string
153153

154154
// Convert the map to a slice
155155
for _, node := range mapNodes {
156+
node.Port = node.Port + 1
156157
hID, _ := utils.Blake3Hash(node.ID)
157158
node.HashedID = hID
158159
fmt.Println("node adding", node.String(), "hashed id", string(node.HashedID))

sdk/task/cascade.go

Lines changed: 56 additions & 50 deletions
Original file line number | Diff line number | Diff line change
@@ -5,12 +5,16 @@ import (
55
"errors"
66
"fmt"
77
"strings"
8+
"sync"
89
"time"
910

11+
"github.com/LumeraProtocol/supernode/pkg/logtrace"
1012
"github.com/LumeraProtocol/supernode/sdk/adapters/lumera"
1113
"github.com/LumeraProtocol/supernode/sdk/adapters/supernodeservice"
1214
"github.com/LumeraProtocol/supernode/sdk/event"
1315
"github.com/LumeraProtocol/supernode/sdk/net"
16+
"golang.org/x/sync/errgroup"
17+
"google.golang.org/grpc/health/grpc_health_v1"
1418
)
1519

1620
const (
@@ -85,60 +89,62 @@ func (t *CascadeTask) fetchSupernodes(ctx context.Context, height int64) (lumera
8589
if err != nil {
8690
return nil, fmt.Errorf("fetch supernodes: %w", err)
8791
}
88-
// t.logger.Info(ctx, "Supernodes fetched", "count", len(sns))
89-
90-
// if len(sns) == 0 {
91-
// return nil, errors.New("no supernodes found")
92-
// }
93-
94-
// if len(sns) > 10 {
95-
// sns = sns[:10]
96-
// }
97-
98-
// // Keep only SERVING nodes (done in parallel – keeps latency flat)
99-
// healthy := make(lumera.Supernodes, 0, len(sns))
100-
// eg, ctx := errgroup.WithContext(ctx)
101-
// mu := sync.Mutex{}
102-
103-
// for _, sn := range sns {
104-
// sn := sn
105-
// eg.Go(func() error {
106-
// if t.isServing(ctx, sn) {
107-
// mu.Lock()
108-
// healthy = append(healthy, sn)
109-
// mu.Unlock()
110-
// }
111-
// return nil
112-
// })
113-
// }
114-
// if err := eg.Wait(); err != nil {
115-
// return nil, fmt.Errorf("health-check goroutines: %w", err)
116-
// }
117-
118-
// if len(healthy) == 0 {
119-
// return nil, errors.New("no healthy supernodes found")
120-
// }
121-
// t.logger.Info(ctx, "Healthy supernodes", "count", len(healthy))
92+
t.logger.Info(ctx, "Supernodes fetched", "count", len(sns))
93+
94+
if len(sns) == 0 {
95+
return nil, errors.New("no supernodes found")
96+
}
97+
98+
if len(sns) > 10 {
99+
sns = sns[:10]
100+
}
101+
102+
// Keep only SERVING nodes (done in parallel – keeps latency flat)
103+
healthy := make(lumera.Supernodes, 0, len(sns))
104+
eg, ctx := errgroup.WithContext(ctx)
105+
mu := sync.Mutex{}
106+
107+
for _, sn := range sns {
108+
sn := sn
109+
eg.Go(func() error {
110+
if t.isServing(ctx, sn) {
111+
mu.Lock()
112+
healthy = append(healthy, sn)
113+
mu.Unlock()
114+
}
115+
return nil
116+
})
117+
}
118+
if err := eg.Wait(); err != nil {
119+
return nil, fmt.Errorf("health-check goroutines: %w", err)
120+
}
121+
122+
if len(healthy) == 0 {
123+
return nil, errors.New("no healthy supernodes found")
124+
}
125+
t.logger.Info(ctx, "Healthy supernodes", "count", len(healthy))
122126

123127
return sns, nil
124128
}
125129

126-
// // isServing pings the super-node once with a short timeout.
127-
// func (t *CascadeTask) isServing(parent context.Context, sn lumera.Supernode) bool {
128-
// ctx, cancel := context.WithTimeout(parent, connectionTimeout)
129-
// defer cancel()
130-
131-
// client, err := net.NewClientFactory(ctx, t.logger, nil, net.FactoryConfig{
132-
// LocalCosmosAddress: t.config.Account.LocalCosmosAddress,
133-
// }).CreateClient(ctx, sn)
134-
// if err != nil {
135-
// logtrace
136-
// }
137-
// defer client.Close(ctx)
138-
139-
// resp, err := client.HealthCheck(ctx)
140-
// return err == nil && resp.Status == grpc_health_v1.HealthCheckResponse_SERVING
141-
// }
130+
// isServing pings the super-node once with a short timeout.
//
// It derives a child context from parent bounded by connectionTimeout, creates
// a client for sn through net.NewClientFactory(...).CreateClient, and issues a
// single HealthCheck call on it. It returns true only when that call returns
// no error and the response status is grpc_health_v1.HealthCheckResponse_SERVING.
// If the client cannot be created, the failure is logged and false is returned.
131+
func (t *CascadeTask) isServing(parent context.Context, sn lumera.Supernode) bool {
132+
// Bound client creation and the health probe by connectionTimeout.
ctx, cancel := context.WithTimeout(parent, connectionTimeout)
133+
defer cancel()
134+
135+
// The factory receives the task's logger and keyring plus the local Cosmos
// address from the task configuration.
client, err := net.NewClientFactory(ctx, t.logger, t.keyring, net.FactoryConfig{
136+
LocalCosmosAddress: t.config.Account.LocalCosmosAddress,
137+
}).CreateClient(ctx, sn)
138+
// A node we cannot create a client for is treated as not serving.
// NOTE(review): neither err nor any identifier of sn is included in the
// logged fields, so this message cannot be traced to a node or a cause —
// consider adding them.
if err != nil {
139+
logtrace.Info(ctx, "Failed to create client for supernode", logtrace.Fields{
140+
logtrace.FieldMethod: "isServing"})
141+
return false
142+
}
143+
// Close the client when this function returns.
defer client.Close(ctx)
144+
145+
// resp.Status is only read when err is nil (short-circuit &&).
resp, err := client.HealthCheck(ctx)
146+
return err == nil && resp.Status == grpc_health_v1.HealthCheckResponse_SERVING
147+
}
142148

143149
func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lumera.Supernodes) error {
144150
factoryCfg := net.FactoryConfig{

supernode/services/cascade/register.go

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,6 @@ package cascade
33
import (
44
"context"
55
"fmt"
6-
"time"
76

87
"github.com/LumeraProtocol/supernode/pkg/logtrace"
98
)
@@ -88,7 +87,6 @@ func (task *CascadeRegistrationTask) Register(ctx context.Context, req *Register
8887
}
8988

9089
/* 9. Persist artefacts -------------------------------------------------------- */
91-
time.Sleep(time.Second * 30)
9290
if err := task.storeArtefacts(ctx, rqidResp.RedundantMetadataFiles, encResp.SymbolsDir, fields); err != nil {
9391
return nil, err
9492
}

0 commit comments

Comments
 (0)