Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions docs/TESTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Testing Guide

This repo has two main kinds of tests: integration (unit-style tests that exercise multiple components together in a single process) and system (end‑to‑end tests that run local supernodes plus a local chain).

## Quick Start

- P2P integration test (single process):
- `go test ./tests/integration/p2p -timeout 10m -run ^TestP2PBasicIntegration$ -v`

- System cascade E2E (spawns local supernodes and a local chain):
- `make setup-supernodes`
- `make test-cascade`

For a full system suite, use `make test-e2e`.

## Local/Loopback Networking In Tests

Production nodes reject loopback/private addresses when discovering peers. Tests run everything on localhost, so the P2P layer must accept local addresses during tests.

We centralize this behavior behind an environment flag used by the tests themselves:

- `INTEGRATION_TEST=true`
- When set, the P2P layer allows peers on `127.0.0.1` and hostnames like `localhost`.
- This applies to neighbor discovery and to the sender address on outbound P2P messages.
- The normal production behavior (rejecting unspecified/loopback/private addresses) remains unchanged when the flag is not set.

Where this is used:

- `p2p/kademlia/dht.go`
- `addKnownNodes`: Permits loopback/private/localhost peers when `INTEGRATION_TEST=true`.
- `newMessage`: Chooses a safe sender IP. In test mode, falls back to `127.0.0.1` when no public IP is available.

You do not need to export `INTEGRATION_TEST` manually for system tests; the tests set/unset it around execution where needed.

## System Test Supernode Layout

System tests use three local supernodes prepared under `tests/system`:

- Configs: `config.test-1.yml`, `config.test-2.yml`, `config.test-3.yml`
- Supernode listen hosts: `0.0.0.0`
- P2P ports: `4445`, `4447`, `4449` (paired with gRPC ports `4444`, `4446`, `4448`)
- Lumera gRPC: `localhost:9090` (local chain started by tests)

- Setup helper: `make setup-supernodes`
- Builds a `supernode` binary into `tests/system/supernode-data*`
- Copies the matching yaml config + test keyrings

- Runtime helper: `StartAllSupernodes` in `tests/system/supernode-utils.go`
- Launches the three supernodes using their config directories

## Troubleshooting

- “unable to fetch bootstrap IP addresses. No valid supernodes found.”
- Ensure the system tests are starting the local chain and supernodes first (use the Make targets).
- Confirm no port conflicts on `4444–4449` and that the processes are running.
- The `INTEGRATION_TEST` flag is already managed by tests; no extra setup required.

- P2P integration test flakiness
- This test brings up multiple P2P instances in‑process. Give it up to 10 minutes (`-timeout 10m`) on slow machines.

## Design Notes

We intentionally keep the test‑environment override behind a single env var to avoid widening production configuration surface. If we later want to move this into YAML, we can add a boolean like `p2p.allow_local_addresses: true` and thread it into the DHT options—but for now the env‑based switch keeps runtime logic minimal and isolated to tests.

48 changes: 34 additions & 14 deletions p2p/kademlia/banlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ const (
// banDuration - ban duration
banDuration = 3 * time.Hour

// threshold - threshold
threshold = 3
// threshold - a node is considered banned once its failure count exceeds this value.
threshold = 1
)

