Skip to content

Commit 046c128

Browse files
j-rafique authored and mateeullahmalik committed
Self Healing
1 parent 810de25 commit 046c128

File tree

8 files changed

+582
-9
lines changed

8 files changed

+582
-9
lines changed

p2p/kademlia/dht.go

Lines changed: 8 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -210,12 +210,12 @@ func (s *DHT) eligibleForRouting(n *Node) bool {
210210
if integrationTestEnabled() {
211211
return true
212212
}
213-
// Bootstrap-safe behavior: until first non-empty chain allowlist arrives,
214-
// keep routing/read gating disabled to avoid accidental lockout.
213+
// Bootstrap fail-open: until the first successful allowlist sync, do not block routing.
214+
// This avoids startup isolation during transient chain/API failures.
215215
if !s.routingAllowReady.Load() {
216216
return true
217217
}
218-
// Once initialized, an empty active set means no routing-eligible peers.
218+
// Once initialized, an empty routing set means no routing-eligible peers.
219219
if s.routingAllowCount.Load() == 0 {
220220
return false
221221
}
@@ -274,8 +274,8 @@ func (s *DHT) filterEligibleNodes(nodes []*Node) []*Node {
274274
if integrationTestEnabled() {
275275
return nodes
276276
}
277-
// If the routing allowlist has not been initialized yet, keep gating disabled
278-
// but still sanitize malformed node entries.
277+
// Bootstrap fail-open: before first allowlist sync, keep routing candidates
278+
// but sanitize malformed node entries.
279279
if !s.routingAllowReady.Load() {
280280
out := nodes[:0]
281281
for _, n := range nodes {
@@ -286,7 +286,7 @@ func (s *DHT) filterEligibleNodes(nodes []*Node) []*Node {
286286
}
287287
return out
288288
}
289-
// Once initialized, empty means no routing-eligible peers.
289+
// Once initialized, an empty allowlist means no routing-eligible peers.
290290
if s.routingAllowCount.Load() == 0 {
291291
return nil
292292
}
@@ -514,6 +514,7 @@ func (s *DHT) Start(ctx context.Context) error {
514514
}
515515

516516
go s.StartReplicationWorker(ctx)
517+
go s.startRebalanceWorker(ctx)
517518
go s.startDisabledKeysCleanupWorker(ctx)
518519
// TEMPORARY: disabled to pause redundant-key classification into del_keys.
519520
// Re-enable once deletion-safety behavior is finalized.
@@ -2099,7 +2100,7 @@ func (s *DHT) addNode(ctx context.Context, node *Node) *Node {
20992100
node.SetHashedID()
21002101

21012102
// Chain-state routing gate (enabled after allowlist initialization):
2102-
// only chain-allowlisted peers may enter the routing table.
2103+
// only routing-eligible supernodes (Active + Postponed) can be admitted.
21032104
if !s.eligibleForRouting(node) {
21042105
logtrace.Debug(ctx, "Rejecting node: not eligible for routing", logtrace.Fields{
21052106
logtrace.FieldModule: "p2p",

p2p/kademlia/message.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ const (
3030
BatchFindNode
3131
// BatchGetValues finds values in kademlia network
3232
BatchGetValues
33+
// BatchProbeKeys checks local key presence/status without side effects
34+
BatchProbeKeys
3335
)
3436

3537
func init() {
@@ -49,6 +51,8 @@ func init() {
4951
gob.Register(&BatchFindNodeResponse{})
5052
gob.Register(&BatchGetValuesRequest{})
5153
gob.Register(&BatchGetValuesResponse{})
54+
gob.Register(&BatchProbeKeysRequest{})
55+
gob.Register(&BatchProbeKeysResponse{})
5256
}
5357

5458
type MessageWithError struct {
@@ -166,6 +170,17 @@ type BatchGetValuesResponse struct {
166170
Status ResponseStatus
167171
}
168172

173+
// BatchProbeKeysRequest defines the request data for side-effect-free local key probes.
174+
type BatchProbeKeysRequest struct {
175+
Keys []string // hex-encoded keys
176+
}
177+
178+
// BatchProbeKeysResponse defines the response data for local key probes.
179+
type BatchProbeKeysResponse struct {
180+
Status ResponseStatus
181+
Data map[string]LocalKeyStatus // key -> local status
182+
}
183+
169184
// encode the message
170185
func encode(message *Message) ([]byte, error) {
171186
var buf bytes.Buffer

p2p/kademlia/network.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func init() {
4747
FindValue: 5 * time.Second,
4848
BatchFindValues: 60 * time.Second, // responder compresses
4949
BatchGetValues: 75 * time.Second, // large, sometimes cloud fetch then send back
50+
BatchProbeKeys: 10 * time.Second,
5051
StoreData: 10 * time.Second,
5152
BatchStoreData: 75 * time.Second, // large uncompressed payloads
5253
Replicate: 90 * time.Second,
@@ -235,6 +236,37 @@ func (s *Network) handleFindValue(ctx context.Context, message *Message) (res []
235236
return s.encodeMesage(resMsg)
236237
}
237238

239+
func (s *Network) handleBatchProbeKeys(ctx context.Context, message *Message) (res []byte, err error) {
240+
defer func() {
241+
if response, err := s.handlePanic(ctx, message.Sender, BatchProbeKeys); response != nil || err != nil {
242+
res = response
243+
}
244+
}()
245+
246+
request, ok := message.Data.(*BatchProbeKeysRequest)
247+
if !ok {
248+
err := errors.New("invalid BatchProbeKeysRequest")
249+
return s.generateResponseMessage(ctx, BatchProbeKeys, message.Sender, ResultFailed, err.Error())
250+
}
251+
252+
// Keep routing table fresh while handling probe traffic.
253+
s.dht.addNode(ctx, message.Sender)
254+
255+
statuses, err := s.dht.store.RetrieveBatchLocalStatus(ctx, request.Keys)
256+
if err != nil {
257+
err = errors.Errorf("batch probe keys: %w", err)
258+
return s.generateResponseMessage(ctx, BatchProbeKeys, message.Sender, ResultFailed, err.Error())
259+
}
260+
261+
response := &BatchProbeKeysResponse{
262+
Status: ResponseStatus{Result: ResultOk},
263+
Data: statuses,
264+
}
265+
266+
resMsg := s.dht.newMessage(BatchProbeKeys, message.Sender, response)
267+
return s.encodeMesage(resMsg)
268+
}
269+
238270
func (s *Network) handleStoreData(ctx context.Context, message *Message) (res []byte, err error) {
239271
defer func() {
240272
if response, err := s.handlePanic(ctx, message.Sender, StoreData); response != nil || err != nil {
@@ -480,6 +512,10 @@ func (s *Network) handleConn(ctx context.Context, rawConn net.Conn) {
480512
response, hErr = s.withMetrics(BatchGetValues, func() ([]byte, error) {
481513
return s.handleGetValuesRequest(ctx, request, reqID)
482514
})
515+
case BatchProbeKeys:
516+
response, hErr = s.withMetrics(BatchProbeKeys, func() ([]byte, error) {
517+
return s.handleBatchProbeKeys(ctx, request)
518+
})
483519
default:
484520
// count unknown types as failure and return
485521
m := s.metricsFor(mt)
@@ -1321,6 +1357,8 @@ func (s *Network) generateResponseMessage(ctx context.Context, messageType int,
13211357
response = &ReplicateDataResponse{Status: responseStatus}
13221358
case BatchGetValues:
13231359
response = &BatchGetValuesResponse{Status: responseStatus}
1360+
case BatchProbeKeys:
1361+
response = &BatchProbeKeysResponse{Status: responseStatus}
13241362
default:
13251363
return nil, fmt.Errorf("unsupported message type %d", messageType)
13261364
}
@@ -1461,6 +1499,8 @@ func msgName(t int) string {
14611499
return "BatchStoreData"
14621500
case Replicate:
14631501
return "Replicate"
1502+
case BatchProbeKeys:
1503+
return "BatchProbeKeys"
14641504
default:
14651505
return fmt.Sprintf("Type_%d", t)
14661506
}
@@ -1501,7 +1541,7 @@ func (s *Network) HandleMetricsSnapshot() map[string]HandleCounters {
15011541
func readDeadlineFor(msgType int, overall time.Duration) time.Duration {
15021542
small := 10 * time.Second
15031543
switch msgType {
1504-
case Ping, FindNode, BatchFindNode, FindValue, StoreData, BatchStoreData:
1544+
case Ping, FindNode, BatchFindNode, FindValue, StoreData, BatchStoreData, BatchProbeKeys:
15051545
if overall > small+1*time.Second {
15061546
return small
15071547
}

0 commit comments

Comments
 (0)