diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java
index fa3d4997efb4b..496ee9040a899 100644
--- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java
+++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java
@@ -390,31 +390,24 @@ protected void runTask(UpdateTask task) {
             newClusterState = task.apply(previousClusterState);
         } catch (Exception e) {
             TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
-            if (logger.isTraceEnabled()) {
-                logger.trace(() -> new ParameterizedMessage(
-                    "failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}{}{}",
-                    executionTime,
-                    previousClusterState.version(),
-                    task.source,
-                    previousClusterState.nodes(),
-                    previousClusterState.routingTable(),
-                    previousClusterState.getRoutingNodes()),
-                    e);
-            }
+            logger.trace(() -> new ParameterizedMessage(
+                "failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}",
+                executionTime, previousClusterState.version(), task.source, previousClusterState), e);
             warnAboutSlowTaskIfNeeded(executionTime, task.source);
             task.listener.onFailure(task.source, e);
             return;
         }

         if (previousClusterState == newClusterState) {
-            task.listener.onSuccess(task.source);
             TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
             logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
             warnAboutSlowTaskIfNeeded(executionTime, task.source);
+            task.listener.onSuccess(task.source);
         } else {
             if (logger.isTraceEnabled()) {
-                logger.trace("cluster state updated, source [{}]\n{}", task.source, newClusterState);
-            } else if (logger.isDebugEnabled()) {
+                logger.debug("cluster state updated, version [{}], source [{}]\n{}", newClusterState.version(), task.source,
+                    newClusterState);
+            } else {
                 logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), task.source);
             }
             try {
@@ -424,20 +417,19 @@ protected void runTask(UpdateTask task) {
                     executionTime, newClusterState.version(), newClusterState.stateUUID());
                 warnAboutSlowTaskIfNeeded(executionTime, task.source);
+                task.listener.onSuccess(task.source);
             } catch (Exception e) {
                 TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
-                final long version = newClusterState.version();
-                final String stateUUID = newClusterState.stateUUID();
-                final String fullState = newClusterState.toString();
-                logger.warn(() -> new ParameterizedMessage(
-                    "failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
-                    executionTime,
-                    version,
-                    stateUUID,
-                    task.source,
-                    fullState),
-                    e);
-                // TODO: do we want to call updateTask.onFailure here?
+                if (logger.isTraceEnabled()) {
+                    logger.warn(new ParameterizedMessage(
+                        "failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
+                        executionTime, newClusterState.version(), newClusterState.stateUUID(), task.source, newClusterState), e);
+                } else {
+                    logger.warn(new ParameterizedMessage(
+                        "failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]",
+                        executionTime, newClusterState.version(), newClusterState.stateUUID(), task.source), e);
+                }
+                task.listener.onFailure(task.source, e);
             }
         }
     }
@@ -454,17 +446,14 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl
             }
         }

+        logger.trace("connecting to nodes of cluster state with version {}", newClusterState.version());
         nodeConnectionsService.connectToNodes(newClusterState.nodes());
-        logger.debug("applying cluster state version {}", newClusterState.version());
-        try {
-            // nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
-            if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) {
-                final Settings incomingSettings = clusterChangedEvent.state().metaData().settings();
-                clusterSettings.applySettings(incomingSettings);
-            }
-        } catch (Exception ex) {
-            logger.warn("failed to apply cluster settings", ex);
+        // nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
+        if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) {
+            logger.debug("applying settings from cluster state with version {}", newClusterState.version());
+            final Settings incomingSettings = clusterChangedEvent.state().metaData().settings();
+            clusterSettings.applySettings(incomingSettings);
         }

         logger.debug("apply cluster state with version {}", newClusterState.version());
@@ -476,18 +465,12 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl

         state.set(newClusterState);

         callClusterStateListeners(clusterChangedEvent);
-
-        task.listener.onSuccess(task.source);
     }

     private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent) {
         clusterStateAppliers.forEach(applier -> {
-            try {
-                logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
-                applier.applyClusterState(clusterChangedEvent);
-            } catch (Exception ex) {
-                logger.warn("failed to notify ClusterStateApplier", ex);
-            }
+            logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
+            applier.applyClusterState(clusterChangedEvent);
         });
     }
diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
index 6951e33d5e741..00b04bff2e5fd 100644
--- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java
+++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -19,6 +19,8 @@

 package org.elasticsearch.ingest;

