Skip to content

Commit 86af1ce

Browse files
committed
[CCR] Retry when no index shard stats can be found (elastic#34852)
Index shard stats for the follower shard are fetched when a shard follow task is started. This is needed in order to bootstrap the shard follow task with the follower global checkpoint. Sometimes index shard stats are not available (e.g. during a restart); currently the task fails in that case, even though it is very likely that these stats will become available some time later.
1 parent 7ec6bc1 commit 86af1ce

File tree

4 files changed

+162
-83
lines changed

4 files changed

+162
-83
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,12 @@ private void fetchFollowerShardInfo(
205205
client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> {
206206
IndexStats indexStats = r.getIndex(shardId.getIndexName());
207207
if (indexStats == null) {
208-
errorHandler.accept(new IndexNotFoundException(shardId.getIndex()));
208+
IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
209+
if (indexMetaData != null) {
210+
errorHandler.accept(new ShardNotFoundException(shardId));
211+
} else {
212+
errorHandler.accept(new IndexNotFoundException(shardId.getIndex()));
213+
}
209214
return;
210215
}
211216

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java

+94
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88

99
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
1010
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
11+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
12+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
13+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
1114
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
1215
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
1316
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
@@ -24,9 +27,11 @@
2427
import org.elasticsearch.common.Priority;
2528
import org.elasticsearch.common.Strings;
2629
import org.elasticsearch.common.UUIDs;
30+
import org.elasticsearch.common.bytes.BytesReference;
2731
import org.elasticsearch.common.network.NetworkModule;
2832
import org.elasticsearch.common.settings.Settings;
2933
import org.elasticsearch.common.unit.TimeValue;
34+
import org.elasticsearch.common.xcontent.XContentBuilder;
3035
import org.elasticsearch.core.internal.io.IOUtils;
3136
import org.elasticsearch.env.NodeEnvironment;
3237
import org.elasticsearch.index.Index;
@@ -35,6 +40,7 @@
3540
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
3641
import org.elasticsearch.plugins.Plugin;
3742
import org.elasticsearch.script.ScriptService;
43+
import org.elasticsearch.tasks.TaskInfo;
3844
import org.elasticsearch.test.ESIntegTestCase;
3945
import org.elasticsearch.test.ESTestCase;
4046
import org.elasticsearch.test.InternalTestCluster;
@@ -47,6 +53,9 @@
4753
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
4854
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
4955
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
56+
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
57+
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
58+
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
5059
import org.junit.After;
5160
import org.junit.AfterClass;
5261
import org.junit.Before;
@@ -58,14 +67,17 @@
5867
import java.util.Collection;
5968
import java.util.Collections;
6069
import java.util.Locale;
70+
import java.util.Map;
6171
import java.util.concurrent.CountDownLatch;
6272
import java.util.concurrent.TimeUnit;
6373
import java.util.function.Function;
6474

75+
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
6576
import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING;
6677
import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING;
6778
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
6879
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
80+
import static org.hamcrest.Matchers.empty;
6981
import static org.hamcrest.Matchers.equalTo;
7082
import static org.hamcrest.Matchers.lessThanOrEqualTo;
7183

@@ -284,6 +296,88 @@ protected void ensureEmptyWriteBuffers() throws Exception {
284296
});
285297
}
286298

299+
protected void pauseFollow(String... indices) throws Exception {
300+
for (String index : indices) {
301+
final PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request(index);
302+
followerClient().execute(PauseFollowAction.INSTANCE, unfollowRequest).get();
303+
}
304+
ensureNoCcrTasks();
305+
}
306+
307+
protected void ensureNoCcrTasks() throws Exception {
308+
assertBusy(() -> {
309+
final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState();
310+
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
311+
assertThat(tasks.tasks(), empty());
312+
313+
ListTasksRequest listTasksRequest = new ListTasksRequest();
314+
listTasksRequest.setDetailed(true);
315+
ListTasksResponse listTasksResponse = followerClient().admin().cluster().listTasks(listTasksRequest).get();
316+
int numNodeTasks = 0;
317+
for (TaskInfo taskInfo : listTasksResponse.getTasks()) {
318+
if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) {
319+
numNodeTasks++;
320+
}
321+
}
322+
assertThat(numNodeTasks, equalTo(0));
323+
}, 30, TimeUnit.SECONDS);
324+
}
325+
326+
protected String getIndexSettings(final int numberOfShards, final int numberOfReplicas,
327+
final Map<String, String> additionalIndexSettings) throws IOException {
328+
final String settings;
329+
try (XContentBuilder builder = jsonBuilder()) {
330+
builder.startObject();
331+
{
332+
builder.startObject("settings");
333+
{
334+
builder.field("index.number_of_shards", numberOfShards);
335+
builder.field("index.number_of_replicas", numberOfReplicas);
336+
for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) {
337+
builder.field(additionalSetting.getKey(), additionalSetting.getValue());
338+
}
339+
}
340+
builder.endObject();
341+
builder.startObject("mappings");
342+
{
343+
builder.startObject("doc");
344+
{
345+
builder.startObject("properties");
346+
{
347+
builder.startObject("f");
348+
{
349+
builder.field("type", "integer");
350+
}
351+
builder.endObject();
352+
}
353+
builder.endObject();
354+
}
355+
builder.endObject();
356+
}
357+
builder.endObject();
358+
}
359+
builder.endObject();
360+
settings = BytesReference.bytes(builder).utf8ToString();
361+
}
362+
return settings;
363+
}
364+
365+
public static PutFollowAction.Request putFollow(String leaderIndex, String followerIndex) {
366+
PutFollowAction.Request request = new PutFollowAction.Request();
367+
request.setRemoteCluster("leader_cluster");
368+
request.setLeaderIndex(leaderIndex);
369+
request.setFollowRequest(resumeFollow(followerIndex));
370+
return request;
371+
}
372+
373+
public static ResumeFollowAction.Request resumeFollow(String followerIndex) {
374+
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
375+
request.setFollowerIndex(followerIndex);
376+
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
377+
request.setReadPollTimeout(TimeValue.timeValueMillis(10));
378+
return request;
379+
}
380+
287381
static void removeCCRRelatedMetadataFromClusterState(ClusterService clusterService) throws Exception {
288382
CountDownLatch latch = new CountDownLatch(1);
289383
clusterService.submitStateUpdateTask("remove-ccr-related-metadata", new ClusterStateUpdateTask() {

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java

-82
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
import org.apache.lucene.store.AlreadyClosedException;
1010
import org.elasticsearch.ElasticsearchException;
11-
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
1211
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
1312
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
1413
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
@@ -757,33 +756,6 @@ private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, f
757756
};
758757
}
759758

760-
private void pauseFollow(String... indices) throws Exception {
761-
for (String index : indices) {
762-
final PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request(index);
763-
followerClient().execute(PauseFollowAction.INSTANCE, unfollowRequest).get();
764-
}
765-
ensureNoCcrTasks();
766-
}
767-
768-
private void ensureNoCcrTasks() throws Exception {
769-
assertBusy(() -> {
770-
final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState();
771-
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
772-
assertThat(tasks.tasks(), empty());
773-
774-
ListTasksRequest listTasksRequest = new ListTasksRequest();
775-
listTasksRequest.setDetailed(true);
776-
ListTasksResponse listTasksResponse = followerClient().admin().cluster().listTasks(listTasksRequest).get();
777-
int numNodeTasks = 0;
778-
for (TaskInfo taskInfo : listTasksResponse.getTasks()) {
779-
if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) {
780-
numNodeTasks++;
781-
}
782-
}
783-
assertThat(numNodeTasks, equalTo(0));
784-
}, 30, TimeUnit.SECONDS);
785-
}
786-
787759
private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int value) {
788760
return () -> {
789761
final GetResponse getResponse = followerClient().prepareGet("index2", "doc", Integer.toString(value)).get();
@@ -793,45 +765,6 @@ private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int valu
793765
};
794766
}
795767

