|
18 | 18 |
|
19 | 19 | package org.apache.hadoop.hive.metastore; |
20 | 20 |
|
| 21 | +import org.apache.commons.lang3.tuple.Pair; |
21 | 22 | import org.apache.hadoop.conf.Configuration; |
22 | 23 | import org.apache.hadoop.hdfs.MiniDFSCluster; |
| 24 | +import org.apache.hadoop.hive.common.TableName; |
23 | 25 | import org.apache.hadoop.hive.metastore.conf.MetastoreConf; |
24 | 26 | import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; |
25 | 27 | import org.apache.hadoop.hive.metastore.leader.LeaderElection; |
| 28 | +import org.apache.hadoop.hive.metastore.leader.LeaderElectionContext; |
| 29 | +import org.apache.hadoop.hive.metastore.leader.LeaseLeaderElection; |
26 | 30 | import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge; |
27 | 31 | import org.apache.hadoop.hive.ql.stats.StatsUpdaterThread; |
28 | 32 | import org.apache.hadoop.hive.ql.txn.compactor.Cleaner; |
|
32 | 36 | import org.slf4j.Logger; |
33 | 37 | import org.slf4j.LoggerFactory; |
34 | 38 |
|
| 39 | +import java.io.IOException; |
| 40 | +import java.util.ArrayList; |
35 | 41 | import java.util.HashMap; |
| 42 | +import java.util.List; |
36 | 43 | import java.util.Map; |
37 | 44 | import java.util.Set; |
38 | 45 | import java.util.concurrent.CountDownLatch; |
|
41 | 48 | /** |
42 | 49 | * Base class for HMS leader config testing. |
43 | 50 | */ |
44 | | -public abstract class MetastoreHousekeepingLeaderTestBase { |
| 51 | +abstract class MetastoreHousekeepingLeaderTestBase { |
45 | 52 | private static final Logger LOG = LoggerFactory.getLogger(MetastoreHousekeepingLeaderTestBase.class); |
46 | 53 | private static HiveMetaStoreClient client; |
47 | 54 | protected Configuration conf; |
@@ -72,8 +79,6 @@ void setup(final String leaderHostName, Configuration configuration) throws Exce |
72 | 79 |
|
73 | 80 | warehouse = new Warehouse(conf); |
74 | 81 |
|
75 | | - conf.set("metastore.leader.test.listener", TestLeaderNotification.class.getName()); |
76 | | - |
77 | 82 | if (isServerStarted) { |
78 | 83 | Assert.assertNotNull("Unable to connect to the MetaStore server", client); |
79 | 84 | return; |
@@ -201,32 +206,145 @@ private void resetThreadStatus() { |
201 | 206 | threadClasses.forEach((thread, status) -> threadClasses.put(thread, false)); |
202 | 207 | } |
203 | 208 |
|
204 | | - public static class TestLeaderNotification implements LeaderElection.LeadershipStateListener { |
| 209 | + static class CombinedLeaderElector implements AutoCloseable { |
| 210 | + List<Pair<TableName, LeaderElection<TableName>>> elections = new ArrayList<>(); |
| 211 | + private final Configuration configuration; |
| 212 | + private String name; |
| 213 | + |
| 214 | + CombinedLeaderElector(Configuration conf) throws IOException { |
| 215 | + this.configuration = conf; |
| 216 | + for (LeaderElectionContext.TTYPE type : LeaderElectionContext.TTYPE.values()) { |
| 217 | + TableName table = type.getTableName(); |
| 218 | + elections.add(Pair.of(table, new LeaseLeaderElection())); |
| 219 | + } |
| 220 | + } |
| 221 | + |
| 222 | + public void tryBeLeader() throws Exception { |
| 223 | + int i = 0; |
| 224 | + for (Pair<TableName, LeaderElection<TableName>> election : elections) { |
| 225 | + LeaderElection<TableName> le = election.getRight(); |
| 226 | + le.setName(name + "-" + i++); |
| 227 | + le.tryBeLeader(configuration, election.getLeft()); |
| 228 | + } |
| 229 | + } |
| 230 | + |
| 231 | + public boolean isLeader() { |
| 232 | + boolean isLeader = true; |
| 233 | + for (Pair<TableName, LeaderElection<TableName>> election : elections) { |
| 234 | + isLeader &= election.getRight().isLeader(); |
| 235 | + } |
| 236 | + return isLeader; |
| 237 | + } |
| 238 | + |
| 239 | + public void setName(String name) { |
| 240 | + this.name = name; |
| 241 | + } |
| 242 | + |
| 243 | + @Override |
| 244 | + public void close() throws Exception { |
| 245 | + for (Pair<TableName, LeaderElection<TableName>> election : elections) { |
| 246 | + election.getRight().close(); |
| 247 | + } |
| 248 | + } |
| 249 | + } |
| 250 | + |
| 251 | + static class ReleaseAndRequireLease extends LeaseLeaderElection { |
205 | 252 | private static CountDownLatch latch; |
| 253 | + private final Configuration configuration; |
| 254 | + private final boolean needRenewLease; |
| 255 | + private TableName tableName; |
206 | 256 |
|
207 | 257 | public static void setMonitor(CountDownLatch latch) { |
208 | | - TestLeaderNotification.latch = latch; |
| 258 | + ReleaseAndRequireLease.latch = latch; |
209 | 259 | } |
210 | 260 | public static void reset() { |
211 | | - TestLeaderNotification.latch = null; |
| 261 | + ReleaseAndRequireLease.latch = null; |
| 262 | + } |
| 263 | + |
| 264 | + public ReleaseAndRequireLease(Configuration conf, boolean needRenewLease) throws IOException { |
| 265 | + super(); |
| 266 | + this.configuration = conf; |
| 267 | + this.needRenewLease = needRenewLease; |
212 | 268 | } |
213 | 269 |
|
214 | 270 | @Override |
215 | | - public void takeLeadership(LeaderElection election) throws Exception { |
216 | | - if (latch != null) { |
217 | | - latch.countDown(); |
| 271 | + public void setName(String name) { |
| 272 | + super.setName(name); |
| 273 | + LeaderElectionContext.TTYPE type = null; |
| 274 | + for (LeaderElectionContext.TTYPE value : LeaderElectionContext.TTYPE.values()) { |
| 275 | + if (value.getName().equalsIgnoreCase(name)) { |
| 276 | + type = value; |
| 277 | + break; |
| 278 | + } |
| 279 | + } |
| 280 | + if (type == null) { |
| 281 | + // This shouldn't happen at all |
| 282 | + throw new AssertionError("Unknown elector name: " + name); |
218 | 283 | } |
| 284 | + this.tableName = type.getTableName(); |
219 | 285 | } |
220 | 286 |
|
221 | 287 | @Override |
222 | | - public void lossLeadership(LeaderElection election) throws Exception { |
| 288 | + protected void notifyListener() { |
| 289 | + super.notifyListener(); |
| 290 | + if (isLeader) { |
| 291 | + if (!needRenewLease) { |
| 292 | + super.shutdownWatcher(); |
| 293 | + // In our tests, the time spent on notifying the listener might be greater than the lease timeout, |
| 294 | + // which makes the leader lose its leadership shortly after waking up and kills all housekeeping services. |
| 295 | + // Make sure the leader is still valid while notifying the listener, and switch to ReleaseAndRequireWatcher |
| 296 | + // after all listeners finish their work. |
| 297 | + heartbeater = new ReleaseAndRequireWatcher(configuration, tableName); |
| 298 | + heartbeater.startWatch(); |
| 299 | + } |
| 300 | + } else { |
| 301 | + try { |
| 302 | + // This is the last one to get notified; sleep for some time to make sure all other |
| 303 | + // services have been stopped before returning |
| 304 | + Thread.sleep(12000); |
| 305 | + } catch (InterruptedException ignore) { |
| 306 | + } |
| 307 | + } |
223 | 308 | if (latch != null) { |
224 | | - // This is the last one get notified, sleep some time to make sure all other |
225 | | - // services have been stopped before return |
226 | | - Thread.sleep(12000); |
227 | 309 | latch.countDown(); |
228 | 310 | } |
229 | 311 | } |
| 312 | + |
| 313 | + // For testing purposes only: the lock is left to time out and is then acquired again |
| 314 | + private class ReleaseAndRequireWatcher extends LeaseWatcher { |
| 315 | + long timeout; |
| 316 | + public ReleaseAndRequireWatcher(Configuration conf, |
| 317 | + TableName tableName) { |
| 318 | + super(conf, tableName); |
| 319 | + timeout = MetastoreConf.getTimeVar(conf, |
| 320 | + MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS) + 3000; |
| 321 | + setName("ReleaseAndRequireWatcher-" + ((name != null) ? name + "-" : "") + ID.incrementAndGet()); |
| 322 | + } |
| 323 | + |
| 324 | + @Override |
| 325 | + public void beforeRun() { |
| 326 | + try { |
| 327 | + Thread.sleep(timeout); |
| 328 | + } catch (InterruptedException e) { |
| 329 | + // ignore this |
| 330 | + } |
| 331 | + } |
| 332 | + |
| 333 | + @Override |
| 334 | + public void runInternal() { |
| 335 | + shutDown(); |
| 336 | + // The timed-out lock should have been cleaned up; |
| 337 | + // sleep for some time to give others a chance to become the leader |
| 338 | + try { |
| 339 | + Thread.sleep(5000); |
| 340 | + } catch (InterruptedException e) { |
| 341 | + // ignore |
| 342 | + } |
| 343 | + // Acquire the lock again |
| 344 | + conf = new Configuration(conf); |
| 345 | + reclaim(); |
| 346 | + } |
| 347 | + } |
230 | 348 | } |
231 | 349 |
|
232 | 350 | public void checkHouseKeepingThreadExistence(boolean isLeader) throws Exception { |
|
0 commit comments