+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.ResourceNotFoundException;
@@ -69,6 +71,8 @@ public class IngestService implements ClusterStateApplier {

     public static final String NOOP_PIPELINE_NAME = "_none";

+    private static final Logger logger = LogManager.getLogger(IngestService.class);
+
     private final ClusterService clusterService;
     private final ScriptService scriptService;
     private final Map<String, Processor.Factory> processorFactories;
@@ -256,7 +260,11 @@ Map<String, Pipeline> pipelines() {
     public void applyClusterState(final ClusterChangedEvent event) {
         ClusterState state = event.state();
         Map<String, Pipeline> originalPipelines = pipelines;
-        innerUpdatePipelines(event.previousState(), state);
+        try {
+            innerUpdatePipelines(event.previousState(), state);
+        } catch (ElasticsearchParseException e) {
+            logger.warn("failed to update ingest pipelines", e);
+        }
         //pipelines changed, so add the old metrics to the new metrics
         if (originalPipelines != pipelines) {
             pipelines.forEach((id, pipeline) -> {
diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java
index 7db63ab120e91..be40f0c888362 100644
--- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java
@@ -26,10 +26,10 @@
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterModule;
-import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.ESAllocationTestCase;
+import org.elasticsearch.cluster.NodeConnectionsService;
 import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.coordination.ClusterStatePublisher.AckListener;
 import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
@@ -39,7 +39,9 @@
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNode.Role;
-import org.elasticsearch.cluster.service.ClusterApplier;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.service.ClusterApplierService;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -52,6 +54,7 @@
 import org.elasticsearch.common.settings.Settings.Builder;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.discovery.zen.PublishClusterStateStats;
 import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver;
@@ -931,7 +934,7 @@ private void testAppliesNoMasterBlock(String noMasterBlockSetting, ClusterBlock
         cluster.runFor(defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING) + defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING)
             + DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "detecting disconnection");

-        assertThat(leader.clusterApplier.lastAppliedClusterState.blocks().global(), hasItem(expectedBlock));
+        assertThat(leader.getLastAppliedClusterState().blocks().global(), hasItem(expectedBlock));

         // TODO reboot the leader and verify that the same block is applied when it restarts
     }
@@ -1525,12 +1528,12 @@ class ClusterNode {
         private Coordinator coordinator;
         private final DiscoveryNode localNode;
         private final MockPersistedState persistedState;
-        private FakeClusterApplier clusterApplier;
         private AckedFakeThreadPoolMasterService masterService;
+        private DisruptableClusterApplierService clusterApplierService;
+        private ClusterService clusterService;
         private TransportService transportService;
         private DisruptableMockTransport mockTransport;
         private List<BiConsumer<DiscoveryNode, ClusterState>> extraJoinValidators = new ArrayList<>();
-        private ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED;
         ClusterNode(int nodeIndex, boolean masterEligible) {
             this(nodeIndex, createDiscoveryNode(nodeIndex, masterEligible), defaultPersistedStateSupplier);
         }
@@ -1565,37 +1568,46 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
             final Settings settings = Settings.builder()
                 .putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(),
                     ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY)).build(); // suppress auto-bootstrap
-
-            final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
-            clusterApplier = new FakeClusterApplier(settings, clusterSettings);
-            masterService = new AckedFakeThreadPoolMasterService("test_node", "test",
-                runnable -> deterministicTaskQueue.scheduleNow(onNode(runnable)));
             transportService = mockTransport.createTransportService(
                 settings, deterministicTaskQueue.getThreadPool(this::onNode), NOOP_TRANSPORT_INTERCEPTOR, a -> localNode, null,
                 emptySet());
+            masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test",
+                runnable -> deterministicTaskQueue.scheduleNow(onNode(runnable)));
+            final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+            clusterApplierService = new DisruptableClusterApplierService(localNode.getId(), settings, clusterSettings,
+                deterministicTaskQueue, this::onNode);
+            clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService);
+            clusterService.setNodeConnectionsService(
+                new NodeConnectionsService(clusterService.getSettings(), deterministicTaskQueue.getThreadPool(this::onNode),
+                    transportService) {
+                    @Override
+                    public void connectToNodes(DiscoveryNodes discoveryNodes) {
+                        // override this method as it does blocking calls
+                    }
+                });
             final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators =
                 Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)));
             coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
                 ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, this::getPersistedState,
