Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions p2p/kademlia/banlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kademlia
import (
"bytes"
"context"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -232,3 +233,46 @@ func NewBanList(ctx context.Context) *BanList {

return list
}

// BanSnapshot is a read-only, point-in-time view of a single banned node,
// as produced by BanList.Snapshot. It is safe to serialize (all fields are
// JSON-tagged) and does not alias the ban list's internal state.
type BanSnapshot struct {
	ID        string        `json:"id"`         // printable id
	IP        string        `json:"ip"`         // last seen ip
	Port      uint16        `json:"port"`       // last seen port
	Count     int           `json:"count"`      // failure count
	CreatedAt time.Time     `json:"created_at"` // first ban time
	Age       time.Duration `json:"age"`        // time since CreatedAt
}

// Snapshot returns a copy of the current ban list, sorted by failure count
// (highest first) and, for equal counts, by ban creation time (oldest first).
// If limit > 0 the result is truncated to at most limit entries; limit <= 0
// means no truncation. The returned slice does not alias internal state.
func (s *BanList) Snapshot(limit int) []BanSnapshot {
	s.mtx.RLock()

	n := len(s.Nodes)
	if n == 0 {
		s.mtx.RUnlock()
		return nil
	}

	// Capture "now" once so every entry's Age is measured against the same
	// instant within a single snapshot.
	now := time.Now()

	out := make([]BanSnapshot, 0, n)
	for _, b := range s.Nodes {
		idPrintable := string(b.ID)
		// If your IDs are binary, switch to base58:
		// idPrintable = base58.Encode(b.ID)
		out = append(out, BanSnapshot{
			ID:        idPrintable,
			IP:        b.IP,
			Port:      b.Port,
			Count:     b.count,
			CreatedAt: b.CreatedAt,
			Age:       now.Sub(b.CreatedAt).Round(time.Second),
		})
	}

	// Sorting and truncation operate on the local copy only; release the
	// read lock first to keep the critical section as short as possible.
	s.mtx.RUnlock()

	sort.Slice(out, func(i, j int) bool {
		if out[i].Count == out[j].Count {
			return out[i].CreatedAt.Before(out[j].CreatedAt)
		}
		return out[i].Count > out[j].Count
	})

	if limit > 0 && len(out) > limit {
		out = out[:limit]
	}
	return out
}
138 changes: 136 additions & 2 deletions p2p/kademlia/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type DHT struct {
ignorelist *BanList
replicationMtx sync.RWMutex
rqstore rqstore.Store
metrics DHTMetrics
}

// bootstrapIgnoreList seeds the in-memory ignore list with nodes that are
Expand Down Expand Up @@ -107,6 +108,17 @@ func (s *DHT) bootstrapIgnoreList(ctx context.Context) error {
}
return nil
}
// MetricsSnapshot returns a point-in-time copy of the DHT's internal metrics
// (rolling store/retrieve windows plus hot-path counters).
func (s *DHT) MetricsSnapshot() DHTMetricsSnapshot {
	return s.metrics.Snapshot()
}

// BanListSnapshot returns every entry currently in the ignore (ban) list,
// sorted by failure count; the 0 limit requests no truncation.
func (s *DHT) BanListSnapshot() []BanSnapshot {
	return s.ignorelist.Snapshot(0)
}

// ConnPoolSnapshot returns the connection pool's counter metrics keyed by
// metric name.
// NOTE(review): this reaches through s.network.connPool.metrics and assumes
// all three are non-nil once the DHT is constructed — confirm initialization
// order before exposing this on an endpoint.
func (s *DHT) ConnPoolSnapshot() map[string]int64 {
	return s.network.connPool.metrics.Snapshot()
}

// Options contains configuration options for the queries node
type Options struct {
Expand Down Expand Up @@ -442,10 +454,11 @@ func (s *DHT) Stats(ctx context.Context) (map[string]interface{}, error) {
return nil, err
}

dhtStats := map[string]interface{}{}
dhtStats := map[string]any{}
dhtStats["self"] = s.ht.self
dhtStats["peers_count"] = len(s.ht.nodes())
dhtStats["peers"] = s.ht.nodes()
dhtStats["network"] = s.network.HandleMetricsSnapshot()
dhtStats["database"] = dbStats

return dhtStats, nil
Expand Down Expand Up @@ -636,6 +649,7 @@ func (s *DHT) fetchAndAddLocalKeys(ctx context.Context, hexKeys []string, result
}

func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, txID string, localOnly ...bool) (result map[string][]byte, err error) {
start := time.Now()
result = make(map[string][]byte)
var resMap sync.Map
var foundLocalCount int32
Expand Down Expand Up @@ -750,6 +764,10 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
}

