diff --git a/p2p/kademlia/banlist.go b/p2p/kademlia/banlist.go
index 6a0ecf3d..553dc4da 100644
--- a/p2p/kademlia/banlist.go
+++ b/p2p/kademlia/banlist.go
@@ -3,6 +3,7 @@ package kademlia
 import (
 	"bytes"
 	"context"
+	"sort"
 	"strings"
 	"sync"
 	"time"
@@ -232,3 +233,46 @@ func NewBanList(ctx context.Context) *BanList {
 
 	return list
 }
+
+// BanSnapshot is a read-only view of one banned node.
+type BanSnapshot struct {
+	ID        string        `json:"id"`         // printable id
+	IP        string        `json:"ip"`         // last seen ip
+	Port      uint16        `json:"port"`       // last seen port
+	Count     int           `json:"count"`      // failure count
+	CreatedAt time.Time     `json:"created_at"` // first ban time
+	Age       time.Duration `json:"age"`        // time since CreatedAt
+}
+
+// Snapshot returns up to limit entries, sorted by failure count (desc),
+// then by first-ban time; limit <= 0 returns all entries.
+func (s *BanList) Snapshot(limit int) []BanSnapshot {
+	s.mtx.RLock()
+	defer s.mtx.RUnlock()
+
+	n := len(s.Nodes)
+	if n == 0 {
+		return nil
+	}
+	out := make([]BanSnapshot, 0, n)
+	for _, b := range s.Nodes {
+		idPrintable := string(b.ID)
+		// If IDs are binary, switch to base58:
+		// idPrintable = base58.Encode(b.ID)
+		out = append(out, BanSnapshot{
+			ID:        idPrintable,
+			IP:        b.IP,
+			Port:      b.Port,
+			Count:     b.count,
+			CreatedAt: b.CreatedAt,
+			Age:       time.Since(b.CreatedAt).Round(time.Second),
+		})
+	}
+	sort.Slice(out, func(i, j int) bool {
+		if out[i].Count == out[j].Count {
+			return out[i].CreatedAt.Before(out[j].CreatedAt)
+		}
+		return out[i].Count > out[j].Count
+	})
+	if limit > 0 && len(out) > limit {
+		out = out[:limit]
+	}
+	return out
+}
diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go
index a232beae..9f5fb3c9 100644
--- a/p2p/kademlia/dht.go
+++ b/p2p/kademlia/dht.go
@@ -63,6 +63,7 @@ type DHT struct {
 	ignorelist     *BanList
 	replicationMtx sync.RWMutex
 	rqstore        rqstore.Store
+	metrics        DHTMetrics
 }
 
 // bootstrapIgnoreList seeds the in-memory ignore list with nodes that are
@@ -107,6 +108,21 @@ func (s *DHT) bootstrapIgnoreList(ctx context.Context) error {
 	}
 	return nil
 }
+
+// MetricsSnapshot returns a point-in-time copy of the DHT's internal metrics.
+func (s *DHT) MetricsSnapshot() DHTMetricsSnapshot {
+	return s.metrics.Snapshot()
+}
+
+// BanListSnapshot returns the full ban list, most-banned first.
+func (s *DHT) BanListSnapshot() []BanSnapshot {
+	return s.ignorelist.Snapshot(0)
+}
+
+// ConnPoolSnapshot returns the connection pool's counter map.
+func (s *DHT) ConnPoolSnapshot() map[string]int64 {
+	return s.network.connPool.metrics.Snapshot()
+}
 
 // Options contains configuration options for the queries node
 type Options struct {
@@ -442,10 +454,11 @@ func (s *DHT) Stats(ctx context.Context) (map[string]interface{}, error) {
 		return nil, err
 	}
 
-	dhtStats := map[string]interface{}{}
+	dhtStats := map[string]any{}
 	dhtStats["self"] = s.ht.self
 	dhtStats["peers_count"] = len(s.ht.nodes())
 	dhtStats["peers"] = s.ht.nodes()
+	dhtStats["network"] = s.network.HandleMetricsSnapshot()
 	dhtStats["database"] = dbStats
 
 	return dhtStats, nil
@@ -636,6 +649,7 @@ func (s *DHT) fetchAndAddLocalKeys(ctx context.Context, hexKeys []string, result
 }
 
 func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, txID string, localOnly ...bool) (result map[string][]byte, err error) {
+	start := time.Now()
 	result = make(map[string][]byte)
 	var resMap sync.Map
 	var foundLocalCount int32
@@ -750,6 +764,10 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
 	}
 
 	wg.Wait()
+
+	netFound := int(atomic.LoadInt32(&networkFound))
+	s.metrics.RecordBatchRetrieve(len(keys), int(required), int(foundLocalCount), netFound, time.Since(start))
+
 	return result, nil
 }
@@ -1571,7 +1589,6 @@ func (s *DHT) addKnownNodes(ctx context.Context, nodes []*Node, knownNodes map[s
 
 func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, id string) error {
 	globalClosestContacts := make(map[string]*NodeList)
 	knownNodes := make(map[string]*Node)
-	// contacted := make(map[string]bool)
 	hashes := make([][]byte, len(values))
 
 	logtrace.Info(ctx, "Iterate batch store begin", logtrace.Fields{
@@ -1639,6 +1656,8 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i
 	}
 
 	if requests > 0 {
+		s.metrics.RecordStoreSuccess(requests, successful)
+
 		successRate := float64(successful) / float64(requests) * 100
 		if successRate >= minimumDataStoreSuccessRate {
 			logtrace.Info(ctx, "Successful store operations", logtrace.Fields{
@@ -1655,6 +1674,7 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i
 			})
 			return fmt.Errorf("failed to achieve desired success rate, only: %.2f%% successful", successRate)
 		}
+
 	}
 
 	return fmt.Errorf("no store operations were performed")
@@ -1683,6 +1703,7 @@ func (s *DHT) batchStoreNetwork(ctx context.Context, values [][]byte, nodes map[
 				logtrace.FieldModule: "dht",
 				"node":               node.String(),
 			})
+			s.metrics.IncHotPathBannedSkip()
 			continue
 		}
 
@@ -1717,6 +1738,7 @@ func (s *DHT) batchStoreNetwork(ctx context.Context, values [][]byte, nodes map[
 		if err != nil {
 			if !isLocalCancel(err) {
 				s.ignorelist.IncrementCount(receiver)
+				s.metrics.IncHotPathBanIncr()
 			}
 
 			logtrace.Info(ctx, "Network call batch store request failed", logtrace.Fields{
@@ -1768,6 +1790,7 @@ func (s *DHT) batchFindNode(ctx context.Context, payload [][]byte, nodes map[str
 				"node":               node.String(),
 				"txid":               txid,
 			})
+			s.metrics.IncHotPathBannedSkip()
 			continue
 		}
 
@@ -1791,6 +1814,7 @@ func (s *DHT) batchFindNode(ctx context.Context, payload [][]byte, nodes map[str
 		if err != nil {
 			if !isLocalCancel(err) {
 				s.ignorelist.IncrementCount(receiver)
+				s.metrics.IncHotPathBanIncr()
 			}
 
 			logtrace.Warn(ctx, "Batch find node network call request failed", logtrace.Fields{
@@ -1825,3 +1849,113 @@ func (s *DHT) addKnownNodesSafe(ctx context.Context, nodes []*Node, knownNodes m
 	s.addKnownNodes(ctx, nodes, knownNodes)
 	mu.Unlock()
 }
+
+// ---- DHT metrics -----------------------------------------------------------
+
+type StoreSuccessPoint struct {
+	Time        time.Time `json:"time"`
+	Requests    int       `json:"requests"`
+	Successful  int       `json:"successful"`
+	SuccessRate float64   `json:"success_rate"`
+}
+
+type BatchRetrievePoint struct {
+	Time       time.Time     `json:"time"`
+	Keys       int           `json:"keys"`
+	Required   int           `json:"required"`
+	FoundLocal int           `json:"found_local"`
+	FoundNet   int           `json:"found_network"`
+	Duration   time.Duration `json:"duration"`
+}
+
+type DHTMetricsSnapshot struct {
+	// rolling windows
+	StoreSuccessRecent  []StoreSuccessPoint  `json:"store_success_recent"`
+	BatchRetrieveRecent []BatchRetrievePoint `json:"batch_retrieve_recent"`
+
+	// hot path counters
+	HotPathBannedSkips   int64 `json:"hot_path_banned_skips"`
+	HotPathBanIncrements int64 `json:"hot_path_ban_increments"`
+}
+
+type DHTMetrics struct {
+	mu sync.Mutex
+
+	// bounded windows (most recent first)
+	storeSuccess  []StoreSuccessPoint
+	batchRetrieve []BatchRetrievePoint
+	maxWindow     int
+
+	// hot path counters
+	hotPathBannedSkips   atomic.Int64
+	hotPathBanIncrements atomic.Int64
+}
+
+func (m *DHTMetrics) init() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if m.maxWindow == 0 {
+		m.maxWindow = 48 // e.g. last 48 events (~several hours)
+	}
+}
+
+func (m *DHTMetrics) trimStore() {
+	if len(m.storeSuccess) > m.maxWindow {
+		m.storeSuccess = m.storeSuccess[:m.maxWindow]
+	}
+}
+func (m *DHTMetrics) trimRetrieve() {
+	if len(m.batchRetrieve) > m.maxWindow {
+		m.batchRetrieve = m.batchRetrieve[:m.maxWindow]
+	}
+}
+
+func (m *DHTMetrics) RecordStoreSuccess(req, succ int) {
+	m.init()
+	rate := 0.0
+	if req > 0 {
+		rate = (float64(succ) / float64(req)) * 100.0
+	}
+	m.mu.Lock()
+	m.storeSuccess = append([]StoreSuccessPoint{{
+		Time:        time.Now().UTC(),
+		Requests:    req,
+		Successful:  succ,
+		SuccessRate: rate,
+	}}, m.storeSuccess...)
+	m.trimStore()
+	m.mu.Unlock()
+}
+
+func (m *DHTMetrics) RecordBatchRetrieve(keys, required, foundLocal, foundNet int, dur time.Duration) {
+	m.init()
+	m.mu.Lock()
+	m.batchRetrieve = append([]BatchRetrievePoint{{
+		Time:       time.Now().UTC(),
+		Keys:       keys,
+		Required:   required,
+		FoundLocal: foundLocal,
+		FoundNet:   foundNet,
+		Duration:   dur,
+	}}, m.batchRetrieve...)
+	m.trimRetrieve()
+	m.mu.Unlock()
+}
+
+func (m *DHTMetrics) IncHotPathBannedSkip() { m.hotPathBannedSkips.Add(1) }
+func (m *DHTMetrics) IncHotPathBanIncr()   { m.hotPathBanIncrements.Add(1) }
+
+func (m *DHTMetrics) Snapshot() DHTMetricsSnapshot {
+	m.init()
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	// shallow copies of bounded windows
+	storeCopy := append([]StoreSuccessPoint(nil), m.storeSuccess...)
+	retrCopy := append([]BatchRetrievePoint(nil), m.batchRetrieve...)
+	return DHTMetricsSnapshot{
+		StoreSuccessRecent:   storeCopy,
+		BatchRetrieveRecent:  retrCopy,
+		HotPathBannedSkips:   m.hotPathBannedSkips.Load(),
+		HotPathBanIncrements: m.hotPathBanIncrements.Load(),
	}
+}
diff --git a/p2p/kademlia/network.go b/p2p/kademlia/network.go
index da409e61..366f0f4a 100644
--- a/p2p/kademlia/network.go
+++ b/p2p/kademlia/network.go
@@ -9,6 +9,7 @@ import (
 	"sort"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"syscall"
 	"time"
 
@@ -40,20 +41,15 @@ var execTimeouts map[int]time.Duration
 func init() {
 	// Initialize the request execution timeout values
 	execTimeouts = map[int]time.Duration{
-		// Lightweight
-		Ping:          5 * time.Second,
-		FindNode:      15 * time.Second,
-		BatchFindNode: 15 * time.Second,
-
-		// Value lookups
-		FindValue:       20 * time.Second,
-		BatchFindValues: 90 * time.Second,
-		BatchGetValues:  90 * time.Second,
-
-		// Data movement
-		StoreData:      5 * time.Second,
-		BatchStoreData: 90 * time.Second,
-		Replicate:      90 * time.Second,
+		Ping:            5 * time.Second,
+		FindNode:        10 * time.Second,
+		BatchFindNode:   5 * time.Second,  // small requests, quick
+		FindValue:       5 * time.Second,
+		BatchFindValues: 60 * time.Second, // responder compresses
+		BatchGetValues:  75 * time.Second, // large, sometimes cloud fetch then send back
+		StoreData:       10 * time.Second,
+		BatchStoreData:  75 * time.Second, // large uncompressed payloads
+		Replicate:       90 * time.Second,
 	}
 }
 
@@ -71,6 +67,8 @@ type Network struct {
 	connPool    *ConnPool
 	connPoolMtx sync.Mutex
 	sem         *semaphore.Weighted
+
+	metrics sync.Map
 }
 
 // NewNetwork returns a network service
@@ -83,6 +81,7 @@ func NewNetwork(ctx context.Context, dht *DHT, self *Node, clientTC, serverTC cr
 		serverTC: serverTC,
 		connPool: NewConnPool(ctx),
 		sem:      semaphore.NewWeighted(maxConcurrentFindBatchValsRequests),
+		metrics:  sync.Map{},
 	}
 	// init the rate limiter
 	s.limiter = ratelimit.New(defaultConnRate)
@@ -360,11 +359,11 @@ func (s *Network) handleConn(ctx context.Context, rawConn net.Conn) {
 		"local-addr":  rawConn.LocalAddr().String(),
 		"remote-addr": rawConn.RemoteAddr().String(),
 	})
-	// do secure handshaking
+	// secure handshake
 	if s.serverTC != nil {
 		conn, err = NewSecureServerConn(ctx, s.serverTC, rawConn)
 		if err != nil {
-			rawConn.Close()
+			_ = rawConn.Close()
 			logtrace.Warn(ctx, "Server secure handshake failed", logtrace.Fields{
 				logtrace.FieldModule: "p2p",
 				logtrace.FieldError:  err.Error(),
@@ -374,7 +373,7 @@ func (s *Network) handleConn(ctx context.Context, rawConn net.Conn) {
 	} else {
 		conn = rawConn
 	}
-	defer conn.Close()
+	defer conn.Close() // still required: close the connection when the read loop exits
 
 	const serverReadTimeout = 90 * time.Second
 
@@ -386,16 +384,15 @@ func (s *Network) handleConn(ctx context.Context, rawConn net.Conn) {
 		default:
 		}
 
-		// enforce a per-message read deadline so a slow peer can't hang us
+		// per-message read deadline
 		_ = conn.SetReadDeadline(time.Now().Add(serverReadTimeout))
 
-		// read the request from connection
+		// read request
 		request, err := decode(conn)
 		if err != nil {
 			if err == io.EOF {
 				return
 			}
-			// downgrade pure timeouts to debug to reduce noise
 			if ne, ok := err.(net.Error); ok && ne.Timeout() {
 				logtrace.Debug(ctx, "Read and decode timed out, keeping connection open", logtrace.Fields{
 					logtrace.FieldModule: "p2p",
 					logtrace.FieldError:  err.Error(),
 				})
 				continue
 			}
 			return
 		}
 		reqID := uuid.New().String()
+		mt := request.MessageType
 
+		// invoke handler with metrics
 		var response []byte
+		var hErr error
 
-		switch request.MessageType {
-		case FindNode:
-			encoded, err := s.handleFindNode(ctx, request)
-			if err != nil {
-				logtrace.Error(ctx, "Handle find node request failed", logtrace.Fields{
-					logtrace.FieldModule: "p2p",
-					logtrace.FieldError:  err.Error(),
-				})
-				return
-			}
-			response = encoded
+		switch mt {
+		case FindNode:
+			response, hErr = s.withMetrics(FindNode, func() ([]byte, error) {
+				return s.handleFindNode(ctx, request)
+			})
 		case BatchFindNode:
-			encoded, err := s.handleBatchFindNode(ctx, request)
-			if err != nil {
-				logtrace.Error(ctx, "Handle batch find node request failed", logtrace.Fields{
-					logtrace.FieldModule: "p2p",
-					logtrace.FieldError:  err.Error(),
-				})
-				return
-			}
-			response = encoded
+			response, hErr = s.withMetrics(BatchFindNode, func() ([]byte, error) {
+				return s.handleBatchFindNode(ctx, request)
+			})
 		case FindValue:
-			// handle the request for finding value
-			encoded, err := s.handleFindValue(ctx, request)
-			if err != nil {
-				logtrace.Error(ctx, "Handle find value request failed", logtrace.Fields{
-					logtrace.FieldModule: "p2p",
-					logtrace.FieldError:  err.Error(),
-				})
-				return
-			}
-			response = encoded
+			response, hErr = s.withMetrics(FindValue, func() ([]byte, error) {
+				return s.handleFindValue(ctx, request)
+			})
 		case Ping:
-			encoded, err := s.handlePing(ctx, request)
-			if err != nil {
-				logtrace.Error(ctx, "Handle ping request failed", logtrace.Fields{
-					logtrace.FieldModule: "p2p",
-					logtrace.FieldError:  err.Error(),
-				})
-				return
-			}
-			response = encoded
+			response, hErr = s.withMetrics(Ping, func() ([]byte, error) {
+				return s.handlePing(ctx, request)
+			})
 		case StoreData:
-			// handle the request for storing data
-			encoded, err := s.handleStoreData(ctx, request)
-			if err != nil {
-				logtrace.Error(ctx, "Handle store data request failed", logtrace.Fields{
-					logtrace.FieldModule: "p2p",
-					logtrace.FieldError:  err.Error(),
-				})
-				return
-			}
-			response = encoded
+			response, hErr = s.withMetrics(StoreData, func() ([]byte, error) {
+				return s.handleStoreData(ctx, request)
+			})
 		case Replicate:
-			// handle the request for replicate request
-			encoded, err := s.handleReplicate(ctx, request)
-			if err != nil {
-				logtrace.Error(ctx, "Handle replicate request failed", logtrace.Fields{
-					logtrace.FieldModule: "p2p",
-					logtrace.FieldError:  err.Error(),
-				})
-				return
-			}
-			response = encoded
+			response, hErr = s.withMetrics(Replicate, func() ([]byte, error) {
+				return s.handleReplicate(ctx, request)
+			})
 		case BatchFindValues:
-			// handle the request for finding value
-			encoded, err := s.handleBatchFindValues(ctx, request, reqID)
-			if err != nil {
-				logtrace.Error(ctx, "Handle batch find values request failed", logtrace.Fields{
-					logtrace.FieldModule: "p2p",
-					logtrace.FieldError:  err.Error(),
-					"p2p-req-id":         reqID,
-				})
-				return
-			}
-			response = encoded
+			response, hErr = s.withMetrics(BatchFindValues, func() ([]byte, error) {
+				return s.handleBatchFindValues(ctx, request, reqID)
+			})
 		case BatchStoreData:
-			// handle the request for storing data
-			encoded, err := s.handleBatchStoreData(ctx, request)
-			if err != nil {
-				logtrace.Error(ctx, "Handle batch store data request failed", logtrace.Fields{
-					logtrace.FieldModule: "p2p",
-					logtrace.FieldError:  err.Error(),
-				})
-				return
-			}
-			response = encoded
+			response, hErr = s.withMetrics(BatchStoreData, func() ([]byte, error) {
+				return s.handleBatchStoreData(ctx, request)
+			})
 		case BatchGetValues:
-			// handle the request for finding value
-			encoded, err := s.handleGetValuesRequest(ctx, request, reqID)
-			if err != nil {
-				logtrace.Error(ctx, "Handle batch get values request failed", logtrace.Fields{
-					logtrace.FieldModule: "p2p",
-					logtrace.FieldError:  err.Error(),
-					"p2p-req-id":         reqID,
-				})
-				return
-			}
-			response = encoded
+			response, hErr = s.withMetrics(BatchGetValues, func() ([]byte, error) {
+				return s.handleGetValuesRequest(ctx, request, reqID)
+			})
 		default:
+			// count unknown types as failure and return
+			m := s.metricsFor(mt)
+			m.Total.Add(1)
+			m.Failure.Add(1)
 			logtrace.Error(ctx, "Invalid message type", logtrace.Fields{
 				logtrace.FieldModule: "p2p",
-				"message-type":       request.MessageType,
+				"message-type":       mt,
 			})
 			return
 		}
 
-		// write the response
+		if hErr != nil {
+			// the per-case logging above moved here so failures are still recorded
+			logtrace.Error(ctx, "Handle request failed", logtrace.Fields{
+				logtrace.FieldModule: "p2p",
+				logtrace.FieldError:  hErr.Error(),
+				"p2p-req-id":         reqID,
+				"message-type":       mt,
+			})
+			// keep the connection open for the next request
+			continue
+		}
+
+		// write the response (transport write failures counted as well)
 		_ = conn.SetWriteDeadline(time.Now().Add(serverReadTimeout))
 		if _, err := conn.Write(response); err != nil {
+			s.markTransportWrite(mt, err)
 			logtrace.Error(ctx, "Write failed", logtrace.Fields{
 				logtrace.FieldModule: "p2p",
 				logtrace.FieldError:  err.Error(),
 				"p2p-req-id":         reqID,
-				"message-type":       request.MessageType,
+				"message-type":       mt,
 			})
 			return
 		}
@@ -1349,19 +1297,6 @@ func (s *Network) handlePanic(ctx context.Context, sender *Node, messageType int
 	return nil, nil
 }
 
-func readDeadlineFor(msgType int, overall time.Duration) time.Duration {
-	small := 10 * time.Second
-	switch msgType {
-	case Ping, FindNode, BatchFindNode, FindValue, StoreData, BatchStoreData:
-		if overall > small+1*time.Second {
-			return small
-		}
-		return overall - 1*time.Second
-	default:
-		return overall // Bulk responses keep full budget
-	}
-}
-
 func isLocalCancel(err error) bool {
 	if err == nil {
 		return false
@@ -1377,3 +1312,121 @@ func isLocalCancel(err error) bool {
 	}
 	return false
 }
+
+// Per-handler counters
+type HandleMetrics struct {
+	Total   atomic.Int64
+	Success atomic.Int64
+	Failure atomic.Int64
+	Timeout atomic.Int64
+}
+
+// Get-or-create metrics bucket for a message type.
+func (s *Network) metricsFor(t int) *HandleMetrics {
+	if v, ok := s.metrics.Load(t); ok {
+		if m, ok := v.(*HandleMetrics); ok && m != nil {
+			return m
+		}
+	}
+	m := &HandleMetrics{}
+	actual, _ := s.metrics.LoadOrStore(t, m)
+	return actual.(*HandleMetrics)
+}
+
+// Wrap a handler so we consistently count total / success / failure / timeout.
+func (s *Network) withMetrics(t int, fn func() ([]byte, error)) ([]byte, error) {
+	m := s.metricsFor(t)
+	m.Total.Add(1)
+
+	res, err := fn()
+	if err != nil {
+		if errors.Is(err, context.DeadlineExceeded) {
+			m.Timeout.Add(1)
+		} else {
+			m.Failure.Add(1)
+		}
+		return res, err
+	}
+
+	m.Success.Add(1)
+	return res, nil
+}
+
+// After a successful handler, if the write to the socket fails we also count that.
+func (s *Network) markTransportWrite(t int, err error) {
+	if err == nil {
+		return
+	}
+	m := s.metricsFor(t)
+	if ne, ok := err.(net.Error); ok && ne.Timeout() {
+		m.Timeout.Add(1)
+	} else {
+		m.Failure.Add(1)
+	}
+}
+
+// Optional: export a snapshot (e.g., in DHT metrics or /debug handler)
+type HandleCounters struct {
+	Total   int64 `json:"total"`
+	Success int64 `json:"success"`
+	Failure int64 `json:"failure"`
+	Timeout int64 `json:"timeout"`
+}
+
+func msgName(t int) string {
+	switch t {
+	case Ping:
+		return "Ping"
+	case StoreData:
+		return "StoreData"
+	case FindNode:
+		return "FindNode"
+	case BatchFindNode:
+		return "BatchFindNode"
+	case FindValue:
+		return "FindValue"
+	case BatchFindValues:
+		return "BatchFindValues"
+	case BatchGetValues:
+		return "BatchGetValues"
+	case BatchStoreData:
+		return "BatchStoreData"
+	case Replicate:
+		return "Replicate"
+	default:
+		return fmt.Sprintf("Type_%d", t)
+	}
+}
+
+func (s *Network) HandleMetricsSnapshot() map[string]HandleCounters {
+	out := make(map[string]HandleCounters)
+	s.metrics.Range(func(k, v any) bool {
+		t, _ := k.(int)
+		m, _ := v.(*HandleMetrics)
+		if m == nil {
+			return true
+		}
+		out[msgName(t)] = HandleCounters{
+			Total:   m.Total.Load(),
+			Success: m.Success.Load(),
+			Failure: m.Failure.Load(),
+			Timeout: m.Timeout.Load(),
+		}
+		return true
+	})
+	return out
+}
+
+// readDeadlineFor picks the per-message read deadline: small control messages
+// get a short deadline, while bulk transfers keep the full budget.
+func readDeadlineFor(msgType int, overall time.Duration) time.Duration {
+	small := 10 * time.Second
+	switch msgType {
+	case Ping, FindNode, BatchFindNode, FindValue, StoreData, BatchStoreData:
+		if overall > small+1*time.Second {
+			return small
+		}
+		return overall - 1*time.Second
+	default:
+		return overall // Bulk responses keep full budget
+	}
+}
diff --git a/p2p/kademlia/node_activity.go b/p2p/kademlia/node_activity.go
index af278903..cc7089d6 100644
--- a/p2p/kademlia/node_activity.go
+++ b/p2p/kademlia/node_activity.go
@@ -91,7 +91,7 @@ func (s *DHT) handlePingFailure(ctx context.Context, wasActive bool, n *Node, er
 	// increment soft-fail counter; only evict when past threshold
 	s.ignorelist.IncrementCount(n)
 	if wasActive && s.ignorelist.Banned(n) {
-		logtrace.Warn(ctx, "setting node to inactive", logtrace.Fields{
+		logtrace.Error(ctx, "setting node to inactive", logtrace.Fields{
 			logtrace.FieldModule: "p2p",
 			logtrace.FieldError:  err.Error(),
 			"ip":                 n.IP,
diff --git a/p2p/p2p.go b/p2p/p2p.go
index cb48bf99..1963933f 100644
--- a/p2p/p2p.go
+++ b/p2p/p2p.go
@@ -175,6 +175,10 @@ func (s *p2p) Stats(ctx context.Context) (map[string]interface{}, error) {
 	}
 	retStats["disk-info"] = &diskUse
 
+	retStats["ban-list"] = s.dht.BanListSnapshot()
+	retStats["conn-pool"] = s.dht.ConnPoolSnapshot()
+	retStats["dht_metrics"] = s.dht.MetricsSnapshot()
+
 	return retStats, nil
 }
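Reviewer note (outside the patch): a minimal sketch of how the three snapshot accessors added above could be surfaced on an HTTP debug endpoint. The package name, mux path, and closure wiring are illustrative assumptions, not part of this change; only MetricsSnapshot, BanListSnapshot, and ConnPoolSnapshot come from the patch itself.

// Hypothetical wiring; everything except the three accessors is illustrative.
package debugp2p

import (
	"encoding/json"
	"net/http"
)

// RegisterP2PDebug exposes the three snapshots as one JSON document.
// Accepting closures keeps this sketch independent of the kademlia import path.
func RegisterP2PDebug(mux *http.ServeMux, metrics, banList, connPool func() any) {
	mux.HandleFunc("/debug/p2p", func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(map[string]any{
			"dht_metrics": metrics(),  // e.g. dht.MetricsSnapshot()
			"ban_list":    banList(),  // e.g. dht.BanListSnapshot()
			"conn_pool":   connPool(), // e.g. dht.ConnPoolSnapshot()
		})
	})
}

A caller would wire it up as, for example, RegisterP2PDebug(mux, func() any { return dht.MetricsSnapshot() }, func() any { return dht.BanListSnapshot() }, func() any { return dht.ConnPoolSnapshot() }); since all three snapshots are plain value copies, serving them concurrently is safe.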