From 3827a2c1436e0d6ce2f57f338af54e0c5d1c97eb Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 28 Sep 2018 10:34:08 +0200 Subject: [PATCH] Retry errors when fetching follower global checkpoint. (#34019) Closes #34016 --- .../xpack/ccr/action/ShardFollowNodeTask.java | 2 +- .../ccr/action/ShardFollowTasksExecutor.java | 31 ++++++++++++++++--- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 45daff9b0368e..3690db8e729b3 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -376,7 +376,7 @@ static long computeDelay(int currentRetry, long maxRetryDelayInMillis) { return Math.min(backOffDelay, maxRetryDelayInMillis); } - private static boolean shouldRetry(Exception e) { + static boolean shouldRetry(Exception e) { return NetworkExceptionHelper.isConnectException(e) || NetworkExceptionHelper.isCloseConnectionException(e) || TransportActions.isShardNotAvailableException(e); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index c5dab5360daeb..090b81f0101ff 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ccr.action; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; @@ -21,8 +22,10 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTaskState; @@ -164,9 +167,24 @@ interface BiLongConsumer { protected void nodeOperation(final AllocatedPersistentTask task, final ShardFollowTask params, final PersistentTaskState state) { Client followerClient = wrapClient(client, params.getHeaders()); ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task; - logger.info("{} Started to track leader shard {}", params.getFollowShardId(), params.getLeaderShardId()); - fetchGlobalCheckpoint(followerClient, params.getFollowShardId(), - (followerGCP, maxSeqNo) -> shardFollowNodeTask.start(followerGCP, maxSeqNo, followerGCP, maxSeqNo), task::markAsFailed); + logger.info("{} Starting to track leader shard {}", params.getFollowShardId(), params.getLeaderShardId()); + + BiLongConsumer handler = (followerGCP, maxSeqNo) -> shardFollowNodeTask.start(followerGCP, maxSeqNo, followerGCP, maxSeqNo); + Consumer errorHandler = e -> { + if (shardFollowNodeTask.isStopped()) { + return; + } + + if (ShardFollowNodeTask.shouldRetry(e)) { + logger.debug(new ParameterizedMessage("failed to fetch follow shard global {} checkpoint and max sequence number", + shardFollowNodeTask), e); + threadPool.schedule(params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME, () -> nodeOperation(task, params, state)); + } else { + shardFollowNodeTask.markAsFailed(e); + } + }; + + fetchGlobalCheckpoint(followerClient, params.getFollowShardId(), handler, errorHandler); } private void fetchGlobalCheckpoint( @@ -176,6 +194,11 @@ private void fetchGlobalCheckpoint( final Consumer errorHandler) { client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> { IndexStats indexStats = r.getIndex(shardId.getIndexName()); + if (indexStats == null) { + errorHandler.accept(new IndexNotFoundException(shardId.getIndex())); + return; + } + Optional filteredShardStats = Arrays.stream(indexStats.getShards()) .filter(shardStats -> shardStats.getShardRouting().shardId().equals(shardId)) .filter(shardStats -> shardStats.getShardRouting().primary()) @@ -186,7 +209,7 @@ private void fetchGlobalCheckpoint( final long maxSeqNo = seqNoStats.getMaxSeqNo(); handler.accept(globalCheckpoint, maxSeqNo); } else { - errorHandler.accept(new IllegalArgumentException("Cannot find shard stats for shard " + shardId)); + errorHandler.accept(new ShardNotFoundException(shardId)); } }, errorHandler)); }