Skip to content

Commit

Permalink
+ coco4j version bump
Browse files Browse the repository at this point in the history
  • Loading branch information
q3769 committed May 12, 2024
1 parent 3cc7418 commit 0efa144
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 70 deletions.
7 changes: 6 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@
<plugin>
<groupId>io.github.q3769</groupId>
<artifactId>semver-maven-plugin</artifactId>
<version>20221011.20230506.20230611</version>
<version>20240116.0.202402</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down Expand Up @@ -187,6 +187,11 @@
<version>3.0.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.github.q3769</groupId>
<artifactId>coco4j</artifactId>
<version>10.0.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
Expand Down
78 changes: 9 additions & 69 deletions src/main/java/conseq4j/execute/ConseqExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@

package conseq4j.execute;

import static coco4j.Tasks.callUnchecked;

import coco4j.DefensiveFuture;
import conseq4j.Terminable;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -89,14 +92,6 @@ private ConseqExecutor(ExecutorService workerExecutorService) {
return new ConseqExecutor(workerExecutorService);
}

private static <T> T call(Callable<T> task) {
try {
return task.call();
} catch (Exception e) {
throw new CompletionException(e);
}
}

/**
* @param command the command to run asynchronously in proper sequence
* @param sequenceKey the key under which this task should be sequenced
Expand Down Expand Up @@ -144,17 +139,17 @@ private static <T> T call(Callable<T> task) {
@Override
@SuppressWarnings("unchecked")
public <T> @NonNull Future<T> submit(Callable<T> task, Object sequenceKey) {
CompletableFuture<?> latestTask = activeSequentialTasks.compute(
CompletableFuture<?> taskCompletable = activeSequentialTasks.compute(
sequenceKey,
(k, presentTask) -> (presentTask == null)
? CompletableFuture.supplyAsync(() -> call(task), workerExecutorService)
: presentTask.handleAsync((r, e) -> call(task), workerExecutorService));
Future<?> copy = new ProtectiveFuture<>(latestTask);
latestTask.whenCompleteAsync(
? CompletableFuture.supplyAsync(() -> callUnchecked(task), workerExecutorService)
: presentTask.handleAsync((r, e) -> callUnchecked(task), workerExecutorService));
CompletableFuture<?> copy = taskCompletable.copy();
taskCompletable.whenCompleteAsync(
(r, e) -> activeSequentialTasks.computeIfPresent(
sequenceKey, (k, checkedTask) -> checkedTask.isDone() ? null : checkedTask),
adminExecutorService);
return (ProtectiveFuture<T>) copy;
return (Future<T>) new DefensiveFuture<>(copy);
}

/** Orderly shutdown, and awaits thread pool termination. */
Expand Down Expand Up @@ -188,59 +183,4 @@ public boolean isTerminated() {
adminExecutorService.shutdownNow();
return neverStartedTasks;
}

/**
* Protective copy of the wrapped {@link CompletableFuture} instance, so the client does not get to interact with
* internal thread resources
*
* @param <V> The value type held by the Future
*/
public static final class ProtectiveFuture<V> implements Future<V> {
private final CompletableFuture<V> completableFuture;

private ProtectiveFuture(CompletableFuture<V> completableFuture) {
this.completableFuture = completableFuture;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return this.completableFuture.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return this.completableFuture.isCancelled();
}

@Override
public boolean isDone() {
return this.completableFuture.isDone();
}

@Override
public V get() throws InterruptedException, ExecutionException {
return this.completableFuture.get();
}

@Override
public V get(long timeout, @NonNull TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return this.completableFuture.get(timeout, unit);
}

@Override
public V resultNow() {
return this.completableFuture.resultNow();
}

@Override
public Throwable exceptionNow() {
return this.completableFuture.exceptionNow();
}

@Override
public State state() {
return this.completableFuture.state();
}
}
}

0 comments on commit 0efa144

Please sign in to comment.