diff --git a/x-pack/qa/rolling-upgrade-multi-cluster/build.gradle b/x-pack/qa/rolling-upgrade-multi-cluster/build.gradle index 3f222be0594a9..0df8740424def 100644 --- a/x-pack/qa/rolling-upgrade-multi-cluster/build.gradle +++ b/x-pack/qa/rolling-upgrade-multi-cluster/build.gradle @@ -41,6 +41,7 @@ for (Version version : bwcVersions.wireCompatible) { Task leaderClusterTestRunner = tasks.getByName("${taskPrefix}#leader#clusterTestRunner") leaderClusterTestRunner.configure { + systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '') systemProperty 'tests.rest.upgrade_state', 'none' systemProperty 'tests.rest.cluster_name', 'leader' @@ -71,6 +72,7 @@ for (Version version : bwcVersions.wireCompatible) { Task followerClusterTestRunner = tasks.getByName("${taskPrefix}#follower#clusterTestRunner") followerClusterTestRunner.configure { + systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '') systemProperty 'tests.rest.upgrade_state', 'none' systemProperty 'tests.rest.cluster_name', 'follower' @@ -115,6 +117,7 @@ for (Version version : bwcVersions.wireCompatible) { Task followerOneThirdUpgradedTestRunner = tasks.getByName("${taskPrefix}#follower#oneThirdUpgradedTestRunner") followerOneThirdUpgradedTestRunner.configure { + systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '') systemProperty 'tests.rest.upgrade_state', 'one_third' systemProperty 'tests.rest.cluster_name', 'follower' @@ -135,6 +138,7 @@ for (Version version : bwcVersions.wireCompatible) { Task followerTwoThirdsUpgradedTestRunner = tasks.getByName("${taskPrefix}#follower#twoThirdsUpgradedTestRunner") followerTwoThirdsUpgradedTestRunner.configure { + systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '') systemProperty 'tests.rest.upgrade_state', 'two_third' systemProperty 'tests.rest.cluster_name', 'follower' @@ -155,6 +159,7 @@ for (Version version : bwcVersions.wireCompatible) { Task followerUpgradedClusterTestRunner = tasks.getByName("${taskPrefix}#follower#upgradedClusterTestRunner") followerUpgradedClusterTestRunner.configure { + systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '') systemProperty 'tests.rest.upgrade_state', 'all' systemProperty 'tests.rest.cluster_name', 'follower' @@ -181,6 +186,7 @@ for (Version version : bwcVersions.wireCompatible) { Task leaderOneThirdUpgradedTestRunner = tasks.getByName("${taskPrefix}#leader#oneThirdUpgradedTestRunner") leaderOneThirdUpgradedTestRunner.configure { + systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '') systemProperty 'tests.rest.upgrade_state', 'one_third' systemProperty 'tests.rest.cluster_name', 'leader' @@ -201,6 +207,7 @@ for (Version version : bwcVersions.wireCompatible) { Task leaderTwoThirdsUpgradedTestRunner = tasks.getByName("${taskPrefix}#leader#twoThirdsUpgradedTestRunner") leaderTwoThirdsUpgradedTestRunner.configure { + systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '') systemProperty 'tests.rest.upgrade_state', 'two_third' systemProperty 'tests.rest.cluster_name', 'leader' @@ -221,6 +228,7 @@ for (Version version : bwcVersions.wireCompatible) { Task leaderUpgradedClusterTestRunner = tasks.getByName("${taskPrefix}#leader#upgradedClusterTestRunner") leaderUpgradedClusterTestRunner.configure { + systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '') systemProperty 'tests.rest.upgrade_state', 'all' systemProperty 'tests.rest.cluster_name', 'leader' diff --git a/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/AbstractMultiClusterUpgradeTestCase.java b/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/AbstractMultiClusterUpgradeTestCase.java index d1420c54bb90d..3221e899eac36 100644 --- a/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/AbstractMultiClusterUpgradeTestCase.java +++ b/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/AbstractMultiClusterUpgradeTestCase.java @@ -7,6 +7,7 @@ import org.apache.http.HttpHost; import org.apache.http.util.EntityUtils; +import org.elasticsearch.Version; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; @@ -71,6 +72,9 @@ public static ClusterName parse(String value) { protected final ClusterName clusterName = ClusterName.parse(System.getProperty("tests.rest.cluster_name")); + protected static final Version UPGRADE_FROM_VERSION = + Version.fromString(System.getProperty("tests.upgrade_from_version")); + private static RestClient leaderClient; private static RestClient followerClient; private static boolean initialized = false; diff --git a/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/CcrRollingUpgradeIT.java b/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/CcrRollingUpgradeIT.java index 7f11931bd28f3..5e5c3d47c729b 100644 --- a/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/CcrRollingUpgradeIT.java +++ b/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/CcrRollingUpgradeIT.java @@ -10,6 +10,7 @@ import org.elasticsearch.client.RestClient; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ObjectPath; import org.elasticsearch.common.xcontent.support.XContentMapValues; import java.io.IOException; @@ -88,6 +89,128 @@ public void testIndexFollowing() throws Exception { } } + public void testAutoFollowing() throws Exception { + final Settings indexSettings = Settings.builder() + .put("index.soft_deletes.enabled", true) + .put("index.number_of_shards", 1) + .build(); + + String leaderIndex1 = "logs-20200101"; + String leaderIndex2 = "logs-20200102"; + String leaderIndex3 = "logs-20200103"; + + if (clusterName == ClusterName.LEADER) { + switch (upgradeState) { + case NONE: + case ONE_THIRD: + case TWO_THIRD: + break; + case ALL: + index(leaderClient(), leaderIndex1, 64); + assertBusy(() -> { + String followerIndex = "copy-" + leaderIndex1; + assertTotalHitCount(followerIndex, 320, followerClient()); + }); + index(leaderClient(), leaderIndex2, 64); + assertBusy(() -> { + String followerIndex = "copy-" + leaderIndex2; + assertTotalHitCount(followerIndex, 256, followerClient()); + }); + index(leaderClient(), leaderIndex3, 64); + assertBusy(() -> { + String followerIndex = "copy-" + leaderIndex3; + assertTotalHitCount(followerIndex, 192, followerClient()); + }); + + deleteAutoFollowPattern(followerClient(), "test_pattern"); + stopIndexFollowing(followerClient(), "copy-" + leaderIndex1); + stopIndexFollowing(followerClient(), "copy-" + leaderIndex2); + stopIndexFollowing(followerClient(), "copy-" + leaderIndex3); + break; + default: + throw new AssertionError("unexpected upgrade_state [" + upgradeState + "]"); + } + } else if (clusterName == ClusterName.FOLLOWER) { + switch (upgradeState) { + case NONE: + putAutoFollowPattern(followerClient(), "test_pattern", "leader", "logs-*"); + createIndex(leaderIndex1, indexSettings); + index(leaderClient(), leaderIndex1, 64); + assertBusy(() -> { + String followerIndex = "copy-" + leaderIndex1; + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(1)); + assertTotalHitCount(followerIndex, 64, followerClient()); + }); + break; + case ONE_THIRD: + index(leaderClient(), leaderIndex1, 64); + assertBusy(() -> { + String followerIndex = "copy-" + leaderIndex1; + assertTotalHitCount(followerIndex, 128, followerClient()); + }); + // Auto follow stats are kept in-memory on master elected node + // and if this node get updated then auto follow stats are reset + { + int previousNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); + createIndex(leaderIndex2, indexSettings); + index(leaderClient(), leaderIndex2, 64); + assertBusy(() -> { + String followerIndex = "copy-" + leaderIndex2; + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(previousNumberOfSuccessfulFollowedIndices + 1)); + assertTotalHitCount(followerIndex, 64, followerClient()); + }); + } + break; + case TWO_THIRD: + index(leaderClient(), leaderIndex1, 64); + assertBusy(() -> { + String followerIndex = "copy-" + leaderIndex1; + assertTotalHitCount(followerIndex, 192, followerClient()); + }); + index(leaderClient(), leaderIndex2, 64); + assertBusy(() -> { + String followerIndex = "copy-" + leaderIndex2; + assertTotalHitCount(followerIndex, 128, followerClient()); + }); + + // Auto follow stats are kept in-memory on master elected node + // and if this node get updated then auto follow stats are reset + { + int previousNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); + createIndex(leaderIndex3, indexSettings); + index(leaderClient(), leaderIndex3, 64); + assertBusy(() -> { + String followerIndex = "copy-" + leaderIndex3; + assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(previousNumberOfSuccessfulFollowedIndices + 1)); + assertTotalHitCount(followerIndex, 64, followerClient()); + }); + } + break; + case ALL: + index(leaderClient(), leaderIndex1, 64); + assertBusy(() -> { + String followerIndex = "copy-" + leaderIndex1; + assertTotalHitCount(followerIndex, 256, followerClient()); + }); + index(leaderClient(), leaderIndex2, 64); + assertBusy(() -> { + String followerIndex = "copy-" + leaderIndex2; + assertTotalHitCount(followerIndex, 192, followerClient()); + }); + index(leaderClient(), leaderIndex3, 64); + assertBusy(() -> { + String followerIndex = "copy-" + leaderIndex3; + assertTotalHitCount(followerIndex, 128, followerClient()); + }); + break; + default: + throw new UnsupportedOperationException("unexpected upgrade state [" + upgradeState + "]"); + } + } else { + throw new AssertionError("unexpected cluster_name [" + clusterName + "]"); + } + } + public void testCannotFollowLeaderInUpgradedCluster() throws Exception { assumeTrue("Tests only runs with upgrade_state [all]", upgradeState == UpgradeState.ALL); @@ -134,6 +257,29 @@ private static void followIndex(RestClient client, String leaderCluster, String assertOK(client.performRequest(request)); } + private static void putAutoFollowPattern(RestClient client, String name, String remoteCluster, String pattern) throws IOException { + Request request = new Request("PUT", "/_ccr/auto_follow/" + name); + request.setJsonEntity("{\"leader_index_patterns\": [\"" + pattern + "\"], \"remote_cluster\": \"" + remoteCluster + "\"," + + "\"follow_index_pattern\": \"copy-{{leader_index}}\", \"read_poll_timeout\": \"10ms\"}"); + assertOK(client.performRequest(request)); + } + + private static void deleteAutoFollowPattern(RestClient client, String patternName) throws IOException { + Request request = new Request("DELETE", "/_ccr/auto_follow/" + patternName); + assertOK(client.performRequest(request)); + } + + private int getNumberOfSuccessfulFollowedIndices() throws IOException { + Request statsRequest = new Request("GET", "/_ccr/stats"); + Map response = toMap(client().performRequest(statsRequest)); + Integer actualSuccessfulFollowedIndices = ObjectPath.eval("auto_follow_stats.number_of_successful_follow_indices", response); + if (actualSuccessfulFollowedIndices != null) { + return actualSuccessfulFollowedIndices; + } else { + return -1; + } + } + private static void index(RestClient client, String index, int numDocs) throws IOException { for (int i = 0; i < numDocs; i++) { final Request request = new Request("POST", "/" + index + "/_doc/"); @@ -162,4 +308,10 @@ private static void verifyTotalHitCount(final String index, assertThat(totalHits, equalTo(expectedTotalHits)); } + private static void stopIndexFollowing(RestClient client, String followerIndex) throws IOException { + assertOK(client.performRequest(new Request("POST", "/" + followerIndex + "/_ccr/pause_follow"))); + assertOK(client.performRequest(new Request("POST", "/" + followerIndex + "/_close"))); + assertOK(client.performRequest(new Request("POST", "/" + followerIndex + "/_ccr/unfollow"))); + } + } diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/CCRIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/CCRIT.java deleted file mode 100644 index 9fa34568a1e14..0000000000000 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/CCRIT.java +++ /dev/null @@ -1,283 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.upgrades; - -import org.apache.http.util.EntityUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.Version; -import org.elasticsearch.client.Request; -import org.elasticsearch.client.Response; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.ObjectPath; -import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.common.xcontent.json.JsonXContent; - -import java.io.IOException; -import java.util.Map; - -import static org.hamcrest.Matchers.equalTo; - -public class CCRIT extends AbstractUpgradeTestCase { - - private static final Logger LOGGER = LogManager.getLogger(CCRIT.class); - - private static final Version UPGRADE_FROM_VERSION = - Version.fromString(System.getProperty("tests.upgrade_from_version")); - - private static final boolean SECOND_ROUND = "false".equals(System.getProperty("tests.first_round")); - - @Override - protected boolean preserveClusterSettings() { - return true; - } - - public void testIndexFollowing() throws Exception { - assumeTrue("CCR became available in 6.5, but test relies on a fix that was shipped with 6.6.0", - UPGRADE_FROM_VERSION.onOrAfter(Version.V_6_6_0)); - setupRemoteCluster(); - - final String leaderIndex = "my-leader-index"; - final String followerIndex = "my-follower-index"; - - switch (CLUSTER_TYPE) { - case OLD: - Settings indexSettings = Settings.builder() - .put("index.soft_deletes.enabled", true) - .put("index.number_of_shards", 1) - .build(); - createIndex(leaderIndex, indexSettings); - followIndex(leaderIndex, followerIndex); - index(leaderIndex, "1"); - assertDocumentExists(leaderIndex, "1"); - assertBusy(() -> { - assertFollowerGlobalCheckpoint(followerIndex, 0); - assertDocumentExists(followerIndex, "1"); - }); - break; - case MIXED: - if (SECOND_ROUND == false) { - index(leaderIndex, "2"); - assertDocumentExists(leaderIndex, "1", "2"); - assertBusy(() -> { - assertFollowerGlobalCheckpoint(followerIndex, 1); - assertDocumentExists(followerIndex, "1", "2"); - }); - } else { - index(leaderIndex, "3"); - assertDocumentExists(leaderIndex, "1", "2", "3"); - assertBusy(() -> { - assertFollowerGlobalCheckpoint(followerIndex, 2); - assertDocumentExists(followerIndex, "1", "2", "3"); - }); - } - break; - case UPGRADED: - index(leaderIndex, "4"); - assertDocumentExists(leaderIndex, "1", "2", "3", "4"); - assertBusy(() -> { - assertFollowerGlobalCheckpoint(followerIndex, 3); - assertDocumentExists(followerIndex, "1", "2", "3", "4"); - }); - stopIndexFollowing(followerIndex); - break; - default: - throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]"); - } - } - - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37231") - public void testAutoFollowing() throws Exception { - assumeTrue("CCR became available in 6.5, but test relies on a fix that was shipped with 6.6.0", - UPGRADE_FROM_VERSION.onOrAfter(Version.V_6_6_0)); - setupRemoteCluster(); - - final Settings indexSettings = Settings.builder() - .put("index.soft_deletes.enabled", true) - .put("index.number_of_shards", 1) - .build(); - - String leaderIndex1 = "logs-20200101"; - String leaderIndex2 = "logs-20200102"; - String leaderIndex3 = "logs-20200103"; - - switch (CLUSTER_TYPE) { - case OLD: - putAutoFollowPattern("test_pattern", "logs-*"); - createIndex(leaderIndex1, indexSettings); - index(leaderIndex1, "1"); - assertBusy(() -> { - String followerIndex = "copy-" + leaderIndex1; - assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(1)); - assertFollowerGlobalCheckpoint(followerIndex, 0); - assertDocumentExists(followerIndex, "1"); - }); - break; - case MIXED: - if (SECOND_ROUND == false) { - index(leaderIndex1, "2"); - assertBusy(() -> { - String followerIndex = "copy-" + leaderIndex1; - assertFollowerGlobalCheckpoint(followerIndex, 1); - assertDocumentExists(followerIndex, "2"); - }); - // Auto follow stats are kept in-memory on master elected node - // and if this node get updated then auto follow stats are reset - int previousNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); - createIndex(leaderIndex2, indexSettings); - index(leaderIndex2, "1"); - assertBusy(() -> { - String followerIndex = "copy-" + leaderIndex2; - assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(previousNumberOfSuccessfulFollowedIndices + 1)); - assertFollowerGlobalCheckpoint(followerIndex, 0); - assertDocumentExists(followerIndex, "1"); - }); - } else { - index(leaderIndex1, "3"); - assertBusy(() -> { - String followerIndex = "copy-" + leaderIndex1; - assertFollowerGlobalCheckpoint(followerIndex, 2); - assertDocumentExists(followerIndex, "3"); - }); - index(leaderIndex2, "2"); - assertBusy(() -> { - String followerIndex = "copy-" + leaderIndex2; - assertFollowerGlobalCheckpoint(followerIndex, 1); - assertDocumentExists(followerIndex, "2"); - }); - - // Auto follow stats are kept in-memory on master elected node - // and if this node get updated then auto follow stats are reset - int previousNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices(); - createIndex(leaderIndex3, indexSettings); - index(leaderIndex3, "1"); - assertBusy(() -> { - String followerIndex = "copy-" + leaderIndex3; - assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(previousNumberOfSuccessfulFollowedIndices + 1)); - assertFollowerGlobalCheckpoint(followerIndex, 0); - assertDocumentExists(followerIndex, "1"); - }); - } - break; - case UPGRADED: - index(leaderIndex1, "4"); - assertBusy(() -> { - String followerIndex = "copy-" + leaderIndex1; - assertFollowerGlobalCheckpoint(followerIndex, 3); - assertDocumentExists(followerIndex, "4"); - }); - index(leaderIndex2, "3"); - assertBusy(() -> { - String followerIndex = "copy-" + leaderIndex2; - assertFollowerGlobalCheckpoint(followerIndex, 2); - assertDocumentExists(followerIndex, "3"); - }); - index(leaderIndex3, "2"); - assertBusy(() -> { - String followerIndex = "copy-" + leaderIndex3; - assertFollowerGlobalCheckpoint(followerIndex, 1); - assertDocumentExists(followerIndex, "2"); - }); - - deleteAutoFollowPattern("test_pattern"); - - stopIndexFollowing("copy-" + leaderIndex1); - stopIndexFollowing("copy-" + leaderIndex2); - stopIndexFollowing("copy-" + leaderIndex3); - break; - default: - throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]"); - } - } - - private static void stopIndexFollowing(String followerIndex) throws IOException { - pauseFollow(followerIndex); - closeIndex(followerIndex); - unfollow(followerIndex); - } - - private static void followIndex(String leaderIndex, String followIndex) throws IOException { - final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow"); - request.setJsonEntity("{\"remote_cluster\": \"local\", \"leader_index\": \"" + leaderIndex + - "\", \"read_poll_timeout\": \"10ms\"}"); - assertOK(client().performRequest(request)); - } - - private static void pauseFollow(String followIndex) throws IOException { - assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/pause_follow"))); - } - - private static void unfollow(String followIndex) throws IOException { - assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/unfollow"))); - } - - private static void putAutoFollowPattern(String name, String pattern) throws IOException { - Request request = new Request("PUT", "/_ccr/auto_follow/" + name); - request.setJsonEntity("{\"leader_index_patterns\": [\"" + pattern + "\"], \"remote_cluster\": \"local\"," + - "\"follow_index_pattern\": \"copy-{{leader_index}}\", \"read_poll_timeout\": \"10ms\"}"); - assertOK(client().performRequest(request)); - } - - private static void deleteAutoFollowPattern(String patternName) throws IOException { - Request request = new Request("DELETE", "/_ccr/auto_follow/" + patternName); - assertOK(client().performRequest(request)); - } - - private static void index(String index, String id) throws IOException { - Request request = new Request("POST", "/" + index + "/_doc/" + id); - request.setJsonEntity("{}"); - assertOK(client().performRequest(request)); - } - - private static void assertDocumentExists(String index, String... ids) throws IOException { - for (String id : ids) { - Request request = new Request("HEAD", "/" + index + "/_doc/" + id); - Response response = client().performRequest(request); - assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); - } - } - - private static void setupRemoteCluster() throws IOException { - Request request = new Request("GET", "/_nodes"); - Map nodesResponse = (Map) toMap(client().performRequest(request)).get("nodes"); - // Select node info of first node (we don't know the node id): - nodesResponse = (Map) nodesResponse.get(nodesResponse.keySet().iterator().next()); - String transportAddress = (String) nodesResponse.get("transport_address"); - - LOGGER.info("Configuring local remote cluster [{}]", transportAddress); - request = new Request("PUT", "/_cluster/settings"); - request.setJsonEntity("{\"persistent\": {\"cluster.remote.local.seeds\": \"" + transportAddress + "\"}}"); - assertThat(client().performRequest(request).getStatusLine().getStatusCode(), equalTo(200)); - } - - private int getNumberOfSuccessfulFollowedIndices() throws IOException { - Request statsRequest = new Request("GET", "/_ccr/stats"); - Map response = toMap(client().performRequest(statsRequest)); - Integer actualSuccessfulFollowedIndices = ObjectPath.eval("auto_follow_stats.number_of_successful_follow_indices", response); - if (actualSuccessfulFollowedIndices != null) { - return actualSuccessfulFollowedIndices; - } else { - return -1; - } - } - - private void assertFollowerGlobalCheckpoint(String followerIndex, int expectedFollowerCheckpoint) throws IOException { - Request statsRequest = new Request("GET", "/" + followerIndex + "/_stats"); - statsRequest.addParameter("level", "shards"); - // Just docs metric is sufficient here: - statsRequest.addParameter("metric", "docs"); - Map response = toMap(client().performRequest(statsRequest)); - LOGGER.info("INDEX STATS={}", response); - assertThat(((Map) response.get("indices")).size(), equalTo(1)); - Integer actualFollowerCheckpoint = ObjectPath.eval("indices." + followerIndex + ".shards.0.0.seq_no.global_checkpoint", response); - assertThat(actualFollowerCheckpoint, equalTo(expectedFollowerCheckpoint)); - } - - private static Map toMap(Response response) throws IOException { - return XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false); - } - -}