Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added CCR rolling upgrade tests #36648

Merged
merged 18 commits into from
Jan 8, 2019
Merged
Changes from 16 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.upgrades;

import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ObjectPath;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;

import java.io.IOException;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;

public class CCRIT extends AbstractUpgradeTestCase {

private static final Logger LOGGER = LogManager.getLogger(CCRIT.class);

private static final Version UPGRADE_FROM_VERSION =
Version.fromString(System.getProperty("tests.upgrade_from_version"));

private static final boolean SECOND_ROUND = "false".equals(System.getProperty("tests.first_round"));

@Override
protected boolean preserveClusterSettings() {
return true;
}

public void testIndexFollowing() throws Exception {
assumeTrue("CCR became available in 6.5 and test relies on a fix that was shipped with 6.5.4",
UPGRADE_FROM_VERSION.onOrAfter(Version.V_6_5_4));
setupRemoteCluster();

final String leaderIndex = "my-leader-index";
final String followerIndex = "my-follower-index";

switch (CLUSTER_TYPE) {
case OLD:
Settings indexSettings = Settings.builder()
.put("index.soft_deletes.enabled", true)
.put("index.number_of_shards", 1)
.build();
createIndex(leaderIndex, indexSettings);
followIndex(leaderIndex, followerIndex);
index(leaderIndex, "1");
assertDocumentExists(leaderIndex, "1");
assertBusy(() -> {
assertFollowerGlobalCheckpoint(followerIndex, 0);
assertDocumentExists(followerIndex, "1");
});
break;
case MIXED:
if (SECOND_ROUND == false) {
index(leaderIndex, "2");
assertDocumentExists(leaderIndex, "1", "2");
assertBusy(() -> {
assertFollowerGlobalCheckpoint(followerIndex, 1);
assertDocumentExists(followerIndex, "1", "2");
});
} else {
index(leaderIndex, "3");
assertDocumentExists(leaderIndex, "3");
assertBusy(() -> {
assertFollowerGlobalCheckpoint(followerIndex, 2);
assertDocumentExists(followerIndex, "3");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we also verify doc1, and doc2

});
}
break;
case UPGRADED:
index(leaderIndex, "4");
assertDocumentExists(leaderIndex, "4");
assertBusy(() -> {
assertFollowerGlobalCheckpoint(followerIndex, 3);
assertDocumentExists(followerIndex, "4");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we also verify doc1, doc2, and doc3?

});
stopIndexFollowing(followerIndex);
break;
default:
throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]");
}
}

public void testAutoFollowing() throws Exception {
assumeTrue("CCR became available in 6.5 and test relies on a fix that was shipped with 6.5.4",
UPGRADE_FROM_VERSION.onOrAfter(Version.V_6_5_4));
setupRemoteCluster();

final Settings indexSettings = Settings.builder()
.put("index.soft_deletes.enabled", true)
.put("index.number_of_shards", 1)
.build();

String leaderIndex1 = "logs-20200101";
String leaderIndex2 = "logs-20200102";
String leaderIndex3 = "logs-20200103";

switch (CLUSTER_TYPE) {
case OLD:
putAutoFollowPattern("test_pattern", "logs-*");
createIndex(leaderIndex1, indexSettings);
index(leaderIndex1, "1");
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex1;
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(1));
assertFollowerGlobalCheckpoint(followerIndex, 0);
assertDocumentExists(followerIndex, "1");
});
break;
case MIXED:
if (SECOND_ROUND == false) {
index(leaderIndex1, "2");
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex1;
assertFollowerGlobalCheckpoint(followerIndex, 1);
assertDocumentExists(followerIndex, "2");
});
// Auto follow stats are kept in-memory on master elected node
// and if this node get updated then auto follow stats are reset
int previousNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
createIndex(leaderIndex2, indexSettings);
index(leaderIndex2, "1");
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex2;
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(previousNumberOfSuccessfulFollowedIndices + 1));
assertFollowerGlobalCheckpoint(followerIndex, 0);
assertDocumentExists(followerIndex, "1");
});
} else {
index(leaderIndex1, "3");
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex1;
assertFollowerGlobalCheckpoint(followerIndex, 2);
assertDocumentExists(followerIndex, "3");
});
index(leaderIndex2, "2");
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex2;
assertFollowerGlobalCheckpoint(followerIndex, 1);
assertDocumentExists(followerIndex, "2");
});

