Skip to content

Commit

Permalink
Retry errors when fetching follower global checkpoint. (#34019)
Browse files Browse the repository at this point in the history
Closes #34016
  • Loading branch information
martijnvg authored and kcm committed Oct 30, 2018
1 parent 777e5c7 commit 3827a2c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ static long computeDelay(int currentRetry, long maxRetryDelayInMillis) {
return Math.min(backOffDelay, maxRetryDelayInMillis);
}

private static boolean shouldRetry(Exception e) {
static boolean shouldRetry(Exception e) {
return NetworkExceptionHelper.isConnectException(e) ||
NetworkExceptionHelper.isCloseConnectionException(e) ||
TransportActions.isShardNotAvailableException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ccr.action;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
Expand All @@ -21,8 +22,10 @@
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTaskState;
Expand Down Expand Up @@ -164,9 +167,24 @@ interface BiLongConsumer {
/**
 * Starts a shard follow task: fetches the follower shard's global checkpoint and max sequence
 * number and, once they are available, hands them to {@code ShardFollowNodeTask#start}.
 *
 * NOTE(review): this span comes from a commit diff rendered without +/- markers. The first
 * {@code logger.info(...)} / {@code fetchGlobalCheckpoint(...)} pair below appears to be the
 * removed pre-change code and everything after it the added replacement - confirm against the
 * real file before treating both calls as live code.
 *
 * @param task   the allocated persistent task; cast to {@code ShardFollowNodeTask} below
 * @param params the follow task parameters (headers, follower/leader shard ids, max retry delay)
 * @param state  the persistent task state; only passed through when this method reschedules itself
 */
protected void nodeOperation(final AllocatedPersistentTask task, final ShardFollowTask params, final PersistentTaskState state) {
// Client wrapped with the headers stored in the task parameters.
Client followerClient = wrapClient(client, params.getHeaders());
ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task;
// NOTE(review): presumably the pre-change version - any fetch failure goes straight to task::markAsFailed.
logger.info("{} Started to track leader shard {}", params.getFollowShardId(), params.getLeaderShardId());
fetchGlobalCheckpoint(followerClient, params.getFollowShardId(),
(followerGCP, maxSeqNo) -> shardFollowNodeTask.start(followerGCP, maxSeqNo, followerGCP, maxSeqNo), task::markAsFailed);
// NOTE(review): presumably the post-change version starts here.
logger.info("{} Starting to track leader shard {}", params.getFollowShardId(), params.getLeaderShardId());

// Success path: the same two values (follower global checkpoint, max seq no) are passed for both
// argument pairs of start(...).
BiLongConsumer handler = (followerGCP, maxSeqNo) -> shardFollowNodeTask.start(followerGCP, maxSeqNo, followerGCP, maxSeqNo);
// Failure path: ignore the error if the task is stopped, retry if the error is classified as
// retryable, otherwise fail the task.
Consumer<Exception> errorHandler = e -> {
if (shardFollowNodeTask.isStopped()) {
// Task already stopped: neither retry nor mark as failed.
return;
}

if (ShardFollowNodeTask.shouldRetry(e)) {
// NOTE(review): the "{}" placeholder sits between "global" and "checkpoint", so the task is
// rendered mid-phrase in the log line - looks misplaced; confirm the intended wording.
logger.debug(new ParameterizedMessage("failed to fetch follow shard global {} checkpoint and max sequence number",
shardFollowNodeTask), e);
// Retry by re-running this whole method on the CCR thread pool after params.getMaxRetryDelay().
// NOTE(review): the delay is the same on every attempt here (no attempt counter is passed) - confirm this is intended.
threadPool.schedule(params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME, () -> nodeOperation(task, params, state));
} else {
shardFollowNodeTask.markAsFailed(e);
}
};

fetchGlobalCheckpoint(followerClient, params.getFollowShardId(), handler, errorHandler);
}

private void fetchGlobalCheckpoint(
Expand All @@ -176,6 +194,11 @@ private void fetchGlobalCheckpoint(
final Consumer<Exception> errorHandler) {
client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> {
IndexStats indexStats = r.getIndex(shardId.getIndexName());
if (indexStats == null) {
errorHandler.accept(new IndexNotFoundException(shardId.getIndex()));
return;
}

Optional<ShardStats> filteredShardStats = Arrays.stream(indexStats.getShards())
.filter(shardStats -> shardStats.getShardRouting().shardId().equals(shardId))
.filter(shardStats -> shardStats.getShardRouting().primary())
Expand All @@ -186,7 +209,7 @@ private void fetchGlobalCheckpoint(
final long maxSeqNo = seqNoStats.getMaxSeqNo();
handler.accept(globalCheckpoint, maxSeqNo);
} else {
errorHandler.accept(new IllegalArgumentException("Cannot find shard stats for shard " + shardId));
errorHandler.accept(new ShardNotFoundException(shardId));
}
}, errorHandler));
}
Expand Down

0 comments on commit 3827a2c

Please sign in to comment.