[CCR] Make shard follow tasks more resilient for restarts
If a running shard follow task needs to be restarted and the remote
connection seeds have changed, the task currently fails with a fatal error.

The change creates the remote client lazily and adjusts which errors
a shard follow task treats as retryable.
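
As a rough, self-contained sketch of the lazy pattern (RemoteClient, StaleSeedsException,
and FollowTaskSketch are hypothetical stand-ins, not the actual Elasticsearch classes in the
diff below): the task resolves its remote client on every call instead of caching one at
creation time, and connection-level failures are treated as retryable rather than fatal.

import java.util.function.Function;

interface RemoteClient {
    void fetchChanges(long fromSeqNo) throws Exception;
}

class StaleSeedsException extends RuntimeException {
    StaleSeedsException(String message) { super(message); }
}

class FollowTaskSketch {
    private final String remoteCluster;
    private final Function<String, RemoteClient> clientResolver;

    FollowTaskSketch(String remoteCluster, Function<String, RemoteClient> clientResolver) {
        // No remote client is created here; resolving it eagerly would pin the task to
        // the connection seeds that existed when the task was started.
        this.remoteCluster = remoteCluster;
        this.clientResolver = clientResolver;
    }

    void readFromLeader(long fromSeqNo) {
        try {
            // Resolve the client per call so updated seeds are picked up after a restart.
            clientResolver.apply(remoteCluster).fetchChanges(fromSeqNo);
        } catch (Exception e) {
            if (shouldRetry(e) == false) {
                throw new IllegalStateException("fatal shard follow error", e);
            }
            // otherwise: schedule a retry with backoff instead of failing the task
        }
    }

    static boolean shouldRetry(Exception e) {
        // Connection problems and a temporarily unknown remote cluster alias are retryable.
        return e instanceof StaleSeedsException
            || (e instanceof IllegalArgumentException
                && e.getMessage() != null
                && e.getMessage().startsWith("unknown cluster alias"));
    }
}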

This issue was found via test failures in the recently added
ccr rolling upgrade tests. The issue occurs more frequently in the
rolling upgrade test because ccr is set up in local mode (so the remote
connection seeds become stale) and all nodes are restarted, which forces
the shard follow tasks to be restarted at some point during the test.
Note that these tests cannot be enabled yet, because this change first
needs to be backported to 6.x (otherwise the issue still occurs on
non-upgraded nodes).

I also changed RestartIndexFollowingIT to set up the remote cluster
via persistent settings and to restart the leader cluster as well. This
way, what happens during the ccr rolling upgrade qa tests also happens
in this test.
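
For reference, a minimal sketch of re-configuring the remote cluster via persistent cluster
settings with the Client API; it mirrors the setupRemoteCluster() helper added in the diff
below, and the follower Client plus the leader seed address are assumed to be supplied by
the test scaffolding.

import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

final class RemoteClusterSetupSketch {
    static void configureLeaderSeeds(Client followerClient, String leaderSeedAddress) {
        ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
        // Persistent (rather than node-level) settings can be re-applied at runtime after
        // the leader cluster has been fully restarted and its publish addresses changed.
        request.persistentSettings(Settings.builder()
            .put("cluster.remote.leader_cluster.seeds", leaderSeedAddress));
        followerClient.admin().cluster().updateSettings(request).actionGet();
    }
}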

Relates to elastic#37231
martijnvg committed Jan 8, 2019
1 parent d6608ca commit 7299c93
Showing 4 changed files with 60 additions and 18 deletions.
@@ -29,8 +29,7 @@
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;

@@ -448,7 +447,10 @@ static boolean shouldRetry(String remoteCluster, Exception e) {
return true;
}

// This is thrown when using a Client and its remote cluster alias went MIA
String noSuchRemoteClusterMessage = "no such remote cluster: " + remoteCluster;
// This is thrown when creating a Client and the remote cluster does not exist:
String unknownClusterMessage = "unknown cluster alias [" + remoteCluster + "]";
final Throwable actual = ExceptionsHelper.unwrapCause(e);
return actual instanceof ShardNotFoundException ||
actual instanceof IllegalIndexShardStateException ||
@@ -458,11 +460,11 @@ static boolean shouldRetry(String remoteCluster, Exception e) {
actual instanceof ElasticsearchSecurityException || // If user does not have sufficient privileges
actual instanceof ClusterBlockException || // If leader index is closed or no elected master
actual instanceof IndexClosedException || // If follow index is closed
actual instanceof NodeDisconnectedException ||
actual instanceof NodeNotConnectedException ||
actual instanceof ConnectTransportException ||
actual instanceof NodeClosedException ||
(actual.getMessage() != null && actual.getMessage().contains("TransportService is closed")) ||
(actual instanceof IllegalArgumentException && noSuchRemoteClusterMessage.equals(actual.getMessage()));
(actual instanceof IllegalArgumentException && (noSuchRemoteClusterMessage.equals(actual.getMessage()) ||
unknownClusterMessage.equals(actual.getMessage())));
}

// These methods are protected for testing purposes:
@@ -94,12 +94,6 @@ protected AllocatedPersistentTask createTask(long id, String type, String action
PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> taskInProgress,
Map<String, String> headers) {
ShardFollowTask params = taskInProgress.getParams();
final Client remoteClient;
if (params.getRemoteCluster() != null) {
remoteClient = wrapClient(client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders());
} else {
remoteClient = wrapClient(client, params.getHeaders());
}
Client followerClient = wrapClient(client, params.getHeaders());
BiConsumer<TimeValue, Runnable> scheduler = (delay, command) -> {
try {
@@ -123,8 +117,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> erro
Index followIndex = params.getFollowShardId().getIndex();

ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());

remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
CheckedConsumer<ClusterStateResponse, Exception> onResponse = clusterStateResponse -> {
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
if (indexMetaData.getMappings().isEmpty()) {
assert indexMetaData.getMappingVersion() == 1;
@@ -140,7 +133,12 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> erro
followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap(
putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()),
errorHandler));
}, errorHandler));
};
try {
remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler));
} catch (Exception e) {
errorHandler.accept(e);
}
}

@Override
@@ -181,7 +179,11 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum
}
}
};
remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler));
try {
remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler));
} catch (Exception e) {
errorHandler.accept(e);
}
}

private void closeIndexUpdateSettingsAndOpenIndex(String followIndex,
@@ -236,7 +238,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co
request.setMaxBatchSize(params.getMaxReadRequestSize());
request.setPollTimeout(params.getReadPollTimeout());
try {
remoteClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler));
remoteClient(params).execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler));
} catch (Exception e) {
errorHandler.accept(e);
}
@@ -251,6 +253,10 @@ private String getLeaderShardHistoryUUID(ShardFollowTask params) {
return recordedLeaderShardHistoryUUIDs[params.getLeaderShardId().id()];
}

private Client remoteClient(ShardFollowTask params) {
return wrapClient(client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders());
}

interface FollowerStatsInfoHandler {
void accept(String followerHistoryUUID, long globalCheckpoint, long maxSeqNo);
}
@@ -202,7 +202,7 @@ private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedA
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
// Let cluster state api return quickly in order to speed up auto follow tests:
builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100));
if (leaderSeedAddress != null) {
if (configureRemoteClusterViaNodeSettings() && leaderSeedAddress != null) {
builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress);
}
return new NodeConfigurationSource() {
@@ -247,6 +247,10 @@ protected boolean reuseClusters() {
return true;
}

protected boolean configureRemoteClusterViaNodeSettings() {
return true;
}

protected final Client leaderClient() {
return clusterGroup.leaderCluster.client();
}
@@ -6,8 +6,11 @@

package org.elasticsearch.xpack.ccr;

import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;

@@ -24,12 +27,17 @@ protected int numberOfNodesPerCluster() {
return 1;
}

@Override
protected boolean configureRemoteClusterViaNodeSettings() {
return false;
}

public void testFollowIndex() throws Exception {
final String leaderIndexSettings = getIndexSettings(1, 0,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderGreen("index1");
setupRemoteCluster();

final PutFollowAction.Request followRequest = putFollow("index1", "index2");
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
@@ -57,6 +65,28 @@ public void testFollowIndex() throws Exception {
assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value,
equalTo(firstBatchNumDocs + secondBatchNumDocs));
});

getLeaderCluster().fullRestart();
ensureLeaderGreen("index1");
// Remote connection needs to be re-configured, because all the nodes in leader cluster have been restarted:
setupRemoteCluster();

final long thirdBatchNumDocs = randomIntBetween(2, 64);
for (int i = 0; i < thirdBatchNumDocs; i++) {
leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get();
}

assertBusy(() -> {
assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value,
equalTo(firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs));
});
}

private void setupRemoteCluster() {
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
String address = getLeaderCluster().getMasterNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
updateSettingsRequest.persistentSettings(Settings.builder().put("cluster.remote.leader_cluster.seeds", address));
assertAcked(followerClient().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}

}