// Auto follow stats are kept in-memory on master elected node
// and if this node get updated then auto follow stats are reset
int previousNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
createIndex(leaderIndex3, indexSettings);
index(leaderIndex3, "1");
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex3;
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(previousNumberOfSuccessfulFollowedIndices + 1));
assertFollowerGlobalCheckpoint(followerIndex, 0);
assertDocumentExists(followerIndex, "1");
});
}
break;
case UPGRADED:
index(leaderIndex1, "4");
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex1;
assertFollowerGlobalCheckpoint(followerIndex, 3);
assertDocumentExists(followerIndex, "4");
});
index(leaderIndex2, "3");
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex2;
assertFollowerGlobalCheckpoint(followerIndex, 2);
assertDocumentExists(followerIndex, "3");
});
index(leaderIndex3, "2");
assertBusy(() -> {
String followerIndex = "copy-" + leaderIndex3;
assertFollowerGlobalCheckpoint(followerIndex, 1);
assertDocumentExists(followerIndex, "2");
});

deleteAutoFollowPattern("test_pattern");

stopIndexFollowing("copy-" + leaderIndex1);
stopIndexFollowing("copy-" + leaderIndex2);
stopIndexFollowing("copy-" + leaderIndex3);
break;
default:
throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]");
}
}

private static void stopIndexFollowing(String followerIndex) throws IOException {
pauseFollow(followerIndex);
closeIndex(followerIndex);
unfollow(followerIndex);
}

private static void followIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow");
request.setJsonEntity("{\"remote_cluster\": \"local\", \"leader_index\": \"" + leaderIndex +
"\", \"read_poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}

private static void pauseFollow(String followIndex) throws IOException {
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/pause_follow")));
}

private static void unfollow(String followIndex) throws IOException {
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/unfollow")));
}

private static void putAutoFollowPattern(String name, String pattern) throws IOException {
Request request = new Request("PUT", "/_ccr/auto_follow/" + name);
request.setJsonEntity("{\"leader_index_patterns\": [\"" + pattern + "\"], \"remote_cluster\": \"local\"," +
"\"follow_index_pattern\": \"copy-{{leader_index}}\", \"read_poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}

private static void deleteAutoFollowPattern(String patternName) throws IOException {
Request request = new Request("DELETE", "/_ccr/auto_follow/" + patternName);
assertOK(client().performRequest(request));
}

private static void index(String index, String id) throws IOException {
Request request = new Request("POST", "/" + index + "/_doc/" + id);
request.setJsonEntity("{}");
assertOK(client().performRequest(request));
}

private static void assertDocumentExists(String index, String... ids) throws IOException {
for (String id : ids) {
Request request = new Request("HEAD", "/" + index + "/_doc/" + id);
Response response = client().performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
}
}

private static void setupRemoteCluster() throws IOException {
Request request = new Request("GET", "/_nodes");
Map<?, ?> nodesResponse = (Map<?, ?>) toMap(client().performRequest(request)).get("nodes");
// Select node info of first node (we don't know the node id):
nodesResponse = (Map<?, ?>) nodesResponse.get(nodesResponse.keySet().iterator().next());
String transportAddress = (String) nodesResponse.get("transport_address");

LOGGER.info("Configuring local remote cluster [{}]", transportAddress);
request = new Request("PUT", "/_cluster/settings");
request.setJsonEntity("{\"persistent\": {\"cluster.remote.local.seeds\": \"" + transportAddress + "\"}}");
assertThat(client().performRequest(request).getStatusLine().getStatusCode(), equalTo(200));
}

private int getNumberOfSuccessfulFollowedIndices() throws IOException {
Request statsRequest = new Request("GET", "/_ccr/stats");
Map<?, ?> response = toMap(client().performRequest(statsRequest));
Integer actualSuccessfulFollowedIndices = ObjectPath.eval("auto_follow_stats.number_of_successful_follow_indices", response);
if (actualSuccessfulFollowedIndices != null) {
return actualSuccessfulFollowedIndices;
} else {
return -1;
}
}

private void assertFollowerGlobalCheckpoint(String followerIndex, int expectedFollowerCheckpoint) throws IOException {
Request statsRequest = new Request("GET", "/" + followerIndex + "/_stats");
statsRequest.addParameter("level", "shards");
Map<?, ?> response = toMap(client().performRequest(statsRequest));
LOGGER.info("INDEX STATS={}", response);
assertThat(((Map) response.get("indices")).size(), equalTo(1));
Integer actualFollowerCheckpoint = ObjectPath.eval("indices." + followerIndex + ".shards.0.0.seq_no.global_checkpoint", response);
assertThat(actualFollowerCheckpoint, equalTo(expectedFollowerCheckpoint));
}

private static Map<String, Object> toMap(Response response) throws IOException {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false);
}

}