IndicesClusterStateService should replace an init. replica with an init. primary with the same aId

In rare cases it is possible that a node gets an instruction to replace a replica
shard that's in POST_RECOVERY with a new initializing primary with the same allocation id.
This can happen when cluster state batching combines the starting of the replica with
closing the index, reopening it and allocating the primary shard to the node in
question. The node should then remove its initializing replica and replace it with a new
initializing primary.

Closes elastic#32308
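
To make the batching failure concrete, here is a minimal, self-contained sketch (plain Java, not Elasticsearch code; the Entry record and all values are illustrative) of the only two routing entries the node ends up comparing once the intermediate cluster states have been batched away:

public class BatchedClusterStateSketch {
    // minimal stand-in for a shard routing entry; only the fields that matter here
    record Entry(String allocationId, boolean primary, String state) {}

    public static void main(String[] args) {
        // what the node currently holds: an initializing replica that has reached POST_RECOVERY
        Entry current = new Entry("aid-1", false, "INITIALIZING");
        // batched away in between: replica started, index closed, index reopened
        // what the batched update finally delivers: a primary allocated back to this node,
        // reusing the in-sync copy and therefore the same allocation id
        Entry incoming = new Entry("aid-1", true, "INITIALIZING");

        boolean sameAllocationId = current.allocationId().equals(incoming.allocationId());
        boolean roleFlipped = incoming.primary() && current.primary() == false;
        // same allocation id but replica -> primary: the node must remove its initializing
        // replica and create a fresh initializing primary in its place
        System.out.println("replace shard instance: " + (sameAllocationId && roleFlipped));
    }
}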
bleskes committed Jul 25, 2018
1 parent 8067250 commit d5b68cb
Showing 4 changed files with 84 additions and 9 deletions.
@@ -420,6 +420,12 @@ private void removeShards(final ClusterState state) {
// state may result in a new shard being initialized while having the same allocation id as the currently started shard.
logger.debug("{} removing shard (not active, current {}, new {})", shardId, currentRoutingEntry, newShardRouting);
indexService.removeShard(shardId.id(), "removing shard (stale copy)");
} else if (newShardRouting.primary() && currentRoutingEntry.primary() == false && newShardRouting.initializing()) {
assert currentRoutingEntry.initializing() : currentRoutingEntry; // see above if clause
// this can happen when cluster state batching combines activation of the shard with closing an index, reopening it
// and assigning an initializing primary to this node
logger.debug("{} removing shard (not active, current {}, new {})", shardId, currentRoutingEntry, newShardRouting);
indexService.removeShard(shardId.id(), "removing shard (stale copy)");
}
}
}
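
Since the hunk only shows the tail of removeShards, here is a simplified, standalone rendering of the branch ordering (plain Java; RoutingEntry, decide and the returned strings are made up, and the first condition is inferred from the comment above rather than copied from the source). It also spells out why the new branch can assert that the current entry is still initializing:

final class RemoveShardDecisionSketch {
    // stand-in for a routing entry that is already known to share its allocation id with the
    // incoming entry (entries with a different allocation id are handled earlier in the method)
    record RoutingEntry(boolean primary, boolean active, boolean initializing) {}

    static String decide(RoutingEntry current, RoutingEntry incoming) {
        if (incoming.initializing() && current.active()) {
            // earlier branch: the node was isolated, rejoined, and received a fresh copy that
            // reuses the allocation id of its already started shard
            return "remove started shard (stale copy)";
        } else if (incoming.primary() && current.primary() == false && incoming.initializing()) {
            // branch added by this commit: current cannot be active here, otherwise the branch
            // above would have matched, so it must still be initializing -- hence the assert
            return "remove initializing replica, re-create as initializing primary";
        }
        return "keep the existing shard";
    }
}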
@@ -27,10 +27,12 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable.Builder;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
@@ -44,6 +46,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
@@ -93,7 +96,8 @@ public static ClusterState state(String index, boolean activePrimaryLocal, Shard
IndexMetaData indexMetaData = IndexMetaData.builder(index).settings(Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
- .put(SETTING_CREATION_DATE, System.currentTimeMillis())).primaryTerm(0, primaryTerm).build();
+ .put(SETTING_CREATION_DATE, System.currentTimeMillis())).primaryTerm(0, primaryTerm)
+ .build();

RoutingTable.Builder routing = new RoutingTable.Builder();
routing.addAsNew(indexMetaData);
@@ -138,12 +142,19 @@ public static ClusterState state(String index, boolean activePrimaryLocal, Shard
TestShardRouting.newShardRouting(index, shardId.id(), replicaNode, relocatingNode, false, replicaState,
unassignedInfo));
}
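// record every active copy as in-sync in the index metadata below -- presumably so that a later
// primary allocation (for instance after the close/open cycle exercised by the new test) can pick
// a node that already holds an in-sync copy and reuse its allocation id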
final IndexShardRoutingTable indexShardRoutingTable = indexShardRoutingBuilder.build();

IndexMetaData.Builder indexMetaDataBuilder = new IndexMetaData.Builder(indexMetaData);
indexMetaDataBuilder.putInSyncAllocationIds(0,
indexShardRoutingTable.activeShards().stream().map(ShardRouting::allocationId).map(AllocationId::getId)
.collect(Collectors.toSet())
);

ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
state.nodes(discoBuilder);
- state.metaData(MetaData.builder().put(indexMetaData, false).generateClusterUuidIfNeeded());
+ state.metaData(MetaData.builder().put(indexMetaDataBuilder.build(), false).generateClusterUuidIfNeeded());
state.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder(indexMetaData.getIndex())
- .addIndexShard(indexShardRoutingBuilder.build())).build());
+ .addIndexShard(indexShardRoutingTable)).build());
return state.build();
}

@@ -272,21 +283,21 @@ public static ClusterState stateWithAssignedPrimariesAndOneReplica(String index,
state.routingTable(RoutingTable.builder().add(indexRoutingTableBuilder.build()).build());
return state.build();
}


/**
* Creates cluster state with several indexes, shards and replicas and all shards STARTED.
*/
public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indices, int numberOfShards, int numberOfReplicas) {

int numberOfDataNodes = numberOfReplicas + 1;
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
for (int i = 0; i < numberOfDataNodes + 1; i++) {
final DiscoveryNode node = newNode(i);
discoBuilder = discoBuilder.add(node);
}
discoBuilder.localNodeId(newNode(0).getId());
discoBuilder.masterNodeId(newNode(numberOfDataNodes + 1).getId());
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
state.nodes(discoBuilder);
Builder routingTableBuilder = RoutingTable.builder();
@@ -316,7 +327,7 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indice
state.metaData(metadataBuilder);
state.routingTable(routingTableBuilder.build());
return state.build();
}

/**
* Creates cluster state with an index that has one shard and as many replicas as numberOfReplicas.
@@ -74,6 +74,10 @@ public void injectRandomFailures() {
enableRandomFailures = randomBoolean();
}

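/**
 * Counterpart to {@link #injectRandomFailures()}: lets tests that need deterministic
 * cluster-state application switch the injected failures off.
 */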
protected void disableRandomFailures() {
enableRandomFailures = false;
}

protected void failRandomly() {
if (enableRandomFailures && rarely()) {
throw new RuntimeException("dummy test failure");
@@ -49,6 +49,7 @@
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.threadpool.TestThreadPool;
@@ -75,6 +76,7 @@
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@@ -97,7 +99,6 @@ public void tearDown() throws Exception {
terminate(threadPool);
}

- @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/32308")
public void testRandomClusterStateUpdates() {
// we have an IndicesClusterStateService per node in the cluster
final Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap = new HashMap<>();
@@ -199,6 +200,59 @@ public void testJoiningNewClusterOnlyRemovesInMemoryIndexStructures() {
}
}

/**
* In rare cases it is possible that a node gets an instruction to replace a replica
* shard that's in POST_RECOVERY with a new initializing primary with the same allocation id.
* This can happen when cluster state batching combines the starting of the replica with
* closing the index, reopening it and allocating the primary shard to the node in
* question. The node should then remove its initializing replica and replace it with a new
* initializing primary.
*/
public void testInitializingPrimaryRemovesInitializingReplicaWithSameAID() {
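// switch off the randomly injected failures from the base class; they would make the
// deterministic routing-table assertion at the end of this test flaky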
disableRandomFailures();
String index = "index_" + randomAlphaOfLength(8).toLowerCase(Locale.ROOT);
ClusterState state = ClusterStateCreationUtils.state(index, randomBoolean(),
ShardRoutingState.STARTED, ShardRoutingState.INITIALIZING);

// the initial state which is derived from the newly created cluster state but doesn't contain the index
ClusterState previousState = ClusterState.builder(state)
.metaData(MetaData.builder(state.metaData()).remove(index))
.routingTable(RoutingTable.builder().build())
.build();

// pick a data node that has shards assigned to it and simulate the cluster state change that adds the index on that node
final ShardRouting shardRouting = state.routingTable().index(index).shard(0).replicaShards().get(0);
final ShardId shardId = shardRouting.shardId();
DiscoveryNode node = state.nodes().get(shardRouting.currentNodeId());

// simulate the cluster state change on the node
ClusterState localState = adaptClusterStateToLocalNode(state, node);
ClusterState previousLocalState = adaptClusterStateToLocalNode(previousState, node);
IndicesClusterStateService indicesCSSvc = createIndicesClusterStateService(node, RecordingIndicesService::new);
indicesCSSvc.start();
indicesCSSvc.applyClusterState(new ClusterChangedEvent("cluster state change that adds the index", localState, previousLocalState));
previousState = state;

// start the replica
state = cluster.applyStartedShards(state, state.routingTable().index(index).shard(0).replicaShards());

// close the index and open it up again (this will sometimes swap roles between primary and replica)
CloseIndexRequest closeIndexRequest = new CloseIndexRequest(state.metaData().index(index).getIndex().getName());
state = cluster.closeIndices(state, closeIndexRequest);
OpenIndexRequest openIndexRequest = new OpenIndexRequest(state.metaData().index(index).getIndex().getName());
state = cluster.openIndices(state, openIndexRequest);

localState = adaptClusterStateToLocalNode(state, node);
previousLocalState = adaptClusterStateToLocalNode(previousState, node);

indicesCSSvc.applyClusterState(new ClusterChangedEvent("new cluster state", localState, previousLocalState));

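// the shard instance the node now holds for this shard id must match whatever routing entry
// the new cluster state assigns to this node (or be null if nothing is assigned to it)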
final MockIndexShard shardOrNull = ((RecordingIndicesService) indicesCSSvc.indicesService).getShardOrNull(shardId);
assertThat(shardOrNull == null ? null : shardOrNull.routingEntry(),
equalTo(state.getRoutingNodes().node(node.getId()).getByShardId(shardId)));

}

public ClusterState randomInitialClusterState(Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap,
Supplier<MockIndicesService> indicesServiceSupplier) {
List<DiscoveryNode> allNodes = new ArrayList<>();
