diff --git a/p2p/kademlia/banlist.go b/p2p/kademlia/banlist.go index d5cdcc60..6a0ecf3d 100644 --- a/p2p/kademlia/banlist.go +++ b/p2p/kademlia/banlist.go @@ -10,7 +10,7 @@ import ( const ( // banDuration - ban duration - banDuration = 3 * time.Hour + banDuration = 1 * time.Hour // threshold - number of failures required to consider a node banned. // failures before treating a node as banned. @@ -170,14 +170,15 @@ func (s *BanList) ToNodeList() []*Node { s.mtx.RLock() defer s.mtx.RUnlock() - ret := make([]*Node, 0) - + ret := make([]*Node, 0, len(s.Nodes)) for i := 0; i < len(s.Nodes); i++ { if s.Nodes[i].count > threshold { - ret = append(ret, &s.Nodes[i].Node) + + n := s.Nodes[i].Node + n.SetHashedID() + ret = append(ret, &n) } } - return ret } diff --git a/p2p/kademlia/bootstrap.go b/p2p/kademlia/bootstrap.go index b617894a..5b29f44d 100644 --- a/p2p/kademlia/bootstrap.go +++ b/p2p/kademlia/bootstrap.go @@ -3,51 +3,52 @@ package kademlia import ( "context" "fmt" + "net" + "os" "strconv" "strings" - "sync" "time" + "github.com/LumeraProtocol/supernode/v2/p2p/kademlia/domain" "github.com/LumeraProtocol/supernode/v2/pkg/errors" - "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" ltc "github.com/LumeraProtocol/supernode/v2/pkg/net/credentials" ) const ( - bootstrapRetryInterval = 10 - badAddrExpiryHours = 12 - defaultSuperNodeP2PPort = 4445 + bootstrapRefreshInterval = 10 * time.Minute + defaultSuperNodeP2PPort int = 4445 ) +// seed a couple of obviously bad addrs (unless in integration tests) func (s *DHT) skipBadBootstrapAddrs() { - //skipAddress1 := fmt.Sprintf("%s:%d", "127.0.0.1", s.options.Port) - //skipAddress2 := fmt.Sprintf("%s:%d", "localhost", s.options.Port) - //s.cache.Set(skipAddress1, []byte("true")) - //s.cache.Set(skipAddress2, []byte("true")) + isTest := os.Getenv("INTEGRATION_TEST") == "true" + if isTest { + return + } + s.cache.Set(fmt.Sprintf("%s:%d", "127.0.0.1", s.options.Port), []byte("true")) + s.cache.Set(fmt.Sprintf("%s:%d", "localhost", s.options.Port), []byte("true")) } + +// parseNode parses "host[:port]" into a Node with basic address hygiene. +// Loopback/private allow-listed only in integration tests. func (s *DHT) parseNode(extP2P string, selfAddr string) (*Node, error) { if extP2P == "" { return nil, errors.New("empty address") } - if extP2P == selfAddr { return nil, errors.New("self address") } - if _, err := s.cache.Get(extP2P); err == nil { - return nil, errors.New("configure: skip bad p2p boostrap addr") + return nil, errors.New("skip cached-bad bootstrap addr") } // Extract IP and port from the address var ip string var port uint16 - if idx := strings.LastIndex(extP2P, ":"); idx != -1 { ip = extP2P[:idx] portStr := extP2P[idx+1:] - - // If we have a port in the address, parse it if portStr != "" { portNum, err := strconv.ParseUint(portStr, 10, 16) if err != nil { @@ -56,33 +57,44 @@ func (s *DHT) parseNode(extP2P string, selfAddr string) (*Node, error) { port = uint16(portNum) } } else { - // No port in the address ip = extP2P - port = defaultSuperNodeP2PPort + port = uint16(defaultSuperNodeP2PPort) } - if ip == "" { return nil, errors.New("empty ip") } - // Create the node with the correct IP and port - return &Node{ - IP: ip, - Port: port, - }, nil + // Hygiene: reject non-routables unless in integration tests + isTest := os.Getenv("INTEGRATION_TEST") == "true" + if parsed := net.ParseIP(ip); parsed != nil { + if parsed.IsUnspecified() || parsed.IsLinkLocalUnicast() || parsed.IsLinkLocalMulticast() { + return nil, errors.New("non-routable address") + } + if parsed.IsLoopback() && !isTest { + return nil, errors.New("loopback not allowed") + } + if parsed.IsPrivate() && !isTest { + return nil, errors.New("private address not allowed") + } + } + + return &Node{IP: ip, Port: port}, nil } -// setBootstrapNodesFromConfigVar parses config.bootstrapNodes and sets them -// as the bootstrap nodes - As of now, this is only supposed to be used for testing +// setBootstrapNodesFromConfigVar parses CSV of Lumera addresses and fills s.options.BootstrapNodes. +// Intended for tests / controlled runs (no pings here). func (s *DHT) setBootstrapNodesFromConfigVar(ctx context.Context, bootstrapNodes string) error { - nodes := []*Node{} + nodes := make([]*Node, 0, 8) bsNodes := strings.Split(bootstrapNodes, ",") for _, bsNode := range bsNodes { - lumeraAddress, err := ltc.ParseLumeraAddress(bsNode) + addr := strings.TrimSpace(bsNode) + if addr == "" { + continue + } + lumeraAddress, err := ltc.ParseLumeraAddress(addr) if err != nil { return fmt.Errorf("setBootstrapNodesFromConfigVar: %w", err) } - nodes = append(nodes, &Node{ ID: []byte(lumeraAddress.Identity), IP: lumeraAddress.Host, @@ -94,320 +106,215 @@ func (s *DHT) setBootstrapNodesFromConfigVar(ctx context.Context, bootstrapNodes logtrace.FieldModule: "p2p", "bootstrap_nodes": nodes, }) - return nil } -// loadBootstrapCandidatesFromChain queries the chain and builds a map of candidate nodes -// keyed by their full "ip:port" address. Only active supernodes are considered and the -// latest published IP and p2p port are used. +// loadBootstrapCandidatesFromChain returns active supernodes (by latest state) +// mapped by "ip:port". No pings here. func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress string) (map[string]*Node, error) { - // Get all supernodes - supernodeResp, err := s.options.LumeraClient.SuperNode().ListSuperNodes(ctx) + resp, err := s.options.LumeraClient.SuperNode().ListSuperNodes(ctx) if err != nil { - return nil, fmt.Errorf("failed to get top supernodes: %w", err) + return nil, fmt.Errorf("failed to list supernodes: %w", err) } - mapNodes := make(map[string]*Node, len(supernodeResp.Supernodes)) - - for _, supernode := range supernodeResp.Supernodes { - // Skip non-active supernodes - find latest state by height - if len(supernode.States) == 0 { + mapNodes := make(map[string]*Node, len(resp.Supernodes)) + for _, sn := range resp.Supernodes { + if len(sn.States) == 0 { continue } - - var latestState int32 = 0 + var latestState int32 var maxStateHeight int64 = -1 - for _, state := range supernode.States { - if state.Height > maxStateHeight { - maxStateHeight = state.Height - latestState = int32(state.State) + for _, st := range sn.States { + if st.Height > maxStateHeight { + maxStateHeight = st.Height + latestState = int32(st.State) } } - if latestState != 1 { // SuperNodeStateActive = 1 continue } - // Find the latest IP address (with highest block height) + // latest IP by height var latestIP string var maxHeight int64 = -1 - for _, ipHistory := range supernode.PrevIpAddresses { - if ipHistory.Height > maxHeight { - maxHeight = ipHistory.Height - latestIP = ipHistory.Address + for _, ipHist := range sn.PrevIpAddresses { + if ipHist.Height > maxHeight { + maxHeight = ipHist.Height + latestIP = ipHist.Address } } - if latestIP == "" { - logtrace.Warn(ctx, "No valid IP address found for supernode", logtrace.Fields{ + logtrace.Warn(ctx, "No valid IP for supernode", logtrace.Fields{ logtrace.FieldModule: "p2p", - "supernode": supernode.SupernodeAccount, + "supernode": sn.SupernodeAccount, }) continue } - - // Extract IP from the address (remove port if present) ip := parseSupernodeAddress(latestIP) - // Use p2p_port from supernode record p2pPort := defaultSuperNodeP2PPort - if supernode.P2PPort != "" { - if port, err := strconv.ParseUint(supernode.P2PPort, 10, 16); err == nil { + if sn.P2PPort != "" { + if port, err := strconv.ParseUint(sn.P2PPort, 10, 16); err == nil { p2pPort = int(port) } } - // Create full address with p2p port for validation - fullAddress := fmt.Sprintf("%s:%d", ip, p2pPort) - - // Parse the node from the full address - node, err := s.parseNode(fullAddress, selfAddress) + full := fmt.Sprintf("%s:%d", ip, p2pPort) + node, err := s.parseNode(full, selfAddress) if err != nil { - logtrace.Warn(ctx, "Skip Bad Bootstrap Address", logtrace.Fields{ + logtrace.Warn(ctx, "Skipping bootstrap candidate (bad addr)", logtrace.Fields{ logtrace.FieldModule: "p2p", logtrace.FieldError: err.Error(), - "address": fullAddress, - "supernode": supernode.SupernodeAccount, + "address": full, + "supernode": sn.SupernodeAccount, }) continue } - - // Store the supernode account as the node ID - node.ID = []byte(supernode.SupernodeAccount) - mapNodes[fullAddress] = node + node.ID = []byte(sn.SupernodeAccount) + mapNodes[full] = node } - return mapNodes, nil } -// filterResponsiveByKademliaPing sends a Kademlia Ping RPC to each node and returns -// only those that respond successfully. -func (s *DHT) filterResponsiveByKademliaPing(ctx context.Context, nodes []*Node) []*Node { - maxInFlight := 32 - if len(nodes) < maxInFlight { - maxInFlight = len(nodes) +// upsertBootstrapNode inserts/updates replication_info for the discovered node (Active=false). +// No pings or routing decisions here; the health loop will manage Active/LastSeen. +func (s *DHT) upsertBootstrapNode(ctx context.Context, n *Node) error { + now := time.Now().UTC() + exists, err := s.store.RecordExists(string(n.ID)) + if err != nil { + return fmt.Errorf("check replication record: %w", err) + } + info := domain.NodeReplicationInfo{ + ID: n.ID, + IP: n.IP, + Port: n.Port, + Active: false, // health loop flips to true when node responds + IsAdjusted: false, + UpdatedAt: now, } - if maxInFlight < 1 { - maxInFlight = 1 + if exists { + return s.store.UpdateReplicationInfo(ctx, info) } + return s.store.AddReplicationInfo(ctx, info) +} - type result struct { - node *Node - alive bool +// seedRoutingFromDB adds nodes (from replication_info) into the routing table (in-memory only). +// Cheap; improves initial graph connectivity without pings. +func (s *DHT) seedRoutingFromDB(ctx context.Context) { + repInfo, err := s.store.GetAllReplicationInfo(ctx) + if err != nil { + logtrace.Warn(ctx, "seed routing: get replication info failed", logtrace.Fields{ + logtrace.FieldModule: "p2p", + logtrace.FieldError: err.Error(), + }) + return + } + for _, ri := range repInfo { + if len(ri.ID) == 0 || ri.IP == "" || ri.Port == 0 { + continue + } + n := &Node{ID: ri.ID, IP: ri.IP, Port: ri.Port} + s.addNode(ctx, n) } +} - sem := make(chan struct{}, maxInFlight) - resCh := make(chan result, len(nodes)) - var wg sync.WaitGroup +// SyncBootstrapOnce pulls candidates (config var or chain), upserts them into replication_info, +// populates s.options.BootstrapNodes, and seeds the routing table. No pings here. +func (s *DHT) SyncBootstrapOnce(ctx context.Context, bootstrapNodes string) error { + s.skipBadBootstrapAddrs() - for _, n := range nodes { - n := n - wg.Add(1) - sem <- struct{}{} - go func() { - defer wg.Done() - defer func() { <-sem }() - req := s.newMessage(Ping, n, nil) - pctx, cancel := context.WithTimeout(ctx, defaultPingTime) - defer cancel() - _, err := s.network.Call(pctx, req, false) - if err != nil { - // penalize, but Bootstrap will still retry independently - s.ignorelist.IncrementCount(n) - logtrace.Debug(ctx, "kademlia ping failed during configure", logtrace.Fields{ + // If config var provided, prefer that (tests) + if strings.TrimSpace(bootstrapNodes) != "" { + if err := s.setBootstrapNodesFromConfigVar(ctx, bootstrapNodes); err != nil { + return err + } + for _, n := range s.options.BootstrapNodes { + if err := s.upsertBootstrapNode(ctx, n); err != nil { + logtrace.Warn(ctx, "bootstrap upsert failed", logtrace.Fields{ logtrace.FieldModule: "p2p", logtrace.FieldError: err.Error(), "node": n.String(), }) - resCh <- result{node: n, alive: false} - return } - resCh <- result{node: n, alive: true} - }() - } - - wg.Wait() - close(resCh) - - filtered := make([]*Node, 0, len(nodes)) - for r := range resCh { - if r.alive { - filtered = append(filtered, r.node) } - } - return filtered -} - -// ConfigureBootstrapNodes connects with lumera client & gets p2p boostrap ip & port -func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string) error { - if bootstrapNodes != "" { - return s.setBootstrapNodesFromConfigVar(ctx, bootstrapNodes) + s.seedRoutingFromDB(ctx) + return nil } + // From chain supernodeAddr, err := s.getSupernodeAddress(ctx) if err != nil { return fmt.Errorf("get supernode address: %s", err) } selfAddress := fmt.Sprintf("%s:%d", parseSupernodeAddress(supernodeAddr), s.options.Port) - // Load bootstrap candidates from chain - mapNodes, err := s.loadBootstrapCandidatesFromChain(ctx, selfAddress) + cands, err := s.loadBootstrapCandidatesFromChain(ctx, selfAddress) if err != nil { return err } - // Build candidate list and filter only those that respond to Kademlia Ping - candidates := make([]*Node, 0, len(mapNodes)) - for _, n := range mapNodes { - candidates = append(candidates, n) - } - pingResponsive := s.filterResponsiveByKademliaPing(ctx, candidates) - if len(pingResponsive) == 0 { - logtrace.Error(ctx, "no bootstrap nodes responded to Kademlia ping", logtrace.Fields{logtrace.FieldModule: "p2p"}) - return nil - } - - for _, node := range pingResponsive { - logtrace.Info(ctx, "adding p2p bootstrap node", logtrace.Fields{ - logtrace.FieldModule: "p2p", - "bootstap_ip": node.IP, - "bootstrap_port": node.Port, - "node_id": string(node.ID), - }) + // Upsert candidates to replication_info + seen := make(map[string]struct{}, len(cands)) + s.options.BootstrapNodes = s.options.BootstrapNodes[:0] + for _, n := range cands { + if err := s.upsertBootstrapNode(ctx, n); err != nil { + logtrace.Warn(ctx, "bootstrap upsert failed", logtrace.Fields{ + logtrace.FieldModule: "p2p", + logtrace.FieldError: err.Error(), + "node": n.String(), + }) + continue + } + if _, ok := seen[string(n.ID)]; ok { + continue + } + s.options.BootstrapNodes = append(s.options.BootstrapNodes, n) + seen[string(n.ID)] = struct{}{} } - s.options.BootstrapNodes = append(s.options.BootstrapNodes, pingResponsive...) - + // Seed routing + s.seedRoutingFromDB(ctx) return nil } -// Bootstrap attempts to bootstrap the network using the BootstrapNodes provided -// to the Options struct -func (s *DHT) Bootstrap(ctx context.Context, bootstrapNodes string) error { - if len(s.options.BootstrapNodes) == 0 { - time.AfterFunc(bootstrapRetryInterval*time.Minute, func() { - s.retryBootstrap(ctx, bootstrapNodes) - }) - - return nil - } - - var wg sync.WaitGroup - for _, node := range s.options.BootstrapNodes { - nodeId := string(node.ID) - // sync the bootstrap node only once - val, exists := s.bsConnected.Load(nodeId) - if exists { - isConnected, _ := val.(bool) - if isConnected { - continue - } - } - - addr := fmt.Sprintf("%s:%v", node.IP, node.Port) - if _, err := s.cache.Get(addr); err == nil { - logtrace.Info(ctx, "skip bad p2p boostrap addr", logtrace.Fields{ +// StartBootstrapRefresher runs SyncBootstrapOnce every 10 minutes (idempotent upserts). +// This keeps replication_info and routing table current as the validator set changes. +func (s *DHT) StartBootstrapRefresher(ctx context.Context, bootstrapNodes string) { + go func() { + // Initial sync + if err := s.SyncBootstrapOnce(ctx, bootstrapNodes); err != nil { + logtrace.Warn(ctx, "initial bootstrap sync failed", logtrace.Fields{ logtrace.FieldModule: "p2p", - "addr": addr, + logtrace.FieldError: err.Error(), }) - continue } + t := time.NewTicker(bootstrapRefreshInterval) + defer t.Stop() - node := node - s.bsConnected.Store(nodeId, false) - wg.Add(1) - go func() { - defer wg.Done() - - // new a ping request message - request := s.newMessage(Ping, node, nil) - // new a context with timeout - ctx, cancel := context.WithTimeout(ctx, defaultPingTime) - defer cancel() - - // invoke the request and handle the response - for i := 0; i < 5; i++ { - response, err := s.network.Call(ctx, request, false) - if err != nil { - // This happening in bootstrap - so potentially other nodes not yet started - // So if bootstrap failed, should try to connect to node again for next bootstrap retry - // Mark this address as temporarily bad to avoid retrying immediately - s.cache.SetWithExpiry(addr, []byte("true"), badAddrExpiryHours*time.Hour) - - logtrace.Debug(ctx, "network call failed, sleeping 3 seconds", logtrace.Fields{ + for { + select { + case <-ctx.Done(): + return + case <-t.C: + if err := s.SyncBootstrapOnce(ctx, bootstrapNodes); err != nil { + logtrace.Warn(ctx, "periodic bootstrap sync failed", logtrace.Fields{ logtrace.FieldModule: "p2p", logtrace.FieldError: err.Error(), }) - time.Sleep(5 * time.Second) - continue - } - logtrace.Debug(ctx, "ping response", logtrace.Fields{ - logtrace.FieldModule: "p2p", - "response": response.String(), - }) - - // add the node to the route table - logtrace.Debug(ctx, "add-node params", logtrace.Fields{ - logtrace.FieldModule: "p2p", - "sender-id": string(response.Sender.ID), - "sender-ip": string(response.Sender.IP), - "sender-port": response.Sender.Port, - }) - - if len(response.Sender.ID) != len(s.ht.self.ID) { - logtrace.Error(ctx, "self ID && sender ID len don't match", logtrace.Fields{ - logtrace.FieldModule: "p2p", - "sender-id": string(response.Sender.ID), - "self-id": string(s.ht.self.ID), - }) - - continue } - - s.bsConnected.Store(nodeId, true) - s.addNode(ctx, response.Sender) - break } - }() - } - - // wait until all are done - wg.Wait() - - // if it has nodes in queries route tables - if s.ht.totalCount() > 0 { - // iterative find node from the nodes - if _, err := s.iterate(ctx, IterateFindNode, s.ht.self.ID, nil, 0); err != nil { - logtrace.Error(ctx, "iterative find node failed", logtrace.Fields{ - logtrace.FieldModule: "p2p", - logtrace.FieldError: err.Error(), - }) - return err } - } else { - time.AfterFunc(bootstrapRetryInterval*time.Minute, func() { - s.retryBootstrap(ctx, bootstrapNodes) - }) - } - - return nil + }() } -func (s *DHT) retryBootstrap(ctx context.Context, bootstrapNodes string) { - if err := s.ConfigureBootstrapNodes(ctx, bootstrapNodes); err != nil { - logtrace.Error(ctx, "retry failed to get bootstap ip", logtrace.Fields{ - logtrace.FieldModule: "p2p", - logtrace.FieldError: err.Error(), - }) - return +// ConfigureBootstrapNodes wires to the new sync/refresher (no pings here). +func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string) error { + // One-time sync; start refresher in the background + if err := s.SyncBootstrapOnce(ctx, bootstrapNodes); err != nil { + return err } - // join the kademlia network if bootstrap nodes is set - if err := s.Bootstrap(ctx, bootstrapNodes); err != nil { - logtrace.Error(ctx, "retry failed - bootstrap the node.", logtrace.Fields{ - logtrace.FieldModule: "p2p", - logtrace.FieldError: err.Error(), - }) - } + s.StartBootstrapRefresher(ctx, bootstrapNodes) + + return nil } diff --git a/p2p/kademlia/conn_pool.go b/p2p/kademlia/conn_pool.go index 80d08578..8c5876f3 100644 --- a/p2p/kademlia/conn_pool.go +++ b/p2p/kademlia/conn_pool.go @@ -5,6 +5,7 @@ import ( "fmt" "net" "sync" + "sync/atomic" "time" "github.com/LumeraProtocol/supernode/v2/pkg/errors" @@ -24,95 +25,22 @@ type ConnPool struct { capacity int conns map[string]*connectionItem mtx sync.Mutex + metrics *PoolMetrics } // NewConnPool return a connection pool func NewConnPool(ctx context.Context) *ConnPool { + m := &PoolMetrics{} pool := &ConnPool{ capacity: defaultCapacity, conns: map[string]*connectionItem{}, + metrics: m, } + m.Capacity.Store(int64(pool.capacity)) return pool } -// Add a connection to pool -func (pool *ConnPool) Add(addr string, conn net.Conn) { - pool.mtx.Lock() - defer pool.mtx.Unlock() - - // if connection already in pool - if item, ok := pool.conns[addr]; ok { - // close the old connection - _ = item.conn.Close() - // replace in-place without triggering capacity eviction - pool.conns[addr] = &connectionItem{ - lastAccess: time.Now().UTC(), - conn: conn, - } - return - } - - // if connection not in pool - if _, ok := pool.conns[addr]; !ok { - // if pool is full - if len(pool.conns) >= pool.capacity { - oldestAccess := time.Now().UTC() - oldestAccessAddr := "" - - for addr, item := range pool.conns { - if item.lastAccess.Before(oldestAccess) { - oldestAccessAddr = addr - oldestAccess = item.lastAccess - } - } - - if oldestAccessAddr != "" { - if item, ok := pool.conns[oldestAccessAddr]; ok { - _ = item.conn.Close() - } - delete(pool.conns, oldestAccessAddr) - } - } - } - - pool.conns[addr] = &connectionItem{ - lastAccess: time.Now().UTC(), - conn: conn, - } -} - -// Get return a connection from pool -func (pool *ConnPool) Get(addr string) (net.Conn, error) { - pool.mtx.Lock() - defer pool.mtx.Unlock() - item, ok := pool.conns[addr] - if !ok { - return nil, fmt.Errorf("not found") - } - - item.lastAccess = time.Now().UTC() - return item.conn, nil -} - -// Del remove a connection from pool -func (pool *ConnPool) Del(addr string) { - pool.mtx.Lock() - defer pool.mtx.Unlock() - delete(pool.conns, addr) -} - -// Release all connections in pool - used when exits -func (pool *ConnPool) Release() { - pool.mtx.Lock() - defer pool.mtx.Unlock() - - for addr, item := range pool.conns { - item.conn.Close() - delete(pool.conns, addr) - } -} - // connWrapper implements wrapper of secure connection type connWrapper struct { secureConn net.Conn @@ -150,6 +78,11 @@ func NewSecureClientConn(ctx context.Context, tc credentials.TransportCredential return nil, errors.Errorf("dial %q: %w", remoteAddress, err) } + // Disable Nagle for lower-latency small RPCs. + if tcp, ok := rawConn.(*net.TCPConn); ok { + _ = tcp.SetNoDelay(true) + } + // Clear any global deadline; per-RPC deadlines are set in Network.Call. _ = rawConn.SetDeadline(time.Time{}) @@ -168,6 +101,15 @@ func NewSecureClientConn(ctx context.Context, tc credentials.TransportCredential // NewSecureServerConn do server handshake and create a secure connection func NewSecureServerConn(_ context.Context, tc credentials.TransportCredentials, rawConn net.Conn) (net.Conn, error) { + if tcp, ok := rawConn.(*net.TCPConn); ok { + _ = tcp.SetKeepAlive(true) + _ = tcp.SetKeepAlivePeriod(2 * time.Minute) // tune: 2–5 min + } + + if tcp, ok := rawConn.(*net.TCPConn); ok { + _ = tcp.SetNoDelay(true) + } + conn, _, err := tc.ServerHandshake(rawConn) if err != nil { return nil, errors.Errorf("server secure establish failed: %w", err) @@ -235,3 +177,169 @@ func (conn *connWrapper) SetWriteDeadline(t time.Time) error { defer conn.mtx.Unlock() return conn.secureConn.SetWriteDeadline(t) } + +// optional: expose a getter +func (pool *ConnPool) MetricsSnapshot() map[string]int64 { + if pool.metrics == nil { + return map[string]int64{} + } + return pool.metrics.Snapshot() +} + +// optional: allow changing capacity (updates gauge) +func (pool *ConnPool) SetCapacity(n int) { + pool.mtx.Lock() + defer pool.mtx.Unlock() + pool.capacity = n + if pool.metrics != nil { + pool.metrics.Capacity.Store(int64(n)) + } +} + +func (pool *ConnPool) Add(addr string, conn net.Conn) { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + if item, ok := pool.conns[addr]; ok { + _ = item.conn.Close() + pool.conns[addr] = &connectionItem{lastAccess: time.Now().UTC(), conn: conn} + if pool.metrics != nil { + pool.metrics.Replacements.Add(1) + // OpenCurrent unchanged (we closed one, added one) + } + return + } + + // capacity-based eviction + if len(pool.conns) >= pool.capacity { + oldestAccess := time.Now().UTC() + oldestKey := "" + for k, it := range pool.conns { + if it.lastAccess.Before(oldestAccess) { + oldestAccess = it.lastAccess + oldestKey = k + } + } + if oldestKey != "" { + if it, ok := pool.conns[oldestKey]; ok { + _ = it.conn.Close() + } + delete(pool.conns, oldestKey) + if pool.metrics != nil { + pool.metrics.EvictionsLRU.Add(1) + } + // OpenCurrent will be incremented when we add below (net +/- 0) + } + } + + pool.conns[addr] = &connectionItem{lastAccess: time.Now().UTC(), conn: conn} + if pool.metrics != nil { + pool.metrics.Adds.Add(1) + pool.metrics.OpenCurrent.Store(int64(len(pool.conns))) + } +} + +func (pool *ConnPool) Get(addr string) (net.Conn, error) { + pool.mtx.Lock() + defer pool.mtx.Unlock() + item, ok := pool.conns[addr] + if !ok { + if pool.metrics != nil { + pool.metrics.Misses.Add(1) + } + return nil, fmt.Errorf("not found") + } + item.lastAccess = time.Now().UTC() + if pool.metrics != nil { + pool.metrics.ReuseHits.Add(1) + } + return item.conn, nil +} + +func (pool *ConnPool) Del(addr string) { + pool.mtx.Lock() + defer pool.mtx.Unlock() + if it, ok := pool.conns[addr]; ok { + _ = it.conn.Close() + delete(pool.conns, addr) + if pool.metrics != nil { + pool.metrics.OpenCurrent.Store(int64(len(pool.conns))) + } + } +} + +func (pool *ConnPool) Release() { + pool.mtx.Lock() + defer pool.mtx.Unlock() + for addr, item := range pool.conns { + _ = item.conn.Close() + delete(pool.conns, addr) + } + if pool.metrics != nil { + pool.metrics.OpenCurrent.Store(0) + } +} + +type PoolMetrics struct { + // counters + Adds atomic.Int64 // new unique Add (not replacement) + Replacements atomic.Int64 // Add replacing an existing entry + ReuseHits atomic.Int64 // Get hit + Misses atomic.Int64 // Get miss + EvictionsLRU atomic.Int64 // capacity-based evictions + PrunedIdle atomic.Int64 // idle evictions via PruneIdle + + // gauge-ish + OpenCurrent atomic.Int64 // current open conns tracked by pool + Capacity atomic.Int64 // configured capacity +} + +func (m *PoolMetrics) Snapshot() map[string]int64 { + return map[string]int64{ + "adds_total": m.Adds.Load(), + "replacements_total": m.Replacements.Load(), + "reuse_hits_total": m.ReuseHits.Load(), + "misses_total": m.Misses.Load(), + "evictions_lru_total": m.EvictionsLRU.Load(), + "pruned_idle_total": m.PrunedIdle.Load(), + "open_current": m.OpenCurrent.Load(), + "capacity": m.Capacity.Load(), + } +} + +// PruneIdle closes conns idle for >= idleFor and removes them from the pool. +func (pool *ConnPool) PruneIdle(idleFor time.Duration) { + pool.mtx.Lock() + defer pool.mtx.Unlock() + now := time.Now().UTC() + pruned := int64(0) + for addr, it := range pool.conns { + if now.Sub(it.lastAccess) >= idleFor { + _ = it.conn.Close() + delete(pool.conns, addr) + pruned++ + } + } + if pool.metrics != nil { + if pruned > 0 { + pool.metrics.PrunedIdle.Add(pruned) + pool.metrics.OpenCurrent.Store(int64(len(pool.conns))) + } + } +} + +// StartPruner runs PruneIdle every interval; call from Network.NewNetwork or similar. +func (pool *ConnPool) StartPruner(ctx context.Context, interval, idleFor time.Duration) { + go func() { + t := time.NewTicker(interval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + pool.PruneIdle(idleFor) + } + } + }() +} diff --git a/p2p/kademlia/dht.go b/p2p/kademlia/dht.go index b5863ad9..a232beae 100644 --- a/p2p/kademlia/dht.go +++ b/p2p/kademlia/dht.go @@ -41,8 +41,8 @@ const ( delKeysCountThreshold = 10 lowSpaceThreshold = 50 // GB batchStoreSize = 2500 - storeSameSymbolsBatchConcurrency = 1 - storeSymbolsBatchConcurrency = 2.0 + storeSameSymbolsBatchConcurrency = 3 + storeSymbolsBatchConcurrency = 3.0 minimumDataStoreSuccessRate = 75.0 maxIterations = 4 @@ -635,47 +635,40 @@ func (s *DHT) fetchAndAddLocalKeys(ctx context.Context, hexKeys []string, result return count, err } -// BatchRetrieve data from the networking using keys. Keys are the base58 encoded func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, txID string, localOnly ...bool) (result map[string][]byte, err error) { - result = make(map[string][]byte) // the result of the batch retrieve - keys are b58 encoded (as received in request) - var resMap sync.Map // the result of the batch retrieve - keys are b58 encoded - var foundLocalCount int32 // the number of values found so far + result = make(map[string][]byte) + var resMap sync.Map + var foundLocalCount int32 - hexKeys := make([]string, len(keys)) // the hex keys keys[i] = hex.EncodeToString(base58.Decode(keys[i])) - globalClosestContacts := make(map[string]*NodeList) // This will store the global top 6 nodes for each symbol's hash - hashes := make([][]byte, len(keys)) // the hashes of the keys - hashes[i] = base58.Decode(keys[i]) - knownNodes := make(map[string]*Node) // This will store the nodes we know about + hexKeys := make([]string, len(keys)) + globalClosestContacts := make(map[string]*NodeList) + hashes := make([][]byte, len(keys)) + knownNodes := make(map[string]*Node) + var knownMu sync.Mutex + var closestMu sync.RWMutex defer func() { - // Transfer data from resMap to result resMap.Range(func(key, value interface{}) bool { hexKey := key.(string) valBytes := value.([]byte) - k, err := hex.DecodeString(hexKey) if err != nil { logtrace.Error(ctx, "Failed to decode hex key in resMap.Range", logtrace.Fields{ logtrace.FieldModule: "dht", - "key": hexKey, - "txid": txID, - logtrace.FieldError: err.Error(), + "key": hexKey, "txid": txID, logtrace.FieldError: err.Error(), }) return true } - result[base58.Encode(k)] = valBytes - return true }) - - for key, value := range result { - if len(value) == 0 { - delete(result, key) + for k, v := range result { + if len(v) == 0 { + delete(result, k) } } }() - // populate result map with required keys for _, key := range keys { result[key] = nil } @@ -685,7 +678,6 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, self := &Node{ID: s.ht.self.ID, IP: hostIP, Port: s.ht.self.Port} self.SetHashedID() - // populate hexKeys and hashes for i, key := range keys { decoded := base58.Decode(key) if len(decoded) != B/8 { @@ -694,52 +686,31 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, hashes[i] = decoded hexKeys[i] = hex.EncodeToString(decoded) } - logtrace.Info(ctx, "Populated keys and hashes", logtrace.Fields{ - logtrace.FieldModule: "dht", - "self": self.String(), - "txid": txID, - }) - - // Add nodes from route table to known nodes map - for _, node := range s.ht.nodes() { - n := &Node{ID: node.ID, IP: node.IP, Port: node.Port} - n.SetHashedID() - knownNodes[string(node.ID)] = n + for _, n := range s.ht.nodes() { + nn := &Node{ID: n.ID, IP: n.IP, Port: n.Port} + nn.SetHashedID() + knownNodes[string(nn.ID)] = nn } - // Calculate the local top 6 nodes for each value for i := range keys { - // Calculate the local top 6 nodes for each value - top6 := s.ht.closestContactsWithInlcudingNode(Alpha, hashes[i], s.ignorelist.ToNodeList(), nil) + top6 := s.ht.closestContactsWithIncludingNode(Alpha, hashes[i], s.ignorelist.ToNodeList(), nil) + closestMu.Lock() globalClosestContacts[keys[i]] = top6 - - s.addKnownNodes(ctx, top6.Nodes, knownNodes) + closestMu.Unlock() + s.addKnownNodesSafe(ctx, top6.Nodes, knownNodes, &knownMu) } - logtrace.Info(ctx, "Closest contacts populated, fetching local keys now", logtrace.Fields{ - logtrace.FieldModule: "dht", - "txid": txID, - }) - - // remove self from the map delete(knownNodes, string(self.ID)) foundLocalCount, err = s.fetchAndAddLocalKeys(ctx, hexKeys, &resMap, required) if err != nil { return nil, fmt.Errorf("fetch and add local keys: %v", err) } - logtrace.Info(ctx, "Batch find values count", logtrace.Fields{ - logtrace.FieldModule: "dht", - "txid": txID, - "local_found_count": foundLocalCount, - }) - if foundLocalCount >= required { return result, nil } - // We don't have enough values locally, so we need to fetch from the network batchSize := batchStoreSize var networkFound int32 totalBatches := int(math.Ceil(float64(required) / float64(batchSize))) @@ -750,128 +721,127 @@ func (s *DHT) BatchRetrieve(ctx context.Context, keys []string, required int32, gctx, cancel := context.WithCancel(ctx) defer cancel() - logtrace.Info(ctx, "Begin iterate batch get values", logtrace.Fields{ - logtrace.FieldModule: "dht", - "txid": txID, - "parallel_batches": parallelBatches, - }) - // Process in batches for start := 0; start < len(keys); start += batchSize { end := start + batchSize if end > len(keys) { end = len(keys) } - - // Check for early termination if atomic.LoadInt32(&networkFound)+int32(foundLocalCount) >= int32(required) { break } wg.Add(1) - semaphore <- struct{}{} // Acquire a semaphore slot before launching the goroutine - - go s.processBatch(gctx, keys[start:end], hexKeys[start:end], semaphore, &wg, globalClosestContacts, knownNodes, &resMap, - required, foundLocalCount, &networkFound, cancel, txID) + semaphore <- struct{}{} + go s.processBatch( + gctx, + keys[start:end], + hexKeys[start:end], + semaphore, &wg, + globalClosestContacts, + &closestMu, + knownNodes, &knownMu, + &resMap, + required, + foundLocalCount, + &networkFound, + cancel, + txID, + ) } - wg.Wait() // Wait for all goroutines to finish - logtrace.Info(ctx, "Iterate batch get values workers done", logtrace.Fields{ - logtrace.FieldModule: "dht", - "txid": txID, - }) - + wg.Wait() return result, nil } -func (s *DHT) processBatch(ctx context.Context, batchKeys []string, batchHexKeys []string, semaphore chan struct{}, wg *sync.WaitGroup, - globalClosestContacts map[string]*NodeList, knownNodes map[string]*Node, resMap *sync.Map, required int32, foundLocalCount int32, networkFound *int32, - cancel context.CancelFunc, txID string) { - +func (s *DHT) processBatch( + ctx context.Context, + batchKeys []string, + batchHexKeys []string, + semaphore chan struct{}, + wg *sync.WaitGroup, + globalClosestContacts map[string]*NodeList, + closestMu *sync.RWMutex, + knownNodes map[string]*Node, + knownMu *sync.Mutex, + resMap *sync.Map, + required int32, + foundLocalCount int32, + networkFound *int32, + cancel context.CancelFunc, + txID string, +) { defer wg.Done() defer func() { <-semaphore }() for i := 0; i < maxIterations; i++ { - // Early check if context is done to stop processing select { case <-ctx.Done(): return default: } + // Build fetch map (read globalClosestContacts under RLock) fetchMap := make(map[string][]int) for i, key := range batchKeys { - fetchNodes := globalClosestContacts[key].Nodes - for _, node := range fetchNodes { + closestMu.RLock() + nl := globalClosestContacts[key] + closestMu.RUnlock() + if nl == nil { + continue + } + for _, node := range nl.Nodes { nodeID := string(node.ID) fetchMap[nodeID] = append(fetchMap[nodeID], i) } } - // Iterate through the network to get the values for the current batch - foundCount, newClosestContacts, batchErr := s.iterateBatchGetValues(ctx, knownNodes, batchKeys, batchHexKeys, fetchMap, resMap, required, foundLocalCount+atomic.LoadInt32(networkFound)) + foundCount, newClosestContacts, batchErr := s.iterateBatchGetValues( + ctx, knownNodes, batchKeys, batchHexKeys, fetchMap, resMap, required, foundLocalCount+atomic.LoadInt32(networkFound), + ) if batchErr != nil { logtrace.Error(ctx, "Iterate batch get values failed", logtrace.Fields{ - logtrace.FieldModule: "dht", - "txid": txID, - logtrace.FieldError: batchErr.Error(), + logtrace.FieldModule: "dht", "txid": txID, logtrace.FieldError: batchErr.Error(), }) } - // Update the global counter for found values atomic.AddInt32(networkFound, int32(foundCount)) - - // Check and propagate early termination if atomic.LoadInt32(networkFound)+int32(foundLocalCount) >= int32(required) { - cancel() // Cancels the context, signaling other goroutines to stop + cancel() break } - // we now need to check if the nodes in the globalClosestContacts Map are still in the top 6 - // if not, we need to send calls to the newly found nodes to inquire about the top 6 nodes changed := false for key, nodesList := range newClosestContacts { if nodesList == nil || nodesList.Nodes == nil { continue } - if globalClosestContacts[key] == nil || globalClosestContacts[key].Nodes == nil { - logtrace.Warn(ctx, "Global contacts list doesn't have the key", logtrace.Fields{ - logtrace.FieldModule: "dht", - "key": key, - }) + closestMu.RLock() + curr := globalClosestContacts[key] + closestMu.RUnlock() + if curr == nil || curr.Nodes == nil { + logtrace.Warn(ctx, "Global contacts missing key during merge", logtrace.Fields{"key": key}) continue } - if !haveAllNodes(nodesList.Nodes, globalClosestContacts[key].Nodes) { - logtrace.Info(ctx, "Global closest contacts list changed in fetch", logtrace.Fields{ - logtrace.FieldModule: "dht", - "key": key, - "have": nodesList.String(), - "task_id": txID, - "got": globalClosestContacts[key].String(), - }) + if !haveAllNodes(nodesList.Nodes, curr.Nodes) { changed = true } - nodesList.AddNodes(globalClosestContacts[key].Nodes) + nodesList.AddNodes(curr.Nodes) nodesList.Sort() nodesList.TopN(Alpha) - s.addKnownNodes(ctx, nodesList.Nodes, knownNodes) + s.addKnownNodesSafe(ctx, nodesList.Nodes, knownNodes, knownMu) + + closestMu.Lock() globalClosestContacts[key] = nodesList + closestMu.Unlock() } if !changed { break } - - if i == maxIterations-1 { - logtrace.Warn(ctx, "Max iterations reached, still top 6 list was changed", logtrace.Fields{ - logtrace.FieldModule: "dht", - "iter": i, - "task_id": txID, - }) - } } } @@ -1345,36 +1315,20 @@ func (s *DHT) sendStoreData(ctx context.Context, n *Node, request *StoreDataRequ func (s *DHT) addNode(ctx context.Context, node *Node) *Node { // Allow localhost for integration testing isIntegrationTest := os.Getenv("INTEGRATION_TEST") == "true" - if node.IP == "" || node.IP == "0.0.0.0" || (!isIntegrationTest && node.IP == "127.0.0.1") { - logtrace.Debug(ctx, "Trying to add invalid node", logtrace.Fields{ - logtrace.FieldModule: "p2p", - }) + logtrace.Debug(ctx, "Trying to add invalid node", logtrace.Fields{logtrace.FieldModule: "p2p"}) return nil } - - // ensure this is not itself address if bytes.Equal(node.ID, s.ht.self.ID) { - logtrace.Debug(ctx, "Trying to add itself", logtrace.Fields{ - logtrace.FieldModule: "p2p", - }) + logtrace.Debug(ctx, "Trying to add itself", logtrace.Fields{logtrace.FieldModule: "p2p"}) return nil } node.SetHashedID() - index := s.ht.bucketIndex(s.ht.self.HashedID, node.HashedID) + idx := s.ht.bucketIndex(s.ht.self.HashedID, node.HashedID) - if err := s.updateReplicationNode(ctx, node.ID, node.IP, node.Port, true); err != nil { - logtrace.Error(ctx, "Update replication node failed", logtrace.Fields{ - logtrace.FieldModule: "p2p", - "node_id": string(node.ID), - "node_ip": node.IP, - logtrace.FieldError: err.Error(), - }) - } - - if s.ht.hasBucketNode(index, node.ID) { - // refresh using hashed ID to match hashtable expectations + // already in table? refresh to MRU + if s.ht.hasBucketNode(idx, node.HashedID) { s.ht.refreshNode(node.HashedID) return nil } @@ -1382,46 +1336,24 @@ func (s *DHT) addNode(ctx context.Context, node *Node) *Node { s.ht.mutex.Lock() defer s.ht.mutex.Unlock() - // 2. if the bucket is full, ping the first node - bucket := s.ht.routeTable[index] - if len(bucket) == K { - first := bucket[0] - // new a ping request message - request := s.newMessage(Ping, first, nil) - // new a context with timeout - ctx, cancel := context.WithTimeout(ctx, defaultPingTime) - defer cancel() - - // invoke the request and handle the response - _, err := s.network.Call(ctx, request, false) - if err == nil { - // refresh the node to the end of bucket - bucket = bucket[1:] - bucket = append(bucket, node) - s.ht.routeTable[index] = bucket - return nil - } else { - - // Penalize the unresponsive resident, not the new candidate - s.ignorelist.IncrementCount(first) - // the node is down, remove the node from bucket - bucket = append(bucket, node) - bucket = bucket[1:] - - // need to reset the route table with the bucket - s.ht.routeTable[index] = bucket - - return first - } - - } else { - // 3. append the node to the end of the bucket - bucket = append(bucket, node) + b := s.ht.routeTable[idx] + if len(b) < K { + s.ht.routeTable[idx] = append(b, node) + return nil } - // need to update the route table with the bucket - s.ht.routeTable[index] = bucket + // Bucket full: + lru := b[0] + // If we already know the LRU is bad, replace immediately. + if s.ignorelist.Banned(lru) { + s.ignorelist.IncrementCount(lru) // optional: nudge the counter + b[0] = node + s.ht.routeTable[idx] = b + return lru + } + // Otherwise keep the resident, drop the newcomer. + // (The periodic ping/health loop will evict bad nodes later.) return nil } @@ -1471,71 +1403,59 @@ func (s *DHT) storeToAlphaNodes(ctx context.Context, nl *NodeList, data []byte, storeCount := int32(0) alphaCh := make(chan bool, Alpha) - // Start by sending the first Alpha requests in parallel + // Launch up to Alpha requests in parallel (non-banned only) + launched := 0 for i := 0; i < Alpha && i < nl.Len(); i++ { n := nl.Nodes[i] - if s.ignorelist.Banned(n) { continue } - + launched++ go func(n *Node) { - request := &StoreDataRequest{Data: data, Type: typ} response, err := s.sendStoreData(ctx, n, request) if err != nil { - errorFields := logtrace.Fields{ + logtrace.Error(ctx, "Send store data failed", logtrace.Fields{ logtrace.FieldModule: "p2p", "node": n.String(), "task_id": taskID, logtrace.FieldError: err.Error(), - } - logtrace.Error(ctx, "Send store data failed", errorFields) + }) alphaCh <- false - } else if response.Status.Result != ResultOk { - errorFields := logtrace.Fields{ + return + } + if response.Status.Result != ResultOk { + logtrace.Error(ctx, "Reply store data failed", logtrace.Fields{ logtrace.FieldModule: "p2p", "node": n.String(), "task_id": taskID, logtrace.FieldError: response.Status.ErrMsg, - } - logtrace.Error(ctx, "Reply store data failed", errorFields) + }) alphaCh <- false - } else { - atomic.AddInt32(&storeCount, 1) - alphaCh <- true + return } + atomic.AddInt32(&storeCount, 1) + alphaCh <- true }(n) } - skey, _ := utils.Blake3Hash(data) - // Collect results from parallel requests - for i := 0; i < Alpha && i < len(nl.Nodes); i++ { + // Collect only what we launched + for i := 0; i < launched; i++ { <-alphaCh - if atomic.LoadInt32(&storeCount) >= int32(Alpha) { - nl.TopN(Alpha) - return nil - } } + // If needed, continue sequentially finalStoreCount := atomic.LoadInt32(&storeCount) - // If storeCount is still < Alpha, send requests sequentially until it reaches Alpha - for i := Alpha; i < len(nl.Nodes); i++ { - if finalStoreCount >= int32(Alpha) { - break - } - + for i := Alpha; i < nl.Len() && finalStoreCount < int32(Alpha); i++ { n := nl.Nodes[i] - if s.ignorelist.Banned(n) { - logtrace.Info(ctx, "Ignore node as its continuous failed count is above threshold", logtrace.Fields{ + logtrace.Info(ctx, "Ignore banned node during sequential store", logtrace.Fields{ logtrace.FieldModule: "p2p", "node": n.String(), "task_id": taskID, }) continue } - request := &StoreDataRequest{Data: data, Type: typ} response, err := s.sendStoreData(ctx, n, request) if err != nil { @@ -1545,18 +1465,22 @@ func (s *DHT) storeToAlphaNodes(ctx context.Context, nl *NodeList, data []byte, "task_id": taskID, logtrace.FieldError: err.Error(), }) - } else if response.Status.Result != ResultOk { + continue + } + if response.Status.Result != ResultOk { logtrace.Error(ctx, "Reply store data failed", logtrace.Fields{ logtrace.FieldModule: "p2p", "node": n.String(), "task_id": taskID, logtrace.FieldError: response.Status.ErrMsg, }) - } else { - finalStoreCount++ + continue } + finalStoreCount++ } + skey, _ := utils.Blake3Hash(data) + if finalStoreCount >= int32(Alpha) { logtrace.Info(ctx, "Store data to alpha nodes success", logtrace.Fields{ logtrace.FieldModule: "dht", @@ -1564,15 +1488,16 @@ func (s *DHT) storeToAlphaNodes(ctx context.Context, nl *NodeList, data []byte, "len_total_nodes": nl.Len(), "skey": hex.EncodeToString(skey), }) + nl.TopN(Alpha) return nil } + logtrace.Info(ctx, "Store data to alpha nodes failed", logtrace.Fields{ logtrace.FieldModule: "dht", "task_id": taskID, "store_count": finalStoreCount, "skey": hex.EncodeToString(skey), }) - return fmt.Errorf("store data to alpha nodes failed, only %d nodes stored", finalStoreCount) } @@ -1589,7 +1514,7 @@ func (s *DHT) removeNode(ctx context.Context, node *Node) { index := s.ht.bucketIndex(s.ht.self.HashedID, node.HashedID) - if removed := s.ht.RemoveNode(index, node.ID); !removed { + if removed := s.ht.RemoveNode(index, node.HashedID); !removed { logtrace.Error(ctx, "Remove node not found in bucket", logtrace.Fields{ logtrace.FieldModule: "p2p", "node": node.String(), @@ -1658,103 +1583,13 @@ func (s *DHT) IterateBatchStore(ctx context.Context, values [][]byte, typ int, i for i := 0; i < len(values); i++ { target, _ := utils.Blake3Hash(values[i]) hashes[i] = target - top6 := s.ht.closestContactsWithInlcudingNode(Alpha, target, s.ignorelist.ToNodeList(), nil) + top6 := s.ht.closestContactsWithIncludingNode(Alpha, target, s.ignorelist.ToNodeList(), nil) globalClosestContacts[base58.Encode(target)] = top6 // log.WithContext(ctx).WithField("top 6", top6).Info("iterate batch store begin") s.addKnownNodes(ctx, top6.Nodes, knownNodes) } - // var changed bool - // var i int - // for { - // i++ - // logtrace.Info(ctx, "Iterate batch store begin", logtrace.Fields{ - // logtrace.FieldModule: "dht", - // "task_id": id, - // "iter": i, - // "keys": len(values), - // }) - // changed = false - // localClosestNodes := make(map[string]*NodeList) - // responses, atleastOneContacted := s.batchFindNode(ctx, hashes, knownNodes, contacted, id) - - // if !atleastOneContacted { - // logtrace.Info(ctx, "Break", logtrace.Fields{ - // logtrace.FieldModule: "dht", - // }) - // break - // } - - // for response := range responses { - // if response.Error != nil { - // logtrace.Error(ctx, "Batch find node failed on a node", logtrace.Fields{ - // logtrace.FieldModule: "dht", - // "task_id": id, - // logtrace.FieldError: response.Error.Error(), - // }) - // continue - // } - - // if response.Message == nil { - // continue - // } - - // v, ok := response.Message.Data.(*BatchFindNodeResponse) - // if ok && v.Status.Result == ResultOk { - // for key, nodesList := range v.ClosestNodes { - // if nodesList != nil { - // nl, exists := localClosestNodes[key] - // if exists { - // nl.AddNodes(nodesList) - // localClosestNodes[key] = nl - // } else { - // localClosestNodes[key] = &NodeList{Nodes: nodesList, Comparator: base58.Decode(key)} - // } - - // s.addKnownNodes(ctx, nodesList, knownNodes) - // } - // } - // } - // } - - // // we now need to check if the nodes in the globalClosestContacts Map are still in the top 6 - // // if yes, we can store the data to them - // // if not, we need to send calls to the newly found nodes to inquire about the top 6 nodes - // logtrace.Info(ctx, "Check closest nodes & begin store", logtrace.Fields{ - // logtrace.FieldModule: "dht", - // "task_id": id, - // "iter": i, - // "keys": len(values), - // }) - // for key, nodesList := range localClosestNodes { - // if nodesList == nil { - // continue - // } - - // nodesList.Comparator = base58.Decode(key) - // nodesList.Sort() - // nodesList.TopN(Alpha) - // s.addKnownNodes(ctx, nodesList.Nodes, knownNodes) - - // if !haveAllNodes(nodesList.Nodes, globalClosestContacts[key].Nodes) { - // changed = true - // } - - // nodesList.AddNodes(globalClosestContacts[key].Nodes) - // nodesList.Sort() - // nodesList.TopN(Alpha) - // globalClosestContacts[key] = nodesList - // } - - // if !changed { - // break - // } - // } - - // assume at this point, we have True\Golabl top 6 nodes for each symbol's hash stored in globalClosestContacts Map - // we now need to store the data to these nodes - storageMap := make(map[string][]int) // This will store the index of the data in the values array that needs to be stored to the node for i := 0; i < len(hashes); i++ { storageNodes := globalClosestContacts[base58.Encode(hashes[i])] @@ -1880,7 +1715,10 @@ func (s *DHT) batchStoreNetwork(ctx context.Context, values [][]byte, nodes map[ request := s.newMessage(BatchStoreData, receiver, data) response, err := s.network.Call(ctx, request, false) if err != nil { - s.ignorelist.IncrementCount(receiver) + if !isLocalCancel(err) { + s.ignorelist.IncrementCount(receiver) + } + logtrace.Info(ctx, "Network call batch store request failed", logtrace.Fields{ logtrace.FieldModule: "p2p", logtrace.FieldError: err.Error(), @@ -1951,7 +1789,10 @@ func (s *DHT) batchFindNode(ctx context.Context, payload [][]byte, nodes map[str request := s.newMessage(BatchFindNode, receiver, data) response, err := s.network.Call(ctx, request, false) if err != nil { - s.ignorelist.IncrementCount(receiver) + if !isLocalCancel(err) { + s.ignorelist.IncrementCount(receiver) + } + logtrace.Warn(ctx, "Batch find node network call request failed", logtrace.Fields{ logtrace.FieldModule: "dht", "node": receiver.String(), @@ -1977,3 +1818,10 @@ func (s *DHT) batchFindNode(ctx context.Context, payload [][]byte, nodes map[str return responses, atleastOneContacted } + +// addKnownNodesSafe wraps addKnownNodes with a mutex to avoid concurrent writes to knownNodes. +func (s *DHT) addKnownNodesSafe(ctx context.Context, nodes []*Node, knownNodes map[string]*Node, mu *sync.Mutex) { + mu.Lock() + s.addKnownNodes(ctx, nodes, knownNodes) + mu.Unlock() +} diff --git a/p2p/kademlia/hashtable.go b/p2p/kademlia/hashtable.go index 2731b398..dc22dc07 100644 --- a/p2p/kademlia/hashtable.go +++ b/p2p/kademlia/hashtable.go @@ -52,43 +52,6 @@ type HashTable struct { refreshers []time.Time } -// NewHashTable returns a new hashtable -func NewHashTable(options *Options) (*HashTable, error) { - ht := &HashTable{ - self: &Node{ - IP: options.IP, - Port: options.Port, - }, - refreshers: make([]time.Time, B), - routeTable: make([][]*Node, B), - } - // init the id for hashtable - if options.ID != nil { - ht.self.ID = options.ID - } else { - return nil, errors.New("id is nil") - } - ht.self.SetHashedID() - - // reset the refresh time for every bucket - for i := 0; i < B; i++ { - ht.resetRefreshTime(i) - } - - return ht, nil -} - -// ensureHashedTarget normalizes a comparator target into the Kademlia ID space (32 bytes). -// If the provided target is not 32 bytes, it is hashed via Blake3; otherwise it is used as-is. -// This centralizes the "is target already a hash?" logic to prevent accidental misuse. -func ensureHashedTarget(target []byte) []byte { - if len(target) != 32 { - h, _ := utils.Blake3Hash(target) - return h - } - return target -} - // resetRefreshTime - reset the refresh time func (ht *HashTable) resetRefreshTime(bucket int) { ht.refMutex.Lock() @@ -186,166 +149,156 @@ func (ht *HashTable) randomIDFromBucket(bucket int) []byte { return id } -// hasBucketNode check if the node id is existed in the bucket -func (ht *HashTable) hasBucketNode(bucket int, id []byte) bool { +// Count returns the number of nodes in route table +func (ht *HashTable) totalCount() int { ht.mutex.RLock() defer ht.mutex.RUnlock() + var num int + for _, v := range ht.routeTable { + num += len(v) + } + return num +} + +// Simple helper function to determine the value of a particular +// bit in a byte by index +// +// number: 1 +// bits: 00000001 +// pos: 01234567 +func hasBit(n byte, pos uint) bool { + val := n & (1 << (7 - pos)) // check bits from left to right (7 - pos) + return (val > 0) +} + +// NewHashTable returns a new hashtable +func NewHashTable(options *Options) (*HashTable, error) { + ht := &HashTable{ + self: &Node{IP: options.IP, Port: options.Port}, + refreshers: make([]time.Time, B), + routeTable: make([][]*Node, B), + } + if options.ID != nil { + ht.self.ID = options.ID + } else { + return nil, errors.New("id is nil") + } + ht.self.SetHashedID() + + // init buckets with capacity K and refresh times + for i := 0; i < B; i++ { + ht.routeTable[i] = make([]*Node, 0, K) + ht.resetRefreshTime(i) + } + return ht, nil +} + +// --- identity normalization for routing distance +func ensureHashedTarget(target []byte) []byte { + if len(target) != 32 { + h, _ := utils.Blake3Hash(target) + return h + } + return target +} + +// hasBucketNode: compare on HashedID +func (ht *HashTable) hasBucketNode(bucket int, hashedID []byte) bool { + ht.mutex.RLock() + defer ht.mutex.RUnlock() for _, node := range ht.routeTable[bucket] { - if bytes.Equal(node.ID, id) { + if bytes.Equal(node.HashedID, hashedID) { return true } } - return false } -// hasNode check if the node id is exists in the hash table -func (ht *HashTable) hasNode(id []byte) bool { +// hasNode: compare on HashedID +func (ht *HashTable) hasNode(hashedID []byte) bool { ht.mutex.RLock() defer ht.mutex.RUnlock() - for _, bucket := range ht.routeTable { for _, node := range bucket { - if bytes.Equal(node.ID, id) { + if bytes.Equal(node.HashedID, hashedID) { return true } } } + return false +} +// RemoveNode: compare on HashedID +func (ht *HashTable) RemoveNode(index int, hashedID []byte) bool { + ht.mutex.Lock() + defer ht.mutex.Unlock() + bucket := ht.routeTable[index] + for i, node := range bucket { + if bytes.Equal(node.HashedID, hashedID) { + if i+1 < len(bucket) { + bucket = append(bucket[:i], bucket[i+1:]...) + } else { + bucket = bucket[:i] + } + ht.routeTable[index] = bucket + return true + } + } return false } -// closestContacts returns the closest contacts of target +// closestContacts: use HashedID in ignored-map func (ht *HashTable) closestContacts(num int, target []byte, ignoredNodes []*Node) (*NodeList, int) { ht.mutex.RLock() defer ht.mutex.RUnlock() - // Normalize target into hashed ID space (32 bytes) hashedTarget := ensureHashedTarget(target) - // Convert ignoredNodes slice to a map for faster lookup - ignoredMap := make(map[string]bool) + ignoredMap := make(map[string]bool, len(ignoredNodes)) for _, node := range ignoredNodes { - ignoredMap[string(node.ID)] = true + ignoredMap[string(node.HashedID)] = true } nl := &NodeList{Comparator: hashedTarget} - counter := 0 - // Flatten the routeTable and add nodes to nl if they're not in the ignoredMap for _, bucket := range ht.routeTable { for _, node := range bucket { counter++ - if !ignoredMap[string(node.ID)] { + if !ignoredMap[string(node.HashedID)] { nl.AddNodes([]*Node{node}) } } } - - // Sort the node list and get the top 'num' nodes nl.Sort() nl.TopN(num) - return nl, counter } -// bucketIndex return the bucket index from two node ids -func (*HashTable) bucketIndex(id1 []byte, id2 []byte) int { - // look at each byte from left to right - for j := 0; j < len(id1); j++ { - // xor the byte - xor := id1[j] ^ id2[j] - - // check each bit on the xored result from left to right in order - for i := 0; i < 8; i++ { - if hasBit(xor, uint(i)) { - byteIndex := j * 8 // Convert byte position to bit position - bitIndex := i - // Return bucket index based on position of differing bit (B - total bit position - 1) - return B - (byteIndex + bitIndex) - 1 - } - } - } - - // only happen during bootstrap ping - return 0 -} - -// Count returns the number of nodes in route table -func (ht *HashTable) totalCount() int { - ht.mutex.RLock() - defer ht.mutex.RUnlock() - - var num int - for _, v := range ht.routeTable { - num += len(v) - } - return num -} - -// nodes returns nodes in table -func (ht *HashTable) nodes() []*Node { - nodeList := []*Node{} - ht.mutex.RLock() - defer ht.mutex.RUnlock() - - for _, v := range ht.routeTable { - nodeList = append(nodeList, v...) - } - return nodeList -} - -// newRandomID returns a new random id -func newRandomID() ([]byte, error) { - id := make([]byte, 20) - _, err := rand.Read(id) - return id, err -} - -// Simple helper function to determine the value of a particular -// bit in a byte by index -// -// number: 1 -// bits: 00000001 -// pos: 01234567 -func hasBit(n byte, pos uint) bool { - val := n & (1 << (7 - pos)) // check bits from left to right (7 - pos) - return (val > 0) -} - -func (ht *HashTable) closestContactsWithInlcudingNode(num int, target []byte, ignoredNodes []*Node, includeNode *Node) *NodeList { +// keep an alias for old callers; fix typo in new name +func (ht *HashTable) closestContactsWithIncludingNode(num int, target []byte, ignoredNodes []*Node, includeNode *Node) *NodeList { ht.mutex.RLock() defer ht.mutex.RUnlock() - // Normalize target into hashed ID space (32 bytes) hashedTarget := ensureHashedTarget(target) - // Convert ignoredNodes slice to a map for faster lookup - ignoredMap := make(map[string]bool) + ignoredMap := make(map[string]bool, len(ignoredNodes)) for _, node := range ignoredNodes { - ignoredMap[string(node.ID)] = true + ignoredMap[string(node.HashedID)] = true } nl := &NodeList{Comparator: hashedTarget} - - // Flatten the routeTable and add nodes to nl if they're not in the ignoredMap for _, bucket := range ht.routeTable { for _, node := range bucket { - if !ignoredMap[string(node.ID)] { + if !ignoredMap[string(node.HashedID)] { nl.AddNodes([]*Node{node}) } } } - - // Add the included node (if any) to the list if includeNode != nil { nl.AddNodes([]*Node{includeNode}) } - - // Sort the node list and get the top 'num' nodes nl.Sort() nl.TopN(num) - return nl } @@ -353,65 +306,136 @@ func (ht *HashTable) closestContactsWithIncludingNodeList(num int, target []byte ht.mutex.RLock() defer ht.mutex.RUnlock() - // Normalize target into hashed ID space (32 bytes). Callers often pass a 32-byte - // content hash already, but centralizing this avoids future misuse. hashedTarget := ensureHashedTarget(target) - - // Convert ignoredNodes slice to a map for faster lookup - ignoredMap := make(map[string]bool) + ignoredMap := make(map[string]bool, len(ignoredNodes)) for _, node := range ignoredNodes { - ignoredMap[string(node.ID)] = true + ignoredMap[string(node.HashedID)] = true } nl := &NodeList{Comparator: hashedTarget} - - // Flatten the routeTable and add nodes to nl if they're not in the ignoredMap - counter := 0 for _, bucket := range ht.routeTable { for _, node := range bucket { - if !ignoredMap[string(node.ID)] { - counter++ + if !ignoredMap[string(node.HashedID)] { nl.AddNodes([]*Node{node}) } } } - // Add the included node (if any) to the list - var includeNodeList []*Node - if nodesToInclude != nil { + if len(nodesToInclude) > 0 { for _, node := range nodesToInclude { if !nl.exists(node) { - includeNodeList = append(includeNodeList, node) + nl.AddNodes([]*Node{node}) } } - - nl.AddNodes(includeNodeList) } - // Sort the node list and get the top 'num' nodes nl.Sort() nl.TopN(num) - return nl } -// RemoveNode removes node from bucket -func (ht *HashTable) RemoveNode(index int, id []byte) bool { +// bucketIndex: guard length +func (*HashTable) bucketIndex(id1, id2 []byte) int { + if len(id1) != 32 || len(id2) != 32 { + // defensive: treat as far (top bucket) + return B - 1 + } + for j := 0; j < len(id1); j++ { + xor := id1[j] ^ id2[j] + for i := 0; i < 8; i++ { + if hasBit(xor, uint(i)) { + byteIndex := j * 8 + bitIndex := i + return B - (byteIndex + bitIndex) - 1 + } + } + } + return 0 // identical IDs +} + +// nodes: optional safe snapshot (shallow copy of slice; caller must not mutate Nodes) +func (ht *HashTable) nodes() []*Node { + ht.mutex.RLock() + defer ht.mutex.RUnlock() + total := 0 + for _, b := range ht.routeTable { + total += len(b) + } + out := make([]*Node, 0, total) + for _, b := range ht.routeTable { + out = append(out, b...) + } + return out +} + +// newRandomID: match B=256 (32 bytes) +func newRandomID() ([]byte, error) { + id := make([]byte, B/8) + _, err := rand.Read(id) + return id, err +} + +// AddOrUpdate inserts or refreshes LRU for a node (updates IP/Port on change). +func (ht *HashTable) AddOrUpdate(n *Node) { + if n == nil || len(n.ID) == 0 { + return + } + n.SetHashedID() + ht.mutex.Lock() defer ht.mutex.Unlock() - - bucket := ht.routeTable[index] - for i, node := range bucket { - if bytes.Equal(node.ID, id) { - if i+1 < len(bucket) { - bucket = append(bucket[:i], bucket[i+1:]...) - } else { - bucket = bucket[:i] - } - ht.routeTable[index] = bucket - return true + bi := ht.bucketIndex(ht.self.HashedID, n.HashedID) + b := ht.routeTable[bi] + + // Refresh if present + for i, v := range b { + if bytes.Equal(v.HashedID, n.HashedID) { + v.IP, v.Port = n.IP, n.Port // refresh coordinates + cur := b[i] + b = append(b[:i], b[i+1:]...) + b = append(b, cur) + ht.routeTable[bi] = b + ht.resetRefreshTime(bi) + return } } + // Insert if space + if len(b) < K { + ht.routeTable[bi] = append(b, n) + ht.resetRefreshTime(bi) + return + } + // Else: bucket full; caller should ping LRU and then call ReplaceLRUIf(...) +} - return false +func (ht *HashTable) LRUForBucket(n *Node) *Node { + if n == nil || len(n.HashedID) != 32 { + return nil + } + ht.mutex.RLock() + defer ht.mutex.RUnlock() + bi := ht.bucketIndex(ht.self.HashedID, n.HashedID) + if len(ht.routeTable[bi]) == 0 { + return nil + } + return ht.routeTable[bi][0] +} + +func (ht *HashTable) ReplaceLRUIf(n, lru *Node) bool { + if n == nil || lru == nil { + return false + } + n.SetHashedID() + ht.mutex.Lock() + defer ht.mutex.Unlock() + bi := ht.bucketIndex(ht.self.HashedID, n.HashedID) + b := ht.routeTable[bi] + if len(b) == 0 || !bytes.Equal(b[0].HashedID, lru.HashedID) { + return false // LRU moved meanwhile + } + b = b[1:] + b = append(b, n) + ht.routeTable[bi] = b + ht.resetRefreshTime(bi) + return true } diff --git a/p2p/kademlia/network.go b/p2p/kademlia/network.go index 1a93ca51..da409e61 100644 --- a/p2p/kademlia/network.go +++ b/p2p/kademlia/network.go @@ -40,20 +40,20 @@ var execTimeouts map[int]time.Duration func init() { // Initialize the request execution timeout values execTimeouts = map[int]time.Duration{ - // Lightweight RPCs + // Lightweight Ping: 5 * time.Second, FindNode: 15 * time.Second, BatchFindNode: 15 * time.Second, // Value lookups FindValue: 20 * time.Second, - BatchFindValues: 60 * time.Second, // large responses, often compressed - BatchGetValues: 60 * time.Second, + BatchFindValues: 90 * time.Second, + BatchGetValues: 90 * time.Second, // Data movement - StoreData: 30 * time.Second, // allow for slower links - BatchStoreData: 60 * time.Second, - Replicate: 60 * time.Second, + StoreData: 5 * time.Second, + BatchStoreData: 90 * time.Second, + Replicate: 90 * time.Second, } } @@ -86,6 +86,7 @@ func NewNetwork(ctx context.Context, dht *DHT, self *Node, clientTC, serverTC cr } // init the rate limiter s.limiter = ratelimit.New(defaultConnRate) + s.connPool.StartPruner(ctx, 10*time.Minute, 1*time.Hour) addr := fmt.Sprintf("%s:%d", self.IP, self.Port) // new tcp listener @@ -344,6 +345,9 @@ func (s *Network) handleReplicateRequest(ctx context.Context, req *ReplicateData func (s *Network) handlePing(_ context.Context, message *Message) ([]byte, error) { // new a response message resMsg := s.dht.newMessage(Ping, message.Sender, nil) + + go s.dht.addNode(context.Background(), message.Sender) + return s.encodeMesage(resMsg) } @@ -373,7 +377,7 @@ func (s *Network) handleConn(ctx context.Context, rawConn net.Conn) { defer conn.Close() - const serverReadTimeout = 60 * time.Second + const serverReadTimeout = 90 * time.Second for { select { @@ -393,11 +397,11 @@ func (s *Network) handleConn(ctx context.Context, rawConn net.Conn) { } // downgrade pure timeouts to debug to reduce noise if ne, ok := err.(net.Error); ok && ne.Timeout() { - logtrace.Debug(ctx, "Read and decode timed out", logtrace.Fields{ + logtrace.Debug(ctx, "Read and decode timed out, keeping connection open", logtrace.Fields{ logtrace.FieldModule: "p2p", logtrace.FieldError: err.Error(), }) - return + continue } logtrace.Warn(ctx, "Read and decode failed", logtrace.Fields{ logtrace.FieldModule: "p2p", @@ -580,7 +584,7 @@ func (s *Network) serve(ctx context.Context) { if max := 1 * time.Second; tempDelay > max { tempDelay = max } - logtrace.Error(ctx, "Socket accept failed, retrying", logtrace.Fields{ + logtrace.Warn(ctx, "Socket accept failed, retrying", logtrace.Fields{ logtrace.FieldModule: "p2p", logtrace.FieldError: err.Error(), "retry-in": tempDelay.String(), @@ -631,7 +635,7 @@ func (s *Network) Call(ctx context.Context, request *Message, isLong bool) (*Mes return nil, errors.New("secure transport credentials are not set") } - // build a safe pool key (use bech32 identity format for handshaker compatibility) + // pool key: bech32@ip:port (bech32 identity is your invariant) idStr := string(request.Receiver.ID) remoteAddr := fmt.Sprintf("%s@%s:%d", idStr, strings.TrimSpace(request.Receiver.IP), request.Receiver.Port) @@ -646,11 +650,8 @@ func (s *Network) Call(ctx context.Context, request *Message, isLong bool) (*Mes if dialErr != nil { return nil, errors.Errorf("client secure establish %q: %w", remoteAddr, dialErr) } - - // double-check someone didn't add meanwhile s.connPoolMtx.Lock() if existing, getErr := s.connPool.Get(remoteAddr); getErr == nil { - // someone added already; use existing, close ours _ = newConn.Close() conn = existing } else { @@ -660,88 +661,241 @@ func (s *Network) Call(ctx context.Context, request *Message, isLong bool) (*Mes s.connPoolMtx.Unlock() } - // Encode once outside the lock + // Encode once data, err := encode(request) if err != nil { return nil, errors.Errorf("encode: %w", err) } - // If it's our wrapper, lock the whole RPC + // Wrapper: lock whole RPC to prevent cross-talk; retry once on stale pooled socket if cw, ok := conn.(*connWrapper); ok { - var resp *Message - var rpcErr error - var mustDrop bool + return s.rpcOnceWrapper(ctx, cw, remoteAddr, data, timeout, request.MessageType) + } + + // Non-wrapper fallback: one stale retry + return s.rpcOnceNonWrapper(ctx, conn, remoteAddr, data, timeout, request.MessageType) +} + +// ---- retryable RPC helpers ------------------------------------------------- + +func (s *Network) rpcOnceWrapper(ctx context.Context, cw *connWrapper, remoteAddr string, data []byte, timeout time.Duration, msgType int) (*Message, error) { + + sizeMB := float64(len(data)) / (1024.0 * 1024.0) // data is your gob-encoded message + throughputFloor := 8.0 // MB/s (~64 Mbps) + est := time.Duration(sizeMB / throughputFloor * float64(time.Second)) + base := 1 * time.Second + cushion := 5 * time.Second + writeDL := base + est + cushion + if writeDL < 5*time.Second { + writeDL = 5 * time.Second + } + if writeDL > timeout-1*time.Second { + writeDL = timeout - 1*time.Second + } + + retried := false + for { + // lock the WHOLE RPC on a pooled wrapper cw.mtx.Lock() - { - if e := cw.secureConn.SetWriteDeadline(time.Now().Add(5 * time.Second)); e != nil { - rpcErr = errors.Errorf("set write deadline: %w", e) - mustDrop = true - } else if _, e := cw.secureConn.Write(data); e != nil { - rpcErr = errors.Errorf("conn write: %w", e) - mustDrop = true - } else if e := cw.secureConn.SetReadDeadline(time.Now().Add(timeout)); e != nil { - rpcErr = errors.Errorf("set read deadline: %w", e) - mustDrop = true - } else if r, e := decode(cw.secureConn); e != nil { - rpcErr = errors.Errorf("conn read: %w", e) - mustDrop = true - } else { - resp = r + + // write + if e := cw.secureConn.SetWriteDeadline(time.Now().Add(writeDL)); e != nil { + cw.mtx.Unlock() + s.dropFromPool(remoteAddr, cw) + return nil, errors.Errorf("set write deadline: %w", e) + } + if _, e := cw.secureConn.Write(data); e != nil { + cw.mtx.Unlock() + if isStaleConnError(e) && !retried { + logtrace.Info(ctx, "Stale pooled connection on write; redialing", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "remote": remoteAddr, + "message_type": msgType, + }) + s.dropFromPool(remoteAddr, cw) + fresh, derr := NewSecureClientConn(ctx, s.clientTC, remoteAddr) + if derr != nil { + logtrace.Error(ctx, "Retry redial failed (write)", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "remote": remoteAddr, + "message_type": msgType, + logtrace.FieldError: derr.Error(), + }) + return nil, errors.Errorf("re-dial after write: %w", derr) + } + s.addToPool(remoteAddr, fresh) + if nw, ok := fresh.(*connWrapper); ok { + cw = nw + retried = true + continue // retry whole RPC under the new wrapper + } + // Non-wrapper fallback retry + return s.rpcOnceNonWrapper(ctx, fresh, remoteAddr, data, timeout, msgType) } - _ = cw.secureConn.SetDeadline(time.Time{}) // clear for reuse + s.dropFromPool(remoteAddr, cw) + return nil, errors.Errorf("conn write: %w", e) } - cw.mtx.Unlock() - if mustDrop { - // evict AFTER unlocking to avoid deadlock in Close() - s.connPoolMtx.Lock() - _ = conn.Close() - s.connPool.Del(remoteAddr) - s.connPoolMtx.Unlock() + // read + rdl := readDeadlineFor(msgType, timeout) + if e := cw.secureConn.SetReadDeadline(time.Now().Add(rdl)); e != nil { + cw.mtx.Unlock() + s.dropFromPool(remoteAddr, cw) + return nil, errors.Errorf("set read deadline: %w", e) } - - if rpcErr != nil { - return nil, rpcErr + r, e := decode(cw.secureConn) + _ = cw.secureConn.SetDeadline(time.Time{}) + cw.mtx.Unlock() + if e != nil { + if isStaleConnError(e) && !retried { + logtrace.Info(ctx, "Stale pooled connection on read; redialing", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "remote": remoteAddr, + "message_type": msgType, + }) + s.dropFromPool(remoteAddr, cw) + fresh, derr := NewSecureClientConn(ctx, s.clientTC, remoteAddr) + if derr != nil { + logtrace.Error(ctx, "Retry redial failed (read)", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "remote": remoteAddr, + "message_type": msgType, + logtrace.FieldError: derr.Error(), + }) + return nil, errors.Errorf("re-dial after read: %w", derr) + } + s.addToPool(remoteAddr, fresh) + if nw, ok := fresh.(*connWrapper); ok { + cw = nw + retried = true + continue // retry whole RPC + } + return s.rpcOnceNonWrapper(ctx, fresh, remoteAddr, data, timeout, msgType) + } + s.dropFromPool(remoteAddr, cw) + return nil, errors.Errorf("conn read: %w", e) } - return resp, nil + return r, nil } +} - // Fallback: not a connWrapper (rare) - if err := conn.SetWriteDeadline(time.Now().Add(5 * time.Second)); err != nil { - // best effort evict - s.connPoolMtx.Lock() - _ = conn.Close() - s.connPool.Del(remoteAddr) - s.connPoolMtx.Unlock() +func (s *Network) rpcOnceNonWrapper(ctx context.Context, conn net.Conn, remoteAddr string, data []byte, timeout time.Duration, msgType int) (*Message, error) { + sizeMB := float64(len(data)) / (1024.0 * 1024.0) // data is your gob-encoded message + throughputFloor := 8.0 // MB/s (~64 Mbps) + est := time.Duration(sizeMB / throughputFloor * float64(time.Second)) + base := 1 * time.Second + cushion := 5 * time.Second + + writeDL := base + est + cushion + if writeDL < 5*time.Second { + writeDL = 5 * time.Second + } + if writeDL > timeout-1*time.Second { + writeDL = timeout - 1*time.Second + } + retried := false +Retry: + if err := conn.SetWriteDeadline(time.Now().Add(writeDL)); err != nil { + s.dropFromPool(remoteAddr, conn) return nil, errors.Errorf("set write deadline: %w", err) } if _, err := conn.Write(data); err != nil { - s.connPoolMtx.Lock() - _ = conn.Close() - s.connPool.Del(remoteAddr) - s.connPoolMtx.Unlock() + if isStaleConnError(err) && !retried { + logtrace.Info(ctx, "Stale pooled connection on write; redialing", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "remote": remoteAddr, + "message_type": msgType, + }) + s.dropFromPool(remoteAddr, conn) + fresh, derr := NewSecureClientConn(ctx, s.clientTC, remoteAddr) + if derr != nil { + logtrace.Error(ctx, "Retry redial failed (write)", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "remote": remoteAddr, + "message_type": msgType, + logtrace.FieldError: derr.Error(), + }) + return nil, errors.Errorf("re-dial after write: %w", derr) + } + s.addToPool(remoteAddr, fresh) + conn = fresh + retried = true + goto Retry + } + s.dropFromPool(remoteAddr, conn) return nil, errors.Errorf("conn write: %w", err) } - if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil { - s.connPoolMtx.Lock() - _ = conn.Close() - s.connPool.Del(remoteAddr) - s.connPoolMtx.Unlock() + + rdl := readDeadlineFor(msgType, timeout) + if err := conn.SetReadDeadline(time.Now().Add(rdl)); err != nil { + s.dropFromPool(remoteAddr, conn) return nil, errors.Errorf("set read deadline: %w", err) } resp, err := decode(conn) + _ = conn.SetDeadline(time.Time{}) if err != nil { - s.connPoolMtx.Lock() - _ = conn.Close() - s.connPool.Del(remoteAddr) - s.connPoolMtx.Unlock() + if isStaleConnError(err) && !retried { + logtrace.Info(ctx, "Stale pooled connection on read; redialing", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "remote": remoteAddr, + "message_type": msgType, + }) + s.dropFromPool(remoteAddr, conn) + fresh, derr := NewSecureClientConn(ctx, s.clientTC, remoteAddr) + if derr != nil { + logtrace.Error(ctx, "Retry redial failed (read)", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "remote": remoteAddr, + "message_type": msgType, + logtrace.FieldError: derr.Error(), + }) + return nil, errors.Errorf("re-dial after read: %w", derr) + } + s.addToPool(remoteAddr, fresh) + conn = fresh + retried = true + goto Retry + } + s.dropFromPool(remoteAddr, conn) return nil, errors.Errorf("conn read: %w", err) } - _ = conn.SetDeadline(time.Time{}) return resp, nil } +// classify stale pooled sockets (not timeouts) +func isStaleConnError(err error) bool { + if err == nil { + return false + } + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + return true + } + s := err.Error() + switch { + case strings.Contains(s, "use of closed network connection"), + strings.Contains(s, "connection reset by peer"), + strings.Contains(s, "broken pipe"), + strings.Contains(s, "connection aborted"): + return true + default: + return false + } +} + +// small helpers for pool ops +func (s *Network) dropFromPool(key string, c net.Conn) { + s.connPoolMtx.Lock() + _ = c.Close() + s.connPool.Del(key) + s.connPoolMtx.Unlock() +} +func (s *Network) addToPool(key string, c net.Conn) { + s.connPoolMtx.Lock() + s.connPool.Add(key, c) + s.connPoolMtx.Unlock() +} + func (s *Network) handleBatchFindValues(ctx context.Context, message *Message, reqID string) (res []byte, err error) { // Try to acquire the semaphore, wait up to 1 minute logtrace.Debug(ctx, "Attempting to acquire semaphore immediately", logtrace.Fields{logtrace.FieldModule: "p2p"}) @@ -1143,9 +1297,13 @@ func (s *Network) generateResponseMessage(messageType int, receiver *Node, resul switch messageType { case StoreData, BatchStoreData: response = &StoreDataResponse{Status: responseStatus} - case FindNode, BatchFindNode: + case FindNode: + response = &FindNodeResponse{Status: responseStatus} + case BatchFindNode: response = &BatchFindNodeResponse{Status: responseStatus} - case FindValue, BatchFindValues: + case FindValue: + response = &FindValueResponse{Status: responseStatus} + case BatchFindValues: response = &BatchFindValuesResponse{Status: responseStatus} case Replicate: response = &ReplicateDataResponse{Status: responseStatus} @@ -1190,3 +1348,32 @@ func (s *Network) handlePanic(ctx context.Context, sender *Node, messageType int return nil, nil } + +func readDeadlineFor(msgType int, overall time.Duration) time.Duration { + small := 10 * time.Second + switch msgType { + case Ping, FindNode, BatchFindNode, FindValue, StoreData, BatchStoreData: + if overall > small+1*time.Second { + return small + } + return overall - 1*time.Second + default: + return overall // Bulk responses keep full budget + } +} + +func isLocalCancel(err error) bool { + if err == nil { + return false + } + // our own contexts + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return true + } + // net timeout also flows as DeadlineExceeded through some stacks – we want + // checkNodeActivity to decide; for hot path ops we do not count pure timeouts here + if ne, ok := err.(net.Error); ok && ne.Timeout() { + return true + } + return false +} diff --git a/p2p/kademlia/node.go b/p2p/kademlia/node.go index 7ddda663..b7a4baeb 100644 --- a/p2p/kademlia/node.go +++ b/p2p/kademlia/node.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "fmt" - "math/big" "sort" "strings" "sync" @@ -47,8 +46,6 @@ type NodeList struct { Comparator []byte Mux sync.RWMutex - - debug bool } // String returns the dump information for node list @@ -63,42 +60,6 @@ func (s *NodeList) String() string { return strings.Join(nodes, ",") } -// DelNode deletes a node from list -func (s *NodeList) DelNode(node *Node) { - s.Mux.Lock() - defer s.Mux.Unlock() - - for i := 0; i < s.Len(); i++ { - if bytes.Equal(s.Nodes[i].ID, node.ID) { - newList := s.Nodes[:i] - if i+1 < s.Len() { - newList = append(newList, s.Nodes[i+1:]...) - } - - s.Nodes = newList - - return - } - } -} - -func haveAllNodes(nodesA []*Node, nodesB []*Node) bool { - for _, nodeA := range nodesA { - found := false - for _, nodeB := range nodesB { - if bytes.Equal(nodeA.ID, nodeB.ID) { - found = true - break - } - } - if !found { - return false - } - } - - return true -} - // Exists return true if the node is already there func (s *NodeList) Exists(node *Node) bool { s.Mux.RLock() @@ -107,15 +68,6 @@ func (s *NodeList) Exists(node *Node) bool { return s.exists(node) } -func (s *NodeList) exists(node *Node) bool { - for i := 0; i < len(s.Nodes); i++ { - if bytes.Equal(s.Nodes[i].ID, node.ID) { - return true - } - } - return false -} - // AddNodes appends the nodes to node list if it's not existed func (s *NodeList) AddNodes(nodes []*Node) { s.Mux.Lock() @@ -154,13 +106,6 @@ func (s *NodeList) TopN(n int) { } } -func (s *NodeList) distance(id1, id2 []byte) *big.Int { - o1 := new(big.Int).SetBytes(id1) - o2 := new(big.Int).SetBytes(id2) - - return new(big.Int).Xor(o1, o2) -} - // Sort sorts nodes func (s *NodeList) Sort() { if len(s.Comparator) == 0 { @@ -180,15 +125,20 @@ func (s *NodeList) Swap(i, j int) { } } -// Less compare two nodes func (s *NodeList) Less(i, j int) bool { - if i >= 0 && i < s.Len() && j >= 0 && j < s.Len() { - id := s.distance(s.Nodes[i].HashedID, s.Comparator) - jd := s.distance(s.Nodes[j].HashedID, s.Comparator) - - return id.Cmp(jd) == -1 + if i < 0 || j < 0 || i >= s.Len() || j >= s.Len() || len(s.Comparator) != 32 { + return false + } + ai := s.Nodes[i].HashedID + aj := s.Nodes[j].HashedID + // Compare big-endian XOR distance: (ai ^ comp) < (aj ^ comp) + for k := 0; k < 32; k++ { + di := ai[k] ^ s.Comparator[k] + dj := aj[k] ^ s.Comparator[k] + if di != dj { + return di < dj + } } - return false } @@ -217,3 +167,39 @@ func (s *NodeList) NodeIPs() []string { return out } + +func (s *NodeList) DelNode(node *Node) { + s.Mux.Lock() + defer s.Mux.Unlock() + for i := 0; i < s.Len(); i++ { + if bytes.Equal(s.Nodes[i].HashedID, node.HashedID) { + s.Nodes = append(s.Nodes[:i], s.Nodes[i+1:]...) + return + } + } +} + +func haveAllNodes(a, b []*Node) bool { + for _, x := range a { + found := false + for _, y := range b { + if bytes.Equal(x.HashedID, y.HashedID) { + found = true + break + } + } + if !found { + return false + } + } + return true +} + +func (s *NodeList) exists(node *Node) bool { + for i := 0; i < len(s.Nodes); i++ { + if bytes.Equal(s.Nodes[i].HashedID, node.HashedID) { + return true + } + } + return false +} diff --git a/p2p/kademlia/node_activity.go b/p2p/kademlia/node_activity.go index 56c8411c..af278903 100644 --- a/p2p/kademlia/node_activity.go +++ b/p2p/kademlia/node_activity.go @@ -2,106 +2,140 @@ package kademlia import ( "context" + "math/rand/v2" + "sync" "time" "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" "github.com/LumeraProtocol/supernode/v2/pkg/utils" ) -// checkNodeActivity keeps track of active nodes - the idea here is to ping nodes periodically and mark them as inactive if they don't respond +// checkNodeActivity keeps track of active nodes: ping periodically, mark inactive on sustained failure, +// unban/re-add on success. Uses bounded concurrency and short per-ping timeouts. func (s *DHT) checkNodeActivity(ctx context.Context) { + ticker := time.NewTicker(checkNodeActivityInterval) + defer ticker.Stop() + + const maxInflight = 32 + sem := make(chan struct{}, maxInflight) + for { select { case <-ctx.Done(): return - case <-time.After(checkNodeActivityInterval): // Adjust the interval as needed + case <-ticker.C: if !utils.CheckInternetConnectivity() { logtrace.Info(ctx, "no internet connectivity, not checking node activity", logtrace.Fields{}) - } else { - repInfo, err := s.store.GetAllReplicationInfo(ctx) - if err != nil { - logtrace.Error(ctx, "get all replicationInfo failed", logtrace.Fields{ - logtrace.FieldModule: "p2p", - logtrace.FieldError: err.Error(), - }) - } - - for _, info := range repInfo { - // new a ping request message - node := &Node{ - ID: []byte(info.ID), - IP: info.IP, - Port: info.Port, - } + continue + } + + repInfo, err := s.store.GetAllReplicationInfo(ctx) + if err != nil { + logtrace.Error(ctx, "get all replicationInfo failed", logtrace.Fields{ + logtrace.FieldModule: "p2p", + logtrace.FieldError: err.Error(), + }) + continue + } + + rand.Shuffle(len(repInfo), func(i, j int) { repInfo[i], repInfo[j] = repInfo[j], repInfo[i] }) - request := s.newMessage(Ping, node, nil) - - // invoke the request and handle the response - _, err := s.network.Call(ctx, request, false) - if err != nil { - logtrace.Debug(ctx, "failed to ping node", logtrace.Fields{ - logtrace.FieldModule: "p2p", - logtrace.FieldError: err.Error(), - "ip": info.IP, - "node_id": string(info.ID), - }) - if info.Active { - logtrace.Warn(ctx, "setting node to inactive", logtrace.Fields{ - logtrace.FieldModule: "p2p", - logtrace.FieldError: err.Error(), - "ip": info.IP, - "node_id": string(info.ID), - }) - - // add node to ignore list - // we maintain this list to avoid pinging nodes that are not responding - s.ignorelist.IncrementCount(node) - // remove from route table - s.removeNode(ctx, node) - - // mark node as inactive in database - if err := s.store.UpdateIsActive(ctx, string(info.ID), false, false); err != nil { - logtrace.Error(ctx, "failed to update replication info, node is inactive", logtrace.Fields{ - logtrace.FieldModule: "p2p", - logtrace.FieldError: err.Error(), - "ip": info.IP, - "node_id": string(info.ID), - }) - } - } - - } else if err == nil { - // remove node from ignore list - s.ignorelist.Delete(node) - - if !info.Active { - logtrace.Info(ctx, "node found to be active again", logtrace.Fields{ - logtrace.FieldModule: "p2p", - "ip": info.IP, - "node_id": string(info.ID), - }) - // add node adds in the route table - s.addNode(ctx, node) - if err := s.store.UpdateIsActive(ctx, string(info.ID), true, false); err != nil { - logtrace.Error(ctx, "failed to update replication info, node is inactive", logtrace.Fields{ - logtrace.FieldModule: "p2p", - logtrace.FieldError: err.Error(), - "ip": info.IP, - "node_id": string(info.ID), - }) - } - } - - if err := s.store.UpdateLastSeen(ctx, string(info.ID)); err != nil { - logtrace.Error(ctx, "failed to update last seen", logtrace.Fields{ - logtrace.FieldError: err.Error(), - "ip": info.IP, - "node_id": string(info.ID), - }) - } + var wg sync.WaitGroup + for _, info := range repInfo { + info := info // capture + wg.Add(1) + sem <- struct{}{} // acquire + go func() { + defer wg.Done() + defer func() { <-sem }() + + node := s.makeNode([]byte(info.ID), info.IP, info.Port) + + // Short per-ping timeout (fail fast) + if err := s.pingNode(ctx, node, 3*time.Second); err != nil { + s.handlePingFailure(ctx, info.Active, node, err) + return } - } + s.handlePingSuccess(ctx, info.Active, node) + }() } + wg.Wait() } } } + +// ------------- Helper Funcs -----------------------// + +func (s *DHT) makeNode(id []byte, ip string, port uint16) *Node { + n := &Node{ID: id, IP: ip, Port: port} + n.SetHashedID() + return n +} + +func (s *DHT) pingNode(ctx context.Context, n *Node, timeout time.Duration) error { + pctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + req := s.newMessage(Ping, n, nil) + _, err := s.network.Call(pctx, req, false) + return err +} + +func (s *DHT) handlePingFailure(ctx context.Context, wasActive bool, n *Node, err error) { + logtrace.Debug(ctx, "failed to ping node", logtrace.Fields{ + logtrace.FieldModule: "p2p", + logtrace.FieldError: err.Error(), + "ip": n.IP, + "node_id": string(n.ID), + }) + + // increment soft-fail counter; only evict when past threshold + s.ignorelist.IncrementCount(n) + if wasActive && s.ignorelist.Banned(n) { + logtrace.Warn(ctx, "setting node to inactive", logtrace.Fields{ + logtrace.FieldModule: "p2p", + logtrace.FieldError: err.Error(), + "ip": n.IP, + "node_id": string(n.ID), + }) + + s.removeNode(ctx, n) // uses HashedID internally + if uerr := s.store.UpdateIsActive(ctx, string(n.ID), false, false); uerr != nil { + logtrace.Error(ctx, "failed to update replication info, node is inactive", logtrace.Fields{ + logtrace.FieldModule: "p2p", + logtrace.FieldError: uerr.Error(), + "ip": n.IP, + "node_id": string(n.ID), + }) + } + } +} + +func (s *DHT) handlePingSuccess(ctx context.Context, wasActive bool, n *Node) { + // clear from ignorelist and ensure presence in routing + s.ignorelist.Delete(n) + + if !wasActive { + logtrace.Info(ctx, "node found to be active again", logtrace.Fields{ + logtrace.FieldModule: "p2p", + "ip": n.IP, + "node_id": string(n.ID), + }) + s.addNode(ctx, n) + if uerr := s.store.UpdateIsActive(ctx, string(n.ID), true, false); uerr != nil { + logtrace.Error(ctx, "failed to update replication info, node is active", logtrace.Fields{ + logtrace.FieldModule: "p2p", + logtrace.FieldError: uerr.Error(), + "ip": n.IP, + "node_id": string(n.ID), + }) + } + } + + if uerr := s.store.UpdateLastSeen(ctx, string(n.ID)); uerr != nil { + logtrace.Error(ctx, "failed to update last seen", logtrace.Fields{ + logtrace.FieldError: uerr.Error(), + "ip": n.IP, + "node_id": string(n.ID), + }) + } +} diff --git a/p2p/kademlia/redundant_data.go b/p2p/kademlia/redundant_data.go index 89c028ee..bfe6947d 100644 --- a/p2p/kademlia/redundant_data.go +++ b/p2p/kademlia/redundant_data.go @@ -80,7 +80,7 @@ func (s *DHT) cleanupRedundantDataWorker(ctx context.Context) { for i := 0; i < len(replicationKeys); i++ { decKey, _ := hex.DecodeString(replicationKeys[i].Key) - nodes := s.ht.closestContactsWithInlcudingNode(Alpha, decKey, ignores, self) + nodes := s.ht.closestContactsWithIncludingNode(Alpha, decKey, ignores, self) closestContactsMap[replicationKeys[i].Key] = nodes.NodeIDs() } diff --git a/p2p/kademlia/replication.go b/p2p/kademlia/replication.go index a444b14d..5163fd0b 100644 --- a/p2p/kademlia/replication.go +++ b/p2p/kademlia/replication.go @@ -173,7 +173,7 @@ func (s *DHT) Replicate(ctx context.Context) { for i := 0; i < len(replicationKeys); i++ { decKey, _ := hex.DecodeString(replicationKeys[i].Key) - closestContactsMap[replicationKeys[i].Key] = s.ht.closestContactsWithInlcudingNode(Alpha, decKey, ignores, self).NodeIDs() + closestContactsMap[replicationKeys[i].Key] = s.ht.closestContactsWithIncludingNode(Alpha, decKey, ignores, self).NodeIDs() } for _, info := range repInfo { @@ -288,7 +288,7 @@ func (s *DHT) adjustNodeKeys(ctx context.Context, from time.Time, info domain.No // get closest contacts to the key key, _ := hex.DecodeString(replicationKeys[i].Key) - nodeList := s.ht.closestContactsWithInlcudingNode(Alpha+1, key, updatedIgnored, offNode) // +1 because we want to include the node we are adjusting + nodeList := s.ht.closestContactsWithIncludingNode(Alpha+1, key, updatedIgnored, offNode) // +1 because we want to include the node we are adjusting // check if the node that is gone was supposed to hold the key if !nodeList.Exists(offNode) { // the node is not supposed to hold this key as its not in 6 closest contacts diff --git a/p2p/kademlia/store/sqlite/replication.go b/p2p/kademlia/store/sqlite/replication.go index ff7f77ba..b43b227a 100644 --- a/p2p/kademlia/store/sqlite/replication.go +++ b/p2p/kademlia/store/sqlite/replication.go @@ -309,13 +309,15 @@ func (s *Store) UpdateReplicationInfo(_ context.Context, rep domain.NodeReplicat return fmt.Errorf("invalid replication info: %v", rep) } - _, err := s.db.Exec(`UPDATE replication_info SET ip = ?, is_active = ?, is_adjusted = ?, lastReplicatedAt = ?, updatedAt =?, port = ?, last_seen = ? WHERE id = ?`, - rep.IP, rep.Active, rep.IsAdjusted, rep.LastReplicatedAt, rep.UpdatedAt, rep.Port, string(rep.ID), rep.LastSeen) + _, err := s.db.Exec(`UPDATE replication_info + SET ip = ?, is_active = ?, is_adjusted = ?, lastReplicatedAt = ?, updatedAt = ?, port = ?, last_seen = ? + WHERE id = ?`, + rep.IP, rep.Active, rep.IsAdjusted, rep.LastReplicatedAt, rep.UpdatedAt, rep.Port, rep.LastSeen, string(rep.ID)) if err != nil { return fmt.Errorf("failed to update replication info: %v", err) } - return err + return nil } // AddReplicationInfo adds replication info @@ -333,44 +335,21 @@ func (s *Store) AddReplicationInfo(_ context.Context, rep domain.NodeReplication return err } -// UpdateLastSeen updates last seen -func (s *Store) UpdateLastSeen(_ context.Context, id string) error { - _, err := s.db.Exec(`UPDATE replication_info SET last_seen = ? WHERE id = ?`, time.Now().UTC(), id) - if err != nil { - return fmt.Errorf("failed to update last_seen of node: %s: - err: %v", id, err) - } - - return err -} +const sqliteMaxVars = 900 // keep well below 999 -// UpdateLastReplicated updates replication info last replicated -func (s *Store) UpdateLastReplicated(_ context.Context, id string, t time.Time) error { - _, err := s.db.Exec(`UPDATE replication_info SET lastReplicatedAt = ?, updatedAt = ? WHERE id = ?`, t, time.Now().UTC(), id) - if err != nil { - return fmt.Errorf("failed to update last replicated: %v", err) +func chunkStrings(in []string, n int) [][]string { + if n <= 0 { + n = sqliteMaxVars } - - return err -} - -// UpdateIsAdjusted updates adjusted -func (s *Store) UpdateIsAdjusted(_ context.Context, id string, isAdjusted bool) error { - _, err := s.db.Exec(`UPDATE replication_info SET is_adjusted = ?, updatedAt = ? WHERE id = ?`, isAdjusted, time.Now().UTC(), id) - if err != nil { - return fmt.Errorf("failed to update is_adjusted of node: %s: - err: %v", id, err) - } - - return err -} - -// UpdateIsActive updates active -func (s *Store) UpdateIsActive(_ context.Context, id string, isActive bool, isAdjusted bool) error { - _, err := s.db.Exec(`UPDATE replication_info SET is_active = ?, is_adjusted = ?, updatedAt = ? WHERE id = ?`, isActive, isAdjusted, time.Now().UTC(), id) - if err != nil { - return fmt.Errorf("failed to update is_active of node: %s: - err: %v", id, err) + out := make([][]string, 0, (len(in)+n-1)/n) + for i := 0; i < len(in); i += n { + j := i + n + if j > len(in) { + j = len(in) + } + out = append(out, in[i:j]) } - - return err + return out } // RetrieveBatchNotExist returns a list of keys (hex-decoded) that do not exist in the table @@ -382,15 +361,7 @@ func (s *Store) RetrieveBatchNotExist(ctx context.Context, keys []string, batchS keyMap[keys[i]] = true } - batchCount := (len(keys) + batchSize - 1) / batchSize // Round up division - - for i := 0; i < batchCount; i++ { - start := i * batchSize - end := start + batchSize - if end > len(keys) { - end = len(keys) - } - batchKeys := keys[start:end] + for _, batchKeys := range chunkStrings(keys, sqliteMaxVars) { placeholders := make([]string, len(batchKeys)) args := make([]interface{}, len(batchKeys)) @@ -440,59 +411,94 @@ func (s Store) RetrieveBatchValues(ctx context.Context, keys []string, getFromCl return retrieveBatchValues(ctx, s.db, keys, getFromCloud, s) } -// RetrieveBatchValues returns a list of values for the given keys (hex-encoded) +// RetrieveBatchValues returns a list of values for the given keys (hex-encoded). func retrieveBatchValues(ctx context.Context, db *sqlx.DB, keys []string, getFromCloud bool, s Store) ([][]byte, int, error) { - placeholders := make([]string, len(keys)) - args := make([]interface{}, len(keys)) - keyToIndex := make(map[string]int) + // Early return + if len(keys) == 0 { + return nil, 0, nil + } - for i := 0; i < len(keys); i++ { - placeholders[i] = "?" - args[i] = keys[i] - keyToIndex[keys[i]] = i + // Helper: chunk keys into slices of at most sqliteMaxVars + chunkStrings := func(in []string, n int) [][]string { + if n <= 0 { + n = sqliteMaxVars + } + out := make([][]string, 0, (len(in)+n-1)/n) + for i := 0; i < len(in); i += n { + j := i + n + if j > len(in) { + j = len(in) + } + out = append(out, in[i:j]) + } + return out } - query := fmt.Sprintf(`SELECT key, data, is_on_cloud FROM data WHERE key IN (%s)`, strings.Join(placeholders, ",")) - rows, err := db.QueryContext(ctx, query, args...) - if err != nil { - return nil, 0, fmt.Errorf("failed to retrieve records: %w", err) + // Map key -> index in the output slice (first occurrence wins) + keyToIndex := make(map[string]int, len(keys)) + for i := range keys { + if _, exists := keyToIndex[keys[i]]; !exists { + keyToIndex[keys[i]] = i + } } - defer rows.Close() values := make([][]byte, len(keys)) - var cloudKeys []string keysFound := 0 + var cloudKeys []string - for rows.Next() { - var key string - var value []byte - var is_on_cloud bool - if err := rows.Scan(&key, &value, &is_on_cloud); err != nil { - return nil, keysFound, fmt.Errorf("failed to scan key and value: %w", err) + // Query in chunks + for _, chunk := range chunkStrings(keys, sqliteMaxVars) { + placeholders := make([]string, len(chunk)) + args := make([]interface{}, len(chunk)) + for i := range chunk { + placeholders[i] = "?" + args[i] = chunk[i] + } + + query := fmt.Sprintf(`SELECT key, data, is_on_cloud FROM data WHERE key IN (%s)`, strings.Join(placeholders, ",")) + rows, err := db.QueryContext(ctx, query, args...) + if err != nil { + return nil, keysFound, fmt.Errorf("failed to retrieve records: %w", err) } - if idx, found := keyToIndex[key]; found { - values[idx] = value - keysFound++ + for rows.Next() { + var key string + var value []byte + var is_on_cloud bool + + if err := rows.Scan(&key, &value, &is_on_cloud); err != nil { + _ = rows.Close() + return nil, keysFound, fmt.Errorf("failed to scan key and value: %w", err) + } + + if idx, found := keyToIndex[key]; found { + // Count only first-time fills (defensive against duplicates) + if len(values[idx]) == 0 { + keysFound++ + } + values[idx] = value - if s.IsCloudBackupOn() && !s.migrationStore.isSyncInProgress { - if len(value) == 0 && is_on_cloud { - cloudKeys = append(cloudKeys, key) + // Cloud-handling + access update (preserve original behavior) + if s.IsCloudBackupOn() && !s.migrationStore.isSyncInProgress { + if len(value) == 0 && is_on_cloud { + cloudKeys = append(cloudKeys, key) + } + PostAccessUpdate([]string{key}) } - PostAccessUpdate([]string{key}) } } - } - if err := rows.Err(); err != nil { - return nil, keysFound, fmt.Errorf("rows processing error: %w", err) + if err := rows.Err(); err != nil { + _ = rows.Close() + return nil, keysFound, fmt.Errorf("rows processing error: %w", err) + } + _ = rows.Close() } + // Fetch missing-but-on-cloud keys if requested if len(cloudKeys) > 0 && getFromCloud { - // Fetch from cloud cloudValues, err := s.cloud.FetchBatch(cloudKeys) if err != nil { - // log.WithContext(ctx).WithError(err).Error("failed to fetch from cloud") logtrace.Error(ctx, "failed to fetch from cloud", logtrace.Fields{ logtrace.FieldModule: "p2p", logtrace.FieldError: err.Error(), @@ -501,20 +507,24 @@ func retrieveBatchValues(ctx context.Context, db *sqlx.DB, keys []string, getFro for key, value := range cloudValues { if idx, found := keyToIndex[key]; found { + // Only count if we actually fill something that was empty + if len(values[idx]) == 0 && len(value) > 0 { + keysFound++ + } values[idx] = value - keysFound++ } } + // Best-effort background store for fetched-from-cloud values go func() { + if len(cloudValues) == 0 { + return + } datList := make([][]byte, 0, len(cloudValues)) for _, v := range cloudValues { datList = append(datList, v) } - - // Store the fetched data in the local store - if err := s.StoreBatch(ctx, datList, 0, false); err != nil { - // log.WithError(err).Error("failed to store fetched data in local store") + if err := s.StoreBatch(context.Background(), datList, 0, false); err != nil { logtrace.Error(ctx, "failed to store fetched data in local store", logtrace.Fields{ logtrace.FieldModule: "p2p", logtrace.FieldError: err.Error(), @@ -528,23 +538,24 @@ func retrieveBatchValues(ctx context.Context, db *sqlx.DB, keys []string, getFro // BatchDeleteRepKeys will delete a list of keys from the replication_keys table func (s *Store) BatchDeleteRepKeys(keys []string) error { - var placeholders []string - var arguments []interface{} - for _, key := range keys { - placeholders = append(placeholders, "?") - arguments = append(arguments, key) - } - - query := fmt.Sprintf("DELETE FROM replication_keys WHERE key IN (%s)", strings.Join(placeholders, ",")) - res, err := s.db.Exec(query, arguments...) - if err != nil { - return fmt.Errorf("cannot batch delete records from replication_keys table: %w", err) + totalAffected := int64(0) + for _, chunk := range chunkStrings(keys, sqliteMaxVars) { + ph := make([]string, len(chunk)) + args := make([]interface{}, len(chunk)) + for i := range chunk { + ph[i] = "?" + args[i] = chunk[i] + } + q := fmt.Sprintf("DELETE FROM replication_keys WHERE key IN (%s)", strings.Join(ph, ",")) + res, err := s.db.Exec(q, args...) + if err != nil { + return fmt.Errorf("batch delete: %w", err) + } + n, _ := res.RowsAffected() + totalAffected += n } - - if rowsAffected, err := res.RowsAffected(); err != nil { - return fmt.Errorf("cannot get affected rows after batch deleting records from replication_keys table: %w", err) - } else if rowsAffected == 0 { + if totalAffected == 0 { return fmt.Errorf("no record deleted (batch) from replication_keys table") } @@ -582,3 +593,150 @@ func (s *Store) IncrementAttempts(keys []string) error { // commit the transaction return tx.Commit() } + +type repJobKind int + +const ( + repJobLastSeen repJobKind = iota + repJobIsActive + repJobIsAdjusted + repJobLastReplicated +) + +type repJob struct { + kind repJobKind + id string + isActive bool + isAdjusted bool + t time.Time + ctx context.Context + done chan error +} + +type RepWriter struct { + ch chan repJob + quit chan struct{} +} + +func NewRepWriter(buf int) *RepWriter { + return &RepWriter{ + ch: make(chan repJob, buf), + quit: make(chan struct{}), + } +} + +func (w *RepWriter) Stop() { close(w.quit) } + +// startRepWriter runs a single serialized loop for replication_info updates. +func (s *Store) startRepWriter(ctx context.Context) { + go func() { + for { + select { + case <-ctx.Done(): + return + case <-s.repWriter.quit: + return + case j := <-s.repWriter.ch: + var err error + // pick a context for DB ops (prefer job ctx if non-nil) + dbctx := j.ctx + if dbctx == nil { + dbctx = context.Background() + } + switch j.kind { + case repJobLastSeen: + _, err = s.db.ExecContext(dbctx, + `UPDATE replication_info SET last_seen = ? WHERE id = ?`, + time.Now().UTC(), j.id) + + case repJobIsActive: + _, err = s.db.ExecContext(dbctx, + `UPDATE replication_info SET is_active = ?, is_adjusted = ?, updatedAt = ? WHERE id = ?`, + j.isActive, j.isAdjusted, time.Now().UTC(), j.id) + + case repJobIsAdjusted: + _, err = s.db.ExecContext(dbctx, + `UPDATE replication_info SET is_adjusted = ?, updatedAt = ? WHERE id = ?`, + j.isAdjusted, time.Now().UTC(), j.id) + + case repJobLastReplicated: + _, err = s.db.ExecContext(dbctx, + `UPDATE replication_info SET lastReplicatedAt = ?, updatedAt = ? WHERE id = ?`, + j.t, time.Now().UTC(), j.id) + } + // deliver result + if j.done != nil { + j.done <- err + } + } + } + }() +} + +// UpdateLastSeen updates last_seen for a node (serialized via writer) +func (s *Store) UpdateLastSeen(ctx context.Context, id string) error { + job := repJob{ + kind: repJobLastSeen, + id: id, + ctx: ctx, + done: make(chan error, 1), + } + select { + case <-ctx.Done(): + return ctx.Err() + case s.repWriter.ch <- job: + } + return <-job.done +} + +// UpdateIsActive sets is_active / is_adjusted and bumps updatedAt (serialized) +func (s *Store) UpdateIsActive(ctx context.Context, id string, isActive bool, isAdjusted bool) error { + job := repJob{ + kind: repJobIsActive, + id: id, + isActive: isActive, + isAdjusted: isAdjusted, + ctx: ctx, + done: make(chan error, 1), + } + select { + case <-ctx.Done(): + return ctx.Err() + case s.repWriter.ch <- job: + } + return <-job.done +} + +// UpdateIsAdjusted flips is_adjusted and bumps updatedAt (serialized) +func (s *Store) UpdateIsAdjusted(ctx context.Context, id string, isAdjusted bool) error { + job := repJob{ + kind: repJobIsAdjusted, + id: id, + isAdjusted: isAdjusted, + ctx: ctx, + done: make(chan error, 1), + } + select { + case <-ctx.Done(): + return ctx.Err() + case s.repWriter.ch <- job: + } + return <-job.done +} + +// UpdateLastReplicated sets lastReplicatedAt and bumps updatedAt (serialized) +func (s *Store) UpdateLastReplicated(ctx context.Context, id string, t time.Time) error { + job := repJob{ + kind: repJobLastReplicated, + id: id, + t: t, + ctx: ctx, + done: make(chan error, 1), + } + select { + case <-ctx.Done(): + return ctx.Err() + case s.repWriter.ch <- job: + } + return <-job.done +} diff --git a/p2p/kademlia/store/sqlite/sqlite.go b/p2p/kademlia/store/sqlite/sqlite.go index 66a9713a..71224a57 100644 --- a/p2p/kademlia/store/sqlite/sqlite.go +++ b/p2p/kademlia/store/sqlite/sqlite.go @@ -21,7 +21,7 @@ import ( // Exponential backoff parameters var ( - checkpointInterval = 5 * time.Second // Checkpoint interval in seconds + checkpointInterval = 60 * time.Second // Checkpoint interval in seconds //dbLock sync.Mutex dbName = "data001.sqlite3" dbFilePath = "" @@ -53,6 +53,7 @@ type Store struct { worker *Worker cloud cloud.Storage migrationStore *MigrationMetaStore + repWriter *RepWriter } // Record is a data record @@ -70,10 +71,10 @@ type Record struct { // NewStore returns a new store func NewStore(ctx context.Context, dataDir string, cloud cloud.Storage, mst *MigrationMetaStore) (*Store, error) { worker := &Worker{ - JobQueue: make(chan Job, 500), + JobQueue: make(chan Job, 2048), quit: make(chan bool), } - + repWriter := NewRepWriter(1024) logtrace.Debug(ctx, "p2p data dir", logtrace.Fields{logtrace.FieldModule: "p2p", "data_dir": dataDir}) if _, err := os.Stat(dataDir); os.IsNotExist(err) { if err := os.MkdirAll(dataDir, 0750); err != nil { @@ -88,13 +89,14 @@ func NewStore(ctx context.Context, dataDir string, cloud cloud.Storage, mst *Mig if err != nil { return nil, fmt.Errorf("cannot open sqlite database: %w", err) } - db.SetMaxOpenConns(200) // set appropriate value - db.SetMaxIdleConns(10) // set appropriate value + db.SetMaxOpenConns(3) + db.SetMaxIdleConns(3) s := &Store{ - worker: worker, - db: db, - cloud: cloud, + worker: worker, + db: db, + cloud: cloud, + repWriter: repWriter, } if !s.checkStore() { @@ -131,25 +133,20 @@ func NewStore(ctx context.Context, dataDir string, cloud cloud.Storage, mst *Mig logtrace.Error(ctx, "URGENT! unable to create datatype column in p2p database", logtrace.Fields{logtrace.FieldError: err.Error()}) } - logtrace.Debug(ctx, "p2p database creating index on key column", logtrace.Fields{logtrace.FieldModule: "p2p"}) - _, err = db.Exec("CREATE INDEX IF NOT EXISTS idx_key ON data(key);") - if err != nil { - logtrace.Error(ctx, "URGENT! unable to create index on key column in p2p database", logtrace.Fields{logtrace.FieldError: err.Error()}) - } - logtrace.Debug(ctx, "p2p database created index on key column", logtrace.Fields{logtrace.FieldModule: "p2p"}) - _, err = db.Exec("CREATE INDEX IF NOT EXISTS idx_createdat ON data(createdAt);") if err != nil { logtrace.Error(ctx, "URGENT! unable to create index on createdAt column in p2p database", logtrace.Fields{logtrace.FieldError: err.Error()}) } + logtrace.Debug(ctx, "p2p database created index on createdAt column", logtrace.Fields{logtrace.FieldModule: "p2p"}) pragmas := []string{ "PRAGMA journal_mode=WAL;", "PRAGMA synchronous=NORMAL;", "PRAGMA cache_size=-20000;", - "PRAGMA busy_timeout=120000;", + "PRAGMA busy_timeout=15000;", "PRAGMA journal_size_limit=5242880;", + "PRAGMA wal_autocheckpoint = 1000;", } for _, pragma := range pragmas { @@ -162,9 +159,11 @@ func NewStore(ctx context.Context, dataDir string, cloud cloud.Storage, mst *Mig dbFilePath = dbFile go s.start(ctx) - // Run WAL checkpoint worker every 5 seconds + // Run WAL checkpoint worker every 60 seconds go s.startCheckpointWorker(ctx) + go s.startRepWriter(ctx) + if s.IsCloudBackupOn() { s.migrationStore = mst } @@ -423,12 +422,12 @@ func (s *Store) UpdateKeyReplication(ctx context.Context, key []byte) error { } // Retrieve will return the queries key/value if it exists -func (s *Store) Retrieve(_ context.Context, key []byte) ([]byte, error) { +func (s *Store) Retrieve(ctx context.Context, key []byte) ([]byte, error) { hkey := hex.EncodeToString(key) r := Record{} query := `SELECT data, is_on_cloud, is_original, datatype FROM data WHERE key = ?` - err := s.db.QueryRow(query, hkey).Scan(&r.Data, &r.IsOnCloud, &r.Isoriginal, &r.Datatype) + err := s.db.QueryRowContext(ctx, query, hkey).Scan(&r.Data, &r.IsOnCloud, &r.Isoriginal, &r.Datatype) if err != nil { return nil, fmt.Errorf("failed to get record by key %s: %w", hkey, err) } @@ -489,7 +488,7 @@ func (s *Store) performJob(j Job) error { return fmt.Errorf("failed to store batch record: %w", err) } - logtrace.Info(ctx, "successfully stored batch records", logtrace.Fields{logtrace.FieldTaskID: j.TaskID, "id": j.ReqID}) + logtrace.Debug(ctx, "successfully stored batch records", logtrace.Fields{logtrace.FieldTaskID: j.TaskID, "id": j.ReqID}) case "Update": err := s.updateKeyReplication(j.Key, j.ReplicatedAt) if err != nil { @@ -653,9 +652,9 @@ func (s *Store) deleteAll() error { } // Count the records in store -func (s *Store) Count(_ context.Context) (int, error) { +func (s *Store) Count(ctx context.Context) (int, error) { var count int - err := s.db.Get(&count, `SELECT COUNT(*) FROM data`) + err := s.db.GetContext(ctx, &count, `SELECT COUNT(*) FROM data`) if err != nil { return -1, fmt.Errorf("failed to get count of records: %w", err) } @@ -686,11 +685,16 @@ func (s *Store) Stats(ctx context.Context) (map[string]interface{}, error) { func (s *Store) Close(ctx context.Context) { s.worker.Stop() + if s.repWriter != nil { + s.repWriter.Stop() + } + if s.db != nil { if err := s.db.Close(); err != nil { logtrace.Error(ctx, "Failed to close database", logtrace.Fields{logtrace.FieldModule: "p2p", logtrace.FieldError: err.Error()}) } } + } // GetOwnCreatedAt func @@ -734,7 +738,7 @@ func (s *Store) GetLocalKeys(from time.Time, to time.Time) ([]string, error) { var keys []string ctx := context.Background() logtrace.Info(ctx, "getting all keys for SC", logtrace.Fields{}) - if err := s.db.Select(&keys, `SELECT key FROM data WHERE createdAt > ? and createdAt < ?`, from, to); err != nil { + if err := s.db.SelectContext(ctx, &keys, `SELECT key FROM data WHERE createdAt > ? and createdAt < ?`, from, to); err != nil { return keys, fmt.Errorf("error reading all keys from database: %w", err) } logtrace.Info(ctx, "got all keys for SC", logtrace.Fields{}) @@ -747,66 +751,54 @@ func (s *Store) BatchDeleteRecords(keys []string) error { return batchDeleteRecords(s.db, keys) } +// stringArgsToInterface converts a slice of strings to a slice of interface{} +func stringArgsToInterface(args []string) []interface{} { + iargs := make([]interface{}, len(args)) + for i, v := range args { + iargs[i] = v + } + return iargs +} + func batchDeleteRecords(db *sqlx.DB, keys []string) error { if len(keys) == 0 { - ctx := context.Background() - logtrace.Info(ctx, "no keys provided for batch delete", logtrace.Fields{logtrace.FieldModule: "p2p"}) + logtrace.Info(context.Background(), "no keys provided for batch delete", logtrace.Fields{logtrace.FieldModule: "p2p"}) return nil } - - // Create a parameter string for SQL query (?, ?, ?, ...) - paramStr := strings.Repeat("?,", len(keys)-1) + "?" - - // Create the SQL statement - query := fmt.Sprintf("DELETE FROM data WHERE key IN (%s)", paramStr) - - // Execute the query - res, err := db.Exec(query, stringArgsToInterface(keys)...) - if err != nil { - return fmt.Errorf("cannot batch delete records: %w", err) + total := int64(0) + for _, chunk := range chunkStrings(keys, sqliteMaxVars) { + paramStr := strings.Repeat("?,", len(chunk)-1) + "?" + q := fmt.Sprintf("DELETE FROM data WHERE key IN (%s)", paramStr) + res, err := db.Exec(q, stringArgsToInterface(chunk)...) + if err != nil { + return fmt.Errorf("cannot batch delete records: %w", err) + } + n, _ := res.RowsAffected() + total += n } - - // Optionally check rows affected - if rowsAffected, err := res.RowsAffected(); err != nil { - return fmt.Errorf("failed to get rows affected for batch delete: %w", err) - } else if rowsAffected == 0 { + if total == 0 { return fmt.Errorf("no rows affected for batch delete") } - return nil } -// stringArgsToInterface converts a slice of strings to a slice of interface{} -func stringArgsToInterface(args []string) []interface{} { - iargs := make([]interface{}, len(args)) - for i, v := range args { - iargs[i] = v - } - return iargs -} - func batchSetMigratedRecords(db *sqlx.DB, keys []string) error { if len(keys) == 0 { logtrace.Info(context.Background(), "no keys provided for batch update (migrated)", logtrace.Fields{logtrace.FieldModule: "p2p"}) return nil } - - // Create a parameter string for SQL query (?, ?, ?, ...) - paramStr := strings.Repeat("?,", len(keys)-1) + "?" - - // Create the SQL statement - query := fmt.Sprintf("UPDATE data set data = X'', is_on_cloud = true WHERE key IN (%s)", paramStr) - - // Execute the query - res, err := db.Exec(query, stringArgsToInterface(keys)...) - if err != nil { - return fmt.Errorf("cannot batch update records (migrated): %w", err) + total := int64(0) + for _, chunk := range chunkStrings(keys, sqliteMaxVars) { + paramStr := strings.Repeat("?,", len(chunk)-1) + "?" + q := fmt.Sprintf("UPDATE data SET data = X'', is_on_cloud = true WHERE key IN (%s)", paramStr) + res, err := db.Exec(q, stringArgsToInterface(chunk)...) + if err != nil { + return fmt.Errorf("cannot batch update records (migrated): %w", err) + } + n, _ := res.RowsAffected() + total += n } - - // Optionally check rows affected - if rowsAffected, err := res.RowsAffected(); err != nil { - return fmt.Errorf("failed to get rows affected for batch update(migrated): %w", err) - } else if rowsAffected == 0 { + if total == 0 { return fmt.Errorf("no rows affected for batch update (migrated)") } diff --git a/p2p/kademlia/store/sqlite/sqlite_test.go b/p2p/kademlia/store/sqlite/sqlite_test.go index 3ce6d2d2..2f840867 100644 --- a/p2p/kademlia/store/sqlite/sqlite_test.go +++ b/p2p/kademlia/store/sqlite/sqlite_test.go @@ -182,5 +182,7 @@ func TestStore(t *testing.T) { os.Remove("data001-migration-meta.sqlite3") os.Remove("data001.sqlite3-shm") os.Remove("data001.sqlite3-wal") + os.Remove("data001-migration-meta.sqlite3-shm") + os.Remove("data001-migration-meta.sqlite3-wal") } diff --git a/p2p/p2p.go b/p2p/p2p.go index 93afe95f..cb48bf99 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -93,13 +93,6 @@ func (s *p2p) run(ctx context.Context) error { logtrace.Error(ctx, "failed to configure bootstrap nodes", logtrace.Fields{logtrace.FieldModule: "p2p", logtrace.FieldError: err}) logtrace.Error(ctx, "failed to get bootstap ip", logtrace.Fields{logtrace.FieldModule: "p2p", logtrace.FieldError: err}) } - - // join the kademlia network if bootstrap nodes is set - if err := s.dht.Bootstrap(ctx, s.config.BootstrapNodes); err != nil { - // stop the node for kademlia network - s.dht.Stop(ctx) - return errors.Errorf("bootstrap the node: %w", err) - } s.running = true logtrace.Info(ctx, "p2p service is started", logtrace.Fields{})