796-
private String getIndexSettings(final int numberOfShards, final int numberOfReplicas,
797-
final Map<String, String> additionalIndexSettings) throws IOException {
798-
final String settings;
799-
try (XContentBuilder builder = jsonBuilder()) {
800-
builder.startObject();
801-
{
802-
builder.startObject("settings");
803-
{
804-
builder.field("index.number_of_shards", numberOfShards);
805-
builder.field("index.number_of_replicas", numberOfReplicas);
806-
for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) {
807-
builder.field(additionalSetting.getKey(), additionalSetting.getValue());
808-
}
809-
}
810-
builder.endObject();
811-
builder.startObject("mappings");
812-
{
813-
builder.startObject("doc");
814-
{
815-
builder.startObject("properties");
816-
{
817-
builder.startObject("f");
818-
{
819-
builder.field("type", "integer");
820-
}
821-
builder.endObject();
822-
}
823-
builder.endObject();
824-
}
825-
builder.endObject();
826-
}
827-
builder.endObject();
828-
}
829-
builder.endObject();
830-
settings = BytesReference.bytes(builder).utf8ToString();
831-
}
832-
return settings;
833-
}
834-
835768
private String getIndexSettingsWithNestedMapping(final int numberOfShards, final int numberOfReplicas,
836769
final Map<String, String> additionalIndexSettings) throws IOException {
837770
final String settings;
@@ -969,19 +902,4 @@ private void assertTotalNumberOfOptimizedIndexing(Index followerIndex, int numbe
969902
});
970903
}
971904

