Skip to content

Commit 63c013d

Browse files
committed
Reduce channels in AbstractSimpleTransportTestCase (elastic#34863) (elastic#34880)
This is related to elastic#30876. The AbstractSimpleTransportTestCase initiates many tcp connections. There are normally over 1,000 connections in TIME_WAIT at the end of the test. This is because every test opens at least two different transports that connect to each other with 13 channel connection profiles. This commit modifies the default connection profile used by this test to 6. One connection for each type, except for REG which gets 2 connections.
1 parent 86af1ce commit 63c013d

File tree

8 files changed

+56
-35
lines changed

8 files changed

+56
-35
lines changed

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ protected Version getCurrentVersion() {
7373
}
7474
};
7575
MockTransportService mockTransportService =
76-
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
76+
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet());
7777
mockTransportService.start();
7878
return mockTransportService;
7979
}

server/src/main/java/org/elasticsearch/transport/ConnectionManager.java

+1-20
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public class ConnectionManager implements Closeable {
6767
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();
6868

6969
public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) {
70-
this(settings, transport, threadPool, buildDefaultConnectionProfile(settings));
70+
this(settings, transport, threadPool, ConnectionProfile.buildDefaultConnectionProfile(settings));
7171
}
7272

7373
public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, ConnectionProfile defaultProfile) {
@@ -323,23 +323,4 @@ public void onConnectionClosed(Transport.Connection connection) {
323323
}
324324
}
325325
}
326-
327-
public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
328-
int connectionsPerNodeRecovery = TransportService.CONNECTIONS_PER_NODE_RECOVERY.get(settings);
329-
int connectionsPerNodeBulk = TransportService.CONNECTIONS_PER_NODE_BULK.get(settings);
330-
int connectionsPerNodeReg = TransportService.CONNECTIONS_PER_NODE_REG.get(settings);
331-
int connectionsPerNodeState = TransportService.CONNECTIONS_PER_NODE_STATE.get(settings);
332-
int connectionsPerNodePing = TransportService.CONNECTIONS_PER_NODE_PING.get(settings);
333-
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
334-
builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
335-
builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
336-
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
337-
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
338-
// if we are not master eligible we don't need a dedicated channel to publish the state
339-
builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
340-
// if we are not a data-node we don't need any dedicated channels for recovery
341-
builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
342-
builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
343-
return builder.build();
344-
}
345326
}

server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java

+27
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
*/
1919
package org.elasticsearch.transport;
2020

21+
import org.elasticsearch.cluster.node.DiscoveryNode;
2122
import org.elasticsearch.common.Nullable;
23+
import org.elasticsearch.common.settings.Settings;
2224
import org.elasticsearch.common.unit.TimeValue;
2325

2426
import java.util.ArrayList;
@@ -91,6 +93,31 @@ public static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionPro
9193
}
9294
}
9395

96+
/**
97+
* Builds a default connection profile based on the provided settings.
98+
*
99+
* @param settings to build the connection profile from
100+
* @return the connection profile
101+
*/
102+
public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
103+
int connectionsPerNodeRecovery = TransportService.CONNECTIONS_PER_NODE_RECOVERY.get(settings);
104+
int connectionsPerNodeBulk = TransportService.CONNECTIONS_PER_NODE_BULK.get(settings);
105+
int connectionsPerNodeReg = TransportService.CONNECTIONS_PER_NODE_REG.get(settings);
106+
int connectionsPerNodeState = TransportService.CONNECTIONS_PER_NODE_STATE.get(settings);
107+
int connectionsPerNodePing = TransportService.CONNECTIONS_PER_NODE_PING.get(settings);
108+
Builder builder = new Builder();
109+
builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
110+
builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
111+
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
112+
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
113+
// if we are not master eligible we don't need a dedicated channel to publish the state
114+
builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE);
115+
// if we are not a data-node we don't need any dedicated channels for recovery
116+
builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
117+
builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
118+
return builder.build();
119+
}
120+
94121
/**
95122
* A builder to build a new {@link ConnectionProfile}
96123
*/

server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public void stopThreadPool() {
6464
}
6565

6666
public void testConnectionProfileResolve() {
67-
final ConnectionProfile defaultProfile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY);
67+
final ConnectionProfile defaultProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
6868
assertEquals(defaultProfile, ConnectionProfile.resolveConnectionProfile(null, defaultProfile));
6969

7070
final ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
@@ -96,31 +96,31 @@ public void testConnectionProfileResolve() {
9696
}
9797

9898
public void testDefaultConnectionProfile() {
99-
ConnectionProfile profile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY);
99+
ConnectionProfile profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
100100
assertEquals(13, profile.getNumConnections());
101101
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
102102
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
103103
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
104104
assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
105105
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
106106

