Skip to content

Commit

Permalink
Closed index replica allocation
Browse files Browse the repository at this point in the history
When an index is closed, we expect primary and replicas to be identical.
This commit improves the gateway replica shard allocator to consider
shards with identical sequence numbers sync'ed for closed indices. This
ensures that we will pick a fast recovery regardless of whether synced
flush was performed prior to closing an index.

Relates elastic#41400 and elastic#33888
  • Loading branch information
henningandersen committed May 3, 2019
1 parent ec2e01a commit 8e14aba
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData;
Expand All @@ -49,7 +50,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;

Expand Down Expand Up @@ -101,17 +101,16 @@ public void processExistingRecoveries(RoutingAllocation allocation) {
DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId());
DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch();
// current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider
final String currentSyncId;
final TransportNodesListShardStoreMetaData.StoreFilesMetaData currentStore;
if (shardStores.getData().containsKey(currentNode)) {
currentSyncId = shardStores.getData().get(currentNode).storeFilesMetaData().syncId();
currentStore = shardStores.getData().get(currentNode).storeFilesMetaData();
} else {
currentSyncId = null;
currentStore = null;
}
if (currentNode.equals(nodeWithHighestMatch) == false
&& Objects.equals(currentSyncId, primaryStore.syncId()) == false
&& matchingNodes.isNodeMatchBySyncID(nodeWithHighestMatch)) {
// we found a better match that has a full sync id match, the existing allocation is not fully synced
// so we found a better one, cancel this one
&& isSyncedRecovery(primaryStore, currentStore, isClosedIndex(shard, allocation)) == false
&& matchingNodes.isSyncedRecovery(nodeWithHighestMatch)) {
// we found a better match that can do a fast recovery, cancel current recovery
logger.debug("cancelling allocation of replica on [{}], sync id match found on node [{}]",
currentNode, nodeWithHighestMatch);
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA,
Expand Down Expand Up @@ -315,6 +314,7 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al
boolean explain) {
ObjectLongMap<DiscoveryNode> nodesToSize = new ObjectLongHashMap<>();
Map<String, NodeAllocationResult> nodeDecisions = explain ? new HashMap<>() : null;
final boolean closedIndex = isClosedIndex(shard, allocation);
for (Map.Entry<DiscoveryNode, NodeStoreFilesMetaData> nodeStoreEntry : data.getData().entrySet()) {
DiscoveryNode discoNode = nodeStoreEntry.getKey();
TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue().storeFilesMetaData();
Expand All @@ -335,7 +335,7 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al

long matchingBytes = -1;
if (explain) {
matchingBytes = computeMatchingBytes(primaryStore, storeFilesMetaData);
matchingBytes = computeMatchingBytes(primaryStore, storeFilesMetaData, closedIndex);
ShardStoreInfo shardStoreInfo = new ShardStoreInfo(matchingBytes);
nodeDecisions.put(node.nodeId(), new NodeAllocationResult(discoNode, shardStoreInfo, decision));
}
Expand All @@ -345,7 +345,7 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al
}

if (matchingBytes < 0) {
matchingBytes = computeMatchingBytes(primaryStore, storeFilesMetaData);
matchingBytes = computeMatchingBytes(primaryStore, storeFilesMetaData, closedIndex);
}
nodesToSize.put(discoNode, matchingBytes);
if (logger.isTraceEnabled()) {
Expand All @@ -362,11 +362,9 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al
}

private static long computeMatchingBytes(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData) {
String primarySyncId = primaryStore.syncId();
String replicaSyncId = storeFilesMetaData.syncId();
// see if we have a sync id we can make use of
if (replicaSyncId != null && replicaSyncId.equals(primarySyncId)) {
TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData,
boolean closedIndex) {
if (isSyncedRecovery(primaryStore, storeFilesMetaData, closedIndex)) {
return Long.MAX_VALUE;
} else {
long sizeMatched = 0;
Expand All @@ -380,6 +378,37 @@ private static long computeMatchingBytes(TransportNodesListShardStoreMetaData.St
}
}

/**
* Is a "synced recovery", which is either sync-id match or a closed index with equivalent last commits.
*/
private static boolean isSyncedRecovery(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
TransportNodesListShardStoreMetaData.StoreFilesMetaData candidateStore, boolean closedIndex) {
return syncIdMatch(primaryStore, candidateStore)
|| (closedIndex && equivalentStores(primaryStore, candidateStore));
}

private static boolean syncIdMatch(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
TransportNodesListShardStoreMetaData.StoreFilesMetaData candidateStore) {
String primarySyncId = primaryStore.syncId();
String replicaSyncId = candidateStore.syncId();
return (replicaSyncId != null && replicaSyncId.equals(primarySyncId));
}

private static boolean equivalentStores(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
TransportNodesListShardStoreMetaData.StoreFilesMetaData candidateStore) {
SequenceNumbers.CommitInfo primarySeqNoInfo = primaryStore.seqNoInfo();
SequenceNumbers.CommitInfo candidateSeqNoInfo = candidateStore.seqNoInfo();

return primarySeqNoInfo.maxSeqNo != SequenceNumbers.NO_OPS_PERFORMED // disregard empty or upgraded without ops
&& primarySeqNoInfo.maxSeqNo == primarySeqNoInfo.localCheckpoint
&& candidateSeqNoInfo.maxSeqNo == candidateSeqNoInfo.localCheckpoint
&& primarySeqNoInfo.maxSeqNo == candidateSeqNoInfo.maxSeqNo;
}

private boolean isClosedIndex(ShardRouting shard, RoutingAllocation allocation) {
return allocation.metaData().getIndexSafe(shard.index()).getState() == IndexMetaData.State.CLOSE;
}

protected abstract AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> fetchData(ShardRouting shard, RoutingAllocation allocation);

/**
Expand Down Expand Up @@ -418,7 +447,10 @@ public DiscoveryNode getNodeWithHighestMatch() {
return this.nodeWithHighestMatch;
}

public boolean isNodeMatchBySyncID(DiscoveryNode node) {
/**
* Is supplied node a sync'ed recovery, either sync-id match or closed index with identical last commit.
*/
public boolean isSyncedRecovery(DiscoveryNode node) {
return nodesToSize.get(node) == Long.MAX_VALUE;
}

Expand Down
7 changes: 7 additions & 0 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,13 @@ public long getNumDocs() {
return numDocs;
}

/**
* @return sequence number info from lucene commit
*/
public SequenceNumbers.CommitInfo seqNoInfo() {
return SequenceNumbers.loadSeqNoInfoFromLuceneCommit(commitUserData.entrySet());
}

static class LoadedMetadata {
final Map<String, StoreFileMetaData> fileMetadata;
final Map<String, String> userData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.gateway.AsyncShardFetch;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
Expand Down Expand Up @@ -220,6 +221,13 @@ public String syncId() {
return metadataSnapshot.getSyncId();
}

/**
* @return sequence number info from lucene commit
*/
public SequenceNumbers.CommitInfo seqNoInfo() {
return metadataSnapshot.seqNoInfo();
}

@Override
public String toString() {
return "StoreFilesMetaData{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand All @@ -47,11 +48,11 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.junit.Before;

import java.util.Arrays;
Expand All @@ -60,6 +61,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static java.util.Collections.unmodifiableMap;
import static org.hamcrest.Matchers.equalTo;
Expand All @@ -74,6 +76,8 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {

private TestAllocator testAllocator;

private AtomicLong sequenceGenerator = new AtomicLong();

@Before
public void buildTestAllocator() {
this.testAllocator = new TestAllocator();
Expand Down Expand Up @@ -145,6 +149,27 @@ public void testSyncIdMatch() {
equalTo(nodeToMatch.getId()));
}

/**
* Verifies that when a closed index is equivalent, we choose the closed index match over file match.
*/
public void testEquivalentClosedIndexMatch() {
RoutingAllocation allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders(), Settings.EMPTY,
UnassignedInfo.Reason.CLUSTER_RECOVERED, IndexMetaData.State.CLOSE);
DiscoveryNode nodeToMatch = randomBoolean() ? node2 : node3;
DiscoveryNode nodeNotToMatch = nodeToMatch == node2 ? node3 : node2;
testAllocator
.addData(node1, "MATCH", new SequenceNumbers.CommitInfo(10, 10),
new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION))
.addData(nodeToMatch, "NO_MATCH", new SequenceNumbers.CommitInfo(10, 10),
new StoreFileMetaData("file1", 10, "NO_MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION))
.addData(nodeNotToMatch, "NO_MATCH", new SequenceNumbers.CommitInfo(11, 11),
new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION));
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(),
equalTo(nodeToMatch.getId()));
}

/**
* Verifies that when there is no sync id match but files match, we allocate it to matching node.
*/
Expand Down Expand Up @@ -298,10 +323,17 @@ private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders decide
}

private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders, Settings settings, UnassignedInfo.Reason reason) {
return onePrimaryOnNode1And1Replica(deciders, settings, reason,
randomBoolean() ? IndexMetaData.State.OPEN : IndexMetaData.State.CLOSE);
}

private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders, Settings settings, UnassignedInfo.Reason reason,
IndexMetaData.State indexState) {
ShardRouting primaryShard = TestShardRouting.newShardRouting(shardId, node1.getId(), true, ShardRoutingState.STARTED);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(Version.CURRENT).put(settings))
.numberOfShards(1).numberOfReplicas(1)
.state(indexState)
.putInSyncAllocationIds(0, Sets.newHashSet(primaryShard.allocationId().getId())))
.build();
// mark shard as delayed if reason is NODE_LEFT
Expand Down Expand Up @@ -369,6 +401,16 @@ public boolean getFetchDataCalledAndClean() {
}

public TestAllocator addData(DiscoveryNode node, String syncId, StoreFileMetaData... files) {
SequenceNumbers.CommitInfo seqNoInfo = null;
if (randomBoolean()) {
// generate unique sequence numbers, validating that a non-match has no effect.
long seqNo = sequenceGenerator.incrementAndGet();
seqNoInfo = new SequenceNumbers.CommitInfo(seqNo, seqNo);
}
return addData(node, syncId, seqNoInfo, files);
}

public TestAllocator addData(DiscoveryNode node, String syncId, SequenceNumbers.CommitInfo seqNoInfo, StoreFileMetaData... files) {
if (data == null) {
data = new HashMap<>();
}
Expand All @@ -379,6 +421,10 @@ public TestAllocator addData(DiscoveryNode node, String syncId, StoreFileMetaDat
Map<String, String> commitData = new HashMap<>();
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
if (seqNoInfo != null) {
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(seqNoInfo.localCheckpoint));
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoInfo.maxSeqNo));
}
}
data.put(node, new TransportNodesListShardStoreMetaData.StoreFilesMetaData(shardId,
new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt())));
Expand Down

0 comments on commit 8e14aba

Please sign in to comment.