wg.Wait()

netFound := int(atomic.LoadInt32(&networkFound))
s.metrics.RecordBatchRetrieve(len(keys), int(required), int(foundLocalCount), netFound, time.Duration(time.Since(start).Milliseconds())) // NEW

return result, nil
}

Expand Down Expand Up @@ -1571,7 +1589,6 @@ func (s *DHT) addKnownNodes(ctx context.Context, nodes []*Node, knownNodes map[s
func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, id string) error {
globalClosestContacts := make(map[string]*NodeList)
knownNodes := make(map[string]*Node)
// contacted := make(map[string]bool)
hashes := make([][]byte, len(values))

logtrace.Info(ctx, "Iterate batch store begin", logtrace.Fields{
Expand Down Expand Up @@ -1639,6 +1656,8 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i
}

if requests > 0 {
s.metrics.RecordStoreSuccess(requests, successful)

successRate := float64(successful) / float64(requests) * 100
if successRate >= minimumDataStoreSuccessRate {
logtrace.Info(ctx, "Successful store operations", logtrace.Fields{
Expand All @@ -1655,6 +1674,7 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i
})
return fmt.Errorf("failed to achieve desired success rate, only: %.2f%% successful", successRate)
}

}

return fmt.Errorf("no store operations were performed")
Expand Down Expand Up @@ -1683,6 +1703,7 @@ func (s *DHT) batchStoreNetwork(ctx context.Context, values [][]byte, nodes map[
logtrace.FieldModule: "dht",
"node": node.String(),
})
s.metrics.IncHotPathBannedSkip()
continue
}

Expand Down Expand Up @@ -1717,6 +1738,7 @@ func (s *DHT) batchStoreNetwork(ctx context.Context, values [][]byte, nodes map[
if err != nil {
if !isLocalCancel(err) {
s.ignorelist.IncrementCount(receiver)
s.metrics.IncHotPathBanIncr()
}

logtrace.Info(ctx, "Network call batch store request failed", logtrace.Fields{
Expand Down Expand Up @@ -1768,6 +1790,7 @@ func (s *DHT) batchFindNode(ctx context.Context, payload [][]byte, nodes map[str
"node": node.String(),
"txid": txid,
})
s.metrics.IncHotPathBannedSkip()
continue
}

Expand All @@ -1791,6 +1814,7 @@ func (s *DHT) batchFindNode(ctx context.Context, payload [][]byte, nodes map[str
if err != nil {
if !isLocalCancel(err) {
s.ignorelist.IncrementCount(receiver)
s.metrics.IncHotPathBanIncr()
}

logtrace.Warn(ctx, "Batch find node network call request failed", logtrace.Fields{
Expand Down Expand Up @@ -1825,3 +1849,113 @@ func (s *DHT) addKnownNodesSafe(ctx context.Context, nodes []*Node, knownNodes m
s.addKnownNodes(ctx, nodes, knownNodes)
mu.Unlock()
}

// ---- DHT metrics -----------------------------------------------------------

// defaultMetricsWindow bounds the rolling event windows kept by DHTMetrics
// when maxWindow has not been set explicitly (~the last 48 events).
const defaultMetricsWindow = 48

// StoreSuccessPoint records the outcome of one batch-store round.
type StoreSuccessPoint struct {
	Time        time.Time `json:"time"`
	Requests    int       `json:"requests"`
	Successful  int       `json:"successful"`
	SuccessRate float64   `json:"success_rate"`
}

// BatchRetrievePoint records the outcome of one batch-retrieve call.
type BatchRetrievePoint struct {
	Time       time.Time     `json:"time"`
	Keys       int           `json:"keys"`
	Required   int           `json:"required"`
	FoundLocal int           `json:"found_local"`
	FoundNet   int           `json:"found_network"`
	Duration   time.Duration `json:"duration"`
}

// DHTMetricsSnapshot is a point-in-time, self-contained copy of DHTMetrics,
// safe to serialize and to read without synchronization.
type DHTMetricsSnapshot struct {
	// rolling windows (most recent first)
	StoreSuccessRecent  []StoreSuccessPoint  `json:"store_success_recent"`
	BatchRetrieveRecent []BatchRetrievePoint `json:"batch_retrieve_recent"`

	// hot path counters
	HotPathBannedSkips   int64 `json:"hot_path_banned_skips"`
	HotPathBanIncrements int64 `json:"hot_path_ban_increments"`
}

// DHTMetrics collects lightweight operational metrics for the DHT. The zero
// value is ready to use: window slices are lazily bounded on first use and
// the hot-path counters are updated atomically without taking the mutex.
type DHTMetrics struct {
	mu sync.Mutex // guards the window fields below

	// bounded windows (most recent first)
	storeSuccess  []StoreSuccessPoint
	batchRetrieve []BatchRetrievePoint
	maxWindow     int // 0 means "use defaultMetricsWindow"

	// hot path counters (atomic; never touched under mu)
	hotPathBannedSkips   atomic.Int64
	hotPathBanIncrements atomic.Int64
}

// ensureWindowLocked applies the default window bound. Callers must hold mu.
// Folding this into the callers' existing critical sections avoids the extra
// lock/unlock pair the old standalone init() paid on every operation.
func (m *DHTMetrics) ensureWindowLocked() {
	if m.maxWindow == 0 {
		m.maxWindow = defaultMetricsWindow
	}
}

// init lazily applies the default window bound. Retained for compatibility;
// the Record*/Snapshot methods now do this inside their own critical section.
func (m *DHTMetrics) init() {
	m.mu.Lock()
	m.ensureWindowLocked()
	m.mu.Unlock()
}

// trimStore drops the oldest store points beyond maxWindow (the slice is
// most-recent-first, so truncating the tail removes the oldest entries).
// Callers must hold mu.
func (m *DHTMetrics) trimStore() {
	if len(m.storeSuccess) > m.maxWindow {
		m.storeSuccess = m.storeSuccess[:m.maxWindow]
	}
}

// trimRetrieve drops the oldest retrieve points beyond maxWindow.
// Callers must hold mu.
func (m *DHTMetrics) trimRetrieve() {
	if len(m.batchRetrieve) > m.maxWindow {
		m.batchRetrieve = m.batchRetrieve[:m.maxWindow]
	}
}

// RecordStoreSuccess prepends one store outcome (req requests, succ of which
// succeeded) to the bounded rolling window. A zero req yields a 0% rate.
func (m *DHTMetrics) RecordStoreSuccess(req, succ int) {
	rate := 0.0
	if req > 0 {
		rate = (float64(succ) / float64(req)) * 100.0
	}
	point := StoreSuccessPoint{
		Time:        time.Now().UTC(),
		Requests:    req,
		Successful:  succ,
		SuccessRate: rate,
	}

	m.mu.Lock()
	m.ensureWindowLocked()
	m.storeSuccess = append([]StoreSuccessPoint{point}, m.storeSuccess...)
	m.trimStore()
	m.mu.Unlock()
}

// RecordBatchRetrieve prepends one batch-retrieve outcome to the bounded
// rolling window.
func (m *DHTMetrics) RecordBatchRetrieve(keys, required, foundLocal, foundNet int, dur time.Duration) {
	point := BatchRetrievePoint{
		Time:       time.Now().UTC(),
		Keys:       keys,
		Required:   required,
		FoundLocal: foundLocal,
		FoundNet:   foundNet,
		Duration:   dur,
	}

	m.mu.Lock()
	m.ensureWindowLocked()
	m.batchRetrieve = append([]BatchRetrievePoint{point}, m.batchRetrieve...)
	m.trimRetrieve()
	m.mu.Unlock()
}

// IncHotPathBannedSkip counts a banned node skipped on a hot path.
func (m *DHTMetrics) IncHotPathBannedSkip() { m.hotPathBannedSkips.Add(1) }

// IncHotPathBanIncr counts a ban-count increment issued on a hot path.
func (m *DHTMetrics) IncHotPathBanIncr() { m.hotPathBanIncrements.Add(1) }

// Snapshot returns a copy of the rolling windows and the current counter
// values. The returned slices do not alias internal state.
func (m *DHTMetrics) Snapshot() DHTMetricsSnapshot {
	m.mu.Lock()
	m.ensureWindowLocked()
	// shallow copies of bounded windows
	storeCopy := append([]StoreSuccessPoint(nil), m.storeSuccess...)
	retrCopy := append([]BatchRetrievePoint(nil), m.batchRetrieve...)
	m.mu.Unlock()

	return DHTMetricsSnapshot{
		StoreSuccessRecent:  storeCopy,
		BatchRetrieveRecent: retrCopy,
		HotPathBannedSkips:  m.hotPathBannedSkips.Load(),
		HotPathBanIncrements: m.hotPathBanIncrements.Load(),
	}
}
Loading