107-
profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.master", false).build());
107+
profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.master", false).build());
108108
assertEquals(12, profile.getNumConnections());
109109
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
110110
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
111111
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
112112
assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
113113
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
114114

115-
profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).build());
115+
profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).build());
116116
assertEquals(11, profile.getNumConnections());
117117
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
118118
assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
119119
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
120120
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
121121
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
122122

123-
profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.data", false)
123+
profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.data", false)
124124
.put("node.master", false).build());
125125
assertEquals(10, profile.getNumConnections());
126126
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING));

test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

+19-5
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
113113
protected abstract MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake);
114114

115115
protected int channelsPerNodeConnection() {
116-
return 13;
116+
// This is a customized profile for this test case.
117+
return 6;
117118
}
118119

119120
@Override
@@ -122,9 +123,17 @@ public void setUp() throws Exception {
122123
super.setUp();
123124
threadPool = new TestThreadPool(getClass().getName());
124125
clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
125-
serviceA = buildService("TS_A", version0, clusterSettings); // this one supports dynamic tracer updates
126+
Settings connectionSettings = Settings.builder()
127+
.put(TransportService.CONNECTIONS_PER_NODE_RECOVERY.getKey(), 1)
128+
.put(TransportService.CONNECTIONS_PER_NODE_BULK.getKey(), 1)
129+
.put(TransportService.CONNECTIONS_PER_NODE_REG.getKey(), 2)
130+
.put(TransportService.CONNECTIONS_PER_NODE_STATE.getKey(), 1)
131+
.put(TransportService.CONNECTIONS_PER_NODE_PING.getKey(), 1)
132+
.build();
133+
134+
serviceA = buildService("TS_A", version0, clusterSettings, connectionSettings); // this one supports dynamic tracer updates
126135
nodeA = serviceA.getLocalNode();
127-
serviceB = buildService("TS_B", version1, null); // this one doesn't support dynamic tracer updates
136+
serviceB = buildService("TS_B", version1, null, connectionSettings); // this one doesn't support dynamic tracer updates
128137
nodeB = serviceB.getLocalNode();
129138
// wait till all nodes are properly connected and the event has been sent, so tests in this class
130139
// will not get this callback called on the connections done in this setup
@@ -171,7 +180,12 @@ private MockTransportService buildService(final String name, final Version versi
171180
}
172181

173182
protected MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings) {
174-
return buildService(name, version, clusterSettings, Settings.EMPTY, true, true);
183+
return buildService(name, version, clusterSettings, Settings.EMPTY);
184+
}
185+
186+
protected MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings,
187+
Settings settings) {
188+
return buildService(name, version, clusterSettings, settings, true, true);
175189
}
176190

177191
@Override
@@ -2004,7 +2018,7 @@ protected String handleRequest(TcpChannel mockChannel, String profileName, Strea
20042018
assertEquals("handshake failed", exception.getCause().getMessage());
20052019
}
20062020

2007-
ConnectionProfile connectionProfile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY);
2021+
ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
20082022
try (TransportService service = buildService("TS_TPC", Version.CURRENT, null);
20092023
TcpTransport.NodeChannels connection = originalTransport.openConnection(
20102024
new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0),

test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public Version executeHandshake(DiscoveryNode node, TcpChannel mockChannel, Time
5050
}
5151
};
5252
MockTransportService mockTransportService =
53-
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
53+
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet());
5454
mockTransportService.start();
5555
return mockTransportService;
5656
}

x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/AbstractSimpleSecurityTransportTestCase.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
2121
import org.elasticsearch.transport.BindTransportException;
2222
import org.elasticsearch.transport.ConnectTransportException;
23-
import org.elasticsearch.transport.ConnectionManager;
2423
import org.elasticsearch.transport.ConnectionProfile;
2524
import org.elasticsearch.transport.TcpTransport;
2625
import org.elasticsearch.transport.TransportRequestOptions;
@@ -111,7 +110,7 @@ public void testTcpHandshake() throws IOException, InterruptedException {
111110
assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport);
112111
TcpTransport originalTransport = (TcpTransport) serviceA.getOriginalTransport();
113112

114-
ConnectionProfile connectionProfile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY);
113+
ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
115114
try (TransportService service = buildService("TS_TPC", Version.CURRENT, null);
116115
TcpTransport.NodeChannels connection = originalTransport.openConnection(
117116
new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0),

x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ protected Version getCurrentVersion() {
9393

9494
};
9595
MockTransportService mockTransportService =
96-
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings,
96+
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings,
9797
Collections.emptySet());
9898
mockTransportService.start();
9999
return mockTransportService;

0 commit comments

Comments
 (0)