|
| 1 | +## 6. DependentPromise |
| 2 | +As it mentioned above, once you cancel `Promise`, all `Promise`-s that depends on this promise are completed with [CompletionException](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionException.html) wrapping [CancellationException](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CancellationException.html). This is a standard behavior, and [CompletableFuture](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html) works just like this. |
| 3 | + |
| 4 | +However, when you cancel derived `Promise`, the original `Promise` is not cancelled: |
| 5 | +```java |
| 6 | +Promise<?> original = CompletableTask.supplyAsync(() -> someIoBoundMethod(), myExecutor); |
| 7 | +Promise<?> derivedA = original.thenRunAsync(() -> someMethodA() ); |
| 8 | +Promise<?> derivedB = original.thenRunAsync(() -> someMethodB() ); |
| 9 | +... |
| 10 | +derivedB.cancel(true); |
| 11 | +``` |
| 12 | +So if you cancel `derivedB` above it's [Runnable](https://docs.oracle.com/javase/8/docs/api/java/lang/Runnable.html) method, wrapping `someMethod`, is interrupted. However the `original` promise is not cancelled and `someIoBoundMethod` keeps running. Also, `derivedA` is not cancelled, and such behavior is expected. However, sometimes we have a linear chain of the promises and have a requirement to cancel the whole chain from a tail to the head. Consider the following method: |
| 13 | + |
| 14 | +```java |
| 15 | +public Promise<DataStructure> loadData(String url) { |
| 16 | + return CompletableTask.supplyAsync( () -> loadXml(url) ).thenApplyAsync( xml -> parseXml(xml) ); |
| 17 | +} |
| 18 | + |
| 19 | +... |
| 20 | +Promise<DataStructure> p = loadData("http://someserver.com/rest/ds"); |
| 21 | +... |
| 22 | +if (someCondition()) { |
| 23 | + // Only second promise is canceled, parseXml. |
| 24 | + p.cancel(true); |
| 25 | +} |
| 26 | +``` |
| 27 | + |
| 28 | +Clients of this method see only derived promise, and once they decide to cancel it, it is expected that any of `loadXml` and `parseXml` will be interrupted if not completed yet. To address this issue the library provides [DependentPromise](https://github.com/vsilaev/tascalate-concurrent/blob/master/src/main/java/net/tascalate/concurrent/DependentPromise.java) class: |
| 29 | +```java |
| 30 | +public Promise<DataStructure> loadData(String url) { |
| 31 | + return DependentPromise |
| 32 | + .from(CompletableTask.supplyAsync( () -> loadXml(url) )) |
| 33 | + .thenApplyAsync( xml -> parseXml(xml), true ); |
| 34 | +} |
| 35 | + |
| 36 | +... |
| 37 | +Promise<DataStructure> p = loadData("http://someserver.com/rest/ds"); |
| 38 | +... |
| 39 | +if (someCondition()) { |
| 40 | + // Now the whole chain is canceled. |
| 41 | + p.cancel(true); |
| 42 | +} |
| 43 | +``` |
| 44 | +[DependentPromise](https://github.com/vsilaev/tascalate-concurrent/blob/master/src/main/java/net/tascalate/concurrent/DependentPromise.java) overloads methods like `thenApply` / `thenRun` / `thenAccept` / `thenCombine` etc with additional argument: |
| 45 | +- if method accepts no other [CompletionStage](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html), like `thenApply` / `thenRun` / `thenAccept` etc, then it's a boolean flag `enlistOrigin` to specify whether or not the original `Promise` should be enlisted for the cancellation. |
| 46 | +- if method accepts other [CompletionStage](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html), like `thenCombine` / `applyToEither` / `thenAcceptBoth` etc, then it's a set of [PromiseOrigin](https://github.com/vsilaev/tascalate-concurrent/blob/master/src/main/java/net/tascalate/concurrent/PromiseOrigin.java) enum values, that specifies whether or not the original `Promise` and/or a `CompletionStage` supplied as argument should be enlisted for the cancellation along with the resulting promise, for example: |
| 47 | + |
| 48 | +```java |
| 49 | +public Promise<DataStructure> loadData(String url) { |
| 50 | + return DependentPromise |
| 51 | + .from(CompletableTask.supplyAsync( () -> loadXml(url + "/source1") )) |
| 52 | + .thenCombine( |
| 53 | + CompletableTask.supplyAsync( () -> loadXml(url + "/source2") ), |
| 54 | + (xml1, xml2) -> Arrays.asList(xml1, xml2), |
| 55 | + PromiseOrigin.ALL |
| 56 | + ) . |
| 57 | + .thenApplyAsync( xmls -> parseXmlsList(xmls), true ); |
| 58 | +} |
| 59 | +``` |
| 60 | + |
| 61 | +Please note, then in release 0.5.4 there is a new default method `dependent` in interface [Promise](https://github.com/vsilaev/tascalate-concurrent/blob/master/src/main/java/net/tascalate/concurrent/Promise.java) that serves the same purpose and allows to write chained calls: |
| 62 | + |
| 63 | +```java |
| 64 | +public Promise<DataStructure> loadData(String url) { |
| 65 | + return CompletableTask |
| 66 | + .supplyAsync( () -> loadXml(url) ) |
| 67 | + .dependent() |
| 68 | + .thenApplyAsync( xml -> parseXml(xml), true ); |
| 69 | +} |
| 70 | +``` |
| 71 | + |
| 72 | +## 7. Polling and asynchronous retry functionality |
| 73 | +Once you departure from the pure algebraic calculations to the unreliable terrain of the I/O-related functionality you have to deal with failures. Network outage, insuffcient disk space, overloaded third-party servers, exhausted database connection pools - these and many similar infrastructure failures is what application have to cope with flawlessly. And many of the aforementioned issues are temporal by the nature, so it makes sense to re-try after small delay and keep fingers crossed that this time everything will run smoothly. So this is the primary use-case for the retry functionality, or better yet -- asynchronous retry functionality, while all we want our applications be as scalable as possible. |
| 74 | + |
| 75 | +Another related area is polling functionality - unlike infrastructure failures these are sporadic, polling is built-in in certain asynchronous protocol communications. Say, an application sends an HTTP request to generate a report and waits for the known file on FTP server. There is no "asynchronous reply" expected from the third-party server, and the application has to `poll` periodically till the file will be available. |
| 76 | + |
| 77 | +Both use-case are fully supported by the Tascalate Concurrent library. The library provides an API that is both unobtrusive and rich for a wide range of tasks. The following `retry*` methods are available in the `Promises` class: |
| 78 | + |
| 79 | +Provided by utility class Promises but stands on its own |
| 80 | +```java |
| 81 | +static Promise<Void> retry(Runnable codeBlock, Executor executor, |
| 82 | + RetryPolicy<? super Void> retryPolicy); |
| 83 | +static Promise<Void> retry(RetryRunnable codeBlock, Executor executor, |
| 84 | + RetryPolicy<? super Void> retryPolicy); |
| 85 | + |
| 86 | +static <T> Promise<T> retry(Callable<T> codeBlock, Executor executor, |
| 87 | + RetryPolicy<? super T> retryPolicy); |
| 88 | +static <T> Promise<T> retry(RetryCallable<T, T> codeBlock, Executor executor, |
| 89 | + RetryPolicy<? super T> retryPolicy); |
| 90 | + |
| 91 | +static <T> Promise<T> retryOptional(Callable<Optional<T>> codeBlock, Executor executor, |
| 92 | + RetryPolicy<? super T> retryPolicy); |
| 93 | +static <T> Promise<T> retryOptional(RetryCallable<Optional<T>, T> codeBlock, Executor executor, |
| 94 | + RetryPolicy<? super T> retryPolicy); |
| 95 | + |
| 96 | +static <T> Promise<T> retryFuture(Callable<? extends CompletionStage<T>> invoker, |
| 97 | + RetryPolicy<? super T> retryPolicy); |
| 98 | +static <T> Promise<T> retryFuture(RetryCallable<? extends CompletionStage<T>, T> invoker, |
| 99 | + RetryPolicy<? super T> retryPolicy); |
| 100 | +``` |
| 101 | +All the methods from `retry` family share the same pattern. First, there is a block of code that is executed per every attempt. It could be either a full block of the asynchronous code (`retry` and `retryOptional`) or a method that returns a CompletionStage<T> from third-party API like Async Http library (`retryFuture`). Next, if we retry custom code block, then it's necessary to provide an `Executor` it should be run on. For `retryFuture` there is no explicit `Executor`, and it's up to the third-party library to provide scalable and robust `Executor` as a default asynchronous executor of the returned `CompletionStage`. Finally, `RetryPolicy` should be specified that provides a lot of customization options: |
| 102 | +1. How much attempts should be made? |
| 103 | +2. What is a time interval between attempts? Should it be fixed or dynamic? |
| 104 | +3. What is a timeout before a single attempted is considered "hanged"? Should it be dynamic? |
| 105 | +4. What exceptions are re-trieable and what are not? |
| 106 | +5. What result is expected to be valid? Is a `null` result valid? Is any non-`null` result valid or some returned object properties should be examined? |
| 107 | + |
| 108 | +All in all, `RetryPolicy` is provides an API with endless customizations per every imaginable use-case. |
| 109 | +But before discussing it, it's necessary to explain a difference in each pair of methods. Why there are overloads with `Runnable` vs `RetryRunnable` and `Callable` vs `RetryCallable`? The reason is the following: |
| 110 | +1. Contextless retriable operations are captured as `Runnable` or `Callable` lambdas - they behaves the same for every iteration, and hence do not need a context. |
| 111 | +2. Methods with `RetryRunnable` and `RetryCallable` are contextual and may dynamically alter their behavior for the given iteration depending on the context passed. The `RetryContext` provides provides all necessary iteration-specific information. |
0 commit comments