From 79ffd76d590958779951400725bb358312170d64 Mon Sep 17 00:00:00 2001
From: Matee ullah
Date: Sat, 30 Aug 2025 10:39:57 +0500
Subject: [PATCH 1/2] p2p improvements

---
 .gitignore | 2 +-
 p2p/kademlia/bootstrap.go | 6 ++++++
 p2p/kademlia/conn_pool.go | 7 ++++++-
 p2p/kademlia/dht.go | 7 ++++---
 p2p/kademlia/hashtable.go | 8 +++++++-
 p2p/kademlia/message.go | 12 +++++++++++
 p2p/kademlia/network.go | 20 ++++++++++++++++---
 p2p/kademlia/node.go | 16 +++++++--------
 .../store/{cloud.go => cloud}/cloud.go | 0
 .../store/{cloud.go => cloud}/cloud_test.go | 0
 p2p/kademlia/store/sqlite/meta_worker.go | 2 +-
 p2p/kademlia/store/sqlite/sqlite.go | 2 +-
 p2p/kademlia/store/sqlite/sqlite_test.go | 2 +-
 p2p/p2p.go | 2 +-
 14 files changed, 65 insertions(+), 21 deletions(-)
 rename p2p/kademlia/store/{cloud.go => cloud}/cloud.go (100%)
 rename p2p/kademlia/store/{cloud.go => cloud}/cloud_test.go (100%)

diff --git a/.gitignore b/.gitignore
index 5c42397c..341114a4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,7 +14,7 @@
 
 # Output of the go coverage tool, specifically when used with LiteIDE
 *.out
-
+CLAUDE.md
 # Dependency directories (remove the comment below to include it)
 # vendor/
 # sn-manager binary
diff --git a/p2p/kademlia/bootstrap.go b/p2p/kademlia/bootstrap.go
index f8080977..ad98989f 100644
--- a/p2p/kademlia/bootstrap.go
+++ b/p2p/kademlia/bootstrap.go
@@ -232,7 +232,9 @@ func (s *DHT) Bootstrap(ctx context.Context, bootstrapNodes string) error {
 	for _, node := range s.options.BootstrapNodes {
 		nodeId := string(node.ID)
 		// sync the bootstrap node only once
+		s.bsConnectedMtx.RLock()
 		isConnected, exists := s.bsConnected[nodeId]
+		s.bsConnectedMtx.RUnlock()
 		if exists && isConnected {
 			continue
 		}
@@ -247,7 +249,9 @@ func (s *DHT) Bootstrap(ctx context.Context, bootstrapNodes string) error {
 		}
 
 		node := node
+		s.bsConnectedMtx.Lock()
 		s.bsConnected[nodeId] = false
+		s.bsConnectedMtx.Unlock()
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
@@ -296,7 +300,9 @@ func (s *DHT) Bootstrap(ctx context.Context, bootstrapNodes string) error {
 				continue
 			}
 
+			s.bsConnectedMtx.Lock()
 			s.bsConnected[nodeId] = true
+			s.bsConnectedMtx.Unlock()
 			s.addNode(ctx, response.Sender)
 			break
 		}
diff --git a/p2p/kademlia/conn_pool.go b/p2p/kademlia/conn_pool.go
index f571b10a..5b11f7a5 100644
--- a/p2p/kademlia/conn_pool.go
+++ b/p2p/kademlia/conn_pool.go
@@ -63,7 +63,12 @@ func (pool *ConnPool) Add(addr string, conn net.Conn) {
 		}
 	}
 
-	delete(pool.conns, oldestAccessAddr)
+	if oldestAccessAddr != "" {
+		if item, ok := pool.conns[oldestAccessAddr]; ok {
+			_ = item.conn.Close()
+		}
+		delete(pool.conns, oldestAccessAddr)
+	}
 }
diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go
index d47155ea..52059a43 100644
--- a/p2p/kademlia/dht.go
+++ b/p2p/kademlia/dht.go
@@ -6,11 +6,11 @@ import (
 	"encoding/hex"
 	"fmt"
 	"math"
+	"net"
+	"net/url"
 	"sync"
 	"sync/atomic"
 	"time"
-	"net"
-	"net/url"
 
 	"github.com/btcsuite/btcutil/base58"
 	"github.com/cenkalti/backoff/v4"
@@ -56,6 +56,7 @@ type DHT struct {
 	done           chan struct{}    // distributed hash table is done
 	cache          storage.KeyValue // store bad bootstrap addresses
 	bsConnected    map[string]bool  // map of connected bootstrap nodes [identity] -> connected
+	bsConnectedMtx sync.RWMutex     // mutex for bsConnected map
 	supernodeAddr  string           // cached address from chain
 	mtx            sync.Mutex
 	ignorelist     *BanList
@@ -1273,7 +1274,7 @@ func (s *DHT) addNode(ctx context.Context, node *Node) *Node {
 	}
 
 	if s.ht.hasBucketNode(index, node.ID) {
-		s.ht.refreshNode(node.HashedID)
+		s.ht.refreshNode(node.ID)
 		return nil
 	}
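The conn_pool.go hunk above fixes a file-descriptor leak: evicting the least-recently-used entry used to drop the map key without closing the socket, keeping it open until the runtime eventually finalized it. A minimal sketch of the pattern, with simplified stand-ins for the real ConnPool and connItem types:

package connpool

import (
	"net"
	"sync"
	"time"
)

type connItem struct {
	conn       net.Conn
	lastAccess time.Time
}

type ConnPool struct {
	mtx   sync.Mutex
	cap   int
	conns map[string]*connItem
}

// Add inserts a connection, evicting (and closing) the least
// recently used entry when the pool is at capacity.
func (p *ConnPool) Add(addr string, conn net.Conn) {
	p.mtx.Lock()
	defer p.mtx.Unlock()

	if len(p.conns) >= p.cap {
		var oldestAddr string
		var oldest time.Time
		for a, item := range p.conns {
			if oldestAddr == "" || item.lastAccess.Before(oldest) {
				oldestAddr = a
				oldest = item.lastAccess
			}
		}
		// Close before delete: dropping the map entry alone would
		// keep the underlying socket (and its fd) alive.
		if oldestAddr != "" {
			if item, ok := p.conns[oldestAddr]; ok {
				_ = item.conn.Close()
			}
			delete(p.conns, oldestAddr)
		}
	}
	p.conns[addr] = &connItem{conn: conn, lastAccess: time.Now()}
}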
diff --git a/p2p/kademlia/hashtable.go b/p2p/kademlia/hashtable.go
index 3a28baca..0dfc9478 100644
--- a/p2p/kademlia/hashtable.go
+++ b/p2p/kademlia/hashtable.go
@@ -97,6 +97,7 @@ func (ht *HashTable) refreshNode(id []byte) {
 	bucket := ht.routeTable[index]
 
 	var offset int
+	offset = -1
 	// find the position of the node
 	for i, v := range bucket {
 		if bytes.Equal(v.ID, id) {
 			offset = i
 			break
 		}
 	}
@@ -106,6 +107,11 @@ func (ht *HashTable) refreshNode(id []byte) {
 	// makes the node to the end
+
+	if offset < 0 {
+		return
+	} // nothing to refresh
+
 	current := bucket[offset]
 	bucket = append(bucket[:offset], bucket[offset+1:]...)
 	bucket = append(bucket, current)
@@ -307,7 +313,7 @@ func (ht *HashTable) closestContactsWithInlcudingNode(num int, target []byte, ig
 	// Flatten the routeTable and add nodes to nl if they're not in the ignoredMap
 	for _, bucket := range ht.routeTable {
 		for _, node := range bucket {
-			if !ignoredMap[string(node.ID)] {
+			if !ignoredMap[hex.EncodeToString(node.ID)] {
 				nl.AddNodes([]*Node{node})
 			}
 		}
diff --git a/p2p/kademlia/message.go b/p2p/kademlia/message.go
index 3dc67559..7ef3f206 100644
--- a/p2p/kademlia/message.go
+++ b/p2p/kademlia/message.go
@@ -167,6 +167,12 @@ func encode(message *Message) ([]byte, error) {
 		return nil, err
 	}
 
+	// Check against absolute maximum first
+	const maxMessageSize = 500 * 1024 * 1024 // 500MB absolute max
+	if buf.Len() > maxMessageSize {
+		return nil, errors.New("message size exceeds absolute maximum")
+	}
+
 	if utils.BytesIntToMB(buf.Len()) > defaultMaxPayloadSize {
 		return nil, errors.New("payload too big")
 	}
@@ -196,6 +202,12 @@ func decode(conn io.Reader) (*Message, error) {
 		return nil, errors.Errorf("parse header length: %w", err)
 	}
 
+	// Check against absolute maximum first to prevent DoS
+	const maxMessageSize = 500 * 1024 * 1024 // 500MB absolute max
+	if length > maxMessageSize {
+		return nil, errors.New("message size exceeds absolute maximum")
+	}
+
 	if utils.BytesToMB(length) > defaultMaxPayloadSize {
 		return nil, errors.New("payload too big")
 	}
diff --git a/p2p/kademlia/network.go b/p2p/kademlia/network.go
index 50dcca86..53324ce2 100644
--- a/p2p/kademlia/network.go
+++ b/p2p/kademlia/network.go
@@ -362,6 +362,8 @@ func (s *Network) handleConn(ctx context.Context, rawConn net.Conn) {
 
 	defer conn.Close()
 
+	const serverReadTimeout = 60 * time.Second
+
 	for {
 		select {
 		case <-ctx.Done():
@@ -369,6 +371,9 @@ func (s *Network) handleConn(ctx context.Context, rawConn net.Conn) {
 		default:
 		}
 
+		// enforce a per-message read deadline so a slow peer can't hang us
+		_ = conn.SetReadDeadline(time.Now().Add(serverReadTimeout))
+
 		// read the request from connection
 		request, err := decode(conn)
 		if err != nil {
@@ -493,6 +498,7 @@ func (s *Network) handleConn(ctx context.Context, rawConn net.Conn) {
 		}
 
 		// write the response
+		_ = conn.SetWriteDeadline(time.Now().Add(serverReadTimeout))
 		if _, err := conn.Write(response); err != nil {
 			logtrace.Error(ctx, "Write failed", logtrace.Fields{
 				logtrace.FieldModule: "p2p",
@@ -614,17 +620,25 @@ func (s *Network) Call(ctx context.Context, request *Message, isLong bool) (*Mes
 	}
 
 	// do secure handshaking
+	// Fast path: try get under lock
 	s.connPoolMtx.Lock()
 	conn, err := s.connPool.Get(remoteAddr)
+	s.connPoolMtx.Unlock()
 	if err != nil {
+		// Slow path: build without holding the lock
 		conn, err = NewSecureClientConn(ctx, s.clientTC, remoteAddr)
 		if err != nil {
-			s.connPoolMtx.Unlock()
 			return nil, errors.Errorf("client secure establish %q: %w", remoteAddr, err)
 		}
-		s.connPool.Add(remoteAddr, conn)
+		s.connPoolMtx.Lock()
+		// another goroutine may have added meanwhile; replace safely
+		if _, e := s.connPool.Get(remoteAddr); e == nil {
+			conn.Close()
+		} else {
+			s.connPool.Add(remoteAddr, conn)
+		}
+		s.connPoolMtx.Unlock()
 	}
-	s.connPoolMtx.Unlock()
 
 	defer func() {
 		if err != nil && s.clientTC != nil {
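The message.go hunks above validate the declared size against a hard 500MB cap before any payload buffer is allocated, which is what closes the memory-DoS vector: a forged header can no longer force a huge allocation. A sketch of the same guard, assuming a simple 8-byte big-endian length prefix (the real header layout in message.go may differ):

package wire

import (
	"encoding/binary"
	"errors"
	"io"
)

const maxMessageSize = 500 * 1024 * 1024 // 500MB absolute max

// readFrame reads a length-prefixed frame, rejecting oversized
// lengths *before* allocating the payload buffer.
func readFrame(r io.Reader) ([]byte, error) {
	var header [8]byte
	if _, err := io.ReadFull(r, header[:]); err != nil {
		return nil, err
	}
	length := binary.BigEndian.Uint64(header[:])
	if length > maxMessageSize {
		// Fail fast: trusting this value would let a peer trigger a
		// gigantic allocation with a few bytes of input.
		return nil, errors.New("message size exceeds absolute maximum")
	}
	payload := make([]byte, length)
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return payload, nil
}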
diff --git a/p2p/kademlia/node.go b/p2p/kademlia/node.go
index e7709d7c..b6050a10 100644
--- a/p2p/kademlia/node.go
+++ b/p2p/kademlia/node.go
@@ -197,12 +197,12 @@ func (s *NodeList) NodeIDs() [][]byte {
 	s.Mux.RLock()
 	defer s.Mux.RUnlock()
 
-	toRet := make([][]byte, len(s.Nodes))
-	for i := 0; i < len(s.Nodes); i++ {
-		toRet = append(toRet, s.Nodes[i].ID)
+	out := make([][]byte, 0, len(s.Nodes))
+	for _, n := range s.Nodes {
+		out = append(out, n.ID)
 	}
 
-	return toRet
+	return out
 }
 
 // NodeIPs returns the dump information for node list
@@ -210,10 +210,10 @@ func (s *NodeList) NodeIPs() []string {
 	s.Mux.RLock()
 	defer s.Mux.RUnlock()
 
-	toRet := make([]string, len(s.Nodes))
-	for i := 0; i < len(s.Nodes); i++ {
-		toRet = append(toRet, s.Nodes[i].IP)
+	out := make([]string, 0, len(s.Nodes))
+	for _, n := range s.Nodes {
+		out = append(out, n.IP)
 	}
 
-	return toRet
+	return out
 }
diff --git a/p2p/kademlia/store/cloud.go/cloud.go b/p2p/kademlia/store/cloud/cloud.go
similarity index 100%
rename from p2p/kademlia/store/cloud.go/cloud.go
rename to p2p/kademlia/store/cloud/cloud.go
diff --git a/p2p/kademlia/store/cloud.go/cloud_test.go b/p2p/kademlia/store/cloud/cloud_test.go
similarity index 100%
rename from p2p/kademlia/store/cloud.go/cloud_test.go
rename to p2p/kademlia/store/cloud/cloud_test.go
diff --git a/p2p/kademlia/store/sqlite/meta_worker.go b/p2p/kademlia/store/sqlite/meta_worker.go
index 637cf1e3..eb7a968f 100644
--- a/p2p/kademlia/store/sqlite/meta_worker.go
+++ b/p2p/kademlia/store/sqlite/meta_worker.go
@@ -11,7 +11,7 @@ import (
 	"sync"
 	"time"
 
-	"github.com/LumeraProtocol/supernode/v2/p2p/kademlia/store/cloud.go"
+	"github.com/LumeraProtocol/supernode/v2/p2p/kademlia/store/cloud"
 	"github.com/LumeraProtocol/supernode/v2/pkg/logtrace"
 	"github.com/LumeraProtocol/supernode/v2/pkg/utils"
 	"github.com/jmoiron/sqlx"
diff --git a/p2p/kademlia/store/sqlite/sqlite.go b/p2p/kademlia/store/sqlite/sqlite.go
index c70cdca1..66a9713a 100644
--- a/p2p/kademlia/store/sqlite/sqlite.go
+++ b/p2p/kademlia/store/sqlite/sqlite.go
@@ -10,7 +10,7 @@ import (
 	"strings"
 	"time"
 
-	"github.com/LumeraProtocol/supernode/v2/p2p/kademlia/store/cloud.go"
+	"github.com/LumeraProtocol/supernode/v2/p2p/kademlia/store/cloud"
 	"github.com/LumeraProtocol/supernode/v2/pkg/logtrace"
 	"github.com/LumeraProtocol/supernode/v2/pkg/utils"
 
diff --git a/p2p/kademlia/store/sqlite/sqlite_test.go b/p2p/kademlia/store/sqlite/sqlite_test.go
index ae699639..3ce6d2d2 100644
--- a/p2p/kademlia/store/sqlite/sqlite_test.go
+++ b/p2p/kademlia/store/sqlite/sqlite_test.go
@@ -14,7 +14,7 @@ import (
 	"testing"
 	"time"
 
-	"github.com/LumeraProtocol/supernode/v2/p2p/kademlia/store/cloud.go"
+	"github.com/LumeraProtocol/supernode/v2/p2p/kademlia/store/cloud"
 	"github.com/LumeraProtocol/supernode/v2/pkg/utils"
 	"github.com/stretchr/testify/assert"
 )
diff --git a/p2p/p2p.go b/p2p/p2p.go
index b037044a..93afe95f 100644
--- a/p2p/p2p.go
+++ b/p2p/p2p.go
@@ -7,7 +7,7 @@ import (
 	"time"
 
 	"github.com/LumeraProtocol/supernode/v2/p2p/kademlia"
-	"github.com/LumeraProtocol/supernode/v2/p2p/kademlia/store/cloud.go"
+	"github.com/LumeraProtocol/supernode/v2/p2p/kademlia/store/cloud"
 	"github.com/LumeraProtocol/supernode/v2/p2p/kademlia/store/meta"
 	"github.com/LumeraProtocol/supernode/v2/p2p/kademlia/store/sqlite"
 	"github.com/LumeraProtocol/supernode/v2/pkg/errors"
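The node.go hunks above fix a classic Go slip: make([][]byte, n) already creates n zero-valued elements, so appending n more yields a slice of length 2n whose first half is nil. A standalone illustration of the bug and the two idiomatic fixes (the second patch later switches to the indexed form):

package main

import "fmt"

func main() {
	src := []string{"a", "b", "c"}

	// Buggy: make with length 3, then append 3 more -> length 6,
	// with empty strings occupying the first three slots.
	buggy := make([]string, len(src))
	for _, s := range src {
		buggy = append(buggy, s)
	}
	fmt.Println(len(buggy)) // 6

	// Fix 1: zero length plus a capacity hint, then append.
	fixed := make([]string, 0, len(src))
	for _, s := range src {
		fixed = append(fixed, s)
	}
	fmt.Println(len(fixed)) // 3

	// Fix 2: full length up front, assign by index.
	indexed := make([]string, len(src))
	for i := range src {
		indexed[i] = src[i]
	}
	fmt.Println(len(indexed)) // 3
}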
-			if latestIP == "" {
-				logtrace.Warn(ctx, "No valid IP address found for supernode", logtrace.Fields{
-					logtrace.FieldModule: "p2p",
-					"supernode":          supernode.SupernodeAccount,
-				})
-				continue
-			}
-
-			// Extract IP from the address (remove port if present)
-			ip := parseSupernodeAddress(latestIP)
-
-			// Use p2p_port from supernode record
-			p2pPort := defaultSuperNodeP2PPort
-			if supernode.P2PPort != "" {
-				if port, err := strconv.ParseUint(supernode.P2PPort, 10, 16); err == nil {
-					p2pPort = int(port)
-				}
-			}
+		if latestIP == "" {
+			logtrace.Warn(ctx, "No valid IP address found for supernode", logtrace.Fields{
+				logtrace.FieldModule: "p2p",
+				"supernode":          supernode.SupernodeAccount,
+			})
+			continue
+		}
-			// Create full address with p2p port for validation
-			fullAddress := fmt.Sprintf("%s:%d", ip, p2pPort)
+		// Extract IP from the address (remove port if present)
+		ip := parseSupernodeAddress(latestIP)
-			// Parse the node from the full address
-			node, err := s.parseNode(fullAddress, selfAddress)
-			if err != nil {
-				logtrace.Warn(ctx, "Skip Bad Bootstrap Address", logtrace.Fields{
-					logtrace.FieldModule: "p2p",
-					logtrace.FieldError:  err.Error(),
-					"address":            fullAddress,
-					"supernode":          supernode.SupernodeAccount,
-				})
-				continue
+		// Use p2p_port from supernode record
+		p2pPort := defaultSuperNodeP2PPort
+		if supernode.P2PPort != "" {
+			if port, err := strconv.ParseUint(supernode.P2PPort, 10, 16); err == nil {
+				p2pPort = int(port)
 			}
-
-			// Store the supernode account as the node ID
-			node.ID = []byte(supernode.SupernodeAccount)
-			mapNodes[fullAddress] = node
 		}
-		// Convert the map to a slice
-		for _, node := range mapNodes {
-			hID, _ := utils.Blake3Hash(node.ID)
-			node.HashedID = hID
-			logtrace.Debug(ctx, "node adding", logtrace.Fields{
+		// Create full address with p2p port for validation
+		fullAddress := fmt.Sprintf("%s:%d", ip, p2pPort)
+
+		// Parse the node from the full address
+		node, err := s.parseNode(fullAddress, selfAddress)
+		if err != nil {
+			logtrace.Warn(ctx, "Skip Bad Bootstrap Address", logtrace.Fields{
 				logtrace.FieldModule: "p2p",
-				"node":               node.String(),
-				"hashed_id":          string(node.HashedID),
+				logtrace.FieldError:  err.Error(),
+				"address":            fullAddress,
+				"supernode":          supernode.SupernodeAccount,
 			})
-			boostrapNodes = append(boostrapNodes, node)
+			continue
 		}
+
+		// Store the supernode account as the node ID
+		node.ID = []byte(supernode.SupernodeAccount)
+		mapNodes[fullAddress] = node
+	}
+
+	// Convert the map to a slice
+	for _, node := range mapNodes {
+		hID, _ := utils.Blake3Hash(node.ID)
+		node.HashedID = hID
+		logtrace.Debug(ctx, "node adding", logtrace.Fields{
+			logtrace.FieldModule: "p2p",
+			"node":               node.String(),
+			"hashed_id":          string(node.HashedID),
+		})
+		boostrapNodes = append(boostrapNodes, node)
 	}
 
 	if len(boostrapNodes) == 0 {
@@ -232,9 +230,7 @@ func (s *DHT) Bootstrap(ctx context.Context, bootstrapNodes string) error {
 	for _, node := range s.options.BootstrapNodes {
 		nodeId := string(node.ID)
 		// sync the bootstrap node only once
-		s.bsConnectedMtx.RLock()
 		isConnected, exists := s.bsConnected[nodeId]
-		s.bsConnectedMtx.RUnlock()
 		if exists && isConnected {
 			continue
 		}
@@ -249,9 +245,7 @@ func (s *DHT) Bootstrap(ctx context.Context, bootstrapNodes string) error {
 		}
 
 		node := node
-		s.bsConnectedMtx.Lock()
 		s.bsConnected[nodeId] = false
-		s.bsConnectedMtx.Unlock()
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
@@ -268,7 +262,8 @@ func (s *DHT) Bootstrap(ctx context.Context, bootstrapNodes string) error {
 			if err != nil {
 				// This happening in bootstrap - so potentially other nodes not yet started
 				// So if bootstrap failed, should try to connect to node again for next bootstrap retry
-				// s.cache.SetWithExpiry(addr, []byte("true"), badAddrExpiryHours*time.Hour)
+				// Mark this address as temporarily bad to avoid retrying immediately
+				s.cache.SetWithExpiry(addr, []byte("true"), badAddrExpiryHours*time.Hour)
 
 				logtrace.Debug(ctx, "network call failed, sleeping 3 seconds", logtrace.Fields{
 					logtrace.FieldModule: "p2p",
@@ -300,9 +295,7 @@ func (s *DHT) Bootstrap(ctx context.Context, bootstrapNodes string) error {
 				continue
 			}
 
-			s.bsConnectedMtx.Lock()
 			s.bsConnected[nodeId] = true
-			s.bsConnectedMtx.Unlock()
 			s.addNode(ctx, response.Sender)
 			break
 		}
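The p2p_port handling above falls back to the default port whenever the on-chain string is empty or unparseable as a 16-bit value. A compact sketch of that resolution; the constant's value here is illustrative, not the project's actual default:

package bootstrap

import "strconv"

const defaultSuperNodeP2PPort = 4445 // illustrative value; see the real constant in the codebase

// resolveP2PPort parses the on-chain p2p_port string, falling back
// to the default when it is empty or not a valid 16-bit port.
func resolveP2PPort(raw string) int {
	if raw == "" {
		return defaultSuperNodeP2PPort
	}
	port, err := strconv.ParseUint(raw, 10, 16)
	if err != nil {
		return defaultSuperNodeP2PPort
	}
	return int(port)
}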
diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go
index 52059a43..bd8d5b45 100644
--- a/p2p/kademlia/dht.go
+++ b/p2p/kademlia/dht.go
@@ -56,9 +56,8 @@ type DHT struct {
 	done           chan struct{}    // distributed hash table is done
 	cache          storage.KeyValue // store bad bootstrap addresses
 	bsConnected    map[string]bool  // map of connected bootstrap nodes [identity] -> connected
-	bsConnectedMtx sync.RWMutex     // mutex for bsConnected map
 	supernodeAddr  string           // cached address from chain
-	mtx            sync.Mutex
+	mtx            sync.RWMutex
 	ignorelist     *BanList
 	replicationMtx sync.RWMutex
 	rqstore        rqstore.Store
@@ -173,16 +172,24 @@ func (s *DHT) getSupernodeAddress(ctx context.Context) (string, error) {
 
 	// Query chain for supernode info
 	supernodeInfo, err := s.options.LumeraClient.SuperNode().GetSupernodeWithLatestAddress(ctx, string(s.options.ID))
-	if err != nil || supernodeInfo == nil {
-		// Fallback to local IP if chain query fails
-		s.supernodeAddr = s.ht.self.IP
-		return s.supernodeAddr, nil
+	if err != nil {
+		return "", fmt.Errorf("failed to get supernode address: %w", err)
 	}
 
-	s.supernodeAddr = supernodeInfo.LatestAddress
 	return supernodeInfo.LatestAddress, nil
 }
 
+// getCachedSupernodeAddress returns cached supernode address without chain queries
+func (s *DHT) getCachedSupernodeAddress() string {
+	s.mtx.RLock()
+	defer s.mtx.RUnlock()
+
+	if s.supernodeAddr != "" {
+		return s.supernodeAddr
+	}
+	return s.ht.self.IP // fallback without chain query
+}
+
 // parseSupernodeAddress extracts the host part from a URL or address string.
 // It handles http/https prefixes, optional ports, and raw host:port formats.
 func parseSupernodeAddress(address string) string {
@@ -211,6 +218,17 @@ func (s *DHT) Start(ctx context.Context) error {
 		return fmt.Errorf("start network: %v", err)
 	}
 
+	// Pre-fetch supernode address with generous timeout to avoid chain queries during message creation
+	initCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
+	defer cancel()
+
+	if _, err := s.getSupernodeAddress(initCtx); err != nil {
+		logtrace.Warn(ctx, "Failed to pre-fetch supernode address, will use fallback", logtrace.Fields{
+			logtrace.FieldModule: "p2p",
+			logtrace.FieldError:  err.Error(),
+		})
+	}
+
 	go s.StartReplicationWorker(ctx)
 	go s.startDisabledKeysCleanupWorker(ctx)
 	go s.startCleanupRedundantDataWorker(ctx)
@@ -330,13 +348,13 @@ func (s *DHT) Retrieve(ctx context.Context, key string, localOnly ...bool) ([]by
 		return nil, errors.Errorf("retrieve from peer: %w", err)
 	}
 	if len(peerValue) > 0 {
-		logtrace.Debug(ctx, "Not found locally, retrieved from other nodes", logtrace.Fields{
+		logtrace.Info(ctx, "Not found locally, retrieved from other nodes", logtrace.Fields{
 			logtrace.FieldModule: "dht",
 			"key":                dbKey,
 			"data_len":           len(peerValue),
 		})
 	} else {
-		logtrace.Debug(ctx, "Not found locally, not found in other nodes", logtrace.Fields{
+		logtrace.Info(ctx, "Not found locally, not found in other nodes", logtrace.Fields{
 			logtrace.FieldModule: "dht",
 			"key":                dbKey,
 		})
@@ -379,8 +397,7 @@ func (s *DHT) Stats(ctx context.Context) (map[string]interface{}, error) {
 
 // newMessage creates a new message
 func (s *DHT) newMessage(messageType int, receiver *Node, data interface{}) *Message {
-	ctx := context.Background()
-	supernodeAddr, _ := s.getSupernodeAddress(ctx)
+	supernodeAddr := s.getCachedSupernodeAddress()
 	hostIP := parseSupernodeAddress(supernodeAddr)
 	sender := &Node{
 		IP: hostIP,
@@ -407,7 +424,7 @@ func (s *DHT) GetValueFromNode(ctx context.Context, target []byte, n *Node) ([]b
 
 	response, err := s.network.Call(cctx, request, false)
 	if err != nil {
-		logtrace.Debug(ctx, "Network call request failed", logtrace.Fields{
+		logtrace.Info(ctx, "Network call request failed", logtrace.Fields{
 			logtrace.FieldModule: "p2p",
 			logtrace.FieldError:  err.Error(),
 			"request":            request.String(),
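getCachedSupernodeAddress above serves the newMessage hot path from memory under a read lock, with Start pre-fetching once so the cache is warm; the fallback only reads local state and never blocks on a chain query. A pared-down sketch of the read-through pattern (type and field names abridged):

package addrcache

import "sync"

type AddrCache struct {
	mtx      sync.RWMutex
	cached   string
	fallback string // e.g. the node's own IP
}

// Get returns the cached address with no network I/O; callers on the
// message hot path must never block on a chain query.
func (c *AddrCache) Get() string {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	if c.cached != "" {
		return c.cached
	}
	return c.fallback
}

// Set is called at startup (and on refresh) after a successful chain
// query, under the write lock.
func (c *AddrCache) Set(addr string) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	c.cached = addr
}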
@@ -449,7 +466,7 @@ func (s *DHT) doMultiWorkers(ctx context.Context, iterativeType int, target []by
 		// update the running goroutines
 		number++
 
-		logtrace.Debug(ctx, "Start work for node", logtrace.Fields{
+		logtrace.Info(ctx, "Start work for node", logtrace.Fields{
 			logtrace.FieldModule: "p2p",
 			"iterate_type":       iterativeType,
 			"node":               node.String(),
@@ -476,7 +493,7 @@ func (s *DHT) doMultiWorkers(ctx context.Context, iterativeType int, target []by
 			// send the request and receive the response
 			response, err := s.network.Call(ctx, request, false)
 			if err != nil {
-				logtrace.Debug(ctx, "Network call request failed", logtrace.Fields{
+				logtrace.Info(ctx, "Network call request failed", logtrace.Fields{
 					logtrace.FieldModule: "p2p",
 					logtrace.FieldError:  err.Error(),
 					"request":            request.String(),
@@ -660,7 +677,7 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32,
 	gctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	logtrace.Debug(ctx, "Begin iterate batch get values", logtrace.Fields{
+	logtrace.Info(ctx, "Begin iterate batch get values", logtrace.Fields{
 		logtrace.FieldModule: "dht",
 		"txid":               txID,
 		"parallel_batches":   parallelBatches,
@@ -957,7 +974,7 @@ func (s *DHT) iterate(ctx context.Context, iterativeType int, target []byte, dat
 	if nl.Len() == 0 {
 		return nil, nil
 	}
-	logtrace.Debug(ctx, "Iterate start", logtrace.Fields{
+	logtrace.Info(ctx, "Iterate start", logtrace.Fields{
 		logtrace.FieldModule: "p2p",
 		"task_id":            taskID,
 		"type":               iterativeType,
@@ -971,7 +988,7 @@ func (s *DHT) iterate(ctx context.Context, iterativeType int, target []byte, dat
 	if iterativeType == IterateFindNode {
 		hashedTargetID, _ := utils.Blake3Hash(target)
 		bucket := s.ht.bucketIndex(s.ht.self.HashedID, hashedTargetID)
-		logtrace.Debug(ctx, "Bucket for target", logtrace.Fields{
+		logtrace.Info(ctx, "Bucket for target", logtrace.Fields{
 			logtrace.FieldModule: "p2p",
 			"target":             sKey,
 		})
@@ -995,7 +1012,7 @@ func (s *DHT) iterate(ctx context.Context, iterativeType int, target []byte, dat
 	// Set a maximum number of iterations to prevent indefinite looping
 	maxIterations := 5 // Adjust the maximum iterations as needed
 
-	logtrace.Debug(ctx, "Begin iteration", logtrace.Fields{
+	logtrace.Info(ctx, "Begin iteration", logtrace.Fields{
 		logtrace.FieldModule: "p2p",
 		"task_id":            taskID,
 		"key":                sKey,
@@ -1006,7 +1023,7 @@ func (s *DHT) iterate(ctx context.Context, iterativeType int, target []byte, dat
 		case <-ctx.Done():
 			return nil, fmt.Errorf("iterate cancelled: %w", ctx.Err())
 		case <-timeout:
-			logtrace.Debug(ctx, "Iteration timed out", logtrace.Fields{
+			logtrace.Info(ctx, "Iteration timed out", logtrace.Fields{
 				logtrace.FieldModule: "p2p",
 			})
 			return nil, nil
@@ -1050,7 +1067,7 @@ func (s *DHT) iterate(ctx context.Context, iterativeType int, target []byte, dat
 	nl.Comparator = target
 	nl.Sort()
 
-	logtrace.Debug(ctx, "Iterate sorted nodes", logtrace.Fields{
+	logtrace.Info(ctx, "Iterate sorted nodes", logtrace.Fields{
 		logtrace.FieldModule: "p2p",
 		"id":                 base58.Encode(s.ht.self.ID),
 		"iterate":            iterativeType,
@@ -1107,7 +1124,7 @@ func (s *DHT) handleResponses(ctx context.Context, responses <-chan *Message, nl
 			v, ok := response.Data.(*FindValueResponse)
 			if ok {
 				if v.Status.Result == ResultOk && len(v.Value) > 0 {
-					logtrace.Debug(ctx, "Iterate found value from network", logtrace.Fields{
+					logtrace.Info(ctx, "Iterate found value from network", logtrace.Fields{
 						logtrace.FieldModule: "p2p",
 					})
 					return nl, v.Value
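The iterate loop above bounds its work three ways: caller cancellation via ctx, a wall-clock timeout channel, and a hard iteration cap. The control-flow skeleton, with doRound as a hypothetical single-round worker and the timeout value illustrative:

package lookup

import (
	"context"
	"fmt"
	"time"
)

// run drives a lookup with three exit conditions: caller cancellation,
// an overall deadline, and a hard cap on rounds. doRound performs one
// round of the lookup and reports done=true on convergence.
func run(ctx context.Context, doRound func() (done bool, err error)) error {
	const maxIterations = 5
	timeout := time.After(2 * time.Minute) // illustrative deadline

	for i := 0; i < maxIterations; i++ {
		select {
		case <-ctx.Done():
			return fmt.Errorf("iterate cancelled: %w", ctx.Err())
		case <-timeout:
			return nil // timed out: give up quietly, as the DHT loop does
		default:
		}
		done, err := doRound()
		if err != nil {
			return err
		}
		if done {
			return nil
		}
	}
	return nil // iteration budget exhausted
}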
@@ -1152,7 +1169,7 @@ func (s *DHT) iterateFindValue(ctx context.Context, iterativeType int, target []
 	searchRest := false
 	// keep track of contacted nodes so that we don't hit them again
 	contacted := make(map[string]bool)
-	logtrace.Debug(ctx, "Begin iteration", logtrace.Fields{
+	logtrace.Info(ctx, "Begin iteration", logtrace.Fields{
 		logtrace.FieldModule: "p2p",
 		"task_id":            taskID,
 		"key":                sKey,
@@ -1161,7 +1178,7 @@ func (s *DHT) iterateFindValue(ctx context.Context, iterativeType int, target []
 	var closestNode *Node
 	var iterationCount int
 	for iterationCount = 0; iterationCount < maxIterations; iterationCount++ {
-		logtrace.Debug(ctx, "Begin find value", logtrace.Fields{
+		logtrace.Info(ctx, "Begin find value", logtrace.Fields{
 			logtrace.FieldModule: "p2p",
 			"task_id":            taskID,
 			"nl":                 nl.Len(),
@@ -1181,7 +1198,7 @@ func (s *DHT) iterateFindValue(ctx context.Context, iterativeType int, target []
 
 		// if the closest node is the same as the last iteration and we don't want to search rest of nodes, we are done
 		if !searchRest && (closestNode != nil && bytes.Equal(nl.Nodes[0].ID, closestNode.ID)) {
-			logtrace.Debug(ctx, "Closest node is the same as the last iteration", logtrace.Fields{
+			logtrace.Info(ctx, "Closest node is the same as the last iteration", logtrace.Fields{
 				logtrace.FieldModule: "p2p",
 				"task_id":            taskID,
 				"key":                sKey,
@@ -1200,7 +1217,7 @@ func (s *DHT) iterateFindValue(ctx context.Context, iterativeType int, target []
 
 		nl.Sort()
 
-		logtrace.Debug(ctx, "Iteration progress", logtrace.Fields{
+		logtrace.Info(ctx, "Iteration progress", logtrace.Fields{
 			logtrace.FieldModule: "p2p",
 			"task_id":            taskID,
 			"key":                sKey,
@@ -1255,7 +1272,7 @@ func (s *DHT) sendStoreData(ctx context.Context, n *Node, request *StoreDataRequ
 func (s *DHT) addNode(ctx context.Context, node *Node) *Node {
 	// ensure this is not itself address
 	if bytes.Equal(node.ID, s.ht.self.ID) {
-		logtrace.Debug(ctx, "Trying to add itself", logtrace.Fields{
+		logtrace.Info(ctx, "Trying to add itself", logtrace.Fields{
 			logtrace.FieldModule: "p2p",
 		})
 		return nil
@@ -1456,7 +1473,7 @@ func (s *DHT) storeToAlphaNodes(ctx context.Context, nl *NodeList, data []byte,
 	}
 
 	if finalStoreCount >= int32(Alpha) {
-		logtrace.Debug(ctx, "Store data to alpha nodes success", logtrace.Fields{
+		logtrace.Info(ctx, "Store data to alpha nodes success", logtrace.Fields{
 			logtrace.FieldModule: "dht",
 			"task_id":            taskID,
 			"len_total_nodes":    nl.Len(),
@@ -1478,7 +1495,7 @@ func (s *DHT) storeToAlphaNodes(ctx context.Context, nl *NodeList, data []byte,
 func (s *DHT) removeNode(ctx context.Context, node *Node) {
 	// ensure this is not itself address
 	if bytes.Equal(node.ID, s.ht.self.ID) {
-		logtrace.Debug(ctx, "Trying to remove itself", logtrace.Fields{
+		logtrace.Info(ctx, "Trying to remove itself", logtrace.Fields{
 			logtrace.FieldModule: "p2p",
 		})
 		return
@@ -1542,7 +1559,7 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i
 	var i int
 	for {
 		i++
-		logtrace.Debug(ctx, "Iterate batch store begin", logtrace.Fields{
+		logtrace.Info(ctx, "Iterate batch store begin", logtrace.Fields{
 			logtrace.FieldModule: "dht",
 			"task_id":            id,
 			"iter":               i,
@@ -1710,7 +1727,7 @@ func (s *DHT) batchStoreNetwork(ctx context.Context, values [][]byte, nodes map[
 			"port":               node.String(),
 		})
 		if s.ignorelist.Banned(node) {
-			logtrace.Debug(ctx, "Ignoring banned node in batch store network call", logtrace.Fields{
+			logtrace.Info(ctx, "Ignoring banned node in batch store network call", logtrace.Fields{
 				logtrace.FieldModule: "dht",
 				"node":               node.String(),
 			})
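iterateFindValue above stops early when the head of the freshly sorted candidate list is the same node as in the previous round and searchRest is off — the standard Kademlia convergence test. Distilled into a self-contained predicate:

package lookup

import "bytes"

// Node is a pared-down stand-in for the DHT node type.
type Node struct{ ID []byte }

// converged reports whether the lookup should stop: the closest
// candidate did not change between rounds and we are not in
// "search the rest" mode.
func converged(closestPrev *Node, sorted []*Node, searchRest bool) bool {
	if searchRest || closestPrev == nil || len(sorted) == 0 {
		return false
	}
	return bytes.Equal(sorted[0].ID, closestPrev.ID)
}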
@@ -1736,7 +1753,7 @@ func (s *DHT) batchStoreNetwork(ctx context.Context, values [][]byte, nodes map[
 			totalBytes += len(values[idx])
 		}
 
-		logtrace.Debug(ctx, "Batch store to node", logtrace.Fields{
+		logtrace.Info(ctx, "Batch store to node", logtrace.Fields{
 			logtrace.FieldModule:   "dht",
 			"keys":                 len(toStore),
 			"size_before_compress": utils.BytesIntToMB(totalBytes),
@@ -1747,7 +1764,7 @@ func (s *DHT) batchStoreNetwork(ctx context.Context, values [][]byte, nodes map[
 		response, err := s.network.Call(ctx, request, false)
 		if err != nil {
 			s.ignorelist.IncrementCount(receiver)
-			logtrace.Debug(ctx, "Network call batch store request failed", logtrace.Fields{
+			logtrace.Info(ctx, "Network call batch store request failed", logtrace.Fields{
 				logtrace.FieldModule: "p2p",
 				logtrace.FieldError:  err.Error(),
 				"request":            request.String(),
@@ -1768,7 +1785,7 @@ func (s *DHT) batchStoreNetwork(ctx context.Context, values [][]byte, nodes map[
 }
 
 func (s *DHT) batchFindNode(ctx context.Context, payload [][]byte, nodes map[string]*Node, contacted map[string]bool, txid string) (chan *MessageWithError, bool) {
-	logtrace.Debug(ctx, "Batch find node begin", logtrace.Fields{
+	logtrace.Info(ctx, "Batch find node begin", logtrace.Fields{
 		logtrace.FieldModule: "dht",
 		"task_id":            txid,
 		"nodes_count":        len(nodes),
@@ -1827,7 +1844,7 @@ func (s *DHT) batchFindNode(ctx context.Context, payload [][]byte, nodes map[str
 	}
 	wg.Wait()
 	close(responses)
-	logtrace.Debug(ctx, "Batch find node done", logtrace.Fields{
+	logtrace.Info(ctx, "Batch find node done", logtrace.Fields{
 		logtrace.FieldModule: "dht",
 		"nodes_count":        len(nodes),
 		"len_resp":           len(responses),
diff --git a/p2p/kademlia/hashtable.go b/p2p/kademlia/hashtable.go
index 0dfc9478..109b6915 100644
--- a/p2p/kademlia/hashtable.go
+++ b/p2p/kademlia/hashtable.go
@@ -3,13 +3,12 @@ package kademlia
 import (
 	"bytes"
 	"crypto/rand"
-	"encoding/hex"
-	"math"
 	"math/big"
 	"sync"
 	"time"
 
 	"github.com/LumeraProtocol/supernode/v2/pkg/errors"
+	"github.com/LumeraProtocol/supernode/v2/pkg/utils"
 )
 
 const (
@@ -97,15 +96,22 @@ func (ht *HashTable) refreshNode(id []byte) {
 	bucket := ht.routeTable[index]
 
 	var offset int
-	offset = -1
+	found := false
 	// find the position of the node
 	for i, v := range bucket {
-		if bytes.Equal(v.ID, id) {
+		if bytes.Equal(v.HashedID, id) {
 			offset = i
+			found = true
 			break
 		}
 	}
 
+	// Safety check: only rotate if node was actually found
+	if !found {
+		// Node not in bucket, nothing to refresh
+		return
+	}
+
 	// makes the node to the end
 
 	if offset < 0 {
@@ -136,7 +142,7 @@ func (ht *HashTable) randomIDFromBucket(bucket int) []byte {
 	index := bucket / 8
 	var id []byte
 	for i := 0; i < index; i++ {
-		id = append(id, ht.self.ID[i])
+		id = append(id, ht.self.HashedID[i])
 	}
 
 	start := bucket % 8
@@ -145,7 +151,7 @@ func (ht *HashTable) randomIDFromBucket(bucket int) []byte {
 	for i := 0; i < 8; i++ {
 		var bit bool
 		if i < start {
-			bit = hasBit(ht.self.ID[index], uint(i))
+			bit = hasBit(ht.self.HashedID[index], uint(i))
 		} else {
 			nBig, _ := rand.Int(rand.Reader, big.NewInt(2))
 			n := nBig.Int64()
@@ -153,13 +159,13 @@ func (ht *HashTable) randomIDFromBucket(bucket int) []byte {
 			bit = n == 1
 		}
 		if bit {
-			first += byte(math.Pow(2, float64(7-i)))
+			first |= 1 << (7 - i)
 		}
 	}
 	id = append(id, first)
 
 	// randomize each remaining byte
-	for i := index + 1; i < 20; i++ {
+	for i := index + 1; i < B/8; i++ {
 		nBig, _ := rand.Int(rand.Reader, big.NewInt(256))
 		n := nBig.Int64()
 
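refreshNode above now matches entries on HashedID and bails out when the node is absent; previously offset defaulted to 0, so a missing node silently rotated whichever contact sat at the head of the bucket. The move-to-tail rotation with the guard, in isolation:

package routing

import "bytes"

type Node struct{ HashedID []byte }

// refresh moves the node with the given hashed ID to the tail of the
// bucket (most recently seen), returning the bucket unchanged when
// the ID is absent — rotating index 0 by default was the old bug.
func refresh(bucket []*Node, hashedID []byte) []*Node {
	offset := -1
	for i, v := range bucket {
		if bytes.Equal(v.HashedID, hashedID) {
			offset = i
			break
		}
	}
	if offset < 0 {
		return bucket // nothing to refresh
	}
	n := bucket[offset]
	bucket = append(bucket[:offset], bucket[offset+1:]...)
	return append(bucket, n)
}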
@@ -204,6 +210,14 @@ func (ht *HashTable) closestContacts(num int, target []byte, ignoredNodes []*Nod
 	ht.mutex.RLock()
 	defer ht.mutex.RUnlock()
 
+	// Ensure target is hashed for consistent distance comparisons
+	var hashedTarget []byte
+	if len(target) != 32 {
+		hashedTarget, _ = utils.Blake3Hash(target)
+	} else {
+		hashedTarget = target
+	}
+
 	// Convert ignoredNodes slice to a map for faster lookup
 	ignoredMap := make(map[string]bool)
 	for _, node := range ignoredNodes {
@@ -211,7 +225,7 @@ func (ht *HashTable) closestContacts(num int, target []byte, ignoredNodes []*Nod
 	}
 
 	nl := &NodeList{
-		Comparator: target,
+		Comparator: hashedTarget,
 	}
 
 	counter := 0
@@ -300,20 +314,26 @@ func (ht *HashTable) closestContactsWithInlcudingNode(num int, target []byte, ig
 	ht.mutex.RLock()
 	defer ht.mutex.RUnlock()
 
+	var hashedTarget []byte
+	if len(target) != 32 {
+		hashedTarget, _ = utils.Blake3Hash(target)
+	} else {
+		hashedTarget = target
+	}
+
 	// Convert ignoredNodes slice to a map for faster lookup
 	ignoredMap := make(map[string]bool)
 	for _, node := range ignoredNodes {
-		ignoredMap[hex.EncodeToString(node.ID)] = true
+		ignoredMap[string(node.ID)] = true
 	}
 
 	nl := &NodeList{
-		Comparator: target,
+		Comparator: hashedTarget,
 	}
 
 	// Flatten the routeTable and add nodes to nl if they're not in the ignoredMap
 	for _, bucket := range ht.routeTable {
 		for _, node := range bucket {
-			if !ignoredMap[hex.EncodeToString(node.ID)] {
+			if !ignoredMap[string(node.ID)] {
 				nl.AddNodes([]*Node{node})
 			}
 		}
@@ -338,7 +358,7 @@ func (ht *HashTable) closestContactsWithIncludingNodeList(num int, target []byte
 	// Convert ignoredNodes slice to a map for faster lookup
 	ignoredMap := make(map[string]bool)
 	for _, node := range ignoredNodes {
-		ignoredMap[hex.EncodeToString(node.ID)] = true
+		ignoredMap[string(node.ID)] = true
 	}
 
 	nl := &NodeList{
diff --git a/p2p/kademlia/network.go b/p2p/kademlia/network.go
index 53324ce2..509239f0 100644
--- a/p2p/kademlia/network.go
+++ b/p2p/kademlia/network.go
@@ -620,25 +620,17 @@ func (s *Network) Call(ctx context.Context, request *Message, isLong bool) (*Mes
 	}
 
 	// do secure handshaking
-	// Fast path: try get under lock
 	s.connPoolMtx.Lock()
 	conn, err := s.connPool.Get(remoteAddr)
-	s.connPoolMtx.Unlock()
 	if err != nil {
-		// Slow path: build without holding the lock
 		conn, err = NewSecureClientConn(ctx, s.clientTC, remoteAddr)
 		if err != nil {
+			s.connPoolMtx.Unlock()
 			return nil, errors.Errorf("client secure establish %q: %w", remoteAddr, err)
 		}
-		s.connPoolMtx.Lock()
-		// another goroutine may have added meanwhile; replace safely
-		if _, e := s.connPool.Get(remoteAddr); e == nil {
-			conn.Close()
-		} else {
-			s.connPool.Add(remoteAddr, conn)
-		}
-		s.connPoolMtx.Unlock()
+		s.connPool.Add(remoteAddr, conn)
 	}
+	s.connPoolMtx.Unlock()
 
 	defer func() {
 		if err != nil && s.clientTC != nil {
@@ -661,7 +653,8 @@ func (s *Network) Call(ctx context.Context, request *Message, isLong bool) (*Mes
 	if err != nil {
 		return nil, errors.Errorf("encode: %w", err)
 	}
-	if _, err := conn.Write(data); err != nil {
+	if _, werr := conn.Write(data); werr != nil {
+		err = werr
 		return nil, errors.Errorf("conn write: %w", err)
 	}
 
diff --git a/p2p/kademlia/node.go b/p2p/kademlia/node.go
index b6050a10..7ddda663 100644
--- a/p2p/kademlia/node.go
+++ b/p2p/kademlia/node.go
@@ -197,9 +197,9 @@ func (s *NodeList) NodeIDs() [][]byte {
 	s.Mux.RLock()
 	defer s.Mux.RUnlock()
 
-	out := make([][]byte, 0, len(s.Nodes))
-	for _, n := range s.Nodes {
-		out = append(out, n.ID)
+	out := make([][]byte, len(s.Nodes))
+	for i := 0; i < len(s.Nodes); i++ {
+		out[i] = s.Nodes[i].ID
 	}
 
 	return out
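closestContacts above normalizes the lookup target to a 32-byte Blake3 digest before XOR-distance sorting, so raw node IDs and pre-hashed keys compare in the same keyspace. A sketch with hashFn standing in for the codebase's utils.Blake3Hash; the error fallback is an assumption (the patch discards the error):

package routing

// normalizeTarget returns a 32-byte comparator: inputs that are
// already digest-sized pass through, anything else is hashed.
// hashFn stands in for utils.Blake3Hash.
func normalizeTarget(target []byte, hashFn func([]byte) ([]byte, error)) []byte {
	if len(target) == 32 {
		return target
	}
	hashed, err := hashFn(target)
	if err != nil {
		return target // assumption: fall back to raw bytes on hash failure
	}
	return hashed
}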
@@ -210,9 +210,9 @@ func (s *NodeList) NodeIPs() []string {
 	s.Mux.RLock()
 	defer s.Mux.RUnlock()
 
-	out := make([]string, 0, len(s.Nodes))
-	for _, n := range s.Nodes {
-		out = append(out, n.IP)
+	out := make([]string, len(s.Nodes))
+	for i := 0; i < len(s.Nodes); i++ {
+		out[i] = s.Nodes[i].IP
 	}
 
 	return out
diff --git a/p2p/kademlia/replication.go b/p2p/kademlia/replication.go
index f7f6f301..a444b14d 100644
--- a/p2p/kademlia/replication.go
+++ b/p2p/kademlia/replication.go
@@ -136,7 +136,7 @@ func (s *DHT) Replicate(ctx context.Context) {
 	for i := 0; i < B; i++ {
 		if time.Since(s.ht.refreshTime(i)) > defaultRefreshTime {
 			// refresh the bucket by iterative find node
-			id := s.ht.randomIDFromBucket(K)
+			id := s.ht.randomIDFromBucket(i)
 			if _, err := s.iterate(ctx, IterateFindNode, id, nil, 0); err != nil {
 				logtrace.Error(ctx, "replicate iterate find node failed", logtrace.Fields{logtrace.FieldModule: "p2p", logtrace.FieldError: err.Error()})
 			}
diff --git a/supernode/cmd/start.go b/supernode/cmd/start.go
index 4e829ff3..befaf85d 100644
--- a/supernode/cmd/start.go
+++ b/supernode/cmd/start.go
@@ -9,7 +9,7 @@ import (
 	"syscall"
 
 	"github.com/LumeraProtocol/supernode/v2/p2p"
-	"github.com/LumeraProtocol/supernode/v2/p2p/kademlia/store/cloud.go"
+	"github.com/LumeraProtocol/supernode/v2/p2p/kademlia/store/cloud"
 	"github.com/LumeraProtocol/supernode/v2/p2p/kademlia/store/sqlite"
 	"github.com/LumeraProtocol/supernode/v2/pkg/codec"
 	"github.com/LumeraProtocol/supernode/v2/pkg/logtrace"
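The replication.go fix above passes the loop index i instead of the constant K, so each stale bucket is refreshed with a random ID drawn from that bucket rather than the same slice of the keyspace every time. A sketch of the sweep, with the helpers passed in as function values and B assumed to be the ID length in bits:

package routing

import (
	"context"
	"time"
)

const B = 256 // assumed ID size in bits; one bucket per bit

// refreshStaleBuckets walks every bucket and, for any bucket not
// touched within maxAge, looks up a random ID from *that* bucket.
// Refreshing a fixed bucket index (the old bug) left the rest of the
// routing table to go stale.
func refreshStaleBuckets(
	ctx context.Context,
	maxAge time.Duration,
	refreshTime func(bucket int) time.Time,
	randomIDFromBucket func(bucket int) []byte,
	findNode func(ctx context.Context, id []byte) error,
) {
	for i := 0; i < B; i++ {
		if time.Since(refreshTime(i)) > maxAge {
			// best effort: mirrors the logged-and-ignored error in Replicate
			_ = findNode(ctx, randomIDFromBucket(i))
		}
	}
}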