Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CCR] Adjust list retryable errors #33985

Merged
merged 8 commits into from
Sep 28, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,25 @@

package org.elasticsearch.xpack.ccr.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
Expand Down Expand Up @@ -47,7 +54,7 @@
public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {

private static final int DELAY_MILLIS = 50;
private static final Logger LOGGER = Loggers.getLogger(ShardFollowNodeTask.class);
private static final Logger LOGGER = LogManager.getLogger(ShardFollowNodeTask.class);
dnhatn marked this conversation as resolved.
Show resolved Hide resolved

private final String leaderIndex;
private final ShardFollowTask params;
Expand Down Expand Up @@ -375,9 +382,21 @@ static long computeDelay(int currentRetry, long maxRetryDelayInMillis) {
}

private static boolean shouldRetry(Exception e) {
return NetworkExceptionHelper.isConnectException(e) ||
NetworkExceptionHelper.isCloseConnectionException(e) ||
TransportActions.isShardNotAvailableException(e);
if (NetworkExceptionHelper.isConnectException(e)) {
return true;
} else if (NetworkExceptionHelper.isCloseConnectionException(e)) {
return true;
}

final Throwable actual = ExceptionsHelper.unwrapCause(e);
return actual instanceof ShardNotFoundException ||
actual instanceof IllegalIndexShardStateException ||
actual instanceof NoShardAvailableActionException ||
actual instanceof UnavailableShardsException ||
actual instanceof AlreadyClosedException ||
actual instanceof ElasticsearchSecurityException || // If user does not have sufficient privileges
actual instanceof ClusterBlockException || // If leader index is closed or no elected master
actual instanceof IndexClosedException; // If follow index is closed
}

// These methods are protected for testing purposes:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@

package org.elasticsearch.xpack.ccr;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.bulk.BulkProcessor;
Expand Down Expand Up @@ -46,6 +49,9 @@
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction.StatsRequest;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction.StatsResponses;
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
import org.elasticsearch.xpack.core.ccr.action.UnfollowIndexAction;
Expand All @@ -66,7 +72,10 @@
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -529,6 +538,114 @@ public void testAttemptToChangeCcrFollowingIndexSetting() throws Exception {
"this setting is managed via a dedicated API"));
}

public void testCloseLeaderIndex() throws Exception {
assertAcked(client().admin().indices().prepareCreate("index1")
.setSettings(Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build()));
ensureGreen("index1");

final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();

client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just use atLeastDocsIndexed.


client().admin().indices().close(new CloseIndexRequest("index1")).actionGet();
assertBusy(() -> {
StatsResponses response = client().execute(CcrStatsAction.INSTANCE, new StatsRequest()).actionGet();
assertThat(response.getNodeFailures(), empty());
assertThat(response.getTaskFailures(), empty());
assertThat(response.getStatsResponses(), hasSize(1));
assertThat(response.getStatsResponses().get(0).status().numberOfFailedFetches(), greaterThanOrEqualTo(1L));
assertThat(response.getStatsResponses().get(0).status().fetchExceptions().size(), equalTo(1));
ElasticsearchException exception = response.getStatsResponses().get(0).status()
.fetchExceptions().entrySet().iterator().next().getValue().v2();
assertThat(exception.getMessage(), equalTo("blocked by: [FORBIDDEN/4/index closed];"));
});

client().admin().indices().open(new OpenIndexRequest("index1")).actionGet();
client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(2L)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just use atLeastDocsIndexed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry @martijnvg, I was wrong. We should strictly assert here instead of using at least. I think we should use the previous form.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, now I think about it that makes sense. I changed that here: 327ef72

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @martijnvg.


unfollowIndex("index2");
}

public void testCloseFollowIndex() throws Exception {
assertAcked(client().admin().indices().prepareCreate("index1")
.setSettings(Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build()));
ensureGreen("index1");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default, we will wait until the primary shard gets allocated. Thus, we do not have to call ensureGreen if the number of replicas is 0.


final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();

client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));

client().admin().indices().close(new CloseIndexRequest("index2")).actionGet();
client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get();
assertBusy(() -> {
StatsResponses response = client().execute(CcrStatsAction.INSTANCE, new StatsRequest()).actionGet();
assertThat(response.getNodeFailures(), empty());
assertThat(response.getTaskFailures(), empty());
assertThat(response.getStatsResponses(), hasSize(1));
assertThat(response.getStatsResponses().get(0).status().numberOfFailedBulkOperations(), greaterThanOrEqualTo(1L));
});
client().admin().indices().open(new OpenIndexRequest("index2")).actionGet();
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(2L)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.


unfollowIndex("index2");
}

public void testDeleteLeaderIndex() throws Exception {
assertAcked(client().admin().indices().prepareCreate("index1")
.setSettings(Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build()));
ensureGreen("index1");

final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();

client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));

client().admin().indices().delete(new DeleteIndexRequest("index1")).actionGet();
ensureNoCcrTasks();
}

public void testDeleteFollowerIndex() throws Exception {
assertAcked(client().admin().indices().prepareCreate("index1")
.setSettings(Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build()));
ensureGreen("index1");

final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2");
final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();

client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));

client().admin().indices().delete(new DeleteIndexRequest("index2")).actionGet();
client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get();
ensureNoCcrTasks();
}

private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, final Map<ShardId, Long> numDocsPerShard) {
return () -> {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
Expand Down Expand Up @@ -571,10 +688,14 @@ private void unfollowIndex(String... indices) throws Exception {
unfollowRequest.setFollowIndex(index);
client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get();
}
ensureNoCcrTasks();
}

private void ensureNoCcrTasks() throws Exception {
assertBusy(() -> {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasks.tasks().size(), equalTo(0));
assertThat(tasks.tasks(), empty());

ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setDetailed(true);
Expand Down