-                Cluster.this::provideUnicastHosts, clusterApplier, onJoinValidators, Randomness.get());
+                Cluster.this::provideUnicastHosts, clusterApplierService, onJoinValidators, Randomness.get());
             masterService.setClusterStatePublisher(coordinator);

             logger.trace("starting up [{}]", localNode);
             transportService.start();
             transportService.acceptIncomingRequests();
-            masterService.start();
             coordinator.start();
+            clusterService.start();
             coordinator.startInitialJoin();
         }

         void close() {
             logger.trace("taking down [{}]", localNode);
-            //transportService.stop(); // does blocking stuff :/
-            masterService.stop();
             coordinator.stop();
-            //transportService.close(); // does blocking stuff :/
-            masterService.close();
+            clusterService.stop();
+            //transportService.stop(); // does blocking stuff :/
+            clusterService.close();
             coordinator.close();
+            //transportService.close(); // does blocking stuff :/
         }

         ClusterNode restartedNode() {
@@ -1634,11 +1646,11 @@ ClusterState improveConfiguration(ClusterState currentState) {
         }

         void setClusterStateApplyResponse(ClusterStateApplyResponse clusterStateApplyResponse) {
-            this.clusterStateApplyResponse = clusterStateApplyResponse;
+            clusterApplierService.clusterStateApplyResponse = clusterStateApplyResponse;
         }

         ClusterStateApplyResponse getClusterStateApplyResponse() {
-            return clusterStateApplyResponse;
+            return clusterApplierService.clusterStateApplyResponse;
         }

         Runnable onNode(Runnable runnable) {
@@ -1739,7 +1751,7 @@ void onDisconnectEventFrom(ClusterNode clusterNode) {
         }

         ClusterState getLastAppliedClusterState() {
-            return clusterApplier.lastAppliedClusterState;
+            return clusterApplierService.state();
         }

         void applyInitialConfiguration() {
@@ -1769,84 +1781,6 @@ void applyInitialConfiguration() {
         private boolean isNotUsefullyBootstrapped() {
             return getLocalNode().isMasterNode() == false || coordinator.isInitialConfigurationSet() == false;
         }
-
-        private class FakeClusterApplier implements ClusterApplier {
-
-            final ClusterName clusterName;
-            private final ClusterSettings clusterSettings;
-            ClusterState lastAppliedClusterState;
-
-            private FakeClusterApplier(Settings settings, ClusterSettings clusterSettings) {
-                clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
-                this.clusterSettings = clusterSettings;
-            }
-
-            @Override
-            public void setInitialState(ClusterState initialState) {
-                assert lastAppliedClusterState == null;
-                assert initialState != null;
-                lastAppliedClusterState = initialState;
-            }
-
-            @Override
-            public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener) {
-                switch (clusterStateApplyResponse) {
-                    case SUCCEED:
-                        deterministicTaskQueue.scheduleNow(onNode(new Runnable() {
-                            @Override
-                            public void run() {
-                                final ClusterState oldClusterState = clusterApplier.lastAppliedClusterState;
-                                final ClusterState newClusterState = clusterStateSupplier.get();
-                                assert oldClusterState.version() <= newClusterState.version() : "updating cluster state from version "
-                                    + oldClusterState.version() + " to stale version " + newClusterState.version();
-                                clusterApplier.lastAppliedClusterState = newClusterState;
-                                final Settings incomingSettings = newClusterState.metaData().settings();
-                                clusterSettings.applySettings(incomingSettings); // TODO validation might throw exceptions here.
-                                listener.onSuccess(source);
-                            }
-
-                            @Override
-                            public String toString() {
-                                return "apply cluster state from [" + source + "]";
-                            }
-                        }));
-                        break;
-                    case FAIL:
-                        deterministicTaskQueue.scheduleNow(onNode(new Runnable() {
-                            @Override
-                            public void run() {
-                                listener.onFailure(source, new ElasticsearchException("cluster state application failed"));
-                            }
-
-                            @Override
-                            public String toString() {
-                                return "fail to apply cluster state from [" + source + "]";
-                            }
-                        }));
-                        break;
-                    case HANG:
-                        if (randomBoolean()) {
-                            deterministicTaskQueue.scheduleNow(onNode(new Runnable() {
-                                @Override
-                                public void run() {
-                                    final ClusterState oldClusterState = clusterApplier.lastAppliedClusterState;
-                                    final ClusterState newClusterState = clusterStateSupplier.get();
-                                    assert oldClusterState.version() <= newClusterState.version() :
-                                        "updating cluster state from version "
-                                        + oldClusterState.version() + " to stale version " + newClusterState.version();
-                                    clusterApplier.lastAppliedClusterState = newClusterState;
-                                }

-                                @Override
-                                public String toString() {
-                                    return "apply cluster state from [" + source + "] without ack";
-                                }
-                            }));
-                        }
-                        break;
-                }
-            }
-        }
     }

     private List<TransportAddress> provideUnicastHosts(HostsResolver ignored) {
@@ -1938,6 +1872,52 @@ public void onNodeAck(DiscoveryNode node, Exception e) {
         }
     }

+    static class DisruptableClusterApplierService extends ClusterApplierService {
+        private final String nodeName;
+        private final DeterministicTaskQueue deterministicTaskQueue;
+        ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED;
+
+        DisruptableClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings,
+                                         DeterministicTaskQueue deterministicTaskQueue, Function<Runnable, Runnable> runnableWrapper) {
+            super(nodeName, settings, clusterSettings, deterministicTaskQueue.getThreadPool(runnableWrapper));
+            this.nodeName = nodeName;
+            this.deterministicTaskQueue = deterministicTaskQueue;
+            addStateApplier(event -> {
+                switch (clusterStateApplyResponse) {
+                    case SUCCEED:
+                    case HANG:
+                        final ClusterState oldClusterState = event.previousState();
+                        final ClusterState newClusterState = event.state();
+                        assert oldClusterState.version() <= newClusterState.version() : "updating cluster state from version "
+                            + oldClusterState.version() + " to stale version " + newClusterState.version();
+                        break;
+                    case FAIL:
+                        throw new ElasticsearchException("simulated cluster state applier failure");
+                }
+            });
+        }
+
+        @Override
+        protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
+            return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue);
+        }
+
+        @Override
+        public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener) {
+            if (clusterStateApplyResponse == ClusterStateApplyResponse.HANG) {
+                if (randomBoolean()) {
+                    // apply cluster state, but don't notify listener
+                    super.onNewClusterState(source, clusterStateSupplier, (source1, e) -> {
+                        // ignore result
+                    });
+                }
+            } else {
+                super.onNewClusterState(source, clusterStateSupplier, listener);
+            }
+        }
+
+    }
+
     private static DiscoveryNode createDiscoveryNode(int nodeIndex, boolean masterEligible) {
         final TransportAddress address = buildNewFakeTransportAddress();
         return new DiscoveryNode("", "node" + nodeIndex,
diff --git a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java
index 2690909489c75..770ae68e1285f 100644
--- a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java
@@ -28,8 +28,10 @@
 import org.elasticsearch.cluster.LocalNodeMasterListener;
 import org.elasticsearch.cluster.NodeConnectionsService;
 import org.elasticsearch.cluster.block.ClusterBlocks;
+import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
 import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.ClusterSettings;
@@ -53,6 +55,7 @@
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
 import static org.elasticsearch.test.ClusterServiceUtils.setState;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.is;

 public class ClusterApplierServiceTests extends ESTestCase {
@@ -357,6 +360,97 @@ public void onFailure(String source, Exception e) {
         assertTrue(applierCalled.get());
     }

+    public void testClusterStateApplierBubblesUpExceptionsInApplier() throws InterruptedException {
+        AtomicReference<Throwable> error = new AtomicReference<>();
+        clusterApplierService.addStateApplier(event -> {
+            throw new RuntimeException("dummy exception");
+        });
+
+        CountDownLatch latch = new CountDownLatch(1);
+        clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(),
+            new ClusterApplyListener() {
+
+                @Override
+                public void onSuccess(String source) {
+                    latch.countDown();
+                    fail("should not be called");
+                }
+
+                @Override
+                public void onFailure(String source, Exception e) {
+                    assertTrue(error.compareAndSet(null, e));
+                    latch.countDown();
+                }
+            }
+        );
+
+        latch.await();
+        assertNotNull(error.get());
+        assertThat(error.get().getMessage(), containsString("dummy exception"));
+    }
+
+    public void testClusterStateApplierBubblesUpExceptionsInSettingsApplier() throws InterruptedException {
+        AtomicReference<Throwable> error = new AtomicReference<>();
+        clusterApplierService.clusterSettings.addSettingsUpdateConsumer(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING,
+            v -> {});
+
+        CountDownLatch latch = new CountDownLatch(1);
+        clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state())
+                .metaData(MetaData.builder(clusterApplierService.state().metaData())
+                    .persistentSettings(
+                        Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), false).build())
+                    .build())
+                .build(),
+            new ClusterApplyListener() {
+
+                @Override
+                public void onSuccess(String source) {
+                    latch.countDown();
+                    fail("should not be called");
+                }
+
+                @Override
+                public void onFailure(String source, Exception e) {
+                    assertTrue(error.compareAndSet(null, e));
+                    latch.countDown();
+                }
+            }
+        );
+
+        latch.await();
+        assertNotNull(error.get());
+        assertThat(error.get().getMessage(), containsString("illegal value can't update"));
+    }
+
+    public void testClusterStateApplierSwallowsExceptionInListener() throws InterruptedException {
+        AtomicReference<Throwable> error = new AtomicReference<>();
+        AtomicBoolean applierCalled = new AtomicBoolean();
+        clusterApplierService.addListener(event -> {
+            assertTrue(applierCalled.compareAndSet(false, true));
+            throw new RuntimeException("dummy exception");
+        });
exception"); + }); + + CountDownLatch latch = new CountDownLatch(1); + clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(), + new ClusterApplyListener() { + + @Override + public void onSuccess(String source) { + latch.countDown(); + } + + @Override + public void onFailure(String source, Exception e) { + error.compareAndSet(null, e); + } + } + ); + + latch.await(); + assertNull(error.get()); + assertTrue(applierCalled.get()); + } + public void testClusterStateApplierCanCreateAnObserver() throws InterruptedException { AtomicReference error = new AtomicReference<>(); AtomicBoolean applierCalled = new AtomicBoolean(); @@ -407,10 +501,12 @@ public void onFailure(String source, Exception e) { static class TimedClusterApplierService extends ClusterApplierService { + final ClusterSettings clusterSettings; public volatile Long currentTimeOverride = null; TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { super("test_node", settings, clusterSettings, threadPool); + this.clusterSettings = clusterSettings; } @Override diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 3dde7babb0a96..e5aea1f5d5ce1 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -19,6 +19,9 @@ package org.elasticsearch.ingest; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; @@ -39,11 +42,13 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.CustomTypeSafeMatcher; import org.mockito.ArgumentMatcher; @@ -254,7 +259,7 @@ public void testPut() { assertThat(pipeline.getProcessors().size(), equalTo(0)); } - public void testPutWithErrorResponse() { + public void testPutWithErrorResponse() throws IllegalAccessException { IngestService ingestService = createWithProcessors(); String id = "_id"; Pipeline pipeline = ingestService.getPipeline(id); @@ -265,11 +270,22 @@ public void testPutWithErrorResponse() { new PutPipelineRequest(id, new BytesArray("{\"description\": \"empty processors\"}"), XContentType.JSON); ClusterState previousClusterState = clusterState; clusterState = IngestService.innerPut(putRequest, clusterState); + MockLogAppender mockAppender = new MockLogAppender(); + mockAppender.start(); + mockAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "test1", + IngestService.class.getCanonicalName(), + Level.WARN, + "failed to update ingest pipelines")); + Logger ingestLogger = LogManager.getLogger(IngestService.class); + Loggers.addAppender(ingestLogger, mockAppender); try { ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, 
-            fail("should fail");
-        } catch (ElasticsearchParseException e) {
-            assertThat(e.getMessage(), equalTo("[processors] required property is missing"));
+            mockAppender.assertAllExpectationsMatched();
+        } finally {
+            Loggers.removeAppender(ingestLogger, mockAppender);
+            mockAppender.stop();
         }
         pipeline = ingestService.getPipeline(id);
         assertNotNull(pipeline);
diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java
index cc21fef5f5559..bcc10f1521b29 100644
--- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java
+++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java
@@ -53,6 +53,12 @@ protected void afterExecute(Runnable r, Throwable t) {
         throw new KillWorkerError();
     }

+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) {
+        // ensures we don't block
+        return false;
+    }
+
     private static final class KillWorkerError extends Error {
     }
 }