From 4b19a4bd01b750ec72da7d92e52956ea0ced1b4f Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 8 Jul 2019 12:13:51 +0100 Subject: [PATCH] Create missing PRRLs after primary activation (#44009) Today peer recovery retention leases (PRRLs) are created when starting a replication group from scratch and during peer recovery. However, if the replication group was migrated from nodes running a version which does not create PRRLs (e.g. 7.3 and earlier) then it's possible that the primary was relocated or promoted without first establishing all the expected leases. It's not possible to establish these leases before or during primary activation, so we must create them as soon as possible afterwards. This gives weaker guarantees about history retention, since there's a possibility that history will be discarded before it can be used. In practice such situations are expected to occur only rarely. This commit adds the machinery to create missing leases after primary activation, and strengthens the assertions about the existence of such leases in order to ensure that once all the leases do exist we never again enter a state where there's a missing lease. Relates #41536 --- .../upgrades/FullClusterRestartIT.java | 25 +++- .../elasticsearch/upgrades/RecoveryIT.java | 75 ++++++++++++ .../index/seqno/ReplicationTracker.java | 108 +++++++++++++++--- .../elasticsearch/index/shard/IndexShard.java | 9 ++ .../PeerRecoveryRetentionLeaseCreationIT.java | 85 ++++++++++++++ .../index/seqno/RetentionLeaseUtils.java | 51 +++++++++ 6 files changed, 336 insertions(+), 17 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseCreationIT.java diff --git a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java index 88389d97c8945..42d6d2fed0954 100644 --- a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java +++ b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.index.seqno.RetentionLeaseUtils; import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.rest.action.document.RestGetAction; import org.elasticsearch.rest.action.document.RestIndexAction; @@ -89,7 +90,7 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase { private String type; @Before - public void setIndex() throws IOException { + public void setIndex() { index = getTestName().toLowerCase(Locale.ROOT); } @@ -1338,4 +1339,26 @@ protected void ensureGreenLongWait(String index) throws IOException { assertEquals("green", healthRsp.get("status")); assertFalse((Boolean) healthRsp.get("timed_out")); } + + public void testPeerRecoveryRetentionLeases() throws IOException { + if (isRunningAgainstOldCluster()) { + XContentBuilder settings = jsonBuilder(); + settings.startObject(); + { + settings.startObject("settings"); + settings.field("number_of_shards", between(1, 5)); + settings.field("number_of_replicas", between(0, 2)); + settings.endObject(); + } + settings.endObject(); + + Request createIndex = new Request("PUT", "/" + index); + createIndex.setJsonEntity(Strings.toString(settings)); + client().performRequest(createIndex); + ensureGreen(index); + } else { + ensureGreen(index); + RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index); + } + } } diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index 863172e91e7da..624b4cbec5ded 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.RetentionLeaseUtils; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.document.RestIndexAction; import org.elasticsearch.rest.action.document.RestUpdateAction; @@ -382,6 +383,80 @@ public void testRecoveryWithSoftDeletes() throws Exception { ensureGreen(index); } + public void testRetentionLeasesEstablishedWhenPromotingPrimary() throws Exception { + final String index = "recover_and_create_leases_in_promotion"; + if (CLUSTER_TYPE == ClusterType.OLD) { + Settings.Builder settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(1, 5)) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), between(1, 2)) // triggers nontrivial promotion + .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms") + .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); + createIndex(index, settings.build()); + int numDocs = randomInt(10); + indexDocs(index, 0, numDocs); + if (randomBoolean()) { + client().performRequest(new Request("POST", "/" + index + "/_flush")); + } + } + ensureGreen(index); + if (CLUSTER_TYPE == ClusterType.UPGRADED) { + assertBusy(() -> RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index)); + } + } + + public void testRetentionLeasesEstablishedWhenRelocatingPrimary() throws Exception { + final String index = "recover_and_create_leases_in_relocation"; + switch (CLUSTER_TYPE) { + case OLD: + Settings.Builder settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(1, 5)) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), between(0, 1)) + .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms") + .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); + createIndex(index, settings.build()); + int numDocs = randomInt(10); + indexDocs(index, 0, numDocs); + if (randomBoolean()) { + client().performRequest(new Request("POST", "/" + index + "/_flush")); + } + ensureGreen(index); + break; + + case MIXED: + // trigger a primary relocation by excluding the last old node with a shard filter + final Map nodesMap + = ObjectPath.createFromResponse(client().performRequest(new Request("GET", "/_nodes"))).evaluate("nodes"); + final List oldNodeNames = new ArrayList<>(); + for (Object nodeDetails : nodesMap.values()) { + final Map nodeDetailsMap = (Map) nodeDetails; + final String versionString = (String) nodeDetailsMap.get("version"); + if (versionString.equals(Version.CURRENT.toString()) == false) { + oldNodeNames.add((String) nodeDetailsMap.get("name")); + } + } + + if (oldNodeNames.size() == 1) { + final String oldNodeName = oldNodeNames.get(0); + logger.info("--> excluding index [{}] from node [{}]", index, oldNodeName); + final Request putSettingsRequest = new Request("PUT", "/" + index + "/_settings"); + putSettingsRequest.setJsonEntity("{\"index.routing.allocation.exclude._name\":\"" + oldNodeName + "\"}"); + assertOK(client().performRequest(putSettingsRequest)); + ensureGreen(index); + assertBusy(() -> RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index)); + } else { + ensureGreen(index); + } + break; + + case UPGRADED: + ensureGreen(index); + assertBusy(() -> RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index)); + break; + } + } + /** * This test creates an index in the non upgraded cluster and closes it. It then checks that the index * is effectively closed and potentially replicated (if the version the index was created on supports diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index cee6a19cd5f5b..257833dbb5c05 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -23,6 +23,7 @@ import com.carrotsearch.hppc.ObjectLongMap; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.AllocationId; @@ -201,6 +202,14 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L */ private long persistedRetentionLeasesVersion; + /** + * Whether there should be a peer recovery retention lease (PRRL) for every tracked shard copy. Always true on indices created from + * {@link Version#V_7_4_0} onwards, because these versions create PRRLs properly. May be false on indices created in an earlier version + * if we recently did a rolling upgrade and {@link ReplicationTracker#createMissingPeerRecoveryRetentionLeases(ActionListener)} has not + * yet completed. Is only permitted to change from false to true; can be removed once support for pre-PRRL indices is no longer needed. + */ + private boolean hasAllPeerRecoveryRetentionLeases; + /** * Get all retention leases tracked on this shard. * @@ -486,10 +495,10 @@ public synchronized void renewPeerRecoveryRetentionLeases() { if (retentionLease == null) { /* * If this shard copy is tracked then we got here here via a rolling upgrade from an older version that doesn't - * create peer recovery retention leases for every shard copy. TODO create leases lazily in that situation. + * create peer recovery retention leases for every shard copy. */ assert checkpoints.get(shardRouting.allocationId().getId()).tracked == false - || indexSettings.getIndexVersionCreated().before(Version.V_7_4_0); + || hasAllPeerRecoveryRetentionLeases == false; return false; } return retentionLease.timestamp() <= renewalTimeMillis @@ -752,7 +761,7 @@ private boolean invariant() { if (primaryMode && indexSettings.isSoftDeleteEnabled() && indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN - && indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0)) { + && hasAllPeerRecoveryRetentionLeases) { // all tracked shard copies have a corresponding peer-recovery retention lease for (final ShardRouting shardRouting : routingTable.assignedShards()) { if (checkpoints.get(shardRouting.allocationId().getId()).tracked) { @@ -819,6 +828,7 @@ public ReplicationTracker( this.pendingInSync = new HashSet<>(); this.routingTable = null; this.replicationGroup = null; + this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0); assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false; assert invariant(); } @@ -913,30 +923,51 @@ public synchronized void activatePrimaryMode(final long localCheckpoint) { updateGlobalCheckpointOnPrimary(); if (indexSettings.isSoftDeleteEnabled()) { + addPeerRecoveryRetentionLeaseForSolePrimary(); + } + + assert invariant(); + } + + /** + * Creates a peer recovery retention lease for this shard, if one does not already exist and this shard is the sole shard copy in the + * replication group. If one does not already exist and yet there are other shard copies in this group then we must have just done + * a rolling upgrade from a version before {@link Version#V_7_4_0}, in which case the missing leases should be created asynchronously + * by the caller using {@link ReplicationTracker#createMissingPeerRecoveryRetentionLeases(ActionListener)}. + */ + private void addPeerRecoveryRetentionLeaseForSolePrimary() { + assert primaryMode; + assert Thread.holdsLock(this); + + if (indexSettings().getIndexMetaData().getState() == IndexMetaData.State.OPEN) { final ShardRouting primaryShard = routingTable.primaryShard(); final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard); if (retentionLeases.get(leaseId) == null) { - /* - * We might have got here here via a rolling upgrade from an older version that doesn't create peer recovery retention - * leases for every shard copy, but in this case we do not expect any leases to exist. - */ - if (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0)) { - // We are starting up the whole replication group from scratch: if we were not (i.e. this is a replica promotion) then - // this copy must already be in-sync and active and therefore holds a retention lease for itself. - assert routingTable.activeShards().equals(Collections.singletonList(primaryShard)) : routingTable.activeShards(); + if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard))) { assert primaryShard.allocationId().getId().equals(shardAllocationId) - : routingTable.activeShards() + " vs " + shardAllocationId; - assert replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard)); - + : routingTable.assignedShards() + " vs " + shardAllocationId; // Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication // group. + logger.trace("addPeerRecoveryRetentionLeaseForSolePrimary: adding lease [{}]", leaseId); innerAddRetentionLease(leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1), PEER_RECOVERY_RETENTION_LEASE_SOURCE); + hasAllPeerRecoveryRetentionLeases = true; + } else { + /* + * We got here here via a rolling upgrade from an older version that doesn't create peer recovery retention + * leases for every shard copy, but in this case we do not expect any leases to exist. + */ + assert hasAllPeerRecoveryRetentionLeases == false : routingTable + " vs " + retentionLeases; + logger.debug("{} becoming primary of {} with missing lease: {}", primaryShard, routingTable, retentionLeases); } + } else if (hasAllPeerRecoveryRetentionLeases == false && routingTable.assignedShards().stream().allMatch(shardRouting -> + retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) + || checkpoints.get(shardRouting.allocationId().getId()).tracked == false)) { + // Although this index is old enough not to have all the expected peer recovery retention leases, in fact it does, so we + // don't need to do any more work. + hasAllPeerRecoveryRetentionLeases = true; } } - - assert invariant(); } /** @@ -1239,9 +1270,54 @@ public synchronized void activateWithPrimaryContext(PrimaryContext primaryContex // note that if there was no cluster state update between start of the engine of this shard and the call to // initializeWithPrimaryContext, we might still have missed a cluster state update. This is best effort. runAfter.run(); + + if (indexSettings.isSoftDeleteEnabled()) { + addPeerRecoveryRetentionLeaseForSolePrimary(); + } + + assert invariant(); + } + + private synchronized void setHasAllPeerRecoveryRetentionLeases() { + hasAllPeerRecoveryRetentionLeases = true; assert invariant(); } + /** + * Create any required peer-recovery retention leases that do not currently exist because we just did a rolling upgrade from a version + * prior to {@link Version#V_7_4_0} that does not create peer-recovery retention leases. + */ + public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener listener) { + if (hasAllPeerRecoveryRetentionLeases == false) { + final List shardRoutings = routingTable.assignedShards(); + final GroupedActionListener groupedActionListener = new GroupedActionListener<>(ActionListener.wrap(vs -> { + setHasAllPeerRecoveryRetentionLeases(); + listener.onResponse(null); + }, listener::onFailure), shardRoutings.size()); + for (ShardRouting shardRouting : shardRoutings) { + if (retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))) { + groupedActionListener.onResponse(null); + } else { + final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId()); + if (checkpointState.tracked == false) { + groupedActionListener.onResponse(null); + } else { + logger.trace("createMissingPeerRecoveryRetentionLeases: adding missing lease for {}", shardRouting); + try { + addPeerRecoveryRetentionLease(shardRouting.currentNodeId(), + Math.max(SequenceNumbers.NO_OPS_PERFORMED, checkpointState.globalCheckpoint), groupedActionListener); + } catch (Exception e) { + groupedActionListener.onFailure(e); + } + } + } + } + } else { + logger.trace("createMissingPeerRecoveryRetentionLeases: nothing to do"); + listener.onResponse(null); + } + } + private Runnable getMasterUpdateOperationFromCurrentState() { assert primaryMode == false; final long lastAppliedClusterStateVersion = appliedClusterStateVersion; diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 886a5efbe332b..8aa4d2b3c7d81 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -486,6 +486,7 @@ public void updateShardState(final ShardRouting newRouting, if (currentRouting.initializing() && currentRouting.isRelocationTarget() == false && newRouting.active()) { // the master started a recovering primary, activate primary mode. replicationTracker.activatePrimaryMode(getLocalCheckpoint()); + ensurePeerRecoveryRetentionLeasesExist(); } } else { assert currentRouting.primary() == false : "term is only increased as part of primary promotion"; @@ -528,6 +529,7 @@ public void updateShardState(final ShardRouting newRouting, assert getOperationPrimaryTerm() == newPrimaryTerm; try { replicationTracker.activatePrimaryMode(getLocalCheckpoint()); + ensurePeerRecoveryRetentionLeasesExist(); /* * If this shard was serving as a replica shard when another shard was promoted to primary then * its Lucene index was reset during the primary term transition. In particular, the Lucene index @@ -2275,6 +2277,13 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p synchronized (mutex) { replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex } + ensurePeerRecoveryRetentionLeasesExist(); + } + + private void ensurePeerRecoveryRetentionLeasesExist() { + threadPool.generic().execute(() -> replicationTracker.createMissingPeerRecoveryRetentionLeases(ActionListener.wrap( + r -> logger.trace("created missing peer recovery retention leases"), + e -> logger.debug("failed creating missing peer recovery retention leases", e)))); } /** diff --git a/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseCreationIT.java b/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseCreationIT.java new file mode 100644 index 0000000000000..4611495e6f2db --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseCreationIT.java @@ -0,0 +1,85 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.seqno; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.env.NodeMetaData; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.VersionUtils; + +import java.nio.file.Path; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; + +@ESIntegTestCase.ClusterScope(numDataNodes = 0) +public class PeerRecoveryRetentionLeaseCreationIT extends ESIntegTestCase { + + @Override + protected boolean forbidPrivateIndexSettings() { + return false; + } + + public void testCanRecoverFromStoreWithoutPeerRecoveryRetentionLease() throws Exception { + /* + * In a full cluster restart from a version without peer-recovery retention leases, the leases on disk will not include a lease for + * the local node. The same sort of thing can happen in weird situations This test ensures that a primary that is recovering from + * store creates a lease for itself. + */ + + internalCluster().startMasterOnlyNode(); + final String dataNode = internalCluster().startDataOnlyNode(); + final Path[] nodeDataPaths = internalCluster().getInstance(NodeEnvironment.class, dataNode).nodeDataPaths(); + + assertAcked(prepareCreate("index").setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_VERSION_CREATED, + VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumIndexCompatibilityVersion(), Version.CURRENT)))); + ensureGreen("index"); + + // Change the node ID so that the persisted retention lease no longer applies. + final String oldNodeId = client().admin().cluster().prepareNodesInfo(dataNode).clear().get().getNodes().get(0).getNode().getId(); + final String newNodeId = randomValueOtherThan(oldNodeId, () -> UUIDs.randomBase64UUID(random())); + + internalCluster().restartNode(dataNode, new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + final NodeMetaData nodeMetaData = new NodeMetaData(newNodeId, Version.CURRENT); + NodeMetaData.FORMAT.writeAndCleanup(nodeMetaData, nodeDataPaths); + return Settings.EMPTY; + } + }); + + ensureGreen("index"); + assertThat(client().admin().cluster().prepareNodesInfo(dataNode).clear().get().getNodes().get(0).getNode().getId(), + equalTo(newNodeId)); + final RetentionLeases retentionLeases = client().admin().indices().prepareStats("index").get().getShards()[0] + .getRetentionLeaseStats().retentionLeases(); + assertTrue("expected lease for [" + newNodeId + "] in " + retentionLeases, + retentionLeases.contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(newNodeId))); + } + +} diff --git a/test/framework/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseUtils.java b/test/framework/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseUtils.java index 55807161d51ad..9b2eda120d8b9 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseUtils.java @@ -18,11 +18,27 @@ */ package org.elasticsearch.index.seqno; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.rest.yaml.ObjectPath; +import org.junit.Assert; + +import java.io.IOException; +import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; +import static org.hamcrest.Matchers.hasItems; + public class RetentionLeaseUtils { private RetentionLeaseUtils() { @@ -45,4 +61,39 @@ public static Map toMapExcludingPeerRecoveryRetentionLea }, LinkedHashMap::new)); } + + /** + * Asserts that every copy of every shard of the given index has a peer recovery retention lease according to the stats exposed by the + * REST API + */ + public static void assertAllCopiesHavePeerRecoveryRetentionLeases(RestClient restClient, String index) throws IOException { + final Request statsRequest = new Request("GET", "/" + index + "/_stats"); + statsRequest.addParameter("level", "shards"); + final Map shardsStats = ObjectPath.createFromResponse(restClient.performRequest(statsRequest)) + .evaluate("indices." + index + ".shards"); + for (Map.Entry shardCopiesEntry : shardsStats.entrySet()) { + final List shardCopiesList = (List) shardCopiesEntry.getValue(); + + final Set expectedLeaseIds = new HashSet<>(); + for (Object shardCopyStats : shardCopiesList) { + final String nodeId + = Objects.requireNonNull((String) ((Map) (((Map) shardCopyStats).get("routing"))).get("node")); + expectedLeaseIds.add(ReplicationTracker.getPeerRecoveryRetentionLeaseId( + ShardRouting.newUnassigned(new ShardId("_na_", "test", 0), false, RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "test")).initialize(nodeId, null, 0L))); + } + + final Set actualLeaseIds = new HashSet<>(); + for (Object shardCopyStats : shardCopiesList) { + final List leases + = (List) ((Map) (((Map) shardCopyStats).get("retention_leases"))).get("leases"); + for (Object lease : leases) { + actualLeaseIds.add(Objects.requireNonNull((String) (((Map) lease).get("id")))); + } + } + Assert.assertThat("[" + index + "][" + shardCopiesEntry.getKey() + "] has leases " + actualLeaseIds + + " but expected " + expectedLeaseIds, + actualLeaseIds, hasItems(expectedLeaseIds.toArray(new String[0]))); + } + } }