972-
public static PutFollowAction.Request putFollow(String leaderIndex, String followerIndex) {
973-
PutFollowAction.Request request = new PutFollowAction.Request();
974-
request.setRemoteCluster("leader_cluster");
975-
request.setLeaderIndex(leaderIndex);
976-
request.setFollowRequest(resumeFollow(followerIndex));
977-
return request;
978-
}
979-
980-
public static ResumeFollowAction.Request resumeFollow(String followerIndex) {
981-
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
982-
request.setFollowerIndex(followerIndex);
983-
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
984-
request.setReadPollTimeout(TimeValue.timeValueMillis(10));
985-
return request;
986-
}
987905
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ccr;
8+
9+
import org.elasticsearch.common.xcontent.XContentType;
10+
import org.elasticsearch.index.IndexSettings;
11+
import org.elasticsearch.xpack.CcrIntegTestCase;
12+
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
13+
14+
import java.util.Locale;
15+
16+
import static java.util.Collections.singletonMap;
17+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
18+
import static org.hamcrest.Matchers.equalTo;
19+
20+
public class RestartIndexFollowingIT extends CcrIntegTestCase {
21+
22+
@Override
23+
protected int numberOfNodesPerCluster() {
24+
return 1;
25+
}
26+
27+
public void testFollowIndex() throws Exception {
28+
final String leaderIndexSettings = getIndexSettings(1, 0,
29+
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
30+
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true");
31+
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
32+
ensureLeaderGreen("index1");
33+
34+
final PutFollowAction.Request followRequest = putFollow("index1", "index2");
35+
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
36+
37+
final long firstBatchNumDocs = randomIntBetween(2, 64);
38+
logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
39+
for (int i = 0; i < firstBatchNumDocs; i++) {
40+
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
41+
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
42+
}
43+
44+
assertBusy(() -> {
45+
assertThat(followerClient().prepareSearch("index2").get().getHits().totalHits, equalTo(firstBatchNumDocs));
46+
});
47+
48+
getFollowerCluster().fullRestart();
49+
ensureFollowerGreen("index2");
50+
51+
final long secondBatchNumDocs = randomIntBetween(2, 64);
52+
for (int i = 0; i < secondBatchNumDocs; i++) {
53+
leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get();
54+
}
55+
56+
assertBusy(() -> {
57+
assertThat(followerClient().prepareSearch("index2").get().getHits().totalHits,
58+
equalTo(firstBatchNumDocs + secondBatchNumDocs));
59+
});
60+
}
61+
62+
}

0 commit comments

Comments
 (0)