From 2aad9b04e45a4e7ec612fd9d467c4596685fe0a0 Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Tue, 25 Feb 2025 16:58:23 +0100 Subject: [PATCH 1/5] added backoff policy field to transport --- .../transport/DefaultTransportOptions.java | 21 +++++++++++++++-- .../clients/transport/TransportOptions.java | 4 ++++ .../rest_client/RestClientOptions.java | 23 ++++++++++++++++--- .../documentation/DocTestsTransport.java | 7 ++++++ .../clients/transport/TransportTest.java | 2 +- .../rest_client/RestClientOptionsTest.java | 3 ++- 6 files changed, 53 insertions(+), 7 deletions(-) diff --git a/java-client/src/main/java/co/elastic/clients/transport/DefaultTransportOptions.java b/java-client/src/main/java/co/elastic/clients/transport/DefaultTransportOptions.java index cdc35639e..f74fb8981 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/DefaultTransportOptions.java +++ b/java-client/src/main/java/co/elastic/clients/transport/DefaultTransportOptions.java @@ -38,6 +38,7 @@ public class DefaultTransportOptions implements TransportOptions { private final Map parameters; private final Function, Boolean> onWarnings; private boolean keepResponseBodyOnException; + private BackoffPolicy backoffPolicy; public static final DefaultTransportOptions EMPTY = new DefaultTransportOptions(); @@ -49,10 +50,12 @@ public DefaultTransportOptions( @Nullable HeaderMap headers, @Nullable Map parameters, @Nullable Function, Boolean> onWarnings, - boolean keepResponseBodyOnException + boolean keepResponseBodyOnException, + BackoffPolicy backoffPolicy ) { this(headers,parameters,onWarnings); this.keepResponseBodyOnException = keepResponseBodyOnException; + this.backoffPolicy = backoffPolicy; } public DefaultTransportOptions( @@ -65,10 +68,11 @@ public DefaultTransportOptions( Collections.emptyMap() : Collections.unmodifiableMap(parameters); this.onWarnings = onWarnings; this.keepResponseBodyOnException = false; + this.backoffPolicy = BackoffPolicy.noBackoff(); } protected DefaultTransportOptions(AbstractBuilder builder) { - this(builder.headers, builder.parameters, builder.onWarnings, builder.keepResponseBodyOnException); + this(builder.headers, builder.parameters, builder.onWarnings, builder.keepResponseBodyOnException, builder.backoffPolicy); } public static DefaultTransportOptions of(@Nullable TransportOptions options) { @@ -105,6 +109,11 @@ public boolean keepResponseBodyOnException() { return keepResponseBodyOnException; } + @Override + public BackoffPolicy backoffPolicy() { + return backoffPolicy; + } + @Override public Builder toBuilder() { return new Builder(this); @@ -129,6 +138,7 @@ public abstract static class AbstractBuilder parameters; private Function, Boolean> onWarnings; private boolean keepResponseBodyOnException; + private BackoffPolicy backoffPolicy; public AbstractBuilder() { } @@ -138,6 +148,7 @@ public AbstractBuilder(DefaultTransportOptions options) { this.parameters = copyOrNull(options.parameters); this.onWarnings = options.onWarnings; this.keepResponseBodyOnException = options.keepResponseBodyOnException; + this.backoffPolicy = options.backoffPolicy; } protected abstract BuilderT self(); @@ -148,6 +159,12 @@ public BuilderT keepResponseBodyOnException(boolean value) { return self(); } + @Override + public BuilderT backoffPolicy(BackoffPolicy policy) { + this.backoffPolicy = policy; + return self(); + } + @Override public BuilderT addHeader(String name, String value) { if (name.equalsIgnoreCase(HeaderMap.CLIENT_META)) { diff --git a/java-client/src/main/java/co/elastic/clients/transport/TransportOptions.java b/java-client/src/main/java/co/elastic/clients/transport/TransportOptions.java index 9cbbdd40d..904e4ad20 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/TransportOptions.java +++ b/java-client/src/main/java/co/elastic/clients/transport/TransportOptions.java @@ -47,6 +47,8 @@ public interface TransportOptions { Builder toBuilder(); + BackoffPolicy backoffPolicy(); + default TransportOptions with(Consumer fn) { Builder builder = toBuilder(); fn.accept(builder); @@ -73,5 +75,7 @@ interface Builder extends ObjectBuilder { * streamed by the http library. */ Builder keepResponseBodyOnException(boolean value); + + Builder backoffPolicy(BackoffPolicy policy); } } diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientOptions.java b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientOptions.java index 842a45c62..2f7199b9a 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientOptions.java +++ b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientOptions.java @@ -19,6 +19,7 @@ package co.elastic.clients.transport.rest_client; +import co.elastic.clients.transport.BackoffPolicy; import co.elastic.clients.transport.TransportOptions; import co.elastic.clients.transport.Version; import co.elastic.clients.transport.http.HeaderMap; @@ -44,6 +45,8 @@ public class RestClientOptions implements TransportOptions { boolean keepResponseBodyOnException; + BackoffPolicy backoffPolicy; + @VisibleForTesting static final String CLIENT_META_VALUE = getClientMeta(); @VisibleForTesting @@ -65,8 +68,9 @@ static RestClientOptions of(@Nullable TransportOptions options) { return builder.build(); } - public RestClientOptions(RequestOptions options, boolean keepResponseBodyOnException) { + public RestClientOptions(RequestOptions options, boolean keepResponseBodyOnException, BackoffPolicy backoffPolicy) { this.keepResponseBodyOnException = keepResponseBodyOnException; + this.backoffPolicy = backoffPolicy; this.options = addBuiltinHeaders(options.toBuilder()).build(); } @@ -107,6 +111,11 @@ public boolean keepResponseBodyOnException() { return this.keepResponseBodyOnException; } + @Override + public BackoffPolicy backoffPolicy() { + return backoffPolicy; + } + @Override public Builder toBuilder() { return new Builder(options.toBuilder()); @@ -118,6 +127,8 @@ public static class Builder implements TransportOptions.Builder { private boolean keepResponseBodyOnException; + private BackoffPolicy backoffPolicy; + public Builder(RequestOptions.Builder builder) { this.builder = builder; } @@ -197,14 +208,20 @@ public TransportOptions.Builder keepResponseBodyOnException(boolean value) { return this; } + @Override + public TransportOptions.Builder backoffPolicy(BackoffPolicy policy) { + this.backoffPolicy = policy; + return this; + } + @Override public RestClientOptions build() { - return new RestClientOptions(addBuiltinHeaders(builder).build(), keepResponseBodyOnException); + return new RestClientOptions(addBuiltinHeaders(builder).build(), keepResponseBodyOnException, backoffPolicy); } } static RestClientOptions initialOptions() { - return new RestClientOptions(SafeResponseConsumer.DEFAULT_REQUEST_OPTIONS, false); + return new RestClientOptions(SafeResponseConsumer.DEFAULT_REQUEST_OPTIONS, false, BackoffPolicy.noBackoff()); } private static RequestOptions.Builder addBuiltinHeaders(RequestOptions.Builder builder) { diff --git a/java-client/src/test/java/co/elastic/clients/documentation/DocTestsTransport.java b/java-client/src/test/java/co/elastic/clients/documentation/DocTestsTransport.java index 6d1c41eb5..7c8503515 100644 --- a/java-client/src/test/java/co/elastic/clients/documentation/DocTestsTransport.java +++ b/java-client/src/test/java/co/elastic/clients/documentation/DocTestsTransport.java @@ -21,6 +21,7 @@ import co.elastic.clients.json.JsonpMapper; import co.elastic.clients.json.jackson.JacksonJsonpMapper; +import co.elastic.clients.transport.BackoffPolicy; import co.elastic.clients.transport.ElasticsearchTransport; import co.elastic.clients.transport.Endpoint; import co.elastic.clients.transport.TransportOptions; @@ -69,10 +70,16 @@ public boolean keepResponseBodyOnException() { return false; } + @Override + public BackoffPolicy backoffPolicy() { + return BackoffPolicy.noBackoff(); + } + @Override public Builder toBuilder() { return null; } + }; public void setResult(Object result) { diff --git a/java-client/src/test/java/co/elastic/clients/transport/TransportTest.java b/java-client/src/test/java/co/elastic/clients/transport/TransportTest.java index d25466bbd..b9a24187e 100644 --- a/java-client/src/test/java/co/elastic/clients/transport/TransportTest.java +++ b/java-client/src/test/java/co/elastic/clients/transport/TransportTest.java @@ -122,7 +122,7 @@ public void testOriginalJsonBodyRetrievalException() throws Exception { assertNotEquals(RepeatableBodyResponse.class, ex.response().getClass()); // setting transport option - RestClientOptions options = new RestClientOptions(RequestOptions.DEFAULT, true); + RestClientOptions options = new RestClientOptions(RequestOptions.DEFAULT, true,BackoffPolicy.noBackoff()); ElasticsearchTransport transport = new RestClientTransport( restClient, new JacksonJsonpMapper(), options); diff --git a/java-client/src/test/java/co/elastic/clients/transport/rest_client/RestClientOptionsTest.java b/java-client/src/test/java/co/elastic/clients/transport/rest_client/RestClientOptionsTest.java index cf8995944..7196c9cba 100644 --- a/java-client/src/test/java/co/elastic/clients/transport/rest_client/RestClientOptionsTest.java +++ b/java-client/src/test/java/co/elastic/clients/transport/rest_client/RestClientOptionsTest.java @@ -22,6 +22,7 @@ import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.json.JsonpMapper; import co.elastic.clients.json.SimpleJsonpMapper; +import co.elastic.clients.transport.BackoffPolicy; import co.elastic.clients.transport.ElasticsearchTransport; import co.elastic.clients.transport.Version; import co.elastic.clients.transport.endpoints.BooleanResponse; @@ -192,7 +193,7 @@ void testRequestOptionsOverridingBuiltin() throws Exception { new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort(), "http") ).build(); - ElasticsearchTransport transport = newRestClientTransport(llrc, new SimpleJsonpMapper(), new RestClientOptions(options,false)); + ElasticsearchTransport transport = newRestClientTransport(llrc, new SimpleJsonpMapper(), new RestClientOptions(options,false, BackoffPolicy.noBackoff())); ElasticsearchClient esClient = new ElasticsearchClient(transport); // Should not override client meta String id = checkHeaders(esClient); From 7843f17b0c50352a70420760b605b9e8b7d419c4 Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Tue, 25 Feb 2025 18:04:19 +0100 Subject: [PATCH 2/5] base retry class --- .../transport/ElasticsearchTransportBase.java | 4 +++ .../RetryRestClientHttpClient.java | 28 +++++++++++++++++++ 2 files changed, 32 insertions(+) create mode 100644 java-client/src/main/java/co/elastic/clients/transport/rest_client/RetryRestClientHttpClient.java diff --git a/java-client/src/main/java/co/elastic/clients/transport/ElasticsearchTransportBase.java b/java-client/src/main/java/co/elastic/clients/transport/ElasticsearchTransportBase.java index 26237501e..66e16ec6f 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/ElasticsearchTransportBase.java +++ b/java-client/src/main/java/co/elastic/clients/transport/ElasticsearchTransportBase.java @@ -111,6 +111,10 @@ public ElasticsearchTransportBase( instrumentation = NoopInstrumentation.INSTANCE; } this.instrumentation = instrumentation; + + if (this.transportOptions.backoffPolicy()!=BackoffPolicy.noBackoff()){ + + } } @Override diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RetryRestClientHttpClient.java b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RetryRestClientHttpClient.java new file mode 100644 index 000000000..0abd756a8 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RetryRestClientHttpClient.java @@ -0,0 +1,28 @@ +package co.elastic.clients.transport.rest_client; + +import co.elastic.clients.transport.TransportOptions; +import co.elastic.clients.transport.http.TransportHttpClient; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +public class RetryRestClientHttpClient implements TransportHttpClient { + + private TransportHttpClient delegate; + + @Override + public Response performRequest(String endpointId, @Nullable Node node, Request request, TransportOptions options) throws IOException { + return delegate.performRequest(endpointId, node, request, options); + } + + @Override + public CompletableFuture performRequestAsync(String endpointId, @Nullable Node node, Request request, TransportOptions options) { + return delegate.performRequestAsync(endpointId, node, request, options); + } + + @Override + public void close() throws IOException { + delegate.close(); + } +} From 4eb7ed05b4aa12442504d050a571dd49b65e4a8d Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Wed, 26 Feb 2025 13:10:06 +0100 Subject: [PATCH 3/5] sync retry draft + test --- .../transport/ElasticsearchTransportBase.java | 13 ++- .../RetryRestClientHttpClient.java | 43 ++++++++- .../clients/transport/TransportTest.java | 93 ++++++++++++++++--- 3 files changed, 130 insertions(+), 19 deletions(-) diff --git a/java-client/src/main/java/co/elastic/clients/transport/ElasticsearchTransportBase.java b/java-client/src/main/java/co/elastic/clients/transport/ElasticsearchTransportBase.java index 66e16ec6f..a9428c5d2 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/ElasticsearchTransportBase.java +++ b/java-client/src/main/java/co/elastic/clients/transport/ElasticsearchTransportBase.java @@ -34,6 +34,7 @@ import co.elastic.clients.transport.instrumentation.Instrumentation; import co.elastic.clients.transport.instrumentation.NoopInstrumentation; import co.elastic.clients.transport.instrumentation.OpenTelemetryForElasticsearch; +import co.elastic.clients.transport.rest_client.RetryRestClientHttpClient; import co.elastic.clients.util.ApiTypeHelper; import co.elastic.clients.util.BinaryData; import co.elastic.clients.util.ByteArrayBinaryData; @@ -100,9 +101,15 @@ public ElasticsearchTransportBase( @Nullable Instrumentation instrumentation ) { this.mapper = jsonpMapper; - this.httpClient = httpClient; this.transportOptions = httpClient.createOptions(options); + if (this.transportOptions.backoffPolicy()!=BackoffPolicy.noBackoff()){ + this.httpClient = new RetryRestClientHttpClient(httpClient,this.transportOptions.backoffPolicy()); + } + else { + this.httpClient = httpClient; + } + // If no instrumentation is provided, fallback to OpenTelemetry and ultimately noop if (instrumentation == null) { instrumentation = OpenTelemetryForElasticsearch.getDefault(); @@ -111,10 +118,6 @@ public ElasticsearchTransportBase( instrumentation = NoopInstrumentation.INSTANCE; } this.instrumentation = instrumentation; - - if (this.transportOptions.backoffPolicy()!=BackoffPolicy.noBackoff()){ - - } } @Override diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RetryRestClientHttpClient.java b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RetryRestClientHttpClient.java index 0abd756a8..370521a1e 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RetryRestClientHttpClient.java +++ b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RetryRestClientHttpClient.java @@ -1,23 +1,60 @@ package co.elastic.clients.transport.rest_client; +import co.elastic.clients.transport.BackoffPolicy; import co.elastic.clients.transport.TransportOptions; import co.elastic.clients.transport.http.TransportHttpClient; +import org.elasticsearch.client.ResponseException; import javax.annotation.Nullable; import java.io.IOException; +import java.util.Iterator; import java.util.concurrent.CompletableFuture; public class RetryRestClientHttpClient implements TransportHttpClient { private TransportHttpClient delegate; + private BackoffPolicy backoffPolicy; + + public RetryRestClientHttpClient(TransportHttpClient delegate, BackoffPolicy backoffPolicy) { + this.delegate = delegate; + this.backoffPolicy = backoffPolicy; + } @Override - public Response performRequest(String endpointId, @Nullable Node node, Request request, TransportOptions options) throws IOException { - return delegate.performRequest(endpointId, node, request, options); + public Response performRequest(String endpointId, @Nullable Node node, Request request, + TransportOptions options) throws IOException { + return performRequestRetry(endpointId, node, request, options, backoffPolicy.iterator()); + } + + public Response performRequestRetry(String endpointId, @Nullable Node node, Request request, + TransportOptions options, Iterator backoffIter) throws IOException { + try { + return delegate.performRequest(endpointId, node, request, options); + } catch (ResponseException e) { + if (e.getResponse().getStatusLine().getStatusCode() == 503) { // TODO list of statuses + // synchronous retry + if (backoffIter.hasNext()) { + try { + Thread.sleep(backoffIter.next()); // TODO ... no + } catch (InterruptedException ie) { + } + System.out.println("Retrying"); + return performRequestRetry(endpointId, node, request, options, backoffIter); + } + else { + // retries finished + throw e; + } + } else { + // error not retryable + throw e; + } + } } @Override - public CompletableFuture performRequestAsync(String endpointId, @Nullable Node node, Request request, TransportOptions options) { + public CompletableFuture performRequestAsync(String endpointId, @Nullable Node node, + Request request, TransportOptions options) { return delegate.performRequestAsync(endpointId, node, request, options); } diff --git a/java-client/src/test/java/co/elastic/clients/transport/TransportTest.java b/java-client/src/test/java/co/elastic/clients/transport/TransportTest.java index b9a24187e..570511e01 100644 --- a/java-client/src/test/java/co/elastic/clients/transport/TransportTest.java +++ b/java-client/src/test/java/co/elastic/clients/transport/TransportTest.java @@ -20,15 +20,18 @@ package co.elastic.clients.transport; import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch.cat.IndicesResponse; import co.elastic.clients.json.jackson.JacksonJsonpMapper; import co.elastic.clients.transport.http.RepeatableBodyResponse; import co.elastic.clients.transport.rest_client.RestClientOptions; import co.elastic.clients.transport.rest_client.RestClientTransport; +import co.elastic.clients.transport.rest_client.RetryRestClientHttpClient; import co.elastic.clients.util.BinaryData; import com.sun.net.httpserver.HttpServer; import org.apache.http.HttpHost; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -39,7 +42,9 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; import static co.elastic.clients.util.ContentType.APPLICATION_JSON; @@ -122,7 +127,8 @@ public void testOriginalJsonBodyRetrievalException() throws Exception { assertNotEquals(RepeatableBodyResponse.class, ex.response().getClass()); // setting transport option - RestClientOptions options = new RestClientOptions(RequestOptions.DEFAULT, true,BackoffPolicy.noBackoff()); + RestClientOptions options = new RestClientOptions(RequestOptions.DEFAULT, true, + BackoffPolicy.noBackoff()); ElasticsearchTransport transport = new RestClientTransport( restClient, new JacksonJsonpMapper(), options); @@ -139,17 +145,82 @@ public void testOriginalJsonBodyRetrievalException() throws Exception { assertEquals(200, ex.statusCode()); assertEquals(RepeatableBodyResponse.class, ex.response().getClass()); - try (RepeatableBodyResponse repeatableResponse = (RepeatableBodyResponse) ex.response()){ + try (RepeatableBodyResponse repeatableResponse = (RepeatableBodyResponse) ex.response()) { BinaryData body = repeatableResponse.body(); - StringBuilder sb = new StringBuilder(); - BufferedReader br = new BufferedReader(new InputStreamReader(body.asInputStream())); - String read; - - while ((read = br.readLine()) != null) { - sb.append(read); - } - br.close(); - assertEquals("definitely not json",sb.toString()); + StringBuilder sb = new StringBuilder(); + BufferedReader br = new BufferedReader(new InputStreamReader(body.asInputStream())); + String read; + + while ((read = br.readLine()) != null) { + sb.append(read); + } + br.close(); + assertEquals("definitely not json", sb.toString()); } } + + + @Test + public void testRetryClientSync() throws Exception { + HttpServer httpServer = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), + 0), 0); + + // server will return success after 7 retries + AtomicInteger errorCounter = new AtomicInteger(); + + httpServer.createContext("/_cat/indices", exchange -> { + exchange.getResponseHeaders().put("Content-Type", Collections.singletonList(APPLICATION_JSON)); + exchange.getResponseHeaders().put("X-Elastic-Product", Collections.singletonList("Elasticsearch" + )); + if (errorCounter.get()>6){ + exchange.sendResponseHeaders(200, 0); + OutputStream out = exchange.getResponseBody(); + String jsonRes = " [{\n" + + " \"health\": \"green\",\n" + + " \"status\": \"open\",\n" + + " \"index\": \"test\",\n" + + " \"uuid\": \"3iSkOlZAQVq2ir1hOtaVlw\",\n" + + " \"pri\": \"1\",\n" + + " \"rep\": \"1\",\n" + + " \"docs.count\": \"5\",\n" + + " \"docs.deleted\": \"0\",\n" + + " \"store.size\": \"8.8kb\",\n" + + " \"pri.store.size\": \"4.4kb\",\n" + + " \"dataset.size\": \"4.4kb\"\n" + + " }]"; + out.write(jsonRes.getBytes(StandardCharsets.UTF_8)); + out.close(); + } + else { + exchange.sendResponseHeaders(503, 0); + OutputStream out = exchange.getResponseBody(); + out.write("{}".getBytes(StandardCharsets.UTF_8)); + out.close(); + errorCounter.incrementAndGet(); + } + }); + + httpServer.start(); + InetSocketAddress address = httpServer.getAddress(); + + RestClient restClient = RestClient + .builder(new HttpHost(address.getHostString(), address.getPort(), "http")) + .build(); + + // setting transport option + RestClientOptions options = new RestClientOptions(RequestOptions.DEFAULT, false, + BackoffPolicy.constantBackoff(50L,8)); + + ElasticsearchTransport transport = new RestClientTransport( + restClient, new JacksonJsonpMapper(), options); + + ElasticsearchClient esClient = new ElasticsearchClient(transport); + + IndicesResponse res = esClient.cat().indices(); + + httpServer.stop(0); + + assertTrue(errorCounter.get() == 7); + assertEquals("test", res.valueBody().get(0).index()); + } } From cf02e28f31ba35311a710d7afd0d92d659ebd690 Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Wed, 26 Feb 2025 16:36:32 +0100 Subject: [PATCH 4/5] async retry draft + test --- .../RetryRestClientHttpClient.java | 39 +++++++--- .../clients/transport/TransportTest.java | 78 +++++++++++++++++-- 2 files changed, 101 insertions(+), 16 deletions(-) diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RetryRestClientHttpClient.java b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RetryRestClientHttpClient.java index 370521a1e..12a4a1c55 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RetryRestClientHttpClient.java +++ b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RetryRestClientHttpClient.java @@ -35,27 +35,48 @@ public Response performRequestRetry(String endpointId, @Nullable Node node, Requ // synchronous retry if (backoffIter.hasNext()) { try { - Thread.sleep(backoffIter.next()); // TODO ... no + Thread.sleep(backoffIter.next()); // TODO ... no? } catch (InterruptedException ie) { } System.out.println("Retrying"); return performRequestRetry(endpointId, node, request, options, backoffIter); } - else { - // retries finished - throw e; - } - } else { - // error not retryable - throw e; } + // error not retryable + throw e; } } @Override public CompletableFuture performRequestAsync(String endpointId, @Nullable Node node, Request request, TransportOptions options) { - return delegate.performRequestAsync(endpointId, node, request, options); + return performRequestAsyncRetry(endpointId, node, request, options, backoffPolicy.iterator()); + } + + public CompletableFuture performRequestAsyncRetry(String endpointId, @Nullable Node node, + Request request, + TransportOptions options, + Iterator backoffIter) { + CompletableFuture fut = delegate.performRequestAsync(endpointId, node, request, options); + try { + fut.get(); // TODO is this problematic? + return fut; + } catch (Exception e) { + if (e.getCause() instanceof ResponseException) { + if (((ResponseException) e.getCause()).getResponse().getStatusLine().getStatusCode() == 503) { // TODO list of statuses + if (backoffIter.hasNext()) { + try { + Thread.sleep(backoffIter.next()); // TODO ... no? + } catch (InterruptedException ie) { + fut.completeExceptionally(e); // TODO masking internal errors and just returning original error okay? + } + System.out.println("Retrying"); + return performRequestAsyncRetry(endpointId, node, request, options, backoffIter); + } + } + } + return fut; + } } @Override diff --git a/java-client/src/test/java/co/elastic/clients/transport/TransportTest.java b/java-client/src/test/java/co/elastic/clients/transport/TransportTest.java index 570511e01..cee6d2ebf 100644 --- a/java-client/src/test/java/co/elastic/clients/transport/TransportTest.java +++ b/java-client/src/test/java/co/elastic/clients/transport/TransportTest.java @@ -19,19 +19,18 @@ package co.elastic.clients.transport; +import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient; import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch.cat.IndicesResponse; import co.elastic.clients.json.jackson.JacksonJsonpMapper; import co.elastic.clients.transport.http.RepeatableBodyResponse; import co.elastic.clients.transport.rest_client.RestClientOptions; import co.elastic.clients.transport.rest_client.RestClientTransport; -import co.elastic.clients.transport.rest_client.RetryRestClientHttpClient; import co.elastic.clients.util.BinaryData; import com.sun.net.httpserver.HttpServer; import org.apache.http.HttpHost; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; -import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -42,8 +41,8 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; -import java.util.Arrays; import java.util.Collections; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import static co.elastic.clients.util.ContentType.APPLICATION_JSON; @@ -172,7 +171,7 @@ public void testRetryClientSync() throws Exception { exchange.getResponseHeaders().put("Content-Type", Collections.singletonList(APPLICATION_JSON)); exchange.getResponseHeaders().put("X-Elastic-Product", Collections.singletonList("Elasticsearch" )); - if (errorCounter.get()>6){ + if (errorCounter.get() > 6) { exchange.sendResponseHeaders(200, 0); OutputStream out = exchange.getResponseBody(); String jsonRes = " [{\n" + @@ -190,8 +189,7 @@ public void testRetryClientSync() throws Exception { " }]"; out.write(jsonRes.getBytes(StandardCharsets.UTF_8)); out.close(); - } - else { + } else { exchange.sendResponseHeaders(503, 0); OutputStream out = exchange.getResponseBody(); out.write("{}".getBytes(StandardCharsets.UTF_8)); @@ -209,7 +207,7 @@ public void testRetryClientSync() throws Exception { // setting transport option RestClientOptions options = new RestClientOptions(RequestOptions.DEFAULT, false, - BackoffPolicy.constantBackoff(50L,8)); + BackoffPolicy.constantBackoff(50L, 8)); ElasticsearchTransport transport = new RestClientTransport( restClient, new JacksonJsonpMapper(), options); @@ -223,4 +221,70 @@ public void testRetryClientSync() throws Exception { assertTrue(errorCounter.get() == 7); assertEquals("test", res.valueBody().get(0).index()); } + + @Test + public void testRetryClientAsync() throws Exception { + HttpServer httpServer = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), + 0), 0); + + // server will return success after 7 retries + AtomicInteger errorCounter = new AtomicInteger(); + + httpServer.createContext("/_cat/indices", exchange -> { + exchange.getResponseHeaders().put("Content-Type", Collections.singletonList(APPLICATION_JSON)); + exchange.getResponseHeaders().put("X-Elastic-Product", Collections.singletonList("Elasticsearch" + )); + if (errorCounter.get() > 6) { + exchange.sendResponseHeaders(200, 0); + OutputStream out = exchange.getResponseBody(); + String jsonRes = " [{\n" + + " \"health\": \"green\",\n" + + " \"status\": \"open\",\n" + + " \"index\": \"test\",\n" + + " \"uuid\": \"3iSkOlZAQVq2ir1hOtaVlw\",\n" + + " \"pri\": \"1\",\n" + + " \"rep\": \"1\",\n" + + " \"docs.count\": \"5\",\n" + + " \"docs.deleted\": \"0\",\n" + + " \"store.size\": \"8.8kb\",\n" + + " \"pri.store.size\": \"4.4kb\",\n" + + " \"dataset.size\": \"4.4kb\"\n" + + " }]"; + out.write(jsonRes.getBytes(StandardCharsets.UTF_8)); + out.close(); + } else { + exchange.sendResponseHeaders(503, 0); + OutputStream out = exchange.getResponseBody(); + out.write("{}".getBytes(StandardCharsets.UTF_8)); + out.close(); + errorCounter.incrementAndGet(); + } + }); + + httpServer.start(); + InetSocketAddress address = httpServer.getAddress(); + + RestClient restClient = RestClient + .builder(new HttpHost(address.getHostString(), address.getPort(), "http")) + .build(); + + // setting transport option + RestClientOptions options = new RestClientOptions(RequestOptions.DEFAULT, false, + BackoffPolicy.constantBackoff(50L, 8)); + + ElasticsearchTransport transport = new RestClientTransport( + restClient, new JacksonJsonpMapper(), options); + + ElasticsearchAsyncClient esClient = new ElasticsearchAsyncClient(transport); + + CompletableFuture fut = esClient.cat().indices(); + + IndicesResponse res = fut.get(); + + httpServer.stop(0); + + assertTrue(errorCounter.get() == 7); + assertTrue(fut.isDone()); + assertEquals("test", res.valueBody().get(0).index()); + } } From 74dd40afe17f3949b8f3f5e74378c02104c6d4b7 Mon Sep 17 00:00:00 2001 From: Laura Trotta Date: Mon, 3 Mar 2025 17:57:33 +0100 Subject: [PATCH 5/5] async retry actually async --- .../transport/rest_client/RequestFuture.java | 26 ++++++++++ .../rest_client/RestClientHttpClient.java | 21 +------- .../RetryRestClientHttpClient.java | 49 +++++++++++-------- 3 files changed, 57 insertions(+), 39 deletions(-) create mode 100644 java-client/src/main/java/co/elastic/clients/transport/rest_client/RequestFuture.java diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RequestFuture.java b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RequestFuture.java new file mode 100644 index 000000000..7f0e378c1 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RequestFuture.java @@ -0,0 +1,26 @@ +package co.elastic.clients.transport.rest_client; + +import org.elasticsearch.client.Cancellable; + +import java.util.concurrent.CompletableFuture; + +/** + * The {@code Future} implementation returned by async requests. + * It wraps the RestClient's cancellable and propagates cancellation. + */ +public class RequestFuture extends CompletableFuture { + private volatile Cancellable cancellable; + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + boolean cancelled = super.cancel(mayInterruptIfRunning); + if (cancelled && cancellable != null) { + cancellable.cancel(); + } + return cancelled; + } + + public void setCancellable(Cancellable cancellable) { + this.cancellable = cancellable; + } +} diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientHttpClient.java b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientHttpClient.java index 1bcc06b05..63e25f53f 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientHttpClient.java +++ b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientHttpClient.java @@ -49,23 +49,6 @@ public class RestClientHttpClient implements TransportHttpClient { private static final ConcurrentHashMap ContentTypeCache = new ConcurrentHashMap<>(); - /** - * The {@code Future} implementation returned by async requests. - * It wraps the RestClient's cancellable and propagates cancellation. - */ - private static class RequestFuture extends CompletableFuture { - private volatile Cancellable cancellable; - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - boolean cancelled = super.cancel(mayInterruptIfRunning); - if (cancelled && cancellable != null) { - cancellable.cancel(); - } - return cancelled; - } - } - private final RestClient restClient; public RestClientHttpClient(RestClient restClient) { @@ -110,7 +93,7 @@ public CompletableFuture performRequestAsync( return future; } - future.cancellable = restClient.performRequestAsync(restRequest, new ResponseListener() { + future.setCancellable(restClient.performRequestAsync(restRequest, new ResponseListener() { @Override public void onSuccess(org.elasticsearch.client.Response response) { future.complete(new RestResponse(response)); @@ -120,7 +103,7 @@ public void onSuccess(org.elasticsearch.client.Response response) { public void onFailure(Exception exception) { future.completeExceptionally(exception); } - }); + })); return future; } diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RetryRestClientHttpClient.java b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RetryRestClientHttpClient.java index 12a4a1c55..5b82c9712 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RetryRestClientHttpClient.java +++ b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RetryRestClientHttpClient.java @@ -31,12 +31,13 @@ public Response performRequestRetry(String endpointId, @Nullable Node node, Requ try { return delegate.performRequest(endpointId, node, request, options); } catch (ResponseException e) { - if (e.getResponse().getStatusLine().getStatusCode() == 503) { // TODO list of statuses + if (e.getResponse().getStatusLine().getStatusCode() == 503) { // TODO list of statuses, configurable or hardcoded? // synchronous retry if (backoffIter.hasNext()) { try { - Thread.sleep(backoffIter.next()); // TODO ... no? + Thread.sleep(backoffIter.next()); } catch (InterruptedException ie) { + throw e; // TODO okay with masking IE and just returning original exception? } System.out.println("Retrying"); return performRequestRetry(endpointId, node, request, options, backoffIter); @@ -50,33 +51,41 @@ public Response performRequestRetry(String endpointId, @Nullable Node node, Requ @Override public CompletableFuture performRequestAsync(String endpointId, @Nullable Node node, Request request, TransportOptions options) { - return performRequestAsyncRetry(endpointId, node, request, options, backoffPolicy.iterator()); + RequestFuture futureResult = new RequestFuture<>(); + return performRequestAsyncRetry(endpointId, node, request, options, backoffPolicy.iterator(), + futureResult); } public CompletableFuture performRequestAsyncRetry(String endpointId, @Nullable Node node, Request request, TransportOptions options, - Iterator backoffIter) { - CompletableFuture fut = delegate.performRequestAsync(endpointId, node, request, options); - try { - fut.get(); // TODO is this problematic? - return fut; - } catch (Exception e) { - if (e.getCause() instanceof ResponseException) { - if (((ResponseException) e.getCause()).getResponse().getStatusLine().getStatusCode() == 503) { // TODO list of statuses - if (backoffIter.hasNext()) { - try { - Thread.sleep(backoffIter.next()); // TODO ... no? - } catch (InterruptedException ie) { - fut.completeExceptionally(e); // TODO masking internal errors and just returning original error okay? + Iterator backoffIter, + CompletableFuture futureResult) { + CompletableFuture res = delegate.performRequestAsync(endpointId, node, request, options); + + res.whenComplete((resp, e) -> { + if (e != null) { + if (e instanceof ResponseException) { + if (((ResponseException) e).getResponse().getStatusLine().getStatusCode() == 503) { // TODO list of statuses, configurable or hardcoded? + if (backoffIter.hasNext()) { + try { + Thread.sleep(backoffIter.next()); + } catch (InterruptedException ie) { + // TODO okay with masking IE and just returning original exception? + futureResult.completeExceptionally(e); + } + System.out.println("Retrying"); + performRequestAsyncRetry(endpointId, node, request, options, backoffIter,futureResult); } - System.out.println("Retrying"); - return performRequestAsyncRetry(endpointId, node, request, options, backoffIter); } } } - return fut; - } + else { + futureResult.complete(resp); + } + }); + + return futureResult; } @Override