diff --git a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java index 88389d97c8945..42d6d2fed0954 100644 --- a/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java +++ b/qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.index.seqno.RetentionLeaseUtils; import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.rest.action.document.RestGetAction; import org.elasticsearch.rest.action.document.RestIndexAction; @@ -89,7 +90,7 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase { private String type; @Before - public void setIndex() throws IOException { + public void setIndex() { index = getTestName().toLowerCase(Locale.ROOT); } @@ -1338,4 +1339,26 @@ protected void ensureGreenLongWait(String index) throws IOException { assertEquals("green", healthRsp.get("status")); assertFalse((Boolean) healthRsp.get("timed_out")); } + + public void testPeerRecoveryRetentionLeases() throws IOException { + if (isRunningAgainstOldCluster()) { + XContentBuilder settings = jsonBuilder(); + settings.startObject(); + { + settings.startObject("settings"); + settings.field("number_of_shards", between(1, 5)); + settings.field("number_of_replicas", between(0, 2)); + settings.endObject(); + } + settings.endObject(); + + Request createIndex = new Request("PUT", "/" + index); + createIndex.setJsonEntity(Strings.toString(settings)); + client().performRequest(createIndex); + ensureGreen(index); + } else { + ensureGreen(index); + RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index); + } + } } diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index 863172e91e7da..624b4cbec5ded 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.RetentionLeaseUtils; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.document.RestIndexAction; import org.elasticsearch.rest.action.document.RestUpdateAction; @@ -382,6 +383,80 @@ public void testRecoveryWithSoftDeletes() throws Exception { ensureGreen(index); } + public void testRetentionLeasesEstablishedWhenPromotingPrimary() throws Exception { + final String index = "recover_and_create_leases_in_promotion"; + if (CLUSTER_TYPE == ClusterType.OLD) { + Settings.Builder settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(1, 5)) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), between(1, 2)) // triggers nontrivial promotion + .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms") + .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); + createIndex(index, settings.build()); + int numDocs = randomInt(10); + indexDocs(index, 0, numDocs); + if (randomBoolean()) { + client().performRequest(new Request("POST", "/" + index + "/_flush")); + } + } + ensureGreen(index); + if (CLUSTER_TYPE == ClusterType.UPGRADED) { + assertBusy(() -> RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index)); + } + } + + public void testRetentionLeasesEstablishedWhenRelocatingPrimary() throws Exception { + final String index = "recover_and_create_leases_in_relocation"; + switch (CLUSTER_TYPE) { + case OLD: + Settings.Builder settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(1, 5)) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), between(0, 1)) + .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms") + .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); + createIndex(index, settings.build()); + int numDocs = randomInt(10); + indexDocs(index, 0, numDocs); + if (randomBoolean()) { + client().performRequest(new Request("POST", "/" + index + "/_flush")); + } + ensureGreen(index); + break; + + case MIXED: + // trigger a primary relocation by excluding the last old node with a shard filter + final Map nodesMap + = ObjectPath.createFromResponse(client().performRequest(new Request("GET", "/_nodes"))).evaluate("nodes"); + final List oldNodeNames = new ArrayList<>(); + for (Object nodeDetails : nodesMap.values()) { + final Map nodeDetailsMap = (Map) nodeDetails; + final String versionString = (String) nodeDetailsMap.get("version"); + if (versionString.equals(Version.CURRENT.toString()) == false) { + oldNodeNames.add((String) nodeDetailsMap.get("name")); + } + } + + if (oldNodeNames.size() == 1) { + final String oldNodeName = oldNodeNames.get(0); + logger.info("--> excluding index [{}] from node [{}]", index, oldNodeName); + final Request putSettingsRequest = new Request("PUT", "/" + index + "/_settings"); + putSettingsRequest.setJsonEntity("{\"index.routing.allocation.exclude._name\":\"" + oldNodeName + "\"}"); + assertOK(client().performRequest(putSettingsRequest)); + ensureGreen(index); + assertBusy(() -> RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index)); + } else { + ensureGreen(index); + } + break; + + case UPGRADED: + ensureGreen(index); + assertBusy(() -> RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index)); + break; + } + } + /** * This test creates an index in the non upgraded cluster and closes it. It then checks that the index * is effectively closed and potentially replicated (if the version the index was created on supports diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index cee6a19cd5f5b..257833dbb5c05 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -23,6 +23,7 @@ import com.carrotsearch.hppc.ObjectLongMap; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.AllocationId; @@ -201,6 +202,14 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L */ private long persistedRetentionLeasesVersion; + /** + * Whether there should be a peer recovery retention lease (PRRL) for every tracked shard copy. Always true on indices created from + * {@link Version#V_7_4_0} onwards, because these versions create PRRLs properly. May be false on indices created in an earlier version + * if we recently did a rolling upgrade and {@link ReplicationTracker#createMissingPeerRecoveryRetentionLeases(ActionListener)} has not + * yet completed. Is only permitted to change from false to true; can be removed once support for pre-PRRL indices is no longer needed. + */ + private boolean hasAllPeerRecoveryRetentionLeases; + /** * Get all retention leases tracked on this shard. * @@ -486,10 +495,10 @@ public synchronized void renewPeerRecoveryRetentionLeases() { if (retentionLease == null) { /* * If this shard copy is tracked then we got here here via a rolling upgrade from an older version that doesn't - * create peer recovery retention leases for every shard copy. TODO create leases lazily in that situation. + * create peer recovery retention leases for every shard copy. */ assert checkpoints.get(shardRouting.allocationId().getId()).tracked == false - || indexSettings.getIndexVersionCreated().before(Version.V_7_4_0); + || hasAllPeerRecoveryRetentionLeases == false; return false; } return retentionLease.timestamp() <= renewalTimeMillis @@ -752,7 +761,7 @@ private boolean invariant() { if (primaryMode && indexSettings.isSoftDeleteEnabled() && indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN - && indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0)) { + && hasAllPeerRecoveryRetentionLeases) { // all tracked shard copies have a corresponding peer-recovery retention lease for (final ShardRouting shardRouting : routingTable.assignedShards()) { if (checkpoints.get(shardRouting.allocationId().getId()).tracked) { @@ -819,6 +828,7 @@ public ReplicationTracker( this.pendingInSync = new HashSet<>(); this.routingTable = null; this.replicationGroup = null; + this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0); assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false; assert invariant(); } @@ -913,30 +923,51 @@ public synchronized void activatePrimaryMode(final long localCheckpoint) { updateGlobalCheckpointOnPrimary(); if (indexSettings.isSoftDeleteEnabled()) { + addPeerRecoveryRetentionLeaseForSolePrimary(); + } + + assert invariant(); + } + + /** + * Creates a peer recovery retention lease for this shard, if one does not already exist and this shard is the sole shard copy in the + * replication group. If one does not already exist and yet there are other shard copies in this group then we must have just done + * a rolling upgrade from a version before {@link Version#V_7_4_0}, in which case the missing leases should be created asynchronously + * by the caller using {@link ReplicationTracker#createMissingPeerRecoveryRetentionLeases(ActionListener)}. + */ + private void addPeerRecoveryRetentionLeaseForSolePrimary() { + assert primaryMode; + assert Thread.holdsLock(this); + + if (indexSettings().getIndexMetaData().getState() == IndexMetaData.State.OPEN) { final ShardRouting primaryShard = routingTable.primaryShard(); final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard); if (retentionLeases.get(leaseId) == null) { - /* - * We might have got here here via a rolling upgrade from an older version that doesn't create peer recovery retention - * leases for every shard copy, but in this case we do not expect any leases to exist. - */ - if (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0)) { - // We are starting up the whole replication group from scratch: if we were not (i.e. this is a replica promotion) then - // this copy must already be in-sync and active and therefore holds a retention lease for itself. - assert routingTable.activeShards().equals(Collections.singletonList(primaryShard)) : routingTable.activeShards(); + if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard))) { assert primaryShard.allocationId().getId().equals(shardAllocationId) - : routingTable.activeShards() + " vs " + shardAllocationId; - assert replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard)); - + : routingTable.assignedShards() + " vs " + shardAllocationId; // Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication // group. + logger.trace("addPeerRecoveryRetentionLeaseForSolePrimary: adding lease [{}]", leaseId); innerAddRetentionLease(leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1), PEER_RECOVERY_RETENTION_LEASE_SOURCE); + hasAllPeerRecoveryRetentionLeases = true; + } else { + /* + * We got here here via a rolling upgrade from an older version that doesn't create peer recovery retention + * leases for every shard copy, but in this case we do not expect any leases to exist. + */ + assert hasAllPeerRecoveryRetentionLeases == false : routingTable + " vs " + retentionLeases; + logger.debug("{} becoming primary of {} with missing lease: {}", primaryShard, routingTable, retentionLeases); } + } else if (hasAllPeerRecoveryRetentionLeases == false && routingTable.assignedShards().stream().allMatch(shardRouting -> + retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) + || checkpoints.get(shardRouting.allocationId().getId()).tracked == false)) { + // Although this index is old enough not to have all the expected peer recovery retention leases, in fact it does, so we + // don't need to do any more work. + hasAllPeerRecoveryRetentionLeases = true; } } - - assert invariant(); } /** @@ -1239,9 +1270,54 @@ public synchronized void activateWithPrimaryContext(PrimaryContext primaryContex // note that if there was no cluster state update between start of the engine of this shard and the call to // initializeWithPrimaryContext, we might still have missed a cluster state update. This is best effort. runAfter.run(); + + if (indexSettings.isSoftDeleteEnabled()) { + addPeerRecoveryRetentionLeaseForSolePrimary(); + } + + assert invariant(); + } + + private synchronized void setHasAllPeerRecoveryRetentionLeases() { + hasAllPeerRecoveryRetentionLeases = true; assert invariant(); } + /** + * Create any required peer-recovery retention leases that do not currently exist because we just did a rolling upgrade from a version + * prior to {@link Version#V_7_4_0} that does not create peer-recovery retention leases. + */ + public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener listener) { + if (hasAllPeerRecoveryRetentionLeases == false) { + final List shardRoutings = routingTable.assignedShards(); + final GroupedActionListener groupedActionListener = new GroupedActionListener<>(ActionListener.wrap(vs -> { + setHasAllPeerRecoveryRetentionLeases(); + listener.onResponse(null); + }, listener::onFailure), shardRoutings.size()); + for (ShardRouting shardRouting : shardRoutings) { + if (retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))) { + groupedActionListener.onResponse(null); + } else { + final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId()); + if (checkpointState.tracked == false) { + groupedActionListener.onResponse(null); + } else { + logger.trace("createMissingPeerRecoveryRetentionLeases: adding missing lease for {}", shardRouting); + try { + addPeerRecoveryRetentionLease(shardRouting.currentNodeId(), + Math.max(SequenceNumbers.NO_OPS_PERFORMED, checkpointState.globalCheckpoint), groupedActionListener); + } catch (Exception e) { + groupedActionListener.onFailure(e); + } + } + } + } + } else { + logger.trace("createMissingPeerRecoveryRetentionLeases: nothing to do"); + listener.onResponse(null); + } + } + private Runnable getMasterUpdateOperationFromCurrentState() { assert primaryMode == false; final long lastAppliedClusterStateVersion = appliedClusterStateVersion; diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 886a5efbe332b..8aa4d2b3c7d81 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -486,6 +486,7 @@ public void updateShardState(final ShardRouting newRouting, if (currentRouting.initializing() && currentRouting.isRelocationTarget() == false && newRouting.active()) { // the master started a recovering primary, activate primary mode. replicationTracker.activatePrimaryMode(getLocalCheckpoint()); + ensurePeerRecoveryRetentionLeasesExist(); } } else { assert currentRouting.primary() == false : "term is only increased as part of primary promotion"; @@ -528,6 +529,7 @@ public void updateShardState(final ShardRouting newRouting, assert getOperationPrimaryTerm() == newPrimaryTerm; try { replicationTracker.activatePrimaryMode(getLocalCheckpoint()); + ensurePeerRecoveryRetentionLeasesExist(); /* * If this shard was serving as a replica shard when another shard was promoted to primary then * its Lucene index was reset during the primary term transition. In particular, the Lucene index @@ -2275,6 +2277,13 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p synchronized (mutex) { replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex } + ensurePeerRecoveryRetentionLeasesExist(); + } + + private void ensurePeerRecoveryRetentionLeasesExist() { + threadPool.generic().execute(() -> replicationTracker.createMissingPeerRecoveryRetentionLeases(ActionListener.wrap( + r -> logger.trace("created missing peer recovery retention leases"), + e -> logger.debug("failed creating missing peer recovery retention leases", e)))); } /** diff --git a/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseCreationIT.java b/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseCreationIT.java new file mode 100644 index 0000000000000..4611495e6f2db --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/seqno/PeerRecoveryRetentionLeaseCreationIT.java @@ -0,0 +1,85 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.seqno; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.env.NodeMetaData; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.VersionUtils; + +import java.nio.file.Path; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; + +@ESIntegTestCase.ClusterScope(numDataNodes = 0) +public class PeerRecoveryRetentionLeaseCreationIT extends ESIntegTestCase { + + @Override + protected boolean forbidPrivateIndexSettings() { + return false; + } + + public void testCanRecoverFromStoreWithoutPeerRecoveryRetentionLease() throws Exception { + /* + * In a full cluster restart from a version without peer-recovery retention leases, the leases on disk will not include a lease for + * the local node. The same sort of thing can happen in weird situations This test ensures that a primary that is recovering from + * store creates a lease for itself. + */ + + internalCluster().startMasterOnlyNode(); + final String dataNode = internalCluster().startDataOnlyNode(); + final Path[] nodeDataPaths = internalCluster().getInstance(NodeEnvironment.class, dataNode).nodeDataPaths(); + + assertAcked(prepareCreate("index").setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_VERSION_CREATED, + VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumIndexCompatibilityVersion(), Version.CURRENT)))); + ensureGreen("index"); + + // Change the node ID so that the persisted retention lease no longer applies. + final String oldNodeId = client().admin().cluster().prepareNodesInfo(dataNode).clear().get().getNodes().get(0).getNode().getId(); + final String newNodeId = randomValueOtherThan(oldNodeId, () -> UUIDs.randomBase64UUID(random())); + + internalCluster().restartNode(dataNode, new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + final NodeMetaData nodeMetaData = new NodeMetaData(newNodeId, Version.CURRENT); + NodeMetaData.FORMAT.writeAndCleanup(nodeMetaData, nodeDataPaths); + return Settings.EMPTY; + } + }); + + ensureGreen("index"); + assertThat(client().admin().cluster().prepareNodesInfo(dataNode).clear().get().getNodes().get(0).getNode().getId(), + equalTo(newNodeId)); + final RetentionLeases retentionLeases = client().admin().indices().prepareStats("index").get().getShards()[0] + .getRetentionLeaseStats().retentionLeases(); + assertTrue("expected lease for [" + newNodeId + "] in " + retentionLeases, + retentionLeases.contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(newNodeId))); + } + +} diff --git a/test/framework/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseUtils.java b/test/framework/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseUtils.java index 55807161d51ad..9b2eda120d8b9 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseUtils.java @@ -18,11 +18,27 @@ */ package org.elasticsearch.index.seqno; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.rest.yaml.ObjectPath; +import org.junit.Assert; + +import java.io.IOException; +import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; +import static org.hamcrest.Matchers.hasItems; + public class RetentionLeaseUtils { private RetentionLeaseUtils() { @@ -45,4 +61,39 @@ public static Map toMapExcludingPeerRecoveryRetentionLea }, LinkedHashMap::new)); } + + /** + * Asserts that every copy of every shard of the given index has a peer recovery retention lease according to the stats exposed by the + * REST API + */ + public static void assertAllCopiesHavePeerRecoveryRetentionLeases(RestClient restClient, String index) throws IOException { + final Request statsRequest = new Request("GET", "/" + index + "/_stats"); + statsRequest.addParameter("level", "shards"); + final Map shardsStats = ObjectPath.createFromResponse(restClient.performRequest(statsRequest)) + .evaluate("indices." + index + ".shards"); + for (Map.Entry shardCopiesEntry : shardsStats.entrySet()) { + final List shardCopiesList = (List) shardCopiesEntry.getValue(); + + final Set expectedLeaseIds = new HashSet<>(); + for (Object shardCopyStats : shardCopiesList) { + final String nodeId + = Objects.requireNonNull((String) ((Map) (((Map) shardCopyStats).get("routing"))).get("node")); + expectedLeaseIds.add(ReplicationTracker.getPeerRecoveryRetentionLeaseId( + ShardRouting.newUnassigned(new ShardId("_na_", "test", 0), false, RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "test")).initialize(nodeId, null, 0L))); + } + + final Set actualLeaseIds = new HashSet<>(); + for (Object shardCopyStats : shardCopiesList) { + final List leases + = (List) ((Map) (((Map) shardCopyStats).get("retention_leases"))).get("leases"); + for (Object lease : leases) { + actualLeaseIds.add(Objects.requireNonNull((String) (((Map) lease).get("id")))); + } + } + Assert.assertThat("[" + index + "][" + shardCopiesEntry.getKey() + "] has leases " + actualLeaseIds + + " but expected " + expectedLeaseIds, + actualLeaseIds, hasItems(expectedLeaseIds.toArray(new String[0]))); + } + } }