transport_test.go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package memberlist

import (
	"log"
	"net"
	"strings"
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

func TestTransport_Join(t *testing.T) {
	net := &MockNetwork{}
	t1 := net.NewTransport("node1")

	c1 := DefaultLANConfig()
	c1.Name = "node1"
	c1.Transport = t1
	m1, err := Create(c1)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	m1.setAlive()
	m1.schedule()
	defer m1.Shutdown()
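
	// Create a second node on the same in-memory mock network.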
	c2 := DefaultLANConfig()
	c2.Name = "node2"
	c2.Transport = net.NewTransport("node2")
	m2, err := Create(c2)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	m2.setAlive()
	m2.schedule()
	defer m2.Shutdown()
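
	// Join node2 to node1. The mock addresses take the form "<name>/<addr>".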
	num, err := m2.Join([]string{c1.Name + "/" + t1.addr.String()})
	if num != 1 {
		t.Fatalf("bad: %d", num)
	}
	if err != nil {
		t.Fatalf("err: %v", err)
	}
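
	// Both the member list and the node-count estimate should see two nodes.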
	if len(m2.Members()) != 2 {
		t.Fatalf("bad: %v", m2.Members())
	}
	if m2.estNumNodes() != 2 {
		t.Fatalf("bad: %v", m2.estNumNodes())
	}
}

func TestTransport_Send(t *testing.T) {
	net := &MockNetwork{}
	t1 := net.NewTransport("node1")
	d1 := &MockDelegate{}
	c1 := DefaultLANConfig()
	c1.Name = "node1"
	c1.Transport = t1
	c1.Delegate = d1
	m1, err := Create(c1)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	m1.setAlive()
	m1.schedule()
	defer m1.Shutdown()
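
	// Create a second node on the same mock network.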
	c2 := DefaultLANConfig()
	c2.Name = "node2"
	c2.Transport = net.NewTransport("node2")
	m2, err := Create(c2)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	m2.setAlive()
	m2.schedule()
	defer m2.Shutdown()
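
	// Join node2 to node1 before exchanging any messages.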
	num, err := m2.Join([]string{c1.Name + "/" + t1.addr.String()})
	if num != 1 {
		t.Fatalf("bad: %d", num)
	}
	if err != nil {
		t.Fatalf("err: %v", err)
	}
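
	// Exercise each send path: by raw address, then by node over UDP and TCP,
	// then best-effort and reliable.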
	if err := m2.SendTo(t1.addr, []byte("SendTo")); err != nil {
		t.Fatalf("err: %v", err)
	}
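
	// Look up node1's Node entry so the node-addressed send APIs can be used.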
	var n1 *Node
	for _, n := range m2.Members() {
		if n.Name == c1.Name {
			n1 = n
			break
		}
	}
	if n1 == nil {
		t.Fatalf("failed to find %q in m2's member list", c1.Name)
	}
	if err := m2.SendToUDP(n1, []byte("SendToUDP")); err != nil {
		t.Fatalf("err: %v", err)
	}
	if err := m2.SendToTCP(n1, []byte("SendToTCP")); err != nil {
		t.Fatalf("err: %v", err)
	}
	if err := m2.SendBestEffort(n1, []byte("SendBestEffort")); err != nil {
		t.Fatalf("err: %v", err)
	}
	if err := m2.SendReliable(n1, []byte("SendReliable")); err != nil {
		t.Fatalf("err: %v", err)
	}
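
	// Give the in-flight messages a moment to reach node1's delegate.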
	time.Sleep(100 * time.Millisecond)

	expected := []string{"SendTo", "SendToUDP", "SendToTCP", "SendBestEffort", "SendReliable"}
	msgs1 := d1.getMessages()
	received := make([]string, len(msgs1))
	for i, bs := range msgs1 {
		received[i] = string(bs)
	}
	// Some of these are UDP, so they often get re-ordered, which would make the
	// test flaky if we asserted send ordering. Compare ignoring order instead.
	require.ElementsMatch(t, expected, received)
}
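
// testCountingWriter counts log writes so the test below can bound how many
// errors tcpListen emits in a given period.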
type testCountingWriter struct {
	t        *testing.T
	numCalls *int32
}

func (tw testCountingWriter) Write(p []byte) (n int, err error) {
	atomic.AddInt32(tw.numCalls, 1)
	if !strings.Contains(string(p), "memberlist: Error accepting TCP connection") {
		tw.t.Error("did not receive expected log message")
	}
	tw.t.Log("countingWriter:", string(p))
	return len(p), nil
}

// TestTransport_TcpListenBackoff tests that AcceptTCP() errors in NetTransport#tcpListen()
// do not result in a tight loop and spam the log. We verify this here by counting the number
// of entries logged in a given time period.
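// The test assumes tcpListen backs off roughly like the sketch below (this is
// an assumption about its shape, not a copy of the real code; see
// NetTransport#tcpListen for the actual implementation):
//
//	var delay time.Duration
//	for {
//		if _, err := listener.AcceptTCP(); err != nil {
//			if delay == 0 {
//				delay = 5 * time.Millisecond // minDelay
//			} else {
//				delay *= 2 // exponential back-off per failure
//			}
//			if delay > time.Second {
//				delay = time.Second // maxDelay caps the wait
//			}
//			logger.Printf("memberlist: Error accepting TCP connection: %v", err) // one log line per failure
//			time.Sleep(delay)
//			continue
//		}
//		delay = 0 // reset after a successful accept
//	}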
func TestTransport_TcpListenBackoff(t *testing.T) {
	// testTime is the amount of time we will allow NetTransport#tcpListen() to run.
	// This needs to be long enough to verify that maxDelay is in force, but not so
	// long as to be obnoxious when running the test suite.
	const testTime = 4 * time.Second

	var numCalls int32
	countingWriter := testCountingWriter{t, &numCalls}
	countingLogger := log.New(countingWriter, "test", log.LstdFlags)
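
	// Construct a NetTransport by hand so tcpListen can be driven directly.
	// tcpListen calls wg.Done() when it exits, so account for it up front.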
	transport := NetTransport{
		streamCh: make(chan net.Conn),
		logger:   countingLogger,
	}
	transport.wg.Add(1)

	// Create a listener, then close it so that every AcceptTCP() call fails.
	listener, _ := net.ListenTCP("tcp", nil)
	listener.Close()
	go transport.tcpListen(listener)

	// Sleep (+yield) for testTime before asking the accept loop to shut down.
	time.Sleep(testTime)
	atomic.StoreInt32(&transport.shutdown, 1)

	// Verify that the wg was completed on exit (but without blocking this test).
	// maxDelay == 1s, so we will give the routine 1.25s to loop around and shut down.
	c := make(chan struct{})
	go func() {
		defer close(c)
		transport.wg.Wait()
	}()
	select {
	case <-c:
	case <-time.After(1250 * time.Millisecond):
		t.Error("timed out waiting for transport waitgroup to be done after flagging shutdown")
	}

	// Over testTime (4s) we expect the loop to run approximately 12 times and to
	// log approximately 11 errors, with the following delays (in ms):
	//   0+5+10+20+40+80+160+320+640+1000+1000+1000 == 4275 ms
	// Too few calls suggests that the minDelay is not in force; too many calls
	// suggests that the maxDelay is not in force or that the back-off isn't
	// working at all. We'll leave a little flex; the important thing here is the
	// asymptotic behavior. If the minDelay or maxDelay in NetTransport#tcpListen()
	// are modified, this test may fail and need to be adjusted.
	require.True(t, numCalls > 8)
	require.True(t, numCalls < 14)

	// No connections should have been accepted and sent to the channel.
	require.Equal(t, len(transport.streamCh), 0)
}