Commit 9b87ec9 (parent 258f8fe), authored Nov 11, 2020

Avoid caching presto worker nodes (#465)

Divide ClusterManager implementations into sync and async variants. Sync ClusterManagers always return the cluster state as seen by the Engine; async ClusterManagers may take some time to converge to the cluster state as seen by the Engine.
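In effect, a sync manager re-reads the engine's membership on every call, while an async manager serves a cached snapshot that a background refresh gradually brings back in line with the engine's view. A minimal, illustrative sketch of that contract (not the classes added in this commit; the real base classes are AsyncClusterManager and SyncClusterManager under rubix-spi):

import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Illustrative only: the sync variant asks the engine every time,
// the async variant returns a periodically refreshed snapshot.
class SyncNodesSketch
{
  private final Supplier<Set<String>> engineView;  // e.g. backed by the engine's NodeManager

  SyncNodesSketch(Supplier<Set<String>> engineView)
  {
    this.engineView = engineView;
  }

  Set<String> getNodes()
  {
    return engineView.get();  // always the engine's current state
  }
}

class AsyncNodesSketch
{
  private final AtomicReference<Set<String>> snapshot = new AtomicReference<>();

  AsyncNodesSketch(Supplier<Set<String>> engineView, long refreshSeconds)
  {
    snapshot.set(engineView.get());
    ScheduledExecutorService refresher = Executors.newSingleThreadScheduledExecutor();
    // Callers may see a stale node set until the next tick catches up with the engine.
    refresher.scheduleAtFixedRate(() -> snapshot.set(engineView.get()),
        refreshSeconds, refreshSeconds, TimeUnit.SECONDS);
  }

  Set<String> getNodes()
  {
    return snapshot.get();
  }
}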

19 files changed: +724 -415 lines

‎rubix-core/src/main/java/com/qubole/rubix/core/CachingFileSystem.java

+2 -2

@@ -48,7 +48,7 @@
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.net.URI;
-import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;

 import static com.google.common.base.Preconditions.checkState;
@@ -401,7 +401,7 @@ public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long l
 return fs.getFileBlockLocations(file, start, len);
 }

-List<String> nodes = clusterManager.getNodes();
+Set<String> nodes = clusterManager.getNodes();

 if (nodes == null) {
 return fs.getFileBlockLocations(file, start, len);

‎rubix-core/src/main/java/com/qubole/rubix/core/utils/DummyClusterManager.java

+7 -5

@@ -12,23 +12,25 @@
 */
 package com.qubole.rubix.core.utils;

-import com.qubole.rubix.spi.ClusterManager;
+import com.qubole.rubix.spi.AsyncClusterManager;
 import com.qubole.rubix.spi.ClusterType;

 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;

 /**
 * Created by Abhishek on 6/8/18.
 */
-public class DummyClusterManager extends ClusterManager
+public class DummyClusterManager extends AsyncClusterManager
 {
 @Override
-public List<String> getNodesInternal()
+public Set<String> getNodesInternal()
 {
-List<String> list = new ArrayList<String>();
+Set<String> list = new HashSet<>();
 String hostName = "";
 try {
 hostName = InetAddress.getLocalHost().getCanonicalHostName();
@@ -51,6 +53,6 @@ public ClusterType getClusterType()
 @Override
 public String getCurrentNodeName()
 {
-return getNodes().get(0);
+return getNodes().iterator().next();
 }
 }

‎rubix-core/src/main/java/com/qubole/rubix/core/utils/DummyClusterManagerMultinode.java

+23 -14

@@ -12,32 +12,41 @@
 */
 package com.qubole.rubix.core.utils;

-import com.qubole.rubix.spi.ClusterManager;
+import com.qubole.rubix.spi.AsyncClusterManager;
 import com.qubole.rubix.spi.ClusterType;

 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;

-public class DummyClusterManagerMultinode extends ClusterManager
+public class DummyClusterManagerMultinode extends AsyncClusterManager
 {
-@Override
-public List<String> getNodesInternal()
+private final String currentNode;
+private final String otherNode;
+private final Set<String> nodes = new HashSet<>();
+
+public DummyClusterManagerMultinode()
 {
-List<String> list = new ArrayList<String>();
-String hostName = "";
+String currentNode;
 try {
-hostName = InetAddress.getLocalHost().getCanonicalHostName();
+currentNode = InetAddress.getLocalHost().getCanonicalHostName();
 }
 catch (UnknownHostException e) {
-hostName = "localhost";
+currentNode = "localhost";
 }
+this.currentNode = currentNode;
+nodes.add(currentNode);
+this.otherNode = currentNode + "_copy";
+nodes.add(otherNode);
+}

-list.add(hostName);
-list.add(hostName + "_copy");
-
-return list;
+@Override
+public Set<String> getNodesInternal()
+{
+return nodes;
 }

 @Override
@@ -49,11 +58,11 @@ public ClusterType getClusterType()
 @Override
 public String getCurrentNodeName()
 {
-return getNodes().get(0);
+return currentNode;
 }

 public String locateKey(String key)
 {
-return getNodes().get(1);
+return otherNode;
 }
 }

‎rubix-core/src/test/java/com/qubole/rubix/core/utils/DockerTestClusterManager.java

+6 -5

@@ -13,17 +13,18 @@
 package com.qubole.rubix.core.utils;

 import com.google.common.collect.Lists;
-import com.qubole.rubix.spi.ClusterManager;
+import com.google.common.collect.Sets;
+import com.qubole.rubix.spi.AsyncClusterManager;
 import com.qubole.rubix.spi.ClusterType;

-import java.util.List;
+import java.util.Set;

-public class DockerTestClusterManager extends ClusterManager
+public class DockerTestClusterManager extends AsyncClusterManager
 {
 @Override
-public List<String> getNodesInternal()
+public Set<String> getNodesInternal()
 {
-return Lists.newArrayList("172.18.8.1", "172.18.8.2");
+return Sets.newHashSet("172.18.8.1", "172.18.8.2");
 }

 @Override

‎rubix-hadoop2/src/main/java/com/qubole/rubix/hadoop2/Hadoop2ClusterManager.java

+7 -6

@@ -12,10 +12,12 @@
 */
 package com.qubole.rubix.hadoop2;

+import com.google.api.client.util.Sets;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
-import com.qubole.rubix.spi.ClusterManager;
+import com.qubole.rubix.spi.AsyncClusterManager;
 import com.qubole.rubix.spi.ClusterType;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,7 +32,7 @@
 /**
 * Created by sakshia on 28/7/16.
 */
-public class Hadoop2ClusterManager extends ClusterManager
+public class Hadoop2ClusterManager extends AsyncClusterManager
 {
 YarnConfiguration yconf;
 private Log log = LogFactory.getLog(Hadoop2ClusterManager.class);
@@ -44,7 +46,7 @@ public void initialize(Configuration conf)
 }

 @Override
-public List<String> getNodesInternal()
+public Set<String> getNodesInternal()
 {
 try {
 List<Hadoop2ClusterManagerUtil.Node> allNodes = Hadoop2ClusterManagerUtil.getAllNodes(yconf);
@@ -53,7 +55,7 @@ public List<String> getNodesInternal()
 }

 if (allNodes.isEmpty()) {
-return ImmutableList.of();
+return ImmutableSet.of();
 }

 Set<String> hosts = new HashSet<>();
@@ -70,8 +72,7 @@ public List<String> getNodesInternal()
 throw new Exception("No healthy data nodes found.");
 }

-List<String> hostList = Lists.newArrayList(hosts.toArray(new String[0]));
-return hostList;
+return ImmutableSet.copyOf(hosts);
 }
 catch (Exception e) {
 throw Throwables.propagate(e);

‎rubix-hadoop2/src/test/java/com/qubole/rubix/hadoop2/TestHadoop2ClusterManager.java

+22 -22

@@ -24,7 +24,7 @@
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.List;
+import java.util.Set;

 import static org.testng.Assert.assertTrue;

@@ -66,13 +66,13 @@ static ClusterManager buildClusterManager()
 public void testGetNodes_multipleWorkers()
 throws IOException
 {
-final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
+final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
 TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
 worker.new MultipleRunningWorkers(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

 assertTrue(nodeHostnames.size() == 2, "Should only have two nodes");
-assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) &&
-nodeHostnames.get(1).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
+assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) &&
+nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
 }

 @Test
@@ -82,12 +82,12 @@ public void testGetNodes_multipleWorkers()
 public void testGetNodes_oneWorker()
 throws IOException
 {
-final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
+final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
 TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
 worker.new OneRunningWorker(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

 assertTrue(nodeHostnames.size() == 1, "Should only have one node");
-assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1));
+assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1));
 }

 @Test
@@ -97,13 +97,13 @@ public void testGetNodes_oneWorker()
 public void testGetNodes_oneNewWorker()
 throws IOException
 {
-final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
+final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
 TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
 worker.new MultipleWorkersOneNew(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

 assertTrue(nodeHostnames.size() == 2, "Should only have two nodes");
-assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) &&
-nodeHostnames.get(1).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
+assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) &&
+nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
 }

 @Test
@@ -113,13 +113,13 @@ public void testGetNodes_oneNewWorker()
 public void testGetNodes_oneRebootedWorker()
 throws IOException
 {
-final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
+final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
 TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
 worker.new MultipleWorkersOneRebooted(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

 assertTrue(nodeHostnames.size() == 2, "Should only have two nodes");
-assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) &&
-nodeHostnames.get(1).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
+assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) &&
+nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
 }

 @Test
@@ -129,12 +129,12 @@ public void testGetNodes_oneRebootedWorker()
 public void testMasterOnlyCluster()
 throws IOException
 {
-final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
+final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
 TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
 worker.new NoWorkers(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

 assertTrue(nodeHostnames.size() == 1, "Should have added localhost in list");
-assertTrue(nodeHostnames.get(0).equals(InetAddress.getLocalHost().getHostAddress()), "Not added right hostname");
+assertTrue(nodeHostnames.contains(InetAddress.getLocalHost().getHostAddress()), "Not added right hostname");
 }

 @Test
@@ -144,12 +144,12 @@ public void testMasterOnlyCluster()
 public void testUnhealthyNodeCluster_decommissioned()
 throws IOException
 {
-final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
+final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
 TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
 worker.new MultipleWorkersOneDecommissioned(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

 assertTrue(nodeHostnames.size() == 1, "Should only have one node");
-assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
+assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
 }

 @Test
@@ -159,12 +159,12 @@ public void testUnhealthyNodeCluster_decommissioned()
 public void testUnhealthyNodeCluster_decommissioning()
 throws IOException
 {
-final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
+final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
 TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
 worker.new MultipleWorkersOneDecommissioning(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

 assertTrue(nodeHostnames.size() == 1, "Should only have one node");
-assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
+assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
 }

 @Test
@@ -174,12 +174,12 @@ public void testUnhealthyNodeCluster_decommissioning()
 public void testUnhealthyNodeCluster_lost()
 throws IOException
 {
-final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
+final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
 TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
 worker.new MultipleWorkersOneLost(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

 assertTrue(nodeHostnames.size() == 1, "Should only have one node");
-assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
+assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
 }

 @Test
@@ -189,12 +189,12 @@ public void testUnhealthyNodeCluster_lost()
 public void testUnhealthyNodeCluster_unhealthy()
 throws IOException
 {
-final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
+final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
 TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
 worker.new MultipleWorkersOneUnhealthy(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

 assertTrue(nodeHostnames.size() == 1, "Should only have one node");
-assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
+assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
 }

 @Test

‎rubix-hadoop2/src/test/java/com/qubole/rubix/hadoop2/TestHadoop2ClusterManagerUtil.java

+4 -4

@@ -84,7 +84,7 @@ static HttpServer createServer(String endpoint, HttpHandler handler)
 * @return A list of hostnames for the nodes in the cluster.
 * @throws IOException if the cluster server could not be created.
 */
-static List<String> getNodeHostnamesFromCluster(String endpoint, HttpHandler responseHandler,
+static Set<String> getNodeHostnamesFromCluster(String endpoint, HttpHandler responseHandler,
 Configuration conf, ClusterType clusterType)
 throws IOException
 {
@@ -93,7 +93,7 @@ static List<String> getNodeHostnamesFromCluster(String endpoint, HttpHandler res

 ClusterManager clusterManager = getClusterManagerInstance(clusterType, conf);
 clusterManager.initialize(conf);
-final List<String> nodes = clusterManager.getNodes();
+final Set<String> nodes = clusterManager.getNodes();
 log.info("Got nodes: " + nodes);

 server.stop(0);
@@ -157,10 +157,10 @@ static int matchMemberships(TestWorker prevWorker, TestWorker newWorker, Set<Str
 Configuration conf, ClusterType clusterType)
 throws IOException
 {
-final List<String> nodeHostnames1 = getNodeHostnamesFromCluster(CLUSTER_NODES_ENDPOINT, prevWorker, conf, clusterType);
+final Set<String> nodeHostnames1 = getNodeHostnamesFromCluster(CLUSTER_NODES_ENDPOINT, prevWorker, conf, clusterType);
 Map<String, String> keyMembership1 = getConsistentHashedMembership(prevWorker, keys, conf, clusterType);

-final List<String> nodeHostnames2 = getNodeHostnamesFromCluster(CLUSTER_NODES_ENDPOINT, newWorker, conf, clusterType);
+final Set<String> nodeHostnames2 = getNodeHostnamesFromCluster(CLUSTER_NODES_ENDPOINT, newWorker, conf, clusterType);
 Map<String, String> keyMembership2 = getConsistentHashedMembership(newWorker, keys, conf, clusterType);

 int match = 0;

‎rubix-presto/src/main/java/com/qubole/rubix/presto/PrestoClusterManager.java

+6 -9

@@ -14,11 +14,12 @@

 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.reflect.TypeToken;
 import com.google.gson.Gson;
 import com.qubole.rubix.common.utils.ClusterUtil;
-import com.qubole.rubix.spi.ClusterManager;
+import com.qubole.rubix.spi.AsyncClusterManager;
 import com.qubole.rubix.spi.ClusterType;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -43,7 +44,7 @@
 /**
 * Created by stagra on 14/1/16.
 */
-public class PrestoClusterManager extends ClusterManager
+public class PrestoClusterManager extends AsyncClusterManager
 {
 private static final String DEFAULT_USER = "rubix";
 private int serverPort = 8081;
@@ -66,7 +67,7 @@ public void initialize(Configuration conf)
 }

 @Override
-public List<String> getNodesInternal()
+public Set<String> getNodesInternal()
 {
 try {
 URL allNodesRequest = getNodeUrl();
@@ -141,11 +142,7 @@ public List<String> getNodesInternal()
 List<Stats> allNodes = gson.fromJson(allResponse.toString(), type);
 List<Stats> failedNodes = gson.fromJson(failedResponse.toString(), type);
 if (allNodes.isEmpty()) {
-return ImmutableList.of();
-}
-
-if (failedNodes.isEmpty()) {
-failedNodes = ImmutableList.of();
+return ImmutableSet.of();
 }

 // keep only the healthy nodes
@@ -156,7 +153,7 @@ public List<String> getNodesInternal()
 for (Stats node : allNodes) {
 hosts.add(node.getUri().getHost());
 }
-return Lists.newArrayList(hosts);
+return ImmutableSet.copyOf(hosts);
 }
 catch (IOException e) {
 throw Throwables.propagate(e);

‎rubix-presto/src/test/java/com/qubole/rubix/presto/TestClusterManager.java

+7 -6

@@ -28,6 +28,7 @@
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.List;
+import java.util.Set;

 import static org.testng.Assert.assertTrue;

@@ -52,11 +53,11 @@ public void testGetNodes()
 log.info("STARTED SERVER");

 ClusterManager clusterManager = getPrestoClusterManager();
-List<String> nodes = clusterManager.getNodes();
+Set<String> nodes = clusterManager.getNodes();
 log.info("Got nodes: " + nodes);

 assertTrue(nodes.size() == 2, "Should only have two nodes");
-assertTrue(nodes.get(0).equals("192.168.1.3") && nodes.get(1).equals("192.168.2.252"), "Wrong nodes data");
+assertTrue(nodes.contains("192.168.1.3") && nodes.contains("192.168.2.252"), "Wrong nodes data");

 server.stop(0);
 }
@@ -73,11 +74,11 @@ public void testMasterOnlyCluster()
 log.info("STARTED SERVER");

 ClusterManager clusterManager = getPrestoClusterManager();
-List<String> nodes = clusterManager.getNodes();
+Set<String> nodes = clusterManager.getNodes();
 log.info("Got nodes: " + nodes);

 assertTrue(nodes.size() == 1, "Should have added localhost in list");
-assertTrue(nodes.get(0).equals(InetAddress.getLocalHost().getHostAddress()), "Not added right hostname");
+assertTrue(nodes.contains(InetAddress.getLocalHost().getHostAddress()), "Not added right hostname");
 server.stop(0);
 }

@@ -93,11 +94,11 @@ public void testFailedNodeCluster()
 log.info("STARTED SERVER");

 ClusterManager clusterManager = getPrestoClusterManager();
-List<String> nodes = clusterManager.getNodes();
+Set<String> nodes = clusterManager.getNodes();
 log.info("Got nodes: " + nodes);

 assertTrue(nodes.size() == 1, "Should only have two nodes");
-assertTrue(nodes.get(0).equals("192.168.2.252"), "Wrong nodes data");
+assertTrue(nodes.contains("192.168.2.252"), "Wrong nodes data");

 server.stop(0);
 }

New file: ClusterManagerNodeGetter.java (package com.qubole.rubix.prestosql)
@@ -0,0 +1,30 @@
1+
/**
2+
* Copyright (c) 2019. Qubole Inc
3+
* Licensed under the Apache License, Version 2.0 (the License);
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an AS IS BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License. See accompanying LICENSE file.
12+
*/
13+
package com.qubole.rubix.prestosql;
14+
15+
import io.prestosql.spi.Node;
16+
import io.prestosql.spi.NodeManager;
17+
import java.util.Set;
18+
import java.util.stream.Collectors;
19+
20+
import static java.util.Objects.requireNonNull;
21+
22+
public class ClusterManagerNodeGetter {
23+
public static Set<String> getNodesInternal(NodeManager nodeManager)
24+
{
25+
requireNonNull(nodeManager, "nodeManager is null");
26+
return nodeManager.getWorkerNodes().stream()
27+
.map(Node::getHost)
28+
.collect(Collectors.toSet());
29+
}
30+
}

‎rubix-prestosql/src/main/java/com/qubole/rubix/prestosql/PrestoClusterManager.java

+14 -263

@@ -12,300 +12,51 @@
 */
 package com.qubole.rubix.prestosql;

-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
-import com.qubole.rubix.common.utils.ClusterUtil;
-import com.qubole.rubix.spi.ClusterManager;
+import com.qubole.rubix.spi.AsyncClusterManager;
 import com.qubole.rubix.spi.ClusterType;
 import io.prestosql.spi.Node;
 import io.prestosql.spi.NodeManager;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;

-import javax.annotation.Nullable;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.lang.reflect.Type;
-import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URL;
 import java.net.UnknownHostException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Objects;
 import java.util.Set;
-import java.util.stream.Collectors;
-
-import static java.util.Objects.requireNonNull;

 /**
 * Created by stagra on 14/1/16.
 */
-public class PrestoClusterManager extends ClusterManager
+public class PrestoClusterManager extends AsyncClusterManager
 {
-private static final String DEFAULT_USER = "rubix";
-private int serverPort = 8081;
-private String serverAddress = "localhost";
-
-private Log log = LogFactory.getLog(PrestoClusterManager.class);
-
-@Nullable
-private static volatile NodeManager nodeManager;
-public static String serverPortConf = "caching.fs.presto-server-port";
-
-// Safe to use single instance of HttpClient since Supplier.get() provides synchronization
-@Override
-public void initialize(Configuration conf)
-throws UnknownHostException
-{
-super.initialize(conf);
-this.serverPort = conf.getInt(serverPortConf, serverPort);
-this.serverAddress = ClusterUtil.getMasterHostname(conf);
-}
-
-@Override
-public List<String> getNodesInternal()
-{
-if (nodeManager != null) {
-return getNodesFromNodeManager();
-}
-
-try {
-URL allNodesRequest = getNodeUrl();
-URL failedNodesRequest = getFailedNodeUrl();
-
-HttpURLConnection allHttpCon = getHttpURLConnection(allNodesRequest);
-
-int allNodesResponseCode = allHttpCon.getResponseCode();
-
-StringBuilder allResponse = new StringBuilder();
-StringBuilder failedResponse = new StringBuilder();
-try {
-if (allNodesResponseCode == HttpURLConnection.HTTP_OK) {
-BufferedReader in = new BufferedReader(new InputStreamReader(allHttpCon.getInputStream()));
-String inputLine = "";
-try {
-while ((inputLine = in.readLine()) != null) {
-allResponse.append(inputLine);
-}
-}
-catch (IOException e) {
-throw new IOException(e);
-}
-finally {
-in.close();
-}
-}
-else {
-log.warn("v1/node failed with code: " + allNodesResponseCode);
-return null;
-}
-}
-catch (IOException e) {
-throw new IOException(e);
-}
-finally {
-allHttpCon.disconnect();
-}
-
-HttpURLConnection failHttpConn = getHttpURLConnection(failedNodesRequest);
-int failedNodesResponseCode = failHttpConn.getResponseCode();
-// check on failed nodes
-try {
-if (failedNodesResponseCode == HttpURLConnection.HTTP_OK) {
-BufferedReader in = new BufferedReader(new InputStreamReader(failHttpConn.getInputStream()));
-String inputLine;
-try {
-while ((inputLine = in.readLine()) != null) {
-failedResponse.append(inputLine);
-}
-}
-catch (IOException e) {
-throw new IOException(e);
-}
-finally {
-in.close();
-}
-}
-}
-catch (IOException e) {
-throw new IOException(e);
-}
-finally {
-failHttpConn.disconnect();
-}
+private static Log log = LogFactory.getLog(PrestoClusterManager.class);
+static volatile NodeManager NODE_MANAGER;

-Gson gson = new Gson();
-Type type = new TypeToken<List<Stats>>()
-{
-}.getType();
-
-List<Stats> allNodes = gson.fromJson(allResponse.toString(), type);
-List<Stats> failedNodes = gson.fromJson(failedResponse.toString(), type);
-
-if (failedNodes.isEmpty()) {
-failedNodes = ImmutableList.of();
-}
-
-// keep only the healthy nodes
-allNodes.removeAll(failedNodes);
-
-Set<String> hosts = new HashSet<String>();
-for (Stats node : allNodes) {
-hosts.add(node.getUri().getHost());
-}
-
-return Lists.newArrayList(hosts.toArray(new String[0]));
-}
-catch (IOException e) {
-throw Throwables.propagate(e);
-}
-}
-
-private HttpURLConnection getHttpURLConnection(URL urlRequest)
-throws IOException
+public static void setNodeManager(NodeManager nodeManager)
 {
-requireNonNull(urlRequest, "urlRequest is null");
-HttpURLConnection allHttpCon = (HttpURLConnection) urlRequest.openConnection();
-allHttpCon.setConnectTimeout(500); //ms
-allHttpCon.setRequestMethod("GET");
-allHttpCon.setRequestProperty("X-Presto-User", DEFAULT_USER);
-return allHttpCon;
+PrestoClusterManager.NODE_MANAGER = nodeManager;
 }

-private List<String> getNodesFromNodeManager()
-{
-requireNonNull(nodeManager, "nodeManager is null");
-List<String> workers = nodeManager.getWorkerNodes().stream()
-.filter(node -> !node.isCoordinator())
-.map(Node::getHost)
-.collect(Collectors.toList());
-
-return workers;
-}
+private volatile NodeManager nodeManager;

 @Override
-protected String getCurrentNodeHostname()
+public void initialize(Configuration conf) throws UnknownHostException
 {
-if (nodeManager != null) {
-return nodeManager.getCurrentNode().getHost();
+super.initialize(conf);
+nodeManager = NODE_MANAGER;
+if (nodeManager == null) {
+nodeManager = new StandaloneNodeManager(conf);
 }
-
-return super.getCurrentNodeHostname();
 }

 @Override
-protected String getCurrentNodeHostAddress()
+public Set<String> getNodesInternal()
 {
-if (nodeManager != null) {
-try {
-return nodeManager.getCurrentNode().getHostAndPort().toInetAddress().getHostAddress();
-}
-catch (UnknownHostException e) {
-log.warn("Could not get HostAddress from NodeManager", e);
-// fallback
-}
-}
-
-return super.getCurrentNodeHostAddress();
+return ClusterManagerNodeGetter.getNodesInternal(nodeManager);
 }

 @Override
 public ClusterType getClusterType()
 {
 return ClusterType.PRESTOSQL_CLUSTER_MANAGER;
 }
-
-public static void setPrestoServerPort(Configuration conf, int port)
-{
-conf.setInt(serverPortConf, port);
-}
-
-public static void setNodeManager(NodeManager nodeManager)
-{
-PrestoClusterManager.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
-}
-
-private URL getNodeUrl()
-throws MalformedURLException
-{
-return new URL("http://" + serverAddress + ":" + serverPort + "/v1/node");
-}
-
-private URL getFailedNodeUrl()
-throws MalformedURLException
-{
-return new URL("http://" + serverAddress + ":" + serverPort + "/v1/node/failed");
-}
-
-public static class Stats
-{
-URI uri;
-String lastResponseTime;
-
-public Stats()
-{
-}
-
-public Stats(URI uri, String lastResponseTime)
-{
-this.uri = uri;
-this.lastResponseTime = lastResponseTime;
-}
-
-public URI getUri()
-{
-return uri;
-}
-
-public void setURI(URI uri)
-{
-this.uri = uri;
-}
-
-String getLastResponseTime()
-{
-return lastResponseTime;
-}
-
-public void setLastResponseTime(String lastResponseTime)
-{
-this.lastResponseTime = lastResponseTime;
-}
-
-@Override
-public boolean equals(Object other)
-{
-if (this == other) {
-return true;
-}
-if (other == null || getClass() != other.getClass()) {
-return false;
-}
-Stats o = (Stats) other;
-
-if (!uri.equals(o.getUri())) {
-return false;
-}
-
-if (lastResponseTime != null && o.getLastResponseTime() != null) {
-return lastResponseTime.equals(o.getLastResponseTime());
-}
-
-return lastResponseTime == o.getLastResponseTime();
-}
-
-@Override
-public int hashCode()
-{
-return Objects.hash(uri, lastResponseTime);
-}
-}
 }

New file: StandaloneNodeManager.java (package com.qubole.rubix.prestosql)
@@ -0,0 +1,303 @@
1+
/**
2+
* Copyright (c) 2019. Qubole Inc
3+
* Licensed under the Apache License, Version 2.0 (the License);
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an AS IS BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License. See accompanying LICENSE file.
12+
*/
13+
package com.qubole.rubix.prestosql;
14+
15+
import com.google.common.base.Throwables;
16+
import com.google.common.collect.ImmutableList;
17+
import com.google.common.reflect.TypeToken;
18+
import com.google.gson.Gson;
19+
import com.qubole.rubix.common.utils.ClusterUtil;
20+
import io.prestosql.spi.HostAddress;
21+
import io.prestosql.spi.Node;
22+
import io.prestosql.spi.NodeManager;
23+
import java.net.InetAddress;
24+
import java.net.UnknownHostException;
25+
import org.apache.commons.logging.Log;
26+
import org.apache.commons.logging.LogFactory;
27+
import org.apache.hadoop.conf.Configuration;
28+
29+
import java.io.BufferedReader;
30+
import java.io.IOException;
31+
import java.io.InputStreamReader;
32+
import java.lang.reflect.Type;
33+
import java.net.HttpURLConnection;
34+
import java.net.MalformedURLException;
35+
import java.net.URI;
36+
import java.net.URL;
37+
import java.util.HashSet;
38+
import java.util.List;
39+
import java.util.Objects;
40+
import java.util.Set;
41+
42+
import static java.util.Objects.requireNonNull;
43+
44+
public class StandaloneNodeManager
45+
implements NodeManager {
46+
private static Log LOG = LogFactory.getLog(StandaloneNodeManager.class);
47+
private static final int DEFAULT_SERVER_PORT = 8081;
48+
private static final String DEFAULT_USER = "rubix";
49+
public static final String SERVER_PORT_CONF_KEY = "caching.fs.presto-server-port";
50+
51+
private final String serverAddress;
52+
private final Node currentNode;
53+
private final int serverPort;
54+
55+
public StandaloneNodeManager(Configuration conf) {
56+
this.serverPort = conf.getInt(SERVER_PORT_CONF_KEY, DEFAULT_SERVER_PORT);
57+
this.serverAddress = ClusterUtil.getMasterHostname(conf);
58+
Node currentNode = null;
59+
try {
60+
currentNode = new StandaloneNode(URI.create("http://" + InetAddress.getLocalHost().getHostAddress()));
61+
}
62+
catch (UnknownHostException e) {
63+
LOG.warn("Unable to set current node", e);
64+
}
65+
this.currentNode = currentNode;
66+
}
67+
68+
@Override
69+
public Set<Node> getAllNodes() {
70+
return getWorkerNodes();
71+
}
72+
73+
@Override
74+
public Set<Node> getWorkerNodes() {
75+
try {
76+
URL allNodesRequest = getNodeUrl();
77+
URL failedNodesRequest = getFailedNodeUrl();
78+
79+
HttpURLConnection allHttpCon = getHttpURLConnection(allNodesRequest);
80+
81+
int allNodesResponseCode = allHttpCon.getResponseCode();
82+
83+
StringBuilder allResponse = new StringBuilder();
84+
StringBuilder failedResponse = new StringBuilder();
85+
try {
86+
if (allNodesResponseCode == HttpURLConnection.HTTP_OK) {
87+
BufferedReader in = new BufferedReader(new InputStreamReader(allHttpCon.getInputStream()));
88+
String inputLine = "";
89+
try {
90+
while ((inputLine = in.readLine()) != null) {
91+
allResponse.append(inputLine);
92+
}
93+
}
94+
catch (IOException e) {
95+
throw new IOException(e);
96+
}
97+
finally {
98+
in.close();
99+
}
100+
}
101+
else {
102+
LOG.warn("v1/node failed with code: " + allNodesResponseCode);
103+
return null;
104+
}
105+
}
106+
catch (IOException e) {
107+
throw new IOException(e);
108+
}
109+
finally {
110+
allHttpCon.disconnect();
111+
}
112+
113+
HttpURLConnection failHttpConn = getHttpURLConnection(failedNodesRequest);
114+
int failedNodesResponseCode = failHttpConn.getResponseCode();
115+
// check on failed nodes
116+
try {
117+
if (failedNodesResponseCode == HttpURLConnection.HTTP_OK) {
118+
BufferedReader in = new BufferedReader(new InputStreamReader(failHttpConn.getInputStream()));
119+
String inputLine;
120+
try {
121+
while ((inputLine = in.readLine()) != null) {
122+
failedResponse.append(inputLine);
123+
}
124+
}
125+
catch (IOException e) {
126+
throw new IOException(e);
127+
}
128+
finally {
129+
in.close();
130+
}
131+
}
132+
}
133+
catch (IOException e) {
134+
throw new IOException(e);
135+
}
136+
finally {
137+
failHttpConn.disconnect();
138+
}
139+
140+
Gson gson = new Gson();
141+
Type type = new TypeToken<List<Stats>>()
142+
{
143+
}.getType();
144+
145+
List<Stats> allNodes = gson.fromJson(allResponse.toString(), type);
146+
List<Stats> failedNodes = gson.fromJson(failedResponse.toString(), type);
147+
148+
if (failedNodes.isEmpty()) {
149+
failedNodes = ImmutableList.of();
150+
}
151+
152+
// keep only the healthy nodes
153+
allNodes.removeAll(failedNodes);
154+
155+
Set<Node> hosts = new HashSet<Node>();
156+
for (Stats node : allNodes) {
157+
hosts.add(new StandaloneNode(node.getUri()));
158+
}
159+
160+
return hosts;
161+
}
162+
catch (IOException e) {
163+
throw Throwables.propagate(e);
164+
}
165+
}
166+
167+
@Override
168+
public Node getCurrentNode() {
169+
return currentNode;
170+
}
171+
172+
@Override
173+
public String getEnvironment() {
174+
return "testenv";
175+
}
176+
177+
private HttpURLConnection getHttpURLConnection(URL urlRequest)
178+
throws IOException
179+
{
180+
requireNonNull(urlRequest, "urlRequest is null");
181+
HttpURLConnection allHttpCon = (HttpURLConnection) urlRequest.openConnection();
182+
allHttpCon.setConnectTimeout(500); //ms
183+
allHttpCon.setRequestMethod("GET");
184+
allHttpCon.setRequestProperty("X-Presto-User", DEFAULT_USER);
185+
return allHttpCon;
186+
}
187+
188+
private URL getNodeUrl()
189+
throws MalformedURLException
190+
{
191+
return new URL("http://" + serverAddress + ":" + serverPort + "/v1/node");
192+
}
193+
194+
private URL getFailedNodeUrl()
195+
throws MalformedURLException
196+
{
197+
return new URL("http://" + serverAddress + ":" + serverPort + "/v1/node/failed");
198+
}
199+
200+
public static class StandaloneNode
201+
implements Node
202+
{
203+
private final URI uri;
204+
205+
public StandaloneNode(URI uri) {
206+
this.uri = uri;
207+
}
208+
209+
@Override
210+
public String getHost() {
211+
return uri.getHost();
212+
}
213+
214+
@Override
215+
public HostAddress getHostAndPort() {
216+
return HostAddress.fromUri(uri);
217+
}
218+
219+
@Override
220+
public URI getHttpUri() {
221+
return uri;
222+
}
223+
224+
@Override
225+
public String getNodeIdentifier() {
226+
return uri.toString();
227+
}
228+
229+
@Override
230+
public String getVersion() {
231+
return "<unknown>";
232+
}
233+
234+
@Override
235+
public boolean isCoordinator() {
236+
return false;
237+
}
238+
}
239+
240+
private static class Stats
241+
{
242+
URI uri;
243+
String lastResponseTime;
244+
245+
public Stats()
246+
{
247+
}
248+
249+
public Stats(URI uri, String lastResponseTime)
250+
{
251+
this.uri = uri;
252+
this.lastResponseTime = lastResponseTime;
253+
}
254+
255+
public URI getUri()
256+
{
257+
return uri;
258+
}
259+
260+
public void setURI(URI uri)
261+
{
262+
this.uri = uri;
263+
}
264+
265+
String getLastResponseTime()
266+
{
267+
return lastResponseTime;
268+
}
269+
270+
public void setLastResponseTime(String lastResponseTime)
271+
{
272+
this.lastResponseTime = lastResponseTime;
273+
}
274+
275+
@Override
276+
public boolean equals(Object other)
277+
{
278+
if (this == other) {
279+
return true;
280+
}
281+
if (other == null || getClass() != other.getClass()) {
282+
return false;
283+
}
284+
Stats o = (Stats) other;
285+
286+
if (!uri.equals(o.getUri())) {
287+
return false;
288+
}
289+
290+
if (lastResponseTime != null && o.getLastResponseTime() != null) {
291+
return lastResponseTime.equals(o.getLastResponseTime());
292+
}
293+
294+
return lastResponseTime == o.getLastResponseTime();
295+
}
296+
297+
@Override
298+
public int hashCode()
299+
{
300+
return Objects.hash(uri, lastResponseTime);
301+
}
302+
}
303+
}
New file: SyncPrestoClusterManager.java (package com.qubole.rubix.prestosql)
@@ -0,0 +1,51 @@
1+
/**
2+
* Copyright (c) 2019. Qubole Inc
3+
* Licensed under the Apache License, Version 2.0 (the License);
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an AS IS BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License. See accompanying LICENSE file.
12+
*/
13+
package com.qubole.rubix.prestosql;
14+
15+
import com.qubole.rubix.spi.ClusterType;
16+
import com.qubole.rubix.spi.SyncClusterManager;
17+
import io.prestosql.spi.Node;
18+
import io.prestosql.spi.NodeManager;
19+
import org.apache.commons.logging.Log;
20+
import org.apache.commons.logging.LogFactory;
21+
22+
import java.util.Set;
23+
24+
import static java.util.Objects.requireNonNull;
25+
26+
public class SyncPrestoClusterManager extends SyncClusterManager
27+
{
28+
private static Log log = LogFactory.getLog(SyncPrestoClusterManager.class);
29+
private volatile Set<Node> workerNodes;
30+
31+
@Override
32+
protected boolean hasStateChanged() {
33+
requireNonNull(PrestoClusterManager.NODE_MANAGER, "nodeManager is null");
34+
Set<Node> workerNodes = PrestoClusterManager.NODE_MANAGER.getWorkerNodes();
35+
boolean hasChanged = !workerNodes.equals(this.workerNodes);
36+
this.workerNodes = workerNodes;
37+
return hasChanged;
38+
}
39+
40+
@Override
41+
public Set<String> getNodesInternal()
42+
{
43+
return ClusterManagerNodeGetter.getNodesInternal(PrestoClusterManager.NODE_MANAGER);
44+
}
45+
46+
@Override
47+
public ClusterType getClusterType()
48+
{
49+
return ClusterType.PRESTOSQL_CLUSTER_MANAGER;
50+
}
51+
}
New file: TestAsyncClusterManager.java (package com.qubole.rubix.prestosql)
@@ -0,0 +1,23 @@
1+
/**
2+
* Copyright (c) 2019. Qubole Inc
3+
* Licensed under the Apache License, Version 2.0 (the License);
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an AS IS BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License. See accompanying LICENSE file.
12+
*/
13+
package com.qubole.rubix.prestosql;
14+
15+
import com.qubole.rubix.spi.ClusterManager;
16+
import org.apache.hadoop.conf.Configuration;
17+
18+
public class TestAsyncClusterManager extends TestClusterManager {
19+
@Override
20+
protected ClusterManager newPrestoClusterManager(Configuration configuration) {
21+
return new PrestoClusterManager();
22+
}
23+
}

‎rubix-prestosql/src/test/java/com/qubole/rubix/prestosql/TestClusterManager.java

+40 -29

@@ -12,7 +12,6 @@
 */
 package com.qubole.rubix.prestosql;

-import com.qubole.rubix.spi.CacheConfig;
 import com.qubole.rubix.spi.ClusterManager;
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;
@@ -27,7 +26,7 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
-import java.util.List;
+import java.util.Set;

 import static org.testng.Assert.assertTrue;

@@ -36,7 +35,7 @@
 */

 @Test(singleThreaded = true)
-public class TestClusterManager
+public abstract class TestClusterManager
 {
 private Log log = LogFactory.getLog(TestClusterManager.class);

@@ -49,16 +48,19 @@ public void testGetNodes()
 {
 HttpServer server = createServer("/v1/node", new MultipleWorkers(), "/v1/node/failed", new NoFailedNode());

-log.info("STARTED SERVER");
+try {
+log.info("STARTED SERVER");

-ClusterManager clusterManager = getPrestoClusterManager();
-List<String> nodes = clusterManager.getNodes();
-log.info("Got nodes: " + nodes);
+ClusterManager clusterManager = getPrestoClusterManager();
+Set<String> nodes = clusterManager.getNodes();
+log.info("Got nodes: " + nodes);

-assertTrue(nodes.size() == 2, "Should only have two nodes");
-assertTrue(nodes.get(0).equals("192.168.1.3") && nodes.get(1).equals("192.168.2.252"), "Wrong nodes data");
-
-server.stop(0);
+assertTrue(nodes.size() == 2, "Should only have two nodes");
+assertTrue(nodes.contains("192.168.1.3") && nodes.contains("192.168.2.252"), "Wrong nodes data");
+}
+finally {
+server.stop(0);
+}
 }

 @Test
@@ -70,15 +72,20 @@ public void testMasterOnlyCluster()
 {
 HttpServer server = createServer("/v1/node", new NoWorker(), "/v1/node/failed", new NoFailedNode());

-log.info("STARTED SERVER");
+try {
+log.info("STARTED SERVER");

-ClusterManager clusterManager = getPrestoClusterManager();
-List<String> nodes = clusterManager.getNodes();
-log.info("Got nodes: " + nodes);
+ClusterManager clusterManager = getPrestoClusterManager();
+Set<String> nodes = clusterManager.getNodes();
+log.info("Got nodes: " + nodes);
+log.info(" Host address: " + InetAddress.getLocalHost().getHostAddress());

-assertTrue(nodes.size() == 1, "Should have added localhost in list");
-assertTrue(nodes.get(0).equals(InetAddress.getLocalHost().getHostAddress()), "Not added right hostname");
-server.stop(0);
+assertTrue(nodes.size() == 1, "Should have added localhost in list");
+assertTrue(nodes.contains(InetAddress.getLocalHost().getHostAddress()), "Not added right hostname");
+}
+finally {
+server.stop(0);
+}
 }

 @Test
@@ -89,17 +96,19 @@ public void testFailedNodeCluster()
 throws IOException
 {
 HttpServer server = createServer("/v1/node", new MultipleWorkers(), "/v1/node/failed", new OneFailedNode());
+try {
+log.info("STARTED SERVER");

-log.info("STARTED SERVER");
+ClusterManager clusterManager = getPrestoClusterManager();
+Set<String> nodes = clusterManager.getNodes();
+log.info("Got nodes: " + nodes);

-ClusterManager clusterManager = getPrestoClusterManager();
-List<String> nodes = clusterManager.getNodes();
-log.info("Got nodes: " + nodes);
-
-assertTrue(nodes.size() == 1, "Should only have two nodes");
-assertTrue(nodes.get(0).equals("192.168.2.252"), "Wrong nodes data");
-
-server.stop(0);
+assertTrue(nodes.size() == 1, "Should only have two nodes");
+assertTrue(nodes.contains("192.168.2.252"), "Wrong nodes data");
+}
+finally {
+server.stop(0);
+}
 }

 private HttpServer createServer(String endpoint1, HttpHandler handler1, String endpoint2, HttpHandler handler2)
@@ -113,12 +122,14 @@ private HttpServer createServer(String endpoint1, HttpHandler handler1, String e
 return server;
 }

+abstract protected ClusterManager newPrestoClusterManager(Configuration conf);
+
 private ClusterManager getPrestoClusterManager()
 throws UnknownHostException
 {
-ClusterManager clusterManager = new PrestoClusterManager();
 Configuration conf = new Configuration();
-conf.setInt(PrestoClusterManager.serverPortConf, 45326);
+conf.setInt(StandaloneNodeManager.SERVER_PORT_CONF_KEY, 45326);
+ClusterManager clusterManager = newPrestoClusterManager(conf);
 clusterManager.initialize(conf);
 return clusterManager;
 }

New file: TestSyncClusterManager.java (package com.qubole.rubix.prestosql)
@@ -0,0 +1,24 @@
1+
/**
2+
* Copyright (c) 2019. Qubole Inc
3+
* Licensed under the Apache License, Version 2.0 (the License);
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an AS IS BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License. See accompanying LICENSE file.
12+
*/
13+
package com.qubole.rubix.prestosql;
14+
15+
import com.qubole.rubix.spi.ClusterManager;
16+
import org.apache.hadoop.conf.Configuration;
17+
18+
public class TestSyncClusterManager extends TestClusterManager {
19+
@Override
20+
protected ClusterManager newPrestoClusterManager(Configuration conf) {
21+
PrestoClusterManager.setNodeManager(new StandaloneNodeManager(conf));
22+
return new SyncPrestoClusterManager();
23+
}
24+
}
New file: AsyncClusterManager.java (package com.qubole.rubix.spi)
@@ -0,0 +1,86 @@
1+
/**
2+
* Copyright (c) 2019. Qubole Inc
3+
* Licensed under the Apache License, Version 2.0 (the License);
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an AS IS BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License. See accompanying LICENSE file.
12+
*/
13+
package com.qubole.rubix.spi;
14+
15+
import com.google.common.cache.CacheBuilder;
16+
import com.google.common.cache.CacheLoader;
17+
import com.google.common.cache.LoadingCache;
18+
import org.apache.commons.logging.Log;
19+
import org.apache.commons.logging.LogFactory;
20+
import org.apache.hadoop.conf.Configuration;
21+
22+
import java.net.UnknownHostException;
23+
import java.util.Set;
24+
import java.util.concurrent.ExecutorService;
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.AtomicReference;
28+
29+
import static java.util.Objects.requireNonNull;
30+
31+
/**
32+
* Created by stagra on 14/1/16.
33+
*/
34+
35+
/*
36+
* This class should be implemented for each engine.
37+
* The implementation should return the nodes in a form which the scheduler of that engine can recognize and route the splits to
38+
*/
39+
public abstract class AsyncClusterManager extends ClusterManager
40+
{
41+
private static Log log = LogFactory.getLog(AsyncClusterManager.class);
42+
43+
private final AtomicReference<LoadingCache<String, Set<String>>> nodesCache = new AtomicReference<>();
44+
45+
public void initialize(Configuration conf)
46+
throws UnknownHostException
47+
{
48+
super.initialize(conf);
49+
if (nodesCache.get() == null) {
50+
synchronized (nodesCache) {
51+
if (nodesCache.get() == null) {
52+
int nodeRefreshTime = CacheConfig.getClusterNodeRefreshTime(conf);
53+
ExecutorService executor =
54+
Executors.newSingleThreadExecutor(
55+
r -> {
56+
Thread t = Executors.defaultThreadFactory().newThread(r);
57+
t.setName("rubix-get-nodes-thread");
58+
t.setDaemon(true);
59+
return t;
60+
});
61+
62+
nodesCache.set(
63+
CacheBuilder.newBuilder()
64+
.refreshAfterWrite(nodeRefreshTime, TimeUnit.SECONDS)
65+
.build(
66+
CacheLoader.asyncReloading(
67+
new CacheLoader<String, Set<String>>() {
68+
@Override
69+
public Set<String> load(String s) {
70+
return getNodesAndUpdateState();
71+
}
72+
},
73+
executor)));
74+
}
75+
}
76+
}
77+
}
78+
79+
// Returns sorted list of nodes in the cluster
80+
public Set<String> getNodes()
81+
{
82+
requireNonNull(nodesCache, "ClusterManager used before initialization");
83+
return nodesCache.get().getUnchecked("nodes");
84+
}
85+
86+
}

‎rubix-spi/src/main/java/com/qubole/rubix/spi/ClusterManager.java

+15 -50

@@ -12,10 +12,7 @@
 */
 package com.qubole.rubix.spi;

-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -26,13 +23,7 @@

 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static java.util.Objects.requireNonNull;
+import java.util.Set;

 /**
 * Created by stagra on 14/1/16.
@@ -46,10 +37,9 @@ public abstract class ClusterManager
 {
 private static Log log = LogFactory.getLog(ClusterManager.class);

-private String currentNodeName;
+protected String currentNodeName;
 private String nodeHostname;
 private String nodeHostAddress;
-private final AtomicReference<LoadingCache<String, List<String>>> nodesCache = new AtomicReference<>();
 // Concluded from testing that Metro Hash results in better load distribution across the nodes in cluster.
 private final ConsistentHash<SimpleNode> consistentHashRing = HashRing.<SimpleNode>newBuilder()
 .hasher(DefaultHasher.METRO_HASH)
@@ -62,7 +52,10 @@ public abstract class ClusterManager
 * returns null in case node list cannot be fetched
 * returns empty in case of master-only setup
 */
-protected abstract List<String> getNodesInternal();
+protected abstract Set<String> getNodesInternal();
+
+// Returns sorted list of nodes in the cluster
+public abstract Set<String> getNodes();

 protected String getCurrentNodeHostname()
 {
@@ -74,16 +67,15 @@ protected String getCurrentNodeHostAddress()
 return nodeHostAddress;
 }

-private List<String> getNodesAndUpdateState()
+protected synchronized Set<String> getNodesAndUpdateState()
 {
-requireNonNull(nodesCache, "ClusterManager used before initialization");
-List<String> nodes = getNodesInternal();
+Set<String> nodes = getNodesInternal();
 if (nodes == null) {
-nodes = ImmutableList.of();
+nodes = ImmutableSet.of();
 } else if (nodes.isEmpty()) {
 // Empty result set => server up and only master node running, return localhost has the only node
 // Do not need to consider failed nodes list as 1node cluster and server is up since it replied to allNodesRequest
-nodes = ImmutableList.of(getCurrentNodeHostAddress());
+nodes = ImmutableSet.of(getCurrentNodeHostAddress());
 }

 // remove stale nodes from consistent hash ring
@@ -122,32 +114,11 @@ else if (consistentHashRing.contains(SimpleNode.of(getCurrentNodeHostAddress()))
 public void initialize(Configuration conf)
 throws UnknownHostException
 {
-if (nodesCache.get() == null) {
-synchronized (nodesCache) {
-if (nodesCache.get() == null) {
-int nodeRefreshTime = CacheConfig.getClusterNodeRefreshTime(conf);
-
+if (nodeHostname == null) {
+synchronized (this) {
+if (nodeHostname == null) {
 nodeHostname = InetAddress.getLocalHost().getCanonicalHostName();
 nodeHostAddress = InetAddress.getLocalHost().getHostAddress();
-
-ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
-Thread t = Executors.defaultThreadFactory().newThread(r);
-t.setName("rubix-get-nodes-thread");
-t.setDaemon(true);
-return t;
-});
-
-nodesCache.set(
-CacheBuilder.newBuilder()
-.refreshAfterWrite(nodeRefreshTime, TimeUnit.SECONDS)
-.build(CacheLoader.asyncReloading(new CacheLoader<String, List<String>>()
-{
-@Override
-public List<String> load(String s)
-{
-return getNodesAndUpdateState();
-}
-}, executor)));
 }
 }
 }
@@ -158,12 +129,6 @@ public String locateKey(String key)
 return consistentHashRing.locate(key).orElseThrow(() -> new RuntimeException("Unable to locate key: " + key)).getKey();
 }

-// Returns sorted list of nodes in the cluster
-public List<String> getNodes()
-{
-return nodesCache.get().getUnchecked("nodes");
-}
-
 public String getCurrentNodeName()
 {
 // refresh cluster nodes first, which updates currentNodeName if it is not set.
@@ -174,7 +139,7 @@ public String getCurrentNodeName()
 private void refreshClusterNodes()
 {
 // getNodes() updates the currentNodeName
-List<String> nodes = getNodes();
+Set<String> nodes = getNodes();
 if (nodes == null) {
 log.error("Initialization not done for Cluster Type: " + getClusterType());
 throw new RuntimeException("Unable to find current node name");
New file: SyncClusterManager.java (package com.qubole.rubix.spi)
@@ -0,0 +1,54 @@
1+
/**
2+
* Copyright (c) 2019. Qubole Inc
3+
* Licensed under the Apache License, Version 2.0 (the License);
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an AS IS BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License. See accompanying LICENSE file.
12+
*/
13+
package com.qubole.rubix.spi;
14+
15+
import org.apache.commons.logging.Log;
16+
import org.apache.commons.logging.LogFactory;
17+
18+
import java.util.Set;
19+
20+
public abstract class SyncClusterManager extends ClusterManager
21+
{
22+
private static Log log = LogFactory.getLog(SyncClusterManager.class);
23+
24+
private volatile Set<String> currentNodes;
25+
26+
protected abstract boolean hasStateChanged();
27+
28+
private void updateStateIfChanged() {
29+
if (hasStateChanged()) {
30+
currentNodes = getNodesAndUpdateState();
31+
}
32+
}
33+
34+
@Override
35+
public String locateKey(String key)
36+
{
37+
updateStateIfChanged();
38+
return super.locateKey(key);
39+
}
40+
41+
@Override
42+
public String getCurrentNodeName()
43+
{
44+
updateStateIfChanged();
45+
return super.getCurrentNodeName();
46+
}
47+
// Returns sorted list of nodes in the cluster
48+
@Override
49+
public Set<String> getNodes()
50+
{
51+
updateStateIfChanged();
52+
return currentNodes;
53+
}
54+
}

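For orientation, a rough sketch of how an engine-side caller consumes the API this commit reshapes (getNodes() now returns a Set<String>, and freshness depends on whether the chosen implementation is sync or async). Illustrative only, using the DummyClusterManager test helper shown above; it is not part of the commit:

import com.qubole.rubix.core.utils.DummyClusterManager;
import com.qubole.rubix.spi.ClusterManager;
import org.apache.hadoop.conf.Configuration;

import java.util.Set;

public class ClusterManagerUsageSketch
{
  public static void main(String[] args) throws Exception
  {
    Configuration conf = new Configuration();
    // DummyClusterManager is one of the AsyncClusterManager implementations in this commit,
    // so the node set it returns may lag the engine until the next background refresh.
    ClusterManager clusterManager = new DummyClusterManager();
    clusterManager.initialize(conf);

    Set<String> nodes = clusterManager.getNodes();                   // membership snapshot, now a Set
    String owner = clusterManager.locateKey("some-file-block-key");  // consistent-hash owner of a key
    System.out.println("nodes=" + nodes + ", owner=" + owner);
  }
}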