diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java
index 777efdd654b4f..b5199d77b4134 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java
@@ -374,7 +374,8 @@ static long computeDelay(int currentRetry, long maxRetryDelayInMillis) {
         return Math.min(backOffDelay, maxRetryDelayInMillis);
     }
 
-    private static boolean shouldRetry(Exception e) {
+    // Package-private so that ShardFollowTasksExecutor can reuse the same retry classification.
+    static boolean shouldRetry(Exception e) {
         return NetworkExceptionHelper.isConnectException(e) ||
             NetworkExceptionHelper.isCloseConnectionException(e) ||
             TransportActions.isShardNotAvailableException(e);
diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java
index d473091f80c31..678db31ec86cc 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java
@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.ccr.action;
 
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
@@ -21,8 +22,10 @@
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardNotFoundException;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.persistent.AllocatedPersistentTask;
 import org.elasticsearch.persistent.PersistentTaskState;
@@ -162,9 +165,26 @@ interface BiLongConsumer {
     protected void nodeOperation(final AllocatedPersistentTask task, final ShardFollowTask params, final PersistentTaskState state) {
         Client followerClient = wrapClient(client, params.getHeaders());
         ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task;
-        logger.info("{} Started to track leader shard {}", params.getFollowShardId(), params.getLeaderShardId());
-        fetchGlobalCheckpoint(followerClient, params.getFollowShardId(),
-            (followerGCP, maxSeqNo) -> shardFollowNodeTask.start(followerGCP, maxSeqNo, followerGCP, maxSeqNo), task::markAsFailed);
+        logger.info("{} Starting to track leader shard {}", params.getFollowShardId(), params.getLeaderShardId());
+
+        BiLongConsumer handler = (followerGCP, maxSeqNo) -> shardFollowNodeTask.start(followerGCP, maxSeqNo, followerGCP, maxSeqNo);
+        // Failures accepted by ShardFollowNodeTask.shouldRetry re-run nodeOperation after the max retry delay;
+        // any other failure marks the task as failed. Nothing is done once the task has been stopped.
+        Consumer<Exception> errorHandler = e -> {
+            if (shardFollowNodeTask.isStopped()) {
+                return;
+            }
+
+            if (ShardFollowNodeTask.shouldRetry(e)) {
+                logger.debug(new ParameterizedMessage("{} failed to fetch follow shard global checkpoint and max sequence number",
+                    params.getFollowShardId()), e);
+                threadPool.schedule(params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME, () -> nodeOperation(task, params, state));
+            } else {
+                shardFollowNodeTask.markAsFailed(e);
+            }
+        };
+
+        fetchGlobalCheckpoint(followerClient, params.getFollowShardId(), handler, errorHandler);
     }
 
     private void fetchGlobalCheckpoint(
@@ -174,6 +194,11 @@ private void fetchGlobalCheckpoint(
             final Consumer errorHandler) {
         client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> {
             IndexStats indexStats = r.getIndex(shardId.getIndexName());
+            if (indexStats == null) {
+                errorHandler.accept(new IndexNotFoundException(shardId.getIndex()));
+                return;
+            }
+
             Optional filteredShardStats = Arrays.stream(indexStats.getShards())
                     .filter(shardStats -> shardStats.getShardRouting().shardId().equals(shardId))
                     .filter(shardStats -> shardStats.getShardRouting().primary())
@@ -184,7 +209,7 @@ private void fetchGlobalCheckpoint(
                 final long maxSeqNo = seqNoStats.getMaxSeqNo();
                 handler.accept(globalCheckpoint, maxSeqNo);
             } else {
-                errorHandler.accept(new IllegalArgumentException("Cannot find shard stats for shard " + shardId));
+                errorHandler.accept(new ShardNotFoundException(shardId));
             }
         }, errorHandler));
     }