// BanNode is the over-the-wire representation of a node
Expand All @@ -34,6 +35,14 @@ func (s *BanList) Add(node *Node) {
s.mtx.Lock()
defer s.mtx.Unlock()

// If already exists, just increment count instead of duplicating
for i := range s.Nodes {
if bytes.Equal(s.Nodes[i].ID, node.ID) {
s.Nodes[i].count++
return
}
}

banNode := BanNode{
Node: Node{
ID: node.ID,
Expand Down Expand Up @@ -82,13 +91,16 @@ func (s *BanList) Banned(node *Node) bool {
s.mtx.RLock()
defer s.mtx.RUnlock()

maxCount := -1
for _, item := range s.Nodes {
if bytes.Equal(item.ID, node.ID) {
return item.count > threshold
if item.count > maxCount {
maxCount = item.count
}
}
}

return false
return maxCount > threshold
}

// Exists return true if the node is already there
Expand All @@ -110,18 +122,13 @@ func (s *BanList) Delete(node *Node) {
s.mtx.Lock()
defer s.mtx.Unlock()

l := len(s.Nodes)
for i := 0; i < l; i++ {
if bytes.Equal(s.Nodes[i].ID, node.ID) {
newNodes := s.Nodes[:i]
if i+1 < l {
newNodes = append(newNodes, s.Nodes[i+1:]...)
}
s.Nodes = newNodes

return
filtered := s.Nodes[:0]
for _, it := range s.Nodes {
if !bytes.Equal(it.ID, node.ID) {
filtered = append(filtered, it)
}
}
s.Nodes = filtered
}

// Purge removes all expired nodes from the ban list
Expand Down Expand Up @@ -179,6 +186,19 @@ func (s *BanList) AddWithCreatedAt(node *Node, createdAt time.Time, count int) {
s.mtx.Lock()
defer s.mtx.Unlock()

// If exists, update in-place using the stronger ban and earliest createdAt
for i := range s.Nodes {
if bytes.Equal(s.Nodes[i].ID, node.ID) {
if createdAt.Before(s.Nodes[i].CreatedAt) {
s.Nodes[i].CreatedAt = createdAt
}
if count > s.Nodes[i].count {
s.Nodes[i].count = count
}
return
}
}

banNode := BanNode{
Node: Node{
ID: node.ID,
Expand Down
168 changes: 119 additions & 49 deletions p2p/kademlia/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/LumeraProtocol/supernode/v2/pkg/errors"
"github.com/LumeraProtocol/supernode/v2/pkg/utils"

"github.com/LumeraProtocol/supernode/v2/pkg/logtrace"
ltc "github.com/LumeraProtocol/supernode/v2/pkg/net/credentials"
Expand All @@ -27,7 +26,6 @@ func (s *DHT) skipBadBootstrapAddrs() {
//s.cache.Set(skipAddress1, []byte("true"))
//s.cache.Set(skipAddress2, []byte("true"))
}

func (s *DHT) parseNode(extP2P string, selfAddr string) (*Node, error) {
if extP2P == "" {
return nil, errors.New("empty address")
Expand Down Expand Up @@ -100,42 +98,40 @@ func (s *DHT) setBootstrapNodesFromConfigVar(ctx context.Context, bootstrapNodes
return nil
}

// ConfigureBootstrapNodes connects with lumera client & gets p2p boostrap ip & port
func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string) error {
if bootstrapNodes != "" {
return s.setBootstrapNodesFromConfigVar(ctx, bootstrapNodes)
}

supernodeAddr, err := s.getSupernodeAddress(ctx)
// loadBootstrapCandidatesFromChain queries the chain and builds a map of candidate nodes
// keyed by their full "ip:port" address. Only active supernodes are considered and the
// latest published IP and p2p port are used.
func (s *DHT) loadBootstrapCandidatesFromChain(ctx context.Context, selfAddress string) (map[string]*Node, error) {
// Get all supernodes
supernodeResp, err := s.options.LumeraClient.SuperNode().ListSuperNodes(ctx)
if err != nil {
return fmt.Errorf("get supernode address: %s", err)
return nil, fmt.Errorf("failed to get top supernodes: %w", err)
}
selfAddress := fmt.Sprintf("%s:%d", parseSupernodeAddress(supernodeAddr), s.options.Port)

var boostrapNodes []*Node
mapNodes := make(map[string]*Node, len(supernodeResp.Supernodes))

// Get the latest block to determine height
latestBlockResp, err := s.options.LumeraClient.Node().GetLatestBlock(ctx)
if err != nil {
return fmt.Errorf("failed to get latest block: %w", err)
}

// Get the block height
blockHeight := uint64(latestBlockResp.SdkBlock.Header.Height)
for _, supernode := range supernodeResp.Supernodes {
// Skip non-active supernodes - find latest state by height
if len(supernode.States) == 0 {
continue
}

// Get top supernodes for this block
supernodeResp, err := s.options.LumeraClient.SuperNode().GetTopSuperNodesForBlock(ctx, blockHeight)
if err != nil {
return fmt.Errorf("failed to get top supernodes: %w", err)
}
var latestState int32 = 0
var maxStateHeight int64 = -1
for _, state := range supernode.States {
if state.Height > maxStateHeight {
maxStateHeight = state.Height
latestState = int32(state.State)
}
}

mapNodes := map[string]*Node{}
if latestState != 1 { // SuperNodeStateActive = 1
continue
}

for _, supernode := range supernodeResp.Supernodes {
// Find the latest IP address (with highest block height)
var latestIP string
var maxHeight int64 = -1

for _, ipHistory := range supernode.PrevIpAddresses {
if ipHistory.Height > maxHeight {
maxHeight = ipHistory.Height
Expand Down Expand Up @@ -182,26 +178,97 @@ func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string
mapNodes[fullAddress] = node
}

// Convert the map to a slice
for _, node := range mapNodes {
hID, _ := utils.Blake3Hash(node.ID)
node.HashedID = hID
logtrace.Debug(ctx, "node adding", logtrace.Fields{
logtrace.FieldModule: "p2p",
"node": node.String(),
"hashed_id": string(node.HashedID),
})
boostrapNodes = append(boostrapNodes, node)
return mapNodes, nil
}

// filterResponsiveByKademliaPing sends a Kademlia Ping RPC to each node and returns
// only those that respond successfully.
func (s *DHT) filterResponsiveByKademliaPing(ctx context.Context, nodes []*Node) []*Node {
maxInFlight := 32
if len(nodes) < maxInFlight {
maxInFlight = len(nodes)
}
if maxInFlight < 1 {
maxInFlight = 1
}

if len(boostrapNodes) == 0 {
logtrace.Error(ctx, "unable to fetch bootstrap IP addresses. No valid supernodes found.", logtrace.Fields{
logtrace.FieldModule: "p2p",
})
type result struct {
node *Node
alive bool
}

sem := make(chan struct{}, maxInFlight)
resCh := make(chan result, len(nodes))
var wg sync.WaitGroup

for _, n := range nodes {
n := n
wg.Add(1)
sem <- struct{}{}
go func() {
defer wg.Done()
defer func() { <-sem }()
req := s.newMessage(Ping, n, nil)
pctx, cancel := context.WithTimeout(ctx, defaultPingTime)
defer cancel()
_, err := s.network.Call(pctx, req, false)
if err != nil {
// penalize, but Bootstrap will still retry independently
s.ignorelist.IncrementCount(n)
logtrace.Debug(ctx, "kademlia ping failed during configure", logtrace.Fields{
logtrace.FieldModule: "p2p",
logtrace.FieldError: err.Error(),
"node": n.String(),
})
resCh <- result{node: n, alive: false}
return
}
resCh <- result{node: n, alive: true}
}()
}

wg.Wait()
close(resCh)

filtered := make([]*Node, 0, len(nodes))
for r := range resCh {
if r.alive {
filtered = append(filtered, r.node)
}
}
return filtered
}

// ConfigureBootstrapNodes connects with lumera client & gets p2p boostrap ip & port
func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string) error {
if bootstrapNodes != "" {
return s.setBootstrapNodesFromConfigVar(ctx, bootstrapNodes)
}

supernodeAddr, err := s.getSupernodeAddress(ctx)
if err != nil {
return fmt.Errorf("get supernode address: %s", err)
}
selfAddress := fmt.Sprintf("%s:%d", parseSupernodeAddress(supernodeAddr), s.options.Port)

// Load bootstrap candidates from chain
mapNodes, err := s.loadBootstrapCandidatesFromChain(ctx, selfAddress)
if err != nil {
return err
}

// Build candidate list and filter only those that respond to Kademlia Ping
candidates := make([]*Node, 0, len(mapNodes))
for _, n := range mapNodes {
candidates = append(candidates, n)
}
pingResponsive := s.filterResponsiveByKademliaPing(ctx, candidates)
if len(pingResponsive) == 0 {
logtrace.Error(ctx, "no bootstrap nodes responded to Kademlia ping", logtrace.Fields{logtrace.FieldModule: "p2p"})
return nil
}

for _, node := range boostrapNodes {
for _, node := range pingResponsive {
logtrace.Info(ctx, "adding p2p bootstrap node", logtrace.Fields{
logtrace.FieldModule: "p2p",
"bootstap_ip": node.IP,
Expand All @@ -210,7 +277,7 @@ func (s *DHT) ConfigureBootstrapNodes(ctx context.Context, bootstrapNodes string
})
}

s.options.BootstrapNodes = append(s.options.BootstrapNodes, boostrapNodes...)
s.options.BootstrapNodes = append(s.options.BootstrapNodes, pingResponsive...)

return nil
}
Expand All @@ -230,9 +297,12 @@ func (s *DHT) Bootstrap(ctx context.Context, bootstrapNodes string) error {
for _, node := range s.options.BootstrapNodes {
nodeId := string(node.ID)
// sync the bootstrap node only once
isConnected, exists := s.bsConnected[nodeId]
if exists && isConnected {
continue
val, exists := s.bsConnected.Load(nodeId)
if exists {
isConnected, _ := val.(bool)
if isConnected {
continue
}
}

addr := fmt.Sprintf("%s:%v", node.IP, node.Port)
Expand All @@ -245,7 +315,7 @@ func (s *DHT) Bootstrap(ctx context.Context, bootstrapNodes string) error {
}

node := node
s.bsConnected[nodeId] = false
s.bsConnected.Store(nodeId, false)
wg.Add(1)
go func() {
defer wg.Done()
Expand Down Expand Up @@ -295,7 +365,7 @@ func (s *DHT) Bootstrap(ctx context.Context, bootstrapNodes string) error {
continue
}

s.bsConnected[nodeId] = true
s.bsConnected.Store(nodeId, true)
s.addNode(ctx, response.Sender)
break
}
Expand Down
Loading