Skip to content

Commit

Permalink
+ javadoc
Browse files Browse the repository at this point in the history
  • Loading branch information
q3769 committed Jun 11, 2024
1 parent 8071b46 commit d87a9e5
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 12 deletions.
32 changes: 32 additions & 0 deletions src/main/java/conseq4j/execute/ConseqExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,32 @@
@ThreadSafe
@ToString
public final class ConseqExecutor implements SequentialExecutor, Terminable, AutoCloseable {
/**
* A concurrent hash map that stores the active sequential tasks, where the key is the sequence key, and the value
* is a CompletableFuture representing the currently executing task for that sequence key.
*/
private final Map<Object, CompletableFuture<?>> activeSequentialTasks = new ConcurrentHashMap<>();

/**
* An ExecutorService used for administrative tasks, such as cleaning up completed tasks from the
* activeSequentialTasks map.
*/
private final ExecutorService adminExecutorService = Executors.newThreadPerTaskExecutor(
Thread.ofVirtual().name("conseq-admin-", 1).factory());

/**
* The worker thread pool facilitates the overall async execution, independent of the submitted tasks. Any thread
* from the pool can be used to execute any task, regardless of sequence keys. The pool capacity decides the overall
* max parallelism of task execution.
*/
private final ExecutorService workerExecutorService;

/**
* Private constructor that initializes the workerExecutorService.
*
* @param workerExecutorService The ExecutorService used for executing the actual tasks submitted to the
* ConseqExecutor.
*/
private ConseqExecutor(ExecutorService workerExecutorService) {
this.workerExecutorService = workerExecutorService;
}
Expand Down Expand Up @@ -146,10 +162,16 @@ public void close() {
adminExecutorService.close();
}

/**
* Checks if there are no tasks pending execution.
*
* @return true if there are no pending tasks, false otherwise.
*/
boolean noTaskPending() {
return activeSequentialTasks.isEmpty();
}

/** Initiates an orderly shutdown of the workerExecutorService and adminExecutorService. */
@Override
public void terminate() {
new Thread(() -> {
Expand All @@ -159,11 +181,21 @@ public void terminate() {
.start();
}

/**
* Checks if both the workerExecutorService and adminExecutorService have terminated.
*
* @return true if both services have terminated, false otherwise.
*/
@Override
public boolean isTerminated() {
return workerExecutorService.isTerminated() && adminExecutorService.isTerminated();
}

/**
* Attempts to stop all actively executing tasks and returns a list of tasks that never commenced execution.
*
* @return a list of tasks that never commenced execution.
*/
@Override
public @Nonnull List<Runnable> terminateNow() {
List<Runnable> neverStartedTasks = workerExecutorService.shutdownNow();
Expand Down
52 changes: 40 additions & 12 deletions src/main/java/conseq4j/summon/ConseqServiceFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
import org.awaitility.core.ConditionFactory;

/**
* A factory to produce sequential executors of type {@link ExecutorService} with an upper-bound global execution
* concurrency. Providing task execution concurrency, as well as sequencing, via the {@link ExecutorService} API.
* This class represents a factory for creating and managing a collection of ExecutorService instances. Each
* ExecutorService instance is used to execute tasks concurrently in separate threads.
*
* @author Qingtian Wang
*/
Expand All @@ -56,8 +56,9 @@ public final class ConseqServiceFactory implements SequentialExecutorServiceFact
private final ConcurrentMap<Object, ShutdownDisabledExecutorService> sequentialExecutors;

/**
* @param concurrency max count of "buckets"/executors, i.e. the max number of unrelated tasks that can be
* concurrently executed at any given time by this conseq instance.
* Private constructor for the ConseqServiceFactory class.
*
* @param concurrency The maximum number of unrelated tasks that can be executed concurrently.
*/
private ConseqServiceFactory(int concurrency) {
if (concurrency <= 0) {
Expand All @@ -68,19 +69,19 @@ private ConseqServiceFactory(int concurrency) {
}

/**
* Default static factory method uses available processor count as max task concurrency.
* Factory method to create an instance of ConseqServiceFactory with default concurrency.
*
* @return ExecutorService factory with default concurrency
* @return An instance of ConseqServiceFactory.
*/
public static @Nonnull ConseqServiceFactory instance() {
return instance(DEFAULT_CONCURRENCY);
}

/**
* Static factory method taking specified task concurrency
* Factory method to create an instance of ConseqServiceFactory with specified concurrency.
*
* @param concurrency max number of tasks possible to be executed in parallel
* @return ExecutorService factory with given concurrency
* @param concurrency The maximum number of tasks that can be executed in parallel.
* @return An instance of ConseqServiceFactory.
*/
public static @Nonnull ConseqServiceFactory instance(int concurrency) {
return new ConseqServiceFactory(concurrency);
Expand All @@ -90,7 +91,13 @@ private static ConditionFactory awaitForever() {
return await().forever().pollDelay(Duration.ofMillis(10));
}

/** @return a single-thread executor that does not support any shutdown action. */
/**
* Method to get an ExecutorService for a given sequence key. If an ExecutorService for the sequence key does not
* exist, it creates a new one.
*
* @param sequenceKey The key for the sequence of tasks to be executed.
* @return a single-thread executor that does not support any shutdown action.
*/
@Override
public ExecutorService getExecutorService(Object sequenceKey) {
return this.sequentialExecutors.computeIfAbsent(
Expand All @@ -99,7 +106,7 @@ public ExecutorService getExecutorService(Object sequenceKey) {
ThreadFactories.newPlatformThreadFactory("sequential-executor"))));
}

/** Shuts down all executors and awaits termination to complete */
/** Method to shut down all ExecutorService instances and wait for them to terminate. */
@Override
public void close() {
sequentialExecutors.values().forEach(ShutdownDisabledExecutorService::shutdownDelegate);
Expand All @@ -110,16 +117,27 @@ private int bucketOf(Object sequenceKey) {
return floorMod(Objects.hash(sequenceKey), this.concurrency);
}

/** Method to terminate all ExecutorService instances. */
@Override
public void terminate() {
sequentialExecutors.values().parallelStream().forEach(ShutdownDisabledExecutorService::shutdownDelegate);
}

/**
* Method to check if all ExecutorService instances are terminated.
*
* @return True if all ExecutorService instances are terminated, false otherwise.
*/
@Override
public boolean isTerminated() {
return sequentialExecutors.values().stream().allMatch(ExecutorService::isTerminated);
}

/**
* Method to terminate all ExecutorService instances immediately.
*
* @return A list of tasks that never commenced execution.
*/
@Override
public List<Runnable> terminateNow() {
return sequentialExecutors.values().parallelStream()
Expand All @@ -143,7 +161,11 @@ static final class ShutdownDisabledExecutorService implements ExecutorService {
@Delegate(excludes = ShutdownOperations.class)
private final ExecutorService delegate;

/** @param delegate the delegate {@link ExecutorService} to run the submitted task(s). */
/**
* Constructor for the ShutdownDisabledExecutorService class.
*
* @param delegate The delegate ExecutorService to run the submitted tasks.
*/
public ShutdownDisabledExecutorService(ExecutorService delegate) {
this.delegate = delegate;
}
Expand All @@ -164,10 +186,16 @@ public void shutdown() {
throw new UnsupportedOperationException(SHUTDOWN_UNSUPPORTED_MESSAGE);
}

/** Method to shut down the delegate ExecutorService. */
void shutdownDelegate() {
this.delegate.shutdown();
}

/**
* Method to shut down the delegate ExecutorService immediately.
*
* @return A list of tasks that never commenced execution.
*/
@Nonnull
List<Runnable> shutdownDelegateNow() {
return this.delegate.shutdownNow();
Expand Down

0 comments on commit d87a9e5

Please sign in to comment.