diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 472cb04936d64..e6a86d47f55c0 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -420,6 +420,12 @@ private void removeShards(final ClusterState state) { // state may result in a new shard being initialized while having the same allocation id as the currently started shard. logger.debug("{} removing shard (not active, current {}, new {})", shardId, currentRoutingEntry, newShardRouting); indexService.removeShard(shardId.id(), "removing shard (stale copy)"); + } else if (newShardRouting.primary() && currentRoutingEntry.primary() == false && newShardRouting.initializing()) { + assert currentRoutingEntry.initializing() : currentRoutingEntry; // see above if clause + // this can happen when cluster state batching batches activation of the shard, closing an index, reopening it + // and assigning an initializing primary to this node + logger.debug("{} removing shard (not active, current {}, new {})", shardId, currentRoutingEntry, newShardRouting); + indexService.removeShard(shardId.id(), "removing shard (stale copy)"); } } } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java b/server/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java index 59ede535c2f39..60053748d68c9 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java @@ -27,10 +27,12 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable.Builder; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; @@ -44,6 +46,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; @@ -93,7 +96,8 @@ public static ClusterState state(String index, boolean activePrimaryLocal, Shard IndexMetaData indexMetaData = IndexMetaData.builder(index).settings(Settings.builder() .put(SETTING_VERSION_CREATED, Version.CURRENT) .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) - .put(SETTING_CREATION_DATE, System.currentTimeMillis())).primaryTerm(0, primaryTerm).build(); + .put(SETTING_CREATION_DATE, System.currentTimeMillis())).primaryTerm(0, primaryTerm) + .build(); RoutingTable.Builder routing = new RoutingTable.Builder(); routing.addAsNew(indexMetaData); @@ -138,12 +142,19 @@ public static ClusterState state(String index, boolean activePrimaryLocal, Shard TestShardRouting.newShardRouting(index, shardId.id(), replicaNode, relocatingNode, false, replicaState, unassignedInfo)); } + final IndexShardRoutingTable indexShardRoutingTable = indexShardRoutingBuilder.build(); + + IndexMetaData.Builder indexMetaDataBuilder = new IndexMetaData.Builder(indexMetaData); + indexMetaDataBuilder.putInSyncAllocationIds(0, + indexShardRoutingTable.activeShards().stream().map(ShardRouting::allocationId).map(AllocationId::getId) + .collect(Collectors.toSet()) + ); ClusterState.Builder state = ClusterState.builder(new ClusterName("test")); state.nodes(discoBuilder); - state.metaData(MetaData.builder().put(indexMetaData, false).generateClusterUuidIfNeeded()); + state.metaData(MetaData.builder().put(indexMetaDataBuilder.build(), false).generateClusterUuidIfNeeded()); state.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder(indexMetaData.getIndex()) - .addIndexShard(indexShardRoutingBuilder.build())).build()); + .addIndexShard(indexShardRoutingTable)).build()); return state.build(); } @@ -272,21 +283,21 @@ public static ClusterState stateWithAssignedPrimariesAndOneReplica(String index, state.routingTable(RoutingTable.builder().add(indexRoutingTableBuilder.build()).build()); return state.build(); } - - + + /** * Creates cluster state with several indexes, shards and replicas and all shards STARTED. */ public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indices, int numberOfShards, int numberOfReplicas) { - int numberOfDataNodes = numberOfReplicas + 1; + int numberOfDataNodes = numberOfReplicas + 1; DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); for (int i = 0; i < numberOfDataNodes + 1; i++) { final DiscoveryNode node = newNode(i); discoBuilder = discoBuilder.add(node); } discoBuilder.localNodeId(newNode(0).getId()); - discoBuilder.masterNodeId(newNode(numberOfDataNodes + 1).getId()); + discoBuilder.masterNodeId(newNode(numberOfDataNodes + 1).getId()); ClusterState.Builder state = ClusterState.builder(new ClusterName("test")); state.nodes(discoBuilder); Builder routingTableBuilder = RoutingTable.builder(); @@ -316,7 +327,7 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indice state.metaData(metadataBuilder); state.routingTable(routingTableBuilder.build()); return state.build(); - } + } /** * Creates cluster state with and index that has one shard and as many replicas as numberOfReplicas. diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 5c6b000f7e519..5e9ba653d40e6 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -74,6 +74,10 @@ public void injectRandomFailures() { enableRandomFailures = randomBoolean(); } + protected void disalbeRandomFailures() { + enableRandomFailures = false; + } + protected void failRandomly() { if (enableRandomFailures && rarely()) { throw new RuntimeException("dummy test failure"); diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 5611421594aa1..ae9b7ea06a418 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -49,6 +49,7 @@ import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.threadpool.TestThreadPool; @@ -75,6 +76,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -97,7 +99,6 @@ public void tearDown() throws Exception { terminate(threadPool); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/32308") public void testRandomClusterStateUpdates() { // we have an IndicesClusterStateService per node in the cluster final Map clusterStateServiceMap = new HashMap<>(); @@ -199,6 +200,59 @@ public void testJoiningNewClusterOnlyRemovesInMemoryIndexStructures() { } } + /** + * In rare cases it is possible that a nodes gets an instruction to replace a replica + * shard that's in POST_RECOVERY with a new initializing primary with the same allocation id. + * This can happen by batching cluster states that include the starting of the replica, with + * closing of the indices, opening it up again and allocating the primary shard to the node in + * question. The node should then clean it's initializing replica and replace it with a new + * initializing primary. + */ + public void testInitializingPrimaryRemovesInitializingReplicaWithSameAID() { + disalbeRandomFailures(); + String index = "index_" + randomAlphaOfLength(8).toLowerCase(Locale.ROOT); + ClusterState state = ClusterStateCreationUtils.state(index, randomBoolean(), + ShardRoutingState.STARTED, ShardRoutingState.INITIALIZING); + + // the initial state which is derived from the newly created cluster state but doesn't contain the index + ClusterState previousState = ClusterState.builder(state) + .metaData(MetaData.builder(state.metaData()).remove(index)) + .routingTable(RoutingTable.builder().build()) + .build(); + + // pick a data node to simulate the adding an index cluster state change event on, that has shards assigned to it + final ShardRouting shardRouting = state.routingTable().index(index).shard(0).replicaShards().get(0); + final ShardId shardId = shardRouting.shardId(); + DiscoveryNode node = state.nodes().get(shardRouting.currentNodeId()); + + // simulate the cluster state change on the node + ClusterState localState = adaptClusterStateToLocalNode(state, node); + ClusterState previousLocalState = adaptClusterStateToLocalNode(previousState, node); + IndicesClusterStateService indicesCSSvc = createIndicesClusterStateService(node, RecordingIndicesService::new); + indicesCSSvc.start(); + indicesCSSvc.applyClusterState(new ClusterChangedEvent("cluster state change that adds the index", localState, previousLocalState)); + previousState = state; + + // start the replica + state = cluster.applyStartedShards(state, state.routingTable().index(index).shard(0).replicaShards()); + + // close the index and open it up again (this will sometimes swap roles between primary and replica) + CloseIndexRequest closeIndexRequest = new CloseIndexRequest(state.metaData().index(index).getIndex().getName()); + state = cluster.closeIndices(state, closeIndexRequest); + OpenIndexRequest openIndexRequest = new OpenIndexRequest(state.metaData().index(index).getIndex().getName()); + state = cluster.openIndices(state, openIndexRequest); + + localState = adaptClusterStateToLocalNode(state, node); + previousLocalState = adaptClusterStateToLocalNode(previousState, node); + + indicesCSSvc.applyClusterState(new ClusterChangedEvent("new cluster state", localState, previousLocalState)); + + final MockIndexShard shardOrNull = ((RecordingIndicesService) indicesCSSvc.indicesService).getShardOrNull(shardId); + assertThat(shardOrNull == null ? null : shardOrNull.routingEntry(), + equalTo(state.getRoutingNodes().node(node.getId()).getByShardId(shardId))); + + } + public ClusterState randomInitialClusterState(Map clusterStateServiceMap, Supplier indicesServiceSupplier) { List allNodes = new ArrayList<>();