Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add license checks for auto-follow implementation #33496

Merged
merged 8 commits into from
Sep 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public AbstractEventExpectation(String name, String logger, Level level, String

@Override
public void match(LogEvent event) {
if (event.getLevel().equals(level) && event.getLoggerName().equals(logger)) {
if (event.getLevel().equals(level) && event.getLoggerName().equals(logger) && innerMatch(event)) {
if (Regex.isSimpleMatchPattern(message)) {
if (Regex.simpleMatch(message, event.getMessage().getFormattedMessage())) {
saw = true;
Expand All @@ -97,6 +97,11 @@ public void match(LogEvent event) {
}
}
}

public boolean innerMatch(final LogEvent event) {
return true;
}

}

public static class UnseenEventExpectation extends AbstractEventExpectation {
Expand All @@ -123,6 +128,32 @@ public void assertMatched() {
}
}

public static class ExceptionSeenEventExpectation extends SeenEventExpectation {

private final Class<? extends Exception> clazz;
private final String exceptionMessage;

public ExceptionSeenEventExpectation(
final String name,
final String logger,
final Level level,
final String message,
final Class<? extends Exception> clazz,
final String exceptionMessage) {
super(name, logger, level, message);
this.clazz = clazz;
this.exceptionMessage = exceptionMessage;
}

@Override
public boolean innerMatch(final LogEvent event) {
return event.getThrown() != null
&& event.getThrown().getClass() == clazz
&& event.getThrown().getMessage().equals(exceptionMessage);
}

}

public static class PatternSeenEventExcpectation implements LoggingExpectation {

protected final String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,20 @@ leaderClusterTestRunner {
systemProperty 'tests.is_leader_cluster', 'true'
}

task writeJavaPolicy {
doLast {
final File javaPolicy = file("${buildDir}/tmp/java.policy")
javaPolicy.write(
[
"grant {",
" permission java.io.FilePermission \"${-> followClusterTest.getNodes().get(0).homeDir}/logs/${-> followClusterTest.getNodes().get(0).clusterName}.log\", \"read\";",
"};"
].join("\n"))
}
}

task followClusterTest(type: RestIntegTestTask) {}
followClusterTest.dependsOn writeJavaPolicy

followClusterTestCluster {
dependsOn leaderClusterTestRunner
Expand All @@ -31,8 +44,10 @@ followClusterTestCluster {
}

followClusterTestRunner {
systemProperty 'java.security.policy', "file://${buildDir}/tmp/java.policy"
systemProperty 'tests.is_leader_cluster', 'false'
systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
systemProperty 'log', "${-> followClusterTest.getNodes().get(0).homeDir}/logs/${-> followClusterTest.getNodes().get(0).clusterName}.log"
finalizedBy 'leaderClusterTestCluster#stop'
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,16 @@
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.test.rest.ESRestTestCase;

import java.nio.file.Files;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;

public class CcrMultiClusterLicenseIT extends ESRestTestCase {
Expand All @@ -29,19 +34,52 @@ public void testFollowIndex() {
if (runningAgainstLeaderCluster == false) {
final Request request = new Request("POST", "/follower/_ccr/follow");
request.setJsonEntity("{\"leader_index\": \"leader_cluster:leader\"}");
assertLicenseIncompatible(request);
assertNonCompliantLicense(request);
}
}

public void testCreateAndFollowIndex() {
if (runningAgainstLeaderCluster == false) {
final Request request = new Request("POST", "/follower/_ccr/create_and_follow");
request.setJsonEntity("{\"leader_index\": \"leader_cluster:leader\"}");
assertLicenseIncompatible(request);
assertNonCompliantLicense(request);
}
}

private static void assertLicenseIncompatible(final Request request) {
public void testAutoFollow() throws Exception {
if (runningAgainstLeaderCluster == false) {
final Request request = new Request("PUT", "/_ccr/_auto_follow/leader_cluster");
request.setJsonEntity("{\"leader_index_patterns\":[\"*\"]}");
client().performRequest(request);

// parse the logs and ensure that the auto-coordinator skipped coordination on the leader cluster
assertBusy(() -> {
final List<String> lines = Files.readAllLines(PathUtils.get(System.getProperty("log")));

final Iterator<String> it = lines.iterator();

boolean warn = false;
while (it.hasNext()) {
final String line = it.next();
if (line.matches(".*\\[WARN\\s*\\]\\[o\\.e\\.x\\.c\\.a\\.AutoFollowCoordinator\\s*\\] \\[node-0\\] " +
"failure occurred during auto-follower coordination")) {
warn = true;
break;
}
}
assertTrue(warn);
assertTrue(it.hasNext());
final String lineAfterWarn = it.next();
assertThat(
lineAfterWarn,
equalTo("org.elasticsearch.ElasticsearchStatusException: " +
"can not fetch remote cluster state as the remote cluster [leader_cluster] is not licensed for [ccr]; " +
"the license mode [BASIC] on cluster [leader_cluster] does not enable [ccr]"));
});
}
}

private static void assertNonCompliantLicense(final Request request) {
final ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(request));
final String expected = String.format(
Locale.ROOT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public Collection<Object> createComponents(

return Arrays.asList(
ccrLicenseChecker,
new AutoFollowCoordinator(settings, client, threadPool, clusterService)
new AutoFollowCoordinator(settings, client, threadPool, clusterService, ccrLicenseChecker)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Objects;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* Encapsulates licensing checking for CCR.
Expand Down Expand Up @@ -58,23 +59,89 @@ public boolean isCcrAllowed() {

/**
* Fetches the leader index metadata from the remote cluster. Before fetching the index metadata, the remote cluster is checked for
* license compatibility with CCR. If the remote cluster is not licensed for CCR, the {@link ActionListener#onFailure(Exception)} method
* of the specified listener is invoked. Otherwise, the specified consumer is invoked with the leader index metadata fetched from the
* remote cluster.
* license compatibility with CCR. If the remote cluster is not licensed for CCR, the {@code onFailure} consumer is is invoked.
* Otherwise, the specified consumer is invoked with the leader index metadata fetched from the remote cluster.
*
* @param client the client
* @param clusterAlias the remote cluster alias
* @param leaderIndex the name of the leader index
* @param listener the listener
* @param onFailure the failure consumer
* @param leaderIndexMetadataConsumer the leader index metadata consumer
* @param <T> the type of response the listener is waiting for
*/
public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadata(
final Client client,
final String clusterAlias,
final String leaderIndex,
final ActionListener<T> listener,
final Consumer<Exception> onFailure,
final Consumer<IndexMetaData> leaderIndexMetadataConsumer) {

final ClusterStateRequest request = new ClusterStateRequest();
request.clear();
request.metaData(true);
request.indices(leaderIndex);
checkRemoteClusterLicenseAndFetchClusterState(
client,
clusterAlias,
request,
onFailure,
leaderClusterState -> leaderIndexMetadataConsumer.accept(leaderClusterState.getMetaData().index(leaderIndex)),
licenseCheck -> indexMetadataNonCompliantRemoteLicense(leaderIndex, licenseCheck),
e -> indexMetadataUnknownRemoteLicense(leaderIndex, clusterAlias, e));
}

/**
* Fetches the leader cluster state from the remote cluster by the specified cluster state request. Before fetching the cluster state,
* the remote cluster is checked for license compliance with CCR. If the remote cluster is not licensed for CCR,
* the {@code onFailure} consumer is invoked. Otherwise, the specified consumer is invoked with the leader cluster state fetched from
* the remote cluster.
*
* @param client the client
* @param clusterAlias the remote cluster alias
* @param request the cluster state request
* @param onFailure the failure consumer
* @param leaderClusterStateConsumer the leader cluster state consumer
* @param <T> the type of response the listener is waiting for
*/
public <T> void checkRemoteClusterLicenseAndFetchClusterState(
final Client client,
final String clusterAlias,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
final Consumer<ClusterState> leaderClusterStateConsumer) {
checkRemoteClusterLicenseAndFetchClusterState(
client,
clusterAlias,
request,
onFailure,
leaderClusterStateConsumer,
CcrLicenseChecker::clusterStateNonCompliantRemoteLicense,
e -> clusterStateUnknownRemoteLicense(clusterAlias, e));
}

/**
* Fetches the leader cluster state from the remote cluster by the specified cluster state request. Before fetching the cluster state,
* the remote cluster is checked for license compliance with CCR. If the remote cluster is not licensed for CCR,
* the {@code onFailure} consumer is invoked. Otherwise, the specified consumer is invoked with the leader cluster state fetched from
* the remote cluster.
*
* @param client the client
* @param clusterAlias the remote cluster alias
* @param request the cluster state request
* @param onFailure the failure consumer
* @param leaderClusterStateConsumer the leader cluster state consumer
* @param nonCompliantLicense the supplier for when the license state of the remote cluster is non-compliant
* @param unknownLicense the supplier for when the license state of the remote cluster is unknown due to failure
* @param <T> the type of response the listener is waiting for
*/
private <T> void checkRemoteClusterLicenseAndFetchClusterState(
final Client client,
final String clusterAlias,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
final Consumer<ClusterState> leaderClusterStateConsumer,
final Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
final Function<Exception, ElasticsearchStatusException> unknownLicense) {
// we have to check the license on the remote cluster
new RemoteClusterLicenseChecker(client, XPackLicenseState::isCcrAllowedForOperationMode).checkRemoteClusterLicenses(
Collections.singletonList(clusterAlias),
Expand All @@ -83,35 +150,25 @@ public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadata(
@Override
public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
if (licenseCheck.isSuccess()) {
final Client remoteClient = client.getRemoteClusterClient(clusterAlias);
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(leaderIndex);
final ActionListener<ClusterStateResponse> clusterStateListener = ActionListener.wrap(
r -> {
final ClusterState remoteClusterState = r.getState();
final IndexMetaData leaderIndexMetadata =
remoteClusterState.getMetaData().index(leaderIndex);
leaderIndexMetadataConsumer.accept(leaderIndexMetadata);
},
listener::onFailure);
final Client leaderClient = client.getRemoteClusterClient(clusterAlias);
final ActionListener<ClusterStateResponse> clusterStateListener =
ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure);
// following an index in remote cluster, so use remote client to fetch leader index metadata
remoteClient.admin().cluster().state(clusterStateRequest, clusterStateListener);
leaderClient.admin().cluster().state(request, clusterStateListener);
} else {
listener.onFailure(incompatibleRemoteLicense(leaderIndex, licenseCheck));
onFailure.accept(nonCompliantLicense.apply(licenseCheck));
}
}

@Override
public void onFailure(final Exception e) {
listener.onFailure(unknownRemoteLicense(leaderIndex, clusterAlias, e));
onFailure.accept(unknownLicense.apply(e));
}

});
}

private static ElasticsearchStatusException incompatibleRemoteLicense(
private static ElasticsearchStatusException indexMetadataNonCompliantRemoteLicense(
final String leaderIndex, final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
final String clusterAlias = licenseCheck.remoteClusterLicenseInfo().clusterAlias();
final String message = String.format(
Expand All @@ -127,7 +184,21 @@ private static ElasticsearchStatusException incompatibleRemoteLicense(
return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST);
}

private static ElasticsearchStatusException unknownRemoteLicense(
private static ElasticsearchStatusException clusterStateNonCompliantRemoteLicense(
final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
final String clusterAlias = licenseCheck.remoteClusterLicenseInfo().clusterAlias();
final String message = String.format(
Locale.ROOT,
"can not fetch remote cluster state as the remote cluster [%s] is not licensed for [ccr]; %s",
clusterAlias,
RemoteClusterLicenseChecker.buildErrorMessage(
"ccr",
licenseCheck.remoteClusterLicenseInfo(),
RemoteClusterLicenseChecker::isLicensePlatinumOrTrial));
return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST);
}

private static ElasticsearchStatusException indexMetadataUnknownRemoteLicense(
final String leaderIndex, final String clusterAlias, final Exception cause) {
final String message = String.format(
Locale.ROOT,
Expand All @@ -138,4 +209,11 @@ private static ElasticsearchStatusException unknownRemoteLicense(
return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST, cause);
}

private static ElasticsearchStatusException clusterStateUnknownRemoteLicense(final String clusterAlias, final Exception cause) {
final String message = String.format(
Locale.ROOT,
"can not fetch remote cluster state as the license state of the remote cluster [%s] could not be determined", clusterAlias);
return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST, cause);
}

}
Loading