Skip to content

Commit

Permalink
Fail demoted primary shards and retry request
Browse files Browse the repository at this point in the history
This commit handles the scenario where a replication action fails on a
replica shard, the primary shard attempts to fail the replica shard
but the primary shard is notified of demotion by the master. In this
scenario, the demoted primary shard must be failed, and then the
request rerouted again to the new primary shard.

Closes #16415, closes #14252
  • Loading branch information
jasontedor committed Feb 10, 2016
1 parent 321c463 commit 346ff04
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,11 @@ protected void doRun() throws Exception {

public static class RetryOnPrimaryException extends ElasticsearchException {
public RetryOnPrimaryException(ShardId shardId, String msg) {
super(msg);
this(shardId, msg, null);
}

public RetryOnPrimaryException(ShardId shardId, String msg, Throwable cause) {
super(msg, cause);
setShard(shardId);
}

Expand Down Expand Up @@ -801,6 +805,7 @@ protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId) {
* relocating copies
*/
final class ReplicationPhase extends AbstractRunnable {

private final ReplicationTask task;
private final ReplicaRequest replicaRequest;
private final Response finalResponse;
Expand Down Expand Up @@ -982,9 +987,17 @@ public void onSuccess() {
}

@Override
public void onFailure(Throwable t) {
// TODO: handle catastrophic non-channel failures
onReplicaFailure(nodeId, exp);
public void onFailure(Throwable shardFailedError) {
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
ShardRouting primaryShard = indexShardReference.routingEntry();
String message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard [%s] for [%s]", primaryShard, shard, exp);
// we are no longer the primary, fail ourselves and start over
indexShardReference.failShard(message, shardFailedError);
forceFinishAsFailed(new RetryOnPrimaryException(shardId, message, shardFailedError));
} else {
assert false : shardFailedError;
onReplicaFailure(nodeId, exp);
}
}
}
);
Expand Down Expand Up @@ -1070,7 +1083,7 @@ protected boolean shouldExecuteReplication(Settings settings) {

interface IndexShardReference extends Releasable {
boolean isRelocated();

void failShard(String reason, @Nullable Throwable e);
ShardRouting routingEntry();
}

Expand Down Expand Up @@ -1098,6 +1111,11 @@ public boolean isRelocated() {
return indexShard.state() == IndexShardState.RELOCATED;
}

@Override
public void failShard(String reason, @Nullable Throwable e) {
indexShard.failShard(reason, e);
}

@Override
public ShardRouting routingEntry() {
return indexShard.routingEntry();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ public static ClusterState state(String index, boolean activePrimaryLocal, Shard
primaryNode = newNode(0).id();
unassignedNodes.remove(primaryNode);
} else {
primaryNode = selectAndRemove(unassignedNodes);
Set<String> unassignedNodesExecludingPrimary = new HashSet<>(unassignedNodes);
unassignedNodesExecludingPrimary.remove(newNode(0).id());
primaryNode = selectAndRemove(unassignedNodesExecludingPrimary);
}
if (primaryState == ShardRoutingState.RELOCATING) {
relocatingNode = selectAndRemove(unassignedNodes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,11 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.either;
Expand Down Expand Up @@ -631,9 +633,11 @@ protected void runReplicateTest(ClusterState state, IndexShardRoutingTable shard
indexShardRouting.set(primaryShard);

assertIndexShardCounter(2);
// TODO: set a default timeout
TransportReplicationAction<Request, Request, Response>.ReplicationPhase replicationPhase = action.new ReplicationPhase(task,
request, new Response(), request.shardId(), createTransportChannel(listener), reference);
AtomicReference<Throwable> error = new AtomicReference<>();

TransportChannel channel = createTransportChannel(listener, error::set);
TransportReplicationAction<Request, Request, Response>.ReplicationPhase replicationPhase =
action.new ReplicationPhase(task, request, new Response(), request.shardId(), channel, reference);

assertThat(replicationPhase.totalShards(), equalTo(totalShards));
assertThat(replicationPhase.pending(), equalTo(assignedReplicas));
Expand Down Expand Up @@ -704,7 +708,8 @@ protected void runReplicateTest(ClusterState state, IndexShardRoutingTable shard
// the shard the request was sent to and the shard to be failed should be the same
assertEquals(shardRoutingEntry.getShardRouting(), routing);
failures.add(shardFailedRequest);
if (randomBoolean()) {
int ternary = randomIntBetween(0, 2);
if (ternary == 0) {
// simulate master left and test that the shard failure is retried
int numberOfRetries = randomIntBetween(1, 4);
CapturingTransport.CapturedRequest currentRequest = shardFailedRequest;
Expand All @@ -718,8 +723,19 @@ protected void runReplicateTest(ClusterState state, IndexShardRoutingTable shard
}
// now simulate that the last retry succeeded
transport.handleResponse(currentRequest.requestId, TransportResponse.Empty.INSTANCE);
} else {
} else if (ternary == 1) {
// simulate the primary has been demoted
transport.handleRemoteError(shardFailedRequest.requestId, new ShardStateAction.NoLongerPrimaryShardException(shardRoutingEntry.getShardRouting().shardId(), "shard-failed-test"));
// the primary should fail itself
assertShardIsFailed();
// we should see a retry on primary exception
assertNotNull(error.get());
assertThat(error.get(), instanceOf(TransportReplicationAction.RetryOnPrimaryException.class));
return;
} else if (ternary == 2) {
transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE);
} else {
assert false;
}
}
} else {
Expand Down Expand Up @@ -882,14 +898,85 @@ public void testCounterDecrementedIfShardOperationThrowsException() throws Inter
assertPhase(task, "failed");
}

public void testReroutePhaseRetriedAfterDemotedPrimary() {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
boolean localPrimary = true;
clusterService.setState(state(index, localPrimary,
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
Action action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
@Override
protected void resolveRequest(MetaData metaData, String concreteIndex, Request request) {
request.setShardId(shardId);
}
};
Request request = new Request();
PlainActionFuture<Response> listener = new PlainActionFuture<>();

TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(null, request, listener);
reroutePhase.run();

// reroute phase should send primary action
CapturingTransport.CapturedRequest[] primaryRequests = transport.getCapturedRequestsAndClear();
assertThat(primaryRequests.length, equalTo(1));
assertThat(primaryRequests[0].action, equalTo("testAction" + (localPrimary ? "[p]" : "")));
AtomicReference<Throwable> error = new AtomicReference<>();
TransportChannel channel = createTransportChannel(listener, error::set);

// simulate primary action
TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(maybeTask(), request, channel);
primaryPhase.run();

// primary action should send replica request
CapturingTransport.CapturedRequest[] replicaRequests = transport.getCapturedRequestsAndClear();
assertThat(replicaRequests.length, equalTo(1));
assertThat(replicaRequests[0].action, equalTo("testAction[r]"));
indexShardRouting.set(clusterService.state().getRoutingTable().shardRoutingTable(shardId).primaryShard());

// simulate replica failure
transport.handleRemoteError(replicaRequests[0].requestId, new Exception("exception"));

// the primary should request replica failure
CapturingTransport.CapturedRequest[] replicaFailures = transport.getCapturedRequestsAndClear();
assertThat(replicaFailures.length, equalTo(1));
assertThat(replicaFailures[0].action, equalTo(ShardStateAction.SHARD_FAILED_ACTION_NAME));

// simulate demoted primary
transport.handleRemoteError(replicaFailures[0].requestId, new ShardStateAction.NoLongerPrimaryShardException(shardId, "demoted"));
assertTrue(isShardFailed.get());
assertTrue(listener.isDone());
assertNotNull(error.get());
assertThat(error.get(), instanceOf(TransportReplicationAction.RetryOnPrimaryException.class));
assertThat(error.get().getMessage(), containsString("was demoted while failing replica shard"));

// reroute phase sees the retry
transport.handleRemoteError(primaryRequests[0].requestId, error.get());

// publish a new cluster state
boolean localPrimaryOnRetry = randomBoolean();
clusterService.setState(state(index, localPrimaryOnRetry,
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
CapturingTransport.CapturedRequest[] primaryRetry = transport.getCapturedRequestsAndClear();

// the request should be retried
assertThat(primaryRetry.length, equalTo(1));
assertThat(primaryRetry[0].action, equalTo("testAction" + (localPrimaryOnRetry ? "[p]" : "")));
}

private void assertIndexShardCounter(int expected) {
assertThat(count.get(), equalTo(expected));
}

private void assertShardIsFailed() {
assertTrue(isShardFailed.get());
}

private final AtomicInteger count = new AtomicInteger(0);

private final AtomicBoolean isRelocated = new AtomicBoolean(false);

private final AtomicBoolean isShardFailed = new AtomicBoolean();

private final AtomicReference<ShardRouting> indexShardRouting = new AtomicReference<>();

/**
Expand All @@ -903,6 +990,11 @@ public boolean isRelocated() {
return isRelocated.get();
}

@Override
public void failShard(String reason, @Nullable Throwable e) {
isShardFailed.set(true);
}

@Override
public ShardRouting routingEntry() {
ShardRouting shardRouting = indexShardRouting.get();
Expand Down Expand Up @@ -1099,6 +1191,10 @@ protected void shardOperationOnReplica(Request shardRequest) {
* Transport channel that is needed for replica operation testing.
*/
public TransportChannel createTransportChannel(final PlainActionFuture<Response> listener) {
return createTransportChannel(listener, error -> {});
}

public TransportChannel createTransportChannel(final PlainActionFuture<Response> listener, Consumer<Throwable> consumer) {
return new TransportChannel() {

@Override
Expand All @@ -1123,6 +1219,7 @@ public void sendResponse(TransportResponse response, TransportResponseOptions op

@Override
public void sendResponse(Throwable error) throws IOException {
consumer.accept(error);
listener.onFailure(error);
}

Expand Down

0 comments on commit 346ff04

Please sign in to comment.