diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 3690db8e729b3..3befeb02d5f61 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -6,19 +6,26 @@ package org.elasticsearch.xpack.ccr.action; +import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.support.TransportActions; +import org.elasticsearch.action.NoShardAvailableActionException; +import org.elasticsearch.action.UnavailableShardsException; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; @@ -48,7 +55,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private static final int DELAY_MILLIS = 50; - private static final Logger LOGGER = Loggers.getLogger(ShardFollowNodeTask.class); + private static final Logger LOGGER = LogManager.getLogger(ShardFollowNodeTask.class); private final String leaderIndex; private final ShardFollowTask params; @@ -377,9 +384,21 @@ static long computeDelay(int currentRetry, long maxRetryDelayInMillis) { } static boolean shouldRetry(Exception e) { - return NetworkExceptionHelper.isConnectException(e) || - NetworkExceptionHelper.isCloseConnectionException(e) || - TransportActions.isShardNotAvailableException(e); + if (NetworkExceptionHelper.isConnectException(e)) { + return true; + } else if (NetworkExceptionHelper.isCloseConnectionException(e)) { + return true; + } + + final Throwable actual = ExceptionsHelper.unwrapCause(e); + return actual instanceof ShardNotFoundException || + actual instanceof IllegalIndexShardStateException || + actual instanceof NoShardAvailableActionException || + actual instanceof UnavailableShardsException || + actual instanceof AlreadyClosedException || + actual instanceof ElasticsearchSecurityException || // If user does not have sufficient privileges + actual instanceof ClusterBlockException || // If leader index is closed or no elected master + actual instanceof IndexClosedException; // If follow index is closed } // These methods are protected for testing purposes: diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index a3f4c54606b09..6b31f7caa9472 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -7,10 +7,13 @@ package org.elasticsearch.xpack.ccr; import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.bulk.BulkProcessor; @@ -51,6 +54,9 @@ import org.elasticsearch.xpack.ccr.action.ShardFollowTask; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; +import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; +import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction.StatsRequest; +import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction.StatsResponses; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; @@ -72,7 +78,10 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -542,6 +551,110 @@ public void testAttemptToChangeCcrFollowingIndexSetting() throws Exception { "this setting is managed via a dedicated API")); } + public void testCloseLeaderIndex() throws Exception { + assertAcked(client().admin().indices().prepareCreate("index1") + .setSettings(Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .build())); + + final ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2"); + final PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest); + client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get(); + + client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); + assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L))); + + client().admin().indices().close(new CloseIndexRequest("index1")).actionGet(); + assertBusy(() -> { + StatsResponses response = client().execute(CcrStatsAction.INSTANCE, new StatsRequest()).actionGet(); + assertThat(response.getNodeFailures(), empty()); + assertThat(response.getTaskFailures(), empty()); + assertThat(response.getStatsResponses(), hasSize(1)); + assertThat(response.getStatsResponses().get(0).status().numberOfFailedFetches(), greaterThanOrEqualTo(1L)); + assertThat(response.getStatsResponses().get(0).status().fetchExceptions().size(), equalTo(1)); + ElasticsearchException exception = response.getStatsResponses().get(0).status() + .fetchExceptions().entrySet().iterator().next().getValue().v2(); + assertThat(exception.getMessage(), equalTo("blocked by: [FORBIDDEN/4/index closed];")); + }); + + client().admin().indices().open(new OpenIndexRequest("index1")).actionGet(); + client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get(); + assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(2L))); + + unfollowIndex("index2"); + } + + public void testCloseFollowIndex() throws Exception { + assertAcked(client().admin().indices().prepareCreate("index1") + .setSettings(Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .build())); + + final ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2"); + final PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest); + client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get(); + + client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); + assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L))); + + client().admin().indices().close(new CloseIndexRequest("index2")).actionGet(); + client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get(); + assertBusy(() -> { + StatsResponses response = client().execute(CcrStatsAction.INSTANCE, new StatsRequest()).actionGet(); + assertThat(response.getNodeFailures(), empty()); + assertThat(response.getTaskFailures(), empty()); + assertThat(response.getStatsResponses(), hasSize(1)); + assertThat(response.getStatsResponses().get(0).status().numberOfFailedBulkOperations(), greaterThanOrEqualTo(1L)); + }); + client().admin().indices().open(new OpenIndexRequest("index2")).actionGet(); + assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(2L))); + + unfollowIndex("index2"); + } + + public void testDeleteLeaderIndex() throws Exception { + assertAcked(client().admin().indices().prepareCreate("index1") + .setSettings(Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .build())); + + final ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2"); + final PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest); + client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get(); + + client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); + assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L))); + + client().admin().indices().delete(new DeleteIndexRequest("index1")).actionGet(); + ensureNoCcrTasks(); + } + + public void testDeleteFollowerIndex() throws Exception { + assertAcked(client().admin().indices().prepareCreate("index1") + .setSettings(Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .build())); + + final ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2"); + final PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest); + client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get(); + + client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get(); + assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L))); + + client().admin().indices().delete(new DeleteIndexRequest("index2")).actionGet(); + client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get(); + ensureNoCcrTasks(); + } + private CheckedRunnable assertTask(final int numberOfPrimaryShards, final Map numDocsPerShard) { return () -> { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); @@ -584,10 +697,14 @@ private void unfollowIndex(String... indices) throws Exception { unfollowRequest.setFollowIndex(index); client().execute(PauseFollowAction.INSTANCE, unfollowRequest).get(); } + ensureNoCcrTasks(); + } + + private void ensureNoCcrTasks() throws Exception { assertBusy(() -> { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - assertThat(tasks.tasks().size(), equalTo(0)); + assertThat(tasks.tasks(), empty()); ListTasksRequest listTasksRequest = new ListTasksRequest(); listTasksRequest.setDetailed(true);