diff --git a/java-client/src/main/java/co/elastic/clients/transport/DefaultTransportOptions.java b/java-client/src/main/java/co/elastic/clients/transport/DefaultTransportOptions.java index cdc35639e..f74fb8981 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/DefaultTransportOptions.java +++ b/java-client/src/main/java/co/elastic/clients/transport/DefaultTransportOptions.java @@ -38,6 +38,7 @@ public class DefaultTransportOptions implements TransportOptions { private final Map parameters; private final Function, Boolean> onWarnings; private boolean keepResponseBodyOnException; + private BackoffPolicy backoffPolicy; public static final DefaultTransportOptions EMPTY = new DefaultTransportOptions(); @@ -49,10 +50,12 @@ public DefaultTransportOptions( @Nullable HeaderMap headers, @Nullable Map parameters, @Nullable Function, Boolean> onWarnings, - boolean keepResponseBodyOnException + boolean keepResponseBodyOnException, + BackoffPolicy backoffPolicy ) { this(headers,parameters,onWarnings); this.keepResponseBodyOnException = keepResponseBodyOnException; + this.backoffPolicy = backoffPolicy; } public DefaultTransportOptions( @@ -65,10 +68,11 @@ public DefaultTransportOptions( Collections.emptyMap() : Collections.unmodifiableMap(parameters); this.onWarnings = onWarnings; this.keepResponseBodyOnException = false; + this.backoffPolicy = BackoffPolicy.noBackoff(); } protected DefaultTransportOptions(AbstractBuilder builder) { - this(builder.headers, builder.parameters, builder.onWarnings, builder.keepResponseBodyOnException); + this(builder.headers, builder.parameters, builder.onWarnings, builder.keepResponseBodyOnException, builder.backoffPolicy); } public static DefaultTransportOptions of(@Nullable TransportOptions options) { @@ -105,6 +109,11 @@ public boolean keepResponseBodyOnException() { return keepResponseBodyOnException; } + @Override + public BackoffPolicy backoffPolicy() { + return backoffPolicy; + } + @Override public Builder toBuilder() { return new Builder(this); @@ -129,6 +138,7 @@ public abstract static class AbstractBuilder parameters; private Function, Boolean> onWarnings; private boolean keepResponseBodyOnException; + private BackoffPolicy backoffPolicy; public AbstractBuilder() { } @@ -138,6 +148,7 @@ public AbstractBuilder(DefaultTransportOptions options) { this.parameters = copyOrNull(options.parameters); this.onWarnings = options.onWarnings; this.keepResponseBodyOnException = options.keepResponseBodyOnException; + this.backoffPolicy = options.backoffPolicy; } protected abstract BuilderT self(); @@ -148,6 +159,12 @@ public BuilderT keepResponseBodyOnException(boolean value) { return self(); } + @Override + public BuilderT backoffPolicy(BackoffPolicy policy) { + this.backoffPolicy = policy; + return self(); + } + @Override public BuilderT addHeader(String name, String value) { if (name.equalsIgnoreCase(HeaderMap.CLIENT_META)) { diff --git a/java-client/src/main/java/co/elastic/clients/transport/ElasticsearchTransportBase.java b/java-client/src/main/java/co/elastic/clients/transport/ElasticsearchTransportBase.java index 26237501e..a9428c5d2 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/ElasticsearchTransportBase.java +++ b/java-client/src/main/java/co/elastic/clients/transport/ElasticsearchTransportBase.java @@ -34,6 +34,7 @@ import co.elastic.clients.transport.instrumentation.Instrumentation; import co.elastic.clients.transport.instrumentation.NoopInstrumentation; import co.elastic.clients.transport.instrumentation.OpenTelemetryForElasticsearch; +import co.elastic.clients.transport.rest_client.RetryRestClientHttpClient; import co.elastic.clients.util.ApiTypeHelper; import co.elastic.clients.util.BinaryData; import co.elastic.clients.util.ByteArrayBinaryData; @@ -100,9 +101,15 @@ public ElasticsearchTransportBase( @Nullable Instrumentation instrumentation ) { this.mapper = jsonpMapper; - this.httpClient = httpClient; this.transportOptions = httpClient.createOptions(options); + if (this.transportOptions.backoffPolicy()!=BackoffPolicy.noBackoff()){ + this.httpClient = new RetryRestClientHttpClient(httpClient,this.transportOptions.backoffPolicy()); + } + else { + this.httpClient = httpClient; + } + // If no instrumentation is provided, fallback to OpenTelemetry and ultimately noop if (instrumentation == null) { instrumentation = OpenTelemetryForElasticsearch.getDefault(); diff --git a/java-client/src/main/java/co/elastic/clients/transport/TransportOptions.java b/java-client/src/main/java/co/elastic/clients/transport/TransportOptions.java index 9cbbdd40d..904e4ad20 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/TransportOptions.java +++ b/java-client/src/main/java/co/elastic/clients/transport/TransportOptions.java @@ -47,6 +47,8 @@ public interface TransportOptions { Builder toBuilder(); + BackoffPolicy backoffPolicy(); + default TransportOptions with(Consumer fn) { Builder builder = toBuilder(); fn.accept(builder); @@ -73,5 +75,7 @@ interface Builder extends ObjectBuilder { * streamed by the http library. */ Builder keepResponseBodyOnException(boolean value); + + Builder backoffPolicy(BackoffPolicy policy); } } diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RequestFuture.java b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RequestFuture.java new file mode 100644 index 000000000..7f0e378c1 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RequestFuture.java @@ -0,0 +1,26 @@ +package co.elastic.clients.transport.rest_client; + +import org.elasticsearch.client.Cancellable; + +import java.util.concurrent.CompletableFuture; + +/** + * The {@code Future} implementation returned by async requests. + * It wraps the RestClient's cancellable and propagates cancellation. + */ +public class RequestFuture extends CompletableFuture { + private volatile Cancellable cancellable; + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + boolean cancelled = super.cancel(mayInterruptIfRunning); + if (cancelled && cancellable != null) { + cancellable.cancel(); + } + return cancelled; + } + + public void setCancellable(Cancellable cancellable) { + this.cancellable = cancellable; + } +} diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientHttpClient.java b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientHttpClient.java index 1bcc06b05..63e25f53f 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientHttpClient.java +++ b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientHttpClient.java @@ -49,23 +49,6 @@ public class RestClientHttpClient implements TransportHttpClient { private static final ConcurrentHashMap ContentTypeCache = new ConcurrentHashMap<>(); - /** - * The {@code Future} implementation returned by async requests. - * It wraps the RestClient's cancellable and propagates cancellation. - */ - private static class RequestFuture extends CompletableFuture { - private volatile Cancellable cancellable; - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - boolean cancelled = super.cancel(mayInterruptIfRunning); - if (cancelled && cancellable != null) { - cancellable.cancel(); - } - return cancelled; - } - } - private final RestClient restClient; public RestClientHttpClient(RestClient restClient) { @@ -110,7 +93,7 @@ public CompletableFuture performRequestAsync( return future; } - future.cancellable = restClient.performRequestAsync(restRequest, new ResponseListener() { + future.setCancellable(restClient.performRequestAsync(restRequest, new ResponseListener() { @Override public void onSuccess(org.elasticsearch.client.Response response) { future.complete(new RestResponse(response)); @@ -120,7 +103,7 @@ public void onSuccess(org.elasticsearch.client.Response response) { public void onFailure(Exception exception) { future.completeExceptionally(exception); } - }); + })); return future; } diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientOptions.java b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientOptions.java index 842a45c62..2f7199b9a 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientOptions.java +++ b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientOptions.java @@ -19,6 +19,7 @@ package co.elastic.clients.transport.rest_client; +import co.elastic.clients.transport.BackoffPolicy; import co.elastic.clients.transport.TransportOptions; import co.elastic.clients.transport.Version; import co.elastic.clients.transport.http.HeaderMap; @@ -44,6 +45,8 @@ public class RestClientOptions implements TransportOptions { boolean keepResponseBodyOnException; + BackoffPolicy backoffPolicy; + @VisibleForTesting static final String CLIENT_META_VALUE = getClientMeta(); @VisibleForTesting @@ -65,8 +68,9 @@ static RestClientOptions of(@Nullable TransportOptions options) { return builder.build(); } - public RestClientOptions(RequestOptions options, boolean keepResponseBodyOnException) { + public RestClientOptions(RequestOptions options, boolean keepResponseBodyOnException, BackoffPolicy backoffPolicy) { this.keepResponseBodyOnException = keepResponseBodyOnException; + this.backoffPolicy = backoffPolicy; this.options = addBuiltinHeaders(options.toBuilder()).build(); } @@ -107,6 +111,11 @@ public boolean keepResponseBodyOnException() { return this.keepResponseBodyOnException; } + @Override + public BackoffPolicy backoffPolicy() { + return backoffPolicy; + } + @Override public Builder toBuilder() { return new Builder(options.toBuilder()); @@ -118,6 +127,8 @@ public static class Builder implements TransportOptions.Builder { private boolean keepResponseBodyOnException; + private BackoffPolicy backoffPolicy; + public Builder(RequestOptions.Builder builder) { this.builder = builder; } @@ -197,14 +208,20 @@ public TransportOptions.Builder keepResponseBodyOnException(boolean value) { return this; } + @Override + public TransportOptions.Builder backoffPolicy(BackoffPolicy policy) { + this.backoffPolicy = policy; + return this; + } + @Override public RestClientOptions build() { - return new RestClientOptions(addBuiltinHeaders(builder).build(), keepResponseBodyOnException); + return new RestClientOptions(addBuiltinHeaders(builder).build(), keepResponseBodyOnException, backoffPolicy); } } static RestClientOptions initialOptions() { - return new RestClientOptions(SafeResponseConsumer.DEFAULT_REQUEST_OPTIONS, false); + return new RestClientOptions(SafeResponseConsumer.DEFAULT_REQUEST_OPTIONS, false, BackoffPolicy.noBackoff()); } private static RequestOptions.Builder addBuiltinHeaders(RequestOptions.Builder builder) { diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RetryRestClientHttpClient.java b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RetryRestClientHttpClient.java new file mode 100644 index 000000000..5b82c9712 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RetryRestClientHttpClient.java @@ -0,0 +1,95 @@ +package co.elastic.clients.transport.rest_client; + +import co.elastic.clients.transport.BackoffPolicy; +import co.elastic.clients.transport.TransportOptions; +import co.elastic.clients.transport.http.TransportHttpClient; +import org.elasticsearch.client.ResponseException; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; + +public class RetryRestClientHttpClient implements TransportHttpClient { + + private TransportHttpClient delegate; + private BackoffPolicy backoffPolicy; + + public RetryRestClientHttpClient(TransportHttpClient delegate, BackoffPolicy backoffPolicy) { + this.delegate = delegate; + this.backoffPolicy = backoffPolicy; + } + + @Override + public Response performRequest(String endpointId, @Nullable Node node, Request request, + TransportOptions options) throws IOException { + return performRequestRetry(endpointId, node, request, options, backoffPolicy.iterator()); + } + + public Response performRequestRetry(String endpointId, @Nullable Node node, Request request, + TransportOptions options, Iterator backoffIter) throws IOException { + try { + return delegate.performRequest(endpointId, node, request, options); + } catch (ResponseException e) { + if (e.getResponse().getStatusLine().getStatusCode() == 503) { // TODO list of statuses, configurable or hardcoded? + // synchronous retry + if (backoffIter.hasNext()) { + try { + Thread.sleep(backoffIter.next()); + } catch (InterruptedException ie) { + throw e; // TODO okay with masking IE and just returning original exception? + } + System.out.println("Retrying"); + return performRequestRetry(endpointId, node, request, options, backoffIter); + } + } + // error not retryable + throw e; + } + } + + @Override + public CompletableFuture performRequestAsync(String endpointId, @Nullable Node node, + Request request, TransportOptions options) { + RequestFuture futureResult = new RequestFuture<>(); + return performRequestAsyncRetry(endpointId, node, request, options, backoffPolicy.iterator(), + futureResult); + } + + public CompletableFuture performRequestAsyncRetry(String endpointId, @Nullable Node node, + Request request, + TransportOptions options, + Iterator backoffIter, + CompletableFuture futureResult) { + CompletableFuture res = delegate.performRequestAsync(endpointId, node, request, options); + + res.whenComplete((resp, e) -> { + if (e != null) { + if (e instanceof ResponseException) { + if (((ResponseException) e).getResponse().getStatusLine().getStatusCode() == 503) { // TODO list of statuses, configurable or hardcoded? + if (backoffIter.hasNext()) { + try { + Thread.sleep(backoffIter.next()); + } catch (InterruptedException ie) { + // TODO okay with masking IE and just returning original exception? + futureResult.completeExceptionally(e); + } + System.out.println("Retrying"); + performRequestAsyncRetry(endpointId, node, request, options, backoffIter,futureResult); + } + } + } + } + else { + futureResult.complete(resp); + } + }); + + return futureResult; + } + + @Override + public void close() throws IOException { + delegate.close(); + } +} diff --git a/java-client/src/test/java/co/elastic/clients/documentation/DocTestsTransport.java b/java-client/src/test/java/co/elastic/clients/documentation/DocTestsTransport.java index 6d1c41eb5..7c8503515 100644 --- a/java-client/src/test/java/co/elastic/clients/documentation/DocTestsTransport.java +++ b/java-client/src/test/java/co/elastic/clients/documentation/DocTestsTransport.java @@ -21,6 +21,7 @@ import co.elastic.clients.json.JsonpMapper; import co.elastic.clients.json.jackson.JacksonJsonpMapper; +import co.elastic.clients.transport.BackoffPolicy; import co.elastic.clients.transport.ElasticsearchTransport; import co.elastic.clients.transport.Endpoint; import co.elastic.clients.transport.TransportOptions; @@ -69,10 +70,16 @@ public boolean keepResponseBodyOnException() { return false; } + @Override + public BackoffPolicy backoffPolicy() { + return BackoffPolicy.noBackoff(); + } + @Override public Builder toBuilder() { return null; } + }; public void setResult(Object result) { diff --git a/java-client/src/test/java/co/elastic/clients/transport/TransportTest.java b/java-client/src/test/java/co/elastic/clients/transport/TransportTest.java index d25466bbd..cee6d2ebf 100644 --- a/java-client/src/test/java/co/elastic/clients/transport/TransportTest.java +++ b/java-client/src/test/java/co/elastic/clients/transport/TransportTest.java @@ -19,7 +19,9 @@ package co.elastic.clients.transport; +import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient; import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch.cat.IndicesResponse; import co.elastic.clients.json.jackson.JacksonJsonpMapper; import co.elastic.clients.transport.http.RepeatableBodyResponse; import co.elastic.clients.transport.rest_client.RestClientOptions; @@ -40,6 +42,8 @@ import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import static co.elastic.clients.util.ContentType.APPLICATION_JSON; @@ -122,7 +126,8 @@ public void testOriginalJsonBodyRetrievalException() throws Exception { assertNotEquals(RepeatableBodyResponse.class, ex.response().getClass()); // setting transport option - RestClientOptions options = new RestClientOptions(RequestOptions.DEFAULT, true); + RestClientOptions options = new RestClientOptions(RequestOptions.DEFAULT, true, + BackoffPolicy.noBackoff()); ElasticsearchTransport transport = new RestClientTransport( restClient, new JacksonJsonpMapper(), options); @@ -139,17 +144,147 @@ public void testOriginalJsonBodyRetrievalException() throws Exception { assertEquals(200, ex.statusCode()); assertEquals(RepeatableBodyResponse.class, ex.response().getClass()); - try (RepeatableBodyResponse repeatableResponse = (RepeatableBodyResponse) ex.response()){ + try (RepeatableBodyResponse repeatableResponse = (RepeatableBodyResponse) ex.response()) { BinaryData body = repeatableResponse.body(); - StringBuilder sb = new StringBuilder(); - BufferedReader br = new BufferedReader(new InputStreamReader(body.asInputStream())); - String read; - - while ((read = br.readLine()) != null) { - sb.append(read); - } - br.close(); - assertEquals("definitely not json",sb.toString()); + StringBuilder sb = new StringBuilder(); + BufferedReader br = new BufferedReader(new InputStreamReader(body.asInputStream())); + String read; + + while ((read = br.readLine()) != null) { + sb.append(read); + } + br.close(); + assertEquals("definitely not json", sb.toString()); } } + + + @Test + public void testRetryClientSync() throws Exception { + HttpServer httpServer = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), + 0), 0); + + // server will return success after 7 retries + AtomicInteger errorCounter = new AtomicInteger(); + + httpServer.createContext("/_cat/indices", exchange -> { + exchange.getResponseHeaders().put("Content-Type", Collections.singletonList(APPLICATION_JSON)); + exchange.getResponseHeaders().put("X-Elastic-Product", Collections.singletonList("Elasticsearch" + )); + if (errorCounter.get() > 6) { + exchange.sendResponseHeaders(200, 0); + OutputStream out = exchange.getResponseBody(); + String jsonRes = " [{\n" + + " \"health\": \"green\",\n" + + " \"status\": \"open\",\n" + + " \"index\": \"test\",\n" + + " \"uuid\": \"3iSkOlZAQVq2ir1hOtaVlw\",\n" + + " \"pri\": \"1\",\n" + + " \"rep\": \"1\",\n" + + " \"docs.count\": \"5\",\n" + + " \"docs.deleted\": \"0\",\n" + + " \"store.size\": \"8.8kb\",\n" + + " \"pri.store.size\": \"4.4kb\",\n" + + " \"dataset.size\": \"4.4kb\"\n" + + " }]"; + out.write(jsonRes.getBytes(StandardCharsets.UTF_8)); + out.close(); + } else { + exchange.sendResponseHeaders(503, 0); + OutputStream out = exchange.getResponseBody(); + out.write("{}".getBytes(StandardCharsets.UTF_8)); + out.close(); + errorCounter.incrementAndGet(); + } + }); + + httpServer.start(); + InetSocketAddress address = httpServer.getAddress(); + + RestClient restClient = RestClient + .builder(new HttpHost(address.getHostString(), address.getPort(), "http")) + .build(); + + // setting transport option + RestClientOptions options = new RestClientOptions(RequestOptions.DEFAULT, false, + BackoffPolicy.constantBackoff(50L, 8)); + + ElasticsearchTransport transport = new RestClientTransport( + restClient, new JacksonJsonpMapper(), options); + + ElasticsearchClient esClient = new ElasticsearchClient(transport); + + IndicesResponse res = esClient.cat().indices(); + + httpServer.stop(0); + + assertTrue(errorCounter.get() == 7); + assertEquals("test", res.valueBody().get(0).index()); + } + + @Test + public void testRetryClientAsync() throws Exception { + HttpServer httpServer = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), + 0), 0); + + // server will return success after 7 retries + AtomicInteger errorCounter = new AtomicInteger(); + + httpServer.createContext("/_cat/indices", exchange -> { + exchange.getResponseHeaders().put("Content-Type", Collections.singletonList(APPLICATION_JSON)); + exchange.getResponseHeaders().put("X-Elastic-Product", Collections.singletonList("Elasticsearch" + )); + if (errorCounter.get() > 6) { + exchange.sendResponseHeaders(200, 0); + OutputStream out = exchange.getResponseBody(); + String jsonRes = " [{\n" + + " \"health\": \"green\",\n" + + " \"status\": \"open\",\n" + + " \"index\": \"test\",\n" + + " \"uuid\": \"3iSkOlZAQVq2ir1hOtaVlw\",\n" + + " \"pri\": \"1\",\n" + + " \"rep\": \"1\",\n" + + " \"docs.count\": \"5\",\n" + + " \"docs.deleted\": \"0\",\n" + + " \"store.size\": \"8.8kb\",\n" + + " \"pri.store.size\": \"4.4kb\",\n" + + " \"dataset.size\": \"4.4kb\"\n" + + " }]"; + out.write(jsonRes.getBytes(StandardCharsets.UTF_8)); + out.close(); + } else { + exchange.sendResponseHeaders(503, 0); + OutputStream out = exchange.getResponseBody(); + out.write("{}".getBytes(StandardCharsets.UTF_8)); + out.close(); + errorCounter.incrementAndGet(); + } + }); + + httpServer.start(); + InetSocketAddress address = httpServer.getAddress(); + + RestClient restClient = RestClient + .builder(new HttpHost(address.getHostString(), address.getPort(), "http")) + .build(); + + // setting transport option + RestClientOptions options = new RestClientOptions(RequestOptions.DEFAULT, false, + BackoffPolicy.constantBackoff(50L, 8)); + + ElasticsearchTransport transport = new RestClientTransport( + restClient, new JacksonJsonpMapper(), options); + + ElasticsearchAsyncClient esClient = new ElasticsearchAsyncClient(transport); + + CompletableFuture fut = esClient.cat().indices(); + + IndicesResponse res = fut.get(); + + httpServer.stop(0); + + assertTrue(errorCounter.get() == 7); + assertTrue(fut.isDone()); + assertEquals("test", res.valueBody().get(0).index()); + } } diff --git a/java-client/src/test/java/co/elastic/clients/transport/rest_client/RestClientOptionsTest.java b/java-client/src/test/java/co/elastic/clients/transport/rest_client/RestClientOptionsTest.java index cf8995944..7196c9cba 100644 --- a/java-client/src/test/java/co/elastic/clients/transport/rest_client/RestClientOptionsTest.java +++ b/java-client/src/test/java/co/elastic/clients/transport/rest_client/RestClientOptionsTest.java @@ -22,6 +22,7 @@ import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.json.JsonpMapper; import co.elastic.clients.json.SimpleJsonpMapper; +import co.elastic.clients.transport.BackoffPolicy; import co.elastic.clients.transport.ElasticsearchTransport; import co.elastic.clients.transport.Version; import co.elastic.clients.transport.endpoints.BooleanResponse; @@ -192,7 +193,7 @@ void testRequestOptionsOverridingBuiltin() throws Exception { new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort(), "http") ).build(); - ElasticsearchTransport transport = newRestClientTransport(llrc, new SimpleJsonpMapper(), new RestClientOptions(options,false)); + ElasticsearchTransport transport = newRestClientTransport(llrc, new SimpleJsonpMapper(), new RestClientOptions(options,false, BackoffPolicy.noBackoff())); ElasticsearchClient esClient = new ElasticsearchClient(transport); // Should not override client meta String id = checkHeaders(esClient);