Skip to content

Commit 88df1a5

Browse files
committed
fix : fix tests
1 parent 6bdb888 commit 88df1a5

File tree

1 file changed

+50
-15
lines changed

1 file changed

+50
-15
lines changed

src/main/java/org/alxkm/patterns/leaderfollower/LeaderFollowerPattern.java

Lines changed: 50 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import java.util.concurrent.locks.ReentrantLock;
99
import java.util.concurrent.locks.Condition;
1010
import java.util.function.Consumer;
11+
import java.util.List;
12+
import java.util.ArrayList;
1113

1214
/**
1315
* Leader-Follower Pattern implementation.
@@ -38,6 +40,9 @@ public class LeaderFollowerPattern<T> {
3840
private final AtomicInteger processedEvents;
3941
private final AtomicInteger leaderPromotions;
4042

43+
// Thread management
44+
private final List<WorkerThread> workerThreads;
45+
4146
/**
4247
* Creates a new Leader-Follower thread pool.
4348
*
@@ -61,10 +66,11 @@ public LeaderFollowerPattern(int threadPoolSize, Consumer<T> eventProcessor) {
6166
this.leaderLock = new ReentrantLock();
6267
this.followerCondition = leaderLock.newCondition();
6368
this.currentLeader = null;
64-
this.followerQueue = new LinkedBlockingQueue<>();
69+
this.followerQueue = new LinkedBlockingQueue<>(threadPoolSize);
6570

6671
this.processedEvents = new AtomicInteger(0);
6772
this.leaderPromotions = new AtomicInteger(0);
73+
this.workerThreads = new ArrayList<>(threadPoolSize);
6874
}
6975

7076
/**
@@ -76,6 +82,7 @@ private class WorkerThread extends Thread {
7682
public WorkerThread(int threadId) {
7783
this.threadId = threadId;
7884
setName("LeaderFollower-Worker-" + threadId);
85+
setDaemon(true); // Make daemon thread to prevent JVM hanging
7986
}
8087

8188
@Override
@@ -84,23 +91,37 @@ public void run() {
8491

8592
try {
8693
while (isRunning.get()) {
87-
if (becomeLeader()) {
88-
// I am the leader, wait for and process events
89-
processAsLeader();
90-
} else {
91-
// I am a follower, wait to be promoted
92-
waitAsFollower();
94+
try {
95+
if (becomeLeader()) {
96+
// I am the leader, wait for and process events
97+
processAsLeader();
98+
} else {
99+
// I am a follower, wait to be promoted
100+
waitAsFollower();
101+
}
102+
} catch (InterruptedException e) {
103+
// Exit if interrupted
104+
Thread.currentThread().interrupt();
105+
break;
93106
}
94107

95108
// Check if we should stop
96109
if (!isRunning.get()) {
97110
break;
98111
}
99112
}
100-
} catch (InterruptedException e) {
101-
Thread.currentThread().interrupt();
102113
} finally {
103114
activeThreads.decrementAndGet();
115+
// Remove self from follower queue on exit
116+
leaderLock.lock();
117+
try {
118+
followerQueue.remove(Thread.currentThread());
119+
if (currentLeader == Thread.currentThread()) {
120+
currentLeader = null;
121+
}
122+
} finally {
123+
leaderLock.unlock();
124+
}
104125
}
105126
}
106127

@@ -115,8 +136,10 @@ private boolean becomeLeader() {
115136
leaderPromotions.incrementAndGet();
116137
return true;
117138
} else {
118-
// Add self to follower queue
119-
followerQueue.offer(Thread.currentThread());
139+
// Add self to follower queue only if not already there
140+
if (!followerQueue.contains(Thread.currentThread())) {
141+
followerQueue.offer(Thread.currentThread());
142+
}
120143
return false;
121144
}
122145
} finally {
@@ -143,9 +166,6 @@ private void processAsLeader() throws InterruptedException {
143166

144167
// After processing, promote a follower to leader and step down
145168
promoteFollowerToLeader();
146-
147-
// Add self back to follower queue
148-
followerQueue.offer(Thread.currentThread());
149169
break; // Exit leader role
150170
}
151171
}
@@ -219,9 +239,14 @@ private void stepDownAsLeader() {
219239
*/
220240
public void start() {
221241
if (isRunning.compareAndSet(false, true)) {
242+
// Clear any previous threads
243+
workerThreads.clear();
244+
222245
// Start worker threads
223246
for (int i = 0; i < threadPoolSize; i++) {
224-
new WorkerThread(i).start();
247+
WorkerThread thread = new WorkerThread(i);
248+
workerThreads.add(thread);
249+
thread.start();
225250
}
226251

227252
// Wait for threads to initialize with timeout
@@ -277,6 +302,11 @@ public void shutdown() {
277302
leaderLock.unlock();
278303
}
279304

305+
// Interrupt all threads
306+
for (WorkerThread thread : workerThreads) {
307+
thread.interrupt();
308+
}
309+
280310
// Wait for all threads to finish
281311
long timeout = System.currentTimeMillis() + 5000; // 5 second timeout
282312
while (activeThreads.get() > 0 && System.currentTimeMillis() < timeout) {
@@ -306,6 +336,11 @@ public void shutdownNow() {
306336
} finally {
307337
leaderLock.unlock();
308338
}
339+
340+
// Interrupt all threads immediately
341+
for (WorkerThread thread : workerThreads) {
342+
thread.interrupt();
343+
}
309344
}
310345

311346
/**

0 commit comments

Comments
 (0)