Skip to content

Commit a6a0a56

Browse files
HBASE-29713 resolving race condition while acquiring and releasing the worker for splitWalProcedure (#7465)
Signed-off-by: Andrew Purtell <[email protected]> Signed-off-by: Viraj Jasani <[email protected]> Signed-off-by: Aman Poonia <[email protected]>
1 parent 854921d commit a6a0a56

File tree

2 files changed

+99
-2
lines changed

2 files changed

+99
-2
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,12 +225,12 @@ public synchronized void release(ServerName serverName) {
225225
currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
226226
}
227227

228-
public void suspend(Procedure<?> proc) {
228+
public synchronized void suspend(Procedure<?> proc) {
229229
event.suspend();
230230
event.suspendIfNotReady(proc);
231231
}
232232

233-
public void wake(MasterProcedureScheduler scheduler) {
233+
public synchronized void wake(MasterProcedureScheduler scheduler) {
234234
if (!event.isReady()) {
235235
event.wake(scheduler);
236236
}

hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,103 @@ public void testWorkerReloadWhenMasterRestart() throws Exception {
302302
failedProcedure.countDown();
303303
}
304304

305+
/**
306+
* Test the race condition between suspend() and wake() in SplitWorkerAssigner. This test
307+
* reproduces the issue where a procedure can be lost if wake() is called between event.suspend()
308+
* and event.suspendIfNotReady() in the suspend() method.
309+
* <p>
310+
* The race condition happens when: 1. Thread-1: calls suspend() which sets event.ready=false 2.
311+
* Thread-2: calls wake() which sees ready=false and marks event.ready=true 3. Thread-1: calls
312+
suspendIfNotReady() which sees ready=true and doesn't add the procedure 4. Result: the procedure is in
313+
WAITING state but not in the suspended queue, so it is never woken up
314+
*/
315+
@Test
316+
public void testSuspendWakeRaceCondition() throws Exception {
317+
final int NUM_ITERATIONS = 50; // Run multiple times to increase chance of race
318+
final int NUM_PROCEDURES = 10;
319+
320+
for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
321+
List<FakeServerProcedure> testProcedures = new ArrayList<>();
322+
323+
// Fill all worker slots (3 servers * 1 max splitter = 3 workers)
324+
for (int i = 0; i < 3; i++) {
325+
FakeServerProcedure procedure =
326+
new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
327+
testProcedures.add(procedure);
328+
ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), procedure,
329+
HConstants.NO_NONCE, HConstants.NO_NONCE);
330+
}
331+
332+
// Wait until the third (last-submitted) procedure has acquired its worker
333+
TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired());
334+
335+
// Create additional procedures that will need to suspend
336+
List<FakeServerProcedure> suspendedProcedures = new ArrayList<>();
337+
for (int i = 0; i < NUM_PROCEDURES; i++) {
338+
FakeServerProcedure procedure =
339+
new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
340+
suspendedProcedures.add(procedure);
341+
}
342+
343+
// Submit all suspended procedures in parallel to create contention
344+
CountDownLatch startLatch = new CountDownLatch(1);
345+
List<Thread> submitterThreads = new ArrayList<>();
346+
347+
for (FakeServerProcedure proc : suspendedProcedures) {
348+
Thread t = new Thread(() -> {
349+
try {
350+
startLatch.await();
351+
ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), proc,
352+
HConstants.NO_NONCE, HConstants.NO_NONCE);
353+
} catch (Exception e) {
354+
LOG.error("Failed to submit procedure", e);
355+
}
356+
});
357+
t.start();
358+
submitterThreads.add(t);
359+
}
360+
361+
// Start all submissions at once
362+
startLatch.countDown();
363+
364+
// Simultaneously release workers to create race between suspend and wake
365+
Thread releaseThread = new Thread(() -> {
366+
try {
367+
Thread.sleep(10); // Small delay to ensure some procedures are suspending
368+
for (FakeServerProcedure proc : testProcedures) {
369+
proc.countDown();
370+
Thread.sleep(1); // Stagger releases slightly
371+
}
372+
for (FakeServerProcedure proc : suspendedProcedures) {
373+
proc.countDown();
374+
Thread.sleep(1); // Stagger releases slightly
375+
}
376+
} catch (Exception e) {
377+
LOG.error("Failed to release workers", e);
378+
}
379+
});
380+
releaseThread.start();
381+
382+
// Wait for all threads to finish
383+
for (Thread t : submitterThreads) {
384+
t.join(5000);
385+
}
386+
releaseThread.join(5000);
387+
388+
// All suspended procedures should eventually acquire workers and complete
389+
// This will fail if the race condition causes a procedure to be lost
390+
for (FakeServerProcedure proc : suspendedProcedures) {
391+
TEST_UTIL.waitFor(30000, 1000, proc::isWorkerAcquired);
392+
TEST_UTIL.waitFor(10000, proc::isSuccess);
393+
}
394+
395+
// Also check for the initial 3 procedures to complete
396+
for (FakeServerProcedure proc : testProcedures) {
397+
TEST_UTIL.waitFor(10000, proc::isSuccess);
398+
}
399+
}
400+
}
401+
305402
public static final class FakeServerProcedure
306403
extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.SplitWALState>
307404
implements ServerProcedureInterface {

0 commit comments

Comments
 (0)