diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequestBuilder.java index 35020556b1ed3..da5074b41aa4f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequestBuilder.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequestBuilder.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.unit.TimeValue; public class ClusterStateRequestBuilder extends MasterNodeReadOperationRequestBuilder { @@ -100,4 +101,21 @@ public ClusterStateRequestBuilder setIndicesOptions(IndicesOptions indicesOption request.indicesOptions(indicesOptions); return this; } + + /** + * Causes the request to wait for the metadata version to advance to at least the given version. + * @param waitForMetaDataVersion The metadata version for which to wait + */ + public ClusterStateRequestBuilder setWaitForMetaDataVersion(long waitForMetaDataVersion) { + request.waitForMetaDataVersion(waitForMetaDataVersion); + return this; + } + + /** + * If {@link ClusterStateRequest#waitForMetaDataVersion()} is set then this determines how long to wait + */ + public ClusterStateRequestBuilder setWaitForTimeOut(TimeValue waitForTimeout) { + request.waitForTimeout(waitForTimeout); + return this; + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java index d0fd08b690f21..4aad1a81cfe77 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -86,50 +87,50 @@ protected ClusterStateResponse newResponse() { protected void masterOperation(final ClusterStateRequest request, final ClusterState state, final ActionListener listener) throws IOException { - if (request.waitForMetaDataVersion() != null) { - final Predicate metadataVersionPredicate = clusterState -> { - return clusterState.metaData().version() >= request.waitForMetaDataVersion(); - }; - final ClusterStateObserver observer = - new ClusterStateObserver(clusterService, request.waitForTimeout(), logger, threadPool.getThreadContext()); - final ClusterState clusterState = observer.setAndGetObservedState(); - if (metadataVersionPredicate.test(clusterState)) { - buildResponse(request, clusterState, listener); - } else { - observer.waitForNextChange(new ClusterStateObserver.Listener() { - @Override - public void onNewClusterState(ClusterState state) { - try { - buildResponse(request, state, listener); - } catch (Exception e) { - listener.onFailure(e); - } - } + final Predicate acceptableClusterStatePredicate + = request.waitForMetaDataVersion() == null ? clusterState -> true + : clusterState -> clusterState.metaData().version() >= request.waitForMetaDataVersion(); + + final Predicate acceptableClusterStateOrNotMasterPredicate = request.local() + ? acceptableClusterStatePredicate + : acceptableClusterStatePredicate.or(clusterState -> clusterState.nodes().isLocalNodeElectedMaster() == false); - @Override - public void onClusterServiceClose() { - listener.onFailure(new NodeClosedException(clusterService.localNode())); + if (acceptableClusterStatePredicate.test(state)) { + ActionListener.completeWith(listener, () -> buildResponse(request, state)); + } else { + assert acceptableClusterStateOrNotMasterPredicate.test(state) == false; + new ClusterStateObserver(state, clusterService, request.waitForTimeout(), logger, threadPool.getThreadContext()) + .waitForNextChange(new ClusterStateObserver.Listener() { + + @Override + public void onNewClusterState(ClusterState newState) { + if (acceptableClusterStatePredicate.test(newState)) { + ActionListener.completeWith(listener, () -> buildResponse(request, newState)); + } else { + listener.onFailure(new NotMasterException( + "master stepped down waiting for metadata version " + request.waitForMetaDataVersion())); } + } - @Override - public void onTimeout(TimeValue timeout) { - try { - listener.onResponse(new ClusterStateResponse(clusterState.getClusterName(), null, true)); - } catch (Exception e) { - listener.onFailure(e); - } + @Override + public void onClusterServiceClose() { + listener.onFailure(new NodeClosedException(clusterService.localNode())); + } + + @Override + public void onTimeout(TimeValue timeout) { + try { + listener.onResponse(new ClusterStateResponse(state.getClusterName(), null, true)); + } catch (Exception e) { + listener.onFailure(e); } - }, metadataVersionPredicate); - } - } else { - ClusterState currentState = clusterService.state(); - buildResponse(request, currentState, listener); + } + }, acceptableClusterStateOrNotMasterPredicate); } } - private void buildResponse(final ClusterStateRequest request, - final ClusterState currentState, - final ActionListener listener) throws IOException { + private ClusterStateResponse buildResponse(final ClusterStateRequest request, + final ClusterState currentState) { logger.trace("Serving cluster state request using version {}", currentState.version()); ClusterState.Builder builder = ClusterState.builder(currentState.getClusterName()); builder.version(currentState.version()); @@ -192,8 +193,7 @@ private void buildResponse(final ClusterStateRequest request, } } - listener.onResponse(new ClusterStateResponse(currentState.getClusterName(), builder.build(), false)); + return new ClusterStateResponse(currentState.getClusterName(), builder.build(), false); } - } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateActionDisruptionIT.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateActionDisruptionIT.java new file mode 100644 index 0000000000000..0d51f647ee28c --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateActionDisruptionIT.java @@ -0,0 +1,182 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.state; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.coordination.ClusterBootstrapService; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.MasterNotDiscoveredException; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportService; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.not; + +@ESIntegTestCase.ClusterScope(numDataNodes = 0, scope = ESIntegTestCase.Scope.TEST, transportClientRatio = 0) +public class TransportClusterStateActionDisruptionIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Collections.singletonList(MockTransportService.TestPlugin.class); + } + + public void testNonLocalRequestAlwaysFindsMaster() throws Exception { + runRepeatedlyWhileChangingMaster(() -> { + final ClusterStateRequestBuilder clusterStateRequestBuilder = client().admin().cluster().prepareState() + .clear().setNodes(true).setMasterNodeTimeout("100ms"); + final ClusterStateResponse clusterStateResponse; + try { + clusterStateResponse = clusterStateRequestBuilder.get(); + } catch (MasterNotDiscoveredException e) { + return; // ok, we hit the disconnected node + } + assertNotNull("should always contain a master node", clusterStateResponse.getState().nodes().getMasterNodeId()); + }); + } + + public void testLocalRequestAlwaysSucceeds() throws Exception { + runRepeatedlyWhileChangingMaster(() -> { + final String node = randomFrom(internalCluster().getNodeNames()); + final DiscoveryNodes discoveryNodes = client(node).admin().cluster().prepareState() + .clear().setLocal(true).setNodes(true).setMasterNodeTimeout("100ms").get().getState().nodes(); + for (DiscoveryNode discoveryNode : discoveryNodes) { + if (discoveryNode.getName().equals(node)) { + return; + } + } + fail("nodes did not contain [" + node + "]: " + discoveryNodes); + }); + } + + public void testNonLocalRequestAlwaysFindsMasterAndWaitsForMetadata() throws Exception { + runRepeatedlyWhileChangingMaster(() -> { + final String node = randomFrom(internalCluster().getNodeNames()); + final long metadataVersion + = internalCluster().getInstance(ClusterService.class, node).getClusterApplierService().state().metaData().version(); + final long waitForMetaDataVersion = randomLongBetween(Math.max(1, metadataVersion - 3), metadataVersion + 5); + final ClusterStateRequestBuilder clusterStateRequestBuilder = client(node).admin().cluster().prepareState() + .clear().setNodes(true).setMetaData(true) + .setMasterNodeTimeout(TimeValue.timeValueMillis(100)).setWaitForTimeOut(TimeValue.timeValueMillis(100)) + .setWaitForMetaDataVersion(waitForMetaDataVersion); + final ClusterStateResponse clusterStateResponse; + try { + clusterStateResponse = clusterStateRequestBuilder.get(); + } catch (MasterNotDiscoveredException e) { + return; // ok, we hit the disconnected node + } + if (clusterStateResponse.isWaitForTimedOut() == false) { + final ClusterState state = clusterStateResponse.getState(); + assertNotNull("should always contain a master node", state.nodes().getMasterNodeId()); + assertThat("waited for metadata version", state.metaData().version(), greaterThanOrEqualTo(waitForMetaDataVersion)); + } + }); + } + + public void testLocalRequestWaitsForMetadata() throws Exception { + runRepeatedlyWhileChangingMaster(() -> { + final String node = randomFrom(internalCluster().getNodeNames()); + final long metadataVersion + = internalCluster().getInstance(ClusterService.class, node).getClusterApplierService().state().metaData().version(); + final long waitForMetaDataVersion = randomLongBetween(Math.max(1, metadataVersion - 3), metadataVersion + 5); + final ClusterStateResponse clusterStateResponse = client(node).admin().cluster() + .prepareState().clear().setLocal(true).setMetaData(true).setWaitForMetaDataVersion(waitForMetaDataVersion) + .setMasterNodeTimeout(TimeValue.timeValueMillis(100)).setWaitForTimeOut(TimeValue.timeValueMillis(100)) + .get(); + if (clusterStateResponse.isWaitForTimedOut() == false) { + final MetaData metaData = clusterStateResponse.getState().metaData(); + assertThat("waited for metadata version " + waitForMetaDataVersion + " with node " + node, + metaData.version(), greaterThanOrEqualTo(waitForMetaDataVersion)); + } + }); + } + + public void runRepeatedlyWhileChangingMaster(Runnable runnable) throws Exception { + internalCluster().startNodes(3); + + assertBusy(() -> assertThat(client().admin().cluster().prepareState().clear().setMetaData(true) + .get().getState().getLastCommittedConfiguration().getNodeIds().stream() + .filter(n -> ClusterBootstrapService.isBootstrapPlaceholder(n) == false).collect(Collectors.toSet()), hasSize(3))); + + final String masterName = internalCluster().getMasterName(); + + final AtomicBoolean shutdown = new AtomicBoolean(); + final Thread assertingThread = new Thread(() -> { + while (shutdown.get() == false) { + runnable.run(); + } + }, "asserting thread"); + + final Thread updatingThread = new Thread(() -> { + String value = "none"; + while (shutdown.get() == false) { + value = "none".equals(value) ? "all" : "none"; + final String nonMasterNode = randomValueOtherThan(masterName, () -> randomFrom(internalCluster().getNodeNames())); + assertAcked(client(nonMasterNode).admin().cluster().prepareUpdateSettings().setPersistentSettings( + Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), value))); + } + }, "updating thread"); + + final List mockTransportServices + = StreamSupport.stream(internalCluster().getInstances(TransportService.class).spliterator(), false) + .map(ts -> (MockTransportService) ts).collect(Collectors.toList()); + + assertingThread.start(); + updatingThread.start(); + + final MockTransportService masterTransportService + = (MockTransportService) internalCluster().getInstance(TransportService.class, masterName); + + for (MockTransportService mockTransportService : mockTransportServices) { + if (masterTransportService != mockTransportService) { + masterTransportService.addFailToSendNoConnectRule(mockTransportService); + mockTransportService.addFailToSendNoConnectRule(masterTransportService); + } + } + + assertBusy(() -> { + final String nonMasterNode = randomValueOtherThan(masterName, () -> randomFrom(internalCluster().getNodeNames())); + final String claimedMasterName = internalCluster().getMasterName(nonMasterNode); + assertThat(claimedMasterName, not(equalTo(masterName))); + }); + + shutdown.set(true); + assertingThread.join(); + updatingThread.join(); + internalCluster().close(); + } + +} diff --git a/server/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java b/server/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java index f80a5befa83d9..38b9579eff046 100644 --- a/server/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java @@ -86,7 +86,6 @@ public void testSimpleOnlyMasterNodeElection() throws IOException { .execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligibleNodeName)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38331") public void testElectOnlyBetweenMasterNodes() throws Exception { internalCluster().setBootstrapMasterNodeIndex(0); logger.info("--> start data node / non master node"); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 4b5a85f65b29a..82b07672060cb 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -167,7 +167,6 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -1957,9 +1956,7 @@ public String getMasterName() { public String getMasterName(@Nullable String viaNode) { try { Client client = viaNode != null ? client(viaNode) : client(); - final DiscoveryNode masterNode = client.admin().cluster().prepareState().get().getState().nodes().getMasterNode(); - assertNotNull(masterNode); - return masterNode.getName(); + return client.admin().cluster().prepareState().get().getState().nodes().getMasterNode().getName(); } catch (Exception e) { logger.warn("Can't fetch cluster state", e); throw new RuntimeException("Can't get master node " + e.getMessage(), e);