diff --git a/pom.xml b/pom.xml
index ce50c06..951c14f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -133,7 +133,7 @@
io.github.q3769
semver-maven-plugin
- 20221011.20230506.20230611
+ 20240116.0.202402
org.apache.maven.plugins
@@ -187,6 +187,11 @@
3.0.2
provided
+
+ io.github.q3769
+ coco4j
+ 10.0.0
+
org.projectlombok
lombok
diff --git a/src/main/java/conseq4j/execute/ConseqExecutor.java b/src/main/java/conseq4j/execute/ConseqExecutor.java
index 6e8e3bf..41ff20c 100644
--- a/src/main/java/conseq4j/execute/ConseqExecutor.java
+++ b/src/main/java/conseq4j/execute/ConseqExecutor.java
@@ -24,6 +24,9 @@
package conseq4j.execute;
+import static coco4j.Tasks.callUnchecked;
+
+import coco4j.DefensiveFuture;
import conseq4j.Terminable;
import java.util.List;
import java.util.Map;
@@ -89,14 +92,6 @@ private ConseqExecutor(ExecutorService workerExecutorService) {
return new ConseqExecutor(workerExecutorService);
}
- private static T call(Callable task) {
- try {
- return task.call();
- } catch (Exception e) {
- throw new CompletionException(e);
- }
- }
-
/**
* @param command the command to run asynchronously in proper sequence
* @param sequenceKey the key under which this task should be sequenced
@@ -144,17 +139,17 @@ private static T call(Callable task) {
@Override
@SuppressWarnings("unchecked")
public @NonNull Future submit(Callable task, Object sequenceKey) {
- CompletableFuture> latestTask = activeSequentialTasks.compute(
+ CompletableFuture> taskCompletable = activeSequentialTasks.compute(
sequenceKey,
(k, presentTask) -> (presentTask == null)
- ? CompletableFuture.supplyAsync(() -> call(task), workerExecutorService)
- : presentTask.handleAsync((r, e) -> call(task), workerExecutorService));
- Future> copy = new ProtectiveFuture<>(latestTask);
- latestTask.whenCompleteAsync(
+ ? CompletableFuture.supplyAsync(() -> callUnchecked(task), workerExecutorService)
+ : presentTask.handleAsync((r, e) -> callUnchecked(task), workerExecutorService));
+ CompletableFuture> copy = taskCompletable.copy();
+ taskCompletable.whenCompleteAsync(
(r, e) -> activeSequentialTasks.computeIfPresent(
sequenceKey, (k, checkedTask) -> checkedTask.isDone() ? null : checkedTask),
adminExecutorService);
- return (ProtectiveFuture) copy;
+ return (Future) new DefensiveFuture<>(copy);
}
/** Orderly shutdown, and awaits thread pool termination. */
@@ -188,59 +183,4 @@ public boolean isTerminated() {
adminExecutorService.shutdownNow();
return neverStartedTasks;
}
-
- /**
- * Protective copy of the wrapped {@link CompletableFuture} instance, so the client does not get to interact with
- * internal thread resources
- *
- * @param The value type held by the Future
- */
- public static final class ProtectiveFuture implements Future {
- private final CompletableFuture completableFuture;
-
- private ProtectiveFuture(CompletableFuture completableFuture) {
- this.completableFuture = completableFuture;
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return this.completableFuture.cancel(mayInterruptIfRunning);
- }
-
- @Override
- public boolean isCancelled() {
- return this.completableFuture.isCancelled();
- }
-
- @Override
- public boolean isDone() {
- return this.completableFuture.isDone();
- }
-
- @Override
- public V get() throws InterruptedException, ExecutionException {
- return this.completableFuture.get();
- }
-
- @Override
- public V get(long timeout, @NonNull TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- return this.completableFuture.get(timeout, unit);
- }
-
- @Override
- public V resultNow() {
- return this.completableFuture.resultNow();
- }
-
- @Override
- public Throwable exceptionNow() {
- return this.completableFuture.exceptionNow();
- }
-
- @Override
- public State state() {
- return this.completableFuture.state();
- }
- }
}