From 0efa144f54f50a5f6c294beb1a6c6929a5da46f9 Mon Sep 17 00:00:00 2001 From: Qingtian Wang Date: Sun, 12 May 2024 01:41:31 -0500 Subject: [PATCH] + coco4j version bump --- pom.xml | 7 +- .../java/conseq4j/execute/ConseqExecutor.java | 78 +++---------------- 2 files changed, 15 insertions(+), 70 deletions(-) diff --git a/pom.xml b/pom.xml index ce50c06..951c14f 100644 --- a/pom.xml +++ b/pom.xml @@ -133,7 +133,7 @@ io.github.q3769 semver-maven-plugin - 20221011.20230506.20230611 + 20240116.0.202402 org.apache.maven.plugins @@ -187,6 +187,11 @@ 3.0.2 provided + + io.github.q3769 + coco4j + 10.0.0 + org.projectlombok lombok diff --git a/src/main/java/conseq4j/execute/ConseqExecutor.java b/src/main/java/conseq4j/execute/ConseqExecutor.java index 6e8e3bf..41ff20c 100644 --- a/src/main/java/conseq4j/execute/ConseqExecutor.java +++ b/src/main/java/conseq4j/execute/ConseqExecutor.java @@ -24,6 +24,9 @@ package conseq4j.execute; +import static coco4j.Tasks.callUnchecked; + +import coco4j.DefensiveFuture; import conseq4j.Terminable; import java.util.List; import java.util.Map; @@ -89,14 +92,6 @@ private ConseqExecutor(ExecutorService workerExecutorService) { return new ConseqExecutor(workerExecutorService); } - private static T call(Callable task) { - try { - return task.call(); - } catch (Exception e) { - throw new CompletionException(e); - } - } - /** * @param command the command to run asynchronously in proper sequence * @param sequenceKey the key under which this task should be sequenced @@ -144,17 +139,17 @@ private static T call(Callable task) { @Override @SuppressWarnings("unchecked") public @NonNull Future submit(Callable task, Object sequenceKey) { - CompletableFuture latestTask = activeSequentialTasks.compute( + CompletableFuture taskCompletable = activeSequentialTasks.compute( sequenceKey, (k, presentTask) -> (presentTask == null) - ? CompletableFuture.supplyAsync(() -> call(task), workerExecutorService) - : presentTask.handleAsync((r, e) -> call(task), workerExecutorService)); - Future copy = new ProtectiveFuture<>(latestTask); - latestTask.whenCompleteAsync( + ? CompletableFuture.supplyAsync(() -> callUnchecked(task), workerExecutorService) + : presentTask.handleAsync((r, e) -> callUnchecked(task), workerExecutorService)); + CompletableFuture copy = taskCompletable.copy(); + taskCompletable.whenCompleteAsync( (r, e) -> activeSequentialTasks.computeIfPresent( sequenceKey, (k, checkedTask) -> checkedTask.isDone() ? null : checkedTask), adminExecutorService); - return (ProtectiveFuture) copy; + return (Future) new DefensiveFuture<>(copy); } /** Orderly shutdown, and awaits thread pool termination. */ @@ -188,59 +183,4 @@ public boolean isTerminated() { adminExecutorService.shutdownNow(); return neverStartedTasks; } - - /** - * Protective copy of the wrapped {@link CompletableFuture} instance, so the client does not get to interact with - * internal thread resources - * - * @param The value type held by the Future - */ - public static final class ProtectiveFuture implements Future { - private final CompletableFuture completableFuture; - - private ProtectiveFuture(CompletableFuture completableFuture) { - this.completableFuture = completableFuture; - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return this.completableFuture.cancel(mayInterruptIfRunning); - } - - @Override - public boolean isCancelled() { - return this.completableFuture.isCancelled(); - } - - @Override - public boolean isDone() { - return this.completableFuture.isDone(); - } - - @Override - public V get() throws InterruptedException, ExecutionException { - return this.completableFuture.get(); - } - - @Override - public V get(long timeout, @NonNull TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - return this.completableFuture.get(timeout, unit); - } - - @Override - public V resultNow() { - return this.completableFuture.resultNow(); - } - - @Override - public Throwable exceptionNow() { - return this.completableFuture.exceptionNow(); - } - - @Override - public State state() { - return this.completableFuture.state(); - } - } }