diff --git a/x-pack/qa/rolling-upgrade-multi-cluster/build.gradle b/x-pack/qa/rolling-upgrade-multi-cluster/build.gradle new file mode 100644 index 0000000000000..3f222be0594a9 --- /dev/null +++ b/x-pack/qa/rolling-upgrade-multi-cluster/build.gradle @@ -0,0 +1,262 @@ +import org.elasticsearch.gradle.Version +import org.elasticsearch.gradle.test.RestIntegTestTask + +apply plugin: 'elasticsearch.standalone-test' + +dependencies { + // "org.elasticsearch.plugin:x-pack-core:${version}" doesn't work with idea because the testArtifacts are also here + testCompile project(path: xpackModule('core'), configuration: 'default') + testCompile project(path: xpackModule('core'), configuration: 'testArtifacts') // to be moved in a later commit +} + +// This is a top level task which we will add dependencies to below. +// It is a single task that can be used to backcompat tests against all versions. +task bwcTest { + description = 'Runs backwards compatibility tests.' + group = 'verification' +} + +for (Version version : bwcVersions.wireCompatible) { + String taskPrefix = "v${version}" + + // ============================================================================================ + // Create leader cluster + // ============================================================================================ + + RestIntegTestTask leaderClusterTest = tasks.create(name: "${taskPrefix}#leader#clusterTest", type: RestIntegTestTask) { + mustRunAfter(precommit) + } + + configure(extensions.findByName("${taskPrefix}#leader#clusterTestCluster")) { + bwcVersion = version + numBwcNodes = 3 + numNodes = 3 + clusterName = 'leader' + setting 'xpack.security.enabled', 'false' + setting 'xpack.monitoring.enabled', 'false' + setting 'xpack.ml.enabled', 'false' + setting 'xpack.watcher.enabled', 'false' + setting 'xpack.license.self_generated.type', 'trial' + } + + Task leaderClusterTestRunner = tasks.getByName("${taskPrefix}#leader#clusterTestRunner") + leaderClusterTestRunner.configure { + systemProperty 'tests.rest.upgrade_state', 'none' + systemProperty 'tests.rest.cluster_name', 'leader' + + systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}" + systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderClusterTest.nodes.get(0).transportUri()}" + } + + // ============================================================================================ + // Create follower cluster + // ============================================================================================ + + RestIntegTestTask followerClusterTest = tasks.create(name: "${taskPrefix}#follower#clusterTest", type: RestIntegTestTask) { + mustRunAfter(precommit) + } + + configure(extensions.findByName("${taskPrefix}#follower#clusterTestCluster")) { + dependsOn leaderClusterTestRunner + bwcVersion = version + numBwcNodes = 3 + numNodes = 3 + clusterName = 'follower' + setting 'xpack.security.enabled', 'false' + setting 'xpack.monitoring.enabled', 'false' + setting 'xpack.ml.enabled', 'false' + setting 'xpack.watcher.enabled', 'false' + setting 'xpack.license.self_generated.type', 'trial' + } + + Task followerClusterTestRunner = tasks.getByName("${taskPrefix}#follower#clusterTestRunner") + followerClusterTestRunner.configure { + systemProperty 'tests.rest.upgrade_state', 'none' + systemProperty 'tests.rest.cluster_name', 'follower' + + systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}" + systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderClusterTest.nodes.get(0).transportUri()}" + + systemProperty 'tests.follower_host', "${-> followerClusterTest.nodes.get(0).httpUri()}" + systemProperty 'tests.follower_remote_cluster_seed', "${-> followerClusterTest.nodes.get(0).transportUri()}" + } + + // ============================================================================================ + // Upgrade follower cluster + // ============================================================================================ + + Closure configureUpgradeCluster = {String prefix, String cluster, String name, Task lastRunner, int stopNode, + RestIntegTestTask clusterTest, Closure getOtherUnicastHostAddresses -> + configure(extensions.findByName("${prefix}#${cluster}#${name}")) { + dependsOn lastRunner, "${prefix}#${cluster}#clusterTestCluster#node${stopNode}.stop" + clusterName = cluster + otherUnicastHostAddresses = { getOtherUnicastHostAddresses() } + minimumMasterNodes = { 2 } + autoSetInitialMasterNodes = false + /* Override the data directory so the new node always gets the node we + * just stopped's data directory. */ + dataDir = { nodeNumber -> clusterTest.nodes[stopNode].dataDir } + setting 'repositories.url.allowed_urls', 'http://snapshot.test*' + setting 'xpack.security.enabled', 'false' + setting 'xpack.monitoring.enabled', 'false' + setting 'xpack.ml.enabled', 'false' + setting 'xpack.watcher.enabled', 'false' + setting 'xpack.license.self_generated.type', 'trial' + setting 'node.name', "upgraded-node-${cluster}-${stopNode}" + setting 'node.attr.upgraded', 'true' + } + } + + Task followerOneThirdUpgradedTest = tasks.create(name: "${taskPrefix}#follower#oneThirdUpgradedTest", type: RestIntegTestTask) + + configureUpgradeCluster(taskPrefix, 'follower', 'oneThirdUpgradedTestCluster', followerClusterTestRunner, 0, followerClusterTest, + // Use all running nodes as seed nodes so there is no race between pinging and the tests + { [followerClusterTest.nodes.get(1).transportUri(), followerClusterTest.nodes.get(2).transportUri()] }) + + Task followerOneThirdUpgradedTestRunner = tasks.getByName("${taskPrefix}#follower#oneThirdUpgradedTestRunner") + followerOneThirdUpgradedTestRunner.configure { + systemProperty 'tests.rest.upgrade_state', 'one_third' + systemProperty 'tests.rest.cluster_name', 'follower' + + systemProperty 'tests.follower_host', "${-> followerClusterTest.nodes.get(1).httpUri()}" + systemProperty 'tests.follower_remote_cluster_seed', "${-> followerClusterTest.nodes.get(1).transportUri()}" + + systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}" + systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderClusterTest.nodes.get(0).transportUri()}" + + finalizedBy "${taskPrefix}#follower#clusterTestCluster#node1.stop" + } + + Task followerTwoThirdsUpgradedTest = tasks.create(name: "${taskPrefix}#follower#twoThirdsUpgradedTest", type: RestIntegTestTask) + + configureUpgradeCluster(taskPrefix, 'follower', 'twoThirdsUpgradedTestCluster', followerOneThirdUpgradedTestRunner, 1, followerClusterTest, + // Use all running nodes as seed nodes so there is no race between pinging and the tests + { [followerClusterTest.nodes.get(2).transportUri(), followerOneThirdUpgradedTest.nodes.get(0).transportUri()] }) + + Task followerTwoThirdsUpgradedTestRunner = tasks.getByName("${taskPrefix}#follower#twoThirdsUpgradedTestRunner") + followerTwoThirdsUpgradedTestRunner.configure { + systemProperty 'tests.rest.upgrade_state', 'two_third' + systemProperty 'tests.rest.cluster_name', 'follower' + + systemProperty 'tests.follower_host', "${-> followerClusterTest.nodes.get(2).httpUri()}" + systemProperty 'tests.follower_remote_cluster_seed', "${-> followerClusterTest.nodes.get(2).transportUri()}" + + systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}" + systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderClusterTest.nodes.get(0).transportUri()}" + + finalizedBy "${taskPrefix}#follower#clusterTestCluster#node2.stop" + } + + Task followerUpgradedClusterTest = tasks.create(name: "${taskPrefix}#follower#upgradedClusterTest", type: RestIntegTestTask) + + configureUpgradeCluster(taskPrefix, 'follower', 'upgradedClusterTestCluster', followerTwoThirdsUpgradedTestRunner, 2, followerClusterTest, + // Use all running nodes as seed nodes so there is no race between pinging and the tests + { [followerOneThirdUpgradedTest.nodes.get(0).transportUri(), followerTwoThirdsUpgradedTest.nodes.get(0).transportUri()] }) + + Task followerUpgradedClusterTestRunner = tasks.getByName("${taskPrefix}#follower#upgradedClusterTestRunner") + followerUpgradedClusterTestRunner.configure { + systemProperty 'tests.rest.upgrade_state', 'all' + systemProperty 'tests.rest.cluster_name', 'follower' + + systemProperty 'tests.follower_host', "${-> followerOneThirdUpgradedTest.nodes.get(0).httpUri()}" + systemProperty 'tests.follower_remote_cluster_seed', "${-> followerOneThirdUpgradedTest.nodes.get(0).transportUri()}" + + systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}" + systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderClusterTest.nodes.get(0).transportUri()}" + + // This is needed, otherwise leader node 0 will stop after the leaderClusterTestRunner task has run. + // Here it is ok to stop, because in the next task, the leader node 0 gets upgraded. + finalizedBy "v${version}#leader#clusterTestCluster#node0.stop" + } + + // ============================================================================================ + // Upgrade leader cluster + // ============================================================================================ + + Task leaderOneThirdUpgradedTest = tasks.create(name: "${taskPrefix}#leader#oneThirdUpgradedTest", type: RestIntegTestTask) + + configureUpgradeCluster(taskPrefix, 'leader', 'oneThirdUpgradedTestCluster', followerUpgradedClusterTestRunner, 0, leaderClusterTest, + // Use all running nodes as seed nodes so there is no race between pinging and the tests + { [leaderClusterTest.nodes.get(1).transportUri(), leaderClusterTest.nodes.get(2).transportUri()] }) + + Task leaderOneThirdUpgradedTestRunner = tasks.getByName("${taskPrefix}#leader#oneThirdUpgradedTestRunner") + leaderOneThirdUpgradedTestRunner.configure { + systemProperty 'tests.rest.upgrade_state', 'one_third' + systemProperty 'tests.rest.cluster_name', 'leader' + + systemProperty 'tests.follower_host', "${-> followerUpgradedClusterTest.nodes.get(0).httpUri()}" + systemProperty 'tests.follower_remote_cluster_seed', "${-> followerUpgradedClusterTest.nodes.get(0).transportUri()}" + + systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(2).httpUri()}" + systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderClusterTest.nodes.get(2).transportUri()}" + + finalizedBy "${taskPrefix}#leader#clusterTestCluster#node1.stop" + } + + Task leaderTwoThirdsUpgradedTest = tasks.create(name: "${taskPrefix}#leader#twoThirdsUpgradedTest", type: RestIntegTestTask) + + configureUpgradeCluster(taskPrefix, 'leader', 'twoThirdsUpgradedTestCluster', leaderOneThirdUpgradedTestRunner, 1, leaderClusterTest, + // Use all running nodes as seed nodes so there is no race between pinging and the tests + { [leaderClusterTest.nodes.get(2).transportUri(), leaderOneThirdUpgradedTest.nodes.get(0).transportUri()] }) + + Task leaderTwoThirdsUpgradedTestRunner = tasks.getByName("${taskPrefix}#leader#twoThirdsUpgradedTestRunner") + leaderTwoThirdsUpgradedTestRunner.configure { + systemProperty 'tests.rest.upgrade_state', 'two_third' + systemProperty 'tests.rest.cluster_name', 'leader' + + systemProperty 'tests.follower_host', "${-> followerUpgradedClusterTest.nodes.get(0).httpUri()}" + systemProperty 'tests.follower_remote_cluster_seed', "${-> followerUpgradedClusterTest.nodes.get(0).transportUri()}" + + systemProperty 'tests.leader_host', "${-> leaderOneThirdUpgradedTest.nodes.get(0).httpUri()}" + systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderOneThirdUpgradedTest.nodes.get(0).transportUri()}" + + finalizedBy "${taskPrefix}#leader#clusterTestCluster#node2.stop" + } + + Task leaderUpgradedClusterTest = tasks.create(name: "${taskPrefix}#leader#upgradedClusterTest", type: RestIntegTestTask) + + configureUpgradeCluster(taskPrefix, 'leader', "upgradedClusterTestCluster", leaderTwoThirdsUpgradedTestRunner, 2, leaderClusterTest, + // Use all running nodes as seed nodes so there is no race between pinging and the tests + { [leaderOneThirdUpgradedTest.nodes.get(0).transportUri(), leaderTwoThirdsUpgradedTest.nodes.get(0).transportUri()] }) + + Task leaderUpgradedClusterTestRunner = tasks.getByName("${taskPrefix}#leader#upgradedClusterTestRunner") + leaderUpgradedClusterTestRunner.configure { + systemProperty 'tests.rest.upgrade_state', 'all' + systemProperty 'tests.rest.cluster_name', 'leader' + + systemProperty 'tests.follower_host', "${-> followerUpgradedClusterTest.nodes.get(0).httpUri()}" + systemProperty 'tests.follower_remote_cluster_seed', "${-> followerUpgradedClusterTest.nodes.get(0).transportUri()}" + + systemProperty 'tests.leader_host', "${-> leaderTwoThirdsUpgradedTest.nodes.get(0).httpUri()}" + systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderTwoThirdsUpgradedTest.nodes.get(0).transportUri()}" + + /* + * Force stopping all the upgraded nodes after the test runner + * so they are alive during the test. + */ + finalizedBy "${taskPrefix}#follower#oneThirdUpgradedTestCluster#stop" + finalizedBy "${taskPrefix}#follower#twoThirdsUpgradedTestCluster#stop" + finalizedBy "${taskPrefix}#follower#upgradedClusterTestCluster#stop" + finalizedBy "${taskPrefix}#leader#oneThirdUpgradedTestCluster#stop" + finalizedBy "${taskPrefix}#leader#twoThirdsUpgradedTestCluster#stop" + } + + if (project.bwc_tests_enabled) { + Task versionBwcTest = tasks.create(name: "${taskPrefix}#bwcTest") { + dependsOn = [leaderUpgradedClusterTest] + } + bwcTest.dependsOn(versionBwcTest) + } +} + +unitTest.enabled = false // no unit tests for rolling upgrades, only the rest integration test + +// basic integ tests includes testing bwc against the most recent version +task integTest { + if (project.bwc_tests_enabled) { + for (final def version : bwcVersions.unreleasedWireCompatible) { + dependsOn "v${version}#bwcTest" + } + } +} +check.dependsOn(integTest) \ No newline at end of file diff --git a/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/AbstractMultiClusterUpgradeTestCase.java b/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/AbstractMultiClusterUpgradeTestCase.java new file mode 100644 index 0000000000000..d1420c54bb90d --- /dev/null +++ b/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/AbstractMultiClusterUpgradeTestCase.java @@ -0,0 +1,168 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.upgrades; + +import org.apache.http.HttpHost; +import org.apache.http.util.EntityUtils; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.junit.AfterClass; +import org.junit.Before; + +import java.io.IOException; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public abstract class AbstractMultiClusterUpgradeTestCase extends ESRestTestCase { + + @Override + protected boolean preserveClusterUponCompletion() { + return true; + } + + enum UpgradeState { + NONE, + ONE_THIRD, + TWO_THIRD, + ALL; + + public static UpgradeState parse(String value) { + switch (value) { + case "none": + return NONE; + case "one_third": + return ONE_THIRD; + case "two_third": + return TWO_THIRD; + case "all": + return ALL; + default: + throw new AssertionError("unknown cluster type: " + value); + } + } + } + + protected final UpgradeState upgradeState = UpgradeState.parse(System.getProperty("tests.rest.upgrade_state")); + + enum ClusterName { + LEADER, + FOLLOWER; + + public static ClusterName parse(String value) { + switch (value) { + case "leader": + return LEADER; + case "follower": + return FOLLOWER; + default: + throw new AssertionError("unknown cluster type: " + value); + } + } + } + + protected final ClusterName clusterName = ClusterName.parse(System.getProperty("tests.rest.cluster_name")); + + private static RestClient leaderClient; + private static RestClient followerClient; + private static boolean initialized = false; + + @Before + public void initClientsAndConfigureClusters() throws IOException { + String leaderHost = System.getProperty("tests.leader_host"); + if (leaderHost == null) { + throw new AssertionError("leader host is missing"); + } + + if (initialized) { + return; + } + + String followerHost = System.getProperty("tests.follower_host"); + if (clusterName == ClusterName.LEADER) { + leaderClient = buildClient(leaderHost); + if (followerHost != null) { + followerClient = buildClient(followerHost); + } + } else if (clusterName == ClusterName.FOLLOWER) { + if (followerHost == null) { + throw new AssertionError("follower host is missing"); + } + + leaderClient = buildClient(leaderHost); + followerClient = buildClient(followerHost); + } else { + throw new AssertionError("unknown cluster name: " + clusterName); + } + + configureLeaderRemoteClusters(); + configureFollowerRemoteClusters(); + initialized = true; + } + + private void configureLeaderRemoteClusters() throws IOException { + String leaderRemoteClusterSeed = System.getProperty("tests.leader_remote_cluster_seed"); + if (leaderRemoteClusterSeed != null) { + logger.info("Configuring leader remote cluster [{}]", leaderRemoteClusterSeed); + Request request = new Request("PUT", "/_cluster/settings"); + request.setJsonEntity("{\"persistent\": {\"cluster.remote.leader.seeds\": \"" + leaderRemoteClusterSeed + "\"}}"); + assertThat(leaderClient.performRequest(request).getStatusLine().getStatusCode(), equalTo(200)); + if (followerClient != null) { + assertThat(followerClient.performRequest(request).getStatusLine().getStatusCode(), equalTo(200)); + } + } else { + logger.info("No leader remote cluster seed found."); + } + } + + private void configureFollowerRemoteClusters() throws IOException { + String followerRemoteClusterSeed = System.getProperty("tests.follower_remote_cluster_seed"); + if (followerRemoteClusterSeed != null) { + logger.info("Configuring follower remote cluster [{}]", followerRemoteClusterSeed); + Request request = new Request("PUT", "/_cluster/settings"); + request.setJsonEntity("{\"persistent\": {\"cluster.remote.follower.seeds\": \"" + followerRemoteClusterSeed + "\"}}"); + assertThat(leaderClient.performRequest(request).getStatusLine().getStatusCode(), equalTo(200)); + assertThat(followerClient.performRequest(request).getStatusLine().getStatusCode(), equalTo(200)); + } else { + logger.info("No follower remote cluster seed found."); + } + } + + @AfterClass + public static void destroyClients() throws IOException { + try { + IOUtils.close(leaderClient, followerClient); + } finally { + leaderClient = null; + followerClient = null; + } + } + + protected static RestClient leaderClient() { + return leaderClient; + } + + protected static RestClient followerClient() { + return followerClient; + } + + private RestClient buildClient(final String url) throws IOException { + int portSeparator = url.lastIndexOf(':'); + HttpHost httpHost = new HttpHost(url.substring(0, portSeparator), + Integer.parseInt(url.substring(portSeparator + 1)), getProtocol()); + return buildClient(restAdminSettings(), new HttpHost[]{httpHost}); + } + + protected static Map toMap(Response response) throws IOException { + return XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false); + } + +} diff --git a/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/CcrRollingUpgradeIT.java b/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/CcrRollingUpgradeIT.java new file mode 100644 index 0000000000000..7f11931bd28f3 --- /dev/null +++ b/x-pack/qa/rolling-upgrade-multi-cluster/src/test/java/org/elasticsearch/upgrades/CcrRollingUpgradeIT.java @@ -0,0 +1,165 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.upgrades; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.support.XContentMapValues; + +import java.io.IOException; +import java.util.Map; + +import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class CcrRollingUpgradeIT extends AbstractMultiClusterUpgradeTestCase { + + public void testIndexFollowing() throws Exception { + logger.info("clusterName={}, upgradeState={}", clusterName, upgradeState); + + if (clusterName == ClusterName.LEADER) { + switch (upgradeState) { + case NONE: + createLeaderIndex(leaderClient(), "leader_index1"); + index(leaderClient(), "leader_index1", 64); + createLeaderIndex(leaderClient(), "leader_index2"); + index(leaderClient(), "leader_index2", 64); + break; + case ONE_THIRD: + break; + case TWO_THIRD: + break; + case ALL: + createLeaderIndex(leaderClient(), "leader_index4"); + followIndex(followerClient(), "leader", "leader_index4", "follower_index4"); + index(leaderClient(), "leader_index4", 64); + assertTotalHitCount("follower_index4", 64, followerClient()); + break; + default: + throw new AssertionError("unexpected upgrade_state [" + upgradeState + "]"); + } + } else if (clusterName == ClusterName.FOLLOWER) { + switch (upgradeState) { + case NONE: + followIndex(followerClient(), "leader", "leader_index1", "follower_index1"); + assertTotalHitCount("follower_index1", 64, followerClient()); + break; + case ONE_THIRD: + index(leaderClient(), "leader_index1", 64); + assertTotalHitCount("follower_index1", 128, followerClient()); + + followIndex(followerClient(), "leader", "leader_index2", "follower_index2"); + assertTotalHitCount("follower_index2", 64, followerClient()); + break; + case TWO_THIRD: + index(leaderClient(), "leader_index1", 64); + assertTotalHitCount("follower_index1", 192, followerClient()); + + index(leaderClient(), "leader_index2", 64); + assertTotalHitCount("follower_index2", 128, followerClient()); + + createLeaderIndex(leaderClient(), "leader_index3"); + index(leaderClient(), "leader_index3", 64); + followIndex(followerClient(), "leader", "leader_index3", "follower_index3"); + assertTotalHitCount("follower_index3", 64, followerClient()); + break; + case ALL: + index(leaderClient(), "leader_index1", 64); + assertTotalHitCount("follower_index1", 256, followerClient()); + + index(leaderClient(), "leader_index2", 64); + assertTotalHitCount("follower_index2", 192, followerClient()); + + index(leaderClient(), "leader_index3", 64); + assertTotalHitCount("follower_index3", 128, followerClient()); + break; + default: + throw new AssertionError("unexpected upgrade_state [" + upgradeState + "]"); + } + } else { + throw new AssertionError("unexpected cluster_name [" + clusterName + "]"); + } + } + + public void testCannotFollowLeaderInUpgradedCluster() throws Exception { + assumeTrue("Tests only runs with upgrade_state [all]", upgradeState == UpgradeState.ALL); + + if (clusterName == ClusterName.FOLLOWER) { + // At this point the leader cluster has not been upgraded, but follower cluster has been upgrade. + // Create a leader index in the follow cluster and try to follow it in the leader cluster. + // This should fail, because the leader cluster at this point in time can't do file based recovery from follower. + createLeaderIndex(followerClient(), "not_supported"); + index(followerClient(), "not_supported", 64); + + ResponseException e = expectThrows(ResponseException.class, + () -> followIndex(leaderClient(), "follower", "not_supported", "not_supported")); + assertThat(e.getMessage(), containsString("the snapshot was created with Elasticsearch version [")); + assertThat(e.getMessage(), containsString("] which is higher than the version of this node [")); + } else if (clusterName == ClusterName.LEADER) { + // At this point all nodes in both clusters have been updated and + // the leader cluster can now follow leader_index4 in the follower cluster: + followIndex(leaderClient(), "follower", "not_supported", "not_supported"); + assertTotalHitCount("not_supported", 64, leaderClient()); + } else { + throw new AssertionError("unexpected cluster_name [" + clusterName + "]"); + } + } + + private static void createLeaderIndex(RestClient client, String indexName) throws IOException { + Settings indexSettings = Settings.builder() + .put("index.soft_deletes.enabled", true) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .build(); + createIndex(client, indexName, indexSettings); + } + + private static void createIndex(RestClient client, String name, Settings settings) throws IOException { + Request request = new Request("PUT", "/" + name); + request.setJsonEntity("{\n \"settings\": " + Strings.toString(settings) + "}"); + client.performRequest(request); + } + + private static void followIndex(RestClient client, String leaderCluster, String leaderIndex, String followIndex) throws IOException { + final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow?wait_for_active_shards=1"); + request.setJsonEntity("{\"remote_cluster\": \"" + leaderCluster + "\", \"leader_index\": \"" + leaderIndex + + "\", \"read_poll_timeout\": \"10ms\"}"); + assertOK(client.performRequest(request)); + } + + private static void index(RestClient client, String index, int numDocs) throws IOException { + for (int i = 0; i < numDocs; i++) { + final Request request = new Request("POST", "/" + index + "/_doc/"); + request.setJsonEntity("{}"); + assertOK(client.performRequest(request)); + if (randomIntBetween(0, 5) == 3) { + assertOK(client.performRequest(new Request("POST", "/" + index + "/_refresh"))); + } + } + } + + private static void assertTotalHitCount(final String index, + final int expectedTotalHits, + final RestClient client) throws Exception { + assertOK(client.performRequest(new Request("POST", "/" + index + "/_refresh"))); + assertBusy(() -> verifyTotalHitCount(index, expectedTotalHits, client)); + } + + private static void verifyTotalHitCount(final String index, + final int expectedTotalHits, + final RestClient client) throws IOException { + final Request request = new Request("GET", "/" + index + "/_search"); + request.addParameter(TOTAL_HITS_AS_INT_PARAM, "true"); + Map response = toMap(client.performRequest(request)); + final int totalHits = (int) XContentMapValues.extractValue("hits.total", response); + assertThat(totalHits, equalTo(expectedTotalHits)); + } + +}